mirror of
https://github.com/zulip/zulip.git
synced 2025-11-10 00:46:03 +00:00
Previously, if last was None, we wouldn't check dont_block, server_generation, or any of the other reasons that get_updates might return immediately, and just unconditionally entered longpolling mode. In the process, this reorders return_messages_immediately to have fewer cases and thus be easier to read. (imported from commit 67803b8bfc7d9c9c1a4d6916eb2fb62664fb35a9)
219 lines
9.2 KiB
Python
219 lines
9.2 KiB
Python
from zephyr.models import Message, UserProfile, UserMessage, UserActivity
|
|
|
|
from zephyr.decorator import asynchronous, authenticated_api_view, \
|
|
authenticated_json_post_view, internal_notify_view, RespondAsynchronously, \
|
|
has_request_variables, POST, json_to_list, to_non_negative_int
|
|
from zephyr.lib.response import json_success, json_error
|
|
|
|
import datetime
|
|
import simplejson
|
|
import socket
|
|
import time
|
|
|
|
# Whole-second timestamp taken when this module is imported.  It is sent to
# clients as "server_generation"; when a client later reports a different
# value, return_messages_immediately adds a "client_reload" update type.
SERVER_GENERATION = int(time.time())
|
|
|
|
@internal_notify_view
def notify_new_message(request):
    """Deliver a newly sent message to each of its recipients.

    Reads three POST fields: 'users' (a JSON list of UserProfile ids),
    'message' (the Message id) and 'rendered' (the JSON-encoded dicts that
    were already rendered by the sending process).  Every listed user has
    receive() called with the message.  Returns json_success().
    """
    post = request.POST

    # A message can legitimately have no recipients (e.g. a bot posting to
    # a stream without subscribers); treat that as a successful no-op.
    if post["users"] == "":
        return json_success()

    # FIXME: better query
    recipients = [UserProfile.objects.get(id=user_id)
                  for user_id in json_to_list(post['users'])]
    message = Message.objects.get(id=post['message'])

    # Make message.to_dict() hand back the dicts that the other process
    # already rendered.
    #
    # The JSON is decoded here only to be re-encoded later.  Avoiding that
    # is not trivial because some fields are read in between (see
    # send_with_safety_check), and it is probably not a big deal.
    message.precomputed_dicts = simplejson.loads(post['rendered'])

    for recipient in recipients:
        recipient.receive(message)

    return json_success()
|
|
|
|
@internal_notify_view
def notify_pointer_update(request):
    """Apply a pointer change to the user named in the request.

    Reads the POST fields 'user' (UserProfile id), 'new_pointer' (parsed
    with int()) and 'pointer_updater', then forwards them to
    UserProfile.update_pointer().  Returns json_success().
    """
    post = request.POST

    # FIXME: better query
    user_profile = UserProfile.objects.get(id=post['user'])
    user_profile.update_pointer(int(post['new_pointer']),
                                post['pointer_updater'])

    return json_success()
|
|
|
|
@asynchronous
@authenticated_json_post_view
def json_get_updates(request, user_profile, handler):
    """JSON endpoint for fetching updates.

    Uses the session key as the client id and always asks for
    Markdown-rendered messages; everything else is delegated to
    get_updates_backend().
    """
    return get_updates_backend(request, user_profile, handler,
                               request.session.session_key,
                               client=request._client,
                               apply_markdown=True)
|
|
|
|
@asynchronous
@authenticated_api_view
@has_request_variables
def api_get_messages(request, user_profile, handler,
                     client_id=POST(default=None),
                     apply_markdown=POST(default=False,
                                         converter=simplejson.loads)):
    """API endpoint for fetching updates.

    'client_id' and 'apply_markdown' come from the POST data (defaulting
    to None and False respectively); the request is then handed to
    get_updates_backend().
    """
    return get_updates_backend(request, user_profile, handler, client_id,
                               client=request._client,
                               apply_markdown=apply_markdown)
|
|
|
|
def format_updates_response(messages=None, apply_markdown=True,
                            user_profile=None, new_pointer=None,
                            client=None, update_types=None,
                            client_server_generation=None):
    """Build the response dict for a get_updates / get_messages request.

    Parameters:
      messages -- iterable of message objects; omitted or None means none.
      apply_markdown -- passed through to each message's to_dict().
      user_profile -- requesting user; when given and the user's realm
          domain is "mit.edu", a 'zephyr_mirror_active' flag is added.
      new_pointer -- included as 'new_pointer' when not None.
      client -- requesting client; when its name ends in "_mirror",
          messages sent by that same client are dropped.
      update_types -- list of reasons for this response; omitted or None
          means an empty list.
      client_server_generation -- when not None, the current
          SERVER_GENERATION is included as 'server_generation'.

    Returns a dict with at least 'messages', 'result', 'msg' and
    'update_types'.
    """
    # Use None sentinels instead of mutable default arguments: the default
    # update_types list ends up inside the returned dict, so a shared
    # default could be modified by one caller and leak into later calls.
    if messages is None:
        messages = []
    if update_types is None:
        update_types = []

    if client is not None and client.name.endswith("_mirror"):
        messages = [m for m in messages if m.sending_client.name != client.name]

    ret = {'messages': [message.to_dict(apply_markdown) for message in messages],
           "result": "success",
           "msg": "",
           'update_types': update_types}

    if client_server_generation is not None:
        ret['server_generation'] = SERVER_GENERATION
    if new_pointer is not None:
        ret['new_pointer'] = new_pointer

    # user_profile is optional in the signature, so do not dereference it
    # when the caller left it out.
    if user_profile is not None and user_profile.realm.domain == "mit.edu":
        try:
            activity = UserActivity.objects.get(user_profile=user_profile,
                                                query="/api/v1/get_messages",
                                                client__name="zephyr_mirror")
            # The mirror counts as active if it polled within the last
            # five minutes (compared as naive UTC datetimes).
            ret['zephyr_mirror_active'] = \
                (activity.last_visit.replace(tzinfo=None) >
                 datetime.datetime.utcnow() - datetime.timedelta(minutes=5))
        except UserActivity.DoesNotExist:
            ret['zephyr_mirror_active'] = False

    return ret
