diff --git a/zerver/lib/actions.py b/zerver/lib/actions.py index ddae096377..89faef6e53 100644 --- a/zerver/lib/actions.py +++ b/zerver/lib/actions.py @@ -1560,9 +1560,9 @@ def get_status_dict(requesting_user_profile): def do_events_register(user_profile, user_client, apply_markdown=True, - event_types=None, queue_lifespan_secs=0): + event_types=None, queue_lifespan_secs=0, all_public_streams=False): queue_id = request_event_queue(user_profile, user_client, apply_markdown, - queue_lifespan_secs, event_types) + queue_lifespan_secs, event_types, all_public_streams) if queue_id is None: raise JsonableError("Could not allocate event queue") diff --git a/zerver/lib/event_queue.py b/zerver/lib/event_queue.py index 948f165210..f3d546df9c 100644 --- a/zerver/lib/event_queue.py +++ b/zerver/lib/event_queue.py @@ -36,15 +36,20 @@ MAX_QUEUE_TIMEOUT_SECS = 7 * 24 * 60 * 60 HEARTBEAT_MIN_FREQ_SECS = 45 class ClientDescriptor(object): - def __init__(self, user_profile_id, id, event_types, client_type, - apply_markdown=True, lifespan_secs=0): + def __init__(self, user_profile_id, realm_id, id, event_types, client_type, + apply_markdown=True, all_public_streams=False, lifespan_secs=0): + # These objects are pickled on shutdown and restored on restart. + # If fields are added or semantics are changed, temporary code must be + # added to load_event_queues() to update the restored objects self.user_profile_id = user_profile_id + self.realm_id = realm_id self.current_handler = None self.event_queue = EventQueue(id) self.queue_timeout = lifespan_secs self.event_types = event_types self.last_connection_time = time.time() self.apply_markdown = apply_markdown + self.all_public_streams = all_public_streams self.client_type = client_type self._timeout_handle = None @@ -129,6 +134,8 @@ class EventQueue(object): clients = {} # maps user id to list of client descriptors user_clients = {} +# maps realm id to list of client descriptors with all_public_streams=True +realm_clients_all_streams = {} # list of registered gc hooks. # each one will be called with a user profile id, queue, and bool @@ -148,33 +155,49 @@ def get_client_descriptor(queue_id): def get_client_descriptors_for_user(user_profile_id): return user_clients.get(user_profile_id, []) -def allocate_client_descriptor(user_profile_id, event_types, client_type, - apply_markdown, lifespan_secs): +def get_client_descriptors_for_realm_all_streams(realm_id): + return realm_clients_all_streams.get(realm_id, []) + +def allocate_client_descriptor(user_profile_id, realm_id, event_types, client_type, + apply_markdown, all_public_streams, lifespan_secs): global next_queue_id id = str(settings.SERVER_GENERATION) + ':' + str(next_queue_id) next_queue_id += 1 - client = ClientDescriptor(user_profile_id, id, event_types, client_type, - apply_markdown, lifespan_secs) + client = ClientDescriptor(user_profile_id, realm_id, id, event_types, client_type, + apply_markdown, all_public_streams, lifespan_secs) clients[id] = client user_clients.setdefault(user_profile_id, []).append(client) + if all_public_streams: + realm_clients_all_streams.setdefault(realm_id, []).append(client) return client def gc_event_queues(): start = time.time() to_remove = set() affected_users = set() + affected_realms = set() for (id, client) in clients.iteritems(): if client.idle(start): to_remove.add(id) affected_users.add(client.user_profile_id) + affected_realms.add(client.realm_id) + + def filter_client_dict(client_dict, key): + if key not in client_dict: + return + + new_client_list = filter(lambda c: c.event_queue.id not in to_remove, + client_dict[key]) + if len(new_client_list) == 0: + del client_dict[key] + else: + client_dict[key] = new_client_list for user_id in affected_users: - new_client_list = filter(lambda c: c.event_queue.id not in to_remove, - user_clients[user_id]) - if len(new_client_list) == 0: - del user_clients[user_id] - else: - user_clients[user_id] = new_client_list + filter_client_dict(user_clients, user_id) + + for realm_id in affected_realms: + filter_client_dict(realm_clients_all_streams, realm_id) for id in to_remove: for cb in gc_hooks: @@ -210,11 +233,17 @@ def load_event_queues(): pass for client in clients.itervalues(): - # The following client_type block can be dropped once we've + # The following block can be dropped once we've # cleared out all our old event queues - if not hasattr(client, 'client_type'): - client.client_type = get_client("website") + if not hasattr(client, 'realm_id') or not hasattr(client, 'all_public_streams'): + from zerver.models import get_user_profile_by_id + client.realm_id = get_user_profile_by_id(client.user_profile_id).realm.id + client.all_public_streams = False + logging.info("Tornado updated a queue") + user_clients.setdefault(client.user_profile_id, []).append(client) + if client.all_public_streams: + realm_clients_all_streams.setdefault(client.realm_id, []).append(client) logging.info('Tornado loaded %d event queues in %.3fs' % (len(clients), time.time() - start)) @@ -256,10 +285,11 @@ def extract_json_response(resp): return resp.json def request_event_queue(user_profile, user_client, apply_markdown, - queue_lifespan_secs, event_types=None): + queue_lifespan_secs, event_types=None, all_public_streams=False): if settings.TORNADO_SERVER: req = {'dont_block' : 'true', 'apply_markdown': ujson.dumps(apply_markdown), + 'all_public_streams': ujson.dumps(all_public_streams), 'client' : 'internal', 'user_client' : user_client.name, 'lifespan_secs' : queue_lifespan_secs} diff --git a/zerver/tornado_callbacks.py b/zerver/tornado_callbacks.py index 076d70a0e5..b66e7a7f96 100644 --- a/zerver/tornado_callbacks.py +++ b/zerver/tornado_callbacks.py @@ -11,7 +11,8 @@ from zerver.lib.cache import cache_get_many, message_cache_key, \ user_profile_by_id_cache_key, cache_save_user_profile from zerver.lib.cache_helpers import cache_save_message from zerver.lib.queue import queue_json_publish -from zerver.lib.event_queue import get_client_descriptors_for_user +from zerver.lib.event_queue import get_client_descriptors_for_user,\ + get_client_descriptors_for_realm_all_streams from zerver.lib.timestamp import timestamp_to_datetime import os @@ -375,6 +376,13 @@ def process_new_message(data): message_dict_markdown = message.to_dict(True) message_dict_no_markdown = message.to_dict(False) + # To remove duplicate clients: Maps queue ID to (Client, flags) + send_to_clients = dict() + + if 'stream_name' in data and not data.get("invite_only"): + for client in get_client_descriptors_for_realm_all_streams(data['realm_id']): + send_to_clients[client.event_queue.id] = (client, None) + for user_data in data['users']: user_profile_id = user_data['id'] user_profile = user_profiles[user_data['id']] @@ -383,26 +391,7 @@ def process_new_message(data): user_receive_message(user_profile_id, message) for client in get_client_descriptors_for_user(user_profile_id): - if not client.accepts_event_type('message'): - continue - - # The below prevents (Zephyr) mirroring loops. - if ('mirror' in message.sending_client.name and - message.sending_client == client.client_type): - continue - - if client.apply_markdown: - message_dict = message_dict_markdown - else: - message_dict = message_dict_no_markdown - - # Make sure Zephyr mirroring bots know whether stream is invite-only - if "mirror" in client.client_type.name and data.get("invite_only"): - message_dict = message_dict.copy() - message_dict["invite_only_stream"] = True - - event = dict(type='message', message=message_dict, flags=flags) - client.add_event(event) + send_to_clients[client.event_queue.id] = (client, flags) # If the recipient was offline and the message was a single or group PM to him # or she was @-notified potentially notify more immediately @@ -418,6 +407,28 @@ def process_new_message(data): # from the Tornado process. So if there's no rabbitmq support do nothing queue_json_publish("missedmessage_emails", event, lambda event: None) + for client, flags in send_to_clients.itervalues(): + if not client.accepts_event_type('message'): + continue + + # The below prevents (Zephyr) mirroring loops. + if ('mirror' in message.sending_client.name and + message.sending_client == client.client_type): + continue + + if client.apply_markdown: + message_dict = message_dict_markdown + else: + message_dict = message_dict_no_markdown + + # Make sure Zephyr mirroring bots know whether stream is invite-only + if "mirror" in client.client_type.name and data.get("invite_only"): + message_dict = message_dict.copy() + message_dict["invite_only_stream"] = True + + event = dict(type='message', message=message_dict, flags=flags) + client.add_event(event) + if 'stream_name' in data: stream_receive_message(data['realm_id'], data['stream_name'], message) diff --git a/zerver/tornadoviews.py b/zerver/tornadoviews.py index 46e76d5362..4e85700b72 100644 --- a/zerver/tornadoviews.py +++ b/zerver/tornadoviews.py @@ -193,6 +193,7 @@ def get_events_backend(request, user_profile, handler = None, last_event_id = REQ(converter=int, default=None), queue_id = REQ(default=None), apply_markdown = REQ(default=False, converter=json_to_bool), + all_public_streams = REQ(default=False, converter=json_to_bool), event_types = REQ(default=None, converter=json_to_list), dont_block = REQ(default=False, converter=json_to_bool), lifespan_secs = REQ(default=0, converter=int)): @@ -202,8 +203,9 @@ def get_events_backend(request, user_profile, handler = None, orig_queue_id = queue_id if queue_id is None: if dont_block: - client = allocate_client_descriptor(user_profile.id, event_types, - user_client, apply_markdown, lifespan_secs) + client = allocate_client_descriptor(user_profile.id, user_profile.realm.id, + event_types, user_client, apply_markdown, + all_public_streams, lifespan_secs) queue_id = client.event_queue.id else: return json_error("Missing 'queue_id' argument") diff --git a/zerver/views/__init__.py b/zerver/views/__init__.py index f1094d0673..34221046f6 100644 --- a/zerver/views/__init__.py +++ b/zerver/views/__init__.py @@ -2393,16 +2393,19 @@ def json_events_register(request, user_profile): # Does not need to be authenticated because it's called from rest_dispatch @has_request_variables def api_events_register(request, user_profile, - apply_markdown=REQ(default=False, converter=json_to_bool)): + apply_markdown=REQ(default=False, converter=json_to_bool), + all_public_streams=REQ(default=False, converter=json_to_bool)): return events_register_backend(request, user_profile, - apply_markdown=apply_markdown) + apply_markdown=apply_markdown, + all_public_streams=all_public_streams) @has_request_variables def events_register_backend(request, user_profile, apply_markdown=True, + all_public_streams=False, event_types=REQ(converter=json_to_list, default=None), queue_lifespan_secs=REQ(converter=int, default=0)): ret = do_events_register(user_profile, request.client, apply_markdown, - event_types, queue_lifespan_secs) + event_types, queue_lifespan_secs, all_public_streams) return json_success(ret) @authenticated_json_post_view