mirror of
https://github.com/zulip/zulip.git
synced 2025-11-18 21:48:30 +00:00
Now reading API keys from a user is done with the get_api_key wrapper method, rather than directly fetching it from the user object. Also, every place where an action should be done for each API key is now using get_all_api_keys. This method returns for the moment a single-item list, containing the specified user's API key. This commit is the first step towards allowing users have multiple API keys.
323 lines
15 KiB
Python
323 lines
15 KiB
Python
# -*- coding: utf-8 -*-
|
|
from django.conf import settings
|
|
|
|
from zerver.lib.test_classes import ZulipTestCase, UploadSerializeMixin
|
|
from zerver.lib.test_helpers import use_s3_backend, override_settings
|
|
from zerver.lib.users import get_api_key
|
|
|
|
from io import StringIO
|
|
from boto.s3.connection import S3Connection
|
|
import ujson
|
|
import urllib
|
|
import base64
|
|
|
|
class ThumbnailTest(ZulipTestCase):
|
|
|
|
@use_s3_backend
|
|
def test_s3_source_type(self) -> None:
|
|
def get_file_path_urlpart(uri: str, size: str='') -> str:
|
|
base = '/user_uploads/'
|
|
url_in_result = 'smart/filters:no_upscale()/%s/source_type/s3'
|
|
if size:
|
|
url_in_result = '/%s/%s' % (size, url_in_result)
|
|
upload_file_path = uri[len(base):]
|
|
hex_uri = base64.urlsafe_b64encode(upload_file_path.encode()).decode('utf-8')
|
|
return url_in_result % (hex_uri)
|
|
|
|
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
|
|
conn.create_bucket(settings.S3_AUTH_UPLOADS_BUCKET)
|
|
|
|
self.login(self.example_email("hamlet"))
|
|
fp = StringIO("zulip!")
|
|
fp.name = "zulip.jpeg"
|
|
|
|
result = self.client_post("/json/user_uploads", {'file': fp})
|
|
self.assert_json_success(result)
|
|
json = ujson.loads(result.content)
|
|
self.assertIn("uri", json)
|
|
uri = json["uri"]
|
|
base = '/user_uploads/'
|
|
self.assertEqual(base, uri[:len(base)])
|
|
|
|
quoted_uri = urllib.parse.quote(uri[1:], safe='')
|
|
|
|
# Test full size image.
|
|
result = self.client_get("/thumbnail?url=%s&size=full" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = get_file_path_urlpart(uri)
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
# Test thumbnail size.
|
|
result = self.client_get("/thumbnail?url=%s&size=thumbnail" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = get_file_path_urlpart(uri, '0x100')
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
# Tests the /api/v1/thumbnail api endpoint with standard API auth
|
|
self.logout()
|
|
result = self.api_get(
|
|
self.example_email("hamlet"),
|
|
'/thumbnail?url=%s&size=full' %
|
|
(quoted_uri,))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = get_file_path_urlpart(uri)
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
# Test with another user trying to access image using thumbor.
|
|
self.login(self.example_email("iago"))
|
|
result = self.client_get("/thumbnail?url=%s&size=full" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 403, result)
|
|
self.assert_in_response("You are not authorized to view this file.", result)
|
|
|
|
def test_external_source_type(self) -> None:
|
|
def run_test_with_image_url(image_url: str) -> None:
|
|
# Test full size image.
|
|
self.login(self.example_email("hamlet"))
|
|
quoted_url = urllib.parse.quote(image_url, safe='')
|
|
encoded_url = base64.urlsafe_b64encode(image_url.encode()).decode('utf-8')
|
|
result = self.client_get("/thumbnail?url=%s&size=full" % (quoted_url))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = '/smart/filters:no_upscale()/' + encoded_url + '/source_type/external'
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
# Test thumbnail size.
|
|
result = self.client_get("/thumbnail?url=%s&size=thumbnail" % (quoted_url))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = '/0x100/smart/filters:no_upscale()/' + encoded_url + '/source_type/external'
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
# Test api endpoint with standard API authentication.
|
|
self.logout()
|
|
user_profile = self.example_user("hamlet")
|
|
result = self.api_get(user_profile.email,
|
|
"/thumbnail?url=%s&size=thumbnail" % (quoted_url,))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = '/0x100/smart/filters:no_upscale()/' + encoded_url + '/source_type/external'
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
# Test api endpoint with legacy API authentication.
|
|
user_profile = self.example_user("hamlet")
|
|
result = self.client_get("/thumbnail?url=%s&size=thumbnail&api_key=%s" % (
|
|
quoted_url, get_api_key(user_profile)))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = '/0x100/smart/filters:no_upscale()/' + encoded_url + '/source_type/external'
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
# Test a second logged-in user; they should also be able to access it
|
|
user_profile = self.example_user("iago")
|
|
result = self.client_get("/thumbnail?url=%s&size=thumbnail&api_key=%s" % (
|
|
quoted_url, get_api_key(user_profile)))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = '/0x100/smart/filters:no_upscale()/' + encoded_url + '/source_type/external'
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
# Test with another user trying to access image using thumbor.
|
|
# File should be always accessible to user in case of external source
|
|
self.login(self.example_email("iago"))
|
|
result = self.client_get("/thumbnail?url=%s&size=full" % (quoted_url))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = '/smart/filters:no_upscale()/' + encoded_url + '/source_type/external'
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
image_url = 'https://images.foobar.com/12345'
|
|
run_test_with_image_url(image_url)
|
|
|
|
image_url = 'http://images.foobar.com/12345'
|
|
run_test_with_image_url(image_url)
|
|
|
|
def test_local_file_type(self) -> None:
|
|
def get_file_path_urlpart(uri: str, size: str='') -> str:
|
|
base = '/user_uploads/'
|
|
url_in_result = 'smart/filters:no_upscale()/%s/source_type/local_file'
|
|
if size:
|
|
url_in_result = '/%s/%s' % (size, url_in_result)
|
|
upload_file_path = uri[len(base):]
|
|
hex_uri = base64.urlsafe_b64encode(upload_file_path.encode()).decode('utf-8')
|
|
return url_in_result % (hex_uri)
|
|
|
|
self.login(self.example_email("hamlet"))
|
|
fp = StringIO("zulip!")
|
|
fp.name = "zulip.jpeg"
|
|
|
|
result = self.client_post("/json/user_uploads", {'file': fp})
|
|
self.assert_json_success(result)
|
|
json = ujson.loads(result.content)
|
|
self.assertIn("uri", json)
|
|
uri = json["uri"]
|
|
base = '/user_uploads/'
|
|
self.assertEqual(base, uri[:len(base)])
|
|
|
|
# Test full size image.
|
|
# We remove the forward slash infront of the `/user_uploads/` to match
|
|
# bugdown behaviour.
|
|
quoted_uri = urllib.parse.quote(uri[1:], safe='')
|
|
result = self.client_get("/thumbnail?url=%s&size=full" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = get_file_path_urlpart(uri)
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
# Test thumbnail size.
|
|
result = self.client_get("/thumbnail?url=%s&size=thumbnail" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = get_file_path_urlpart(uri, '0x100')
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
# Test with a unicode filename.
|
|
fp = StringIO("zulip!")
|
|
fp.name = "μένει.jpg"
|
|
|
|
result = self.client_post("/json/user_uploads", {'file': fp})
|
|
self.assert_json_success(result)
|
|
json = ujson.loads(result.content)
|
|
self.assertIn("uri", json)
|
|
uri = json["uri"]
|
|
|
|
# We remove the forward slash infront of the `/user_uploads/` to match
|
|
# bugdown behaviour.
|
|
quoted_uri = urllib.parse.quote(uri[1:], safe='')
|
|
result = self.client_get("/thumbnail?url=%s&size=full" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = get_file_path_urlpart(uri)
|
|
self.assertIn(expected_part_url, result.url)
|
|
self.logout()
|
|
|
|
# Tests the /api/v1/thumbnail api endpoint with HTTP basic auth.
|
|
user_profile = self.example_user("hamlet")
|
|
result = self.api_get(
|
|
self.example_email("hamlet"),
|
|
'/thumbnail?url=%s&size=full' %
|
|
(quoted_uri,))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = get_file_path_urlpart(uri)
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
# Tests the /api/v1/thumbnail api endpoint with ?api_key
|
|
# auth.
|
|
user_profile = self.example_user("hamlet")
|
|
result = self.client_get(
|
|
'/thumbnail?url=%s&size=full&api_key=%s' %
|
|
(quoted_uri, get_api_key(user_profile)))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = get_file_path_urlpart(uri)
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
# Test with another user trying to access image using thumbor.
|
|
self.login(self.example_email("iago"))
|
|
result = self.client_get("/thumbnail?url=%s&size=full" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 403, result)
|
|
self.assert_in_response("You are not authorized to view this file.", result)
|
|
|
|
@override_settings(THUMBOR_URL='127.0.0.1:9995')
|
|
def test_with_static_files(self) -> None:
|
|
self.login(self.example_email("hamlet"))
|
|
uri = '/static/images/cute/turtle.png'
|
|
quoted_uri = urllib.parse.quote(uri[1:], safe='')
|
|
result = self.client_get("/thumbnail?url=%s&size=full" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
self.assertEqual(uri, result.url)
|
|
|
|
def test_with_thumbor_disabled(self) -> None:
|
|
self.login(self.example_email("hamlet"))
|
|
fp = StringIO("zulip!")
|
|
fp.name = "zulip.jpeg"
|
|
|
|
result = self.client_post("/json/user_uploads", {'file': fp})
|
|
self.assert_json_success(result)
|
|
json = ujson.loads(result.content)
|
|
self.assertIn("uri", json)
|
|
uri = json["uri"]
|
|
base = '/user_uploads/'
|
|
self.assertEqual(base, uri[:len(base)])
|
|
|
|
quoted_uri = urllib.parse.quote(uri[1:], safe='')
|
|
|
|
with self.settings(THUMBOR_URL=''):
|
|
result = self.client_get("/thumbnail?url=%s&size=full" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
self.assertEqual(uri, result.url)
|
|
|
|
uri = 'https://www.google.com/images/srpr/logo4w.png'
|
|
quoted_uri = urllib.parse.quote(uri, safe='')
|
|
with self.settings(THUMBOR_URL=''):
|
|
result = self.client_get("/thumbnail?url=%s&size=full" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
self.assertEqual(uri, result.url)
|
|
|
|
uri = 'http://www.google.com/images/srpr/logo4w.png'
|
|
quoted_uri = urllib.parse.quote(uri, safe='')
|
|
with self.settings(THUMBOR_URL=''):
|
|
result = self.client_get("/thumbnail?url=%s&size=full" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
base = 'https://external-content.zulipcdn.net/7b6552b60c635e41e8f6daeb36d88afc4eabde79/687474703a2f2f7777772e676f6f676c652e636f6d2f696d616765732f737270722f6c6f676f34772e706e67'
|
|
self.assertEqual(base, result.url)
|
|
|
|
def test_with_different_THUMBOR_URL(self) -> None:
|
|
self.login(self.example_email("hamlet"))
|
|
fp = StringIO("zulip!")
|
|
fp.name = "zulip.jpeg"
|
|
|
|
result = self.client_post("/json/user_uploads", {'file': fp})
|
|
self.assert_json_success(result)
|
|
json = ujson.loads(result.content)
|
|
self.assertIn("uri", json)
|
|
uri = json["uri"]
|
|
base = '/user_uploads/'
|
|
self.assertEqual(base, uri[:len(base)])
|
|
|
|
quoted_uri = urllib.parse.quote(uri[1:], safe='')
|
|
hex_uri = base64.urlsafe_b64encode(uri[len('/user_uploads/'):].encode()).decode('utf-8')
|
|
with self.settings(THUMBOR_URL='http://test-thumborhost.com'):
|
|
result = self.client_get("/thumbnail?url=%s&size=full" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
base = 'http://test-thumborhost.com/'
|
|
self.assertEqual(base, result.url[:len(base)])
|
|
expected_part_url = '/smart/filters:no_upscale()/' + hex_uri + '/source_type/local_file'
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
def test_with_different_sizes(self) -> None:
|
|
def get_file_path_urlpart(uri: str, size: str='') -> str:
|
|
base = '/user_uploads/'
|
|
url_in_result = 'smart/filters:no_upscale()/%s/source_type/local_file'
|
|
if size:
|
|
url_in_result = '/%s/%s' % (size, url_in_result)
|
|
upload_file_path = uri[len(base):]
|
|
hex_uri = base64.urlsafe_b64encode(upload_file_path.encode()).decode('utf-8')
|
|
return url_in_result % (hex_uri)
|
|
|
|
self.login(self.example_email("hamlet"))
|
|
fp = StringIO("zulip!")
|
|
fp.name = "zulip.jpeg"
|
|
|
|
result = self.client_post("/json/user_uploads", {'file': fp})
|
|
self.assert_json_success(result)
|
|
json = ujson.loads(result.content)
|
|
self.assertIn("uri", json)
|
|
uri = json["uri"]
|
|
base = '/user_uploads/'
|
|
self.assertEqual(base, uri[:len(base)])
|
|
|
|
# Test with size supplied as a query parameter.
|
|
# size=thumbnail should return a 0x100 sized image.
|
|
# size=full should return the original resolution image.
|
|
quoted_uri = urllib.parse.quote(uri[1:], safe='')
|
|
result = self.client_get("/thumbnail?url=%s&size=thumbnail" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = get_file_path_urlpart(uri, '0x100')
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
result = self.client_get("/thumbnail?url=%s&size=full" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 302, result)
|
|
expected_part_url = get_file_path_urlpart(uri)
|
|
self.assertIn(expected_part_url, result.url)
|
|
|
|
# Test with size supplied as a query parameter where size is anything
|
|
# else than 'full' or 'thumbnail'. Result should be an error message.
|
|
result = self.client_get("/thumbnail?url=%s&size=480x360" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 403, result)
|
|
self.assert_in_response("Invalid size.", result)
|
|
|
|
# Test with no size param supplied. In this case as well we show an
|
|
# error message.
|
|
result = self.client_get("/thumbnail?url=%s" % (quoted_uri))
|
|
self.assertEqual(result.status_code, 400, "Missing 'size' argument")
|