diff --git a/zerver/tests/test_auth_backends.py b/zerver/tests/test_auth_backends.py index 2a723a206e..644abb1aeb 100644 --- a/zerver/tests/test_auth_backends.py +++ b/zerver/tests/test_auth_backends.py @@ -4,9 +4,10 @@ from django.http import HttpResponse from django.test import TestCase from django_auth_ldap.backend import _LDAPUser from django.test.client import RequestFactory -from typing import Any, Callable, Dict +from typing import Any, Callable, Dict, Optional from builtins import object from oauth2client.crypt import AppIdentityError +from django.core import signing import jwt import mock @@ -40,6 +41,7 @@ from social.backends.github import GithubOrganizationOAuth2, GithubTeamOAuth2, \ from six import text_type from six.moves import urllib +from six.moves.http_cookies import SimpleCookie import ujson from fakeldap import MockLDAP @@ -485,11 +487,18 @@ class ResponseMock(object): # type: () -> str return "Response text" -class GoogleLoginTest(ZulipTestCase): - def google_oauth2_test(self, token_response, account_response): - # type: (ResponseMock, ResponseMock) -> HttpResponse - result = self.client_get("/accounts/login/google/send/") +class GoogleOAuthTest(ZulipTestCase): + def google_oauth2_test(self, token_response, account_response, subdomain=None): + # type: (ResponseMock, ResponseMock, Optional[str]) -> HttpResponse + url = "/accounts/login/google/send/" + if subdomain is not None: + url += "?subdomain=" + subdomain + + result = self.client_get(url) self.assertEquals(result.status_code, 302) + if 'google' not in result.url: + return result + # Now extract the CSRF token from the redirect URL parsed_url = urllib.parse.urlparse(result.url) csrf_state = urllib.parse.parse_qs(parsed_url.query)['state'] @@ -500,6 +509,100 @@ class GoogleLoginTest(ZulipTestCase): dict(state=csrf_state)) return result +class GoogleSubdomainLoginTest(GoogleOAuthTest): + def get_signed_subdomain_cookie(self, data): + # type: (Dict[str, str]) -> Dict[str, str] + key = 'subdomain.signature' + salt = key + 'zerver.views.auth' + value = ujson.dumps(data) + return {key: signing.get_cookie_signer(salt=salt).sign(value)} + + def unsign_subdomain_cookie(self, result): + # type: (HttpResponse) -> Dict[str, Any] + key = 'subdomain.signature' + salt = key + 'zerver.views.auth' + cookie = result.cookies.get(key) + value = signing.get_cookie_signer(salt=salt).unsign(cookie.value, max_age=15) + return ujson.loads(value) + + def test_google_oauth2_start(self): + # type: () -> None + with mock.patch('zerver.views.auth.get_subdomain', return_value='zulip'): + result = self.client_get('/accounts/login/google/') + self.assertEquals(result.status_code, 302) + parsed_url = urllib.parse.urlparse(result.url) + subdomain = urllib.parse.parse_qs(parsed_url.query)['subdomain'] + self.assertEqual(subdomain, ['zulip']) + + def test_google_oauth2_success(self): + # type: () -> None + token_response = ResponseMock(200, {'access_token': "unique_token"}) + account_data = dict(name=dict(formatted="Full Name"), + emails=[dict(type="account", + value="hamlet@zulip.com")]) + account_response = ResponseMock(200, account_data) + with self.settings(REALMS_HAVE_SUBDOMAINS=True): + result = self.google_oauth2_test(token_response, account_response, 'zulip') + + data = self.unsign_subdomain_cookie(result) + self.assertEqual(data['email'], 'hamlet@zulip.com') + self.assertEqual(data['name'], 'Full Name') + self.assertEqual(data['subdomain'], 'zulip') + self.assertEquals(result.status_code, 302) + parsed_url = urllib.parse.urlparse(result.url) + uri = "{}://{}{}".format(parsed_url.scheme, parsed_url.netloc, + parsed_url.path) + self.assertEquals(uri, 'http://zulip.testserver/accounts/login/subdomain/') + + def test_log_into_subdomain(self): + # type: () -> None + data = {'name': 'Full Name', + 'email': 'hamlet@zulip.com', + 'subdomain': 'zulip'} + + self.client.cookies = SimpleCookie(self.get_signed_subdomain_cookie(data)) + with mock.patch('zerver.views.auth.get_subdomain', return_value='zulip'): + result = self.client_get('/accounts/login/subdomain/') + self.assertEquals(result.status_code, 302) + user_profile = get_user_profile_by_email('hamlet@zulip.com') + self.assertEqual(get_session_dict_user(self.client.session), user_profile.id) + + def test_user_cannot_log_into_nonexisting_realm(self): + # type: () -> None + token_response = ResponseMock(200, {'access_token': "unique_token"}) + account_data = dict(name=dict(formatted="Full Name"), + emails=[dict(type="account", + value="hamlet@zulip.com")]) + account_response = ResponseMock(200, account_data) + result = self.google_oauth2_test(token_response, account_response, 'acme') + self.assertEquals(result.status_code, 302) + self.assertIn('subdomain=1', result.url) + + def test_user_cannot_log_into_wrong_subdomain(self): + # type: () -> None + data = {'name': 'Full Name', + 'email': 'hamlet@zulip.com', + 'subdomain': 'acme'} + + self.client.cookies = SimpleCookie(self.get_signed_subdomain_cookie(data)) + with mock.patch('zerver.views.auth.get_subdomain', return_value='zulip'): + result = self.client_get('/accounts/login/subdomain/') + self.assertEquals(result.status_code, 400) + + def test_log_into_subdomain_when_signature_is_bad(self): + # type: () -> None + self.client.cookies = SimpleCookie({'subdomain.signature': 'invlaid'}) + with mock.patch('zerver.views.auth.get_subdomain', return_value='zulip'): + result = self.client_get('/accounts/login/subdomain/') + self.assertEquals(result.status_code, 400) + + def test_log_into_subdomain_when_state_is_not_passed(self): + # type: () -> None + with mock.patch('zerver.views.auth.get_subdomain', return_value='zulip'): + result = self.client_get('/accounts/login/subdomain/') + self.assertEquals(result.status_code, 400) + +class GoogleLoginTest(GoogleOAuthTest): def test_google_oauth2_success(self): # type: () -> None token_response = ResponseMock(200, {'access_token': "unique_token"})