This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

常见实用程序- kombu.common

公共实用程序。

class kombu.common.Broadcast(name=None, queue=None, unique=False, auto_delete=True, exchange=None, alias=None, **kwargs)[源代码]

广播队列。

用于定义广播队列的便利类。

每个队列实例都将有一个唯一的名称,并且队列和交换都配置为自动删除。

论点:

Name(Str):用作交换的名称。Queue(Str):默认情况下,队列使用唯一ID

每个消费者的名字。您可以在此处指定自定义队列名称。

唯一(Bool):始终创建唯一队列

即使提供了队列名称也是如此。

**kwargs(Any):请参见 Queue 查看列表

支持的其他关键字参数。

attrs: tuple[tuple[str, Any], ...] = (('name', None), ('exchange', None), ('routing_key', None), ('queue_arguments', None), ('binding_arguments', None), ('consumer_arguments', None), ('durable', <class 'bool'>), ('exclusive', <class 'bool'>), ('auto_delete', <class 'bool'>), ('no_ack', None), ('alias', None), ('bindings', <class 'list'>), ('no_declare', <class 'bool'>), ('expires', <class 'float'>), ('message_ttl', <class 'float'>), ('max_length', <class 'int'>), ('max_length_bytes', <class 'int'>), ('max_priority', <class 'int'>), ('queue', None))
kombu.common.collect_replies(conn, channel, queue, *args, **kwargs)[源代码]

生成器从以下位置收集回复 queue

kombu.common.drain_consumer(consumer, limit=1, timeout=None, callbacks=None)[源代码]

从消费者实例中排出消息。

kombu.common.eventloop(conn, limit=None, timeout=None, ignore_timeouts=False)[源代码]

最佳实践生成器包装 Connection.drain_events

能够有限制地永远排出事件,并可选择忽略超时错误(超时1通常用于套接字可能“卡住”的环境,这是Kombu消费者的最佳实践)。

eventloop 是一台发电机。

示例

>>> from kombu.common import eventloop
>>> def run(conn):
...     it = eventloop(conn, timeout=1, ignore_timeouts=True)
...     next(it)   # one event consumed, or timed out.
...
...     for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
...         pass  # loop forever.

它还接受可选的限制参数,并且默认情况下传播超时错误::

for _ in eventloop(connection, limit=1, timeout=1):
    pass
kombu.common.insured(pool, fun, args, kwargs, errback=None, on_revive=None, **opts)[源代码]

用于处理连接错误的函数包装。

确保在间歇性连接失败的情况下仍能完成执行Broker命令的功能。

kombu.common.itermessages(conn, channel, queue, limit=1, timeout=None, callbacks=None, **kwargs)[源代码]

消息上的迭代器。

kombu.common.maybe_declare(entity, channel=None, retry=False, **retry_policy)[源代码]

声明实体(缓存)。

kombu.common.send_reply(exchange, req, msg, producer=None, retry=False, retry_policy=None, **props)[源代码]

发送请求回复。

论点:

Exchange(kombu.Exchange,str):回复交换请求(~kombu.Message):原始请求,消息带有

一个 reply_to 财产。

Producer(kombu.Producer):Producer实例重试(Bool):如果为真,则必须根据

这个 reply_policy 争论。

RETRY_POLICY(DICT):重试设置。**道具(ANY):额外属性。

kombu.common.uuid(_uuid: ~typing.Callable[[], ~uuid.UUID] = <function uuid4>) str[源代码]

生成UUID4格式的唯一id。