Add all_public_streams parameter to register API.

It makes the event queue return all messages on public streams, rather
than only the user's subscriptions. It's meant for use with chat bots.

(imported from commit 12d7e9e9586369efa7e7ff9eb060f25360327f71)
This commit is contained in:
Kevin Mehall
2013-10-17 17:51:25 -04:00
parent 06566b3776
commit 7b8dea3d54
5 changed files with 90 additions and 44 deletions

View File

@@ -1560,9 +1560,9 @@ def get_status_dict(requesting_user_profile):
 def do_events_register(user_profile, user_client, apply_markdown=True,
-                       event_types=None, queue_lifespan_secs=0):
+                       event_types=None, queue_lifespan_secs=0, all_public_streams=False):
     queue_id = request_event_queue(user_profile, user_client, apply_markdown,
-                                   queue_lifespan_secs, event_types)
+                                   queue_lifespan_secs, event_types, all_public_streams)
     if queue_id is None:
         raise JsonableError("Could not allocate event queue")

View File

@@ -36,15 +36,20 @@ MAX_QUEUE_TIMEOUT_SECS = 7 * 24 * 60 * 60
 HEARTBEAT_MIN_FREQ_SECS = 45

 class ClientDescriptor(object):
-    def __init__(self, user_profile_id, id, event_types, client_type,
-                 apply_markdown=True, lifespan_secs=0):
+    def __init__(self, user_profile_id, realm_id, id, event_types, client_type,
+                 apply_markdown=True, all_public_streams=False, lifespan_secs=0):
+        # These objects are pickled on shutdown and restored on restart.
+        # If fields are added or semantics are changed, temporary code must be
+        # added to load_event_queues() to update the restored objects
         self.user_profile_id = user_profile_id
+        self.realm_id = realm_id
         self.current_handler = None
         self.event_queue = EventQueue(id)
         self.queue_timeout = lifespan_secs
         self.event_types = event_types
         self.last_connection_time = time.time()
         self.apply_markdown = apply_markdown
+        self.all_public_streams = all_public_streams
         self.client_type = client_type
         self._timeout_handle = None
@@ -129,6 +134,8 @@ class EventQueue(object):
 clients = {}
 # maps user id to list of client descriptors
 user_clients = {}
+# maps realm id to list of client descriptors with all_public_streams=True
+realm_clients_all_streams = {}

 # list of registered gc hooks.
 # each one will be called with a user profile id, queue, and bool
@@ -148,33 +155,49 @@ def get_client_descriptor(queue_id):
 def get_client_descriptors_for_user(user_profile_id):
     return user_clients.get(user_profile_id, [])

-def allocate_client_descriptor(user_profile_id, event_types, client_type,
-                               apply_markdown, lifespan_secs):
+def get_client_descriptors_for_realm_all_streams(realm_id):
+    return realm_clients_all_streams.get(realm_id, [])
+
+def allocate_client_descriptor(user_profile_id, realm_id, event_types, client_type,
+                               apply_markdown, all_public_streams, lifespan_secs):
     global next_queue_id
     id = str(settings.SERVER_GENERATION) + ':' + str(next_queue_id)
     next_queue_id += 1
-    client = ClientDescriptor(user_profile_id, id, event_types, client_type,
-                              apply_markdown, lifespan_secs)
+    client = ClientDescriptor(user_profile_id, realm_id, id, event_types, client_type,
+                              apply_markdown, all_public_streams, lifespan_secs)
     clients[id] = client
     user_clients.setdefault(user_profile_id, []).append(client)
+    if all_public_streams:
+        realm_clients_all_streams.setdefault(realm_id, []).append(client)
     return client

 def gc_event_queues():
     start = time.time()
     to_remove = set()
     affected_users = set()
+    affected_realms = set()
     for (id, client) in clients.iteritems():
         if client.idle(start):
             to_remove.add(id)
             affected_users.add(client.user_profile_id)
