Files
zulip/corporate/lib/remote_billing_util.py
Mateusz Mandera abdfdeffe4 remote_billing: Implement confirmation flow for legacy servers.
For the last form (with Full Name and ToS consent field), this pretty
shamelessly re-uses and directly renders the
corporate/remote_realm_billing_finalize_login_confirmation.html
template. That's probably good in terms of re-use, but calls for a
clean-up commit that will generalize the name of this template and the
classes/ids in the HTML.
2023-12-08 23:49:10 -08:00

165 lines
5.2 KiB
Python

import logging
from typing import Literal, Optional, Tuple, TypedDict, Union, cast
from django.http import HttpRequest
from django.utils.timezone import now as timezone_now
from django.utils.translation import gettext as _
from zerver.lib.exceptions import JsonableError, RemoteBillingAuthenticationError
from zerver.lib.timestamp import datetime_to_timestamp
from zilencer.models import RemoteRealm, RemoteServerBillingUser, RemoteZulipServer
# Logger for this module; it uses the "corporate.stripe" logger name.
billing_logger = logging.getLogger("corporate.stripe")

# The sessions are relatively short-lived, so that we can avoid issues
# with users who have their privileges revoked on the remote server
# maintaining access to the billing page for too long.
#
# The value is expressed in seconds (2 hours). It is compared against the
# age of the "authenticated_at" timestamp stored in the session identity.
REMOTE_BILLING_SESSION_VALIDITY_SECONDS = 2 * 60 * 60
class RemoteBillingUserDict(TypedDict):
    """Shape of the ``user`` entry nested inside RemoteBillingIdentityDict."""

    # NOTE(review): judging by the field names, these describe the user as
    # known to the remote server - confirm against the code building this dict.
    user_uuid: str
    user_email: str
    user_full_name: str
class RemoteBillingIdentityDict(TypedDict):
    """Billing identity of a session authenticated for a remote realm.

    get_remote_realm_from_session casts the session entry stored under
    "remote_realm:<realm_uuid>" to this type.
    """

    user: RemoteBillingUserDict

    # Together these select the RemoteRealm row: the realm is looked up by
    # its uuid, constrained to the server with this uuid.
    remote_server_uuid: str
    remote_realm_uuid: str

    # Authentication time, in the units returned by datetime_to_timestamp.
    # Its age is checked against REMOTE_BILLING_SESSION_VALIDITY_SECONDS.
    authenticated_at: int

    # Passed along in RemoteBillingIdentityExpiredError when the identity
    # has expired.
    uri_scheme: Literal["http://", "https://"]

    # NOTE(review): not read by the code visible in this module; presumably
    # the page to send the user to after login - confirm against callers.
    next_page: Optional[str]
class LegacyServerIdentityDict(TypedDict):
    """Billing identity keyed by remote server rather than by remote realm.

    get_remote_server_and_user_from_session casts the session entry stored
    under "remote_server:<server_uuid>" to this type.
    """

    # We can extend this to add more information as appropriate.

    # uuid used to look up the RemoteZulipServer.
    remote_server_uuid: str

    # id of the RemoteServerBillingUser belonging to that server. When this
    # is None, no billing user lookup is done and None is returned for it.
    remote_billing_user_id: Optional[int]

    # Authentication time, in the units returned by datetime_to_timestamp.
    # Its age is checked against REMOTE_BILLING_SESSION_VALIDITY_SECONDS.
    authenticated_at: int
class RemoteBillingIdentityExpiredError(Exception):
    """Raised when a billing identity found in the session is too old.

    The error carries the identifying details of the expired identity as
    plain attributes (``realm_uuid``, ``server_uuid``, ``uri_scheme``), so
    that the code catching it can inspect them. Every detail is optional
    and defaults to None.
    """

    def __init__(
        self,
        *,
        realm_uuid: Optional[str] = None,
        server_uuid: Optional[str] = None,
        uri_scheme: Optional[Literal["http://", "https://"]] = None,
    ) -> None:
        # Arguments are keyword-only and stored unchanged; no message is
        # passed to the Exception base class.
        self.uri_scheme = uri_scheme
        self.server_uuid = server_uuid
        self.realm_uuid = realm_uuid
