asyncio之用控制结构组建协程

用控制结构组建协程

内置的语言关键字await可以轻松管理一系列协程之间的线性控制流程,更复杂的结构允许一个协程等待好几个其他的并行协程完成,这些也可以使用asyncio提供的工具完成。

等待多个协程

把一个操作分成许多部分并分别执行是很有用的。比如说,下载多个远程资源或者查询远程APIs。在执行顺序无关紧要的情况下,可能会有任意数量的操作, wait()可以被用来暂停一个协程知道其他的操作全部完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
async def phase(i):
print('in phase {}'.format(i))
await asyncio.sleep(0.1 * i)
print('done with phase {}'.format(i))
return 'phase {} result'.format(i)
async def main(num_phases):
print('starting main')
phases = [phase(i) for i in range(num_phases)]
print('waiting for phases to complete')
completed, pending = await asyncio.wait(phases)
results = [t.result() for t in completed]
print('results: {!r}'.format(results))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(3))
finally:
event_loop.close()

在内部,wait()使用set保存它创建的Task实例,这导致它们开始和结束的顺序都不可预测。wait()的返回值是一个元组,包含两个set集合,已经结束和待处理的Tasks。

1
2
3
4
5
6
7
8
9
10
11
$ python3 asyncio_wait.py
starting main
waiting for phases to complete
in phase 0
in phase 1
in phase 2
done with phase 0
done with phase 1
done with phase 2
results: ['phase 1 result', 'phase 0 result', 'phase 2 result']

如果wait()使用了timeout值就会剩下还未处理的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
async def phase(i):
print('in phase {}'.format(i))
try:
await asyncio.sleep(0.1 * i)
except asyncio.CancelledError:
print('phase {} canceled'.format(i))
raise
else:
print('done with phase {}'.format(i))
return 'phase {} result'.format(i)
async def main(num_phases):
print('starting main')
phases = [phase(i) for i in range(num_phases)]
print('waiting 0.1 for phases to complete')
completed, pending = await asyncio.wait(phases, timeout=0.1)
print('{} completed and {} pending'.format(
len(completed), len(pending),
))
if pending:
print('canceling tasks')
for t in pending:
t.cancel()
print('exiting main')
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(3))
finally:
event_loop.close()

剩下的后台操作要么被取消,要么等待完成。在事件循环还在运行的时候让它们处于待处理状态会让它们继续执行,如果整个操作被认为是中止的,这可能是不可取的。在流程结束时将它们留在待处理状态将导致警告报告。

1
2
3
4
5
6
7
8
9
10
11
12
13
$ python3 asyncio_wait_timeout.py
starting main
waiting 0.1 for phases to complete
in phase 1
in phase 0
in phase 2
done with phase 0
1 completed and 2 pending
cancelling tasks
exiting main
phase 1 cancelled
phase 2 cancelled

从协程聚集结果

如果后台的执行步骤是明确的,并且这些执行步骤的结果很重要, 使用gather()等待多个操作可能更有用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
async def phase1():
print('in phase1')
await asyncio.sleep(2)
print('done with phase1')
return 'phase1 result'
async def phase2():
print('in phase2')
await asyncio.sleep(1)
print('done with phase2')
return 'phase2 result'
async def main():
print('starting main')
print('waiting for phases to complete')
results = await asyncio.gather(phase1(), phase2())
print('results: {!r}'.format(results))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main())
finally:
event_loop.close()

通过gather()创建的Tasks没有对外暴露,所以他们不能被取消。返回值是与传递给gather()参数顺序相同的结果列表,不管后台操作实际完成的顺序如何。

1
2
3
4
5
6
7
8
9
$ python3 asyncio_gather.py
starting main
waiting for phases to complete
in phase2
in phase1
done with phase2
done with phase1
results: ['phase1 result', 'phase2 result']

操作完成时进行处理

as_completed()是一个生成器函数管理给定的协程列表的执行并且在每一个任务运行完结束都会产生一个结果。和wait()一样,as_completed()是不能保证运行顺序的,但在执行其他操作之前,没有必要等待所有后台操作完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
async def phase(i):
print('in phase {}'.format(i))
await asyncio.sleep(0.5 - (0.1 * i))
print('done with phase {}'.format(i))
return 'phase {} result'.format(i)
async def main(num_phases):
print('starting main')
phases = [phase(i) for i in range(num_phases)]
print('waiting for phases to complete')
results = []
for next_to_complete in asyncio.as_completed(phases):
answer = await next_to_complete
print('received answer {!r}'.format(answer))
results.append(answer)
print('results: {!r}'.format(results))
return results
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(3))
finally:
event_loop.close()

在这个例子中启动了好几个后台操作,他们完成的顺序与他们开始的顺序相反。当生成器进行消费,事件循环通过await关键字等待协程结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ python3 asyncio_as_completed.py
starting main
waiting for phases to complete
in phase 0
in phase 2
in phase 1
done with phase 2
received answer 'phase 2 result'
done with phase 1
received answer 'phase 1 result'
done with phase 0
received answer 'phase 0 result'
results: ['phase 2 result', 'phase 1 result', 'phase 0 result']

本文翻译自《The Python3 Standard Library By Example》asyncio相关章节

文章目录
  1. 1. 等待多个协程
  2. 2. 从协程聚集结果
  3. 3. 操作完成时进行处理
|