+            affected_realms.add(client.realm_id)
+
+    def filter_client_dict(client_dict, key):
+        if key not in client_dict:
+            return
+
+        new_client_list = filter(lambda c: c.event_queue.id not in to_remove,
+                                 client_dict[key])
+        if len(new_client_list) == 0:
+            del client_dict[key]
+        else:
+            client_dict[key] = new_client_list

     for user_id in affected_users:
-        new_client_list = filter(lambda c: c.event_queue.id not in to_remove,
-                                 user_clients[user_id])
-        if len(new_client_list) == 0:
-            del user_clients[user_id]
-        else:
-            user_clients[user_id] = new_client_list
+        filter_client_dict(user_clients, user_id)
+
+    for realm_id in affected_realms:
+        filter_client_dict(realm_clients_all_streams, realm_id)

     for id in to_remove:
         for cb in gc_hooks:
@@ -210,11 +233,17 @@ def load_event_queues():
             pass

     for client in clients.itervalues():
-        # The following client_type block can be dropped once we've
+        # The following block can be dropped once we've
         # cleared out all our old event queues
-        if not hasattr(client, 'client_type'):
-            client.client_type = get_client("website")
+        if not hasattr(client, 'realm_id') or not hasattr(client, 'all_public_streams'):
+            from zerver.models import get_user_profile_by_id
+            client.realm_id = get_user_profile_by_id(client.user_profile_id).realm.id
+            client.all_public_streams = False
+            logging.info("Tornado updated a queue")

         user_clients.setdefault(client.user_profile_id, []).append(client)
+        if client.all_public_streams:
+            realm_clients_all_streams.setdefault(client.realm_id, []).append(client)

     logging.info('Tornado loaded %d event queues in %.3fs'
                 % (len(clients), time.time() - start))
@@ -256,10 +285,11 @@ def extract_json_response(resp):
         return resp.json

 def request_event_queue(user_profile, user_client, apply_markdown,
-                        queue_lifespan_secs, event_types=None):
+                        queue_lifespan_secs, event_types=None, all_public_streams=False):
     if settings.TORNADO_SERVER:
         req = {'dont_block'    : 'true',
                'apply_markdown': ujson.dumps(apply_markdown),
+               'all_public_streams': ujson.dumps(all_public_streams),
                'client'        : 'internal',
                'user_client'   : user_client.name,
                'lifespan_secs' : queue_lifespan_secs}

View File

@@ -11,7 +11,8 @@ from zerver.lib.cache import cache_get_many, message_cache_key, \
     user_profile_by_id_cache_key, cache_save_user_profile
 from zerver.lib.cache_helpers import cache_save_message
 from zerver.lib.queue import queue_json_publish
-from zerver.lib.event_queue import get_client_descriptors_for_user
+from zerver.lib.event_queue import get_client_descriptors_for_user,\
+    get_client_descriptors_for_realm_all_streams
 from zerver.lib.timestamp import timestamp_to_datetime

 import os
@@ -375,6 +376,13 @@ def process_new_message(data):
     message_dict_markdown = message.to_dict(True)
     message_dict_no_markdown = message.to_dict(False)

+    # To remove duplicate clients: Maps queue ID to (Client, flags)
+    send_to_clients = dict()
+
+    if 'stream_name' in data and not data.get("invite_only"):
+        for client in get_client_descriptors_for_realm_all_streams(data['realm_id']):
+            send_to_clients[client.event_queue.id] = (client, None)
+
     for user_data in data['users']:
         user_profile_id = user_data['id']
         user_profile = user_profiles[user_data['id']]
@@ -383,6 +391,23 @@ def process_new_message(data):
         user_receive_message(user_profile_id, message)

         for client in get_client_descriptors_for_user(user_profile_id):
