Personal Blog

develop

异步协程中的Semaphore和BoundedSemaphore

介绍python3中异步协程中的Semaphore和BoundedSemaphore的使用方法

1. asyncio.Semaphore是什么?

在 Python 的 asyncio 模块中,asyncio.Semaphore 异步信号量,是一种同步原语,用于控制并发协程的数量,避免资源过载。在 asyncio 中,它基于以下核心机制:

  1. 计数器机制:维护一个内部计数器,表示可用的”许可”数量
  2. 等待队列:当没有可用许可时,协程会被放入等待队列,遵循 FIFO 公平性。
  3. Lock:互斥锁,保护计数器和等待队列的原子操作,避免多协程的竞态条件。
  4. 关联的事件循环:与 asyncio 事件循环深度集成,支持异步等待 Semaphore比较简单,可以理解为带计数器的Lock。

核心概念

asyncio.Semaphore 主要解决 “多个协程基于竞争资源状态的读写后,可以同时多个协程同时工作” 的场景。Semaphore内部也包含一个锁(默认是 asyncio.Lock) 和一个等待队列:

  • 锁用于保护共享资源的访问(确保状态修改的原子性)。
  • 等待队列用于存放因未获得计数器许可而暂停的协程。注意,因为是队列,所以是FIFO。意味着如果多个协程被唤醒的顺序也是先进先出。
  • 和asyncio.Lock的区别是:
    • Semaphore 的锁维护了一个计数器,用于控制对竞争资源的并发访问的同时,还提供了数量的限制。

主要方法

  1. init(self, value=1):如asyncio.Semaphore(3),初始化信号量对象。初始可用的 “信号量名额”(即最大并发数),默认值为 1
  2. acquire():获取信号量名额。
    • 若计数器有可用名额(value > 0),直接占用一个名额(value -= 1)并立刻返回
    • 若计数器无可用名额(value == 0),当前任务会被加入等待队列,进入挂起状态,直到其他任务释放名额。
  3. release()释放内部锁,并且将计数器加1,将名额空闲出来给其他协程使用。
  4. locked():返回当前的Semaphore是否已经耗尽名额而处于锁定状态。

2. 举个栗子来说明

import asyncio
import logging

# 配置日志格式
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)


# 定义一个需要控制并发的协程
async def limited_task(sem, task_id):
    # 通过async with自动获取和释放许可
    async with sem:
        logging.info(f"任务 {task_id} 开始执行")
        await asyncio.sleep(2)  # 模拟耗时操作(如网络请求)
        logging.info(f"任务 {task_id} 执行完成")

async def main():
    # 初始化信号量,最多允许3个并发协程
    sem = asyncio.Semaphore(3)
    
    # 创建10个任务(协程)
    tasks = [limited_task(sem, i) for i in range(10)]
    
    # 等待所有任务完成
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

Log输出如下:

18:47:28 - MainThread - INFO - 任务 0 开始执行
18:47:28 - MainThread - INFO - 任务 1 开始执行
18:47:28 - MainThread - INFO - 任务 2 开始执行
18:47:30 - MainThread - INFO - 任务 0 执行完成
18:47:30 - MainThread - INFO - 任务 1 执行完成
18:47:30 - MainThread - INFO - 任务 2 执行完成
18:47:30 - MainThread - INFO - 任务 3 开始执行
18:47:30 - MainThread - INFO - 任务 4 开始执行
18:47:30 - MainThread - INFO - 任务 5 开始执行
18:47:32 - MainThread - INFO - 任务 3 执行完成
18:47:32 - MainThread - INFO - 任务 4 执行完成
18:47:32 - MainThread - INFO - 任务 5 执行完成
18:47:32 - MainThread - INFO - 任务 6 开始执行
18:47:32 - MainThread - INFO - 任务 7 开始执行
18:47:32 - MainThread - INFO - 任务 8 开始执行
18:47:34 - MainThread - INFO - 任务 6 执行完成
18:47:34 - MainThread - INFO - 任务 7 执行完成
18:47:34 - MainThread - INFO - 任务 8 执行完成
18:47:34 - MainThread - INFO - 任务 9 开始执行
18:47:36 - MainThread - INFO - 任务 9 执行完成

3. 结果说明

由结果可知:

  1. 主协程开始后,初始化计数器为3的信号量sem,然后开始执行。
  2. 创建10个协程,每个协程都调用limited_task函数,该函数通过async with自动获取和释放信号量的许可。这里就保证了当有多个协程任务在loop中等待运行时,通过信号量控制了最大的并发数。
  3. 使用asyncio.gather(*tasks)等待所有任务完成。

4. asyncio.BoundedSemaphore又是什么?

BoundedSemaphore和Semaphore类似,但是BoundedSemaphore有最大值限制。BoundedSemaphore的初始化参数为value和bound,bound为最大值限制。

  1. 错误检测能力
    • BoundedSemaphore 能检测到 “过度释放” 的逻辑错误(如多调用了 release())
    • Semaphore 对过度释放不做检查,可能导致许可数无限增长,掩盖程序错误
  2. 许可计数规则
    • Semaphore:调用 release() 时会无条件增加许可数,即使已经超过初始值,通过该方法可以增加许可数,但是不推荐这种做法
    • BoundedSemaphore:如果释放操作会导致许可数超过初始值,会立即抛出 ValueError

4.1 举个栗子来说明

import asyncio

async def test_semaphore():
    sem = asyncio.Semaphore(2)  # 初始2个许可
    await sem.acquire()
    sem.release()  # 正常释放:1→2
    sem.release()  # 过度释放:2→3(不会报错)
    print(f"Semaphore 许可数: {sem._value}")  # 输出 3

async def test_bounded_semaphore():
    bsem = asyncio.BoundedSemaphore(2)  # 初始2个许可
    await bsem.acquire()
    bsem.release()  # 正常释放:1→2
    try:
        bsem.release()  # 过度释放:会抛出错误
    except ValueError as e:
        print(f"BoundedSemaphore 错误: {e}")  # 输出错误信息

async def main():
    await test_semaphore()
    await test_bounded_semaphore()

asyncio.run(main())

Log输出如下:

10:22:16 - MainThread - INFO - Semaphore 许可数: 3
10:22:16 - MainThread - ERROR - BoundedSemaphore 错误: BoundedSemaphore released too many times

4.2 结果说明

  1. test_semaphore()函数中,Semaphore的许可数会从2变为3,然后过度释放,不会报错。
  2. test_bounded_semaphore()函数中,BoundedSemaphore会检测到过度释放,并抛出错误。

4.3 使用建议

  1. 无论是semaphore还是bounded semaphore,都像使用bounded semaphore一样使用。不要过度释放许可数。所以尽可能的使用async with的语法,来保证获取和释放许可的数量是一一对应的。
  2. 如果你确信要使用过度释放的特性,那请严格监控sem._value的值,因为很可能因为BUG,导致许可数无限增长。后果就是程序无法限制并发数,semaphore用了和没有用是一样的。

DEVELOP · ASYNCIO
develop python3 async coroutine