This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

ApacheQPID传输- kombu.transport.qpid

用于Kombu的QPID传输模块。

Qpid 交通工具使用 qpid-python 作为客户和 qpid-tools 用于经纪人管理。

使用此传输时,您必须安装必要的依赖项。这些依赖项可通过PYPI获得,并可使用pip命令安装:

$ pip install kombu[qpid]

或者手动安装要求:

$ pip install qpid-tools qpid-python

Python3和PyPy限制

由于底层依赖项不兼容,QPID传输不支持Python3或PyPy环境。此版本经过测试,可与Python2.7配合使用。

功能

  • 类型:原生

  • 支持Direct:是

  • 支持主题:是

  • 支持扇出:支持

  • 支持优先级:是

  • 支持TTL:支持

身份验证

此传输支持使用QPID代理进行SASL身份验证。通常,SASL机制是根据可能的机制的客户端列表和服务器列表协商的,但在实践中,不同的SASL客户端库提供了不同的行为。这些不同的行为导致预期的SASL机制在许多情况下不被选择。因此,此传输根据下表限制了基于Kombu配置的机制类型。

Broker String

SASL Mechanism

Qid://主机名/

ANONYMOUS

Qid://用户名:Password@Hostname/

PLAIN

请参阅下面的说明

EXTERNAL

用户可以覆盖上述SASL选择行为,并使用 login_method 参数设置为 Connection 对象。该字符串可以是单个SASL机制或以空格分隔的SASL机制列表。如果您将芹菜与Kombu一起使用,则可以通过设置 BROKER_LOGIN_METHOD 可选的芹菜。

备注

在使用SSL时,QPID用户可能希望覆盖SASL机制以使用 EXTERNAL 。在这种情况下,QPID需要提供与 CN SSL客户端证书的。确保代理字符串包含相应的用户名。例如,如果客户端证书具有 CN=asdf 并且客户端连接到 example.com 在端口5671上,代理字符串应为:

qpid://asdf@example.com:5671/

交通选择

这个 transport_options 参数设置为 Connection 对象被直接传递给 qpid.messaging.endpoints.Connection 作为关键字参数。这些选项覆盖并替换任何其他默认值或指定值。如果使用芹菜,这可以通过设置 BROKER_TRANSPORT_OPTIONS 可选的芹菜。

运输

class kombu.transport.qpid.Transport(*args, **kwargs)[源代码]

QPID代理的Kombu本机传输。

为Kombu提供本机传输,允许消费者和生产者从代理读取消息或向代理写入消息。该传输能够同时支持同步和异步读取。所有写入都是通过 Channel 支持此传输的对象。

异步读取是使用调用 drain_events() ,它同步读取异步获取的消息,然后通过调用 Connection 对象。

传输还提供了建立和关闭与代理的连接的方法。此传输建立了类似工厂的模式,允许使用单例模式将所有连接合并为单个连接。

交通工具可以创建 Channel 对象与代理进行通信时使用 create_channel() 方法。

传输根据Kombu 3.0接口识别可恢复的连接错误和可恢复的通道错误。这些异常以元组的形式列出,并存储在传输类属性中 recoverable_connection_errorsrecoverable_channel_errors 分别进行了分析。引发的任何异常如果不是这些元组的成员,都被认为是不可恢复的。这使得Kombu对某些操作的自动重试的支持能够正常运行。

为了向后兼容Kombu 3.0之前的异常接口,还列出了以下可恢复的错误 connection_errorschannel_errors

class Connection(**connection_options)

QPID连接。

将连接对象封装为 Transport

参数:
  • host -- 连接应连接到的主机。

  • port -- 连接应连接到的端口。

  • username -- 连接应使用的用户名。可选的。

  • password -- 连接应使用的密码。可选,但需要用户名。

  • transport -- 连接应使用的传输类型。‘tcp’或‘ssl’应为值。

  • timeout -- 连接连接到代理时使用的超时。

  • sasl_mechanisms -- 要使用的SASL身份验证机制类型。有关有效值的说明,请参阅SASL文档。

备注

消息传递有一个AuthenticationFailure异常类型,但却引发了一条ConnectionError消息,指出在这些情况下发生了身份验证失败。ConnectionError被列为可恢复的错误类型,因此如果引发ConnectionError,Kombu将尝试重试。在不调整凭据的情况下重试该操作是不正确的,因此此方法专门检查是否存在指示发生身份验证失败的ConnectionError。在这些情况下,错误类型会发生变化,同时保留原始消息并引发错误,因此kombu将允许将异常视为不可恢复。

连接对象是由 Transport 在呼叫期间 establish_connection() 。这个 Transport 将连接选项作为关键字传入,这些关键字应用于创建的任何连接。每个 Transport 只创建一个连接。

Connection对象维护对 Connection 可以通过名为的绑定getter方法访问 get_qpid_connection() 方法。每个通道使用每个通道的连接 BrokerAgent ,并且传输为所有发送者和接收者维护会话。

Connection对象还负责维护在收到消息时应调用的回调引用的字典。这些回调保存在_CALLBACKS中,并以与收到的消息相关联的队列名称为关键字。_回调在中设置 Channel.basic_consume() ,已删除 Channel.basic_cancel() ,并叫来了 Transport.drain_events()

以下键应至少作为关键字参数传入:

所有关键字参数都收集到CONNECTION_OPTIONS字典中,并直接传递到 qpid.messaging.endpoints.Connection.establish()

class Channel(connection, transport)

支持代理配置和消息发送和接收。

参数:

频道对象被设计为与AMQP 0-10及更早版本中定义的频道具有方法奇偶性,这允许以下代理操作:

  • Exchange声明和删除

  • 队列声明和删除

  • 对绑定和解除绑定操作进行排队

  • 队列长度和清除操作

  • 发送/接收/拒绝消息

  • 对消息进行结构化、编码和解码

  • 支持同步和异步读取

  • 正在读取有关交换、队列和绑定的状态

通道设计为所有通道都与代理共享单个TCP连接,但在受益于共享的TCP连接的同时,提供与代理的隔离通信级别。频道被赋予了它的 Connection 对象设置为 Transport 这实例化了通道。

此频道继承自 StdChannel ,这使得这是一个“本地”频道,而不是一个“虚拟”频道,它将从 kombu.transports.virtual

使用此通道发送的消息将被分配一个Delivery_Tag。在消息准备发送时为消息生成Delivery_Tag basic_publish() 。对于每个频道实例,Delivery_Tag是唯一的。Delivery_Tag在其他对象中没有有意义的上下文,仅在此对象的内存中维护,并且基础 QoS 提供支持的对象。

每个频道对象恰好实例化一个 QoS 用于预取限制和异步打包的对象。这个 QoS 对象通过属性方法延迟实例化 qos() 。这个 QoS 对象是不应直接访问的支持对象,只能由频道本身访问。

对队列的同步读取是使用调用 basic_get() 它使用 _get() 来执行读数。这些方法立即读取,不接受任何形式的超时。 basic_get() 同步读取并在返回消息之前确认消息。在所有情况下都会执行ACK,因为使用qpid.Messaging读取消息但不确认它们的应用程序将会遇到内存泄漏。的no_ack参数 basic_get() 不会影响Acking功能。

通过使用启动使用者来完成对队列的异步读取 basic_consume() 。每次调用 basic_consume() 会引起一场 Receiver 要在上创建 Session 由 :class: Transport. The receiver will asynchronously read using qpid.messaging, and prefetch messages before the call to Transport.basic_drain() 发生。的预取计数值 QoS 对象是新接收器的容量值。新的接收器容量必须始终至少为1,否则看起来没有一个接收器可以读取,并且永远不会被读取。

每次调用 basic_consume() 创建一个使用者,该使用者被赋予一个使用者标记,该标记由 basic_consume() 。已启动的消费者可以通过其Consumer_tag使用以下命令取消使用 basic_cancel() 。消费者的取消会导致 Receiver 要关闭的对象。

通过支持异步消息打包 basic_ack() ,并由Delivery_Tag引用。Channel对象使用其 QoS 对象来执行消息打包。

class Message(payload, channel=None, **kwargs)

消息对象。

accept
body
channel
content_encoding
content_type
delivery_info
delivery_tag
headers
properties
serializable()
class QoS(session, prefetch_count=1)

用于消息预取和打包的帮助器对象。

关键字参数:

prefetch_count -- 初始预取计数,硬设置为1。

注意:PREFETCH_COUNT当前硬设置为1,需要改进

此对象是使用 Channel 举个例子。服务质量允许 prefetch_count 设置为对应的未处理消息的数量 Channel 应允许预取。设置 prefetch_count 设置为0将禁用预取限制,并且对象可以容纳任意数量的消息。

消息是使用添加的 append() ,它们将一直保持,直到通过调用 ack() 。在收到ACK或关闭会话之前,代理不会将已接收但未确认的消息传递给另一个使用者。使用Delivery_Tag引用消息,每个消息都是唯一的 Channel 。传递标记在此对象外部进行管理,并与消息一起传递到 append() 。未确认的消息可以通过以下方式从服务质量中查找 get() 并且可以使用以下命令拒绝和忘记 reject()

ack(delivery_tag)

通过Delivery_Tag确认消息。

一旦消息被处理并且可以被代理忘记,就以异步方式调用。

参数:

delivery_tag (uuid.UUID) -- 与要确认的邮件关联的传递标记。

append(message, delivery_tag)

将消息追加到未确认消息列表中。

添加一条消息,由Delivery_Tag引用,用于稍后删除、拒绝或获取。消息通过Delivery_Tag保存到DICT中。

