",
            )
class InitialPasswordTest(ZulipTestCase):
    """Checks initial_password() when no salt is configured."""

    def test_none_initial_password_salt(self) -> None:
        """With INITIAL_PASSWORD_SALT set to None, no password is returned."""
        with self.settings(INITIAL_PASSWORD_SALT=None):
            generated = initial_password("test@test.com")
        self.assertIsNone(generated)
class PasswordResetTest(ZulipTestCase):
    """
    Log in, reset password, log out, log in with new password.
    """
    def get_reset_mail_body(self, subdomain: str = "zulip") -> str:
        """Check the headers of the lone outgoing email and return its body.

        Expects exactly one message in Django's test outbox (the unpacking
        below fails otherwise).  Asserts the envelope sender, the display
        "From" header, and that the List-Id header mentions the host of
        the given subdomain.
        """
        from django.core.mail import outbox

        (message,) = outbox
        envelope_from = self.email_envelope_from(message)
        self.assertEqual(envelope_from, settings.NOREPLY_EMAIL_ADDRESS)

        # The display name goes through _() since the email might be sent
        # in different languages for i18n testing.
        display_from = self.email_display_from(message)
        expected_from_regex = (
            rf'^{_("Zulip Account Security")} <{self.TOKENIZED_NOREPLY_REGEX}>\Z'
        )
        self.assertRegex(display_from, expected_from_regex)

        list_id = message.extra_headers["List-Id"]
        self.assertIn(f"{subdomain}.testserver", list_id)
        return str(message.body)
    def test_password_reset(self) -> None:
        """Exercise the full flow: request a reset, follow the emailed link,
        set a new password, then log in with it."""
        reset_path = "/accounts/password/reset/"
        new_password = "f657gdGGk9"

        hamlet = self.example_user("hamlet")
        email = hamlet.delivery_email
        old_password = initial_password(email)
        assert old_password is not None
        self.login_user(hamlet)

        # The reset form renders.
        form_page = self.client_get(reset_path)
        self.assert_in_response("Reset your password", form_page)

        # Submitting an email address redirects to the "check your email" page.
        request_response = self.client_post(reset_path, {"email": email})
        self.assertEqual(request_response.status_code, 302)
        done_location = request_response["Location"]
        self.assertTrue(done_location.endswith("/accounts/password/reset/done/"))
        done_page = self.client_get(done_location)
        self.assert_in_response(
            "Check your email in a few minutes to finish the process.", done_page
        )

        # get_reset_mail_body also validates the sender headers of the email.
        mail_body = self.get_reset_mail_body()
        self.assertIn("reset your password", mail_body)

        # Following the emailed link redirects to a URL ending in the
        # confirm view's reset_url_token.
        link_pattern = settings.EXTERNAL_HOST + r"(\S\S+)"
        password_reset_url = self.get_confirmation_url_from_outbox(
            email, url_pattern=link_pattern
        )
        link_response = self.client_get(password_reset_url)
        self.assertEqual(link_response.status_code, 302)
        final_reset_url = link_response["Location"]
        self.assertTrue(
            final_reset_url.endswith(f"/{PasswordResetConfirmView.reset_url_token}/")
        )
        confirm_page = self.client_get(final_reset_url)
        self.assertEqual(confirm_page.status_code, 200)

        with self.settings(PASSWORD_MIN_LENGTH=3, PASSWORD_MIN_GUESSES=1000):
            # A weak password is rejected.
            weak_payload = {"new_password1": "easy", "new_password2": "easy"}
            weak_response = self.client_post(final_reset_url, weak_payload)
            self.assert_in_response("The password is too weak.", weak_response)

            # A stronger password is accepted.
            strong_payload = {"new_password1": new_password, "new_password2": new_password}
            strong_response = self.client_post(final_reset_url, strong_payload)
            self.assertEqual(strong_response.status_code, 302)
            self.assertTrue(strong_response["Location"].endswith("/password/done/"))

            # The new password logs in ...
            self.login_by_email(email, password=new_password)
            refreshed_user = self.example_user("hamlet")
            self.assert_logged_in_user_id(refreshed_user.id)

            # ... and the old one is refused.
            self.assert_login_failure(email, password=old_password)
    @patch("django.http.HttpRequest.get_host")
    def test_password_reset_page_redirects_for_root_alias_when_root_domain_landing_page_is_enabled(
        self, mock_get_host: MagicMock
    ) -> None:
        """With ROOT_DOMAIN_LANDING_PAGE on, the reset page redirects to
        /accounts/go/ for both the "alias" and the "www" host."""
        expected_location = "/accounts/go/?next=%2Faccounts%2Fpassword%2Freset%2F"
        # (host reported by the request, settings overrides for that request)
        scenarios = [
            (
                "alias.testserver",
                {"ROOT_DOMAIN_LANDING_PAGE": True, "ROOT_SUBDOMAIN_ALIASES": ["alias"]},
            ),
            # Relies on the ROOT_SUBDOMAIN_ALIASES value from the test settings.
            ("www.testserver", {"ROOT_DOMAIN_LANDING_PAGE": True}),
        ]
        for host, overrides in scenarios:
            mock_get_host.return_value = host
            with self.settings(**overrides):
                response = self.client_get("/accounts/password/reset/")
                self.assertEqual(response.status_code, 302)
                self.assertEqual(response["Location"], expected_location)
    @patch("django.http.HttpRequest.get_host")
    def test_password_reset_page_redirects_for_root_domain_when_root_domain_landing_page_is_enabled(
        self, mock_get_host: MagicMock
    ) -> None:
        """With ROOT_DOMAIN_LANDING_PAGE on, the reset page redirects to
        /accounts/go/ when requested on the root domain itself."""
        expected_location = "/accounts/go/?next=%2Faccounts%2Fpassword%2Freset%2F"
        # (host reported by the request, settings overrides for that request)
        scenarios = [
            ("testserver", {"ROOT_DOMAIN_LANDING_PAGE": True}),
            (
                "www.zulip.example.com",
                {"ROOT_DOMAIN_LANDING_PAGE": True, "EXTERNAL_HOST": "www.zulip.example.com"},
            ),
        ]
        for host, overrides in scenarios:
            mock_get_host.return_value = host
            with self.settings(**overrides):
                response = self.client_get("/accounts/password/reset/")
                self.assertEqual(response.status_code, 302)
                self.assertEqual(response["Location"], expected_location)
    @patch("django.http.HttpRequest.get_host")
    def test_password_reset_page_works_for_root_alias_when_root_domain_landing_page_is_not_enabled(
        self, mock_get_host: MagicMock
    ) -> None:
        """Without the landing-page setting overridden, alias hosts get the
        reset page directly (HTTP 200)."""
        reset_path = "/accounts/password/reset/"

        mock_get_host.return_value = "alias.testserver"
        with self.settings(ROOT_SUBDOMAIN_ALIASES=["alias"]):
            alias_response = self.client_get(reset_path)
            self.assertEqual(alias_response.status_code, 200)

        # Second host is checked under the unmodified test settings.
        mock_get_host.return_value = "www.testserver"
        www_response = self.client_get(reset_path)
        self.assertEqual(www_response.status_code, 200)
    @patch("django.http.HttpRequest.get_host")
    def test_password_reset_page_works_for_root_domain_when_root_domain_landing_page_is_not_enabled(
        self, mock_get_host: MagicMock
    ) -> None:
        """Without the landing-page setting overridden, the root domain
        serves the reset page directly (HTTP 200)."""
        reset_path = "/accounts/password/reset/"

        # Root domain under the unmodified test settings.
        mock_get_host.return_value = "testserver"
        root_response = self.client_get(reset_path)
        self.assertEqual(root_response.status_code, 200)

        # Root domain when EXTERNAL_HOST itself starts with "www." and no
        # root aliases are configured.
        mock_get_host.return_value = "www.zulip.example.com"
        with self.settings(EXTERNAL_HOST="www.zulip.example.com", ROOT_SUBDOMAIN_ALIASES=[]):
            external_response = self.client_get(reset_path)
            self.assertEqual(external_response.status_code, 200)
    @patch("django.http.HttpRequest.get_host")
    def test_password_reset_page_works_always_for_subdomains(
        self, mock_get_host: MagicMock
    ) -> None:
        """A request on the "lear" subdomain gets the reset page (HTTP 200)
        both with ROOT_DOMAIN_LANDING_PAGE enabled and under the unmodified
        test settings."""
        reset_path = "/accounts/password/reset/"
        mock_get_host.return_value = "lear.testserver"

        with self.settings(ROOT_DOMAIN_LANDING_PAGE=True):
            with_landing_page = self.client_get(reset_path)
            self.assertEqual(with_landing_page.status_code, 200)

        without_override = self.client_get(reset_path)
        self.assertEqual(without_override.status_code, 200)
    def test_password_reset_for_non_existent_user(self) -> None:
        """An unknown address sees the usual confirmation page and receives
        an email saying there is no account."""
        email = "nonexisting@mars.com"

        # Request the reset; the response is the same redirect as for a
        # real account.
        response = self.client_post("/accounts/password/reset/", {"email": email})
        self.assertEqual(response.status_code, 302)
        done_location = response["Location"]
        self.assertTrue(done_location.endswith("/accounts/password/reset/done/"))
        done_page = self.client_get(done_location)
        self.assert_in_response(
            "Check your email in a few minutes to finish the process.", done_page
        )

        # The helper checks the sender headers; here we check the wording.
        body = self.get_reset_mail_body()
        for expected in (
            "Somebody (possibly you) requested a new password",
            "You do not have an account",
            "safely ignore",
        ):
            self.assertIn(expected, body)
        for unexpected in ("reset your password", "deactivated"):
            self.assertNotIn(unexpected, body)
    def test_password_reset_for_deactivated_user(self) -> None:
        """A deactivated user sees the usual confirmation page and receives
        an email saying the account has been deactivated."""
        hamlet = self.example_user("hamlet")
        email = hamlet.delivery_email
        do_deactivate_user(hamlet, acting_user=None)

        # Request the reset; the response is the usual redirect.
        response = self.client_post("/accounts/password/reset/", {"email": email})
        self.assertEqual(response.status_code, 302)
        done_location = response["Location"]
        self.assertTrue(done_location.endswith("/accounts/password/reset/done/"))
        done_page = self.client_get(done_location)
        self.assert_in_response(
            "Check your email in a few minutes to finish the process.", done_page
        )

        # The helper checks the sender headers; here we check the wording.
        body = self.get_reset_mail_body()
        for expected in (
            "Somebody (possibly you) requested a new password",
            "has been deactivated",
            "safely ignore",
        ):
            self.assertIn(expected, body)
        for unexpected in ("reset your password", "not have an account"):
            self.assertNotIn(unexpected, body)
    def test_password_reset_with_deactivated_realm(self) -> None:
        """Requesting a reset in a deactivated realm logs the fact, shows the
        usual confirmation page, and sends no email at all."""
        user_profile = self.example_user("hamlet")
        email = user_profile.delivery_email
        do_deactivate_realm(user_profile.realm, acting_user=None)
        # start the password reset process by supplying an email address
        with self.assertLogs(level="INFO") as m:
            result = self.client_post("/accounts/password/reset/", {"email": email})
            self.assertEqual(m.output, ["INFO:root:Realm is deactivated"])
        # check the redirect link telling you to check mail for password reset link
        self.assertEqual(result.status_code, 302)
        self.assertTrue(result["Location"].endswith("/accounts/password/reset/done/"))
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email in a few minutes to finish the process.", result)
        # No email of any kind should have been sent for a deactivated realm.
        from django.core.mail import outbox

        self.assert_length(outbox, 0)
    @ratelimit_rule(10, 2, domain="password_reset_form_by_email")
    def test_rate_limiting(self) -> None:
        """Reset emails are throttled per address: with the rule above, the
        third request at the same (frozen) time sends nothing, while another
        address and a later time are unaffected."""
        from django.core.mail import outbox

        reset_path = "/accounts/password/reset/"
        hamlet = self.example_user("hamlet")
        hamlet_email = hamlet.delivery_email
        start_time = time.time()

        # Freeze the clock so every request falls inside the same window.
        with patch("time.time", return_value=start_time):
            for _ in range(2):
                self.client_post(reset_path, {"email": hamlet_email})
            self.assert_length(outbox, 2)

            # The address is now over its limit; the request is only logged.
            with self.assertLogs(level="INFO") as info_logs:
                self.client_post(reset_path, {"email": hamlet_email})
            expected_log = [
                "INFO:root:Too many password reset attempts for email hamlet@zulip.com from 127.0.0.1"
            ]
            self.assertEqual(info_logs.output, expected_log)
            self.assert_length(outbox, 2)

            # A different address has its own allowance.
            self.client_post(reset_path, {"email": self.example_email("othello")})
            self.assert_length(outbox, 3)
            self.client_post(reset_path, {"email": self.example_email("othello")})
            self.assert_length(outbox, 4)

        # Once the clock has moved on, the first address can request again.
        with patch("time.time", return_value=start_time + 11):
            for _ in range(2):
                self.client_post(reset_path, {"email": hamlet_email})
            self.assert_length(outbox, 6)
    def test_wrong_subdomain(self) -> None:
        """Requesting a reset on a subdomain where the address has no account
        yields an email listing the organization where it does."""
        email = self.example_email("hamlet")

        # Request the reset on the "zephyr" subdomain.
        response = self.client_post(
            "/accounts/password/reset/", {"email": email}, subdomain="zephyr"
        )
        self.assertEqual(response.status_code, 302)
        done_location = response["Location"]
        self.assertTrue(done_location.endswith("/accounts/password/reset/done/"))
        done_page = self.client_get(done_location)
        self.assert_in_response(
            "Check your email in a few minutes to finish the process.", done_page
        )

        body = self.get_reset_mail_body("zephyr")
        for expected in (
            "Somebody (possibly you) requested a new password",
            "You do not have an account",
            "active accounts in the following organization(s).\nhttp://zulip.testserver",
            "safely ignore",
        ):
            self.assertIn(expected, body)
        for unexpected in ("reset your password", "deactivated"):
            self.assertNotIn(unexpected, body)
    def test_wrong_subdomain_i18n(self) -> None:
        """The wrong-subdomain email is written in the language requested via
        the Accept-Language header."""
        hamlet = self.example_user("hamlet")
        payload = {"email": hamlet.delivery_email}

        # Ask for German on a subdomain other than the default "zulip" one.
        response = self.client_post(
            "/accounts/password/reset/",
            payload,
            HTTP_ACCEPT_LANGUAGE="de",
            subdomain="lear",
        )
        self.assertEqual(response.status_code, 302)

        # The header checks in the helper translate the display name, so
        # they have to run with the same language active.
        with translation.override("de"):
            mail_body = self.get_reset_mail_body("lear")
            self.assertIn("hat ein neues Passwort", mail_body)
    def test_invalid_subdomain(self) -> None:
        """Posting the reset form on a subdomain with no organization gives a
        404 page and sends no email."""
        email = self.example_email("hamlet")
        # start the password reset process by supplying an email address
        result = self.client_post(
            "/accounts/password/reset/", {"email": email}, subdomain="invalid"
        )
        # There is no redirect here: the unknown subdomain is rejected outright.
        self.assertEqual(result.status_code, 404)
        self.assert_in_response("There is no Zulip organization hosted at this subdomain.", result)
        # And nothing was emailed.
        from django.core.mail import outbox

        self.assert_length(outbox, 0)
    @override_settings(
        AUTHENTICATION_BACKENDS=(
            "zproject.backends.ZulipLDAPAuthBackend",
            "zproject.backends.ZulipDummyBackend",
        )
    )
    def test_ldap_auth_only(self) -> None:
        """Without the email auth backend, a reset request is logged and
        shows the usual confirmation page, but no email goes out."""
        from django.core.mail import outbox

        email = self.example_email("hamlet")
        expected_log = [
            "INFO:root:Password reset attempted for hamlet@zulip.com even though password auth is disabled."
        ]
        with self.assertLogs(level="INFO") as captured:
            response = self.client_post("/accounts/password/reset/", {"email": email})
            self.assertEqual(captured.output, expected_log)

        # The user-visible flow is the same as a normal reset.
        self.assertEqual(response.status_code, 302)
        done_location = response["Location"]
        self.assertTrue(done_location.endswith("/accounts/password/reset/done/"))
        done_page = self.client_get(done_location)
        self.assert_in_response(
            "Check your email in a few minutes to finish the process.", done_page
        )

        self.assert_length(outbox, 0)
    @override_settings(
        AUTHENTICATION_BACKENDS=(
            "zproject.backends.ZulipLDAPAuthBackend",
            "zproject.backends.EmailAuthBackend",
            "zproject.backends.ZulipDummyBackend",
        )
    )
    def test_ldap_and_email_auth(self) -> None:
        """If both email and LDAP auth backends are enabled, limit password
        reset to users outside the LDAP domain"""
        # If the domain matches, we don't generate an email
        with self.settings(LDAP_APPEND_DOMAIN="zulip.com"):
            email = self.example_email("hamlet")
            with self.assertLogs(level="INFO") as m:
                # Only the log output matters here; the response is not inspected.
                self.client_post("/accounts/password/reset/", {"email": email})
                self.assertEqual(
                    m.output, ["INFO:root:Password reset not allowed for user in LDAP domain"]
                )
        from django.core.mail import outbox

        self.assert_length(outbox, 0)
        # If the domain doesn't match, we do generate an email
        with self.settings(LDAP_APPEND_DOMAIN="example.com"):
            email = self.example_email("hamlet")
            result = self.client_post("/accounts/password/reset/", {"email": email})
            self.assertEqual(result.status_code, 302)
            self.assertTrue(result["Location"].endswith("/accounts/password/reset/done/"))
            # Load the confirmation page; its content is not checked here.
            self.client_get(result["Location"])
        body = self.get_reset_mail_body()
        self.assertIn("reset your password", body)
    def test_redirect_endpoints(self) -> None:
        """
        Quick direct hits on pages normally reached through redirects.
        Our URL coverage mechanism doesn't handle redirects, so these
        requests exist mostly to give us 100% URL coverage in the reports.
        """
        # (URL to fetch, text expected in the successful response)
        cases = [
            ("/accounts/password/reset/done/", "Check your email"),
            ("/accounts/password/done/", "You've set a new password!"),
            ("/accounts/send_confirm/?email=alice@example.com", "/accounts/home/"),
            (
                "/accounts/new/send_confirm/?email=alice@example.com&realm_name=Zulip+test&realm_type=10&realm_subdomain=zuliptest",
                "/new/",
            ),
        ]
        for url, expected_text in cases:
            response = self.client_get(url)
            self.assert_in_success_response([expected_text], response)
    def test_password_reset_for_soft_deactivated_user(self) -> None:
        """Request a reset for a soft-deactivated user inside the
        soft_deactivate_and_check_long_term_idle(user, False) helper and
        check for the usual redirect."""
        hamlet = self.example_user("hamlet")
        payload = {"email": hamlet.delivery_email}
        with self.soft_deactivate_and_check_long_term_idle(hamlet, False):
            response = self.client_post("/accounts/password/reset/", payload)
            # Same redirect to the "check your email" page as usual.
            self.assertEqual(response.status_code, 302)
            done_location = response["Location"]
            self.assertTrue(done_location.endswith("/accounts/password/reset/done/"))
