实例

文档的这一部分是一个简单的示例代码集合,可以帮助您快速开始应用程序开发。这些示例中的大多数都是分类的,并为您提供了指向 Sanic Repository

基本实例

示例的这一部分是提供SANIC应用程序的简单用例示例的代码集合。

简单应用程序

一个简单的sanic应用程序 async 方法与 textjson 类型响应。

from sanic import Sanic
from sanic import response as res

app = Sanic(__name__)


@app.route("/")
async def test(req):
    return res.text("I\'m a teapot", status=418)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)
from sanic import Sanic
from sanic import response

app = Sanic(__name__)


@app.route("/")
async def test(request):
    return response.json({"test": True})


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)

简单的应用程序 Sanic Views

展示简单的使用机制 sanic.viewes.HTTPMethodView 以及将其扩展为提供自定义 async 行为为 view .

from sanic import Sanic
from sanic.views import HTTPMethodView
from sanic.response import text

app = Sanic('some_name')


class SimpleView(HTTPMethodView):

    def get(self, request):
        return text('I am get method')

    def post(self, request):
        return text('I am post method')

    def put(self, request):
        return text('I am put method')

    def patch(self, request):
        return text('I am patch method')

    def delete(self, request):
        return text('I am delete method')


class SimpleAsyncView(HTTPMethodView):

    async def get(self, request):
        return text('I am async get method')

    async def post(self, request):
        return text('I am async post method')

    async def put(self, request):
        return text('I am async put method')


app.add_route(SimpleView.as_view(), '/')
app.add_route(SimpleAsyncView.as_view(), '/async')

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000, debug=True)

URL重定向

from sanic import Sanic
from sanic import response

app = Sanic(__name__)

    
@app.route('/')
def handle_request(request):
    return response.redirect('/redirect')


@app.route('/redirect')
async def test(request):
    return response.json({"Redirected": True})


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)

命名URL重定向

Sanic 提供一种简单易用的方法,通过名为 url_for 它采用唯一的URL名称作为参数,并返回为其分配的实际路由。这将有助于简化在应用程序的不同部分之间重定向用户所需的工作。

from sanic import Sanic
from sanic import response

app = Sanic(__name__)


@app.route('/')
async def index(request):
    # generate a URL for the endpoint `post_handler`
    url = app.url_for('post_handler', post_id=5)
    # the URL is `/posts/5`, redirect to it
    return response.redirect(url)


@app.route('/posts/<post_id>')
async def post_handler(request, post_id):
    return response.text('Post - {}'.format(post_id))
    
if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000, debug=True)

蓝图

Sanic provides an amazing feature to group your APIs and routes under a logical collection that can easily be imported and plugged into any of your sanic application and it's called blueprints

from sanic import Blueprint, Sanic
from sanic.response import file, json

app = Sanic(__name__)
blueprint = Blueprint('name', url_prefix='/my_blueprint')
blueprint2 = Blueprint('name2', url_prefix='/my_blueprint2')
blueprint3 = Blueprint('name3', url_prefix='/my_blueprint3')


@blueprint.route('/foo')
async def foo(request):
    return json({'msg': 'hi from blueprint'})


@blueprint2.route('/foo')
async def foo2(request):
    return json({'msg': 'hi from blueprint2'})


@blueprint3.route('/foo')
async def index(request):
    return await file('websocket.html')


@app.websocket('/feed')
async def foo3(request, ws):
    while True:
        data = 'hello!'
        print('Sending: ' + data)
        await ws.send(data)
        data = await ws.recv()
        print('Received: ' + data)

app.blueprint(blueprint)
app.blueprint(blueprint2)
app.blueprint(blueprint3)

app.run(host="0.0.0.0", port=8000, debug=True)

日志功能增强

尽管 Sanic 附带了一组日志记录支持,它允许最终用户自定义在应用程序运行时处理日志记录的方式。

from sanic import Sanic
from sanic import response
import logging

logging_format = "[%(asctime)s] %(process)d-%(levelname)s "
logging_format += "%(module)s::%(funcName)s():l%(lineno)d: "
logging_format += "%(message)s"