参数:
  • message (qpid.messaging.Message) -- 尚未确认的已接收消息。

  • delivery_tag (uuid.UUID) -- 收到此消息时引用此消息的UUID。

can_consume()

如果是,则返回True Channel 可以使用更多的消息。

用于确保客户端遵守当前活动的预取限制。

返回:

如果此Qos对象可以接受更多消息而不违反prefetch_count,则为True。如果PREFETCH_COUNT为0,则CAN_Consumer将始终返回True。

返回类型:

bool

can_consume_max_estimate()

返回剩余消息容量。

返回一个估计的未处理消息数。 kombu.transport.qpid.Channel 可以接受而不超过 prefetch_count 。如果 prefetch_count 为0,则此方法返回1。

返回:

在不违反PREFETCH_COUNT的情况下可以提取的估计消息数。

返回类型:

int

get(delivery_tag)

通过Delivery_Tag获取未确认的消息。

如果使用无效的Delivery_Tag调用 KeyError 都被养大了。

参数:

delivery_tag (uuid.UUID) -- 与要返回的邮件关联的传递标记。

返回:

由Delivery_Tag查找的未被确认的邮件。

返回类型:

qpid.messaging.Message

reject(delivery_tag, requeue=False)

通过Delivery_Tag拒绝邮件。

显式通知代理与此Qos对象相关联的通道正在拒绝先前传递的消息。

如果RESEUE为FALSE,则消息不会重新排队以传递给另一个使用者。如果Requeue为True,则消息将重新排队以传递给另一个使用者。

参数:

delivery_tag (uuid.UUID) -- 与要拒绝的邮件关联的传递标记。

关键字参数:

requeue -- 如果为True,则将通知代理重新排队该消息。如果为假,代理将被告知完全丢弃该消息。在这两种情况下,消息都将从此对象中删除。

basic_ack(delivery_tag, multiple=False)

通过Delivery_Tag确认消息。

确认Delivery_Tag引用的邮件。消息只能使用以下命令确认 basic_ack() 如果它们是通过以下方式获得的 basic_consume() 。这是异步读取行为的ACK部分。

在内部,此方法使用 QoS 对象,该对象存储消息并负责打包。

参数:
  • delivery_tag (uuid.UUID) -- 与要确认的邮件关联的传递标记。

  • multiple (bool) -- 未实施。如果设置为True,则引发AssertionError。

basic_cancel(consumer_tag)

按消费者标签取消消费者。

请求消费者停止从其队列中读取消息。消费者是一个 Receiver ,并使用以下命令关闭 close()

此方法还清除消费者的所有挥之不去的引用。

参数:

consumer_tag (an immutable object) -- 指要注销的消费者的标签。最初在将使用者创建为参数时指定 basic_consume()

basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)

启动从队列读取的异步使用者。

此方法启动类型为 Receiver 使用 Session 由创建并引用 Transport 从由名称指定的队列中读取消息,直到通过调用 basic_cancel()

消息稍后可通过同步调用 Transport.drain_events() ,它将从由此方法启动的使用者中排出。 Transport.drain_events() 是同步的,但通过网络接收消息是异步发生的,因此它应该仍然执行得很好。 Transport.drain_events() 调用此处随self.Message类型的消息一起提供的回调。

每个消费者都由一个Consumer_tag引用,该标签由该方法的调用方提供。

此方法在以队列名称为关键字的dict中设置对self.Connection对象的回调。 drain_events() 负责在收到消息时调用该回调。

的调用方处理完消息后,会将收到的所有消息都添加到要保存的Qos对象中,以备以后使用 drain_events() 。消息可以在通过调用 basic_ack()

如果no_ack为True,则no_ack标志指示消息的接收者不会调用 basic_ack() 后来。由于该消息稍后不会被确认,因此它将立即被确认。

basic_consume() 在调用回调之前转换消息对象类型。最初,该消息以 qpid.messaging.Message 。此方法解包 qpid.messaging.Message 并创建一个self.Message类型的新对象。

此方法将用户传递的回调包装在运行时构建的函数中,该函数提供来自 qpid.messaging.MessageMessage ,并将消息添加到关联的 QoS 对象用于异步定位(如有必要)。

参数:
  • queue (str) -- 要从中消费消息的队列的名称

  • no_ack (bool) -- 如果为True,则消息将不会被保存以供稍后访问,但将立即被访问。如果为False,则将保存消息以供稍后通过调用 basic_ack()

  • callback (a callable object) -- 当消息到达队列时将调用的可调用对象。

  • consumer_tag (an immutable object) -- 引用所创建的使用者的标记。需要使用此Consumer_Tag来取消消费者。

basic_get(queue, no_ack=False, **kwargs)

非阻塞单个消息按名称从队列获取和确认。

在内部,此方法使用 _get() 来获取消息。如果一个 Empty 异常由以下人员引发 _get() ,则此方法会将其静默并返回None。如果 _get() 不返回消息,则该消息被确认。No_ack参数对ACK行为没有影响,所有消息在所有情况下都被ACK。此方法从不将获取的消息添加到用于异步ACK的内部Qos对象。

此方法在方法传递时转换方法的对象类型。从经纪人那里拿到的, _get() 返回一个 qpid.messaging.Message ,但此方法获取 qpid.messaging.Message 并实例化一个 Message 使用基于self.Message的类设置的有效负载创建。

参数:

queue (str) -- 从中提取消息的队列名称。

关键字参数:

no_ack -- No_ack参数对此方法的ACK行为没有影响。未确认的消息会在qpid.Messaging中造成内存泄漏,并且在任何情况下都需要被确认。

返回:

收到的消息。

返回类型:

Message

basic_publish(message, exchange, routing_key, **kwargs)

使用路由键将消息发布到Exchange上。

使用ROUTING_KEY指定的路由关键字将消息发布到由名称指定的交换上。在发送邮件之前,通过以下方式准备邮件:

  • 对正文进行编码,使用 encode_body()

  • 将正文包装为缓冲区对象,以便

    qpid.messaging.endpoints.Sender 使用可以支持任意大消息的内容类型。

  • 将Delivery_Tag设置为随机uuid.uuid

  • 将Exchange和Routing_Key信息设置为Delivery_Info

内部使用 _put() 以同步发送消息。此消息通常由 kombu.messaging.Producer._publish 作为消息发布的最后一步。

参数:
  • message (dict) -- 包含关键字值与消息数据配对的字典。方法生成有效的消息字典。 prepare_message() 方法。

  • exchange (str) -- 要向其提交此消息的交换的名称。

  • routing_key (str) -- 要用作消息的路由密钥被提交到Exchange上。

basic_qos(prefetch_count, *args)

变化 QoS 此频道的设置。

设置此通道可以获取和保留的未确认消息的数量。PREFETCH_VALUE还用作任何新 Receiver 物体。

目前,该值被硬编码为1。

参数:

prefetch_count (int) -- 没有用过。此方法被硬编码为1。

basic_reject(delivery_tag, requeue=False)

通过Delivery_Tag拒绝邮件。

拒绝频道已接收但尚未确认的消息。消息由其Delivery_Tag引用。

如果RESEUE为FALSE,则被拒绝的消息将被代理丢弃,并且不会传递给任何其他使用者。如果Requeue为True,则被拒绝的邮件将被重新排队以传递给另一个使用者,可能传递给先前拒绝该邮件的同一使用者。

参数:

delivery_tag (uuid.UUID) -- 与要拒绝的邮件关联的传递标记。

关键字参数:

requeue -- 如果为False,则被拒绝的消息将由代理丢弃,并且不会传递给任何其他消费者。如果为True,则拒绝的邮件将被重新排队以传递给另一个使用者,可能传递给以前拒绝该邮件的同一使用者。

body_encoding = 'base64'

默认正文编码。注: transport_options['body_encoding'] 将覆盖此值。

close()

取消所有关联的消息并关闭频道。

这将取消所有使用者,方法是调用 basic_cancel() 对于每个已知的Consumer_Tag。它还会关闭self._Broker会话。隐式关闭会话会导致代理将所有未完成、未确认的消息视为未传递。

codecs = {'base64': <kombu.transport.virtual.base.Base64 object>}

二进制<->ASCII编解码器。

decode_body(body, encoding=None)

使用可选指定的编码对正文进行解码。

编码可以通过名称指定,并在self.codecs中查找。Self.codecs使用字符串作为其键,该键指定编码的名称,然后该值是一个实例化的对象,它可以通过encode和decode方法提供该类型的编码/解码。

参数:

body (str) -- 要编码的正文。

关键字参数:

encoding -- 要使用的编码类型。必须是self.codes中列出的受支持的编解码器。

返回:

如果指定了编码,则返回解码后的正文。如果未指定编码,则原封不动地返回正文。

返回类型:

str

encode_body(body, encoding=None)

使用可选指定的编码对正文进行编码。

编码可以通过名称指定,并在self.codecs中查找。Self.codecs使用字符串作为其键,该键指定编码的名称,然后该值是一个实例化的对象,它可以通过encode和decode方法提供该类型的编码/解码。

参数:

body (str) -- 要编码的正文。

关键字参数:

encoding -- 要使用的编码类型。必须是self.codes中列出的受支持的编解码器。

返回:

如果指定了编码,则返回一个元组,第一个位置为编码体,第二个位置为编码使用的位置。如果未指定编码,则原封不动地传递正文。

返回类型:

tuple

exchange_declare(exchange='', type='direct', durable=False, **kwargs)

创建一个新的交易所。

创建特定类型的交换,并可选择使该交换持久。如果请求的名称的交换已经存在,则不会采取任何操作,也不会引发任何异常。持久交易所将在经纪商重启后存活下来,而非持久交易所则不会。