class LoginTest(ZulipTestCase):
    """
    Logging in, registration, and logging out.
    """
    def test_login(self) -> None:
        """Logging in as hamlet leaves hamlet's id in the session."""
        self.login("hamlet")
        hamlet = self.example_user("hamlet")
        self.assert_logged_in_user_id(hamlet.id)
    def test_login_deactivated_user(self) -> None:
        """A deactivated user who tries to log in is shown the
        "has been deactivated" message and is not logged in."""
        user_profile = self.example_user("hamlet")
        do_deactivate_user(user_profile, acting_user=None)

        response = self.login_with_return(user_profile.delivery_email, "xxx")

        self.assertEqual(response.status_code, 200)
        expected_message = f"Your account {user_profile.delivery_email} has been deactivated."
        self.assert_in_response(expected_message, response)
        self.assert_logged_in_user_id(None)
    def test_login_deactivate_user_error(self) -> None:
        """
        The error triggered by the is_deactivated query parameter must be
        shown on the login page whether or not the Email backend is
        enabled for the realm.
        """
        user_profile = self.example_user("hamlet")
        realm = user_profile.realm
        query = urlencode({"is_deactivated": user_profile.delivery_email})
        url = f"{realm.uri}/login/?" + query

        # First with the Email backend enabled.
        self.assertTrue(email_auth_enabled(realm))
        response = self.client_get(url)
        self.assertEqual(response.status_code, 200)
        self.assert_in_response(
            f"Your account {user_profile.delivery_email} has been deactivated.", response
        )

        # Then with the Email backend switched off for the realm.
        auth_methods = realm.authentication_methods_dict()
        auth_methods["Email"] = False
        do_set_realm_authentication_methods(realm, auth_methods, acting_user=None)
        response = self.client_get(url)
        self.assertEqual(response.status_code, 200)
        self.assert_in_response(
            f"Your account {user_profile.delivery_email} has been deactivated.", response
        )
    def test_login_bad_password(self) -> None:
        """A wrong password re-renders the login page without logging in;
        the right password then succeeds through the same code path."""
        hamlet = self.example_user("hamlet")
        email = hamlet.delivery_email

        failed = self.login_with_return(email, password="wrongpassword")
        self.assert_in_success_response([email], failed)
        self.assert_logged_in_user_id(None)

        # Control case: the same helper with the correct password works,
        # so the failure above is due to the password and nothing else.
        correct_password = initial_password(email)
        succeeded = self.login_with_return(email, password=correct_password)
        self.assertEqual(succeeded.status_code, 302)
        self.assert_logged_in_user_id(hamlet.id)
    @override_settings(RATE_LIMITING_AUTHENTICATE=True)
    @ratelimit_rule(10, 2, domain="authenticate_by_username")
    def test_login_bad_password_rate_limiter(self) -> None:
        """After two failed logins at the same (frozen) time, a third attempt
        is told to wait; once the clock advances, login works again."""
        hamlet = self.example_user("hamlet")
        email = hamlet.delivery_email
        start_time = time.time()

        with patch("time.time", return_value=start_time):
            for _ in range(2):
                self.login_with_return(email, password="wrongpassword")
                self.assert_logged_in_user_id(None)

            # Over the limit now: even the helper's default password is
            # blocked rather than checked.
            blocked = self.login_with_return(email)
            self.assert_in_success_response(["Try again in 10 seconds"], blocked)

        # Later on, the same login goes through.
        with patch("time.time", return_value=start_time + 11):
            self.login_with_return(email)
            self.assert_logged_in_user_id(hamlet.id)
    def test_login_with_old_weak_password_after_hasher_change(self) -> None:
        """Store a password while only the SHA1 hasher is configured, then
        log in with MD5 listed first and PASSWORD_MIN_LENGTH raised above the
        password's length: the login is refused as "too weak"."""
        sha1_hasher = "django.contrib.auth.hashers.SHA1PasswordHasher"
        md5_hasher = "django.contrib.auth.hashers.MD5PasswordHasher"
        password = "a_password_of_22_chars"

        hamlet = self.example_user("hamlet")
        with self.settings(PASSWORD_HASHERS=(sha1_hasher,)):
            hamlet.set_password(password)
            hamlet.save()

        with self.settings(PASSWORD_HASHERS=(md5_hasher, sha1_hasher), PASSWORD_MIN_LENGTH=30):
            # The attempt must also log something to zulip.auth.email.
            with self.assertLogs("zulip.auth.email", level="INFO"):
                response = self.login_with_return(self.example_email("hamlet"), password)
                self.assertEqual(response.status_code, 200)
                self.assert_in_response(
                    "Your password has been disabled because it is too weak.", response
                )
                self.assert_logged_in_user_id(None)
    def test_login_nonexistent_user(self) -> None:
        """An unknown email gets the generic bad-credentials message."""
        response = self.login_with_return("xxx@zulip.com", "xxx")

        self.assertEqual(response.status_code, 200)
        self.assert_in_response("Please enter a correct email and password", response)
        self.assert_logged_in_user_id(None)
    def test_login_wrong_subdomain(self) -> None:
        """A user from another realm logging in on the "zulip" subdomain gets
        the generic bad-credentials error, and the attempt is logged along
        with the matching account."""
        user_profile = self.mit_user("sipbtest")
        email = user_profile.delivery_email
        matching_accounts_dict = {"realm_id": user_profile.realm_id, "id": user_profile.id}
        expected_log = [
            f"INFO:zulip.auth.OurAuthenticationForm:User attempted password login to wrong subdomain zulip. Matching accounts: [{matching_accounts_dict}]"
        ]

        with self.assertLogs("zulip.auth.OurAuthenticationForm", level="INFO") as captured:
            response = self.login_with_return(email, "xxx")
            self.assertEqual(captured.output, expected_log)

        self.assertEqual(response.status_code, 200)
        # The page must not reveal that the account exists elsewhere.
        expected_error = (
            "Please enter a correct email and password. Note that both fields may be case-sensitive"
        )
        self.assert_in_response(expected_error, response)
        self.assert_logged_in_user_id(None)
    def test_login_invalid_subdomain(self) -> None:
        """Logging in on a subdomain with no organization returns the 404 page."""
        email = self.example_email("hamlet")
        response = self.login_with_return(email, "xxx", subdomain="invalid")

        self.assertEqual(response.status_code, 404)
        self.assert_in_response(
            "There is no Zulip organization hosted at this subdomain.", response
        )
        self.assert_logged_in_user_id(None)
    def test_register(self) -> None:
        """Register a new user in a realm with 40 default streams and pin the
        number of database queries and cache tries the registration makes."""
        reset_email_visibility_to_everyone_in_zulip_realm()
        realm = get_realm("zulip")
        hamlet = self.example_user("hamlet")
        # Create many default streams so that per-stream work during
        # registration would show up in the counts below.
        stream_names = [f"stream_{i}" for i in range(40)]
        for stream_name in stream_names:
            stream = self.make_stream(stream_name, realm=realm)
            DefaultStream.objects.create(stream=stream, realm=realm)
        # Make sure there's at least one recent message to be marked
        # unread.  This prevents a bug where this test would start
        # failing if the test database was generated more than
        # ONBOARDING_RECENT_TIMEDELTA ago.
        self.subscribe(hamlet, "stream_0")
        self.send_stream_message(
            hamlet,
            "stream_0",
            topic_name="test topic",
            content="test message",
        )
        # Clear all the caches.
        flush_per_request_caches()
        ContentType.objects.clear_cache()
        # Ensure the number of queries we make is not O(streams)
        with self.assert_database_query_count(102), cache_tries_captured() as cache_tries:
            # Run on-commit callbacks inside the measured region so their
            # work is included.
            with self.captureOnCommitCallbacks(execute=True):
                self.register(self.nonreg_email("test"), "test")
        # We can probably avoid a couple cache hits here, but there doesn't
        # seem to be any O(N) behavior.  Some of the cache hits are related
        # to sending messages, such as getting the welcome bot, looking up
        # the alert words for a realm, etc.
        self.assert_length(cache_tries, 20)
        # The new user is logged in and set up as expected.
        user_profile = self.nonreg_user("test")
        self.assert_logged_in_user_id(user_profile.id)
        self.assertFalse(user_profile.enable_stream_desktop_notifications)
        self.check_user_added_in_system_group(user_profile)
    def test_register_deactivated(self) -> None:
        """
        Starting registration in a deactivated realm redirects to the
        /accounts/deactivated/ page and creates no user.
        """
        zulip_realm = get_realm("zulip")
        zulip_realm.deactivated = True
        zulip_realm.save(update_fields=["deactivated"])

        payload = {"email": self.nonreg_email("test")}
        response = self.client_post("/accounts/home/", payload, subdomain="zulip")

        self.assertEqual(response.status_code, 302)
        self.assertEqual("/accounts/deactivated/", response["Location"])
        with self.assertRaises(UserProfile.DoesNotExist):
            self.nonreg_user("test")
    def test_register_with_invalid_email(self) -> None:
        """
        Submitting an address containing a NUL character re-renders the
        form with an "Enter a valid email address" error.
        """
        payload = {"email": "foo\x00bar"}
        response = self.client_post("/accounts/home/", payload, subdomain="zulip")

        self.assertEqual(response.status_code, 200)
        self.assertContains(response, "Enter a valid email address")
    def test_register_deactivated_partway_through(self) -> None:
        """
        If the realm is deactivated after you submitted your email but
        before you complete the registration form, you are redirected to
        the deactivated page and no user is created.
        """
        email = self.nonreg_email("test")

        # Step one happens while the realm is still active.
        first_step = self.client_post("/accounts/home/", {"email": email}, subdomain="zulip")
        self.assertEqual(first_step.status_code, 302)
        self.assertNotIn("deactivated", first_step["Location"])

        # The realm is deactivated before the form is completed.
        zulip_realm = get_realm("zulip")
        zulip_realm.deactivated = True
        zulip_realm.save(update_fields=["deactivated"])

        second_step = self.submit_reg_form_for_user(email, "abcd1234", subdomain="zulip")
        self.assertEqual(second_step.status_code, 302)
        self.assertEqual("/accounts/deactivated/", second_step["Location"])
        with self.assertRaises(UserProfile.DoesNotExist):
            self.nonreg_user("test")
    def test_login_deactivated_realm(self) -> None:
        """
        Logging in to a deactivated realm redirects to the
        /accounts/deactivated/ page.
        """
        zulip_realm = get_realm("zulip")
        zulip_realm.deactivated = True
        zulip_realm.save(update_fields=["deactivated"])

        email = self.example_email("hamlet")
        response = self.login_with_return(email, subdomain="zulip")

        self.assertEqual(response.status_code, 302)
        self.assertEqual("/accounts/deactivated/", response["Location"])
    def test_logout(self) -> None:
        """Posting to the logout endpoint ends the session."""
        self.login("hamlet")
        # Hit the real endpoint instead of the self.logout helper so the
        # actual logout code path is what gets tested.
        logout_url = "/accounts/logout/"
        self.client_post(logout_url)
        self.assert_logged_in_user_id(None)
    def test_non_ascii_login(self) -> None:
        """
        You can log in even if your password contains non-ASCII characters.
        """
        email = self.nonreg_email("test")
        password = "hümbüǵ"
        # Registering succeeds and leaves the new user logged in.
        self.register(email, password)
        user_profile = self.nonreg_user("test")
        self.assert_logged_in_user_id(user_profile.id)
        # Log out once; the assertion confirms the session is gone, so no
        # second logout is needed before logging back in.
        self.logout()
        self.assert_logged_in_user_id(None)
        # Logging in succeeds.
        self.login_by_email(email, password)
        self.assert_logged_in_user_id(user_profile.id)
    @override_settings(TWO_FACTOR_AUTHENTICATION_ENABLED=False)
    def test_login_page_redirects_logged_in_user(self) -> None:
        """Visiting the login page while already logged in sends you to the
        app's main page.
        """
        self.login("cordelia")
        login_page = self.client_get("/login/")
        redirect_target = login_page["Location"]
        self.assertEqual(redirect_target, "http://zulip.testserver/")
    def test_options_request_to_login_page(self) -> None:
        """An OPTIONS request to the login page succeeds."""
        options_response = self.client_options("/login/")
        self.assertEqual(options_response.status_code, 200)
    @override_settings(TWO_FACTOR_AUTHENTICATION_ENABLED=True)
    def test_login_page_redirects_logged_in_user_under_2fa(self) -> None:
        """With two-factor auth enabled, visiting the login page after a full
        login (password plus 2FA) sends you to the app's main page.
        """
        cordelia = self.example_user("cordelia")
        self.create_default_device(cordelia)

        # Complete both login steps.
        self.login("cordelia")
        self.login_2fa(cordelia)

        login_page = self.client_get("/login/")
        redirect_target = login_page["Location"]
        self.assertEqual(redirect_target, "http://zulip.testserver/")
    def test_start_two_factor_auth(self) -> None:
        """start_two_factor_auth returns whatever the (patched)
        TwoFactorLoginView view callable produces."""
        request = HostRequestMock()
        with patch("zerver.views.auth.TwoFactorLoginView") as mock_view:
            # Stub the view so the call returns a plain HttpResponse.
            mock_view.as_view.return_value = lambda *a, **k: HttpResponse()
            response = start_two_factor_auth(request)
            # assertIsInstance reports the actual type on failure, unlike
            # assertTrue(isinstance(...)).
            self.assertIsInstance(response, HttpResponse)
    def test_do_two_factor_login(self) -> None:
        """do_two_factor_login calls django_otp.login exactly once."""
        hamlet = self.example_user("hamlet")
        self.create_default_device(hamlet)
        mock_request = HostRequestMock()

        with patch("zerver.decorator.django_otp.login") as otp_login:
            do_two_factor_login(mock_request, hamlet)
            otp_login.assert_called_once()
    def test_zulip_default_context_does_not_load_inline_previews(self) -> None:
        """Set the realm description to an image URL and compare the login
        page's "realm_description" context value against a fixed string."""
        realm = get_realm("zulip")
        description = "https://www.google.com/images/srpr/logo4w.png"
        realm.description = description
        realm.save(update_fields=["description"])
        response: HttpResponseBase = self.client_get("/login/")
        # Expected context value; the literal starts at column 0 so that no
        # indentation becomes part of the string.
        expected_response = """\
https://www.google.com/images/srpr/logo4w.png
"""
        # Narrow the type so context_data can be accessed below.
        assert isinstance(response, TemplateResponse)
        assert response.context_data is not None
        self.assertEqual(response.context_data["realm_description"], expected_response)
        self.assertEqual(response.status_code, 200)
class EmailUnsubscribeTests(ZulipTestCase):
    """Tests for the one-click unsubscribe links included in emails."""

    def _unsubscribe_path(self, user_profile: UserProfile, email_type: str) -> str:
        """Return the path component of the user's one-click unsubscribe
        link for the given email type.
        """
        link = one_click_unsubscribe_link(user_profile, email_type)
        return urllib.parse.urlparse(link).path

    def test_error_unsubscribe(self) -> None:
        """Bad tokens and unknown email types both yield an error page."""
        # "test123" is not a valid unsubscribe token.
        response = self.client_get("/accounts/unsubscribe/missed_messages/test123")
        self.assert_in_response("Unknown email unsubscribe request", response)

        # "fake" is not a known message type.
        hamlet = self.example_user("hamlet")
        response = self.client_get(self._unsubscribe_path(hamlet, "fake"))
        self.assert_in_response("Unknown email unsubscribe request", response)

    def test_message_notification_emails_unsubscribe(self) -> None:
        """
        Message notification emails carry a one-click unsubscribe link;
        visiting it without logging in turns off the user's offline
        email notifications.
        """
        hamlet = self.example_user("hamlet")
        hamlet.enable_offline_email_notifications = True
        hamlet.save()

        response = self.client_get(self._unsubscribe_path(hamlet, "missed_messages"))
        self.assertEqual(response.status_code, 200)

        hamlet.refresh_from_db()
        self.assertFalse(hamlet.enable_offline_email_notifications)

    def test_welcome_unsubscribe(self) -> None:
        """
        Welcome emails carry a one-click unsubscribe link; visiting it
        without logging in removes the scheduled welcome emails.
        """
        hamlet = self.example_user("hamlet")

        # Signing up enqueues 3 welcome emails for the user.
        enqueue_welcome_emails(hamlet)
        self.assertEqual(3, ScheduledEmail.objects.filter(users=hamlet).count())

        # Unsubscribe from the welcome emails.
        response = self.client_get(self._unsubscribe_path(hamlet, "welcome"))

        # Nothing remains scheduled for the user.
        self.assertEqual(response.status_code, 200)
        self.assertEqual(0, ScheduledEmail.objects.filter(users=hamlet).count())

    def test_digest_unsubscribe(self) -> None:
        """
        Digest emails carry a one-click unsubscribe link; visiting it
        without logging in disables digests for the user.

        It also removes any digest email jobs that were already queued.
        """
        hamlet = self.example_user("hamlet")
        self.assertTrue(hamlet.enable_digest_emails)

        # Queue up a fake digest email with a minimal context.
        digest_context = {
            "name": "",
            "realm_uri": "",
            "unread_pms": [],
            "hot_conversations": [],
            "new_users": [],
            "new_streams": {"plain": []},
            "unsubscribe_link": "",
        }
        send_future_email(
            "zerver/emails/digest",
            hamlet.realm,
            to_user_ids=[hamlet.id],
            context=digest_context,
        )
        self.assertEqual(1, ScheduledEmail.objects.filter(users=hamlet).count())

        # Unsubscribe from digest emails.
        response = self.client_get(self._unsubscribe_path(hamlet, "digest"))

        # The setting is now off and the scheduled job is gone.
        self.assertEqual(response.status_code, 200)
        # Reload from the database to bypass the cached user_profile.
        hamlet.refresh_from_db()
        self.assertFalse(hamlet.enable_digest_emails)
        self.assertEqual(0, ScheduledEmail.objects.filter(users=hamlet).count())

    def test_login_unsubscribe(self) -> None:
        """
        Login emails carry a one-click unsubscribe link; visiting it
        without logging in turns off the user's login emails.
        """
        hamlet = self.example_user("hamlet")
        hamlet.enable_login_emails = True
        hamlet.save()

        response = self.client_get(self._unsubscribe_path(hamlet, "login"))
        self.assertEqual(response.status_code, 200)

        hamlet.refresh_from_db()
        self.assertFalse(hamlet.enable_login_emails)

    def test_marketing_unsubscribe(self) -> None:
        """
        Marketing emails carry a one-click unsubscribe link; visiting it
        without logging in turns off the user's marketing emails.
        """
        hamlet = self.example_user("hamlet")
        self.assertTrue(hamlet.enable_marketing_emails)

        # Unsubscribe from marketing emails with a GET request.
        response = self.client_get(self._unsubscribe_path(hamlet, "marketing"))
        self.assertEqual(response.status_code, 200)

        # Reload from the database to bypass the cached user_profile.
        hamlet.refresh_from_db()
        self.assertFalse(hamlet.enable_marketing_emails)

    def test_marketing_unsubscribe_post(self) -> None:
        """
        Email clients can use the List-Unsubscribe-Post header to send an
        automatic unsubscription request via POST (see RFC 8058), so the
        POST variant is exercised as well.
        """
        hamlet = self.example_user("hamlet")
        self.assertTrue(hamlet.enable_marketing_emails)

        # Unsubscribe from marketing emails with a POST request, using a
        # client that enforces CSRF checks.
        csrf_client = Client(enforce_csrf_checks=True)
        response = csrf_client.post(
            self._unsubscribe_path(hamlet, "marketing"), {"List-Unsubscribe": "One-Click"}
        )
        self.assertEqual(response.status_code, 200)

        # Reload from the database to bypass the cached user_profile.
        hamlet.refresh_from_db()
        self.assertFalse(hamlet.enable_marketing_emails)
