retention: Add functions for restoring archived data.

Functions for restoring archived data are added and existing tests are
expanded to restore data they archived and check correctness.
This commit is contained in:
Mateusz Mandera
2019-06-24 17:19:22 +02:00
committed by Tim Abbott
parent 9acd3b0f46
commit 6e46c6d752
2 changed files with 164 additions and 0 deletions

View File

@@ -329,3 +329,96 @@ def move_messages_to_archive(message_ids: List[int], chunk_size: int=MESSAGE_BAT
# Clean up attachments:
archived_attachments = ArchivedAttachment.objects.filter(messages__id__in=message_ids).distinct()
Attachment.objects.filter(messages__isnull=True, id__in=archived_attachments).delete()
def restore_messages_from_archive(archive_transaction_id: int) -> List[int]:
query = """
INSERT INTO zerver_message ({dst_fields})
SELECT {src_fields}
FROM zerver_archivedmessage
LEFT JOIN zerver_message ON zerver_archivedmessage.id = zerver_message.id
WHERE zerver_archivedmessage.archive_transaction_id = {archive_transaction_id}
AND zerver_message.id is NULL
RETURNING id
"""
return move_rows(Message, query, src_db_table='zerver_archivedmessage', returning_id=True,
archive_transaction_id=archive_transaction_id)
def restore_models_with_message_key_from_archive(archive_transaction_id: int) -> None:
for model in models_with_message_key:
query = """
INSERT INTO {table_name} ({dst_fields})
SELECT {src_fields}
FROM {archive_table_name}
INNER JOIN zerver_archivedmessage ON {archive_table_name}.message_id = zerver_archivedmessage.id
LEFT JOIN {table_name} ON {archive_table_name}.id = {table_name}.id
WHERE zerver_archivedmessage.archive_transaction_id = {archive_transaction_id}
AND {table_name}.id IS NULL
"""
move_rows(model['class'], query, src_db_table=model['archive_table_name'],
table_name=model['table_name'],
archive_transaction_id=archive_transaction_id,
archive_table_name=model['archive_table_name'])
def restore_attachments_from_archive(archive_transaction_id: int) -> None:
query = """
INSERT INTO zerver_attachment ({dst_fields})
SELECT {src_fields}
FROM zerver_archivedattachment
INNER JOIN zerver_archivedattachment_messages
ON zerver_archivedattachment_messages.archivedattachment_id = zerver_archivedattachment.id
INNER JOIN zerver_archivedmessage
ON zerver_archivedattachment_messages.archivedmessage_id = zerver_archivedmessage.id
LEFT JOIN zerver_attachment ON zerver_archivedattachment.id = zerver_attachment.id
WHERE zerver_archivedmessage.archive_transaction_id = {archive_transaction_id}
AND zerver_attachment.id IS NULL
GROUP BY zerver_archivedattachment.id
"""
move_rows(Attachment, query, src_db_table='zerver_archivedattachment',
archive_transaction_id=archive_transaction_id)
def restore_attachment_messages_from_archive(archive_transaction_id: int) -> None:
query = """
INSERT INTO zerver_attachment_messages (id, attachment_id, message_id)
SELECT zerver_archivedattachment_messages.id,
zerver_archivedattachment_messages.archivedattachment_id,
zerver_archivedattachment_messages.archivedmessage_id
FROM zerver_archivedattachment_messages
INNER JOIN zerver_archivedmessage
ON zerver_archivedattachment_messages.archivedmessage_id = zerver_archivedmessage.id
LEFT JOIN zerver_attachment_messages
ON zerver_archivedattachment_messages.id = zerver_attachment_messages.id
WHERE zerver_archivedmessage.archive_transaction_id = {archive_transaction_id}
AND zerver_attachment_messages.id IS NULL
"""
with connection.cursor() as cursor:
cursor.execute(query.format(archive_transaction_id=archive_transaction_id))
@transaction.atomic
def restore_data_from_archive(archive_transaction: ArchiveTransaction) -> None:
restore_messages_from_archive(archive_transaction.id)
restore_models_with_message_key_from_archive(archive_transaction.id)
restore_attachments_from_archive(archive_transaction.id)
restore_attachment_messages_from_archive(archive_transaction.id)
archive_transaction.restored = True
archive_transaction.save()
def restore_data_from_archive_by_transactions(archive_transactions: List[ArchiveTransaction]) -> None:
# Looping over the list of ids means we're batching the restoration process by the size of the
# transactions:
for archive_transaction in archive_transactions:
restore_data_from_archive(archive_transaction)
def restore_data_from_archive_by_realm(realm: Realm) -> None:
transactions = ArchiveTransaction.objects.exclude(restored=True).filter(realm=realm)
restore_data_from_archive_by_transactions(transactions)
def restore_all_data_from_archive(restore_manual_transactions: bool=True) -> None:
for realm in Realm.objects.all():
restore_data_from_archive_by_realm(realm)
if restore_manual_transactions:
restore_data_from_archive_by_transactions(
ArchiveTransaction.objects.exclude(restored=True).filter(type=ArchiveTransaction.MANUAL)
)