logging.basicConfig(
    format=logging_format,
    level=logging.DEBUG
)
log = logging.getLogger()

# Set logger to override default basicConfig
sanic = Sanic()


@sanic.route("/")
def test(request):
    log.info("received request; responding with 'hey'")
    return response.text("hey")

sanic.run(host="0.0.0.0", port=8000)

下面的示例提供了一个示例代码,演示了 sanic.app.Sanic.middleware() 为了提供一种机制,为每个传入请求分配一个唯一的请求ID,并通过 aiotask-context .

'''
Based on example from https://github.com/Skyscanner/aiotask-context
and `examples/{override_logging,run_async}.py`.

Needs https://github.com/Skyscanner/aiotask-context/tree/52efbc21e2e1def2d52abb9a8e951f3ce5e6f690 or newer

$ pip install git+https://github.com/Skyscanner/aiotask-context.git
'''

import asyncio
import uuid
import logging
from signal import signal, SIGINT

from sanic import Sanic
from sanic import response

import uvloop
import aiotask_context as context

log = logging.getLogger(__name__)


class RequestIdFilter(logging.Filter):
    def filter(self, record):
        record.request_id = context.get('X-Request-ID')
        return True


LOG_SETTINGS = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'DEBUG',
            'formatter': 'default',
            'filters': ['requestid'],
        },
    },
    'filters': {
        'requestid': {
            '()': RequestIdFilter,
        },
    },
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s:%(lineno)d %(request_id)s | %(message)s',
        },
    },
    'loggers': {
        '': {
            'level': 'DEBUG',
            'handlers': ['console'],
            'propagate': True
        },
    }
}


app = Sanic(__name__, log_config=LOG_SETTINGS)


@app.middleware('request')
async def set_request_id(request):
    request_id = request.headers.get('X-Request-ID') or str(uuid.uuid4())
    context.set("X-Request-ID", request_id)


@app.route("/")
async def test(request):
    log.debug('X-Request-ID: %s', context.get('X-Request-ID'))
    log.info('Hello from test!')
    return response.json({"test": True})


if __name__ == '__main__':
    asyncio.set_event_loop(uvloop.new_event_loop())
    server = app.create_server(host="0.0.0.0", port=8000, return_asyncio_server=True)
    loop = asyncio.get_event_loop()
    loop.set_task_factory(context.task_factory)
    task = asyncio.ensure_future(server)
    try:
        loop.run_forever()
    except:
        loop.stop()

Sanic流支持

Sanic 框架附带了对大文件流式处理的内置支持,下面的代码解释了设置 Sanic 支持流媒体的应用程序。

from sanic import Sanic
from sanic.views import CompositionView
from sanic.views import HTTPMethodView
from sanic.views import stream as stream_decorator
from sanic.blueprints import Blueprint
from sanic.response import stream, text

bp = Blueprint('blueprint_request_stream')
app = Sanic('request_stream')


class SimpleView(HTTPMethodView):

    @stream_decorator
    async def post(self, request):
        result = ''
        while True:
            body = await request.stream.get()
            if body is None:
                break
            result += body.decode('utf-8')
        return text(result)


@app.post('/stream', stream=True)
async def handler(request):
    async def streaming(response):
        while True:
            body = await request.stream.get()
            if body is None:
                break
            body = body.decode('utf-8').replace('1', 'A')
            await response.write(body)
    return stream(streaming)


@bp.put('/bp_stream', stream=True)
async def bp_handler(request):
    result = ''
    while True:
        body = await request.stream.get()
        if body is None:
            break
        result += body.decode('utf-8').replace('1', 'A')
    return text(result)


async def post_handler(request):
    result = ''
    while True:
        body = await request.stream.get()
        if body is None:
            break
        result += body.decode('utf-8')
    return text(result)

app.blueprint(bp)
app.add_route(SimpleView.as_view(), '/method_view')
view = CompositionView()
view.add(['POST'], post_handler, stream=True)
app.add_route(view, '/composition_view')


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)

