queue: Be more explicit about test/real queue division.

This commit is contained in:
Alex Vandiver
2020-10-23 17:24:10 -07:00
committed by Tim Abbott
parent 31d0141a30
commit 7cf737988d
3 changed files with 36 additions and 34 deletions

View File

@@ -1,5 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse
import os import os
import sys import sys
@@ -17,10 +16,5 @@ django.setup()
from zerver.worker.queue_processors import get_active_worker_queues from zerver.worker.queue_processors import get_active_worker_queues
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() for worker in sorted(get_active_worker_queues()):
parser.add_argument('--queue-type',
help="Specify which types of queues to list")
args = parser.parse_args()
for worker in sorted(get_active_worker_queues(args.queue_type)):
print(worker) print(worker)

View File

@@ -3,7 +3,7 @@ import os
import smtplib import smtplib
import time import time
from collections import defaultdict from collections import defaultdict
from typing import Any, Callable, Dict, List, Mapping, Optional from typing import Any, Callable, Dict, List, Mapping, Optional, Set, Type
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
import orjson import orjson
@@ -516,7 +516,7 @@ class WorkerTest(ZulipTestCase):
def test_error_handling(self) -> None: def test_error_handling(self) -> None:
processed = [] processed = []
@queue_processors.assign_queue('unreliable_worker') @queue_processors.assign_queue('unreliable_worker', is_test_queue=True)
class UnreliableWorker(queue_processors.QueueProcessingWorker): class UnreliableWorker(queue_processors.QueueProcessingWorker):
def consume(self, data: Mapping[str, Any]) -> None: def consume(self, data: Mapping[str, Any]) -> None:
if data["type"] == 'unexpected behaviour': if data["type"] == 'unexpected behaviour':
@@ -553,7 +553,7 @@ class WorkerTest(ZulipTestCase):
processed = [] processed = []
@queue_processors.assign_queue('unreliable_loopworker') @queue_processors.assign_queue('unreliable_loopworker', is_test_queue=True)
class UnreliableLoopWorker(queue_processors.LoopQueueProcessingWorker): class UnreliableLoopWorker(queue_processors.LoopQueueProcessingWorker):
def consume_batch(self, events: List[Dict[str, Any]]) -> None: def consume_batch(self, events: List[Dict[str, Any]]) -> None:
for event in events: for event in events:
@@ -592,7 +592,7 @@ class WorkerTest(ZulipTestCase):
def test_timeouts(self) -> None: def test_timeouts(self) -> None:
processed = [] processed = []
@queue_processors.assign_queue('timeout_worker') @queue_processors.assign_queue('timeout_worker', is_test_queue=True)
class TimeoutWorker(queue_processors.QueueProcessingWorker): class TimeoutWorker(queue_processors.QueueProcessingWorker):
MAX_CONSUME_SECONDS = 1 MAX_CONSUME_SECONDS = 1
@@ -641,11 +641,22 @@ class WorkerTest(ZulipTestCase):
TestWorker() TestWorker()
def test_get_active_worker_queues(self) -> None: def test_get_active_worker_queues(self) -> None:
test_queue_count = len(get_active_worker_queues(queue_type='test')) test_queue_names = set(get_active_worker_queues(only_test_queues=True))
self.assertEqual(3, test_queue_count) # Actually 6, but test_timeouts, which defines TimeoutWorker,
# is called after this
self.assertEqual(5, len(test_queue_names))
worker_queue_count = (len(QueueProcessingWorker.__subclasses__()) + worker_queue_classes: Set[Type[QueueProcessingWorker]] = set()
len(EmailSendingWorker.__subclasses__()) + worker_queue_classes.update(QueueProcessingWorker.__subclasses__())
len(LoopQueueProcessingWorker.__subclasses__()) - 1) worker_queue_classes.update(EmailSendingWorker.__subclasses__())
self.assertEqual(worker_queue_count, len(get_active_worker_queues(include_test_queues=True))) worker_queue_classes.update(LoopQueueProcessingWorker.__subclasses__())
self.assertEqual(worker_queue_count - 3, len(get_active_worker_queues()))
# Remove the abstract class
worker_queue_classes -= set([queue_processors.LoopQueueProcessingWorker])
# This misses that TestWorker, defined in test_worker_noname
# with no assign_queue, because it runs after this
worker_queue_names = set([x.queue_name for x in worker_queue_classes])
self.assertEqual(set(get_active_worker_queues()),
worker_queue_names - test_queue_names)

