mirror of
				https://github.com/zulip/zulip.git
				synced 2025-10-29 19:13:53 +00:00 
			
		
		
		
	This makes `has_request_variables` more generic, in the sense of the return value, and also makes it more accurate, in the sense of requiring the first parameter of the decorated function to be `HttpRequest`, and preserving the function signature without using `cast`. This affects some callers of `has_request_variables` or the callers of its decoratedfunctions in the following manners: - Decorated non-view functions called directly in other functions cannot use `request` as a keyword argument. Becasue `Concatenate` turns the concatenated parameters (`request: HttpRequest` in this case) into positional-only parameters. Callers of `get_chart_data` are thus refactored. - Functions to be decorated that accept variadic keyword arguments must define `request: HttpRequest` as positional-only. Mypy in strict mode rejects such functions otherwise because it is possible for the caller to pass a keyword argument that has the same name as `request` for `**kwargs`. No defining `request: HttpRequest` as positional-only breaks type safety because function with positional-or-keyword parameters cannot be considered a subtype of a function with the same parameters in which some of them are positional-only. Consider `f(x: int, /, **kwargs: object) -> int` and `g(x: int, **kwargs: object) -> int`. `f(12, x="asd")` is valid but `g(12, x="asd")` is not. Signed-off-by: Zixuan James Li <p359101898@gmail.com>
		
			
				
	
	
		
			1068 lines
		
	
	
		
			42 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1068 lines
		
	
	
		
			42 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| import secrets
 | |
| import urllib
 | |
| from email.headerregistry import Address
 | |
| from functools import wraps
 | |
| from typing import Any, Dict, List, Mapping, Optional, cast
 | |
| from urllib.parse import urlencode
 | |
| 
 | |
| import jwt
 | |
| from cryptography.hazmat.primitives.ciphers.aead import AESGCM
 | |
| from django.conf import settings
 | |
| from django.contrib.auth import authenticate
 | |
| from django.contrib.auth.views import LoginView as DjangoLoginView
 | |
| from django.contrib.auth.views import PasswordResetView as DjangoPasswordResetView
 | |
| from django.contrib.auth.views import logout_then_login as django_logout_then_login
 | |
| from django.core.exceptions import ValidationError
 | |
| from django.core.validators import validate_email
 | |
| from django.forms import Form
 | |
| from django.http import HttpRequest, HttpResponse, HttpResponseRedirect, HttpResponseServerError
 | |
| from django.shortcuts import redirect, render
 | |
| from django.template.response import SimpleTemplateResponse
 | |
| from django.urls import reverse
 | |
| from django.utils.html import escape
 | |
| from django.utils.http import url_has_allowed_host_and_scheme
 | |
| from django.utils.translation import gettext as _
 | |
| from django.views.decorators.csrf import csrf_exempt
 | |
| from django.views.decorators.http import require_safe
 | |
| from markupsafe import Markup as mark_safe
 | |
| from social_django.utils import load_backend, load_strategy
 | |
| from two_factor.forms import BackupTokenForm
 | |
| from two_factor.views import LoginView as BaseTwoFactorLoginView
 | |
| 
 | |
| from confirmation.models import (
 | |
|     Confirmation,
 | |
|     ConfirmationKeyException,
 | |
|     create_confirmation_link,
 | |
|     get_object_from_key,
 | |
|     render_confirmation_key_error,
 | |
| )
 | |
| from version import API_FEATURE_LEVEL, ZULIP_MERGE_BASE, ZULIP_VERSION
 | |
| from zerver.context_processors import get_realm_from_request, login_context, zulip_default_context
 | |
| from zerver.decorator import do_login, log_view_func, process_client, require_post
 | |
| from zerver.forms import (
 | |
|     DEACTIVATED_ACCOUNT_ERROR,
 | |
|     AuthenticationTokenForm,
 | |
|     HomepageForm,
 | |
|     OurAuthenticationForm,
 | |
|     ZulipPasswordResetForm,
 | |
| )
 | |
| from zerver.lib.exceptions import (
 | |
|     AuthenticationFailedError,
 | |
|     InvalidSubdomainError,
 | |
|     JsonableError,
 | |
|     PasswordAuthDisabledError,
 | |
|     PasswordResetRequiredError,
 | |
|     RateLimited,
 | |
|     RealmDeactivatedError,
 | |
|     UserDeactivatedError,
 | |
| )
 | |
| from zerver.lib.mobile_auth_otp import otp_encrypt_api_key
 | |
| from zerver.lib.push_notifications import push_notifications_enabled
 | |
| from zerver.lib.pysa import mark_sanitized
 | |
| from zerver.lib.realm_icon import realm_icon_url
 | |
| from zerver.lib.request import REQ, RequestNotes, has_request_variables
 | |
| from zerver.lib.response import json_success
 | |
| from zerver.lib.sessions import set_expirable_session_var
 | |
| from zerver.lib.subdomains import get_subdomain, is_subdomain_root_or_alias
 | |
| from zerver.lib.types import ViewFuncT
 | |
| from zerver.lib.url_encoding import append_url_query_string
 | |
| from zerver.lib.user_agent import parse_user_agent
 | |
| from zerver.lib.users import get_api_key, is_2fa_verified
 | |
| from zerver.lib.utils import has_api_key_format
 | |
| from zerver.lib.validator import validate_login_email
 | |
| from zerver.models import (
 | |
|     MultiuseInvite,
 | |
|     PreregistrationUser,
 | |
|     Realm,
 | |
|     UserProfile,
 | |
|     filter_to_valid_prereg_users,
 | |
|     get_realm,
 | |
|     remote_user_to_email,
 | |
| )
 | |
| from zerver.signals import email_on_new_login
 | |
| from zproject.backends import (
 | |
|     AUTH_BACKEND_NAME_MAP,
 | |
|     AppleAuthBackend,
 | |
|     ExternalAuthDataDict,
 | |
|     ExternalAuthResult,
 | |
|     GenericOpenIdConnectBackend,
 | |
|     SAMLAuthBackend,
 | |
|     ZulipLDAPAuthBackend,
 | |
|     ZulipLDAPConfigurationError,
 | |
|     ZulipRemoteUserBackend,
 | |
|     auth_enabled_helper,
 | |
|     dev_auth_enabled,
 | |
|     ldap_auth_enabled,
 | |
|     password_auth_enabled,
 | |
|     saml_auth_enabled,
 | |
|     validate_otp_params,
 | |
| )
 | |
| 
 | |
| ExtraContext = Optional[Dict[str, Any]]
 | |
| 
 | |
| 
 | |
| def get_safe_redirect_to(url: str, redirect_host: str) -> str:
 | |
|     is_url_safe = url_has_allowed_host_and_scheme(url=url, allowed_hosts=None)
 | |
|     if is_url_safe:
 | |
|         # Mark as safe to prevent Pysa from surfacing false positives for
 | |
|         # open redirects. In this branch, we have already checked that the URL
 | |
|         # points to the specified 'redirect_host', or is relative.
 | |
|         return urllib.parse.urljoin(redirect_host, mark_sanitized(url))
 | |
|     else:
 | |
|         return redirect_host
 | |
| 
 | |
| 
 | |
| def create_preregistration_user(
 | |
|     email: str,
 | |
|     realm: Optional[Realm],
 | |
|     realm_creation: bool = False,
 | |
|     password_required: bool = True,
 | |
|     full_name: Optional[str] = None,
 | |
|     full_name_validated: bool = False,
 | |
|     multiuse_invite: Optional[MultiuseInvite] = None,
 | |
| ) -> PreregistrationUser:
 | |
|     assert not (realm_creation and realm is not None)
 | |
|     assert not (realm is None and not realm_creation)
 | |
| 
 | |
|     return PreregistrationUser.objects.create(
 | |
|         email=email,
 | |
|         realm_creation=realm_creation,
 | |
|         password_required=password_required,
 | |
|         realm=realm,
 | |
|         full_name=full_name,
 | |
|         full_name_validated=full_name_validated,
 | |
|         multiuse_invite=multiuse_invite,
 | |
|     )
 | |
| 
 | |
| 
 | |
| def maybe_send_to_registration(
 | |
|     request: HttpRequest,
 | |
|     email: str,
 | |
|     full_name: str = "",
 | |
|     mobile_flow_otp: Optional[str] = None,
 | |
|     desktop_flow_otp: Optional[str] = None,
 | |
|     is_signup: bool = False,
 | |
|     password_required: bool = True,
 | |
|     multiuse_object_key: str = "",
 | |
|     full_name_validated: bool = False,
 | |
| ) -> HttpResponse:
 | |
|     """Given a successful authentication for an email address (i.e. we've
 | |
|     confirmed the user controls the email address) that does not
 | |
|     currently have a Zulip account in the target realm, send them to
 | |
|     the registration flow or the "continue to registration" flow,
 | |
|     depending on is_signup, whether the email address can join the
 | |
|     organization (checked in HomepageForm), and similar details.
 | |
|     """
 | |
| 
 | |
|     # In the desktop and mobile registration flows, the sign up
 | |
|     # happens in the browser so the user can use their
 | |
|     # already-logged-in social accounts.  Then at the end, with the
 | |
|     # user account created, we pass the appropriate data to the app
 | |
|     # via e.g. a `zulip://` redirect.  We store the OTP keys for the
 | |
|     # mobile/desktop flow in the session with 1-hour expiry, because
 | |
|     # we want this configuration of having a successful authentication
 | |
|     # result in being logged into the app to persist if the user makes
 | |
|     # mistakes while trying to authenticate (E.g. clicks the wrong
 | |
|     # Google account, hits back, etc.) during a given browser session,
 | |
|     # rather than just logging into the web app in the target browser.
 | |
|     #
 | |
|     # We can't use our usual pre-account-creation state storage
 | |
|     # approach of putting something in PreregistrationUser, because
 | |
|     # that would apply to future registration attempts on other
 | |
|     # devices, e.g. just creating an account on the web on their laptop.
 | |
|     assert not (mobile_flow_otp and desktop_flow_otp)
 | |
|     if mobile_flow_otp:
 | |
|         set_expirable_session_var(
 | |
|             request.session, "registration_mobile_flow_otp", mobile_flow_otp, expiry_seconds=3600
 | |
|         )
 | |
|     elif desktop_flow_otp:
 | |
|         set_expirable_session_var(
 | |
|             request.session, "registration_desktop_flow_otp", desktop_flow_otp, expiry_seconds=3600
 | |
|         )
 | |
| 
 | |
|     try:
 | |
|         # TODO: This should use get_realm_from_request, but a bunch of tests
 | |
|         # rely on mocking get_subdomain here, so they'll need to be tweaked first.
 | |
|         realm: Optional[Realm] = get_realm(get_subdomain(request))
 | |
|     except Realm.DoesNotExist:
 | |
|         realm = None
 | |
| 
 | |
|     multiuse_obj: Optional[MultiuseInvite] = None
 | |
|     from_multiuse_invite = False
 | |
|     if multiuse_object_key:
 | |
|         from_multiuse_invite = True
 | |
|         try:
 | |
|             confirmation_obj = get_object_from_key(
 | |
|                 multiuse_object_key, [Confirmation.MULTIUSE_INVITE], mark_object_used=False
 | |
|             )
 | |
|         except ConfirmationKeyException as exception:
 | |
|             return render_confirmation_key_error(request, exception)
 | |
| 
 | |
|         assert isinstance(confirmation_obj, MultiuseInvite)
 | |
|         multiuse_obj = confirmation_obj
 | |
|         if realm != multiuse_obj.realm:
 | |
|             return render(request, "confirmation/link_does_not_exist.html", status=404)
 | |
| 
 | |
|         invited_as = multiuse_obj.invited_as
 | |
|     else:
 | |
|         invited_as = PreregistrationUser.INVITE_AS["MEMBER"]
 | |
| 
 | |
|     form = HomepageForm({"email": email}, realm=realm, from_multiuse_invite=from_multiuse_invite)
 | |
|     if form.is_valid():
 | |
|         # If the email address is allowed to sign up for an account in
 | |
|         # this organization, construct a PreregistrationUser and
 | |
|         # Confirmation objects, and then send the user to account
 | |
|         # creation or confirm-continue-registration depending on
 | |
|         # is_signup.
 | |
|         try:
 | |
|             prereg_user = filter_to_valid_prereg_users(
 | |
|                 PreregistrationUser.objects.filter(email__iexact=email, realm=realm)
 | |
|             ).latest("invited_at")
 | |
| 
 | |
|             # password_required and full_name data passed here as argument should take precedence
 | |
|             # over the defaults with which the existing PreregistrationUser that we've just fetched
 | |
|             # was created.
 | |
|             prereg_user.password_required = password_required
 | |
|             update_fields = ["password_required"]
 | |
|             if full_name:
 | |
|                 prereg_user.full_name = full_name
 | |
|                 prereg_user.full_name_validated = full_name_validated
 | |
|                 update_fields.extend(["full_name", "full_name_validated"])
 | |
|             prereg_user.save(update_fields=update_fields)
 | |
|         except PreregistrationUser.DoesNotExist:
 | |
|             prereg_user = create_preregistration_user(
 | |
|                 email,
 | |
|                 realm,
 | |
|                 password_required=password_required,
 | |
|                 full_name=full_name,
 | |
|                 full_name_validated=full_name_validated,
 | |
|                 multiuse_invite=multiuse_obj,
 | |
|             )
 | |
| 
 | |
|         if multiuse_obj is not None:
 | |
|             request.session.modified = True
 | |
|             streams_to_subscribe = list(multiuse_obj.streams.all())
 | |
|             prereg_user.streams.set(streams_to_subscribe)
 | |
|             prereg_user.invited_as = invited_as
 | |
|             prereg_user.multiuse_invite = multiuse_obj
 | |
|             prereg_user.save()
 | |
| 
 | |
|         confirmation_link = create_confirmation_link(prereg_user, Confirmation.USER_REGISTRATION)
 | |
