Files
zulip/zerver/tornado/django_api.py
Alex Vandiver e5f62d083e tornado: Merge the TORNADO_SERVER and TORNADO_PORTS configs.
Having both of these is confusing; TORNADO_SERVER is used only when
there is one TORNADO_PORT.  Its primary use is actually to be _unset_,
and signal that in-process handling is to be done.

Rename to USING_TORNADO, to parallel the existing USING_RABBITMQ, and
switch the places that used it for its contents to using
TORNADO_PORTS.
2020-09-21 15:36:16 -07:00

129 lines
5.2 KiB
Python

from functools import lru_cache
from typing import Any, Container, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union
from urllib.parse import urlparse
import orjson
import requests
from django.conf import settings
from requests.adapters import ConnectionError, HTTPAdapter
from requests.models import PreparedRequest, Response
from requests.packages.urllib3.util.retry import Retry
from zerver.lib.queue import queue_json_publish
from zerver.models import Client, Realm, UserProfile
from zerver.tornado.event_queue import process_notification
from zerver.tornado.sharding import get_tornado_port, get_tornado_uri, notify_tornado_queue_name
class TornadoAdapter(HTTPAdapter):
def __init__(self) -> None:
# All of the POST requests we make to Tornado are safe to
# retry; allow retries of them, which is not the default.
retry_methods = Retry.DEFAULT_METHOD_WHITELIST | set(['POST'])
retry = Retry(total=3, backoff_factor=1, method_whitelist=retry_methods)
super().__init__(max_retries=retry)
def send(
self,
request: PreparedRequest,
stream: bool = False,
timeout: Union[None, float, Tuple[float, float], Tuple[float, None]] = 0.5,
verify: Union[bool, str] = True,
cert: Union[None, bytes, str, Container[Union[bytes, str]]] = None,
proxies: Optional[Mapping[str, str]] = None,
) -> Response:
if not proxies:
proxies = {}
merged_proxies = {**proxies, "no_proxy": "localhost,127.0.0.1"}
try:
resp = super().send(request, stream=stream, timeout=timeout, verify=verify, cert=cert, proxies=merged_proxies)
except ConnectionError:
parsed_url = urlparse(request.url)
logfile = f"tornado-{parsed_url.port}.log" if settings.TORNADO_PROCESSES > 1 else "tornado.log"
raise ConnectionError(
f"Django cannot connect to Tornado server ({request.url}); "
f"check {settings.ERROR_FILE_LOG_PATH} and {logfile}"
)
resp.raise_for_status()
return resp
@lru_cache(None)
def requests_client() -> requests.Session:
c = requests.Session()
adapter = TornadoAdapter()
for scheme in ("https://", "http://"):
c.mount(scheme, adapter)
return c
def request_event_queue(user_profile: UserProfile, user_client: Client, apply_markdown: bool,
client_gravatar: bool, slim_presence: bool, queue_lifespan_secs: int,
event_types: Optional[Iterable[str]]=None,
all_public_streams: bool=False,
narrow: Iterable[Sequence[str]]=[],
bulk_message_deletion: bool=False) -> Optional[str]:
if not settings.USING_TORNADO:
return None
tornado_uri = get_tornado_uri(user_profile.realm)
req = {'dont_block': 'true',
'apply_markdown': orjson.dumps(apply_markdown),
'client_gravatar': orjson.dumps(client_gravatar),
'slim_presence': orjson.dumps(slim_presence),
'all_public_streams': orjson.dumps(all_public_streams),
'client': 'internal',
'user_profile_id': user_profile.id,
'user_client': user_client.name,
'narrow': orjson.dumps(narrow),
'secret': settings.SHARED_SECRET,
'lifespan_secs': queue_lifespan_secs,
'bulk_message_deletion': orjson.dumps(bulk_message_deletion)}
if event_types is not None:
req['event_types'] = orjson.dumps(event_types)
resp = requests_client().post(
tornado_uri + '/api/v1/events/internal',
data=req
)
return resp.json()['queue_id']
def get_user_events(user_profile: UserProfile, queue_id: str, last_event_id: int) -> List[Dict[str, Any]]:
if not settings.USING_TORNADO:
return []
tornado_uri = get_tornado_uri(user_profile.realm)
post_data: Dict[str, Any] = {
'queue_id': queue_id,
'last_event_id': last_event_id,
'dont_block': 'true',
'user_profile_id': user_profile.id,
'secret': settings.SHARED_SECRET,
'client': 'internal',
}
resp = requests_client().post(
tornado_uri + '/api/v1/events/internal',
data=post_data
)
return resp.json()['events']
def send_notification_http(realm: Realm, data: Mapping[str, Any]) -> None:
if not settings.USING_TORNADO or settings.RUNNING_INSIDE_TORNADO:
process_notification(data)
else:
tornado_uri = get_tornado_uri(realm)
requests_client().post(
tornado_uri + "/notify_tornado",
data=dict(data=orjson.dumps(data), secret=settings.SHARED_SECRET),
)
def send_event(realm: Realm, event: Mapping[str, Any],
users: Union[Iterable[int], Iterable[Mapping[str, Any]]]) -> None:
"""`users` is a list of user IDs, or in the case of `message` type
events, a list of dicts describing the users and metadata about
the user/message pair."""
port = get_tornado_port(realm)
queue_json_publish(notify_tornado_queue_name(port),
dict(event=event, users=list(users)),
lambda *args, **kwargs: send_notification_http(realm, *args, **kwargs))