diff --git a/zerver/worker/email_mirror.py b/zerver/worker/email_mirror.py index b2b6cb3496..755241b417 100644 --- a/zerver/worker/email_mirror.py +++ b/zerver/worker/email_mirror.py @@ -16,18 +16,21 @@ from zerver.lib.email_mirror import ( ) from zerver.lib.email_mirror import process_message as mirror_email from zerver.lib.exceptions import RateLimitedError -from zerver.worker.base import QueueProcessingWorker, assign_queue +from zerver.worker.base import QueueProcessingWorker, WorkerTimeoutError, assign_queue logger = logging.getLogger(__name__) @assign_queue("email_mirror") class MirrorWorker(QueueProcessingWorker): + MAX_CONSUME_SECONDS = 5 + @override def consume(self, event: Mapping[str, Any]) -> None: rcpt_to = event["rcpt_to"] + content = base64.b64decode(event["msg_base64"]) msg = email.message_from_bytes( - base64.b64decode(event["msg_base64"]), + content, policy=email.policy.default, ) assert isinstance(msg, EmailMessage) # https://github.com/python/typeshed/issues/2417 @@ -45,4 +48,13 @@ class MirrorWorker(QueueProcessingWorker): ) return - mirror_email(msg, rcpt_to=rcpt_to) + try: + mirror_email(msg, rcpt_to=rcpt_to) + except WorkerTimeoutError: # nocoverage + logging.error( + "Timed out ingesting message-id %s to %s (%d bytes) -- dropping!", + msg["Message-ID"] or "", + rcpt_to, + len(content), + ) + return