mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	This commit prefixes stream names in urls with stream ids, so that the urls don't break when we rename streams. strean name: foo bar.com% before: #narrow/stream/foo.20bar.2Ecom.25 after: #narrow/stream/20-foo-bar.2Ecom.25 For new realms, everything is simple under the new scheme, since we just parse out the stream id every time to figure out where to narrow. For old realms, any old URLs will still work under the new scheme, assuming the stream hasn't been renamed (and of course old urls wouldn't have survived stream renaming in the first place). The one exception is the hopefully rare case of a stream name starting with something like "99-" and colliding with another stream whose id is 99. The way that we enocde the stream name portion of the URL is kind of unimportant now, since we really only look at the stream id, but we still want a safe encoding of the name that is mostly human readable, so we now convert spaces to dashes in the stream name. Also, we try to ensure more code on both sides (frontend and backend) calls common functions to do the encoding. Fixes #4713
		
			
				
	
	
		
			585 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			585 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
import subprocess
 | 
						|
 | 
						|
from django.http import HttpResponse
 | 
						|
from django.utils.timezone import now as timezone_now
 | 
						|
 | 
						|
from zerver.lib.test_helpers import (
 | 
						|
    most_recent_message,
 | 
						|
    most_recent_usermessage,
 | 
						|
    POSTRequestMock)
 | 
						|
 | 
						|
from zerver.lib.test_classes import (
 | 
						|
    ZulipTestCase,
 | 
						|
)
 | 
						|
 | 
						|
from zerver.models import (
 | 
						|
    get_display_recipient,
 | 
						|
    get_realm,
 | 
						|
    get_stream,
 | 
						|
    get_client,
 | 
						|
    Recipient,
 | 
						|
    UserProfile,
 | 
						|
    UserActivity,
 | 
						|
    Realm
 | 
						|
)
 | 
						|
 | 
						|
from zerver.lib.actions import (
 | 
						|
    create_stream_if_needed,
 | 
						|
    encode_email_address,
 | 
						|
    do_create_user
 | 
						|
)
 | 
						|
from zerver.lib.email_mirror import (
 | 
						|
    process_message, process_stream_message, ZulipEmailForwardError,
 | 
						|
    create_missed_message_address,
 | 
						|
    get_missed_message_token_from_address,
 | 
						|
)
 | 
						|
 | 
						|
from zerver.lib.digest import gather_new_streams, handle_digest_email, enqueue_emails
 | 
						|
from zerver.lib.send_email import FromAddress
 | 
						|
from zerver.lib.notifications import (
 | 
						|
    handle_missedmessage_emails,
 | 
						|
)
 | 
						|
from zerver.management.commands import email_mirror
 | 
						|
 | 
						|
from email.mime.text import MIMEText
 | 
						|
 | 
						|
import datetime
 | 
						|
import time
 | 
						|
import re
 | 
						|
import ujson
 | 
						|
import mock
 | 
						|
import os
 | 
						|
import sys
 | 
						|
from io import StringIO
 | 
						|
from django.conf import settings
 | 
						|
 | 
						|
from typing import Any, Callable, Dict, Mapping, Union, Text
 | 
						|
 | 
						|
class TestEmailMirrorLibrary(ZulipTestCase):
 | 
						|
    def test_get_missed_message_token(self) -> None:
 | 
						|
 | 
						|
        def get_token(address: Text) -> Text:
 | 
						|
            with self.settings(EMAIL_GATEWAY_PATTERN="%s@example.com"):
 | 
						|
                return get_missed_message_token_from_address(address)
 | 
						|
 | 
						|
        address = 'mm' + ('x' * 32) + '@example.com'
 | 
						|
        token = get_token(address)
 | 
						|
        self.assertEqual(token, 'x' * 32)
 | 
						|
 | 
						|
        # This next section was a bug at one point--we'd treat ordinary
 | 
						|
        # user addresses that happened to begin with "mm" as being
 | 
						|
        # the special mm+32chars tokens.
 | 
						|
        address = 'mmathers@example.com'
 | 
						|
        with self.assertRaises(ZulipEmailForwardError):
 | 
						|
            get_token(address)
 | 
						|
 | 
						|
        # Now test the case where we our address does not match the
 | 
						|
        # EMAIL_GATEWAY_PATTERN.
 | 
						|
        # This used to crash in an ugly way; we want to throw a proper
 | 
						|
        # exception.
 | 
						|
        address = 'alice@not-the-domain-we-were-expecting.com'
 | 
						|
        with self.assertRaises(ZulipEmailForwardError):
 | 
						|
            get_token(address)
 | 
						|
 | 
						|
class TestStreamEmailMessagesSuccess(ZulipTestCase):
 | 
						|
    def test_receive_stream_email_messages_success(self) -> None:
 | 
						|
 | 
						|
        # build dummy messages for stream
 | 
						|
        # test valid incoming stream message is processed properly
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        self.login(user_profile.email)
 | 
						|
        self.subscribe(user_profile, "Denmark")
 | 
						|
        stream = get_stream("Denmark", user_profile.realm)
 | 
						|
 | 
						|
        stream_to_address = encode_email_address(stream)
 | 
						|
 | 
						|
        incoming_valid_message = MIMEText('TestStreamEmailMessages Body')  # type: Any # https://github.com/python/typeshed/issues/275
 | 
						|
 | 
						|
        incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
 | 
						|
        incoming_valid_message['From'] = self.example_email('hamlet')
 | 
						|
        incoming_valid_message['To'] = stream_to_address
 | 
						|
        incoming_valid_message['Reply-to'] = self.example_email('othello')
 | 
						|
 | 
						|
        process_message(incoming_valid_message)
 | 
						|
 | 
						|
        # Hamlet is subscribed to this stream so should see the email message from Othello.
 | 
						|
        message = most_recent_message(user_profile)
 | 
						|
 | 
						|
        self.assertEqual(message.content, "TestStreamEmailMessages Body")
 | 
						|
        self.assertEqual(get_display_recipient(message.recipient), stream.name)
 | 
						|
        self.assertEqual(message.topic_name(), incoming_valid_message['Subject'])
 | 
						|
 | 
						|
class TestStreamEmailMessagesEmptyBody(ZulipTestCase):
 | 
						|
    def test_receive_stream_email_messages_empty_body(self) -> None:
 | 
						|
 | 
						|
        # build dummy messages for stream
 | 
						|
        # test message with empty body is not sent
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        self.login(user_profile.email)
 | 
						|
        self.subscribe(user_profile, "Denmark")
 | 
						|
        stream = get_stream("Denmark", user_profile.realm)
 | 
						|
 | 
						|
        stream_to_address = encode_email_address(stream)
 | 
						|
        headers = {}
 | 
						|
        headers['Reply-To'] = self.example_email('othello')
 | 
						|
 | 
						|
        # empty body
 | 
						|
        incoming_valid_message = MIMEText('')  # type: Any # https://github.com/python/typeshed/issues/275
 | 
						|
 | 
						|
        incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
 | 
						|
        incoming_valid_message['From'] = self.example_email('hamlet')
 | 
						|
        incoming_valid_message['To'] = stream_to_address
 | 
						|
        incoming_valid_message['Reply-to'] = self.example_email('othello')
 | 
						|
 | 
						|
        exception_message = ""
 | 
						|
        debug_info = {}  # type: Dict[str, Any]
 | 
						|
 | 
						|
        # process_message eats the exception & logs an error which can't be parsed here
 | 
						|
        # so calling process_stream_message directly
 | 
						|
        try:
 | 
						|
            process_stream_message(incoming_valid_message['To'],
 | 
						|
                                   incoming_valid_message['Subject'],
 | 
						|
                                   incoming_valid_message,
 | 
						|
                                   debug_info)
 | 
						|
        except ZulipEmailForwardError as e:
 | 
						|
            # empty body throws exception
 | 
						|
            exception_message = str(e)
 | 
						|
        self.assertEqual(exception_message, "Unable to find plaintext or HTML message body")
 | 
						|
 | 
						|
class TestMissedPersonalMessageEmailMessages(ZulipTestCase):
 | 
						|
    def test_receive_missed_personal_message_email_messages(self) -> None:
 | 
						|
 | 
						|
        # build dummy messages for missed messages email reply
 | 
						|
        # have Hamlet send Othello a PM. Othello will reply via email
 | 
						|
        # Hamlet will receive the message.
 | 
						|
        email = self.example_email('hamlet')
 | 
						|
        self.login(email)
 | 
						|
        result = self.client_post("/json/messages", {"type": "private",
 | 
						|
                                                     "content": "test_receive_missed_message_email_messages",
 | 
						|
                                                     "client": "test suite",
 | 
						|
                                                     "to": self.example_email('othello')})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        user_profile = self.example_user('othello')
 | 
						|
        usermessage = most_recent_usermessage(user_profile)
 | 
						|
 | 
						|
        # we don't want to send actual emails but we do need to create and store the
 | 
						|
        # token for looking up who did reply.
 | 
						|
        mm_address = create_missed_message_address(user_profile, usermessage.message)
 | 
						|
 | 
						|
        incoming_valid_message = MIMEText('TestMissedMessageEmailMessages Body')  # type: Any # https://github.com/python/typeshed/issues/275
 | 
						|
 | 
						|
        incoming_valid_message['Subject'] = 'TestMissedMessageEmailMessages Subject'
 | 
						|
        incoming_valid_message['From'] = self.example_email('othello')
 | 
						|
        incoming_valid_message['To'] = mm_address
 | 
						|
        incoming_valid_message['Reply-to'] = self.example_email('othello')
 | 
						|
 | 
						|
        process_message(incoming_valid_message)
 | 
						|
 | 
						|
        # self.login(self.example_email("hamlet"))
 | 
						|
        # confirm that Hamlet got the message
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        message = most_recent_message(user_profile)
 | 
						|
 | 
						|
        self.assertEqual(message.content, "TestMissedMessageEmailMessages Body")
 | 
						|
        self.assertEqual(message.sender, self.example_user('othello'))
 | 
						|
        self.assertEqual(message.recipient.id, user_profile.id)
 | 
						|
        self.assertEqual(message.recipient.type, Recipient.PERSONAL)
 | 
						|
 | 
						|
class TestMissedHuddleMessageEmailMessages(ZulipTestCase):
 | 
						|
    def test_receive_missed_huddle_message_email_messages(self) -> None:
 | 
						|
 | 
						|
        # build dummy messages for missed messages email reply
 | 
						|
        # have Othello send Iago and Cordelia a PM. Cordelia will reply via email
 | 
						|
        # Iago and Othello will receive the message.
 | 
						|
        email = self.example_email('othello')
 | 
						|
        self.login(email)
 | 
						|
        result = self.client_post("/json/messages", {"type": "private",
 | 
						|
                                                     "content": "test_receive_missed_message_email_messages",
 | 
						|
                                                     "client": "test suite",
 | 
						|
                                                     "to": ujson.dumps([self.example_email('cordelia'),
 | 
						|
                                                                        self.example_email('iago')])})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        user_profile = self.example_user('cordelia')
 | 
						|
        usermessage = most_recent_usermessage(user_profile)
 | 
						|
 | 
						|
        # we don't want to send actual emails but we do need to create and store the
 | 
						|
        # token for looking up who did reply.
 | 
						|
        mm_address = create_missed_message_address(user_profile, usermessage.message)
 | 
						|
 | 
						|
        incoming_valid_message = MIMEText('TestMissedHuddleMessageEmailMessages Body')  # type: Any # https://github.com/python/typeshed/issues/275
 | 
						|
 | 
						|
        incoming_valid_message['Subject'] = 'TestMissedHuddleMessageEmailMessages Subject'
 | 
						|
        incoming_valid_message['From'] = self.example_email('cordelia')
 | 
						|
        incoming_valid_message['To'] = mm_address
 | 
						|
        incoming_valid_message['Reply-to'] = self.example_email('cordelia')
 | 
						|
 | 
						|
        process_message(incoming_valid_message)
 | 
						|
 | 
						|
        # Confirm Iago received the message.
 | 
						|
        user_profile = self.example_user('iago')
 | 
						|
        message = most_recent_message(user_profile)
 | 
						|
 | 
						|
        self.assertEqual(message.content, "TestMissedHuddleMessageEmailMessages Body")
 | 
						|
        self.assertEqual(message.sender, self.example_user('cordelia'))
 | 
						|
        self.assertEqual(message.recipient.type, Recipient.HUDDLE)
 | 
						|
 | 
						|
        # Confirm Othello received the message.
 | 
						|
        user_profile = self.example_user('othello')
 | 
						|
        message = most_recent_message(user_profile)
 | 
						|
 | 
						|
        self.assertEqual(message.content, "TestMissedHuddleMessageEmailMessages Body")
 | 
						|
        self.assertEqual(message.sender, self.example_user('cordelia'))
 | 
						|
        self.assertEqual(message.recipient.type, Recipient.HUDDLE)
 | 
						|
 | 
						|
class TestEmptyGatewaySetting(ZulipTestCase):
 | 
						|
    def test_missed_message(self) -> None:
 | 
						|
        email = self.example_email('othello')
 | 
						|
        self.login(email)
 | 
						|
        result = self.client_post("/json/messages", {"type": "private",
 | 
						|
                                                     "content": "test_receive_missed_message_email_messages",
 | 
						|
                                                     "client": "test suite",
 | 
						|
                                                     "to": ujson.dumps([self.example_email('cordelia'),
 | 
						|
                                                                        self.example_email('iago')])})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        user_profile = self.example_user('cordelia')
 | 
						|
        usermessage = most_recent_usermessage(user_profile)
 | 
						|
        with self.settings(EMAIL_GATEWAY_PATTERN=''):
 | 
						|
            mm_address = create_missed_message_address(user_profile, usermessage.message)
 | 
						|
            self.assertEqual(mm_address, FromAddress.NOREPLY)
 | 
						|
 | 
						|
    def test_encode_email_addr(self) -> None:
 | 
						|
        stream = get_stream("Denmark", get_realm("zulip"))
 | 
						|
 | 
						|
        with self.settings(EMAIL_GATEWAY_PATTERN=''):
 | 
						|
            test_address = encode_email_address(stream)
 | 
						|
            self.assertEqual(test_address, '')
 | 
						|
 | 
						|
class TestDigestEmailMessages(ZulipTestCase):
 | 
						|
    @mock.patch('zerver.lib.digest.enough_traffic')
 | 
						|
    @mock.patch('zerver.lib.digest.send_future_email')
 | 
						|
    def test_receive_digest_email_messages(self, mock_send_future_email: mock.MagicMock,
 | 
						|
                                           mock_enough_traffic: mock.MagicMock) -> None:
 | 
						|
 | 
						|
        # build dummy messages for missed messages email reply
 | 
						|
        # have Hamlet send Othello a PM. Othello will reply via email
 | 
						|
        # Hamlet will receive the message.
 | 
						|
        email = self.example_email('hamlet')
 | 
						|
        self.login(email)
 | 
						|
        result = self.client_post("/json/messages", {"type": "private",
 | 
						|
                                                     "content": "test_receive_missed_message_email_messages",
 | 
						|
                                                     "client": "test suite",
 | 
						|
                                                     "to": self.example_email('othello')})
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        user_profile = self.example_user('othello')
 | 
						|
        cutoff = time.mktime(datetime.datetime(year=2016, month=1, day=1).timetuple())
 | 
						|
 | 
						|
        handle_digest_email(user_profile.id, cutoff)
 | 
						|
        self.assertEqual(mock_send_future_email.call_count, 1)
 | 
						|
        self.assertEqual(mock_send_future_email.call_args[1]['to_user_id'], user_profile.id)
 | 
						|
 | 
						|
    @mock.patch('zerver.lib.digest.queue_digest_recipient')
 | 
						|
    @mock.patch('zerver.lib.digest.timezone_now')
 | 
						|
    def test_inactive_users_queued_for_digest(self, mock_django_timezone: mock.MagicMock,
 | 
						|
                                              mock_queue_digest_recipient: mock.MagicMock) -> None:
 | 
						|
        cutoff = timezone_now()
 | 
						|
        # Test Tuesday
 | 
						|
        mock_django_timezone.return_value = datetime.datetime(year=2016, month=1, day=5)
 | 
						|
        all_user_profiles = UserProfile.objects.filter(
 | 
						|
            is_active=True, is_bot=False, enable_digest_emails=True)
 | 
						|
        # Check that all users without an a UserActivity entry are considered
 | 
						|
        # inactive users and get enqueued.
 | 
						|
        enqueue_emails(cutoff)
 | 
						|
        self.assertEqual(mock_queue_digest_recipient.call_count, all_user_profiles.count())
 | 
						|
        mock_queue_digest_recipient.reset_mock()
 | 
						|
        for realm in Realm.objects.filter(deactivated=False, show_digest_email=True):
 | 
						|
            user_profiles = all_user_profiles.filter(realm=realm)
 | 
						|
            for user_profile in user_profiles:
 | 
						|
                UserActivity.objects.create(
 | 
						|
                    last_visit=cutoff - datetime.timedelta(days=1),
 | 
						|
                    user_profile=user_profile,
 | 
						|
                    count=0,
 | 
						|
                    client=get_client('test_client'))
 | 
						|
        # Check that inactive users are enqueued
 | 
						|
        enqueue_emails(cutoff)
 | 
						|
        self.assertEqual(mock_queue_digest_recipient.call_count, all_user_profiles.count())
 | 
						|
 | 
						|
    @mock.patch('zerver.lib.digest.enough_traffic', return_value=True)
 | 
						|
    @mock.patch('zerver.lib.digest.timezone_now')
 | 
						|
    def test_active_users_not_enqueued(self, mock_django_timezone: mock.MagicMock,
 | 
						|
                                       mock_enough_traffic: mock.MagicMock) -> None:
 | 
						|
        cutoff = timezone_now()
 | 
						|
        # A Tuesday
 | 
						|
        mock_django_timezone.return_value = datetime.datetime(year=2016, month=1, day=5)
 | 
						|
        realms = Realm.objects.filter(deactivated=False, show_digest_email=True)
 | 
						|
        for realm in realms:
 | 
						|
            user_profiles = UserProfile.objects.filter(realm=realm)
 | 
						|
            for counter, user_profile in enumerate(user_profiles, 1):
 | 
						|
                UserActivity.objects.create(
 | 
						|
                    last_visit=cutoff + datetime.timedelta(days=1),
 | 
						|
                    user_profile=user_profile,
 | 
						|
                    count=0,
 | 
						|
                    client=get_client('test_client'))
 | 
						|
        # Check that an active user is not enqueued
 | 
						|
        with mock.patch('zerver.lib.digest.queue_digest_recipient') as mock_queue_digest_recipient:
 | 
						|
            enqueue_emails(cutoff)
 | 
						|
            self.assertEqual(mock_queue_digest_recipient.call_count, 0)
 | 
						|
 | 
						|
    @mock.patch('zerver.lib.digest.queue_digest_recipient')
 | 
						|
    @mock.patch('zerver.lib.digest.timezone_now')
 | 
						|
    def test_only_enqueue_on_valid_day(self, mock_django_timezone: mock.MagicMock,
 | 
						|
                                       mock_queue_digest_recipient: mock.MagicMock) -> None:
 | 
						|
        # Not a Tuesday
 | 
						|
        mock_django_timezone.return_value = datetime.datetime(year=2016, month=1, day=6)
 | 
						|
 | 
						|
        # Check that digests are not sent on days other than Tuesday.
 | 
						|
        cutoff = timezone_now()
 | 
						|
        enqueue_emails(cutoff)
 | 
						|
        self.assertEqual(mock_queue_digest_recipient.call_count, 0)
 | 
						|
 | 
						|
    @mock.patch('zerver.lib.digest.queue_digest_recipient')
 | 
						|
    @mock.patch('zerver.lib.digest.timezone_now')
 | 
						|
    def test_no_email_digest_for_bots(self, mock_django_timezone: mock.MagicMock,
 | 
						|
                                      mock_queue_digest_recipient: mock.MagicMock) -> None:
 | 
						|
        cutoff = timezone_now()
 | 
						|
        # A Tuesday
 | 
						|
        mock_django_timezone.return_value = datetime.datetime(year=2016, month=1, day=5)
 | 
						|
        bot = do_create_user('some_bot@example.com', 'password', get_realm('zulip'), 'some_bot', '',
 | 
						|
                             bot_type=UserProfile.DEFAULT_BOT)
 | 
						|
        UserActivity.objects.create(
 | 
						|
            last_visit=cutoff - datetime.timedelta(days=1),
 | 
						|
            user_profile=bot,
 | 
						|
            count=0,
 | 
						|
            client=get_client('test_client'))
 | 
						|
 | 
						|
        # Check that bots are not sent emails
 | 
						|
        enqueue_emails(cutoff)
 | 
						|
        for arg in mock_queue_digest_recipient.call_args_list:
 | 
						|
            user = arg[0][0]
 | 
						|
            self.assertNotEqual(user.id, bot.id)
 | 
						|
 | 
						|
    @mock.patch('zerver.lib.digest.timezone_now')
 | 
						|
    def test_new_stream_link(self, mock_django_timezone: mock.MagicMock) -> None:
 | 
						|
        cutoff = datetime.datetime(year=2017, month=11, day=1)
 | 
						|
        mock_django_timezone.return_value = datetime.datetime(year=2017, month=11, day=5)
 | 
						|
        cordelia = self.example_user('cordelia')
 | 
						|
        stream_id = create_stream_if_needed(cordelia.realm, 'New stream')[0].id
 | 
						|
        new_stream = gather_new_streams(cordelia, cutoff)[1]
 | 
						|
        expected_html = "<a href='http://zulip.testserver/#narrow/stream/{stream_id}-New-stream'>New stream</a>".format(stream_id=stream_id)
 | 
						|
        self.assertIn(expected_html, new_stream['html'])
 | 
						|
 | 
						|
class TestReplyExtraction(ZulipTestCase):
 | 
						|
    def test_reply_is_extracted_from_plain(self) -> None:
 | 
						|
 | 
						|
        # build dummy messages for stream
 | 
						|
        # test valid incoming stream message is processed properly
 | 
						|
        email = self.example_email('hamlet')
 | 
						|
        self.login(email)
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        self.subscribe(user_profile, "Denmark")
 | 
						|
        stream = get_stream("Denmark", user_profile.realm)
 | 
						|
 | 
						|
        stream_to_address = encode_email_address(stream)
 | 
						|
        text = """Reply
 | 
						|
 | 
						|
        -----Original Message-----
 | 
						|
 | 
						|
        Quote"""
 | 
						|
 | 
						|
        incoming_valid_message = MIMEText(text)  # type: Any # https://github.com/python/typeshed/issues/275
 | 
						|
 | 
						|
        incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
 | 
						|
        incoming_valid_message['From'] = self.example_email('hamlet')
 | 
						|
        incoming_valid_message['To'] = stream_to_address
 | 
						|
        incoming_valid_message['Reply-to'] = self.example_email('othello')
 | 
						|
 | 
						|
        process_message(incoming_valid_message)
 | 
						|
 | 
						|
        # Hamlet is subscribed to this stream so should see the email message from Othello.
 | 
						|
        message = most_recent_message(user_profile)
 | 
						|
 | 
						|
        self.assertEqual(message.content, "Reply")
 | 
						|
 | 
						|
    def test_reply_is_extracted_from_html(self) -> None:
 | 
						|
 | 
						|
        # build dummy messages for stream
 | 
						|
        # test valid incoming stream message is processed properly
 | 
						|
        email = self.example_email('hamlet')
 | 
						|
        self.login(email)
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        self.subscribe(user_profile, "Denmark")
 | 
						|
        stream = get_stream("Denmark", user_profile.realm)
 | 
						|
 | 
						|
        stream_to_address = encode_email_address(stream)
 | 
						|
        html = """
 | 
						|
        <html>
 | 
						|
            <body>
 | 
						|
                <p>Reply</p>
 | 
						|
                <blockquote>
 | 
						|
 | 
						|
                    <div>
 | 
						|
                        On 11-Apr-2011, at 6:54 PM, Bob <bob@example.com> wrote:
 | 
						|
                    </div>
 | 
						|
 | 
						|
                    <div>
 | 
						|
                        Quote
 | 
						|
                    </div>
 | 
						|
 | 
						|
                </blockquote>
 | 
						|
            </body>
 | 
						|
        </html>
 | 
						|
        """
 | 
						|
 | 
						|
        incoming_valid_message = MIMEText(html, 'html')  # type: Any # https://github.com/python/typeshed/issues/275
 | 
						|
 | 
						|
        incoming_valid_message['Subject'] = 'TestStreamEmailMessages Subject'
 | 
						|
        incoming_valid_message['From'] = self.example_email('hamlet')
 | 
						|
        incoming_valid_message['To'] = stream_to_address
 | 
						|
        incoming_valid_message['Reply-to'] = self.example_email('othello')
 | 
						|
 | 
						|
        process_message(incoming_valid_message)
 | 
						|
 | 
						|
        # Hamlet is subscribed to this stream so should see the email message from Othello.
 | 
						|
        message = most_recent_message(user_profile)
 | 
						|
 | 
						|
        self.assertEqual(message.content, 'Reply')
 | 
						|
 | 
						|
MAILS_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "fixtures", "email")
 | 
						|
 | 
						|
 | 
						|
