Automatically register consumer once asynchronous connection is completed

(imported from commit 3d860a9b79a42beffbd3b73f49aa2c25986dd3c6)
This commit is contained in:
Leo Franchi
2013-03-19 14:29:22 -04:00
parent 452105e393
commit f64bc59dfb
2 changed files with 20 additions and 12 deletions

View File

@@ -103,7 +103,9 @@ class TornadoQueueClient(SimpleQueueClient):
def _connect(self, on_open_cb = None):
self.log.info("Beginning TornadoQueueClient connection")
self._on_open_cb = on_open_cb
self._on_open_cbs = []
if on_open_cb:
self._on_open_cbs.append(on_open_cb)
self.connection = pika.adapters.TornadoConnection(
self._get_parameters(),
on_open_callback = self._on_open,
@@ -122,8 +124,9 @@ class TornadoQueueClient(SimpleQueueClient):
def _on_channel_open(self, channel):
self.channel = channel
if self._on_open_cb:
self._on_open_cb()
for callback in self._on_open_cbs:
callback()
self._on_open_cbs = []
self.log.info('TornadoQueueClient connected')
def _on_connection_closed(self, method_frame):
@@ -151,3 +154,17 @@ class TornadoQueueClient(SimpleQueueClient):
callback=finish)
else:
callback()
def register_consumer(self, queue_name, consumer):
def wrapped_consumer(ch, method, properties, body):
consumer(ch, method, properties, body)
ch.basic_ack(delivery_tag=method.delivery_tag)
if not self.ready():
self._on_open_cbs.append(lambda: self.register_consumer(queue_name, consumer))
return
self.consumers[queue_name].add(wrapped_consumer)
self.ensure_queue(queue_name,
lambda: self.channel.basic_consume(wrapped_consumer, queue=queue_name,
consumer_tag=self._generate_ctag(queue_name)))