transfer: Migrate from run_parallel to multiprocessing.

Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
This commit is contained in:
Anders Kaseorg
2019-07-27 16:22:28 -07:00
committed by Tim Abbott
parent 7f410ff0de
commit 0f16df2f13

View File

@@ -1,4 +1,5 @@
import logging import logging
import multiprocessing
import os import os
from mimetypes import guess_type from mimetypes import guess_type
@@ -6,7 +7,6 @@ from django.conf import settings
from django.db import connection from django.db import connection
from zerver.lib.avatar_hash import user_avatar_path from zerver.lib.avatar_hash import user_avatar_path
from zerver.lib.parallel import run_parallel
from zerver.lib.upload import S3UploadBackend, upload_image_to_s3 from zerver.lib.upload import S3UploadBackend, upload_image_to_s3
from zerver.models import Attachment, RealmEmoji, UserProfile from zerver.models import Attachment, RealmEmoji, UserProfile
@@ -18,8 +18,7 @@ def transfer_uploads_to_s3(processes: int) -> None:
transfer_message_files_to_s3(processes) transfer_message_files_to_s3(processes)
transfer_emoji_to_s3(processes) transfer_emoji_to_s3(processes)
def transfer_avatars_to_s3(processes: int) -> None: def _transfer_avatar_to_s3(user: UserProfile) -> None:
def _transfer_avatar_to_s3(user: UserProfile) -> int:
avatar_path = user_avatar_path(user) avatar_path = user_avatar_path(user)
file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, "avatars", avatar_path) + ".original" file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, "avatars", avatar_path) + ".original"
try: try:
@@ -28,20 +27,19 @@ def transfer_avatars_to_s3(processes: int) -> None:
logging.info("Uploaded avatar for %s in realm %s", user.id, user.realm.name) logging.info("Uploaded avatar for %s in realm %s", user.id, user.realm.name)
except FileNotFoundError: except FileNotFoundError:
pass pass
return 0
def transfer_avatars_to_s3(processes: int) -> None:
users = list(UserProfile.objects.all()) users = list(UserProfile.objects.all())
if processes == 1: if processes == 1:
for user in users: for user in users:
_transfer_avatar_to_s3(user) _transfer_avatar_to_s3(user)
else: # nocoverage else: # nocoverage
output = []
connection.close() connection.close()
for (status, job) in run_parallel(_transfer_avatar_to_s3, users, processes): with multiprocessing.Pool(processes) as p:
output.append(job) for out in p.imap_unordered(_transfer_avatar_to_s3, users):
pass
def transfer_message_files_to_s3(processes: int) -> None: def _transfer_message_files_to_s3(attachment: Attachment) -> None:
def _transfer_message_files_to_s3(attachment: Attachment) -> int:
file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, "files", attachment.path_id) file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, "files", attachment.path_id)
try: try:
with open(file_path, 'rb') as f: with open(file_path, 'rb') as f:
@@ -50,22 +48,21 @@ def transfer_message_files_to_s3(processes: int) -> None:
logging.info("Uploaded message file in path %s", file_path) logging.info("Uploaded message file in path %s", file_path)
except FileNotFoundError: # nocoverage except FileNotFoundError: # nocoverage
pass pass
return 0
def transfer_message_files_to_s3(processes: int) -> None:
attachments = list(Attachment.objects.all()) attachments = list(Attachment.objects.all())
if processes == 1: if processes == 1:
for attachment in attachments: for attachment in attachments:
_transfer_message_files_to_s3(attachment) _transfer_message_files_to_s3(attachment)
else: # nocoverage else: # nocoverage
output = []
connection.close() connection.close()
for status, job in run_parallel(_transfer_message_files_to_s3, attachments, processes): with multiprocessing.Pool(processes) as p:
output.append(job) for out in p.imap_unordered(_transfer_message_files_to_s3, attachments):
pass
def transfer_emoji_to_s3(processes: int) -> None: def _transfer_emoji_to_s3(realm_emoji: RealmEmoji) -> None:
def _transfer_emoji_to_s3(realm_emoji: RealmEmoji) -> int:
if not realm_emoji.file_name or not realm_emoji.author: if not realm_emoji.file_name or not realm_emoji.author:
return 0 # nocoverage return # nocoverage
emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format( emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
realm_id=realm_emoji.realm.id, realm_id=realm_emoji.realm.id,
emoji_file_name=realm_emoji.file_name, emoji_file_name=realm_emoji.file_name,
@@ -77,14 +74,14 @@ def transfer_emoji_to_s3(processes: int) -> None:
logging.info("Uploaded emoji file in path %s", emoji_path) logging.info("Uploaded emoji file in path %s", emoji_path)
except FileNotFoundError: # nocoverage except FileNotFoundError: # nocoverage
pass pass
return 0
def transfer_emoji_to_s3(processes: int) -> None:
realm_emojis = list(RealmEmoji.objects.filter()) realm_emojis = list(RealmEmoji.objects.filter())
if processes == 1: if processes == 1:
for realm_emoji in realm_emojis: for realm_emoji in realm_emojis:
_transfer_emoji_to_s3(realm_emoji) _transfer_emoji_to_s3(realm_emoji)
else: # nocoverage else: # nocoverage
output = []
connection.close() connection.close()
for status, job in run_parallel(_transfer_emoji_to_s3, realm_emojis, processes): with multiprocessing.Pool(processes) as p:
output.append(job) for out in p.imap_unordered(_transfer_emoji_to_s3, realm_emojis):
pass