uploads: Move unauth-signed tokens into view.

This commit is contained in:
Alex Vandiver
2022-12-14 22:00:43 +00:00
committed by Alex Vandiver
parent ed6d62a9e7
commit 58dc1059f3
2 changed files with 29 additions and 33 deletions

View File

@@ -1,16 +1,11 @@
import base64
import binascii
import logging import logging
import os import os
import random import random
import secrets import secrets
import shutil import shutil
from datetime import timedelta
from typing import IO, Any, Callable, Literal, Optional from typing import IO, Any, Callable, Literal, Optional
from django.conf import settings from django.conf import settings
from django.core.signing import BadSignature, TimestampSigner
from django.urls import reverse
from zerver.lib.avatar_hash import user_avatar_path from zerver.lib.avatar_hash import user_avatar_path
from zerver.lib.upload.base import ( from zerver.lib.upload.base import (
@@ -68,28 +63,6 @@ def delete_local_file(type: Literal["avatars", "files"], path: str) -> bool:
return False return False
LOCAL_FILE_ACCESS_TOKEN_SALT = "local_file_"
def generate_unauthed_file_access_url(path_id: str) -> str:
signed_data = TimestampSigner(salt=LOCAL_FILE_ACCESS_TOKEN_SALT).sign(path_id)
token = base64.b16encode(signed_data.encode()).decode()
filename = path_id.split("/")[-1]
return reverse("local_file_unauthed", args=[token, filename])
def get_local_file_path_id_from_token(token: str) -> Optional[str]:
signer = TimestampSigner(salt=LOCAL_FILE_ACCESS_TOKEN_SALT)
try:
signed_data = base64.b16decode(token).decode()
path_id = signer.unsign(signed_data, max_age=timedelta(seconds=60))
except (BadSignature, binascii.Error):
return None
return path_id
class LocalUploadBackend(ZulipUploadBackend): class LocalUploadBackend(ZulipUploadBackend):
def get_public_upload_root_url(self) -> str: def get_public_upload_root_url(self) -> str:
return "/user_avatars/" return "/user_avatars/"

View File

@@ -1,11 +1,15 @@
import base64
import binascii
import os import os
from datetime import timedelta
from mimetypes import guess_type from mimetypes import guess_type
from typing import Union from typing import Optional, Union
from urllib.parse import quote, urlparse from urllib.parse import quote, urlparse
from django.conf import settings from django.conf import settings
from django.contrib.auth.models import AnonymousUser from django.contrib.auth.models import AnonymousUser
from django.core.files.uploadedfile import UploadedFile from django.core.files.uploadedfile import UploadedFile
from django.core.signing import BadSignature, TimestampSigner
from django.http import ( from django.http import (
FileResponse, FileResponse,
HttpRequest, HttpRequest,
@@ -15,6 +19,7 @@ from django.http import (
HttpResponseNotFound, HttpResponseNotFound,
) )
from django.shortcuts import redirect from django.shortcuts import redirect
from django.urls import reverse
from django.utils.cache import patch_cache_control from django.utils.cache import patch_cache_control
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
@@ -27,11 +32,7 @@ from zerver.lib.upload import (
upload_message_image_from_request, upload_message_image_from_request,
) )
from zerver.lib.upload.base import INLINE_MIME_TYPES from zerver.lib.upload.base import INLINE_MIME_TYPES
from zerver.lib.upload.local import ( from zerver.lib.upload.local import assert_is_local_storage_path
assert_is_local_storage_path,
generate_unauthed_file_access_url,
get_local_file_path_id_from_token,
)
from zerver.lib.upload.s3 import get_signed_upload_url from zerver.lib.upload.s3 import get_signed_upload_url
from zerver.models import UserProfile, validate_attachment_request from zerver.models import UserProfile, validate_attachment_request
@@ -175,6 +176,28 @@ def serve_file(
return serve_s3(request, path_id, url_only, download=download) return serve_s3(request, path_id, url_only, download=download)
LOCAL_FILE_ACCESS_TOKEN_SALT = "local_file_"
def generate_unauthed_file_access_url(path_id: str) -> str:
signed_data = TimestampSigner(salt=LOCAL_FILE_ACCESS_TOKEN_SALT).sign(path_id)
token = base64.b16encode(signed_data.encode()).decode()
filename = path_id.split("/")[-1]
return reverse("local_file_unauthed", args=[token, filename])
def get_local_file_path_id_from_token(token: str) -> Optional[str]:
signer = TimestampSigner(salt=LOCAL_FILE_ACCESS_TOKEN_SALT)
try:
signed_data = base64.b16decode(token).decode()
path_id = signer.unsign(signed_data, max_age=timedelta(seconds=60))
except (BadSignature, binascii.Error):
return None
return path_id
def serve_local_file_unauthed(request: HttpRequest, token: str, filename: str) -> HttpResponseBase: def serve_local_file_unauthed(request: HttpRequest, token: str, filename: str) -> HttpResponseBase:
path_id = get_local_file_path_id_from_token(token) path_id = get_local_file_path_id_from_token(token)
if path_id is None: if path_id is None: