This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

自动故障切换

自动故障转移是用于连接到群集代理的功能。使用自动故障转移的应用程序应该能够自动连接到健康的节点,并对群集中节点的意外故障做出反应。

连接故障切换

这个 Connection 正在接受指向多个经纪人的多个URL。在连接到Broker期间,Kombu会自动从列表中挑选健康的节点。在下面的示例中,Kombu使用Health.example.com代理:

>>> conn = Connection(
...     'amqp://guest:guest@broken.example.com;guest:guest@healthy.example.com'
... )
>>> conn.connect()
>>> conn
<Connection: amqp://guest:**@healthy.example.com at 0x6fffff751710>

Connection 还接受Failover_Strategy参数,该参数定义了尝试节点的策略:

>>> Connection(
...     'amqp://broker1.example.com;amqp://broker2.example.com',
...     failover_strategy='round-robin'
... )

可用故障转移策略的当前列表在kombu.Connection模块中定义:

>>> import kombu
>>> kombu.connection.failover_strategies
{'round-robin': <class 'itertools.cycle'>, 'shuffle': <function shufflecycle at 0x6fffff8547a0>}

在连接期间进行故障转移仅在呼叫期间进行故障转移 connect() 方法论 Connection

操作故障切换

在中使用多个连接字符串的连接故障转移 Connection 解决了在创建新连接时Broker不可用的问题。但在现实世界中,这些连接是长期存在的,因此有可能在连接的生存期内代理失败。对于此场景,需要重试对Broker执行的操作。重试可确保失败的操作触发到健康的代理的新连接,并重新执行失败的操作。

故障切换在 ensure() 尝试执行该函数的方法。当联系Broker失败时,它会重新连接底层连接并再次重新执行该函数。下面的示例确保 publish() 方法在发生错误时重新执行:

>>> from kombu import Connection, Producer
>>> conn = Connection('amqp://')
>>> producer = Producer(conn)
>>> def errback(exc, interval):
...     logger.error('Error: %r', exc, exc_info=1)
...     logger.info('Retry in %s seconds.', interval)
>>> publish = conn.ensure(producer, producer.publish,
...                       errback=errback, max_retries=3)
>>> publish({'hello': 'world'}, routing_key='dest')

一些方法接受通道作为参数,例如 declare() 。由于Channel是作为参数传递的,因此在故障转移期间不会自动刷新,因此重试调用方法失败。在此场景中 autoretry() 需要使用自动传递通道并在故障切换期间对其进行刷新的:

>>> import kombu
>>> conn = kombu.Connection('amqp://broker1:5672;amqp://broker2:5672')
>>> conn.connect()
>>> q = kombu.Queue('test_queue')

>>> declare = conn.autoretry(q.declare)
>>> declare()

制片人

publish() 可以使用以下功能进行自动故障切换 ensure() 如前所述。此外,它还包含重试参数作为重试的快捷方式。以下示例在发生错误时重试发布:

>>> from kombu import *
>>> with Connection('amqp://broker1:5672;amqp://broker2:5672') as conn:
...     with conn.channel() as channel:
...         producer = conn.Producer()
...         producer = Producer(channel)
...         producer.publish(
...             {'hello': 'world'}, routing_key='queue', retry=True
...         )

消费者

具有故障转移功能的使用者可以使用以下功能实施:

>>> def consume():
...     while True:
...         try:
...             conn.drain_events(timeout=1)
...         except socket.timeout:
...             pass

此函数使用超时在无限循环中排空事件,以避免不可用代理的阻塞连接。具有故障转移的使用者是通过使用 ensure() 方法:

>>> consume = conn.ensure(conn, consume)
>>> consume()

实施具有故障转移功能的消费者的完整示例如下:

>>> from kombu import *
>>> import socket

>>> def callback(body, message):
...     print(body)
...     message.ack()


>>> queue = Queue('queue', routing_key='queue')
>>> with Connection('amqp://broker1:5672;amqp://broker2:5672') as conn:
...     def consume():
...         while True:
...             try:
...                 conn.drain_events(timeout=1)
...             except socket.timeout:
...                 pass
...     with conn.channel() as channel:
...         consumer = Consumer(channel, queue)
...         consumer.register_callback(callback)
...         with consumer:
...             while True:
...                 consume = conn.ensure(conn, consume)
...                 consume()

在将消费者实现为 ConsumerMixin ,故障转移功能是通过使用 ensure()

>>> from kombu import *
>>> from kombu.mixins import ConsumerMixin

>>> class C(ConsumerMixin):
...     def __init__(self, connection):
...         self.connection = connection
...     def get_consumers(self, Consumer, channel):
...         return [
...             Consumer(
...                  [Queue('queue', routing_key='queue')],
...                  callbacks=[self.on_message], accept=['json']
...             ),
...         ]
...     def on_message(self, body, message):
...         print('RECEIVED MESSAGE: {0!r}'.format(body))
...         message.ack()
...     def consume(self, *args, **kwargs):
...         consume = conn.ensure(conn, super().consume)
...         return consume(*args, **kwargs)


>>> with Connection('amqp://broker1:5672;amqp://broker2:5672') as conn:
...     C(conn).run()