Personal Blog

develop

线程 VS 协程

在之前文章中,已经介绍了异步协程的各种同步原语的使用方法。所以这里介绍python3中传统线程的使用方法,方便进行对比和加深印象。

1. threading模块

在 Python 中,threading 线程模块提供了与 asyncio 中 Event、Lock、Condition、Semaphore、Barrier 一一对应的同步原语,用于解决多线程间的竞争条件、同步执行等问题。二者核心功能一致(均为同步机制),只是底层依赖的调度模型不同 - threading 基于操作系统线程 - asyncio 基于单线程+调度实现的协程

以下对照asyncio中的同步原语,介绍threading模块中的同步原语。在理解threading模块中同步原语的使用方法时,更好的理解asyncio中的同步原语的功能。

2. threading 与 asyncio 同步原语对比

对比维度threading 同步原语asyncio 同步原语
底层依赖操作系统内核线程调度(抢占式)Python 事件循环(协作式)
阻塞行为线程阻塞时会释放 CPU(内核调度其他线程)协程阻塞时会让出控制权(事件循环调度其他协程)
使用场景I/O 密集型任务,线程切换有较大开销I/O 密集型任务(协程的调度开销很小)
上下文管理均支持 with 语句均支持 async with 语句(因协程需异步上下文)

3.栗子对比

之前我们详细罗列过asyncio版本的各种同步原语,这里介绍threading模块中的同步原语。

1. 事件(Event)

Event的作用、方法都和asyncio中的Event一致,可以参考Event是什么?,不再赘述

Event的栗子

asyncio中的Event的栗子一致,不过使用threading来实现

import logging
import threading
import time

# 配置日志格式
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)

def worker(event: threading.Event):
    logging.info("子线程等待异步事件...")
    logging.info(f"event is set? {event.is_set()}")
    event.wait()  # 触发事件
    logging.info(f"event is set? {event.is_set()}")
    logging.info("子线程收到事件信号、处理任务并退出")

if __name__ == "__main__":
    event = threading.Event()
    t = threading.Thread(target=worker, args=(event,), daemon=True)
    t.start()

    # 主线程延迟后发送信号
    logging.info("主线程开始...")
    
    time.sleep(2)
    event.set()  # 等待事件触发
    t.join()
    logging.info("主线程结束...")

Log输出如下:

19:43:51 - MainThread - INFO - 主线程开始...
19:43:51 - Thread-6 (worker) - INFO - 子线程等待异步事件...
19:43:51 - Thread-6 (worker) - INFO - event is set? False
19:43:53 - Thread-6 (worker) - INFO - event is set? True
19:43:53 - Thread-6 (worker) - INFO - 子线程收到事件信号、处理任务并退出
19:43:53 - MainThread - INFO - 主线程结束...

对比asyncio.Event的Log,两个版本的功能一致,区别就是threading版本中logging信息中,线程名称为不同的Thread-6(worker),而不是MainThread,这是由于threading模块中新建的thread会自动设置线程名称为Thread-6,而不是在主线程中执行。

2. 锁(Lock)

Lock的作用、方法都和asyncio中的Lock一致,可以参考Lock是什么?,不再赘述

Lock的栗子

asyncio中的Lock的栗子一致,不过使用threading来实现

import threading
import logging
import time

# 配置日志格式
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)

# 全局计数器
counter = 0


def worker(lock: threading.Lock, name: str):
    global counter
    with lock:  # 自动调用acquire(),退出时自动调用release()
        counter += 1
        logging.info(f"Worker {name} 获得锁,开始操作,计数器值: {counter}")
        time.sleep(1)  # 模拟耗时操作
        logging.info(f"Worker {name} 释放锁")


def main():
    global counter
    lock = threading.Lock()
    threads = []
    
    # 创建10个线程
    for i in range(10):
        thread = threading.Thread(target=worker, args=(lock, f"{i+1}"))
        threads.append(thread)
        thread.start()
    
    # 等待所有线程完成
    for thread in threads:
        thread.join()
    
    logging.info(f"所有任务完成,最终计数器值: {counter}")


if __name__ == "__main__":
    main()

Log输出如下:

