Send message data to Tornado via rabbitmq instead of memcached.

This is a lot simpler and eliminates a possible failure mode in the
data transfer path.

(imported from commit 19308d2715bbd12dc9385234f1d9156f91bdfae0)
This commit is contained in:
Tim Abbott
2014-01-23 16:26:41 -05:00
parent fd3fc095b5
commit 3bcba79e77
2 changed files with 12 additions and 19 deletions

View File

@@ -358,8 +358,6 @@ def do_send_messages(messages):
# Render Markdown etc. here and store (automatically) in # Render Markdown etc. here and store (automatically) in
# memcached, so that the single-threaded Tornado server # memcached, so that the single-threaded Tornado server
# doesn't have to. # doesn't have to.
message['message'].to_dict(apply_markdown=True)
message['message'].to_dict(apply_markdown=False)
user_flags = user_message_flags.get(message['message'].id, {}) user_flags = user_message_flags.get(message['message'].id, {})
sender = message['message'].sender sender = message['message'].sender
user_presences = get_status_dict(sender) user_presences = get_status_dict(sender)
@@ -371,6 +369,8 @@ def do_send_messages(messages):
data = dict( data = dict(
type = 'new_message', type = 'new_message',
message = message['message'].id, message = message['message'].id,
message_dict_markdown = message['message'].to_dict(apply_markdown=True),
message_dict_no_markdown = message['message'].to_dict(apply_markdown=False),
presences = presences, presences = presences,
users = [{'id': user.id, users = [{'id': user.id,
'flags': user_flags.get(user.id, []), 'flags': user_flags.get(user.id, []),

View File

@@ -64,12 +64,6 @@ def missedmessage_hook(user_profile_id, queue, last_for_client):
if notify_info.get('send_email', False): if notify_info.get('send_email', False):
queue_json_publish("missedmessage_}emails", event, lambda event: None) queue_json_publish("missedmessage_}emails", event, lambda event: None)
@cache_with_key(message_cache_key, timeout=3600*24)
def get_message_by_id_dbwarn(message_id):
if not settings.TEST_SUITE:
logging.warning("Tornado failed to load message from memcached when delivering!")
return Message.objects.select_related().get(id=message_id)
def receiver_is_idle(user_profile_id, realm_presences): def receiver_is_idle(user_profile_id, realm_presences):
# If a user has no message-receiving event queues, they've got no open zulip # If a user has no message-receiving event queues, they've got no open zulip
# session so we notify them # session so we notify them
@@ -100,13 +94,14 @@ def receiver_is_idle(user_profile_id, realm_presences):
return off_zulip or idle_too_long return off_zulip or idle_too_long
def process_new_message(data): def process_new_message(data):
message = get_message_by_id_dbwarn(data['message'])
realm_presences = data['presences'] realm_presences = data['presences']
sender_queue_id = data.get('sender_queue_id', None) sender_queue_id = data.get('sender_queue_id', None)
message_dict_markdown = data['message_dict_markdown']
message_dict_markdown = message.to_dict(True) message_dict_no_markdown = data['message_dict_no_markdown']
message_dict_no_markdown = message.to_dict(False) sender_id = message_dict_markdown['sender_id']
message_id = message_dict_markdown['id']
message_type = message_dict_markdown['type']
sending_client = message_dict_markdown['client']
# To remove duplicate clients: Maps queue ID to {'client': Client, 'flags': flags} # To remove duplicate clients: Maps queue ID to {'client': Client, 'flags': flags}
send_to_clients = dict() send_to_clients = dict()
@@ -131,14 +126,12 @@ def process_new_message(data):
# If the recipient was offline and the message was a single or group PM to him # If the recipient was offline and the message was a single or group PM to him
# or she was @-notified potentially notify more immediately # or she was @-notified potentially notify more immediately
received_pm = message.recipient.type in (Recipient.PERSONAL, Recipient.HUDDLE) and \ received_pm = message_type == "private" and user_profile_id != sender_id
user_profile_id != message.sender.id
mentioned = 'mentioned' in flags mentioned = 'mentioned' in flags
idle = receiver_is_idle(user_profile_id, realm_presences) idle = receiver_is_idle(user_profile_id, realm_presences)
always_push_notify = user_data.get('always_push_notify', False) always_push_notify = user_data.get('always_push_notify', False)
if (received_pm or mentioned) and (idle or always_push_notify): if (received_pm or mentioned) and (idle or always_push_notify):
event = build_offline_notification_event(user_profile_id, message.id) event = build_offline_notification_event(user_profile_id, message_id)
queue_json_publish("missedmessage_mobile_notifications", event, lambda event: None) queue_json_publish("missedmessage_mobile_notifications", event, lambda event: None)
notified = dict(push_notified=True) notified = dict(push_notified=True)
# Don't send missed message emails if always_push_notify is True # Don't send missed message emails if always_push_notify is True
@@ -185,8 +178,8 @@ def process_new_message(data):
continue continue
# The below prevents (Zephyr) mirroring loops. # The below prevents (Zephyr) mirroring loops.
if ('mirror' in message.sending_client.name and if ('mirror' in sending_client and
message.sending_client == client.client_type): sending_client.lower() == client.client_type.name.lower()):
continue continue
client.add_event(event) client.add_event(event)