import logging from functools import lru_cache from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Union import requests import ujson from django.conf import settings from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry from zerver.lib.queue import queue_json_publish from zerver.models import Client, Realm, UserProfile from zerver.tornado.event_queue import process_notification from zerver.tornado.sharding import get_tornado_port, get_tornado_uri, notify_tornado_queue_name @lru_cache(None) def requests_client() -> requests.Session: c = requests.Session() if settings.TORNADO_SERVER: if any(host in settings.TORNADO_SERVER for host in ['127.0.0.1', 'localhost']): # Disable trusting the environment, so requests don't # go through any env-configured external proxy. c.trust_env = False # During restarts, the tornado server may be down briefly retry = Retry( total=3, backoff_factor=1, ) adapter = HTTPAdapter(max_retries=retry) for scheme in ("https://", "http://"): c.mount(scheme, adapter) return c def request_event_queue(user_profile: UserProfile, user_client: Client, apply_markdown: bool, client_gravatar: bool, slim_presence: bool, queue_lifespan_secs: int, event_types: Optional[Iterable[str]]=None, all_public_streams: bool=False, narrow: Iterable[Sequence[str]]=[], bulk_message_deletion: bool=False) -> Optional[str]: if not settings.TORNADO_SERVER: return None tornado_uri = get_tornado_uri(user_profile.realm) req = {'dont_block': 'true', 'apply_markdown': ujson.dumps(apply_markdown), 'client_gravatar': ujson.dumps(client_gravatar), 'slim_presence': ujson.dumps(slim_presence), 'all_public_streams': ujson.dumps(all_public_streams), 'client': 'internal', 'user_profile_id': user_profile.id, 'user_client': user_client.name, 'narrow': ujson.dumps(narrow), 'secret': settings.SHARED_SECRET, 'lifespan_secs': queue_lifespan_secs, 'bulk_message_deletion': ujson.dumps(bulk_message_deletion)} if event_types is not None: req['event_types'] = ujson.dumps(event_types) try: resp = requests_client().post(tornado_uri + '/api/v1/events/internal', data=req) except requests.adapters.ConnectionError: logging.error('Tornado server does not seem to be running, check %s ' 'and %s for more information.', settings.ERROR_FILE_LOG_PATH, "tornado.log") raise requests.adapters.ConnectionError( f"Django cannot connect to Tornado server ({tornado_uri}); try restarting") resp.raise_for_status() return resp.json()['queue_id'] def get_user_events(user_profile: UserProfile, queue_id: str, last_event_id: int) -> List[Dict[str, Any]]: if not settings.TORNADO_SERVER: return [] tornado_uri = get_tornado_uri(user_profile.realm) post_data: Dict[str, Any] = { 'queue_id': queue_id, 'last_event_id': last_event_id, 'dont_block': 'true', 'user_profile_id': user_profile.id, 'secret': settings.SHARED_SECRET, 'client': 'internal', } resp = requests_client().post(tornado_uri + '/api/v1/events/internal', data=post_data) resp.raise_for_status() return resp.json()['events'] def send_notification_http(realm: Realm, data: Mapping[str, Any]) -> None: if not settings.TORNADO_SERVER or settings.RUNNING_INSIDE_TORNADO: process_notification(data) else: tornado_uri = get_tornado_uri(realm) requests_client().post( tornado_uri + "/notify_tornado", data=dict(data=ujson.dumps(data), secret=settings.SHARED_SECRET), ) def send_event(realm: Realm, event: Mapping[str, Any], users: Union[Iterable[int], Iterable[Mapping[str, Any]]]) -> None: """`users` is a list of user IDs, or in the case of `message` type events, a list of dicts describing the users and metadata about the user/message pair.""" port = get_tornado_port(realm) queue_json_publish(notify_tornado_queue_name(port), dict(event=event, users=list(users)), lambda *args, **kwargs: send_notification_http(realm, *args, **kwargs))