asyncio之使用子进程

使用子进程

和其他程序或者进程一起工作的需求很常见,利用现有的代码而不用重写它,或者访问Python中没有的库或功能。与网络I/O一样,asyncio包含两个抽象,用于启动另一个程序,然后与之交互。

使用协议抽象子进程

这个例子使用一个协程启动一个进程来运行Unix命令df来查找本地磁盘上的可用空间。它使用subprocess_exec()启动进程,并将其绑定到一个协议类,该类知道如何读取df命令输出并解析它。协议类的方法根据子进程的I/O事件自动调用。由于stdin和stderr参数都设置为None,因此这些通信通道未连接到新进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import functools
async def run_df(loop):
print('in run_df')
cmd_done = asyncio.Future(loop=loop)
factory = functools.partial(DFProtocol, cmd_done)
proc = loop.subprocess_exec(
factory,
'df', '-hl',
stdin=None,
stderr=None,
)
try:
print('launching process')
transport, protocol = await proc
print('waiting for process to complete')
await cmd_done
finally:
transport.close()
return cmd_done.result()

DFProtocol类是SubprocessProtocol的子类,它定义了一个通过管道与另一个进程通信的API。done参数将成为调用者用来监视进程完成的Future。

1
2
3
4
5
6
7
8
class DFProtocol(asyncio.SubprocessProtocol):
FD_NAMES = ['stdin', 'stdout', 'stderr']
def __init__(self, done_future):
self.done = done_future
self.buffer = bytearray()
super().__init__()

与socket通信一样,当设置新进程的输入通道时,会调用connection_made()。transport参数是BaseSubprocessTransport子类的一个实例。如果进程配置为接收输入,它可以读取进程输出的数据并将数据写入进程的输入流。

1
2
3
def connection_made(self, transport):
print('process started {}'.format(transport.get_pid()))
self.transport = transport

当进程产生输出时,pipe_data_received()被调用,其中文件描述符是来自数据发送的文件,实际的数据是从管道中读取的。协议类将进程的标准输出通道的输出保存在缓冲区中供以后处理。

1
2
3
4
def pipe_data_received(self, fd, data):
print('read {} bytes from {}'.format(len(data), self.FD_NAMES[fd]))
if fd == 1:
self.buffer.extend(data)

当进程终止时,process_exited()被调用。通过调用get_returncode()可以从传输对象获得进程的退出代码。在这种情况下,如果没有错误报告,则在通过Future实例返回之前,可用的输出将被解码和解析。如果有错误,则结果被假设为空。设置将来的结果告诉run_df()该进程已经退出,所以它将清理并返回结果。

1
2
3
4
5
6
7
8
9
10
def process_exited(self):
print('process exited')
return_code = self.transport.get_returncode()
print('return code {}'.format(return_code))
if not return_code:
cmd_output = bytes(self.buffer).decode()
results = self._parse_results(cmd_output)
else:
results = []
self.done.set_result((return_code, results))

命令输出被解析成一系列字典,将每个输出行的标题名称映射到它们的值,并返回结果列表。

1
2
3
4
5
6
7
8
9
10
11
12
def _parse_results(self, output):
print('parsing results')
if not output:
return []
lines = output.splitlines()
headers = lines[0].split()
devices = lines[1:]
results = [
dict(zip(headers, line.split()))
for line in devices
]
return results

run_df()协程使用run_until_complete()运行,然后检查结果并打印每个设备上的可用空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
event_loop = asyncio.get_event_loop()
try:
return_code, results = event_loop.run_until_complete(
run_df(event_loop)
)
finally:
event_loop.close()
if return_code:
print('error exit {}'.format(return_code))
else:
print('\nFree space:')
for r in results:
print('{Mounted:25}: {Avail}'.format(**r))

下面的输出显示了所用步骤的顺序,以及运行系统的三个驱动器上的可用空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ python3 asyncio_subprocess_protocol.py
in run_df
launching process
process started 49675
waiting for process to complete
read 332 bytes from stdout
process exited
return code 0
parsing results
Free space:
/ : 233Gi
/Volumes/hubertinternal : 157Gi
/Volumes/hubert-tm : 2.3Ti

使用协程和流子进程

要使用协程直接运行进程,而不是通过协议子类访问它,调用create_subprocess_exec()并指定哪个stdout,stderr和stdin连接到管道。协程生成子进程的结果是一个Process实例,可用于操纵子进程或与之进行通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import asyncio.subprocess
async def run_df():
print('in run_df')
buffer = bytearray()
create = asyncio.create_subprocess_exec(
'df', '-hl',
stdout=asyncio.subprocess.PIPE,
)
print('launching process')
proc = await create
print('process started {}'.format(proc.pid))

在这个例子中,除了命令行参数外,df不需要任何输入,所以下一步就是读取所有的输出。有了Protocol,就无法控制一次读取多少数据。这个例子使用readline(),但是它也可以直接调用read()来读取不是面向行的数据。与协议示例一样,该命令的输出被缓冲,因此可以稍后进行解析。