|         if is_signup:
 | |
|             return redirect(confirmation_link)
 | |
| 
 | |
|         context = {"email": email, "continue_link": confirmation_link, "full_name": full_name}
 | |
|         return render(request, "zerver/confirm_continue_registration.html", context=context)
 | |
| 
 | |
|     # This email address it not allowed to join this organization, so
 | |
|     # just send the user back to the registration page.
 | |
|     url = reverse("register")
 | |
|     context = login_context(request)
 | |
|     extra_context: Mapping[str, Any] = {
 | |
|         "form": form,
 | |
|         "current_url": lambda: url,
 | |
|         "from_multiuse_invite": from_multiuse_invite,
 | |
|         "multiuse_object_key": multiuse_object_key,
 | |
|         "mobile_flow_otp": mobile_flow_otp,
 | |
|         "desktop_flow_otp": desktop_flow_otp,
 | |
|     }
 | |
|     context.update(extra_context)
 | |
|     return render(request, "zerver/accounts_home.html", context=context)
 | |
| 
 | |
| 
 | |
| def register_remote_user(request: HttpRequest, result: ExternalAuthResult) -> HttpResponse:
 | |
|     # We have verified the user controls an email address, but
 | |
|     # there's no associated Zulip user account.  Consider sending
 | |
|     # the request to registration.
 | |
|     kwargs: Dict[str, Any] = dict(result.data_dict)
 | |
|     # maybe_send_to_registration doesn't take these arguments, so delete them.
 | |
|     kwargs.pop("subdomain", None)
 | |
|     kwargs.pop("redirect_to", None)
 | |
|     kwargs.pop("is_realm_creation", None)
 | |
| 
 | |
|     kwargs["password_required"] = False
 | |
|     return maybe_send_to_registration(request, **kwargs)
 | |
| 
 | |
| 
 | |
| def login_or_register_remote_user(request: HttpRequest, result: ExternalAuthResult) -> HttpResponse:
 | |
|     """Given a successful authentication showing the user controls given
 | |
|     email address (email) and potentially a UserProfile
 | |
|     object (if the user already has a Zulip account), redirect the
 | |
|     browser to the appropriate place:
 | |
| 
 | |
|     * The logged-in app if the user already has a Zulip account and is
 | |
|       trying to log in, potentially to an initial narrow or page that had been
 | |
|       saved in the `redirect_to` parameter.
 | |
|     * The registration form if is_signup was set (i.e. the user is
 | |
|       trying to create a Zulip account)
 | |
|     * A special `confirm_continue_registration.html` "do you want to
 | |
|       register or try another account" if the user doesn't have a
 | |
|       Zulip account but is_signup is False (i.e. the user tried to log in
 | |
|       and then did social authentication selecting an email address that does
 | |
|       not have a Zulip account in this organization).
 | |
|     * A zulip:// URL to send control back to the mobile or desktop apps if they
 | |
|       are doing authentication using the mobile_flow_otp or desktop_flow_otp flow.
 | |
|     """
 | |
|     user_profile = result.user_profile
 | |
|     if user_profile is None or user_profile.is_mirror_dummy:
 | |
|         return register_remote_user(request, result)
 | |
|     # Otherwise, the user has successfully authenticated to an
 | |
|     # account, and we need to do the right thing depending whether
 | |
|     # or not they're using the mobile OTP flow or want a browser session.
 | |
|     is_realm_creation = result.data_dict.get("is_realm_creation")
 | |
|     mobile_flow_otp = result.data_dict.get("mobile_flow_otp")
 | |
|     desktop_flow_otp = result.data_dict.get("desktop_flow_otp")
 | |
|     if mobile_flow_otp is not None:
 | |
|         return finish_mobile_flow(request, user_profile, mobile_flow_otp)
 | |
|     elif desktop_flow_otp is not None:
 | |
|         return finish_desktop_flow(request, user_profile, desktop_flow_otp)
 | |
| 
 | |
|     do_login(request, user_profile)
 | |
| 
 | |
|     redirect_to = result.data_dict.get("redirect_to", "")
 | |
|     if is_realm_creation is not None and settings.BILLING_ENABLED:
 | |
|         from corporate.lib.stripe import is_free_trial_offer_enabled
 | |
| 
 | |
|         if is_free_trial_offer_enabled():
 | |
|             redirect_to = "{}?onboarding=true".format(reverse("initial_upgrade"))
 | |
| 
 | |
|     redirect_to = get_safe_redirect_to(redirect_to, user_profile.realm.uri)
 | |
|     return HttpResponseRedirect(redirect_to)
 | |
| 
 | |
| 
 | |
| def finish_desktop_flow(request: HttpRequest, user_profile: UserProfile, otp: str) -> HttpResponse:
 | |
|     """
 | |
|     The desktop otp flow returns to the app (through the clipboard)
 | |
|     a token that allows obtaining (through log_into_subdomain) a logged in session
 | |
|     for the user account we authenticated in this flow.
 | |
|     The token can only be used once and within ExternalAuthResult.LOGIN_KEY_EXPIRATION_SECONDS
 | |
|     of being created, as nothing more powerful is needed for the desktop flow
 | |
|     and this ensures the key can only be used for completing this authentication attempt.
 | |
|     """
 | |
|     result = ExternalAuthResult(user_profile=user_profile)
 | |
|     token = result.store_data()
 | |
|     key = bytes.fromhex(otp)
 | |
|     iv = secrets.token_bytes(12)
 | |
|     desktop_data = (iv + AESGCM(key).encrypt(iv, token.encode(), b"")).hex()
 | |
|     context = {
 | |
|         "desktop_data": desktop_data,
 | |
|         "browser_url": reverse("login_page", kwargs={"template_name": "zerver/login.html"}),
 | |
|         "realm_icon_url": realm_icon_url(user_profile.realm),
 | |
|     }
 | |
|     return render(request, "zerver/desktop_redirect.html", context=context)
 | |
| 
 | |
| 
 | |
| def finish_mobile_flow(request: HttpRequest, user_profile: UserProfile, otp: str) -> HttpResponse:
 | |
|     # For the mobile OAuth flow, we send the API key and other
 | |
|     # necessary details in a redirect to a zulip:// URI scheme.
 | |
|     api_key = get_api_key(user_profile)
 | |
|     response = create_response_for_otp_flow(
 | |
|         api_key, otp, user_profile, encrypted_key_field_name="otp_encrypted_api_key"
 | |
|     )
 | |
| 
 | |
|     # Since we are returning an API key instead of going through
 | |
|     # the Django login() function (which creates a browser
 | |
|     # session, etc.), the "new login" signal handler (which
 | |
|     # triggers an email notification new logins) will not run
 | |
|     # automatically.  So we call it manually here.
 | |
|     #
 | |
|     # Arguably, sending a fake 'user_logged_in' signal would be a better approach:
 | |
|     #   user_logged_in.send(sender=user_profile.__class__, request=request, user=user_profile)
 | |
|     email_on_new_login(sender=user_profile.__class__, request=request, user=user_profile)
 | |
