mirror of
https://github.com/zulip/zulip.git
synced 2025-11-04 14:03:30 +00:00
queue: Allow passing down a prefetch count to pika.
This commit is contained in:
committed by
Tim Abbott
parent
8d239f4984
commit
7c3507feef
@@ -32,10 +32,12 @@ class QueueClient(Generic[ChannelT], metaclass=ABCMeta):
|
|||||||
self,
|
self,
|
||||||
# Disable RabbitMQ heartbeats by default because BlockingConnection can't process them
|
# Disable RabbitMQ heartbeats by default because BlockingConnection can't process them
|
||||||
rabbitmq_heartbeat: Optional[int] = 0,
|
rabbitmq_heartbeat: Optional[int] = 0,
|
||||||
|
prefetch: int = 0,
|
||||||
) -> None:
|
) -> None:
|
||||||
self.log = logging.getLogger("zulip.queue")
|
self.log = logging.getLogger("zulip.queue")
|
||||||
self.queues: Set[str] = set()
|
self.queues: Set[str] = set()
|
||||||
self.channel: Optional[ChannelT] = None
|
self.channel: Optional[ChannelT] = None
|
||||||
|
self.prefetch = prefetch
|
||||||
self.consumers: Dict[str, Set[Consumer[ChannelT]]] = defaultdict(set)
|
self.consumers: Dict[str, Set[Consumer[ChannelT]]] = defaultdict(set)
|
||||||
self.rabbitmq_heartbeat = rabbitmq_heartbeat
|
self.rabbitmq_heartbeat = rabbitmq_heartbeat
|
||||||
self.is_consuming = False
|
self.is_consuming = False
|
||||||
@@ -158,9 +160,12 @@ class SimpleQueueClient(QueueClient[BlockingChannel]):
|
|||||||
self._connect()
|
self._connect()
|
||||||
|
|
||||||
assert self.channel is not None
|
assert self.channel is not None
|
||||||
|
self.channel.basic_qos(prefetch_count=self.prefetch)
|
||||||
|
|
||||||
if queue_name not in self.queues:
|
if queue_name not in self.queues:
|
||||||
self.channel.queue_declare(queue=queue_name, durable=True)
|
self.channel.queue_declare(queue=queue_name, durable=True)
|
||||||
self.queues.add(queue_name)
|
self.queues.add(queue_name)
|
||||||
|
|
||||||
callback(self.channel)
|
callback(self.channel)
|
||||||
|
|
||||||
def start_json_consumer(
|
def start_json_consumer(
|
||||||
@@ -329,9 +334,13 @@ class TornadoQueueClient(QueueClient[Channel]):
|
|||||||
self.connection.close()
|
self.connection.close()
|
||||||
|
|
||||||
def ensure_queue(self, queue_name: str, callback: Callable[[Channel], object]) -> None:
|
def ensure_queue(self, queue_name: str, callback: Callable[[Channel], object]) -> None:
|
||||||
def finish(frame: Any) -> None:
|
def set_qos(frame: Any) -> None:
|
||||||
assert self.channel is not None
|
assert self.channel is not None
|
||||||
self.queues.add(queue_name)
|
self.queues.add(queue_name)
|
||||||
|
self.channel.basic_qos(prefetch_count=self.prefetch, callback=finish)
|
||||||
|
|
||||||
|
def finish(frame: Any) -> None:
|
||||||
|
assert self.channel is not None
|
||||||
callback(self.channel)
|
callback(self.channel)
|
||||||
|
|
||||||
if queue_name not in self.queues:
|
if queue_name not in self.queues:
|
||||||
@@ -342,7 +351,7 @@ class TornadoQueueClient(QueueClient[Channel]):
|
|||||||
return
|
return
|
||||||
|
|
||||||
assert self.channel is not None
|
assert self.channel is not None
|
||||||
self.channel.queue_declare(queue=queue_name, durable=True, callback=finish)
|
self.channel.queue_declare(queue=queue_name, durable=True, callback=set_qos)
|
||||||
else:
|
else:
|
||||||
assert self.channel is not None
|
assert self.channel is not None
|
||||||
callback(self.channel)
|
callback(self.channel)
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ Event = Dict[str, Any]
|
|||||||
|
|
||||||
|
|
||||||
class FakeClient:
|
class FakeClient:
|
||||||
def __init__(self) -> None:
|
def __init__(self, prefetch: int = 0) -> None:
|
||||||
self.queues: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
|
self.queues: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
|
||||||
|
|
||||||
def enqueue(self, queue_name: str, data: Dict[str, Any]) -> None:
|
def enqueue(self, queue_name: str, data: Dict[str, Any]) -> None:
|
||||||
|
|||||||
Reference in New Issue
Block a user