This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

连接- kombu.connection

客户端(连接)。

连接

class kombu.connection.Connection(hostname='localhost', userid=None, password=None, virtual_host=None, port=None, insist=False, ssl=False, transport=None, connect_timeout=5, transport_options=None, login_method=None, uri_prefix=None, heartbeat=0, failover_strategy='round-robin', alternates=None, **kwargs)[源代码]

与经纪人的联系。

示例:

>>> Connection('amqp://guest:guest@localhost:5672//')
>>> Connection('amqp://foo;amqp://bar',
...            failover_strategy='round-robin')
>>> Connection('redis://', transport_options={
...     'visibility_timeout': 3000,
... })
>>> import ssl
>>> Connection('amqp://', login_method='EXTERNAL', ssl={
...    'ca_certs': '/etc/pki/tls/certs/something.crt',
...    'keyfile': '/etc/something/system.key',
...    'certfile': '/etc/something/system.cert',
...    'cert_reqs': ssl.CERT_REQUIRED,
... })

注:

目前,SSL仅适用于Py-AMQP和QPID传输。对于其他交通工具,您可以使用stantnel。

论点:

URL(字符串,序列):代理URL,或URL列表。

关键字参数:

SSL(bool/dict):使用SSL连接到服务器。

缺省值为 False 。可能不受指定传输的支持。

传输(Transport):如果未在URL中指定,则为默认传输。CONNECT_TIMEOUT(FLOAT):连接到

伺服器。可能不受指定传输的支持。

TRANSPORTS_OPTIONS(DICT):附加连接参数的DICT

传递到备用Kombu通道实现。有关可用选项,请查阅运输文档。

心跳(Float):心跳间隔,单位为Int/Float秒。

请注意,如果启用了心跳,则 heartbeat_check() 方法必须定期调用,大约每秒调用一次。

注:

当需要时,连接被延迟地建立。如果需要建立连接,则通过调用 connect() **

>>> conn = Connection('amqp://')
>>> conn.connect()

并始终记住关闭连接::

>>> conn.release()

这些选项已被URL参数替换,但仍支持向后兼容:

关键字主机名:

主机名/地址。注意:不能同时指定URL参数和使用HOSTNAME关键字参数。

关键字用户ID:

默认用户名(如果URL中未提供)。

关键字密码:

默认密码(如果URL中未提供)。

关键字VALUAL_HOST:

默认虚拟主机(如果URL中未提供)。

关键字端口:

默认端口(如果URL中未提供)。

ChannelPool(limit=None, **kwargs)[源代码]

频道池。

论点:

Limit(Int):活动通道的最大数量。

默认设置为无限制。

示例:

>>> connection = Connection('amqp://')
>>> pool = connection.ChannelPool(2)
>>> c1 = pool.acquire()
>>> c2 = pool.acquire()
>>> c3 = pool.acquire()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "kombu/connection.py", line 354, in acquire
  raise ChannelLimitExceeded(self.limit)
    kombu.connection.ChannelLimitExceeded: 2
>>> c1.release()
>>> c3 = pool.acquire()
Consumer(queues=None, channel=None, *args, **kwargs)[源代码]

创造新 kombu.Consumer 举个例子。

Pool(limit=None, **kwargs)[源代码]

连接池。

论点:

Limit(Int):最大活动连接数。

默认设置为无限制。

示例:

>>> connection = Connection('amqp://')
>>> pool = connection.Pool(2)
>>> c1 = pool.acquire()
>>> c2 = pool.acquire()
>>> c3 = pool.acquire()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "kombu/connection.py", line 354, in acquire
  raise ConnectionLimitExceeded(self.limit)
    kombu.exceptions.ConnectionLimitExceeded: 2
>>> c1.release()
>>> c3 = pool.acquire()
Producer(channel=None, *args, **kwargs)[源代码]

创造新 kombu.Producer 举个例子。

SimpleBuffer(name, no_ack=None, queue_opts=None, queue_args=None, exchange_opts=None, channel=None, **kwargs)[源代码]

简单的临时队列API。

创造新 SimpleQueue 使用来自此连接的通道。

