Optimize query_all_subs_by_stream().

Using lightweight objects will speed up adding new users
to realms.

We also sort the query results, which lets us use itertools.groupby
to more efficiently build the data structure.

Profiling on a large data set shows about a 25x speedup for this
function, and before the optimization, this function accounted
for most of the time spent in bulk_add_subscriptions.

There's also a lot less memory to allocate, although I didn't
measure the memory difference.

When we test-deployed this to chat.zulip.org, we got about a 6x
speedup.
This commit is contained in:
Steve Howell
2017-10-06 08:35:55 -07:00
committed by Tim Abbott
parent f5ddc40d14
commit a331b4f64d

View File

@@ -1951,16 +1951,52 @@ def get_peer_user_ids_for_stream_change(stream, altered_user_ids, subscribed_use
# structure to stay up-to-date.
return set(active_user_ids(stream.realm_id)) - set(altered_user_ids)
# NOTE(review): this name is defined a second time further down in this
# view, with a .values()-based query; this earlier definition looks like
# the pre-optimization version -- confirm which one is meant to remain.
def query_all_subs_by_stream(streams):
# type: (Iterable[Stream]) -> Dict[int, List[UserProfile]]
# Select active subscriptions of active users whose recipient is one of
# the given streams (matched on stream.id), joining in the related
# recipient and user_profile rows via select_related.
# NOTE(review): no grouping or return statement follows this query in the
# visible text, so the declared Dict return type is not produced here --
# presumably the rest of the body was displaced; verify against the
# original file.
all_subs = Subscription.objects.filter(recipient__type=Recipient.STREAM,
recipient__type_id__in=[stream.id for stream in streams],
user_profile__is_active=True,
active=True).select_related('recipient', 'user_profile')
class UserLite(object):
    '''
    This is a lightweight object that we use for highly
    optimized codepaths that sometimes process ~30k subscription
    rows when bulk-adding streams for newly registered users.
    This little wrapper is a lot less expensive than full-blown
    UserProfile objects, but it lets the rest of the code work
    with the nice object syntax.

    Long term, we want to avoid sending around all of these emails,
    so we can make this class go away and just deal with user_ids.

    Attributes:
        id: the user's id (the ``user_id`` constructor argument).
        email: the user's email address.
    '''

    # Instances are created once per subscription row, so skip the
    # per-instance __dict__; only these two attributes are ever stored.
    __slots__ = ('id', 'email')

    def __init__(self, user_id, email):
        # type: (int, Text) -> None
        self.id = user_id
        self.email = email
def query_all_subs_by_stream(streams):
    # type: (Iterable[Stream]) -> Dict[int, List[UserLite]]
    '''
    Group the active subscribers of the given streams by stream id.

    Only active subscriptions belonging to active users are included.
    The query fetches plain dict rows (via .values()) rather than
    full model objects, and each row is wrapped in a UserLite
    carrying the user's id and email.

    Returns a defaultdict(list) keyed by stream id; a stream with no
    matching subscription rows has no entry, so looking it up yields
    an empty list.
    '''
    all_subs = Subscription.objects.filter(
        recipient__type=Recipient.STREAM,
        recipient__type_id__in=[stream.id for stream in streams],
        user_profile__is_active=True,
        active=True
    ).values(
        'recipient__type_id',
        'user_profile_id',
        'user_profile__email',
    ).order_by(
        'recipient__type_id',
    )

    get_stream_id = itemgetter('recipient__type_id')

    all_subs_by_stream = defaultdict(list)  # type: Dict[int, List[UserLite]]

    # itertools.groupby only merges *adjacent* rows with equal keys, which
    # is why the query above is ordered by the same key we group on.
    for stream_id, rows in itertools.groupby(all_subs, get_stream_id):
        users = [
            UserLite(
                user_id=row['user_profile_id'],
                email=row['user_profile__email'],
            )
            for row in rows
        ]
        all_subs_by_stream[stream_id] = users

    return all_subs_by_stream
def bulk_add_subscriptions(streams, users, from_stream_creation=False, acting_user=None):