streams: Optimize computing users with metadata access.

This commit optimizes how we compute the users who have metadata
access via permission groups, so that we no longer need to do a
separate DB query for each stream to get the recursive members of
the groups that grant those permissions.
This commit is contained in:
Sahil Batra
2025-02-13 15:00:49 +05:30
committed by Tim Abbott
parent 0d1d805ee5
commit a1ac49582b
5 changed files with 83 additions and 27 deletions

View File

@@ -43,6 +43,7 @@ from zerver.lib.streams import (
get_stream_permission_policy_name, get_stream_permission_policy_name,
get_stream_post_policy_value_based_on_group_setting, get_stream_post_policy_value_based_on_group_setting,
get_user_ids_with_metadata_access_via_permission_groups, get_user_ids_with_metadata_access_via_permission_groups,
get_users_dict_with_metadata_access_to_streams_via_permission_groups,
render_stream_description, render_stream_description,
send_stream_creation_event, send_stream_creation_event,
send_stream_deletion_event, send_stream_deletion_event,
@@ -492,6 +493,7 @@ def send_stream_creation_events_for_previously_inaccessible_streams(
stream_dict: dict[int, Stream], stream_dict: dict[int, Stream],
altered_user_dict: dict[int, set[int]], altered_user_dict: dict[int, set[int]],
altered_guests: set[int], altered_guests: set[int],
users_with_metadata_access_via_permission_groups: dict[int, set[int]] | None = None,
) -> None: ) -> None:
stream_ids = set(altered_user_dict.keys()) stream_ids = set(altered_user_dict.keys())
recent_traffic = get_streams_traffic(stream_ids, realm) recent_traffic = get_streams_traffic(stream_ids, realm)
@@ -504,6 +506,7 @@ def send_stream_creation_events_for_previously_inaccessible_streams(
notify_user_ids = [] notify_user_ids = []
if not stream.is_public(): if not stream.is_public():
assert users_with_metadata_access_via_permission_groups is not None
# Users newly added to invite-only streams # Users newly added to invite-only streams
# need a `create` notification. The former, because # need a `create` notification. The former, because
# they need the stream to exist before # they need the stream to exist before
@@ -511,13 +514,10 @@ def send_stream_creation_events_for_previously_inaccessible_streams(
# they can manage the new stream. # they can manage the new stream.
# Realm admins already have all created private streams. # Realm admins already have all created private streams.
realm_admin_ids = {user.id for user in realm.get_admin_users_and_bots()} realm_admin_ids = {user.id for user in realm.get_admin_users_and_bots()}
user_ids_with_metadata_access_via_permission_groups = (
get_user_ids_with_metadata_access_via_permission_groups(stream)
)
notify_user_ids = list( notify_user_ids = list(
stream_users_ids stream_users_ids
- realm_admin_ids - realm_admin_ids
- user_ids_with_metadata_access_via_permission_groups - users_with_metadata_access_via_permission_groups[stream.id]
) )
elif not stream.is_web_public: elif not stream.is_web_public:
# Guese users need a `create` notification for # Guese users need a `create` notification for
@@ -805,9 +805,19 @@ def bulk_add_subscriptions(
new_streams = [stream_dict[stream_id] for stream_id in altered_user_dict] new_streams = [stream_dict[stream_id] for stream_id in altered_user_dict]
private_streams = [stream for stream in new_streams if not stream.is_public()]
users_with_metadata_access_via_permission_groups = None
if private_streams:
users_with_metadata_access_via_permission_groups = (
get_users_dict_with_metadata_access_to_streams_via_permission_groups(
private_streams, realm.id
)
)
subscriber_peer_info = bulk_get_subscriber_peer_info( subscriber_peer_info = bulk_get_subscriber_peer_info(
realm=realm, realm=realm,
streams=new_streams, streams=new_streams,
users_with_metadata_access_via_permission_groups=users_with_metadata_access_via_permission_groups,
) )
# We now send several types of events to notify browsers. The # We now send several types of events to notify browsers. The
@@ -820,6 +830,7 @@ def bulk_add_subscriptions(
stream_dict=stream_dict, stream_dict=stream_dict,
altered_user_dict=altered_user_dict, altered_user_dict=altered_user_dict,
altered_guests=altered_guests, altered_guests=altered_guests,
users_with_metadata_access_via_permission_groups=users_with_metadata_access_via_permission_groups,
) )
send_subscription_add_events( send_subscription_add_events(

View File

@@ -1,3 +1,4 @@
from collections import defaultdict
from collections.abc import Collection, Iterable from collections.abc import Collection, Iterable
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timedelta from datetime import datetime, timedelta
@@ -25,9 +26,9 @@ from zerver.lib.timestamp import datetime_to_timestamp
from zerver.lib.types import AnonymousSettingGroupDict, APIStreamDict from zerver.lib.types import AnonymousSettingGroupDict, APIStreamDict
from zerver.lib.user_groups import ( from zerver.lib.user_groups import (
get_recursive_group_members, get_recursive_group_members,
get_recursive_group_members_union_for_groups,
get_recursive_membership_groups, get_recursive_membership_groups,
get_role_based_system_groups_dict, get_role_based_system_groups_dict,
get_root_id_annotated_recursive_subgroups_for_groups,
user_has_permission_for_group_setting, user_has_permission_for_group_setting,
) )
from zerver.models import ( from zerver.models import (
@@ -185,20 +186,59 @@ def get_default_values_for_stream_permission_group_settings(
return group_setting_values return group_setting_values
def get_user_ids_with_metadata_access_via_permission_groups(stream: Stream) -> set[int]: def get_users_dict_with_metadata_access_to_streams_via_permission_groups(
return set( streams: list[Stream],
get_recursive_group_members_union_for_groups( realm_id: int,
[stream.can_add_subscribers_group_id, stream.can_administer_channel_group_id] ) -> dict[int, set[int]]:
can_administer_group_ids = {stream.can_administer_channel_group_id for stream in streams}
can_add_subscriber_group_ids = {stream.can_add_subscribers_group_id for stream in streams}
all_permission_group_ids = list(can_administer_group_ids | can_add_subscriber_group_ids)
recursive_subgroups = get_root_id_annotated_recursive_subgroups_for_groups(
all_permission_group_ids, realm_id
)
subgroup_root_id_dict = {}
all_subgroup_ids = set()
for group in recursive_subgroups:
subgroup_root_id_dict[group.id] = group.root_id # type: ignore[attr-defined] # root_id is an annotated field.
all_subgroup_ids.add(group.id)
group_members = (
UserGroupMembership.objects.filter(
user_group_id__in=list(all_subgroup_ids), user_profile__is_active=True
) )
.exclude( .exclude(
# allow_everyone_group=False is false for both # allow_everyone_group=False is false for both
# can_add_subscribers_group and # can_add_subscribers_group and
# can_administer_channel_group, so guest users cannot # can_administer_channel_group, so guest users cannot
# exercise these permission to get metadata access. # exercise these permission to get metadata access.
role=UserProfile.ROLE_GUEST user_profile__role=UserProfile.ROLE_GUEST
) )
.values_list("id", flat=True) .values_list("user_group_id", "user_profile_id")
) )
group_members_dict = defaultdict(set)
for user_group_id, user_profile_id in group_members:
root_id = subgroup_root_id_dict[user_group_id]
group_members_dict[root_id].add(user_profile_id)
users_with_metadata_access_dict = defaultdict(set)
for stream in streams:
users_with_metadata_access_dict[stream.id] = (
group_members_dict[stream.can_administer_channel_group_id]
| group_members_dict[stream.can_add_subscribers_group_id]
)
return users_with_metadata_access_dict
def get_user_ids_with_metadata_access_via_permission_groups(stream: Stream) -> set[int]:
users_with_metadata_access_dict = (
get_users_dict_with_metadata_access_to_streams_via_permission_groups(
[stream], stream.realm_id
)
)
return users_with_metadata_access_dict[stream.id]
@transaction.atomic(savepoint=False) @transaction.atomic(savepoint=False)

View File

@@ -23,7 +23,7 @@ from zerver.lib.streams import (
get_group_setting_value_dict_for_streams, get_group_setting_value_dict_for_streams,
get_setting_values_for_group_settings, get_setting_values_for_group_settings,
get_stream_post_policy_value_based_on_group_setting, get_stream_post_policy_value_based_on_group_setting,
get_user_ids_with_metadata_access_via_permission_groups, get_users_dict_with_metadata_access_to_streams_via_permission_groups,
get_web_public_streams_queryset, get_web_public_streams_queryset,
has_metadata_access_to_channel_via_groups, has_metadata_access_to_channel_via_groups,
subscribed_to_stream, subscribed_to_stream,
@@ -521,6 +521,7 @@ def get_subscribers_query(
def bulk_get_subscriber_peer_info( def bulk_get_subscriber_peer_info(
realm: Realm, realm: Realm,
streams: Collection[Stream] | QuerySet[Stream], streams: Collection[Stream] | QuerySet[Stream],
users_with_metadata_access_via_permission_groups: dict[int, set[int]] | None = None,
) -> SubscriberPeerInfo: ) -> SubscriberPeerInfo:
""" """
Glossary: Glossary:
@@ -553,18 +554,22 @@ def bulk_get_subscriber_peer_info(
if private_streams: if private_streams:
realm_admin_ids = {user.id for user in realm.get_admin_users_and_bots()} realm_admin_ids = {user.id for user in realm.get_admin_users_and_bots()}
if users_with_metadata_access_via_permission_groups is None:
users_with_metadata_access_via_permission_groups = (
get_users_dict_with_metadata_access_to_streams_via_permission_groups(
list(private_streams), realm.id
)
)
for stream in private_streams: for stream in private_streams:
# Realm admins can see all private stream # Realm admins can see all private stream
# subscribers. # subscribers.
subscribed_user_ids = stream_user_ids.get(stream.id, set()) subscribed_user_ids = stream_user_ids.get(stream.id, set())
subscribed_ids[stream.id] = subscribed_user_ids subscribed_ids[stream.id] = subscribed_user_ids
user_ids_with_metadata_access_via_permission_groups = (
get_user_ids_with_metadata_access_via_permission_groups(stream)
)
private_peer_dict[stream.id] = ( private_peer_dict[stream.id] = (
subscribed_user_ids subscribed_user_ids
| realm_admin_ids | realm_admin_ids
| user_ids_with_metadata_access_via_permission_groups | users_with_metadata_access_via_permission_groups[stream.id]
) )
for stream_id in public_stream_ids: for stream_id in public_stream_ids:

View File

@@ -792,7 +792,7 @@ def get_root_id_annotated_recursive_subgroups_for_groups(
# each group root_id and annotates it with that group. # each group root_id and annotates it with that group.
cte = With.recursive( cte = With.recursive(
lambda cte: NamedUserGroup.objects.filter(id__in=user_group_ids, realm=realm_id) lambda cte: UserGroup.objects.filter(id__in=user_group_ids, realm=realm_id)
.values(group_id=F("id"), root_id=F("id")) .values(group_id=F("id"), root_id=F("id"))
.union( .union(
cte.join(NamedUserGroup, direct_supergroups=cte.col.group_id).values( cte.join(NamedUserGroup, direct_supergroups=cte.col.group_id).values(
@@ -801,9 +801,7 @@ def get_root_id_annotated_recursive_subgroups_for_groups(
) )
) )
recursive_subgroups = ( recursive_subgroups = (
cte.join(NamedUserGroup, id=cte.col.group_id) cte.join(UserGroup, id=cte.col.group_id).with_cte(cte).annotate(root_id=cte.col.root_id)
.with_cte(cte)
.annotate(root_id=cte.col.root_id)
) )
return recursive_subgroups return recursive_subgroups

View File

@@ -3391,7 +3391,7 @@ class StreamAdminTest(ZulipTestCase):
are on. are on.
""" """
result = self.attempt_unsubscribe_of_principal( result = self.attempt_unsubscribe_of_principal(
query_count=16, query_count=17,
target_users=[self.example_user("cordelia")], target_users=[self.example_user("cordelia")],
is_realm_admin=True, is_realm_admin=True,
is_subbed=True, is_subbed=True,
@@ -3408,7 +3408,7 @@ class StreamAdminTest(ZulipTestCase):
streams you aren't on. streams you aren't on.
""" """
result = self.attempt_unsubscribe_of_principal( result = self.attempt_unsubscribe_of_principal(
query_count=16, query_count=17,
target_users=[self.example_user("cordelia")], target_users=[self.example_user("cordelia")],
is_realm_admin=True, is_realm_admin=True,
is_subbed=False, is_subbed=False,
@@ -6133,15 +6133,17 @@ class SubscriptionAPITest(ZulipTestCase):
private, "can_administer_channel_group", user6_group, acting_user=user6 private, "can_administer_channel_group", user6_group, acting_user=user6
) )
user7_group = self.create_or_update_anonymous_group_for_setting([user7], []) user7_and_guests_group = self.create_or_update_anonymous_group_for_setting(
[user7, guest], []
)
do_change_stream_group_based_setting( do_change_stream_group_based_setting(
private, "can_add_subscribers_group", user7_group, acting_user=user7 private, "can_add_subscribers_group", user7_and_guests_group, acting_user=user7
) )
# Sends 3 peer-remove events, 2 unsubscribe events # Sends 3 peer-remove events, 2 unsubscribe events
# and 2 stream delete events for private streams. # and 2 stream delete events for private streams.
with ( with (
self.assert_database_query_count(19), self.assert_database_query_count(20),
self.assert_memcached_count(3), self.assert_memcached_count(3),
self.capture_send_event_calls(expected_num_events=7) as events, self.capture_send_event_calls(expected_num_events=7) as events,
): ):
@@ -6230,7 +6232,7 @@ class SubscriptionAPITest(ZulipTestCase):
# Verify that peer_event events are never sent in Zephyr # Verify that peer_event events are never sent in Zephyr
# realm. This does generate stream creation events from # realm. This does generate stream creation events from
# send_stream_creation_events_for_previously_inaccessible_streams. # send_stream_creation_events_for_previously_inaccessible_streams.
with self.assert_database_query_count(num_streams * 2 + 15): with self.assert_database_query_count(num_streams + 17):
with self.capture_send_event_calls(expected_num_events=num_streams + 1) as events: with self.capture_send_event_calls(expected_num_events=num_streams + 1) as events:
self.subscribe_via_post( self.subscribe_via_post(
mit_user, mit_user,
@@ -6697,7 +6699,7 @@ class SubscriptionAPITest(ZulipTestCase):
) )
# Test creating private stream. # Test creating private stream.
with self.assert_database_query_count(49): with self.assert_database_query_count(50):
self.subscribe_via_post( self.subscribe_via_post(
self.test_user, self.test_user,
[new_streams[1]], [new_streams[1]],