asyncio之协程结合线程和进程

协程结合线程和进程

很多现有的库都没有准备好与asyncio一起使用。它们可能会阻塞,或者依赖于模块不可用的并发功能。但仍然可以使用这些库,在一个基于asyncio的应用程序中,通过使用concurrent.futures的executor ,可以在单独的线程或单独的进程中运行代码。

线程

事件循环的run_in_executor()方法需要executor 实例,可调用的常规调用以及要传递给可调用对象的任何参数。它返回一个Future,可以用来等待函数完成它的工作并返回一些东西。如果没有executor被传入,则创建一个ThreadPoolExecutor。这个例子明确地创建了一个执行器来限制它可用的工作线程的数量。
一个ThreadPoolExecutor启动它的工作线程,然后在一个线程中调用每个提供的函数。这个例子展示了如何结合run_in_executor()和wait()让协程让出控制权给事件循环,当阻塞函数在单独的线程中运行时,然后在这些函数完成时唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import asyncio
import concurrent.futures
import logging
import sys
import time
def blocks(n):
log = logging.getLogger('blocks({})'.format(n))
log.info('running')
time.sleep(0.1)
log.info('done')
return n ** 2
async def run_blocking_tasks(executor):
log = logging.getLogger('run_blocking_tasks')
log.info('starting')
log.info('creating executor tasks')
loop = asyncio.get_event_loop()
blocking_tasks = [
loop.run_in_executor(executor, blocks, i)
for i in range(6)
]
log.info('waiting for executor tasks')
completed, pending = await asyncio.wait(blocking_tasks)
results = [t.result() for t in completed]
log.info('results: {!r}'.format(results))
log.info('exiting')
if __name__ == '__main__':
# Configure logging to show the name of the thread
# where the log message originates.
logging.basicConfig(
level=logging.INFO,
format='%(threadName)10s %(name)18s: %(message)s',
stream=sys.stderr,
)
# Create a limited thread pool.
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=3,
)
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(
run_blocking_tasks(executor)
)
finally:
event_loop.close()

asyncio_executor_thread.py使用日志记录来方便地指示哪个线程和函数正在生成每个日志消息。因为每次调用block()都会使用一个单独的记录器,所以输出清楚地显示了相同的线程被重用,以不同的参数调用函数的多个副本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
$ python3 asyncio_executor_thread.py
MainThread run_blocking_tasks: starting
MainThread run_blocking_tasks: creating executor tasks
Thread-1 blocks(0): running
Thread-2 blocks(1): running
Thread-3 blocks(2): running
MainThread run_blocking_tasks: waiting for executor tasks
Thread-1 blocks(0): done
Thread-3 blocks(2): done
Thread-1 blocks(3): running
Thread-2 blocks(1): done
Thread-3 blocks(4): running
Thread-2 blocks(5): running
Thread-1 blocks(3): done
Thread-2 blocks(5): done
Thread-3 blocks(4): done
MainThread run_blocking_tasks: results: [16, 4, 1, 0, 25, 9]
MainThread run_blocking_tasks: exiting

进程

ProcessPoolExecutor的工作方式大致相同,创建一组工作进程而不是线程。使用单独的进程需要更多的系统资源,但是对于计算密集型操作,可以在每个CPU内核上运行单独的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if __name__ == '__main__':
# Configure logging to show the id of the process
# where the log message originates.
logging.basicConfig(
level=logging.INFO,
format='PID %(process)5s %(name)18s: %(message)s',
stream=sys.stderr,
)
# Create a limited process pool.
executor = concurrent.futures.ProcessPoolExecutor(
max_workers=3,
)
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(
run_blocking_tasks(executor)
)
finally:
event_loop.close()

从线程转移到进程所需的唯一更改是创建不同类型的executor。此示例还将日志记录格式字符串更改为包含进程标识而不是线程名称,以演示实际上这些任务正在单独的进程中运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
$ python3 asyncio_executor_process.py
PID 16429 run_blocking_tasks: starting
PID 16429 run_blocking_tasks: creating executor tasks
PID 16429 run_blocking_tasks: waiting for executor tasks
PID 16430 blocks(0): running
PID 16431 blocks(1): running
PID 16432 blocks(2): running
PID 16430 blocks(0): done
PID 16432 blocks(2): done
PID 16431 blocks(1): done
PID 16430 blocks(3): running
PID 16432 blocks(4): running
PID 16431 blocks(5): running
PID 16431 blocks(5): done
PID 16432 blocks(4): done
PID 16430 blocks(3): done
PID 16429 run_blocking_tasks: results: [4, 0, 16, 1, 9, 25]
PID 16429 run_blocking_tasks: exiting

本文翻译自《The Python3 Standard Library By Example》asyncio相关章节

文章目录
  1. 1. 线程
  2. 2. 进程
|