asyncio之并发执行Tasks

并发执行Tasks

Tasks是与事件循环交互的主要方式之一。Tasks会包装协程并对其运行状态进行追踪。Tasks是Future的子类,所以其他协程也能够等待它们,并且每一个Task在任务完成后都有一个可被检索到的结果。

启动Task

为了启动一个Task,可以使用create_task()来创建一个Task实例。只要循环运行并且协程不返回,由此产生的Task将作为由事件循环管理的并发操作的一部分运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
async def task_func():
print('in task_func')
return 'the result'
async def main(loop):
print('creating task')
task = loop.create_task(task_func())
print('waiting for {!r}'.format(task))
return_value = await task
print('task completed {!r}'.format(task))
print('return value: {!r}'.format(return_value))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()

这个例子中,在main()函数退出之前等待Task返回结果。

1
2
3
4
5
6
7
8
9
$ python3 asyncio_create_task.py
creating task
waiting for <Task pending coro=<task_func() running at
asyncio_create_task.py:12>>
in task_func
task completed <Task finished coro=<task_func() done, defined at
asyncio_create_task.py:12> result='the result'>
return value: 'the result'

取消Task

通过保留从create_task()返回的Task对象,可以在完成之前取消Task的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
async def task_func():
print('in task_func')
return 'the result'
async def main(loop):
print('creating task')
task = loop.create_task(task_func())
print('canceling task')
task.cancel()
print('canceled task {!r}'.format(task))
try:
await task
except asyncio.CancelledError:
print('caught error from canceled task')
else:
print('task result: {!r}'.format(task.result()))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()

这个例子中,在启动事件循环之前创建然后又取消了Task,在获取结果时从run_until_complete()中抛出了CancelledError异常。

1
2
3
4
5
6
7
$ python3 asyncio_cancel_task.py
creating task
canceling task
canceled task <Task cancelling coro=<task_func() running at
asyncio_cancel_task.py:12>>
caught error from canceled task

如果一个Task在等待其他并发操作时被取消了,通过在等待的地方引发CancelledError异常来通知Task的取消。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
async def task_func():
print('in task_func, sleeping')
try:
await asyncio.sleep(1)
except asyncio.CancelledError:
print('task_func was canceled')
raise
return 'the result'
def task_canceller(t):
print('in task_canceller')
t.cancel()
print('canceled the task')
async def main(loop):
print('creating task')
task = loop.create_task(task_func())
loop.call_soon(task_canceller, task)
try:
await task
except asyncio.CancelledError:
print('main() also sees task as canceled')
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()

如有必要,捕获异常提供了清理已经完成的工作的机会。

1
2
3
4
5
6
7
8
$ python3 asyncio_cancel_task2.py
creating task
in task_func, sleeping
in task_canceller
canceled the task
task_func was canceled
main() also sees task as canceled

从协程创建Task

ensure_future()函数返回一个绑定到协程执行的任务。然后可以将Task实例传递给其他代码,在不知道原始协程是如何构造或调用的情况下,可以等待它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
async def wrapped():
print('wrapped')
return 'result'
async def inner(task):
print('inner: starting')
print('inner: waiting for {!r}'.format(task))
result = await task
print('inner: task returned {!r}'.format(result))
async def starter():
print('starter: creating task')
task = asyncio.ensure_future(wrapped())
print('starter: waiting for inner')
await inner(task)
print('starter: inner returned')
event_loop = asyncio.get_event_loop()
try:
print('entering event loop')
result = event_loop.run_until_complete(starter())
finally:
event_loop.close()

注意,协程被发送到 ensure_future()时没有被启动,直到对它使用await才允许被执行。

1
2
3
4
5
6
7
8
9
10
11
$ python3 asyncio_ensure_future.py
entering event loop
starter: creating task
starter: waiting for inner
inner: starting
inner: waiting for <Task pending coro=<wrapped() running at
asyncio_ensure_future.py:12>>
wrapped
inner: task returned 'result'
starter: inner returned

本文翻译自《The Python3 Standard Library By Example》asyncio相关章节

文章目录
  1. 1. 启动Task
  2. 2. 取消Task
  3. 3. 从协程创建Task
|