3 Commits

Author SHA1 Message Date
greirson
369077676d refactor(storage): Streamline S3 adapter code and improve metadata handling
- Simplified S3 client initialization and improved error handling for configuration failures.
- Refactored metadata management functions to enhance clarity and maintainability.
- Updated upload logic to better handle multipart uploads and buffer management.
- Improved logging for upload progress and error scenarios, ensuring better visibility during operations.

This commit enhances the S3 adapter's code quality and improves the overall upload process by refining metadata management and error handling.
2025-05-06 13:50:44 -07:00
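
For orientation, this is roughly how the adapter interface touched by this commit is driven from a caller's perspective; the initUpload/storeChunk/completeUpload signatures match the route code in the diff below, but the chunking loop itself is only an illustrative sketch:

const { storageAdapter } = require('../storage');

// Illustrative only: push a Buffer to storage in fixed-size chunks.
async function uploadBuffer(filename, buffer, chunkSize = 1024 * 1024) {
  // initUpload returns the application-level upload ID used by all later calls.
  const { uploadId } = await storageAdapter.initUpload(filename, buffer.length, null);
  for (let offset = 0; offset < buffer.length; offset += chunkSize) {
    const chunk = buffer.subarray(offset, offset + chunkSize);
    // storeChunk buffers the data (locally or as an S3 part) and reports progress.
    const { completed } = await storageAdapter.storeChunk(uploadId, chunk);
    if (completed) {
      // completeUpload finalizes the S3 multipart upload (or a single PUT for small files).
      return storageAdapter.completeUpload(uploadId);
    }
  }
  throw new Error('Upload did not complete');
}
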
greirson
165223f8ed feat(upload): Enhance file upload functionality with improved metadata management and user experience
- Updated index.html to accept only image and video file types for uploads, enhancing user guidance.
- Refactored upload.js to improve error handling and validation for batch uploads, ensuring better user feedback.
- Introduced quality-of-life features in the upload process, including an active-upload flag that prevents the page from being closed while uploads are in progress.
- Enhanced S3 adapter to manage upload metadata more effectively, allowing for resumable uploads and better tracking of upload states.

This commit significantly improves the upload experience by refining file type handling, enhancing error management, and ensuring robust metadata support for uploads.
2025-05-06 13:44:26 -07:00
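
The "flag for active uploads" mentioned above is typically implemented as a beforeunload guard; a minimal sketch of the idea (the variable name and wiring are assumptions, since the client-side diff is not shown here):

let activeUploadCount = 0; // increment when an upload starts, decrement when it finishes or fails

window.addEventListener('beforeunload', (event) => {
  if (activeUploadCount > 0) {
    // Asks the browser to show its generic "leave site?" confirmation while uploads are in flight.
    event.preventDefault();
    event.returnValue = '';
  }
});
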
greirson
1273fe92b1 feat(storage): Enhance S3 adapter with unique folder prefix handling
- Added functionality to ensure top-level folder prefixes are unique per batch to prevent collisions during uploads.
- Improved error logging for S3 operations, including detailed context for failures.
- Refactored metadata management functions to streamline the upload process and ensure robust handling of local metadata.
- Updated the S3 client initialization and metadata directory management for better reliability.

This commit enhances the S3 adapter's capability to manage file uploads more effectively, improving user experience and reducing potential conflicts during concurrent uploads.
2025-05-06 10:50:33 -07:00
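
A rough sketch of the "unique top-level folder prefix per batch" idea described above (the helper name and exact naming scheme are assumptions, not code from this commit):

// Illustrative: suffix the first path segment with the batch ID so that two
// concurrent batches uploading a folder with the same name cannot collide.
function uniquePrefixForBatch(relativePath, batchId) {
  const parts = relativePath.split('/').filter(Boolean);
  if (parts.length > 1) {
    parts[0] = `${parts[0]}-${batchId}`; // e.g. "photos/cat.jpg" -> "photos-abc123/cat.jpg"
  }
  return parts.join('/');
}
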
4 changed files with 1226 additions and 1066 deletions

File diff suppressed because it is too large.


@@ -1,200 +1,152 @@
/**
* File upload route handlers.
* Delegates storage operations to the configured storage adapter.
* Handles multipart uploads via adapter logic.
* File upload route handlers and batch upload management.
* Handles file uploads, chunked transfers, and folder creation.
* Manages upload sessions using persistent metadata for resumability.
*/
const express = require('express');
const router = express.Router();
const path = require('path'); // Still needed for extension checks
const { config } = require('../config');
const logger = require('../utils/logger');
const { storageAdapter } = require('../storage'); // Import the adapter factory's result
const { isDemoMode } = require('../utils/demoMode'); // Keep demo check for specific route behavior if needed
const { isDemoMode } = require('../utils/demoMode');
const { storageAdapter } = require('../storage'); // Import the storage adapter
const { isValidBatchId } = require('../utils/fileUtils');
const crypto = require('crypto'); // Keep crypto for demo mode uploadId
// --- Routes ---
// Initialize upload
router.post('/init', async (req, res) => {
// Note: Demo mode might bypass storage adapter logic via middleware or adapter factory itself.
// If specific demo responses are needed here, keep the check.
// DEMO MODE CHECK - Bypass persistence if in demo mode
if (isDemoMode()) {
// Simplified Demo Response (assuming demoAdapter handles non-persistence)
const { filename = 'demo_file', fileSize = 0 } = req.body;
const demoUploadId = 'demo-' + Math.random().toString(36).substr(2, 9);
logger.info(`[DEMO] Init request for ${filename}, size ${fileSize}. Returning ID ${demoUploadId}`);
if (Number(fileSize) === 0) {
logger.success(`[DEMO] Simulated completion of zero-byte file: ${filename}`);
// Potentially call demoAdapter.completeUpload or similar mock logic if needed
}
return res.json({ uploadId: demoUploadId });
const { filename, fileSize } = req.body;
const uploadId = `demo-${crypto.randomBytes(16).toString('hex')}`;
logger.info(`[DEMO] Initialized upload for ${filename} (${fileSize} bytes) with ID ${uploadId}`);
// Simulate zero-byte completion for demo
if (Number(fileSize) === 0) {
logger.success(`[DEMO] Completed zero-byte file upload: ${filename}`);
// sendNotification(filename, 0, config); // In demo, notifications are typically skipped or mocked by demoAdapter
}
return res.json({ uploadId });
}
const { filename, fileSize } = req.body;
const clientBatchId = req.headers['x-batch-id']; // Adapter might use this
const clientBatchId = req.headers['x-batch-id'];
// --- Basic validations ---
if (!filename) return res.status(400).json({ error: 'Missing filename' });
if (fileSize === undefined || fileSize === null) return res.status(400).json({ error: 'Missing fileSize' });
const size = Number(fileSize);
if (isNaN(size) || size < 0) return res.status(400).json({ error: 'Invalid file size' });
const maxSizeInBytes = config.maxFileSize;
if (size > maxSizeInBytes) return res.status(413).json({ error: 'File too large', limit: maxSizeInBytes });
// --- Max File Size Check ---
if (size > config.maxFileSize) {
logger.warn(`Upload rejected: File size ${size} exceeds limit ${config.maxFileSize}`);
return res.status(413).json({ error: 'File too large', limit: config.maxFileSize });
}
// --- Extension Check ---
// Perform extension check before handing off to adapter
if (config.allowedExtensions && config.allowedExtensions.length > 0) {
const fileExt = path.extname(filename).toLowerCase();
// Check if the extracted extension (including '.') is in the allowed list
if (!fileExt || !config.allowedExtensions.includes(fileExt)) {
logger.warn(`Upload rejected: File type not allowed: ${filename} (Extension: ${fileExt || 'none'})`);
return res.status(400).json({ error: 'File type not allowed', receivedExtension: fileExt || 'none' });
}
logger.debug(`File extension ${fileExt} allowed for ${filename}`);
// Validate clientBatchId if provided
if (clientBatchId && !isValidBatchId(clientBatchId)) {
return res.status(400).json({ error: 'Invalid batch ID format' });
}
try {
// Delegate initialization to the storage adapter
const result = await storageAdapter.initUpload(filename, size, clientBatchId);
// Respond with the uploadId generated by the adapter/system
res.json({ uploadId: result.uploadId });
const { uploadId } = await storageAdapter.initUpload(filename, size, clientBatchId);
logger.info(`[Route /init] Storage adapter initialized upload: ${uploadId} for ${filename}`);
res.json({ uploadId });
} catch (err) {
logger.error(`[Route /init] Upload initialization failed: ${err.message}`, err.stack);
// Map common errors
let statusCode = 500;
let clientMessage = 'Failed to initialize upload.';
if (err.message.includes('Invalid batch ID format')) {
statusCode = 400;
clientMessage = err.message;
} else if (err.name === 'NoSuchBucket' || err.name === 'AccessDenied') { // S3 Specific
statusCode = 500; // Internal config error
clientMessage = 'Storage configuration error.';
} else if (err.code === 'EACCES' || err.code === 'EPERM' || err.message.includes('writable')) { // Local Specific
statusCode = 500;
clientMessage = 'Storage permission or access error.';
logger.error(`[Route /init] Upload initialization failed: ${err.message} ${err.stack}`);
// Check for specific error types if adapter throws them (e.g., size limit from adapter)
if (err.message.includes('File too large') || err.status === 413) {
return res.status(413).json({ error: 'File too large', details: err.message, limit: config.maxFileSize });
}
// Add more specific error mapping based on adapter exceptions if needed
res.status(statusCode).json({ error: clientMessage, details: err.message }); // Include details only for logging/debugging
if (err.message.includes('File type not allowed') || err.status === 400) {
return res.status(400).json({ error: 'File type not allowed', details: err.message });
}
return res.status(500).json({ error: 'Failed to initialize upload via adapter', details: err.message });
}
});
// Upload chunk
router.post('/chunk/:uploadId', express.raw({
limit: config.maxFileSize + (10 * 1024 * 1024), // Allow slightly larger raw body than max file size
type: 'application/octet-stream'
router.post('/chunk/:uploadId', express.raw({
limit: config.maxFileSize + (10 * 1024 * 1024), // Generous limit for raw body
type: 'application/octet-stream'
}), async (req, res) => {
const { uploadId } = req.params;
const chunk = req.body;
const clientBatchId = req.headers['x-batch-id']; // May be useful for logging context
// ** CRITICAL FOR S3: Get Part Number from client **
// Client needs to send this, e.g., ?partNumber=1, ?partNumber=2, ...
const partNumber = parseInt(req.query.partNumber || '1', 10);
if (isNaN(partNumber) || partNumber < 1) {
logger.error(`[Route /chunk] Invalid partNumber received: ${req.query.partNumber}`);
return res.status(400).json({ error: 'Missing or invalid partNumber query parameter (must be >= 1)' });
}
// Demo mode handling (simplified)
if (isDemoMode()) {
logger.debug(`[DEMO /chunk] Received chunk for ${uploadId}, part ${partNumber}, size ${chunk?.length || 0}`);
// Simulate progress - more sophisticated logic could go in a demoAdapter
const demoProgress = Math.min(100, Math.random() * 100);
const completed = demoProgress > 95; // Simulate completion occasionally
if (completed) {
logger.info(`[DEMO /chunk] Simulated completion for ${uploadId}`);
}
return res.json({ bytesReceived: 0, progress: demoProgress, completed }); // Approximate response
}
if (!chunk || chunk.length === 0) {
logger.warn(`[Route /chunk] Received empty chunk for uploadId: ${uploadId}, part ${partNumber}`);
return res.status(400).json({ error: 'Empty chunk received' });
// DEMO MODE CHECK
if (isDemoMode()) {
const { uploadId } = req.params;
logger.debug(`[DEMO] Received chunk for ${uploadId}`);
// Fake progress - requires knowing file size which isn't easily available here in demo
const demoProgress = Math.min(100, Math.random() * 100); // Placeholder
return res.json({ bytesReceived: 0, progress: demoProgress });
}
const { uploadId } = req.params;
let chunk = req.body;
const chunkSize = chunk.length;
if (!chunkSize) return res.status(400).json({ error: 'Empty chunk received' });
try {
// Delegate chunk storage to the adapter
const result = await storageAdapter.storeChunk(uploadId, chunk, partNumber);
// Delegate to storage adapter
// The adapter's storeChunk should handle partNumber for S3 internally
const { bytesReceived, progress, completed } = await storageAdapter.storeChunk(uploadId, chunk);
logger.debug(`[Route /chunk] Stored chunk for ${uploadId}. Progress: ${progress}%, Completed by adapter: ${completed}`);
// If the adapter indicates completion after storing this chunk, finalize the upload
if (result.completed) {
logger.info(`[Route /chunk] Chunk ${partNumber} for ${uploadId} triggered completion. Finalizing...`);
if (completed) {
logger.info(`[Route /chunk] Adapter reported completion for ${uploadId}. Finalizing...`);
try {
const completionResult = await storageAdapter.completeUpload(uploadId);
logger.success(`[Route /chunk] Successfully finalized upload ${uploadId}. Final path/key: ${completionResult.finalPath}`);
// Send final success response (ensure progress is 100)
return res.json({ bytesReceived: result.bytesReceived, progress: 100, completed: true });
} catch (completionError) {
logger.error(`[Route /chunk] CRITICAL: Failed to finalize completed upload ${uploadId} after storing chunk ${partNumber}: ${completionError.message}`, completionError.stack);
// What to return to client? The chunk was stored, but completion failed.
// Return 500, indicating server-side issue during finalization.
return res.status(500).json({ error: 'Upload chunk received, but failed to finalize.', details: completionError.message });
const finalizationResult = await storageAdapter.completeUpload(uploadId);
logger.success(`[Route /chunk] Successfully finalized upload ${uploadId}. Final path/key: ${finalizationResult.finalPath}`);
// The adapter's completeUpload method is responsible for sending notifications and cleaning its metadata.
} catch (completeErr) {
logger.error(`[Route /chunk] CRITICAL: Failed to finalize completed upload ${uploadId} after storing chunk: ${completeErr.message} ${completeErr.stack}`);
// If completeUpload fails, the client might retry the chunk.
// The adapter's storeChunk should be idempotent or handle this.
// We still return the progress of the chunk write to the client.
// The client will likely retry, or the user will see the upload stall at 100% if this was the last chunk.
// Consider what to return to client here. For now, return chunk progress but log server error.
// The 'completed' flag from storeChunk might cause client to stop sending if it thinks it's done.
// If completeUpload fails, maybe the response to client should indicate not fully complete yet?
// Let's return the original progress. The client will retry if needed.
return res.status(500).json({ error: 'Chunk processed but finalization failed on server.', details: completeErr.message, currentProgress: progress });
}
} else {
// Chunk stored, but upload not yet complete, return progress
res.json({ bytesReceived: result.bytesReceived, progress: result.progress, completed: false });
}
res.json({ bytesReceived, progress });
} catch (err) {
logger.error(`[Route /chunk] Chunk upload failed for ${uploadId}, part ${partNumber}: ${err.message}`, err.stack);
// Map common errors
let statusCode = 500;
let clientMessage = 'Failed to process chunk.';
if (err.message.includes('Upload session not found') || err.name === 'NoSuchUpload' || err.code === 'ENOENT') {
statusCode = 404;
clientMessage = 'Upload session not found or already completed/aborted.';
} else if (err.name === 'InvalidPart' || err.name === 'InvalidPartOrder') { // S3 Specific
statusCode = 400;
clientMessage = 'Invalid upload chunk sequence or data.';
} else if (err.name === 'SlowDown') { // S3 Throttling
statusCode = 429;
clientMessage = 'Upload rate limit exceeded by storage provider, please try again later.';
} else if (err.code === 'EACCES' || err.code === 'EPERM' ) { // Local specific
statusCode = 500;
clientMessage = 'Storage permission error while writing chunk.';
logger.error(`[Route /chunk] Chunk upload failed for ${uploadId}: ${err.message} ${err.stack}`);
if (err.message.includes('Upload session not found')) {
return res.status(404).json({ error: 'Upload session not found or already completed', details: err.message });
}
// Add more specific error mapping if needed
res.status(statusCode).json({ error: clientMessage, details: err.message });
// Don't delete adapter's metadata on generic chunk errors, let client retry or adapter's cleanup handle stale entries.
res.status(500).json({ error: 'Failed to process chunk via adapter', details: err.message });
}
});
// Cancel upload
router.post('/cancel/:uploadId', async (req, res) => {
const { uploadId } = req.params;
// DEMO MODE CHECK
if (isDemoMode()) {
logger.info(`[DEMO /cancel] Request received for ${uploadId}`);
// Call demoAdapter.abortUpload(uploadId) if it exists?
return res.json({ message: 'Upload cancelled (Demo)' });
logger.info(`[DEMO] Upload cancelled: ${req.params.uploadId}`);
return res.json({ message: 'Upload cancelled (Demo)' });
}
const { uploadId } = req.params;
logger.info(`[Route /cancel] Received cancel request for upload: ${uploadId}`);
try {
// Delegate cancellation to the storage adapter
await storageAdapter.abortUpload(uploadId);
res.json({ message: 'Upload cancelled successfully or was already inactive.' });
logger.info(`[Route /cancel] Upload ${uploadId} cancelled via storage adapter.`);
res.json({ message: 'Upload cancelled or already complete' });
} catch (err) {
// Abort errors are often less critical, log them but maybe return success anyway
logger.error(`[Route /cancel] Error during upload cancellation for ${uploadId}: ${err.message}`, err.stack);
// Don't necessarily send 500, as the goal is just to stop the upload client-side
// Maybe just return success but log the server-side issue?
// Or return 500 if S3 abort fails significantly? Let's return 500 for now.
res.status(500).json({ error: 'Failed to cancel upload on server.', details: err.message });
logger.error(`[Route /cancel] Error during upload cancellation for ${uploadId}: ${err.message}`);
// Adapters should handle "not found" gracefully or throw specific error
if (err.message.includes('not found')) { // Generic check
return res.status(404).json({ error: 'Upload not found or already processed', details: err.message });
}
res.status(500).json({ error: 'Failed to cancel upload via adapter' });
}
});
// Export the router, remove previous function exports
module.exports = { router };
module.exports = {
router
// Remove internal metadata/batch cleanup exports as they are adapter-specific now or not used by router
};


@@ -25,7 +25,7 @@ async function startServer() {
// Start the server
const server = app.listen(config.port, () => {
logger.info(`Server running at ${config.baseUrl}`);
-logger.info(`Upload directory: ${config.uploadDisplayPath}`);
+logger.info(`Upload directory (for local adapter state/uploads): ${config.uploadDir}`);
// List directory contents in development
if (config.nodeEnv === 'development') {


@@ -3,590 +3,393 @@
* Handles file operations for storing files on AWS S3 or S3-compatible services.
* Implements the storage interface expected by the application routes.
* Uses local files in '.metadata' directory to track multipart upload progress.
* Buffers individual parts for MPU or entire small files before S3 PUT.
*/
const {
S3Client,
CreateMultipartUploadCommand,
UploadPartCommand,
CompleteMultipartUploadCommand,
AbortMultipartUploadCommand,
ListObjectsV2Command,
GetObjectCommand,
DeleteObjectCommand,
PutObjectCommand // For zero-byte files
} = require('@aws-sdk/client-s3');
const { getSignedUrl } = require("@aws-sdk/s3-request-presigner");
const fs = require('fs').promises;
const fsSync = require('fs'); // For synchronous checks
const path = require('path');
const crypto = require('crypto');
const { config } = require('../config');
const logger = require('../utils/logger');
const {
sanitizePathPreserveDirs,
isValidBatchId,
formatFileSize // Keep for potential future use or consistency
} = require('../utils/fileUtils');
const { sendNotification } = require('../services/notifications'); // Needed for completion
// --- Constants ---
const METADATA_DIR = path.join(config.uploadDir, '.metadata'); // Use local dir for metadata state
const UPLOAD_TIMEOUT = 30 * 60 * 1000; // 30 minutes timeout for stale *local* metadata cleanup
// --- S3 Client Initialization ---
let s3Client;
try {
const s3ClientConfig = {
region: config.s3Region,
credentials: {
accessKeyId: config.s3AccessKeyId,
secretAccessKey: config.s3SecretAccessKey,
},
...(config.s3EndpointUrl && { endpoint: config.s3EndpointUrl }),
...(config.s3ForcePathStyle && { forcePathStyle: true }),
};
if (s3ClientConfig.endpoint) {
logger.info(`[S3 Adapter] Configuring S3 client for endpoint: ${s3ClientConfig.endpoint}`);
}
if (s3ClientConfig.forcePathStyle) {
logger.info(`[S3 Adapter] Configuring S3 client with forcePathStyle: true`);
}
s3Client = new S3Client(s3ClientConfig);
logger.success('[S3 Adapter] S3 Client configured successfully.');
} catch (error) {
logger.error(`[S3 Adapter] Failed to configure S3 client: ${error.message}`);
// This is critical, throw an error to prevent the adapter from being used incorrectly
throw new Error('S3 Client configuration failed. Check S3 environment variables.');
}
// --- Metadata Helper Functions (Adapted for S3, store state locally) ---
async function ensureMetadataDirExists() {
// Reuse logic from local adapter - S3 adapter still needs local dir for state
try {
if (!fsSync.existsSync(METADATA_DIR)) {
await fs.mkdir(METADATA_DIR, { recursive: true });
logger.info(`[S3 Adapter] Created local metadata directory: ${METADATA_DIR}`);
}
await fs.access(METADATA_DIR, fsSync.constants.W_OK);
} catch (err) {
logger.error(`[S3 Adapter] Local metadata directory error (${METADATA_DIR}): ${err.message}`);
throw new Error(`Failed to access or create local metadata directory for S3 adapter state: ${METADATA_DIR}`);
}
}
// Read/Write/Delete functions are identical to localAdapter as they manage local state files
async function readUploadMetadata(uploadId) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
logger.warn(`[S3 Adapter] Attempted to read metadata with invalid uploadId: ${uploadId}`);
return null;
}
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
try {
const data = await fs.readFile(metaFilePath, 'utf8');
// Ensure 'parts' is always an array on read
const metadata = JSON.parse(data);
metadata.parts = metadata.parts || [];
return metadata;
} catch (err) {
if (err.code === 'ENOENT') { return null; }
logger.error(`[S3 Adapter] Error reading metadata for ${uploadId}: ${err.message}`);
throw err;
}
}
async function writeUploadMetadata(uploadId, metadata) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
logger.error(`[S3 Adapter] Attempted to write metadata with invalid uploadId: ${uploadId}`);
return;
}
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
metadata.lastActivity = Date.now();
metadata.parts = metadata.parts || []; // Ensure parts array exists
try {
const tempMetaPath = `${metaFilePath}.${crypto.randomBytes(4).toString('hex')}.tmp`;
await fs.writeFile(tempMetaPath, JSON.stringify(metadata, null, 2));
await fs.rename(tempMetaPath, metaFilePath);
} catch (err) {
logger.error(`[S3 Adapter] Error writing metadata for ${uploadId}: ${err.message}`);
try { await fs.unlink(tempMetaPath); } catch (unlinkErr) {/* ignore */}
throw err;
}
}
async function deleteUploadMetadata(uploadId) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
logger.warn(`[S3 Adapter] Attempted to delete metadata with invalid uploadId: ${uploadId}`);
return;
}
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
try {
await fs.unlink(metaFilePath);
logger.debug(`[S3 Adapter] Deleted metadata file: ${uploadId}.meta`);
} catch (err) {
if (err.code !== 'ENOENT') {
logger.error(`[S3 Adapter] Error deleting metadata file ${uploadId}.meta: ${err.message}`);
}
}
}
// Ensure metadata dir exists on initialization
ensureMetadataDirExists().catch(err => {
logger.error(`[S3 Adapter] Initialization failed: ${err.message}`);
process.exit(1); // Exit if we can't manage metadata state
S3Client, CreateMultipartUploadCommand, UploadPartCommand,
CompleteMultipartUploadCommand, AbortMultipartUploadCommand, ListObjectsV2Command,
GetObjectCommand, DeleteObjectCommand, PutObjectCommand
} = require('@aws-sdk/client-s3');
const { getSignedUrl } = require("@aws-sdk/s3-request-presigner");
const fs = require('fs').promises;
const fsSync = require('fs'); // For synchronous checks
const path = require('path');
const crypto = require('crypto');
const { config } = require('../config');
const logger = require('../utils/logger');
const { sanitizePathPreserveDirs, isValidBatchId, formatFileSize } = require('../utils/fileUtils');
const { sendNotification } = require('../services/notifications');
// --- Constants ---
const METADATA_DIR = path.join(config.uploadDir, '.metadata');
const TEMP_CHUNK_DIR = path.join(config.uploadDir, '.temp_chunks'); // For buffering parts or small files
const UPLOAD_TIMEOUT = 30 * 60 * 1000; // 30 min local stale cleanup
const MIN_S3_TOTAL_SIZE_FOR_MULTIPART = 5 * 1024 * 1024; // 5MB
const S3_PART_SIZE = 5 * 1024 * 1024; // Min 5MB for S3 parts (except last)
// --- Helper function to convert stream to string ---
async function streamToString(stream) {
if (!stream || typeof stream.pipe !== 'function') return Promise.resolve(null);
return new Promise((resolve, reject) => {
const chunks = [];
stream.on('data', (chunk) => chunks.push(chunk));
stream.on('error', reject);
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
});
// --- Interface Implementation ---
/**
* Initializes an S3 multipart upload session (or direct put for zero-byte).
* @param {string} filename - Original filename/path from client.
* @param {number} fileSize - Total size of the file.
* @param {string} clientBatchId - Optional batch ID from client.
* @returns {Promise<{uploadId: string}>} Object containing the application's upload ID.
*/
async function initUpload(filename, fileSize, clientBatchId) {
await ensureMetadataDirExists(); // Re-check before operation
const size = Number(fileSize);
const appUploadId = crypto.randomBytes(16).toString('hex'); // Our internal ID
// --- Path handling and Sanitization for S3 Key ---
const sanitizedFilename = sanitizePathPreserveDirs(filename);
// S3 keys should not start with /
const s3Key = path.normalize(sanitizedFilename)
.replace(/^(\.\.(\/|\\|$))+/, '')
.replace(/\\/g, '/')
.replace(/^\/+/, '');
logger.info(`[S3 Adapter] Init request for S3 Key: ${s3Key}`);
// --- Handle Zero-Byte Files ---
if (size === 0) {
try {
const putCommand = new PutObjectCommand({
Bucket: config.s3BucketName,
Key: s3Key,
Body: '', // Empty body
ContentLength: 0
});
await s3Client.send(putCommand);
logger.success(`[S3 Adapter] Completed zero-byte file upload directly: ${s3Key}`);
// No metadata needed for zero-byte files as they are completed atomically
sendNotification(filename, 0, config); // Send notification (use original filename)
// Return an uploadId that won't conflict or be processable by chunk/complete
return { uploadId: `zero-byte-${appUploadId}` }; // Or maybe return null/special status?
// Returning a unique ID might be safer for client state.
} catch (putErr) {
logger.error(`[S3 Adapter] Failed to put zero-byte object ${s3Key}: ${putErr.message}`);
throw putErr; // Let the route handler deal with it
}
let s3Client;
try {
s3Client = new S3Client({
region: config.s3Region,
credentials: { accessKeyId: config.s3AccessKeyId, secretAccessKey: config.s3SecretAccessKey },
...(config.s3EndpointUrl && { endpoint: config.s3EndpointUrl }),
...(config.s3ForcePathStyle && { forcePathStyle: true }),
});
logger.success('[S3 Adapter] S3 Client configured.');
} catch (error) {
logger.error(`[S3 Adapter] Failed to configure S3 client: ${error.message}`);
throw new Error('S3 Client configuration failed.');
}
async function ensureDirExists(dirPath, purpose) {
try {
if (!fsSync.existsSync(dirPath)) {
await fs.mkdir(dirPath, { recursive: true });
logger.info(`[S3 Adapter] Created local ${purpose} directory: ${dirPath}`);
}
}
// --- Initiate Multipart Upload for Non-Zero Files ---
try {
const createCommand = new CreateMultipartUploadCommand({
Bucket: config.s3BucketName,
Key: s3Key,
// TODO: Consider adding ContentType if available/reliable: metadata.contentType
// TODO: Consider adding Metadata: { 'original-filename': filename } ?
});
const response = await s3Client.send(createCommand);
const s3UploadId = response.UploadId;
if (!s3UploadId) {
throw new Error('S3 did not return an UploadId');
}
logger.info(`[S3 Adapter] Initiated multipart upload for ${s3Key} (S3 UploadId: ${s3UploadId})`);
// --- Create and Persist Local Metadata ---
const batchId = clientBatchId || `${Date.now()}-${crypto.randomBytes(4).toString('hex').substring(0, 9)}`;
const metadata = {
appUploadId: appUploadId, // Store our ID
s3UploadId: s3UploadId,
s3Key: s3Key,
originalFilename: filename, // Keep original for notifications etc.
fileSize: size,
bytesReceived: 0, // Track approximate bytes locally
parts: [], // Array to store { PartNumber, ETag }
batchId,
createdAt: Date.now(),
lastActivity: Date.now()
};
await writeUploadMetadata(appUploadId, metadata); // Write metadata keyed by our appUploadId
return { uploadId: appUploadId }; // Return OUR internal upload ID to the client
} catch (err) {
logger.error(`[S3 Adapter] Failed to initiate multipart upload for ${s3Key}: ${err.message}`);
// TODO: Map specific S3 errors (e.g., NoSuchBucket, AccessDenied) to better client messages
throw err;
}
await fs.access(dirPath, fsSync.constants.W_OK);
} catch (err) {
logger.error(`[S3 Adapter] Local ${purpose} directory error (${dirPath}): ${err.message}`);
throw new Error(`Failed to access/create local ${purpose} directory: ${dirPath}`);
}
/**
* Uploads a chunk as a part to S3.
* @param {string} appUploadId - The application's upload ID.
* @param {Buffer} chunk - The data chunk to store.
* @param {number} partNumber - The sequential number of this part (starting from 1).
* @returns {Promise<{bytesReceived: number, progress: number, completed: boolean}>} Upload status.
*/
async function storeChunk(appUploadId, chunk, partNumber) {
const chunkSize = chunk.length;
if (!chunkSize) throw new Error('Empty chunk received');
if (partNumber < 1) throw new Error('PartNumber must be 1 or greater');
const metadata = await readUploadMetadata(appUploadId);
if (!metadata || !metadata.s3UploadId) { // Check for s3UploadId presence
logger.warn(`[S3 Adapter] Metadata or S3 UploadId not found for chunk: ${appUploadId}. Upload might be complete, cancelled, or zero-byte.`);
throw new Error('Upload session not found or already completed');
}
// --- Sanity Check ---
// S3 handles duplicate part uploads gracefully (last one wins), so less critical than local append.
// We still track bytesReceived locally for progress approximation.
if (metadata.bytesReceived >= metadata.fileSize && metadata.fileSize > 0) {
logger.warn(`[S3 Adapter] Received chunk for already completed upload ${appUploadId}. Ignoring.`);
// Can't really finalize again easily without full parts list. Indicate completion based on local state.
const progress = metadata.fileSize > 0 ? 100 : 0;
return { bytesReceived: metadata.bytesReceived, progress, completed: true };
}
try {
const uploadPartCommand = new UploadPartCommand({
Bucket: config.s3BucketName,
Key: metadata.s3Key,
UploadId: metadata.s3UploadId,
Body: chunk,
PartNumber: partNumber,
ContentLength: chunkSize // Required for UploadPart
});
const response = await s3Client.send(uploadPartCommand);
const etag = response.ETag;
if (!etag) {
throw new Error(`S3 did not return an ETag for PartNumber ${partNumber}`);
}
// --- Update Local Metadata ---
// Ensure parts are stored correctly
metadata.parts = metadata.parts || [];
metadata.parts.push({ PartNumber: partNumber, ETag: etag });
// Sort parts just in case uploads happen out of order client-side (though unlikely with current client)
metadata.parts.sort((a, b) => a.PartNumber - b.PartNumber);
// Update approximate bytes received
metadata.bytesReceived = (metadata.bytesReceived || 0) + chunkSize;
// Cap bytesReceived at fileSize for progress calculation
metadata.bytesReceived = Math.min(metadata.bytesReceived, metadata.fileSize);
await writeUploadMetadata(appUploadId, metadata);
// --- Calculate Progress ---
const progress = metadata.fileSize === 0 ? 100 :
Math.min(Math.round((metadata.bytesReceived / metadata.fileSize) * 100), 100);
logger.debug(`[S3 Adapter] Part ${partNumber} uploaded for ${appUploadId} (ETag: ${etag}). Progress: ~${progress}%`);
// Check for completion potential based on local byte tracking
const completed = metadata.bytesReceived >= metadata.fileSize;
if (completed) {
logger.info(`[S3 Adapter] Upload ${appUploadId} potentially complete based on bytes received.`);
}
return { bytesReceived: metadata.bytesReceived, progress, completed };
} catch (err) {
logger.error(`[S3 Adapter] Failed to upload part ${partNumber} for ${appUploadId} (Key: ${metadata.s3Key}): ${err.message}`);
// TODO: Map specific S3 errors (InvalidPart, SlowDown, etc.)
throw err;
}
}
// Metadata functions (read, write, delete) remain largely the same as your last working version.
// Ensure metadata includes: appUploadId, s3Key, originalFilename, fileSize, bytesReceived, batchId,
// createdAt, lastActivity, isMultipartUpload, tempPartPath, s3UploadId (for MPU), parts (for MPU), currentPartBytes (for MPU).
async function readUploadMetadata(uploadId) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) return null;
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
try {
const data = await fs.readFile(metaFilePath, 'utf8');
const metadata = JSON.parse(data);
metadata.parts = metadata.parts || []; // Ensure parts array exists if MPU
return metadata;
} catch (err) {
if (err.code === 'ENOENT') return null;
logger.error(`[S3 Adapter] Error reading metadata for ${uploadId}: ${err.message}`);
throw err;
}
}
async function writeUploadMetadata(uploadId, metadata) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) return;
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
metadata.lastActivity = Date.now();
try {
const tempMetaPath = `${metaFilePath}.${crypto.randomBytes(4).toString('hex')}.tmp`;
await fs.writeFile(tempMetaPath, JSON.stringify(metadata, null, 2));
await fs.rename(tempMetaPath, metaFilePath);
} catch (err) {
logger.error(`[S3 Adapter] Error writing metadata for ${uploadId}: ${err.message}`);
try { await fs.unlink(tempMetaPath); } catch (unlinkErr) {/* ignore */}
throw err;
}
}
async function deleteUploadMetadata(uploadId) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) return;
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
try { await fs.unlink(metaFilePath); }
catch (err) { if (err.code !== 'ENOENT') logger.error(`[S3 Adapter] Err deleting meta ${uploadId}.meta: ${err.message}`);}
}
Promise.all([
ensureDirExists(METADATA_DIR, 'metadata'),
ensureDirExists(TEMP_CHUNK_DIR, 'part/small file buffering')
]).catch(err => { logger.error(`[S3 Adapter] Critical dir ensure error: ${err.message}`); process.exit(1); });
async function initUpload(filename, fileSize, clientBatchId) {
const size = Number(fileSize);
const appUploadId = crypto.randomBytes(16).toString('hex');
const sanitizedFilename = sanitizePathPreserveDirs(filename);
const s3Key = path.normalize(sanitizedFilename).replace(/^(\.\.(\/|\\|$))+/, '').replace(/\\/g, '/').replace(/^\/+/, '');
logger.info(`[S3 Adapter] Init: Key: ${s3Key}, Size: ${size}`);
const batchId = clientBatchId || `${Date.now()}-${crypto.randomBytes(4).toString('hex').substring(0, 9)}`;
const tempPartPath = path.join(TEMP_CHUNK_DIR, `${appUploadId}.partbuffer`); // Buffer for current part or small file
const baseMetadata = {
appUploadId, s3Key, originalFilename: filename, fileSize: size,
bytesReceived: 0, batchId, createdAt: Date.now(), lastActivity: Date.now(),
tempPartPath, currentPartBytes: 0, parts: []
};
if (size === 0) {
await s3Client.send(new PutObjectCommand({ Bucket: config.s3BucketName, Key: s3Key, Body: '', ContentLength: 0 }));
logger.success(`[S3 Adapter] Zero-byte uploaded: ${s3Key}`);
sendNotification(filename, 0, config);
return { uploadId: `zero-byte-${appUploadId}` };
}
let metadata;
if (size < MIN_S3_TOTAL_SIZE_FOR_MULTIPART) {
logger.info(`[S3 Adapter] Small file (<${MIN_S3_TOTAL_SIZE_FOR_MULTIPART}B). Will use single PUT.`);
metadata = { ...baseMetadata, isMultipartUpload: false };
} else {
logger.info(`[S3 Adapter] Large file (>=${MIN_S3_TOTAL_SIZE_FOR_MULTIPART}B). Will use MPU.`);
const mpuResponse = await s3Client.send(new CreateMultipartUploadCommand({ Bucket: config.s3BucketName, Key: s3Key }));
if (!mpuResponse.UploadId) throw new Error('S3 did not return UploadId for MPU.');
metadata = { ...baseMetadata, isMultipartUpload: true, s3UploadId: mpuResponse.UploadId };
logger.info(`[S3 Adapter] MPU initiated for ${s3Key}, S3UploadId: ${metadata.s3UploadId}`);
}
await fs.writeFile(tempPartPath, ''); // Create empty buffer file
await writeUploadMetadata(appUploadId, metadata);
logger.info(`[S3 Adapter] Initialized upload ${appUploadId} for ${s3Key}. Temp buffer: ${tempPartPath}. MPU: ${metadata.isMultipartUpload}`);
return { uploadId: appUploadId };
}
async function _uploadBufferedPart(metadata) {
const partBuffer = await fs.readFile(metadata.tempPartPath);
if (partBuffer.length === 0) {
logger.warn(`[S3 Adapter MPU] Attempted to upload empty part for ${metadata.appUploadId}. Skipping.`);
return; // Don't upload empty parts
}
const partNumber = metadata.parts.length + 1;
logger.info(`[S3 Adapter MPU] Uploading part ${partNumber} (${partBuffer.length} bytes) for ${metadata.appUploadId} (Key: ${metadata.s3Key})`);
/**
* Finalizes a completed S3 multipart upload.
* @param {string} appUploadId - The application's upload ID.
* @returns {Promise<{filename: string, size: number, finalPath: string}>} Details of the completed file (finalPath is S3 Key).
*/
async function completeUpload(appUploadId) {
const metadata = await readUploadMetadata(appUploadId);
if (!metadata || !metadata.s3UploadId || !metadata.parts || metadata.parts.length === 0) {
logger.warn(`[S3 Adapter] completeUpload called for ${appUploadId}, but metadata, S3 UploadId, or parts list is missing/empty. Assuming already completed or invalid state.`);
// Check if object exists as a fallback? Risky.
throw new Error('Upload completion failed: Required metadata or parts list not found');
}
const uploadPartCmd = new UploadPartCommand({
Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId,
Body: partBuffer, PartNumber: partNumber, ContentLength: partBuffer.length
});
const partResponse = await s3Client.send(uploadPartCmd);
metadata.parts.push({ PartNumber: partNumber, ETag: partResponse.ETag });
// Basic check if enough bytes were tracked locally (approximate check)
if (metadata.bytesReceived < metadata.fileSize) {
logger.warn(`[S3 Adapter] Attempting to complete upload ${appUploadId} but locally tracked bytes (${metadata.bytesReceived}) are less than expected size (${metadata.fileSize}). Proceeding anyway.`);
}
try {
const completeCommand = new CompleteMultipartUploadCommand({
Bucket: config.s3BucketName,
Key: metadata.s3Key,
UploadId: metadata.s3UploadId,
MultipartUpload: {
Parts: metadata.parts // Use the collected parts { PartNumber, ETag }
},
});
const response = await s3Client.send(completeCommand);
// Example response: { ETag: '"..."', Location: '...', Key: '...', Bucket: '...' }
logger.success(`[S3 Adapter] Finalized multipart upload: ${metadata.s3Key} (ETag: ${response.ETag})`);
// Clean up local metadata AFTER successful S3 completion
await deleteUploadMetadata(appUploadId);
// Send notification
sendNotification(metadata.originalFilename, metadata.fileSize, config);
// Return info consistent with local adapter where possible
return { filename: metadata.originalFilename, size: metadata.fileSize, finalPath: metadata.s3Key };
} catch (err) {
logger.error(`[S3 Adapter] Failed to complete multipart upload for ${appUploadId} (Key: ${metadata.s3Key}): ${err.message}`);
// Specific S3 errors like InvalidPartOrder, EntityTooSmall might occur here.
// If Complete fails, S3 *might* have already assembled it (rare).
// Check if the object now exists? If so, maybe delete metadata? Complex recovery.
// For now, just log the error and throw. The local metadata will persist.
if (err.Code === 'NoSuchUpload') {
logger.warn(`[S3 Adapter] CompleteMultipartUpload failed with NoSuchUpload for ${appUploadId}. Assuming already completed or aborted.`);
await deleteUploadMetadata(appUploadId).catch(()=>{}); // Attempt metadata cleanup
// Check if final object exists?
try {
// Use GetObject or HeadObject to check
await s3Client.send(new GetObjectCommand({ Bucket: config.s3BucketName, Key: metadata.s3Key }));
logger.info(`[S3 Adapter] Final object ${metadata.s3Key} exists after NoSuchUpload error. Treating as completed.`);
return { filename: metadata.originalFilename, size: metadata.fileSize, finalPath: metadata.s3Key };
} catch (headErr) {
// Final object doesn't exist either.
throw new Error('Completion failed: Upload session not found and final object does not exist.');
}
}
throw err;
}
// Reset buffer for next part
await fs.writeFile(metadata.tempPartPath, '');
metadata.currentPartBytes = 0;
logger.info(`[S3 Adapter MPU] Part ${partNumber} uploaded for ${metadata.appUploadId}. ETag: ${partResponse.ETag}`);
}
async function storeChunk(appUploadId, chunk) {
const chunkSize = chunk.length;
if (!chunkSize) throw new Error('Empty chunk received for storeChunk');
const metadata = await readUploadMetadata(appUploadId);
if (!metadata) throw new Error(`Upload session ${appUploadId} not found or already completed.`);
await fs.appendFile(metadata.tempPartPath, chunk);
metadata.bytesReceived += chunkSize;
metadata.currentPartBytes += chunkSize;
let justUploadedAPart = false;
if (metadata.isMultipartUpload) {
// Upload part if buffer is full or it's the last overall chunk for the file
const isLastChunkOfFile = metadata.bytesReceived >= metadata.fileSize;
if (metadata.currentPartBytes >= S3_PART_SIZE || (isLastChunkOfFile && metadata.currentPartBytes > 0)) {
await _uploadBufferedPart(metadata);
justUploadedAPart = true; // indicates that currentPartBytes was reset
}
/**
* Aborts an ongoing S3 multipart upload.
* @param {string} appUploadId - The application's upload ID.
* @returns {Promise<void>}
*/
async function abortUpload(appUploadId) {
const metadata = await readUploadMetadata(appUploadId);
if (!metadata || !metadata.s3UploadId) {
logger.warn(`[S3 Adapter] Abort request for non-existent or completed upload: ${appUploadId}`);
await deleteUploadMetadata(appUploadId); // Clean up local metadata if it exists anyway
return;
}
await writeUploadMetadata(appUploadId, metadata); // Persist bytesReceived, parts array, currentPartBytes
const progress = metadata.fileSize === 0 ? 100 : Math.min(Math.round((metadata.bytesReceived / metadata.fileSize) * 100), 100);
const completed = metadata.bytesReceived >= metadata.fileSize;
logger.debug(`[S3 Adapter] Chunk stored for ${appUploadId}. Total ${metadata.bytesReceived}/${metadata.fileSize} (${progress}%). Part buffered: ${metadata.currentPartBytes}. Part uploaded: ${justUploadedAPart}`);
if (completed) logger.info(`[S3 Adapter] All data for ${appUploadId} received locally. Ready for S3 finalization.`);
return { bytesReceived: metadata.bytesReceived, progress, completed };
}
async function completeUpload(appUploadId) {
const metadata = await readUploadMetadata(appUploadId);
if (!metadata) throw new Error(`Cannot complete: Metadata for ${appUploadId} not found.`);
if (metadata.bytesReceived < metadata.fileSize) {
logger.error(`[S3 Adapter] FATAL: Attempt to complete ${appUploadId} with ${metadata.bytesReceived}/${metadata.fileSize} bytes.`);
throw new Error(`Incomplete data for ${appUploadId} cannot be finalized.`);
}
try {
if (metadata.isMultipartUpload) {
// If there's any data left in the buffer for the last part, upload it
if (metadata.currentPartBytes > 0) {
logger.info(`[S3 Adapter MPU] Uploading final remaining buffered part for ${appUploadId}`);
await _uploadBufferedPart(metadata); // This will also update metadata and save it
// Re-read metadata to get the final parts list if _uploadBufferedPart wrote it
const updatedMetadata = await readUploadMetadata(appUploadId);
if (!updatedMetadata) throw new Error("Metadata disappeared after final part upload.");
metadata.parts = updatedMetadata.parts;
}
try {
const abortCommand = new AbortMultipartUploadCommand({
Bucket: config.s3BucketName,
Key: metadata.s3Key,
UploadId: metadata.s3UploadId,
});
await s3Client.send(abortCommand);
logger.info(`[S3 Adapter] Aborted multipart upload: ${appUploadId} (Key: ${metadata.s3Key})`);
} catch (err) {
if (err.name === 'NoSuchUpload') {
logger.warn(`[S3 Adapter] Multipart upload ${appUploadId} (Key: ${metadata.s3Key}) not found during abort. Already aborted or completed.`);
} else {
logger.error(`[S3 Adapter] Failed to abort multipart upload for ${appUploadId} (Key: ${metadata.s3Key}): ${err.message}`);
// Don't delete local metadata if abort failed, might be retryable or need manual cleanup
throw err; // Rethrow S3 error
}
if (!metadata.parts || metadata.parts.length === 0 && metadata.fileSize > 0) { // fileSize > 0 check because a 0 byte file could be MPU if MIN_S3_TOTAL_SIZE_FOR_MULTIPART is 0
throw new Error(`No parts recorded for MPU ${appUploadId} of size ${metadata.fileSize}. Cannot complete.`);
}
// Delete local metadata AFTER successful abort or if NoSuchUpload
await deleteUploadMetadata(appUploadId);
const completeCmd = new CompleteMultipartUploadCommand({
Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId,
MultipartUpload: { Parts: metadata.parts },
});
const response = await s3Client.send(completeCmd);
logger.success(`[S3 Adapter MPU] Finalized: ${metadata.s3Key} (ETag: ${response.ETag})`);
} else {
// Single PUT for small files
const fileBuffer = await fs.readFile(metadata.tempPartPath);
if (fileBuffer.length !== metadata.fileSize) {
throw new Error(`Buffered size ${fileBuffer.length} != metadata size ${metadata.fileSize} for ${appUploadId}`);
}
logger.info(`[S3 Adapter SinglePut] Uploading ${metadata.s3Key} via single PutObject from buffer.`);
await s3Client.send(new PutObjectCommand({
Bucket: config.s3BucketName, Key: metadata.s3Key,
Body: fileBuffer, ContentLength: metadata.fileSize
}));
logger.success(`[S3 Adapter SinglePut] Finalized: ${metadata.s3Key}`);
}
/**
* Lists files in the S3 bucket.
* @returns {Promise<Array<{filename: string, size: number, formattedSize: string, uploadDate: Date}>>} List of files.
*/
async function listFiles() {
try {
const command = new ListObjectsV2Command({
Bucket: config.s3BucketName,
// Optional: Add Prefix if you want to list within a specific 'folder'
// Prefix: 'uploads/'
});
// TODO: Add pagination handling if expecting >1000 objects
const response = await s3Client.send(command);
const files = (response.Contents || [])
// Optional: Filter out objects that might represent folders if necessary
// .filter(item => !(item.Key.endsWith('/') && item.Size === 0))
.map(item => ({
filename: item.Key, // S3 Key is the filename/path
size: item.Size,
formattedSize: formatFileSize(item.Size), // Use utility
uploadDate: item.LastModified
}));
// Sort by date, newest first
files.sort((a, b) => b.uploadDate.getTime() - a.uploadDate.getTime());
return files;
} catch (err) {
logger.error(`[S3 Adapter] Failed to list objects in bucket ${config.s3BucketName}: ${err.message}`);
throw err;
}
await fs.unlink(metadata.tempPartPath).catch(err => logger.warn(`[S3 Adapter] Failed to delete temp part buffer ${metadata.tempPartPath}: ${err.message}`));
await deleteUploadMetadata(appUploadId);
sendNotification(metadata.originalFilename, metadata.fileSize, config);
return { filename: metadata.originalFilename, size: metadata.fileSize, finalPath: metadata.s3Key };
} catch (err) {
logger.error(`[S3 Adapter] Failed to complete S3 upload for ${appUploadId} (Key: ${metadata.s3Key}, MPU: ${metadata.isMultipartUpload}): ${err.message} ${err.name}`);
if (err && err.$response) {
const responseBody = err.$response.body ? await streamToString(err.$response.body) : "No body/streamed";
console.error("[S3 Adapter] Raw Error $response:", {statusCode: err.$response.statusCode, headers: err.$response.headers, body: responseBody});
} else {
console.error("[S3 Adapter] Error object (no $response):", JSON.stringify(err, Object.getOwnPropertyNames(err), 2));
}
/**
* Generates a presigned URL for downloading an S3 object.
* @param {string} s3Key - The S3 Key (filename/path) of the object.
* @returns {Promise<{type: string, value: string}>} Object indicating type ('url') and value (the presigned URL).
*/
async function getDownloadUrlOrStream(s3Key) {
// Input `s3Key` is assumed to be sanitized by the calling route/logic
if (!s3Key || s3Key.includes('..') || s3Key.startsWith('/')) {
logger.error(`[S3 Adapter] Invalid S3 key detected for download: ${s3Key}`);
throw new Error('Invalid filename');
}
try {
const command = new GetObjectCommand({
Bucket: config.s3BucketName,
Key: s3Key,
// Optional: Override response headers like filename
// ResponseContentDisposition: `attachment; filename="${path.basename(s3Key)}"`
});
// Generate presigned URL (expires in 1 hour by default, adjustable)
const url = await getSignedUrl(s3Client, command, { expiresIn: 3600 });
logger.info(`[S3 Adapter] Generated presigned URL for ${s3Key}`);
return { type: 'url', value: url };
} catch (err) {
logger.error(`[S3 Adapter] Failed to generate presigned URL for ${s3Key}: ${err.message}`);
if (err.name === 'NoSuchKey') {
throw new Error('File not found in S3');
}
throw err; // Re-throw other S3 errors
}
if (metadata.isMultipartUpload && metadata.s3UploadId) {
logger.warn(`[S3 Adapter MPU] Attempting to abort failed MPU ${metadata.s3UploadId}`);
await s3Client.send(new AbortMultipartUploadCommand({ Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId }))
.catch(abortErr => logger.error(`[S3 Adapter MPU] Failed to abort MPU after error: ${abortErr.message}`));
}
/**
* Deletes an object from the S3 bucket.
* @param {string} s3Key - The S3 Key (filename/path) of the object to delete.
* @returns {Promise<void>}
*/
async function deleteFile(s3Key) {
// Input `s3Key` is assumed to be sanitized
if (!s3Key || s3Key.includes('..') || s3Key.startsWith('/')) {
logger.error(`[S3 Adapter] Invalid S3 key detected for delete: ${s3Key}`);
throw new Error('Invalid filename');
}
try {
const command = new DeleteObjectCommand({
Bucket: config.s3BucketName,
Key: s3Key,
});
await s3Client.send(command);
logger.info(`[S3 Adapter] Deleted object: ${s3Key}`);
} catch (err) {
// DeleteObject is idempotent, so NoSuchKey isn't typically an error unless you need to know.
logger.error(`[S3 Adapter] Failed to delete object ${s3Key}: ${err.message}`);
throw err;
}
throw err;
}
}
async function abortUpload(appUploadId) {
const metadata = await readUploadMetadata(appUploadId);
if (!metadata) {
logger.warn(`[S3 Adapter] Abort: Metadata for ${appUploadId} not found.`);
await deleteUploadMetadata(appUploadId); // ensure local meta is gone
const tempPartPath = path.join(TEMP_CHUNK_DIR, `${appUploadId}.partbuffer`); // guess path
await fs.unlink(tempPartPath).catch(()=>{}); // try delete orphan buffer
return;
}
if (metadata.tempPartPath) {
await fs.unlink(metadata.tempPartPath)
.then(() => logger.info(`[S3 Adapter] Deleted temp part buffer on abort: ${metadata.tempPartPath}`))
.catch(err => { if (err.code !== 'ENOENT') logger.error(`[S3 Adapter] Err deleting temp buffer ${metadata.tempPartPath} on abort: ${err.message}`);});
}
if (metadata.isMultipartUpload && metadata.s3UploadId) {
try {
await s3Client.send(new AbortMultipartUploadCommand({
Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId,
}));
logger.info(`[S3 Adapter MPU] Aborted S3 MPU: ${metadata.s3UploadId}`);
} catch (err) {
if (err.name !== 'NoSuchUpload') { logger.error(`[S3 Adapter MPU] Failed to abort S3 MPU ${metadata.s3UploadId}: ${err.message}`);}
else {logger.warn(`[S3 Adapter MPU] S3 MPU ${metadata.s3UploadId} not found during abort (already gone).`);}
}
/**
* Cleans up stale *local* metadata files for S3 uploads.
* Relies on S3 Lifecycle Policies for actual S3 cleanup.
* @returns {Promise<void>}
*/
async function cleanupStale() {
logger.info('[S3 Adapter] Running cleanup for stale local metadata files...');
let cleanedCount = 0;
let checkedCount = 0;
}
await deleteUploadMetadata(appUploadId);
}
async function listFiles() {
try {
const command = new ListObjectsV2Command({ Bucket: config.s3BucketName });
const response = await s3Client.send(command);
const files = (response.Contents || [])
.map(item => ({
filename: item.Key, size: item.Size,
formattedSize: formatFileSize(item.Size), uploadDate: item.LastModified
})).sort((a, b) => b.uploadDate.getTime() - a.uploadDate.getTime());
return files;
} catch (err) { logger.error(`[S3 Adapter] Failed to list S3 objects: ${err.message}`); throw err; }
}
async function getDownloadUrlOrStream(s3Key) {
if (!s3Key || s3Key.includes('..') || s3Key.startsWith('/')) throw new Error('Invalid S3 key for download');
try {
const url = await getSignedUrl(s3Client, new GetObjectCommand({ Bucket: config.s3BucketName, Key: s3Key }), { expiresIn: 3600 });
logger.info(`[S3 Adapter] Generated presigned URL for ${s3Key}`);
return { type: 'url', value: url };
} catch (err) {
logger.error(`[S3 Adapter] Failed to get presigned URL for ${s3Key}: ${err.message}`);
if (err.name === 'NoSuchKey') throw new Error('File not found in S3');
throw err;
}
}
async function deleteFile(s3Key) {
if (!s3Key || s3Key.includes('..') || s3Key.startsWith('/')) throw new Error('Invalid S3 key for delete');
try {
await s3Client.send(new DeleteObjectCommand({ Bucket: config.s3BucketName, Key: s3Key }));
logger.info(`[S3 Adapter] Deleted S3 object: ${s3Key}`);
} catch (err) { logger.error(`[S3 Adapter] Failed to delete S3 object ${s3Key}: ${err.message}`); throw err; }
}
async function cleanupStale() {
logger.info('[S3 Adapter] Cleaning stale local metadata & temp part buffers...');
let cleanedMeta = 0, checkedMeta = 0, cleanedBuffers = 0;
const now = Date.now();
try { // Stale .meta files
const metaFiles = await fs.readdir(METADATA_DIR);
for (const file of metaFiles) {
if (!file.endsWith('.meta')) continue;
checkedMeta++;
const appUploadId = file.replace('.meta', '');
const metaFilePath = path.join(METADATA_DIR, file);
try {
await ensureMetadataDirExists(); // Re-check
const files = await fs.readdir(METADATA_DIR);
const now = Date.now();
for (const file of files) {
if (file.endsWith('.meta')) {
checkedCount++;
const appUploadId = file.replace('.meta', '');
const metaFilePath = path.join(METADATA_DIR, file);
try {
const data = await fs.readFile(metaFilePath, 'utf8');
const metadata = JSON.parse(data);
// Check inactivity based on local metadata timestamp
if (now - (metadata.lastActivity || metadata.createdAt || 0) > UPLOAD_TIMEOUT) {
logger.warn(`[S3 Adapter] Found stale local metadata: ${file}. Last activity: ${new Date(metadata.lastActivity || metadata.createdAt)}. S3 UploadId: ${metadata.s3UploadId || 'N/A'}`);
// Only delete the LOCAL metadata file. DO NOT ABORT S3 UPLOAD HERE.
await deleteUploadMetadata(appUploadId); // Use helper
cleanedCount++;
}
} catch (readErr) {
logger.error(`[S3 Adapter] Error reading/parsing local metadata ${metaFilePath} during cleanup: ${readErr.message}. Skipping.`);
await fs.unlink(metaFilePath).catch(()=>{ logger.warn(`[S3 Adapter] Failed to delete potentially corrupt local metadata file: ${metaFilePath}`) });
}
} else if (file.endsWith('.tmp')) {
// Clean up potential leftover temp metadata files (same as local adapter)
const tempMetaPath = path.join(METADATA_DIR, file);
try {
const stats = await fs.stat(tempMetaPath);
if (now - stats.mtime.getTime() > UPLOAD_TIMEOUT) {
logger.warn(`[S3 Adapter] Deleting stale temporary local metadata file: ${file}`);
await fs.unlink(tempMetaPath);
}
} catch (statErr) {
if (statErr.code !== 'ENOENT') {
logger.error(`[S3 Adapter] Error checking temp local metadata file ${tempMetaPath}: ${statErr.message}`);
}
}
const metadata = JSON.parse(await fs.readFile(metaFilePath, 'utf8'));
if (now - (metadata.lastActivity || metadata.createdAt || 0) > UPLOAD_TIMEOUT) {
logger.warn(`[S3 Adapter] Stale meta: ${file}. AppUploadId: ${appUploadId}. Cleaning.`);
if (metadata.tempPartPath) {
await fs.unlink(metadata.tempPartPath).then(()=>cleanedBuffers++).catch(()=>{});
}
if (metadata.isMultipartUpload && metadata.s3UploadId) {
await s3Client.send(new AbortMultipartUploadCommand({ Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId }))
.catch(e => {if (e.name !== 'NoSuchUpload') logger.error(`Stale MPU abort fail: ${e.message}`);});
}
await fs.unlink(metaFilePath);
cleanedMeta++;
}
if (checkedCount > 0 || cleanedCount > 0) {
logger.info(`[S3 Adapter] Local metadata cleanup finished. Checked: ${checkedCount}, Cleaned stale local files: ${cleanedCount}.`);
}
// Log the crucial recommendation
logger.warn(`[S3 Adapter] IMPORTANT: For S3 storage, configure Lifecycle Rules on your bucket (${config.s3BucketName}) or use provider-specific tools to automatically clean up incomplete multipart uploads after a few days. This adapter only cleans up local tracking files.`);
} catch (err) {
if (err.code === 'ENOENT' && err.path === METADATA_DIR) {
logger.warn('[S3 Adapter] Local metadata directory not found during cleanup scan.');
} else {
logger.error(`[S3 Adapter] Error during local metadata cleanup scan: ${err.message}`);
}
} catch (e) { logger.error(`Error processing stale meta ${file}: ${e.message}. Deleting.`); await fs.unlink(metaFilePath).catch(()=>{});}
}
} catch (e) { if (e.code !== 'ENOENT') logger.error(`Meta dir cleanup error: ${e.message}`);}
try { // Orphaned .partbuffer files
const bufferFiles = await fs.readdir(TEMP_CHUNK_DIR);
for (const file of bufferFiles) {
if (!file.endsWith('.partbuffer')) continue;
const bufferFilePath = path.join(TEMP_CHUNK_DIR, file);
const stats = await fs.stat(bufferFilePath);
if (now - stats.mtime.getTime() > UPLOAD_TIMEOUT) {
logger.warn(`[S3 Adapter] Stale orphaned buffer: ${file}. Deleting.`);
await fs.unlink(bufferFilePath);
cleanedBuffers++;
}
}
module.exports = {
initUpload,
storeChunk,
completeUpload,
abortUpload,
listFiles,
getDownloadUrlOrStream,
deleteFile,
cleanupStale
};
} catch (e) { if (e.code !== 'ENOENT') logger.error(`Temp buffer dir cleanup error: ${e.message}`);}
if (checkedMeta || cleanedMeta || cleanedBuffers) {
logger.info(`[S3 Adapter] Local cleanup: MetaChecked: ${checkedMeta}, MetaCleaned: ${cleanedMeta}, BuffersCleaned: ${cleanedBuffers}.`);
}
logger.warn(`[S3 Adapter] Reminder: Configure S3 Lifecycle Rules on bucket '${config.s3BucketName}' for S3-side MPU cleanup.`);
}
module.exports = {
initUpload, storeChunk, completeUpload, abortUpload,
listFiles, getDownloadUrlOrStream, deleteFile, cleanupStale
};
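
As a companion to the cleanupStale reminder above, one way to add the recommended S3-side cleanup of incomplete multipart uploads is a bucket lifecycle rule; a hedged sketch using the AWS SDK v3 (the rule ID and retention period are arbitrary, and S3-compatible providers may expose this differently):

const { S3Client, PutBucketLifecycleConfigurationCommand } = require('@aws-sdk/client-s3');
const { config } = require('../config');

async function ensureMpuCleanupRule(s3Client) {
  // Note: this replaces any existing lifecycle configuration on the bucket.
  // The rule aborts any multipart upload still incomplete 7 days after initiation.
  await s3Client.send(new PutBucketLifecycleConfigurationCommand({
    Bucket: config.s3BucketName,
    LifecycleConfiguration: {
      Rules: [{
        ID: 'abort-incomplete-multipart-uploads',
        Status: 'Enabled',
        Filter: { Prefix: '' }, // apply to the whole bucket
        AbortIncompleteMultipartUpload: { DaysAfterInitiation: 7 },
      }],
    },
  }));
}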