交易所根据其类型提供行为。预期的行为是在AMQP 0-10和之前的规范中定义的行为,包括‘直接’、‘主题’和‘扇出’功能。

关键字参数:
  • type -- 交换类型。有效值包括‘DIRECT’、‘TOPIC’和‘FANOUT’。

  • exchange -- 要创建的交换的名称。如果未指定Exchange,则将使用空字符串作为名称。

  • durable -- 如果交换应该是持久的,则为True,否则为False。

exchange_delete(exchange_name, **kwargs)

删除按名称指定的交换。

参数:

exchange_name (str) -- 要删除的交换的名称。

prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)

准备要发送的消息数据。

此消息通常由 kombu.messaging.Producer._publish() 作为消息发布中的准备步骤。

参数:

body (str) -- 消息的正文

关键字参数:
  • priority -- 0到9之间的数字,用于设置消息的优先级。

  • content_type -- 消息正文应被视为的Content_Type。如果未设置此项,则 qpid.messaging.endpoints.Sender 对象尝试从正文中自动检测Content_type。

  • content_encoding -- 消息正文的Content_Coding编码为。

  • headers -- 应设置的其他邮件标头。作为键-值对传入。

  • properties -- 要在消息上设置的消息属性。

返回:

返回封装消息属性的Dict对象。有关可以设置的属性的详细信息,请参见参数。

返回类型:

dict

property qos

QoS 此渠道的经理。

延迟实例化类型为 QoS 在访问self.qos属性时。

返回:

已存在的或新创建的Qos对象

返回类型:

QoS

queue_bind(queue, exchange, routing_key, **kwargs)

使用绑定密钥将队列绑定到交换。

使用特定的绑定密钥将按名称指定的队列绑定到按名称指定的交换。要成功完成绑定,代理上必须已存在队列和交换。队列可以使用不同的密钥多次绑定到交换。

参数:
  • queue (str) -- 要绑定的队列的名称。

  • exchange (str) -- 队列应绑定到的交换的名称。

  • routing_key (str) -- 指定队列应绑定到指定交换的绑定键。

queue_declare(queue, passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None)

创建由名称指定的新队列。

如果该队列已经存在,则不会对该队列进行任何更改,并且返回值将返回有关现有队列的信息。

队列名称是必需的,并指定为第一个参数。

如果PASSIVE为True,服务器将不创建队列。客户端可以使用它来检查队列是否存在,而无需修改服务器状态。默认值为FALSE。

如果耐久为True,则队列将是耐久的。服务器重新启动时,持久队列保持活动状态。如果/当服务器重新启动时,非持久队列(临时队列)将被清除。请注意,持久队列不一定保存持久消息,尽管将持久消息发送到临时队列没有意义。默认值为FALSE。

如果EXCLUSIVE为True,则队列将是独占的。独占队列只能由当前连接使用。设置‘EXCLUSIVE’标志总是意味着‘自动删除’。默认值为FALSE。

如果AUTO_DELETE为True,则在所有使用者使用完队列后将其删除。最后一个消费者可以明确取消,也可以因为其渠道关闭而取消。如果队列中没有消费者,则不会将其删除。默认为True。

未使用NoWait参数。它是0-9-1协议的一部分,但此AMQP客户端实现了0-10,删除了nowait选项。

Arguments参数是用于声明队列的一组参数。参数作为字典传递或不传递。如果被动为True,则忽略此字段。默认设置为无。

此方法返回一个 namedtuple 其名称为‘QUEUE_DECLARE_OK_T’,队列名称为‘QUEUE’,队列上的消息计数为‘MESSAGE_COUNT’,活动使用者数量为‘Consumer_COUNT’。命名的元组值分别按Queue、Message_Count和Consumer_Count排序。

由于芹菜对事件的非ACK,因此在以字符串‘celeryev’开头或以字符串‘pidbox’结尾的任何队列上都设置了环策略。这些是西芹事件队列,西芹不会对它们进行确认,从而导致消息堆积。最终,除非设置了“环”策略,否则QPID将停止为消息提供服务,此时支持队列的缓冲区将变为循环。

参数:
  • queue (str) -- 要创建的队列的名称。

  • passive (bool) -- 如果为True,则服务器不会创建队列。

  • durable (bool) -- 如果为True,则队列将是持久的。

  • exclusive (bool) -- 如果为True,则队列将是独占的。

  • auto_delete (bool) -- 如果为True,则在所有使用者使用完队列后将其删除。

  • nowait (bool) -- 由于0-10规范不包括该参数,因此不使用该参数。

  • arguments (dict or None) -- 用于声明队列的一组参数。

返回:

将声明的队列表示为命名元组的命名元组。元组值按队列、消息计数和活动使用者计数排序。

返回类型:

namedtuple

queue_delete(queue, if_unused=False, if_empty=False, **kwargs)

按名称删除队列。

删除按名称指定的队列。使用IF_UNUSED关键字参数,只有在绑定了0个使用者时,才能执行删除操作。使用IF_EMPTY关键字参数,仅当队列中有0条消息时才能进行删除。

参数:

queue (str) -- 要删除的队列的名称。

关键字参数:
  • if_unused -- 如果为True,则仅当队列有0个使用者时才删除。如果为False,则删除队列,即使绑定了使用者也是如此。

  • if_empty -- 如果为True,则仅在队列为空时删除该队列。如果为False,则在队列为空或不为空时将其删除。

queue_purge(queue, **kwargs)

从队列中删除所有未送达的邮件。

从按名称指定的队列中清除所有未送达的邮件。如果队列不存在,则会引发异常。首先检查队列消息深度,然后要求代理清除该数量的消息。返回请求清除的消息的整数个。实际清除的邮件数可能与请求的要清除的邮件数不同。

有时会要求清除已传递的邮件,但不会。这种情况以静默方式失败,这是已传递给另一个使用者的消息的正确行为,该使用者尚未确认该消息,并且仍与代理有活动会话。在这种情况下,清除消息是不安全的,并将由代理保留。客户端无法更改此传递行为。

在内部,此方法依赖于 _purge()

参数:

queue (str) -- 应删除所有消息的队列的名称。

返回:

请求清除的邮件数。

返回类型:

int

加薪:

qpid.messaging.exceptions.NotFound 如果找不到要清除的队列。

queue_unbind(queue, exchange, routing_key, **kwargs)

解除队列与具有给定绑定密钥的交换的绑定。

从已使用绑定键绑定的、由名称指定的交换取消绑定由名称指定的队列。队列和交换必须已存在于代理上,并与绑定密钥绑定,才能成功完成操作。队列可以使用不同的密钥多次绑定到交换,因此绑定密钥是显式解绑的必填字段。

参数:
  • queue (str) -- 要解除绑定的队列的名称。

  • exchange (str) -- 队列应从其解除绑定的交换的名称。

  • routing_key (str) -- 应解除绑定的指定队列和指定交换之间的现有绑定密钥。

typeof(exchange, default='direct')

获取交换类型。

查找并返回按名称指定的交换的交换类型。交换类型应为“DIRECT”、“TOPIC”和“FAND OUT”,它们与AMQP 0-10及更早版本中指定的交换功能相对应。如果找不到交换,则返回默认的交换类型。

参数:

exchange (str) -- 要查找其类型的交换。

关键字参数:

default -- 当交换不存在时要采用的交换类型。

返回:

交换类型为‘DIRECT’、‘TOPIC’或‘FAND’。

返回类型:

str

close()

关闭连接。

关闭连接将关闭该连接使用的所有关联会话、发送方或接收方。

close_channel(channel)

关闭一个频道。

方法的引用指定的通道 Channel 对象。

参数:

channel (Channel.) -- 应该关闭的频道。

get_qpid_connection()

返回已有的连接(单例)。

返回:

现有的qpid.Messaging.Connection

返回类型:

qpid.messaging.endpoints.Connection

channel_errors = (None,)

由于通道/方法故障而可能发生的错误元组。

close_connection(connection)[源代码]

关闭 Connection 对象。

参数:

connection (kombu.transport.qpid.Connection) -- 应该关闭的连接。

connection_errors = (None, <class 'OSError'>)

由于连接故障而可能发生的错误的元组。

create_channel(connection)[源代码]

创建并返回 Channel

创建一个新频道,并将该频道追加到连接已知的频道列表中。一旦创建了新的频道,就会将其返回。

参数:

connection (kombu.transport.qpid.Connection) -- 应该支持新的 Channel

返回:

制作的新频道。

返回类型:

kombu.transport.qpid.Channel

property default_connection_params

返回带有默认连接参数的DICT。

只要传输的创建者没有指定必需的参数,就会使用这些连接参数。

返回:

包含默认参数的字典。

返回类型:

dict

drain_events(connection, timeout=0, **kwargs)[源代码]

处理和调用所有就绪传输消息的回调。

从所有已准备好的事件中排出 Receiver 它们正在异步获取消息。

对于每个排空的消息,该消息被调用到相应的回调。回调按队列名称进行组织。

参数:

connection (kombu.transport.qpid.Connection) -- 这个 Connection 它包含按队列名称索引的回调,此方法将调用这些回调。

关键字参数:

timeout -- 限制此方法将运行的时间的超时。超时可能会中断正在等待新消息的阻塞读取,或者导致此方法在清空所有消息之前返回。默认为0。

driver_name = 'qpid'

驱动程序库的名称(例如‘py-amqp’、‘redis’)。

driver_type = 'qpid'

驱动程序的类型,可用于使用AMQP协议(DRIVER_TYPE:‘AMQP’)、Redis(DRIVER_TYPE:‘REDIS’)等来分隔传输...

establish_connection()[源代码]

