diff --git a/humbug/urls.py b/humbug/urls.py index a4c1eb9378..8bc4a9e1b2 100644 --- a/humbug/urls.py +++ b/humbug/urls.py @@ -102,8 +102,7 @@ urlpatterns = patterns('', url(r'^robots\.txt$', RedirectView.as_view(url='/static/robots.txt')), # Used internally for communication between Django and Tornado processes - url(r'^notify_new_message$', 'zephyr.tornadoviews.notify_new_message'), - url(r'^notify_pointer_update$', 'zephyr.tornadoviews.notify_pointer_update'), + url(r'^notify_tornado$', 'zephyr.tornadoviews.notify'), ) if not settings.DEPLOYED: diff --git a/zephyr/lib/actions.py b/zephyr/lib/actions.py index abe12139e4..7013f5a2ac 100644 --- a/zephyr/lib/actions.py +++ b/zephyr/lib/actions.py @@ -21,12 +21,13 @@ from zephyr.lib.cache import cache_with_key, user_profile_by_id_cache_key, \ user_profile_by_email_cache_key from zephyr.decorator import get_user_profile_by_email, json_to_list +from zephyr import tornado_callbacks + import subprocess import simplejson import time import traceback import re -import requests import datetime import os import platform @@ -176,13 +177,13 @@ def do_send_message(message, rendered_content=None, no_log=False, message.to_dict(apply_markdown=True, rendered_content=rendered_content) message.to_dict(apply_markdown=False) data = dict( - secret = settings.SHARED_SECRET, + type = 'new_message', message = message.id, - users = simplejson.dumps([str(user.id) for user in recipients])) + users = [user.id for user in recipients]) if message.recipient.type == Recipient.STREAM: # Note: This is where authorization for single-stream # get_updates happens! We only attach stream data to the - # notify_new_message request if it's a public stream, + # notify new_message request if it's a public stream, # ensuring that in the tornado server, non-public stream # messages are only associated to their subscribed users. if stream is None: @@ -190,7 +191,7 @@ def do_send_message(message, rendered_content=None, no_log=False, if stream.is_public(): data['realm_id'] = stream.realm.id data['stream_name'] = stream.name - requests.post(settings.TORNADO_SERVER + '/notify_new_message', data=data) + tornado_callbacks.send_notification(data) def create_stream_if_needed(realm, stream_name, invite_only=False): (stream, created) = Stream.objects.get_or_create( diff --git a/zephyr/management/commands/runtornado.py b/zephyr/management/commands/runtornado.py index 1f940ad458..5f75cbd5bc 100644 --- a/zephyr/management/commands/runtornado.py +++ b/zephyr/management/commands/runtornado.py @@ -134,8 +134,7 @@ class Command(BaseCommand): # Application is an instance of Django's standard wsgi handler. application = web.Application([(r"/json/get_updates", AsyncDjangoHandler), (r"/api/v1/get_messages", AsyncDjangoHandler), - (r"/notify_new_message", AsyncDjangoHandler), - (r"/notify_pointer_update", AsyncDjangoHandler), + (r"/notify_tornado", AsyncDjangoHandler), ], debug=django.conf.settings.DEBUG, # Disable Tornado's own request logging, since we have our own diff --git a/zephyr/tornado_callbacks.py b/zephyr/tornado_callbacks.py index 007bd63a21..51c1fa0ed1 100644 --- a/zephyr/tornado_callbacks.py +++ b/zephyr/tornado_callbacks.py @@ -1,11 +1,15 @@ +from django.conf import settings from zephyr.models import Message, UserProfile, UserMessage, \ Recipient, Stream, get_stream from zephyr.decorator import JsonableError +from zephyr.lib.cache_helpers import cache_get_message import os import sys import logging +import requests +import simplejson import subprocess import collections @@ -221,3 +225,29 @@ def update_pointer(user_profile_id, new_pointer): callbacks_table.call(user_profile_id, Callbacks.TYPE_POINTER_UPDATE, new_pointer=new_pointer, update_types=["pointer_update"]) + +def process_new_message(data): + message = cache_get_message(data['message']) + + for user_profile_id in data['users']: + user_receive_message(user_profile_id, message) + + if 'stream_name' in data: + stream_receive_message(data['realm_id'], data['stream_name'], message) + +def process_notification(data): + if data['type'] == 'new_message': + process_new_message(data) + elif data['type'] == 'pointer_update': + update_pointer(data['user'], data['new_pointer']) + else: + raise JsonableError('bad notification type ' + data['type']) + +# Runs in the Django process to send a notification to Tornado. +# +# We use JSON rather than bare form parameters, so that we can represent +# different types and for compatibility with non-HTTP transports. +def send_notification(data): + requests.post(settings.TORNADO_SERVER + '/notify_tornado', data=dict( + data = simplejson.dumps(data), + secret = settings.SHARED_SECRET)) diff --git a/zephyr/tornadoviews.py b/zephyr/tornadoviews.py index 0c65b60909..9d4fbe14d6 100644 --- a/zephyr/tornadoviews.py +++ b/zephyr/tornadoviews.py @@ -3,14 +3,14 @@ from zephyr.models import UserActivity from zephyr.decorator import asynchronous, authenticated_api_view, \ authenticated_json_post_view, internal_notify_view, RespondAsynchronously, \ - has_request_variables, POST, json_to_list, to_non_negative_int + has_request_variables, POST, to_non_negative_int from zephyr.lib.response import json_success -from zephyr.tornado_callbacks import user_receive_message, stream_receive_message, \ - update_pointer, get_user_pointer, fetch_stream_messages, fetch_user_messages, \ +from zephyr.tornado_callbacks import \ + get_user_pointer, fetch_stream_messages, fetch_user_messages, \ add_stream_receive_callback, add_user_receive_callback, \ - add_pointer_update_callback + add_pointer_update_callback, process_notification from zephyr.lib.cache_helpers import cache_get_message @@ -24,27 +24,8 @@ import logging SERVER_GENERATION = int(time.time()) @internal_notify_view -def notify_new_message(request): - recipient_profile_ids = map(int, json_to_list(request.POST['users'])) - message = cache_get_message(int(request.POST['message'])) - - for user_profile_id in recipient_profile_ids: - user_receive_message(user_profile_id, message) - - if 'stream_name' in request.POST: - realm_id = int(request.POST['realm_id']) - stream_name = request.POST['stream_name'] - stream_receive_message(realm_id, stream_name, message) - - return json_success() - -@internal_notify_view -def notify_pointer_update(request): - user_profile_id = int(request.POST['user']) - new_pointer = int(request.POST['new_pointer']) - - update_pointer(user_profile_id, new_pointer) - +def notify(request): + process_notification(simplejson.loads(request.POST['data'])) return json_success() @asynchronous @@ -131,7 +112,7 @@ def return_messages_immediately(user_profile, client_id, last, # Note: We allow any stream name at all here! Validation and # authorization (is the stream "public") are handled by the caller of -# notify_new_message. If a user makes a get_updates request for a +# notify new_message. If a user makes a get_updates request for a # nonexistent or non-public stream, they won't get an error -- they'll # just never receive any messages. @has_request_variables diff --git a/zephyr/views.py b/zephyr/views.py index 8b378a2398..c977915679 100644 --- a/zephyr/views.py +++ b/zephyr/views.py @@ -41,6 +41,8 @@ from zephyr.lib.response import json_success, json_error from zephyr.lib.timestamp import timestamp_to_datetime, datetime_to_timestamp from zephyr.lib.cache import cache_with_key +from zephyr import tornado_callbacks + from confirmation.models import Confirmation import datetime @@ -470,8 +472,8 @@ def update_pointer_backend(request, user_profile, .update(flags=F('flags') | UserMessage.flags.read) if settings.TORNADO_SERVER: - requests.post(settings.TORNADO_SERVER + '/notify_pointer_update', data=dict( - secret = settings.SHARED_SECRET, + tornado_callbacks.send_notification(dict( + type = 'pointer_update', user = user_profile.id, new_pointer = pointer))