class RealmCreationTest(ZulipTestCase):
    @override_settings(OPEN_REALM_CREATION=True)
    def check_able_to_create_realm(self, email: str, password: str = "test") -> None:
        """Create the "zuliptest" realm using `email` and verify the outcome.

        Walks through the realm creation form, the confirmation email and
        the registration form, then checks the new realm and its owner,
        the PreregistrationRealm record, the welcome messages, the
        signups stream message and the realm creation audit log entry.
        """
        string_id = "zuliptest"
        org_name = "Zulip Test"
        system_realm = get_realm(settings.SYSTEM_BOT_REALM)
        notification_bot = get_system_bot(settings.NOTIFICATION_BOT, system_realm.id)
        signups_stream = create_stream_if_needed(notification_bot.realm, "signups")[0]

        # Precondition: the realm does not exist yet.
        with self.assertRaises(Realm.DoesNotExist):
            get_realm(string_id)

        # Submit the realm creation form with the email.
        creation_response = self.submit_realm_creation_form(
            email, realm_subdomain=string_id, realm_name=org_name
        )
        self.assertEqual(creation_response.status_code, 302)
        send_confirm_suffix = f"/accounts/new/send_confirm/?email={urllib.parse.quote(email)}&realm_name={urllib.parse.quote_plus(org_name)}&realm_type=10&realm_subdomain={string_id}"
        self.assertTrue(creation_response["Location"].endswith(send_confirm_suffix))
        send_confirm_page = self.client_get(creation_response["Location"])
        self.assert_in_response("Check your email", send_confirm_page)

        pending_prereg = PreregistrationRealm.objects.get(email=email)
        self.assertEqual(pending_prereg.name, "Zulip Test")
        self.assertEqual(pending_prereg.org_type, Realm.ORG_TYPES["business"]["id"])
        self.assertEqual(pending_prereg.string_id, string_id)

        # Verify the confirmation email's subject and body, then pull out
        # the confirmation link and visit it.
        confirmation_url = self.get_confirmation_url_from_outbox(
            email,
            email_subject_contains="Create your Zulip organization",
            email_body_contains="You have requested a new Zulip organization",
        )
        confirmation_page = self.client_get(confirmation_url)
        self.assertEqual(confirmation_page.status_code, 200)

        registration = self.submit_reg_form_for_user(
            email, password, realm_subdomain=string_id, realm_name=org_name
        )
        self.assertEqual(registration.status_code, 302)
        self.assertTrue(
            registration["Location"].startswith(
                "http://zuliptest.testserver/accounts/login/subdomain/"
            )
        )

        # The realm now exists and the new user belongs to it.
        realm = get_realm(string_id)
        self.assertEqual(realm.string_id, string_id)
        user = get_user(email, realm)
        self.assertEqual(user.realm, realm)

        # The creator is the realm's owner.
        self.assertEqual(user.role, UserProfile.ROLE_REALM_OWNER)

        # Default realm settings.
        self.assertEqual(realm.org_type, Realm.ORG_TYPES["business"]["id"])
        self.assertEqual(realm.emails_restricted_to_domains, False)
        self.assertEqual(realm.invite_required, True)

        # The PreregistrationRealm records what it created and is used up.
        used_prereg = PreregistrationRealm.objects.get(email=email)
        self.assertEqual(used_prereg.created_realm, realm)
        self.assertEqual(used_prereg.created_user, user)
        self.assertEqual(used_prereg.status, confirmation_settings.STATUS_USED)

        # Welcome messages: (stream name, text in first message, message count).
        welcome_expectations = [
            (Realm.DEFAULT_NOTIFICATION_STREAM_NAME, "with the topic", 3),
            (Realm.INITIAL_PRIVATE_STREAM_NAME, "private stream", 1),
        ]
        for stream_name, expected_text, expected_count in welcome_expectations:
            stream_recipient = get_stream(stream_name, realm).recipient
            stream_messages = Message.objects.filter(recipient=stream_recipient).order_by(
                "date_sent"
            )
            self.assert_length(stream_messages, expected_count)
            self.assertIn(expected_text, stream_messages[0].content)

        # The admin organization's signups stream got exactly one message.
        signup_messages = Message.objects.filter(recipient=signups_stream.recipient).order_by(
            "id"
        )
        self.assert_length(signup_messages, 1)
        # It mentions the organization name, subdomain and organization type.
        announcement = signup_messages[0]
        self.assertIn("Zulip Test", announcement.content)
        self.assertIn("zuliptest", announcement.content)
        self.assertIn("Organization type: Business", announcement.content)
        self.assertEqual("new organizations", announcement.topic_name())

        creation_log_entry = RealmAuditLog.objects.get(
            realm=realm, event_type=RealmAuditLog.REALM_CREATED
        )
        self.assertEqual(creation_log_entry.acting_user, user)
        self.assertEqual(creation_log_entry.event_time, realm.date_created)

        # Piggyback a small check of how an empty string_id is displayed.
        realm.string_id = ""
        self.assertEqual(realm.display_subdomain, ".")
    def test_create_realm_non_existing_email(self) -> None:
        """Run the realm creation check with the address user1@test.com."""
        new_email = "user1@test.com"
        self.check_able_to_create_realm(new_email)
    def test_create_realm_existing_email(self) -> None:
        """Run the realm creation check with the address hamlet@zulip.com."""
        existing_email = "hamlet@zulip.com"
        self.check_able_to_create_realm(existing_email)
    @override_settings(AUTHENTICATION_BACKENDS=("zproject.backends.ZulipLDAPAuthBackend",))
    def test_create_realm_ldap_email(self) -> None:
        """Run the realm creation check with only the LDAP backend
        configured, using an LDAP test user's email and password.
        """
        self.init_default_ldap_database()
        with self.settings(LDAP_EMAIL_ATTR="mail"):
            ldap_user_password = self.ldap_password("newuser_with_email")
            self.check_able_to_create_realm("newuser_email@zulip.com", ldap_user_password)
    def test_create_realm_as_system_bot(self) -> None:
        """The realm creation form rejects a system bot's email address."""
        bot_email = "notification-bot@zulip.com"
        response = self.submit_realm_creation_form(
            email=bot_email, realm_subdomain="zuliptest", realm_name="Zulip test"
        )
        # The form is shown again (200) with an explanation.
        self.assertEqual(response.status_code, 200)
        self.assert_in_response("notification-bot@zulip.com is reserved for system bots", response)
    def test_create_realm_no_creation_key(self) -> None:
        """
        With OPEN_REALM_CREATION set to False, an attempt to create a
        realm without a creation_key must fail.
        """
        with self.settings(OPEN_REALM_CREATION=False):
            # Submit the creation form with an email but no creation key.
            response = self.submit_realm_creation_form(
                "user1@test.com", realm_subdomain="zuliptest", realm_name="Zulip test"
            )
            self.assertEqual(response.status_code, 200)
            self.assert_in_response("Organization creation link required", response)
    @override_settings(OPEN_REALM_CREATION=True)
    def test_create_realm_with_subdomain(self) -> None:
        """Create a realm on the "zuliptest" subdomain and check that the
        resulting realm has the requested name and subdomain.
        """
        email = "user1@test.com"
        password = "test"
        realm_name = "Test"
        string_id = "zuliptest"

        # Precondition: the realm does not exist yet.
        with self.assertRaises(Realm.DoesNotExist):
            get_realm(string_id)

        # Step 1: submit the realm creation form.
        creation_response = self.submit_realm_creation_form(
            email, realm_subdomain=string_id, realm_name=realm_name
        )
        self.assertEqual(creation_response.status_code, 302)
        send_confirm_suffix = f"/accounts/new/send_confirm/?email={urllib.parse.quote(email)}&realm_name={urllib.parse.quote_plus(realm_name)}&realm_type=10&realm_subdomain={string_id}"
        self.assertTrue(creation_response["Location"].endswith(send_confirm_suffix))
        send_confirm_page = self.client_get(creation_response["Location"])
        self.assert_in_response("Check your email", send_confirm_page)

        # Step 2: follow the confirmation link from the email.
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        confirmation_page = self.client_get(confirmation_url)
        self.assertEqual(confirmation_page.status_code, 200)

        # Step 3: submit the registration form and follow its redirect.
        registration = self.submit_reg_form_for_user(
            email, password, realm_subdomain=string_id, realm_name=realm_name
        )
        self.assertEqual(registration.status_code, 302)
        login_redirect = self.client_get(registration["Location"], subdomain=string_id)
        self.assertEqual(login_redirect.status_code, 302)
        self.assertEqual(login_redirect["Location"], "http://zuliptest.testserver")

        # The realm exists with the expected user, name and subdomain.
        realm = get_realm(string_id)
        self.assertEqual(realm.string_id, string_id)
        self.assertEqual(get_user(email, realm).realm, realm)
        self.assertEqual(realm.name, realm_name)
        self.assertEqual(realm.subdomain, string_id)
    @override_settings(OPEN_REALM_CREATION=True)
    def test_create_realm_with_marketing_emails_enabled(self) -> None:
        """Registering with enable_marketing_emails=True leaves marketing
        emails enabled on the new user.
        """
        email = "user1@test.com"
        password = "test"
        realm_name = "Test"
        string_id = "zuliptest"

        # Precondition: the realm does not exist yet.
        with self.assertRaises(Realm.DoesNotExist):
            get_realm(string_id)

        # Step 1: submit the realm creation form.
        creation_response = self.submit_realm_creation_form(
            email, realm_subdomain=string_id, realm_name=realm_name
        )
        self.assertEqual(creation_response.status_code, 302)
        send_confirm_suffix = f"/accounts/new/send_confirm/?email={urllib.parse.quote(email)}&realm_name={urllib.parse.quote_plus(realm_name)}&realm_type=10&realm_subdomain={string_id}"
        self.assertTrue(creation_response["Location"].endswith(send_confirm_suffix))
        send_confirm_page = self.client_get(creation_response["Location"])
        self.assert_in_response("Check your email", send_confirm_page)

        # Step 2: follow the confirmation link from the email.
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        confirmation_page = self.client_get(confirmation_url)
        self.assertEqual(confirmation_page.status_code, 200)

        # Step 3: register, opting in to marketing emails.
        registration = self.submit_reg_form_for_user(
            email,
            password,
            realm_subdomain=string_id,
            realm_name=realm_name,
            enable_marketing_emails=True,
        )
        self.assertEqual(registration.status_code, 302)
        login_redirect = self.client_get(registration["Location"], subdomain=string_id)
        self.assertEqual(login_redirect.status_code, 302)
        self.assertEqual(login_redirect["Location"], "http://zuliptest.testserver")

        # The realm exists and the user has marketing emails enabled.
        realm = get_realm(string_id)
        self.assertEqual(realm.string_id, string_id)
        new_user = get_user(email, realm)
        self.assertEqual(new_user.realm, realm)
        self.assertTrue(new_user.enable_marketing_emails)
    @override_settings(OPEN_REALM_CREATION=True, CORPORATE_ENABLED=False)
    def test_create_realm_without_prompting_for_marketing_emails(self) -> None:
        """With CORPORATE_ENABLED=False the registration form has no
        marketing emails input, and the new user has them disabled.
        """
        email = "user1@test.com"
        password = "test"
        realm_name = "Test"
        string_id = "zuliptest"

        # Precondition: the realm does not exist yet.
        with self.assertRaises(Realm.DoesNotExist):
            get_realm(string_id)

        # Step 1: submit the realm creation form.
        creation_response = self.submit_realm_creation_form(
            email, realm_subdomain=string_id, realm_name=realm_name
        )
        self.assertEqual(creation_response.status_code, 302)
        send_confirm_suffix = f"/accounts/new/send_confirm/?email={urllib.parse.quote(email)}&realm_name={urllib.parse.quote_plus(realm_name)}&realm_type=10&realm_subdomain={string_id}"
        self.assertTrue(creation_response["Location"].endswith(send_confirm_suffix))
        send_confirm_page = self.client_get(creation_response["Location"])
        self.assert_in_response("Check your email", send_confirm_page)

        # Step 2: follow the confirmation link from the email.
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        confirmation_page = self.client_get(confirmation_url)
        self.assertEqual(confirmation_page.status_code, 200)

        # Step 3: reproduce the initial POST that confirm-preregistration.js
        # makes when it triggers submit on confirm_preregistration.html.
        initial_payload = {
            "full_name": "",
            "key": find_key_by_email(email),
            "from_confirmation": "1",
        }
        registration_form = self.client_post("/realm/register/", initial_payload)
        # The form must not offer the marketing emails input.
        self.assert_not_in_success_response(
            ['input id="id_enable_marketing_emails"'], registration_form
        )

        # Step 4: submit the registration form and follow its redirect.
        registration = self.submit_reg_form_for_user(
            email,
            password,
            realm_subdomain=string_id,
            realm_name=realm_name,
        )
        self.assertEqual(registration.status_code, 302)
        login_redirect = self.client_get(registration["Location"], subdomain=string_id)
        self.assertEqual(login_redirect.status_code, 302)
        self.assertEqual(login_redirect["Location"], "http://zuliptest.testserver")

        # The realm exists and the user has marketing emails disabled.
        realm = get_realm(string_id)
        self.assertEqual(realm.string_id, string_id)
        new_user = get_user(email, realm)
        self.assertEqual(new_user.realm, realm)
        self.assertFalse(new_user.enable_marketing_emails)
    @override_settings(OPEN_REALM_CREATION=True)
    def test_create_realm_with_marketing_emails_disabled(self) -> None:
        """Registering with enable_marketing_emails=False leaves marketing
        emails disabled on the new user.
        """
        email = "user1@test.com"
        password = "test"
        realm_name = "Zulip test"
        string_id = "zuliptest"

        # Precondition: the realm does not exist yet.
        with self.assertRaises(Realm.DoesNotExist):
            get_realm(string_id)

        # Step 1: submit the realm creation form.
        creation_response = self.submit_realm_creation_form(
            email, realm_subdomain=string_id, realm_name=realm_name
        )
        self.assertEqual(creation_response.status_code, 302)
        send_confirm_suffix = f"/accounts/new/send_confirm/?email={urllib.parse.quote(email)}&realm_name={urllib.parse.quote_plus(realm_name)}&realm_type=10&realm_subdomain={string_id}"
        self.assertTrue(creation_response["Location"].endswith(send_confirm_suffix))
        send_confirm_page = self.client_get(creation_response["Location"])
        self.assert_in_response("Check your email", send_confirm_page)

        # Step 2: follow the confirmation link from the email.
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        confirmation_page = self.client_get(confirmation_url)
        self.assertEqual(confirmation_page.status_code, 200)

        # Step 3: register, opting out of marketing emails.
        registration = self.submit_reg_form_for_user(
            email,
            password,
            realm_subdomain=string_id,
            realm_name=realm_name,
            enable_marketing_emails=False,
        )
        self.assertEqual(registration.status_code, 302)
        login_redirect = self.client_get(registration["Location"], subdomain=string_id)
        self.assertEqual(login_redirect.status_code, 302)
        self.assertEqual(login_redirect["Location"], "http://zuliptest.testserver")

        # The realm exists and the user has marketing emails disabled.
        realm = get_realm(string_id)
        self.assertEqual(realm.string_id, string_id)
        new_user = get_user(email, realm)
        self.assertEqual(new_user.realm, realm)
        self.assertFalse(new_user.enable_marketing_emails)
    @override_settings(OPEN_REALM_CREATION=True)
    def test_create_regular_realm_welcome_bot_direct_message(self) -> None:
        """After creating a realm with the default organization type, the
        Welcome Bot direct message points to the getting started guide.
        """
        email = "user1@test.com"
        password = "test"
        realm_name = "Test"
        string_id = "zuliptest"

        # Step 1: submit the realm creation form.
        creation_response = self.submit_realm_creation_form(
            email, realm_subdomain=string_id, realm_name=realm_name
        )
        self.assertEqual(creation_response.status_code, 302)
        send_confirm_suffix = f"/accounts/new/send_confirm/?email={urllib.parse.quote(email)}&realm_name={urllib.parse.quote_plus(realm_name)}&realm_type=10&realm_subdomain={string_id}"
        self.assertTrue(creation_response["Location"].endswith(send_confirm_suffix))
        send_confirm_page = self.client_get(creation_response["Location"])
        self.assert_in_response("Check your email", send_confirm_page)

        # Step 2: follow the confirmation link from the email.
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        confirmation_page = self.client_get(confirmation_url)
        self.assertEqual(confirmation_page.status_code, 200)

        # Step 3: submit the registration form.
        registration = self.submit_reg_form_for_user(
            email,
            password,
            realm_subdomain=string_id,
            realm_name=realm_name,
            enable_marketing_emails=False,
        )
        self.assertEqual(registration.status_code, 302)

        # Inspect the most recent direct message sent by the Welcome Bot.
        welcome_bot_dms = Message.objects.filter(
            sender__email="welcome-bot@zulip.com", recipient__type=Recipient.PERSONAL
        )
        welcome_msg = welcome_bot_dms.latest("id")
        self.assertTrue(welcome_msg.content.startswith("Hello, and welcome to Zulip!"))

        # The organization type is neither education nor
        # education_nonprofit, and this is not a demo organization.
        self.assertIn("Getting started guide", welcome_msg.content)
        self.assertNotIn("Using Zulip for a class guide", welcome_msg.content)
        self.assertNotIn("demo organization", welcome_msg.content)
    @override_settings(OPEN_REALM_CREATION=True)
    def test_create_education_demo_organiztion_welcome_bot_direct_message(self) -> None:
        """After creating an education demo organization, the Welcome Bot
        direct message points to the class guide and mentions that the
        organization is a demo organization.
        """
        email = "user1@test.com"
        password = "test"
        realm_name = "Test"
        string_id = "zuliptest"
        education_type_id = Realm.ORG_TYPES["education"]["id"]

        # Step 1: submit the realm creation form with the education type.
        creation_response = self.submit_realm_creation_form(
            email,
            realm_subdomain=string_id,
            realm_name=realm_name,
            realm_type=education_type_id,
        )
        self.assertEqual(creation_response.status_code, 302)
        send_confirm_suffix = f"/accounts/new/send_confirm/?email={urllib.parse.quote(email)}&realm_name={urllib.parse.quote_plus(realm_name)}&realm_type=35&realm_subdomain={string_id}"
        self.assertTrue(creation_response["Location"].endswith(send_confirm_suffix))
        send_confirm_page = self.client_get(creation_response["Location"])
        self.assert_in_response("Check your email", send_confirm_page)

        # Step 2: follow the confirmation link from the email.
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        confirmation_page = self.client_get(confirmation_url)
        self.assertEqual(confirmation_page.status_code, 200)

        # Step 3: register as an education demo organization.
        registration = self.submit_reg_form_for_user(
            email,
            password,
            realm_subdomain=string_id,
            realm_name=realm_name,
            enable_marketing_emails=False,
            realm_type=education_type_id,
            is_demo_organization=True,
        )
        self.assertEqual(registration.status_code, 302)

        # Inspect the most recent direct message sent by the Welcome Bot.
        welcome_bot_dms = Message.objects.filter(
            sender__email="welcome-bot@zulip.com", recipient__type=Recipient.PERSONAL
        )
        welcome_msg = welcome_bot_dms.latest("id")
        self.assertTrue(welcome_msg.content.startswith("Hello, and welcome to Zulip!"))

        # The organization type is education and this is a demo organization.
        self.assertNotIn("Getting started guide", welcome_msg.content)
        self.assertIn("Using Zulip for a class guide", welcome_msg.content)
        self.assertIn("demo organization", welcome_msg.content)
    @override_settings(OPEN_REALM_CREATION=True, FREE_TRIAL_DAYS=30)
    def test_create_realm_during_free_trial(self) -> None:
        """With FREE_TRIAL_DAYS set, a newly registered realm creator is
        redirected to the upgrade page in onboarding mode.
        """
        email = "user1@test.com"
        password = "test"
        realm_name = "Test"
        string_id = "zuliptest"

        # Precondition: the realm does not exist yet.
        with self.assertRaises(Realm.DoesNotExist):
            get_realm(string_id)

        # Step 1: submit the realm creation form.
        creation_response = self.submit_realm_creation_form(
            email, realm_subdomain=string_id, realm_name=realm_name
        )
        self.assertEqual(creation_response.status_code, 302)
        send_confirm_suffix = f"/accounts/new/send_confirm/?email={urllib.parse.quote(email)}&realm_name={urllib.parse.quote_plus(realm_name)}&realm_type=10&realm_subdomain={string_id}"
        self.assertTrue(creation_response["Location"].endswith(send_confirm_suffix))
        send_confirm_page = self.client_get(creation_response["Location"])
        self.assert_in_response("Check your email", send_confirm_page)

        # Step 2: follow the confirmation link from the email.
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        confirmation_page = self.client_get(confirmation_url)
        self.assertEqual(confirmation_page.status_code, 200)

        # Step 3: submit the registration form.
        registration = self.submit_reg_form_for_user(
            email, password, realm_subdomain=string_id, realm_name=realm_name
        )
        self.assertEqual(registration.status_code, 302)

        # Step 4: the login redirect leads to the upgrade page.
        login_redirect = self.client_get(registration["Location"], subdomain=string_id)
        self.assertEqual(
            login_redirect["Location"], "http://zuliptest.testserver/upgrade/?onboarding=true"
        )
        upgrade_page = self.client_get(login_redirect["Location"], subdomain=string_id)
        self.assert_in_success_response(["Not ready to start your trial?"], upgrade_page)

        # The realm exists with the expected user, name and subdomain.
        realm = get_realm(string_id)
        self.assertEqual(realm.string_id, string_id)
        self.assertEqual(get_user(email, realm).realm, realm)
        self.assertEqual(realm.name, realm_name)
        self.assertEqual(realm.subdomain, string_id)
    @override_settings(OPEN_REALM_CREATION=True)
    def test_create_two_realms(self) -> None:
        """
        Generate two realm creation links for the same email up front,
        then use them to create two different realms, checking the
        behavior and the PreregistrationRealm handling along the way.
        """
        email = "user1@test.com"
        password = "test"
        first_string_id = "zuliptest"
        first_realm_name = "Test"
        second_string_id = "zuliptest2"
        second_realm_name = "Test"

        # Precondition: neither realm exists yet.
        with self.assertRaises(Realm.DoesNotExist):
            get_realm(first_string_id)
        with self.assertRaises(Realm.DoesNotExist):
            get_realm(second_string_id)

        # Generate the first realm creation link.
        first_creation = self.submit_realm_creation_form(
            email, realm_subdomain=first_string_id, realm_name=first_realm_name
        )
        self.assertEqual(first_creation.status_code, 302)
        first_suffix = f"/accounts/new/send_confirm/?email={urllib.parse.quote(email)}&realm_name={urllib.parse.quote_plus(first_realm_name)}&realm_type=10&realm_subdomain={first_string_id}"
        self.assertTrue(first_creation["Location"].endswith(first_suffix))
        first_confirm_page = self.client_get(first_creation["Location"])
        self.assert_in_response("Check your email", first_confirm_page)
        first_confirmation_url = self.get_confirmation_url_from_outbox(email)
        self.assertEqual(PreregistrationRealm.objects.filter(email=email, status=0).count(), 1)

        # Generate the second realm creation link.
        second_creation = self.submit_realm_creation_form(
            email, realm_subdomain=second_string_id, realm_name=second_realm_name
        )
        self.assertEqual(second_creation.status_code, 302)
        second_suffix = f"/accounts/new/send_confirm/?email={urllib.parse.quote(email)}&realm_name={urllib.parse.quote_plus(second_realm_name)}&realm_type=10&realm_subdomain={second_string_id}"
        self.assertTrue(second_creation["Location"].endswith(second_suffix))
        second_confirm_page = self.client_get(second_creation["Location"])
        self.assert_in_response("Check your email", second_confirm_page)
        second_confirmation_url = self.get_confirmation_url_from_outbox(email)

        # The links differ, and both PreregistrationRealm rows have status 0.
        self.assertNotEqual(first_confirmation_url, second_confirmation_url)
        self.assertEqual(PreregistrationRealm.objects.filter(email=email, status=0).count(), 2)

        # Use the first link to create the first realm.
        first_confirmation_page = self.client_get(first_confirmation_url)
        self.assertEqual(first_confirmation_page.status_code, 200)
        first_registration = self.submit_reg_form_for_user(
            email,
            password,
            realm_subdomain=first_string_id,
            realm_name=first_realm_name,
            # The confirmation key is the last path segment of the link.
            key=first_confirmation_url.split("/")[-1],
        )
        self.assertEqual(first_registration.status_code, 302)

        # The first realm exists.
        first_realm = get_realm(first_string_id)
        self.assertEqual(first_realm.string_id, first_string_id)
        self.assertEqual(first_realm.name, first_realm_name)

        # One of the two PreregistrationRealm rows has been used up.
        self.assertEqual(PreregistrationRealm.objects.filter(email=email, status=0).count(), 1)

        # Use the second link to create the second realm.
        second_confirmation_page = self.client_get(second_confirmation_url)
        self.assertEqual(second_confirmation_page.status_code, 200)
        second_registration = self.submit_reg_form_for_user(
            email,
            password,
            realm_subdomain=second_string_id,
            realm_name=second_realm_name,
            key=second_confirmation_url.split("/")[-1],
        )
        self.assertEqual(second_registration.status_code, 302)

        # The second realm exists.
        second_realm = get_realm(second_string_id)
        self.assertEqual(second_realm.string_id, second_string_id)
        self.assertEqual(second_realm.name, second_realm_name)

        # The remaining PreregistrationRealm row has been used up too.
        self.assertEqual(PreregistrationRealm.objects.filter(email=email, status=0).count(), 0)
    @override_settings(OPEN_REALM_CREATION=True)
    def test_invalid_email_signup(self) -> None:
        """The realm creation form rejects a malformed email address."""
        # NOTE(review): part of this method and the header of the following
        # one were lost (the text broke off right after `email="`), leaving
        # a syntax error. The address literal and the two remaining keyword
        # arguments below are reconstructed -- confirm against version control.
        result = self.submit_realm_creation_form(
            email="<foo", realm_subdomain="zuliptest", realm_name="Zulip test"
        )
        # NOTE(review): expected message assumed to match the check below -- confirm.
        self.assert_in_response("Please use your real email address.", result)
    # NOTE(review): the decorator and the method name below are
    # reconstructed; only the two body statements survived intact -- confirm.
    @override_settings(OPEN_REALM_CREATION=True)
    def test_mailinator_signup(self) -> None:
        """Posting a mailinator.com address to /new/ is rejected."""
        result = self.client_post("/new/", {"email": "hi@mailinator.com"})
        self.assert_in_response("Please use your real email address.", result)
    @override_settings(OPEN_REALM_CREATION=True)
    def test_subdomain_restrictions(self) -> None:
        """Each invalid subdomain is rejected with its expected error
        text, while the valid subdomain "a-0" can be registered.
        """
        email = "user1@test.com"
        password = "test"
        realm_name = "Test"

        # Maps each rejected subdomain to a fragment of its error message.
        rejected_subdomains = {
            "id": "length 3 or greater",
            "-id": "cannot start or end with a",
            "string-ID": "lowercase letters",
            "string_id": "lowercase letters",
            "stream": "unavailable",
            "streams": "unavailable",
            "about": "unavailable",
            "abouts": "unavailable",
            "zephyr": "unavailable",
        }
        for bad_subdomain, expected_error in rejected_subdomains.items():
            rejection = self.submit_realm_creation_form(
                email, realm_subdomain=bad_subdomain, realm_name=realm_name
            )
            self.assert_in_response(expected_error, rejection)

        # A valid subdomain goes through the whole flow.
        valid_subdomain = "a-0"
        creation_response = self.submit_realm_creation_form(
            email, realm_subdomain=valid_subdomain, realm_name=realm_name
        )
        self.client_get(creation_response["Location"])
        confirmation_url = self.get_confirmation_url_from_outbox(email)
        self.client_get(confirmation_url)

        registration = self.submit_reg_form_for_user(
            email, password, realm_subdomain=valid_subdomain, realm_name=realm_name
        )
        self.assertEqual(registration.status_code, 302)
        self.assertTrue(
            registration["Location"].startswith("http://a-0.testserver/accounts/login/subdomain/")
        )
    @override_settings(OPEN_REALM_CREATION=True)
    def test_create_realm_using_old_subdomain_of_a_realm(self) -> None:
        """
        After a realm has been moved to a new subdomain, its previous
        subdomain must still be reported as unavailable for new realms.
        """
        realm = get_realm("zulip")
        # Remember the subdomain before the rename so that the creation
        # request below targets the *old* subdomain, which is what this
        # test is about, rather than an unrelated name.
        old_subdomain = realm.string_id
        do_change_realm_subdomain(realm, "new-name", acting_user=None)
        email = "user1@test.com"
        result = self.submit_realm_creation_form(
            email, realm_subdomain=old_subdomain, realm_name="Test"
        )
        self.assert_in_response("Subdomain unavailable. Please choose a different one.", result)
    @override_settings(OPEN_REALM_CREATION=True)
    def test_subdomain_restrictions_root_domain(self) -> None:
        """
        An empty subdomain (the root domain) is refused while
        ROOT_DOMAIN_LANDING_PAGE is enabled and accepted otherwise.
        """
        signup_email = "user1@test.com"
        signup_password = "test"
        org_name = "Test"
        root_subdomain = ""

        # With ROOT_DOMAIN_LANDING_PAGE set, the root domain is unavailable.
        with self.settings(ROOT_DOMAIN_LANDING_PAGE=True):
            response = self.submit_realm_creation_form(
                signup_email, realm_subdomain=root_subdomain, realm_name=org_name
            )
            self.assert_in_response("unavailable", response)

        # Without that setting, the same request completes.
        response = self.submit_realm_creation_form(
            signup_email, realm_subdomain=root_subdomain, realm_name=org_name
        )
        self.client_get(response["Location"])
        self.client_get(self.get_confirmation_url_from_outbox(signup_email))

        response = self.submit_reg_form_for_user(
            signup_email,
            signup_password,
            realm_subdomain=root_subdomain,
            realm_name=org_name,
        )
        self.assertEqual(response.status_code, 302)
        login_prefix = "http://testserver/accounts/login/subdomain/"
        self.assertTrue(response["Location"].startswith(login_prefix))
    @override_settings(OPEN_REALM_CREATION=True)
    def test_subdomain_restrictions_root_domain_option(self) -> None:
        """
        Same as the root-domain test, but the root domain is requested via
        realm_in_root_domain="true" alongside a non-empty subdomain.
        """
        signup_email = "user1@test.com"
        signup_password = "test"
        org_name = "Test"
        typed_subdomain = "abcdef"

        # With ROOT_DOMAIN_LANDING_PAGE set, the root domain is unavailable.
        with self.settings(ROOT_DOMAIN_LANDING_PAGE=True):
            response = self.submit_realm_creation_form(
                signup_email,
                realm_subdomain=typed_subdomain,
                realm_name=org_name,
                realm_in_root_domain="true",
            )
            self.assert_in_response("unavailable", response)

        # Without that setting, the same request completes.
        response = self.submit_realm_creation_form(
            signup_email,
            realm_subdomain=typed_subdomain,
            realm_name=org_name,
            realm_in_root_domain="true",
        )
        self.client_get(response["Location"])
        self.client_get(self.get_confirmation_url_from_outbox(signup_email))

        response = self.submit_reg_form_for_user(
            signup_email,
            signup_password,
            realm_subdomain=typed_subdomain,
            realm_in_root_domain="true",
            realm_name=org_name,
        )
        self.assertEqual(response.status_code, 302)
        # The redirect goes to the root domain, not to "abcdef".
        login_prefix = "http://testserver/accounts/login/subdomain/"
        self.assertTrue(response["Location"].startswith(login_prefix))
    def test_is_root_domain_available(self) -> None:
        """
        is_root_domain_available() is true by default, and false when
        ROOT_DOMAIN_LANDING_PAGE is set or a realm already has the
        root-domain string_id.
        """
        # Default test configuration: available.
        self.assertTrue(is_root_domain_available())

        # The landing-page setting makes it unavailable.
        with self.settings(ROOT_DOMAIN_LANDING_PAGE=True):
            self.assertFalse(is_root_domain_available())

        # So does an existing realm stored under the root-domain string_id.
        zulip_realm = get_realm("zulip")
        zulip_realm.string_id = Realm.SUBDOMAIN_FOR_ROOT_DOMAIN
        zulip_realm.save()
        self.assertFalse(is_root_domain_available())
    def test_subdomain_check_api(self) -> None:
        """
        Exercise GET /json/realm/subdomain/<name> for a taken name, a
        malformed name, the SOCIAL_AUTH_SUBDOMAIN, and a free name.
        """
        unavailable_msg = "Subdomain unavailable. Please choose a different one."
        bad_chars_msg = "Subdomain can only have lowercase letters, numbers, and '-'s."

        # Subdomain of an existing realm.
        response = self.client_get("/json/realm/subdomain/zulip")
        self.assert_in_success_response([unavailable_msg], response)

        # Underscores are not permitted.
        response = self.client_get("/json/realm/subdomain/zu_lip")
        self.assert_in_success_response([bad_chars_msg], response)

        # The configured social-auth subdomain is unavailable.
        with self.settings(SOCIAL_AUTH_SUBDOMAIN="zulipauth"):
            response = self.client_get("/json/realm/subdomain/zulipauth")
            self.assert_in_success_response([unavailable_msg], response)

        # An unused, well-formed subdomain is available.
        response = self.client_get("/json/realm/subdomain/hufflepuff")
        self.assert_in_success_response(["available"], response)
        self.assert_not_in_success_response(["unavailable"], response)
    def test_subdomain_check_management_command(self) -> None:
        """
        check_subdomain_available() rejects short, malformed, and existing
        subdomains regardless of allow_reserved_subdomain; reserved names
        pass only when that flag is set.
        """
        # Too-short ("aa") and malformed ("-ba_d-") names fail with or
        # without the flag.
        for always_invalid in ("aa", "-ba_d-"):
            with self.assertRaises(ValidationError):
                check_subdomain_available(always_invalid)
            with self.assertRaises(ValidationError):
                check_subdomain_available(always_invalid, allow_reserved_subdomain=True)

        # With the reserved-name check stubbed out, an existing realm's
        # subdomain is still rejected.
        reserved_check = patch(
            "zerver.lib.name_restrictions.is_reserved_subdomain", return_value=False
        )
        with reserved_check:
            with self.assertRaises(ValidationError):
                check_subdomain_available("zulip")
            with self.assertRaises(ValidationError):
                check_subdomain_available("zulip", allow_reserved_subdomain=True)

        # A reserved name is rejected by default but accepted with the flag.
        with self.assertRaises(ValidationError):
            check_subdomain_available("stream")
        check_subdomain_available("stream", allow_reserved_subdomain=True)
