import_realm: Migrate from run_parallel to multiprocessing.

Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
Author: Anders Kaseorg
Date: 2019-07-27 16:08:18 -07:00
Committed by: Tim Abbott
Parent: 9a2aad58d0
Commit: 7f410ff0de
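
Background on the change: the removed `zerver.lib.parallel.run_parallel` helper forked workers itself and yielded `(status, job)` tuples that callers had to drain even when the results were unused (visible in the last hunk below). The standard-library `multiprocessing.Pool` expresses the same fan-out directly. A minimal sketch of the replacement pattern, with a hypothetical `work` function standing in for `process_avatars`:

import multiprocessing

def work(item: int) -> int:
    # Must live at module scope: multiprocessing pickles the callable
    # by qualified name when handing it to child processes.
    return item * item

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as p:
        # Iterating imap_unordered waits for the workers and re-raises
        # any exception a worker hit, so failures are not lost.
        for result in p.imap_unordered(work, range(10)):
            pass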

zerver/lib/import_realm.py

@@ -1,5 +1,6 @@
 import datetime
 import logging
+import multiprocessing
 import os
 import secrets
 import shutil
@@ -27,7 +28,6 @@ from zerver.lib.bulk_create import bulk_create_users, bulk_set_users_or_streams_
 from zerver.lib.export import DATE_FIELDS, Field, Path, Record, TableData, TableName
 from zerver.lib.markdown import markdown_convert
 from zerver.lib.markdown import version as markdown_version
-from zerver.lib.parallel import run_parallel
 from zerver.lib.server_initialization import create_internal_realm, server_initialized
 from zerver.lib.streams import render_stream_description
 from zerver.lib.timestamp import datetime_to_timestamp
@@ -601,6 +601,32 @@ def bulk_import_client(data: TableData, model: Any, table: TableName) -> None:
         client = Client.objects.create(name=item['name'])
         update_id_map(table='client', old_id=item['id'], new_id=client.id)
 
+def process_avatars(record: Dict[str, Any]) -> None:
+    from zerver.lib.upload import upload_backend
+    if record['s3_path'].endswith('.original'):
+        user_profile = get_user_profile_by_id(record['user_profile_id'])
+        if settings.LOCAL_UPLOADS_DIR is not None:
+            avatar_path = user_avatar_path_from_ids(user_profile.id, record['realm_id'])
+            medium_file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, "avatars",
+                                            avatar_path) + '-medium.png'
+            if os.path.exists(medium_file_path):
+                # We remove the image here primarily to deal with
+                # issues when running the import script multiple
+                # times in development (where one might reuse the
+                # same realm ID from a previous iteration).
+                os.remove(medium_file_path)
+        try:
+            upload_backend.ensure_medium_avatar_image(user_profile=user_profile)
+            if record.get("importer_should_thumbnail"):
+                upload_backend.ensure_basic_avatar_image(user_profile=user_profile)
+        except BadImageError:
+            logging.warning(
+                "Could not thumbnail avatar image for user %s; ignoring",
+                user_profile.id,
+            )
+            # Delete the record of the avatar to avoid 404s.
+            do_change_avatar_fields(user_profile, UserProfile.AVATAR_FROM_GRAVATAR, acting_user=None)
+
 def import_uploads(realm: Realm, import_dir: Path, processes: int, processing_avatars: bool=False,
                    processing_emojis: bool=False, processing_realm_icons: bool=False) -> None:
     if processing_avatars and processing_emojis:
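
Note how the hunk above hoists `process_avatars` from a closure inside `import_uploads` (removed in the next hunk) to module scope. That move is what makes the `multiprocessing` migration work: `multiprocessing.Pool` pickles the worker callable by its qualified name, and nested functions cannot be pickled. A quick illustration of the constraint (hypothetical names):

import pickle

def top_level(x: int) -> int:
    return x + 1

def outer() -> None:
    def nested(x: int) -> int:
        return x + 1
    try:
        pickle.dumps(nested)
    except AttributeError as err:
        # Local functions have no importable qualified name for a
        # child process to look up.
        print(err)  # Can't pickle local object 'outer.<locals>.nested'

pickle.dumps(top_level)  # fine: importable as <module>.top_level
outer()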
@@ -723,47 +749,19 @@ def import_uploads(realm: Realm, import_dir: Path, processes: int, processing_av
             shutil.copy(orig_file_path, file_path)
 
     if processing_avatars:
-        from zerver.lib.upload import upload_backend
         # Ensure that we have medium-size avatar images for every
         # avatar. TODO: This implementation is hacky, both in that it
         # does get_user_profile_by_id for each user, and in that it
        # might be better to require the export to just have these.
-        def process_avatars(record: Dict[Any, Any]) -> int:
-            if record['s3_path'].endswith('.original'):
-                user_profile = get_user_profile_by_id(record['user_profile_id'])
-                if settings.LOCAL_UPLOADS_DIR is not None:
-                    avatar_path = user_avatar_path_from_ids(user_profile.id, record['realm_id'])
-                    medium_file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, "avatars",
-                                                    avatar_path) + '-medium.png'
-                    if os.path.exists(medium_file_path):
-                        # We remove the image here primarily to deal with
-                        # issues when running the import script multiple
-                        # times in development (where one might reuse the
-                        # same realm ID from a previous iteration).
-                        os.remove(medium_file_path)
-                try:
-                    upload_backend.ensure_medium_avatar_image(user_profile=user_profile)
-                    if record.get("importer_should_thumbnail"):
-                        upload_backend.ensure_basic_avatar_image(user_profile=user_profile)
-                except BadImageError:
-                    logging.warning(
-                        "Could not thumbnail avatar image for user %s; ignoring",
-                        user_profile.id,
-                    )
-                    # Delete the record of the avatar to avoid 404s.
-                    do_change_avatar_fields(user_profile, UserProfile.AVATAR_FROM_GRAVATAR, acting_user=None)
-            return 0
         if processes == 1:
             for record in records:
                 process_avatars(record)
         else:
             connection.close()
-            output = []
-            for (status, job) in run_parallel(process_avatars, records, processes):
-                output.append(job)
+            with multiprocessing.Pool(processes) as p:
+                for out in p.imap_unordered(process_avatars, records):
+                    pass
 
 # Importing data suffers from a difficult ordering problem because of
 # models that reference each other circularly. Here is a correct order.
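
One detail worth noting in the final hunk: the `connection.close()` call before the pool is kept from the `run_parallel` version. Under the fork start method, each child would otherwise inherit the parent's open database socket, and several processes sharing one socket corrupts the connection; closing it first makes Django lazily open a fresh connection in each child. A sketch of the pattern (hypothetical `job` and `run`, assuming a configured Django project):

import multiprocessing

from django.db import connection

def job(record_id: int) -> None:
    # The first ORM query in a child opens that child's own database
    # connection, because the parent closed its connection pre-fork.
    pass

def run(ids: list) -> None:
    connection.close()  # never share one socket across forked workers
    with multiprocessing.Pool(4) as p:
        for _ in p.imap_unordered(job, ids):
            pass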