rest: Extract remote_server_path from rest_path.

This allows us to separate the zilencer paths from other JSON paths,
with explicit type annotation expecting `RemoteZulipServer` as the
second parameter of the handler using
authenticated_remote_server_view.

The test case is also updated to remove a test for a situation that no
longer occurs anymore, since we don't perform subdomain checks on
remote servers.

Signed-off-by: Zixuan James Li <p359101898@gmail.com>
This commit is contained in:
Zixuan James Li
2022-08-01 16:55:13 -04:00
committed by Tim Abbott
parent dd2fd8edda
commit 5c49e4ba06
5 changed files with 68 additions and 38 deletions

View File

@@ -1,14 +1,29 @@
from django.http import HttpRequest
from functools import wraps
from typing import Any, Callable
from django.http import HttpRequest, HttpResponse
from django.urls import path
from django.urls.resolvers import URLPattern
from django.utils.crypto import constant_time_compare
from django.utils.translation import gettext as _
from typing_extensions import Concatenate, ParamSpec
from zerver.decorator import process_client
from zerver.lib.exceptions import ErrorCode, JsonableError, RemoteServerDeactivatedError
from zerver.decorator import get_basic_credentials, process_client
from zerver.lib.exceptions import (
ErrorCode,
JsonableError,
RemoteServerDeactivatedError,
UnauthorizedError,
)
from zerver.lib.rate_limiter import rate_limit
from zerver.lib.request import RequestNotes
from zerver.lib.rest import get_target_view_function_or_response
from zerver.lib.subdomains import get_subdomain
from zerver.models import Realm
from zilencer.models import RemoteZulipServer, get_remote_server_by_uuid
ParamT = ParamSpec("ParamT")
class InvalidZulipServerError(JsonableError):
code = ErrorCode.INVALID_ZULIP_SERVER
@@ -48,3 +63,39 @@ def validate_remote_server(
RequestNotes.get_notes(request).remote_server = remote_server
process_client(request)
return remote_server
def authenticated_remote_server_view(
view_func: Callable[Concatenate[HttpRequest, RemoteZulipServer, ParamT], HttpResponse]
) -> Callable[Concatenate[HttpRequest, ParamT], HttpResponse]:
@wraps(view_func)
def _wrapped_view_func(
request: HttpRequest, /, *args: ParamT.args, **kwargs: ParamT.kwargs
) -> HttpResponse:
role, api_key = get_basic_credentials(request)
if "@" in role:
raise JsonableError(_("Must validate with valid Zulip server API key"))
try:
remote_server = validate_remote_server(request, role, api_key)
except JsonableError as e:
raise UnauthorizedError(e.msg)
rate_limit(request)
return view_func(request, remote_server, *args, **kwargs)
return _wrapped_view_func
def remote_server_dispatch(request: HttpRequest, **kwargs: Any) -> HttpResponse:
result = get_target_view_function_or_response(request, kwargs)
if isinstance(result, HttpResponse):
return result
target_function, view_flags = result
return authenticated_remote_server_view(target_function)(request, **kwargs)
def remote_server_path(
route: str,
**handlers: Callable[Concatenate[HttpRequest, RemoteZulipServer, ParamT], HttpResponse],
) -> URLPattern:
return path(route, remote_server_dispatch, handlers)