1
2
3
4
5
6
7
while True:
line = await proc.stdout.readline()
print('read {!r}'.format(line))
if not line:
print('no more output from command')
break
buffer.extend(line)

当程序运行完成没有更多的输出时,readline()方法返回一个空的字节字符串。为确保正确清理进程,下一步是等待进程完全退出。

1
2
print('waiting for process to complete')
await proc.wait()

此时,可以检查退出状态,以确定是解析输出还是处理错误,因为它不产生输出。解析逻辑和前面的例子是一样的,但是是独立的函数(这里没有显示),因为没有协议类来隐藏它。在解析数据之后,返回结果和退出代码给来调用者。

1
2
3
4
5
6
7
8
9
return_code = proc.returncode
print('return code {}'.format(return_code))
if not return_code:
cmd_output = bytes(buffer).decode()
results = _parse_results(cmd_output)
else:
results = []
return (return_code, results)

主程序与基于协议的示例类似,因为实现更改在run_df()中是独立的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
event_loop = asyncio.get_event_loop()
try:
return_code, results = event_loop.run_until_complete(
run_df()
)
finally:
event_loop.close()
if return_code:
print('error exit {}'.format(return_code))
else:
print('\nFree space:')
for r in results:
print('{Mounted:25}: {Avail}'.format(**r))

由于df的输出可以一次读取一行,因此可以显示程序的进度。否则,输出看起来类似于前面的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ python3 asyncio_subprocess_coroutine.py
in run_df
launching process
process started 49678
read b'Filesystem Size Used Avail Capacity iused
ifree %iused Mounted on\n'
read b'/dev/disk2s2 446Gi 213Gi 233Gi 48% 55955082
61015132 48% /\n'
read b'/dev/disk1 465Gi 307Gi 157Gi 67% 80514922
41281172 66% /Volumes/hubertinternal\n'
read b'/dev/disk3s2 3.6Ti 1.4Ti 2.3Ti 38% 181837749
306480579 37% /Volumes/hubert-tm\n'
read b''
no more output from command
waiting for process to complete
return code 0
parsing results
Free space:
/ : 233Gi
/Volumes/hubertinternal : 157Gi
/Volumes/hubert-tm : 2.3Ti

发送数据到子进程

前面的两个例子都只使用一个单独的通信通道从第二个进程读取数据。但是通常需要将数据发送到一个命令进行处理。这个例子定义了一个协程来执行Unix命令tr来转换输入流中的字符。在这种情况下,tr用于将小写字母转换为大写字母。to_upper()协程采用事件循环和输入字符串作为参数。它产生了第二个进程运行”tr [:lower:] [:upper:]”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
import asyncio.subprocess
async def to_upper(input):
print('in to_upper')
create = asyncio.create_subprocess_exec(
'tr', '[:lower:]', '[:upper:]',
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
)
print('launching process')
proc = await create
print('pid {}'.format(proc.pid))

接下来,to_upper()使用Process的communications()方法将输入字符串发送到命令,并异步读取所有得到的输出。跟ubprocess.Popen版本的方法一样,communications()返回完整的输出字节字符串。如果一个命令可能产生的数据比可以放进内存的数据更多或者输出必须逐步处理,那么输入就不能一次完成,可以直接使用Process的stdin,stdout和stderr句柄,而不用调用communic()。

1
2
print('communicating with process')
stdout, stderr = await proc.communicate(input.encode())

I/O完成后,等待进程完全退出,确保正确清理。

1
2
print('waiting for process to complete')
await proc.wait()

然后可以检查返回代码,并将输出字节字符串解码,以便从协程中准备返回值。

1
2
3
4
5
6
7
8
return_code = proc.returncode
print('return code {}'.format(return_code))
if not return_code:
results = bytes(stdout).decode()
else:
results = ''
return (return_code, results)

程序的主要部分建立一个要转换的消息字符串,然后设置事件循环来运行to_upper()并打印结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
MESSAGE = """
This message will be converted
to all caps.
"""
event_loop = asyncio.get_event_loop()
try:
return_code, results = event_loop.run_until_complete(
to_upper(MESSAGE)
)
finally:
event_loop.close()
if return_code:
print('error exit {}'.format(return_code))
else:
print('Original: {!r}'.format(MESSAGE))
print('Changed : {!r}'.format(results))

输出显示操作的顺序,然后显示如何转换简单的文本消息。

1
2
3
4
5
6
7
8
9
10
$ python3 asyncio_subprocess_coroutine_write.py
in to_upper
launching process
pid 49684
communicating with process
waiting for process to complete
return code 0
Original: '\nThis message will be converted\nto all caps.\n'
Changed : '\nTHIS MESSAGE WILL BE CONVERTED\nTO ALL CAPS.\n'

本文翻译自《The Python3 Standard Library By Example》asyncio相关章节

文章目录
  1. 1. 使用协议抽象子进程
  2. 2. 使用协程和流子进程
  3. 3. 发送数据到子进程
|