peer_data: Use sub.subscriber_count for get_subscriber_count.

This commit is contained in:
Evy Kassirer
2025-06-08 17:30:41 -07:00
committed by Tim Abbott
parent 0c000da267
commit 55634c8fcc
7 changed files with 112 additions and 30 deletions

View File

@@ -7,10 +7,14 @@ import {LazySet} from "./lazy_set.ts";
import {page_params} from "./page_params.ts"; import {page_params} from "./page_params.ts";
import type {User} from "./people.ts"; import type {User} from "./people.ts";
import * as people from "./people.ts"; import * as people from "./people.ts";
import type {StreamSubscription} from "./sub_store.ts";
import * as sub_store from "./sub_store.ts"; import * as sub_store from "./sub_store.ts";
import * as util from "./util.ts"; import * as util from "./util.ts";
// This maps a stream_id to a LazySet of user_ids who are subscribed. // This maps a stream_id to a LazySet of user_ids who are subscribed.
// Make sure that when we have full subscriber data for a stream,
// the size of its subscribers set stays synced with the relevant
// stream's `subscriber_count`.
const stream_subscribers = new Map<number, LazySet>(); const stream_subscribers = new Map<number, LazySet>();
const fetched_stream_ids = new Set<number>(); const fetched_stream_ids = new Set<number>();
export function has_full_subscriber_data(stream_id: number): boolean { export function has_full_subscriber_data(stream_id: number): boolean {
@@ -135,7 +139,7 @@ async function get_full_subscriber_set(
if (fetched_subscribers === null) { if (fetched_subscribers === null) {
return null; return null;
} }
stream_subscribers.set(stream_id, fetched_subscribers); set_subscribers(stream_id, [...fetched_subscribers.keys()]);
} }
return get_loaded_subscriber_subset(stream_id); return get_loaded_subscriber_subset(stream_id);
} }
@@ -194,18 +198,45 @@ export function potential_subscribers(stream_id: number): User[] {
return people.filter_all_users(is_potential_subscriber); return people.filter_all_users(is_potential_subscriber);
} }
function increment_subscriber_count(sub: StreamSubscription, user_id: number): void {
const subscribers = get_loaded_subscriber_subset(sub.stream_id);
if (subscribers.has(user_id)) {
return;
}
sub.subscriber_count += 1;
}
function decrement_subscriber_count(sub: StreamSubscription, user_id: number): void {
const subscribers = get_loaded_subscriber_subset(sub.stream_id);
// If we don't have full subscriber data, we assume that even if didn't know
// they were a subscriber, we still want to decrement the count.
if (fetched_stream_ids.has(sub.stream_id) && !subscribers.has(user_id)) {
return;
}
sub.subscriber_count -= 1;
}
// Note: `subscriber_count` can sometimes be wrong due to races on the backend,
// so we should fetch full subscriber data if we care about this number being
// accurate.
export let get_subscriber_count = (stream_id: number, include_bots = true): number => { export let get_subscriber_count = (stream_id: number, include_bots = true): number => {
if (include_bots) { const sub = sub_store.get(stream_id);
return get_loaded_subscriber_subset(stream_id).size; if (sub === undefined) {
blueslip.warn(`We called get_subscriber_count for an untracked stream: ${stream_id}`);
return 0;
} }
let count = 0; if (include_bots) {
return sub.subscriber_count;
}
let bot_count = 0;
for (const user_id of get_loaded_subscriber_subset(stream_id).keys()) { for (const user_id of get_loaded_subscriber_subset(stream_id).keys()) {
if (!people.is_valid_bot_user(user_id) && people.is_person_active(user_id)) { if (people.is_valid_bot_user(user_id)) {
count += 1; bot_count += 1;
} }
} }
return count; return sub.subscriber_count - bot_count;
}; };
export function rewire_get_subscriber_count(value: typeof get_subscriber_count): void { export function rewire_get_subscriber_count(value: typeof get_subscriber_count): void {
@@ -238,6 +269,12 @@ export async function get_all_subscribers(
export function set_subscribers(stream_id: number, user_ids: number[], full_data = true): void { export function set_subscribers(stream_id: number, user_ids: number[], full_data = true): void {
const subscribers = new LazySet(user_ids); const subscribers = new LazySet(user_ids);
stream_subscribers.set(stream_id, subscribers); stream_subscribers.set(stream_id, subscribers);
const sub = sub_store.get(stream_id);
if (!sub) {
blueslip.warn(`We called set_subscribers for an untracked stream: ${stream_id}`);
} else if (full_data) {
sub.subscriber_count = subscribers.size;
}
if (full_data) { if (full_data) {
fetched_stream_ids.add(stream_id); fetched_stream_ids.add(stream_id);
} }
@@ -251,16 +288,23 @@ export function add_subscriber(stream_id: number, user_id: number): void {
if (person === undefined) { if (person === undefined) {
blueslip.warn(`We tried to add invalid subscriber: ${user_id}`); blueslip.warn(`We tried to add invalid subscriber: ${user_id}`);
} }
const sub = sub_store.get(stream_id);
// If the sub is undefined, we'll be raising a warning in
// `get_loaded_subscriber_subset`, so we don't need to here.
if (sub) {
increment_subscriber_count(sub, user_id);
}
subscribers.add(user_id); subscribers.add(user_id);
} }
export function remove_subscriber(stream_id: number, user_id: number): void { export function remove_subscriber(stream_id: number, user_id: number): void {
const subscribers = get_loaded_subscriber_subset(stream_id); const sub = sub_store.get(stream_id);
if (!subscribers.has(user_id)) { // If the sub is undefined, we'll be raising a warning in
blueslip.warn(`We tried to remove invalid subscriber: ${user_id}`); // `get_loaded_subscriber_subset`, so we don't need to here.
return; if (sub) {
decrement_subscriber_count(sub, user_id);
} }
const subscribers = get_loaded_subscriber_subset(stream_id);
subscribers.delete(user_id); subscribers.delete(user_id);
} }
@@ -274,7 +318,13 @@ export function bulk_add_subscribers({
// We rely on our callers to validate stream_ids and user_ids. // We rely on our callers to validate stream_ids and user_ids.
for (const stream_id of stream_ids) { for (const stream_id of stream_ids) {
const subscribers = get_loaded_subscriber_subset(stream_id); const subscribers = get_loaded_subscriber_subset(stream_id);
const sub = sub_store.get(stream_id);
for (const user_id of user_ids) { for (const user_id of user_ids) {
// If the sub is undefined, we'll be raising a warning in
// `get_loaded_subscriber_subset`, so we don't need to here.
if (sub) {
increment_subscriber_count(sub, user_id);
}
subscribers.add(user_id); subscribers.add(user_id);
} }
@@ -297,7 +347,13 @@ export function bulk_remove_subscribers({
// We rely on our callers to validate stream_ids and user_ids. // We rely on our callers to validate stream_ids and user_ids.
for (const stream_id of stream_ids) { for (const stream_id of stream_ids) {
const subscribers = get_loaded_subscriber_subset(stream_id); const subscribers = get_loaded_subscriber_subset(stream_id);
const sub = sub_store.get(stream_id);
for (const user_id of user_ids) { for (const user_id of user_ids) {
// If the sub is undefined, we'll be raising a warning in
// `get_loaded_subscriber_subset`, so we don't need to here.
if (sub) {
decrement_subscriber_count(sub, user_id);
}
subscribers.delete(user_id); subscribers.delete(user_id);
} }

View File

@@ -1065,18 +1065,18 @@ export function create_sub_from_server_data(
...attrs, ...attrs,
}; };
if (attrs.partial_subscribers !== undefined) {
peer_data.set_subscribers(sub.stream_id, attrs.partial_subscribers, false);
} else {
peer_data.set_subscribers(sub.stream_id, subscriber_user_ids ?? []);
}
clean_up_description(sub); clean_up_description(sub);
stream_info.set(sub.stream_id, sub); stream_info.set(sub.stream_id, sub);
stream_ids_by_name.set(sub.name, sub.stream_id); stream_ids_by_name.set(sub.name, sub.stream_id);
sub_store.add_hydrated_sub(sub.stream_id, sub); sub_store.add_hydrated_sub(sub.stream_id, sub);
if (attrs.partial_subscribers !== undefined) {
peer_data.set_subscribers(sub.stream_id, attrs.partial_subscribers, false);
} else {
peer_data.set_subscribers(sub.stream_id, subscriber_user_ids ?? []);
}
return sub; return sub;
} }

View File

@@ -56,6 +56,10 @@ export const stream_schema = z.object({
rendered_description: z.string(), rendered_description: z.string(),
stream_id: z.number(), stream_id: z.number(),
stream_post_policy: z.nativeEnum(StreamPostPolicy), stream_post_policy: z.nativeEnum(StreamPostPolicy),
// Generally, this should not be accessed directly, since it can
// have small inaccuracies in the event of rare races. See
// the comments on peer_data.get_subscriber_count.
subscriber_count: z.number(),
topics_policy: stream_topics_policy_schema, topics_policy: stream_topics_policy_schema,
}); });

View File

@@ -642,8 +642,8 @@ test_ui("warn_if_private_stream_is_linked", async ({mock_template}) => {
name: "Denmark", name: "Denmark",
stream_id: 22, stream_id: 22,
}; };
peer_data.set_subscribers(secret_stream.stream_id, []);
stream_data.add_sub(secret_stream); stream_data.add_sub(secret_stream);
peer_data.set_subscribers(secret_stream.stream_id, []);
banner_rendered = false; banner_rendered = false;
const $banner_container = $("#compose_banners"); const $banner_container = $("#compose_banners");
$banner_container.set_find_results(".private_stream_warning", []); $banner_container.set_find_results(".private_stream_warning", []);

View File

@@ -147,6 +147,14 @@ test("subscribers", async () => {
potential_subscriber_ids(); potential_subscriber_ids();
blueslip.reset(); blueslip.reset();
const bogus_stream_id = 999;
blueslip.expect(
"warn",
"We called set_subscribers for an untracked stream: " + bogus_stream_id,
);
peer_data.set_subscribers(bogus_stream_id, []);
blueslip.reset();
peer_data.set_subscribers(stream_id, []); peer_data.set_subscribers(stream_id, []);
assert.deepEqual(potential_subscriber_ids(), [ assert.deepEqual(potential_subscriber_ids(), [
me.user_id, me.user_id,
@@ -194,16 +202,12 @@ test("subscribers", async () => {
"warn", "warn",
"We called get_loaded_subscriber_subset for an untracked stream: " + bad_stream_id, "We called get_loaded_subscriber_subset for an untracked stream: " + bad_stream_id,
); );
blueslip.expect("warn", "We tried to remove invalid subscriber: 104");
peer_data.remove_subscriber(bad_stream_id, brutus.user_id); peer_data.remove_subscriber(bad_stream_id, brutus.user_id);
blueslip.reset();
// verify that removing an already-removed subscriber is a noop // verify that removing an already-removed subscriber is a noop
blueslip.expect("warn", "We tried to remove invalid subscriber: 104");
peer_data.remove_subscriber(stream_id, brutus.user_id); peer_data.remove_subscriber(stream_id, brutus.user_id);
assert.ok(!stream_data.is_user_subscribed(stream_id, brutus.user_id)); assert.ok(!stream_data.is_user_subscribed(stream_id, brutus.user_id));
assert.equal(peer_data.get_subscriber_count(stream_id), 0); assert.equal(peer_data.get_subscriber_count(stream_id), 0);
blueslip.reset();
// Verify defensive code in set_subscribers, where the second parameter // Verify defensive code in set_subscribers, where the second parameter
// can be undefined. // can be undefined.
@@ -362,7 +366,7 @@ test("maybe_fetch_stream_subscribers", async () => {
assert.equal(num_attempts, 2); assert.equal(num_attempts, 2);
}); });
test("get_subscriber_count", () => { test("get_subscriber_count", async () => {
people.add_active_user(fred); people.add_active_user(fred);
people.add_active_user(gail); people.add_active_user(gail);
people.add_active_user(george); people.add_active_user(george);
@@ -378,10 +382,11 @@ test("get_subscriber_count", () => {
stream_id: 102, stream_id: 102,
name: "India", name: "India",
subscribed: true, subscribed: true,
subscriber_count: 0,
}; };
stream_data.clear_subscriptions(); stream_data.clear_subscriptions();
blueslip.expect("warn", "We called get_loaded_subscriber_subset for an untracked stream: 102"); blueslip.expect("warn", "We called get_subscriber_count for an untracked stream: 102");
assert.equal(peer_data.get_subscriber_count(india.stream_id), 0); assert.equal(peer_data.get_subscriber_count(india.stream_id), 0);
stream_data.add_sub(india); stream_data.add_sub(india);
@@ -399,6 +404,23 @@ test("get_subscriber_count", () => {
assert.deepStrictEqual(peer_data.get_subscriber_count(india.stream_id), 2); assert.deepStrictEqual(peer_data.get_subscriber_count(india.stream_id), 2);
// Get the count without bots // Get the count without bots
assert.deepStrictEqual(peer_data.get_subscriber_count(india.stream_id, false), 1); assert.deepStrictEqual(peer_data.get_subscriber_count(india.stream_id, false), 1);
// We don't have full data, so we assume this is a decrement even though gail isn't
// in the subscriber list.
india.subscriber_count = 20;
assert.deepStrictEqual(peer_data.get_subscriber_count(india.stream_id), 20);
peer_data.remove_subscriber(india.stream_id, gail.user_id);
assert.deepStrictEqual(peer_data.get_subscriber_count(india.stream_id), 19);
// Now fetch the actual subscribers
channel.get = () => ({
subscribers: [george.user_id, fred.user_id],
});
await peer_data.maybe_fetch_stream_subscribers(india.stream_id);
assert.deepStrictEqual(peer_data.get_subscriber_count(india.stream_id), 2);
// Now we know Gail isn't subscribed, so we don't decrement the count
peer_data.remove_subscriber(india.stream_id, gail.user_id);
assert.deepStrictEqual(peer_data.get_subscriber_count(india.stream_id), 2);
}); });
test("is_subscriber_subset", async () => { test("is_subscriber_subset", async () => {

View File

@@ -136,7 +136,6 @@ const denmark = {
render_subscribers: true, render_subscribers: true,
}; };
const denmark_item = stream_item(denmark); const denmark_item = stream_item(denmark);
peer_data.set_subscribers(denmark.stream_id, [me.user_id, mark.user_id]);
const sweden = { const sweden = {
stream_id: 2, stream_id: 2,
@@ -144,12 +143,13 @@ const sweden = {
subscribed: false, subscribed: false,
}; };
const sweden_item = stream_item(sweden); const sweden_item = stream_item(sweden);
peer_data.set_subscribers(sweden.stream_id, [mark.user_id, jill.user_id]);
const subs = [denmark, sweden]; const subs = [denmark, sweden];
for (const sub of subs) { for (const sub of subs) {
stream_data.add_sub(sub); stream_data.add_sub(sub);
} }
peer_data.set_subscribers(denmark.stream_id, [me.user_id, mark.user_id]);
peer_data.set_subscribers(sweden.stream_id, [mark.user_id, jill.user_id]);
run_test("set_up_user", ({mock_template, override, override_rewire}) => { run_test("set_up_user", ({mock_template, override, override_rewire}) => {
mock_template("typeahead_list_item.hbs", false, (args) => { mock_template("typeahead_list_item.hbs", false, (args) => {

View File

@@ -65,9 +65,6 @@ const germany = {
can_subscribe_group: nobody_group.id, can_subscribe_group: nobody_group.id,
}; };
peer_data.set_subscribers(denmark.stream_id, [1, 2, 77]);
peer_data.set_subscribers(sweden.stream_id, [1, 2, 3, 4, 5]);
const denmark_pill = { const denmark_pill = {
type: "stream", type: "stream",
stream_id: denmark.stream_id, stream_id: denmark.stream_id,
@@ -84,6 +81,9 @@ for (const sub of subs) {
stream_data.add_sub(sub); stream_data.add_sub(sub);
} }
peer_data.set_subscribers(denmark.stream_id, [1, 2, 77]);
peer_data.set_subscribers(sweden.stream_id, [1, 2, 3, 4, 5]);
people.add_active_user(me); people.add_active_user(me);
people.initialize_current_user(me.user_id); people.initialize_current_user(me.user_id);