diff --git a/zerver/tests/test_queue_worker.py b/zerver/tests/test_queue_worker.py index 711d4b1ff9..321521eb21 100644 --- a/zerver/tests/test_queue_worker.py +++ b/zerver/tests/test_queue_worker.py @@ -18,8 +18,17 @@ from zerver.worker.queue_processors import ( get_active_worker_queues, QueueProcessingWorker, LoopQueueProcessingWorker, + MissedMessageWorker, ) +Event = Dict[str, Any] + +# This is used for testing LoopQueueProcessingWorker, which +# would run forever if we don't mock time.sleep to abort the +# loop. +class AbortLoop(Exception): + pass + class WorkerTest(ZulipTestCase): class FakeClient: def __init__(self) -> None: @@ -36,6 +45,98 @@ class WorkerTest(ZulipTestCase): callback = self.consumers[queue_name] callback(data) + def drain_queue(self, queue_name: str, json: bool) -> List[Event]: + assert json + events = [ + dct + for (queue_name, dct) + in self.queue + ] + + # IMPORTANT! + # This next line prevents us from double draining + # queues, which was a bug at one point. + self.queue = [] + + return events + + def test_missed_message_worker(self) -> None: + cordelia = self.example_user('cordelia') + hamlet = self.example_user('hamlet') + othello = self.example_user('othello') + + hamlet1_msg_id = self.send_personal_message( + from_email=cordelia.email, + to_email=hamlet.email, + content='hi hamlet', + ) + + hamlet2_msg_id = self.send_personal_message( + from_email=cordelia.email, + to_email=hamlet.email, + content='goodbye hamlet', + ) + + othello_msg_id = self.send_personal_message( + from_email=cordelia.email, + to_email=othello.email, + content='where art thou, othello?', + ) + + events = [ + dict(user_profile_id=hamlet.id, message_id=hamlet1_msg_id), + dict(user_profile_id=hamlet.id, message_id=hamlet2_msg_id), + dict(user_profile_id=othello.id, message_id=othello_msg_id), + ] + + fake_client = self.FakeClient() + for event in events: + fake_client.queue.append(('missedmessage_emails', event)) + + mmw = MissedMessageWorker() + + time_mock = patch( + 'zerver.worker.queue_processors.time.sleep', + side_effect=AbortLoop, + ) + + send_mock = patch( + 'zerver.lib.notifications.do_send_missedmessage_events_reply_in_zulip' + ) + + with send_mock as sm, time_mock as tm: + with simulated_queue_client(lambda: fake_client): + try: + mmw.setup() + mmw.start() + except AbortLoop: + pass + + self.assertEqual(tm.call_args[0][0], 120) # should sleep two minutes + + args = [c[0] for c in sm.call_args_list] + arg_dict = { + arg[0].id: dict( + missed_messages=arg[1], + count=arg[2], + ) + for arg in args + } + + hamlet_info = arg_dict[hamlet.id] + self.assertEqual(hamlet_info['count'], 2) + self.assertEqual( + {m.content for m in hamlet_info['missed_messages']}, + {'hi hamlet', 'goodbye hamlet'}, + ) + + othello_info = arg_dict[othello.id] + self.assertEqual(othello_info['count'], 1) + self.assertEqual( + {m.content for m in othello_info['missed_messages']}, + {'where art thou, othello?'} + ) + def test_mirror_worker(self) -> None: fake_client = self.FakeClient() data = [ diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 368a74e586..9ddf77180e 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -276,7 +276,6 @@ class MissedMessageWorker(LoopQueueProcessingWorker): sleep_delay = 2 * 60 def consume_batch(self, missed_events: List[Dict[str, Any]]) -> None: - missed_events = self.q.drain_queue("missedmessage_emails", json=True) by_recipient = defaultdict(list) # type: Dict[int, List[Dict[str, Any]]] for event in missed_events: