协程结合线程和进程
很多现有的库都没有准备好与asyncio一起使用。它们可能会阻塞,或者依赖于模块不可用的并发功能。但仍然可以使用这些库,在一个基于asyncio的应用程序中,通过使用concurrent.futures的executor ,可以在单独的线程或单独的进程中运行代码。
线程
事件循环的run_in_executor()方法需要executor 实例,可调用的常规调用以及要传递给可调用对象的任何参数。它返回一个Future,可以用来等待函数完成它的工作并返回一些东西。如果没有executor被传入,则创建一个ThreadPoolExecutor。这个例子明确地创建了一个执行器来限制它可用的工作线程的数量。
一个ThreadPoolExecutor启动它的工作线程,然后在一个线程中调用每个提供的函数。这个例子展示了如何结合run_in_executor()和wait()让协程让出控制权给事件循环,当阻塞函数在单独的线程中运行时,然后在这些函数完成时唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| import asyncio import concurrent.futures import logging import sys import time def blocks(n): log = logging.getLogger('blocks({})'.format(n)) log.info('running') time.sleep(0.1) log.info('done') return n ** 2 async def run_blocking_tasks(executor): log = logging.getLogger('run_blocking_tasks') log.info('starting') log.info('creating executor tasks') loop = asyncio.get_event_loop() blocking_tasks = [ loop.run_in_executor(executor, blocks, i) for i in range(6) ] log.info('waiting for executor tasks') completed, pending = await asyncio.wait(blocking_tasks) results = [t.result() for t in completed] log.info('results: {!r}'.format(results)) log.info('exiting') if __name__ == '__main__': logging.basicConfig( level=logging.INFO, format='%(threadName)10s %(name)18s: %(message)s', stream=sys.stderr, ) executor = concurrent.futures.ThreadPoolExecutor( max_workers=3, ) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete( run_blocking_tasks(executor) ) finally: event_loop.close()
|
asyncio_executor_thread.py使用日志记录来方便地指示哪个线程和函数正在生成每个日志消息。因为每次调用block()都会使用一个单独的记录器,所以输出清楚地显示了相同的线程被重用,以不同的参数调用函数的多个副本。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| $ python3 asyncio_executor_thread.py MainThread run_blocking_tasks: starting MainThread run_blocking_tasks: creating executor tasks Thread-1 blocks(0): running Thread-2 blocks(1): running Thread-3 blocks(2): running MainThread run_blocking_tasks: waiting for executor tasks Thread-1 blocks(0): done Thread-3 blocks(2): done Thread-1 blocks(3): running Thread-2 blocks(1): done Thread-3 blocks(4): running Thread-2 blocks(5): running Thread-1 blocks(3): done Thread-2 blocks(5): done Thread-3 blocks(4): done MainThread run_blocking_tasks: results: [16, 4, 1, 0, 25, 9] MainThread run_blocking_tasks: exiting
|
进程
ProcessPoolExecutor的工作方式大致相同,创建一组工作进程而不是线程。使用单独的进程需要更多的系统资源,但是对于计算密集型操作,可以在每个CPU内核上运行单独的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| if __name__ == '__main__': logging.basicConfig( level=logging.INFO, format='PID %(process)5s %(name)18s: %(message)s', stream=sys.stderr, ) executor = concurrent.futures.ProcessPoolExecutor( max_workers=3, ) event_loop = asyncio.get_event_loop() try: event_loop.run_until_complete( run_blocking_tasks(executor) ) finally: event_loop.close()
|
从线程转移到进程所需的唯一更改是创建不同类型的executor。此示例还将日志记录格式字符串更改为包含进程标识而不是线程名称,以演示实际上这些任务正在单独的进程中运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| $ python3 asyncio_executor_process.py PID 16429 run_blocking_tasks: starting PID 16429 run_blocking_tasks: creating executor tasks PID 16429 run_blocking_tasks: waiting for executor tasks PID 16430 blocks(0): running PID 16431 blocks(1): running PID 16432 blocks(2): running PID 16430 blocks(0): done PID 16432 blocks(2): done PID 16431 blocks(1): done PID 16430 blocks(3): running PID 16432 blocks(4): running PID 16431 blocks(5): running PID 16431 blocks(5): done PID 16432 blocks(4): done PID 16430 blocks(3): done PID 16429 run_blocking_tasks: results: [4, 0, 16, 1, 9, 25] PID 16429 run_blocking_tasks: exiting
|
本文翻译自《The Python3 Standard Library By Example》asyncio相关章节