tests: Add uuid_get and uuid_post.

We want a clean codepath for the vast majority
of cases of using api_get/api_post, which now
uses email and which we'll soon convert to
accepting `user` as a parameter.

These apis that take two different types of
values for the same parameter make sweeps
like this kinda painful, and they're pretty
easy to avoid by extracting helpers to do
the actual common tasks.  So, for example,
here I still keep a common method to
actually encode the credentials (since
the whole encode/decode business is an
annoying detail that you don't want to fix
in two places):

    def encode_credentials(self, identifier: str, api_key: str) -> str:
        """
        identifier: Can be an email or a remote server uuid.
        """
        credentials = "%s:%s" % (identifier, api_key)
        return 'Basic ' + base64.b64encode(credentials.encode('utf-8')).decode('utf-8')

But then the rest of the code has two separate
codepaths.

And for the uuid functions, we no longer have
crufty references to realm.  (In fairness, realm
will also go away when we introduce users.)

For the `is_remote_server` helper, I just inlined
it, since it's now only needed in one place, and the
name didn't make total sense anyway, plus it wasn't
a super robust check.  In context, it's easier
just to use a comment now to say what we're doing:

    # If `role` doesn't look like an email, it might be a uuid.
    if settings.ZILENCER_ENABLED and role is not None and '@' not in role:
        # do stuff
This commit is contained in:
Steve Howell
2020-03-10 11:34:25 +00:00
committed by Tim Abbott
parent 00dc976379
commit 626ad0078d
6 changed files with 91 additions and 71 deletions

View File