| 
 | |
|     # Mark this request as having a logged-in user for our server logs.
 | |
|     process_client(request, user_profile)
 | |
|     RequestNotes.get_notes(request).requestor_for_logs = user_profile.format_requestor_for_logs()
 | |
| 
 | |
|     return response
 | |
| 
 | |
| 
 | |
| def create_response_for_otp_flow(
 | |
|     key: str, otp: str, user_profile: UserProfile, encrypted_key_field_name: str
 | |
| ) -> HttpResponse:
 | |
|     realm_uri = user_profile.realm.uri
 | |
| 
 | |
|     # Check if the mobile URI is overridden in settings, if so, replace it
 | |
|     # This block should only apply to the mobile flow, so we if add others, this
 | |
|     # needs to be conditional.
 | |
|     if realm_uri in settings.REALM_MOBILE_REMAP_URIS:
 | |
|         realm_uri = settings.REALM_MOBILE_REMAP_URIS[realm_uri]
 | |
| 
 | |
|     params = {
 | |
|         encrypted_key_field_name: otp_encrypt_api_key(key, otp),
 | |
|         "email": user_profile.delivery_email,
 | |
|         "user_id": user_profile.id,
 | |
|         "realm": realm_uri,
 | |
|     }
 | |
|     # We can't use HttpResponseRedirect, since it only allows HTTP(S) URLs
 | |
|     response = HttpResponse(status=302)
 | |
|     response["Location"] = append_url_query_string("zulip://login", urllib.parse.urlencode(params))
 | |
| 
 | |
|     return response
 | |
| 
 | |
| 
 | |
| @log_view_func
 | |
| @has_request_variables
 | |
| def remote_user_sso(
 | |
|     request: HttpRequest,
 | |
|     mobile_flow_otp: Optional[str] = REQ(default=None),
 | |
|     desktop_flow_otp: Optional[str] = REQ(default=None),
 | |
|     next: str = REQ(default="/"),
 | |
| ) -> HttpResponse:
 | |
|     subdomain = get_subdomain(request)
 | |
|     try:
 | |
|         realm: Optional[Realm] = get_realm(subdomain)
 | |
|     except Realm.DoesNotExist:
 | |
|         realm = None
 | |
| 
 | |
|     if not auth_enabled_helper([ZulipRemoteUserBackend.auth_backend_name], realm):
 | |
|         return config_error(request, "remote_user_backend_disabled")
 | |
| 
 | |
|     try:
 | |
|         remote_user = request.META["REMOTE_USER"]
 | |
|     except KeyError:
 | |
|         return config_error(request, "remote_user_header_missing")
 | |
| 
 | |
|     # Django invokes authenticate methods by matching arguments, and this
 | |
|     # authentication flow will not invoke LDAP authentication because of
 | |
|     # this condition of Django so no need to check if LDAP backend is
 | |
|     # enabled.
 | |
|     validate_login_email(remote_user_to_email(remote_user))
 | |
| 
 | |
|     # Here we support the mobile and desktop flow for REMOTE_USER_BACKEND; we
 | |
|     # validate the data format and then pass it through to
 | |
|     # login_or_register_remote_user if appropriate.
 | |
|     validate_otp_params(mobile_flow_otp, desktop_flow_otp)
 | |
| 
 | |
|     if realm is None:
 | |
|         user_profile = None
 | |
|     else:
 | |
|         user_profile = authenticate(remote_user=remote_user, realm=realm)
 | |
|     if user_profile is not None:
 | |
|         assert isinstance(user_profile, UserProfile)
 | |
| 
 | |
|     email = remote_user_to_email(remote_user)
 | |
|     data_dict = ExternalAuthDataDict(
 | |
|         email=email,
 | |
|         mobile_flow_otp=mobile_flow_otp,
 | |
|         desktop_flow_otp=desktop_flow_otp,
 | |
|         redirect_to=next,
 | |
|     )
 | |
|     if realm:
 | |
|         data_dict["subdomain"] = realm.subdomain
 | |
|     else:
 | |
|         data_dict["subdomain"] = ""  # realm creation happens on root subdomain
 | |
|     result = ExternalAuthResult(user_profile=user_profile, data_dict=data_dict)
 | |
|     return login_or_register_remote_user(request, result)
 | |
| 
 | |
| 
 | |
| @csrf_exempt
 | |
| @log_view_func
 | |
| def remote_user_jwt(request: HttpRequest) -> HttpResponse:
 | |
|     subdomain = get_subdomain(request)
 | |
|     try:
 | |
|         key = settings.JWT_AUTH_KEYS[subdomain]["key"]
 | |
|         algorithms = settings.JWT_AUTH_KEYS[subdomain]["algorithms"]
 | |
|     except KeyError:
 | |
|         raise JsonableError(_("Auth key for this subdomain not found."))
 | |
| 
 | |
|     try:
 | |
|         json_web_token = request.POST["json_web_token"]
 | |
|         options = {"verify_signature": True}
 | |
|         payload = jwt.decode(json_web_token, key, algorithms=algorithms, options=options)
 | |
|     except KeyError:
 | |
|         raise JsonableError(_("No JSON web token passed in request"))
 | |
|     except jwt.InvalidTokenError:
 | |
|         raise JsonableError(_("Bad JSON web token"))
 | |
| 
 | |
|     remote_user = payload.get("user", None)
 | |
|     if remote_user is None:
 | |
|         raise JsonableError(_("No user specified in JSON web token claims"))
 | |
|     email_domain = payload.get("realm", None)
 | |
|     if email_domain is None:
 | |
|         raise JsonableError(_("No organization specified in JSON web token claims"))
 | |
| 
 | |
|     email = Address(username=remote_user, domain=email_domain).addr_spec
 | |
| 
 | |
|     try:
 | |
|         realm = get_realm(subdomain)
 | |
|     except Realm.DoesNotExist:
 | |
|         raise JsonableError(_("Wrong subdomain"))
 | |
| 
 | |
|     user_profile = authenticate(username=email, realm=realm, use_dummy_backend=True)
 | |
|     if user_profile is None:
 | |
|         result = ExternalAuthResult(
 | |
|             data_dict={"email": email, "full_name": remote_user, "subdomain": realm.subdomain}
 | |
|         )
 | |
|     else:
 | |
|         assert isinstance(user_profile, UserProfile)
 | |
|         result = ExternalAuthResult(user_profile=user_profile)
 | |
| 
 | |
|     return login_or_register_remote_user(request, result)
 | |
| 
 | |
| 
 | |
| @has_request_variables
 | |
| def oauth_redirect_to_root(
 | |
|     request: HttpRequest,
 | |
|     url: str,
 | |
|     sso_type: str,
 | |
|     is_signup: bool = False,
 | |
|     extra_url_params: Dict[str, str] = {},
 | |
|     next: Optional[str] = REQ(default=None),
 | |
|     multiuse_object_key: str = REQ(default=""),
 | |
|     mobile_flow_otp: Optional[str] = REQ(default=None),
 | |
|     desktop_flow_otp: Optional[str] = REQ(default=None),
 | |
| ) -> HttpResponse:
 | |
