mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	In all the tests files, replaced all occurences of `uri` with `url` appeared in comments, local variablles, function names and their callers.
		
			
				
	
	
		
			277 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			277 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import os
 | 
						|
import re
 | 
						|
import urllib
 | 
						|
from io import BytesIO, StringIO
 | 
						|
from urllib.parse import urlparse
 | 
						|
 | 
						|
from django.conf import settings
 | 
						|
from django.http.response import StreamingHttpResponse
 | 
						|
from PIL import Image
 | 
						|
 | 
						|
import zerver.lib.upload
 | 
						|
from zerver.lib.avatar_hash import user_avatar_path
 | 
						|
from zerver.lib.test_classes import UploadSerializeMixin, ZulipTestCase
 | 
						|
from zerver.lib.test_helpers import (
 | 
						|
    get_test_image_file,
 | 
						|
    read_test_image_file,
 | 
						|
)
 | 
						|
from zerver.lib.upload import (
 | 
						|
    all_message_attachments,
 | 
						|
    delete_export_tarball,
 | 
						|
    delete_message_attachment,
 | 
						|
    delete_message_attachments,
 | 
						|
    save_attachment_contents,
 | 
						|
    upload_emoji_image,
 | 
						|
    upload_export_tarball,
 | 
						|
    upload_message_attachment,
 | 
						|
)
 | 
						|
from zerver.lib.upload.base import (
 | 
						|
    DEFAULT_EMOJI_SIZE,
 | 
						|
    MEDIUM_AVATAR_SIZE,
 | 
						|
    resize_avatar,
 | 
						|
)
 | 
						|
from zerver.lib.upload.local import write_local_file
 | 
						|
from zerver.models import (
 | 
						|
    Attachment,
 | 
						|
    RealmEmoji,
 | 
						|
    get_realm,
 | 
						|
    get_system_bot,
 | 
						|
)
 | 
						|
 | 
						|
 | 
						|
