WebSocket(仅限ASGI)

Falcon建立在 ASGI WebSocket Specification 提供简单、严肃的WebSocket服务器实现。

同时支持这两种功能 WebSocketServer-Sent Events (SSE),Falcon促进了ASGI应用程序与Web浏览器、移动应用程序或其他客户端应用程序之间的实时、面向事件的通信。

备注

另请参阅 falcon.asgi.Response.sse 以了解有关Falcon的服务器发送事件(SSE)支持的更多信息。

使用

使用Falcon,您可以轻松地将WebSocket支持添加到ASGI应用程序中的任何路由,只需实现 on_websocket() 该路由的资源类中的响应者。与常规HTTP请求一样,可以使用中间件组件和媒体处理程序来增强WebSocket流。

当WebSocket握手到达时(通过标准HTTP请求),Falcon将首先像往常一样将其路由到特定的资源类实例。在此过程中,如果在为应用程序配置的任何中间件对象上实现了以下中间件方法,则将调用以下中间件方法:

class SomeMiddleware:
    async def process_request_ws(self, req, ws):
        """Process a WebSocket handshake request before routing it.

        Note:
            Because Falcon routes each request based on req.path, a
            request can be effectively re-routed by setting that
            attribute to a new value from within process_request().

        Args:
            req: Request object that will eventually be
                passed into an on_websocket() responder method.
            ws: The WebSocket object that will be passed into
                on_websocket() after routing.
        """

    async def process_resource_ws(self, req, ws, resource, params):
        """Process a WebSocket handshake request after routing.

        Note:
            This method is only called when the request matches
            a route to a resource.

        Args:
            req: Request object that will be passed to the
                routed responder.
            ws: WebSocket object that will be passed to the
                routed responder.
            resource: Resource object to which the request was
                routed.
            params: A dict-like object representing any additional
                params derived from the route's URI template fields,
                that will be passed to the resource's responder
                method as keyword arguments.
        """

如果找到所请求路径的路由,则框架将检查名为 on_websocket() 在目标资源上。如果找到响应器,则会以与常规调用类似的方式调用该响应器 on_get() 应答器,除了 falcon.asgi.WebSocket 对象,而不是类型为 falcon.asgi.Response

例如,给定一条包含 account_id 参数,则框架将期待一个 on_websocket() 与此类似的响应者:

async def on_websocket(self, req: Request, ws: WebSocket, account_id: str):
    pass

如果没有与WebSocket握手中请求的路径匹配的路由,则控制权将传递给默认响应器,该响应器只是引发 HTTPRouteNotFound 。默认情况下,此错误将呈现为具有3404关闭代码的403响应。可以通过添加自定义错误处理程序来修改此行为(另请参阅: add_error_handler() )。

类似地,如果存在路由,但目标资源没有实现 on_websocket() 响应器,框架将调用默认响应器,该响应器将引发 HTTPMethodNotAllowed 。默认情况下,此类将呈现为具有3405关闭代码的403响应。

丢失的连接

当应用程序尝试从客户端接收消息时,ASGI服务器会发出 disconnect 如果连接因任何原因丢失,则引发。Falcon通过引发 WebSocketDisconnected 给呼叫者。

另一方面,ASGI规范要求ASGI服务器在连接丢失后静默使用应用程序发送的消息(即,这不应被视为错误)。因此,主要将出站事件流式传输到客户端的终结点可能会在连接断开后持续一段时间不必要地消耗资源。

作为解决办法,Falcon实现一个小型传入消息队列,该队列用于检测丢失的连接,然后引发 WebSocketDisconnected 在呼叫方下次尝试发送消息时发送给呼叫方。

仅当应用程序本身不经常使用来自客户端的消息以快速检测连接丢失时,才需要此解决方法。否则,可以通过设置来禁用Falcon的接收队列,以稍微提高性能 max_receive_queue0 通过 ws_options

还要注意的是,在这方面,一些ASGI服务器实现并不严格遵循ASGI规范,事实上,当应用程序在客户端断开连接后尝试发送消息时会引发错误。如果测试显示您选择的ASGI服务器属于这种情况,则可以安全地禁用Falcon自己的接收队列。

错误处理

Falcon处理由 on_websocket() 以与其他应答者提出的错误类似的方式处理应答者的错误,但有以下注意事项。

首先,当调用自定义错误处理程序时,框架将传递 None 对于 resp 参数,而 WebSocket 表示当前连接的对象将作为名为的关键字参数传递 ws ::

async def my_error_handler(req, resp, ex, params, ws=None):
    # When invoked as a result of an error being raised by an
    #   on_websocket() responder, resp will be None and
    #   ws will be the same falcon.asgi.WebSocket object that
    #   was passed into the responder.
    pass

其次,需要注意的是,如果没有与WebSocket握手请求中的路径匹配的路由,或者匹配的资源没有实现 on_websocket() 响应器,则将调用默认的HTTP错误响应器,从而导致请求被拒绝,并显示 HTTP 403 响应和WebSocket关闭代码 3404 (未找到)或 3405 (不允许使用方法)。一般来说,如果默认响应者或 on_websocket() 引发 HTTPError ,则默认错误处理程序将关闭 WebSocket 与框架的连接关闭通过添加以下内容派生的代码 3000 到HTTP状态代码(例如, 3404 )。

最后,在一般未处理异常的情况下,将调用默认错误处理程序,该处理程序将尽最大努力清理连接,并使用标准WebSocket Close代码将其关闭 1011 (内部错误)。如果您的ASGI服务器不支持此代码,框架将使用代码 3011 相反,您也可以通过 error_close_code 的属性 ws_options

与任何响应者一样,应用程序的默认错误处理程序可以通过 add_error_handler()

媒体处理程序

默认情况下, send_media()receive_media() 对于文本有效负载,将序列化到JSON(和从JSON反序列化),对于二进制有效负载,将序列化到MessagePack或从MessagePack序列化(另请参阅: 内置媒体处理程序 )。

备注

为了使用默认的MessagePack处理程序,额外的 msgpack 除了必须安装软件包(版本0.5.2或更高版本)外,还必须安装 falcon 来自PyPI:

$ pip install msgpack

WebSocket媒体处理可以通过使用 falcon.asgi.App.ws_options 为一种或两种负载类型指定替代处理程序,如下例所示。

# Let's say we want to use a faster JSON library. You could also use this
#   pattern to add serialization support for custom types that aren't
#   normally JSON-serializable out of the box.
class RapidJSONHandler(falcon.media.TextBaseHandlerWS):
    def serialize(self, media: object) -> str:
        return rapidjson.dumps(media, ensure_ascii=False)

    # The raw TEXT payload will be passed as a Unicode string
    def deserialize(self, payload: str) -> object:
        return rapidjson.loads(payload)


# And/or for binary mode we want to use CBOR:
class CBORHandler(media.BinaryBaseHandlerWS):
    def serialize(self, media: object) -> bytes:
        return cbor2.dumps(media)

    # The raw BINARY payload will be passed as a byte string
    def deserialize(self, payload: bytes) -> object:
        return cbor2.loads(payload)

app = falcon.asgi.App()

# Expected to (de)serialize from/to str
json_handler = RapidJSONHandler()
app.ws_options.media_handlers[falcon.WebSocketPayloadType.TEXT] = json_handler

# Expected to (de)serialize from/to bytes, bytearray, or memoryview
cbor_handler = ProtocolBuffersHandler()
app.ws_options.media_handlers[falcon.WebSocketPayloadType.BINARY] = cbor_handler

这个 falcon 模块定义了以下内容 Enum 用于指定WebSocket负载类型的值:

falcon.WebSocketPayloadType.TEXT
falcon.WebSocketPayloadType.BINARY

扩展示例

下面是一个更全面的(尽管相当做作的)示例,它说明了应用程序与WebSocket连接交互的一些不同方式。此示例还介绍了框架引发的一些常见WebSocket错误。

import falcon.asgi
import falcon.media