class TestScriptMTA(ZulipTestCase):
 | 
						|
 | 
						|
    def test_success(self) -> None:
 | 
						|
        script = os.path.join(os.path.dirname(__file__),
 | 
						|
                              '../../scripts/lib/email-mirror-postfix')
 | 
						|
 | 
						|
        sender = self.example_email('hamlet')
 | 
						|
        stream = get_stream("Denmark", get_realm("zulip"))
 | 
						|
        stream_to_address = encode_email_address(stream)
 | 
						|
 | 
						|
        template_path = os.path.join(MAILS_DIR, "simple.txt")
 | 
						|
        with open(template_path) as template_file:
 | 
						|
            mail_template = template_file.read()
 | 
						|
        mail = mail_template.format(stream_to_address=stream_to_address, sender=sender)
 | 
						|
        read_pipe, write_pipe = os.pipe()
 | 
						|
        os.write(write_pipe, mail.encode())
 | 
						|
        os.close(write_pipe)
 | 
						|
        subprocess.check_call(
 | 
						|
            [script, '-r', stream_to_address, '-s', settings.SHARED_SECRET, '-t'],
 | 
						|
            stdin=read_pipe)
 | 
						|
 | 
						|
    def test_error_no_recipient(self) -> None:
 | 
						|
        script = os.path.join(os.path.dirname(__file__),
 | 
						|
                              '../../scripts/lib/email-mirror-postfix')
 | 
						|
 | 
						|
        sender = self.example_email('hamlet')
 | 
						|
        stream = get_stream("Denmark", get_realm("zulip"))
 | 
						|
        stream_to_address = encode_email_address(stream)
 | 
						|
        template_path = os.path.join(MAILS_DIR, "simple.txt")
 | 
						|
        with open(template_path) as template_file:
 | 
						|
            mail_template = template_file.read()
 | 
						|
        mail = mail_template.format(stream_to_address=stream_to_address, sender=sender)
 | 
						|
        read_pipe, write_pipe = os.pipe()
 | 
						|
        os.write(write_pipe, mail.encode())
 | 
						|
        os.close(write_pipe)
 | 
						|
        success_call = True
 | 
						|
        try:
 | 
						|
            subprocess.check_output([script, '-s', settings.SHARED_SECRET, '-t'],
 | 
						|
                                    stdin=read_pipe)
 | 
						|
        except subprocess.CalledProcessError as e:
 | 
						|
            self.assertEqual(
 | 
						|
                e.output,
 | 
						|
                b'5.1.1 Bad destination mailbox address: No missed message email address.\n'
 | 
						|
            )
 | 
						|
            self.assertEqual(e.returncode, 67)
 | 
						|
            success_call = False
 | 
						|
        self.assertFalse(success_call)
 | 
						|
 | 
						|
 | 
						|
