mirror of
https://github.com/zulip/zulip.git
synced 2025-11-04 14:03:30 +00:00
rabbitmq: Reorder a bit to group our reconnect logic together.
This commit is contained in:
@@ -197,17 +197,6 @@ class TornadoQueueClient(SimpleQueueClient):
|
||||
self.queues = set()
|
||||
self._connect()
|
||||
|
||||
def _on_open(self, connection: pika.connection.Connection) -> None:
|
||||
self.connection.channel(
|
||||
on_open_callback = self._on_channel_open)
|
||||
|
||||
def _on_channel_open(self, channel: BlockingChannel) -> None:
|
||||
self.channel = channel
|
||||
for callback in self._on_open_cbs:
|
||||
callback()
|
||||
self._reconnect_consumer_callbacks()
|
||||
self.log.info('TornadoQueueClient connected')
|
||||
|
||||
def _on_connection_closed(self, connection: pika.connection.Connection,
|
||||
reply_code: int, reply_text: str) -> None:
|
||||
self.log.warning("TornadoQueueClient lost connection to RabbitMQ, reconnecting...")
|
||||
@@ -225,6 +214,17 @@ class TornadoQueueClient(SimpleQueueClient):
|
||||
|
||||
ioloop.IOLoop.instance().call_later(retry_seconds, on_timeout)
|
||||
|
||||
def _on_open(self, connection: pika.connection.Connection) -> None:
|
||||
self.connection.channel(
|
||||
on_open_callback = self._on_channel_open)
|
||||
|
||||
def _on_channel_open(self, channel: BlockingChannel) -> None:
|
||||
self.channel = channel
|
||||
for callback in self._on_open_cbs:
|
||||
callback()
|
||||
self._reconnect_consumer_callbacks()
|
||||
self.log.info('TornadoQueueClient connected')
|
||||
|
||||
def ensure_queue(self, queue_name: str, callback: Callable[[], None]) -> None:
|
||||
def finish(frame: Any) -> None:
|
||||
self.queues.add(queue_name)
|
||||
|
||||
Reference in New Issue
Block a user