class SomeResource:

    # Get a paginated list of events via a regular HTTP request.
    #
    #   For small-scale, all-in-one apps, it may make sense to support
    #   both a regular HTTP interface and one based on WebSocket
    #   side-by-side in the same deployment. However, these two
    #   interaction models have very different performance characteristics,
    #   and so larger scale-out deployments may wish to specifically
    #   designate instance groups for one type of traffic vs. the
    #   other (although the actual applications may still be capable
    #   of handling both modes).
    #
    async def on_get(self, req: Request, account_id: str):
        pass

    # Push event stream to client. Note that the framework will pass
    #   parameters defined in the URI template as with HTTP method
    #   responders.
    async def on_websocket(self, req: Request, ws: WebSocket, account_id: str):

        # The HTTP request used to initiate the WebSocket handshake can be
        #   examined as needed.
        some_header_value = req.get_header('Some-Header')

        # Reject it?
        if some_condition:
            # If close() is called before accept() the code kwarg is
            #   ignored, if present, and the server returns a 403
            #   HTTP response without upgrading the connection.
            await ws.close()
            return

        # Examine subprotocols advertised by the client. Here let's just
        #   assume we only support wamp, so if the client doesn't advertise
        #   it we reject the connection.
        if 'wamp' not in ws.subprotocols:
            # If close() is not called explicitly, the framework will
            #   take care of it automatically with the default code (1000).
            return

        # If, after examining the connection info, you would like to accept
        #   it, simply call accept() as follows:
        try:
            await ws.accept(subprotocol='wamp')
        except WebSocketDisconnected:
            return

        # Simply start sending messages to the client if this is an event
        #   feed endpoint.
        while True:
            try:
                event = await my_next_event()

                # Send an instance of str as a WebSocket TEXT (0x01) payload
                await ws.send_text(event)

                # Send an instance of bytes, bytearray, or memoryview as a
                #   WebSocket BINARY (0x02) payload.
                await ws.send_data(event)

                # Or if you want it to be serialized to JSON (by default; can
                #   be customized via app.ws_options.media_handlers):
                await ws.send_media(event)  # Defaults to WebSocketPayloadType.TEXT
            except WebSocketDisconnected:
                # Do any necessary cleanup, then bail out
                return

        # ...or loop like this to implement a simple request-response protocol
        while True:
            try:
                # Use this if you expect a WebSocket TEXT (0x01) payload,
                #   decoded from UTF-8 to a Unicode string.
                payload_str = await ws.receive_text()

                # Or if you are expecting a WebSocket BINARY (0x02) payload,
                #   in which case you will end up with a byte string result:
                payload_bytes = await ws.receive_data()

                # Or if you want to get a serialized media object (defaults to
                #   JSON deserialization of text payloads, and MessagePack
                #   deserialization for BINARY payloads, but this can be
                #   customized via app.ws_options.media_handlers).
                media_object = await ws.receive_media()

            except WebSocketDisconnected:
                # Do any necessary cleanup, then bail out
                return
            except TypeError:
                # The received message payload was not of the expected
                #   type (e.g., got BINARY when TEXT was expected).
                pass
            except json.JSONDecodeError:
                # The default media deserializer uses the json standard
                #   library, so you might see this error raised as well.
                pass

            # At any time, you may decide to close the websocket. If the
            #   socket is already closed, this call does nothing (it will
            #   not raise an error.)
            if we_are_so_done_with_this_conversation():
                # https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
                await ws.close(code=1000)
                return

            try:
                # Here we are sending as a binary (0x02) payload type, which
                #   will go find the handler configured for that (defaults to
                #   MessagePack which assumes you've also installed that
                #   package, but this can be customized as mentioned above.')
                await ws.send_media(
                    {'event': 'message'},
                    payload_type=WebSocketPayloadType.BINARY,
                )

            except WebSocketDisconnected:
                # Do any necessary cleanup, then bail out. If ws.close() was
                #   not already called by the app, the framework will take
                #   care of it.

                # NOTE: If you do not handle this exception, it will be
                #   bubbled up to a default error handler that simply
                #   logs the message as a warning and then closes the
                #   server side of the connection. This handler can be
                #   overridden as with any other error handler for the app.

                return

        # ...or run a couple of different loops in parallel to support
        #  independent bidirectional message streams.

        messages = collections.deque()

        async def sink():
            while True:
                try:
                    message = await ws.receive_text()
                except falcon.WebSocketDisconnected:
                    break

                messages.append(message)

        sink_task = falcon.create_task(sink())

        while not sink_task.done():
            while ws.ready and not messages and not sink_task.done():
                await asyncio.sleep(0)

            try:
                await ws.send_text(messages.popleft())
            except falcon.WebSocketDisconnected:
                break

        sink_task.cancel()
        try:
            await sink_task
        except asyncio.CancelledError:
            pass


