Add presences info to the event data for process_new_message().

The do_send_messages() populates the user_presences data structure
for process_new_message(), so that Tornado code never needs to hit
the database or memcached to get the user presence info.

(imported from commit 194aeaead8fa712297a2ee8aff5aa773b92f1207)
This commit is contained in:
Steve Howell
2013-09-15 13:10:16 -04:00
parent a4744b10ed
commit 99dd2ed8df
3 changed files with 17 additions and 27 deletions

View File

@@ -298,10 +298,18 @@ def do_send_messages(messages):
message['message'].to_dict(apply_markdown=True) message['message'].to_dict(apply_markdown=True)
message['message'].to_dict(apply_markdown=False) message['message'].to_dict(apply_markdown=False)
user_flags = user_message_flags.get(message['message'].id, {}) user_flags = user_message_flags.get(message['message'].id, {})
sender = message['message'].sender
recipient_emails = [user.email for user in message['recipients']]
user_presences = get_status_dict(sender)
presences = {}
for email in recipient_emails:
if email in user_presences:
presences[email] = user_presences[email]
data = dict( data = dict(
type = 'new_message', type = 'new_message',
message = message['message'].id, message = message['message'].id,
sender_realm = message['message'].sender.realm.id, presences = user_presences,
users = [{'id': user.id, 'flags': user_flags.get(user.id, [])} users = [{'id': user.id, 'flags': user_flags.get(user.id, [])}
for user in message['recipients']]) for user in message['recipients']])
if message['message'].recipient.type == Recipient.STREAM: if message['message'].recipient.type == Recipient.STREAM:

View File

@@ -263,8 +263,5 @@ def update_user_presence_cache(sender, **kwargs):
# entry in the UserPresence cache to avoid giving out stale state # entry in the UserPresence cache to avoid giving out stale state
djcache.delete(KEY_PREFIX + status_dict_cache_key(user_profile)) djcache.delete(KEY_PREFIX + status_dict_cache_key(user_profile))
def cache_save_status_dict(realm_id, status_dict):
cache_set(status_dict_cache_key_for_realm_id(realm_id), status_dict, timeout=3600*24*7)
def realm_alert_words_cache_key(realm): def realm_alert_words_cache_key(realm):
return "realm_alert_words:%s" % (realm.domain,) return "realm_alert_words:%s" % (realm.domain,)

View File

@@ -4,13 +4,12 @@ from django.conf import settings
from django.utils.timezone import now from django.utils.timezone import now
from zerver.models import Message, UserProfile, UserMessage, \ from zerver.models import Message, UserProfile, UserMessage, \
Recipient, Stream, get_stream, get_user_profile_by_id, \ Recipient, Stream, get_stream, get_user_profile_by_id
get_status_dict_by_realm, UserPresence
from zerver.decorator import JsonableError from zerver.decorator import JsonableError
from zerver.lib.cache import cache_get_many, message_cache_key, \ from zerver.lib.cache import cache_get_many, message_cache_key, \
user_profile_by_id_cache_key, cache_save_user_profile, \ user_profile_by_id_cache_key, cache_save_user_profile, \
status_dict_cache_key_for_realm_id, cache_save_status_dict status_dict_cache_key_for_realm_id
from zerver.lib.cache_helpers import cache_save_message from zerver.lib.cache_helpers import cache_save_message
from zerver.lib.queue import queue_json_publish from zerver.lib.queue import queue_json_publish
from zerver.lib.event_queue import get_client_descriptors_for_user from zerver.lib.event_queue import get_client_descriptors_for_user
@@ -298,18 +297,13 @@ def missedmessage_hook(user_profile_id, queue, last_for_client):
event = build_offline_notification_event(user_profile_id, msg_id) event = build_offline_notification_event(user_profile_id, msg_id)
queue_json_publish("missedmessage_emails", event, lambda event: None) queue_json_publish("missedmessage_emails", event, lambda event: None)
def cache_load_message_data(message_id, users, sender_realm_id): def cache_load_message_data(message_id, users):
# Get everything that we'll need out of memcached in one fetch, to save round-trip times: # Get everything that we'll need out of memcached in one fetch, to save round-trip times:
# * The message itself # * The message itself
# * Every recipient's UserProfile # * Every recipient's UserProfile
# * The cached status dict (all UserPresences in a realm)
user_profile_keys = [user_profile_by_id_cache_key(user_data['id']) for user_data in users] user_profile_keys = [user_profile_by_id_cache_key(user_data['id']) for user_data in users]
cache_keys = [message_cache_key(message_id)] cache_keys = [message_cache_key(message_id)]
if sender_realm_id is not None:
realm_key = status_dict_cache_key_for_realm_id(sender_realm_id)
cache_keys.append(realm_key)
cache_keys.extend(user_profile_keys) cache_keys.extend(user_profile_keys)
# Single memcached fetch # Single memcached fetch
@@ -318,10 +312,6 @@ def cache_load_message_data(message_id, users, sender_realm_id):
cache_extractor = lambda result: result[0] if result is not None else None cache_extractor = lambda result: result[0] if result is not None else None
message = cache_extractor(result.get(cache_keys[0], None)) message = cache_extractor(result.get(cache_keys[0], None))
if sender_realm_id is not None:
user_presences = cache_extractor(result.get(realm_key))
else:
user_presences = None
user_profiles = dict((user_data['id'], cache_extractor(result.get(user_profile_by_id_cache_key(user_data['id']), None))) user_profiles = dict((user_data['id'], cache_extractor(result.get(user_profile_by_id_cache_key(user_data['id']), None)))
for user_data in users) for user_data in users)
@@ -335,13 +325,7 @@ def cache_load_message_data(message_id, users, sender_realm_id):
message = Message.objects.select_related().get(id=message_id) message = Message.objects.select_related().get(id=message_id)
cache_save_message(message) cache_save_message(message)
if user_presences is None and sender_realm_id is not None:
if not settings.TEST_SUITE:
logging.warning("Tornado failed to load user presences from memcached when delivering message to realm key %s: %s"
% (realm_key, result.get(realm_key, None)))
user_presences = get_status_dict_by_realm(sender_realm_id)
cache_save_status_dict(sender_realm_id, user_presences)
for user_profile_id, user_profile in user_profiles.iteritems(): for user_profile_id, user_profile in user_profiles.iteritems():
if user_profile: if user_profile:
continue continue
@@ -352,7 +336,7 @@ def cache_load_message_data(message_id, users, sender_realm_id):
user_profiles[user_profile_id] = user_profile user_profiles[user_profile_id] = user_profile
cache_save_user_profile(user_profile) cache_save_user_profile(user_profile)
return message, user_profiles, user_presences return message, user_profiles
def receiver_is_idle(user_profile, realm_presences): def receiver_is_idle(user_profile, realm_presences):
# If a user has no message-receiving event queues, they've got no open zulip # If a user has no message-receiving event queues, they've got no open zulip
@@ -384,9 +368,10 @@ def receiver_is_idle(user_profile, realm_presences):
return off_zulip or idle_too_long return off_zulip or idle_too_long
def process_new_message(data): def process_new_message(data):
message, user_profiles, realm_presences = cache_load_message_data(data['message'], message, user_profiles = cache_load_message_data(data['message'],
data['users'], data['users'])
data.get('sender_realm', None))
realm_presences = data['presences']
message_dict_markdown = message.to_dict(True) message_dict_markdown = message.to_dict(True)
message_dict_no_markdown = message.to_dict(False) message_dict_no_markdown = message.to_dict(False)