mirror of
https://github.com/zulip/zulip.git
synced 2025-11-17 20:41:46 +00:00
Hash the salt, user-id, and now avatar version into the filename. This allows the URL contents to be immutable, and thus to be marked as immutable and cacheable. Since avatars are served unauthenticated, hashing with a server-side salt makes the current and past avatars not enumerable. This requires plumbing the current (or future) avatar version through various parts of the upload process. Since this already requires a full migration of current avatars, also take the opportunity to fix the missing `.png` on S3 uploads (#12852). We switch from SHA-1 to SHA-256, but truncate it such that avatar URL data does not substantially increase in size. Fixes: #12852.
541 lines
23 KiB
Python
541 lines
23 KiB
Python
import os
|
|
import re
|
|
from io import BytesIO, StringIO
|
|
from unittest.mock import patch
|
|
from urllib.parse import urlsplit
|
|
|
|
import botocore.exceptions
|
|
import pyvips
|
|
from django.conf import settings
|
|
|
|
import zerver.lib.upload
|
|
from zerver.actions.user_settings import do_delete_avatar_image
|
|
from zerver.lib.avatar_hash import user_avatar_path
|
|
from zerver.lib.create_user import copy_default_settings
|
|
from zerver.lib.test_classes import ZulipTestCase
|
|
from zerver.lib.test_helpers import (
|
|
create_s3_buckets,
|
|
get_test_image_file,
|
|
read_test_image_file,
|
|
use_s3_backend,
|
|
)
|
|
from zerver.lib.thumbnail import (
|
|
DEFAULT_AVATAR_SIZE,
|
|
DEFAULT_EMOJI_SIZE,
|
|
MEDIUM_AVATAR_SIZE,
|
|
resize_avatar,
|
|
)
|
|
from zerver.lib.upload import (
|
|
all_message_attachments,
|
|
delete_export_tarball,
|
|
delete_message_attachment,
|
|
delete_message_attachments,
|
|
save_attachment_contents,
|
|
upload_export_tarball,
|
|
upload_message_attachment,
|
|
)
|
|
from zerver.lib.upload.s3 import S3UploadBackend
|
|
from zerver.models import Attachment, RealmEmoji, UserProfile
|
|
from zerver.models.realms import get_realm
|
|
from zerver.models.users import get_system_bot
|
|
|
|
|
|
class S3Test(ZulipTestCase):
|
|
@use_s3_backend
|
|
def test_upload_message_attachment(self) -> None:
|
|
bucket = create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)[0]
|
|
|
|
user_profile = self.example_user("hamlet")
|
|
url = upload_message_attachment(
|
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
|
)
|
|
|
|
base = "/user_uploads/"
|
|
self.assertEqual(base, url[: len(base)])
|
|
path_id = re.sub(r"/user_uploads/", "", url)
|
|
content = bucket.Object(path_id).get()["Body"].read()
|
|
self.assertEqual(b"zulip!", content)
|
|
|
|
uploaded_file = Attachment.objects.get(owner=user_profile, path_id=path_id)
|
|
self.assert_length(b"zulip!", uploaded_file.size)
|
|
|
|
self.subscribe(self.example_user("hamlet"), "Denmark")
|
|
body = f"First message ...[zulip.txt](http://{user_profile.realm.host}{url})"
|
|
self.send_stream_message(self.example_user("hamlet"), "Denmark", body, "test")
|
|
|
|
@use_s3_backend
|
|
def test_save_attachment_contents(self) -> None:
|
|
create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)
|
|
user_profile = self.example_user("hamlet")
|
|
url = upload_message_attachment(
|
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
|
)
|
|
|
|
path_id = re.sub(r"/user_uploads/", "", url)
|
|
output = BytesIO()
|
|
save_attachment_contents(path_id, output)
|
|
self.assertEqual(output.getvalue(), b"zulip!")
|
|
|
|
@use_s3_backend
|
|
def test_upload_message_attachment_s3_cross_realm_path(self) -> None:
|
|
"""
|
|
Verifies that the path of a file uploaded by a cross-realm bot to another
|
|
realm is correct.
|
|
"""
|
|
create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)
|
|
|
|
internal_realm = get_realm(settings.SYSTEM_BOT_REALM)
|
|
zulip_realm = get_realm("zulip")
|
|
user_profile = get_system_bot(settings.EMAIL_GATEWAY_BOT, internal_realm.id)
|
|
self.assertEqual(user_profile.realm, internal_realm)
|
|
|
|
url = upload_message_attachment(
|
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile, zulip_realm
|
|
)
|
|
# Ensure the correct realm id of the target realm is used instead of the bot's realm.
|
|
self.assertTrue(url.startswith(f"/user_uploads/{zulip_realm.id}/"))
|
|
|
|
@use_s3_backend
|
|
def test_delete_message_attachment(self) -> None:
|
|
bucket = create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)[0]
|
|
|
|
user_profile = self.example_user("hamlet")
|
|
url = upload_message_attachment(
|
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
|
)
|
|
|
|
path_id = re.sub(r"/user_uploads/", "", url)
|
|
self.assertIsNotNone(bucket.Object(path_id).get())
|
|
self.assertTrue(delete_message_attachment(path_id))
|
|
with self.assertRaises(botocore.exceptions.ClientError):
|
|
bucket.Object(path_id).load()
|
|
|
|
@use_s3_backend
|
|
def test_delete_message_attachments(self) -> None:
|
|
bucket = create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)[0]
|
|
|
|
user_profile = self.example_user("hamlet")
|
|
path_ids = []
|
|
for n in range(1, 5):
|
|
url = upload_message_attachment(
|
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
|
)
|
|
path_id = re.sub(r"/user_uploads/", "", url)
|
|
self.assertIsNotNone(bucket.Object(path_id).get())
|
|
path_ids.append(path_id)
|
|
|
|
with patch.object(S3UploadBackend, "delete_message_attachment") as single_delete:
|
|
delete_message_attachments(path_ids)
|
|
single_delete.assert_not_called()
|
|
for path_id in path_ids:
|
|
with self.assertRaises(botocore.exceptions.ClientError):
|
|
bucket.Object(path_id).load()
|
|
|
|
@use_s3_backend
|
|
def test_delete_message_attachment_when_file_doesnt_exist(self) -> None:
|
|
bucket = create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)[0]
|
|
with self.assertRaises(botocore.exceptions.ClientError):
|
|
bucket.Object("non-existent-file").load()
|
|
with self.assertLogs(level="WARNING") as warn_log:
|
|
self.assertEqual(False, delete_message_attachment("non-existent-file"))
|
|
self.assertEqual(
|
|
warn_log.output,
|
|
[
|
|
"WARNING:root:non-existent-file does not exist. Its entry in the database will be removed."
|
|
],
|
|
)
|
|
|
|
@use_s3_backend
|
|
def test_all_message_attachments(self) -> None:
|
|
create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)
|
|
|
|
user_profile = self.example_user("hamlet")
|
|
path_ids = []
|
|
for n in range(1, 5):
|
|
url = upload_message_attachment(
|
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
|
)
|
|
path_ids.append(re.sub(r"/user_uploads/", "", url))
|
|
found_paths = [r[0] for r in all_message_attachments()]
|
|
self.assertEqual(sorted(found_paths), sorted(path_ids))
|
|
|
|
@use_s3_backend
|
|
def test_user_uploads_authed(self) -> None:
|
|
"""
|
|
A call to /json/user_uploads should return a url and actually create an object.
|
|
"""
|
|
bucket = create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)[0]
|
|
|
|
self.login("hamlet")
|
|
fp = StringIO("zulip!")
|
|
fp.name = "zulip.txt"
|
|
|
|
result = self.client_post("/json/user_uploads", {"file": fp})
|
|
response_dict = self.assert_json_success(result)
|
|
self.assertIn("uri", response_dict)
|
|
base = "/user_uploads/"
|
|
url = response_dict["uri"]
|
|
self.assertEqual(base, url[: len(base)])
|
|
|
|
# In development, this is just a redirect
|
|
response = self.client_get(url)
|
|
redirect_url = response["Location"]
|
|
path = urlsplit(redirect_url).path
|
|
assert path.startswith("/")
|
|
key = path[len("/") :]
|
|
self.assertEqual(b"zulip!", bucket.Object(key).get()["Body"].read())
|
|
|
|
prefix = f"/internal/s3/{settings.S3_AUTH_UPLOADS_BUCKET}.s3.amazonaws.com/"
|
|
with self.settings(DEVELOPMENT=False):
|
|
response = self.client_get(url)
|
|
redirect_url = response["X-Accel-Redirect"]
|
|
path = urlsplit(redirect_url).path
|
|
assert path.startswith(prefix)
|
|
key = path[len(prefix) :]
|
|
self.assertEqual(b"zulip!", bucket.Object(key).get()["Body"].read())
|
|
|
|
# Check the download endpoint
|
|
download_url = url.replace("/user_uploads/", "/user_uploads/download/")
|
|
with self.settings(DEVELOPMENT=False):
|
|
response = self.client_get(download_url)
|
|
redirect_url = response["X-Accel-Redirect"]
|
|
path = urlsplit(redirect_url).path
|
|
assert path.startswith(prefix)
|
|
key = path[len(prefix) :]
|
|
self.assertEqual(b"zulip!", bucket.Object(key).get()["Body"].read())
|
|
|
|
# Now try the endpoint that's supposed to return a temporary URL for access
|
|
# to the file.
|
|
result = self.client_get("/json" + url)
|
|
data = self.assert_json_success(result)
|
|
url_only_url = data["url"]
|
|
|
|
self.assertNotEqual(url_only_url, url)
|
|
self.assertIn("user_uploads/temporary/", url_only_url)
|
|
self.assertTrue(url_only_url.endswith("zulip.txt"))
|
|
# The generated URL has a token authorizing the requester to access the file
|
|
# without being logged in.
|
|
self.logout()
|
|
with self.settings(DEVELOPMENT=False):
|
|
self.client_get(url_only_url)
|
|
redirect_url = response["X-Accel-Redirect"]
|
|
path = urlsplit(redirect_url).path
|
|
assert path.startswith(prefix)
|
|
key = path[len(prefix) :]
|
|
self.assertEqual(b"zulip!", bucket.Object(key).get()["Body"].read())
|
|
|
|
# The original url shouldn't work when logged out:
|
|
with self.settings(DEVELOPMENT=False):
|
|
result = self.client_get(url)
|
|
self.assertEqual(result.status_code, 302)
|
|
self.assertTrue(result.headers["Location"].endswith(f"/login/?next={url}"))
|
|
|
|
hamlet = self.example_user("hamlet")
|
|
self.subscribe(hamlet, "Denmark")
|
|
body = f"First message ...[zulip.txt](http://{hamlet.realm.host}" + url + ")"
|
|
self.send_stream_message(hamlet, "Denmark", body, "test")
|
|
|
|
@use_s3_backend
|
|
def test_user_avatars_base(self) -> None:
|
|
backend = zerver.lib.upload.upload_backend
|
|
assert isinstance(backend, S3UploadBackend)
|
|
self.assertEqual(
|
|
backend.construct_public_upload_url_base(),
|
|
f"https://{settings.S3_AVATAR_BUCKET}.s3.amazonaws.com/",
|
|
)
|
|
|
|
with self.settings(S3_AVATAR_PUBLIC_URL_PREFIX="https://avatars.example.com"):
|
|
self.assertEqual(
|
|
backend.construct_public_upload_url_base(), "https://avatars.example.com/"
|
|
)
|
|
|
|
with self.settings(S3_AVATAR_PUBLIC_URL_PREFIX="https://avatars.example.com/"):
|
|
self.assertEqual(
|
|
backend.construct_public_upload_url_base(), "https://avatars.example.com/"
|
|
)
|
|
|
|
@use_s3_backend
|
|
def test_user_avatars_redirect(self) -> None:
|
|
create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
|
|
self.login("hamlet")
|
|
with get_test_image_file("img.png") as image_file:
|
|
result = self.client_post("/json/users/me/avatar", {"file": image_file})
|
|
|
|
response_dict = self.assert_json_success(result)
|
|
self.assertIn("avatar_url", response_dict)
|
|
base = f"https://{settings.S3_AVATAR_BUCKET}.s3.amazonaws.com/"
|
|
url = self.assert_json_success(result)["avatar_url"]
|
|
self.assertEqual(base, url[: len(base)])
|
|
|
|
# Try hitting the equivalent `/user_avatars` endpoint
|
|
wrong_url = "/user_avatars/" + url[len(base) :]
|
|
result = self.client_get(wrong_url)
|
|
self.assertEqual(result.status_code, 301)
|
|
self.assertEqual(result["Location"], url)
|
|
|
|
@use_s3_backend
|
|
def test_upload_avatar_image(self) -> None:
|
|
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
|
|
|
|
user_profile = self.example_user("hamlet")
|
|
path_id = user_avatar_path(user_profile)
|
|
original_image_path_id = path_id + ".original"
|
|
medium_path_id = path_id + "-medium.png"
|
|
|
|
with get_test_image_file("img.png") as image_file:
|
|
zerver.lib.upload.upload_avatar_image(image_file, user_profile, future=False)
|
|
test_image_data = read_test_image_file("img.png")
|
|
test_medium_image_data = resize_avatar(test_image_data, MEDIUM_AVATAR_SIZE)
|
|
|
|
original_image_key = bucket.Object(original_image_path_id)
|
|
self.assertEqual(original_image_key.key, original_image_path_id)
|
|
image_data = original_image_key.get()["Body"].read()
|
|
self.assertEqual(image_data, test_image_data)
|
|
|
|
medium_image_key = bucket.Object(medium_path_id)
|
|
self.assertEqual(medium_image_key.key, medium_path_id)
|
|
medium_image_data = medium_image_key.get()["Body"].read()
|
|
self.assertEqual(medium_image_data, test_medium_image_data)
|
|
|
|
bucket.Object(medium_image_key.key).delete()
|
|
zerver.lib.upload.ensure_avatar_image(user_profile, medium=True)
|
|
medium_image_key = bucket.Object(medium_path_id)
|
|
self.assertEqual(medium_image_key.key, medium_path_id)
|
|
|
|
@use_s3_backend
|
|
def test_copy_avatar_image(self) -> None:
|
|
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
|
|
|
|
self.login("hamlet")
|
|
with get_test_image_file("img.png") as image_file:
|
|
self.client_post("/json/users/me/avatar", {"file": image_file})
|
|
|
|
source_user_profile = self.example_user("hamlet")
|
|
target_user_profile = self.example_user("othello")
|
|
|
|
copy_default_settings(source_user_profile, target_user_profile)
|
|
|
|
source_path_id = user_avatar_path(source_user_profile)
|
|
target_path_id = user_avatar_path(target_user_profile)
|
|
self.assertNotEqual(source_path_id, target_path_id)
|
|
|
|
source_image_key = bucket.Object(source_path_id + ".png")
|
|
target_image_key = bucket.Object(target_path_id + ".png")
|
|
self.assertEqual(target_image_key.key, target_path_id + ".png")
|
|
self.assertEqual(source_image_key.content_type, target_image_key.content_type)
|
|
source_image_data = source_image_key.get()["Body"].read()
|
|
target_image_data = target_image_key.get()["Body"].read()
|
|
|
|
source_original_image_path_id = source_path_id + ".original"
|
|
target_original_image_path_id = target_path_id + ".original"
|
|
target_original_image_key = bucket.Object(target_original_image_path_id)
|
|
self.assertEqual(target_original_image_key.key, target_original_image_path_id)
|
|
source_original_image_key = bucket.Object(source_original_image_path_id)
|
|
self.assertEqual(
|
|
source_original_image_key.content_type, target_original_image_key.content_type
|
|
)
|
|
source_image_data = source_original_image_key.get()["Body"].read()
|
|
target_image_data = target_original_image_key.get()["Body"].read()
|
|
self.assertEqual(source_image_data, target_image_data)
|
|
|
|
target_medium_path_id = target_path_id + "-medium.png"
|
|
source_medium_path_id = source_path_id + "-medium.png"
|
|
source_medium_image_key = bucket.Object(source_medium_path_id)
|
|
target_medium_image_key = bucket.Object(target_medium_path_id)
|
|
self.assertEqual(target_medium_image_key.key, target_medium_path_id)
|
|
self.assertEqual(source_medium_image_key.content_type, target_medium_image_key.content_type)
|
|
source_medium_image_data = source_medium_image_key.get()["Body"].read()
|
|
target_medium_image_data = target_medium_image_key.get()["Body"].read()
|
|
self.assertEqual(source_medium_image_data, target_medium_image_data)
|
|
|
|
@use_s3_backend
|
|
def test_ensure_avatar_image(self) -> None:
|
|
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
|
|
|
|
user_profile = self.example_user("hamlet")
|
|
base_file_path = user_avatar_path(user_profile)
|
|
file_path = base_file_path + ".png"
|
|
original_file_path = base_file_path + ".original"
|
|
medium_file_path = base_file_path + "-medium.png"
|
|
|
|
with get_test_image_file("img.png") as image_file:
|
|
zerver.lib.upload.upload_avatar_image(image_file, user_profile, future=False)
|
|
|
|
key = bucket.Object(original_file_path)
|
|
image_data = key.get()["Body"].read()
|
|
|
|
zerver.lib.upload.ensure_avatar_image(user_profile)
|
|
resized_avatar = resize_avatar(image_data)
|
|
key = bucket.Object(file_path)
|
|
self.assertEqual(resized_avatar, key.get()["Body"].read())
|
|
|
|
zerver.lib.upload.ensure_avatar_image(user_profile, medium=True)
|
|
resized_avatar = resize_avatar(image_data, MEDIUM_AVATAR_SIZE)
|
|
key = bucket.Object(medium_file_path)
|
|
self.assertEqual(resized_avatar, key.get()["Body"].read())
|
|
|
|
@use_s3_backend
|
|
def test_delete_avatar_image(self) -> None:
|
|
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
|
|
|
|
self.login("hamlet")
|
|
with get_test_image_file("img.png") as image_file:
|
|
self.client_post("/json/users/me/avatar", {"file": image_file})
|
|
|
|
user = self.example_user("hamlet")
|
|
|
|
avatar_base_path = user_avatar_path(user)
|
|
avatar_path_id = avatar_base_path + ".png"
|
|
avatar_original_image_path_id = avatar_base_path + ".original"
|
|
avatar_medium_path_id = avatar_base_path + "-medium.png"
|
|
|
|
self.assertEqual(user.avatar_source, UserProfile.AVATAR_FROM_USER)
|
|
self.assertIsNotNone(bucket.Object(avatar_path_id))
|
|
self.assertIsNotNone(bucket.Object(avatar_original_image_path_id))
|
|
self.assertIsNotNone(bucket.Object(avatar_medium_path_id))
|
|
|
|
do_delete_avatar_image(user, acting_user=user)
|
|
|
|
self.assertEqual(user.avatar_source, UserProfile.AVATAR_FROM_GRAVATAR)
|
|
|
|
# Confirm that the avatar files no longer exist in S3.
|
|
with self.assertRaises(botocore.exceptions.ClientError):
|
|
bucket.Object(avatar_path_id).load()
|
|
with self.assertRaises(botocore.exceptions.ClientError):
|
|
bucket.Object(avatar_original_image_path_id).load()
|
|
with self.assertRaises(botocore.exceptions.ClientError):
|
|
bucket.Object(avatar_medium_path_id).load()
|
|
|
|
@use_s3_backend
|
|
def test_upload_realm_icon_image(self) -> None:
|
|
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
|
|
|
|
user_profile = self.example_user("hamlet")
|
|
with get_test_image_file("img.png") as image_file:
|
|
zerver.lib.upload.upload_backend.upload_realm_icon_image(image_file, user_profile)
|
|
|
|
original_path_id = os.path.join(str(user_profile.realm.id), "realm", "icon.original")
|
|
original_key = bucket.Object(original_path_id)
|
|
self.assertEqual(read_test_image_file("img.png"), original_key.get()["Body"].read())
|
|
|
|
resized_path_id = os.path.join(str(user_profile.realm.id), "realm", "icon.png")
|
|
resized_data = bucket.Object(resized_path_id).get()["Body"].read()
|
|
resized_image = pyvips.Image.new_from_buffer(resized_data, "")
|
|
self.assertEqual(DEFAULT_AVATAR_SIZE, resized_image.height)
|
|
self.assertEqual(DEFAULT_AVATAR_SIZE, resized_image.width)
|
|
|
|
@use_s3_backend
|
|
def _test_upload_logo_image(self, night: bool, file_name: str) -> None:
|
|
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
|
|
|
|
user_profile = self.example_user("hamlet")
|
|
with get_test_image_file("img.png") as image_file:
|
|
zerver.lib.upload.upload_backend.upload_realm_logo_image(
|
|
image_file, user_profile, night
|
|
)
|
|
|
|
original_path_id = os.path.join(
|
|
str(user_profile.realm.id), "realm", f"{file_name}.original"
|
|
)
|
|
original_key = bucket.Object(original_path_id)
|
|
self.assertEqual(read_test_image_file("img.png"), original_key.get()["Body"].read())
|
|
|
|
resized_path_id = os.path.join(str(user_profile.realm.id), "realm", f"{file_name}.png")
|
|
resized_data = bucket.Object(resized_path_id).get()["Body"].read()
|
|
resized_image = pyvips.Image.new_from_buffer(resized_data, "")
|
|
self.assertEqual(DEFAULT_AVATAR_SIZE, resized_image.height)
|
|
self.assertEqual(DEFAULT_AVATAR_SIZE, resized_image.width)
|
|
|
|
def test_upload_realm_logo_image(self) -> None:
|
|
self._test_upload_logo_image(night=False, file_name="logo")
|
|
self._test_upload_logo_image(night=True, file_name="night_logo")
|
|
|
|
@use_s3_backend
|
|
def test_get_emoji_url(self) -> None:
|
|
emoji_name = "emoji.png"
|
|
realm_id = 1
|
|
bucket = settings.S3_AVATAR_BUCKET
|
|
path = RealmEmoji.PATH_ID_TEMPLATE.format(realm_id=realm_id, emoji_file_name=emoji_name)
|
|
|
|
url = zerver.lib.upload.upload_backend.get_emoji_url("emoji.png", realm_id)
|
|
|
|
expected_url = f"https://{bucket}.s3.amazonaws.com/{path}"
|
|
self.assertEqual(expected_url, url)
|
|
|
|
emoji_name = "animated_image.gif"
|
|
|
|
path = RealmEmoji.PATH_ID_TEMPLATE.format(realm_id=realm_id, emoji_file_name=emoji_name)
|
|
still_path = RealmEmoji.STILL_PATH_ID_TEMPLATE.format(
|
|
realm_id=realm_id, emoji_filename_without_extension=os.path.splitext(emoji_name)[0]
|
|
)
|
|
|
|
url = zerver.lib.upload.upload_backend.get_emoji_url("animated_image.gif", realm_id)
|
|
still_url = zerver.lib.upload.upload_backend.get_emoji_url(
|
|
"animated_image.gif", realm_id, still=True
|
|
)
|
|
|
|
expected_url = f"https://{bucket}.s3.amazonaws.com/{path}"
|
|
self.assertEqual(expected_url, url)
|
|
expected_still_url = f"https://{bucket}.s3.amazonaws.com/{still_path}"
|
|
self.assertEqual(expected_still_url, still_url)
|
|
|
|
@use_s3_backend
|
|
def test_upload_emoji_image(self) -> None:
|
|
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
|
|
|
|
user_profile = self.example_user("hamlet")
|
|
emoji_name = "emoji.png"
|
|
with get_test_image_file("img.png") as image_file:
|
|
zerver.lib.upload.upload_emoji_image(image_file, emoji_name, user_profile)
|
|
|
|
emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
|
|
realm_id=user_profile.realm_id,
|
|
emoji_file_name=emoji_name,
|
|
)
|
|
original_key = bucket.Object(emoji_path + ".original")
|
|
self.assertEqual(read_test_image_file("img.png"), original_key.get()["Body"].read())
|
|
|
|
resized_data = bucket.Object(emoji_path).get()["Body"].read()
|
|
resized_image = pyvips.Image.new_from_buffer(resized_data, "")
|
|
self.assertEqual(DEFAULT_EMOJI_SIZE, resized_image.height)
|
|
self.assertEqual(DEFAULT_EMOJI_SIZE, resized_image.width)
|
|
|
|
@use_s3_backend
|
|
def test_tarball_upload_and_deletion(self) -> None:
|
|
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
|
|
|
|
user_profile = self.example_user("iago")
|
|
self.assertTrue(user_profile.is_realm_admin)
|
|
|
|
tarball_path = os.path.join(settings.TEST_WORKER_DIR, "tarball.tar.gz")
|
|
with open(tarball_path, "w") as f:
|
|
f.write("dummy")
|
|
|
|
total_bytes_transferred = 0
|
|
|
|
def percent_callback(bytes_transferred: int) -> None:
|
|
nonlocal total_bytes_transferred
|
|
total_bytes_transferred += bytes_transferred
|
|
|
|
url = upload_export_tarball(
|
|
user_profile.realm, tarball_path, percent_callback=percent_callback
|
|
)
|
|
# Verify the percent_callback API works
|
|
self.assertEqual(total_bytes_transferred, 5)
|
|
|
|
result = re.search(re.compile(r"([0-9a-fA-F]{32})"), url)
|
|
if result is not None:
|
|
hex_value = result.group(1)
|
|
expected_url = f"https://{bucket.name}.s3.amazonaws.com/exports/{hex_value}/{os.path.basename(tarball_path)}"
|
|
self.assertEqual(url, expected_url)
|
|
|
|
# Delete the tarball.
|
|
with self.assertLogs(level="WARNING") as warn_log:
|
|
self.assertIsNone(delete_export_tarball("/not_a_file"))
|
|
self.assertEqual(
|
|
warn_log.output,
|
|
["WARNING:root:not_a_file does not exist. Its entry in the database will be removed."],
|
|
)
|
|
path_id = urlsplit(url).path
|
|
self.assertEqual(delete_export_tarball(path_id), path_id)
|