This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

虚拟传输基类- kombu.transport.virtual

交通工具

class kombu.transport.virtual.Transport(client, **kwargs)[源代码]

虚拟交通工具。

论点:

客户端(kombu.Connection):这是其传输的客户端。

Channel = <class 'kombu.transport.virtual.base.Channel'>
Cycle = <class 'kombu.utils.scheduling.FairCycle'>
polling_interval = 1.0

在不成功的民意调查之间睡觉的时间到了。

default_port = None

未指定端口时使用的端口号。

state
cycle = None

FairCycle 实例用于公平地从通道中排出事件(由构造函数设置)。

establish_connection()[源代码]
close_connection(connection)[源代码]
create_channel(connection)[源代码]
close_channel(channel)[源代码]
drain_events(connection, timeout=None)[源代码]

渠道

class kombu.transport.virtual.AbstractChannel[源代码]

抽象通道接口。

这是一个抽象类,定义了您通常希望在虚拟通道中实现的通道方法。

注:

不要直接派生子类,而是继承 Channel

class kombu.transport.virtual.Channel(connection, **kwargs)[源代码]

虚拟频道。

论点:

Connection(ConnectionT):此传输实例

频道是的一部分。

Message = <class 'kombu.transport.virtual.base.Message'>

使用的消息类。

state

包含交换和绑定的代理状态。

qos

QoS 此渠道的经理。

do_restore = True

用于在通道超出范围时恢复未确认消息的标志。

exchange_types = {'direct': <class 'kombu.transport.virtual.exchange.DirectExchange'>, 'fanout': <class 'kombu.transport.virtual.exchange.FanoutExchange'>, 'topic': <class 'kombu.transport.virtual.exchange.TopicExchange'>}

交换类型和对应类的映射。

exchange_declare(exchange=None, type='direct', durable=False, auto_delete=False, arguments=None, nowait=False, passive=False)[源代码]

申报兑换。

exchange_delete(exchange, if_unused=False, nowait=False)[源代码]

删除 exchange 以及它的所有绑定。

queue_declare(queue=None, passive=False, **kwargs)[源代码]

声明队列。

queue_delete(queue, if_unused=False, if_empty=False, **kwargs)[源代码]

删除队列。

queue_bind(queue, exchange=None, routing_key='', arguments=None, **kwargs)[源代码]

捆绑 queueexchange 使用 routing key

queue_purge(queue, **kwargs)[源代码]

从队列中删除所有就绪消息。

basic_publish(message, exchange, routing_key, **kwargs)[源代码]

发布消息。

basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)[源代码]

消费来源: queue

basic_cancel(consumer_tag)[源代码]

按消费者标签取消消费者。

basic_get(queue, no_ack=False, **kwargs)[源代码]

直接访问(同步)获取消息。

basic_ack(delivery_tag, multiple=False)[源代码]

确认消息。

basic_recover(requeue=False)[源代码]

恢复未确认的邮件。

basic_reject(delivery_tag, requeue=False)[源代码]

拒绝邮件。

basic_qos(prefetch_size=0, prefetch_count=0, apply_global=False)[源代码]

更改此频道的服务质量设置。

注:

仅限 prefetch_count 受支持。

get_table(exchange)[源代码]

获取以下项的绑定表 exchange

typeof(exchange, default='direct')[源代码]

获取的交换类型实例 exchange

drain_events(timeout=None, callback=None)[源代码]
prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)[源代码]

准备消息数据。

message_to_python(raw_message)[源代码]

将原始消息转换为 Message 举个例子。

flow(active=True)[源代码]

启用/禁用消息流。

抛出:

NotImplementedError -- As Flow:不是由基本虚拟实现实现的。

close()[源代码]

关闭通道。

取消所有使用者,并重新排队未确认的消息。

消息

class kombu.transport.virtual.Message(payload, channel=None, **kwargs)[源代码]

消息对象。

