mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	This is some of the code we'd need if we wanted to have Zulip generate avatars for things. Since it is so little useful code, and it's not clear we will need this feature ever, we can remove this code to make the codebase less confusing. It'd be easy to dig this out of history if we ever want it. Fixes #2101.
		
			
				
	
	
		
			574 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			574 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
from __future__ import absolute_import
 | 
						|
from django.conf import settings
 | 
						|
from django.test import TestCase, override_settings
 | 
						|
from unittest import skip
 | 
						|
 | 
						|
from zerver.lib.avatar import avatar_url
 | 
						|
from zerver.lib.bugdown import url_filename
 | 
						|
from zerver.lib.test_helpers import ZulipTestCase
 | 
						|
from zerver.lib.test_runner import slow
 | 
						|
from zerver.lib.upload import sanitize_name, S3UploadBackend, \
 | 
						|
    upload_message_image, delete_message_image, LocalUploadBackend
 | 
						|
import zerver.lib.upload
 | 
						|
from zerver.models import Attachment, Recipient, get_user_profile_by_email, \
 | 
						|
    get_old_unclaimed_attachments, Message, UserProfile
 | 
						|
from zerver.lib.actions import do_delete_old_unclaimed_attachments
 | 
						|
 | 
						|
import ujson
 | 
						|
from six.moves import urllib
 | 
						|
 | 
						|
from boto.s3.connection import S3Connection
 | 
						|
from boto.s3.key import Key
 | 
						|
from six.moves import StringIO as _StringIO
 | 
						|
import mock
 | 
						|
import os
 | 
						|
import shutil
 | 
						|
import re
 | 
						|
import datetime
 | 
						|
import requests
 | 
						|
import base64
 | 
						|
from datetime import timedelta
 | 
						|
from django.utils import timezone
 | 
						|
 | 
						|
from moto import mock_s3
 | 
						|
 | 
						|
TEST_AVATAR_DIR = os.path.join(os.path.dirname(__file__), 'images')
 | 
						|
 | 
						|
from typing import Any, Callable, TypeVar
 | 
						|
 | 
						|
def destroy_uploads():
 | 
						|
    # type: () -> None
 | 
						|
    if os.path.exists(settings.LOCAL_UPLOADS_DIR):
 | 
						|
        shutil.rmtree(settings.LOCAL_UPLOADS_DIR)
 | 
						|
 | 
						|
class StringIO(_StringIO):
 | 
						|
    name = '' # https://github.com/python/typeshed/issues/598
 | 
						|
 | 
						|
