remote_billing: Sort out remote_billing_identities typing.

This does two important things:
1. Fix return type of get_identity_dict_from_session to correctly be
   Optional[Union[RemoteBillingIdentityDict, LegacyServerIdentityDict]].
   RemoteBillingIdentityDict is the type in the 8.0+ auth flow,
   LegacyServerIdentityDict is the type in old servers flow, where only
   the server uuid info is available.
2. The uuid key used in request.session["remote_billing_identities"]
   should be explicitly namespaced depending on which flow and type
   we're
   dealing with - to avoid confusion in case of collisions between a
   realm and server that have the same UUID. Such a situation should not
   occur naturally and I haven't come up with any actual exploitation
   ideas that could utilize this by manipulating your server/realm
   uuids, but it's much easier to just not think about such collision
   security implications by making them impossible.
This commit is contained in:
Mateusz Mandera
2023-11-30 20:47:23 +01:00
committed by Tim Abbott
parent 8370268f89
commit 5a198c639e
4 changed files with 34 additions and 21 deletions

View File

@@ -49,14 +49,11 @@ def authenticated_remote_realm_management_endpoint(
return render(request, "404.html", status=404)
realm_uuid = kwargs.get("realm_uuid")
server_uuid = kwargs.get("server_uuid")
if realm_uuid is not None and not isinstance(realm_uuid, str):
raise TypeError("realm_uuid must be a string or None")
if server_uuid is not None and not isinstance(server_uuid, str):
raise TypeError("server_uuid must be a string or None")
remote_realm = get_remote_realm_from_session(
request, realm_uuid=realm_uuid, server_uuid=server_uuid
)
remote_realm = get_remote_realm_from_session(request, realm_uuid)
billing_session = RemoteRealmBillingSession(remote_realm)
return view_func(request, billing_session)

View File

@@ -1,5 +1,5 @@
import logging
from typing import Optional, TypedDict
from typing import Optional, TypedDict, Union, cast
from django.http import HttpRequest
from django.utils.translation import gettext as _
@@ -28,25 +28,34 @@ class LegacyServerIdentityDict(TypedDict):
def get_identity_dict_from_session(
request: HttpRequest,
*,
realm_uuid: Optional[str],
server_uuid: Optional[str],
) -> Optional[RemoteBillingIdentityDict]:
authed_uuid = realm_uuid or server_uuid
assert authed_uuid is not None
) -> Optional[Union[RemoteBillingIdentityDict, LegacyServerIdentityDict]]:
if not (realm_uuid or server_uuid):
return None
identity_dicts = request.session.get("remote_billing_identities")
if identity_dicts is not None:
return identity_dicts.get(authed_uuid)
if identity_dicts is None:
return None
return None
if realm_uuid is not None:
return identity_dicts.get(f"remote_realm:{realm_uuid}")
else:
assert server_uuid is not None
return identity_dicts.get(f"remote_server:{server_uuid}")
def get_remote_realm_from_session(
request: HttpRequest,
realm_uuid: Optional[str],
server_uuid: Optional[str] = None,
) -> RemoteRealm:
identity_dict = get_identity_dict_from_session(request, realm_uuid, server_uuid)
# Cannot use isinstance with TypeDicts, to make mypy know
# which of the TypedDicts in the Union this is - so just cast it.
identity_dict = cast(
Optional[RemoteBillingIdentityDict],
get_identity_dict_from_session(request, realm_uuid=realm_uuid, server_uuid=None),
)
if identity_dict is None:
raise JsonableError(_("User not authenticated"))
@@ -77,7 +86,10 @@ def get_remote_server_from_session(
request: HttpRequest,
server_uuid: str,
) -> RemoteZulipServer:
identity_dict = get_identity_dict_from_session(request, None, server_uuid)
identity_dict: Optional[LegacyServerIdentityDict] = get_identity_dict_from_session(
request, realm_uuid=None, server_uuid=server_uuid
)
if identity_dict is None:
raise JsonableError(_("User not authenticated"))

View File

@@ -42,7 +42,8 @@ class RemoteBillingAuthenticationTest(BouncerTestCase):
next_page=next_page,
)
self.assertEqual(
self.client.session["remote_billing_identities"][str(user.realm.uuid)], identity_dict
self.client.session["remote_billing_identities"][f"remote_realm:{user.realm.uuid!s}"],
identity_dict,
)
# It's up to the caller to verify further details, such as the exact redirect URL,

View File

@@ -85,7 +85,9 @@ def remote_server_billing_finalize_login(
remote_realm_uuid = identity_dict["remote_realm_uuid"]
request.session["remote_billing_identities"] = {}
request.session["remote_billing_identities"][remote_realm_uuid] = identity_dict
request.session["remote_billing_identities"][
f"remote_realm:{remote_realm_uuid}"
] = identity_dict
# TODO: Figure out redirects based on whether the realm/server already has a plan
# and should be taken to /billing or doesn't have and should be taken to /plans.
@@ -132,6 +134,7 @@ def render_tmp_remote_billing_page(
# is authenticated. the uuid_owner_secret is not needed here, it'll be used for
# for validating transfers of a realm to a different RemoteZulipServer (in the
# export-import process).
assert isinstance(remote_realm_uuid, str)
remote_realm = RemoteRealm.objects.get(uuid=remote_realm_uuid, server=remote_server)
except RemoteRealm.DoesNotExist:
raise AssertionError(
@@ -258,8 +261,8 @@ def remote_billing_legacy_server_login(
remote_server_uuid = str(remote_server.uuid)
request.session["remote_billing_identities"] = {}
request.session["remote_billing_identities"][remote_server_uuid] = LegacyServerIdentityDict(
remote_server_uuid=remote_server_uuid
)
request.session["remote_billing_identities"][
f"remote_server:{remote_server_uuid}"
] = LegacyServerIdentityDict(remote_server_uuid=remote_server_uuid)
return HttpResponseRedirect(reverse("remote_billing_page_server", args=(remote_server_uuid,)))