diff --git a/zephyr/lib/queue.py b/zephyr/lib/queue.py index 6c448a2204..e14750cebc 100644 --- a/zephyr/lib/queue.py +++ b/zephyr/lib/queue.py @@ -103,7 +103,9 @@ class TornadoQueueClient(SimpleQueueClient): def _connect(self, on_open_cb = None): self.log.info("Beginning TornadoQueueClient connection") - self._on_open_cb = on_open_cb + self._on_open_cbs = [] + if on_open_cb: + self._on_open_cbs.append(on_open_cb) self.connection = pika.adapters.TornadoConnection( self._get_parameters(), on_open_callback = self._on_open, @@ -122,8 +124,9 @@ class TornadoQueueClient(SimpleQueueClient): def _on_channel_open(self, channel): self.channel = channel - if self._on_open_cb: - self._on_open_cb() + for callback in self._on_open_cbs: + callback() + self._on_open_cbs = [] self.log.info('TornadoQueueClient connected') def _on_connection_closed(self, method_frame): @@ -151,3 +154,17 @@ class TornadoQueueClient(SimpleQueueClient): callback=finish) else: callback() + + def register_consumer(self, queue_name, consumer): + def wrapped_consumer(ch, method, properties, body): + consumer(ch, method, properties, body) + ch.basic_ack(delivery_tag=method.delivery_tag) + + if not self.ready(): + self._on_open_cbs.append(lambda: self.register_consumer(queue_name, consumer)) + return + + self.consumers[queue_name].add(wrapped_consumer) + self.ensure_queue(queue_name, + lambda: self.channel.basic_consume(wrapped_consumer, queue=queue_name, + consumer_tag=self._generate_ctag(queue_name))) diff --git a/zephyr/management/commands/runtornado.py b/zephyr/management/commands/runtornado.py index 8f100a824f..070e492b8c 100644 --- a/zephyr/management/commands/runtornado.py +++ b/zephyr/management/commands/runtornado.py @@ -66,12 +66,6 @@ class InstrumentedPoll(object): ioloop._poll = InstrumentedPoll -def step_tornado_ioloop(): - """Run the Tornado ioloop for a short time and return.""" - loop = ioloop.IOLoop.instance() - loop.add_timeout(time.time() + 0.1, loop.stop) - loop.start() - class Command(BaseCommand): option_list = BaseCommand.option_list + ( make_option('--nokeepalive', action='store_true', @@ -127,9 +121,6 @@ class Command(BaseCommand): if settings.USING_RABBITMQ: # Process notifications received via RabbitMQ queue_client = TornadoQueueClient() - while not queue_client.ready(): - step_tornado_ioloop() - def process_notification(chan, method, props, data): tornado_callbacks.process_notification(data) queue_client.register_json_consumer('notify_tornado', process_notification)