|     main_site_uri = settings.ROOT_DOMAIN_URI + url
 | |
|     if settings.SOCIAL_AUTH_SUBDOMAIN is not None and sso_type == "social":
 | |
|         main_site_uri = (
 | |
|             settings.EXTERNAL_URI_SCHEME
 | |
|             + settings.SOCIAL_AUTH_SUBDOMAIN
 | |
|             + "."
 | |
|             + settings.EXTERNAL_HOST
 | |
|         ) + url
 | |
| 
 | |
|     params = {
 | |
|         "subdomain": get_subdomain(request),
 | |
|         "is_signup": "1" if is_signup else "0",
 | |
|     }
 | |
| 
 | |
|     params["multiuse_object_key"] = multiuse_object_key
 | |
| 
 | |
|     # mobile_flow_otp is a one-time pad provided by the app that we
 | |
|     # can use to encrypt the API key when passing back to the app.
 | |
|     validate_otp_params(mobile_flow_otp, desktop_flow_otp)
 | |
|     if mobile_flow_otp is not None:
 | |
|         params["mobile_flow_otp"] = mobile_flow_otp
 | |
|     if desktop_flow_otp is not None:
 | |
|         params["desktop_flow_otp"] = desktop_flow_otp
 | |
| 
 | |
|     if next:
 | |
|         params["next"] = next
 | |
| 
 | |
|     params = {**params, **extra_url_params}
 | |
| 
 | |
|     return redirect(append_url_query_string(main_site_uri, urllib.parse.urlencode(params)))
 | |
| 
 | |
| 
 | |
| def handle_desktop_flow(func: ViewFuncT) -> ViewFuncT:
 | |
|     @wraps(func)
 | |
|     def wrapper(request: HttpRequest, *args: object, **kwargs: object) -> HttpResponse:
 | |
|         user_agent = parse_user_agent(request.headers.get("User-Agent", "Missing User-Agent"))
 | |
|         if user_agent["name"] == "ZulipElectron":
 | |
|             return render(request, "zerver/desktop_login.html")
 | |
| 
 | |
|         return func(request, *args, **kwargs)
 | |
| 
 | |
|     return cast(ViewFuncT, wrapper)  # https://github.com/python/mypy/issues/1927
 | |
| 
 | |
| 
 | |
| @handle_desktop_flow
 | |
| def start_remote_user_sso(request: HttpRequest) -> HttpResponse:
 | |
|     """
 | |
|     The purpose of this endpoint is to provide an initial step in the flow
 | |
|     on which we can handle the special behavior for the desktop app.
 | |
|     /accounts/login/sso may have Apache intercepting requests to it
 | |
|     to do authentication, so we need this additional endpoint.
 | |
|     """
 | |
|     query = request.META["QUERY_STRING"]
 | |
|     return redirect(append_url_query_string(reverse(remote_user_sso), query))
 | |
| 
 | |
| 
 | |
| @handle_desktop_flow
 | |
| def start_social_login(
 | |
|     request: HttpRequest,
 | |
|     backend: str,
 | |
|     extra_arg: Optional[str] = None,
 | |
| ) -> HttpResponse:
 | |
|     backend_url = reverse("social:begin", args=[backend])
 | |
|     extra_url_params: Dict[str, str] = {}
 | |
|     if backend == "saml":
 | |
|         if not SAMLAuthBackend.check_config():
 | |
|             return config_error(request, "saml")
 | |
| 
 | |
|         # This backend requires the name of the IdP (from the list of configured ones)
 | |
|         # to be passed as the parameter.
 | |
|         if not extra_arg or extra_arg not in settings.SOCIAL_AUTH_SAML_ENABLED_IDPS:
 | |
|             logging.info(
 | |
|                 "Attempted to initiate SAML authentication with wrong idp argument: %s", extra_arg
 | |
|             )
 | |
|             return config_error(request, "saml")
 | |
|         extra_url_params = {"idp": extra_arg}
 | |
| 
 | |
|     if backend == "apple" and not AppleAuthBackend.check_config():
 | |
|         return config_error(request, "apple")
 | |
|     if backend == "oidc" and not GenericOpenIdConnectBackend.check_config():
 | |
|         return config_error(request, "oidc")
 | |
| 
 | |
|     # TODO: Add AzureAD also.
 | |
|     if backend in ["github", "google", "gitlab"]:
 | |
|         key_setting = "SOCIAL_AUTH_" + backend.upper() + "_KEY"
 | |
|         secret_setting = "SOCIAL_AUTH_" + backend.upper() + "_SECRET"
 | |
|         if not (getattr(settings, key_setting) and getattr(settings, secret_setting)):
 | |
|             return config_error(request, backend)
 | |
| 
 | |
|     return oauth_redirect_to_root(request, backend_url, "social", extra_url_params=extra_url_params)
 | |
| 
 | |
| 
 | |
| @handle_desktop_flow
 | |
| def start_social_signup(
 | |
|     request: HttpRequest,
 | |
|     backend: str,
 | |
|     extra_arg: Optional[str] = None,
 | |
| ) -> HttpResponse:
 | |
|     backend_url = reverse("social:begin", args=[backend])
 | |
|     extra_url_params: Dict[str, str] = {}
 | |
|     if backend == "saml":
 | |
|         if not SAMLAuthBackend.check_config():
 | |
|             return config_error(request, "saml")
 | |
| 
 | |
|         if not extra_arg or extra_arg not in settings.SOCIAL_AUTH_SAML_ENABLED_IDPS:
 | |
|             logging.info(
 | |
|                 "Attempted to initiate SAML authentication with wrong idp argument: %s", extra_arg
 | |
|             )
 | |
|             return config_error(request, "saml")
 | |
|         extra_url_params = {"idp": extra_arg}
 | |
|     return oauth_redirect_to_root(
 | |
|         request, backend_url, "social", is_signup=True, extra_url_params=extra_url_params
 | |
|     )
 | |
| 
 | |
| 
 | |
| _subdomain_token_salt = "zerver.views.auth.log_into_subdomain"
 | |
| 
 | |
| 
 | |
| @log_view_func
 | |
| def log_into_subdomain(request: HttpRequest, token: str) -> HttpResponse:
 | |
|     """Given a valid authentication token (generated by
 | |
|     redirect_and_log_into_subdomain called on auth.zulip.example.com),
 | |
|     call login_or_register_remote_user, passing all the authentication
 | |
|     result data that has been stored in Redis, associated with this token.
 | |
|     """
 | |
|     # The tokens are intended to have the same format as API keys.
 | |
|     if not has_api_key_format(token):
 | |
|         logging.warning("log_into_subdomain: Malformed token given: %s", token)
 | |
|         return HttpResponse(status=400)
 | |
| 
 | |
|     try:
 | |
|         result = ExternalAuthResult(login_token=token)
 | |
|     except ExternalAuthResult.InvalidTokenError:
 | |
|         logging.warning("log_into_subdomain: Invalid token given: %s", token)
 | |
