mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 14:03:30 +00:00 
			
		
		
		
	The only other flavor we have seen so far is ConnectionResetError, but that is just as much a client error which we should log and move on, and not log an exception for.
		
			
				
	
	
		
			214 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			214 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import asyncio
 | 
						|
import base64
 | 
						|
import email
 | 
						|
import grp
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import pwd
 | 
						|
import signal
 | 
						|
import smtplib
 | 
						|
import socket
 | 
						|
from collections.abc import Awaitable
 | 
						|
from contextlib import suppress
 | 
						|
from ssl import SSLContext, SSLError
 | 
						|
from typing import Any
 | 
						|
 | 
						|
from aiosmtpd.controller import UnthreadedController
 | 
						|
from aiosmtpd.handlers import Message as MessageHandler
 | 
						|
from aiosmtpd.smtp import SMTP, Envelope, Session, TLSSetupException
 | 
						|
from asgiref.sync import sync_to_async
 | 
						|
from django.conf import settings
 | 
						|
from django.core.mail import EmailMultiAlternatives as DjangoEmailMultiAlternatives
 | 
						|
from typing_extensions import override
 | 
						|
 | 
						|
from version import ZULIP_VERSION
 | 
						|
from zerver.lib.email_mirror import decode_stream_email_address, validate_to_address
 | 
						|
from zerver.lib.email_mirror_helpers import (
 | 
						|
    ZulipEmailForwardError,
 | 
						|
    get_email_gateway_message_string_from_address,
 | 
						|
)
 | 
						|
from zerver.lib.exceptions import JsonableError, RateLimitedError
 | 
						|
from zerver.lib.logging_util import log_to_file
 | 
						|
from zerver.lib.queue import queue_json_publish_rollback_unsafe
 | 
						|
 | 
						|
logger = logging.getLogger("zerver.lib.email_mirror")
 | 
						|
log_to_file(logger, settings.EMAIL_MIRROR_LOG_PATH)
 | 
						|
 | 
						|
 | 
						|
def send_to_postmaster(msg: email.message.Message) -> None:
 | 
						|
    # RFC5321 says:
 | 
						|
    #   Any system that includes an SMTP server supporting mail relaying or
 | 
						|
    #   delivery MUST support the reserved mailbox "postmaster" as a case-
 | 
						|
    #   insensitive local name.  This postmaster address is not strictly
 | 
						|
    #   necessary if the server always returns 554 on connection opening (as
 | 
						|
    #   described in Section 3.1).  The requirement to accept mail for
 | 
						|
    #   postmaster implies that RCPT commands that specify a mailbox for
 | 
						|
    #   postmaster at any of the domains for which the SMTP server provides
 | 
						|
    #   mail service, as well as the special case of "RCPT TO:<Postmaster>"
 | 
						|
    #   (with no domain specification), MUST be supported.
 | 
						|
    #
 | 
						|
    # We forward such mail to the ZULIP_ADMINISTRATOR.
 | 
						|
    mail = DjangoEmailMultiAlternatives(
 | 
						|
        subject=f"Mail to postmaster: {msg['Subject']}",
 | 
						|
        from_email=settings.NOREPLY_EMAIL_ADDRESS,
 | 
						|
        to=[settings.ZULIP_ADMINISTRATOR],
 | 
						|
    )
 | 
						|
    mail.attach(None, msg, "message/rfc822")
 | 
						|
    try:
 | 
						|
        mail.send()
 | 
						|
    except smtplib.SMTPResponseException as e:
 | 
						|
        logger.exception(
 | 
						|
            "Error sending bounce email to %s with error code %s: %s",
 | 
						|
            mail.to,
 | 
						|
            e.smtp_code,
 | 
						|
            e.smtp_error,
 | 
						|
            stack_info=True,
 | 
						|
        )
 | 
						|
    except smtplib.SMTPException as e:
 | 
						|
        logger.exception("Error sending bounce email to %s: %s", mail.to, str(e), stack_info=True)
 | 
						|
 | 
						|
 | 
						|
