retention: Use batch size of 100 for stream messages.

Streams can have many subscribers, so archiving a single stream message
can mean moving a very large number of UserMessage rows. For that reason,
using a smaller batch size for stream messages is justified.

Some personal messages need to be added in test_scrub_realm to have
coverage of do_delete_messages_by_sender after these changes.
This commit is contained in:
Mateusz Mandera
2020-06-24 16:47:17 +02:00
committed by Tim Abbott
parent 0c6497d43a
commit 890cafac11
2 changed files with 8 additions and 4 deletions

View File

@@ -49,6 +49,7 @@ from confirmation.models import (
) )
from zerver.decorator import statsd_increment from zerver.decorator import statsd_increment
from zerver.lib import bugdown from zerver.lib import bugdown
from zerver.lib import retention as retention
from zerver.lib.addressee import Addressee from zerver.lib.addressee import Addressee
from zerver.lib.alert_words import ( from zerver.lib.alert_words import (
add_user_alert_words, add_user_alert_words,
@@ -4710,6 +4711,7 @@ def do_delete_messages(realm: Realm, messages: Iterable[Message]) -> None:
# TODO: We should plan to remove `sender_id` here. # TODO: We should plan to remove `sender_id` here.
event['recipient_id'] = sample_message.recipient_id event['recipient_id'] = sample_message.recipient_id
event['sender_id'] = sample_message.sender_id event['sender_id'] = sample_message.sender_id
archiving_chunk_size = retention.MESSAGE_BATCH_SIZE
if message_type == "stream": if message_type == "stream":
stream_id = sample_message.recipient.type_id stream_id = sample_message.recipient.type_id
@@ -4720,8 +4722,9 @@ def do_delete_messages(realm: Realm, messages: Iterable[Message]) -> None:
subscribers = subscribers.exclude(user_profile__long_term_idle=True) subscribers = subscribers.exclude(user_profile__long_term_idle=True)
subscribers_ids = [user.user_profile_id for user in subscribers] subscribers_ids = [user.user_profile_id for user in subscribers]
users_to_notify = list(map(subscriber_info, subscribers_ids)) users_to_notify = list(map(subscriber_info, subscribers_ids))
archiving_chunk_size = retention.STREAM_MESSAGE_BATCH_SIZE
move_messages_to_archive(message_ids, realm=realm) move_messages_to_archive(message_ids, realm=realm, chunk_size=archiving_chunk_size)
event['message_type'] = message_type event['message_type'] = message_type
send_event(realm, event, users_to_notify) send_event(realm, event, users_to_notify)
@@ -4729,7 +4732,7 @@ def do_delete_messages(realm: Realm, messages: Iterable[Message]) -> None:
def do_delete_messages_by_sender(user: UserProfile) -> None: def do_delete_messages_by_sender(user: UserProfile) -> None:
message_ids = list(Message.objects.filter(sender=user).values_list('id', flat=True).order_by('id')) message_ids = list(Message.objects.filter(sender=user).values_list('id', flat=True).order_by('id'))
if message_ids: if message_ids:
move_messages_to_archive(message_ids) move_messages_to_archive(message_ids, chunk_size=retention.STREAM_MESSAGE_BATCH_SIZE)
def get_streams_traffic(stream_ids: Set[int]) -> Dict[int, int]: def get_streams_traffic(stream_ids: Set[int]) -> Dict[int, int]:
stat = COUNT_STATS['messages_in_stream:is_bot:day'] stat = COUNT_STATS['messages_in_stream:is_bot:day']

View File

@@ -60,6 +60,7 @@ logger = logging.getLogger('zulip.retention')
log_to_file(logger, settings.RETENTION_LOG_PATH) log_to_file(logger, settings.RETENTION_LOG_PATH)
MESSAGE_BATCH_SIZE = 1000 MESSAGE_BATCH_SIZE = 1000
STREAM_MESSAGE_BATCH_SIZE = 100
TRANSACTION_DELETION_BATCH_SIZE = 100 TRANSACTION_DELETION_BATCH_SIZE = 100
# This data structure declares the details of all database tables that # This data structure declares the details of all database tables that
@@ -362,7 +363,7 @@ def archive_personal_and_huddle_messages(realm: Realm, chunk_size: int=MESSAGE_B
message_count = move_expired_personal_and_huddle_messages_to_archive(realm, chunk_size) message_count = move_expired_personal_and_huddle_messages_to_archive(realm, chunk_size)
logger.info("Done. Archived %s messages", message_count) logger.info("Done. Archived %s messages", message_count)
def archive_stream_messages(realm: Realm, streams: List[Stream], chunk_size: int=MESSAGE_BATCH_SIZE) -> None: def archive_stream_messages(realm: Realm, streams: List[Stream], chunk_size: int=STREAM_MESSAGE_BATCH_SIZE) -> None:
if not streams: if not streams:
return return
@@ -389,7 +390,7 @@ def archive_messages(chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
logger.info("Starting the archiving process with chunk_size %s", chunk_size) logger.info("Starting the archiving process with chunk_size %s", chunk_size)
for realm, streams in get_realms_and_streams_for_archiving(): for realm, streams in get_realms_and_streams_for_archiving():
archive_stream_messages(realm, streams, chunk_size) archive_stream_messages(realm, streams, chunk_size=STREAM_MESSAGE_BATCH_SIZE)
if realm.message_retention_days != -1: if realm.message_retention_days != -1:
archive_personal_and_huddle_messages(realm, chunk_size) archive_personal_and_huddle_messages(realm, chunk_size)