Files
zulip/zilencer/auth.py
Zixuan James Li 26a518267a rate_limit: Replace rate_limit with inlined rate limit checks.
This change incorporates should_rate_limit into rate_limit_user and
rate_limit_request_by_ip. Note a slight behavior change to other callers
to rate_limit_request_by_ip is made, as we now check if the client is
eligible to be exempted from rate limiting, which was previously
only done as a part of zerver.lib.rate_limiter.rate_limit.

Now we mock zerver.lib.rate_limiter.RateLimitedUser instead of
zerver.decorator.rate_limit_user in
zerver.tests.test_decorators.RateLimitTestCase, because rate_limit_user
will always be called, but rate limiting only happens when the
should_rate_limit check passes;

we can continue to mock zerver.lib.rate_limiter.rate_limit_ip, because the
decorated view functions call rate_limit_request_by_ip that calls
rate_limit_ip when the should_rate_limit check passes.

We need to mock zerver.decorator.rate_limit_user for SkipRateLimitingTest
now because rate_limit has been removed. We don't need to mock
RateLimitedUser in this case because we are only verifying that
the skip_rate_limiting flag works.

To ensure coverage in add_logging_data, a new test case is added to use
a web_public_view (which decorates the view function with
add_logging_data) with a new flag to check_rate_limit_public_or_user_views.

Signed-off-by: Zixuan James Li <p359101898@gmail.com>
2022-08-17 12:05:38 -07:00

123 lines
4.0 KiB
Python

import logging
from functools import wraps
from typing import Any, Callable
from django.http import HttpRequest, HttpResponse
from django.urls import path
from django.urls.resolvers import URLPattern
from django.utils.crypto import constant_time_compare
from django.utils.translation import gettext as _
from typing_extensions import Concatenate, ParamSpec
from zerver.decorator import get_basic_credentials, process_client
from zerver.lib.exceptions import (
ErrorCode,
JsonableError,
RateLimited,
RemoteServerDeactivatedError,
UnauthorizedError,
)
from zerver.lib.rate_limiter import should_rate_limit
from zerver.lib.request import RequestNotes
from zerver.lib.rest import get_target_view_function_or_response
from zerver.lib.subdomains import get_subdomain
from zerver.models import Realm
from zilencer.models import (
RateLimitedRemoteZulipServer,
RemoteZulipServer,
get_remote_server_by_uuid,
)
# Module-level logger; rate_limit_remote_server uses it to warn when a
# remote server exceeds its rate limit.
logger = logging.getLogger(__name__)
# Parameter specification standing in for the extra arguments of a wrapped
# view function, so the annotations in this module can carry them through.
ParamT = ParamSpec("ParamT")
class InvalidZulipServerError(JsonableError):
    """Error raised when `role` does not identify a registered remote server.

    validate_remote_server raises this when the lookup by UUID fails.
    """

    code = ErrorCode.INVALID_ZULIP_SERVER
    # Presumably JsonableError fills the `{role}` placeholder of msg_format()
    # from the attributes named here -- confirm in zerver.lib.exceptions.
    data_fields = ["role"]

    def __init__(self, role: str) -> None:
        # The role (server UUID) the client tried to authenticate as.
        self.role: str = role

    @staticmethod
    def msg_format() -> str:
        """Return the message template for this error."""
        return "Zulip server auth failure: {role} is not registered -- did you run `manage.py register_server`?"
class InvalidZulipServerKeyError(InvalidZulipServerError):
    """Error raised when a known server presents an API key that does not match.

    Inherits the error code, data fields and constructor of
    InvalidZulipServerError; only the message template differs.
    """

    @staticmethod
    def msg_format() -> str:
        """Return the message template for a key mismatch."""
        return "Zulip server auth failure: key does not match role {role}"
def rate_limit_remote_server(
    request: HttpRequest, remote_server: RemoteZulipServer, domain: str
) -> None:
    """Apply the rate limit for `domain` to a request from `remote_server`.

    Does nothing when should_rate_limit(request) is false.

    Args:
        request: The incoming request being rate limited.
        remote_server: The authenticated remote server making the request.
        domain: Name of the rate-limiting domain to check against.

    Raises:
        RateLimited: Propagated unchanged from rate_limit_request(), after a
            warning naming the server and domain has been logged.
    """
    if not should_rate_limit(request):
        return

    try:
        RateLimitedRemoteZulipServer(remote_server, domain=domain).rate_limit_request(request)
    except RateLimited:
        logger.warning("Remote server %s exceeded rate limits on domain %s", remote_server, domain)
        # A bare `raise` re-raises the active exception as-is; no need to
        # bind it to a name just to raise it again.
        raise
