This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
连接和传输¶
基础知识¶
要发送和接收消息,您需要传输和连接。有几种传输方式可供选择(AMQP、librabbitmq、redis、QPID、In-Memory等),您甚至可以创建自己的。默认传输为AMQP。
使用默认传输创建连接:
>>> from kombu import Connection
>>> connection = Connection('amqp://guest:guest@localhost:5672//')
连接还不会建立,因为连接是在需要时建立的。如果要显式建立连接,则必须调用 connect()
方法:
>>> connection.connect()
您还可以检查连接是否已连接:
>>> connection.connected
True
连接在使用后必须始终关闭:
>>> connection.close()
但最佳做法是释放连接,如果连接与连接池相关联,这将释放资源;如果连接与连接池相关联,则关闭连接,并使以后更容易过渡到连接池:
>>> connection.release()
参见
当然,连接可以用作上下文,我们鼓励您这样做,因为这样会更难忘记释放开放资源:
with Connection() as connection:
# work with connection
西芹配SQS¶
默认情况下,SQS Broker URL不包括队列名称前缀。因此,我们可以使用以下代码片段来使其在芹菜中工作。
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
broker="sqs://",
broker_transport_options={
"queue_name_prefix": "{SERVICE_ENV}-{SERVICE_NAME}-"
},
)
task_base = celery.Task
class ContextTask(task_base):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return task_base.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
URLs¶
连接参数可以以URL形式提供,格式如下:
transport://userid:password@hostname:port/virtual_host
所有这些都是有效的URL:
# Specifies using the amqp transport only, default values
# are taken from the keyword arguments.
amqp://
# Using Redis
redis://localhost:6379/
# Using Redis over a Unix socket
redis+socket:///tmp/redis.sock
# Using Redis sentinel
sentinel://sentinel1:26379;sentinel://sentinel2:26379
# Using Qpid
qpid://localhost/
# Using virtual host '/foo'
amqp://localhost//foo
# Using virtual host 'foo'
amqp://localhost/foo
# Using Pyro with name server running on 'localhost'
pyro://localhost/kombu.broker
URL的查询部分也可用于设置选项,例如:
amqp://localhost/myvhost?ssl=1
看见 关键字参数 以获取支持的选项列表。
没有选项的连接将使用缺省连接设置,即使用本地主机主机、缺省端口、用户名 guest 、密码 guest 和虚拟主机“/”。不带参数的连接等同于:
>>> Connection('amqp://guest:guest@localhost:5672//')
默认端口是特定于传输的,对于AMQP,该端口为5672。
根据所使用的传输方式,其他字段也可能具有不同的含义。例如,Redis传输使用 virtual_host 参数作为Redis数据库编号。
关键字参数¶
这个 Connection
类支持其他关键字参数,这些参数包括:
- 主机名:
默认主机名(如果URL中未提供)。
- 用户ID:
默认用户名(如果URL中未提供)。
- 口令:
默认密码(如果URL中未提供)。
- virtual_host:
默认虚拟主机(如果URL中未提供)。
- 端口:
默认端口(如果URL中未提供)。
- 运输:
默认传输(如果URL中未提供)。可以是指定类路径的字符串。(例如:
kombu.transport.pyamqp:Transport
)或其中一个别名:pyamqp
,librabbitmq
,redis
,qpid
,memory
以此类推。- SSL:
使用SSL连接到服务器。缺省值为
False
。仅受AMQP和QPID传输支持。- 坚持:
坚持连接到服务器。 No longer supported, relic from AMQP 0.8
- connect_timeout:
连接到服务器的超时时间(秒)。可能不受指定传输的支持。
- transport_options:
要传递给备用Kombu通道实现的附加连接参数的字典。有关可用选项,请查阅运输文档。
AMQP传输¶
有4种运输工具可供AMQP使用。
pyamqp
使用纯Python库amqp
,与Kombu一起自动安装。librabbitmq
使用用C编写的高性能传输。这需要librabbitmq
要安装的Python包,它会自动编译C库。amqp
试图利用librabbitmq
但又回落到pyamqp
。qpid
使用纯Python库qpid.messaging
,与Kombu一起自动安装。QPID库使用AMQP,但使用的是专门由ApacheQPid代理支持的定制扩展。
为了获得最高的性能,您应该安装 librabbitmq
包裹。要确保使用librabbitmq,您可以在传输URL中显式指定它,或使用 amqp
有后备力量。
运输比较¶
Client |
Type |
Direct |
Topic |
Fanout |
Priority |
amqp |
原生的 |
是 |
是 |
是 |
是 [3] |
qpid |
原生的 |
是 |
是 |
是 |
不是 |
redis |
虚拟 |
是 |
是 |
是(酒吧/订阅) |
是 |
SQS |
虚拟 |
是 |
是 [1] |
是 [2] |
不是 |
zookeeper |
虚拟 |
是 |
是 [1] |
不是 |
是 |
in-memory |
虚拟 |
是 |
是 [1] |
不是 |
不是 |
SLMQ |
虚拟 |
是 |
是 [1] |
不是 |
不是 |