This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.
自动故障切换¶
自动故障转移是用于连接到群集代理的功能。使用自动故障转移的应用程序应该能够自动连接到健康的节点,并对群集中节点的意外故障做出反应。
连接故障切换¶
这个 Connection
正在接受指向多个经纪人的多个URL。在连接到Broker期间,Kombu会自动从列表中挑选健康的节点。在下面的示例中,Kombu使用Health.example.com代理:
>>> conn = Connection(
... 'amqp://guest:guest@broken.example.com;guest:guest@healthy.example.com'
... )
>>> conn.connect()
>>> conn
<Connection: amqp://guest:**@healthy.example.com at 0x6fffff751710>
Connection
还接受Failover_Strategy参数,该参数定义了尝试节点的策略:
>>> Connection(
... 'amqp://broker1.example.com;amqp://broker2.example.com',
... failover_strategy='round-robin'
... )
可用故障转移策略的当前列表在kombu.Connection模块中定义:
>>> import kombu
>>> kombu.connection.failover_strategies
{'round-robin': <class 'itertools.cycle'>, 'shuffle': <function shufflecycle at 0x6fffff8547a0>}
在连接期间进行故障转移仅在呼叫期间进行故障转移 connect()
方法论 Connection
。
操作故障切换¶
在中使用多个连接字符串的连接故障转移 Connection
解决了在创建新连接时Broker不可用的问题。但在现实世界中,这些连接是长期存在的,因此有可能在连接的生存期内代理失败。对于此场景,需要重试对Broker执行的操作。重试可确保失败的操作触发到健康的代理的新连接,并重新执行失败的操作。
故障切换在 ensure()
尝试执行该函数的方法。当联系Broker失败时,它会重新连接底层连接并再次重新执行该函数。下面的示例确保 publish()
方法在发生错误时重新执行:
>>> from kombu import Connection, Producer
>>> conn = Connection('amqp://')
>>> producer = Producer(conn)
>>> def errback(exc, interval):
... logger.error('Error: %r', exc, exc_info=1)
... logger.info('Retry in %s seconds.', interval)
>>> publish = conn.ensure(producer, producer.publish,
... errback=errback, max_retries=3)
>>> publish({'hello': 'world'}, routing_key='dest')
一些方法接受通道作为参数,例如 declare()
。由于Channel是作为参数传递的,因此在故障转移期间不会自动刷新,因此重试调用方法失败。在此场景中 autoretry()
需要使用自动传递通道并在故障切换期间对其进行刷新的:
>>> import kombu
>>> conn = kombu.Connection('amqp://broker1:5672;amqp://broker2:5672')
>>> conn.connect()
>>> q = kombu.Queue('test_queue')
>>> declare = conn.autoretry(q.declare)
>>> declare()
制片人¶
publish()
可以使用以下功能进行自动故障切换 ensure()
如前所述。此外,它还包含重试参数作为重试的快捷方式。以下示例在发生错误时重试发布:
>>> from kombu import *
>>> with Connection('amqp://broker1:5672;amqp://broker2:5672') as conn:
... with conn.channel() as channel:
... producer = conn.Producer()
... producer = Producer(channel)
... producer.publish(
... {'hello': 'world'}, routing_key='queue', retry=True
... )
消费者¶
具有故障转移功能的使用者可以使用以下功能实施:
>>> def consume():
... while True:
... try:
... conn.drain_events(timeout=1)
... except socket.timeout:
... pass
此函数使用超时在无限循环中排空事件,以避免不可用代理的阻塞连接。具有故障转移的使用者是通过使用 ensure()
方法:
>>> consume = conn.ensure(conn, consume)
>>> consume()
实施具有故障转移功能的消费者的完整示例如下:
>>> from kombu import *
>>> import socket
>>> def callback(body, message):
... print(body)
... message.ack()
>>> queue = Queue('queue', routing_key='queue')
>>> with Connection('amqp://broker1:5672;amqp://broker2:5672') as conn:
... def consume():
... while True:
... try:
... conn.drain_events(timeout=1)
... except socket.timeout:
... pass
... with conn.channel() as channel:
... consumer = Consumer(channel, queue)
... consumer.register_callback(callback)
... with consumer:
... while True:
... consume = conn.ensure(conn, consume)
... consume()
在将消费者实现为 ConsumerMixin
,故障转移功能是通过使用 ensure()
:
>>> from kombu import *
>>> from kombu.mixins import ConsumerMixin
>>> class C(ConsumerMixin):
... def __init__(self, connection):
... self.connection = connection
... def get_consumers(self, Consumer, channel):
... return [
... Consumer(
... [Queue('queue', routing_key='queue')],
... callbacks=[self.on_message], accept=['json']
... ),
... ]
... def on_message(self, body, message):
... print('RECEIVED MESSAGE: {0!r}'.format(body))
... message.ack()
... def consume(self, *args, **kwargs):
... consume = conn.ensure(conn, super().consume)
... return consume(*args, **kwargs)
>>> with Connection('amqp://broker1:5672;amqp://broker2:5672') as conn:
... C(conn).run()