Simplify how we apply events for unread messages.

The logic to apply events to page_params['unread_msgs'] was
complicated due to the aggregated data structures that we pass
down to the client.

Now we defer the aggregation logic until after we apply the
events.  This leads to some simplifications in that codepath,
as well as some performance enhancements.

The intermediate data structure has sets and dictionaries that
generally are keyed by message_id, so most message-related
updates are O(1) in nature.

Also, by waiting to compute the counts until the end, it's a
bit less messy to try to keep track of increments/decrements.
Instead, we just update the dictionaries and sets during the
event-apply phase.

This change also fixes some corner cases:

    * We now respect mutes when updating counts.
    * For message updates, instead of bluntly updating
      the whole topic bucket, we update individual
      message ids.

Unfortunately, this change doesn't seem to address the pesky
test that fails sporadically on Travis, related to mention
updates.  It will change the symptom, slightly, though.
This commit is contained in:
Steve Howell
2017-10-04 15:34:19 -07:00
committed by Tim Abbott
parent c567f105c9
commit e56084fcf7
3 changed files with 74 additions and 68 deletions

View File

@@ -19,8 +19,9 @@ from zerver.lib.attachments import user_attachments
from zerver.lib.avatar import avatar_url, avatar_url_from_dict from zerver.lib.avatar import avatar_url, avatar_url_from_dict
from zerver.lib.hotspots import get_next_hotspots from zerver.lib.hotspots import get_next_hotspots
from zerver.lib.message import ( from zerver.lib.message import (
aggregate_unread_data,
apply_unread_message_event, apply_unread_message_event,
get_unread_message_ids_per_recipient, get_raw_unread_data,
) )
from zerver.lib.narrow import check_supported_events_narrow_filter from zerver.lib.narrow import check_supported_events_narrow_filter
from zerver.lib.soft_deactivation import maybe_catch_up_soft_deactivated_user from zerver.lib.soft_deactivation import maybe_catch_up_soft_deactivated_user
@@ -168,7 +169,7 @@ def fetch_initial_state_data(user_profile, event_types, queue_id,
# message updates. This is due to the fact that new messages will not # message updates. This is due to the fact that new messages will not
# generate a flag update so we need to use the flags field in the # generate a flag update so we need to use the flags field in the
# message event. # message event.
state['unread_msgs'] = get_unread_message_ids_per_recipient(user_profile) state['raw_unread_msgs'] = get_raw_unread_data(user_profile)
if want('stream'): if want('stream'):
state['streams'] = do_get_streams(user_profile) state['streams'] = do_get_streams(user_profile)
@@ -192,22 +193,15 @@ def fetch_initial_state_data(user_profile, event_types, queue_id,
return state return state
def remove_message_id_from_unread_mgs(state, remove_id): def remove_message_id_from_unread_mgs(state, message_id):
# type: (Dict[str, Dict[str, Any]], int) -> None # type: (Dict[str, Dict[str, Any]], int) -> None
for message_type in ['pms', 'streams', 'huddles']: raw_unread = state['raw_unread_msgs']
threads = state['unread_msgs'][message_type]
for obj in threads:
msg_ids = obj['unread_message_ids']
if remove_id in msg_ids:
state['unread_msgs']['count'] -= 1
msg_ids.remove(remove_id)
state['unread_msgs'][message_type] = [
obj for obj in threads
if obj['unread_message_ids']
]
if remove_id in state['unread_msgs']['mentions']: for key in ['pm_dict', 'stream_dict', 'huddle_dict']:
state['unread_msgs']['mentions'].remove(remove_id) raw_unread[key].pop(message_id, None)
raw_unread['unmuted_stream_msgs'].discard(message_id)
raw_unread['mentions'].discard(message_id)
def apply_events(state, events, user_profile, include_subscribers=True, def apply_events(state, events, user_profile, include_subscribers=True,
fetch_event_types=None): fetch_event_types=None):
@@ -229,8 +223,10 @@ def apply_event(state, event, user_profile, include_subscribers):
# type: (Dict[str, Any], Dict[str, Any], UserProfile, bool) -> None # type: (Dict[str, Any], Dict[str, Any], UserProfile, bool) -> None
if event['type'] == "message": if event['type'] == "message":
state['max_message_id'] = max(state['max_message_id'], event['message']['id']) state['max_message_id'] = max(state['max_message_id'], event['message']['id'])
if 'unread_msgs' in state: if 'raw_unread_msgs' in state:
apply_unread_message_event(state['unread_msgs'], event['message']) apply_unread_message_event(user_profile,
state['raw_unread_msgs'],
event['message'])
elif event['type'] == "hotspots": elif event['type'] == "hotspots":
state['hotspots'] = event['hotspots'] state['hotspots'] = event['hotspots']
@@ -434,13 +430,15 @@ def apply_event(state, event, user_profile, include_subscribers):
presence_user_profile = get_user(event['email'], user_profile.realm) presence_user_profile = get_user(event['email'], user_profile.realm)
state['presences'][event['email']] = UserPresence.get_status_dict_by_user(presence_user_profile)[event['email']] state['presences'][event['email']] = UserPresence.get_status_dict_by_user(presence_user_profile)[event['email']]
elif event['type'] == "update_message": elif event['type'] == "update_message":
# The client will get the updated message directly, but we need to # We don't return messages in /register, so we don't need to
# update the subjects of our unread message ids # do anything for content updates, but we may need to update
if 'subject' in event and 'unread_msgs' in state: # the unread_msgs data if the topic of an unread message changed.
for obj in state['unread_msgs']['streams']: if 'subject' in event:
if obj['stream_id'] == event['stream_id']: stream_dict = state['raw_unread_msgs']['stream_dict']
if obj['topic'] == event['orig_subject']: topic = event['subject']
obj['topic'] = event['subject'] for message_id in event['message_ids']:
if message_id in stream_dict:
stream_dict[message_id]['topic'] = topic
elif event['type'] == "delete_message": elif event['type'] == "delete_message":
max_message = Message.objects.filter( max_message = Message.objects.filter(
usermessage__user_profile=user_profile).order_by('-id').first() usermessage__user_profile=user_profile).order_by('-id').first()
@@ -458,8 +456,9 @@ def apply_event(state, event, user_profile, include_subscribers):
# Typing notification events are transient and thus ignored # Typing notification events are transient and thus ignored
pass pass
elif event['type'] == "update_message_flags": elif event['type'] == "update_message_flags":
# The client will get the message with the updated flags directly but # We don't return messages in `/register`, so most flags we
# we need to keep the unread_msgs updated. # can ignore, but we do need to update the unread_msgs data if
# unread state is changed.
if event['flag'] == 'read' and event['operation'] == 'add': if event['flag'] == 'read' and event['operation'] == 'add':
for remove_id in event['messages']: for remove_id in event['messages']:
remove_message_id_from_unread_mgs(state, remove_id) remove_message_id_from_unread_mgs(state, remove_id)
@@ -527,6 +526,21 @@ def do_events_register(user_profile, user_client, apply_markdown=True,
apply_events(ret, events, user_profile, include_subscribers=include_subscribers, apply_events(ret, events, user_profile, include_subscribers=include_subscribers,
fetch_event_types=fetch_event_types) fetch_event_types=fetch_event_types)
'''
NOTE:
Below is an example of post-processing initial state data AFTER we
apply events. For large payloads like `unread_msgs`, it's helpful
to have an intermediate data structure that is easy to manipulate
with O(1)-type operations as we apply events.
Then, only at the end, we put it in the form that's more appropriate
for client.
'''
if 'raw_unread_msgs' in ret:
ret['unread_msgs'] = aggregate_unread_data(ret['raw_unread_msgs'])
del ret['raw_unread_msgs']
if len(events) > 0: if len(events) > 0:
ret['last_event_id'] = events[-1]['id'] ret['last_event_id'] = events[-1]['id']
else: else:

View File

@@ -13,7 +13,10 @@ from zerver.lib.cache import cache_with_key, to_dict_cache_key
from zerver.lib.request import JsonableError from zerver.lib.request import JsonableError
from zerver.lib.str_utils import force_bytes, dict_with_str_keys from zerver.lib.str_utils import force_bytes, dict_with_str_keys
from zerver.lib.timestamp import datetime_to_timestamp from zerver.lib.timestamp import datetime_to_timestamp
from zerver.lib.topic_mutes import build_topic_mute_checker from zerver.lib.topic_mutes import (
build_topic_mute_checker,
topic_is_muted,
)
from zerver.models import ( from zerver.models import (
get_display_recipient_by_id, get_display_recipient_by_id,
@@ -552,6 +555,7 @@ def get_raw_unread_data(user_profile):
return dict( return dict(
pm_dict=pm_dict, pm_dict=pm_dict,
stream_dict=stream_dict, stream_dict=stream_dict,
muted_stream_ids=muted_stream_ids,
unmuted_stream_msgs=unmuted_stream_msgs, unmuted_stream_msgs=unmuted_stream_msgs,
huddle_dict=huddle_dict, huddle_dict=huddle_dict,
mentions=mentions, mentions=mentions,
@@ -602,10 +606,8 @@ def aggregate_unread_data(raw_data):
return result return result
def apply_unread_message_event(state, message): def apply_unread_message_event(user_profile, state, message):
# type: (Dict[str, Any], Dict[str, Any]) -> None # type: (UserProfile, Dict[str, Any], Dict[str, Any]) -> None
state['count'] += 1
message_id = message['id'] message_id = message['id']
if message['type'] == 'stream': if message['type'] == 'stream':
message_type = 'stream' message_type = 'stream'
@@ -622,49 +624,36 @@ def apply_unread_message_event(state, message):
raise AssertionError("Invalid message type %s" % (message['type'],)) raise AssertionError("Invalid message type %s" % (message['type'],))
if message_type == 'stream': if message_type == 'stream':
unread_key = 'streams'
stream_id = message['stream_id'] stream_id = message['stream_id']
topic = message['subject'] topic = message['subject']
new_row = dict(
my_key = (stream_id, topic) # type: Any
key_func = lambda obj: (obj['stream_id'], obj['topic'])
new_obj = dict(
stream_id=stream_id, stream_id=stream_id,
topic=topic, topic=topic,
unread_message_ids=[message_id],
) )
elif message_type == 'private': state['stream_dict'][message_id] = new_row
unread_key = 'pms'
sender_id = message['sender_id']
my_key = sender_id if stream_id not in state['muted_stream_ids']:
key_func = lambda obj: obj['sender_id'] # This next check hits the database.
new_obj = dict( if not topic_is_muted(user_profile, stream_id, topic):
state['unmuted_stream_msgs'].add(message_id)
elif message_type == 'private':
sender_id = message['sender_id']
new_row = dict(
sender_id=sender_id, sender_id=sender_id,
unread_message_ids=[message_id],
) )
state['pm_dict'][message_id] = new_row
else: else:
unread_key = 'huddles'
display_recipient = message['display_recipient'] display_recipient = message['display_recipient']
user_ids = [obj['id'] for obj in display_recipient] user_ids = [obj['id'] for obj in display_recipient]
user_ids = sorted(user_ids) user_ids = sorted(user_ids)
my_key = ','.join(str(uid) for uid in user_ids) user_ids_string = ','.join(str(uid) for uid in user_ids)
key_func = lambda obj: obj['user_ids_string'] new_row = dict(
new_obj = dict( user_ids_string=user_ids_string,
user_ids_string=my_key,
unread_message_ids=[message_id],
) )
state['huddle_dict'][message_id] = new_row
if message.get('is_mentioned'): mentioned = message.get('is_mentioned', False)
if message_id not in state['mentions']: if mentioned:
state['mentions'].append(message_id) state['mentions'].add(message_id)
for obj in state[unread_key]:
if key_func(obj) == my_key:
obj['unread_message_ids'].append(message_id)
obj['unread_message_ids'].sort()
return
state[unread_key].append(new_obj)
state[unread_key].sort(key=key_func)

View File

@@ -71,7 +71,11 @@ from zerver.lib.events import (
apply_events, apply_events,
fetch_initial_state_data, fetch_initial_state_data,
) )
from zerver.lib.message import render_markdown from zerver.lib.message import (
get_unread_message_ids_per_recipient,
render_markdown,
UnreadMessagesResult,
)
from zerver.lib.test_helpers import POSTRequestMock, get_subscription, \ from zerver.lib.test_helpers import POSTRequestMock, get_subscription, \
stub_event_queue_user_events stub_event_queue_user_events
from zerver.lib.test_classes import ( from zerver.lib.test_classes import (
@@ -1715,9 +1719,8 @@ class FetchInitialStateDataTest(ZulipTestCase):
'hello3') 'hello3')
def get_unread_data(): def get_unread_data():
# type: () -> Dict[str, Any] # type: () -> UnreadMessagesResult
result = fetch_initial_state_data(user_profile, None, "")['unread_msgs'] return get_unread_message_ids_per_recipient(user_profile)
return result
result = get_unread_data() result = get_unread_data()