View File

@@ -118,32 +118,29 @@ class WorkerDeclarationException(Exception):
ConcreteQueueWorker = TypeVar('ConcreteQueueWorker', bound='QueueProcessingWorker') ConcreteQueueWorker = TypeVar('ConcreteQueueWorker', bound='QueueProcessingWorker')
def assign_queue( def assign_queue(
queue_name: str, enabled: bool=True, queue_type: str="consumer", queue_name: str, enabled: bool=True, is_test_queue: bool=False,
) -> Callable[[Type[ConcreteQueueWorker]], Type[ConcreteQueueWorker]]: ) -> Callable[[Type[ConcreteQueueWorker]], Type[ConcreteQueueWorker]]:
def decorate(clazz: Type[ConcreteQueueWorker]) -> Type[ConcreteQueueWorker]: def decorate(clazz: Type[ConcreteQueueWorker]) -> Type[ConcreteQueueWorker]:
clazz.queue_name = queue_name clazz.queue_name = queue_name
if enabled: if enabled:
register_worker(queue_name, clazz, queue_type) register_worker(queue_name, clazz, is_test_queue)
return clazz return clazz
return decorate return decorate
worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {} worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {}
queues: Dict[str, Dict[str, Type["QueueProcessingWorker"]]] = {} test_queues: Set[str] = set()
def register_worker(queue_name: str, clazz: Type['QueueProcessingWorker'], queue_type: str) -> None: def register_worker(queue_name: str, clazz: Type['QueueProcessingWorker'], is_test_queue: bool=False) -> None:
if queue_type not in queues:
queues[queue_type] = {}
queues[queue_type][queue_name] = clazz
worker_classes[queue_name] = clazz worker_classes[queue_name] = clazz
if is_test_queue:
test_queues.add(queue_name)
def get_worker(queue_name: str) -> 'QueueProcessingWorker': def get_worker(queue_name: str) -> 'QueueProcessingWorker':
return worker_classes[queue_name]() return worker_classes[queue_name]()
def get_active_worker_queues(queue_type: Optional[str]=None, include_test_queues: bool=False) -> List[str]: def get_active_worker_queues(only_test_queues: bool=False) -> List[str]:
"""Returns all the non-test worker queues.""" """Returns all (either test, or real) worker queues."""
if queue_type is None: return [queue_name for queue_name in worker_classes.keys()
return [queue_name for queue_name in worker_classes.keys() if bool(queue_name in test_queues) == only_test_queues]
if include_test_queues or queue_name not in queues["test"]]
return list(queues[queue_type].keys())
def check_and_send_restart_signal() -> None: def check_and_send_restart_signal() -> None:
try: try:
@@ -844,7 +841,7 @@ class DeferredWorker(QueueProcessingWorker):
user_profile.realm.string_id, time.time() - start, user_profile.realm.string_id, time.time() - start,
) )
@assign_queue('test', queue_type="test") @assign_queue('test', is_test_queue=True)
class TestWorker(QueueProcessingWorker): class TestWorker(QueueProcessingWorker):
# This worker allows you to test the queue worker infrastructure without # This worker allows you to test the queue worker infrastructure without
# creating significant side effects. It can be useful in development or # creating significant side effects. It can be useful in development or
@@ -857,7 +854,7 @@ class TestWorker(QueueProcessingWorker):
with open(fn, 'ab') as f: with open(fn, 'ab') as f:
f.write(message + b'\n') f.write(message + b'\n')
@assign_queue('noop', queue_type="test") @assign_queue('noop', is_test_queue=True)
class NoopWorker(QueueProcessingWorker): class NoopWorker(QueueProcessingWorker):
"""Used to profile the queue processing framework, in zilencer's queue_rate.""" """Used to profile the queue processing framework, in zilencer's queue_rate."""
@@ -875,7 +872,7 @@ class NoopWorker(QueueProcessingWorker):
if self.consumed >= self.max_consume: if self.consumed >= self.max_consume:
self.stop() self.stop()
@assign_queue('noop_batch', queue_type="test") @assign_queue('noop_batch', is_test_queue=True)
class BatchNoopWorker(LoopQueueProcessingWorker): class BatchNoopWorker(LoopQueueProcessingWorker):
"""Used to profile the queue processing framework, in zilencer's queue_rate.""" """Used to profile the queue processing framework, in zilencer's queue_rate."""
batch_size = 500 batch_size = 500