mirror of
https://github.com/zulip/zulip.git
synced 2025-11-20 22:48:16 +00:00
This helper does some nice things like printing out the data structure incase of failure.
172 lines
7.0 KiB
Python
172 lines
7.0 KiB
Python
from smtplib import SMTP, SMTPDataError, SMTPException, SMTPRecipientsRefused
|
|
from unittest import mock
|
|
|
|
from django.core.mail.backends.locmem import EmailBackend
|
|
from django.core.mail.backends.smtp import EmailBackend as SMTPBackend
|
|
from django.core.mail.message import sanitize_address
|
|
|
|
from zerver.lib.send_email import (
|
|
EmailNotDeliveredException,
|
|
FromAddress,
|
|
build_email,
|
|
initialize_connection,
|
|
logger,
|
|
send_email,
|
|
)
|
|
from zerver.lib.test_classes import ZulipTestCase
|
|
|
|
|
|
class TestBuildEmail(ZulipTestCase):
|
|
def test_build_SES_compatible_From_field_limit(self) -> None:
|
|
hamlet = self.example_user("hamlet")
|
|
limit_length_name = "a" * (320 - len(sanitize_address(FromAddress.NOREPLY, "utf-8")) - 3)
|
|
mail = build_email(
|
|
"zerver/emails/password_reset",
|
|
to_emails=[hamlet],
|
|
from_name=limit_length_name,
|
|
from_address=FromAddress.NOREPLY,
|
|
language="en",
|
|
)
|
|
self.assertEqual(
|
|
mail.extra_headers["From"], "{} <{}>".format(limit_length_name, FromAddress.NOREPLY)
|
|
)
|
|
|
|
def test_build_and_send_SES_incompatible_From_address(self) -> None:
|
|
hamlet = self.example_user("hamlet")
|
|
from_name = "Zulip"
|
|
# Address by itself is > 320 bytes even without the name. Should trigger exception.
|
|
overly_long_address = "a" * 320 + "@zulip.com"
|
|
mail = build_email(
|
|
"zerver/emails/password_reset",
|
|
to_emails=[hamlet],
|
|
from_name=from_name,
|
|
from_address=overly_long_address,
|
|
language="en",
|
|
)
|
|
self.assertEqual(mail.extra_headers["From"], overly_long_address)
|
|
self.assertTrue(len(sanitize_address(mail.extra_headers["From"], "utf-8")) > 320)
|
|
|
|
with mock.patch.object(
|
|
EmailBackend, "send_messages", side_effect=SMTPDataError(242, "From field too long.")
|
|
):
|
|
with self.assertLogs(logger=logger) as info_log:
|
|
with self.assertRaises(EmailNotDeliveredException):
|
|
send_email(
|
|
"zerver/emails/password_reset",
|
|
to_emails=[hamlet],
|
|
from_name=from_name,
|
|
from_address=overly_long_address,
|
|
language="en",
|
|
)
|
|
self.assert_length(info_log.records, 2)
|
|
self.assertEqual(
|
|
info_log.output[0], f"INFO:{logger.name}:Sending password_reset email to {mail.to}"
|
|
)
|
|
self.assertTrue(
|
|
info_log.output[1].startswith(
|
|
f"ERROR:{logger.name}:Error sending password_reset email to {mail.to} with error code 242: From field too long."
|
|
)
|
|
)
|
|
|
|
def test_build_and_send_refused(self) -> None:
|
|
hamlet = self.example_user("hamlet")
|
|
address = "zulip@example.com"
|
|
with mock.patch.object(
|
|
EmailBackend,
|
|
"send_messages",
|
|
side_effect=SMTPRecipientsRefused(recipients={address: (550, b"User unknown")}),
|
|
):
|
|
with self.assertLogs(logger=logger) as info_log:
|
|
with self.assertRaises(EmailNotDeliveredException):
|
|
send_email(
|
|
"zerver/emails/password_reset",
|
|
to_emails=[hamlet],
|
|
from_name="Zulip",
|
|
from_address=address,
|
|
language="en",
|
|
)
|
|
self.assert_length(info_log.records, 2)
|
|
self.assertEqual(
|
|
info_log.output[0], f"INFO:{logger.name}:Sending password_reset email to {[hamlet]}"
|
|
)
|
|
self.assertFalse(
|
|
info_log.output[1].startswith(
|
|
f"ERROR:{logger.name}:Error sending password_reset email to {[hamlet]}: {{'{address}': (550, 'User unknown')}}"
|
|
)
|
|
)
|
|
|
|
|
|
class TestSendEmail(ZulipTestCase):
|
|
def test_initialize_connection(self) -> None:
|
|
# Test the new connection case
|
|
with mock.patch.object(EmailBackend, "open", return_value=True):
|
|
backend = initialize_connection(None)
|
|
self.assertTrue(isinstance(backend, EmailBackend))
|
|
|
|
backend = mock.MagicMock(spec=SMTPBackend)
|
|
backend.connection = mock.MagicMock(spec=SMTP)
|
|
|
|
self.assertTrue(isinstance(backend, SMTPBackend))
|
|
|
|
# Test the old connection case when it is still open
|
|
backend.open.return_value = False
|
|
backend.connection.noop.return_value = [250]
|
|
initialize_connection(backend)
|
|
self.assertEqual(backend.open.call_count, 1)
|
|
self.assertEqual(backend.connection.noop.call_count, 1)
|
|
|
|
# Test the old connection case when it was closed by the server
|
|
backend.connection.noop.return_value = [404]
|
|
backend.close.return_value = False
|
|
initialize_connection(backend)
|
|
# 2 more calls to open, 1 more call to noop and 1 call to close
|
|
self.assertEqual(backend.open.call_count, 3)
|
|
self.assertEqual(backend.connection.noop.call_count, 2)
|
|
self.assertEqual(backend.close.call_count, 1)
|
|
|
|
# Test backoff procedure
|
|
backend.open.side_effect = OSError
|
|
with self.assertRaises(OSError):
|
|
initialize_connection(backend)
|
|
# 3 more calls to open as we try 3 times before giving up
|
|
self.assertEqual(backend.open.call_count, 6)
|
|
|
|
def test_send_email_exceptions(self) -> None:
|
|
hamlet = self.example_user("hamlet")
|
|
from_name = FromAddress.security_email_from_name(language="en")
|
|
# Used to check the output
|
|
mail = build_email(
|
|
"zerver/emails/password_reset",
|
|
to_emails=[hamlet],
|
|
from_name=from_name,
|
|
from_address=FromAddress.NOREPLY,
|
|
language="en",
|
|
)
|
|
self.assertEqual(
|
|
mail.extra_headers["From"], "{} <{}>".format(from_name, FromAddress.NOREPLY)
|
|
)
|
|
|
|
# We test the two cases that should raise an EmailNotDeliveredException
|
|
errors = {
|
|
f"Unknown error sending password_reset email to {mail.to}": [0],
|
|
f"Error sending password_reset email to {mail.to}": [SMTPException()],
|
|
}
|
|
|
|
for message, side_effect in errors.items():
|
|
with mock.patch.object(EmailBackend, "send_messages", side_effect=side_effect):
|
|
with self.assertLogs(logger=logger) as info_log:
|
|
with self.assertRaises(EmailNotDeliveredException):
|
|
send_email(
|
|
"zerver/emails/password_reset",
|
|
to_emails=[hamlet],
|
|
from_name=from_name,
|
|
from_address=FromAddress.NOREPLY,
|
|
language="en",
|
|
)
|
|
self.assert_length(info_log.records, 2)
|
|
self.assertEqual(
|
|
info_log.output[0],
|
|
f"INFO:{logger.name}:Sending password_reset email to {mail.to}",
|
|
)
|
|
self.assertTrue(info_log.output[1].startswith(f"ERROR:zulip.send_email:{message}"))
|