Files
zulip/corporate/lib/remote_billing_util.py
Anders Kaseorg 4e04eed745 typos: Fix typos caught by typos and mwic.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2025-02-18 19:44:02 -08:00

183 lines
5.8 KiB
Python

import logging
from typing import Literal, TypedDict, cast
from django.http import HttpRequest
from django.utils.timezone import now as timezone_now
from django.utils.translation import gettext as _
from zerver.lib.exceptions import JsonableError, RemoteBillingAuthenticationError
from zerver.lib.timestamp import datetime_to_timestamp
from zilencer.models import (
RemoteRealm,
RemoteRealmBillingUser,
RemoteServerBillingUser,
RemoteZulipServer,
)
billing_logger = logging.getLogger("corporate.stripe")
# The sessions are relatively short-lived, so that we can avoid issues
# with users who have their privileges revoked on the remote server
# maintaining access to the billing page for too long.
REMOTE_BILLING_SESSION_VALIDITY_SECONDS = 2 * 60 * 60
class RemoteBillingUserDict(TypedDict):
user_uuid: str
user_email: str
user_full_name: str
class RemoteBillingIdentityDict(TypedDict):
user: RemoteBillingUserDict
remote_server_uuid: str
remote_realm_uuid: str
remote_billing_user_id: int | None
authenticated_at: int
uri_scheme: Literal["http://", "https://"]
next_page: str | None
class LegacyServerIdentityDict(TypedDict):
# Currently this has only one field. We can extend this
# to add more information as appropriate.
remote_server_uuid: str
remote_billing_user_id: int | None
authenticated_at: int
class RemoteBillingIdentityExpiredError(Exception):
def __init__(
self,
*,
realm_uuid: str | None = None,
server_uuid: str | None = None,
uri_scheme: Literal["http://", "https://"] | None = None,
) -> None:
self.realm_uuid = realm_uuid
self.server_uuid = server_uuid
self.uri_scheme = uri_scheme
def get_identity_dict_from_session(
request: HttpRequest,
*,
realm_uuid: str | None,
server_uuid: str | None,
) -> RemoteBillingIdentityDict | LegacyServerIdentityDict | None:
if not (realm_uuid or server_uuid):
return None
identity_dicts = request.session.get("remote_billing_identities")
if identity_dicts is None:
return None
if realm_uuid is not None:
result = identity_dicts.get(f"remote_realm:{realm_uuid}")
else:
assert server_uuid is not None
result = identity_dicts.get(f"remote_server:{server_uuid}")
if result is None:
return None
if (
datetime_to_timestamp(timezone_now()) - result["authenticated_at"]
> REMOTE_BILLING_SESSION_VALIDITY_SECONDS
):
# In this case we raise, because callers want to catch this as an explicitly
# different scenario from the user not being authenticated, to handle it nicely
# by redirecting them to their login page.
raise RemoteBillingIdentityExpiredError(
realm_uuid=result.get("remote_realm_uuid"),
server_uuid=result.get("remote_server_uuid"),
uri_scheme=result.get("uri_scheme"),
)
return result
def get_remote_realm_and_user_from_session(
request: HttpRequest,
realm_uuid: str | None,
) -> tuple[RemoteRealm, RemoteRealmBillingUser]:
# Cannot use isinstance with TypedDicts, to make mypy know
# which of the TypedDicts in the Union this is - so just cast it.
identity_dict = cast(
RemoteBillingIdentityDict | None,
get_identity_dict_from_session(request, realm_uuid=realm_uuid, server_uuid=None),
)
if identity_dict is None:
raise RemoteBillingAuthenticationError
remote_server_uuid = identity_dict["remote_server_uuid"]
remote_realm_uuid = identity_dict["remote_realm_uuid"]
try:
remote_realm = RemoteRealm.objects.get(
uuid=remote_realm_uuid, server__uuid=remote_server_uuid
)
except RemoteRealm.DoesNotExist:
raise AssertionError(
"The remote realm is missing despite being in the RemoteBillingIdentityDict"
)
if (
remote_realm.registration_deactivated
or remote_realm.realm_deactivated
or remote_realm.server.deactivated
):
raise JsonableError(_("Registration is deactivated"))
remote_billing_user_id = identity_dict["remote_billing_user_id"]
# We only put IdentityDicts with remote_billing_user_id in the session in this flow,
# because the RemoteRealmBillingUser already exists when this is inserted into the session
# at the end of authentication.
assert remote_billing_user_id is not None
try:
remote_billing_user = RemoteRealmBillingUser.objects.get(
id=remote_billing_user_id, remote_realm=remote_realm
)
except RemoteRealmBillingUser.DoesNotExist:
raise AssertionError
return remote_realm, remote_billing_user
def get_remote_server_and_user_from_session(
request: HttpRequest,
server_uuid: str,
) -> tuple[RemoteZulipServer, RemoteServerBillingUser | None]:
identity_dict: LegacyServerIdentityDict | None = get_identity_dict_from_session(
request, realm_uuid=None, server_uuid=server_uuid
)
if identity_dict is None:
raise RemoteBillingAuthenticationError
remote_server_uuid = identity_dict["remote_server_uuid"]
try:
remote_server = RemoteZulipServer.objects.get(uuid=remote_server_uuid)
except RemoteZulipServer.DoesNotExist:
raise JsonableError(_("Invalid remote server."))
if remote_server.deactivated:
raise JsonableError(_("Registration is deactivated"))
remote_billing_user_id = identity_dict.get("remote_billing_user_id")
if remote_billing_user_id is None:
return remote_server, None
try:
remote_billing_user = RemoteServerBillingUser.objects.get(
id=remote_billing_user_id, remote_server=remote_server
)
except RemoteServerBillingUser.DoesNotExist:
remote_billing_user = None
return remote_server, remote_billing_user