mirror of
https://github.com/zulip/zulip.git
synced 2025-11-04 14:03:30 +00:00
rabbitmq: Do a better job of retrying failed connections.
Empirically, the retry in `_on_connection_closed` didn't actually work: if a reconnect attempt failed, no further attempt was made, and the exception handler never ran. A traceback would get logged, but all of its frames were in Tornado or Pika, not our own code; presumably the exception was being raised asynchronously, outside the call stack of our `try` block. Moreover, though we would make one attempt to reconnect if an established connection got closed, we didn't have any form of retry if the original attempt at connecting failed in the first place. Happily, upstream offers a perfectly reasonable bit of API that avoids both of these problems: the on-open-error callback (`on_open_error_callback`). So use that.
This commit is contained in:
@@ -10,6 +10,7 @@ from django.conf import settings
|
||||
import pika
|
||||
from pika.adapters.blocking_connection import BlockingChannel
|
||||
from pika.spec import Basic
|
||||
from tornado import ioloop
|
||||
import ujson
|
||||
|
||||
from zerver.lib.utils import statsd
|
||||
@@ -188,6 +189,7 @@ class TornadoQueueClient(SimpleQueueClient):
|
||||
self.connection = ExceptionFreeTornadoConnection(
|
||||
self._get_parameters(),
|
||||
on_open_callback = self._on_open,
|
||||
on_open_error_callback = self._on_connection_open_error,
|
||||
on_close_callback = self._on_connection_closed,
|
||||
)
|
||||
|
||||
@@ -195,24 +197,24 @@ class TornadoQueueClient(SimpleQueueClient):
|
||||
self.connection = None
|
||||
self.channel = None
|
||||
self.queues = set()
|
||||
self.log.warning("TornadoQueueClient attempting to reconnect to RabbitMQ")
|
||||
self._connect()
|
||||
|
||||
CONNECTION_RETRY_SECS = 2
|
||||
|
||||
def _on_connection_open_error(self, connection: pika.connection.Connection,
|
||||
message: Optional[str]=None) -> None:
|
||||
retry_secs = self.CONNECTION_RETRY_SECS
|
||||
self.log.critical("TornadoQueueClient couldn't connect to RabbitMQ, retrying in %d secs..."
|
||||
% (retry_secs,))
|
||||
ioloop.IOLoop.instance().call_later(retry_secs, self._reconnect)
|
||||
|
||||
def _on_connection_closed(self, connection: pika.connection.Connection,
|
||||
reply_code: int, reply_text: str) -> None:
|
||||
self.log.warning("TornadoQueueClient lost connection to RabbitMQ, reconnecting...")
|
||||
from tornado import ioloop
|
||||
|
||||
# Try to reconnect in two seconds
|
||||
retry_seconds = 2
|
||||
|
||||
def on_timeout() -> None:
|
||||
try:
|
||||
self._reconnect()
|
||||
except pika.exceptions.AMQPConnectionError:
|
||||
self.log.critical("Failed to reconnect to RabbitMQ, retrying...")
|
||||
ioloop.IOLoop.instance().call_later(retry_seconds, on_timeout)
|
||||
|
||||
ioloop.IOLoop.instance().call_later(retry_seconds, on_timeout)
|
||||
retry_secs = self.CONNECTION_RETRY_SECS
|
||||
self.log.warning("TornadoQueueClient lost connection to RabbitMQ, reconnecting in %d secs..."
|
||||
% (retry_secs,))
|
||||
ioloop.IOLoop.instance().call_later(retry_secs, self._reconnect)
|
||||
|
||||
def _on_open(self, connection: pika.connection.Connection) -> None:
|
||||
self.connection.channel(
|
||||
|
||||
Reference in New Issue
Block a user