+            send_to_clients[client.event_queue.id] = (client, flags)
+
+        # If the recipient was offline and the message was a single or group PM to him
+        # or she was @-notified potentially notify more immediately
+        received_pm = message.recipient.type in (Recipient.PERSONAL, Recipient.HUDDLE) and \
+            user_profile_id != message.sender.id
+        mentioned = 'mentioned' in flags
+        if (received_pm or mentioned) and receiver_is_idle(user_profile, realm_presences):
+            if receives_offline_notifications(user_profile):
+                event = build_offline_notification_event(user_profile_id, message.id)
+
+                # We require RabbitMQ to do this, as we can't call the email handler
+                # from the Tornado process. So if there's no rabbitmq support do nothing
+                queue_json_publish("missedmessage_emails", event, lambda event: None)
+
+    for client, flags in send_to_clients.itervalues():
         if not client.accepts_event_type('message'):
             continue
@@ -404,20 +429,6 @@ def process_new_message(data):
             event = dict(type='message', message=message_dict, flags=flags)
         client.add_event(event)

-        # If the recipient was offline and the message was a single or group PM to him
-        # or she was @-notified potentially notify more immediately
-        received_pm = message.recipient.type in (Recipient.PERSONAL, Recipient.HUDDLE) and \
-            user_profile_id != message.sender.id
-        mentioned = 'mentioned' in flags
-        if (received_pm or mentioned) and receiver_is_idle(user_profile, realm_presences):
-            if receives_offline_notifications(user_profile):
-                event = build_offline_notification_event(user_profile_id, message.id)
-
-                # We require RabbitMQ to do this, as we can't call the email handler
-                # from the Tornado process. So if there's no rabbitmq support do nothing
-                queue_json_publish("missedmessage_emails", event, lambda event: None)
-
     if 'stream_name' in data:
         stream_receive_message(data['realm_id'], data['stream_name'], message)

View File

@@ -193,6 +193,7 @@ def get_events_backend(request, user_profile, handler = None,
                        last_event_id = REQ(converter=int, default=None),
                        queue_id = REQ(default=None),
                        apply_markdown = REQ(default=False, converter=json_to_bool),
+                       all_public_streams = REQ(default=False, converter=json_to_bool),
                        event_types = REQ(default=None, converter=json_to_list),
                        dont_block = REQ(default=False, converter=json_to_bool),
                        lifespan_secs = REQ(default=0, converter=int)):
@@ -202,8 +203,9 @@ def get_events_backend(request, user_profile, handler = None,
     orig_queue_id = queue_id
     if queue_id is None:
         if dont_block:
-            client = allocate_client_descriptor(user_profile.id, event_types,
-                                                user_client, apply_markdown, lifespan_secs)
+            client = allocate_client_descriptor(user_profile.id, user_profile.realm.id,
+                                                event_types, user_client, apply_markdown,
+                                                all_public_streams, lifespan_secs)
             queue_id = client.event_queue.id
         else:
             return json_error("Missing 'queue_id' argument")

View File

@@ -2393,16 +2393,19 @@ def json_events_register(request, user_profile):
 # Does not need to be authenticated because it's called from rest_dispatch
 @has_request_variables
 def api_events_register(request, user_profile,
-                        apply_markdown=REQ(default=False, converter=json_to_bool)):
+                        apply_markdown=REQ(default=False, converter=json_to_bool),
+                        all_public_streams=REQ(default=False, converter=json_to_bool)):
     return events_register_backend(request, user_profile,
-                                   apply_markdown=apply_markdown)
+                                   apply_markdown=apply_markdown,
+                                   all_public_streams=all_public_streams)

 @has_request_variables
 def events_register_backend(request, user_profile, apply_markdown=True,
+                            all_public_streams=False,
                             event_types=REQ(converter=json_to_list, default=None),
                             queue_lifespan_secs=REQ(converter=int, default=0)):
     ret = do_events_register(user_profile, request.client, apply_markdown,
-                             event_types, queue_lifespan_secs)
+                             event_types, queue_lifespan_secs, all_public_streams)
     return json_success(ret)

 @authenticated_json_post_view