mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	This requires doing a puppet apply on our servers to take effect properly. (imported from commit 19dc56f071f07a5d2571eef49dd835121b2e82b6)
		
			
				
	
	
		
			236 lines
		
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			236 lines
		
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from __future__ import absolute_import
 | 
						|
 | 
						|
from django.conf import settings
 | 
						|
from django.utils.timezone import now
 | 
						|
 | 
						|
from zerver.models import Message, UserProfile, \
 | 
						|
    Recipient, get_user_profile_by_id
 | 
						|
 | 
						|
from zerver.decorator import JsonableError
 | 
						|
from zerver.lib.cache import cache_get_many, message_cache_key, \
 | 
						|
    user_profile_by_id_cache_key, cache_save_user_profile
 | 
						|
from zerver.lib.cache_helpers import cache_save_message
 | 
						|
from zerver.lib.queue import queue_json_publish
 | 
						|
from zerver.lib.event_queue import get_client_descriptors_for_user,\
 | 
						|
    get_client_descriptors_for_realm_all_streams
 | 
						|
from zerver.lib.timestamp import timestamp_to_datetime
 | 
						|
 | 
						|
import time
 | 
						|
import logging
 | 
						|
import requests
 | 
						|
import ujson
 | 
						|
import datetime
 | 
						|
 | 
						|
# Send email notifications to idle users
 | 
						|
# after they are idle for 1 hour
 | 
						|
# Idle threshold in hours: receiver_is_idle() treats a user as idle when their
# newest presence timestamp is older than this.
NOTIFY_AFTER_IDLE_HOURS = 1
 | 
						|
 | 
						|
def update_pointer(user_profile_id, new_pointer):
    """Push a 'pointer' event to the user's interested clients.

    Every client descriptor registered for ``user_profile_id`` that accepts
    the 'pointer' event type receives its own shallow copy of the event, whose
    ``pointer`` field is ``new_pointer``.
    """
    pointer_event = {'type': 'pointer', 'pointer': new_pointer}
    for descriptor in get_client_descriptors_for_user(user_profile_id):
        if not descriptor.accepts_event_type(pointer_event['type']):
            continue
        # Copy so that clients never share one mutable event dict.
        descriptor.add_event(pointer_event.copy())
 | 
						|
 | 
						|
 | 
						|
def receives_offline_notifications(user_profile):
    """Tell whether ``user_profile`` should get offline notifications.

    The result is truthy when the user has offline email or offline push
    notifications enabled and is not a bot.
    """
    wants_offline = (user_profile.enable_offline_email_notifications or
                     user_profile.enable_offline_push_notifications)
    # is_bot is only consulted when one of the notification settings is on.
    return wants_offline and not user_profile.is_bot
 | 
						|
 | 
						|
def receives_offline_notifications_by_id(user_profile_id):
    """Same check as receives_offline_notifications(), starting from a user ID.

    The profile is looked up with get_user_profile_by_id() and then passed to
    receives_offline_notifications().
    """
    return receives_offline_notifications(get_user_profile_by_id(user_profile_id))
 | 
						|
 | 
						|
def build_offline_notification_event(user_profile_id, message_id):
    """Build the event dict describing one missed message for one user.

    Returns a dict with the keys ``user_profile_id``, ``message_id`` and
    ``timestamp``; the timestamp is ``time.time()`` taken when this function
    is called.
    """
    event = dict(user_profile_id=user_profile_id, message_id=message_id)
    event["timestamp"] = time.time()
    return event
 | 
						|
 | 
						|
def missedmessage_hook(user_profile_id, queue, last_for_client):
    """Queue missed-message notifications for unread mentions left in ``queue``.

    Does nothing unless ``last_for_client`` is truthy (i.e. the last queue for
    a client has been garbage collected) and the user receives offline
    notifications.  Otherwise, one "missedmessage_emails" event is published
    for every 'message' event in ``queue.event_queue.contents()`` whose flags
    contain 'mentioned' but not 'read'.
    """
    # Only the garbage collection of a client's final queue is of interest.
    if not last_for_client:
        return

    # The user went offline with messages still pending; only bother when
    # they have offline notifications turned on.
    if not receives_offline_notifications_by_id(user_profile_id):
        return

    # Collect every ID first, then publish, so that a malformed event aborts
    # the hook before anything has been queued.
    unread_mention_ids = [
        queued['message']['id']
        for queued in queue.event_queue.contents()
        if queued['type'] == 'message' and queued['flags']
        and 'mentioned' in queued['flags'] and 'read' not in queued['flags']
    ]

    for missed_id in unread_mention_ids:
        notice = build_offline_notification_event(user_profile_id, missed_id)
        queue_json_publish("missedmessage_emails", notice, lambda event: None)
 | 
						|
 | 
						|
def cache_load_message_data(message_id, users):
    """Load a message and its recipients' profiles, preferring memcached.

    ``users`` is an iterable of dicts that each carry an ``'id'`` key.  The
    message and every user profile are requested in a single
    ``cache_get_many`` call; whatever is missing is read from the database
    and written back to the cache.

    Returns ``(message, user_profiles)`` where ``user_profiles`` maps each
    user ID to its UserProfile.
    """
    def unwrap(cached):
        # Cache hits are indexable wrappers with the payload at index 0;
        # a miss is None.
        if cached is None:
            return None
        return cached[0]

    # One round trip to memcached for the message plus all recipient profiles.
    profile_keys = [user_profile_by_id_cache_key(entry['id']) for entry in users]
    all_keys = [message_cache_key(message_id)]
    all_keys.extend(profile_keys)
    fetched = cache_get_many(all_keys)

    message = unwrap(fetched.get(all_keys[0], None))

    user_profiles = {}
    for entry in users:
        cached_profile = fetched.get(user_profile_by_id_cache_key(entry['id']), None)
        user_profiles[entry['id']] = unwrap(cached_profile)

    # Cache misses are not expected here (recent messages and all user
    # profiles are meant to stay in memcached), so they are logged before
    # falling back to the database and repopulating the cache.
    if message is None:
        if not settings.TEST_SUITE:
            logging.warning("Tornado failed to load message from memcached when delivering!")

        message = Message.objects.select_related().get(id=message_id)
        cache_save_message(message)

    for profile_id, cached_profile in user_profiles.iteritems():
        if not cached_profile:
            if not settings.TEST_SUITE:
                logging.warning("Tornado failed to load user profile from memcached when delivering message!")

            loaded_profile = UserProfile.objects.select_related().get(id=profile_id)
            # Re-assigning an existing key is safe while iterating the dict.
            user_profiles[profile_id] = loaded_profile
            cache_save_user_profile(loaded_profile)

    return message, user_profiles
 | 
						|
 | 
						|
def receiver_is_idle(user_profile, realm_presences):
    """Decide whether ``user_profile`` should be treated as idle.

    A user is idle when they have no client descriptor accepting 'message'
    events, or when their most recent presence update is more than
    NOTIFY_AFTER_IDLE_HOURS hours old.

    ``realm_presences`` maps user email to a mapping of per-client presence
    statuses, each carrying a ``'timestamp'``; it may be None.  When no
    presence data is available for the user (None, email missing, or an empty
    per-client mapping), only the open-queue check is used.
    """
    # No message-receiving event queue means no open Zulip session, so the
    # user should be notified.
    off_zulip = not any(client.accepts_event_type('message')
                        for client in get_client_descriptors_for_user(user_profile.id))

    # A recipient may be outside the sender's realm, in which case there is no
    # presence information for them (and fetching it would need another db
    # query), so no attempt is made to guess how long they have been idle.
    if realm_presences is None or user_profile.email not in realm_presences:
        return off_zulip

    user_presence = realm_presences[user_profile.email]
    if not user_presence:
        # An empty presence mapping gives no timestamp to compare against;
        # previously this case raised TypeError on ``None['timestamp']``.
        return off_zulip

    # If the newest status from any client is older than the threshold, notify
    # the user regardless of whether they still have a window open.
    newest = max(user_presence.values(), key=lambda status: status['timestamp'])
    last_update = timestamp_to_datetime(newest['timestamp'])
    idle_too_long = (now() - last_update >
                     datetime.timedelta(hours=NOTIFY_AFTER_IDLE_HOURS))

    return off_zulip or idle_too_long
 | 
						|
 | 
						|
def process_new_message(data):
    """Deliver a new message to client event queues and queue offline notices.

    ``data`` is read for the keys ``'message'`` (message ID), ``'users'``
    (dicts with ``'id'`` and optional ``'flags'``), ``'presences'`` and,
    when present, ``'stream_name'``, ``'invite_only'`` and ``'realm_id'``.

    Each recipient who got a private message or a mention, is idle, and
    receives offline notifications gets a "missedmessage_emails" event
    published.  Every distinct event queue that accepts 'message' events then
    receives one message event.
    """
    message, user_profiles = cache_load_message_data(data['message'],
                                                     data['users'])

    realm_presences = data['presences']

    rendered_dict = message.to_dict(True)
    unrendered_dict = message.to_dict(False)

    # Keyed by event queue ID so that a queue is never sent the message twice;
    # each value is (client descriptor, flags).
    deliveries = {}

    if 'stream_name' in data and not data.get("invite_only"):
        for descriptor in get_client_descriptors_for_realm_all_streams(data['realm_id']):
            deliveries[descriptor.event_queue.id] = (descriptor, None)

    for recipient in data['users']:
        recipient_id = recipient['id']
        profile = user_profiles[recipient['id']]
        flags = recipient.get('flags', [])

        # Per-user entries overwrite any all-streams entry for the same queue,
        # so the user's own flags win.
        for descriptor in get_client_descriptors_for_user(recipient_id):
            deliveries[descriptor.event_queue.id] = (descriptor, flags)

        # A recipient who got a one-to-one or group PM from someone else, or
        # who was @-mentioned, may be notified more immediately if offline.
        is_private = message.recipient.type in (Recipient.PERSONAL, Recipient.HUDDLE)
        received_pm = is_private and recipient_id != message.sender.id
        mentioned = 'mentioned' in flags

        if not (received_pm or mentioned):
            continue
        if not receiver_is_idle(profile, realm_presences):
            continue
        if not receives_offline_notifications(profile):
            continue

        notice = build_offline_notification_event(recipient_id, message.id)
        # The email handler cannot be called from the Tornado process, so this
        # needs RabbitMQ; without it the no-op fallback does nothing.
        queue_json_publish("missedmessage_emails", notice, lambda event: None)

    for descriptor, flags in deliveries.itervalues():
        if not descriptor.accepts_event_type('message'):
            continue

        # Never echo a mirrored message back to the mirroring client type
        # that sent it; this prevents (Zephyr) mirroring loops.
        sending_client = message.sending_client
        if 'mirror' in sending_client.name and sending_client == descriptor.client_type:
            continue

        message_dict = rendered_dict if descriptor.apply_markdown else unrendered_dict

        # Zephyr mirroring bots need to know when the stream is invite-only.
        # Work on a copy so the shared dict is left untouched.
        if "mirror" in descriptor.client_type.name and data.get("invite_only"):
            message_dict = message_dict.copy()
            message_dict["invite_only_stream"] = True

        descriptor.add_event({'type': 'message',
                              'message': message_dict,
                              'flags': flags})
 | 
						|
 | 
						|
def process_event(data):
    """Fan a generic event out to the clients of the listed users.

    ``data['event']`` is the event dict and ``data['users']`` an iterable of
    user profile IDs.  Each client descriptor of each user that accepts the
    event's type receives its own shallow copy of the event.
    """
    payload = data['event']
    for recipient_id in data['users']:
        interested = (descriptor
                      for descriptor in get_client_descriptors_for_user(recipient_id)
                      if descriptor.accepts_event_type(payload['type']))
        for descriptor in interested:
            descriptor.add_event(payload.copy())
 | 
						|
 | 
						|
def process_notification(data):
    """Dispatch a notification dict to the handler for its ``'type'``.

    * no ``'type'`` key: generic event, handled by process_event()
    * ``'new_message'``: process_new_message()
    * ``'pointer_update'``: update_pointer() with ``data['user']`` and
      ``data['new_pointer']``

    Raises JsonableError for any other type.
    """
    if 'type' not in data:
        # Generic event that needs no special handling.
        process_event(data)
        return

    kind = data['type']
    if kind == 'new_message':
        process_new_message(data)
        return
    if kind == 'pointer_update':
        update_pointer(data['user'], data['new_pointer'])
        return

    raise JsonableError('bad notification type ' + kind)
 | 
						|
 | 
						|
# Runs in the Django process to send a notification to Tornado.
 | 
						|
#
 | 
						|
# We use JSON rather than bare form parameters, so that we can represent
 | 
						|
# different types and for compatibility with non-HTTP transports.
 | 
						|
 | 
						|
def send_notification_http(data):
    """Hand a notification to Tornado, over HTTP unless it can run in-process.

    When ``settings.TORNADO_SERVER`` is unset, or this code is already running
    inside Tornado, the notification is processed directly with
    process_notification().  Otherwise it is POSTed to the server's
    ``/notify_tornado`` endpoint as JSON together with the shared secret.
    """
    if not settings.TORNADO_SERVER or settings.RUNNING_INSIDE_TORNADO:
        process_notification(data)
        return

    notify_url = settings.TORNADO_SERVER + '/notify_tornado'
    form_fields = {
        'data': ujson.dumps(data),
        'secret': settings.SHARED_SECRET,
    }
    requests.post(notify_url, data=form_fields)
 | 
						|
 | 
						|
def send_notification(data):
    """Publish ``data`` on the "notify_tornado" queue.

    send_notification_http is passed to queue_json_publish as its third
    argument; whatever queue_json_publish returns is returned unchanged.
    """
    publish_result = queue_json_publish("notify_tornado", data, send_notification_http)
    return publish_result
 |