retention: Batch Message archiving queries.

We batch the queries that archive Messages, to limit the maximum number of
Message objects archived in a single query. As a consequence, the archiving
of other related objects is batched as well, because we loop over
chunks of archived messages and archive their related objects per chunk.
This commit is contained in:
Mateusz Mandera
2019-06-10 18:09:50 +02:00
committed by Tim Abbott
parent 86840adda5
commit f06a4b4eab
2 changed files with 71 additions and 43 deletions

View File

@@ -12,6 +12,8 @@ from zerver.models import (Message, UserMessage, ArchivedMessage, ArchivedUserMe
from typing import Any, Dict, List from typing import Any, Dict, List
MESSAGE_BATCH_SIZE = 1000
models_with_message_key = [ models_with_message_key = [
{ {
'class': Reaction, 'class': Reaction,
@@ -65,24 +67,46 @@ def ids_list_to_sql_query_format(ids: List[int]) -> str:
return ids_string return ids_string
def run_message_batch_query(query: str, chunk_size: int=MESSAGE_BATCH_SIZE,
                            **kwargs: Any) -> List[List[int]]:
    """Run a Message-archiving query repeatedly, one size-limited batch at a time.

    Each iteration calls move_rows(Message, query, chunk_size=chunk_size, **kwargs)
    and collects the list of ids it returns.  The callers in this file pass a
    query that contains "LIMIT {chunk_size}", so every call moves at most
    chunk_size messages (presumably move_rows substitutes the keyword
    arguments into the query -- confirm against move_rows).

    Args:
        query: The archiving query to execute for every batch.
        chunk_size: Maximum number of messages handled by one execution of
            the query.  Must be at least 1.
        **kwargs: Passed through unchanged to move_rows.

    Returns:
        A list of non-empty id lists, one per batch that moved at least one row.

    Raises:
        ValueError: If chunk_size is smaller than 1.
    """
    # With chunk_size == 0 the termination test below could never succeed
    # (len(...) < 0 is always False) and the loop would spin forever, so
    # reject such values up front instead of hanging.
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer, got {}".format(chunk_size))

    id_chunks = []  # type: List[List[int]]
    while True:
        new_chunk = move_rows(Message, query, chunk_size=chunk_size, **kwargs)
        # Never record an empty chunk: this keeps every element of the result non-empty.
        if new_chunk:
            id_chunks.append(new_chunk)

        # We run the loop until the query returns fewer results than
        # chunk_size, which means there is nothing left to process.
        if len(new_chunk) < chunk_size:
            return id_chunks
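# Note about batching these Message archiving queries:
# We can simply use LIMIT without worrying about OFFSETs and ordering
# while executing batches, because any Message already archived (in a previous batch)
# will not show up in the "SELECT ... FROM zerver_message ..." query for the next batches:
# the LEFT JOIN on zerver_archivedmessage combined with "zerver_archivedmessage.id is NULL"
# filters out every message that already has an archived copy.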
def move_expired_messages_to_archive_by_recipient(recipient: Recipient,
                                                  message_retention_days: int,
                                                  chunk_size: int=MESSAGE_BATCH_SIZE) -> List[List[int]]:
    """Archive, in batches, the expired messages sent to one recipient.

    A message qualifies when its recipient_id matches `recipient`, its
    pub_date is older than `message_retention_days` days, and it has no
    row in zerver_archivedmessage yet.

    Returns the id chunks produced by run_message_batch_query, where each
    execution of the query is limited to `chunk_size` messages.
    """
    # Messages published before this moment are past their retention period.
    cutoff = timezone_now() - timedelta(days=message_retention_days)

    archive_query = """
    INSERT INTO zerver_archivedmessage ({dst_fields}, archive_timestamp)
        SELECT {src_fields}, '{archive_timestamp}'
        FROM zerver_message
        LEFT JOIN zerver_archivedmessage ON zerver_archivedmessage.id = zerver_message.id
        WHERE zerver_message.recipient_id = {recipient_id}
            AND zerver_message.pub_date < '{check_date}'
            AND zerver_archivedmessage.id is NULL
        LIMIT {chunk_size}
    RETURNING id
    """

    return run_message_batch_query(
        archive_query,
        chunk_size=chunk_size,
        returning_id=True,
        recipient_id=recipient.id,
        check_date=cutoff.isoformat(),
    )
def move_expired_personal_and_huddle_messages_to_archive(realm: Realm) -> List[int]: def move_expired_personal_and_huddle_messages_to_archive(realm: Realm,
chunk_size: int=MESSAGE_BATCH_SIZE
) -> List[List[int]]:
cross_realm_bot_ids_list = [get_user_including_cross_realm(email).id cross_realm_bot_ids_list = [get_user_including_cross_realm(email).id
for email in settings.CROSS_REALM_BOT_EMAILS] for email in settings.CROSS_REALM_BOT_EMAILS]
cross_realm_bot_ids = str(tuple(cross_realm_bot_ids_list)) cross_realm_bot_ids = str(tuple(cross_realm_bot_ids_list))
@@ -93,27 +117,28 @@ def move_expired_personal_and_huddle_messages_to_archive(realm: Realm) -> List[i
# once https://github.com/zulip/zulip/issues/11015 is solved. # once https://github.com/zulip/zulip/issues/11015 is solved.
query = """ query = """
INSERT INTO zerver_archivedmessage ({dst_fields}, archive_timestamp) INSERT INTO zerver_archivedmessage ({dst_fields}, archive_timestamp)
SELECT {src_fields}, '{archive_timestamp}' SELECT {src_fields}, '{archive_timestamp}'
FROM zerver_message FROM zerver_message
INNER JOIN zerver_recipient ON zerver_recipient.id = zerver_message.recipient_id INNER JOIN zerver_recipient ON zerver_recipient.id = zerver_message.recipient_id
INNER JOIN zerver_userprofile ON zerver_userprofile.id = zerver_message.sender_id INNER JOIN zerver_userprofile ON zerver_userprofile.id = zerver_message.sender_id
LEFT JOIN zerver_archivedmessage ON zerver_archivedmessage.id = zerver_message.id LEFT JOIN zerver_archivedmessage ON zerver_archivedmessage.id = zerver_message.id
WHERE zerver_userprofile.id NOT IN {cross_realm_bot_ids} WHERE zerver_userprofile.id NOT IN {cross_realm_bot_ids}
AND zerver_userprofile.realm_id = {realm_id} AND zerver_userprofile.realm_id = {realm_id}
AND zerver_recipient.type in {recipient_types} AND zerver_recipient.type in {recipient_types}
AND zerver_message.pub_date < '{check_date}' AND zerver_message.pub_date < '{check_date}'
AND zerver_archivedmessage.id is NULL AND zerver_archivedmessage.id is NULL
LIMIT {chunk_size}
RETURNING id RETURNING id
""" """
assert realm.message_retention_days is not None assert realm.message_retention_days is not None
check_date = timezone_now() - timedelta(days=realm.message_retention_days) check_date = timezone_now() - timedelta(days=realm.message_retention_days)
return move_rows(Message, query, returning_id=True, cross_realm_bot_ids=cross_realm_bot_ids, return run_message_batch_query(query, returning_id=True, cross_realm_bot_ids=cross_realm_bot_ids,
realm_id=realm.id, recipient_types=recipient_types, check_date=check_date.isoformat()) realm_id=realm.id, recipient_types=recipient_types,
check_date=check_date.isoformat(), chunk_size=chunk_size)
def move_models_with_message_key_to_archive(msg_ids: List[int]) -> None: def move_models_with_message_key_to_archive(msg_ids: List[int]) -> None:
if not msg_ids: assert len(msg_ids) > 0
return
for model in models_with_message_key: for model in models_with_message_key:
query = """ query = """
@@ -129,8 +154,7 @@ def move_models_with_message_key_to_archive(msg_ids: List[int]) -> None:
message_ids=ids_list_to_sql_query_format(msg_ids)) message_ids=ids_list_to_sql_query_format(msg_ids))
def move_attachments_to_archive(msg_ids: List[int]) -> None: def move_attachments_to_archive(msg_ids: List[int]) -> None:
if not msg_ids: assert len(msg_ids) > 0
return
query = """ query = """
INSERT INTO zerver_archivedattachment ({dst_fields}, archive_timestamp) INSERT INTO zerver_archivedattachment ({dst_fields}, archive_timestamp)
@@ -147,8 +171,7 @@ def move_attachments_to_archive(msg_ids: List[int]) -> None:
def move_attachments_message_rows_to_archive(msg_ids: List[int]) -> None: def move_attachments_message_rows_to_archive(msg_ids: List[int]) -> None:
if not msg_ids: assert len(msg_ids) > 0
return
query = """ query = """
INSERT INTO zerver_archivedattachment_messages (id, archivedattachment_id, archivedmessage_id) INSERT INTO zerver_archivedattachment_messages (id, archivedattachment_id, archivedmessage_id)
@@ -182,18 +205,22 @@ def move_related_objects_to_archive(msg_ids: List[int]) -> None:
move_attachments_to_archive(msg_ids) move_attachments_to_archive(msg_ids)
move_attachments_message_rows_to_archive(msg_ids) move_attachments_message_rows_to_archive(msg_ids)
def archive_messages_by_recipient(recipient: Recipient, message_retention_days: int,
                                  chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
    """Archive the expired messages of one recipient, chunk by chunk.

    The messages are first moved to the archive in batches of at most
    `chunk_size`; afterwards, for every resulting chunk of message ids, the
    related objects are archived and delete_messages is called on the chunk.
    """
    for archived_ids in move_expired_messages_to_archive_by_recipient(
            recipient, message_retention_days, chunk_size):
        # Related objects have to be handled before delete_messages runs on the same ids.
        move_related_objects_to_archive(archived_ids)
        delete_messages(archived_ids)
def archive_personal_and_huddle_messages(chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
    """Archive expired personal and huddle messages for every realm with a retention policy.

    Only realms whose message_retention_days is set (not NULL) are visited.
    For each of them the expired messages are moved to the archive in
    batches of at most `chunk_size`, and every resulting chunk of ids then
    has its related objects archived before delete_messages is called on it.
    """
    realms_with_policy = Realm.objects.filter(message_retention_days__isnull=False)
    for realm in realms_with_policy:
        for archived_ids in move_expired_personal_and_huddle_messages_to_archive(realm, chunk_size):
            move_related_objects_to_archive(archived_ids)
            delete_messages(archived_ids)
def archive_stream_messages() -> None: def archive_stream_messages(chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
# We don't archive, if the stream has message_retention_days set to -1, # We don't archive, if the stream has message_retention_days set to -1,
# or if neither the stream nor the realm have a retention policy. # or if neither the stream nor the realm have a retention policy.
streams = Stream.objects.exclude(message_retention_days=-1).filter( streams = Stream.objects.exclude(message_retention_days=-1).filter(
@@ -209,11 +236,11 @@ def archive_stream_messages() -> None:
recipients = get_stream_recipients([stream.id for stream in streams]) recipients = get_stream_recipients([stream.id for stream in streams])
for recipient in recipients: for recipient in recipients:
archive_messages_by_recipient(recipient, retention_policy_dict[recipient.type_id]) archive_messages_by_recipient(recipient, retention_policy_dict[recipient.type_id], chunk_size)
def archive_messages() -> None: def archive_messages(chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
archive_stream_messages() archive_stream_messages(chunk_size)
archive_personal_and_huddle_messages() archive_personal_and_huddle_messages(chunk_size)
# Since figuring out which attachments can be deleted requires scanning the whole # Since figuring out which attachments can be deleted requires scanning the whole
# ArchivedAttachment table, we should do this just once, at the end of the archiving process: # ArchivedAttachment table, we should do this just once, at the end of the archiving process:

View File

@@ -13,7 +13,7 @@ from zerver.models import (Message, Realm, UserProfile, Stream, ArchivedUserMess
get_realm, get_user_profile_by_email, get_stream, get_system_bot) get_realm, get_user_profile_by_email, get_stream, get_system_bot)
from zerver.lib.retention import ( from zerver.lib.retention import (
archive_messages, archive_messages,
move_messages_to_archive move_messages_to_archive,
) )
# Class with helper functions useful for testing archiving of reactions: # Class with helper functions useful for testing archiving of reactions:
@@ -22,6 +22,7 @@ from zerver.tests.test_reactions import EmojiReactionBase
ZULIP_REALM_DAYS = 30 ZULIP_REALM_DAYS = 30
MIT_REALM_DAYS = 100 MIT_REALM_DAYS = 100
class RetentionTestingBase(ZulipTestCase): class RetentionTestingBase(ZulipTestCase):
""" """
Test receiving expired messages retention tool. Test receiving expired messages retention tool.
@@ -249,7 +250,7 @@ class TestArchivingGeneral(RetentionTestingBase):
expired_msg_ids = expired_mit_msg_ids + expired_zulip_msg_ids expired_msg_ids = expired_mit_msg_ids + expired_zulip_msg_ids
expired_usermsg_ids = self._get_usermessage_ids(expired_msg_ids) expired_usermsg_ids = self._get_usermessage_ids(expired_msg_ids)
archive_messages() archive_messages(chunk_size=2) # Specify low chunk_size to test batching.
# Make sure we archived what was needed: # Make sure we archived what was needed:
self._verify_archive_data(expired_msg_ids, expired_usermsg_ids) self._verify_archive_data(expired_msg_ids, expired_usermsg_ids)