asyncio之同步原语

同步原语

尽管asyncio程序通常是单线程运行的,它们仍然可以构建并发程序。基于来自I/O和其他外部事件的延迟和中断,每个协程或Task可能以不可预知的顺序执行。为了支持并发安全,asyncio实现了一些低级的原子操作,类似于threading和multiprocessing模块中的。

Locks

Lock可以用来保护对共享资源的访问,只有锁的持有者可以使用该资源。多次尝试获取锁的操作将被阻止,因此一次只有一个持有者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import asyncio
import functools
def unlock(lock):
print('callback releasing lock')
lock.release()
async def coro1(lock):
print('coro1 waiting for the lock')
with await lock:
print('coro1 acquired lock')
print('coro1 released lock')
async def coro2(lock):
print('coro2 waiting for the lock')
await lock
try:
print('coro2 acquired lock')
finally:
print('coro2 released lock')
lock.release()
async def main(loop):
# 创建并获取一个共享锁
lock = asyncio.Lock()
print('acquiring the lock before starting coroutines')
await lock.acquire()
print('lock acquired: {}'.format(lock.locked()))
# 安排一个回调来解锁锁
loop.call_later(0.1, functools.partial(unlock, lock))
# 运行想要使用锁的协程
print('waiting for coroutines')
await asyncio.wait([coro1(lock), coro2(lock)]),
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()

锁可以直接被调用,使用await获取锁,当任务完成时调用release()方法释放锁。它们也可以使用with await关键字作为异步上下文管理器。

1
2
3
4
5
6
7
8
9
10
11
12
$ python3 asyncio_lock.py
acquiring the lock before starting coroutines
lock acquired: True
waiting for coroutines
coro1 waiting for the lock
coro2 waiting for the lock
callback releasing lock
coro1 acquired lock
coro1 released lock
coro2 acquired lock
coro2 released lock

Events

asyncio.Event是基于threading.Event的,用于允许多个消费者等待发生的事情,而不寻找与通知相关联的特定值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
import functools
def set_event(event):
print('setting event in callback')
event.set()
async def coro1(event):
print('coro1 waiting for event')
await event.wait()
print('coro1 triggered')
async def coro2(event):
print('coro2 waiting for event')
await event.wait()
print('coro2 triggered')
async def main(loop):
event = asyncio.Event()
print('event start state: {}'.format(event.is_set()))
loop.call_later(0.1, functools.partial(set_event, event))
await asyncio.wait([coro1(event), coro2(event)])
print('event end state: {}'.format(event.is_set()))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()

和Lock一样,coro1()和coro2()都在等待Event被设置,不同之处在于两者都可以在事件状态改变时立即开始,并且他们不需要获取对event对象的唯一保持。

1
2
3
4
5
6
7
8
9
$ python3 asyncio_event.py
event start state: False
coro2 waiting for event
coro1 waiting for event
setting event in callback
coro2 triggered
coro1 triggered
event end state: True

Conditions

Condition的工作方式跟Event的很类似,除了不是通知所有等待的协程,唤醒的等待者的数量是通过notify()参数来控制的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import asyncio
async def consumer(condition, n):
with await condition:
print('consumer {} is waiting'.format(n))
await condition.wait()
print('consumer {} triggered'.format(n))
print('ending consumer {}'.format(n))
async def manipulate_condition(condition):
print('starting manipulate_condition')
# 暂停让消费者开始
await asyncio.sleep(0.1)
for i in range(1, 3):
with await condition:
print('notifying {} consumers'.format(i))
condition.notify(n=i)
await asyncio.sleep(0.1)
with await condition:
print('notifying remaining consumers')
condition.notify_all()
print('ending manipulate_condition')
async def main(loop):
# 创建一个condition
condition = asyncio.Condition()
# 设置任务监视condition
consumers = [consumer(condition, i) for i in range(5)]
# 安排一个任务来操作condition变量
loop.create_task(manipulate_condition(condition))
# 等待消费者运行结束
await asyncio.wait(consumers)
event_loop = asyncio.get_event_loop()
try:
result = event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()

这个例子中我们启动了五个监视Condition的消费者,每一个都使用 wait()方法等待让他们继续运行的通知。manipulate_condition()通知了一个消费者然后两个消费者,最后所有的消费者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
$ python3 asyncio_condition.py
starting manipulate_condition
consumer 3 is waiting
consumer 1 is waiting
consumer 2 is waiting
consumer 0 is waiting
consumer 4 is waiting
notifying 1 consumers
consumer 3 triggered
ending consumer 3
notifying 2 consumers
consumer 1 triggered
ending consumer 1
consumer 2 triggered
ending consumer 2
notifying remaining consumers
ending manipulate_condition
consumer 0 triggered
ending consumer 0
consumer 4 triggered
ending consumer 4

Queues

asyncio.Queue提供了协程的先入先出的数据结构,类似于多线程的queue.Queue和多进程的multiprocessing.Queue。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import asyncio
async def consumer(n, q):
print('consumer {}: starting'.format(n))
while True:
print('consumer {}: waiting for item'.format(n))
item = await q.get()
print('consumer {}: has item {}'.format(n, item))
if item is None:
# None是停止的信号
q.task_done()
break
else:
await asyncio.sleep(0.01 * item)
q.task_done()
print('consumer {}: ending'.format(n))
async def producer(q, num_workers):
print('producer: starting')
# 添加一些数字到队列模拟任务
for i in range(num_workers * 3):
await q.put(i)
print('producer: added task {} to the queue'.format(i))
# 添加None到队列表示停止信号
print('producer: adding stop signals to the queue')
for i in range(num_workers):
await q.put(None)
print('producer: waiting for queue to empty')
await q.join()
print('producer: ending')
async def main(loop, num_consumers):
# 创建具有固定大小的队列,直到有空位之前队列都会被阻塞
q = asyncio.Queue(maxsize=num_consumers)
# 调度消费者任务
consumers = [loop.create_task(consumer(i, q)) for i in range(num_consumers)]
# 调度生产者任务
prod = loop.create_task(producer(q, num_consumers))
# 等待所有协程完成
await asyncio.wait(consumers + [prod])
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop, 2))
finally:
event_loop.close()

通过put()函数添加元素,get()函数获取元素都是异步操作,因为队列大小可能是固定的(阻塞添加),或者队列可能是空的(阻塞调用来获取项目)。

本文翻译自《The Python3 Standard Library By Example》asyncio相关章节

文章目录
  1. 1. Locks
  2. 2. Events
  3. 3. Conditions
  4. 4. Queues
|