mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	2FA: Add zulip_otp_required decorator.
We need to add this because otp_required doesn't play well with tests.
This commit is contained in:
		@@ -1,7 +1,10 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import django_otp
 | 
					import django_otp
 | 
				
			||||||
from two_factor.utils import default_device
 | 
					from two_factor.utils import default_device
 | 
				
			||||||
 | 
					from django_otp import user_has_device, _user_is_authenticated
 | 
				
			||||||
 | 
					from django_otp.conf import settings as otp_settings
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from django.contrib.auth.decorators import user_passes_test as django_user_passes_test
 | 
				
			||||||
from django.utils.translation import ugettext as _
 | 
					from django.utils.translation import ugettext as _
 | 
				
			||||||
from django.http import HttpResponseRedirect, HttpResponse
 | 
					from django.http import HttpResponseRedirect, HttpResponse
 | 
				
			||||||
from django.contrib.auth import REDIRECT_FIELD_NAME, login as django_login
 | 
					from django.contrib.auth import REDIRECT_FIELD_NAME, login as django_login
 | 
				
			||||||
@@ -445,10 +448,16 @@ def zulip_login_required(
 | 
				
			|||||||
        login_url=login_url,
 | 
					        login_url=login_url,
 | 
				
			||||||
        redirect_field_name=redirect_field_name
 | 
					        redirect_field_name=redirect_field_name
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    otp_required_decorator = zulip_otp_required(
 | 
				
			||||||
 | 
					        redirect_field_name=redirect_field_name,
 | 
				
			||||||
 | 
					        login_url=login_url
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if function:
 | 
					    if function:
 | 
				
			||||||
        # Add necessary logging data via add_logging_data
 | 
					        # Add necessary logging data via add_logging_data
 | 
				
			||||||
        return actual_decorator(add_logging_data(function))
 | 
					        return actual_decorator(zulip_otp_required(add_logging_data(function)))
 | 
				
			||||||
    return actual_decorator  # nocoverage # We don't use this without a function
 | 
					    return actual_decorator(otp_required_decorator)  # nocoverage # We don't use this without a function
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def require_server_admin(view_func: ViewFuncT) -> ViewFuncT:
 | 
					def require_server_admin(view_func: ViewFuncT) -> ViewFuncT:
 | 
				
			||||||
    @zulip_login_required
 | 
					    @zulip_login_required
 | 
				
			||||||
@@ -797,3 +806,37 @@ def return_success_on_head_request(view_func: ViewFuncT) -> ViewFuncT:
 | 
				
			|||||||
            return json_success()
 | 
					            return json_success()
 | 
				
			||||||
        return view_func(request, *args, **kwargs)
 | 
					        return view_func(request, *args, **kwargs)
 | 
				
			||||||
    return _wrapped_view_func  # type: ignore # https://github.com/python/mypy/issues/1927
 | 
					    return _wrapped_view_func  # type: ignore # https://github.com/python/mypy/issues/1927
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def zulip_otp_required(view: Any=None,
 | 
				
			||||||
 | 
					                       redirect_field_name: str='next',
 | 
				
			||||||
 | 
					                       login_url: str=settings.HOME_NOT_LOGGED_IN,
 | 
				
			||||||
 | 
					                       ) -> Callable[..., HttpResponse]:
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					    The reason we need to create this function is that the stock
 | 
				
			||||||
 | 
					    otp_required decorator doesn't play well with tests. We cannot
 | 
				
			||||||
 | 
					    enable/disable if_configured parameter during tests since the decorator
 | 
				
			||||||
 | 
					    retains its value due to closure.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Similar to :func:`~django.contrib.auth.decorators.login_required`, but
 | 
				
			||||||
 | 
					    requires the user to be :term:`verified`. By default, this redirects users
 | 
				
			||||||
 | 
					    to :setting:`OTP_LOGIN_URL`.
 | 
				
			||||||
 | 
					    """
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test(user: UserProfile) -> bool:
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        :if_configured: If ``True``, an authenticated user with no confirmed
 | 
				
			||||||
 | 
					        OTP devices will be allowed. Default is ``False``. If ``False``,
 | 
				
			||||||
 | 
					        2FA will not do any authentication.
 | 
				
			||||||
 | 
					        """
 | 
				
			||||||
 | 
					        if_configured = settings.TWO_FACTOR_AUTHENTICATION_ENABLED
 | 
				
			||||||
 | 
					        if not if_configured:
 | 
				
			||||||
 | 
					            return True
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return user.is_verified() or (_user_is_authenticated(user)
 | 
				
			||||||
 | 
					                                      and not user_has_device(user))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    decorator = django_user_passes_test(test,
 | 
				
			||||||
 | 
					                                        login_url=login_url,
 | 
				
			||||||
 | 
					                                        redirect_field_name=redirect_field_name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return decorator if (view is None) else decorator(view)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -6,6 +6,8 @@ import os
 | 
				
			|||||||
from collections import defaultdict
 | 
					from collections import defaultdict
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from typing import Any, Dict, Iterable, List, Optional, Tuple
 | 
					from typing import Any, Dict, Iterable, List, Optional, Tuple
 | 
				
			||||||
 | 
					from django_otp.conf import settings as otp_settings
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from django.test import TestCase, override_settings
 | 
					from django.test import TestCase, override_settings
 | 
				
			||||||
from django.http import HttpResponse, HttpRequest
 | 
					from django.http import HttpResponse, HttpRequest
 | 
				
			||||||
from django.test.client import RequestFactory
 | 
					from django.test.client import RequestFactory
 | 
				
			||||||
@@ -35,7 +37,8 @@ from zerver.decorator import (
 | 
				
			|||||||
    authenticate_notify, cachify,
 | 
					    authenticate_notify, cachify,
 | 
				
			||||||
    get_client_name, internal_notify_view, is_local_addr,
 | 
					    get_client_name, internal_notify_view, is_local_addr,
 | 
				
			||||||
    rate_limit, validate_api_key, logged_in_and_active,
 | 
					    rate_limit, validate_api_key, logged_in_and_active,
 | 
				
			||||||
    return_success_on_head_request, to_not_negative_int_or_none
 | 
					    return_success_on_head_request, to_not_negative_int_or_none,
 | 
				
			||||||
 | 
					    zulip_login_required
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
from zerver.lib.cache import ignore_unhashable_lru_cache
 | 
					from zerver.lib.cache import ignore_unhashable_lru_cache
 | 
				
			||||||
from zerver.lib.validator import (
 | 
					from zerver.lib.validator import (
 | 
				
			||||||
@@ -1343,6 +1346,67 @@ class TestZulipLoginRequiredDecorator(ZulipTestCase):
 | 
				
			|||||||
            result = self.client_get('/accounts/accept_terms/')
 | 
					            result = self.client_get('/accounts/accept_terms/')
 | 
				
			||||||
            self.assertEqual(result.status_code, 302)
 | 
					            self.assertEqual(result.status_code, 302)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_2fa_failure(self) -> None:
 | 
				
			||||||
 | 
					        @zulip_login_required
 | 
				
			||||||
 | 
					        def test_view(request: HttpRequest) -> HttpResponse:
 | 
				
			||||||
 | 
					            return HttpResponse('Success')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        request = HttpRequest()
 | 
				
			||||||
 | 
					        request.META['SERVER_NAME'] = 'localhost'
 | 
				
			||||||
 | 
					        request.META['SERVER_PORT'] = 80
 | 
				
			||||||
 | 
					        request.META['PATH_INFO'] = ''
 | 
				
			||||||
 | 
					        request.user = hamlet = self.example_user('hamlet')
 | 
				
			||||||
 | 
					        request.user.is_verified = lambda: False
 | 
				
			||||||
 | 
					        self.login(hamlet.email)
 | 
				
			||||||
 | 
					        request.session = self.client.session
 | 
				
			||||||
 | 
					        request.get_host = lambda: 'zulip.testserver'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        response = test_view(request)
 | 
				
			||||||
 | 
					        content = getattr(response, 'content')
 | 
				
			||||||
 | 
					        self.assertEqual(content.decode(), 'Success')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        with self.settings(TWO_FACTOR_AUTHENTICATION_ENABLED=True):
 | 
				
			||||||
 | 
					            request = HttpRequest()
 | 
				
			||||||
 | 
					            request.META['SERVER_NAME'] = 'localhost'
 | 
				
			||||||
 | 
					            request.META['SERVER_PORT'] = 80
 | 
				
			||||||
 | 
					            request.META['PATH_INFO'] = ''
 | 
				
			||||||
 | 
					            request.user = hamlet = self.example_user('hamlet')
 | 
				
			||||||
 | 
					            request.user.is_verified = lambda: False
 | 
				
			||||||
 | 
					            self.login(hamlet.email)
 | 
				
			||||||
 | 
					            request.session = self.client.session
 | 
				
			||||||
 | 
					            request.get_host = lambda: 'zulip.testserver'
 | 
				
			||||||
 | 
					            self.create_default_device(request.user)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            response = test_view(request)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            status_code = getattr(response, 'status_code')
 | 
				
			||||||
 | 
					            self.assertEqual(status_code, 302)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            url = getattr(response, 'url')
 | 
				
			||||||
 | 
					            response_url = url.split("?")[0]
 | 
				
			||||||
 | 
					            self.assertEqual(response_url, settings.HOME_NOT_LOGGED_IN)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_2fa_success(self) -> None:
 | 
				
			||||||
 | 
					        @zulip_login_required
 | 
				
			||||||
 | 
					        def test_view(request: HttpRequest) -> HttpResponse:
 | 
				
			||||||
 | 
					            return HttpResponse('Success')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        with self.settings(TWO_FACTOR_AUTHENTICATION_ENABLED=True):
 | 
				
			||||||
 | 
					            request = HttpRequest()
 | 
				
			||||||
 | 
					            request.META['SERVER_NAME'] = 'localhost'
 | 
				
			||||||
 | 
					            request.META['SERVER_PORT'] = 80
 | 
				
			||||||
 | 
					            request.META['PATH_INFO'] = ''
 | 
				
			||||||
 | 
					            request.user = hamlet = self.example_user('hamlet')
 | 
				
			||||||
 | 
					            request.user.is_verified = lambda: True
 | 
				
			||||||
 | 
					            self.login(hamlet.email)
 | 
				
			||||||
 | 
					            request.session = self.client.session
 | 
				
			||||||
 | 
					            request.get_host = lambda: 'zulip.testserver'
 | 
				
			||||||
 | 
					            self.create_default_device(request.user)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            response = test_view(request)
 | 
				
			||||||
 | 
					            content = getattr(response, 'content')
 | 
				
			||||||
 | 
					            self.assertEqual(content.decode(), 'Success')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class TestRequireDecorators(ZulipTestCase):
 | 
					class TestRequireDecorators(ZulipTestCase):
 | 
				
			||||||
    def test_require_server_admin_decorator(self) -> None:
 | 
					    def test_require_server_admin_decorator(self) -> None:
 | 
				
			||||||
        user_email = self.example_email('hamlet')
 | 
					        user_email = self.example_email('hamlet')
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user