Files
zulip/corporate/lib/remote_billing_util.py
Mateusz Mandera 134e3bfa68 remote_billing: Add redirects to login for unauthed user in legacy flow.
Analogical to the more complex mechanism implemented for the RemoteRealm
flow in a previous commit in
authenticated_remote_realm_management_endpoint.

As explained in the code comment, this is much easier because:

In this flow, we can only redirect to our local "legacy server flow
login" page. That means that we can do it universally whether the user
has an expired
identity_dict, or just lacks any form of authentication info at all -
there are no security concerns since this is just a local redirect.
2023-12-03 10:39:56 -08:00

152 lines
4.7 KiB
Python

import logging
from typing import Literal, Optional, TypedDict, Union, cast
from django.http import HttpRequest
from django.utils.timezone import now as timezone_now
from django.utils.translation import gettext as _
from zerver.lib.exceptions import JsonableError, RemoteBillingAuthenticationError
from zerver.lib.timestamp import datetime_to_timestamp
from zilencer.models import RemoteRealm, RemoteZulipServer
billing_logger = logging.getLogger("corporate.stripe")
# The sessions are relatively short-lived, so that we can avoid issues
# with users who have their privileges revoked on the remote server
# maintaining access to the billing page for too long.
REMOTE_BILLING_SESSION_VALIDITY_SECONDS = 2 * 60 * 60
class RemoteBillingUserDict(TypedDict):
user_uuid: str
user_email: str
user_full_name: str
class RemoteBillingIdentityDict(TypedDict):
user: RemoteBillingUserDict
remote_server_uuid: str
remote_realm_uuid: str
authenticated_at: int
uri_scheme: Literal["http://", "https://"]
next_page: Optional[str]
class LegacyServerIdentityDict(TypedDict):
# Currently this has only one field. We can extend this
# to add more information as appropriate.
remote_server_uuid: str
authenticated_at: int
class RemoteBillingIdentityExpiredError(Exception):
def __init__(
self,
*,
realm_uuid: Optional[str] = None,
server_uuid: Optional[str] = None,
uri_scheme: Optional[Literal["http://", "https://"]] = None,
) -> None:
self.realm_uuid = realm_uuid
self.server_uuid = server_uuid
self.uri_scheme = uri_scheme
def get_identity_dict_from_session(
request: HttpRequest,
*,
realm_uuid: Optional[str],
server_uuid: Optional[str],
) -> Optional[Union[RemoteBillingIdentityDict, LegacyServerIdentityDict]]:
if not (realm_uuid or server_uuid):
return None
identity_dicts = request.session.get("remote_billing_identities")
if identity_dicts is None:
return None
if realm_uuid is not None:
result = identity_dicts.get(f"remote_realm:{realm_uuid}")
else:
assert server_uuid is not None
result = identity_dicts.get(f"remote_server:{server_uuid}")
if result is None:
return None
if (
datetime_to_timestamp(timezone_now()) - result["authenticated_at"]
> REMOTE_BILLING_SESSION_VALIDITY_SECONDS
):
# In this case we raise, because callers want to catch this as an explicitly
# different scenario from the user not being authenticated, to handle it nicely
# by redirecting them to their login page.
raise RemoteBillingIdentityExpiredError(
realm_uuid=result.get("remote_realm_uuid"),
server_uuid=result.get("remote_server_uuid"),
uri_scheme=result.get("uri_scheme"),
)
return result
def get_remote_realm_from_session(
request: HttpRequest,
realm_uuid: Optional[str],
) -> RemoteRealm:
# Cannot use isinstance with TypeDicts, to make mypy know
# which of the TypedDicts in the Union this is - so just cast it.
identity_dict = cast(
Optional[RemoteBillingIdentityDict],
get_identity_dict_from_session(request, realm_uuid=realm_uuid, server_uuid=None),
)
if identity_dict is None:
raise RemoteBillingAuthenticationError
remote_server_uuid = identity_dict["remote_server_uuid"]
remote_realm_uuid = identity_dict["remote_realm_uuid"]
try:
remote_realm = RemoteRealm.objects.get(
uuid=remote_realm_uuid, server__uuid=remote_server_uuid
)
except RemoteRealm.DoesNotExist:
raise AssertionError(
"The remote realm is missing despite being in the RemoteBillingIdentityDict"
)
if (
remote_realm.registration_deactivated
or remote_realm.realm_deactivated
or remote_realm.server.deactivated
):
raise JsonableError(_("Registration is deactivated"))
return remote_realm
def get_remote_server_from_session(
request: HttpRequest,
server_uuid: str,
) -> RemoteZulipServer:
identity_dict: Optional[LegacyServerIdentityDict] = get_identity_dict_from_session(
request, realm_uuid=None, server_uuid=server_uuid
)
if identity_dict is None:
raise RemoteBillingAuthenticationError
remote_server_uuid = identity_dict["remote_server_uuid"]
try:
remote_server = RemoteZulipServer.objects.get(uuid=remote_server_uuid)
except RemoteZulipServer.DoesNotExist:
raise JsonableError(_("Invalid remote server."))
if remote_server.deactivated:
raise JsonableError(_("Registration is deactivated"))
return remote_server