Handle Tornado callback notify in a more generic, less HTTP-centric way

(imported from commit 85a74d1b40461236c4c95ad688e9796ab50f0bbf)
This commit is contained in:
Keegan McAllister
2013-01-23 17:24:44 -05:00
committed by Leo Franchi
parent 8db92fd995
commit 23b8833ec5
6 changed files with 49 additions and 37 deletions

View File

@@ -102,8 +102,7 @@ urlpatterns = patterns('',
url(r'^robots\.txt$', RedirectView.as_view(url='/static/robots.txt')), url(r'^robots\.txt$', RedirectView.as_view(url='/static/robots.txt')),
# Used internally for communication between Django and Tornado processes # Used internally for communication between Django and Tornado processes
url(r'^notify_new_message$', 'zephyr.tornadoviews.notify_new_message'), url(r'^notify_tornado$', 'zephyr.tornadoviews.notify'),
url(r'^notify_pointer_update$', 'zephyr.tornadoviews.notify_pointer_update'),
) )
if not settings.DEPLOYED: if not settings.DEPLOYED:

View File

@@ -21,12 +21,13 @@ from zephyr.lib.cache import cache_with_key, user_profile_by_id_cache_key, \
user_profile_by_email_cache_key user_profile_by_email_cache_key
from zephyr.decorator import get_user_profile_by_email, json_to_list from zephyr.decorator import get_user_profile_by_email, json_to_list
from zephyr import tornado_callbacks
import subprocess import subprocess
import simplejson import simplejson
import time import time
import traceback import traceback
import re import re
import requests
import datetime import datetime
import os import os
import platform import platform
@@ -176,13 +177,13 @@ def do_send_message(message, rendered_content=None, no_log=False,
message.to_dict(apply_markdown=True, rendered_content=rendered_content) message.to_dict(apply_markdown=True, rendered_content=rendered_content)
message.to_dict(apply_markdown=False) message.to_dict(apply_markdown=False)
data = dict( data = dict(
secret = settings.SHARED_SECRET, type = 'new_message',
message = message.id, message = message.id,
users = simplejson.dumps([str(user.id) for user in recipients])) users = [user.id for user in recipients])
if message.recipient.type == Recipient.STREAM: if message.recipient.type == Recipient.STREAM:
# Note: This is where authorization for single-stream # Note: This is where authorization for single-stream
# get_updates happens! We only attach stream data to the # get_updates happens! We only attach stream data to the
# notify_new_message request if it's a public stream, # notify new_message request if it's a public stream,
# ensuring that in the tornado server, non-public stream # ensuring that in the tornado server, non-public stream
# messages are only associated to their subscribed users. # messages are only associated to their subscribed users.
if stream is None: if stream is None:
@@ -190,7 +191,7 @@ def do_send_message(message, rendered_content=None, no_log=False,
if stream.is_public(): if stream.is_public():
data['realm_id'] = stream.realm.id data['realm_id'] = stream.realm.id
data['stream_name'] = stream.name data['stream_name'] = stream.name
requests.post(settings.TORNADO_SERVER + '/notify_new_message', data=data) tornado_callbacks.send_notification(data)
def create_stream_if_needed(realm, stream_name, invite_only=False): def create_stream_if_needed(realm, stream_name, invite_only=False):
(stream, created) = Stream.objects.get_or_create( (stream, created) = Stream.objects.get_or_create(

View File

@@ -134,8 +134,7 @@ class Command(BaseCommand):
# Application is an instance of Django's standard wsgi handler. # Application is an instance of Django's standard wsgi handler.
application = web.Application([(r"/json/get_updates", AsyncDjangoHandler), application = web.Application([(r"/json/get_updates", AsyncDjangoHandler),
(r"/api/v1/get_messages", AsyncDjangoHandler), (r"/api/v1/get_messages", AsyncDjangoHandler),
(r"/notify_new_message", AsyncDjangoHandler), (r"/notify_tornado", AsyncDjangoHandler),
(r"/notify_pointer_update", AsyncDjangoHandler),
], debug=django.conf.settings.DEBUG, ], debug=django.conf.settings.DEBUG,
# Disable Tornado's own request logging, since we have our own # Disable Tornado's own request logging, since we have our own

View File

@@ -1,11 +1,15 @@
from django.conf import settings
from zephyr.models import Message, UserProfile, UserMessage, \ from zephyr.models import Message, UserProfile, UserMessage, \
Recipient, Stream, get_stream Recipient, Stream, get_stream
from zephyr.decorator import JsonableError from zephyr.decorator import JsonableError
from zephyr.lib.cache_helpers import cache_get_message
import os import os
import sys import sys
import logging import logging
import requests
import simplejson
import subprocess import subprocess
import collections import collections
@@ -221,3 +225,29 @@ def update_pointer(user_profile_id, new_pointer):
callbacks_table.call(user_profile_id, Callbacks.TYPE_POINTER_UPDATE, callbacks_table.call(user_profile_id, Callbacks.TYPE_POINTER_UPDATE,
new_pointer=new_pointer, new_pointer=new_pointer,
update_types=["pointer_update"]) update_types=["pointer_update"])
def process_new_message(data):
message = cache_get_message(data['message'])
for user_profile_id in data['users']:
user_receive_message(user_profile_id, message)
if 'stream_name' in data:
stream_receive_message(data['realm_id'], data['stream_name'], message)
def process_notification(data):
if data['type'] == 'new_message':
process_new_message(data)
elif data['type'] == 'pointer_update':
update_pointer(data['user'], data['new_pointer'])
else:
raise JsonableError('bad notification type ' + data['type'])
# Runs in the Django process to send a notification to Tornado.
#
# We use JSON rather than bare form parameters, so that we can represent
# different types and for compatibility with non-HTTP transports.
def send_notification(data):
requests.post(settings.TORNADO_SERVER + '/notify_tornado', data=dict(
data = simplejson.dumps(data),
secret = settings.SHARED_SECRET))

View File

@@ -3,14 +3,14 @@ from zephyr.models import UserActivity
from zephyr.decorator import asynchronous, authenticated_api_view, \ from zephyr.decorator import asynchronous, authenticated_api_view, \
authenticated_json_post_view, internal_notify_view, RespondAsynchronously, \ authenticated_json_post_view, internal_notify_view, RespondAsynchronously, \
has_request_variables, POST, json_to_list, to_non_negative_int has_request_variables, POST, to_non_negative_int
from zephyr.lib.response import json_success from zephyr.lib.response import json_success
from zephyr.tornado_callbacks import user_receive_message, stream_receive_message, \ from zephyr.tornado_callbacks import \
update_pointer, get_user_pointer, fetch_stream_messages, fetch_user_messages, \ get_user_pointer, fetch_stream_messages, fetch_user_messages, \
add_stream_receive_callback, add_user_receive_callback, \ add_stream_receive_callback, add_user_receive_callback, \
add_pointer_update_callback add_pointer_update_callback, process_notification
from zephyr.lib.cache_helpers import cache_get_message from zephyr.lib.cache_helpers import cache_get_message
@@ -24,27 +24,8 @@ import logging
SERVER_GENERATION = int(time.time()) SERVER_GENERATION = int(time.time())
@internal_notify_view @internal_notify_view
def notify_new_message(request): def notify(request):
recipient_profile_ids = map(int, json_to_list(request.POST['users'])) process_notification(simplejson.loads(request.POST['data']))
message = cache_get_message(int(request.POST['message']))
for user_profile_id in recipient_profile_ids:
user_receive_message(user_profile_id, message)
if 'stream_name' in request.POST:
realm_id = int(request.POST['realm_id'])
stream_name = request.POST['stream_name']
stream_receive_message(realm_id, stream_name, message)
return json_success()
@internal_notify_view
def notify_pointer_update(request):
user_profile_id = int(request.POST['user'])
new_pointer = int(request.POST['new_pointer'])
update_pointer(user_profile_id, new_pointer)
return json_success() return json_success()
@asynchronous @asynchronous
@@ -131,7 +112,7 @@ def return_messages_immediately(user_profile, client_id, last,
# Note: We allow any stream name at all here! Validation and # Note: We allow any stream name at all here! Validation and
# authorization (is the stream "public") are handled by the caller of # authorization (is the stream "public") are handled by the caller of
# notify_new_message. If a user makes a get_updates request for a # notify new_message. If a user makes a get_updates request for a
# nonexistent or non-public stream, they won't get an error -- they'll # nonexistent or non-public stream, they won't get an error -- they'll
# just never receive any messages. # just never receive any messages.
@has_request_variables @has_request_variables

View File

@@ -41,6 +41,8 @@ from zephyr.lib.response import json_success, json_error
from zephyr.lib.timestamp import timestamp_to_datetime, datetime_to_timestamp from zephyr.lib.timestamp import timestamp_to_datetime, datetime_to_timestamp
from zephyr.lib.cache import cache_with_key from zephyr.lib.cache import cache_with_key
from zephyr import tornado_callbacks
from confirmation.models import Confirmation from confirmation.models import Confirmation
import datetime import datetime
@@ -470,8 +472,8 @@ def update_pointer_backend(request, user_profile,
.update(flags=F('flags') | UserMessage.flags.read) .update(flags=F('flags') | UserMessage.flags.read)
if settings.TORNADO_SERVER: if settings.TORNADO_SERVER:
requests.post(settings.TORNADO_SERVER + '/notify_pointer_update', data=dict( tornado_callbacks.send_notification(dict(
secret = settings.SHARED_SECRET, type = 'pointer_update',
user = user_profile.id, user = user_profile.id,
new_pointer = pointer)) new_pointer = pointer))