stream_topic_history: Ask server for last msg id for historical topics.

Historical topics = Topics which we received from server and have
no messages cached for them to make any sense for the removed messages.
This commit is contained in:
Aman Agrawal
2024-06-29 10:00:56 +00:00
committed by Tim Abbott
parent e82fe09cfc
commit bb5c732dae
2 changed files with 14 additions and 10 deletions

View File

@@ -185,18 +185,14 @@ export class PerStreamHistory {
maybe_remove(topic_name: string, num_messages: number): void { maybe_remove(topic_name: string, num_messages: number): void {
const existing = this.topics.get(topic_name); const existing = this.topics.get(topic_name);
if (!existing || existing.count === 0) { if (!existing) {
return; return;
} }
if (existing.count <= num_messages) { if (existing.count <= num_messages) {
this.topics.delete(topic_name); this.topics.delete(topic_name);
if (!is_complete_for_stream_id(this.stream_id)) { // Verify if this topic still has messages from the server.
// Request server for latest message in topic if we update_topic_last_message_id(this.stream_id, topic_name);
// cannot be sure that we have all messages in the topic.
update_topic_last_message_id(this.stream_id, topic_name);
return;
}
} }
existing.count -= num_messages; existing.count -= num_messages;

View File

@@ -214,15 +214,23 @@ test("server_history", () => {
history = stream_topic_history.get_recent_topic_names(stream_id); history = stream_topic_history.get_recent_topic_names(stream_id);
assert.deepEqual(history, ["hist2", "hist1"]); assert.deepEqual(history, ["hist2", "hist1"]);
// We can try to remove a historical message, but it should // Removing message from a topic fetched from server history, will send
// have no effect. // query to the server to get the latest message id in the topic.
let update_topic_called = false;
stream_topic_history.set_update_topic_last_message_id((stream_id, topic_name) => {
assert.equal(stream_id, 66);
assert.equal(topic_name, "hist2");
update_topic_called = true;
});
stream_topic_history.remove_messages({ stream_topic_history.remove_messages({
stream_id, stream_id,
topic_name: "hist2", topic_name: "hist2",
num_messages: 1, num_messages: 1,
}); });
assert.equal(update_topic_called, true);
history = stream_topic_history.get_recent_topic_names(stream_id); history = stream_topic_history.get_recent_topic_names(stream_id);
assert.deepEqual(history, ["hist2", "hist1"]); assert.deepEqual(history, ["hist1"]);
stream_topic_history.set_update_topic_last_message_id(noop);
// If we call back to the server for history, the // If we call back to the server for history, the
// effect is always additive. We may decide to prune old // effect is always additive. We may decide to prune old