diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 7e292d0faa..d7ba92fdab 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -207,6 +207,15 @@ class QueueProcessingWorker(ABC): ) os.rename(tmp_fn, fn) + def get_remaining_queue_size(self) -> int: + if self.q is not None: + return self.q.queue_size() + else: + # This is a special case that will happen if we're operating without + # using RabbitMQ (e.g. in tests). In that case there's no queuing to speak of + # and the only reasonable size to return is 0. + return 0 + @abstractmethod def consume(self, data: Dict[str, Any]) -> None: pass @@ -228,11 +237,7 @@ class QueueProcessingWorker(ABC): if consume_time_seconds is not None: self.recent_consume_times.append((len(events), consume_time_seconds)) - if self.q is not None: - remaining_queue_size = self.q.queue_size() - else: - remaining_queue_size = 0 - + remaining_queue_size = self.get_remaining_queue_size() if remaining_queue_size == 0: self.queue_last_emptied_timestamp = time.time() self.consumed_since_last_emptied = 0