|         return render(request, "zerver/log_into_subdomain_token_invalid.html", status=400)
 | |
| 
 | |
|     subdomain = get_subdomain(request)
 | |
|     if result.data_dict["subdomain"] != subdomain:
 | |
|         raise JsonableError(_("Invalid subdomain"))
 | |
| 
 | |
|     return login_or_register_remote_user(request, result)
 | |
| 
 | |
| 
 | |
| def redirect_and_log_into_subdomain(result: ExternalAuthResult) -> HttpResponse:
 | |
|     token = result.store_data()
 | |
|     realm = get_realm(result.data_dict["subdomain"])
 | |
|     subdomain_login_uri = realm.uri + reverse(log_into_subdomain, args=[token])
 | |
|     return redirect(subdomain_login_uri)
 | |
| 
 | |
| 
 | |
| def redirect_to_misconfigured_ldap_notice(request: HttpRequest, error_type: int) -> HttpResponse:
 | |
|     if error_type == ZulipLDAPAuthBackend.REALM_IS_NONE_ERROR:
 | |
|         return config_error(request, "ldap")
 | |
|     else:
 | |
|         raise AssertionError("Invalid error type")
 | |
| 
 | |
| 
 | |
| def show_deactivation_notice(request: HttpRequest) -> HttpResponse:
 | |
|     realm = get_realm_from_request(request)
 | |
|     if realm and realm.deactivated:
 | |
|         context = {"deactivated_domain_name": realm.name}
 | |
|         if realm.deactivated_redirect is not None:
 | |
|             context["deactivated_redirect"] = realm.deactivated_redirect
 | |
|         return render(request, "zerver/deactivated.html", context=context)
 | |
| 
 | |
|     return HttpResponseRedirect(reverse("login_page"))
 | |
| 
 | |
| 
 | |
| def redirect_to_deactivation_notice() -> HttpResponse:
 | |
|     return HttpResponseRedirect(reverse(show_deactivation_notice))
 | |
| 
 | |
| 
 | |
| def update_login_page_context(request: HttpRequest, context: Dict[str, Any]) -> None:
 | |
|     for key in ("email", "already_registered"):
 | |
|         try:
 | |
|             context[key] = request.GET[key]
 | |
|         except KeyError:
 | |
|             pass
 | |
| 
 | |
|     deactivated_email = request.GET.get("is_deactivated")
 | |
|     if deactivated_email is None:
 | |
|         return
 | |
|     try:
 | |
|         validate_email(deactivated_email)
 | |
|         context["deactivated_account_error"] = mark_safe(
 | |
|             DEACTIVATED_ACCOUNT_ERROR.format(username=escape(deactivated_email))
 | |
|         )
 | |
|     except ValidationError:
 | |
|         logging.info("Invalid email in is_deactivated param to login page: %s", deactivated_email)
 | |
| 
 | |
| 
 | |
| class TwoFactorLoginView(BaseTwoFactorLoginView):
 | |
|     extra_context: ExtraContext = None
 | |
|     form_list = (
 | |
|         ("auth", OurAuthenticationForm),
 | |
|         ("token", AuthenticationTokenForm),
 | |
|         ("backup", BackupTokenForm),
 | |
|     )
 | |
| 
 | |
|     def __init__(self, extra_context: ExtraContext = None, *args: Any, **kwargs: Any) -> None:
 | |
|         self.extra_context = extra_context
 | |
|         super().__init__(*args, **kwargs)
 | |
| 
 | |
|     def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
 | |
|         context = super().get_context_data(**kwargs)
 | |
|         if self.extra_context is not None:
 | |
|             context.update(self.extra_context)
 | |
|         update_login_page_context(self.request, context)
 | |
| 
 | |
|         realm = get_realm_from_request(self.request)
 | |
|         redirect_to = realm.uri if realm else "/"
 | |
|         context["next"] = self.request.POST.get(
 | |
|             "next",
 | |
|             self.request.GET.get("next", redirect_to),
 | |
|         )
 | |
|         return context
 | |
| 
 | |
|     def done(self, form_list: List[Form], **kwargs: Any) -> HttpResponse:
 | |
|         """
 | |
|         Log in the user and redirect to the desired page.
 | |
| 
 | |
|         We need to override this function so that we can redirect to
 | |
|         realm.uri instead of '/'.
 | |
|         """
 | |
|         realm_uri = self.get_user().realm.uri
 | |
|         # This mock.patch business is an unpleasant hack that we'd
 | |
|         # ideally like to remove by instead patching the upstream
 | |
|         # module to support better configurability of the
 | |
|         # LOGIN_REDIRECT_URL setting.  But until then, it works.  We
 | |
|         # import mock.patch here because mock has an expensive import
 | |
|         # process involving pbr -> pkgresources (which is really slow).
 | |
|         from unittest.mock import patch
 | |
| 
 | |
|         with patch.object(settings, "LOGIN_REDIRECT_URL", realm_uri):
 | |
|             return super().done(form_list, **kwargs)
 | |
| 
 | |
| 
 | |
| @has_request_variables
 | |
| def login_page(
 | |
|     request: HttpRequest,
 | |
|     /,
 | |
|     next: str = REQ(default="/"),
 | |
|     **kwargs: Any,
 | |
| ) -> HttpResponse:
 | |
|     if settings.SOCIAL_AUTH_SUBDOMAIN == get_subdomain(request):
 | |
|         return social_auth_subdomain_login_page(request)
 | |
| 
 | |
|     # To support previewing the Zulip login pages, we have a special option
 | |
|     # that disables the default behavior of redirecting logged-in users to the
 | |
|     # logged-in app.
 | |
|     is_preview = "preview" in request.GET
 | |
|     if settings.TWO_FACTOR_AUTHENTICATION_ENABLED:
 | |
|         if request.user.is_authenticated and is_2fa_verified(request.user):
 | |
|             return HttpResponseRedirect(request.user.realm.uri)
 | |
|     elif request.user.is_authenticated and not is_preview:
 | |
|         return HttpResponseRedirect(request.user.realm.uri)
 | |
|     if is_subdomain_root_or_alias(request) and settings.ROOT_DOMAIN_LANDING_PAGE:
 | |
|         redirect_url = reverse("realm_redirect")
 | |
|         if request.GET:
 | |
|             redirect_url = append_url_query_string(redirect_url, request.GET.urlencode())
 | |
|         return HttpResponseRedirect(redirect_url)
 | |
| 
 | |
|     realm = get_realm_from_request(request)
 | |
|     if realm and realm.deactivated:
 | |
|         return redirect_to_deactivation_notice()
 | |
| 
 | |
|     extra_context = kwargs.pop("extra_context", {})
 | |
|     extra_context["next"] = next
 | |
|     if dev_auth_enabled() and kwargs.get("template_name") == "zerver/development/dev_login.html":
 | |
|         from zerver.views.development.dev_login import add_dev_login_context
 | |
| 
 | |
|         if "new_realm" in request.POST:
 | |
|             try:
 | |
|                 realm = get_realm(request.POST["new_realm"])
 | |
|             except Realm.DoesNotExist:
 | |
