This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

纯 Python AMQP传输- kombu.transport.pyamqp

Kombu的pyamqp传输模块。

使用Py-AMQP库的纯Python AMQP传输。

功能

  • 类型:原生

  • 支持Direct:是

  • 支持主题:是

  • 支持扇出:支持

  • 支持优先级:是

  • 支持TTL:支持

连接字符串

连接字符串可以采用以下格式:

amqp://[USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]
[USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]
amqp://

对于TLS加密使用:

amqps://[USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]

交通选择

传输选项被传递给基础y-amqp的构造函数 Connection 班级。

使用TLS

可通过以下方式启用TLS上的传输 ssl 的参数 Connection 班级。通过设置 ssl=True ,使用TLS传输::

conn = Connect('amqp://', ssl=True)

这相当于 amqps:// 传输URI::

conn = Connect('amqps://')

为了将附加参数添加到底层TLS, ssl 应使用DICT而不是True::设置参数

conn = Connect('amqp://broker.example.com', ssl={
        'keyfile': '/path/to/keyfile'
        'certfile': '/path/to/certfile',
        'ca_certs': '/path/to/ca_certfile'
    }
)

所有参数都将传递给 ssl 的参数 amqp.connection.Connection 班级。

SSL选项 server_hostname 可以设置为 None 这是使用Broker URL中的主机名造成的。这在使用故障切换填充时非常有用 server_hostname 使用当前使用的代理::

conn = Connect('amqp://broker1.example.com;broker2.example.com', ssl={
        'server_hostname': None
    }
)

运输

class kombu.transport.pyamqp.Transport(client, default_port=None, default_ssl_port=None, **kwargs)[源代码]

AMQP传输。

class Connection(host='localhost:5672', userid='guest', password='guest', login_method=None, login_response=None, authentication=(), virtual_host='/', locale='en_US', client_properties=None, ssl=False, connect_timeout=None, channel_max=None, frame_max=None, heartbeat=0, on_open=None, on_blocked=None, on_unblocked=None, confirm_publish=False, on_tune_ok=None, read_timeout=None, write_timeout=None, socket_settings=None, frame_handler=<function frame_handler>, frame_writer=<function frame_writer>, **kwargs)

AMQP连接。

class Channel(connection, channel_id=None, auto_decode=True, on_open=None)

AMQP频道。

class Message(msg, channel=None, **kwargs)

AMQP消息。

accept
body
channel
content_encoding
content_type
delivery_info
delivery_tag
headers
properties
auto_decode
channel_id
connection
is_closing
message_to_python(raw_message)

将编码的消息正文转换回Python值。

method_queue
prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None, _Message=<class 'amqp.basic_message.Message'>)

准备邮件,以便可以使用此传输发送邮件。

prepare_queue_arguments(arguments, **kwargs)
auto_decode
channel_id
connection
is_closing
method_queue
channel_errors = (<class 'amqp.exceptions.ChannelError'>,)

由于通道/方法故障而可能发生的错误元组。

close_connection(connection)[源代码]

关闭AMQP Broker连接。

