mirror of
https://github.com/zulip/zulip.git
synced 2025-11-04 14:03:30 +00:00
retention: Log progress through the archiving process.
This commit is contained in:
committed by
Tim Abbott
parent
e3c7a5d896
commit
cbee5beeac
@@ -5,6 +5,8 @@ from django.conf import settings
|
|||||||
from django.db import connection, transaction
|
from django.db import connection, transaction
|
||||||
from django.db.models import Q
|
from django.db.models import Q
|
||||||
from django.utils.timezone import now as timezone_now
|
from django.utils.timezone import now as timezone_now
|
||||||
|
|
||||||
|
from zerver.lib.logging_util import log_to_file
|
||||||
from zerver.models import (Message, UserMessage, ArchivedMessage, ArchivedUserMessage, Realm,
|
from zerver.models import (Message, UserMessage, ArchivedMessage, ArchivedUserMessage, Realm,
|
||||||
Attachment, ArchivedAttachment, Reaction, ArchivedReaction,
|
Attachment, ArchivedAttachment, Reaction, ArchivedReaction,
|
||||||
SubMessage, ArchivedSubMessage, Recipient, Stream,
|
SubMessage, ArchivedSubMessage, Recipient, Stream,
|
||||||
@@ -12,6 +14,11 @@ from zerver.models import (Message, UserMessage, ArchivedMessage, ArchivedUserMe
|
|||||||
|
|
||||||
from typing import Any, Dict, List
|
from typing import Any, Dict, List
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logger = logging.getLogger('zulip.retention')
|
||||||
|
log_to_file(logger, settings.RETENTION_LOG_PATH)
|
||||||
|
|
||||||
MESSAGE_BATCH_SIZE = 1000
|
MESSAGE_BATCH_SIZE = 1000
|
||||||
|
|
||||||
models_with_message_key = [
|
models_with_message_key = [
|
||||||
@@ -196,6 +203,7 @@ def delete_messages(msg_ids: List[int]) -> None:
|
|||||||
Message.objects.filter(id__in=msg_ids).delete()
|
Message.objects.filter(id__in=msg_ids).delete()
|
||||||
|
|
||||||
def delete_expired_attachments(realm: Realm) -> None:
|
def delete_expired_attachments(realm: Realm) -> None:
|
||||||
|
logger.info("Cleaning up attachments for realm " + realm.string_id)
|
||||||
Attachment.objects.filter(
|
Attachment.objects.filter(
|
||||||
messages__isnull=True,
|
messages__isnull=True,
|
||||||
realm_id=realm.id,
|
realm_id=realm.id,
|
||||||
@@ -208,20 +216,31 @@ def move_related_objects_to_archive(msg_ids: List[int]) -> None:
|
|||||||
move_attachments_message_rows_to_archive(msg_ids)
|
move_attachments_message_rows_to_archive(msg_ids)
|
||||||
|
|
||||||
def archive_messages_by_recipient(recipient: Recipient, message_retention_days: int,
|
def archive_messages_by_recipient(recipient: Recipient, message_retention_days: int,
|
||||||
chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
|
chunk_size: int=MESSAGE_BATCH_SIZE) -> int:
|
||||||
message_id_chunks = move_expired_messages_to_archive_by_recipient(recipient, message_retention_days,
|
message_id_chunks = move_expired_messages_to_archive_by_recipient(recipient, message_retention_days,
|
||||||
chunk_size)
|
chunk_size)
|
||||||
|
message_count = 0
|
||||||
for chunk in message_id_chunks:
|
for chunk in message_id_chunks:
|
||||||
move_related_objects_to_archive(chunk)
|
move_related_objects_to_archive(chunk)
|
||||||
delete_messages(chunk)
|
delete_messages(chunk)
|
||||||
|
message_count += len(chunk)
|
||||||
|
|
||||||
|
return message_count
|
||||||
|
|
||||||
def archive_personal_and_huddle_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
|
def archive_personal_and_huddle_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
|
||||||
|
logger.info("Archiving personal and huddle messages for realm " + realm.string_id)
|
||||||
|
|
||||||
message_id_chunks = move_expired_personal_and_huddle_messages_to_archive(realm, chunk_size)
|
message_id_chunks = move_expired_personal_and_huddle_messages_to_archive(realm, chunk_size)
|
||||||
|
message_count = 0
|
||||||
for chunk in message_id_chunks:
|
for chunk in message_id_chunks:
|
||||||
move_related_objects_to_archive(chunk)
|
move_related_objects_to_archive(chunk)
|
||||||
delete_messages(chunk)
|
delete_messages(chunk)
|
||||||
|
message_count += len(chunk)
|
||||||
|
|
||||||
|
logger.info("Done. Archived {} messages".format(message_count))
|
||||||
|
|
||||||
def archive_stream_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
|
def archive_stream_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
|
||||||
|
logger.info("Archiving stream messages for realm " + realm.string_id)
|
||||||
# We don't archive, if the stream has message_retention_days set to -1,
|
# We don't archive, if the stream has message_retention_days set to -1,
|
||||||
# or if neither the stream nor the realm have a retention policy.
|
# or if neither the stream nor the realm have a retention policy.
|
||||||
streams = Stream.objects.exclude(message_retention_days=-1).filter(
|
streams = Stream.objects.exclude(message_retention_days=-1).filter(
|
||||||
@@ -237,10 +256,17 @@ def archive_stream_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) ->
|
|||||||
retention_policy_dict[stream.id] = stream.realm.message_retention_days
|
retention_policy_dict[stream.id] = stream.realm.message_retention_days
|
||||||
|
|
||||||
recipients = get_stream_recipients([stream.id for stream in streams])
|
recipients = get_stream_recipients([stream.id for stream in streams])
|
||||||
|
message_count = 0
|
||||||
for recipient in recipients:
|
for recipient in recipients:
|
||||||
archive_messages_by_recipient(recipient, retention_policy_dict[recipient.type_id], chunk_size)
|
message_count += archive_messages_by_recipient(
|
||||||
|
recipient, retention_policy_dict[recipient.type_id], chunk_size
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info("Done. Archived {} messages.".format(message_count))
|
||||||
|
|
||||||
def archive_messages(chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
|
def archive_messages(chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
|
||||||
|
logger.info("Starting the archiving process with chunk_size {}".format(chunk_size))
|
||||||
|
|
||||||
for realm in Realm.objects.all():
|
for realm in Realm.objects.all():
|
||||||
archive_stream_messages(realm, chunk_size)
|
archive_stream_messages(realm, chunk_size)
|
||||||
if realm.message_retention_days:
|
if realm.message_retention_days:
|
||||||
|
|||||||
@@ -1065,6 +1065,7 @@ ZULIP_PATHS = [
|
|||||||
("TRACEMALLOC_DUMP_DIR", "/var/log/zulip/tracemalloc"),
|
("TRACEMALLOC_DUMP_DIR", "/var/log/zulip/tracemalloc"),
|
||||||
("SCHEDULED_MESSAGE_DELIVERER_LOG_PATH",
|
("SCHEDULED_MESSAGE_DELIVERER_LOG_PATH",
|
||||||
"/var/log/zulip/scheduled_message_deliverer.log"),
|
"/var/log/zulip/scheduled_message_deliverer.log"),
|
||||||
|
("RETENTION_LOG_PATH", "/var/log/zulip/message_retention.log"),
|
||||||
]
|
]
|
||||||
|
|
||||||
# The Event log basically logs most significant database changes,
|
# The Event log basically logs most significant database changes,
|
||||||
@@ -1283,6 +1284,10 @@ LOGGING = {
|
|||||||
'zulip.queue': {
|
'zulip.queue': {
|
||||||
'level': 'WARNING',
|
'level': 'WARNING',
|
||||||
},
|
},
|
||||||
|
'zulip.retention': {
|
||||||
|
'handlers': ['file', 'errors_file'],
|
||||||
|
'propagate': False,
|
||||||
|
},
|
||||||
'zulip.soft_deactivation': {
|
'zulip.soft_deactivation': {
|
||||||
'handlers': ['file', 'errors_file'],
|
'handlers': ['file', 'errors_file'],
|
||||||
'propagate': False,
|
'propagate': False,
|
||||||
|
|||||||
Reference in New Issue
Block a user