mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	github: Add a complete end-to-end GitHub OAuth2 test.
This revised GitHub auth backend test is inspired by the end-to-end flow model of the Google auth backend test. My hope is that we will be able to migrate the rest of the important cases in the GitHub auth backend tests to this model and then delete what is now GitHubAuthBackendLegacyTest. The next step after that will be to merge the GitHub and Google auth tests (since actually, the actual test functions are basically identical between the two).
This commit is contained in:
		@@ -11,6 +11,9 @@ from builtins import object
 | 
			
		||||
from oauth2client.crypt import AppIdentityError
 | 
			
		||||
from django.core import signing
 | 
			
		||||
from django.urls import reverse
 | 
			
		||||
import httpretty
 | 
			
		||||
import os
 | 
			
		||||
import sys
 | 
			
		||||
 | 
			
		||||
import jwt
 | 
			
		||||
import mock
 | 
			
		||||
@@ -60,6 +63,7 @@ from social_django.storage import BaseDjangoStorage
 | 
			
		||||
from social_core.backends.github import GithubOrganizationOAuth2, GithubTeamOAuth2, \
 | 
			
		||||
    GithubOAuth2
 | 
			
		||||
 | 
			
		||||
import json
 | 
			
		||||
import urllib
 | 
			
		||||
from http.cookies import SimpleCookie
 | 
			
		||||
import ujson
 | 
			
		||||
