This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

生产者

基础知识

您可以使用 Connection

>>> producer = connection.Producer()

您还可以实例化 Producer 它直接将通道或连接作为参数:

>>> with Connection('amqp://') as conn:
...     with conn.channel() as channel:
...          producer = Producer(channel)

有了Producer实例,您可以发布消息:

>>> from kombu import Exchange

>>> exchange = Exchange('name', type='direct')

>>> producer.publish(
...      {'hello': 'world'},  # message to send
...      exchange=exchange,   # destination exchange
...      routing_key='rk',    # destination routing key,
...      declare=[exchange],  # make sure exchange is declared,
... )

大多数情况下,您将从连接池获得一个连接,该连接可能是陈旧的,或者您可能在发送消息的过程中丢失该连接。使用重试是处理这些间歇性故障的好方法:

>>> producer.publish({'hello': 'world', ..., retry=True})

此外,还可以指定重试策略,它是受 retry_over_time() 功能

>>> producer.publish(
...     {'hello': 'world'}, ...,
...     retry=True,
...     retry_policy={
...         'interval_start': 0, # First retry immediately,
...         'interval_step': 2,  # then increase by 2s for every retry.
...         'interval_max': 30,  # but don't exceed 30s between retries.
...         'max_retries': 30,   # give up after 30 tries.
...     },
... )

这个 declare 参数允许您传递在发送消息之前必须声明的实体列表。这在使用 retry 标志,因为代理可能在重试期间实际重新启动,在这种情况下,非持久性实体将被删除。

假设您正在编写一个任务队列,而工作进程可能尚未启动,因此队列不会声明。在这种情况下,您需要同时定义交换和声明队列,以便在工作进程离线时将消息传递到队列:

>>> from kombu import Exchange, Queue
>>> task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks')

>>> producer.publish(
...     {'hello': 'world'}, ...,
...     retry=True,
...     exchange=task_queue.exchange,
...     routing_key=task_queue.routing_key,
...     declare=[task_queue],  # declares exchange, queue and binds.
... )

使用匿名交换绕过路由

您可以绕过Broker的路由机制,直接投递到队列中,使用anon-Exchange:将交换参数设置为空字符串,将路由键设置为队列的名称:

>>> producer.publish(
...     {'hello': 'world'},
...     exchange='',
...     routing_key=task_queue.name,
... )

序列化

JSON是将非字符串对象传递给发布时的默认序列化程序,但您也可以指定不同的序列化程序:

>>> producer.publish({'hello': 'world'}, serializer='pickle')

看见 序列化 以获取更多信息。

参考

class kombu.Producer(channel, exchange=None, routing_key=None, serializer=None, auto_declare=None, compression=None, on_return=None)[源代码]

消息制作人。

论点:

Channel(kombu.Connection,Channel):连接或通道。Exchange(kombu.entity.Exchange,str):可选默认交换。ROUTING_KEY(Str):可选的默认路由键。序列化程序(Str):默认的序列化程序。缺省值为 "json" 。COMPRESSION(字符串):默认压缩方式。

默认设置为无压缩。

AUTO_DECLARE(Bool):自动声明默认交换

在实例化时。缺省值为 True

On_Return(Callable):回调以调用无法投递的消息,

mandatoryimmediate 争论到 publish() 使用的是。此回调需要以下签名: (exception, exchange, routing_key, message) 。请注意,生产者需要排出事件才能使用此功能。

auto_declare = True

默认情况下,如果设置了默认交换,则在发布消息时将声明该交换。

compression = None

默认压缩方法。默认情况下禁用。

declare()[源代码]

申报交易所。

注:

在实例化时,这会自动发生 auto_declare 标志已启用。

exchange = None

默认汇兑

maybe_declare(entity, retry=False, **retry_policy)[源代码]

如果在本次会议期间尚未声明汇率,请声明汇率。

on_return = None

基本返回回调。

publish(body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, compression=None, exchange=None, retry=False, retry_policy=None, declare=None, expiration=None, timeout=None, **properties)[源代码]

将消息发布到指定的Exchange。

论点:

Body(Any):消息正文。ROUTING_KEY(Str):消息路由键。Delivery_模式(枚举):请参阅 delivery_mode 。必填项(Bool):目前不支持。Immediate(Bool):当前不支持。优先级(Int):消息优先级。介于0和9之间的数字。content_type(Str):内容类型。默认设置为自动检测。CONTENT_ENCODING(字符串):内容编码。默认设置为自动检测。序列化程序(Str):要使用的序列化程序。默认设置为自动检测。压缩(Str):要使用的压缩方法。默认设置为无。Header(Dict):要传递的任意标头的映射

使用邮件正文。

Exchange(kombu.entity.Exchange,str):覆盖交换。

请注意,此交换必须已申报。

声明(顺序 [EntityT] ):所需实体的可选列表

这肯定是在发布消息之前声明的。这些实体将使用 maybe_declare()

重试(Bool):重试发布或声明实体(如果

连接中断。

RETRY_POLICY(DICT):重试配置,这是关键字

支持: ensure()

过期(浮动):可以为每条消息指定以秒为单位的TTL。

默认设置为无到期。

超时(浮点):将超时设置为等待最大超时秒数

用于发布消息。

**PROPERTIES(ANY):其他消息属性,参见AMQP规范。

revive(channel)[源代码]

断开连接后恢复制作人。

routing_key = ''

默认路由密钥。

serializer = None

要使用的默认序列化程序。默认为JSON。