class FileUploadTest(ZulipTestCase):
 | 
						|
 | 
						|
    def test_rest_endpoint(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Tests the /api/v1/user_uploads api endpoint. Here a single file is uploaded
 | 
						|
        and downloaded using a username and api_key
 | 
						|
        """
 | 
						|
        fp = StringIO("zulip!")
 | 
						|
        fp.name = "zulip.txt"
 | 
						|
 | 
						|
        # Upload file via API
 | 
						|
        auth_headers = self.api_auth('hamlet@zulip.com')
 | 
						|
        result = self.client_post('/api/v1/user_uploads', {'file': fp}, **auth_headers)
 | 
						|
        json = ujson.loads(result.content)
 | 
						|
        self.assertIn("uri", json)
 | 
						|
        uri = json["uri"]
 | 
						|
        base = '/user_uploads/'
 | 
						|
        self.assertEquals(base, uri[:len(base)])
 | 
						|
 | 
						|
        # Download file via API
 | 
						|
        self.client_post('/accounts/logout/')
 | 
						|
        response = self.client_get(uri, **auth_headers)
 | 
						|
        data = b"".join(response.streaming_content)
 | 
						|
        self.assertEquals(b"zulip!", data)
 | 
						|
 | 
						|
        # Files uploaded through the API should be accesible via the web client
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        response = self.client_get(uri)
 | 
						|
        data = b"".join(response.streaming_content)
 | 
						|
        self.assertEquals(b"zulip!", data)
 | 
						|
 | 
						|
    def test_file_too_big_failure(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Attempting to upload big files should fail.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        fp = StringIO("bah!")
 | 
						|
        fp.name = "a.txt"
 | 
						|
 | 
						|
        # Use MAX_FILE_UPLOAD_SIZE of 0, because the next increment
 | 
						|
        # would be 1MB.
 | 
						|
        with self.settings(MAX_FILE_UPLOAD_SIZE=0):
 | 
						|
            result = self.client_post("/json/upload_file", {'f1': fp})
 | 
						|
        self.assert_json_error(result, 'File Upload is larger than allowed limit')
 | 
						|
 | 
						|
    def test_multiple_upload_failure(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Attempting to upload two files should fail.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        fp = StringIO("bah!")
 | 
						|
        fp.name = "a.txt"
 | 
						|
        fp2 = StringIO("pshaw!")
 | 
						|
        fp2.name = "b.txt"
 | 
						|
 | 
						|
        result = self.client_post("/json/upload_file", {'f1': fp, 'f2': fp2})
 | 
						|
        self.assert_json_error(result, "You may only upload one file at a time")
 | 
						|
 | 
						|
    def test_no_file_upload_failure(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Calling this endpoint with no files should fail.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
 | 
						|
        result = self.client_post("/json/upload_file")
 | 
						|
        self.assert_json_error(result, "You must specify a file to upload")
 | 
						|
 | 
						|
    def test_download_non_existent_file(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        response = self.client_get('/user_uploads/unk/nonexistent_file')
 | 
						|
        self.assertEquals(response.status_code, 404)
 | 
						|
        self.assertIn('File not found', str(response.content))
 | 
						|
 | 
						|
    def test_serve_s3_error_handling(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        use_s3 = lambda: self.settings(LOCAL_UPLOADS_DIR=None)
 | 
						|
        getting_realm_id = lambda realm_id: mock.patch(
 | 
						|
            'zerver.views.upload.get_realm_for_filename',
 | 
						|
            return_value=realm_id
 | 
						|
        )
 | 
						|
 | 
						|
        # nonexistent_file
 | 
						|
        with use_s3(), getting_realm_id(None):
 | 
						|
            response = self.client_get('/user_uploads/unk/nonexistent_file')
 | 
						|
        self.assertEquals(response.status_code, 404)
 | 
						|
        self.assertIn('File not found', str(response.content))
 | 
						|
 | 
						|
        # invalid realm of 999999 (for non-zulip.com)
 | 
						|
        user = get_user_profile_by_email('hamlet@zulip.com')
 | 
						|
        user.realm.domain = 'example.com'
 | 
						|
        user.realm.save()
 | 
						|
 | 
						|
        with use_s3(), getting_realm_id(999999):
 | 
						|
            response = self.client_get('/user_uploads/unk/whatever')
 | 
						|
        self.assertEquals(response.status_code, 403)
 | 
						|
 | 
						|
    # This test will go through the code path for uploading files onto LOCAL storage
 | 
						|
    # when zulip is in DEVELOPMENT mode.
 | 
						|
    def test_file_upload_authed(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        A call to /json/upload_file should return a uri and actually create an
 | 
						|
        entry in the database. This entry will be marked unclaimed till a message
 | 
						|
        refers it.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        fp = StringIO("zulip!")
 | 
						|
        fp.name = "zulip.txt"
 | 
						|
 | 
						|
        result = self.client_post("/json/upload_file", {'file': fp})
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = ujson.loads(result.content)
 | 
						|
        self.assertIn("uri", json)
 | 
						|
        uri = json["uri"]
 | 
						|
        base = '/user_uploads/'
 | 
						|
        self.assertEquals(base, uri[:len(base)])
 | 
						|
 | 
						|
        # In the future, local file requests will follow the same style as S3
 | 
						|
        # requests; they will be first authenthicated and redirected
 | 
						|
        response = self.client_get(uri)
 | 
						|
        data = b"".join(response.streaming_content)
 | 
						|
        self.assertEquals(b"zulip!", data)
 | 
						|
 | 
						|
        # check if DB has attachment marked as unclaimed
 | 
						|
        entry = Attachment.objects.get(file_name='zulip.txt')
 | 
						|
        self.assertEquals(entry.is_claimed(), False)
 | 
						|
 | 
						|
        self.subscribe_to_stream("hamlet@zulip.com", "Denmark")
 | 
						|
        body = "First message ...[zulip.txt](http://localhost:9991" + uri + ")"
 | 
						|
        self.send_message("hamlet@zulip.com", "Denmark", Recipient.STREAM, body, "test")
 | 
						|
        self.assertIn('title="zulip.txt"', self.get_last_message().rendered_content)
 | 
						|
 | 
						|
    def test_delete_old_unclaimed_attachments(self):
 | 
						|
        # type: () -> None
 | 
						|
 | 
						|
        # Upload some files and make them older than a weeek
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        d1 = StringIO("zulip!")
 | 
						|
        d1.name = "dummy_1.txt"
 | 
						|
        result = self.client_post("/json/upload_file", {'file': d1})
 | 
						|
        json = ujson.loads(result.content)
 | 
						|
        uri = json["uri"]
 | 
						|
        d1_path_id = re.sub('/user_uploads/', '', uri)
 | 
						|
 | 
						|
        d2 = StringIO("zulip!")
 | 
						|
        d2.name = "dummy_2.txt"
 | 
						|
        result = self.client_post("/json/upload_file", {'file': d2})
 | 
						|
        json = ujson.loads(result.content)
 | 
						|
        uri = json["uri"]
 | 
						|
        d2_path_id = re.sub('/user_uploads/', '', uri)
 | 
						|
 | 
						|
        two_week_ago = timezone.now() - datetime.timedelta(weeks=2)
 | 
						|
        d1_attachment = Attachment.objects.get(path_id = d1_path_id)
 | 
						|
        d1_attachment.create_time = two_week_ago
 | 
						|
        d1_attachment.save()
 | 
						|
        self.assertEqual(str(d1_attachment), u'<Attachment: dummy_1.txt>')
 | 
						|
        d2_attachment = Attachment.objects.get(path_id = d2_path_id)
 | 
						|
        d2_attachment.create_time = two_week_ago
 | 
						|
        d2_attachment.save()
 | 
						|
 | 
						|
        # Send message refering only dummy_1
 | 
						|
        self.subscribe_to_stream("hamlet@zulip.com", "Denmark")
 | 
						|
        body = "Some files here ...[zulip.txt](http://localhost:9991/user_uploads/" + d1_path_id + ")"
 | 
						|
        self.send_message("hamlet@zulip.com", "Denmark", Recipient.STREAM, body, "test")
 | 
						|
 | 
						|
        # dummy_2 should not exist in database or the uploads folder
 | 
						|
        do_delete_old_unclaimed_attachments(2)
 | 
						|
        self.assertTrue(not Attachment.objects.filter(path_id = d2_path_id).exists())
 | 
						|
        self.assertTrue(not delete_message_image(d2_path_id))
 | 
						|
 | 
						|
    def test_multiple_claim_attachments(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        This test tries to claim the same attachment twice. The messages field in
 | 
						|
        the Attachment model should have both the messages in its entry.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        d1 = StringIO("zulip!")
 | 
						|
        d1.name = "dummy_1.txt"
 | 
						|
        result = self.client_post("/json/upload_file", {'file': d1})
 | 
						|
        json = ujson.loads(result.content)
 | 
						|
        uri = json["uri"]
 | 
						|
        d1_path_id = re.sub('/user_uploads/', '', uri)
 | 
						|
 | 
						|
        self.subscribe_to_stream("hamlet@zulip.com", "Denmark")
 | 
						|
        body = "First message ...[zulip.txt](http://localhost:9991/user_uploads/" + d1_path_id + ")"
 | 
						|
        self.send_message("hamlet@zulip.com", "Denmark", Recipient.STREAM, body, "test")
 | 
						|
        body = "Second message ...[zulip.txt](http://localhost:9991/user_uploads/" + d1_path_id + ")"
 | 
						|
        self.send_message("hamlet@zulip.com", "Denmark", Recipient.STREAM, body, "test")
 | 
						|
 | 
						|
        self.assertEquals(Attachment.objects.get(path_id=d1_path_id).messages.count(), 2)
 | 
						|
 | 
						|
    def test_check_attachment_reference_update(self):
 | 
						|
        # type: () -> None
 | 
						|
        f1 = StringIO("file1")
 | 
						|
        f1.name = "file1.txt"
 | 
						|
        f2 = StringIO("file2")
 | 
						|
        f2.name = "file2.txt"
 | 
						|
        f3 = StringIO("file3")
 | 
						|
        f3.name = "file3.txt"
 | 
						|
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        result = self.client_post("/json/upload_file", {'file': f1})
 | 
						|
        json = ujson.loads(result.content)
 | 
						|
        uri = json["uri"]
 | 
						|
        f1_path_id = re.sub('/user_uploads/', '', uri)
 | 
						|
 | 
						|
        result = self.client_post("/json/upload_file", {'file': f2})
 | 
						|
        json = ujson.loads(result.content)
 | 
						|
        uri = json["uri"]
 | 
						|
        f2_path_id = re.sub('/user_uploads/', '', uri)
 | 
						|
 | 
						|
        self.subscribe_to_stream("hamlet@zulip.com", "test")
 | 
						|
        body = ("[f1.txt](http://localhost:9991/user_uploads/" + f1_path_id + ")"
 | 
						|
               "[f2.txt](http://localhost:9991/user_uploads/" + f2_path_id + ")")
 | 
						|
        msg_id = self.send_message("hamlet@zulip.com", "test", Recipient.STREAM, body, "test")
 | 
						|
 | 
						|
        result = self.client_post("/json/upload_file", {'file': f3})
 | 
						|
        json = ujson.loads(result.content)
 | 
						|
        uri = json["uri"]
 | 
						|
        f3_path_id = re.sub('/user_uploads/', '', uri)
 | 
						|
 | 
						|
        new_body = ("[f3.txt](http://localhost:9991/user_uploads/" + f3_path_id + ")"
 | 
						|
                   "[f2.txt](http://localhost:9991/user_uploads/" + f2_path_id + ")")
 | 
						|
        result = self.client_post("/json/update_message", {
 | 
						|
            'message_id': msg_id,
 | 
						|
            'content': new_body
 | 
						|
        })
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        message = Message.objects.get(id=msg_id)
 | 
						|
        f1_attachment = Attachment.objects.get(path_id=f1_path_id)
 | 
						|
        f2_attachment = Attachment.objects.get(path_id=f2_path_id)
 | 
						|
        f3_attachment = Attachment.objects.get(path_id=f3_path_id)
 | 
						|
 | 
						|
        self.assertTrue(message not in f1_attachment.messages.all())
 | 
						|
        self.assertTrue(message in f2_attachment.messages.all())
 | 
						|
        self.assertTrue(message in f3_attachment.messages.all())
 | 
						|
 | 
						|
        # Delete all the attachments from the message
 | 
						|
        new_body = "(deleted)"
 | 
						|
        result = self.client_post("/json/update_message", {
 | 
						|
            'message_id': msg_id,
 | 
						|
            'content': new_body
 | 
						|
        })
 | 
						|
        self.assert_json_success(result)
 | 
						|
 | 
						|
        message = Message.objects.get(id=msg_id)
 | 
						|
        f1_attachment = Attachment.objects.get(path_id=f1_path_id)
 | 
						|
        f2_attachment = Attachment.objects.get(path_id=f2_path_id)
 | 
						|
        f3_attachment = Attachment.objects.get(path_id=f3_path_id)
 | 
						|
        self.assertTrue(message not in f1_attachment.messages.all())
 | 
						|
        self.assertTrue(message not in f2_attachment.messages.all())
 | 
						|
        self.assertTrue(message not in f3_attachment.messages.all())
 | 
						|
 | 
						|
    def test_file_name(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Unicode filenames should be processed correctly.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        for expected in ["Здравейте.txt", "test"]:
 | 
						|
            fp = StringIO("bah!")
 | 
						|
            fp.name = urllib.parse.quote(expected)
 | 
						|
 | 
						|
            result = self.client_post("/json/upload_file", {'f1': fp})
 | 
						|
            content = ujson.loads(result.content)
 | 
						|
            assert sanitize_name(expected) in content['uri']
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        # type: () -> None
 | 
						|
        destroy_uploads()
 | 
						|
 | 
						|
class AvatarTest(ZulipTestCase):
 | 
						|
 | 
						|
    def test_multiple_upload_failure(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Attempting to upload two files should fail.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        fp1 = open(os.path.join(TEST_AVATAR_DIR, 'img.png'), 'rb')
 | 
						|
        fp2 = open(os.path.join(TEST_AVATAR_DIR, 'img.png'), 'rb')
 | 
						|
 | 
						|
        result = self.client_post("/json/set_avatar", {'f1': fp1, 'f2': fp2})
 | 
						|
        self.assert_json_error(result, "You must upload exactly one avatar.")
 | 
						|
 | 
						|
    def test_no_file_upload_failure(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        Calling this endpoint with no files should fail.
 | 
						|
        """
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
 | 
						|
        result = self.client_post("/json/set_avatar")
 | 
						|
        self.assert_json_error(result, "You must upload exactly one avatar.")
 | 
						|
 | 
						|
    correct_files = [
 | 
						|
        ('img.png', 'png_resized.png'),
 | 
						|
        ('img.jpg', None), # jpeg resizing is platform-dependent
 | 
						|
        ('img.gif', 'gif_resized.png'),
 | 
						|
        ('img.tif', 'tif_resized.png')
 | 
						|
    ]
 | 
						|
    corrupt_files = ['text.txt', 'corrupt.png', 'corrupt.gif']
 | 
						|
 | 
						|
    def test_get_gravatar_avatar(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        cordelia = get_user_profile_by_email('cordelia@zulip.com')
 | 
						|
 | 
						|
        cordelia.avatar_source = UserProfile.AVATAR_FROM_GRAVATAR
 | 
						|
        cordelia.save()
 | 
						|
        with self.settings(ENABLE_GRAVATAR=True):
 | 
						|
            response = self.client_get("/avatar/cordelia@zulip.com?foo=bar")
 | 
						|
            redirect_url = response['Location']
 | 
						|
            self.assertEqual(redirect_url, avatar_url(cordelia) + '&foo=bar')
 | 
						|
 | 
						|
        with self.settings(ENABLE_GRAVATAR=False):
 | 
						|
            response = self.client_get("/avatar/cordelia@zulip.com?foo=bar")
 | 
						|
            redirect_url = response['Location']
 | 
						|
            self.assertTrue(redirect_url.endswith(avatar_url(cordelia) + '&foo=bar'))
 | 
						|
 | 
						|
    def test_get_user_avatar(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        cordelia = get_user_profile_by_email('cordelia@zulip.com')
 | 
						|
 | 
						|
        cordelia.avatar_source = UserProfile.AVATAR_FROM_USER
 | 
						|
        cordelia.save()
 | 
						|
        response = self.client_get("/avatar/cordelia@zulip.com?foo=bar")
 | 
						|
        redirect_url = response['Location']
 | 
						|
        self.assertTrue(redirect_url.endswith(avatar_url(cordelia) + '&foo=bar'))
 | 
						|
 | 
						|
    def test_non_valid_user_avatar(self):
 | 
						|
        # type: () -> None
 | 
						|
 | 
						|
        # It's debatable whether we should generate avatars for non-users,
 | 
						|
        # but this test just validates the current code's behavior.
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
 | 
						|
        response = self.client_get("/avatar/nonexistent_user@zulip.com?foo=bar")
 | 
						|
        redirect_url = response['Location']
 | 
						|
        actual_url = 'https://secure.gravatar.com/avatar/444258b521f152129eb0c162996e572d?d=identicon&foo=bar'
 | 
						|
        self.assertEqual(redirect_url, actual_url)
 | 
						|
 | 
						|
    def test_valid_avatars(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        A call to /json/set_avatar with a valid file should return a url and actually create an avatar.
 | 
						|
        """
 | 
						|
        for fname, rfname in self.correct_files:
 | 
						|
            # TODO: use self.subTest once we're exclusively on python 3 by uncommenting the line below.
 | 
						|
            # with self.subTest(fname=fname):
 | 
						|
            self.login("hamlet@zulip.com")
 | 
						|
            fp = open(os.path.join(TEST_AVATAR_DIR, fname), 'rb')
 | 
						|
 | 
						|
            result = self.client_post("/json/set_avatar", {'file': fp})
 | 
						|
            self.assert_json_success(result)
 | 
						|
            json = ujson.loads(result.content)
 | 
						|
            self.assertIn("avatar_url", json)
 | 
						|
            url = json["avatar_url"]
 | 
						|
            base = '/user_avatars/'
 | 
						|
            self.assertEquals(base, url[:len(base)])
 | 
						|
 | 
						|
            if rfname is not None:
 | 
						|
                rfp = open(os.path.join(TEST_AVATAR_DIR, rfname), 'rb')
 | 
						|
                response = self.client_get(url)
 | 
						|
                data = b"".join(response.streaming_content)
 | 
						|
                self.assertEquals(rfp.read(), data)
 | 
						|
 | 
						|
    def test_invalid_avatars(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        A call to /json/set_avatar with an invalid file should fail.
 | 
						|
        """
 | 
						|
        for fname in self.corrupt_files:
 | 
						|
            # with self.subTest(fname=fname):
 | 
						|
            self.login("hamlet@zulip.com")
 | 
						|
            fp = open(os.path.join(TEST_AVATAR_DIR, fname), 'rb')
 | 
						|
 | 
						|
            result = self.client_post("/json/set_avatar", {'file': fp})
 | 
						|
            self.assert_json_error(result, "Could not decode avatar image; did you upload an image file?")
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        # type: () -> None
 | 
						|
        destroy_uploads()
 | 
						|
 | 
						|
class LocalStorageTest(ZulipTestCase):
 | 
						|
 | 
						|
    def test_file_upload_local(self):
 | 
						|
        # type: () -> None
 | 
						|
        sender_email = "hamlet@zulip.com"
 | 
						|
        user_profile = get_user_profile_by_email(sender_email)
 | 
						|
        uri = upload_message_image(u'dummy.txt', u'text/plain', b'zulip!', user_profile)
 | 
						|
 | 
						|
        base = '/user_uploads/'
 | 
						|
        self.assertEquals(base, uri[:len(base)])
 | 
						|
        path_id = re.sub('/user_uploads/', '', uri)
 | 
						|
        file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id)
 | 
						|
        self.assertTrue(os.path.isfile(file_path))
 | 
						|
 | 
						|
    def test_delete_message_image_local(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        fp = StringIO("zulip!")
 | 
						|
        fp.name = "zulip.txt"
 | 
						|
        result = self.client_post("/json/upload_file", {'file': fp})
 | 
						|
 | 
						|
        json = ujson.loads(result.content)
 | 
						|
        uri = json["uri"]
 | 
						|
        path_id = re.sub('/user_uploads/', '', uri)
 | 
						|
        self.assertTrue(delete_message_image(path_id))
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        # type: () -> None
 | 
						|
        destroy_uploads()
 | 
						|
 | 
						|
FuncT = TypeVar('FuncT', bound=Callable[..., None])
 | 
						|
 | 
						|
def use_s3_backend(method):
 | 
						|
    # type: (FuncT) -> FuncT
 | 
						|
    @mock_s3
 | 
						|
    @override_settings(LOCAL_UPLOADS_DIR=None)
 | 
						|
    def new_method(*args, **kwargs):
 | 
						|
        # type: (*Any, **Any) -> Any
 | 
						|
        zerver.lib.upload.upload_backend = S3UploadBackend()
 | 
						|
        try:
 | 
						|
            return method(*args, **kwargs)
 | 
						|
        finally:
 | 
						|
            zerver.lib.upload.upload_backend = LocalUploadBackend()
 | 
						|
    return new_method
 | 
						|
 | 
						|
class S3Test(ZulipTestCase):
 | 
						|
 | 
						|
    @use_s3_backend
 | 
						|
    def test_file_upload_s3(self):
 | 
						|
        # type: () -> None
 | 
						|
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
 | 
						|
        bucket = conn.create_bucket(settings.S3_AUTH_UPLOADS_BUCKET)
 | 
						|
 | 
						|
        sender_email = "hamlet@zulip.com"
 | 
						|
        user_profile = get_user_profile_by_email(sender_email)
 | 
						|
        uri = upload_message_image(u'dummy.txt', u'text/plain', b'zulip!', user_profile)
 | 
						|
 | 
						|
        base = '/user_uploads/'
 | 
						|
        self.assertEquals(base, uri[:len(base)])
 | 
						|
        path_id = re.sub('/user_uploads/', '', uri)
 | 
						|
        self.assertEquals(b"zulip!", bucket.get_key(path_id).get_contents_as_string())
 | 
						|
 | 
						|
        self.subscribe_to_stream("hamlet@zulip.com", "Denmark")
 | 
						|
        body = "First message ...[zulip.txt](http://localhost:9991" + uri + ")"
 | 
						|
        self.send_message("hamlet@zulip.com", "Denmark", Recipient.STREAM, body, "test")
 | 
						|
        self.assertIn('title="dummy.txt"', self.get_last_message().rendered_content)
 | 
						|
 | 
						|
    @use_s3_backend
 | 
						|
    def test_message_image_delete_s3(self):
 | 
						|
        # type: () -> None
 | 
						|
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
 | 
						|
        conn.create_bucket(settings.S3_AUTH_UPLOADS_BUCKET)
 | 
						|
 | 
						|
        sender_email = "hamlet@zulip.com"
 | 
						|
        user_profile = get_user_profile_by_email(sender_email)
 | 
						|
        uri = upload_message_image(u'dummy.txt', u'text/plain', b'zulip!', user_profile)
 | 
						|
 | 
						|
        path_id = re.sub('/user_uploads/', '', uri)
 | 
						|
        self.assertTrue(delete_message_image(path_id))
 | 
						|
 | 
						|
    @use_s3_backend
 | 
						|
    def test_file_upload_authed(self):
 | 
						|
        # type: () -> None
 | 
						|
        """
 | 
						|
        A call to /json/upload_file should return a uri and actually create an object.
 | 
						|
        """
 | 
						|
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
 | 
						|
        conn.create_bucket(settings.S3_AUTH_UPLOADS_BUCKET)
 | 
						|
 | 
						|
        self.login("hamlet@zulip.com")
 | 
						|
        fp = StringIO("zulip!")
 | 
						|
        fp.name = "zulip.txt"
 | 
						|
 | 
						|
        result = self.client_post("/json/upload_file", {'file': fp})
 | 
						|
        self.assert_json_success(result)
 | 
						|
        json = ujson.loads(result.content)
 | 
						|
        self.assertIn("uri", json)
 | 
						|
        uri = json["uri"]
 | 
						|
        base = '/user_uploads/'
 | 
						|
        self.assertEquals(base, uri[:len(base)])
 | 
						|
 | 
						|
        response = self.client_get(uri)
 | 
						|
        redirect_url = response['Location']
 | 
						|
 | 
						|
        self.assertEquals(b"zulip!", urllib.request.urlopen(redirect_url).read().strip()) # type: ignore # six.moves.urllib.request.urlopen is not defined in typeshed
 | 
						|
 | 
						|
        self.subscribe_to_stream("hamlet@zulip.com", "Denmark")
 | 
						|
        body = "First message ...[zulip.txt](http://localhost:9991" + uri + ")"
 | 
						|
        self.send_message("hamlet@zulip.com", "Denmark", Recipient.STREAM, body, "test")
 | 
						|
        self.assertIn('title="zulip.txt"', self.get_last_message().rendered_content)
 | 
						|
 | 
						|
class UploadTitleTests(TestCase):
 | 
						|
    def test_upload_titles(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.assertEqual(url_filename("http://localhost:9991/user_uploads/1/LUeQZUG5jxkagzVzp1Ox_amr/dummy.txt"), "dummy.txt")
 | 
						|
        self.assertEqual(url_filename("http://localhost:9991/user_uploads/1/94/SzGYe0RFT-tEcOhQ6n-ZblFZ/zulip.txt"), "zulip.txt")
 | 
						|
        self.assertEqual(url_filename("https://zulip.com/user_uploads/4142/LUeQZUG5jxkagzVzp1Ox_amr/pasted_image.png"), "pasted_image.png")
 | 
						|
        self.assertEqual(url_filename("https://zulipchat.com/integrations"), "https://zulipchat.com/integrations")
 | 
						|
        self.assertEqual(url_filename("https://example.com"), "https://example.com")
 | 
						|
 | 
						|
class SanitizeNameTests(TestCase):
 | 
						|
    def test_file_name(self):
 | 
						|
        # type: () -> None
 | 
						|
        self.assertEquals(sanitize_name(u'test.txt'), u'test.txt')
 | 
						|
        self.assertEquals(sanitize_name(u'.hidden'), u'.hidden')
 | 
						|
        self.assertEquals(sanitize_name(u'.hidden.txt'), u'.hidden.txt')
 | 
						|
        self.assertEquals(sanitize_name(u'tarball.tar.gz'), u'tarball.tar.gz')
 | 
						|
        self.assertEquals(sanitize_name(u'.hidden_tarball.tar.gz'), u'.hidden_tarball.tar.gz')
 | 
						|
        self.assertEquals(sanitize_name(u'Testing{}*&*#().ta&&%$##&&r.gz'), u'Testing.tar.gz')
 | 
						|
        self.assertEquals(sanitize_name(u'*testingfile?*.txt'), u'testingfile.txt')
 | 
						|
        self.assertEquals(sanitize_name(u'snowman☃.txt'), u'snowman.txt')
 | 
						|
        self.assertEquals(sanitize_name(u'테스트.txt'), u'테스트.txt')
 | 
						|
        self.assertEquals(sanitize_name(u'~/."\`\?*"u0`000ssh/test.t**{}ar.gz'), u'.u0000sshtest.tar.gz')
 |