From 0aa20cb5944a3de2c20bbf4230ae154b4a9d591d Mon Sep 17 00:00:00 2001 From: Leo Franchi Date: Wed, 17 Apr 2013 12:11:28 -0400 Subject: [PATCH] Rework saved consumer logic in TornadoQueue to always reconnect consumers (imported from commit 0627d769349077c1e795db9215b17f538e9ec75c) --- zephyr/lib/queue.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/zephyr/lib/queue.py b/zephyr/lib/queue.py index 499a466db1..f760ddcdbc 100644 --- a/zephyr/lib/queue.py +++ b/zephyr/lib/queue.py @@ -39,9 +39,10 @@ class SimpleQueueClient(object): def _generate_ctag(self, queue_name): return "%s_%s" % (queue_name, str(random.getrandbits(16))) - def _reconnect_callbacks(self): + def _reconnect_consumer_callbacks(self): for queue, consumers in self.consumers.items(): for consumer in consumers: + self.log.info("Queue reconnecting saved consumer %s to queue %s" % (consumer, queue)) self.ensure_queue(queue, lambda: self.channel.basic_consume( consumer, queue=queue, @@ -119,10 +120,12 @@ calling _adapter_disconnect, ignoring" % (e,)) class TornadoQueueClient(SimpleQueueClient): # Based on: # https://pika.readthedocs.org/en/0.9.8/examples/asynchronous_consumer_example.html + def __init__(self): + super(TornadoQueueClient, self).__init__() + self._on_open_cbs = [] def _connect(self, on_open_cb = None): self.log.info("Beginning TornadoQueueClient connection") - self._on_open_cbs = [] if on_open_cb: self._on_open_cbs.append(on_open_cb) self.connection = ExceptionFreeTornadoConnection( @@ -131,11 +134,11 @@ class TornadoQueueClient(SimpleQueueClient): stop_ioloop_on_close = False) self.connection.add_on_close_callback(self._on_connection_closed) - def _reconnect(self, on_open_cb = None): + def _reconnect(self): self.connection = None self.channel = None self.queues = set() - self._connect(on_open_cb) + self._connect() def _on_open(self, connection): self.connection.channel( @@ -145,7 +148,7 @@ class TornadoQueueClient(SimpleQueueClient): self.channel = channel for callback in self._on_open_cbs: callback() - self._on_open_cbs = [] + self._reconnect_consumer_callbacks() self.log.info('TornadoQueueClient connected') def _on_connection_closed(self, method_frame): @@ -156,7 +159,7 @@ class TornadoQueueClient(SimpleQueueClient): retry_seconds = 2 def on_timeout(): try: - self._reconnect(self._reconnect_callbacks) + self._reconnect() except pika.exceptions.AMQPConnectionError: self.log.critical("Failed to reconnect to RabbitMQ, retrying...") ioloop.IOLoop.instance().add_timeout(time.time() + retry_seconds, on_timeout) @@ -185,7 +188,7 @@ class TornadoQueueClient(SimpleQueueClient): ch.basic_ack(delivery_tag=method.delivery_tag) if not self.ready(): - self._on_open_cbs.append(lambda: self.register_consumer(queue_name, consumer)) + self.consumers[queue_name].add(wrapped_consumer) return self.consumers[queue_name].add(wrapped_consumer)