This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

消费者

基础知识

这个 Consumer 获取要从中消费的连接(或通道)和队列列表。几个消费者可以混合在一起从不同的渠道消费,因为他们都绑定到相同的连接,并且 drain_events 将从该连接上的所有通道中排出事件。

备注

从3.0开始,Kombu默认只接受JSON/BINARY或文本消息,要允许反序列化其他格式,您必须在 accept 参数(除了为消息设置正确的内容类型外):

>>> Consumer(conn, accept=['json', 'pickle', 'msgpack', 'yaml'])

您可以使用连接创建使用者。此使用者正在使用名称为的单个队列 'queue'

>>> queue = Queue('queue', routing_key='queue')
>>> consumer = connection.Consumer(queue)

您也可以直接实例化Consumer,它以通道或连接作为参数。该使用者还从具有名称的单个队列消费 'queue'

>>> queue = Queue('queue', routing_key='queue')
>>> with Connection('amqp://') as conn:
...     with conn.channel() as channel:
...         consumer = Consumer(channel, queue)

消费者需要为接收到的数据指定一个处理程序。此处理程序以回调的形式指定。每次收到新消息时,Kombu都会调用回调函数。该回调带有两个参数: body ,包含由生产者发送的反序列化数据,以及 Message 实例 message 。当设置了手动确认时,用户负责确认消息。

>>> def callback(body, message):
...     print(body)
...     message.ack()

>>> consumer.register_callback(callback)

从单个使用者中排出事件

方法 drain_events 默认情况下无限期阻止。此示例将超时设置为1秒:

>>> with consumer:
...     connection.drain_events(timeout=1)

耗尽多个消费者的事件

每个消费者都有自己的队列列表。每个使用者都接受 'json' 格式:

>>> from kombu.utils.compat import nested

>>> queues1 = [Queue('queue11', routing_key='queue11'),
               Queue('queue12', routing_key='queue12')]
>>> queues2 = [Queue('queue21', routing_key='queue21'),
               Queue('queue22', routing_key='queue22')]
>>> with connection.channel(), connection.channel() as (channel1, channel2):
...     with nested(Consumer(channel1, queues1, accept=['json']),
...                 Consumer(channel2, queues2, accept=['json'])):
...         connection.drain_events(timeout=1)

完整的示例如下所示:

from kombu import Connection, Consumer, Queue

def callback(body, message):
    print('RECEIVED MESSAGE: {0!r}'.format(body))
    message.ack()

queue1 = Queue('queue1', routing_key='queue1')
queue2 = Queue('queue2', routing_key='queue2')

with Connection('amqp://') as conn:
    with conn.channel() as channel:
        consumer = Consumer(conn, [queue1, queue2], accept=['json'])
        consumer.register_callback(callback)
        with consumer:
            conn.drain_events(timeout=1)

使用者Mixin类

Kombu在模块中提供了预定义的Mixin类 mixins 。它包含两个类: ConsumerMixin 用于创造消费者和 ConsumerProducerMixin 用于创建同时支持发布消息的消费者。只需派生MixIn类的子类并覆盖其中的一些方法即可创建消费者:

from kombu.mixins import ConsumerMixin

class C(ConsumerMixin):

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [
            Consumer(channel, callbacks=[self.on_message], accept=['json']),
        ]

    def on_message(self, body, message):
        print('RECEIVED MESSAGE: {0!r}'.format(body))
        message.ack()

C(connection).run()

并再次使用多个频道:

from kombu import Consumer
from kombu.mixins import ConsumerMixin

class C(ConsumerMixin):
    channel2 = None

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, _, default_channel):
        self.channel2 = default_channel.connection.channel()
        return [Consumer(default_channel, queues1,
                         callbacks=[self.on_message],
                         accept=['json']),
                Consumer(self.channel2, queues2,
                         callbacks=[self.on_special_message],
                         accept=['json'])]

    def on_consume_end(self, connection, default_channel):
        if self.channel2:
            self.channel2.close()

C(connection).run()

的主要用途 ConsumerProducerMixin 是创建还需要在单独的连接上发布消息的使用者(例如,发送RPC回复、流结果):

from kombu import Producer, Queue
from kombu.mixins import ConsumerProducerMixin

rpc_queue = Queue('rpc_queue')

class Worker(ConsumerProducerMixin):

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(
            queues=[rpc_queue],
            on_message=self.on_request,
            accept={'application/json'},
            prefetch_count=1,
        )]

    def on_request(self, message):
        n = message.payload['n']
        print(' [.] fib({0})'.format(n))
        result = fib(n)

        self.producer.publish(
            {'result': result},
            exchange='', routing_key=message.properties['reply_to'],
            correlation_id=message.properties['correlation_id'],
            serializer='json',
            retry=True,
        )
        message.ack()

参见

examples/rpc-tut6/ 在Github储存库中。

高级主题

RabbitMQ

消费者的优先事项

RabbitMQ定义了AMQP协议的使用者优先级扩展,可以通过设置 x-priority 参数为 basic.consume

在Kombu中,您可以在 Queue ,如下所示:

queue = Queue('name', Exchange('exchange_name', type='direct'),
              consumer_arguments={'x-priority': 10})

点击此处阅读更多有关消费者优先事项的信息:https://www.rabbitmq.com/consumer-priority.html

参考

