mirror of
https://github.com/zulip/zulip.git
synced 2025-11-11 01:16:19 +00:00
@@ -2,6 +2,7 @@
|
|||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import ujson
|
import ujson
|
||||||
|
import smtplib
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.http import HttpResponse
|
from django.http import HttpResponse
|
||||||
@@ -60,6 +61,31 @@ class WorkerTest(ZulipTestCase):
|
|||||||
worker.setup()
|
worker.setup()
|
||||||
worker.start()
|
worker.start()
|
||||||
|
|
||||||
|
def test_email_sending_worker_retries(self):
|
||||||
|
# type: () -> None
|
||||||
|
"""Tests the retry_send_email_failures decorator to make sure it
|
||||||
|
retries sending the email 3 times and then gives up."""
|
||||||
|
fake_client = self.FakeClient()
|
||||||
|
|
||||||
|
data = {'test': 'test', 'failed_tries': 0, 'id': 'test_missed'}
|
||||||
|
fake_client.queue.append(('missedmessage_email_senders', data))
|
||||||
|
|
||||||
|
def fake_publish(queue_name, event, processor):
|
||||||
|
# type: (str, Dict[str, Any], Callable[[Any], None]) -> None
|
||||||
|
fake_client.queue.append((queue_name, event))
|
||||||
|
|
||||||
|
with simulated_queue_client(lambda: fake_client):
|
||||||
|
worker = queue_processors.MissedMessageSendingWorker()
|
||||||
|
worker.setup()
|
||||||
|
with patch('zerver.worker.queue_processors.send_email_from_dict',
|
||||||
|
side_effect=smtplib.SMTPServerDisconnected), \
|
||||||
|
patch('zerver.lib.queue.queue_json_publish',
|
||||||
|
side_effect=fake_publish), \
|
||||||
|
patch('logging.exception'):
|
||||||
|
worker.start()
|
||||||
|
|
||||||
|
self.assertEqual(data['failed_tries'], 4)
|
||||||
|
|
||||||
def test_UserActivityWorker(self):
|
def test_UserActivityWorker(self):
|
||||||
# type: () -> None
|
# type: () -> None
|
||||||
fake_client = self.FakeClient()
|
fake_client = self.FakeClient()
|
||||||
|
|||||||
@@ -4,6 +4,10 @@ from typing import Any, Callable, Dict, List, Mapping, Optional, cast
|
|||||||
import signal
|
import signal
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
from functools import wraps
|
||||||
|
|
||||||
|
import smtplib
|
||||||
|
import socket
|
||||||
|
|
||||||
from zulip_bots.lib import ExternalBotHandler, StateHandler
|
from zulip_bots.lib import ExternalBotHandler, StateHandler
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
@@ -16,7 +20,7 @@ from zerver.models import \
|
|||||||
from zerver.lib.context_managers import lockfile
|
from zerver.lib.context_managers import lockfile
|
||||||
from zerver.lib.error_notify import do_report_error
|
from zerver.lib.error_notify import do_report_error
|
||||||
from zerver.lib.feedback import handle_feedback
|
from zerver.lib.feedback import handle_feedback
|
||||||
from zerver.lib.queue import SimpleQueueClient, queue_json_publish
|
from zerver.lib.queue import SimpleQueueClient, queue_json_publish, retry_event
|
||||||
from zerver.lib.timestamp import timestamp_to_datetime
|
from zerver.lib.timestamp import timestamp_to_datetime
|
||||||
from zerver.lib.notifications import handle_missedmessage_emails, enqueue_welcome_emails
|
from zerver.lib.notifications import handle_missedmessage_emails, enqueue_welcome_emails
|
||||||
from zerver.lib.push_notifications import handle_push_notification
|
from zerver.lib.push_notifications import handle_push_notification
|
||||||
@@ -98,6 +102,27 @@ def check_and_send_restart_signal():
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def retry_send_email_failures(func):
|
||||||
|
# type: (Callable[[Any, Dict[str, Any]], None]) -> Callable[[QueueProcessingWorker, Dict[str, Any]], None]
|
||||||
|
# If we don't use cast() and use QueueProcessingWorker instead of Any in
|
||||||
|
# function type annotation then mypy complains.
|
||||||
|
func = cast(Callable[[QueueProcessingWorker, Dict[str, Any]], None], func)
|
||||||
|
|
||||||
|
@wraps(func)
|
||||||
|
def wrapper(worker, data):
|
||||||
|
# type: (QueueProcessingWorker, Dict[str, Any]) -> None
|
||||||
|
try:
|
||||||
|
func(worker, data)
|
||||||
|
except (smtplib.SMTPServerDisconnected, socket.gaierror):
|
||||||
|
|
||||||
|
def on_failure(event):
|
||||||
|
# type: (Dict[str, Any]) -> None
|
||||||
|
logging.exception("Event {} failed".format(event['id']))
|
||||||
|
|
||||||
|
retry_event(worker.queue_name, data, on_failure)
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
class QueueProcessingWorker(object):
|
class QueueProcessingWorker(object):
|
||||||
queue_name = None # type: str
|
queue_name = None # type: str
|
||||||
|
|
||||||
@@ -245,6 +270,7 @@ class MissedMessageWorker(QueueProcessingWorker):
|
|||||||
|
|
||||||
@assign_queue('missedmessage_email_senders')
|
@assign_queue('missedmessage_email_senders')
|
||||||
class MissedMessageSendingWorker(QueueProcessingWorker):
|
class MissedMessageSendingWorker(QueueProcessingWorker):
|
||||||
|
@retry_send_email_failures
|
||||||
def consume(self, data):
|
def consume(self, data):
|
||||||
# type: (Dict[str, Any]) -> None
|
# type: (Dict[str, Any]) -> None
|
||||||
try:
|
try:
|
||||||
|
|||||||
Reference in New Issue
Block a user