This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

连接和传输

基础知识

要发送和接收消息,您需要传输和连接。有几种传输方式可供选择(AMQP、librabbitmq、redis、QPID、In-Memory等),您甚至可以创建自己的。默认传输为AMQP。

使用默认传输创建连接:

>>> from kombu import Connection
>>> connection = Connection('amqp://guest:guest@localhost:5672//')

连接还不会建立,因为连接是在需要时建立的。如果要显式建立连接,则必须调用 connect() 方法:

>>> connection.connect()

您还可以检查连接是否已连接:

>>> connection.connected
True

连接在使用后必须始终关闭:

>>> connection.close()

但最佳做法是释放连接,如果连接与连接池相关联,这将释放资源;如果连接与连接池相关联,则关闭连接,并使以后更容易过渡到连接池:

>>> connection.release()

当然,连接可以用作上下文,我们鼓励您这样做,因为这样会更难忘记释放开放资源:

with Connection() as connection:
    # work with connection

西芹配SQS

默认情况下,SQS Broker URL不包括队列名称前缀。因此,我们可以使用以下代码片段来使其在芹菜中工作。

from celery import Celery
def make_celery(app):
    celery = Celery(
        app.import_name,
        broker="sqs://",
        broker_transport_options={
            "queue_name_prefix": "{SERVICE_ENV}-{SERVICE_NAME}-"
        },
    )
    task_base = celery.Task

    class ContextTask(task_base):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    celery.Task = ContextTask

    return celery

URLs

连接参数可以以URL形式提供,格式如下:

transport://userid:password@hostname:port/virtual_host

所有这些都是有效的URL:

# Specifies using the amqp transport only, default values
# are taken from the keyword arguments.
amqp://

# Using Redis
redis://localhost:6379/

# Using Redis over a Unix socket
redis+socket:///tmp/redis.sock

# Using Redis sentinel
sentinel://sentinel1:26379;sentinel://sentinel2:26379

# Using Qpid
qpid://localhost/

# Using virtual host '/foo'
amqp://localhost//foo

# Using virtual host 'foo'
amqp://localhost/foo

# Using Pyro with name server running on 'localhost'
pyro://localhost/kombu.broker

URL的查询部分也可用于设置选项,例如:

amqp://localhost/myvhost?ssl=1

看见 关键字参数 以获取支持的选项列表。

没有选项的连接将使用缺省连接设置,即使用本地主机主机、缺省端口、用户名 guest 、密码 guest 和虚拟主机“/”。不带参数的连接等同于:

>>> Connection('amqp://guest:guest@localhost:5672//')

默认端口是特定于传输的,对于AMQP,该端口为5672。

根据所使用的传输方式,其他字段也可能具有不同的含义。例如,Redis传输使用 virtual_host 参数作为Redis数据库编号。

关键字参数

这个 Connection 类支持其他关键字参数,这些参数包括:

主机名:

默认主机名(如果URL中未提供)。

用户ID:

默认用户名(如果URL中未提供)。

口令:

默认密码(如果URL中未提供)。

virtual_host:

默认虚拟主机(如果URL中未提供)。

端口:

默认端口(如果URL中未提供)。

运输:

默认传输(如果URL中未提供)。可以是指定类路径的字符串。(例如: kombu.transport.pyamqp:Transport )或其中一个别名: pyamqplibrabbitmqredisqpidmemory 以此类推。

SSL:

使用SSL连接到服务器。缺省值为 False 。仅受AMQP和QPID传输支持。

坚持:

坚持连接到服务器。 No longer supported, relic from AMQP 0.8

connect_timeout:

连接到服务器的超时时间(秒)。可能不受指定传输的支持。

transport_options:

要传递给备用Kombu通道实现的附加连接参数的字典。有关可用选项,请查阅运输文档。

AMQP传输

有4种运输工具可供AMQP使用。

  1. pyamqp 使用纯Python库 amqp ,与Kombu一起自动安装。

  2. librabbitmq 使用用C编写的高性能传输。这需要 librabbitmq 要安装的Python包,它会自动编译C库。

  3. amqp 试图利用 librabbitmq 但又回落到 pyamqp

  4. qpid 使用纯Python库 qpid.messaging ,与Kombu一起自动安装。QPID库使用AMQP,但使用的是专门由ApacheQPid代理支持的定制扩展。

为了获得最高的性能,您应该安装 librabbitmq 包裹。要确保使用librabbitmq,您可以在传输URL中显式指定它,或使用 amqp 有后备力量。

运输比较

Client

Type

Direct

Topic

Fanout

Priority

amqp

原生的

[3]

qpid

原生的

不是

redis

虚拟

是(酒吧/订阅)

SQS

虚拟

[1]

[2]

不是

zookeeper

虚拟

[1]

不是

in-memory

虚拟

[1]

不是

不是

SLMQ

虚拟

[1]

不是

不是