Files
zulip/zerver/management/commands/export_usermessage_batch.py
Mateusz Mandera 9a49b6a62c export: Plumb consented_user_ids to export_usermessage_batch in a file.
This allows us to get rid of the call to `get_consented_user_ids` in
`fetch_usermessages`. Now it's only called at the beginning of the
export, eliminating the redundant DB query and also removing the
potential for data consistency issues if some users change their
consent setting after the export starts.

Now the full export process operates with a single snapshot of these
consenting user ids.

These IDs need to be passed through a file rather than as normal
arguments, because export_usermessage_batch is a separate management
command that runs in subprocesses during the export.
2025-03-28 17:44:28 -07:00

62 lines
2.3 KiB
Python

import glob
import logging
import os
from argparse import ArgumentParser
from typing import Any
import orjson
from typing_extensions import override
from zerver.lib.export import export_usermessages_batch
from zerver.lib.management import ZulipBaseCommand
class Command(ZulipBaseCommand):
    """Worker command that turns ``messages-*.json.partial`` batches into final ``.json`` files.

    Several copies of this command may run against the same ``--path``
    directory; each one claims batch files by renaming them (see ``handle``),
    so a batch claimed by one worker is skipped by the others.
    """

    help = """UserMessage fetching helper for export.py"""

    @override
    def add_arguments(self, parser: ArgumentParser) -> None:
        """Register the --path, --thread and --export-full-with-consent options."""
        parser.add_argument("--path", help="Path to find messages.json archives")
        parser.add_argument("--thread", help="Thread ID")
        parser.add_argument(
            "--export-full-with-consent",
            action="store_true",
            help="Whether to export private data of users who consented",
        )

    @override
    def handle(self, *args: Any, **options: Any) -> None:
        """Claim and process every currently unclaimed partial batch under --path.

        For each ``<name>.json.partial`` file this worker manages to claim, it
        calls ``export_usermessages_batch`` to read ``<name>.json.locked`` and
        write ``<name>.json``. If processing fails for any reason, the claim
        is released and the exception is re-raised.
        """
        logging.info("Starting UserMessage batch thread %s", options["thread"])
        path = options["path"]
        partial_suffix = ".json.partial"
        # Escape the directory part so glob metacharacters in it (e.g. "[")
        # are matched literally; only the file-name part is a pattern.
        files = set(glob.glob(os.path.join(glob.escape(path), "messages-*.json.partial")))

        export_full_with_consent = options["export_full_with_consent"]
        consented_user_ids = None
        if export_full_with_consent:
            # Read the consenting-user snapshot from a file in the export
            # directory rather than querying it here, so every batch worker
            # uses the same set of user IDs.
            consented_user_ids_path = os.path.join(path, "consented_user_ids.json")
            assert os.path.exists(consented_user_ids_path)
            with open(consented_user_ids_path, "rb") as f:
                consented_user_ids = set(orjson.loads(f.read()))

        for partial_path in files:
            # Every glob match ends with partial_suffix, so slicing it off is
            # exact. Unlike str.replace, it never rewrites a ".json.partial"
            # that happens to occur in a directory component of `path`.
            base_path = partial_path[: -len(partial_suffix)]
            locked_path = base_path + ".json.locked"
            output_path = base_path + ".json"
            try:
                # Renaming acts as the claim: if another worker has already
                # renamed this file, the source no longer exists and we move on.
                os.rename(partial_path, locked_path)
            except FileNotFoundError:
                # Already claimed by another process
                continue
            logging.info("Thread %s processing %s", options["thread"], output_path)
            try:
                export_usermessages_batch(
                    locked_path,
                    output_path,
                    export_full_with_consent,
                    consented_user_ids=consented_user_ids,
                )
            except BaseException:
                # Put the item back in the free pool when we fail. BaseException
                # (not just Exception) so that KeyboardInterrupt / SystemExit also
                # release the claim before propagating.
                os.rename(locked_path, partial_path)
                raise