mirror of
https://github.com/zulip/zulip.git
synced 2025-11-06 15:03:34 +00:00
Rework saved consumer logic in TornadoQueue to always reconnect consumers
(imported from commit 0627d769349077c1e795db9215b17f538e9ec75c)
This commit is contained in:
@@ -39,9 +39,10 @@ class SimpleQueueClient(object):
|
|||||||
def _generate_ctag(self, queue_name):
|
def _generate_ctag(self, queue_name):
|
||||||
return "%s_%s" % (queue_name, str(random.getrandbits(16)))
|
return "%s_%s" % (queue_name, str(random.getrandbits(16)))
|
||||||
|
|
||||||
def _reconnect_callbacks(self):
|
def _reconnect_consumer_callbacks(self):
|
||||||
for queue, consumers in self.consumers.items():
|
for queue, consumers in self.consumers.items():
|
||||||
for consumer in consumers:
|
for consumer in consumers:
|
||||||
|
self.log.info("Queue reconnecting saved consumer %s to queue %s" % (consumer, queue))
|
||||||
self.ensure_queue(queue, lambda: self.channel.basic_consume(
|
self.ensure_queue(queue, lambda: self.channel.basic_consume(
|
||||||
consumer,
|
consumer,
|
||||||
queue=queue,
|
queue=queue,
|
||||||
@@ -119,10 +120,12 @@ calling _adapter_disconnect, ignoring" % (e,))
|
|||||||
class TornadoQueueClient(SimpleQueueClient):
|
class TornadoQueueClient(SimpleQueueClient):
|
||||||
# Based on:
|
# Based on:
|
||||||
# https://pika.readthedocs.org/en/0.9.8/examples/asynchronous_consumer_example.html
|
# https://pika.readthedocs.org/en/0.9.8/examples/asynchronous_consumer_example.html
|
||||||
|
def __init__(self):
|
||||||
|
super(TornadoQueueClient, self).__init__()
|
||||||
|
self._on_open_cbs = []
|
||||||
|
|
||||||
def _connect(self, on_open_cb = None):
|
def _connect(self, on_open_cb = None):
|
||||||
self.log.info("Beginning TornadoQueueClient connection")
|
self.log.info("Beginning TornadoQueueClient connection")
|
||||||
self._on_open_cbs = []
|
|
||||||
if on_open_cb:
|
if on_open_cb:
|
||||||
self._on_open_cbs.append(on_open_cb)
|
self._on_open_cbs.append(on_open_cb)
|
||||||
self.connection = ExceptionFreeTornadoConnection(
|
self.connection = ExceptionFreeTornadoConnection(
|
||||||
@@ -131,11 +134,11 @@ class TornadoQueueClient(SimpleQueueClient):
|
|||||||
stop_ioloop_on_close = False)
|
stop_ioloop_on_close = False)
|
||||||
self.connection.add_on_close_callback(self._on_connection_closed)
|
self.connection.add_on_close_callback(self._on_connection_closed)
|
||||||
|
|
||||||
def _reconnect(self, on_open_cb = None):
|
def _reconnect(self):
|
||||||
self.connection = None
|
self.connection = None
|
||||||
self.channel = None
|
self.channel = None
|
||||||
self.queues = set()
|
self.queues = set()
|
||||||
self._connect(on_open_cb)
|
self._connect()
|
||||||
|
|
||||||
def _on_open(self, connection):
|
def _on_open(self, connection):
|
||||||
self.connection.channel(
|
self.connection.channel(
|
||||||
@@ -145,7 +148,7 @@ class TornadoQueueClient(SimpleQueueClient):
|
|||||||
self.channel = channel
|
self.channel = channel
|
||||||
for callback in self._on_open_cbs:
|
for callback in self._on_open_cbs:
|
||||||
callback()
|
callback()
|
||||||
self._on_open_cbs = []
|
self._reconnect_consumer_callbacks()
|
||||||
self.log.info('TornadoQueueClient connected')
|
self.log.info('TornadoQueueClient connected')
|
||||||
|
|
||||||
def _on_connection_closed(self, method_frame):
|
def _on_connection_closed(self, method_frame):
|
||||||
@@ -156,7 +159,7 @@ class TornadoQueueClient(SimpleQueueClient):
|
|||||||
retry_seconds = 2
|
retry_seconds = 2
|
||||||
def on_timeout():
|
def on_timeout():
|
||||||
try:
|
try:
|
||||||
self._reconnect(self._reconnect_callbacks)
|
self._reconnect()
|
||||||
except pika.exceptions.AMQPConnectionError:
|
except pika.exceptions.AMQPConnectionError:
|
||||||
self.log.critical("Failed to reconnect to RabbitMQ, retrying...")
|
self.log.critical("Failed to reconnect to RabbitMQ, retrying...")
|
||||||
ioloop.IOLoop.instance().add_timeout(time.time() + retry_seconds, on_timeout)
|
ioloop.IOLoop.instance().add_timeout(time.time() + retry_seconds, on_timeout)
|
||||||
@@ -185,7 +188,7 @@ class TornadoQueueClient(SimpleQueueClient):
|
|||||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||||
|
|
||||||
if not self.ready():
|
if not self.ready():
|
||||||
self._on_open_cbs.append(lambda: self.register_consumer(queue_name, consumer))
|
self.consumers[queue_name].add(wrapped_consumer)
|
||||||
return
|
return
|
||||||
|
|
||||||
self.consumers[queue_name].add(wrapped_consumer)
|
self.consumers[queue_name].add(wrapped_consumer)
|
||||||
|
|||||||
Reference in New Issue
Block a user