refactor: Extract bulk functions to add/remove peers.

We also streamline some of the error handling code
by doing everything up front.  This will prevent
scenarios where a single bad stream_id/user_id causes a
bunch of the same warnings in an inner loop.
This commit is contained in:
Steve Howell
2021-01-27 14:54:06 +00:00
committed by Tim Abbott
parent e820c43b77
commit 2edfdb4ff8
5 changed files with 83 additions and 79 deletions

View File

@@ -2,7 +2,7 @@
const {strict: assert} = require("assert");
const {set_global, with_field, zrequire} = require("../zjsunit/namespace");
const {set_global, zrequire} = require("../zjsunit/namespace");
const {make_stub, with_stub} = require("../zjsunit/stub");
const {run_test} = require("../zjsunit/test");
@@ -123,70 +123,29 @@ test("add error handling", (override) => {
});
});
test("peer event error handling (bad stream_ids)", (override) => {
test("peer event error handling (bad stream_ids/user_ids)", (override) => {
override("compose_fade.update_faded_users", () => {});
const add_event = {
type: "subscription",
op: "peer_add",
stream_ids: [99999],
stream_ids: [8888, 9999],
user_ids: [3333, 4444],
};
blueslip.expect("warn", "Cannot find stream for peer_add: 99999");
blueslip.expect("warn", "We have untracked stream_ids: 8888,9999");
blueslip.expect("warn", "We have untracked user_ids: 3333,4444");
dispatch(add_event);
blueslip.reset();
const remove_event = {
type: "subscription",
op: "peer_remove",
stream_ids: [99999],
stream_ids: [8888, 9999],
user_ids: [3333, 4444],
};
blueslip.expect("warn", "Cannot find stream for peer_remove: 99999");
blueslip.expect("warn", "We have untracked stream_ids: 8888,9999");
blueslip.expect("warn", "We have untracked user_ids: 3333,4444");
dispatch(remove_event);
});
test("peer event error handling (add/remove subscriber)", (override) => {
override("compose_fade.update_faded_users", () => {});
override("subs.update_subscribers_ui", () => {});
stream_data.add_sub({
name: "devel",
stream_id: 1,
});
with_field(
peer_data,
"add_subscriber",
() => false,
() => {
const add_event = {
type: "subscription",
op: "peer_add",
stream_ids: [1],
user_ids: [99999], // id is irrelevant
};
blueslip.expect("warn", "Cannot process peer_add event");
dispatch(add_event);
blueslip.reset();
},
);
with_field(
peer_data,
"remove_subscriber",
() => false,
() => {
const remove_event = {
type: "subscription",
op: "peer_remove",
stream_ids: [1],
user_ids: [99999], // id is irrelevant
};
blueslip.expect("warn", "Cannot process peer_remove event.");
dispatch(remove_event);
},
);
});

View File

@@ -94,6 +94,7 @@ export function get_subscribers(stream_id) {
export function set_subscribers(stream_id, user_ids) {
const subscribers = new LazySet(user_ids || []);
stream_subscribers.set(stream_id, subscribers);
return subscribers;
}
export function add_subscriber(stream_id, user_id) {
@@ -128,6 +129,26 @@ export function remove_subscriber(stream_id, user_id) {
return true;
}
export function bulk_add_subscribers({stream_ids, user_ids}) {
// We rely on our callers to validate stream_ids and user_ids.
for (const stream_id of stream_ids) {
const subscribers = stream_subscribers.get(stream_id) || set_subscribers(stream_id);
for (const user_id of user_ids) {
subscribers.add(user_id);
}
}
}
export function bulk_remove_subscribers({stream_ids, user_ids}) {
// We rely on our callers to validate stream_ids and user_ids.
for (const stream_id of stream_ids) {
const subscribers = stream_subscribers.get(stream_id) || set_subscribers(stream_id);
for (const user_id of user_ids) {
subscribers.delete(user_id);
}
}
}
export function is_user_subscribed(stream_id, user_id) {
// Most callers should call stream_data.is_user_subscribed,
// which does additional checks.

View File

@@ -56,6 +56,25 @@ export function get_by_user_id(user_id, ignore_missing) {
return people_by_user_id_dict.get(user_id);
}
export function validate_user_ids(user_ids) {
const good_ids = [];
const bad_ids = [];
for (const user_id of user_ids) {
if (people_by_user_id_dict.has(user_id)) {
good_ids.push(user_id);
} else {
bad_ids.push(user_id);
}
}
if (bad_ids.length > 0) {
blueslip.warn(`We have untracked user_ids: ${bad_ids}`);
}
return good_ids;
}
export function get_by_email(email) {
const person = people_dict.get(email);

View File

@@ -345,42 +345,28 @@ exports.dispatch_normal_event = function dispatch_normal_event(event) {
}
}
} else if (event.op === "peer_add") {
for (const stream_id of event.stream_ids) {
const stream_ids = stream_data.validate_stream_ids(event.stream_ids);
const user_ids = people.validate_user_ids(event.user_ids);
peer_data.bulk_add_subscribers({stream_ids, user_ids});
for (const stream_id of stream_ids) {
const sub = stream_data.get_sub_by_id(stream_id);
if (!sub) {
blueslip.warn("Cannot find stream for peer_add: " + stream_id);
continue;
}
for (const user_id of event.user_ids) {
if (!peer_data.add_subscriber(stream_id, user_id)) {
blueslip.warn("Cannot process peer_add event");
continue;
}
}
subs.update_subscribers_ui(sub);
}
compose_fade.update_faded_users();
} else if (event.op === "peer_remove") {
for (const stream_id of event.stream_ids) {
const stream_ids = stream_data.validate_stream_ids(event.stream_ids);
const user_ids = people.validate_user_ids(event.user_ids);
peer_data.bulk_remove_subscribers({stream_ids, user_ids});
for (const stream_id of stream_ids) {
const sub = stream_data.get_sub_by_id(stream_id);
if (!sub) {
blueslip.warn("Cannot find stream for peer_remove: " + stream_id);
continue;
}
for (const user_id of event.user_ids) {
if (!peer_data.remove_subscriber(sub.stream_id, user_id)) {
blueslip.warn("Cannot process peer_remove event.");
continue;
}
}
subs.update_subscribers_ui(sub);
}
compose_fade.update_faded_users();
} else if (event.op === "remove") {
for (const rec of event.subscriptions) {

View File

@@ -225,6 +225,25 @@ exports.get_sub_by_id = function (stream_id) {
return subs_by_stream_id.get(stream_id);
};
exports.validate_stream_ids = function (stream_ids) {
const good_ids = [];
const bad_ids = [];
for (const stream_id of stream_ids) {
if (subs_by_stream_id.has(stream_id)) {
good_ids.push(stream_id);
} else {
bad_ids.push(stream_id);
}
}
if (bad_ids.length > 0) {
blueslip.warn(`We have untracked stream_ids: ${bad_ids}`);
}
return good_ids;
};
exports.get_stream_id = function (name) {
// Note: Only use this function for situations where
// you are comfortable with a user dealing with an