Files
zulip/zerver/tests/test_auth_backends.py
Tim Abbott e93a2e990f Fix nondeterministic parsing failures in GoogleLoginTest.
Apparently, in urllib.parse, one need to extract the query string from
the rest of the URL before parsing the query string, otherwise the
very first query parameter will have rest of the URL in its name.

This results in a nondeterministic failure that happens 1/N of the
time, where N is the number of fields marshalled from a dictionary
into the query string.
2016-09-13 18:13:28 -07:00

570 lines
26 KiB
Python

# -*- coding: utf-8 -*-
from django.conf import settings
from django.http import HttpResponse
from django.test import TestCase
from django_auth_ldap.backend import _LDAPUser
from django.test.client import RequestFactory
from typing import Any, Callable, Dict
import mock
import re
from zerver.lib.actions import do_deactivate_realm, do_deactivate_user, \
do_reactivate_realm, do_reactivate_user
from zerver.lib.initial_password import initial_password
from zerver.lib.session_user import get_session_dict_user
from zerver.lib.test_helpers import (
ZulipTestCase
)
from zerver.models import \
get_realm, get_user_profile_by_email, email_to_username, UserProfile
from zproject.backends import ZulipDummyBackend, EmailAuthBackend, \
GoogleMobileOauth2Backend, ZulipRemoteUserBackend, ZulipLDAPAuthBackend, \
ZulipLDAPUserPopulator, DevAuthBackend, GitHubAuthBackend
from social.strategies.django_strategy import DjangoStrategy
from social.storage.django_orm import BaseDjangoStorage
from social.backends.github import GithubOrganizationOAuth2, GithubTeamOAuth2, \
GithubOAuth2
from six import text_type
from six.moves import urllib
import ujson
class AuthBackendTest(TestCase):
def verify_backend(self, backend, good_args=None,
good_kwargs=None, bad_kwargs=None,
email_to_username=None):
# type: (Any, List[Any], Dict[str, Any], Dict[str, Any], Callable[[text_type], text_type]) -> None
if good_args is None:
good_args = []
if good_kwargs is None:
good_kwargs = {}
email = u"hamlet@zulip.com"
user_profile = get_user_profile_by_email(email)
username = email
if email_to_username is not None:
username = email_to_username(email)
# If bad_kwargs was specified, verify auth fails in that case
if bad_kwargs is not None:
self.assertIsNone(backend.authenticate(username, **bad_kwargs))
# Verify auth works
result = backend.authenticate(username, *good_args, **good_kwargs)
self.assertEqual(user_profile, result)
# Verify auth fails with a deactivated user
do_deactivate_user(user_profile)
self.assertIsNone(backend.authenticate(username, *good_args, **good_kwargs))
# Reactivate the user and verify auth works again
do_reactivate_user(user_profile)
result = backend.authenticate(username, *good_args, **good_kwargs)
self.assertEqual(user_profile, result)
# Verify auth fails with a deactivated realm
do_deactivate_realm(user_profile.realm)
self.assertIsNone(backend.authenticate(username, *good_args, **good_kwargs))
# Verify auth works again after reactivating the realm
do_reactivate_realm(user_profile.realm)
result = backend.authenticate(username, *good_args, **good_kwargs)
self.assertEqual(user_profile, result)
def test_dummy_backend(self):
# type: () -> None
self.verify_backend(ZulipDummyBackend(),
good_kwargs=dict(use_dummy_backend=True),
bad_kwargs=dict(use_dummy_backend=False))
def test_email_auth_backend(self):
# type: () -> None
email = "hamlet@zulip.com"
user_profile = get_user_profile_by_email(email)
password = "testpassword"
user_profile.set_password(password)
user_profile.save()
self.verify_backend(EmailAuthBackend(),
bad_kwargs=dict(password=''),
good_kwargs=dict(password=password))
def test_email_auth_backend_disabled_password_auth(self):
# type: () -> None
email = u"hamlet@zulip.com"
user_profile = get_user_profile_by_email(email)
password = "testpassword"
user_profile.set_password(password)
user_profile.save()
# Verify if a realm has password auth disabled, correct password is rejected
with mock.patch('zproject.backends.password_auth_enabled', return_value=False):
self.assertIsNone(EmailAuthBackend().authenticate(email, password))
def test_google_backend(self):
# type: () -> None
email = "hamlet@zulip.com"
backend = GoogleMobileOauth2Backend()
payload = dict(email_verified=True,
email=email)
with mock.patch('apiclient.sample_tools.client.verify_id_token', return_value=payload):
self.verify_backend(backend)
# Verify valid_attestation parameter is set correctly
unverified_payload = dict(email_verified=False)
with mock.patch('apiclient.sample_tools.client.verify_id_token', return_value=unverified_payload):
ret = dict() # type: Dict[str, str]
result = backend.authenticate(return_data=ret)
self.assertIsNone(result)
self.assertFalse(ret["valid_attestation"])
nonexistent_user_payload = dict(email_verified=True, email="invalid@zulip.com")
with mock.patch('apiclient.sample_tools.client.verify_id_token',
return_value=nonexistent_user_payload):
ret = dict()
result = backend.authenticate(return_data=ret)
self.assertIsNone(result)
self.assertTrue(ret["valid_attestation"])
def test_ldap_backend(self):
# type: () -> None
email = "hamlet@zulip.com"
password = "test_password"
backend = ZulipLDAPAuthBackend()
# Test LDAP auth fails when LDAP server rejects password
with mock.patch('django_auth_ldap.backend._LDAPUser._authenticate_user_dn', \
side_effect=_LDAPUser.AuthenticationFailed("Failed")), \
mock.patch('django_auth_ldap.backend._LDAPUser._check_requirements'), \
mock.patch('django_auth_ldap.backend._LDAPUser._get_user_attrs',
return_value=dict(full_name=['Hamlet'])):
self.assertIsNone(backend.authenticate(email, password))
# For this backend, we mock the internals of django_auth_ldap
with mock.patch('django_auth_ldap.backend._LDAPUser._authenticate_user_dn'), \
mock.patch('django_auth_ldap.backend._LDAPUser._check_requirements'), \
mock.patch('django_auth_ldap.backend._LDAPUser._get_user_attrs',
return_value=dict(full_name=['Hamlet'])):
self.verify_backend(backend, good_kwargs=dict(password=password))
def test_devauth_backend(self):
# type: () -> None
self.verify_backend(DevAuthBackend())
def test_remote_user_backend(self):
# type: () -> None
self.verify_backend(ZulipRemoteUserBackend())
def test_remote_user_backend_sso_append_domain(self):
# type: () -> None
with self.settings(SSO_APPEND_DOMAIN='zulip.com'):
self.verify_backend(ZulipRemoteUserBackend(),
email_to_username=email_to_username)
def test_github_backend(self):
# type: () -> None
email = 'hamlet@zulip.com'
good_kwargs = dict(response=dict(email=email), return_data=dict())
bad_kwargs = dict() # type: Dict[str, str]
self.verify_backend(GitHubAuthBackend(),
good_kwargs=good_kwargs,
bad_kwargs=bad_kwargs)
class GitHubAuthBackendTest(ZulipTestCase):
def setUp(self):
# type: () -> None
self.email = 'hamlet@zulip.com'
self.name = 'Hamlet'
self.backend = GitHubAuthBackend()
self.backend.strategy = DjangoStrategy(storage=BaseDjangoStorage())
self.user_profile = get_user_profile_by_email(self.email)
self.user_profile.backend = self.backend
def test_github_backend_do_auth(self):
# type: () -> None
def do_auth(*args, **kwargs):
# type: (*Any, **Any) -> UserProfile
return self.user_profile
with mock.patch('zerver.views.login_or_register_remote_user') as result, \
mock.patch('social.backends.github.GithubOAuth2.do_auth',
side_effect=do_auth):
response=dict(email=self.email, name=self.name)
self.backend.do_auth(response=response)
result.assert_called_with(None, self.email, self.user_profile,
self.name)
def test_github_backend_do_auth_for_default(self):
# type: () -> None
def authenticate(*args, **kwargs):
# type: (*Any, **Any) -> None
assert isinstance(kwargs['backend'], GithubOAuth2) == True
with mock.patch('social.backends.github.GithubOAuth2.user_data',
return_value=dict()), \
mock.patch('zproject.backends.SocialAuthMixin.process_do_auth'), \
mock.patch('social.strategies.django_strategy.'
'DjangoStrategy.authenticate', side_effect=authenticate):
response=dict(email=self.email, name=self.name)
self.backend.do_auth('fake-access-token', response=response)
def test_github_backend_do_auth_for_team(self):
# type: () -> None
def authenticate(*args, **kwargs):
# type: (*Any, **Any) -> None
assert isinstance(kwargs['backend'], GithubTeamOAuth2) == True
with mock.patch('social.backends.github.GithubTeamOAuth2.user_data',
return_value=dict()), \
mock.patch('zproject.backends.SocialAuthMixin.process_do_auth'), \
mock.patch('social.strategies.django_strategy.'
'DjangoStrategy.authenticate', side_effect=authenticate):
response=dict(email=self.email, name=self.name)
with self.settings(SOCIAL_AUTH_GITHUB_TEAM_ID='zulip-webapp'):
self.backend.do_auth('fake-access-token', response=response)
def test_github_backend_do_auth_for_org(self):
# type: () -> None
def authenticate(*args, **kwargs):
# type: (*Any, **Any) -> None
assert isinstance(kwargs['backend'], GithubOrganizationOAuth2) == True
with mock.patch('social.backends.github.GithubOrganizationOAuth2.user_data',
return_value=dict()), \
mock.patch('zproject.backends.SocialAuthMixin.process_do_auth'), \
mock.patch('social.strategies.django_strategy.'
'DjangoStrategy.authenticate', side_effect=authenticate):
response=dict(email=self.email, name=self.name)
with self.settings(SOCIAL_AUTH_GITHUB_ORG_NAME='Zulip'):
self.backend.do_auth('fake-access-token', response=response)
def test_github_backend_inactive_user(self):
# type: () -> None
def do_auth_inactive(*args, **kwargs):
# type: (*Any, **Any) -> UserProfile
return_data = kwargs['return_data']
return_data['inactive_user'] = True
return self.user_profile
with mock.patch('zerver.views.login_or_register_remote_user') as result, \
mock.patch('social.backends.github.GithubOAuth2.do_auth',
side_effect=do_auth_inactive):
response=dict(email=self.email, name=self.name)
user = self.backend.do_auth(response=response)
result.assert_not_called()
self.assertIs(user, None)
def test_github_backend_new_user(self):
# type: () -> None
rf = RequestFactory()
request = rf.get('/complete')
request.session = {}
request.user = self.user_profile
self.backend.strategy.request = request
def do_auth(*args, **kwargs):
# type: (*Any, **Any) -> UserProfile
return_data = kwargs['return_data']
return_data['valid_attestation'] = True
return None
with mock.patch('social.backends.github.GithubOAuth2.do_auth',
side_effect=do_auth):
response=dict(email='nonexisting@phantom.com', name='Ghost')
result = self.backend.do_auth(response=response)
self.assert_in_response('action="/register/"', result)
self.assert_in_response('Your e-mail does not match any '
'existing open organization.', result)
class ResponseMock(object):
def __init__(self, status_code, data):
# type: (int, Any) -> None
self.status_code = status_code
self.data = data
def json(self):
# type: () -> str
return self.data
@property
def text(self):
# type: () -> str
return "Response text"
class GoogleLoginTest(ZulipTestCase):
def google_oauth2_test(self, token_response, account_response):
# type: (ResponseMock, ResponseMock) -> HttpResponse
result = self.client_get("/accounts/login/google/")
self.assertEquals(result.status_code, 302)
# Now extract the CSRF token from the redirect URL
parsed_url = urllib.parse.urlparse(result.url)
csrf_state = urllib.parse.parse_qs(parsed_url.query)['state']
with mock.patch("requests.post", return_value=token_response), \
mock.patch("requests.get", return_value=account_response):
result = self.client_get("/accounts/login/google/done/",
dict(state=csrf_state))
return result
def test_google_oauth2_success(self):
# type: () -> None
token_response = ResponseMock(200, {'access_token': "unique_token"})
account_data = dict(name=dict(formatted="Full Name"),
emails=[dict(type="account",
value="hamlet@zulip.com")])
account_response = ResponseMock(200, account_data)
self.google_oauth2_test(token_response, account_response)
user_profile = get_user_profile_by_email('hamlet@zulip.com')
self.assertEqual(get_session_dict_user(self.client.session), user_profile.id)
def test_google_oauth2_registration(self):
# type: () -> None
"""If the user doesn't exist yet, Google auth can be used to register an account"""
email = "newuser@zulip.com"
token_response = ResponseMock(200, {'access_token': "unique_token"})
account_data = dict(name=dict(formatted="Full Name"),
emails=[dict(type="account",
value=email)])
account_response = ResponseMock(200, account_data)
result = self.google_oauth2_test(token_response, account_response)
self.assertEqual(result.status_code, 302)
result = self.client_get(result.url)
key_match = re.search('value="(?P<key>[0-9a-f]+)" name="key"', result.content.decode("utf-8"))
name_match = re.search('value="(?P<name>[^"]+)" name="full_name"', result.content.decode("utf-8"))
# This goes through a brief stop on a page that auto-submits via JS
result = self.client_post('/accounts/register/',
{'full_name': name_match.group("name"),
'key': key_match.group("key"),
'from_confirmation': "1"})
self.assertEquals(result.status_code, 200)
result = self.client_post('/accounts/register/',
{'full_name': "New User",
'password': 'test_password',
'key': key_match.group("key"),
'terms': True})
self.assertEquals(result.status_code, 302)
self.assertEquals(result.url, "http://testserver/")
def test_google_oauth2_400_token_response(self):
# type: () -> None
token_response = ResponseMock(400, {})
with mock.patch("logging.warning") as m:
result = self.google_oauth2_test(token_response, None)
self.assertEquals(result.status_code, 400)
self.assertEquals(m.call_args_list[0][0][0],
"User error converting Google oauth2 login to token: Response text")
def test_google_oauth2_500_token_response(self):
# type: () -> None
token_response = ResponseMock(500, {})
with mock.patch("logging.error") as m:
result = self.google_oauth2_test(token_response, None)
self.assertEquals(result.status_code, 400)
self.assertEquals(m.call_args_list[0][0][0],
"Could not convert google oauth2 code to access_token: Response text")
def test_google_oauth2_400_account_response(self):
# type: () -> None
token_response = ResponseMock(200, {'access_token': "unique_token"})
account_response = ResponseMock(400, {})
with mock.patch("logging.warning") as m:
result = self.google_oauth2_test(token_response, account_response)
self.assertEquals(result.status_code, 400)
self.assertEquals(m.call_args_list[0][0][0],
"Google login failed making info API call: Response text")
def test_google_oauth2_500_account_response(self):
# type: () -> None
token_response = ResponseMock(200, {'access_token': "unique_token"})
account_response = ResponseMock(500, {})
with mock.patch("logging.error") as m:
result = self.google_oauth2_test(token_response, account_response)
self.assertEquals(result.status_code, 400)
self.assertEquals(m.call_args_list[0][0][0],
"Google login failed making API call: Response text")
def test_google_oauth2_no_fullname(self):
# type: () -> None
token_response = ResponseMock(200, {'access_token': "unique_token"})
account_data = dict(name=dict(givenName="Test", familyName="User"),
emails=[dict(type="account",
value="hamlet@zulip.com")])
account_response = ResponseMock(200, account_data)
self.google_oauth2_test(token_response, account_response)
user_profile = get_user_profile_by_email('hamlet@zulip.com')
self.assertEqual(get_session_dict_user(self.client.session), user_profile.id)
def test_google_oauth2_account_response_no_email(self):
# type: () -> None
token_response = ResponseMock(200, {'access_token': "unique_token"})
account_data = dict(name=dict(formatted="Full Name"),
emails=[])
account_response = ResponseMock(200, account_data)
with mock.patch("logging.error") as m:
result = self.google_oauth2_test(token_response, account_response)
self.assertEquals(result.status_code, 400)
self.assertIn("Google oauth2 account email not found:", m.call_args_list[0][0][0])
def test_google_oauth2_error_access_denied(self):
# type: () -> None
result = self.client_get("/accounts/login/google/done/?error=access_denied")
self.assertEquals(result.status_code, 302)
self.assertEquals(result.url, "http://testserver/")
def test_google_oauth2_error_other(self):
# type: () -> None
with mock.patch("logging.warning") as m:
result = self.client_get("/accounts/login/google/done/?error=some_other_error")
self.assertEquals(result.status_code, 400)
self.assertEquals(m.call_args_list[0][0][0],
"Error from google oauth2 login: some_other_error")
def test_google_oauth2_missing_csrf(self):
# type: () -> None
with mock.patch("logging.warning") as m:
result = self.client_get("/accounts/login/google/done/")
self.assertEquals(result.status_code, 400)
self.assertEquals(m.call_args_list[0][0][0],
'Missing Google oauth2 CSRF state')
def test_google_oauth2_csrf_malformed(self):
# type: () -> None
with mock.patch("logging.warning") as m:
result = self.client_get("/accounts/login/google/done/?state=badstate")
self.assertEquals(result.status_code, 400)
self.assertEquals(m.call_args_list[0][0][0],
'Missing Google oauth2 CSRF state')
def test_google_oauth2_csrf_badstate(self):
# type: () -> None
with mock.patch("logging.warning") as m:
result = self.client_get("/accounts/login/google/done/?state=badstate:otherbadstate")
self.assertEquals(result.status_code, 400)
self.assertEquals(m.call_args_list[0][0][0],
'Google oauth2 CSRF error')
class FetchAPIKeyTest(ZulipTestCase):
def setUp(self):
# type: () -> None
self.email = "hamlet@zulip.com"
self.user_profile = get_user_profile_by_email(self.email)
def test_success(self):
# type: () -> None
result = self.client_post("/api/v1/fetch_api_key",
dict(username=self.email,
password=initial_password(self.email)))
self.assert_json_success(result)
def test_wrong_password(self):
# type: () -> None
result = self.client_post("/api/v1/fetch_api_key",
dict(username=self.email,
password="wrong"))
self.assert_json_error(result, "Your username or password is incorrect.", 403)
def test_password_auth_disabled(self):
# type: () -> None
with mock.patch('zproject.backends.password_auth_enabled', return_value=False):
result = self.client_post("/api/v1/fetch_api_key",
dict(username=self.email,
password=initial_password(self.email)))
self.assert_json_error_contains(result, "Password auth is disabled", 403)
def test_inactive_user(self):
# type: () -> None
do_deactivate_user(self.user_profile)
result = self.client_post("/api/v1/fetch_api_key",
dict(username=self.email,
password=initial_password(self.email)))
self.assert_json_error_contains(result, "Your account has been disabled", 403)
def test_deactivated_realm(self):
# type: () -> None
do_deactivate_realm(self.user_profile.realm)
result = self.client_post("/api/v1/fetch_api_key",
dict(username=self.email,
password=initial_password(self.email)))
self.assert_json_error_contains(result, "Your realm has been deactivated", 403)
class DevFetchAPIKeyTest(ZulipTestCase):
def setUp(self):
# type: () -> None
self.email = "hamlet@zulip.com"
self.user_profile = get_user_profile_by_email(self.email)
def test_success(self):
# type: () -> None
result = self.client_post("/api/v1/dev_fetch_api_key",
dict(username=self.email))
self.assert_json_success(result)
data = ujson.loads(result.content)
self.assertEqual(data["email"], self.email)
self.assertEqual(data['api_key'], self.user_profile.api_key)
def test_inactive_user(self):
# type: () -> None
do_deactivate_user(self.user_profile)
result = self.client_post("/api/v1/dev_fetch_api_key",
dict(username=self.email))
self.assert_json_error_contains(result, "Your account has been disabled", 403)
def test_deactivated_realm(self):
# type: () -> None
do_deactivate_realm(self.user_profile.realm)
result = self.client_post("/api/v1/dev_fetch_api_key",
dict(username=self.email))
self.assert_json_error_contains(result, "Your realm has been deactivated", 403)
def test_dev_auth_disabled(self):
# type: () -> None
with mock.patch('zerver.views.dev_auth_enabled', return_value=False):
result = self.client_post("/api/v1/dev_fetch_api_key",
dict(username=self.email))
self.assert_json_error_contains(result, "Dev environment not enabled.", 400)
class DevGetEmailsTest(ZulipTestCase):
def test_success(self):
# type: () -> None
result = self.client_get("/api/v1/dev_get_emails")
self.assert_json_success(result)
self.assert_in_response("direct_admins", result)
self.assert_in_response("direct_users", result)
def test_dev_auth_disabled(self):
# type: () -> None
with mock.patch('zerver.views.dev_auth_enabled', return_value=False):
result = self.client_get("/api/v1/dev_get_emails")
self.assert_json_error_contains(result, "Dev environment not enabled.", 400)
class FetchAuthBackends(ZulipTestCase):
def test_fetch_auth_backend_format(self):
# type: () -> None
result = self.client_get("/api/v1/get_auth_backends")
self.assert_json_success(result)
data = ujson.loads(result.content)
self.assertEqual(set(data.keys()),
{'msg', 'password', 'google', 'dev', 'result'})
for backend in set(data.keys()) - {'msg', 'result'}:
self.assertTrue(isinstance(data[backend], bool))
def test_fetch_auth_backend(self):
# type: () -> None
backends = [GoogleMobileOauth2Backend(), DevAuthBackend()]
with mock.patch('django.contrib.auth.get_backends', return_value=backends):
result = self.client_get("/api/v1/get_auth_backends")
self.assert_json_success(result)
data = ujson.loads(result.content)
self.assertEqual(data, {
'msg': '',
'password': False,
'google': True,
'dev': True,
'result': 'success',
})