14:09:39 - Thread-6 (worker) - INFO - Worker 1 获得锁,开始操作,计数器值: 1
14:09:40 - Thread-6 (worker) - INFO - Worker 1 释放锁
14:09:40 - Thread-7 (worker) - INFO - Worker 2 获得锁,开始操作,计数器值: 2
14:09:41 - Thread-7 (worker) - INFO - Worker 2 释放锁
14:09:41 - Thread-8 (worker) - INFO - Worker 3 获得锁,开始操作,计数器值: 3
14:09:42 - Thread-8 (worker) - INFO - Worker 3 释放锁
14:09:42 - Thread-9 (worker) - INFO - Worker 4 获得锁,开始操作,计数器值: 4
14:09:43 - Thread-9 (worker) - INFO - Worker 4 释放锁
14:09:43 - Thread-10 (worker) - INFO - Worker 5 获得锁,开始操作,计数器值: 5
14:09:44 - Thread-10 (worker) - INFO - Worker 5 释放锁
14:09:44 - Thread-11 (worker) - INFO - Worker 6 获得锁,开始操作,计数器值: 6
14:09:45 - Thread-11 (worker) - INFO - Worker 6 释放锁
14:09:45 - Thread-12 (worker) - INFO - Worker 7 获得锁,开始操作,计数器值: 7
14:09:46 - Thread-12 (worker) - INFO - Worker 7 释放锁
14:09:46 - Thread-13 (worker) - INFO - Worker 8 获得锁,开始操作,计数器值: 8
14:09:47 - Thread-13 (worker) - INFO - Worker 8 释放锁
14:09:47 - Thread-14 (worker) - INFO - Worker 9 获得锁,开始操作,计数器值: 9
14:09:48 - Thread-14 (worker) - INFO - Worker 9 释放锁
14:09:48 - Thread-15 (worker) - INFO - Worker 10 获得锁,开始操作,计数器值: 10
14:09:49 - Thread-15 (worker) - INFO - Worker 10 释放锁
14:09:49 - MainThread - INFO - 所有任务完成,最终计数器值: 10

对比asyncio.Lock的Log,两个版本的功能一致,区别就是threading版本中,真正的工作内容(worker)是在其他的子线程中执行,而不是MainThread。因为Lock的存在,导致线程之间无法并发执行,而是退化成了串行执行。

3. 条件变量(Condition)

Condition的作用、方法都和asyncio中的Condition一致,可以参考Condition是什么?,不再赘述

Condition的栗子

asyncio中的Condition的栗子一致,不过使用threading来实现

import threading
import time
import logging

# 配置日志格式
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)

class SharedBuffer:
    def __init__(self):
        self.buffer = []
        self.cond = threading.Condition()  # 条件变量
        self.max_size = 3

    def produce(self, item):
        with self.cond:
            # 缓冲区满时等待
            while len(self.buffer) == self.max_size:
                self.cond.wait()
            self.buffer.append(item)
            logging.info(f"生产: {item}, 缓冲区: {self.buffer}")
            self.cond.notify()  # 通知消费者

    def consume(self):
        with self.cond:
            # 缓冲区空时等待
            while not self.buffer:
                self.cond.wait()
            item = self.buffer.pop(0)
            logging.info(f"消费: {item}, 缓冲区: {self.buffer}")
            self.cond.notify()  # 通知生产者
            return item

def producer(buffer):
    for i in range(10):
        buffer.produce(i)
        time.sleep(1)

def consumer(buffer):
    for _ in range(10):
        buffer.consume()
        time.sleep(2)

