Let clients specify how long queues shall live, within limits

(imported from commit 86609f6611ed37b45b28c31e541528ce260d62c8)
This commit is contained in:
Leo Franchi
2013-08-05 16:09:12 -04:00
parent 59041d993b
commit 26cd96f132
4 changed files with 27 additions and 12 deletions

View File

@@ -23,6 +23,11 @@ from zerver.models import get_client
# due to the accumulation of message data in those queues.
IDLE_EVENT_QUEUE_TIMEOUT_SECS = 60 * 10
EVENT_QUEUE_GC_FREQ_MSECS = 1000 * 60 * 5
# Capped limit for how long a client can request an event queue
# to live
MAX_QUEUE_TIMEOUT_SECS = 7 * 24 * 60 * 60
# The heartbeats effectively act as a server-side timeout for
# get_events(). The actual timeout value is randomized for each
# client connection based on the below value. We ensure that the
@@ -32,16 +37,20 @@ HEARTBEAT_MIN_FREQ_SECS = 45
class ClientDescriptor(object):
def __init__(self, user_profile_id, id, event_types, client_type,
apply_markdown=True):
apply_markdown=True, lifespan_secs=0):
self.user_profile_id = user_profile_id
self.current_handler = None
self.event_queue = EventQueue(id)
self.queue_timeout = lifespan_secs
self.event_types = event_types
self.last_connection_time = time.time()
self.apply_markdown = apply_markdown
self.client_type = client_type
self._timeout_handle = None
# Clamp queue_timeout to between minimum and maximum timeouts
self.queue_timeout = max(IDLE_EVENT_QUEUE_TIMEOUT_SECS, min(self.queue_timeout, MAX_QUEUE_TIMEOUT_SECS))
def prepare_for_pickling(self):
self.current_handler = None
self._timeout_handle = None
@@ -68,8 +77,11 @@ class ClientDescriptor(object):
return type in self.event_types
def idle(self, now):
if not hasattr(self, 'queue_timeout'):
self.queue_timeout = IDLE_EVENT_QUEUE_TIMEOUT_SECS
return (self.current_handler is None
and now - self.last_connection_time >= IDLE_EVENT_QUEUE_TIMEOUT_SECS)
and now - self.last_connection_time >= self.queue_timeout)
def connect_handler(self, handler):
self.current_handler = handler
@@ -137,12 +149,12 @@ def get_client_descriptors_for_user(user_profile_id):
return user_clients.get(user_profile_id, [])
def allocate_client_descriptor(user_profile_id, event_types, client_type,
apply_markdown):
apply_markdown, lifespan_secs):
global next_queue_id
id = str(settings.SERVER_GENERATION) + ':' + str(next_queue_id)
next_queue_id += 1
client = ClientDescriptor(user_profile_id, id, event_types, client_type,
apply_markdown)
apply_markdown, lifespan_secs)
clients[id] = client
user_clients.setdefault(user_profile_id, []).append(client)
return client
@@ -244,12 +256,13 @@ def extract_json_response(resp):
return resp.json
def request_event_queue(user_profile, user_client, apply_markdown,
event_types=None):
queue_lifespan_secs, event_types=None):
if settings.TORNADO_SERVER:
req = {'dont_block' : 'true',
'apply_markdown': ujson.dumps(apply_markdown),
'client' : 'internal',
'user_client' : user_client.name}
'user_client' : user_client.name,
'lifespan_secs' : queue_lifespan_secs}
if event_types is not None:
req['event_types'] = ujson.dumps(event_types)
resp = requests.get(settings.TORNADO_SERVER + '/api/v1/events',