示例客户端应用程序,按客户端代码显示流式应用程序的用法。

import requests

# Warning: This is a heavy process.

data = ""
for i in range(1, 250000):
    data += str(i)

r = requests.post('http://0.0.0.0:8000/stream', data=data)
print(r.text)

SANIC并发支持

Sanic supports the ability to start an app with multiple worker support. However, it's important to be able to limit the concurrency per process/loop in order to ensure an efficient execution. The following section of the code provides a brief example of how to limit the concurrency with the help of asyncio.Semaphore

from sanic import Sanic
from sanic.response import json

import asyncio
import aiohttp

app = Sanic(__name__)

sem = None


@app.listener('before_server_start')
def init(sanic, loop):
    global sem
    concurrency_per_worker = 4
    sem = asyncio.Semaphore(concurrency_per_worker, loop=loop)

async def bounded_fetch(session, url):
    """
    Use session object to perform 'get' request on url
    """
    async with sem, session.get(url) as response:
        return await response.json()


@app.route("/")
async def test(request):
    """
    Download and serve example JSON
    """
    url = "https://api.github.com/repos/channelcat/sanic"

    async with aiohttp.ClientSession() as session:
        response = await bounded_fetch(session, url)
        return json(response)


app.run(host="0.0.0.0", port=8000, workers=2)

通过Docker的Sanic部署

部署一个 sanic 应用程序通过 dockerdocker-compose 是一个容易实现的任务,下面的示例提供了示例的部署 simple_server.py

FROM python:3.5
MAINTAINER Channel Cat <channelcat@gmail.com>

ADD . /code
RUN pip3 install git+https://github.com/channelcat/sanic

EXPOSE 8000

WORKDIR /code

CMD ["python", "simple_server.py"]
version: '2'
services:
  sanic:
    build: .
    ports:
      - "8000:8000"

监控和错误处理

Sanic 通过以下方式提供全局异常处理程序的可扩展裸最小实现: sanic.handlers.ErrorHandler . 此示例演示如何扩展它以启用某些自定义行为。

"""
Example intercepting uncaught exceptions using Sanic's error handler framework.
This may be useful for developers wishing to use Sentry, Airbrake, etc.
or a custom system to log and monitor unexpected errors in production.
First we create our own class inheriting from Handler in sanic.exceptions,
and pass in an instance of it when we create our Sanic instance. Inside this
class' default handler, we can do anything including sending exceptions to
an external service.
"""
from sanic.handlers import ErrorHandler
from sanic.exceptions import SanicException
"""
Imports and code relevant for our CustomHandler class
(Ordinarily this would be in a separate file)
"""


class CustomHandler(ErrorHandler):

    def default(self, request, exception):
        # Here, we have access to the exception object
        # and can do anything with it (log, send to external service, etc)

        # Some exceptions are trivial and built into Sanic (404s, etc)
        if not isinstance(exception, SanicException):
            print(exception)

        # Then, we must finish handling the exception by returning
        # our response to the client
        # For this we can just call the super class' default handler
        return super().default(request, exception)


"""
This is an ordinary Sanic server, with the exception that we set the
server's error_handler to an instance of our CustomHandler
"""

from sanic import Sanic

app = Sanic(__name__)

handler = CustomHandler()
app.error_handler = handler


@app.route("/")
async def test(request):
    # Here, something occurs which causes an unexpected exception
    # This exception will flow to our custom handler.
    raise SanicException('You Broke It!')

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000, debug=True)

使用外部服务提供商进行监控

import logging
import socket
from os import getenv
from platform import node
from uuid import getnode as get_mac

from logdna import LogDNAHandler

from sanic import Sanic
from sanic.response import json
from sanic.request import Request

log = logging.getLogger('logdna')
log.setLevel(logging.INFO)


def get_my_ip_address(remote_server="google.com"):
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect((remote_server, 80))
        return s.getsockname()[0]


def get_mac_address():
    h = iter(hex(get_mac())[2:].zfill(12))
    return ":".join(i + next(h) for i in h)


