asyncio之协议抽象类的异步I/O

协议抽象类的异步I/O

到目前为止,这些例子都避免了混合并发和I/O操作,一次只关注一个概念。然后当I/O阻塞时切换上下文是asyncio的主要是用情景之一。前面已经介绍过并发的概念,本章节将介绍两个实现简单的echo服务器和客户端,跟socket和socketserver章节使用的例子类似。客户端可以连接到服务器,发送一些数据,然后接收相同的数据作为响应。每次发起I/O操作时,执行代码都会放弃对事件循环的控制,允许其他任务运行,直到I/O准备就绪。

Echo Server

服务器通过导入设置所需的模块asyncio和logging来启动,然后创建一个事件循环对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
import logging
import sys
SERVER_ADDRESS = ('localhost', 10000)
logging.basicConfig(
level=logging.DEBUG,
format='%(name)s: %(message)s',
stream=sys.stderr,
)
log = logging.getLogger('main')
event_loop = asyncio.get_event_loop()

然后定义一个asyncio.Protocol的子类来处理客户端通信。协议对象的方法是基于与服务器socket关联的事件来调用的。

1
class EchoServer(asyncio.Protocol):

每个新的客户端连接都会触发调用connection_made()。transport参数是asyncio.Transport的一个实例,它提供了使用socket进行异步I/O的抽象。不同类型的通信提供不同的传输实现,全部使用相同的API。例如,有单独的传输类用于处理socket和用于处理子进程的管道。传入客户端的地址可通过一个特殊实现的方法get_extra_info()从传输中获得。

1
2
3
4
5
def connection_made(self, transport):
self.transport = transport
self.address = transport.get_extra_info('peername')
self.log = logging.getLogger('EchoServer_{}_{}'.format(*self.address))
self.log.debug('connection accepted')

连接建立后,当数据从客户端发送到服务器调用协议的data_received()方法传递数据进行处理。数据以字节字符串形式进行传递,应用程序以适当的方式解码。在这里记录结果,然后通过调用transport.write()立即将响应发送回客户端。

1
2
3
4
def data_received(self, data):
self.log.debug('received {!r}'.format(data))
self.transport.write(data)
self.log.debug('sent {!r}'.format(data))

一些传输支持特殊的文件结束指示符(EOF)。当遇到EOF时,eof_received()被调用。在这里的实现中,EOF被发送回客户端以表明它已被接收。因为不是所有的传输都支持明确的EOF,该协议首先询问运输工具是否安全发送EOF。

1
2
3
4
def eof_received(self):
self.log.debug('received EOF')
if self.transport.can_write_eof():
self.transport.write_eof()

当连接关闭时,通常或者由于错误,协议的connection_lost()方法将会被调用。如果出现错误,则参数包含适当的异常对象。否则它是None。

1
2
3
4
5
6
def connection_lost(self, error):
if error:
self.log.error('ERROR: {}'.format(error))
else:
self.log.debug('closing')
super().connection_lost(error)

通过两个步骤来启动服务器,首先,应用程序通知事件循环,使用协议类以及要侦听的主机名和socket创建一个新的服务器对象。create_server()方法是一个协程,所以结果必须由事件循环处理才能真正启动服务器。完成协程会生成绑定到事件循环的asyncio.Server实例。

1
2
3
4
5
# 创建服务器,然后让循环完成协程
factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS)
# 启动真正的事件循环
server = event_loop.run_until_complete(factory)
log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))

然后,需要运行事件循环才能处理事件并处理客户端请求。对于长期运行的服务,run_forever()方法是最简单的方法。当事件循环停止时,无论是通过应用程序代码还是通过发信号通知进程,都可以关闭服务器以正确清理socket,然后在程序退出之前事件循环可以关闭,以完成处理任何其他的协程。

1
2
3
4
5
6
7
8
9
# 永久运行事件循环以处理所有连接
try:
event_loop.run_forever()
finally:
log.debug('closing server')
server.close()
event_loop.run_until_complete(server.wait_closed())
log.debug('closing event loop')
event_loop.close()

Echo Client

使用协议类构建客户端与构建服务器非常相似。代码再次通过导入设置所需的模块asyncio和logging来启动,然后创建一个事件循环对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio
import functools
import logging
import sys
MESSAGES = [
b'This is the message. ',
b'It will be sent ',
b'in parts.',
]
SERVER_ADDRESS = ('localhost', 10000)
logging.basicConfig(
level=logging.DEBUG,
format='%(name)s: %(message)s',
stream=sys.stderr,
)
log = logging.getLogger('main')
event_loop = asyncio.get_event_loop

客户端协议类定义与服务器相同的方法,具有不同的实现。类的构造函数接受两个参数,要发送的消息的列表以及用于通过从服务器接收响应来发信号通知客户已经完成了一个工作周期的Future实例。

1
2
3
4
5
6
7
class EchoClient(asyncio.Protocol):
def __init__(self, messages, future):
super().__init__()
self.messages = messages
self.log = logging.getLogger('EchoClient')
self.f = future

当客户端成功连接到服务器时,它立即开始通信。尽管底层网络代码可以将多个消息组合成一个传输,但是消息序列一次只能发送一个消息。当所有的消息都发送完成时,发送一个EOF。
虽然看起来数据全部是立即发送,实际上,传输对象缓冲传出数据,并在socket的缓冲区准备好接收数据时设置一个实际传输的回调。这一切都是透明处理的,所以应用程序代码可以像I/O操作马上发生一样写入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def connection_made(self, transport):
self.transport = transport
self.address = transport.get_extra_info('peername')
self.log.debug(
'connecting to {} port {}'.format(*self.address)
)
# This could be transport.writelines() except that
# would make it harder to show each part of the message
# being sent.
for msg in self.messages:
transport.write(msg)
self.log.debug('sending {!r}'.format(msg))
if transport.can_write_eof():
transport.write_eof()

当收到来自服务器的响应时,记录下来。

1
2
def data_received(self, data):
self.log.debug('received {!r}'.format(data))

当接收到文件结束标记或从服务器端关闭连接时,本地传输对象关闭,Future对象通过设置结果被标记为已完成。

1
2
3
4
5
6
7
8
9
10
11
12
def eof_received(self):
self.log.debug('received EOF')
self.transport.close()
if not self.f.done():
self.f.set_result(True)
def connection_lost(self, exc):
self.log.debug('server closed connection')
self.transport.close()
if not self.f.done():
self.f.set_result(True)
super().connection_lost(exc)

协议类通常被传递给事件循环来创建连接。在这种情况下,因为事件循环无法将多余的参数传递给协议构造函数,有必要创建一个partial来包装客户端类并传递消息列表和Future实例进行发送。然后在调用create_connection()建立客户端连接时,使用新的可调用代替类。

1
2
3
4
5
6
7
8
9
10
11
client_completed = asyncio.Future()
client_factory = functools.partial(
EchoClient,
messages=MESSAGES,
future=client_completed,
)
factory_coroutine = event_loop.create_connection(
client_factory,
*SERVER_ADDRESS,
)

要启动客户端运行,事件循环被调用一次与协程创建客户端,然后再与Future实例交给客户端进行通信。使用这样的两个调用可以避免在客户端程序中产生一个无限循环,在完成与服务器的通信后,可能要退出。如果只有第一个调用被用来等待协程来创建客户端,它可能不会处理所有的响应数据,并正确地清理到服务器的连接。

1
2
3
4
5
6
7
log.debug('waiting for client to complete')
try:
event_loop.run_until_complete(factory_coroutine)
event_loop.run_until_complete(client_completed)
finally:
log.debug('closing event loop')
event_loop.close()

输出

