subscription_data: Flesh out partial subscription data sent to client.

Part of #34244.
This commit is contained in:
Evy Kassirer
2025-06-20 19:19:42 -07:00
committed by Tim Abbott
parent 9ef3a19451
commit 0c5e1ac492
4 changed files with 103 additions and 39 deletions

View File

@@ -3,6 +3,7 @@ from collections.abc import Callable, Collection, Iterable, Mapping
from operator import itemgetter from operator import itemgetter
from typing import Any, Literal from typing import Any, Literal
from django.conf import settings
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.db import connection from django.db import connection
from django.db.models import QuerySet from django.db.models import QuerySet
@@ -516,6 +517,7 @@ def bulk_get_subscriber_user_ids(
stream_dicts: Collection[Mapping[str, Any]], stream_dicts: Collection[Mapping[str, Any]],
user_profile: UserProfile, user_profile: UserProfile,
subscribed_stream_ids: set[int], subscribed_stream_ids: set[int],
streams_to_partially_fetch: list[int],
) -> dict[int, list[int]]: ) -> dict[int, list[int]]:
"""sub_dict maps stream_id => whether the user is subscribed to that stream.""" """sub_dict maps stream_id => whether the user is subscribed to that stream."""
target_stream_dicts = [] target_stream_dicts = []
@@ -539,10 +541,19 @@ def bulk_get_subscriber_user_ids(
target_stream_dicts.append(stream_dict) target_stream_dicts.append(stream_dict)
recip_to_stream_id = {stream["recipient_id"]: stream["id"] for stream in target_stream_dicts} recip_to_stream_id = {stream["recipient_id"]: stream["id"] for stream in target_stream_dicts}
recipient_ids = sorted(stream["recipient_id"] for stream in target_stream_dicts) full_fetch_recipient_ids = sorted(
stream["recipient_id"]
for stream in target_stream_dicts
if stream["id"] not in streams_to_partially_fetch
)
partial_fetch_recipient_ids = sorted(
stream["recipient_id"]
for stream in target_stream_dicts
if stream["id"] in streams_to_partially_fetch
)
result: dict[int, list[int]] = {stream["id"]: [] for stream in stream_dicts} result: dict[int, list[int]] = {stream["id"]: [] for stream in stream_dicts}
if not recipient_ids: if not full_fetch_recipient_ids and not partial_fetch_recipient_ids:
return result return result
""" """
@@ -559,10 +570,18 @@ def bulk_get_subscriber_user_ids(
zerver_subscription.user_profile_id zerver_subscription.user_profile_id
FROM FROM
zerver_subscription zerver_subscription
JOIN zerver_userprofile on zerver_userprofile.id = zerver_subscription.user_profile_id
WHERE WHERE
zerver_subscription.recipient_id in %(recipient_ids)s AND
zerver_subscription.active AND zerver_subscription.active AND
zerver_subscription.is_user_active zerver_subscription.is_user_active AND
(
zerver_subscription.recipient_id = ANY (%(full_fetch_recipient_ids)s)
OR
(
zerver_subscription.recipient_id = ANY (%(partial_fetch_recipient_ids)s) AND
(zerver_userprofile.is_bot OR (NOT zerver_userprofile.long_term_idle))
)
)
ORDER BY ORDER BY
zerver_subscription.recipient_id, zerver_subscription.recipient_id,
zerver_subscription.user_profile_id zerver_subscription.user_profile_id
@@ -570,7 +589,13 @@ def bulk_get_subscriber_user_ids(
) )
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute(query, {"recipient_ids": tuple(recipient_ids)}) cursor.execute(
query,
{
"full_fetch_recipient_ids": full_fetch_recipient_ids,
"partial_fetch_recipient_ids": partial_fetch_recipient_ids,
},
)
rows = cursor.fetchall() rows = cursor.fetchall()
cursor.close() cursor.close()
@@ -858,44 +883,38 @@ def gather_subscriptions_helper(
get_stream_id(sub_dict) for sub_dict in sub_dicts if sub_dict["active"] get_stream_id(sub_dict) for sub_dict in sub_dicts if sub_dict["active"]
} }
# If the client only wants partial subscriber data, we send:
# - all subscribers (full data) for channels with fewer than 250 subscribers
# - only bots and recently active users for channels with >= 250 subscribers
streams_to_partially_fetch = []
if include_subscribers == "partial":
streams_to_partially_fetch = [
stream.id
for stream in all_streams
if stream.subscriber_count >= settings.MIN_PARTIAL_SUBSCRIBERS_CHANNEL_SIZE
]
subscriber_map = bulk_get_subscriber_user_ids( subscriber_map = bulk_get_subscriber_user_ids(
all_stream_dicts, all_stream_dicts,
user_profile, user_profile,
subscribed_stream_ids, subscribed_stream_ids,
streams_to_partially_fetch,
) )
# Eventually "partial subscribers" will return (at minimum):
# (1) all subscriptions for recently active users (for the buddy list)
# (2) subscriptions for all bots
#
# For now, we're only doing (2).
send_partial_subscribers = include_subscribers == "partial"
partial_subscriber_map: dict[int, list[int]] = dict()
if send_partial_subscribers:
bot_users = set(
UserProfile.objects.filter(
is_bot=True, realm=user_profile.realm, is_active=True
).values_list("id", flat=True)
)
for stream_id, users in subscriber_map.items():
partial_subscribers = [user_id for user_id in users if user_id in bot_users]
if len(partial_subscribers) != len(users):
partial_subscriber_map[stream_id] = partial_subscribers
for lst in [subscribed, unsubscribed]: for lst in [subscribed, unsubscribed]:
for stream_dict in lst: for stream_dict in lst:
assert isinstance(stream_dict["stream_id"], int) assert isinstance(stream_dict["stream_id"], int)
stream_id = stream_dict["stream_id"] stream_id = stream_dict["stream_id"]
if send_partial_subscribers and partial_subscriber_map.get(stream_id) is not None: if stream_id in streams_to_partially_fetch:
stream_dict["partial_subscribers"] = partial_subscriber_map[stream_id] stream_dict["partial_subscribers"] = subscriber_map[stream_id]
else: else:
stream_dict["subscribers"] = subscriber_map[stream_id] stream_dict["subscribers"] = subscriber_map[stream_id]
for slim_stream_dict in never_subscribed: for slim_stream_dict in never_subscribed:
assert isinstance(slim_stream_dict["stream_id"], int) assert isinstance(slim_stream_dict["stream_id"], int)
stream_id = slim_stream_dict["stream_id"] stream_id = slim_stream_dict["stream_id"]
if send_partial_subscribers and partial_subscriber_map.get(stream_id) is not None: if stream_id in streams_to_partially_fetch:
slim_stream_dict["partial_subscribers"] = partial_subscriber_map[stream_id] slim_stream_dict["partial_subscribers"] = subscriber_map[stream_id]
else: else:
slim_stream_dict["subscribers"] = subscriber_map[stream_id] slim_stream_dict["subscribers"] = subscriber_map[stream_id]