def validate_remote_server(
    request: HttpRequest,
    role: str,
    api_key: str,
) -> RemoteZulipServer:
    """Look up and authenticate the remote server identified by `role`.

    The checks run in the order below and the first failure raises:

    1. No server with UUID `role` exists -> InvalidZulipServerError.
    2. `api_key` does not match the server's key -> InvalidZulipServerKeyError.
    3. The server is deactivated -> RemoteServerDeactivatedError.
    4. The request's subdomain is not the root domain -> JsonableError.

    On success the server is stored on the request's RequestNotes,
    process_client(request) is called, and the server is returned.
    """
    try:
        server = get_remote_server_by_uuid(role)
    except RemoteZulipServer.DoesNotExist:
        raise InvalidZulipServerError(role)

    # Constant-time comparison of the presented key with the stored one.
    key_matches = constant_time_compare(api_key, server.api_key)
    if not key_matches:
        raise InvalidZulipServerKeyError(role)

    if server.deactivated:
        raise RemoteServerDeactivatedError()

    subdomain = get_subdomain(request)
    if subdomain != Realm.SUBDOMAIN_FOR_ROOT_DOMAIN:
        raise JsonableError(_("Invalid subdomain for push notifications bouncer"))

    # Record the authenticated server on the request before client processing.
    notes = RequestNotes.get_notes(request)
    notes.remote_server = server
    process_client(request)
    return server
def authenticated_remote_server_view(
    view_func: Callable[Concatenate[HttpRequest, RemoteZulipServer, ParamT], HttpResponse]
) -> Callable[Concatenate[HttpRequest, ParamT], HttpResponse]:
    """Decorate a view so it only runs for an authenticated remote server.

    The wrapper reads the role and API key via get_basic_credentials(),
    validates them with validate_remote_server(), applies the
    "api_by_remote_server" rate limit, and then calls `view_func` with the
    RemoteZulipServer inserted as its second positional argument.

    Raises (from the wrapper):
        JsonableError: If the role contains "@".
        UnauthorizedError: If validate_remote_server() raises a JsonableError;
            the original message is carried over.
    """

    @wraps(view_func)
    def _wrapped_view_func(
        request: HttpRequest, /, *args: ParamT.args, **kwargs: ParamT.kwargs
    ) -> HttpResponse:
        role, api_key = get_basic_credentials(request)

        # NOTE(review): presumably an "@" means the client sent a user/bot
        # email rather than a server UUID -- confirm.
        if "@" in role:
            raise JsonableError(_("Must validate with valid Zulip server API key"))

        try:
            server = validate_remote_server(request, role, api_key)
        except JsonableError as validation_error:
            # Report every validation failure as an authorization failure.
            raise UnauthorizedError(validation_error.msg)

        rate_limit_remote_server(request, server, domain="api_by_remote_server")
        return view_func(request, server, *args, **kwargs)

    return _wrapped_view_func
def remote_server_dispatch(request: HttpRequest, **kwargs: Any) -> HttpResponse:
    """Resolve the target view for `request` and run it with server authentication.

    If get_target_view_function_or_response() returns an HttpResponse, that
    response is returned directly. Otherwise the resolved view is wrapped
    with authenticated_remote_server_view and called with the same kwargs.
    """
    result = get_target_view_function_or_response(request, kwargs)
    if not isinstance(result, HttpResponse):
        # The second element (view flags) is not used here.
        target_function, _ = result
        authenticated_view = authenticated_remote_server_view(target_function)
        return authenticated_view(request, **kwargs)
    return result
def remote_server_path(
    route: str,
    **handlers: Callable[Concatenate[HttpRequest, RemoteZulipServer, ParamT], HttpResponse],
) -> URLPattern:
    """Build a URL pattern that routes `route` through remote_server_dispatch.

    The `handlers` mapping is attached to the pattern as the extra kwargs
    that Django passes to the view.
    NOTE(review): keys are presumably HTTP method names -- confirm against
    get_target_view_function_or_response.
    """
    url_pattern = path(route, remote_server_dispatch, kwargs=handlers)
    return url_pattern