mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-03 21:43:21 +00:00 
			
		
		
		
	auth: Support desktop_flow_otp with remote_user_sso.
This commit is contained in:
		
				
					committed by
					
						
						Tim Abbott
					
				
			
			
				
	
			
			
			
						parent
						
							f8bcadfc63
						
					
				
				
					commit
					eea68ce92d
				
			@@ -2396,6 +2396,92 @@ class TestZulipRemoteUserBackend(ZulipTestCase):
 | 
			
		||||
        self.assertEqual(len(mail.outbox), 1)
 | 
			
		||||
        self.assertIn('Zulip on Android', mail.outbox[0].body)
 | 
			
		||||
 | 
			
		||||
    @override_settings(SEND_LOGIN_EMAILS=True)
 | 
			
		||||
    @override_settings(AUTHENTICATION_BACKENDS=('zproject.backends.ZulipRemoteUserBackend',
 | 
			
		||||
                                                'zproject.backends.ZulipDummyBackend'))
 | 
			
		||||
    def test_login_desktop_flow_otp_success_email(self) -> None:
 | 
			
		||||
        user_profile = self.example_user('hamlet')
 | 
			
		||||
        email = user_profile.email
 | 
			
		||||
        user_profile.date_joined = timezone_now() - datetime.timedelta(seconds=61)
 | 
			
		||||
        user_profile.save()
 | 
			
		||||
        desktop_flow_otp = '1234abcd' * 8
 | 
			
		||||
 | 
			
		||||
        # Verify that the right thing happens with an invalid-format OTP
 | 
			
		||||
        result = self.client_post('/accounts/login/sso/',
 | 
			
		||||
                                  dict(desktop_flow_otp="1234"),
 | 
			
		||||
                                  REMOTE_USER=email)
 | 
			
		||||
        self.assert_logged_in_user_id(None)
 | 
			
		||||
        self.assert_json_error_contains(result, "Invalid OTP", 400)
 | 
			
		||||
 | 
			
		||||
        result = self.client_post('/accounts/login/sso/',
 | 
			
		||||
                                  dict(desktop_flow_otp="invalido" * 8),
 | 
			
		||||
                                  REMOTE_USER=email)
 | 
			
		||||
        self.assert_logged_in_user_id(None)
 | 
			
		||||
        self.assert_json_error_contains(result, "Invalid OTP", 400)
 | 
			
		||||
 | 
			
		||||
        result = self.client_post('/accounts/login/sso/',
 | 
			
		||||
                                  dict(desktop_flow_otp=desktop_flow_otp),
 | 
			
		||||
                                  REMOTE_USER=email)
 | 
			
		||||
        self.assertEqual(result.status_code, 302)
 | 
			
		||||
        redirect_url = result['Location']
 | 
			
		||||
        parsed_url = urllib.parse.urlparse(redirect_url)
 | 
			
		||||
        query_params = urllib.parse.parse_qs(parsed_url.query)
 | 
			
		||||
        self.assertEqual(parsed_url.scheme, 'zulip')
 | 
			
		||||
        self.assertEqual(query_params["realm"], ['http://zulip.testserver'])
 | 
			
		||||
        self.assertEqual(query_params["email"], [self.example_email("hamlet")])
 | 
			
		||||
 | 
			
		||||
        encrypted_key = query_params["otp_encrypted_login_key"][0]
 | 
			
		||||
        decrypted_key = otp_decrypt_api_key(encrypted_key, desktop_flow_otp)
 | 
			
		||||
        auth_url = 'http://zulip.testserver/accounts/login/subdomain/{}'.format(decrypted_key)
 | 
			
		||||
 | 
			
		||||
        result = self.client_get(auth_url)
 | 
			
		||||
        self.assertEqual(result.status_code, 302)
 | 
			
		||||
        self.assert_logged_in_user_id(user_profile.id)
 | 
			
		||||
 | 
			
		||||
    @override_settings(SEND_LOGIN_EMAILS=True)
 | 
			
		||||
    @override_settings(SSO_APPEND_DOMAIN="zulip.com")
 | 
			
		||||
    @override_settings(AUTHENTICATION_BACKENDS=('zproject.backends.ZulipRemoteUserBackend',
 | 
			
		||||
                                                'zproject.backends.ZulipDummyBackend'))
 | 
			
		||||
    def test_login_desktop_flow_otp_success_username(self) -> None:
 | 
			
		||||
        user_profile = self.example_user('hamlet')
 | 
			
		||||
        email = user_profile.email
 | 
			
		||||
        remote_user = email_to_username(email)
 | 
			
		||||
        user_profile.date_joined = timezone_now() - datetime.timedelta(seconds=61)
 | 
			
		||||
        user_profile.save()
 | 
			
		||||
        desktop_flow_otp = '1234abcd' * 8
 | 
			
		||||
 | 
			
		||||
        # Verify that the right thing happens with an invalid-format OTP
 | 
			
		||||
        result = self.client_post('/accounts/login/sso/',
 | 
			
		||||
                                  dict(desktop_flow_otp="1234"),
 | 
			
		||||
                                  REMOTE_USER=remote_user)
 | 
			
		||||
        self.assert_logged_in_user_id(None)
 | 
			
		||||
        self.assert_json_error_contains(result, "Invalid OTP", 400)
 | 
			
		||||
 | 
			
		||||
        result = self.client_post('/accounts/login/sso/',
 | 
			
		||||
                                  dict(desktop_flow_otp="invalido" * 8),
 | 
			
		||||
                                  REMOTE_USER=remote_user)
 | 
			
		||||
        self.assert_logged_in_user_id(None)
 | 
			
		||||
        self.assert_json_error_contains(result, "Invalid OTP", 400)
 | 
			
		||||
 | 
			
		||||
        result = self.client_post('/accounts/login/sso/',
 | 
			
		||||
                                  dict(desktop_flow_otp=desktop_flow_otp),
 | 
			
		||||
                                  REMOTE_USER=remote_user)
 | 
			
		||||
        self.assertEqual(result.status_code, 302)
 | 
			
		||||
        redirect_url = result['Location']
 | 
			
		||||
        parsed_url = urllib.parse.urlparse(redirect_url)
 | 
			
		||||
        query_params = urllib.parse.parse_qs(parsed_url.query)
 | 
			
		||||
        self.assertEqual(parsed_url.scheme, 'zulip')
 | 
			
		||||
        self.assertEqual(query_params["realm"], ['http://zulip.testserver'])
 | 
			
		||||
        self.assertEqual(query_params["email"], [self.example_email("hamlet")])
 | 
			
		||||
 | 
			
		||||
        encrypted_key = query_params["otp_encrypted_login_key"][0]
 | 
			
		||||
        decrypted_key = otp_decrypt_api_key(encrypted_key, desktop_flow_otp)
 | 
			
		||||
        auth_url = 'http://zulip.testserver/accounts/login/subdomain/{}'.format(decrypted_key)
 | 
			
		||||
 | 
			
		||||
        result = self.client_get(auth_url)
 | 
			
		||||
        self.assertEqual(result.status_code, 302)
 | 
			
		||||
        self.assert_logged_in_user_id(user_profile.id)
 | 
			
		||||
 | 
			
		||||
    def test_redirect_to(self) -> None:
 | 
			
		||||
        """This test verifies the behavior of the redirect_to logic in
 | 
			
		||||
        login_or_register_remote_user."""
 | 
			
		||||
 
 | 
			
		||||
