mirror of
https://github.com/zulip/zulip.git
synced 2025-11-16 11:52:01 +00:00
Move get_events logic into a backend function in event_queue.py.
This commit is somewhat ugly, but its purpose is to be early preparation for splitting Tornado into a queue server and a frontend server, and this code belongs, by and large, in the queue server component.
This commit is contained in:
@@ -17,6 +17,7 @@ import signal
|
|||||||
import tornado
|
import tornado
|
||||||
import random
|
import random
|
||||||
import traceback
|
import traceback
|
||||||
|
from zerver.decorator import RespondAsynchronously, JsonableError
|
||||||
from zerver.lib.cache import cache_get_many, message_cache_key, \
|
from zerver.lib.cache import cache_get_many, message_cache_key, \
|
||||||
user_profile_by_id_cache_key, cache_save_user_profile
|
user_profile_by_id_cache_key, cache_save_user_profile
|
||||||
from zerver.lib.cache_helpers import cache_with_key
|
from zerver.lib.cache_helpers import cache_with_key
|
||||||
@@ -25,6 +26,7 @@ from zerver.middleware import async_request_restart
|
|||||||
from zerver.models import get_client, Message
|
from zerver.models import get_client, Message
|
||||||
from zerver.lib.narrow import build_narrow_filter
|
from zerver.lib.narrow import build_narrow_filter
|
||||||
from zerver.lib.queue import queue_json_publish
|
from zerver.lib.queue import queue_json_publish
|
||||||
|
from zerver.lib.response import json_success, json_error
|
||||||
from zerver.lib.timestamp import timestamp_to_datetime
|
from zerver.lib.timestamp import timestamp_to_datetime
|
||||||
import copy
|
import copy
|
||||||
|
|
||||||
@@ -432,6 +434,47 @@ def setup_event_queue():
|
|||||||
|
|
||||||
send_restart_events()
|
send_restart_events()
|
||||||
|
|
||||||
|
def fetch_events(user_profile_id, user_profile_realm_id, user_profile_email,
|
||||||
|
queue_id, last_event_id, event_types, user_client, apply_markdown,
|
||||||
|
all_public_streams, lifespan_secs, narrow, dont_block, handler):
|
||||||
|
was_connected = False
|
||||||
|
orig_queue_id = queue_id
|
||||||
|
extra_log_data = ""
|
||||||
|
if queue_id is None:
|
||||||
|
if dont_block:
|
||||||
|
client = allocate_client_descriptor(user_profile_id, user_profile_realm_id,
|
||||||
|
event_types, user_client, apply_markdown,
|
||||||
|
all_public_streams, lifespan_secs,
|
||||||
|
narrow=narrow)
|
||||||
|
queue_id = client.event_queue.id
|
||||||
|
else:
|
||||||
|
raise JsonableError("Missing 'queue_id' argument")
|
||||||
|
else:
|
||||||
|
if last_event_id is None:
|
||||||
|
raise JsonableError("Missing 'last_event_id' argument")
|
||||||
|
client = get_client_descriptor(queue_id)
|
||||||
|
if client is None:
|
||||||
|
raise JsonableError("Bad event queue id: %s" % (queue_id,))
|
||||||
|
if user_profile_id != client.user_profile_id:
|
||||||
|
raise JsonableError("You are not authorized to get events from this queue")
|
||||||
|
client.event_queue.prune(last_event_id)
|
||||||
|
was_connected = client.finish_current_handler()
|
||||||
|
|
||||||
|
if not client.event_queue.empty() or dont_block:
|
||||||
|
ret = {'events': client.event_queue.contents()}
|
||||||
|
if orig_queue_id is None:
|
||||||
|
ret['queue_id'] = queue_id
|
||||||
|
extra_log_data = "[%s/%s]" % (queue_id, len(ret["events"]))
|
||||||
|
if was_connected:
|
||||||
|
extra_log_data += " [was connected]"
|
||||||
|
return (json_success(ret), extra_log_data)
|
||||||
|
|
||||||
|
if was_connected:
|
||||||
|
logging.info("Disconnected handler for queue %s (%s/%s)" % (queue_id, user_profile_email,
|
||||||
|
user_client.name))
|
||||||
|
client.connect_handler(handler)
|
||||||
|
return (RespondAsynchronously, None)
|
||||||
|
|
||||||
# The following functions are called from Django
|
# The following functions are called from Django
|
||||||
|
|
||||||
# Workaround to support the Python-requests 1.0 transition of .json
|
# Workaround to support the Python-requests 1.0 transition of .json
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ from zerver.decorator import asynchronous, \
|
|||||||
from zerver.lib.response import json_success, json_error
|
from zerver.lib.response import json_success, json_error
|
||||||
from zerver.lib.validator import check_bool, check_list, check_string
|
from zerver.lib.validator import check_bool, check_list, check_string
|
||||||
from zerver.lib.event_queue import allocate_client_descriptor, get_client_descriptor, \
|
from zerver.lib.event_queue import allocate_client_descriptor, get_client_descriptor, \
|
||||||
process_notification
|
process_notification, fetch_events
|
||||||
from zerver.lib.narrow import check_supported_events_narrow_filter
|
from zerver.lib.narrow import check_supported_events_narrow_filter
|
||||||
|
|
||||||
import ujson
|
import ujson
|
||||||
@@ -54,42 +54,11 @@ def get_events_backend(request, user_profile, handler = None,
|
|||||||
if user_client is None:
|
if user_client is None:
|
||||||
user_client = request.client
|
user_client = request.client
|
||||||
|
|
||||||
was_connected = False
|
(result, log_data) = fetch_events(
|
||||||
orig_queue_id = queue_id
|
user_profile.id, user_profile.realm_id, user_profile.email, queue_id,
|
||||||
if queue_id is None:
|
last_event_id, event_types, user_client, apply_markdown, all_public_streams,
|
||||||
if dont_block:
|
lifespan_secs, narrow, dont_block, handler)
|
||||||
client = allocate_client_descriptor(user_profile.id, user_profile.realm.id,
|
request._log_data['extra'] = log_data
|
||||||
event_types, user_client, apply_markdown,
|
if result == RespondAsynchronously:
|
||||||
all_public_streams, lifespan_secs,
|
|
||||||
narrow=narrow)
|
|
||||||
queue_id = client.event_queue.id
|
|
||||||
else:
|
|
||||||
return json_error("Missing 'queue_id' argument")
|
|
||||||
else:
|
|
||||||
if last_event_id is None:
|
|
||||||
return json_error("Missing 'last_event_id' argument")
|
|
||||||
client = get_client_descriptor(queue_id)
|
|
||||||
if client is None:
|
|
||||||
return json_error("Bad event queue id: %s" % (queue_id,))
|
|
||||||
if user_profile.id != client.user_profile_id:
|
|
||||||
return json_error("You are not authorized to get events from this queue")
|
|
||||||
client.event_queue.prune(last_event_id)
|
|
||||||
was_connected = client.finish_current_handler()
|
|
||||||
|
|
||||||
if not client.event_queue.empty() or dont_block:
|
|
||||||
ret = {'events': client.event_queue.contents()}
|
|
||||||
if orig_queue_id is None:
|
|
||||||
ret['queue_id'] = queue_id
|
|
||||||
request._log_data['extra'] = "[%s/%s]" % (queue_id, len(ret["events"]))
|
|
||||||
if was_connected:
|
|
||||||
request._log_data['extra'] += " [was connected]"
|
|
||||||
return json_success(ret)
|
|
||||||
|
|
||||||
handler._request = request
|
handler._request = request
|
||||||
if was_connected:
|
return result
|
||||||
logging.info("Disconnected handler for queue %s (%s/%s)" % (queue_id, user_profile.email,
|
|
||||||
user_client.name))
|
|
||||||
client.connect_handler(handler)
|
|
||||||
|
|
||||||
# runtornado recognizes this special return value.
|
|
||||||
return RespondAsynchronously
|
|
||||||
|
|||||||
Reference in New Issue
Block a user