queue: Rename consume_wrapper to a better name.

This commit is contained in:
Alex Vandiver
2020-10-09 13:44:15 -07:00
parent d5a6b0f99a
commit 2547bdbf4a
2 changed files with 4 additions and 4 deletions

View File

@@ -383,7 +383,7 @@ def queue_json_publish(
else: else:
# Must be imported here: A top section import leads to circular imports # Must be imported here: A top section import leads to circular imports
from zerver.worker.queue_processors import get_worker from zerver.worker.queue_processors import get_worker
get_worker(queue_name).consume_wrapper(event) get_worker(queue_name).consume_single_event(event)
def retry_event(queue_name: str, def retry_event(queue_name: str,
event: Dict[str, Any], event: Dict[str, Any],

View File

@@ -306,9 +306,9 @@ class QueueProcessingWorker(ABC):
self.consume_iteration_counter = 0 self.consume_iteration_counter = 0
self.update_statistics(remaining_local_queue_size) self.update_statistics(remaining_local_queue_size)
def consume_wrapper(self, data: Dict[str, Any]) -> None: def consume_single_event(self, event: Dict[str, Any]) -> None:
consume_func = lambda events: self.consume(events[0]) consume_func = lambda events: self.consume(events[0])
self.do_consume(consume_func, [data]) self.do_consume(consume_func, [event])
def _handle_consume_exception(self, events: List[Dict[str, Any]], exception: Exception) -> None: def _handle_consume_exception(self, events: List[Dict[str, Any]], exception: Exception) -> None:
with configure_scope() as scope: with configure_scope() as scope:
@@ -340,7 +340,7 @@ class QueueProcessingWorker(ABC):
def start(self) -> None: def start(self) -> None:
assert self.q is not None assert self.q is not None
self.initialize_statistics() self.initialize_statistics()
self.q.register_json_consumer(self.queue_name, self.consume_wrapper) self.q.register_json_consumer(self.queue_name, self.consume_single_event)
self.q.start_consuming() self.q.start_consuming()
def stop(self) -> None: # nocoverage def stop(self) -> None: # nocoverage