mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			183 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			183 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import logging
 | 
						|
from typing import Literal, TypedDict, cast
 | 
						|
 | 
						|
from django.http import HttpRequest
 | 
						|
from django.utils.timezone import now as timezone_now
 | 
						|
from django.utils.translation import gettext as _
 | 
						|
 | 
						|
from zerver.lib.exceptions import JsonableError, RemoteBillingAuthenticationError
 | 
						|
from zerver.lib.timestamp import datetime_to_timestamp
 | 
						|
from zilencer.models import (
 | 
						|
    RemoteRealm,
 | 
						|
    RemoteRealmBillingUser,
 | 
						|
    RemoteServerBillingUser,
 | 
						|
    RemoteZulipServer,
 | 
						|
)
 | 
						|
 | 
						|
billing_logger = logging.getLogger("corporate.stripe")
 | 
						|
 | 
						|
# The sessions are relatively short-lived, so that we can avoid issues
 | 
						|
# with users who have their privileges revoked on the remote server
 | 
						|
# maintaining access to the billing page for too long.
 | 
						|
REMOTE_BILLING_SESSION_VALIDITY_SECONDS = 2 * 60 * 60
 | 
						|
 | 
						|
 | 
						|
class RemoteBillingUserDict(TypedDict):
 | 
						|
    user_uuid: str
 | 
						|
    user_email: str
 | 
						|
    user_full_name: str
 | 
						|
 | 
						|
 | 
						|
class RemoteBillingIdentityDict(TypedDict):
 | 
						|
    user: RemoteBillingUserDict
 | 
						|
    remote_server_uuid: str
 | 
						|
    remote_realm_uuid: str
 | 
						|
 | 
						|
    remote_billing_user_id: int | None
 | 
						|
    authenticated_at: int
 | 
						|
    uri_scheme: Literal["http://", "https://"]
 | 
						|
 | 
						|
    next_page: str | None
 | 
						|
 | 
						|
 | 
						|
class LegacyServerIdentityDict(TypedDict):
 | 
						|
    # Currently this has only one field. We can extend this
 | 
						|
    # to add more information as appropriate.
 | 
						|
    remote_server_uuid: str
 | 
						|
 | 
						|
    remote_billing_user_id: int | None
 | 
						|
    authenticated_at: int
 | 
						|
 | 
						|
 | 
						|
class RemoteBillingIdentityExpiredError(Exception):
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        *,
 | 
						|
        realm_uuid: str | None = None,
 | 
						|
        server_uuid: str | None = None,
 | 
						|
        uri_scheme: Literal["http://", "https://"] | None = None,
 | 
						|
    ) -> None:
 | 
						|
        self.realm_uuid = realm_uuid
 | 
						|
        self.server_uuid = server_uuid
 | 
						|
        self.uri_scheme = uri_scheme
 | 
						|
 | 
						|
 | 
						|
def get_identity_dict_from_session(
 | 
						|
    request: HttpRequest,
 | 
						|
    *,
 | 
						|
    realm_uuid: str | None,
 | 
						|
    server_uuid: str | None,
 | 
						|
) -> RemoteBillingIdentityDict | LegacyServerIdentityDict | None:
 | 
						|
    if not (realm_uuid or server_uuid):
 | 
						|
        return None
 | 
						|
 | 
						|
    identity_dicts = request.session.get("remote_billing_identities")
 | 
						|
    if identity_dicts is None:
 | 
						|
        return None
 | 
						|
 | 
						|
    if realm_uuid is not None:
 | 
						|
        result = identity_dicts.get(f"remote_realm:{realm_uuid}")
 | 
						|
    else:
 | 
						|
        assert server_uuid is not None
 | 
						|
        result = identity_dicts.get(f"remote_server:{server_uuid}")
 | 
						|
 | 
						|
    if result is None:
 | 
						|
        return None
 | 
						|
    if (
 | 
						|
        datetime_to_timestamp(timezone_now()) - result["authenticated_at"]
 | 
						|
        > REMOTE_BILLING_SESSION_VALIDITY_SECONDS
 | 
						|
    ):
 | 
						|
        # In this case we raise, because callers want to catch this as an explicitly
 | 
						|
        # different scenario from the user not being authenticated, to handle it nicely
 | 
						|
        # by redirecting them to their login page.
 | 
						|
        raise RemoteBillingIdentityExpiredError(
 | 
						|
            realm_uuid=result.get("remote_realm_uuid"),
 | 
						|
            server_uuid=result.get("remote_server_uuid"),
 | 
						|
            uri_scheme=result.get("uri_scheme"),
 | 
						|
        )
 | 
						|
 | 
						|
    return result
 | 
						|
 | 
						|
 | 
						|