class UserSignUpTest(ZulipTestCase):
    def _assert_redirected_to(self, result: "TestHttpResponse", url: str) -> None:
        """Assert that `result` is a 302 whose LOCATION header equals `url`."""
        redirect_status = 302
        self.assertEqual(result.status_code, redirect_status)
        redirect_target = result["LOCATION"]
        self.assertEqual(redirect_target, url)
    def verify_signup(
        self,
        *,
        email: str = "newguy@zulip.com",
        password: Optional[str] = "newpassword",
        full_name: str = "New user's name",
        realm: Optional[Realm] = None,
        subdomain: Optional[str] = None,
    ) -> Union[UserProfile, "TestHttpResponse"]:
        """Shared helper that walks through the whole signup flow.

        The goal is for every signup test to go through this function
        instead of duplicating the flow; getting there will probably
        require adding more parameters.

        Returns the newly created UserProfile on success.  If the final
        registration POST answers with a 200 rather than a redirect, that
        response is returned instead so the caller can inspect it.
        """
        if realm is None:  # nocoverage
            realm = get_realm("zulip")

        # Only forward `subdomain` to the test client when one was given.
        request_kwargs: Dict[str, Any] = {"subdomain": subdomain} if subdomain else {}

        # Request signup; we should be sent to the "check your email" page.
        response = self.client_post("/accounts/home/", {"email": email}, **request_kwargs)
        self.assertEqual(response.status_code, 302)
        confirm_path = f"/accounts/send_confirm/?email={urllib.parse.quote(email)}"
        self.assertTrue(response["Location"].endswith(confirm_path))
        response = self.client_get(response["Location"], **request_kwargs)
        self.assert_in_response("Check your email", response)

        # Follow the confirmation link from the outgoing email.
        confirmation_link = self.get_confirmation_url_from_outbox(email)
        response = self.client_get(confirmation_link, **request_kwargs)
        self.assertEqual(response.status_code, 200)

        # Choose a password and accept the ToS.  On success this creates
        # the account, logs us in, and redirects to the app.
        response = self.submit_reg_form_for_user(
            email, password, full_name=full_name, **request_kwargs
        )
        if response.status_code == 200:
            # A 200 here usually means the form came back with an error.
            # Many tests expect exactly that and want to assert on the
            # error content, so hand the response back to the caller.
            return response

        # We must have been redirected to the app...
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response["Location"], f"{realm.uri}/")

        # ...and be logged in as the freshly created user.
        created_user = get_user_by_delivery_email(email, realm)
        self.assert_logged_in_user_id(created_user.id)
        return created_user
    def test_bad_email_configuration_for_accounts_home(self) -> None:
        """
        When sending the confirmation email raises EmailNotDeliveredError,
        /accounts/home/ redirects to the SMTP config-error page and logs
        an error.
        """
        new_email = self.nonreg_email("newguy")

        failing_send = patch(
            "zerver.views.registration.send_confirm_registration_email",
            side_effect=EmailNotDeliveredError,
        )
        with failing_send, self.assertLogs(level="ERROR") as error_logs:
            response = self.client_post("/accounts/home/", {"email": new_email})

        self._assert_redirected_to(response, "/config-error/smtp")
        expected_log = ["ERROR:root:Error in accounts_home"]
        self.assertEqual(error_logs.output, expected_log)
    def test_bad_email_configuration_for_create_realm(self) -> None:
        """
        When sending the confirmation email raises EmailNotDeliveredError,
        the realm creation form redirects to the SMTP config-error page
        and logs an error.
        """
        new_email = self.nonreg_email("newguy")

        failing_send = patch(
            "zerver.views.registration.send_confirm_registration_email",
            side_effect=EmailNotDeliveredError,
        )
        with failing_send, self.assertLogs(level="ERROR") as error_logs:
            response = self.submit_realm_creation_form(
                new_email, realm_subdomain="zuliptest", realm_name="Zulip test"
            )

        self._assert_redirected_to(response, "/config-error/smtp")
        expected_log = ["ERROR:root:Error in create_realm"]
        self.assertEqual(error_logs.output, expected_log)
    def test_user_default_language_and_timezone(self) -> None:
        """
        Check that a new user's default language follows the browser's
        Accept-Language header rather than the realm default, and that
        the timezone submitted with the registration form is stored.
        """
        new_email = self.nonreg_email("newguy")
        new_password = "newpassword"
        chosen_timezone = "America/Denver"
        zulip_realm = get_realm("zulip")
        do_set_realm_property(zulip_realm, "default_language", "de", acting_user=None)

        response = self.client_post("/accounts/home/", {"email": new_email})
        self.assertEqual(response.status_code, 302)
        confirm_path = f"/accounts/send_confirm/?email={urllib.parse.quote(new_email)}"
        self.assertTrue(response["Location"].endswith(confirm_path))
        response = self.client_get(response["Location"])
        self.assert_in_response("Check your email", response)

        # Follow the confirmation link.
        response = self.client_get(self.get_confirmation_url_from_outbox(new_email))
        self.assertEqual(response.status_code, 200)

        # Choose a password and accept the ToS, with a French-first browser.
        response = self.submit_reg_form_for_user(
            new_email,
            new_password,
            timezone=chosen_timezone,
            HTTP_ACCEPT_LANGUAGE="fr,en;q=0.9",
        )
        self.assertEqual(response.status_code, 302)

        created_user = self.nonreg_user("newguy")
        self.assertNotEqual(created_user.default_language, zulip_realm.default_language)
        self.assertEqual(created_user.default_language, "fr")
        self.assertEqual(created_user.timezone, chosen_timezone)

        from django.core.mail import outbox

        # Discard the most recent outgoing email.
        outbox.pop()
    def test_default_language_with_unsupported_browser_locale(self) -> None:
        """
        When the browser sends the locale "en-IND", the new user gets the
        realm's default language.
        """
        new_email = self.nonreg_email("newguy")
        new_password = "newpassword"
        zulip_realm = get_realm("zulip")
        do_set_realm_property(zulip_realm, "default_language", "de", acting_user=None)

        response = self.client_post("/accounts/home/", {"email": new_email})
        self.assertEqual(response.status_code, 302)
        confirm_path = f"/accounts/send_confirm/?email={urllib.parse.quote(new_email)}"
        self.assertTrue(response["Location"].endswith(confirm_path))
        response = self.client_get(response["Location"])
        self.assert_in_response("Check your email", response)

        # Follow the confirmation link.
        response = self.client_get(self.get_confirmation_url_from_outbox(new_email))
        self.assertEqual(response.status_code, 200)

        # Choose a password and accept the ToS.
        response = self.submit_reg_form_for_user(
            new_email, new_password, HTTP_ACCEPT_LANGUAGE="en-IND"
        )
        self.assertEqual(response.status_code, 302)

        created_user = self.nonreg_user("newguy")
        self.assertEqual(created_user.default_language, zulip_realm.default_language)

        from django.core.mail import outbox

        # Discard the most recent outgoing email.
        outbox.pop()
    def test_default_twenty_four_hour_time(self) -> None:
        """
        Check that a new user's twenty_four_hour_time setting matches the
        realm's user-default value for that setting.
        """
        new_email = self.nonreg_email("newguy")
        new_password = "newpassword"
        zulip_realm = get_realm("zulip")

        # Turn the realm-level default on before anyone signs up.
        defaults_before = RealmUserDefault.objects.get(realm=zulip_realm)
        do_set_realm_user_default_setting(
            defaults_before, "twenty_four_hour_time", True, acting_user=None
        )

        response = self.client_post("/accounts/home/", {"email": new_email})
        self.assertEqual(response.status_code, 302)
        confirm_path = f"/accounts/send_confirm/?email={urllib.parse.quote(new_email)}"
        self.assertTrue(response["Location"].endswith(confirm_path))
        response = self.client_get(response["Location"])
        self.assert_in_response("Check your email", response)

        # Follow the confirmation link.
        response = self.client_get(self.get_confirmation_url_from_outbox(new_email))
        self.assertEqual(response.status_code, 200)

        response = self.submit_reg_form_for_user(new_email, new_password)
        self.assertEqual(response.status_code, 302)

        created_user = self.nonreg_user("newguy")
        # Re-read the defaults from the database before comparing.
        defaults_after = RealmUserDefault.objects.get(realm=zulip_realm)
        self.assertEqual(
            created_user.twenty_four_hour_time, defaults_after.twenty_four_hour_time
        )
    def test_email_address_visibility_for_new_user(self) -> None:
        """
        The email_address_visibility value chosen during signup overrides
        the realm-level user default.
        """
        new_email = self.nonreg_email("newguy")
        new_password = "newpassword"
        zulip_realm = get_realm("zulip")

        # Precondition: the realm default is "admins".
        realm_defaults = RealmUserDefault.objects.get(realm=zulip_realm)
        self.assertEqual(
            realm_defaults.email_address_visibility,
            UserProfile.EMAIL_ADDRESS_VISIBILITY_ADMINS,
        )

        response = self.client_post("/accounts/home/", {"email": new_email})
        self.assertEqual(response.status_code, 302)
        confirm_path = f"/accounts/send_confirm/?email={urllib.parse.quote(new_email)}"
        self.assertTrue(response["Location"].endswith(confirm_path))
        response = self.client_get(response["Location"])
        self.assert_in_response("Check your email", response)

        # Follow the confirmation link.
        response = self.client_get(self.get_confirmation_url_from_outbox(new_email))
        self.assertEqual(response.status_code, 200)

        # Choose a password and accept the ToS, asking for "nobody".
        response = self.submit_reg_form_for_user(
            new_email,
            new_password,
            email_address_visibility=UserProfile.EMAIL_ADDRESS_VISIBILITY_NOBODY,
        )
        self.assertEqual(response.status_code, 302)

        # The value passed during signup wins over the realm-level default.
        created_user = self.nonreg_user("newguy")
        self.assertEqual(
            created_user.email_address_visibility,
            UserProfile.EMAIL_ADDRESS_VISIBILITY_NOBODY,
        )

        from django.core.mail import outbox

        # Discard the most recent outgoing email.
        outbox.pop()
    def test_signup_already_active(self) -> None:
        """
        Signing up with the email of an existing active user redirects to
        a login page saying the user has already registered.
        """
        existing_email = self.example_email("hamlet")

        response = self.client_post("/accounts/home/", {"email": existing_email})
        self.assertEqual(response.status_code, 302)
        login_url = response["Location"]
        self.assertIn("login", login_url)

        response = self.client_get(login_url)
        self.assert_in_response("You've already registered", response)
    def test_signup_system_bot(self) -> None:
        """
        Signing up on the "lear" subdomain with the notification bot's
        email address redirects to a login page with an error.
        """
        bot_email = "notification-bot@zulip.com"

        response = self.client_post("/accounts/home/", {"email": bot_email}, subdomain="lear")
        self.assertEqual(response.status_code, 302)
        login_url = response["Location"]
        self.assertIn("login", login_url)

        response = self.client_get(login_url)
        # Not really the right message for this case, but it is at least an error.
        self.assert_in_response("You've already registered", response)
    def test_signup_existing_email(self) -> None:
        """
        Signing up in the "lear" realm with an email that already has an
        account in another realm succeeds, leaving two accounts with that
        delivery email.
        """
        shared_email = self.example_email("hamlet")
        lear_realm = get_realm("lear")
        self.verify_signup(email=shared_email, realm=lear_realm, subdomain="lear")

        accounts_with_email = UserProfile.objects.filter(delivery_email=shared_email)
        self.assertEqual(accounts_with_email.count(), 2)
    def test_signup_invalid_name(self) -> None:
        """
        Check if an invalid name during signup is handled properly.
        """
        # The name must contain disallowed characters to reach the
        # "Invalid characters" error; an empty name is a different case,
        # covered by test_signup_without_full_name.
        result = self.verify_signup(full_name="<invalid>")
        # _WSGIPatchedWSGIResponse does not exist in Django, thus the inverted isinstance check.
        assert not isinstance(result, UserProfile)
        self.assert_in_success_response(["Invalid characters in name!"], result)
        # Verify that the user is asked for name and password
        self.assert_in_success_response(["id_password", "id_full_name"], result)
    def test_signup_without_password(self) -> None:
        """
        Signing up with no password succeeds when password_auth_enabled
        is patched to return False.
        """
        new_email = self.nonreg_email("newuser")

        password_auth_off = patch(
            "zerver.views.registration.password_auth_enabled", return_value=False
        )
        with password_auth_off:
            signup_result = self.verify_signup(email=new_email, password=None)

        assert isinstance(signup_result, UserProfile)
        # The new user should now be logged in.
        self.assert_logged_in_user_id(signup_result.id)
    def test_signup_without_full_name(self) -> None:
        """
        Signing up with an empty full name brings the user back to the
        registration form instead of creating the account.
        """
        signup_result = self.verify_signup(
            email="newguy@zulip.com", password="newpassword", full_name=""
        )
        # _WSGIPatchedWSGIResponse does not exist in Django, thus the inverted isinstance check.
        assert not isinstance(signup_result, UserProfile)

        form_heading = "Enter your account details to complete registration."
        self.assert_in_success_response([form_heading], signup_result)
        # The form still asks for both a name and a password.
        self.assert_in_success_response(["id_password", "id_full_name"], signup_result)
    def test_signup_email_message_contains_org_header(self) -> None:
        """
        The signup confirmation email carries a List-Id header that
        identifies the organization.
        """
        email = "newguy@zulip.com"
        result = self.client_post("/accounts/home/", {"email": email})
        self.assertEqual(result.status_code, 302)
        self.assertTrue(
            result["Location"].endswith(
                f"/accounts/send_confirm/?email={urllib.parse.quote(email)}"
            )
        )
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email", result)
        from django.core.mail import outbox

        # The header is the organization name followed by its host in
        # angle brackets.
        self.assertEqual(outbox[0].extra_headers["List-Id"], "Zulip Dev <zulip.testserver>")
    def test_signup_with_full_name(self) -> None:
        """
        Posting the registration form with a full name and
        from_confirmation set renders the account-details page.
        """
        new_email = "newguy@zulip.com"
        new_password = "newpassword"

        response = self.client_post("/accounts/home/", {"email": new_email})
        self.assertEqual(response.status_code, 302)
        confirm_path = f"/accounts/send_confirm/?email={urllib.parse.quote(new_email)}"
        self.assertTrue(response["Location"].endswith(confirm_path))
        response = self.client_get(response["Location"])
        self.assert_in_response("Check your email", response)

        # Follow the confirmation link.
        response = self.client_get(self.get_confirmation_url_from_outbox(new_email))
        self.assertEqual(response.status_code, 200)

        registration_data = {
            "password": new_password,
            "key": find_key_by_email(new_email),
            "terms": True,
            "full_name": "New Guy",
            "from_confirmation": "1",
        }
        response = self.client_post("/accounts/register/", registration_data)
        form_heading = "Enter your account details to complete registration."
        self.assert_in_success_response([form_heading], response)
    def test_signup_with_weak_password(self) -> None:
        """
        With PASSWORD_MIN_LENGTH and PASSWORD_MIN_GUESSES configured, the
        password "easy" is rejected as too weak and no account is created.
        """
        new_email = "newguy@zulip.com"
        weak_password = "easy"

        response = self.client_post("/accounts/home/", {"email": new_email})
        self.assertEqual(response.status_code, 302)
        confirm_path = f"/accounts/send_confirm/?email={urllib.parse.quote(new_email)}"
        self.assertTrue(response["Location"].endswith(confirm_path))
        response = self.client_get(response["Location"])
        self.assert_in_response("Check your email", response)

        # Follow the confirmation link.
        response = self.client_get(self.get_confirmation_url_from_outbox(new_email))
        self.assertEqual(response.status_code, 200)

        with self.settings(PASSWORD_MIN_LENGTH=6, PASSWORD_MIN_GUESSES=1000):
            # Arriving from the confirmation link only renders the form.
            registration_data = {
                "password": weak_password,
                "key": find_key_by_email(new_email),
                "terms": True,
                "full_name": "New Guy",
                "from_confirmation": "1",
            }
            response = self.client_post("/accounts/register/", registration_data)
            form_heading = "Enter your account details to complete registration."
            self.assert_in_success_response([form_heading], response)

            # Submitting the form itself reports the weak password.
            response = self.submit_reg_form_for_user(
                new_email, weak_password, full_name="New Guy"
            )
            self.assert_in_success_response(["The password is too weak."], response)

            # No account was created.
            with self.assertRaises(UserProfile.DoesNotExist):
                get_user(new_email, get_realm("zulip"))
    def test_signup_with_default_stream_group(self) -> None:
        """
        A user who picks a default stream group at signup is subscribed to
        that group's streams in addition to the realm's default streams.
        """
        new_email = self.nonreg_email("newguy")
        new_password = "newpassword"
        zulip_realm = get_realm("zulip")

        response = self.client_post("/accounts/home/", {"email": new_email})
        self.assertEqual(response.status_code, 302)
        response = self.client_get(response["Location"])
        response = self.client_get(self.get_confirmation_url_from_outbox(new_email))
        self.assertEqual(response.status_code, 200)

        # The realm starts with exactly one default stream, "Verona".
        initial_defaults = DefaultStream.objects.filter(realm=zulip_realm)
        self.assert_length(initial_defaults, 1)
        self.assertEqual(initial_defaults[0].stream.name, "Verona")
        expected_defaults = [initial_defaults[0].stream]

        # Add two more default streams.
        for extra_name in ["venice", "rome"]:
            extra_stream = get_stream(extra_name, zulip_realm)
            do_add_default_stream(extra_stream)
            expected_defaults.append(extra_stream)

        # Build a default stream group from two other streams.
        group_streams = [get_stream(name, zulip_realm) for name in ["scotland", "denmark"]]
        do_create_default_stream_group(
            zulip_realm, "group 1", "group 1 description", group_streams
        )

        self.submit_reg_form_for_user(
            new_email, new_password, default_stream_groups=["group 1"]
        )
        self.check_user_subscribed_only_to_streams("newguy", expected_defaults + group_streams)
    def test_signup_two_confirmation_links(self) -> None:
        """
        Requesting signup twice yields two distinct confirmation links.
        After the account has been registered, the first link and its key
        both answer with a 404.
        """
        new_email = self.nonreg_email("newguy")
        new_password = "newpassword"

        # First signup request: keep both the link and its key.
        response = self.client_post("/accounts/home/", {"email": new_email})
        self.assertEqual(response.status_code, 302)
        response = self.client_get(response["Location"])
        first_link = self.get_confirmation_url_from_outbox(new_email)
        first_key = find_key_by_email(new_email)

        # Second signup request for the same address.
        response = self.client_post("/accounts/home/", {"email": new_email})
        self.assertEqual(response.status_code, 302)
        response = self.client_get(response["Location"])
        second_link = self.get_confirmation_url_from_outbox(new_email)

        # Sanity check: the two links differ.
        self.assertNotEqual(first_link, second_link)

        # Register the account (this will use the second confirmation url).
        response = self.submit_reg_form_for_user(
            new_email, new_password, full_name="New Guy", from_confirmation="1"
        )
        form_heading = "Enter your account details to complete registration."
        self.assert_in_success_response([form_heading, "New Guy", new_email], response)
        response = self.submit_reg_form_for_user(new_email, new_password, full_name="New Guy")
        created_user = UserProfile.objects.get(delivery_email=new_email)
        self.assertEqual(created_user.delivery_email, new_email)

        # Now try to register using the first confirmation url.
        response = self.client_get(first_link)
        self.assertEqual(response.status_code, 404)
        stale_registration = {
            "password": new_password,
            "key": first_key,
            "terms": True,
            "full_name": "New Guy",
            "from_confirmation": "1",
        }
        response = self.client_post("/accounts/register/", stale_registration)

        # An error page should be displayed.
        self.assertEqual(response.status_code, 404)
        self.assert_in_response(
            "Whoops. The confirmation link has expired or been deactivated.", response
        )
    def test_signup_with_multiple_default_stream_groups(self) -> None:
        """
        A user who picks several default stream groups at signup is
        subscribed to the streams of all of them plus the realm's default
        streams, even when the groups overlap.
        """
        new_email = self.nonreg_email("newguy")
        new_password = "newpassword"
        zulip_realm = get_realm("zulip")

        response = self.client_post("/accounts/home/", {"email": new_email})
        self.assertEqual(response.status_code, 302)
        response = self.client_get(response["Location"])
        response = self.client_get(self.get_confirmation_url_from_outbox(new_email))
        self.assertEqual(response.status_code, 200)

        # Replace the realm's default streams with a known pair.
        DefaultStream.objects.filter(realm=zulip_realm).delete()
        expected_defaults = []
        for default_name in ["venice", "verona"]:
            default_stream = get_stream(default_name, zulip_realm)
            do_add_default_stream(default_stream)
            expected_defaults.append(default_stream)

        # Two groups that share the "scotland" stream.
        first_group = [get_stream(name, zulip_realm) for name in ["scotland", "denmark"]]
        do_create_default_stream_group(
            zulip_realm, "group 1", "group 1 description", first_group
        )
        second_group = [get_stream(name, zulip_realm) for name in ["scotland", "rome"]]
        do_create_default_stream_group(
            zulip_realm, "group 2", "group 2 description", second_group
        )

        self.submit_reg_form_for_user(
            new_email, new_password, default_stream_groups=["group 1", "group 2"]
        )

        # De-duplicate, since "scotland" appears in both groups.
        expected_streams = list(set(expected_defaults + first_group + second_group))
        self.check_user_subscribed_only_to_streams("newguy", expected_streams)
    def test_signup_without_user_settings_from_another_realm(self) -> None:
        """
        Registering in "lear" with an empty source_realm_id does not copy
        settings from the user's existing account in the Zulip realm.
        """
        zulip_hamlet = self.example_user("hamlet")
        shared_email = zulip_hamlet.delivery_email
        new_password = "newpassword"
        lear_subdomain = "lear"
        lear_realm = get_realm("lear")

        # Customize the Zulip-realm account; none of this should carry over.
        zulip_hamlet.left_side_userlist = True
        zulip_hamlet.default_language = "de"
        zulip_hamlet.emojiset = "twitter"
        zulip_hamlet.high_contrast_mode = True
        zulip_hamlet.enter_sends = True
        zulip_hamlet.tutorial_status = UserProfile.TUTORIAL_FINISHED
        zulip_hamlet.email_address_visibility = UserProfile.EMAIL_ADDRESS_VISIBILITY_EVERYONE
        zulip_hamlet.save()

        response = self.client_post(
            "/accounts/home/", {"email": shared_email}, subdomain=lear_subdomain
        )
        self.assertEqual(response.status_code, 302)
        response = self.client_get(response["Location"], subdomain=lear_subdomain)
        confirmation_link = self.get_confirmation_url_from_outbox(shared_email)
        response = self.client_get(confirmation_link, subdomain=lear_subdomain)
        self.assertEqual(response.status_code, 200)

        # An empty source_realm_id means "do not import settings".
        response = self.submit_reg_form_for_user(
            shared_email,
            new_password,
            source_realm_id="",
            HTTP_HOST=lear_subdomain + ".testserver",
        )

        # The new account keeps the values this test expects when nothing
        # is copied over.
        lear_hamlet = get_user(self.example_email("hamlet"), lear_realm)
        self.assertEqual(lear_hamlet.left_side_userlist, False)
        self.assertEqual(lear_hamlet.default_language, "en")
        self.assertEqual(lear_hamlet.emojiset, "google")
        self.assertEqual(lear_hamlet.high_contrast_mode, False)
        self.assertEqual(lear_hamlet.enable_stream_audible_notifications, False)
        self.assertEqual(lear_hamlet.enter_sends, False)
        self.assertEqual(lear_hamlet.tutorial_status, UserProfile.TUTORIAL_WAITING)
    def test_signup_with_user_settings_from_another_realm(self) -> None:
        """
        Registering in "lear" with source_realm_id set to the Zulip realm
        copies the existing account's settings and avatar, while the
        email_address_visibility passed at signup is used for the new
        account.
        """
        zulip_hamlet = self.example_user("hamlet")
        shared_email = zulip_hamlet.delivery_email
        new_password = "newpassword"
        lear_subdomain = "lear"
        lear_realm = get_realm("lear")

        # Upload an avatar for the Zulip-realm account.
        self.login("hamlet")
        with get_test_image_file("img.png") as avatar_file:
            self.client_post("/json/users/me/avatar", {"file": avatar_file})
        zulip_hamlet.refresh_from_db()

        # Customize the Zulip-realm account's settings.
        zulip_hamlet.left_side_userlist = True
        zulip_hamlet.default_language = "de"
        zulip_hamlet.emojiset = "twitter"
        zulip_hamlet.high_contrast_mode = True
        zulip_hamlet.enter_sends = True
        zulip_hamlet.tutorial_status = UserProfile.TUTORIAL_FINISHED
        zulip_hamlet.email_address_visibility = UserProfile.EMAIL_ADDRESS_VISIBILITY_EVERYONE
        zulip_hamlet.save()

        # The following requests go to another subdomain, so log out first
        # to avoid polluting the session in the test environment by still
        # being logged in.
        self.logout()

        response = self.client_post(
            "/accounts/home/", {"email": shared_email}, subdomain=lear_subdomain
        )
        self.assertEqual(response.status_code, 302)
        response = self.client_get(response["Location"], subdomain=lear_subdomain)
        confirmation_link = self.get_confirmation_url_from_outbox(shared_email)
        response = self.client_get(confirmation_link, subdomain=lear_subdomain)
        self.assertEqual(response.status_code, 200)

        # The registration form offers to import from the existing account.
        confirmation_data = {
            "password": new_password,
            "key": find_key_by_email(shared_email),
            "from_confirmation": "1",
        }
        response = self.client_post(
            "/accounts/register/", confirmation_data, subdomain=lear_subdomain
        )
        expected_form_snippets = [
            "Import settings from existing Zulip account",
            "selected >\n                                Zulip Dev",
            "Enter your account details to complete registration.",
        ]
        self.assert_in_success_response(expected_form_snippets, response)

        response = self.submit_reg_form_for_user(
            shared_email,
            new_password,
            source_realm_id=str(zulip_hamlet.realm.id),
            HTTP_HOST=lear_subdomain + ".testserver",
            email_address_visibility=UserProfile.EMAIL_ADDRESS_VISIBILITY_NOBODY,
        )

        # Settings were copied from the Zulip-realm account...
        lear_hamlet = get_user_by_delivery_email(shared_email, lear_realm)
        self.assertEqual(lear_hamlet.left_side_userlist, True)
        self.assertEqual(lear_hamlet.default_language, "de")
        self.assertEqual(lear_hamlet.emojiset, "twitter")
        self.assertEqual(lear_hamlet.high_contrast_mode, True)
        self.assertEqual(lear_hamlet.enter_sends, True)
        self.assertEqual(lear_hamlet.enable_stream_audible_notifications, False)
        self.assertEqual(lear_hamlet.tutorial_status, UserProfile.TUTORIAL_FINISHED)
        # ...except the visibility, which comes from the signup form.
        self.assertEqual(
            lear_hamlet.email_address_visibility,
            UserProfile.EMAIL_ADDRESS_VISIBILITY_NOBODY,
        )

        # The avatar file was copied as well.
        with open(avatar_disk_path(zulip_hamlet), "rb") as zulip_avatar:
            zulip_avatar_bytes = zulip_avatar.read()
        with open(avatar_disk_path(lear_hamlet), "rb") as lear_avatar:
            lear_avatar_bytes = lear_avatar.read()
        self.assertGreater(len(zulip_avatar_bytes), 500)
        self.assertEqual(zulip_avatar_bytes, lear_avatar_bytes)
    def test_signup_invalid_subdomain(self) -> None:
        """
        Verify that a registration whose authentication step reports an
        invalid subdomain logs an error and answers with a redirect.
        """
        signup_email = "newuser@zulip.com"
        signup_password = "newpassword"

        # Start the signup flow; we should be redirected to the
        # "confirmation sent" page for this address.
        response = self.client_post("/accounts/home/", {"email": signup_email})
        self.assertEqual(response.status_code, 302)
        expected_suffix = f"/accounts/send_confirm/?email={urllib.parse.quote(signup_email)}"
        self.assertTrue(response["Location"].endswith(expected_suffix))
        response = self.client_get(response["Location"])
        self.assert_in_response("Check your email", response)

        # Follow the confirmation link that was emailed to the user.
        confirmation_link = self.get_confirmation_url_from_outbox(signup_email)
        response = self.client_get(confirmation_link)
        self.assertEqual(response.status_code, 200)

        def fake_authenticate(**kwargs: Any) -> Any:
            # Stand-in for authenticate(): flags the subdomain as invalid in
            # the return_data dict (when one is passed) and implicitly
            # returns None.
            outcome = kwargs.get("return_data", {})
            outcome["invalid_subdomain"] = True

        with patch(
            "zerver.views.registration.authenticate", side_effect=fake_authenticate
        ), self.assertLogs(level="ERROR") as error_logs:
            response = self.client_post(
                "/accounts/register/",
                {
                    "password": signup_password,
                    "full_name": "New User",
                    "key": find_key_by_email(signup_email),
                    "terms": True,
                },
            )
            self.assertEqual(
                error_logs.output,
                ["ERROR:root:Subdomain mismatch in registration zulip: newuser@zulip.com"],
            )
        self.assertEqual(response.status_code, 302)
    def test_signup_using_invalid_subdomain_preserves_state_of_form(self) -> None:
        """
        Submitting the realm creation form with a subdomain that is already
        taken should show an error while keeping the values the user entered.
        """
        existing_realm = get_realm("zulip")
        response = self.submit_realm_creation_form(
            self.example_email("iago"),
            realm_subdomain=existing_realm.string_id,
            realm_name="Test",
        )
        # The error message plus the still-populated realm name field.
        expected_fragments = [
            "Subdomain unavailable. Please choose a different one.",
            'value="Test"',
            'name="realm_name"',
        ]
        self.assert_in_success_response(expected_fragments, response)
    def test_replace_subdomain_in_confirmation_link(self) -> None:
        """
        A registration confirmation key must not be usable on a different
        subdomain than the one it was requested for.
        """
        new_email = "newuser@zulip.com"
        # Request a confirmation link for the new address.
        self.client_post("/accounts/home/", {"email": new_email})

        # Replay the confirmation key against the "zephyr" subdomain.
        registration_data = {
            "password": "password",
            "key": find_key_by_email(new_email),
            "terms": True,
            "full_name": "New User",
            "from_confirmation": "1",
        }
        response = self.client_post(
            "/accounts/register/", registration_data, subdomain="zephyr"
        )
        self.assertEqual(response.status_code, 404)
        self.assert_in_response("We couldn't find your confirmation link", response)
    def test_signup_to_realm_on_manual_license_plan(self) -> None:
        """
        On a plan with manual license management, HomepageForm rejects a new
        signup until the ledger's ``licenses`` value is raised; the rejection
        is also posted to the realm's signup notifications stream.
        """
        realm = get_realm("zulip")
        denmark = get_stream("Denmark", realm)
        realm.signup_notifications_stream = denmark
        realm.save(update_fields=["signup_notifications_stream"])

        _plan, license_ledger = self.subscribe_realm_to_monthly_plan_on_manual_license_management(
            realm, 5, 5
        )
        signup_email = self.nonreg_email("test")
        no_license_error = "New members cannot join this organization because all Zulip licenses"

        # Initial state: the signup is refused and a notification message is
        # delivered to the Denmark stream.
        with self.settings(BILLING_ENABLED=True):
            form = HomepageForm({"email": signup_email}, realm=realm)
            self.assertIn(no_license_error, form.errors["email"][0])
            newest_message = Message.objects.last()
            assert newest_message is not None
            self.assertIn(
                f"A new member ({signup_email}) was unable to join your organization because all Zulip",
                newest_message.content,
            )
            self.assertEqual(newest_message.recipient.type_id, denmark.id)

        # Raising only the next-renewal license count does not unblock signups.
        license_ledger.licenses_at_next_renewal = 50
        license_ledger.save(update_fields=["licenses_at_next_renewal"])
        with self.settings(BILLING_ENABLED=True):
            form = HomepageForm({"email": signup_email}, realm=realm)
            self.assertIn(no_license_error, form.errors["email"][0])

        # Raising the current license count makes the form valid.
        license_ledger.licenses = 50
        license_ledger.save(update_fields=["licenses"])
        with self.settings(BILLING_ENABLED=True):
            form = HomepageForm({"email": signup_email}, realm=realm)
            self.assertEqual(form.errors, {})
    def test_failed_signup_due_to_restricted_domain(self) -> None:
        """
        HomepageForm reports a domain error for an acme.com address when the
        realm has emails_restricted_to_domains enabled.
        """
        zulip_realm = get_realm("zulip")
        # Disable invite-only signup, then enable the domain restriction.
        for property_name, property_value in (
            ("invite_required", False),
            ("emails_restricted_to_domains", True),
        ):
            do_set_realm_property(zulip_realm, property_name, property_value, acting_user=None)

        outside_email = "user@acme.com"
        signup_form = HomepageForm({"email": outside_email}, realm=zulip_realm)
        first_error = signup_form.errors["email"][0]
        self.assertIn(
            f"Your email address, {outside_email}, is not in one of the domains", first_error
        )
    def test_failed_signup_due_to_disposable_email(self) -> None:
        """
        HomepageForm asks for a real email address when the realm has
        disallow_disposable_email_addresses enabled.
        """
        zulip_realm = get_realm("zulip")
        zulip_realm.emails_restricted_to_domains = False
        zulip_realm.disallow_disposable_email_addresses = True
        zulip_realm.save()

        signup_form = HomepageForm({"email": "abc@mailnator.com"}, realm=zulip_realm)
        first_error = signup_form.errors["email"][0]
        self.assertIn("Please use your real email address", first_error)
    def test_failed_signup_due_to_email_containing_plus(self) -> None:
        """
        HomepageForm rejects an address containing "+" when the realm has
        emails_restricted_to_domains enabled.
        """
        zulip_realm = get_realm("zulip")
        zulip_realm.emails_restricted_to_domains = True
        zulip_realm.save()

        signup_form = HomepageForm({"email": "iago+label@zulip.com"}, realm=zulip_realm)
        first_error = signup_form.errors["email"][0]
        self.assertIn(
            "Email addresses containing + are not allowed in this organization.",
            first_error,
        )
    def test_failed_signup_due_to_invite_required(self) -> None:
        """
        HomepageForm tells the user to request an invite when the realm has
        invite_required enabled.
        """
        zulip_realm = get_realm("zulip")
        zulip_realm.invite_required = True
        zulip_realm.save()

        uninvited_email = "user@zulip.com"
        signup_form = HomepageForm({"email": uninvited_email}, realm=zulip_realm)
        first_error = signup_form.errors["email"][0]
        self.assertIn(f"Please request an invite for {uninvited_email} from", first_error)
    def test_failed_signup_due_to_nonexistent_realm(self) -> None:
        """
        HomepageForm built with realm=None reports that the organization
        does not exist.
        """
        orphan_email = "user@acme.com"
        signup_form = HomepageForm({"email": orphan_email}, realm=None)
        first_error = signup_form.errors["email"][0]
        self.assertIn(
            f"organization you are trying to join using {orphan_email} does not exist",
            first_error,
        )
    def test_access_signup_page_in_root_domain_without_realm(self) -> None:
        """
        Requesting /register on the empty (root) subdomain, following
        redirects, ends on a page containing "Find your Zulip accounts".
        """
        response = self.client_get("/register", subdomain="", follow=True)
        expected_text = ["Find your Zulip accounts"]
        self.assert_in_success_response(expected_text, response)
    @override_settings(
        AUTHENTICATION_BACKENDS=(
            "zproject.backends.ZulipLDAPAuthBackend",
            "zproject.backends.ZulipDummyBackend",
        )
    )
    def test_ldap_registration_from_confirmation(self) -> None:
        """
        With LDAP as the authentication backend, the registration form shown
        after following the confirmation link is pre-filled with the LDAP
        full name and prompts for the LDAP password instead of a new Zulip
        password. Also covers the TypeError handler around the LDAP name
        lookup.
        """
        password = self.ldap_password("newuser")
        email = "newuser@zulip.com"
        subdomain = "zulip"
        self.init_default_ldap_database()
        ldap_user_attr_map = {"full_name": "cn"}

        with patch("zerver.views.registration.get_subdomain", return_value=subdomain):
            result = self.client_post("/register/", {"email": email})

        self.assertEqual(result.status_code, 302)
        self.assertTrue(
            result["Location"].endswith(
                f"/accounts/send_confirm/?email={urllib.parse.quote(email)}"
            )
        )
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email", result)

        # Fetch the confirmation link with the same helper the other signup
        # tests in this class use, rather than scanning the outbox by hand.
        confirmation_url = self.get_confirmation_url_from_outbox(email)

        with self.settings(
            POPULATE_PROFILE_VIA_LDAP=True,
            LDAP_APPEND_DOMAIN="zulip.com",
            AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map,
        ):
            # Visit the confirmation link.
            result = self.client_get(confirmation_url)
            self.assertEqual(result.status_code, 200)

            # Full name should be set from LDAP
            result = self.submit_reg_form_for_user(
                email,
                password,
                full_name="Ignore",
                from_confirmation="1",
                # Pass HTTP_HOST for the target subdomain
                HTTP_HOST=subdomain + ".testserver",
            )
            self.assert_in_success_response(
                [
                    "Enter your account details to complete registration.",
                    "New LDAP fullname",
                    "newuser@zulip.com",
                ],
                result,
            )

            # Verify that the user is asked for name
            self.assert_in_success_response(["id_full_name"], result)
            # Verify that user is asked for its LDAP/Active Directory password.
            self.assert_in_success_response(
                ["Enter your LDAP/Active Directory password.", "ldap-password"], result
            )
            self.assert_not_in_success_response(["id_password"], result)

            # Test the TypeError exception handler
            with patch(
                "zproject.backends.ZulipLDAPAuthBackendBase.get_mapped_name", side_effect=TypeError
            ):
                result = self.submit_reg_form_for_user(
                    email,
                    password,
                    from_confirmation="1",
                    # Pass HTTP_HOST for the target subdomain
                    HTTP_HOST=subdomain + ".testserver",
                )
            self.assert_in_success_response(
                ["Enter your account details to complete registration.", "newuser@zulip.com"],
                result,
            )
    @override_settings(
        AUTHENTICATION_BACKENDS=(
            "zproject.backends.EmailAuthBackend",
            "zproject.backends.ZulipLDAPUserPopulator",
            "zproject.backends.ZulipDummyBackend",
        )
    )
    def test_ldap_populate_only_registration_from_confirmation(self) -> None:
        """
        When LDAP is only used to populate profile data (authentication goes
        through EmailAuthBackend), the registration form is pre-filled with
        the LDAP full name but asks for a regular password, not the LDAP one.
        """
        password = self.ldap_password("newuser")
        email = "newuser@zulip.com"
        subdomain = "zulip"
        self.init_default_ldap_database()
        ldap_user_attr_map = {"full_name": "cn"}

        with patch("zerver.views.registration.get_subdomain", return_value=subdomain):
            result = self.client_post("/register/", {"email": email})

        self.assertEqual(result.status_code, 302)
        self.assertTrue(
            result["Location"].endswith(
                f"/accounts/send_confirm/?email={urllib.parse.quote(email)}"
            )
        )
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email", result)

        # Fetch the confirmation link with the same helper the other signup
        # tests in this class use, rather than scanning the outbox by hand.
        confirmation_url = self.get_confirmation_url_from_outbox(email)

        with self.settings(
            POPULATE_PROFILE_VIA_LDAP=True,
            LDAP_APPEND_DOMAIN="zulip.com",
            AUTH_LDAP_BIND_PASSWORD="",
            AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map,
            AUTH_LDAP_USER_DN_TEMPLATE="uid=%(user)s,ou=users,dc=zulip,dc=com",
        ):
            # Visit the confirmation link.
            result = self.client_get(confirmation_url)
            self.assertEqual(result.status_code, 200)

            # Full name should be set from LDAP
            result = self.submit_reg_form_for_user(
                email,
                password,
                full_name="Ignore",
                from_confirmation="1",
                # Pass HTTP_HOST for the target subdomain
                HTTP_HOST=subdomain + ".testserver",
            )
            self.assert_in_success_response(
                [
                    "Enter your account details to complete registration.",
                    "New LDAP fullname",
                    "newuser@zulip.com",
                ],
                result,
            )

            # Verify that the user is asked for name
            self.assert_in_success_response(["id_full_name"], result)
            # Verify that user is NOT asked for its LDAP/Active Directory password.
            # LDAP is not configured for authentication in this test.
            self.assert_not_in_success_response(
                ["Enter your LDAP/Active Directory password.", "ldap-password"], result
            )
            # If we were using e.g. the SAML auth backend, there
            # shouldn't be a password prompt, but since it uses the
            # EmailAuthBackend, there should be password field here.
            self.assert_in_success_response(["id_password"], result)
    @override_settings(
        AUTHENTICATION_BACKENDS=(
            "zproject.backends.SAMLAuthBackend",
            "zproject.backends.ZulipLDAPAuthBackend",
            "zproject.backends.ZulipDummyBackend",
        )
    )
    def test_ldap_registration_email_backend_disabled_bypass_attempt(self) -> None:
        """
        Covers the setup where only LDAP and an external auth backend are
        enabled: a user requests a confirmation link from the registration
        page and then tries to use it to create an account for their own
        email, which neither backend authenticates. No account may be created.
        """
        outsider_email = "no_such_user_in_ldap@example.com"
        realm_subdomain = "zulip"
        host = f"{realm_subdomain}.testserver"
        self.init_default_ldap_database()
        attr_map = {"full_name": "cn"}
        ldap_full_name = "New LDAP fullname"

        response = self.client_post(
            "/register/", {"email": outsider_email}, subdomain=realm_subdomain
        )
        self.assertEqual(response.status_code, 302)
        confirm_suffix = f"/accounts/send_confirm/?email={urllib.parse.quote(outsider_email)}"
        self.assertTrue(response["Location"].endswith(confirm_suffix))
        response = self.client_get(response["Location"])
        self.assert_in_response("Check your email", response)

        ldap_settings = {
            "POPULATE_PROFILE_VIA_LDAP": True,
            "LDAP_APPEND_DOMAIN": "zulip.com",
            "AUTH_LDAP_USER_ATTR_MAP": attr_map,
        }
        with self.settings(**ldap_settings), self.assertLogs(
            "zulip.ldap", level="DEBUG"
        ) as ldap_logs, self.assertLogs(level="WARNING") as root_logs:
            # Simulate clicking the confirmation link.
            response = self.submit_reg_form_for_user(
                outsider_email,
                None,
                full_name="Ignore",
                from_confirmation="1",
                # HTTP_HOST selects the target subdomain.
                HTTP_HOST=host,
            )
            self.assert_in_success_response(
                ["Enter your account details to complete registration.", outsider_email],
                response,
            )

            # Now submit the final form, trying to register even though the
            # address has no match in LDAP.
            response = self.submit_reg_form_for_user(
                outsider_email,
                "newpassword",
                full_name=ldap_full_name,
                # HTTP_HOST selects the target subdomain.
                HTTP_HOST=host,
            )

            # The attempt must not have produced an account ...
            with self.assertRaises(UserProfile.DoesNotExist):
                UserProfile.objects.get(delivery_email=outsider_email)

            # ... and the user is redirected to the login page instead.
            self.assertEqual(response.status_code, 302)
            self.assertEqual(
                response["Location"],
                "/accounts/login/?email=no_such_user_in_ldap%40example.com",
            )

            expected_root_logs = [
                "WARNING:root:New account email no_such_user_in_ldap@example.com could not be found in LDAP",
            ]
            self.assertEqual(root_logs.output, expected_root_logs)

            expected_ldap_logs = [
                "DEBUG:zulip.ldap:ZulipLDAPAuthBackend: Email no_such_user_in_ldap@example.com does not match LDAP domain zulip.com.",
            ]
            self.assertEqual(ldap_logs.output, expected_ldap_logs)
    @override_settings(
        AUTHENTICATION_BACKENDS=(
            "zproject.backends.ZulipLDAPAuthBackend",
            "zproject.backends.ZulipDummyBackend",
        )
    )
    def test_ldap_registration_end_to_end(self) -> None:
        """
        Walk through LDAP-backed registration: the confirmation step
        pre-fills the LDAP full name, a wrong password redirects to login
        without creating an account, and the correct LDAP password creates
        the account with the LDAP-provided name.
        """
        password = self.ldap_password("newuser")
        email = "newuser@zulip.com"
        subdomain = "zulip"
        self.init_default_ldap_database()
        ldap_user_attr_map = {"full_name": "cn"}
        full_name = "New LDAP fullname"

        with patch("zerver.views.registration.get_subdomain", return_value=subdomain):
            result = self.client_post("/register/", {"email": email})

        self.assertEqual(result.status_code, 302)
        self.assertTrue(
            result["Location"].endswith(
                f"/accounts/send_confirm/?email={urllib.parse.quote(email)}"
            )
        )
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email", result)

        with self.settings(
            POPULATE_PROFILE_VIA_LDAP=True,
            LDAP_APPEND_DOMAIN="zulip.com",
            AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map,
        ):
            # Click confirmation link
            result = self.submit_reg_form_for_user(
                email,
                password,
                full_name="Ignore",
                from_confirmation="1",
                # Pass HTTP_HOST for the target subdomain
                HTTP_HOST=subdomain + ".testserver",
            )

            # Full name should be set from LDAP
            self.assert_in_success_response(
                [
                    "Enter your account details to complete registration.",
                    full_name,
                    "newuser@zulip.com",
                ],
                result,
            )

            # Submit the final form with the wrong password.
            result = self.submit_reg_form_for_user(
                email,
                "wrongpassword",
                full_name=full_name,
                # Pass HTTP_HOST for the target subdomain
                HTTP_HOST=subdomain + ".testserver",
            )
            # Didn't create an account. The lookup is expected to raise, so
            # its result is deliberately not bound to a name.
            with self.assertRaises(UserProfile.DoesNotExist):
                UserProfile.objects.get(delivery_email=email)
            self.assertEqual(result.status_code, 302)
            self.assertEqual(result["Location"], "/accounts/login/?email=newuser%40zulip.com")

            # Submit the final form with the correct password.
            result = self.submit_reg_form_for_user(
                email,
                password,
                full_name=full_name,
                # Pass HTTP_HOST for the target subdomain
                HTTP_HOST=subdomain + ".testserver",
            )
            user_profile = UserProfile.objects.get(delivery_email=email)
            # Name comes from form which was set by LDAP.
            self.assertEqual(user_profile.full_name, full_name)
    @override_settings(
        AUTHENTICATION_BACKENDS=(
            "zproject.backends.ZulipLDAPAuthBackend",
            "zproject.backends.ZulipDummyBackend",
        )
    )
    def test_ldap_split_full_name_mapping(self) -> None:
        """
        Register an LDAP user whose attribute map supplies separate
        first_name/last_name entries and check the resulting full name.
        """
        self.init_default_ldap_database()
        split_name_attr_map = {"first_name": "sn", "last_name": "cn"}
        realm_subdomain = "zulip"
        host = f"{realm_subdomain}.testserver"
        new_email = "newuser_splitname@zulip.com"
        ldap_secret = self.ldap_password("newuser_splitname")

        with patch("zerver.views.registration.get_subdomain", return_value=realm_subdomain):
            response = self.client_post("/register/", {"email": new_email})

        self.assertEqual(response.status_code, 302)
        confirm_suffix = f"/accounts/send_confirm/?email={urllib.parse.quote(new_email)}"
        self.assertTrue(response["Location"].endswith(confirm_suffix))
        response = self.client_get(response["Location"])
        self.assert_in_response("Check your email", response)

        ldap_settings = {
            "POPULATE_PROFILE_VIA_LDAP": True,
            "LDAP_APPEND_DOMAIN": "zulip.com",
            "AUTH_LDAP_USER_ATTR_MAP": split_name_attr_map,
        }
        with self.settings(**ldap_settings):
            # Simulate clicking the confirmation link.
            response = self.submit_reg_form_for_user(
                new_email,
                ldap_secret,
                full_name="Ignore",
                from_confirmation="1",
                # HTTP_HOST selects the target subdomain.
                HTTP_HOST=host,
            )

            # Submit the final form; this exercises the split name mapping.
            response = self.submit_reg_form_for_user(
                new_email,
                ldap_secret,
                full_name="Ignore",
                # HTTP_HOST selects the target subdomain.
                HTTP_HOST=host,
            )
            created_user = UserProfile.objects.get(delivery_email=new_email)
            # The submitted "Ignore" name is not used; the stored name is
            # the one set by LDAP.
            self.assertEqual(created_user.full_name, "First Last")
    @override_settings(
        AUTHENTICATION_BACKENDS=(
            "zproject.backends.ZulipLDAPAuthBackend",
            "zproject.backends.ZulipDummyBackend",
        )
    )
    def test_ldap_auto_registration_on_login(self) -> None:
        """Verify the single-sign-on LDAP flow.

        LDAP authentication is most commonly deployed on servers that do
        not require accepting terms of service. There, a user simply logs
        in with their LDAP username and password, and the account is
        created on the fly if it does not exist yet. This test checks that
        flow, including syncing of a custom profile field.
        """
        ldap_secret = self.ldap_password("newuser")
        new_email = "newuser@zulip.com"
        realm_subdomain = "zulip"
        self.init_default_ldap_database()
        attr_map = {
            "full_name": "cn",
            "custom_profile_field__phone_number": "homePhone",
        }
        expected_name = "New LDAP fullname"

        ldap_settings = {
            "POPULATE_PROFILE_VIA_LDAP": True,
            "LDAP_APPEND_DOMAIN": "zulip.com",
            "AUTH_LDAP_USER_ATTR_MAP": attr_map,
        }
        with self.settings(**ldap_settings):
            self.login_with_return(
                new_email, ldap_secret, HTTP_HOST=f"{realm_subdomain}.testserver"
            )

            created_user = UserProfile.objects.get(delivery_email=new_email)
            # The full name was populated from LDAP.
            self.assertEqual(created_user.full_name, expected_name)

            # The mapped custom profile field must have been synced as well.
            phone_field = CustomProfileField.objects.get(
                realm=created_user.realm, name="Phone number"
            )
            phone_value = CustomProfileFieldValue.objects.get(
                user_profile=created_user, field=phone_field
            )
            self.assertEqual(phone_value.value, "a-new-number")
    @override_settings(AUTHENTICATION_BACKENDS=("zproject.backends.ZulipLDAPAuthBackend",))
    def test_ldap_auto_registration_on_login_invalid_email_in_directory(self) -> None:
        """
        Logging in as an LDAP user whose "mail" attribute is not a valid
        email address must log a warning and create no UserProfile.
        """
        ldap_username = "newuser_with_email"
        ldap_secret = self.ldap_password(ldap_username)
        realm_subdomain = "zulip"
        self.init_default_ldap_database()
        # Corrupt the directory entry's email attribute.
        self.change_ldap_user_attr(ldap_username, "mail", "thisisnotavalidemail")

        with self.settings(LDAP_EMAIL_ATTR="mail"), self.assertLogs(
            "zulip.auth.ldap", "WARNING"
        ) as warning_logs:
            user_count_before = UserProfile.objects.count()
            self.login_with_return(
                ldap_username, ldap_secret, HTTP_HOST=f"{realm_subdomain}.testserver"
            )

            # The login attempt must not have created any account.
            self.assertEqual(UserProfile.objects.count(), user_count_before)
            expected_warnings = [
                "WARNING:zulip.auth.ldap:thisisnotavalidemail is not a valid email address."
            ]
            self.assertEqual(warning_logs.output, expected_warnings)
    @override_settings(AUTHENTICATION_BACKENDS=("zproject.backends.ZulipLDAPAuthBackend",))
    def test_ldap_registration_multiple_realms(self) -> None:
        """
        The same LDAP user can be auto-registered on login in two different
        realms ("zulip" and a freshly created "test" realm).
        """
        ldap_secret = self.ldap_password("newuser")
        ldap_email = "newuser@zulip.com"
        self.init_default_ldap_database()
        attr_map = {"full_name": "cn"}
        do_create_realm("test", "test", emails_restricted_to_domains=False)

        ldap_settings = {
            "POPULATE_PROFILE_VIA_LDAP": True,
            "LDAP_APPEND_DOMAIN": "zulip.com",
            "AUTH_LDAP_USER_ATTR_MAP": attr_map,
        }
        with self.settings(**ldap_settings):
            # First login: the account in the "zulip" realm must now exist
            # (the lookup raises if it does not).
            self.login_with_return(ldap_email, ldap_secret, HTTP_HOST="zulip" + ".testserver")
            UserProfile.objects.get(delivery_email=ldap_email, realm=get_realm("zulip"))
            self.logout()

            # Second login: registration in another realm works as well.
            self.login_with_return(ldap_email, ldap_secret, HTTP_HOST="test" + ".testserver")
            second_profile = UserProfile.objects.get(
                delivery_email=ldap_email, realm=get_realm("test")
            )
            self.assertEqual(second_profile.delivery_email, ldap_email)
    @override_settings(
        AUTHENTICATION_BACKENDS=(
            "zproject.backends.ZulipLDAPAuthBackend",
            "zproject.backends.ZulipDummyBackend",
        )
    )
    def test_ldap_registration_when_names_changes_are_disabled(self) -> None:
        """
        With name_changes_disabled patched to return True, completing LDAP
        registration without submitting a full name yields the LDAP name.
        """
        ldap_secret = self.ldap_password("newuser")
        new_email = "newuser@zulip.com"
        realm_subdomain = "zulip"
        host = f"{realm_subdomain}.testserver"
        self.init_default_ldap_database()
        attr_map = {"full_name": "cn"}

        with patch("zerver.views.registration.get_subdomain", return_value=realm_subdomain):
            response = self.client_post("/register/", {"email": new_email})

        self.assertEqual(response.status_code, 302)
        confirm_suffix = f"/accounts/send_confirm/?email={urllib.parse.quote(new_email)}"
        self.assertTrue(response["Location"].endswith(confirm_suffix))
        response = self.client_get(response["Location"])
        self.assert_in_response("Check your email", response)

        ldap_settings = {
            "POPULATE_PROFILE_VIA_LDAP": True,
            "LDAP_APPEND_DOMAIN": "zulip.com",
            "AUTH_LDAP_USER_ATTR_MAP": attr_map,
        }
        with self.settings(**ldap_settings):
            # Simulate clicking the confirmation link. This sets the
            # 'authenticated_full_name' session variable, which is later
            # used to set the user's full name.
            response = self.submit_reg_form_for_user(
                new_email,
                ldap_secret,
                full_name="Ignore",
                from_confirmation="1",
                # HTTP_HOST selects the target subdomain.
                HTTP_HOST=host,
            )

            # Complete registration while name changes are reported as
            # disabled; note that no full_name is submitted here.
            with patch("zerver.views.registration.name_changes_disabled", return_value=True):
                response = self.submit_reg_form_for_user(
                    new_email,
                    ldap_secret,
                    # HTTP_HOST selects the target subdomain.
                    HTTP_HOST=host,
                )

            created_user = UserProfile.objects.get(delivery_email=new_email)
            # The name comes from the LDAP-populated session.
            self.assertEqual(created_user.full_name, "New LDAP fullname")
    @override_settings(
        AUTHENTICATION_BACKENDS=(
            "zproject.backends.ZulipLDAPAuthBackend",
            "zproject.backends.EmailAuthBackend",
            "zproject.backends.ZulipDummyBackend",
        )
    )
    def test_signup_with_ldap_and_email_enabled_using_email_with_ldap_append_domain(self) -> None:
        password = "nonldappassword"
        email = "newuser@zulip.com"
        subdomain = "zulip"
        self.init_default_ldap_database()
        ldap_user_attr_map = {"full_name": "cn"}
        with patch("zerver.views.registration.get_subdomain", return_value=subdomain):
            result = self.client_post("/register/", {"email": email})
        self.assertEqual(result.status_code, 302)
        self.assertTrue(
            result["Location"].endswith(
                f"/accounts/send_confirm/?email={urllib.parse.quote(email)}"
            )
        )
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email", result)
        # If the user's email is inside the LDAP directory and we just
        # have a wrong password, then we refuse to create an account
        with self.settings(
            POPULATE_PROFILE_VIA_LDAP=True,
            LDAP_APPEND_DOMAIN="zulip.com",
            AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map,
        ):
            result = self.submit_reg_form_for_user(
                email,
                password,
                from_confirmation="1",
                # Pass HTTP_HOST for the target subdomain
                HTTP_HOST=subdomain + ".testserver",
            )
            self.assertEqual(result.status_code, 200)
            result = self.submit_reg_form_for_user(
                email,
                password,
                full_name="Non-LDAP Full Name",
                # Pass HTTP_HOST for the target subdomain
                HTTP_HOST=subdomain + ".testserver",
            )
            self.assertEqual(result.status_code, 302)
            # We get redirected back to the login page because password was wrong
            self.assertEqual(result["Location"], "/accounts/login/?email=newuser%40zulip.com")
            self.assertFalse(UserProfile.objects.filter(delivery_email=email).exists())
        # For the rest of the test we delete the user from ldap.
        del self.mock_ldap.directory["uid=newuser,ou=users,dc=zulip,dc=com"]
        # If the user's email is not in the LDAP directory, but fits LDAP_APPEND_DOMAIN,
        # we refuse to create the account.
        with self.settings(
            POPULATE_PROFILE_VIA_LDAP=True,
            LDAP_APPEND_DOMAIN="zulip.com",
            AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map,
        ), self.assertLogs("zulip.ldap", "DEBUG") as debug_log:
            result = self.submit_reg_form_for_user(
                email,
                password,
                full_name="Non-LDAP Full Name",
                # Pass HTTP_HOST for the target subdomain
                HTTP_HOST=subdomain + ".testserver",
            )
            self.assertEqual(result.status_code, 302)
            # We get redirected back to the login page because emails matching LDAP_APPEND_DOMAIN,
            # aren't allowed to create non-LDAP accounts.
            self.assertEqual(result["Location"], "/accounts/login/?email=newuser%40zulip.com")
            self.assertFalse(UserProfile.objects.filter(delivery_email=email).exists())
            self.assertEqual(
                debug_log.output,
                [
                    "DEBUG:zulip.ldap:ZulipLDAPAuthBackend: No LDAP user matching django_to_ldap_username result: newuser. Input username: newuser@zulip.com"
                ],
            )
        # If the email is outside of LDAP_APPEND_DOMAIN, we successfully create a non-LDAP account,
        # with the password managed in the Zulip database.
        with self.settings(
            POPULATE_PROFILE_VIA_LDAP=True,
            LDAP_APPEND_DOMAIN="example.com",
            AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map,
        ):
            with self.assertLogs(level="WARNING") as m:
                result = self.submit_reg_form_for_user(
                    email,
                    password,
                    from_confirmation="1",
                    # Pass HTTP_HOST for the target subdomain
                    HTTP_HOST=subdomain + ".testserver",
                )
            self.assertEqual(result.status_code, 200)
            self.assertEqual(
                m.output,
                ["WARNING:root:New account email newuser@zulip.com could not be found in LDAP"],
            )
            with self.assertLogs("zulip.ldap", "DEBUG") as debug_log:
                result = self.submit_reg_form_for_user(
                    email,
                    password,
                    full_name="Non-LDAP Full Name",
                    # Pass HTTP_HOST for the target subdomain
                    HTTP_HOST=subdomain + ".testserver",
                )
            self.assertEqual(
                debug_log.output,
                [
                    "DEBUG:zulip.ldap:ZulipLDAPAuthBackend: Email newuser@zulip.com does not match LDAP domain example.com."
                ],
            )
            self.assertEqual(result.status_code, 302)
            self.assertEqual(result["Location"], "http://zulip.testserver/")
            user_profile = UserProfile.objects.get(delivery_email=email)
            # Name comes from the POST request, not LDAP
            self.assertEqual(user_profile.full_name, "Non-LDAP Full Name")
    @override_settings(
        AUTHENTICATION_BACKENDS=(
            "zproject.backends.ZulipLDAPAuthBackend",
            "zproject.backends.EmailAuthBackend",
            "zproject.backends.ZulipDummyBackend",
        )
    )
    def test_signup_with_ldap_and_email_enabled_using_email_with_ldap_email_search(self) -> None:
        """
        Sign up with the LDAP and email backends both enabled while
        LDAP_EMAIL_ATTR is set.

        Two cases are checked: an email that belongs to an LDAP user but
        is submitted with a non-LDAP password (no account is created),
        and an email that is not in the LDAP directory (an account is
        created with the name taken from the POST data).
        """
        non_ldap_password = "nonldappassword"
        realm_subdomain = "zulip"
        # Every registration form below is submitted to the realm's own host.
        target_host = realm_subdomain + ".testserver"
        self.init_default_ldap_database()
        ldap_settings = dict(
            POPULATE_PROFILE_VIA_LDAP=True,
            LDAP_EMAIL_ATTR="mail",
            AUTH_LDAP_USER_ATTR_MAP={"full_name": "cn"},
        )
        # If the user's email is inside the LDAP directory and we just
        # have a wrong password, then we refuse to create an account.
        ldap_email = "newuser_email@zulip.com"  # belongs to user uid=newuser_with_email in the test directory
        with patch("zerver.views.registration.get_subdomain", return_value=realm_subdomain):
            response = self.client_post("/register/", {"email": ldap_email})
        self.assertEqual(response.status_code, 302)
        confirm_suffix = f"/accounts/send_confirm/?email={urllib.parse.quote(ldap_email)}"
        self.assertTrue(response["Location"].endswith(confirm_suffix))
        response = self.client_get(response["Location"])
        self.assert_in_response("Check your email", response)
        with self.settings(**ldap_settings):
            response = self.submit_reg_form_for_user(
                ldap_email,
                non_ldap_password,
                from_confirmation="1",
                HTTP_HOST=target_host,
            )
            self.assertEqual(response.status_code, 200)
            response = self.submit_reg_form_for_user(
                ldap_email,
                non_ldap_password,
                full_name="Non-LDAP Full Name",
                HTTP_HOST=target_host,
            )
            self.assertEqual(response.status_code, 302)
            # We get redirected back to the login page because password was wrong
            self.assertEqual(
                response["Location"], "/accounts/login/?email=newuser_email%40zulip.com"
            )
            self.assertFalse(UserProfile.objects.filter(delivery_email=ldap_email).exists())
        # If the user's email is not in the LDAP directory, though, we
        # successfully create an account with a password in the Zulip
        # database.
        absent_email = "nonexistent@zulip.com"
        with patch("zerver.views.registration.get_subdomain", return_value=realm_subdomain):
            response = self.client_post("/register/", {"email": absent_email})
        self.assertEqual(response.status_code, 302)
        confirm_suffix = f"/accounts/send_confirm/?email={urllib.parse.quote(absent_email)}"
        self.assertTrue(response["Location"].endswith(confirm_suffix))
        response = self.client_get(response["Location"])
        self.assert_in_response("Check your email", response)
        with self.settings(**ldap_settings):
            with self.assertLogs(level="WARNING") as warning_log:
                response = self.submit_reg_form_for_user(
                    absent_email,
                    non_ldap_password,
                    from_confirmation="1",
                    HTTP_HOST=target_host,
                )
                self.assertEqual(response.status_code, 200)
                self.assertEqual(
                    warning_log.output,
                    [
                        "WARNING:root:New account email nonexistent@zulip.com could not be found in LDAP"
                    ],
                )
            with self.assertLogs("zulip.ldap", "DEBUG") as ldap_debug_log:
                response = self.submit_reg_form_for_user(
                    absent_email,
                    non_ldap_password,
                    full_name="Non-LDAP Full Name",
                    HTTP_HOST=target_host,
                )
            self.assertEqual(
                ldap_debug_log.output,
                [
                    "DEBUG:zulip.ldap:ZulipLDAPAuthBackend: No LDAP user matching django_to_ldap_username result: nonexistent@zulip.com. Input username: nonexistent@zulip.com"
                ],
            )
            self.assertEqual(response.status_code, 302)
            self.assertEqual(response["Location"], "http://zulip.testserver/")
            created_user = UserProfile.objects.get(delivery_email=absent_email)
            # Name comes from the POST request, not LDAP
            self.assertEqual(created_user.full_name, "Non-LDAP Full Name")
    def ldap_invite_and_signup_as(
        self, invite_as: int, streams: Sequence[str] = ("Denmark",)
    ) -> None:
        """
        Invite newuser@zulip.com while logged in as iago, then complete
        the registration form for that email with the LDAP password.

        `invite_as` is posted as the "invite_as" field of the invitation;
        `streams` are the names of the streams whose IDs are posted as
        "stream_ids".  The default is an immutable tuple so that it
        cannot be mutated across calls.
        """
        self.init_default_ldap_database()
        ldap_user_attr_map = {"full_name": "cn"}
        subdomain = "zulip"
        email = "newuser@zulip.com"
        password = self.ldap_password("newuser")
        with self.settings(
            POPULATE_PROFILE_VIA_LDAP=True,
            LDAP_APPEND_DOMAIN="zulip.com",
            AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map,
        ):
            with self.assertLogs("zulip.ldap", "DEBUG") as debug_log:
                # Invite user.
                self.login("iago")
            # Logging in as iago emits exactly one LDAP debug line.
            self.assertEqual(
                debug_log.output,
                [
                    "DEBUG:zulip.ldap:ZulipLDAPAuthBackend: No LDAP user matching django_to_ldap_username result: iago. Input username: iago@zulip.com"
                ],
            )
            stream_ids = [self.get_stream_id(stream_name) for stream_name in streams]
            response = self.client_post(
                "/json/invites",
                {
                    "invitee_emails": email,
                    "stream_ids": orjson.dumps(stream_ids).decode(),
                    "invite_as": invite_as,
                },
            )
            self.assert_json_success(response)
            self.logout()
            # First submission, with from_confirmation set, is expected to
            # return the form page (200).
            result = self.submit_reg_form_for_user(
                email,
                password,
                full_name="Ignore",
                from_confirmation="1",
                # Pass HTTP_HOST for the target subdomain
                HTTP_HOST=subdomain + ".testserver",
            )
            self.assertEqual(result.status_code, 200)
            # Second submission is expected to redirect (302).
            result = self.submit_reg_form_for_user(
                email,
                password,
                full_name="Ignore",
                # Pass HTTP_HOST for the target subdomain
                HTTP_HOST=subdomain + ".testserver",
            )
            self.assertEqual(result.status_code, 302)
    @override_settings(
        AUTHENTICATION_BACKENDS=(
            "zproject.backends.ZulipLDAPAuthBackend",
            "zproject.backends.EmailAuthBackend",
        )
    )
    def test_ldap_invite_user_as_admin(self) -> None:
        """A user invited as REALM_ADMIN is a realm admin after signing up."""
        admin_role = PreregistrationUser.INVITE_AS["REALM_ADMIN"]
        self.ldap_invite_and_signup_as(admin_role)
        invited_email = self.nonreg_email("newuser")
        invited_user = UserProfile.objects.get(delivery_email=invited_email)
        self.assertTrue(invited_user.is_realm_admin)
    @override_settings(
        AUTHENTICATION_BACKENDS=(
            "zproject.backends.ZulipLDAPAuthBackend",
            "zproject.backends.EmailAuthBackend",
        )
    )
    def test_ldap_invite_user_as_guest(self) -> None:
        """A user invited as GUEST_USER is a guest after signing up."""
        guest_role = PreregistrationUser.INVITE_AS["GUEST_USER"]
        self.ldap_invite_and_signup_as(guest_role)
        invited_email = self.nonreg_email("newuser")
        invited_user = UserProfile.objects.get(delivery_email=invited_email)
        self.assertTrue(invited_user.is_guest)
    @override_settings(
        AUTHENTICATION_BACKENDS=(
            "zproject.backends.ZulipLDAPAuthBackend",
            "zproject.backends.EmailAuthBackend",
        )
    )
    def test_ldap_invite_streams(self) -> None:
        """
        A user invited to a stream that is not a default stream ends up
        subscribed to it after signing up.
        """
        invited_stream_name = "Rome"
        zulip_realm = get_realm("zulip")
        invited_stream = get_stream(invited_stream_name, zulip_realm)
        # Precondition: the invited stream is not one of the realm's default streams.
        default_stream_names = [
            default_stream.name
            for default_stream in get_default_streams_for_realm(zulip_realm.id)
        ]
        self.assertNotIn(invited_stream_name, default_stream_names)
        # Invite user.
        admin_role = PreregistrationUser.INVITE_AS["REALM_ADMIN"]
        self.ldap_invite_and_signup_as(admin_role, streams=[invited_stream_name])
        invited_user = UserProfile.objects.get(delivery_email=self.nonreg_email("newuser"))
        self.assertTrue(invited_user.is_realm_admin)
        user_subscriptions = get_stream_subscriptions_for_user(invited_user)
        matching_subscriptions = user_subscriptions.filter(recipient__type_id=invited_stream.id)
        self.assert_length(matching_subscriptions, 1)
    def test_registration_when_name_changes_are_disabled(self) -> None:
        """
        Register without LDAP while `name_changes_disabled` is patched to
        return True; the full name from the form is still used.
        """
        new_user_password = self.ldap_password("newuser")
        new_user_email = "newuser@zulip.com"
        realm_subdomain = "zulip"
        with patch("zerver.views.registration.get_subdomain", return_value=realm_subdomain):
            response = self.client_post("/register/", {"email": new_user_email})
        self.assertEqual(response.status_code, 302)
        confirm_suffix = f"/accounts/send_confirm/?email={urllib.parse.quote(new_user_email)}"
        self.assertTrue(response["Location"].endswith(confirm_suffix))
        response = self.client_get(response["Location"])
        self.assert_in_response("Check your email", response)
        with patch("zerver.views.registration.name_changes_disabled", return_value=True):
            response = self.submit_reg_form_for_user(
                new_user_email,
                new_user_password,
                full_name="New Name",
                # Pass HTTP_HOST for the target subdomain
                HTTP_HOST=realm_subdomain + ".testserver",
            )
            created_user = UserProfile.objects.get(delivery_email=new_user_email)
            # 'New Name' comes from POST data; not from LDAP session.
            self.assertEqual(created_user.full_name, "New Name")
    def test_realm_creation_through_ldap(self) -> None:
        """
        Follow the confirmation link for a new email, mark the
        preregistration as a realm creation, and check that submitting
        the form with only the LDAP backend enabled shows the account
        details page.
        """
        password = self.ldap_password("newuser")
        email = "newuser@zulip.com"
        subdomain = "zulip"
        realm_name = "Zulip"
        self.init_default_ldap_database()
        ldap_user_attr_map = {"full_name": "cn"}
        with patch("zerver.views.registration.get_subdomain", return_value=subdomain):
            result = self.client_post("/register/", {"email": email})
        self.assertEqual(result.status_code, 302)
        self.assertTrue(
            result["Location"].endswith(
                f"/accounts/send_confirm/?email={urllib.parse.quote(email)}"
            )
        )
        result = self.client_get(result["Location"])
        self.assert_in_response("Check your email", result)
        # Visit the confirmation link.
        from django.core.mail import outbox
        # Escape the hostname so that characters such as "." are matched
        # literally rather than as regex metacharacters.
        confirmation_url_pattern = re.escape(settings.EXTERNAL_HOST) + r"(\S+)>"
        # Search newest-first for the message addressed to this email.
        for message in reversed(outbox):
            if email in message.to:
                match = re.search(confirmation_url_pattern, str(message.body))
                assert match is not None
                [confirmation_url] = match.groups()
                break
        else:
            raise AssertionError("Couldn't find a confirmation email.")
        with self.settings(
            POPULATE_PROFILE_VIA_LDAP=True,
            LDAP_APPEND_DOMAIN="zulip.com",
            AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map,
            AUTHENTICATION_BACKENDS=("zproject.backends.ZulipLDAPAuthBackend",),
            TERMS_OF_SERVICE_VERSION=1.0,
        ):
            result = self.client_get(confirmation_url)
            self.assertEqual(result.status_code, 200)
            # Flip the stored preregistration object into a realm-creation one.
            key = find_key_by_email(email)
            confirmation = Confirmation.objects.get(confirmation_key=key)
            prereg_user = confirmation.content_object
            assert prereg_user is not None
            prereg_user.realm_creation = True
            prereg_user.save()
            result = self.submit_reg_form_for_user(
                email,
                password,
                realm_name=realm_name,
                realm_subdomain=subdomain,
                from_confirmation="1",
                # Pass HTTP_HOST for the target subdomain
                HTTP_HOST=subdomain + ".testserver",
            )
            self.assert_in_success_response(
                ["Enter your account details to complete registration.", "newuser@zulip.com"],
                result,
            )
    @patch(
        "DNS.dnslookup",
        return_value=[["sipbtest:*:20922:101:Fred Sipb,,,:/mit/sipbtest:/bin/athena/tcsh"]],
    )
    def test_registration_of_mirror_dummy_user(self, ignored: Any) -> None:
        """
        Register for an account that already exists as an inactive
        mirror dummy user on the "zephyr" subdomain.

        Submitting the form while the mirror dummy user is active raises
        an AssertionError; once it is inactive again, registration
        succeeds and logs in that same user.
        """
        password = "test"
        subdomain = "zephyr"
        user_profile = self.mit_user("sipbtest")
        email = user_profile.delivery_email
        user_profile.is_mirror_dummy = True
        user_profile.save()
        change_user_is_active(user_profile, False)
        result = self.client_post("/register/", {"email": email}, subdomain=subdomain)
        self.assertEqual(result.status_code, 302)
        self.assertTrue(
            result["Location"].endswith(
                f"/accounts/send_confirm/?email={urllib.parse.quote(email)}"
            )
        )
        result = self.client_get(result["Location"], subdomain=subdomain)
        self.assert_in_response("Check your email", result)
        # Visit the confirmation link.
        from django.core.mail import outbox
        # Escape the hostname so that characters such as "." are matched
        # literally rather than as regex metacharacters.
        confirmation_url_pattern = re.escape(settings.EXTERNAL_HOST) + r"(\S+)>"
        # Search newest-first for the message addressed to this email.
        for message in reversed(outbox):
            if email in message.to:
                match = re.search(confirmation_url_pattern, str(message.body))
                assert match is not None
                [confirmation_url] = match.groups()
                break
        else:
            raise AssertionError("Couldn't find a confirmation email.")
        result = self.client_get(confirmation_url, subdomain=subdomain)
        self.assertEqual(result.status_code, 200)
        # If the mirror dummy user is already active, attempting to
        # submit the registration form should raise an AssertionError
        # (this is an invalid state, so it's a bug we got here):
        change_user_is_active(user_profile, True)
        with self.assertRaisesRegex(
            AssertionError, "Mirror dummy user is already active!"
        ), self.assertLogs("django.request", "ERROR") as error_log:
            self.submit_reg_form_for_user(
                email,
                password,
                from_confirmation="1",
                # Pass HTTP_HOST for the target subdomain
                HTTP_HOST=subdomain + ".testserver",
            )
        # assertIn reports both operands on failure, unlike assertTrue(x in y).
        self.assertIn(
            "ERROR:django.request:Internal Server Error: /accounts/register/", error_log.output[0]
        )
        self.assertIn(
            'raise AssertionError("Mirror dummy user is already active!', error_log.output[0]
        )
        self.assertIn(
            "AssertionError: Mirror dummy user is already active!", error_log.output[0]
        )
        # With the user inactive again, the normal two-step form submission works.
        change_user_is_active(user_profile, False)
        result = self.submit_reg_form_for_user(
            email,
            password,
            from_confirmation="1",
            # Pass HTTP_HOST for the target subdomain
            HTTP_HOST=subdomain + ".testserver",
        )
        self.assertEqual(result.status_code, 200)
        result = self.submit_reg_form_for_user(
            email,
            password,
            # Pass HTTP_HOST for the target subdomain
            HTTP_HOST=subdomain + ".testserver",
        )
        self.assertEqual(result.status_code, 302)
        self.assert_logged_in_user_id(user_profile.id)
    @patch(
        "DNS.dnslookup",
        return_value=[["sipbtest:*:20922:101:Fred Sipb,,,:/mit/sipbtest:/bin/athena/tcsh"]],
    )
    def test_registration_of_active_mirror_dummy_user(self, ignored: Any) -> None:
        """
        Trying to activate an already-active mirror dummy user should
        raise an AssertionError, which is also logged as an error on the
        "django.request" logger.
        """
        user_profile = self.mit_user("sipbtest")
        email = user_profile.delivery_email
        user_profile.is_mirror_dummy = True
        user_profile.save()
        change_user_is_active(user_profile, True)
        with self.assertRaisesRegex(
            AssertionError, "Mirror dummy user is already active!"
        ), self.assertLogs("django.request", "ERROR") as error_log:
            self.client_post("/register/", {"email": email}, subdomain="zephyr")
        # assertIn reports both operands on failure, unlike assertTrue(x in y).
        self.assertIn(
            "ERROR:django.request:Internal Server Error: /register/", error_log.output[0]
        )
        self.assertIn(
            'raise AssertionError("Mirror dummy user is already active!', error_log.output[0]
        )
        self.assertIn(
            "AssertionError: Mirror dummy user is already active!", error_log.output[0]
        )
    @override_settings(TERMS_OF_SERVICE_VERSION=None)
    def test_dev_user_registration(self) -> None:
        """
        POSTing to /devtools/register_user/ creates a user whose email is
        derived from the prior user count, logs them in, and redirects to
        the realm's root URL.
        """
        users_before = UserProfile.objects.count()
        expected_email = f"user-{users_before}@zulip.com"
        response = self.client_post("/devtools/register_user/")
        # The user with the highest id is the one just created.
        newest_user = UserProfile.objects.all().order_by("id").last()
        assert newest_user is not None
        self.assertEqual(response.status_code, 302)
        self.assertEqual(newest_user.delivery_email, expected_email)
        self.assertEqual(response["Location"], "http://zulip.testserver/")
        self.assert_logged_in_user_id(newest_user.id)
    @override_settings(TERMS_OF_SERVICE_VERSION=None)
    def test_dev_user_registration_create_realm(self) -> None:
        """
        POSTing to /devtools/register_realm/ redirects through the new
        realm's subdomain login URL and leaves the newest user logged in.
        """
        users_before = UserProfile.objects.count()
        new_realm_string_id = f"realm-{users_before}"
        response = self.client_post("/devtools/register_realm/")
        self.assertEqual(response.status_code, 302)
        login_url_prefix = f"http://{new_realm_string_id}.testserver/accounts/login/subdomain"
        self.assertTrue(response["Location"].startswith(login_url_prefix))
        # Following the redirect on the new realm's subdomain lands on its root URL.
        response = self.client_get(response["Location"], subdomain=new_realm_string_id)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response["Location"], f"http://{new_realm_string_id}.testserver")
        newest_user = UserProfile.objects.all().order_by("id").last()
        assert newest_user is not None
        self.assert_logged_in_user_id(newest_user.id)
    @override_settings(TERMS_OF_SERVICE_VERSION=None)
    def test_dev_user_registration_create_demo_realm(self) -> None:
        """
        POSTing to /devtools/register_demo_realm/ creates a realm whose
        scheduled deletion date is DEMO_ORG_DEADLINE_DAYS after creation,
        and leaves the newest user logged in.
        """
        response = self.client_post("/devtools/register_demo_realm/")
        self.assertEqual(response.status_code, 302)
        demo_realm = Realm.objects.latest("date_created")
        login_url_prefix = f"http://{demo_realm.string_id}.testserver/accounts/login/subdomain"
        self.assertTrue(response["Location"].startswith(login_url_prefix))
        response = self.client_get(response["Location"], subdomain=demo_realm.string_id)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response["Location"], f"http://{demo_realm.string_id}.testserver")
        newest_user = UserProfile.objects.all().order_by("id").last()
        assert newest_user is not None
        self.assert_logged_in_user_id(newest_user.id)
        deadline = datetime.timedelta(days=settings.DEMO_ORG_DEADLINE_DAYS)
        self.assertEqual(
            demo_realm.demo_organization_scheduled_deletion_date,
            demo_realm.date_created + deadline,
        )
    def test_get_default_language_for_new_user(self) -> None:
        """
        The Accept-Language header wins over the realm's default
        language; the realm default is used when the header is empty.
        """
        def request_with_accept_language(header_value: str) -> HostRequestMock:
            # Build a fresh mock request carrying the given Accept-Language header.
            request = HostRequestMock()
            request.META["HTTP_ACCEPT_LANGUAGE"] = header_value
            return request
        zulip_realm = get_realm("zulip")
        self.assertEqual(
            get_default_language_for_new_user(request_with_accept_language("de,en"), zulip_realm),
            "de",
        )
        do_set_realm_property(zulip_realm, "default_language", "hi", acting_user=None)
        zulip_realm.refresh_from_db()
        # The header still takes precedence after the realm default changes.
        self.assertEqual(
            get_default_language_for_new_user(request_with_accept_language("de,en"), zulip_realm),
            "de",
        )
        self.assertEqual(
            get_default_language_for_new_user(request_with_accept_language(""), zulip_realm),
            "hi",
        )
