mirror of
https://github.com/zulip/zulip.git
synced 2025-11-04 05:53:43 +00:00
[socket] Save pending and queued requests together
(imported from commit aecf59b26938c831da03a6a31545f2522387e67f)
This commit is contained in:
@@ -4,7 +4,6 @@ function Socket(url) {
|
||||
this._is_authenticated = false;
|
||||
this._is_reconnecting = false;
|
||||
this._reconnect_initiation_time = null;
|
||||
this._send_queue = [];
|
||||
this._next_req_id_counter = 0;
|
||||
this._requests = {};
|
||||
this._connection_failures = 0;
|
||||
@@ -33,14 +32,25 @@ function Socket(url) {
|
||||
}
|
||||
|
||||
Socket.prototype = {
|
||||
send: function Socket_send(msg, success, error) {
|
||||
_make_request: function Socket__make_request(type) {
|
||||
return {req_id: this._get_next_req_id(),
|
||||
type: type,
|
||||
state: 'pending'};
|
||||
},
|
||||
|
||||
send: function Socket__send(msg, success, error) {
|
||||
var request = this._make_request('request');
|
||||
request.msg = msg;
|
||||
request.success = success;
|
||||
request.error = error;
|
||||
this._save_request(request);
|
||||
|
||||
if (! this._can_send()) {
|
||||
this._send_queue.push({msg: msg, success: success, error: error});
|
||||
this._try_to_reconnect();
|
||||
return;
|
||||
}
|
||||
|
||||
this._do_send('request', msg, success, error);
|
||||
this._do_send(request);
|
||||
},
|
||||
|
||||
_get_next_req_id: function Socket__get_next_req_id() {
|
||||
@@ -55,20 +65,27 @@ Socket.prototype = {
|
||||
return parseInt(counter, 10) >= this._next_req_id_counter;
|
||||
},
|
||||
|
||||
_do_send: function Socket__do_send(type, msg, success, error) {
|
||||
var req_id = this._get_next_req_id();
|
||||
_req_id_sorter: function Socket__req_id_sorter(req_id_a, req_id_b) {
|
||||
// Sort in ascending order
|
||||
var a_count = parseInt(req_id_a.split(':')[2], 10);
|
||||
var b_count = parseInt(req_id_b.split(':')[2], 10);
|
||||
|
||||
return a_count - b_count;
|
||||
},
|
||||
|
||||
_do_send: function Socket__do_send(request) {
|
||||
var that = this;
|
||||
this._requests[req_id] = {type: type, request: msg, success: success,
|
||||
error: error};
|
||||
this._requests[req_id].ack_timeout_id = setTimeout(function () {
|
||||
blueslip.info("Timeout on ACK for request " + req_id);
|
||||
this._requests[request.req_id].ack_timeout_id = setTimeout(function () {
|
||||
blueslip.info("Timeout on ACK for request " + request.req_id);
|
||||
that._try_to_reconnect();
|
||||
}, 2000);
|
||||
|
||||
try {
|
||||
this._sockjs.send(JSON.stringify({req_id: req_id,
|
||||
type: type, request: msg}));
|
||||
this._update_request_state(request.req_id, 'sent');
|
||||
this._sockjs.send(JSON.stringify({req_id: request.req_id,
|
||||
type: request.type, request: request.msg}));
|
||||
} catch (e) {
|
||||
this._update_request_state(request.req_id, 'pending');
|
||||
if (e instanceof Error && e.message === 'INVALID_STATE_ERR') {
|
||||
// The connection was somehow closed. Our on-close handler will
|
||||
// be called imminently and we'll retry this request upon reconnect.
|
||||
@@ -95,23 +112,12 @@ Socket.prototype = {
|
||||
clearTimeout(req_info.ack_timeout_id);
|
||||
req_info.ack_timeout_id = null;
|
||||
}
|
||||
delete this._requests[req_id];
|
||||
|
||||
if (req_info.type !== 'request') {
|
||||
blueslip.error("Cannot resend message of type: " + req_info.type);
|
||||
return;
|
||||
}
|
||||
|
||||
this.send(req_info.request, req_info.success, req_info.error);
|
||||
},
|
||||
|
||||
_drain_queue: function Socket__drain_queue() {
|
||||
var that = this;
|
||||
var queue = this._send_queue;
|
||||
this._send_queue = [];
|
||||
_.each(queue, function (elem) {
|
||||
that.send(elem.msg, elem.success, elem.error);
|
||||
});
|
||||
this._do_send(req_info);
|
||||
},
|
||||
|
||||
_process_response: function Socket__process_response(req_id, response) {
|
||||
@@ -119,7 +125,7 @@ Socket.prototype = {
|
||||
if (req_info === undefined) {
|
||||
if (this._req_id_too_new(req_id)) {
|
||||
blueslip.error("Got a response for an unknown request",
|
||||
{request_id: req_id, next_id: this._next_req_id,
|
||||
{request_id: req_id, next_id: this._next_req_id_counter,
|
||||
outstanding_ids: _.keys(this._requests)});
|
||||
}
|
||||
// There is a small race where we might start reauthenticating
|
||||
@@ -132,12 +138,12 @@ Socket.prototype = {
|
||||
return;
|
||||
}
|
||||
|
||||
if (response.result === 'success') {
|
||||
if (response.result === 'success' && req_info.success !== undefined) {
|
||||
req_info.success(response);
|
||||
} else {
|
||||
} else if (req_info.error !== undefined) {
|
||||
req_info.error('response', response);
|
||||
}
|
||||
delete this._requests[req_id];
|
||||
this._remove_request(req_id);
|
||||
},
|
||||
|
||||
_process_ack: function Socket__process_ack(req_id) {
|
||||
@@ -173,29 +179,37 @@ Socket.prototype = {
|
||||
// We can only authenticate after the DOM has loaded because we need
|
||||
// the CSRF token
|
||||
$(function () {
|
||||
that._do_send('auth', {csrf_token: csrf_token,
|
||||
queue_id: page_params.event_queue_id,
|
||||
status_inquiries: _.keys(that._requests)},
|
||||
function (resp) {
|
||||
that._is_authenticated = true;
|
||||
that._is_reconnecting = false;
|
||||
that._reconnect_initiation_time = null;
|
||||
that._connection_failures = 0;
|
||||
_.each(resp.status_inquiries, function (status, id) {
|
||||
if (status.status === 'complete') {
|
||||
that._process_response(id, status.response);
|
||||
}
|
||||
if (status.status === 'not_received') {
|
||||
that._resend(id);
|
||||
}
|
||||
});
|
||||
that._drain_queue();
|
||||
},
|
||||
function (type, resp) {
|
||||
blueslip.info("Could not authenticate with server: " + resp.msg);
|
||||
that._connection_failures++;
|
||||
that._try_to_reconnect(that._reconnect_wait_time());
|
||||
});
|
||||
var request = that._make_request('auth');
|
||||
request.msg = {csrf_token: csrf_token,
|
||||
queue_id: page_params.event_queue_id,
|
||||
status_inquiries: _.keys(that._requests)};
|
||||
request.success = function (resp) {
|
||||
that._is_authenticated = true;
|
||||
that._is_reconnecting = false;
|
||||
that._reconnect_initiation_time = null;
|
||||
that._connection_failures = 0;
|
||||
var resend_queue = [];
|
||||
_.each(resp.status_inquiries, function (status, id) {
|
||||
if (status.status === 'complete') {
|
||||
that._process_response(id, status.response);
|
||||
} else if (status.status === 'received') {
|
||||
that._update_request_state(id, 'sent');
|
||||
} else if (status.status === 'not_received') {
|
||||
resend_queue.push(id);
|
||||
}
|
||||
});
|
||||
resend_queue.sort(that._req_id_sorter);
|
||||
_.each(resend_queue, function (id) {
|
||||
that._resend(id);
|
||||
});
|
||||
};
|
||||
request.error = function (type, resp) {
|
||||
blueslip.info("Could not authenticate with server: " + resp.msg);
|
||||
that._connection_failures++;
|
||||
that._try_to_reconnect(that._reconnect_wait_time());
|
||||
};
|
||||
that._save_request(request);
|
||||
that._do_send(request);
|
||||
});
|
||||
};
|
||||
|
||||
@@ -273,7 +287,7 @@ Socket.prototype = {
|
||||
}
|
||||
|
||||
if (val.type === 'auth') {
|
||||
delete that._requests[key];
|
||||
that._remove_request(key);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -289,5 +303,17 @@ Socket.prototype = {
|
||||
that._sockjs = new SockJS(that.url, null, {protocols_whitelist: that._supported_protocols});
|
||||
that._setup_sockjs_callbacks(that._sockjs);
|
||||
}, wait_time);
|
||||
},
|
||||
|
||||
_save_request: function Socket__save_request(request) {
|
||||
this._requests[request.req_id] = request;
|
||||
},
|
||||
|
||||
_remove_request: function Socket__remove_request(req_id) {
|
||||
delete this._requests[req_id];
|
||||
},
|
||||
|
||||
_update_request_state: function Socket__update_request_state(req_id, state) {
|
||||
this._requests[req_id].state = state;
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user