diff --git a/zerver/lib/export.py b/zerver/lib/export.py index 63d1c4c973..29ba9ff311 100644 --- a/zerver/lib/export.py +++ b/zerver/lib/export.py @@ -88,6 +88,7 @@ ALL_ZULIP_TABLES = { 'zerver_attachment_messages', 'zerver_archivedreaction', 'zerver_archivedsubmessage', + 'zerver_archivetransaction', 'zerver_botconfigdata', 'zerver_botstoragedata', 'zerver_client', @@ -176,6 +177,7 @@ NON_EXPORTED_TABLES = { 'zerver_archivedattachment_messages', 'zerver_archivedreaction', 'zerver_archivedsubmessage', + 'zerver_archivetransaction', # Social auth tables are not needed post-export, since we don't # use any of this state outside of a direct authentication flow. diff --git a/zerver/lib/retention.py b/zerver/lib/retention.py index 5dc383a765..2f8b28df1d 100644 --- a/zerver/lib/retention.py +++ b/zerver/lib/retention.py @@ -9,7 +9,7 @@ from django.utils.timezone import now as timezone_now from zerver.lib.logging_util import log_to_file from zerver.models import (Message, UserMessage, ArchivedMessage, ArchivedUserMessage, Realm, Attachment, ArchivedAttachment, Reaction, ArchivedReaction, - SubMessage, ArchivedSubMessage, Recipient, Stream, + SubMessage, ArchivedSubMessage, Recipient, Stream, ArchiveTransaction, get_stream_recipients, get_user_including_cross_realm) from typing import Any, Dict, Iterator, List @@ -51,7 +51,6 @@ def move_rows(src_model: Any, raw_query: str, returning_id: bool=False, sql_args = { 'src_fields': ','.join(src_fields), 'dst_fields': ','.join(dst_fields), - 'archive_timestamp': timezone_now() } sql_args.update(kwargs) with connection.cursor() as cursor: @@ -96,8 +95,8 @@ def move_expired_messages_to_archive_by_recipient(recipient: Recipient, # Important: This function is a generator, and you need to iterate # through the Iterator it returns to execute the queries. query = """ - INSERT INTO zerver_archivedmessage ({dst_fields}, archive_timestamp) - SELECT {src_fields}, '{archive_timestamp}' + INSERT INTO zerver_archivedmessage ({dst_fields}) + SELECT {src_fields} FROM zerver_message LEFT JOIN zerver_archivedmessage ON zerver_archivedmessage.id = zerver_message.id WHERE zerver_message.recipient_id = {recipient_id} @@ -126,8 +125,8 @@ def move_expired_personal_and_huddle_messages_to_archive(realm: Realm, # TODO: Remove the "zerver_userprofile.id NOT IN {cross_realm_bot_ids}" clause # once https://github.com/zulip/zulip/issues/11015 is solved. query = """ - INSERT INTO zerver_archivedmessage ({dst_fields}, archive_timestamp) - SELECT {src_fields}, '{archive_timestamp}' + INSERT INTO zerver_archivedmessage ({dst_fields}) + SELECT {src_fields} FROM zerver_message INNER JOIN zerver_recipient ON zerver_recipient.id = zerver_message.recipient_id INNER JOIN zerver_userprofile ON zerver_userprofile.id = zerver_message.sender_id @@ -153,8 +152,8 @@ def move_to_archive_and_delete_models_with_message_key(msg_ids: List[int]) -> No for model in models_with_message_key: query = """ WITH archived_data AS ( - INSERT INTO {archive_table_name} ({dst_fields}, archive_timestamp) - SELECT {src_fields}, '{archive_timestamp}' + INSERT INTO {archive_table_name} ({dst_fields}) + SELECT {src_fields} FROM {table_name} LEFT JOIN {archive_table_name} ON {archive_table_name}.id = {table_name}.id WHERE {table_name}.message_id IN {message_ids} @@ -172,8 +171,8 @@ def move_attachments_to_archive(msg_ids: List[int]) -> None: assert len(msg_ids) > 0 query = """ - INSERT INTO zerver_archivedattachment ({dst_fields}, archive_timestamp) - SELECT {src_fields}, '{archive_timestamp}' + INSERT INTO zerver_archivedattachment ({dst_fields}) + SELECT {src_fields} FROM zerver_attachment INNER JOIN zerver_attachment_messages ON zerver_attachment_messages.attachment_id = zerver_attachment.id @@ -226,7 +225,7 @@ def move_related_objects_to_archive(msg_ids: List[int]) -> None: move_attachments_to_archive(msg_ids) move_attachment_messages_to_archive(msg_ids) -def run_archiving_in_chunks(message_id_chunks: Iterator[List[int]]) -> int: +def run_archiving_in_chunks(message_id_chunks: Iterator[List[int]], realm: Realm) -> int: # This function is carefully designed to achieve our # transactionality goals: A batch of messages is either fully # archived-and-deleted or not transactionally. @@ -244,6 +243,11 @@ def run_archiving_in_chunks(message_id_chunks: Iterator[List[int]]) -> int: except StopIteration: break + archive_transaction = ArchiveTransaction.objects.create( + type=ArchiveTransaction.RETENTION_POLICY_BASED, realm=realm + ) + ArchivedMessage.objects.filter(id__in=chunk).update(archive_transaction=archive_transaction) + move_related_objects_to_archive(chunk) delete_messages(chunk) message_count += len(chunk) @@ -251,16 +255,16 @@ def run_archiving_in_chunks(message_id_chunks: Iterator[List[int]]) -> int: return message_count def archive_messages_by_recipient(recipient: Recipient, message_retention_days: int, - chunk_size: int=MESSAGE_BATCH_SIZE) -> int: + realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> int: message_id_chunks = move_expired_messages_to_archive_by_recipient(recipient, message_retention_days, chunk_size) - return run_archiving_in_chunks(message_id_chunks) + return run_archiving_in_chunks(message_id_chunks, realm) def archive_personal_and_huddle_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> None: logger.info("Archiving personal and huddle messages for realm " + realm.string_id) message_id_chunks = move_expired_personal_and_huddle_messages_to_archive(realm, chunk_size) - message_count = run_archiving_in_chunks(message_id_chunks) + message_count = run_archiving_in_chunks(message_id_chunks, realm) logger.info("Done. Archived {} messages".format(message_count)) @@ -284,7 +288,7 @@ def archive_stream_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> message_count = 0 for recipient in recipients: message_count += archive_messages_by_recipient( - recipient, retention_policy_dict[recipient.type_id], chunk_size + recipient, retention_policy_dict[recipient.type_id], realm, chunk_size ) logger.info("Done. Archived {} messages.".format(message_count)) @@ -306,7 +310,9 @@ def move_messages_to_archive(message_ids: List[int]) -> None: if not messages: raise Message.DoesNotExist - ArchivedMessage.objects.bulk_create([ArchivedMessage(**message) for message in messages]) + archive_transaction = ArchiveTransaction.objects.create(type=ArchiveTransaction.MANUAL) + ArchivedMessage.objects.bulk_create([ArchivedMessage(archive_transaction=archive_transaction, + **message) for message in messages]) move_related_objects_to_archive(message_ids) # Remove data from main tables diff --git a/zerver/migrations/0231_add_archive_transaction_model.py b/zerver/migrations/0231_add_archive_transaction_model.py new file mode 100644 index 0000000000..e08ada7fa8 --- /dev/null +++ b/zerver/migrations/0231_add_archive_transaction_model.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.20 on 2019-06-23 21:20 +from __future__ import unicode_literals + +from django.db import migrations, models +import django.db.models.deletion +import django.utils.timezone + + +class Migration(migrations.Migration): + + dependencies = [ + ('zerver', '0230_rename_to_enable_stream_audible_notifications'), + ] + + operations = [ + migrations.CreateModel( + name='ArchiveTransaction', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('timestamp', models.DateTimeField(db_index=True, default=django.utils.timezone.now)), + ('restored', models.BooleanField(db_index=True, default=False)), + ('type', models.PositiveSmallIntegerField(db_index=True)), + ('realm', models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to='zerver.Realm')), + ], + ), + migrations.RemoveField( + model_name='archivedattachment', + name='archive_timestamp', + ), + migrations.RemoveField( + model_name='archivedmessage', + name='archive_timestamp', + ), + migrations.RemoveField( + model_name='archivedreaction', + name='archive_timestamp', + ), + migrations.RemoveField( + model_name='archivedsubmessage', + name='archive_timestamp', + ), + migrations.RemoveField( + model_name='archivedusermessage', + name='archive_timestamp', + ), + migrations.AddField( + model_name='archivedmessage', + name='archive_transaction', + field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, to='zerver.ArchiveTransaction'), + ), + ] diff --git a/zerver/models.py b/zerver/models.py index 82a75213c1..ce92a0a1df 100644 --- a/zerver/models.py +++ b/zerver/models.py @@ -1519,14 +1519,26 @@ class AbstractMessage(models.Model): return "<%s: %s / %s / %s>" % (self.__class__.__name__, display_recipient, self.subject, self.sender) +class ArchiveTransaction(models.Model): + timestamp = models.DateTimeField(default=timezone_now, db_index=True) # type: datetime.datetime + # Marks if the data archived in this transaction has been restored: + restored = models.BooleanField(default=False, db_index=True) # type: bool + + type = models.PositiveSmallIntegerField(db_index=True) # type: int + # Valid types: + RETENTION_POLICY_BASED = 1 # Archiving was executed due to automated retention policies + MANUAL = 2 # Archiving was run manually, via move_messages_to_archive function + + # ForeignKey to the realm with which objects archived in this transaction are associated. + # If type is set to MANUAL, this should be null. + realm = models.ForeignKey(Realm, null=True, on_delete=CASCADE) # type: Optional[Realm] class ArchivedMessage(AbstractMessage): """Used as a temporary holding place for deleted messages before they are permanently deleted. This is an important part of a robust 'message retention' feature. """ - archive_timestamp = models.DateTimeField(default=timezone_now, db_index=True) # type: datetime.datetime - + archive_transaction = models.ForeignKey(ArchiveTransaction, on_delete=CASCADE, null=True) # type: Optional[ArchiveTransaction] class Message(AbstractMessage): @@ -1671,7 +1683,6 @@ class SubMessage(AbstractSubMessage): class ArchivedSubMessage(AbstractSubMessage): message = models.ForeignKey(ArchivedMessage, on_delete=CASCADE) # type: ArchivedMessage - archive_timestamp = models.DateTimeField(default=timezone_now, db_index=True) # type: datetime.datetime post_save.connect(flush_submessage, sender=SubMessage) @@ -1730,7 +1741,6 @@ class Reaction(AbstractReaction): class ArchivedReaction(AbstractReaction): message = models.ForeignKey(ArchivedMessage, on_delete=CASCADE) # type: ArchivedMessage - archive_timestamp = models.DateTimeField(default=timezone_now, db_index=True) # type: datetime.datetime # Whenever a message is sent, for each user subscribed to the # corresponding Recipient object, we add a row to the UserMessage @@ -1859,8 +1869,6 @@ class ArchivedUserMessage(AbstractUserMessage): a robust 'message retention' feature. """ message = models.ForeignKey(ArchivedMessage, on_delete=CASCADE) # type: Message - archive_timestamp = models.DateTimeField(default=timezone_now, db_index=True) # type: datetime.datetime - class AbstractAttachment(models.Model): file_name = models.TextField(db_index=True) # type: str @@ -1895,10 +1903,8 @@ class ArchivedAttachment(AbstractAttachment): before they are permanently deleted. This is an important part of a robust 'message retention' feature. """ - archive_timestamp = models.DateTimeField(default=timezone_now, db_index=True) # type: datetime.datetime messages = models.ManyToManyField(ArchivedMessage) # type: Manager - class Attachment(AbstractAttachment): messages = models.ManyToManyField(Message) # type: Manager