class DeactivateUserTest(ZulipTestCase):
    """Tests for users deactivating their own account via DELETE /json/users/me."""
    def test_deactivate_user(self) -> None:
        """After self-deactivation the user is inactive and login fails."""
        hamlet = self.example_user("hamlet")
        hamlet_email = hamlet.email
        self.login_user(hamlet)
        self.assertTrue(hamlet.is_active)
        response = self.client_delete("/json/users/me")
        self.assert_json_success(response)
        # Fetch hamlet again to check the state after the request.
        hamlet = self.example_user("hamlet")
        self.assertFalse(hamlet.is_active)
        hamlet_password = initial_password(hamlet_email)
        assert hamlet_password is not None
        self.assert_login_failure(hamlet_email, password=hamlet_password)
    def test_do_not_deactivate_final_owner(self) -> None:
        """The only owner cannot deactivate; it works once a second owner exists."""
        desdemona = self.example_user("desdemona")
        iago = self.example_user("iago")
        self.login_user(desdemona)
        self.assertTrue(desdemona.is_active)
        response = self.client_delete("/json/users/me")
        self.assert_json_error(response, "Cannot deactivate the only organization owner.")
        desdemona = self.example_user("desdemona")
        self.assertTrue(desdemona.is_active)
        self.assertTrue(desdemona.is_realm_owner)
        # Promote iago to owner so that desdemona is no longer the only one.
        do_change_user_role(iago, UserProfile.ROLE_REALM_OWNER, acting_user=None)
        self.assertTrue(iago.is_realm_owner)
        response = self.client_delete("/json/users/me")
        self.assert_json_success(response)
        do_change_user_role(desdemona, UserProfile.ROLE_REALM_OWNER, acting_user=None)
    def test_do_not_deactivate_final_user(self) -> None:
        """Deactivating oneself is refused once every non-owner has been deactivated."""
        zulip_realm = get_realm("zulip")
        non_owner_profiles = UserProfile.objects.filter(realm=zulip_realm).exclude(
            role=UserProfile.ROLE_REALM_OWNER
        )
        for profile in non_owner_profiles:
            do_deactivate_user(profile, acting_user=None)
        desdemona = self.example_user("desdemona")
        self.login_user(desdemona)
        response = self.client_delete("/json/users/me")
        self.assert_json_error(response, "Cannot deactivate the only user.")
