diff --git a/zerver/lib/queue.py b/zerver/lib/queue.py index 3abaad80c1..9bc70922df 100644 --- a/zerver/lib/queue.py +++ b/zerver/lib/queue.py @@ -252,7 +252,10 @@ class TornadoQueueClient(QueueClient[Channel]): def __init__(self) -> None: super().__init__( # TornadoConnection can process heartbeats, so enable them. - rabbitmq_heartbeat=None + rabbitmq_heartbeat=None, + # Only ask for 100 un-acknowledged messages at once from + # the server, rather than an unbounded number. + prefetch=100, ) self._on_open_cbs: List[Callable[[Channel], None]] = [] self._connection_failure_count = 0 diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 626acaff6a..b552a1b558 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -219,6 +219,12 @@ class QueueProcessingWorker(ABC): CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM = 50 MAX_SECONDS_BEFORE_UPDATE_STATS = 30 + # How many un-acknowledged events the worker should have on hand, + # fetched from the rabbitmq server. Larger values may be more + # performant, but if queues are large, cause more network IO at + # startup and steady-state memory. + PREFETCH = 100 + def __init__(self) -> None: self.q: Optional[SimpleQueueClient] = None if not hasattr(self, "queue_name"): @@ -390,7 +396,7 @@ class QueueProcessingWorker(ABC): check_and_send_restart_signal() def setup(self) -> None: - self.q = SimpleQueueClient() + self.q = SimpleQueueClient(prefetch=self.PREFETCH) def start(self) -> None: assert self.q is not None @@ -409,6 +415,9 @@ class LoopQueueProcessingWorker(QueueProcessingWorker): sleep_delay = 1 batch_size = 100 + def setup(self) -> None: + self.q = SimpleQueueClient(prefetch=max(self.PREFETCH, self.batch_size)) + def start(self) -> None: # nocoverage assert self.q is not None self.initialize_statistics()