Upgrade pika to 1.1.*.

Upgrade pika to 1.1.* and make some changes accordingly
to comply with the new version.

Fixes #12899.
This commit is contained in:
Rafid Aslam
2019-10-09 09:38:43 +07:00
committed by Tim Abbott
parent c7c6f01236
commit 447f74ae63
6 changed files with 14 additions and 14 deletions

View File

@@ -52,7 +52,7 @@ class SimpleQueueClient:
credentials = pika.PlainCredentials(settings.RABBITMQ_USERNAME,
settings.RABBITMQ_PASSWORD)
return pika.ConnectionParameters(settings.RABBITMQ_HOST,
heartbeat_interval=self.rabbitmq_heartbeat,
heartbeat=self.rabbitmq_heartbeat,
credentials=credentials)
def _generate_ctag(self, queue_name: str) -> str:
@@ -60,8 +60,8 @@ class SimpleQueueClient:
def _reconnect_consumer_callback(self, queue: str, consumer: Consumer) -> None:
self.log.info("Queue reconnecting saved consumer %s to queue %s" % (consumer, queue))
self.ensure_queue(queue, lambda: self.channel.basic_consume(consumer,
queue=queue,
self.ensure_queue(queue, lambda: self.channel.basic_consume(queue,
consumer,
consumer_tag=self._generate_ctag(queue)))
def _reconnect_consumer_callbacks(self) -> None:
@@ -124,7 +124,7 @@ class SimpleQueueClient:
self.consumers[queue_name].add(wrapped_consumer)
self.ensure_queue(queue_name,
lambda: self.channel.basic_consume(wrapped_consumer, queue=queue_name,
lambda: self.channel.basic_consume(queue_name, wrapped_consumer,
consumer_tag=self._generate_ctag(queue_name)))
def register_json_consumer(self, queue_name: str,
@@ -282,7 +282,7 @@ class TornadoQueueClient(SimpleQueueClient):
self.consumers[queue_name].add(wrapped_consumer)
self.ensure_queue(queue_name,
lambda: self.channel.basic_consume(wrapped_consumer, queue=queue_name,
lambda: self.channel.basic_consume(queue_name, wrapped_consumer,
consumer_tag=self._generate_ctag(queue_name)))
queue_client = None # type: Optional[SimpleQueueClient]