This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
连接- kombu.connection
¶
客户端(连接)。
连接¶
- class kombu.connection.Connection(hostname='localhost', userid=None, password=None, virtual_host=None, port=None, insist=False, ssl=False, transport=None, connect_timeout=5, transport_options=None, login_method=None, uri_prefix=None, heartbeat=0, failover_strategy='round-robin', alternates=None, **kwargs)[源代码]¶
与经纪人的联系。
示例:¶
>>> Connection('amqp://guest:guest@localhost:5672//') >>> Connection('amqp://foo;amqp://bar', ... failover_strategy='round-robin') >>> Connection('redis://', transport_options={ ... 'visibility_timeout': 3000, ... })
>>> import ssl >>> Connection('amqp://', login_method='EXTERNAL', ssl={ ... 'ca_certs': '/etc/pki/tls/certs/something.crt', ... 'keyfile': '/etc/something/system.key', ... 'certfile': '/etc/something/system.cert', ... 'cert_reqs': ssl.CERT_REQUIRED, ... })
注:¶
目前,SSL仅适用于Py-AMQP和QPID传输。对于其他交通工具,您可以使用stantnel。
论点:¶
URL(字符串,序列):代理URL,或URL列表。
关键字参数:¶
- SSL(bool/dict):使用SSL连接到服务器。
缺省值为
False
。可能不受指定传输的支持。
传输(Transport):如果未在URL中指定,则为默认传输。CONNECT_TIMEOUT(FLOAT):连接到
伺服器。可能不受指定传输的支持。
- TRANSPORTS_OPTIONS(DICT):附加连接参数的DICT
传递到备用Kombu通道实现。有关可用选项,请查阅运输文档。
- 心跳(Float):心跳间隔,单位为Int/Float秒。
请注意,如果启用了心跳,则
heartbeat_check()
方法必须定期调用,大约每秒调用一次。
注:¶
当需要时,连接被延迟地建立。如果需要建立连接,则通过调用
connect()
**>>> conn = Connection('amqp://') >>> conn.connect()
并始终记住关闭连接::
>>> conn.release()
这些选项已被URL参数替换,但仍支持向后兼容:
- 关键字主机名:
主机名/地址。注意:不能同时指定URL参数和使用HOSTNAME关键字参数。
- 关键字用户ID:
默认用户名(如果URL中未提供)。
- 关键字密码:
默认密码(如果URL中未提供)。
- 关键字VALUAL_HOST:
默认虚拟主机(如果URL中未提供)。
- 关键字端口:
默认端口(如果URL中未提供)。
- ChannelPool(limit=None, **kwargs)[源代码]¶
频道池。
论点:¶
- Limit(Int):活动通道的最大数量。
默认设置为无限制。
示例:¶
>>> connection = Connection('amqp://') >>> pool = connection.ChannelPool(2) >>> c1 = pool.acquire() >>> c2 = pool.acquire() >>> c3 = pool.acquire() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "kombu/connection.py", line 354, in acquire raise ChannelLimitExceeded(self.limit) kombu.connection.ChannelLimitExceeded: 2 >>> c1.release() >>> c3 = pool.acquire()
- Consumer(queues=None, channel=None, *args, **kwargs)[源代码]¶
创造新
kombu.Consumer
举个例子。
- Pool(limit=None, **kwargs)[源代码]¶
连接池。
论点:¶
- Limit(Int):最大活动连接数。
默认设置为无限制。
示例:¶
>>> connection = Connection('amqp://') >>> pool = connection.Pool(2) >>> c1 = pool.acquire() >>> c2 = pool.acquire() >>> c3 = pool.acquire() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "kombu/connection.py", line 354, in acquire raise ConnectionLimitExceeded(self.limit) kombu.exceptions.ConnectionLimitExceeded: 2 >>> c1.release() >>> c3 = pool.acquire()
- Producer(channel=None, *args, **kwargs)[源代码]¶
创造新
kombu.Producer
举个例子。
- SimpleBuffer(name, no_ack=None, queue_opts=None, queue_args=None, exchange_opts=None, channel=None, **kwargs)[源代码]¶
简单的临时队列API。
创造新
SimpleQueue
使用来自此连接的通道。
- SimpleQueue(name, no_ack=None, queue_opts=None, queue_args=None, exchange_opts=None, channel=None, **kwargs)[源代码]¶
简单的持久队列API。
创造新
SimpleQueue
,使用来自该连接的通道。如果
name
是一个字符串,则将使用该名称作为队列和交换的名称自动创建队列和交换,并将其用作默认路由键。论点:¶
- as_uri(include_password=False, mask='**', getfields=operator.itemgetter('port', 'userid', 'password', 'virtual_host', 'transport')) str [源代码]¶
将连接参数转换为URL形式。
- autoretry(fun, channel=None, **ensure_options)[源代码]¶
支持的函数的装饰符
channel
关键字参数。如果生成的可调用函数引发连接或通道相关错误,则它将重试调用该函数。返回值将是
(retval, last_created_channel)
。如果一个
channel
如果没有提供,则会自动获取一个(记得在之后关闭它)。示例:¶
>>> channel = connection.channel() >>> try: ... ret, channel = connection.autoretry( ... publish_messages, channel) ... finally: ... channel.close()
- property channel_errors¶
通道可能引发的异常列表。
- close()¶
关闭连接(如果打开)。
- connect_timeout = 5¶
- property connected¶
如果已建立连接,则返回TRUE。
- property connection_errors¶
连接可能引发的异常列表。
- cycle = None¶
迭代器返回在连接失败时尝试的下一个代理URL(由
failover_strategy
)。
- declared_entities = None¶
声明实体的缓存是针对每个连接的,以防服务器丢失数据。
- property default_channel: Channel¶
默认频道。
在访问时创建,并在连接关闭时关闭。
注:¶
当您只需要一个通道时,可以用于自动通道处理,如果将连接而不是通道传递给需要通道的函数,则它也是隐式使用的通道。
- drain_events(**kwargs)[源代码]¶
等待来自服务器的单个事件。
论点:¶
Timeout(Float):放弃前的超时时间(秒)。
- 引发socket.timeout:
如果超过超时。:
- ensure(obj, fun, errback=None, max_retries=None, interval_start=1, interval_step=1, interval_max=1, on_revive=None, retry_errors=None)[源代码]¶
确保操作完成。
而不管发生任何通道/连接错误。
通过建立连接并重新应用函数来重试。
论点:¶
OBJ:确保对其执行操作的对象。Fun(Callable):要应用的方法。
- Errback(Callable):可选的回调
无法建立连接。提供的参数是引发的异常和将休眠的间隔
(exc, interval)
。- MAX_RETRIES(Int):重试的最大次数。
如果超过此限制,将重新引发连接错误。
- INTERVAL_START(FLOAT):我们开始的秒数
睡了一觉。
- INTERVAL_STEP(FLOAT):间隔增加了多少秒
每一次重试。
- Interval_max(浮点数):休眠间隔的最大秒数
每次重试。
- On_revive(Callable):可选的回调
恢复成功完成
- RETRY_ERROR(元组):要重试的可选错误列表
而不管连接状态如何。
示例
>>> from kombu import Connection, Producer >>> conn = Connection('amqp://') >>> producer = Producer(conn)
>>> def errback(exc, interval): ... logger.error('Error: %r', exc, exc_info=1) ... logger.info('Retry in %s seconds.', interval)
>>> publish = conn.ensure(producer, producer.publish, ... errback=errback, max_retries=3) >>> publish({'hello': 'world'}, routing_key='dest')
- failover_strategies = {'round-robin': <class 'itertools.cycle'>, 'shuffle': <function shufflecycle>}¶
- failover_strategy = 'round-robin'¶
用于在连接失败后重新连接时选择新主机的策略。“循环”、“洗牌”或任何定制迭代器之一,不断产生新的URL以供尝试。
- heartbeat = None¶
心跳数值,目前仅受Py-AMQP传输支持。
- heartbeat_check(rate=2)[源代码]¶
检查一下心跳。
允许传输器执行使心跳工作所需的任何周期性任务。这应该大约每秒都被调用。
如果当前传输不支持心跳,则这是noop操作。
论点:¶
- Rate(Int):rate是调用记号的频率
与实际的心跳值进行比较。例如,如果心跳设置为3秒,并且每隔3/2秒调用一次滴答,则速率为2。该值当前未被任何传输使用。
- property host¶
以冒号分隔的主机名/端口对形式的主机。
- hostname = None¶
- property is_evented¶
- login_method = None¶
- property manager¶
AMQP管理API。
试验性管理器,可用于管理/监视Broker实例。
并非所有运输工具都可用。
- password = None¶
- port = None¶
- property qos_semantics_matches_spec¶
- property recoverable_channel_errors¶
可恢复的通道错误。
无需重新建立连接即可自动恢复的与通道相关的异常列表。
- property recoverable_connection_errors¶
可恢复的连接错误。
可以从中恢复但必须首先关闭并重新建立连接的与连接相关的异常的列表。
- resolve_aliases = {'librabbitmq': 'amqp', 'pyamqp': 'amqp'}¶
- ssl = None¶
- property supports_heartbeats¶
- property transport¶
- transport_options = None¶
其他特定于传输的选项,传递给传输实例。
- uri_prefix = None¶
- userid = None¶
- virtual_host = '/'¶
泳池¶
参见
捷径方法 Connection.Pool()
和 Connection.ChannelPool()
是实例化这些类的推荐方式。
- class kombu.connection.ConnectionPool(connection, limit=None, **kwargs)[源代码]¶
连接池。
- LimitExceeded = <class 'kombu.exceptions.ConnectionLimitExceeded'>¶
- acquire(block=False, timeout=None)¶
获取资源。
论点:¶
- BLOCK(Bool):如果超过限制,
然后阻止,直到有可用的项目。
- 超时(浮点数):等待超时
如果
block
是真的。缺省值为None
(永远)。
- 提升LimitExceed:
IF BLOCK为FALSE且已超过限制。:
- release(resource)¶
- force_close_all()¶
关闭并删除池中的所有资源(包括正在使用的资源)。
用于在分叉之后关闭父进程的资源(例如套接字/连接)。
- class kombu.connection.ChannelPool(connection, limit=None, **kwargs)[源代码]¶
频道池。
- LimitExceeded = <class 'kombu.exceptions.ChannelLimitExceeded'>¶
- acquire(block=False, timeout=None)¶
获取资源。
论点:¶
- BLOCK(Bool):如果超过限制,
然后阻止,直到有可用的项目。
- 超时(浮点数):等待超时
如果
block
是真的。缺省值为None
(永远)。
- 提升LimitExceed:
IF BLOCK为FALSE且已超过限制。:
- release(resource)¶
- force_close_all()¶
关闭并删除池中的所有资源(包括正在使用的资源)。
用于在分叉之后关闭父进程的资源(例如套接字/连接)。