diff --git a/zerver/tests/test_queue_worker.py b/zerver/tests/test_queue_worker.py index 156a2f6c54..792b5e4e13 100644 --- a/zerver/tests/test_queue_worker.py +++ b/zerver/tests/test_queue_worker.py @@ -2,6 +2,7 @@ import os import time import ujson +import smtplib from django.conf import settings from django.http import HttpResponse @@ -60,6 +61,31 @@ class WorkerTest(ZulipTestCase): worker.setup() worker.start() + def test_email_sending_worker_retries(self): + # type: () -> None + """Tests the retry_send_email_failures decorator to make sure it + retries sending the email 3 times and then gives up.""" + fake_client = self.FakeClient() + + data = {'test': 'test', 'failed_tries': 0, 'id': 'test_missed'} + fake_client.queue.append(('missedmessage_email_senders', data)) + + def fake_publish(queue_name, event, processor): + # type: (str, Dict[str, Any], Callable[[Any], None]) -> None + fake_client.queue.append((queue_name, event)) + + with simulated_queue_client(lambda: fake_client): + worker = queue_processors.MissedMessageSendingWorker() + worker.setup() + with patch('zerver.worker.queue_processors.send_email_from_dict', + side_effect=smtplib.SMTPServerDisconnected), \ + patch('zerver.lib.queue.queue_json_publish', + side_effect=fake_publish), \ + patch('logging.exception'): + worker.start() + + self.assertEqual(data['failed_tries'], 4) + def test_UserActivityWorker(self): # type: () -> None fake_client = self.FakeClient() diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 9083f1c456..d3ec026e70 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -4,6 +4,10 @@ from typing import Any, Callable, Dict, List, Mapping, Optional, cast import signal import sys import os +from functools import wraps + +import smtplib +import socket from zulip_bots.lib import ExternalBotHandler, StateHandler from django.conf import settings @@ -16,7 +20,7 @@ from zerver.models import \ from zerver.lib.context_managers import lockfile from zerver.lib.error_notify import do_report_error from zerver.lib.feedback import handle_feedback -from zerver.lib.queue import SimpleQueueClient, queue_json_publish +from zerver.lib.queue import SimpleQueueClient, queue_json_publish, retry_event from zerver.lib.timestamp import timestamp_to_datetime from zerver.lib.notifications import handle_missedmessage_emails, enqueue_welcome_emails from zerver.lib.push_notifications import handle_push_notification @@ -98,6 +102,27 @@ def check_and_send_restart_signal(): except Exception: pass +def retry_send_email_failures(func): + # type: (Callable[[Any, Dict[str, Any]], None]) -> Callable[[QueueProcessingWorker, Dict[str, Any]], None] + # If we don't use cast() and use QueueProcessingWorker instead of Any in + # function type annotation then mypy complains. + func = cast(Callable[[QueueProcessingWorker, Dict[str, Any]], None], func) + + @wraps(func) + def wrapper(worker, data): + # type: (QueueProcessingWorker, Dict[str, Any]) -> None + try: + func(worker, data) + except (smtplib.SMTPServerDisconnected, socket.gaierror): + + def on_failure(event): + # type: (Dict[str, Any]) -> None + logging.exception("Event {} failed".format(event['id'])) + + retry_event(worker.queue_name, data, on_failure) + + return wrapper + class QueueProcessingWorker(object): queue_name = None # type: str @@ -245,6 +270,7 @@ class MissedMessageWorker(QueueProcessingWorker): @assign_queue('missedmessage_email_senders') class MissedMessageSendingWorker(QueueProcessingWorker): + @retry_send_email_failures def consume(self, data): # type: (Dict[str, Any]) -> None try: