mirror of
https://github.com/zulip/zulip.git
synced 2025-10-23 04:52:12 +00:00
Implement long polling using Tornado.
(imported from commit 4385304b27d7fe55a57a23133cd214fe8fc33482)
This commit is contained in:
@@ -9,7 +9,7 @@ import os.path
|
||||
urlpatterns = patterns('',
|
||||
url(r'^$', 'zephyr.views.home', name='home'),
|
||||
url(r'^update$', 'zephyr.views.update', name='update'),
|
||||
url(r'^get_updates$', 'zephyr.views.get_updates', name='get_updates'),
|
||||
url(r'^get_updates_longpoll$', 'zephyr.views.get_updates_longpoll', name='get_updates_longpoll'),
|
||||
url(r'^zephyr/', 'zephyr.views.zephyr', name='zephyr'),
|
||||
url(r'^personal-zephyr/', 'zephyr.views.personal_zephyr', name='personal_zephyr'),
|
||||
url(r'^accounts/home/', 'zephyr.views.accounts_home', name='accounts_home'),
|
||||
@@ -17,7 +17,7 @@ urlpatterns = patterns('',
|
||||
url(r'^accounts/logout/', 'django.contrib.auth.views.logout', {'template_name': 'zephyr/index.html'}),
|
||||
url(r'^accounts/register/', 'zephyr.views.register', name='register'),
|
||||
url(r'^static/(?P<path>.*)$', 'django.views.static.serve',
|
||||
{'document_root': os.path.join(settings.SITE_ROOT, 'static/')})
|
||||
{'document_root': os.path.join(settings.SITE_ROOT, '..', 'zephyr', 'static/')})
|
||||
|
||||
# Uncomment the admin/doc line below to enable admin documentation:
|
||||
# url(r'^admin/doc/', include('django.contrib.admindocs.urls')),
|
||||
|
22
zephyr/decorator.py
Normal file
22
zephyr/decorator.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import tornado.web
|
||||
import types
|
||||
|
||||
class TornadoAsyncException(Exception): pass
|
||||
|
||||
class _DefGen_Return(BaseException):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def returnResponse(value):
|
||||
raise _DefGen_Return(value)
|
||||
|
||||
def asynchronous(method):
|
||||
def wrapper(request, *args, **kwargs):
|
||||
try:
|
||||
v = method(request, request._tornado_handler, *args, **kwargs)
|
||||
if v == None or type(v) == types.GeneratorType:
|
||||
raise TornadoAsyncException
|
||||
except _DefGen_Return, e:
|
||||
request._tornado_handler.finish(e.value.content)
|
||||
return v
|
||||
return wrapper
|
317
zephyr/management/commands/runtornado.py
Normal file
317
zephyr/management/commands/runtornado.py
Normal file
@@ -0,0 +1,317 @@
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from optparse import make_option
|
||||
from django.conf import settings
|
||||
import os
|
||||
import sys
|
||||
import tornado.web
|
||||
|
||||
class Command(BaseCommand):
|
||||
option_list = BaseCommand.option_list + (
|
||||
make_option('--noreload', action='store_false',
|
||||
dest='auto_reload', default=True,
|
||||
help="Configures tornado to not auto-reload (for prod use)."),
|
||||
make_option('--nokeepalive', action='store_true',
|
||||
dest='no_keep_alive', default=False,
|
||||
help="Tells Tornado to NOT keep alive http connections."),
|
||||
make_option('--noxheaders', action='store_false',
|
||||
dest='xheaders', default=True,
|
||||
help="Tells Tornado to NOT override remote IP with X-Real-IP."),
|
||||
)
|
||||
help = "Starts a Tornado Web server wrapping Django."
|
||||
args = '[optional port number or ipaddr:port]\n (use multiple ports to start multiple servers)'
|
||||
|
||||
def handle(self, *addrport, **options):
|
||||
# setup unbuffered I/O
|
||||
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
|
||||
sys.stderr = os.fdopen(sys.stderr.fileno(), 'w', 0)
|
||||
|
||||
if len(addrport) == 0:
|
||||
self.run_one(**options)
|
||||
elif len(addrport) == 1:
|
||||
self.run_one(addrport[0], **options)
|
||||
else:
|
||||
from multiprocessing import Process
|
||||
|
||||
plist = []
|
||||
for ap in addrport:
|
||||
p = Process(target=self.run_one, args=(ap,), kwargs=options)
|
||||
p.start()
|
||||
plist.append(p)
|
||||
|
||||
while plist:
|
||||
if plist[0].exitcode is None:
|
||||
plist.pop(0)
|
||||
else:
|
||||
plist[0].join()
|
||||
|
||||
|
||||
def run_one(self, addrport, **options):
|
||||
import django
|
||||
from django.core.handlers.wsgi import WSGIHandler
|
||||
from tornado import httpserver, wsgi, ioloop, web
|
||||
|
||||
if not addrport:
|
||||
addr = ''
|
||||
port = '8000'
|
||||
else:
|
||||
try:
|
||||
addr, port = addrport.split(':')
|
||||
except ValueError:
|
||||
addr, port = '', addrport
|
||||
if not addr:
|
||||
addr = '127.0.0.1'
|
||||
|
||||
if not port.isdigit():
|
||||
raise CommandError("%r is not a valid port number." % port)
|
||||
|
||||
auto_reload = options.get('auto_reload', False)
|
||||
xheaders = options.get('xheaders', True)
|
||||
no_keep_alive = options.get('no_keep_alive', False)
|
||||
quit_command = 'CTRL-C'
|
||||
|
||||
if settings.DEBUG:
|
||||
import logging
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
def inner_run():
|
||||
from django.conf import settings
|
||||
from django.utils import translation
|
||||
translation.activate(settings.LANGUAGE_CODE)
|
||||
|
||||
print "Validating Django models.py..."
|
||||
self.validate(display_num_errors=True)
|
||||
print "\nDjango version %s" % (django.get_version())
|
||||
print "Tornado server is running at http://%s:%s/" % (addr, port)
|
||||
print "Quit the server with %s." % quit_command
|
||||
|
||||
from tornado.web import FallbackHandler, StaticFileHandler
|
||||
django_app = wsgi.WSGIContainer(WSGIHandler())
|
||||
|
||||
try:
|
||||
# Application is an instance of Django's standard wsgi handler.
|
||||
application = web.Application([(r"/get_updates_longpoll", AsyncDjangoHandler),
|
||||
(r".*", FallbackHandler, dict(fallback=django_app)),
|
||||
])
|
||||
|
||||
# start tornado web server in single-threaded mode
|
||||
http_server = httpserver.HTTPServer(application,
|
||||
xheaders=xheaders,
|
||||
no_keep_alive=no_keep_alive)
|
||||
http_server.listen(int(port), address=addr)
|
||||
|
||||
ioloop.IOLoop.instance().start()
|
||||
except KeyboardInterrupt:
|
||||
sys.exit(0)
|
||||
|
||||
if auto_reload:
|
||||
from tornado import autoreload
|
||||
autoreload.start()
|
||||
|
||||
inner_run()
|
||||
|
||||
#
|
||||
# Modify the base Tornado handler for Django
|
||||
#
|
||||
from threading import Lock
|
||||
from django.core.handlers import base
|
||||
from django.core.urlresolvers import set_script_prefix
|
||||
from django.core import signals
|
||||
|
||||
class AsyncDjangoHandler(tornado.web.RequestHandler, base.BaseHandler):
|
||||
initLock = Lock()
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(AsyncDjangoHandler, self).__init__(*args, **kwargs)
|
||||
|
||||
# Set up middleware if needed. We couldn't do this earlier, because
|
||||
# settings weren't available.
|
||||
self._request_middleware = None
|
||||
self.initLock.acquire()
|
||||
# Check that middleware is still uninitialised.
|
||||
if self._request_middleware is None:
|
||||
self.load_middleware()
|
||||
self.initLock.release()
|
||||
self._auto_finish = False
|
||||
|
||||
def prepare(func):
|
||||
"""Patches the Cookie header in the Tornado request to fulfull
|
||||
Django's strict string-type cookie policy"""
|
||||
def inner_func(self,**kwargs):
|
||||
if u'Cookie' in self.request.headers:
|
||||
raw_cookie = self.request.headers[u'Cookie']
|
||||
if isinstance(raw_cookie, unicode):
|
||||
if hasattr(escape, "native_str"):
|
||||
self.request.headers[u'Cookie'] = escape.native_str(raw_cookie)
|
||||
else:
|
||||
print "Method 'native_str' in module 'escape' not found."
|
||||
self.request.headers[u'Cookie'] = str(raw_cookie)
|
||||
return func(self)
|
||||
return inner_func
|
||||
|
||||
def get(self):
|
||||
from tornado.wsgi import HTTPRequest, WSGIContainer
|
||||
from django.core.handlers.wsgi import WSGIRequest, STATUS_CODE_TEXT
|
||||
import urllib
|
||||
|
||||
environ = WSGIContainer.environ(self.request)
|
||||
environ['PATH_INFO'] = urllib.unquote(environ['PATH_INFO'])
|
||||
request = WSGIRequest(environ)
|
||||
request._tornado_handler = self
|
||||
|
||||
set_script_prefix(base.get_script_name(environ))
|
||||
signals.request_started.send(sender=self.__class__)
|
||||
try:
|
||||
response = self.get_response(request)
|
||||
|
||||
if not response:
|
||||
return
|
||||
|
||||
# Apply response middleware
|
||||
for middleware_method in self._response_middleware:
|
||||
response = middleware_method(request, response)
|
||||
response = self.apply_response_fixes(request, response)
|
||||
finally:
|
||||
signals.request_finished.send(sender=self.__class__)
|
||||
|
||||
status_text = STATUS_CODE_TEXT.get(response.status_code, "UNKNOWN")
|
||||
status = '%s (%s)' % (response.status_code, status_text)
|
||||
|
||||
self.set_status(response.status_code)
|
||||
for h in response.items():
|
||||
self.set_header(h[0], h[1])
|
||||
|
||||
if not hasattr(self, "_new_cookies"):
|
||||
self._new_cookies = []
|
||||
self._new_cookies.append(response.cookies)
|
||||
|
||||
self.write(response.content)
|
||||
self.finish()
|
||||
|
||||
|
||||
def head(self):
|
||||
self.get()
|
||||
|
||||
def post(self):
|
||||
self.get()
|
||||
|
||||
# Based on django.core.handlers.base: get_response
|
||||
def get_response(self, request):
|
||||
"Returns an HttpResponse object for the given HttpRequest"
|
||||
from django import http
|
||||
from django.core import exceptions, urlresolvers
|
||||
from django.conf import settings
|
||||
|
||||
try:
|
||||
try:
|
||||
# Setup default url resolver for this thread.
|
||||
urlconf = settings.ROOT_URLCONF
|
||||
urlresolvers.set_urlconf(urlconf)
|
||||
resolver = urlresolvers.RegexURLResolver(r'^/', urlconf)
|
||||
|
||||
# Apply request middleware
|
||||
for middleware_method in self._request_middleware:
|
||||
response = middleware_method(request)
|
||||
if response:
|
||||
break
|
||||
|
||||
if hasattr(request, "urlconf"):
|
||||
# Reset url resolver with a custom urlconf.
|
||||
urlconf = request.urlconf
|
||||
urlresolvers.set_urlconf(urlconf)
|
||||
resolver = urlresolvers.RegexURLResolver(r'^/', urlconf)
|
||||
|
||||
callback, callback_args, callback_kwargs = resolver.resolve(
|
||||
request.path_info)
|
||||
|
||||
# Apply view middleware
|
||||
for middleware_method in self._view_middleware:
|
||||
response = middleware_method(request, callback, callback_args, callback_kwargs)
|
||||
if response:
|
||||
break
|
||||
|
||||
from ...decorator import TornadoAsyncException
|
||||
|
||||
try:
|
||||
response = callback(request, *callback_args, **callback_kwargs)
|
||||
except TornadoAsyncException, e:
|
||||
# TODO: Maybe add debugging output here
|
||||
return
|
||||
except Exception, e:
|
||||
# If the view raised an exception, run it through exception
|
||||
# middleware, and if the exception middleware returns a
|
||||
# response, use that. Otherwise, reraise the exception.
|
||||
for middleware_method in self._exception_middleware:
|
||||
response = middleware_method(request, e)
|
||||
if response:
|
||||
break
|
||||
if response is None:
|
||||
raise
|
||||
|
||||
if response is None:
|
||||
try:
|
||||
view_name = callback.func_name
|
||||
except AttributeError:
|
||||
view_name = callback.__class__.__name__ + '.__call__'
|
||||
raise ValueError("The view %s.%s returned None." %
|
||||
(callback.__module__, view_name))
|
||||
|
||||
# If the response supports deferred rendering, apply template
|
||||
# response middleware and the render the response
|
||||
if hasattr(response, 'render') and callable(response.render):
|
||||
for middleware_method in self._template_response_middleware:
|
||||
response = middleware_method(request, response)
|
||||
response = response.render()
|
||||
|
||||
|
||||
except http.Http404, e:
|
||||
if settings.DEBUG:
|
||||
from django.views import debug
|
||||
response = debug.technical_404_response(request, e)
|
||||
else:
|
||||
try:
|
||||
callback, param_dict = resolver.resolve404()
|
||||
response = callback(request, **param_dict)
|
||||
except:
|
||||
try:
|
||||
response = self.handle_uncaught_exception(request, resolver, sys.exc_info())
|
||||
finally:
|
||||
receivers = signals.got_request_exception.send(sender=self.__class__, request=request)
|
||||
except exceptions.PermissionDenied:
|
||||
logger.warning(
|
||||
'Forbidden (Permission denied): %s', request.path,
|
||||
extra={
|
||||
'status_code': 403,
|
||||
'request': request
|
||||
})
|
||||
try:
|
||||
callback, param_dict = resolver.resolve403()
|
||||
response = callback(request, **param_dict)
|
||||
except:
|
||||
try:
|
||||
response = self.handle_uncaught_exception(request,
|
||||
resolver, sys.exc_info())
|
||||
finally:
|
||||
signals.got_request_exception.send(
|
||||
sender=self.__class__, request=request)
|
||||
except SystemExit:
|
||||
# See https://code.djangoproject.com/ticket/4701
|
||||
raise
|
||||
except Exception, e:
|
||||
exc_info = sys.exc_info()
|
||||
receivers = signals.got_request_exception.send(sender=self.__class__, request=request)
|
||||
return self.handle_uncaught_exception(request, resolver, exc_info)
|
||||
finally:
|
||||
# Reset urlconf on the way out for isolation
|
||||
urlresolvers.set_urlconf(None)
|
||||
|
||||
try:
|
||||
# Apply response middleware, regardless of the response
|
||||
for middleware_method in self._response_middleware:
|
||||
response = middleware_method(request, response)
|
||||
response = self.apply_response_fixes(request, response)
|
||||
except: # Any exception should be gathered and handled
|
||||
signals.got_request_exception.send(sender=self.__class__, request=request)
|
||||
response = self.handle_uncaught_exception(request, resolver, sys.exc_info())
|
||||
|
||||
return response
|
@@ -16,10 +16,31 @@ def get_display_recipient(recipient):
|
||||
user = User.objects.get(pk=recipient.user_or_class)
|
||||
return user.username
|
||||
|
||||
callback_table = {}
|
||||
|
||||
class UserProfile(models.Model):
|
||||
user = models.OneToOneField(User)
|
||||
pointer = models.IntegerField()
|
||||
|
||||
# The user receives this message
|
||||
def receive(self, message):
|
||||
global callback_table
|
||||
|
||||
# Should also store in permanent database the receipt
|
||||
for cb in callback_table.get(self.user.id, []):
|
||||
cb([message])
|
||||
|
||||
callback_table[self.user.id] = []
|
||||
|
||||
def add_callback(self, cb, last_received):
|
||||
global callback_table
|
||||
|
||||
# This filter should also restrict to the current user's subs
|
||||
new_zephyrs = Zephyr.objects.filter(id__gt=last_received)
|
||||
if new_zephyrs:
|
||||
return cb(new_zephyrs)
|
||||
callback_table.setdefault(self.user.id, []).append(cb)
|
||||
|
||||
def __repr__(self):
|
||||
return "<UserProfile: %s>" % (self.user.username,)
|
||||
|
||||
@@ -55,9 +76,36 @@ class Zephyr(models.Model):
|
||||
display_recipient = get_display_recipient(self.recipient)
|
||||
return "<Zephyr: %s / %s / %r>" % (display_recipient, self.instance, self.sender)
|
||||
|
||||
def send_zephyr(**kwargs):
|
||||
zephyr = kwargs["instance"]
|
||||
if zephyr.recipient.type == "personal":
|
||||
recipients = UserProfile.objects.filter(user=zephyr.recipient.user_or_class)
|
||||
assert(len(recipients) == 1)
|
||||
elif zephyr.recipient.type == "class":
|
||||
recipients = [UserProfile.objects.get(user=s.userprofile_id) for
|
||||
s in Subscription.objects.filter(recipient_id=zephyr.recipient)]
|
||||
else:
|
||||
raise
|
||||
for recipient in recipients:
|
||||
recipient.receive(zephyr)
|
||||
|
||||
post_save.connect(send_zephyr, sender=Zephyr)
|
||||
|
||||
class Subscription(models.Model):
|
||||
userprofile_id = models.ForeignKey(UserProfile)
|
||||
recipient_id = models.ForeignKey(Recipient)
|
||||
|
||||
def __repr__(self):
|
||||
return "<Subscription: %r -> %r>" % (self.userprofile_id, self.recipient_id)
|
||||
|
||||
def filter_by_subscriptions(zephyrs, user):
|
||||
userprofile = UserProfile.objects.get(user=user)
|
||||
subscribed_zephyrs = []
|
||||
subscriptions = [sub.recipient_id for sub in Subscription.objects.filter(userprofile_id=userprofile)]
|
||||
for zephyr in zephyrs:
|
||||
# If you are subscribed to the personal or class, or if you sent the personal, you can see the zephyr.
|
||||
if (zephyr.recipient in subscriptions) or \
|
||||
(zephyr.recipient.type == "personal" and zephyr.sender == userprofile):
|
||||
subscribed_zephyrs.append(zephyr)
|
||||
|
||||
return subscribed_zephyrs
|
||||
|
@@ -197,10 +197,6 @@ function unhide() {
|
||||
$("#narrow_indicator").html("");
|
||||
}
|
||||
|
||||
$(function() {
|
||||
setInterval(get_updates, 1000);
|
||||
});
|
||||
|
||||
function newline2br(content) {
|
||||
return content.replace(/\n/g, '<br />');
|
||||
}
|
||||
@@ -228,10 +224,19 @@ function add_message(index, zephyr) {
|
||||
$("#table tr:last").after(new_str);
|
||||
}
|
||||
|
||||
function get_updates() {
|
||||
function get_updates_longpoll(data) {
|
||||
if (data && data.zephyrs) {
|
||||
$.each(data.zephyrs, add_message);
|
||||
}
|
||||
var last_received = $("tr:last").attr("id");
|
||||
$.post("get_updates", {last_received: last_received},
|
||||
function(data) {
|
||||
$.each(data, add_message);
|
||||
}, "json");
|
||||
$.post("get_updates_longpoll",
|
||||
{last_received: last_received},
|
||||
function(data) {
|
||||
get_updates_longpoll(data);
|
||||
}, "json");
|
||||
}
|
||||
|
||||
$(document).ready(function() {
|
||||
get_updates_longpoll()
|
||||
});
|
||||
|
||||
|
@@ -8,10 +8,12 @@ from django.shortcuts import render
|
||||
from django.utils.timezone import utc
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from zephyr.models import Zephyr, UserProfile, ZephyrClass, Subscription, \
|
||||
Recipient, get_display_recipient
|
||||
from zephyr.models import Zephyr, UserProfile, ZephyrClass, Recipient, get_display_recipient, filter_by_subscriptions
|
||||
from zephyr.forms import RegistrationForm
|
||||
|
||||
import tornado.web
|
||||
from zephyr.decorator import asynchronous
|
||||
|
||||
import datetime
|
||||
import simplejson
|
||||
|
||||
@@ -41,7 +43,7 @@ def home(request):
|
||||
if not request.user.is_authenticated():
|
||||
return HttpResponseRedirect('accounts/home/')
|
||||
|
||||
zephyrs = filter_by_subscription(Zephyr.objects.all(), request.user)
|
||||
zephyrs = filter_by_subscriptions(Zephyr.objects.all(), request.user)
|
||||
for zephyr in zephyrs:
|
||||
zephyr.display_recipient = get_display_recipient(zephyr.recipient)
|
||||
|
||||
@@ -64,37 +66,39 @@ def update(request):
|
||||
user_profile.save()
|
||||
return HttpResponse(simplejson.dumps({}), mimetype='application/json')
|
||||
|
||||
def filter_by_subscription(zephyrs, user):
|
||||
userprofile = UserProfile.objects.get(user=user)
|
||||
subscribed_zephyrs = []
|
||||
subscriptions = [sub.recipient_id for sub in Subscription.objects.filter(userprofile_id=userprofile)]
|
||||
for zephyr in zephyrs:
|
||||
# If you are subscribed to the personal or class, or if you sent the personal, you can see the zephyr.
|
||||
if (zephyr.recipient in subscriptions) or \
|
||||
((zephyr.sender == userprofile) and zephyr.recipient.type == "personal"):
|
||||
subscribed_zephyrs.append(zephyr)
|
||||
|
||||
return subscribed_zephyrs
|
||||
|
||||
def get_updates(request):
|
||||
@asynchronous
|
||||
def get_updates_longpoll(request, handler):
|
||||
if not request.POST:
|
||||
# Do something
|
||||
# TODO: Do something
|
||||
pass
|
||||
|
||||
last_received = request.POST.get('last_received')
|
||||
new_zephyrs = filter_by_subscription(Zephyr.objects.filter(id__gt=last_received),
|
||||
request.user)
|
||||
new_zephyr_list = []
|
||||
for zephyr in new_zephyrs:
|
||||
new_zephyr_list.append({"id": zephyr.id,
|
||||
"sender": zephyr.sender.user.username,
|
||||
"type": zephyr.recipient.type,
|
||||
"display_recipient": get_display_recipient(zephyr.recipient),
|
||||
"instance": zephyr.instance,
|
||||
"content": zephyr.content
|
||||
})
|
||||
if not last_received:
|
||||
# TODO: return error?
|
||||
pass
|
||||
|
||||
return HttpResponse(simplejson.dumps(new_zephyr_list),
|
||||
mimetype='application/json')
|
||||
user = request.user
|
||||
user_profile = UserProfile.objects.get(user=user)
|
||||
|
||||
def on_receive(zephyrs):
|
||||
if handler.request.connection.stream.closed():
|
||||
return
|
||||
new_zephyr_list = []
|
||||
for zephyr in zephyrs:
|
||||
new_zephyr_list.append({"id": zephyr.id,
|
||||
"sender": zephyr.sender.user.username,
|
||||
"display_recipient": get_display_recipient(zephyr.recipient),
|
||||
"type": zephyr.recipient.type,
|
||||
"instance": zephyr.instance,
|
||||
"content": zephyr.content
|
||||
})
|
||||
try:
|
||||
handler.finish({'zephyrs': new_zephyr_list})
|
||||
except socket.error, e:
|
||||
pass
|
||||
|
||||
# We need to replace this abstraction with the message list
|
||||
user_profile.add_callback(handler.async_callback(on_receive), last_received)
|
||||
|
||||
@login_required
|
||||
def personal_zephyr(request):
|
||||
|
Reference in New Issue
Block a user