流动
请求流
Sanic允许您按流获取请求数据,如下所示。当请求结束时, await request.stream.read() 收益率 None . 只有post、put和patch decorator有流参数。
from sanic import Sanic
from sanic.views import CompositionView
from sanic.views import HTTPMethodView
from sanic.views import stream as stream_decorator
from sanic.blueprints import Blueprint
from sanic.response import stream, text
bp = Blueprint('blueprint_request_stream')
app = Sanic(__name__)
class SimpleView(HTTPMethodView):
@stream_decorator
async def post(self, request):
result = ''
while True:
body = await request.stream.read()
if body is None:
break
result += body.decode('utf-8')
return text(result)
@app.post('/stream', stream=True)
async def handler(request):
async def streaming(response):
while True:
body = await request.stream.read()
if body is None:
break
body = body.decode('utf-8').replace('1', 'A')
await response.write(body)
return stream(streaming)
@bp.put('/bp_stream', stream=True)
async def bp_put_handler(request):
result = ''
while True:
body = await request.stream.read()
if body is None:
break
result += body.decode('utf-8').replace('1', 'A')
return text(result)
# You can also use `bp.add_route()` with stream argument
async def bp_post_handler(request):
result = ''
while True:
body = await request.stream.read()
if body is None:
break
result += body.decode('utf-8').replace('1', 'A')
return text(result)
bp.add_route(bp_post_handler, '/bp_stream', methods=['POST'], stream=True)
async def post_handler(request):
result = ''
while True:
body = await request.stream.read()
if body is None:
break
result += body.decode('utf-8')
return text(result)
app.blueprint(bp)
app.add_route(SimpleView.as_view(), '/method_view')
view = CompositionView()
view.add(['POST'], post_handler, stream=True)
app.add_route(view, '/composition_view')
if __name__ == '__main__':
app.run(host='127.0.0.1', port=8000)
响应流
Sanic允许您使用 stream 方法。此方法接受一个协程回调,该回调通过 StreamingHTTPResponse 写入的对象。一个简单的例子如下:
from sanic import Sanic
from sanic.response import stream
app = Sanic(__name__)
@app.route("/")
async def test(request):
async def sample_streaming_fn(response):
await response.write('foo,')
await response.write('bar')
return stream(sample_streaming_fn, content_type='text/csv')
这在希望将内容流式传输到源于外部服务(如数据库)的客户端的情况下非常有用。例如,可以使用异步游标将数据库记录流式传输到客户端 asyncpg 提供:
@app.route("/")
async def index(request):
async def stream_from_db(response):
conn = await asyncpg.connect(database='test')
async with conn.transaction():
async for record in conn.cursor('SELECT generate_series(0, 10)'):
await response.write(record[0])
return stream(stream_from_db)
如果客户端支持HTTP/1.1,Sanic将使用 chunked transfer encoding ;可以使用 chunked 选择权 stream 功能。
文件流
Sanic提供 sanic.response.file_stream 当您要发送大文件时非常有用的函数。它返回一个 StreamingHTTPResponse 对象,默认情况下将使用分块传输编码;因此Sanic不添加 Content-Length 响应中的HTTP头。如果要使用此头,可以禁用分块传输编码并手动添加它:
from aiofiles import os as async_os
from sanic.response import file_stream
@app.route("/")
async def index(request):
file_path = "/srv/www/whatever.png"
file_stat = await async_os.stat(file_path)
headers = {"Content-Length": str(file_stat.st_size)}
return await file_stream(
file_path,
headers=headers,
chunked=False,
)