tornado: Extract functions called from django into one module.

This makes clearer the separation of concerns.
This commit is contained in:
Alex Vandiver
2020-08-10 09:40:38 -07:00
committed by Tim Abbott
parent 33e19fa7d1
commit 39368cad3a
8 changed files with 106 additions and 103 deletions

View File

@@ -112,6 +112,7 @@ not_yet_fully_covered = {path for target in [
# Tornado should ideally have full coverage, but we're not there.
'zerver/tornado/autoreload.py',
'zerver/tornado/descriptors.py',
'zerver/tornado/django_api.py',
'zerver/tornado/event_queue.py',
'zerver/tornado/exceptions.py',
'zerver/tornado/handlers.py',

View File

@@ -229,7 +229,7 @@ from zerver.models import (
stream_name_in_use,
validate_attachment_request,
)
from zerver.tornado.event_queue import send_event
from zerver.tornado.django_api import send_event
if settings.BILLING_ENABLED:
from corporate.lib.stripe import downgrade_now, update_license_ledger_if_needed

View File

@@ -57,7 +57,7 @@ from zerver.models import (
get_realm_domains,
realm_filters_for_realm,
)
from zerver.tornado.event_queue import get_user_events, request_event_queue
from zerver.tornado.django_api import get_user_events, request_event_queue
from zproject.backends import email_auth_enabled, password_auth_enabled

View File

@@ -22,7 +22,7 @@ from zerver.models import (
get_stream_by_id_in_realm,
is_cross_realm_bot_email,
)
from zerver.tornado.event_queue import send_event
from zerver.tornado.django_api import send_event
def get_default_value_for_history_public_to_subscribers(

View File

@@ -53,7 +53,7 @@ from zerver.models import (
get_realm,
get_stream,
)
from zerver.tornado import event_queue
from zerver.tornado import django_api as django_tornado_api
from zerver.tornado.handlers import AsyncDjangoHandler, allocate_handler_id
from zerver.worker import queue_processors
from zproject.backends import ExternalAuthDataDict, ExternalAuthResult
@@ -91,15 +91,15 @@ def simulated_queue_client(client: Callable[..., Any]) -> Iterator[None]:
@contextmanager
def tornado_redirected_to_list(lst: List[Mapping[str, Any]]) -> Iterator[None]:
real_event_queue_process_notification = event_queue.process_notification
event_queue.process_notification = lambda notice: lst.append(notice)
real_event_queue_process_notification = django_tornado_api.process_notification
django_tornado_api.process_notification = lambda notice: lst.append(notice)
# process_notification takes a single parameter called 'notice'.
# lst.append takes a single argument called 'object'.
# Some code might call process_notification using keyword arguments,
# so mypy doesn't allow assigning lst.append to process_notification
# So explicitly change parameter name to 'notice' to work around this problem
yield
event_queue.process_notification = real_event_queue_process_notification
django_tornado_api.process_notification = real_event_queue_process_notification
class EventInfo:
def populate(self, call_args_list: List[Any]) -> None:

View File

@@ -37,7 +37,7 @@ from zerver.models import (
get_user_profile_by_email,
)
# Class with helper functions useful for testing archiving of reactions:
from zerver.tornado.event_queue import send_event
from zerver.tornado.django_api import send_event
ZULIP_REALM_DAYS = 30
MIT_REALM_DAYS = 100

View File

@@ -0,0 +1,96 @@
import logging
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Union
import requests
import ujson
from django.conf import settings
from zerver.lib.queue import queue_json_publish
from zerver.models import Client, Realm, UserProfile
from zerver.tornado.event_queue import process_notification
from zerver.tornado.sharding import get_tornado_port, get_tornado_uri, notify_tornado_queue_name
requests_client = requests.Session()
for host in ['127.0.0.1', 'localhost']:
if settings.TORNADO_SERVER and host in settings.TORNADO_SERVER:
# This seems like the only working solution to ignore proxy in
# requests library.
requests_client.trust_env = False
def request_event_queue(user_profile: UserProfile, user_client: Client, apply_markdown: bool,
client_gravatar: bool, slim_presence: bool, queue_lifespan_secs: int,
event_types: Optional[Iterable[str]]=None,
all_public_streams: bool=False,
narrow: Iterable[Sequence[str]]=[],
bulk_message_deletion: bool=False) -> Optional[str]:
if settings.TORNADO_SERVER:
tornado_uri = get_tornado_uri(user_profile.realm)
req = {'dont_block': 'true',
'apply_markdown': ujson.dumps(apply_markdown),
'client_gravatar': ujson.dumps(client_gravatar),
'slim_presence': ujson.dumps(slim_presence),
'all_public_streams': ujson.dumps(all_public_streams),
'client': 'internal',
'user_profile_id': user_profile.id,
'user_client': user_client.name,
'narrow': ujson.dumps(narrow),
'secret': settings.SHARED_SECRET,
'lifespan_secs': queue_lifespan_secs,
'bulk_message_deletion': ujson.dumps(bulk_message_deletion)}
if event_types is not None:
req['event_types'] = ujson.dumps(event_types)
try:
resp = requests_client.post(tornado_uri + '/api/v1/events/internal',
data=req)
except requests.adapters.ConnectionError:
logging.error('Tornado server does not seem to be running, check %s '
'and %s for more information.',
settings.ERROR_FILE_LOG_PATH, "tornado.log")
raise requests.adapters.ConnectionError(
f"Django cannot connect to Tornado server ({tornado_uri}); try restarting")
resp.raise_for_status()
return resp.json()['queue_id']
return None
def get_user_events(user_profile: UserProfile, queue_id: str, last_event_id: int) -> List[Dict[str, Any]]:
if settings.TORNADO_SERVER:
tornado_uri = get_tornado_uri(user_profile.realm)
post_data: Dict[str, Any] = {
'queue_id': queue_id,
'last_event_id': last_event_id,
'dont_block': 'true',
'user_profile_id': user_profile.id,
'secret': settings.SHARED_SECRET,
'client': 'internal',
}
resp = requests_client.post(tornado_uri + '/api/v1/events/internal',
data=post_data)
resp.raise_for_status()
return resp.json()['events']
return []
def send_notification_http(realm: Realm, data: Mapping[str, Any]) -> None:
if settings.TORNADO_SERVER and not settings.RUNNING_INSIDE_TORNADO:
tornado_uri = get_tornado_uri(realm)
requests_client.post(tornado_uri + '/notify_tornado', data=dict(
data = ujson.dumps(data),
secret = settings.SHARED_SECRET))
else:
process_notification(data)
def send_event(realm: Realm, event: Mapping[str, Any],
users: Union[Iterable[int], Iterable[Mapping[str, Any]]]) -> None:
"""`users` is a list of user IDs, or in the case of `message` type
events, a list of dicts describing the users and metadata about
the user/message pair."""
port = get_tornado_port(realm)
queue_json_publish(notify_tornado_queue_name(port),
dict(event=event, users=list(users)),
lambda *args, **kwargs: send_notification_http(realm, *args, **kwargs))

View File

@@ -27,7 +27,6 @@ from typing import (
cast,
)
import requests
import tornado.ioloop
import ujson
from django.conf import settings
@@ -41,7 +40,7 @@ from zerver.lib.queue import queue_json_publish, retry_event
from zerver.lib.request import JsonableError
from zerver.lib.utils import statsd
from zerver.middleware import async_request_timer_restart
from zerver.models import Client, Realm, UserProfile
from zerver.models import UserProfile
from zerver.tornado.autoreload import add_reload_hook
from zerver.tornado.descriptors import clear_descriptor_by_handler_id, set_descriptor_by_handler_id
from zerver.tornado.exceptions import BadEventQueueIdError
@@ -51,14 +50,6 @@ from zerver.tornado.handlers import (
get_handler_by_id,
handler_stats_string,
)
from zerver.tornado.sharding import get_tornado_port, get_tornado_uri, notify_tornado_queue_name
requests_client = requests.Session()
for host in ['127.0.0.1', 'localhost']:
if settings.TORNADO_SERVER and host in settings.TORNADO_SERVER:
# This seems like the only working solution to ignore proxy in
# requests library.
requests_client.trust_env = False
# The idle timeout used to be a week, but we found that in that
# situation, queues from dead browser sessions would grow quite large
@@ -607,67 +598,6 @@ def fetch_events(query: Mapping[str, Any]) -> Dict[str, Any]:
client.connect_handler(handler_id, client_type_name)
return dict(type="async")
# The following functions are called from Django
def request_event_queue(user_profile: UserProfile, user_client: Client, apply_markdown: bool,
client_gravatar: bool, slim_presence: bool, queue_lifespan_secs: int,
event_types: Optional[Iterable[str]]=None,
all_public_streams: bool=False,
narrow: Iterable[Sequence[str]]=[],
bulk_message_deletion: bool=False) -> Optional[str]:
if settings.TORNADO_SERVER:
tornado_uri = get_tornado_uri(user_profile.realm)
req = {'dont_block': 'true',
'apply_markdown': ujson.dumps(apply_markdown),
'client_gravatar': ujson.dumps(client_gravatar),
'slim_presence': ujson.dumps(slim_presence),
'all_public_streams': ujson.dumps(all_public_streams),
'client': 'internal',
'user_profile_id': user_profile.id,
'user_client': user_client.name,
'narrow': ujson.dumps(narrow),
'secret': settings.SHARED_SECRET,
'lifespan_secs': queue_lifespan_secs,
'bulk_message_deletion': ujson.dumps(bulk_message_deletion)}
if event_types is not None:
req['event_types'] = ujson.dumps(event_types)
try:
resp = requests_client.post(tornado_uri + '/api/v1/events/internal',
data=req)
except requests.adapters.ConnectionError:
logging.error('Tornado server does not seem to be running, check %s '
'and %s for more information.',
settings.ERROR_FILE_LOG_PATH, "tornado.log")
raise requests.adapters.ConnectionError(
f"Django cannot connect to Tornado server ({tornado_uri}); try restarting")
resp.raise_for_status()
return resp.json()['queue_id']
return None
def get_user_events(user_profile: UserProfile, queue_id: str, last_event_id: int) -> List[Dict[str, Any]]:
if settings.TORNADO_SERVER:
tornado_uri = get_tornado_uri(user_profile.realm)
post_data: Dict[str, Any] = {
'queue_id': queue_id,
'last_event_id': last_event_id,
'dont_block': 'true',
'user_profile_id': user_profile.id,
'secret': settings.SHARED_SECRET,
'client': 'internal',
}
resp = requests_client.post(tornado_uri + '/api/v1/events/internal',
data=post_data)
resp.raise_for_status()
return resp.json()['events']
return []
# Send email notifications to idle users
# after they are idle for 1 hour
NOTIFY_AFTER_IDLE_HOURS = 1
@@ -1162,27 +1092,3 @@ def get_wrapped_process_notification(queue_name: str) -> Callable[[Dict[str, Any
retry_event(queue_name, notice, failure_processor)
return wrapped_process_notification
# Runs in the Django process to send a notification to Tornado.
#
# We use JSON rather than bare form parameters, so that we can represent
# different types and for compatibility with non-HTTP transports.
def send_notification_http(realm: Realm, data: Mapping[str, Any]) -> None:
if settings.TORNADO_SERVER and not settings.RUNNING_INSIDE_TORNADO:
tornado_uri = get_tornado_uri(realm)
requests_client.post(tornado_uri + '/notify_tornado', data=dict(
data = ujson.dumps(data),
secret = settings.SHARED_SECRET))
else:
process_notification(data)
def send_event(realm: Realm, event: Mapping[str, Any],
users: Union[Iterable[int], Iterable[Mapping[str, Any]]]) -> None:
"""`users` is a list of user IDs, or in the case of `message` type
events, a list of dicts describing the users and metadata about
the user/message pair."""
port = get_tornado_port(realm)
queue_json_publish(notify_tornado_queue_name(port),
dict(event=event, users=list(users)),
lambda *args, **kwargs: send_notification_http(realm, *args, **kwargs))