Extract data from cache in tornado_callbacks with cache_get_many

This reduces the number of memcached calls we make in our
time-slice-limited tornado event handler.

(imported from commit 8903ce4ac754ba82d57e04d1b0356be7533edee2)
This commit is contained in:
Leo Franchi
2013-09-13 16:35:27 -04:00
parent 5eaeddea12
commit 7bb96bd36b
2 changed files with 69 additions and 9 deletions

View File

@@ -233,6 +233,9 @@ def user_profile_by_email_cache_key(email):
def user_profile_by_id_cache_key(user_profile_id):
    """Return the cache key string built from a user profile id."""
    key_template = "user_profile_by_id:%s"
    return key_template % (user_profile_id,)
def cache_save_user_profile(user_profile):
    """Write `user_profile` to the cache under its by-id key.

    The entry is stored with a timeout of 3600*24*7 seconds (one week).
    """
    one_week_in_seconds = 3600 * 24 * 7
    key = user_profile_by_id_cache_key(user_profile.id)
    cache_set(key, user_profile, timeout=one_week_in_seconds)
# Called by models.py to flush the user_profile cache whenever we save # Called by models.py to flush the user_profile cache whenever we save
# a user_profile object # a user_profile object
def update_user_profile_cache(sender, **kwargs): def update_user_profile_cache(sender, **kwargs):

View File

@@ -5,7 +5,9 @@ from zerver.models import Message, UserProfile, UserMessage, \
Recipient, Stream, get_stream, get_user_profile_by_id Recipient, Stream, get_stream, get_user_profile_by_id
from zerver.decorator import JsonableError from zerver.decorator import JsonableError
from zerver.lib.cache_helpers import cache_get_message from zerver.lib.cache import cache_get_many, message_cache_key, \
user_profile_by_id_cache_key, cache_save_user_profile
from zerver.lib.cache_helpers import cache_save_message
from zerver.lib.queue import queue_json_publish from zerver.lib.queue import queue_json_publish
from zerver.lib.event_queue import get_client_descriptors_for_user from zerver.lib.event_queue import get_client_descriptors_for_user
@@ -249,11 +251,14 @@ def update_pointer(user_profile_id, new_pointer):
client.add_event(event.copy()) client.add_event(event.copy())
def receives_offline_notifications(user_profile_id): def receives_offline_notifications(user_profile):
user_profile = get_user_profile_by_id(user_profile_id)
return (user_profile.enable_offline_email_notifications and return (user_profile.enable_offline_email_notifications and
not user_profile.is_bot) not user_profile.is_bot)
def receives_offline_notifications_by_id(user_profile_id):
    """Fetch the user profile for `user_profile_id` and return the result of
    receives_offline_notifications() for it."""
    return receives_offline_notifications(get_user_profile_by_id(user_profile_id))
