diff --git a/zerver/lib/retention.py b/zerver/lib/retention.py index 38b43a3e67..6473f9c489 100644 --- a/zerver/lib/retention.py +++ b/zerver/lib/retention.py @@ -12,6 +12,8 @@ from zerver.models import (Message, UserMessage, ArchivedMessage, ArchivedUserMe from typing import Any, Dict, List +MESSAGE_BATCH_SIZE = 1000 + models_with_message_key = [ { 'class': Reaction, @@ -65,24 +67,46 @@ def ids_list_to_sql_query_format(ids: List[int]) -> str: return ids_string +def run_message_batch_query(query: str, chunk_size: int=MESSAGE_BATCH_SIZE, + **kwargs: Any) -> List[List[int]]: + id_chunks = [] # type: List[List[int]] + while True: + new_chunk = move_rows(Message, query, chunk_size=chunk_size, **kwargs) + if new_chunk: + id_chunks.append(new_chunk) + + # We run the loop, until the query returns fewer results than chunk_size, which means we are done: + if len(new_chunk) < chunk_size: + return id_chunks + +# Note about batching these Message archiving queries: +# We can simply use LIMIT without worrying about OFFSETs and ordering +# while executing batches, because any Message already archived (in the previous batch) +# will not show up in the "SELECT ... FROM zerver_message ..." query for the next batches. + def move_expired_messages_to_archive_by_recipient(recipient: Recipient, - message_retention_days: int) -> List[int]: + message_retention_days: int, + chunk_size: int=MESSAGE_BATCH_SIZE) -> List[List[int]]: query = """ INSERT INTO zerver_archivedmessage ({dst_fields}, archive_timestamp) - SELECT {src_fields}, '{archive_timestamp}' - FROM zerver_message - LEFT JOIN zerver_archivedmessage ON zerver_archivedmessage.id = zerver_message.id - WHERE zerver_message.recipient_id = {recipient_id} - AND zerver_message.pub_date < '{check_date}' - AND zerver_archivedmessage.id is NULL + SELECT {src_fields}, '{archive_timestamp}' + FROM zerver_message + LEFT JOIN zerver_archivedmessage ON zerver_archivedmessage.id = zerver_message.id + WHERE zerver_message.recipient_id = {recipient_id} + AND zerver_message.pub_date < '{check_date}' + AND zerver_archivedmessage.id is NULL + LIMIT {chunk_size} RETURNING id """ check_date = timezone_now() - timedelta(days=message_retention_days) - return move_rows(Message, query, returning_id=True, - recipient_id=recipient.id, check_date=check_date.isoformat()) + return run_message_batch_query(query, returning_id=True, + recipient_id=recipient.id, check_date=check_date.isoformat(), + chunk_size=chunk_size) -def move_expired_personal_and_huddle_messages_to_archive(realm: Realm) -> List[int]: +def move_expired_personal_and_huddle_messages_to_archive(realm: Realm, + chunk_size: int=MESSAGE_BATCH_SIZE + ) -> List[List[int]]: cross_realm_bot_ids_list = [get_user_including_cross_realm(email).id for email in settings.CROSS_REALM_BOT_EMAILS] cross_realm_bot_ids = str(tuple(cross_realm_bot_ids_list)) @@ -93,27 +117,28 @@ def move_expired_personal_and_huddle_messages_to_archive(realm: Realm) -> List[i # once https://github.com/zulip/zulip/issues/11015 is solved. query = """ INSERT INTO zerver_archivedmessage ({dst_fields}, archive_timestamp) - SELECT {src_fields}, '{archive_timestamp}' - FROM zerver_message - INNER JOIN zerver_recipient ON zerver_recipient.id = zerver_message.recipient_id - INNER JOIN zerver_userprofile ON zerver_userprofile.id = zerver_message.sender_id - LEFT JOIN zerver_archivedmessage ON zerver_archivedmessage.id = zerver_message.id - WHERE zerver_userprofile.id NOT IN {cross_realm_bot_ids} - AND zerver_userprofile.realm_id = {realm_id} - AND zerver_recipient.type in {recipient_types} - AND zerver_message.pub_date < '{check_date}' - AND zerver_archivedmessage.id is NULL + SELECT {src_fields}, '{archive_timestamp}' + FROM zerver_message + INNER JOIN zerver_recipient ON zerver_recipient.id = zerver_message.recipient_id + INNER JOIN zerver_userprofile ON zerver_userprofile.id = zerver_message.sender_id + LEFT JOIN zerver_archivedmessage ON zerver_archivedmessage.id = zerver_message.id + WHERE zerver_userprofile.id NOT IN {cross_realm_bot_ids} + AND zerver_userprofile.realm_id = {realm_id} + AND zerver_recipient.type in {recipient_types} + AND zerver_message.pub_date < '{check_date}' + AND zerver_archivedmessage.id is NULL + LIMIT {chunk_size} RETURNING id """ assert realm.message_retention_days is not None check_date = timezone_now() - timedelta(days=realm.message_retention_days) - return move_rows(Message, query, returning_id=True, cross_realm_bot_ids=cross_realm_bot_ids, - realm_id=realm.id, recipient_types=recipient_types, check_date=check_date.isoformat()) + return run_message_batch_query(query, returning_id=True, cross_realm_bot_ids=cross_realm_bot_ids, + realm_id=realm.id, recipient_types=recipient_types, + check_date=check_date.isoformat(), chunk_size=chunk_size) def move_models_with_message_key_to_archive(msg_ids: List[int]) -> None: - if not msg_ids: - return + assert len(msg_ids) > 0 for model in models_with_message_key: query = """ @@ -129,8 +154,7 @@ def move_models_with_message_key_to_archive(msg_ids: List[int]) -> None: message_ids=ids_list_to_sql_query_format(msg_ids)) def move_attachments_to_archive(msg_ids: List[int]) -> None: - if not msg_ids: - return + assert len(msg_ids) > 0 query = """ INSERT INTO zerver_archivedattachment ({dst_fields}, archive_timestamp) @@ -147,8 +171,7 @@ def move_attachments_to_archive(msg_ids: List[int]) -> None: def move_attachments_message_rows_to_archive(msg_ids: List[int]) -> None: - if not msg_ids: - return + assert len(msg_ids) > 0 query = """ INSERT INTO zerver_archivedattachment_messages (id, archivedattachment_id, archivedmessage_id) @@ -182,18 +205,22 @@ def move_related_objects_to_archive(msg_ids: List[int]) -> None: move_attachments_to_archive(msg_ids) move_attachments_message_rows_to_archive(msg_ids) -def archive_messages_by_recipient(recipient: Recipient, message_retention_days: int) -> None: - msg_ids = move_expired_messages_to_archive_by_recipient(recipient, message_retention_days) - move_related_objects_to_archive(msg_ids) - delete_messages(msg_ids) +def archive_messages_by_recipient(recipient: Recipient, message_retention_days: int, + chunk_size: int=MESSAGE_BATCH_SIZE) -> None: + message_id_chunks = move_expired_messages_to_archive_by_recipient(recipient, message_retention_days, + chunk_size) + for chunk in message_id_chunks: + move_related_objects_to_archive(chunk) + delete_messages(chunk) -def archive_personal_and_huddle_messages() -> None: +def archive_personal_and_huddle_messages(chunk_size: int=MESSAGE_BATCH_SIZE) -> None: for realm in Realm.objects.filter(message_retention_days__isnull=False): - msg_ids = move_expired_personal_and_huddle_messages_to_archive(realm) - move_related_objects_to_archive(msg_ids) - delete_messages(msg_ids) + message_id_chunks = move_expired_personal_and_huddle_messages_to_archive(realm, chunk_size) + for chunk in message_id_chunks: + move_related_objects_to_archive(chunk) + delete_messages(chunk) -def archive_stream_messages() -> None: +def archive_stream_messages(chunk_size: int=MESSAGE_BATCH_SIZE) -> None: # We don't archive, if the stream has message_retention_days set to -1, # or if neither the stream nor the realm have a retention policy. streams = Stream.objects.exclude(message_retention_days=-1).filter( @@ -209,11 +236,11 @@ def archive_stream_messages() -> None: recipients = get_stream_recipients([stream.id for stream in streams]) for recipient in recipients: - archive_messages_by_recipient(recipient, retention_policy_dict[recipient.type_id]) + archive_messages_by_recipient(recipient, retention_policy_dict[recipient.type_id], chunk_size) -def archive_messages() -> None: - archive_stream_messages() - archive_personal_and_huddle_messages() +def archive_messages(chunk_size: int=MESSAGE_BATCH_SIZE) -> None: + archive_stream_messages(chunk_size) + archive_personal_and_huddle_messages(chunk_size) # Since figuring out which attachments can be deleted requires scanning the whole # ArchivedAttachment table, we should do this just once, at the end of the archiving process: diff --git a/zerver/tests/test_retention.py b/zerver/tests/test_retention.py index e13d1549fa..c4b89e81e8 100644 --- a/zerver/tests/test_retention.py +++ b/zerver/tests/test_retention.py @@ -13,7 +13,7 @@ from zerver.models import (Message, Realm, UserProfile, Stream, ArchivedUserMess get_realm, get_user_profile_by_email, get_stream, get_system_bot) from zerver.lib.retention import ( archive_messages, - move_messages_to_archive + move_messages_to_archive, ) # Class with helper functions useful for testing archiving of reactions: @@ -22,6 +22,7 @@ from zerver.tests.test_reactions import EmojiReactionBase ZULIP_REALM_DAYS = 30 MIT_REALM_DAYS = 100 + class RetentionTestingBase(ZulipTestCase): """ Test receiving expired messages retention tool. @@ -249,7 +250,7 @@ class TestArchivingGeneral(RetentionTestingBase): expired_msg_ids = expired_mit_msg_ids + expired_zulip_msg_ids expired_usermsg_ids = self._get_usermessage_ids(expired_msg_ids) - archive_messages() + archive_messages(chunk_size=2) # Specify low chunk_size to test batching. # Make sure we archived what neeeded: self._verify_archive_data(expired_msg_ids, expired_usermsg_ids)