This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

连接池和生产者池

默认池

Kombu附带两个全局池:一个连接池和一个生产者池。

这些都很方便,而且它们是全局的,这可能不是问题,因为连接通常应该限制在进程级别,而不是每个线程/应用程序等等,但如果您需要每个线程的定制池,请参阅 自定义池组

连接池组

连接池提供如下形式 kombu.pools.connections 。这是一个池组,这意味着您为其提供一个连接实例,然后您将得到一个池实例。我们为每个连接实例设置一个池,以支持同一应用程序中的多个连接。具有相同连接参数的所有连接实例将获得相同的池:

>>> from kombu import Connection
>>> from kombu.pools import connections

>>> connections[Connection('redis://localhost:6379')]
<kombu.connection.ConnectionPool object at 0x101805650>
>>> connections[Connection('redis://localhost:6379')]
<kombu.connection.ConnectionPool object at 0x101805650>

让我们获取并释放一个连接:

from kombu import Connection
from kombu.pools import connections

connection = Connection('redis://localhost:6379')

with connections[connection].acquire(block=True) as conn:
    print('Got connection: {0!r}'.format(connection.as_uri()))

备注

这个 block=True 这意味着获取呼叫将被阻止,直到池中有连接可用。请注意,这将永远阻塞,以防您的代码中存在未释放连接的死锁。有一个 timeout 参数可以用来防范这种情况(请参见 kombu.connection.Resource.acquire() )。

如果禁用了阻止,并且池中没有任何连接,则 kombu.exceptions.ConnectionLimitExceeded 将引发异常。

事情就是这样。如果您需要同时连接到多个代理,您也可以这样做:

from kombu import Connection
from kombu.pools import connections

c1 = Connection('amqp://')
c2 = Connection('redis://')

with connections[c1].acquire(block=True) as conn1:
    with connections[c2].acquire(block=True) as conn2:
        # ....

Producer池组

这是一个池组,就像连接一样,只是它管理 Producer 用于发布消息的实例。

下面是使用生产者池将消息发布到 news 交易所:

from kombu import Connection, Exchange
from kombu.pools import producers

# The exchange we send our news articles to.
news_exchange = Exchange('news')

# The article we want to send
article = {'title': 'No cellular coverage on the tube for 2012',
           'ingress': 'yadda yadda yadda'}

# The broker where our exchange is.
connection = Connection('amqp://guest:guest@localhost:5672//')

with producers[connection].acquire(block=True) as producer:
    producer.publish(
        article,
        exchange=news_exchange,
        routing_key='domestic',
        declare=[news_exchange],
        serializer='json',
        compression='zlib')

泳池限制

默认情况下,每个连接实例都有10个连接的限制。您可以使用以下命令更改此限制 kombu.pools.set_limit() 。您可以在运行时扩大池,但不能缩小池,因此最好在应用程序启动后尽早设置限制:

>>> from kombu import pools
>>> pools.set_limit()

您还可以使用以下命令获取电流限制 kombu.pools.get_limit()

>>> from kombu import pools
>>> pools.get_limit()
10
>>> pools.set_limit(100)
100
>>> kombu.pools.get_limit()
100

重置所有池

您可以使用关闭所有活动连接并重置所有池组 kombu.pools.reset() 功能。请注意,这将不会尊重当前使用这些连接的任何内容,因此只会将这些连接从它们的脚下拖走:在使用它之前,您应该非常小心。

如果进程是派生的,则Kombu将重置池,以便派生的进程从干净的池组开始。

自定义池组

要维护您自己的池组,您应该创建自己的池组 Connectionskombu.pools.Producers 实例:

from kombu import pools
from kombu import Connection

connections = pools.Connections(limit=100)
producers = pools.Producers(limit=connections.limit)

connection = Connection('amqp://guest:guest@localhost:5672//')

with connections[connection].acquire(block=True):
    # ...

如果要使用可通过设置的全局限制 set_limit() 您可以使用特定值作为 limit 论据:

from kombu import pools

connections = pools.Connections(limit=pools.use_default_limit)