[socket] Use socket req ids that contain queue ids

(imported from commit 781eafe60bf117d79ce3a30deea02ec1e875060a)
This commit is contained in:
Leo Franchi
2014-01-17 15:35:25 -05:00
parent 24cb5fb079
commit 56d9446c41
3 changed files with 21 additions and 10 deletions

View File

@@ -5,7 +5,7 @@ function Socket(url) {
this._is_reconnecting = false;
this._reconnect_initiation_time = null;
this._send_queue = [];
this._next_req_id = 0;
this._next_req_id_counter = 0;
this._requests = {};
this._connection_failures = 0;
this._reconnect_timeout_id = null;
@@ -43,10 +43,21 @@ Socket.prototype = {
this._do_send('request', msg, success, error);
},
_get_next_req_id: function Socket__get_next_req_id() {
var req_id = page_params.event_queue_id + ':' + this._next_req_id_counter;
this._next_req_id_counter++;
return req_id;
},
_req_id_too_new: function Socket__req_id_too_new(req_id) {
var counter = req_id.split(':')[2];
return parseInt(counter, 10) >= this._next_req_id_counter;
},
_do_send: function Socket__do_send(type, msg, success, error) {
var req_id = this._next_req_id;
var req_id = this._get_next_req_id();
var that = this;
this._next_req_id++;
this._requests[req_id] = {type: type, request: msg, success: success,
error: error};
this._requests[req_id].ack_timeout_id = setTimeout(function () {
@@ -106,7 +117,7 @@ Socket.prototype = {
_process_response: function Socket__process_response(req_id, response) {
var req_info = this._requests[req_id];
if (req_info === undefined) {
if (req_id >= this._next_req_id) {
if (this._req_id_too_new(req_id)) {
blueslip.error("Got a response for an unknown request",
{request_id: req_id, next_id: this._next_req_id,
outstanding_ids: _.keys(this._requests)});
@@ -133,7 +144,7 @@ Socket.prototype = {
var req_info = this._requests[req_id];
if (req_info === undefined) {
blueslip.error("Got an ACK for an unknown request",
{request_id: req_id, next_id: this._next_req_id,
{request_id: req_id, next_id: this._next_req_id_counter,
outstanding_ids: _.keys(this._requests)});
return;
}

View File

@@ -59,8 +59,8 @@ def deregister_connection(conn):
redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)
def req_redis_key(client_id, req_id):
return 'socket_req_status:%s:%s' % (client_id, req_id)
def req_redis_key(req_id):
return 'socket_req_status:%s' % (req_id,)
class SocketAuthError(Exception):
def __init__(self, msg):
@@ -137,7 +137,7 @@ class SocketConnection(sockjs.tornado.SockJSConnection):
if status_inquiries is not None:
results = {}
for inquiry in status_inquiries:
status = redis_client.hgetall(req_redis_key(self.client_id, inquiry))
status = redis_client.hgetall(req_redis_key(inquiry))
if len(status) == 0:
status['status'] = 'not_received'
if 'response' in status:
@@ -191,7 +191,7 @@ class SocketConnection(sockjs.tornado.SockJSConnection):
status_code=403, error_content=ujson.dumps(response))
return
redis_key = req_redis_key(self.client_id, msg['req_id'])
redis_key = req_redis_key(msg['req_id'])
with redis_client.pipeline() as pipeline:
pipeline.hmset(redis_key, {'status': 'received'})
pipeline.expire(redis_key, 60 * 60 * 24)

View File

@@ -298,7 +298,7 @@ class MessageSenderWorker(QueueProcessingWorker):
result = {'response': ujson.loads(resp_content), 'req_id': event['req_id'],
'server_meta': server_meta}
redis_key = req_redis_key(server_meta['client_id'], event['req_id'])
redis_key = req_redis_key(event['req_id'])
self.redis_client.hmset(redis_key, {'status': 'complete',
'response': resp_content});