From c346abe6e23e761696b49b68d75ea220c98dae5b Mon Sep 17 00:00:00 2001 From: Leo Franchi Date: Fri, 22 Mar 2013 15:55:40 -0400 Subject: [PATCH] Send Tornado callback notifications via RabbitMQ (imported from commit 83814d8c6b78fe725aedea9d98fb588ed31123e7) --- zephyr/management/commands/runtornado.py | 6 +++++- zephyr/tornado_callbacks.py | 13 ++++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/zephyr/management/commands/runtornado.py b/zephyr/management/commands/runtornado.py index 5f75cbd5bc..8f100a824f 100644 --- a/zephyr/management/commands/runtornado.py +++ b/zephyr/management/commands/runtornado.py @@ -11,6 +11,7 @@ from tornado import ioloop from zephyr.lib.debug import interactive_debug_listen from zephyr.lib.response import json_response from zephyr.lib.queue import TornadoQueueClient +from zephyr import tornado_callbacks # A hack to keep track of how much time we spend working, versus sleeping in # the event loop. @@ -124,11 +125,14 @@ class Command(BaseCommand): print "Quit the server with %s." % quit_command if settings.USING_RABBITMQ: + # Process notifications received via RabbitMQ queue_client = TornadoQueueClient() while not queue_client.ready(): step_tornado_ioloop() - # FIXME: Register consumer callbacks here + def process_notification(chan, method, props, data): + tornado_callbacks.process_notification(data) + queue_client.register_json_consumer('notify_tornado', process_notification) try: # Application is an instance of Django's standard wsgi handler. diff --git a/zephyr/tornado_callbacks.py b/zephyr/tornado_callbacks.py index 51c1fa0ed1..43b1c743a0 100644 --- a/zephyr/tornado_callbacks.py +++ b/zephyr/tornado_callbacks.py @@ -4,6 +4,7 @@ from zephyr.models import Message, UserProfile, UserMessage, \ from zephyr.decorator import JsonableError from zephyr.lib.cache_helpers import cache_get_message +from zephyr.lib.queue import SimpleQueueClient import os import sys @@ -247,7 +248,17 @@ def process_notification(data): # # We use JSON rather than bare form parameters, so that we can represent # different types and for compatibility with non-HTTP transports. -def send_notification(data): + +def send_notification_http(data): requests.post(settings.TORNADO_SERVER + '/notify_tornado', data=dict( data = simplejson.dumps(data), secret = settings.SHARED_SECRET)) + +def send_notification_rabbitmq(data): + notification_queue.json_publish('notify_tornado', data) + +if settings.USING_RABBITMQ: + notification_queue = SimpleQueueClient() + send_notification = send_notification_rabbitmq +else: + send_notification = send_notification_http