mirror of
				https://github.com/zulip/zulip.git
				synced 2025-10-26 09:34:02 +00:00 
			
		
		
		
	Fixes #20028. There's no reason to have a special `RealmCreationKey` class - the `Confirmation` system already does this job. This is somewhat complicated by the need to write a migration for `RealmCreationKey`->`Confirmation` for pre-existing, valid objects, to avoid breaking realm creation links that haven't been used yet.
		
			
				
	
	
		
			309 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			309 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright: (c) 2008, Jarek Zgoda <jarek.zgoda@gmail.com>
 | |
| 
 | |
| __revision__ = "$Id: models.py 28 2009-10-22 15:03:02Z jarek.zgoda $"
 | |
| import secrets
 | |
| from base64 import b32encode
 | |
| from collections.abc import Mapping
 | |
| from datetime import timedelta
 | |
| from typing import TypeAlias, Union, cast
 | |
| from urllib.parse import urljoin
 | |
| 
 | |
| from django.conf import settings
 | |
| from django.contrib.contenttypes.fields import GenericForeignKey
 | |
| from django.contrib.contenttypes.models import ContentType
 | |
| from django.db import models
 | |
| from django.db.models import CASCADE
 | |
| from django.http import HttpRequest, HttpResponse
 | |
| from django.template.response import TemplateResponse
 | |
| from django.urls import reverse
 | |
| from django.utils.timezone import now as timezone_now
 | |
| from typing_extensions import override
 | |
| 
 | |
| from confirmation import settings as confirmation_settings
 | |
| from zerver.lib.types import UNSET, Unset
 | |
| from zerver.models import (
 | |
|     EmailChangeStatus,
 | |
|     MultiuseInvite,
 | |
|     PreregistrationRealm,
 | |
|     PreregistrationUser,
 | |
|     Realm,
 | |
|     RealmReactivationStatus,
 | |
|     UserProfile,
 | |
| )
 | |
| from zerver.models.prereg_users import RealmCreationStatus
 | |
| 
 | |
| if settings.ZILENCER_ENABLED:
 | |
|     from zilencer.models import (
 | |
|         PreregistrationRemoteRealmBillingUser,
 | |
|         PreregistrationRemoteServerBillingUser,
 | |
|     )
 | |
| 
 | |
| 
 | |
| class ConfirmationKeyError(Exception):
 | |
|     WRONG_LENGTH = 1
 | |
|     EXPIRED = 2
 | |
|     DOES_NOT_EXIST = 3
 | |
| 
 | |
|     def __init__(self, error_type: int) -> None:
 | |
|         super().__init__()
 | |
|         self.error_type = error_type
 | |
| 
 | |
| 
 | |
| def render_confirmation_key_error(
 | |
|     request: HttpRequest, exception: ConfirmationKeyError
 | |
| ) -> HttpResponse:
 | |
|     if exception.error_type == ConfirmationKeyError.WRONG_LENGTH:
 | |
|         return TemplateResponse(request, "confirmation/link_malformed.html", status=404)
 | |
|     if exception.error_type == ConfirmationKeyError.EXPIRED:
 | |
|         return TemplateResponse(request, "confirmation/link_expired.html", status=404)
 | |
|     return TemplateResponse(request, "confirmation/link_does_not_exist.html", status=404)
 | |
| 
 | |
| 
 | |
| def generate_key() -> str:
 | |
|     # 24 characters * 5 bits of entropy/character = 120 bits of entropy
 | |
|     return b32encode(secrets.token_bytes(15)).decode().lower()
 | |
| 
 | |
| 
 | |
| NoZilencerConfirmationObjT: TypeAlias = (
 | |
|     MultiuseInvite
 | |
|     | PreregistrationRealm
 | |
|     | PreregistrationUser
 | |
|     | EmailChangeStatus
 | |
|     | UserProfile
 | |
|     | RealmReactivationStatus
 | |
|     | RealmCreationStatus
 | |
| )
 | |
| ZilencerConfirmationObjT: TypeAlias = Union[
 | |
|     NoZilencerConfirmationObjT,
 | |
|     "PreregistrationRemoteServerBillingUser",
 | |
|     "PreregistrationRemoteRealmBillingUser",
 | |
| ]
 | |
| 
 | |
| ConfirmationObjT: TypeAlias = NoZilencerConfirmationObjT | ZilencerConfirmationObjT
 | |
| 
 | |
| 
 | |
| def get_object_from_key(
 | |
|     confirmation_key: str,
 | |
|     confirmation_types: list[int],
 | |
|     *,
 | |
|     mark_object_used: bool,
 | |
|     allow_used: bool = False,
 | |
| ) -> ConfirmationObjT:
 | |
|     """Access a confirmation object from one of the provided confirmation
 | |
|     types with the provided key.
 | |
| 
 | |
|     The mark_object_used parameter determines whether to mark the
 | |
|     confirmation object as used (which generally prevents it from
 | |
|     being used again). It should always be False for MultiuseInvite
 | |
|     objects, since they are intended to be used multiple times.
 | |
| 
 | |
|     By default, used confirmation objects cannot be used again as part
 | |
|     of their security model.
 | |
|     """
 | |
| 
 | |
|     # Confirmation keys used to be 40 characters
 | |
|     if len(confirmation_key) not in (24, 40):
 | |
|         raise ConfirmationKeyError(ConfirmationKeyError.WRONG_LENGTH)
 | |
|     try:
 | |
|         confirmation = Confirmation.objects.get(
 | |
|             confirmation_key=confirmation_key, type__in=confirmation_types
 | |
|         )
 | |
|     except Confirmation.DoesNotExist:
 | |
|         raise ConfirmationKeyError(ConfirmationKeyError.DOES_NOT_EXIST)
 | |
| 
 | |
|     if confirmation.expiry_date is not None and timezone_now() > confirmation.expiry_date:
 | |
|         raise ConfirmationKeyError(ConfirmationKeyError.EXPIRED)
 | |
| 
 | |
|     obj = confirmation.content_object
 | |
|     assert obj is not None
 | |
| 
 | |
|     forbidden_statuses = {confirmation_settings.STATUS_REVOKED}
 | |
|     if not allow_used:
 | |
|         forbidden_statuses.add(confirmation_settings.STATUS_USED)
 | |
| 
 | |
|     if hasattr(obj, "status") and obj.status in forbidden_statuses:
 | |
|         # Confirmations where the object has the status attribute are one-time use
 | |
|         # and are marked after being revoked (or used).
 | |
|         raise ConfirmationKeyError(ConfirmationKeyError.EXPIRED)
 | |
| 
 | |
|     if mark_object_used:
 | |
|         # MultiuseInvite objects do not use the STATUS_USED status, since they are
 | |
|         # intended to be used more than once.
 | |
|         assert confirmation.type != Confirmation.MULTIUSE_INVITE
 | |
|         assert hasattr(obj, "status")
 | |
|         obj.status = getattr(settings, "STATUS_USED", 1)
 | |
|         obj.save(update_fields=["status"])
 | |
|     return obj
 | |
| 
 | |
| 
 | |
| def create_confirmation_object(
 | |
|     obj: ConfirmationObjT,
 | |
|     confirmation_type: int,
 | |
|     *,
 | |
|     validity_in_minutes: int | None | Unset = UNSET,
 | |
|     no_associated_realm_object: bool = False,
 | |
| ) -> "Confirmation":
 | |
|     # validity_in_minutes is an override for the default values which are
 | |
|     # determined by the confirmation_type - its main purpose is for use
 | |
|     # in tests which may want to have control over the exact expiration time.
 | |
|     key = generate_key()
 | |
| 
 | |
|     # Some confirmation objects, like those for realm creation or those used
 | |
|     # for the self-hosted management flows, are not associated with a realm
 | |
|     # hosted by this Zulip server.
 | |