class TestEmailMirrorTornadoView(ZulipTestCase):
 | 
						|
 | 
						|
    def send_private_message(self) -> Text:
 | 
						|
        email = self.example_email('othello')
 | 
						|
        self.login(email)
 | 
						|
        result = self.client_post(
 | 
						|
            "/json/messages",
 | 
						|
            {
 | 
						|
                "type": "private",
 | 
						|
                "content": "test_receive_missed_message_email_messages",
 | 
						|
                "client": "test suite",
 | 
						|
                "to": ujson.dumps([self.example_email('cordelia'), self.example_email('iago')])
 | 
						|
            })
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        user_profile = self.example_user('cordelia')
 | 
						|
        user_message = most_recent_usermessage(user_profile)
 | 
						|
        return create_missed_message_address(user_profile, user_message.message)
 | 
						|
 | 
						|
    @mock.patch('zerver.lib.email_mirror.queue_json_publish')
 | 
						|
    def send_offline_message(self, to_address: str, sender: str,
 | 
						|
                             mock_queue_json_publish: mock.Mock) -> HttpResponse:
 | 
						|
        template_path = os.path.join(MAILS_DIR, "simple.txt")
 | 
						|
        with open(template_path) as template_file:
 | 
						|
            mail_template = template_file.read()
 | 
						|
        mail = mail_template.format(stream_to_address=to_address, sender=sender)
 | 
						|
 | 
						|
        def check_queue_json_publish(queue_name: str,
 | 
						|
                                     event: Union[Mapping[str, Any], str],
 | 
						|
                                     processor: Callable[[Any], None]=None) -> None:
 | 
						|
            self.assertEqual(queue_name, "email_mirror")
 | 
						|
            self.assertEqual(event, {"rcpt_to": to_address, "message": mail})
 | 
						|
 | 
						|
        mock_queue_json_publish.side_effect = check_queue_json_publish
 | 
						|
        request_data = {
 | 
						|
            "recipient": to_address,
 | 
						|
            "msg_text": mail
 | 
						|
        }
 | 
						|
        post_data = dict(
 | 
						|
            data=ujson.dumps(request_data),
 | 
						|
            secret=settings.SHARED_SECRET
 | 
						|
        )
 | 
						|
        return self.client_post('/email_mirror_message', post_data)
 | 
						|
 | 
						|
    def test_success_stream(self) -> None:
 | 
						|
        stream = get_stream("Denmark", get_realm("zulip"))
 | 
						|
        stream_to_address = encode_email_address(stream)
 | 
						|
        result = self.send_offline_message(stream_to_address, self.example_email('hamlet'))
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_error_to_stream_with_wrong_address(self) -> None:
 | 
						|
        stream = get_stream("Denmark", get_realm("zulip"))
 | 
						|
        stream_to_address = encode_email_address(stream)
 | 
						|
        stream_to_address = stream_to_address.replace("Denmark", "Wrong_stream")
 | 
						|
 | 
						|
        result = self.send_offline_message(stream_to_address, self.example_email('hamlet'))
 | 
						|
        self.assert_json_error(
 | 
						|
            result,
 | 
						|
            "5.1.1 Bad destination mailbox address: "
 | 
						|
            "Please use the address specified in your Streams page.")
 | 
						|
 | 
						|
    def test_success_to_private(self) -> None:
 | 
						|
        mm_address = self.send_private_message()
 | 
						|
        result = self.send_offline_message(mm_address, self.example_email('cordelia'))
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
    def test_using_mm_address_twice(self) -> None:
 | 
						|
        mm_address = self.send_private_message()
 | 
						|
        self.send_offline_message(mm_address, self.example_email('cordelia'))
 | 
						|
        result = self.send_offline_message(mm_address, self.example_email('cordelia'))
 | 
						|
        self.assert_json_error(
 | 
						|
            result,
 | 
						|
            "5.1.1 Bad destination mailbox address: Bad or expired missed message address.")
 | 
						|
 | 
						|
    def test_wrong_missed_email_private_message(self) -> None:
 | 
						|
        self.send_private_message()
 | 
						|
        mm_address = 'mm' + ('x' * 32) + '@testserver'
 | 
						|
        result = self.send_offline_message(mm_address, self.example_email('cordelia'))
 | 
						|
        self.assert_json_error(
 | 
						|
            result,
 | 
						|
            "5.1.1 Bad destination mailbox address: Bad or expired missed message address.")
 |