mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	This will allow for customized senders for emails, e.g. 'Zulip Digest' for digest emails and 'Zulip Missed Messages' for missed message emails. Also: * Converts the sender name to always be "Zulip", if the from_email used to be settings.NOREPLY_EMAIL_ADDRESS or settings.ZULIP_ADMINISTRATOR. * Changes the default value of settings.NOREPLY_EMAIL_ADDRESS in the prod_setting_template to no longer have a display name. The only use of that display name was in the email pathway.
		
			
				
	
	
		
			1947 lines
		
	
	
		
			82 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1947 lines
		
	
	
		
			82 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
from __future__ import absolute_import
 | 
						|
import datetime
 | 
						|
from django.conf import settings
 | 
						|
from django.contrib.contenttypes.models import ContentType
 | 
						|
from django.contrib.sites.models import Site
 | 
						|
from django.http import HttpResponse
 | 
						|
from django.test import TestCase
 | 
						|
from django.utils.timezone import now as timezone_now
 | 
						|
 | 
						|
from mock import patch, MagicMock
 | 
						|
from zerver.lib.test_helpers import MockLDAP
 | 
						|
 | 
						|
from confirmation.models import Confirmation
 | 
						|
 | 
						|
from zilencer.models import Deployment
 | 
						|
 | 
						|
from zerver.forms import HomepageForm, WRONG_SUBDOMAIN_ERROR
 | 
						|
from zerver.lib.actions import do_change_password
 | 
						|
from zerver.views.auth import login_or_register_remote_user
 | 
						|
from zerver.views.invite import get_invitee_emails_set
 | 
						|
from zerver.views.registration import confirmation_key, \
 | 
						|
    redirect_and_log_into_subdomain, send_registration_completion_email
 | 
						|
 | 
						|
from zerver.models import (
 | 
						|
    get_realm, get_prereg_user_by_email, get_user,
 | 
						|
    get_unique_open_realm, get_unique_non_system_realm,
 | 
						|
    completely_open, get_recipient,
 | 
						|
    PreregistrationUser, Realm, RealmDomain, Recipient, Message,
 | 
						|
    Referral, ScheduledJob, UserProfile, UserMessage,
 | 
						|
    Stream, Subscription, ScheduledJob, flush_per_request_caches
 | 
						|
)
 | 
						|
from zerver.lib.actions import (
 | 
						|
    set_default_streams,
 | 
						|
    do_change_is_admin,
 | 
						|
    get_stream,
 | 
						|
    do_create_realm,
 | 
						|
)
 | 
						|
from zerver.lib.send_email import display_email, send_email, send_future_email
 | 
						|
from zerver.lib.initial_password import initial_password
 | 
						|
from zerver.lib.actions import (
 | 
						|
    do_deactivate_realm,
 | 
						|
    do_set_realm_property,
 | 
						|
    add_new_user_history,
 | 
						|
)
 | 
						|
from zerver.lib.mobile_auth_otp import xor_hex_strings, ascii_to_hex, \
 | 
						|
    otp_encrypt_api_key, is_valid_otp, hex_to_ascii, otp_decrypt_api_key
 | 
						|
from zerver.lib.notifications import enqueue_welcome_emails, \
 | 
						|
    one_click_unsubscribe_link
 | 
						|
from zerver.lib.test_helpers import find_pattern_in_email, find_key_by_email, queries_captured, \
 | 
						|
    HostRequestMock, unsign_subdomain_cookie, POSTRequestMock
 | 
						|
from zerver.lib.test_classes import (
 | 
						|
    ZulipTestCase,
 | 
						|
)
 | 
						|
from zerver.lib.test_runner import slow
 | 
						|
from zerver.lib.sessions import get_session_dict_user
 | 
						|
from zerver.context_processors import common_context
 | 
						|
 | 
						|
from collections import defaultdict
 | 
						|
import re
 | 
						|
import ujson
 | 
						|
 | 
						|
from typing import Any, Dict, List, Optional, Set
 | 
						|
 | 
						|
from six.moves import urllib
 | 
						|
from six.moves import range
 | 
						|
from typing import Any, Text
 | 
						|
import os
 | 
						|
 | 
						|
class RedirectAndLogIntoSubdomainTestCase(ZulipTestCase):
 | 
						|
    def test_cookie_data(self):
 | 
						|
        # type: () -> None
 | 
						|
        realm = Realm.objects.all().first()
 | 
						|
        name = 'Hamlet'
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        response = redirect_and_log_into_subdomain(realm, name, email)
 | 
						|
        data = unsign_subdomain_cookie(response)
 | 
						|
        self.assertDictEqual(data, {'name': name, 'email': email,
 | 
						|
                                    'subdomain': realm.subdomain,
 | 
						|
                                    'is_signup': False})
 | 
						|
 | 
						|
        response = redirect_and_log_into_subdomain(realm, name, email,
 | 
						|
                                                   is_signup=True)
 | 
						|
        data = unsign_subdomain_cookie(response)
 | 
						|
        self.assertDictEqual(data, {'name': name, 'email': email,
 | 
						|
                                    'subdomain': realm.subdomain,
 | 
						|
                                    'is_signup': True})
 | 
						|
 | 
						|
class AddNewUserHistoryTest(ZulipTestCase):
 | 
						|
    def test_add_new_user_history_race(self):
 | 
						|
        # type: () -> None
 | 
						|
        """Sends a message during user creation"""
 | 
						|
        # Create a user who hasn't had historical messages added
 | 
						|
        stream_dict = {
 | 
						|
            "Denmark": {"description": "A Scandinavian country", "invite_only": False},
 | 
						|
            "Verona": {"description": "A city in Italy", "invite_only": False}
 | 
						|
        }  # type: Dict[Text, Dict[Text, Any]]
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        set_default_streams(realm, stream_dict)
 | 
						|
        with patch("zerver.lib.actions.add_new_user_history"):
 | 
						|
            self.register(self.nonreg_email('test'), "test")
 | 
						|
        user_profile = self.nonreg_user('test')
 | 
						|
 | 
						|
        subs = Subscription.objects.select_related("recipient").filter(
 | 
						|
            user_profile=user_profile, recipient__type=Recipient.STREAM)
 | 
						|
        streams = Stream.objects.filter(id__in=[sub.recipient.type_id for sub in subs])
 | 
						|
        self.send_message(self.example_email('hamlet'), streams[0].name, Recipient.STREAM, "test")
 | 
						|
        add_new_user_history(user_profile, streams)
 | 
						|
 | 
						|
class PasswordResetTest(ZulipTestCase):
 | 
						|
    """
 | 
						|
    Log in, reset password, log out, log in with new password.
 | 
						|
    """
 | 
						|
 | 
						|
    def test_password_reset(self):
 | 
						|
        # type: () -> None
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        old_password = initial_password(email)
 | 
						|
 | 
						|
        self.login(email)
 | 
						|
 | 
						|
        # test password reset template
 | 
						|
        result = self.client_get('/accounts/password/reset/')
 | 
						|
        self.assert_in_response('Reset your password', result)
 | 
						|
 | 
						|
        # start the password reset process by supplying an email address
 | 
						|
        result = self.client_post('/accounts/password/reset/', {'email': email})
 | 
						|
 | 
						|
        # check the redirect link telling you to check mail for password reset link
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith(
 | 
						|
            "/accounts/password/reset/done/"))
 | 
						|
        result = self.client_get(result["Location"])
 | 
						|
 | 
						|
        self.assert_in_response("Check your email to finish the process.", result)
 | 
						|
 | 
						|
        # Visit the password reset link.
 | 
						|
        password_reset_url = self.get_confirmation_url_from_outbox(email, "(\S+)")
 | 
						|
        result = self.client_get(password_reset_url)
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        # Reset your password
 | 
						|
        result = self.client_post(password_reset_url,
 | 
						|
                                  {'new_password1': 'new_password',
 | 
						|
                                   'new_password2': 'new_password'})
 | 
						|
 | 
						|
        # password reset succeeded
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith("/password/done/"))
 | 
						|
 | 
						|
        # log back in with new password
 | 
						|
        self.login(email, password='new_password')
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        self.assertEqual(get_session_dict_user(self.client.session), user_profile.id)
 | 
						|
 | 
						|
        # make sure old password no longer works
 | 
						|
        self.login(email, password=old_password, fails=True)
 | 
						|
 | 
						|
    def test_invalid_subdomain(self):
 | 
						|
        # type: () -> None
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        string_id = 'hamlet'
 | 
						|
        name = 'Hamlet'
 | 
						|
        do_create_realm(
 | 
						|
            string_id,
 | 
						|
            name,
 | 
						|
            restricted_to_domain=False,
 | 
						|
            invite_required=False
 | 
						|
        )
 | 
						|
 | 
						|
        with self.settings(REALMS_HAVE_SUBDOMAINS=True):
 | 
						|
            with patch('zerver.forms.get_subdomain', return_value=string_id):
 | 
						|
                # start the password reset process by supplying an email address
 | 
						|
                result = self.client_post(
 | 
						|
                    '/accounts/password/reset/', {'email': email})
 | 
						|
 | 
						|
        # check the redirect link telling you to check mail for password reset link
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith(
 | 
						|
            "/accounts/password/reset/done/"))
 | 
						|
        result = self.client_get(result["Location"])
 | 
						|
 | 
						|
        self.assert_in_response("Check your email to finish the process.", result)
 | 
						|
 | 
						|
        from django.core.mail import outbox
 | 
						|
        self.assertEqual(len(outbox), 1)
 | 
						|
        message = outbox.pop()
 | 
						|
        self.assertIn("hamlet@zulip.com does not\nhave an active account in http://",
 | 
						|
                      message.body)
 | 
						|
 | 
						|
    def test_correct_subdomain(self):
 | 
						|
        # type: () -> None
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        string_id = 'zulip'
 | 
						|
 | 
						|
        with self.settings(REALMS_HAVE_SUBDOMAINS=True):
 | 
						|
            with patch('zerver.forms.get_subdomain', return_value=string_id):
 | 
						|
                # start the password reset process by supplying an email address
 | 
						|
                result = self.client_post(
 | 
						|
                    '/accounts/password/reset/', {'email': email})
 | 
						|
 | 
						|
        # check the redirect link telling you to check mail for password reset link
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith(
 | 
						|
            "/accounts/password/reset/done/"))
 | 
						|
        result = self.client_get(result["Location"])
 | 
						|
 | 
						|
        self.assert_in_response("Check your email to finish the process.", result)
 | 
						|
 | 
						|
        from django.core.mail import outbox
 | 
						|
        self.assertEqual(len(outbox), 1)
 | 
						|
        message = outbox.pop()
 | 
						|
        self.assertIn("Psst. Word on the street is that you forgot your password,",
 | 
						|
                      message.body)
 | 
						|
 | 
						|
    def test_redirect_endpoints(self):
 | 
						|
        # type: () -> None
 | 
						|
        '''
 | 
						|
        These tests are mostly designed to give us 100% URL coverage
 | 
						|
        in our URL coverage reports.  Our mechanism for finding URL
 | 
						|
        coverage doesn't handle redirects, so we just have a few quick
 | 
						|
        tests here.
 | 
						|
        '''
 | 
						|
        result = self.client_get('/accounts/password/reset/done/')
 | 
						|
        self.assert_in_success_response(["Check your email"], result)
 | 
						|
 | 
						|
        result = self.client_get('/accounts/password/done/')
 | 
						|
        self.assert_in_success_response(["We've reset your password!"], result)
 | 
						|
 | 
						|
        result = self.client_get('/accounts/send_confirm/alice@example.com')
 | 
						|
        self.assert_in_success_response(["Still no email?"], result)
 | 
						|
 | 
						|