class TestLoginPage(ZulipTestCase):
    """Tests for how the login page renders and redirects."""
    @patch("django.http.HttpRequest.get_host")
    def test_login_page_redirects_for_root_alias(self, mock_get_host: MagicMock) -> None:
        """With ROOT_DOMAIN_LANDING_PAGE, "www.testserver" redirects to /accounts/go/."""
        mock_get_host.return_value = "www.testserver"
        with self.settings(ROOT_DOMAIN_LANDING_PAGE=True):
            response = self.client_get("/en/login/")
            self.assertEqual(response.status_code, 302)
            self.assertEqual(response["Location"], "/accounts/go/")
            # A "next" parameter is carried over, URL-encoded.
            response = self.client_get("/en/login/", {"next": "/upgrade/"})
            self.assertEqual(response.status_code, 302)
            self.assertEqual(response["Location"], "/accounts/go/?next=%2Fupgrade%2F")
    @patch("django.http.HttpRequest.get_host")
    def test_login_page_redirects_for_root_domain(self, mock_get_host: MagicMock) -> None:
        """With ROOT_DOMAIN_LANDING_PAGE, the root domain redirects to /accounts/go/."""
        mock_get_host.return_value = "testserver"
        with self.settings(ROOT_DOMAIN_LANDING_PAGE=True):
            response = self.client_get("/en/login/")
            self.assertEqual(response.status_code, 302)
            self.assertEqual(response["Location"], "/accounts/go/")
            response = self.client_get("/en/login/", {"next": "/upgrade/"})
            self.assertEqual(response.status_code, 302)
            self.assertEqual(response["Location"], "/accounts/go/?next=%2Fupgrade%2F")
        # Same checks when the request host equals a custom EXTERNAL_HOST.
        mock_get_host.return_value = "www.zulip.example.com"
        custom_host_settings = dict(
            ROOT_DOMAIN_LANDING_PAGE=True,
            EXTERNAL_HOST="www.zulip.example.com",
            ROOT_SUBDOMAIN_ALIASES=["test"],
        )
        with self.settings(**custom_host_settings):
            response = self.client_get("/en/login/")
            self.assertEqual(response.status_code, 302)
            self.assertEqual(response["Location"], "/accounts/go/")
            response = self.client_get("/en/login/", {"next": "/upgrade/"})
            self.assertEqual(response.status_code, 302)
            self.assertEqual(response["Location"], "/accounts/go/?next=%2Fupgrade%2F")
    def test_login_page_redirects_using_next_when_already_authenticated(self) -> None:
        """A logged-in user visiting /login/ is redirected to the "next" path."""
        hamlet = self.example_user("hamlet")
        self.login("hamlet")
        response = self.client_get("/login/", {"next": "/upgrade/"})
        self.assertEqual(response.status_code, 302)
        expected_location = f"{hamlet.realm.uri}/upgrade/"
        self.assertEqual(response["Location"], expected_location)
    @patch("django.http.HttpRequest.get_host")
    def test_login_page_works_without_subdomains(self, mock_get_host: MagicMock) -> None:
        """The login page returns 200 for both "www.testserver" and "testserver"."""
        for host in ("www.testserver", "testserver"):
            mock_get_host.return_value = host
            with self.settings(ROOT_SUBDOMAIN_ALIASES=["www"]):
                response = self.client_get("/en/login/")
                self.assertEqual(response.status_code, 200)
    def test_login_page_registration_hint(self) -> None:
        """The "need to be invited" hint appears only when invite_required is set."""
        invite_hint = [
            "Don't have an account yet? You need to be invited to join this organization."
        ]
        page = self.client_get("/login/")
        self.assert_not_in_success_response(invite_hint, page)
        zulip_realm = get_realm("zulip")
        zulip_realm.invite_required = True
        zulip_realm.save(update_fields=["invite_required"])
        page = self.client_get("/login/")
        self.assert_in_success_response(invite_hint, page)
    @patch("django.http.HttpRequest.get_host", return_value="auth.testserver")
    def test_social_auth_subdomain_login_page(self, mock_get_host: MagicMock) -> None:
        """
        On the auth subdomain, /login/ returns 400 unless the session
        carries a "subdomain" value of "zulip", in which case it
        redirects to that realm; an "invalid" value also returns 400.
        """
        auth_login_url = "http://auth.testserver/login/"
        response = self.client_get(auth_login_url)
        self.assertEqual(response.status_code, 400)
        self.assert_in_response("Authentication subdomain", response)
        zulip_realm = get_realm("zulip")
        client_session = self.client.session
        client_session["subdomain"] = "zulip"
        client_session.save()
        response = self.client_get(auth_login_url)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response["Location"], zulip_realm.uri)
        client_session = self.client.session
        client_session["subdomain"] = "invalid"
        client_session.save()
        response = self.client_get(auth_login_url)
        self.assertEqual(response.status_code, 400)
        self.assert_in_response("Authentication subdomain", response)
    def test_login_page_is_deactivated_validation(self) -> None:
        """An invalid is_deactivated value is logged once and not echoed in the page."""
        with patch("zerver.views.auth.logging.info") as mock_info:
            response = self.client_get("/login/?is_deactivated=invalid_email")
            mock_info.assert_called_once()
            self.assert_not_in_success_response(["invalid_email"], response)
