From 19d115a9da0ac3debc2697a43f0c2ef71abdb64e Mon Sep 17 00:00:00 2001 From: Alex Vandiver Date: Fri, 17 Mar 2023 22:10:15 +0000 Subject: [PATCH] email_mirror: Set a short timeout on parsing incoming emails. This timeout needs to be short enough that we don't drop the RabbitMQ connection. Also drop the offending message (by returning with no further exception) so we don't hit a head-of-queue failure situation. Ideally, the parser would just be lightning-fast, so this would never happen. --- zerver/worker/email_mirror.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/zerver/worker/email_mirror.py b/zerver/worker/email_mirror.py index b2b6cb3496..755241b417 100644 --- a/zerver/worker/email_mirror.py +++ b/zerver/worker/email_mirror.py @@ -16,18 +16,21 @@ from zerver.lib.email_mirror import ( ) from zerver.lib.email_mirror import process_message as mirror_email from zerver.lib.exceptions import RateLimitedError -from zerver.worker.base import QueueProcessingWorker, assign_queue +from zerver.worker.base import QueueProcessingWorker, WorkerTimeoutError, assign_queue logger = logging.getLogger(__name__) @assign_queue("email_mirror") class MirrorWorker(QueueProcessingWorker): + MAX_CONSUME_SECONDS = 5 + @override def consume(self, event: Mapping[str, Any]) -> None: rcpt_to = event["rcpt_to"] + content = base64.b64decode(event["msg_base64"]) msg = email.message_from_bytes( - base64.b64decode(event["msg_base64"]), + content, policy=email.policy.default, ) assert isinstance(msg, EmailMessage) # https://github.com/python/typeshed/issues/2417 @@ -45,4 +48,13 @@ class MirrorWorker(QueueProcessingWorker): ) return - mirror_email(msg, rcpt_to=rcpt_to) + try: + mirror_email(msg, rcpt_to=rcpt_to) + except WorkerTimeoutError: # nocoverage + logging.error( + "Timed out ingesting message-id %s to %s (%d bytes) -- dropping!", + msg["Message-ID"] or "", + rcpt_to, + len(content), + ) + return