falcon.media.msgpack 源代码

from __future__ import absolute_import  # NOTE(kgriffs): Work around a Cython bug

from typing import Union

from falcon import errors
from falcon.media.base import BaseHandler
from falcon.media.base import BinaryBaseHandlerWS


[文档]class MessagePackHandler(BaseHandler): """Handler built using the :py:mod:`msgpack` module. This handler uses ``msgpack.unpackb()`` and ``msgpack.Packer().pack()``. The MessagePack ``bin`` type is used to distinguish between Unicode strings (of type ``str``) and byte strings (of type ``bytes``). This handler will raise a :class:`falcon.MediaNotFoundError` when attempting to parse an empty body; it will raise a :class:`falcon.MediaMalformedError` if an error happens while parsing the body. Note: This handler requires the extra ``msgpack`` package (version 0.5.2 or higher), which must be installed in addition to ``falcon`` from PyPI: .. code:: $ pip install msgpack """ def __init__(self): import msgpack packer = msgpack.Packer(autoreset=True, use_bin_type=True) self._pack = packer.pack self._unpackb = msgpack.unpackb # NOTE(kgriffs): To be safe, only enable the optimized protocol when # not subclassed. if type(self) is MessagePackHandler: self._serialize_sync = self._pack self._deserialize_sync = self._deserialize def _deserialize(self, data): if not data: raise errors.MediaNotFoundError('MessagePack') try: # NOTE(jmvrbanac): Using unpackb since we would need to manage # a buffer for Unpacker() which wouldn't gain us much. return self._unpackb(data, raw=False) except ValueError as err: raise errors.MediaMalformedError('MessagePack') from err def deserialize(self, stream, content_type, content_length): return self._deserialize(stream.read()) async def deserialize_async(self, stream, content_type, content_length): return self._deserialize(await stream.read()) def serialize(self, media, content_type) -> bytes: return self._pack(media) async def serialize_async(self, media, content_type) -> bytes: return self._pack(media)
[文档]class MessagePackHandlerWS(BinaryBaseHandlerWS): """WebSocket media handler for de(serializing) MessagePack to/from BINARY payloads. This handler uses ``msgpack.unpackb()`` and ``msgpack.packb()``. The MessagePack ``bin`` type is used to distinguish between Unicode strings (of type ``str``) and byte strings (of type ``bytes``). Note: This handler requires the extra ``msgpack`` package (version 0.5.2 or higher), which must be installed in addition to ``falcon`` from PyPI: .. code:: $ pip install msgpack """ __slots__ = ['msgpack', 'packer'] def __init__(self): import msgpack packer = msgpack.Packer(autoreset=True, use_bin_type=True) self._pack = packer.pack self._unpackb = msgpack.unpackb def serialize(self, media: object) -> Union[bytes, bytearray, memoryview]: return self._pack(media) def deserialize(self, payload: bytes) -> object: # NOTE(jmvrbanac): Using unpackb since we would need to manage # a buffer for Unpacker() which wouldn't gain us much. return self._unpackb(payload, raw=False)