if __name__ == "__main__":
    buffer = SharedBuffer()
    t1 = threading.Thread(target=producer, args=(buffer,))
    t2 = threading.Thread(target=consumer, args=(buffer,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

Log输出如下:

10:10:18 - Thread-6 (producer) - INFO - 生产: 0, 缓冲区: [0]
10:10:18 - Thread-7 (consumer) - INFO - 消费: 0, 缓冲区: []
10:10:19 - Thread-6 (producer) - INFO - 生产: 1, 缓冲区: [1]
10:10:20 - Thread-6 (producer) - INFO - 生产: 2, 缓冲区: [1, 2]
10:10:20 - Thread-7 (consumer) - INFO - 消费: 1, 缓冲区: [2]
10:10:21 - Thread-6 (producer) - INFO - 生产: 3, 缓冲区: [2, 3]
10:10:22 - Thread-7 (consumer) - INFO - 消费: 2, 缓冲区: [3]
10:10:22 - Thread-6 (producer) - INFO - 生产: 4, 缓冲区: [3, 4]
10:10:23 - Thread-6 (producer) - INFO - 生产: 5, 缓冲区: [3, 4, 5]
10:10:24 - Thread-7 (consumer) - INFO - 消费: 3, 缓冲区: [4, 5]
10:10:24 - Thread-6 (producer) - INFO - 生产: 6, 缓冲区: [4, 5, 6]
10:10:26 - Thread-7 (consumer) - INFO - 消费: 4, 缓冲区: [5, 6]
10:10:26 - Thread-6 (producer) - INFO - 生产: 7, 缓冲区: [5, 6, 7]
10:10:28 - Thread-7 (consumer) - INFO - 消费: 5, 缓冲区: [6, 7]
10:10:28 - Thread-6 (producer) - INFO - 生产: 8, 缓冲区: [6, 7, 8]
10:10:30 - Thread-7 (consumer) - INFO - 消费: 6, 缓冲区: [7, 8]
10:10:30 - Thread-6 (producer) - INFO - 生产: 9, 缓冲区: [7, 8, 9]
10:10:32 - Thread-7 (consumer) - INFO - 消费: 7, 缓冲区: [8, 9]
10:10:34 - Thread-7 (consumer) - INFO - 消费: 8, 缓冲区: [9]
10:10:36 - Thread-7 (consumer) - INFO - 消费: 9, 缓冲区: []

对比asyncio.Condition的Log,两个版本的功能一致,区别就是threading版本中,真正的工作内容(producer和consumer)是在其他的子线程中执行,而不是MainThread。

4. 信号量(Semaphore)

Semaphore的作用、方法都和asyncio中的Semaphore一致,可以参考Semaphore是什么?,不再赘述

Semaphore的栗子

asyncio中的Semaphore的栗子一致,不过使用threading来实现

import threading
import logging
import time

# 配置日志格式
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)


# 定义一个需要控制并发的函数
def limited_task(sem, task_id):
    # 通过with自动获取和释放许可
    with sem:
        logging.info(f"任务 {task_id} 开始执行")
        time.sleep(2)  # 模拟耗时操作(如网络请求)
        logging.info(f"任务 {task_id} 执行完成")


def main():
    # 初始化信号量,最多允许3个并发线程
    sem = threading.Semaphore(3)
    
    # 创建10个任务(线程)
    threads = []
    for i in range(10):
        thread = threading.Thread(target=limited_task, args=(sem, i))
        threads.append(thread)
        thread.start()
    
    # 等待所有线程完成
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()

Log输出如下:

10:17:48 - Thread-6 (limited_task) - INFO - 任务 0 开始执行
10:17:48 - Thread-7 (limited_task) - INFO - 任务 1 开始执行
10:17:48 - Thread-8 (limited_task) - INFO - 任务 2 开始执行
10:17:50 - Thread-6 (limited_task) - INFO - 任务 0 执行完成
10:17:50 - Thread-7 (limited_task) - INFO - 任务 1 执行完成
10:17:50 - Thread-9 (limited_task) - INFO - 任务 3 开始执行
10:17:50 - Thread-10 (limited_task) - INFO - 任务 4 开始执行
10:17:50 - Thread-8 (limited_task) - INFO - 任务 2 执行完成
10:17:50 - Thread-11 (limited_task) - INFO - 任务 5 开始执行
10:17:52 - Thread-9 (limited_task) - INFO - 任务 3 执行完成
10:17:52 - Thread-10 (limited_task) - INFO - 任务 4 执行完成
10:17:52 - Thread-12 (limited_task) - INFO - 任务 6 开始执行
10:17:52 - Thread-11 (limited_task) - INFO - 任务 5 执行完成
10:17:52 - Thread-13 (limited_task) - INFO - 任务 7 开始执行
10:17:52 - Thread-14 (limited_task) - INFO - 任务 8 开始执行
10:17:54 - Thread-12 (limited_task) - INFO - 任务 6 执行完成
10:17:54 - Thread-15 (limited_task) - INFO - 任务 9 开始执行
10:17:54 - Thread-13 (limited_task) - INFO - 任务 7 执行完成
10:17:54 - Thread-14 (limited_task) - INFO - 任务 8 执行完成
10:17:56 - Thread-15 (limited_task) - INFO - 任务 9 执行完成

对比asyncio.Semaphore的Log,两个版本的功能一致,区别就是threading版本中,真正的工作内容(limited_task)是在多个独立的子线程中执行,而不是MainThread。

5. 屏障(Barrier)

Barrier的作用、方法都和asyncio中的Barrier一致,可以参考Barrier是什么?,不再赘述

Barrier的栗子

asyncio中的Barrier的栗子一致,不过使用threading来实现

import threading
import time
import logging

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)

