diff --git a/zephyr/management/commands/runtornado.py b/zephyr/management/commands/runtornado.py index 5f75cbd5bc..8f100a824f 100644 --- a/zephyr/management/commands/runtornado.py +++ b/zephyr/management/commands/runtornado.py @@ -11,6 +11,7 @@ from tornado import ioloop from zephyr.lib.debug import interactive_debug_listen from zephyr.lib.response import json_response from zephyr.lib.queue import TornadoQueueClient +from zephyr import tornado_callbacks # A hack to keep track of how much time we spend working, versus sleeping in # the event loop. @@ -124,11 +125,14 @@ class Command(BaseCommand): print "Quit the server with %s." % quit_command if settings.USING_RABBITMQ: + # Process notifications received via RabbitMQ queue_client = TornadoQueueClient() while not queue_client.ready(): step_tornado_ioloop() - # FIXME: Register consumer callbacks here + def process_notification(chan, method, props, data): + tornado_callbacks.process_notification(data) + queue_client.register_json_consumer('notify_tornado', process_notification) try: # Application is an instance of Django's standard wsgi handler. diff --git a/zephyr/tornado_callbacks.py b/zephyr/tornado_callbacks.py index 51c1fa0ed1..43b1c743a0 100644 --- a/zephyr/tornado_callbacks.py +++ b/zephyr/tornado_callbacks.py @@ -4,6 +4,7 @@ from zephyr.models import Message, UserProfile, UserMessage, \ from zephyr.decorator import JsonableError from zephyr.lib.cache_helpers import cache_get_message +from zephyr.lib.queue import SimpleQueueClient import os import sys @@ -247,7 +248,17 @@ def process_notification(data): # # We use JSON rather than bare form parameters, so that we can represent # different types and for compatibility with non-HTTP transports. -def send_notification(data): + +def send_notification_http(data): requests.post(settings.TORNADO_SERVER + '/notify_tornado', data=dict( data = simplejson.dumps(data), secret = settings.SHARED_SECRET)) + +def send_notification_rabbitmq(data): + notification_queue.json_publish('notify_tornado', data) + +if settings.USING_RABBITMQ: + notification_queue = SimpleQueueClient() + send_notification = send_notification_rabbitmq +else: + send_notification = send_notification_http