SimpleQueue(name, no_ack=None, queue_opts=None, queue_args=None, exchange_opts=None, channel=None, **kwargs)[源代码]

简单的持久队列API。

创造新 SimpleQueue ,使用来自该连接的通道。

如果 name 是一个字符串,则将使用该名称作为队列和交换的名称自动创建队列和交换,并将其用作默认路由键。

论点:

Name(str,kombu.Queue):队列的名称。No_ack(Bool):禁用确认。默认值为FALSE。Queue_opts(Dict):传递给

自动创建的 Queue

Queue_args(Dict):传递给

自动创建的 Queue 用于设置实现扩展(例如在RabbitMQ中)。

Exchange_OPTS(Dict):传递给

自动创建的 Exchange

Channel(Channel):要使用的自定义频道。如果未指定,则

使用连接默认通道。

as_uri(include_password=False, mask='**', getfields=operator.itemgetter('port', 'userid', 'password', 'virtual_host', 'transport')) str[源代码]

将连接参数转换为URL形式。

autoretry(fun, channel=None, **ensure_options)[源代码]

支持的函数的装饰符 channel 关键字参数。

如果生成的可调用函数引发连接或通道相关错误,则它将重试调用该函数。返回值将是 (retval, last_created_channel)

如果一个 channel 如果没有提供,则会自动获取一个(记得在之后关闭它)。

示例:

>>> channel = connection.channel()
>>> try:
...    ret, channel = connection.autoretry(
...         publish_messages, channel)
... finally:
...    channel.close()
channel()[源代码]

创建并返回一个新频道。

property channel_errors

通道可能引发的异常列表。

clone(**kwargs)[源代码]

使用相同的设置创建连接的副本。

close()

关闭连接(如果打开)。

collect(socket_timeout=None)[源代码]
completes_cycle(retries)[源代码]

如果循环在以下次数后完成,则返回TRUE retries

connect()[源代码]

立即建立与服务器的连接。

connect_timeout = 5
property connected

如果已建立连接,则返回TRUE。

property connection

基础连接对象。

警告:

该实例是特定于传输的,因此不依赖于该对象的接口。

property connection_errors

连接可能引发的异常列表。

create_transport()[源代码]
cycle = None

迭代器返回在连接失败时尝试的下一个代理URL(由 failover_strategy )。

declared_entities = None

声明实体的缓存是针对每个连接的,以防服务器丢失数据。

property default_channel: Channel

默认频道。

在访问时创建,并在连接关闭时关闭。

注:

当您只需要一个通道时,可以用于自动通道处理,如果将连接而不是通道传递给需要通道的函数,则它也是隐式使用的通道。

drain_events(**kwargs)[源代码]

等待来自服务器的单个事件。

论点:

Timeout(Float):放弃前的超时时间(秒)。

引发socket.timeout:

如果超过超时。:

ensure(obj, fun, errback=None, max_retries=None, interval_start=1, interval_step=1, interval_max=1, on_revive=None, retry_errors=None)[源代码]

确保操作完成。

而不管发生任何通道/连接错误。

通过建立连接并重新应用函数来重试。

论点:

OBJ:确保对其执行操作的对象。Fun(Callable):要应用的方法。

Errback(Callable):可选的回调

无法建立连接。提供的参数是引发的异常和将休眠的间隔 (exc, interval)

MAX_RETRIES(Int):重试的最大次数。

如果超过此限制,将重新引发连接错误。

INTERVAL_START(FLOAT):我们开始的秒数

睡了一觉。

INTERVAL_STEP(FLOAT):间隔增加了多少秒

每一次重试。

Interval_max(浮点数):休眠间隔的最大秒数

每次重试。

On_revive(Callable):可选的回调

恢复成功完成

RETRY_ERROR(元组):要重试的可选错误列表

而不管连接状态如何。

示例

>>> from kombu import Connection, Producer
>>> conn = Connection('amqp://')
>>> producer = Producer(conn)
>>> def errback(exc, interval):
...     logger.error('Error: %r', exc, exc_info=1)
...     logger.info('Retry in %s seconds.', interval)
>>> publish = conn.ensure(producer, producer.publish,
...                       errback=errback, max_retries=3)
>>> publish({'hello': 'world'}, routing_key='dest')
ensure_connection(*args, **kwargs)[源代码]

用于追溯兼容性的公共接口_Assue_Connection。

返回kombu.Connection实例。

failover_strategies = {'round-robin': <class 'itertools.cycle'>, 'shuffle': <function shufflecycle>}
failover_strategy = 'round-robin'

用于在连接失败后重新连接时选择新主机的策略。“循环”、“洗牌”或任何定制迭代器之一,不断产生新的URL以供尝试。

get_heartbeat_interval()[源代码]
get_manager(*args, **kwargs)[源代码]
get_transport_cls()[源代码]

获取当前使用的传输类。

heartbeat = None

心跳数值,目前仅受Py-AMQP传输支持。

heartbeat_check(rate=2)[源代码]

检查一下心跳。

允许传输器执行使心跳工作所需的任何周期性任务。这应该大约每秒都被调用。

如果当前传输不支持心跳,则这是noop操作。

论点:

Rate(Int):rate是调用记号的频率

与实际的心跳值进行比较。例如,如果心跳设置为3秒,并且每隔3/2秒调用一次滴答,则速率为2。该值当前未被任何传输使用。

property host

以冒号分隔的主机名/端口对形式的主机。

hostname = None
info()[源代码]

获取连接信息。

property is_evented
login_method = None
property manager

AMQP管理API。

试验性管理器,可用于管理/监视Broker实例。

并非所有运输工具都可用。

maybe_close_channel(channel)[源代码]

关闭给定的通道,但忽略连接和通道错误。

maybe_switch_next()[源代码]

切换到当前故障转移策略指定的下一个URL。

password = None
port = None
property qos_semantics_matches_spec
property recoverable_channel_errors

可恢复的通道错误。

无需重新建立连接即可自动恢复的与通道相关的异常列表。

property recoverable_connection_errors

可恢复的连接错误。

可以从中恢复但必须首先关闭并重新建立连接的与连接相关的异常的列表。

register_with_event_loop(loop)[源代码]
release()[源代码]

关闭连接(如果打开)。

resolve_aliases = {'librabbitmq': 'amqp', 'pyamqp': 'amqp'}
revive(new_channel)[源代码]

在重新建立连接后恢复连接。

ssl = None
supports_exchange_type(exchange_type)[源代码]
property supports_heartbeats
switch(conn_str)[源代码]

切换连接参数以使用新的URL或主机名。

注:

不会重新连接!

论点:

Conn_str(Str):主机名或URL。

property transport
transport_options = None

其他特定于传输的选项,传递给传输实例。

uri_prefix = None
userid = None
virtual_host = '/'

泳池

参见

捷径方法 Connection.Pool()Connection.ChannelPool() 是实例化这些类的推荐方式。

class kombu.connection.ConnectionPool(connection, limit=None, **kwargs)[源代码]

连接池。

LimitExceeded = <class 'kombu.exceptions.ConnectionLimitExceeded'>
acquire(block=False, timeout=None)

获取资源。

论点:

BLOCK(Bool):如果超过限制,

然后阻止,直到有可用的项目。

超时(浮点数):等待超时

如果 block 是真的。缺省值为 None (永远)。

提升LimitExceed:

IF BLOCK为FALSE且已超过限制。:

release(resource)
force_close_all()

关闭并删除池中的所有资源(包括正在使用的资源)。

用于在分叉之后关闭父进程的资源(例如套接字/连接)。

class kombu.connection.ChannelPool(connection, limit=None, **kwargs)[源代码]

频道池。

LimitExceeded = <class 'kombu.exceptions.ChannelLimitExceeded'>
acquire(block=False, timeout=None)

获取资源。

论点:

BLOCK(Bool):如果超过限制,

然后阻止,直到有可用的项目。

超时(浮点数):等待超时

如果 block 是真的。缺省值为 None (永远)。

提升LimitExceed:

IF BLOCK为FALSE且已超过限制。:

release(resource)
force_close_all()

关闭并删除池中的所有资源(包括正在使用的资源)。

用于在分叉之后关闭父进程的资源(例如套接字/连接)。