def worker(barrier, name, phase1_duration, phase2_duration):
    # 第一阶段
    logging.info(f"{name} 开始第一阶段")
    time.sleep(phase1_duration)  # 模拟工作
    barrier.wait()  # 等待所有worker到达第一阶段
    logging.info(f"{name} 结束第一阶段")
    
    # 第二阶段
    logging.info(f"{name} 开始第二阶段")
    time.sleep(phase2_duration)  # 模拟工作
    barrier.wait()  # 等待所有worker到达第二阶段
    logging.info(f"{name} 结束第二阶段")

def main():
    num_workers = 3
    barrier = threading.Barrier(num_workers)  # 3个参与者
    
    # 创建线程
    threads = []
    workers = [
        ("Worker 1", 1, 2),
        ("Worker 2", 2, 3),
        ("Worker 3", 3, 4)
    ]
    
    for name, phase1_duration, phase2_duration in workers:
        thread = threading.Thread(
            target=worker,
            args=(barrier, name, phase1_duration, phase2_duration)
        )
        threads.append(thread)
        thread.start()
    
    # 等待所有线程完成
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main()

Log输出如下:

10:25:42 - Thread-6 (worker) - INFO - Worker 1 开始第一阶段
10:25:42 - Thread-7 (worker) - INFO - Worker 2 开始第一阶段
10:25:42 - Thread-8 (worker) - INFO - Worker 3 开始第一阶段
10:25:45 - Thread-8 (worker) - INFO - Worker 3 结束第一阶段
10:25:45 - Thread-6 (worker) - INFO - Worker 1 结束第一阶段
10:25:45 - Thread-7 (worker) - INFO - Worker 2 结束第一阶段
10:25:45 - Thread-8 (worker) - INFO - Worker 3 开始第二阶段
10:25:45 - Thread-6 (worker) - INFO - Worker 1 开始第二阶段
10:25:45 - Thread-7 (worker) - INFO - Worker 2 开始第二阶段
10:25:49 - Thread-8 (worker) - INFO - Worker 3 结束第二阶段
10:25:49 - Thread-6 (worker) - INFO - Worker 1 结束第二阶段
10:25:49 - Thread-7 (worker) - INFO - Worker 2 结束第二阶段

对比asyncio.Barrier的Log,两个版本的功能一致,区别就是threading版本中,真正的工作内容(worker)是在多个独立的子线程中执行,而不是MainThread。

4. 总结

通过以上所有的栗子可知,asyncio中的同步原语均是运行在MainThread中的,而threading中的同步原语运行在多个子线程中。

这也和asyncio的实现原理一致,它不创建新的线程,而是在MainThread中纯代码实现了一个协程的逻辑,所以不存在线程的创建、销毁的开销。 而threading创建新的线程,看起来也可以实现并行的目的,操作系统底层是多个线程,但是因为CPython中存在GIL的缘故,导致真正的执行也是先后顺序执行,而不是真正的并行,没法使用多个CPU来实现并行。 当然asyncio本身就是在一个线程中执行,所以也不是真正的并行,没法使用多个CPU来实现并行。

Python中如果真的要实现多CPU的并发,就需要使用multiprocessing模块。

DEVELOP · ASYNCIO
develop python3 async coroutine