|                 realm = None
 | |
| 
 | |
|         add_dev_login_context(realm, extra_context)
 | |
|         if realm and "new_realm" in request.POST:
 | |
|             # If we're switching realms, redirect to that realm, but
 | |
|             # only if it actually exists.
 | |
|             return HttpResponseRedirect(realm.uri)
 | |
| 
 | |
|     if "username" in request.POST:
 | |
|         extra_context["email"] = request.POST["username"]
 | |
|     extra_context.update(login_context(request))
 | |
| 
 | |
|     if settings.TWO_FACTOR_AUTHENTICATION_ENABLED:
 | |
|         return start_two_factor_auth(request, extra_context=extra_context, **kwargs)
 | |
| 
 | |
|     try:
 | |
|         template_response = DjangoLoginView.as_view(
 | |
|             authentication_form=OurAuthenticationForm, extra_context=extra_context, **kwargs
 | |
|         )(request)
 | |
|     except ZulipLDAPConfigurationError as e:
 | |
|         assert len(e.args) > 1
 | |
|         return redirect_to_misconfigured_ldap_notice(request, e.args[1])
 | |
| 
 | |
|     if isinstance(template_response, SimpleTemplateResponse):
 | |
|         # Only those responses that are rendered using a template have
 | |
|         # context_data attribute. This attribute doesn't exist otherwise. It is
 | |
|         # added in SimpleTemplateResponse class, which is a derived class of
 | |
|         # HttpResponse. See django.template.response.SimpleTemplateResponse,
 | |
|         # https://github.com/django/django/blob/2.0/django/template/response.py#L19
 | |
|         assert template_response.context_data is not None
 | |
|         update_login_page_context(request, template_response.context_data)
 | |
| 
 | |
|     assert isinstance(template_response, HttpResponse)
 | |
|     return template_response
 | |
| 
 | |
| 
 | |
| def social_auth_subdomain_login_page(request: HttpRequest) -> HttpResponse:
 | |
|     origin_subdomain = request.session.get("subdomain")
 | |
|     if origin_subdomain is not None:
 | |
|         try:
 | |
|             origin_realm = get_realm(origin_subdomain)
 | |
|             return HttpResponseRedirect(origin_realm.uri)
 | |
|         except Realm.DoesNotExist:
 | |
|             pass
 | |
| 
 | |
|     return render(request, "zerver/auth_subdomain.html", status=400)
 | |
| 
 | |
| 
 | |
| def start_two_factor_auth(
 | |
|     request: HttpRequest, extra_context: ExtraContext = None, **kwargs: Any
 | |
| ) -> HttpResponse:
 | |
|     two_fa_form_field = "two_factor_login_view-current_step"
 | |
|     if two_fa_form_field not in request.POST:
 | |
|         # Here we inject the 2FA step in the request context if it's missing to
 | |
|         # force the user to go to the first step of 2FA authentication process.
 | |
|         # This seems a bit hackish but simplifies things from testing point of
 | |
|         # view. I don't think this can result in anything bad because all the
 | |
|         # authentication logic runs after the auth step.
 | |
|         #
 | |
|         # If we don't do this, we will have to modify a lot of auth tests to
 | |
|         # insert this variable in the request.
 | |
|         request.POST = request.POST.copy()
 | |
|         request.POST.update({two_fa_form_field: "auth"})
 | |
| 
 | |
|     """
 | |
|     This is how Django implements as_view(), so extra_context will be passed
 | |
|     to the __init__ method of TwoFactorLoginView.
 | |
| 
 | |
|     def as_view(cls, **initkwargs):
 | |
|         def view(request, *args, **kwargs):
 | |
|             self = cls(**initkwargs)
 | |
|             ...
 | |
| 
 | |
|         return view
 | |
|     """
 | |
|     two_fa_view = TwoFactorLoginView.as_view(extra_context=extra_context, **kwargs)
 | |
|     return two_fa_view(request, **kwargs)
 | |
| 
 | |
| 
 | |
| @csrf_exempt
 | |
| @require_post
 | |
| @has_request_variables
 | |
| def api_fetch_api_key(
 | |
|     request: HttpRequest, username: str = REQ(), password: str = REQ()
 | |
| ) -> HttpResponse:
 | |
|     return_data: Dict[str, bool] = {}
 | |
| 
 | |
|     realm = get_realm_from_request(request)
 | |
|     if realm is None:
 | |
|         raise InvalidSubdomainError()
 | |
| 
 | |
|     if not ldap_auth_enabled(realm=realm):
 | |
|         # In case we don't authenticate against LDAP, check for a valid
 | |
|         # email. LDAP backend can authenticate against a non-email.
 | |
|         validate_login_email(username)
 | |
|     user_profile = authenticate(
 | |
|         request=request, username=username, password=password, realm=realm, return_data=return_data
 | |
|     )
 | |
|     if return_data.get("inactive_user"):
 | |
|         raise UserDeactivatedError()
 | |
|     if return_data.get("inactive_realm"):
 | |
|         raise RealmDeactivatedError()
 | |
|     if return_data.get("password_auth_disabled"):
 | |
|         raise PasswordAuthDisabledError()
 | |
|     if return_data.get("password_reset_needed"):
 | |
|         raise PasswordResetRequiredError()
 | |
|     if user_profile is None:
 | |
|         raise AuthenticationFailedError()
 | |
| 
 | |
|     assert user_profile.is_authenticated
 | |
| 
 | |
|     # Maybe sending 'user_logged_in' signal is the better approach:
 | |
|     #   user_logged_in.send(sender=user_profile.__class__, request=request, user=user_profile)
 | |
|     # Not doing this only because over here we don't add the user information
 | |
|     # in the session. If the signal receiver assumes that we do then that
 | |
|     # would cause problems.
 | |
|     email_on_new_login(sender=user_profile.__class__, request=request, user=user_profile)
 | |
| 
 | |
|     # Mark this request as having a logged-in user for our server logs.
 | |
|     assert isinstance(user_profile, UserProfile)
 | |
|     process_client(request, user_profile)
 | |
|     RequestNotes.get_notes(request).requestor_for_logs = user_profile.format_requestor_for_logs()
 | |
| 
 | |
|     api_key = get_api_key(user_profile)
 | |
|     return json_success(request, data={"api_key": api_key, "email": user_profile.delivery_email})
 | |
| 
 | |
| 
 | |
| def get_auth_backends_data(request: HttpRequest) -> Dict[str, Any]:
 | |
|     """Returns which authentication methods are enabled on the server"""
 | |
|     subdomain = get_subdomain(request)
 | |
|     try:
 | |
|         realm = Realm.objects.get(string_id=subdomain)
 | |
|     except Realm.DoesNotExist:
 | |
|         # If not the root subdomain, this is an error
 | |
|         if subdomain != Realm.SUBDOMAIN_FOR_ROOT_DOMAIN:
 | |
|             raise JsonableError(_("Invalid subdomain"))
 | |
|         # With the root subdomain, it's an error or not depending
 | |
|         # whether ROOT_DOMAIN_LANDING_PAGE (which indicates whether
 | |