class TestFindMyTeam(ZulipTestCase):
    """Tests for the /accounts/find/ page and the emails it sends."""
    def test_template(self) -> None:
        """The page renders its heading."""
        page = self.client_get("/accounts/find/")
        self.assertIn("Find your Zulip accounts", page.content.decode())
    def test_result(self) -> None:
        """Submitting two known emails sends one message per address."""
        # We capitalize a letter in cordelia's email to test that the search is case-insensitive.
        form_data = dict(emails="iago@zulip.com,cordeliA@zulip.com")
        response = self.client_post("/accounts/find/", form_data)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(
            response["Location"],
            "/accounts/find/?emails=iago%40zulip.com%2CcordeliA%40zulip.com",
        )
        response = self.client_get(response["Location"])
        page_text = response.content.decode()
        self.assertIn("Emails sent! You will only receive emails", page_text)
        self.assertIn("iago@zulip.com", page_text)
        self.assertIn("cordeliA@zulip.com", page_text)
        from django.core.mail import outbox
        self.assert_length(outbox, 2)
        # The first message in the outbox is cordelia's, the second is iago's.
        cordelia_message, iago_message = outbox
        self.assertIn("Zulip Dev", iago_message.body)
        self.assertNotIn("Lear & Co", iago_message.body)
        self.assertIn("Zulip Dev", cordelia_message.body)
        self.assertIn("Lear & Co", cordelia_message.body)
    def test_find_team_ignore_invalid_email(self) -> None:
        """An address with no matching account is shown on the page but gets no email."""
        form_data = dict(emails="iago@zulip.com,invalid_email@zulip.com")
        response = self.client_post("/accounts/find/", form_data)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(
            response["Location"],
            "/accounts/find/?emails=iago%40zulip.com%2Cinvalid_email%40zulip.com",
        )
        response = self.client_get(response["Location"])
        page_text = response.content.decode()
        self.assertIn("Emails sent! You will only receive emails", page_text)
        self.assertIn(self.example_email("iago"), page_text)
        self.assertIn("invalid_email@", page_text)
        from django.core.mail import outbox
        self.assert_length(outbox, 1)
    def test_find_team_reject_invalid_email(self) -> None:
        """A string that is not an email is rejected with a validation error."""
        response = self.client_post("/accounts/find/", dict(emails="invalid_string"))
        self.assertEqual(response.status_code, 200)
        self.assertIn(b"Enter a valid email", response.content)
        from django.core.mail import outbox
        self.assert_length(outbox, 0)
        # Just for coverage on perhaps-unnecessary validation code.
        response = self.client_get("/accounts/find/", {"emails": "invalid"})
        self.assertEqual(response.status_code, 200)
    def test_find_team_zero_emails(self) -> None:
        """An empty emails field yields a "required" error and no email."""
        form_data = {"emails": ""}
        response = self.client_post("/accounts/find/", form_data)
        self.assertIn("This field is required", response.content.decode())
        self.assertEqual(response.status_code, 200)
        from django.core.mail import outbox
        self.assert_length(outbox, 0)
    def test_find_team_one_email(self) -> None:
        """A single known email redirects and sends one message."""
        form_data = {"emails": self.example_email("hamlet")}
        response = self.client_post("/accounts/find/", form_data)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response["Location"], "/accounts/find/?emails=hamlet%40zulip.com")
        from django.core.mail import outbox
        self.assert_length(outbox, 1)
    def test_find_team_deactivated_user(self) -> None:
        """No email is sent for a deactivated user."""
        hamlet = self.example_user("hamlet")
        do_deactivate_user(hamlet, acting_user=None)
        form_data = {"emails": self.example_email("hamlet")}
        response = self.client_post("/accounts/find/", form_data)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response["Location"], "/accounts/find/?emails=hamlet%40zulip.com")
        from django.core.mail import outbox
        self.assert_length(outbox, 0)
    def test_find_team_deactivated_realm(self) -> None:
        """No email is sent when the user's realm is deactivated."""
        zulip_realm = get_realm("zulip")
        do_deactivate_realm(zulip_realm, acting_user=None)
        form_data = {"emails": self.example_email("hamlet")}
        response = self.client_post("/accounts/find/", form_data)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(response["Location"], "/accounts/find/?emails=hamlet%40zulip.com")
        from django.core.mail import outbox
        self.assert_length(outbox, 0)
    def test_find_team_bot_email(self) -> None:
        """No email is sent for a bot's address."""
        form_data = {"emails": self.example_email("webhook_bot")}
        response = self.client_post("/accounts/find/", form_data)
        self.assertEqual(response.status_code, 302)
        self.assertEqual(
            response["Location"], "/accounts/find/?emails=webhook-bot%40zulip.com"
        )
        from django.core.mail import outbox
        self.assert_length(outbox, 0)
    def test_find_team_more_than_ten_emails(self) -> None:
        """Eleven addresses are rejected with an "at most 10" error."""
        eleven_emails = [f"hamlet-{i}@zulip.com" for i in range(11)]
        form_data = {"emails": ",".join(eleven_emails)}
        response = self.client_post("/accounts/find/", form_data)
        self.assertEqual(response.status_code, 200)
        self.assertIn("Please enter at most 10", response.content.decode())
        from django.core.mail import outbox
        self.assert_length(outbox, 0)
