diff --git a/tools/linter_lib/custom_check.py b/tools/linter_lib/custom_check.py index e0a0e961b7..b4903f4af5 100644 --- a/tools/linter_lib/custom_check.py +++ b/tools/linter_lib/custom_check.py @@ -417,6 +417,7 @@ python_rules = RuleList( 'zerver/migrations/0104_fix_unreads.py', 'zerver/migrations/0206_stream_rendered_description.py', 'zerver/migrations/0209_user_profile_no_empty_password.py', + 'zerver/migrations/0260_missed_message_addresses_from_redis_to_db.py', 'pgroonga/migrations/0002_html_escape_subject.py', ]), 'description': "Don't import models or other code in migrations; see docs/subsystems/schema-migrations.md", diff --git a/zerver/lib/email_mirror.py b/zerver/lib/email_mirror.py index ece29110e1..f4b9ff5962 100644 --- a/zerver/lib/email_mirror.py +++ b/zerver/lib/email_mirror.py @@ -8,6 +8,7 @@ from email.utils import getaddresses import email.message as message from django.conf import settings +from django.utils.timezone import timedelta, now as timezone_now from zerver.lib.actions import internal_send_message, internal_send_private_message, \ internal_send_stream_message, internal_send_huddle_message, \ @@ -16,14 +17,13 @@ from zerver.lib.email_mirror_helpers import decode_email_address, \ get_email_gateway_message_string_from_address, ZulipEmailForwardError from zerver.lib.email_notifications import convert_html_to_markdown from zerver.lib.queue import queue_json_publish -from zerver.lib.redis_utils import get_redis_client -from zerver.lib.upload import upload_message_file from zerver.lib.utils import generate_random_token +from zerver.lib.upload import upload_message_file from zerver.lib.send_email import FromAddress from zerver.lib.rate_limiter import RateLimitedObject, rate_limit_entity from zerver.lib.exceptions import RateLimited -from zerver.models import Stream, Recipient, \ - get_user_profile_by_id, get_display_recipient, \ +from zerver.models import Stream, Recipient, MissedMessageEmailAddress, \ + get_display_recipient, \ Message, Realm, UserProfile, get_system_bot, get_user, get_stream_by_id_in_realm from zproject.backends import is_user_active @@ -79,12 +79,8 @@ def log_and_report(email_message: message.Message, error_message: str, to: Optio # Temporary missed message addresses -redis_client = get_redis_client() - - -def missed_message_redis_key(token: str) -> str: - return 'missed_message:' + token - +def generate_missed_message_token() -> str: + return 'mm' + generate_random_token(32) def is_missed_message_address(address: str) -> bool: try: @@ -107,8 +103,17 @@ def get_missed_message_token_from_address(address: str) -> str: if not is_mm_32_format(msg_string): raise ZulipEmailForwardError('Could not parse missed message address') - # strip off the 'mm' before returning the redis key - return msg_string[2:] + return msg_string + +def get_missed_message_address(address: str) -> MissedMessageEmailAddress: + token = get_missed_message_token_from_address(address) + try: + return MissedMessageEmailAddress.objects.select_related().get( + email_token=token, + timestamp__gt=timezone_now() - timedelta(seconds=MissedMessageEmailAddress.EXPIRY_SECONDS) + ) + except MissedMessageEmailAddress.DoesNotExist: + raise ZulipEmailForwardError("Missed message address expired or doesn't exist") def create_missed_message_address(user_profile: UserProfile, message: Message) -> str: if settings.EMAIL_GATEWAY_PATTERN == '': @@ -116,43 +121,10 @@ def create_missed_message_address(user_profile: UserProfile, message: Message) - "NOREPLY_EMAIL_ADDRESS in the 'from' field.") return FromAddress.NOREPLY - if message.recipient.type == Recipient.PERSONAL: - # We need to reply to the sender so look up their personal recipient_id - recipient_id = message.sender.recipient_id - else: - recipient_id = message.recipient_id - - data = { - 'user_profile_id': user_profile.id, - 'recipient_id': recipient_id, - 'subject': message.topic_name().encode('utf-8'), - } - - while True: - token = generate_random_token(32) - key = missed_message_redis_key(token) - if redis_client.hsetnx(key, 'uses_left', 1): - break - - with redis_client.pipeline() as pipeline: - pipeline.hmset(key, data) - pipeline.expire(key, 60 * 60 * 24 * 5) - pipeline.execute() - - address = 'mm' + token - return settings.EMAIL_GATEWAY_PATTERN % (address,) - - -def mark_missed_message_address_as_used(address: str) -> None: - token = get_missed_message_token_from_address(address) - key = missed_message_redis_key(token) - with redis_client.pipeline() as pipeline: - pipeline.hincrby(key, 'uses_left', -1) - pipeline.expire(key, 60 * 60 * 24 * 5) - new_value = pipeline.execute()[0] - if new_value < 0: - redis_client.delete(key) - raise ZulipEmailForwardError('Missed message address has already been used') + mm_address = MissedMessageEmailAddress.objects.create(message=message, + user_profile=user_profile, + email_token=generate_missed_message_token()) + return str(mm_address) def construct_zulip_body(message: message.Message, realm: Realm, show_sender: bool=False, include_quotes: bool=False, include_footer: bool=False) -> str: @@ -174,18 +146,23 @@ def construct_zulip_body(message: message.Message, realm: Realm, show_sender: bo return body def send_to_missed_message_address(address: str, message: message.Message) -> None: - token = get_missed_message_token_from_address(address) - key = missed_message_redis_key(token) - result = redis_client.hmget(key, 'user_profile_id', 'recipient_id', 'subject') - if not all(val is not None for val in result): - raise ZulipEmailForwardError('Missing missed message address data') - user_profile_id, recipient_id, subject_b = result # type: (bytes, bytes, bytes) + mm_address = get_missed_message_address(address) + if not mm_address.is_usable(): + raise ZulipEmailForwardError("Missed message address out of uses.") + mm_address.increment_times_used() + + user_profile = mm_address.user_profile + topic = mm_address.message.topic_name() + + if mm_address.message.recipient.type == Recipient.PERSONAL: + # We need to reply to the sender so look up their personal recipient_id + recipient = mm_address.message.sender.recipient + else: + recipient = mm_address.message.recipient - user_profile = get_user_profile_by_id(user_profile_id) if not is_user_active(user_profile): logger.warning("Sending user is not active. Ignoring this missed message email.") return - recipient = Recipient.objects.get(id=recipient_id) body = construct_zulip_body(message, user_profile.realm) @@ -193,7 +170,7 @@ def send_to_missed_message_address(address: str, message: message.Message) -> No stream = get_stream_by_id_in_realm(recipient.type_id, user_profile.realm) internal_send_stream_message( user_profile.realm, user_profile, stream, - subject_b.decode('utf-8'), body + topic, body ) recipient_str = stream.name elif recipient.type == Recipient.PERSONAL: @@ -372,12 +349,10 @@ def process_stream_message(to: str, message: message.Message) -> None: logger.info("Successfully processed email to %s (%s)" % ( stream.name, stream.realm.string_id)) -def process_missed_message(to: str, message: message.Message, pre_checked: bool) -> None: - if not pre_checked: - mark_missed_message_address_as_used(to) +def process_missed_message(to: str, message: message.Message) -> None: send_to_missed_message_address(to, message) -def process_message(message: message.Message, rcpt_to: Optional[str]=None, pre_checked: bool=False) -> None: +def process_message(message: message.Message, rcpt_to: Optional[str]=None) -> None: to = None # type: Optional[str] try: @@ -387,7 +362,7 @@ def process_message(message: message.Message, rcpt_to: Optional[str]=None, pre_c to = find_emailgateway_recipient(message) if is_missed_message_address(to): - process_missed_message(to, message, pre_checked) + process_missed_message(to, message) else: process_stream_message(to, message) except ZulipEmailForwardError as e: @@ -401,7 +376,9 @@ def mirror_email_message(data: Dict[str, str]) -> Dict[str, str]: rcpt_to = data['recipient'] if is_missed_message_address(rcpt_to): try: - mark_missed_message_address_as_used(rcpt_to) + mm_address = get_missed_message_address(rcpt_to) + if not mm_address.is_usable(): + raise ZulipEmailForwardError("Missed message address out of uses.") except ZulipEmailForwardError: return { "status": "error", diff --git a/zerver/migrations/0260_missed_message_addresses_from_redis_to_db.py b/zerver/migrations/0260_missed_message_addresses_from_redis_to_db.py new file mode 100644 index 0000000000..dc49fdbc1a --- /dev/null +++ b/zerver/migrations/0260_missed_message_addresses_from_redis_to_db.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +from django.core.exceptions import ObjectDoesNotExist +from django.db import migrations +from django.db.backends.postgresql_psycopg2.schema import DatabaseSchemaEditor +from django.db.migrations.state import StateApps + +# Imported to avoid needing to duplicate redis-related code. +from zerver.lib.redis_utils import get_redis_client +from zerver.lib.utils import generate_random_token + +def generate_missed_message_token() -> str: + return 'mm' + generate_random_token(32) + +def move_missed_message_addresses_to_database(apps: StateApps, schema_editor: DatabaseSchemaEditor) -> None: + redis_client = get_redis_client() + MissedMessageEmailAddress = apps.get_model('zerver', 'MissedMessageEmailAddress') + UserProfile = apps.get_model('zerver', 'UserProfile') + Message = apps.get_model('zerver', 'Message') + Recipient = apps.get_model('zerver', 'Recipient') + RECIPIENT_PERSONAL = 1 + RECIPIENT_STREAM = 2 + + all_mm_keys = redis_client.keys('missed_message:*') + for key in all_mm_keys: + # Don't migrate mm addresses that have already been used. + if redis_client.hincrby(key, 'uses_left', -1) < 0: + redis_client.delete(key) + continue + + result = redis_client.hmget(key, 'user_profile_id', 'recipient_id', 'subject') + if not all(val is not None for val in result): + # Missing data, skip this key; this should never happen + redis_client.delete(key) + continue + + user_profile_id, recipient_id, subject_b = result # type: (bytes, bytes, bytes) + topic_name = subject_b.decode('utf-8') + + # The data model for missed-message emails has changed in two + # key ways: We're moving it from redis to the database for + # better persistence, and also replacing the stream + topic + # (as the reply location) with a message to reply to. Because + # the redis data structure only had stream/topic pairs, we use + # the following migration logic to find the latest message in + # the thread indicated by the redis data (if it exists). + try: + user_profile = UserProfile.objects.get(id=user_profile_id) + recipient = Recipient.objects.get(id=recipient_id) + + if recipient.type == RECIPIENT_STREAM: + message = Message.objects.filter(subject__iexact=topic_name, + recipient_id=recipient.id).latest('id') + elif recipient.type == RECIPIENT_PERSONAL: + # Tie to the latest PM from the sender to this user; + # we expect at least one existed because it generated + # this missed-message email, so we can skip the + # normally required additioanl check for messages we + # ourselves sent to the target user. + message = Message.objects.filter(recipient_id=user_profile.recipient_id, + sender_id=recipient.type_id).latest('id') + else: + message = Message.objects.filter(recipient_id=recipient.id).latest('id') + except ObjectDoesNotExist: + # If all messages in the original thread were deleted or + # had their topics edited, we can't find an appropriate + # message to tag; we just skip migrating this message. + # The consequence (replies to this particular + # missed-message email bouncing) is acceptable. + redis_client.delete(key) + continue + + # The timestamp will be set to the default (now) which means + # the address will take longer to expire than it would have in + # redis, but this small issue is probably worth the simplicity + # of not having to figure out the precise timestamp. + MissedMessageEmailAddress.objects.create(message=message, + user_profile=user_profile, + email_token=generate_missed_message_token()) + # We successfully transferred this missed-message email's data + # to the database, so this message can be deleted from redis. + redis_client.delete(key) + +class Migration(migrations.Migration): + # Atomicity is not feasible here, since we're doing operations on redis too. + # It's better to be non-atomic on both redis and database, than atomic + # on the database and not on redis. + atomic = False + + dependencies = [ + ('zerver', '0259_missedmessageemailaddress'), + ] + + operations = [ + migrations.RunPython(move_missed_message_addresses_to_database, reverse_code=migrations.RunPython.noop), + ] diff --git a/zerver/models.py b/zerver/models.py index d4aa0131c4..e1d7d40288 100644 --- a/zerver/models.py +++ b/zerver/models.py @@ -2511,6 +2511,7 @@ class ScheduledEmail(AbstractScheduledJob): class MissedMessageEmailAddress(models.Model): EXPIRY_SECONDS = 60 * 60 * 24 * 5 + ALLOWED_USES = 1 message = models.ForeignKey(Message, on_delete=CASCADE) # type: Message user_profile = models.ForeignKey(UserProfile, on_delete=CASCADE) # type: UserProfile @@ -2524,6 +2525,15 @@ class MissedMessageEmailAddress(models.Model): def __str__(self) -> str: return settings.EMAIL_GATEWAY_PATTERN % (self.email_token,) + def is_usable(self) -> bool: + not_expired = timezone_now() <= self.timestamp + timedelta(seconds=self.EXPIRY_SECONDS) + has_uses_left = self.times_used < self.ALLOWED_USES + return has_uses_left and not_expired + + def increment_times_used(self) -> None: + self.times_used += 1 + self.save(update_fields=["times_used"]) + class ScheduledMessage(models.Model): sender = models.ForeignKey(UserProfile, on_delete=CASCADE) # type: UserProfile recipient = models.ForeignKey(Recipient, on_delete=CASCADE) # type: Recipient diff --git a/zerver/tests/test_email_mirror.py b/zerver/tests/test_email_mirror.py index fa18b8b463..b3138e3aa5 100644 --- a/zerver/tests/test_email_mirror.py +++ b/zerver/tests/test_email_mirror.py @@ -18,13 +18,15 @@ from zerver.models import ( get_realm, get_stream, get_system_bot, + MissedMessageEmailAddress, Recipient, ) from zerver.lib.actions import ensure_stream, do_deactivate_realm, do_deactivate_user from zerver.lib.email_mirror import ( - process_message, process_missed_message, + process_message, + process_missed_message, create_missed_message_address, get_missed_message_token_from_address, strip_from_subject, @@ -44,6 +46,7 @@ from zerver.lib.email_mirror_helpers import ( from zerver.lib.email_notifications import convert_html_to_markdown from zerver.lib.send_email import FromAddress +from zerver.worker.queue_processors import MirrorWorker from email import message_from_string from email.mime.text import MIMEText @@ -55,7 +58,7 @@ import mock import os from django.conf import settings -from typing import Any, Callable, Dict, Mapping, Union, Optional +from typing import Any, Callable, Dict, Mapping, Optional class TestEncodeDecode(ZulipTestCase): def _assert_options(self, options: Dict[str, bool], show_sender: bool=False, @@ -164,7 +167,7 @@ class TestGetMissedMessageToken(ZulipTestCase): address = 'mm' + ('x' * 32) + '@example.com' self.assertTrue(is_missed_message_address(address)) token = get_missed_message_token_from_address(address) - self.assertEqual(token, 'x' * 32) + self.assertEqual(token, 'mm' + 'x' * 32) # This next section was a bug at one point--we'd treat ordinary # user addresses that happened to begin with "mm" as being @@ -511,41 +514,6 @@ class TestStreamEmailMessagesEmptyBody(ZulipTestCase): self.assertEqual(message.content, "(No email body)") -class TestMissedMessageEmailMessageTokenMissingData(ZulipTestCase): - # Test for the case "if not all(val is not None for val in result):" - # on result returned by redis_client.hmget in send_to_missed_message_address: - def test_receive_missed_message_email_token_missing_data(self) -> None: - email = self.example_email('hamlet') - self.login(email) - result = self.client_post("/json/messages", {"type": "private", - "content": "test_receive_missed_message_email_token_missing_data", - "client": "test suite", - "to": self.example_email('othello')}) - self.assert_json_success(result) - - user_profile = self.example_user('othello') - usermessage = most_recent_usermessage(user_profile) - - mm_address = create_missed_message_address(user_profile, usermessage.message) - - incoming_valid_message = MIMEText('TestMissedMessageEmailMessages Body') - - incoming_valid_message['Subject'] = 'TestMissedMessageEmailMessages Subject' - incoming_valid_message['From'] = self.example_email('othello') - incoming_valid_message['To'] = mm_address - incoming_valid_message['Reply-to'] = self.example_email('othello') - - # We need to force redis_client.hmget to return some None values: - with mock.patch('zerver.lib.email_mirror.redis_client.hmget', - return_value=[None, None, None]): - exception_message = '' - try: - process_missed_message(mm_address, incoming_valid_message, False) - except ZulipEmailForwardError as e: - exception_message = str(e) - - self.assertEqual(exception_message, 'Missing missed message address data') - class TestMissedMessageEmailMessages(ZulipTestCase): def test_receive_missed_personal_message_email_messages(self) -> None: @@ -672,6 +640,47 @@ class TestMissedMessageEmailMessages(ZulipTestCase): self.assertEqual(message.recipient.type, Recipient.STREAM) self.assertEqual(message.recipient.id, usermessage.message.recipient.id) + def test_missed_stream_message_email_response_tracks_topic_change(self) -> None: + self.subscribe(self.example_user("hamlet"), "Denmark") + self.subscribe(self.example_user("othello"), "Denmark") + email = self.example_email('hamlet') + self.login(email) + result = self.client_post("/json/messages", {"type": "stream", + "topic": "test topic", + "content": "test_receive_missed_stream_message_email_messages", + "client": "test suite", + "to": "Denmark"}) + self.assert_json_success(result) + + user_profile = self.example_user('othello') + usermessage = most_recent_usermessage(user_profile) + + mm_address = create_missed_message_address(user_profile, usermessage.message) + + # The mm address has been generated, now we change the topic of the message and see + # if the response to the mm address will be correctly posted with the updated topic. + usermessage.message.subject = "updated topic" + usermessage.message.save(update_fields=["subject"]) + + incoming_valid_message = MIMEText('TestMissedMessageEmailMessages Body') + + incoming_valid_message['Subject'] = 'TestMissedMessageEmailMessages Subject' + incoming_valid_message['From'] = self.example_email('othello') + incoming_valid_message['To'] = mm_address + incoming_valid_message['Reply-to'] = self.example_email('othello') + + process_message(incoming_valid_message) + + # confirm that Hamlet got the message + user_profile = self.example_user('hamlet') + message = most_recent_message(user_profile) + + self.assertEqual(message.subject, "updated topic") + self.assertEqual(message.content, "TestMissedMessageEmailMessages Body") + self.assertEqual(message.sender, self.example_user('othello')) + self.assertEqual(message.recipient.type, Recipient.STREAM) + self.assertEqual(message.recipient.id, usermessage.message.recipient.id) + def test_missed_message_email_response_from_deactivated_user(self) -> None: self.subscribe(self.example_user("hamlet"), "Denmark") self.subscribe(self.example_user("othello"), "Denmark") @@ -736,6 +745,36 @@ class TestMissedMessageEmailMessages(ZulipTestCase): # Since othello's realm is deactivated, his message shouldn't be posted: self.assertEqual(initial_last_message, self.get_last_message()) + def test_missed_message_email_multiple_responses(self) -> None: + self.subscribe(self.example_user("hamlet"), "Denmark") + self.subscribe(self.example_user("othello"), "Denmark") + email = self.example_email('hamlet') + self.login(email) + + result = self.client_post("/json/messages", {"type": "stream", + "topic": "test topic", + "content": "test_receive_missed_stream_message_email_messages", + "client": "test suite", + "to": "Denmark"}) + self.assert_json_success(result) + + user_profile = self.example_user('othello') + message = most_recent_message(user_profile) + + mm_address = create_missed_message_address(user_profile, message) + incoming_valid_message = MIMEText('TestMissedMessageEmailMessages Body') + + incoming_valid_message['Subject'] = 'TestMissedMessageEmailMessages Subject' + incoming_valid_message['From'] = self.example_email('othello') + incoming_valid_message['To'] = mm_address + incoming_valid_message['Reply-to'] = self.example_email('othello') + + for i in range(0, MissedMessageEmailAddress.ALLOWED_USES): + process_missed_message(mm_address, incoming_valid_message) + + with self.assertRaises(ZulipEmailForwardError): + process_missed_message(mm_address, incoming_valid_message) + class TestEmptyGatewaySetting(ZulipTestCase): def test_missed_message(self) -> None: email = self.example_email('othello') @@ -932,10 +971,14 @@ class TestEmailMirrorTornadoView(ZulipTestCase): mail = mail_template.format(stream_to_address=to_address, sender=sender) def check_queue_json_publish(queue_name: str, - event: Union[Mapping[str, Any], str], + event: Mapping[str, Any], processor: Optional[Callable[[Any], None]]=None) -> None: self.assertEqual(queue_name, "email_mirror") self.assertEqual(event, {"rcpt_to": to_address, "message": mail}) + MirrorWorker().consume(event) + + self.assertEqual(self.get_last_message().content, + "This is a plain-text message for testing Zulip.") mock_queue_json_publish.side_effect = check_queue_json_publish request_data = { @@ -980,9 +1023,12 @@ class TestEmailMirrorTornadoView(ZulipTestCase): result = self.send_offline_message(mm_address, self.example_email('cordelia')) self.assert_json_success(result) - def test_using_mm_address_twice(self) -> None: + def test_using_mm_address_multiple_times(self) -> None: mm_address = self.send_private_message() - self.send_offline_message(mm_address, self.example_email('cordelia')) + for i in range(0, MissedMessageEmailAddress.ALLOWED_USES): + result = self.send_offline_message(mm_address, self.example_email('cordelia')) + self.assert_json_success(result) + result = self.send_offline_message(mm_address, self.example_email('cordelia')) self.assert_json_error( result, diff --git a/zerver/tests/test_email_notifications.py b/zerver/tests/test_email_notifications.py index a8e61edd68..d9e5331153 100644 --- a/zerver/tests/test_email_notifications.py +++ b/zerver/tests/test_email_notifications.py @@ -134,7 +134,7 @@ class TestMissedMessages(ZulipTestCase): return re.sub(r'\s+', ' ', s) def _get_tokens(self) -> List[str]: - return [str(random.getrandbits(32)) for _ in range(30)] + return ['mm' + str(random.getrandbits(32)) for _ in range(30)] def _test_cases(self, msg_id: int, body: str, email_subject: str, send_as_user: bool, verify_html_body: bool=False, @@ -144,10 +144,10 @@ class TestMissedMessages(ZulipTestCase): othello = self.example_user('othello') hamlet = self.example_user('hamlet') tokens = self._get_tokens() - with patch('zerver.lib.email_mirror.generate_random_token', side_effect=tokens): + with patch('zerver.lib.email_mirror.generate_missed_message_token', side_effect=tokens): handle_missedmessage_emails(hamlet.id, [{'message_id': msg_id, 'trigger': trigger}]) if settings.EMAIL_GATEWAY_PATTERN != "": - reply_to_addresses = [settings.EMAIL_GATEWAY_PATTERN % (u'mm' + t,) for t in tokens] + reply_to_addresses = [settings.EMAIL_GATEWAY_PATTERN % (t,) for t in tokens] reply_to_emails = [formataddr(("Zulip", address)) for address in reply_to_addresses] else: reply_to_emails = ["noreply@testserver"] diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 7a2bb1d92f..a861bdb912 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -563,7 +563,7 @@ class MirrorWorker(QueueProcessingWorker): return mirror_email(email.message_from_string(event["message"]), - rcpt_to=rcpt_to, pre_checked=True) + rcpt_to=rcpt_to) @assign_queue('test', queue_type="test") class TestWorker(QueueProcessingWorker):