This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
文件系统传输- kombu.transport.filesystem
¶
Kombu的文件系统传输模块。
使用文件系统作为消息存储库进行传输。写入队列的消息存储在 data_folder_in 从队列中读取的目录和消息 data_folder_out 目录。这两个目录都必须手动创建。简单的例子:
制片人:
import kombu
conn = kombu.Connection(
'filesystem://', transport_options={
'data_folder_in': 'data_in', 'data_folder_out': 'data_out'
}
)
conn.connect()
test_queue = kombu.Queue('test', routing_key='test')
with conn as conn:
with conn.default_channel as channel:
producer = kombu.Producer(channel)
producer.publish(
{'hello': 'world'},
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='pickle'
)
消费者:
import kombu
conn = kombu.Connection(
'filesystem://', transport_options={
'data_folder_in': 'data_out', 'data_folder_out': 'data_in'
}
)
conn.connect()
def callback(body, message):
print(body, message)
message.ack()
test_queue = kombu.Queue('test', routing_key='test')
with conn as conn:
with conn.default_channel as channel:
consumer = kombu.Consumer(
conn, [test_queue], accept=['pickle']
)
consumer.register_callback(callback)
with consumer:
conn.drain_events(timeout=1)
功能¶
类型:虚拟
支持Direct:是
支持主题:是
支持扇出:支持
支持优先级:否
支持TTL:否
连接字符串¶
连接字符串的格式如下:
filesystem://
交通选择¶
data_folder_in
-消息写入队列时存储的目录。data_folder_out
-从队列读取时从中读取消息的目录。store_processed
-如果设置为True,则所有已处理的邮件都将备份到processed_folder
。processed_folder
-备份已处理文件的目录。control_folder
-存储交换队列表的目录。
运输¶
- class kombu.transport.filesystem.Transport(client, **kwargs)[源代码]¶
文件系统传输。
- class Channel(connection, **kwargs)¶
文件系统通道。
- property control_folder¶
- property data_folder_in¶
- property data_folder_out¶
- get_table(exchange)¶
获取以下项的绑定表 exchange 。
- property processed_folder¶
- property store_processed¶
- supports_fanout = True¶
如果通道支持扇出交换,则设置标志。
- property transport_options¶
- default_port = 0¶
未指定端口时使用的端口号。
- driver_name = 'filesystem'¶
驱动程序库的名称(例如‘py-amqp’、‘redis’)。
- driver_type = 'filesystem'¶
驱动程序的类型,可用于使用AMQP协议(DRIVER_TYPE:‘AMQP’)、Redis(DRIVER_TYPE:‘REDIS’)等来分隔传输...
- global_state = <kombu.transport.virtual.base.BrokerState object>¶
- implements = {'asynchronous': False, 'exchange_type': frozenset({'direct', 'fanout', 'topic'}), 'heartbeats': False}¶