connection_errors = (<class 'amqp.exceptions.ConnectionError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>)

由于连接故障而可能发生的错误的元组。

create_channel(connection)[源代码]
property default_connection_params
default_port = 5672

未指定端口时使用的默认端口。

default_ssl_port = 5671
drain_events(connection, **kwargs)[源代码]
driver_name = 'py-amqp'

驱动程序库的名称(例如‘py-amqp’、‘redis’)。

driver_type = 'amqp'

驱动程序的类型,可用于使用AMQP协议(DRIVER_TYPE:‘AMQP’)、Redis(DRIVER_TYPE:‘REDIS’)等来分隔传输...

driver_version()[源代码]
establish_connection()[源代码]

建立与AMQP Broker的连接。

get_heartbeat_interval(connection)[源代码]
get_manager(*args, **kwargs)[源代码]
heartbeat_check(connection, rate=2)[源代码]
implements = {'asynchronous': True, 'exchange_type': frozenset({'direct', 'fanout', 'headers', 'topic'}), 'heartbeats': True}
qos_semantics_matches_spec(connection)[源代码]
recoverable_channel_errors = (<class 'amqp.exceptions.RecoverableChannelError'>,)
recoverable_connection_errors = (<class 'amqp.exceptions.RecoverableConnectionError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>)
register_with_event_loop(connection, loop)[源代码]
verify_connection(connection)[源代码]

连接

class kombu.transport.pyamqp.Connection(host='localhost:5672', userid='guest', password='guest', login_method=None, login_response=None, authentication=(), virtual_host='/', locale='en_US', client_properties=None, ssl=False, connect_timeout=None, channel_max=None, frame_max=None, heartbeat=0, on_open=None, on_blocked=None, on_unblocked=None, confirm_publish=False, on_tune_ok=None, read_timeout=None, write_timeout=None, socket_settings=None, frame_handler=<function frame_handler>, frame_writer=<function frame_writer>, **kwargs)[源代码]

AMQP连接。

class Channel(connection, channel_id=None, auto_decode=True, on_open=None)

AMQP频道。

Consumer(*args, **kwargs)
class Message(msg, channel=None, **kwargs)

AMQP消息。

exception MessageStateError

该消息已被确认。

add_note()

Exception.add_note(Note)--向异常添加注释

args
with_traceback()

Exception.with_traceback(TB)--set self.__traceback__ 去结核病,回到赛尔夫。

accept
ack(multiple=False)

确认此消息正在处理中。

这将从队列中删除该消息。

抛出:

MessageStateError -- 如果消息已经:已确认/已重新排队/已拒绝。

ack_log_error(logger, errors, multiple=False)
property acknowledged

如果消息已被确认,则设置为True。

body
channel
content_encoding
content_type
decode()

反序列化消息体。

返回发布者发送的原始python结构。

注:

对返回值进行备注,使用 _decode 以迫使重新评估。

delivery_info
delivery_tag
errors = None
headers
property payload

解码后的消息正文。

properties
reject(requeue=False)

拒绝此邮件。

该消息将被服务器丢弃。

抛出:

MessageStateError -- 如果消息已经:已确认/已重新排队/已拒绝。

reject_log_error(logger, errors, requeue=False)
requeue()

拒绝此邮件并将其放回队列中。

警告:

不能将此方法用作选择要处理的邮件的手段。

引发MessageStateError:

如果消息已经:已确认/已重新排队/已拒绝。

Producer(*args, **kwargs)
after_reply_message_received(queue)

在收到RPC回复后调用了回调。

备注

回复队列语义:用于在收到临时回复消息后删除队列。

auto_decode
basic_ack(delivery_tag, multiple=False, argsig='Lb')

确认一条或多条消息。

此方法确认通过Deliver或Get-OK方法传递的一条或多条消息。客户端可以要求确认单个消息或直到(包括)特定消息的一组消息。

参数:
  • delivery_tag -- 龙龙服务器分配的递送标签服务器分配的特定于渠道的递送标签规则:递送标签仅在接收邮件的渠道内有效。即,客户端不能在一个通道上接收消息,然后在另一个通道上确认它。规则:服务器不能对交付标记使用零值。零是保留给客户端使用的,这意味着“到目前为止收到的所有消息”。

  • multiple -- 布尔确认多条消息如果设置为True,则传递标记被视为“最多并包括”,这样客户端就可以用一种方法确认多条消息。如果设置为False,则传递标记指的是单个邮件。如果多个字段为True,且传递标记为零,则通知服务器确认所有未完成的邮件。规则:服务器必须验证非零传递标记是否引用了传递的消息,如果不是这样,则引发通道异常。

basic_cancel(consumer_tag, nowait=False, argsig='sb')

结束队列使用者。

此方法取消使用者。这不会影响已传递的消息,但它确实意味着服务器不会为该使用者发送更多消息。客户端可以在发送Cancel方法和接收Cancel-OK回复之间接收任意数量的消息。

规则:

如果当客户端发送取消命令时队列不再存在,或者消费者因其他原因被取消,则该命令无效。

参数:
  • consumer_tag -- 短字符串使用者标签使用者的识别符,在当前连接中有效。规则:消费者标签仅在创建消费者的渠道内有效。即,客户端不能在一个渠道中创建消费者,然后在另一个渠道中使用它。

  • nowait -- 布尔值不发送回复方法如果设置,服务器将不会响应该方法。客户端不应等待回复方法。如果服务器无法完成该方法,它将引发通道或连接异常。

basic_consume(queue='', consumer_tag='', no_local=False, no_ack=False, exclusive=False, nowait=False, callback=None, arguments=None, on_cancel=None, argsig='BssbbbbF')

启动队列使用者。

此方法要求服务器启动“使用者”,这是对来自特定队列的消息的临时请求。消费者的存续期和他们创建的渠道一样长,或者直到客户取消他们为止。

规则:

服务器应该至少支持每个队列16个使用者,除非该队列被声明为私有的,并且理想情况下,除非由可用资源定义,否则不施加任何限制。

参数:
  • queue -- Shorstr指定要从中消费的队列的名称。如果队列名称为空,则表示通道的当前队列,这是最后声明的队列。规则:如果客户端之前没有声明队列,并且此方法中的队列名称为空,则服务器必须引发连接异常,回复代码为530(不允许)。

  • consumer_tag -- Shorstr指定使用者的标识符。使用者标记是连接的本地标记,因此两个客户端可以使用相同的使用者标记。如果此字段为空,服务器将生成唯一标记。规则:标记不能引用现有的使用者。如果客户端尝试使用相同的非空标记创建两个使用者,则服务器必须引发带有回复代码530(不允许)的连接异常。

  • no_local -- 布尔值不传递自己的消息如果设置了非本地字段,则服务器将不向发布消息的客户端发送消息。

  • no_ack -- 布尔值不需要确认如果设置了此字段,则服务器不期望对消息进行确认。也就是说,当消息被传递到客户端时,服务器代表客户端自动且静默地确认它。此功能提高了性能,但以可靠性为代价。如果客户端在将消息传递到应用程序之前死亡,消息可能会丢失。

  • exclusive -- 布尔请求独占访问请求独占使用者访问,这意味着只有该使用者可以访问队列。规则:如果服务器在被询问时不能授予对队列的独占访问权限--因为有其他消费者处于活动状态--它必须引发通道异常,返回代码为403(拒绝访问)。

  • nowait -- 布尔值不发送回复方法如果设置,服务器将不会响应该方法。客户端不应等待回复方法。如果服务器无法完成该方法,它将引发通道或连接异常。

  • callback -- 对于代理传递的每个消息,随每个传递的消息一起调用的Python可调用函数/方法,将使用消息对象作为单个参数来调用可调用函数/方法。如果没有指定Callable,消息就会被悄悄丢弃,在这种情况下,no_ack可能应该设置为True。

basic_get(queue='', no_ack=False, argsig='Bsb')

直接访问队列。

此方法使用同步对话提供对队列中消息的直接访问,该对话是为同步功能比性能更重要的特定类型的应用程序设计的。

参数:
  • queue -- Shorstr指定要从中消费的队列的名称。如果队列名称为空,则表示通道的当前队列,这是最后声明的队列。规则:如果客户端之前没有声明队列,并且此方法中的队列名称为空,则服务器必须引发连接异常,回复代码为530(不允许)。

  • no_ack -- 布尔值不需要确认如果设置了此字段,则服务器不期望对消息进行确认。也就是说,当消息被传递到客户端时,服务器代表客户端自动且静默地确认它。此功能提高了性能,但以可靠性为代价。如果客户端在将消息传递到应用程序之前死亡,消息可能会丢失。

非阻塞,返回amqp.Basic_Message.Message对象,如果队列为空,则返回None。

basic_publish(msg, exchange='', routing_key='', mandatory=False, immediate=False, timeout=None, confirm_timeout=None, argsig='Bssbb')

发布一条消息。

此方法将消息发布到特定的交换。消息将被路由到Exchange配置定义的队列,并在提交事务(如果有)时分发给任何活动使用者。

当通道处于确认模式时(当连接参数CONFIRM_PUBLISH设置为True时),每条消息都被确认。当Broker拒绝已发布的消息(例如,到期的内部Broker约束)时,会引发MessageNack异常,并将CONFIRM_TIMEOUT设置为等待消息确认的最大确认_超时秒数。

参数:
  • exchange -- Shorstr指定要发布到的交换的名称。交换名称可以为空,表示默认交换。如果指定了交换器名称,但该交换器不存在,则服务器将引发通道异常。规则:服务器必须接受空白的交换名称以表示默认交换。规则:交易所可以拒绝基本内容,在这种情况下,它必须引发通道异常,回复代码为540(未实现)。

  • routing_key -- Shortstr邮件路由键指定邮件的路由键。路由密钥用于根据Exchange配置来路由邮件。

  • mandatory -- Boolean表示强制路由此标志告诉服务器如何在消息无法路由到队列时做出反应。如果此标志为True,则服务器将返回带有返回方法的不可路由消息。如果此标志为FALSE,服务器将静默丢弃该消息。规则:服务器应实现强制标志。

  • immediate -- 布尔请求立即送达此标志告诉服务器在消息无法立即路由到队列使用者的情况下如何作出反应。如果设置了此标志,服务器将返回带有返回方法的无法送达的邮件。如果此标志为零,则服务器将对消息进行排队,但不能保证消息将被使用。规则:服务器应实现立即标志。

  • timeout -- 发布的短超时设置等待消息发布的最大超时秒数。

  • confirm_timeout -- 短确认超时用于在确认模式下发布当通道处于确认模式时,将确认超时设置为等待消息确认的最大确认超时秒。

basic_publish_confirm(*args, **kwargs)
basic_qos(prefetch_size, prefetch_count, a_global, argsig='lBb')

指定服务质量。

该方法要求特定的服务质量。可以为当前通道或连接上的所有通道指定服务质量。Qos方法的特定属性和语义始终取决于内容类语义。虽然在原则上,Qos方法可以应用于两个对等体,但它目前仅对服务器有意义。

参数:
  • prefetch_size -- 较长的预取窗口(以八位字节为单位)客户端可以请求提前发送消息,以便当客户端完成消息处理时,以下消息已在本地保存,而不需要沿通道发送。预回迁可以提高性能。此字段以八位字节为单位指定预取窗口大小。如果消息大小等于或小于可用预取大小(并且也落入其他预取限制),则服务器将提前发送消息。可以设置为零,这意味着“没有特定的限制”,尽管其他预取限制仍然适用。如果设置了no-ack选项,则忽略预取大小。规则:当客户端没有处理任何消息时,服务器必须忽略此设置-即,预取大小不限制向客户端传输单个消息,而只是在客户端仍有一个或多个未确认的消息时提前发送更多消息。

  • prefetch_count -- 消息中的短预取窗口根据整个消息指定预取窗口。此字段可与预取大小字段结合使用;只有在两个预取窗口(以及通道和连接级别的窗口)都允许的情况下,才会提前发送消息。如果设置了no-ack选项,则忽略预取计数。规则:服务器可以提前发送比客户端指定的预取窗口所允许的更少的数据,但不能发送更多的数据。

  • a_global -- 布尔值定义了一个服务质量范围。此参数的语义在AMQP 0-9-1标准和RabbitMQ Broker之间不同:在AMQP 0-9-1中的含义:False:在通道上的所有消费者之间共享True:在连接上的所有消费者中共享在RABBITMQ:False:分别应用于通道上的每个新消费者True:Shared在通道上的所有消费者中共享

basic_recover(requeue=False)

重新传递未确认的邮件。

此方法要求代理在指定的通道上重新传递所有未确认的消息。可以重新传递零个或多个消息。此方法仅允许在非事务通道上使用。

规则:

服务器必须在重新发送的所有邮件上设置重新传递标志。

规则:

如果在事务处理的通道上调用此异常,则服务器必须引发通道异常。

参数:

requeue -- 布尔值重新排队邮件如果此字段为FALSE,则邮件将重新传递给原始收件人。如果此字段为True,则服务器将尝试重新排队邮件,然后可能会将其传递给其他订阅方。

basic_recover_async(requeue=False)
basic_reject(delivery_tag, requeue, argsig='Lb')

拒绝传入的消息。

此方法允许客户端拒绝消息。它可用于中断和取消大型传入消息,或将无法处理的消息返回到其原始队列。

规则:

服务器应该能够在使用Deliver或Get-OK方法发送消息内容时接受和处理Reject方法。即服务器应该在发送输出帧的同时读取和处理传入的方法。为了取消部分发送的内容,服务器发送大小为1的内容正文帧(即,除了帧结束八位字节外没有任何数据)。

规则:

服务器应将此方法解释为客户端此时无法处理消息。

规则:

客户端不得将此方法用作选择要处理的消息的手段。被拒绝的消息可以被丢弃或死信,而不一定传递给另一个客户端。

参数:
  • delivery_tag -- 龙龙服务器分配的递送标签服务器分配的特定于渠道的递送标签规则:递送标签仅在接收邮件的渠道内有效。即,客户端不能在一个通道上接收消息,然后在另一个通道上确认它。规则:服务器不能对交付标记使用零值。零是保留给客户端使用的,这意味着“到目前为止收到的所有消息”。

  • requeue -- 布尔值重新排队消息如果此字段为FALSE,则消息将被丢弃。如果此字段为True,则服务器将尝试重新排队消息。规则:服务器不得在当前通道的上下文中将消息传递到同一客户端。建议的策略是尝试将消息传递给替代使用者,如果不可能,则将消息移动到死信队列。服务器可以使用更复杂的跟踪来将消息保留在队列中,并在稍后将其重新传递给相同的客户端。

channel_id
close(reply_code=0, reply_text='', method_sig=(0, 0), argsig='BsBB')

请求关闭频道。

此方法指示发送方要关闭通道。这可能是由于内部条件(例如强制关闭)或由于处理特定方法(即异常)时出错造成的。如果关闭是由异常引起的,则发送方提供导致异常的方法的类和方法ID。

规则:

发送此方法后,必须丢弃除Channel.Close-OK之外的任何接收到的方法。

规则:

发送此方法的对等体可以使用计数器或超时来检测另一个对等体未能正确响应频道。关闭-OK。

参数:
  • reply_code -- 缩短回复代码。AMQ应答码在AMQ RFC 011中定义。

  • reply_text -- 缩写本地化回复文本。可以将此文本记录为解决问题的帮助。

  • class_id -- 短暂失败的方法类当关闭由方法异常引发时,这是该方法的类。

  • method_id -- 短失败方法ID当关闭由方法异常引发时,这是该方法的ID。

collect()

把这个东西拆了。

最好在我们同意关闭服务器后再打来。

confirm_select(nowait=False)

为此频道启用发布者确认。

注意:这是RabbitMQ的扩展。

如果通道处于事务模式,则现在可以使用。

参数:

nowait -- 如果设置,服务器将不会响应该方法。客户端不应等待回复方法。如果服务器无法完成该方法,它将引发通道或连接异常。

connection
dispatch_method(method_sig, payload, content)
exchange_bind(destination, source='', routing_key='', nowait=False, arguments=None, argsig='BsssbF')

将交换绑定到交换。

规则:

服务器必须允许和忽略重复绑定--即具有相同参数的特定交换的两个或多个绑定方法--而不能将其视为错误。

规则:

服务器必须允许创建交换绑定循环,包括允许将交换绑定到其自身。

规则:

服务器不得将同一消息多次发送到目标交换机,即使交换机和绑定的拓扑导致到该交换机的多个(甚至无限)路由也是如此。

参数:
  • reserved-1 -- 短的

  • destination -- Short str指定要绑定的目标交换的名称。规则:不得允许客户端绑定不存在的目标Exchange。规则:服务器必须接受空白的交换名称以表示默认交换。

  • source -- Short str指定要绑定的源交换的名称。规则:不得允许客户端绑定不存在的源交换。规则:服务器必须接受空白的交换名称以表示默认交换。

  • routing-key -- Shortstr指定绑定的路由关键字。路由密钥用于根据Exchange配置来路由邮件。并非所有交换机都使用路由密钥-请参阅特定的交换机文档。

  • no-wait -- 比特

  • arguments -- 表绑定的一组参数。这些参数的语法和语义取决于交换类。

exchange_declare(exchange, type, passive=False, durable=False, auto_delete=True, nowait=False, arguments=None, argsig='BssbbbbbF')

声明交换,如果需要则创建。

如果该交换尚不存在,则此方法创建该交换,如果该交换存在,则验证它是否属于正确和预期的类。

规则:

服务器应支持每个虚拟主机至少16个交换,并且理想情况下,除非由可用资源定义,否则不施加任何限制。

参数:
  • exchange -- 短字符串规则:交换名称以“AMQ”开头。预留给预先申报和标准化的交易所。如果客户端尝试创建以“amq.”开头的交换,则服务器必须引发带有回复代码403(拒绝访问)的通道异常。

  • type -- 短字符串交换类型每个交换属于由服务器实现一组交换类型之一。交换类型定义了交换的功能,即消息如何通过它进行路由。尝试更改现有交换的类型是无效的,也没有意义。规则:如果已存在不同类型的交换,则服务器必须引发连接异常,并返回回复代码507(不允许)。规则:如果服务器不支持请求的交换类型,则必须引发连接异常,并返回回复代码503(命令无效)。

  • passive -- 布尔值不创建交换如果设置,服务器将不创建交换。客户端可以使用它来检查是否存在交换,而无需修改服务器状态。规则:如果已设置,并且交换尚不存在,则服务器必须引发通道异常,回复代码为404(未找到)。

  • durable -- 布尔请求持久交换如果在创建新交换时设置,则该交换将被标记为持久交换。服务器重新启动时,持久交换保持活动状态。如果/当服务器重新启动时,非持久交换(瞬时交换)将被清除。规则:服务器必须同时支持持久交换和临时交换。规则:如果交换已存在,则服务器必须忽略持久字段。

  • auto_delete -- 布尔自动删除如果设置为未使用,则在所有队列使用完交换后将其删除。规则:服务器应允许在确定某个交换未被使用(或不再使用)和删除该交换之间有一段合理的延迟。它至少必须允许客户端创建一个交换,然后将一个队列绑定到它,在这两个操作之间有一个很小但不是零的延迟。规则:如果交换已存在,服务器必须忽略自动删除字段。

  • nowait -- 布尔值不发送回复方法如果设置,服务器将不会响应该方法。客户端不应等待回复方法。如果服务器无法完成该方法,它将引发通道或连接异常。

  • arguments -- 声明的表参数值一组用于声明的参数值。这些参数的语法和语义取决于服务器实现。如果被动为True,则忽略此字段。

exchange_delete(exchange, if_unused=False, nowait=False, argsig='Bsbb')

删除交换。

此方法删除交换。删除交换时,该交换上的所有队列绑定都将取消。

参数:
  • exchange -- 短规则:交易所必须存在。尝试删除不存在的交换会导致通道异常。

  • if_unused -- 仅布尔删除如果设置为未使用,则服务器将仅在没有队列绑定的情况下删除交换。如果交换具有队列绑定,则服务器不会删除它,而是引发通道异常。规则:如果设置,则服务器应删除该交换,但前提是该交换没有队列绑定。规则:如果设置,则在Exchange正在使用时,服务器应引发通道异常。

  • nowait -- 布尔值不发送回复方法如果设置,服务器将不会响应该方法。客户端不应等待回复方法。如果服务器无法完成该方法,它将引发通道或连接异常。

exchange_unbind(destination, source='', routing_key='', nowait=False, arguments=None, argsig='BsssbF')

解除交易所与交易所的绑定。

规则:

如果解除绑定失败,服务器必须引发连接异常。

参数:
  • reserved-1 -- 短的

  • destination -- Short str指定要解除绑定的目标交换的名称。规则:客户端不得尝试从不存在的Exchange解除绑定。规则:服务器必须接受空白的交换名称以表示默认交换。

  • source -- Short str指定要解除绑定的源交换的名称。规则:客户端不得尝试从不存在的交换解除绑定。规则:服务器必须接受空白的交换名称以表示默认交换。

  • routing-key -- Short str指定要解除绑定的绑定的路由键。

  • no-wait -- 比特

  • arguments -- 表指定要解除绑定的绑定的参数。

flow(active)

启用/禁用来自对等设备的流量。

此方法要求对等方暂停或重新启动内容数据流。这是一种简单的流量控制机制,对等方可以使用它来避免使其队列溢出或发现自己接收的消息多于其可以处理的消息。请注意,此方法不适用于窗口控件。接收到停止发送内容的请求的对等体应该完成当前内容的发送(如果有的话),然后等待,直到它接收到流重新启动方法。

规则:

当打开新通道时,该通道处于活动状态。一些应用程序假定通道在启动之前处于非活动状态。为了模拟这种行为,客户端可以打开通道,然后暂停它。

规则:

在多个帧中发送内容数据时,对等项应监控通道中的传入方法并响应通道。尽可能快速地流动。

规则:

对等方可能出于内部原因使用Channel.Flow方法来限制传入的内容数据,例如,在通过较慢的连接交换数据时。

规则:

请求Channel.Flow方法的对等方可能会断开连接和/或禁止不尊重该请求的对等方。

参数:

active -- 布尔开始/停止内容帧如果为True,则对等方开始发送内容帧。如果为False,则对等方停止发送内容帧。

get_bindings()
is_closing
message_to_python(raw_message)

将编码的消息正文转换回Python值。

method_queue
no_ack_consumers = None
open()

打开一条通道以供使用。

此方法打开一个虚拟连接(通道)。

规则:

当通道已打开时,不得调用此方法。

参数:

out_of_band -- Shorstr(不推荐使用)带外设置在此通道上配置带外传输。此字段的语法和含义将在以后正式定义。

prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None, _Message=<class 'amqp.basic_message.Message'>)

准备邮件,以便可以使用此传输发送邮件。

prepare_queue_arguments(arguments, **kwargs)
queue_bind(queue, exchange='', routing_key='', nowait=False, arguments=None, argsig='BsssbF')

将队列绑定到交换。

此方法将队列绑定到交换。在绑定队列之前,它不会接收任何消息。在经典消息传递模型中,存储转发队列绑定到DEST交换,订阅队列绑定到DEST_WARD交换。

规则:

服务器必须允许忽略重复绑定--即特定队列的两个或多个具有相同参数的绑定方法--而不将它们视为错误。

规则:

如果绑定失败,服务器必须引发连接异常。

规则:

服务器不得允许持久队列绑定到临时交换。如果客户端尝试这样做,服务器必须引发通道异常。

规则:

持久队列的绑定是自动持久的,服务器应在服务器重新启动后恢复此类绑定。

规则:

服务器应支持每个队列至少4个绑定,理想情况下,除非由可用资源定义,否则不施加任何限制。

参数:
  • queue -- Short str指定要绑定的队列的名称。如果队列名称为空,则表示通道的当前队列,这是最后声明的队列。规则:如果客户端之前没有声明队列,并且此方法中的队列名称为空,则服务器必须引发连接异常,回复代码为530(不允许)。规则:如果队列不存在,服务器必须引发通道异常,回复代码为404(未找到)。

  • exchange -- 短字符串要绑定到的交换的名称。规则:如果交换不存在,服务器必须引发通道异常,回复代码为404(未找到)。

  • routing_key -- Shortstr消息路由键指定绑定的路由键。路由密钥用于根据Exchange配置来路由邮件。并非所有交换机都使用路由密钥-请参阅特定的交换机文档。如果路由键为空,并且队列名称为空,则路由键将是该通道的当前队列,这是最后声明的队列。

  • nowait -- 布尔值不发送回复方法如果设置,服务器将不会响应该方法。客户端不应等待回复方法。如果服务器无法完成该方法,它将引发通道或连接异常。

  • arguments -- 用于绑定的表参数绑定的一组参数。这些参数的语法和语义取决于交换类。

queue_declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None, argsig='BsbbbbbF')

声明队列,如果需要则创建。

此方法创建或检查队列。在创建新队列时,客户端可以指定各种属性来控制队列及其内容的持久性,以及队列的共享级别。

规则:

服务器必须为新创建的队列创建到缺省交换的缺省绑定,缺省交换是‘DIRECT’类型的交换。

规则:

服务器应支持每个虚拟主机至少256个队列,理想情况下,除非由可用资源定义,否则不施加任何限制。

参数:
  • queue -- Short str规则:队列名称可以为空,在这种情况下,服务器必须使用唯一生成的名称创建一个新队列,并在Declare-OK方法中将其返回给客户端。规则:队列名称以“AMQ”开头。为预先声明的和标准化的服务器队列保留。如果队列名称以“amq”开头。并且被动选项为假,则服务器必须引发连接异常,回复代码为403(拒绝访问)。

  • passive -- 布尔值不创建队列如果设置,服务器将不创建队列。客户端可以使用它来检查队列是否存在,而无需修改服务器状态。规则:如果设置,并且队列不存在,则服务器必须使用回复代码404(未找到)进行响应,并引发通道异常。

  • durable -- 布尔请求持久队列如果在创建新队列时设置,该队列将被标记为持久队列。服务器重新启动时,持久队列保持活动状态。如果/当服务器重新启动时,非持久队列(临时队列)将被清除。请注意,持久队列不一定保存持久消息,尽管将持久消息发送到临时队列没有意义。规则:服务器必须在重新启动后重新创建持久队列。规则:服务器必须同时支持持久队列和临时队列。规则:如果队列已经存在,服务器必须忽略耐久字段。

  • exclusive -- 布尔请求独占队列独占队列只能由当前连接使用。设置‘EXCLUSIVE’标志总是意味着‘自动删除’。规则:服务器必须同时支持独占(私有)和非独占(共享)队列。规则:如果指定了‘EXCLUSIVE’,并且队列已经存在并且由不同的连接拥有,则服务器必须引发通道异常。

  • auto_delete -- 布尔自动删除队列如果设置为未使用,则在所有使用者使用完队列后将其删除。最后一个消费者可以明确取消,也可以因为其渠道关闭而取消。如果队列中没有消费者,则不会将其删除。规则:服务器应允许在确定队列未被使用(或不再使用)和删除队列之间有一段合理的延迟。至少,它必须允许客户端创建一个队列,然后创建一个从队列中读取数据的使用者,在这两个操作之间有一个很小但不为零的延迟。服务器应该同样允许可能过早断开连接的客户端,并且希望在不丢失消息的情况下从同一队列重新消费。我们建议使用可配置的超时,合适的缺省值为一分钟。规则:如果队列已经存在,服务器必须忽略自动删除字段。

  • nowait -- 布尔值不发送回复方法如果设置,服务器将不会响应该方法。客户端不应等待回复方法。如果服务器无法完成该方法,它将引发通道或连接异常。

  • arguments -- 声明的表参数值一组用于声明的参数值。这些参数的语法和语义取决于服务器实现。如果被动为True,则忽略此字段。

返回包含3项的元组:

队列的名称(对于自动命名的队列至关重要)、消息计数和使用者计数

queue_delete(queue='', if_unused=False, if_empty=False, nowait=False, argsig='Bsbbb')

删除队列。

此方法删除队列。删除队列时,如果在服务器配置中定义了死信队列,则任何挂起的消息都将发送到死信队列,并且该队列上的所有使用者都将被取消。

规则:

服务器应该使用死信队列来保存在已删除队列上挂起的消息,并且可以为系统管理员提供将这些消息移回活动队列的工具。

参数:
  • queue -- Shorstr指定要删除的队列的名称。如果队列名称为空,则表示通道的当前队列,这是最后声明的队列。规则:如果客户端之前没有声明队列,并且此方法中的队列名称为空,则服务器必须引发连接异常,回复代码为530(不允许)。规则:队列必须存在。尝试删除不存在的队列会导致通道异常。

  • if_unused -- 仅当未使用时才删除布尔值如果设置,则服务器仅在没有使用者的情况下删除队列。如果队列有使用者,服务器不会删除它,而是引发通道异常。规则:删除队列时,服务器必须遵守IF-UNUSED标志。

  • if_empty -- 仅布尔值DELETE如果设置为空,则服务器仅在队列没有消息时删除该队列。如果队列不为空,则服务器引发通道异常。

  • nowait -- 布尔值不发送回复方法如果设置,服务器将不会响应该方法。客户端不应等待回复方法。如果服务器无法完成该方法,它将引发通道或连接异常。

如果noWait为False,则返回已删除的消息数。

queue_purge(queue='', nowait=False, argsig='Bsb')

清除队列。

此方法从队列中移除所有消息。它不会取消消费者。清除的邮件将被删除,而不会有任何正式的“撤消”机制。

规则:

调用清除必须导致空队列。

规则:

在事务处理的通道上,服务器不得清除已发送到客户端但尚未确认的消息。

规则:

服务器可以实现清除队列或日志,以允许系统管理员恢复意外清除的消息。服务器不应将清除的邮件保存在与实时邮件相同的存储空间中,因为清除的邮件的数量可能会非常大。

参数:
  • queue -- Shorstr指定要清除的队列的名称。如果队列名称为空,则表示通道的当前队列,这是最后声明的队列。规则:如果客户端之前没有声明队列,并且此方法中的队列名称为空,则服务器必须引发连接异常,回复代码为530(不允许)。规则:队列必须存在。尝试清除不存在的队列会导致通道异常。

  • nowait -- 布尔值不发送回复方法如果设置,服务器将不会响应该方法。客户端不应等待回复方法。如果服务器无法完成该方法,它将引发通道或连接异常。

如果noWait为False,则返回多条已清除的消息。

queue_unbind(queue, exchange, routing_key='', nowait=False, arguments=None, argsig='BsssF')

解除队列与交换的绑定。

此方法解除队列与交换的绑定。

规则:

如果解除绑定失败,服务器必须引发连接异常。

参数:
  • queue -- Short str指定要解除绑定的队列的名称。规则:客户端必须指定队列名称或先前已在同一通道规则上声明队列:客户端不得尝试解除绑定不存在的队列。

  • exchange -- 短字符串要解除绑定的交换的名称。规则:客户端不得尝试将队列与不存在的交换解除绑定。规则:服务器必须接受空白的交换名称以表示默认交换。

  • routing_key -- 绑定的Shorstr路由键指定要解绑的绑定的路由键。

  • arguments -- 绑定的表参数指定要解除绑定的绑定的参数。

send_method(sig, format=None, args=None, content=None, wait=None, callback=None, returns_tuple=False)
then(on_success, on_error=None)
tx_commit()

提交当前事务。

此方法提交当前事务中发布和确认的所有消息。新事务在提交后立即开始。

tx_rollback()

放弃当前交易。

此方法将放弃当前事务中发布和确认的所有消息。新事务在回滚后立即启动。

tx_select()

选择标准交易模式。

此方法将通道设置为使用标准事务。在使用COMMIT或ROLLBACK方法之前,客户端必须在通道上至少使用此方法一次。

wait(method, callback=None, timeout=None, returns_tuple=False)
Transport(host, connect_timeout, ssl=False, read_timeout=None, write_timeout=None, socket_settings=None, **kwargs)[源代码]
auto_decode
blocking_read(timeout=None)[源代码]
bytes_recv = 0

从套接字成功读取的次数。

bytes_sent = 0

成功写入套接字的次数。

channel(channel_id=None, callback=None)[源代码]

创建新频道。

获取一个由数字Channel_id标识的Channel对象,如果该对象尚不存在,则创建该对象。

channel_errors = (<class 'amqp.exceptions.ChannelError'>,)
channel_id
client_heartbeat = None

客户端建议的原始心跳间隔值。

close(reply_code=0, reply_text='', method_sig=(0, 0), argsig='BsBB')[源代码]

请求关闭连接。

此方法指示发件人要关闭连接。这可能是由于内部条件(例如强制关闭)或由于处理特定方法(即异常)时出错造成的。如果关闭是由异常引起的,则发送方提供导致异常的方法的类和方法ID。

规则:

发送此方法后,必须丢弃除Close-OK方法之外的任何接收到的方法。

规则:

发送该方法的对等体可以使用计数器或超时来检测另一个对等体未能用Close-OK方法正确响应。

规则:

当服务器从客户端接收Close方法时,它必须删除与客户端上下文相关联的所有服务器端资源。客户端在发送或接收Close方法后无法重新连接到上下文。

参数:
  • reply_code -- 缩短回复代码。AMQ应答码在AMQ RFC 011中定义。

  • reply_text -- 缩写本地化回复文本。可以将此文本记录为解决问题的帮助。

  • class_id -- 短暂失败的方法类当关闭由方法异常引发时,这是该方法的类。

  • method_id -- 短失败方法ID当关闭由方法异常引发时,这是该方法的ID。

