queue_processors: Avoid queue worker timeouts in tests.

For tests that use the dev server, like test-api, test-js-with-puppeteer,
we don't have the consumers for the queues. As those queues eventually time out,
we get unnecessary error messages. This adds a new flag, disable_timeout,
to disable the consume-timeout behavior for these test cases.
This commit is contained in:
Zixuan James Li
2023-06-27 20:06:57 -04:00
committed by Tim Abbott
parent a686c0cc02
commit b6d1e56cac
3 changed files with 62 additions and 34 deletions

View File

@@ -431,10 +431,11 @@ def queue_json_publish(
elif processor: elif processor:
processor(event) processor(event)
else: else:
# The else branch is only hit during tests, where rabbitmq is not enabled.
# Must be imported here: A top section import leads to circular imports # Must be imported here: A top section import leads to circular imports
from zerver.worker.queue_processors import get_worker from zerver.worker.queue_processors import get_worker
get_worker(queue_name).consume_single_event(event) get_worker(queue_name, disable_timeout=True).consume_single_event(event)
def retry_event( def retry_event(

View File

@@ -703,10 +703,13 @@ class WorkerTest(ZulipTestCase):
def consume(self, data: Mapping[str, Any]) -> None: def consume(self, data: Mapping[str, Any]) -> None:
if data["type"] == "timeout": if data["type"] == "timeout":
time.sleep(5) time.sleep(1.5)
processed.append(data["type"]) processed.append(data["type"])
fake_client = FakeClient() fake_client = FakeClient()
def assert_timeout(should_timeout: bool, threaded: bool, disable_timeout: bool) -> None:
processed.clear()
for msg in ["good", "fine", "timeout", "back to normal"]: for msg in ["good", "fine", "timeout", "back to normal"]:
fake_client.enqueue("timeout_worker", {"type": msg}) fake_client.enqueue("timeout_worker", {"type": msg})
@@ -715,8 +718,11 @@ class WorkerTest(ZulipTestCase):
os.remove(fn) os.remove(fn)
with simulated_queue_client(fake_client): with simulated_queue_client(fake_client):
worker = TimeoutWorker() worker = TimeoutWorker(threaded=threaded, disable_timeout=disable_timeout)
worker.setup() worker.setup()
if not should_timeout:
worker.start()
else:
with self.assertLogs(level="ERROR") as m: with self.assertLogs(level="ERROR") as m:
worker.start() worker.start()
self.assertEqual( self.assertEqual(
@@ -725,6 +731,10 @@ class WorkerTest(ZulipTestCase):
) )
self.assertIn(m.records[0].stack_info, m.output[0]) self.assertIn(m.records[0].stack_info, m.output[0])
if not should_timeout:
self.assertEqual(processed, ["good", "fine", "timeout", "back to normal"])
return
self.assertEqual(processed, ["good", "fine", "back to normal"]) self.assertEqual(processed, ["good", "fine", "back to normal"])
with open(fn) as f: with open(fn) as f:
line = f.readline().strip() line = f.readline().strip()
@@ -733,6 +743,12 @@ class WorkerTest(ZulipTestCase):
event = events[0] event = events[0]
self.assertEqual(event["type"], "timeout") self.assertEqual(event["type"], "timeout")
# Do the bulky truth table check
assert_timeout(should_timeout=False, threaded=True, disable_timeout=False)
assert_timeout(should_timeout=False, threaded=True, disable_timeout=True)
assert_timeout(should_timeout=True, threaded=False, disable_timeout=False)
assert_timeout(should_timeout=False, threaded=False, disable_timeout=True)
def test_embed_links_timeout(self) -> None: def test_embed_links_timeout(self) -> None:
@queue_processors.assign_queue("timeout_worker", is_test_queue=True) @queue_processors.assign_queue("timeout_worker", is_test_queue=True)
class TimeoutWorker(FetchLinksEmbedData): class TimeoutWorker(FetchLinksEmbedData):

View File

@@ -161,8 +161,10 @@ def register_worker(
test_queues.add(queue_name) test_queues.add(queue_name)
def get_worker(queue_name: str, threaded: bool = False) -> "QueueProcessingWorker": def get_worker(
return worker_classes[queue_name](threaded=threaded) queue_name: str, threaded: bool = False, disable_timeout: bool = False
) -> "QueueProcessingWorker":
return worker_classes[queue_name](threaded=threaded, disable_timeout=disable_timeout)
def get_active_worker_queues(only_test_queues: bool = False) -> List[str]: def get_active_worker_queues(only_test_queues: bool = False) -> List[str]:
@@ -222,9 +224,10 @@ class QueueProcessingWorker(ABC):
# startup and steady-state memory. # startup and steady-state memory.
PREFETCH = 100 PREFETCH = 100
def __init__(self, threaded: bool = False) -> None: def __init__(self, threaded: bool = False, disable_timeout: bool = False) -> None:
self.q: Optional[SimpleQueueClient] = None self.q: Optional[SimpleQueueClient] = None
self.threaded = threaded self.threaded = threaded
self.disable_timeout = disable_timeout
if not hasattr(self, "queue_name"): if not hasattr(self, "queue_name"):
raise WorkerDeclarationError("Queue worker declared without queue_name") raise WorkerDeclarationError("Queue worker declared without queue_name")
@@ -302,7 +305,7 @@ class QueueProcessingWorker(ABC):
self.update_statistics() self.update_statistics()
time_start = time.time() time_start = time.time()
if self.MAX_CONSUME_SECONDS and not self.threaded: if self.MAX_CONSUME_SECONDS and not self.threaded and not self.disable_timeout:
try: try:
signal.signal( signal.signal(
signal.SIGALRM, signal.SIGALRM,
@@ -763,8 +766,8 @@ class MissedMessageWorker(QueueProcessingWorker):
@assign_queue("email_senders") @assign_queue("email_senders")
class EmailSendingWorker(LoopQueueProcessingWorker): class EmailSendingWorker(LoopQueueProcessingWorker):
def __init__(self, threaded: bool = False) -> None: def __init__(self, threaded: bool = False, disable_timeout: bool = False) -> None:
super().__init__(threaded) super().__init__(threaded, disable_timeout)
self.connection: BaseEmailBackend = initialize_connection(None) self.connection: BaseEmailBackend = initialize_connection(None)
@retry_send_email_failures @retry_send_email_failures
@@ -1178,9 +1181,13 @@ class NoopWorker(QueueProcessingWorker):
"""Used to profile the queue processing framework, in zilencer's queue_rate.""" """Used to profile the queue processing framework, in zilencer's queue_rate."""
def __init__( def __init__(
self, threaded: bool = False, max_consume: int = 1000, slow_queries: Sequence[int] = [] self,
threaded: bool = False,
disable_timeout: bool = False,
max_consume: int = 1000,
slow_queries: Sequence[int] = [],
) -> None: ) -> None:
super().__init__(threaded) super().__init__(threaded, disable_timeout)
self.consumed = 0 self.consumed = 0
self.max_consume = max_consume self.max_consume = max_consume
self.slow_queries: Set[int] = set(slow_queries) self.slow_queries: Set[int] = set(slow_queries)
@@ -1202,9 +1209,13 @@ class BatchNoopWorker(LoopQueueProcessingWorker):
batch_size = 100 batch_size = 100
def __init__( def __init__(
self, threaded: bool = False, max_consume: int = 1000, slow_queries: Sequence[int] = [] self,
threaded: bool = False,
disable_timeout: bool = False,
max_consume: int = 1000,
slow_queries: Sequence[int] = [],
) -> None: ) -> None:
super().__init__(threaded) super().__init__(threaded, disable_timeout)
self.consumed = 0 self.consumed = 0
self.max_consume = max_consume self.max_consume = max_consume
self.slow_queries: Set[int] = set(slow_queries) self.slow_queries: Set[int] = set(slow_queries)