stream_topic_history_util: Add retry mechanism to get_server_history.

Earlier, in `get_server_history` the API call to fetch stream-topic
history didn't have a retry logic.

This commit adds a retry mechanism.

If the request to fetch stream topic history fails, we now retry
the request up to a fixed number of times before giving up.
This improves UX in case of temporary network issues.
This commit is contained in:
Prakhar Pratyush
2025-06-26 19:06:01 +05:30
committed by Tim Abbott
parent 2bf13c36be
commit 43249cb431
2 changed files with 58 additions and 27 deletions

View File

@@ -3,6 +3,7 @@ import {z} from "zod";
import * as channel from "./channel.ts";
import * as stream_topic_history from "./stream_topic_history.ts";
import * as util from "./util.ts";
const stream_topic_history_response_schema = z.object({
topics: z.array(
@@ -15,6 +16,38 @@ const stream_topic_history_response_schema = z.object({
const pending_on_success_callbacks = new Map<number, (() => void)[]>();
export const MAX_RETRIES = 5;
function fetch_channel_history_with_retry(stream_id: number, attempt = 1): void {
if (attempt > MAX_RETRIES) {
pending_on_success_callbacks.delete(stream_id);
stream_topic_history.remove_request_pending_for(stream_id);
return;
}
const url = "/json/users/me/" + stream_id + "/topics";
void channel.get({
url,
data: {allow_empty_topic_name: true},
success(raw_data) {
const data = stream_topic_history_response_schema.parse(raw_data);
const server_history = data.topics;
stream_topic_history.add_history(stream_id, server_history);
stream_topic_history.remove_request_pending_for(stream_id);
for (const callback of pending_on_success_callbacks.get(stream_id)!) {
callback();
}
pending_on_success_callbacks.delete(stream_id);
},
error(xhr) {
const retry_delay_secs = util.get_retry_backoff_seconds(xhr, attempt);
setTimeout(() => {
fetch_channel_history_with_retry(stream_id, attempt + 1);
}, retry_delay_secs * 1000);
},
});
}
export function get_server_history(stream_id: number, on_success: () => void): void {
if (stream_topic_history.has_history_for(stream_id)) {
on_success();
@@ -30,27 +63,7 @@ export function get_server_history(stream_id: number, on_success: () => void): v
stream_topic_history.add_request_pending_for(stream_id);
pending_on_success_callbacks.set(stream_id, [on_success]);
const url = "/json/users/me/" + stream_id + "/topics";
void channel.get({
url,
data: {allow_empty_topic_name: true},
success(raw_data) {
const data = stream_topic_history_response_schema.parse(raw_data);
const server_history = data.topics;
stream_topic_history.add_history(stream_id, server_history);
stream_topic_history.remove_request_pending_for(stream_id);
for (const callback of pending_on_success_callbacks.get(stream_id)!) {
callback();
}
pending_on_success_callbacks.delete(stream_id);
},
error() {
// TODO: Implement some sort of retry logic,
// before giving up on the first failure.
pending_on_success_callbacks.delete(stream_id);
stream_topic_history.remove_request_pending_for(stream_id);
},
});
fetch_channel_history_with_retry(stream_id);
}
export function update_topic_last_message_id(

View File

@@ -2,7 +2,7 @@
const assert = require("node:assert/strict");
const {mock_esm, zrequire} = require("./lib/namespace.cjs");
const {mock_esm, set_global, zrequire} = require("./lib/namespace.cjs");
const {run_test, noop} = require("./lib/test.cjs");
const channel = mock_esm("../src/channel");
@@ -291,7 +291,6 @@ test("server_history_end_to_end", () => {
];
let get_success_callback;
let get_error_callback;
let on_success_called;
channel.get = (opts) => {
@@ -299,7 +298,6 @@ test("server_history_end_to_end", () => {
assert.deepEqual(opts.data, {allow_empty_topic_name: true});
assert.ok(stream_topic_history.is_request_pending_for(stream_id));
get_success_callback = opts.success;
get_error_callback = opts.error;
};
stream_topic_history_util.get_server_history(stream_id, noop);
@@ -309,9 +307,6 @@ test("server_history_end_to_end", () => {
stream_topic_history_util.get_server_history(stream_id, noop);
assert.ok(stream_topic_history.is_request_pending_for(stream_id));
get_error_callback();
assert.ok(!stream_topic_history.is_request_pending_for(stream_id));
stream_topic_history_util.get_server_history(stream_id, () => {
on_success_called = true;
});
@@ -345,6 +340,29 @@ test("server_history_end_to_end", () => {
assert.ok(on_success_called);
});
test("server_history_error", () => {
set_global("setTimeout", (f) => {
f();
});
stream_topic_history.reset();
const channel_id = 99;
let total_attempts = 0;
channel.get = (opts) => {
assert.equal(opts.url, "/json/users/me/99/topics");
assert.ok(stream_topic_history.is_request_pending_for(channel_id));
// This mocks error on each GET request.
opts.error();
total_attempts += 1;
};
stream_topic_history_util.get_server_history(channel_id, noop);
// Verify that we stop after MAX_RETRIES attempt.
assert.deepEqual(total_attempts, stream_topic_history_util.MAX_RETRIES);
assert.ok(!stream_topic_history.is_request_pending_for(channel_id));
});
test("ask_server_for_latest_topic_data", () => {
stream_topic_history.set_update_topic_last_message_id((stream_id, topic_name) => {
stream_topic_history_util.update_topic_last_message_id(stream_id, topic_name, noop);