This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

简单的界面

kombu.simple 是AMQP排队的一个简单接口。它与 Queue 类,这使得它非常适合具有基本消息传递需求的用户。

与定义交换和队列不同,简单类只需要两个参数,一个连接通道和一个名称。该名称用作队列、交换和路由关键字。如果需要,您可以指定一个 Queue 作为名称参数。

此外, Connection 提供了使用当前连接创建简单队列的快捷方式:

>>> queue = connection.SimpleQueue('myqueue')
>>> # ... do something with queue
>>> queue.close()

这相当于:

>>> from kombu.simple import SimpleBuffer

>>> channel = connection.channel()
>>> queue = SimpleBuffer(channel, 'mybuffer')
>>> # ... do something with queue
>>> channel.close()
>>> queue.close()

发送和接收消息

简单的接口定义了两个类; SimpleQueue ,以及 SimpleBuffer 。前者用于持久化消息,后者用于临时的、类似缓冲区的队列。它们都有相同的接口,因此您可以互换使用它们。

下面是一个使用 SimpleQueue 类来生成和使用日志记录消息:

import socket
import datetime
from time import time
from kombu import Connection


class Logger:

    def __init__(self, connection, queue_name='log_queue',
            serializer='json', compression=None):
        self.queue = connection.SimpleQueue(queue_name)
        self.serializer = serializer
        self.compression = compression

    def log(self, message, level='INFO', context={}):
        self.queue.put({'message': message,
                        'level': level,
                        'context': context,
                        'hostname': socket.gethostname(),
                        'timestamp': time()},
                        serializer=self.serializer,
                        compression=self.compression)

    def process(self, callback, n=1, timeout=1):
        for i in xrange(n):
            log_message = self.queue.get(block=True, timeout=1)
            entry = log_message.payload # deserialized data.
            callback(entry)
            log_message.ack() # remove message from queue

    def close(self):
        self.queue.close()


if __name__ == '__main__':
    from contextlib import closing

    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        with closing(Logger(conn)) as logger:

            # Send message
            logger.log('Error happened while encoding video',
                        level='ERROR',
                        context={'filename': 'cutekitten.mpg'})

            # Consume and process message

            # This is the callback called when a log message is
            # received.
            def dump_entry(entry):
                date = datetime.datetime.fromtimestamp(entry['timestamp'])
                print('[%s %s %s] %s %r' % (date,
                                            entry['hostname'],
                                            entry['level'],
                                            entry['message'],
                                            entry['context']))

            # Process a single message using the callback above.
            logger.process(dump_entry, n=1)