develop
异步协程中的Condition
Table of Contents
1. asyncio.Condition是什么?
在 Python 的 asyncio 模块中,asyncio.Condition 是一种用于协调多个协程(coroutine)交互的同步原语。它结合了锁(Lock) 和等待机制,允许协程在某个 “条件满足” 前暂停等待,直到其他协程主动通知条件已满足时再被唤醒继续执行。
核心概念
asyncio.Condition 主要解决 “多个协程需要基于共享状态的变化进行协作” 的场景。Condition 内部包含一个锁(默认是 asyncio.Lock) 和一个等待队列:
- 锁用于保护共享资源的访问(确保状态修改的原子性)。
- 等待队列用于存放因 “条件未满足” 而暂停的协程。注意,因为是队列,所以是FIFO。意味着如果多个协程被唤醒的顺序也是先进先出。
主要方法
- acquire()\release() 内部锁的获取和释放,进入临界区,修改共享状态后,退出临界区。
- wait() 释放内部锁,将当前协程暂停并加入等待队列,直到被 notify() 唤醒。唤醒后会重新尝试获取锁。wait() 可能因 “虚假唤醒”(未被 notify() 唤醒却恢复执行)而返回,因此必须用循环来检测条件是否真的满足。
- wait_for(predicate) wait()带条件的版本,等待直到 predicate(一个无参函数,返回布尔值)为 True,内部自动处理等待和条件检查防止“虚假唤醒”。
- notify(n=1) 唤醒等待队列中最多 n 个协程(默认唤醒 1 个)。需在持有锁时调用。需要注意notify不释放锁:调用 notify() 后,唤醒的协程不会立即执行,需等待当前协程释放锁(release())后,被唤醒的协程才能重新获取锁并继续。
- notify_all() 唤醒等待队列中所有协程。需在持有锁时调用。
2. 举个栗子来说明
import asyncio
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
MAX_SIZE = 3
async def producer(cond: asyncio.Condition, buffer: list, index):
async with cond: # 异步上下文管理
while len(buffer) == MAX_SIZE:
await cond.wait()
buffer.append(index)
logging.info(f"生产: {index}, 缓冲区: {buffer}")
cond.notify() # 唤醒等待的消费者
async def consumer(cond: asyncio.Condition, buffer: list):
async with cond:
# 异步等待条件满足(缓冲区非空),使用wait_for的版本,和producer的wait版本对比
await cond.wait_for(lambda: len(buffer) > 0)
item = buffer.pop(0)
logging.info(f"消费: {item}, 缓冲区: {buffer}")
cond.notify() # 唤醒等待的生产者
async def producer_worker(cond, buffer):
for i in range(10):
await producer(cond, buffer, i)
await asyncio.sleep(1)
async def consumer_worker(cond, buffer):
for i in range(10):
await consumer(cond, buffer)
await asyncio.sleep(2)
async def main():
cond = asyncio.Condition()
buffer = []
# 启动异步生产者和消费者(替代线程)
asyncio.create_task(producer_worker(cond, buffer))
await consumer_worker(cond, buffer)
logging.info(f"程序运行结束")
asyncio.run(main())
Log输出如下:
17:57:28 - MainThread - INFO - 生产: 0, 缓冲区: [0]
17:57:28 - MainThread - INFO - 消费: 0, 缓冲区: []
17:57:29 - MainThread - INFO - 生产: 1, 缓冲区: [1]
17:57:30 - MainThread - INFO - 消费: 1, 缓冲区: []
17:57:30 - MainThread - INFO - 生产: 2, 缓冲区: [2]
17:57:31 - MainThread - INFO - 生产: 3, 缓冲区: [2, 3]
17:57:32 - MainThread - INFO - 消费: 2, 缓冲区: [3]
17:57:32 - MainThread - INFO - 生产: 4, 缓冲区: [3, 4]
17:57:33 - MainThread - INFO - 生产: 5, 缓冲区: [3, 4, 5]
17:57:34 - MainThread - INFO - 消费: 3, 缓冲区: [4, 5]
17:57:34 - MainThread - INFO - 生产: 6, 缓冲区: [4, 5, 6]
17:57:36 - MainThread - INFO - 消费: 4, 缓冲区: [5, 6]
17:57:36 - MainThread - INFO - 生产: 7, 缓冲区: [5, 6, 7]
17:57:38 - MainThread - INFO - 消费: 5, 缓冲区: [6, 7]
17:57:38 - MainThread - INFO - 生产: 8, 缓冲区: [6, 7, 8]
17:57:40 - MainThread - INFO - 消费: 6, 缓冲区: [7, 8]
17:57:40 - MainThread - INFO - 生产: 9, 缓冲区: [7, 8, 9]
17:57:42 - MainThread - INFO - 消费: 7, 缓冲区: [8, 9]
17:57:44 - MainThread - INFO - 消费: 8, 缓冲区: [9]
17:57:46 - MainThread - INFO - 消费: 9, 缓冲区: []
17:57:48 - MainThread - INFO - 程序运行结束
3. 结果说明
由结果可知:
- 主协程开始后,先将producer_worker协程加入到事件循环中,然后开始执行。
- 等待consumer_worker执行完毕。这里注意producer_worker创建后,我们并没有获取其结果,也没有await他的结果,只await了consumer_worker的结果,这是因为condition中的条件隐性决定的。因为buffer中的数据是先生产完,然后消费完的,所以condition中buffer为空的条件确定了producer_worker和consumer_worker执行完毕的顺序。
- wait相关方法的使用:
- cond.notify()唤醒等待队列中的任务
- consumer_worker和producer_worker模拟的场景是:buffer中的数据是主要的竞争条件,一旦数据的归属确定后,后续针对数据的耗时处理就可以并行执行,各自互不影响。
- 因为producer_worker的速度更快,所以当达到最大值后,就会暂停写入,直到buffer中有空闲位置。
DEVELOP · ASYNCIO
develop python3 async coroutine
