自己做网站哪种好做,食品网站建设书,24小时24元网站建设,网站建设 主要学是么一、A Bit of Jargon
1、关键术语解析
1.1 并发 (Concurrency)
定义: 并发是指同时处理多个待处理任务的能力#xff0c;这些任务可以依次或并行#xff08;如果可能#xff09;进行#xff0c;最终每个任务都会成功或失败。
理解:
单核 CPU: 即使是单核 CPU 也可以实…一、A Bit of Jargon
1、关键术语解析
1.1 并发 (Concurrency)
定义: 并发是指同时处理多个待处理任务的能力这些任务可以依次或并行如果可能进行最终每个任务都会成功或失败。
理解:
单核 CPU: 即使是单核 CPU 也可以实现并发因为操作系统调度器会交错执行多个任务给人一种同时进行的错觉。多任务处理: 并发也常被称为多任务处理。
举例: 想象你在厨房里同时做三道菜炒菜、煮汤和烤面包。你不会同时做所有事情而是会交替进行例如先炒一会儿菜然后去搅拌汤再去查看面包的状态。这种交替进行的方式就是并发。
1.2 并行 (Parallelism)
定义: 并行是指同时执行多个计算任务的能力。
理解:
硬件要求: 并行需要多核 CPU、多个 CPU、GPU 或集群中的多台计算机。与并发的区别: 并发强调的是任务处理的交替进行而并行则是真正的同时进行。
举例: 回到厨房的例子如果你有多个炉灶和帮手你们可以同时进行炒菜、煮汤和烤面包这就是并行。
1.3 执行单元 (Execution Unit)
定义: 执行单元是执行代码的并发对象每个执行单元都有独立的执行状态和调用栈。
Python 原生支持的执行单元:
进程 (Process): 独立的内存空间隔离性强通信成本高。线程 (Thread): 共享内存空间通信方便但需要处理数据竞争问题。协程 (Coroutine): 在单个线程内运行由事件循环调度资源消耗低但需要协作式调度。
1.4 进程 (Process)
定义: 进程是正在运行的计算机程序的实例使用内存和 CPU 时间片。
特点:
隔离性: 每个进程拥有独立的内存空间彼此隔离。通信方式: 进程间通过管道、套接字或内存映射文件进行通信这些方式只能传输原始字节因此 Python 对象需要序列化。资源消耗: 进程比线程消耗更多资源。抢占式多任务: 操作系统调度器会定期抢占暂停正在运行的进程让其他进程运行。这意味着一个冻结的进程不会冻结整个系统理论上。
举例: 在 Python 中可以使用 multiprocessing 或 concurrent.futures 库启动额外的 Python 进程。
1.5 线程 (Thread)
定义: 线程是进程内的执行单元。
特点:
共享内存: 同一进程内的线程共享相同的内存空间这使得线程间数据共享变得容易但也可能导致数据损坏当多个线程同时更新同一个对象时。资源消耗: 线程比进程消耗更少的资源。抢占式多任务: 线程也在操作系统调度器的监督下进行抢占式多任务处理。
举例: 在 Python 中可以使用 threading 或 concurrent.futures 库创建额外的线程。
1.6 协程 (Coroutine)
定义: 协程是一种可以暂停自身执行并在之后恢复的函数。
Python 中的协程:
经典协程: 由生成器函数构建。原生协程: 使用 async def 定义。
特点:
事件循环: Python 协程通常在单个线程内运行由事件循环调度。协作式多任务: 每个协程必须显式地使用 yield 或 await 关键字让出控制权以便其他协程可以并发但不是并行执行。阻塞问题: 协程中的任何阻塞代码都会阻塞事件循环和所有其他协程这与进程和线程支持的抢占式多任务形成对比。资源消耗: 每个协程消耗的资源比线程或进程少。
举例: 使用 asyncio, Curio 或 Trio 等异步编程框架可以提供事件循环和用于非阻塞、基于协程的 I/O 的支持库。
1.7 队列 (Queue)
定义: 队列是一种数据结构允许我们按 FIFO先进先出顺序放入和取出项目。
用途: 队列允许不同的执行单元交换应用程序数据和控制消息例如错误代码和终止信号。
Python 中的队列实现:
queue 模块: 提供支持线程的队列类。multiprocessing 和 asyncio 包: 实现自己的队列类。其他类型的队列: LifoQueue后进先出和 PriorityQueue优先级队列。
1.8 锁 (Lock)
定义: 锁是执行单元可以用来同步其操作并避免数据损坏的对象。
工作原理: 当更新共享数据结构时运行中的代码应持有相关的锁。这向程序的其他部分发出信号在锁释放之前等待访问相同的数据结构。
最简单类型的锁: 互斥锁mutex用于互斥。
1.9 竞争 (Contention)
定义: 竞争是指对有限资源的争夺。
资源竞争: 当多个执行单元尝试访问共享资源例如锁或存储时就会发生资源竞争。CPU 竞争: 当计算密集型进程或线程必须等待操作系统调度器分配 CPU 时间片时就会发生 CPU 竞争。
2、Python 中的并发支持
2.1 进程和线程与 GIL
GIL (Global Interpreter Lock):
定义: GIL 是控制对对象引用计数和其他内部解释器状态的锁。作用: 在任何时候只有一个 Python 线程可以持有 GIL这意味着即使有多个 CPU 核心也只有一个线程可以执行 Python 代码。影响: 多线程性能: GIL 限制了多线程在 CPU 密集型任务上的性能因为线程之间会竞争 GIL导致上下文切换开销。I/O 密集型任务: 对于 I/O 密集型任务GIL 的影响较小因为 I/O 操作会释放 GIL线程可以在等待 I/O 时让出 CPU。CPU 密集型任务: 对于 CPU 密集型任务顺序的单线程代码通常更简单、更快。
GIL 的释放:
内置函数和 C 扩展: 任何在 Python/C API 级别集成的内置函数或 C 扩展都可以在执行耗时任务时释放 GIL。标准库函数: 所有执行系统调用的标准库函数都会释放 GIL包括所有执行磁盘 I/O、网络 I/O 和 time.sleep() 的函数。NumPy/SciPy: NumPy/SciPy 库中的许多 CPU 密集型函数以及 zlib 和 bz2 模块中的压缩/解压缩函数也会释放 GIL。非 Python 线程: 在 Python/C API 级别集成的扩展也可以启动其他不受 GIL 影响的非 Python 线程但这些线程通常不能更改 Python 对象。
GIL 对网络编程的影响: 由于 I/O 操作会释放 GIL并且网络读写总是意味着高延迟与内存读写相比因此 GIL 对 Python 线程的网络编程影响相对较小。因此David Beazley 说“Python 线程擅长什么都不做。”
多核 CPU 上的 CPU 密集型任务:
解决方案: 要在多核 CPU 上运行 CPU 密集型 Python 代码必须使用多个 Python 进程。
总结:
多线程适用场景: 如果您希望应用程序更好地利用多核机器的计算资源建议使用 multiprocessing 或 concurrent.futures.ProcessPoolExecutor。多线程适用场景: 如果您希望同时运行多个 I/O 密集型任务threading 仍然是一个合适的模型。
2.2 协程与 GIL
协程与 GIL 的关系:
默认情况: 默认情况下协程在同一个 Python 线程中共享事件循环因此 GIL 不会影响它们。多线程协程: 在异步程序中使用多个线程是可能的但最佳实践是让一个线程运行事件循环和所有协程而额外的线程执行特定任务。
3、实际应用中的注意事项 选择合适的并发模型: CPU 密集型任务: 使用多进程 (multiprocessing 或 ProcessPoolExecutor)。I/O 密集型任务: 使用多线程 (threading 或 ThreadPoolExecutor) 或协程 (asyncio)。混合任务: 可以结合使用多进程和多线程或协程。 避免 GIL 的限制: 使用多进程: 对于 CPU 密集型任务避免使用多线程转而使用多进程。使用 C 扩展: 利用 C 扩展释放 GIL可以提高多线程性能。 线程安全: 锁的使用: 在多线程环境中更新共享数据时必须使用锁来防止数据竞争。线程安全的数据结构: 使用线程安全的数据结构例如 queue.Queue可以简化并发编程。 协程的阻塞问题: 避免阻塞代码: 在协程中避免使用阻塞代码例如 time.sleep()可以使用 await asyncio.sleep() 代替。异步编程实践: 遵循异步编程的最佳实践例如使用异步库和异步 I/O 操作。
二、A Concurrent Hello World
1、Spinner with Threads
1. 概述
本节将深入探讨如何使用 Python 的 threading 模块实现一个简单的终端动画 Spinner。该程序在执行耗时操作时通过在终端显示旋转动画来提示用户程序正在运行而不是卡死。
2. 主要概念
2.1 线程Thread
定义: 线程是操作系统能够进行运算调度的最小单位是进程中的一个执行流。作用: 在本例中我们使用线程来实现耗时操作与动画的并发执行从而避免阻塞主线程。
2.2 threading.Event 类
定义: Event 是线程间最简单的信号机制用于在线程之间传递事件信号。主要方法: set(): 将内部标志设置为 True通知所有等待该事件的线程。wait(timeoutNone): 阻塞调用线程直到内部标志被设置为 True 或超时如果指定了 timeout。 工作原理: 初始时内部标志为 False。当某个线程调用 set() 时标志变为 True所有等待该事件的线程将被唤醒。如果调用 wait() 时指定了 timeout则线程会在超时后继续执行即使标志仍为 False。
3. 代码详解
3.1 spin 函数
import itertools
import time
from threading import Thread, Eventdef spin(msg: str, done: Event) - None:for char in itertools.cycle(r\|/-):status f\r{char} {msg}print(status, end, flushTrue)if done.wait(0.1):breakblanks * len(status)print(f\r{blanks}\r, end)功能: 在终端显示旋转动画直到接收到停止信号。关键点: itertools.cycle(r\|/-): 创建一个无限循环的迭代器依次生成字符 \, |, /, -, \, ...实现旋转效果。\r: 回车符将光标移动到行首实现覆盖输出达到动画效果。flushTrue: 强制刷新输出缓冲区确保动画及时显示。done.wait(0.1): 每隔 0.1 秒检查一次停止信号。如果接收到信号则退出循环。清除动画: 通过打印与动画相同长度的空格再移动光标到行首实现清除动画的效果。
3.2 slow 函数
def slow() - int:time.sleep(3)return 42功能: 模拟一个耗时操作例如网络请求或复杂计算阻塞调用线程 3 秒后返回结果 42。关键点: time.sleep(3) 会阻塞调用线程但会释放全局解释器锁GIL允许其他线程运行。
3.3 supervisor 函数
def supervisor() - int:done Event()spinner Thread(targetspin, args(thinking!, done))print(fspinner object: {spinner})spinner.start()result slow()done.set()spinner.join()return result功能: 协调主线程和 spinner 线程的运行。关键点: Event(): 创建一个事件对象用于通知 spinner 线程停止。Thread(targetspin, args(thinking!, done)): 创建一个新线程目标函数为 spin参数为 (thinking!, done)。spinner.start(): 启动 spinner 线程开始显示动画。slow(): 调用耗时操作阻塞主线程。done.set(): 设置事件通知 spinner 线程停止。spinner.join(): 等待 spinner 线程结束确保动画被正确清除。
3.4 main 函数
def main() - None:result supervisor()print(fAnswer: {result})功能: 调用 supervisor 函数并输出结果。
4. 运行流程
主线程创建 Event 对象 done。主线程创建并启动 spinner 线程传入消息 thinking! 和 done 事件。主线程调用 slow()阻塞自身 3 秒。Spinner 线程每隔 0.1 秒更新一次动画字符直到 done 事件被设置。slow() 返回后主线程设置 done 事件通知 spinner 线程停止。主线程等待 spinner 线程结束然后输出结果 Answer: 42。
5. 关键点总结
import itertools
import time
from threading import Thread, Eventdef spin(msg: str, done: Event) - None:for char in itertools.cycle(r\|/-):status f\r{char} {msg}print(status, end, flushTrue)if done.wait(0.1):breakblanks * len(status)print(f\r{blanks}\r, end)def slow() - int:time.sleep(3)return 42def supervisor() - int:done Event()spinner Thread(targetspin, args(thinking!, done))print(fspinner object: {spinner})spinner.start()result slow()done.set()spinner.join()return resultdef main() - None:result supervisor()print(fAnswer: {result})if __name__ __main__:main()线程与 GIL: Python 的全局解释器锁GIL允许同一时间只有一个线程执行 Python 字节码。time.sleep() 会释放 GIL因此其他线程可以运行。在本例中slow() 阻塞主线程但 spinner 线程可以继续运行因为 time.sleep() 释放了 GIL。 线程间通信: threading.Event 是一种简单的线程间通信机制用于通知事件的发生。在本例中done 事件用于通知 spinner 线程停止运行。 动画实现: 使用 \r 回车符和 flushTrue 实现覆盖输出达到动画效果。通过循环更新字符实现旋转效果。
2、Spinner with Processes
1. 多进程与多线程概述
在 Python 中实现并发编程主要有两种方式
多线程Threading在同一个进程中创建多个线程共享内存空间但受限于全局解释器锁GIL无法真正利用多核 CPU。多进程Multiprocessing创建多个独立的进程每个进程拥有独立的内存空间和 GIL可以真正利用多核 CPU。
multiprocessing 模块 旨在模拟 threading 模块的 API使得从多线程到多进程的转换更加容易。
2. multiprocessing.Process 类创建和管理子进程
2.1 基本用法
与 threading.Thread 类似multiprocessing.Process 用于创建子进程。以下是 spinner_proc.py 的示例代码
import itertools
import time
from multiprocessing import Process, Event
from multiprocessing import synchronizedef spin(msg: str, done: synchronize.Event) - None: for char in itertools.cycle(r\|/-):status f\r{char} {msg}print(status, end, flushTrue)if done.wait(0.1):breakblanks * len(status)print(f\r{blanks}\r, end)def slow() - int:time.sleep(3)return 42def supervisor() - int:done Event()spinner Process(targetspin, args(thinking!, done)) # 创建子进程print(fspinner object: {spinner})spinner.start() # 启动子进程result slow()done.set() # 通知子进程spinner.join() # 等待子进程结束return resultdef main() - None:result supervisor()print(fAnswer: {result})if __name__ __main__:main()关键点
创建子进程Process(targetspin, args(thinking!, done)) target 参数指定子进程要执行的函数。args 参数是传递给目标函数的参数元组。 启动子进程spinner.start() 启动子进程并执行目标函数。 等待子进程结束spinner.join() 主进程会阻塞直到子进程结束。
输出示例
spinner object: Process nameProcess-1 parent14868 initialProcess-1 是子进程的名称。14868 是运行 spinner_proc.py 的 Python 进程的进程 ID。
2.2 与 threading.Thread 的对比
API 相似性两者都提供了 start()、join() 等方法API 风格相似易于从多线程切换到多进程。实现差异 threading.Event 是一个类而 multiprocessing.Event 是一个函数返回一个 synchronize.Event 实例。需要从 multiprocessing 模块导入 synchronize 以使用类型提示。
3. 进程间通信挑战与解决方案
3.1 进程隔离与数据共享
隔离性操作系统将每个进程隔离进程之间无法直接共享 Python 对象。序列化与反序列化跨进程边界传递数据时需要对数据进行序列化和反序列化这会带来开销。
示例
在 spinner_proc.py 中只有 Event 对象的状态在进程间传递。Event 是由 multiprocessing 模块底层 C 代码使用操作系统信号量实现的因此可以高效地在进程间共享。
3.2 共享内存
从 Python 3.8 开始multiprocessing.shared_memory 模块提供了共享内存功能但存在以下限制
不支持用户自定义类的实例。支持的数据类型有限 原始字节bytesShareableList一种可变序列类型可以存储固定数量的 int、float、bool、None 以及最大 10MB 的 str 和 bytes。
4. 实际应用中的注意事项
4.1 进程池Process Pool
概念预先创建一组进程重复利用这些进程执行任务避免频繁创建和销毁进程带来的开销。使用场景适用于需要大量任务并行执行的场景如 CPU 密集型任务。
示例
from multiprocessing import Pooldef square(x):return x * xif __name__ __main__:with Pool(4) as pool:results pool.map(square, range(10))print(results)输出示例
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]4.2 进程间通信机制
除了共享内存multiprocessing 还提供了多种进程间通信机制
队列Queue线程和进程安全的 FIFO 队列。管道Pipe用于在两个进程之间建立双向通信通道。管理器Manager提供多种共享数据结构如列表、字典等。
队列示例
from multiprocessing import Process, Queuedef worker(q):q.put(Hello from worker)if __name__ __main__:q Queue()p Process(targetworker, args(q,))p.start()print(q.get()) # 输出: Hello from workerp.join()5. 总结
multiprocessing 模块 提供了强大的多进程编程支持弥补了多线程受限于 GIL 的不足。API 相似性 使得从多线程切换到多进程变得相对容易但需要注意进程间通信带来的开销和复杂性。共享内存 提供了高效的进程间数据共享方式但需要谨慎使用以避免数据竞争和死锁。进程池 是管理大量并行任务的有效工具可以提高程序性能。
6. 补充示例使用 multiprocessing 实现生产者-消费者模型
from multiprocessing import Process, Queue
import timedef producer(q, items):for item in items:q.put(item)print(fProduced: {item})time.sleep(1)q.put(None) # 发送终止信号def consumer(q):while True:item q.get() # 这是一个阻塞操作if item is None:breakprint(fConsumed: {item})time.sleep(2)if __name__ __main__:q Queue()p Process(targetconsumer, args(q,))p.start()producer(q, range(5))p.join()输出示例
Produced: 0
Consumed: 0
Produced: 1
Produced: 2
Consumed: 1
Produced: 3
Produced: 4
Consumed: 2
Consumed: 3
Consumed: 4为什么需要 join() 等待子进程完成 在主进程中启动子进程后主进程会继续执行后续代码。如果不使用 join()主进程可能会在子进程完成之前就结束导致子进程成为孤儿进程或僵尸进程。join() 方法会阻塞主进程直到子进程 p 结束。这样可以确保子进程完成其任务后再继续执行主进程的后续操作。 资源管理 如果主进程过早退出可能导致子进程无法正常结束进而导致资源泄漏或其他不可预见的问题。使用 join() 可以确保所有子进程在主进程退出前完成其任务从而正确地释放资源。 同步 在多进程编程中进程之间的同步非常重要。join() 提供了一种简单的同步机制确保主进程在子进程完成后再继续执行。在你的代码中join() 确保主进程等待消费者子进程完成消费所有队列中的元素包括接收到终止信号 None 后退出循环。
q.get()是如何做到阻塞操作的
要深入理解 Queue.get() 的阻塞实现我们需要考虑一些底层的同步机制。Python 的 multiprocessing.Queue 是基于底层的操作系统原语来实现的这些原语确保多进程环境中的数据安全和同步。
1. Queue 的基本结构 multiprocessing.Queue 实际上是通过 multiprocessing.Manager 和底层管道pipe来实现的。它使用进程间通信IPC机制来在进程间传递信息。关键的组件包括
管道Pipe用于在进程之间传递字节流。锁Lock用来确保同时只有一个进程可以访问或修改共享资源。信号量Semaphore或条件变量Condition Variable用于管理进程的同步。
2. 阻塞机制的实现 具体到 Queue.get() 的实现涉及以下几个步骤
a. 获取锁 每次调用 get() 方法时首先需要获取一个锁。这是为了确保当一个进程正在操作队列时其他进程无法进行相同的操作。锁的使用防止了数据竞争和不一致性。
b. 检查队列状态 一旦锁被获取get() 方法会检查队列中是否有可用的数据
如果队列不为空直接从队列中获取数据然后释放锁。进程可以继续执行。如果队列为空进程将进入等待状态。
c. 进入等待状态 如果队列为空get() 会让当前进程进入等待状态。这是通过信号量或条件变量实现的
信号量信号量会被初始化为 0。当队列中加入新数据put() 操作时信号量被增加表示有新的数据可用。阻塞的进程会被唤醒。条件变量条件变量允许进程等待特定的条件成立如队列中有数据。当条件满足时新数据加入队列条件变量会通知等待的进程继续执行。
d. 被唤醒和获取数据 当有新数据被放入队列时put() 方法会更新信号量或条件变量通知阻塞的 get() 操作可以继续执行。被唤醒的 get() 操作将重新获取锁检查队列并取出数据。
e. 释放锁 无论是成功获取数据还是由于其他原因退出get() 方法最后都会释放锁以允许其他进程进行队列操作。
经典问题为什么用while而不是if
这是一个多线程并发编程中经常会遇到的问题尤其是在等待条件时比如经典的生产者-消费者模型很多人会问 为什么在等待条件时常常写成 while not 条件: condition.wait()而不是 if not 条件: condition.wait() 核心原因防止虚假唤醒Spurious Wakeup和竞态条件
1. 虚假唤醒Spurious Wakeup
在多线程环境下有些操作系统或线程库实现允许线程在没有被显式通知的情况下从 wait() 返回这叫做“虚假唤醒”。如果你用if线程醒来后不会再次检查条件可能条件并没有真正满足结果导致程序逻辑错误。while会在每次醒来后重新检查条件确保只有在真正条件满足时才继续执行。
2. 竞争条件Race Condition
在多线程环境中可能多个线程被同时唤醒例如 notify_all() 唤醒多个线程这些线程会争夺资源只有一个能成功拿到其余又要继续等待。如果用if没抢到资源的线程不会再次判断条件会直接往下执行出错。while可以让没抢到资源的线程再次进入等待直到条件真正满足。 一句话记忆 多线程等待条件时用while不是if因为醒来后必须重新检查条件防止虚假唤醒和数据竞争。 7. 常见误区与调试技巧
避免在 __main__ 之外创建进程在 Windows 上必须将创建进程的代码放在 if __name__ __main__ 块中否则会导致递归创建进程。使用 if __name__ __main__ 保护主程序确保子进程不会再次执行主程序代码。注意全局变量进程之间不共享全局变量需要使用进程间通信机制传递数据。调试多进程程序可以使用 print 语句、日志记录或调试工具但要注意进程间输出的顺序。
3、Spinner with Coroutines
1. 协程驱动的 Spinner 程序解析
1.1 程序结构
我们以 spinner_async.py 为例探讨如何使用协程实现一个简单的 Spinner 程序该程序在后台显示旋转指示器同时执行耗时操作。
import asyncio
import itertoolsdef main() - None:result asyncio.run(supervisor())print(fAnswer: {result})async def supervisor() - int:spinner asyncio.create_task(spin(thinking!))print(fspinner object: {spinner})result await slow()spinner.cancel()return resultasync def spin(msg: str) - None:for char in itertools.cycle(r\|/-):status f\r{char} {msg}print(status, flushTrue, end)try:await asyncio.sleep(.1)except asyncio.CancelledError:breakblanks * len(status)print(f\r{blanks}\r, end)async def slow() - int:await asyncio.sleep(3)return 42if __name__ __main__:main()在这个例子中任务调度是由 asyncio 的事件循环负责的。当一个任务协程在执行过程中遇到 await 关键字如 await asyncio.sleep()它会将控制权交还给事件循环让事件循环调度其他准备好执行的任务。这种机制让多个任务可以并发运行而不会因为等待某个任务的完成而阻塞整个程序。
具体调度过程 事件循环: 当程序运行时asyncio.run(supervisor()) 启动了事件循环。事件循环负责管理和调度所有由 asyncio.create_task() 创建的任务。 任务创建与调度: 在 supervisor 协程中asyncio.create_task(spin(thinking!)) 创建了一个新的任务 spin。事件循环立即开始调度 spin 的执行因为它是通过 create_task 显式创建的并且不依赖其他任务完成。 遇到 await 关键字: 在 spin 函数中await asyncio.sleep(.1) 暂停了 spin 的执行将控制权交还给事件循环。事件循环检查其他任务发现 supervisor 仍在等待 slow 的结果而 slow 也在 await asyncio.sleep(3) 上暂停。因为 spin 的暂停时间很短0.1 秒所以事件循环会频繁地重新调度 spin显示旋转动画。 并发执行: 由于 spin 和 slow 都在使用 await 进行异步等待事件循环能够在它们各自的等待期间切换执行。这意味着即使 slow 在等待 3 秒spin 仍能继续执行提供视觉反馈。 任务取消: 一旦 slow 完成并返回结果supervisor 立即调用 spinner.cancel() 来取消 spin 任务。当 spin 检测到取消请求时通过捕获 asyncio.CancelledError 来终止动画循环。
总结
调度角色: 是 asyncio 的事件循环负责调度任务。它在任务遇到 await 时自动切换到其他准备好执行的任务。所以await后面跟可等待对象协程对象、Future、Task对象他们都可能发生I/O等待自动切换: 任务在遇到 await asyncio.sleep() 或其他 I/O 操作时会自动将控制权交还给事件循环允许事件循环安排其他任务的执行。并发执行: 通过这种调度机制多个任务可以并发运行提高程序的响应性和效率特别是在 I/O 密集型操作中。
关于await
在 Python 的异步编程中await 关键字用于暂停协程的执行直到提供的可等待对象完成。可等待对象可以是协程对象、Future、或 Task。当代码执行到 await 时会发生以下事情 暂停当前协程: 当协程执行到 await 语句时它会暂停执行将控制权交还给事件循环。当前协程的状态被保存以便在可等待对象完成后可以恢复执行。 返回控制权给事件循环: await 后的操作通常是 I/O 操作或其他耗时任务。在这些操作进行时协程会将控制权交还给事件循环。事件循环可以利用这个时间去调度其他可运行的任务。这种机制允许其他协程在当前协程等待时执行从而实现并发。 事件循环调度其他任务: 事件循环会检查所有已注册的任务并寻找可以运行的任务即那些不在等待 I/O 的任务。如果有其他任务准备好执行事件循环会调度它们运行。 等待的任务完成后: 一旦 await 后的可等待对象完成例如一个网络请求完成一个定时器到期或一个文件被读取完毕事件循环会将控制权返回给之前暂停的协程。协程从上次暂停的位置恢复执行。 恢复执行: 在可等待对象完成后协程会继续执行 await 之后的代码。这可能涉及处理从 await 表达式返回的结果。
代码示例中的调度过程
以你的代码为例来看 await 的具体调度过程
async def spin(msg: str) - None:for char in itertools.cycle(r\|/-):status f\r{char} {msg}print(status, flushTrue, end)try:# 这里的 await asyncio.sleep(.1) 将暂停 spin 协程await asyncio.sleep(.1)except asyncio.CancelledError:breakblanks * len(status)print(f\r{blanks}\r, end)async def slow() - int:# 这里的 await asyncio.sleep(3) 将暂停 slow 协程await asyncio.sleep(3)return 42在 spin 中遇到 await asyncio.sleep(.1) 时spin 协程暂停让出控制权。事件循环会检查其他任务发现 slow 也在等待await asyncio.sleep(3)。因为 spin 的等待时间很短事件循环在其他任务没有准备好执行时很快会回到 spin。继续执行 spin直到 slow 完成 3 秒的等待并返回结果。supervisor 协程恢复取消 spin打印结果。
这种机制使得 Python 的异步编程可以高效地处理大量 I/O 操作而不会因为等待而阻塞整个程序。
为什么输出是这样的
这个代码的输出为什么一直是先输出spinner object: Task pending name‘Task-2’ corospin() running at /fluent/test.py:15 然后再是动画最后是Answer: 42
在Python的asyncio中创建一个任务如使用asyncio.create_task())会安排该协程在事件循环中运行但不会立即执行该协程的代码。相反它会继续执行当前的同步代码直到遇到await或其他需要切换上下文的点。
具体到你的代码 创建任务: spinner asyncio.create_task(spin(thinking!))这行代码创建了一个任务对象spinner并安排spin协程在事件循环中运行。此时它只是在事件循环中注册了spin并不会立即执行spin的代码。 打印任务对象信息: print(fspinner object: {spinner})创建任务后代码继续执行这一行。这是一个普通的同步操作所以会立即被执行打印出任务对象的信息。 执行spin和slow: 接下来是 result await slow()这行代码会暂停supervisor协程的执行直到slow()完成。在这个暂停期间事件循环会有机会调度并执行其他任务包括spin任务。
正是因为事件循环的这种调度机制创建任务后控制权会立即返回到当前的同步代码块使得创建任务后的代码如打印任务对象信息会在任务的协程代码实际开始执行之前被执行。
1.2 关键概念解析 事件循环Event Loop 协程由事件循环驱动事件循环负责管理协程队列、调度协程执行、监控 I/O 操作事件并在事件发生时将控制权交还给相应的协程。事件循环和所有协程都在单线程内执行因此任何一个协程的阻塞都会导致事件循环变慢进而影响其他协程的执行。 asyncio.run() 函数 用于启动事件循环并驱动传入的协程对象通常是程序的入口点例如本例中的 supervisor。该函数会阻塞直到传入的协程执行完毕并返回协程的返回值。 原生协程Native Coroutines 使用 async def 关键字定义例如 supervisor、spin 和 slow。原生协程需要通过事件循环来驱动执行。 asyncio.create_task() 函数 从协程内部调用用于调度另一个协程的执行。该函数会立即返回一个 asyncio.Task 对象该对象包装了协程对象并提供了控制和管理协程状态的方法。例如spinner asyncio.create_task(spin(thinking!)) 创建了一个用于执行 spin 协程的任务并将其赋值给 spinner 变量。 await 关键字 从协程内部调用用于将控制权转移给另一个协程。当前协程会被挂起直到被 await 的协程执行完毕。例如result await slow() 会挂起 supervisor 协程直到 slow 协程执行完毕并将 slow 的返回值赋给 result。 协程的运行方式 asyncio.run(coro())从普通函数调用用于驱动协程对象通常是程序的入口点。该调用会阻塞直到协程体返回。asyncio.create_task(coro())从协程内部调用用于调度另一个协程的执行。该调用不会挂起当前协程并返回一个 Task 实例。await coro()从协程内部调用用于将控制权转移给被 await 的协程。该调用会挂起当前协程直到被 await 的协程返回。 协程的取消 调用 Task.cancel() 方法会向协程抛出 asyncio.CancelledError 异常协程可以通过捕获该异常来执行清理操作并退出。
1.3 关键函数解析 spin 协程 使用 itertools.cycle 循环显示旋转指示器\|/-。使用 await asyncio.sleep(0.1) 暂停 0.1 秒避免阻塞事件循环。捕获 asyncio.CancelledError 异常以优雅地退出循环。 slow 协程 使用 await asyncio.sleep(3) 模拟耗时操作并返回结果 42。同样使用 await asyncio.sleep 而不是 time.sleep以避免阻塞事件循环。
1.4 重要实验理解协程阻塞的影响
为了更好地理解协程的工作原理我们进行以下实验 实验内容将 slow 协程中的 await asyncio.sleep(3) 替换为 time.sleep(3)。 预期结果 程序显示 spinner 对象例如 Task pending nameTask-2 corospin() running at /path/to/spinner_async.py:12。Spinner 不会显示程序挂起 3 秒。显示 Answer: 42 并结束程序。 原因分析 time.sleep(3) 会阻塞主线程而主线程也是事件循环所在的线程。因此在 time.sleep(3) 期间事件循环无法执行任何其他协程包括 spin 协程。这导致 Spinner 无法显示程序整体被阻塞。 关键结论 永远不要在 asyncio 协程中使用 time.sleep(…)除非你希望暂停整个程序。如果协程需要等待一段时间应该使用 await asyncio.sleep(DELAY)这会将控制权交还给事件循环允许其他挂起的协程执行。
2. 协程与绿色线程Greenlets 绿色线程Greenlets 绿色线程是一种轻量级的协程实现由 greenlet 包提供。不需要使用 yield 或 await 等特殊语法更易于集成到现有的顺序代码库中。例如SQL Alchemy 1.4 ORM 使用绿色线程来实现其与 asyncio 兼容的新异步 API。 gevent 库 gevent 是一个基于绿色线程的网络库通过“猴子补丁monkey patching”的方式将 Python 的标准 socket 模块替换为非阻塞版本。对周围代码具有高度透明性使得将顺序应用程序和库如数据库驱动程序转换为执行并发网络 I/O 变得更加容易。许多开源项目使用 gevent包括广泛部署的 Gunicorn。
3. 总结
协程 是一种强大的并发编程工具在单线程内实现高效的 I/O 操作处理。事件循环 是协程的核心负责调度和管理协程的执行。避免使用 time.sleep()而应使用 await asyncio.sleep()以确保事件循环的流畅运行。绿色线程 和 gevent 提供了另一种异步编程的方式适用于不同的应用场景。
4、Supervisors Side-by-Side
spinner_thread.py和spinner_async.py的代码行数几乎相同。supervisor函数是这些示例的核心。下面我们来详细比较一下它们。示例19 - 8列出了示例19 - 2中的supervisor函数。
示例19 - 8 spinner_thread.py使用线程的supervisor函数
def supervisor() - int:done Event()spinner Thread(targetspin, args(thinking!, done))print(spinner object:, spinner)spinner.start()result slow()done.set()spinner.join()return result作为对比示例19 - 9展示了示例19 - 4中的supervisor协程。
示例19 - 9 spinner_async.py异步的supervisor协程
async def supervisor() - int:spinner asyncio.create_task(spin(thinking!))print(spinner object:, spinner)result await slow()spinner.cancel()return result下面总结这两个supervisor实现之间的异同点
asyncio.Task大致相当于threading.Thread。
示例对比
使用线程的示例 (thread_example.py):
import threading
import timedef task1():for i in range(5):print(fThread 1: {i})time.sleep(1)def task2():for i in range(5):print(fThread 2: {i})time.sleep(1.5)def run_threads():thread1 threading.Thread(targettask1)thread2 threading.Thread(targettask2)thread1.start()thread2.start()thread1.join()thread2.join()print(Both threads have finished.)if __name__ __main__:run_threads()使用asyncio的示例 (asyncio_example.py):
import asyncioasync def task1():for i in range(5):print(fTask 1: {i})await asyncio.sleep(1)async def task2():for i in range(5):print(fTask 2: {i})await asyncio.sleep(1.5)async def run_tasks():task1_coroutine asyncio.create_task(task1())task2_coroutine asyncio.create_task(task2())await task1_coroutineawait task2_coroutineprint(Both tasks have finished.)if __name__ __main__:asyncio.run(run_tasks())解释:
并发执行: 在线程示例中task1和task2在不同的线程中并行执行。在asyncio示例中task1和task2作为协程在同一个线程中并发执行通过事件循环调度。任务调度: 线程由操作系统调度而asyncio任务由事件循环调度。
Task驱动一个协程对象而Thread调用一个可调用对象。
示例对比
使用线程的示例 (thread_callable.py):
import threadingclass MyTask:def run(self):print(Thread is running a callable object.)def run_thread():obj MyTask()thread threading.Thread(targetobj.run)thread.start()thread.join()print(Thread has finished.)if __name__ __main__:run_thread()使用asyncio的示例 (asyncio_coroutine.py):
import asyncioclass MyCoroutine:async def run(self):print(Coroutine is running.)async def run_coroutine():obj MyCoroutine()await obj.run()print(Coroutine has finished.)if __name__ __main__:asyncio.run(run_coroutine())解释:
调用方式: 在线程示例中Thread调用了一个类的run方法这是一个可调用对象。在asyncio示例中asyncio直接调用了一个协程方法run这是因为asyncio只能调度协程。
协程通过await关键字显式地让出控制权。
示例对比
使用线程的示例 (thread_sleep.py):
import threading
import timedef task():for i in range(5):print(fThread: {i})time.sleep(1)def run_thread():thread threading.Thread(targettask)thread.start()thread.join()print(Thread has finished.)if __name__ __main__:run_thread()使用asyncio的示例 (asyncio_await.py):
import asyncioasync def task():for i in range(5):print(fCoroutine: {i})await asyncio.sleep(1)async def run_coroutine():await task()print(Coroutine has finished.)if __name__ __main__:asyncio.run(run_coroutine())解释:
控制权让渡: 在线程示例中time.sleep(1)会阻塞当前线程。在asyncio示例中await asyncio.sleep(1)会暂停当前协程的执行让事件循环调度其他协程执行。
你不能自己实例化Task对象而是通过将一个协程传递给asyncio.create_task(…)来获取它们。
示例对比
使用线程的示例 (thread_instantiate.py):
import threadingdef task():print(Thread is running.)def run_thread():thread threading.Thread(targettask)thread.start()thread.join()print(Thread has finished.)if __name__ __main__:run_thread()使用asyncio的示例 (asyncio_create_task.py):
import asyncioasync def task():print(Task is running.)async def run_tasks():task_coroutine asyncio.create_task(task())await task_coroutineprint(Task has finished.)if __name__ __main__:asyncio.run(run_tasks())解释:
任务创建: 在线程示例中Thread对象是通过threading.Thread直接实例化的。在asyncio示例中Task对象是通过asyncio.create_task创建的而不是通过实例化Task类。
当asyncio.create_task(…)返回一个Task对象时它已经被安排好要运行了但Thread实例必须通过调用其start方法来显式地启动运行。
示例对比
使用线程的示例 (thread_start.py):
import threadingdef task():print(Thread is running.)def run_thread():thread threading.Thread(targettask)# 线程尚未启动# thread.start() 必须调用才能启动线程thread.start()thread.join()print(Thread has finished.)if __name__ __main__:run_thread()使用asyncio的示例 (asyncio_schedule.py):
import asyncioasync def task():print(Task is running.)async def run_tasks():task_coroutine asyncio.create_task(task())# Task 已经被安排运行无需显式启动await task_coroutineprint(Task has finished.)if __name__ __main__:asyncio.run(run_tasks())解释:
任务启动: 在线程示例中必须调用thread.start()才能启动线程。在asyncio示例中asyncio.create_task会立即安排协程执行无需显式启动。
在使用线程的supervisor中slow是一个普通函数由主线程直接调用。在异步的supervisor中slow是一个由await驱动的协程。
示例对比
使用线程的示例 (thread_slow.py):
import threading
import timedef slow():print(Slow function is running.)time.sleep(2)print(Slow function is done.)return 42def supervisor():thread threading.Thread(targetslow)thread.start()thread.join()print(Supervisor has finished.)if __name__ __main__:supervisor()使用asyncio的示例 (asyncio_slow.py):
import asyncioasync def slow():print(Slow coroutine is running.)await asyncio.sleep(2)print(Slow coroutine is done.)return 42async def supervisor():task asyncio.create_task(slow())await taskprint(Supervisor has finished.)if __name__ __main__:asyncio.run(supervisor())解释:
函数调用: 在线程示例中slow是一个普通函数由主线程直接调用并在新线程中执行。在asyncio示例中slow是一个协程通过await调用。
没有从外部终止线程的API相反你必须发送一个信号比如设置done这个Event对象。对于任务有Task.cancel()实例方法它会在协程体当前暂停的await表达式处引发CancelledError异常。
示例对比
使用线程的示例 (thread_cancel.py):
import threading
import time
from threading import Eventdef task(done):while not done.is_set():print(Thread is running.)time.sleep(1)print(Thread has been cancelled.)def run_thread():done Event()thread threading.Thread(targettask, args(done,))thread.start()time.sleep(3)done.set()thread.join()print(Thread has finished.)if __name__ __main__:run_thread()使用asyncio的示例 (asyncio_cancel.py):
import asyncioasync def task():try:while True:print(Task is running.)await asyncio.sleep(1)except asyncio.CancelledError:print(Task has been cancelled.)async def run_tasks():task_coroutine asyncio.create_task(task())await asyncio.sleep(3)task_coroutine.cancel()await task_coroutineprint(Task has finished.)if __name__ __main__:asyncio.run(run_tasks())解释:
取消机制: 在线程示例中通过设置Event对象来通知线程停止。在asyncio示例中使用task.cancel()来取消任务协程会在await asyncio.sleep(1)处捕获CancelledError异常。
supervisor协程必须在main函数中通过asyncio.run来启动。
示例对比
使用线程的示例 (thread_main.py):
import threading
import timedef task():print(Thread is running.)time.sleep(2)print(Thread is done.)def supervisor():thread threading.Thread(targettask)thread.start()thread.join()print(Supervisor has finished.)if __name__ __main__:supervisor()使用asyncio的示例 (asyncio_main.py):
import asyncioasync def task():print(Task is running.)await asyncio.sleep(2)print(Task is done.)async def supervisor():await task()print(Supervisor has finished.)if __name__ __main__:asyncio.run(supervisor())解释:
程序入口: 在线程示例中supervisor函数作为主函数直接调用。在asyncio示例中supervisor协程通过asyncio.run启动。
总结
通过这些具体的对比示例我们可以看到threading.Thread和asyncio.Task在并发执行、调用方式、控制权让渡、任务创建与启动、取消机制以及程序入口等方面的不同。这些差异使得asyncio在处理高并发和I/O密集型任务时更加高效。
关于线程和协程的最后一点如果你用线程进行过任何复杂的编程就会知道理解程序的逻辑有多困难因为调度器可以随时中断线程。你必须记住持有锁来保护程序的关键部分以避免在多步骤操作过程中被中断否则可能会使数据处于无效状态。
而对于协程默认情况下你的代码不会被中断。你必须显式地使用await让程序的其他部分运行。协程不是通过持有锁来同步多个线程的操作而是从定义上就是 “同步” 的任何时刻只有一个协程在运行。当你想放弃控制权时使用await将控制权交回调度器。这就是为什么可以安全地取消一个协程从定义上讲协程只有在await表达式处暂停时才能被取消所以你可以通过处理CancelledError异常来进行清理工作。
time.sleep()调用会阻塞但不做任何事情。现在我们将通过一个CPU密集型的调用来进行实验以便更好地理解全局解释器锁GIL以及CPU密集型函数在异步代码中的影响。
三、The Real Impact of the GIL
——基于多进程、多线程、异步编程的对比分析
1、核心问题GIL 对 CPU 密集型任务的影响
GIL全局解释器锁 是 CPython 解释器的设计特性它确保同一时刻仅有一个线程执行 Python 字节码。这对 CPU 密集型任务有显著影响
I/O 操作等待网络/磁盘时释放 GIL允许其他线程运行如 time.sleep(3) 或 HTTP 请求。CPU 密集型操作长时间占用 CPU 的函数会阻塞其他线程如素数计算。
2、原始例子素数判断函数
import mathdef is_prime(n: int) - bool:if n 2:return Falseif n 2:return Trueif n % 2 0:return Falseroot math.isqrt(n)for i in range(3, root 1, 2): # 仅检查奇数if n % i 0:return Falsereturn True测试 is_prime(5_000_111_000_222_021) 耗时约 3.3 秒CPU 密集型。 3、三大并发模型的行为对比
假设将以下代码中的 time.sleep(3) 或 asyncio.sleep(3) 替换为 is_prime(n) 多进程 (spinner_proc.py) 结果旋转动画持续运行。原理子进程负责动画父进程计算素数。进程间内存隔离GIL 不共享。 # 伪代码示例
from multiprocessing import Process, Eventdef spin():while not done_event.is_set():# 更新动画...if __name__ __main__:done_event Event()p Process(targetspin) # 子进程运行动画p.start()is_prime(n) # 主进程计算素数done_event.set()多线程 (spinner_thread.py) 结果动画持续运行但可能卡顿。原理 Python 默认每 5ms 强制切换线程通过 sys.setswitchinterval 设置。主线程计算素数时每 5ms 被中断一次动画线程获得 GIL 更新状态。 陷阱若线程数 CPU 核心数程序效率会显著下降 # 伪代码示例
import threadingdef spin():while not done_event.is_set():# 更新动画短暂占用 GIL...done_event threading.Event()
t threading.Thread(targetspin)
t.start()
is_prime(n) # 主线程计算素数
done_event.set()异步编程 (spinner_async.py) 结果动画完全冻结原理 异步任务在单线程中运行is_prime 阻塞事件循环。动画任务 (spinner) 从未获得执行机会。 # 伪代码示例
import asyncioasync def spin():while not done:# 更新动画await asyncio.sleep(0.1)async def slow():is_prime(n) # 阻塞整个事件循环return 42async def supervisor():spinner_task asyncio.create_task(spin())result await slow()spinner_task.cancel()return result关键难点与解决方案
1. 异步编程中运行 CPU 密集型任务
问题直接调用 is_prime 会阻塞事件循环。 方案一不推荐插入 await asyncio.sleep(0)
async def is_prime_async(n):for i in range(3, root 1, 2):if n % i 0:return Falseif i % 100_000 1: # 每 10 万次迭代让步一次await asyncio.sleep(0) # 让出控制权return True缺点计算时间从 3.3s 增至 4.9s效率下降 50%且事件循环仍被拖慢。
方案二推荐使用 run_in_executor 移交到线程池
async def slow():loop asyncio.get_running_loop()result await loop.run_in_executor(None, is_prime, n) # 在子线程中运行return result这种情况下的is_prime函数要是非async的如果是async会报错RuntimeWarning: Enable tracemalloc to get the object allocation
loop.run_in_executor 方法用于在不同的线程中运行一个阻塞的同步函数而不是一个协程。因此这个问题导致了 is_prime 协程没有被正确地等待await从而引发了 RuntimeWarning。
所以需要将 is_prime 改为同步函数如果 is_prime 函数本身不需要异步执行你可以将它定义为一个普通的同步函数去掉 async 关键字。这样可以直接使用 loop.run_in_executor 来调用它。
完整代码
import mathdef is_prime(n: int) - bool:if n 2:return Falseif n 2:return Trueif n % 2 0:return Falseroot math.isqrt(n)for i in range(3, root 1, 2): # 仅检查奇数if n % i 0:return Falsereturn Trueimport asyncio
import itertoolsasync def slow():loop asyncio.get_running_loop()result await loop.run_in_executor(None, is_prime, 5_000_111_000_222_021) # 在子线程中运行return resultdef main() - None:result asyncio.run(supervisor())print(fAnswer: {result})async def supervisor() - int:spinner asyncio.create_task(spin(thinking!))print(fspinner object: {spinner})result await slow()spinner.cancel()return resultasync def spin(msg: str) - None:for char in itertools.cycle(r\|/-):status f\r{char} {msg}print(status, flushTrue, end)try:await asyncio.sleep(.1)except asyncio.CancelledError:breakblanks * len(status)print(f\r{blanks}\r, end)if __name__ __main__:main()原理将阻塞函数转移到线程池执行避免阻塞事件循环。
2. 多线程的适用场景
适合I/O 密集型任务如 HTTP 请求、文件读写。避免CPU 密集型任务除非任务能频繁释放 GIL如 NumPy/C 扩展。对比实验
# 两个 CPU 密集型线程 vs 顺序执行
import threading
import time
import mathn 5_000_111_000_222_021def is_prime(n: int) - bool:if n 2:return Falseif n 2:return Trueif n % 2 0:return Falseroot math.isqrt(n)for i in range(3, root 1, 2): # 仅检查奇数if n % i 0:return Falsereturn Truedef task():for _ in range(2): # 并行执行两次is_prime(n)# 顺序执行更快
start time.time()
is_prime(n)
is_prime(n)
print(Sequential:, time.time() - start) # 多线程执行更慢
t1 threading.Thread(targetis_prime, args(n,))
t2 threading.Thread(targetis_prime, args(n,))
t1.start(); t2.start()
t1.join(); t2.join()
print(Threads:, time.time() - start) # Sequential: 2.7513389587402344
# Threads: 5.548290014266968工程实践总结
场景推荐模型原因I/O 密集型网络/磁盘异步编程 (asyncio)高并发、低开销CPU 密集型多进程 (multiprocessing)绕过 GIL利用多核 CPU混合任务异步 线程池I/O 用 asyncioCPU 用 run_in_executor 黄金法则 “I/O 用异步CPU 用进程”避免在异步事件循环中直接调用 CPU 密集型函数 四、A Homegrown Process Pool
本节是为了展示如何使用多个进程处理CPU密集型任务以及使用队列来分配任务和收集结果的常见模式。第20章将介绍一种向进程分配任务的更简单方法使用concurrent.futures包中的ProcessPoolExecutor它内部使用了队列。
1. 问题背景与核心概念
1.1 问题描述
我们需要检测一组大数20个从2到10¹⁶-1的整数是否为素数。这是一个CPU密集型任务因为判断大数是否为素数需要大量计算。
primes.py:
#!/usr/bin/env python3import mathPRIME_FIXTURE [(2, True),(142702110479723, True),(299593572317531, True),(3333333333333301, True),(3333333333333333, False),(3333335652092209, False),(4444444444444423, True),(4444444444444444, False),(4444444488888889, False),(5555553133149889, False),(5555555555555503, True),(5555555555555555, False),(6666666666666666, False),(6666666666666719, True),(6666667141414921, False),(7777777536340681, False),(7777777777777753, True),(7777777777777777, False),(9999999999999917, True),(9999999999999999, False),
]NUMBERS [n for n, _ in PRIME_FIXTURE]# tag::IS_PRIME[]
def is_prime(n: int) - bool:if n 2:return Falseif n 2:return Trueif n % 2 0:return Falseroot math.isqrt(n)for i in range(3, root 1, 2):if n % i 0:return Falsereturn True
# end::IS_PRIME[]if __name__ __main__:for n, prime in PRIME_FIXTURE:prime_res is_prime(n)assert prime_res primeprint(n, prime)1.2 核心挑战
顺序执行单进程效率低下如何利用多核CPU加速计算
1.3 关键解决方案
使用 多进程 (multiprocessing) 和 任务队列 (Queue)
创建多个工作进程并行处理任务使用队列分配任务和收集结果避免GIL全局解释器锁对CPU密集型任务的限制
2. 顺序执行性能基准
2.1 代码实现 (sequential.py)
#!/usr/bin/env python3
from time import perf_counter
from typing import NamedTuple
from primes import is_prime, NUMBERSclass Result(NamedTuple):prime: boolelapsed: floatdef check(n: int) - Result:t0 perf_counter()prime is_prime(n)return Result(prime, perf_counter() - t0)def main() - None:print(fChecking {len(NUMBERS)} numbers sequentially:)t0 perf_counter()for n in NUMBERS:prime, elapsed check(n)label P if prime else print(f{n:16} {label} {elapsed:9.6f}s)elapsed perf_counter() - t0print(fTotal time: {elapsed:.2f}s)if __name__ __main__:main()2.2 运行结果
Checking 20 numbers sequentially:2 P 0.000000s142702110479723 P 0.247988s299593572317531 P 0.338454s
3333333333333301 P 1.098308s
3333333333333333 0.000014s
3333335652092209 1.093885s
4444444444444423 P 1.270619s
4444444444444444 0.000001s
4444444488888889 1.343093s
5555553133149889 1.531583s
5555555555555503 P 1.479670s
5555555555555555 0.000006s
6666666666666666 0.000000s
6666666666666719 P 1.803434s
6666667141414921 1.786589s
7777777536340681 1.739732s
7777777777777753 P 2.059888s
7777777777777777 0.000009s
9999999999999917 P 1.965799s
9999999999999999 0.000005s
Total time: 17.76s2.3 关键观察
所有任务顺序执行小数字检测快微秒级大素数检测慢总时间≈各任务耗时之和
3. 多进程解决方案 (procs.py)
3.1 核心代码解析
import sys
from time import perf_counter
from typing import NamedTuple
from multiprocessing import Process, SimpleQueue, cpu_count
from multiprocessing import queues
from primes import is_prime, NUMBERSclass PrimeResult(NamedTuple):n: int # 存储原始数字prime: bool # 是否为素数elapsed: float # 检测耗时# 类型别名提高可读性
JobQueue queues.SimpleQueue[int]
ResultQueue queues.SimpleQueue[PrimeResult]def check(n: int) - PrimeResult:t0 perf_counter()res is_prime(n)return PrimeResult(n, res, perf_counter() - t0)def worker(jobs: JobQueue, results: ResultQueue) - None:while n : jobs.get(): # 从队列获取任务results.put(check(n)) # 提交结果# 毒丸处理发送终止信号results.put(PrimeResult(0, False, 0.0))def start_jobs(procs: int, jobs: JobQueue, results: ResultQueue) - None:# 填充任务队列for n in NUMBERS:jobs.put(n)# 启动工作进程for _ in range(procs):proc Process(targetworker, args(jobs, results))proc.start()# 添加毒丸每个进程一个for _ in range(procs):jobs.put(0)def main() - None:# 确定进程数默认为CPU核心数procs cpu_count() if len(sys.argv) 2 else int(sys.argv[1])print(fChecking {len(NUMBERS)} numbers with {procs} processes:)t0 perf_counter()# 创建队列jobs: JobQueue SimpleQueue()results: ResultQueue SimpleQueue()# 启动任务start_jobs(procs, jobs, results)# 处理结果checked 0procs_done 0while procs_done procs:n, prime, elapsed results.get()if n 0: # 毒丸检测procs_done 1else:checked 1label P if prime else print(f{n:16} {label} {elapsed:9.6f}s)elapsed perf_counter() - t0print(f{checked} checks in {elapsed:.2f}s)if __name__ __main__:main()3.2 运行结果8进程
Checking 20 numbers with 8 processes:2 P 0.000001s
3333333333333333 0.000005s
4444444444444444 0.000001s142702110479723 P 0.419267s
5555555555555555 0.000004s
6666666666666666 0.000000s299593572317531 P 0.517399s
3333333333333301 P 1.804822s
3333335652092209 1.804431s
4444444444444423 P 1.953585s
7777777777777777 0.000005s
4444444488888889 2.084240s
9999999999999999 0.000007s
5555553133149889 2.250911s
5555555555555503 P 2.257135s
6666666666666719 P 2.442794s
6666667141414921 2.534752s
7777777777777753 P 2.199439s
7777777536340681 2.235224s
9999999999999917 P 2.372946s
20 checks in 4.40s3.3 关键改进
并行处理使用8个进程对应8个物理CPU核心 队列管理 JobQueue分发待检测数字ResultQueue收集检测结果 毒丸模式 (Poison Pill)用特殊值0通知进程终止结果关联在结果中存储原始数字PrimeResult.n解决乱序问题
3.4 性能提升
方案总耗时加速比顺序执行17.76s1x多进程(8)4.40s4.0x 超线程的真相虽然系统报告12个逻辑核心6物理核心×2线程但CPU密集型任务中实际性能接近物理核心数6核心 ps上面的是原文的内容但是这个是超线程Hyper-Threading是一种由英特尔开发的技术用于提高处理器的并行处理能力。通过超线程一个物理核心可以被虚拟化为两个逻辑核心这使得操作系统和应用程序可以将其视为两个独立的核心进行任务分配。 mac的m芯片并没有这个超线程。 M2 芯片有 8 个核心其中包括 4 个性能核心和 4 个能效核心。这意味着在进行 CPU 密集型任务时你可以使用所有 8 个核心来处理任务。与超线程不同M2 芯片的核心是物理核心不是通过超线程技术增加的逻辑核心。因此你可以在 CPU 密集型任务中充分利用这 8 个物理核心的性能。 在理想情况下如果你有8个物理核心并且任务可以完美地分割和并行执行你可能期望接近8倍的加速。但实际还是比较难的 4. 关键技术与工程实践
4.1 进程间通信 (IPC)
问题进程间内存隔离不能直接共享数据 解决方案使用队列multiprocessing.Queue
# 创建队列
from multiprocessing import Queue
job_queue Queue()
result_queue Queue()# 生产者主进程
job_queue.put(task_data)# 消费者工作进程
task job_queue.get()
result process(task)
result_queue.put(result)4.2 毒丸模式 (Poison Pill)
作用优雅终止工作进程 实现技巧
使用不会与正常数据冲突的哨兵值0、None、Ellipsis等每个进程一个毒丸
# 主进程
for _ in range(num_workers):job_queue.put(POISON_PILL) # 发送终止信号# 工作进程
while True:task job_queue.get()if task is POISON_PILL:break# 处理正常任务4.3 结果乱序处理
问题任务完成顺序 ≠ 提交顺序 解决方案结果中携带原始数据标识
# 错误做法丢失关联
results.put(is_prime(n))# 正确做法保留关联
results.put((n, is_prime(n), elapsed_time))4.4 进程数优化
最佳实践进程数 ≈ 物理CPU核心数 图不同进程数下的执行时间6核CPU最佳
5. 多线程方案的陷阱 (threads.py)
5.1 为何线程无效
# 伪代码展示线程方案
from threading import Threaddef worker():while task : task_queue.get():# CPU密集型计算result heavy_computation(task) # GIL在此阻塞其他线程result_queue.put(result)# 创建多个线程实际无法并行执行CPU任务5.2 GIL (全局解释器锁) 的影响
Python解释器一次只允许一个线程执行字节码CPU密集型任务无法充分利用多核线程切换反而增加开销
5.3 实测性能
方案总耗时对比顺序执行顺序执行40.31s基准12线程42.5s更慢 关键结论线程适合I/O密集型任务不适合CPU密集型任务 6. 补充实例I/O密集型任务对比
6.1 模拟I/O任务
import time
import requests
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutorURLS [https://www.example.com/page1,https://www.example.com/page2,# ... 20个URL
]def fetch(url):start time.perf_counter()response requests.get(url) # I/O阻塞操作elapsed time.perf_counter() - startreturn len(response.content), elapsed6.2 性能对比
方案任务类型20个任务总耗时顺序执行I/O12.7s多线程(20)I/O1.3s多进程(20)I/O3.8s
6.3 关键结论
I/O密集型多线程最佳避免进程创建开销CPU密集型多进程最佳避开GIL限制混合型任务需根据实际情况测试选择
7. 工程实践建议
7.1 避免自造轮子
使用标准库concurrent.futures简化代码
# 使用ProcessPoolExecutor重构procs.py
from concurrent.futures import ProcessPoolExecutordef main():with ProcessPoolExecutor(max_workers12) as executor:futures {executor.submit(is_prime, n): n for n in NUMBERS}for future in as_completed(futures):n futures[future]prime future.result()# 处理结果...7.2 常见陷阱与解决方案
陷阱解决方案忘记if __name__ __main__始终使用守护模式队列阻塞导致死锁设置超时/使用非阻塞方法进程资源泄漏使用上下文管理器管理进程池跨平台序列化问题避免传递lambda/闭包函数
7.3 调试技巧
简化复现减少进程数和任务量日志记录import logging
logging.basicConfig(levellogging.DEBUG,format%(asctime)s - %(name)s - %(levelname)s - %(message)s
)使用ProcessPoolExecutor替代手动管理进程
8. 总结Python在多核世界的生存之道 CPU密集型任务 使用多进程绕过GIL限制优选concurrent.futures.ProcessPoolExecutor进程数≈物理CPU核心数 I/O密集型任务 使用多线程减少上下文切换开销优选concurrent.futures.ThreadPoolExecutor结合asyncio实现更高并发 混合任务优化 分离CPU和I/O逻辑使用进程池线程池分层架构考虑C扩展处理关键计算 分布式扩展 对于超大规模任务使用Celery或Dask跨节点分发结合云服务实现弹性扩展 关键洞见Python通过多进程和异步编程模型结合丰富的生态系统能有效利用多核处理器应对各类计算需求。 总结
在介绍一些理论之后本章展示了通过 Python 三种原生并发编程模型实现的 spinner 脚本
基于 threading 包的线程模型基于 multiprocessing 的进程模型基于 asyncio 的异步协程模型
随后我们通过实验探究了全局解释器锁GIL的实际影响修改 spinner 示例以计算大整数的素性并观察其行为。实验直观表明异步编程asyncio中必须避免 CPU 密集型函数因为它们会阻塞事件循环。实验的线程版本虽然存在 GIL 限制但仍能工作——这是因为 Python 会定期中断线程且该示例仅使用两个线程一个执行 CPU 密集型计算另一个每秒仅驱动动画 10 次。多进程版本则绕开了 GIL通过启动新进程专门处理动画而主进程执行素性检查。
下一个计算多个素数的示例突出了多进程与线程的区别证明只有进程能让 Python 充分利用多核 CPU。对于重计算任务Python 的 GIL 会使线程性能比顺序代码更差。
GIL 是 Python 并发与并行计算讨论的核心但我们不应高估其影响如 725 页“Python 与多核世界”所述。例如GIL 对 Python 在系统管理中的许多用例并无影响。另一方面数据科学和服务器端开发社区已通过针对特定需求的工业级解决方案绕开了 GIL 的限制。最后两节提到了支持 Python 服务器端应用规模化的两个常见组件WSGI 应用服务器和分布式任务队列。