mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			288 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			288 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import logging
from collections.abc import Collection
from contextlib import suppress
from typing import Any, Optional
from urllib.parse import unquote

import tornado.ioloop
import tornado.web
from asgiref.sync import sync_to_async
from django import http
from django.core import signals
from django.core.handlers import base
from django.core.handlers.wsgi import WSGIRequest, get_script_name
from django.http import HttpRequest, HttpResponse
from django.urls import set_script_prefix
from django.utils.cache import patch_vary_headers
from tornado.iostream import StreamClosedError
from tornado.wsgi import WSGIContainer
from typing_extensions import override

from zerver.lib.response import AsynchronousResponse, json_response
from zerver.tornado.descriptors import get_descriptor_by_handler_id
 | 
						|
 | 
						|
# Counter used by allocate_handler_id; it only ever increases, so it is
# always the next ID that will be handed out.
current_handler_id: int = 0

# Registry of live request handlers keyed by handler ID.  Entries are
# added by allocate_handler_id and removed by clear_handler_by_id.
handlers: dict[int, "AsyncDjangoHandler"] = {}

# A WSGIContainer around a do-nothing WSGI app.  In this file it is only
# used for its environ() method, to turn a Tornado request into a WSGI
# environ dict that Django's WSGIRequest can consume.
fake_wsgi_container = WSGIContainer(lambda environ, start_response: [])
 | 
						|
 | 
						|
 | 
						|
def get_handler_by_id(handler_id: int) -> Optional["AsyncDjangoHandler"]:
    """Return the registered handler for ``handler_id``.

    Returns None when no handler is registered under that ID, for example
    because it has already been cleared.
    """
    try:
        return handlers[handler_id]
    except KeyError:
        return None
 | 
						|
 | 
						|
 | 
						|
def allocate_handler_id(handler: "AsyncDjangoHandler") -> int:
    """Register ``handler`` under a fresh ID and return that ID.

    IDs come from the module-level counter, which is advanced by one on
    every call, so IDs are never reused within a process.
    """
    global current_handler_id
    new_id = current_handler_id
    current_handler_id = new_id + 1
    handlers[new_id] = handler
    return new_id
 | 
						|
 | 
						|
 | 
						|
def clear_handler_by_id(handler_id: int) -> None:
    """Drop the handler registered under ``handler_id`` from the registry.

    Unknown IDs are ignored.  This makes the call safe to repeat, since
    several code paths (such as on_finish and on_connection_close) may
    each try to clear the same handler.
    """
    handlers.pop(handler_id, None)
 | 
						|
 | 
						|
 | 
						|
def handler_stats_string() -> str:
    """Return a one-line summary of the handler registry for diagnostics.

    Note: the "latest ID" reported is the current counter value, i.e. the
    ID the next allocate_handler_id call will hand out.
    """
    live_count = len(handlers)
    return f"{live_count} handlers, latest ID {current_handler_id}"
 | 
						|
 | 
						|
 | 
						|
def finish_handler(handler_id: int, event_queue_id: str, contents: list[dict[str, Any]]) -> None:
    """Schedule the long-polling handler ``handler_id`` to respond with ``contents``.

    If the handler is no longer registered, this does nothing.  Otherwise it:
      - restarts the request's async timers;
      - tags the request's log data with the queue ID, plus the event type
        when exactly one event is delivered;
      - schedules ``handler.zulip_finish`` on the current IOLoop with a
        success payload.

    :param handler_id: ID of the AsyncDjangoHandler to finish.
    :param event_queue_id: ID of the event queue being delivered from.
    :param contents: list of event dicts to return to the client.

    Exceptions are never propagated.  Two exceptions that indicate the
    connection is already closed are ignored:
      - an OSError whose message is "Stream is closed";
      - an AssertionError whose message is "Request closed".
    Any other exception is logged.
    """
    try:
        # We do the import during runtime to avoid cyclic dependency
        # with zerver.lib.request
        from zerver.lib.request import RequestNotes
        from zerver.middleware import async_request_timer_restart

        # The request handler may have been GC'd by a
        # on_connection_close elsewhere already, so we have to check
        # it is still around.
        handler = get_handler_by_id(handler_id)
        if handler is None:
            return
        request = handler._request
        assert request is not None

        # We call async_request_timer_restart here in case we are
        # being finished without any events (because another
        # get_events request has supplanted this request)
        async_request_timer_restart(request)
        log_data = RequestNotes.get_notes(request).log_data
        assert log_data is not None
        if len(contents) != 1:
            log_data["extra"] = f"[{event_queue_id}/1]"
        else:
            log_data["extra"] = f"[{event_queue_id}/1/{contents[0]['type']}]"

        # Respond from the IOLoop rather than inline.  zulip_finish is a
        # coroutine, so any error it raises later is not caught by the
        # except clause below.
        tornado.ioloop.IOLoop.current().add_callback(
            handler.zulip_finish,
            dict(result="success", msg="", events=contents, queue_id=event_queue_id),
            request,
        )
    except Exception as e:
        # Both of these mean the client connection is already gone, which
        # is expected for long-polling and not worth logging.
        if not (
            (isinstance(e, OSError) and str(e) == "Stream is closed")
            or (isinstance(e, AssertionError) and str(e) == "Request closed")
        ):
            logging.exception(
                "Got error finishing handler for queue %s", event_queue_id, stack_info=True
            )
 | 
						|
 | 
						|
 | 
						|
class AsyncDjangoHandler(tornado.web.RequestHandler):
    """Tornado request handler that serves requests through Django.

    Each incoming Tornado request is handled in three steps:
      1. It is converted into a Django HttpRequest.
      2. That request is processed by the Django handler supplied to
         initialize().
      3. The resulting Django HttpResponse is written back through Tornado.

    If Django returns an AsynchronousResponse, the connection is left open
    (long-polling) until zulip_finish() produces the real response.
    """

    handler_id: int

    SUPPORTED_METHODS: Collection[str] = {"GET", "POST", "DELETE"}  # type: ignore[assignment]  # https://github.com/tornadoweb/tornado/pull/3354

    @override
    def initialize(self, django_handler: base.BaseHandler) -> None:
        """Set up per-request state and register this handler by ID.

        :param django_handler: Django handler whose get_response() will
            process the requests received by this handler.
        """
        self.django_handler = django_handler

        # Prevent Tornado from automatically finishing the request
        self._auto_finish = False

        # Handler IDs are allocated here, and the handler ID map must
        # be cleared when the handler finishes its response.  See
        # on_finish and on_connection_close.
        self.handler_id = allocate_handler_id(self)

        # The Django HttpRequest most recently built for this handler by
        # convert_tornado_request_to_django_request; None until then.
        self._request: HttpRequest | None = None

    @override
    def on_finish(self) -> None:
        """Unregister this handler once its response has been finished."""
        # Note that this only runs on _successful_ requests.  If the
        # client closes the connection, see on_connection_close,
        # below.
        clear_handler_by_id(self.handler_id)

    @override
    def __repr__(self) -> str:
        descriptor = get_descriptor_by_handler_id(self.handler_id)
        return f"AsyncDjangoHandler<{self.handler_id}, {descriptor}>"

    async def convert_tornado_request_to_django_request(self) -> HttpRequest:
        """Build a Django HttpRequest from this handler's Tornado request.

        The new request is stored in ``self._request``, tagged with this
        handler's ID, and returned.
        """
        # This takes the WSGI environment that Tornado received (which
        # fully describes the HTTP request that was sent to Tornado)
        # and passes it to Django's WSGIRequest to generate a Django
        # HttpRequest object with the original Tornado request's HTTP
        # headers, parameters, etc.
        environ = fake_wsgi_container.environ(self.request)
        environ["PATH_INFO"] = unquote(environ["PATH_INFO"])

        # Django WSGIRequest setup code that should match logic from
        # Django's WSGIHandler.__call__ before the call to
        # `get_response()`.  WSGIHandler sends request_started with the
        # environ as a keyword argument, so we do the same.
        set_script_prefix(get_script_name(environ))
        await signals.request_started.asend(sender=type(self.django_handler), environ=environ)
        self._request = WSGIRequest(environ)

        # We do the import during runtime to avoid cyclic dependency
        from zerver.lib.request import RequestNotes

        # Provide a way for application code to access this handler
        # given the HttpRequest object.
        RequestNotes.get_notes(self._request).tornado_handler_id = self.handler_id

        return self._request

    async def write_django_response_as_tornado_response(self, response: HttpResponse) -> None:
        """Copy a Django HttpResponse onto this handler and finish it.

        The status, headers, cookies and body are copied.  If the client
        has already closed the stream, the resulting StreamClosedError is
        suppressed.
        """
        # Copy the HTTP status code.
        self.set_status(response.status_code)

        # Copy the HTTP headers; HttpResponse.items() yields them as
        # (name, value) pairs.
        for header_name, header_value in response.items():
            self.set_header(header_name, header_value)

        # Copy any cookies
        if not hasattr(self, "_new_cookies"):
            self._new_cookies: list[http.cookie.SimpleCookie] = []
        self._new_cookies.append(response.cookies)

        # Copy the response content
        self.write(response.content)

        # Close the connection.
        # While writing the response, we might realize that the
        # user already closed the connection; that is fine.
        with suppress(StreamClosedError):
            await self.finish()

    @override
    async def get(self, *args: Any, **kwargs: Any) -> None:
        """Serve a request through Django.

        POST and DELETE are handled here as well.  An AsynchronousResponse
        keeps the connection open for long-polling; any other response is
        written immediately.
        """
        request = await self.convert_tornado_request_to_django_request()
        response = await sync_to_async(
            lambda: self.django_handler.get_response(request), thread_sensitive=True
        )()

        try:
            if isinstance(response, AsynchronousResponse):
                # We import async_request_timer_stop during runtime
                # to avoid cyclic dependency with zerver.lib.request
                from zerver.middleware import async_request_timer_stop

                # For asynchronous requests, this is where we exit
                # without returning the HttpResponse that Django
                # generated back to the user in order to long-poll the
                # connection.  We save some timers here in order to
                # support accurate accounting of the total resources
                # consumed by the request when it eventually returns a
                # response and is logged.
                async_request_timer_stop(request)
            else:
                # For normal/synchronous requests that don't end up
                # long-polling, we just need to write the HTTP
                # response that Django prepared for us via Tornado.
                assert isinstance(response, HttpResponse)
                await self.write_django_response_as_tornado_response(response)
        finally:
            # Tell Django that we're done processing this request on
            # the Django side; this triggers cleanup work like
            # resetting the urlconf and any cache/database
            # connections.
            await sync_to_async(response.close, thread_sensitive=True)()

    @override
    async def post(self, *args: Any, **kwargs: Any) -> None:
        await self.get(*args, **kwargs)

    @override
    async def delete(self, *args: Any, **kwargs: Any) -> None:
        await self.get(*args, **kwargs)

    @override
    def on_connection_close(self) -> None:
        """Handle the client closing its connection.

        Unregisters this handler and, if a client descriptor is attached,
        disconnects it from this handler.
        """
        # If the client goes away, garbage collect the handler (with
        # attached request information), and let the events system know.
        clear_handler_by_id(self.handler_id)
        client_descriptor = get_descriptor_by_handler_id(self.handler_id)
        if client_descriptor is not None:
            client_descriptor.disconnect_handler(client_closed=True)

    async def zulip_finish(self, result_dict: dict[str, Any], old_request: HttpRequest) -> None:
        """Complete a long-polled request by responding with ``result_dict``.

        :param result_dict: response payload.  Its "result" key selects the
            response type, and a value of "error" makes the status 400.
        :param old_request: the original HttpRequest.  Its log data,
            rate-limit notes, user and client information are carried over
            to the new request.
        """
        # Function called when we want to break a long-polled
        # get_events request and return a response to the client.

        # Marshall the response data from result_dict.
        if result_dict["result"] == "error":
            self.set_status(400)

        # The `result` dictionary contains the data we want to return
        # to the client.  We want to do so in a proper Tornado HTTP
        # response after running the Django response middleware (which
        # does things like log the request, add rate-limit headers,
        # etc.).  The Django middleware API expects to receive a fresh
        # HttpRequest object, and so to minimize hacks, our strategy
        # is to create a duplicate Django HttpRequest object, tagged
        # to automatically return our data in its response, and call
        # Django's main self.get_response() handler to generate an
        # HttpResponse with all Django middleware run.
        request = await self.convert_tornado_request_to_django_request()

        # We import RequestNotes during runtime to avoid
        # cyclic import
        from zerver.lib.request import RequestNotes

        request_notes = RequestNotes.get_notes(request)
        old_request_notes = RequestNotes.get_notes(old_request)

        # Add to this new HttpRequest logging data from the processing of
        # the original request; we will need these for logging.
        request_notes.log_data = old_request_notes.log_data
        request_notes.ratelimits_applied += old_request_notes.ratelimits_applied
        request_notes.requester_for_logs = old_request_notes.requester_for_logs
        request.user = old_request.user
        request_notes.client = old_request_notes.client
        request_notes.client_name = old_request_notes.client_name
        request_notes.client_version = old_request_notes.client_version

        # The saved_response attribute, if present, causes
        # rest_dispatch to return the response immediately before
        # doing any work.  This arrangement allows Django's full
        # request/middleware system to run unmodified while avoiding
        # running expensive things like Zulip's authentication code a
        # second time.
        request_notes.saved_response = json_response(
            res_type=result_dict["result"], data=result_dict, status=self.get_status()
        )

        response = await sync_to_async(
            lambda: self.django_handler.get_response(request), thread_sensitive=True
        )()
        try:
            # Explicitly mark requests as varying by cookie, since the
            # middleware will not have seen a session access
            patch_vary_headers(response, ("Cookie",))
            assert isinstance(response, HttpResponse)
            await self.write_django_response_as_tornado_response(response)
        finally:
            # Tell Django we're done processing this request
            await sync_to_async(response.close, thread_sensitive=True)()
 |