|     if no_associated_realm_object:
 | |
|         realm = None
 | |
|     else:
 | |
|         obj = cast(NoZilencerConfirmationObjT, obj)
 | |
|         assert not isinstance(obj, PreregistrationRealm | RealmCreationStatus)
 | |
|         realm = obj.realm
 | |
| 
 | |
|     current_time = timezone_now()
 | |
|     expiry_date = None
 | |
|     if not isinstance(validity_in_minutes, Unset):
 | |
|         if validity_in_minutes is None:
 | |
|             expiry_date = None
 | |
|         else:
 | |
|             assert validity_in_minutes is not None
 | |
|             expiry_date = current_time + timedelta(minutes=validity_in_minutes)
 | |
|     else:
 | |
|         expiry_date = current_time + timedelta(days=_properties[confirmation_type].validity_in_days)
 | |
| 
 | |
|     return Confirmation.objects.create(
 | |
|         content_object=obj,
 | |
|         date_sent=current_time,
 | |
|         confirmation_key=key,
 | |
|         realm=realm,
 | |
|         expiry_date=expiry_date,
 | |
|         type=confirmation_type,
 | |
|     )
 | |
| 
 | |
| 
 | |
| def create_confirmation_link(
 | |
|     obj: ConfirmationObjT,
 | |
|     confirmation_type: int,
 | |
|     *,
 | |
|     validity_in_minutes: int | None | Unset = UNSET,
 | |
|     url_args: Mapping[str, str] = {},
 | |
|     no_associated_realm_object: bool = False,
 | |
| ) -> str:
 | |
|     conf = create_confirmation_object(
 | |
|         obj,
 | |
|         confirmation_type,
 | |
|         validity_in_minutes=validity_in_minutes,
 | |
|         no_associated_realm_object=no_associated_realm_object,
 | |
|     )
 | |
|     result = confirmation_url_for(
 | |
|         conf,
 | |
|         url_args=url_args,
 | |
|     )
 | |
|     return result
 | |
| 
 | |
| 
 | |
| def confirmation_url_for(confirmation_obj: "Confirmation", url_args: Mapping[str, str] = {}) -> str:
 | |
|     return confirmation_url(
 | |
|         confirmation_obj.confirmation_key, confirmation_obj.realm, confirmation_obj.type, url_args
 | |
|     )
 | |
| 
 | |
| 
 | |
| def confirmation_url(
 | |
|     confirmation_key: str,
 | |
|     realm: Realm | None,
 | |
|     confirmation_type: int,
 | |
|     url_args: Mapping[str, str] = {},
 | |
| ) -> str:
 | |
|     url_args = dict(url_args)
 | |
|     url_args["confirmation_key"] = confirmation_key
 | |
|     return urljoin(
 | |
|         settings.ROOT_DOMAIN_URI if realm is None else realm.url,
 | |
|         reverse(_properties[confirmation_type].url_name, kwargs=url_args),
 | |
|     )
 | |
| 
 | |
| 
 | |
| class Confirmation(models.Model):
 | |
|     content_type = models.ForeignKey(ContentType, on_delete=CASCADE)
 | |
|     object_id = models.PositiveBigIntegerField(db_index=True)
 | |
|     content_object = GenericForeignKey("content_type", "object_id")
 | |
|     date_sent = models.DateTimeField(db_index=True)
 | |
|     confirmation_key = models.CharField(max_length=40, db_index=True)
 | |
|     expiry_date = models.DateTimeField(db_index=True, null=True)
 | |
|     realm = models.ForeignKey(Realm, null=True, on_delete=CASCADE)
 | |
| 
 | |
|     # The following list is the set of valid types
 | |
|     USER_REGISTRATION = 1
 | |
|     INVITATION = 2
 | |
|     EMAIL_CHANGE = 3
 | |
|     UNSUBSCRIBE = 4
 | |
|     SERVER_REGISTRATION = 5
 | |
|     MULTIUSE_INVITE = 6
 | |
