mirror of
https://github.com/zulip/zulip.git
synced 2025-11-05 06:23:38 +00:00
Track Tornado handlers by uniquely assigned IDs rather than objects.
This is early preparation for splitting apart Tornado into a queue server and a frontend server.
This commit is contained in:
@@ -21,6 +21,7 @@ from zerver.decorator import RespondAsynchronously, JsonableError
|
|||||||
from zerver.lib.cache import cache_get_many, message_cache_key, \
|
from zerver.lib.cache import cache_get_many, message_cache_key, \
|
||||||
user_profile_by_id_cache_key, cache_save_user_profile
|
user_profile_by_id_cache_key, cache_save_user_profile
|
||||||
from zerver.lib.cache_helpers import cache_with_key
|
from zerver.lib.cache_helpers import cache_with_key
|
||||||
|
from zerver.lib.handlers import get_handler_by_id
|
||||||
from zerver.lib.utils import statsd
|
from zerver.lib.utils import statsd
|
||||||
from zerver.middleware import async_request_restart
|
from zerver.middleware import async_request_restart
|
||||||
from zerver.models import get_client, Message
|
from zerver.models import get_client, Message
|
||||||
@@ -57,7 +58,7 @@ class ClientDescriptor(object):
|
|||||||
# Additionally, the to_dict and from_dict methods must be updated
|
# Additionally, the to_dict and from_dict methods must be updated
|
||||||
self.user_profile_id = user_profile_id
|
self.user_profile_id = user_profile_id
|
||||||
self.realm_id = realm_id
|
self.realm_id = realm_id
|
||||||
self.current_handler = None
|
self.current_handler_id = None
|
||||||
self.event_queue = event_queue
|
self.event_queue = event_queue
|
||||||
self.queue_timeout = lifespan_secs
|
self.queue_timeout = lifespan_secs
|
||||||
self.event_types = event_types
|
self.event_types = event_types
|
||||||
@@ -97,30 +98,32 @@ class ClientDescriptor(object):
|
|||||||
return ret
|
return ret
|
||||||
|
|
||||||
def prepare_for_pickling(self):
|
def prepare_for_pickling(self):
|
||||||
self.current_handler = None
|
self.current_handler_id = None
|
||||||
self._timeout_handle = None
|
self._timeout_handle = None
|
||||||
|
|
||||||
def add_event(self, event):
|
def add_event(self, event):
|
||||||
if self.current_handler is not None:
|
if self.current_handler_id is not None:
|
||||||
async_request_restart(self.current_handler._request)
|
handler = get_handler_by_id(self.current_handler_id)
|
||||||
|
async_request_restart(handler._request)
|
||||||
|
|
||||||
self.event_queue.push(event)
|
self.event_queue.push(event)
|
||||||
self.finish_current_handler()
|
self.finish_current_handler()
|
||||||
|
|
||||||
def finish_current_handler(self):
|
def finish_current_handler(self):
|
||||||
if self.current_handler is not None:
|
if self.current_handler_id is not None:
|
||||||
err_msg = "Got error finishing handler for queue %s" % (self.event_queue.id,)
|
err_msg = "Got error finishing handler for queue %s" % (self.event_queue.id,)
|
||||||
try:
|
try:
|
||||||
# We call async_request_restart here in case we are
|
# We call async_request_restart here in case we are
|
||||||
# being finished without any events (because another
|
# being finished without any events (because another
|
||||||
# get_events request has supplanted this request)
|
# get_events request has supplanted this request)
|
||||||
async_request_restart(self.current_handler._request)
|
handler = get_handler_by_id(self.current_handler_id)
|
||||||
self.current_handler._request._log_data['extra'] = "[%s/1]" % (self.event_queue.id,)
|
request = handler._request
|
||||||
self.current_handler.zulip_finish(dict(result='success', msg='',
|
async_request_restart(request)
|
||||||
|
request._log_data['extra'] = "[%s/1]" % (self.event_queue.id,)
|
||||||
|
handler.zulip_finish(dict(result='success', msg='',
|
||||||
events=self.event_queue.contents(),
|
events=self.event_queue.contents(),
|
||||||
queue_id=self.event_queue.id),
|
queue_id=self.event_queue.id),
|
||||||
self.current_handler._request,
|
request, apply_markdown=self.apply_markdown)
|
||||||
apply_markdown=self.apply_markdown)
|
|
||||||
except IOError as e:
|
except IOError as e:
|
||||||
if e.message != 'Stream is closed':
|
if e.message != 'Stream is closed':
|
||||||
logging.exception(err_msg)
|
logging.exception(err_msg)
|
||||||
@@ -149,11 +152,12 @@ class ClientDescriptor(object):
|
|||||||
if not hasattr(self, 'queue_timeout'):
|
if not hasattr(self, 'queue_timeout'):
|
||||||
self.queue_timeout = IDLE_EVENT_QUEUE_TIMEOUT_SECS
|
self.queue_timeout = IDLE_EVENT_QUEUE_TIMEOUT_SECS
|
||||||
|
|
||||||
return (self.current_handler is None
|
return (self.current_handler_id is None
|
||||||
and now - self.last_connection_time >= self.queue_timeout)
|
and now - self.last_connection_time >= self.queue_timeout)
|
||||||
|
|
||||||
def connect_handler(self, handler):
|
def connect_handler(self, handler_id):
|
||||||
self.current_handler = handler
|
self.current_handler_id = handler_id
|
||||||
|
handler = get_handler_by_id(self.current_handler_id)
|
||||||
handler.client_descriptor = self
|
handler.client_descriptor = self
|
||||||
self.last_connection_time = time.time()
|
self.last_connection_time = time.time()
|
||||||
def timeout_callback():
|
def timeout_callback():
|
||||||
@@ -166,13 +170,14 @@ class ClientDescriptor(object):
|
|||||||
self._timeout_handle = ioloop.add_timeout(heartbeat_time, timeout_callback)
|
self._timeout_handle = ioloop.add_timeout(heartbeat_time, timeout_callback)
|
||||||
|
|
||||||
def disconnect_handler(self, client_closed=False):
|
def disconnect_handler(self, client_closed=False):
|
||||||
if self.current_handler:
|
if self.current_handler_id:
|
||||||
self.current_handler.client_descriptor = None
|
handler = get_handler_by_id(self.current_handler_id)
|
||||||
|
request = handler._request
|
||||||
|
handler.client_descriptor = None
|
||||||
if client_closed:
|
if client_closed:
|
||||||
request = self.current_handler._request
|
|
||||||
logging.info("Client disconnected for queue %s (%s via %s)" % \
|
logging.info("Client disconnected for queue %s (%s via %s)" % \
|
||||||
(self.event_queue.id, request._email, request.client.name))
|
(self.event_queue.id, request._email, request.client.name))
|
||||||
self.current_handler = None
|
self.current_handler_id = None
|
||||||
if self._timeout_handle is not None:
|
if self._timeout_handle is not None:
|
||||||
ioloop = tornado.ioloop.IOLoop.instance()
|
ioloop = tornado.ioloop.IOLoop.instance()
|
||||||
ioloop.remove_timeout(self._timeout_handle)
|
ioloop.remove_timeout(self._timeout_handle)
|
||||||
@@ -436,7 +441,7 @@ def setup_event_queue():
|
|||||||
|
|
||||||
def fetch_events(user_profile_id, user_profile_realm_id, user_profile_email,
|
def fetch_events(user_profile_id, user_profile_realm_id, user_profile_email,
|
||||||
queue_id, last_event_id, event_types, user_client, apply_markdown,
|
queue_id, last_event_id, event_types, user_client, apply_markdown,
|
||||||
all_public_streams, lifespan_secs, narrow, dont_block, handler):
|
all_public_streams, lifespan_secs, narrow, dont_block, handler_id):
|
||||||
was_connected = False
|
was_connected = False
|
||||||
orig_queue_id = queue_id
|
orig_queue_id = queue_id
|
||||||
extra_log_data = ""
|
extra_log_data = ""
|
||||||
@@ -472,7 +477,7 @@ def fetch_events(user_profile_id, user_profile_realm_id, user_profile_email,
|
|||||||
if was_connected:
|
if was_connected:
|
||||||
logging.info("Disconnected handler for queue %s (%s/%s)" % (queue_id, user_profile_email,
|
logging.info("Disconnected handler for queue %s (%s/%s)" % (queue_id, user_profile_email,
|
||||||
user_client.name))
|
user_client.name))
|
||||||
client.connect_handler(handler)
|
client.connect_handler(handler_id)
|
||||||
return (RespondAsynchronously, None)
|
return (RespondAsynchronously, None)
|
||||||
|
|
||||||
# The following functions are called from Django
|
# The following functions are called from Django
|
||||||
|
|||||||
12
zerver/lib/handlers.py
Normal file
12
zerver/lib/handlers.py
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
current_handler_id = 0
|
||||||
|
handlers = {}
|
||||||
|
|
||||||
|
def get_handler_by_id(handler_id):
|
||||||
|
return handlers[handler_id]
|
||||||
|
|
||||||
|
def allocate_handler_id(handler):
|
||||||
|
global current_handler_id
|
||||||
|
handlers[current_handler_id] = handler
|
||||||
|
ret = current_handler_id
|
||||||
|
current_handler_id += 1
|
||||||
|
return ret
|
||||||
@@ -11,6 +11,7 @@ from zerver.lib.response import json_success, json_error
|
|||||||
from zerver.lib.validator import check_bool, check_list, check_string
|
from zerver.lib.validator import check_bool, check_list, check_string
|
||||||
from zerver.lib.event_queue import allocate_client_descriptor, get_client_descriptor, \
|
from zerver.lib.event_queue import allocate_client_descriptor, get_client_descriptor, \
|
||||||
process_notification, fetch_events
|
process_notification, fetch_events
|
||||||
|
from zerver.lib.handlers import allocate_handler_id
|
||||||
from zerver.lib.narrow import check_supported_events_narrow_filter
|
from zerver.lib.narrow import check_supported_events_narrow_filter
|
||||||
|
|
||||||
import ujson
|
import ujson
|
||||||
@@ -54,10 +55,11 @@ def get_events_backend(request, user_profile, handler = None,
|
|||||||
if user_client is None:
|
if user_client is None:
|
||||||
user_client = request.client
|
user_client = request.client
|
||||||
|
|
||||||
|
handler_id = allocate_handler_id(handler)
|
||||||
(result, log_data) = fetch_events(
|
(result, log_data) = fetch_events(
|
||||||
user_profile.id, user_profile.realm_id, user_profile.email, queue_id,
|
user_profile.id, user_profile.realm_id, user_profile.email, queue_id,
|
||||||
last_event_id, event_types, user_client, apply_markdown, all_public_streams,
|
last_event_id, event_types, user_client, apply_markdown, all_public_streams,
|
||||||
lifespan_secs, narrow, dont_block, handler)
|
lifespan_secs, narrow, dont_block, handler_id)
|
||||||
request._log_data['extra'] = log_data
|
request._log_data['extra'] = log_data
|
||||||
if result == RespondAsynchronously:
|
if result == RespondAsynchronously:
|
||||||
handler._request = request
|
handler._request = request
|
||||||
|
|||||||
Reference in New Issue
Block a user