logdna_options = {
    "app": __name__,
    "index_meta": True,
    "hostname": node(),
    "ip": get_my_ip_address(),
    "mac": get_mac_address()
}

logdna_handler = LogDNAHandler(getenv("LOGDNA_API_KEY"), options=logdna_options)

logdna = logging.getLogger(__name__)
logdna.setLevel(logging.INFO)
logdna.addHandler(logdna_handler)

app = Sanic(__name__)


@app.middleware
def log_request(request: Request):
    logdna.info("I was Here with a new Request to URL: {}".format(request.url))


@app.route("/")
def default(request):
    return json({
        "response": "I was here"
    })


if __name__ == "__main__":
    app.run(
        host="0.0.0.0",
        port=getenv("PORT", 8080)
    )
from os import getenv

from raygun4py.raygunprovider import RaygunSender

from sanic import Sanic
from sanic.exceptions import SanicException
from sanic.handlers import ErrorHandler


class RaygunExceptionReporter(ErrorHandler):

    def __init__(self, raygun_api_key=None):
        super().__init__()
        if raygun_api_key is None:
            raygun_api_key = getenv("RAYGUN_API_KEY")

        self.sender = RaygunSender(raygun_api_key)

    def default(self, request, exception):
        self.sender.send_exception(exception=exception)
        return super().default(request, exception)


raygun_error_reporter = RaygunExceptionReporter()
app = Sanic(__name__, error_handler=raygun_error_reporter)


@app.route("/raise")
async def test(request):
    raise SanicException('You Broke It!')


if __name__ == '__main__':
    app.run(
        host="0.0.0.0",
        port=getenv("PORT", 8080)
    )
import rollbar

from sanic.handlers import ErrorHandler
from sanic import Sanic
from sanic.exceptions import SanicException
from os import getenv

rollbar.init(getenv("ROLLBAR_API_KEY"))


class RollbarExceptionHandler(ErrorHandler):

    def default(self, request, exception):
        rollbar.report_message(str(exception))
        return super().default(request, exception)


app = Sanic(__name__, error_handler=RollbarExceptionHandler())


@app.route("/raise")
def create_error(request):
    raise SanicException("I was here and I don't like where I am")


if __name__ == "__main__":
    app.run(
        host="0.0.0.0",
        port=getenv("PORT", 8080)
    )
from os import getenv

from sentry_sdk import init as sentry_init
from sentry_sdk.integrations.sanic import SanicIntegration

from sanic import Sanic
from sanic.response import json

sentry_init(
    dsn=getenv("SENTRY_DSN"),
    integrations=[SanicIntegration()],
)

app = Sanic(__name__)


# noinspection PyUnusedLocal
@app.route("/working")
async def working_path(request):
    return json({
        "response": "Working API Response"
    })


# noinspection PyUnusedLocal
@app.route("/raise-error")
async def raise_error(request):
    raise Exception("Testing Sentry Integration")


if __name__ == '__main__':
    app.run(
        host="0.0.0.0",
        port=getenv("PORT", 8080)
    )

安全性

下面的示例代码显示了一个简单的基于装饰器的身份验证和授权机制,可以设置该机制来保护 sanic API端点。

# -*- coding: utf-8 -*-

from sanic import Sanic
from functools import wraps
from sanic.response import json

app = Sanic()


def check_request_for_authorization_status(request):
    # Note: Define your check, for instance cookie, session.
    flag = True
    return flag


def authorized():
    def decorator(f):
        @wraps(f)
        async def decorated_function(request, *args, **kwargs):
            # run some method that checks the request
            # for the client's authorization status
            is_authorized = check_request_for_authorization_status(request)

            if is_authorized:
                # the user is authorized.
                # run the handler method and return the response
                response = await f(request, *args, **kwargs)
                return response
            else:
                # the user is not authorized.
                return json({'status': 'not_authorized'}, 403)
        return decorated_function
    return decorator


@app.route("/")
@authorized()
async def test(request):
    return json({'status': 'authorized'})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

SANIC网络套接字

Sanic 提供轻松添加路线并将其映射到 websocket 处理程序。