|     NEW_REALM_USER_REGISTRATION = 7
 | |
|     REALM_REACTIVATION = 8
 | |
|     REMOTE_SERVER_BILLING_LEGACY_LOGIN = 9
 | |
|     REMOTE_REALM_BILLING_LEGACY_LOGIN = 10
 | |
|     CAN_CREATE_REALM = 11
 | |
|     type = models.PositiveSmallIntegerField()
 | |
| 
 | |
|     class Meta:
 | |
|         unique_together = ("type", "confirmation_key")
 | |
|         indexes = [
 | |
|             models.Index(fields=["content_type", "object_id"]),
 | |
|         ]
 | |
| 
 | |
|     @override
 | |
|     def __str__(self) -> str:
 | |
|         return f"{self.content_object!r}"
 | |
| 
 | |
| 
 | |
| class ConfirmationType:
 | |
|     def __init__(
 | |
|         self,
 | |
|         url_name: str,
 | |
|         validity_in_days: int = settings.CONFIRMATION_LINK_DEFAULT_VALIDITY_DAYS,
 | |
|     ) -> None:
 | |
|         self.url_name = url_name
 | |
|         self.validity_in_days = validity_in_days
 | |
| 
 | |
| 
 | |
| _properties = {
 | |
|     Confirmation.USER_REGISTRATION: ConfirmationType("get_prereg_key_and_redirect"),
 | |
|     Confirmation.INVITATION: ConfirmationType(
 | |
|         "get_prereg_key_and_redirect", validity_in_days=settings.INVITATION_LINK_VALIDITY_DAYS
 | |
|     ),
 | |
|     Confirmation.EMAIL_CHANGE: ConfirmationType("confirm_email_change_get"),
 | |
|     Confirmation.UNSUBSCRIBE: ConfirmationType(
 | |
|         "unsubscribe",
 | |
|         validity_in_days=1000000,  # should never expire
 | |
|     ),
 | |
|     Confirmation.MULTIUSE_INVITE: ConfirmationType(
 | |
|         "join", validity_in_days=settings.INVITATION_LINK_VALIDITY_DAYS
 | |
|     ),
 | |
|     Confirmation.CAN_CREATE_REALM: ConfirmationType(
 | |
|         "create_realm", validity_in_days=settings.CAN_CREATE_REALM_LINK_VALIDITY_DAYS
 | |
|     ),
 | |
|     Confirmation.NEW_REALM_USER_REGISTRATION: ConfirmationType("get_prereg_key_and_redirect"),
 | |
|     Confirmation.REALM_REACTIVATION: ConfirmationType("realm_reactivation_get"),
 | |
| }
 | |
| if settings.ZILENCER_ENABLED:
 | |
|     _properties[Confirmation.REMOTE_SERVER_BILLING_LEGACY_LOGIN] = ConfirmationType(
 | |
|         "remote_billing_legacy_server_from_login_confirmation_link"
 | |
|     )
 | |
|     _properties[Confirmation.REMOTE_REALM_BILLING_LEGACY_LOGIN] = ConfirmationType(
 | |
|         "remote_realm_billing_from_login_confirmation_link"
 | |
|     )
 | |
| 
 | |
| 
 | |
| def one_click_unsubscribe_link(user_profile: UserProfile, email_type: str) -> str:
 | |
|     """
 | |
|     Generate a unique link that a logged-out user can visit to unsubscribe from
 | |
|     Zulip e-mails without having to first log in.
 | |
|     """
 | |
|     return create_confirmation_link(
 | |
|         user_profile, Confirmation.UNSUBSCRIBE, url_args={"email_type": email_type}
 | |
|     )
 | |
| 
 | |
| 
 | |
| def generate_realm_creation_url(by_admin: bool = False) -> str:
 | |
|     from zerver.views.registration import prepare_realm_creation_url
 | |
| 
 | |
|     return prepare_realm_creation_url(presume_email_valid=by_admin)
 |