This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
ApacheQPID传输- kombu.transport.qpid
¶
用于Kombu的QPID传输模块。
Qpid 交通工具使用 qpid-python 作为客户和 qpid-tools 用于经纪人管理。
使用此传输时,您必须安装必要的依赖项。这些依赖项可通过PYPI获得,并可使用pip命令安装:
$ pip install kombu[qpid]
或者手动安装要求:
$ pip install qpid-tools qpid-python
Python3和PyPy限制
由于底层依赖项不兼容,QPID传输不支持Python3或PyPy环境。此版本经过测试,可与Python2.7配合使用。
功能¶
类型:原生
支持Direct:是
支持主题:是
支持扇出:支持
支持优先级:是
支持TTL:支持
身份验证¶
此传输支持使用QPID代理进行SASL身份验证。通常,SASL机制是根据可能的机制的客户端列表和服务器列表协商的,但在实践中,不同的SASL客户端库提供了不同的行为。这些不同的行为导致预期的SASL机制在许多情况下不被选择。因此,此传输根据下表限制了基于Kombu配置的机制类型。
Broker String |
SASL Mechanism |
Qid://主机名/ |
ANONYMOUS |
Qid://用户名:Password@Hostname/ |
PLAIN |
请参阅下面的说明 |
EXTERNAL |
用户可以覆盖上述SASL选择行为,并使用 login_method
参数设置为 Connection
对象。该字符串可以是单个SASL机制或以空格分隔的SASL机制列表。如果您将芹菜与Kombu一起使用,则可以通过设置 BROKER_LOGIN_METHOD 可选的芹菜。
备注
在使用SSL时,QPID用户可能希望覆盖SASL机制以使用 EXTERNAL 。在这种情况下,QPID需要提供与 CN SSL客户端证书的。确保代理字符串包含相应的用户名。例如,如果客户端证书具有 CN=asdf 并且客户端连接到 example.com 在端口5671上,代理字符串应为:
qpid://asdf@example.com:5671/
交通选择¶
这个 transport_options
参数设置为 Connection
对象被直接传递给 qpid.messaging.endpoints.Connection
作为关键字参数。这些选项覆盖并替换任何其他默认值或指定值。如果使用芹菜,这可以通过设置 BROKER_TRANSPORT_OPTIONS 可选的芹菜。
运输¶
- class kombu.transport.qpid.Transport(*args, **kwargs)[源代码]¶
QPID代理的Kombu本机传输。
为Kombu提供本机传输,允许消费者和生产者从代理读取消息或向代理写入消息。该传输能够同时支持同步和异步读取。所有写入都是通过
Channel
支持此传输的对象。异步读取是使用调用
drain_events()
,它同步读取异步获取的消息,然后通过调用Connection
对象。传输还提供了建立和关闭与代理的连接的方法。此传输建立了类似工厂的模式,允许使用单例模式将所有连接合并为单个连接。
交通工具可以创建
Channel
对象与代理进行通信时使用create_channel()
方法。传输根据Kombu 3.0接口识别可恢复的连接错误和可恢复的通道错误。这些异常以元组的形式列出,并存储在传输类属性中 recoverable_connection_errors 和 recoverable_channel_errors 分别进行了分析。引发的任何异常如果不是这些元组的成员,都被认为是不可恢复的。这使得Kombu对某些操作的自动重试的支持能够正常运行。
为了向后兼容Kombu 3.0之前的异常接口,还列出了以下可恢复的错误 connection_errors 和 channel_errors 。
- class Connection(**connection_options)¶
QPID连接。
将连接对象封装为
Transport
。- 参数:
host -- 连接应连接到的主机。
port -- 连接应连接到的端口。
username -- 连接应使用的用户名。可选的。
password -- 连接应使用的密码。可选,但需要用户名。
transport -- 连接应使用的传输类型。‘tcp’或‘ssl’应为值。
timeout -- 连接连接到代理时使用的超时。
sasl_mechanisms -- 要使用的SASL身份验证机制类型。有关有效值的说明,请参阅SASL文档。
备注
消息传递有一个AuthenticationFailure异常类型,但却引发了一条ConnectionError消息,指出在这些情况下发生了身份验证失败。ConnectionError被列为可恢复的错误类型,因此如果引发ConnectionError,Kombu将尝试重试。在不调整凭据的情况下重试该操作是不正确的,因此此方法专门检查是否存在指示发生身份验证失败的ConnectionError。在这些情况下,错误类型会发生变化,同时保留原始消息并引发错误,因此kombu将允许将异常视为不可恢复。
连接对象是由
Transport
在呼叫期间establish_connection()
。这个Transport
将连接选项作为关键字传入,这些关键字应用于创建的任何连接。每个Transport
只创建一个连接。Connection对象维护对
Connection
可以通过名为的绑定getter方法访问get_qpid_connection()
方法。每个通道使用每个通道的连接BrokerAgent
,并且传输为所有发送者和接收者维护会话。Connection对象还负责维护在收到消息时应调用的回调引用的字典。这些回调保存在_CALLBACKS中,并以与收到的消息相关联的队列名称为关键字。_回调在中设置
Channel.basic_consume()
,已删除Channel.basic_cancel()
,并叫来了Transport.drain_events()
。以下键应至少作为关键字参数传入:
所有关键字参数都收集到CONNECTION_OPTIONS字典中,并直接传递到
qpid.messaging.endpoints.Connection.establish()
。- class Channel(connection, transport)¶
支持代理配置和消息发送和接收。
- 参数:
connection (kombu.transport.qpid.Connection) -- 此Channel可以引用的Connection对象。目前仅用于访问回调。
transport (kombu.transport.qpid.Transport) -- 此频道关联的传输。
频道对象被设计为与AMQP 0-10及更早版本中定义的频道具有方法奇偶性,这允许以下代理操作:
Exchange声明和删除
队列声明和删除
对绑定和解除绑定操作进行排队
队列长度和清除操作
发送/接收/拒绝消息
对消息进行结构化、编码和解码
支持同步和异步读取
正在读取有关交换、队列和绑定的状态
通道设计为所有通道都与代理共享单个TCP连接,但在受益于共享的TCP连接的同时,提供与代理的隔离通信级别。频道被赋予了它的
Connection
对象设置为Transport
这实例化了通道。此频道继承自
StdChannel
,这使得这是一个“本地”频道,而不是一个“虚拟”频道,它将从kombu.transports.virtual
。使用此通道发送的消息将被分配一个Delivery_Tag。在消息准备发送时为消息生成Delivery_Tag
basic_publish()
。对于每个频道实例,Delivery_Tag是唯一的。Delivery_Tag在其他对象中没有有意义的上下文,仅在此对象的内存中维护,并且基础QoS
提供支持的对象。每个频道对象恰好实例化一个
QoS
用于预取限制和异步打包的对象。这个QoS
对象通过属性方法延迟实例化qos()
。这个QoS
对象是不应直接访问的支持对象,只能由频道本身访问。对队列的同步读取是使用调用
basic_get()
它使用_get()
来执行读数。这些方法立即读取,不接受任何形式的超时。basic_get()
同步读取并在返回消息之前确认消息。在所有情况下都会执行ACK,因为使用qpid.Messaging读取消息但不确认它们的应用程序将会遇到内存泄漏。的no_ack参数basic_get()
不会影响Acking功能。通过使用启动使用者来完成对队列的异步读取
basic_consume()
。每次调用basic_consume()
会引起一场Receiver
要在上创建Session
由 :class: Transport. The receiver will asynchronously read using qpid.messaging, and prefetch messages before the call toTransport.basic_drain()
发生。的预取计数值QoS
对象是新接收器的容量值。新的接收器容量必须始终至少为1,否则看起来没有一个接收器可以读取,并且永远不会被读取。每次调用
basic_consume()
创建一个使用者,该使用者被赋予一个使用者标记,该标记由basic_consume()
。已启动的消费者可以通过其Consumer_tag使用以下命令取消使用basic_cancel()
。消费者的取消会导致Receiver
要关闭的对象。通过支持异步消息打包
basic_ack()
,并由Delivery_Tag引用。Channel对象使用其QoS
对象来执行消息打包。- class Message(payload, channel=None, **kwargs)¶
消息对象。
- accept¶
- body¶
- channel¶
- content_encoding¶
- content_type¶
- delivery_info¶
- delivery_tag¶
- headers¶
- properties¶
- serializable()¶
- class QoS(session, prefetch_count=1)¶
用于消息预取和打包的帮助器对象。
- 关键字参数:
prefetch_count -- 初始预取计数,硬设置为1。
注意:PREFETCH_COUNT当前硬设置为1,需要改进
此对象是使用
Channel
举个例子。服务质量允许prefetch_count
设置为对应的未处理消息的数量Channel
应允许预取。设置prefetch_count
设置为0将禁用预取限制,并且对象可以容纳任意数量的消息。消息是使用添加的
append()
,它们将一直保持,直到通过调用ack()
。在收到ACK或关闭会话之前,代理不会将已接收但未确认的消息传递给另一个使用者。使用Delivery_Tag引用消息,每个消息都是唯一的Channel
。传递标记在此对象外部进行管理,并与消息一起传递到append()
。未确认的消息可以通过以下方式从服务质量中查找get()
并且可以使用以下命令拒绝和忘记reject()
。- ack(delivery_tag)¶
通过Delivery_Tag确认消息。
一旦消息被处理并且可以被代理忘记,就以异步方式调用。
- 参数:
delivery_tag (uuid.UUID) -- 与要确认的邮件关联的传递标记。
- append(message, delivery_tag)¶
将消息追加到未确认消息列表中。
添加一条消息,由Delivery_Tag引用,用于稍后删除、拒绝或获取。消息通过Delivery_Tag保存到DICT中。
- 参数:
message (qpid.messaging.Message) -- 尚未确认的已接收消息。
delivery_tag (uuid.UUID) -- 收到此消息时引用此消息的UUID。
- can_consume()¶
如果是,则返回True
Channel
可以使用更多的消息。用于确保客户端遵守当前活动的预取限制。
- 返回:
如果此Qos对象可以接受更多消息而不违反prefetch_count,则为True。如果PREFETCH_COUNT为0,则CAN_Consumer将始终返回True。
- 返回类型:
- can_consume_max_estimate()¶
返回剩余消息容量。
返回一个估计的未处理消息数。
kombu.transport.qpid.Channel
可以接受而不超过prefetch_count
。如果prefetch_count
为0,则此方法返回1。- 返回:
在不违反PREFETCH_COUNT的情况下可以提取的估计消息数。
- 返回类型:
- basic_ack(delivery_tag, multiple=False)¶
通过Delivery_Tag确认消息。
确认Delivery_Tag引用的邮件。消息只能使用以下命令确认
basic_ack()
如果它们是通过以下方式获得的basic_consume()
。这是异步读取行为的ACK部分。在内部,此方法使用
QoS
对象,该对象存储消息并负责打包。
- basic_cancel(consumer_tag)¶
按消费者标签取消消费者。
请求消费者停止从其队列中读取消息。消费者是一个
Receiver
,并使用以下命令关闭close()
。此方法还清除消费者的所有挥之不去的引用。
- 参数:
consumer_tag (an immutable object) -- 指要注销的消费者的标签。最初在将使用者创建为参数时指定
basic_consume()
。
- basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)¶
启动从队列读取的异步使用者。
此方法启动类型为
Receiver
使用Session
由创建并引用Transport
从由名称指定的队列中读取消息,直到通过调用basic_cancel()
。消息稍后可通过同步调用
Transport.drain_events()
,它将从由此方法启动的使用者中排出。Transport.drain_events()
是同步的,但通过网络接收消息是异步发生的,因此它应该仍然执行得很好。Transport.drain_events()
调用此处随self.Message类型的消息一起提供的回调。每个消费者都由一个Consumer_tag引用,该标签由该方法的调用方提供。
此方法在以队列名称为关键字的dict中设置对self.Connection对象的回调。
drain_events()
负责在收到消息时调用该回调。的调用方处理完消息后,会将收到的所有消息都添加到要保存的Qos对象中,以备以后使用
drain_events()
。消息可以在通过调用basic_ack()
。如果no_ack为True,则no_ack标志指示消息的接收者不会调用
basic_ack()
后来。由于该消息稍后不会被确认,因此它将立即被确认。basic_consume()
在调用回调之前转换消息对象类型。最初,该消息以qpid.messaging.Message
。此方法解包qpid.messaging.Message
并创建一个self.Message类型的新对象。此方法将用户传递的回调包装在运行时构建的函数中,该函数提供来自
qpid.messaging.Message
至Message
,并将消息添加到关联的QoS
对象用于异步定位(如有必要)。- 参数:
queue (str) -- 要从中消费消息的队列的名称
no_ack (bool) -- 如果为True,则消息将不会被保存以供稍后访问,但将立即被访问。如果为False,则将保存消息以供稍后通过调用
basic_ack()
。callback (a callable object) -- 当消息到达队列时将调用的可调用对象。
consumer_tag (an immutable object) -- 引用所创建的使用者的标记。需要使用此Consumer_Tag来取消消费者。
- basic_get(queue, no_ack=False, **kwargs)¶
非阻塞单个消息按名称从队列获取和确认。
在内部,此方法使用
_get()
来获取消息。如果一个Empty
异常由以下人员引发_get()
,则此方法会将其静默并返回None。如果_get()
不返回消息,则该消息被确认。No_ack参数对ACK行为没有影响,所有消息在所有情况下都被ACK。此方法从不将获取的消息添加到用于异步ACK的内部Qos对象。此方法在方法传递时转换方法的对象类型。从经纪人那里拿到的,
_get()
返回一个qpid.messaging.Message
,但此方法获取qpid.messaging.Message
并实例化一个Message
使用基于self.Message的类设置的有效负载创建。
- basic_publish(message, exchange, routing_key, **kwargs)¶
使用路由键将消息发布到Exchange上。
使用ROUTING_KEY指定的路由关键字将消息发布到由名称指定的交换上。在发送邮件之前,通过以下方式准备邮件:
对正文进行编码,使用
encode_body()
- 将正文包装为缓冲区对象,以便
qpid.messaging.endpoints.Sender
使用可以支持任意大消息的内容类型。
将Delivery_Tag设置为随机uuid.uuid
将Exchange和Routing_Key信息设置为Delivery_Info
内部使用
_put()
以同步发送消息。此消息通常由kombu.messaging.Producer._publish
作为消息发布的最后一步。- 参数:
message (dict) -- 包含关键字值与消息数据配对的字典。方法生成有效的消息字典。
prepare_message()
方法。exchange (str) -- 要向其提交此消息的交换的名称。
routing_key (str) -- 要用作消息的路由密钥被提交到Exchange上。
- basic_qos(prefetch_count, *args)¶
变化
QoS
此频道的设置。设置此通道可以获取和保留的未确认消息的数量。PREFETCH_VALUE还用作任何新
Receiver
物体。目前,该值被硬编码为1。
- 参数:
prefetch_count (int) -- 没有用过。此方法被硬编码为1。
- basic_reject(delivery_tag, requeue=False)¶
通过Delivery_Tag拒绝邮件。
拒绝频道已接收但尚未确认的消息。消息由其Delivery_Tag引用。
如果RESEUE为FALSE,则被拒绝的消息将被代理丢弃,并且不会传递给任何其他使用者。如果Requeue为True,则被拒绝的邮件将被重新排队以传递给另一个使用者,可能传递给先前拒绝该邮件的同一使用者。
- 参数:
delivery_tag (uuid.UUID) -- 与要拒绝的邮件关联的传递标记。
- 关键字参数:
requeue -- 如果为False,则被拒绝的消息将由代理丢弃,并且不会传递给任何其他消费者。如果为True,则拒绝的邮件将被重新排队以传递给另一个使用者,可能传递给以前拒绝该邮件的同一使用者。
- body_encoding = 'base64'¶
默认正文编码。注:
transport_options['body_encoding']
将覆盖此值。
- close()¶
取消所有关联的消息并关闭频道。
这将取消所有使用者,方法是调用
basic_cancel()
对于每个已知的Consumer_Tag。它还会关闭self._Broker会话。隐式关闭会话会导致代理将所有未完成、未确认的消息视为未传递。
- codecs = {'base64': <kombu.transport.virtual.base.Base64 object>}¶
二进制<->ASCII编解码器。
- decode_body(body, encoding=None)¶
使用可选指定的编码对正文进行解码。
编码可以通过名称指定,并在self.codecs中查找。Self.codecs使用字符串作为其键,该键指定编码的名称,然后该值是一个实例化的对象,它可以通过encode和decode方法提供该类型的编码/解码。
- encode_body(body, encoding=None)¶
使用可选指定的编码对正文进行编码。
编码可以通过名称指定,并在self.codecs中查找。Self.codecs使用字符串作为其键,该键指定编码的名称,然后该值是一个实例化的对象,它可以通过encode和decode方法提供该类型的编码/解码。
- exchange_declare(exchange='', type='direct', durable=False, **kwargs)¶
创建一个新的交易所。
创建特定类型的交换,并可选择使该交换持久。如果请求的名称的交换已经存在,则不会采取任何操作,也不会引发任何异常。持久交易所将在经纪商重启后存活下来,而非持久交易所则不会。
交易所根据其类型提供行为。预期的行为是在AMQP 0-10和之前的规范中定义的行为,包括‘直接’、‘主题’和‘扇出’功能。
- 关键字参数:
type -- 交换类型。有效值包括‘DIRECT’、‘TOPIC’和‘FANOUT’。
exchange -- 要创建的交换的名称。如果未指定Exchange,则将使用空字符串作为名称。
durable -- 如果交换应该是持久的,则为True,否则为False。
- prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)¶
准备要发送的消息数据。
此消息通常由
kombu.messaging.Producer._publish()
作为消息发布中的准备步骤。- 参数:
body (str) -- 消息的正文
- 关键字参数:
priority -- 0到9之间的数字,用于设置消息的优先级。
content_type -- 消息正文应被视为的Content_Type。如果未设置此项,则
qpid.messaging.endpoints.Sender
对象尝试从正文中自动检测Content_type。content_encoding -- 消息正文的Content_Coding编码为。
headers -- 应设置的其他邮件标头。作为键-值对传入。
properties -- 要在消息上设置的消息属性。
- 返回:
返回封装消息属性的Dict对象。有关可以设置的属性的详细信息,请参见参数。
- 返回类型:
- queue_bind(queue, exchange, routing_key, **kwargs)¶
使用绑定密钥将队列绑定到交换。
使用特定的绑定密钥将按名称指定的队列绑定到按名称指定的交换。要成功完成绑定,代理上必须已存在队列和交换。队列可以使用不同的密钥多次绑定到交换。
- queue_declare(queue, passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None)¶
创建由名称指定的新队列。
如果该队列已经存在,则不会对该队列进行任何更改,并且返回值将返回有关现有队列的信息。
队列名称是必需的,并指定为第一个参数。
如果PASSIVE为True,服务器将不创建队列。客户端可以使用它来检查队列是否存在,而无需修改服务器状态。默认值为FALSE。
如果耐久为True,则队列将是耐久的。服务器重新启动时,持久队列保持活动状态。如果/当服务器重新启动时,非持久队列(临时队列)将被清除。请注意,持久队列不一定保存持久消息,尽管将持久消息发送到临时队列没有意义。默认值为FALSE。
如果EXCLUSIVE为True,则队列将是独占的。独占队列只能由当前连接使用。设置‘EXCLUSIVE’标志总是意味着‘自动删除’。默认值为FALSE。
如果AUTO_DELETE为True,则在所有使用者使用完队列后将其删除。最后一个消费者可以明确取消,也可以因为其渠道关闭而取消。如果队列中没有消费者,则不会将其删除。默认为True。
未使用NoWait参数。它是0-9-1协议的一部分,但此AMQP客户端实现了0-10,删除了nowait选项。
Arguments参数是用于声明队列的一组参数。参数作为字典传递或不传递。如果被动为True,则忽略此字段。默认设置为无。
此方法返回一个
namedtuple
其名称为‘QUEUE_DECLARE_OK_T’,队列名称为‘QUEUE’,队列上的消息计数为‘MESSAGE_COUNT’,活动使用者数量为‘Consumer_COUNT’。命名的元组值分别按Queue、Message_Count和Consumer_Count排序。由于芹菜对事件的非ACK,因此在以字符串‘celeryev’开头或以字符串‘pidbox’结尾的任何队列上都设置了环策略。这些是西芹事件队列,西芹不会对它们进行确认,从而导致消息堆积。最终,除非设置了“环”策略,否则QPID将停止为消息提供服务,此时支持队列的缓冲区将变为循环。
- 参数:
- 返回:
将声明的队列表示为命名元组的命名元组。元组值按队列、消息计数和活动使用者计数排序。
- 返回类型:
namedtuple
- queue_delete(queue, if_unused=False, if_empty=False, **kwargs)¶
按名称删除队列。
删除按名称指定的队列。使用IF_UNUSED关键字参数,只有在绑定了0个使用者时,才能执行删除操作。使用IF_EMPTY关键字参数,仅当队列中有0条消息时才能进行删除。
- 参数:
queue (str) -- 要删除的队列的名称。
- 关键字参数:
if_unused -- 如果为True,则仅当队列有0个使用者时才删除。如果为False,则删除队列,即使绑定了使用者也是如此。
if_empty -- 如果为True,则仅在队列为空时删除该队列。如果为False,则在队列为空或不为空时将其删除。
- queue_purge(queue, **kwargs)¶
从队列中删除所有未送达的邮件。
从按名称指定的队列中清除所有未送达的邮件。如果队列不存在,则会引发异常。首先检查队列消息深度,然后要求代理清除该数量的消息。返回请求清除的消息的整数个。实际清除的邮件数可能与请求的要清除的邮件数不同。
有时会要求清除已传递的邮件,但不会。这种情况以静默方式失败,这是已传递给另一个使用者的消息的正确行为,该使用者尚未确认该消息,并且仍与代理有活动会话。在这种情况下,清除消息是不安全的,并将由代理保留。客户端无法更改此传递行为。
在内部,此方法依赖于
_purge()
。
- queue_unbind(queue, exchange, routing_key, **kwargs)¶
解除队列与具有给定绑定密钥的交换的绑定。
从已使用绑定键绑定的、由名称指定的交换取消绑定由名称指定的队列。队列和交换必须已存在于代理上,并与绑定密钥绑定,才能成功完成操作。队列可以使用不同的密钥多次绑定到交换,因此绑定密钥是显式解绑的必填字段。
- close()¶
关闭连接。
关闭连接将关闭该连接使用的所有关联会话、发送方或接收方。
- get_qpid_connection()¶
返回已有的连接(单例)。
- 返回:
现有的qpid.Messaging.Connection
- 返回类型:
qpid.messaging.endpoints.Connection
- channel_errors = (None,)¶
由于通道/方法故障而可能发生的错误元组。
- close_connection(connection)[源代码]¶
关闭
Connection
对象。- 参数:
connection (
kombu.transport.qpid.Connection
) -- 应该关闭的连接。
- connection_errors = (None, <class 'OSError'>)¶
由于连接故障而可能发生的错误的元组。
- create_channel(connection)[源代码]¶
创建并返回
Channel
。创建一个新频道,并将该频道追加到连接已知的频道列表中。一旦创建了新的频道,就会将其返回。
- 参数:
connection (kombu.transport.qpid.Connection) -- 应该支持新的
Channel
。- 返回:
制作的新频道。
- 返回类型:
- property default_connection_params¶
返回带有默认连接参数的DICT。
只要传输的创建者没有指定必需的参数,就会使用这些连接参数。
- 返回:
包含默认参数的字典。
- 返回类型:
- drain_events(connection, timeout=0, **kwargs)[源代码]¶
处理和调用所有就绪传输消息的回调。
从所有已准备好的事件中排出
Receiver
它们正在异步获取消息。对于每个排空的消息,该消息被调用到相应的回调。回调按队列名称进行组织。
- 参数:
connection (kombu.transport.qpid.Connection) -- 这个
Connection
它包含按队列名称索引的回调,此方法将调用这些回调。- 关键字参数:
timeout -- 限制此方法将运行的时间的超时。超时可能会中断正在等待新消息的阻塞读取,或者导致此方法在清空所有消息之前返回。默认为0。
- driver_name = 'qpid'¶
驱动程序库的名称(例如‘py-amqp’、‘redis’)。
- driver_type = 'qpid'¶
驱动程序的类型,可用于使用AMQP协议(DRIVER_TYPE:‘AMQP’)、Redis(DRIVER_TYPE:‘REDIS’)等来分隔传输...
- establish_connection()[源代码]¶
建立连接对象。
确定在创建此传输所需的任何连接时要使用的正确选项,并创建
Connection
对象,该对象在需要时为生成的连接保存这些值。这些选项混合了通过传输的创建者传入的内容和提供的默认值default_connection_params()
。选项包括代理网络设置、超时行为、身份验证和身份验证设置。此方法还创建并存储
Session
使用Connection
由此方法创建的。会话存储在Self上。- 返回:
被创造的
Connection
对象,则返回。- 返回类型:
- implements = {'asynchronous': True, 'exchange_type': frozenset({'direct', 'fanout', 'topic'}), 'heartbeats': False}¶
- on_readable(connection, loop)[源代码]¶
处理与此传输关联的任何邮件。
此方法通过发出对self.r文件描述符的Read调用来从外部监视的文件描述符中清除单个消息,该调用删除了由QPID会话消息回调处理程序放入管道中的单个‘0’字符。一旦读取了“0”,所有可用的事件都会通过调用
drain_events()
。文件描述符self.r被修改为非阻塞的,以确保在不再有消息时意外调用此方法不会导致无限阻塞。
预计不会有任何东西从
drain_events()
因为drain_events()
控件上维护的回调来处理消息Connection
对象。什么时候drain_events()
返回时,所有关联的消息都已被处理。此方法调用DRAIN_EVENTS(),它读取此传输可用的尽可能多的消息,然后返回。它阻止阅读和处理大量消息可能需要时间,但它不会阻止等待新消息到达。什么时候
drain_events()
称为超时,则未指定超时,这会导致此行为。值得注意的一个有趣行为是,当多个消息准备就绪时,该方法从self.r中删除了一个‘0’字符,但是
drain_events()
可以处理任意数量的消息。在这种情况下,可能会在self.r上留下额外的‘0’字符以供读取,其中与这些‘0’字符对应的消息已经被处理。外部EPOLL循环将错误地认为其他数据已准备好读取,并将不必要地调用ON_READABLE,对要读取的每个‘0’调用一次。其他呼叫至on_readable()
不会产生负面影响,并且最终会从self.r文件描述符中清除符号。如果新消息在此耗尽期间出现,也将得到适当处理。- 参数:
connection (kombu.transport.qpid.Connection) -- 与可读事件关联的连接,其中包含需要为可读对象调用的回调。
loop (kombu.asynchronous.Hub) -- 包含类似EPOLL功能的异步循环对象。
- polling_interval = None¶
- recoverable_channel_errors = (None,)¶
- recoverable_connection_errors = (None, <class 'OSError'>)¶
- register_with_event_loop(connection, loop)[源代码]¶
使用循环注册文件描述符和回调。
注册当外部EPOLL循环发现注册的文件描述符可以读取时调用的回调self.on_Readable。文件描述符由该传输创建,并在消息可用时写入。
因为Support_ev==True,所以Celery希望调用此方法来使Transport有机会注册一个读文件描述符,以便由Celery使用事件I/O通知机制(如EPOLL)进行外部监视。还注册了一个回调,一旦外部EPOLL循环准备好处理与准备为此传输处理的消息相关联的EPOLL事件,就将调用该回调。
在实例化传输之后,每个传输只进行一次注册调用。
- 参数:
connection (kombu.transport.qpid.Connection) -- 对与此传输关联的连接的引用。
loop (kombu.asynchronous.hub.Hub) -- 对外部循环的引用。
连接¶
- class kombu.transport.qpid.Connection(**connection_options)[源代码]¶
QPID连接。
将连接对象封装为
Transport
。- 参数:
host -- 连接应连接到的主机。
port -- 连接应连接到的端口。
username -- 连接应使用的用户名。可选的。
password -- 连接应使用的密码。可选,但需要用户名。
transport -- 连接应使用的传输类型。‘tcp’或‘ssl’应为值。
timeout -- 连接连接到代理时使用的超时。
sasl_mechanisms -- 要使用的SASL身份验证机制类型。有关有效值的说明,请参阅SASL文档。
备注
消息传递有一个AuthenticationFailure异常类型,但却引发了一条ConnectionError消息,指出在这些情况下发生了身份验证失败。ConnectionError被列为可恢复的错误类型,因此如果引发ConnectionError,Kombu将尝试重试。在不调整凭据的情况下重试该操作是不正确的,因此此方法专门检查是否存在指示发生身份验证失败的ConnectionError。在这些情况下,错误类型会发生变化,同时保留原始消息并引发错误,因此kombu将允许将异常视为不可恢复。
连接对象是由
Transport
在呼叫期间establish_connection()
。这个Transport
将连接选项作为关键字传入,这些关键字应用于创建的任何连接。每个Transport
只创建一个连接。Connection对象维护对
Connection
可以通过名为的绑定getter方法访问get_qpid_connection()
方法。每个通道使用每个通道的连接BrokerAgent
,并且传输为所有发送者和接收者维护会话。Connection对象还负责维护在收到消息时应调用的回调引用的字典。这些回调保存在_CALLBACKS中,并以与收到的消息相关联的队列名称为关键字。_回调在中设置
Channel.basic_consume()
,已删除Channel.basic_cancel()
,并叫来了Transport.drain_events()
。以下键应至少作为关键字参数传入:
所有关键字参数都收集到CONNECTION_OPTIONS字典中,并直接传递到
qpid.messaging.endpoints.Connection.establish()
。- class Channel(connection, transport)¶
支持代理配置和消息发送和接收。
- 参数:
connection (kombu.transport.qpid.Connection) -- 此Channel可以引用的Connection对象。目前仅用于访问回调。
transport (kombu.transport.qpid.Transport) -- 此频道关联的传输。
频道对象被设计为与AMQP 0-10及更早版本中定义的频道具有方法奇偶性,这允许以下代理操作:
Exchange声明和删除
队列声明和删除
对绑定和解除绑定操作进行排队
队列长度和清除操作
发送/接收/拒绝消息
对消息进行结构化、编码和解码
支持同步和异步读取
正在读取有关交换、队列和绑定的状态
通道设计为所有通道都与代理共享单个TCP连接,但在受益于共享的TCP连接的同时,提供与代理的隔离通信级别。频道被赋予了它的
Connection
对象设置为Transport
这实例化了通道。此频道继承自
StdChannel
,这使得这是一个“本地”频道,而不是一个“虚拟”频道,它将从kombu.transports.virtual
。使用此通道发送的消息将被分配一个Delivery_Tag。在消息准备发送时为消息生成Delivery_Tag
basic_publish()
。对于每个频道实例,Delivery_Tag是唯一的。Delivery_Tag在其他对象中没有有意义的上下文,仅在此对象的内存中维护,并且基础QoS
提供支持的对象。每个频道对象恰好实例化一个
QoS
用于预取限制和异步打包的对象。这个QoS
对象通过属性方法延迟实例化qos()
。这个QoS
对象是不应直接访问的支持对象,只能由频道本身访问。对队列的同步读取是使用调用
basic_get()
它使用_get()
来执行读数。这些方法立即读取,不接受任何形式的超时。basic_get()
同步读取并在返回消息之前确认消息。在所有情况下都会执行ACK,因为使用qpid.Messaging读取消息但不确认它们的应用程序将会遇到内存泄漏。的no_ack参数basic_get()
不会影响Acking功能。通过使用启动使用者来完成对队列的异步读取
basic_consume()
。每次调用basic_consume()
会引起一场Receiver
要在上创建Session
由 :class: Transport. The receiver will asynchronously read using qpid.messaging, and prefetch messages before the call toTransport.basic_drain()
发生。的预取计数值QoS
对象是新接收器的容量值。新的接收器容量必须始终至少为1,否则看起来没有一个接收器可以读取,并且永远不会被读取。每次调用
basic_consume()
创建一个使用者,该使用者被赋予一个使用者标记,该标记由basic_consume()
。已启动的消费者可以通过其Consumer_tag使用以下命令取消使用basic_cancel()
。消费者的取消会导致Receiver
要关闭的对象。通过支持异步消息打包
basic_ack()
,并由Delivery_Tag引用。Channel对象使用其QoS
对象来执行消息打包。- class Message(payload, channel=None, **kwargs)¶
消息对象。
- accept¶
- body¶
- channel¶
- content_encoding¶
- content_type¶
- delivery_info¶
- delivery_tag¶
- headers¶
- properties¶
- serializable()¶
- class QoS(session, prefetch_count=1)¶
用于消息预取和打包的帮助器对象。
- 关键字参数:
prefetch_count -- 初始预取计数,硬设置为1。
注意:PREFETCH_COUNT当前硬设置为1,需要改进
此对象是使用
Channel
举个例子。服务质量允许prefetch_count
设置为对应的未处理消息的数量Channel
应允许预取。设置prefetch_count
设置为0将禁用预取限制,并且对象可以容纳任意数量的消息。消息是使用添加的
append()
,它们将一直保持,直到通过调用ack()
。在收到ACK或关闭会话之前,代理不会将已接收但未确认的消息传递给另一个使用者。使用Delivery_Tag引用消息,每个消息都是唯一的Channel
。传递标记在此对象外部进行管理,并与消息一起传递到append()
。未确认的消息可以通过以下方式从服务质量中查找get()
并且可以使用以下命令拒绝和忘记reject()
。- ack(delivery_tag)¶
通过Delivery_Tag确认消息。
一旦消息被处理并且可以被代理忘记,就以异步方式调用。
- 参数:
delivery_tag (uuid.UUID) -- 与要确认的邮件关联的传递标记。
- append(message, delivery_tag)¶
将消息追加到未确认消息列表中。
添加一条消息,由Delivery_Tag引用,用于稍后删除、拒绝或获取。消息通过Delivery_Tag保存到DICT中。
- 参数:
message (qpid.messaging.Message) -- 尚未确认的已接收消息。
delivery_tag (uuid.UUID) -- 收到此消息时引用此消息的UUID。
- can_consume()¶
如果是,则返回True
Channel
可以使用更多的消息。用于确保客户端遵守当前活动的预取限制。
- 返回:
如果此Qos对象可以接受更多消息而不违反prefetch_count,则为True。如果PREFETCH_COUNT为0,则CAN_Consumer将始终返回True。
- 返回类型:
- can_consume_max_estimate()¶
返回剩余消息容量。
返回一个估计的未处理消息数。
kombu.transport.qpid.Channel
可以接受而不超过prefetch_count
。如果prefetch_count
为0,则此方法返回1。- 返回:
在不违反PREFETCH_COUNT的情况下可以提取的估计消息数。
- 返回类型:
- basic_ack(delivery_tag, multiple=False)¶
通过Delivery_Tag确认消息。
确认Delivery_Tag引用的邮件。消息只能使用以下命令确认
basic_ack()
如果它们是通过以下方式获得的basic_consume()
。这是异步读取行为的ACK部分。在内部,此方法使用
QoS
对象,该对象存储消息并负责打包。
- basic_cancel(consumer_tag)¶
按消费者标签取消消费者。
请求消费者停止从其队列中读取消息。消费者是一个
Receiver
,并使用以下命令关闭close()
。此方法还清除消费者的所有挥之不去的引用。
- 参数:
consumer_tag (an immutable object) -- 指要注销的消费者的标签。最初在将使用者创建为参数时指定
basic_consume()
。
- basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)¶
启动从队列读取的异步使用者。
此方法启动类型为
Receiver
使用Session
由创建并引用Transport
从由名称指定的队列中读取消息,直到通过调用basic_cancel()
。消息稍后可通过同步调用
Transport.drain_events()
,它将从由此方法启动的使用者中排出。Transport.drain_events()
是同步的,但通过网络接收消息是异步发生的,因此它应该仍然执行得很好。Transport.drain_events()
调用此处随self.Message类型的消息一起提供的回调。每个消费者都由一个Consumer_tag引用,该标签由该方法的调用方提供。
此方法在以队列名称为关键字的dict中设置对self.Connection对象的回调。
drain_events()
负责在收到消息时调用该回调。的调用方处理完消息后,会将收到的所有消息都添加到要保存的Qos对象中,以备以后使用
drain_events()
。消息可以在通过调用basic_ack()
。如果no_ack为True,则no_ack标志指示消息的接收者不会调用
basic_ack()
后来。由于该消息稍后不会被确认,因此它将立即被确认。basic_consume()
在调用回调之前转换消息对象类型。最初,该消息以qpid.messaging.Message
。此方法解包qpid.messaging.Message
并创建一个self.Message类型的新对象。此方法将用户传递的回调包装在运行时构建的函数中,该函数提供来自
qpid.messaging.Message
至Message
,并将消息添加到关联的QoS
对象用于异步定位(如有必要)。- 参数:
queue (str) -- 要从中消费消息的队列的名称
no_ack (bool) -- 如果为True,则消息将不会被保存以供稍后访问,但将立即被访问。如果为False,则将保存消息以供稍后通过调用
basic_ack()
。callback (a callable object) -- 当消息到达队列时将调用的可调用对象。
consumer_tag (an immutable object) -- 引用所创建的使用者的标记。需要使用此Consumer_Tag来取消消费者。
- basic_get(queue, no_ack=False, **kwargs)¶
非阻塞单个消息按名称从队列获取和确认。
在内部,此方法使用
_get()
来获取消息。如果一个Empty
异常由以下人员引发_get()
,则此方法会将其静默并返回None。如果_get()
不返回消息,则该消息被确认。No_ack参数对ACK行为没有影响,所有消息在所有情况下都被ACK。此方法从不将获取的消息添加到用于异步ACK的内部Qos对象。此方法在方法传递时转换方法的对象类型。从经纪人那里拿到的,
_get()
返回一个qpid.messaging.Message
,但此方法获取qpid.messaging.Message
并实例化一个Message
使用基于self.Message的类设置的有效负载创建。
- basic_publish(message, exchange, routing_key, **kwargs)¶
使用路由键将消息发布到Exchange上。
使用ROUTING_KEY指定的路由关键字将消息发布到由名称指定的交换上。在发送邮件之前,通过以下方式准备邮件:
对正文进行编码,使用
encode_body()
- 将正文包装为缓冲区对象,以便
qpid.messaging.endpoints.Sender
使用可以支持任意大消息的内容类型。
将Delivery_Tag设置为随机uuid.uuid
将Exchange和Routing_Key信息设置为Delivery_Info
内部使用
_put()
以同步发送消息。此消息通常由kombu.messaging.Producer._publish
作为消息发布的最后一步。- 参数:
message (dict) -- 包含关键字值与消息数据配对的字典。方法生成有效的消息字典。
prepare_message()
方法。exchange (str) -- 要向其提交此消息的交换的名称。
routing_key (str) -- 要用作消息的路由密钥被提交到Exchange上。
- basic_qos(prefetch_count, *args)¶
变化
QoS
此频道的设置。设置此通道可以获取和保留的未确认消息的数量。PREFETCH_VALUE还用作任何新
Receiver
物体。目前,该值被硬编码为1。
- 参数:
prefetch_count (int) -- 没有用过。此方法被硬编码为1。
- basic_reject(delivery_tag, requeue=False)¶
通过Delivery_Tag拒绝邮件。
拒绝频道已接收但尚未确认的消息。消息由其Delivery_Tag引用。
如果RESEUE为FALSE,则被拒绝的消息将被代理丢弃,并且不会传递给任何其他使用者。如果Requeue为True,则被拒绝的邮件将被重新排队以传递给另一个使用者,可能传递给先前拒绝该邮件的同一使用者。
- 参数:
delivery_tag (uuid.UUID) -- 与要拒绝的邮件关联的传递标记。
- 关键字参数:
requeue -- 如果为False,则被拒绝的消息将由代理丢弃,并且不会传递给任何其他消费者。如果为True,则拒绝的邮件将被重新排队以传递给另一个使用者,可能传递给以前拒绝该邮件的同一使用者。
- body_encoding = 'base64'¶
默认正文编码。注:
transport_options['body_encoding']
将覆盖此值。
- close()¶
取消所有关联的消息并关闭频道。
这将取消所有使用者,方法是调用
basic_cancel()
对于每个已知的Consumer_Tag。它还会关闭self._Broker会话。隐式关闭会话会导致代理将所有未完成、未确认的消息视为未传递。
- codecs = {'base64': <kombu.transport.virtual.base.Base64 object>}¶
二进制<->ASCII编解码器。
- decode_body(body, encoding=None)¶
使用可选指定的编码对正文进行解码。
编码可以通过名称指定,并在self.codecs中查找。Self.codecs使用字符串作为其键,该键指定编码的名称,然后该值是一个实例化的对象,它可以通过encode和decode方法提供该类型的编码/解码。
- encode_body(body, encoding=None)¶
使用可选指定的编码对正文进行编码。
编码可以通过名称指定,并在self.codecs中查找。Self.codecs使用字符串作为其键,该键指定编码的名称,然后该值是一个实例化的对象,它可以通过encode和decode方法提供该类型的编码/解码。
- exchange_declare(exchange='', type='direct', durable=False, **kwargs)¶
创建一个新的交易所。
创建特定类型的交换,并可选择使该交换持久。如果请求的名称的交换已经存在,则不会采取任何操作,也不会引发任何异常。持久交易所将在经纪商重启后存活下来,而非持久交易所则不会。
交易所根据其类型提供行为。预期的行为是在AMQP 0-10和之前的规范中定义的行为,包括‘直接’、‘主题’和‘扇出’功能。
- 关键字参数:
type -- 交换类型。有效值包括‘DIRECT’、‘TOPIC’和‘FANOUT’。
exchange -- 要创建的交换的名称。如果未指定Exchange,则将使用空字符串作为名称。
durable -- 如果交换应该是持久的,则为True,否则为False。
- prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)¶
准备要发送的消息数据。
此消息通常由
kombu.messaging.Producer._publish()
作为消息发布中的准备步骤。- 参数:
body (str) -- 消息的正文
- 关键字参数:
priority -- 0到9之间的数字,用于设置消息的优先级。
content_type -- 消息正文应被视为的Content_Type。如果未设置此项,则
qpid.messaging.endpoints.Sender
对象尝试从正文中自动检测Content_type。content_encoding -- 消息正文的Content_Coding编码为。
headers -- 应设置的其他邮件标头。作为键-值对传入。
properties -- 要在消息上设置的消息属性。
- 返回:
返回封装消息属性的Dict对象。有关可以设置的属性的详细信息,请参见参数。
- 返回类型:
- queue_bind(queue, exchange, routing_key, **kwargs)¶
使用绑定密钥将队列绑定到交换。
使用特定的绑定密钥将按名称指定的队列绑定到按名称指定的交换。要成功完成绑定,代理上必须已存在队列和交换。队列可以使用不同的密钥多次绑定到交换。
- queue_declare(queue, passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None)¶
创建由名称指定的新队列。
如果该队列已经存在,则不会对该队列进行任何更改,并且返回值将返回有关现有队列的信息。
队列名称是必需的,并指定为第一个参数。
如果PASSIVE为True,服务器将不创建队列。客户端可以使用它来检查队列是否存在,而无需修改服务器状态。默认值为FALSE。
如果耐久为True,则队列将是耐久的。服务器重新启动时,持久队列保持活动状态。如果/当服务器重新启动时,非持久队列(临时队列)将被清除。请注意,持久队列不一定保存持久消息,尽管将持久消息发送到临时队列没有意义。默认值为FALSE。
如果EXCLUSIVE为True,则队列将是独占的。独占队列只能由当前连接使用。设置‘EXCLUSIVE’标志总是意味着‘自动删除’。默认值为FALSE。
如果AUTO_DELETE为True,则在所有使用者使用完队列后将其删除。最后一个消费者可以明确取消,也可以因为其渠道关闭而取消。如果队列中没有消费者,则不会将其删除。默认为True。
未使用NoWait参数。它是0-9-1协议的一部分,但此AMQP客户端实现了0-10,删除了nowait选项。
Arguments参数是用于声明队列的一组参数。参数作为字典传递或不传递。如果被动为True,则忽略此字段。默认设置为无。
此方法返回一个
namedtuple
其名称为‘QUEUE_DECLARE_OK_T’,队列名称为‘QUEUE’,队列上的消息计数为‘MESSAGE_COUNT’,活动使用者数量为‘Consumer_COUNT’。命名的元组值分别按Queue、Message_Count和Consumer_Count排序。由于芹菜对事件的非ACK,因此在以字符串‘celeryev’开头或以字符串‘pidbox’结尾的任何队列上都设置了环策略。这些是西芹事件队列,西芹不会对它们进行确认,从而导致消息堆积。最终,除非设置了“环”策略,否则QPID将停止为消息提供服务,此时支持队列的缓冲区将变为循环。
- 参数:
- 返回:
将声明的队列表示为命名元组的命名元组。元组值按队列、消息计数和活动使用者计数排序。
- 返回类型:
namedtuple
- queue_delete(queue, if_unused=False, if_empty=False, **kwargs)¶
按名称删除队列。
删除按名称指定的队列。使用IF_UNUSED关键字参数,只有在绑定了0个使用者时,才能执行删除操作。使用IF_EMPTY关键字参数,仅当队列中有0条消息时才能进行删除。
- 参数:
queue (str) -- 要删除的队列的名称。
- 关键字参数:
if_unused -- 如果为True,则仅当队列有0个使用者时才删除。如果为False,则删除队列,即使绑定了使用者也是如此。
if_empty -- 如果为True,则仅在队列为空时删除该队列。如果为False,则在队列为空或不为空时将其删除。
- queue_purge(queue, **kwargs)¶
从队列中删除所有未送达的邮件。
从按名称指定的队列中清除所有未送达的邮件。如果队列不存在,则会引发异常。首先检查队列消息深度,然后要求代理清除该数量的消息。返回请求清除的消息的整数个。实际清除的邮件数可能与请求的要清除的邮件数不同。
有时会要求清除已传递的邮件,但不会。这种情况以静默方式失败,这是已传递给另一个使用者的消息的正确行为,该使用者尚未确认该消息,并且仍与代理有活动会话。在这种情况下,清除消息是不安全的,并将由代理保留。客户端无法更改此传递行为。
在内部,此方法依赖于
_purge()
。
- queue_unbind(queue, exchange, routing_key, **kwargs)¶
解除队列与具有给定绑定密钥的交换的绑定。
从已使用绑定键绑定的、由名称指定的交换取消绑定由名称指定的队列。队列和交换必须已存在于代理上,并与绑定密钥绑定,才能成功完成操作。队列可以使用不同的密钥多次绑定到交换,因此绑定密钥是显式解绑的必填字段。
渠道¶
- class kombu.transport.qpid.Channel(connection, transport)[源代码]¶
支持代理配置和消息发送和接收。
- 参数:
connection (kombu.transport.qpid.Connection) -- 此Channel可以引用的Connection对象。目前仅用于访问回调。
transport (kombu.transport.qpid.Transport) -- 此频道关联的传输。
频道对象被设计为与AMQP 0-10及更早版本中定义的频道具有方法奇偶性,这允许以下代理操作:
Exchange声明和删除
队列声明和删除
对绑定和解除绑定操作进行排队
队列长度和清除操作
发送/接收/拒绝消息
对消息进行结构化、编码和解码
支持同步和异步读取
正在读取有关交换、队列和绑定的状态
通道设计为所有通道都与代理共享单个TCP连接,但在受益于共享的TCP连接的同时,提供与代理的隔离通信级别。频道被赋予了它的
Connection
对象设置为Transport
这实例化了通道。此频道继承自
StdChannel
,这使得这是一个“本地”频道,而不是一个“虚拟”频道,它将从kombu.transports.virtual
。使用此通道发送的消息将被分配一个Delivery_Tag。在消息准备发送时为消息生成Delivery_Tag
basic_publish()
。对于每个频道实例,Delivery_Tag是唯一的。Delivery_Tag在其他对象中没有有意义的上下文,仅在此对象的内存中维护,并且基础QoS
提供支持的对象。每个频道对象恰好实例化一个
QoS
用于预取限制和异步打包的对象。这个QoS
对象通过属性方法延迟实例化qos()
。这个QoS
对象是不应直接访问的支持对象,只能由频道本身访问。对队列的同步读取是使用调用
basic_get()
它使用_get()
来执行读数。这些方法立即读取,不接受任何形式的超时。basic_get()
同步读取并在返回消息之前确认消息。在所有情况下都会执行ACK,因为使用qpid.Messaging读取消息但不确认它们的应用程序将会遇到内存泄漏。的no_ack参数basic_get()
不会影响Acking功能。通过使用启动使用者来完成对队列的异步读取
basic_consume()
。每次调用basic_consume()
会引起一场Receiver
要在上创建Session
由 :class: Transport. The receiver will asynchronously read using qpid.messaging, and prefetch messages before the call toTransport.basic_drain()
发生。的预取计数值QoS
对象是新接收器的容量值。新的接收器容量必须始终至少为1,否则看起来没有一个接收器可以读取,并且永远不会被读取。每次调用
basic_consume()
创建一个使用者,该使用者被赋予一个使用者标记,该标记由basic_consume()
。已启动的消费者可以通过其Consumer_tag使用以下命令取消使用basic_cancel()
。消费者的取消会导致Receiver
要关闭的对象。通过支持异步消息打包
basic_ack()
,并由Delivery_Tag引用。Channel对象使用其QoS
对象来执行消息打包。- class Message(payload, channel=None, **kwargs)¶
使用的消息类。
- accept¶
- body¶
- channel¶
- content_encoding¶
- content_type¶
- delivery_info¶
- delivery_tag¶
- headers¶
- properties¶
- serializable()¶
- class QoS(session, prefetch_count=1)¶
将使用qos属性实例化的类引用。
- ack(delivery_tag)¶
通过Delivery_Tag确认消息。
一旦消息被处理并且可以被代理忘记,就以异步方式调用。
- 参数:
delivery_tag (uuid.UUID) -- 与要确认的邮件关联的传递标记。
- append(message, delivery_tag)¶
将消息追加到未确认消息列表中。
添加一条消息,由Delivery_Tag引用,用于稍后删除、拒绝或获取。消息通过Delivery_Tag保存到DICT中。
- 参数:
message (qpid.messaging.Message) -- 尚未确认的已接收消息。
delivery_tag (uuid.UUID) -- 收到此消息时引用此消息的UUID。
- can_consume()¶
如果是,则返回True
Channel
可以使用更多的消息。用于确保客户端遵守当前活动的预取限制。
- 返回:
如果此Qos对象可以接受更多消息而不违反prefetch_count,则为True。如果PREFETCH_COUNT为0,则CAN_Consumer将始终返回True。
- 返回类型:
- can_consume_max_estimate()¶
返回剩余消息容量。
返回一个估计的未处理消息数。
kombu.transport.qpid.Channel
可以接受而不超过prefetch_count
。如果prefetch_count
为0,则此方法返回1。- 返回:
在不违反PREFETCH_COUNT的情况下可以提取的估计消息数。
- 返回类型:
- basic_ack(delivery_tag, multiple=False)[源代码]¶
通过Delivery_Tag确认消息。
确认Delivery_Tag引用的邮件。消息只能使用以下命令确认
basic_ack()
如果它们是通过以下方式获得的basic_consume()
。这是异步读取行为的ACK部分。在内部,此方法使用
QoS
对象,该对象存储消息并负责打包。
- basic_cancel(consumer_tag)[源代码]¶
按消费者标签取消消费者。
请求消费者停止从其队列中读取消息。消费者是一个
Receiver
,并使用以下命令关闭close()
。此方法还清除消费者的所有挥之不去的引用。
- 参数:
consumer_tag (an immutable object) -- 指要注销的消费者的标签。最初在将使用者创建为参数时指定
basic_consume()
。
- basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)[源代码]¶
启动从队列读取的异步使用者。
此方法启动类型为
Receiver
使用Session
由创建并引用Transport
从由名称指定的队列中读取消息,直到通过调用basic_cancel()
。消息稍后可通过同步调用
Transport.drain_events()
,它将从由此方法启动的使用者中排出。Transport.drain_events()
是同步的,但通过网络接收消息是异步发生的,因此它应该仍然执行得很好。Transport.drain_events()
调用此处随self.Message类型的消息一起提供的回调。每个消费者都由一个Consumer_tag引用,该标签由该方法的调用方提供。
此方法在以队列名称为关键字的dict中设置对self.Connection对象的回调。
drain_events()
负责在收到消息时调用该回调。的调用方处理完消息后,会将收到的所有消息都添加到要保存的Qos对象中,以备以后使用
drain_events()
。消息可以在通过调用basic_ack()
。如果no_ack为True,则no_ack标志指示消息的接收者不会调用
basic_ack()
后来。由于该消息稍后不会被确认,因此它将立即被确认。basic_consume()
在调用回调之前转换消息对象类型。最初,该消息以qpid.messaging.Message
。此方法解包qpid.messaging.Message
并创建一个self.Message类型的新对象。此方法将用户传递的回调包装在运行时构建的函数中,该函数提供来自
qpid.messaging.Message
至Message
,并将消息添加到关联的QoS
对象用于异步定位(如有必要)。- 参数:
queue (str) -- 要从中消费消息的队列的名称
no_ack (bool) -- 如果为True,则消息将不会被保存以供稍后访问,但将立即被访问。如果为False,则将保存消息以供稍后通过调用
basic_ack()
。callback (a callable object) -- 当消息到达队列时将调用的可调用对象。
consumer_tag (an immutable object) -- 引用所创建的使用者的标记。需要使用此Consumer_Tag来取消消费者。
- basic_get(queue, no_ack=False, **kwargs)[源代码]¶
非阻塞单个消息按名称从队列获取和确认。
在内部,此方法使用
_get()
来获取消息。如果一个Empty
异常由以下人员引发_get()
,则此方法会将其静默并返回None。如果_get()
不返回消息,则该消息被确认。No_ack参数对ACK行为没有影响,所有消息在所有情况下都被ACK。此方法从不将获取的消息添加到用于异步ACK的内部Qos对象。此方法在方法传递时转换方法的对象类型。从经纪人那里拿到的,
_get()
返回一个qpid.messaging.Message
,但此方法获取qpid.messaging.Message
并实例化一个Message
使用基于self.Message的类设置的有效负载创建。
- basic_publish(message, exchange, routing_key, **kwargs)[源代码]¶
使用路由键将消息发布到Exchange上。
使用ROUTING_KEY指定的路由关键字将消息发布到由名称指定的交换上。在发送邮件之前,通过以下方式准备邮件:
对正文进行编码,使用
encode_body()
- 将正文包装为缓冲区对象,以便
qpid.messaging.endpoints.Sender
使用可以支持任意大消息的内容类型。
将Delivery_Tag设置为随机uuid.uuid
将Exchange和Routing_Key信息设置为Delivery_Info
内部使用
_put()
以同步发送消息。此消息通常由kombu.messaging.Producer._publish
作为消息发布的最后一步。- 参数:
message (dict) -- 包含关键字值与消息数据配对的字典。方法生成有效的消息字典。
prepare_message()
方法。exchange (str) -- 要向其提交此消息的交换的名称。
routing_key (str) -- 要用作消息的路由密钥被提交到Exchange上。
- basic_qos(prefetch_count, *args)[源代码]¶
变化
QoS
此频道的设置。设置此通道可以获取和保留的未确认消息的数量。PREFETCH_VALUE还用作任何新
Receiver
物体。目前,该值被硬编码为1。
- 参数:
prefetch_count (int) -- 没有用过。此方法被硬编码为1。
- basic_reject(delivery_tag, requeue=False)[源代码]¶
通过Delivery_Tag拒绝邮件。
拒绝频道已接收但尚未确认的消息。消息由其Delivery_Tag引用。
如果RESEUE为FALSE,则被拒绝的消息将被代理丢弃,并且不会传递给任何其他使用者。如果Requeue为True,则被拒绝的邮件将被重新排队以传递给另一个使用者,可能传递给先前拒绝该邮件的同一使用者。
- 参数:
delivery_tag (uuid.UUID) -- 与要拒绝的邮件关联的传递标记。
- 关键字参数:
requeue -- 如果为False,则被拒绝的消息将由代理丢弃,并且不会传递给任何其他消费者。如果为True,则拒绝的邮件将被重新排队以传递给另一个使用者,可能传递给以前拒绝该邮件的同一使用者。
- body_encoding = 'base64'¶
默认正文编码。注:
transport_options['body_encoding']
将覆盖此值。
- close()[源代码]¶
取消所有关联的消息并关闭频道。
这将取消所有使用者,方法是调用
basic_cancel()
对于每个已知的Consumer_Tag。它还会关闭self._Broker会话。隐式关闭会话会导致代理将所有未完成、未确认的消息视为未传递。
- codecs = {'base64': <kombu.transport.virtual.base.Base64 object>}¶
二进制<->ASCII编解码器。
- decode_body(body, encoding=None)[源代码]¶
使用可选指定的编码对正文进行解码。
编码可以通过名称指定,并在self.codecs中查找。Self.codecs使用字符串作为其键,该键指定编码的名称,然后该值是一个实例化的对象,它可以通过encode和decode方法提供该类型的编码/解码。
- encode_body(body, encoding=None)[源代码]¶
使用可选指定的编码对正文进行编码。
编码可以通过名称指定,并在self.codecs中查找。Self.codecs使用字符串作为其键,该键指定编码的名称,然后该值是一个实例化的对象,它可以通过encode和decode方法提供该类型的编码/解码。
- exchange_declare(exchange='', type='direct', durable=False, **kwargs)[源代码]¶
创建一个新的交易所。
创建特定类型的交换,并可选择使该交换持久。如果请求的名称的交换已经存在,则不会采取任何操作,也不会引发任何异常。持久交易所将在经纪商重启后存活下来,而非持久交易所则不会。
交易所根据其类型提供行为。预期的行为是在AMQP 0-10和之前的规范中定义的行为,包括‘直接’、‘主题’和‘扇出’功能。
- 关键字参数:
type -- 交换类型。有效值包括‘DIRECT’、‘TOPIC’和‘FANOUT’。
exchange -- 要创建的交换的名称。如果未指定Exchange,则将使用空字符串作为名称。
durable -- 如果交换应该是持久的,则为True,否则为False。
- prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)[源代码]¶
准备要发送的消息数据。
此消息通常由
kombu.messaging.Producer._publish()
作为消息发布中的准备步骤。- 参数:
body (str) -- 消息的正文
- 关键字参数:
priority -- 0到9之间的数字,用于设置消息的优先级。
content_type -- 消息正文应被视为的Content_Type。如果未设置此项,则
qpid.messaging.endpoints.Sender
对象尝试从正文中自动检测Content_type。content_encoding -- 消息正文的Content_Coding编码为。
headers -- 应设置的其他邮件标头。作为键-值对传入。
properties -- 要在消息上设置的消息属性。
- 返回:
返回封装消息属性的Dict对象。有关可以设置的属性的详细信息,请参见参数。
- 返回类型:
- queue_bind(queue, exchange, routing_key, **kwargs)[源代码]¶
使用绑定密钥将队列绑定到交换。
使用特定的绑定密钥将按名称指定的队列绑定到按名称指定的交换。要成功完成绑定,代理上必须已存在队列和交换。队列可以使用不同的密钥多次绑定到交换。
- queue_declare(queue, passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None)[源代码]¶
创建由名称指定的新队列。
如果该队列已经存在,则不会对该队列进行任何更改,并且返回值将返回有关现有队列的信息。
队列名称是必需的,并指定为第一个参数。
如果PASSIVE为True,服务器将不创建队列。客户端可以使用它来检查队列是否存在,而无需修改服务器状态。默认值为FALSE。
如果耐久为True,则队列将是耐久的。服务器重新启动时,持久队列保持活动状态。如果/当服务器重新启动时,非持久队列(临时队列)将被清除。请注意,持久队列不一定保存持久消息,尽管将持久消息发送到临时队列没有意义。默认值为FALSE。
如果EXCLUSIVE为True,则队列将是独占的。独占队列只能由当前连接使用。设置‘EXCLUSIVE’标志总是意味着‘自动删除’。默认值为FALSE。
如果AUTO_DELETE为True,则在所有使用者使用完队列后将其删除。最后一个消费者可以明确取消,也可以因为其渠道关闭而取消。如果队列中没有消费者,则不会将其删除。默认为True。
未使用NoWait参数。它是0-9-1协议的一部分,但此AMQP客户端实现了0-10,删除了nowait选项。
Arguments参数是用于声明队列的一组参数。参数作为字典传递或不传递。如果被动为True,则忽略此字段。默认设置为无。
此方法返回一个
namedtuple
其名称为‘QUEUE_DECLARE_OK_T’,队列名称为‘QUEUE’,队列上的消息计数为‘MESSAGE_COUNT’,活动使用者数量为‘Consumer_COUNT’。命名的元组值分别按Queue、Message_Count和Consumer_Count排序。由于芹菜对事件的非ACK,因此在以字符串‘celeryev’开头或以字符串‘pidbox’结尾的任何队列上都设置了环策略。这些是西芹事件队列,西芹不会对它们进行确认,从而导致消息堆积。最终,除非设置了“环”策略,否则QPID将停止为消息提供服务,此时支持队列的缓冲区将变为循环。
- 参数:
- 返回:
将声明的队列表示为命名元组的命名元组。元组值按队列、消息计数和活动使用者计数排序。
- 返回类型:
namedtuple
- queue_delete(queue, if_unused=False, if_empty=False, **kwargs)[源代码]¶
按名称删除队列。
删除按名称指定的队列。使用IF_UNUSED关键字参数,只有在绑定了0个使用者时,才能执行删除操作。使用IF_EMPTY关键字参数,仅当队列中有0条消息时才能进行删除。
- 参数:
queue (str) -- 要删除的队列的名称。
- 关键字参数:
if_unused -- 如果为True,则仅当队列有0个使用者时才删除。如果为False,则删除队列,即使绑定了使用者也是如此。
if_empty -- 如果为True,则仅在队列为空时删除该队列。如果为False,则在队列为空或不为空时将其删除。
- queue_purge(queue, **kwargs)[源代码]¶
从队列中删除所有未送达的邮件。
从按名称指定的队列中清除所有未送达的邮件。如果队列不存在,则会引发异常。首先检查队列消息深度,然后要求代理清除该数量的消息。返回请求清除的消息的整数个。实际清除的邮件数可能与请求的要清除的邮件数不同。
有时会要求清除已传递的邮件,但不会。这种情况以静默方式失败,这是已传递给另一个使用者的消息的正确行为,该使用者尚未确认该消息,并且仍与代理有活动会话。在这种情况下,清除消息是不安全的,并将由代理保留。客户端无法更改此传递行为。
在内部,此方法依赖于
_purge()
。