Move QOS configuration into connection, not queue verification.

Prior to aa032bf62c, QOS prefetch was set on every `publish` and
before every `start_json_consumer` -- which had a large and
unnecessary effect on publishing rates, which don't care about the
prefetch QOS settings at all, much less re-setting them before every
publish.

Unfortunately, that change had the effect of causing prefetch settings
to almost never be respected -- since the configuration happened in
`ensure_queue`s re-check that the connection was still live.  The
initial connection is established in `__init__` via `_connect`, and
the consumer only calls `ensure_queue` once, before setting up the
consumer.

Having no prefetch value set causes an unbounded prefetch; this
manifests itself as the server attempting to shove every event down to
the worker as soon as it starts consuming; if the client cannot keep
up, the server closes the connection.  The worker observes the
connection has been shut down, and restarts.  While this does make
forward progress, it causes large queues to make progress more slowly,
as they suffer from sporadic restarts.

Shift the QOS configuration to when the connection is set up, which is
a more sensible place for it in general -- and ensures that it is set
on consumers and producers alike, but only once per connection
establishment.
This commit is contained in:
Alex Vandiver
2023-03-20 15:51:14 +00:00
committed by Tim Abbott
parent ad357b18c0
commit 311a76ed1c

View File

@@ -143,6 +143,7 @@ class SimpleQueueClient(QueueClient[BlockingChannel]):
start = time.time()
self.connection = pika.BlockingConnection(self._get_parameters())
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=self.prefetch)
self.log.info("SimpleQueueClient connected (connecting took %.3fs)", time.time() - start)
def _reconnect(self) -> None:
@@ -161,7 +162,6 @@ class SimpleQueueClient(QueueClient[BlockingChannel]):
if self.connection is None or not self.connection.is_open:
self._connect()
assert self.channel is not None
self.channel.basic_qos(prefetch_count=self.prefetch)
else:
assert self.channel is not None