mirror of
https://github.com/zulip/zulip.git
synced 2025-11-02 04:53:36 +00:00
queue_processors: Set a bounded prefetch size on rabbitmq queues.
RabbitMQ clients have a setting called prefetch[1], which controls how
many un-acknowledged events the server forwards to the local queue in
the client. The default is 0 (meaning unlimited); this means that when clients first
connect, the server must send them every message in the queue.
This itself may cause unbounded memory usage in the client, but also
has other detrimental effects. While the client is attempting to
process the head of the queue, it may be unable to read from the TCP
socket at the rate that the server is sending to it -- filling the TCP
buffers, and causing the server's writes to block. If the server
blocks for more than 30 seconds, it times out the send, and closes the
connection with:
```
closing AMQP connection <0.30902.126> (127.0.0.1:53870 -> 127.0.0.1:5672):
{writer,send_failed,{error,timeout}}
```
This is https://github.com/pika/pika/issues/753#issuecomment-318119222.
Set a prefetch limit of 100 messages, or the batch size, to better
handle queues which start with large numbers of outstanding events.
Setting prefetch=1 causes significant performance degradation in the
no-op queue worker, to 30% of the prefetch=0 performance. Setting
prefetch=100 achieves 90% of the prefetch=0 performance, and higher
values offer only minor gains above that. For batch workers, their
performance is not notably degraded by prefetch equal to their batch
size, and they cannot function on smaller prefetches than their batch
size.
We also set a 100-count prefetch on Tornado workers, as they are
potentially susceptible to the same effect.
[1] https://www.rabbitmq.com/confirms.html#channel-qos-prefetch
Commit faeffa2466 (parent 7c3507feef), committed by Tim Abbott.
```diff
@@ -252,7 +252,10 @@ class TornadoQueueClient(QueueClient[Channel]):
     def __init__(self) -> None:
         super().__init__(
             # TornadoConnection can process heartbeats, so enable them.
-            rabbitmq_heartbeat=None
+            rabbitmq_heartbeat=None,
+            # Only ask for 100 un-acknowledged messages at once from
+            # the server, rather than an unbounded number.
+            prefetch=100,
         )
         self._on_open_cbs: List[Callable[[Channel], None]] = []
         self._connection_failure_count = 0
```
```diff
@@ -219,6 +219,12 @@ class QueueProcessingWorker(ABC):
     CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM = 50
     MAX_SECONDS_BEFORE_UPDATE_STATS = 30

+    # How many un-acknowledged events the worker should have on hand,
+    # fetched from the rabbitmq server. Larger values may be more
+    # performant, but if queues are large, cause more network IO at
+    # startup and steady-state memory.
+    PREFETCH = 100
+
     def __init__(self) -> None:
         self.q: Optional[SimpleQueueClient] = None
         if not hasattr(self, "queue_name"):
```
```diff
@@ -390,7 +396,7 @@ class QueueProcessingWorker(ABC):
         check_and_send_restart_signal()

     def setup(self) -> None:
-        self.q = SimpleQueueClient()
+        self.q = SimpleQueueClient(prefetch=self.PREFETCH)

     def start(self) -> None:
         assert self.q is not None
```
```diff
@@ -409,6 +415,9 @@ class LoopQueueProcessingWorker(QueueProcessingWorker):
     sleep_delay = 1
     batch_size = 100

+    def setup(self) -> None:
+        self.q = SimpleQueueClient(prefetch=max(self.PREFETCH, self.batch_size))
+
     def start(self) -> None: # nocoverage
         assert self.q is not None
         self.initialize_statistics()
```
Reference in New Issue
Block a user