Personal Blog

develop

异步协程中的Condition

介绍python3中异步协程中的Condition的使用方法

1. asyncio.Condition是什么?

在 Python 的 asyncio 模块中,asyncio.Condition 是一种用于协调多个协程(coroutine)交互的同步原语。它结合了锁(Lock) 和等待机制,允许协程在某个 “条件满足” 前暂停等待,直到其他协程主动通知条件已满足时再被唤醒继续执行。

核心概念

asyncio.Condition 主要解决 “多个协程需要基于共享状态的变化进行协作” 的场景。Condition 内部包含一个锁(默认是 asyncio.Lock) 和一个等待队列:

  • 锁用于保护共享资源的访问(确保状态修改的原子性)。
  • 等待队列用于存放因 “条件未满足” 而暂停的协程。注意,因为是队列,所以是FIFO。意味着如果多个协程被唤醒的顺序也是先进先出。

主要方法

  1. acquire()\release() 内部锁的获取和释放,进入临界区,修改共享状态后,退出临界区。
  2. wait() 释放内部锁,将当前协程暂停并加入等待队列,直到被 notify() 唤醒。唤醒后会重新尝试获取锁。wait() 可能因 “虚假唤醒”(未被 notify() 唤醒却恢复执行)而返回,因此必须用循环来检测条件是否真的满足。
  3. wait_for(predicate) wait()带条件的版本,等待直到 predicate(一个无参函数,返回布尔值)为 True,内部自动处理等待和条件检查防止“虚假唤醒”。
  4. notify(n=1) 唤醒等待队列中最多 n 个协程(默认唤醒 1 个)。需在持有锁时调用。需要注意notify不释放锁:调用 notify() 后,唤醒的协程不会立即执行,需等待当前协程释放锁(release())后,被唤醒的协程才能重新获取锁并继续。
  5. notify_all() 唤醒等待队列中所有协程。需在持有锁时调用。

2. 举个栗子来说明

import asyncio
import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)

MAX_SIZE = 3

async def producer(cond: asyncio.Condition, buffer: list, index):
    async with cond:  # 异步上下文管理
        while len(buffer) == MAX_SIZE:
            await cond.wait()            
        buffer.append(index)
        logging.info(f"生产: {index}, 缓冲区: {buffer}")
        cond.notify()  # 唤醒等待的消费者

async def consumer(cond: asyncio.Condition, buffer: list):
    async with cond:
        # 异步等待条件满足(缓冲区非空),使用wait_for的版本,和producer的wait版本对比
        await cond.wait_for(lambda: len(buffer) > 0)
        item = buffer.pop(0)
        logging.info(f"消费: {item}, 缓冲区: {buffer}")
        cond.notify()  # 唤醒等待的生产者


async def producer_worker(cond, buffer):
    for i in range(10):
        await producer(cond, buffer, i)
        await asyncio.sleep(1)

async def consumer_worker(cond, buffer):
    for i in range(10):
        await consumer(cond, buffer)
        await asyncio.sleep(2)

async def main():
    cond = asyncio.Condition()
    buffer = []
    # 启动异步生产者和消费者(替代线程)
    asyncio.create_task(producer_worker(cond, buffer))
    await consumer_worker(cond, buffer)
    logging.info(f"程序运行结束")
    

asyncio.run(main())

Log输出如下:

17:57:28 - MainThread - INFO - 生产: 0, 缓冲区: [0]               
17:57:28 - MainThread - INFO - 消费: 0, 缓冲区: []           
17:57:29 - MainThread - INFO - 生产: 1, 缓冲区: [1]
17:57:30 - MainThread - INFO - 消费: 1, 缓冲区: []
17:57:30 - MainThread - INFO - 生产: 2, 缓冲区: [2]
17:57:31 - MainThread - INFO - 生产: 3, 缓冲区: [2, 3]
17:57:32 - MainThread - INFO - 消费: 2, 缓冲区: [3]
17:57:32 - MainThread - INFO - 生产: 4, 缓冲区: [3, 4]
17:57:33 - MainThread - INFO - 生产: 5, 缓冲区: [3, 4, 5]
17:57:34 - MainThread - INFO - 消费: 3, 缓冲区: [4, 5]
17:57:34 - MainThread - INFO - 生产: 6, 缓冲区: [4, 5, 6]
17:57:36 - MainThread - INFO - 消费: 4, 缓冲区: [5, 6]
17:57:36 - MainThread - INFO - 生产: 7, 缓冲区: [5, 6, 7]
17:57:38 - MainThread - INFO - 消费: 5, 缓冲区: [6, 7]
17:57:38 - MainThread - INFO - 生产: 8, 缓冲区: [6, 7, 8]
17:57:40 - MainThread - INFO - 消费: 6, 缓冲区: [7, 8]
17:57:40 - MainThread - INFO - 生产: 9, 缓冲区: [7, 8, 9]
17:57:42 - MainThread - INFO - 消费: 7, 缓冲区: [8, 9]
17:57:44 - MainThread - INFO - 消费: 8, 缓冲区: [9]
17:57:46 - MainThread - INFO - 消费: 9, 缓冲区: []
17:57:48 - MainThread - INFO - 程序运行结束

3. 结果说明

由结果可知:

  1. 主协程开始后,先将producer_worker协程加入到事件循环中,然后开始执行。
  2. 等待consumer_worker执行完毕。这里注意producer_worker创建后,我们并没有获取其结果,也没有await他的结果,只await了consumer_worker的结果,这是因为condition中的条件隐性决定的。因为buffer中的数据是先生产完,然后消费完的,所以condition中buffer为空的条件确定了producer_worker和consumer_worker执行完毕的顺序。
  3. wait相关方法的使用:
    1. producer中使用了cond.wait()方法,展示了wait()可能因 “虚假唤醒” 1 2 而返回,因此必须用循环来检测len(buffer) == MAX_SIZE的条件是否真的满足。
    2. consumer中使用了cond.wait_for()方法,展示了内部自动处理等待和条件检查。
  4. cond.notify()唤醒等待队列中的任务
  5. consumer_worker和producer_worker模拟的场景是:buffer中的数据是主要的竞争条件,一旦数据的归属确定后,后续针对数据的耗时处理就可以并行执行,各自互不影响。
  6. 因为producer_worker的速度更快,所以当达到最大值后,就会暂停写入,直到buffer中有空闲位置。
  1. asyncio下的虚假唤醒理由一:cond.wait() 方法的 “虚假唤醒”(spurious wakeup)是指其他协程调用 notifyAll() 方法的情况下,从等待状态(wait)意外地被唤醒,但是此时条件并没有满足。 

  2. asyncio下的虚假唤醒理由二:asyncio 的 API 设计借鉴了传统多线程同步原语的模式,包括对 “虚假唤醒” 的防御性处理。虽然asyncio condition的纯逻辑实现可以避免操作系统级的随机唤醒,但框架本身并不保证 wait() 只会被 notify() 唤醒。 例如,未来版本的 asyncio 可能因优化或逻辑调整引入新的唤醒场景。 

DEVELOP · ASYNCIO
develop python3 async coroutine