class SomeMiddleware:
    async def process_request_ws(self, req: Request, ws: WebSocket):
        # This will be called for the HTTP request that initiates the
        #   WebSocket handshake before routing.
        pass

    async def process_resource_ws(self, req: Request, ws: WebSocket, resource, params):
        # This will be called for the HTTP request that initiates the
        #   WebSocket handshake after routing (if a route matches the
        #   request).
        pass


app = falcon.asgi.App(middleware=SomeMiddleware())
app.add_route('/{account_id}/messages', SomeResource())

测试

Falcon的测试框架包括支持使用 falcon.testing.ASGIConductor 类,如下面的示例所示。

# This context manages the ASGI app lifecycle, including lifespan events
async with testing.ASGIConductor(some_app) as c:
    async def post_events():
        for i in range(100):
            await c.simulate_post('/events', json={'id': i}):
            await asyncio.sleep(0.01)

    async def get_events_ws():
        # Simulate a WebSocket connection
        async with c.simulate_ws('/events') as ws:
            while some_condition:
                message = await ws.receive_text()

    asyncio.gather(post_events(), get_events_ws())

另请参阅: simulate_ws()

参考文献

WebSocket类

框架将以下类的实例传递到 on_websocket() 应答者。从概念上讲,该类取代了 falcon.asgi.Response 用于WebSocket连接的类。

class falcon.asgi.WebSocket(ver: str, scope: dict, receive: Callable[[], Awaitable[dict]], send: Callable[[dict], Awaitable], media_handlers: Mapping[WebSocketPayloadType, Union[BinaryBaseHandlerWS, TextBaseHandlerWS]], max_receive_queue: int)[源代码]

表示与客户端的单个WebSocket连接。

ready

True 如果已经接受了WebSocket连接并且客户端仍然连接, False 不然的话。

类型:

布尔

unaccepted

True 如果WebSocket连接尚未被接受, False 不然的话。

类型:

Bool)

closed

True 如果服务器已关闭WebSocket连接或客户端已断开连接。

类型:

布尔

subprotocols

客户端通告的子协议字符串列表,如果未指定子协议,则为空的元组。

类型:

元组 [str]

supports_accept_headers

True 如果托管应用程序的ASGI服务器在接受WebSocket连接时支持发送报头, False 不然的话。

类型:

布尔

async accept(subprotocol: Optional[str] = None, headers: Optional[Union[Iterable[Iterable[str]], Mapping[str, str]]] = None)[源代码]

接受传入的WebSocket连接。

如果在检查连接的属性(报头、通告的子协议等)之后请求应该被接受,那么响应者必须首先等待这个协程方法来完成WebSocket握手。或者,响应方可以通过等待 close() 方法。

关键字参数:
  • subprotocol (str) -- 客户端建议的协议列表中,应用程序希望接受的子协议。如果建议的协议中有多个是可接受的,则应选择客户端列表中的第一个协议(另请参阅: subprotocols )。如果未指定,则SEC-WebSocket-Protocol标头将不会包含在对客户端的响应中。在这种情况下,如果客户端没有接收到明确的协议选择,则它可以选择放弃连接。

  • headers (Iterable[[str, str]]) -- 一个可迭代的 [name: str, value: str] 两项迭代,表示要包括在握手响应中的HTTP标头的集合。两者都有 name价值 必须是类型 str 并且仅包含US-ASCII字符。或者,可以传递一个类似dict的对象,该对象实现 items() 方法。。。注意:仅实施SPEC 2.1版或更高版本的ASGI服务器支持此参数。如果一个应用程序需要与多个ASGI服务器兼容,它可以参考 supports_accept_headers 属性来确定宿主服务器是否支持此功能。

async close(code: Optional[int] = None) None[源代码]

关闭WebSocket连接。

此协程方法发送一个WebSocket CloseEvent 发送到客户端,然后继续实际关闭连接。

响应方还可以使用此方法简单地通过等待连接请求来拒绝连接请求,而不是 accept() 。在这种情况下,客户端将收到握手的HTTP 403响应。

关键字参数:

code (int) -- 对象使用的关闭代码。 CloseEvent (默认值为1000)。另请参阅:https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent

async receive_data() bytes[源代码]

从客户端接收具有二进制数据有效负载的消息。

挡路将等待此协同例程,直到有消息可用或WebSocket断开连接。

async receive_media() object[源代码]

从客户端接收反序列化对象。

传入有效负载类型确定将用于反序列化对象的媒体处理程序(另请参阅: 媒体处理程序 )。

async receive_text() str[源代码]

从客户端接收具有Unicode字符串负载的消息。

挡路将等待此协同例程,直到有消息可用或WebSocket断开连接。

async send_data(payload: Union[bytes, bytearray, memoryview]) None[源代码]

使用二进制数据负载向客户端发送消息。

参数:

payload (Union[bytes, bytearray, memoryview]) -- 要发送的二进制数据。

async send_media(media: object, payload_type: WebSocketPayloadType = WebSocketPayloadType.TEXT) None[源代码]

将可序列化对象发送到客户端。

有效负载类型确定将用于序列化给定对象的媒体处理程序(另请参阅: 媒体处理程序 )。

参数:

media (object) -- 要发送的对象。

关键字参数:

payload_type (falcon.WebSocketPayloadType) -- 用于消息的有效负载类型(默认 falcon.WebSocketPayloadType.TEXT )。必须是以下之一:..代码::Python Falcon.WebSocketPayloadType.TEXT Falcon.WebSocketPayloadType.BINARY

async send_text(payload: str) None[源代码]

使用Unicode字符串负载向客户端发送消息。

参数:

payload (str) -- 要发送的字符串。

内置媒体处理程序

class falcon.media.TextBaseHandlerWS[源代码]

WebSocket文本媒体处理程序的抽象基类。

deserialize(payload: str) object[源代码]

从Unicode字符串反序列化文本有效负载。

默认情况下,此方法引发 NotImplementedError 。因此,如果子类希望支持从文本(0x01)消息有效负载反序列化,则必须重写它。

参数:

payload (str) -- 要反序列化的消息负载。

返回:

反序列化对象。

返回类型:

object

serialize(media: object) str[源代码]

将媒体对象序列化为Unicode字符串。

默认情况下,此方法引发 NotImplementedError 。因此,如果子类希望支持序列化为文本(0x01)消息有效负载,则必须重写它。

参数:

media (object) -- 可序列化对象。

返回:

从输入对象得到的序列化字符串。

返回类型:

str

class falcon.media.BinaryBaseHandlerWS[源代码]

WebSocket二进制媒体处理程序的抽象基类。

deserialize(payload: bytes) object[源代码]

从字节字符串反序列化二进制有效负载。

默认情况下,此方法引发 NotImplementedError 。因此,如果子类希望支持从二进制(0x02)消息有效负载进行反序列化,则必须重写它。

参数:

payload (bytes) -- 要反序列化的消息负载。

返回:

反序列化对象。

返回类型:

object

serialize(media: object) Union[bytes, bytearray, memoryview][源代码]

将媒体对象序列化为字节字符串。

默认情况下,此方法引发 NotImplementedError 。因此,如果子类希望支持序列化到二进制(0x02)消息有效负载,则必须重写它。

参数:

media (object) -- 可序列化对象。

返回:

输入对象中产生的序列化字节字符串。可能是 bytesbytearray ,或 memoryview

返回类型:

bytes

class falcon.media.JSONHandlerWS(dumps=None, loads=None)[源代码]

WebSocket媒体处理程序,用于将JSON反(序列化)到文本负载或从文本有效负载反(序列化)JSON。

