auth: Add ExternalAuthResult to manage data in authentication flows.

This new type eliminates a bunch of messy code that previously
involved passing around long lists of mixed positional keyword and
arguments, instead using a consistent data object for communicating
about the state of an external authentication (constructed in
backends.py).

The result is a significantly more readable interface between
zproject/backends.py and zerver/views/auth.py, though likely more
could be done.

This has the side effect of renaming fields for internally passed
structures from name->full_name, next->redirect_to; this results in
most of the test codebase changes.

Modified by tabbott to add comments and collaboratively rewrite the
initialization logic.
This commit is contained in:
Mateusz Mandera
2020-02-23 18:58:08 +01:00
committed by Tim Abbott
parent 3f69500765
commit f1ec02b40a
6 changed files with 255 additions and 265 deletions

View File

@@ -23,7 +23,6 @@ from zerver.tornado import event_queue
from zerver.tornado.handlers import allocate_handler_id
from zerver.worker import queue_processors
from zerver.lib.integrations import WEBHOOK_INTEGRATIONS
from zerver.views.auth import get_login_data
from zerver.models import (
get_realm,
@@ -36,6 +35,8 @@ from zerver.models import (
UserProfile,
)
from zproject.backends import ExternalAuthResult, ExternalAuthDataDict
if TYPE_CHECKING:
# Avoid an import cycle; we only need these for type annotations.
from zerver.lib.test_classes import ZulipTestCase, MigrationsTestCase
@@ -445,10 +446,10 @@ def write_instrumentation_reports(full_suite: bool, include_webhooks: bool) -> N
print(" %s" % (untested_pattern,))
sys.exit(1)
def load_subdomain_token(response: HttpResponse) -> Dict[str, Any]:
def load_subdomain_token(response: HttpResponse) -> ExternalAuthDataDict:
assert isinstance(response, HttpResponseRedirect)
token = response.url.rsplit('/', 1)[1]
data = get_login_data(token, should_delete=False)
data = ExternalAuthResult(login_token=token, delete_stored_data=False).data_dict
assert data is not None
return data

View File

@@ -66,10 +66,10 @@ from zproject.backends import ZulipDummyBackend, EmailAuthBackend, \
ZulipLDAPException, query_ldap, sync_user_from_ldap, SocialAuthMixin, \
PopulateUserLDAPError, SAMLAuthBackend, saml_auth_enabled, email_belongs_to_ldap, \
get_external_method_dicts, AzureADAuthBackend, check_password_strength, \
ZulipLDAPUser, RateLimitedAuthenticationByUsername
ZulipLDAPUser, RateLimitedAuthenticationByUsername, ExternalAuthResult, \
ExternalAuthDataDict
from zerver.views.auth import (maybe_send_to_registration,
store_login_data, LOGIN_TOKEN_LENGTH)
from zerver.views.auth import maybe_send_to_registration
from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.response import OneLogin_Saml2_Response
@@ -842,9 +842,9 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase):
subdomain='zulip', next='/user_uploads/image')
data = load_subdomain_token(result)
self.assertEqual(data['email'], self.example_email("hamlet"))
self.assertEqual(data['name'], self.name)
self.assertEqual(data['full_name'], self.name)
self.assertEqual(data['subdomain'], 'zulip')
self.assertEqual(data['next'], '/user_uploads/image')
self.assertEqual(data['redirect_to'], '/user_uploads/image')
self.assertEqual(result.status_code, 302)
parsed_url = urllib.parse.urlparse(result.url)
uri = "{}://{}{}".format(parsed_url.scheme, parsed_url.netloc,
@@ -860,9 +860,9 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase):
next='/user_uploads/image')
data = load_subdomain_token(result)
self.assertEqual(data['email'], self.example_email("hamlet"))
self.assertEqual(data['name'], self.name)
self.assertEqual(data['full_name'], self.name)
self.assertEqual(data['subdomain'], 'zulip')
self.assertEqual(data['next'], '/user_uploads/image')
self.assertEqual(data['redirect_to'], '/user_uploads/image')
self.assertEqual(result.status_code, 302)
parsed_url = urllib.parse.urlparse(result.url)
uri = "{}://{}{}".format(parsed_url.scheme, parsed_url.netloc,
@@ -1012,7 +1012,8 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase):
subdomain='zulip', is_signup=True)
data = load_subdomain_token(result)
self.assertEqual(data['email'], self.example_email("hamlet"))
self.assertEqual(data['name'], 'Full Name')
# Verify data has the full_name consistent with the user we're logging in as.
self.assertEqual(data['full_name'], self.example_user("hamlet").full_name)
self.assertEqual(data['subdomain'], 'zulip')
self.assertEqual(result.status_code, 302)
parsed_url = urllib.parse.urlparse(result.url)
@@ -1031,7 +1032,7 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase):
expect_confirm_registration_page: bool=False,) -> None:
data = load_subdomain_token(result)
self.assertEqual(data['email'], email)
self.assertEqual(data['name'], name)
self.assertEqual(data['full_name'], name)
self.assertEqual(data['subdomain'], 'zulip')
self.assertEqual(result.status_code, 302)
parsed_url = urllib.parse.urlparse(result.url)
@@ -1055,9 +1056,9 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase):
do_confirm_url = result.url
result = self.client_get(do_confirm_url, name = name)
self.assert_in_response('action="/accounts/register/"', result)
data = {"from_confirmation": "1",
confirmation_data = {"from_confirmation": "1",
"key": confirmation_key}
result = self.client_post('/accounts/register/', data)
result = self.client_post('/accounts/register/', confirmation_data)
if not skip_registration_form:
self.assert_in_response("We just need you to do one last thing", result)
@@ -1218,7 +1219,7 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase):
self.assertEqual(result.status_code, 302)
data = load_subdomain_token(result)
self.assertEqual(data['email'], email)
self.assertEqual(data['name'], name)
self.assertEqual(data['full_name'], name)
self.assertEqual(data['subdomain'], 'zulip')
self.assertEqual(result.status_code, 302)
parsed_url = urllib.parse.urlparse(result.url)
@@ -1243,7 +1244,7 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase):
self.assertEqual(result.status_code, 302)
data = load_subdomain_token(result)
self.assertEqual(data['email'], email)
self.assertEqual(data['name'], name)
self.assertEqual(data['full_name'], name)
self.assertEqual(data['subdomain'], 'zulip')
self.assertEqual(result.status_code, 302)
parsed_url = urllib.parse.urlparse(result.url)
@@ -1749,7 +1750,7 @@ class GitHubAuthBackendTest(SocialAuthBase):
subdomain='zulip')
data = load_subdomain_token(result)
self.assertEqual(data['email'], self.example_email("hamlet"))
self.assertEqual(data['name'], self.name)
self.assertEqual(data['full_name'], self.name)
self.assertEqual(data['subdomain'], 'zulip')
@override_settings(SOCIAL_AUTH_GITHUB_ORG_NAME='Zulip')
@@ -1774,7 +1775,7 @@ class GitHubAuthBackendTest(SocialAuthBase):
subdomain='zulip')
data = load_subdomain_token(result)
self.assertEqual(data['email'], self.example_email("hamlet"))
self.assertEqual(data['name'], self.name)
self.assertEqual(data['full_name'], self.name)
self.assertEqual(data['subdomain'], 'zulip')
def test_github_auth_enabled(self) -> None:
@@ -1800,9 +1801,9 @@ class GitHubAuthBackendTest(SocialAuthBase):
next='/user_uploads/image')
data = load_subdomain_token(result)
self.assertEqual(data['email'], 'nonprimary@zulip.com')
self.assertEqual(data['name'], 'Non Primary')
self.assertEqual(data['full_name'], 'Non Primary')
self.assertEqual(data['subdomain'], 'zulip')
self.assertEqual(data['next'], '/user_uploads/image')
self.assertEqual(data['redirect_to'], '/user_uploads/image')
self.assertEqual(result.status_code, 302)
parsed_url = urllib.parse.urlparse(result.url)
uri = "{}://{}{}".format(parsed_url.scheme, parsed_url.netloc,
@@ -1826,9 +1827,9 @@ class GitHubAuthBackendTest(SocialAuthBase):
next='/user_uploads/image')
data = load_subdomain_token(result)
self.assertEqual(data['email'], self.example_email("hamlet"))
self.assertEqual(data['name'], self.name)
self.assertEqual(data['full_name'], self.name)
self.assertEqual(data['subdomain'], 'zulip')
self.assertEqual(data['next'], '/user_uploads/image')
self.assertEqual(data['redirect_to'], '/user_uploads/image')
self.assertEqual(result.status_code, 302)
parsed_url = urllib.parse.urlparse(result.url)
uri = "{}://{}{}".format(parsed_url.scheme, parsed_url.netloc,
@@ -1857,9 +1858,9 @@ class GitHubAuthBackendTest(SocialAuthBase):
next='/user_uploads/image')
data = load_subdomain_token(result)
self.assertEqual(data['email'], account_data_dict["email"])
self.assertEqual(data['name'], self.name)
self.assertEqual(data['full_name'], self.name)
self.assertEqual(data['subdomain'], 'zulip')
self.assertEqual(data['next'], '/user_uploads/image')
self.assertEqual(data['redirect_to'], '/user_uploads/image')
self.assertEqual(result.status_code, 302)
parsed_url = urllib.parse.urlparse(result.url)
uri = "{}://{}{}".format(parsed_url.scheme, parsed_url.netloc,
@@ -1870,6 +1871,7 @@ class GitHubAuthBackendTest(SocialAuthBase):
# In the login flow, if multiple of the user's verified emails
# are associated with existing accounts, we expect the choose
# email screen to select which account to use.
hamlet = self.example_user("hamlet")
account_data_dict = dict(email='hamlet@zulip.com', name="Hamlet")
email_data = [
dict(email=account_data_dict["email"],
@@ -1888,9 +1890,9 @@ class GitHubAuthBackendTest(SocialAuthBase):
next='/user_uploads/image')
data = load_subdomain_token(result)
self.assertEqual(data['email'], 'hamlet@zulip.com')
self.assertEqual(data['name'], 'Hamlet')
self.assertEqual(data['full_name'], hamlet.full_name)
self.assertEqual(data['subdomain'], 'zulip')
self.assertEqual(data['next'], '/user_uploads/image')
self.assertEqual(data['redirect_to'], '/user_uploads/image')
self.assertEqual(result.status_code, 302)
parsed_url = urllib.parse.urlparse(result.url)
uri = "{}://{}{}".format(parsed_url.scheme, parsed_url.netloc,
@@ -1942,9 +1944,9 @@ class GitHubAuthBackendTest(SocialAuthBase):
next='/user_uploads/image')
data = load_subdomain_token(result)
self.assertEqual(data['email'], account_data_dict["email"])
self.assertEqual(data['name'], account_data_dict["name"])
self.assertEqual(data['full_name'], account_data_dict["name"])
self.assertEqual(data['subdomain'], 'zulip')
self.assertEqual(data['next'], '/user_uploads/image')
self.assertEqual(data['redirect_to'], '/user_uploads/image')
self.assertEqual(result.status_code, 302)
parsed_url = urllib.parse.urlparse(result.url)
uri = "{}://{}{}".format(parsed_url.scheme, parsed_url.netloc,
@@ -2112,10 +2114,10 @@ class GoogleAuthBackendTest(SocialAuthBase):
with self.settings(AUTHENTICATION_BACKENDS=('zproject.backends.GoogleAuthBackend',)):
self.assertTrue(google_auth_enabled())
def get_log_into_subdomain(self, data: Dict[str, Any], *, subdomain: str='zulip',
def get_log_into_subdomain(self, data: ExternalAuthDataDict, *, subdomain: str='zulip',
force_token: Optional[str]=None) -> HttpResponse:
if force_token is None:
token = store_login_data(data)
token = ExternalAuthResult(data_dict=data).store_data()
else:
token = force_token
url_path = reverse('zerver.views.auth.log_into_subdomain', args=[token])
@@ -2123,14 +2125,14 @@ class GoogleAuthBackendTest(SocialAuthBase):
def test_redirect_to_next_url_for_log_into_subdomain(self) -> None:
def test_redirect_to_next_url(next: str='') -> HttpResponse:
data = {'name': 'Hamlet',
data = {'full_name': 'Hamlet',
'email': self.example_email("hamlet"),
'subdomain': 'zulip',
'is_signup': False,
'next': next}
'redirect_to': next} # type: ExternalAuthDataDict
user_profile = self.example_user('hamlet')
with mock.patch(
'zerver.views.auth.authenticate_remote_user',
'zerver.views.auth.authenticate',
return_value=user_profile):
with mock.patch('zerver.views.auth.do_login'):
result = self.get_log_into_subdomain(data)
@@ -2148,24 +2150,24 @@ class GoogleAuthBackendTest(SocialAuthBase):
self.assertEqual(res.url, 'http://zulip.testserver/#narrow/stream/7-test-here')
def test_log_into_subdomain_when_token_is_malformed(self) -> None:
data = {'name': 'Full Name',
data = {'full_name': 'Full Name',
'email': self.example_email("hamlet"),
'subdomain': 'zulip',
'is_signup': False,
'next': ''}
'redirect_to': ''} # type: ExternalAuthDataDict
with mock.patch("logging.warning") as mock_warn:
result = self.get_log_into_subdomain(data, force_token='nonsense')
mock_warn.assert_called_once_with("log_into_subdomain: Malformed token given: nonsense")
self.assertEqual(result.status_code, 400)
def test_log_into_subdomain_when_token_not_found(self) -> None:
data = {'name': 'Full Name',
data = {'full_name': 'Full Name',
'email': self.example_email("hamlet"),
'subdomain': 'zulip',
'is_signup': False,
'next': ''}
'redirect_to': ''} # type: ExternalAuthDataDict
with mock.patch("logging.warning") as mock_warn:
token = generate_random_token(LOGIN_TOKEN_LENGTH)
token = generate_random_token(ExternalAuthResult.LOGIN_TOKEN_LENGTH)
result = self.get_log_into_subdomain(data, force_token=token)
mock_warn.assert_called_once_with("log_into_subdomain: Invalid token given: %s" % (token,))
self.assertEqual(result.status_code, 400)
@@ -2177,21 +2179,23 @@ class GoogleAuthBackendTest(SocialAuthBase):
existing_user.email = 'whatever@zulip.com'
existing_user.save()
data = {'name': 'Full Name',
data = {'full_name': 'Full Name',
'email': 'existing@zulip.com',
'subdomain': 'zulip',
'is_signup': True,
'next': ''}
'redirect_to': ''} # type: ExternalAuthDataDict
result = self.get_log_into_subdomain(data)
self.assertEqual(result.status_code, 200)
self.assert_in_response('existing@zulip.com already has an account', result)
# Should simply get logged into the existing account:
self.assertEqual(result.status_code, 302)
self.assert_logged_in_user_id(existing_user.id)
def test_log_into_subdomain_when_is_signup_is_true_and_new_user(self) -> None:
data = {'name': 'New User Name',
data = {'full_name': 'New User Name',
'email': 'new@zulip.com',
'subdomain': 'zulip',
'is_signup': True,
'next': ''}
'redirect_to': ''} # type: ExternalAuthDataDict
result = self.get_log_into_subdomain(data)
self.assertEqual(result.status_code, 302)
confirmation = Confirmation.objects.all().first()
@@ -2199,10 +2203,10 @@ class GoogleAuthBackendTest(SocialAuthBase):
self.assertIn('do_confirm/' + confirmation_key, result.url)
result = self.client_get(result.url)
self.assert_in_response('action="/accounts/register/"', result)
data = {"from_confirmation": "1",
"full_name": data['name'],
confirmation_data = {"from_confirmation": "1",
"full_name": data['full_name'],
"key": confirmation_key}
result = self.client_post('/accounts/register/', data, subdomain="zulip")
result = self.client_post('/accounts/register/', confirmation_data, subdomain="zulip")
self.assert_in_response("We just need you to do one last thing", result)
# Verify that the user is asked for name but not password
@@ -2210,11 +2214,11 @@ class GoogleAuthBackendTest(SocialAuthBase):
self.assert_in_success_response(['id_full_name'], result)
def test_log_into_subdomain_when_is_signup_is_false_and_new_user(self) -> None:
data = {'name': 'New User Name',
data = {'full_name': 'New User Name',
'email': 'new@zulip.com',
'subdomain': 'zulip',
'is_signup': False,
'next': ''}
'redirect_to': ''} # type: ExternalAuthDataDict
result = self.get_log_into_subdomain(data)
self.assertEqual(result.status_code, 200)
self.assert_in_response('No account found for', result)
@@ -2227,10 +2231,10 @@ class GoogleAuthBackendTest(SocialAuthBase):
self.assertIn('do_confirm/' + confirmation_key, url)
result = self.client_get(url)
self.assert_in_response('action="/accounts/register/"', result)
data = {"from_confirmation": "1",
"full_name": data['name'],
confirmation_data = {"from_confirmation": "1",
"full_name": data['full_name'],
"key": confirmation_key}
result = self.client_post('/accounts/register/', data, subdomain="zulip")
result = self.client_post('/accounts/register/', confirmation_data, subdomain="zulip")
self.assert_in_response("We just need you to do one last thing", result)
# Verify that the user is asked for name but not password
@@ -2238,11 +2242,11 @@ class GoogleAuthBackendTest(SocialAuthBase):
self.assert_in_success_response(['id_full_name'], result)
def test_log_into_subdomain_when_using_invite_link(self) -> None:
data = {'name': 'New User Name',
data = {'full_name': 'New User Name',
'email': 'new@zulip.com',
'subdomain': 'zulip',
'is_signup': True,
'next': ''}
'redirect_to': ''} # type: ExternalAuthDataDict
realm = get_realm("zulip")
realm.invite_required = True
@@ -2277,7 +2281,7 @@ class GoogleAuthBackendTest(SocialAuthBase):
result = self.client_get(result.url)
self.assert_in_response('action="/accounts/register/"', result)
data2 = {"from_confirmation": "1",
"full_name": data['name'],
"full_name": data['full_name'],
"key": confirmation_key}
result = self.client_post('/accounts/register/', data2, subdomain="zulip")
self.assert_in_response("We just need you to do one last thing", result)
@@ -2298,20 +2302,21 @@ class GoogleAuthBackendTest(SocialAuthBase):
self.assertEqual(sorted(new_streams), stream_names)
def test_log_into_subdomain_when_email_is_none(self) -> None:
data = {'name': None,
data = {'full_name': None,
'email': None,
'subdomain': 'zulip',
'is_signup': False,
'next': ''}
'redirect_to': ''} # type: ExternalAuthDataDict
with mock.patch('logging.warning'):
with mock.patch('logging.warning') as mock_warn:
result = self.get_log_into_subdomain(data)
self.assert_in_success_response(["You need an invitation to join this organization."], result)
self.assertEqual(result.status_code, 400)
mock_warn.assert_called_once()
def test_user_cannot_log_into_wrong_subdomain_with_cookie(self) -> None:
data = {'name': 'Full Name',
def test_user_cannot_log_into_wrong_subdomain(self) -> None:
data = {'full_name': 'Full Name',
'email': self.example_email("hamlet"),
'subdomain': 'zephyr'}
'subdomain': 'zephyr'} # type: ExternalAuthDataDict
result = self.get_log_into_subdomain(data)
self.assert_json_error(result, "Invalid subdomain")

View File

@@ -67,6 +67,8 @@ from zerver.lib.test_classes import (
from zerver.lib.name_restrictions import is_disposable_domain
from zerver.context_processors import common_context
from zproject.backends import ExternalAuthResult, ExternalAuthDataDict
import re
import smtplib
import time
@@ -78,47 +80,44 @@ import urllib
import pytz
class RedirectAndLogIntoSubdomainTestCase(ZulipTestCase):
def test_cookie_data(self) -> None:
realm = Realm.objects.all().first()
name = 'Hamlet'
email = self.example_email("hamlet")
response = redirect_and_log_into_subdomain(realm, name, email)
def test_data(self) -> None:
realm = get_realm("zulip")
user_profile = self.example_user("hamlet")
name = user_profile.full_name
email = user_profile.delivery_email
response = redirect_and_log_into_subdomain(ExternalAuthResult(user_profile=user_profile))
data = load_subdomain_token(response)
self.assertDictEqual(data, {'name': name, 'next': '',
self.assertDictEqual(data, {'full_name': name,
'email': email,
'full_name_validated': False,
'subdomain': realm.subdomain,
'is_signup': False,
'desktop_flow_otp': None,
'mobile_flow_otp': None,
'multiuse_object_key': ''})
'is_signup': False})
response = redirect_and_log_into_subdomain(realm, name, email,
is_signup=True,
multiuse_object_key='key')
data_dict = ExternalAuthDataDict(is_signup=True, multiuse_object_key='key')
response = redirect_and_log_into_subdomain(ExternalAuthResult(user_profile=user_profile,
data_dict=data_dict))
data = load_subdomain_token(response)
self.assertDictEqual(data, {'name': name, 'next': '',
self.assertDictEqual(data, {'full_name': name,
'email': email,
'full_name_validated': False,
'subdomain': realm.subdomain,
'is_signup': True,
'desktop_flow_otp': None,
'mobile_flow_otp': None,
# the email has an account at the subdomain,
# so is_signup get overridden to False:
'is_signup': False,
'multiuse_object_key': 'key'
})
response = redirect_and_log_into_subdomain(realm, name, email,
data_dict = ExternalAuthDataDict(email=self.nonreg_email("alice"),
full_name="Alice",
subdomain=realm.subdomain,
is_signup=True,
full_name_validated=True,
multiuse_object_key='key')
response = redirect_and_log_into_subdomain(ExternalAuthResult(data_dict=data_dict))
data = load_subdomain_token(response)
self.assertDictEqual(data, {'name': name, 'next': '',
'email': email,
self.assertDictEqual(data, {'full_name': "Alice",
'email': self.nonreg_email("alice"),
'full_name_validated': True,
'subdomain': realm.subdomain,
'is_signup': True,
'desktop_flow_otp': None,
'mobile_flow_otp': None,
'multiuse_object_key': 'key'
})

View File

@@ -17,7 +17,7 @@ from django.views.generic import TemplateView
from django.utils.translation import ugettext as _
from django.utils.http import is_safe_url
import urllib
from typing import Any, Dict, List, Optional, Mapping
from typing import Any, Dict, List, Optional, Mapping, cast
from confirmation.models import Confirmation, create_confirmation_link
from zerver.context_processors import zulip_default_context, get_realm_from_request, \
@@ -28,7 +28,6 @@ from zerver.forms import HomepageForm, OurAuthenticationForm, \
from zerver.lib.mobile_auth_otp import otp_encrypt_api_key
from zerver.lib.push_notifications import push_notifications_enabled
from zerver.lib.realm_icon import realm_icon_url
from zerver.lib.redis_utils import get_redis_client, get_dict_from_redis, put_dict_in_redis
from zerver.lib.request import REQ, has_request_variables, JsonableError
from zerver.lib.response import json_success, json_error
from zerver.lib.sessions import set_expirable_session_var
@@ -44,7 +43,8 @@ from zerver.signals import email_on_new_login
from zproject.backends import password_auth_enabled, dev_auth_enabled, \
ldap_auth_enabled, ZulipLDAPConfigurationError, ZulipLDAPAuthBackend, \
AUTH_BACKEND_NAME_MAP, auth_enabled_helper, saml_auth_enabled, SAMLAuthBackend, \
redirect_to_config_error, ZulipRemoteUserBackend, validate_otp_params
redirect_to_config_error, ZulipRemoteUserBackend, validate_otp_params, ExternalAuthResult, \
ExternalAuthDataDict
from version import ZULIP_VERSION, API_FEATURE_LEVEL
import jwt
@@ -57,8 +57,6 @@ from two_factor.views import LoginView as BaseTwoFactorLoginView
ExtraContext = Optional[Dict[str, Any]]
redis_client = get_redis_client()
def get_safe_redirect_to(url: str, redirect_host: str) -> str:
is_url_safe = is_safe_url(url=url, allowed_hosts=set(redirect_host))
if is_url_safe:
@@ -210,29 +208,19 @@ def maybe_send_to_registration(request: HttpRequest, email: str, full_name: str=
context.update(extra_context)
return render(request, 'zerver/accounts_home.html', context=context)
def register_remote_user(request: HttpRequest, email: str,
full_name: str='',
mobile_flow_otp: Optional[str]=None,
desktop_flow_otp: Optional[str]=None,
is_signup: bool=False,
multiuse_object_key: str='',
full_name_validated: bool=False) -> HttpResponse:
def register_remote_user(request: HttpRequest, result: ExternalAuthResult) -> HttpResponse:
# We have verified the user controls an email address, but
# there's no associated Zulip user account. Consider sending
# the request to registration.
return maybe_send_to_registration(request, email, full_name, password_required=False,
mobile_flow_otp=mobile_flow_otp,
desktop_flow_otp=desktop_flow_otp,
is_signup=is_signup, multiuse_object_key=multiuse_object_key,
full_name_validated=full_name_validated)
kwargs = cast(Dict[str, Any], result.copy_data_dict())
# maybe_send_to_registration doesn't take these arguments, so delete them.
kwargs.pop('subdomain', None)
kwargs.pop('redirect_to', None)
def login_or_register_remote_user(request: HttpRequest, email: str,
user_profile: Optional[UserProfile], full_name: str='',
mobile_flow_otp: Optional[str]=None,
desktop_flow_otp: Optional[str]=None,
is_signup: bool=False, redirect_to: str='',
multiuse_object_key: str='',
full_name_validated: bool=False) -> HttpResponse:
kwargs["password_required"] = False
return maybe_send_to_registration(request, **kwargs)
def login_or_register_remote_user(request: HttpRequest, result: ExternalAuthResult) -> HttpResponse:
"""Given a successful authentication showing the user controls given
email address (email) and potentially a UserProfile
object (if the user already has a Zulip account), redirect the
@@ -251,17 +239,14 @@ def login_or_register_remote_user(request: HttpRequest, email: str,
* A zulip:// URL to send control back to the mobile or desktop apps if they
are doing authentication using the mobile_flow_otp or desktop_flow_otp flow.
"""
user_profile = result.user_profile
if user_profile is None or user_profile.is_mirror_dummy:
return register_remote_user(request, email, full_name,
is_signup=is_signup,
mobile_flow_otp=mobile_flow_otp,
desktop_flow_otp=desktop_flow_otp,
multiuse_object_key=multiuse_object_key,
full_name_validated=full_name_validated)
return register_remote_user(request, result)
# Otherwise, the user has successfully authenticated to an
# account, and we need to do the right thing depending whether
# or not they're using the mobile OTP flow or want a browser session.
mobile_flow_otp = result.data_dict.get('mobile_flow_otp')
desktop_flow_otp = result.data_dict.get('desktop_flow_otp')
if mobile_flow_otp is not None:
return finish_mobile_flow(request, user_profile, mobile_flow_otp)
elif desktop_flow_otp is not None:
@@ -269,7 +254,7 @@ def login_or_register_remote_user(request: HttpRequest, email: str,
do_login(request, user_profile)
redirect_to = get_safe_redirect_to(redirect_to, user_profile.realm.uri)
redirect_to = get_safe_redirect_to(result.data_dict.get('redirect_to', ''), user_profile.realm.uri)
return HttpResponseRedirect(redirect_to)
def finish_desktop_flow(request: HttpRequest, user_profile: UserProfile,
@@ -278,13 +263,12 @@ def finish_desktop_flow(request: HttpRequest, user_profile: UserProfile,
The desktop otp flow returns to the app (through a zulip:// redirect)
a token that allows obtaining (through log_into_subdomain) a logged in session
for the user account we authenticated in this flow.
The token can only be used once and within LOGIN_KEY_EXPIRATION_SECONDS
The token can only be used once and within ExternalAuthResult.LOGIN_KEY_EXPIRATION_SECONDS
of being created, as nothing more powerful is needed for the desktop flow
and this ensures the key can only be used for completing this authentication attempt.
"""
data = {'email': user_profile.delivery_email,
'subdomain': user_profile.realm.subdomain}
token = store_login_data(data)
result = ExternalAuthResult(user_profile=user_profile)
token = result.store_data()
response = create_response_for_otp_flow(token, otp, user_profile,
encrypted_key_field_name='otp_encrypted_login_key')
browser_url = user_profile.realm.uri + reverse('zerver.views.auth.log_into_subdomain', args=[token])
@@ -366,10 +350,18 @@ def remote_user_sso(request: HttpRequest,
redirect_to = request.GET.get('next', '')
email = remote_user_to_email(remote_user)
return login_or_register_remote_user(request, email, user_profile,
data_dict = ExternalAuthDataDict(
email=email,
mobile_flow_otp=mobile_flow_otp,
desktop_flow_otp=desktop_flow_otp,
redirect_to=redirect_to)
redirect_to=redirect_to
)
if realm:
data_dict["subdomain"] = realm.subdomain
else:
data_dict["subdomain"] = '' # realm creation happens on root subdomain
result = ExternalAuthResult(user_profile=user_profile, data_dict=data_dict)
return login_or_register_remote_user(request, result)
@csrf_exempt
@log_view_func
@@ -403,8 +395,16 @@ def remote_user_jwt(request: HttpRequest) -> HttpResponse:
except Realm.DoesNotExist:
raise JsonableError(_("Wrong subdomain"))
user_profile = authenticate_remote_user(realm, email)
return login_or_register_remote_user(request, email, user_profile, remote_user)
user_profile = authenticate(username=email,
realm=realm,
use_dummy_backend=True)
if user_profile is None:
result = ExternalAuthResult(data_dict={"email": email, "full_name": remote_user,
"subdomain": realm.subdomain})
else:
result = ExternalAuthResult(user_profile=user_profile)
return login_or_register_remote_user(request, result)
def oauth_redirect_to_root(request: HttpRequest, url: str,
sso_type: str, is_signup: bool=False,
@@ -485,27 +485,7 @@ def start_social_signup(request: HttpRequest, backend: str, extra_arg: Optional[
return oauth_redirect_to_root(request, backend_url, 'social', is_signup=True,
extra_url_params=extra_url_params)
def authenticate_remote_user(realm: Realm,
email_address: Optional[str]) -> Optional[UserProfile]:
if email_address is None:
# No need to authenticate if email address is None. We already
# know that user_profile would be None as well. In fact, if we
# call authenticate in this case, we might get an exception from
# ZulipDummyBackend which doesn't accept a None as a username.
logging.warning("Email address was None while trying to authenticate "
"remote user.")
return None
user_profile = authenticate(username=email_address,
realm=realm,
use_dummy_backend=True)
return user_profile
_subdomain_token_salt = 'zerver.views.auth.log_into_subdomain'
LOGIN_KEY_PREFIX = "login_key_"
LOGIN_KEY_FORMAT = LOGIN_KEY_PREFIX + "{token}"
LOGIN_KEY_EXPIRATION_SECONDS = 15
LOGIN_TOKEN_LENGTH = UserProfile.API_KEY_LENGTH
@log_view_func
def log_into_subdomain(request: HttpRequest, token: str) -> HttpResponse:
@@ -513,102 +493,26 @@ def log_into_subdomain(request: HttpRequest, token: str) -> HttpResponse:
redirect_and_log_into_subdomain called on auth.zulip.example.com),
call login_or_register_remote_user, passing all the authentication
result data that has been stored in redis, associated with this token.
Obligatory fields for the data are 'subdomain' and 'email', because this endpoint
needs to know which user and realm to log into. Others are optional and only used
if the user account still needs to be made and they're passed as argument to the
register_remote_user function.
"""
if not has_api_key_format(token): # The tokens are intended to have the same format as API keys.
logging.warning("log_into_subdomain: Malformed token given: %s" % (token,))
return HttpResponse(status=400)
data = get_login_data(token)
if data is None:
try:
result = ExternalAuthResult(login_token=token)
except ExternalAuthResult.InvalidTokenError:
logging.warning("log_into_subdomain: Invalid token given: %s" % (token,))
return render(request, 'zerver/log_into_subdomain_token_invalid.html', status=400)
# We extract fields provided by the caller via the data object.
# The only fields that are required are email and subdomain (if we
# are simply doing login); more fields are expected if this is a
# new account registration flow or we're going to a specific
# narrow after login.
subdomain = get_subdomain(request)
if data['subdomain'] != subdomain:
if result.data_dict['subdomain'] != subdomain:
raise JsonableError(_("Invalid subdomain"))
email_address = data['email']
full_name = data.get('name', '')
is_signup = data.get('is_signup', False)
redirect_to = data.get('next', '')
mobile_flow_otp = data.get('mobile_flow_otp')
desktop_flow_otp = data.get('desktop_flow_otp')
full_name_validated = data.get('full_name_validated', False)
multiuse_object_key = data.get('multiuse_object_key', '')
return login_or_register_remote_user(request, result)
# We cannot pass the actual authenticated user_profile object that
# was fetched by the original authentication backend and passed
# into redirect_and_log_into_subdomain through a signed URL token,
# so we need to re-fetch it from the database.
if is_signup:
# If we are creating a new user account, user_profile will
# always have been None, so we set that here. In the event
# that a user account with this email was somehow created in a
# race, the eventual registration code will catch that and
# throw an error, so we don't need to check for that here.
user_profile = None
else:
# We're just trying to login. We can be reasonably confident
# that this subdomain actually has a corresponding active
# realm, since the signed cookie proves there was one very
# recently. But as part of fetching the UserProfile object
# for the target user, we use DummyAuthBackend, which
# conveniently re-validates that the realm and user account
# were not deactivated in the meantime.
# Note: Ideally, we'd have a nice user-facing error message
# for the case where this auth fails (because e.g. the realm
# or user was deactivated since the signed cookie was
# generated < 15 seconds ago), but the authentication result
# is correct in those cases and such a race would be very
# rare, so a nice error message is low priority.
realm = get_realm(subdomain)
user_profile = authenticate_remote_user(realm, email_address)
return login_or_register_remote_user(request, email_address, user_profile,
full_name,
is_signup=is_signup, redirect_to=redirect_to,
mobile_flow_otp=mobile_flow_otp,
desktop_flow_otp=desktop_flow_otp,
multiuse_object_key=multiuse_object_key,
full_name_validated=full_name_validated)
def store_login_data(data: Dict[str, Any]) -> str:
key = put_dict_in_redis(redis_client, LOGIN_KEY_FORMAT, data,
expiration_seconds=LOGIN_KEY_EXPIRATION_SECONDS,
token_length=LOGIN_TOKEN_LENGTH)
token = key.split(LOGIN_KEY_PREFIX, 1)[1] # remove the prefix
return token
def get_login_data(token: str, should_delete: bool=True) -> Optional[Dict[str, Any]]:
key = LOGIN_KEY_FORMAT.format(token=token)
data = get_dict_from_redis(redis_client, LOGIN_KEY_FORMAT, key)
if data is not None and should_delete:
redis_client.delete(key)
return data
def redirect_and_log_into_subdomain(realm: Realm, full_name: str, email_address: str,
is_signup: bool=False, redirect_to: str='',
mobile_flow_otp: Optional[str]=None,
desktop_flow_otp: Optional[str]=None,
multiuse_object_key: str='',
full_name_validated: bool=False,) -> HttpResponse:
data = {'name': full_name, 'email': email_address, 'subdomain': realm.subdomain,
'is_signup': is_signup, 'next': redirect_to,
'mobile_flow_otp': mobile_flow_otp,
'desktop_flow_otp': desktop_flow_otp,
'multiuse_object_key': multiuse_object_key,
'full_name_validated': full_name_validated}
token = store_login_data(data)
def redirect_and_log_into_subdomain(result: ExternalAuthResult) -> HttpResponse:
token = result.store_data()
realm = get_realm(result.data_dict["subdomain"])
subdomain_login_uri = (realm.uri
+ reverse('zerver.views.auth.log_into_subdomain', args=[token]))
return redirect(subdomain_login_uri)

View File

@@ -37,7 +37,7 @@ from zerver.views.auth import create_preregistration_user, redirect_and_log_into
from zproject.backends import ldap_auth_enabled, password_auth_enabled, \
ZulipLDAPExceptionNoMatchingLDAPUser, email_auth_enabled, ZulipLDAPAuthBackend, \
email_belongs_to_ldap, any_social_backend_enabled
email_belongs_to_ldap, any_social_backend_enabled, ExternalAuthResult
from confirmation.models import Confirmation, RealmCreationKey, ConfirmationKeyException, \
validate_key, create_confirmation_link, get_object_from_key, \
@@ -343,7 +343,7 @@ def accounts_register(request: HttpRequest) -> HttpResponse:
# Because for realm creation, registration happens on the
# root domain, we need to log them into the subdomain for
# their new realm.
return redirect_and_log_into_subdomain(realm, full_name, email)
return redirect_and_log_into_subdomain(ExternalAuthResult(user_profile=user_profile))
# This dummy_backend check below confirms the user is
# authenticating to the correct subdomain.

View File

@@ -16,7 +16,8 @@ import copy
import logging
import magic
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, TypeVar, Union
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, TypeVar, Union, \
cast
from typing_extensions import TypedDict
from zxcvbn import zxcvbn
@@ -24,7 +25,7 @@ from django_auth_ldap.backend import LDAPBackend, LDAPReverseEmailSearch, \
_LDAPUser, ldap_error
from decorator import decorator
from django.contrib.auth import get_backends
from django.contrib.auth import authenticate, get_backends
from django.contrib.auth.backends import RemoteUserBackend
from django.conf import settings
from django.core.exceptions import ValidationError
@@ -907,6 +908,94 @@ def external_auth_method(cls: Type[ExternalAuthMethod]) -> Type[ExternalAuthMeth
EXTERNAL_AUTH_METHODS.append(cls)
return cls
# We want to be able to store this data in redis, so it has to be easy to serialize.
# That's why we avoid having fields that could pose a problem for that.
ExternalAuthDataDict = TypedDict('ExternalAuthDataDict', {
'subdomain': str,
'full_name': str,
'email': str,
'is_signup': bool,
'redirect_to': str,
'mobile_flow_otp': Optional[str],
'desktop_flow_otp': Optional[str],
'multiuse_object_key': str,
'full_name_validated': bool,
}, total=False)
class ExternalAuthResult:
LOGIN_KEY_PREFIX = "login_key_"
LOGIN_KEY_FORMAT = LOGIN_KEY_PREFIX + "{token}"
LOGIN_KEY_EXPIRATION_SECONDS = 15
LOGIN_TOKEN_LENGTH = UserProfile.API_KEY_LENGTH
def __init__(self, *, user_profile: Optional[UserProfile]=None,
data_dict: Optional[ExternalAuthDataDict]=None,
login_token: Optional[str]=None,
delete_stored_data: bool=True) -> None:
if data_dict is None:
data_dict = {}
if login_token is not None:
assert (not data_dict) and (user_profile is None), ("Passing in data_dict or user_profile " +
"with login_token is disallowed.")
self.instantiate_with_token(login_token, delete_stored_data)
else:
self.data_dict = cast(ExternalAuthDataDict, {**data_dict})
self.user_profile = user_profile
if self.user_profile is not None:
# Ensure data inconsistent with the user_profile wasn't passed in inside the data_dict argument.
assert 'full_name' not in data_dict or data_dict['full_name'] == self.user_profile.full_name
assert 'email' not in data_dict or data_dict['email'] == self.user_profile.delivery_email
# Update these data_dict fields to ensure consistency with self.user_profile. This is mostly
# defensive code, but is useful in these scenarios:
# 1. user_profile argument was passed in, and no full_name or email_data in the data_dict arg.
# 2. We're instantiating from the login_token and the user has changed their full_name since
# the data was stored under the token.
self.data_dict['full_name'] = self.user_profile.full_name
self.data_dict['email'] = self.user_profile.delivery_email
if 'subdomain' not in self.data_dict:
self.data_dict['subdomain'] = self.user_profile.realm.subdomain
if not self.user_profile.is_mirror_dummy:
self.data_dict['is_signup'] = False
def copy_data_dict(self) -> ExternalAuthDataDict:
return self.data_dict.copy()
def store_data(self) -> str:
key = put_dict_in_redis(redis_client, self.LOGIN_KEY_FORMAT, cast(Dict[str, Any], self.data_dict),
expiration_seconds=self.LOGIN_KEY_EXPIRATION_SECONDS,
token_length=self.LOGIN_TOKEN_LENGTH)
token = key.split(self.LOGIN_KEY_PREFIX, 1)[1] # remove the prefix
return token
def instantiate_with_token(self, token: str, delete_stored_data: bool=True) -> None:
key = self.LOGIN_KEY_FORMAT.format(token=token)
data = get_dict_from_redis(redis_client, self.LOGIN_KEY_FORMAT, key)
if data is None or None in [data.get("email"), data.get("subdomain")]:
raise self.InvalidTokenError
if delete_stored_data:
redis_client.delete(key)
self.data_dict = cast(ExternalAuthDataDict, data)
# Here we refetch the UserProfile object (if any) for this
# ExternalAuthResult. Using authenticate() will re-check for
# (unlikely) races like the realm or user having been deactivated
# between generating this ExternalAuthResult and accessing it.
#
# In theory, we should return_data here so the caller can do
# more customized error messages for those unlikely races, but
# it's likely not worth implementing.
realm = get_realm(data['subdomain'])
self.user_profile = authenticate(username=data['email'], realm=realm,
use_dummy_backend=True)
class InvalidTokenError(Exception):
pass
@external_auth_method
class ZulipRemoteUserBackend(RemoteUserBackend, ExternalAuthMethod):
"""Authentication backend that reads the Apache REMOTE_USER variable.
@@ -1178,14 +1267,21 @@ def social_auth_finish(backend: Any,
# The next step is to call login_or_register_remote_user, but
# there are two code paths here because of an optimization to save
# a redirect on mobile and desktop.
data_dict = ExternalAuthDataDict(
subdomain=realm.subdomain,
is_signup=is_signup,
redirect_to=redirect_to,
multiuse_object_key=multiuse_object_key,
full_name_validated=full_name_validated,
mobile_flow_otp=mobile_flow_otp,
desktop_flow_otp=desktop_flow_otp
)
if user_profile is None:
data_dict.update(dict(full_name=full_name, email=email_address))
result = ExternalAuthResult(user_profile=user_profile, data_dict=data_dict)
if mobile_flow_otp or desktop_flow_otp:
extra_kwargs = {}
if mobile_flow_otp:
extra_kwargs["mobile_flow_otp"] = mobile_flow_otp
elif desktop_flow_otp:
extra_kwargs["desktop_flow_otp"] = desktop_flow_otp
if user_profile is not None and not user_profile.is_mirror_dummy:
# For mobile and desktop app authentication, login_or_register_remote_user
# will redirect to a special zulip:// URL that is handled by
@@ -1193,14 +1289,7 @@ def social_auth_finish(backend: Any,
# redirect directly from here, saving a round trip over what
# we need to do to create session cookies on the right domain
# in the web login flow (below).
return login_or_register_remote_user(
strategy.request, email_address,
user_profile, full_name,
is_signup=is_signup,
redirect_to=redirect_to,
full_name_validated=full_name_validated,
**extra_kwargs
)
return login_or_register_remote_user(strategy.request, result)
else:
# The user needs to register, so we need to go the realm's
# subdomain for that.
@@ -1219,15 +1308,7 @@ def social_auth_finish(backend: Any,
# cryptographically signed token) to a route on
# subdomain.zulip.example.com that will verify the signature and
# then call login_or_register_remote_user.
return redirect_and_log_into_subdomain(
realm, full_name, email_address,
is_signup=is_signup,
redirect_to=redirect_to,
multiuse_object_key=multiuse_object_key,
full_name_validated=full_name_validated,
mobile_flow_otp=mobile_flow_otp,
desktop_flow_otp=desktop_flow_otp
)
return redirect_and_log_into_subdomain(result)
class SocialAuthMixin(ZulipAuthMixin, ExternalAuthMethod):
# Whether we expect that the full_name value obtained by the