mirror of
				https://github.com/zulip/zulip.git
				synced 2025-11-04 05:53:43 +00:00 
			
		
		
		
	upload: Extract function to delete file.
This commit is contained in:
		@@ -274,8 +274,20 @@ def get_realm_for_filename(path: str) -> Optional[int]:
 | 
			
		||||
        return None
 | 
			
		||||
    return get_user_profile_by_id(key.metadata["user_profile_id"]).realm_id
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class S3UploadBackend(ZulipUploadBackend):
 | 
			
		||||
    def delete_file_from_s3(self, path_id: str, bucket_name: str) -> bool:
 | 
			
		||||
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
 | 
			
		||||
        bucket = get_bucket(conn, bucket_name)
 | 
			
		||||
 | 
			
		||||
        # check if file exists
 | 
			
		||||
        key = bucket.get_key(path_id)
 | 
			
		||||
        if key is not None:
 | 
			
		||||
            bucket.delete_key(key)
 | 
			
		||||
            return True
 | 
			
		||||
 | 
			
		||||
        file_name = path_id.split("/")[-1]
 | 
			
		||||
        logging.warning("%s does not exist. Its entry in the database will be removed." % (file_name,))
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    def upload_message_file(self, uploaded_file_name: str, uploaded_file_size: int,
 | 
			
		||||
                            content_type: Optional[str], file_data: bytes,
 | 
			
		||||
@@ -302,18 +314,7 @@ class S3UploadBackend(ZulipUploadBackend):
 | 
			
		||||
        return url
 | 
			
		||||
 | 
			
		||||
    def delete_message_image(self, path_id: str) -> bool:
 | 
			
		||||
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
 | 
			
		||||
        bucket = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET)
 | 
			
		||||
 | 
			
		||||
        # check if file exists
 | 
			
		||||
        key = bucket.get_key(path_id)
 | 
			
		||||
        if key is not None:
 | 
			
		||||
            bucket.delete_key(key)
 | 
			
		||||
            return True
 | 
			
		||||
 | 
			
		||||
        file_name = path_id.split("/")[-1]
 | 
			
		||||
        logging.warning("%s does not exist. Its entry in the database will be removed." % (file_name,))
 | 
			
		||||
        return False
 | 
			
		||||
        return self.delete_file_from_s3(path_id, settings.S3_AUTH_UPLOADS_BUCKET)
 | 
			
		||||
 | 
			
		||||
    def write_avatar_images(self, s3_file_name: str, target_user_profile: UserProfile,
 | 
			
		||||
                            image_data: bytes, content_type: Optional[str]) -> None:
 | 
			
		||||
@@ -477,6 +478,16 @@ def read_local_file(type: str, path: str) -> bytes:
 | 
			
		||||
    with open(file_path, 'rb') as f:
 | 
			
		||||
        return f.read()
 | 
			
		||||
 | 
			
		||||
def delete_local_file(type: str, path: str) -> bool:
 | 
			
		||||
    file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, type, path)
 | 
			
		||||
    if os.path.isfile(file_path):
 | 
			
		||||
        # This removes the file but the empty folders still remain.
 | 
			
		||||
        os.remove(file_path)
 | 
			
		||||
        return True
 | 
			
		||||
    file_name = path.split("/")[-1]
 | 
			
		||||
    logging.warning("%s does not exist. Its entry in the database will be removed." % (file_name,))
 | 
			
		||||
    return False
 | 
			
		||||
 | 
			
		||||
def get_local_file_path(path_id: str) -> Optional[str]:
 | 
			
		||||
    local_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id)
 | 
			
		||||
    if os.path.isfile(local_path):
 | 
			
		||||
@@ -501,15 +512,7 @@ class LocalUploadBackend(ZulipUploadBackend):
 | 
			
		||||
        return '/user_uploads/' + path
 | 
			
		||||
 | 
			
		||||
    def delete_message_image(self, path_id: str) -> bool:
 | 
			
		||||
        file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id)
 | 
			
		||||
        if os.path.isfile(file_path):
 | 
			
		||||
            # This removes the file but the empty folders still remain.
 | 
			
		||||
            os.remove(file_path)
 | 
			
		||||
            return True
 | 
			
		||||
 | 
			
		||||
        file_name = path_id.split("/")[-1]
 | 
			
		||||
        logging.warning("%s does not exist. Its entry in the database will be removed." % (file_name,))
 | 
			
		||||
        return False
 | 
			
		||||
        return delete_local_file('files', path_id)
 | 
			
		||||
 | 
			
		||||
    def write_avatar_images(self, file_path: str, image_data: bytes) -> None:
 | 
			
		||||
        write_local_file('avatars', file_path + '.original', image_data)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user