@@ -376,6 +380,218 @@ class GitHubAuthBackendTest(ZulipTestCase):
 | 
			
		||||
        request.user = self.user_profile
 | 
			
		||||
        self.backend.strategy.request = request
 | 
			
		||||
 | 
			
		||||
    def github_oauth2_test(self, token_data_dict: Dict[str, str], account_data_dict: Dict[str, str],
 | 
			
		||||
                           *, subdomain: Optional[str]=None,
 | 
			
		||||
                           mobile_flow_otp: Optional[str]=None,
 | 
			
		||||
                           is_signup: Optional[str]=None,
 | 
			
		||||
                           next: str='') -> HttpResponse:
 | 
			
		||||
        url = "/accounts/login/social/github"
 | 
			
		||||
        params = {}
 | 
			
		||||
        headers = {}
 | 
			
		||||
        if subdomain is not None:
 | 
			
		||||
            headers['HTTP_HOST'] = subdomain + ".testserver"
 | 
			
		||||
        if mobile_flow_otp is not None:
 | 
			
		||||
            params['mobile_flow_otp'] = mobile_flow_otp
 | 
			
		||||
            headers['HTTP_USER_AGENT'] = "ZulipAndroid"
 | 
			
		||||
        if is_signup is not None:
 | 
			
		||||
            url = "/accounts/register/social/github"
 | 
			
		||||
        params['next'] = next
 | 
			
		||||
        if len(params) > 0:
 | 
			
		||||
            url += "?%s" % (urllib.parse.urlencode(params))
 | 
			
		||||
 | 
			
		||||
        result = self.client_get(url, **headers)
 | 
			
		||||
        if result.status_code != 302 or 'http://testserver/login/github/' not in result.url:
 | 
			
		||||
            return result
 | 
			
		||||
 | 
			
		||||
        result = self.client_get(result.url, **headers)
 | 
			
		||||
        self.assertEqual(result.status_code, 302)
 | 
			
		||||
        assert 'https://github.com/login/oauth/authorize' in result.url
 | 
			
		||||
 | 
			
		||||
        self.client.cookies = result.cookies
 | 
			
		||||
 | 
			
		||||
        # Next, the browser requests result["Location"], and gets
 | 
			
		||||
        # redirected back to /complete/github.
 | 
			
		||||
 | 
			
		||||
        # We register callbacks for the key URLs on github.com that
 | 
			
		||||
        # /complete/github will call
 | 
			
		||||
        httpretty.enable()
 | 
			
		||||
        httpretty.register_uri(
 | 
			
		||||
            httpretty.POST,
 | 
			
		||||
            "https://github.com/login/oauth/access_token",
 | 
			
		||||
            match_querystring=False,
 | 
			
		||||
            status=200,
 | 
			
		||||
            body=json.dumps(token_data_dict))
 | 
			
		||||
        httpretty.register_uri(
 | 
			
		||||
            httpretty.GET,
 | 
			
		||||
            "https://api.github.com/user",
 | 
			
		||||
            status=200,
 | 
			
		||||
            body=json.dumps(account_data_dict)
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        parsed_url = urllib.parse.urlparse(result.url)
 | 
			
		||||
        csrf_state = urllib.parse.parse_qs(parsed_url.query)['state']
 | 
			
		||||
        result = self.client_get("/complete/github/",
 | 
			
		||||
                                 dict(state=csrf_state), **headers)
 | 
			
		||||
        httpretty.disable()
 | 
			
		||||
        return result
 | 
			
		||||
 | 
			
		||||
    @override_settings(SOCIAL_AUTH_GITHUB_KEY=None)
 | 
			
		||||
    def test_github_oauth2_no_key(self) -> None:
 | 
			
		||||
        token_data_dict = {
 | 
			
		||||
            'access_token': 'foobar',
 | 
			
		||||
            'token_type': 'bearer'
 | 
			
		||||
        }
 | 
			
		||||
        account_data_dict = dict(email=self.email, name=self.name)
 | 
			
		||||
        result = self.github_oauth2_test(token_data_dict, account_data_dict,
 | 
			
		||||
                                         subdomain='zulip', next='/user_uploads/image')
 | 
			
		||||
        self.assertEqual(result.status_code, 302)
 | 
			
		||||
        self.assertEqual(result.url, "/config-error/github")
 | 
			
		||||
 | 
			
		||||
    def test_github_oauth2_success(self) -> None:
 | 
			
		||||
        token_data_dict = {
 | 
			
		||||
            'access_token': 'foobar',
 | 
			
		||||
            'token_type': 'bearer'
 | 
			
		||||
        }
 | 
			
		||||
        account_data_dict = dict(email=self.email, name=self.name)
 | 
			
		||||
        result = self.github_oauth2_test(token_data_dict, account_data_dict,
 | 
			
		||||
                                         subdomain='zulip', next='/user_uploads/image')
 | 
			
		||||
        data = load_subdomain_token(result)
 | 
			
		||||
        self.assertEqual(data['email'], self.example_email("hamlet"))
 | 
			
		||||
        self.assertEqual(data['name'], 'Hamlet')
 | 
			
		||||
        self.assertEqual(data['subdomain'], 'zulip')
 | 
			
		||||
        self.assertEqual(data['next'], '/user_uploads/image')
 | 
			
		||||
        self.assertEqual(result.status_code, 302)
 | 
			
		||||
        parsed_url = urllib.parse.urlparse(result.url)
 | 
			
		||||
        uri = "{}://{}{}".format(parsed_url.scheme, parsed_url.netloc,
 | 
			
		||||
                                 parsed_url.path)
 | 
			
		||||
        self.assertTrue(uri.startswith('http://zulip.testserver/accounts/login/subdomain/'))
 | 
			
		||||
 | 
			
		||||
    def test_user_cannot_log_into_nonexisting_realm(self) -> None:
 | 
			
		||||
        token_data_dict = {
 | 
			
		||||
            'access_token': 'foobar',
 | 
			
		||||
            'token_type': 'bearer'
 | 
			
		||||
        }
 | 
			
		||||
        account_data_dict = dict(email=self.email, name=self.name)
 | 
			
		||||
        result = self.github_oauth2_test(token_data_dict, account_data_dict,
 | 
			
		||||
                                         subdomain='nonexistent')
 | 
			
		||||
        self.assert_in_success_response(["There is no Zulip organization hosted at this subdomain."],
 | 
			
		||||
                                        result)
 | 
			
		||||
 | 
			
		||||
    def test_user_cannot_log_into_wrong_subdomain(self) -> None:
 | 
			
		||||
        token_data_dict = {
 | 
			
		||||
            'access_token': 'foobar',
 | 
			
		||||
            'token_type': 'bearer'
 | 
			
		||||
        }
 | 
			
		||||
        account_data_dict = dict(email=self.email, name=self.name)
 | 
			
		||||
        result = self.github_oauth2_test(token_data_dict, account_data_dict,
 | 
			
		||||
                                         subdomain='zephyr')
 | 
			
		||||
        self.assertTrue(result.url.startswith("http://zephyr.testserver/accounts/login/subdomain/"))
 | 
			
		||||
        result = self.client_get(result.url.replace('http://zephyr.testserver', ''),
 | 
			
		||||
                                 subdomain="zephyr")
 | 
			
		||||
        self.assert_in_success_response(['Your email address, hamlet@zulip.com, is not in one of the domains ',
 | 
			
		||||
                                         'that are allowed to register for accounts in this organization.'], result)
 | 
			
		||||
 | 
			
		||||
    def test_github_oauth2_mobile_success(self) -> None:
 | 
			
		||||
        mobile_flow_otp = '1234abcd' * 8
 | 
			
		||||
        token_data_dict = {
 | 
			
		||||
            'access_token': 'foobar',
 | 
			
		||||
            'token_type': 'bearer'
 | 
			
		||||
        }
 | 
			
		||||
        account_data_dict = dict(email=self.email, name='Full Name')
 | 
			
		||||
        self.assertEqual(len(mail.outbox), 0)
 | 
			
		||||
        with self.settings(SEND_LOGIN_EMAILS=True):
 | 
			
		||||
            # Verify that the right thing happens with an invalid-format OTP
 | 
			
		||||
            result = self.github_oauth2_test(token_data_dict, account_data_dict, subdomain='zulip',
 | 
			
		||||
                                             mobile_flow_otp="1234")
 | 
			
		||||
            self.assert_json_error(result, "Invalid OTP")
 | 
			
		||||
            result = self.github_oauth2_test(token_data_dict, account_data_dict, subdomain='zulip',
 | 
			
		||||
                                             mobile_flow_otp="invalido" * 8)
 | 
			
		||||
            self.assert_json_error(result, "Invalid OTP")
 | 
			
		||||
 | 
			
		||||
            # Now do it correctly
 | 
			
		||||
            result = self.github_oauth2_test(token_data_dict, account_data_dict, subdomain='zulip',
 | 
			
		||||
                                             mobile_flow_otp=mobile_flow_otp)
 | 
			
		||||
        self.assertEqual(result.status_code, 302)
 | 
			
		||||
        redirect_url = result['Location']
 | 
			
		||||
        parsed_url = urllib.parse.urlparse(redirect_url)
 | 
			
		||||
        query_params = urllib.parse.parse_qs(parsed_url.query)
 | 
			
		||||
        self.assertEqual(parsed_url.scheme, 'zulip')
 | 
			
		||||
        self.assertEqual(query_params["realm"], ['http://zulip.testserver'])
 | 
			
		||||
        self.assertEqual(query_params["email"], [self.example_email("hamlet")])
 | 
			
		||||
        encrypted_api_key = query_params["otp_encrypted_api_key"][0]
 | 
			
		||||
        self.assertEqual(self.example_user('hamlet').api_key,
 | 
			
		||||
                         otp_decrypt_api_key(encrypted_api_key, mobile_flow_otp))
 | 
			
		||||
        self.assertEqual(len(mail.outbox), 1)
 | 
			
		||||
        self.assertIn('Zulip on Android', mail.outbox[0].body)
 | 
			
		||||
 | 
			
		||||
    def test_github_oauth2_registration(self) -> None:
 | 
			
		||||
        """If the user doesn't exist yet, GitHub auth can be used to register an account"""
 | 
			
		||||
        email = "newuser@zulip.com"
 | 
			
		||||
        name = 'Full Name'
 | 
			
		||||
        realm = get_realm("zulip")
 | 
			
		||||
        token_data_dict = {
 | 
			
		||||
            'access_token': 'foobar',
 | 
			
		||||
            'token_type': 'bearer'
 | 
			
		||||
        }
 | 
			
		||||
        account_data_dict = dict(email=email, name=name)
 | 
			
		||||
        result = self.github_oauth2_test(token_data_dict, account_data_dict,
 | 
			
		||||
                                         subdomain='zulip', is_signup='1')
 | 
			
		||||
 | 
			
		||||
        data = load_subdomain_token(result)
 | 
			
		||||
        self.assertEqual(data['email'], email)
 | 
			
		||||
        self.assertEqual(data['name'], name)
 | 
			
		||||
        self.assertEqual(data['subdomain'], 'zulip')
 | 
			
		||||
        self.assertEqual(result.status_code, 302)
 | 
			
		||||
        parsed_url = urllib.parse.urlparse(result.url)
 | 
			
		||||
        uri = "{}://{}{}".format(parsed_url.scheme, parsed_url.netloc,
 | 
			
		||||
                                 parsed_url.path)
 | 
			
		||||
        self.assertTrue(uri.startswith('http://zulip.testserver/accounts/login/subdomain/'))
 | 
			
		||||
 | 
			
		||||
        result = self.client_get(result.url)
 | 
			
		||||
 | 
			
		||||
        self.assertEqual(result.status_code, 302)
 | 
			
		||||
        confirmation = Confirmation.objects.all().first()
 | 
			
		||||
        confirmation_key = confirmation.confirmation_key
 | 
			
		||||
        self.assertIn('do_confirm/' + confirmation_key, result.url)
 | 
			
		||||
        result = self.client_get(result.url)
 | 
			
		||||
        self.assert_in_response('action="/accounts/register/"', result)
 | 
			
		||||
        data = {"from_confirmation": "1",
 | 
			
		||||
                "full_name": name,
 | 
			
		||||
                "key": confirmation_key}
 | 
			
		||||
        result = self.client_post('/accounts/register/', data)
 | 
			
		||||
        self.assert_in_response("You're almost there", result)
 | 
			
		||||
 | 
			
		||||
        # Verify that the user is asked for name but not password
 | 
			
		||||
        self.assert_not_in_success_response(['id_password'], result)
 | 
			
		||||
        self.assert_in_success_response(['id_full_name'], result)
 | 
			
		||||
 | 
			
		||||
        # Click confirm registration button.
 | 
			
		||||
        result = self.client_post(
 | 
			
		||||
            '/accounts/register/',
 | 
			
		||||
            {'full_name': name,
 | 
			
		||||
             'key': confirmation_key,
 | 
			
		||||
             'terms': True})
 | 
			
		||||
 | 
			
		||||
        self.assertEqual(result.status_code, 302)
 | 
			
		||||
        user_profile = get_user(email, realm)
 | 
			
		||||
        self.assertEqual(get_session_dict_user(self.client.session), user_profile.id)
 | 
			
		||||
 | 
			
		||||
class GitHubAuthBackendLegacyTest(ZulipTestCase):
 | 
			
		||||
    def setUp(self) -> None:
 | 
			
		||||
        self.user_profile = self.example_user('hamlet')
 | 
			
		||||
        self.email = self.user_profile.email
 | 
			
		||||
        self.name = 'Hamlet'
 | 
			
		||||
        self.backend = GitHubAuthBackend()
 | 
			
		||||
        self.backend.strategy = DjangoStrategy(storage=BaseDjangoStorage())
 | 
			
		||||
        self.user_profile.backend = self.backend
 | 
			
		||||
 | 
			
		||||
        rf = RequestFactory()
 | 
			
		||||
        request = rf.get('/complete')
 | 
			
		||||
        request.session = {}
 | 
			
		||||
        request.get_host = lambda: 'zulip.testserver'
 | 
			
		||||
        request.user = self.user_profile
 | 
			
		||||
        self.backend.strategy.request = request
 | 
			
		||||
 | 
			
		||||
    def do_auth(self, *args: Any, **kwargs: Any) -> UserProfile:
 | 
			
		||||
        with self.settings(AUTHENTICATION_BACKENDS=('zproject.backends.GitHubAuthBackend',)):
 | 
			
		||||
            return authenticate(**kwargs)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user