mirror of
https://github.com/zulip/zulip.git
synced 2025-11-02 21:13:36 +00:00
Implements a nice redirect flow to give a good UX for users attempting to access a remote billing page with an expired RemoteRealm session e.g. /realm/some-uuid/sponsorship - perhaps through their browser history or just their session expired while they were doing things in this billing system. The logic has a few pieces: 1. get_remote_realm_from_session, if the user doesn't have a identity_dict will raise RemoteBillingAuthenticationError. 2. If the user has an identity_dict, but it's expired, then get_identity_dict_from_session inside of get_remote_realm_from_session will raise RemoteBillingIdentityExpiredError. 3. The decorator authenticated_remote_realm_management_endpoint catches that exception and uses some general logic, described in more detail in the comments in the code, to figure out the right URL to redirect them to. Something like: https://theirserver.example.com/self-hosted-billing/?next_page=... where the next_page param is determined based on parsing request.path to see what kind of endpoint they're trying to access. 4. The remote_server_billing_entry endpoint is tweaked to also send its uri scheme to the bouncer, so that the bouncer can know whether to do the redirect on http or https.
173 lines
5.8 KiB
Python
173 lines
5.8 KiB
Python
import re
|
|
from typing import Optional
|
|
|
|
from django.conf import settings
|
|
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
|
|
from django.shortcuts import render
|
|
from django.utils.translation import gettext as _
|
|
|
|
from zerver.decorator import human_users_only, zulip_login_required
|
|
from zerver.lib.exceptions import (
|
|
JsonableError,
|
|
MissingRemoteRealmError,
|
|
OrganizationOwnerRequiredError,
|
|
)
|
|
from zerver.lib.push_notifications import (
|
|
InvalidPushDeviceTokenError,
|
|
add_push_device_token,
|
|
b64_to_hex,
|
|
remove_push_device_token,
|
|
send_test_push_notification,
|
|
uses_notification_bouncer,
|
|
)
|
|
from zerver.lib.remote_server import (
|
|
UserDataForRemoteBilling,
|
|
get_realms_info_for_push_bouncer,
|
|
send_realms_only_to_push_bouncer,
|
|
send_to_push_bouncer,
|
|
)
|
|
from zerver.lib.request import REQ, has_request_variables
|
|
from zerver.lib.response import json_success
|
|
from zerver.lib.typed_endpoint import typed_endpoint
|
|
from zerver.lib.validator import check_string
|
|
from zerver.models import PushDeviceToken, UserProfile
|
|
|
|
|
|
def check_app_id(var_name: str, val: object) -> str:
|
|
# Garbage values should be harmless, but we can be picky
|
|
# as insurance against bugs somewhere.
|
|
s = check_string(var_name, val)
|
|
if not re.fullmatch("[.a-zA-Z0-9-]+", s):
|
|
raise JsonableError(_("Invalid app ID"))
|
|
return s
|
|
|
|
|
|
def validate_token(token_str: str, kind: int) -> None:
|
|
if token_str == "" or len(token_str) > 4096:
|
|
raise JsonableError(_("Empty or invalid length token"))
|
|
if kind == PushDeviceToken.APNS:
|
|
# Validate that we can actually decode the token.
|
|
try:
|
|
b64_to_hex(token_str)
|
|
except Exception:
|
|
raise JsonableError(_("Invalid APNS token"))
|
|
|
|
|
|
@human_users_only
|
|
@has_request_variables
|
|
def add_apns_device_token(
|
|
request: HttpRequest,
|
|
user_profile: UserProfile,
|
|
token: str = REQ(),
|
|
appid: str = REQ(str_validator=check_app_id),
|
|
) -> HttpResponse:
|
|
validate_token(token, PushDeviceToken.APNS)
|
|
add_push_device_token(user_profile, token, PushDeviceToken.APNS, ios_app_id=appid)
|
|
return json_success(request)
|
|
|
|
|
|
@human_users_only
|
|
@has_request_variables
|
|
def add_android_reg_id(
|
|
request: HttpRequest, user_profile: UserProfile, token: str = REQ()
|
|
) -> HttpResponse:
|
|
validate_token(token, PushDeviceToken.GCM)
|
|
add_push_device_token(user_profile, token, PushDeviceToken.GCM)
|
|
return json_success(request)
|
|
|
|
|
|
@human_users_only
|
|
@has_request_variables
|
|
def remove_apns_device_token(
|
|
request: HttpRequest, user_profile: UserProfile, token: str = REQ()
|
|
) -> HttpResponse:
|
|
validate_token(token, PushDeviceToken.APNS)
|
|
remove_push_device_token(user_profile, token, PushDeviceToken.APNS)
|
|
return json_success(request)
|
|
|
|
|
|
@human_users_only
|
|
@has_request_variables
|
|
def remove_android_reg_id(
|
|
request: HttpRequest, user_profile: UserProfile, token: str = REQ()
|
|
) -> HttpResponse:
|
|
validate_token(token, PushDeviceToken.GCM)
|
|
remove_push_device_token(user_profile, token, PushDeviceToken.GCM)
|
|
return json_success(request)
|
|
|
|
|
|
@human_users_only
|
|
@has_request_variables
|
|
def send_test_push_notification_api(
|
|
request: HttpRequest, user_profile: UserProfile, token: Optional[str] = REQ(default=None)
|
|
) -> HttpResponse:
|
|
# If a token is specified in the request, the test notification is supposed to be sent
|
|
# to that device. If no token is provided, the test notification should be sent to
|
|
# all devices registered for the user.
|
|
if token is not None:
|
|
try:
|
|
devices = [PushDeviceToken.objects.get(token=token, user=user_profile)]
|
|
except PushDeviceToken.DoesNotExist:
|
|
raise InvalidPushDeviceTokenError
|
|
else:
|
|
devices = list(PushDeviceToken.objects.filter(user=user_profile))
|
|
|
|
send_test_push_notification(user_profile, devices)
|
|
|
|
return json_success(request)
|
|
|
|
|
|
@zulip_login_required
|
|
@typed_endpoint
|
|
def self_hosting_auth_redirect(
|
|
request: HttpRequest,
|
|
*,
|
|
next_page: Optional[str] = None,
|
|
) -> HttpResponse: # nocoverage
|
|
if not settings.DEVELOPMENT or not uses_notification_bouncer():
|
|
return render(request, "404.html", status=404)
|
|
|
|
user = request.user
|
|
assert user.is_authenticated
|
|
assert isinstance(user, UserProfile)
|
|
if not user.has_billing_access:
|
|
# We may want to replace this with an html error page at some point,
|
|
# but this endpoint shouldn't be accessible via the UI to an unauthorized
|
|
# user - and they need to directly enter the URL in their browser. So a json
|
|
# error may be sufficient.
|
|
raise OrganizationOwnerRequiredError
|
|
|
|
realm_info = get_realms_info_for_push_bouncer(user.realm_id)[0]
|
|
|
|
user_info = UserDataForRemoteBilling(
|
|
uuid=user.uuid,
|
|
email=user.delivery_email,
|
|
full_name=user.full_name,
|
|
)
|
|
|
|
post_data = {
|
|
"user": user_info.model_dump_json(),
|
|
"realm": realm_info.model_dump_json(),
|
|
# The uri_scheme is necessary for the bouncer to know the correct URL
|
|
# to redirect the user to for re-authing in case the session expires.
|
|
# Otherwise, the bouncer would know only the realm.host but be missing
|
|
# the knowledge of whether to use http or https.
|
|
"uri_scheme": settings.EXTERNAL_URI_SCHEME,
|
|
}
|
|
if next_page is not None:
|
|
post_data["next_page"] = next_page
|
|
|
|
try:
|
|
result = send_to_push_bouncer("POST", "server/billing", post_data)
|
|
except MissingRemoteRealmError:
|
|
# Upload realm info and re-try. It should work now.
|
|
send_realms_only_to_push_bouncer()
|
|
result = send_to_push_bouncer("POST", "server/billing", post_data)
|
|
|
|
if result["result"] != "success":
|
|
raise JsonableError(_("Error returned by the bouncer: {result}").format(result=result))
|
|
|
|
redirect_url = result["billing_access_url"]
|
|
assert isinstance(redirect_url, str)
|
|
return HttpResponseRedirect(redirect_url)
|