mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 14:03:30 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			295 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			295 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import io
 | 
						|
import os
 | 
						|
import re
 | 
						|
import unicodedata
 | 
						|
from datetime import datetime
 | 
						|
from typing import IO, Any, BinaryIO, Callable, Iterator, List, Optional, Tuple
 | 
						|
 | 
						|
from django.utils.translation import gettext as _
 | 
						|
from PIL import GifImagePlugin, Image, ImageOps, PngImagePlugin
 | 
						|
from PIL.Image import DecompressionBombError
 | 
						|
 | 
						|
from zerver.lib.exceptions import ErrorCode, JsonableError
 | 
						|
from zerver.models import Attachment, Realm, UserProfile, is_cross_realm_bot_email
 | 
						|
 | 
						|
DEFAULT_AVATAR_SIZE = 100
 | 
						|
MEDIUM_AVATAR_SIZE = 500
 | 
						|
DEFAULT_EMOJI_SIZE = 64
 | 
						|
 | 
						|
# These sizes were selected based on looking at the maximum common
 | 
						|
# sizes in a library of animated custom emoji, balanced against the
 | 
						|
# network cost of very large emoji images.
 | 
						|
MAX_EMOJI_GIF_SIZE = 128
 | 
						|
MAX_EMOJI_GIF_FILE_SIZE_BYTES = 128 * 1024 * 1024  # 128 kb
 | 
						|
 | 
						|
 | 
						|
INLINE_MIME_TYPES = [
 | 
						|
    "application/pdf",
 | 
						|
    "image/gif",
 | 
						|
    "image/jpeg",
 | 
						|
    "image/png",
 | 
						|
    "image/webp",
 | 
						|
    # To avoid cross-site scripting attacks, DO NOT add types such
 | 
						|
    # as application/xhtml+xml, application/x-shockwave-flash,
 | 
						|
    # image/svg+xml, text/html, or text/xml.
 | 
						|
]
 | 
						|
 | 
						|
 | 
						|
def sanitize_name(value: str) -> str:
 | 
						|
    """
 | 
						|
    Sanitizes a value to be safe to store in a Linux filesystem, in
 | 
						|
    S3, and in a URL.  So Unicode is allowed, but not special
 | 
						|
    characters other than ".", "-", and "_".
 | 
						|
 | 
						|
    This implementation is based on django.utils.text.slugify; it is
 | 
						|
    modified by:
 | 
						|
    * adding '.' to the list of allowed characters.
 | 
						|
    * preserving the case of the value.
 | 
						|
    * not stripping trailing dashes and underscores.
 | 
						|
    """
 | 
						|
    value = unicodedata.normalize("NFKC", value)
 | 
						|
    value = re.sub(r"[^\w\s.-]", "", value).strip()
 | 
						|
    value = re.sub(r"[-\s]+", "-", value)
 | 
						|
    assert value not in {"", ".", ".."}
 | 
						|
    return value
 | 
						|
 | 
						|
 | 
						|
class BadImageError(JsonableError):
 | 
						|
    code = ErrorCode.BAD_IMAGE
 | 
						|
 | 
						|
 | 
						|
def resize_avatar(image_data: bytes, size: int = DEFAULT_AVATAR_SIZE) -> bytes:
 | 
						|
    try:
 | 
						|
        im = Image.open(io.BytesIO(image_data))
 | 
						|
        im = ImageOps.exif_transpose(im)
 | 
						|
        im = ImageOps.fit(im, (size, size), Image.Resampling.LANCZOS)
 | 
						|
    except OSError:
 | 
						|
        raise BadImageError(_("Could not decode image; did you upload an image file?"))
 | 
						|
    except DecompressionBombError:
 | 
						|
        raise BadImageError(_("Image size exceeds limit."))
 | 
						|
    out = io.BytesIO()
 | 
						|
    if im.mode == "CMYK":
 | 
						|
        im = im.convert("RGB")
 | 
						|
    im.save(out, format="png")
 | 
						|
    return out.getvalue()
 | 
						|
 | 
						|
 | 
						|
def resize_logo(image_data: bytes) -> bytes:
 | 
						|
    try:
 | 
						|
        im = Image.open(io.BytesIO(image_data))
 | 
						|
        im = ImageOps.exif_transpose(im)
 | 
						|
        im.thumbnail((8 * DEFAULT_AVATAR_SIZE, DEFAULT_AVATAR_SIZE), Image.Resampling.LANCZOS)
 | 
						|
    except OSError:
 | 
						|
        raise BadImageError(_("Could not decode image; did you upload an image file?"))
 | 
						|
    except DecompressionBombError:
 | 
						|
        raise BadImageError(_("Image size exceeds limit."))
 | 
						|
    out = io.BytesIO()
 | 
						|
    if im.mode == "CMYK":
 | 
						|
        im = im.convert("RGB")
 | 
						|
    im.save(out, format="png")
 | 
						|
    return out.getvalue()
 | 
						|
 | 
						|
 | 
						|
def resize_animated(im: Image.Image, size: int = DEFAULT_EMOJI_SIZE) -> bytes:
 | 
						|
    assert im.n_frames > 1
 | 
						|
    frames = []
 | 
						|
    duration_info = []
 | 
						|
    disposals = []
 | 
						|
    # If 'loop' info is not set then loop for infinite number of times.
 | 
						|
    loop = im.info.get("loop", 0)
 | 
						|
    for frame_num in range(0, im.n_frames):
 | 
						|
        im.seek(frame_num)
 | 
						|
        new_frame = im.copy()
 | 
						|
        new_frame.paste(im, (0, 0), im.convert("RGBA"))
 | 
						|
        new_frame = ImageOps.pad(new_frame, (size, size), Image.Resampling.LANCZOS)
 | 
						|
        frames.append(new_frame)
 | 
						|
        if im.info.get("duration") is None:  # nocoverage
 | 
						|
            raise BadImageError(_("Corrupt animated image."))
 | 
						|
        duration_info.append(im.info["duration"])
 | 
						|
        if isinstance(im, GifImagePlugin.GifImageFile):
 | 
						|
            disposals.append(
 | 
						|
                im.disposal_method  # type: ignore[attr-defined]  # private member missing from stubs
 | 
						|
            )
 | 
						|
        elif isinstance(im, PngImagePlugin.PngImageFile):
 | 
						|
            disposals.append(im.info.get("disposal", PngImagePlugin.Disposal.OP_NONE))
 | 
						|
        else:  # nocoverage
 | 
						|
            raise BadImageError(_("Unknown animated image format."))
 | 
						|
    out = io.BytesIO()
 | 
						|
    frames[0].save(
 | 
						|
        out,
 | 
						|
        save_all=True,
 | 
						|
        optimize=False,
 | 
						|
        format=im.format,
 | 
						|
        append_images=frames[1:],
 | 
						|
        duration=duration_info,
 | 
						|
        disposal=disposals,
 | 
						|
        loop=loop,
 | 
						|
    )
 | 
						|
 | 
						|
    return out.getvalue()
 | 
						|
 | 
						|
 | 
						|
def resize_emoji(
 | 
						|
    image_data: bytes, size: int = DEFAULT_EMOJI_SIZE
 | 
						|
) -> Tuple[bytes, bool, Optional[bytes]]:
 | 
						|
    # This function returns three values:
 | 
						|
    # 1) Emoji image data.
 | 
						|
    # 2) If emoji is gif i.e animated.
 | 
						|
    # 3) If is animated then return still image data i.e first frame of gif.
 | 
						|
 | 
						|
    try:
 | 
						|
        im = Image.open(io.BytesIO(image_data))
 | 
						|
        image_format = im.format
 | 
						|
        if getattr(im, "n_frames", 1) > 1:
 | 
						|
            # There are a number of bugs in Pillow which cause results
 | 
						|
            # in resized images being broken. To work around this we
 | 
						|
            # only resize under certain conditions to minimize the
 | 
						|
            # chance of creating ugly images.
 | 
						|
            should_resize = (
 | 
						|
                im.size[0] != im.size[1]  # not square
 | 
						|
                or im.size[0] > MAX_EMOJI_GIF_SIZE  # dimensions too large
 | 
						|
                or len(image_data) > MAX_EMOJI_GIF_FILE_SIZE_BYTES  # filesize too large
 | 
						|
            )
 | 
						|
 | 
						|
            # Generate a still image from the first frame.  Since
 | 
						|
            # we're converting the format to PNG anyway, we resize unconditionally.
 | 
						|
            still_image = im.copy()
 | 
						|
            still_image.seek(0)
 | 
						|
            still_image = ImageOps.exif_transpose(still_image)
 | 
						|
            still_image = ImageOps.fit(still_image, (size, size), Image.Resampling.LANCZOS)
 | 
						|
            out = io.BytesIO()
 | 
						|
            still_image.save(out, format="PNG")
 | 
						|
            still_image_data = out.getvalue()
 | 
						|
 | 
						|
            if should_resize:
 | 
						|
                image_data = resize_animated(im, size)
 | 
						|
 | 
						|
            return image_data, True, still_image_data
 | 
						|
        else:
 | 
						|
            # Note that this is essentially duplicated in the
 | 
						|
            # still_image code path, above.
 | 
						|
            im = ImageOps.exif_transpose(im)
 | 
						|
            im = ImageOps.fit(im, (size, size), Image.Resampling.LANCZOS)
 | 
						|
            out = io.BytesIO()
 | 
						|
            im.save(out, format=image_format)
 | 
						|
            return out.getvalue(), False, None
 | 
						|
    except OSError:
 | 
						|
        raise BadImageError(_("Could not decode image; did you upload an image file?"))
 | 
						|
    except DecompressionBombError:
 | 
						|
        raise BadImageError(_("Image size exceeds limit."))
 | 
						|
 | 
						|
 | 
						|
class ZulipUploadBackend:
 | 
						|
    # Message attachment uploads
 | 
						|
    def get_public_upload_root_url(self) -> str:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def generate_message_upload_path(self, realm_id: str, uploaded_file_name: str) -> str:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def upload_message_attachment(
 | 
						|
        self,
 | 
						|
        uploaded_file_name: str,
 | 
						|
        uploaded_file_size: int,
 | 
						|
        content_type: Optional[str],
 | 
						|
        file_data: bytes,
 | 
						|
        user_profile: UserProfile,
 | 
						|
        target_realm: Optional[Realm] = None,
 | 
						|
    ) -> str:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def save_attachment_contents(self, path_id: str, filehandle: BinaryIO) -> None:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def delete_message_attachment(self, path_id: str) -> bool:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def delete_message_attachments(self, path_ids: List[str]) -> None:
 | 
						|
        for path_id in path_ids:
 | 
						|
            self.delete_message_attachment(path_id)
 | 
						|
 | 
						|
    def all_message_attachments(self) -> Iterator[Tuple[str, datetime]]:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    # Avatar image uploads
 | 
						|
    def get_avatar_url(self, hash_key: str, medium: bool = False) -> str:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def upload_avatar_image(
 | 
						|
        self,
 | 
						|
        user_file: IO[bytes],
 | 
						|
        acting_user_profile: UserProfile,
 | 
						|
        target_user_profile: UserProfile,
 | 
						|
        content_type: Optional[str] = None,
 | 
						|
    ) -> None:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def copy_avatar(self, source_profile: UserProfile, target_profile: UserProfile) -> None:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def ensure_avatar_image(self, user_profile: UserProfile, is_medium: bool = False) -> None:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def delete_avatar_image(self, user: UserProfile) -> None:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    # Realm icon and logo uploads
 | 
						|
    def realm_avatar_and_logo_path(self, realm: Realm) -> str:
 | 
						|
        return os.path.join(str(realm.id), "realm")
 | 
						|
 | 
						|
    def get_realm_icon_url(self, realm_id: int, version: int) -> str:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def upload_realm_icon_image(self, icon_file: IO[bytes], user_profile: UserProfile) -> None:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def get_realm_logo_url(self, realm_id: int, version: int, night: bool) -> str:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def upload_realm_logo_image(
 | 
						|
        self, logo_file: IO[bytes], user_profile: UserProfile, night: bool
 | 
						|
    ) -> None:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    # Realm emoji uploads
 | 
						|
    def get_emoji_url(self, emoji_file_name: str, realm_id: int, still: bool = False) -> str:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def upload_emoji_image(
 | 
						|
        self, emoji_file: IO[bytes], emoji_file_name: str, user_profile: UserProfile
 | 
						|
    ) -> bool:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    # Export tarballs
 | 
						|
    def get_export_tarball_url(self, realm: Realm, export_path: str) -> str:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def upload_export_tarball(
 | 
						|
        self,
 | 
						|
        realm: Realm,
 | 
						|
        tarball_path: str,
 | 
						|
        percent_callback: Optional[Callable[[Any], None]] = None,
 | 
						|
    ) -> str:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
    def delete_export_tarball(self, export_path: str) -> Optional[str]:
 | 
						|
        raise NotImplementedError
 | 
						|
 | 
						|
 | 
						|
def create_attachment(
 | 
						|
    file_name: str, path_id: str, user_profile: UserProfile, realm: Realm, file_size: int
 | 
						|
) -> None:
 | 
						|
    assert (user_profile.realm_id == realm.id) or is_cross_realm_bot_email(
 | 
						|
        user_profile.delivery_email
 | 
						|
    )
 | 
						|
    attachment = Attachment.objects.create(
 | 
						|
        file_name=file_name,
 | 
						|
        path_id=path_id,
 | 
						|
        owner=user_profile,
 | 
						|
        realm=realm,
 | 
						|
        size=file_size,
 | 
						|
    )
 | 
						|
    from zerver.actions.uploads import notify_attachment_update
 | 
						|
 | 
						|
    notify_attachment_update(user_profile, "add", attachment.to_dict())
 |