在一个窗口中运行服务器,在另一个窗口中运行客户端将生成以下输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
$ python3 asyncio_echo_client_protocol.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoClient: connecting to ::1 port 10000
EchoClient: sending b'This is the message. '
EchoClient: sending b'It will be sent '
EchoClient: sending b'in parts.'
EchoClient: received b'This is the message. It will be sent in parts.'
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop
$ python3 asyncio_echo_client_protocol.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoClient: connecting to ::1 port 10000
EchoClient: sending b'This is the message. '
EchoClient: sending b'It will be sent '
EchoClient: sending b'in parts.'
EchoClient: received b'This is the message. It will be sent in parts.'
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop
$ python3 asyncio_echo_client_protocol.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoClient: connecting to ::1 port 10000
EchoClient: sending b'This is the message. '
EchoClient: sending b'It will be sent '
EchoClient: sending b'in parts.'
EchoClient: received b'This is the message. It will be sent in parts.'
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop

尽管客户端总是单独发送消息,但是客户端第一次运行服务器却会收到一条大消息,并将消息回传给客户端。这些结果在后续运行中会有所不同,具体取决于网络的繁忙程度以及在准备好所有数据之前是否刷新了网络缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
$ python3 asyncio_echo_server_protocol.py
asyncio: Using selector: KqueueSelector
main: starting up on localhost port 10000
EchoServer_::1_63347: connection accepted
EchoServer_::1_63347: received b'This is the message. It will be sent in parts.'
EchoServer_::1_63347: sent b'This is the message. It will be sent in parts.'
EchoServer_::1_63347: received EOF
EchoServer_::1_63347: closing
EchoServer_::1_63387: connection accepted
EchoServer_::1_63387: received b'This is the message. '
EchoServer_::1_63387: sent b'This is the message. '
EchoServer_::1_63387: received b'It will be sent in parts.'
EchoServer_::1_63387: sent b'It will be sent in parts.'
EchoServer_::1_63387: received EOF
EchoServer_::1_63387: closing
EchoServer_::1_63389: connection accepted
EchoServer_::1_63389: received b'This is the message. It will be sent '
EchoServer_::1_63389: sent b'This is the message. It will be sent '
EchoServer_::1_63389: received b'in parts.'
EchoServer_::1_63389: sent b'in parts.'
EchoServer_::1_63389: received EOF
EchoServer_::1_63389: closing

其他实例

asyncio官方文档也提供了相应的例子,下面展示给大家:
TCP Echo Server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio
class EchoServerClientProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
print('Send: {!r}'.format(message))
self.transport.write(data)
print('Close the client socket')
self.transport.close()
loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

TCP Echo Client:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, loop):
self.message = message
self.loop = loop
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
print('Stop the event loop')
self.loop.stop()
loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
'127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

UDP Echo Server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
class EchoServerProtocol:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
print('Received %r from %s' % (message, addr))
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
loop = asyncio.get_event_loop()
print("Starting UDP server")
# One protocol instance will be created to serve all client requests
listen = loop.create_datagram_endpoint(
EchoServerProtocol, local_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(listen)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
transport.close()
loop.close()

UDP Echo Client:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import asyncio
class EchoClientProtocol:
def __init__(self, message, loop):
self.message = message
self.loop = loop
self.transport = None
def connection_made(self, transport):
self.transport = transport
print('Send:', self.message)
self.transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print("Received:", data.decode())
print("Close the socket")
self.transport.close()
def error_received(self, exc):
print('Error received:', exc)
def connection_lost(self, exc):
print("Socket closed, stop the event loop")
loop = asyncio.get_event_loop()
loop.stop()
loop = asyncio.get_event_loop()
message = "Hello World!"
connect = loop.create_datagram_endpoint(
lambda: EchoClientProtocol(message, loop),
remote_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()

本文翻译自《The Python3 Standard Library By Example》asyncio相关章节

文章目录
  1. 1. Echo Server
  2. 2. Echo Client
  3. 3. 输出
  4. 4. 其他实例
|