建立连接对象。

确定在创建此传输所需的任何连接时要使用的正确选项,并创建 Connection 对象,该对象在需要时为生成的连接保存这些值。这些选项混合了通过传输的创建者传入的内容和提供的默认值 default_connection_params() 。选项包括代理网络设置、超时行为、身份验证和身份验证设置。

此方法还创建并存储 Session 使用 Connection 由此方法创建的。会话存储在Self上。

返回:

被创造的 Connection 对象,则返回。

返回类型:

Connection

implements = {'asynchronous': True, 'exchange_type': frozenset({'direct', 'fanout', 'topic'}), 'heartbeats': False}
on_readable(connection, loop)[源代码]

处理与此传输关联的任何邮件。

此方法通过发出对self.r文件描述符的Read调用来从外部监视的文件描述符中清除单个消息,该调用删除了由QPID会话消息回调处理程序放入管道中的单个‘0’字符。一旦读取了“0”,所有可用的事件都会通过调用 drain_events()

文件描述符self.r被修改为非阻塞的,以确保在不再有消息时意外调用此方法不会导致无限阻塞。

预计不会有任何东西从 drain_events() 因为 drain_events() 控件上维护的回调来处理消息 Connection 对象。什么时候 drain_events() 返回时,所有关联的消息都已被处理。

此方法调用DRAIN_EVENTS(),它读取此传输可用的尽可能多的消息,然后返回。它阻止阅读和处理大量消息可能需要时间,但它不会阻止等待新消息到达。什么时候 drain_events() 称为超时,则未指定超时,这会导致此行为。

值得注意的一个有趣行为是,当多个消息准备就绪时,该方法从self.r中删除了一个‘0’字符,但是 drain_events() 可以处理任意数量的消息。在这种情况下,可能会在self.r上留下额外的‘0’字符以供读取,其中与这些‘0’字符对应的消息已经被处理。外部EPOLL循环将错误地认为其他数据已准备好读取,并将不必要地调用ON_READABLE,对要读取的每个‘0’调用一次。其他呼叫至 on_readable() 不会产生负面影响,并且最终会从self.r文件描述符中清除符号。如果新消息在此耗尽期间出现,也将得到适当处理。

参数:
polling_interval = None
recoverable_channel_errors = (None,)
recoverable_connection_errors = (None, <class 'OSError'>)
register_with_event_loop(connection, loop)[源代码]

使用循环注册文件描述符和回调。

注册当外部EPOLL循环发现注册的文件描述符可以读取时调用的回调self.on_Readable。文件描述符由该传输创建,并在消息可用时写入。

因为Support_ev==True,所以Celery希望调用此方法来使Transport有机会注册一个读文件描述符,以便由Celery使用事件I/O通知机制(如EPOLL)进行外部监视。还注册了一个回调,一旦外部EPOLL循环准备好处理与准备为此传输处理的消息相关联的EPOLL事件,就将调用该回调。

在实例化传输之后,每个传输只进行一次注册调用。

参数:
verify_runtime_environment()[源代码]

验证运行时环境是否可接受。

此方法作为 __init__ 并在Python3或PyPI环境中引发运行错误。此模块与Python3或PyPI不兼容。RuntimeError预先向用户指出了这一点,并建议改用Python2.6+。

此方法还检查是否安装了依赖项qpidtoollibs和qpid.Messaging。如果没有安装其中任何一个,则会引发运行错误。

加薪:

如果运行时环境不可接受,则返回RounmeError。

连接

class kombu.transport.qpid.Connection(**connection_options)[源代码]

QPID连接。

将连接对象封装为 Transport

参数:
  • host -- 连接应连接到的主机。

  • port -- 连接应连接到的端口。

  • username -- 连接应使用的用户名。可选的。

  • password -- 连接应使用的密码。可选,但需要用户名。

  • transport -- 连接应使用的传输类型。‘tcp’或‘ssl’应为值。

  • timeout -- 连接连接到代理时使用的超时。

  • sasl_mechanisms -- 要使用的SASL身份验证机制类型。有关有效值的说明,请参阅SASL文档。

备注

消息传递有一个AuthenticationFailure异常类型,但却引发了一条ConnectionError消息,指出在这些情况下发生了身份验证失败。ConnectionError被列为可恢复的错误类型,因此如果引发ConnectionError,Kombu将尝试重试。在不调整凭据的情况下重试该操作是不正确的,因此此方法专门检查是否存在指示发生身份验证失败的ConnectionError。在这些情况下,错误类型会发生变化,同时保留原始消息并引发错误,因此kombu将允许将异常视为不可恢复。

连接对象是由 Transport 在呼叫期间 establish_connection() 。这个 Transport 将连接选项作为关键字传入,这些关键字应用于创建的任何连接。每个 Transport 只创建一个连接。

Connection对象维护对 Connection 可以通过名为的绑定getter方法访问 get_qpid_connection() 方法。每个通道使用每个通道的连接 BrokerAgent ,并且传输为所有发送者和接收者维护会话。

Connection对象还负责维护在收到消息时应调用的回调引用的字典。这些回调保存在_CALLBACKS中,并以与收到的消息相关联的队列名称为关键字。_回调在中设置 Channel.basic_consume() ,已删除 Channel.basic_cancel() ,并叫来了 Transport.drain_events()

以下键应至少作为关键字参数传入:

所有关键字参数都收集到CONNECTION_OPTIONS字典中,并直接传递到 qpid.messaging.endpoints.Connection.establish()

class Channel(connection, transport)

支持代理配置和消息发送和接收。

参数:

频道对象被设计为与AMQP 0-10及更早版本中定义的频道具有方法奇偶性,这允许以下代理操作:

  • Exchange声明和删除

  • 队列声明和删除

  • 对绑定和解除绑定操作进行排队

  • 队列长度和清除操作

  • 发送/接收/拒绝消息

  • 对消息进行结构化、编码和解码

  • 支持同步和异步读取

  • 正在读取有关交换、队列和绑定的状态

通道设计为所有通道都与代理共享单个TCP连接,但在受益于共享的TCP连接的同时,提供与代理的隔离通信级别。频道被赋予了它的 Connection 对象设置为 Transport 这实例化了通道。

此频道继承自 StdChannel ,这使得这是一个“本地”频道,而不是一个“虚拟”频道,它将从 kombu.transports.virtual

使用此通道发送的消息将被分配一个Delivery_Tag。在消息准备发送时为消息生成Delivery_Tag basic_publish() 。对于每个频道实例,Delivery_Tag是唯一的。Delivery_Tag在其他对象中没有有意义的上下文,仅在此对象的内存中维护,并且基础 QoS 提供支持的对象。

每个频道对象恰好实例化一个 QoS 用于预取限制和异步打包的对象。这个 QoS 对象通过属性方法延迟实例化 qos() 。这个 QoS 对象是不应直接访问的支持对象,只能由频道本身访问。

对队列的同步读取是使用调用 basic_get() 它使用 _get() 来执行读数。这些方法立即读取,不接受任何形式的超时。 basic_get() 同步读取并在返回消息之前确认消息。在所有情况下都会执行ACK,因为使用qpid.Messaging读取消息但不确认它们的应用程序将会遇到内存泄漏。的no_ack参数 basic_get() 不会影响Acking功能。

通过使用启动使用者来完成对队列的异步读取 basic_consume() 。每次调用 basic_consume() 会引起一场 Receiver 要在上创建 Session 由 :class: Transport. The receiver will asynchronously read using qpid.messaging, and prefetch messages before the call to Transport.basic_drain() 发生。的预取计数值 QoS 对象是新接收器的容量值。新的接收器容量必须始终至少为1,否则看起来没有一个接收器可以读取,并且永远不会被读取。

每次调用 basic_consume() 创建一个使用者,该使用者被赋予一个使用者标记,该标记由 basic_consume() 。已启动的消费者可以通过其Consumer_tag使用以下命令取消使用 basic_cancel() 。消费者的取消会导致 Receiver 要关闭的对象。

通过支持异步消息打包 basic_ack() ,并由Delivery_Tag引用。Channel对象使用其 QoS 对象来执行消息打包。

class Message(payload, channel=None, **kwargs)

消息对象。

accept
body
channel
content_encoding
content_type
delivery_info
delivery_tag
headers
properties
serializable()
class QoS(session, prefetch_count=1)

用于消息预取和打包的帮助器对象。

关键字参数:

prefetch_count -- 初始预取计数,硬设置为1。

注意:PREFETCH_COUNT当前硬设置为1,需要改进

此对象是使用 Channel 举个例子。服务质量允许 prefetch_count 设置为对应的未处理消息的数量 Channel 应允许预取。设置 prefetch_count 设置为0将禁用预取限制,并且对象可以容纳任意数量的消息。

消息是使用添加的 append() ,它们将一直保持,直到通过调用 ack() 。在收到ACK或关闭会话之前,代理不会将已接收但未确认的消息传递给另一个使用者。使用Delivery_Tag引用消息,每个消息都是唯一的 Channel 。传递标记在此对象外部进行管理,并与消息一起传递到 append() 。未确认的消息可以通过以下方式从服务质量中查找 get() 并且可以使用以下命令拒绝和忘记 reject()

ack(delivery_tag)

通过Delivery_Tag确认消息。

一旦消息被处理并且可以被代理忘记,就以异步方式调用。

参数:

delivery_tag (uuid.UUID) -- 与要确认的邮件关联的传递标记。

append(message, delivery_tag)

将消息追加到未确认消息列表中。

添加一条消息,由Delivery_Tag引用,用于稍后删除、拒绝或获取。消息通过Delivery_Tag保存到DICT中。

