This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

胡萝卜兼容性- kombu.compat

胡萝卜兼容界面。

有关文档,请参阅https://pypi.org/project/carrot/。

出版者

替换为 kombu.Producer

class kombu.compat.Publisher(connection, exchange=None, routing_key=None, exchange_type=None, durable=None, auto_delete=None, channel=None, **kwargs)[源代码]

胡萝卜兼容生产商。

auto_declare = True

默认情况下,如果设置了默认交换,则在发布消息时将声明该交换。

auto_delete = False
property backend
property channel
close()[源代码]
compression = None

默认压缩方法。默认情况下禁用。

property connection
declare()

申报交易所。

注:

在实例化时,这会自动发生 auto_declare 标志已启用。

durable = True
exchange = ''

默认汇兑

exchange_type = 'direct'
maybe_declare(entity, retry=False, **retry_policy)

如果在本次会议期间尚未声明汇率,请声明汇率。

on_return = None

基本返回回调。

publish(body, routing_key=None, delivery_mode=None, mandatory=False, immediate=False, priority=0, content_type=None, content_encoding=None, serializer=None, headers=None, compression=None, exchange=None, retry=False, retry_policy=None, declare=None, expiration=None, timeout=None, **properties)

将消息发布到指定的Exchange。

论点:

Body(Any):消息正文。ROUTING_KEY(Str):消息路由键。Delivery_模式(枚举):请参阅 delivery_mode 。必填项(Bool):目前不支持。Immediate(Bool):当前不支持。优先级(Int):消息优先级。介于0和9之间的数字。content_type(Str):内容类型。默认设置为自动检测。CONTENT_ENCODING(字符串):内容编码。默认设置为自动检测。序列化程序(Str):要使用的序列化程序。默认设置为自动检测。压缩(Str):要使用的压缩方法。默认设置为无。Header(Dict):要传递的任意标头的映射

使用邮件正文。

Exchange(kombu.entity.Exchange,str):覆盖交换。

请注意,此交换必须已申报。

声明(顺序 [EntityT] ):所需实体的可选列表

这肯定是在发布消息之前声明的。这些实体将使用 maybe_declare()

