Use ujson instead of pickle for serializing event queues

(imported from commit 2d90574ae97cc973c9686c2bd557cc199d4b0850)
Author: Zev Benjamin
Date: 2013-11-19 17:51:32 -05:00
parent 69e5e2c3d2
commit 25c63f08de
2 changed files with 66 additions and 12 deletions
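
For context, the core of the change is replacing pickle's opaque byte format
with plain JSON: each object converts itself to a dict of JSON-serializable
primitives via to_dict() and is rebuilt by from_dict(). A minimal,
self-contained sketch of that round trip follows (the Queue class and its
fields are illustrative stand-ins, not the actual Zulip classes; it assumes
the ujson package is installed):

    # Illustrative sketch of the to_dict/from_dict pattern; not the actual
    # Zulip classes.  ujson only handles plain JSON types, so each object is
    # flattened to a dict of primitives and rebuilt from one on load.
    import ujson  # assumes the ujson package is installed

    class Queue(object):
        def __init__(self, id):
            self.id = id
            self.next_event_id = 0

        def to_dict(self):
            return dict(id=self.id, next_event_id=self.next_event_id)

        @classmethod
        def from_dict(cls, d):
            ret = cls(d['id'])
            ret.next_event_id = d['next_event_id']
            return ret

    # Round trip through JSON text, as the dump/load code below does on disk
    q = Queue('q1')
    q.next_event_id = 5
    restored = Queue.from_dict(ujson.loads(ujson.dumps(q.to_dict())))
    assert restored.next_event_id == 5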


@@ -17,6 +17,7 @@ import random
 import traceback
 from zerver.lib.utils import statsd
 from zerver.middleware import async_request_restart
+from zerver.models import get_client
 
 # The idle timeout used to be a week, but we found that in that
 # situation, queues from dead browser sessions would grow quite large
@@ -38,9 +39,10 @@ HEARTBEAT_MIN_FREQ_SECS = 45
 class ClientDescriptor(object):
     def __init__(self, user_profile_id, realm_id, event_queue, event_types, client_type,
                  apply_markdown=True, all_public_streams=False, lifespan_secs=0):
-        # These objects are pickled on shutdown and restored on restart.
+        # These objects are serialized on shutdown and restored on restart.
         # If fields are added or semantics are changed, temporary code must be
-        # added to load_event_queues() to update the restored objects
+        # added to load_event_queues() to update the restored objects.
+        # Additionally, the to_dict and from_dict methods must be updated
         self.user_profile_id = user_profile_id
         self.realm_id = realm_id
         self.current_handler = None
@@ -56,6 +58,26 @@ class ClientDescriptor(object):
         # Clamp queue_timeout to between minimum and maximum timeouts
         self.queue_timeout = max(IDLE_EVENT_QUEUE_TIMEOUT_SECS, min(self.queue_timeout, MAX_QUEUE_TIMEOUT_SECS))
 
+    def to_dict(self):
+        return dict(user_profile_id=self.user_profile_id,
+                    realm_id=self.realm_id,
+                    event_queue=self.event_queue.to_dict(),
+                    queue_timeout=self.queue_timeout,
+                    event_types=self.event_types,
+                    last_connection_time=self.last_connection_time,
+                    apply_markdown=self.apply_markdown,
+                    all_public_streams=self.all_public_streams,
+                    client_type=self.client_type.name)
+
+    @classmethod
+    def from_dict(cls, d):
+        ret = cls(d['user_profile_id'], d['realm_id'],
+                  EventQueue.from_dict(d['event_queue']), d['event_types'],
+                  get_client(d['client_type']), d['apply_markdown'], d['all_public_streams'],
+                  d['queue_timeout'])
+        ret.last_connection_time = d['last_connection_time']
+        return ret
+
     def prepare_for_pickling(self):
         self.current_handler = None
         self._timeout_handle = None
@@ -125,6 +147,17 @@ class EventQueue(object):
         self.next_event_id = 0
         self.id = id
 
+    def to_dict(self):
+        return dict(id=self.id, next_event_id=self.next_event_id,
+                    queue=list(self.queue))
+
+    @classmethod
+    def from_dict(cls, d):
+        ret = cls(d['id'])
+        ret.next_event_id = d['next_event_id']
+        ret.queue = deque(d['queue'])
+        return ret
+
     def push(self, event):
         event['id'] = self.next_event_id
         self.next_event_id += 1
@@ -226,12 +259,10 @@ def gc_event_queues():
 
 def dump_event_queues():
     start = time.time()
-    # Remove unpickle-able attributes
-    for client in clients.itervalues():
-        client.prepare_for_pickling()
 
-    with file(settings.PERSISTENT_QUEUE_FILENAME, "w") as stored_queues:
-        pickle.dump(clients, stored_queues)
+    with file(settings.JSON_PERSISTENT_QUEUE_FILENAME, "w") as stored_queues:
+        ujson.dump([(qid, client.to_dict()) for (qid, client) in clients.iteritems()],
+                   stored_queues)
 
     logging.info('Tornado dumped %d event queues in %.3fs'
                  % (len(clients), time.time() - start))
@@ -239,11 +270,27 @@ def dump_event_queues():
 def load_event_queues():
     global clients
     start = time.time()
-    try:
-        with file(settings.PERSISTENT_QUEUE_FILENAME, "r") as stored_queues:
-            clients = pickle.load(stored_queues)
-    except (IOError, EOFError):
-        pass
+
+    if os.path.exists(settings.PERSISTENT_QUEUE_FILENAME):
+        try:
+            with file(settings.PERSISTENT_QUEUE_FILENAME, "r") as stored_queues:
+                clients = pickle.load(stored_queues)
+        except (IOError, EOFError):
+            pass
+    else:
+        # ujson chokes on bad input pretty easily.  We separate out the actual
+        # file reading from the loading so that we don't silently fail if we get
+        # bad input.
+        try:
+            with file(settings.JSON_PERSISTENT_QUEUE_FILENAME, "r") as stored_queues:
+                json_data = stored_queues.read()
+            try:
+                clients = dict((qid, ClientDescriptor.from_dict(client))
+                               for (qid, client) in ujson.loads(json_data))
+            except Exception:
+                logging.exception("Could not deserialize event queues")
+        except (IOError, EOFError):
+            pass
 
     for client in clients.itervalues():
         # Put code for migrations due to event queue data format changes here
@@ -273,6 +320,11 @@ def setup_event_queue():
     except OSError:
         pass
 
+    try:
+        os.rename(settings.JSON_PERSISTENT_QUEUE_FILENAME, "/var/tmp/event_queues.json.last")
+    except OSError:
+        pass
+
     # Set up event queue garbage collection
     ioloop = tornado.ioloop.IOLoop.instance()
     pc = tornado.ioloop.PeriodicCallback(gc_event_queues,
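
A note on the load path above: the file read and the JSON parse are kept as
separate steps, so an unreadable file falls through quietly (as before) while
malformed JSON gets logged instead of being swallowed by the IOError handler.
A rough, self-contained sketch of that dump/load shape (QUEUE_FILE and the
helper names are hypothetical, and ujson is assumed to be installed):

    # Rough sketch of the dump/load round trip with read and parse kept
    # separate; QUEUE_FILE and the helper names are hypothetical.
    import logging
    import os
    import ujson

    QUEUE_FILE = "/tmp/event_queues.json"  # hypothetical path

    def dump_queues(clients):
        # clients: dict mapping queue id -> object with a to_dict() method
        with open(QUEUE_FILE, "w") as f:
            ujson.dump([(qid, c.to_dict()) for (qid, c) in clients.items()], f)

    def load_queues(from_dict):
        clients = {}
        if not os.path.exists(QUEUE_FILE):
            return clients
        try:
            with open(QUEUE_FILE, "r") as f:
                json_data = f.read()  # I/O errors stop here, before any parsing
            try:
                clients = dict((qid, from_dict(c))
                               for (qid, c) in ujson.loads(json_data))
            except Exception:
                # A parse failure is logged loudly rather than silently dropped
                logging.exception("Could not deserialize event queues")
        except (IOError, EOFError):
            pass
        return clients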