参数:
  • message (qpid.messaging.Message) -- 尚未确认的已接收消息。

  • delivery_tag (uuid.UUID) -- 收到此消息时引用此消息的UUID。

can_consume()

如果是,则返回True Channel 可以使用更多的消息。

用于确保客户端遵守当前活动的预取限制。

返回:

如果此Qos对象可以接受更多消息而不违反prefetch_count,则为True。如果PREFETCH_COUNT为0,则CAN_Consumer将始终返回True。

返回类型:

bool

can_consume_max_estimate()

返回剩余消息容量。

返回一个估计的未处理消息数。 kombu.transport.qpid.Channel 可以接受而不超过 prefetch_count 。如果 prefetch_count 为0,则此方法返回1。

返回:

在不违反PREFETCH_COUNT的情况下可以提取的估计消息数。

返回类型:

int

get(delivery_tag)

通过Delivery_Tag获取未确认的消息。

如果使用无效的Delivery_Tag调用 KeyError 都被养大了。

参数:

delivery_tag (uuid.UUID) -- 与要返回的邮件关联的传递标记。

返回:

由Delivery_Tag查找的未被确认的邮件。

返回类型:

qpid.messaging.Message

reject(delivery_tag, requeue=False)

通过Delivery_Tag拒绝邮件。

显式通知代理与此Qos对象相关联的通道正在拒绝先前传递的消息。

如果RESEUE为FALSE,则消息不会重新排队以传递给另一个使用者。如果Requeue为True,则消息将重新排队以传递给另一个使用者。

参数:

delivery_tag (uuid.UUID) -- 与要拒绝的邮件关联的传递标记。

关键字参数:

requeue -- 如果为True,则将通知代理重新排队该消息。如果为假,代理将被告知完全丢弃该消息。在这两种情况下,消息都将从此对象中删除。

basic_ack(delivery_tag, multiple=False)

通过Delivery_Tag确认消息。

确认Delivery_Tag引用的邮件。消息只能使用以下命令确认 basic_ack() 如果它们是通过以下方式获得的 basic_consume() 。这是异步读取行为的ACK部分。

在内部,此方法使用 QoS 对象,该对象存储消息并负责打包。

参数:
  • delivery_tag (uuid.UUID) -- 与要确认的邮件关联的传递标记。

  • multiple (bool) -- 未实施。如果设置为True,则引发AssertionError。

basic_cancel(consumer_tag)

按消费者标签取消消费者。

请求消费者停止从其队列中读取消息。消费者是一个 Receiver ,并使用以下命令关闭 close()

此方法还清除消费者的所有挥之不去的引用。

参数:

consumer_tag (an immutable object) -- 指要注销的消费者的标签。最初在将使用者创建为参数时指定 basic_consume()

basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)

启动从队列读取的异步使用者。

此方法启动类型为 Receiver 使用 Session 由创建并引用 Transport 从由名称指定的队列中读取消息,直到通过调用 basic_cancel()

消息稍后可通过同步调用 Transport.drain_events() ,它将从由此方法启动的使用者中排出。 Transport.drain_events() 是同步的,但通过网络接收消息是异步发生的,因此它应该仍然执行得很好。 Transport.drain_events() 调用此处随self.Message类型的消息一起提供的回调。

每个消费者都由一个Consumer_tag引用,该标签由该方法的调用方提供。

此方法在以队列名称为关键字的dict中设置对self.Connection对象的回调。 drain_events() 负责在收到消息时调用该回调。

的调用方处理完消息后,会将收到的所有消息都添加到要保存的Qos对象中,以备以后使用 drain_events() 。消息可以在通过调用 basic_ack()

如果no_ack为True,则no_ack标志指示消息的接收者不会调用 basic_ack() 后来。由于该消息稍后不会被确认,因此它将立即被确认。

basic_consume() 在调用回调之前转换消息对象类型。最初,该消息以 qpid.messaging.Message 。此方法解包 qpid.messaging.Message 并创建一个self.Message类型的新对象。

此方法将用户传递的回调包装在运行时构建的函数中,该函数提供来自 qpid.messaging.MessageMessage ,并将消息添加到关联的 QoS 对象用于异步定位(如有必要)。

参数:
  • queue (str) -- 要从中消费消息的队列的名称

  • no_ack (bool) -- 如果为True,则消息将不会被保存以供稍后访问,但将立即被访问。如果为False,则将保存消息以供稍后通过调用 basic_ack()

  • callback (a callable object) -- 当消息到达队列时将调用的可调用对象。

  • consumer_tag (an immutable object) -- 引用所创建的使用者的标记。需要使用此Consumer_Tag来取消消费者。

basic_get(queue, no_ack=False, **kwargs)

非阻塞单个消息按名称从队列获取和确认。

在内部,此方法使用 _get() 来获取消息。如果一个 Empty 异常由以下人员引发 _get() ,则此方法会将其静默并返回None。如果 _get() 不返回消息,则该消息被确认。No_ack参数对ACK行为没有影响,所有消息在所有情况下都被ACK。此方法从不将获取的消息添加到用于异步ACK的内部Qos对象。

此方法在方法传递时转换方法的对象类型。从经纪人那里拿到的, _get() 返回一个 qpid.messaging.Message ,但此方法获取 qpid.messaging.Message 并实例化一个 Message 使用基于self.Message的类设置的有效负载创建。

参数:

queue (str) -- 从中提取消息的队列名称。

关键字参数:

no_ack -- No_ack参数对此方法的ACK行为没有影响。未确认的消息会在qpid.Messaging中造成内存泄漏,并且在任何情况下都需要被确认。

返回:

收到的消息。

返回类型:

Message

basic_publish(message, exchange, routing_key, **kwargs)

使用路由键将消息发布到Exchange上。

使用ROUTING_KEY指定的路由关键字将消息发布到由名称指定的交换上。在发送邮件之前,通过以下方式准备邮件:

  • 对正文进行编码,使用 encode_body()

  • 将正文包装为缓冲区对象,以便

    qpid.messaging.endpoints.Sender 使用可以支持任意大消息的内容类型。

  • 将Delivery_Tag设置为随机uuid.uuid

  • 将Exchange和Routing_Key信息设置为Delivery_Info

内部使用 _put() 以同步发送消息。此消息通常由 kombu.messaging.Producer._publish 作为消息发布的最后一步。

参数:
  • message (dict) -- 包含关键字值与消息数据配对的字典。方法生成有效的消息字典。 prepare_message() 方法。

  • exchange (str) -- 要向其提交此消息的交换的名称。

  • routing_key (str) -- 要用作消息的路由密钥被提交到Exchange上。

basic_qos(prefetch_count, *args)

变化 QoS 此频道的设置。

设置此通道可以获取和保留的未确认消息的数量。PREFETCH_VALUE还用作任何新 Receiver 物体。

目前,该值被硬编码为1。

参数:

prefetch_count (int) -- 没有用过。此方法被硬编码为1。

basic_reject(delivery_tag, requeue=False)

通过Delivery_Tag拒绝邮件。

拒绝频道已接收但尚未确认的消息。消息由其Delivery_Tag引用。

如果RESEUE为FALSE,则被拒绝的消息将被代理丢弃,并且不会传递给任何其他使用者。如果Requeue为True,则被拒绝的邮件将被重新排队以传递给另一个使用者,可能传递给先前拒绝该邮件的同一使用者。

参数:

delivery_tag (uuid.UUID) -- 与要拒绝的邮件关联的传递标记。

关键字参数:

requeue -- 如果为False,则被拒绝的消息将由代理丢弃,并且不会传递给任何其他消费者。如果为True,则拒绝的邮件将被重新排队以传递给另一个使用者,可能传递给以前拒绝该邮件的同一使用者。

body_encoding = 'base64'

默认正文编码。注: transport_options['body_encoding'] 将覆盖此值。

close()

取消所有关联的消息并关闭频道。

这将取消所有使用者,方法是调用 basic_cancel() 对于每个已知的Consumer_Tag。它还会关闭self._Broker会话。隐式关闭会话会导致代理将所有未完成、未确认的消息视为未传递。

codecs = {'base64': <kombu.transport.virtual.base.Base64 object>}

二进制<->ASCII编解码器。

decode_body(body, encoding=None)

使用可选指定的编码对正文进行解码。

编码可以通过名称指定,并在self.codecs中查找。Self.codecs使用字符串作为其键,该键指定编码的名称,然后该值是一个实例化的对象,它可以通过encode和decode方法提供该类型的编码/解码。

参数:

body (str) -- 要编码的正文。

关键字参数:

encoding -- 要使用的编码类型。必须是self.codes中列出的受支持的编解码器。

返回:

如果指定了编码,则返回解码后的正文。如果未指定编码,则原封不动地返回正文。

返回类型:

str

encode_body(body, encoding=None)

使用可选指定的编码对正文进行编码。

编码可以通过名称指定,并在self.codecs中查找。Self.codecs使用字符串作为其键,该键指定编码的名称,然后该值是一个实例化的对象,它可以通过encode和decode方法提供该类型的编码/解码。

参数:

body (str) -- 要编码的正文。

关键字参数:

encoding -- 要使用的编码类型。必须是self.codes中列出的受支持的编解码器。

返回:

如果指定了编码,则返回一个元组,第一个位置为编码体,第二个位置为编码使用的位置。如果未指定编码,则原封不动地传递正文。

返回类型:

tuple

exchange_declare(exchange='', type='direct', durable=False, **kwargs)

创建一个新的交易所。

创建特定类型的交换,并可选择使该交换持久。如果请求的名称的交换已经存在,则不会采取任何操作,也不会引发任何异常。持久交易所将在经纪商重启后存活下来,而非持久交易所则不会。

交易所根据其类型提供行为。预期的行为是在AMQP 0-10和之前的规范中定义的行为,包括‘直接’、‘主题’和‘扇出’功能。

关键字参数:
  • type -- 交换类型。有效值包括‘DIRECT’、‘TOPIC’和‘FANOUT’。

  • exchange -- 要创建的交换的名称。如果未指定Exchange,则将使用空字符串作为名称。

  • durable -- 如果交换应该是持久的,则为True,否则为False。

exchange_delete(exchange_name, **kwargs)

删除按名称指定的交换。

参数:

exchange_name (str) -- 要删除的交换的名称。

prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)

准备要发送的消息数据。

此消息通常由 kombu.messaging.Producer._publish() 作为消息发布中的准备步骤。

参数:

body (str) -- 消息的正文

关键字参数:
  • priority -- 0到9之间的数字,用于设置消息的优先级。

  • content_type -- 消息正文应被视为的Content_Type。如果未设置此项,则 qpid.messaging.endpoints.Sender 对象尝试从正文中自动检测Content_type。

  • content_encoding -- 消息正文的Content_Coding编码为。

  • headers -- 应设置的其他邮件标头。作为键-值对传入。

  • properties -- 要在消息上设置的消息属性。

返回:

返回封装消息属性的Dict对象。有关可以设置的属性的详细信息,请参见参数。

返回类型:

dict

property qos

QoS 此渠道的经理。

延迟实例化类型为 QoS 在访问self.qos属性时。

返回:

已存在的或新创建的Qos对象

返回类型:

QoS

queue_bind(queue, exchange, routing_key, **kwargs)

使用绑定密钥将队列绑定到交换。

使用特定的绑定密钥将按名称指定的队列绑定到按名称指定的交换。要成功完成绑定,代理上必须已存在队列和交换。队列可以使用不同的密钥多次绑定到交换。

参数:
  • queue (str) -- 要绑定的队列的名称。

  • exchange (str) -- 队列应绑定到的交换的名称。

  • routing_key (str) -- 指定队列应绑定到指定交换的绑定键。

queue_declare(queue, passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None)

创建由名称指定的新队列。

如果该队列已经存在,则不会对该队列进行任何更改,并且返回值将返回有关现有队列的信息。

队列名称是必需的,并指定为第一个参数。

如果PASSIVE为True,服务器将不创建队列。客户端可以使用它来检查队列是否存在,而无需修改服务器状态。默认值为FALSE。

如果耐久为True,则队列将是耐久的。服务器重新启动时,持久队列保持活动状态。如果/当服务器重新启动时,非持久队列(临时队列)将被清除。请注意,持久队列不一定保存持久消息,尽管将持久消息发送到临时队列没有意义。默认值为FALSE。

如果EXCLUSIVE为True,则队列将是独占的。独占队列只能由当前连接使用。设置‘EXCLUSIVE’标志总是意味着‘自动删除’。默认值为FALSE。

如果AUTO_DELETE为True,则在所有使用者使用完队列后将其删除。最后一个消费者可以明确取消,也可以因为其渠道关闭而取消。如果队列中没有消费者,则不会将其删除。默认为True。

未使用NoWait参数。它是0-9-1协议的一部分,但此AMQP客户端实现了0-10,删除了nowait选项。

Arguments参数是用于声明队列的一组参数。参数作为字典传递或不传递。如果被动为True,则忽略此字段。默认设置为无。

此方法返回一个 namedtuple 其名称为‘QUEUE_DECLARE_OK_T’,队列名称为‘QUEUE’,队列上的消息计数为‘MESSAGE_COUNT’,活动使用者数量为‘Consumer_COUNT’。命名的元组值分别按Queue、Message_Count和Consumer_Count排序。

由于芹菜对事件的非ACK,因此在以字符串‘celeryev’开头或以字符串‘pidbox’结尾的任何队列上都设置了环策略。这些是西芹事件队列,西芹不会对它们进行确认,从而导致消息堆积。最终,除非设置了“环”策略,否则QPID将停止为消息提供服务,此时支持队列的缓冲区将变为循环。

参数:
  • queue (str) -- 要创建的队列的名称。

  • passive (bool) -- 如果为True,则服务器不会创建队列。

  • durable (bool) -- 如果为True,则队列将是持久的。

  • exclusive (bool) -- 如果为True,则队列将是独占的。

  • auto_delete (bool) -- 如果为True,则在所有使用者使用完队列后将其删除。

  • nowait (bool) -- 由于0-10规范不包括该参数,因此不使用该参数。

  • arguments (dict or None) -- 用于声明队列的一组参数。

返回:

将声明的队列表示为命名元组的命名元组。元组值按队列、消息计数和活动使用者计数排序。

返回类型:

namedtuple

queue_delete(queue, if_unused=False, if_empty=False, **kwargs)

按名称删除队列。

删除按名称指定的队列。使用IF_UNUSED关键字参数,只有在绑定了0个使用者时,才能执行删除操作。使用IF_EMPTY关键字参数,仅当队列中有0条消息时才能进行删除。

参数:

queue (str) -- 要删除的队列的名称。

关键字参数:
  • if_unused -- 如果为True,则仅当队列有0个使用者时才删除。如果为False,则删除队列,即使绑定了使用者也是如此。

  • if_empty -- 如果为True,则仅在队列为空时删除该队列。如果为False,则在队列为空或不为空时将其删除。

queue_purge(queue, **kwargs)

从队列中删除所有未送达的邮件。

从按名称指定的队列中清除所有未送达的邮件。如果队列不存在,则会引发异常。首先检查队列消息深度,然后要求代理清除该数量的消息。返回请求清除的消息的整数个。实际清除的邮件数可能与请求的要清除的邮件数不同。

有时会要求清除已传递的邮件,但不会。这种情况以静默方式失败,这是已传递给另一个使用者的消息的正确行为,该使用者尚未确认该消息,并且仍与代理有活动会话。在这种情况下,清除消息是不安全的,并将由代理保留。客户端无法更改此传递行为。

在内部,此方法依赖于 _purge()

参数:

queue (str) -- 应删除所有消息的队列的名称。

返回:

请求清除的邮件数。

返回类型:

int

加薪:

qpid.messaging.exceptions.NotFound 如果找不到要清除的队列。

queue_unbind(queue, exchange, routing_key, **kwargs)

解除队列与具有给定绑定密钥的交换的绑定。

从已使用绑定键绑定的、由名称指定的交换取消绑定由名称指定的队列。队列和交换必须已存在于代理上,并与绑定密钥绑定,才能成功完成操作。队列可以使用不同的密钥多次绑定到交换,因此绑定密钥是显式解绑的必填字段。

参数:
  • queue (str) -- 要解除绑定的队列的名称。

  • exchange (str) -- 队列应从其解除绑定的交换的名称。

  • routing_key (str) -- 应解除绑定的指定队列和指定交换之间的现有绑定密钥。

typeof(exchange, default='direct')

获取交换类型。

查找并返回按名称指定的交换的交换类型。交换类型应为“DIRECT”、“TOPIC”和“FAND OUT”,它们与AMQP 0-10及更早版本中指定的交换功能相对应。如果找不到交换,则返回默认的交换类型。

参数:

exchange (str) -- 要查找其类型的交换。

关键字参数:

default -- 当交换不存在时要采用的交换类型。

返回:

交换类型为‘DIRECT’、‘TOPIC’或‘FAND’。

返回类型:

str

close()[源代码]

关闭连接。

关闭连接将关闭该连接使用的所有关联会话、发送方或接收方。

close_channel(channel)[源代码]

关闭一个频道。

方法的引用指定的通道 Channel 对象。

参数:

channel (Channel.) -- 应该关闭的频道。

get_qpid_connection()[源代码]

返回已有的连接(单例)。

返回:

现有的qpid.Messaging.Connection

返回类型:

qpid.messaging.endpoints.Connection

渠道

class kombu.transport.qpid.Channel(connection, transport)[源代码]

支持代理配置和消息发送和接收。

参数:

频道对象被设计为与AMQP 0-10及更早版本中定义的频道具有方法奇偶性,这允许以下代理操作:

  • Exchange声明和删除

  • 队列声明和删除

  • 对绑定和解除绑定操作进行排队

  • 队列长度和清除操作

  • 发送/接收/拒绝消息

  • 对消息进行结构化、编码和解码

  • 支持同步和异步读取

  • 正在读取有关交换、队列和绑定的状态

通道设计为所有通道都与代理共享单个TCP连接,但在受益于共享的TCP连接的同时,提供与代理的隔离通信级别。频道被赋予了它的 Connection 对象设置为 Transport 这实例化了通道。

此频道继承自 StdChannel ,这使得这是一个“本地”频道,而不是一个“虚拟”频道,它将从 kombu.transports.virtual

使用此通道发送的消息将被分配一个Delivery_Tag。在消息准备发送时为消息生成Delivery_Tag basic_publish() 。对于每个频道实例,Delivery_Tag是唯一的。Delivery_Tag在其他对象中没有有意义的上下文,仅在此对象的内存中维护,并且基础 QoS 提供支持的对象。

每个频道对象恰好实例化一个 QoS 用于预取限制和异步打包的对象。这个 QoS 对象通过属性方法延迟实例化 qos() 。这个 QoS 对象是不应直接访问的支持对象,只能由频道本身访问。

对队列的同步读取是使用调用 basic_get() 它使用 _get() 来执行读数。这些方法立即读取,不接受任何形式的超时。 basic_get() 同步读取并在返回消息之前确认消息。在所有情况下都会执行ACK,因为使用qpid.Messaging读取消息但不确认它们的应用程序将会遇到内存泄漏。的no_ack参数 basic_get() 不会影响Acking功能。