class LocalStorageTest(UploadSerializeMixin, ZulipTestCase):
 | 
						|
    def test_upload_message_attachment(self) -> None:
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        url = upload_message_attachment(
 | 
						|
            "dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
 | 
						|
        )
 | 
						|
 | 
						|
        base = "/user_uploads/"
 | 
						|
        self.assertEqual(base, url[: len(base)])
 | 
						|
        path_id = re.sub("/user_uploads/", "", url)
 | 
						|
        assert settings.LOCAL_UPLOADS_DIR is not None
 | 
						|
        assert settings.LOCAL_FILES_DIR is not None
 | 
						|
        file_path = os.path.join(settings.LOCAL_FILES_DIR, path_id)
 | 
						|
        self.assertTrue(os.path.isfile(file_path))
 | 
						|
 | 
						|
        uploaded_file = Attachment.objects.get(owner=user_profile, path_id=path_id)
 | 
						|
        self.assert_length(b"zulip!", uploaded_file.size)
 | 
						|
 | 
						|
    def test_save_attachment_contents(self) -> None:
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        url = upload_message_attachment(
 | 
						|
            "dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
 | 
						|
        )
 | 
						|
 | 
						|
        path_id = re.sub("/user_uploads/", "", url)
 | 
						|
        output = BytesIO()
 | 
						|
        save_attachment_contents(path_id, output)
 | 
						|
        self.assertEqual(output.getvalue(), b"zulip!")
 | 
						|
 | 
						|
    def test_upload_message_attachment_local_cross_realm_path(self) -> None:
 | 
						|
        """
 | 
						|
        Verifies that the path of a file uploaded by a cross-realm bot to another
 | 
						|
        realm is correct.
 | 
						|
        """
 | 
						|
 | 
						|
        internal_realm = get_realm(settings.SYSTEM_BOT_REALM)
 | 
						|
        zulip_realm = get_realm("zulip")
 | 
						|
        user_profile = get_system_bot(settings.EMAIL_GATEWAY_BOT, internal_realm.id)
 | 
						|
        self.assertEqual(user_profile.realm, internal_realm)
 | 
						|
 | 
						|
        url = upload_message_attachment(
 | 
						|
            "dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile, zulip_realm
 | 
						|
        )
 | 
						|
        # Ensure the correct realm id of the target realm is used instead of the bot's realm.
 | 
						|
        self.assertTrue(url.startswith(f"/user_uploads/{zulip_realm.id}/"))
 | 
						|
 | 
						|
    def test_delete_message_attachment(self) -> None:
 | 
						|
        self.login("hamlet")
 | 
						|
        fp = StringIO("zulip!")
 | 
						|
        fp.name = "zulip.txt"
 | 
						|
        result = self.client_post("/json/user_uploads", {"file": fp})
 | 
						|
 | 
						|
        response_dict = self.assert_json_success(result)
 | 
						|
        path_id = re.sub("/user_uploads/", "", response_dict["uri"])
 | 
						|
 | 
						|
        assert settings.LOCAL_FILES_DIR is not None
 | 
						|
        file_path = os.path.join(settings.LOCAL_FILES_DIR, path_id)
 | 
						|
        self.assertTrue(os.path.isfile(file_path))
 | 
						|
 | 
						|
        self.assertTrue(delete_message_attachment(path_id))
 | 
						|
        self.assertFalse(os.path.isfile(file_path))
 | 
						|
 | 
						|
    def test_delete_message_attachments(self) -> None:
 | 
						|
        assert settings.LOCAL_UPLOADS_DIR is not None
 | 
						|
        assert settings.LOCAL_FILES_DIR is not None
 | 
						|
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        path_ids = []
 | 
						|
        for n in range(1, 1005):
 | 
						|
            url = upload_message_attachment(
 | 
						|
                "dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
 | 
						|
            )
 | 
						|
            base = "/user_uploads/"
 | 
						|
            self.assertEqual(base, url[: len(base)])
 | 
						|
            path_id = re.sub("/user_uploads/", "", url)
 | 
						|
            path_ids.append(path_id)
 | 
						|
            file_path = os.path.join(settings.LOCAL_FILES_DIR, path_id)
 | 
						|
            self.assertTrue(os.path.isfile(file_path))
 | 
						|
 | 
						|
        delete_message_attachments(path_ids)
 | 
						|
        for path_id in path_ids:
 | 
						|
            file_path = os.path.join(settings.LOCAL_FILES_DIR, path_id)
 | 
						|
            self.assertFalse(os.path.isfile(file_path))
 | 
						|
 | 
						|
    def test_all_message_attachments(self) -> None:
 | 
						|
        write_local_file("files", "foo", b"content")
 | 
						|
        write_local_file("files", "bar/baz", b"content")
 | 
						|
        write_local_file("files", "bar/troz", b"content")
 | 
						|
        write_local_file("files", "test/other/file", b"content")
 | 
						|
        found_files = [r[0] for r in all_message_attachments()]
 | 
						|
        self.assertEqual(sorted(found_files), ["bar/baz", "bar/troz", "foo", "test/other/file"])
 | 
						|
 | 
						|
    def test_avatar_url(self) -> None:
 | 
						|
        self.login("hamlet")
 | 
						|
        with get_test_image_file("img.png") as image_file:
 | 
						|
            result = self.client_post("/json/users/me/avatar", {"file": image_file})
 | 
						|
 | 
						|
        response_dict = self.assert_json_success(result)
 | 
						|
        self.assertIn("avatar_url", response_dict)
 | 
						|
        base = "/user_avatars/"
 | 
						|
        url = self.assert_json_success(result)["avatar_url"]
 | 
						|
        self.assertEqual(base, url[: len(base)])
 | 
						|
 | 
						|
        # That URL is accessible when logged out
 | 
						|
        self.logout()
 | 
						|
        result = self.client_get(url)
 | 
						|
        self.assertEqual(result.status_code, 200)
 | 
						|
 | 
						|
        # We get a resized avatar from it
 | 
						|
        image_data = read_test_image_file("img.png")
 | 
						|
        resized_avatar = resize_avatar(image_data)
 | 
						|
        assert isinstance(result, StreamingHttpResponse)
 | 
						|
        self.assertEqual(resized_avatar, b"".join(result.streaming_content))
 | 
						|
 | 
						|
        with self.settings(DEVELOPMENT=False):
 | 
						|
            # In production, this is an X-Accel-Redirect to the
 | 
						|
            # on-disk content, which nginx serves
 | 
						|
            result = self.client_get(url)
 | 
						|
            self.assertEqual(result.status_code, 200)
 | 
						|
            internal_redirect_path = urlparse(url).path.replace(
 | 
						|
                "/user_avatars/", "/internal/local/user_avatars/"
 | 
						|
            )
 | 
						|
            self.assertEqual(result["X-Accel-Redirect"], internal_redirect_path)
 | 
						|
            self.assertEqual(b"", result.content)
 | 
						|
 | 
						|
    def test_ensure_avatar_image(self) -> None:
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        file_path = user_avatar_path(user_profile)
 | 
						|
 | 
						|
        write_local_file("avatars", file_path + ".original", read_test_image_file("img.png"))
 | 
						|
 | 
						|
        assert settings.LOCAL_UPLOADS_DIR is not None
 | 
						|
        assert settings.LOCAL_AVATARS_DIR is not None
 | 
						|
        image_path = os.path.join(settings.LOCAL_AVATARS_DIR, file_path + ".original")
 | 
						|
        with open(image_path, "rb") as f:
 | 
						|
            image_data = f.read()
 | 
						|
 | 
						|
        resized_avatar = resize_avatar(image_data)
 | 
						|
        zerver.lib.upload.upload_backend.ensure_avatar_image(user_profile)
 | 
						|
        output_path = os.path.join(settings.LOCAL_AVATARS_DIR, file_path + ".png")
 | 
						|
        with open(output_path, "rb") as original_file:
 | 
						|
            self.assertEqual(resized_avatar, original_file.read())
 | 
						|
 | 
						|
        resized_avatar = resize_avatar(image_data, MEDIUM_AVATAR_SIZE)
 | 
						|
        zerver.lib.upload.upload_backend.ensure_avatar_image(user_profile, is_medium=True)
 | 
						|
        output_path = os.path.join(settings.LOCAL_AVATARS_DIR, file_path + "-medium.png")
 | 
						|
        with open(output_path, "rb") as original_file:
 | 
						|
            self.assertEqual(resized_avatar, original_file.read())
 | 
						|
 | 
						|
    def test_get_emoji_url(self) -> None:
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        file_name = "emoji.png"
 | 
						|
 | 
						|
        with get_test_image_file("img.png") as image_file:
 | 
						|
            upload_emoji_image(image_file, file_name, user_profile)
 | 
						|
        url = zerver.lib.upload.upload_backend.get_emoji_url(file_name, user_profile.realm_id)
 | 
						|
 | 
						|
        emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
 | 
						|
            realm_id=user_profile.realm_id,
 | 
						|
            emoji_file_name=file_name,
 | 
						|
        )
 | 
						|
        expected_url = f"/user_avatars/{emoji_path}"
 | 
						|
        self.assertEqual(expected_url, url)
 | 
						|
 | 
						|
        file_name = "emoji.gif"
 | 
						|
        with get_test_image_file("animated_img.gif") as image_file:
 | 
						|
            upload_emoji_image(image_file, file_name, user_profile)
 | 
						|
        url = zerver.lib.upload.upload_backend.get_emoji_url(file_name, user_profile.realm_id)
 | 
						|
        still_url = zerver.lib.upload.upload_backend.get_emoji_url(
 | 
						|
            file_name, user_profile.realm_id, still=True
 | 
						|
        )
 | 
						|
 | 
						|
        emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
 | 
						|
            realm_id=user_profile.realm_id,
 | 
						|
            emoji_file_name=file_name,
 | 
						|
        )
 | 
						|
 | 
						|
        still_emoji_path = RealmEmoji.STILL_PATH_ID_TEMPLATE.format(
 | 
						|
            realm_id=user_profile.realm_id,
 | 
						|
            emoji_filename_without_extension=os.path.splitext(file_name)[0],
 | 
						|
        )
 | 
						|
        expected_url = f"/user_avatars/{emoji_path}"
 | 
						|
        self.assertEqual(expected_url, url)
 | 
						|
        expected_still_url = f"/user_avatars/{still_emoji_path}"
 | 
						|
        self.assertEqual(expected_still_url, still_url)
 | 
						|
 | 
						|
    def test_emoji_upload(self) -> None:
 | 
						|
        user_profile = self.example_user("hamlet")
 | 
						|
        file_name = "emoji.png"
 | 
						|
 | 
						|
        with get_test_image_file("img.png") as image_file:
 | 
						|
            upload_emoji_image(image_file, file_name, user_profile)
 | 
						|
 | 
						|
        emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
 | 
						|
            realm_id=user_profile.realm_id,
 | 
						|
            emoji_file_name=file_name,
 | 
						|
        )
 | 
						|
 | 
						|
        assert settings.LOCAL_AVATARS_DIR is not None
 | 
						|
        file_path = os.path.join(settings.LOCAL_AVATARS_DIR, emoji_path)
 | 
						|
        with open(file_path + ".original", "rb") as original_file:
 | 
						|
            self.assertEqual(read_test_image_file("img.png"), original_file.read())
 | 
						|
 | 
						|
        expected_size = (DEFAULT_EMOJI_SIZE, DEFAULT_EMOJI_SIZE)
 | 
						|
        with Image.open(file_path) as resized_image:
 | 
						|
            self.assertEqual(expected_size, resized_image.size)
 | 
						|
 | 
						|
    def test_tarball_upload_and_deletion(self) -> None:
 | 
						|
        user_profile = self.example_user("iago")
 | 
						|
        self.assertTrue(user_profile.is_realm_admin)
 | 
						|
 | 
						|
        assert settings.TEST_WORKER_DIR is not None
 | 
						|
        tarball_path = os.path.join(settings.TEST_WORKER_DIR, "tarball.tar.gz")
 | 
						|
        with open(tarball_path, "w") as f:
 | 
						|
            f.write("dummy")
 | 
						|
 | 
						|
        assert settings.LOCAL_AVATARS_DIR is not None
 | 
						|
        url = upload_export_tarball(user_profile.realm, tarball_path)
 | 
						|
        self.assertTrue(os.path.isfile(os.path.join(settings.LOCAL_AVATARS_DIR, tarball_path)))
 | 
						|
 | 
						|
        result = re.search(re.compile(r"([A-Za-z0-9\-_]{24})"), url)
 | 
						|
        if result is not None:
 | 
						|
            random_name = result.group(1)
 | 
						|
        expected_url = f"http://zulip.testserver/user_avatars/exports/{user_profile.realm_id}/{random_name}/tarball.tar.gz"
 | 
						|
        self.assertEqual(expected_url, url)
 | 
						|
 | 
						|
        # Delete the tarball.
 | 
						|
        with self.assertLogs(level="WARNING") as warn_log:
 | 
						|
            self.assertIsNone(delete_export_tarball("/not_a_file"))
 | 
						|
        self.assertEqual(
 | 
						|
            warn_log.output,
 | 
						|
            ["WARNING:root:not_a_file does not exist. Its entry in the database will be removed."],
 | 
						|
        )
 | 
						|
        path_id = urllib.parse.urlparse(url).path
 | 
						|
        self.assertEqual(delete_export_tarball(path_id), path_id)
 |