|
|
|
|
def return_messages_immediately(user_profile, client_id, last,
                                client_server_generation,
                                client_pointer, dont_block, **kwargs):
    """Decide whether a request can be answered without long-polling.

    Collects every reason to respond right away, in this order:
    "nonblocking_request" (dont_block is true), "client_reload" (the
    client's server generation differs from SERVER_GENERATION),
    "pointer_update" (the stored pointer is ahead of client_pointer) and
    "new_messages" (messages with id greater than 'last' exist).

    Returns the dict from format_updates_response() when at least one
    reason applies, otherwise None.
    """
    reasons = []
    if dont_block:
        reasons.append("nonblocking_request")

    generation_known = client_server_generation is not None
    if generation_known and client_server_generation != SERVER_GENERATION:
        reasons.append("client_reload")

    current_pointer = user_profile.pointer
    pointer_for_client = None
    if client_pointer is not None and current_pointer > client_pointer:
        pointer_for_client = current_pointer
        reasons.append("pointer_update")

    # With last=None the client is not interested in any old messages.
    pending = []
    if last is not None:
        pending = (Message.objects.select_related()
                   .filter(usermessage__user_profile=user_profile)
                   .order_by('id')
                   .filter(id__gt=last)[:400])

        # Apply the mirroring filter before testing for emptiness.
        # Otherwise, when the only message to forward was itself sent via
        # the mirror, the API client would loop forever asking for more.
        if "client" in kwargs and kwargs["client"].name.endswith("_mirror"):
            mirror_name = kwargs["client"].name
            pending = [m for m in pending
                       if m.sending_client.name != mirror_name]

    if pending:
        reasons.append("new_messages")

    if not reasons:
        return None

    return format_updates_response(messages=pending,
                                   user_profile=user_profile,
                                   new_pointer=pointer_for_client,
                                   client_server_generation=client_server_generation,
                                   update_types=reasons,
                                   **kwargs)
|
|
|
|
def send_with_safety_check(response, handler, apply_markdown=True, **kwargs):
    """Finish the request with 'response' after a rendering sanity check.

    When Markdown was requested and the response is a success, every
    message must have content_type 'text/html'; otherwise the handler is
    finished with a 500 status and an error string instead.  This matters
    for security because Markdown rendering is where HTML gets escaped
    (c.f. ticket #64), which is why apply_markdown=True is the fail-safe
    default.  An 'error' result is sent with status 400.
    """
    outcome = response['result']

    if outcome == 'success' and apply_markdown:
        unrendered = any(entry['content_type'] != 'text/html'
                         for entry in response['messages'])
        if unrendered:
            handler.set_status(500)
            handler.finish('Internal error: bad message format')
            return

    if outcome == 'error':
        handler.set_status(400)
    handler.finish(response)
|
|
|
|
@has_request_variables
def get_updates_backend(request, user_profile, handler, client_id,
                        last=POST(converter=to_non_negative_int, default=None),
                        client_server_generation=POST(whence='server_generation',
                                                      default=None,
                                                      converter=int),
                        client_pointer=POST(whence='pointer', converter=int,
                                            default=None),
                        dont_block=POST(converter=simplejson.loads,
                                        default=False),
                        **kwargs):
    """Shared implementation behind json_get_updates and api_get_messages.

    If return_messages_immediately() yields a response it is sent at once.
    Otherwise a callback is registered on the user profile and the
    connection is left open until one of the notify_* views triggers it.
    In both cases RespondAsynchronously is returned.
    """
    immediate = return_messages_immediately(user_profile, client_id, last,
                                            client_server_generation,
                                            client_pointer,
                                            dont_block, **kwargs)
    if immediate is not None:
        send_with_safety_check(immediate, handler, **kwargs)

        # handler.finish() has already been invoked, so the usual view
        # response path is bypassed.  This is "responding asynchronously"
        # even though the response already happened; slightly odd, but it
        # lets this path share send_with_safety_check with the one below.
        return RespondAsynchronously

    # Enter long-polling mode.
    #
    # Rather than answering right away, keep the connection open and go
    # back to the Tornado main loop.  One of the notify_* views will
    # eventually fire the callback below, which sends the delayed response.

    def on_update(**cb_kwargs):
        if handler.request.connection.stream.closed():
            return
        try:
            # Ideally UserProfile.receive would do this check, but it does
            # not know what "last" was for each registered callback.
            if last is not None and "messages" in cb_kwargs:
                first_id = cb_kwargs["messages"][0].id

                # Do not hand the client a message it already has when
                # messages are processed out of order.
                if first_id <= last:
                    # A response is still required: a callback cannot be
                    # re-queued, so the client has to issue a new request.
                    empty_reply = {"result": "success",
                                   "msg": "",
                                   'update_types': []}
                    handler.finish(empty_reply)
                    return

            kwargs.update(cb_kwargs)
            payload = format_updates_response(
                user_profile=user_profile,
                client_server_generation=client_server_generation,
                **kwargs)
            send_with_safety_check(payload, handler, **kwargs)
        except socket.error:
            pass

    user_profile.add_receive_callback(handler.async_callback(on_update))
    if client_pointer is not None:
        user_profile.add_pointer_update_callback(
            handler.async_callback(on_update))

    # runtornado recognizes this special return value.
    return RespondAsynchronously
|