通过使用启动使用者来完成对队列的异步读取 basic_consume() 。每次调用 basic_consume() 会引起一场 Receiver 要在上创建 Session 由 :class: Transport. The receiver will asynchronously read using qpid.messaging, and prefetch messages before the call to Transport.basic_drain() 发生。的预取计数值 QoS 对象是新接收器的容量值。新的接收器容量必须始终至少为1,否则看起来没有一个接收器可以读取,并且永远不会被读取。

每次调用 basic_consume() 创建一个使用者,该使用者被赋予一个使用者标记,该标记由 basic_consume() 。已启动的消费者可以通过其Consumer_tag使用以下命令取消使用 basic_cancel() 。消费者的取消会导致 Receiver 要关闭的对象。

通过支持异步消息打包 basic_ack() ,并由Delivery_Tag引用。Channel对象使用其 QoS 对象来执行消息打包。

class Message(payload, channel=None, **kwargs)

使用的消息类。

accept
body
channel
content_encoding
content_type
delivery_info
delivery_tag
headers
properties
serializable()
class QoS(session, prefetch_count=1)

将使用qos属性实例化的类引用。

ack(delivery_tag)

通过Delivery_Tag确认消息。

一旦消息被处理并且可以被代理忘记,就以异步方式调用。

参数:

delivery_tag (uuid.UUID) -- 与要确认的邮件关联的传递标记。

append(message, delivery_tag)

将消息追加到未确认消息列表中。

添加一条消息,由Delivery_Tag引用,用于稍后删除、拒绝或获取。消息通过Delivery_Tag保存到DICT中。

参数:
  • message (qpid.messaging.Message) -- 尚未确认的已接收消息。

  • delivery_tag (uuid.UUID) -- 收到此消息时引用此消息的UUID。

can_consume()

如果是,则返回True Channel 可以使用更多的消息。

用于确保客户端遵守当前活动的预取限制。

返回:

如果此Qos对象可以接受更多消息而不违反prefetch_count,则为True。如果PREFETCH_COUNT为0,则CAN_Consumer将始终返回True。

返回类型:

bool

can_consume_max_estimate()

返回剩余消息容量。

返回一个估计的未处理消息数。 kombu.transport.qpid.Channel 可以接受而不超过 prefetch_count 。如果 prefetch_count 为0,则此方法返回1。

返回:

在不违反PREFETCH_COUNT的情况下可以提取的估计消息数。

返回类型:

int

get(delivery_tag)

通过Delivery_Tag获取未确认的消息。

如果使用无效的Delivery_Tag调用 KeyError 都被养大了。

参数:

delivery_tag (uuid.UUID) -- 与要返回的邮件关联的传递标记。

返回:

由Delivery_Tag查找的未被确认的邮件。

返回类型:

qpid.messaging.Message

reject(delivery_tag, requeue=False)

通过Delivery_Tag拒绝邮件。

显式通知代理与此Qos对象相关联的通道正在拒绝先前传递的消息。

如果RESEUE为FALSE,则消息不会重新排队以传递给另一个使用者。如果Requeue为True,则消息将重新排队以传递给另一个使用者。

参数:

delivery_tag (uuid.UUID) -- 与要拒绝的邮件关联的传递标记。

关键字参数:

requeue -- 如果为True,则将通知代理重新排队该消息。如果为假,代理将被告知完全丢弃该消息。在这两种情况下,消息都将从此对象中删除。

basic_ack(delivery_tag, multiple=False)[源代码]

通过Delivery_Tag确认消息。

确认Delivery_Tag引用的邮件。消息只能使用以下命令确认 basic_ack() 如果它们是通过以下方式获得的 basic_consume() 。这是异步读取行为的ACK部分。

在内部,此方法使用 QoS 对象,该对象存储消息并负责打包。

参数:
  • delivery_tag (uuid.UUID) -- 与要确认的邮件关联的传递标记。

  • multiple (bool) -- 未实施。如果设置为True,则引发AssertionError。

basic_cancel(consumer_tag)[源代码]

按消费者标签取消消费者。

请求消费者停止从其队列中读取消息。消费者是一个 Receiver ,并使用以下命令关闭 close()

此方法还清除消费者的所有挥之不去的引用。

参数:

consumer_tag (an immutable object) -- 指要注销的消费者的标签。最初在将使用者创建为参数时指定 basic_consume()

basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)[源代码]

启动从队列读取的异步使用者。

此方法启动类型为 Receiver 使用 Session 由创建并引用 Transport 从由名称指定的队列中读取消息,直到通过调用 basic_cancel()

消息稍后可通过同步调用 Transport.drain_events() ,它将从由此方法启动的使用者中排出。 Transport.drain_events() 是同步的,但通过网络接收消息是异步发生的,因此它应该仍然执行得很好。 Transport.drain_events() 调用此处随self.Message类型的消息一起提供的回调。

每个消费者都由一个Consumer_tag引用,该标签由该方法的调用方提供。

此方法在以队列名称为关键字的dict中设置对self.Connection对象的回调。 drain_events() 负责在收到消息时调用该回调。

的调用方处理完消息后,会将收到的所有消息都添加到要保存的Qos对象中,以备以后使用 drain_events() 。消息可以在通过调用 basic_ack()

如果no_ack为True,则no_ack标志指示消息的接收者不会调用 basic_ack() 后来。由于该消息稍后不会被确认,因此它将立即被确认。

basic_consume() 在调用回调之前转换消息对象类型。最初,该消息以 qpid.messaging.Message 。此方法解包 qpid.messaging.Message 并创建一个self.Message类型的新对象。

此方法将用户传递的回调包装在运行时构建的函数中,该函数提供来自 qpid.messaging.MessageMessage ,并将消息添加到关联的 QoS 对象用于异步定位(如有必要)。

参数:
  • queue (str) -- 要从中消费消息的队列的名称

  • no_ack (bool) -- 如果为True,则消息将不会被保存以供稍后访问,但将立即被访问。如果为False,则将保存消息以供稍后通过调用 basic_ack()

  • callback (a callable object) -- 当消息到达队列时将调用的可调用对象。

  • consumer_tag (an immutable object) -- 引用所创建的使用者的标记。需要使用此Consumer_Tag来取消消费者。

basic_get(queue, no_ack=False, **kwargs)[源代码]

非阻塞单个消息按名称从队列获取和确认。

在内部,此方法使用 _get() 来获取消息。如果一个 Empty 异常由以下人员引发 _get() ,则此方法会将其静默并返回None。如果 _get() 不返回消息,则该消息被确认。No_ack参数对ACK行为没有影响,所有消息在所有情况下都被ACK。此方法从不将获取的消息添加到用于异步ACK的内部Qos对象。

此方法在方法传递时转换方法的对象类型。从经纪人那里拿到的, _get() 返回一个 qpid.messaging.Message ,但此方法获取 qpid.messaging.Message 并实例化一个 Message 使用基于self.Message的类设置的有效负载创建。

参数:

queue (str) -- 从中提取消息的队列名称。

关键字参数:

no_ack -- No_ack参数对此方法的ACK行为没有影响。未确认的消息会在qpid.Messaging中造成内存泄漏,并且在任何情况下都需要被确认。

返回:

收到的消息。

返回类型:

Message

basic_publish(message, exchange, routing_key, **kwargs)[源代码]

使用路由键将消息发布到Exchange上。

使用ROUTING_KEY指定的路由关键字将消息发布到由名称指定的交换上。在发送邮件之前,通过以下方式准备邮件:

  • 对正文进行编码,使用 encode_body()

  • 将正文包装为缓冲区对象,以便

    qpid.messaging.endpoints.Sender 使用可以支持任意大消息的内容类型。

  • 将Delivery_Tag设置为随机uuid.uuid

  • 将Exchange和Routing_Key信息设置为Delivery_Info

内部使用 _put() 以同步发送消息。此消息通常由 kombu.messaging.Producer._publish 作为消息发布的最后一步。

参数:
  • message (dict) -- 包含关键字值与消息数据配对的字典。方法生成有效的消息字典。 prepare_message() 方法。

  • exchange (str) -- 要向其提交此消息的交换的名称。

  • routing_key (str) -- 要用作消息的路由密钥被提交到Exchange上。

basic_qos(prefetch_count, *args)[源代码]

变化 QoS 此频道的设置。

设置此通道可以获取和保留的未确认消息的数量。PREFETCH_VALUE还用作任何新 Receiver 物体。

目前,该值被硬编码为1。

参数:

prefetch_count (int) -- 没有用过。此方法被硬编码为1。

basic_reject(delivery_tag, requeue=False)[源代码]

通过Delivery_Tag拒绝邮件。

拒绝频道已接收但尚未确认的消息。消息由其Delivery_Tag引用。

如果RESEUE为FALSE,则被拒绝的消息将被代理丢弃,并且不会传递给任何其他使用者。如果Requeue为True,则被拒绝的邮件将被重新排队以传递给另一个使用者,可能传递给先前拒绝该邮件的同一使用者。

参数:

delivery_tag (uuid.UUID) -- 与要拒绝的邮件关联的传递标记。

关键字参数:

requeue -- 如果为False,则被拒绝的消息将由代理丢弃,并且不会传递给任何其他消费者。如果为True,则拒绝的邮件将被重新排队以传递给另一个使用者,可能传递给以前拒绝该邮件的同一使用者。

body_encoding = 'base64'

默认正文编码。注: transport_options['body_encoding'] 将覆盖此值。

close()[源代码]

取消所有关联的消息并关闭频道。