def get_remote_realm_and_user_from_session(
 | 
						|
    request: HttpRequest,
 | 
						|
    realm_uuid: str | None,
 | 
						|
) -> tuple[RemoteRealm, RemoteRealmBillingUser]:
 | 
						|
    # Cannot use isinstance with TypedDicts, to make mypy know
 | 
						|
    # which of the TypedDicts in the Union this is - so just cast it.
 | 
						|
    identity_dict = cast(
 | 
						|
        RemoteBillingIdentityDict | None,
 | 
						|
        get_identity_dict_from_session(request, realm_uuid=realm_uuid, server_uuid=None),
 | 
						|
    )
 | 
						|
 | 
						|
    if identity_dict is None:
 | 
						|
        raise RemoteBillingAuthenticationError
 | 
						|
 | 
						|
    remote_server_uuid = identity_dict["remote_server_uuid"]
 | 
						|
    remote_realm_uuid = identity_dict["remote_realm_uuid"]
 | 
						|
 | 
						|
    try:
 | 
						|
        remote_realm = RemoteRealm.objects.get(
 | 
						|
            uuid=remote_realm_uuid, server__uuid=remote_server_uuid
 | 
						|
        )
 | 
						|
    except RemoteRealm.DoesNotExist:
 | 
						|
        raise AssertionError(
 | 
						|
            "The remote realm is missing despite being in the RemoteBillingIdentityDict"
 | 
						|
        )
 | 
						|
 | 
						|
    if (
 | 
						|
        remote_realm.registration_deactivated
 | 
						|
        or remote_realm.realm_deactivated
 | 
						|
        or remote_realm.server.deactivated
 | 
						|
    ):
 | 
						|
        raise JsonableError(_("Registration is deactivated"))
 | 
						|
 | 
						|
    remote_billing_user_id = identity_dict["remote_billing_user_id"]
 | 
						|
    # We only put IdentityDicts with remote_billing_user_id in the session in this flow,
 | 
						|
    # because the RemoteRealmBillingUser already exists when this is inserted into the session
 | 
						|
    # at the end of authentication.
 | 
						|
    assert remote_billing_user_id is not None
 | 
						|
 | 
						|
    try:
 | 
						|
        remote_billing_user = RemoteRealmBillingUser.objects.get(
 | 
						|
            id=remote_billing_user_id, remote_realm=remote_realm
 | 
						|
        )
 | 
						|
    except RemoteRealmBillingUser.DoesNotExist:
 | 
						|
        raise AssertionError
 | 
						|
 | 
						|
    return remote_realm, remote_billing_user
 | 
						|
 | 
						|
 | 
						|
def get_remote_server_and_user_from_session(
 | 
						|
    request: HttpRequest,
 | 
						|
    server_uuid: str,
 | 
						|
) -> tuple[RemoteZulipServer, RemoteServerBillingUser | None]:
 | 
						|
    identity_dict: LegacyServerIdentityDict | None = get_identity_dict_from_session(
 | 
						|
        request, realm_uuid=None, server_uuid=server_uuid
 | 
						|
    )
 | 
						|
 | 
						|
    if identity_dict is None:
 | 
						|
        raise RemoteBillingAuthenticationError
 | 
						|
 | 
						|
    remote_server_uuid = identity_dict["remote_server_uuid"]
 | 
						|
    try:
 | 
						|
        remote_server = RemoteZulipServer.objects.get(uuid=remote_server_uuid)
 | 
						|
    except RemoteZulipServer.DoesNotExist:
 | 
						|
        raise JsonableError(_("Invalid remote server."))
 | 
						|
 | 
						|
    if remote_server.deactivated:
 | 
						|
        raise JsonableError(_("Registration is deactivated"))
 | 
						|
 | 
						|
    remote_billing_user_id = identity_dict.get("remote_billing_user_id")
 | 
						|
    if remote_billing_user_id is None:
 | 
						|
        return remote_server, None
 | 
						|
 | 
						|
    try:
 | 
						|
        remote_billing_user = RemoteServerBillingUser.objects.get(
 | 
						|
            id=remote_billing_user_id, remote_server=remote_server
 | 
						|
        )
 | 
						|
    except RemoteServerBillingUser.DoesNotExist:
 | 
						|
        remote_billing_user = None
 | 
						|
 | 
						|
    return remote_server, remote_billing_user
 |