[manual] Add endpoint to cleanup a finished events queue.

This requires a puppet apply on each of staging and prod0 to update
the nginx configuration to support the new URL when it is deployed.

(imported from commit a35a71a563fd1daca0d3ea4ec6874c5719a8564f)
This commit is contained in:
Tim Abbott
2013-11-19 17:11:30 -05:00
parent 5973f20b69
commit ca8225cf47
6 changed files with 37 additions and 13 deletions

View File

@@ -14,7 +14,7 @@ location /static/ {
} }
# Send longpoll requests to Tornado # Send longpoll requests to Tornado
location ~ /json/get_events|/api/v1/events { location ~ /json/get_events|/json/events|/api/v1/events {
proxy_pass http://localhost:9993; proxy_pass http://localhost:9993;
proxy_redirect off; proxy_redirect off;

View File

@@ -84,6 +84,7 @@ class Resource(resource.Resource):
request.requestHeaders.setRawHeaders('X-Forwarded-Host', [proxy_host]) request.requestHeaders.setRawHeaders('X-Forwarded-Host', [proxy_host])
if (request.uri in ['/json/get_events'] or if (request.uri in ['/json/get_events'] or
request.uri.startswith('/json/events') or
request.uri.startswith('/api/v1/events') or request.uri.startswith('/api/v1/events') or
request.uri.startswith('/sockjs')): request.uri.startswith('/sockjs')):
return proxy.ReverseProxyResource('localhost', tornado_port, '/'+name) return proxy.ReverseProxyResource('localhost', tornado_port, '/'+name)

View File

@@ -141,6 +141,10 @@ class ClientDescriptor(object):
ioloop.remove_timeout(self._timeout_handle) ioloop.remove_timeout(self._timeout_handle)
self._timeout_handle = None self._timeout_handle = None
def cleanup(self):
do_gc_event_queues([self.event_queue.id], [self.user_profile_id],
[self.realm_id])
class EventQueue(object): class EventQueue(object):
def __init__(self, id): def __init__(self, id):
self.queue = deque() self.queue = deque()
@@ -217,17 +221,7 @@ def allocate_client_descriptor(user_profile_id, realm_id, event_types, client_ty
realm_clients_all_streams.setdefault(realm_id, []).append(client) realm_clients_all_streams.setdefault(realm_id, []).append(client)
return client return client
def gc_event_queues(): def do_gc_event_queues(to_remove, affected_users, affected_realms):
start = time.time()
to_remove = set()
affected_users = set()
affected_realms = set()
for (id, client) in clients.iteritems():
if client.idle(start):
to_remove.add(id)
affected_users.add(client.user_profile_id)
affected_realms.add(client.realm_id)
def filter_client_dict(client_dict, key): def filter_client_dict(client_dict, key):
if key not in client_dict: if key not in client_dict:
return return
@@ -250,6 +244,19 @@ def gc_event_queues():
cb(clients[id].user_profile_id, clients[id], clients[id].user_profile_id not in user_clients) cb(clients[id].user_profile_id, clients[id], clients[id].user_profile_id not in user_clients)
del clients[id] del clients[id]
def gc_event_queues():
start = time.time()
to_remove = set()
affected_users = set()
affected_realms = set()
for (id, client) in clients.iteritems():
if client.idle(start):
to_remove.add(id)
affected_users.add(client.user_profile_id)
affected_realms.add(client.realm_id)
do_gc_event_queues(to_remove, affected_users, affected_realms)
logging.info(('Tornado removed %d idle event queues owned by %d users in %.3fs.' logging.info(('Tornado removed %d idle event queues owned by %d users in %.3fs.'
+ ' Now %d active queues') + ' Now %d active queues')
% (len(to_remove), len(affected_users), time.time() - start, % (len(to_remove), len(affected_users), time.time() - start,

View File

@@ -88,6 +88,7 @@ class Command(BaseCommand):
try: try:
urls = (r"/notify_tornado", urls = (r"/notify_tornado",
r"/json/get_events", r"/json/get_events",
r"/json/events",
r"/api/v1/events", r"/api/v1/events",
) )
@@ -178,6 +179,9 @@ class AsyncDjangoHandler(tornado.web.RequestHandler, base.BaseHandler):
def post(self): def post(self):
self.get() self.get()
def delete(self):
self.get()
# Based on django.core.handlers.base: get_response # Based on django.core.handlers.base: get_response
def get_response(self, request): def get_response(self, request):
"Returns an HttpResponse object for the given HttpRequest" "Returns an HttpResponse object for the given HttpRequest"

View File

@@ -23,6 +23,17 @@ def notify(request):
process_notification(ujson.loads(request.POST['data'])) process_notification(ujson.loads(request.POST['data']))
return json_success() return json_success()
@has_request_variables
def cleanup_event_queue(request, user_profile, queue_id=REQ()):
client = get_client_descriptor(queue_id)
if client is None:
return json_error("Bad event queue id: %s" % (queue_id,))
if user_profile.id != client.user_profile_id:
return json_error("You are not authorized to access this queue")
request._log_data['extra'] = "[%s]" % (queue_id,)
client.cleanup()
return json_success()
@authenticated_json_post_view @authenticated_json_post_view
def json_get_events(request, user_profile): def json_get_events(request, user_profile):
return get_events_backend(request, user_profile, apply_markdown=True) return get_events_backend(request, user_profile, apply_markdown=True)

View File

@@ -219,7 +219,8 @@ v1_api_and_json_patterns = patterns('zerver.views',
) + patterns('zerver.tornadoviews', ) + patterns('zerver.tornadoviews',
url(r'^events$', 'rest_dispatch', url(r'^events$', 'rest_dispatch',
{'GET': 'get_events_backend'}), {'GET': 'get_events_backend',
'DELETE': 'cleanup_event_queue'}),
) )
if not settings.ENTERPRISE: if not settings.ENTERPRISE:
v1_api_and_json_patterns += patterns('', v1_api_and_json_patterns += patterns('',