传输和协议¶
序言
传输和协议由 low-level 事件循环API,例如 loop.create_connection()
.它们使用基于回调的编程风格,并支持网络或IPC协议(如HTTP)的高性能实现。
本质上,传输和协议应该只在库和框架中使用,而不应该在高级异步应用程序中使用。
本文档页包括 Transports 和 Protocols .
介绍
在最高层次上,交通部门关心的是 how 字节被传输,而协议决定 哪一个 要传输的字节(以及在某种程度上)。
另一种说法是:从传输的角度来看,传输是套接字(或类似的I/O端点)的抽象,而协议是应用程序的抽象。
另一个视图是传输接口和协议接口共同定义了一个抽象接口,用于使用网络I/O和进程间I/O。
传输和协议对象之间总是有1:1的关系:协议调用传输方法发送数据,而传输调用协议方法传递已接收的数据。
大多数面向连接的事件循环方法(例如 loop.create_connection()
)通常接受 protocol_factory 用于创建 协议 接受连接的对象,由*transport*对象。这种方法通常返回 (transport, protocol)
.
目录
本文档页包含以下部分:
这个 Transports 节文档异步
BaseTransport
,ReadTransport
,WriteTransport
,Transport
,DatagramTransport
和SubprocessTransport
类。这个 Protocols 节文档异步
BaseProtocol
,Protocol
,BufferedProtocol
,DatagramProtocol
和SubprocessProtocol
类。这个 Examples 部分展示了如何使用传输、协议和低级事件循环API。
传输工具¶
源代码: Lib/asyncio/transports.py
传输是由提供的类 asyncio
为了抽象出各种各样的沟通渠道。
传输对象始终由 asyncio event loop .
Asyncio实现TCP、UDP、SSL和子进程管道的传输。传输上可用的方法取决于传输的类型。
传输等级是 not thread safe .
传输层次结构¶
- class asyncio.BaseTransport¶
所有传输的基类。包含所有异步传输共享的方法。
- class asyncio.WriteTransport(BaseTransport)¶
只写连接的基本传输。
的实例 WriteTransport 类从返回
loop.connect_write_pipe()
事件循环方法,也可用于与子流程相关的方法,如loop.subprocess_exec()
.
- class asyncio.ReadTransport(BaseTransport)¶
用于只读连接的基本传输。
的实例 ReadTransport 类从返回
loop.connect_read_pipe()
事件循环方法,也可用于与子流程相关的方法,如loop.subprocess_exec()
.
- class asyncio.Transport(WriteTransport, ReadTransport)¶
表示双向传输的接口,如TCP连接。
用户不直接实例化传输;他们调用实用程序函数,将其传递给协议工厂以及创建传输和协议所需的其他信息。
的实例*transport*类从事件循环方法返回或由其使用,如
loop.create_connection()
,loop.create_unix_connection()
,loop.create_server()
,loop.sendfile()
等。
- class asyncio.DatagramTransport(BaseTransport)¶
数据报(UDP)连接的传输。
的实例 DatagramTransport 类从返回
loop.create_datagram_endpoint()
事件循环方法。
- class asyncio.SubprocessTransport(BaseTransport)¶
表示父操作系统进程与其子操作系统进程之间的连接的抽象。
的实例 SubprocessTransport 类从事件循环方法返回
loop.subprocess_shell()
和loop.subprocess_exec()
.
基础传输¶
- BaseTransport.close()¶
关闭传输。
如果传输具有用于传出数据的缓冲区,则缓冲数据将异步刷新。将不再接收数据。刷新所有缓冲数据后,协议的
protocol.connection_lost()
方法将用调用None
作为它的参数。
- BaseTransport.is_closing()¶
返回
True
如果传输关闭或关闭。
- BaseTransport.get_extra_info(name, default=None)¶
返回有关其使用的传输或基础资源的信息。
name 表示要获取的传输特定信息的字符串。
default 如果信息不可用,或者传输不支持使用给定的第三方事件循环实现或在当前平台上查询,则返回的值。
例如,以下代码尝试获取传输的基础套接字对象:
sock = transport.get_extra_info('socket') if sock is not None: print(sock.getsockopt(...))
在某些传输上可以查询的信息类别:
Socket:
'peername'
:套接字连接到的远程地址,结果为socket.socket.getpeername()
(None
出错时)'socket'
:socket.socket
实例'sockname'
: the socket's own address, result ofsocket.socket.getsockname()
SSL套接字:
'compression'
: the compression algorithm being used as a string, orNone
if the connection isn't compressed; result ofssl.SSLSocket.compression()
'cipher'
: a three-value tuple containing the name of the cipher being used, the version of the SSL protocol that defines its use, and the number of secret bits being used; result ofssl.SSLSocket.cipher()
'peercert'
: peer certificate; result ofssl.SSLSocket.getpeercert()
'sslcontext'
:ssl.SSLContext
实例'ssl_object'
:ssl.SSLObject
或ssl.SSLSocket
实例
管道:
'pipe'
管道对象
子过程:
'subprocess'
:subprocess.Popen
实例
- BaseTransport.set_protocol(protocol)¶
设置新协议。
只有当两个协议都被记录为支持交换机时,才应该执行交换协议。
- BaseTransport.get_protocol()¶
返回当前协议。
只读传输¶
- ReadTransport.is_reading()¶
返回
True
如果传输正在接收新数据。3.7 新版功能.
- ReadTransport.pause_reading()¶
暂停传输的接收端。不会向协议的
protocol.data_received()
方法直到resume_reading()
被称为。在 3.7 版更改: 方法是等幂的,即当传输已经暂停或关闭时可以调用它。
- ReadTransport.resume_reading()¶
恢复接收端。协议的
protocol.data_received()
如果某些数据可供读取,将再次调用方法。在 3.7 版更改: 方法是等幂的,即当传输已经在读取时可以调用它。
只写传输¶
- WriteTransport.abort()¶
立即关闭传输,而不等待挂起的操作完成。缓冲数据将丢失。将不再接收数据。协议的
protocol.connection_lost()
方法最终将用None
作为它的参数。
- WriteTransport.can_write_eof()¶
返回
True
如果传输支持write_eof()
,False
如果没有。
- WriteTransport.get_write_buffer_size()¶
返回传输使用的输出缓冲区的当前大小。
- WriteTransport.get_write_buffer_limits()¶
得到 high 和 low 用于写入流控制的水印。返回元组
(low, high)
在哪里? low 和 high 是字节的正数。使用
set_write_buffer_limits()
设置限制。3.4.2 新版功能.
- WriteTransport.set_write_buffer_limits(high=None, low=None)¶
设置 high 和 low 用于写入流控制的水印。
这两个值(以字节数度量)控制协议
protocol.pause_writing()
和protocol.resume_writing()
方法被调用。如果指定,低水印必须小于或等于高水印。既不 high 也不 low 可以是负数。pause_writing()
当缓冲区大小大于或等于 high 价值。如果写入已暂停,resume_writing()
当缓冲区大小小于或等于 low 价值。默认值是特定于实现的。如果只给出高水印,则低水印默认为小于或等于高水印的特定于实现的值。设置 high 零兵力 low 归零,并导致
pause_writing()
当缓冲区变为非空时调用。设置 low 零原因resume_writing()
只在缓冲区为空时调用。对任一限制使用零通常都是次优的,因为它减少了同时进行I/O和计算的机会。使用
get_write_buffer_limits()
达到极限。
- WriteTransport.write(data)¶
写一些 data 传输字节。
此方法不阻塞;它缓冲数据并安排数据异步发送。
- WriteTransport.writelines(list_of_data)¶
向传输写入数据字节列表(或任何不可写入的字节)。这在功能上等同于调用
write()
在iterable生成的每个元素上,但可以更有效地实现。
- WriteTransport.write_eof()¶
刷新所有缓冲数据后,关闭传输的写入端。数据仍可能被接收。
这种方法可以提高
NotImplementedError
如果传输(如ssl)不支持半封闭连接。
数据报传输¶
- DatagramTransport.sendto(data, addr=None)¶
发送 data 到远程对等机的字节 addr (与传输相关的目标地址)。如果 addr 是
None
,数据将发送到传输创建时给定的目标地址。此方法不阻塞;它缓冲数据并安排数据异步发送。
- DatagramTransport.abort()¶
立即关闭传输,而不等待挂起的操作完成。缓冲数据将丢失。将不再接收数据。协议的
protocol.connection_lost()
方法最终将用None
作为它的参数。
子流程传输¶
- SubprocessTransport.get_pid()¶
以整数形式返回子进程ID。
- SubprocessTransport.get_pipe_transport(fd)¶
返回与整数文件描述符对应的通信管道的传输 fd :
0
: readable streaming transport of the standard input (stdin), orNone
if the subprocess was not created withstdin=PIPE
1
: writable streaming transport of the standard output (stdout), orNone
if the subprocess was not created withstdout=PIPE
2
: writable streaming transport of the standard error (stderr), orNone
if the subprocess was not created withstderr=PIPE
其他 fd :
None
- SubprocessTransport.get_returncode()¶
以整数形式返回子进程返回代码,或者
None
如果它没有返回,这与subprocess.Popen.returncode
属性。
- SubprocessTransport.kill()¶
终止子进程。
在POSIX系统上,函数将sigkill发送到子进程。在Windows上,此方法是
terminate()
.
- SubprocessTransport.send_signal(signal)¶
发送 信号 子进程的编号,如
subprocess.Popen.send_signal()
.
- SubprocessTransport.terminate()¶
停止子进程。
在POSIX系统上,此方法将sigterm发送到子进程。在Windows上,调用Windows API函数TerminateProcess()停止子进程。
协议¶
Asyncio提供了一组抽象的基类,这些类应该用于实现网络协议。这些类应该与 transports .
抽象基本协议类的子类可以实现一些或所有方法。所有这些方法都是回调:它们由某些事件的传输调用,例如在接收到某些数据时。基本协议方法应该由相应的传输调用。
基本协议¶
- class asyncio.BaseProtocol¶
具有所有协议共享的方法的基本协议。
- class asyncio.Protocol(BaseProtocol)¶
用于实现流协议(TCP、Unix套接字等)的基类。
- class asyncio.BufferedProtocol(BaseProtocol)¶
用于实现流协议的基类,并手动控制接收缓冲区。
- class asyncio.DatagramProtocol(BaseProtocol)¶
用于实现数据报(UDP)协议的基类。
- class asyncio.SubprocessProtocol(BaseProtocol)¶
用于实现与子进程(单向管道)通信的协议的基类。
基本协议¶
所有异步协议都可以实现基本协议回调。
连接回调
在所有协议上调用连接回调,每次成功连接只调用一次。所有其他协议回调只能在这两个方法之间调用。
- BaseProtocol.connection_made(transport)¶
在建立连接时调用。
这个*transport*参数是表示连接的传输。协议负责存储对其传输的引用。
流控制回调
流控制回调可以由传输调用,以暂停或恢复由协议执行的写入操作。
参见 set_write_buffer_limits()
方法获取更多详细信息。
- BaseProtocol.pause_writing()¶
当传输的缓冲区超过高水位线时调用。
- BaseProtocol.resume_writing()¶
当传输的缓冲区低于低水位线时调用。
如果缓冲区大小等于高水印, pause_writing()
未调用:缓冲区大小必须严格超过。
相反地, resume_writing()
当缓冲区大小等于或小于低水印时调用。这些结束条件对于确保当任何一个标记为零时事情按预期进行都很重要。
流协议¶
事件方法,例如 loop.create_server()
, loop.create_unix_server()
, loop.create_connection()
, loop.create_unix_connection()
, loop.connect_accepted_socket()
, loop.connect_read_pipe()
和 loop.connect_write_pipe()
接受返回流协议的工厂。
- Protocol.data_received(data)¶
当接收到某些数据时调用。 data 是包含传入数据的非空字节对象。
数据是否被缓冲、分块或重新组装取决于传输。一般来说,您不应该依赖于特定的语义,而应该使您的解析具有通用性和灵活性。但是,总是以正确的顺序接收数据。
当连接打开时,可以任意多次调用该方法。
然而,
protocol.eof_received()
最多调用一次。一次 eof_received() 被称为data_received()
不再调用。
- Protocol.eof_received()¶
当另一端发出信号时调用,它将不再发送任何数据(例如通过调用
transport.write_eof()
,如果另一端也使用Asyncio)。此方法可能返回一个错误值(包括
None
,在这种情况下,传输将自动关闭。相反,如果此方法返回一个真值,则使用的协议决定是否关闭传输。因为默认实现返回None
,它隐式关闭连接。一些传输(包括SSL)不支持半封闭连接,在这种情况下,从该方法返回true将导致连接被关闭。
状态机:
start -> connection_made
[-> data_received]*
[-> eof_received]?
-> connection_lost -> end
缓冲流协议¶
3.7 新版功能.
缓冲协议可以与支持的任何事件循环方法一起使用 Streaming Protocols .
BufferedProtocol
实现允许显式地手动分配和控制接收缓冲区。然后,事件循环可以使用协议提供的缓冲区来避免不必要的数据复制。这可能导致接收大量数据的协议的性能显著提高。复杂的协议实现可以显著减少缓冲区分配的数量。
调用以下回调 BufferedProtocol
实例:
- BufferedProtocol.get_buffer(sizehint)¶
调用以分配新的接收缓冲区。
西辛 是返回缓冲区的建议最小大小。返回的缓冲区小于或大于 西辛 建议。当设置为-1时,缓冲区大小可以是任意的。返回零大小的缓冲区是错误的。
get_buffer()
必须返回实现 buffer protocol .
- BufferedProtocol.buffer_updated(nbytes)¶
用接收的数据更新缓冲区时调用。
字节数 是写入缓冲区的字节总数。
- BufferedProtocol.eof_received()¶
参见
protocol.eof_received()
方法。
get_buffer()
在连接过程中可以任意多次调用。然而, protocol.eof_received()
最多调用一次,如果调用, get_buffer()
和 buffer_updated()
以后不会再调用了。
状态机:
start -> connection_made
[-> get_buffer
[-> buffer_updated]?
]*
[-> eof_received]?
-> connection_lost -> end
数据报协议¶
数据报协议实例应由传递给 loop.create_datagram_endpoint()
方法。
- DatagramProtocol.datagram_received(data, addr)¶
当接收到数据报时调用。 data 是包含传入数据的字节对象。 addr 是发送数据的对等机的地址;确切的格式取决于传输。
- DatagramProtocol.error_received(exc)¶
当上一个发送或接收操作引发
OSError
. exc 是OSError
实例。当传输(例如UDP)检测到数据报无法传递到其收件人时,在极少数情况下调用此方法。但在许多情况下,无法传递的数据报将被静默丢弃。
注解
在BSD系统(MacOS、FreeBSD等)上,数据报协议不支持流控制,因为没有可靠的方法检测由于写入太多数据包而导致的发送失败。
套接字总是显示为“就绪”,多余的数据包被丢弃。一个 OSError
具有 errno
设置为 errno.ENOBUFS
可以提出也可以不提出;如果提出,将报告给 DatagramProtocol.error_received()
但除此之外被忽略。
子进程协议¶
数据报协议实例应由传递给 loop.subprocess_exec()
和 loop.subprocess_shell()
方法。
- SubprocessProtocol.pipe_data_received(fd, data)¶
当子进程将数据写入其stdout或stderr管道时调用。
fd 是管道的整数文件描述符。
data 是包含接收数据的非空字节对象。
- SubprocessProtocol.pipe_connection_lost(fd, exc)¶
当与子进程通信的管道之一关闭时调用。
fd 是已关闭的整数文件描述符。
- SubprocessProtocol.process_exited()¶
当子进程退出时调用。
实例¶
TCP回音服务器¶
使用创建TCP Echo服务器 loop.create_server()
方法,返回接收到的数据,然后关闭连接::
import asyncio
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
print('Send: {!r}'.format(message))
self.transport.write(data)
print('Close the client socket')
self.transport.close()
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
server = await loop.create_server(
lambda: EchoServerProtocol(),
'127.0.0.1', 8888)
async with server:
await server.serve_forever()
asyncio.run(main())
参见
这个 TCP echo server using streams 示例使用高级 asyncio.start_server()
功能。
TCP回音客户端¶
使用的TCP Echo客户端 loop.create_connection()
方法,发送数据,并等待连接关闭:
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = 'Hello World!'
transport, protocol = await loop.create_connection(
lambda: EchoClientProtocol(message, on_con_lost),
'127.0.0.1', 8888)
# Wait until the protocol signals that the connection
# is lost and close the transport.
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
参见
这个 TCP echo client using streams 示例使用高级 asyncio.open_connection()
功能。
UDP回声服务器¶
一个UDP Echo服务器,使用 loop.create_datagram_endpoint()
方法,将接收到的数据发送回:
import asyncio
class EchoServerProtocol:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
print('Received %r from %s' % (message, addr))
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
async def main():
print("Starting UDP server")
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
# One protocol instance will be created to serve all
# client requests.
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoServerProtocol(),
local_addr=('127.0.0.1', 9999))
try:
await asyncio.sleep(3600) # Serve for 1 hour.
finally:
transport.close()
asyncio.run(main())
UDP Echo客户端¶
一个UDP Echo客户端,使用 loop.create_datagram_endpoint()
方法,发送数据并在收到答案时关闭传输:
import asyncio
class EchoClientProtocol:
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
self.transport = None
def connection_made(self, transport):
self.transport = transport
print('Send:', self.message)
self.transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print("Received:", data.decode())
print("Close the socket")
self.transport.close()
def error_received(self, exc):
print('Error received:', exc)
def connection_lost(self, exc):
print("Connection closed")
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = "Hello World!"
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoClientProtocol(message, on_con_lost),
remote_addr=('127.0.0.1', 9999))
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
连接现有Socket¶
等待直到套接字使用 loop.create_connection()
具有协议的方法:
import asyncio
import socket
class MyProtocol(asyncio.Protocol):
def __init__(self, on_con_lost):
self.transport = None
self.on_con_lost = on_con_lost
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print("Received:", data.decode())
# We are done: close the transport;
# connection_lost() will be called automatically.
self.transport.close()
def connection_lost(self, exc):
# The socket has been closed
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
# Create a pair of connected sockets
rsock, wsock = socket.socketpair()
# Register the socket to wait for data.
transport, protocol = await loop.create_connection(
lambda: MyProtocol(on_con_lost), sock=rsock)
# Simulate the reception of data from the network.
loop.call_soon(wsock.send, 'abc'.encode())
try:
await protocol.on_con_lost
finally:
transport.close()
wsock.close()
asyncio.run(main())
参见
这个 watch a file descriptor for read events 示例使用低级 loop.add_reader()
方法注册一个fd。
这个 register an open socket to wait for data using streams 示例使用由 open_connection()
在协程中起作用。
loop.subprocess_exec()和subprocessProtocol¶
用于获取子进程输出并等待子进程退出的子进程协议的示例。
子进程由 loop.subprocess_exec()
方法:
import asyncio
import sys
class DateProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future):
self.exit_future = exit_future
self.output = bytearray()
def pipe_data_received(self, fd, data):
self.output.extend(data)
def process_exited(self):
self.exit_future.set_result(True)
async def get_date():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
code = 'import datetime; print(datetime.datetime.now())'
exit_future = asyncio.Future(loop=loop)
# Create the subprocess controlled by DateProtocol;
# redirect the standard output into a pipe.
transport, protocol = await loop.subprocess_exec(
lambda: DateProtocol(exit_future),
sys.executable, '-c', code,
stdin=None, stderr=None)
# Wait for the subprocess exit using the process_exited()
# method of the protocol.
await exit_future
# Close the stdout pipe.
transport.close()
# Read the output which was collected by the
# pipe_data_received() method of the protocol.
data = bytes(protocol.output)
return data.decode('ascii').rstrip()
date = asyncio.run(get_date())
print(f"Current date: {date}")
也见 same example 使用高级API编写。