diff --git a/zephyr/lib/event_queue.py b/zephyr/lib/event_queue.py index c1247d8996..2e95f97981 100644 --- a/zephyr/lib/event_queue.py +++ b/zephyr/lib/event_queue.py @@ -11,8 +11,10 @@ import atexit import sys import signal import tornado +import random IDLE_EVENT_QUEUE_TIMEOUT_SECS = 60 * 10 +HEARTBEAT_MIN_FREQ_SECS = 50 class ClientDescriptor(object): def __init__(self, user_profile_id, id, event_types=None, apply_markdown=True): @@ -22,13 +24,17 @@ class ClientDescriptor(object): self.event_types = event_types self.last_connection_time = time.time() self.apply_markdown = apply_markdown + self._timeout_handle = None + + def prepare_for_pickling(self): + self.current_handler = None + self._timeout_handle = None def add_event(self, event): if self.current_handler is not None: self.current_handler._request._time_restarted = time.time() self.event_queue.push(event) - self.check_connection() if self.current_handler is not None: try: self.current_handler.humbug_finish(dict(result='success', msg='', @@ -36,9 +42,9 @@ class ClientDescriptor(object): queue_id=self.event_queue.id), self.current_handler._request, apply_markdown=self.apply_markdown) - return except socket.error: pass + self.disconnect_handler() def accepts_event_type(self, type): if self.event_types is None: @@ -46,21 +52,26 @@ class ClientDescriptor(object): return type in self.event_types def idle(self, now): - self.check_connection() return (self.current_handler is None and now - self.last_connection_time >= IDLE_EVENT_QUEUE_TIMEOUT_SECS) def connect_handler(self, handler): self.current_handler = handler self.last_connection_time = time.time() + def timeout_callback(): + self._timeout_handle = None + # All clients get heartbeat events + self.add_event(dict(type='heartbeat')) + ioloop = tornado.ioloop.IOLoop.instance() + heartbeat_time = time.time() + HEARTBEAT_MIN_FREQ_SECS + random.randint(0, 10) + self._timeout_handle = ioloop.add_timeout(heartbeat_time, timeout_callback) def disconnect_handler(self): self.current_handler = None - - def check_connection(self): - if (self.current_handler is not None - and self.current_handler.request.connection.stream.closed()): - self.current_handler = None + if self._timeout_handle is not None: + ioloop = tornado.ioloop.IOLoop.instance() + ioloop.remove_timeout(self._timeout_handle) + self._timeout_handle = None class EventQueue(object): def __init__(self, id): @@ -133,7 +144,7 @@ def dump_event_queues(): start = time.time() # Remove unpickle-able attributes for client in clients.itervalues(): - client.current_handler = None + client.prepare_for_pickling() with file(PERSISTENT_QUEUE_FILENAME, "w") as stored_queues: pickle.dump(clients, stored_queues)