@@ -295,7 +295,8 @@ def create_response_for_otp_flow(key: str, otp: str, user_profile: UserProfile,
 | 
			
		||||
@log_view_func
 | 
			
		||||
@has_request_variables
 | 
			
		||||
def remote_user_sso(request: HttpRequest,
 | 
			
		||||
                    mobile_flow_otp: Optional[str]=REQ(default=None)) -> HttpResponse:
 | 
			
		||||
                    mobile_flow_otp: Optional[str]=REQ(default=None),
 | 
			
		||||
                    desktop_flow_otp: Optional[str]=REQ(default=None)) -> HttpResponse:
 | 
			
		||||
    subdomain = get_subdomain(request)
 | 
			
		||||
    try:
 | 
			
		||||
        realm = get_realm(subdomain)  # type: Optional[Realm]
 | 
			
		||||
@@ -316,12 +317,15 @@ def remote_user_sso(request: HttpRequest,
 | 
			
		||||
    # enabled.
 | 
			
		||||
    validate_login_email(remote_user_to_email(remote_user))
 | 
			
		||||
 | 
			
		||||
    # Here we support the mobile flow for REMOTE_USER_BACKEND; we
 | 
			
		||||
    # Here we support the mobile and desktop flow for REMOTE_USER_BACKEND; we
 | 
			
		||||
    # validate the data format and then pass it through to
 | 
			
		||||
    # login_or_register_remote_user if appropriate.
 | 
			
		||||
    if mobile_flow_otp is not None:
 | 
			
		||||
        if not is_valid_otp(mobile_flow_otp):
 | 
			
		||||
            raise JsonableError(_("Invalid OTP"))
 | 
			
		||||
    if desktop_flow_otp is not None:
 | 
			
		||||
        if not is_valid_otp(desktop_flow_otp):
 | 
			
		||||
            raise JsonableError(_("Invalid OTP"))
 | 
			
		||||
 | 
			
		||||
    subdomain = get_subdomain(request)
 | 
			
		||||
    if realm is None:
 | 
			
		||||
@@ -333,6 +337,8 @@ def remote_user_sso(request: HttpRequest,
 | 
			
		||||
 | 
			
		||||
    return login_or_register_remote_user(request, remote_user, user_profile,
 | 
			
		||||
                                         mobile_flow_otp=mobile_flow_otp,
 | 
			
		||||
                                         desktop_flow_otp=desktop_flow_otp,
 | 
			
		||||
                                         realm=realm,
 | 
			
		||||
                                         redirect_to=redirect_to)
 | 
			
		||||
 | 
			
		||||
@csrf_exempt
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user