class LoginTest(ZulipTestCase):
 | 
						|
    """
 | 
						|
    Logging in, registration, and logging out.
 | 
						|
    """
 | 
						|
 | 
						|
    def test_login(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        self.assertEqual(get_session_dict_user(self.client.session), user_profile.id)
 | 
						|
 | 
						|
    def test_login_bad_password(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login(self.example_email("hamlet"), password="wrongpassword", fails=True)
 | 
						|
        self.assertIsNone(get_session_dict_user(self.client.session))
 | 
						|
 | 
						|
    def test_login_nonexist_user(self):
 | 
						|
        # type: () -> None
 | 
						|
        result = self.login_with_return("xxx@zulip.com", "xxx")
 | 
						|
        self.assert_in_response("Please enter a correct email and password", result)
 | 
						|
 | 
						|
    def test_register(self):
 | 
						|
        # type: () -> None
 | 
						|
        realm = get_realm("zulip")
 | 
						|
        stream_dict = {"stream_"+str(i): {"description": "stream_%s_description" % i, "invite_only": False}
 | 
						|
                       for i in range(40)}  # type: Dict[Text, Dict[Text, Any]]
 | 
						|
        for stream_name in stream_dict.keys():
 | 
						|
            self.make_stream(stream_name, realm=realm)
 | 
						|
 | 
						|
        set_default_streams(realm, stream_dict)
 | 
						|
        # Clear all the caches.
 | 
						|
        flush_per_request_caches()
 | 
						|
        ContentType.objects.clear_cache()
 | 
						|
        Site.objects.clear_cache()
 | 
						|
 | 
						|
        with queries_captured() as queries:
 | 
						|
            self.register(self.nonreg_email('test'), "test")
 | 
						|
        # Ensure the number of queries we make is not O(streams)
 | 
						|
        self.assert_length(queries, 66)
 | 
						|
        user_profile = self.nonreg_user('test')
 | 
						|
        self.assertEqual(get_session_dict_user(self.client.session), user_profile.id)
 | 
						|
        self.assertFalse(user_profile.enable_stream_desktop_notifications)
 | 
						|
 | 
						|
    def test_register_deactivated(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        If you try to register for a deactivated realm, you get a clear error
 | 
						|
        page.
 | 
						|
        """
 | 
						|
        realm = get_realm("zulip")
 | 
						|
        realm.deactivated = True
 | 
						|
        realm.save(update_fields=["deactivated"])
 | 
						|
 | 
						|
        result = self.register(self.nonreg_email('test'), "test")
 | 
						|
        self.assert_in_response("has been deactivated", result)
 | 
						|
 | 
						|
        with self.assertRaises(UserProfile.DoesNotExist):
 | 
						|
            self.nonreg_user('test')
 | 
						|
 | 
						|
    def test_login_deactivated(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        If you try to log in to a deactivated realm, you get a clear error page.
 | 
						|
        """
 | 
						|
        realm = get_realm("zulip")
 | 
						|
        realm.deactivated = True
 | 
						|
        realm.save(update_fields=["deactivated"])
 | 
						|
 | 
						|
        result = self.login_with_return(self.example_email("hamlet"))
 | 
						|
        self.assert_in_response("has been deactivated", result)
 | 
						|
 | 
						|
    def test_logout(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        # We use the logout API, not self.logout, to make sure we test
 | 
						|
        # the actual logout code path.
 | 
						|
        self.client_post('/accounts/logout/')
 | 
						|
        self.assertIsNone(get_session_dict_user(self.client.session))
 | 
						|
 | 
						|
    def test_non_ascii_login(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        You can log in even if your password contain non-ASCII characters.
 | 
						|
        """
 | 
						|
        email = self.nonreg_email('test')
 | 
						|
        password = u"hümbüǵ"
 | 
						|
 | 
						|
        # Registering succeeds.
 | 
						|
        self.register(email, password)
 | 
						|
        user_profile = self.nonreg_user('test')
 | 
						|
        self.assertEqual(get_session_dict_user(self.client.session), user_profile.id)
 | 
						|
        self.logout()
 | 
						|
        self.assertIsNone(get_session_dict_user(self.client.session))
 | 
						|
 | 
						|
        # Logging in succeeds.
 | 
						|
        self.logout()
 | 
						|
        self.login(email, password)
 | 
						|
        self.assertEqual(get_session_dict_user(self.client.session), user_profile.id)
 | 
						|
 | 
						|
    def test_login_page_redirects_logged_in_user(self):
 | 
						|
        # type: () -> None
 | 
						|
        """You will be redirected to the app's main page if you land on the
 | 
						|
        login page when already logged in.
 | 
						|
        """
 | 
						|
        self.login(self.example_email("cordelia"))
 | 
						|
        response = self.client_get("/login/")
 | 
						|
        self.assertEqual(response["Location"], "/")
 | 
						|
 | 
						|
class InviteUserTest(ZulipTestCase):
 | 
						|
 | 
						|
    def invite(self, users, streams, body=''):
 | 
						|
        # type: (Text, List[Text], str) -> HttpResponse
 | 
						|
        """
 | 
						|
        Invites the specified users to Zulip with the specified streams.
 | 
						|
 | 
						|
        users should be a string containing the users to invite, comma or
 | 
						|
            newline separated.
 | 
						|
 | 
						|
        streams should be a list of strings.
 | 
						|
        """
 | 
						|
 | 
						|
        return self.client_post("/json/invite_users",
 | 
						|
                                {"invitee_emails": users,
 | 
						|
                                 "stream": streams,
 | 
						|
                                 "custom_body": body})
 | 
						|
 | 
						|
    def check_sent_emails(self, correct_recipients, custom_body=None):
 | 
						|
        # type: (List[Text], Optional[str]) -> None
 | 
						|
        from django.core.mail import outbox
 | 
						|
        self.assertEqual(len(outbox), len(correct_recipients))
 | 
						|
        email_recipients = [email.recipients()[0] for email in outbox]
 | 
						|
        self.assertEqual(sorted(email_recipients), sorted(correct_recipients))
 | 
						|
        if len(outbox) == 0:
 | 
						|
            return
 | 
						|
 | 
						|
        if custom_body is None:
 | 
						|
            self.assertNotIn("Message from", outbox[0].body)
 | 
						|
        else:
 | 
						|
            self.assertIn("Message from ", outbox[0].body)
 | 
						|
            self.assertIn(custom_body, outbox[0].body)
 | 
						|
 | 
						|
    def test_successful_invite_user(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        A call to /json/invite_users with valid parameters causes an invitation
 | 
						|
        email to be sent.
 | 
						|
        """
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        invitee = "alice-test@zulip.com"
 | 
						|
        self.assert_json_success(self.invite(invitee, ["Denmark"]))
 | 
						|
        self.assertTrue(find_key_by_email(invitee))
 | 
						|
        self.check_sent_emails([invitee])
 | 
						|
 | 
						|
    def test_successful_invite_user_with_custom_body(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        A call to /json/invite_users with valid parameters causes an invitation
 | 
						|
        email to be sent.
 | 
						|
        """
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        invitee = "alice-test@zulip.com"
 | 
						|
        body = "Custom Text."
 | 
						|
        self.assert_json_success(self.invite(invitee, ["Denmark"], body))
 | 
						|
        self.assertTrue(find_pattern_in_email(invitee, body))
 | 
						|
        self.check_sent_emails([invitee], custom_body=body)
 | 
						|
 | 
						|
    def test_successful_invite_user_with_name(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        A call to /json/invite_users with valid parameters causes an invitation
 | 
						|
        email to be sent.
 | 
						|
        """
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        email = "alice-test@zulip.com"
 | 
						|
        invitee = "Alice Test <{}>".format(email)
 | 
						|
        self.assert_json_success(self.invite(invitee, ["Denmark"]))
 | 
						|
        self.assertTrue(find_key_by_email(email))
 | 
						|
        self.check_sent_emails([email])
 | 
						|
 | 
						|
    def test_successful_invite_user_with_name_and_normal_one(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        A call to /json/invite_users with valid parameters causes an invitation
 | 
						|
        email to be sent.
 | 
						|
        """
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        email = "alice-test@zulip.com"
 | 
						|
        email2 = "bob-test@zulip.com"
 | 
						|
        invitee = "Alice Test <{}>, {}".format(email, email2)
 | 
						|
        self.assert_json_success(self.invite(invitee, ["Denmark"]))
 | 
						|
        self.assertTrue(find_key_by_email(email))
 | 
						|
        self.assertTrue(find_key_by_email(email2))
 | 
						|
        self.check_sent_emails([email, email2])
 | 
						|
 | 
						|
    def test_require_realm_admin(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        The invite_by_admins_only realm setting works properly.
 | 
						|
        """
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        realm.invite_by_admins_only = True
 | 
						|
        realm.save()
 | 
						|
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        email = "alice-test@zulip.com"
 | 
						|
        email2 = "bob-test@zulip.com"
 | 
						|
        invitee = "Alice Test <{}>, {}".format(email, email2)
 | 
						|
        self.assert_json_error(self.invite(invitee, ["Denmark"]),
 | 
						|
                               "Must be a realm administrator")
 | 
						|
 | 
						|
        # Now verify an administrator can do it
 | 
						|
        self.login("iago@zulip.com")
 | 
						|
        self.assert_json_success(self.invite(invitee, ["Denmark"]))
 | 
						|
        self.assertTrue(find_key_by_email(email))
 | 
						|
        self.assertTrue(find_key_by_email(email2))
 | 
						|
        self.check_sent_emails([email, email2])
 | 
						|
 | 
						|
    def test_successful_invite_user_with_notifications_stream(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        A call to /json/invite_users with valid parameters unconditionally
 | 
						|
        subscribes the invitee to the notifications stream if it exists and is
 | 
						|
        public.
 | 
						|
        """
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        notifications_stream = get_stream('Verona', realm)
 | 
						|
        realm.notifications_stream = notifications_stream
 | 
						|
        realm.save()
 | 
						|
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        invitee = 'alice-test@zulip.com'
 | 
						|
        self.assert_json_success(self.invite(invitee, ['Denmark']))
 | 
						|
        self.assertTrue(find_key_by_email(invitee))
 | 
						|
        self.check_sent_emails([invitee])
 | 
						|
 | 
						|
        prereg_user = get_prereg_user_by_email(invitee)
 | 
						|
        streams = list(prereg_user.streams.all())
 | 
						|
        self.assertTrue(notifications_stream in streams)
 | 
						|
 | 
						|
    def test_invite_user_signup_initial_history(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Test that a new user invited to a stream receives some initial
 | 
						|
        history but only from public streams.
 | 
						|
        """
 | 
						|
        self.login(self.example_email('hamlet'))
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        private_stream_name = "Secret"
 | 
						|
        self.make_stream(private_stream_name, invite_only=True)
 | 
						|
        self.subscribe_to_stream(user_profile.email, private_stream_name)
 | 
						|
        public_msg_id = self.send_message(self.example_email("hamlet"), "Denmark", Recipient.STREAM,
 | 
						|
                                          "Public topic", "Public message")
 | 
						|
        secret_msg_id = self.send_message(self.example_email("hamlet"), private_stream_name, Recipient.STREAM,
 | 
						|
                                          "Secret topic", "Secret message")
 | 
						|
        invitee = self.nonreg_email('alice')
 | 
						|
        self.assert_json_success(self.invite(invitee, [private_stream_name, "Denmark"]))
 | 
						|
        self.assertTrue(find_key_by_email(invitee))
 | 
						|
 | 
						|
        self.submit_reg_form_for_user(invitee, "password")
 | 
						|
        invitee_profile = self.nonreg_user('alice')
 | 
						|
        invitee_msg_ids = [um.message_id for um in
 | 
						|
                           UserMessage.objects.filter(user_profile=invitee_profile)]
 | 
						|
        self.assertTrue(public_msg_id in invitee_msg_ids)
 | 
						|
        self.assertFalse(secret_msg_id in invitee_msg_ids)
 | 
						|
 | 
						|
    def test_multi_user_invite(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Invites multiple users with a variety of delimiters.
 | 
						|
        """
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        # Intentionally use a weird string.
 | 
						|
        self.assert_json_success(self.invite(
 | 
						|
            """bob-test@zulip.com,     carol-test@zulip.com,
 | 
						|
            dave-test@zulip.com
 | 
						|
 | 
						|
 | 
						|
earl-test@zulip.com""", ["Denmark"]))
 | 
						|
        for user in ("bob", "carol", "dave", "earl"):
 | 
						|
            self.assertTrue(find_key_by_email("%s-test@zulip.com" % (user,)))
 | 
						|
        self.check_sent_emails(["bob-test@zulip.com", "carol-test@zulip.com",
 | 
						|
                                "dave-test@zulip.com", "earl-test@zulip.com"])
 | 
						|
 | 
						|
    def test_missing_or_invalid_params(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Tests inviting with various missing or invalid parameters.
 | 
						|
        """
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        self.assert_json_error(
 | 
						|
            self.client_post("/json/invite_users", {"invitee_emails": "foo@zulip.com",
 | 
						|
                                                    "custom_body": ''}),
 | 
						|
            "You must specify at least one stream for invitees to join.")
 | 
						|
 | 
						|
        for address in ("noatsign.com", "outsideyourdomain@example.net"):
 | 
						|
            self.assert_json_error(
 | 
						|
                self.invite(address, ["Denmark"]),
 | 
						|
                "Some emails did not validate, so we didn't send any invitations.")
 | 
						|
        self.check_sent_emails([])
 | 
						|
 | 
						|
        self.assert_json_error(
 | 
						|
            self.invite("", ["Denmark"]),
 | 
						|
            "You must specify at least one email address.")
 | 
						|
        self.check_sent_emails([])
 | 
						|
 | 
						|
    def test_invalid_stream(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Tests inviting to a non-existent stream.
 | 
						|
        """
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        self.assert_json_error(self.invite("iago-test@zulip.com", ["NotARealStream"]),
 | 
						|
                               "Stream does not exist: NotARealStream. No invites were sent.")
 | 
						|
        self.check_sent_emails([])
 | 
						|
 | 
						|
    def test_invite_existing_user(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        If you invite an address already using Zulip, no invitation is sent.
 | 
						|
        """
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        self.assert_json_error(
 | 
						|
            self.client_post("/json/invite_users",
 | 
						|
                             {"invitee_emails": self.example_email("hamlet"),
 | 
						|
                              "stream": ["Denmark"],
 | 
						|
                              "custom_body": ''}),
 | 
						|
            "We weren't able to invite anyone.")
 | 
						|
        self.assertRaises(PreregistrationUser.DoesNotExist,
 | 
						|
                          lambda: PreregistrationUser.objects.get(
 | 
						|
                              email=self.example_email("hamlet")))
 | 
						|
        self.check_sent_emails([])
 | 
						|
 | 
						|
    def test_invite_some_existing_some_new(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        If you invite a mix of already existing and new users, invitations are
 | 
						|
        only sent to the new users.
 | 
						|
        """
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        existing = [self.example_email("hamlet"), u"othello@zulip.com"]
 | 
						|
        new = [u"foo-test@zulip.com", u"bar-test@zulip.com"]
 | 
						|
 | 
						|
        result = self.client_post("/json/invite_users",
 | 
						|
                                  {"invitee_emails": "\n".join(existing + new),
 | 
						|
                                   "stream": ["Denmark"],
 | 
						|
                                   "custom_body": ''})
 | 
						|
        self.assert_json_error(result,
 | 
						|
                               "Some of those addresses are already using Zulip, \
 | 
						|
so we didn't send them an invitation. We did send invitations to everyone else!")
 | 
						|
 | 
						|
        # We only created accounts for the new users.
 | 
						|
        for email in existing:
 | 
						|
            self.assertRaises(PreregistrationUser.DoesNotExist,
 | 
						|
                              lambda: PreregistrationUser.objects.get(
 | 
						|
                                  email=email))
 | 
						|
        for email in new:
 | 
						|
            self.assertTrue(PreregistrationUser.objects.get(email=email))
 | 
						|
 | 
						|
        # We only sent emails to the new users.
 | 
						|
        self.check_sent_emails(new)
 | 
						|
 | 
						|
        prereg_user = get_prereg_user_by_email('foo-test@zulip.com')
 | 
						|
        self.assertEqual(prereg_user.email, 'foo-test@zulip.com')
 | 
						|
 | 
						|
    def test_invite_outside_domain_in_closed_realm(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        In a realm with `restricted_to_domain = True`, you can't invite people
 | 
						|
        with a different domain from that of the realm or your e-mail address.
 | 
						|
        """
 | 
						|
        zulip_realm = get_realm("zulip")
 | 
						|
        zulip_realm.restricted_to_domain = True
 | 
						|
        zulip_realm.save()
 | 
						|
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        external_address = "foo@example.com"
 | 
						|
 | 
						|
        self.assert_json_error(
 | 
						|
            self.invite(external_address, ["Denmark"]),
 | 
						|
            "Some emails did not validate, so we didn't send any invitations.")
 | 
						|
 | 
						|
    def test_invite_outside_domain_in_open_realm(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        In a realm with `restricted_to_domain = False`, you can invite people
 | 
						|
        with a different domain from that of the realm or your e-mail address.
 | 
						|
        """
 | 
						|
        zulip_realm = get_realm("zulip")
 | 
						|
        zulip_realm.restricted_to_domain = False
 | 
						|
        zulip_realm.save()
 | 
						|
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        external_address = "foo@example.com"
 | 
						|
 | 
						|
        self.assert_json_success(self.invite(external_address, ["Denmark"]))
 | 
						|
        self.check_sent_emails([external_address])
 | 
						|
 | 
						|
    def test_invite_outside_domain_before_closing(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        If you invite someone with a different domain from that of the realm
 | 
						|
        when `restricted_to_domain = False`, but `restricted_to_domain` later
 | 
						|
        changes to true, the invitation should succeed but the invitee's signup
 | 
						|
        attempt should fail.
 | 
						|
        """
 | 
						|
        zulip_realm = get_realm("zulip")
 | 
						|
        zulip_realm.restricted_to_domain = False
 | 
						|
        zulip_realm.save()
 | 
						|
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        external_address = "foo@example.com"
 | 
						|
 | 
						|
        self.assert_json_success(self.invite(external_address, ["Denmark"]))
 | 
						|
        self.check_sent_emails([external_address])
 | 
						|
 | 
						|
        zulip_realm.restricted_to_domain = True
 | 
						|
        zulip_realm.save()
 | 
						|
 | 
						|
        result = self.submit_reg_form_for_user("foo@example.com", "password")
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
        self.assert_in_response("only allows users with e-mail", result)
 | 
						|
 | 
						|
    def test_invite_with_non_ascii_streams(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Inviting someone to streams with non-ASCII characters succeeds.
 | 
						|
        """
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        invitee = "alice-test@zulip.com"
 | 
						|
 | 
						|
        stream_name = u"hümbüǵ"
 | 
						|
 | 
						|
        # Make sure we're subscribed before inviting someone.
 | 
						|
        self.subscribe_to_stream(self.example_email("hamlet"), stream_name)
 | 
						|
 | 
						|
        self.assert_json_success(self.invite(invitee, [stream_name]))
 | 
						|
 | 
						|
    def test_refer_friend(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        user.invites_granted = 1
 | 
						|
        user.invites_used = 0
 | 
						|
        user.save()
 | 
						|
 | 
						|
        invitee = "alice-test@zulip.com"
 | 
						|
        result = self.client_post('/json/refer_friend', dict(email=invitee))
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        # verify this works
 | 
						|
        Referral.objects.get(user_profile=user, email=invitee)
 | 
						|
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        self.assertEqual(user.invites_used, 1)
 | 
						|
 | 
						|
    def test_refer_friend_no_email(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        user.invites_granted = 1
 | 
						|
        user.invites_used = 0
 | 
						|
        user.save()
 | 
						|
 | 
						|
        self.assert_json_error(
 | 
						|
            self.client_post('/json/refer_friend', dict(email='')),
 | 
						|
            "No email address specified")
 | 
						|
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        self.assertEqual(user.invites_used, 0)
 | 
						|
 | 
						|
    def test_refer_friend_no_invites(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login(self.example_email("hamlet"))
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        user.invites_granted = 1
 | 
						|
        user.invites_used = 1
 | 
						|
        user.save()
 | 
						|
 | 
						|
        invitee = "alice-test@zulip.com"
 | 
						|
        self.assert_json_error(
 | 
						|
            self.client_post('/json/refer_friend', dict(email=invitee)),
 | 
						|
            "Insufficient invites")
 | 
						|
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        self.assertEqual(user.invites_used, 1)
 | 
						|
 | 
						|
    def test_invitation_reminder_email(self):
 | 
						|
        # type: () -> None
 | 
						|
        from django.core.mail import outbox
 | 
						|
 | 
						|
        # All users belong to zulip realm
 | 
						|
        referrer_user = 'hamlet'
 | 
						|
        current_user_email = self.example_email(referrer_user)
 | 
						|
        self.login(current_user_email)
 | 
						|
        invitee = self.nonreg_email('alice')
 | 
						|
        self.assert_json_success(self.invite(invitee, ["Denmark"]))
 | 
						|
        self.assertTrue(find_key_by_email(invitee))
 | 
						|
        self.check_sent_emails([invitee])
 | 
						|
 | 
						|
        data = {"email": invitee, "referrer_email": current_user_email}
 | 
						|
        invitee = get_prereg_user_by_email(data["email"])
 | 
						|
        referrer = self.example_user(referrer_user)
 | 
						|
        link = Confirmation.objects.get_link_for_object(invitee, host=referrer.realm.host)
 | 
						|
        context = common_context(referrer)
 | 
						|
        context.update({
 | 
						|
            'activate_url': link,
 | 
						|
            'referrer_name': referrer.full_name,
 | 
						|
            'referrer_email': referrer.email,
 | 
						|
            'referrer_realm_name': referrer.realm.name,
 | 
						|
        })
 | 
						|
        with self.settings(EMAIL_BACKEND='django.core.mail.backends.console.EmailBackend'):
 | 
						|
            send_future_email(
 | 
						|
                "zerver/emails/invitation_reminder", data["email"],
 | 
						|
                from_address=settings.ZULIP_ADMINISTRATOR.split()[-1], context=context)
 | 
						|
        email_jobs_to_deliver = ScheduledJob.objects.filter(
 | 
						|
            type=ScheduledJob.EMAIL,
 | 
						|
            scheduled_timestamp__lte=timezone_now())
 | 
						|
        self.assertEqual(len(email_jobs_to_deliver), 1)
 | 
						|
        email_count = len(outbox)
 | 
						|
        for job in email_jobs_to_deliver:
 | 
						|
            self.assertTrue(send_email(**ujson.loads(job.data)))
 | 
						|
        self.assertEqual(len(outbox), email_count + 1)
 | 
						|
 | 
						|
class InviteeEmailsParserTests(TestCase):
 | 
						|
    def setUp(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.email1 = "email1@zulip.com"
 | 
						|
        self.email2 = "email2@zulip.com"
 | 
						|
        self.email3 = "email3@zulip.com"
 | 
						|
 | 
						|
    def test_if_emails_separated_by_commas_are_parsed_and_striped_correctly(self):
 | 
						|
        # type: () -> None
 | 
						|
        emails_raw = "{} ,{}, {}".format(self.email1, self.email2, self.email3)
 | 
						|
        expected_set = {self.email1, self.email2, self.email3}
 | 
						|
        self.assertEqual(get_invitee_emails_set(emails_raw), expected_set)
 | 
						|
 | 
						|
    def test_if_emails_separated_by_newlines_are_parsed_and_striped_correctly(self):
 | 
						|
        # type: () -> None
 | 
						|
        emails_raw = "{}\n {}\n {} ".format(self.email1, self.email2, self.email3)
 | 
						|
        expected_set = {self.email1, self.email2, self.email3}
 | 
						|
        self.assertEqual(get_invitee_emails_set(emails_raw), expected_set)
 | 
						|
 | 
						|
    def test_if_emails_from_email_client_separated_by_newlines_are_parsed_correctly(self):
 | 
						|
        # type: () -> None
 | 
						|
        emails_raw = "Email One <{}>\nEmailTwo<{}>\nEmail Three<{}>".format(self.email1, self.email2, self.email3)
 | 
						|
        expected_set = {self.email1, self.email2, self.email3}
 | 
						|
        self.assertEqual(get_invitee_emails_set(emails_raw), expected_set)
 | 
						|
 | 
						|
    def test_if_emails_in_mixed_style_are_parsed_correctly(self):
 | 
						|
        # type: () -> None
 | 
						|
        emails_raw = "Email One <{}>,EmailTwo<{}>\n{}".format(self.email1, self.email2, self.email3)
 | 
						|
        expected_set = {self.email1, self.email2, self.email3}
 | 
						|
        self.assertEqual(get_invitee_emails_set(emails_raw), expected_set)
 | 
						|
 | 
						|
 | 
						|
class EmailUnsubscribeTests(ZulipTestCase):
 | 
						|
    def test_error_unsubscribe(self):
 | 
						|
        # type: () -> None
 | 
						|
 | 
						|
        # An invalid insubscribe token "test123" produces an error.
 | 
						|
        result = self.client_get('/accounts/unsubscribe/missed_messages/test123')
 | 
						|
        self.assert_in_response('Unknown email unsubscribe request', result)
 | 
						|
 | 
						|
        # An unknown message type "fake" produces an error.
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        unsubscribe_link = one_click_unsubscribe_link(user_profile, "fake")
 | 
						|
        result = self.client_get(urllib.parse.urlparse(unsubscribe_link).path)
 | 
						|
        self.assert_in_response('Unknown email unsubscribe request', result)
 | 
						|
 | 
						|
    def test_missedmessage_unsubscribe(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        We provide one-click unsubscribe links in missed message
 | 
						|
        e-mails that you can click even when logged out to update your
 | 
						|
        email notification settings.
 | 
						|
        """
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        user_profile.enable_offline_email_notifications = True
 | 
						|
        user_profile.save()
 | 
						|
 | 
						|
        unsubscribe_link = one_click_unsubscribe_link(user_profile,
 | 
						|
                                                      "missed_messages")
 | 
						|
        result = self.client_get(urllib.parse.urlparse(unsubscribe_link).path)
 | 
						|
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
        # Circumvent user_profile caching.
 | 
						|
        user_profile = UserProfile.objects.get(email=self.example_email("hamlet"))
 | 
						|
        self.assertFalse(user_profile.enable_offline_email_notifications)
 | 
						|
 | 
						|
    def test_welcome_unsubscribe(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        We provide one-click unsubscribe links in welcome e-mails that you can
 | 
						|
        click even when logged out to stop receiving them.
 | 
						|
        """
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
 | 
						|
        # Simulate a new user signing up, which enqueues 2 welcome e-mails.
 | 
						|
        enqueue_welcome_emails(email, "King Hamlet")
 | 
						|
        self.assertEqual(2, len(ScheduledJob.objects.filter(
 | 
						|
            type=ScheduledJob.EMAIL, filter_string__iexact=email)))
 | 
						|
 | 
						|
        # Simulate unsubscribing from the welcome e-mails.
 | 
						|
        unsubscribe_link = one_click_unsubscribe_link(user_profile, "welcome")
 | 
						|
        result = self.client_get(urllib.parse.urlparse(unsubscribe_link).path)
 | 
						|
 | 
						|
        # The welcome email jobs are no longer scheduled.
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
        self.assertEqual(0, len(ScheduledJob.objects.filter(
 | 
						|
            type=ScheduledJob.EMAIL, filter_string__iexact=email)))
 | 
						|
 | 
						|
    def test_digest_unsubscribe(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        We provide one-click unsubscribe links in digest e-mails that you can
 | 
						|
        click even when logged out to stop receiving them.
 | 
						|
 | 
						|
        Unsubscribing from these emails also dequeues any digest email jobs that
 | 
						|
        have been queued.
 | 
						|
        """
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        self.assertTrue(user_profile.enable_digest_emails)
 | 
						|
 | 
						|
        # Enqueue a fake digest email.
 | 
						|
        context = {'name': '', 'realm_uri': '', 'unread_pms': [], 'hot_conversations': [],
 | 
						|
                   'new_users': [], 'new_streams': {'plain': []}, 'unsubscribe_link': ''}
 | 
						|
        send_future_email('zerver/emails/digest', display_email(user_profile), context=context)
 | 
						|
 | 
						|
        self.assertEqual(1, len(ScheduledJob.objects.filter(
 | 
						|
            type=ScheduledJob.EMAIL, filter_string__iexact=email)))
 | 
						|
 | 
						|
        # Simulate unsubscribing from digest e-mails.
 | 
						|
        unsubscribe_link = one_click_unsubscribe_link(user_profile, "digest")
 | 
						|
        result = self.client_get(urllib.parse.urlparse(unsubscribe_link).path)
 | 
						|
 | 
						|
        # The setting is toggled off, and scheduled jobs have been removed.
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
        # Circumvent user_profile caching.
 | 
						|
        user_profile = UserProfile.objects.get(email=self.example_email("hamlet"))
 | 
						|
        self.assertFalse(user_profile.enable_digest_emails)
 | 
						|
        self.assertEqual(0, len(ScheduledJob.objects.filter(
 | 
						|
            type=ScheduledJob.EMAIL, filter_string__iexact=email)))
 | 
						|
 | 
						|
class RealmCreationTest(ZulipTestCase):
 | 
						|
 | 
						|
    def test_create_realm(self):
 | 
						|
        # type: () -> None
 | 
						|
        password = "test"
 | 
						|
        string_id = "zuliptest"
 | 
						|
        email = "user1@test.com"
 | 
						|
        realm = get_realm('test')
 | 
						|
 | 
						|
        # Make sure the realm does not exist
 | 
						|
        self.assertIsNone(realm)
 | 
						|
 | 
						|
        with self.settings(OPEN_REALM_CREATION=True):
 | 
						|
            # Create new realm with the email
 | 
						|
            result = self.client_post('/create_realm/', {'email': email})
 | 
						|
            self.assertEqual(result.status_code, 302)
 | 
						|
            self.assertTrue(result["Location"].endswith(
 | 
						|
                "/accounts/send_confirm/%s" % (email,)))
 | 
						|
            result = self.client_get(result["Location"])
 | 
						|
            self.assert_in_response("Check your email so we can get started.", result)
 | 
						|
 | 
						|
            # Visit the confirmation link.
 | 
						|
            confirmation_url = self.get_confirmation_url_from_outbox(email)
 | 
						|
            result = self.client_get(confirmation_url)
 | 
						|
            self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
            result = self.submit_reg_form_for_user(email, password, realm_subdomain=string_id)
 | 
						|
            self.assertEqual(result.status_code, 302)
 | 
						|
 | 
						|
            # Make sure the realm is created
 | 
						|
            realm = get_realm(string_id)
 | 
						|
            self.assertIsNotNone(realm)
 | 
						|
            self.assertEqual(realm.string_id, string_id)
 | 
						|
            self.assertEqual(get_user(email, realm).realm, realm)
 | 
						|
 | 
						|
            # Check defaults
 | 
						|
            self.assertEqual(realm.org_type, Realm.CORPORATE)
 | 
						|
            self.assertEqual(realm.restricted_to_domain, False)
 | 
						|
            self.assertEqual(realm.invite_required, True)
 | 
						|
 | 
						|
            self.assertTrue(result["Location"].endswith("/"))
 | 
						|
 | 
						|
            # Check welcome messages
 | 
						|
            for stream_name in ['general', 'social', 'zulip']:
 | 
						|
                stream = get_stream(stream_name, realm)
 | 
						|
                recipient = get_recipient(Recipient.STREAM, stream.id)
 | 
						|
                messages = Message.objects.filter(recipient=recipient)
 | 
						|
                self.assertEqual(len(messages), 1)
 | 
						|
                self.assertIn('Welcome to', messages[0].content)
 | 
						|
 | 
						|
    def test_create_realm_existing_email(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Trying to create a realm with an existing email should just redirect to
 | 
						|
        a login page.
 | 
						|
        """
 | 
						|
        with self.settings(OPEN_REALM_CREATION=True):
 | 
						|
            email = self.example_email("hamlet")
 | 
						|
            result = self.client_post('/create_realm/', {'email': email})
 | 
						|
            self.assertEqual(result.status_code, 302)
 | 
						|
            self.assertIn('login', result['Location'])
 | 
						|
 | 
						|
    def test_create_realm_no_creation_key(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Trying to create a realm without a creation_key should fail when
 | 
						|
        OPEN_REALM_CREATION is false.
 | 
						|
        """
 | 
						|
        email = "user1@test.com"
 | 
						|
        realm = get_realm('test')
 | 
						|
 | 
						|
        # Make sure the realm does not exist
 | 
						|
        self.assertIsNone(realm)
 | 
						|
 | 
						|
        with self.settings(OPEN_REALM_CREATION=False):
 | 
						|
            # Create new realm with the email, but no creation key.
 | 
						|
            result = self.client_post('/create_realm/', {'email': email})
 | 
						|
            self.assertEqual(result.status_code, 200)
 | 
						|
            self.assert_in_response('New organization creation disabled.', result)
 | 
						|
 | 
						|
    def test_create_realm_with_subdomain(self):
 | 
						|
        # type: () -> None
 | 
						|
        password = "test"
 | 
						|
        string_id = "zuliptest"
 | 
						|
        email = "user1@test.com"
 | 
						|
        realm_name = "Test"
 | 
						|
 | 
						|
        # Make sure the realm does not exist
 | 
						|
        self.assertIsNone(get_realm('test'))
 | 
						|
 | 
						|
        with self.settings(REALMS_HAVE_SUBDOMAINS=True), self.settings(OPEN_REALM_CREATION=True):
 | 
						|
            # Create new realm with the email
 | 
						|
            result = self.client_post('/create_realm/', {'email': email})
 | 
						|
            self.assertEqual(result.status_code, 302)
 | 
						|
            self.assertTrue(result["Location"].endswith(
 | 
						|
                "/accounts/send_confirm/%s" % (email,)))
 | 
						|
            result = self.client_get(result["Location"])
 | 
						|
            self.assert_in_response("Check your email so we can get started.", result)
 | 
						|
 | 
						|
            # Visit the confirmation link.
 | 
						|
            confirmation_url = self.get_confirmation_url_from_outbox(email)
 | 
						|
            result = self.client_get(confirmation_url)
 | 
						|
            self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
            result = self.submit_reg_form_for_user(email, password,
 | 
						|
                                                   realm_subdomain = string_id,
 | 
						|
                                                   realm_name=realm_name,
 | 
						|
                                                   # Pass HTTP_HOST for the target subdomain
 | 
						|
                                                   HTTP_HOST=string_id + ".testserver")
 | 
						|
            self.assertEqual(result.status_code, 302)
 | 
						|
 | 
						|
            # Make sure the realm is created
 | 
						|
            realm = get_realm(string_id)
 | 
						|
            self.assertIsNotNone(realm)
 | 
						|
            self.assertEqual(realm.string_id, string_id)
 | 
						|
            self.assertEqual(get_user(email, realm).realm, realm)
 | 
						|
 | 
						|
            self.assertEqual(realm.name, realm_name)
 | 
						|
            self.assertEqual(realm.subdomain, string_id)
 | 
						|
 | 
						|
    def test_mailinator_signup(self):
 | 
						|
        # type: () -> None
 | 
						|
        with self.settings(OPEN_REALM_CREATION=True):
 | 
						|
            result = self.client_post('/create_realm/', {'email': "hi@mailinator.com"})
 | 
						|
            self.assert_in_response('Please use your real email address.', result)
 | 
						|
 | 
						|
    def test_subdomain_restrictions(self):
 | 
						|
        # type: () -> None
 | 
						|
        password = "test"
 | 
						|
        email = "user1@test.com"
 | 
						|
        realm_name = "Test"
 | 
						|
 | 
						|
        with self.settings(REALMS_HAVE_SUBDOMAINS=False), self.settings(OPEN_REALM_CREATION=True):
 | 
						|
            result = self.client_post('/create_realm/', {'email': email})
 | 
						|
            self.client_get(result["Location"])
 | 
						|
            confirmation_url = self.get_confirmation_url_from_outbox(email)
 | 
						|
            self.client_get(confirmation_url)
 | 
						|
 | 
						|
            errors = {'id': "at least 3 characters",
 | 
						|
                      '-id': "cannot start or end with a",
 | 
						|
                      'string-ID': "lowercase letters",
 | 
						|
                      'string_id': "lowercase letters",
 | 
						|
                      'stream': "unavailable",
 | 
						|
                      'streams': "unavailable",
 | 
						|
                      'about': "unavailable",
 | 
						|
                      'abouts': "unavailable",
 | 
						|
                      'zephyr': "unavailable"}
 | 
						|
            for string_id, error_msg in errors.items():
 | 
						|
                result = self.submit_reg_form_for_user(email, password,
 | 
						|
                                                       realm_subdomain = string_id,
 | 
						|
                                                       realm_name = realm_name)
 | 
						|
                self.assert_in_response(error_msg, result)
 | 
						|
 | 
						|
            # test valid subdomain
 | 
						|
            result = self.submit_reg_form_for_user(email, password,
 | 
						|
                                                   realm_subdomain = 'a-0',
 | 
						|
                                                   realm_name = realm_name)
 | 
						|
            self.assertEqual(result.status_code, 302)
 | 
						|
 | 
						|
class UserSignUpTest(ZulipTestCase):
 | 
						|
 | 
						|
    def test_user_default_language_and_timezone(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Check if the default language of new user is the default language
 | 
						|
        of the realm.
 | 
						|
        """
 | 
						|
        email = self.nonreg_email('newguy')
 | 
						|
        password = "newpassword"
 | 
						|
        timezone = "US/Mountain"
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        do_set_realm_property(realm, 'default_language', u"de")
 | 
						|
 | 
						|
        result = self.client_post('/accounts/home/', {'email': email})
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith(
 | 
						|
            "/accounts/send_confirm/%s" % (email,)))
 | 
						|
        result = self.client_get(result["Location"])
 | 
						|
        self.assert_in_response("Check your email so we can get started.", result)
 | 
						|
 | 
						|
        # Visit the confirmation link.
 | 
						|
        confirmation_url = self.get_confirmation_url_from_outbox(email)
 | 
						|
        result = self.client_get(confirmation_url)
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        # Pick a password and agree to the ToS.
 | 
						|
        result = self.submit_reg_form_for_user(email, password, timezone=timezone)
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
 | 
						|
        user_profile = self.nonreg_user('newguy')
 | 
						|
        self.assertEqual(user_profile.default_language, realm.default_language)
 | 
						|
        self.assertEqual(user_profile.timezone, timezone)
 | 
						|
        from django.core.mail import outbox
 | 
						|
        outbox.pop()
 | 
						|
 | 
						|
    def test_signup_already_active(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Check if signing up with an active email redirects to a login page.
 | 
						|
        """
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        result = self.client_post('/accounts/home/', {'email': email})
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertIn('login', result['Location'])
 | 
						|
 | 
						|
    def test_signup_invalid_name(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Check if an invalid name during signup is handled properly.
 | 
						|
        """
 | 
						|
        email = "newguy@zulip.com"
 | 
						|
        password = "newpassword"
 | 
						|
 | 
						|
        result = self.client_post('/accounts/home/', {'email': email})
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith(
 | 
						|
            "/accounts/send_confirm/%s" % (email,)))
 | 
						|
        result = self.client_get(result["Location"])
 | 
						|
        self.assert_in_response("Check your email so we can get started.", result)
 | 
						|
 | 
						|
        # Visit the confirmation link.
 | 
						|
        confirmation_url = self.get_confirmation_url_from_outbox(email)
 | 
						|
        result = self.client_get(confirmation_url)
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        # Pick a password and agree to the ToS.
 | 
						|
        result = self.submit_reg_form_for_user(email, password, full_name="<invalid>")
 | 
						|
        self.assert_in_success_response(["Invalid characters in name!"], result)
 | 
						|
 | 
						|
    def test_signup_without_password(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Check if signing up without a password works properly when
 | 
						|
        password_auth_enabled is False.
 | 
						|
        """
 | 
						|
 | 
						|
        email = self.nonreg_email('newuser')
 | 
						|
 | 
						|
        result = self.client_post('/accounts/home/', {'email': email})
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith(
 | 
						|
            "/accounts/send_confirm/%s" % (email,)))
 | 
						|
        result = self.client_get(result["Location"])
 | 
						|
        self.assert_in_response("Check your email so we can get started.", result)
 | 
						|
 | 
						|
        # Visit the confirmation link.
 | 
						|
        confirmation_url = self.get_confirmation_url_from_outbox(email)
 | 
						|
        result = self.client_get(confirmation_url)
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        with patch('zerver.views.registration.password_auth_enabled', return_value=False):
 | 
						|
            result = self.client_post(
 | 
						|
                '/accounts/register/',
 | 
						|
                {'full_name': 'New User',
 | 
						|
                 'realm_name': 'Zulip Test',
 | 
						|
                 'realm_subdomain': 'zuliptest',
 | 
						|
                 'key': find_key_by_email(email),
 | 
						|
                 'realm_org_type': Realm.CORPORATE,
 | 
						|
                 'terms': True})
 | 
						|
 | 
						|
        # User should now be logged in.
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        user_profile = self.nonreg_user('newuser')
 | 
						|
        self.assertEqual(get_session_dict_user(self.client.session), user_profile.id)
 | 
						|
 | 
						|
    def test_signup_without_full_name(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Check if signing up without a full name redirects to a registration
 | 
						|
        form.
 | 
						|
        """
 | 
						|
        email = "newguy@zulip.com"
 | 
						|
        password = "newpassword"
 | 
						|
 | 
						|
        result = self.client_post('/accounts/home/', {'email': email})
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith(
 | 
						|
            "/accounts/send_confirm/%s" % (email,)))
 | 
						|
        result = self.client_get(result["Location"])
 | 
						|
        self.assert_in_response("Check your email so we can get started.", result)
 | 
						|
 | 
						|
        # Visit the confirmation link.
 | 
						|
        confirmation_url = self.get_confirmation_url_from_outbox(email)
 | 
						|
        result = self.client_get(confirmation_url)
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        result = self.client_post(
 | 
						|
            '/accounts/register/',
 | 
						|
            {'password': password,
 | 
						|
             'realm_name': 'Zulip Test',
 | 
						|
             'realm_subdomain': 'zuliptest',
 | 
						|
             'key': find_key_by_email(email),
 | 
						|
             'realm_org_type': Realm.CORPORATE,
 | 
						|
             'terms': True,
 | 
						|
             'from_confirmation': '1'})
 | 
						|
        self.assert_in_success_response(["You're almost there."], result)
 | 
						|
 | 
						|
    def test_signup_with_full_name(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Check if signing up without a full name redirects to a registration
 | 
						|
        form.
 | 
						|
        """
 | 
						|
        email = "newguy@zulip.com"
 | 
						|
        password = "newpassword"
 | 
						|
 | 
						|
        result = self.client_post('/accounts/home/', {'email': email})
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith(
 | 
						|
            "/accounts/send_confirm/%s" % (email,)))
 | 
						|
        result = self.client_get(result["Location"])
 | 
						|
        self.assert_in_response("Check your email so we can get started.", result)
 | 
						|
 | 
						|
        # Visit the confirmation link.
 | 
						|
        confirmation_url = self.get_confirmation_url_from_outbox(email)
 | 
						|
        result = self.client_get(confirmation_url)
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        result = self.client_post(
 | 
						|
            '/accounts/register/',
 | 
						|
            {'password': password,
 | 
						|
             'realm_name': 'Zulip Test',
 | 
						|
             'realm_subdomain': 'zuliptest',
 | 
						|
             'key': find_key_by_email(email),
 | 
						|
             'realm_org_type': Realm.CORPORATE,
 | 
						|
             'terms': True,
 | 
						|
             'full_name': "New Guy",
 | 
						|
             'from_confirmation': '1'})
 | 
						|
        self.assert_in_success_response(["You're almost there."], result)
 | 
						|
 | 
						|
    def test_signup_invalid_subdomain(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Check if attempting to authenticate to the wrong subdomain logs an
 | 
						|
        error and redirects.
 | 
						|
        """
 | 
						|
        email = "newuser@zulip.com"
 | 
						|
        password = "newpassword"
 | 
						|
 | 
						|
        result = self.client_post('/accounts/home/', {'email': email})
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith(
 | 
						|
            "/accounts/send_confirm/%s" % (email,)))
 | 
						|
        result = self.client_get(result["Location"])
 | 
						|
        self.assert_in_response("Check your email so we can get started.", result)
 | 
						|
 | 
						|
        # Visit the confirmation link.
 | 
						|
        confirmation_url = self.get_confirmation_url_from_outbox(email)
 | 
						|
        result = self.client_get(confirmation_url)
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        def invalid_subdomain(**kwargs):
 | 
						|
            # type: (**Any) -> Any
 | 
						|
            return_data = kwargs.get('return_data', {})
 | 
						|
            return_data['invalid_subdomain'] = True
 | 
						|
 | 
						|
        with patch('zerver.views.registration.authenticate', side_effect=invalid_subdomain):
 | 
						|
            with patch('logging.error') as mock_error:
 | 
						|
                result = self.client_post(
 | 
						|
                    '/accounts/register/',
 | 
						|
                    {'password': password,
 | 
						|
                     'full_name': 'New User',
 | 
						|
                     'realm_name': 'Zulip Test',
 | 
						|
                     'realm_subdomain': 'zuliptest',
 | 
						|
                     'key': find_key_by_email(email),
 | 
						|
                     'realm_org_type': Realm.CORPORATE,
 | 
						|
                     'terms': True})
 | 
						|
        mock_error.assert_called_once()
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
 | 
						|
    def test_unique_completely_open_domain(self):
 | 
						|
        # type: () -> None
 | 
						|
        password = "test"
 | 
						|
        email = "user1@acme.com"
 | 
						|
        subdomain = "zulip"
 | 
						|
        realm_name = "Zulip"
 | 
						|
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        realm.restricted_to_domain = False
 | 
						|
        realm.invite_required = False
 | 
						|
        realm.save()
 | 
						|
 | 
						|
        for string_id in ('simple', 'zephyr'):
 | 
						|
            realm = get_realm(string_id)
 | 
						|
            do_deactivate_realm(realm)
 | 
						|
            realm.save()
 | 
						|
 | 
						|
        result = self.client_post('/register/', {'email': email})
 | 
						|
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith(
 | 
						|
            "/accounts/send_confirm/%s" % (email,)))
 | 
						|
        result = self.client_get(result["Location"])
 | 
						|
        self.assert_in_response("Check your email so we can get started.", result)
 | 
						|
        # Visit the confirmation link.
 | 
						|
        from django.core.mail import outbox
 | 
						|
        for message in reversed(outbox):
 | 
						|
            if email in message.to:
 | 
						|
                confirmation_link_pattern = re.compile(settings.EXTERNAL_HOST + "(\S+)>")
 | 
						|
                confirmation_url = confirmation_link_pattern.search(
 | 
						|
                    message.body).groups()[0]
 | 
						|
                break
 | 
						|
        else:
 | 
						|
            raise AssertionError("Couldn't find a confirmation email.")
 | 
						|
 | 
						|
        result = self.client_get(confirmation_url)
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        result = self.submit_reg_form_for_user(email,
 | 
						|
                                               password,
 | 
						|
                                               realm_name=realm_name,
 | 
						|
                                               realm_subdomain=subdomain,
 | 
						|
                                               # Pass HTTP_HOST for the target subdomain
 | 
						|
                                               HTTP_HOST=subdomain + ".testserver")
 | 
						|
        self.assert_in_success_response(["You're almost there."], result)
 | 
						|
 | 
						|
    def test_completely_open_domain_success(self):
 | 
						|
        # type: () -> None
 | 
						|
        password = "test"
 | 
						|
        email = "user1@acme.com"
 | 
						|
        subdomain = "zulip"
 | 
						|
        realm_name = "Zulip"
 | 
						|
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        realm.restricted_to_domain = False
 | 
						|
        realm.invite_required = False
 | 
						|
        realm.save()
 | 
						|
 | 
						|
        result = self.client_post('/register/zulip/', {'email': email})
 | 
						|
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith(
 | 
						|
            "/accounts/send_confirm/%s" % (email,)))
 | 
						|
        result = self.client_get(result["Location"])
 | 
						|
        self.assert_in_response("Check your email so we can get started.", result)
 | 
						|
        # Visit the confirmation link.
 | 
						|
        from django.core.mail import outbox
 | 
						|
        for message in reversed(outbox):
 | 
						|
            if email in message.to:
 | 
						|
                confirmation_link_pattern = re.compile(settings.EXTERNAL_HOST + "(\S+)>")
 | 
						|
                confirmation_url = confirmation_link_pattern.search(
 | 
						|
                    message.body).groups()[0]
 | 
						|
                break
 | 
						|
        else:
 | 
						|
            raise AssertionError("Couldn't find a confirmation email.")
 | 
						|
 | 
						|
        result = self.client_get(confirmation_url)
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        result = self.submit_reg_form_for_user(email,
 | 
						|
                                               password,
 | 
						|
                                               realm_name=realm_name,
 | 
						|
                                               realm_subdomain=subdomain,
 | 
						|
                                               # Pass HTTP_HOST for the target subdomain
 | 
						|
                                               HTTP_HOST=subdomain + ".testserver")
 | 
						|
        self.assert_in_success_response(["You're almost there."], result)
 | 
						|
 | 
						|
    def test_failed_signup_with_realm_str(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Signing up with the special accounts_home_with_realm_str endpoint should
 | 
						|
        fail (i.e. redirect to the standard accounts_home) if
 | 
						|
        settings.REALMS_HAVE_SUBDOMAINS is true, or if the realm is not
 | 
						|
        completely open.
 | 
						|
        """
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        realm.restricted_to_domain = False
 | 
						|
        realm.invite_required = False
 | 
						|
        realm.save()
 | 
						|
        with self.settings(REALMS_HAVE_SUBDOMAINS=True):
 | 
						|
            email = 'user1@acme.com'
 | 
						|
            result = self.client_post('/register/zulip/', {'email': email})
 | 
						|
            self.assertEqual(result.status_code, 302)
 | 
						|
            self.assertIn('accounts/home', result['Location'])
 | 
						|
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        realm.invite_required = True
 | 
						|
        realm.save()
 | 
						|
        with self.settings(REALMS_HAVE_SUBDOMAINS=False):
 | 
						|
            email = 'user1@acme.com'
 | 
						|
            result = self.client_post('/register/zulip/', {'email': email})
 | 
						|
            self.assertEqual(result.status_code, 302)
 | 
						|
            self.assertIn('accounts/home', result['Location'])
 | 
						|
 | 
						|
    def test_failed_signup_due_to_restricted_domain(self):
 | 
						|
        # type: () -> None
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        realm.invite_required = False
 | 
						|
        realm.save()
 | 
						|
        with self.settings(REALMS_HAVE_SUBDOMAINS = True):
 | 
						|
            request = HostRequestMock(host = realm.host)
 | 
						|
            request.session = {} # type: ignore
 | 
						|
            email = 'user@acme.com'
 | 
						|
            form = HomepageForm({'email': email}, realm=realm)
 | 
						|
            self.assertIn("Your email address, {}, is not in one of the domains".format(email),
 | 
						|
                          form.errors['email'][0])
 | 
						|
 | 
						|
    def test_failed_signup_due_to_invite_required(self):
 | 
						|
        # type: () -> None
 | 
						|
        realm = get_realm('zulip')
 | 
						|
        realm.invite_required = True
 | 
						|
        realm.save()
 | 
						|
        request = HostRequestMock(host = realm.host)
 | 
						|
        request.session = {} # type: ignore
 | 
						|
        email = 'user@zulip.com'
 | 
						|
        form = HomepageForm({'email': email}, realm=realm)
 | 
						|
        self.assertIn("Please request an invite for {} from".format(email),
 | 
						|
                      form.errors['email'][0])
 | 
						|
 | 
						|
    def test_failed_signup_due_to_nonexistent_realm(self):
 | 
						|
        # type: () -> None
 | 
						|
        with self.settings(REALMS_HAVE_SUBDOMAINS = True):
 | 
						|
            request = HostRequestMock(host = 'acme.' + settings.EXTERNAL_HOST)
 | 
						|
            request.session = {} # type: ignore
 | 
						|
            email = 'user@acme.com'
 | 
						|
            form = HomepageForm({'email': email}, realm=None)
 | 
						|
            self.assertIn("organization you are trying to join using {} does "
 | 
						|
                          "not exist".format(email), form.errors['email'][0])
 | 
						|
 | 
						|
    def test_registration_through_ldap(self):
 | 
						|
        # type: () -> None
 | 
						|
        password = "testing"
 | 
						|
        email = "newuser@zulip.com"
 | 
						|
        subdomain = "zulip"
 | 
						|
        realm_name = "Zulip"
 | 
						|
        ldap_user_attr_map = {'full_name': 'fn', 'short_name': 'sn'}
 | 
						|
 | 
						|
        ldap_patcher = patch('django_auth_ldap.config.ldap.initialize')
 | 
						|
        mock_initialize = ldap_patcher.start()
 | 
						|
        mock_ldap = MockLDAP()
 | 
						|
        mock_initialize.return_value = mock_ldap
 | 
						|
 | 
						|
        mock_ldap.directory = {
 | 
						|
            'uid=newuser,ou=users,dc=zulip,dc=com': {
 | 
						|
                'userPassword': 'testing',
 | 
						|
                'fn': ['New User Name']
 | 
						|
            }
 | 
						|
        }
 | 
						|
 | 
						|
        with patch('zerver.views.registration.get_subdomain', return_value=subdomain):
 | 
						|
            result = self.client_post('/register/', {'email': email})
 | 
						|
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith(
 | 
						|
            "/accounts/send_confirm/%s" % (email,)))
 | 
						|
        result = self.client_get(result["Location"])
 | 
						|
        self.assert_in_response("Check your email so we can get started.", result)
 | 
						|
        # Visit the confirmation link.
 | 
						|
        from django.core.mail import outbox
 | 
						|
        for message in reversed(outbox):
 | 
						|
            if email in message.to:
 | 
						|
                confirmation_link_pattern = re.compile(settings.EXTERNAL_HOST + "(\S+)>")
 | 
						|
                confirmation_url = confirmation_link_pattern.search(
 | 
						|
                    message.body).groups()[0]
 | 
						|
                break
 | 
						|
        else:
 | 
						|
            raise AssertionError("Couldn't find a confirmation email.")
 | 
						|
 | 
						|
        with self.settings(
 | 
						|
                POPULATE_PROFILE_VIA_LDAP=True,
 | 
						|
                LDAP_APPEND_DOMAIN='zulip.com',
 | 
						|
                AUTH_LDAP_BIND_PASSWORD='',
 | 
						|
                AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map,
 | 
						|
                AUTHENTICATION_BACKENDS=('zproject.backends.ZulipLDAPAuthBackend',),
 | 
						|
                AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'):
 | 
						|
            result = self.client_get(confirmation_url)
 | 
						|
            self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
            # The full_name should not be overriden by the value from LDAP if
 | 
						|
            # request.session['authenticated_full_name'] has not been set yet.
 | 
						|
            with patch('zerver.views.registration.name_changes_disabled', return_value=True):
 | 
						|
                result = self.submit_reg_form_for_user(email,
 | 
						|
                                                       password,
 | 
						|
                                                       full_name="Non LDAP Full Name",
 | 
						|
                                                       realm_name=realm_name,
 | 
						|
                                                       realm_subdomain=subdomain,
 | 
						|
                                                       # Pass HTTP_HOST for the target subdomain
 | 
						|
                                                       HTTP_HOST=subdomain + ".testserver")
 | 
						|
            self.assert_in_success_response(["You're almost there.",
 | 
						|
                                             "Non LDAP Full Name",
 | 
						|
                                             "newuser@zulip.com"],
 | 
						|
                                            result)
 | 
						|
 | 
						|
            # Submitting the registration form with from_confirmation='1' sets
 | 
						|
            # the value of request.session['authenticated_full_name'] from LDAP.
 | 
						|
            result = self.submit_reg_form_for_user(email,
 | 
						|
                                                   password,
 | 
						|
                                                   realm_name=realm_name,
 | 
						|
                                                   realm_subdomain=subdomain,
 | 
						|
                                                   from_confirmation='1',
 | 
						|
                                                   # Pass HTTP_HOST for the target subdomain
 | 
						|
                                                   HTTP_HOST=subdomain + ".testserver")
 | 
						|
            self.assert_in_success_response(["You're almost there.",
 | 
						|
                                             "New User Name",
 | 
						|
                                             "newuser@zulip.com"],
 | 
						|
                                            result)
 | 
						|
 | 
						|
            # The full name be populated from the value of
 | 
						|
            # request.session['authenticated_full_name'] from LDAP in the case
 | 
						|
            # where from_confirmation and name_changes_disabled are both False.
 | 
						|
            with patch('zerver.views.registration.name_changes_disabled', return_value=True):
 | 
						|
                result = self.submit_reg_form_for_user(email,
 | 
						|
                                                       password,
 | 
						|
                                                       full_name="Non LDAP Full Name",
 | 
						|
                                                       realm_name=realm_name,
 | 
						|
                                                       realm_subdomain=subdomain,
 | 
						|
                                                       # Pass HTTP_HOST for the target subdomain
 | 
						|
                                                       HTTP_HOST=subdomain + ".testserver")
 | 
						|
            self.assert_in_success_response(["You're almost there.",
 | 
						|
                                             "New User Name",
 | 
						|
                                             "newuser@zulip.com"],
 | 
						|
                                            result)
 | 
						|
 | 
						|
            # Test the TypeError exception handler
 | 
						|
            mock_ldap.directory = {
 | 
						|
                'uid=newuser,ou=users,dc=zulip,dc=com': {
 | 
						|
                    'userPassword': 'testing',
 | 
						|
                    'fn': None  # This will raise TypeError
 | 
						|
                }
 | 
						|
            }
 | 
						|
            result = self.submit_reg_form_for_user(email,
 | 
						|
                                                   password,
 | 
						|
                                                   realm_name=realm_name,
 | 
						|
                                                   realm_subdomain=subdomain,
 | 
						|
                                                   from_confirmation='1',
 | 
						|
                                                   # Pass HTTP_HOST for the target subdomain
 | 
						|
                                                   HTTP_HOST=subdomain + ".testserver")
 | 
						|
            self.assert_in_success_response(["You're almost there.",
 | 
						|
                                             "newuser@zulip.com"],
 | 
						|
                                            result)
 | 
						|
 | 
						|
    def test_realm_creation_through_ldap(self):
 | 
						|
        # type: () -> None
 | 
						|
        password = "testing"
 | 
						|
        email = "newuser@zulip.com"
 | 
						|
        subdomain = "zulip"
 | 
						|
        realm_name = "Zulip"
 | 
						|
        ldap_user_attr_map = {'full_name': 'fn', 'short_name': 'sn'}
 | 
						|
 | 
						|
        ldap_patcher = patch('django_auth_ldap.config.ldap.initialize')
 | 
						|
        mock_initialize = ldap_patcher.start()
 | 
						|
        mock_ldap = MockLDAP()
 | 
						|
        mock_initialize.return_value = mock_ldap
 | 
						|
 | 
						|
        mock_ldap.directory = {
 | 
						|
            'uid=newuser,ou=users,dc=zulip,dc=com': {
 | 
						|
                'userPassword': 'testing',
 | 
						|
                'fn': ['New User Name']
 | 
						|
            }
 | 
						|
        }
 | 
						|
 | 
						|
        with patch('zerver.views.registration.get_subdomain', return_value=subdomain):
 | 
						|
            result = self.client_post('/register/', {'email': email})
 | 
						|
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith(
 | 
						|
            "/accounts/send_confirm/%s" % (email,)))
 | 
						|
        result = self.client_get(result["Location"])
 | 
						|
        self.assert_in_response("Check your email so we can get started.", result)
 | 
						|
        # Visit the confirmation link.
 | 
						|
        from django.core.mail import outbox
 | 
						|
        for message in reversed(outbox):
 | 
						|
            if email in message.to:
 | 
						|
                confirmation_link_pattern = re.compile(settings.EXTERNAL_HOST + "(\S+)>")
 | 
						|
                confirmation_url = confirmation_link_pattern.search(
 | 
						|
                    message.body).groups()[0]
 | 
						|
                break
 | 
						|
        else:
 | 
						|
            raise AssertionError("Couldn't find a confirmation email.")
 | 
						|
 | 
						|
        with self.settings(
 | 
						|
            POPULATE_PROFILE_VIA_LDAP=True,
 | 
						|
            LDAP_APPEND_DOMAIN='zulip.com',
 | 
						|
            AUTH_LDAP_BIND_PASSWORD='',
 | 
						|
            AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map,
 | 
						|
            AUTHENTICATION_BACKENDS=('zproject.backends.ZulipLDAPAuthBackend',),
 | 
						|
            AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com',
 | 
						|
            TERMS_OF_SERVICE=False,
 | 
						|
        ):
 | 
						|
            result = self.client_get(confirmation_url)
 | 
						|
            self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
            key = find_key_by_email(email),
 | 
						|
            confirmation = Confirmation.objects.get(confirmation_key=key[0])
 | 
						|
            prereg_user = confirmation.content_object
 | 
						|
            prereg_user.realm_creation = True
 | 
						|
            prereg_user.save()
 | 
						|
 | 
						|
            result = self.submit_reg_form_for_user(email,
 | 
						|
                                                   password,
 | 
						|
                                                   realm_name=realm_name,
 | 
						|
                                                   realm_subdomain=subdomain,
 | 
						|
                                                   from_confirmation='1',
 | 
						|
                                                   # Pass HTTP_HOST for the target subdomain
 | 
						|
                                                   HTTP_HOST=subdomain + ".testserver")
 | 
						|
            self.assert_in_success_response(["You're almost there.",
 | 
						|
                                             "newuser@zulip.com"],
 | 
						|
                                            result)
 | 
						|
 | 
						|
        mock_ldap.reset()
 | 
						|
        mock_initialize.stop()
 | 
						|
 | 
						|
    @patch('DNS.dnslookup', return_value=[['sipbtest:*:20922:101:Fred Sipb,,,:/mit/sipbtest:/bin/athena/tcsh']])
 | 
						|
    def test_registration_of_mirror_dummy_user(self, ignored):
 | 
						|
        # type: (Any) -> None
 | 
						|
        password = "test"
 | 
						|
        subdomain = "sipb"
 | 
						|
        realm_name = "MIT"
 | 
						|
        user_profile = self.mit_user("sipbtest")
 | 
						|
        email = user_profile.email
 | 
						|
        user_profile.is_mirror_dummy = True
 | 
						|
        user_profile.is_active = False
 | 
						|
        user_profile.save()
 | 
						|
 | 
						|
        result = self.client_post('/register/', {'email': email})
 | 
						|
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertTrue(result["Location"].endswith(
 | 
						|
            "/accounts/send_confirm/%s" % (email,)))
 | 
						|
        result = self.client_get(result["Location"])
 | 
						|
        self.assert_in_response("Check your email so we can get started.", result)
 | 
						|
        # Visit the confirmation link.
 | 
						|
        from django.core.mail import outbox
 | 
						|
        for message in reversed(outbox):
 | 
						|
            if email in message.to:
 | 
						|
                confirmation_link_pattern = re.compile(settings.EXTERNAL_HOST + "(\S+)>")
 | 
						|
                confirmation_url = confirmation_link_pattern.search(
 | 
						|
                    message.body).groups()[0]
 | 
						|
                break
 | 
						|
        else:
 | 
						|
            raise AssertionError("Couldn't find a confirmation email.")
 | 
						|
 | 
						|
        result = self.client_get(confirmation_url)
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        # If the mirror dummy user is already active, attempting to submit the
 | 
						|
        # registration form should just redirect to a login page.
 | 
						|
        user_profile.is_active = True
 | 
						|
        user_profile.save()
 | 
						|
        result = self.submit_reg_form_for_user(email,
 | 
						|
                                               password,
 | 
						|
                                               realm_name=realm_name,
 | 
						|
                                               realm_subdomain=subdomain,
 | 
						|
                                               from_confirmation='1',
 | 
						|
                                               # Pass HTTP_HOST for the target subdomain
 | 
						|
                                               HTTP_HOST=subdomain + ".testserver")
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertIn('login', result['Location'])
 | 
						|
        user_profile.is_active = False
 | 
						|
        user_profile.save()
 | 
						|
 | 
						|
        result = self.submit_reg_form_for_user(email,
 | 
						|
                                               password,
 | 
						|
                                               realm_name=realm_name,
 | 
						|
                                               realm_subdomain=subdomain,
 | 
						|
                                               from_confirmation='1',
 | 
						|
                                               # Pass HTTP_HOST for the target subdomain
 | 
						|
                                               HTTP_HOST=subdomain + ".testserver")
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
        result = self.submit_reg_form_for_user(email,
 | 
						|
                                               password,
 | 
						|
                                               realm_name=realm_name,
 | 
						|
                                               realm_subdomain=subdomain,
 | 
						|
                                               # Pass HTTP_HOST for the target subdomain
 | 
						|
                                               HTTP_HOST=subdomain + ".testserver")
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertEqual(get_session_dict_user(self.client.session), user_profile.id)
 | 
						|
 | 
						|
    def test_registration_of_active_mirror_dummy_user(self):
 | 
						|
        # type: (Any) -> None
 | 
						|
        """
 | 
						|
        Trying to activate an already-active mirror dummy user should just
 | 
						|
        redirect to a login page.
 | 
						|
        """
 | 
						|
        user_profile = self.mit_user("sipbtest")
 | 
						|
        email = user_profile.email
 | 
						|
        user_profile.is_mirror_dummy = True
 | 
						|
        user_profile.is_active = True
 | 
						|
        user_profile.save()
 | 
						|
 | 
						|
        result = self.client_post('/register/', {'email': email})
 | 
						|
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertIn('login', result['Location'])
 | 
						|
 | 
						|
class TestOpenRealms(ZulipTestCase):
 | 
						|
    def test_open_realm_logic(self):
 | 
						|
        # type: () -> None
 | 
						|
        realm = get_realm('simple')
 | 
						|
        do_deactivate_realm(realm)
 | 
						|
 | 
						|
        mit_realm = get_realm("zephyr")
 | 
						|
        self.assertEqual(get_unique_open_realm(), None)
 | 
						|
        mit_realm.restricted_to_domain = False
 | 
						|
        mit_realm.save()
 | 
						|
        self.assertTrue(completely_open(mit_realm))
 | 
						|
        self.assertEqual(get_unique_open_realm(), None)
 | 
						|
        with self.settings(SYSTEM_ONLY_REALMS={"zulip"}):
 | 
						|
            self.assertEqual(get_unique_open_realm(), mit_realm)
 | 
						|
        mit_realm.restricted_to_domain = True
 | 
						|
        mit_realm.save()
 | 
						|
        with self.settings(SYSTEM_ONLY_REALMS={"zulip"}):
 | 
						|
            self.assertEqual(get_unique_open_realm(), None)
 | 
						|
            self.assertEqual(get_unique_non_system_realm(), mit_realm)
 | 
						|
 | 
						|
class DeactivateUserTest(ZulipTestCase):
 | 
						|
 | 
						|
    def test_deactivate_user(self):
 | 
						|
        # type: () -> None
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        self.login(email)
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        self.assertTrue(user.is_active)
 | 
						|
        result = self.client_delete('/json/users/me')
 | 
						|
        self.assert_json_success(result)
 | 
						|
        user = self.example_user('hamlet')
 | 
						|
        self.assertFalse(user.is_active)
 | 
						|
        self.login(email, fails=True)
 | 
						|
 | 
						|
    def test_do_not_deactivate_final_admin(self):
 | 
						|
        # type: () -> None
 | 
						|
        email = self.example_email("iago")
 | 
						|
        self.login(email)
 | 
						|
        user = self.example_user('iago')
 | 
						|
        self.assertTrue(user.is_active)
 | 
						|
        result = self.client_delete('/json/users/me')
 | 
						|
        self.assert_json_error(result, "Cannot deactivate the only organization administrator")
 | 
						|
        user = self.example_user('iago')
 | 
						|
        self.assertTrue(user.is_active)
 | 
						|
        self.assertTrue(user.is_realm_admin)
 | 
						|
        email = self.example_email("hamlet")
 | 
						|
        user_2 = self.example_user('hamlet')
 | 
						|
        do_change_is_admin(user_2, True)
 | 
						|
        self.assertTrue(user_2.is_realm_admin)
 | 
						|
        result = self.client_delete('/json/users/me')
 | 
						|
        self.assert_json_success(result)
 | 
						|
        do_change_is_admin(user, True)
 | 
						|
 | 
						|
class TestLoginPage(ZulipTestCase):
 | 
						|
    def test_login_page_wrong_subdomain_error(self):
 | 
						|
        # type: () -> None
 | 
						|
        result = self.client_get("/login/?subdomain=1")
 | 
						|
        self.assertIn(WRONG_SUBDOMAIN_ERROR, result.content.decode('utf8'))
 | 
						|
 | 
						|
    @patch('django.http.HttpRequest.get_host')
 | 
						|
    def test_login_page_redirects_for_root_alias(self, mock_get_host):
 | 
						|
        # type: (MagicMock) -> None
 | 
						|
        mock_get_host.return_value = 'www.testserver'
 | 
						|
        with self.settings(REALMS_HAVE_SUBDOMAINS=True,
 | 
						|
                           ROOT_SUBDOMAIN_ALIASES=['www']):
 | 
						|
            result = self.client_get("/en/login/")
 | 
						|
            self.assertEqual(result.status_code, 302)
 | 
						|
            self.assertEqual(result.url, '/find_my_team/')
 | 
						|
 | 
						|
    @patch('django.http.HttpRequest.get_host')
 | 
						|
    def test_login_page_redirects_for_root_domain(self, mock_get_host):
 | 
						|
        # type: (MagicMock) -> None
 | 
						|
        mock_get_host.return_value = 'testserver'
 | 
						|
        with self.settings(REALMS_HAVE_SUBDOMAINS=True,
 | 
						|
                           ROOT_SUBDOMAIN_ALIASES=['www']):
 | 
						|
            result = self.client_get("/en/login/")
 | 
						|
            self.assertEqual(result.status_code, 302)
 | 
						|
            self.assertEqual(result.url, '/find_my_team/')
 | 
						|
 | 
						|
        mock_get_host.return_value = 'www.testserver.com'
 | 
						|
        with self.settings(REALMS_HAVE_SUBDOMAINS=True,
 | 
						|
                           EXTERNAL_HOST='www.testserver.com',
 | 
						|
                           ROOT_SUBDOMAIN_ALIASES=['test']):
 | 
						|
            result = self.client_get("/en/login/")
 | 
						|
            self.assertEqual(result.status_code, 302)
 | 
						|
            self.assertEqual(result.url, '/find_my_team/')
 | 
						|
 | 
						|
    @patch('django.http.HttpRequest.get_host')
 | 
						|
    def test_login_page_works_without_subdomains(self, mock_get_host):
 | 
						|
        # type: (MagicMock) -> None
 | 
						|
        mock_get_host.return_value = 'www.testserver'
 | 
						|
        with self.settings(ROOT_SUBDOMAIN_ALIASES=['www']):
 | 
						|
            result = self.client_get("/en/login/")
 | 
						|
            self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        mock_get_host.return_value = 'testserver'
 | 
						|
        with self.settings(ROOT_SUBDOMAIN_ALIASES=['www']):
 | 
						|
            result = self.client_get("/en/login/")
 | 
						|
            self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
class TestFindMyTeam(ZulipTestCase):
 | 
						|
    def test_template(self):
 | 
						|
        # type: () -> None
 | 
						|
        result = self.client_get('/find_my_team/')
 | 
						|
        self.assertIn("Find your team", result.content.decode('utf8'))
 | 
						|
 | 
						|
    def test_result(self):
 | 
						|
        # type: () -> None
 | 
						|
        url = '/find_my_team/?emails=iago@zulip.com,cordelia@zulip.com'
 | 
						|
        result = self.client_get(url)
 | 
						|
        content = result.content.decode('utf8')
 | 
						|
        self.assertIn("Emails sent! You will only receive emails", content)
 | 
						|
        self.assertIn(self.example_email("iago"), content)
 | 
						|
        self.assertIn(self.example_email("cordelia"), content)
 | 
						|
 | 
						|
    def test_find_team_ignore_invalid_email(self):
 | 
						|
        # type: () -> None
 | 
						|
        url = '/find_my_team/?emails=iago@zulip.com,invalid_email'
 | 
						|
        result = self.client_get(url)
 | 
						|
        content = result.content.decode('utf8')
 | 
						|
        self.assertIn("Emails sent! You will only receive emails", content)
 | 
						|
        self.assertIn(self.example_email("iago"), content)
 | 
						|
        self.assertNotIn("invalid_email", content)
 | 
						|
 | 
						|
    def test_find_team_zero_emails(self):
 | 
						|
        # type: () -> None
 | 
						|
        data = {'emails': ''}
 | 
						|
        result = self.client_post('/find_my_team/', data)
 | 
						|
        self.assertIn('This field is required', result.content.decode('utf8'))
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
    def test_find_team_one_email(self):
 | 
						|
        # type: () -> None
 | 
						|
        data = {'emails': self.example_email("hamlet")}
 | 
						|
        result = self.client_post('/find_my_team/', data)
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        self.assertEqual(result.url, '/find_my_team/?emails=hamlet%40zulip.com')
 | 
						|
 | 
						|
    def test_find_team_multiple_emails(self):
 | 
						|
        # type: () -> None
 | 
						|
        data = {'emails': 'hamlet@zulip.com,iago@zulip.com'}
 | 
						|
        result = self.client_post('/find_my_team/', data)
 | 
						|
        self.assertEqual(result.status_code, 302)
 | 
						|
        expected = '/find_my_team/?emails=hamlet%40zulip.com%2Ciago%40zulip.com'
 | 
						|
        self.assertEqual(result.url, expected)
 | 
						|
 | 
						|
    def test_find_team_more_than_ten_emails(self):
 | 
						|
        # type: () -> None
 | 
						|
        data = {'emails': ','.join(['hamlet-{}@zulip.com'.format(i) for i in range(11)])}
 | 
						|
        result = self.client_post('/find_my_team/', data)
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
        self.assertIn("Please enter at most 10", result.content.decode('utf8'))
 | 
						|
 | 
						|
class ConfirmationKeyTest(ZulipTestCase):
 | 
						|
    def test_confirmation_key(self):
 | 
						|
        # type: () -> None
 | 
						|
        request = MagicMock()
 | 
						|
        request.session = {
 | 
						|
            'confirmation_key': {'confirmation_key': 'xyzzy'}
 | 
						|
        }
 | 
						|
        result = confirmation_key(request)
 | 
						|
        self.assert_json_success(result)
 | 
						|
        self.assert_in_response('xyzzy', result)
 | 
						|
 | 
						|
class MobileAuthOTPTest(ZulipTestCase):
 | 
						|
    def test_xor_hex_strings(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.assertEqual(xor_hex_strings('1237c81ab', '18989fd12'), '0aaf57cb9')
 | 
						|
        with self.assertRaises(AssertionError):
 | 
						|
            xor_hex_strings('1', '31')
 | 
						|
 | 
						|
    def test_is_valid_otp(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.assertEqual(is_valid_otp('1234'), False)
 | 
						|
        self.assertEqual(is_valid_otp('1234abcd' * 8), True)
 | 
						|
        self.assertEqual(is_valid_otp('1234abcZ' * 8), False)
 | 
						|
 | 
						|
    def test_ascii_to_hex(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.assertEqual(ascii_to_hex('ZcdR1234'), '5a63645231323334')
 | 
						|
        self.assertEqual(hex_to_ascii('5a63645231323334'), 'ZcdR1234')
 | 
						|
 | 
						|
    def test_otp_encrypt_api_key(self):
 | 
						|
        # type: () -> None
 | 
						|
        hamlet = self.example_user('hamlet')
 | 
						|
        hamlet.api_key = '12ac' * 8
 | 
						|
        otp = '7be38894' * 8
 | 
						|
        result = otp_encrypt_api_key(hamlet, otp)
 | 
						|
        self.assertEqual(result, '4ad1e9f7' * 8)
 | 
						|
 | 
						|
        decryped = otp_decrypt_api_key(result, otp)
 | 
						|
        self.assertEqual(decryped, hamlet.api_key)
 | 
						|
 | 
						|
class LoginOrAskForRegistrationTestCase(ZulipTestCase):
 | 
						|
    def test_confirm(self):
 | 
						|
        # type: () -> None
 | 
						|
        request = POSTRequestMock({}, None)
 | 
						|
        email = 'new@zulip.com'
 | 
						|
        user_profile = None
 | 
						|
        full_name = 'New User'
 | 
						|
        invalid_subdomain = False
 | 
						|
        result = login_or_register_remote_user(
 | 
						|
            request,
 | 
						|
            email,
 | 
						|
            user_profile,
 | 
						|
            full_name=full_name,
 | 
						|
            invalid_subdomain=invalid_subdomain)
 | 
						|
        self.assert_in_response('You attempted to login using the email account',
 | 
						|
                                result)
 | 
						|
        self.assert_in_response('new@zulip.com, but this email address does not',
 | 
						|
                                result)
 | 
						|
 | 
						|
    def test_invalid_subdomain(self):
 | 
						|
        # type: () -> None
 | 
						|
        request = POSTRequestMock({}, None)
 | 
						|
        email = 'new@zulip.com'
 | 
						|
        user_profile = None
 | 
						|
        full_name = 'New User'
 | 
						|
        invalid_subdomain = True
 | 
						|
        response = login_or_register_remote_user(
 | 
						|
            request,
 | 
						|
            email,
 | 
						|
            user_profile,
 | 
						|
            full_name=full_name,
 | 
						|
            invalid_subdomain=invalid_subdomain)
 | 
						|
        self.assertEqual(response.status_code, 302)
 | 
						|
        self.assertIn('/accounts/login/?subdomain=1', response.url)
 | 
						|
 | 
						|
    def test_invalid_email(self):
 | 
						|
        # type: () -> None
 | 
						|
        request = POSTRequestMock({}, None)
 | 
						|
        email = None
 | 
						|
        user_profile = None
 | 
						|
        full_name = 'New User'
 | 
						|
        invalid_subdomain = False
 | 
						|
        response = login_or_register_remote_user(
 | 
						|
            request,
 | 
						|
            email,
 | 
						|
            user_profile,
 | 
						|
            full_name=full_name,
 | 
						|
            invalid_subdomain=invalid_subdomain)
 | 
						|
        self.assert_in_response('Please click the following button if '
 | 
						|
                                'you wish to register', response)
 | 
						|
 | 
						|
    def test_login_under_subdomains(self):
 | 
						|
        # type: () -> None
 | 
						|
        request = POSTRequestMock({}, None)
 | 
						|
        setattr(request, 'session', self.client.session)
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        user_profile.backend = 'zproject.backends.GitHubAuthBackend'
 | 
						|
        full_name = 'Hamlet'
 | 
						|
        invalid_subdomain = False
 | 
						|
        with self.settings(REALMS_HAVE_SUBDOMAINS=True):
 | 
						|
            response = login_or_register_remote_user(
 | 
						|
                request,
 | 
						|
                user_profile.email,
 | 
						|
                user_profile,
 | 
						|
                full_name=full_name,
 | 
						|
                invalid_subdomain=invalid_subdomain)
 | 
						|
            user_id = get_session_dict_user(getattr(request, 'session'))
 | 
						|
            self.assertEqual(user_id, user_profile.id)
 | 
						|
            self.assertEqual(response.status_code, 302)
 | 
						|
            self.assertIn('http://zulip.testserver', response.url)
 | 
						|
 | 
						|
    def test_login_without_subdomains(self):
 | 
						|
        # type: () -> None
 | 
						|
        request = POSTRequestMock({}, None)
 | 
						|
        setattr(request, 'session', self.client.session)
 | 
						|
        setattr(request, 'get_host', lambda: 'localhost')
 | 
						|
        user_profile = self.example_user('hamlet')
 | 
						|
        user_profile.backend = 'zproject.backends.GitHubAuthBackend'
 | 
						|
        full_name = 'Hamlet'
 | 
						|
        invalid_subdomain = False
 | 
						|
        with self.settings(REALMS_HAVE_SUBDOMAINS=False):
 | 
						|
            response = login_or_register_remote_user(
 | 
						|
                request,
 | 
						|
                user_profile.email,
 | 
						|
                user_profile,
 | 
						|
                full_name=full_name,
 | 
						|
                invalid_subdomain=invalid_subdomain)
 | 
						|
            user_id = get_session_dict_user(getattr(request, 'session'))
 | 
						|
            self.assertEqual(user_id, user_profile.id)
 | 
						|
            self.assertEqual(response.status_code, 302)
 | 
						|
            self.assertIn('http://localhost', response.url)
 |