@@ -21,7 +21,7 @@ from zerver.lib.exceptions import UnexpectedWebhookEventType
from zerver.lib.queue import queue_json_publish
from zerver.lib.subdomains import get_subdomain, user_matches_subdomain
from zerver.lib.timestamp import datetime_to_timestamp, timestamp_to_datetime
from zerver.lib.utils import statsd, is_remote_server, has_api_key_format
from zerver.lib.utils import statsd, has_api_key_format
from zerver.lib.exceptions import JsonableError, ErrorCode, \
InvalidJSONError, InvalidAPIKeyError, InvalidAPIKeyFormatError, \
OrganizationAdministratorRequired
@@ -186,7 +186,8 @@ def validate_api_key(request: HttpRequest, role: Optional[str],
if role is not None:
role = role.strip()
if settings.ZILENCER_ENABLED and role is not None and is_remote_server(role):
# If `role` doesn't look like an email, it might be a uuid.
if settings.ZILENCER_ENABLED and role is not None and '@' not in role:
try:
remote_server = get_remote_server_by_uuid(role)
except RemoteZulipServer.DoesNotExist:

View File

@@ -22,7 +22,6 @@ from django.utils import translation
from two_factor.models import PhoneDevice
from zerver.lib.initial_password import initial_password
from zerver.lib.utils import is_remote_server
from zerver.lib.users import get_api_key
from zerver.lib.sessions import get_session_dict_user
from zerver.lib.webhooks.common import get_fixture_http_headers, standardize_headers
@@ -419,37 +418,58 @@ class ZulipTestCase(TestCase):
else:
raise AssertionError("Couldn't find a confirmation email.")
def encode_credentials(self, identifier: str, realm: str="zulip") -> str:
def encode_uuid(self, uuid: str) -> str:
"""
identifier: Can be an email or a remote server uuid.
"""
if identifier in self.API_KEYS:
api_key = self.API_KEYS[identifier]
if uuid in self.API_KEYS:
api_key = self.API_KEYS[uuid]
else:
if is_remote_server(identifier):
api_key = get_remote_server_by_uuid(identifier).api_key
else:
user = get_user_by_delivery_email(identifier, get_realm(realm))
api_key = get_api_key(user)
self.API_KEYS[identifier] = api_key
api_key = get_remote_server_by_uuid(uuid).api_key
self.API_KEYS[uuid] = api_key
return self.encode_credentials(uuid, api_key)
def encode_email(self, email: str, realm: str="zulip") -> str:
assert '@' in email
if email in self.API_KEYS:
api_key = self.API_KEYS[email]
else:
user = get_user_by_delivery_email(email, get_realm(realm))
api_key = get_api_key(user)
self.API_KEYS[email] = api_key
return self.encode_credentials(email, api_key)
def encode_credentials(self, identifier: str, api_key: str) -> str:
"""
identifier: Can be an email or a remote server uuid.
"""
credentials = "%s:%s" % (identifier, api_key)
return 'Basic ' + base64.b64encode(credentials.encode('utf-8')).decode('utf-8')
def uuid_get(self, identifier: str, *args: Any, **kwargs: Any) -> HttpResponse:
kwargs['HTTP_AUTHORIZATION'] = self.encode_uuid(identifier)
return self.client_get(*args, **kwargs)
def uuid_post(self, identifier: str, *args: Any, **kwargs: Any) -> HttpResponse:
kwargs['HTTP_AUTHORIZATION'] = self.encode_uuid(identifier)
return self.client_post(*args, **kwargs)
def api_get(self, identifier: str, *args: Any, **kwargs: Any) -> HttpResponse:
kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(identifier, kwargs.get('subdomain', 'zulip'))
kwargs['HTTP_AUTHORIZATION'] = self.encode_email(identifier, kwargs.get('subdomain', 'zulip'))
return self.client_get(*args, **kwargs)
def api_post(self, identifier: str, *args: Any, **kwargs: Any) -> HttpResponse:
kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(identifier, kwargs.get('subdomain', 'zulip'))
kwargs['HTTP_AUTHORIZATION'] = self.encode_email(identifier, kwargs.get('subdomain', 'zulip'))
return self.client_post(*args, **kwargs)
def api_patch(self, identifier: str, *args: Any, **kwargs: Any) -> HttpResponse:
kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(identifier, kwargs.get('subdomain', 'zulip'))
kwargs['HTTP_AUTHORIZATION'] = self.encode_email(identifier, kwargs.get('subdomain', 'zulip'))
return self.client_patch(*args, **kwargs)
def api_delete(self, identifier: str, *args: Any, **kwargs: Any) -> HttpResponse:
kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(identifier, kwargs.get('subdomain', 'zulip'))
kwargs['HTTP_AUTHORIZATION'] = self.encode_email(identifier, kwargs.get('subdomain', 'zulip'))
return self.client_delete(*args, **kwargs)
def get_streams(self, user_profile: UserProfile) -> List[str]:
@@ -846,7 +866,7 @@ class WebhookTestCase(ZulipTestCase):
self.url = self.build_webhook_url()
def api_stream_message(self, email: str, *args: Any, **kwargs: Any) -> HttpResponse:
kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email)
kwargs['HTTP_AUTHORIZATION'] = self.encode_email(email)
return self.send_and_test_stream_message(*args, **kwargs)
def send_and_test_stream_message(self, fixture_name: str, expected_topic: Optional[str]=None,

View File

@@ -198,11 +198,3 @@ def split_by(array: List[Any], group_size: int, filler: Any) -> List[List[Any]]:
"""
args = [iter(array)] * group_size
return list(map(list, zip_longest(*args, fillvalue=filler)))
def is_remote_server(identifier: str) -> bool:
"""
This function can be used to identify the source of API auth
request. We can have two types of sources, Remote Zulip Servers
and UserProfiles.
"""
return "@" not in identifier

View File

@@ -408,7 +408,7 @@ class SkipRateLimitingTest(ZulipTestCase):
return json_success()
request = HostRequestMock(host="zulip.testserver")
request.META['HTTP_AUTHORIZATION'] = self.encode_credentials(self.example_email("hamlet"))
request.META['HTTP_AUTHORIZATION'] = self.encode_email(self.example_email("hamlet"))
request.method = 'POST'
with mock.patch('zerver.decorator.rate_limit') as rate_limit_mock:
@@ -476,7 +476,7 @@ class DecoratorLoggingTestCase(ZulipTestCase):
webhook_bot_realm = get_realm('zulip')
request = HostRequestMock()
request.META['HTTP_AUTHORIZATION'] = self.encode_credentials(webhook_bot_email)
request.META['HTTP_AUTHORIZATION'] = self.encode_email(webhook_bot_email)
request.method = 'POST'
request.host = "zulip.testserver"
@@ -519,7 +519,7 @@ body:
webhook_bot_realm = get_realm('zulip')
request = HostRequestMock()
request.META['HTTP_AUTHORIZATION'] = self.encode_credentials(webhook_bot_email)
request.META['HTTP_AUTHORIZATION'] = self.encode_email(webhook_bot_email)
request.method = 'POST'
request.host = "zulip.testserver"
@@ -560,7 +560,7 @@ body:
raise Exception("raised by a non-webhook view")
request = HostRequestMock()
request.META['HTTP_AUTHORIZATION'] = self.encode_credentials("aaron@zulip.com")
request.META['HTTP_AUTHORIZATION'] = self.encode_email("aaron@zulip.com")
request.method = 'POST'
request.host = "zulip.testserver"

View File

@@ -98,15 +98,18 @@ class BouncerTestCase(ZulipTestCase):
# args[0] is method, args[1] is URL.
local_url = args[1].replace(settings.PUSH_NOTIFICATION_BOUNCER_URL, "")
if args[0] == "POST":
result = self.api_post(self.server_uuid,
result = self.uuid_post(
self.server_uuid,
local_url,
kwargs['data'],
subdomain="")
subdomain='')
elif args[0] == "GET":
result = self.api_get(self.server_uuid,
result = self.uuid_get(
self.server_uuid,
local_url,
kwargs['data'],
subdomain="")
subdomain='')
else:
raise AssertionError("Unsupported method for bounce_request")
return result
@@ -128,9 +131,9 @@ class PushBouncerNotificationTest(BouncerTestCase):
token_kind = PushDeviceToken.GCM
endpoint = '/api/v1/remotes/push/unregister'
result = self.api_post(self.server_uuid, endpoint, {'token_kind': token_kind})
result = self.uuid_post(self.server_uuid, endpoint, {'token_kind': token_kind})
self.assert_json_error(result, "Missing 'token' argument")
result = self.api_post(self.server_uuid, endpoint, {'token': token})
result = self.uuid_post(self.server_uuid, endpoint, {'token': token})
self.assert_json_error(result, "Missing 'token_kind' argument")
# We need the root ('') subdomain to be in use for this next
@@ -152,13 +155,13 @@ class PushBouncerNotificationTest(BouncerTestCase):
endpoint = '/api/v1/remotes/push/register'
result = self.api_post(self.server_uuid, endpoint, {'user_id': user_id, 'token_kind': token_kind})
result = self.uuid_post(self.server_uuid, endpoint, {'user_id': user_id, 'token_kind': token_kind})
self.assert_json_error(result, "Missing 'token' argument")
result = self.api_post(self.server_uuid, endpoint, {'user_id': user_id, 'token': token})
result = self.uuid_post(self.server_uuid, endpoint, {'user_id': user_id, 'token': token})
self.assert_json_error(result, "Missing 'token_kind' argument")
result = self.api_post(self.server_uuid, endpoint, {'token': token, 'token_kind': token_kind})
result = self.uuid_post(self.server_uuid, endpoint, {'token': token, 'token_kind': token_kind})
self.assert_json_error(result, "Missing 'user_id' argument")
result = self.api_post(self.server_uuid, endpoint, {'user_id': user_id, 'token': token, 'token_kind': 17})
result = self.uuid_post(self.server_uuid, endpoint, {'user_id': user_id, 'token': token, 'token_kind': 17})
self.assert_json_error(result, "Invalid token type")
result = self.api_post(self.example_email("hamlet"), endpoint, {'user_id': user_id,
@@ -178,9 +181,10 @@ class PushBouncerNotificationTest(BouncerTestCase):
'token': token})
self.assert_json_error(result, "Must validate with valid Zulip server API key")
result = self.api_post(self.server_uuid, endpoint, {'user_id': user_id,
'token_kind': token_kind,
'token': token},
result = self.uuid_post(
self.server_uuid,
endpoint,
dict(user_id=user_id, token_kind=token_kind, token=token),
subdomain="zulip")
self.assert_json_error(result, "Invalid subdomain for push notifications bouncer",
status_code=401)
@@ -188,9 +192,10 @@ class PushBouncerNotificationTest(BouncerTestCase):
# We do a bit of hackery here to the API_KEYS cache just to
# make the code simple for sending an incorrect API key.
self.API_KEYS[self.server_uuid] = 'invalid'
result = self.api_post(self.server_uuid, endpoint, {'user_id': user_id,
'token_kind': token_kind,
'token': token})
result = self.uuid_post(
self.server_uuid,
endpoint,
dict(user_id=user_id, token_kind=token_kind, token=token))
self.assert_json_error(result, "Zulip server auth failure: key does not match role 1234-abcd",
status_code=401)
@@ -215,7 +220,7 @@ class PushBouncerNotificationTest(BouncerTestCase):
payload = self.get_generic_payload(method)
# Verify correct results are success
result = self.api_post(self.server_uuid, endpoint, payload)
result = self.uuid_post(self.server_uuid, endpoint, payload)
self.assert_json_success(result)
remote_tokens = RemotePushDeviceToken.objects.filter(token=payload['token'])
@@ -225,20 +230,20 @@ class PushBouncerNotificationTest(BouncerTestCase):
# Try adding/removing tokens that are too big...
broken_token = "x" * 5000 # too big
payload['token'] = broken_token
result = self.api_post(self.server_uuid, endpoint, payload)
result = self.uuid_post(self.server_uuid, endpoint, payload)
self.assert_json_error(result, 'Empty or invalid length token')
def test_remote_push_unregister_all(self) -> None:
payload = self.get_generic_payload('register')
# Verify correct results are success
result = self.api_post(self.server_uuid,
result = self.uuid_post(self.server_uuid,
'/api/v1/remotes/push/register', payload)
self.assert_json_success(result)
remote_tokens = RemotePushDeviceToken.objects.filter(token=payload['token'])
self.assertEqual(len(remote_tokens), 1)
result = self.api_post(self.server_uuid,
result = self.uuid_post(self.server_uuid,
'/api/v1/remotes/push/unregister/all',
dict(user_id=10))
self.assert_json_success(result)
@@ -257,7 +262,7 @@ class PushBouncerNotificationTest(BouncerTestCase):
'token': 'xyz uses non-hex characters',
'token_kind': PushDeviceToken.APNS,
}
result = self.api_post(self.server_uuid, endpoint, payload)
result = self.uuid_post(self.server_uuid, endpoint, payload)
self.assert_json_error(result, 'Invalid APNS token')
@override_settings(PUSH_NOTIFICATION_BOUNCER_URL='https://push.zulip.org.example.com')
@@ -466,7 +471,8 @@ class AnalyticsBouncerTest(BouncerTestCase):
realmauditlog_data) = build_analytics_data(RealmCount.objects.all(),
InstallationCount.objects.all(),
RealmAuditLog.objects.all())
result = self.api_post(self.server_uuid,
result = self.uuid_post(
self.server_uuid,
'/api/v1/remotes/server/analytics',
{'realm_counts': ujson.dumps(realm_count_data),
'installation_counts': ujson.dumps(installation_count_data),
@@ -479,7 +485,7 @@ class AnalyticsBouncerTest(BouncerTestCase):
# IntegrityError that will be thrown in here from breaking
# the unittest transaction.
with transaction.atomic():
result = self.api_post(
result = self.uuid_post(
self.server_uuid,
'/api/v1/remotes/server/analytics',
{'realm_counts': ujson.dumps(realm_count_data),
@@ -656,7 +662,8 @@ class HandlePushNotificationTest(PushNotificationTest):
# args[0] is method, args[1] is URL.
local_url = args[1].replace(settings.PUSH_NOTIFICATION_BOUNCER_URL, "")
if args[0] == "POST":
result = self.api_post(self.server_uuid,
result = self.uuid_post(
self.server_uuid,
local_url,
kwargs['data'],
content_type="application/json")

View File

@@ -51,7 +51,7 @@ Requester Bob <requester-bob@example.com> updated [ticket #11](http://test1234zz
self.url = self.build_webhook_url()
payload = self.get_body('status_changed_fixture_with_missing_key')
kwargs = {
'HTTP_AUTHORIZATION': self.encode_credentials(self.TEST_USER_EMAIL),
'HTTP_AUTHORIZATION': self.encode_email(self.TEST_USER_EMAIL),
'content_type': 'application/x-www-form-urlencoded',
}
result = self.client_post(self.url, payload, **kwargs)
@@ -80,7 +80,7 @@ Requester Bob <requester-bob@example.com> updated [ticket #11](http://test1234zz
self.url = self.build_webhook_url()
payload = self.get_body('unknown_payload')
kwargs = {
'HTTP_AUTHORIZATION': self.encode_credentials(self.TEST_USER_EMAIL),
'HTTP_AUTHORIZATION': self.encode_email(self.TEST_USER_EMAIL),
'content_type': 'application/x-www-form-urlencoded',
}
result = self.client_post(self.url, payload, **kwargs)