From dbcc76b996e6e89477e5c589fe586c2a12675c20 Mon Sep 17 00:00:00 2001 From: Steve Howell Date: Wed, 15 Nov 2017 06:27:41 -0800 Subject: [PATCH] Fix MissedMessageWorker. This fixes a regression in 25c669df52720876155de8d3493252dcaa8e4c37. We were draining the queue in both the superclass and the subclass, so by the time the subclass started processing events, there were no events to process. Now the subclass properly uses the events passed in from the superclass. --- zerver/tests/test_queue_worker.py | 101 ++++++++++++++++++++++++++++++ zerver/worker/queue_processors.py | 1 - 2 files changed, 101 insertions(+), 1 deletion(-) diff --git a/zerver/tests/test_queue_worker.py b/zerver/tests/test_queue_worker.py index 711d4b1ff9..321521eb21 100644 --- a/zerver/tests/test_queue_worker.py +++ b/zerver/tests/test_queue_worker.py @@ -18,8 +18,17 @@ from zerver.worker.queue_processors import ( get_active_worker_queues, QueueProcessingWorker, LoopQueueProcessingWorker, + MissedMessageWorker, ) +Event = Dict[str, Any] + +# This is used for testing LoopQueueProcessingWorker, which +# would run forever if we don't mock time.sleep to abort the +# loop. +class AbortLoop(Exception): + pass + class WorkerTest(ZulipTestCase): class FakeClient: def __init__(self) -> None: @@ -36,6 +45,98 @@ class WorkerTest(ZulipTestCase): callback = self.consumers[queue_name] callback(data) + def drain_queue(self, queue_name: str, json: bool) -> List[Event]: + assert json + events = [ + dct + for (queue_name, dct) + in self.queue + ] + + # IMPORTANT! + # This next line prevents us from double draining + # queues, which was a bug at one point. + self.queue = [] + + return events + + def test_missed_message_worker(self) -> None: + cordelia = self.example_user('cordelia') + hamlet = self.example_user('hamlet') + othello = self.example_user('othello') + + hamlet1_msg_id = self.send_personal_message( + from_email=cordelia.email, + to_email=hamlet.email, + content='hi hamlet', + ) + + hamlet2_msg_id = self.send_personal_message( + from_email=cordelia.email, + to_email=hamlet.email, + content='goodbye hamlet', + ) + + othello_msg_id = self.send_personal_message( + from_email=cordelia.email, + to_email=othello.email, + content='where art thou, othello?', + ) + + events = [ + dict(user_profile_id=hamlet.id, message_id=hamlet1_msg_id), + dict(user_profile_id=hamlet.id, message_id=hamlet2_msg_id), + dict(user_profile_id=othello.id, message_id=othello_msg_id), + ] + + fake_client = self.FakeClient() + for event in events: + fake_client.queue.append(('missedmessage_emails', event)) + + mmw = MissedMessageWorker() + + time_mock = patch( + 'zerver.worker.queue_processors.time.sleep', + side_effect=AbortLoop, + ) + + send_mock = patch( + 'zerver.lib.notifications.do_send_missedmessage_events_reply_in_zulip' + ) + + with send_mock as sm, time_mock as tm: + with simulated_queue_client(lambda: fake_client): + try: + mmw.setup() + mmw.start() + except AbortLoop: + pass + + self.assertEqual(tm.call_args[0][0], 120) # should sleep two minutes + + args = [c[0] for c in sm.call_args_list] + arg_dict = { + arg[0].id: dict( + missed_messages=arg[1], + count=arg[2], + ) + for arg in args + } + + hamlet_info = arg_dict[hamlet.id] + self.assertEqual(hamlet_info['count'], 2) + self.assertEqual( + {m.content for m in hamlet_info['missed_messages']}, + {'hi hamlet', 'goodbye hamlet'}, + ) + + othello_info = arg_dict[othello.id] + self.assertEqual(othello_info['count'], 1) + self.assertEqual( + {m.content for m in othello_info['missed_messages']}, + {'where art thou, othello?'} + ) + def test_mirror_worker(self) -> None: fake_client = self.FakeClient() data = [ diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 368a74e586..9ddf77180e 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -276,7 +276,6 @@ class MissedMessageWorker(LoopQueueProcessingWorker): sleep_delay = 2 * 60 def consume_batch(self, missed_events: List[Dict[str, Any]]) -> None: - missed_events = self.q.drain_queue("missedmessage_emails", json=True) by_recipient = defaultdict(list) # type: Dict[int, List[Dict[str, Any]]] for event in missed_events: