diff --git a/zerver/lib/export.py b/zerver/lib/export.py index 38e83e5deb..8bf0577eda 100644 --- a/zerver/lib/export.py +++ b/zerver/lib/export.py @@ -1413,6 +1413,7 @@ def fetch_usermessages( user_profile_ids: set[int], message_filename: Path, export_full_with_consent: bool, + consented_user_ids: set[int] | None = None, ) -> list[Record]: # UserMessage export security rule: You can export UserMessages # for the messages you exported for the users in your realm. @@ -1420,7 +1421,7 @@ def fetch_usermessages( user_profile__realm=realm, message_id__in=message_ids ) if export_full_with_consent: - consented_user_ids = get_consented_user_ids(realm) + assert consented_user_ids is not None user_profile_ids = consented_user_ids & user_profile_ids user_message_chunk = [] for user_message in user_message_query: @@ -1435,7 +1436,10 @@ def fetch_usermessages( def export_usermessages_batch( - input_path: Path, output_path: Path, export_full_with_consent: bool + input_path: Path, + output_path: Path, + export_full_with_consent: bool, + consented_user_ids: set[int] | None = None, ) -> None: """As part of the system for doing parallel exports, this runs on one batch of Message objects and adds the corresponding UserMessage @@ -1453,7 +1457,12 @@ def export_usermessages_batch( user_profile_ids = set(input_data["zerver_userprofile_ids"]) realm = Realm.objects.get(id=input_data["realm_id"]) zerver_usermessage_data = fetch_usermessages( - realm, message_ids, user_profile_ids, output_path, export_full_with_consent + realm, + message_ids, + user_profile_ids, + output_path, + export_full_with_consent, + consented_user_ids=consented_user_ids, ) output_data: TableData = dict( @@ -2270,6 +2279,7 @@ def do_export_realm( threads=threads, output_dir=output_dir, export_full_with_consent=export_type == RealmExport.EXPORT_FULL_WITH_CONSENT, + exportable_user_ids=exportable_user_ids, ) do_common_export_processes(output_dir) @@ -2329,11 +2339,21 @@ def create_soft_link(source: Path, in_progress: bool = True) -> None: def launch_user_message_subprocesses( - threads: int, output_dir: Path, export_full_with_consent: bool + threads: int, + output_dir: Path, + export_full_with_consent: bool, + exportable_user_ids: set[int] | None, ) -> None: logging.info("Launching %d PARALLEL subprocesses to export UserMessage rows", threads) pids = {} + if export_full_with_consent: + assert exportable_user_ids is not None + consented_user_ids_filepath = os.path.join(output_dir, "consented_user_ids.json") + with open(consented_user_ids_filepath, "wb") as f: + f.write(orjson.dumps(list(exportable_user_ids))) + logging.info("Created consented_user_ids.json file.") + for shard_id in range(threads): arguments = [ os.path.join(settings.DEPLOY_ROOT, "manage.py"), diff --git a/zerver/management/commands/export_usermessage_batch.py b/zerver/management/commands/export_usermessage_batch.py index 33724e1d86..2e19c9ada4 100644 --- a/zerver/management/commands/export_usermessage_batch.py +++ b/zerver/management/commands/export_usermessage_batch.py @@ -4,6 +4,7 @@ import os from argparse import ArgumentParser from typing import Any +import orjson from typing_extensions import override from zerver.lib.export import export_usermessages_batch @@ -26,7 +27,18 @@ class Command(ZulipBaseCommand): @override def handle(self, *args: Any, **options: Any) -> None: logging.info("Starting UserMessage batch thread %s", options["thread"]) - files = set(glob.glob(os.path.join(options["path"], "messages-*.json.partial"))) + path = options["path"] + files = set(glob.glob(os.path.join(path, "messages-*.json.partial"))) + + export_full_with_consent = options["export_full_with_consent"] + consented_user_ids = None + if export_full_with_consent: + consented_user_ids_path = os.path.join(path, "consented_user_ids.json") + assert os.path.exists(consented_user_ids_path) + + with open(consented_user_ids_path, "rb") as f: + consented_user_ids = set(orjson.loads(f.read())) + for partial_path in files: locked_path = partial_path.replace(".json.partial", ".json.locked") output_path = partial_path.replace(".json.partial", ".json") @@ -38,7 +50,10 @@ class Command(ZulipBaseCommand): logging.info("Thread %s processing %s", options["thread"], output_path) try: export_usermessages_batch( - locked_path, output_path, options["export_full_with_consent"] + locked_path, + output_path, + export_full_with_consent, + consented_user_ids=consented_user_ids, ) except BaseException: # Put the item back in the free pool when we fail