email-mirror: Add a standalone server that processes incoming email.

Using postfix to handle the incoming email gateway complicates things
a great deal:

- It cannot verify that incoming email addresses exist in Zulip before
  accepting them; it thus accepts mail at the `RCPT TO` stage which it
  cannot handle, and thus must reject after the `DATA`.

- It is built to handle both incoming and outgoing email, which
  results in subtle errors (1c17583ad5, 79931051bd, a53092687e,
  #18600).

- Rate-limiting happens much too late to avoid denial of
  service (#12501).

- Mis-configurations of the HTTP endpoint can break incoming
  mail (#18105).

Provide a replacement SMTP server which accepts incoming email on port
25, verifies that Zulip can accept the address, and that no
rate-limits are being broken, and then adds it directly to the
relevant queue.

Removes an incorrect comment which implied that missed-message
addresses were only usable once.  We leave rate-limiting to only
channel email addresses, since missed-message addresses are unlikely
to be placed into automated systems, as channel email addresses are.

Also simplifies #7814 somewhat.
This commit is contained in:
Alex Vandiver
2021-09-13 18:06:26 +00:00
committed by Tim Abbott
parent e6bcde00e3
commit 1f0cfd4662
13 changed files with 737 additions and 109 deletions

View File

@@ -116,7 +116,12 @@ def execute_from_command_line(argv: list[str] | None = None) -> None:
if __name__ == "__main__":
assert_not_running_as_root()
if len(sys.argv) > 1 and sys.argv[1] == "email_server":
# This needs to be able to run as root to read certificates
# for STARTTLS, and bind to port 25.
pass
else:
assert_not_running_as_root()
config_file = configparser.RawConfigParser()
config_file.read("/etc/zulip/zulip.conf")

View File

@@ -0,0 +1,23 @@
class zulip::local_mailserver {
include zulip::snakeoil
package { 'postfix':
# TODO/compatibility: We can remove this when upgrading directly
# from 10.x is no longer possible. We do not use "purged" here,
# since that would remove config files, which users may have had
# installed.
ensure => absent,
}
file { "${zulip::common::supervisor_conf_dir}/email-mirror.conf":
ensure => file,
require => [
Package[supervisor],
Package[postfix],
],
owner => 'root',
group => 'root',
mode => '0644',
content => template('zulip/supervisor/email-mirror.conf.template.erb'),
notify => Service[$zulip::common::supervisor_service],
}
}

View File

@@ -0,0 +1,14 @@
[program:zulip-email-server]
command=nice -n15 /home/zulip/deployments/current/manage.py email_server --user zulip --group zulip
environment=HTTP_proxy="<%= @proxy %>",HTTPS_proxy="<%= @proxy %>"
priority=350 ; the relative start priority (default 999)
autostart=true ; start at supervisord start (default: true)
autorestart=true ; whether/when to restart (default: unexpected)
stopsignal=TERM ; signal used to kill process (default TERM)
topwaitsecs=30 ; max num secs to wait b4 SIGKILL (default 10)
user=root ; setuid to this UNIX account to run the program
redirect_stderr=true ; redirect proc stderr to stdout (default false)
stdout_logfile=/var/log/zulip/email_server.log ; stdout log path, NONE for none; default AUTO
stdout_logfile_maxbytes=20MB ; max # logfile bytes b4 rotation (default 50MB)
stdout_logfile_backups=3 ; # of stdout logfile backups (default 10)
directory=/home/zulip/deployments/current/

View File

@@ -110,6 +110,8 @@ if has_application_server():
# the restart is fine, as clients will transparently retry.
workers.append("zulip-tus")
workers.extend(list_supervisor_processes(["zulip-email-server"]))
if has_process_fts_updates():
workers.append("process-fts-updates")

View File

@@ -51,6 +51,7 @@ if has_application_server():
services.append("zulip-workers:*")
services.append("zulip-tus")
services.append("zulip-katex")
services.append("zulip-email-server")
if has_application_server(once=True):
# These used to be included in "zulip-workers:*"; we may be
# stopping an older version of Zulip, which has not applied

View File

@@ -38,6 +38,8 @@ FILES_WITH_LEGACY_SUBJECT = {
# to fix everything until we migrate the DB to "topic".
"zerver/tests/test_message_fetch.py",
"zerver/tests/test_message_topics.py",
# This is actually email subjects
"zerver/lib/email_mirror_server.py",
}
shebang_rules: list["Rule"] = [

View File

@@ -0,0 +1,216 @@
import asyncio
import base64
import email
import grp
import logging
import os
import pwd
import signal
import smtplib
import socket
from collections.abc import Awaitable
from contextlib import suppress
from ssl import SSLContext, SSLError
from typing import Any
from aiosmtpd.controller import UnthreadedController
from aiosmtpd.handlers import Message as MessageHandler
from aiosmtpd.smtp import SMTP, Envelope, Session, TLSSetupException
from asgiref.sync import sync_to_async
from django.conf import settings
from django.core.mail import EmailMultiAlternatives as DjangoEmailMultiAlternatives
from typing_extensions import override
from version import ZULIP_VERSION
from zerver.lib.email_mirror import (
decode_stream_email_address,
is_missed_message_address,
rate_limit_mirror_by_realm,
validate_to_address,
)
from zerver.lib.email_mirror_helpers import (
ZulipEmailForwardError,
get_email_gateway_message_string_from_address,
)
from zerver.lib.exceptions import RateLimitedError
from zerver.lib.logging_util import log_to_file
from zerver.lib.queue import queue_json_publish_rollback_unsafe
logger = logging.getLogger("zerver.lib.email_mirror")
log_to_file(logger, settings.EMAIL_MIRROR_LOG_PATH)
def send_to_postmaster(msg: email.message.Message) -> None:
# RFC5321 says:
# Any system that includes an SMTP server supporting mail relaying or
# delivery MUST support the reserved mailbox "postmaster" as a case-
# insensitive local name. This postmaster address is not strictly
# necessary if the server always returns 554 on connection opening (as
# described in Section 3.1). The requirement to accept mail for
# postmaster implies that RCPT commands that specify a mailbox for
# postmaster at any of the domains for which the SMTP server provides
# mail service, as well as the special case of "RCPT TO:<Postmaster>"
# (with no domain specification), MUST be supported.
#
# We forward such mail to the ZULIP_ADMINISTRATOR.
mail = DjangoEmailMultiAlternatives(
subject=f"Mail to postmaster: {msg['Subject']}",
from_email=settings.NOREPLY_EMAIL_ADDRESS,
to=[settings.ZULIP_ADMINISTRATOR],
)
mail.attach(None, msg, "message/rfc822")
try:
mail.send()
except smtplib.SMTPResponseException as e:
logger.exception(
"Error sending bounce email to %s with error code %s: %s",
mail.to,
e.smtp_code,
e.smtp_error,
stack_info=True,
)
except smtplib.SMTPException as e:
logger.exception("Error sending bounce email to %s: %s", mail.to, str(e), stack_info=True)
class ZulipMessageHandler(MessageHandler):
def __init__(self) -> None:
super().__init__(email.message.Message)
async def handle_RCPT(
self,
server: SMTP,
session: Session,
envelope: Envelope,
address: str,
rcpt_options: list[str],
) -> str:
# Rewrite all postmaster email addresses to just "postmaster"
if address.lower() == "postmaster":
envelope.rcpt_tos.append("postmaster")
return "250 Continue"
with suppress(ZulipEmailForwardError):
if get_email_gateway_message_string_from_address(address).lower() == "postmaster":
envelope.rcpt_tos.append("postmaster")
return "250 Continue"
try:
await sync_to_async(validate_to_address)(address)
if not is_missed_message_address(address):
# Only channel email addresses are rate-limited, since
# they are likely to be used as the destination for
# mails from automated systems.
recipient_realm = await sync_to_async(
lambda a: decode_stream_email_address(a)[0].realm
)(address)
try:
rate_limit_mirror_by_realm(recipient_realm)
except RateLimitedError:
logger.warning(
"Rejecting a MAIL FROM: %s to realm: %s - rate limited.",
envelope.mail_from,
recipient_realm.name,
)
return "550 4.7.0 Rate-limited due to too many emails on this realm."
except ZulipEmailForwardError as e:
return f"550 5.1.1 Bad destination mailbox address: {e}"
envelope.rcpt_tos.append(address)
return "250 Continue"
@override
def handle_message(self, message: email.message.Message) -> None:
msg_base64 = base64.b64encode(bytes(message))
for address in message["X-RcptTo"].split(", "):
if address == "postmaster":
send_to_postmaster(message)
else:
queue_json_publish_rollback_unsafe(
"email_mirror",
{
"rcpt_to": address,
"msg_base64": msg_base64.decode(),
},
)
async def handle_exception(self, error: Exception) -> str:
if isinstance(error, TLSSetupException) and isinstance(
error.__cause__, SSLError
): # nocoverage
logger.info("Dropping invalid TLS connection: %s", error.__cause__.reason)
# The client probably never sees this error code, but for completeness:
return f"421 4.7.6 TLS error: {error.__cause__.reason}"
else:
logger.exception("SMTP session exception")
return "500 Server error"
class PermissionDroppingUnthreadedController(UnthreadedController): # nocoverage
@override
def __init__(
self,
user: str | None = None,
group: str | None = None,
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
# These may remain None in development, when elevated
# privileges are not needed because a non-low port is chosen.
self.user_id: int | None = None
self.group_id: int | None = None
if user is not None:
self.user_id = pwd.getpwnam(user).pw_uid
if group is not None:
self.group_id = grp.getgrnam(group).gr_gid
@override
def _create_server(self) -> Awaitable[asyncio.AbstractServer]:
# Make the listen socket, then drop privileges before starting
# the server
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((self.hostname, self.port))
if os.geteuid() == 0:
assert self.user_id is not None
assert self.group_id is not None
logger.info("Dropping privileges to uid %d / gid %d", self.user_id, self.group_id)
os.setgid(self.group_id)
os.setuid(self.user_id)
server = self.loop.create_server(
self._factory_invoker,
sock=server_socket,
ssl=self.ssl_context,
)
return server
def run_smtp_server(
user: str | None, group: str | None, host: str, port: int, tls_context: SSLContext | None
) -> None: # nocoverage
logger.info("Listening on %s:%d", host, port)
server = PermissionDroppingUnthreadedController(
user=user,
group=group,
hostname=host,
port=port,
handler=ZulipMessageHandler(),
tls_context=tls_context,
ident=f"Zulip Server {ZULIP_VERSION}",
)
server.loop.add_signal_handler(signal.SIGINT, server.loop.stop)
server.begin()
with suppress(KeyboardInterrupt):
# The KeyboardInterrupt will exit the loop, but there's no
# reason to throw a stacktrace rather than conduct an ordered
# exit.
server.loop.run_forever()
server.end()

View File

@@ -4,10 +4,9 @@ for forwarding emails into Zulip.
https://zulip.readthedocs.io/en/latest/production/email-gateway.html
The email gateway supports two major modes of operation: An email
server (using postfix) where the email address configured in
EMAIL_GATEWAY_PATTERN delivers emails directly to Zulip, and this, a
cron job that connects to an IMAP inbox (which receives the emails)
periodically.
server where the email address configured in EMAIL_GATEWAY_PATTERN
delivers emails directly to Zulip, and this, a cron job that connects
to an IMAP inbox (which receives the emails) periodically.
Run this in a cron job every N minutes if you have configured Zulip to
poll an external IMAP mailbox for messages. The script will then

View File

@@ -0,0 +1,73 @@
import os
import ssl
from typing import Any
from urllib.parse import SplitResult
from django.core.management.base import BaseCommand, CommandParser
from typing_extensions import override
from zerver.lib.email_mirror_server import run_smtp_server
class Command(BaseCommand):
help = """SMTP server to ingest incoming emails"""
@override
def add_arguments(self, parser: CommandParser) -> None:
parser.add_argument(
"--listen", help="[Port, or address:port, to bind HTTP server to]", default="0.0.0.0:25"
)
parser.add_argument(
"--user",
help="User to drop privileges to, if started as root.",
type=str,
required=(os.geteuid() == 0),
)
parser.add_argument(
"--group",
help="Group to drop privileges to, if started as root.",
type=str,
required=(os.geteuid() == 0),
)
tls_cert: str | None = None
tls_key: str | None = None
if os.access("/etc/ssl/certs/zulip.combined-chain.crt", os.R_OK) and os.access(
"/etc/ssl/private/zulip.key", os.R_OK
):
tls_cert = "/etc/ssl/certs/zulip.combined-chain.crt"
tls_key = "/etc/ssl/private/zulip.key"
elif os.access("/etc/ssl/certs/ssl-cert-snakeoil.pem", os.R_OK) and os.access(
"/etc/ssl/private/ssl-cert-snakeoil.key", os.R_OK
):
tls_cert = "/etc/ssl/certs/ssl-cert-snakeoil.pem"
tls_key = "/etc/ssl/private/ssl-cert-snakeoil.key"
parser.add_argument(
"--tls-cert",
help="Path to TLS certificate chain file",
type=str,
default=tls_cert,
)
parser.add_argument(
"--tls-key",
help="Path to TLS private key file",
type=str,
default=tls_key,
)
@override
def handle(self, *args: Any, **options: Any) -> None:
listen = options["listen"]
if listen.isdigit():
host, port = "0.0.0.0", int(listen) # noqa: S104
else:
r = SplitResult("", listen, "", "", "")
if r.port is None:
raise RuntimeError(f"{listen!r} does not have a valid port number.")
host, port = r.hostname or "0.0.0.0", r.port # noqa: S104
if options["tls_cert"] and options["tls_key"]:
tls_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
tls_context.load_cert_chain(options["tls_cert"], options["tls_key"])
else:
tls_context = None
run_smtp_server(options["user"], options["group"], host, port, tls_context)

View File

@@ -1,23 +1,36 @@
import asyncio
import base64
import email.parser
import email.policy
import os
import subprocess
from collections.abc import Callable, Mapping
from contextlib import suppress
from datetime import timedelta
from email.headerregistry import Address
from email.message import EmailMessage, MIMEPart
from smtplib import SMTPException, SMTPSenderRefused
from typing import TYPE_CHECKING, Any
from unittest import mock
import orjson
import time_machine
from aiosmtpd.smtp import SMTP
from asgiref.sync import sync_to_async
from django.conf import settings
from django.core import mail
from django.core.mail.backends.locmem import EmailBackend
from django.test import override_settings
from django.utils.timezone import now as timezone_now
from zerver.actions.realm_settings import do_deactivate_realm
from zerver.actions.streams import do_change_stream_group_based_setting, do_deactivate_stream
from zerver.actions.users import do_change_user_role, do_deactivate_user
from zerver.lib.email_mirror import (
RateLimitedRealmMirror,
create_missed_message_address,
filter_footer,
generate_missed_message_token,
get_missed_message_token_from_address,
is_forwarded,
is_missed_message_address,
@@ -34,6 +47,7 @@ from zerver.lib.email_mirror_helpers import (
get_channel_email_token,
get_email_gateway_message_string_from_address,
)
from zerver.lib.email_mirror_server import ZulipMessageHandler, send_to_postmaster
from zerver.lib.email_notifications import convert_html_to_markdown
from zerver.lib.send_email import FromAddress
from zerver.lib.streams import ensure_stream
@@ -2072,3 +2086,383 @@ class TestEmailMirrorLogAndReport(ZulipTestCase):
redacted_message = redact_email_address(error_message)
self.assertEqual(redacted_message, expected_message)
class TestEmailMirrorServer(ZulipTestCase):
def test_send_postmaster(self) -> None:
email = EmailMessage()
email.set_content("Hello postmaster!")
email["Subject"] = "This goes to the postmaster"
email["From"] = "bogus@example.com"
email["To"] = "postmaster"
send_to_postmaster(email)
self.assert_length(mail.outbox, 1)
self.assertEqual(mail.outbox[0].subject, "Mail to postmaster: This goes to the postmaster")
self.assertEqual(mail.outbox[0].body, "")
self.assert_length(mail.outbox[0].attachments, 1)
def test_send_postmaster_failure(self) -> None:
email = EmailMessage()
email.set_content("Hello postmaster!")
email["Subject"] = "This goes to the postmaster"
email["From"] = "bogus@example.com"
email["To"] = "postmaster"
with (
mock.patch.object(EmailBackend, "send_messages", side_effect=SMTPException("moose")),
self.assertLogs("zerver.lib.email_mirror", "ERROR") as error_log,
):
send_to_postmaster(email)
self.assert_length(error_log.output, 1)
self.assertEqual(
error_log.output[0].splitlines()[0],
"ERROR:zerver.lib.email_mirror:Error sending bounce email to ['desdemona+admin@zulip.com']: moose",
)
with (
mock.patch.object(
EmailBackend,
"send_messages",
side_effect=SMTPSenderRefused(
530, b"5.5.1 Authentication required", "noreply@testserver"
),
),
self.assertLogs("zerver.lib.email_mirror", "ERROR") as error_log,
):
send_to_postmaster(email)
self.assert_length(error_log.output, 1)
self.assertEqual(
error_log.output[0].splitlines()[0],
(
"ERROR:zerver.lib.email_mirror:Error sending bounce email to ['desdemona+admin@zulip.com']"
" with error code 530: b'5.5.1 Authentication required'"
),
)
async def handler_response(self, commands: list[str]) -> list[str]:
responses: list[bytes] = []
transport = mock.Mock()
transport.get_extra_info.return_value = "other-host:1234"
transport.write = responses.append
protocol = SMTP(
handler=ZulipMessageHandler(),
hostname="testhost",
ident="Zulip 1.2.3",
)
protocol.connection_made(transport)
protocol.data_received(b"".join([c.encode() + b"\r\n" for c in commands]))
with suppress(asyncio.CancelledError):
assert protocol._handler_coroutine
await protocol._handler_coroutine
return [r.decode() for r in responses]
@override_settings(EMAIL_GATEWAY_PATTERN="%s@zulip.example.com")
async def test_handler_error(self) -> None:
with (
mock.patch.object(
ZulipMessageHandler, "handle_RCPT", side_effect=Exception("Some bug")
) as m,
self.assertLogs("zerver.lib.email_mirror", level="WARNING") as error_logs,
):
self.assertEqual(
await self.handler_response(
[
"HELO localhost",
"MAIL FROM: <test@example.com>",
"RCPT TO: <bogus@other.example.com>",
"QUIT",
]
),
[
"220 testhost Zulip 1.2.3\r\n",
"250 testhost\r\n",
"250 OK\r\n",
"500 Server error\r\n",
"221 Bye\r\n",
],
)
m.assert_called_once()
self.assert_length(error_logs.output, 1)
self.assertTrue("Exception: Some bug" in error_logs.output[0])
@override_settings(EMAIL_GATEWAY_PATTERN="%s@zulip.example.com")
async def test_handler_invalid_domain(self) -> None:
self.assertEqual(
await self.handler_response(
[
"HELO localhost",
"MAIL FROM: <test@example.com>",
"RCPT TO: <bogus@other.example.com>",
"QUIT",
]
),
[
"220 testhost Zulip 1.2.3\r\n",
"250 testhost\r\n",
"250 OK\r\n",
"550 5.1.1 Bad destination mailbox address: Address not recognized by gateway.\r\n",
"221 Bye\r\n",
],
)
@override_settings(EMAIL_GATEWAY_PATTERN="%s@zulip.example.com")
async def test_handler_invalid_recipient(self) -> None:
self.assertEqual(
await self.handler_response(
[
"HELO localhost",
"MAIL FROM: <test@example.com>",
"RCPT TO: <bogus@zulip.example.com>",
"QUIT",
]
),
[
"220 testhost Zulip 1.2.3\r\n",
"250 testhost\r\n",
"250 OK\r\n",
"550 5.1.1 Bad destination mailbox address: Bad stream token from email recipient bogus@zulip.example.com\r\n",
"221 Bye\r\n",
],
)
@override_settings(EMAIL_GATEWAY_PATTERN="%s@zulip.example.com")
async def test_handler_postmaster(self) -> None:
for postmaster_email in ("postmaster", "postmaster@zulip.example.com"):
self.assertEqual(
await self.handler_response(
[
"HELO localhost",
"MAIL FROM: <test@example.com>",
f"RCPT TO: <{postmaster_email}>",
"DATA",
"From: test@example.com",
f"To: {postmaster_email}",
"Subject: Email the postmaster",
"",
"Some body!",
".",
"QUIT",
]
),
[
"220 testhost Zulip 1.2.3\r\n",
"250 testhost\r\n",
"250 OK\r\n",
"250 Continue\r\n",
"354 End data with <CR><LF>.<CR><LF>\r\n",
"250 OK\r\n",
"221 Bye\r\n",
],
)
self.assert_length(mail.outbox, 1)
mail.outbox = []
@override_settings(
EMAIL_GATEWAY_PATTERN="%s@zulip.example.com", RATE_LIMITING_MIRROR_REALM_RULES=[(10, 2)]
)
async def test_handler_stream_rate_limiting(self) -> None:
stream_name = "some str"
realm = await sync_to_async(lambda: get_realm("zulip"))()
stream = await sync_to_async(lambda: ensure_stream(realm, stream_name, acting_user=None))()
hamlet = await sync_to_async(lambda: self.example_user("hamlet"))()
email_token = await sync_to_async(
lambda: get_channel_email_token(stream, creator=hamlet, sender=hamlet)
)()
email_address = encode_email_address(stream.name, email_token)
RateLimitedRealmMirror(realm).clear_history()
now = timezone_now()
with time_machine.travel(now, tick=False):
for i in (1, 2):
with mock.patch(
"zerver.lib.email_mirror_server.queue_json_publish_rollback_unsafe"
) as m:
await self.handler_response(
[
"HELO localhost",
"MAIL FROM: <test@example.com>",
f"RCPT TO: <{email_address}>",
"DATA",
f"From: {hamlet.delivery_email}",
f"To: {email_address}",
"Subject: Stream message",
"",
"Some body!",
".",
"QUIT",
]
)
m.assert_called_once()
self.assertEqual(
await self.handler_response(
[
"HELO localhost",
"MAIL FROM: <test@example.com>",
f"RCPT TO: <{email_address}>",
"QUIT",
]
),
[
"220 testhost Zulip 1.2.3\r\n",
"250 testhost\r\n",
"250 OK\r\n",
"550 4.7.0 Rate-limited due to too many emails on this realm.\r\n",
"221 Bye\r\n",
],
)
with (
time_machine.travel(now + timedelta(hours=1), tick=False),
mock.patch("zerver.lib.email_mirror_server.queue_json_publish_rollback_unsafe") as m,
):
await self.handler_response(
[
"HELO localhost",
"MAIL FROM: <test@example.com>",
f"RCPT TO: <{email_address}>",
"DATA",
f"From: {hamlet.delivery_email}",
f"To: {email_address}",
"Subject: Stream message",
"",
"Some body!",
".",
"QUIT",
]
)
m.assert_called_once()
@override_settings(EMAIL_GATEWAY_PATTERN="%s@zulip.example.com")
async def test_handler_invalid_missedmessage(self) -> None:
email_address = settings.EMAIL_GATEWAY_PATTERN % (generate_missed_message_token(),)
self.assertEqual(
await self.handler_response(
[
"HELO localhost",
"MAIL FROM: <test@example.com>",
f"RCPT TO: <{email_address}>",
"QUIT",
]
),
[
"220 testhost Zulip 1.2.3\r\n",
"250 testhost\r\n",
"250 OK\r\n",
"550 5.1.1 Bad destination mailbox address: Zulip notification reply address is invalid.\r\n",
"221 Bye\r\n",
],
)
@override_settings(EMAIL_GATEWAY_PATTERN="%s@zulip.example.com")
async def test_handler_missedmessage(self) -> None:
othello = await sync_to_async(lambda: self.example_user("othello"))()
usermessage = await sync_to_async(lambda: most_recent_usermessage(othello))()
mm_address = await sync_to_async(
lambda: create_missed_message_address(othello, usermessage.message)
)()
with mock.patch("zerver.lib.email_mirror_server.queue_json_publish_rollback_unsafe") as m:
self.assertEqual(
await self.handler_response(
[
"HELO localhost",
"MAIL FROM: <test@example.com>",
f"RCPT TO: <{mm_address}>",
"DATA",
f"From: {othello.delivery_email}",
f"To: {mm_address}",
"Subject: Missed-message reply",
"",
"Some body!",
".",
"QUIT",
]
),
[
"220 testhost Zulip 1.2.3\r\n",
"250 testhost\r\n",
"250 OK\r\n",
"250 Continue\r\n",
"354 End data with <CR><LF>.<CR><LF>\r\n",
"250 OK\r\n",
"221 Bye\r\n",
],
)
message_lines = (
f"From: {othello.delivery_email}",
f"To: {mm_address}",
"Subject: Missed-message reply",
"X-Peer: other-host:1234",
"X-MailFrom: test@example.com",
f"X-RcptTo: {mm_address}",
"",
"Some body!",
)
m.assert_called_once_with(
"email_mirror",
{
"rcpt_to": mm_address,
"msg_base64": base64.b64encode(
b"".join([line.encode() + b"\n" for line in message_lines])
).decode(),
},
)
@override_settings(EMAIL_GATEWAY_PATTERN="%s@zulip.example.com")
async def test_handler_stream(self) -> None:
stream_name = "some str"
realm = await sync_to_async(lambda: get_realm("zulip"))()
stream = await sync_to_async(lambda: ensure_stream(realm, stream_name, acting_user=None))()
hamlet = await sync_to_async(lambda: self.example_user("hamlet"))()
email_token = await sync_to_async(
lambda: get_channel_email_token(stream, creator=hamlet, sender=hamlet)
)()
email_address = encode_email_address(stream.name, email_token)
with mock.patch("zerver.lib.email_mirror_server.queue_json_publish_rollback_unsafe") as m:
self.assertEqual(
await self.handler_response(
[
"HELO localhost",
"MAIL FROM: <test@example.com>",
f"RCPT TO: <{email_address}>",
"DATA",
f"From: {hamlet.delivery_email}",
f"To: {email_address}",
"Subject: Stream message",
"",
"Some body!",
".",
"QUIT",
]
),
[
"220 testhost Zulip 1.2.3\r\n",
"250 testhost\r\n",
"250 OK\r\n",
"250 Continue\r\n",
"354 End data with <CR><LF>.<CR><LF>\r\n",
"250 OK\r\n",
"221 Bye\r\n",
],
)
message_lines = (
f"From: {hamlet.delivery_email}",
f"To: {email_address}",
"Subject: Stream message",
"X-Peer: other-host:1234",
"X-MailFrom: test@example.com",
f"X-RcptTo: {email_address}",
"",
"Some body!",
)
m.assert_called_once_with(
"email_mirror",
{
"rcpt_to": email_address,
"msg_base64": base64.b64encode(
b"".join([line.encode() + b"\n" for line in message_lines])
).decode(),
},
)

View File

@@ -13,13 +13,10 @@ import orjson
import time_machine
from django.conf import settings
from django.db.utils import IntegrityError
from django.test import override_settings
from typing_extensions import override
from zerver.lib.email_mirror import RateLimitedRealmMirror
from zerver.lib.email_mirror_helpers import encode_email_address, get_channel_email_token
from zerver.lib.queue import MAX_REQUEST_RETRIES
from zerver.lib.rate_limiter import RateLimiterLockingError
from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError
from zerver.lib.send_email import EmailNotDeliveredError, FromAddress
from zerver.lib.test_classes import ZulipTestCase
@@ -594,87 +591,6 @@ class WorkerTest(ZulipTestCase):
self.assertEqual(mock_mirror_email.call_count, 3)
@patch("zerver.worker.email_mirror.mirror_email")
@override_settings(RATE_LIMITING_MIRROR_REALM_RULES=[(10, 2)])
def test_mirror_worker_rate_limiting(self, mock_mirror_email: MagicMock) -> None:
fake_client = FakeClient()
realm = get_realm("zulip")
RateLimitedRealmMirror(realm).clear_history()
stream = get_stream("Denmark", realm)
hamlet = self.example_user("hamlet")
email_token = get_channel_email_token(stream, creator=hamlet, sender=hamlet)
stream_to_address = encode_email_address(stream.name, email_token)
data = [
dict(
msg_base64=base64.b64encode(b"\xf3test").decode(),
time=time.time(),
rcpt_to=stream_to_address,
),
] * 5
for element in data:
fake_client.enqueue("email_mirror", element)
with (
simulated_queue_client(fake_client),
self.assertLogs("zerver.worker.email_mirror", level="WARNING") as warn_logs,
):
start_time = time.time()
with patch("time.time", return_value=start_time):
worker = MirrorWorker()
worker.setup()
worker.start()
# Of the first 5 messages, only 2 should be processed
# (the rest being rate-limited):
self.assertEqual(mock_mirror_email.call_count, 2)
# If a new message is sent into the stream mirror, it will get rejected:
fake_client.enqueue("email_mirror", data[0])
worker.start()
self.assertEqual(mock_mirror_email.call_count, 2)
# However, message notification emails don't get rate limited:
with self.settings(EMAIL_GATEWAY_PATTERN="%s@example.com"):
address = "mm" + ("x" * 32) + "@example.com"
event = dict(
msg_base64=base64.b64encode(b"\xf3test").decode(),
time=time.time(),
rcpt_to=address,
)
fake_client.enqueue("email_mirror", event)
worker.start()
self.assertEqual(mock_mirror_email.call_count, 3)
# After some time passes, emails get accepted again:
with patch("time.time", return_value=start_time + 11.0):
fake_client.enqueue("email_mirror", data[0])
worker.start()
self.assertEqual(mock_mirror_email.call_count, 4)
# If RateLimiterLockingError is thrown, we rate-limit the new message:
with (
patch(
"zerver.lib.rate_limiter.RedisRateLimiterBackend.incr_ratelimit",
side_effect=RateLimiterLockingError,
),
self.assertLogs("zerver.lib.rate_limiter", "WARNING") as mock_warn,
):
fake_client.enqueue("email_mirror", data[0])
worker.start()
self.assertEqual(mock_mirror_email.call_count, 4)
self.assertEqual(
mock_warn.output,
[
"WARNING:zerver.lib.rate_limiter:Deadlock trying to incr_ratelimit for RateLimitedRealmMirror:zulip"
],
)
self.assertEqual(
warn_logs.output,
[
"WARNING:zerver.worker.email_mirror:MirrorWorker: Rejecting an email from: None to realm: zulip - rate limited."
]
* 5,
)
def test_email_sending_worker_retries(self) -> None:
"""Tests the retry_send_email_failures decorator to make sure it
retries sending the email 3 times and then gives up."""

View File

@@ -9,13 +9,7 @@ from typing import Any
from typing_extensions import override
from zerver.lib.email_mirror import (
decode_stream_email_address,
is_missed_message_address,
rate_limit_mirror_by_realm,
)
from zerver.lib.email_mirror import process_message as mirror_email
from zerver.lib.exceptions import RateLimitedError
from zerver.worker.base import QueueProcessingWorker, WorkerTimeoutError, assign_queue
logger = logging.getLogger(__name__)
@@ -32,20 +26,6 @@ class MirrorWorker(QueueProcessingWorker):
msg = email.parser.BytesParser(_class=EmailMessage, policy=email.policy.default).parsebytes(
content
)
if not is_missed_message_address(rcpt_to):
# Missed message addresses are one-time use, so we don't need
# to worry about emails to them resulting in message spam.
recipient_realm = decode_stream_email_address(rcpt_to)[0].realm
try:
rate_limit_mirror_by_realm(recipient_realm)
except RateLimitedError:
logger.warning(
"MirrorWorker: Rejecting an email from: %s to realm: %s - rate limited.",
msg["From"],
recipient_realm.subdomain,
)
return
try:
mirror_email(msg, rcpt_to=rcpt_to)
except WorkerTimeoutError: # nocoverage

View File

@@ -967,6 +967,9 @@ LOGGING: dict[str, Any] = {
"handlers": ["scim_file", "errors_file"],
"propagate": False,
},
"mail.log": {
"level": "WARNING",
},
"pyvips": {
"level": "ERROR",
"handlers": ["console", "errors_file"],