This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

Kombu- kombu

用于Python的消息库。

kombu.enable_insecure_serializers(choices=<object object>)[源代码]

启用被认为不安全的序列化程序。

注:

将启用 pickleyamlmsgpack 默认情况下,但您也可以指定要启用的序列化程序列表(按名称或内容类型)。

kombu.disable_insecure_serializers(allowed=<object object>)[源代码]

禁用不受信任的序列化程序。

将禁用所有序列化程序,但 json 或者,您可以指定要允许的反序列化程序列表。

注:

生产者仍然能够以这些格式序列化数据,但消费者不会接受使用不受信任的内容类型的传入数据。

连接

class kombu.Connection(hostname='localhost', userid=None, password=None, virtual_host=None, port=None, insist=False, ssl=False, transport=None, connect_timeout=5, transport_options=None, login_method=None, uri_prefix=None, heartbeat=0, failover_strategy='round-robin', alternates=None, **kwargs)[源代码]

与经纪人的联系。

示例:

>>> Connection('amqp://guest:guest@localhost:5672//')
>>> Connection('amqp://foo;amqp://bar',
...            failover_strategy='round-robin')
>>> Connection('redis://', transport_options={
...     'visibility_timeout': 3000,
... })
>>> import ssl
>>> Connection('amqp://', login_method='EXTERNAL', ssl={
...    'ca_certs': '/etc/pki/tls/certs/something.crt',
...    'keyfile': '/etc/something/system.key',
...    'certfile': '/etc/something/system.cert',
...    'cert_reqs': ssl.CERT_REQUIRED,
... })

注:

目前,SSL仅适用于Py-AMQP和QPID传输。对于其他交通工具,您可以使用stantnel。

论点:

URL(字符串,序列):代理URL,或URL列表。

关键字参数:

SSL(bool/dict):使用SSL连接到服务器。

缺省值为 False 。可能不受指定传输的支持。

传输(Transport):如果未在URL中指定,则为默认传输。CONNECT_TIMEOUT(FLOAT):连接到

伺服器。可能不受指定传输的支持。

TRANSPORTS_OPTIONS(DICT):附加连接参数的DICT

传递到备用Kombu通道实现。有关可用选项,请查阅运输文档。

心跳(Float):心跳间隔,单位为Int/Float秒。

请注意,如果启用了心跳,则 heartbeat_check() 方法必须定期调用,大约每秒调用一次。

注:

当需要时,连接被延迟地建立。如果需要建立连接,则通过调用 connect() **

>>> conn = Connection('amqp://')
>>> conn.connect()

并始终记住关闭连接::

>>> conn.release()

这些选项已被URL参数替换,但仍支持向后兼容:

关键字主机名:

主机名/地址。注意:不能同时指定URL参数和使用HOSTNAME关键字参数。

关键字用户ID:

默认用户名(如果URL中未提供)。

关键字密码:

默认密码(如果URL中未提供)。

关键字VALUAL_HOST:

默认虚拟主机(如果URL中未提供)。

关键字端口:

默认端口(如果URL中未提供)。

属性

hostname = None
port = None
userid = None
password = None
virtual_host = '/'
ssl = None
login_method = None
failover_strategy = 'round-robin'

用于在连接失败后重新连接时选择新主机的策略。“循环”、“洗牌”或任何定制迭代器之一,不断产生新的URL以供尝试。

connect_timeout = 5
heartbeat = None

心跳数值,目前仅受Py-AMQP传输支持。

default_channel

默认频道。

在访问时创建,并在连接关闭时关闭。

注:

当您只需要一个通道时,可以用于自动通道处理,如果将连接而不是通道传递给需要通道的函数,则它也是隐式使用的通道。

connected

如果已建立连接,则返回TRUE。

recoverable_connection_errors

可恢复的连接错误。

可以从中恢复但必须首先关闭并重新建立连接的与连接相关的异常的列表。

recoverable_channel_errors

可恢复的通道错误。

无需重新建立连接即可自动恢复的与通道相关的异常列表。

connection_errors

连接可能引发的异常列表。

channel_errors

通道可能引发的异常列表。

transport
connection

基础连接对象。

警告:

该实例是特定于传输的,因此不依赖于该对象的接口。

uri_prefix = None
declared_entities = None

声明实体的缓存是针对每个连接的,以防服务器丢失数据。

cycle = None

迭代器返回在连接失败时尝试的下一个代理URL(由 failover_strategy )。

host

