diff --git a/humbug/urls.py b/humbug/urls.py index ba5b3cb91e..5bf36f89c3 100644 --- a/humbug/urls.py +++ b/humbug/urls.py @@ -9,7 +9,7 @@ import os.path urlpatterns = patterns('', url(r'^$', 'zephyr.views.home', name='home'), url(r'^update$', 'zephyr.views.update', name='update'), - url(r'^get_updates$', 'zephyr.views.get_updates', name='get_updates'), + url(r'^get_updates_longpoll$', 'zephyr.views.get_updates_longpoll', name='get_updates_longpoll'), url(r'^zephyr/', 'zephyr.views.zephyr', name='zephyr'), url(r'^personal-zephyr/', 'zephyr.views.personal_zephyr', name='personal_zephyr'), url(r'^accounts/home/', 'zephyr.views.accounts_home', name='accounts_home'), @@ -17,7 +17,7 @@ urlpatterns = patterns('', url(r'^accounts/logout/', 'django.contrib.auth.views.logout', {'template_name': 'zephyr/index.html'}), url(r'^accounts/register/', 'zephyr.views.register', name='register'), url(r'^static/(?P.*)$', 'django.views.static.serve', - {'document_root': os.path.join(settings.SITE_ROOT, 'static/')}) + {'document_root': os.path.join(settings.SITE_ROOT, '..', 'zephyr', 'static/')}) # Uncomment the admin/doc line below to enable admin documentation: # url(r'^admin/doc/', include('django.contrib.admindocs.urls')), diff --git a/zephyr/decorator.py b/zephyr/decorator.py new file mode 100644 index 0000000000..e800535ed9 --- /dev/null +++ b/zephyr/decorator.py @@ -0,0 +1,22 @@ +import tornado.web +import types + +class TornadoAsyncException(Exception): pass + +class _DefGen_Return(BaseException): + def __init__(self, value): + self.value = value + +def returnResponse(value): + raise _DefGen_Return(value) + +def asynchronous(method): + def wrapper(request, *args, **kwargs): + try: + v = method(request, request._tornado_handler, *args, **kwargs) + if v == None or type(v) == types.GeneratorType: + raise TornadoAsyncException + except _DefGen_Return, e: + request._tornado_handler.finish(e.value.content) + return v + return wrapper diff --git a/zephyr/management/commands/runtornado.py b/zephyr/management/commands/runtornado.py new file mode 100644 index 0000000000..6248afb8a1 --- /dev/null +++ b/zephyr/management/commands/runtornado.py @@ -0,0 +1,317 @@ +from django.core.management.base import BaseCommand, CommandError +from optparse import make_option +from django.conf import settings +import os +import sys +import tornado.web + +class Command(BaseCommand): + option_list = BaseCommand.option_list + ( + make_option('--noreload', action='store_false', + dest='auto_reload', default=True, + help="Configures tornado to not auto-reload (for prod use)."), + make_option('--nokeepalive', action='store_true', + dest='no_keep_alive', default=False, + help="Tells Tornado to NOT keep alive http connections."), + make_option('--noxheaders', action='store_false', + dest='xheaders', default=True, + help="Tells Tornado to NOT override remote IP with X-Real-IP."), + ) + help = "Starts a Tornado Web server wrapping Django." + args = '[optional port number or ipaddr:port]\n (use multiple ports to start multiple servers)' + + def handle(self, *addrport, **options): + # setup unbuffered I/O + sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) + sys.stderr = os.fdopen(sys.stderr.fileno(), 'w', 0) + + if len(addrport) == 0: + self.run_one(**options) + elif len(addrport) == 1: + self.run_one(addrport[0], **options) + else: + from multiprocessing import Process + + plist = [] + for ap in addrport: + p = Process(target=self.run_one, args=(ap,), kwargs=options) + p.start() + plist.append(p) + + while plist: + if plist[0].exitcode is None: + plist.pop(0) + else: + plist[0].join() + + + def run_one(self, addrport, **options): + import django + from django.core.handlers.wsgi import WSGIHandler + from tornado import httpserver, wsgi, ioloop, web + + if not addrport: + addr = '' + port = '8000' + else: + try: + addr, port = addrport.split(':') + except ValueError: + addr, port = '', addrport + if not addr: + addr = '127.0.0.1' + + if not port.isdigit(): + raise CommandError("%r is not a valid port number." % port) + + auto_reload = options.get('auto_reload', False) + xheaders = options.get('xheaders', True) + no_keep_alive = options.get('no_keep_alive', False) + quit_command = 'CTRL-C' + + if settings.DEBUG: + import logging + logger = logging.getLogger() + logger.setLevel(logging.INFO) + + def inner_run(): + from django.conf import settings + from django.utils import translation + translation.activate(settings.LANGUAGE_CODE) + + print "Validating Django models.py..." + self.validate(display_num_errors=True) + print "\nDjango version %s" % (django.get_version()) + print "Tornado server is running at http://%s:%s/" % (addr, port) + print "Quit the server with %s." % quit_command + + from tornado.web import FallbackHandler, StaticFileHandler + django_app = wsgi.WSGIContainer(WSGIHandler()) + + try: + # Application is an instance of Django's standard wsgi handler. + application = web.Application([(r"/get_updates_longpoll", AsyncDjangoHandler), + (r".*", FallbackHandler, dict(fallback=django_app)), + ]) + + # start tornado web server in single-threaded mode + http_server = httpserver.HTTPServer(application, + xheaders=xheaders, + no_keep_alive=no_keep_alive) + http_server.listen(int(port), address=addr) + + ioloop.IOLoop.instance().start() + except KeyboardInterrupt: + sys.exit(0) + + if auto_reload: + from tornado import autoreload + autoreload.start() + + inner_run() + +# +# Modify the base Tornado handler for Django +# +from threading import Lock +from django.core.handlers import base +from django.core.urlresolvers import set_script_prefix +from django.core import signals + +class AsyncDjangoHandler(tornado.web.RequestHandler, base.BaseHandler): + initLock = Lock() + + def __init__(self, *args, **kwargs): + super(AsyncDjangoHandler, self).__init__(*args, **kwargs) + + # Set up middleware if needed. We couldn't do this earlier, because + # settings weren't available. + self._request_middleware = None + self.initLock.acquire() + # Check that middleware is still uninitialised. + if self._request_middleware is None: + self.load_middleware() + self.initLock.release() + self._auto_finish = False + + def prepare(func): + """Patches the Cookie header in the Tornado request to fulfull + Django's strict string-type cookie policy""" + def inner_func(self,**kwargs): + if u'Cookie' in self.request.headers: + raw_cookie = self.request.headers[u'Cookie'] + if isinstance(raw_cookie, unicode): + if hasattr(escape, "native_str"): + self.request.headers[u'Cookie'] = escape.native_str(raw_cookie) + else: + print "Method 'native_str' in module 'escape' not found." + self.request.headers[u'Cookie'] = str(raw_cookie) + return func(self) + return inner_func + + def get(self): + from tornado.wsgi import HTTPRequest, WSGIContainer + from django.core.handlers.wsgi import WSGIRequest, STATUS_CODE_TEXT + import urllib + + environ = WSGIContainer.environ(self.request) + environ['PATH_INFO'] = urllib.unquote(environ['PATH_INFO']) + request = WSGIRequest(environ) + request._tornado_handler = self + + set_script_prefix(base.get_script_name(environ)) + signals.request_started.send(sender=self.__class__) + try: + response = self.get_response(request) + + if not response: + return + + # Apply response middleware + for middleware_method in self._response_middleware: + response = middleware_method(request, response) + response = self.apply_response_fixes(request, response) + finally: + signals.request_finished.send(sender=self.__class__) + + status_text = STATUS_CODE_TEXT.get(response.status_code, "UNKNOWN") + status = '%s (%s)' % (response.status_code, status_text) + + self.set_status(response.status_code) + for h in response.items(): + self.set_header(h[0], h[1]) + + if not hasattr(self, "_new_cookies"): + self._new_cookies = [] + self._new_cookies.append(response.cookies) + + self.write(response.content) + self.finish() + + + def head(self): + self.get() + + def post(self): + self.get() + + # Based on django.core.handlers.base: get_response + def get_response(self, request): + "Returns an HttpResponse object for the given HttpRequest" + from django import http + from django.core import exceptions, urlresolvers + from django.conf import settings + + try: + try: + # Setup default url resolver for this thread. + urlconf = settings.ROOT_URLCONF + urlresolvers.set_urlconf(urlconf) + resolver = urlresolvers.RegexURLResolver(r'^/', urlconf) + + # Apply request middleware + for middleware_method in self._request_middleware: + response = middleware_method(request) + if response: + break + + if hasattr(request, "urlconf"): + # Reset url resolver with a custom urlconf. + urlconf = request.urlconf + urlresolvers.set_urlconf(urlconf) + resolver = urlresolvers.RegexURLResolver(r'^/', urlconf) + + callback, callback_args, callback_kwargs = resolver.resolve( + request.path_info) + + # Apply view middleware + for middleware_method in self._view_middleware: + response = middleware_method(request, callback, callback_args, callback_kwargs) + if response: + break + + from ...decorator import TornadoAsyncException + + try: + response = callback(request, *callback_args, **callback_kwargs) + except TornadoAsyncException, e: + # TODO: Maybe add debugging output here + return + except Exception, e: + # If the view raised an exception, run it through exception + # middleware, and if the exception middleware returns a + # response, use that. Otherwise, reraise the exception. + for middleware_method in self._exception_middleware: + response = middleware_method(request, e) + if response: + break + if response is None: + raise + + if response is None: + try: + view_name = callback.func_name + except AttributeError: + view_name = callback.__class__.__name__ + '.__call__' + raise ValueError("The view %s.%s returned None." % + (callback.__module__, view_name)) + + # If the response supports deferred rendering, apply template + # response middleware and the render the response + if hasattr(response, 'render') and callable(response.render): + for middleware_method in self._template_response_middleware: + response = middleware_method(request, response) + response = response.render() + + + except http.Http404, e: + if settings.DEBUG: + from django.views import debug + response = debug.technical_404_response(request, e) + else: + try: + callback, param_dict = resolver.resolve404() + response = callback(request, **param_dict) + except: + try: + response = self.handle_uncaught_exception(request, resolver, sys.exc_info()) + finally: + receivers = signals.got_request_exception.send(sender=self.__class__, request=request) + except exceptions.PermissionDenied: + logger.warning( + 'Forbidden (Permission denied): %s', request.path, + extra={ + 'status_code': 403, + 'request': request + }) + try: + callback, param_dict = resolver.resolve403() + response = callback(request, **param_dict) + except: + try: + response = self.handle_uncaught_exception(request, + resolver, sys.exc_info()) + finally: + signals.got_request_exception.send( + sender=self.__class__, request=request) + except SystemExit: + # See https://code.djangoproject.com/ticket/4701 + raise + except Exception, e: + exc_info = sys.exc_info() + receivers = signals.got_request_exception.send(sender=self.__class__, request=request) + return self.handle_uncaught_exception(request, resolver, exc_info) + finally: + # Reset urlconf on the way out for isolation + urlresolvers.set_urlconf(None) + + try: + # Apply response middleware, regardless of the response + for middleware_method in self._response_middleware: + response = middleware_method(request, response) + response = self.apply_response_fixes(request, response) + except: # Any exception should be gathered and handled + signals.got_request_exception.send(sender=self.__class__, request=request) + response = self.handle_uncaught_exception(request, resolver, sys.exc_info()) + + return response diff --git a/zephyr/models.py b/zephyr/models.py index 9bfccc03a6..a6c13d06ce 100644 --- a/zephyr/models.py +++ b/zephyr/models.py @@ -16,10 +16,31 @@ def get_display_recipient(recipient): user = User.objects.get(pk=recipient.user_or_class) return user.username +callback_table = {} + class UserProfile(models.Model): user = models.OneToOneField(User) pointer = models.IntegerField() + # The user receives this message + def receive(self, message): + global callback_table + + # Should also store in permanent database the receipt + for cb in callback_table.get(self.user.id, []): + cb([message]) + + callback_table[self.user.id] = [] + + def add_callback(self, cb, last_received): + global callback_table + + # This filter should also restrict to the current user's subs + new_zephyrs = Zephyr.objects.filter(id__gt=last_received) + if new_zephyrs: + return cb(new_zephyrs) + callback_table.setdefault(self.user.id, []).append(cb) + def __repr__(self): return "" % (self.user.username,) @@ -55,9 +76,36 @@ class Zephyr(models.Model): display_recipient = get_display_recipient(self.recipient) return "" % (display_recipient, self.instance, self.sender) +def send_zephyr(**kwargs): + zephyr = kwargs["instance"] + if zephyr.recipient.type == "personal": + recipients = UserProfile.objects.filter(user=zephyr.recipient.user_or_class) + assert(len(recipients) == 1) + elif zephyr.recipient.type == "class": + recipients = [UserProfile.objects.get(user=s.userprofile_id) for + s in Subscription.objects.filter(recipient_id=zephyr.recipient)] + else: + raise + for recipient in recipients: + recipient.receive(zephyr) + +post_save.connect(send_zephyr, sender=Zephyr) + class Subscription(models.Model): userprofile_id = models.ForeignKey(UserProfile) recipient_id = models.ForeignKey(Recipient) def __repr__(self): return " %r>" % (self.userprofile_id, self.recipient_id) + +def filter_by_subscriptions(zephyrs, user): + userprofile = UserProfile.objects.get(user=user) + subscribed_zephyrs = [] + subscriptions = [sub.recipient_id for sub in Subscription.objects.filter(userprofile_id=userprofile)] + for zephyr in zephyrs: + # If you are subscribed to the personal or class, or if you sent the personal, you can see the zephyr. + if (zephyr.recipient in subscriptions) or \ + (zephyr.recipient.type == "personal" and zephyr.sender == userprofile): + subscribed_zephyrs.append(zephyr) + + return subscribed_zephyrs diff --git a/zephyr/static/js/zephyr.js b/zephyr/static/js/zephyr.js index 6be1d0bafc..2b171ee8c2 100644 --- a/zephyr/static/js/zephyr.js +++ b/zephyr/static/js/zephyr.js @@ -197,10 +197,6 @@ function unhide() { $("#narrow_indicator").html(""); } -$(function() { - setInterval(get_updates, 1000); -}); - function newline2br(content) { return content.replace(/\n/g, '
'); } @@ -228,10 +224,19 @@ function add_message(index, zephyr) { $("#table tr:last").after(new_str); } -function get_updates() { +function get_updates_longpoll(data) { + if (data && data.zephyrs) { + $.each(data.zephyrs, add_message); + } var last_received = $("tr:last").attr("id"); - $.post("get_updates", {last_received: last_received}, - function(data) { - $.each(data, add_message); - }, "json"); + $.post("get_updates_longpoll", + {last_received: last_received}, + function(data) { + get_updates_longpoll(data); + }, "json"); } + +$(document).ready(function() { + get_updates_longpoll() +}); + diff --git a/zephyr/views.py b/zephyr/views.py index 06343d1422..9871dcc7e9 100644 --- a/zephyr/views.py +++ b/zephyr/views.py @@ -8,10 +8,12 @@ from django.shortcuts import render from django.utils.timezone import utc from django.contrib.auth.models import User -from zephyr.models import Zephyr, UserProfile, ZephyrClass, Subscription, \ - Recipient, get_display_recipient +from zephyr.models import Zephyr, UserProfile, ZephyrClass, Recipient, get_display_recipient, filter_by_subscriptions from zephyr.forms import RegistrationForm +import tornado.web +from zephyr.decorator import asynchronous + import datetime import simplejson @@ -41,7 +43,7 @@ def home(request): if not request.user.is_authenticated(): return HttpResponseRedirect('accounts/home/') - zephyrs = filter_by_subscription(Zephyr.objects.all(), request.user) + zephyrs = filter_by_subscriptions(Zephyr.objects.all(), request.user) for zephyr in zephyrs: zephyr.display_recipient = get_display_recipient(zephyr.recipient) @@ -64,37 +66,39 @@ def update(request): user_profile.save() return HttpResponse(simplejson.dumps({}), mimetype='application/json') -def filter_by_subscription(zephyrs, user): - userprofile = UserProfile.objects.get(user=user) - subscribed_zephyrs = [] - subscriptions = [sub.recipient_id for sub in Subscription.objects.filter(userprofile_id=userprofile)] - for zephyr in zephyrs: - # If you are subscribed to the personal or class, or if you sent the personal, you can see the zephyr. - if (zephyr.recipient in subscriptions) or \ - ((zephyr.sender == userprofile) and zephyr.recipient.type == "personal"): - subscribed_zephyrs.append(zephyr) - - return subscribed_zephyrs - -def get_updates(request): +@asynchronous +def get_updates_longpoll(request, handler): if not request.POST: - # Do something + # TODO: Do something pass + last_received = request.POST.get('last_received') - new_zephyrs = filter_by_subscription(Zephyr.objects.filter(id__gt=last_received), - request.user) - new_zephyr_list = [] - for zephyr in new_zephyrs: - new_zephyr_list.append({"id": zephyr.id, - "sender": zephyr.sender.user.username, - "type": zephyr.recipient.type, - "display_recipient": get_display_recipient(zephyr.recipient), - "instance": zephyr.instance, - "content": zephyr.content - }) + if not last_received: + # TODO: return error? + pass - return HttpResponse(simplejson.dumps(new_zephyr_list), - mimetype='application/json') + user = request.user + user_profile = UserProfile.objects.get(user=user) + + def on_receive(zephyrs): + if handler.request.connection.stream.closed(): + return + new_zephyr_list = [] + for zephyr in zephyrs: + new_zephyr_list.append({"id": zephyr.id, + "sender": zephyr.sender.user.username, + "display_recipient": get_display_recipient(zephyr.recipient), + "type": zephyr.recipient.type, + "instance": zephyr.instance, + "content": zephyr.content + }) + try: + handler.finish({'zephyrs': new_zephyr_list}) + except socket.error, e: + pass + + # We need to replace this abstraction with the message list + user_profile.add_callback(handler.async_callback(on_receive), last_received) @login_required def personal_zephyr(request):