这将取消所有使用者,方法是调用 basic_cancel() 对于每个已知的Consumer_Tag。它还会关闭self._Broker会话。隐式关闭会话会导致代理将所有未完成、未确认的消息视为未传递。

codecs = {'base64': <kombu.transport.virtual.base.Base64 object>}

二进制<->ASCII编解码器。

decode_body(body, encoding=None)[源代码]

使用可选指定的编码对正文进行解码。

编码可以通过名称指定,并在self.codecs中查找。Self.codecs使用字符串作为其键,该键指定编码的名称,然后该值是一个实例化的对象,它可以通过encode和decode方法提供该类型的编码/解码。

参数:

body (str) -- 要编码的正文。

关键字参数:

encoding -- 要使用的编码类型。必须是self.codes中列出的受支持的编解码器。

返回:

如果指定了编码,则返回解码后的正文。如果未指定编码,则原封不动地返回正文。

返回类型:

str

encode_body(body, encoding=None)[源代码]

使用可选指定的编码对正文进行编码。

编码可以通过名称指定,并在self.codecs中查找。Self.codecs使用字符串作为其键,该键指定编码的名称,然后该值是一个实例化的对象,它可以通过encode和decode方法提供该类型的编码/解码。

参数:

body (str) -- 要编码的正文。

关键字参数:

encoding -- 要使用的编码类型。必须是self.codes中列出的受支持的编解码器。

返回:

如果指定了编码,则返回一个元组,第一个位置为编码体,第二个位置为编码使用的位置。如果未指定编码,则原封不动地传递正文。

返回类型:

tuple

exchange_declare(exchange='', type='direct', durable=False, **kwargs)[源代码]

创建一个新的交易所。

创建特定类型的交换,并可选择使该交换持久。如果请求的名称的交换已经存在,则不会采取任何操作,也不会引发任何异常。持久交易所将在经纪商重启后存活下来,而非持久交易所则不会。

交易所根据其类型提供行为。预期的行为是在AMQP 0-10和之前的规范中定义的行为,包括‘直接’、‘主题’和‘扇出’功能。

关键字参数:
  • type -- 交换类型。有效值包括‘DIRECT’、‘TOPIC’和‘FANOUT’。

  • exchange -- 要创建的交换的名称。如果未指定Exchange,则将使用空字符串作为名称。

  • durable -- 如果交换应该是持久的,则为True,否则为False。

exchange_delete(exchange_name, **kwargs)[源代码]

删除按名称指定的交换。

参数:

exchange_name (str) -- 要删除的交换的名称。

prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)[源代码]

准备要发送的消息数据。

此消息通常由 kombu.messaging.Producer._publish() 作为消息发布中的准备步骤。

参数:

body (str) -- 消息的正文

关键字参数:
  • priority -- 0到9之间的数字,用于设置消息的优先级。

  • content_type -- 消息正文应被视为的Content_Type。如果未设置此项,则 qpid.messaging.endpoints.Sender 对象尝试从正文中自动检测Content_type。

  • content_encoding -- 消息正文的Content_Coding编码为。

  • headers -- 应设置的其他邮件标头。作为键-值对传入。

  • properties -- 要在消息上设置的消息属性。

返回:

返回封装消息属性的Dict对象。有关可以设置的属性的详细信息,请参见参数。

返回类型:

dict

property qos

QoS 此渠道的经理。

延迟实例化类型为 QoS 在访问self.qos属性时。

返回:

已存在的或新创建的Qos对象

返回类型:

QoS

queue_bind(queue, exchange, routing_key, **kwargs)[源代码]

使用绑定密钥将队列绑定到交换。

使用特定的绑定密钥将按名称指定的队列绑定到按名称指定的交换。要成功完成绑定,代理上必须已存在队列和交换。队列可以使用不同的密钥多次绑定到交换。

参数:
  • queue (str) -- 要绑定的队列的名称。

  • exchange (str) -- 队列应绑定到的交换的名称。

  • routing_key (str) -- 指定队列应绑定到指定交换的绑定键。

queue_declare(queue, passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None)[源代码]

创建由名称指定的新队列。

如果该队列已经存在,则不会对该队列进行任何更改,并且返回值将返回有关现有队列的信息。

队列名称是必需的,并指定为第一个参数。

如果PASSIVE为True,服务器将不创建队列。客户端可以使用它来检查队列是否存在,而无需修改服务器状态。默认值为FALSE。

如果耐久为True,则队列将是耐久的。服务器重新启动时,持久队列保持活动状态。如果/当服务器重新启动时,非持久队列(临时队列)将被清除。请注意,持久队列不一定保存持久消息,尽管将持久消息发送到临时队列没有意义。默认值为FALSE。

如果EXCLUSIVE为True,则队列将是独占的。独占队列只能由当前连接使用。设置‘EXCLUSIVE’标志总是意味着‘自动删除’。默认值为FALSE。

如果AUTO_DELETE为True,则在所有使用者使用完队列后将其删除。最后一个消费者可以明确取消,也可以因为其渠道关闭而取消。如果队列中没有消费者,则不会将其删除。默认为True。

未使用NoWait参数。它是0-9-1协议的一部分,但此AMQP客户端实现了0-10,删除了nowait选项。

Arguments参数是用于声明队列的一组参数。参数作为字典传递或不传递。如果被动为True,则忽略此字段。默认设置为无。

此方法返回一个 namedtuple 其名称为‘QUEUE_DECLARE_OK_T’,队列名称为‘QUEUE’,队列上的消息计数为‘MESSAGE_COUNT’,活动使用者数量为‘Consumer_COUNT’。命名的元组值分别按Queue、Message_Count和Consumer_Count排序。

由于芹菜对事件的非ACK,因此在以字符串‘celeryev’开头或以字符串‘pidbox’结尾的任何队列上都设置了环策略。这些是西芹事件队列,西芹不会对它们进行确认,从而导致消息堆积。最终,除非设置了“环”策略,否则QPID将停止为消息提供服务,此时支持队列的缓冲区将变为循环。

参数:
  • queue (str) -- 要创建的队列的名称。

  • passive (bool) -- 如果为True,则服务器不会创建队列。

  • durable (bool) -- 如果为True,则队列将是持久的。

  • exclusive (bool) -- 如果为True,则队列将是独占的。

  • auto_delete (bool) -- 如果为True,则在所有使用者使用完队列后将其删除。

  • nowait (bool) -- 由于0-10规范不包括该参数,因此不使用该参数。

  • arguments (dict or None) -- 用于声明队列的一组参数。

返回:

将声明的队列表示为命名元组的命名元组。元组值按队列、消息计数和活动使用者计数排序。

返回类型:

namedtuple

queue_delete(queue, if_unused=False, if_empty=False, **kwargs)[源代码]

按名称删除队列。

删除按名称指定的队列。使用IF_UNUSED关键字参数,只有在绑定了0个使用者时,才能执行删除操作。使用IF_EMPTY关键字参数,仅当队列中有0条消息时才能进行删除。

参数:

queue (str) -- 要删除的队列的名称。

关键字参数:
  • if_unused -- 如果为True,则仅当队列有0个使用者时才删除。如果为False,则删除队列,即使绑定了使用者也是如此。

  • if_empty -- 如果为True,则仅在队列为空时删除该队列。如果为False,则在队列为空或不为空时将其删除。

queue_purge(queue, **kwargs)[源代码]

从队列中删除所有未送达的邮件。

从按名称指定的队列中清除所有未送达的邮件。如果队列不存在,则会引发异常。首先检查队列消息深度,然后要求代理清除该数量的消息。返回请求清除的消息的整数个。实际清除的邮件数可能与请求的要清除的邮件数不同。

有时会要求清除已传递的邮件,但不会。这种情况以静默方式失败,这是已传递给另一个使用者的消息的正确行为,该使用者尚未确认该消息,并且仍与代理有活动会话。在这种情况下,清除消息是不安全的,并将由代理保留。客户端无法更改此传递行为。

在内部,此方法依赖于 _purge()

参数:

queue (str) -- 应删除所有消息的队列的名称。

返回:

请求清除的邮件数。

返回类型:

int

加薪:

qpid.messaging.exceptions.NotFound 如果找不到要清除的队列。

queue_unbind(queue, exchange, routing_key, **kwargs)[源代码]

解除队列与具有给定绑定密钥的交换的绑定。

从已使用绑定键绑定的、由名称指定的交换取消绑定由名称指定的队列。队列和交换必须已存在于代理上,并与绑定密钥绑定,才能成功完成操作。队列可以使用不同的密钥多次绑定到交换,因此绑定密钥是显式解绑的必填字段。

参数:
  • queue (str) -- 要解除绑定的队列的名称。

  • exchange (str) -- 队列应从其解除绑定的交换的名称。

  • routing_key (str) -- 应解除绑定的指定队列和指定交换之间的现有绑定密钥。

typeof(exchange, default='direct')[源代码]

获取交换类型。

查找并返回按名称指定的交换的交换类型。交换类型应为“DIRECT”、“TOPIC”和“FAND OUT”,它们与AMQP 0-10及更早版本中指定的交换功能相对应。如果找不到交换,则返回默认的交换类型。

参数:

exchange (str) -- 要查找其类型的交换。

关键字参数:

default -- 当交换不存在时要采用的交换类型。

返回:

交换类型为‘DIRECT’、‘TOPIC’或‘FAND’。

返回类型:

str

消息

class kombu.transport.qpid.Message(payload, channel=None, **kwargs)[源代码]

消息对象。

accept
body
channel
content_encoding
content_type
delivery_info
delivery_tag
headers
properties
serializable()[源代码]