mirror of
https://github.com/zulip/zulip.git
synced 2025-11-09 08:26:11 +00:00
Make event queues persistent across Tornado restarts using cPickle
(imported from commit 1434d1e9d394d725827b1740c0c07249d5e716ed)
This commit is contained in:
@@ -1,11 +1,16 @@
|
|||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from collections import deque
|
from collections import deque
|
||||||
from tornado.ioloop import PeriodicCallback
|
from tornado.ioloop import PeriodicCallback
|
||||||
|
import os
|
||||||
import time
|
import time
|
||||||
import socket
|
import socket
|
||||||
import logging
|
import logging
|
||||||
import simplejson
|
import simplejson
|
||||||
import requests
|
import requests
|
||||||
|
import cPickle as pickle
|
||||||
|
import atexit
|
||||||
|
import sys
|
||||||
|
import signal
|
||||||
|
|
||||||
IDLE_EVENT_QUEUE_TIMEOUT_SECS = 60 * 10
|
IDLE_EVENT_QUEUE_TIMEOUT_SECS = 60 * 10
|
||||||
|
|
||||||
@@ -54,7 +59,6 @@ class ClientDescriptor(object):
|
|||||||
class EventQueue(object):
|
class EventQueue(object):
|
||||||
def __init__(self, id):
|
def __init__(self, id):
|
||||||
self.queue = deque()
|
self.queue = deque()
|
||||||
self.connected_handler = None
|
|
||||||
self.next_event_id = 0
|
self.next_event_id = 0
|
||||||
self.id = id
|
self.id = id
|
||||||
|
|
||||||
@@ -116,7 +120,47 @@ def gc_event_queues():
|
|||||||
% (len(to_remove), len(affected_users), time.time() - start,
|
% (len(to_remove), len(affected_users), time.time() - start,
|
||||||
len(clients)))
|
len(clients)))
|
||||||
|
|
||||||
def setup_event_queue_gc(io_loop):
|
PERSISTENT_QUEUE_FILENAME = os.path.join(os.path.dirname(__file__),
|
||||||
|
"..", "event_queues.pickle")
|
||||||
|
|
||||||
|
def dump_event_queues():
|
||||||
|
start = time.time()
|
||||||
|
# Remove unpickle-able attributes
|
||||||
|
for client in clients.itervalues():
|
||||||
|
client.current_handler = None
|
||||||
|
|
||||||
|
with file(PERSISTENT_QUEUE_FILENAME, "w") as stored_queues:
|
||||||
|
pickle.dump(clients, stored_queues)
|
||||||
|
|
||||||
|
logging.info('Tornado dumped %d event queues in %.3fs'
|
||||||
|
% (len(clients), time.time() - start))
|
||||||
|
|
||||||
|
def setup_event_queue(io_loop):
|
||||||
|
global clients
|
||||||
|
start = time.time()
|
||||||
|
# Read in existing event queues
|
||||||
|
try:
|
||||||
|
with file(PERSISTENT_QUEUE_FILENAME, "r") as stored_queues:
|
||||||
|
clients = pickle.load(stored_queues)
|
||||||
|
except (IOError, EOFError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
for client in clients.itervalues():
|
||||||
|
user_clients.setdefault(client.user_profile_id, []).append(client)
|
||||||
|
|
||||||
|
logging.info('Tornado loaded %d event queues in %.3fs'
|
||||||
|
% (len(clients), time.time() - start))
|
||||||
|
|
||||||
|
atexit.register(dump_event_queues)
|
||||||
|
# Make sure we dump event queues even if we exit via signal
|
||||||
|
signal.signal(signal.SIGTERM, lambda signum, stack: sys.exit(1))
|
||||||
|
|
||||||
|
try:
|
||||||
|
os.remove(PERSISTENT_QUEUE_FILENAME)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Set up event queue garbage collection
|
||||||
pc = PeriodicCallback(gc_event_queues, EVENT_QUEUE_GC_FREQ_MSECS, io_loop)
|
pc = PeriodicCallback(gc_event_queues, EVENT_QUEUE_GC_FREQ_MSECS, io_loop)
|
||||||
pc.start()
|
pc.start()
|
||||||
|
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ from tornado import ioloop
|
|||||||
from zephyr.lib.debug import interactive_debug_listen
|
from zephyr.lib.debug import interactive_debug_listen
|
||||||
from zephyr.lib.response import json_response
|
from zephyr.lib.response import json_response
|
||||||
from zephyr import tornado_callbacks
|
from zephyr import tornado_callbacks
|
||||||
from zephyr.lib.event_queue import setup_event_queue_gc
|
from zephyr.lib.event_queue import setup_event_queue
|
||||||
|
|
||||||
if settings.USING_RABBITMQ:
|
if settings.USING_RABBITMQ:
|
||||||
from zephyr.lib.queue import queue_client
|
from zephyr.lib.queue import queue_client
|
||||||
@@ -105,7 +105,7 @@ class Command(BaseCommand):
|
|||||||
if django.conf.settings.DEBUG:
|
if django.conf.settings.DEBUG:
|
||||||
ioloop.IOLoop.instance().set_blocking_log_threshold(5)
|
ioloop.IOLoop.instance().set_blocking_log_threshold(5)
|
||||||
|
|
||||||
setup_event_queue_gc(ioloop.IOLoop.instance())
|
setup_event_queue(ioloop.IOLoop.instance())
|
||||||
ioloop.IOLoop.instance().start()
|
ioloop.IOLoop.instance().start()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|||||||
Reference in New Issue
Block a user