This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
开始¶
- 版本:
5.3.2
- 万维网:
- 下载:
- 来源:
- 关键词:
消息传递、AMQP、rabbitmq、redis、MongoDB、python、Queue
关于¶
Kombu 是一个用于Python的消息库。
的目的是 Kombu 就是要通过为AMQ协议提供惯用的高级接口来尽可能地简化在Python中的消息传递,并为常见的消息传递问题提供经过验证和测试的解决方案。
AMQP 是高级消息队列协议,这是一种用于消息定向、排队、路由、可靠性和安全性的开放标准协议, RabbitMQ 消息传递服务器是最流行的实现方式。
功能¶
允许应用程序作者通过使用可插拔传输来支持多个消息服务器解决方案。
虚拟传输使添加对非AMQP传输的支持变得非常容易。已经有内置的支持 Redis , Amazon SQS , Azure Storage Queues , Azure Service Bus , ZooKeeper , SoftLayer MQ , MongoDB 和 Pyro 。
用于单元测试的内存中传输。
支持消息有效负载的自动编码、序列化和压缩。
跨传输进行一致的异常处理。
通过适当地处理连接和通道错误来确保执行操作的能力。
有几个恼人的地方 amqplib 已经修复,比如支持超时和在多个频道上等待事件的能力。
已在使用的项目 carrot 可以通过使用兼容层轻松移植。
有关AMQP的介绍,您应该阅读文章 Rabbits and warrens ,而 Wikipedia article about AMQP 。
运输比较¶
Client |
Type |
Direct |
Topic |
Fanout |
Priority |
TTL |
amqp |
原生的 |
是 |
是 |
是 |
是 [3] |
是 [4] |
qpid |
原生的 |
是 |
是 |
是 |
不是 |
不是 |
redis |
虚拟 |
是 |
是 |
是(酒吧/订阅) |
是 |
不是 |
mongodb |
虚拟 |
是 |
是 |
是 |
是 |
是 |
SQS |
虚拟 |
是 |
是 [1] |
是 [2] |
不是 |
不是 |
zookeeper |
虚拟 |
是 |
是 [1] |
不是 |
是 |
不是 |
in-memory |
虚拟 |
是 |
是 [1] |
不是 |
不是 |
不是 |
SLMQ |
虚拟 |
是 |
是 [1] |
不是 |
不是 |
不是 |
Pyro |
虚拟 |
是 |
是 [1] |
不是 |
不是 |
不是 |
文档¶
Kombu正在使用Sphinx,最新文档可在此处找到:
快速概述¶
from kombu import Connection, Exchange, Queue
media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')
def process_media(body, message):
print(body)
message.ack()
# connections
with Connection('amqp://guest:guest@localhost//') as conn:
# produce
producer = conn.Producer(serializer='json')
producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
exchange=media_exchange, routing_key='video',
declare=[video_queue])
# the declare above, makes sure the video queue is declared
# so that the messages can be delivered.
# It's a best practice in Kombu to have both publishers and
# consumers declare the queue. You can also declare the
# queue manually using:
# video_queue(conn).declare()
# consume
with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
# Process messages and handle events on all channels
while True:
conn.drain_events()
# Consume from several queues on the same channel:
video_queue = Queue('video', exchange=media_exchange, key='video')
image_queue = Queue('image', exchange=media_exchange, key='image')
with connection.Consumer([video_queue, image_queue],
callbacks=[process_media]) as consumer:
while True:
connection.drain_events()
或手动处理频道:
with connection.channel() as channel:
producer = Producer(channel, ...)
consumer = Consumer(channel)
所有对象也可以在With语句之外使用,只需记住在使用后关闭对象:
from kombu import Connection, Consumer
connection = Connection()
# ...
connection.release()
consumer = Consumer(channel_or_connection, ...)
consumer.register_callback(my_callback)
consumer.consume()
# ....
consumer.cancel()
Exchange 和 Queue 是简单的声明,可以在配置文件等中进行筛选和使用。
它们也支持操作,但要做到这一点,它们需要绑定到一个通道。
将交换和队列绑定到连接将使其使用该连接的默认通道。
>>> exchange = Exchange('tasks', 'direct')
>>> connection = Connection()
>>> bound_exchange = exchange(connection)
>>> bound_exchange.delete()
# the original exchange is not affected, and stays unbound.
>>> exchange.delete()
raise NotBoundError: Can't call delete on Exchange not bound to
a channel.
术语¶
在开始之前,您应该熟悉一些概念:
生产者
生产商将信息发送到交易所。
交易所
消息被发送到交易所。交换机是命名的,可以配置为使用几种路由算法之一。交换通过将消息中的路由关键字与使用者在绑定到交换时提供的路由关键字进行匹配来将消息路由到使用者。
消费者
使用者声明队列,将其绑定到交换并接收来自该队列的消息。
排队
队列接收发送到交换的消息。队列由消费者声明。
路由密钥
每条消息都有一个路由键。路由密钥的解释取决于交换类型。AMQP标准定义了四种默认的交换类型,供应商可以定义定制类型(有关详细信息,请参阅供应商手册)。
以下是AMQP/0.8定义的默认交换类型:
直接兑换
如果消息的路由键属性与 routing_key 消费者的属性是相同的。
扇形交换机
始终匹配,即使绑定没有路由键。
话题交流
通过基元模式匹配方案匹配消息的路由键属性。然后,消息路由键由由点分隔的单词组成 ("." ,如域名),并且有两个特殊字符可用;星号 ("*" )和散列 ("#" )。星号匹配任何单词,而散列匹配零个或多个单词。例如 "*.stock.#" 匹配路由关键字 "usd.stock" 和 "eur.stock.db" 但不是 "stock.nasdaq" 。
安装¶
您可以安装 Kombu 通过Python程序包索引(PyPI)或来自源代码。
要使用以下工具进行安装 pip 、:
$ pip install kombu
要使用以下工具进行安装 easy_install 、:
$ easy_install kombu
如果您已经下载了源代码tarball,您可以通过执行以下操作来安装它:
$ python setup.py build
# python setup.py install # as root
获取帮助¶
邮件列表¶
加入 carrot-users 邮件列表。
漏洞追踪器¶
如果您有任何建议、错误报告或烦恼,请将它们报告给我们的问题跟踪器http://github.com/celery/kombu/issues/
贡献¶
发展中的 Kombu 发生在吉苏布:http://github.com/celery/kombu
我们非常鼓励您参与开发。如果您不喜欢Github(出于某种原因),欢迎您发送常规补丁。
许可证¶
本软件是按照 New BSD License 。请参阅 LICENSE 完整许可证文本的顶级分发目录中的文件。