develop
异步协程中的Barrier
Table of Contents
1. asyncio.Barrier是什么?
在 Python 的 asyncio 模块中,asyncio.Barrier(异步屏障),核心作用是确保多个协程在共同的执行节点处“汇合”,只有当所有协程都到达该节点并准备就绪后,才能一起继续执行后续逻辑,避免因部分协程未完成前置操作导致的同步问题。在 asyncio 中,它基于以下核心机制实现:
- 参与计数机制:维护一个固定的“参与数”(parties),代表需要参与同步的协程总数,该数值在初始化时确定。
- 就绪跟踪机制:实时记录已到达屏障等待点的协程数量(就绪数),通过计数器动态更新。
- 事件驱动唤醒:内部集成 asyncio.Event(事件),当就绪数达到参与数时,触发事件以唤醒所有等待的协程,确保“同时继续”。
- 状态重置机制:支持屏障的重复使用,当所有协程通过屏障后,自动重置就绪数等状态,可用于多轮同步场景。
Barrier 可直观理解为协程的“集合点”或“起跑线”,只有所有协程都在集合点就位,才能一起开启下一段执行流程。
核心概念
asyncio.Barrier 主要解决 “多个协程需在执行流程的某个阶段点完成汇合,全部准备就绪后再同时进入下一阶段” 的场景。其内部包含以下关键组件:
- 参与数(parties):初始化时设定的、必须参与同步的协程总数,是屏障的核心参数(如设定为 5,则必须有 5 个协程到达屏障才能继续)。
- 就绪数(waiters):当前已到达屏障并进入等待状态的协程数量,随协程调用
wait()方法动态递增。 - 内部事件(Event):用于实现协程的阻塞与唤醒——当就绪数未达到参与数时,协程会阻塞在事件上;当就绪数满足条件时,事件被触发,所有阻塞的协程同时被唤醒。
- 与 asyncio.Semaphore 的核心区别:
- Semaphore 用于控制并发协程的数量上限(如最多 3 个协程同时执行),协程可随时获取/释放“许可”,进出灵活且无需固定数量。
- Barrier 强调“齐步走”逻辑,要求固定数量的协程全部到达屏障点,缺一不可,只有满足条件后所有协程才能一起继续执行。
主要方法与属性
- 初始化方法:
__init__(self, parties)- 功能:创建一个 Barrier 实例,指定需要同步的协程总数。
- 参数:
parties(正整数),必须大于 0,代表需要等待的协程总数。 - 示例:
barrier = asyncio.Barrier(5)—— 创建一个需 5 个协程参与同步的屏障。
- 核心等待方法:
wait()- 功能:协程方法(需通过
await调用),标记当前协程已到达屏障点并进入等待状态。 - 执行逻辑:
- 调用后,屏障的“就绪数”自动加 1。
- 若就绪数 < 参与数:当前协程阻塞,等待其他协程到达。
- 若就绪数 == 参与数(最后一个协程调用
wait()):所有等待的协程同时被唤醒,且屏障自动重置就绪数(为下一轮同步做准备)。
- 异常:若屏障被
reset()或处于损坏状态,调用wait()会引发BrokenBarrierError。
- 功能:协程方法(需通过
- 重置方法:
reset()- 功能:将屏障恢复到初始状态,强制中断当前所有等待的协程。
- 执行逻辑:
- 清除所有等待的协程,并唤醒它们(被唤醒的协程会触发
BrokenBarrierError)。 - 重置“就绪数”为 0,标记屏障为“未损坏”状态。
- 清除所有等待的协程,并唤醒它们(被唤醒的协程会触发
- 适用场景:当需要取消当前轮次的同步、或重新启动新轮次同步时使用。
- 核心属性
| 属性名 | 功能描述 |
|---|---|
parties | 只读属性,返回屏障初始化时设定的“参与数”(需同步的协程总数)。 |
n_waiting | 只读属性,返回当前正在屏障处等待的协程数量(就绪数的实时值)。 |
broken | 只读属性,返回布尔值,指示屏障是否处于“损坏”状态(如被 reset() 触发)。 |
2. 举个栗子来说明
import asyncio
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
async def worker(barrier, name, phase1_duration, phase2_duration):
# 第一阶段
logging.info(f"{name} 开始第一阶段")
await asyncio.sleep(phase1_duration) # 模拟工作
await barrier.wait() # 等待所有worker到达第一阶段
logging.info(f"{name} 结束第一阶段")
# 第二阶段
logging.info(f"{name} 开始第二阶段")
await asyncio.sleep(phase2_duration) # 模拟工作
await barrier.wait() # 等待所有worker到达第二阶段
logging.info(f"{name} 结束第二阶段")
async def main():
num_workers = 3
barrier = asyncio.Barrier(num_workers) # 3个参与者
await asyncio.gather(
worker(barrier, "Worker 1", 1, 2),
worker(barrier, "Worker 2", 2, 3),
worker(barrier, "Worker 3", 3, 4)
)
asyncio.run(main())
Log输出如下:
11:13:27 - MainThread - INFO - Worker 1 开始第一阶段
11:13:27 - MainThread - INFO - Worker 2 开始第一阶段
11:13:27 - MainThread - INFO - Worker 3 开始第一阶段
11:13:30 - MainThread - INFO - Worker 3 结束第一阶段
11:13:30 - MainThread - INFO - Worker 3 开始第二阶段
11:13:30 - MainThread - INFO - Worker 1 结束第一阶段
11:13:30 - MainThread - INFO - Worker 1 开始第二阶段
11:13:30 - MainThread - INFO - Worker 2 结束第一阶段
11:13:30 - MainThread - INFO - Worker 2 开始第二阶段
11:13:34 - MainThread - INFO - Worker 3 结束第二阶段
11:13:34 - MainThread - INFO - Worker 1 结束第二阶段
11:13:34 - MainThread - INFO - Worker 2 结束第二阶段
3. 结果说明
由结果可知:
- 主协程开始后,初始化了需要3个协程参与的屏障barrier,然后开始执行3个worker协程。
- 每个worker协程都调用worker函数,该函数执行两个阶段的工作,在每个阶段结束时调用
await barrier.wait()等待其他协程。 - 每个阶段的执行耗时遵循木桶效应,每个阶段的总耗时是当前阶段中耗时最长的worker来决定的。
- 在第一阶段,所有worker协程都开始执行,但只有当最后一个协程(Worker 3)完成第一阶段工作并调用
await barrier.wait()后,所有协程才同时被唤醒继续执行第二阶段。 - 在第二阶段也是一样,所有协程都必须完成第二阶段工作并调用
await barrier.wait()后才会同时结束。
- 在第一阶段,所有worker协程都开始执行,但只有当最后一个协程(Worker 3)完成第一阶段工作并调用
- 使用asyncio.gather()等待所有worker协程完成。
通过日志可以看出,所有协程在屏障点同步,确保了所有协程在同一时刻进入下一阶段,实现了协调执行的效果。
DEVELOP · ASYNCIO
develop python3 async coroutine