以冒号分隔的主机名/端口对形式的主机。

manager

AMQP管理API。

试验性管理器,可用于管理/监视Broker实例。

并非所有运输工具都可用。

supports_heartbeats
is_evented

方法

as_uri(include_password=False, mask='**', getfields=operator.itemgetter('port', 'userid', 'password', 'virtual_host', 'transport')) str[源代码]

将连接参数转换为URL形式。

connect()[源代码]

立即建立与服务器的连接。

channel()[源代码]

创建并返回一个新频道。

drain_events(**kwargs)[源代码]

等待来自服务器的单个事件。

论点:

Timeout(Float):放弃前的超时时间(秒)。

引发socket.timeout:

如果超过超时。:

release()[源代码]

关闭连接(如果打开)。

autoretry(fun, channel=None, **ensure_options)[源代码]

支持的函数的装饰符 channel 关键字参数。

如果生成的可调用函数引发连接或通道相关错误,则它将重试调用该函数。返回值将是 (retval, last_created_channel)

如果一个 channel 如果没有提供,则会自动获取一个(记得在之后关闭它)。

示例:

>>> channel = connection.channel()
>>> try:
...    ret, channel = connection.autoretry(
...         publish_messages, channel)
... finally:
...    channel.close()
ensure_connection(*args, **kwargs)[源代码]

用于追溯兼容性的公共接口_Assue_Connection。

返回kombu.Connection实例。

ensure(obj, fun, errback=None, max_retries=None, interval_start=1, interval_step=1, interval_max=1, on_revive=None, retry_errors=None)[源代码]

确保操作完成。

而不管发生任何通道/连接错误。

通过建立连接并重新应用函数来重试。

论点:

OBJ:确保对其执行操作的对象。Fun(Callable):要应用的方法。

Errback(Callable):可选的回调

无法建立连接。提供的参数是引发的异常和将休眠的间隔 (exc, interval)

MAX_RETRIES(Int):重试的最大次数。

如果超过此限制,将重新引发连接错误。

INTERVAL_START(FLOAT):我们开始的秒数

睡了一觉。

INTERVAL_STEP(FLOAT):间隔增加了多少秒

每一次重试。

Interval_max(浮点数):休眠间隔的最大秒数

每次重试。

On_revive(Callable):可选的回调

恢复成功完成

RETRY_ERROR(元组):要重试的可选错误列表

而不管连接状态如何。

示例

>>> from kombu import Connection, Producer
>>> conn = Connection('amqp://')
>>> producer = Producer(conn)
>>> def errback(exc, interval):
...     logger.error('Error: %r', exc, exc_info=1)
...     logger.info('Retry in %s seconds.', interval)
>>> publish = conn.ensure(producer, producer.publish,
...                       errback=errback, max_retries=3)
>>> publish({'hello': 'world'}, routing_key='dest')
revive(new_channel)[源代码]

在重新建立连接后恢复连接。

create_transport()[源代码]
get_transport_cls()[源代码]

获取当前使用的传输类。

clone(**kwargs)[源代码]

使用相同的设置创建连接的副本。

info()[源代码]

获取连接信息。

switch(conn_str)[源代码]

切换连接参数以使用新的URL或主机名。

注:

不会重新连接!

论点:

Conn_str(Str):主机名或URL。

maybe_switch_next()[源代码]

切换到当前故障转移策略指定的下一个URL。

heartbeat_check(rate=2)[源代码]

检查一下心跳。

允许传输器执行使心跳工作所需的任何周期性任务。这应该大约每秒都被调用。

如果当前传输不支持心跳,则这是noop操作。

论点:

Rate(Int):rate是调用记号的频率

与实际的心跳值进行比较。例如,如果心跳设置为3秒,并且每隔3/2秒调用一次滴答,则速率为2。该值当前未被任何传输使用。

maybe_close_channel(channel)[源代码]

关闭给定的通道,但忽略连接和通道错误。

register_with_event_loop(loop)[源代码]
close()

关闭连接(如果打开)。

_close()[源代码]

真正紧密的连接,即使是连接池的一部分。

completes_cycle(retries)[源代码]

如果循环在以下次数后完成,则返回TRUE retries

get_manager(*args, **kwargs)[源代码]
Producer(channel=None, *args, **kwargs)[源代码]

创造新 kombu.Producer 举个例子。

Consumer(queues=None, channel=None, *args, **kwargs)[源代码]

创造新 kombu.Consumer 举个例子。

