Use bulk queries for adding many subscriptions to a single user.

Previously we only used bulk queries when adding many users to a
single stream, resulting in very slow performance when subscribing
users to large numbers of streams (as happens when setting up a new
MIT realm user).

(imported from commit 849fa7b2a1a146c0a9adc1c727c20c9fbfb7b425)
This commit is contained in:
Tim Abbott
2013-06-25 13:26:58 -04:00
parent 738793a962
commit a66bb508bb
3 changed files with 50 additions and 32 deletions

View File

@@ -8,7 +8,7 @@ from zephyr.models import Realm, Stream, UserProfile, UserActivity, \
DefaultStream, UserPresence, MAX_SUBJECT_LENGTH, \ DefaultStream, UserPresence, MAX_SUBJECT_LENGTH, \
MAX_MESSAGE_LENGTH, get_client, get_stream, get_recipient, get_huddle, \ MAX_MESSAGE_LENGTH, get_client, get_stream, get_recipient, get_huddle, \
get_user_profile_by_id, PreregistrationUser, get_display_recipient, \ get_user_profile_by_id, PreregistrationUser, get_display_recipient, \
to_dict_cache_key, get_realm, stringify_message_dict to_dict_cache_key, get_realm, stringify_message_dict, bulk_get_recipients
from django.db import transaction, IntegrityError from django.db import transaction, IntegrityError
from django.db.models import F, Q from django.db.models import F, Q
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
@@ -570,42 +570,53 @@ def notify_new_subscription(user_profile, stream, subscription, no_log=False):
users=[user_profile.id]) users=[user_profile.id])
tornado_callbacks.send_notification(notice) tornado_callbacks.send_notification(notice)
def bulk_add_subscriptions(streams, users):
    """Subscribe every user in `users` to every stream in `streams`.

    Existing subscriptions are looked up with a single query, missing
    ones are created with one bulk_create call, and inactive ones are
    reactivated with one update call.

    Returns a tuple (subscribed, already_subscribed).  Both elements are
    lists of (user_profile, stream) pairs: `subscribed` covers newly
    created subscriptions followed by reactivated ones, and
    `already_subscribed` covers pairs that already had an active
    subscription.  notify_new_subscription is called once for each
    created or reactivated subscription.
    """
    # recipients_map is indexed by stream id and yields Recipient objects.
    recipients_map = bulk_get_recipients(Recipient.STREAM, [stream.id for stream in streams])
    recipients = [recipient.id for recipient in recipients_map.values()]

    # Map each recipient id back to the stream it belongs to.
    stream_map = {}
    for stream in streams:
        stream_map[recipients_map[stream.id].id] = stream

    # Collect every stream subscription (active or not, for any stream)
    # of the given users; the full per-user list is also what gets
    # passed to pick_color_helper below.
    subs_by_user = defaultdict(list)
    # select_related because sub.user_profile is read for each
    # reactivated subscription further down.
    all_subs_query = Subscription.objects.select_related("user_profile")
    for sub in all_subs_query.filter(user_profile__in=users,
                                     recipient__type=Recipient.STREAM):
        subs_by_user[sub.user_profile_id].append(sub)

    already_subscribed = []
    subs_to_activate = []
    new_subs = []
    for user_profile in users:
        # Start from "needs all requested recipients" and remove the
        # ones for which a Subscription row already exists.
        needs_new_sub = set(recipients)
        for sub in subs_by_user[user_profile.id]:
            if sub.recipient_id in needs_new_sub:
                needs_new_sub.remove(sub.recipient_id)
                if sub.active:
                    already_subscribed.append((user_profile, stream_map[sub.recipient_id]))
                else:
                    subs_to_activate.append((sub, stream_map[sub.recipient_id]))
                    # Mark the sub as active, without saving, so that
                    # pick_color will consider this to be an active
                    # subscription when picking colors
                    sub.active = True
        for recipient_id in needs_new_sub:
            new_subs.append((user_profile, recipient_id, stream_map[recipient_id]))

    subs_to_add = []
    for (user_profile, recipient_id, stream) in new_subs:
        color = pick_color_helper(user_profile, subs_by_user[user_profile.id])
        sub_to_add = Subscription(user_profile=user_profile, active=True,
                                  color=color, recipient_id=recipient_id)
        # Record the pending subscription so that later color picks for
        # the same user see it as well.
        subs_by_user[user_profile.id].append(sub_to_add)
        subs_to_add.append((sub_to_add, stream))
    Subscription.objects.bulk_create([sub for (sub, stream) in subs_to_add])
    Subscription.objects.filter(id__in=[sub.id for (sub, stream) in subs_to_activate]).update(active=True)

    for (sub, stream) in subs_to_add + subs_to_activate:
        notify_new_subscription(sub.user_profile, stream, sub)
    return ([(user_profile, stream) for (user_profile, recipient_id, stream) in new_subs] +
            [(sub.user_profile, stream) for (sub, stream) in subs_to_activate],
            already_subscribed)
# When changing this, also change bulk_add_subscriptions # When changing this, also change bulk_add_subscriptions

View File

@@ -5,7 +5,7 @@ from django.conf import settings
from django.contrib.auth.models import AbstractBaseUser, UserManager from django.contrib.auth.models import AbstractBaseUser, UserManager
from zephyr.lib.cache import cache_with_key, update_user_profile_cache, \ from zephyr.lib.cache import cache_with_key, update_user_profile_cache, \
user_profile_by_id_cache_key, user_profile_by_email_cache_key, \ user_profile_by_id_cache_key, user_profile_by_email_cache_key, \
update_user_presence_cache update_user_presence_cache, generic_bulk_cached_fetch
from zephyr.lib.utils import make_safe_digest from zephyr.lib.utils import make_safe_digest
from django.db import transaction, IntegrityError from django.db import transaction, IntegrityError
from zephyr.lib import bugdown from zephyr.lib import bugdown
@@ -269,6 +269,15 @@ def get_recipient_cache_key(type, type_id):
def get_recipient(type, type_id): def get_recipient(type, type_id):
return Recipient.objects.get(type_id=type_id, type=type) return Recipient.objects.get(type_id=type_id, type=type)
def bulk_get_recipients(type, type_ids):
    """Fetch the Recipient objects of one `type` for many `type_ids`.

    The lookup is delegated to generic_bulk_cached_fetch, which is given
    a per-id cache key builder, a single database query covering a list
    of ids, and `recipient.type_id` as the id of each fetched row.

    Returns whatever generic_bulk_cached_fetch returns.
    NOTE(review): callers index the result by type_id and call
    .values() on it, so it is presumably a dict of type_id -> Recipient;
    confirm against generic_bulk_cached_fetch.
    """
    def cache_key_function(type_id):
        return get_recipient_cache_key(type, type_id)

    def query_function(type_ids):
        return Recipient.objects.filter(type=type, type_id__in=type_ids)

    return generic_bulk_cached_fetch(cache_key_function, query_function, type_ids,
                                     id_fetcher=lambda recipient: recipient.type_id)
# NB: This function is currently unused, but may come in handy.
def linebreak(string):
    """Return `string` with its newlines replaced by HTML-style markup.

    Each pair of consecutive newlines becomes '<p/>' first; every
    newline left over after that becomes '<br/>'.
    """
    return string.replace('\n\n', '<p/>').replace('\n', '<br/>')

View File

@@ -1265,18 +1265,16 @@ def add_subscriptions_backend(request, user_profile,
subscribers = [user_profile] subscribers = [user_profile]
streams = list_to_streams(stream_names, user_profile, autocreate=True, invite_only=invite_only) streams = list_to_streams(stream_names, user_profile, autocreate=True, invite_only=invite_only)
private_streams = {}
result = dict(subscribed=[], already_subscribed=[]) (subscribed, already_subscribed) = bulk_add_subscriptions(streams, subscribers)
result = dict(subscribed=defaultdict(list), already_subscribed=defaultdict(list)) result = dict(subscribed=defaultdict(list), already_subscribed=defaultdict(list))
for stream in streams: for (subscriber, stream) in subscribed:
(subscribed, already_subscribed) = bulk_add_subscriptions(stream, subscribers)
for subscriber in subscribed:
result["subscribed"][subscriber.email].append(stream.name) result["subscribed"][subscriber.email].append(stream.name)
for subscriber in already_subscribed: for (subscriber, stream) in already_subscribed:
result["already_subscribed"][subscriber.email].append(stream.name) result["already_subscribed"][subscriber.email].append(stream.name)
private_streams[stream.name] = stream.invite_only
private_streams = dict((stream.name, stream.invite_only) for stream in streams)
# Inform the user if someone else subscribed them to stuff # Inform the user if someone else subscribed them to stuff
if principals and result["subscribed"]: if principals and result["subscribed"]: