mirror of
https://github.com/zulip/zulip.git
synced 2025-11-02 21:13:36 +00:00
@@ -23,6 +23,8 @@ class SimpleQueueClient(object):
|
|||||||
self.queues = set() # type: Set[str]
|
self.queues = set() # type: Set[str]
|
||||||
self.channel = None # type: Any
|
self.channel = None # type: Any
|
||||||
self.consumers = defaultdict(set) # type: Dict[str, Set[Any]]
|
self.consumers = defaultdict(set) # type: Dict[str, Set[Any]]
|
||||||
|
# Disable RabbitMQ heartbeats since BlockingConnection can't process them
|
||||||
|
self.rabbitmq_heartbeat = 0
|
||||||
self._connect()
|
self._connect()
|
||||||
|
|
||||||
def _connect(self):
|
def _connect(self):
|
||||||
@@ -38,9 +40,12 @@ class SimpleQueueClient(object):
|
|||||||
self._connect()
|
self._connect()
|
||||||
|
|
||||||
def _get_parameters(self):
|
def _get_parameters(self):
|
||||||
|
# We explicitly disable the RabbitMQ heartbeat feature, since
|
||||||
|
# it doesn't make sense with BlockingConnection
|
||||||
credentials = pika.PlainCredentials(settings.RABBITMQ_USERNAME,
|
credentials = pika.PlainCredentials(settings.RABBITMQ_USERNAME,
|
||||||
settings.RABBITMQ_PASSWORD)
|
settings.RABBITMQ_PASSWORD)
|
||||||
return pika.ConnectionParameters(settings.RABBITMQ_HOST,
|
return pika.ConnectionParameters(settings.RABBITMQ_HOST,
|
||||||
|
heartbeat_interval=self.rabbitmq_heartbeat,
|
||||||
credentials=credentials)
|
credentials=credentials)
|
||||||
|
|
||||||
def _generate_ctag(self, queue_name):
|
def _generate_ctag(self, queue_name):
|
||||||
@@ -158,6 +163,8 @@ class TornadoQueueClient(SimpleQueueClient):
|
|||||||
# https://pika.readthedocs.io/en/0.9.8/examples/asynchronous_consumer_example.html
|
# https://pika.readthedocs.io/en/0.9.8/examples/asynchronous_consumer_example.html
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(TornadoQueueClient, self).__init__()
|
super(TornadoQueueClient, self).__init__()
|
||||||
|
# Enable rabbitmq heartbeat since TornadoConection can process them
|
||||||
|
self.rabbitmq_heartbeat = None
|
||||||
self._on_open_cbs = [] # type: List[Callable[[], None]]
|
self._on_open_cbs = [] # type: List[Callable[[], None]]
|
||||||
|
|
||||||
def _connect(self, on_open_cb = None):
|
def _connect(self, on_open_cb = None):
|
||||||
|
|||||||
Reference in New Issue
Block a user