zulip/zerver/worker/email_mirror.py
Alex Vandiver 1f0cfd4662 email-mirror: Add a standalone server that processes incoming email.
Using postfix to handle the incoming email gateway complicates things
a great deal:

- It cannot verify that incoming email addresses exist in Zulip before
  accepting them; it thus accepts mail at the `RCPT TO` stage that it
  cannot handle, and must then reject it after `DATA`.

- It is built to handle both incoming and outgoing email, which
  results in subtle errors (1c17583ad5, 79931051bd, a53092687e,
  #18600).

- Rate-limiting happens much too late to avoid denial of
  service (#12501).

- Mis-configurations of the HTTP endpoint can break incoming
  mail (#18105).

Provide a replacement SMTP server which accepts incoming email on port
25, verifies that Zulip can accept the address and that no rate limits
are being exceeded, and then adds the message directly to the relevant
queue.
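
The accept/verify/enqueue flow described above can be sketched roughly as
follows. This is only an illustration, not the server added by this
commit: `Verdict`, `check_recipient`, `handle_message`, the address set,
the counter-based limit, and the in-memory list standing in for the queue
are all hypothetical. Only the event keys `rcpt_to` and `msg_base64` come
from the worker shown below. The sketch also rate-limits every address
for simplicity.

```python
import base64
from enum import Enum


class Verdict(Enum):
    ACCEPT = "accept"
    UNKNOWN_ADDRESS = "unknown address"
    RATE_LIMITED = "rate limited"


# Hypothetical stand-ins for the address lookup and the rate limiter.
KNOWN_ADDRESSES = {"channel+token@zulip.example.com"}
MAX_PER_ADDRESS = 2
seen_counts = {}

# Hypothetical stand-in for the "email_mirror" queue.
queue = []


def check_recipient(rcpt_to: str) -> Verdict:
    """Decide about a recipient before any message data is accepted."""
    if rcpt_to not in KNOWN_ADDRESSES:
        return Verdict.UNKNOWN_ADDRESS
    if seen_counts.get(rcpt_to, 0) >= MAX_PER_ADDRESS:
        return Verdict.RATE_LIMITED
    return Verdict.ACCEPT


def handle_message(rcpt_to: str, raw_message: bytes) -> Verdict:
    """Enqueue the message only if the recipient check passes."""
    verdict = check_recipient(rcpt_to)
    if verdict is Verdict.ACCEPT:
        seen_counts[rcpt_to] = seen_counts.get(rcpt_to, 0) + 1
        queue.append(
            {
                "rcpt_to": rcpt_to,
                "msg_base64": base64.b64encode(raw_message).decode("ascii"),
            }
        )
    return verdict
```

Because `check_recipient` needs only the address, a check of this shape
can run at the `RCPT TO` stage, before the message body is transferred.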

Removes an incorrect comment which implied that missed-message
addresses were only usable once.  We continue to rate-limit only
channel email addresses, since missed-message addresses, unlike
channel email addresses, are unlikely to be placed into automated
systems.

Also simplifies #7814 somewhat.
2025-05-19 16:39:44 -07:00

# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html
import base64
import email.parser
import email.policy
import logging
from collections.abc import Mapping
from email.message import EmailMessage
from typing import Any

from typing_extensions import override

from zerver.lib.email_mirror import process_message as mirror_email
from zerver.worker.base import QueueProcessingWorker, WorkerTimeoutError, assign_queue

logger = logging.getLogger(__name__)


@assign_queue("email_mirror")
class MirrorWorker(QueueProcessingWorker):
    MAX_CONSUME_SECONDS = 5

    @override
    def consume(self, event: Mapping[str, Any]) -> None:
        rcpt_to = event["rcpt_to"]
        content = base64.b64decode(event["msg_base64"])
        msg = email.parser.BytesParser(_class=EmailMessage, policy=email.policy.default).parsebytes(
            content
        )
        try:
            mirror_email(msg, rcpt_to=rcpt_to)
        except WorkerTimeoutError:  # nocoverage
            logging.error(
                "Timed out ingesting message-id %s to %s (%d bytes) -- dropping!",
                msg["Message-ID"] or "<?>",
                rcpt_to,
                len(content),
            )
            return
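
The decoding step in `consume` can be tried on its own, without a Zulip
checkout. The following is a minimal sketch using only the standard
library; the helper name `decode_event` and the sample address and
message are made up for illustration, while the event keys and the parser
call mirror the worker above.

```python
import base64
import email.parser
import email.policy
from collections.abc import Mapping
from email.message import EmailMessage
from typing import Any


def decode_event(event: Mapping[str, Any]) -> tuple[str, EmailMessage, int]:
    """Hypothetical helper: decode a queue event the way consume() does."""
    content = base64.b64decode(event["msg_base64"])
    msg = email.parser.BytesParser(
        _class=EmailMessage, policy=email.policy.default
    ).parsebytes(content)
    return event["rcpt_to"], msg, len(content)


# A sample message with no Message-ID header (illustrative data only).
raw = b"From: sender@example.com\r\nSubject: Hello\r\n\r\nBody text\r\n"
event = {
    "rcpt_to": "channel+token@zulip.example.com",
    "msg_base64": base64.b64encode(raw).decode("ascii"),
}

rcpt_to, msg, size = decode_event(event)

# A missing header reads as None, so the fallback used in the log call applies.
message_id = msg["Message-ID"] or "<?>"
```

Base64 keeps the raw message bytes intact inside the event, so the parser
sees exactly what was received.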