mirror of
https://github.com/zulip/zulip.git
synced 2025-11-03 21:43:21 +00:00
python: Normalize quotes with Black.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
committed by
Tim Abbott
parent
11741543da
commit
6e4c3e41dc
@@ -90,10 +90,10 @@ def sanitize_name(value: str) -> str:
|
||||
* adding '.' and '_' to the list of allowed characters.
|
||||
* preserving the case of the value.
|
||||
"""
|
||||
value = unicodedata.normalize('NFKC', value)
|
||||
value = re.sub(r'[^\w\s._-]', '', value, flags=re.U).strip()
|
||||
value = re.sub(r'[-\s]+', '-', value, flags=re.U)
|
||||
assert value not in {'', '.', '..'}
|
||||
value = unicodedata.normalize("NFKC", value)
|
||||
value = re.sub(r"[^\w\s._-]", "", value, flags=re.U).strip()
|
||||
value = re.sub(r"[-\s]+", "-", value, flags=re.U)
|
||||
assert value not in {"", ".", ".."}
|
||||
return mark_safe(value)
|
||||
|
||||
|
||||
@@ -105,14 +105,14 @@ name_to_tag_num = {name: num for num, name in ExifTags.TAGS.items()}
|
||||
|
||||
# https://stackoverflow.com/a/6218425
|
||||
def exif_rotate(image: Image) -> Image:
|
||||
if not hasattr(image, '_getexif'):
|
||||
if not hasattr(image, "_getexif"):
|
||||
return image
|
||||
exif_data = image._getexif()
|
||||
if exif_data is None:
|
||||
return image
|
||||
|
||||
exif_dict = dict(exif_data.items())
|
||||
orientation = exif_dict.get(name_to_tag_num['Orientation'])
|
||||
orientation = exif_dict.get(name_to_tag_num["Orientation"])
|
||||
|
||||
if orientation == 3:
|
||||
return image.rotate(180, expand=True)
|
||||
@@ -134,9 +134,9 @@ def resize_avatar(image_data: bytes, size: int = DEFAULT_AVATAR_SIZE) -> bytes:
|
||||
except DecompressionBombError:
|
||||
raise BadImageError(_("Image size exceeds limit."))
|
||||
out = io.BytesIO()
|
||||
if im.mode == 'CMYK':
|
||||
im = im.convert('RGB')
|
||||
im.save(out, format='png')
|
||||
if im.mode == "CMYK":
|
||||
im = im.convert("RGB")
|
||||
im.save(out, format="png")
|
||||
return out.getvalue()
|
||||
|
||||
|
||||
@@ -150,9 +150,9 @@ def resize_logo(image_data: bytes) -> bytes:
|
||||
except DecompressionBombError:
|
||||
raise BadImageError(_("Image size exceeds limit."))
|
||||
out = io.BytesIO()
|
||||
if im.mode == 'CMYK':
|
||||
im = im.convert('RGB')
|
||||
im.save(out, format='png')
|
||||
if im.mode == "CMYK":
|
||||
im = im.convert("RGB")
|
||||
im.save(out, format="png")
|
||||
return out.getvalue()
|
||||
|
||||
|
||||
@@ -168,7 +168,7 @@ def resize_gif(im: GifImageFile, size: int = DEFAULT_EMOJI_SIZE) -> bytes:
|
||||
new_frame.paste(im, (0, 0), im.convert("RGBA"))
|
||||
new_frame = ImageOps.pad(new_frame, (size, size), Image.ANTIALIAS)
|
||||
frames.append(new_frame)
|
||||
duration_info.append(im.info['duration'])
|
||||
duration_info.append(im.info["duration"])
|
||||
disposals.append(im.disposal_method)
|
||||
out = io.BytesIO()
|
||||
frames[0].save(
|
||||
@@ -302,7 +302,7 @@ def get_bucket(bucket_name: str, session: Optional[Session] = None) -> ServiceRe
|
||||
if session is None:
|
||||
session = boto3.Session(settings.S3_KEY, settings.S3_SECRET_KEY)
|
||||
bucket = session.resource(
|
||||
's3', region_name=settings.S3_REGION, endpoint_url=settings.S3_ENDPOINT_URL
|
||||
"s3", region_name=settings.S3_REGION, endpoint_url=settings.S3_ENDPOINT_URL
|
||||
).Bucket(bucket_name)
|
||||
return bucket
|
||||
|
||||
@@ -321,9 +321,9 @@ def upload_image_to_s3(
|
||||
"realm_id": str(user_profile.realm_id),
|
||||
}
|
||||
|
||||
content_disposition = ''
|
||||
content_disposition = ""
|
||||
if content_type is None:
|
||||
content_type = ''
|
||||
content_type = ""
|
||||
if content_type not in INLINE_MIME_TYPES:
|
||||
content_disposition = "attachment"
|
||||
|
||||
@@ -347,7 +347,7 @@ def check_upload_within_quota(realm: Realm, uploaded_file_size: int) -> None:
|
||||
def get_file_info(request: HttpRequest, user_file: File) -> Tuple[str, int, Optional[str]]:
|
||||
|
||||
uploaded_file_name = user_file.name
|
||||
content_type = request.GET.get('mimetype')
|
||||
content_type = request.GET.get("mimetype")
|
||||
if content_type is None:
|
||||
guessed_type = guess_type(uploaded_file_name)[0]
|
||||
if guessed_type is not None:
|
||||
@@ -365,17 +365,17 @@ def get_file_info(request: HttpRequest, user_file: File) -> Tuple[str, int, Opti
|
||||
|
||||
def get_signed_upload_url(path: str) -> str:
|
||||
client = boto3.client(
|
||||
's3',
|
||||
"s3",
|
||||
aws_access_key_id=settings.S3_KEY,
|
||||
aws_secret_access_key=settings.S3_SECRET_KEY,
|
||||
region_name=settings.S3_REGION,
|
||||
endpoint_url=settings.S3_ENDPOINT_URL,
|
||||
)
|
||||
return client.generate_presigned_url(
|
||||
ClientMethod='get_object',
|
||||
Params={'Bucket': settings.S3_AUTH_UPLOADS_BUCKET, 'Key': path},
|
||||
ClientMethod="get_object",
|
||||
Params={"Bucket": settings.S3_AUTH_UPLOADS_BUCKET, "Key": path},
|
||||
ExpiresIn=SIGNED_UPLOAD_URL_DURATION,
|
||||
HttpMethod='GET',
|
||||
HttpMethod="GET",
|
||||
)
|
||||
|
||||
|
||||
@@ -468,7 +468,7 @@ class S3UploadBackend(ZulipUploadBackend):
|
||||
upload_image_to_s3(
|
||||
self.avatar_bucket,
|
||||
s3_file_name,
|
||||
'image/png',
|
||||
"image/png",
|
||||
target_user_profile,
|
||||
resized_data,
|
||||
)
|
||||
@@ -507,7 +507,7 @@ class S3UploadBackend(ZulipUploadBackend):
|
||||
s3_target_file_name = user_avatar_path(target_profile)
|
||||
|
||||
key = self.get_avatar_key(s3_source_file_name + ".original")
|
||||
image_data = key.get()['Body'].read()
|
||||
image_data = key.get()["Body"].read()
|
||||
content_type = key.content_type
|
||||
|
||||
self.write_avatar_images(s3_target_file_name, target_profile, image_data, content_type)
|
||||
@@ -522,11 +522,11 @@ class S3UploadBackend(ZulipUploadBackend):
|
||||
return f"{self.avatar_bucket_url}{export_path}"
|
||||
|
||||
def realm_avatar_and_logo_path(self, realm: Realm) -> str:
|
||||
return os.path.join(str(realm.id), 'realm')
|
||||
return os.path.join(str(realm.id), "realm")
|
||||
|
||||
def upload_realm_icon_image(self, icon_file: File, user_profile: UserProfile) -> None:
|
||||
content_type = guess_type(icon_file.name)[0]
|
||||
s3_file_name = os.path.join(self.realm_avatar_and_logo_path(user_profile.realm), 'icon')
|
||||
s3_file_name = os.path.join(self.realm_avatar_and_logo_path(user_profile.realm), "icon")
|
||||
|
||||
image_data = icon_file.read()
|
||||
upload_image_to_s3(
|
||||
@@ -541,7 +541,7 @@ class S3UploadBackend(ZulipUploadBackend):
|
||||
upload_image_to_s3(
|
||||
self.avatar_bucket,
|
||||
s3_file_name + ".png",
|
||||
'image/png',
|
||||
"image/png",
|
||||
user_profile,
|
||||
resized_data,
|
||||
)
|
||||
@@ -557,9 +557,9 @@ class S3UploadBackend(ZulipUploadBackend):
|
||||
) -> None:
|
||||
content_type = guess_type(logo_file.name)[0]
|
||||
if night:
|
||||
basename = 'night_logo'
|
||||
basename = "night_logo"
|
||||
else:
|
||||
basename = 'logo'
|
||||
basename = "logo"
|
||||
s3_file_name = os.path.join(self.realm_avatar_and_logo_path(user_profile.realm), basename)
|
||||
|
||||
image_data = logo_file.read()
|
||||
@@ -575,7 +575,7 @@ class S3UploadBackend(ZulipUploadBackend):
|
||||
upload_image_to_s3(
|
||||
self.avatar_bucket,
|
||||
s3_file_name + ".png",
|
||||
'image/png',
|
||||
"image/png",
|
||||
user_profile,
|
||||
resized_data,
|
||||
)
|
||||
@@ -585,9 +585,9 @@ class S3UploadBackend(ZulipUploadBackend):
|
||||
def get_realm_logo_url(self, realm_id: int, version: int, night: bool) -> str:
|
||||
# ?x=x allows templates to append additional parameters with &s
|
||||
if not night:
|
||||
file_name = 'logo.png'
|
||||
file_name = "logo.png"
|
||||
else:
|
||||
file_name = 'night_logo.png'
|
||||
file_name = "night_logo.png"
|
||||
return f"{self.avatar_bucket_url}/{realm_id}/realm/{file_name}?version={version}"
|
||||
|
||||
def ensure_medium_avatar_image(self, user_profile: UserProfile) -> None:
|
||||
@@ -595,7 +595,7 @@ class S3UploadBackend(ZulipUploadBackend):
|
||||
s3_file_name = file_path
|
||||
|
||||
key = self.avatar_bucket.Object(file_path + ".original")
|
||||
image_data = key.get()['Body'].read()
|
||||
image_data = key.get()["Body"].read()
|
||||
|
||||
resized_medium = resize_avatar(image_data, MEDIUM_AVATAR_SIZE)
|
||||
upload_image_to_s3(
|
||||
@@ -613,7 +613,7 @@ class S3UploadBackend(ZulipUploadBackend):
|
||||
s3_file_name = file_path
|
||||
|
||||
key = self.avatar_bucket.Object(file_path + ".original")
|
||||
image_data = key.get()['Body'].read()
|
||||
image_data = key.get()["Body"].read()
|
||||
|
||||
resized_avatar = resize_avatar(image_data)
|
||||
upload_image_to_s3(
|
||||
@@ -673,15 +673,15 @@ class S3UploadBackend(ZulipUploadBackend):
|
||||
config = Config(signature_version=botocore.UNSIGNED)
|
||||
|
||||
public_url = session.create_client(
|
||||
's3',
|
||||
"s3",
|
||||
region_name=settings.S3_REGION,
|
||||
endpoint_url=settings.S3_ENDPOINT_URL,
|
||||
config=config,
|
||||
).generate_presigned_url(
|
||||
'get_object',
|
||||
"get_object",
|
||||
Params={
|
||||
'Bucket': self.avatar_bucket.name,
|
||||
'Key': key.key,
|
||||
"Bucket": self.avatar_bucket.name,
|
||||
"Key": key.key,
|
||||
},
|
||||
ExpiresIn=0,
|
||||
)
|
||||
@@ -702,13 +702,13 @@ def write_local_file(type: str, path: str, file_data: bytes) -> None:
|
||||
file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, type, path)
|
||||
|
||||
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
||||
with open(file_path, 'wb') as f:
|
||||
with open(file_path, "wb") as f:
|
||||
f.write(file_data)
|
||||
|
||||
|
||||
def read_local_file(type: str, path: str) -> bytes:
|
||||
file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, type, path)
|
||||
with open(file_path, 'rb') as f:
|
||||
with open(file_path, "rb") as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
@@ -724,7 +724,7 @@ def delete_local_file(type: str, path: str) -> bool:
|
||||
|
||||
|
||||
def get_local_file_path(path_id: str) -> Optional[str]:
|
||||
local_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id)
|
||||
local_path = os.path.join(settings.LOCAL_UPLOADS_DIR, "files", path_id)
|
||||
if os.path.isfile(local_path):
|
||||
return local_path
|
||||
else:
|
||||
@@ -736,16 +736,16 @@ LOCAL_FILE_ACCESS_TOKEN_SALT = "local_file_"
|
||||
|
||||
def generate_unauthed_file_access_url(path_id: str) -> str:
|
||||
signed_data = TimestampSigner(salt=LOCAL_FILE_ACCESS_TOKEN_SALT).sign(path_id)
|
||||
token = base64.b16encode(signed_data.encode('utf-8')).decode('utf-8')
|
||||
token = base64.b16encode(signed_data.encode("utf-8")).decode("utf-8")
|
||||
|
||||
filename = path_id.split('/')[-1]
|
||||
return reverse('local_file_unauthed', args=[token, filename])
|
||||
filename = path_id.split("/")[-1]
|
||||
return reverse("local_file_unauthed", args=[token, filename])
|
||||
|
||||
|
||||
def get_local_file_path_id_from_token(token: str) -> Optional[str]:
|
||||
signer = TimestampSigner(salt=LOCAL_FILE_ACCESS_TOKEN_SALT)
|
||||
try:
|
||||
signed_data = base64.b16decode(token).decode('utf-8')
|
||||
signed_data = base64.b16decode(token).decode("utf-8")
|
||||
path_id = signer.unsign(signed_data, max_age=timedelta(seconds=60))
|
||||
except (BadSignature, binascii.Error):
|
||||
return None
|
||||
@@ -767,27 +767,27 @@ class LocalUploadBackend(ZulipUploadBackend):
|
||||
path = "/".join(
|
||||
[
|
||||
str(user_profile.realm_id),
|
||||
format(random.randint(0, 255), 'x'),
|
||||
format(random.randint(0, 255), "x"),
|
||||
secrets.token_urlsafe(18),
|
||||
sanitize_name(uploaded_file_name),
|
||||
]
|
||||
)
|
||||
|
||||
write_local_file('files', path, file_data)
|
||||
write_local_file("files", path, file_data)
|
||||
create_attachment(uploaded_file_name, path, user_profile, uploaded_file_size)
|
||||
return '/user_uploads/' + path
|
||||
return "/user_uploads/" + path
|
||||
|
||||
def delete_message_image(self, path_id: str) -> bool:
|
||||
return delete_local_file('files', path_id)
|
||||
return delete_local_file("files", path_id)
|
||||
|
||||
def write_avatar_images(self, file_path: str, image_data: bytes) -> None:
|
||||
write_local_file('avatars', file_path + '.original', image_data)
|
||||
write_local_file("avatars", file_path + ".original", image_data)
|
||||
|
||||
resized_data = resize_avatar(image_data)
|
||||
write_local_file('avatars', file_path + '.png', resized_data)
|
||||
write_local_file("avatars", file_path + ".png", resized_data)
|
||||
|
||||
resized_medium = resize_avatar(image_data, MEDIUM_AVATAR_SIZE)
|
||||
write_local_file('avatars', file_path + '-medium.png', resized_medium)
|
||||
write_local_file("avatars", file_path + "-medium.png", resized_medium)
|
||||
|
||||
def upload_avatar_image(
|
||||
self,
|
||||
@@ -817,19 +817,19 @@ class LocalUploadBackend(ZulipUploadBackend):
|
||||
source_file_path = user_avatar_path(source_profile)
|
||||
target_file_path = user_avatar_path(target_profile)
|
||||
|
||||
image_data = read_local_file('avatars', source_file_path + '.original')
|
||||
image_data = read_local_file("avatars", source_file_path + ".original")
|
||||
self.write_avatar_images(target_file_path, image_data)
|
||||
|
||||
def realm_avatar_and_logo_path(self, realm: Realm) -> str:
|
||||
return os.path.join('avatars', str(realm.id), 'realm')
|
||||
return os.path.join("avatars", str(realm.id), "realm")
|
||||
|
||||
def upload_realm_icon_image(self, icon_file: File, user_profile: UserProfile) -> None:
|
||||
upload_path = self.realm_avatar_and_logo_path(user_profile.realm)
|
||||
image_data = icon_file.read()
|
||||
write_local_file(upload_path, 'icon.original', image_data)
|
||||
write_local_file(upload_path, "icon.original", image_data)
|
||||
|
||||
resized_data = resize_avatar(image_data)
|
||||
write_local_file(upload_path, 'icon.png', resized_data)
|
||||
write_local_file(upload_path, "icon.png", resized_data)
|
||||
|
||||
def get_realm_icon_url(self, realm_id: int, version: int) -> str:
|
||||
# ?x=x allows templates to append additional parameters with &s
|
||||
@@ -840,11 +840,11 @@ class LocalUploadBackend(ZulipUploadBackend):
|
||||
) -> None:
|
||||
upload_path = self.realm_avatar_and_logo_path(user_profile.realm)
|
||||
if night:
|
||||
original_file = 'night_logo.original'
|
||||
resized_file = 'night_logo.png'
|
||||
original_file = "night_logo.original"
|
||||
resized_file = "night_logo.png"
|
||||
else:
|
||||
original_file = 'logo.original'
|
||||
resized_file = 'logo.png'
|
||||
original_file = "logo.original"
|
||||
resized_file = "logo.png"
|
||||
image_data = logo_file.read()
|
||||
write_local_file(upload_path, original_file, image_data)
|
||||
|
||||
@@ -854,9 +854,9 @@ class LocalUploadBackend(ZulipUploadBackend):
|
||||
def get_realm_logo_url(self, realm_id: int, version: int, night: bool) -> str:
|
||||
# ?x=x allows templates to append additional parameters with &s
|
||||
if night:
|
||||
file_name = 'night_logo.png'
|
||||
file_name = "night_logo.png"
|
||||
else:
|
||||
file_name = 'logo.png'
|
||||
file_name = "logo.png"
|
||||
return f"/user_avatars/{realm_id}/realm/{file_name}?version={version}"
|
||||
|
||||
def ensure_medium_avatar_image(self, user_profile: UserProfile) -> None:
|
||||
@@ -870,7 +870,7 @@ class LocalUploadBackend(ZulipUploadBackend):
|
||||
with open(image_path, "rb") as f:
|
||||
image_data = f.read()
|
||||
resized_medium = resize_avatar(image_data, MEDIUM_AVATAR_SIZE)
|
||||
write_local_file('avatars', file_path + '-medium.png', resized_medium)
|
||||
write_local_file("avatars", file_path + "-medium.png", resized_medium)
|
||||
|
||||
def ensure_basic_avatar_image(self, user_profile: UserProfile) -> None: # nocoverage
|
||||
# TODO: Refactor this to share code with ensure_medium_avatar_image
|
||||
@@ -884,7 +884,7 @@ class LocalUploadBackend(ZulipUploadBackend):
|
||||
with open(image_path, "rb") as f:
|
||||
image_data = f.read()
|
||||
resized_avatar = resize_avatar(image_data)
|
||||
write_local_file('avatars', file_path + '.png', resized_avatar)
|
||||
write_local_file("avatars", file_path + ".png", resized_avatar)
|
||||
|
||||
def upload_emoji_image(
|
||||
self, emoji_file: File, emoji_file_name: str, user_profile: UserProfile
|
||||
@@ -896,8 +896,8 @@ class LocalUploadBackend(ZulipUploadBackend):
|
||||
|
||||
image_data = emoji_file.read()
|
||||
resized_image_data = resize_emoji(image_data)
|
||||
write_local_file('avatars', ".".join((emoji_path, "original")), image_data)
|
||||
write_local_file('avatars', emoji_path, resized_image_data)
|
||||
write_local_file("avatars", ".".join((emoji_path, "original")), image_data)
|
||||
write_local_file("avatars", emoji_path, resized_image_data)
|
||||
|
||||
def get_emoji_url(self, emoji_file_name: str, realm_id: int) -> str:
|
||||
return os.path.join(
|
||||
@@ -912,22 +912,22 @@ class LocalUploadBackend(ZulipUploadBackend):
|
||||
percent_callback: Optional[Callable[[Any], None]] = None,
|
||||
) -> str:
|
||||
path = os.path.join(
|
||||
'exports',
|
||||
"exports",
|
||||
str(realm.id),
|
||||
secrets.token_urlsafe(18),
|
||||
os.path.basename(tarball_path),
|
||||
)
|
||||
abs_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'avatars', path)
|
||||
abs_path = os.path.join(settings.LOCAL_UPLOADS_DIR, "avatars", path)
|
||||
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
|
||||
shutil.copy(tarball_path, abs_path)
|
||||
public_url = realm.uri + '/user_avatars/' + path
|
||||
public_url = realm.uri + "/user_avatars/" + path
|
||||
return public_url
|
||||
|
||||
def delete_export_tarball(self, export_path: str) -> Optional[str]:
|
||||
# Get the last element of a list in the form ['user_avatars', '<file_path>']
|
||||
assert export_path.startswith("/")
|
||||
file_path = export_path[1:].split('/', 1)[-1]
|
||||
if delete_local_file('avatars', file_path):
|
||||
file_path = export_path[1:].split("/", 1)[-1]
|
||||
if delete_local_file("avatars", file_path):
|
||||
return export_path
|
||||
return None
|
||||
|
||||
@@ -1023,7 +1023,7 @@ def create_attachment(
|
||||
)
|
||||
from zerver.lib.actions import notify_attachment_update
|
||||
|
||||
notify_attachment_update(user_profile, 'add', attachment.to_dict())
|
||||
notify_attachment_update(user_profile, "add", attachment.to_dict())
|
||||
return True
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user