mirror of
https://github.com/zulip/zulip.git
synced 2025-11-03 21:43:21 +00:00
retention: Optimize fetching of realms and streams with retention policy.
This commit is contained in:
committed by
Tim Abbott
parent
50d8d61d3c
commit
812ac4714f
@@ -12,7 +12,7 @@ from zerver.models import (Message, UserMessage, ArchivedUserMessage, Realm,
|
|||||||
SubMessage, ArchivedSubMessage, Recipient, Stream, ArchiveTransaction,
|
SubMessage, ArchivedSubMessage, Recipient, Stream, ArchiveTransaction,
|
||||||
get_user_including_cross_realm)
|
get_user_including_cross_realm)
|
||||||
|
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
@@ -303,15 +303,7 @@ def archive_personal_and_huddle_messages(realm: Realm, chunk_size: int=MESSAGE_B
|
|||||||
message_count = move_expired_personal_and_huddle_messages_to_archive(realm, chunk_size)
|
message_count = move_expired_personal_and_huddle_messages_to_archive(realm, chunk_size)
|
||||||
logger.info("Done. Archived %s messages", message_count)
|
logger.info("Done. Archived %s messages", message_count)
|
||||||
|
|
||||||
def archive_stream_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
|
def archive_stream_messages(realm: Realm, streams: List[Stream], chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
|
||||||
# We don't archive, if the stream has message_retention_days set to -1,
|
|
||||||
# or if neither the stream nor the realm have a retention policy.
|
|
||||||
query = Stream.objects.select_related("recipient").filter(
|
|
||||||
realm_id=realm.id).exclude(message_retention_days=-1)
|
|
||||||
if not realm.message_retention_days:
|
|
||||||
query = query.exclude(message_retention_days__isnull=True)
|
|
||||||
|
|
||||||
streams = list(query)
|
|
||||||
if not streams:
|
if not streams:
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -337,18 +329,57 @@ def archive_stream_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) ->
|
|||||||
def archive_messages(chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
|
def archive_messages(chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
|
||||||
logger.info("Starting the archiving process with chunk_size %s", chunk_size)
|
logger.info("Starting the archiving process with chunk_size %s", chunk_size)
|
||||||
|
|
||||||
# We exclude SYSTEM_BOT_REALM here because the logic for archiving
|
for realm, streams in get_realms_and_streams_for_archiving():
|
||||||
# private messages and huddles isn't designed to correctly handle
|
archive_stream_messages(realm, streams, chunk_size)
|
||||||
# that realm. In practice, excluding it has no effect, because
|
|
||||||
# that realm is expected to always have message_retention_days=None.
|
|
||||||
for realm in Realm.objects.exclude(string_id=settings.SYSTEM_BOT_REALM):
|
|
||||||
archive_stream_messages(realm, chunk_size)
|
|
||||||
if realm.message_retention_days:
|
if realm.message_retention_days:
|
||||||
archive_personal_and_huddle_messages(realm, chunk_size)
|
archive_personal_and_huddle_messages(realm, chunk_size)
|
||||||
|
|
||||||
# Messages have been archived for the realm, now we can clean up attachments:
|
# Messages have been archived for the realm, now we can clean up attachments:
|
||||||
delete_expired_attachments(realm)
|
delete_expired_attachments(realm)
|
||||||
|
|
||||||
|
def get_realms_and_streams_for_archiving() -> List[Tuple[Realm, List[Stream]]]:
|
||||||
|
"""
|
||||||
|
This function constructs a list of (realm, streams_of_the_realm) tuples
|
||||||
|
where each realm is a Realm that requires calling the archiving functions on it,
|
||||||
|
and streams_of_the_realm is a list of streams of the realm to call archive_stream_messages with.
|
||||||
|
|
||||||
|
The purpose of this is performance - for servers with thousands of realms, it is important
|
||||||
|
to fetch all this data in bulk.
|
||||||
|
"""
|
||||||
|
|
||||||
|
realm_id_to_realm = {}
|
||||||
|
realm_id_to_streams_list: Dict[int, List[Stream]] = {}
|
||||||
|
|
||||||
|
# All realms with a retention policy set qualify for archiving:
|
||||||
|
for realm in Realm.objects.filter(message_retention_days__isnull=False):
|
||||||
|
realm_id_to_realm[realm.id] = realm
|
||||||
|
realm_id_to_streams_list[realm.id] = []
|
||||||
|
|
||||||
|
# Now we find all streams that require archiving.
|
||||||
|
# First category are streams in retention-enabled realms,
|
||||||
|
# that don't have retention explicitly disabled (through the value -1).
|
||||||
|
query_one = Stream.objects.exclude(message_retention_days=-1) \
|
||||||
|
.filter(realm__message_retention_days__isnull=False) \
|
||||||
|
.select_related('realm', 'recipient')
|
||||||
|
# Second category are streams that are in realms without a realm-wide retention policy,
|
||||||
|
# but have their own stream-specific policy enabled.
|
||||||
|
query_two = Stream.objects.filter(realm__message_retention_days__isnull=True) \
|
||||||
|
.exclude(message_retention_days__isnull=True) \
|
||||||
|
.exclude(message_retention_days=-1) \
|
||||||
|
.select_related('realm', 'recipient')
|
||||||
|
query = query_one.union(query_two)
|
||||||
|
|
||||||
|
for stream in query:
|
||||||
|
realm = stream.realm
|
||||||
|
realm_id_to_realm[realm.id] = realm
|
||||||
|
if realm.id not in realm_id_to_streams_list:
|
||||||
|
realm_id_to_streams_list[realm.id] = []
|
||||||
|
|
||||||
|
realm_id_to_streams_list[realm.id].append(stream)
|
||||||
|
|
||||||
|
return [(realm_id_to_realm[realm_id], realm_id_to_streams_list[realm_id])
|
||||||
|
for realm_id in realm_id_to_realm]
|
||||||
|
|
||||||
def move_messages_to_archive(message_ids: List[int], chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
|
def move_messages_to_archive(message_ids: List[int], chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
|
||||||
query = SQL("""
|
query = SQL("""
|
||||||
INSERT INTO zerver_archivedmessage ({dst_fields}, archive_transaction_id)
|
INSERT INTO zerver_archivedmessage ({dst_fields}, archive_transaction_id)
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Dict, List, Optional, Tuple
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
@@ -18,6 +18,7 @@ from zerver.lib.retention import (
|
|||||||
move_messages_to_archive,
|
move_messages_to_archive,
|
||||||
restore_all_data_from_archive,
|
restore_all_data_from_archive,
|
||||||
clean_archived_data,
|
clean_archived_data,
|
||||||
|
get_realms_and_streams_for_archiving,
|
||||||
)
|
)
|
||||||
from zerver.tornado.event_queue import send_event
|
from zerver.tornado.event_queue import send_event
|
||||||
|
|
||||||
@@ -745,6 +746,93 @@ class TestCleaningArchive(ArchiveMessagesTestingBase):
|
|||||||
for message in ArchivedMessage.objects.all():
|
for message in ArchivedMessage.objects.all():
|
||||||
self.assertEqual(message.archive_transaction_id, remaining_transactions[0].id)
|
self.assertEqual(message.archive_transaction_id, remaining_transactions[0].id)
|
||||||
|
|
||||||
|
|
||||||
|
class TestGetRealmAndStreamsForArchiving(ZulipTestCase):
|
||||||
|
def fix_ordering_of_result(self, result: List[Tuple[Realm, List[Stream]]]) -> None:
|
||||||
|
"""
|
||||||
|
This is a helper for giving the struture returned by get_realms_and_streams_for_archiving
|
||||||
|
a consistent ordering.
|
||||||
|
"""
|
||||||
|
# Sort the list of tuples by realm id:
|
||||||
|
result.sort(key=lambda x: x[0].id)
|
||||||
|
|
||||||
|
# Now we sort the lists of streams in each tuple:
|
||||||
|
for realm, streams_list in result:
|
||||||
|
streams_list.sort(key=lambda stream: stream.id)
|
||||||
|
|
||||||
|
def simple_get_realms_and_streams_for_archiving(self) -> List[Tuple[Realm, List[Stream]]]:
|
||||||
|
"""
|
||||||
|
This is an implementation of the function we're testing, but using the obvious,
|
||||||
|
unoptimized algorithm. We can use this for additional verification of correctness,
|
||||||
|
by comparing the output of the two implementations.
|
||||||
|
"""
|
||||||
|
|
||||||
|
result = []
|
||||||
|
for realm in Realm.objects.all():
|
||||||
|
if realm.message_retention_days is not None:
|
||||||
|
streams = Stream.objects.filter(realm=realm).exclude(message_retention_days=-1)
|
||||||
|
result.append((realm, list(streams)))
|
||||||
|
else:
|
||||||
|
streams = Stream.objects.filter(realm=realm).exclude(message_retention_days__isnull=True) \
|
||||||
|
.exclude(message_retention_days=-1)
|
||||||
|
if streams.exists():
|
||||||
|
result.append((realm, list(streams)))
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def test_get_realms_and_streams_for_archiving(self) -> None:
|
||||||
|
zulip_realm = get_realm("zulip")
|
||||||
|
zulip_realm.message_retention_days = 10
|
||||||
|
zulip_realm.save()
|
||||||
|
|
||||||
|
verona = get_stream("Verona", zulip_realm)
|
||||||
|
verona.message_retention_days = -1 # Block archiving for this stream
|
||||||
|
verona.save()
|
||||||
|
denmark = get_stream("Denmark", zulip_realm)
|
||||||
|
denmark.message_retention_days = 1
|
||||||
|
denmark.save()
|
||||||
|
|
||||||
|
zephyr_realm = get_realm("zephyr")
|
||||||
|
zephyr_realm.message_retention_days = None
|
||||||
|
zephyr_realm.save()
|
||||||
|
self.make_stream("normal stream", realm=zephyr_realm)
|
||||||
|
|
||||||
|
archiving_blocked_zephyr_stream = self.make_stream("no archiving", realm=zephyr_realm)
|
||||||
|
archiving_blocked_zephyr_stream.message_retention_days = -1
|
||||||
|
archiving_blocked_zephyr_stream.save()
|
||||||
|
|
||||||
|
archiving_enabled_zephyr_stream = self.make_stream("with archiving", realm=zephyr_realm)
|
||||||
|
archiving_enabled_zephyr_stream.message_retention_days = 1
|
||||||
|
archiving_enabled_zephyr_stream.save()
|
||||||
|
|
||||||
|
Realm.objects.create(string_id="no_archiving", invite_required=False, message_retention_days=None)
|
||||||
|
empty_realm_with_archiving = Realm.objects.create(string_id="with_archiving", invite_required=False,
|
||||||
|
message_retention_days=1)
|
||||||
|
|
||||||
|
# We construct a list representing how the result of get_realms_and_streams_for_archiving should be.
|
||||||
|
# One nuisance is that the ordering of the elements in the result structure is not deterministic,
|
||||||
|
# so we use a helper to order both structures in a consistent manner. This wouldn't be necessary
|
||||||
|
# if python had a true "unordered list" data structure. Set doesn't do the job, because it requires
|
||||||
|
# elements to be hashable.
|
||||||
|
expected_result = [
|
||||||
|
(zulip_realm, list(Stream.objects.filter(realm=zulip_realm).exclude(id=verona.id))),
|
||||||
|
(zephyr_realm, [archiving_enabled_zephyr_stream]),
|
||||||
|
(empty_realm_with_archiving, []),
|
||||||
|
]
|
||||||
|
self.fix_ordering_of_result(expected_result)
|
||||||
|
|
||||||
|
simple_algorithm_result = self.simple_get_realms_and_streams_for_archiving()
|
||||||
|
self.fix_ordering_of_result(simple_algorithm_result)
|
||||||
|
|
||||||
|
result = get_realms_and_streams_for_archiving()
|
||||||
|
self.fix_ordering_of_result(result)
|
||||||
|
|
||||||
|
self.assert_length(result, len(expected_result))
|
||||||
|
self.assertEqual(result, expected_result)
|
||||||
|
|
||||||
|
self.assert_length(result, len(simple_algorithm_result))
|
||||||
|
self.assertEqual(result, simple_algorithm_result)
|
||||||
|
|
||||||
class TestDoDeleteMessages(ZulipTestCase):
|
class TestDoDeleteMessages(ZulipTestCase):
|
||||||
def test_do_delete_messages_multiple(self) -> None:
|
def test_do_delete_messages_multiple(self) -> None:
|
||||||
realm = get_realm("zulip")
|
realm = get_realm("zulip")
|
||||||
|
|||||||
Reference in New Issue
Block a user