email_mirror: Add send_to_email_mirror management command.

Closes #11195. We add a management command to allow us to send emails
to the email mirror directly. The command doesn't require any
configuring of email sending or receiving for the email mirror,
it passes the emails directly using the process_message function.
This commit is contained in:
Mateusz Mandera
2019-01-09 12:27:29 +01:00
committed by Tim Abbott
parent 726767ece5
commit 3e5f89f2fe
4 changed files with 181 additions and 1 deletions

View File

@@ -3,7 +3,9 @@
import glob
import os
import re
import email
from datetime import timedelta
from email.mime.text import MIMEText
from email.utils import parseaddr
from mock import MagicMock, patch, call
from typing import List, Dict, Any, Optional
@@ -16,8 +18,9 @@ from zerver.lib.management import ZulipBaseCommand, CommandError, check_config
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import stdout_suppressed
from zerver.lib.test_runner import slow
from zerver.models import get_user_profile_by_email
from zerver.models import Recipient, get_user_profile_by_email, get_stream
from zerver.lib.test_helpers import most_recent_message
from zerver.models import get_realm, UserProfile, Realm
from confirmation.models import RealmCreationKey, generate_realm_creation_url
@@ -295,3 +298,46 @@ class TestRealmReactivationEmail(ZulipTestCase):
realm = get_realm('zulip')
with self.assertRaisesRegex(CommandError, "The realm %s is already active." % (realm.name,)):
call_command(self.COMMAND_NAME, "--realm=zulip")
class TestSendToEmailMirror(ZulipTestCase):
COMMAND_NAME = "send_to_email_mirror"
def test_sending_a_fixture(self) -> None:
fixture_path = "zerver/tests/fixtures/email/1.txt"
user_profile = self.example_user('hamlet')
self.login(user_profile.email)
self.subscribe(user_profile, "Denmark")
call_command(self.COMMAND_NAME, "--fixture={}".format(fixture_path))
message = most_recent_message(user_profile)
# last message should be equal to the body of the email in 1.txt
self.assertEqual(message.content, "Email fixture 1.txt body")
def test_sending_a_json_fixture(self) -> None:
fixture_path = "zerver/tests/fixtures/email/1.json"
user_profile = self.example_user('hamlet')
self.login(user_profile.email)
self.subscribe(user_profile, "Denmark")
call_command(self.COMMAND_NAME, "--fixture={}".format(fixture_path))
message = most_recent_message(user_profile)
# last message should be equal to the body of the email in 1.json
self.assertEqual(message.content, "Email fixture 1.json body")
def test_stream_option(self) -> None:
fixture_path = "zerver/tests/fixtures/email/1.txt"
user_profile = self.example_user('hamlet')
self.login(user_profile.email)
self.subscribe(user_profile, "Denmark2")
call_command(self.COMMAND_NAME, "--fixture={}".format(fixture_path), "--stream=Denmark2")
message = most_recent_message(user_profile)
# last message should be equal to the body of the email in 1.txt
self.assertEqual(message.content, "Email fixture 1.txt body")
stream_id = get_stream("Denmark2", message.sender.realm).id
self.assertEqual(message.recipient.type, Recipient.STREAM)
self.assertEqual(message.recipient.type_id, stream_id)