zerver/lib/upload.py: Fix string types.

This commit is contained in:
Eklavya Sharma
2016-06-29 20:43:28 +05:30
parent f094123fd3
commit b76dc9bf4e
2 changed files with 32 additions and 34 deletions

View File

@@ -4,7 +4,6 @@ from typing import Optional, Tuple, Mapping, Any
from django.utils.translation import ugettext as _ from django.utils.translation import ugettext as _
from django.conf import settings from django.conf import settings
from django.template.defaultfilters import slugify from django.template.defaultfilters import slugify
from django.utils.encoding import force_text
from django.core.files import File from django.core.files import File
from django.http import HttpRequest from django.http import HttpRequest
from jinja2 import Markup as mark_safe from jinja2 import Markup as mark_safe
@@ -12,6 +11,7 @@ import unicodedata
from zerver.lib.avatar import user_avatar_hash from zerver.lib.avatar import user_avatar_hash
from zerver.lib.request import JsonableError from zerver.lib.request import JsonableError
from zerver.lib.str_utils import force_text, force_str, NonBinaryStr
from boto.s3.bucket import Bucket from boto.s3.bucket import Bucket
from boto.s3.key import Key from boto.s3.key import Key
@@ -26,10 +26,9 @@ from six.moves import urllib
import base64 import base64
import os import os
import re import re
import six
from PIL import Image, ImageOps from PIL import Image, ImageOps
from six import binary_type, text_type from six import binary_type, text_type
from six.moves import cStringIO as StringIO import io
import random import random
import logging import logging
@@ -48,8 +47,8 @@ import logging
# "file name" is the original filename provided by the user run # "file name" is the original filename provided by the user run
# through a sanitization function. # through a sanitization function.
def sanitize_name(value): def sanitize_name(raw_value):
# type: (six.text_type) -> str # type: (NonBinaryStr) -> text_type
""" """
Sanitizes a value to be safe to store in a Linux filesystem, in Sanitizes a value to be safe to store in a Linux filesystem, in
S3, and in a URL. So unicode is allowed, but not special S3, and in a URL. So unicode is allowed, but not special
@@ -61,27 +60,27 @@ def sanitize_name(value):
* adding '.' and '_' to the list of allowed characters. * adding '.' and '_' to the list of allowed characters.
* preserving the case of the value. * preserving the case of the value.
""" """
value = force_text(value) value = force_text(raw_value)
value = unicodedata.normalize('NFKC', value) value = unicodedata.normalize('NFKC', value)
value = re.sub('[^\w\s._-]', '', value, flags=re.U).strip() value = re.sub('[^\w\s._-]', '', value, flags=re.U).strip()
return mark_safe(re.sub('[-\s]+', '-', value, flags=re.U)) return mark_safe(re.sub('[-\s]+', '-', value, flags=re.U))
def random_name(bytes=60): def random_name(bytes=60):
# type: (int) -> str # type: (int) -> text_type
return base64.urlsafe_b64encode(os.urandom(bytes)) return base64.urlsafe_b64encode(os.urandom(bytes)).decode('utf-8')
class BadImageError(JsonableError): class BadImageError(JsonableError):
pass pass
def resize_avatar(image_data): def resize_avatar(image_data):
# type: (str) -> str # type: (binary_type) -> binary_type
AVATAR_SIZE = 100 AVATAR_SIZE = 100
try: try:
im = Image.open(StringIO(image_data)) im = Image.open(io.BytesIO(image_data))
im = ImageOps.fit(im, (AVATAR_SIZE, AVATAR_SIZE), Image.ANTIALIAS) im = ImageOps.fit(im, (AVATAR_SIZE, AVATAR_SIZE), Image.ANTIALIAS)
except IOError: except IOError:
raise BadImageError("Could not decode avatar image; did you upload an image file?") raise BadImageError("Could not decode avatar image; did you upload an image file?")
out = StringIO() out = io.BytesIO()
im.save(out, format='png') im.save(out, format='png')
return out.getvalue() return out.getvalue()
@@ -89,7 +88,7 @@ def resize_avatar(image_data):
class ZulipUploadBackend(object): class ZulipUploadBackend(object):
def upload_message_image(self, uploaded_file_name, content_type, file_data, user_profile, target_realm=None): def upload_message_image(self, uploaded_file_name, content_type, file_data, user_profile, target_realm=None):
# type: (str, str, str, UserProfile, Optional[Realm]) -> str # type: (text_type, text_type, binary_type, UserProfile, Optional[Realm]) -> text_type
raise NotImplementedError() raise NotImplementedError()
def upload_avatar_image(self, user_file, user_profile, email): def upload_avatar_image(self, user_file, user_profile, email):
@@ -122,45 +121,43 @@ def upload_image_to_s3(
user_profile, user_profile,
contents, contents,
): ):
# type: (text_type, text_type, text_type, UserProfile, text_type) -> None # type: (NonBinaryStr, text_type, text_type, UserProfile, binary_type) -> None
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY) conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
bucket = get_bucket(conn, bucket_name) bucket = get_bucket(conn, force_str(bucket_name))
key = Key(bucket) key = Key(bucket)
key.key = file_name key.key = force_str(file_name)
key.set_metadata("user_profile_id", str(user_profile.id)) key.set_metadata("user_profile_id", str(user_profile.id))
key.set_metadata("realm_id", str(user_profile.realm.id)) key.set_metadata("realm_id", str(user_profile.realm.id))
if content_type: if content_type:
headers = {'Content-Type': content_type} headers = {'Content-Type': force_str(content_type)}
else: else:
headers = None headers = None
key.set_contents_from_string(contents, headers=headers) key.set_contents_from_string(contents, headers=headers)
def get_file_info(request, user_file): def get_file_info(request, user_file):
# type: (HttpRequest, File) -> Tuple[str, str] # type: (HttpRequest, File) -> Tuple[text_type, text_type]
# `user_file.name` is a unicode whereas it should be an ascii uploaded_file_name = user_file.name
# so convert it into an ascii.
uploaded_file_name = user_file.name.encode('ascii')
content_type = request.GET.get('mimetype') content_type = request.GET.get('mimetype')
if content_type is None: if content_type is None:
content_type = guess_type(uploaded_file_name)[0] content_type = force_text(guess_type(uploaded_file_name)[0])
else: else:
uploaded_file_name = uploaded_file_name + guess_extension(content_type) uploaded_file_name = uploaded_file_name + guess_extension(content_type)
uploaded_file_name = urllib.parse.unquote(uploaded_file_name).decode('utf-8') uploaded_file_name = urllib.parse.unquote(uploaded_file_name)
return uploaded_file_name, content_type return uploaded_file_name, content_type
def get_signed_upload_url(path): def get_signed_upload_url(path):
# type: (str) -> str # type: (text_type) -> text_type
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY) conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
return conn.generate_url(15, 'GET', bucket=settings.S3_AUTH_UPLOADS_BUCKET, key=path) return force_text(conn.generate_url(15, 'GET', bucket=settings.S3_AUTH_UPLOADS_BUCKET, key=force_str(path)))
def get_realm_for_filename(path): def get_realm_for_filename(path):
# type: (str) -> int # type: (text_type) -> Optional[int]
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY) conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
key = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET).get_key(path) key = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET).get_key(path)
if key is None: if key is None:
@@ -170,7 +167,7 @@ def get_realm_for_filename(path):
class S3UploadBackend(ZulipUploadBackend): class S3UploadBackend(ZulipUploadBackend):
def upload_message_image(self, uploaded_file_name, content_type, file_data, user_profile, target_realm=None): def upload_message_image(self, uploaded_file_name, content_type, file_data, user_profile, target_realm=None):
# type: (str, str, str, UserProfile, Optional[Realm]) -> str # type: (text_type, text_type, binary_type, UserProfile, Optional[Realm]) -> text_type
bucket_name = settings.S3_AUTH_UPLOADS_BUCKET bucket_name = settings.S3_AUTH_UPLOADS_BUCKET
s3_file_name = "/".join([ s3_file_name = "/".join([
str(target_realm.id if target_realm is not None else user_profile.realm.id), str(target_realm.id if target_realm is not None else user_profile.realm.id),
@@ -234,7 +231,7 @@ class S3UploadBackend(ZulipUploadBackend):
### Local ### Local
def mkdirs(path): def mkdirs(path):
# type: (str) -> None # type: (text_type) -> None
dirname = os.path.dirname(path) dirname = os.path.dirname(path)
if not os.path.isdir(dirname): if not os.path.isdir(dirname):
os.makedirs(dirname) os.makedirs(dirname)
@@ -247,6 +244,7 @@ def write_local_file(type, path, file_data):
f.write(file_data) f.write(file_data)
def get_local_file_path(path_id): def get_local_file_path(path_id):
# type: (text_type) -> Optional[text_type]
local_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id) local_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id)
if os.path.isfile(local_path): if os.path.isfile(local_path):
return local_path return local_path
@@ -255,7 +253,7 @@ def get_local_file_path(path_id):
class LocalUploadBackend(ZulipUploadBackend): class LocalUploadBackend(ZulipUploadBackend):
def upload_message_image(self, uploaded_file_name, content_type, file_data, user_profile, target_realm=None): def upload_message_image(self, uploaded_file_name, content_type, file_data, user_profile, target_realm=None):
# type: (str, str, str, UserProfile, Optional[Realm]) -> str # type: (text_type, text_type, binary_type, UserProfile, Optional[Realm]) -> text_type
# Split into 256 subdirectories to prevent directories from getting too big # Split into 256 subdirectories to prevent directories from getting too big
path = "/".join([ path = "/".join([
str(user_profile.realm.id), str(user_profile.realm.id),
@@ -305,7 +303,7 @@ def upload_avatar_image(user_file, user_profile, email):
upload_backend.upload_avatar_image(user_file, user_profile, email) upload_backend.upload_avatar_image(user_file, user_profile, email)
def upload_message_image(uploaded_file_name, content_type, file_data, user_profile, target_realm=None): def upload_message_image(uploaded_file_name, content_type, file_data, user_profile, target_realm=None):
# type: (str, str, str, UserProfile, Optional[Realm]) -> str # type: (text_type, text_type, binary_type, UserProfile, Optional[Realm]) -> text_type
return upload_backend.upload_message_image(uploaded_file_name, content_type, file_data, return upload_backend.upload_message_image(uploaded_file_name, content_type, file_data,
user_profile, target_realm=target_realm) user_profile, target_realm=target_realm)
@@ -326,11 +324,11 @@ def claim_attachment(user_profile, path_id, message, is_message_realm_public):
return False return False
def create_attachment(file_name, path_id, user_profile): def create_attachment(file_name, path_id, user_profile):
# type: (str, str, UserProfile) -> bool # type: (text_type, text_type, UserProfile) -> bool
Attachment.objects.create(file_name=file_name, path_id=path_id, owner=user_profile, realm=user_profile.realm) Attachment.objects.create(file_name=file_name, path_id=path_id, owner=user_profile, realm=user_profile.realm)
return True return True
def upload_message_image_from_request(request, user_file, user_profile): def upload_message_image_from_request(request, user_file, user_profile):
# type: (HttpRequest, File, UserProfile) -> str # type: (HttpRequest, File, UserProfile) -> text_type
uploaded_file_name, content_type = get_file_info(request, user_file) uploaded_file_name, content_type = get_file_info(request, user_file)
return upload_message_image(uploaded_file_name, content_type, user_file.read(), user_profile) return upload_message_image(uploaded_file_name, content_type, user_file.read(), user_profile)

View File

@@ -271,7 +271,7 @@ class LocalStorageTest(AuthedTestCase):
# type: () -> None # type: () -> None
sender_email = "hamlet@zulip.com" sender_email = "hamlet@zulip.com"
user_profile = get_user_profile_by_email(sender_email) user_profile = get_user_profile_by_email(sender_email)
uri = upload_message_image('dummy.txt', 'text/plain', 'zulip!', user_profile) uri = upload_message_image(u'dummy.txt', u'text/plain', b'zulip!', user_profile)
base = '/user_uploads/' base = '/user_uploads/'
self.assertEquals(base, uri[:len(base)]) self.assertEquals(base, uri[:len(base)])
@@ -316,7 +316,7 @@ class S3Test(AuthedTestCase):
sender_email = "hamlet@zulip.com" sender_email = "hamlet@zulip.com"
user_profile = get_user_profile_by_email(sender_email) user_profile = get_user_profile_by_email(sender_email)
uri = upload_message_image('dummy.txt', 'text/plain', 'zulip!', user_profile) uri = upload_message_image(u'dummy.txt', u'text/plain', b'zulip!', user_profile)
base = '/user_uploads/' base = '/user_uploads/'
self.assertEquals(base, uri[:len(base)]) self.assertEquals(base, uri[:len(base)])
@@ -336,7 +336,7 @@ class S3Test(AuthedTestCase):
sender_email = "hamlet@zulip.com" sender_email = "hamlet@zulip.com"
user_profile = get_user_profile_by_email(sender_email) user_profile = get_user_profile_by_email(sender_email)
uri = upload_message_image('dummy.txt', 'text/plain', 'zulip!', user_profile) uri = upload_message_image(u'dummy.txt', u'text/plain', b'zulip!', user_profile)
path_id = re.sub('/user_uploads/', '', uri) path_id = re.sub('/user_uploads/', '', uri)
self.assertTrue(delete_message_image(path_id)) self.assertTrue(delete_message_image(path_id))