class kombu.Consumer(channel, queues=None, no_ack=None, auto_declare=None, callbacks=None, on_decode_error=None, on_message=None, accept=None, prefetch_count=None, tag_prefix=None)[源代码]

消息消费者。

论点:

Channel(kombu.Connection,Channel):请参见 channel 。队列(序列 [kombu.Queue] ):请参见 queues 。No_ack(Bool):请参见 no_ack 。AUTO_DECLARE(Bool):请参见 auto_declare 回调(顺序 [Callable] ):请参见 callbacks 。On_Message(可调用):请参阅 on_message ON_DECODE_ERROR(可调用):请参见 on_decode_error 。Prefetch_count(Int):请参见 prefetch_count

exception ContentDisallowed

消费者不允许此内容类型。

accept = None

接受的内容类型列表。

如果使用者接收到具有不受信任内容类型的消息,则会引发异常。默认情况下,接受所有内容类型,但不接受 kombu.disable_untrusted_serializers() 在这种情况下,只允许使用json。

add_queue(queue)[源代码]

将队列添加到要从中消费的队列列表中。

注:

这将不会从队列开始消耗,因为您将不得不调用 consume() 之后。

auto_declare = True

默认情况下,所有实体都将在实例化时声明,如果您希望手动处理此操作,可以将其设置为 False

callbacks = None

收到消息时按顺序调用的回调列表。

回调的签名必须带有两个参数: (body, message) ,它是已解码的消息正文和 Message 举个例子。

cancel()[源代码]

结束所有活动队列使用者。

注:

这不会影响已传递的消息,但它确实意味着服务器不会为该使用者发送更多消息。

cancel_by_queue(queue)[源代码]

按队列名称取消使用者。

channel = None

要用于此使用者的连接/通道。

close()

结束所有活动队列使用者。

注:

这不会影响已传递的消息,但它确实意味着服务器不会为该使用者发送更多消息。

consume(no_ack=None)[源代码]

开始消费消息。

可以多次调用,但请注意,虽然它将使用自上次调用以来添加的新队列,但它不会取消从删除的队列中使用(使用 cancel_by_queue() )。

论点:

No_ack(Bool):请参见 no_ack

consuming_from(queue)[源代码]

返回 True 如果当前正在从队列中消费‘。

declare()[源代码]

声明队列、交换和绑定。

注:

这是在实例化时自动完成的,当 auto_declare 已经设置好了。

flow(active)[源代码]

启用/禁用来自对等设备的流量。

这是一种简单的流量控制机制,对等方可以使用它来避免使其队列溢出或发现自己接收的消息多于其可以处理的消息。

接收到停止发送内容的请求的对等点将完成发送当前内容(如果有),然后等待,直到流被重新激活。

no_ack = None

用于自动消息确认的标志。如果启用,代理将自动确认消息。这可以提高性能,但意味着您无法控制何时删除邮件。

默认情况下禁用。

on_decode_error = None

消息无法解码时调用的回调。

回调的签名必须带有两个参数: (message, exc) ,这是无法解码的消息,以及尝试解码它时发生的异常。

on_message = None

每当收到消息时调用的可选函数。

定义后,将调用此函数,而不是 receive() 方法,以及 callbacks 将被禁用。

所以这可以用来替代 callbacks 当你不想让身体被自动解码的时候。请注意,如果消息具有 compression 标题集。

回调的签名必须采用单个参数,即 Message 对象。

另请注意, message.body 属性,它是消息体的原始内容,在某些情况下可能是只读的 buffer 对象。

prefetch_count = None

初始预取计数

如果设置,消费者将在启动时设置PREFETCH_COUNT Qos值。也可以使用以下命令更改 qos()

purge()[源代码]

从所有队列中清除邮件。

警告:

这将 delete all ready messages ,则不存在撤消操作。

qos(prefetch_size=0, prefetch_count=0, apply_global=False)[源代码]

指定服务质量。

客户端可以请求提前发送消息,以便当客户端完成处理消息时,下面的消息已经保存在本地,而不需要沿着通道发送。预回迁可以提高性能。

则忽略预取窗口。如果 no_ack 选项已设置。

论点:

PREFETCH_SIZE(Int):以八位字节为单位指定预取窗口。

如果消息大小等于或小于可用预取大小(也在其他预取限制内),服务器将提前发送消息。可以设置为零,这意味着“没有特定的限制”,尽管其他预取限制仍然适用。

Prefetch_count(Int):指定预取窗口

完整的信息。

APPLY_GLOBAL(Bool):在所有通道上全局应用新设置。

property queues

单人间 Queue ,或要从中消费的队列列表。

receive(body, message)[源代码]

在收到消息时调用的方法。

此邮件发送给已注册的 callbacks

论点:

Body(Any):解码后的消息正文。Message(~kombu.Message):消息实例。

引发未实现的错误:

如果没有消费者回调:注册。

recover(requeue=False)[源代码]

重新传递未确认的邮件。

要求代理在指定通道上重新传递所有未确认的消息。

论点:

重新排队(Bool):默认情况下将重新传递消息

给最初的收件人。使用 requeue 设置为True时,服务器将尝试重新排队该消息,然后可能会将其传递给其他订阅者。

register_callback(callback)[源代码]

注册一个要在收到消息时调用的新回调。

注:

回调的签名需要接受两个参数: (body, message) ,它是已解码的消息正文和 Message 举个例子。

revive(channel)[源代码]

在连接中断后重振消费者。