class ConfirmationKeyTest(ZulipTestCase):
    """
    Call confirmation_key() directly with a mocked request.
    """

    def test_confirmation_key(self) -> None:
        # The mock request carries the key in its session; the response must
        # be a JSON success containing that key.
        fake_request = MagicMock(
            session={
                "confirmation_key": {"confirmation_key": "xyzzy"},
            }
        )
        response = confirmation_key(fake_request)
        self.assert_json_success(response)
        self.assert_in_response("xyzzy", response)
class MobileAuthOTPTest(ZulipTestCase):
    """
    Checks for the hex-string and OTP helper functions.
    """

    def test_xor_hex_strings(self) -> None:
        xored = xor_hex_strings("1237c81ab", "18989fd12")
        self.assertEqual(xored, "0aaf57cb9")
        # "1" and "31" must be rejected with an AssertionError
        # (presumably because their lengths differ).
        with self.assertRaises(AssertionError):
            xor_hex_strings("1", "31")

    def test_is_valid_otp(self) -> None:
        # (candidate, expected verdict) pairs, checked in order.
        cases = [
            ("1234", False),
            ("1234abcd" * 8, True),
            ("1234abcZ" * 8, False),
        ]
        for candidate, expected in cases:
            self.assertEqual(is_valid_otp(candidate), expected)

    def test_ascii_to_hex(self) -> None:
        # Covers both directions of the conversion.
        plain = "ZcdR1234"
        encoded = "5a63645231323334"
        self.assertEqual(ascii_to_hex(plain), encoded)
        self.assertEqual(hex_to_ascii(encoded), plain)

    def test_otp_encrypt_api_key(self) -> None:
        otp = "7be38894" * 8
        api_key = "12ac" * 8
        encrypted = otp_encrypt_api_key(api_key, otp)
        self.assertEqual(encrypted, "4ad1e9f7" * 8)
        # Decrypting with the same OTP must give back the original key.
        self.assertEqual(otp_decrypt_api_key(encrypted, otp), api_key)
class NoReplyEmailTest(ZulipTestCase):
    """
    Checks the address returned by FromAddress.tokenized_no_reply_address().
    """

    def test_noreply_email_address(self) -> None:
        # With the test defaults, the address must match TOKENIZED_NOREPLY_REGEX.
        # assertRegex does a re.search and, on failure, reports both the
        # pattern and the offending address.
        self.assertRegex(
            FromAddress.tokenized_no_reply_address(),
            self.TOKENIZED_NOREPLY_REGEX,
        )
        # With ADD_TOKENS_TO_NOREPLY_ADDRESS turned off, the plain address is
        # expected instead.
        with self.settings(ADD_TOKENS_TO_NOREPLY_ADDRESS=False):
            self.assertEqual(FromAddress.tokenized_no_reply_address(), "noreply@testserver")
class TwoFactorAuthTest(ZulipTestCase):
    """
    Log in through both steps of the two-factor login view.
    """

    @patch("two_factor.plugins.phonenumber.models.totp")
    def test_two_factor_login(self, mock_totp: MagicMock) -> None:
        token = 123456
        email = self.example_email("hamlet")
        password = self.ldap_password("hamlet")
        hamlet = self.example_user("hamlet")
        hamlet.set_password(password)
        hamlet.save()
        self.create_default_device(hamlet)

        # Whatever arguments the patched totp() receives, it yields our token.
        mock_totp.return_value = token

        two_factor_settings = {
            "AUTHENTICATION_BACKENDS": ("zproject.backends.EmailAuthBackend",),
            "TWO_FACTOR_CALL_GATEWAY": "two_factor.gateways.fake.Fake",
            "TWO_FACTOR_SMS_GATEWAY": "two_factor.gateways.fake.Fake",
            "TWO_FACTOR_AUTHENTICATION_ENABLED": True,
        }
        with self.settings(**two_factor_settings):
            # Step 1: username and password. The fake gateway is expected to
            # log the token message.
            credentials_step = {
                "username": email,
                "password": password,
                "two_factor_login_view-current_step": "auth",
            }
            with self.assertLogs("two_factor.gateways.fake", "INFO") as gateway_logs:
                response = self.client_post("/accounts/login/", credentials_step)
            expected_log_lines = [
                'INFO:two_factor.gateways.fake:Fake SMS to +12125550100: "Your token is: 123456"'
            ]
            self.assertEqual(gateway_logs.output, expected_log_lines)
            self.assertEqual(response.status_code, 200)

            # Step 2: submit the token; this should redirect to the realm.
            token_step = {
                "token-otp_token": str(token),
                "two_factor_login_view-current_step": "token",
            }
            response = self.client_post("/accounts/login/", token_step)
            self.assertEqual(response.status_code, 302)
            self.assertEqual(response["Location"], "http://zulip.testserver")

            # Once logged in, visiting the login page again should redirect
            # to '/'.
            response = self.client_get("/accounts/login/")
            self.assertEqual(response["Location"], "http://zulip.testserver/")
class NameRestrictionsTest(ZulipTestCase):
    """
    Checks for is_disposable_domain().
    """

    def test_whitelisted_disposable_domains(self) -> None:
        # Going by the test name, this domain is whitelisted -- TODO confirm
        # where that list is defined. Note the mixed-case spelling.
        domain = "OPayQ.com"
        self.assertFalse(is_disposable_domain(domain))
class RealmRedirectTest(ZulipTestCase):
    """
    Tests for the organization-URL form served at /accounts/go/.
    """

    def test_realm_redirect_without_next_param(self) -> None:
        go_url = "/accounts/go/"

        # GET renders the form.
        result = self.client_get(go_url)
        self.assert_in_success_response(["Enter your organization's Zulip URL"], result)

        # The "zephyr" subdomain redirects to that realm's login page.
        result = self.client_post(go_url, {"subdomain": "zephyr"})
        self.assertEqual(result.status_code, 302)
        self.assertEqual(result["Location"], "http://zephyr.testserver/login/")

        # The "invalid" subdomain re-renders the page with an error message.
        result = self.client_post(go_url, {"subdomain": "invalid"})
        self.assert_in_success_response(["We couldn't find that Zulip organization."], result)

    def test_realm_redirect_with_next_param(self) -> None:
        # The form's action URL must carry the `next` parameter along.
        expected_fragments = [
            "Enter your organization's Zulip URL",
            'action="/accounts/go/?next=billing"',
        ]
        result = self.client_get("/accounts/go/", {"next": "billing"})
        self.assert_in_success_response(expected_fragments, result)

        # After the subdomain is submitted, `next` is forwarded to the
        # realm's login URL.
        result = self.client_post("/accounts/go/?next=billing", {"subdomain": "lear"})
        self.assertEqual(result.status_code, 302)
        self.assertEqual(result["Location"], "http://lear.testserver/login/?next=billing")