<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket demo</title>
    </head>
    <body>
        <script>
            var ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/feed'),
                messages = document.createElement('ul');
            ws.onmessage = function (event) {
                var messages = document.getElementsByTagName('ul')[0],
                    message = document.createElement('li'),
                    content = document.createTextNode('Received: ' + event.data);
                message.appendChild(content);
                messages.appendChild(message);
            };
            document.body.appendChild(messages);
            window.setInterval(function() {
                data = 'bye!'
                ws.send(data);
                var messages = document.getElementsByTagName('ul')[0],
                    message = document.createElement('li'),
                    content = document.createTextNode('Sent: ' + data);
                message.appendChild(content);
                messages.appendChild(message);
            }, 1000);
        </script>
    </body>
</html>
from sanic import Sanic
from sanic.response import file

app = Sanic(__name__)


@app.route('/')
async def index(request):
    return await file('websocket.html')


@app.websocket('/feed')
async def feed(request, ws):
    while True:
        data = 'hello!'
        print('Sending: ' + data)
        await ws.send(data)
        data = await ws.recv()
        print('Received: ' + data)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000, debug=True)

主机托管

from sanic import response
from sanic import Sanic
from sanic.blueprints import Blueprint

# Usage
# curl -H "Host: example.com" localhost:8000
# curl -H "Host: sub.example.com" localhost:8000
# curl -H "Host: bp.example.com" localhost:8000/question
# curl -H "Host: bp.example.com" localhost:8000/answer

app = Sanic()
bp = Blueprint("bp", host="bp.example.com")


@app.route('/', host=["example.com",
                      "somethingelse.com",
                      "therestofyourdomains.com"])
async def hello(request):
    return response.text("Some defaults")


@app.route('/', host="sub.example.com")
async def hello(request):
    return response.text("42")


@bp.route("/question")
async def hello(request):
    return response.text("What is the meaning of life?")


@bp.route("/answer")
async def hello(request):
    return response.text("42")

app.blueprint(bp)

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)

具有并行测试运行支持的单元测试

下面的示例向您展示了如何启动和运行单元测试 sanic 提供并行测试执行支持的应用程序 pytest-xdist 插件。

"""pytest-xdist example for sanic server

Install testing tools:

    $ pip install pytest pytest-xdist

Run with xdist params:

    $ pytest examples/pytest_xdist.py -n 8  # 8 workers
"""
import re
from sanic import Sanic
from sanic.response import text
from sanic.testing import PORT as PORT_BASE, SanicTestClient
import pytest


@pytest.fixture(scope="session")
def test_port(worker_id):
    m = re.search(r'[0-9]+', worker_id)
    if m:
        num_id = m.group(0)
    else:
        num_id = 0
    port = PORT_BASE + int(num_id)
    return port


@pytest.fixture(scope="session")
def app():
    app = Sanic()

    @app.route('/')
    async def index(request):
        return text('OK')

    return app


@pytest.fixture(scope="session")
def client(app, test_port):
    return SanicTestClient(app, test_port)


@pytest.mark.parametrize('run_id', range(100))
def test_index(client, run_id):
    request, response = client._sanic_endpoint_test('get', '/')
    assert response.status == 200
    assert response.text == 'OK'

修改请求对象

这个 request 中的对象 Sanic 是一种 dict 对象,这意味着 request 对象可以作为常规对象进行操作 dict 对象。

from sanic import Sanic
from sanic.response import text
from random import randint

app = Sanic()


@app.middleware('request')
def append_request(request):
    # Add new key with random value
    request['num'] = randint(0, 100)


@app.get('/pop')
def pop_handler(request):
    # Pop key from request object
    num = request.pop('num')
    return text(num)


@app.get('/key_exist')
def key_exist_handler(request):
    # Check the key is exist or not
    if 'num' in request:
        return text('num exist in request')

    return text('num does not exist in reqeust')


app.run(host="0.0.0.0", port=8000, debug=True)

有关更多示例和有用示例,请访问 Huge-Sanic's GitHub Page