develop
异步协程中的Semaphore和BoundedSemaphore
1. asyncio.Semaphore是什么?
在 Python 的 asyncio 模块中,asyncio.Semaphore 异步信号量,是一种同步原语,用于控制并发协程的数量,避免资源过载。在 asyncio 中,它基于以下核心机制:
- 计数器机制:维护一个内部计数器,表示可用的”许可”数量
- 等待队列:当没有可用许可时,协程会被放入等待队列,遵循 FIFO 公平性。
- Lock:互斥锁,保护计数器和等待队列的原子操作,避免多协程的竞态条件。
- 关联的事件循环:与 asyncio 事件循环深度集成,支持异步等待 Semaphore比较简单,可以理解为带计数器的Lock。
核心概念
asyncio.Semaphore 主要解决 “多个协程基于竞争资源状态的读写后,可以同时多个协程同时工作” 的场景。Semaphore内部也包含一个锁(默认是 asyncio.Lock) 和一个等待队列:
- 锁用于保护共享资源的访问(确保状态修改的原子性)。
- 等待队列用于存放因未获得计数器许可而暂停的协程。注意,因为是队列,所以是FIFO。意味着如果多个协程被唤醒的顺序也是先进先出。
- 和asyncio.Lock的区别是:
- Semaphore 的锁维护了一个计数器,用于控制对竞争资源的并发访问的同时,还提供了数量的限制。
主要方法
- init(self, value=1):如asyncio.Semaphore(3),初始化信号量对象。初始可用的 “信号量名额”(即最大并发数),默认值为 1
- acquire():获取信号量名额。
- 若计数器有可用名额(value > 0),直接占用一个名额(value -= 1)并立刻返回
- 若计数器无可用名额(value == 0),当前任务会被加入等待队列,进入挂起状态,直到其他任务释放名额。
- release()释放内部锁,并且将计数器加1,将名额空闲出来给其他协程使用。
- locked():返回当前的Semaphore是否已经耗尽名额而处于锁定状态。
2. 举个栗子来说明
import asyncio
import logging
# 配置日志格式
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
# 定义一个需要控制并发的协程
async def limited_task(sem, task_id):
# 通过async with自动获取和释放许可
async with sem:
logging.info(f"任务 {task_id} 开始执行")
await asyncio.sleep(2) # 模拟耗时操作(如网络请求)
logging.info(f"任务 {task_id} 执行完成")
async def main():
# 初始化信号量,最多允许3个并发协程
sem = asyncio.Semaphore(3)
# 创建10个任务(协程)
tasks = [limited_task(sem, i) for i in range(10)]
# 等待所有任务完成
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
Log输出如下:
18:47:28 - MainThread - INFO - 任务 0 开始执行
18:47:28 - MainThread - INFO - 任务 1 开始执行
18:47:28 - MainThread - INFO - 任务 2 开始执行
18:47:30 - MainThread - INFO - 任务 0 执行完成
18:47:30 - MainThread - INFO - 任务 1 执行完成
18:47:30 - MainThread - INFO - 任务 2 执行完成
18:47:30 - MainThread - INFO - 任务 3 开始执行
18:47:30 - MainThread - INFO - 任务 4 开始执行
18:47:30 - MainThread - INFO - 任务 5 开始执行
18:47:32 - MainThread - INFO - 任务 3 执行完成
18:47:32 - MainThread - INFO - 任务 4 执行完成
18:47:32 - MainThread - INFO - 任务 5 执行完成
18:47:32 - MainThread - INFO - 任务 6 开始执行
18:47:32 - MainThread - INFO - 任务 7 开始执行
18:47:32 - MainThread - INFO - 任务 8 开始执行
18:47:34 - MainThread - INFO - 任务 6 执行完成
18:47:34 - MainThread - INFO - 任务 7 执行完成
18:47:34 - MainThread - INFO - 任务 8 执行完成
18:47:34 - MainThread - INFO - 任务 9 开始执行
18:47:36 - MainThread - INFO - 任务 9 执行完成
3. 结果说明
由结果可知:
- 主协程开始后,初始化计数器为3的信号量sem,然后开始执行。
- 创建10个协程,每个协程都调用limited_task函数,该函数通过async with自动获取和释放信号量的许可。这里就保证了当有多个协程任务在loop中等待运行时,通过信号量控制了最大的并发数。
- 使用asyncio.gather(*tasks)等待所有任务完成。
4. asyncio.BoundedSemaphore又是什么?
BoundedSemaphore和Semaphore类似,但是BoundedSemaphore有最大值限制。BoundedSemaphore的初始化参数为value和bound,bound为最大值限制。
- 错误检测能力
- BoundedSemaphore 能检测到 “过度释放” 的逻辑错误(如多调用了 release())
- Semaphore 对过度释放不做检查,可能导致许可数无限增长,掩盖程序错误
- 许可计数规则
- Semaphore:调用 release() 时会无条件增加许可数,即使已经超过初始值,通过该方法可以增加许可数,但是不推荐这种做法
- BoundedSemaphore:如果释放操作会导致许可数超过初始值,会立即抛出 ValueError
4.1 举个栗子来说明
import asyncio
async def test_semaphore():
sem = asyncio.Semaphore(2) # 初始2个许可
await sem.acquire()
sem.release() # 正常释放:1→2
sem.release() # 过度释放:2→3(不会报错)
print(f"Semaphore 许可数: {sem._value}") # 输出 3
async def test_bounded_semaphore():
bsem = asyncio.BoundedSemaphore(2) # 初始2个许可
await bsem.acquire()
bsem.release() # 正常释放:1→2
try:
bsem.release() # 过度释放:会抛出错误
except ValueError as e:
print(f"BoundedSemaphore 错误: {e}") # 输出错误信息
async def main():
await test_semaphore()
await test_bounded_semaphore()
asyncio.run(main())
Log输出如下:
10:22:16 - MainThread - INFO - Semaphore 许可数: 3
10:22:16 - MainThread - ERROR - BoundedSemaphore 错误: BoundedSemaphore released too many times
4.2 结果说明
- test_semaphore()函数中,Semaphore的许可数会从2变为3,然后过度释放,不会报错。
- test_bounded_semaphore()函数中,BoundedSemaphore会检测到过度释放,并抛出错误。
4.3 使用建议
- 无论是semaphore还是bounded semaphore,都像使用bounded semaphore一样使用。不要过度释放许可数。所以尽可能的使用async with的语法,来保证获取和释放许可的数量是一一对应的。
- 如果你确信要使用过度释放的特性,那请严格监控sem._value的值,因为很可能因为BUG,导致许可数无限增长。后果就是程序无法限制并发数,semaphore用了和没有用是一样的。
DEVELOP · ASYNCIO
develop python3 async coroutine