Pool(limit=None, **kwargs)[源代码]

连接池。

论点:

Limit(Int):最大活动连接数。

默认设置为无限制。

示例:

>>> connection = Connection('amqp://')
>>> pool = connection.Pool(2)
>>> c1 = pool.acquire()
>>> c2 = pool.acquire()
>>> c3 = pool.acquire()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "kombu/connection.py", line 354, in acquire
  raise ConnectionLimitExceeded(self.limit)
    kombu.exceptions.ConnectionLimitExceeded: 2
>>> c1.release()
>>> c3 = pool.acquire()
ChannelPool(limit=None, **kwargs)[源代码]

频道池。

论点:

Limit(Int):活动通道的最大数量。

默认设置为无限制。

示例:

>>> connection = Connection('amqp://')
>>> pool = connection.ChannelPool(2)
>>> c1 = pool.acquire()
>>> c2 = pool.acquire()
>>> c3 = pool.acquire()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "kombu/connection.py", line 354, in acquire
  raise ChannelLimitExceeded(self.limit)
    kombu.connection.ChannelLimitExceeded: 2
>>> c1.release()
>>> c3 = pool.acquire()
SimpleQueue(name, no_ack=None, queue_opts=None, queue_args=None, exchange_opts=None, channel=None, **kwargs)[源代码]

简单的持久队列API。

创造新 SimpleQueue ,使用来自该连接的通道。

如果 name 是一个字符串,则将使用该名称作为队列和交换的名称自动创建队列和交换,并将其用作默认路由键。

论点:

Name(str,kombu.Queue):队列的名称。No_ack(Bool):禁用确认。默认值为FALSE。Queue_opts(Dict):传递给

自动创建的 Queue

Queue_args(Dict):传递给

自动创建的 Queue 用于设置实现扩展(例如在RabbitMQ中)。

Exchange_OPTS(Dict):传递给

自动创建的 Exchange

Channel(Channel):要使用的自定义频道。如果未指定,则

使用连接默认通道。

SimpleBuffer(name, no_ack=None, queue_opts=None, queue_args=None, exchange_opts=None, channel=None, **kwargs)[源代码]

简单的临时队列API。

创造新 SimpleQueue 使用来自此连接的通道。

交易所

创建交换声明的示例::

>>> news_exchange = Exchange('news', type='topic')

暂时 news_exchange 只是一个声明,您不能对其执行操作。它只描述了交易所的名称和选项。

交换可以是绑定的,也可以是未绑定的。绑定表示交换与通道相关联,并且可以在其上执行操作。要绑定交换,您可以使用通道作为参数调用交换::

>>> bound_exchange = news_exchange(channel)

现在您可以执行如下操作 declare()delete() **

>>> # Declare exchange manually
>>> bound_exchange.declare()

>>> # Publish raw string message using low-level exchange API
>>> bound_exchange.publish(
...     'Cure for cancer found!',
...     routing_key='news.science',
... )

>>> # Delete exchange.
>>> bound_exchange.delete()
class kombu.Exchange(name='', type='', channel=None, **kwargs)[源代码]

一份交易所声明。

论点:

名称(字符串):请参见 name 。文字(字符串):请参见 type 。Channel(kombu.Connection,Channel):请参见 channel 。耐久(Bool):请参见 durable 。AUTO_DELETE(布尔):请参见 auto_delete 。Delivery_模式(枚举):请参阅 delivery_mode 。参数(DICT):请参见 arguments 。NO_DECLARE(Bool):请参见 no_declare

name(str)

默认为无名称(默认交换)。

类型:

交易所的名称。

type(str)

This description of AMQP exchange types was shamelessly stolen from the blog post `AMQP in 10 minutes: Part 4`_ by Rajith Attapattu. Reading this article is recommended if you're new to amqp.

“AMQP定义了四种默认交换类型(路由算法),涵盖了大多数常见的消息传递用例。AMQP Broker还可以定义其他交换类型,因此有关可用交换类型的详细信息,请参阅您的Broker手册。

  • direct ( default )

    消息中的路由关键字与队列绑定到此交换时使用的路由条件之间的直接匹配。

  • topic

    路由键与交换/队列绑定中指定的路由模式之间的通配符匹配。路由键被视为由分隔的零个或多个单词 "." 并支持特殊通配符。 "*" 匹配单个单词,并且 "#" 匹配零个或多个单词。

  • fanout

    队列在没有参数的情况下绑定到此交换。因此,发送到此交换的任何消息都将被转发到绑定到此交换的所有队列。

  • headers

    队列通过包含标头和值(可选)的参数表绑定到此交换。名为“x-Match”的特殊参数确定匹配算法,其中 "all" 隐含着一个 AND (所有配对必须匹配)和 "any" 暗示 OR (必须至少有一对匹配)。

    arguments 用于指定参数。

Channel(Channel):交换绑定到的通道(如果绑定)。

持久(Bool):持久交换在服务器重新启动时保持活动状态。

当服务器重新启动时,非持久交换(临时交换)将被清除。缺省值为 True

AUTO_DELETE(Bool):如果设置,则在所有队列

已经用完了。缺省值为 False

DELIVERY_MODE(枚举):消息使用的默认传递模式。

该值是一个整数或别名字符串。

  • 1或 "transient"

    这条消息是暂时的。这意味着它只存储在内存中,如果服务器死机或重新启动,它就会丢失。

  • 2或“永久”( default )

    这条消息是持久的。这意味着消息既存储在内存中,也存储在磁盘上,因此在服务器死机或重新启动时保留。

默认值为2(永久)。

Arguments(Dict):指定何时交换的其他参数

被宣布为。

NO_DECLARE(Bool):从不声明此交换

(declare() 什么都不做)。

maybe_bind(channel: Channel | Connection) _MaybeChannelBoundType

将实例绑定到通道(如果尚未绑定)。

Message(body, delivery_mode=None, properties=None, **kwargs)[源代码]

创建要与其一起发送的消息实例 publish()

论点:

Body(Any):消息正文。

DELIVERY_MODE(Bool):设置自定义交付模式。

默认为 delivery_mode

优先级(Int):消息优先级,0为已配置的代理

最大优先级,越高越好。

Content_type(Str):消息内容_类型。如果为Content_Type

则不会发生序列化,因为假定这是一个二进制对象,或者您已经完成了自己的序列化。如果使用内置序列化,则保留为空,因为我们的库正确设置了CONTENT_TYPE。

CONTENT_ENCODING(Str):该对象所在的字符集

是经过编码的。如果发送原始二进制对象,请使用“二进制”。如果使用内置序列化,因为我们的库正确设置了CONTENT_ENCODING,则将其留空。

属性(Dict):消息属性。

Headers(Dict):消息标头。

PERSISTENT_DELIVERY_MODE = 2
TRANSIENT_DELIVERY_MODE = 1
attrs: tuple[tuple[str, Any], ...] = (('name', None), ('type', None), ('arguments', None), ('durable', <class 'bool'>), ('passive', <class 'bool'>), ('auto_delete', <class 'bool'>), ('delivery_mode', <function Exchange.<lambda>>), ('no_declare', <class 'bool'>))
auto_delete = False
bind_to(exchange='', routing_key='', arguments=None, nowait=False, channel=None, **kwargs)[源代码]

将交换绑定到另一个交换。

论点:

NoWait(Bool):如果设置,服务器将不响应,并且调用

不会阻止等待回复。缺省值为 False

binding(routing_key='', arguments=None, unbind_arguments=None)[源代码]
property can_cache_declaration

布尔(X)->布尔

当参数x为True时返回True,否则返回False。内置的True和False是bool类的仅有的两个实例。类bool是int类的子类,不能作为子类。

declare(nowait=False, passive=None, channel=None)[源代码]

申报交易所。

在代理上创建交换,除非设置了被动,在这种情况下,它将只断言交换存在。

论据:
NoWait(Bool):如果设置,服务器将不响应,并且

我们不会等待回复。缺省值为 False

delete(if_unused=False, nowait=False)[源代码]

删除服务器上的交换声明。

论点:

IF_UNUSED(Bool):仅当交换没有绑定时才删除。

缺省值为 False

NoWait(Bool):如果设置,服务器将不响应,并且

我们不会等待回复。缺省值为 False

delivery_mode = None
durable = True
name = ''
no_declare = False
passive = False
publish(message, routing_key=None, mandatory=False, immediate=False, exchange=None)[源代码]

发布消息。

论点:

消息(联合 [kombu.Message, str, bytes] ):

要发布的消息。

ROUTING_KEY(Str):消息路由键。必填项(Bool):目前不支持。Immediate(Bool):当前不支持。

type = 'direct'
unbind_from(source='', routing_key='', nowait=False, arguments=None, channel=None)[源代码]

