环保部网站建设项目,怎么做代理人金沙网站,产品推广渠道有哪些,淘客做网站多少钱池化介绍
在当今计算机科学和软件工程的领域中#xff0c;池化技术如线程池、连接池和对象池等已经成为优化资源利用率和提高软件性能的重要工具。然而#xff0c;在 Python 的协程领域#xff0c;我们却很少见到类似于 ThreadPoolExecutor 的 CoroutinePoolExecutor。为什…池化介绍
在当今计算机科学和软件工程的领域中池化技术如线程池、连接池和对象池等已经成为优化资源利用率和提高软件性能的重要工具。然而在 Python 的协程领域我们却很少见到类似于 ThreadPoolExecutor 的 CoroutinePoolExecutor。为什么会这样呢
首先Python Coroutine 的特性使得池化技术在协程中的应用相对较少。与像 Golang 这样支持有栈协程的语言不同Python Coroutine 是无栈的无法跨核执行从而限制了协程池发挥多核优势的可能性。
其次Python Coroutine 的轻量级和快速创建销毁的特性使得频繁创建和销毁协程并不会带来显著的性能损耗。这也解释了为什么 Python 官方一直没有引入 CoroutinePoolExecutor。
然而作为开发者我们仍然可以在特定场景下考虑协程的池化。虽然 Python Coroutine 轻量但在一些需要大量协程协同工作的应用中池化技术能够提供更方便、统一的调度子协程的方式。尤其是在涉及到异步操作的同时需要控制并发数量时协程池的优势就显而易见了。
关于 Python 官方是否会在未来引入类似于 TaskGroup 的 CoroutinePoolExecutor这或许是一个悬而未决的问题。考虑到 Python 在异步编程方面的快速发展我们不能排除未来可能性的存在。或许有一天我们会看到 TaskGroup 引入一个 max_workers 的形参以更好地支持对协程池的需求。
在实际开发中我们也可以尝试编写自己的 CoroutinePoolExecutor以满足特定业务场景的需求。通过合理的设计架构和对数据流的全局考虑我们可以最大程度地发挥协程池的优势提高系统的性能和响应速度。
在接下来的文章中我们将探讨如何设计和实现一个简单的 CoroutinePoolExecutor以及在实际项目中的应用场景。通过深入理解协程池的工作原理我们或许能更好地利用这一技术使我们的异步应用更为高效。
如何开始编写
如何开始编写 CoroutinePoolExecutor首先我们要明确出其适用范畴、考虑到使用方式和其潜在的风险点
它并不适用于 Mult Thread Mult Event Loop 的场景因此它并非线程安全的。应当保持和 ThreadPoolExecutor 相同的调用方式。不同于 Mult Thread 中子线程不依赖于主线程的运行而在 Mult Coroutine 中子协程必须依赖于主协程因此主协程在子协程没有全部运行完毕之前不能直接 done 掉。这也解释了为什么 TaskGroup 官方实现中没有提供类似于 shutdown 之类的方法而是只提供上下文管理的运行方式。
有了上述 3 点的考量我们决定将 ThreadPoolExecutor 平替成 CoroutinePoolExecutor。这样的好处在于作为学习者一方面可以了解 ThreadPoolExecutor 的内部实现机制另一方面站在巨人肩膀上的编程借鉴往往会事半功倍对于自我的提升也是较为明显的。
在考虑这些因素的同时我们将继续深入研究协程池的设计和实现。通过对适用范围和使用方式的明确我们能更好地把握 CoroutinePoolExecutor 的潜在优势为异步应用的性能提升做出更有针对性的贡献。
具体代码实现
在这里我先贴出完整的代码实现其中着重点已经用注释标明。
以下是 CoroutinePoolExecutor 的代码实现
import os
import asyncio
import weakref
import logging
import itertoolsasync def _worker(executor_reference: CoroutinePoolExecutor, work_queue: asyncio.Queue):try:while True:work_item await work_queue.get()if work_item is not None:await work_item.run()del work_itemexecutor executor_reference()if executor is not None:# Notify available coroutinesexecutor._idle_semaphore.release()del executorcontinue# Notifies the next coroutine task that it is time to exitawait work_queue.put(None)breakexcept Exception as exc:logging.critical(Exception in worker, exc_infoTrue)class _WorkItem:def __init__(self, future, coro):self.future futureself.coro coroasync def run(self):try:result await self.coroexcept Exception as exc:self.future.set_exception(exc)else:self.future.set_result(result)class CoroutinePoolExecutor:Coroutine pool implemented based on ThreadPoolExecutorDifferent from ThreadPoolExecutor, because the running of sub-coroutine depends on the main coroutineSo you must use the shutdown method to wait for all subtasks and wait for them to complete execution# Used to assign unique thread names when coroutine_name_prefix is not supplied._counter itertools.count().__next__def __init__(self, max_workers, coroutine_name_prefix):if max_workers is None:max_workers min(32, (os.cpu_count() or 1) 4)if max_workers 0:raise ValueError(max_workers must be greater than 0)self._max_workers max_workersself._work_queue asyncio.Queue()self._idle_semaphore asyncio.Semaphore(0)self._coroutines set()self._shutdown Falseself._shutdown_lock asyncio.Lock()self._coroutine_name_prefix (coroutine_name_prefix or (f{__class__.__name__}-{self._counter()}))async def submit(self, coro):async with self._shutdown_lock:# When the executor is closed, new coroutine tasks should be rejected, otherwise it will cause the problem that the newly added tasks cannot be executed.# This is because after shutdown, all sub-coroutines will end their work# one after another. Even if there are new coroutine tasks, they will not# be reactivated.if self._shutdown:raise RuntimeError(cannot schedule new coroutine task after shutdown)f asyncio.Future()w _WorkItem(f,coro)await self._work_queue.put(w)await self._adjust_coroutine_count()return fasync def _adjust_coroutine_count(self):try:# 2 functions:# - When there is an idle coroutine and the semaphore is not 0, there is no need to create a new sub-coroutine.# - Prevent exceptions from modifying self._coroutines members when the for loop self._coroutines and await task in shutdown are modified# Since the Semaphore provided by asyncio does not have a timeout# parameter, you can choose to use it with wait_for.if await asyncio.wait_for(self._idle_semaphore.acquire(),0):returnexcept TimeoutError:passnum_coroutines len(self._coroutines)if num_coroutines self._max_workers:coroutine_name f{self._coroutine_name_prefix or self}_{num_coroutines}t asyncio.create_task(coro_worker(weakref.ref(self),self._work_queue),namecoroutine_name)self._coroutines.add(t)async def shutdown(self, waitTrue, *, cancel_futuresFalse):async with self._shutdown_lock:self._shutdown Trueif cancel_futures:while True:try:work_item self._work_queue.get_nowait()except asyncio.QueueEmpty:breakif work_item is not None:work_item.future.cancel()# None is an exit signal, given by the shutdown method, when the shutdown method is called# will notify the sub-coroutine to stop working and exit the loopawait self._work_queue.put(None)if wait:for t in self._coroutines:await tasync def __aenter__(self):return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):await self.shutdown(waitTrue)return False以下是 CoroutinePoolExecutor 的使用方式
import asynciofrom coroutinepoolexecutor import CoroutinePoolExecutorasync def task(i):await asyncio.sleep(1)print(ftask-{i})async def main():async with CoroutinePoolExecutor(2) as executor:for i in range(10):await executor.submit(task(i))if __name__ __main__:asyncio.run(main())我们知道在线程池中工作线程一旦创建会不断的领取新的任务并执行除开 shutdown() 调用否则对于静态的线程池来讲工作线程不会自己结束。
在上述协程池代码实现中CoroutinePoolExecutor 类包含了主要的对外调用功能的接口、内部提供了存储 task 的 Queue、工作协程自动生成 name 的计数器、保障协程的信号量锁等等。
而 _worker 函数是工作协程的运行函数其会在工作协程启动后不断的从 CoroutinePoolExecutor 的 Queue 中得到 _WorkItem 并由 _WorkItem 具体执行 coro task。
剩下的 _WorkItem 是一个 future 对象与 coro task 的封装器其功能是解耦 future 对象和 coro task、并在 coro task 运行时和运行后设置 future 的结果。
对于异步循环的思考
在此 CoroutinePoolExecutor 实现后我其实又有了一个新的思考。Python 的 EventLoop 相较于 Node.js 的 EventLoop 来说其实更加的底层它有感的暴露了出来。
具体体现在当 Python Event Loop 启动后如果 main coroutine 停止运行那么所有的 subtask coroutine 也会停止运行尤其是对于一些需要清理资源的操作、如 aiohttp 的 close session、CoroutinePoolExecutor 的 shutdown 等都会在某些情况显得无措说的更具体点就是不知道在什么时候调用。
对于这些问题我们可以继承 BaseEventLoop 自己手动对 EventLoop 的功能进行扩展如在事件循环关闭之前添加 hook function甚至可以限制整个 EventLoop 的 max_workers 或者做成动态的可调节 coroutine 数量的 EventLoop 都行。
无论如何只要心里有想法就可以去将它实现 .. 学习本身就是一个不断挑战的过程。