Mirror of https://github.com/DumbWareio/DumbDrop.git (synced 2025-10-22 23:31:57 +00:00)

Compare commits: c24e866074 ... 369077676d

3 Commits: 369077676d, 165223f8ed, 1273fe92b1

public/index.html (1125)
File diff suppressed because it is too large
@@ -1,200 +1,152 @@
/**
 * File upload route handlers.
 * Delegates storage operations to the configured storage adapter.
 * Handles multipart uploads via adapter logic.
 * File upload route handlers and batch upload management.
 * Handles file uploads, chunked transfers, and folder creation.
 * Manages upload sessions using persistent metadata for resumability.
 */

const express = require('express');
const router = express.Router();
const path = require('path'); // Still needed for extension checks
const { config } = require('../config');
const logger = require('../utils/logger');
const { storageAdapter } = require('../storage'); // Import the adapter factory's result
const { isDemoMode } = require('../utils/demoMode'); // Keep demo check for specific route behavior if needed
const { isDemoMode } = require('../utils/demoMode');
const { storageAdapter } = require('../storage'); // Import the storage adapter
const { isValidBatchId } = require('../utils/fileUtils');
const crypto = require('crypto'); // Keep crypto for demo mode uploadId

// --- Routes ---

// Initialize upload
router.post('/init', async (req, res) => {
  // Note: Demo mode might bypass storage adapter logic via middleware or adapter factory itself.
  // If specific demo responses are needed here, keep the check.
  // DEMO MODE CHECK - Bypass persistence if in demo mode
  if (isDemoMode()) {
    // Simplified Demo Response (assuming demoAdapter handles non-persistence)
    const { filename = 'demo_file', fileSize = 0 } = req.body;
    const demoUploadId = 'demo-' + Math.random().toString(36).substr(2, 9);
    logger.info(`[DEMO] Init request for ${filename}, size ${fileSize}. Returning ID ${demoUploadId}`);
    if (Number(fileSize) === 0) {
      logger.success(`[DEMO] Simulated completion of zero-byte file: ${filename}`);
      // Potentially call demoAdapter.completeUpload or similar mock logic if needed
    }
    return res.json({ uploadId: demoUploadId });
    const { filename, fileSize } = req.body;
    const uploadId = `demo-${crypto.randomBytes(16).toString('hex')}`;
    logger.info(`[DEMO] Initialized upload for ${filename} (${fileSize} bytes) with ID ${uploadId}`);
    // Simulate zero-byte completion for demo
    if (Number(fileSize) === 0) {
      logger.success(`[DEMO] Completed zero-byte file upload: ${filename}`);
      // sendNotification(filename, 0, config); // In demo, notifications are typically skipped or mocked by demoAdapter
    }
    return res.json({ uploadId });
  }
  const { filename, fileSize } = req.body;
  const clientBatchId = req.headers['x-batch-id']; // Adapter might use this
  const clientBatchId = req.headers['x-batch-id'];

  // --- Basic validations ---
  if (!filename) return res.status(400).json({ error: 'Missing filename' });
  if (fileSize === undefined || fileSize === null) return res.status(400).json({ error: 'Missing fileSize' });
  const size = Number(fileSize);
  if (isNaN(size) || size < 0) return res.status(400).json({ error: 'Invalid file size' });
  const maxSizeInBytes = config.maxFileSize;
  if (size > maxSizeInBytes) return res.status(413).json({ error: 'File too large', limit: maxSizeInBytes });

  // --- Max File Size Check ---
  if (size > config.maxFileSize) {
    logger.warn(`Upload rejected: File size ${size} exceeds limit ${config.maxFileSize}`);
    return res.status(413).json({ error: 'File too large', limit: config.maxFileSize });
  }

  // --- Extension Check ---
  // Perform extension check before handing off to adapter
  if (config.allowedExtensions && config.allowedExtensions.length > 0) {
    const fileExt = path.extname(filename).toLowerCase();
    // Check if the extracted extension (including '.') is in the allowed list
    if (!fileExt || !config.allowedExtensions.includes(fileExt)) {
      logger.warn(`Upload rejected: File type not allowed: ${filename} (Extension: ${fileExt || 'none'})`);
      return res.status(400).json({ error: 'File type not allowed', receivedExtension: fileExt || 'none' });
    }
    logger.debug(`File extension ${fileExt} allowed for ${filename}`);
  // Validate clientBatchId if provided
  if (clientBatchId && !isValidBatchId(clientBatchId)) {
    return res.status(400).json({ error: 'Invalid batch ID format' });
  }
  try {
    // Delegate initialization to the storage adapter
    const result = await storageAdapter.initUpload(filename, size, clientBatchId);

    // Respond with the uploadId generated by the adapter/system
    res.json({ uploadId: result.uploadId });
    const { uploadId } = await storageAdapter.initUpload(filename, size, clientBatchId);
    logger.info(`[Route /init] Storage adapter initialized upload: ${uploadId} for ${filename}`);
    res.json({ uploadId });

  } catch (err) {
    logger.error(`[Route /init] Upload initialization failed: ${err.message}`, err.stack);
    // Map common errors
    let statusCode = 500;
    let clientMessage = 'Failed to initialize upload.';
    if (err.message.includes('Invalid batch ID format')) {
      statusCode = 400;
      clientMessage = err.message;
    } else if (err.name === 'NoSuchBucket' || err.name === 'AccessDenied') { // S3 Specific
      statusCode = 500; // Internal config error
      clientMessage = 'Storage configuration error.';
    } else if (err.code === 'EACCES' || err.code === 'EPERM' || err.message.includes('writable')) { // Local Specific
      statusCode = 500;
      clientMessage = 'Storage permission or access error.';
    logger.error(`[Route /init] Upload initialization failed: ${err.message} ${err.stack}`);
    // Check for specific error types if adapter throws them (e.g., size limit from adapter)
    if (err.message.includes('File too large') || err.status === 413) {
      return res.status(413).json({ error: 'File too large', details: err.message, limit: config.maxFileSize });
    }
    // Add more specific error mapping based on adapter exceptions if needed

    res.status(statusCode).json({ error: clientMessage, details: err.message }); // Include details only for logging/debugging
    if (err.message.includes('File type not allowed') || err.status === 400) {
      return res.status(400).json({ error: 'File type not allowed', details: err.message });
    }
    return res.status(500).json({ error: 'Failed to initialize upload via adapter', details: err.message });
  }
});
// Upload chunk
router.post('/chunk/:uploadId', express.raw({
  limit: config.maxFileSize + (10 * 1024 * 1024), // Allow slightly larger raw body than max file size
  type: 'application/octet-stream'
router.post('/chunk/:uploadId', express.raw({
  limit: config.maxFileSize + (10 * 1024 * 1024), // Generous limit for raw body
  type: 'application/octet-stream'
}), async (req, res) => {

  const { uploadId } = req.params;
  const chunk = req.body;
  const clientBatchId = req.headers['x-batch-id']; // May be useful for logging context

  // ** CRITICAL FOR S3: Get Part Number from client **
  // Client needs to send this, e.g., ?partNumber=1, ?partNumber=2, ...
  const partNumber = parseInt(req.query.partNumber || '1', 10);
  if (isNaN(partNumber) || partNumber < 1) {
    logger.error(`[Route /chunk] Invalid partNumber received: ${req.query.partNumber}`);
    return res.status(400).json({ error: 'Missing or invalid partNumber query parameter (must be >= 1)' });
  }

  // Demo mode handling (simplified)
  if (isDemoMode()) {
    logger.debug(`[DEMO /chunk] Received chunk for ${uploadId}, part ${partNumber}, size ${chunk?.length || 0}`);
    // Simulate progress - more sophisticated logic could go in a demoAdapter
    const demoProgress = Math.min(100, Math.random() * 100);
    const completed = demoProgress > 95; // Simulate completion occasionally
    if (completed) {
      logger.info(`[DEMO /chunk] Simulated completion for ${uploadId}`);
    }
    return res.json({ bytesReceived: 0, progress: demoProgress, completed }); // Approximate response
  }

  if (!chunk || chunk.length === 0) {
    logger.warn(`[Route /chunk] Received empty chunk for uploadId: ${uploadId}, part ${partNumber}`);
    return res.status(400).json({ error: 'Empty chunk received' });
  // DEMO MODE CHECK
  if (isDemoMode()) {
    const { uploadId } = req.params;
    logger.debug(`[DEMO] Received chunk for ${uploadId}`);
    // Fake progress - requires knowing file size which isn't easily available here in demo
    const demoProgress = Math.min(100, Math.random() * 100); // Placeholder
    return res.json({ bytesReceived: 0, progress: demoProgress });
  }

  const { uploadId } = req.params;
  let chunk = req.body;
  const chunkSize = chunk.length;

  if (!chunkSize) return res.status(400).json({ error: 'Empty chunk received' });
  try {
    // Delegate chunk storage to the adapter
    const result = await storageAdapter.storeChunk(uploadId, chunk, partNumber);
    // Delegate to storage adapter
    // The adapter's storeChunk should handle partNumber for S3 internally
    const { bytesReceived, progress, completed } = await storageAdapter.storeChunk(uploadId, chunk);
    logger.debug(`[Route /chunk] Stored chunk for ${uploadId}. Progress: ${progress}%, Completed by adapter: ${completed}`);

    // If the adapter indicates completion after storing this chunk, finalize the upload
    if (result.completed) {
      logger.info(`[Route /chunk] Chunk ${partNumber} for ${uploadId} triggered completion. Finalizing...`);
    if (completed) {
      logger.info(`[Route /chunk] Adapter reported completion for ${uploadId}. Finalizing...`);
      try {
        const completionResult = await storageAdapter.completeUpload(uploadId);
        logger.success(`[Route /chunk] Successfully finalized upload ${uploadId}. Final path/key: ${completionResult.finalPath}`);
        // Send final success response (ensure progress is 100)
        return res.json({ bytesReceived: result.bytesReceived, progress: 100, completed: true });
      } catch (completionError) {
        logger.error(`[Route /chunk] CRITICAL: Failed to finalize completed upload ${uploadId} after storing chunk ${partNumber}: ${completionError.message}`, completionError.stack);
        // What to return to client? The chunk was stored, but completion failed.
        // Return 500, indicating server-side issue during finalization.
        return res.status(500).json({ error: 'Upload chunk received, but failed to finalize.', details: completionError.message });
        const finalizationResult = await storageAdapter.completeUpload(uploadId);
        logger.success(`[Route /chunk] Successfully finalized upload ${uploadId}. Final path/key: ${finalizationResult.finalPath}`);
        // The adapter's completeUpload method is responsible for sending notifications and cleaning its metadata.
      } catch (completeErr) {
        logger.error(`[Route /chunk] CRITICAL: Failed to finalize completed upload ${uploadId} after storing chunk: ${completeErr.message} ${completeErr.stack}`);
        // If completeUpload fails, the client might retry the chunk.
        // The adapter's storeChunk should be idempotent or handle this.
        // We still return the progress of the chunk write to the client.
        // The client will likely retry, or the user will see the upload stall at 100% if this was the last chunk.
        // Consider what to return to client here. For now, return chunk progress but log server error.
        // The 'completed' flag from storeChunk might cause client to stop sending if it thinks it's done.
        // If completeUpload fails, maybe the response to client should indicate not fully complete yet?
        // Let's return the original progress. The client will retry if needed.
        return res.status(500).json({ error: 'Chunk processed but finalization failed on server.', details: completeErr.message, currentProgress: progress });
      }
    } else {
      // Chunk stored, but upload not yet complete, return progress
      res.json({ bytesReceived: result.bytesReceived, progress: result.progress, completed: false });
    }
    res.json({ bytesReceived, progress });
  } catch (err) {
    logger.error(`[Route /chunk] Chunk upload failed for ${uploadId}, part ${partNumber}: ${err.message}`, err.stack);
    // Map common errors
    let statusCode = 500;
    let clientMessage = 'Failed to process chunk.';

    if (err.message.includes('Upload session not found') || err.name === 'NoSuchUpload' || err.code === 'ENOENT') {
      statusCode = 404;
      clientMessage = 'Upload session not found or already completed/aborted.';
    } else if (err.name === 'InvalidPart' || err.name === 'InvalidPartOrder') { // S3 Specific
      statusCode = 400;
      clientMessage = 'Invalid upload chunk sequence or data.';
    } else if (err.name === 'SlowDown') { // S3 Throttling
      statusCode = 429;
      clientMessage = 'Upload rate limit exceeded by storage provider, please try again later.';
    } else if (err.code === 'EACCES' || err.code === 'EPERM') { // Local specific
      statusCode = 500;
      clientMessage = 'Storage permission error while writing chunk.';
    logger.error(`[Route /chunk] Chunk upload failed for ${uploadId}: ${err.message} ${err.stack}`);
    if (err.message.includes('Upload session not found')) {
      return res.status(404).json({ error: 'Upload session not found or already completed', details: err.message });
    }
    // Add more specific error mapping if needed

    res.status(statusCode).json({ error: clientMessage, details: err.message });
    // Don't delete adapter's metadata on generic chunk errors, let client retry or adapter's cleanup handle stale entries.
    res.status(500).json({ error: 'Failed to process chunk via adapter', details: err.message });
  }
});
// Cancel upload
router.post('/cancel/:uploadId', async (req, res) => {
  const { uploadId } = req.params;

  // DEMO MODE CHECK
  if (isDemoMode()) {
    logger.info(`[DEMO /cancel] Request received for ${uploadId}`);
    // Call demoAdapter.abortUpload(uploadId) if it exists?
    return res.json({ message: 'Upload cancelled (Demo)' });
    logger.info(`[DEMO] Upload cancelled: ${req.params.uploadId}`);
    return res.json({ message: 'Upload cancelled (Demo)' });
  }

  const { uploadId } = req.params;
  logger.info(`[Route /cancel] Received cancel request for upload: ${uploadId}`);

  try {
    // Delegate cancellation to the storage adapter
    await storageAdapter.abortUpload(uploadId);
    res.json({ message: 'Upload cancelled successfully or was already inactive.' });
    logger.info(`[Route /cancel] Upload ${uploadId} cancelled via storage adapter.`);
    res.json({ message: 'Upload cancelled or already complete' });
  } catch (err) {
    // Abort errors are often less critical, log them but maybe return success anyway
    logger.error(`[Route /cancel] Error during upload cancellation for ${uploadId}: ${err.message}`, err.stack);
    // Don't necessarily send 500, as the goal is just to stop the upload client-side
    // Maybe just return success but log the server-side issue?
    // Or return 500 if S3 abort fails significantly? Let's return 500 for now.
    res.status(500).json({ error: 'Failed to cancel upload on server.', details: err.message });
    logger.error(`[Route /cancel] Error during upload cancellation for ${uploadId}: ${err.message}`);
    // Adapters should handle "not found" gracefully or throw specific error
    if (err.message.includes('not found')) { // Generic check
      return res.status(404).json({ error: 'Upload not found or already processed', details: err.message });
    }
    res.status(500).json({ error: 'Failed to cancel upload via adapter' });
  }
});

// Export the router, remove previous function exports
module.exports = { router };
module.exports = {
  router
  // Remove internal metadata/batch cleanup exports as they are adapter-specific now or not used by router
};
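Note (not part of the diff): both variants of the /chunk route expect each raw request body to be one slice of the file, with /init called first. A minimal client-side sketch of that flow, assuming the router is mounted under /upload and a 1 MiB request size — both are illustrative assumptions, not taken from the diff. The older route variant would additionally need a ?partNumber=N query parameter on each chunk request.

```js
// Hypothetical client-side driver for the init/chunk routes above (illustrative only).
async function uploadFile(file) {
  const CHUNK_SIZE = 1024 * 1024; // 1 MiB per request (arbitrary example value)

  // 1. Register the upload and get back an uploadId.
  const init = await fetch('/upload/init', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ filename: file.name, fileSize: file.size })
  }).then(r => r.json());

  // 2. Stream the file body slice by slice; the server reports progress/completion.
  for (let offset = 0; offset < file.size; offset += CHUNK_SIZE) {
    const slice = file.slice(offset, offset + CHUNK_SIZE);
    const res = await fetch(`/upload/chunk/${init.uploadId}`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/octet-stream' },
      body: slice
    }).then(r => r.json());
    console.log(`progress: ${res.progress}%`);
  }
}
```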
@@ -25,7 +25,7 @@ async function startServer() {
  // Start the server
  const server = app.listen(config.port, () => {
    logger.info(`Server running at ${config.baseUrl}`);
    logger.info(`Upload directory: ${config.uploadDisplayPath}`);
    logger.info(`Upload directory (for local adapter state/uploads): ${config.uploadDir}`);

    // List directory contents in development
    if (config.nodeEnv === 'development') {
@@ -3,590 +3,393 @@
 * Handles file operations for storing files on AWS S3 or S3-compatible services.
 * Implements the storage interface expected by the application routes.
 * Uses local files in '.metadata' directory to track multipart upload progress.
 * Buffers individual parts for MPU or entire small files before S3 PUT.
 */

const {
  S3Client,
  CreateMultipartUploadCommand,
  UploadPartCommand,
  CompleteMultipartUploadCommand,
  AbortMultipartUploadCommand,
  ListObjectsV2Command,
  GetObjectCommand,
  DeleteObjectCommand,
  PutObjectCommand // For zero-byte files
} = require('@aws-sdk/client-s3');
const { getSignedUrl } = require("@aws-sdk/s3-request-presigner");
const fs = require('fs').promises;
const fsSync = require('fs'); // For synchronous checks
const path = require('path');
const crypto = require('crypto');
const { config } = require('../config');
const logger = require('../utils/logger');
const {
  sanitizePathPreserveDirs,
  isValidBatchId,
  formatFileSize // Keep for potential future use or consistency
} = require('../utils/fileUtils');
const { sendNotification } = require('../services/notifications'); // Needed for completion

// --- Constants ---
const METADATA_DIR = path.join(config.uploadDir, '.metadata'); // Use local dir for metadata state
const UPLOAD_TIMEOUT = 30 * 60 * 1000; // 30 minutes timeout for stale *local* metadata cleanup
// --- S3 Client Initialization ---
let s3Client;
try {
  const s3ClientConfig = {
    region: config.s3Region,
    credentials: {
      accessKeyId: config.s3AccessKeyId,
      secretAccessKey: config.s3SecretAccessKey,
    },
    ...(config.s3EndpointUrl && { endpoint: config.s3EndpointUrl }),
    ...(config.s3ForcePathStyle && { forcePathStyle: true }),
  };

  if (s3ClientConfig.endpoint) {
    logger.info(`[S3 Adapter] Configuring S3 client for endpoint: ${s3ClientConfig.endpoint}`);
  }
  if (s3ClientConfig.forcePathStyle) {
    logger.info(`[S3 Adapter] Configuring S3 client with forcePathStyle: true`);
  }

  s3Client = new S3Client(s3ClientConfig);
  logger.success('[S3 Adapter] S3 Client configured successfully.');

} catch (error) {
  logger.error(`[S3 Adapter] Failed to configure S3 client: ${error.message}`);
  // This is critical, throw an error to prevent the adapter from being used incorrectly
  throw new Error('S3 Client configuration failed. Check S3 environment variables.');
}
// --- Metadata Helper Functions (Adapted for S3, store state locally) ---

async function ensureMetadataDirExists() {
  // Reuse logic from local adapter - S3 adapter still needs local dir for state
  try {
    if (!fsSync.existsSync(METADATA_DIR)) {
      await fs.mkdir(METADATA_DIR, { recursive: true });
      logger.info(`[S3 Adapter] Created local metadata directory: ${METADATA_DIR}`);
    }
    await fs.access(METADATA_DIR, fsSync.constants.W_OK);
  } catch (err) {
    logger.error(`[S3 Adapter] Local metadata directory error (${METADATA_DIR}): ${err.message}`);
    throw new Error(`Failed to access or create local metadata directory for S3 adapter state: ${METADATA_DIR}`);
  }
}

// Read/Write/Delete functions are identical to localAdapter as they manage local state files
async function readUploadMetadata(uploadId) {
  if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
    logger.warn(`[S3 Adapter] Attempted to read metadata with invalid uploadId: ${uploadId}`);
    return null;
  }
  const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
  try {
    const data = await fs.readFile(metaFilePath, 'utf8');
    // Ensure 'parts' is always an array on read
    const metadata = JSON.parse(data);
    metadata.parts = metadata.parts || [];
    return metadata;
  } catch (err) {
    if (err.code === 'ENOENT') { return null; }
    logger.error(`[S3 Adapter] Error reading metadata for ${uploadId}: ${err.message}`);
    throw err;
  }
}

async function writeUploadMetadata(uploadId, metadata) {
  if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
    logger.error(`[S3 Adapter] Attempted to write metadata with invalid uploadId: ${uploadId}`);
    return;
  }
  const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
  metadata.lastActivity = Date.now();
  metadata.parts = metadata.parts || []; // Ensure parts array exists
  try {
    const tempMetaPath = `${metaFilePath}.${crypto.randomBytes(4).toString('hex')}.tmp`;
    await fs.writeFile(tempMetaPath, JSON.stringify(metadata, null, 2));
    await fs.rename(tempMetaPath, metaFilePath);
  } catch (err) {
    logger.error(`[S3 Adapter] Error writing metadata for ${uploadId}: ${err.message}`);
    try { await fs.unlink(tempMetaPath); } catch (unlinkErr) {/* ignore */}
    throw err;
  }
}

async function deleteUploadMetadata(uploadId) {
  if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
    logger.warn(`[S3 Adapter] Attempted to delete metadata with invalid uploadId: ${uploadId}`);
    return;
  }
  const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
  try {
    await fs.unlink(metaFilePath);
    logger.debug(`[S3 Adapter] Deleted metadata file: ${uploadId}.meta`);
  } catch (err) {
    if (err.code !== 'ENOENT') {
      logger.error(`[S3 Adapter] Error deleting metadata file ${uploadId}.meta: ${err.message}`);
    }
  }
}
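Note (not part of the diff): the state these helpers persist is one small JSON document per upload, keyed by the app-level upload ID. A plausible example of its shape, assembled from the fields written elsewhere in this diff; every value below is invented for illustration.

```js
// Illustrative contents of <METADATA_DIR>/<appUploadId>.meta (all values invented).
const exampleMeta = {
  appUploadId: '9f2c4b7e1a0d3c5f9f2c4b7e1a0d3c5f', // internal ID handed back to the client
  s3UploadId: 'example-s3-multipart-upload-id',    // present only for multipart uploads
  s3Key: 'photos/cat.jpg',
  originalFilename: 'photos/cat.jpg',
  fileSize: 10485760,
  bytesReceived: 5242880,
  parts: [{ PartNumber: 1, ETag: '"abc123"' }],
  batchId: '1715600000000-a1b2c3d4',
  createdAt: 1715600000000,
  lastActivity: 1715600012345,
};
```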
// Ensure metadata dir exists on initialization
ensureMetadataDirExists().catch(err => {
  logger.error(`[S3 Adapter] Initialization failed: ${err.message}`);
  process.exit(1); // Exit if we can't manage metadata state
  S3Client, CreateMultipartUploadCommand, UploadPartCommand,
  CompleteMultipartUploadCommand, AbortMultipartUploadCommand, ListObjectsV2Command,
  GetObjectCommand, DeleteObjectCommand, PutObjectCommand
} = require('@aws-sdk/client-s3');
const { getSignedUrl } = require("@aws-sdk/s3-request-presigner");
const fs = require('fs').promises;
const fsSync = require('fs'); // For synchronous checks
const path = require('path');
const crypto = require('crypto');
const { config } = require('../config');
const logger = require('../utils/logger');
const { sanitizePathPreserveDirs, isValidBatchId, formatFileSize } = require('../utils/fileUtils');
const { sendNotification } = require('../services/notifications');

// --- Constants ---
const METADATA_DIR = path.join(config.uploadDir, '.metadata');
const TEMP_CHUNK_DIR = path.join(config.uploadDir, '.temp_chunks'); // For buffering parts or small files
const UPLOAD_TIMEOUT = 30 * 60 * 1000; // 30 min local stale cleanup
const MIN_S3_TOTAL_SIZE_FOR_MULTIPART = 5 * 1024 * 1024; // 5MB
const S3_PART_SIZE = 5 * 1024 * 1024; // Min 5MB for S3 parts (except last)

// --- Helper function to convert stream to string ---
async function streamToString(stream) {
  if (!stream || typeof stream.pipe !== 'function') return Promise.resolve(null);
  return new Promise((resolve, reject) => {
    const chunks = [];
    stream.on('data', (chunk) => chunks.push(chunk));
    stream.on('error', reject);
    stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
  });
}
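Note (not part of the diff): the two 5 MB constants above track S3's own multipart rules, where every part except the last must be at least 5 MiB and a single upload may contain at most 10,000 parts. A quick sketch of the part count those limits imply for this buffering scheme; the helper is illustrative only.

```js
// Rough part count for a given file size under a 5 MiB buffering scheme (illustrative only).
function estimatePartCount(fileSize, partSize = 5 * 1024 * 1024) {
  if (fileSize < partSize) return 1;      // small files go up as a single PutObject
  return Math.ceil(fileSize / partSize);  // e.g. a 12 MiB file => parts of 5, 5 and 2 MiB
}
```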
// --- Interface Implementation ---

/**
 * Initializes an S3 multipart upload session (or direct put for zero-byte).
 * @param {string} filename - Original filename/path from client.
 * @param {number} fileSize - Total size of the file.
 * @param {string} clientBatchId - Optional batch ID from client.
 * @returns {Promise<{uploadId: string}>} Object containing the application's upload ID.
 */
async function initUpload(filename, fileSize, clientBatchId) {
  await ensureMetadataDirExists(); // Re-check before operation

  const size = Number(fileSize);
  const appUploadId = crypto.randomBytes(16).toString('hex'); // Our internal ID

  // --- Path handling and Sanitization for S3 Key ---
  const sanitizedFilename = sanitizePathPreserveDirs(filename);
  // S3 keys should not start with /
  const s3Key = path.normalize(sanitizedFilename)
    .replace(/^(\.\.(\/|\\|$))+/, '')
    .replace(/\\/g, '/')
    .replace(/^\/+/, '');

  logger.info(`[S3 Adapter] Init request for S3 Key: ${s3Key}`);

  // --- Handle Zero-Byte Files ---
  if (size === 0) {
    try {
      const putCommand = new PutObjectCommand({
        Bucket: config.s3BucketName,
        Key: s3Key,
        Body: '', // Empty body
        ContentLength: 0
      });
      await s3Client.send(putCommand);
      logger.success(`[S3 Adapter] Completed zero-byte file upload directly: ${s3Key}`);
      // No metadata needed for zero-byte files as they are completed atomically
      sendNotification(filename, 0, config); // Send notification (use original filename)
      // Return an uploadId that won't conflict or be processable by chunk/complete
      return { uploadId: `zero-byte-${appUploadId}` }; // Or maybe return null/special status?
      // Returning a unique ID might be safer for client state.
    } catch (putErr) {
      logger.error(`[S3 Adapter] Failed to put zero-byte object ${s3Key}: ${putErr.message}`);
      throw putErr; // Let the route handler deal with it
    }
let s3Client;
try {
  s3Client = new S3Client({
    region: config.s3Region,
    credentials: { accessKeyId: config.s3AccessKeyId, secretAccessKey: config.s3SecretAccessKey },
    ...(config.s3EndpointUrl && { endpoint: config.s3EndpointUrl }),
    ...(config.s3ForcePathStyle && { forcePathStyle: true }),
  });
  logger.success('[S3 Adapter] S3 Client configured.');
} catch (error) {
  logger.error(`[S3 Adapter] Failed to configure S3 client: ${error.message}`);
  throw new Error('S3 Client configuration failed.');
}

async function ensureDirExists(dirPath, purpose) {
  try {
    if (!fsSync.existsSync(dirPath)) {
      await fs.mkdir(dirPath, { recursive: true });
      logger.info(`[S3 Adapter] Created local ${purpose} directory: ${dirPath}`);
    }
  }
  // --- Initiate Multipart Upload for Non-Zero Files ---
  try {
    const createCommand = new CreateMultipartUploadCommand({
      Bucket: config.s3BucketName,
      Key: s3Key,
      // TODO: Consider adding ContentType if available/reliable: metadata.contentType
      // TODO: Consider adding Metadata: { 'original-filename': filename } ?
    });

    const response = await s3Client.send(createCommand);
    const s3UploadId = response.UploadId;

    if (!s3UploadId) {
      throw new Error('S3 did not return an UploadId');
    }

    logger.info(`[S3 Adapter] Initiated multipart upload for ${s3Key} (S3 UploadId: ${s3UploadId})`);

    // --- Create and Persist Local Metadata ---
    const batchId = clientBatchId || `${Date.now()}-${crypto.randomBytes(4).toString('hex').substring(0, 9)}`;
    const metadata = {
      appUploadId: appUploadId, // Store our ID
      s3UploadId: s3UploadId,
      s3Key: s3Key,
      originalFilename: filename, // Keep original for notifications etc.
      fileSize: size,
      bytesReceived: 0, // Track approximate bytes locally
      parts: [], // Array to store { PartNumber, ETag }
      batchId,
      createdAt: Date.now(),
      lastActivity: Date.now()
    };

    await writeUploadMetadata(appUploadId, metadata); // Write metadata keyed by our appUploadId

    return { uploadId: appUploadId }; // Return OUR internal upload ID to the client

  } catch (err) {
    logger.error(`[S3 Adapter] Failed to initiate multipart upload for ${s3Key}: ${err.message}`);
    // TODO: Map specific S3 errors (e.g., NoSuchBucket, AccessDenied) to better client messages
    throw err;
  }
    await fs.access(dirPath, fsSync.constants.W_OK);
  } catch (err) {
    logger.error(`[S3 Adapter] Local ${purpose} directory error (${dirPath}): ${err.message}`);
    throw new Error(`Failed to access/create local ${purpose} directory: ${dirPath}`);
  }
/**
 * Uploads a chunk as a part to S3.
 * @param {string} appUploadId - The application's upload ID.
 * @param {Buffer} chunk - The data chunk to store.
 * @param {number} partNumber - The sequential number of this part (starting from 1).
 * @returns {Promise<{bytesReceived: number, progress: number, completed: boolean}>} Upload status.
 */
async function storeChunk(appUploadId, chunk, partNumber) {
  const chunkSize = chunk.length;
  if (!chunkSize) throw new Error('Empty chunk received');
  if (partNumber < 1) throw new Error('PartNumber must be 1 or greater');

  const metadata = await readUploadMetadata(appUploadId);
  if (!metadata || !metadata.s3UploadId) { // Check for s3UploadId presence
    logger.warn(`[S3 Adapter] Metadata or S3 UploadId not found for chunk: ${appUploadId}. Upload might be complete, cancelled, or zero-byte.`);
    throw new Error('Upload session not found or already completed');
  }

  // --- Sanity Check ---
  // S3 handles duplicate part uploads gracefully (last one wins), so less critical than local append.
  // We still track bytesReceived locally for progress approximation.
  if (metadata.bytesReceived >= metadata.fileSize && metadata.fileSize > 0) {
    logger.warn(`[S3 Adapter] Received chunk for already completed upload ${appUploadId}. Ignoring.`);
    // Can't really finalize again easily without full parts list. Indicate completion based on local state.
    const progress = metadata.fileSize > 0 ? 100 : 0;
    return { bytesReceived: metadata.bytesReceived, progress, completed: true };
  }

  try {
    const uploadPartCommand = new UploadPartCommand({
      Bucket: config.s3BucketName,
      Key: metadata.s3Key,
      UploadId: metadata.s3UploadId,
      Body: chunk,
      PartNumber: partNumber,
      ContentLength: chunkSize // Required for UploadPart
    });

    const response = await s3Client.send(uploadPartCommand);
    const etag = response.ETag;

    if (!etag) {
      throw new Error(`S3 did not return an ETag for PartNumber ${partNumber}`);
    }

    // --- Update Local Metadata ---
    // Ensure parts are stored correctly
    metadata.parts = metadata.parts || [];
    metadata.parts.push({ PartNumber: partNumber, ETag: etag });
    // Sort parts just in case uploads happen out of order client-side (though unlikely with current client)
    metadata.parts.sort((a, b) => a.PartNumber - b.PartNumber);

    // Update approximate bytes received
    metadata.bytesReceived = (metadata.bytesReceived || 0) + chunkSize;
    // Cap bytesReceived at fileSize for progress calculation
    metadata.bytesReceived = Math.min(metadata.bytesReceived, metadata.fileSize);

    await writeUploadMetadata(appUploadId, metadata);

    // --- Calculate Progress ---
    const progress = metadata.fileSize === 0 ? 100 :
      Math.min(Math.round((metadata.bytesReceived / metadata.fileSize) * 100), 100);

    logger.debug(`[S3 Adapter] Part ${partNumber} uploaded for ${appUploadId} (ETag: ${etag}). Progress: ~${progress}%`);

    // Check for completion potential based on local byte tracking
    const completed = metadata.bytesReceived >= metadata.fileSize;
    if (completed) {
      logger.info(`[S3 Adapter] Upload ${appUploadId} potentially complete based on bytes received.`);
    }

    return { bytesReceived: metadata.bytesReceived, progress, completed };

  } catch (err) {
    logger.error(`[S3 Adapter] Failed to upload part ${partNumber} for ${appUploadId} (Key: ${metadata.s3Key}): ${err.message}`);
    // TODO: Map specific S3 errors (InvalidPart, SlowDown, etc.)
    throw err;
  }
}
// Metadata functions (read, write, delete) remain largely the same as your last working version.
// Ensure metadata includes: appUploadId, s3Key, originalFilename, fileSize, bytesReceived, batchId,
// createdAt, lastActivity, isMultipartUpload, tempPartPath, s3UploadId (for MPU), parts (for MPU), currentPartBytes (for MPU).

async function readUploadMetadata(uploadId) {
  if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) return null;
  const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
  try {
    const data = await fs.readFile(metaFilePath, 'utf8');
    const metadata = JSON.parse(data);
    metadata.parts = metadata.parts || []; // Ensure parts array exists if MPU
    return metadata;
  } catch (err) {
    if (err.code === 'ENOENT') return null;
    logger.error(`[S3 Adapter] Error reading metadata for ${uploadId}: ${err.message}`);
    throw err;
  }
}

async function writeUploadMetadata(uploadId, metadata) {
  if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) return;
  const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
  metadata.lastActivity = Date.now();
  try {
    const tempMetaPath = `${metaFilePath}.${crypto.randomBytes(4).toString('hex')}.tmp`;
    await fs.writeFile(tempMetaPath, JSON.stringify(metadata, null, 2));
    await fs.rename(tempMetaPath, metaFilePath);
  } catch (err) {
    logger.error(`[S3 Adapter] Error writing metadata for ${uploadId}: ${err.message}`);
    try { await fs.unlink(tempMetaPath); } catch (unlinkErr) {/* ignore */}
    throw err;
  }
}

async function deleteUploadMetadata(uploadId) {
  if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) return;
  const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
  try { await fs.unlink(metaFilePath); }
  catch (err) { if (err.code !== 'ENOENT') logger.error(`[S3 Adapter] Err deleting meta ${uploadId}.meta: ${err.message}`);}
}

Promise.all([
  ensureDirExists(METADATA_DIR, 'metadata'),
  ensureDirExists(TEMP_CHUNK_DIR, 'part/small file buffering')
]).catch(err => { logger.error(`[S3 Adapter] Critical dir ensure error: ${err.message}`); process.exit(1); });
async function initUpload(filename, fileSize, clientBatchId) {
  const size = Number(fileSize);
  const appUploadId = crypto.randomBytes(16).toString('hex');
  const sanitizedFilename = sanitizePathPreserveDirs(filename);
  const s3Key = path.normalize(sanitizedFilename).replace(/^(\.\.(\/|\\|$))+/, '').replace(/\\/g, '/').replace(/^\/+/, '');
  logger.info(`[S3 Adapter] Init: Key: ${s3Key}, Size: ${size}`);

  const batchId = clientBatchId || `${Date.now()}-${crypto.randomBytes(4).toString('hex').substring(0, 9)}`;
  const tempPartPath = path.join(TEMP_CHUNK_DIR, `${appUploadId}.partbuffer`); // Buffer for current part or small file

  const baseMetadata = {
    appUploadId, s3Key, originalFilename: filename, fileSize: size,
    bytesReceived: 0, batchId, createdAt: Date.now(), lastActivity: Date.now(),
    tempPartPath, currentPartBytes: 0, parts: []
  };

  if (size === 0) {
    await s3Client.send(new PutObjectCommand({ Bucket: config.s3BucketName, Key: s3Key, Body: '', ContentLength: 0 }));
    logger.success(`[S3 Adapter] Zero-byte uploaded: ${s3Key}`);
    sendNotification(filename, 0, config);
    return { uploadId: `zero-byte-${appUploadId}` };
  }

  let metadata;
  if (size < MIN_S3_TOTAL_SIZE_FOR_MULTIPART) {
    logger.info(`[S3 Adapter] Small file (<${MIN_S3_TOTAL_SIZE_FOR_MULTIPART}B). Will use single PUT.`);
    metadata = { ...baseMetadata, isMultipartUpload: false };
  } else {
    logger.info(`[S3 Adapter] Large file (>=${MIN_S3_TOTAL_SIZE_FOR_MULTIPART}B). Will use MPU.`);
    const mpuResponse = await s3Client.send(new CreateMultipartUploadCommand({ Bucket: config.s3BucketName, Key: s3Key }));
    if (!mpuResponse.UploadId) throw new Error('S3 did not return UploadId for MPU.');
    metadata = { ...baseMetadata, isMultipartUpload: true, s3UploadId: mpuResponse.UploadId };
    logger.info(`[S3 Adapter] MPU initiated for ${s3Key}, S3UploadId: ${metadata.s3UploadId}`);
  }

  await fs.writeFile(tempPartPath, ''); // Create empty buffer file
  await writeUploadMetadata(appUploadId, metadata);
  logger.info(`[S3 Adapter] Initialized upload ${appUploadId} for ${s3Key}. Temp buffer: ${tempPartPath}. MPU: ${metadata.isMultipartUpload}`);
  return { uploadId: appUploadId };
}
async function _uploadBufferedPart(metadata) {
  const partBuffer = await fs.readFile(metadata.tempPartPath);
  if (partBuffer.length === 0) {
    logger.warn(`[S3 Adapter MPU] Attempted to upload empty part for ${metadata.appUploadId}. Skipping.`);
    return; // Don't upload empty parts
  }

  const partNumber = metadata.parts.length + 1;
  logger.info(`[S3 Adapter MPU] Uploading part ${partNumber} (${partBuffer.length} bytes) for ${metadata.appUploadId} (Key: ${metadata.s3Key})`);

/**
 * Finalizes a completed S3 multipart upload.
 * @param {string} appUploadId - The application's upload ID.
 * @returns {Promise<{filename: string, size: number, finalPath: string}>} Details of the completed file (finalPath is S3 Key).
 */
async function completeUpload(appUploadId) {
  const metadata = await readUploadMetadata(appUploadId);
  if (!metadata || !metadata.s3UploadId || !metadata.parts || metadata.parts.length === 0) {
    logger.warn(`[S3 Adapter] completeUpload called for ${appUploadId}, but metadata, S3 UploadId, or parts list is missing/empty. Assuming already completed or invalid state.`);
    // Check if object exists as a fallback? Risky.
    throw new Error('Upload completion failed: Required metadata or parts list not found');
  }
  const uploadPartCmd = new UploadPartCommand({
    Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId,
    Body: partBuffer, PartNumber: partNumber, ContentLength: partBuffer.length
  });
  const partResponse = await s3Client.send(uploadPartCmd);
  metadata.parts.push({ PartNumber: partNumber, ETag: partResponse.ETag });

  // Basic check if enough bytes were tracked locally (approximate check)
  if (metadata.bytesReceived < metadata.fileSize) {
    logger.warn(`[S3 Adapter] Attempting to complete upload ${appUploadId} but locally tracked bytes (${metadata.bytesReceived}) are less than expected size (${metadata.fileSize}). Proceeding anyway.`);
  }
  try {
    const completeCommand = new CompleteMultipartUploadCommand({
      Bucket: config.s3BucketName,
      Key: metadata.s3Key,
      UploadId: metadata.s3UploadId,
      MultipartUpload: {
        Parts: metadata.parts // Use the collected parts { PartNumber, ETag }
      },
    });

    const response = await s3Client.send(completeCommand);
    // Example response: { ETag: '"..."', Location: '...', Key: '...', Bucket: '...' }

    logger.success(`[S3 Adapter] Finalized multipart upload: ${metadata.s3Key} (ETag: ${response.ETag})`);

    // Clean up local metadata AFTER successful S3 completion
    await deleteUploadMetadata(appUploadId);

    // Send notification
    sendNotification(metadata.originalFilename, metadata.fileSize, config);

    // Return info consistent with local adapter where possible
    return { filename: metadata.originalFilename, size: metadata.fileSize, finalPath: metadata.s3Key };

  } catch (err) {
    logger.error(`[S3 Adapter] Failed to complete multipart upload for ${appUploadId} (Key: ${metadata.s3Key}): ${err.message}`);
    // Specific S3 errors like InvalidPartOrder, EntityTooSmall might occur here.
    // If Complete fails, S3 *might* have already assembled it (rare).
    // Check if the object now exists? If so, maybe delete metadata? Complex recovery.
    // For now, just log the error and throw. The local metadata will persist.
    if (err.Code === 'NoSuchUpload') {
      logger.warn(`[S3 Adapter] CompleteMultipartUpload failed with NoSuchUpload for ${appUploadId}. Assuming already completed or aborted.`);
      await deleteUploadMetadata(appUploadId).catch(()=>{}); // Attempt metadata cleanup
      // Check if final object exists?
      try {
        // Use GetObject or HeadObject to check
        await s3Client.send(new GetObjectCommand({ Bucket: config.s3BucketName, Key: metadata.s3Key }));
        logger.info(`[S3 Adapter] Final object ${metadata.s3Key} exists after NoSuchUpload error. Treating as completed.`);
        return { filename: metadata.originalFilename, size: metadata.fileSize, finalPath: metadata.s3Key };
      } catch (headErr) {
        // Final object doesn't exist either.
        throw new Error('Completion failed: Upload session not found and final object does not exist.');
      }
    }
    throw err;
  }
  // Reset buffer for next part
  await fs.writeFile(metadata.tempPartPath, '');
  metadata.currentPartBytes = 0;
  logger.info(`[S3 Adapter MPU] Part ${partNumber} uploaded for ${metadata.appUploadId}. ETag: ${partResponse.ETag}`);
}
async function storeChunk(appUploadId, chunk) {
  const chunkSize = chunk.length;
  if (!chunkSize) throw new Error('Empty chunk received for storeChunk');

  const metadata = await readUploadMetadata(appUploadId);
  if (!metadata) throw new Error(`Upload session ${appUploadId} not found or already completed.`);

  await fs.appendFile(metadata.tempPartPath, chunk);
  metadata.bytesReceived += chunkSize;
  metadata.currentPartBytes += chunkSize;

  let justUploadedAPart = false;
  if (metadata.isMultipartUpload) {
    // Upload part if buffer is full or it's the last overall chunk for the file
    const isLastChunkOfFile = metadata.bytesReceived >= metadata.fileSize;
    if (metadata.currentPartBytes >= S3_PART_SIZE || (isLastChunkOfFile && metadata.currentPartBytes > 0)) {
      await _uploadBufferedPart(metadata);
      justUploadedAPart = true; // indicates that currentPartBytes was reset
    }

/**
 * Aborts an ongoing S3 multipart upload.
 * @param {string} appUploadId - The application's upload ID.
 * @returns {Promise<void>}
 */
async function abortUpload(appUploadId) {
  const metadata = await readUploadMetadata(appUploadId);
  if (!metadata || !metadata.s3UploadId) {
    logger.warn(`[S3 Adapter] Abort request for non-existent or completed upload: ${appUploadId}`);
    await deleteUploadMetadata(appUploadId); // Clean up local metadata if it exists anyway
    return;
  }

  await writeUploadMetadata(appUploadId, metadata); // Persist bytesReceived, parts array, currentPartBytes

  const progress = metadata.fileSize === 0 ? 100 : Math.min(Math.round((metadata.bytesReceived / metadata.fileSize) * 100), 100);
  const completed = metadata.bytesReceived >= metadata.fileSize;

  logger.debug(`[S3 Adapter] Chunk stored for ${appUploadId}. Total ${metadata.bytesReceived}/${metadata.fileSize} (${progress}%). Part buffered: ${metadata.currentPartBytes}. Part uploaded: ${justUploadedAPart}`);
  if (completed) logger.info(`[S3 Adapter] All data for ${appUploadId} received locally. Ready for S3 finalization.`);

  return { bytesReceived: metadata.bytesReceived, progress, completed };
}
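Note (not part of the diff): the newer storeChunk above appends every request body to the local .partbuffer file and only issues UploadPart once at least S3_PART_SIZE bytes have accumulated (or the file's last chunk arrives). A small illustrative helper for how many UploadPart calls that buffering produces, reusing the 5 MiB figure from the constants:

```js
// Illustrative only: number of UploadPart calls the buffering above would make.
function simulatePartUploads(fileSize, requestSize, partSize = 5 * 1024 * 1024) {
  let buffered = 0, received = 0, parts = 0;
  while (received < fileSize) {
    const n = Math.min(requestSize, fileSize - received);
    received += n;
    buffered += n;
    if (buffered >= partSize || received >= fileSize) { parts += 1; buffered = 0; }
  }
  return parts; // e.g. simulatePartUploads(12 * 1024 * 1024, 1024 * 1024) === 3
}
```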
async function completeUpload(appUploadId) {
  const metadata = await readUploadMetadata(appUploadId);
  if (!metadata) throw new Error(`Cannot complete: Metadata for ${appUploadId} not found.`);

  if (metadata.bytesReceived < metadata.fileSize) {
    logger.error(`[S3 Adapter] FATAL: Attempt to complete ${appUploadId} with ${metadata.bytesReceived}/${metadata.fileSize} bytes.`);
    throw new Error(`Incomplete data for ${appUploadId} cannot be finalized.`);
  }

  try {
    if (metadata.isMultipartUpload) {
      // If there's any data left in the buffer for the last part, upload it
      if (metadata.currentPartBytes > 0) {
        logger.info(`[S3 Adapter MPU] Uploading final remaining buffered part for ${appUploadId}`);
        await _uploadBufferedPart(metadata); // This will also update metadata and save it
        // Re-read metadata to get the final parts list if _uploadBufferedPart wrote it
        const updatedMetadata = await readUploadMetadata(appUploadId);
        if (!updatedMetadata) throw new Error("Metadata disappeared after final part upload.");
        metadata.parts = updatedMetadata.parts;
      }

  try {
    const abortCommand = new AbortMultipartUploadCommand({
      Bucket: config.s3BucketName,
      Key: metadata.s3Key,
      UploadId: metadata.s3UploadId,
    });
    await s3Client.send(abortCommand);
    logger.info(`[S3 Adapter] Aborted multipart upload: ${appUploadId} (Key: ${metadata.s3Key})`);
  } catch (err) {
    if (err.name === 'NoSuchUpload') {
      logger.warn(`[S3 Adapter] Multipart upload ${appUploadId} (Key: ${metadata.s3Key}) not found during abort. Already aborted or completed.`);
    } else {
      logger.error(`[S3 Adapter] Failed to abort multipart upload for ${appUploadId} (Key: ${metadata.s3Key}): ${err.message}`);
      // Don't delete local metadata if abort failed, might be retryable or need manual cleanup
      throw err; // Rethrow S3 error
    }
      if (!metadata.parts || metadata.parts.length === 0 && metadata.fileSize > 0) { // fileSize > 0 check because a 0 byte file could be MPU if MIN_S3_TOTAL_SIZE_FOR_MULTIPART is 0
        throw new Error(`No parts recorded for MPU ${appUploadId} of size ${metadata.fileSize}. Cannot complete.`);
      }

  // Delete local metadata AFTER successful abort or if NoSuchUpload
  await deleteUploadMetadata(appUploadId);
      const completeCmd = new CompleteMultipartUploadCommand({
        Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId,
        MultipartUpload: { Parts: metadata.parts },
      });
      const response = await s3Client.send(completeCmd);
      logger.success(`[S3 Adapter MPU] Finalized: ${metadata.s3Key} (ETag: ${response.ETag})`);
    } else {
      // Single PUT for small files
      const fileBuffer = await fs.readFile(metadata.tempPartPath);
      if (fileBuffer.length !== metadata.fileSize) {
        throw new Error(`Buffered size ${fileBuffer.length} != metadata size ${metadata.fileSize} for ${appUploadId}`);
      }
      logger.info(`[S3 Adapter SinglePut] Uploading ${metadata.s3Key} via single PutObject from buffer.`);
      await s3Client.send(new PutObjectCommand({
        Bucket: config.s3BucketName, Key: metadata.s3Key,
        Body: fileBuffer, ContentLength: metadata.fileSize
      }));
      logger.success(`[S3 Adapter SinglePut] Finalized: ${metadata.s3Key}`);
    }
/**
 * Lists files in the S3 bucket.
 * @returns {Promise<Array<{filename: string, size: number, formattedSize: string, uploadDate: Date}>>} List of files.
 */
async function listFiles() {
  try {
    const command = new ListObjectsV2Command({
      Bucket: config.s3BucketName,
      // Optional: Add Prefix if you want to list within a specific 'folder'
      // Prefix: 'uploads/'
    });
    // TODO: Add pagination handling if expecting >1000 objects
    const response = await s3Client.send(command);

    const files = (response.Contents || [])
      // Optional: Filter out objects that might represent folders if necessary
      // .filter(item => !(item.Key.endsWith('/') && item.Size === 0))
      .map(item => ({
        filename: item.Key, // S3 Key is the filename/path
        size: item.Size,
        formattedSize: formatFileSize(item.Size), // Use utility
        uploadDate: item.LastModified
      }));

    // Sort by date, newest first
    files.sort((a, b) => b.uploadDate.getTime() - a.uploadDate.getTime());

    return files;

  } catch (err) {
    logger.error(`[S3 Adapter] Failed to list objects in bucket ${config.s3BucketName}: ${err.message}`);
    throw err;
  }

    await fs.unlink(metadata.tempPartPath).catch(err => logger.warn(`[S3 Adapter] Failed to delete temp part buffer ${metadata.tempPartPath}: ${err.message}`));
    await deleteUploadMetadata(appUploadId);
    sendNotification(metadata.originalFilename, metadata.fileSize, config);
    return { filename: metadata.originalFilename, size: metadata.fileSize, finalPath: metadata.s3Key };

  } catch (err) {
    logger.error(`[S3 Adapter] Failed to complete S3 upload for ${appUploadId} (Key: ${metadata.s3Key}, MPU: ${metadata.isMultipartUpload}): ${err.message} ${err.name}`);
    if (err && err.$response) {
      const responseBody = err.$response.body ? await streamToString(err.$response.body) : "No body/streamed";
      console.error("[S3 Adapter] Raw Error $response:", {statusCode: err.$response.statusCode, headers: err.$response.headers, body: responseBody});
    } else {
      console.error("[S3 Adapter] Error object (no $response):", JSON.stringify(err, Object.getOwnPropertyNames(err), 2));
    }
/**
 * Generates a presigned URL for downloading an S3 object.
 * @param {string} s3Key - The S3 Key (filename/path) of the object.
 * @returns {Promise<{type: string, value: string}>} Object indicating type ('url') and value (the presigned URL).
 */
async function getDownloadUrlOrStream(s3Key) {
  // Input `s3Key` is assumed to be sanitized by the calling route/logic
  if (!s3Key || s3Key.includes('..') || s3Key.startsWith('/')) {
    logger.error(`[S3 Adapter] Invalid S3 key detected for download: ${s3Key}`);
    throw new Error('Invalid filename');
  }

  try {
    const command = new GetObjectCommand({
      Bucket: config.s3BucketName,
      Key: s3Key,
      // Optional: Override response headers like filename
      // ResponseContentDisposition: `attachment; filename="${path.basename(s3Key)}"`
    });

    // Generate presigned URL (expires in 1 hour by default, adjustable)
    const url = await getSignedUrl(s3Client, command, { expiresIn: 3600 });
    logger.info(`[S3 Adapter] Generated presigned URL for ${s3Key}`);
    return { type: 'url', value: url };

  } catch (err) {
    logger.error(`[S3 Adapter] Failed to generate presigned URL for ${s3Key}: ${err.message}`);
    if (err.name === 'NoSuchKey') {
      throw new Error('File not found in S3');
    }
    throw err; // Re-throw other S3 errors
  }
    if (metadata.isMultipartUpload && metadata.s3UploadId) {
      logger.warn(`[S3 Adapter MPU] Attempting to abort failed MPU ${metadata.s3UploadId}`);
      await s3Client.send(new AbortMultipartUploadCommand({ Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId }))
        .catch(abortErr => logger.error(`[S3 Adapter MPU] Failed to abort MPU after error: ${abortErr.message}`));
    }
/**
 * Deletes an object from the S3 bucket.
 * @param {string} s3Key - The S3 Key (filename/path) of the object to delete.
 * @returns {Promise<void>}
 */
async function deleteFile(s3Key) {
  // Input `s3Key` is assumed to be sanitized
  if (!s3Key || s3Key.includes('..') || s3Key.startsWith('/')) {
    logger.error(`[S3 Adapter] Invalid S3 key detected for delete: ${s3Key}`);
    throw new Error('Invalid filename');
  }

  try {
    const command = new DeleteObjectCommand({
      Bucket: config.s3BucketName,
      Key: s3Key,
    });
    await s3Client.send(command);
    logger.info(`[S3 Adapter] Deleted object: ${s3Key}`);
  } catch (err) {
    // DeleteObject is idempotent, so NoSuchKey isn't typically an error unless you need to know.
    logger.error(`[S3 Adapter] Failed to delete object ${s3Key}: ${err.message}`);
    throw err;
  }
    throw err;
  }
}
async function abortUpload(appUploadId) {
  const metadata = await readUploadMetadata(appUploadId);
  if (!metadata) {
    logger.warn(`[S3 Adapter] Abort: Metadata for ${appUploadId} not found.`);
    await deleteUploadMetadata(appUploadId); // ensure local meta is gone
    const tempPartPath = path.join(TEMP_CHUNK_DIR, `${appUploadId}.partbuffer`); // guess path
    await fs.unlink(tempPartPath).catch(()=>{}); // try delete orphan buffer
    return;
  }

  if (metadata.tempPartPath) {
    await fs.unlink(metadata.tempPartPath)
      .then(() => logger.info(`[S3 Adapter] Deleted temp part buffer on abort: ${metadata.tempPartPath}`))
      .catch(err => { if (err.code !== 'ENOENT') logger.error(`[S3 Adapter] Err deleting temp buffer ${metadata.tempPartPath} on abort: ${err.message}`);});
  }

  if (metadata.isMultipartUpload && metadata.s3UploadId) {
    try {
      await s3Client.send(new AbortMultipartUploadCommand({
        Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId,
      }));
      logger.info(`[S3 Adapter MPU] Aborted S3 MPU: ${metadata.s3UploadId}`);
    } catch (err) {
      if (err.name !== 'NoSuchUpload') { logger.error(`[S3 Adapter MPU] Failed to abort S3 MPU ${metadata.s3UploadId}: ${err.message}`);}
      else {logger.warn(`[S3 Adapter MPU] S3 MPU ${metadata.s3UploadId} not found during abort (already gone).`);}
    }
/**
 * Cleans up stale *local* metadata files for S3 uploads.
 * Relies on S3 Lifecycle Policies for actual S3 cleanup.
 * @returns {Promise<void>}
 */
async function cleanupStale() {
  logger.info('[S3 Adapter] Running cleanup for stale local metadata files...');
  let cleanedCount = 0;
  let checkedCount = 0;

  }
  await deleteUploadMetadata(appUploadId);
}
async function listFiles() {
  try {
    // Note: a single ListObjectsV2 call returns at most 1000 keys; pagination via
    // ContinuationToken is not implemented here.
    const command = new ListObjectsV2Command({ Bucket: config.s3BucketName });
    const response = await s3Client.send(command);
    const files = (response.Contents || [])
      .map(item => ({
        filename: item.Key,
        size: item.Size,
        formattedSize: formatFileSize(item.Size),
        uploadDate: item.LastModified,
      }))
      .sort((a, b) => b.uploadDate.getTime() - a.uploadDate.getTime());
    return files;
  } catch (err) {
    logger.error(`[S3 Adapter] Failed to list S3 objects: ${err.message}`);
    throw err;
  }
}

/**
 * Cleans up stale local metadata files and orphaned temp part buffers for S3 uploads.
 * Aborts the matching S3 multipart upload when one is recorded, but relies on
 * S3 Lifecycle Policies as the backstop for S3-side cleanup.
 * @returns {Promise<void>}
 */
async function cleanupStale() {
  logger.info('[S3 Adapter] Cleaning stale local metadata & temp part buffers...');
  let cleanedMeta = 0, checkedMeta = 0, cleanedBuffers = 0;
  const now = Date.now();

  try { // Stale .meta files
    const metaFiles = await fs.readdir(METADATA_DIR);
    for (const file of metaFiles) {
      if (!file.endsWith('.meta')) continue;
      checkedMeta++;
      const appUploadId = file.replace('.meta', '');
      const metaFilePath = path.join(METADATA_DIR, file);
      try {
        const metadata = JSON.parse(await fs.readFile(metaFilePath, 'utf8'));
        // Check inactivity based on the local metadata timestamp
        if (now - (metadata.lastActivity || metadata.createdAt || 0) > UPLOAD_TIMEOUT) {
          logger.warn(`[S3 Adapter] Stale meta: ${file}. AppUploadId: ${appUploadId}. Cleaning.`);
          if (metadata.tempPartPath) {
            await fs.unlink(metadata.tempPartPath).then(() => cleanedBuffers++).catch(() => {});
          }
          if (metadata.isMultipartUpload && metadata.s3UploadId) {
            await s3Client.send(new AbortMultipartUploadCommand({ Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId }))
              .catch(e => { if (e.name !== 'NoSuchUpload') logger.error(`Stale MPU abort fail: ${e.message}`); });
          }
          await fs.unlink(metaFilePath);
          cleanedMeta++;
        }
      } catch (e) { logger.error(`Error processing stale meta ${file}: ${e.message}. Deleting.`); await fs.unlink(metaFilePath).catch(() => {}); }
    }
  } catch (e) { if (e.code !== 'ENOENT') logger.error(`Meta dir cleanup error: ${e.message}`); }

  try { // Orphaned .partbuffer files
    const bufferFiles = await fs.readdir(TEMP_CHUNK_DIR);
    for (const file of bufferFiles) {
      if (!file.endsWith('.partbuffer')) continue;
      const bufferFilePath = path.join(TEMP_CHUNK_DIR, file);
      const stats = await fs.stat(bufferFilePath);
      if (now - stats.mtime.getTime() > UPLOAD_TIMEOUT) {
        logger.warn(`[S3 Adapter] Stale orphaned buffer: ${file}. Deleting.`);
        await fs.unlink(bufferFilePath);
        cleanedBuffers++;
      }
    }
  } catch (e) { if (e.code !== 'ENOENT') logger.error(`Temp buffer dir cleanup error: ${e.message}`); }

  if (checkedMeta || cleanedMeta || cleanedBuffers) {
    logger.info(`[S3 Adapter] Local cleanup: MetaChecked: ${checkedMeta}, MetaCleaned: ${cleanedMeta}, BuffersCleaned: ${cleanedBuffers}.`);
  }
  logger.warn(`[S3 Adapter] Reminder: Configure S3 Lifecycle Rules on bucket '${config.s3BucketName}' for S3-side MPU cleanup.`);
}

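// The reminder above can be satisfied with a one-time lifecycle rule on the bucket.
// Sketch only, not executed by this adapter; it assumes AWS SDK v3's
// PutBucketLifecycleConfigurationCommand and would live in a separate setup script:
//
//   const { PutBucketLifecycleConfigurationCommand } = require('@aws-sdk/client-s3');
//   await s3Client.send(new PutBucketLifecycleConfigurationCommand({
//     Bucket: config.s3BucketName,
//     LifecycleConfiguration: {
//       Rules: [{
//         ID: 'abort-stale-multipart-uploads',
//         Status: 'Enabled',
//         Filter: { Prefix: '' },                                    // apply to all keys
//         AbortIncompleteMultipartUpload: { DaysAfterInitiation: 7 },
//       }],
//     },
//   }));
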
module.exports = {
  initUpload, storeChunk, completeUpload, abortUpload,
  listFiles, getDownloadUrlOrStream, deleteFile, cleanupStale
};
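
// These eight exports form the adapter interface expected by the storage factory
// (assumed to live at src/storage/index.js); each backend exposes the same set so
// the route handlers can stay storage-agnostic.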