This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
连接池和生产者池¶
默认池¶
Kombu附带两个全局池:一个连接池和一个生产者池。
这些都很方便,而且它们是全局的,这可能不是问题,因为连接通常应该限制在进程级别,而不是每个线程/应用程序等等,但如果您需要每个线程的定制池,请参阅 自定义池组 。
连接池组¶
连接池提供如下形式 kombu.pools.connections
。这是一个池组,这意味着您为其提供一个连接实例,然后您将得到一个池实例。我们为每个连接实例设置一个池,以支持同一应用程序中的多个连接。具有相同连接参数的所有连接实例将获得相同的池:
>>> from kombu import Connection
>>> from kombu.pools import connections
>>> connections[Connection('redis://localhost:6379')]
<kombu.connection.ConnectionPool object at 0x101805650>
>>> connections[Connection('redis://localhost:6379')]
<kombu.connection.ConnectionPool object at 0x101805650>
让我们获取并释放一个连接:
from kombu import Connection
from kombu.pools import connections
connection = Connection('redis://localhost:6379')
with connections[connection].acquire(block=True) as conn:
print('Got connection: {0!r}'.format(connection.as_uri()))
备注
这个 block=True
这意味着获取呼叫将被阻止,直到池中有连接可用。请注意,这将永远阻塞,以防您的代码中存在未释放连接的死锁。有一个 timeout
参数可以用来防范这种情况(请参见 kombu.connection.Resource.acquire()
)。
如果禁用了阻止,并且池中没有任何连接,则 kombu.exceptions.ConnectionLimitExceeded
将引发异常。
事情就是这样。如果您需要同时连接到多个代理,您也可以这样做:
from kombu import Connection
from kombu.pools import connections
c1 = Connection('amqp://')
c2 = Connection('redis://')
with connections[c1].acquire(block=True) as conn1:
with connections[c2].acquire(block=True) as conn2:
# ....
Producer池组¶
这是一个池组,就像连接一样,只是它管理 Producer
用于发布消息的实例。
下面是使用生产者池将消息发布到 news
交易所:
from kombu import Connection, Exchange
from kombu.pools import producers
# The exchange we send our news articles to.
news_exchange = Exchange('news')
# The article we want to send
article = {'title': 'No cellular coverage on the tube for 2012',
'ingress': 'yadda yadda yadda'}
# The broker where our exchange is.
connection = Connection('amqp://guest:guest@localhost:5672//')
with producers[connection].acquire(block=True) as producer:
producer.publish(
article,
exchange=news_exchange,
routing_key='domestic',
declare=[news_exchange],
serializer='json',
compression='zlib')
泳池限制¶
默认情况下,每个连接实例都有10个连接的限制。您可以使用以下命令更改此限制 kombu.pools.set_limit()
。您可以在运行时扩大池,但不能缩小池,因此最好在应用程序启动后尽早设置限制:
>>> from kombu import pools
>>> pools.set_limit()
您还可以使用以下命令获取电流限制 kombu.pools.get_limit()
:
>>> from kombu import pools
>>> pools.get_limit()
10
>>> pools.set_limit(100)
100
>>> kombu.pools.get_limit()
100
重置所有池¶
您可以使用关闭所有活动连接并重置所有池组 kombu.pools.reset()
功能。请注意,这将不会尊重当前使用这些连接的任何内容,因此只会将这些连接从它们的脚下拖走:在使用它之前,您应该非常小心。
如果进程是派生的,则Kombu将重置池,以便派生的进程从干净的池组开始。
自定义池组¶
要维护您自己的池组,您应该创建自己的池组 Connections
和 kombu.pools.Producers
实例:
from kombu import pools
from kombu import Connection
connections = pools.Connections(limit=100)
producers = pools.Producers(limit=connections.limit)
connection = Connection('amqp://guest:guest@localhost:5672//')
with connections[connection].acquire(block=True):
# ...
如果要使用可通过设置的全局限制 set_limit()
您可以使用特定值作为 limit
论据:
from kombu import pools
connections = pools.Connections(limit=pools.use_default_limit)