mirror of
https://github.com/zulip/zulip.git
synced 2025-11-12 09:58:06 +00:00
Rewrite message flag handling to fork out to a subprocess for batch handling
(imported from commit 1ef846f542950cabf32f8b176f5591cf5794a0ff)
This commit is contained in:
@@ -83,7 +83,7 @@
|
|||||||
<hr />
|
<hr />
|
||||||
|
|
||||||
<div>
|
<div>
|
||||||
<button class="btn" data-dismiss="modal" aria-hidden="true" onclick="mark_all_as_read(); fast_forward_pointer(this);">
|
<button class="btn" data-dismiss="modal" aria-hidden="true" onclick="fast_forward_pointer(this);">
|
||||||
Declare Humbug bankruptcy
|
Declare Humbug bankruptcy
|
||||||
</button>
|
</button>
|
||||||
<p>
|
<p>
|
||||||
|
|||||||
@@ -550,12 +550,41 @@ if settings.USING_RABBITMQ or settings.TEST_SUITE:
|
|||||||
process_user_presence_event(event)
|
process_user_presence_event(event)
|
||||||
|
|
||||||
def update_message_flags(user_profile, operation, flag, messages, all):
|
def update_message_flags(user_profile, operation, flag, messages, all):
|
||||||
|
rest_until = None
|
||||||
|
|
||||||
|
if all:
|
||||||
|
# Do the first 450 message updates in-process, as this is a
|
||||||
|
# bankruptcy request and the user is about to reload. We don't
|
||||||
|
# want them to see a bunch of unread messages while we go about
|
||||||
|
# doing the work
|
||||||
|
first_batch = 450
|
||||||
|
flagattr = getattr(UserMessage.flags, flag)
|
||||||
|
|
||||||
|
all_ums = UserMessage.objects.filter(user_profile=user_profile)
|
||||||
|
if operation == "add":
|
||||||
|
umessages = all_ums.filter(flags=~flagattr)
|
||||||
|
elif operation == "remove":
|
||||||
|
umessages = all_ums.filter(flags=flagattr)
|
||||||
|
|
||||||
|
mids = [m.id for m in umessages.order_by('-id')[:first_batch]]
|
||||||
|
to_update = UserMessage.objects.filter(id__in=mids)
|
||||||
|
|
||||||
|
if operation == "add":
|
||||||
|
to_update.update(flags=F('flags') | flagattr)
|
||||||
|
elif operation == "remove":
|
||||||
|
to_update.update(flags=F('flags') & ~flagattr)
|
||||||
|
|
||||||
|
if len(mids) == 0:
|
||||||
|
return True
|
||||||
|
|
||||||
|
rest_until = mids[len(mids) - 1]
|
||||||
|
|
||||||
event = {'type': 'update_message',
|
event = {'type': 'update_message',
|
||||||
'user_profile_id': user_profile.id,
|
'user_profile_id': user_profile.id,
|
||||||
'operation': operation,
|
'operation': operation,
|
||||||
'flag': flag,
|
'flag': flag,
|
||||||
'messages': messages,
|
'messages': messages,
|
||||||
'all': all}
|
'until_id': rest_until}
|
||||||
if settings.USING_RABBITMQ:
|
if settings.USING_RABBITMQ:
|
||||||
actions_queue.json_publish("user_activity", event)
|
actions_queue.json_publish("user_activity", event)
|
||||||
else:
|
else:
|
||||||
@@ -574,25 +603,45 @@ def process_user_presence_event(event):
|
|||||||
def process_update_message_flags(event):
|
def process_update_message_flags(event):
|
||||||
user_profile = UserProfile.objects.get(id=event["user_profile_id"])
|
user_profile = UserProfile.objects.get(id=event["user_profile_id"])
|
||||||
try:
|
try:
|
||||||
msg_ids = event["messages"]
|
until_id = event["until_id"]
|
||||||
flag = getattr(UserMessage.flags, event["flag"])
|
messages = event["messages"]
|
||||||
|
flag = event["flag"]
|
||||||
op = event["operation"]
|
op = event["operation"]
|
||||||
except (KeyError, AttributeError):
|
except (KeyError, AttributeError):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if event["all"] == True:
|
# Shell out bankruptcy requests as we split them up into many
|
||||||
messages = UserMessage.objects.filter(user_profile=user_profile)
|
# pieces to avoid swamping the db
|
||||||
else:
|
if until_id and not settings.TEST_SUITE:
|
||||||
messages = UserMessage.objects.filter(user_profile=user_profile,
|
update_flags_externally(op, flag, user_profile, until_id)
|
||||||
message__id__in=msg_ids)
|
return True
|
||||||
|
|
||||||
if op == "add":
|
flagattr = getattr(UserMessage.flags, flag)
|
||||||
messages.update(flags=F('flags') | flag)
|
msgs = UserMessage.objects.filter(user_profile=user_profile,
|
||||||
elif op == "remove":
|
message__id__in=messages)
|
||||||
messages.update(flags=F('flags') & ~flag)
|
|
||||||
|
# If we're running in the test suite, don't shell out to manage.py.
|
||||||
|
# Updates that the manage.py command makes don't seem to be immediately
|
||||||
|
# reflected in the next in-process sqlite queries.
|
||||||
|
# TODO(leo) remove when tests switch to postgres
|
||||||
|
if settings.TEST_SUITE and until_id:
|
||||||
|
msgs = UserMessage.objects.filter(user_profile=user_profile,
|
||||||
|
id__lte=until_id)
|
||||||
|
|
||||||
|
if op == 'add':
|
||||||
|
msgs.update(flags=F('flags') | flagattr)
|
||||||
|
elif op == 'remove':
|
||||||
|
msgs.update(flags=F('flags') & ~flagattr)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def update_flags_externally(op, flag, user_profile, until_id):
|
||||||
|
args = ['python', os.path.join(os.path.dirname(__file__), '../..', 'manage.py'),
|
||||||
|
'set_message_flags', '--for-real', '-o', op, '-f', flag, '-m', user_profile.user.email,
|
||||||
|
'-u', str(until_id)]
|
||||||
|
|
||||||
|
subprocess.Popen(args, stdin=subprocess.PIPE, stdout=None, stderr=None)
|
||||||
|
|
||||||
def subscribed_to_stream(user_profile, stream):
|
def subscribed_to_stream(user_profile, stream):
|
||||||
try:
|
try:
|
||||||
if Subscription.objects.get(user_profile=user_profile,
|
if Subscription.objects.get(user_profile=user_profile,
|
||||||
|
|||||||
81
zephyr/management/commands/set_message_flags.py
Normal file
81
zephyr/management/commands/set_message_flags.py
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
from optparse import make_option
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from django.core.management.base import BaseCommand
|
||||||
|
|
||||||
|
from zephyr.lib.actions import do_deactivate, user_sessions
|
||||||
|
from zephyr.lib import utils
|
||||||
|
from zephyr.models import UserMessage, UserProfile
|
||||||
|
from django.db import transaction, models
|
||||||
|
|
||||||
|
|
||||||
|
class Command(BaseCommand):
|
||||||
|
help = """Sets user message flags. Used internally by actions.py. Marks all
|
||||||
|
Expects a comma-delimited list of user message ids via stdin, and an EOF to terminate."""
|
||||||
|
|
||||||
|
option_list = BaseCommand.option_list + (
|
||||||
|
make_option('-r', '--for-real',
|
||||||
|
dest='for_real',
|
||||||
|
action='store_true',
|
||||||
|
default=False,
|
||||||
|
help="Actually change message flags. Default is a dry run."),
|
||||||
|
make_option('-f', '--flag',
|
||||||
|
dest='flag',
|
||||||
|
type='string',
|
||||||
|
help="The flag to add of remove"),
|
||||||
|
make_option('-o', '--op',
|
||||||
|
dest='op',
|
||||||
|
type='string',
|
||||||
|
help="The operation to do: 'add' or 'remove'"),
|
||||||
|
make_option('-u', '--until',
|
||||||
|
dest='all_until',
|
||||||
|
type='string',
|
||||||
|
help="Mark all messages <= specific usermessage id"),
|
||||||
|
make_option('-m', '--email',
|
||||||
|
dest='email',
|
||||||
|
type='string',
|
||||||
|
help="Email to set messages for"),
|
||||||
|
)
|
||||||
|
|
||||||
|
def handle(self, *args, **options):
|
||||||
|
if not options["flag"] or not options["op"] or not options["email"]:
|
||||||
|
print "Please specify an operation, a flag and an email"
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
op = options['op']
|
||||||
|
flag = getattr(UserMessage.flags, options['flag'])
|
||||||
|
all_until = options['all_until']
|
||||||
|
email = options['email']
|
||||||
|
|
||||||
|
user_profile = UserProfile.objects.get(user__email=email)
|
||||||
|
|
||||||
|
if not all_until:
|
||||||
|
message_ids = [mid.strip() for mid in sys.stdin.read().split(',')]
|
||||||
|
mids = [m.id for m in UserMessage.objects.filter(user_profile=user_profile,
|
||||||
|
message__id__in=message_ids)
|
||||||
|
.order_by('-id')]
|
||||||
|
else:
|
||||||
|
mids = [m.id for m in UserMessage.objects.filter(user_profile=user_profile,
|
||||||
|
id__lte=all_until)
|
||||||
|
.order_by('-id')]
|
||||||
|
if options["for_real"]:
|
||||||
|
sys.stdin.close()
|
||||||
|
sys.stdout.close()
|
||||||
|
sys.stderr.close()
|
||||||
|
|
||||||
|
def do_update(batch):
|
||||||
|
with transaction.commit_on_success():
|
||||||
|
msgs = UserMessage.objects.filter(id__in=batch)
|
||||||
|
if op == 'add':
|
||||||
|
msgs.update(flags=models.F('flags') | flag)
|
||||||
|
elif op == 'remove':
|
||||||
|
msgs.update(flags=models.F('flags') & ~flag)
|
||||||
|
|
||||||
|
if not options["for_real"]:
|
||||||
|
logging.info("Updating %s by %s %s" % (mids, op, flag))
|
||||||
|
logging.info("Dry run completed. Run with --for-real to change message flags.")
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
utils.run_in_batches(mids, 400, do_update, sleep_time=3)
|
||||||
|
exit(0)
|
||||||
@@ -228,7 +228,7 @@ function update_unread_counts() {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function mark_all_as_read() {
|
function mark_all_as_read(cont) {
|
||||||
$.each(all_msg_list.all(), function (idx, msg) {
|
$.each(all_msg_list.all(), function (idx, msg) {
|
||||||
msg.flags = msg.flags || [];
|
msg.flags = msg.flags || [];
|
||||||
msg.flags.push('read');
|
msg.flags.push('read');
|
||||||
@@ -243,7 +243,8 @@ function mark_all_as_read() {
|
|||||||
all: true,
|
all: true,
|
||||||
op: 'add',
|
op: 'add',
|
||||||
flag: 'read'},
|
flag: 'read'},
|
||||||
dataType: 'json'});
|
dataType: 'json',
|
||||||
|
success: cont});
|
||||||
}
|
}
|
||||||
|
|
||||||
function unread_hashkey(message) {
|
function unread_hashkey(message) {
|
||||||
@@ -919,10 +920,12 @@ function fast_forward_pointer(btn) {
|
|||||||
data: {email: email},
|
data: {email: email},
|
||||||
dataType: 'json',
|
dataType: 'json',
|
||||||
success: function (data) {
|
success: function (data) {
|
||||||
|
mark_all_as_read(function () {
|
||||||
furthest_read = data.max_message_id;
|
furthest_read = data.max_message_id;
|
||||||
send_pointer_update();
|
send_pointer_update();
|
||||||
ui.change_tab_to('#home');
|
ui.change_tab_to('#home');
|
||||||
reload.initiate({immediate: true});
|
reload.initiate({immediate: true});
|
||||||
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user