def get_identity_dict_from_session(
    request: HttpRequest,
    *,
    realm_uuid: Optional[str],
    server_uuid: Optional[str],
) -> Optional[Union[RemoteBillingIdentityDict, LegacyServerIdentityDict]]:
    """Fetch the billing identity stored in the session for a realm or server.

    Identities live in ``request.session["remote_billing_identities"]``,
    keyed by "remote_realm:<uuid>" or "remote_server:<uuid>". When
    ``realm_uuid`` is not None it determines the key; otherwise
    ``server_uuid`` is used.

    Returns None when neither uuid is given, when the session holds no
    identities at all, or when there is no entry under the computed key.

    Raises:
        RemoteBillingIdentityExpiredError: an entry exists, but it is older
            than REMOTE_BILLING_SESSION_VALIDITY_SECONDS.
    """
    if not realm_uuid and not server_uuid:
        return None

    stored_identities = request.session.get("remote_billing_identities")
    if stored_identities is None:
        return None

    if realm_uuid is None:
        assert server_uuid is not None
        session_key = f"remote_server:{server_uuid}"
    else:
        session_key = f"remote_realm:{realm_uuid}"

    identity = stored_identities.get(session_key)
    if identity is None:
        return None

    identity_age = datetime_to_timestamp(timezone_now()) - identity["authenticated_at"]
    if identity_age > REMOTE_BILLING_SESSION_VALIDITY_SECONDS:
        # Expiry is reported by raising rather than by returning None:
        # callers want to tell it apart from the user not being
        # authenticated, so that they can handle it nicely by redirecting
        # the user to their login page.
        raise RemoteBillingIdentityExpiredError(
            realm_uuid=identity.get("remote_realm_uuid"),
            server_uuid=identity.get("remote_server_uuid"),
            uri_scheme=identity.get("uri_scheme"),
        )

    return identity
def get_remote_realm_from_session(
    request: HttpRequest,
    realm_uuid: Optional[str],
) -> RemoteRealm:
    """Return the RemoteRealm for the billing identity stored in the session.

    The session identity for ``realm_uuid`` is fetched with
    get_identity_dict_from_session, and the realm it names is loaded,
    constrained to the server recorded in that same identity.

    Raises:
        RemoteBillingAuthenticationError: the session holds no identity for
            ``realm_uuid``.
        RemoteBillingIdentityExpiredError: propagated from
            get_identity_dict_from_session when the identity is too old.
        AssertionError: the identity names a realm that does not exist.
        JsonableError: the realm's registration, the realm itself, or its
            server is deactivated.
    """
    # Cannot use isinstance with TypedDicts, to make mypy know
    # which of the TypedDicts in the Union this is - so just cast it.
    identity_dict = cast(
        Optional[RemoteBillingIdentityDict],
        get_identity_dict_from_session(request, realm_uuid=realm_uuid, server_uuid=None),
    )

    if identity_dict is None:
        raise RemoteBillingAuthenticationError

    remote_server_uuid = identity_dict["remote_server_uuid"]
    remote_realm_uuid = identity_dict["remote_realm_uuid"]

    try:
        remote_realm = RemoteRealm.objects.get(
            uuid=remote_realm_uuid, server__uuid=remote_server_uuid
        )
    except RemoteRealm.DoesNotExist as err:
        # Chain explicitly, so the traceback shows the failed lookup as the
        # direct cause of this invariant violation.
        raise AssertionError(
            "The remote realm is missing despite being in the RemoteBillingIdentityDict"
        ) from err

    if (
        remote_realm.registration_deactivated
        or remote_realm.realm_deactivated
        or remote_realm.server.deactivated
    ):
        raise JsonableError(_("Registration is deactivated"))

    return remote_realm
def get_remote_server_and_user_from_session(
    request: HttpRequest,
    server_uuid: str,
) -> Tuple[RemoteZulipServer, Optional[RemoteServerBillingUser]]:
    """Return the remote server, and its billing user if any, for the session.

    The session identity for ``server_uuid`` is fetched with
    get_identity_dict_from_session and the server it names is loaded. The
    second element of the result is None when the identity carries no
    ``remote_billing_user_id``, or when no RemoteServerBillingUser with that
    id exists for the server.

    Raises:
        RemoteBillingAuthenticationError: the session holds no identity for
            ``server_uuid``.
        RemoteBillingIdentityExpiredError: propagated from
            get_identity_dict_from_session when the identity is too old.
        JsonableError: the server does not exist or is deactivated.
    """
    # TypedDicts cannot be narrowed with isinstance, so cast to the variant
    # that is stored under the "remote_server:" session keys.
    identity_dict = cast(
        Optional[LegacyServerIdentityDict],
        get_identity_dict_from_session(request, realm_uuid=None, server_uuid=server_uuid),
    )

    if identity_dict is None:
        raise RemoteBillingAuthenticationError

    remote_server_uuid = identity_dict["remote_server_uuid"]
    try:
        remote_server = RemoteZulipServer.objects.get(uuid=remote_server_uuid)
    except RemoteZulipServer.DoesNotExist as err:
        # Chain explicitly, so the failed lookup stays attached as the cause.
        raise JsonableError(_("Invalid remote server.")) from err

    if remote_server.deactivated:
        raise JsonableError(_("Registration is deactivated"))

    remote_billing_user_id = identity_dict.get("remote_billing_user_id")
    if remote_billing_user_id is None:
        return remote_server, None

    try:
        remote_billing_user = RemoteServerBillingUser.objects.get(
            id=remote_billing_user_id, remote_server=remote_server
        )
    except RemoteServerBillingUser.DoesNotExist:
        # Deliberately tolerated: a missing billing user is reported to the
        # caller as None rather than as an error.
        remote_billing_user = None

    return remote_server, remote_billing_user