mirror of
https://github.com/zulip/zulip.git
synced 2025-11-10 17:07:07 +00:00
Refactor update_message_flags to do the query in the django process.
Trac #1398. (imported from commit c001747d8e6d78a12fe535c36a81c12592976840)
This commit is contained in:
@@ -836,44 +836,21 @@ def update_user_presence(user_profile, client, log_time, status):
|
|||||||
queue_json_publish("user_activity", event, process_user_presence_event)
|
queue_json_publish("user_activity", event, process_user_presence_event)
|
||||||
|
|
||||||
def update_message_flags(user_profile, operation, flag, messages, all):
|
def update_message_flags(user_profile, operation, flag, messages, all):
|
||||||
rest_until = None
|
flagattr = getattr(UserMessage.flags, flag)
|
||||||
|
|
||||||
if all:
|
if all:
|
||||||
log_statsd_event('bankruptcy')
|
log_statsd_event('bankruptcy')
|
||||||
|
msgs = UserMessage.objects.filter(user_profile=user_profile)
|
||||||
|
else:
|
||||||
|
msgs = UserMessage.objects.filter(user_profile=user_profile,
|
||||||
|
message__id__in=messages)
|
||||||
|
|
||||||
# Do the first 450 message updates in-process, as this is a
|
if operation == 'add':
|
||||||
# bankruptcy request and the user is about to reload. We don't
|
msgs.update(flags=F('flags').bitor(flagattr))
|
||||||
# want them to see a bunch of unread messages while we go about
|
elif operation == 'remove':
|
||||||
# doing the work
|
msgs.update(flags=F('flags').bitand(~flagattr))
|
||||||
first_batch = 450
|
|
||||||
flagattr = getattr(UserMessage.flags, flag)
|
|
||||||
|
|
||||||
all_ums = UserMessage.objects.filter(user_profile=user_profile)
|
statsd.incr("flags.%s.%s" % (flag, operation), len(msgs))
|
||||||
if operation == "add":
|
|
||||||
umessages = all_ums.filter(flags=~flagattr)
|
|
||||||
elif operation == "remove":
|
|
||||||
umessages = all_ums.filter(flags=flagattr)
|
|
||||||
|
|
||||||
mids = [m.id for m in umessages.order_by('-id')[:first_batch]]
|
|
||||||
to_update = UserMessage.objects.filter(id__in=mids)
|
|
||||||
|
|
||||||
if operation == "add":
|
|
||||||
to_update.update(flags=F('flags').bitor(flagattr))
|
|
||||||
elif operation == "remove":
|
|
||||||
to_update.update(flags=F('flags').bitand(~flagattr))
|
|
||||||
|
|
||||||
if len(mids) == 0:
|
|
||||||
return True
|
|
||||||
|
|
||||||
rest_until = mids[len(mids) - 1]
|
|
||||||
|
|
||||||
event = {'type': 'update_message',
|
|
||||||
'user_profile_id': user_profile.id,
|
|
||||||
'operation': operation,
|
|
||||||
'flag': flag,
|
|
||||||
'messages': messages,
|
|
||||||
'until_id': rest_until}
|
|
||||||
queue_json_publish("user_activity", event, process_update_message_flags)
|
|
||||||
|
|
||||||
def process_user_presence_event(event):
|
def process_user_presence_event(event):
|
||||||
user_profile = get_user_profile_by_id(event["user_profile_id"])
|
user_profile = get_user_profile_by_id(event["user_profile_id"])
|
||||||
@@ -882,50 +859,6 @@ def process_user_presence_event(event):
|
|||||||
status = event["status"]
|
status = event["status"]
|
||||||
return do_update_user_presence(user_profile, client, log_time, status)
|
return do_update_user_presence(user_profile, client, log_time, status)
|
||||||
|
|
||||||
def process_update_message_flags(event):
|
|
||||||
user_profile = get_user_profile_by_id(event["user_profile_id"])
|
|
||||||
try:
|
|
||||||
until_id = event["until_id"]
|
|
||||||
messages = event["messages"]
|
|
||||||
flag = event["flag"]
|
|
||||||
op = event["operation"]
|
|
||||||
except (KeyError, AttributeError):
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Shell out bankruptcy requests as we split them up into many
|
|
||||||
# pieces to avoid swamping the db
|
|
||||||
if until_id and not settings.TEST_SUITE:
|
|
||||||
update_flags_externally(op, flag, user_profile, until_id)
|
|
||||||
return True
|
|
||||||
|
|
||||||
flagattr = getattr(UserMessage.flags, flag)
|
|
||||||
msgs = UserMessage.objects.filter(user_profile=user_profile,
|
|
||||||
message__id__in=messages)
|
|
||||||
|
|
||||||
# If we're running in the test suite, don't shell out to manage.py.
|
|
||||||
# Updates that the manage.py command makes don't seem to be immediately
|
|
||||||
# reflected in the next in-process sqlite queries.
|
|
||||||
# TODO(leo) remove when tests switch to postgres
|
|
||||||
if settings.TEST_SUITE and until_id:
|
|
||||||
msgs = UserMessage.objects.filter(user_profile=user_profile,
|
|
||||||
id__lte=until_id)
|
|
||||||
|
|
||||||
if op == 'add':
|
|
||||||
msgs.update(flags=F('flags').bitor(flagattr))
|
|
||||||
elif op == 'remove':
|
|
||||||
msgs.update(flags=F('flags').bitand(~flagattr))
|
|
||||||
|
|
||||||
statsd.incr("flags.%s.%s" % (flag, op), len(msgs))
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
def update_flags_externally(op, flag, user_profile, until_id):
|
|
||||||
args = ['python', os.path.join(os.path.dirname(__file__), '../..', 'manage.py'),
|
|
||||||
'set_message_flags', '--for-real', '-o', op, '-f', flag, '-m', user_profile.email,
|
|
||||||
'-u', str(until_id)]
|
|
||||||
|
|
||||||
subprocess.Popen(args, stdin=subprocess.PIPE, stdout=None, stderr=None)
|
|
||||||
|
|
||||||
def subscribed_to_stream(user_profile, stream):
|
def subscribed_to_stream(user_profile, stream):
|
||||||
try:
|
try:
|
||||||
if Subscription.objects.get(user_profile=user_profile,
|
if Subscription.objects.get(user_profile=user_profile,
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ from django.core.management.base import BaseCommand
|
|||||||
import ujson
|
import ujson
|
||||||
import pika
|
import pika
|
||||||
from zephyr.lib.actions import process_user_activity_event, \
|
from zephyr.lib.actions import process_user_activity_event, \
|
||||||
process_user_presence_event, process_update_message_flags
|
process_user_presence_event
|
||||||
from zephyr.lib.queue import SimpleQueueClient
|
from zephyr.lib.queue import SimpleQueueClient
|
||||||
import sys
|
import sys
|
||||||
import signal
|
import signal
|
||||||
@@ -24,8 +24,6 @@ class Command(BaseCommand):
|
|||||||
process_user_activity_event(event)
|
process_user_activity_event(event)
|
||||||
elif msg_type == 'user_presence':
|
elif msg_type == 'user_presence':
|
||||||
process_user_presence_event(event)
|
process_user_presence_event(event)
|
||||||
elif msg_type == 'update_message':
|
|
||||||
process_update_message_flags(event)
|
|
||||||
else:
|
else:
|
||||||
print("[*] Unknown message type: %s" (msg_type,))
|
print("[*] Unknown message type: %s" (msg_type,))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user