feat(storage): Enhance S3 adapter with unique folder prefix handling

- Added functionality to ensure top-level folder prefixes are unique per batch to prevent collisions during uploads.
- Improved error logging for S3 operations, including detailed context for failures.
- Refactored metadata management functions to streamline the upload process and ensure robust handling of local metadata.
- Updated the S3 client initialization and metadata directory management for better reliability.

This commit enhances the S3 adapter's capability to manage file uploads more effectively, improving user experience and reducing potential conflicts during concurrent uploads.
This commit is contained in:
greirson
2025-05-06 10:50:33 -07:00
parent c24e866074
commit 1273fe92b1

View File

@@ -3,590 +3,444 @@
* Handles file operations for storing files on AWS S3 or S3-compatible services. * Handles file operations for storing files on AWS S3 or S3-compatible services.
* Implements the storage interface expected by the application routes. * Implements the storage interface expected by the application routes.
* Uses local files in '.metadata' directory to track multipart upload progress. * Uses local files in '.metadata' directory to track multipart upload progress.
* Attempts to make top-level folder prefixes unique per batch if collisions occur.
*/ */
const { const {
S3Client, S3Client,
CreateMultipartUploadCommand, CreateMultipartUploadCommand,
UploadPartCommand, UploadPartCommand,
CompleteMultipartUploadCommand, CompleteMultipartUploadCommand,
AbortMultipartUploadCommand, AbortMultipartUploadCommand,
ListObjectsV2Command, ListObjectsV2Command,
GetObjectCommand, GetObjectCommand,
DeleteObjectCommand, DeleteObjectCommand,
PutObjectCommand // For zero-byte files PutObjectCommand,
} = require('@aws-sdk/client-s3'); HeadObjectCommand
const { getSignedUrl } = require("@aws-sdk/s3-request-presigner"); } = require('@aws-sdk/client-s3');
const fs = require('fs').promises; const { getSignedUrl } = require("@aws-sdk/s3-request-presigner");
const fsSync = require('fs'); // For synchronous checks const fs = require('fs').promises;
const path = require('path'); const fsSync = require('fs');
const crypto = require('crypto'); const path = require('path');
const { config } = require('../config'); const crypto = require('crypto');
const logger = require('../utils/logger'); const util = require('util'); // For detailed error logging
const { const { config } = require('../config');
sanitizePathPreserveDirs, const logger = require('../utils/logger');
isValidBatchId, const {
formatFileSize // Keep for potential future use or consistency sanitizePathPreserveDirs,
} = require('../utils/fileUtils'); formatFileSize
const { sendNotification } = require('../services/notifications'); // Needed for completion } = require('../utils/fileUtils');
const { sendNotification } = require('../services/notifications');
// --- Constants --- const METADATA_DIR = path.join(config.uploadDir, '.metadata');
const METADATA_DIR = path.join(config.uploadDir, '.metadata'); // Use local dir for metadata state const UPLOAD_TIMEOUT = 30 * 60 * 1000; // For local metadata cleanup
const UPLOAD_TIMEOUT = 30 * 60 * 1000; // 30 minutes timeout for stale *local* metadata cleanup
// --- S3 Client Initialization --- // --- S3 Client Initialization ---
let s3Client; let s3Client;
try { try {
const s3ClientConfig = { const s3ClientConfig = {
region: config.s3Region, region: config.s3Region,
credentials: { credentials: {
accessKeyId: config.s3AccessKeyId, accessKeyId: config.s3AccessKeyId,
secretAccessKey: config.s3SecretAccessKey, secretAccessKey: config.s3SecretAccessKey,
}, },
...(config.s3EndpointUrl && { endpoint: config.s3EndpointUrl }), ...(config.s3EndpointUrl && { endpoint: config.s3EndpointUrl }),
...(config.s3ForcePathStyle && { forcePathStyle: true }), ...(config.s3ForcePathStyle && { forcePathStyle: true }),
}; };
if (s3ClientConfig.endpoint) logger.info(`[S3 Adapter] Configuring S3 client for endpoint: ${s3ClientConfig.endpoint}`);
if (s3ClientConfig.forcePathStyle) logger.info(`[S3 Adapter] Configuring S3 client with forcePathStyle: true`);
s3Client = new S3Client(s3ClientConfig);
logger.success('[S3 Adapter] S3 Client configured successfully.');
} catch (error) {
logger.error(`[S3 Adapter] Failed to configure S3 client: ${error.message}`);
throw new Error('S3 Client configuration failed. Check S3 environment variables.');
}
if (s3ClientConfig.endpoint) { // --- Metadata Helper Functions ---
logger.info(`[S3 Adapter] Configuring S3 client for endpoint: ${s3ClientConfig.endpoint}`); async function ensureMetadataDirExists() {
} try {
if (s3ClientConfig.forcePathStyle) { if (!fsSync.existsSync(METADATA_DIR)) {
logger.info(`[S3 Adapter] Configuring S3 client with forcePathStyle: true`); await fs.mkdir(METADATA_DIR, { recursive: true });
} logger.info(`[S3 Adapter] Created local metadata directory: ${METADATA_DIR}`);
}
await fs.access(METADATA_DIR, fsSync.constants.W_OK);
} catch (err) {
logger.error(`[S3 Adapter] Local metadata directory error (${METADATA_DIR}): ${err.message}`);
throw new Error(`Failed to access or create local metadata directory: ${METADATA_DIR}`);
}
}
s3Client = new S3Client(s3ClientConfig); async function readUploadMetadata(uploadId) {
logger.success('[S3 Adapter] S3 Client configured successfully.'); if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
} catch (error) {
logger.error(`[S3 Adapter] Failed to configure S3 client: ${error.message}`);
// This is critical, throw an error to prevent the adapter from being used incorrectly
throw new Error('S3 Client configuration failed. Check S3 environment variables.');
}
// --- Metadata Helper Functions (Adapted for S3, store state locally) ---
async function ensureMetadataDirExists() {
// Reuse logic from local adapter - S3 adapter still needs local dir for state
try {
if (!fsSync.existsSync(METADATA_DIR)) {
await fs.mkdir(METADATA_DIR, { recursive: true });
logger.info(`[S3 Adapter] Created local metadata directory: ${METADATA_DIR}`);
}
await fs.access(METADATA_DIR, fsSync.constants.W_OK);
} catch (err) {
logger.error(`[S3 Adapter] Local metadata directory error (${METADATA_DIR}): ${err.message}`);
throw new Error(`Failed to access or create local metadata directory for S3 adapter state: ${METADATA_DIR}`);
}
}
// Read/Write/Delete functions are identical to localAdapter as they manage local state files
async function readUploadMetadata(uploadId) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
logger.warn(`[S3 Adapter] Attempted to read metadata with invalid uploadId: ${uploadId}`); logger.warn(`[S3 Adapter] Attempted to read metadata with invalid uploadId: ${uploadId}`);
return null; return null;
}
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
try {
const data = await fs.readFile(metaFilePath, 'utf8');
// Ensure 'parts' is always an array on read
const metadata = JSON.parse(data);
metadata.parts = metadata.parts || [];
return metadata;
} catch (err) {
if (err.code === 'ENOENT') { return null; }
logger.error(`[S3 Adapter] Error reading metadata for ${uploadId}: ${err.message}`);
throw err;
}
} }
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
try {
const data = await fs.readFile(metaFilePath, 'utf8');
const metadata = JSON.parse(data);
metadata.parts = metadata.parts || [];
return metadata;
} catch (err) {
if (err.code === 'ENOENT') return null;
logger.error(`[S3 Adapter] Error reading metadata for ${uploadId}: ${err.message}`);
throw err;
}
}
async function writeUploadMetadata(uploadId, metadata) { async function writeUploadMetadata(uploadId, metadata) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) { if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
logger.error(`[S3 Adapter] Attempted to write metadata with invalid uploadId: ${uploadId}`); logger.error(`[S3 Adapter] Attempted to write metadata with invalid uploadId: ${uploadId}`);
return; return;
}
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
metadata.lastActivity = Date.now();
metadata.parts = metadata.parts || []; // Ensure parts array exists
try {
const tempMetaPath = `${metaFilePath}.${crypto.randomBytes(4).toString('hex')}.tmp`;
await fs.writeFile(tempMetaPath, JSON.stringify(metadata, null, 2));
await fs.rename(tempMetaPath, metaFilePath);
} catch (err) {
logger.error(`[S3 Adapter] Error writing metadata for ${uploadId}: ${err.message}`);
try { await fs.unlink(tempMetaPath); } catch (unlinkErr) {/* ignore */}
throw err;
}
} }
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
metadata.lastActivity = Date.now();
metadata.parts = metadata.parts || [];
try {
const tempMetaPath = `${metaFilePath}.${crypto.randomBytes(4).toString('hex')}.tmp`;
await fs.writeFile(tempMetaPath, JSON.stringify(metadata, null, 2));
await fs.rename(tempMetaPath, metaFilePath);
} catch (err) {
logger.error(`[S3 Adapter] Error writing metadata for ${uploadId}: ${err.message}`);
try { await fs.unlink(tempMetaPath); } catch (unlinkErr) {/* ignore */}
throw err;
}
}
async function deleteUploadMetadata(uploadId) { async function deleteUploadMetadata(uploadId) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) { if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
logger.warn(`[S3 Adapter] Attempted to delete metadata with invalid uploadId: ${uploadId}`); logger.warn(`[S3 Adapter] Attempted to delete metadata with invalid uploadId: ${uploadId}`);
return; return;
} }
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`); const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
try { try {
await fs.unlink(metaFilePath); await fs.unlink(metaFilePath);
logger.debug(`[S3 Adapter] Deleted metadata file: ${uploadId}.meta`); logger.debug(`[S3 Adapter] Deleted metadata file: ${uploadId}.meta`);
} catch (err) { } catch (err) {
if (err.code !== 'ENOENT') { if (err.code !== 'ENOENT') logger.error(`[S3 Adapter] Error deleting metadata file ${uploadId}.meta: ${err.message}`);
logger.error(`[S3 Adapter] Error deleting metadata file ${uploadId}.meta: ${err.message}`); }
}
ensureMetadataDirExists().catch(err => {
logger.error(`[S3 Adapter] Initialization failed (metadata dir): ${err.message}`);
process.exit(1);
});
// --- S3 Object/Prefix Utilities ---
const batchS3PrefixMappings = new Map(); // In-memory: originalTopLevelFolder-batchId -> actualS3Prefix
async function s3ObjectExists(key) {
logger.info(`[S3 Adapter] s3ObjectExists: Checking key "${key}"`);
try {
await s3Client.send(new HeadObjectCommand({ Bucket: config.s3BucketName, Key: key }));
logger.info(`[S3 Adapter] s3ObjectExists: HeadObject success for key "${key}". Key EXISTS.`);
return true;
} catch (error) {
if (error.name === 'NotFound' || error.name === 'NoSuchKey' || (error.$metadata && error.$metadata.httpStatusCode === 404)) {
logger.info(`[S3 Adapter] s3ObjectExists: Key "${key}" NOT found (404-like error).`);
return false;
} }
} if (error.name === '403' || (error.$metadata && error.$metadata.httpStatusCode === 403)) {
logger.warn(`[S3 Adapter] s3ObjectExists: Received 403 Forbidden for key "${key}". For unique key generation, treating as 'likely does not exist'.`);
return false;
}
logger.error(`[S3 Adapter DEBUG] Full error object for HeadObject on key "${key}":\n`, util.inspect(error, { showHidden: false, depth: null, colors: false }));
logger.error(`[S3 Adapter] s3ObjectExists: Unhandled error type "${error.name}" for key "${key}": ${error.message}`);
throw error;
}
}
async function getUniqueS3FolderPrefix(originalPrefix, batchId) {
if (!originalPrefix || !originalPrefix.endsWith('/')) {
logger.error("[S3 Adapter] getUniqueS3FolderPrefix: originalPrefix must be a non-empty string ending with '/'");
return originalPrefix; // Or throw error
}
const prefixMapKey = `${originalPrefix}-${batchId}`;
if (batchS3PrefixMappings.has(prefixMapKey)) {
return batchS3PrefixMappings.get(prefixMapKey);
} }
// Ensure metadata dir exists on initialization let currentPrefixToCheck = originalPrefix;
ensureMetadataDirExists().catch(err => { let counter = 1;
logger.error(`[S3 Adapter] Initialization failed: ${err.message}`); const baseName = originalPrefix.slice(0, -1); // "MyFolder" from "MyFolder/"
process.exit(1); // Exit if we can't manage metadata state
});
async function prefixHasObjects(prefix) {
// --- Interface Implementation ---
/**
* Initializes an S3 multipart upload session (or direct put for zero-byte).
* @param {string} filename - Original filename/path from client.
* @param {number} fileSize - Total size of the file.
* @param {string} clientBatchId - Optional batch ID from client.
* @returns {Promise<{uploadId: string}>} Object containing the application's upload ID.
*/
async function initUpload(filename, fileSize, clientBatchId) {
await ensureMetadataDirExists(); // Re-check before operation
const size = Number(fileSize);
const appUploadId = crypto.randomBytes(16).toString('hex'); // Our internal ID
// --- Path handling and Sanitization for S3 Key ---
const sanitizedFilename = sanitizePathPreserveDirs(filename);
// S3 keys should not start with /
const s3Key = path.normalize(sanitizedFilename)
.replace(/^(\.\.(\/|\\|$))+/, '')
.replace(/\\/g, '/')
.replace(/^\/+/, '');
logger.info(`[S3 Adapter] Init request for S3 Key: ${s3Key}`);
// --- Handle Zero-Byte Files ---
if (size === 0) {
try { try {
const putCommand = new PutObjectCommand({ const listResponse = await s3Client.send(new ListObjectsV2Command({
Bucket: config.s3BucketName, Bucket: config.s3BucketName, Prefix: prefix, MaxKeys: 1
Key: s3Key,
Body: '', // Empty body
ContentLength: 0
});
await s3Client.send(putCommand);
logger.success(`[S3 Adapter] Completed zero-byte file upload directly: ${s3Key}`);
// No metadata needed for zero-byte files as they are completed atomically
sendNotification(filename, 0, config); // Send notification (use original filename)
// Return an uploadId that won't conflict or be processable by chunk/complete
return { uploadId: `zero-byte-${appUploadId}` }; // Or maybe return null/special status?
// Returning a unique ID might be safer for client state.
} catch (putErr) {
logger.error(`[S3 Adapter] Failed to put zero-byte object ${s3Key}: ${putErr.message}`);
throw putErr; // Let the route handler deal with it
}
}
// --- Initiate Multipart Upload for Non-Zero Files ---
try {
const createCommand = new CreateMultipartUploadCommand({
Bucket: config.s3BucketName,
Key: s3Key,
// TODO: Consider adding ContentType if available/reliable: metadata.contentType
// TODO: Consider adding Metadata: { 'original-filename': filename } ?
});
const response = await s3Client.send(createCommand);
const s3UploadId = response.UploadId;
if (!s3UploadId) {
throw new Error('S3 did not return an UploadId');
}
logger.info(`[S3 Adapter] Initiated multipart upload for ${s3Key} (S3 UploadId: ${s3UploadId})`);
// --- Create and Persist Local Metadata ---
const batchId = clientBatchId || `${Date.now()}-${crypto.randomBytes(4).toString('hex').substring(0, 9)}`;
const metadata = {
appUploadId: appUploadId, // Store our ID
s3UploadId: s3UploadId,
s3Key: s3Key,
originalFilename: filename, // Keep original for notifications etc.
fileSize: size,
bytesReceived: 0, // Track approximate bytes locally
parts: [], // Array to store { PartNumber, ETag }
batchId,
createdAt: Date.now(),
lastActivity: Date.now()
};
await writeUploadMetadata(appUploadId, metadata); // Write metadata keyed by our appUploadId
return { uploadId: appUploadId }; // Return OUR internal upload ID to the client
} catch (err) {
logger.error(`[S3 Adapter] Failed to initiate multipart upload for ${s3Key}: ${err.message}`);
// TODO: Map specific S3 errors (e.g., NoSuchBucket, AccessDenied) to better client messages
throw err;
}
}
/**
* Uploads a chunk as a part to S3.
* @param {string} appUploadId - The application's upload ID.
* @param {Buffer} chunk - The data chunk to store.
* @param {number} partNumber - The sequential number of this part (starting from 1).
* @returns {Promise<{bytesReceived: number, progress: number, completed: boolean}>} Upload status.
*/
async function storeChunk(appUploadId, chunk, partNumber) {
const chunkSize = chunk.length;
if (!chunkSize) throw new Error('Empty chunk received');
if (partNumber < 1) throw new Error('PartNumber must be 1 or greater');
const metadata = await readUploadMetadata(appUploadId);
if (!metadata || !metadata.s3UploadId) { // Check for s3UploadId presence
logger.warn(`[S3 Adapter] Metadata or S3 UploadId not found for chunk: ${appUploadId}. Upload might be complete, cancelled, or zero-byte.`);
throw new Error('Upload session not found or already completed');
}
// --- Sanity Check ---
// S3 handles duplicate part uploads gracefully (last one wins), so less critical than local append.
// We still track bytesReceived locally for progress approximation.
if (metadata.bytesReceived >= metadata.fileSize && metadata.fileSize > 0) {
logger.warn(`[S3 Adapter] Received chunk for already completed upload ${appUploadId}. Ignoring.`);
// Can't really finalize again easily without full parts list. Indicate completion based on local state.
const progress = metadata.fileSize > 0 ? 100 : 0;
return { bytesReceived: metadata.bytesReceived, progress, completed: true };
}
try {
const uploadPartCommand = new UploadPartCommand({
Bucket: config.s3BucketName,
Key: metadata.s3Key,
UploadId: metadata.s3UploadId,
Body: chunk,
PartNumber: partNumber,
ContentLength: chunkSize // Required for UploadPart
});
const response = await s3Client.send(uploadPartCommand);
const etag = response.ETag;
if (!etag) {
throw new Error(`S3 did not return an ETag for PartNumber ${partNumber}`);
}
// --- Update Local Metadata ---
// Ensure parts are stored correctly
metadata.parts = metadata.parts || [];
metadata.parts.push({ PartNumber: partNumber, ETag: etag });
// Sort parts just in case uploads happen out of order client-side (though unlikely with current client)
metadata.parts.sort((a, b) => a.PartNumber - b.PartNumber);
// Update approximate bytes received
metadata.bytesReceived = (metadata.bytesReceived || 0) + chunkSize;
// Cap bytesReceived at fileSize for progress calculation
metadata.bytesReceived = Math.min(metadata.bytesReceived, metadata.fileSize);
await writeUploadMetadata(appUploadId, metadata);
// --- Calculate Progress ---
const progress = metadata.fileSize === 0 ? 100 :
Math.min(Math.round((metadata.bytesReceived / metadata.fileSize) * 100), 100);
logger.debug(`[S3 Adapter] Part ${partNumber} uploaded for ${appUploadId} (ETag: ${etag}). Progress: ~${progress}%`);
// Check for completion potential based on local byte tracking
const completed = metadata.bytesReceived >= metadata.fileSize;
if (completed) {
logger.info(`[S3 Adapter] Upload ${appUploadId} potentially complete based on bytes received.`);
}
return { bytesReceived: metadata.bytesReceived, progress, completed };
} catch (err) {
logger.error(`[S3 Adapter] Failed to upload part ${partNumber} for ${appUploadId} (Key: ${metadata.s3Key}): ${err.message}`);
// TODO: Map specific S3 errors (InvalidPart, SlowDown, etc.)
throw err;
}
}
/**
* Finalizes a completed S3 multipart upload.
* @param {string} appUploadId - The application's upload ID.
* @returns {Promise<{filename: string, size: number, finalPath: string}>} Details of the completed file (finalPath is S3 Key).
*/
async function completeUpload(appUploadId) {
const metadata = await readUploadMetadata(appUploadId);
if (!metadata || !metadata.s3UploadId || !metadata.parts || metadata.parts.length === 0) {
logger.warn(`[S3 Adapter] completeUpload called for ${appUploadId}, but metadata, S3 UploadId, or parts list is missing/empty. Assuming already completed or invalid state.`);
// Check if object exists as a fallback? Risky.
throw new Error('Upload completion failed: Required metadata or parts list not found');
}
// Basic check if enough bytes were tracked locally (approximate check)
if (metadata.bytesReceived < metadata.fileSize) {
logger.warn(`[S3 Adapter] Attempting to complete upload ${appUploadId} but locally tracked bytes (${metadata.bytesReceived}) are less than expected size (${metadata.fileSize}). Proceeding anyway.`);
}
try {
const completeCommand = new CompleteMultipartUploadCommand({
Bucket: config.s3BucketName,
Key: metadata.s3Key,
UploadId: metadata.s3UploadId,
MultipartUpload: {
Parts: metadata.parts // Use the collected parts { PartNumber, ETag }
},
});
const response = await s3Client.send(completeCommand);
// Example response: { ETag: '"..."', Location: '...', Key: '...', Bucket: '...' }
logger.success(`[S3 Adapter] Finalized multipart upload: ${metadata.s3Key} (ETag: ${response.ETag})`);
// Clean up local metadata AFTER successful S3 completion
await deleteUploadMetadata(appUploadId);
// Send notification
sendNotification(metadata.originalFilename, metadata.fileSize, config);
// Return info consistent with local adapter where possible
return { filename: metadata.originalFilename, size: metadata.fileSize, finalPath: metadata.s3Key };
} catch (err) {
logger.error(`[S3 Adapter] Failed to complete multipart upload for ${appUploadId} (Key: ${metadata.s3Key}): ${err.message}`);
// Specific S3 errors like InvalidPartOrder, EntityTooSmall might occur here.
// If Complete fails, S3 *might* have already assembled it (rare).
// Check if the object now exists? If so, maybe delete metadata? Complex recovery.
// For now, just log the error and throw. The local metadata will persist.
if (err.Code === 'NoSuchUpload') {
logger.warn(`[S3 Adapter] CompleteMultipartUpload failed with NoSuchUpload for ${appUploadId}. Assuming already completed or aborted.`);
await deleteUploadMetadata(appUploadId).catch(()=>{}); // Attempt metadata cleanup
// Check if final object exists?
try {
// Use GetObject or HeadObject to check
await s3Client.send(new GetObjectCommand({ Bucket: config.s3BucketName, Key: metadata.s3Key }));
logger.info(`[S3 Adapter] Final object ${metadata.s3Key} exists after NoSuchUpload error. Treating as completed.`);
return { filename: metadata.originalFilename, size: metadata.fileSize, finalPath: metadata.s3Key };
} catch (headErr) {
// Final object doesn't exist either.
throw new Error('Completion failed: Upload session not found and final object does not exist.');
}
}
throw err;
}
}
/**
* Aborts an ongoing S3 multipart upload.
* @param {string} appUploadId - The application's upload ID.
* @returns {Promise<void>}
*/
async function abortUpload(appUploadId) {
const metadata = await readUploadMetadata(appUploadId);
if (!metadata || !metadata.s3UploadId) {
logger.warn(`[S3 Adapter] Abort request for non-existent or completed upload: ${appUploadId}`);
await deleteUploadMetadata(appUploadId); // Clean up local metadata if it exists anyway
return;
}
try {
const abortCommand = new AbortMultipartUploadCommand({
Bucket: config.s3BucketName,
Key: metadata.s3Key,
UploadId: metadata.s3UploadId,
});
await s3Client.send(abortCommand);
logger.info(`[S3 Adapter] Aborted multipart upload: ${appUploadId} (Key: ${metadata.s3Key})`);
} catch (err) {
if (err.name === 'NoSuchUpload') {
logger.warn(`[S3 Adapter] Multipart upload ${appUploadId} (Key: ${metadata.s3Key}) not found during abort. Already aborted or completed.`);
} else {
logger.error(`[S3 Adapter] Failed to abort multipart upload for ${appUploadId} (Key: ${metadata.s3Key}): ${err.message}`);
// Don't delete local metadata if abort failed, might be retryable or need manual cleanup
throw err; // Rethrow S3 error
}
}
// Delete local metadata AFTER successful abort or if NoSuchUpload
await deleteUploadMetadata(appUploadId);
}
/**
* Lists files in the S3 bucket.
* @returns {Promise<Array<{filename: string, size: number, formattedSize: string, uploadDate: Date}>>} List of files.
*/
async function listFiles() {
try {
const command = new ListObjectsV2Command({
Bucket: config.s3BucketName,
// Optional: Add Prefix if you want to list within a specific 'folder'
// Prefix: 'uploads/'
});
// TODO: Add pagination handling if expecting >1000 objects
const response = await s3Client.send(command);
const files = (response.Contents || [])
// Optional: Filter out objects that might represent folders if necessary
// .filter(item => !(item.Key.endsWith('/') && item.Size === 0))
.map(item => ({
filename: item.Key, // S3 Key is the filename/path
size: item.Size,
formattedSize: formatFileSize(item.Size), // Use utility
uploadDate: item.LastModified
})); }));
return listResponse.KeyCount > 0;
} catch (error) {
logger.error(`[S3 Adapter] Error listing objects for prefix check "${prefix}": ${error.message}`);
throw error;
}
}
// Sort by date, newest first while (await prefixHasObjects(currentPrefixToCheck)) {
files.sort((a, b) => b.uploadDate.getTime() - a.uploadDate.getTime()); logger.warn(`[S3 Adapter] S3 prefix "${currentPrefixToCheck}" is not empty. Generating unique prefix for base "${baseName}/".`);
currentPrefixToCheck = `${baseName}-${counter}/`;
counter++;
}
return files; if (currentPrefixToCheck !== originalPrefix) {
logger.info(`[S3 Adapter] Using unique S3 folder prefix: "${currentPrefixToCheck}" for original "${originalPrefix}" in batch "${batchId}"`);
}
batchS3PrefixMappings.set(prefixMapKey, currentPrefixToCheck);
return currentPrefixToCheck;
}
} catch (err) { // --- Interface Implementation ---
logger.error(`[S3 Adapter] Failed to list objects in bucket ${config.s3BucketName}: ${err.message}`); async function initUpload(filename, fileSize, clientBatchId) {
throw err; await ensureMetadataDirExists();
const size = Number(fileSize);
const appUploadId = crypto.randomBytes(16).toString('hex');
const batchId = clientBatchId || `${Date.now()}-${crypto.randomBytes(4).toString('hex').substring(0, 9)}`;
const originalSanitizedFullpath = sanitizePathPreserveDirs(filename);
let s3KeyStructure = path.normalize(originalSanitizedFullpath)
.replace(/^(\.\.(\/|\\|$))+/, '').replace(/\\/g, '/').replace(/^\/+/, '');
let effectiveBasePrefix = ""; // e.g., "MyFolder-1/" or ""
const pathParts = s3KeyStructure.split('/');
const isNestedPath = pathParts.length > 1;
let relativePathInFolder = s3KeyStructure; // Path of the file relative to its (potentially versioned) folder
if (isNestedPath) {
const originalTopLevelFolder = pathParts[0] + '/'; // "MyFolder/"
effectiveBasePrefix = await getUniqueS3FolderPrefix(originalTopLevelFolder, batchId);
relativePathInFolder = pathParts.slice(1).join('/'); // "SubFolder/image.jpg" or "image.jpg"
s3KeyStructure = effectiveBasePrefix + relativePathInFolder; // e.g., "MyFolder-1/SubFolder/image.jpg"
}
logger.info(`[S3 Adapter] Init: Original Full Path: "${originalSanitizedFullpath}", Effective Base Prefix: "${effectiveBasePrefix}", Relative Path: "${relativePathInFolder}"`);
// Now, ensure the s3KeyStructure (which is the full path including the versioned folder prefix)
// is unique at the file level if it already exists.
let finalS3Key = s3KeyStructure;
let fileCounter = 1;
const fileDir = path.dirname(s3KeyStructure); // Can be "." if not nested within sub-sub-folders
const fileExt = path.extname(s3KeyStructure);
const fileBaseName = path.basename(s3KeyStructure, fileExt);
while (await s3ObjectExists(finalS3Key)) {
logger.warn(`[S3 Adapter] S3 file key already exists: "${finalS3Key}". Generating unique file key.`);
finalS3Key = (fileDir === "." ? "" : fileDir + "/") + `${fileBaseName}-${fileCounter}${fileExt}`;
fileCounter++;
}
if (finalS3Key !== s3KeyStructure) {
logger.info(`[S3 Adapter] Using unique S3 file key: "${finalS3Key}"`);
}
// Handle Zero-Byte Files
if (size === 0) {
try {
await s3Client.send(new PutObjectCommand({
Bucket: config.s3BucketName, Key: finalS3Key, Body: '', ContentLength: 0
}));
logger.success(`[S3 Adapter] Completed zero-byte file: ${finalS3Key}`);
sendNotification(originalSanitizedFullpath, 0, config);
return { uploadId: `zero-byte-${appUploadId}` };
} catch (putErr) {
logger.error(`[S3 Adapter] Failed zero-byte PUT for ${finalS3Key}: ${putErr.message}`);
throw putErr;
} }
} }
/** // Initiate Multipart Upload
* Generates a presigned URL for downloading an S3 object. try {
* @param {string} s3Key - The S3 Key (filename/path) of the object. const createCommand = new CreateMultipartUploadCommand({ Bucket: config.s3BucketName, Key: finalS3Key });
* @returns {Promise<{type: string, value: string}>} Object indicating type ('url') and value (the presigned URL). const response = await s3Client.send(createCommand);
*/ const s3UploadId = response.UploadId;
async function getDownloadUrlOrStream(s3Key) { if (!s3UploadId) throw new Error('S3 did not return UploadId');
// Input `s3Key` is assumed to be sanitized by the calling route/logic logger.info(`[S3 Adapter] Multipart initiated for ${finalS3Key} (S3 UploadId: ${s3UploadId})`);
if (!s3Key || s3Key.includes('..') || s3Key.startsWith('/')) {
logger.error(`[S3 Adapter] Invalid S3 key detected for download: ${s3Key}`);
throw new Error('Invalid filename');
}
try { const metadata = {
const command = new GetObjectCommand({ appUploadId, s3UploadId, s3Key: finalS3Key,
Bucket: config.s3BucketName, originalFilename: originalSanitizedFullpath, // Original path from client for notification
Key: s3Key, fileSize: size, bytesReceived: 0, parts: [], batchId,
// Optional: Override response headers like filename createdAt: Date.now(), lastActivity: Date.now()
// ResponseContentDisposition: `attachment; filename="${path.basename(s3Key)}"` };
}); await writeUploadMetadata(appUploadId, metadata);
return { uploadId: appUploadId };
} catch (err) {
logger.error(`[S3 Adapter] Failed multipart init for ${finalS3Key}: ${err.message}`);
throw err;
}
}
// Generate presigned URL (expires in 1 hour by default, adjustable) async function storeChunk(appUploadId, chunk, partNumber) {
const url = await getSignedUrl(s3Client, command, { expiresIn: 3600 }); const chunkSize = chunk.length;
logger.info(`[S3 Adapter] Generated presigned URL for ${s3Key}`); if (!chunkSize) throw new Error('Empty chunk received');
return { type: 'url', value: url }; if (partNumber < 1) throw new Error('PartNumber must be 1 or greater');
} catch (err) { const metadata = await readUploadMetadata(appUploadId);
logger.error(`[S3 Adapter] Failed to generate presigned URL for ${s3Key}: ${err.message}`); if (!metadata || !metadata.s3UploadId) {
if (err.name === 'NoSuchKey') { logger.warn(`[S3 Adapter] Metadata or S3 UploadId not found for chunk: ${appUploadId}`);
throw new Error('File not found in S3'); throw new Error('Upload session not found or already completed');
} }
throw err; // Re-throw other S3 errors if (metadata.bytesReceived >= metadata.fileSize && metadata.fileSize > 0) {
} logger.warn(`[S3 Adapter] Chunk for already completed upload ${appUploadId}. Ignoring.`);
return { bytesReceived: metadata.bytesReceived, progress: 100, completed: true };
} }
/** try {
* Deletes an object from the S3 bucket. const cmd = new UploadPartCommand({
* @param {string} s3Key - The S3 Key (filename/path) of the object to delete. Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId,
* @returns {Promise<void>} Body: chunk, PartNumber: partNumber, ContentLength: chunkSize
*/ });
async function deleteFile(s3Key) { const response = await s3Client.send(cmd);
// Input `s3Key` is assumed to be sanitized const etag = response.ETag;
if (!s3Key || s3Key.includes('..') || s3Key.startsWith('/')) { if (!etag) throw new Error(`S3 ETag missing for Part ${partNumber}`);
logger.error(`[S3 Adapter] Invalid S3 key detected for delete: ${s3Key}`);
throw new Error('Invalid filename');
}
try { metadata.parts.push({ PartNumber: partNumber, ETag: etag });
const command = new DeleteObjectCommand({ metadata.parts.sort((a, b) => a.PartNumber - b.PartNumber);
Bucket: config.s3BucketName, metadata.bytesReceived = Math.min((metadata.bytesReceived || 0) + chunkSize, metadata.fileSize);
Key: s3Key, await writeUploadMetadata(appUploadId, metadata);
});
await s3Client.send(command); const progress = metadata.fileSize === 0 ? 100 : Math.min(Math.round((metadata.bytesReceived / metadata.fileSize) * 100), 100);
logger.info(`[S3 Adapter] Deleted object: ${s3Key}`); const completed = metadata.bytesReceived >= metadata.fileSize;
} catch (err) { logger.debug(`[S3 Adapter] Part ${partNumber} for ${appUploadId} (Key: ${metadata.s3Key}). ETag: ${etag}. Progress: ~${progress}%. Completed: ${completed}`);
// DeleteObject is idempotent, so NoSuchKey isn't typically an error unless you need to know. return { bytesReceived: metadata.bytesReceived, progress, completed };
logger.error(`[S3 Adapter] Failed to delete object ${s3Key}: ${err.message}`); } catch (err) {
throw err; logger.error(`[S3 Adapter] Failed Part ${partNumber} for ${appUploadId} (Key: ${metadata.s3Key}): ${err.message}`);
} throw err;
} }
}
/** async function completeUpload(appUploadId) {
* Cleans up stale *local* metadata files for S3 uploads. const metadata = await readUploadMetadata(appUploadId);
* Relies on S3 Lifecycle Policies for actual S3 cleanup. if (!metadata || !metadata.s3UploadId || !metadata.parts || metadata.parts.length === 0) {
* @returns {Promise<void>} throw new Error('Upload completion failed: Missing metadata/parts');
*/ }
async function cleanupStale() { if (metadata.bytesReceived < metadata.fileSize) {
logger.info('[S3 Adapter] Running cleanup for stale local metadata files...'); logger.warn(`[S3 Adapter] Completing ${appUploadId} with ${metadata.bytesReceived}/${metadata.fileSize} bytes tracked.`);
let cleanedCount = 0; }
let checkedCount = 0; try {
const cmd = new CompleteMultipartUploadCommand({
Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId,
MultipartUpload: { Parts: metadata.parts },
});
const response = await s3Client.send(cmd);
logger.success(`[S3 Adapter] Finalized: ${metadata.s3Key} (ETag: ${response.ETag})`);
await deleteUploadMetadata(appUploadId);
sendNotification(metadata.originalFilename, metadata.fileSize, config);
return { filename: metadata.originalFilename, size: metadata.fileSize, finalPath: metadata.s3Key };
} catch (err) {
logger.error(`[S3 Adapter] Failed CompleteMultipartUpload for ${metadata.s3Key}: ${err.message}`);
if (err.Code === 'NoSuchUpload' || err.name === 'NoSuchUpload') {
logger.warn(`[S3 Adapter] NoSuchUpload on complete for ${appUploadId}. Assuming completed/aborted.`);
await deleteUploadMetadata(appUploadId).catch(()=>{});
try { // Verify if object exists
await s3Client.send(new HeadObjectCommand({ Bucket: config.s3BucketName, Key: metadata.s3Key }));
logger.info(`[S3 Adapter] Final object ${metadata.s3Key} exists after NoSuchUpload. Treating as completed.`);
return { filename: metadata.originalFilename, size: metadata.fileSize, finalPath: metadata.s3Key };
} catch (headErr) { throw new Error('Completion failed: Session & final object not found.'); }
}
throw err;
}
}
try { async function abortUpload(appUploadId) {
await ensureMetadataDirExists(); // Re-check const metadata = await readUploadMetadata(appUploadId);
if (!metadata || !metadata.s3UploadId) {
logger.warn(`[S3 Adapter] Abort for non-existent/completed upload: ${appUploadId}`);
await deleteUploadMetadata(appUploadId); return;
}
try {
await s3Client.send(new AbortMultipartUploadCommand({
Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId,
}));
logger.info(`[S3 Adapter] Aborted: ${appUploadId} (Key: ${metadata.s3Key})`);
} catch (err) {
if (err.name !== 'NoSuchUpload') {
logger.error(`[S3 Adapter] Failed Abort for ${metadata.s3Key}: ${err.message}`); throw err;
}
logger.warn(`[S3 Adapter] NoSuchUpload on abort for ${metadata.s3Key}. Already aborted/completed.`);
}
await deleteUploadMetadata(appUploadId);
}
const files = await fs.readdir(METADATA_DIR); async function listFiles() {
const now = Date.now(); try {
let isTruncated = true; let continuationToken; const allFiles = [];
while(isTruncated) {
const params = { Bucket: config.s3BucketName };
if (continuationToken) params.ContinuationToken = continuationToken;
const response = await s3Client.send(new ListObjectsV2Command(params));
(response.Contents || []).forEach(item => allFiles.push({
filename: item.Key, size: item.Size,
formattedSize: formatFileSize(item.Size), uploadDate: item.LastModified
}));
isTruncated = response.IsTruncated;
continuationToken = response.NextContinuationToken;
}
allFiles.sort((a, b) => b.uploadDate.getTime() - a.uploadDate.getTime());
return allFiles;
} catch (err) {
logger.error(`[S3 Adapter] Failed list objects in ${config.s3BucketName}: ${err.message}`); throw err;
}
}
for (const file of files) { async function getDownloadUrlOrStream(s3Key) {
if (file.endsWith('.meta')) { if (!s3Key || s3Key.includes('..') || s3Key.startsWith('/')) throw new Error('Invalid filename for download');
checkedCount++; try {
const appUploadId = file.replace('.meta', ''); const cmd = new GetObjectCommand({ Bucket: config.s3BucketName, Key: s3Key });
const metaFilePath = path.join(METADATA_DIR, file); const url = await getSignedUrl(s3Client, cmd, { expiresIn: 3600 });
logger.info(`[S3 Adapter] Presigned URL for ${s3Key}`);
return { type: 'url', value: url };
} catch (err) {
logger.error(`[S3 Adapter] Failed presigned URL for ${s3Key}: ${err.message}`);
if (err.name === 'NoSuchKey') throw new Error('File not found in S3'); throw err;
}
}
try { async function deleteFile(s3Key) {
const data = await fs.readFile(metaFilePath, 'utf8'); if (!s3Key || s3Key.includes('..') || s3Key.startsWith('/')) throw new Error('Invalid filename for delete');
const metadata = JSON.parse(data); try {
await s3Client.send(new DeleteObjectCommand({ Bucket: config.s3BucketName, Key: s3Key }));
logger.info(`[S3 Adapter] Deleted: ${s3Key}`);
} catch (err) {
logger.error(`[S3 Adapter] Failed delete for ${s3Key}: ${err.message}`); throw err;
}
}
// Check inactivity based on local metadata timestamp async function cleanupStale() {
if (now - (metadata.lastActivity || metadata.createdAt || 0) > UPLOAD_TIMEOUT) { logger.info('[S3 Adapter] Cleaning stale local metadata...');
logger.warn(`[S3 Adapter] Found stale local metadata: ${file}. Last activity: ${new Date(metadata.lastActivity || metadata.createdAt)}. S3 UploadId: ${metadata.s3UploadId || 'N/A'}`); let cleaned = 0, checked = 0;
try {
// Only delete the LOCAL metadata file. DO NOT ABORT S3 UPLOAD HERE. await ensureMetadataDirExists(); const files = await fs.readdir(METADATA_DIR); const now = Date.now();
await deleteUploadMetadata(appUploadId); // Use helper for (const file of files) {
cleanedCount++; if (file.endsWith('.meta')) {
} checked++; const id = file.replace('.meta',''); const fp = path.join(METADATA_DIR, file);
} catch (readErr) { try {
logger.error(`[S3 Adapter] Error reading/parsing local metadata ${metaFilePath} during cleanup: ${readErr.message}. Skipping.`); const meta = JSON.parse(await fs.readFile(fp, 'utf8'));
await fs.unlink(metaFilePath).catch(()=>{ logger.warn(`[S3 Adapter] Failed to delete potentially corrupt local metadata file: ${metaFilePath}`) }); if (now - (meta.lastActivity || meta.createdAt || 0) > UPLOAD_TIMEOUT) {
logger.warn(`[S3 Adapter] Stale local meta: ${file}, S3 ID: ${meta.s3UploadId||'N/A'}`);
await deleteUploadMetadata(id); cleaned++;
} }
} else if (file.endsWith('.tmp')) { } catch (e) { logger.error(`[S3 Adapter] Error parsing meta ${fp}: ${e.message}`); await fs.unlink(fp).catch(()=>{}); }
// Clean up potential leftover temp metadata files (same as local adapter) } else if (file.endsWith('.tmp')) {
const tempMetaPath = path.join(METADATA_DIR, file); const tmpP = path.join(METADATA_DIR, file);
try { try { if (now - (await fs.stat(tmpP)).mtime.getTime() > UPLOAD_TIMEOUT) { logger.warn(`[S3 Adapter] Deleting stale tmp meta: ${file}`); await fs.unlink(tmpP); }}
const stats = await fs.stat(tempMetaPath); catch (e) { if (e.code!=='ENOENT') logger.error(`[S3 Adapter] Error stat/unlink tmp meta ${tmpP}: ${e.message}`);}
if (now - stats.mtime.getTime() > UPLOAD_TIMEOUT) {
logger.warn(`[S3 Adapter] Deleting stale temporary local metadata file: ${file}`);
await fs.unlink(tempMetaPath);
}
} catch (statErr) {
if (statErr.code !== 'ENOENT') {
logger.error(`[S3 Adapter] Error checking temp local metadata file ${tempMetaPath}: ${statErr.message}`);
}
}
}
} }
if (checkedCount > 0 || cleanedCount > 0) {
logger.info(`[S3 Adapter] Local metadata cleanup finished. Checked: ${checkedCount}, Cleaned stale local files: ${cleanedCount}.`);
}
// Log the crucial recommendation
logger.warn(`[S3 Adapter] IMPORTANT: For S3 storage, configure Lifecycle Rules on your bucket (${config.s3BucketName}) or use provider-specific tools to automatically clean up incomplete multipart uploads after a few days. This adapter only cleans up local tracking files.`);
} catch (err) {
if (err.code === 'ENOENT' && err.path === METADATA_DIR) {
logger.warn('[S3 Adapter] Local metadata directory not found during cleanup scan.');
} else {
logger.error(`[S3 Adapter] Error during local metadata cleanup scan: ${err.message}`);
}
} }
if (checked > 0 || cleaned > 0) logger.info(`[S3 Adapter] Local meta cleanup: Checked ${checked}, Cleaned ${cleaned}.`);
logger.warn(`[S3 Adapter] IMPORTANT: Configure S3 Lifecycle Rules on bucket '${config.s3BucketName}' to clean incomplete multipart uploads.`);
} catch (err) {
if (err.code==='ENOENT'&&err.path===METADATA_DIR) logger.warn('[S3 Adapter] Local meta dir not found for cleanup.');
else logger.error(`[S3 Adapter] Error local meta cleanup: ${err.message}`);
} }
// Also clean up the batchS3PrefixMappings to prevent memory leak on long-running server
// Simple approach: clear if older than BATCH_TIMEOUT (requires storing timestamp, or just clear periodically)
// For now, let's rely on server restarts to clear this in-memory map.
// A more robust solution would be to use a TTL cache or integrate with batchActivity cleanup.
if (batchS3PrefixMappings.size > 1000) { // Arbitrary limit to trigger a clear
logger.warn(`[S3 Adapter] Clearing batchS3PrefixMappings due to size (${batchS3PrefixMappings.size}). Consider a more robust cleanup for this map if server runs for very long periods with many unique batches.`);
batchS3PrefixMappings.clear();
}
}
module.exports = { module.exports = {
initUpload, initUpload, storeChunk, completeUpload, abortUpload,
storeChunk, listFiles, getDownloadUrlOrStream, deleteFile, cleanupStale
completeUpload, };
abortUpload,
listFiles,
getDownloadUrlOrStream,
deleteFile,
cleanupStale
};