The tests in GetUpdatesTest had some callback logic that has been dead code
for at least three months. We now fully exercise the callback codepath and
make sure that the callbacks do happen.

(imported from commit f5d8fbab28ecc34dc81d3d0c29058b66c10f378f)

from __future__ import absolute_import

from django.conf import settings
from zephyr.models import Message, UserProfile, UserMessage, \
    Recipient, Stream, get_stream, get_user_profile_by_id

from zephyr.decorator import JsonableError
from zephyr.lib.cache_helpers import cache_get_message
from zephyr.lib.queue import queue_json_publish
from zephyr.lib.event_queue import get_client_descriptors_for_user

import os
import sys
import time
import logging
import requests
import ujson
import subprocess
import collections
from django.db import connection

class Callbacks(object):
    # A user received a message. The key is user_profile.id.
    TYPE_USER_RECEIVE = 0

    # A stream received a message. The key is a tuple
    #   (realm_id, lowercased stream name).
    # See the comment attached to the global mtables below for why.
    # Callers of this callback need to be careful to provide
    # a lowercased stream name.
    TYPE_STREAM_RECEIVE = 1

    # A user's pointer was updated. The key is user_profile.id.
    TYPE_POINTER_UPDATE = 2

    TYPE_MAX = 3

    def __init__(self):
        self.table = {}

    def add(self, key, cb_type, callback):
        if key not in self.table:
            self.create_key(key)
        self.table[key][cb_type].append(callback)

    def call(self, key, cb_type, **kwargs):
        if key not in self.table:
            self.create_key(key)

        for cb in self.table[key][cb_type]:
            cb(**kwargs)

        # Callbacks are one-shot: clear the registrations after firing.
        self.table[key][cb_type] = []

    def create_key(self, key):
        self.table[key] = [[] for i in range(Callbacks.TYPE_MAX)]

callbacks_table = Callbacks()

def add_user_receive_callback(user_profile, cb):
    callbacks_table.add(user_profile.id, Callbacks.TYPE_USER_RECEIVE, cb)

def add_stream_receive_callback(realm_id, stream_name, cb):
    callbacks_table.add((realm_id, stream_name.lower()), Callbacks.TYPE_STREAM_RECEIVE, cb)

def add_pointer_update_callback(user_profile, cb):
    callbacks_table.add(user_profile.id, Callbacks.TYPE_POINTER_UPDATE, cb)
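
# Illustrative sketch (not part of the original module) of the one-shot
# callback semantics above.  `_demo_callbacks` is a hypothetical helper
# that nothing in this module calls; `user_profile` is any UserProfile.
def _demo_callbacks(user_profile):
    fired = []
    def on_new_messages(messages, update_types):
        fired.append((messages, update_types))

    add_user_receive_callback(user_profile, on_new_messages)
    # Firing the USER_RECEIVE callbacks for this user invokes
    # on_new_messages once, then clears the registration:
    callbacks_table.call(user_profile.id, Callbacks.TYPE_USER_RECEIVE,
                         messages=[], update_types=["new_messages"])
    assert fired == [([], ["new_messages"])]
    # A second call finds no registered callbacks, so nothing fires
    # unless the client re-registers.
    callbacks_table.call(user_profile.id, Callbacks.TYPE_USER_RECEIVE,
                         messages=[], update_types=["new_messages"])
    assert len(fired) == 1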

# in-process caching mechanism for tracking usermessages
#
# user table:   Map user_profile_id => [deque of message ids they received]
#
# We don't use all the features of a deque -- the important ones are:
# * O(1) insert of new highest message id
# * O(k) read of highest k message ids
# * Automatic maximum size support.
#
# stream table: Map (realm_id, lowercased stream name) => [deque of message ids it received]
#
# Why don't we index by the stream_id? Because the client will make a
# request that specifies a particular realm and stream name, and since
# we're running within Tornado, we don't want to have to do a database
# lookup to find the matching entry in this table.

mtables = {
    'user': {},
    'stream': {},
}
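
# Illustrative sketch (not part of the original module) of the deque
# properties the comment above relies on: with maxlen set, appendleft()
# is O(1), the newest ids sit at the front, and the oldest id falls off
# the right end automatically once the deque is full.
def _demo_deque():
    d = collections.deque(maxlen=3)
    for message_id in (101, 102, 103, 104):
        d.appendleft(message_id)
    # Newest first; 101 was evicted when 104 arrived.
    assert list(d) == [104, 103, 102]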

USERMESSAGE_CACHE_COUNT = 25000
STREAMMESSAGE_CACHE_COUNT = 5000
cache_minimum_id = sys.maxint

def initialize_user_messages():
    global cache_minimum_id
    try:
        cache_minimum_id = Message.objects.all().order_by("-id")[0].id - USERMESSAGE_CACHE_COUNT
    except (Message.DoesNotExist, IndexError):
        # Indexing an empty queryset raises IndexError, not DoesNotExist.
        cache_minimum_id = 1

    # These next few lines implement the following Django ORM
    # algorithm using raw SQL:
    ## for um in UserMessage.objects.filter(message_id__gte=cache_minimum_id).order_by("message"):
    ##     add_user_message(um.user_profile_id, um.message_id)
    # We do this because marshalling the Django objects is very
    # inefficient; total time consumed with the raw SQL is about
    # 600ms, vs. 3000ms-5000ms if we go through the ORM.
    cursor = connection.cursor()
    cursor.execute("SELECT user_profile_id, message_id FROM zephyr_usermessage " +
                   "WHERE message_id >= %s ORDER BY message_id", [cache_minimum_id])
    for row in cursor.fetchall():
        (user_profile_id, message_id) = row
        add_user_message(user_profile_id, message_id)

    streams = {}
    for stream in Stream.objects.select_related().all():
        streams[stream.id] = stream
    for m in (Message.objects.only("id", "recipient").select_related("recipient")
              .filter(id__gte=cache_minimum_id + (USERMESSAGE_CACHE_COUNT - STREAMMESSAGE_CACHE_COUNT),
                      recipient__type=Recipient.STREAM).order_by("id")):
        stream = streams[m.recipient.type_id]
        add_stream_message(stream.realm.id, stream.name, m.id)

    if not settings.DEPLOYED and not settings.TEST_SUITE:
        # Filling the memcached cache is a little slow, so do it in a child process.
        # For DEPLOYED cases, we run this from restart_server.
        subprocess.Popen(["python", os.path.join(os.path.dirname(__file__), "..", "manage.py"),
                          "fill_memcached_caches"])

def add_user_message(user_profile_id, message_id):
    add_table_message("user", user_profile_id, message_id)

def add_stream_message(realm_id, stream_name, message_id):
    add_table_message("stream", (realm_id, stream_name.lower()), message_id)

def add_table_message(table, key, message_id):
    if cache_minimum_id == sys.maxint:
        initialize_user_messages()
    mtables[table].setdefault(key, collections.deque(maxlen=400))
    mtables[table][key].appendleft(message_id)

def fetch_user_messages(user_profile_id, last):
    return fetch_table_messages("user", user_profile_id, last)

def fetch_stream_messages(realm_id, stream_name, last):
    return fetch_table_messages("stream", (realm_id, stream_name.lower()), last)

def fetch_table_messages(table, key, last):
    if cache_minimum_id == sys.maxint:
        initialize_user_messages()

    # We need to initialize the deque here for any new users or
    # streams that were created since Tornado was started.
    mtables[table].setdefault(key, collections.deque(maxlen=400))

    # We need to do this check after initialize_user_messages has been called.
    if len(mtables[table][key]) == 0:
        # Since the request contains a value of "last", we can assume
        # that the relevant user or stream has actually received a
        # message, which means that mtables[table][key] will not remain
        # empty after the below completes.
        #
        # Thus, we will run this code at most once per key (user or
        # stream that is being lurked on).  Further, we only do this
        # query for those keys that have not received a message since
        # cache_minimum_id.  So we can afford to do a database query
        # from Tornado in this case.
        if table == "user":
            logging.info("tornado: Doing database query for user %d" % (key,))
            for um in reversed(UserMessage.objects.filter(user_profile_id=key).order_by('-message')[:400]):
                add_user_message(um.user_profile_id, um.message_id)
        elif table == "stream":
            logging.info("tornado: Doing database query for stream %s" % (key,))
            (realm_id, stream_name) = key
            stream = get_stream(stream_name, realm_id)
            # If a buggy client submits a "last" value with a nonexistent stream,
            # do nothing (and proceed to longpoll) rather than crashing.
            if stream is not None:
                recipient = Recipient.objects.get(type=Recipient.STREAM, type_id=stream.id)
                for m in Message.objects.only("id", "recipient").filter(recipient=recipient).order_by("id")[:400]:
                    add_stream_message(realm_id, stream_name, m.id)

    if len(mtables[table][key]) == 0:
        # Check our assumption above that there are messages here.  If
        # it is false, this may just mean a misbehaving client submitted
        # "last" even though it has no messages (in which case we should
        # proceed with longpolling by falling through).  But it could
        # also be a server bug, so we log a warning.
        logging.warning("Unexpected empty message queue for key %s!" % (key,))
    elif last < mtables[table][key][-1]:
        # The user's client has a way-too-old value for 'last'
        # (presumably at least 400 messages old), so we should return an
        # error.

        # The error handler for get_updates in zephyr.js parses this
        # message. If you change this message, you must update that
        # error handler.
        raise JsonableError("last value of %d too old!  Minimum valid is %d!" %
                            (last, mtables[table][key][-1]))

    message_list = []
    for message_id in mtables[table][key]:
        if message_id <= last:
            return reversed(message_list)
        message_list.append(message_id)
    return []
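
# Illustrative sketch (not part of the original module) of the return
# value contract.  The deque stores ids newest-first, so the loop above
# collects ids strictly greater than 'last' and hands them back
# oldest-first via reversed().  `_demo_fetch` is hypothetical and
# assumes the cache has already been initialized.
def _demo_fetch(user_profile_id):
    mtables["user"][user_profile_id] = collections.deque(
        [9, 8, 7], maxlen=400)  # newest first
    # With last=7, the client is missing messages 8 and 9, in order:
    assert list(fetch_table_messages("user", user_profile_id, 7)) == [8, 9]
    # With last=9 (fully caught up), nothing is returned:
    assert list(fetch_table_messages("user", user_profile_id, 9)) == []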

# The user receives this message
def user_receive_message(user_profile_id, message):
    add_user_message(user_profile_id, message.id)
    callbacks_table.call(user_profile_id, Callbacks.TYPE_USER_RECEIVE,
                         messages=[message], update_types=["new_messages"])

# The stream receives this message
def stream_receive_message(realm_id, stream_name, message):
    add_stream_message(realm_id, stream_name, message.id)
    callbacks_table.call((realm_id, stream_name.lower()),
                         Callbacks.TYPE_STREAM_RECEIVE,
                         messages=[message], update_types=["new_messages"])

# Simple caching implementation module for user pointers
#
# TODO: Write something generic in cache.py to support this
# functionality?  The current primitives there don't support storing
# to the cache.
user_pointers = {}

def get_user_pointer(user_profile_id):
    if not user_pointers:
        # Once, on startup, fill in the user_pointers table with
        # everyone's current pointers.
        for u in UserProfile.objects.all():
            user_pointers[u.id] = u.pointer
    if user_profile_id not in user_pointers:
        # This is a new user created since Tornado was started, so
        # they will have an initial pointer of -1.
        return -1
    return user_pointers[user_profile_id]

def set_user_pointer(user_profile_id, pointer):
    user_pointers[user_profile_id] = pointer

def update_pointer(user_profile_id, new_pointer):
    set_user_pointer(user_profile_id, new_pointer)
    callbacks_table.call(user_profile_id, Callbacks.TYPE_POINTER_UPDATE,
                         new_pointer=new_pointer,
                         update_types=["pointer_update"])

    event = dict(type='pointer', pointer=new_pointer)
    for client in get_client_descriptors_for_user(user_profile_id):
        if client.accepts_event_type(event['type']):
            client.add_event(event.copy())
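
# Illustrative sketch (not part of the original module) of the pointer
# cache contract: a user with no cached entry reports the initial
# pointer of -1 until a value is stored.  Assumes the cache has already
# been warmed; `new_user_id` is a hypothetical id absent from it.
def _demo_pointer(new_user_id):
    assert get_user_pointer(new_user_id) == -1
    set_user_pointer(new_user_id, 1234)
    assert get_user_pointer(new_user_id) == 1234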

def receives_offline_notifications(user_profile_id):
    user_profile = get_user_profile_by_id(user_profile_id)
    return (user_profile.enable_offline_email_notifications and
            not user_profile.is_bot)

def build_offline_notification_event(user_profile_id, message_id):
    return {"user_profile_id": user_profile_id,
            "message_id": message_id,
            "timestamp": time.time()}

def missedmessage_hook(user_profile_id, queue, last_for_client):
    # Only process the missedmessage hook when the last queue for a
    # client has been garbage collected.
    if not last_for_client:
        return

    # If a user has gone offline but has unread messages received in
    # the idle time, send them a missed-message email.
    if not receives_offline_notifications(user_profile_id):
        return

    message_ids = []
    for event in queue.event_queue.contents():
        if event['type'] != 'message' or not event['flags']:
            continue

        if 'mentioned' in event['flags'] and 'read' not in event['flags']:
            message_ids.append(event['message']['id'])

    for msg_id in message_ids:
        event = build_offline_notification_event(user_profile_id, msg_id)
        queue_json_publish("missedmessage_emails", event, lambda event: None)

def process_new_message(data):
    message = cache_get_message(data['message'])

    message_dict_markdown = message.to_dict(True)
    message_dict_no_markdown = message.to_dict(False)

    for user_data in data['users']:
        user_profile_id = user_data['id']
        flags = user_data.get('flags', [])

        user_receive_message(user_profile_id, message)

        for client in get_client_descriptors_for_user(user_profile_id):
            # The below prevents (Zephyr) mirroring loops.
            if client.accepts_event_type('message') and not \
                    ('mirror' in message.sending_client.name and
                     message.sending_client == client.client_type):
                if client.apply_markdown:
                    message_dict = message_dict_markdown
                else:
                    message_dict = message_dict_no_markdown
                event = dict(type='message', message=message_dict, flags=flags)
                client.add_event(event)

        # If the recipient is offline and the message was a personal or
        # huddle message to them, or they were @-mentioned, potentially
        # notify them more immediately.
        received_pm = message.recipient.type in (Recipient.PERSONAL, Recipient.HUDDLE) and \
                        user_profile_id != message.sender.id
        mentioned = 'mentioned' in flags
        idle = len(get_client_descriptors_for_user(user_profile_id)) == 0
        if (received_pm or mentioned) and idle:
            if receives_offline_notifications(user_profile_id):
                event = build_offline_notification_event(user_profile_id, message.id)

                # We require RabbitMQ to do this, as we can't call the
                # email handler from the Tornado process.  So if there's
                # no RabbitMQ support, do nothing.
                queue_json_publish("missedmessage_emails", event, lambda event: None)

    if 'stream_name' in data:
        stream_receive_message(data['realm_id'], data['stream_name'], message)
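
# Illustrative sketch (not part of the original module): the shape of
# the 'data' payload process_new_message() consumes, with hypothetical
# ids.  'realm_id' and 'stream_name' are present only for stream
# messages; 'flags' defaults to [] per user.
_example_new_message_data = {
    'message': 1000,   # message id, fetched via cache_get_message
    'users': [{'id': 7, 'flags': ['mentioned']},
              {'id': 8}],
    'realm_id': 1,
    'stream_name': 'social',
}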

def process_event(data):
    event = data['event']
    for user_profile_id in data['users']:
        for client in get_client_descriptors_for_user(user_profile_id):
            if client.accepts_event_type(event['type']):
                client.add_event(event.copy())

def process_notification(data):
    if 'type' not in data:
        # Generic event that doesn't need special handling.
        process_event(data)
    elif data['type'] == 'new_message':
        process_new_message(data)
    elif data['type'] == 'pointer_update':
        update_pointer(data['user'], data['new_pointer'])
    else:
        raise JsonableError('bad notification type ' + data['type'])
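
# Illustrative sketch (not part of the original module) of the dispatch
# contract above, with hypothetical payloads.  An envelope without a
# 'type' key is treated as a generic event for process_event().
def _demo_dispatch():
    # Routed to update_pointer(7, 1234):
    process_notification({'type': 'pointer_update', 'user': 7,
                          'new_pointer': 1234})
    # No 'type' key: routed to process_event() as-is:
    process_notification({'event': {'type': 'restart'}, 'users': [7]})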

# Runs in the Django process to send a notification to Tornado.
#
# We use JSON rather than bare form parameters, so that we can represent
# different types and for compatibility with non-HTTP transports.

def send_notification_http(data):
    if settings.TORNADO_SERVER:
        requests.post(settings.TORNADO_SERVER + '/notify_tornado', data=dict(
                data   = ujson.dumps(data),
                secret = settings.SHARED_SECRET))
    else:
        process_notification(data)

def send_notification(data):
    return queue_json_publish("notify_tornado", data, send_notification_http)
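
# Illustrative sketch (not part of the original module) of how the
# Django side hands a notification to Tornado: queue_json_publish()
# delivers via the queue when queueing is available, and otherwise
# falls back to send_notification_http (which itself falls back to
# in-process delivery when settings.TORNADO_SERVER is unset).  Ids
# here are hypothetical.
def _demo_send_pointer_update(user_profile_id, new_pointer):
    send_notification({'type': 'pointer_update',
                       'user': user_profile_id,
                       'new_pointer': new_pointer})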
 |