View File

@@ -2,6 +2,7 @@ from typing import TYPE_CHECKING, Any
import orjson import orjson
from django.conf import settings from django.conf import settings
from django.test import override_settings
from typing_extensions import override from typing_extensions import override
from zerver.actions.streams import ( from zerver.actions.streams import (
@@ -9,6 +10,7 @@ from zerver.actions.streams import (
do_change_stream_permission, do_change_stream_permission,
do_deactivate_stream, do_deactivate_stream,
) )
from zerver.lib.create_user import create_user
from zerver.lib.email_mirror_helpers import encode_email_address, get_channel_email_token from zerver.lib.email_mirror_helpers import encode_email_address, get_channel_email_token
from zerver.lib.subscription_info import gather_subscriptions, gather_subscriptions_helper from zerver.lib.subscription_info import gather_subscriptions, gather_subscriptions_helper
from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_classes import ZulipTestCase
@@ -661,15 +663,29 @@ class GetSubscribersTest(ZulipTestCase):
stream_name = gather_subscriptions(self.user_profile)[0][0]["name"] stream_name = gather_subscriptions(self.user_profile)[0][0]["name"]
self.make_successful_subscriber_request(stream_name) self.make_successful_subscriber_request(stream_name)
@override_settings(MIN_PARTIAL_SUBSCRIBERS_CHANNEL_SIZE=5)
def test_gather_partial_subscriptions(self) -> None: def test_gather_partial_subscriptions(self) -> None:
othello = self.example_user("othello") othello = self.example_user("othello")
idle_users = [
create_user(
email=f"original_user{i}@zulip.com",
password=None,
realm=othello.realm,
full_name=f"Full Name {i}",
)
for i in range(5)
]
for user in idle_users:
user.long_term_idle = True
user.save()
bot = self.create_test_bot("bot", othello, "Foo Bot") bot = self.create_test_bot("bot", othello, "Foo Bot")
stream_names = [ stream_names = [
"never_subscribed_only_bots", "never_subscribed_only_bots",
"never_subscribed_more_than_bots", "never_subscribed_many_more_than_bots",
"unsubscribed_only_bots", "unsubscribed_only_bots",
"subscribed_more_than_bots", "subscribed_more_than_bots_including_idle",
"subscribed_many_more_than_bots",
] ]
for stream_name in stream_names: for stream_name in stream_names:
self.make_stream(stream_name) self.make_stream(stream_name)
@@ -681,8 +697,12 @@ class GetSubscribersTest(ZulipTestCase):
) )
self.subscribe_via_post( self.subscribe_via_post(
self.user_profile, self.user_profile,
["never_subscribed_more_than_bots"], ["never_subscribed_many_more_than_bots"],
dict(principals=orjson.dumps([bot.id, othello.id]).decode()), dict(
principals=orjson.dumps(
[bot.id, othello.id] + [user.id for user in idle_users]
).decode()
),
) )
self.subscribe_via_post( self.subscribe_via_post(
self.user_profile, self.user_profile,
@@ -695,11 +715,24 @@ class GetSubscribersTest(ZulipTestCase):
) )
self.subscribe_via_post( self.subscribe_via_post(
self.user_profile, self.user_profile,
["subscribed_more_than_bots"], ["subscribed_more_than_bots_including_idle"],
dict(principals=orjson.dumps([bot.id, othello.id, self.user_profile.id]).decode()), dict(
principals=orjson.dumps(
[bot.id, othello.id, self.user_profile.id, idle_users[0].id]
).decode()
),
)
self.subscribe_via_post(
self.user_profile,
["subscribed_many_more_than_bots"],
dict(
principals=orjson.dumps(
[bot.id, othello.id, self.user_profile.id] + [user.id for user in idle_users]
).decode()
),
) )
with self.assert_database_query_count(10): with self.assert_database_query_count(9):
sub_data = gather_subscriptions_helper(self.user_profile, include_subscribers="partial") sub_data = gather_subscriptions_helper(self.user_profile, include_subscribers="partial")
never_subscribed_streams = sub_data.never_subscribed never_subscribed_streams = sub_data.never_subscribed
unsubscribed_streams = sub_data.unsubscribed unsubscribed_streams = sub_data.unsubscribed
@@ -719,8 +752,9 @@ class GetSubscribersTest(ZulipTestCase):
self.assert_length(sub["subscribers"], 1) self.assert_length(sub["subscribers"], 1)
self.assertIsNone(sub.get("partial_subscribers")) self.assertIsNone(sub.get("partial_subscribers"))
continue continue
if sub["name"] == "never_subscribed_more_than_bots": if sub["name"] == "never_subscribed_many_more_than_bots":
self.assert_length(sub["partial_subscribers"], 1) # the bot and Othello (who is not long_term_idle)
self.assert_length(sub["partial_subscribers"], 2)
self.assertIsNone(sub.get("subscribers")) self.assertIsNone(sub.get("subscribers"))
for sub in unsubscribed_streams: for sub in unsubscribed_streams:
@@ -730,10 +764,16 @@ class GetSubscribersTest(ZulipTestCase):
break break
for sub in subscribed_streams: for sub in subscribed_streams:
if sub["name"] == "subscribed_more_than_bots": # fewer than MIN_PARTIAL_SUBSCRIBERS_CHANNEL_SIZE subscribers,
self.assert_length(sub["partial_subscribers"], 1) # so we get all of them
if sub["name"] == "subscribed_more_than_bots_including_idle":
self.assertIsNone(sub.get("partial_subscribers"))
self.assert_length(sub["subscribers"], 4)
continue
if sub["name"] == "subscribed_many_more_than_bots":
# the bot, Othello (who is not long_term_idle), and current user
self.assert_length(sub["partial_subscribers"], 3)
self.assertIsNone(sub.get("subscribers")) self.assertIsNone(sub.get("subscribers"))
break
def test_gather_subscriptions(self) -> None: def test_gather_subscriptions(self) -> None:
""" """

View File

@@ -195,6 +195,7 @@ class TestMiscStuff(ZulipTestCase):
stream_dicts=[], stream_dicts=[],
user_profile=user_profile, user_profile=user_profile,
subscribed_stream_ids=set(), subscribed_stream_ids=set(),
streams_to_partially_fetch=[],
) )
self.assertEqual(result, {}) self.assertEqual(result, {})

View File

@@ -737,3 +737,7 @@ VERIFY_WEBHOOK_SIGNATURES = True
# SCIM API configuration. # SCIM API configuration.
SCIM_CONFIG: dict[str, SCIMConfigDict] = {} SCIM_CONFIG: dict[str, SCIMConfigDict] = {}
# Minimum number of subscribers in a channel for us to no longer
# send full subscriber data to the client.
MIN_PARTIAL_SUBSCRIBERS_CHANNEL_SIZE = 250