溪流¶
流是用于网络连接的高级异步/等待就绪原语。流允许在不使用回调或低级协议和传输的情况下发送和接收数据。
以下是使用异步流编写的TCP Echo客户端示例:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
也见 Examples 下面部分。
流函数
以下顶级异步函数可用于创建和使用流:
- coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)¶
建立网络连接并返回一对
(reader, writer)
物体。归还的人*reader*和*writer*对象是的实例
StreamReader
和StreamWriter
类。limit 确定返回的
StreamReader
实例。默认情况下 limit 设置为64 kib。其余参数直接传递给
loop.create_connection()
.3.7 新版功能: 这个 ssl_handshake_timeout 参数。
- coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)¶
启动套接字服务器。
这个 client_connected_cb 每当建立新的客户机连接时都会调用回调。它收到一个
(reader, writer)
成对作为两个参数,实例StreamReader
和StreamWriter
类。client_connected_cb 可以是普通的可调用的或 coroutine function ;如果它是协程函数,它将自动作为
Task
.limit 确定返回的
StreamReader
实例。默认情况下 limit 设置为64 kib。其余参数直接传递给
loop.create_server()
.3.7 新版功能: 这个 ssl_handshake_timeout 和 start_serving 参数。
Unix套接字
- coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)¶
建立一个Unix套接字连接并返回一对
(reader, writer)
.类似
open_connection()
但是在Unix套接字上运行。另请参见
loop.create_unix_connection()
.Availability UNIX。
3.7 新版功能: 这个 ssl_handshake_timeout 参数。
在 3.7 版更改: 这个 path 参数现在可以是 path-like object
- coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)¶
启动Unix Socket服务器。
类似
start_server()
但适用于Unix套接字。另请参见
loop.create_unix_server()
.Availability UNIX。
3.7 新版功能: 这个 ssl_handshake_timeout 和 start_serving 参数。
在 3.7 版更改: 这个 path 参数现在可以是 path-like object .
StreamReader¶
- class asyncio.StreamReader¶
表示提供API以从IO流读取数据的读卡器对象。
不建议实例化 StreamReader 直接对象;使用
open_connection()
和start_server()
相反。- coroutine read(n=- 1)¶
读到 n 字节。如果 n 未提供或设置为
-1
,读取到EOF并返回所有读取字节。如果接收到EOF且内部缓冲区为空,则返回空
bytes
对象。
- coroutine readline()¶
读取一行,其中“行”是以
\n
.如果收到EOF,并且
\n
找不到,该方法返回部分读取的数据。如果接收到EOF且内部缓冲区为空,则返回空
bytes
对象。
- coroutine readexactly(n)¶
准确阅读 n 字节。
养一个
IncompleteReadError
如果之前达到EOF n 可以读取。使用IncompleteReadError.partial
属性获取部分读取的数据。
- coroutine readuntil(separator=b'\n')¶
从流中读取数据,直到 分离器 被发现。
成功后,数据和分隔符将从内部缓冲区中删除(已使用)。返回的数据将在末尾包含分隔符。
如果读取的数据量超过配置的流限制,则
LimitOverrunError
引发异常,数据保留在内部缓冲区中,可以再次读取。如果在找到完整的分隔符之前达到EOF,则
IncompleteReadError
引发异常,并重置内部缓冲区。这个IncompleteReadError.partial
属性可以包含分隔符的一部分。3.5.2 新版功能.
- at_eof()¶
返回
True
如果缓冲区为空并且feed_eof()
被叫来。
StreamWriter¶
- class asyncio.StreamWriter¶
表示提供API以将数据写入IO流的编写器对象。
不建议实例化 StreamWriter 直接对象;使用
open_connection()
和start_server()
相反。- write(data)¶
该方法尝试写入 data 立即转到基础套接字。如果失败,数据将在内部写缓冲区中排队,直到可以发送为止。
该方法应与
drain()
方法:stream.write(data) await stream.drain()
- writelines(data)¶
该方法立即向基础套接字写入字节列表(或任何iterable)。如果失败,数据将在内部写缓冲区中排队,直到可以发送为止。
该方法应与
drain()
方法:stream.writelines(lines) await stream.drain()
- close()¶
方法关闭流和基础套接字。
该方法应与
wait_closed()
方法:stream.close() await stream.wait_closed()
- can_write_eof()¶
返回
True
如果基础传输支持write_eof()
方法,False
否则。
- write_eof()¶
刷新缓冲的写入数据后,关闭流的写入端。
- transport¶
返回基础异步传输。
- get_extra_info(name, default=None)¶
访问可选的传输信息;请参阅
BaseTransport.get_extra_info()
有关详细信息。
- coroutine drain()¶
等待,直到恢复对流的写入。例子::
writer.write(data) await writer.drain()
这是一种与底层IO写缓冲区交互的流控制方法。当缓冲区的大小达到高水位线时, 排水() 块,直到缓冲区的大小降至低水位线,然后可以继续写入。当无需等待时,
drain()
立即返回。
- is_closing()¶
返回
True
如果流已关闭或正在关闭。3.7 新版功能.
实例¶
使用流的TCP Echo客户端¶
TCP Echo客户端使用 asyncio.open_connection()
功能:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
asyncio.run(tcp_echo_client('Hello World!'))
参见
这个 TCP echo client protocol 示例使用低级 loop.create_connection()
方法。
使用流的TCP Echo服务器¶
TCP Echo服务器使用 asyncio.start_server()
功能:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
asyncio.run(main())
参见
这个 TCP echo server protocol 示例使用 loop.create_server()
方法。
获取HTTP头¶
查询命令行上传递的URL的HTTP头的简单示例:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
用法:
python example.py http://example.com/path/page.html
或使用http:
python example.py https://example.com/path/page.html
注册打开的套接字以使用流等待数据¶
协程,等待套接字使用 open_connection()
功能:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
参见
这个 register an open socket to wait for data using a protocol 示例使用低级协议和 loop.create_connection()
方法。
这个 watch a file descriptor for read events 示例使用低级 loop.add_reader()
方法来监视文件描述符。