This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

文件系统传输- kombu.transport.filesystem

Kombu的文件系统传输模块。

使用文件系统作为消息存储库进行传输。写入队列的消息存储在 data_folder_in 从队列中读取的目录和消息 data_folder_out 目录。这两个目录都必须手动创建。简单的例子:

  • 制片人:

import kombu

conn = kombu.Connection(
    'filesystem://', transport_options={
        'data_folder_in': 'data_in', 'data_folder_out': 'data_out'
    }
)
conn.connect()

test_queue = kombu.Queue('test', routing_key='test')

with conn as conn:
    with conn.default_channel as channel:
        producer = kombu.Producer(channel)
        producer.publish(
                    {'hello': 'world'},
                    retry=True,
                    exchange=test_queue.exchange,
                    routing_key=test_queue.routing_key,
                    declare=[test_queue],
                    serializer='pickle'
        )
  • 消费者:

import kombu

conn = kombu.Connection(
    'filesystem://', transport_options={
        'data_folder_in': 'data_out', 'data_folder_out': 'data_in'
    }
)
conn.connect()

def callback(body, message):
    print(body, message)
    message.ack()

test_queue = kombu.Queue('test', routing_key='test')

with conn as conn:
    with conn.default_channel as channel:
        consumer = kombu.Consumer(
            conn, [test_queue], accept=['pickle']
        )
        consumer.register_callback(callback)
        with consumer:
            conn.drain_events(timeout=1)

功能

  • 类型:虚拟

  • 支持Direct:是

  • 支持主题:是

  • 支持扇出:支持

  • 支持优先级:否

  • 支持TTL:否

连接字符串

连接字符串的格式如下:

filesystem://

交通选择

  • data_folder_in -消息写入队列时存储的目录。

  • data_folder_out -从队列读取时从中读取消息的目录。

  • store_processed -如果设置为True,则所有已处理的邮件都将备份到 processed_folder

  • processed_folder -备份已处理文件的目录。

  • control_folder -存储交换队列表的目录。

运输

class kombu.transport.filesystem.Transport(client, **kwargs)[源代码]

文件系统传输。

class Channel(connection, **kwargs)

文件系统通道。

property control_folder
property data_folder_in
property data_folder_out
get_table(exchange)

获取以下项的绑定表 exchange

property processed_folder
property store_processed
supports_fanout = True

如果通道支持扇出交换,则设置标志。

property transport_options
default_port = 0

未指定端口时使用的端口号。

driver_name = 'filesystem'

驱动程序库的名称(例如‘py-amqp’、‘redis’)。

driver_type = 'filesystem'

驱动程序的类型,可用于使用AMQP协议(DRIVER_TYPE:‘AMQP’)、Redis(DRIVER_TYPE:‘REDIS’)等来分隔传输...

driver_version()[源代码]
global_state = <kombu.transport.virtual.base.BrokerState object>
implements = {'asynchronous': False, 'exchange_type': frozenset({'direct', 'fanout', 'topic'}), 'heartbeats': False}

渠道

class kombu.transport.filesystem.Channel(connection, **kwargs)[源代码]

文件系统通道。

property control_folder
property data_folder_in
property data_folder_out
get_table(exchange)[源代码]

获取以下项的绑定表 exchange

property processed_folder
property store_processed
supports_fanout = True

如果通道支持扇出交换,则设置标志。

property transport_options