export: Stop calling processes threads.

This commit is contained in:
Alex Vandiver
2024-01-19 21:30:24 +00:00
committed by Tim Abbott
parent 131580f23c
commit a815261abf
11 changed files with 49 additions and 47 deletions

View File

@@ -590,7 +590,7 @@ def process_avatars(
avatar_list: list[ZerverFieldsT], avatar_list: list[ZerverFieldsT],
avatar_dir: str, avatar_dir: str,
realm_id: int, realm_id: int,
threads: int, processes: int,
size_url_suffix: str = "", size_url_suffix: str = "",
) -> list[ZerverFieldsT]: ) -> list[ZerverFieldsT]:
""" """
@@ -637,7 +637,7 @@ def process_avatars(
run_parallel( run_parallel(
partial(get_avatar, avatar_dir, size_url_suffix), partial(get_avatar, avatar_dir, size_url_suffix),
avatar_upload_list, avatar_upload_list,
processes=threads, processes=processes,
catch=True, catch=True,
report=lambda count: logging.info("Finished %s items", count), report=lambda count: logging.info("Finished %s items", count),
) )
@@ -658,7 +658,7 @@ def get_uploads(upload_dir: str, upload: list[str]) -> None:
def process_uploads( def process_uploads(
upload_list: list[ZerverFieldsT], upload_dir: str, threads: int upload_list: list[ZerverFieldsT], upload_dir: str, processes: int
) -> list[ZerverFieldsT]: ) -> list[ZerverFieldsT]:
""" """
This function downloads the uploads and saves it in the realm's upload directory. This function downloads the uploads and saves it in the realm's upload directory.
@@ -680,7 +680,7 @@ def process_uploads(
run_parallel( run_parallel(
partial(get_uploads, upload_dir), partial(get_uploads, upload_dir),
upload_url_list, upload_url_list,
processes=threads, processes=processes,
catch=True, catch=True,
report=lambda count: logging.info("Finished %s items", count), report=lambda count: logging.info("Finished %s items", count),
) )
@@ -715,7 +715,7 @@ def process_emojis(
zerver_realmemoji: list[ZerverFieldsT], zerver_realmemoji: list[ZerverFieldsT],
emoji_dir: str, emoji_dir: str,
emoji_url_map: ZerverFieldsT, emoji_url_map: ZerverFieldsT,
threads: int, processes: int,
) -> list[ZerverFieldsT]: ) -> list[ZerverFieldsT]:
""" """
This function downloads the custom emojis and saves in the output emoji folder. This function downloads the custom emojis and saves in the output emoji folder.

View File

@@ -1618,7 +1618,7 @@ def do_convert_zipfile(
original_path: str, original_path: str,
output_dir: str, output_dir: str,
token: str, token: str,
threads: int = 6, processes: int = 6,
convert_slack_threads: bool = False, convert_slack_threads: bool = False,
) -> None: ) -> None:
assert original_path.endswith(".zip") assert original_path.endswith(".zip")
@@ -1661,7 +1661,7 @@ def do_convert_zipfile(
zipObj.extractall(slack_data_dir) zipObj.extractall(slack_data_dir)
do_convert_directory(slack_data_dir, output_dir, token, threads, convert_slack_threads) do_convert_directory(slack_data_dir, output_dir, token, processes, convert_slack_threads)
finally: finally:
# Always clean up the uncompressed directory # Always clean up the uncompressed directory
rm_tree(slack_data_dir) rm_tree(slack_data_dir)
@@ -1686,7 +1686,7 @@ def do_convert_directory(
slack_data_dir: str, slack_data_dir: str,
output_dir: str, output_dir: str,
token: str, token: str,
threads: int = 6, processes: int = 6,
convert_slack_threads: bool = False, convert_slack_threads: bool = False,
) -> None: ) -> None:
check_slack_token_access(token, SLACK_IMPORT_TOKEN_SCOPES) check_slack_token_access(token, SLACK_IMPORT_TOKEN_SCOPES)
@@ -1752,18 +1752,20 @@ def do_convert_directory(
emoji_folder = os.path.join(output_dir, "emoji") emoji_folder = os.path.join(output_dir, "emoji")
os.makedirs(emoji_folder, exist_ok=True) os.makedirs(emoji_folder, exist_ok=True)
emoji_records = process_emojis(realm["zerver_realmemoji"], emoji_folder, emoji_url_map, threads) emoji_records = process_emojis(
realm["zerver_realmemoji"], emoji_folder, emoji_url_map, processes
)
avatar_folder = os.path.join(output_dir, "avatars") avatar_folder = os.path.join(output_dir, "avatars")
avatar_realm_folder = os.path.join(avatar_folder, str(realm_id)) avatar_realm_folder = os.path.join(avatar_folder, str(realm_id))
os.makedirs(avatar_realm_folder, exist_ok=True) os.makedirs(avatar_realm_folder, exist_ok=True)
avatar_records = process_avatars( avatar_records = process_avatars(
avatar_list, avatar_folder, realm_id, threads, size_url_suffix="-512" avatar_list, avatar_folder, realm_id, processes, size_url_suffix="-512"
) )
uploads_folder = os.path.join(output_dir, "uploads") uploads_folder = os.path.join(output_dir, "uploads")
os.makedirs(os.path.join(uploads_folder, str(realm_id)), exist_ok=True) os.makedirs(os.path.join(uploads_folder, str(realm_id)), exist_ok=True)
uploads_records = process_uploads(uploads_list, uploads_folder, threads) uploads_records = process_uploads(uploads_list, uploads_folder, processes)
attachment = {"zerver_attachment": zerver_attachment} attachment = {"zerver_attachment": zerver_attachment}
team_info_dict = get_slack_api_data("https://slack.com/api/team.info", "team", token=token) team_info_dict = get_slack_api_data("https://slack.com/api/team.info", "team", token=token)

View File

@@ -2451,7 +2451,7 @@ def get_exportable_scheduled_message_ids(
def do_export_realm( def do_export_realm(
realm: Realm, realm: Realm,
output_dir: Path, output_dir: Path,
threads: int, processes: int,
export_type: int, export_type: int,
exportable_user_ids: set[int] | None = None, exportable_user_ids: set[int] | None = None,
export_as_active: bool | None = None, export_as_active: bool | None = None,
@@ -2462,11 +2462,11 @@ def do_export_realm(
# indicates a bug. # indicates a bug.
assert export_type == RealmExport.EXPORT_FULL_WITH_CONSENT assert export_type == RealmExport.EXPORT_FULL_WITH_CONSENT
# We need at least one thread running to export # We need at least one process running to export
# UserMessage rows. The management command should # UserMessage rows. The management command should
# enforce this for us. # enforce this for us.
if not settings.TEST_SUITE: if not settings.TEST_SUITE:
assert threads >= 1 assert processes >= 1
realm_config = get_realm_config() realm_config = get_realm_config()
@@ -2544,7 +2544,7 @@ def do_export_realm(
# Start parallel jobs to export the UserMessage objects. # Start parallel jobs to export the UserMessage objects.
launch_user_message_subprocesses( launch_user_message_subprocesses(
threads=threads, processes=processes,
output_dir=output_dir, output_dir=output_dir,
export_full_with_consent=export_type == RealmExport.EXPORT_FULL_WITH_CONSENT, export_full_with_consent=export_type == RealmExport.EXPORT_FULL_WITH_CONSENT,
exportable_user_ids=exportable_user_ids, exportable_user_ids=exportable_user_ids,
@@ -2585,12 +2585,12 @@ def export_attachment_table(
def launch_user_message_subprocesses( def launch_user_message_subprocesses(
threads: int, processes: int,
output_dir: Path, output_dir: Path,
export_full_with_consent: bool, export_full_with_consent: bool,
exportable_user_ids: set[int] | None, exportable_user_ids: set[int] | None,
) -> None: ) -> None:
logging.info("Launching %d PARALLEL subprocesses to export UserMessage rows", threads) logging.info("Launching %d PARALLEL subprocesses to export UserMessage rows", processes)
pids = {} pids = {}
if export_full_with_consent: if export_full_with_consent:
@@ -2600,12 +2600,12 @@ def launch_user_message_subprocesses(
f.write(orjson.dumps(list(exportable_user_ids))) f.write(orjson.dumps(list(exportable_user_ids)))
logging.info("Created consented_user_ids.json file.") logging.info("Created consented_user_ids.json file.")
for shard_id in range(threads): for shard_id in range(processes):
arguments = [ arguments = [
os.path.join(settings.DEPLOY_ROOT, "manage.py"), os.path.join(settings.DEPLOY_ROOT, "manage.py"),
"export_usermessage_batch", "export_usermessage_batch",
f"--path={output_dir}", f"--path={output_dir}",
f"--thread={shard_id}", f"--process={shard_id}",
] ]
if export_full_with_consent: if export_full_with_consent:
arguments.append("--export-full-with-consent") arguments.append("--export-full-with-consent")
@@ -2955,7 +2955,7 @@ def get_consented_user_ids(realm: Realm) -> set[int]:
def export_realm_wrapper( def export_realm_wrapper(
export_row: RealmExport, export_row: RealmExport,
output_dir: str, output_dir: str,
threads: int, processes: int,
upload: bool, upload: bool,
percent_callback: Callable[[Any], None] | None = None, percent_callback: Callable[[Any], None] | None = None,
export_as_active: bool | None = None, export_as_active: bool | None = None,
@@ -2972,7 +2972,7 @@ def export_realm_wrapper(
tarball_path, stats = do_export_realm( tarball_path, stats = do_export_realm(
realm=export_row.realm, realm=export_row.realm,
output_dir=output_dir, output_dir=output_dir,
threads=threads, processes=processes,
export_type=export_row.type, export_type=export_row.type,
export_as_active=export_as_active, export_as_active=export_as_active,
exportable_user_ids=exportable_user_ids, exportable_user_ids=exportable_user_ids,

View File

@@ -32,9 +32,9 @@ class Command(ZulipBaseCommand):
) )
parser.add_argument( parser.add_argument(
"--threads", "--processes",
default=settings.DEFAULT_DATA_EXPORT_IMPORT_PARALLELISM, default=settings.DEFAULT_DATA_EXPORT_IMPORT_PARALLELISM,
help="Threads to use in exporting UserMessage objects in parallel", help="Processes to use in exporting UserMessage objects in parallel",
) )
parser.add_argument( parser.add_argument(
@@ -57,9 +57,9 @@ class Command(ZulipBaseCommand):
if token is None: if token is None:
raise CommandError("Enter Slack legacy token!") raise CommandError("Enter Slack legacy token!")
num_threads = int(options["threads"]) num_processes = int(options["processes"])
if num_threads < 1: if num_processes < 1:
raise CommandError("You must have at least one thread.") raise CommandError("You must have at least one process.")
for path in options["slack_data_path"]: for path in options["slack_data_path"]:
if not os.path.exists(path): if not os.path.exists(path):
@@ -72,7 +72,7 @@ class Command(ZulipBaseCommand):
path, path,
output_dir, output_dir,
token, token,
threads=num_threads, processes=num_processes,
convert_slack_threads=convert_slack_threads, convert_slack_threads=convert_slack_threads,
) )
elif os.path.isfile(path) and path.endswith(".zip"): elif os.path.isfile(path) and path.endswith(".zip"):
@@ -80,7 +80,7 @@ class Command(ZulipBaseCommand):
path, path,
output_dir, output_dir,
token, token,
threads=num_threads, processes=num_processes,
convert_slack_threads=convert_slack_threads, convert_slack_threads=convert_slack_threads,
) )
else: else:

View File

@@ -68,8 +68,8 @@ class Command(ZulipBaseCommand):
make sure you have the procedure right and minimize downtime. make sure you have the procedure right and minimize downtime.
Performance: In one test, the tool exported a realm with hundreds Performance: In one test, the tool exported a realm with hundreds
of users and ~1M messages of history with --threads=1 in about 3 of users and ~1M messages of history with --parallel=1 in about 3
hours of serial runtime (goes down to ~50m with --threads=6 on a hours of serial runtime (goes down to ~50m with --parallel=6 on a
machine with 8 CPUs). Importing that same data set took about 30 machine with 8 CPUs). Importing that same data set took about 30
minutes. But this will vary a lot depending on the average number minutes. But this will vary a lot depending on the average number
of recipients of messages in the realm, hardware, etc.""" of recipients of messages in the realm, hardware, etc."""
@@ -80,9 +80,9 @@ class Command(ZulipBaseCommand):
"--output", dest="output_dir", help="Directory to write exported data to." "--output", dest="output_dir", help="Directory to write exported data to."
) )
parser.add_argument( parser.add_argument(
"--threads", "--parallel",
default=settings.DEFAULT_DATA_EXPORT_IMPORT_PARALLELISM, default=settings.DEFAULT_DATA_EXPORT_IMPORT_PARALLELISM,
help="Threads to use in exporting UserMessage objects in parallel", help="Processes to use in exporting UserMessage objects in parallel",
) )
parser.add_argument( parser.add_argument(
"--public-only", "--public-only",
@@ -121,9 +121,9 @@ class Command(ZulipBaseCommand):
print(f"\033[94mExporting realm\033[0m: {realm.string_id}") print(f"\033[94mExporting realm\033[0m: {realm.string_id}")
num_threads = int(options["threads"]) processes = int(options["parallel"])
if num_threads < 1: if processes < 1:
raise CommandError("You must have at least one thread.") raise CommandError("You must have at least one process.")
if public_only and export_full_with_consent: if public_only and export_full_with_consent:
raise CommandError("Please pass either --public-only or --export-full-with-consennt") raise CommandError("Please pass either --public-only or --export-full-with-consennt")
@@ -183,7 +183,7 @@ class Command(ZulipBaseCommand):
export_realm_wrapper( export_realm_wrapper(
export_row=export_row, export_row=export_row,
output_dir=output_dir, output_dir=output_dir,
threads=num_threads, processes=processes,
upload=options["upload"], upload=options["upload"],
percent_callback=percent_callback, percent_callback=percent_callback,
export_as_active=True if options["deactivate_realm"] else None, export_as_active=True if options["deactivate_realm"] else None,

View File

@@ -17,7 +17,7 @@ class Command(ZulipBaseCommand):
@override @override
def add_arguments(self, parser: ArgumentParser) -> None: def add_arguments(self, parser: ArgumentParser) -> None:
parser.add_argument("--path", help="Path to find messages.json archives") parser.add_argument("--path", help="Path to find messages.json archives")
parser.add_argument("--thread", help="Thread ID") parser.add_argument("--process", help="Process identifier (used only for debug output)")
parser.add_argument( parser.add_argument(
"--export-full-with-consent", "--export-full-with-consent",
action="store_true", action="store_true",
@@ -26,7 +26,7 @@ class Command(ZulipBaseCommand):
@override @override
def handle(self, *args: Any, **options: Any) -> None: def handle(self, *args: Any, **options: Any) -> None:
logging.info("Starting UserMessage batch thread %s", options["thread"]) logging.info("Starting UserMessage batch process %s", options["process"])
path = options["path"] path = options["path"]
files = set(glob.glob(os.path.join(path, "messages-*.json.partial"))) files = set(glob.glob(os.path.join(path, "messages-*.json.partial")))
@@ -47,7 +47,7 @@ class Command(ZulipBaseCommand):
except FileNotFoundError: except FileNotFoundError:
# Already claimed by another process # Already claimed by another process
continue continue
logging.info("Thread %s processing %s", options["thread"], output_path) logging.info("Process %s processing %s", options["process"], output_path)
try: try:
export_usermessages_batch( export_usermessages_batch(
locked_path, locked_path,

View File

@@ -416,7 +416,7 @@ class RealmImportExportTest(ExportFile):
do_export_realm( do_export_realm(
realm=realm, realm=realm,
output_dir=output_dir, output_dir=output_dir,
threads=0, processes=0,
export_type=export_type, export_type=export_type,
exportable_user_ids=exportable_user_ids, exportable_user_ids=exportable_user_ids,
) )

View File

@@ -537,7 +537,7 @@ class TestExport(ZulipTestCase):
call_command(self.COMMAND_NAME, "-r=zulip", "--export-full-with-consent") call_command(self.COMMAND_NAME, "-r=zulip", "--export-full-with-consent")
m.assert_called_once_with( m.assert_called_once_with(
export_row=mock.ANY, export_row=mock.ANY,
threads=mock.ANY, processes=mock.ANY,
output_dir=mock.ANY, output_dir=mock.ANY,
percent_callback=mock.ANY, percent_callback=mock.ANY,
upload=False, upload=False,

View File

@@ -63,7 +63,7 @@ class RealmExportTest(ZulipTestCase):
self.assertEqual(args["realm"], admin.realm) self.assertEqual(args["realm"], admin.realm)
self.assertEqual(args["export_type"], RealmExport.EXPORT_PUBLIC) self.assertEqual(args["export_type"], RealmExport.EXPORT_PUBLIC)
self.assertTrue(os.path.basename(args["output_dir"]).startswith("zulip-export-")) self.assertTrue(os.path.basename(args["output_dir"]).startswith("zulip-export-"))
self.assertEqual(args["threads"], 6) self.assertEqual(args["processes"], 6)
# Get the entry and test that iago initiated it. # Get the entry and test that iago initiated it.
export_row = RealmExport.objects.first() export_row = RealmExport.objects.first()
@@ -121,7 +121,7 @@ class RealmExportTest(ZulipTestCase):
def fake_export_realm( def fake_export_realm(
realm: Realm, realm: Realm,
output_dir: str, output_dir: str,
threads: int, processes: int,
export_type: int, export_type: int,
exportable_user_ids: set[int] | None = None, exportable_user_ids: set[int] | None = None,
export_as_active: bool | None = None, export_as_active: bool | None = None,
@@ -129,7 +129,7 @@ class RealmExportTest(ZulipTestCase):
self.assertEqual(realm, admin.realm) self.assertEqual(realm, admin.realm)
self.assertEqual(export_type, RealmExport.EXPORT_PUBLIC) self.assertEqual(export_type, RealmExport.EXPORT_PUBLIC)
self.assertTrue(os.path.basename(output_dir).startswith("zulip-export-")) self.assertTrue(os.path.basename(output_dir).startswith("zulip-export-"))
self.assertEqual(threads, 6) self.assertEqual(processes, 6)
# Check that the export shows up as in progress # Check that the export shows up as in progress
result = self.client_get("/json/export/realm") result = self.client_get("/json/export/realm")

View File

@@ -1929,7 +1929,7 @@ by Pieter
with self.assertLogs(level="INFO"), self.settings(EXTERNAL_HOST="zulip.example.com"): with self.assertLogs(level="INFO"), self.settings(EXTERNAL_HOST="zulip.example.com"):
# We need to mock EXTERNAL_HOST to be a valid domain because Slack's importer # We need to mock EXTERNAL_HOST to be a valid domain because Slack's importer
# uses it to generate email addresses for users without an email specified. # uses it to generate email addresses for users without an email specified.
do_convert_zipfile(test_slack_zip_file, output_dir, token, threads=1) do_convert_zipfile(test_slack_zip_file, output_dir, token, processes=1)
self.assertTrue(os.path.exists(output_dir)) self.assertTrue(os.path.exists(output_dir))
self.assertTrue(os.path.exists(output_dir + "/realm.json")) self.assertTrue(os.path.exists(output_dir + "/realm.json"))
@@ -2138,7 +2138,7 @@ by Pieter
with self.assertLogs(level="INFO"), self.settings(EXTERNAL_HOST="zulip.example.com"): with self.assertLogs(level="INFO"), self.settings(EXTERNAL_HOST="zulip.example.com"):
# We need to mock EXTERNAL_HOST to be a valid domain because Slack's importer # We need to mock EXTERNAL_HOST to be a valid domain because Slack's importer
# uses it to generate email addresses for users without an email specified. # uses it to generate email addresses for users without an email specified.
do_convert_zipfile(test_slack_zip_file, output_dir, token, threads=1) do_convert_zipfile(test_slack_zip_file, output_dir, token, processes=1)
@mock.patch("zerver.data_import.slack.check_slack_token_access") @mock.patch("zerver.data_import.slack.check_slack_token_access")
@responses.activate @responses.activate

View File

@@ -173,7 +173,7 @@ class DeferredWorker(QueueProcessingWorker):
export_realm_wrapper( export_realm_wrapper(
export_row=export_row, export_row=export_row,
output_dir=output_dir, output_dir=output_dir,
threads=1 if self.threaded else 6, processes=1 if self.threaded else 6,
upload=True, upload=True,
) )
except Exception: except Exception: