retention: Use new ArchiveTransaction model.

We add a new model, ArchiveTransaction, to tie archived objects together
in a coherent way, according to the batches in which they were archived.
This enables building a better system for restoring from the archive, and it
is simply more sensible to tie the archived objects together this way than
through the somewhat vague approach of setting archive_timestamp on each
object using timezone_now().
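
The ArchiveTransaction model itself is defined in zerver/models.py, one of the other changed files and not shown in the excerpt below. As a rough sketch only, based on what this diff references (a type field with RETENTION_POLICY_BASED and MANUAL constants, and an optional realm passed for policy-based archiving), it plausibly looks something like this; any field, default, or option not referenced in the diff is an assumption:

    from django.db import models
    from django.utils.timezone import now as timezone_now

    class ArchiveTransaction(models.Model):
        # Sketch only -- the real definition lives in zerver/models.py alongside Realm
        # and is not shown in this excerpt.
        timestamp = models.DateTimeField(default=timezone_now, db_index=True)  # when this batch was archived

        # How the archiving was triggered (constants referenced by retention.py below).
        RETENTION_POLICY_BASED = 1  # archived automatically by the retention policy
        MANUAL = 2                  # archived via move_messages_to_archive()
        type = models.PositiveSmallIntegerField(db_index=True)

        # Realm whose messages were archived; assumed nullable for the MANUAL case,
        # where no realm is passed when the transaction is created.
        realm = models.ForeignKey(Realm, null=True, on_delete=models.CASCADE)
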
Author: Mateusz Mandera
Date: 2019-06-18 19:54:09 +02:00
Committed by: Tim Abbott
Parent: e8d49330f2
Commit: a2cce62c1c
4 changed files with 90 additions and 24 deletions

zerver/lib/retention.py

@@ -9,7 +9,7 @@ from django.utils.timezone import now as timezone_now
 from zerver.lib.logging_util import log_to_file
 from zerver.models import (Message, UserMessage, ArchivedMessage, ArchivedUserMessage, Realm,
                            Attachment, ArchivedAttachment, Reaction, ArchivedReaction,
-                           SubMessage, ArchivedSubMessage, Recipient, Stream,
+                           SubMessage, ArchivedSubMessage, Recipient, Stream, ArchiveTransaction,
                            get_stream_recipients, get_user_including_cross_realm)

 from typing import Any, Dict, Iterator, List
@@ -51,7 +51,6 @@ def move_rows(src_model: Any, raw_query: str, returning_id: bool=False,
     sql_args = {
         'src_fields': ','.join(src_fields),
         'dst_fields': ','.join(dst_fields),
-        'archive_timestamp': timezone_now()
     }
     sql_args.update(kwargs)
     with connection.cursor() as cursor:
@@ -96,8 +95,8 @@ def move_expired_messages_to_archive_by_recipient(recipient: Recipient,
     # Important: This function is a generator, and you need to iterate
     # through the Iterator it returns to execute the queries.
     query = """
-    INSERT INTO zerver_archivedmessage ({dst_fields}, archive_timestamp)
-        SELECT {src_fields}, '{archive_timestamp}'
+    INSERT INTO zerver_archivedmessage ({dst_fields})
+        SELECT {src_fields}
         FROM zerver_message
         LEFT JOIN zerver_archivedmessage ON zerver_archivedmessage.id = zerver_message.id
         WHERE zerver_message.recipient_id = {recipient_id}
@@ -126,8 +125,8 @@ def move_expired_personal_and_huddle_messages_to_archive(realm: Realm,
     # TODO: Remove the "zerver_userprofile.id NOT IN {cross_realm_bot_ids}" clause
     # once https://github.com/zulip/zulip/issues/11015 is solved.
     query = """
-    INSERT INTO zerver_archivedmessage ({dst_fields}, archive_timestamp)
-        SELECT {src_fields}, '{archive_timestamp}'
+    INSERT INTO zerver_archivedmessage ({dst_fields})
+        SELECT {src_fields}
         FROM zerver_message
         INNER JOIN zerver_recipient ON zerver_recipient.id = zerver_message.recipient_id
         INNER JOIN zerver_userprofile ON zerver_userprofile.id = zerver_message.sender_id
@@ -153,8 +152,8 @@ def move_to_archive_and_delete_models_with_message_key(msg_ids: List[int]) -> No
     for model in models_with_message_key:
         query = """
         WITH archived_data AS (
-            INSERT INTO {archive_table_name} ({dst_fields}, archive_timestamp)
-                SELECT {src_fields}, '{archive_timestamp}'
+            INSERT INTO {archive_table_name} ({dst_fields})
+                SELECT {src_fields}
                 FROM {table_name}
                 LEFT JOIN {archive_table_name} ON {archive_table_name}.id = {table_name}.id
                 WHERE {table_name}.message_id IN {message_ids}
@@ -172,8 +171,8 @@ def move_attachments_to_archive(msg_ids: List[int]) -> None:
     assert len(msg_ids) > 0

     query = """
-    INSERT INTO zerver_archivedattachment ({dst_fields}, archive_timestamp)
-        SELECT {src_fields}, '{archive_timestamp}'
+    INSERT INTO zerver_archivedattachment ({dst_fields})
+        SELECT {src_fields}
         FROM zerver_attachment
         INNER JOIN zerver_attachment_messages
             ON zerver_attachment_messages.attachment_id = zerver_attachment.id
@@ -226,7 +225,7 @@ def move_related_objects_to_archive(msg_ids: List[int]) -> None:
     move_attachments_to_archive(msg_ids)
     move_attachment_messages_to_archive(msg_ids)

-def run_archiving_in_chunks(message_id_chunks: Iterator[List[int]]) -> int:
+def run_archiving_in_chunks(message_id_chunks: Iterator[List[int]], realm: Realm) -> int:
     # This function is carefully designed to achieve our
     # transactionality goals: A batch of messages is either fully
     # archived-and-deleted or not transactionally.
@@ -244,6 +243,11 @@ def run_archiving_in_chunks(message_id_chunks: Iterator[List[int]]) -> int:
         except StopIteration:
             break

+        archive_transaction = ArchiveTransaction.objects.create(
+            type=ArchiveTransaction.RETENTION_POLICY_BASED, realm=realm
+        )
+        ArchivedMessage.objects.filter(id__in=chunk).update(archive_transaction=archive_transaction)
+
         move_related_objects_to_archive(chunk)
         delete_messages(chunk)
         message_count += len(chunk)
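
Taken together, the two hunks above imply the following per-batch flow inside run_archiving_in_chunks: each chunk is archived under its own ArchiveTransaction, and the transactionality comment suggests the whole batch is wrapped in a single database transaction. A rough sketch only, for orientation; the transaction.atomic() placement and anything not visible in the hunks is an assumption, and the names used come from the module's imports shown above:

    from django.db import transaction  # atomic() wrapping assumed, not shown in this diff

    def run_archiving_in_chunks(message_id_chunks: Iterator[List[int]], realm: Realm) -> int:
        message_count = 0
        while True:
            with transaction.atomic():  # a batch is fully archived-and-deleted, or not at all
                try:
                    chunk = next(message_id_chunks)  # runs the INSERT ... SELECT for this batch
                except StopIteration:
                    break

                # New in this commit: tag the just-archived messages with a per-batch
                # ArchiveTransaction instead of relying on per-row archive_timestamp values.
                archive_transaction = ArchiveTransaction.objects.create(
                    type=ArchiveTransaction.RETENTION_POLICY_BASED, realm=realm)
                ArchivedMessage.objects.filter(id__in=chunk).update(
                    archive_transaction=archive_transaction)

                move_related_objects_to_archive(chunk)
                delete_messages(chunk)
                message_count += len(chunk)

        return message_count
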
@@ -251,16 +255,16 @@ def run_archiving_in_chunks(message_id_chunks: Iterator[List[int]]) -> int:

     return message_count

 def archive_messages_by_recipient(recipient: Recipient, message_retention_days: int,
-                                  chunk_size: int=MESSAGE_BATCH_SIZE) -> int:
+                                  realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> int:
     message_id_chunks = move_expired_messages_to_archive_by_recipient(recipient, message_retention_days,
                                                                       chunk_size)
-    return run_archiving_in_chunks(message_id_chunks)
+    return run_archiving_in_chunks(message_id_chunks, realm)

 def archive_personal_and_huddle_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
     logger.info("Archiving personal and huddle messages for realm " + realm.string_id)
     message_id_chunks = move_expired_personal_and_huddle_messages_to_archive(realm, chunk_size)
-    message_count = run_archiving_in_chunks(message_id_chunks)
+    message_count = run_archiving_in_chunks(message_id_chunks, realm)
     logger.info("Done. Archived {} messages".format(message_count))
@@ -284,7 +288,7 @@ def archive_stream_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) ->
     message_count = 0
     for recipient in recipients:
         message_count += archive_messages_by_recipient(
-            recipient, retention_policy_dict[recipient.type_id], chunk_size
+            recipient, retention_policy_dict[recipient.type_id], realm, chunk_size
         )

     logger.info("Done. Archived {} messages.".format(message_count))
@@ -306,7 +310,9 @@ def move_messages_to_archive(message_ids: List[int]) -> None:
     if not messages:
         raise Message.DoesNotExist

-    ArchivedMessage.objects.bulk_create([ArchivedMessage(**message) for message in messages])
+    archive_transaction = ArchiveTransaction.objects.create(type=ArchiveTransaction.MANUAL)
+    ArchivedMessage.objects.bulk_create([ArchivedMessage(archive_transaction=archive_transaction,
+                                                         **message) for message in messages])
     move_related_objects_to_archive(message_ids)

     # Remove data from main tables
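
One payoff hinted at in the commit message: because every archived row now points at the ArchiveTransaction it was created under, a later restore path can address a whole batch at once instead of guessing by timestamp. A purely hypothetical illustration, not part of this commit, using only the archive_transaction field introduced here:

    def archived_message_ids_in_batch(archive_transaction: ArchiveTransaction) -> List[int]:
        # Hypothetical helper for illustration only: select everything archived in one batch.
        return list(ArchivedMessage.objects.filter(archive_transaction=archive_transaction)
                    .values_list("id", flat=True))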