diff --git a/zerver/tests/test_auth_backends.py b/zerver/tests/test_auth_backends.py index 80bd1f29a1..28abc745de 100644 --- a/zerver/tests/test_auth_backends.py +++ b/zerver/tests/test_auth_backends.py @@ -10,6 +10,7 @@ from typing import Any, Callable, Dict, List, Optional, Set, Tuple from oauth2client.crypt import AppIdentityError from django.core import signing from django.urls import reverse + import httpretty import os @@ -404,12 +405,25 @@ class ResponseMock: def text(self) -> str: return "Response text" -class GitHubAuthBackendTest(ZulipTestCase): +class SocialAuthBase(ZulipTestCase): + """This is a base class for testing social-auth backends. Following + methods can be overrided by subclasses as per the backend: + + registerExtraEndpoints() - If the backend being tested calls some extra + endpoints then they can be added here. + + get_account_data_dict() - Return the data returned by the user info endpoint + according to the respective backend. + """ + # Don't run base class tests, make sure to set it to False + # in subclass otherwise its tests will not run. + __unittest_skip__ = True + def setUp(self) -> None: self.user_profile = self.example_user('hamlet') self.email = self.user_profile.email self.name = 'Hamlet' - self.backend = GitHubAuthBackend() + self.backend = self.BACKEND_CLASS self.backend.strategy = DjangoStrategy(storage=BaseDjangoStorage()) self.user_profile.backend = self.backend @@ -421,13 +435,13 @@ class GitHubAuthBackendTest(ZulipTestCase): from social_core.backends.utils import load_backends load_backends(settings.AUTHENTICATION_BACKENDS, force_load=True) - def github_oauth2_test(self, account_data_dict: Dict[str, str], - *, subdomain: Optional[str]=None, - mobile_flow_otp: Optional[str]=None, - is_signup: Optional[str]=None, - email_data: Optional[List[Dict[str, Any]]]=None, - next: str='') -> HttpResponse: - url = "/accounts/login/social/github" + def social_auth_test(self, account_data_dict: Dict[str, str], + *, subdomain: Optional[str]=None, + mobile_flow_otp: Optional[str]=None, + is_signup: Optional[str]=None, + next: str='', + **extra_data: Any) -> HttpResponse: + url = self.LOGIN_URL params = {} headers = {} if subdomain is not None: @@ -436,90 +450,71 @@ class GitHubAuthBackendTest(ZulipTestCase): params['mobile_flow_otp'] = mobile_flow_otp headers['HTTP_USER_AGENT'] = "ZulipAndroid" if is_signup is not None: - url = "/accounts/register/social/github" + url = self.SIGNUP_URL params['next'] = next if len(params) > 0: url += "?%s" % (urllib.parse.urlencode(params)) result = self.client_get(url, **headers) - expected_result_url_prefix = 'http://testserver/login/github/' + expected_result_url_prefix = 'http://testserver/login/%s/' % (self.backend.name,) if settings.SOCIAL_AUTH_SUBDOMAIN is not None: - expected_result_url_prefix = ('http://%s.testserver/login/github/' % - settings.SOCIAL_AUTH_SUBDOMAIN) + expected_result_url_prefix = ('http://%s.testserver/login/%s/' % + (settings.SOCIAL_AUTH_SUBDOMAIN, self.backend.name,)) if result.status_code != 302 or not result.url.startswith(expected_result_url_prefix): return result result = self.client_get(result.url, **headers) self.assertEqual(result.status_code, 302) - assert 'https://github.com/login/oauth/authorize' in result.url + assert self.AUTHORIZATION_URL in result.url self.client.cookies = result.cookies # Next, the browser requests result["Location"], and gets - # redirected back to /complete/github. + # redirected back to the registered redirect uri. token_data_dict = { 'access_token': 'foobar', 'token_type': 'bearer' } - if not email_data: - # Keeping a verified email before the primary email makes sure - # get_verified_emails puts the primary email at the start of the - # email list returned as social_associate_user_helper assumes the - # first email as the primary email. - email_data = [ - dict(email="notprimary@example.com", - verified=True), - dict(email=account_data_dict["email"], - verified=True, - primary=True), - dict(email="ignored@example.com", - verified=False), - ] - # We register callbacks for the key URLs on github.com that - # /complete/github will call + # We register callbacks for the key URLs on Identity Provider that + # auth completion url will call httpretty.enable() httpretty.register_uri( httpretty.POST, - "https://github.com/login/oauth/access_token", + self.ACCESS_TOKEN_URL, match_querystring=False, status=200, body=json.dumps(token_data_dict)) httpretty.register_uri( httpretty.GET, - "https://api.github.com/user", + self.USER_INFO_URL, status=200, body=json.dumps(account_data_dict) ) - httpretty.register_uri( - httpretty.GET, - "https://api.github.com/user/emails", - status=200, - body=json.dumps(email_data) - ) + self.registerExtraEndpoints(account_data_dict, **extra_data) parsed_url = urllib.parse.urlparse(result.url) csrf_state = urllib.parse.parse_qs(parsed_url.query)['state'] - result = self.client_get("/complete/github/", + result = self.client_get(self.AUTH_FINISH_URL, dict(state=csrf_state), **headers) httpretty.disable() return result - @override_settings(SOCIAL_AUTH_GITHUB_KEY=None) - def test_github_oauth2_no_key(self) -> None: - account_data_dict = dict(email=self.email, name=self.name) - result = self.github_oauth2_test(account_data_dict, - subdomain='zulip', next='/user_uploads/image') - self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/config-error/github") + def test_social_auth_no_key(self) -> None: + account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) + with self.settings(**{self.CLIENT_KEY_SETTING: None}): + result = self.social_auth_test(account_data_dict, + subdomain='zulip', next='/user_uploads/image') + self.assertEqual(result.status_code, 302) + self.assertEqual(result.url, self.CONFIG_ERROR_URL) - def test_github_oauth2_success(self) -> None: - account_data_dict = dict(email=self.email, name=self.name) - result = self.github_oauth2_test(account_data_dict, - subdomain='zulip', next='/user_uploads/image') + def test_social_auth_success(self) -> None: + account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) + result = self.social_auth_test(account_data_dict, + subdomain='zulip', next='/user_uploads/image') data = load_subdomain_token(result) self.assertEqual(data['email'], self.example_email("hamlet")) self.assertEqual(data['name'], 'Hamlet') @@ -532,10 +527,11 @@ class GitHubAuthBackendTest(ZulipTestCase): self.assertTrue(uri.startswith('http://zulip.testserver/accounts/login/subdomain/')) @override_settings(SOCIAL_AUTH_SUBDOMAIN=None) - def test_github_when_social_auth_subdomain_is_not_set(self) -> None: - account_data_dict = dict(email=self.email, name=self.name) - result = self.github_oauth2_test(account_data_dict, - subdomain='zulip', next='/user_uploads/image') + def test_when_social_auth_subdomain_is_not_set(self) -> None: + account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) + result = self.social_auth_test(account_data_dict, + subdomain='zulip', + next='/user_uploads/image') data = load_subdomain_token(result) self.assertEqual(data['email'], self.example_email("hamlet")) self.assertEqual(data['name'], 'Hamlet') @@ -547,133 +543,69 @@ class GitHubAuthBackendTest(ZulipTestCase): parsed_url.path) self.assertTrue(uri.startswith('http://zulip.testserver/accounts/login/subdomain/')) - def test_github_oauth2_email_not_verified(self) -> None: - account_data_dict = dict(email=self.email, name=self.name) - email_data = [ - dict(email=account_data_dict["email"], - verified=False, - primary=True), - ] - with mock.patch('logging.warning') as mock_warning: - result = self.github_oauth2_test(account_data_dict, - subdomain='zulip', - email_data=email_data) - self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/login/") - mock_warning.assert_called_once_with("Social auth (GitHub) failed " - "because user has no verified emails") - - @override_settings(SOCIAL_AUTH_GITHUB_TEAM_ID='zulip-webapp') - def test_github_oauth2_github_team_not_member_failed(self) -> None: - account_data_dict = dict(email=self.email, name=self.name) - with mock.patch('social_core.backends.github.GithubTeamOAuth2.user_data', - side_effect=AuthFailed('Not found')), \ - mock.patch('logging.info') as mock_info: - result = self.github_oauth2_test(account_data_dict, - subdomain='zulip') - self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/login/") - mock_info.assert_called_once_with("GitHub user is not member of required team") - - @override_settings(SOCIAL_AUTH_GITHUB_TEAM_ID='zulip-webapp') - def test_github_oauth2_github_team_member_success(self) -> None: - account_data_dict = dict(email=self.email, name=self.name) - with mock.patch('social_core.backends.github.GithubTeamOAuth2.user_data', - return_value=account_data_dict): - result = self.github_oauth2_test(account_data_dict, - subdomain='zulip') - data = load_subdomain_token(result) - self.assertEqual(data['email'], self.example_email("hamlet")) - self.assertEqual(data['name'], 'Hamlet') - self.assertEqual(data['subdomain'], 'zulip') - - @override_settings(SOCIAL_AUTH_GITHUB_ORG_NAME='Zulip') - def test_github_oauth2_github_organization_not_member_failed(self) -> None: - account_data_dict = dict(email=self.email, name=self.name) - with mock.patch('social_core.backends.github.GithubOrganizationOAuth2.user_data', - side_effect=AuthFailed('Not found')), \ - mock.patch('logging.info') as mock_info: - result = self.github_oauth2_test(account_data_dict, - subdomain='zulip') - self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/login/") - mock_info.assert_called_once_with("GitHub user is not member of required organization") - - @override_settings(SOCIAL_AUTH_GITHUB_ORG_NAME='Zulip') - def test_github_oauth2_github_organization_member_success(self) -> None: - account_data_dict = dict(email=self.email, name=self.name) - with mock.patch('social_core.backends.github.GithubOrganizationOAuth2.user_data', - return_value=account_data_dict): - result = self.github_oauth2_test(account_data_dict, - subdomain='zulip') - data = load_subdomain_token(result) - self.assertEqual(data['email'], self.example_email("hamlet")) - self.assertEqual(data['name'], 'Hamlet') - self.assertEqual(data['subdomain'], 'zulip') - - def test_github_oauth2_deactivated_user(self) -> None: + def test_social_auth_deactivated_user(self) -> None: user_profile = self.example_user("hamlet") do_deactivate_user(user_profile) - account_data_dict = dict(email=self.email, name=self.name) - result = self.github_oauth2_test(account_data_dict, - subdomain='zulip') + account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) + result = self.social_auth_test(account_data_dict, + subdomain='zulip') self.assertEqual(result.status_code, 302) self.assertEqual(result.url, "/login/") # TODO: verify whether we provide a clear error message - def test_github_oauth2_invalid_realm(self) -> None: - account_data_dict = dict(email=self.email, name=self.name) + def test_social_auth_invalid_realm(self) -> None: + account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) with mock.patch('zerver.middleware.get_realm', return_value=get_realm("zulip")): # This mock.patch case somewhat hackishly arranges it so # that we switch realms halfway through the test - result = self.github_oauth2_test(account_data_dict, - subdomain='invalid', next='/user_uploads/image') + result = self.social_auth_test(account_data_dict, + subdomain='invalid', next='/user_uploads/image') self.assertEqual(result.status_code, 302) self.assertEqual(result.url, "/accounts/login/?subdomain=1") - def test_github_oauth2_invalid_email(self) -> None: - account_data_dict = dict(email="invalid", name=self.name) - result = self.github_oauth2_test(account_data_dict, - subdomain='zulip', next='/user_uploads/image') + def test_social_auth_invalid_email(self) -> None: + account_data_dict = self.get_account_data_dict(email="invalid", name=self.name) + result = self.social_auth_test(account_data_dict, + subdomain='zulip', next='/user_uploads/image') self.assertEqual(result.status_code, 302) self.assertEqual(result.url, "/login/?next=/user_uploads/image") def test_user_cannot_log_into_nonexisting_realm(self) -> None: - account_data_dict = dict(email=self.email, name=self.name) - result = self.github_oauth2_test(account_data_dict, - subdomain='nonexistent') + account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) + result = self.social_auth_test(account_data_dict, + subdomain='nonexistent') self.assert_in_success_response(["There is no Zulip organization hosted at this subdomain."], result) def test_user_cannot_log_into_wrong_subdomain(self) -> None: - account_data_dict = dict(email=self.email, name=self.name) - result = self.github_oauth2_test(account_data_dict, - subdomain='zephyr') + account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) + result = self.social_auth_test(account_data_dict, + subdomain='zephyr') self.assertTrue(result.url.startswith("http://zephyr.testserver/accounts/login/subdomain/")) result = self.client_get(result.url.replace('http://zephyr.testserver', ''), subdomain="zephyr") self.assert_in_success_response(['Your email address, hamlet@zulip.com, is not in one of the domains ', 'that are allowed to register for accounts in this organization.'], result) - def test_github_oauth2_mobile_success(self) -> None: + def test_social_auth_mobile_success(self) -> None: mobile_flow_otp = '1234abcd' * 8 - account_data_dict = dict(email=self.email, name='Full Name') + account_data_dict = self.get_account_data_dict(email=self.email, name='Full Name') self.assertEqual(len(mail.outbox), 0) self.user_profile.date_joined = timezone_now() - datetime.timedelta(seconds=JUST_CREATED_THRESHOLD + 1) self.user_profile.save() with self.settings(SEND_LOGIN_EMAILS=True): # Verify that the right thing happens with an invalid-format OTP - result = self.github_oauth2_test(account_data_dict, subdomain='zulip', - mobile_flow_otp="1234") + result = self.social_auth_test(account_data_dict, subdomain='zulip', + mobile_flow_otp="1234") self.assert_json_error(result, "Invalid OTP") - result = self.github_oauth2_test(account_data_dict, subdomain='zulip', - mobile_flow_otp="invalido" * 8) + result = self.social_auth_test(account_data_dict, subdomain='zulip', + mobile_flow_otp="invalido" * 8) self.assert_json_error(result, "Invalid OTP") # Now do it correctly - result = self.github_oauth2_test(account_data_dict, subdomain='zulip', - mobile_flow_otp=mobile_flow_otp) + result = self.social_auth_test(account_data_dict, subdomain='zulip', + mobile_flow_otp=mobile_flow_otp) self.assertEqual(result.status_code, 302) redirect_url = result['Location'] parsed_url = urllib.parse.urlparse(redirect_url) @@ -687,13 +619,13 @@ class GitHubAuthBackendTest(ZulipTestCase): self.assertEqual(len(mail.outbox), 1) self.assertIn('Zulip on Android', mail.outbox[0].body) - def test_github_oauth2_registration_existing_account(self) -> None: + def test_social_auth_registration_existing_account(self) -> None: """If the user already exists, signup flow just logs them in""" email = "hamlet@zulip.com" name = 'Full Name' - account_data_dict = dict(email=email, name=name) - result = self.github_oauth2_test(account_data_dict, - subdomain='zulip', is_signup='1') + account_data_dict = self.get_account_data_dict(email=email, name=name) + result = self.social_auth_test(account_data_dict, + subdomain='zulip', is_signup='1') data = load_subdomain_token(result) self.assertEqual(data['email'], self.example_email("hamlet")) self.assertEqual(data['name'], 'Full Name') @@ -707,14 +639,14 @@ class GitHubAuthBackendTest(ZulipTestCase): # Name wasn't changed at all self.assertEqual(hamlet.full_name, "King Hamlet") - def test_github_oauth2_registration(self) -> None: - """If the user doesn't exist yet, GitHub auth can be used to register an account""" + def test_social_auth_registration(self) -> None: + """If the user doesn't exist yet, social auth can be used to register an account""" email = "newuser@zulip.com" name = 'Full Name' realm = get_realm("zulip") - account_data_dict = dict(email=email, name=name) - result = self.github_oauth2_test(account_data_dict, - subdomain='zulip', is_signup='1') + account_data_dict = self.get_account_data_dict(email=email, name=name) + result = self.social_auth_test(account_data_dict, + subdomain='zulip', is_signup='1') data = load_subdomain_token(result) self.assertEqual(data['email'], email) @@ -755,13 +687,13 @@ class GitHubAuthBackendTest(ZulipTestCase): user_profile = get_user(email, realm) self.assertEqual(get_session_dict_user(self.client.session), user_profile.id) - def test_github_oauth2_registration_without_is_signup(self) -> None: - """If the user doesn't exist yet, GitHub auth can be used to register an account""" + def test_social_auth_registration_without_is_signup(self) -> None: + """If `is_signup` is not set then a new account isn't created""" email = "newuser@zulip.com" name = 'Full Name' - account_data_dict = dict(email=email, name=name) - result = self.github_oauth2_test(account_data_dict, - subdomain='zulip') + account_data_dict = self.get_account_data_dict(email=email, name=name) + result = self.social_auth_test(account_data_dict, + subdomain='zulip') self.assertEqual(result.status_code, 302) data = load_subdomain_token(result) self.assertEqual(data['email'], email) @@ -777,13 +709,13 @@ class GitHubAuthBackendTest(ZulipTestCase): self.assertEqual(result.status_code, 200) self.assert_in_response("No account found for newuser@zulip.com.", result) - def test_github_oauth2_registration_without_is_signup_closed_realm(self) -> None: + def test_social_auth_registration_without_is_signup_closed_realm(self) -> None: """If the user doesn't exist yet in closed realm, give an error""" email = "nonexisting@phantom.com" name = 'Full Name' - account_data_dict = dict(email=email, name=name) - result = self.github_oauth2_test(account_data_dict, - subdomain='zulip') + account_data_dict = self.get_account_data_dict(email=email, name=name) + result = self.social_auth_test(account_data_dict, + subdomain='zulip') self.assertEqual(result.status_code, 302) data = load_subdomain_token(result) self.assertEqual(data['email'], email) @@ -802,21 +734,126 @@ class GitHubAuthBackendTest(ZulipTestCase): 'in one of the domains that are allowed to register ' 'for accounts in this organization.'.format(email), result) - def test_github_complete(self) -> None: + def test_social_auth_complete(self) -> None: with mock.patch('social_core.backends.oauth.BaseOAuth2.process_error', side_effect=AuthFailed('Not found')): - result = self.client_get(reverse('social:complete', args=['github'])) + result = self.client_get(reverse('social:complete', args=[self.backend.name])) self.assertEqual(result.status_code, 302) self.assertIn('login', result.url) - def test_github_complete_when_base_exc_is_raised(self) -> None: + def test_social_auth_complete_when_base_exc_is_raised(self) -> None: with mock.patch('social_core.backends.oauth.BaseOAuth2.auth_complete', side_effect=AuthStateForbidden('State forbidden')), \ mock.patch('zproject.backends.logging.warning'): - result = self.client_get(reverse('social:complete', args=['github'])) + result = self.client_get(reverse('social:complete', args=[self.backend.name])) self.assertEqual(result.status_code, 302) self.assertIn('login', result.url) +class GitHubAuthBackendTest(SocialAuthBase): + __unittest_skip__ = False + + BACKEND_CLASS = GitHubAuthBackend + CLIENT_KEY_SETTING = "SOCIAL_AUTH_GITHUB_KEY" + LOGIN_URL = "/accounts/login/social/github" + SIGNUP_URL = "/accounts/register/social/github" + AUTHORIZATION_URL = "https://github.com/login/oauth/authorize" + ACCESS_TOKEN_URL = "https://github.com/login/oauth/access_token" + USER_INFO_URL = "https://api.github.com/user" + AUTH_FINISH_URL = "/complete/github/" + CONFIG_ERROR_URL = "/config-error/github" + + def registerExtraEndpoints(self, + account_data_dict: Dict[str, str], + **extra_data: Any) -> None: + # Keeping a verified email before the primary email makes sure + # get_verified_emails puts the primary email at the start of the + # email list returned as social_associate_user_helper assumes the + # first email as the primary email. + email_data = [ + dict(email="notprimary@example.com", + verified=True), + dict(email=account_data_dict["email"], + verified=True, + primary=True), + dict(email="ignored@example.com", + verified=False), + ] + email_data = extra_data.get("email_data", email_data) + + httpretty.register_uri( + httpretty.GET, + "https://api.github.com/user/emails", + status=200, + body=json.dumps(email_data) + ) + + def get_account_data_dict(self, email: str, name: str) -> Dict[str, Any]: + return dict(email=email, name=name) + + def test_social_auth_email_not_verified(self) -> None: + account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) + email_data = [ + dict(email=account_data_dict["email"], + verified=False, + primary=True), + ] + with mock.patch('logging.warning') as mock_warning: + result = self.social_auth_test(account_data_dict, + subdomain='zulip', + email_data=email_data) + self.assertEqual(result.status_code, 302) + self.assertEqual(result.url, "/login/") + mock_warning.assert_called_once_with("Social auth (GitHub) failed " + "because user has no verified emails") + + @override_settings(SOCIAL_AUTH_GITHUB_TEAM_ID='zulip-webapp') + def test_social_auth_github_team_not_member_failed(self) -> None: + account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) + with mock.patch('social_core.backends.github.GithubTeamOAuth2.user_data', + side_effect=AuthFailed('Not found')), \ + mock.patch('logging.info') as mock_info: + result = self.social_auth_test(account_data_dict, + subdomain='zulip') + self.assertEqual(result.status_code, 302) + self.assertEqual(result.url, "/login/") + mock_info.assert_called_once_with("GitHub user is not member of required team") + + @override_settings(SOCIAL_AUTH_GITHUB_TEAM_ID='zulip-webapp') + def test_social_auth_github_team_member_success(self) -> None: + account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) + with mock.patch('social_core.backends.github.GithubTeamOAuth2.user_data', + return_value=account_data_dict): + result = self.social_auth_test(account_data_dict, + subdomain='zulip') + data = load_subdomain_token(result) + self.assertEqual(data['email'], self.example_email("hamlet")) + self.assertEqual(data['name'], 'Hamlet') + self.assertEqual(data['subdomain'], 'zulip') + + @override_settings(SOCIAL_AUTH_GITHUB_ORG_NAME='Zulip') + def test_social_auth_github_organization_not_member_failed(self) -> None: + account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) + with mock.patch('social_core.backends.github.GithubOrganizationOAuth2.user_data', + side_effect=AuthFailed('Not found')), \ + mock.patch('logging.info') as mock_info: + result = self.social_auth_test(account_data_dict, + subdomain='zulip') + self.assertEqual(result.status_code, 302) + self.assertEqual(result.url, "/login/") + mock_info.assert_called_once_with("GitHub user is not member of required organization") + + @override_settings(SOCIAL_AUTH_GITHUB_ORG_NAME='Zulip') + def test_social_auth_github_organization_member_success(self) -> None: + account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) + with mock.patch('social_core.backends.github.GithubOrganizationOAuth2.user_data', + return_value=account_data_dict): + result = self.social_auth_test(account_data_dict, + subdomain='zulip') + data = load_subdomain_token(result) + self.assertEqual(data['email'], self.example_email("hamlet")) + self.assertEqual(data['name'], 'Hamlet') + self.assertEqual(data['subdomain'], 'zulip') + def test_github_auth_enabled(self) -> None: with self.settings(AUTHENTICATION_BACKENDS=('zproject.backends.GitHubAuthBackend',)): self.assertTrue(github_auth_enabled())