此处理程序使用Python的标准 json 库在默认情况下,但可以根据需要轻松配置为使用许多第三方JSON库中的任何一个。例如,通过使用替代库,您通常可以在cpython下实现显著的性能提升。在这方面的好选择包括 orjsonpython-rapidjsonmujson .

备注

如果您要部署到Pypy,我们建议您坚持使用标准库的JSON实现,因为与第三方库相比,它在大多数情况下都会更快。

覆盖默认的JSON实现只是指定所需的 dumpsloads 功能::

import falcon
from falcon import media

import rapidjson

json_handler = media.JSONHandlerWS(
    dumps=rapidjson.dumps,
    loads=rapidjson.loads,
)

app = falcon.asgi.App()
app.ws_options.media_handlers[falcon.WebSocketPayloadType.TEXT] = json_handler

默认情况下, ensure_ascii 传递给 json.dumps 功能。如果您重写 dumps 函数,则需要显式设置 ensure_asciiFalse 以便能够将Unicode字符序列化为UTF-8。这可以通过使用 functools.partial 若要应用所需的关键字参数,请执行以下操作。实际上,您可以使用同样的技术来自定义 dumpsloads 功能::

from functools import partial

from falcon import media
import rapidjson

json_handler = media.JSONHandlerWS(
    dumps=partial(
        rapidjson.dumps,
        ensure_ascii=False, sort_keys=True
    ),
)
关键字参数:
  • dumps (func) -- 序列化JSON时使用的函数。

  • loads (func) -- 反序列化JSON时使用的函数。

class falcon.media.MessagePackHandlerWS[源代码]

WebSocket媒体处理程序,用于将MessagePack反(序列化)到/从二进制有效负载反(序列化)MessagePack。

此处理程序使用 msgpack.unpackb()msgpack.packb() 。MessagePack bin 类型用于区分Unicode字符串(类型为 str )和字节字符串(类型为 bytes )。

备注

此处理程序需要额外的 msgpack 软件包(0.5.2或更高版本),必须安装在 falcon 来自Pypi:

$ pip install msgpack

错误类型

class falcon.WebSocketDisconnected(code: Optional[int] = None)[源代码]

WebSocket连接丢失。

尝试在WebSocket上执行操作并确定客户端已关闭连接、服务器已关闭连接或套接字已丢失时,会引发此错误。

关键字参数:

code (int) -- WebSocket关闭代码,根据WebSocket规范(默认 1000 )。

code

WebSocket关闭代码,符合WebSocket规范。

类型:

利息

class falcon.WebSocketPathNotFound(code: Optional[int] = None)[源代码]

找不到请求路径的路由。

已尝试模拟WebSocket连接,但握手请求中指定的路径与应用程序的任何路由都不匹配。

class falcon.WebSocketHandlerNotFound(code: Optional[int] = None)[源代码]

路由的资源不包含 on_websocket() 操作员。

class falcon.WebSocketServerError(code: Optional[int] = None)[源代码]

服务器遇到意外错误。

class falcon.PayloadTypeError[源代码]

WebSocket消息负载不是预期类型。

选项

class falcon.asgi.WebSocketOptions[源代码]

定义一组可配置的WebSocket选项。

此类的实例通过 falcon.asgi.App.ws_options 为了配置某些 WebSocket 行为。

error_close_code

处理WebSocket连接时引发未处理错误时使用的WebSocket关闭代码(默认 1011 )。有关有效关闭代码和范围的列表,另请参阅:https://tools.ietf.org/html/rfc6455#section-7.4

类型:

利息

media_handlers

一个类似字典的对象,用于根据给定消息的WebSocket有效负载类型(文本与二进制)配置媒体处理程序。另请参阅: 媒体处理程序

类型:

双关语

max_receive_queue

如果接收速率超过应用程序的消耗速率,则要入队的最大传入消息数(默认值 4 )。当达到此限制时,框架将等待接受来自ASGI服务器的新消息,直到应用程序能够赶上为止。

此限制适用于Falcon的传入消息队列,通常应保持较小,因为ASGI服务器维护其自己的接收队列。通过设置,可以完全禁用Falcon的队列 max_receive_queue0 (另请参阅: 丢失的连接 )。

类型:

利息