collect()[源代码]
connect(callback=None)[源代码]
property connected
connection
connection_errors = (<class 'amqp.exceptions.ConnectionError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>)
dispatch_method(method_sig, payload, content)
drain_events(timeout=None)[源代码]
property frame_writer
heartbeat = None

协商后的最终心跳间隔值(浮点秒)

heartbeat_tick(rate=2)[源代码]

如有必要,发送心跳数据包。

抛出:

ConnectionForvced -- 如果最近没有收到任何邮件。

备注

这应该被频繁地调用,大约每秒一次。

关键字参数:

rate (int) -- 以前使用过,但现在被忽略了。

is_alive()[源代码]
is_closing
last_heartbeat_received = 0

上次收到心跳的时间(如果可用,以单调时间表示)。

last_heartbeat_sent = 0

上次发送心跳的时间(如果可用,以单调时间表示)。

library_properties = {'product': 'py-amqp', 'product_version': '5.1.1'}

这些信息被发送到服务器,以宣布我们支持哪些功能、客户端类型等。

method_queue
negotiate_capabilities = {'authentication_failure_close': True, 'connection.blocked': True, 'consumer_cancel_notify': True}

要启用的协议扩展映射。服务器将在SERVER_PROPERTIES中报告这些信息 [capabilities] ,如果此映射中存在密钥,则客户端将根据此映射中设置的值通知服务器启用或禁用该功能。例如,使用:

协商能力={

‘Consumer_Cancel_Notify’:True,

}

如果服务器报告支持此功能,则客户端将启用此功能,但如果值为FALSE,则客户端将禁用此功能。

property on_inbound_frame
on_inbound_method(channel_id, method_sig, payload, content)[源代码]
prev_recv = None

上次检测信号检查时从套接字接收的字节数。

prev_sent = None

上次检测信号检查时发送到套接字的字节数。

recoverable_channel_errors = (<class 'amqp.exceptions.RecoverableChannelError'>,)
recoverable_connection_errors = (<class 'amqp.exceptions.RecoverableConnectionError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>)
send_heartbeat()[源代码]
send_method(sig, format=None, args=None, content=None, wait=None, callback=None, returns_tuple=False)
property server_capabilities
server_heartbeat = None

服务器建议的原始心跳间隔。

property sock
then(on_success, on_error=None)[源代码]
property transport
wait(method, callback=None, timeout=None, returns_tuple=False)

渠道

class kombu.transport.pyamqp.Channel(connection, channel_id=None, auto_decode=True, on_open=None)[源代码]

AMQP频道。

class Message(msg, channel=None, **kwargs)

AMQP消息。

accept
body
channel
content_encoding
content_type
delivery_info
delivery_tag
headers
properties
auto_decode
channel_id
connection
is_closing
message_to_python(raw_message)[源代码]

将编码的消息正文转换回Python值。

method_queue
prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None, _Message=<class 'amqp.basic_message.Message'>)[源代码]

准备邮件,以便可以使用此传输发送邮件。

prepare_queue_arguments(arguments, **kwargs)[源代码]

消息

class kombu.transport.pyamqp.Message(msg, channel=None, **kwargs)[源代码]

AMQP消息。

accept
body
channel
content_encoding
content_type
delivery_info
delivery_tag
headers
properties