performance: Extract subscribers/peers in bulk.

We replace get_peer_user_ids_for_stream_change
with two bulk functions to get peers and/or
subscribers.

Note that we have three codepaths that care about
peers:

    subscribing existing users:
        we need to tell peers about new subscribers
        we need to tell subscribed user about old subscribers

    unsubscribing existing users:
        we only need to tell peers who unsubscribed

    subscribing new user:
        we only need to tell peers about the new user
        (right now we generate send_event
        calls to tell the new user about existing
        subscribers, but this is a waste
        of effort that we will fix soon)

The two bulk functions are this:

    bulk_get_subscriber_peer_info
    bulk_get_peers

They have some overlap in the implementation,
but there are some nuanced differences that are
described in the comments.

Looking up peers/subscribers in bulk leads to some
nice optimizations.

We will save some memchached traffic if you are
subscribing to multiple public streams.

We will save a query in the remove-subscriber
case if you are only dealing with private streams.
This commit is contained in:
Steve Howell
2020-10-13 10:53:23 +00:00
committed by Tim Abbott
parent 94e41c71f9
commit b4346d0276
3 changed files with 196 additions and 103 deletions

View File

@@ -122,8 +122,9 @@ from zerver.lib.sessions import delete_user_sessions
from zerver.lib.storage import static_path
from zerver.lib.stream_recipient import StreamRecipientMap
from zerver.lib.stream_subscription import (
bulk_get_peers,
bulk_get_subscriber_peer_info,
get_active_subscriptions_for_stream_id,
get_active_subscriptions_for_stream_ids,
get_bulk_stream_subscriber_info,
get_stream_subscriptions_for_user,
get_stream_subscriptions_for_users,
@@ -2769,53 +2770,6 @@ def send_subscription_add_events(
subscriptions=sub_dicts)
send_event(realm, event, [user_id])
def get_peer_user_ids_for_stream_change(stream: Stream,
altered_user_ids: Iterable[int],
subscribed_user_ids: Iterable[int]) -> Set[int]:
'''
altered_user_ids is the user_ids that we are adding/removing
subscribed_user_ids is the already-subscribed user_ids
Based on stream policy, we notify the correct bystanders, while
not notifying altered_users (who get subscribers via another event)
'''
if stream.invite_only:
# PRIVATE STREAMS
# Realm admins can access all private stream subscribers. Send them an
# event even if they aren't subscribed to stream.
realm_admin_ids = [user.id for user in stream.realm.get_admin_users_and_bots()]
user_ids_to_notify = []
user_ids_to_notify.extend(realm_admin_ids)
user_ids_to_notify.extend(subscribed_user_ids)
return set(user_ids_to_notify) - set(altered_user_ids)
else:
# PUBLIC STREAMS
# We now do "peer_add" or "peer_remove" events even for streams
# users were never subscribed to, in order for the neversubscribed
# structure to stay up-to-date.
return set(active_non_guest_user_ids(stream.realm_id)) - set(altered_user_ids)
def get_user_ids_for_streams(stream_ids: Set[int]) -> Dict[int, Set[int]]:
all_subs = get_active_subscriptions_for_stream_ids(stream_ids).filter(
user_profile__is_active=True,
).values(
'recipient__type_id',
'user_profile_id',
).order_by(
'recipient__type_id',
)
get_stream_id = itemgetter('recipient__type_id')
result: Dict[int, Set[int]] = defaultdict(set)
for stream_id, rows in itertools.groupby(all_subs, get_stream_id):
user_ids = {row['user_profile_id'] for row in rows}
result[stream_id] = user_ids
return result
SubT = Tuple[List[SubInfo], List[SubInfo]]
def bulk_add_subscriptions(
realm: Realm,
@@ -2891,17 +2845,18 @@ def bulk_add_subscriptions(
for sub_info in subs_to_add + subs_to_activate:
new_stream_user_ids[sub_info.stream.id].add(sub_info.user.id)
# Notify all existing users on streams that users have joined
# First, get all users subscribed to the streams that we care about
# We fetch all subscription information upfront, as it's used throughout
# the following code and we want to minize DB queries
all_subscribers_by_stream = get_user_ids_for_streams(
stream_ids=set(new_stream_user_ids.keys())
)
stream_dict = {stream.id: stream for stream in streams}
new_streams = [
stream_dict[stream_id]
for stream_id in new_stream_user_ids
]
subscriber_peer_info = bulk_get_subscriber_peer_info(
realm=realm,
streams=new_streams,
)
# We now send several types of events to notify browsers. The
# first batch is notifications to users on invite-only streams
# that the stream exists.
@@ -2914,14 +2869,14 @@ def bulk_add_subscriptions(
send_subscription_add_events(
realm=realm,
sub_info_list=subs_to_add + subs_to_activate,
subscriber_dict=all_subscribers_by_stream,
subscriber_dict=subscriber_peer_info.subscribed_ids,
)
send_peer_add_events(
realm=realm,
new_stream_user_ids=new_stream_user_ids,
stream_dict=stream_dict,
all_subscribers_by_stream=all_subscribers_by_stream,
peer_id_dict=subscriber_peer_info.peer_ids,
)
return (
@@ -2994,9 +2949,9 @@ def send_peer_add_events(
realm: Realm,
stream_dict: Dict[int, Stream],
new_stream_user_ids: Dict[int, Set[int]],
all_subscribers_by_stream: Dict[int, Set[int]],
peer_id_dict: Dict[int, Set[int]],
) -> None:
# The second batch is events for other users who are tracking the
# Send peer_add events to other users who are tracking the
# subscribers lists of streams in their browser; everyone for
# public streams and only existing subscribers for private streams.
for stream_id, altered_user_ids in new_stream_user_ids.items():
@@ -3005,13 +2960,7 @@ def send_peer_add_events(
if stream.is_in_zephyr_realm and not stream.invite_only:
continue
subscribed_user_ids = all_subscribers_by_stream[stream.id]
peer_user_ids = get_peer_user_ids_for_stream_change(
stream=stream,
altered_user_ids=altered_user_ids,
subscribed_user_ids=subscribed_user_ids,
)
peer_user_ids = peer_id_dict[stream_id] - altered_user_ids
if peer_user_ids:
for new_user_id in altered_user_ids:
@@ -3024,21 +2973,17 @@ def send_peer_remove_events(
realm: Realm,
streams: List[Stream],
altered_user_dict: Dict[int, Set[int]],
all_subscribers_by_stream: Dict[int, Set[int]],
) -> None:
peer_dict = bulk_get_peers(
realm=realm,
streams=streams,
)
for stream in streams:
if stream.is_in_zephyr_realm and not stream.invite_only:
continue
altered_user_ids = altered_user_dict[stream.id]
subscribed_user_ids = all_subscribers_by_stream[stream.id]
peer_user_ids = get_peer_user_ids_for_stream_change(
stream=stream,
altered_user_ids=altered_user_ids,
subscribed_user_ids=subscribed_user_ids,
)
peer_user_ids = list(peer_dict[stream.id] - altered_user_ids)
if peer_user_ids:
for removed_user_id in altered_user_ids:
@@ -3160,14 +3105,10 @@ def bulk_remove_subscriptions(users: Iterable[UserProfile],
'stream_ids': [stream.id for stream in streams]}
queue_json_publish("deferred_work", event)
stream_ids = {stream.id for stream in streams}
all_subscribers_by_stream = get_user_ids_for_streams(stream_ids=stream_ids)
send_peer_remove_events(
realm=our_realm,
streams=streams,
altered_user_dict=altered_user_dict,
all_subscribers_by_stream=all_subscribers_by_stream,
)
new_vacant_streams = set(occupied_streams_before) - set(occupied_streams_after)

View File

@@ -1,10 +1,26 @@
import itertools
from collections import defaultdict
from dataclasses import dataclass
from operator import itemgetter
from typing import Any, Dict, List, Optional, Set, Tuple
from django.db.models.query import QuerySet
from zerver.models import Recipient, Stream, Subscription, UserProfile
from zerver.models import (
Realm,
Recipient,
Stream,
Subscription,
UserProfile,
active_non_guest_user_ids,
)
@dataclass
class SubscriberPeerInfo:
subscribed_ids: Dict[int, Set[int]]
peer_ids: Dict[int, Set[int]]
def get_active_subscriptions_for_stream_id(stream_id: int) -> QuerySet:
# TODO: Change return type to QuerySet[Subscription]
return Subscription.objects.filter(
@@ -73,6 +89,98 @@ def num_subscribers_for_stream_id(stream_id: int) -> int:
user_profile__is_active=True,
).count()
def get_user_ids_for_streams(stream_ids: Set[int]) -> Dict[int, Set[int]]:
all_subs = get_active_subscriptions_for_stream_ids(stream_ids).filter(
user_profile__is_active=True,
).values(
'recipient__type_id',
'user_profile_id',
).order_by(
'recipient__type_id',
)
get_stream_id = itemgetter('recipient__type_id')
result: Dict[int, Set[int]] = defaultdict(set)
for stream_id, rows in itertools.groupby(all_subs, get_stream_id):
user_ids = {row['user_profile_id'] for row in rows}
result[stream_id] = user_ids
return result
def bulk_get_subscriber_peer_info(
realm: Realm,
streams: List[Stream],
) -> SubscriberPeerInfo:
"""
Glossary:
subscribed_ids:
This shows the users who are actually subscribed to the
stream, which we generally send to the person subscribing
to the stream.
peer_ids:
These are the folks that need to know about a new subscriber.
It's usually a superset of the subscribers.
"""
subscribed_ids = {}
peer_ids = {}
private_stream_ids = {stream.id for stream in streams if stream.invite_only}
public_stream_ids = {stream.id for stream in streams if not stream.invite_only}
stream_user_ids = get_user_ids_for_streams(private_stream_ids | public_stream_ids)
if private_stream_ids:
realm_admin_ids = {user.id for user in realm.get_admin_users_and_bots()}
for stream_id in private_stream_ids:
subscribed_user_ids = stream_user_ids.get(stream_id, set())
subscribed_ids[stream_id] = subscribed_user_ids
peer_ids[stream_id] = subscribed_user_ids | realm_admin_ids
if public_stream_ids:
non_guests = active_non_guest_user_ids(realm.id)
for stream_id in public_stream_ids:
subscribed_user_ids = stream_user_ids.get(stream_id, set())
subscribed_ids[stream_id] = subscribed_user_ids
peer_ids[stream_id] = set(non_guests)
return SubscriberPeerInfo(
subscribed_ids=subscribed_ids,
peer_ids=peer_ids,
)
def bulk_get_peers(
realm: Realm,
streams: List[Stream],
) -> Dict[int, Set[int]]:
# This is almost a subset of bulk_get_subscriber_peer_info,
# with the nuance that we don't have to query subscribers
# for public streams. (The other functions tries to save
# a query hop.)
peer_ids = {}
private_stream_ids = {stream.id for stream in streams if stream.invite_only}
public_stream_ids = {stream.id for stream in streams if not stream.invite_only}
if private_stream_ids:
realm_admin_ids = {user.id for user in realm.get_admin_users_and_bots()}
stream_user_ids = get_user_ids_for_streams(private_stream_ids)
for stream_id in private_stream_ids:
subscribed_user_ids = stream_user_ids.get(stream_id, set())
peer_ids[stream_id] = subscribed_user_ids | realm_admin_ids
if public_stream_ids:
non_guests = active_non_guest_user_ids(realm.id)
for stream_id in public_stream_ids:
peer_ids[stream_id] = set(non_guests)
return peer_ids
def handle_stream_notifications_compatibility(user_profile: Optional[UserProfile],
stream_dict: Dict[str, Any],

View File

@@ -1382,8 +1382,13 @@ class StreamAdminTest(ZulipTestCase):
those you aren't on.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=22, target_users=[self.example_user('cordelia')], is_realm_admin=True,
is_subbed=True, invite_only=False, target_users_subbed=True)
query_count=21,
target_users=[self.example_user('cordelia')],
is_realm_admin=True,
is_subbed=True,
invite_only=False,
target_users_subbed=True,
)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 1)
self.assertEqual(len(json["not_removed"]), 0)
@@ -1400,7 +1405,7 @@ class StreamAdminTest(ZulipTestCase):
for name in ['cordelia', 'prospero', 'iago', 'hamlet', 'ZOE']
]
result = self.attempt_unsubscribe_of_principal(
query_count=58,
query_count=57,
cache_count=9,
target_users=target_users,
is_realm_admin=True,
@@ -1418,8 +1423,13 @@ class StreamAdminTest(ZulipTestCase):
are on.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=22, target_users=[self.example_user('cordelia')], is_realm_admin=True,
is_subbed=True, invite_only=True, target_users_subbed=True)
query_count=22,
target_users=[self.example_user('cordelia')],
is_realm_admin=True,
is_subbed=True,
invite_only=True,
target_users_subbed=True,
)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 1)
self.assertEqual(len(json["not_removed"]), 0)
@@ -1430,8 +1440,14 @@ class StreamAdminTest(ZulipTestCase):
streams you aren't on.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=22, target_users=[self.example_user('cordelia')], is_realm_admin=True,
is_subbed=False, invite_only=True, target_users_subbed=True, other_sub_users=[self.example_user("othello")])
query_count=22,
target_users=[self.example_user('cordelia')],
is_realm_admin=True,
is_subbed=False,
invite_only=True,
target_users_subbed=True,
other_sub_users=[self.example_user("othello")],
)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 1)
self.assertEqual(len(json["not_removed"]), 0)
@@ -1441,8 +1457,14 @@ class StreamAdminTest(ZulipTestCase):
You can remove others from public streams you're a stream administrator of.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=22, target_users=[self.example_user('cordelia')], is_realm_admin=False,
is_stream_admin=True, is_subbed=True, invite_only=False, target_users_subbed=True)
query_count=21,
target_users=[self.example_user('cordelia')],
is_realm_admin=False,
is_stream_admin=True,
is_subbed=True,
invite_only=False,
target_users_subbed=True,
)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 1)
self.assertEqual(len(json["not_removed"]), 0)
@@ -1456,7 +1478,7 @@ class StreamAdminTest(ZulipTestCase):
for name in ['cordelia', 'prospero', 'othello', 'hamlet', 'ZOE']
]
result = self.attempt_unsubscribe_of_principal(
query_count=58,
query_count=57,
cache_count=9,
target_users=target_users,
is_realm_admin=False,
@@ -1474,8 +1496,14 @@ class StreamAdminTest(ZulipTestCase):
You can remove others from private streams you're a stream administrator of.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=22, target_users=[self.example_user('cordelia')], is_realm_admin=False,
is_stream_admin=True, is_subbed=True, invite_only=True, target_users_subbed=True)
query_count=22,
target_users=[self.example_user('cordelia')],
is_realm_admin=False,
is_stream_admin=True,
is_subbed=True,
invite_only=True,
target_users_subbed=True,
)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 1)
self.assertEqual(len(json["not_removed"]), 0)
@@ -1490,17 +1518,28 @@ class StreamAdminTest(ZulipTestCase):
def test_admin_remove_others_from_stream_legacy_emails(self) -> None:
result = self.attempt_unsubscribe_of_principal(
query_count=22, target_users=[self.example_user('cordelia')], is_realm_admin=True,
is_subbed=True, invite_only=False, target_users_subbed=True, using_legacy_emails=True)
query_count=21,
target_users=[self.example_user('cordelia')],
is_realm_admin=True,
is_subbed=True,
invite_only=False,
target_users_subbed=True,
using_legacy_emails=True,
)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 1)
self.assertEqual(len(json["not_removed"]), 0)
def test_admin_remove_multiple_users_from_stream_legacy_emails(self) -> None:
result = self.attempt_unsubscribe_of_principal(
query_count=31, target_users=[self.example_user('cordelia'), self.example_user('prospero')],
is_realm_admin=True, is_subbed=True, invite_only=False, target_users_subbed=True,
using_legacy_emails=True)
query_count=30,
target_users=[self.example_user('cordelia'), self.example_user('prospero')],
is_realm_admin=True,
is_subbed=True,
invite_only=False,
target_users_subbed=True,
using_legacy_emails=True,
)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 2)
self.assertEqual(len(json["not_removed"]), 0)
@@ -1636,8 +1675,13 @@ class StreamAdminTest(ZulipTestCase):
fails gracefully.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=12, target_users=[self.example_user('cordelia')], is_realm_admin=True,
is_subbed=False, invite_only=False, target_users_subbed=False)
query_count=11,
target_users=[self.example_user('cordelia')],
is_realm_admin=True,
is_subbed=False,
invite_only=False,
target_users_subbed=False,
)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 0)
self.assertEqual(len(json["not_removed"]), 1)
@@ -3215,7 +3259,7 @@ class SubscriptionAPITest(ZulipTestCase):
)
self.assert_length(query_count, 60)
self.assert_length(cache_count, 6)
self.assert_length(cache_count, 4)
peer_events = [e for e in events
if e['event'].get('op') == 'peer_remove']
@@ -3346,7 +3390,7 @@ class SubscriptionAPITest(ZulipTestCase):
# The only known O(N) behavior here is that we call
# principal_to_user_profile for each of our users.
self.assert_length(queries, 19)
self.assert_length(cache_tries, 28)
self.assert_length(cache_tries, 4)
def test_subscriptions_add_for_principal(self) -> None:
"""