This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
Kombu- kombu
¶
用于Python的消息库。
- kombu.enable_insecure_serializers(choices=<object object>)[源代码]¶
启用被认为不安全的序列化程序。
注:¶
将启用
pickle
,yaml
和msgpack
默认情况下,但您也可以指定要启用的序列化程序列表(按名称或内容类型)。
- kombu.disable_insecure_serializers(allowed=<object object>)[源代码]¶
禁用不受信任的序列化程序。
将禁用所有序列化程序,但
json
或者,您可以指定要允许的反序列化程序列表。注:¶
生产者仍然能够以这些格式序列化数据,但消费者不会接受使用不受信任的内容类型的传入数据。
连接¶
- class kombu.Connection(hostname='localhost', userid=None, password=None, virtual_host=None, port=None, insist=False, ssl=False, transport=None, connect_timeout=5, transport_options=None, login_method=None, uri_prefix=None, heartbeat=0, failover_strategy='round-robin', alternates=None, **kwargs)[源代码]¶
与经纪人的联系。
示例:¶
>>> Connection('amqp://guest:guest@localhost:5672//') >>> Connection('amqp://foo;amqp://bar', ... failover_strategy='round-robin') >>> Connection('redis://', transport_options={ ... 'visibility_timeout': 3000, ... })
>>> import ssl >>> Connection('amqp://', login_method='EXTERNAL', ssl={ ... 'ca_certs': '/etc/pki/tls/certs/something.crt', ... 'keyfile': '/etc/something/system.key', ... 'certfile': '/etc/something/system.cert', ... 'cert_reqs': ssl.CERT_REQUIRED, ... })
注:¶
目前,SSL仅适用于Py-AMQP和QPID传输。对于其他交通工具,您可以使用stantnel。
论点:¶
URL(字符串,序列):代理URL,或URL列表。
关键字参数:¶
- SSL(bool/dict):使用SSL连接到服务器。
缺省值为
False
。可能不受指定传输的支持。
传输(Transport):如果未在URL中指定,则为默认传输。CONNECT_TIMEOUT(FLOAT):连接到
伺服器。可能不受指定传输的支持。
- TRANSPORTS_OPTIONS(DICT):附加连接参数的DICT
传递到备用Kombu通道实现。有关可用选项,请查阅运输文档。
- 心跳(Float):心跳间隔,单位为Int/Float秒。
请注意,如果启用了心跳,则
heartbeat_check()
方法必须定期调用,大约每秒调用一次。
注:¶
当需要时,连接被延迟地建立。如果需要建立连接,则通过调用
connect()
**>>> conn = Connection('amqp://') >>> conn.connect()
并始终记住关闭连接::
>>> conn.release()
这些选项已被URL参数替换,但仍支持向后兼容:
- 关键字主机名:
主机名/地址。注意:不能同时指定URL参数和使用HOSTNAME关键字参数。
- 关键字用户ID:
默认用户名(如果URL中未提供)。
- 关键字密码:
默认密码(如果URL中未提供)。
- 关键字VALUAL_HOST:
默认虚拟主机(如果URL中未提供)。
- 关键字端口:
默认端口(如果URL中未提供)。
属性
- hostname = None¶
- port = None¶
- userid = None¶
- password = None¶
- virtual_host = '/'¶
- ssl = None¶
- login_method = None¶
- failover_strategy = 'round-robin'¶
用于在连接失败后重新连接时选择新主机的策略。“循环”、“洗牌”或任何定制迭代器之一,不断产生新的URL以供尝试。
- connect_timeout = 5¶
- heartbeat = None¶
心跳数值,目前仅受Py-AMQP传输支持。
- default_channel¶
默认频道。
在访问时创建,并在连接关闭时关闭。
注:¶
当您只需要一个通道时,可以用于自动通道处理,如果将连接而不是通道传递给需要通道的函数,则它也是隐式使用的通道。
- connected¶
如果已建立连接,则返回TRUE。
- recoverable_connection_errors¶
可恢复的连接错误。
可以从中恢复但必须首先关闭并重新建立连接的与连接相关的异常的列表。
- recoverable_channel_errors¶
可恢复的通道错误。
无需重新建立连接即可自动恢复的与通道相关的异常列表。
- connection_errors¶
连接可能引发的异常列表。
- channel_errors¶
通道可能引发的异常列表。
- transport¶
- uri_prefix = None¶
- declared_entities = None¶
声明实体的缓存是针对每个连接的,以防服务器丢失数据。
- cycle = None¶
迭代器返回在连接失败时尝试的下一个代理URL(由
failover_strategy
)。
- host¶
以冒号分隔的主机名/端口对形式的主机。
- manager¶
AMQP管理API。
试验性管理器,可用于管理/监视Broker实例。
并非所有运输工具都可用。
- supports_heartbeats¶
- is_evented¶
方法
- as_uri(include_password=False, mask='**', getfields=operator.itemgetter('port', 'userid', 'password', 'virtual_host', 'transport')) str [源代码]¶
将连接参数转换为URL形式。
- drain_events(**kwargs)[源代码]¶
等待来自服务器的单个事件。
论点:¶
Timeout(Float):放弃前的超时时间(秒)。
- 引发socket.timeout:
如果超过超时。:
- autoretry(fun, channel=None, **ensure_options)[源代码]¶
支持的函数的装饰符
channel
关键字参数。如果生成的可调用函数引发连接或通道相关错误,则它将重试调用该函数。返回值将是
(retval, last_created_channel)
。如果一个
channel
如果没有提供,则会自动获取一个(记得在之后关闭它)。示例:¶
>>> channel = connection.channel() >>> try: ... ret, channel = connection.autoretry( ... publish_messages, channel) ... finally: ... channel.close()
- ensure(obj, fun, errback=None, max_retries=None, interval_start=1, interval_step=1, interval_max=1, on_revive=None, retry_errors=None)[源代码]¶
确保操作完成。
而不管发生任何通道/连接错误。
通过建立连接并重新应用函数来重试。
论点:¶
OBJ:确保对其执行操作的对象。Fun(Callable):要应用的方法。
- Errback(Callable):可选的回调
无法建立连接。提供的参数是引发的异常和将休眠的间隔
(exc, interval)
。- MAX_RETRIES(Int):重试的最大次数。
如果超过此限制,将重新引发连接错误。
- INTERVAL_START(FLOAT):我们开始的秒数
睡了一觉。
- INTERVAL_STEP(FLOAT):间隔增加了多少秒
每一次重试。
- Interval_max(浮点数):休眠间隔的最大秒数
每次重试。
- On_revive(Callable):可选的回调
恢复成功完成
- RETRY_ERROR(元组):要重试的可选错误列表
而不管连接状态如何。
示例
>>> from kombu import Connection, Producer >>> conn = Connection('amqp://') >>> producer = Producer(conn)
>>> def errback(exc, interval): ... logger.error('Error: %r', exc, exc_info=1) ... logger.info('Retry in %s seconds.', interval)
>>> publish = conn.ensure(producer, producer.publish, ... errback=errback, max_retries=3) >>> publish({'hello': 'world'}, routing_key='dest')
- heartbeat_check(rate=2)[源代码]¶
检查一下心跳。
允许传输器执行使心跳工作所需的任何周期性任务。这应该大约每秒都被调用。
如果当前传输不支持心跳,则这是noop操作。
论点:¶
- Rate(Int):rate是调用记号的频率
与实际的心跳值进行比较。例如,如果心跳设置为3秒,并且每隔3/2秒调用一次滴答,则速率为2。该值当前未被任何传输使用。
- close()¶
关闭连接(如果打开)。
- Producer(channel=None, *args, **kwargs)[源代码]¶
创造新
kombu.Producer
举个例子。
- Consumer(queues=None, channel=None, *args, **kwargs)[源代码]¶
创造新
kombu.Consumer
举个例子。
- Pool(limit=None, **kwargs)[源代码]¶
连接池。
论点:¶
- Limit(Int):最大活动连接数。
默认设置为无限制。
示例:¶
>>> connection = Connection('amqp://') >>> pool = connection.Pool(2) >>> c1 = pool.acquire() >>> c2 = pool.acquire() >>> c3 = pool.acquire() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "kombu/connection.py", line 354, in acquire raise ConnectionLimitExceeded(self.limit) kombu.exceptions.ConnectionLimitExceeded: 2 >>> c1.release() >>> c3 = pool.acquire()
- ChannelPool(limit=None, **kwargs)[源代码]¶
频道池。
论点:¶
- Limit(Int):活动通道的最大数量。
默认设置为无限制。
示例:¶
>>> connection = Connection('amqp://') >>> pool = connection.ChannelPool(2) >>> c1 = pool.acquire() >>> c2 = pool.acquire() >>> c3 = pool.acquire() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "kombu/connection.py", line 354, in acquire raise ChannelLimitExceeded(self.limit) kombu.connection.ChannelLimitExceeded: 2 >>> c1.release() >>> c3 = pool.acquire()
- SimpleQueue(name, no_ack=None, queue_opts=None, queue_args=None, exchange_opts=None, channel=None, **kwargs)[源代码]¶
简单的持久队列API。
创造新
SimpleQueue
,使用来自该连接的通道。如果
name
是一个字符串,则将使用该名称作为队列和交换的名称自动创建队列和交换,并将其用作默认路由键。论点:¶
- SimpleBuffer(name, no_ack=None, queue_opts=None, queue_args=None, exchange_opts=None, channel=None, **kwargs)[源代码]¶
简单的临时队列API。
创造新
SimpleQueue
使用来自此连接的通道。
交易所¶
创建交换声明的示例::
>>> news_exchange = Exchange('news', type='topic')
暂时 news_exchange 只是一个声明,您不能对其执行操作。它只描述了交易所的名称和选项。
交换可以是绑定的,也可以是未绑定的。绑定表示交换与通道相关联,并且可以在其上执行操作。要绑定交换,您可以使用通道作为参数调用交换::
>>> bound_exchange = news_exchange(channel)
现在您可以执行如下操作 declare()
或 delete()
**
>>> # Declare exchange manually
>>> bound_exchange.declare()
>>> # Publish raw string message using low-level exchange API
>>> bound_exchange.publish(
... 'Cure for cancer found!',
... routing_key='news.science',
... )
>>> # Delete exchange.
>>> bound_exchange.delete()
- class kombu.Exchange(name='', type='', channel=None, **kwargs)[源代码]¶
一份交易所声明。
论点:¶
名称(字符串):请参见
name
。文字(字符串):请参见type
。Channel(kombu.Connection,Channel):请参见channel
。耐久(Bool):请参见durable
。AUTO_DELETE(布尔):请参见auto_delete
。Delivery_模式(枚举):请参阅delivery_mode
。参数(DICT):请参见arguments
。NO_DECLARE(Bool):请参见no_declare
- name(str)¶
默认为无名称(默认交换)。
- 类型:
交易所的名称。
- type(str)¶
This description of AMQP exchange types was shamelessly stolen from the blog post `AMQP in 10 minutes: Part 4`_ by Rajith Attapattu. Reading this article is recommended if you're new to amqp.
“AMQP定义了四种默认交换类型(路由算法),涵盖了大多数常见的消息传递用例。AMQP Broker还可以定义其他交换类型,因此有关可用交换类型的详细信息,请参阅您的Broker手册。
direct ( default )
消息中的路由关键字与队列绑定到此交换时使用的路由条件之间的直接匹配。
topic
路由键与交换/队列绑定中指定的路由模式之间的通配符匹配。路由键被视为由分隔的零个或多个单词 "." 并支持特殊通配符。 "*" 匹配单个单词,并且 "#" 匹配零个或多个单词。
fanout
队列在没有参数的情况下绑定到此交换。因此,发送到此交换的任何消息都将被转发到绑定到此交换的所有队列。
headers
队列通过包含标头和值(可选)的参数表绑定到此交换。名为“x-Match”的特殊参数确定匹配算法,其中 "all" 隐含着一个 AND (所有配对必须匹配)和 "any" 暗示 OR (必须至少有一对匹配)。
arguments
用于指定参数。
Channel(Channel):交换绑定到的通道(如果绑定)。
- 持久(Bool):持久交换在服务器重新启动时保持活动状态。
当服务器重新启动时,非持久交换(临时交换)将被清除。缺省值为
True
。- AUTO_DELETE(Bool):如果设置,则在所有队列
已经用完了。缺省值为
False
。- DELIVERY_MODE(枚举):消息使用的默认传递模式。
该值是一个整数或别名字符串。
1或 "transient"
这条消息是暂时的。这意味着它只存储在内存中,如果服务器死机或重新启动,它就会丢失。
- 2或“永久”( default )
这条消息是持久的。这意味着消息既存储在内存中,也存储在磁盘上,因此在服务器死机或重新启动时保留。
默认值为2(永久)。
- Arguments(Dict):指定何时交换的其他参数
被宣布为。
- NO_DECLARE(Bool):从不声明此交换
(
declare()
什么都不做)。
- maybe_bind(channel: Channel | Connection) _MaybeChannelBoundType ¶
将实例绑定到通道(如果尚未绑定)。
- Message(body, delivery_mode=None, properties=None, **kwargs)[源代码]¶
创建要与其一起发送的消息实例
publish()
。论点:¶
Body(Any):消息正文。
- DELIVERY_MODE(Bool):设置自定义交付模式。
默认为
delivery_mode
。- 优先级(Int):消息优先级,0为已配置的代理
最大优先级,越高越好。
- Content_type(Str):消息内容_类型。如果为Content_Type
则不会发生序列化,因为假定这是一个二进制对象,或者您已经完成了自己的序列化。如果使用内置序列化,则保留为空,因为我们的库正确设置了CONTENT_TYPE。
- CONTENT_ENCODING(Str):该对象所在的字符集
是经过编码的。如果发送原始二进制对象,请使用“二进制”。如果使用内置序列化,因为我们的库正确设置了CONTENT_ENCODING,则将其留空。
属性(Dict):消息属性。
Headers(Dict):消息标头。
- PERSISTENT_DELIVERY_MODE = 2¶
- TRANSIENT_DELIVERY_MODE = 1¶
- attrs: tuple[tuple[str, Any], ...] = (('name', None), ('type', None), ('arguments', None), ('durable', <class 'bool'>), ('passive', <class 'bool'>), ('auto_delete', <class 'bool'>), ('delivery_mode', <function Exchange.<lambda>>), ('no_declare', <class 'bool'>))¶
- auto_delete = False¶
- bind_to(exchange='', routing_key='', arguments=None, nowait=False, channel=None, **kwargs)[源代码]¶
将交换绑定到另一个交换。
论点:¶
- NoWait(Bool):如果设置,服务器将不响应,并且调用
不会阻止等待回复。缺省值为
False
。
- property can_cache_declaration¶
布尔(X)->布尔
当参数x为True时返回True,否则返回False。内置的True和False是bool类的仅有的两个实例。类bool是int类的子类,不能作为子类。
- declare(nowait=False, passive=None, channel=None)[源代码]¶
申报交易所。
在代理上创建交换,除非设置了被动,在这种情况下,它将只断言交换存在。
- 论据:
- NoWait(Bool):如果设置,服务器将不响应,并且
我们不会等待回复。缺省值为
False
。
- delete(if_unused=False, nowait=False)[源代码]¶
删除服务器上的交换声明。
论点:¶
- IF_UNUSED(Bool):仅当交换没有绑定时才删除。
缺省值为
False
。- NoWait(Bool):如果设置,服务器将不响应,并且
我们不会等待回复。缺省值为
False
。
- delivery_mode = None¶
- durable = True¶
- name = ''¶
- no_declare = False¶
- passive = False¶
- publish(message, routing_key=None, mandatory=False, immediate=False, exchange=None)[源代码]¶
发布消息。
论点:¶
- 消息(联合 [kombu.Message, str, bytes] ):
要发布的消息。
ROUTING_KEY(Str):消息路由键。必填项(Bool):目前不支持。Immediate(Bool):当前不支持。
- type = 'direct'¶
队列¶
中使用交换创建队列的示例 Exchange
示例::
>>> science_news = Queue('science_news',
... exchange=news_exchange,
... routing_key='news.science')
暂时 science_news 只是一个声明,您不能对其执行操作。它只描述了队列的名称和选项。
队列可以绑定或解除绑定。绑定表示队列与通道相关联,并且可以对其执行操作。要绑定队列,您可以使用通道作为参数调用队列实例::
>>> bound_science_news = science_news(channel)
现在您可以执行如下操作 declare()
或 purge()
:
>>> bound_science_news.declare()
>>> bound_science_news.purge()
>>> bound_science_news.delete()
- class kombu.Queue(name='', exchange=None, routing_key='', channel=None, bindings=None, on_declared=None, **kwargs)[源代码]¶
队列声明。
论点:¶
名称(字符串):请参见
name
。Exchange(Exchange,str):请参阅exchange
。ROUTING_KEY(字符串):参见routing_key
。Channel(kombu.Connection,Channel):请参见channel
。耐久(Bool):请参见durable
。独家(Bool):请参阅exclusive
。AUTO_DELETE(布尔):请参见auto_delete
。Queue_Arguments(Dict):请参见queue_arguments
。BINDING_ARGUMENTS(Dict):请参见binding_arguments
。Consumer_Arguments(Dict):请参阅consumer_arguments
。NO_DECLARE(Bool):请参见no_declare
。ON_DECLARATED(可调用):请参见on_declared
。Expires(浮动):请参见expires
。Message_ttl(浮点):请参见message_ttl
。最大长度(Int):请参见max_length
。最大长度字节数(Int):请参见max_length_bytes
。MAX_PRIORITY(Int):请参见max_priority
。- name(str)¶
默认情况下没有名称(默认队列目标)。
- 类型:
队列的名称。
- routing_key(str)¶
路由键的解释取决于
Exchange.type
。直接兑换
如果消息的路由键属性与
routing_key
属性是相同的。扇出交换
始终匹配,即使绑定没有键。
话题交流
通过基元模式匹配方案匹配消息的路由键属性。然后,消息路由键由由点分隔的单词组成 ("." ,如域名),并且有两个特殊字符可用;星号 ("*" )和散列 ("#" )。星号匹配任何单词,而散列匹配零个或多个单词。例如 "*.stock.#" 匹配路由关键字 "usd.stock" 和 "eur.stock.db" 但不是 "stock.nasdaq" 。
- 类型:
路由密钥(如果有),也称为 binding key 。
- channel(ChannelT)¶
- 类型:
队列绑定到的通道(如果绑定)。
- durable(bool)¶
如果/当服务器重新启动时,非持久队列(临时队列)将被清除。请注意,持久队列不一定保存持久消息,尽管将持久消息发送到临时队列没有意义。
缺省值为
True
。- 类型:
服务器重新启动时,持久队列保持活动状态。
- exclusive(bool)¶
当前连接。设置‘EXCLUSIVE’标志总是意味着‘自动删除’。
缺省值为
False
。- 类型:
独占队列只能由
- auto_delete(bool)¶
已经用完了。最后一个用户可以显式取消,也可以因为其渠道关闭而取消。如果队列中没有消费者,则不会将其删除。
- 类型:
如果设置,则在所有使用者
- expires(float)¶
队列应该过期。
过期时间决定队列在被自动删除之前可以保持多长时间不使用。 Unused 表示队列没有使用者,队列尚未重新声明,并且
Queue.get
至少在到期期限内未被调用。请参阅https://www.rabbitmq.com/ttl.html#queue-ttl
RabbitMQ extension :仅在使用RabbitMQ时可用。
- 类型:
设置此事件的过期时间(以秒为单位
- message_ttl(float)¶
此设置控制消息可以在队列中保留多长时间而不被使用。如果在消息使用者收到该消息之前已过了过期时间,则该消息将被删除,并且不会有任何使用者看到该消息。
请参阅https://www.rabbitmq.com/ttl.html#per-queue-message-ttl
RabbitMQ extension :仅在使用RabbitMQ时可用。
- 类型:
消息生存时间(秒)。
- max_length(int)¶
排队可以坚持。
如果队列大小中的消息数超过此限制,则新消息将被丢弃(如果死信交换处于活动状态,则为死信)。
请参阅https://www.rabbitmq.com/maxlength.html
RabbitMQ extension :仅在使用RabbitMQ时可用。
- 类型:
设置的最大消息数
- max_length_bytes(int)¶
队列中的消息的数量。
如果队列中所有消息的总大小超过此限制,则新消息将被丢弃(如果死信交换处于活动状态,则为死信)。
RabbitMQ extension :仅在使用RabbitMQ时可用。
- 类型:
设置总计的最大大小(以字节为单位
- max_priority(int)¶
例如,如果值为10,则可以传递到此队列的消息可以具有
priority
介于0和10之间的值,其中10是最高优先级。未设置最大优先级的RabbitMQ队列将忽略消息中的优先级字段,因此,如果需要优先级,则需要设置最大优先级字段以将队列声明为优先级队列。
RabbitMQ extension :仅在使用RabbitMQ时可用。
- 类型:
设置此队列的最高优先级数。
- queue_arguments(Dict)¶
排队。可用于设置RabbitMQ/AMQP的参数值
queue.declare
。- 类型:
声明时使用的其他参数
- binding_arguments(Dict)¶
排队。可用于设置RabbitMQ/AMQP的参数值
queue.declare
。- 类型:
绑定时使用的其他参数
- consumer_arguments(Dict)¶
从这个队列中。可用于设置RabbitMQ/AMQP的参数值
basic.consume
。- 类型:
使用时使用的其他参数
- alias(str)¶
其中,例如为具有自动生成的队列名称的队列提供备用名称。
- 类型:
在Kombu中未使用,但应用程序可以利用
- on_declared(Callable)¶
队列已声明(
queue_declare
操作完成)。此函数必须具有至少接受3个位置参数的签名:(name, messages, consumers)
。- 类型:
时要应用的可选回调
- maybe_bind(channel: Channel | Connection) _MaybeChannelBoundType ¶
将实例绑定到通道(如果尚未绑定)。
- exception ContentDisallowed¶
消费者不允许此内容类型。
- attrs: tuple[tuple[str, Any], ...] = (('name', None), ('exchange', None), ('routing_key', None), ('queue_arguments', None), ('binding_arguments', None), ('consumer_arguments', None), ('durable', <class 'bool'>), ('exclusive', <class 'bool'>), ('auto_delete', <class 'bool'>), ('no_ack', None), ('alias', None), ('bindings', <class 'list'>), ('no_declare', <class 'bool'>), ('expires', <class 'float'>), ('message_ttl', <class 'float'>), ('max_length', <class 'int'>), ('max_length_bytes', <class 'int'>), ('max_priority', <class 'int'>))¶
- auto_delete = False¶
- property can_cache_declaration¶
布尔(X)->布尔
当参数x为True时返回True,否则返回False。内置的True和False是bool类的仅有的两个实例。类bool是int类的子类,不能作为子类。
- consume(consumer_tag='', callback=None, no_ack=None, nowait=False)[源代码]¶
启动队列使用者。
消费者的存续期和他们创建的渠道一样长,或者直到客户取消他们为止。
论点:¶
- Consumer_tag(Str):消费者的唯一标识符。
使用者标记是连接的本地标记,因此两个客户端可以使用相同的使用者标记。如果此字段为空,服务器将生成唯一标记。
- No_ack(Bool):如果启用,代理将自动
确认消息。
Nowait(Bool):不要等待回复。
回调(Callable):为每条传递的消息调用回调。
- delete(if_unused=False, if_empty=False, nowait=False)[源代码]¶
删除队列。
论点:¶
- IF_UNUSED(Bool):如果设置,服务器将只删除队列
如果它没有消费者的话。如果队列有消费者,则会引发通道错误。
- If_Empty(Bool):如果设置,服务器将仅在以下情况下删除队列
它是空的。如果它不为空,则会引发通道错误。
Nowait(Bool):不要等待回复。
- durable = True¶
- exchange = <unbound Exchange ''(direct)>¶
- exclusive = False¶
- get(no_ack=None, accept=None)[源代码]¶
轮询服务器以获取新消息。
此方法使用同步对话提供对队列中消息的直接访问,该对话是为同步功能比性能更重要的特定类型的应用程序设计的。
- 返回:
~kombu.Message --或
None
否则的话。- 返回类型:
if a message was available,
论点:¶
- No_ack(Bool):如果启用,代理将
自动确认消息。
接受(设置 [str] ):可接受内容类型的自定义列表。
- name = ''¶
- no_ack = False¶
- queue_declare(nowait=False, passive=False, channel=None)[源代码]¶
在服务器上声明队列。
论点:¶
Nowait(Bool):不要等待回复。PASSIVE(Bool):如果设置,服务器将不创建队列。
客户端可以使用它来检查队列是否存在,而无需修改服务器状态。
- routing_key = ''¶
消息制作者¶
- class kombu.Producer(channel, exchange=None, routing_key=None, serializer=None, auto_declare=None, compression=None, on_return=None)[源代码]¶
消息制作人。
论点:¶
Channel(kombu.Connection,Channel):连接或通道。Exchange(kombu.entity.Exchange,str):可选默认交换。ROUTING_KEY(Str):可选的默认路由键。序列化程序(Str):默认的序列化程序。缺省值为 "json" 。COMPRESSION(字符串):默认压缩方式。
默认设置为无压缩。
- AUTO_DECLARE(Bool):自动声明默认交换
在实例化时。缺省值为
True
。- On_Return(Callable):回调以调用无法投递的消息,
当 mandatory 或 immediate 争论到
publish()
使用的是。此回调需要以下签名: (exception, exchange, routing_key, message) 。请注意,生产者需要排出事件才能使用此功能。
- channel¶
- exchange = None¶
默认汇兑
- routing_key = ''¶
默认路由密钥。
- serializer = None¶
要使用的默认序列化程序。默认为JSON。
- compression = None¶
默认压缩方法。默认情况下禁用。
- auto_declare = True¶
默认情况下,如果设置了默认交换,则在发布消息时将声明该交换。
- on_return = None¶
基本返回回调。
- connection¶
- declare()[源代码]¶
申报交易所。
注:¶
在实例化时,这会自动发生
auto_declare
标志已启用。
- publish(body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, compression=None, exchange=None, retry=False, retry_policy=None, declare=None, expiration=None, timeout=None, **properties)[源代码]¶
将消息发布到指定的Exchange。
论点:¶
Body(Any):消息正文。ROUTING_KEY(Str):消息路由键。Delivery_模式(枚举):请参阅
delivery_mode
。必填项(Bool):目前不支持。Immediate(Bool):当前不支持。优先级(Int):消息优先级。介于0和9之间的数字。content_type(Str):内容类型。默认设置为自动检测。CONTENT_ENCODING(字符串):内容编码。默认设置为自动检测。序列化程序(Str):要使用的序列化程序。默认设置为自动检测。压缩(Str):要使用的压缩方法。默认设置为无。Header(Dict):要传递的任意标头的映射使用邮件正文。
- Exchange(kombu.entity.Exchange,str):覆盖交换。
请注意,此交换必须已申报。
- 声明(顺序 [EntityT] ):所需实体的可选列表
这肯定是在发布消息之前声明的。这些实体将使用
maybe_declare()
。- 重试(Bool):重试发布或声明实体(如果
连接中断。
- RETRY_POLICY(DICT):重试配置,这是关键字
支持:
ensure()
。- 过期(浮动):可以为每条消息指定以秒为单位的TTL。
默认设置为无到期。
- 超时(浮点):将超时设置为等待最大超时秒数
用于发布消息。
**PROPERTIES(ANY):其他消息属性,参见AMQP规范。
消息消费者¶
- class kombu.Consumer(channel, queues=None, no_ack=None, auto_declare=None, callbacks=None, on_decode_error=None, on_message=None, accept=None, prefetch_count=None, tag_prefix=None)[源代码]¶
消息消费者。
论点:¶
Channel(kombu.Connection,Channel):请参见
channel
。队列(序列 [kombu.Queue] ):请参见queues
。No_ack(Bool):请参见no_ack
。AUTO_DECLARE(Bool):请参见auto_declare
回调(顺序 [Callable] ):请参见callbacks
。On_Message(可调用):请参阅on_message
ON_DECODE_ERROR(可调用):请参见on_decode_error
。Prefetch_count(Int):请参见prefetch_count
。- channel = None¶
要用于此使用者的连接/通道。
- no_ack = None¶
用于自动消息确认的标志。如果启用,代理将自动确认消息。这可以提高性能,但意味着您无法控制何时删除邮件。
默认情况下禁用。
- auto_declare = True¶
默认情况下,所有实体都将在实例化时声明,如果您希望手动处理此操作,可以将其设置为
False
。
- callbacks = None¶
收到消息时按顺序调用的回调列表。
回调的签名必须带有两个参数: (body, message) ,它是已解码的消息正文和
Message
举个例子。
- on_message = None¶
每当收到消息时调用的可选函数。
定义后,将调用此函数,而不是
receive()
方法,以及callbacks
将被禁用。所以这可以用来替代
callbacks
当你不想让身体被自动解码的时候。请注意,如果消息具有compression
标题集。回调的签名必须采用单个参数,即
Message
对象。另请注意,
message.body
属性,它是消息体的原始内容,在某些情况下可能是只读的buffer
对象。
- on_decode_error = None¶
消息无法解码时调用的回调。
回调的签名必须带有两个参数: (message, exc) ,这是无法解码的消息,以及尝试解码它时发生的异常。
- connection¶
- declare()[源代码]¶
声明队列、交换和绑定。
注:¶
这是在实例化时自动完成的,当
auto_declare
已经设置好了。
- register_callback(callback)[源代码]¶
注册一个要在收到消息时调用的新回调。
注:¶
回调的签名需要接受两个参数: (body, message) ,它是已解码的消息正文和
Message
举个例子。
- consume(no_ack=None)[源代码]¶
开始消费消息。
可以多次调用,但请注意,虽然它将使用自上次调用以来添加的新队列,但它不会取消从删除的队列中使用(使用
cancel_by_queue()
)。论点:¶
No_ack(Bool):请参见
no_ack
。
- flow(active)[源代码]¶
启用/禁用来自对等设备的流量。
这是一种简单的流量控制机制,对等方可以使用它来避免使其队列溢出或发现自己接收的消息多于其可以处理的消息。
接收到停止发送内容的请求的对等点将完成发送当前内容(如果有),然后等待,直到流被重新激活。
- qos(prefetch_size=0, prefetch_count=0, apply_global=False)[源代码]¶
指定服务质量。
客户端可以请求提前发送消息,以便当客户端完成处理消息时,下面的消息已经保存在本地,而不需要沿着通道发送。预回迁可以提高性能。
则忽略预取窗口。如果
no_ack
选项已设置。论点:¶
- PREFETCH_SIZE(Int):以八位字节为单位指定预取窗口。
如果消息大小等于或小于可用预取大小(也在其他预取限制内),服务器将提前发送消息。可以设置为零,这意味着“没有特定的限制”,尽管其他预取限制仍然适用。
- Prefetch_count(Int):指定预取窗口
完整的信息。
APPLY_GLOBAL(Bool):在所有通道上全局应用新设置。
- recover(requeue=False)[源代码]¶
重新传递未确认的邮件。
要求代理在指定通道上重新传递所有未确认的消息。
论点:¶
- 重新排队(Bool):默认情况下将重新传递消息
给最初的收件人。使用 requeue 设置为True时,服务器将尝试重新排队该消息,然后可能会将其传递给其他订阅者。