This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

混合课程- kombu.mixins

米辛斯。

class kombu.mixins.ConsumerMixin[源代码]

为实施消费者计划提供便利的混合。

它可以在线程外部使用,也可以与线程一起使用,也可以与绿线程(ventlet/gEvent)一起使用。

基类将需要一个 connection 属性,该属性必须是 Connection 实例,并定义一个 get_consumers() 方法,该方法返回 kombu.Consumer 要使用的实例。支持多个用户非常重要,这样才能将多个通道用于不同的服务质量要求。

示例:

class Worker(ConsumerMixin):
    task_queue = Queue('tasks', Exchange('tasks'), 'tasks')

    def __init__(self, connection):
        self.connection = None

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.task_queue],
                         callbacks=[self.on_task])]

    def on_task(self, body, message):
        print('Got task: {0!r}'.format(body))
        message.ack()
\* :meth:`extra_context`

可选的额外上下文管理器,将在设置连接和使用者后进入。

接受论点 (connection, channel)

\* :meth:`on_connection_error`

如果连接丢失/或不可用,则调用处理程序。

接受论点 (exc, interval) ,其中间隔是重试连接的时间(以秒为单位)。

默认处理程序将记录该异常。

\* :meth:`on_connection_revived`

在连接失败后重新建立连接时立即调用处理程序。

不需要争论。

\* :meth:`on_consume_ready`

当使用者准备好接受消息时调用处理程序。

接受论点 (connection, channel, consumers) 。另请参阅关键字参数 consume 被转发到此处理程序。

\* :meth:`on_consume_end`

在消费者被取消后调用的处理程序。接受论点 (connection, channel)

\* :meth:`on_iteration`

处理程序在排出事件时为每个迭代调用。

不需要争论。

\* :meth:`on_decode_error`

如果使用者无法解码消息体,则处理程序调用。

接受论点 (message, exc) 其中Message是原始消息对象。

默认处理程序将记录错误并确认消息,因此如果您重写,请确保调用Super,或者您自己执行这些步骤。

Consumer()[源代码]
property channel_errors
connect_max_retries = None

尝试重新建立连接的最大重试次数(如果连接丢失/不可用)。

property connection_errors
consume(limit=None, timeout=None, safety_interval=1, **kwargs)[源代码]
consumer_context(**kwargs)[源代码]
create_connection()[源代码]
establish_connection()[源代码]
extra_context(connection, channel)[源代码]
get_consumers(Consumer, channel)[源代码]
maybe_conn_error(fun)[源代码]

使用 kombu.common.ignore_errors() 取而代之的是。

on_connection_error(exc, interval)[源代码]
on_connection_revived()[源代码]
on_consume_end(connection, channel)[源代码]
on_consume_ready(connection, channel, consumers, **kwargs)[源代码]
on_decode_error(message, exc)[源代码]
on_iteration()[源代码]
property restart_limit
run(_tokens=1, **kwargs)[源代码]
should_stop = False

当它设置为True时,使用者应该停止使用并返回,这样如果它是线程的实现,就可以联接它。

class kombu.mixins.ConsumerProducerMixin[源代码]

消费者和生产者的混合。

具有单独连接的Consumer Mixin版本也可用于发布消息。

示例:

class Worker(ConsumerProducerMixin):

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=Queue('foo'),
                         on_message=self.handle_message,
                         accept='application/json',
                         prefetch_count=10)]

    def handle_message(self, message):
        self.producer.publish(
            {'message': 'hello to you'},
            exchange='',
            routing_key=message.properties['reply_to'],
            correlation_id=message.properties['correlation_id'],
            retry=True,
        )
on_consume_end(connection, channel)[源代码]
property producer
property producer_connection