Files
zulip/corporate/tests/test_remote_billing.py
Ethan Mayer c12b94aea4 models: Refactor corporate/models.py into models package.
Fixes #34318.

Seperated models file into a package with component files.
2025-04-08 10:16:35 -07:00

1622 lines
71 KiB
Python

from datetime import timedelta
from typing import TYPE_CHECKING
from unittest import mock
import responses
import time_machine
from django.conf import settings
from django.utils.timezone import now as timezone_now
from typing_extensions import override
from corporate.lib.remote_billing_util import (
REMOTE_BILLING_SESSION_VALIDITY_SECONDS,
LegacyServerIdentityDict,
RemoteBillingIdentityDict,
RemoteBillingUserDict,
)
from corporate.lib.stripe import RemoteRealmBillingSession, RemoteServerBillingSession, add_months
from corporate.models.customers import get_customer_by_remote_realm, get_customer_by_remote_server
from corporate.models.licenses import LicenseLedger
from corporate.models.plans import CustomerPlan, get_current_plan_by_customer
from corporate.views.remote_billing_page import generate_confirmation_link_for_server_deactivation
from zerver.lib.exceptions import RemoteRealmServerMismatchError
from zerver.lib.rate_limiter import RateLimitedIPAddr
from zerver.lib.remote_server import send_server_data_to_push_bouncer
from zerver.lib.send_email import FromAddress
from zerver.lib.test_classes import BouncerTestCase
from zerver.lib.test_helpers import activate_push_notification_service, ratelimit_rule
from zerver.lib.timestamp import datetime_to_timestamp
from zerver.models import Realm, UserProfile
from zerver.models.realms import get_realm
from zilencer.models import (
PreregistrationRemoteRealmBillingUser,
PreregistrationRemoteServerBillingUser,
RateLimitedRemoteZulipServer,
RemoteRealm,
RemoteRealmBillingUser,
RemoteServerBillingUser,
RemoteZulipServer,
)
if TYPE_CHECKING:
from django.test.client import _MonkeyPatchedWSGIResponse as TestHttpResponse
class RemoteRealmBillingTestCase(BouncerTestCase):
def execute_remote_billing_authentication_flow(
self,
user: UserProfile,
next_page: str | None = None,
expect_tos: bool = True,
confirm_tos: bool = True,
first_time_login: bool = True,
# This only matters if first_time_login is True, since otherwise
# there's no confirmation link to be clicked:
return_without_clicking_confirmation_link: bool = False,
# This is in order to return the response early, right after accessing the
# authentication url for the user. This is useful for tests who expect an
# an error there.
return_from_auth_url: bool = False,
) -> "TestHttpResponse":
now = timezone_now()
self_hosted_billing_url = "/self-hosted-billing/"
if next_page is not None:
self_hosted_billing_url += f"?next_page={next_page}"
with time_machine.travel(now, tick=False):
result = self.client_get(self_hosted_billing_url)
self.assertEqual(result.status_code, 302)
self.assertIn("http://selfhosting.testserver/remote-billing-login/", result["Location"])
signed_auth_url = result["Location"]
signed_access_token = signed_auth_url.split("/")[-1]
with time_machine.travel(now, tick=False):
result = self.client_get(signed_auth_url, subdomain="selfhosting")
if return_from_auth_url:
return result
if first_time_login:
self.assertFalse(RemoteRealmBillingUser.objects.filter(user_uuid=user.uuid).exists())
# When logging in for the first time some extra steps are needed
# to confirm and verify the email address.
self.assertEqual(result.status_code, 200)
self.assert_in_success_response(["Enter email"], result)
self.assert_in_success_response([user.realm.host], result)
self.assert_in_success_response(
[f'action="/remote-billing-login/{signed_access_token}/confirm/"'], result
)
with time_machine.travel(now, tick=False):
result = self.client_post(
f"/remote-billing-login/{signed_access_token}/confirm/",
{"email": user.delivery_email},
subdomain="selfhosting",
)
if result.status_code == 429:
# Return rate limit errors early, since they occur in rate limiting tests
# that want to verify them.
return result
self.assertEqual(result.status_code, 200)
self.assert_in_success_response(
[
"To finish logging in, check your email account (",
") for a confirmation email from Zulip.",
user.delivery_email,
],
result,
)
confirmation_url = self.get_confirmation_url_from_outbox(
user.delivery_email,
url_pattern=(
f"{settings.SELF_HOSTING_MANAGEMENT_SUBDOMAIN}.{settings.EXTERNAL_HOST}"
r"(\S+)"
),
email_body_contains="confirm your email and log in to Zulip plan management",
)
if return_without_clicking_confirmation_link:
return result
with time_machine.travel(now, tick=False):
result = self.client_get(confirmation_url, subdomain="selfhosting")
remote_billing_user = RemoteRealmBillingUser.objects.latest("id")
self.assertEqual(remote_billing_user.user_uuid, user.uuid)
self.assertEqual(remote_billing_user.email, user.delivery_email)
prereg_user = PreregistrationRemoteRealmBillingUser.objects.latest("id")
self.assertEqual(prereg_user.created_user, remote_billing_user)
self.assertEqual(remote_billing_user.date_joined, now)
# Now we should be redirected again to the /remote-billing-login/ endpoint
# with a new signed_access_token. Now that the email has been confirmed,
# and we have a RemoteRealmBillingUser entry, we'll be in the same position
# as the case where first_time_login=False.
self.assertEqual(result.status_code, 302)
self.assertTrue(result["Location"].startswith("/remote-billing-login/"))
result = self.client_get(result["Location"], subdomain="selfhosting")
# Final confirmation page - just confirm your details, possibly
# agreeing to ToS if needed and an authenticated session will be granted:
self.assertEqual(result.status_code, 200)
self.assert_in_success_response(["Log in to Zulip plan management"], result)
self.assert_in_success_response([user.realm.host], result)
params = {}
if expect_tos:
self.assert_in_success_response(["I agree", "Terms of Service"], result)
if confirm_tos:
params = {"tos_consent": "true"}
with time_machine.travel(now, tick=False):
result = self.client_post(signed_auth_url, params, subdomain="selfhosting")
if result.status_code >= 400:
# Failures should be returned early so the caller can assert about them.
return result
# Verify the authed data that should have been stored in the session.
remote_billing_user = RemoteRealmBillingUser.objects.get(user_uuid=user.uuid)
identity_dict = RemoteBillingIdentityDict(
user=RemoteBillingUserDict(
user_email=user.delivery_email,
user_uuid=str(user.uuid),
user_full_name=user.full_name,
),
remote_server_uuid=str(self.server.uuid),
remote_realm_uuid=str(user.realm.uuid),
remote_billing_user_id=remote_billing_user.id,
authenticated_at=datetime_to_timestamp(now),
uri_scheme="http://",
next_page=next_page,
)
self.assertEqual(
self.client.session["remote_billing_identities"][f"remote_realm:{user.realm.uuid!s}"],
identity_dict,
)
self.assertEqual(remote_billing_user.last_login, now)
# It's up to the caller to verify further details, such as the exact redirect URL,
# depending on the set up and intent of the test.
return result
@activate_push_notification_service()
class SelfHostedBillingEndpointBasicTest(RemoteRealmBillingTestCase):
@responses.activate
def test_self_hosted_billing_endpoints(self) -> None:
# An ordinary user doesn't have access to these endpoints.
self.login("hamlet")
for url in [
"/self-hosted-billing/",
"/json/self-hosted-billing",
"/self-hosted-billing/not-configured/",
]:
result = self.client_get(url)
self.assert_json_error(result, "Must be an organization owner")
# Login as an organization owner to gain access.
self.login("desdemona")
self.add_mock_response()
self_hosted_billing_url = "/self-hosted-billing/"
self_hosted_billing_json_url = "/json/self-hosted-billing"
with self.settings(ZULIP_SERVICE_PUSH_NOTIFICATIONS=False):
with self.settings(CORPORATE_ENABLED=True):
result = self.client_get(self_hosted_billing_url)
self.assertEqual(result.status_code, 404)
self.assert_in_response("Page not found (404)", result)
with self.settings(CORPORATE_ENABLED=False):
result = self.client_get(self_hosted_billing_url)
self.assertEqual(result.status_code, 302)
redirect_url = result["Location"]
self.assertEqual(redirect_url, "/self-hosted-billing/not-configured/")
with self.assertLogs("django.request"):
result = self.client_get(redirect_url)
self.assert_in_response(
"This server is not configured to use push notifications.", result
)
with self.settings(CORPORATE_ENABLED=True):
result = self.client_get(self_hosted_billing_json_url)
self.assert_json_error(
result, "Server doesn't use the push notification service", 404
)
with self.settings(CORPORATE_ENABLED=False):
result = self.client_get(self_hosted_billing_json_url)
self.assert_json_success(result)
redirect_url = result.json()["billing_access_url"]
self.assertEqual(redirect_url, "/self-hosted-billing/not-configured/")
with self.assertLogs("django.request"):
result = self.client_get(redirect_url)
self.assert_in_response(
"This server is not configured to use push notifications.", result
)
with mock.patch(
"zerver.views.push_notifications.send_to_push_bouncer",
side_effect=RemoteRealmServerMismatchError,
):
result = self.client_get(self_hosted_billing_url)
self.assertEqual(result.status_code, 403)
self.assert_in_response("Unexpected Zulip server registration", result)
result = self.client_get(self_hosted_billing_json_url)
self.assert_json_error(
result,
"Your organization is registered to a different Zulip server. Please contact Zulip support for assistance in resolving this issue.",
403,
)
# Now test successes. We only check that an url for accessing the remote billing system
# is returned (in the appropriate format - redirect or json data, depending on the endpoint).
# We don't need to test that returned URL beyond that, because that's just the full auth flow,
# which gets tested properly in other tests.
result = self.client_get(self_hosted_billing_url)
self.assertEqual(result.status_code, 302)
self.assertIn("http://selfhosting.testserver/remote-billing-login/", result["Location"])
result = self.client_get(self_hosted_billing_json_url)
self.assert_json_success(result)
data = result.json()
self.assertEqual(sorted(data.keys()), ["billing_access_url", "msg", "result"])
self.assertIn(
"http://selfhosting.testserver/remote-billing-login/", data["billing_access_url"]
)
@activate_push_notification_service()
class RemoteBillingAuthenticationTest(RemoteRealmBillingTestCase):
def test_self_hosted_config_error_page(self) -> None:
self.login("desdemona")
with (
self.settings(CORPORATE_ENABLED=False, ZULIP_SERVICE_PUSH_NOTIFICATIONS=False),
self.assertLogs("django.request"),
):
result = self.client_get("/self-hosted-billing/not-configured/")
self.assertEqual(result.status_code, 500)
self.assert_in_response(
"This server is not configured to use push notifications.", result
)
# The page doesn't make sense if PUSH_NOTIFICATION_BOUNCER_URL is configured.
with self.settings(CORPORATE_ENABLED=False):
result = self.client_get("/self-hosted-billing/not-configured/")
self.assertEqual(result.status_code, 404)
# Also doesn't make sense on zulipchat.com (where CORPORATE_ENABLED is True).
with self.settings(CORPORATE_ENABLED=True, ZULIP_SERVICE_PUSH_NOTIFICATIONS=False):
result = self.client_get("/self-hosted-billing/not-configured/")
self.assertEqual(result.status_code, 404)
@responses.activate
def test_remote_billing_authentication_flow(self) -> None:
self.login("desdemona")
desdemona = self.example_user("desdemona")
realm = desdemona.realm
self.add_mock_response()
send_server_data_to_push_bouncer(consider_usage_statistics=False)
result = self.execute_remote_billing_authentication_flow(desdemona)
# TODO: The redirect URL will vary depending on the billing state of the user's
# realm/server when we implement proper logic for that. For now, we can simply
# hard-code an assert about the endpoint.
self.assertEqual(result["Location"], f"/realm/{realm.uuid!s}/plans/")
# Go to the URL we're redirected to after authentication and assert
# some basic expected content.
result = self.client_get(result["Location"], subdomain="selfhosting")
self.assert_in_success_response(["showing-self-hosted", "Retain full control"], result)
@ratelimit_rule(10, 3, domain="sends_email_by_remote_server")
@ratelimit_rule(10, 2, domain="sends_email_by_ip")
@responses.activate
def test_remote_billing_authentication_flow_rate_limited(self) -> None:
RateLimitedIPAddr("127.0.0.1", domain="sends_email_by_ip").clear_history()
RateLimitedRemoteZulipServer(
self.server, domain="sends_email_by_remote_server"
).clear_history()
self.login("desdemona")
desdemona = self.example_user("desdemona")
self.add_mock_response()
send_server_data_to_push_bouncer(consider_usage_statistics=False)
for i in range(2):
result = self.execute_remote_billing_authentication_flow(
desdemona, return_without_clicking_confirmation_link=True
)
self.assertEqual(result.status_code, 200)
result = self.execute_remote_billing_authentication_flow(
desdemona, return_without_clicking_confirmation_link=True
)
self.assertEqual(result.status_code, 429)
self.assert_in_response("You have exceeded the limit", result)
# Reset the IP rate limit so that we trigger the server-based one.
RateLimitedIPAddr("127.0.0.1", domain="sends_email_by_ip").clear_history()
result = self.execute_remote_billing_authentication_flow(
desdemona, return_without_clicking_confirmation_link=True
)
self.assertEqual(result.status_code, 200)
with self.assertLogs("zilencer.auth", "WARN") as mock_log:
result = self.execute_remote_billing_authentication_flow(
desdemona, return_without_clicking_confirmation_link=True
)
self.assertEqual(result.status_code, 429)
self.assert_in_response("Your server has exceeded the limit", result)
self.assertEqual(
mock_log.output,
[
f"WARNING:zilencer.auth:Remote server {self.server.hostname} {str(self.server.uuid)[:12]} exceeded "
"rate limits on domain sends_email_by_remote_server"
],
)
@responses.activate
def test_remote_billing_authentication_flow_realm_not_registered(self) -> None:
RemoteRealm.objects.all().delete()
self.login("desdemona")
desdemona = self.example_user("desdemona")
realm = desdemona.realm
self.add_mock_response()
# We do the flow without having the realm registered with the push bouncer.
# In such a case, the local /self-hosted-billing/ endpoint should error-handle
# properly and end up registering the server's realms with the bouncer,
# and successfully completing the flow - transparently to the user.
self.assertFalse(RemoteRealm.objects.filter(uuid=realm.uuid).exists())
# send_server_data_to_push_bouncer will be called within the endpoint's
# error handling to register realms with the bouncer. We mock.patch it
# to be able to assert that it was called - but also use side_effect
# to maintain the original behavior of the function, instead of
# replacing it with a Mock.
with mock.patch(
"zerver.views.push_notifications.send_server_data_to_push_bouncer",
side_effect=send_server_data_to_push_bouncer,
) as m:
result = self.execute_remote_billing_authentication_flow(desdemona)
m.assert_called_once()
# The user's realm should now be registered:
self.assertTrue(RemoteRealm.objects.filter(uuid=realm.uuid).exists())
self.assertEqual(result["Location"], f"/realm/{realm.uuid!s}/plans/")
result = self.client_get(result["Location"], subdomain="selfhosting")
self.assert_in_success_response(["showing-self-hosted", "Retain full control"], result)
@responses.activate
def test_remote_billing_authentication_flow_tos_consent_failure(self) -> None:
self.login("desdemona")
desdemona = self.example_user("desdemona")
self.add_mock_response()
send_server_data_to_push_bouncer(consider_usage_statistics=False)
result = self.execute_remote_billing_authentication_flow(
desdemona,
expect_tos=True,
confirm_tos=False,
)
self.assert_json_error(result, "You must accept the Terms of Service to proceed.")
@responses.activate
def test_remote_billing_authentication_flow_tos_consent_update(self) -> None:
self.login("desdemona")
desdemona = self.example_user("desdemona")
self.add_mock_response()
send_server_data_to_push_bouncer(consider_usage_statistics=False)
with self.settings(TERMS_OF_SERVICE_VERSION="1.0"):
result = self.execute_remote_billing_authentication_flow(
desdemona,
expect_tos=True,
confirm_tos=True,
)
self.assertEqual(result.status_code, 302)
remote_billing_user = RemoteRealmBillingUser.objects.last()
assert remote_billing_user is not None
self.assertEqual(remote_billing_user.user_uuid, desdemona.uuid)
self.assertEqual(remote_billing_user.tos_version, "1.0")
# Now bump the ToS version. They need to agree again.
with self.settings(TERMS_OF_SERVICE_VERSION="2.0"):
result = self.execute_remote_billing_authentication_flow(
desdemona,
expect_tos=True,
confirm_tos=False,
first_time_login=False,
)
self.assert_json_error(result, "You must accept the Terms of Service to proceed.")
result = self.execute_remote_billing_authentication_flow(
desdemona,
expect_tos=True,
confirm_tos=True,
first_time_login=False,
)
remote_billing_user.refresh_from_db()
self.assertEqual(remote_billing_user.user_uuid, desdemona.uuid)
self.assertEqual(remote_billing_user.tos_version, "2.0")
@responses.activate
def test_remote_billing_authentication_flow_expired_session(self) -> None:
now = timezone_now()
self.login("desdemona")
desdemona = self.example_user("desdemona")
realm = desdemona.realm
self.add_mock_response()
send_server_data_to_push_bouncer(consider_usage_statistics=False)
with time_machine.travel(now, tick=False):
result = self.execute_remote_billing_authentication_flow(desdemona)
self.assertEqual(result["Location"], f"/realm/{realm.uuid!s}/plans/")
final_url = result["Location"]
# Go to the URL we're redirected to after authentication and make sure
# we're granted access.
with time_machine.travel(
now + timedelta(seconds=1),
tick=False,
):
result = self.client_get(final_url, subdomain="selfhosting")
self.assert_in_success_response(["showing-self-hosted", "Retain full control"], result)
# Now go there again, simulating doing this after the session has expired.
# We should be denied access and redirected to re-auth.
with time_machine.travel(
now + timedelta(seconds=REMOTE_BILLING_SESSION_VALIDITY_SECONDS + 1),
tick=False,
):
result = self.client_get(
final_url, subdomain="selfhosting", HTTP_ACCEPT="text/html, */*;q=0.8"
)
self.assertEqual(result.status_code, 302)
self.assertEqual(
result["Location"],
f"http://{desdemona.realm.host}/self-hosted-billing/?next_page=plans",
)
# Opening this re-auth URL in result["Location"] is same as re-doing the auth
# flow via execute_remote_billing_authentication_flow with next_page="plans".
# So let's test that and assert that we end up successfully re-authed on the /plans
# page.
result = self.execute_remote_billing_authentication_flow(
desdemona,
next_page="plans",
# ToS has already been confirmed earlier.
expect_tos=False,
confirm_tos=False,
first_time_login=False,
)
self.assertEqual(result["Location"], f"/realm/{realm.uuid!s}/plans/")
result = self.client_get(result["Location"], subdomain="selfhosting")
self.assert_in_success_response(["showing-self-hosted", "Retain full control"], result)
@responses.activate
def test_remote_billing_unauthed_access(self) -> None:
now = timezone_now()
self.login("desdemona")
desdemona = self.example_user("desdemona")
realm = desdemona.realm
self.add_mock_response()
send_server_data_to_push_bouncer(consider_usage_statistics=False)
# Straight-up access without authing at all:
result = self.client_get(f"/realm/{realm.uuid!s}/plans/", subdomain="selfhosting")
self.assert_json_error(result, "User not authenticated", 401)
result = self.execute_remote_billing_authentication_flow(desdemona)
self.assertEqual(result["Location"], f"/realm/{realm.uuid!s}/plans/")
final_url = result["Location"]
# Sanity check - access is granted after authing:
result = self.client_get(final_url, subdomain="selfhosting")
self.assertEqual(result.status_code, 200)
# Now mess with the identity dict in the session in unlikely ways so that it should
# not grant access.
# First delete the RemoteRealm entry for this session.
RemoteRealm.objects.filter(uuid=realm.uuid).delete()
with self.assertLogs("django.request", "ERROR") as m, self.assertRaises(AssertionError):
self.client_get(final_url, subdomain="selfhosting")
self.assertIn(
"The remote realm is missing despite being in the RemoteBillingIdentityDict",
m.output[0],
)
# Try the case where the identity dict is simultaneously expired.
with (
time_machine.travel(
now + timedelta(seconds=REMOTE_BILLING_SESSION_VALIDITY_SECONDS + 30),
tick=False,
),
self.assertLogs("django.request", "ERROR") as m,
self.assertRaises(AssertionError),
):
self.client_get(final_url, subdomain="selfhosting")
# The django.request log should be a traceback, mentioning the relevant
# exceptions that occurred.
self.assertIn(
"RemoteBillingIdentityExpiredError",
m.output[0],
)
self.assertIn(
"AssertionError",
m.output[0],
)
@responses.activate
def test_remote_billing_authentication_flow_to_sponsorship_page(self) -> None:
self.login("desdemona")
desdemona = self.example_user("desdemona")
realm = desdemona.realm
self.add_mock_response()
send_server_data_to_push_bouncer(consider_usage_statistics=False)
result = self.execute_remote_billing_authentication_flow(desdemona, "sponsorship")
self.assertEqual(result["Location"], f"/realm/{realm.uuid!s}/sponsorship/")
# Go to the URL we're redirected to after authentication and assert
# some basic expected content.
result = self.client_get(result["Location"], subdomain="selfhosting")
self.assert_in_success_response(
["Request Zulip", "sponsorship", "Description of your organization"], result
)
@responses.activate
def test_remote_billing_authentication_flow_to_upgrade_page(self) -> None:
self.login("desdemona")
desdemona = self.example_user("desdemona")
realm = desdemona.realm
self.add_mock_response()
send_server_data_to_push_bouncer(consider_usage_statistics=False)
result = self.execute_remote_billing_authentication_flow(desdemona, "upgrade")
self.assertEqual(result["Location"], f"/realm/{realm.uuid!s}/upgrade/")
# Go to the URL we're redirected to after authentication and assert
# some basic expected content.
# TODO: Add test for the case when redirected to error page (not yet implemented)
# due to MissingDataError ('has_stale_audit_log' is True).
with mock.patch("corporate.lib.stripe.has_stale_audit_log", return_value=False):
result = self.client_get(result["Location"], subdomain="selfhosting")
self.assert_in_success_response(
["Upgrade", "Purchase Zulip", "Your subscription will renew automatically."], result
)
@responses.activate
def test_remote_billing_authentication_flow_cant_access_billing_without_finishing_confirmation(
self,
) -> None:
self.login("desdemona")
desdemona = self.example_user("desdemona")
realm = desdemona.realm
self.add_mock_response()
result = self.execute_remote_billing_authentication_flow(
desdemona,
expect_tos=True,
confirm_tos=False,
first_time_login=True,
return_without_clicking_confirmation_link=True,
)
result = self.client_get(f"/realm/{realm.uuid!s}/billing/", subdomain="selfhosting")
# Access is not allowed. The user doesn't have an IdentityDict in the session, so
# we can't do a nice redirect back to their original server.
self.assertEqual(result.status_code, 401)
@responses.activate
def test_remote_billing_authentication_flow_generate_two_confirmation_links_before_confirming(
self,
) -> None:
self.login("desdemona")
desdemona = self.example_user("desdemona")
self.add_mock_response()
result = self.execute_remote_billing_authentication_flow(
desdemona,
expect_tos=True,
confirm_tos=False,
first_time_login=True,
return_without_clicking_confirmation_link=True,
)
self.assertEqual(result.status_code, 200)
first_confirmation_url = self.get_confirmation_url_from_outbox(
desdemona.delivery_email,
url_pattern=(
f"{settings.SELF_HOSTING_MANAGEMENT_SUBDOMAIN}.{settings.EXTERNAL_HOST}" + r"(\S+)"
),
)
first_prereg_user = PreregistrationRemoteRealmBillingUser.objects.latest("id")
result = self.execute_remote_billing_authentication_flow(
desdemona,
expect_tos=True,
confirm_tos=False,
first_time_login=True,
return_without_clicking_confirmation_link=True,
)
self.assertEqual(result.status_code, 200)
second_confirmation_url = self.get_confirmation_url_from_outbox(
desdemona.delivery_email,
url_pattern=(
f"{settings.SELF_HOSTING_MANAGEMENT_SUBDOMAIN}.{settings.EXTERNAL_HOST}" + r"(\S+)"
),
)
second_prereg_user = PreregistrationRemoteRealmBillingUser.objects.latest("id")
self.assertNotEqual(first_confirmation_url, second_confirmation_url)
self.assertNotEqual(first_prereg_user.id, second_prereg_user.id)
now = timezone_now()
# Click the first confirmation link.
with time_machine.travel(now, tick=False):
result = self.client_get(first_confirmation_url, subdomain="selfhosting")
self.assertEqual(result.status_code, 302)
self.assertTrue(result["Location"].startswith("/remote-billing-login/"))
# This created the RemoteRealmBillingUser entry.
remote_billing_user = RemoteRealmBillingUser.objects.latest("id")
self.assertEqual(remote_billing_user.user_uuid, desdemona.uuid)
self.assertEqual(remote_billing_user.email, desdemona.delivery_email)
first_prereg_user.refresh_from_db()
self.assertEqual(first_prereg_user.created_user, remote_billing_user)
# Now click the second confirmation link. The RemoteRealmBillingUser entry
# stays the same, since it's already been created, and the user is redirected
# normally further through the flow, while we log this event.
with (
time_machine.travel(now + timedelta(seconds=1), tick=False),
self.assertLogs("corporate.stripe", "INFO") as mock_logger,
):
result = self.client_get(second_confirmation_url, subdomain="selfhosting")
self.assertEqual(result.status_code, 302)
self.assertTrue(result["Location"].startswith("/remote-billing-login/"))
# The RemoteRealmBillingUser entry stays the same.
self.assertEqual(RemoteRealmBillingUser.objects.latest("id"), remote_billing_user)
# The second prereg user is unused, since it wasn't needed.
self.assertEqual(second_prereg_user.created_user, None)
self.assertEqual(
mock_logger.output,
[
"INFO:corporate.stripe:Matching RemoteRealmBillingUser already exists for "
f"PreregistrationRemoteRealmBillingUser {second_prereg_user.id}"
],
)
@responses.activate
def test_transfer_complimentary_access_plan_scheduled_for_upgrade_from_server_to_realm(
self,
) -> None:
self.login("desdemona")
desdemona = self.example_user("desdemona")
# Assert current server is not on any plan.
self.assertIsNone(get_customer_by_remote_server(self.server))
start_date = timezone_now()
end_date = add_months(timezone_now(), 10)
# Migrate server to complimentary access plan.
server_billing_session = RemoteServerBillingSession(self.server)
server_billing_session.create_complimentary_access_plan(start_date, end_date)
server_customer = server_billing_session.get_customer()
assert server_customer is not None
server_plan = get_current_plan_by_customer(server_customer)
assert server_plan is not None
self.assertEqual(self.server.plan_type, RemoteZulipServer.PLAN_TYPE_SELF_MANAGED_LEGACY)
self.assertEqual(server_plan.tier, CustomerPlan.TIER_SELF_HOSTED_LEGACY)
self.assertEqual(server_plan.status, CustomerPlan.ACTIVE)
# Schedule upgrade for plan.
server_plan.status = CustomerPlan.SWITCH_PLAN_TIER_AT_PLAN_END
server_plan.save(update_fields=["status"])
# Just create a temporary plan and check if gets transferred or not.
server_next_plan = CustomerPlan.objects.create(
customer=server_customer,
billing_cycle_anchor=end_date,
billing_schedule=CustomerPlan.BILLING_SCHEDULE_ANNUAL,
tier=CustomerPlan.TIER_SELF_HOSTED_BUSINESS,
status=CustomerPlan.NEVER_STARTED,
)
# There are four test realms on this server:
# <Realm: zulipinternal 1>, <Realm: zephyr 3>, <Realm: lear 4>, <Realm: zulip 2>
self.assert_length(Realm.objects.all(), 4)
# Delete any existing remote realms.
RemoteRealm.objects.all().delete()
# Send server data to push bouncer.
self.add_mock_response()
send_server_data_to_push_bouncer(consider_usage_statistics=False)
# Login to plan management.
result = self.execute_remote_billing_authentication_flow(
desdemona, return_from_auth_url=True
)
self.assertEqual(result.status_code, 200)
self.assert_in_response("Plan management not available", result)
# Server plan status stayed the same.
self.server.refresh_from_db()
self.assertEqual(self.server.plan_type, RemoteZulipServer.PLAN_TYPE_SELF_MANAGED_LEGACY)
# RemoteRealm objects should be created for all realms on the server but no customer plans.
self.assert_length(RemoteRealm.objects.all(), 4)
for remote_realm in RemoteRealm.objects.all():
self.assertIsNone(get_customer_by_remote_realm(remote_realm))
# Same customer plan exists for server since there are multiple realms to manage here.
server_plan.refresh_from_db()
self.assertEqual(get_current_plan_by_customer(server_customer), server_plan)
self.assertEqual(server_plan.customer, server_customer)
# Deactivate realms other than bot realm and zulip realm then try the migration again.
Realm.objects.exclude(string_id__in=["zulip", "zulipinternal"]).update(deactivated=True)
# Send server data to push bouncer.
send_server_data_to_push_bouncer(consider_usage_statistics=False)
# Login to plan management. Performs customer migration from server to realms.
result = self.execute_remote_billing_authentication_flow(
desdemona, return_from_auth_url=False
)
self.assertEqual(result.status_code, 302)
# Server plan status was reset
self.server.refresh_from_db()
self.assertEqual(self.server.plan_type, RemoteZulipServer.PLAN_TYPE_SELF_MANAGED)
# Check if zephyr and lear were deactivated
self.assertCountEqual(
RemoteRealm.objects.filter(realm_deactivated=True).values_list("host", flat=True),
["zephyr.testserver", "lear.testserver"],
)
# Check complimentary access CustomerPlan exists for the one non-deactivated
# "real" realm and does not for the bot realm.
# Sanity check that the setup for this test is the way we think it is.
self.assertEqual(RemoteRealm.objects.filter(realm_deactivated=False).count(), 2)
# These queries have a unique result, so we can use .get().
remote_realm_with_plan = RemoteRealm.objects.get(
realm_deactivated=False, is_system_bot_realm=False
)
system_bot_remote_realm = RemoteRealm.objects.get(
realm_deactivated=False, is_system_bot_realm=True
)
self.assertIsNone(get_customer_by_remote_realm(system_bot_remote_realm))
self.assertEqual(remote_realm_with_plan.host, "zulip.testserver")
customer = get_customer_by_remote_realm(remote_realm_with_plan)
assert customer is not None
# Customer got transferred from server to realm.
self.assertEqual(customer, server_customer)
plan = get_current_plan_by_customer(customer)
assert plan is not None
self.assertEqual(
remote_realm_with_plan.plan_type, RemoteRealm.PLAN_TYPE_SELF_MANAGED_LEGACY
)
self.assertEqual(plan.tier, CustomerPlan.TIER_SELF_HOSTED_LEGACY)
self.assertEqual(plan.status, CustomerPlan.SWITCH_PLAN_TIER_AT_PLAN_END)
self.assertEqual(plan.billing_cycle_anchor, start_date)
self.assertEqual(plan.end_date, end_date)
self.assertEqual(
RemoteRealmBillingSession(remote_realm_with_plan).get_next_plan(plan), server_next_plan
)
@responses.activate
def test_transfer_plan_from_server_to_realm_when_realm_has_customer(
self,
) -> None:
self.login("desdemona")
desdemona = self.example_user("desdemona")
zulip_realm = get_realm("zulip")
server_billing_session = RemoteServerBillingSession(self.server)
server_customer = server_billing_session.update_or_create_customer(
stripe_customer_id="cus_123server"
)
server_plan = CustomerPlan.objects.create(
customer=server_customer,
billing_cycle_anchor=timezone_now(),
billing_schedule=CustomerPlan.BILLING_SCHEDULE_ANNUAL,
tier=CustomerPlan.TIER_SELF_HOSTED_COMMUNITY,
status=CustomerPlan.ACTIVE,
)
self.server.plan_type = RemoteZulipServer.PLAN_TYPE_COMMUNITY
self.server.save(update_fields=["plan_type"])
# Delete any existing remote realms.
RemoteRealm.objects.all().delete()
# We want there to be only a single (non-system bot) realm on the server for our setup.
Realm.objects.exclude(string_id__in=["zulip", "zulipinternal"]).update(deactivated=True)
# Send server data to push bouncer.
self.add_mock_response()
send_server_data_to_push_bouncer(consider_usage_statistics=False)
# Let's create a plan for the realm. This will conflict with the server plan.
remote_realm = RemoteRealm.objects.get(uuid=zulip_realm.uuid)
realm_billing_session = RemoteRealmBillingSession(remote_realm)
realm_customer = realm_billing_session.update_or_create_customer(
stripe_customer_id="cus_123realm"
)
realm_plan = CustomerPlan.objects.create(
customer=realm_customer,
billing_cycle_anchor=timezone_now(),
billing_schedule=CustomerPlan.BILLING_SCHEDULE_ANNUAL,
tier=CustomerPlan.TIER_SELF_HOSTED_LEGACY,
status=CustomerPlan.ACTIVE,
)
remote_realm.plan_type = RemoteRealm.PLAN_TYPE_SELF_MANAGED_LEGACY
remote_realm.save(update_fields=["plan_type"])
with self.assertLogs("zilencer.views", "WARN") as mock_warn:
result = self.execute_remote_billing_authentication_flow(
desdemona, return_from_auth_url=True
)
self.assertEqual(
mock_warn.output,
[
f"WARNING:zilencer.views:Failed to migrate customer from server (id: {remote_realm.server.id}) to realm (id: {remote_realm.id}): "
"RemoteRealm customer already exists and plans can't be migrated automatically."
],
)
self.assert_json_error(
result,
f"Couldn't reconcile billing data between server and realm. Please contact {FromAddress.SUPPORT}",
)
# If the realm's plan is ENDED, it's safe to move the server plan over.
realm_plan.status = CustomerPlan.ENDED
realm_plan.save(update_fields=["status"])
# However, not if the server's status indicates that there's some kind
# of plan change queued up after the plan, since that state would be
# harder and more risky to try to migrate.
server_plan.status = CustomerPlan.SWITCH_PLAN_TIER_AT_PLAN_END
server_plan.save(update_fields=["status"])
with self.assertLogs("zilencer.views", "WARN") as mock_warn:
result = self.execute_remote_billing_authentication_flow(
desdemona, return_from_auth_url=True
)
self.assertEqual(
mock_warn.output,
[
f"WARNING:zilencer.views:Failed to migrate customer from server (id: {remote_realm.server.id}) to realm (id: {remote_realm.id}): "
"RemoteRealm customer already exists and plans can't be migrated automatically."
],
)
self.assert_json_error(
result,
f"Couldn't reconcile billing data between server and realm. Please contact {FromAddress.SUPPORT}",
)
# Now we simulate a regular, ACTIVE plan for the server again. Such a plan can be
# migrated, but we run into the last issue: the realm's customer already has a
# stripe_customer_id. We wouldn't want to overwrite it, so we error out.
server_plan.status = CustomerPlan.ACTIVE
server_plan.save(update_fields=["status"])
# Sanity check the assumption that stripe_customer_id is as expected for realm_customer.
realm_customer.refresh_from_db()
self.assertEqual(realm_customer.stripe_customer_id, "cus_123realm")
with self.assertLogs("zilencer.views", "WARN") as mock_warn:
result = self.execute_remote_billing_authentication_flow(
desdemona, return_from_auth_url=True
)
self.assertEqual(
mock_warn.output,
[
f"WARNING:zilencer.views:Failed to migrate customer from server (id: {remote_realm.server.id}) to realm (id: {remote_realm.id}): "
"RemoteRealm customer already exists and plans can't be migrated automatically."
],
)
self.assert_json_error(
result,
f"Couldn't reconcile billing data between server and realm. Please contact {FromAddress.SUPPORT}",
)
# Finally, set the stripe_customer_id to None for the realm's customer.
# Having an ACTIVE plan for the server and an ENDED plan for the realm, we now have
# a simple case, where the migration should proceed.
realm_customer.stripe_customer_id = None
realm_customer.save(update_fields=["stripe_customer_id"])
result = self.execute_remote_billing_authentication_flow(
desdemona, return_from_auth_url=False
)
self.assertEqual(result.status_code, 302)
# Server plan status was reset
self.server.refresh_from_db()
self.assertEqual(self.server.plan_type, RemoteZulipServer.PLAN_TYPE_SELF_MANAGED)
# The Customer objects remain as they were.
self.assertEqual(get_customer_by_remote_realm(remote_realm), realm_customer)
self.assertEqual(get_customer_by_remote_server(self.server), server_customer)
# The plan that used to be for the server, has been migrated to the realm customer:
self.assertEqual(get_current_plan_by_customer(server_customer), None)
self.assertEqual(get_current_plan_by_customer(realm_customer), server_plan)
remote_realm.refresh_from_db()
self.assertEqual(remote_realm.plan_type, RemoteRealm.PLAN_TYPE_COMMUNITY)
realm_customer.refresh_from_db()
self.assertEqual(realm_customer.stripe_customer_id, "cus_123server")
server_customer.refresh_from_db()
self.assertEqual(server_customer.stripe_customer_id, None)
@responses.activate
def test_transfer_business_plan_from_server_to_realm(
self,
) -> None:
self.login("desdemona")
desdemona = self.example_user("desdemona")
# Assert current server is not on any plan.
self.assertIsNone(get_customer_by_remote_server(self.server))
self.assertEqual(self.server.plan_type, RemoteZulipServer.PLAN_TYPE_SELF_MANAGED)
# Add server to business plan.
server_billing_session = RemoteServerBillingSession(self.server)
server_customer = server_billing_session.update_or_create_customer(stripe_customer_id=None)
assert server_customer is not None
# Just create a temporary plan and check if gets transferred or not.
server_plan = CustomerPlan.objects.create(
customer=server_customer,
billing_cycle_anchor=timezone_now(),
billing_schedule=CustomerPlan.BILLING_SCHEDULE_ANNUAL,
tier=CustomerPlan.TIER_SELF_HOSTED_BUSINESS,
status=CustomerPlan.ACTIVE,
automanage_licenses=True,
)
initial_license_count = 100
LicenseLedger.objects.create(
plan=server_plan,
is_renewal=True,
event_time=timezone_now(),
licenses=initial_license_count,
licenses_at_next_renewal=initial_license_count,
)
self.server.plan_type = RemoteZulipServer.PLAN_TYPE_BUSINESS
self.server.save(update_fields=["plan_type"])
# There are four test realms on this server:
# <Realm: zulipinternal 1>, <Realm: zephyr 3>, <Realm: lear 4>, <Realm: zulip 2>
self.assert_length(Realm.objects.all(), 4)
# Delete any existing remote realms.
RemoteRealm.objects.all().delete()
# Send server data to push bouncer.
self.add_mock_response()
send_server_data_to_push_bouncer(consider_usage_statistics=False)
# Login to plan management.
result = self.execute_remote_billing_authentication_flow(
desdemona, return_from_auth_url=True
)
self.assertEqual(result.status_code, 200)
self.assert_in_response("Plan management not available", result)
# Server plan status stayed the same.
self.server.refresh_from_db()
self.assertEqual(self.server.plan_type, RemoteZulipServer.PLAN_TYPE_BUSINESS)
# RemoteRealm objects should be created for all realms on the server but no customer plans.
self.assert_length(RemoteRealm.objects.all(), 4)
for remote_realm in RemoteRealm.objects.all():
self.assertIsNone(get_customer_by_remote_realm(remote_realm))
# Same customer plan exists for server since there are multiple realms to manage here.
server_plan.refresh_from_db()
self.assertEqual(get_current_plan_by_customer(server_customer), server_plan)
self.assertEqual(server_plan.customer, server_customer)
# Deactivate realms other than bot realm and zulip realm then try the migration again.
Realm.objects.exclude(string_id__in=["zulip", "zulipinternal"]).update(deactivated=True)
# Send server data to push bouncer.
send_server_data_to_push_bouncer(consider_usage_statistics=False)
# Login to plan management. Performs customer migration from server to realms.
result = self.execute_remote_billing_authentication_flow(
desdemona, return_from_auth_url=False
)
self.assertEqual(result.status_code, 302)
# Server plan status was reset
self.server.refresh_from_db()
self.assertEqual(self.server.plan_type, RemoteZulipServer.PLAN_TYPE_SELF_MANAGED)
# Check business CustomerPlan exists for all realms except bot realm.
# Sanity check that the setup for this test is the way we think it is.
self.assertEqual(RemoteRealm.objects.filter(realm_deactivated=False).count(), 2)
# These queries have a unique result, so we can use .get().
remote_realm_with_plan = RemoteRealm.objects.get(
realm_deactivated=False, is_system_bot_realm=False
)
system_bot_remote_realm = RemoteRealm.objects.get(
realm_deactivated=False, is_system_bot_realm=True
)
self.assertIsNone(get_customer_by_remote_realm(system_bot_remote_realm))
self.assertEqual(remote_realm_with_plan.host, "zulip.testserver")
customer = get_customer_by_remote_realm(remote_realm_with_plan)
assert customer is not None
# Customer got transferred from server to realm.
self.assertEqual(customer, server_customer)
plan = get_current_plan_by_customer(customer)
assert plan is not None
self.assertEqual(remote_realm_with_plan.plan_type, RemoteRealm.PLAN_TYPE_BUSINESS)
self.assertEqual(plan.tier, CustomerPlan.TIER_SELF_HOSTED_BUSINESS)
self.assertEqual(plan.status, CustomerPlan.ACTIVE)
# Check that an updated license ledger entry was created.
billing_session = RemoteRealmBillingSession(remote_realm=remote_realm_with_plan)
license_ledger = billing_session.get_last_ledger_for_automanaged_plan_if_exists()
billable_licenses = billing_session.get_billable_licenses_for_customer(customer, plan.tier)
assert license_ledger is not None
self.assertNotEqual(initial_license_count, billable_licenses)
self.assertEqual(license_ledger.licenses, initial_license_count)
self.assertEqual(license_ledger.licenses_at_next_renewal, billable_licenses)
self.assertFalse(license_ledger.is_renewal)
@responses.activate
def test_transfer_plan_from_server_to_realm_edge_cases(self) -> None:
self.login("desdemona")
desdemona = self.example_user("desdemona")
# CASE: Server has no customer
self.assertIsNone(get_customer_by_remote_server(self.server))
# Send server data to push bouncer.
self.add_mock_response()
send_server_data_to_push_bouncer(consider_usage_statistics=False)
# Login to plan management.
result = self.execute_remote_billing_authentication_flow(desdemona)
self.assertEqual(result.status_code, 302)
# Still no customer.
self.assertIsNone(get_customer_by_remote_server(self.server))
# CASE: Server has customer but no plan.
server_billing_session = RemoteServerBillingSession(self.server)
server_customer = server_billing_session.update_or_create_customer(stripe_customer_id=None)
# Send server data to push bouncer.
send_server_data_to_push_bouncer(consider_usage_statistics=False)
# Login to plan management.
result = self.execute_remote_billing_authentication_flow(
desdemona, first_time_login=False, expect_tos=False
)
self.assertEqual(result.status_code, 302)
# Server still has no plan.
self.assertIsNone(get_current_plan_by_customer(server_customer))
# CASE: Server has complimentary access plan but all realms are deactivated.
start_date = timezone_now()
end_date = add_months(timezone_now(), 10)
server_billing_session = RemoteServerBillingSession(self.server)
server_billing_session.create_complimentary_access_plan(start_date, end_date)
# All realms are deactivated.
Realm.objects.all().update(deactivated=True)
# Send server data to push bouncer.
send_server_data_to_push_bouncer(consider_usage_statistics=False)
# Login to plan management.
result = self.execute_remote_billing_authentication_flow(
desdemona, return_from_auth_url=True
)
self.assertEqual(result.status_code, 200)
self.assert_in_response("Plan management not available", result)
# Server stays on the same plan.
server_plan = get_current_plan_by_customer(server_customer)
assert server_plan is not None
self.assertEqual(self.server.plan_type, RemoteZulipServer.PLAN_TYPE_SELF_MANAGED_LEGACY)
self.assertEqual(server_plan.tier, CustomerPlan.TIER_SELF_HOSTED_LEGACY)
self.assertEqual(server_plan.status, CustomerPlan.ACTIVE)
# CASE: Server has business plan but all realms are deactivated.
server_plan.tier = CustomerPlan.TIER_SELF_HOSTED_BUSINESS
server_plan.save(update_fields=["tier"])
self.server.plan_type = RemoteZulipServer.PLAN_TYPE_BUSINESS
self.server.save(update_fields=["plan_type"])
# Login to plan management.
result = self.execute_remote_billing_authentication_flow(
desdemona, return_from_auth_url=True
)
self.assertEqual(result.status_code, 200)
self.assert_in_response("Plan management not available", result)
# Server stays on the same plan.
server_customer.refresh_from_db()
server_plan.refresh_from_db()
self.assertEqual(self.server.plan_type, RemoteZulipServer.PLAN_TYPE_BUSINESS)
self.assertEqual(server_plan.tier, CustomerPlan.TIER_SELF_HOSTED_BUSINESS)
# CASE: Server has business plan but there are no realms.
Realm.objects.all().delete()
# Send server data to push bouncer.
send_server_data_to_push_bouncer(consider_usage_statistics=False)
server_customer.refresh_from_db()
server_plan.refresh_from_db()
# Server stays on same plan.
self.assertEqual(self.server.plan_type, RemoteZulipServer.PLAN_TYPE_BUSINESS)
self.assertEqual(server_plan.tier, CustomerPlan.TIER_SELF_HOSTED_BUSINESS)
self.assertEqual(server_plan.status, CustomerPlan.ACTIVE)
class RemoteServerTestCase(BouncerTestCase):
@override
def setUp(self) -> None:
super().setUp()
self.uuid = self.server.uuid
self.secret = self.server.api_key
def execute_remote_billing_authentication_flow(
self,
email: str,
full_name: str,
next_page: str | None = None,
expect_tos: bool = True,
confirm_tos: bool = True,
return_without_clicking_confirmation_link: bool = False,
) -> "TestHttpResponse":
now = timezone_now()
with time_machine.travel(now, tick=False):
payload = {"zulip_org_id": self.uuid, "zulip_org_key": self.secret}
if next_page is not None:
payload["next_page"] = next_page
result = self.client_post(
"/serverlogin/",
payload,
subdomain="selfhosting",
)
self.assertEqual(result.status_code, 200)
self.assert_in_success_response(["Enter log in email"], result)
if next_page is not None:
self.assert_in_success_response(
[f'<input type="hidden" name="next_page" value="{next_page}" />'], result
)
self.assert_in_success_response([f'action="/serverlogin/{self.uuid!s}/confirm/"'], result)
# Verify the partially-authed data that should have been stored in the session. The flow
# isn't complete yet however, and this won't give the user access to authenticated endpoints,
# only allow them to proceed with confirmation.
identity_dict = LegacyServerIdentityDict(
remote_server_uuid=str(self.server.uuid),
authenticated_at=datetime_to_timestamp(now),
remote_billing_user_id=None,
)
self.assertEqual(
self.client.session["remote_billing_identities"][f"remote_server:{self.uuid!s}"],
identity_dict,
)
payload = {"email": email}
if next_page is not None:
payload["next_page"] = next_page
with time_machine.travel(now, tick=False):
result = self.client_post(
f"/serverlogin/{self.uuid!s}/confirm/",
payload,
subdomain="selfhosting",
)
if result.status_code == 429:
# Return rate limit errors early, since they occur in rate limiting tests
# that want to verify them.
return result
self.assertEqual(result.status_code, 200)
self.assert_in_success_response(
["We have sent", "a log in", "link will expire in", email],
result,
)
confirmation_url = self.get_confirmation_url_from_outbox(
email,
url_pattern=(
f"{settings.SELF_HOSTING_MANAGEMENT_SUBDOMAIN}.{settings.EXTERNAL_HOST}" + r"(\S+)"
),
email_body_contains="This link will expire in 24 hours",
)
if return_without_clicking_confirmation_link:
return result
with time_machine.travel(now, tick=False):
result = self.client_get(confirmation_url, subdomain="selfhosting")
self.assertEqual(result.status_code, 200)
self.assert_in_success_response(
[f"Log in to Zulip plan management for {self.server.hostname}", email], result
)
self.assert_in_success_response([f'action="{confirmation_url}"'], result)
if expect_tos:
self.assert_in_success_response(["I agree", "Terms of Service"], result)
payload = {"full_name": full_name}
if confirm_tos:
payload["tos_consent"] = "true"
with time_machine.travel(now, tick=False):
result = self.client_post(confirmation_url, payload, subdomain="selfhosting")
if result.status_code >= 400:
# Early return for the caller to assert about the error.
return result
# The user should now be fully authenticated.
# This should have been created in the process:
remote_billing_user = RemoteServerBillingUser.objects.get(
remote_server=self.server, email=email
)
# Verify the session looks as it should:
identity_dict = LegacyServerIdentityDict(
remote_server_uuid=str(self.server.uuid),
authenticated_at=datetime_to_timestamp(now),
remote_billing_user_id=remote_billing_user.id,
)
self.assertEqual(
self.client.session["remote_billing_identities"][f"remote_server:{self.uuid!s}"],
identity_dict,
)
self.assertEqual(remote_billing_user.last_login, now)
return result
class LegacyServerLoginTest(RemoteServerTestCase):
@ratelimit_rule(10, 3, domain="sends_email_by_remote_server")
@ratelimit_rule(10, 2, domain="sends_email_by_ip")
def test_remote_billing_authentication_flow_rate_limited(self) -> None:
RateLimitedIPAddr("127.0.0.1", domain="sends_email_by_ip").clear_history()
RateLimitedRemoteZulipServer(
self.server, domain="sends_email_by_remote_server"
).clear_history()
self.login("desdemona")
desdemona = self.example_user("desdemona")
for i in range(2):
result = self.execute_remote_billing_authentication_flow(
desdemona.delivery_email,
desdemona.full_name,
return_without_clicking_confirmation_link=True,
)
self.assertEqual(result.status_code, 200)
result = self.execute_remote_billing_authentication_flow(
desdemona.delivery_email,
desdemona.full_name,
return_without_clicking_confirmation_link=True,
)
self.assertEqual(result.status_code, 429)
self.assert_in_response("You have exceeded the limit", result)
# Reset the IP rate limit so that we trigger the server-based one.
RateLimitedIPAddr("127.0.0.1", domain="sends_email_by_ip").clear_history()
result = self.execute_remote_billing_authentication_flow(
desdemona.delivery_email,
desdemona.full_name,
return_without_clicking_confirmation_link=True,
)
self.assertEqual(result.status_code, 200)
with self.assertLogs("zilencer.auth", "WARN") as mock_log:
result = self.execute_remote_billing_authentication_flow(
desdemona.delivery_email,
desdemona.full_name,
return_without_clicking_confirmation_link=True,
)
self.assertEqual(result.status_code, 429)
self.assert_in_response("Your server has exceeded the limit", result)
self.assertEqual(
mock_log.output,
[
f"WARNING:zilencer.auth:Remote server {self.server.hostname} {str(self.server.uuid)[:12]} exceeded "
"rate limits on domain sends_email_by_remote_server"
],
)
def test_server_login_get(self) -> None:
result = self.client_get("/serverlogin/", subdomain="selfhosting")
self.assertEqual(result.status_code, 200)
self.assert_in_success_response(["Authenticate server for Zulip plan management"], result)
def test_server_login_invalid_zulip_org_id(self) -> None:
result = self.client_post(
"/serverlogin/",
{"zulip_org_id": "invalid", "zulip_org_key": "secret"},
subdomain="selfhosting",
)
self.assertEqual(result.status_code, 200)
self.assert_in_success_response(
["This zulip_org_id is not registered with Zulip&#39;s billing management system."],
result,
)
def test_server_login_invalid_zulip_org_key(self) -> None:
result = self.client_post(
"/serverlogin/",
{"zulip_org_id": self.uuid, "zulip_org_key": "invalid"},
subdomain="selfhosting",
)
self.assertEqual(result.status_code, 200)
self.assert_in_success_response(["Invalid zulip_org_key for this zulip_org_id."], result)
def test_server_login_deactivated_server(self) -> None:
self.server.deactivated = True
self.server.save(update_fields=["deactivated"])
result = self.client_post(
"/serverlogin/",
{"zulip_org_id": self.uuid, "zulip_org_key": self.secret},
subdomain="selfhosting",
)
self.assertEqual(result.status_code, 200)
self.assert_in_success_response(["Your server registration has been deactivated."], result)
def test_server_login_success_with_no_plan(self) -> None:
hamlet = self.example_user("hamlet")
now = timezone_now()
with time_machine.travel(now, tick=False):
result = self.execute_remote_billing_authentication_flow(
hamlet.delivery_email, hamlet.full_name, expect_tos=True, confirm_tos=True
)
self.assertEqual(result.status_code, 302)
self.assertEqual(result["Location"], f"/server/{self.uuid}/plans/")
result = self.client_get(f"/server/{self.uuid}/billing/", subdomain="selfhosting")
# The server has no plan, so the /billing page redirects to /upgrade
self.assertEqual(result.status_code, 302)
self.assertEqual(result["Location"], f"/server/{self.uuid}/upgrade/")
# Access on the upgrade page is granted, assert a basic string proving that.
# TODO: Add test for the case when redirected to error page (not yet implemented)
# due to MissingDataError ('has_stale_audit_log' is True).
with mock.patch("corporate.lib.stripe.has_stale_audit_log", return_value=False):
result = self.client_get(result["Location"], subdomain="selfhosting")
self.assert_in_success_response([f"Upgrade {self.server.hostname}"], result)
# Verify the RemoteServerBillingUser and PreRegistrationRemoteServerBillingUser
# objects created in the process.
remote_billing_user = RemoteServerBillingUser.objects.latest("id")
self.assertEqual(remote_billing_user.email, hamlet.delivery_email)
prereg_user = PreregistrationRemoteServerBillingUser.objects.latest("id")
self.assertEqual(prereg_user.created_user, remote_billing_user)
self.assertEqual(remote_billing_user.date_joined, now)
def test_server_login_success_consent_is_not_re_asked(self) -> None:
hamlet = self.example_user("hamlet")
result = self.execute_remote_billing_authentication_flow(
hamlet.delivery_email, hamlet.full_name, expect_tos=True, confirm_tos=True
)
self.assertEqual(result.status_code, 302)
self.assertEqual(result["Location"], f"/server/{self.uuid}/plans/")
# Now go through the flow again, but this time we should not be asked to re-confirm ToS.
result = self.execute_remote_billing_authentication_flow(
hamlet.delivery_email, hamlet.full_name, expect_tos=False, confirm_tos=False
)
self.assertEqual(result.status_code, 302)
self.assertEqual(result["Location"], f"/server/{self.uuid}/plans/")
def test_server_login_success_with_next_page(self) -> None:
hamlet = self.example_user("hamlet")
# First test an invalid next_page value.
result = self.client_post(
"/serverlogin/",
{"zulip_org_id": self.uuid, "zulip_org_key": self.secret, "next_page": "invalid"},
subdomain="selfhosting",
)
self.assert_json_error(result, "Invalid next_page", 400)
result = self.execute_remote_billing_authentication_flow(
hamlet.delivery_email, hamlet.full_name, next_page="sponsorship"
)
# We should be redirected to the page dictated by the next_page param.
self.assertEqual(result.status_code, 302)
self.assertEqual(result["Location"], f"/server/{self.uuid}/sponsorship/")
result = self.client_get(result["Location"], subdomain="selfhosting")
self.assert_in_success_response(["Request Zulip", "sponsorship", "Community"], result)
def test_server_login_next_page_in_form_persists(self) -> None:
result = self.client_get("/serverlogin/?next_page=billing", subdomain="selfhosting")
self.assert_in_success_response(
['<input type="hidden" name="next_page" value="billing" />'], result
)
result = self.client_post(
"/serverlogin/",
{"zulip_org_id": self.uuid, "zulip_org_key": "invalid", "next_page": "billing"},
subdomain="selfhosting",
)
self.assertEqual(result.status_code, 200)
self.assert_in_success_response(["Invalid zulip_org_key for this zulip_org_id."], result)
# The next_page param should be preserved in the form.
self.assert_in_success_response(
['<input type="hidden" name="next_page" value="billing" />'], result
)
def test_server_billing_unauthed(self) -> None:
hamlet = self.example_user("hamlet")
now = timezone_now()
# Try to open a page with no auth at all.
result = self.client_get(
f"/server/{self.uuid}/billing/",
subdomain="selfhosting",
HTTP_ACCEPT="text/html, */*;q=0.8",
)
self.assertEqual(result.status_code, 302)
# Redirects to the login form with appropriate next_page value.
self.assertEqual(result["Location"], "/serverlogin/?next_page=billing")
result = self.client_get(result["Location"], subdomain="selfhosting")
self.assert_in_success_response(
['<input type="hidden" name="next_page" value="billing" />'], result
)
# The full auth flow involves clicking a confirmation link, upon which the user is
# granted an authenticated session. However, in the first part of the process,
# an intermittent session state is created to transition between endpoints.
# The bottom line is that this session state should *not* grant the user actual
# access to the billing management endpoints.
# We verify that here by simulating the user *not* clicking the confirmation link,
# and then trying to access billing management with the intermittent session state.
with time_machine.travel(now, tick=False):
self.execute_remote_billing_authentication_flow(
hamlet.delivery_email,
hamlet.full_name,
next_page="upgrade",
return_without_clicking_confirmation_link=True,
)
result = self.client_get(
f"/server/{self.uuid}/billing/",
subdomain="selfhosting",
HTTP_ACCEPT="text/html, */*;q=0.8",
)
self.assertEqual(result.status_code, 302)
# Redirects to the login form with appropriate next_page value.
self.assertEqual(result["Location"], "/serverlogin/?next_page=billing")
# Now authenticate, going to the /upgrade page since we'll be able to access
# it directly without annoying extra redirects.
with time_machine.travel(now, tick=False):
result = self.execute_remote_billing_authentication_flow(
hamlet.delivery_email, hamlet.full_name, next_page="upgrade"
)
self.assertEqual(result.status_code, 302)
self.assertEqual(result["Location"], f"/server/{self.uuid}/upgrade/")
# Sanity check: access on the upgrade page is granted.
# TODO: Add test for the case when redirected to error page (Not yet implemented)
# due to MissingDataError i.e., when 'has_stale_audit_log' is True.
with mock.patch("corporate.lib.stripe.has_stale_audit_log", return_value=False):
result = self.client_get(result["Location"], subdomain="selfhosting")
self.assert_in_success_response([f"Upgrade {self.server.hostname}"], result)
# Now we can simulate an expired identity dict in the session.
with time_machine.travel(
now + timedelta(seconds=REMOTE_BILLING_SESSION_VALIDITY_SECONDS + 30),
tick=False,
):
result = self.client_get(
f"/server/{self.uuid}/upgrade/",
subdomain="selfhosting",
HTTP_ACCEPT="text/html, */*;q=0.8",
)
self.assertEqual(result.status_code, 302)
self.assertEqual(result["Location"], "/serverlogin/?next_page=upgrade")
def test_remote_billing_authentication_flow_tos_consent_failure(self) -> None:
hamlet = self.example_user("hamlet")
result = self.execute_remote_billing_authentication_flow(
hamlet.email, hamlet.full_name, expect_tos=True, confirm_tos=False
)
self.assert_json_error(result, "You must accept the Terms of Service to proceed.")
class TestGenerateDeactivationLink(BouncerTestCase):
def test_generate_deactivation_link(self) -> None:
server = self.server
confirmation_url = generate_confirmation_link_for_server_deactivation(
server, validity_in_minutes=60
)
result = self.client_get(confirmation_url, subdomain="selfhosting")
self.assert_in_success_response(
["Log in to deactivate registration for", server.contact_email], result
)
payload = {"full_name": "test", "tos_consent": "true"}
result = self.client_post(confirmation_url, payload, subdomain="selfhosting")
self.assertEqual(result.status_code, 302)
self.assertEqual(result["Location"], f"/server/{server.uuid!s}/deactivate/")
result = self.client_get(result["Location"], subdomain="selfhosting")
self.assert_in_success_response(
[
"You are about to deactivate this server's",
server.hostname,
f'action="/server/{server.uuid!s}/deactivate/"',
],
result,
)
result = self.client_post(
f"/server/{server.uuid!s}/deactivate/", {"confirmed": "true"}, subdomain="selfhosting"
)
self.assert_in_success_response(
[f"Registration deactivated for<br />{server.hostname}"], result
)
server.refresh_from_db()
self.assertEqual(server.deactivated, True)