export: Plumb consented_user_ids to export_usermessage_batch in a file.

This allows us to get rid of the call to `get_consented_user_ids` in
`fetch_usermessages`. Now it's only called at the beginning of the
export, eliminating the redundant db query and also resolving the
potential for data consistency issues, if some users change their
consent setting after the export starts.

Now the full export process operates with a single snapshot of these
consenting user ids.

These ids need to be plumbed through via a file rather than normal arg
passing, because this is a separate management command, run in
subprocesses during the export.
This commit is contained in:
Mateusz Mandera
2025-03-07 20:51:51 +08:00
committed by Tim Abbott
parent 747e73470e
commit 13303fd916
2 changed files with 41 additions and 6 deletions

View File

@@ -1413,6 +1413,7 @@ def fetch_usermessages(
user_profile_ids: set[int], user_profile_ids: set[int],
message_filename: Path, message_filename: Path,
export_full_with_consent: bool, export_full_with_consent: bool,
consented_user_ids: set[int] | None = None,
) -> list[Record]: ) -> list[Record]:
# UserMessage export security rule: You can export UserMessages # UserMessage export security rule: You can export UserMessages
# for the messages you exported for the users in your realm. # for the messages you exported for the users in your realm.
@@ -1420,7 +1421,7 @@ def fetch_usermessages(
user_profile__realm=realm, message_id__in=message_ids user_profile__realm=realm, message_id__in=message_ids
) )
if export_full_with_consent: if export_full_with_consent:
consented_user_ids = get_consented_user_ids(realm) assert consented_user_ids is not None
user_profile_ids = consented_user_ids & user_profile_ids user_profile_ids = consented_user_ids & user_profile_ids
user_message_chunk = [] user_message_chunk = []
for user_message in user_message_query: for user_message in user_message_query:
@@ -1435,7 +1436,10 @@ def fetch_usermessages(
def export_usermessages_batch( def export_usermessages_batch(
input_path: Path, output_path: Path, export_full_with_consent: bool input_path: Path,
output_path: Path,
export_full_with_consent: bool,
consented_user_ids: set[int] | None = None,
) -> None: ) -> None:
"""As part of the system for doing parallel exports, this runs on one """As part of the system for doing parallel exports, this runs on one
batch of Message objects and adds the corresponding UserMessage batch of Message objects and adds the corresponding UserMessage
@@ -1453,7 +1457,12 @@ def export_usermessages_batch(
user_profile_ids = set(input_data["zerver_userprofile_ids"]) user_profile_ids = set(input_data["zerver_userprofile_ids"])
realm = Realm.objects.get(id=input_data["realm_id"]) realm = Realm.objects.get(id=input_data["realm_id"])
zerver_usermessage_data = fetch_usermessages( zerver_usermessage_data = fetch_usermessages(
realm, message_ids, user_profile_ids, output_path, export_full_with_consent realm,
message_ids,
user_profile_ids,
output_path,
export_full_with_consent,
consented_user_ids=consented_user_ids,
) )
output_data: TableData = dict( output_data: TableData = dict(
@@ -2270,6 +2279,7 @@ def do_export_realm(
threads=threads, threads=threads,
output_dir=output_dir, output_dir=output_dir,
export_full_with_consent=export_type == RealmExport.EXPORT_FULL_WITH_CONSENT, export_full_with_consent=export_type == RealmExport.EXPORT_FULL_WITH_CONSENT,
exportable_user_ids=exportable_user_ids,
) )
do_common_export_processes(output_dir) do_common_export_processes(output_dir)
@@ -2329,11 +2339,21 @@ def create_soft_link(source: Path, in_progress: bool = True) -> None:
def launch_user_message_subprocesses( def launch_user_message_subprocesses(
threads: int, output_dir: Path, export_full_with_consent: bool threads: int,
output_dir: Path,
export_full_with_consent: bool,
exportable_user_ids: set[int] | None,
) -> None: ) -> None:
logging.info("Launching %d PARALLEL subprocesses to export UserMessage rows", threads) logging.info("Launching %d PARALLEL subprocesses to export UserMessage rows", threads)
pids = {} pids = {}
if export_full_with_consent:
assert exportable_user_ids is not None
consented_user_ids_filepath = os.path.join(output_dir, "consented_user_ids.json")
with open(consented_user_ids_filepath, "wb") as f:
f.write(orjson.dumps(list(exportable_user_ids)))
logging.info("Created consented_user_ids.json file.")
for shard_id in range(threads): for shard_id in range(threads):
arguments = [ arguments = [
os.path.join(settings.DEPLOY_ROOT, "manage.py"), os.path.join(settings.DEPLOY_ROOT, "manage.py"),

View File

@@ -4,6 +4,7 @@ import os
from argparse import ArgumentParser from argparse import ArgumentParser
from typing import Any from typing import Any
import orjson
from typing_extensions import override from typing_extensions import override
from zerver.lib.export import export_usermessages_batch from zerver.lib.export import export_usermessages_batch
@@ -26,7 +27,18 @@ class Command(ZulipBaseCommand):
@override @override
def handle(self, *args: Any, **options: Any) -> None: def handle(self, *args: Any, **options: Any) -> None:
logging.info("Starting UserMessage batch thread %s", options["thread"]) logging.info("Starting UserMessage batch thread %s", options["thread"])
files = set(glob.glob(os.path.join(options["path"], "messages-*.json.partial"))) path = options["path"]
files = set(glob.glob(os.path.join(path, "messages-*.json.partial")))
export_full_with_consent = options["export_full_with_consent"]
consented_user_ids = None
if export_full_with_consent:
consented_user_ids_path = os.path.join(path, "consented_user_ids.json")
assert os.path.exists(consented_user_ids_path)
with open(consented_user_ids_path, "rb") as f:
consented_user_ids = set(orjson.loads(f.read()))
for partial_path in files: for partial_path in files:
locked_path = partial_path.replace(".json.partial", ".json.locked") locked_path = partial_path.replace(".json.partial", ".json.locked")
output_path = partial_path.replace(".json.partial", ".json") output_path = partial_path.replace(".json.partial", ".json")
@@ -38,7 +50,10 @@ class Command(ZulipBaseCommand):
logging.info("Thread %s processing %s", options["thread"], output_path) logging.info("Thread %s processing %s", options["thread"], output_path)
try: try:
export_usermessages_batch( export_usermessages_batch(
locked_path, output_path, options["export_full_with_consent"] locked_path,
output_path,
export_full_with_consent,
consented_user_ids=consented_user_ids,
) )
except BaseException: except BaseException:
# Put the item back in the free pool when we fail # Put the item back in the free pool when we fail