import: Avoid unnecessary forks when downloading attachments.

The previous implementation used run_parallel incorrectly, passing it
a set of very small jobs (each was to download a single file), which
meant that we'd end up forking once for every file to download.

The corrected implementation instead sends each of N threads 1/N of
the files to download, which matches the goal of distributing the
download work across N threads (see the sketch below).
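As an editor's illustration (not part of the original commit message),
here is a minimal sketch of the striding split that the new
run_parallel_wrapper performs: each of N threads receives every Nth
file, so run_parallel forks once per thread instead of once per file.

    files = ['file%d' % i for i in range(10)]
    threads = 3
    # Before this commit: run_parallel received 10 one-file jobs,
    # forking once per file. Now it receives 3 chunks, forking 3 times.
    job_lists = [files[i::threads] for i in range(threads)]
    print(job_lists)
    # [['file0', 'file3', 'file6', 'file9'], ['file1', 'file4', 'file7'], ['file2', 'file5', 'file8']]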
Author: Tim Abbott
Date:   2018-11-30 16:15:55 -08:00
Parent: 0191bb593a
Commit: 48a3975ec0


@@ -3,9 +3,10 @@ import requests
 import shutil
 import logging
 import os
+import traceback
 import ujson

-from typing import List, Dict, Any, Optional, Set, Callable
+from typing import List, Dict, Any, Optional, Set, Callable, Iterable, Tuple, TypeVar

 from django.forms.models import model_to_dict
 from zerver.models import Realm, RealmEmoji, Subscription, Recipient, \
@@ -13,7 +14,7 @@ from zerver.models import Realm, RealmEmoji, Subscription, Recipient, \
 from zerver.data_import.sequencer import NEXT_ID
 from zerver.lib.actions import STREAM_ASSIGNMENT_COLORS as stream_colors
 from zerver.lib.avatar_hash import user_avatar_path_from_ids
-from zerver.lib.parallel import run_parallel
+from zerver.lib.parallel import run_parallel, JobData

 # stubs
 ZerverFieldsT = Dict[str, Any]
@@ -388,7 +389,7 @@ def process_avatars(avatar_list: List[ZerverFieldsT], avatar_dir: str, realm_id:
     downloaded. For simpler conversions see write_avatar_png.
     """
-    def get_avatar(avatar_upload_item: List[str]) -> int:
+    def get_avatar(avatar_upload_item: List[str]) -> None:
         avatar_url = avatar_upload_item[0]
         image_path = os.path.join(avatar_dir, avatar_upload_item[1])
@@ -398,7 +399,6 @@ def process_avatars(avatar_list: List[ZerverFieldsT], avatar_dir: str, realm_id:
         with open(image_path, 'wb') as image_file:
             shutil.copyfileobj(response.raw, image_file)
         shutil.copy(image_path, original_image_path)
-        return 0

     logging.info('######### GETTING AVATARS #########\n')
     logging.info('DOWNLOADING AVATARS .......\n')
@@ -425,7 +425,7 @@ def process_avatars(avatar_list: List[ZerverFieldsT], avatar_dir: str, realm_id:
     # Run downloads parallely
     output = []
-    for (status, job) in run_parallel(get_avatar, avatar_upload_list, threads=threads):
+    for (status, job) in run_parallel_wrapper(get_avatar, avatar_upload_list, threads=threads):
         output.append(job)

     logging.info('######### GETTING AVATARS FINISHED #########\n')
@@ -462,6 +462,26 @@ def write_avatar_png(avatar_folder: str,

     return metadata

+ListJobData = TypeVar('ListJobData')
+
+def run_parallel_wrapper(f: Callable[[ListJobData], None], full_items: List[ListJobData],
+                         threads: int=6) -> Iterable[Tuple[int, List[ListJobData]]]:
+    logging.info("Distributing %s items across %s threads" % (len(full_items), threads))
+
+    def wrapping_function(items: List[ListJobData]) -> int:
+        count = 0
+        for item in items:
+            try:
+                f(item)
+            except Exception:
+                logging.info("Error processing item: %s" % (item,))
+                traceback.print_exc()
+            count += 1
+            if count % 1000 == 0:
+                logging.info("A download thread finished %s items" % (count,))
+        return 0
+
+    job_lists = [full_items[i::threads] for i in range(threads)]  # type: List[List[ListJobData]]
+    return run_parallel(wrapping_function, job_lists, threads=threads)
+
 def process_uploads(upload_list: List[ZerverFieldsT], upload_dir: str,
                     threads: int) -> List[ZerverFieldsT]:
     """
@@ -471,7 +491,7 @@ def process_uploads(upload_list: List[ZerverFieldsT], upload_dir: str,
     1. upload_list: List of uploads to be mapped in uploads records.json file
     2. upload_dir: Folder where the downloaded uploads are saved
     """
-    def get_uploads(upload: List[str]) -> int:
+    def get_uploads(upload: List[str]) -> None:
         upload_url = upload[0]
         upload_path = upload[1]
         upload_path = os.path.join(upload_dir, upload_path)
@@ -480,7 +500,6 @@ def process_uploads(upload_list: List[ZerverFieldsT], upload_dir: str,
         os.makedirs(os.path.dirname(upload_path), exist_ok=True)
         with open(upload_path, 'wb') as upload_file:
             shutil.copyfileobj(response.raw, upload_file)
-        return 0

     logging.info('######### GETTING ATTACHMENTS #########\n')
     logging.info('DOWNLOADING ATTACHMENTS .......\n')
@@ -493,7 +512,7 @@ def process_uploads(upload_list: List[ZerverFieldsT], upload_dir: str,
     # Run downloads parallely
     output = []
-    for (status, job) in run_parallel(get_uploads, upload_url_list, threads=threads):
+    for (status, job) in run_parallel_wrapper(get_uploads, upload_url_list, threads=threads):
         output.append(job)

     logging.info('######### GETTING ATTACHMENTS FINISHED #########\n')
@@ -522,7 +541,7 @@ def process_emojis(zerver_realmemoji: List[ZerverFieldsT], emoji_dir: str,
     2. emoji_dir: Folder where the downloaded emojis are saved
     3. emoji_url_map: Maps emoji name to its url
     """
-    def get_emojis(upload: List[str]) -> int:
+    def get_emojis(upload: List[str]) -> None:
         emoji_url = upload[0]
         emoji_path = upload[1]
         upload_emoji_path = os.path.join(emoji_dir, emoji_path)
@@ -531,7 +550,6 @@ def process_emojis(zerver_realmemoji: List[ZerverFieldsT], emoji_dir: str,
         os.makedirs(os.path.dirname(upload_emoji_path), exist_ok=True)
         with open(upload_emoji_path, 'wb') as emoji_file:
             shutil.copyfileobj(response.raw, emoji_file)
-        return 0

     emoji_records = []
     upload_emoji_list = []
@@ -555,7 +573,7 @@ def process_emojis(zerver_realmemoji: List[ZerverFieldsT], emoji_dir: str,
     # Run downloads parallely
     output = []
-    for (status, job) in run_parallel(get_emojis, upload_emoji_list, threads=threads):
+    for (status, job) in run_parallel_wrapper(get_emojis, upload_emoji_list, threads=threads):
         output.append(job)

     logging.info('######### GETTING EMOJIS FINISHED #########\n')
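One consumer-side detail worth noting (editor's note, inferred from the
wrapper's Iterable[Tuple[int, List[ListJobData]]] return type above, not
stated in the commit): each (status, job) pair yielded by
run_parallel_wrapper carries a thread's whole chunk, so the output lists
built in these loops now collect per-thread lists of files rather than
one entry per file.

    # Hypothetical shape of `output` with 5 files and 2 threads:
    #   [['f0', 'f2', 'f4'], ['f1', 'f3']]
    # versus the old per-file shape: ['f0', 'f1', 'f2', 'f3', 'f4']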