def build_offline_notification_event(user_profile_id, message_id): def build_offline_notification_event(user_profile_id, message_id):
return {"user_profile_id": user_profile_id, return {"user_profile_id": user_profile_id,
"message_id": message_id, "message_id": message_id,
@@ -268,7 +273,7 @@ def missedmessage_hook(user_profile_id, queue, last_for_client):
# If a user has gone offline but has unread messages # If a user has gone offline but has unread messages
# received in the idle time, send them a missed # received in the idle time, send them a missed
# message email # message email
if not receives_offline_notifications(user_profile_id): if not receives_offline_notifications_by_id(user_profile_id):
return return
message_ids = [] message_ids = []
@@ -283,14 +288,66 @@ def missedmessage_hook(user_profile_id, queue, last_for_client):
event = build_offline_notification_event(user_profile_id, msg_id) event = build_offline_notification_event(user_profile_id, msg_id)
queue_json_publish("missedmessage_emails", event, lambda event: None) queue_json_publish("missedmessage_emails", event, lambda event: None)
def cache_load_message_data(message_id, users):
    """Load a message and its recipients' user profiles with one cache fetch.

    `users` is iterated twice; each element is indexed with 'id' to obtain a
    user profile id.

    Returns a `(message, user_profiles)` tuple, where `user_profiles` maps each
    user id to its profile object. Anything not found in the cache is read
    from the database and saved back to the cache.
    """
    def unwrap(cached):
        # A cache hit is an indexable wrapper whose element 0 is the stored
        # object; a miss shows up as None.
        if cached is None:
            return None
        return cached[0]

    # Build the complete key list first so that the message and every
    # recipient's UserProfile come back in a single memcached round trip.
    profile_keys = [user_profile_by_id_cache_key(entry['id']) for entry in users]
    message_key = message_cache_key(message_id)
    fetched = cache_get_many([message_key] + profile_keys)

    message = unwrap(fetched.get(message_key, None))
    user_profiles = {}
    for entry in users:
        user_id = entry['id']
        cached_profile = fetched.get(user_profile_by_id_cache_key(user_id), None)
        user_profiles[user_id] = unwrap(cached_profile)

    # Any data that was not found in memcached has to be loaded from the
    # database and saved back. This should never happen---we take steps to
    # keep recent messages and all user profile & presence objects in
    # memcached.
    if message is None:
        if not settings.TEST_SUITE:
            logging.warning("Tornado failed to load message from memcached when delivering!")
        message = Message.objects.select_related().get(id=message_id)
        cache_save_message(message)

    # Collect the ids that missed before refilling, then backfill each one.
    missing_ids = [user_id for user_id, profile in user_profiles.iteritems()
                   if not profile]
    for user_id in missing_ids:
        if not settings.TEST_SUITE:
            logging.warning("Tornado failed to load user profile from memcached when delivering message!")
        loaded_profile = UserProfile.objects.select_related().get(id=user_id)
        user_profiles[user_id] = loaded_profile
        cache_save_user_profile(loaded_profile)

    return message, user_profiles
def receiver_is_idle(user_profile):
    """Return True when the user has no event queue that accepts 'message' events.

    If a user has no message-receiving event queues, they've got no open zulip
    session, so callers treat them as idle and notify them.
    """
    message_queues = []
    for descriptor in get_client_descriptors_for_user(user_profile.id):
        if descriptor.accepts_event_type('message'):
            message_queues.append(descriptor)
    # An empty list means no message-receiving queue is registered.
    return not message_queues
def process_new_message(data): def process_new_message(data):
message = cache_get_message(data['message']) message, user_profiles = cache_load_message_data(data['message'],
data['users'])
message_dict_markdown = message.to_dict(True) message_dict_markdown = message.to_dict(True)
message_dict_no_markdown = message.to_dict(False) message_dict_no_markdown = message.to_dict(False)
for user_data in data['users']: for user_data in data['users']:
user_profile_id = user_data['id'] user_profile_id = user_data['id']
user_profile = user_profiles[user_data['id']]
flags = user_data.get('flags', []) flags = user_data.get('flags', [])
user_receive_message(user_profile_id, message) user_receive_message(user_profile_id, message)
@@ -322,11 +379,11 @@ def process_new_message(data):
received_pm = message.recipient.type in (Recipient.PERSONAL, Recipient.HUDDLE) and \ received_pm = message.recipient.type in (Recipient.PERSONAL, Recipient.HUDDLE) and \
user_profile_id != message.sender.id user_profile_id != message.sender.id
mentioned = 'mentioned' in flags mentioned = 'mentioned' in flags
all_client_descriptors = get_client_descriptors_for_user(user_profile_id)
message_event_queues = [client for client in all_client_descriptors if client.accepts_event_type('message')] idle = receiver_is_idle(user_profile)
idle = len(message_event_queues) == 0
if (received_pm or mentioned) and idle: if (received_pm or mentioned) and idle:
if receives_offline_notifications(user_profile_id): if receives_offline_notifications(user_profile):
event = build_offline_notification_event(user_profile_id, message.id) event = build_offline_notification_event(user_profile_id, message.id)
# We require RabbitMQ to do this, as we can't call the email handler # We require RabbitMQ to do this, as we can't call the email handler