|         # there are some realms without subdomains on this server)
 | |
|         # is set.
 | |
|         if settings.ROOT_DOMAIN_LANDING_PAGE:
 | |
|             raise JsonableError(_("Subdomain required"))
 | |
|         else:
 | |
|             realm = None
 | |
|     result = {
 | |
|         "password": password_auth_enabled(realm),
 | |
|     }
 | |
|     for auth_backend_name in AUTH_BACKEND_NAME_MAP:
 | |
|         key = auth_backend_name.lower()
 | |
|         result[key] = auth_enabled_helper([auth_backend_name], realm)
 | |
|     return result
 | |
| 
 | |
| 
 | |
| def check_server_incompatibility(request: HttpRequest) -> bool:
 | |
|     user_agent = parse_user_agent(request.headers.get("User-Agent", "Missing User-Agent"))
 | |
|     return user_agent["name"] == "ZulipInvalid"
 | |
| 
 | |
| 
 | |
| @require_safe
 | |
| @csrf_exempt
 | |
| def api_get_server_settings(request: HttpRequest) -> HttpResponse:
 | |
|     # Log which client is making this request.
 | |
|     process_client(request)
 | |
|     result = dict(
 | |
|         authentication_methods=get_auth_backends_data(request),
 | |
|         zulip_version=ZULIP_VERSION,
 | |
|         zulip_merge_base=ZULIP_MERGE_BASE,
 | |
|         zulip_feature_level=API_FEATURE_LEVEL,
 | |
|         push_notifications_enabled=push_notifications_enabled(),
 | |
|         is_incompatible=check_server_incompatibility(request),
 | |
|     )
 | |
|     context = zulip_default_context(request)
 | |
|     context.update(login_context(request))
 | |
|     # IMPORTANT NOTE:
 | |
|     # realm_name, realm_icon, etc. are not guaranteed to appear in the response.
 | |
|     # * If they do, that means the server URL has only one realm on it
 | |
|     # * If they don't, the server has multiple realms, and it's not clear which is
 | |
|     #   the requested realm, so we can't send back these data.
 | |
|     for settings_item in [
 | |
|         "email_auth_enabled",
 | |
|         "require_email_format_usernames",
 | |
|         "realm_uri",
 | |
|         "realm_name",
 | |
|         "realm_icon",
 | |
|         "realm_description",
 | |
|         "realm_web_public_access_enabled",
 | |
|         "external_authentication_methods",
 | |
|     ]:
 | |
|         if context[settings_item] is not None:
 | |
|             result[settings_item] = context[settings_item]
 | |
|     return json_success(request, data=result)
 | |
| 
 | |
| 
 | |
| @has_request_variables
 | |
| def json_fetch_api_key(
 | |
|     request: HttpRequest, user_profile: UserProfile, password: str = REQ(default="")
 | |
| ) -> HttpResponse:
 | |
|     realm = get_realm_from_request(request)
 | |
|     if realm is None:
 | |
|         raise JsonableError(_("Invalid subdomain"))
 | |
|     if password_auth_enabled(user_profile.realm):
 | |
|         if not authenticate(
 | |
|             request=request, username=user_profile.delivery_email, password=password, realm=realm
 | |
|         ):
 | |
|             raise JsonableError(_("Password is incorrect."))
 | |
| 
 | |
|     api_key = get_api_key(user_profile)
 | |
|     return json_success(request, data={"api_key": api_key, "email": user_profile.delivery_email})
 | |
| 
 | |
| 
 | |
| logout_then_login = require_post(django_logout_then_login)
 | |
| 
 | |
| 
 | |
| def password_reset(request: HttpRequest) -> HttpResponse:
 | |
|     if is_subdomain_root_or_alias(request) and settings.ROOT_DOMAIN_LANDING_PAGE:
 | |
|         redirect_url = append_url_query_string(
 | |
|             reverse("realm_redirect"), urlencode({"next": reverse("password_reset")})
 | |
|         )
 | |
|         return HttpResponseRedirect(redirect_url)
 | |
| 
 | |
|     try:
 | |
|         response = DjangoPasswordResetView.as_view(
 | |
|             template_name="zerver/reset.html",
 | |
|             form_class=ZulipPasswordResetForm,
 | |
|             success_url="/accounts/password/reset/done/",
 | |
|         )(request)
 | |
|     except RateLimited as e:
 | |
|         assert e.secs_to_freedom is not None
 | |
|         return render(
 | |
|             request,
 | |
|             "zerver/rate_limit_exceeded.html",
 | |
|             context={"retry_after": int(e.secs_to_freedom)},
 | |
|             status=429,
 | |
|         )
 | |
|     assert isinstance(response, HttpResponse)
 | |
|     return response
 | |
| 
 | |
| 
 | |
| @csrf_exempt
 | |
| def saml_sp_metadata(request: HttpRequest) -> HttpResponse:  # nocoverage
 | |
|     """
 | |
|     This is the view function for generating our SP metadata
 | |
|     for SAML authentication. It's meant for helping check the correctness
 | |
|     of the configuration when setting up SAML, or for obtaining the XML metadata
 | |
|     if the IdP requires it.
 | |
|     Taken from https://python-social-auth.readthedocs.io/en/latest/backends/saml.html
 | |
|     """
 | |
|     if not saml_auth_enabled():
 | |
|         return config_error(request, "saml")
 | |
| 
 | |
|     complete_url = reverse("social:complete", args=("saml",))
 | |
|     saml_backend = load_backend(load_strategy(request), "saml", complete_url)
 | |
|     metadata, errors = saml_backend.generate_metadata_xml()
 | |
|     if not errors:
 | |
|         return HttpResponse(content=metadata, content_type="text/xml")
 | |
| 
 | |
|     return HttpResponseServerError(content=", ".join(errors))
 | |
| 
 | |
| 
 | |
| def config_error(request: HttpRequest, error_category_name: str) -> HttpResponse:
 | |
|     contexts = {
 | |
|         "apple": {"social_backend_name": "apple", "has_markdown_file": True},
 | |
|         "google": {"social_backend_name": "google", "has_markdown_file": True},
 | |
|         "github": {"social_backend_name": "github", "has_markdown_file": True},
 | |
|         "gitlab": {"social_backend_name": "gitlab", "has_markdown_file": True},
 | |
|         "ldap": {"error_name": "ldap_error_realm_is_none"},
 | |
|         "dev": {"error_name": "dev_not_supported_error"},
 | |
|         "saml": {"social_backend_name": "saml"},
 | |
|         "smtp": {"error_name": "smtp_error"},
 | |
|         "remote_user_backend_disabled": {"error_name": "remoteuser_error_backend_disabled"},
 | |
|         "remote_user_header_missing": {"error_name": "remoteuser_error_remote_user_header_missing"},
 | |
|         # TODO: Improve the config error page for OIDC.
 | |
|         "oidc": {"error_name": "oidc_error"},
 | |
|     }
 | |
| 
 | |
|     return render(request, "zerver/config_error.html", contexts[error_category_name])
 |