mirror of
https://github.com/zulip/zulip.git
synced 2025-11-06 15:03:34 +00:00
Send heartbeat events to all clients
This effectively implements a server-side timeout for get_events (imported from commit 4151562358329a6a5662f219ab2a9241e54ad585)
This commit is contained in:
@@ -11,8 +11,10 @@ import atexit
|
|||||||
import sys
|
import sys
|
||||||
import signal
|
import signal
|
||||||
import tornado
|
import tornado
|
||||||
|
import random
|
||||||
|
|
||||||
IDLE_EVENT_QUEUE_TIMEOUT_SECS = 60 * 10
|
IDLE_EVENT_QUEUE_TIMEOUT_SECS = 60 * 10
|
||||||
|
HEARTBEAT_MIN_FREQ_SECS = 50
|
||||||
|
|
||||||
class ClientDescriptor(object):
|
class ClientDescriptor(object):
|
||||||
def __init__(self, user_profile_id, id, event_types=None, apply_markdown=True):
|
def __init__(self, user_profile_id, id, event_types=None, apply_markdown=True):
|
||||||
@@ -22,13 +24,17 @@ class ClientDescriptor(object):
|
|||||||
self.event_types = event_types
|
self.event_types = event_types
|
||||||
self.last_connection_time = time.time()
|
self.last_connection_time = time.time()
|
||||||
self.apply_markdown = apply_markdown
|
self.apply_markdown = apply_markdown
|
||||||
|
self._timeout_handle = None
|
||||||
|
|
||||||
|
def prepare_for_pickling(self):
|
||||||
|
self.current_handler = None
|
||||||
|
self._timeout_handle = None
|
||||||
|
|
||||||
def add_event(self, event):
|
def add_event(self, event):
|
||||||
if self.current_handler is not None:
|
if self.current_handler is not None:
|
||||||
self.current_handler._request._time_restarted = time.time()
|
self.current_handler._request._time_restarted = time.time()
|
||||||
|
|
||||||
self.event_queue.push(event)
|
self.event_queue.push(event)
|
||||||
self.check_connection()
|
|
||||||
if self.current_handler is not None:
|
if self.current_handler is not None:
|
||||||
try:
|
try:
|
||||||
self.current_handler.humbug_finish(dict(result='success', msg='',
|
self.current_handler.humbug_finish(dict(result='success', msg='',
|
||||||
@@ -36,9 +42,9 @@ class ClientDescriptor(object):
|
|||||||
queue_id=self.event_queue.id),
|
queue_id=self.event_queue.id),
|
||||||
self.current_handler._request,
|
self.current_handler._request,
|
||||||
apply_markdown=self.apply_markdown)
|
apply_markdown=self.apply_markdown)
|
||||||
return
|
|
||||||
except socket.error:
|
except socket.error:
|
||||||
pass
|
pass
|
||||||
|
self.disconnect_handler()
|
||||||
|
|
||||||
def accepts_event_type(self, type):
|
def accepts_event_type(self, type):
|
||||||
if self.event_types is None:
|
if self.event_types is None:
|
||||||
@@ -46,21 +52,26 @@ class ClientDescriptor(object):
|
|||||||
return type in self.event_types
|
return type in self.event_types
|
||||||
|
|
||||||
def idle(self, now):
|
def idle(self, now):
|
||||||
self.check_connection()
|
|
||||||
return (self.current_handler is None
|
return (self.current_handler is None
|
||||||
and now - self.last_connection_time >= IDLE_EVENT_QUEUE_TIMEOUT_SECS)
|
and now - self.last_connection_time >= IDLE_EVENT_QUEUE_TIMEOUT_SECS)
|
||||||
|
|
||||||
def connect_handler(self, handler):
|
def connect_handler(self, handler):
|
||||||
self.current_handler = handler
|
self.current_handler = handler
|
||||||
self.last_connection_time = time.time()
|
self.last_connection_time = time.time()
|
||||||
|
def timeout_callback():
|
||||||
|
self._timeout_handle = None
|
||||||
|
# All clients get heartbeat events
|
||||||
|
self.add_event(dict(type='heartbeat'))
|
||||||
|
ioloop = tornado.ioloop.IOLoop.instance()
|
||||||
|
heartbeat_time = time.time() + HEARTBEAT_MIN_FREQ_SECS + random.randint(0, 10)
|
||||||
|
self._timeout_handle = ioloop.add_timeout(heartbeat_time, timeout_callback)
|
||||||
|
|
||||||
def disconnect_handler(self):
|
def disconnect_handler(self):
|
||||||
self.current_handler = None
|
self.current_handler = None
|
||||||
|
if self._timeout_handle is not None:
|
||||||
def check_connection(self):
|
ioloop = tornado.ioloop.IOLoop.instance()
|
||||||
if (self.current_handler is not None
|
ioloop.remove_timeout(self._timeout_handle)
|
||||||
and self.current_handler.request.connection.stream.closed()):
|
self._timeout_handle = None
|
||||||
self.current_handler = None
|
|
||||||
|
|
||||||
class EventQueue(object):
|
class EventQueue(object):
|
||||||
def __init__(self, id):
|
def __init__(self, id):
|
||||||
@@ -133,7 +144,7 @@ def dump_event_queues():
|
|||||||
start = time.time()
|
start = time.time()
|
||||||
# Remove unpickle-able attributes
|
# Remove unpickle-able attributes
|
||||||
for client in clients.itervalues():
|
for client in clients.itervalues():
|
||||||
client.current_handler = None
|
client.prepare_for_pickling()
|
||||||
|
|
||||||
with file(PERSISTENT_QUEUE_FILENAME, "w") as stored_queues:
|
with file(PERSISTENT_QUEUE_FILENAME, "w") as stored_queues:
|
||||||
pickle.dump(clients, stored_queues)
|
pickle.dump(clients, stored_queues)
|
||||||
|
|||||||
Reference in New Issue
Block a user