Mirror of https://github.com/DumbWareio/DumbDrop.git (synced 2025-10-23 07:41:58 +00:00)
feat:(storage) fixing what i borked
1012  public/index.html (file diff suppressed because it is too large)
@@ -1,39 +1,13 @@
// File: src/config/index.js
require('dotenv').config();
const { validatePin } = require('../utils/security');
const logger = require('../utils/logger'); // Use the default logger instance
const logger = require('../utils/logger');
const fs = require('fs');
const path = require('path');
const { version } = require('../../package.json'); // Get version from package.json
// const { version } = require('../../package.json'); // version not currently used, can be removed or kept

// --- Environment Variables Reference ---
/*
STORAGE_TYPE - Storage backend ('local' or 's3', default: 'local')
// --- Local Storage ---
UPLOAD_DIR - Directory for uploads (Docker/production, if STORAGE_TYPE=local)
LOCAL_UPLOAD_DIR - Directory for uploads (local dev, fallback: './local_uploads', if STORAGE_TYPE=local)
// --- S3 Storage ---
S3_REGION - AWS Region for S3 Bucket (required if STORAGE_TYPE=s3)
S3_BUCKET_NAME - Name of the S3 Bucket (required if STORAGE_TYPE=s3)
S3_ACCESS_KEY_ID - S3 Access Key ID (required if STORAGE_TYPE=s3)
S3_SECRET_ACCESS_KEY - S3 Secret Access Key (required if STORAGE_TYPE=s3)
S3_ENDPOINT_URL - Custom S3 endpoint URL (optional, for non-AWS S3)
S3_FORCE_PATH_STYLE - Force path-style access (true/false, optional, for non-AWS S3)
// --- Common ---
PORT - Port for the server (default: 3000)
NODE_ENV - Node environment (default: 'development')
BASE_URL - Base URL for the app (default: http://localhost:${PORT})
MAX_FILE_SIZE - Max upload size in MB (default: 1024)
AUTO_UPLOAD - Enable auto-upload (true/false, default: false)
DUMBDROP_PIN - Security PIN for uploads (required for protected endpoints)
DUMBDROP_TITLE - Site title (default: 'DumbDrop')
APPRISE_URL - Apprise notification URL (optional)
APPRISE_MESSAGE - Notification message template (default provided)
APPRISE_SIZE_UNIT - Size unit for notifications (optional)
ALLOWED_EXTENSIONS - Comma-separated list of allowed file extensions (optional)
ALLOWED_IFRAME_ORIGINS - Comma-separated list of allowed iframe origins (optional)
CLIENT_MAX_RETRIES - Max retries for client chunk uploads (default: 5)
DEMO_MODE - Enable demo mode (true/false, default: false)
*/
/* (Comments listing all ENV vars - keep as is) */
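// Example (hypothetical values, not part of this commit) of a minimal configuration per backend:
//   Local: STORAGE_TYPE=local, UPLOAD_DIR=/data/uploads (or LOCAL_UPLOAD_DIR=./local_uploads in dev)
//   S3:    STORAGE_TYPE=s3, S3_REGION=us-east-1, S3_BUCKET_NAME=my-bucket,
//          S3_ACCESS_KEY_ID=..., S3_SECRET_ACCESS_KEY=...,
//          plus S3_ENDPOINT_URL and S3_FORCE_PATH_STYLE=true for non-AWS endpoints such as MinIO.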
// --- Helper for clear configuration logging ---
const logConfig = (message, level = 'info') => {
@@ -44,7 +18,7 @@ const logConfig = (message, level = 'info') => {
// --- Default configurations ---
const DEFAULT_PORT = 3000;
const DEFAULT_SITE_TITLE = 'DumbDrop';
const DEFAULT_BASE_URL = 'http://localhost:3000';
const DEFAULT_BASE_URL_PREFIX = 'http://localhost'; // Prefix, port added later
const DEFAULT_CLIENT_MAX_RETRIES = 5;
const DEFAULT_STORAGE_TYPE = 'local';

@@ -54,81 +28,62 @@ const logAndReturn = (key, value, isDefault = false, sensitive = false) => {
return value;
};

// --- Utility to detect if running in local development mode ---
// (This helps decide whether to *create* LOCAL_UPLOAD_DIR, but doesn't affect UPLOAD_DIR usage in Docker)
function isLocalDevelopment() {
return process.env.NODE_ENV !== 'production' && !process.env.UPLOAD_DIR;
}

/**
 * Determine the local upload directory path.
 * Only relevant when STORAGE_TYPE is 'local'.
 * @returns {string|null} The path, or null if storage is not local.
 */
function determineLocalUploadDirectory() {
if (process.env.STORAGE_TYPE && process.env.STORAGE_TYPE.toLowerCase() !== 'local') {
return null; // Not using local storage
}

let uploadDir;
if (process.env.UPLOAD_DIR) {
uploadDir = process.env.UPLOAD_DIR;
logger.info(`[Local Storage] Upload directory set from UPLOAD_DIR: ${uploadDir}`);
// logger.info(`[Local Storage] Upload directory set from UPLOAD_DIR: ${uploadDir}`); // Logger might not be fully init here
} else if (process.env.LOCAL_UPLOAD_DIR) {
uploadDir = process.env.LOCAL_UPLOAD_DIR;
logger.warn(`[Local Storage] Upload directory using LOCAL_UPLOAD_DIR fallback: ${uploadDir}`);
// logger.warn(`[Local Storage] Upload directory using LOCAL_UPLOAD_DIR fallback: ${uploadDir}`);
} else {
uploadDir = './local_uploads'; // Default local path
logger.warn(`[Local Storage] Upload directory using default fallback: ${uploadDir}`);
uploadDir = './local_uploads';
// logger.warn(`[Local Storage] Upload directory using default fallback: ${uploadDir}`);
}
logger.info(`[Local Storage] Final upload directory path: ${path.resolve(uploadDir)}`);
return uploadDir;
// logger.info(`[Local Storage] Final upload directory path: ${path.resolve(uploadDir)}`);
return path.resolve(uploadDir); // Always resolve to absolute
}
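// In short: UPLOAD_DIR (Docker/production) takes precedence, then LOCAL_UPLOAD_DIR, then the './local_uploads'
// fallback; the new version also resolves whichever value wins to an absolute path before returning it.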
/**
 * Ensure the local upload directory exists (if applicable and in local dev).
 */
function ensureLocalUploadDirExists(dirPath) {
if (!dirPath || !isLocalDevelopment()) {
return; // Only create if using local storage in a local dev environment
return;
}
try {
if (!fs.existsSync(dirPath)) {
fs.mkdirSync(dirPath, { recursive: true });
logger.info(`[Local Storage] Created local upload directory: ${dirPath}`);
console.log(`[INFO] CONFIGURATION: [Local Storage] Created local upload directory: ${dirPath}`);
} else {
logger.info(`[Local Storage] Local upload directory exists: ${dirPath}`);
console.log(`[INFO] CONFIGURATION: [Local Storage] Local upload directory exists: ${dirPath}`);
}
// Basic writability check
fs.accessSync(dirPath, fs.constants.W_OK);
logger.success(`[Local Storage] Local upload directory is writable: ${dirPath}`);
console.log(`[SUCCESS] CONFIGURATION: [Local Storage] Local upload directory is writable: ${dirPath}`);
} catch (err) {
logger.error(`[Local Storage] Failed to create or access local upload directory: ${dirPath}. Error: ${err.message}`);
console.error(`[ERROR] CONFIGURATION: [Local Storage] Failed to create or access local upload directory: ${dirPath}. Error: ${err.message}`);
throw new Error(`Upload directory "${dirPath}" is not accessible or writable.`);
}
}

// --- Determine Storage Type ---
const storageTypeInput = process.env.STORAGE_TYPE || DEFAULT_STORAGE_TYPE;
const storageType = ['local', 's3'].includes(storageTypeInput.toLowerCase())
? storageTypeInput.toLowerCase()
: DEFAULT_STORAGE_TYPE;

if (storageTypeInput.toLowerCase() !== storageType) {
logger.warn(`Invalid STORAGE_TYPE "${storageTypeInput}", using default: "${storageType}"`);
console.warn(`[WARN] CONFIGURATION: Invalid STORAGE_TYPE "${storageTypeInput}", using default: "${storageType}"`);
}

// Determine and potentially ensure local upload directory
const resolvedLocalUploadDir = determineLocalUploadDirectory(); // Will be null if STORAGE_TYPE is 's3'
if (resolvedLocalUploadDir) {
const resolvedLocalUploadDir = determineLocalUploadDirectory();
if (storageType === 'local' && resolvedLocalUploadDir) { // Only ensure if actually using local storage
ensureLocalUploadDirExists(resolvedLocalUploadDir);
}

/**
 * Function to parse the FOOTER_LINKS environment variable
 * @param {string} linksString - The input string containing links
 * @returns {Array} - An array of objects containing text and URL
 */
const parseFooterLinks = (linksString) => {
if (!linksString) return [];
return linksString.split(',')
@@ -136,85 +91,45 @@ const parseFooterLinks = (linksString) => {
const parts = linkPair.split('@').map(part => part.trim());
if (parts.length === 2 && parts[0] && parts[1] && (parts[1].startsWith('http://') || parts[1].startsWith('https://'))) {
return { text: parts[0], url: parts[1] };
} else {
logger.warn(`Invalid format or URL in FOOTER_LINKS: "${linkPair}". Expected "Text @ http(s)://URL". Skipping.`);
return null;
}
// logger.warn(`Invalid format or URL in FOOTER_LINKS: "${linkPair}".`); // Logger might not be fully init
return null;
})
.filter(link => link !== null);
};
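// Usage sketch (hypothetical values): with FOOTER_LINKS="Docs @ https://example.com/docs, Source @ https://github.com/DumbWareio/DumbDrop"
// parseFooterLinks(process.env.FOOTER_LINKS) returns:
//   [{ text: 'Docs', url: 'https://example.com/docs' }, { text: 'Source', url: 'https://github.com/DumbWareio/DumbDrop' }]
// Entries that do not match "Text @ http(s)://URL" are logged and skipped.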
/**
 * Application configuration
 * Loads and validates environment variables
 */
const port = parseInt(process.env.PORT || DEFAULT_PORT, 10);
const baseUrl = process.env.BASE_URL || `${DEFAULT_BASE_URL_PREFIX}:${port}/`;

const config = {
// =====================
// Core Settings
// =====================
port: parseInt(process.env.PORT || DEFAULT_PORT, 10),
port,
nodeEnv: process.env.NODE_ENV || 'development',
baseUrl: process.env.BASE_URL || `${DEFAULT_BASE_URL.replace(/:3000$/, '')}:${process.env.PORT || DEFAULT_PORT}/`, // Ensure trailing slash
baseUrl,
isDemoMode: process.env.DEMO_MODE === 'true',

// =====================
// Storage Settings
// =====================
storageType: logAndReturn('STORAGE_TYPE', storageType, storageType === DEFAULT_STORAGE_TYPE),
/**
 * The primary directory for storing files or metadata.
 * If STORAGE_TYPE=local, this is where files are stored.
 * If STORAGE_TYPE=s3, this is where '.metadata' lives.
 * We default to the determined local path or a standard './uploads' if S3 is used.
 */
uploadDir: resolvedLocalUploadDir || path.resolve('./uploads'), // S3 needs a place for metadata too

// --- S3 Specific (only relevant if storageType is 's3') ---
storageType,
uploadDir: storageType === 'local' ? resolvedLocalUploadDir : path.resolve(process.env.UPLOAD_DIR || process.env.LOCAL_UPLOAD_DIR || './uploads'), // For S3, metadata dir. Fallback required.
s3Region: process.env.S3_REGION || null,
s3BucketName: process.env.S3_BUCKET_NAME || null,
s3AccessKeyId: process.env.S3_ACCESS_KEY_ID || null,
s3SecretAccessKey: process.env.S3_SECRET_ACCESS_KEY || null,
s3EndpointUrl: process.env.S3_ENDPOINT_URL || null, // Default to null (AWS default endpoint)
s3ForcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true', // Default to false

// =====================
// Upload Behavior
// =====================
s3EndpointUrl: process.env.S3_ENDPOINT_URL || null,
s3ForcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true',
maxFileSize: (() => {
const sizeInMB = parseInt(process.env.MAX_FILE_SIZE || '1024', 10);
if (isNaN(sizeInMB) || sizeInMB <= 0) {
logger.error('Invalid MAX_FILE_SIZE, must be a positive number. Using 1024MB.');
return 1024 * 1024 * 1024;
}
return sizeInMB * 1024 * 1024; // Convert MB to bytes
return (isNaN(sizeInMB) || sizeInMB <= 0 ? 1024 : sizeInMB) * 1024 * 1024;
})(),
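// e.g. the default MAX_FILE_SIZE=1024 (MB) works out to 1024 * 1024 * 1024 = 1073741824 bytes.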
autoUpload: process.env.AUTO_UPLOAD === 'true',
allowedExtensions: process.env.ALLOWED_EXTENSIONS ?
process.env.ALLOWED_EXTENSIONS.split(',').map(ext => ext.trim().toLowerCase().replace(/^\./, '.')).filter(Boolean) : // Ensure dot prefix
process.env.ALLOWED_EXTENSIONS.split(',').map(ext => ext.trim().toLowerCase().replace(/^\./, '.')).filter(Boolean) :
null,
clientMaxRetries: (() => {
const envValue = process.env.CLIENT_MAX_RETRIES;
const defaultValue = DEFAULT_CLIENT_MAX_RETRIES;
if (envValue === undefined) return logAndReturn('CLIENT_MAX_RETRIES', defaultValue, true);
const retries = parseInt(envValue, 10);
if (isNaN(retries) || retries < 0) {
logger.warn(`Invalid CLIENT_MAX_RETRIES value: "${envValue}". Using default: ${defaultValue}`);
return logAndReturn('CLIENT_MAX_RETRIES', defaultValue, true);
}
return logAndReturn('CLIENT_MAX_RETRIES', retries);
const retries = parseInt(process.env.CLIENT_MAX_RETRIES || DEFAULT_CLIENT_MAX_RETRIES, 10);
return (isNaN(retries) || retries < 0) ? DEFAULT_CLIENT_MAX_RETRIES : retries;
})(),

// =====================
// Security
// =====================
pin: validatePin(process.env.DUMBDROP_PIN),
pin: validatePin(process.env.DUMBDROP_PIN), // validatePin uses logger, ensure logger is available
allowedIframeOrigins: process.env.ALLOWED_IFRAME_ORIGINS ?
process.env.ALLOWED_IFRAME_ORIGINS.split(',').map(origin => origin.trim()).filter(Boolean) :
null,

// =====================
// UI & Notifications
// =====================
siteTitle: process.env.DUMBDROP_TITLE || DEFAULT_SITE_TITLE,
footerLinks: parseFooterLinks(process.env.FOOTER_LINKS),
appriseUrl: process.env.APPRISE_URL || null,
@@ -222,113 +137,86 @@ const config = {
appriseSizeUnit: process.env.APPRISE_SIZE_UNIT || 'Auto',
};

// --- Log Sensitive & Conditional Config ---
logConfig(`NODE_ENV: ${config.nodeEnv}`);
logConfig(`PORT: ${config.port}`);
logConfig(`BASE_URL: ${config.baseUrl}`);
logConfig(`DEMO_MODE: ${config.isDemoMode}`);
// --- Log Configuration (after logger is confirmed available) ---
// Moved logging to after config object is built, so logger is definitely available
logger.info(`--- Configuration Start ---`);
logAndReturn('NODE_ENV', config.nodeEnv);
logAndReturn('PORT', config.port);
logAndReturn('BASE_URL', config.baseUrl);
logAndReturn('DEMO_MODE', config.isDemoMode);
logAndReturn('STORAGE_TYPE', config.storageType);
if (config.storageType === 'local') {
logConfig(`Upload Directory (Local): ${config.uploadDir}`);
logAndReturn('Upload Directory (Local Storage)', config.uploadDir);
} else {
logConfig(`Metadata Directory (S3 Mode): ${config.uploadDir}`); // Clarify role in S3 mode
logAndReturn('Metadata Directory (S3 Mode)', config.uploadDir); // Clarify role for S3
logAndReturn('S3_REGION', config.s3Region);
logAndReturn('S3_BUCKET_NAME', config.s3BucketName);
logAndReturn('S3_ACCESS_KEY_ID', config.s3AccessKeyId, false, true); // Sensitive
logAndReturn('S3_SECRET_ACCESS_KEY', config.s3SecretAccessKey, false, true); // Sensitive
logAndReturn('S3_ACCESS_KEY_ID', config.s3AccessKeyId, false, true);
logAndReturn('S3_SECRET_ACCESS_KEY', config.s3SecretAccessKey, false, true);
if (config.s3EndpointUrl) logAndReturn('S3_ENDPOINT_URL', config.s3EndpointUrl);
logAndReturn('S3_FORCE_PATH_STYLE', config.s3ForcePathStyle);
}
logConfig(`Max File Size: ${config.maxFileSize / (1024 * 1024)}MB`);
logConfig(`Auto Upload: ${config.autoUpload}`);
if (config.allowedExtensions) logConfig(`Allowed Extensions: ${config.allowedExtensions.join(', ')}`);
if (config.pin) logAndReturn('DUMBDROP_PIN', config.pin, false, true); // Sensitive
if (config.allowedIframeOrigins) logConfig(`Allowed Iframe Origins: ${config.allowedIframeOrigins.join(', ')}`);
logger.info(`Max File Size: ${config.maxFileSize / (1024 * 1024)}MB`);
logger.info(`Auto Upload: ${config.autoUpload}`);
if (config.allowedExtensions) logger.info(`Allowed Extensions: ${config.allowedExtensions.join(', ')}`);
if (config.pin) logAndReturn('DUMBDROP_PIN', config.pin, false, true);
if (config.allowedIframeOrigins) logger.info(`Allowed Iframe Origins: ${config.allowedIframeOrigins.join(', ')}`);
if (config.appriseUrl) logAndReturn('APPRISE_URL', config.appriseUrl);
logger.info(`Client Max Retries: ${config.clientMaxRetries}`);
logger.info(`--- Configuration End ---`);

// --- Configuration Validation ---
function validateConfig() {
const errors = [];

if (!config.port || config.port <= 0 || config.port > 65535) {
errors.push('PORT must be a valid number between 1 and 65535');
}

if (config.maxFileSize <= 0) {
errors.push('MAX_FILE_SIZE must be greater than 0');
}

// Validate BASE_URL format and trailing slash
if (config.port <= 0 || config.port > 65535) errors.push('PORT must be a valid number between 1 and 65535');
if (config.maxFileSize <= 0) errors.push('MAX_FILE_SIZE must be greater than 0');
try {
let url = new URL(config.baseUrl);
if (!config.baseUrl.endsWith('/')) {
errors.push('BASE_URL must end with a trailing slash ("/"). Current value: ' + config.baseUrl);
// Attempt to fix it for runtime, but still report error
// config.baseUrl = config.baseUrl + '/';
}
} catch (err) {
errors.push(`BASE_URL must be a valid URL. Error: ${err.message}`);
}
new URL(config.baseUrl);
if (!config.baseUrl.endsWith('/')) errors.push('BASE_URL must end with a trailing slash ("/"). Current: ' + config.baseUrl);
} catch (err) { errors.push(`BASE_URL must be a valid URL. Error: ${err.message}`); }

// Validate S3 configuration if STORAGE_TYPE is 's3'
if (config.storageType === 's3') {
if (!config.s3Region) errors.push('S3_REGION is required when STORAGE_TYPE is "s3"');
if (!config.s3BucketName) errors.push('S3_BUCKET_NAME is required when STORAGE_TYPE is "s3"');
if (!config.s3AccessKeyId) errors.push('S3_ACCESS_KEY_ID is required when STORAGE_TYPE is "s3"');
if (!config.s3SecretAccessKey) errors.push('S3_SECRET_ACCESS_KEY is required when STORAGE_TYPE is "s3"');

if (!config.s3Region) errors.push('S3_REGION is required for S3 storage');
if (!config.s3BucketName) errors.push('S3_BUCKET_NAME is required for S3 storage');
if (!config.s3AccessKeyId) errors.push('S3_ACCESS_KEY_ID is required for S3 storage');
if (!config.s3SecretAccessKey) errors.push('S3_SECRET_ACCESS_KEY is required for S3 storage');
if (config.s3ForcePathStyle && !config.s3EndpointUrl) {
logger.warn('S3_FORCE_PATH_STYLE is true, but S3_ENDPOINT_URL is not set. This typically requires a custom endpoint.');
logger.warn('[Config Validation] S3_FORCE_PATH_STYLE is true, but S3_ENDPOINT_URL is not set. This may not work as expected with default AWS endpoints.');
}
} else if (config.storageType === 'local') {
if (!config.uploadDir) errors.push('Upload directory (UPLOAD_DIR or LOCAL_UPLOAD_DIR) is required for local storage.');
else {
try { fs.accessSync(config.uploadDir, fs.constants.W_OK); }
catch (err) { errors.push(`Local upload directory "${config.uploadDir}" is not writable or does not exist.`); }
}
}

// Validate local storage dir only if type is local
if (config.storageType === 'local') {
if (!config.uploadDir) {
errors.push('Upload directory could not be determined for local storage.');
// Metadata directory check (for both local file metadata and S3 upload state metadata)
if (!config.uploadDir) { // This condition might be redundant if local storage dir is already checked
errors.push('A base directory (UPLOAD_DIR or LOCAL_UPLOAD_DIR) is required for metadata storage.');
} else {
// Check existence and writability again (ensureLocalUploadDirExists might have failed)
try {
fs.accessSync(config.uploadDir, fs.constants.W_OK);
const metadataBase = path.resolve(config.uploadDir); // Base for .metadata
if (!fs.existsSync(metadataBase)) {
fs.mkdirSync(metadataBase, { recursive: true });
logger.info(`[Config Validation] Created base directory for metadata: ${metadataBase}`);
}
fs.accessSync(metadataBase, fs.constants.W_OK); // Check writability of the parent of .metadata
} catch (err) {
errors.push(`Local upload directory "${config.uploadDir}" is not writable or does not exist.`);
}
errors.push(`Cannot access or create base directory for metadata at "${config.uploadDir}". Error: ${err.message}`);
}
}

// Check metadata dir existence/writability regardless of storage type, as S3 uses it too
try {
const metadataParentDir = path.dirname(path.join(config.uploadDir, '.metadata'));
if (!fs.existsSync(metadataParentDir)) {
fs.mkdirSync(metadataParentDir, { recursive: true });
logger.info(`Created base directory for metadata: ${metadataParentDir}`);
}
fs.accessSync(metadataParentDir, fs.constants.W_OK);
} catch (err) {
errors.push(`Cannot access or create directory for metadata storage at "${config.uploadDir}". Error: ${err.message}`);
}

if (config.nodeEnv === 'production') {
if (!config.appriseUrl) {
logger.info('Apprise notifications disabled (APPRISE_URL not set).');
}
}

if (errors.length > 0) {
logger.error('--- CONFIGURATION ERRORS ---');
errors.forEach(err => logger.error(`- ${err}`));
logger.error('-----------------------------');
throw new Error('Configuration validation failed. Please check environment variables.');
throw new Error('Configuration validation failed. Please check environment variables and correct the issues.');
}
logger.success('[Config Validation] Configuration validated successfully.');
}

logger.success('Configuration validated successfully.');
}
Object.freeze(config); // Freeze after logging and validation

// Freeze configuration to prevent modifications after initial load
Object.freeze(config);

module.exports = {
config,
validateConfig
};
module.exports = { config, validateConfig };
@@ -1,152 +1,161 @@
/**
 * File upload route handlers and batch upload management.
 * Handles file uploads, chunked transfers, and folder creation.
 * Manages upload sessions using persistent metadata for resumability.
 * File upload route handlers.
 * Delegates storage operations to the configured storage adapter.
 * Handles multipart uploads via adapter logic.
 */

const express = require('express');
const router = express.Router();
const path = require('path'); // Still needed for extension checks
const { config } = require('../config');
const logger = require('../utils/logger');
const { isDemoMode } = require('../utils/demoMode');
const { storageAdapter } = require('../storage'); // Import the storage adapter
const { isValidBatchId } = require('../utils/fileUtils');
const crypto = require('crypto'); // Keep crypto for demo mode uploadId
const { storageAdapter } = require('../storage'); // Import the adapter factory's result
const { isDemoMode } = require('../utils/demoMode'); // Keep demo check for specific route behavior if needed

// --- Routes ---

// Initialize upload
router.post('/init', async (req, res) => {
// DEMO MODE CHECK - Bypass persistence if in demo mode
if (isDemoMode()) {
const { filename, fileSize } = req.body;
const uploadId = `demo-${crypto.randomBytes(16).toString('hex')}`;
logger.info(`[DEMO] Initialized upload for ${filename} (${fileSize} bytes) with ID ${uploadId}`);
// Simulate zero-byte completion for demo
if (isDemoMode() && config.storageType !== 's3') { // S3 demo might still hit the adapter for presigned URLs etc.
// but local demo can be simpler.
const { filename = 'demo_file.txt', fileSize = 0 } = req.body;
const demoUploadId = 'demo-' + Math.random().toString(36).substr(2, 9);
logger.info(`[DEMO /init] Req for ${filename}, size ${fileSize}. ID ${demoUploadId}`);
if (Number(fileSize) === 0) {
logger.success(`[DEMO] Completed zero-byte file upload: ${filename}`);
// sendNotification(filename, 0, config); // In demo, notifications are typically skipped or mocked by demoAdapter
logger.success(`[DEMO /init] Sim complete zero-byte: ${filename}`);
}
return res.json({ uploadId });
return res.json({ uploadId: demoUploadId });
}

const { filename, fileSize } = req.body;
const clientBatchId = req.headers['x-batch-id'];

// --- Basic validations ---
if (!filename) return res.status(400).json({ error: 'Missing filename' });
if (fileSize === undefined || fileSize === null) return res.status(400).json({ error: 'Missing fileSize' });
const size = Number(fileSize);
if (isNaN(size) || size < 0) return res.status(400).json({ error: 'Invalid file size' });
const maxSizeInBytes = config.maxFileSize;
if (size > maxSizeInBytes) return res.status(413).json({ error: 'File too large', limit: maxSizeInBytes });

// Validate clientBatchId if provided
if (clientBatchId && !isValidBatchId(clientBatchId)) {
return res.status(400).json({ error: 'Invalid batch ID format' });
if (size > config.maxFileSize) {
logger.warn(`Upload rejected: File size ${size} exceeds limit ${config.maxFileSize} for ${filename}`);
return res.status(413).json({ error: 'File too large', limit: config.maxFileSize });
}

if (config.allowedExtensions && config.allowedExtensions.length > 0) {
const fileExt = path.extname(filename).toLowerCase();
if (!fileExt || !config.allowedExtensions.includes(fileExt)) {
logger.warn(`Upload rejected: File type not allowed: ${filename} (Ext: ${fileExt || 'none'})`);
return res.status(400).json({ error: 'File type not allowed', receivedExtension: fileExt || 'none' });
}
logger.debug(`File extension ${fileExt} allowed for ${filename}`);
}

try {
const { uploadId } = await storageAdapter.initUpload(filename, size, clientBatchId);
logger.info(`[Route /init] Storage adapter initialized upload: ${uploadId} for ${filename}`);
res.json({ uploadId });

const result = await storageAdapter.initUpload(filename, size, clientBatchId);
res.json({ uploadId: result.uploadId });
} catch (err) {
logger.error(`[Route /init] Upload initialization failed: ${err.message} ${err.stack}`);
// Check for specific error types if adapter throws them (e.g., size limit from adapter)
if (err.message.includes('File too large') || err.status === 413) {
return res.status(413).json({ error: 'File too large', details: err.message, limit: config.maxFileSize });
logger.error(`[Route /init] Upload initialization failed for "${filename}": ${err.name} - ${err.message}`, err.stack);
let statusCode = 500;
let clientMessage = 'Failed to initialize upload.';

if (err.message.includes('Invalid batch ID format')) {
statusCode = 400; clientMessage = err.message;
} else if (err.name === 'NoSuchBucket' || err.name === 'AccessDenied') {
statusCode = 500; clientMessage = 'Storage configuration error.';
} else if (err.code === 'EACCES' || err.code === 'EPERM' || err.message.includes('writable') || err.message.includes('metadata directory')) {
statusCode = 500; clientMessage = 'Storage permission or access error.';
} else if (err.message.includes('S3 Client configuration failed')) {
statusCode = 503; clientMessage = 'Storage service unavailable or misconfigured.';
}
if (err.message.includes('File type not allowed') || err.status === 400) {
return res.status(400).json({ error: 'File type not allowed', details: err.message });
}
return res.status(500).json({ error: 'Failed to initialize upload via adapter', details: err.message });
res.status(statusCode).json({ error: clientMessage, details: config.nodeEnv === 'development' ? err.message : undefined });
}
});
// Upload chunk
router.post('/chunk/:uploadId', express.raw({
limit: config.maxFileSize + (10 * 1024 * 1024), // Generous limit for raw body
limit: config.maxFileSize + (10 * 1024 * 1024),
type: 'application/octet-stream'
}), async (req, res) => {
// DEMO MODE CHECK
if (isDemoMode()) {
const { uploadId } = req.params;
logger.debug(`[DEMO] Received chunk for ${uploadId}`);
// Fake progress - requires knowing file size which isn't easily available here in demo
const demoProgress = Math.min(100, Math.random() * 100); // Placeholder
return res.json({ bytesReceived: 0, progress: demoProgress });
const chunk = req.body;
const partNumber = parseInt(req.query.partNumber, 10); // Ensure partNumber is parsed

if (isNaN(partNumber) || partNumber < 1) {
logger.error(`[Route /chunk] Invalid partNumber for ${uploadId}: ${req.query.partNumber}`);
return res.status(400).json({ error: 'Missing or invalid partNumber query parameter (must be >= 1)' });
}

const { uploadId } = req.params;
let chunk = req.body;
const chunkSize = chunk.length;
if (isDemoMode() && config.storageType !== 's3') {
logger.debug(`[DEMO /chunk] Chunk for ${uploadId}, part ${partNumber}, size ${chunk?.length || 0}`);
const demoProgress = Math.min(100, (Math.random() * 50) + (partNumber * 10) ); // Simulate increasing progress
const completed = demoProgress >= 100;
if (completed) logger.info(`[DEMO /chunk] Sim completion for ${uploadId}`);
return res.json({ bytesReceived: 0, progress: demoProgress, completed });
}

if (!chunkSize) return res.status(400).json({ error: 'Empty chunk received' });
if (!chunk || chunk.length === 0) {
logger.warn(`[Route /chunk] Empty chunk for ${uploadId}, part ${partNumber}`);
return res.status(400).json({ error: 'Empty chunk received' });
}

try {
// Delegate to storage adapter
// The adapter's storeChunk should handle partNumber for S3 internally
const { bytesReceived, progress, completed } = await storageAdapter.storeChunk(uploadId, chunk);
logger.debug(`[Route /chunk] Stored chunk for ${uploadId}. Progress: ${progress}%, Completed by adapter: ${completed}`);
const result = await storageAdapter.storeChunk(uploadId, chunk, partNumber);

if (completed) {
logger.info(`[Route /chunk] Adapter reported completion for ${uploadId}. Finalizing...`);
if (result.completed) {
logger.info(`[Route /chunk] Part ${partNumber} for ${uploadId} triggered completion. Finalizing...`);
try {
const finalizationResult = await storageAdapter.completeUpload(uploadId);
logger.success(`[Route /chunk] Successfully finalized upload ${uploadId}. Final path/key: ${finalizationResult.finalPath}`);
// The adapter's completeUpload method is responsible for sending notifications and cleaning its metadata.
} catch (completeErr) {
logger.error(`[Route /chunk] CRITICAL: Failed to finalize completed upload ${uploadId} after storing chunk: ${completeErr.message} ${completeErr.stack}`);
// If completeUpload fails, the client might retry the chunk.
// The adapter's storeChunk should be idempotent or handle this.
// We still return the progress of the chunk write to the client.
// The client will likely retry, or the user will see the upload stall at 100% if this was the last chunk.
// Consider what to return to client here. For now, return chunk progress but log server error.
// The 'completed' flag from storeChunk might cause client to stop sending if it thinks it's done.
// If completeUpload fails, maybe the response to client should indicate not fully complete yet?
// Let's return the original progress. The client will retry if needed.
return res.status(500).json({ error: 'Chunk processed but finalization failed on server.', details: completeErr.message, currentProgress: progress });
const completionResult = await storageAdapter.completeUpload(uploadId);
logger.success(`[Route /chunk] Finalized upload ${uploadId}. Path/Key: ${completionResult.finalPath}`);
return res.json({ bytesReceived: result.bytesReceived, progress: 100, completed: true });
} catch (completionError) {
logger.error(`[Route /chunk] CRITICAL: Failed to finalize ${uploadId} after part ${partNumber}: ${completionError.message}`, completionError.stack);
return res.status(500).json({ error: 'Upload chunk received, but failed to finalize.', details: config.nodeEnv === 'development' ? completionError.message : undefined });
}
} else {
res.json({ bytesReceived: result.bytesReceived, progress: result.progress, completed: false });
}
res.json({ bytesReceived, progress });

} catch (err) {
logger.error(`[Route /chunk] Chunk upload failed for ${uploadId}: ${err.message} ${err.stack}`);
if (err.message.includes('Upload session not found')) {
return res.status(404).json({ error: 'Upload session not found or already completed', details: err.message });
logger.error(`[Route /chunk] Chunk upload failed for ${uploadId}, part ${partNumber}: ${err.name} - ${err.message}`, err.stack);
let statusCode = 500;
let clientMessage = 'Failed to process chunk.';

if (err.message.includes('Upload session not found') || err.name === 'NoSuchUpload' || err.code === 'ENOENT' || err.name === 'NotFound' || err.name === 'NoSuchKey') {
statusCode = 404; clientMessage = 'Upload session not found or already completed/aborted.';
} else if (err.name === 'InvalidPart' || err.name === 'InvalidPartOrder') {
statusCode = 400; clientMessage = 'Invalid upload chunk sequence or data.';
} else if (err.name === 'SlowDown' || (err.$metadata && err.$metadata.httpStatusCode === 503) ) {
statusCode = 429; clientMessage = 'Storage provider rate limit exceeded, please try again later.';
} else if (err.code === 'EACCES' || err.code === 'EPERM' ) {
statusCode = 500; clientMessage = 'Storage permission error while writing chunk.';
}
// Don't delete adapter's metadata on generic chunk errors, let client retry or adapter's cleanup handle stale entries.
res.status(500).json({ error: 'Failed to process chunk via adapter', details: err.message });
res.status(statusCode).json({ error: clientMessage, details: config.nodeEnv === 'development' ? err.message : undefined });
}
});

// Cancel upload
router.post('/cancel/:uploadId', async (req, res) => {
// DEMO MODE CHECK
if (isDemoMode()) {
logger.info(`[DEMO] Upload cancelled: ${req.params.uploadId}`);
const { uploadId } = req.params;

if (isDemoMode() && config.storageType !== 's3') {
logger.info(`[DEMO /cancel] Request for ${uploadId}`);
return res.json({ message: 'Upload cancelled (Demo)' });
}

const { uploadId } = req.params;
logger.info(`[Route /cancel] Received cancel request for upload: ${uploadId}`);

logger.info(`[Route /cancel] Cancel request for upload: ${uploadId}`);
try {
await storageAdapter.abortUpload(uploadId);
logger.info(`[Route /cancel] Upload ${uploadId} cancelled via storage adapter.`);
res.json({ message: 'Upload cancelled or already complete' });
res.json({ message: 'Upload cancelled successfully or was already inactive.' });
} catch (err) {
logger.error(`[Route /cancel] Error during upload cancellation for ${uploadId}: ${err.message}`);
// Adapters should handle "not found" gracefully or throw specific error
if (err.message.includes('not found')) { // Generic check
return res.status(404).json({ error: 'Upload not found or already processed', details: err.message });
logger.error(`[Route /cancel] Error during cancellation for ${uploadId}: ${err.name} - ${err.message}`, err.stack);
// Generally, client doesn't need to know if server-side abort failed catastrophically,
// as long as client stops sending. However, if it's a config error, 500 is appropriate.
let statusCode = err.name === 'NoSuchUpload' ? 200 : 500; // If not found, it's like success for client
let clientMessage = err.name === 'NoSuchUpload' ? 'Upload already inactive or not found.' : 'Failed to cancel upload on server.';
if (err.name === 'AccessDenied' || err.name === 'NoSuchBucket') {
clientMessage = 'Storage configuration error during cancel.';
statusCode = 500;
}
res.status(500).json({ error: 'Failed to cancel upload via adapter' });
res.status(statusCode).json({ message: clientMessage, details: config.nodeEnv === 'development' ? err.message : undefined });
}
});

module.exports = {
router
// Remove internal metadata/batch cleanup exports as they are adapter-specific now or not used by router
};
module.exports = { router }; // Only export the router object
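A rough client-side sketch of how these routes are exercised; the /api/upload mount path, the chunk size, and the response handling here are assumptions for illustration, not part of this diff:

// Hypothetical client flow: init the upload, then POST raw chunks with an increasing partNumber.
async function uploadFile(file, { chunkSize = 5 * 1024 * 1024, batchId } = {}) {
  const initRes = await fetch('/api/upload/init', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json', ...(batchId ? { 'x-batch-id': batchId } : {}) },
    body: JSON.stringify({ filename: file.name, fileSize: file.size })
  });
  if (!initRes.ok) throw new Error(`init failed: ${initRes.status}`);
  const { uploadId } = await initRes.json();

  let partNumber = 1;
  for (let offset = 0; offset < file.size; offset += chunkSize, partNumber++) {
    const res = await fetch(`/api/upload/chunk/${uploadId}?partNumber=${partNumber}`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/octet-stream' },
      body: file.slice(offset, offset + chunkSize)
    });
    if (!res.ok) throw new Error(`chunk ${partNumber} failed: ${res.status}`);
    const { progress, completed } = await res.json();
    if (completed) return { uploadId, progress };
  }
  return { uploadId, progress: 100 };
}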
104  src/server.js
@@ -1,124 +1,110 @@
/**
 * Server entry point that starts the HTTP server and manages connections.
 * Handles graceful shutdown, connection tracking, and server initialization.
 * Provides development mode directory listing functionality.
 */

const { app, initialize, config } = require('./app');
const { app, initialize, config } = require('./app'); // config is now also exported from app.js
const logger = require('./utils/logger');
const fs = require('fs');
const fs = require('fs'); // Keep for readdirSync if needed for local dev logging
const { executeCleanup } = require('./utils/cleanup');
const { generatePWAManifest } = require('./scripts/pwa-manifest-generator')
const { generatePWAManifest } = require('./scripts/pwa-manifest-generator');

// Track open connections
const connections = new Set();

/**
 * Start the server and initialize the application
 * @returns {Promise<http.Server>} The HTTP server instance
 */
async function startServer() {
try {
// Initialize the application
await initialize();
await initialize(); // This will call validateConfig and load storage adapter via app.js

// Start the server
const server = app.listen(config.port, () => {
logger.info(`Server running at ${config.baseUrl}`);
logger.info(`Upload directory (for local adapter state/uploads): ${config.uploadDir}`);
// ** MODIFIED LOGGING **
logger.info(`Active Storage Type: ${config.storageType}`);
logger.info(`Data Directory (for uploads or metadata): ${config.uploadDir}`);

// List directory contents in development
if (config.nodeEnv === 'development') {
if (config.nodeEnv === 'development' && config.storageType === 'local') {
try {
// Only list contents if it's local storage and dev mode
if (fs.existsSync(config.uploadDir)) {
const files = fs.readdirSync(config.uploadDir);
logger.info(`Current directory contents (${files.length} files):`);
files.forEach(file => {
logger.info(`- ${file}`);
});
logger.info(`Current local upload directory contents (${config.uploadDir}):`);
files.forEach(file => logger.info(`- ${file}`));
} else {
logger.warn(`Local upload directory ${config.uploadDir} does not exist for listing.`);
}
} catch (err) {
logger.error(`Failed to list directory contents: ${err.message}`);
logger.error(`Failed to list local upload directory contents: ${err.message}`);
}
}
});

// Dynamically generate PWA manifest into public folder
generatePWAManifest();

// Track new connections
server.on('connection', (connection) => {
connections.add(connection);
connection.on('close', () => {
connections.delete(connection);
});
connection.on('close', () => connections.delete(connection));
});

// Shutdown handler function
let isShuttingDown = false; // Prevent multiple shutdowns
let isShuttingDown = false;
const shutdownHandler = async (signal) => {
if (isShuttingDown) return;
isShuttingDown = true;
logger.info(`${signal} received. Shutting down gracefully...`);

// Start a shorter force shutdown timer
const forceShutdownTimer = setTimeout(() => {
logger.error('Force shutdown initiated');
logger.error('Force shutdown due to timeout.');
process.exit(1);
}, 3000); // 3 seconds maximum for total shutdown
}, 5000); // Increased slightly

try {
// 1. Stop accepting new connections immediately
server.unref();
server.closeIdleConnections?.(); // Node 18+

// 2. Close all existing connections with a shorter timeout
const connectionClosePromises = Array.from(connections).map(conn => {
return new Promise(resolve => {
conn.end(() => {
connections.delete(conn);
const closePromises = Array.from(connections).map(conn => new Promise(resolve => {
conn.on('close', resolve); // Ensure close event resolves
conn.destroy(); // Actively destroy connections
}));

await Promise.race([
Promise.all(closePromises),
new Promise(resolve => setTimeout(resolve, 2000)) // Max 2s for connections
]);
connections.clear();

await new Promise((resolve, reject) => {
server.close((err) => {
if (err) return reject(err);
logger.info('Server closed.');
resolve();
});
});
});

// Wait for connections to close with a timeout
await Promise.race([
Promise.all(connectionClosePromises),
new Promise(resolve => setTimeout(resolve, 1000)) // 1 second timeout for connections
]);
await executeCleanup(1500); // Max 1.5s for cleanup

// 3. Close the server
await new Promise((resolve) => server.close(resolve));
logger.info('Server closed');

// 4. Run cleanup tasks with a shorter timeout
await executeCleanup(1000); // 1 second timeout for cleanup

// Clear the force shutdown timer since we completed gracefully
clearTimeout(forceShutdownTimer);
process.exitCode = 0;
process.exit(0); // Ensure immediate exit
logger.info('Shutdown complete.');
process.exit(0);
} catch (error) {
clearTimeout(forceShutdownTimer); // Clear timer on error too
logger.error(`Error during shutdown: ${error.message}`);
process.exit(1);
}
};
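// In the new handler the shutdown order is: unref the server and close idle connections,
// destroy remaining sockets (racing a ~2s timeout), close the server, run executeCleanup(1500),
// clear the 5s force-shutdown backstop, then exit 0.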
// Handle both SIGTERM and SIGINT
process.on('SIGTERM', () => shutdownHandler('SIGTERM'));
process.on('SIGINT', () => shutdownHandler('SIGINT'));

return server;
} catch (error) {
logger.error('Failed to start server:', error);
// Ensure process exits if startServer itself fails before listener setup
process.exitCode = 1;
throw error;
}
}

// Only start the server if this file is run directly
if (require.main === module) {
startServer().catch((error) => {
logger.error('Server failed to start:', error);
process.exitCode = 1;
throw error;
// Error already logged by startServer
// process.exitCode is already set if startServer throws
});
}
@@ -3,80 +3,83 @@
 * Handles file operations for storing files on AWS S3 or S3-compatible services.
 * Implements the storage interface expected by the application routes.
 * Uses local files in '.metadata' directory to track multipart upload progress.
 * Buffers individual parts for MPU or entire small files before S3 PUT.
 * Attempts to make top-level folder prefixes unique per batch if collisions occur.
 */

const {
S3Client, CreateMultipartUploadCommand, UploadPartCommand,
CompleteMultipartUploadCommand, AbortMultipartUploadCommand, ListObjectsV2Command,
GetObjectCommand, DeleteObjectCommand, PutObjectCommand
S3Client,
CreateMultipartUploadCommand,
UploadPartCommand,
CompleteMultipartUploadCommand,
AbortMultipartUploadCommand,
ListObjectsV2Command,
GetObjectCommand,
DeleteObjectCommand,
PutObjectCommand,
HeadObjectCommand
} = require('@aws-sdk/client-s3');
const { getSignedUrl } = require("@aws-sdk/s3-request-presigner");
const fs = require('fs').promises;
const fsSync = require('fs'); // For synchronous checks
const fsSync = require('fs');
const path = require('path');
const crypto = require('crypto');
const util = require('util'); // For detailed error logging
const { config } = require('../config');
const logger = require('../utils/logger');
const { sanitizePathPreserveDirs, isValidBatchId, formatFileSize } = require('../utils/fileUtils');
const {
sanitizePathPreserveDirs,
formatFileSize
} = require('../utils/fileUtils');
const { sendNotification } = require('../services/notifications');

// --- Constants ---
const METADATA_DIR = path.join(config.uploadDir, '.metadata');
const TEMP_CHUNK_DIR = path.join(config.uploadDir, '.temp_chunks'); // For buffering parts or small files
const UPLOAD_TIMEOUT = 30 * 60 * 1000; // 30 min local stale cleanup
const MIN_S3_TOTAL_SIZE_FOR_MULTIPART = 5 * 1024 * 1024; // 5MB
const S3_PART_SIZE = 5 * 1024 * 1024; // Min 5MB for S3 parts (except last)

// --- Helper function to convert stream to string ---
async function streamToString(stream) {
if (!stream || typeof stream.pipe !== 'function') return Promise.resolve(null);
return new Promise((resolve, reject) => {
const chunks = [];
stream.on('data', (chunk) => chunks.push(chunk));
stream.on('error', reject);
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
});
}
const UPLOAD_TIMEOUT = 30 * 60 * 1000; // For local metadata cleanup

// --- S3 Client Initialization ---
let s3Client;
try {
s3Client = new S3Client({
const s3ClientConfig = {
region: config.s3Region,
credentials: { accessKeyId: config.s3AccessKeyId, secretAccessKey: config.s3SecretAccessKey },
credentials: {
accessKeyId: config.s3AccessKeyId,
secretAccessKey: config.s3SecretAccessKey,
},
...(config.s3EndpointUrl && { endpoint: config.s3EndpointUrl }),
...(config.s3ForcePathStyle && { forcePathStyle: true }),
});
logger.success('[S3 Adapter] S3 Client configured.');
};
if (s3ClientConfig.endpoint) logger.info(`[S3 Adapter] Configuring S3 client for endpoint: ${s3ClientConfig.endpoint}`);
if (s3ClientConfig.forcePathStyle) logger.info(`[S3 Adapter] Configuring S3 client with forcePathStyle: true`);
s3Client = new S3Client(s3ClientConfig);
logger.success('[S3 Adapter] S3 Client configured successfully.');
} catch (error) {
logger.error(`[S3 Adapter] Failed to configure S3 client: ${error.message}`);
throw new Error('S3 Client configuration failed.');
throw new Error('S3 Client configuration failed. Check S3 environment variables.');
}

async function ensureDirExists(dirPath, purpose) {
// --- Metadata Helper Functions ---
async function ensureMetadataDirExists() {
try {
if (!fsSync.existsSync(dirPath)) {
await fs.mkdir(dirPath, { recursive: true });
logger.info(`[S3 Adapter] Created local ${purpose} directory: ${dirPath}`);
if (!fsSync.existsSync(METADATA_DIR)) {
await fs.mkdir(METADATA_DIR, { recursive: true });
logger.info(`[S3 Adapter] Created local metadata directory: ${METADATA_DIR}`);
}
await fs.access(dirPath, fsSync.constants.W_OK);
await fs.access(METADATA_DIR, fsSync.constants.W_OK);
} catch (err) {
logger.error(`[S3 Adapter] Local ${purpose} directory error (${dirPath}): ${err.message}`);
throw new Error(`Failed to access/create local ${purpose} directory: ${dirPath}`);
logger.error(`[S3 Adapter] Local metadata directory error (${METADATA_DIR}): ${err.message}`);
throw new Error(`Failed to access or create local metadata directory for S3 adapter state: ${METADATA_DIR}`);
}
}

// Metadata functions (read, write, delete) remain largely the same as your last working version.
// Ensure metadata includes: appUploadId, s3Key, originalFilename, fileSize, bytesReceived, batchId,
// createdAt, lastActivity, isMultipartUpload, tempPartPath, s3UploadId (for MPU), parts (for MPU), currentPartBytes (for MPU).
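// Illustrative shape of one .meta file (field names from the list above; the values and the parts
// entry format are hypothetical):
// {
//   "appUploadId": "3f9c0a...", "s3Key": "MyFolder-1/photo.jpg", "originalFilename": "MyFolder/photo.jpg",
//   "fileSize": 10485760, "bytesReceived": 5242880, "batchId": "1716230000000-ab12cd34",
//   "createdAt": 1716230000000, "lastActivity": 1716230012345, "isMultipartUpload": true,
//   "tempPartPath": "<uploadDir>/.temp_chunks/3f9c0a....partbuffer", "s3UploadId": "<S3 MPU id>",
//   "parts": [{ "PartNumber": 1, "ETag": "\"...\"" }], "currentPartBytes": 1048576
// }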
async function readUploadMetadata(uploadId) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) return null;
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
logger.warn(`[S3 Adapter] Attempted to read metadata with invalid uploadId: ${uploadId}`);
return null;
}
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
try {
const data = await fs.readFile(metaFilePath, 'utf8');
const metadata = JSON.parse(data);
metadata.parts = metadata.parts || []; // Ensure parts array exists if MPU
metadata.parts = metadata.parts || [];
return metadata;
} catch (err) {
if (err.code === 'ENOENT') return null;
@@ -86,9 +89,13 @@ try {
}

async function writeUploadMetadata(uploadId, metadata) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) return;
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
logger.error(`[S3 Adapter] Attempted to write metadata with invalid uploadId: ${uploadId}`);
return;
}
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
metadata.lastActivity = Date.now();
metadata.parts = metadata.parts || [];
try {
const tempMetaPath = `${metaFilePath}.${crypto.randomBytes(4).toString('hex')}.tmp`;
await fs.writeFile(tempMetaPath, JSON.stringify(metadata, null, 2));
@@ -101,174 +108,228 @@ try {
}

async function deleteUploadMetadata(uploadId) {
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) return;
if (!uploadId || typeof uploadId !== 'string' || uploadId.includes('..')) {
logger.warn(`[S3 Adapter] Attempted to delete metadata with invalid uploadId: ${uploadId}`);
return;
}
const metaFilePath = path.join(METADATA_DIR, `${uploadId}.meta`);
try { await fs.unlink(metaFilePath); }
catch (err) { if (err.code !== 'ENOENT') logger.error(`[S3 Adapter] Err deleting meta ${uploadId}.meta: ${err.message}`);}
try {
await fs.unlink(metaFilePath);
logger.debug(`[S3 Adapter] Deleted metadata file: ${uploadId}.meta`);
} catch (err) {
if (err.code !== 'ENOENT') logger.error(`[S3 Adapter] Error deleting metadata file ${uploadId}.meta: ${err.message}`);
}
}

ensureMetadataDirExists().catch(err => {
logger.error(`[S3 Adapter] Initialization failed (metadata dir): ${err.message}`);
process.exit(1);
});

Promise.all([
ensureDirExists(METADATA_DIR, 'metadata'),
ensureDirExists(TEMP_CHUNK_DIR, 'part/small file buffering')
]).catch(err => { logger.error(`[S3 Adapter] Critical dir ensure error: ${err.message}`); process.exit(1); });
// --- S3 Object/Prefix Utilities ---
const batchS3PrefixMappings = new Map(); // In-memory: originalTopLevelFolder-batchId -> actualS3Prefix

async function s3ObjectExists(key) {
logger.info(`[S3 Adapter] s3ObjectExists: Checking key "${key}"`);
try {
await s3Client.send(new HeadObjectCommand({ Bucket: config.s3BucketName, Key: key }));
logger.info(`[S3 Adapter] s3ObjectExists: HeadObject success for key "${key}". Key EXISTS.`);
return true;
} catch (error) {
// logger.error(`[S3 Adapter DEBUG] Full error object for HeadObject on key "${key}":\n`, util.inspect(error, { showHidden: false, depth: null, colors: false }));
if (error.name === 'NotFound' || error.name === 'NoSuchKey' || (error.$metadata && error.$metadata.httpStatusCode === 404)) {
logger.info(`[S3 Adapter] s3ObjectExists: Key "${key}" NOT found (404-like error).`);
return false;
}
if (error.name === '403' || (error.$metadata && error.$metadata.httpStatusCode === 403)) {
logger.warn(`[S3 Adapter] s3ObjectExists: Received 403 Forbidden for key "${key}". For unique key generation, treating this as 'likely does not exist'.`);
return false;
}
logger.error(`[S3 Adapter] s3ObjectExists: Unhandled error type "${error.name}" for key "${key}": ${error.message}`);
throw error;
}
}

async function getUniqueS3FolderPrefix(originalPrefix, batchId) {
if (!originalPrefix || !originalPrefix.endsWith('/')) {
logger.error("[S3 Adapter] getUniqueS3FolderPrefix: originalPrefix must be a non-empty string ending with '/'");
return originalPrefix; // Or throw error
}
const prefixMapKey = `${originalPrefix}-${batchId}`;
if (batchS3PrefixMappings.has(prefixMapKey)) {
return batchS3PrefixMappings.get(prefixMapKey);
}

let currentPrefixToCheck = originalPrefix;
let counter = 1;
const baseName = originalPrefix.slice(0, -1); // "MyFolder" from "MyFolder/"

async function prefixHasObjects(prefix) {
try {
const listResponse = await s3Client.send(new ListObjectsV2Command({
Bucket: config.s3BucketName, Prefix: prefix, MaxKeys: 1
}));
return listResponse.KeyCount > 0;
} catch (error) {
logger.error(`[S3 Adapter] Error listing objects for prefix check "${prefix}": ${error.message}`);
throw error; // Propagate error if listing fails for permission reasons etc.
}
}

while (await prefixHasObjects(currentPrefixToCheck)) {
logger.warn(`[S3 Adapter] S3 prefix "${currentPrefixToCheck}" is not empty. Generating unique prefix for base "${baseName}/".`);
currentPrefixToCheck = `${baseName}-${counter}/`; // Use hyphen for suffix
counter++;
}

if (currentPrefixToCheck !== originalPrefix) {
logger.info(`[S3 Adapter] Using unique S3 folder prefix: "${currentPrefixToCheck}" for original "${originalPrefix}" in batch "${batchId}"`);
}
batchS3PrefixMappings.set(prefixMapKey, currentPrefixToCheck);
return currentPrefixToCheck;
}

// --- Interface Implementation ---
async function initUpload(filename, fileSize, clientBatchId) {
await ensureMetadataDirExists();
const size = Number(fileSize);
const appUploadId = crypto.randomBytes(16).toString('hex');
const sanitizedFilename = sanitizePathPreserveDirs(filename);
const s3Key = path.normalize(sanitizedFilename).replace(/^(\.\.(\/|\\|$))+/, '').replace(/\\/g, '/').replace(/^\/+/, '');
logger.info(`[S3 Adapter] Init: Key: ${s3Key}, Size: ${size}`);

const batchId = clientBatchId || `${Date.now()}-${crypto.randomBytes(4).toString('hex').substring(0, 9)}`;
const tempPartPath = path.join(TEMP_CHUNK_DIR, `${appUploadId}.partbuffer`); // Buffer for current part or small file

const baseMetadata = {
appUploadId, s3Key, originalFilename: filename, fileSize: size,
bytesReceived: 0, batchId, createdAt: Date.now(), lastActivity: Date.now(),
tempPartPath, currentPartBytes: 0, parts: []
};
const originalSanitizedFullpath = sanitizePathPreserveDirs(filename); // e.g., "MyFolder/image.jpg" or "image.jpg"
let s3KeyStructure = path.normalize(originalSanitizedFullpath)
.replace(/^(\.\.(\/|\\|$))+/, '').replace(/\\/g, '/').replace(/^\/+/, '');

let effectiveBasePrefix = ""; // e.g., "MyFolder-1/" or ""
const pathParts = s3KeyStructure.split('/');
const isNestedPath = pathParts.length > 1;
let relativePathInFolder = s3KeyStructure;

if (isNestedPath) {
const originalTopLevelFolder = pathParts[0] + '/'; // "MyFolder/"
effectiveBasePrefix = await getUniqueS3FolderPrefix(originalTopLevelFolder, batchId);
relativePathInFolder = pathParts.slice(1).join('/'); // "SubFolder/image.jpg" or "image.jpg"
s3KeyStructure = effectiveBasePrefix + relativePathInFolder;
}
logger.info(`[S3 Adapter] Init: Original Full Path: "${originalSanitizedFullpath}", Effective Base Prefix: "${effectiveBasePrefix}", Relative Path In Folder: "${relativePathInFolder}"`);

let finalS3Key = s3KeyStructure;
let fileCounter = 1;
const fileDir = path.dirname(s3KeyStructure);
const fileExt = path.extname(s3KeyStructure);
const fileBaseName = path.basename(s3KeyStructure, fileExt);

while (await s3ObjectExists(finalS3Key)) {
logger.warn(`[S3 Adapter] S3 file key already exists: "${finalS3Key}". Generating unique file key.`);
finalS3Key = (fileDir === "." ? "" : fileDir + "/") + `${fileBaseName}-${fileCounter}${fileExt}`; // Use hyphen
fileCounter++;
}
if (finalS3Key !== s3KeyStructure) {
logger.info(`[S3 Adapter] Using unique S3 file key: "${finalS3Key}"`);
}
|
||||
|
||||
if (size === 0) {
|
||||
await s3Client.send(new PutObjectCommand({ Bucket: config.s3BucketName, Key: s3Key, Body: '', ContentLength: 0 }));
|
||||
logger.success(`[S3 Adapter] Zero-byte uploaded: ${s3Key}`);
|
||||
sendNotification(filename, 0, config);
|
||||
try {
|
||||
await s3Client.send(new PutObjectCommand({
|
||||
Bucket: config.s3BucketName, Key: finalS3Key, Body: '', ContentLength: 0
|
||||
}));
|
||||
logger.success(`[S3 Adapter] Completed zero-byte file: ${finalS3Key}`);
|
||||
sendNotification(originalSanitizedFullpath, 0, config);
|
||||
return { uploadId: `zero-byte-${appUploadId}` };
|
||||
} catch (putErr) {
|
||||
logger.error(`[S3 Adapter] Failed zero-byte PUT for ${finalS3Key}: ${putErr.message}`);
|
||||
throw putErr;
|
||||
}
|
||||
}
|
||||
|
||||
let metadata;
|
||||
if (size < MIN_S3_TOTAL_SIZE_FOR_MULTIPART) {
|
||||
logger.info(`[S3 Adapter] Small file (<${MIN_S3_TOTAL_SIZE_FOR_MULTIPART}B). Will use single PUT.`);
|
||||
metadata = { ...baseMetadata, isMultipartUpload: false };
|
||||
} else {
|
||||
logger.info(`[S3 Adapter] Large file (>=${MIN_S3_TOTAL_SIZE_FOR_MULTIPART}B). Will use MPU.`);
|
||||
const mpuResponse = await s3Client.send(new CreateMultipartUploadCommand({ Bucket: config.s3BucketName, Key: s3Key }));
|
||||
if (!mpuResponse.UploadId) throw new Error('S3 did not return UploadId for MPU.');
|
||||
metadata = { ...baseMetadata, isMultipartUpload: true, s3UploadId: mpuResponse.UploadId };
|
||||
logger.info(`[S3 Adapter] MPU initiated for ${s3Key}, S3UploadId: ${metadata.s3UploadId}`);
|
||||
}
|
||||
try {
|
||||
const createCommand = new CreateMultipartUploadCommand({ Bucket: config.s3BucketName, Key: finalS3Key });
|
||||
const response = await s3Client.send(createCommand);
|
||||
const s3UploadId = response.UploadId;
|
||||
if (!s3UploadId) throw new Error('S3 did not return UploadId');
|
||||
logger.info(`[S3 Adapter] Multipart initiated for ${finalS3Key} (S3 UploadId: ${s3UploadId})`);
|
||||
|
||||
await fs.writeFile(tempPartPath, ''); // Create empty buffer file
|
||||
const metadata = {
|
||||
appUploadId, s3UploadId, s3Key: finalS3Key,
|
||||
originalFilename: originalSanitizedFullpath, // Use the full original path for notification
|
||||
fileSize: size, bytesReceived: 0, parts: [], batchId,
|
||||
createdAt: Date.now(), lastActivity: Date.now()
|
||||
};
|
||||
await writeUploadMetadata(appUploadId, metadata);
|
||||
logger.info(`[S3 Adapter] Initialized upload ${appUploadId} for ${s3Key}. Temp buffer: ${tempPartPath}. MPU: ${metadata.isMultipartUpload}`);
|
||||
return { uploadId: appUploadId };
|
||||
} catch (err) {
|
||||
logger.error(`[S3 Adapter] Failed multipart init for ${finalS3Key}: ${err.message}`);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async function _uploadBufferedPart(metadata) {
  const partBuffer = await fs.readFile(metadata.tempPartPath);
  if (partBuffer.length === 0) {
    logger.warn(`[S3 Adapter MPU] Attempted to upload empty part for ${metadata.appUploadId}. Skipping.`);
    return; // Don't upload empty parts
  }

  const partNumber = metadata.parts.length + 1;
  logger.info(`[S3 Adapter MPU] Uploading part ${partNumber} (${partBuffer.length} bytes) for ${metadata.appUploadId} (Key: ${metadata.s3Key})`);

  const uploadPartCmd = new UploadPartCommand({
    Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId,
    Body: partBuffer, PartNumber: partNumber, ContentLength: partBuffer.length
  });
  const partResponse = await s3Client.send(uploadPartCmd);
  metadata.parts.push({ PartNumber: partNumber, ETag: partResponse.ETag });

  // Reset buffer for next part
  await fs.writeFile(metadata.tempPartPath, '');
  metadata.currentPartBytes = 0;
  logger.info(`[S3 Adapter MPU] Part ${partNumber} uploaded for ${metadata.appUploadId}. ETag: ${partResponse.ETag}`);
}
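
// Why parts are buffered locally (general S3 multipart rule, not project documentation):
// every part except the last must be at least 5 MiB, so client chunks are accumulated in the
// .partbuffer file until S3_PART_SIZE is reached before UploadPart is issued. A minimal sketch
// of the constants this implies, assuming they are defined near the top of this file
// (the exact values here are illustrative, not taken from the source):
//
//   const S3_PART_SIZE = 5 * 1024 * 1024;                 // >= S3's 5 MiB minimum part size
//   const MIN_S3_TOTAL_SIZE_FOR_MULTIPART = S3_PART_SIZE; // smaller files use a single PUT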

async function storeChunk(appUploadId, chunk) {
  const chunkSize = chunk.length;
  if (!chunkSize) throw new Error('Empty chunk received for storeChunk');

  const metadata = await readUploadMetadata(appUploadId);
  if (!metadata) throw new Error(`Upload session ${appUploadId} not found or already completed.`);

  if (metadata.bytesReceived >= metadata.fileSize && metadata.fileSize > 0) {
    logger.warn(`[S3 Adapter] Chunk for already completed upload ${appUploadId}. Ignoring.`);
    return { bytesReceived: metadata.bytesReceived, progress: 100, completed: true };
  }

  // Buffer the chunk locally; parts only go to S3 once S3_PART_SIZE has accumulated.
  await fs.appendFile(metadata.tempPartPath, chunk);
  metadata.bytesReceived += chunkSize;
  metadata.currentPartBytes += chunkSize;
  metadata.lastActivity = Date.now();

  let justUploadedAPart = false;
  if (metadata.isMultipartUpload) {
    // Upload part if buffer is full or it's the last overall chunk for the file
    const isLastChunkOfFile = metadata.bytesReceived >= metadata.fileSize;
    if (metadata.currentPartBytes >= S3_PART_SIZE || (isLastChunkOfFile && metadata.currentPartBytes > 0)) {
      await _uploadBufferedPart(metadata);
      justUploadedAPart = true; // indicates that currentPartBytes was reset
    }
  }

  await writeUploadMetadata(appUploadId, metadata); // Persist bytesReceived, parts array, currentPartBytes

  const progress = metadata.fileSize === 0 ? 100 : Math.min(Math.round((metadata.bytesReceived / metadata.fileSize) * 100), 100);
  const completed = metadata.bytesReceived >= metadata.fileSize;

  logger.debug(`[S3 Adapter] Chunk stored for ${appUploadId}. Total ${metadata.bytesReceived}/${metadata.fileSize} (${progress}%). Part buffered: ${metadata.currentPartBytes}. Part uploaded: ${justUploadedAPart}`);
  if (completed) logger.info(`[S3 Adapter] All data for ${appUploadId} received locally. Ready for S3 finalization.`);

  return { bytesReceived: metadata.bytesReceived, progress, completed };
}

async function completeUpload(appUploadId) {
  const metadata = await readUploadMetadata(appUploadId);
  if (!metadata) throw new Error(`Cannot complete: Metadata for ${appUploadId} not found.`);

  if (metadata.bytesReceived < metadata.fileSize) {
    logger.error(`[S3 Adapter] FATAL: Attempt to complete ${appUploadId} with ${metadata.bytesReceived}/${metadata.fileSize} bytes.`);
    throw new Error(`Incomplete data for ${appUploadId} cannot be finalized.`);
  }

  try {
    if (metadata.isMultipartUpload) {
      // If there's any data left in the buffer for the last part, upload it
      if (metadata.currentPartBytes > 0) {
        logger.info(`[S3 Adapter MPU] Uploading final remaining buffered part for ${appUploadId}`);
        await _uploadBufferedPart(metadata);
        await writeUploadMetadata(appUploadId, metadata); // Persist the final parts list
      }
      // fileSize > 0 check because a 0-byte file could be MPU if MIN_S3_TOTAL_SIZE_FOR_MULTIPART is 0
      if (!metadata.parts || (metadata.parts.length === 0 && metadata.fileSize > 0)) {
        throw new Error(`No parts recorded for MPU ${appUploadId} of size ${metadata.fileSize}. Cannot complete.`);
      }
      const completeCmd = new CompleteMultipartUploadCommand({
        Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId,
        MultipartUpload: { Parts: metadata.parts },
      });
      const response = await s3Client.send(completeCmd);
      logger.success(`[S3 Adapter MPU] Finalized: ${metadata.s3Key} (ETag: ${response.ETag})`);
    } else {
      // Single PUT for small files
      const fileBuffer = await fs.readFile(metadata.tempPartPath);
      if (fileBuffer.length !== metadata.fileSize) {
        throw new Error(`Buffered size ${fileBuffer.length} != metadata size ${metadata.fileSize} for ${appUploadId}`);
      }
      logger.info(`[S3 Adapter SinglePut] Uploading ${metadata.s3Key} via single PutObject from buffer.`);
      await s3Client.send(new PutObjectCommand({
        Bucket: config.s3BucketName, Key: metadata.s3Key,
        Body: fileBuffer, ContentLength: metadata.fileSize
      }));
      logger.success(`[S3 Adapter SinglePut] Finalized: ${metadata.s3Key}`);
    }

    await fs.unlink(metadata.tempPartPath).catch(err => logger.warn(`[S3 Adapter] Failed to delete temp part buffer ${metadata.tempPartPath}: ${err.message}`));
    await deleteUploadMetadata(appUploadId);
    sendNotification(metadata.originalFilename, metadata.fileSize, config);
    return { filename: metadata.originalFilename, size: metadata.fileSize, finalPath: metadata.s3Key };

  } catch (err) {
    logger.error(`[S3 Adapter] Failed to complete S3 upload for ${appUploadId} (Key: ${metadata.s3Key}, MPU: ${metadata.isMultipartUpload}): ${err.message} ${err.name}`);
    if (err && err.$response) {
      const responseBody = err.$response.body ? await streamToString(err.$response.body) : "No body/streamed";
      console.error("[S3 Adapter] Raw Error $response:", { statusCode: err.$response.statusCode, headers: err.$response.headers, body: responseBody });
    } else {
      console.error("[S3 Adapter] Error object (no $response):", JSON.stringify(err, Object.getOwnPropertyNames(err), 2));
    }
    if (err.Code === 'NoSuchUpload' || err.name === 'NoSuchUpload') {
      logger.warn(`[S3 Adapter] NoSuchUpload on complete for ${appUploadId}. Assuming completed/aborted.`);
      await deleteUploadMetadata(appUploadId).catch(() => {});
      try {
        await s3Client.send(new HeadObjectCommand({ Bucket: config.s3BucketName, Key: metadata.s3Key }));
        logger.info(`[S3 Adapter] Final object ${metadata.s3Key} exists after NoSuchUpload. Treating as completed.`);
        return { filename: metadata.originalFilename, size: metadata.fileSize, finalPath: metadata.s3Key };
      } catch (headErr) {
        throw new Error('Completion failed: Session & final object not found.');
      }
    }
    if (metadata.isMultipartUpload && metadata.s3UploadId) {
      logger.warn(`[S3 Adapter MPU] Attempting to abort failed MPU ${metadata.s3UploadId}`);
      await s3Client.send(new AbortMultipartUploadCommand({ Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId }))
        .catch(abortErr => logger.error(`[S3 Adapter MPU] Failed to abort MPU after error: ${abortErr.message}`));
    }
    throw err;
  }
}

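// Minimal usage sketch (hypothetical caller, not the actual route code): the upload routes
// are assumed to drive the adapter roughly like this, streaming client chunks into storeChunk
// and finalizing once all bytes are reported received.
//
//   const { uploadId } = await initUpload('MyFolder/video.mp4', totalSize, batchId);
//   for await (const chunk of clientChunks) {
//     const { completed } = await storeChunk(uploadId, chunk);
//     if (completed) break;
//   }
//   await completeUpload(uploadId); // or abortUpload(uploadId) on client cancel
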
async function abortUpload(appUploadId) {
  const metadata = await readUploadMetadata(appUploadId);
  if (!metadata) {
    logger.warn(`[S3 Adapter] Abort: Metadata for ${appUploadId} not found.`);
    await deleteUploadMetadata(appUploadId); // ensure local meta is gone
    const tempPartPath = path.join(TEMP_CHUNK_DIR, `${appUploadId}.partbuffer`); // guess path
    await fs.unlink(tempPartPath).catch(() => {}); // try delete orphan buffer
    return;
  }

  if (metadata.tempPartPath) {
    await fs.unlink(metadata.tempPartPath)
      .then(() => logger.info(`[S3 Adapter] Deleted temp part buffer on abort: ${metadata.tempPartPath}`))
      .catch(err => { if (err.code !== 'ENOENT') logger.error(`[S3 Adapter] Err deleting temp buffer ${metadata.tempPartPath} on abort: ${err.message}`); });
  }

  if (metadata.isMultipartUpload && metadata.s3UploadId) {
    try {
      await s3Client.send(new AbortMultipartUploadCommand({
        Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId,
      }));
      logger.info(`[S3 Adapter MPU] Aborted S3 MPU: ${metadata.s3UploadId}`);
    } catch (err) {
      if (err.name !== 'NoSuchUpload') {
        logger.error(`[S3 Adapter MPU] Failed to abort S3 MPU ${metadata.s3UploadId}: ${err.message}`);
      } else {
        logger.warn(`[S3 Adapter MPU] S3 MPU ${metadata.s3UploadId} not found during abort (already gone).`);
      }
    }
  }
  logger.info(`[S3 Adapter] Aborted: ${appUploadId} (Key: ${metadata.s3Key})`);
  await deleteUploadMetadata(appUploadId);
}

async function listFiles() {
  try {
    let isTruncated = true; let continuationToken; const allFiles = [];
    while (isTruncated) {
      const params = { Bucket: config.s3BucketName };
      if (continuationToken) params.ContinuationToken = continuationToken;
      const response = await s3Client.send(new ListObjectsV2Command(params));
      (response.Contents || []).forEach(item => allFiles.push({
        filename: item.Key, size: item.Size,
        formattedSize: formatFileSize(item.Size), uploadDate: item.LastModified
      }));
      isTruncated = response.IsTruncated;
      continuationToken = response.NextContinuationToken;
    }
    allFiles.sort((a, b) => b.uploadDate.getTime() - a.uploadDate.getTime());
    return allFiles;
  } catch (err) {
    logger.error(`[S3 Adapter] Failed to list objects in ${config.s3BucketName}: ${err.message}`);
    throw err;
  }
}

async function getDownloadUrlOrStream(s3Key) {
  if (!s3Key || s3Key.includes('..') || s3Key.startsWith('/')) throw new Error('Invalid S3 key for download');
  try {
    const cmd = new GetObjectCommand({ Bucket: config.s3BucketName, Key: s3Key });
    const url = await getSignedUrl(s3Client, cmd, { expiresIn: 3600 });
    logger.info(`[S3 Adapter] Generated presigned URL for ${s3Key}`);
    return { type: 'url', value: url };
  } catch (err) {
    logger.error(`[S3 Adapter] Failed to get presigned URL for ${s3Key}: ${err.message}`);
    if (err.name === 'NoSuchKey') throw new Error('File not found in S3');
    throw err;
  }
}

async function deleteFile(s3Key) {
  if (!s3Key || s3Key.includes('..') || s3Key.startsWith('/')) throw new Error('Invalid S3 key for delete');
  try {
    await s3Client.send(new DeleteObjectCommand({ Bucket: config.s3BucketName, Key: s3Key }));
    logger.info(`[S3 Adapter] Deleted S3 object: ${s3Key}`);
  } catch (err) {
    logger.error(`[S3 Adapter] Failed to delete S3 object ${s3Key}: ${err.message}`);
    throw err;
  }
}

async function cleanupStale() {
  logger.info('[S3 Adapter] Cleaning stale local metadata & temp part buffers...');
  let cleanedMeta = 0, checkedMeta = 0, cleanedBuffers = 0;
  const now = Date.now();

  try { // Stale .meta files
    await ensureMetadataDirExists();
    const metaFiles = await fs.readdir(METADATA_DIR);
    for (const file of metaFiles) {
      if (file.endsWith('.meta')) {
        checkedMeta++;
        const appUploadId = file.replace('.meta', '');
        const metaFilePath = path.join(METADATA_DIR, file);
        try {
          const metadata = JSON.parse(await fs.readFile(metaFilePath, 'utf8'));
          if (now - (metadata.lastActivity || metadata.createdAt || 0) > UPLOAD_TIMEOUT) {
            logger.warn(`[S3 Adapter] Stale meta: ${file}. AppUploadId: ${appUploadId}. Cleaning.`);
            if (metadata.tempPartPath) {
              await fs.unlink(metadata.tempPartPath).then(() => cleanedBuffers++).catch(() => {});
            }
            if (metadata.isMultipartUpload && metadata.s3UploadId) {
              await s3Client.send(new AbortMultipartUploadCommand({ Bucket: config.s3BucketName, Key: metadata.s3Key, UploadId: metadata.s3UploadId }))
                .catch(e => { if (e.name !== 'NoSuchUpload') logger.error(`[S3 Adapter] Stale MPU abort fail: ${e.message}`); });
            }
            await fs.unlink(metaFilePath);
            cleanedMeta++;
          }
        } catch (e) {
          logger.error(`[S3 Adapter] Error processing stale meta ${file}: ${e.message}. Deleting.`);
          await fs.unlink(metaFilePath).catch(() => {});
        }
      } else if (file.endsWith('.tmp')) {
        const tmpP = path.join(METADATA_DIR, file);
        try {
          if (now - (await fs.stat(tmpP)).mtime.getTime() > UPLOAD_TIMEOUT) {
            logger.warn(`[S3 Adapter] Deleting stale tmp meta: ${file}`);
            await fs.unlink(tmpP);
          }
        } catch (e) { if (e.code !== 'ENOENT') logger.error(`[S3 Adapter] Error stat/unlink tmp meta ${tmpP}: ${e.message}`); }
      }
    }
  } catch (e) { if (e.code !== 'ENOENT') logger.error(`[S3 Adapter] Meta dir cleanup error: ${e.message}`); }

  try { // Orphaned .partbuffer files
    const bufferFiles = await fs.readdir(TEMP_CHUNK_DIR);
    for (const file of bufferFiles) {
      if (!file.endsWith('.partbuffer')) continue;
      const bufferFilePath = path.join(TEMP_CHUNK_DIR, file);
      const stats = await fs.stat(bufferFilePath);
      if (now - stats.mtime.getTime() > UPLOAD_TIMEOUT) {
        logger.warn(`[S3 Adapter] Stale orphaned buffer: ${file}. Deleting.`);
        await fs.unlink(bufferFilePath);
        cleanedBuffers++;
      }
    }
  } catch (e) { if (e.code !== 'ENOENT') logger.error(`[S3 Adapter] Temp buffer dir cleanup error: ${e.message}`); }

  if (checkedMeta || cleanedMeta || cleanedBuffers) {
    logger.info(`[S3 Adapter] Local cleanup: MetaChecked: ${checkedMeta}, MetaCleaned: ${cleanedMeta}, BuffersCleaned: ${cleanedBuffers}.`);
  }

  // Basic batchS3PrefixMappings cleanup
  if (batchS3PrefixMappings.size > 1000) {
    logger.warn(`[S3 Adapter] Clearing batchS3PrefixMappings (size: ${batchS3PrefixMappings.size}).`);
    batchS3PrefixMappings.clear();
  }
  logger.warn(`[S3 Adapter] Reminder: Configure S3 Lifecycle Rules on bucket '${config.s3BucketName}' for S3-side MPU cleanup.`);
}

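// The reminder above points at bucket-side cleanup. A lifecycle rule along these lines
// (example values; the rule ID and day count are assumptions to adjust per deployment)
// aborts incomplete multipart uploads that this adapter could not clean up itself:
//
//   {
//     "Rules": [{
//       "ID": "abort-incomplete-mpu",
//       "Status": "Enabled",
//       "Filter": { "Prefix": "" },
//       "AbortIncompleteMultipartUpload": { "DaysAfterInitiation": 7 }
//     }]
//   }
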
module.exports = {