develop
线程 VS 协程
1. threading模块
在 Python 中,threading 线程模块提供了与 asyncio 中 Event、Lock、Condition、Semaphore、Barrier 一一对应的同步原语,用于解决多线程间的竞争条件、同步执行等问题。二者核心功能一致(均为同步机制),只是底层依赖的调度模型不同 - threading 基于操作系统线程 - asyncio 基于单线程+调度实现的协程
以下对照asyncio中的同步原语,介绍threading模块中的同步原语。在理解threading模块中同步原语的使用方法时,更好的理解asyncio中的同步原语的功能。
2. threading 与 asyncio 同步原语对比
| 对比维度 | threading 同步原语 | asyncio 同步原语 |
|---|---|---|
| 底层依赖 | 操作系统内核线程调度(抢占式) | Python 事件循环(协作式) |
| 阻塞行为 | 线程阻塞时会释放 CPU(内核调度其他线程) | 协程阻塞时会让出控制权(事件循环调度其他协程) |
| 使用场景 | I/O 密集型任务,线程切换有较大开销 | I/O 密集型任务(协程的调度开销很小) |
| 上下文管理 | 均支持 with 语句 | 均支持 async with 语句(因协程需异步上下文) |
3.栗子对比
之前我们详细罗列过asyncio版本的各种同步原语,这里介绍threading模块中的同步原语。
1. 事件(Event)
Event的作用、方法都和asyncio中的Event一致,可以参考Event是什么?,不再赘述
Event的栗子
和asyncio中的Event的栗子一致,不过使用threading来实现
import logging
import threading
import time
# 配置日志格式
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
def worker(event: threading.Event):
logging.info("子线程等待异步事件...")
logging.info(f"event is set? {event.is_set()}")
event.wait() # 触发事件
logging.info(f"event is set? {event.is_set()}")
logging.info("子线程收到事件信号、处理任务并退出")
if __name__ == "__main__":
event = threading.Event()
t = threading.Thread(target=worker, args=(event,), daemon=True)
t.start()
# 主线程延迟后发送信号
logging.info("主线程开始...")
time.sleep(2)
event.set() # 等待事件触发
t.join()
logging.info("主线程结束...")
Log输出如下:
19:43:51 - MainThread - INFO - 主线程开始...
19:43:51 - Thread-6 (worker) - INFO - 子线程等待异步事件...
19:43:51 - Thread-6 (worker) - INFO - event is set? False
19:43:53 - Thread-6 (worker) - INFO - event is set? True
19:43:53 - Thread-6 (worker) - INFO - 子线程收到事件信号、处理任务并退出
19:43:53 - MainThread - INFO - 主线程结束...
对比asyncio.Event的Log,两个版本的功能一致,区别就是threading版本中logging信息中,线程名称为不同的Thread-6(worker),而不是MainThread,这是由于threading模块中新建的thread会自动设置线程名称为Thread-6,而不是在主线程中执行。
2. 锁(Lock)
Lock的作用、方法都和asyncio中的Lock一致,可以参考Lock是什么?,不再赘述
Lock的栗子
和asyncio中的Lock的栗子一致,不过使用threading来实现
import threading
import logging
import time
# 配置日志格式
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
# 全局计数器
counter = 0
def worker(lock: threading.Lock, name: str):
global counter
with lock: # 自动调用acquire(),退出时自动调用release()
counter += 1
logging.info(f"Worker {name} 获得锁,开始操作,计数器值: {counter}")
time.sleep(1) # 模拟耗时操作
logging.info(f"Worker {name} 释放锁")
def main():
global counter
lock = threading.Lock()
threads = []
# 创建10个线程
for i in range(10):
thread = threading.Thread(target=worker, args=(lock, f"{i+1}"))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
logging.info(f"所有任务完成,最终计数器值: {counter}")
if __name__ == "__main__":
main()
Log输出如下:
14:09:39 - Thread-6 (worker) - INFO - Worker 1 获得锁,开始操作,计数器值: 1
14:09:40 - Thread-6 (worker) - INFO - Worker 1 释放锁
14:09:40 - Thread-7 (worker) - INFO - Worker 2 获得锁,开始操作,计数器值: 2
14:09:41 - Thread-7 (worker) - INFO - Worker 2 释放锁
14:09:41 - Thread-8 (worker) - INFO - Worker 3 获得锁,开始操作,计数器值: 3
14:09:42 - Thread-8 (worker) - INFO - Worker 3 释放锁
14:09:42 - Thread-9 (worker) - INFO - Worker 4 获得锁,开始操作,计数器值: 4
14:09:43 - Thread-9 (worker) - INFO - Worker 4 释放锁
14:09:43 - Thread-10 (worker) - INFO - Worker 5 获得锁,开始操作,计数器值: 5
14:09:44 - Thread-10 (worker) - INFO - Worker 5 释放锁
14:09:44 - Thread-11 (worker) - INFO - Worker 6 获得锁,开始操作,计数器值: 6
14:09:45 - Thread-11 (worker) - INFO - Worker 6 释放锁
14:09:45 - Thread-12 (worker) - INFO - Worker 7 获得锁,开始操作,计数器值: 7
14:09:46 - Thread-12 (worker) - INFO - Worker 7 释放锁
14:09:46 - Thread-13 (worker) - INFO - Worker 8 获得锁,开始操作,计数器值: 8
14:09:47 - Thread-13 (worker) - INFO - Worker 8 释放锁
14:09:47 - Thread-14 (worker) - INFO - Worker 9 获得锁,开始操作,计数器值: 9
14:09:48 - Thread-14 (worker) - INFO - Worker 9 释放锁
14:09:48 - Thread-15 (worker) - INFO - Worker 10 获得锁,开始操作,计数器值: 10
14:09:49 - Thread-15 (worker) - INFO - Worker 10 释放锁
14:09:49 - MainThread - INFO - 所有任务完成,最终计数器值: 10
对比asyncio.Lock的Log,两个版本的功能一致,区别就是threading版本中,真正的工作内容(worker)是在其他的子线程中执行,而不是MainThread。因为Lock的存在,导致线程之间无法并发执行,而是退化成了串行执行。
3. 条件变量(Condition)
Condition的作用、方法都和asyncio中的Condition一致,可以参考Condition是什么?,不再赘述
Condition的栗子
和asyncio中的Condition的栗子一致,不过使用threading来实现
import threading
import time
import logging
# 配置日志格式
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
class SharedBuffer:
def __init__(self):
self.buffer = []
self.cond = threading.Condition() # 条件变量
self.max_size = 3
def produce(self, item):
with self.cond:
# 缓冲区满时等待
while len(self.buffer) == self.max_size:
self.cond.wait()
self.buffer.append(item)
logging.info(f"生产: {item}, 缓冲区: {self.buffer}")
self.cond.notify() # 通知消费者
def consume(self):
with self.cond:
# 缓冲区空时等待
while not self.buffer:
self.cond.wait()
item = self.buffer.pop(0)
logging.info(f"消费: {item}, 缓冲区: {self.buffer}")
self.cond.notify() # 通知生产者
return item
def producer(buffer):
for i in range(10):
buffer.produce(i)
time.sleep(1)
def consumer(buffer):
for _ in range(10):
buffer.consume()
time.sleep(2)
if __name__ == "__main__":
buffer = SharedBuffer()
t1 = threading.Thread(target=producer, args=(buffer,))
t2 = threading.Thread(target=consumer, args=(buffer,))
t1.start()
t2.start()
t1.join()
t2.join()
Log输出如下:
10:10:18 - Thread-6 (producer) - INFO - 生产: 0, 缓冲区: [0]
10:10:18 - Thread-7 (consumer) - INFO - 消费: 0, 缓冲区: []
10:10:19 - Thread-6 (producer) - INFO - 生产: 1, 缓冲区: [1]
10:10:20 - Thread-6 (producer) - INFO - 生产: 2, 缓冲区: [1, 2]
10:10:20 - Thread-7 (consumer) - INFO - 消费: 1, 缓冲区: [2]
10:10:21 - Thread-6 (producer) - INFO - 生产: 3, 缓冲区: [2, 3]
10:10:22 - Thread-7 (consumer) - INFO - 消费: 2, 缓冲区: [3]
10:10:22 - Thread-6 (producer) - INFO - 生产: 4, 缓冲区: [3, 4]
10:10:23 - Thread-6 (producer) - INFO - 生产: 5, 缓冲区: [3, 4, 5]
10:10:24 - Thread-7 (consumer) - INFO - 消费: 3, 缓冲区: [4, 5]
10:10:24 - Thread-6 (producer) - INFO - 生产: 6, 缓冲区: [4, 5, 6]
10:10:26 - Thread-7 (consumer) - INFO - 消费: 4, 缓冲区: [5, 6]
10:10:26 - Thread-6 (producer) - INFO - 生产: 7, 缓冲区: [5, 6, 7]
10:10:28 - Thread-7 (consumer) - INFO - 消费: 5, 缓冲区: [6, 7]
10:10:28 - Thread-6 (producer) - INFO - 生产: 8, 缓冲区: [6, 7, 8]
10:10:30 - Thread-7 (consumer) - INFO - 消费: 6, 缓冲区: [7, 8]
10:10:30 - Thread-6 (producer) - INFO - 生产: 9, 缓冲区: [7, 8, 9]
10:10:32 - Thread-7 (consumer) - INFO - 消费: 7, 缓冲区: [8, 9]
10:10:34 - Thread-7 (consumer) - INFO - 消费: 8, 缓冲区: [9]
10:10:36 - Thread-7 (consumer) - INFO - 消费: 9, 缓冲区: []
对比asyncio.Condition的Log,两个版本的功能一致,区别就是threading版本中,真正的工作内容(producer和consumer)是在其他的子线程中执行,而不是MainThread。
4. 信号量(Semaphore)
Semaphore的作用、方法都和asyncio中的Semaphore一致,可以参考Semaphore是什么?,不再赘述
Semaphore的栗子
和asyncio中的Semaphore的栗子一致,不过使用threading来实现
import threading
import logging
import time
# 配置日志格式
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
# 定义一个需要控制并发的函数
def limited_task(sem, task_id):
# 通过with自动获取和释放许可
with sem:
logging.info(f"任务 {task_id} 开始执行")
time.sleep(2) # 模拟耗时操作(如网络请求)
logging.info(f"任务 {task_id} 执行完成")
def main():
# 初始化信号量,最多允许3个并发线程
sem = threading.Semaphore(3)
# 创建10个任务(线程)
threads = []
for i in range(10):
thread = threading.Thread(target=limited_task, args=(sem, i))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
if __name__ == "__main__":
main()
Log输出如下:
10:17:48 - Thread-6 (limited_task) - INFO - 任务 0 开始执行
10:17:48 - Thread-7 (limited_task) - INFO - 任务 1 开始执行
10:17:48 - Thread-8 (limited_task) - INFO - 任务 2 开始执行
10:17:50 - Thread-6 (limited_task) - INFO - 任务 0 执行完成
10:17:50 - Thread-7 (limited_task) - INFO - 任务 1 执行完成
10:17:50 - Thread-9 (limited_task) - INFO - 任务 3 开始执行
10:17:50 - Thread-10 (limited_task) - INFO - 任务 4 开始执行
10:17:50 - Thread-8 (limited_task) - INFO - 任务 2 执行完成
10:17:50 - Thread-11 (limited_task) - INFO - 任务 5 开始执行
10:17:52 - Thread-9 (limited_task) - INFO - 任务 3 执行完成
10:17:52 - Thread-10 (limited_task) - INFO - 任务 4 执行完成
10:17:52 - Thread-12 (limited_task) - INFO - 任务 6 开始执行
10:17:52 - Thread-11 (limited_task) - INFO - 任务 5 执行完成
10:17:52 - Thread-13 (limited_task) - INFO - 任务 7 开始执行
10:17:52 - Thread-14 (limited_task) - INFO - 任务 8 开始执行
10:17:54 - Thread-12 (limited_task) - INFO - 任务 6 执行完成
10:17:54 - Thread-15 (limited_task) - INFO - 任务 9 开始执行
10:17:54 - Thread-13 (limited_task) - INFO - 任务 7 执行完成
10:17:54 - Thread-14 (limited_task) - INFO - 任务 8 执行完成
10:17:56 - Thread-15 (limited_task) - INFO - 任务 9 执行完成
对比asyncio.Semaphore的Log,两个版本的功能一致,区别就是threading版本中,真正的工作内容(limited_task)是在多个独立的子线程中执行,而不是MainThread。
5. 屏障(Barrier)
Barrier的作用、方法都和asyncio中的Barrier一致,可以参考Barrier是什么?,不再赘述
Barrier的栗子
和asyncio中的Barrier的栗子一致,不过使用threading来实现
import threading
import time
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
def worker(barrier, name, phase1_duration, phase2_duration):
# 第一阶段
logging.info(f"{name} 开始第一阶段")
time.sleep(phase1_duration) # 模拟工作
barrier.wait() # 等待所有worker到达第一阶段
logging.info(f"{name} 结束第一阶段")
# 第二阶段
logging.info(f"{name} 开始第二阶段")
time.sleep(phase2_duration) # 模拟工作
barrier.wait() # 等待所有worker到达第二阶段
logging.info(f"{name} 结束第二阶段")
def main():
num_workers = 3
barrier = threading.Barrier(num_workers) # 3个参与者
# 创建线程
threads = []
workers = [
("Worker 1", 1, 2),
("Worker 2", 2, 3),
("Worker 3", 3, 4)
]
for name, phase1_duration, phase2_duration in workers:
thread = threading.Thread(
target=worker,
args=(barrier, name, phase1_duration, phase2_duration)
)
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
if __name__ == "__main__":
main()
Log输出如下:
10:25:42 - Thread-6 (worker) - INFO - Worker 1 开始第一阶段
10:25:42 - Thread-7 (worker) - INFO - Worker 2 开始第一阶段
10:25:42 - Thread-8 (worker) - INFO - Worker 3 开始第一阶段
10:25:45 - Thread-8 (worker) - INFO - Worker 3 结束第一阶段
10:25:45 - Thread-6 (worker) - INFO - Worker 1 结束第一阶段
10:25:45 - Thread-7 (worker) - INFO - Worker 2 结束第一阶段
10:25:45 - Thread-8 (worker) - INFO - Worker 3 开始第二阶段
10:25:45 - Thread-6 (worker) - INFO - Worker 1 开始第二阶段
10:25:45 - Thread-7 (worker) - INFO - Worker 2 开始第二阶段
10:25:49 - Thread-8 (worker) - INFO - Worker 3 结束第二阶段
10:25:49 - Thread-6 (worker) - INFO - Worker 1 结束第二阶段
10:25:49 - Thread-7 (worker) - INFO - Worker 2 结束第二阶段
对比asyncio.Barrier的Log,两个版本的功能一致,区别就是threading版本中,真正的工作内容(worker)是在多个独立的子线程中执行,而不是MainThread。
4. 总结
通过以上所有的栗子可知,asyncio中的同步原语均是运行在MainThread中的,而threading中的同步原语运行在多个子线程中。
这也和asyncio的实现原理一致,它不创建新的线程,而是在MainThread中纯代码实现了一个协程的逻辑,所以不存在线程的创建、销毁的开销。 而threading创建新的线程,看起来也可以实现并行的目的,操作系统底层是多个线程,但是因为CPython中存在GIL的缘故,导致真正的执行也是先后顺序执行,而不是真正的并行,没法使用多个CPU来实现并行。 当然asyncio本身就是在一个线程中执行,所以也不是真正的并行,没法使用多个CPU来实现并行。
Python中如果真的要实现多CPU的并发,就需要使用multiprocessing模块。
DEVELOP · ASYNCIO
develop python3 async coroutine