class ZulipMessageHandler(MessageHandler):
 | 
						|
    def __init__(self) -> None:
 | 
						|
        super().__init__(email.message.Message)
 | 
						|
 | 
						|
    async def handle_RCPT(
 | 
						|
        self,
 | 
						|
        server: SMTP,
 | 
						|
        session: Session,
 | 
						|
        envelope: Envelope,
 | 
						|
        address: str,
 | 
						|
        rcpt_options: list[str],
 | 
						|
    ) -> str:
 | 
						|
        # Rewrite all postmaster email addresses to just "postmaster"
 | 
						|
        if address.lower() == "postmaster":
 | 
						|
            envelope.rcpt_tos.append("postmaster")
 | 
						|
            return "250 Continue"
 | 
						|
 | 
						|
        with suppress(ZulipEmailForwardError):
 | 
						|
            if get_email_gateway_message_string_from_address(address).lower() == "postmaster":
 | 
						|
                envelope.rcpt_tos.append("postmaster")
 | 
						|
                return "250 Continue"
 | 
						|
 | 
						|
        try:
 | 
						|
            # Attempt to do the ACL-check now, just to reject early.
 | 
						|
            # The authoritative check is done in the worker, but we
 | 
						|
            # wish to reject now if we can do so simply.
 | 
						|
            await sync_to_async(validate_to_address)(address)
 | 
						|
 | 
						|
        except RateLimitedError:
 | 
						|
            recipient_realm = await sync_to_async(
 | 
						|
                lambda a: decode_stream_email_address(a)[0].realm
 | 
						|
            )(address)
 | 
						|
            logger.warning(
 | 
						|
                "Rejecting a MAIL FROM: %s to realm: %s - rate limited.",
 | 
						|
                envelope.mail_from,
 | 
						|
                recipient_realm.name,
 | 
						|
            )
 | 
						|
            return "550 4.7.0 Rate-limited due to too many emails on this realm."
 | 
						|
 | 
						|
        except ZulipEmailForwardError as e:
 | 
						|
            return f"550 5.1.1 Bad destination mailbox address: {e}"
 | 
						|
 | 
						|
        except JsonableError as e:
 | 
						|
            return f"550 5.7.1 Permission denied: {e}"
 | 
						|
 | 
						|
        envelope.rcpt_tos.append(address)
 | 
						|
        return "250 Continue"
 | 
						|
 | 
						|
    @override
 | 
						|
    def handle_message(self, message: email.message.Message) -> None:
 | 
						|
        msg_base64 = base64.b64encode(bytes(message))
 | 
						|
 | 
						|
        for address in message["X-RcptTo"].split(", "):
 | 
						|
            if address == "postmaster":
 | 
						|
                send_to_postmaster(message)
 | 
						|
            else:
 | 
						|
                queue_json_publish_rollback_unsafe(
 | 
						|
                    "email_mirror",
 | 
						|
                    {
 | 
						|
                        "rcpt_to": address,
 | 
						|
                        "msg_base64": msg_base64.decode(),
 | 
						|
                    },
 | 
						|
                )
 | 
						|
 | 
						|
    async def handle_exception(self, error: Exception) -> str:
 | 
						|
        if isinstance(error, TLSSetupException):  # nocoverage
 | 
						|
            if isinstance(error.__cause__, SSLError):
 | 
						|
                reason = error.__cause__.reason
 | 
						|
            else:
 | 
						|
                reason = str(error.__cause__)
 | 
						|
            logger.info("Dropping invalid TLS connection: %s", reason)
 | 
						|
            return f"421 4.7.6 TLS error: {reason}"
 | 
						|
        else:
 | 
						|
            logger.exception("SMTP session exception")
 | 
						|
            return "500 Server error"
 | 
						|
 | 
						|
 | 
						|
class PermissionDroppingUnthreadedController(UnthreadedController):  # nocoverage
 | 
						|
    @override
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        user: str | None = None,
 | 
						|
        group: str | None = None,
 | 
						|
        **kwargs: Any,
 | 
						|
    ) -> None:
 | 
						|
        super().__init__(**kwargs)
 | 
						|
 | 
						|
        # These may remain None in development, when elevated
 | 
						|
        # privileges are not needed because a non-low port is chosen.
 | 
						|
        self.user_id: int | None = None
 | 
						|
        self.group_id: int | None = None
 | 
						|
        if user is not None:
 | 
						|
            self.user_id = pwd.getpwnam(user).pw_uid
 | 
						|
        if group is not None:
 | 
						|
            self.group_id = grp.getgrnam(group).gr_gid
 | 
						|
 | 
						|
    @override
 | 
						|
    def _create_server(self) -> Awaitable[asyncio.AbstractServer]:
 | 
						|
        # Make the listen socket, then drop privileges before starting
 | 
						|
        # the server
 | 
						|
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
						|
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 | 
						|
        server_socket.bind((self.hostname, self.port))
 | 
						|
        if os.geteuid() == 0:
 | 
						|
            assert self.user_id is not None
 | 
						|
            assert self.group_id is not None
 | 
						|
            logger.info("Dropping privileges to uid %d / gid %d", self.user_id, self.group_id)
 | 
						|
            os.setgid(self.group_id)
 | 
						|
            os.setuid(self.user_id)
 | 
						|
 | 
						|
        server = self.loop.create_server(
 | 
						|
            self._factory_invoker,
 | 
						|
            sock=server_socket,
 | 
						|
            ssl=self.ssl_context,
 | 
						|
        )
 | 
						|
 | 
						|
        return server
 | 
						|
 | 
						|
 | 
						|
def run_smtp_server(
 | 
						|
    user: str | None, group: str | None, host: str, port: int, tls_context: SSLContext | None
 | 
						|
) -> None:  # nocoverage
 | 
						|
    logger.info("Listening on %s:%d", host, port)
 | 
						|
    server = PermissionDroppingUnthreadedController(
 | 
						|
        user=user,
 | 
						|
        group=group,
 | 
						|
        hostname=host,
 | 
						|
        port=port,
 | 
						|
        handler=ZulipMessageHandler(),
 | 
						|
        tls_context=tls_context,
 | 
						|
        ident=f"Zulip Server {ZULIP_VERSION}",
 | 
						|
    )
 | 
						|
 | 
						|
    server.loop.add_signal_handler(signal.SIGINT, server.loop.stop)
 | 
						|
 | 
						|
    server.begin()
 | 
						|
    with suppress(KeyboardInterrupt):
 | 
						|
        # The KeyboardInterrupt will exit the loop, but there's no
 | 
						|
        # reason to throw a stacktrace rather than conduct an ordered
 | 
						|
        # exit.
 | 
						|
        server.loop.run_forever()
 | 
						|
 | 
						|
    server.end()
 |