asyncio之使用协程和流的异步I/O

使用协程和流的异步I/O

本节将继续实现简单回显服务器和客户端的两个示例程序的备用版本,使用协程和asyncio的流API而不是上一节的协议和传输类抽象。这些示例的操作比先前讨论的协议API的抽象级别要低,但是正在处理的事件是相似的。

Echo Server

服务器通过导入设置所需的模块asyncio和logging来启动,然后创建一个事件循环对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio
import logging
import sys
SERVER_ADDRESS = ('localhost', 10000)
logging.basicConfig(
level=logging.DEBUG,
format='%(name)s: %(message)s',
stream=sys.stderr,
)
log = logging.getLogger('main')
event_loop = asyncio.get_event_loop()

然后定义一个协程来处理通信,每次客户端连接时,将调用协程的新实例,以便在该函数内代码一次只与一个客户端进行通信。Python的语言运行时管理每个协程实例的状态,所以应用程序代码不需要管理任何额外的数据结构来跟踪不同的客户端。
协程的参数是与新连接关联的StreamReader和StreamWriter实例。和Transport一样,可以通过特殊的方法get_extra_info()来访问客户端地址。

1
2
3
4
async def echo(reader, writer):
address = writer.get_extra_info('peername')
log = logging.getLogger('echo_{}_{}'.format(*address))
log.debug('connection accepted')

虽然在建立连接时调用协程,但是可能没有任何数据要读取。为避免在读取时被阻塞,协程使用await和read()调用来等待,以允许事件循环继续处理其他任务,直到有数据要读取为止。

1
2
while True:
data = await reader.read(128)

如果客户端发送数据,它将从await中返回,并通过将其传递给写入器而返回给客户端。可以使用多个write()调用来缓冲输出数据,然后使用drain()来刷新结果。由于刷新网络I/O可以阻塞,再次await用于恢复对事件循环的控制,该事件循环监视写入socket并在可以发送更多数据时调用写入器。

1
2
3
4
5
if data:
log.debug('received {!r}'.format(data))
writer.write(data)
await writer.drain()
log.debug('sent {!r}'.format(data))

如果客户端没有发送数据,则read()返回一个空的字节字符串,表示连接已关闭。服务器需要关闭写入客户端的socket,然后协程可以返回以表明它已经完成。

1
2
3
4
else:
log.debug('closing')
writer.close()
return

有两个步骤来启动服务器。首先,应用程序告诉事件循环使用协程和主机名以及要监听的socket创建一个新的服务器对象。start_server()方法本身就是一个协程,所以结果必须由事件循环处理才能真正启动服务器。完成协程会生成绑定到事件循环的asyncio.Server实例。

1
2
3
factory = asyncio.start_server(echo, *SERVER_ADDRESS)
server = event_loop.run_until_complete(factory)
log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))

然后,需要运行事件循环才能处理事件并处理客户端请求。对于长期运行的服务,run_forever()方法是最简单的方法。当事件循环停止时,通过应用程序代码或通过发信号通知进程,可以关闭服务器并正确的清理socket,然后在程序退出之前关闭事件循环以完成处理任何其他协程。

1
2
3
4
5
6
7
8
9
10
try:
event_loop.run_forever()
except KeyboardInterrupt:
pass
finally:
log.debug('closing server')
server.close()
event_loop.run_until_complete(server.wait_closed())
log.debug('closing event loop')
event_loop.close()

Echo Client

使用协程来构建客户端与构建服务器非常相似。代码再次通过导入设置所需的模块asyncio和logging来启动,然后创建一个事件循环对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import logging
import sys
MESSAGES = [
b'This is the message. ',
b'It will be sent ',
b'in parts.',
]
SERVER_ADDRESS = ('localhost', 10000)
logging.basicConfig(
level=logging.DEBUG,
format='%(name)s: %(message)s',
stream=sys.stderr,
)
log = logging.getLogger('main')
event_loop = asyncio.get_event_loop()

echo_client协程使用参数告诉它服务器在哪里以及要发送什么消息。

1
async def echo_client(address, messages):

协程在任务启动时被调用,但是它没有活动的连接来处理。因此,第一步是让客户建立自己的连接。它使用await来避免在open_connection()协程运行时阻塞其他活动。

1
2
3
log = logging.getLogger('echo_client')
log.debug('connecting to {} port {}'.format(*address))
reader, writer = await asyncio.open_connection(*address)

open_connection()协程返回与新socket关联的StreamReader和StreamWriter实例。下一步是使用写入器将数据发送到服务器。与在服务器中一样,写入器将缓冲输出数据,直到socket准备就绪,或使用drain()来刷新结果。由于刷新网络I /O可以阻塞,再次等待用于恢复对事件循环的控制,该事件循环监视写入socket并在可以发送更多数据时调用写入器。

1
2
3
4
5
6
for msg in messages:
writer.write(msg)
log.debug('sending {!r}'.format(msg))
if writer.can_write_eof():
writer.write_eof()
await writer.drain()

接下来,客户端通过尝试读取数据来查找来自服务器的响应,直到没有数据可读的为止。为避免单个read()调用阻塞,await将控制权限交给了事件循环。如果服务器已经发送数据,则会被记录。如果服务器没有发送数据,则read()返回一个空的字节字符串,表示连接已关闭。客户端需要关闭socket才能发送到服务器,然后返回以表明它已经完成。

1
2
3
4
5
6
7
8
9
log.debug('waiting for response')
while True:
data = await reader.read(128)
if data:
log.debug('received {!r}'.format(data))
else:
log.debug('closing')
writer.close()
return

要启动客户端,使用协程来调用事件循环来创建客户端。使用run_until_complete()可以避免在客户端程序中产生无限循环。与协议示例不同,当协程完成时,不需要单独的future来发信号,因为echo_client()包含所有的客户端逻辑本身,并且只有在收到响应并关闭服务器连接之后才会返回。

1
2
3
4
5
6
7
try:
event_loop.run_until_complete(
echo_client(SERVER_ADDRESS, MESSAGES)
)
finally:
log.debug('closing event loop')
event_loop.close()

输出

在一个窗口中运行服务器,在另一个窗口中运行客户端将生成以下输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
$ python3 asyncio_echo_client_coroutine.py
asyncio: Using selector: KqueueSelector
echo_client: connecting to localhost port 10000
echo_client: sending b'This is the message. '
echo_client: sending b'It will be sent '
echo_client: sending b'in parts.'
echo_client: waiting for response
echo_client: received b'This is the message. It will be sent in parts.'
echo_client: closing
main: closing event loop
$ python3 asyncio_echo_client_coroutine.py
asyncio: Using selector: KqueueSelector
echo_client: connecting to localhost port 10000
echo_client: sending b'This is the message. '
echo_client: sending b'It will be sent '
echo_client: sending b'in parts.'
echo_client: waiting for response
echo_client: received b'This is the message. It will be sent in parts.'
echo_client: closing
main: closing event loop
$ python3 asyncio_echo_client_coroutine.py
asyncio: Using selector: KqueueSelector
echo_client: connecting to localhost port 10000
echo_client: sending b'This is the message. '
echo_client: sending b'It will be sent '
echo_client: sending b'in parts.'
echo_client: waiting for response
echo_client: received b'This is the message. It will be sent '
echo_client: received b'in parts.'
echo_client: closing
main: closing event loop

虽然客户端总是单独发送消息,但客户端运行服务器的前两次会收到一条大消息,并将消息回送给客户端。这些结果在后续运行中会有所不同,具体取决于网络的繁忙程度以及在准备好所有数据之前是否刷新网络缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
$ python3 asyncio_echo_server_coroutine.py
asyncio: Using selector: KqueueSelector
main: starting up on localhost port 10000
echo_::1_64624: connection accepted
echo_::1_64624: received b'This is the message. It will be sent in parts.'
echo_::1_64624: sent b'This is the message. It will be sent in parts.'
echo_::1_64624: closing
echo_::1_64626: connection accepted
echo_::1_64626: received b'This is the message. It will be sent in parts.'
echo_::1_64626: sent b'This is the message. It will be sent in parts.'
echo_::1_64626: closing
echo_::1_64627: connection accepted
echo_::1_64627: received b'This is the message. It will be sent '
echo_::1_64627: sent b'This is the message. It will be sent '
echo_::1_64627: received b'in parts.'
echo_::1_64627: sent b'in parts.'
echo_::1_64627: closing

其他实例

上面作者提供的例子稍微有些复杂,还有一些其他实现Echo服务器和客户端的方式,下面是笔者自己收集的一些代码示例,供大家参看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from socket import *
import asyncio
async def echo_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
sock.setblocking(False)
while True:
client, addr = await loop.sock_accept(sock)
print("connect from ", addr)
loop.create_task(echo_handler(client))
async def echo_handler(client):
with client:
while True:
try:
data = await loop.sock_recv(client, 10000)
if not data:
break
await loop.sock_sendall(client, str.encode("Got: ") + data)
except Exception as e:
client.close()
print("connection closed")
break
print("connection closed")
loop = asyncio.get_event_loop()
loop.create_task(echo_server(('0.0.0.0', 25000)))
loop.run_forever()

aysncio官方文档提供的两个示例:
TCP Echo Server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print("Received %r from %r" % (message, addr))
print("Send: %r" % message)
writer.write(data)
await writer.drain()
print("Close the client socket")
writer.close()
loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

TCP Echo Client:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
async def tcp_echo_client(message, loop):
reader, writer = await asyncio.open_connection('127.0.0.1', 8888, loop=loop)
print('Send: %r' % message)
writer.write(message.encode())
data = await reader.read(100)
print('Received: %r' % data.decode())
print('Close the socket')
writer.close()
message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message, loop))

获取HTTP响应头:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import urllib.parse
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
connect = asyncio.open_connection(url.hostname, 443, ssl=True)
else:
connect = asyncio.open_connection(url.hostname, 80)
reader, writer = await connect
query = ('HEAD {path} HTTP/1.0\r\n'
'Host: {hostname}\r\n'
'\r\n').format(path=url.path or '/', hostname=url.hostname)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print('HTTP header> %s' % line)
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(print_http_headers('http://www.baidu.com'))
loop.run_until_complete(task)
loop.close()

本文翻译自《The Python3 Standard Library By Example》asyncio相关章节

文章目录
  1. 1. Echo Server
  2. 2. Echo Client
  3. 3. 输出
  4. 4. 其他实例
|