exception MessageStateError

该消息已被确认。

add_note()

Exception.add_note(Note)--向异常添加注释

args
with_traceback()

Exception.with_traceback(TB)--set self.__traceback__ 去结核病,回到赛尔夫。

accept
ack(multiple=False)[源代码]

确认此消息正在处理中。

这将从队列中删除该消息。

抛出:

MessageStateError -- 如果消息已经:已确认/已重新排队/已拒绝。

ack_log_error(logger, errors, multiple=False)[源代码]
property acknowledged

如果消息已被确认,则设置为True。

body
channel
content_encoding
content_type
decode()[源代码]

反序列化消息体。

返回发布者发送的原始python结构。

注:

对返回值进行备注,使用 _decode 以迫使重新评估。

delivery_info
delivery_tag
errors = None
headers
property payload

解码后的消息正文。

properties
reject(requeue=False)[源代码]

拒绝此邮件。

该消息将被服务器丢弃。

抛出:

MessageStateError -- 如果消息已经:已确认/已重新排队/已拒绝。

reject_log_error(logger, errors, requeue=False)[源代码]
requeue()[源代码]

拒绝此邮件并将其放回队列中。

警告:

不能将此方法用作选择要处理的邮件的手段。

引发MessageStateError:

如果消息已经:已确认/已重新排队/已拒绝。

serializable()[源代码]

服务质量

class kombu.transport.virtual.QoS(channel, prefetch_count=0)[源代码]

服务质量保证。

仅支持 prefetch_count 在这点上。

论点:

Channel(Channel):连接通道。PREFETCH_COUNT(Int):初始预取次数,默认为0。

ack(delivery_tag)[源代码]

确认消息并从事务状态中删除。

append(message, delivery_tag)[源代码]

将消息追加到事务状态。

can_consume()[源代码]

如果可以从中消费频道,则返回TRUE。

用于确保客户端遵守当前活动的预取限制。

can_consume_max_estimate()[源代码]

返回允许返回的最大消息数。

从代理返回可能允许使用者一次使用的估计消息数。这用于这样的服务,其中批量‘GET MESSAGE’调用比许多单独的‘GET MESSAGE’调用更受欢迎--如SQS。

返回:

int

返回类型:

greater than zero.

get(delivery_tag)[源代码]
prefetch_count = 0

当前预取计数值

reject(delivery_tag, requeue=False)[源代码]

从事务状态中删除并重新排队消息。

restore_at_shutdown = True

如果禁用,未确认的邮件将不会在关机时恢复。

restore_unacked()[源代码]

恢复所有未确认的消息。

restore_unacked_once(stderr=None)[源代码]

在关机/GC收集时恢复所有未确认的消息。

注:

每个实例只能调用一次,后续调用将被忽略。

restore_visible(*args, **kwargs)[源代码]

恢复任何挂起的未获知的邮件。

要为VILECTIVE_TIMEOUT样式实现填写的。

注:

这是可选的实现,目前仅供Redis传输使用。

内存中状态

class kombu.transport.virtual.BrokerState(exchanges=None)[源代码]

代理状态保存交换、队列和绑定。

binding_declare(queue, exchange, routing_key, arguments)[源代码]
binding_delete(queue, exchange, routing_key)[源代码]
bindings = None

这是实际的绑定注册表,用于存储绑定并在固定时间内测试“in”关系。它的结构如下:

{
    (queue, exchange, routing_key): arguments,
    # ...,
}
clear()[源代码]
exchanges = None

将交换名称映射到 kombu.transport.virtual.exchange.ExchangeType

has_binding(queue, exchange, routing_key)[源代码]
queue_bindings(queue)[源代码]
queue_bindings_delete(queue)[源代码]
queue_index = None

队列索引用于直接(恒定时间)访问某个队列的所有绑定。它的结构如下:

{
    queue: {
        (queue, exchange, routing_key),
        # ...,
    },
    # ...,
}