从服务器中删除以前创建的Exchange绑定。

队列

中使用交换创建队列的示例 Exchange 示例::

>>> science_news = Queue('science_news',
...                      exchange=news_exchange,
...                      routing_key='news.science')

暂时 science_news 只是一个声明,您不能对其执行操作。它只描述了队列的名称和选项。

队列可以绑定或解除绑定。绑定表示队列与通道相关联,并且可以对其执行操作。要绑定队列,您可以使用通道作为参数调用队列实例::

>>> bound_science_news = science_news(channel)

现在您可以执行如下操作 declare()purge()

>>> bound_science_news.declare()
>>> bound_science_news.purge()
>>> bound_science_news.delete()
class kombu.Queue(name='', exchange=None, routing_key='', channel=None, bindings=None, on_declared=None, **kwargs)[源代码]

队列声明。

论点:

名称(字符串):请参见 name 。Exchange(Exchange,str):请参阅 exchange 。ROUTING_KEY(字符串):参见 routing_key 。Channel(kombu.Connection,Channel):请参见 channel 。耐久(Bool):请参见 durable 。独家(Bool):请参阅 exclusive 。AUTO_DELETE(布尔):请参见 auto_delete 。Queue_Arguments(Dict):请参见 queue_arguments 。BINDING_ARGUMENTS(Dict):请参见 binding_arguments 。Consumer_Arguments(Dict):请参阅 consumer_arguments 。NO_DECLARE(Bool):请参见 no_declare 。ON_DECLARATED(可调用):请参见 on_declared 。Expires(浮动):请参见 expires 。Message_ttl(浮点):请参见 message_ttl 。最大长度(Int):请参见 max_length 。最大长度字节数(Int):请参见 max_length_bytes 。MAX_PRIORITY(Int):请参见 max_priority

name(str)

默认情况下没有名称(默认队列目标)。

类型:

队列的名称。

exchange(Exchange)
类型:

这个 Exchange 队列绑定到。

routing_key(str)

路由键的解释取决于 Exchange.type

  • 直接兑换

    如果消息的路由键属性与 routing_key 属性是相同的。

  • 扇出交换

    始终匹配,即使绑定没有键。

  • 话题交流

    通过基元模式匹配方案匹配消息的路由键属性。然后,消息路由键由由点分隔的单词组成 ("." ,如域名),并且有两个特殊字符可用;星号 ("*" )和散列 ("#" )。星号匹配任何单词,而散列匹配零个或多个单词。例如 "*.stock.#" 匹配路由关键字 "usd.stock""eur.stock.db" 但不是 "stock.nasdaq"

类型:

路由密钥(如果有),也称为 binding key

channel(ChannelT)
类型:

队列绑定到的通道(如果绑定)。

durable(bool)

如果/当服务器重新启动时,非持久队列(临时队列)将被清除。请注意,持久队列不一定保存持久消息,尽管将持久消息发送到临时队列没有意义。

缺省值为 True

类型:

服务器重新启动时,持久队列保持活动状态。

exclusive(bool)

当前连接。设置‘EXCLUSIVE’标志总是意味着‘自动删除’。

缺省值为 False

类型:

独占队列只能由

auto_delete(bool)

已经用完了。最后一个用户可以显式取消,也可以因为其渠道关闭而取消。如果队列中没有消费者,则不会将其删除。

类型:

如果设置,则在所有使用者

expires(float)

队列应该过期。

过期时间决定队列在被自动删除之前可以保持多长时间不使用。 Unused 表示队列没有使用者,队列尚未重新声明,并且 Queue.get 至少在到期期限内未被调用。

请参阅https://www.rabbitmq.com/ttl.html#queue-ttl

RabbitMQ extension :仅在使用RabbitMQ时可用。

类型:

设置此事件的过期时间(以秒为单位

message_ttl(float)

此设置控制消息可以在队列中保留多长时间而不被使用。如果在消息使用者收到该消息之前已过了过期时间,则该消息将被删除,并且不会有任何使用者看到该消息。

请参阅https://www.rabbitmq.com/ttl.html#per-queue-message-ttl

RabbitMQ extension :仅在使用RabbitMQ时可用。

类型:

消息生存时间(秒)。

max_length(int)

排队可以坚持。

如果队列大小中的消息数超过此限制,则新消息将被丢弃(如果死信交换处于活动状态,则为死信)。

请参阅https://www.rabbitmq.com/maxlength.html

RabbitMQ extension :仅在使用RabbitMQ时可用。

类型:

设置的最大消息数

max_length_bytes(int)

队列中的消息的数量。

如果队列中所有消息的总大小超过此限制,则新消息将被丢弃(如果死信交换处于活动状态,则为死信)。

RabbitMQ extension :仅在使用RabbitMQ时可用。

类型:

设置总计的最大大小(以字节为单位

max_priority(int)

例如,如果值为10,则可以传递到此队列的消息可以具有 priority 介于0和10之间的值,其中10是最高优先级。

未设置最大优先级的RabbitMQ队列将忽略消息中的优先级字段,因此,如果需要优先级,则需要设置最大优先级字段以将队列声明为优先级队列。

RabbitMQ extension :仅在使用RabbitMQ时可用。

类型:

设置此队列的最高优先级数。

queue_arguments(Dict)

排队。可用于设置RabbitMQ/AMQP的参数值 queue.declare

类型:

声明时使用的其他参数

binding_arguments(Dict)

排队。可用于设置RabbitMQ/AMQP的参数值 queue.declare

类型:

绑定时使用的其他参数

consumer_arguments(Dict)

从这个队列中。可用于设置RabbitMQ/AMQP的参数值 basic.consume

类型:

使用时使用的其他参数

alias(str)

其中,例如为具有自动生成的队列名称的队列提供备用名称。

类型:

在Kombu中未使用,但应用程序可以利用

on_declared(Callable)

队列已声明( queue_declare 操作完成)。此函数必须具有至少接受3个位置参数的签名: (name, messages, consumers)

类型:

时要应用的可选回调

no_declare(bool)

实体 (declare() 什么都不做)。

类型:

永远不要声明此队列,也不要相关

maybe_bind(channel: Channel | Connection) _MaybeChannelBoundType

将实例绑定到通道(如果尚未绑定)。

exception ContentDisallowed

消费者不允许此内容类型。

as_dict(recurse=False)[源代码]
attrs: tuple[tuple[str, Any], ...] = (('name', None), ('exchange', None), ('routing_key', None), ('queue_arguments', None), ('binding_arguments', None), ('consumer_arguments', None), ('durable', <class 'bool'>), ('exclusive', <class 'bool'>), ('auto_delete', <class 'bool'>), ('no_ack', None), ('alias', None), ('bindings', <class 'list'>), ('no_declare', <class 'bool'>), ('expires', <class 'float'>), ('message_ttl', <class 'float'>), ('max_length', <class 'int'>), ('max_length_bytes', <class 'int'>), ('max_priority', <class 'int'>))
auto_delete = False
bind(channel)[源代码]

创建绑定到频道的实例的副本。

bind_to(exchange='', routing_key='', arguments=None, nowait=False, channel=None)[源代码]
property can_cache_declaration

布尔(X)->布尔

当参数x为True时返回True,否则返回False。内置的True和False是bool类的仅有的两个实例。类bool是int类的子类,不能作为子类。

cancel(consumer_tag)[源代码]

按消费者标签取消消费者。

consume(consumer_tag='', callback=None, no_ack=None, nowait=False)[源代码]

启动队列使用者。

消费者的存续期和他们创建的渠道一样长,或者直到客户取消他们为止。

论点:

Consumer_tag(Str):消费者的唯一标识符。

使用者标记是连接的本地标记,因此两个客户端可以使用相同的使用者标记。如果此字段为空,服务器将生成唯一标记。

No_ack(Bool):如果启用,代理将自动

确认消息。

Nowait(Bool):不要等待回复。

回调(Callable):为每条传递的消息调用回调。

declare(nowait=False, channel=None)[源代码]

声明队列和交换,然后将队列绑定到交换。

delete(if_unused=False, if_empty=False, nowait=False)[源代码]

删除队列。

论点:

IF_UNUSED(Bool):如果设置,服务器将只删除队列

如果它没有消费者的话。如果队列有消费者,则会引发通道错误。

If_Empty(Bool):如果设置,服务器将仅在以下情况下删除队列

它是空的。如果它不为空,则会引发通道错误。

Nowait(Bool):不要等待回复。

durable = True
exchange = <unbound Exchange ''(direct)>
exclusive = False
classmethod from_dict(queue, **options)[源代码]
get(no_ack=None, accept=None)[源代码]

轮询服务器以获取新消息。

此方法使用同步对话提供对队列中消息的直接访问,该对话是为同步功能比性能更重要的特定类型的应用程序设计的。

返回:

~kombu.Message --或 None 否则的话。

返回类型:

if a message was available,

论点:

No_ack(Bool):如果启用,代理将

自动确认消息。

接受(设置 [str] ):可接受内容类型的自定义列表。

name = ''
no_ack = False
purge(nowait=False)[源代码]

从队列中删除所有就绪消息。

queue_bind(nowait=False, channel=None)[源代码]

在服务器上创建队列绑定。

queue_declare(nowait=False, passive=False, channel=None)[源代码]

在服务器上声明队列。

论点:

Nowait(Bool):不要等待回复。PASSIVE(Bool):如果设置,服务器将不创建队列。

客户端可以使用它来检查队列是否存在,而无需修改服务器状态。

queue_unbind(arguments=None, nowait=False, channel=None)[源代码]
routing_key = ''
unbind_from(exchange='', routing_key='', arguments=None, nowait=False, channel=None)[源代码]

通过从服务器删除绑定来解除绑定队列。

when_bound()[源代码]

绑定类时调用的回调。

消息制作者

class kombu.Producer(channel, exchange=None, routing_key=None, serializer=None, auto_declare=None, compression=None, on_return=None)[源代码]

消息制作人。

论点:

Channel(kombu.Connection,Channel):连接或通道。Exchange(kombu.entity.Exchange,str):可选默认交换。ROUTING_KEY(Str):可选的默认路由键。序列化程序(Str):默认的序列化程序。缺省值为 "json" 。COMPRESSION(字符串):默认压缩方式。

默认设置为无压缩。

AUTO_DECLARE(Bool):自动声明默认交换

在实例化时。缺省值为 True

On_Return(Callable):回调以调用无法投递的消息,

mandatoryimmediate 争论到 publish() 使用的是。此回调需要以下签名: (exception, exchange, routing_key, message) 。请注意,生产者需要排出事件才能使用此功能。

channel
exchange = None

默认汇兑

routing_key = ''

默认路由密钥。

serializer = None

要使用的默认序列化程序。默认为JSON。

compression = None

默认压缩方法。默认情况下禁用。

auto_declare = True

默认情况下,如果设置了默认交换,则在发布消息时将声明该交换。

on_return = None

基本返回回调。

connection
declare()[源代码]

申报交易所。

注:

在实例化时,这会自动发生 auto_declare 标志已启用。

maybe_declare(entity, retry=False, **retry_policy)[源代码]

如果在本次会议期间尚未声明汇率,请声明汇率。

publish(body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, compression=None, exchange=None, retry=False, retry_policy=None, declare=None, expiration=None, timeout=None, **properties)[源代码]

将消息发布到指定的Exchange。

论点:

Body(Any):消息正文。ROUTING_KEY(Str):消息路由键。Delivery_模式(枚举):请参阅 delivery_mode 。必填项(Bool):目前不支持。Immediate(Bool):当前不支持。优先级(Int):消息优先级。介于0和9之间的数字。content_type(Str):内容类型。默认设置为自动检测。CONTENT_ENCODING(字符串):内容编码。默认设置为自动检测。序列化程序(Str):要使用的序列化程序。默认设置为自动检测。压缩(Str):要使用的压缩方法。默认设置为无。Header(Dict):要传递的任意标头的映射

使用邮件正文。

Exchange(kombu.entity.Exchange,str):覆盖交换。

请注意,此交换必须已申报。

声明(顺序 [EntityT] ):所需实体的可选列表

这肯定是在发布消息之前声明的。这些实体将使用 maybe_declare()

重试(Bool):重试发布或声明实体(如果

连接中断。

RETRY_POLICY(DICT):重试配置,这是关键字

支持: ensure()

过期(浮动):可以为每条消息指定以秒为单位的TTL。

默认设置为无到期。

超时(浮点):将超时设置为等待最大超时秒数

用于发布消息。

**PROPERTIES(ANY):其他消息属性,参见AMQP规范。

revive(channel)[源代码]

断开连接后恢复制作人。

消息消费者

class kombu.Consumer(channel, queues=None, no_ack=None, auto_declare=None, callbacks=None, on_decode_error=None, on_message=None, accept=None, prefetch_count=None, tag_prefix=None)[源代码]

消息消费者。

论点:

Channel(kombu.Connection,Channel):请参见 channel 。队列(序列 [kombu.Queue] ):请参见 queues 。No_ack(Bool):请参见 no_ack 。AUTO_DECLARE(Bool):请参见 auto_declare 回调(顺序 [Callable] ):请参见 callbacks 。On_Message(可调用):请参阅 on_message ON_DECODE_ERROR(可调用):请参见 on_decode_error 。Prefetch_count(Int):请参见 prefetch_count

channel = None

要用于此使用者的连接/通道。

queues

单人间 Queue ,或要从中消费的队列列表。

no_ack = None

用于自动消息确认的标志。如果启用,代理将自动确认消息。这可以提高性能,但意味着您无法控制何时删除邮件。

默认情况下禁用。

auto_declare = True

默认情况下,所有实体都将在实例化时声明,如果您希望手动处理此操作,可以将其设置为 False

callbacks = None

收到消息时按顺序调用的回调列表。

回调的签名必须带有两个参数: (body, message) ,它是已解码的消息正文和 Message 举个例子。

on_message = None

每当收到消息时调用的可选函数。

定义后,将调用此函数,而不是 receive() 方法,以及 callbacks 将被禁用。

所以这可以用来替代 callbacks 当你不想让身体被自动解码的时候。请注意,如果消息具有 compression 标题集。

回调的签名必须采用单个参数,即 Message 对象。

另请注意, message.body 属性,它是消息体的原始内容,在某些情况下可能是只读的 buffer 对象。

on_decode_error = None

消息无法解码时调用的回调。

回调的签名必须带有两个参数: (message, exc) ,这是无法解码的消息,以及尝试解码它时发生的异常。

connection
declare()[源代码]

声明队列、交换和绑定。

注:

这是在实例化时自动完成的,当 auto_declare 已经设置好了。

register_callback(callback)[源代码]

注册一个要在收到消息时调用的新回调。

注:

回调的签名需要接受两个参数: (body, message) ,它是已解码的消息正文和 Message 举个例子。

add_queue(queue)[源代码]

将队列添加到要从中消费的队列列表中。

注:

这将不会从队列开始消耗,因为您将不得不调用 consume() 之后。

consume(no_ack=None)[源代码]

开始消费消息。

可以多次调用,但请注意,虽然它将使用自上次调用以来添加的新队列,但它不会取消从删除的队列中使用(使用 cancel_by_queue() )。

论点:

No_ack(Bool):请参见 no_ack

cancel()[源代码]

结束所有活动队列使用者。

注:

这不会影响已传递的消息,但它确实意味着服务器不会为该使用者发送更多消息。

cancel_by_queue(queue)[源代码]

按队列名称取消使用者。

consuming_from(queue)[源代码]

返回 True 如果当前正在从队列中消费‘。

purge()[源代码]

从所有队列中清除邮件。

警告:

这将 delete all ready messages ,则不存在撤消操作。

flow(active)[源代码]

启用/禁用来自对等设备的流量。

这是一种简单的流量控制机制,对等方可以使用它来避免使其队列溢出或发现自己接收的消息多于其可以处理的消息。

接收到停止发送内容的请求的对等点将完成发送当前内容(如果有),然后等待,直到流被重新激活。

qos(prefetch_size=0, prefetch_count=0, apply_global=False)[源代码]

指定服务质量。

客户端可以请求提前发送消息,以便当客户端完成处理消息时,下面的消息已经保存在本地,而不需要沿着通道发送。预回迁可以提高性能。

则忽略预取窗口。如果 no_ack 选项已设置。

论点:

PREFETCH_SIZE(Int):以八位字节为单位指定预取窗口。

如果消息大小等于或小于可用预取大小(也在其他预取限制内),服务器将提前发送消息。可以设置为零,这意味着“没有特定的限制”,尽管其他预取限制仍然适用。

Prefetch_count(Int):指定预取窗口

完整的信息。

APPLY_GLOBAL(Bool):在所有通道上全局应用新设置。

recover(requeue=False)[源代码]

重新传递未确认的邮件。

要求代理在指定通道上重新传递所有未确认的消息。

论点:

重新排队(Bool):默认情况下将重新传递消息

给最初的收件人。使用 requeue 设置为True时,服务器将尝试重新排队该消息,然后可能会将其传递给其他订阅者。

receive(body, message)[源代码]

在收到消息时调用的方法。

此邮件发送给已注册的 callbacks

论点:

Body(Any):解码后的消息正文。Message(~kombu.Message):消息实例。

引发未实现的错误:

如果没有消费者回调:注册。

revive(channel)[源代码]

在连接中断后重振消费者。