Personal Blog

develop

异步协程中的Barrier

介绍python3中异步协程中的Barrier的使用方法

1. asyncio.Barrier是什么?

在 Python 的 asyncio 模块中,asyncio.Barrier(异步屏障),核心作用是确保多个协程在共同的执行节点处“汇合”,只有当所有协程都到达该节点并准备就绪后,才能一起继续执行后续逻辑,避免因部分协程未完成前置操作导致的同步问题。在 asyncio 中,它基于以下核心机制实现:

  1. 参与计数机制:维护一个固定的“参与数”(parties),代表需要参与同步的协程总数,该数值在初始化时确定。
  2. 就绪跟踪机制:实时记录已到达屏障等待点的协程数量(就绪数),通过计数器动态更新。
  3. 事件驱动唤醒:内部集成 asyncio.Event(事件),当就绪数达到参与数时,触发事件以唤醒所有等待的协程,确保“同时继续”。
  4. 状态重置机制:支持屏障的重复使用,当所有协程通过屏障后,自动重置就绪数等状态,可用于多轮同步场景。

Barrier 可直观理解为协程的“集合点”或“起跑线”,只有所有协程都在集合点就位,才能一起开启下一段执行流程。

核心概念

asyncio.Barrier 主要解决 “多个协程需在执行流程的某个阶段点完成汇合,全部准备就绪后再同时进入下一阶段” 的场景。其内部包含以下关键组件:

  • 参与数(parties):初始化时设定的、必须参与同步的协程总数,是屏障的核心参数(如设定为 5,则必须有 5 个协程到达屏障才能继续)。
  • 就绪数(waiters):当前已到达屏障并进入等待状态的协程数量,随协程调用 wait() 方法动态递增。
  • 内部事件(Event):用于实现协程的阻塞与唤醒——当就绪数未达到参与数时,协程会阻塞在事件上;当就绪数满足条件时,事件被触发,所有阻塞的协程同时被唤醒。
  • 与 asyncio.Semaphore 的核心区别
    • Semaphore 用于控制并发协程的数量上限(如最多 3 个协程同时执行),协程可随时获取/释放“许可”,进出灵活且无需固定数量。
    • Barrier 强调“齐步走”逻辑,要求固定数量的协程全部到达屏障点,缺一不可,只有满足条件后所有协程才能一起继续执行。

主要方法与属性

  1. 初始化方法:__init__(self, parties)
    • 功能:创建一个 Barrier 实例,指定需要同步的协程总数。
    • 参数parties(正整数),必须大于 0,代表需要等待的协程总数。
    • 示例barrier = asyncio.Barrier(5) —— 创建一个需 5 个协程参与同步的屏障。
  2. 核心等待方法:wait()
    • 功能:协程方法(需通过 await 调用),标记当前协程已到达屏障点并进入等待状态。
    • 执行逻辑
      1. 调用后,屏障的“就绪数”自动加 1。
      2. 若就绪数 < 参与数:当前协程阻塞,等待其他协程到达。
      3. 若就绪数 == 参与数(最后一个协程调用 wait()):所有等待的协程同时被唤醒,且屏障自动重置就绪数(为下一轮同步做准备)。
    • 异常:若屏障被 reset() 或处于损坏状态,调用 wait() 会引发 BrokenBarrierError
  3. 重置方法:reset()
    • 功能:将屏障恢复到初始状态,强制中断当前所有等待的协程。
    • 执行逻辑
      1. 清除所有等待的协程,并唤醒它们(被唤醒的协程会触发 BrokenBarrierError)。
      2. 重置“就绪数”为 0,标记屏障为“未损坏”状态。
    • 适用场景:当需要取消当前轮次的同步、或重新启动新轮次同步时使用。
  4. 核心属性
属性名功能描述
parties只读属性,返回屏障初始化时设定的“参与数”(需同步的协程总数)。
n_waiting只读属性,返回当前正在屏障处等待的协程数量(就绪数的实时值)。
broken只读属性,返回布尔值,指示屏障是否处于“损坏”状态(如被 reset() 触发)。

2. 举个栗子来说明

import asyncio
import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)

async def worker(barrier, name, phase1_duration, phase2_duration):
    # 第一阶段
    logging.info(f"{name} 开始第一阶段")
    await asyncio.sleep(phase1_duration)  # 模拟工作
    await barrier.wait()  # 等待所有worker到达第一阶段
    logging.info(f"{name} 结束第一阶段")
    
    # 第二阶段
    logging.info(f"{name} 开始第二阶段")
    await asyncio.sleep(phase2_duration)  # 模拟工作
    await barrier.wait()  # 等待所有worker到达第二阶段
    logging.info(f"{name} 结束第二阶段")
    
async def main():
    num_workers = 3
    barrier = asyncio.Barrier(num_workers)  # 3个参与者
    await asyncio.gather(
        worker(barrier, "Worker 1", 1, 2),
        worker(barrier, "Worker 2", 2, 3),
        worker(barrier, "Worker 3", 3, 4)
    )

asyncio.run(main())

Log输出如下:

11:13:27 - MainThread - INFO - Worker 1 开始第一阶段
11:13:27 - MainThread - INFO - Worker 2 开始第一阶段
11:13:27 - MainThread - INFO - Worker 3 开始第一阶段
11:13:30 - MainThread - INFO - Worker 3 结束第一阶段
11:13:30 - MainThread - INFO - Worker 3 开始第二阶段
11:13:30 - MainThread - INFO - Worker 1 结束第一阶段
11:13:30 - MainThread - INFO - Worker 1 开始第二阶段
11:13:30 - MainThread - INFO - Worker 2 结束第一阶段
11:13:30 - MainThread - INFO - Worker 2 开始第二阶段
11:13:34 - MainThread - INFO - Worker 3 结束第二阶段
11:13:34 - MainThread - INFO - Worker 1 结束第二阶段
11:13:34 - MainThread - INFO - Worker 2 结束第二阶段

3. 结果说明

由结果可知:

  1. 主协程开始后,初始化了需要3个协程参与的屏障barrier,然后开始执行3个worker协程。
  2. 每个worker协程都调用worker函数,该函数执行两个阶段的工作,在每个阶段结束时调用await barrier.wait()等待其他协程。
  3. 每个阶段的执行耗时遵循木桶效应,每个阶段的总耗时是当前阶段中耗时最长的worker来决定的。
    1. 在第一阶段,所有worker协程都开始执行,但只有当最后一个协程(Worker 3)完成第一阶段工作并调用await barrier.wait()后,所有协程才同时被唤醒继续执行第二阶段。
    2. 在第二阶段也是一样,所有协程都必须完成第二阶段工作并调用await barrier.wait()后才会同时结束。
  4. 使用asyncio.gather()等待所有worker协程完成。

通过日志可以看出,所有协程在屏障点同步,确保了所有协程在同一时刻进入下一阶段,实现了协调执行的效果。

DEVELOP · ASYNCIO
develop python3 async coroutine