Add server-side event filtering based on type

Clients can now request to receive only certain kinds of events,
although they always receive restart events.

(imported from commit 1e72981f8fe763829ab2abde1e35f94cad5c34e4)
This commit is contained in:
Zev Benjamin
2013-03-22 17:43:49 -04:00
parent f2b1de7940
commit 5b20dcf03c
4 changed files with 34 additions and 19 deletions

View File

@@ -15,10 +15,11 @@ import signal
IDLE_EVENT_QUEUE_TIMEOUT_SECS = 60 * 10
class ClientDescriptor(object):
def __init__(self, user_profile_id, id, apply_markdown=True):
def __init__(self, user_profile_id, id, event_types=None, apply_markdown=True):
self.user_profile_id = user_profile_id
self.current_handler = None
self.event_queue = EventQueue(id)
self.event_types = event_types
self.last_connection_time = time.time()
self.apply_markdown = apply_markdown
@@ -39,6 +40,11 @@ class ClientDescriptor(object):
except socket.error:
pass
def accepts_event_type(self, type):
if self.event_types is None:
return True
return type in self.event_types
def idle(self, now):
self.check_connection()
return (self.current_handler is None
@@ -87,11 +93,11 @@ user_clients = {}
next_queue_id = 0
def allocate_client_descriptor(user_profile_id, apply_markdown):
def allocate_client_descriptor(user_profile_id, event_types, apply_markdown):
global next_queue_id
id = str(settings.SERVER_GENERATION) + ':' + str(next_queue_id)
next_queue_id += 1
client = ClientDescriptor(user_profile_id, id, apply_markdown)
client = ClientDescriptor(user_profile_id, id, event_types, apply_markdown)
clients[id] = client
user_clients.setdefault(user_profile_id, []).append(client)
return client
@@ -151,9 +157,10 @@ def load_event_queues():
% (len(clients), time.time() - start))
def send_restart_events():
for client in clients.itervalues():
event = dict(type='restart', server_generation=settings.SERVER_GENERATION)
client.add_event(event)
for client in clients.itervalues():
# All clients get restart events
client.add_event(event.copy())
def setup_event_queue(io_loop):
load_event_queues()
@@ -174,11 +181,13 @@ def setup_event_queue(io_loop):
# The following functions are called from Django
def request_event_queue(user_profile, apply_markdown):
def request_event_queue(user_profile, apply_markdown, event_types=None):
if settings.TORNADO_SERVER:
req = {'dont_block' : 'true',
'apply_markdown': simplejson.dumps(apply_markdown),
'client' : 'internal'}
if event_types is not None:
req['event_types'] = simplejson.dumps(event_types)
resp = requests.get(settings.TORNADO_SERVER + '/api/v1/events',
auth=requests.auth.HTTPBasicAuth(user_profile.user.email,
user_profile.api_key),
@@ -197,6 +206,7 @@ def get_user_events(user_profile, queue_id, last_event_id):
user_profile.api_key),
params={'queue_id' : queue_id,
'last_event_id': last_event_id,
'dont_block' : 'true',
'client' : 'internal'})
resp.raise_for_status()

View File

@@ -240,9 +240,10 @@ def update_pointer(user_profile_id, new_pointer):
new_pointer=new_pointer,
update_types=["pointer_update"])
for client in user_clients.get(user_profile_id, []):
event = dict(type='pointer', pointer=new_pointer)
client.add_event(event)
for client in user_clients.get(user_profile_id, []):
if client.accepts_event_type(event['type']):
client.add_event(event.copy())
def process_new_message(data):
message = cache_get_message(data['message'])
@@ -251,10 +252,10 @@ def process_new_message(data):
user_receive_message(user_profile_id, message)
for client in user_clients.get(user_profile_id, []):
if client.accepts_event_type('message'):
event = dict(type='message', message=message.to_dict(client.apply_markdown))
client.add_event(event)
if 'stream_name' in data:
stream_receive_message(data['realm_id'], data['stream_name'], message)

View File

@@ -3,7 +3,7 @@ from zephyr.models import UserActivity
from zephyr.decorator import asynchronous, authenticated_api_view, \
authenticated_json_post_view, internal_notify_view, RespondAsynchronously, \
has_request_variables, POST, to_non_negative_int, json_to_bool, \
has_request_variables, POST, to_non_negative_int, json_to_bool, json_to_list, \
JsonableError, authenticated_rest_api_view, REQ
from zephyr.lib.response import json_response, json_success, json_error
@@ -211,10 +211,12 @@ def rest_get_events(request, user_profile, handler,
def get_events_backend(request, user_profile, handler,
last_event_id = REQ(converter=int, default=None),
queue_id = REQ(default=None), apply_markdown=True,
event_types = REQ(default=None, converter=json_to_list),
dont_block = REQ(default=False, converter=json_to_bool)):
if queue_id is None:
if dont_block:
client = allocate_client_descriptor(user_profile.id, apply_markdown)
client = allocate_client_descriptor(user_profile.id, event_types,
apply_markdown)
queue_id = client.event_queue.id
else:
return json_error("Missing 'queue_id' argument")

View File

@@ -1415,16 +1415,18 @@ def api_events_register(request, user_profile,
@has_request_variables
def events_register_backend(request, user_profile, apply_markdown=True,
event_types=POST(converter=json_to_list)):
queue_id = request_event_queue(user_profile, apply_markdown)
event_types=POST(converter=json_to_list, default=None)):
queue_id = request_event_queue(user_profile, apply_markdown, event_types)
if queue_id is None:
return json_error(msg="Could not allocate queue")
ret = {'queue_id': queue_id}
if event_types is not None:
event_types = set(event_types)
# Fetch initial data
if "message" in event_types:
# Fetch initial data. When event_types is not specified, clients
# want all event types.
if event_types is None or "message" in event_types:
# The client should use get_old_messages() to fetch messages
# starting with the max_message_id. They will get messages
# newer than that ID via get_events()
@@ -1433,7 +1435,7 @@ def events_register_backend(request, user_profile, apply_markdown=True,
ret['max_message_id'] = messages[0].id
else:
ret['max_message_id'] = -1
if "pointer" in event_types:
if event_types is None or "pointer" in event_types:
ret['pointer'] = user_profile.pointer
# Apply events that came in while we were fetching initial data