重试(Bool):重试发布或声明实体(如果

连接中断。

RETRY_POLICY(DICT):重试配置,这是关键字

支持: ensure()

过期(浮动):可以为每条消息指定以秒为单位的TTL。

默认设置为无到期。

超时(浮点):将超时设置为等待最大超时秒数

用于发布消息。

**PROPERTIES(ANY):其他消息属性,参见AMQP规范。

release()
revive(channel)

断开连接后恢复制作人。

routing_key = ''

默认路由密钥。

send(*args, **kwargs)[源代码]
serializer = None

要使用的默认序列化程序。默认为JSON。

消费者

替换为 kombu.Consumer

class kombu.compat.Consumer(connection, queue=None, exchange=None, routing_key=None, exchange_type=None, durable=None, exclusive=None, auto_delete=None, **kwargs)[源代码]

胡萝卜兼容的消费者。

exception ContentDisallowed

消费者不允许此内容类型。

add_note()

Exception.add_note(Note)--向异常添加注释

args
with_traceback()

Exception.with_traceback(TB)--set self.__traceback__ 去结核病,回到赛尔夫。

accept = None

接受的内容类型列表。

如果使用者接收到具有不受信任内容类型的消息,则会引发异常。默认情况下,接受所有内容类型,但不接受 kombu.disable_untrusted_serializers() 在这种情况下,只允许使用json。

add_queue(queue)[源代码]

将队列添加到要从中消费的队列列表中。

注:

这将不会从队列开始消耗,因为您将不得不调用 consume() 之后。

auto_declare = True

默认情况下,所有实体都将在实例化时声明,如果您希望手动处理此操作,可以将其设置为 False

auto_delete = False
callbacks = None

收到消息时按顺序调用的回调列表。

回调的签名必须带有两个参数: (body, message) ,它是已解码的消息正文和 Message 举个例子。

cancel()[源代码]

结束所有活动队列使用者。

注:

这不会影响已传递的消息,但它确实意味着服务器不会为该使用者发送更多消息。

cancel_by_queue(queue)[源代码]

按队列名称取消使用者。

channel = None

要用于此使用者的连接/通道。

close()[源代码]

结束所有活动队列使用者。

注:

这不会影响已传递的消息,但它确实意味着服务器不会为该使用者发送更多消息。

property connection
consume(no_ack=None)[源代码]

开始消费消息。

可以多次调用,但请注意,虽然它将使用自上次调用以来添加的新队列,但它不会取消从删除的队列中使用(使用 cancel_by_queue() )。

论点:

No_ack(Bool):请参见 no_ack

consuming_from(queue)[源代码]

返回 True 如果当前正在从队列中消费‘。

declare()[源代码]

声明队列、交换和绑定。

注:

这是在实例化时自动完成的,当 auto_declare 已经设置好了。

discard_all(filterfunc=None)[源代码]
durable = True
exchange = ''
exchange_type = 'direct'
exclusive = False
fetch(no_ack=None, enable_callbacks=False)[源代码]
flow(active)[源代码]

启用/禁用来自对等设备的流量。

这是一种简单的流量控制机制,对等方可以使用它来避免使其队列溢出或发现自己接收的消息多于其可以处理的消息。

接收到停止发送内容的请求的对等点将完成发送当前内容(如果有),然后等待,直到流被重新激活。

iterconsume(limit=None, no_ack=None)[源代码]
iterqueue(limit=None, infinite=False)[源代码]
no_ack = None

用于自动消息确认的标志。如果启用,代理将自动确认消息。这可以提高性能,但意味着您无法控制何时删除邮件。

默认情况下禁用。

on_decode_error = None

消息无法解码时调用的回调。

回调的签名必须带有两个参数: (message, exc) ,这是无法解码的消息,以及尝试解码它时发生的异常。

on_message = None

每当收到消息时调用的可选函数。

定义后,将调用此函数,而不是 receive() 方法,以及 callbacks 将被禁用。

所以这可以用来替代 callbacks 当你不想让身体被自动解码的时候。请注意,如果消息具有 compression 标题集。

回调的签名必须采用单个参数,即 Message 对象。

另请注意, message.body 属性,它是消息体的原始内容,在某些情况下可能是只读的 buffer 对象。

prefetch_count = None

初始预取计数

如果设置,消费者将在启动时设置PREFETCH_COUNT Qos值。也可以使用以下命令更改 qos()

process_next()[源代码]
purge()[源代码]

从所有队列中清除邮件。

警告:

这将 delete all ready messages ,则不存在撤消操作。

qos(prefetch_size=0, prefetch_count=0, apply_global=False)[源代码]

指定服务质量。

客户端可以请求提前发送消息,以便当客户端完成处理消息时,下面的消息已经保存在本地,而不需要沿着通道发送。预回迁可以提高性能。

则忽略预取窗口。如果 no_ack 选项已设置。

论点:

PREFETCH_SIZE(Int):以八位字节为单位指定预取窗口。

如果消息大小等于或小于可用预取大小(也在其他预取限制内),服务器将提前发送消息。可以设置为零,这意味着“没有特定的限制”,尽管其他预取限制仍然适用。

Prefetch_count(Int):指定预取窗口

完整的信息。

APPLY_GLOBAL(Bool):在所有通道上全局应用新设置。

queue = ''
property queues
receive(body, message)[源代码]

在收到消息时调用的方法。

此邮件发送给已注册的 callbacks

论点:

Body(Any):解码后的消息正文。Message(~kombu.Message):消息实例。

引发未实现的错误:

如果没有消费者回调:注册。

recover(requeue=False)[源代码]

重新传递未确认的邮件。

要求代理在指定通道上重新传递所有未确认的消息。

论点:

重新排队(Bool):默认情况下将重新传递消息

给最初的收件人。使用 requeue 设置为True时,服务器将尝试重新排队该消息,然后可能会将其传递给其他订阅者。

register_callback(callback)[源代码]

注册一个要在收到消息时调用的新回调。

注:

回调的签名需要接受两个参数: (body, message) ,它是已解码的消息正文和 Message 举个例子。

revive(channel)[源代码]

在连接中断后重振消费者。

routing_key = ''
wait(limit=None)[源代码]

ConsumerSet

替换为 kombu.Consumer

class kombu.compat.ConsumerSet(connection, from_dict=None, consumers=None, channel=None, **kwargs)[源代码]
exception ContentDisallowed

消费者不允许此内容类型。

add_note()

Exception.add_note(Note)--向异常添加注释

args
with_traceback()

Exception.with_traceback(TB)--set self.__traceback__ 去结核病,回到赛尔夫。

accept = None

接受的内容类型列表。

如果使用者接收到具有不受信任内容类型的消息,则会引发异常。默认情况下,接受所有内容类型,但不接受 kombu.disable_untrusted_serializers() 在这种情况下,只允许使用json。

add_consumer(consumer)[源代码]
add_consumer_from_dict(queue, **options)[源代码]
add_queue(queue)

将队列添加到要从中消费的队列列表中。

注:

这将不会从队列开始消耗,因为您将不得不调用 consume() 之后。

auto_declare = True

默认情况下,所有实体都将在实例化时声明,如果您希望手动处理此操作,可以将其设置为 False

callbacks = None

收到消息时按顺序调用的回调列表。

回调的签名必须带有两个参数: (body, message) ,它是已解码的消息正文和 Message 举个例子。

cancel()

结束所有活动队列使用者。

注:

这不会影响已传递的消息,但它确实意味着服务器不会为该使用者发送更多消息。

cancel_by_queue(queue)

按队列名称取消使用者。

channel = None

要用于此使用者的连接/通道。

close()[源代码]

结束所有活动队列使用者。

注:

这不会影响已传递的消息,但它确实意味着服务器不会为该使用者发送更多消息。

property connection
consume(no_ack=None)

开始消费消息。

可以多次调用,但请注意,虽然它将使用自上次调用以来添加的新队列,但它不会取消从删除的队列中使用(使用 cancel_by_queue() )。

论点:

No_ack(Bool):请参见 no_ack

consuming_from(queue)

返回 True 如果当前正在从队列中消费‘。

declare()

声明队列、交换和绑定。

注:

这是在实例化时自动完成的,当 auto_declare 已经设置好了。

discard_all()[源代码]
flow(active)

启用/禁用来自对等设备的流量。

这是一种简单的流量控制机制,对等方可以使用它来避免使其队列溢出或发现自己接收的消息多于其可以处理的消息。

接收到停止发送内容的请求的对等点将完成发送当前内容(如果有),然后等待,直到流被重新激活。

iterconsume(limit=None, no_ack=False)[源代码]
no_ack = None

用于自动消息确认的标志。如果启用,代理将自动确认消息。这可以提高性能,但意味着您无法控制何时删除邮件。

默认情况下禁用。

on_decode_error = None

消息无法解码时调用的回调。

回调的签名必须带有两个参数: (message, exc) ,这是无法解码的消息,以及尝试解码它时发生的异常。

on_message = None

每当收到消息时调用的可选函数。

定义后,将调用此函数,而不是 receive() 方法,以及 callbacks 将被禁用。

所以这可以用来替代 callbacks 当你不想让身体被自动解码的时候。请注意,如果消息具有 compression 标题集。

回调的签名必须采用单个参数,即 Message 对象。

另请注意, message.body 属性,它是消息体的原始内容,在某些情况下可能是只读的 buffer 对象。

prefetch_count = None

初始预取计数

如果设置,消费者将在启动时设置PREFETCH_COUNT Qos值。也可以使用以下命令更改 qos()

purge()

从所有队列中清除邮件。

警告:

这将 delete all ready messages ,则不存在撤消操作。

qos(prefetch_size=0, prefetch_count=0, apply_global=False)

指定服务质量。

客户端可以请求提前发送消息,以便当客户端完成处理消息时,下面的消息已经保存在本地,而不需要沿着通道发送。预回迁可以提高性能。

则忽略预取窗口。如果 no_ack 选项已设置。

论点:

PREFETCH_SIZE(Int):以八位字节为单位指定预取窗口。

如果消息大小等于或小于可用预取大小(也在其他预取限制内),服务器将提前发送消息。可以设置为零,这意味着“没有特定的限制”,尽管其他预取限制仍然适用。

Prefetch_count(Int):指定预取窗口

完整的信息。

APPLY_GLOBAL(Bool):在所有通道上全局应用新设置。

property queues
receive(body, message)

在收到消息时调用的方法。

此邮件发送给已注册的 callbacks

论点:

Body(Any):解码后的消息正文。Message(~kombu.Message):消息实例。

引发未实现的错误:

如果没有消费者回调:注册。

recover(requeue=False)

重新传递未确认的邮件。

要求代理在指定通道上重新传递所有未确认的消息。

论点:

重新排队(Bool):默认情况下将重新传递消息

给最初的收件人。使用 requeue 设置为True时,服务器将尝试重新排队该消息,然后可能会将其传递给其他订阅者。

register_callback(callback)

注册一个要在收到消息时调用的新回调。

注:

回调的签名需要接受两个参数: (body, message) ,它是已解码的消息正文和 Message 举个例子。

revive(channel)[源代码]

在连接中断后重振消费者。