Mirror of https://github.com/9technologygroup/patchmon.net.git
Synced 2025-11-13 02:17:05 +00:00

Merge remote 1-3-3 branch with local changes
@@ -242,6 +242,30 @@ router.post(
	},
);

// Trigger manual system statistics collection
router.post(
	"/trigger/system-statistics",
	authenticateToken,
	async (_req, res) => {
		try {
			const job = await queueManager.triggerSystemStatistics();
			res.json({
				success: true,
				data: {
					jobId: job.id,
					message: "System statistics collection triggered successfully",
				},
			});
		} catch (error) {
			console.error("Error triggering system statistics collection:", error);
			res.status(500).json({
				success: false,
				error: "Failed to trigger system statistics collection",
			});
		}
	},
);
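For reference, a minimal sketch of calling the new trigger endpoint from a script. The /api/v1/automation mount prefix, base URL, and token are assumptions for illustration, not part of this commit:

// Sketch: manually trigger system statistics collection (placeholder values).
const BASE_URL = "http://localhost:3001"; // assumed server address
const TOKEN = "<jwt-from-login>"; // assumed auth token

const res = await fetch(`${BASE_URL}/api/v1/automation/trigger/system-statistics`, {
	method: "POST",
	headers: { Authorization: `Bearer ${TOKEN}` },
});
const body = await res.json();
console.log(body.data?.jobId, body.data?.message);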

// Get queue health status
router.get("/health", authenticateToken, async (_req, res) => {
	try {
@@ -300,6 +324,7 @@ router.get("/overview", authenticateToken, async (_req, res) => {
			queueManager.getRecentJobs(QUEUE_NAMES.ORPHANED_PACKAGE_CLEANUP, 1),
			queueManager.getRecentJobs(QUEUE_NAMES.DOCKER_INVENTORY_CLEANUP, 1),
			queueManager.getRecentJobs(QUEUE_NAMES.AGENT_COMMANDS, 1),
			queueManager.getRecentJobs(QUEUE_NAMES.SYSTEM_STATISTICS, 1),
		]);

		// Calculate overview metrics
@@ -309,21 +334,24 @@ router.get("/overview", authenticateToken, async (_req, res) => {
				stats[QUEUE_NAMES.SESSION_CLEANUP].delayed +
				stats[QUEUE_NAMES.ORPHANED_REPO_CLEANUP].delayed +
				stats[QUEUE_NAMES.ORPHANED_PACKAGE_CLEANUP].delayed +
				stats[QUEUE_NAMES.DOCKER_INVENTORY_CLEANUP].delayed,
				stats[QUEUE_NAMES.DOCKER_INVENTORY_CLEANUP].delayed +
				stats[QUEUE_NAMES.SYSTEM_STATISTICS].delayed,

			runningTasks:
				stats[QUEUE_NAMES.GITHUB_UPDATE_CHECK].active +
				stats[QUEUE_NAMES.SESSION_CLEANUP].active +
				stats[QUEUE_NAMES.ORPHANED_REPO_CLEANUP].active +
				stats[QUEUE_NAMES.ORPHANED_PACKAGE_CLEANUP].active +
				stats[QUEUE_NAMES.DOCKER_INVENTORY_CLEANUP].active,
				stats[QUEUE_NAMES.DOCKER_INVENTORY_CLEANUP].active +
				stats[QUEUE_NAMES.SYSTEM_STATISTICS].active,

			failedTasks:
				stats[QUEUE_NAMES.GITHUB_UPDATE_CHECK].failed +
				stats[QUEUE_NAMES.SESSION_CLEANUP].failed +
				stats[QUEUE_NAMES.ORPHANED_REPO_CLEANUP].failed +
				stats[QUEUE_NAMES.ORPHANED_PACKAGE_CLEANUP].failed +
				stats[QUEUE_NAMES.DOCKER_INVENTORY_CLEANUP].failed,
				stats[QUEUE_NAMES.DOCKER_INVENTORY_CLEANUP].failed +
				stats[QUEUE_NAMES.SYSTEM_STATISTICS].failed,

			totalAutomations: Object.values(stats).reduce((sum, queueStats) => {
				return (
@@ -435,6 +463,22 @@ router.get("/overview", authenticateToken, async (_req, res) => {
					: "Never run",
				stats: stats[QUEUE_NAMES.AGENT_COMMANDS],
			},
			{
				name: "System Statistics Collection",
				queue: QUEUE_NAMES.SYSTEM_STATISTICS,
				description: "Collects aggregated system-wide package statistics",
				schedule: "Every 30 minutes",
				lastRun: recentJobs[6][0]?.finishedOn
					? new Date(recentJobs[6][0].finishedOn).toLocaleString()
					: "Never",
				lastRunTimestamp: recentJobs[6][0]?.finishedOn || 0,
				status: recentJobs[6][0]?.failedReason
					? "Failed"
					: recentJobs[6][0]
						? "Success"
						: "Never run",
				stats: stats[QUEUE_NAMES.SYSTEM_STATISTICS],
			},
		].sort((a, b) => {
			// Sort by last run timestamp (most recent first)
			// If both have never run (timestamp 0), maintain original order

@@ -564,174 +564,216 @@ router.get(
			const startDate = new Date();
			startDate.setDate(endDate.getDate() - daysInt);

			// Build where clause
			const whereClause = {
				timestamp: {
					gte: startDate,
					lte: endDate,
				},
			};

			// Add host filter if specified
			if (hostId && hostId !== "all" && hostId !== "undefined") {
				whereClause.host_id = hostId;
			}

			// Get all update history records in the date range
			const trendsData = await prisma.update_history.findMany({
				where: whereClause,
				select: {
					timestamp: true,
					packages_count: true,
					security_count: true,
					total_packages: true,
					host_id: true,
					status: true,
				},
				orderBy: {
					timestamp: "asc",
				},
			});

			// Enhanced data validation and processing
			const processedData = trendsData
				.filter((record) => {
					// Enhanced validation
					return (
						record.total_packages !== null &&
						record.total_packages >= 0 &&
						record.packages_count >= 0 &&
						record.security_count >= 0 &&
						record.security_count <= record.packages_count && // Security can't exceed outdated
						record.status === "success"
					); // Only include successful reports
				})
				.map((record) => {
					const date = new Date(record.timestamp);
					let timeKey;

					if (daysInt <= 1) {
						// For hourly view, group by hour only (not minutes)
						timeKey = date.toISOString().substring(0, 13); // YYYY-MM-DDTHH
					} else {
						// For daily view, group by day
						timeKey = date.toISOString().split("T")[0]; // YYYY-MM-DD
					}

					return {
						timeKey,
						total_packages: record.total_packages,
						packages_count: record.packages_count || 0,
						security_count: record.security_count || 0,
						host_id: record.host_id,
						timestamp: record.timestamp,
					};
				});

			// Determine if we need aggregation based on host filter
			const needsAggregation =
				!hostId || hostId === "all" || hostId === "undefined";

			let trendsData;

			if (needsAggregation) {
				// For "All Hosts" mode, use system_statistics table
				trendsData = await prisma.system_statistics.findMany({
					where: {
						timestamp: {
							gte: startDate,
							lte: endDate,
						},
					},
					select: {
						timestamp: true,
						unique_packages_count: true,
						unique_security_count: true,
						total_packages: true,
						total_hosts: true,
						hosts_needing_updates: true,
					},
					orderBy: {
						timestamp: "asc",
					},
				});
			} else {
				// For individual host, use update_history table
				trendsData = await prisma.update_history.findMany({
					where: {
						host_id: hostId,
						timestamp: {
							gte: startDate,
							lte: endDate,
						},
					},
					select: {
						timestamp: true,
						packages_count: true,
						security_count: true,
						total_packages: true,
						host_id: true,
						status: true,
					},
					orderBy: {
						timestamp: "asc",
					},
				});
			}

			// Process data based on source
			let processedData;
			let aggregatedArray;

			if (needsAggregation) {
				// For "All Hosts" mode, we need to calculate the actual total packages differently
				// Instead of aggregating historical data (which is per-host), we'll use the current total
				// and show that as a flat line, since total packages don't change much over time
				// For "All Hosts" mode, data comes from system_statistics table
				// Already aggregated, just need to format it
				processedData = trendsData
					.filter((record) => {
						// Enhanced validation
						return (
							record.total_packages !== null &&
							record.total_packages >= 0 &&
							record.unique_packages_count >= 0 &&
							record.unique_security_count >= 0 &&
							record.unique_security_count <= record.unique_packages_count
						);
					})
					.map((record) => {
						const date = new Date(record.timestamp);
						let timeKey;

				// Get the current total packages count (unique packages across all hosts)
				const currentTotalPackages = await prisma.packages.count({
					where: {
						host_packages: {
							some: {}, // At least one host has this package
						},
					},
				});
						if (daysInt <= 1) {
							// For "Last 24 hours", use full timestamp for each data point
							// This allows plotting all individual data points
							timeKey = date.toISOString(); // Full ISO timestamp
						} else {
							// For daily view, group by day
							timeKey = date.toISOString().split("T")[0]; // YYYY-MM-DD
						}

				// Aggregate data by timeKey when looking at "All Hosts" or no specific host
				const aggregatedData = processedData.reduce((acc, item) => {
					if (!acc[item.timeKey]) {
						acc[item.timeKey] = {
							timeKey: item.timeKey,
							total_packages: currentTotalPackages, // Use current total packages
							packages_count: 0,
							security_count: 0,
							record_count: 0,
							host_ids: new Set(),
							min_timestamp: item.timestamp,
							max_timestamp: item.timestamp,
						return {
							timeKey,
							total_packages: record.total_packages,
							packages_count: record.unique_packages_count,
							security_count: record.unique_security_count,
							timestamp: record.timestamp,
						};
					}
				});

					// For outdated and security packages: SUM (these represent counts across hosts)
					acc[item.timeKey].packages_count += item.packages_count;
					acc[item.timeKey].security_count += item.security_count;
				if (daysInt <= 1) {
					// For "Last 24 hours", use all individual data points without grouping
					// Sort by timestamp
					aggregatedArray = processedData.sort(
						(a, b) => a.timestamp.getTime() - b.timestamp.getTime(),
					);
				} else {
					// For longer periods, group by timeKey and take the latest value for each period
					const aggregatedData = processedData.reduce((acc, item) => {
						if (
							!acc[item.timeKey] ||
							item.timestamp > acc[item.timeKey].timestamp
						) {
							acc[item.timeKey] = item;
						}
						return acc;
					}, {});

					acc[item.timeKey].record_count += 1;
					acc[item.timeKey].host_ids.add(item.host_id);

					// Track timestamp range
					if (item.timestamp < acc[item.timeKey].min_timestamp) {
						acc[item.timeKey].min_timestamp = item.timestamp;
					}
					if (item.timestamp > acc[item.timeKey].max_timestamp) {
						acc[item.timeKey].max_timestamp = item.timestamp;
					}

					return acc;
				}, {});

				// Convert to array and add metadata
				aggregatedArray = Object.values(aggregatedData)
					.map((item) => ({
						...item,
						host_count: item.host_ids.size,
						host_ids: Array.from(item.host_ids),
					}))
					.sort((a, b) => a.timeKey.localeCompare(b.timeKey));
					// Convert to array and sort
					aggregatedArray = Object.values(aggregatedData).sort((a, b) =>
						a.timeKey.localeCompare(b.timeKey),
					);
				}
			} else {
				// For specific host, show individual data points without aggregation
				// But still group by timeKey to handle multiple reports from same host in same time period
				const hostAggregatedData = processedData.reduce((acc, item) => {
					if (!acc[item.timeKey]) {
						acc[item.timeKey] = {
							timeKey: item.timeKey,
							total_packages: 0,
							packages_count: 0,
							security_count: 0,
							record_count: 0,
							host_ids: new Set([item.host_id]),
							min_timestamp: item.timestamp,
							max_timestamp: item.timestamp,
				// For individual host, data comes from update_history table
				processedData = trendsData
					.filter((record) => {
						// Enhanced validation
						return (
							record.total_packages !== null &&
							record.total_packages >= 0 &&
							record.packages_count >= 0 &&
							record.security_count >= 0 &&
							record.security_count <= record.packages_count &&
							record.status === "success"
						);
					})
					.map((record) => {
						const date = new Date(record.timestamp);
						let timeKey;

						if (daysInt <= 1) {
							// For "Last 24 hours", use full timestamp for each data point
							// This allows plotting all individual data points
							timeKey = date.toISOString(); // Full ISO timestamp
						} else {
							// For daily view, group by day
							timeKey = date.toISOString().split("T")[0]; // YYYY-MM-DD
						}

						return {
							timeKey,
							total_packages: record.total_packages,
							packages_count: record.packages_count || 0,
							security_count: record.security_count || 0,
							host_id: record.host_id,
							timestamp: record.timestamp,
						};
					}
				});

					// For same host, take the latest values (not sum)
					// This handles cases where a host reports multiple times in the same time period
					if (item.timestamp > acc[item.timeKey].max_timestamp) {
						acc[item.timeKey].total_packages = item.total_packages;
						acc[item.timeKey].packages_count = item.packages_count;
						acc[item.timeKey].security_count = item.security_count;
						acc[item.timeKey].max_timestamp = item.timestamp;
					}
				if (daysInt <= 1) {
					// For "Last 24 hours", use all individual data points without grouping
					// Sort by timestamp
					aggregatedArray = processedData.sort(
						(a, b) => a.timestamp.getTime() - b.timestamp.getTime(),
					);
				} else {
					// For longer periods, group by timeKey to handle multiple reports from same host in same time period
					const hostAggregatedData = processedData.reduce((acc, item) => {
						if (!acc[item.timeKey]) {
							acc[item.timeKey] = {
								timeKey: item.timeKey,
								total_packages: 0,
								packages_count: 0,
								security_count: 0,
								record_count: 0,
								host_ids: new Set([item.host_id]),
								min_timestamp: item.timestamp,
								max_timestamp: item.timestamp,
							};
						}

						acc[item.timeKey].record_count += 1;
						// For same host, take the latest values (not sum)
						// This handles cases where a host reports multiple times in the same time period
						if (item.timestamp > acc[item.timeKey].max_timestamp) {
							acc[item.timeKey].total_packages = item.total_packages;
							acc[item.timeKey].packages_count = item.packages_count;
							acc[item.timeKey].security_count = item.security_count;
							acc[item.timeKey].max_timestamp = item.timestamp;
						}

						return acc;
					}, {});
					acc[item.timeKey].record_count += 1;

					// Convert to array
					aggregatedArray = Object.values(hostAggregatedData)
						.map((item) => ({
							...item,
							host_count: item.host_ids.size,
							host_ids: Array.from(item.host_ids),
						}))
						.sort((a, b) => a.timeKey.localeCompare(b.timeKey));
					return acc;
				}, {});

				// Convert to array
				aggregatedArray = Object.values(hostAggregatedData)
					.map((item) => ({
						...item,
						host_count: item.host_ids.size,
						host_ids: Array.from(item.host_ids),
					}))
					.sort((a, b) => a.timeKey.localeCompare(b.timeKey));
				}
			}

			// Handle sparse data by filling missing time periods
			const fillMissingPeriods = (data, daysInt) => {
				if (data.length === 0) {
					return [];
				}

				// For "Last 24 hours", return data as-is without filling gaps
				// This allows plotting all individual data points
				if (daysInt <= 1) {
					return data;
				}

				const filledData = [];
				const startDate = new Date();
				startDate.setDate(startDate.getDate() - daysInt);
@@ -741,50 +783,58 @@ router.get(
				const endDate = new Date();
				const currentDate = new Date(startDate);

				// Find the last known values for interpolation
				// Sort data by timeKey to get chronological order
				const sortedData = [...data].sort((a, b) =>
					a.timeKey.localeCompare(b.timeKey),
				);

				// Find the first actual data point (don't fill before this)
				const firstDataPoint = sortedData[0];
				const firstDataTimeKey = firstDataPoint?.timeKey;

				// Track last known values as we iterate forward
				let lastKnownValues = null;
				if (data.length > 0) {
					lastKnownValues = {
						total_packages: data[0].total_packages,
						packages_count: data[0].packages_count,
						security_count: data[0].security_count,
					};
				}
				let hasSeenFirstDataPoint = false;

				while (currentDate <= endDate) {
					let timeKey;
					if (daysInt <= 1) {
						timeKey = currentDate.toISOString().substring(0, 13); // Hourly
						currentDate.setHours(currentDate.getHours() + 1);
					} else {
						timeKey = currentDate.toISOString().split("T")[0]; // Daily
						currentDate.setDate(currentDate.getDate() + 1);
						// For daily view, group by day
						timeKey = currentDate.toISOString().split("T")[0]; // YYYY-MM-DD
						currentDate.setDate(currentDate.getDate() + 1);

					// Skip periods before the first actual data point
					if (firstDataTimeKey && timeKey < firstDataTimeKey) {
						continue;
					}

					if (dataMap.has(timeKey)) {
						const item = dataMap.get(timeKey);
						filledData.push(item);
						// Update last known values
						// Update last known values with actual data
						lastKnownValues = {
							total_packages: item.total_packages,
							packages_count: item.packages_count,
							security_count: item.security_count,
							total_packages: item.total_packages || 0,
							packages_count: item.packages_count || 0,
							security_count: item.security_count || 0,
						};
						hasSeenFirstDataPoint = true;
					} else {
						// For missing periods, use the last known values (interpolation)
						// This creates a continuous line instead of gaps
						filledData.push({
							timeKey,
							total_packages: lastKnownValues?.total_packages || 0,
							packages_count: lastKnownValues?.packages_count || 0,
							security_count: lastKnownValues?.security_count || 0,
							record_count: 0,
							host_count: 0,
							host_ids: [],
							min_timestamp: null,
							max_timestamp: null,
							isInterpolated: true, // Mark as interpolated for debugging
						});
						// For missing periods AFTER the first data point, use forward-fill
						// Only fill if we have a last known value and we've seen the first data point
						if (lastKnownValues !== null && hasSeenFirstDataPoint) {
							filledData.push({
								timeKey,
								total_packages: lastKnownValues.total_packages,
								packages_count: lastKnownValues.packages_count,
								security_count: lastKnownValues.security_count,
								record_count: 0,
								host_count: 0,
								host_ids: [],
								min_timestamp: null,
								max_timestamp: null,
								isInterpolated: true, // Mark as interpolated for debugging
							});
						}
						// If we haven't seen the first data point yet, skip this period
					}
				}
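The loop above interleaves the old gap handling with the new forward-fill. The core idea in isolation, as a minimal standalone sketch (function and variable names invented for illustration, daily keys only):

// Sketch: forward-fill missing daily periods after the first real data point.
function forwardFill(points, startDate, endDate) {
	const byKey = new Map(points.map((p) => [p.timeKey, p]));
	const filled = [];
	let last = null;
	const d = new Date(startDate);
	while (d <= endDate) {
		const key = d.toISOString().split("T")[0];
		d.setDate(d.getDate() + 1);
		if (byKey.has(key)) {
			last = byKey.get(key); // real data point: emit and remember it
			filled.push(last);
		} else if (last) {
			// gap after the first data point: repeat the last known values
			filled.push({ ...last, timeKey: key, isInterpolated: true });
		}
		// gaps before the first data point are skipped entirely
	}
	return filled;
}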

@@ -810,7 +860,7 @@ router.get(
			// Get current package state for offline fallback
			let currentPackageState = null;
			if (hostId && hostId !== "all" && hostId !== "undefined") {
				// Get current package counts for specific host
				// For individual host, get current package counts from host_packages
				const currentState = await prisma.host_packages.aggregate({
					where: {
						host_id: hostId,
@@ -841,34 +891,64 @@ router.get(
					security_count: securityCount,
				};
			} else {
				// Get current package counts for all hosts
				// Total packages = count of unique packages installed on at least one host
				const totalPackagesCount = await prisma.packages.count({
					where: {
						host_packages: {
							some: {}, // At least one host has this package
				// For "All Hosts" mode, use the latest system_statistics record if available
				// Otherwise calculate from database
				const latestStats = await prisma.system_statistics.findFirst({
					orderBy: {
						timestamp: "desc",
					},
					select: {
						total_packages: true,
						unique_packages_count: true,
						unique_security_count: true,
						timestamp: true,
					},
				});

				if (latestStats) {
					// Use latest system statistics (collected by scheduled job)
					currentPackageState = {
						total_packages: latestStats.total_packages,
						packages_count: latestStats.unique_packages_count,
						security_count: latestStats.unique_security_count,
					};
				} else {
					// Fallback: calculate from database if no statistics collected yet
					const totalPackagesCount = await prisma.packages.count({
						where: {
							host_packages: {
								some: {}, // At least one host has this package
							},
						},
					},
				});
					});

				// Get counts for boolean fields separately
				const outdatedCount = await prisma.host_packages.count({
					where: {
						needs_update: true,
					},
				});
					const uniqueOutdatedCount = await prisma.packages.count({
						where: {
							host_packages: {
								some: {
									needs_update: true,
								},
							},
						},
					});

				const securityCount = await prisma.host_packages.count({
					where: {
						is_security_update: true,
					},
				});
					const uniqueSecurityCount = await prisma.packages.count({
						where: {
							host_packages: {
								some: {
									needs_update: true,
									is_security_update: true,
								},
							},
						},
					});

				currentPackageState = {
					total_packages: totalPackagesCount,
					packages_count: outdatedCount,
					security_count: securityCount,
				};
					currentPackageState = {
						total_packages: totalPackagesCount,
						packages_count: uniqueOutdatedCount,
						security_count: uniqueSecurityCount,
					};
				}
			}

			// Format data for chart
@@ -923,6 +1003,11 @@ router.get(
				chartData.datasets[2].data.push(item.security_count);
			});

			// Replace the last label with "Now" to indicate current state
			if (chartData.labels.length > 0) {
				chartData.labels[chartData.labels.length - 1] = "Now";
			}

			// Calculate data quality metrics
			const dataQuality = {
				totalRecords: trendsData.length,

@@ -2,6 +2,7 @@ const express = require("express");
const { authenticateToken } = require("../middleware/auth");
const { getPrismaClient } = require("../config/prisma");
const { v4: uuidv4 } = require("uuid");
const { get_current_time, parse_date } = require("../utils/timezone");

const prisma = getPrismaClient();
const router = express.Router();
@@ -537,14 +538,7 @@ router.post("/collect", async (req, res) => {
			return res.status(401).json({ error: "Invalid API credentials" });
		}

		const now = new Date();

		// Helper function to validate and parse dates
		const parseDate = (dateString) => {
			if (!dateString) return now;
			const date = new Date(dateString);
			return Number.isNaN(date.getTime()) ? now : date;
		};
		const now = get_current_time();
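The shared timezone helpers that replace the inline parseDate are not shown in this diff; judging from the call sites (parse_date(value, now) and parse_date(value, null)), their shape is presumably along these lines — a sketch, not the actual utils/timezone source:

// Presumed shape of ../utils/timezone (illustrative sketch only).
function get_current_time() {
	// the real implementation may apply a configured timezone
	return new Date();
}

function parse_date(dateString, fallback) {
	if (!dateString) return fallback;
	const date = new Date(dateString);
	return Number.isNaN(date.getTime()) ? fallback : date;
}

module.exports = { get_current_time, parse_date };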

		// Process containers
		if (containers && Array.isArray(containers)) {
@@ -572,7 +566,7 @@ router.post("/collect", async (req, res) => {
							tag: containerData.image_tag,
							image_id: containerData.image_id || "unknown",
							source: containerData.image_source || "docker-hub",
							created_at: parseDate(containerData.created_at),
							created_at: parse_date(containerData.created_at, now),
							last_checked: now,
							updated_at: now,
						},
@@ -597,7 +591,7 @@ router.post("/collect", async (req, res) => {
						state: containerData.state,
						ports: containerData.ports || null,
						started_at: containerData.started_at
							? parseDate(containerData.started_at)
							? parse_date(containerData.started_at, null)
							: null,
						updated_at: now,
						last_checked: now,
@@ -613,9 +607,9 @@ router.post("/collect", async (req, res) => {
						status: containerData.status,
						state: containerData.state,
						ports: containerData.ports || null,
						created_at: parseDate(containerData.created_at),
						created_at: parse_date(containerData.created_at, now),
						started_at: containerData.started_at
							? parseDate(containerData.started_at)
							? parse_date(containerData.started_at, null)
							: null,
						updated_at: now,
					},
@@ -651,7 +645,7 @@ router.post("/collect", async (req, res) => {
							? BigInt(imageData.size_bytes)
							: null,
						source: imageData.source || "docker-hub",
						created_at: parseDate(imageData.created_at),
						created_at: parse_date(imageData.created_at, now),
						updated_at: now,
					},
				});
@@ -780,14 +774,7 @@ router.post("/../integrations/docker", async (req, res) => {
			`[Docker Integration] Processing for host: ${host.friendly_name}`,
		);

		const now = new Date();

		// Helper function to validate and parse dates
		const parseDate = (dateString) => {
			if (!dateString) return now;
			const date = new Date(dateString);
			return Number.isNaN(date.getTime()) ? now : date;
		};
		const now = get_current_time();

		let containersProcessed = 0;
		let imagesProcessed = 0;
@@ -822,7 +809,7 @@ router.post("/../integrations/docker", async (req, res) => {
							tag: containerData.image_tag,
							image_id: containerData.image_id || "unknown",
							source: containerData.image_source || "docker-hub",
							created_at: parseDate(containerData.created_at),
							created_at: parse_date(containerData.created_at, now),
							last_checked: now,
							updated_at: now,
						},
@@ -847,7 +834,7 @@ router.post("/../integrations/docker", async (req, res) => {
						state: containerData.state || containerData.status,
						ports: containerData.ports || null,
						started_at: containerData.started_at
							? parseDate(containerData.started_at)
							? parse_date(containerData.started_at, null)
							: null,
						updated_at: now,
						last_checked: now,
@@ -863,9 +850,9 @@ router.post("/../integrations/docker", async (req, res) => {
						status: containerData.status,
						state: containerData.state || containerData.status,
						ports: containerData.ports || null,
						created_at: parseDate(containerData.created_at),
						created_at: parse_date(containerData.created_at, now),
						started_at: containerData.started_at
							? parseDate(containerData.started_at)
							? parse_date(containerData.started_at, null)
							: null,
						updated_at: now,
					},
@@ -911,7 +898,7 @@ router.post("/../integrations/docker", async (req, res) => {
							? BigInt(imageData.size_bytes)
							: null,
						source: imageSource,
						created_at: parseDate(imageData.created_at),
						created_at: parse_date(imageData.created_at, now),
						last_checked: now,
						updated_at: now,
					},

@@ -11,10 +11,16 @@ const {
	requireManageSettings,
} = require("../middleware/permissions");
const { queueManager, QUEUE_NAMES } = require("../services/automation");
const { pushIntegrationToggle, isConnected } = require("../services/agentWs");
const agentVersionService = require("../services/agentVersionService");

const router = express.Router();
const prisma = getPrismaClient();

// In-memory cache for integration states (api_id -> { integration_name -> enabled })
// This stores the last known state from successful toggles
const integrationStateCache = new Map();

// Secure endpoint to download the agent script/binary (requires API authentication)
router.get("/agent/download", async (req, res) => {
	try {
@@ -128,9 +134,6 @@ router.get("/agent/version", async (req, res) => {
	try {
		const fs = require("node:fs");
		const path = require("node:path");
		const { exec } = require("node:child_process");
		const { promisify } = require("node:util");
		const execAsync = promisify(exec);

		// Get architecture parameter (default to amd64 for Go agents)
		const architecture = req.query.arch || "amd64";
@@ -165,53 +168,108 @@ router.get("/agent/version", async (req, res) => {
				minServerVersion: null,
			});
		} else {
			// Go agent version check (binary)
			const binaryName = `patchmon-agent-linux-${architecture}`;
			const binaryPath = path.join(__dirname, "../../../agents", binaryName);
			// Go agent version check
			// Detect server architecture and map to Go architecture names
			const os = require("node:os");
			const { exec } = require("node:child_process");
			const { promisify } = require("node:util");
			const execAsync = promisify(exec);

			if (!fs.existsSync(binaryPath)) {
				return res.status(404).json({
					error: `Go agent binary not found for architecture: ${architecture}`,
				});
			const serverArch = os.arch();
			// Map Node.js architecture to Go architecture names
			const archMap = {
				x64: "amd64",
				ia32: "386",
				arm64: "arm64",
				arm: "arm",
			};
			const serverGoArch = archMap[serverArch] || serverArch;
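			// (For example, on an Intel/AMD server os.arch() returns "x64", which maps
			// to the Go build name "amd64"; values missing from the map pass through.)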

			// If requested architecture matches server architecture, execute the binary
			if (architecture === serverGoArch) {
				const binaryName = `patchmon-agent-linux-${architecture}`;
				const binaryPath = path.join(__dirname, "../../../agents", binaryName);

				if (!fs.existsSync(binaryPath)) {
					// Binary doesn't exist, fall back to GitHub
					console.log(`Binary ${binaryName} not found, falling back to GitHub`);
				} else {
					// Execute the binary to get its version
					try {
						const { stdout } = await execAsync(`${binaryPath} --help`, {
							timeout: 10000,
						});

						// Parse version from help output (e.g., "PatchMon Agent v1.3.1")
						const versionMatch = stdout.match(
							/PatchMon Agent v([0-9]+\.[0-9]+\.[0-9]+)/i,
						);

						if (versionMatch) {
							const serverVersion = versionMatch[1];
							const agentVersion = req.query.currentVersion || serverVersion;

							// Simple version comparison (assuming semantic versioning)
							const hasUpdate = agentVersion !== serverVersion;

							return res.json({
								currentVersion: agentVersion,
								latestVersion: serverVersion,
								hasUpdate: hasUpdate,
								downloadUrl: `/api/v1/hosts/agent/download?arch=${architecture}`,
								releaseNotes: `PatchMon Agent v${serverVersion}`,
								minServerVersion: null,
								architecture: architecture,
								agentType: "go",
							});
						}
					} catch (execError) {
						// Execution failed, fall back to GitHub
						console.log(
							`Failed to execute binary ${binaryName}: ${execError.message}, falling back to GitHub`,
						);
					}
				}
			}

			// Execute the binary to get its version
			// Fall back to GitHub if architecture doesn't match or binary execution failed
			try {
				const { stdout } = await execAsync(`${binaryPath} --help`, {
					timeout: 10000,
				});
				const versionInfo = await agentVersionService.getVersionInfo();
				const latestVersion = versionInfo.latestVersion;
				const agentVersion =
					req.query.currentVersion || latestVersion || "unknown";

				// Parse version from help output (e.g., "PatchMon Agent v1.3.1")
				const versionMatch = stdout.match(
					/PatchMon Agent v([0-9]+\.[0-9]+\.[0-9]+)/i,
				);

				if (!versionMatch) {
					return res.status(500).json({
						error: "Could not extract version from agent binary",
				if (!latestVersion) {
					return res.status(503).json({
						error: "Unable to determine latest version from GitHub releases",
						currentVersion: agentVersion,
						latestVersion: null,
						hasUpdate: false,
					});
				}

				const serverVersion = versionMatch[1];
				const agentVersion = req.query.currentVersion || serverVersion;

				// Simple version comparison (assuming semantic versioning)
				const hasUpdate = agentVersion !== serverVersion;
				const hasUpdate =
					agentVersion !== latestVersion && latestVersion !== null;

				res.json({
					currentVersion: agentVersion,
					latestVersion: serverVersion,
					latestVersion: latestVersion,
					hasUpdate: hasUpdate,
					downloadUrl: `/api/v1/hosts/agent/download?arch=${architecture}`,
					releaseNotes: `PatchMon Agent v${serverVersion}`,
					releaseNotes: `PatchMon Agent v${latestVersion}`,
					minServerVersion: null,
					architecture: architecture,
					agentType: "go",
				});
			} catch (execError) {
				console.error("Failed to execute agent binary:", execError.message);
			} catch (serviceError) {
				console.error(
					"Failed to get version from agentVersionService:",
					serviceError.message,
				);
				return res.status(500).json({
					error: "Failed to get version from agent binary",
					error: "Failed to get agent version from service",
					details: serviceError.message,
				});
			}
		}
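Note that the hasUpdate check above flags any version mismatch, so an agent ahead of the latest release would also report an update. A numeric segment-by-segment comparison — a sketch, not part of this commit — would look like:

// Sketch: report an update only when the current version is actually older.
function isOlder(current, latest) {
	const a = String(current).split(".").map(Number);
	const b = String(latest).split(".").map(Number);
	for (let i = 0; i < Math.max(a.length, b.length); i++) {
		const diff = (a[i] || 0) - (b[i] || 0);
		if (diff !== 0) return diff < 0;
	}
	return false; // equal versions: no update
}
// const hasUpdate = latestVersion !== null && isOlder(agentVersion, latestVersion);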

@@ -1616,10 +1674,14 @@ router.get("/install", async (req, res) => {
	// Check for --force parameter
	const forceInstall = req.query.force === "true" || req.query.force === "1";

	// Get architecture parameter (default to amd64)
	const architecture = req.query.arch || "amd64";
	// Get architecture parameter (only set if explicitly provided, otherwise let script auto-detect)
	const architecture = req.query.arch;

	// Inject the API credentials, server URL, curl flags, SSL verify flag, force flag, and architecture into the script
	// Only set ARCHITECTURE if explicitly provided, otherwise let the script auto-detect
	const archExport = architecture
		? `export ARCHITECTURE="${architecture}"\n`
		: "";
	const envVars = `#!/bin/bash
export PATCHMON_URL="${serverUrl}"
export API_ID="${host.api_id}"
@@ -1627,8 +1689,7 @@ export API_KEY="${host.api_key}"
export CURL_FLAGS="${curlFlags}"
export SKIP_SSL_VERIFY="${skipSSLVerify}"
export FORCE_INSTALL="${forceInstall ? "true" : "false"}"
export ARCHITECTURE="${architecture}"

${archExport}
`;

	// Remove the shebang from the original script and prepend our env vars
@@ -2103,4 +2164,137 @@ router.patch(
	},
);

// Get integration status for a host
router.get(
	"/:hostId/integrations",
	authenticateToken,
	requireManageHosts,
	async (req, res) => {
		try {
			const { hostId } = req.params;

			// Get host to verify it exists
			const host = await prisma.hosts.findUnique({
				where: { id: hostId },
				select: { id: true, api_id: true, friendly_name: true },
			});

			if (!host) {
				return res.status(404).json({ error: "Host not found" });
			}

			// Check if agent is connected
			const connected = isConnected(host.api_id);

			// Get integration states from cache (or defaults if not cached)
			// Default: all integrations are disabled
			const cachedState = integrationStateCache.get(host.api_id) || {};
			const integrations = {
				docker: cachedState.docker || false, // Default: disabled
				// Future integrations can be added here
			};

			res.json({
				success: true,
				data: {
					integrations,
					connected,
					host: {
						id: host.id,
						friendlyName: host.friendly_name,
						apiId: host.api_id,
					},
				},
			});
		} catch (error) {
			console.error("Get integration status error:", error);
			res.status(500).json({ error: "Failed to get integration status" });
		}
	},
);

// Toggle integration status for a host
router.post(
	"/:hostId/integrations/:integrationName/toggle",
	authenticateToken,
	requireManageHosts,
	[body("enabled").isBoolean().withMessage("Enabled status must be a boolean")],
	async (req, res) => {
		try {
			const errors = validationResult(req);
			if (!errors.isEmpty()) {
				return res.status(400).json({ errors: errors.array() });
			}

			const { hostId, integrationName } = req.params;
			const { enabled } = req.body;

			// Validate integration name
			const validIntegrations = ["docker"]; // Add more as they're implemented
			if (!validIntegrations.includes(integrationName)) {
				return res.status(400).json({
					error: "Invalid integration name",
					validIntegrations,
				});
			}

			// Get host to verify it exists
			const host = await prisma.hosts.findUnique({
				where: { id: hostId },
				select: { id: true, api_id: true, friendly_name: true },
			});

			if (!host) {
				return res.status(404).json({ error: "Host not found" });
			}

			// Check if agent is connected
			if (!isConnected(host.api_id)) {
				return res.status(503).json({
					error: "Agent is not connected",
					message:
						"The agent must be connected via WebSocket to toggle integrations",
				});
			}

			// Send WebSocket message to agent
			const success = pushIntegrationToggle(
				host.api_id,
				integrationName,
				enabled,
			);

			if (!success) {
				return res.status(503).json({
					error: "Failed to send integration toggle",
					message: "Agent connection may have been lost",
				});
			}

			// Update cache with new state
			if (!integrationStateCache.has(host.api_id)) {
				integrationStateCache.set(host.api_id, {});
			}
			integrationStateCache.get(host.api_id)[integrationName] = enabled;

			res.json({
				success: true,
				message: `Integration ${integrationName} ${enabled ? "enabled" : "disabled"} successfully`,
				data: {
					integration: integrationName,
					enabled,
					host: {
						id: host.id,
						friendlyName: host.friendly_name,
						apiId: host.api_id,
					},
				},
			});
		} catch (error) {
			console.error("Toggle integration error:", error);
			res.status(500).json({ error: "Failed to toggle integration" });
		}
	},
);

module.exports = router;
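For reference, a minimal sketch of exercising the new toggle route from a client — the /api/v1/hosts mount prefix, base URL, host id, and token are placeholders assumed for illustration:

// Sketch: enable the Docker integration for one host (placeholder values).
const res = await fetch(
	`${BASE_URL}/api/v1/hosts/${hostId}/integrations/docker/toggle`,
	{
		method: "POST",
		headers: {
			Authorization: `Bearer ${TOKEN}`,
			"Content-Type": "application/json",
		},
		body: JSON.stringify({ enabled: true }),
	},
);
// A 503 response means the agent is not connected over WebSocket.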

@@ -60,9 +60,14 @@ router.post(
	authenticateToken,
	[
		body("token")
			.notEmpty()
			.withMessage("Token is required")
			.isString()
			.withMessage("Token must be a string")
			.isLength({ min: 6, max: 6 })
			.withMessage("Token must be 6 digits"),
		body("token").isNumeric().withMessage("Token must contain only numbers"),
			.withMessage("Token must be exactly 6 digits")
			.matches(/^\d{6}$/)
			.withMessage("Token must contain only numbers"),
	],
	async (req, res) => {
		try {
@@ -71,7 +76,11 @@ router.post(
				return res.status(400).json({ errors: errors.array() });
			}

			const { token } = req.body;
			// Ensure token is a string (convert if needed)
			let { token } = req.body;
			if (typeof token !== "string") {
				token = String(token);
			}
			const userId = req.user.id;

			// Get user's TFA secret

@@ -3,6 +3,7 @@

const WebSocket = require("ws");
const url = require("node:url");
const { get_current_time } = require("../utils/timezone");

// Connection registry by api_id
const apiIdToSocket = new Map();
@@ -49,7 +50,29 @@ function init(server, prismaClient) {
			wss.handleUpgrade(request, socket, head, (ws) => {
				ws.on("message", (message) => {
					// Echo back for Bull Board WebSocket
					ws.send(message);
					try {
						ws.send(message);
					} catch (_err) {
						// Ignore send errors (connection may be closed)
					}
				});

				ws.on("error", (err) => {
					// Handle WebSocket errors gracefully for Bull Board
					if (
						err.code === "WS_ERR_INVALID_CLOSE_CODE" ||
						err.code === "ECONNRESET" ||
						err.code === "EPIPE"
					) {
						// These are expected errors, just log quietly
						console.log("[bullboard-ws] connection error:", err.code);
					} else {
						console.error("[bullboard-ws] error:", err.message || err);
					}
				});

				ws.on("close", () => {
					// Connection closed, no action needed
				});
			});
			return;
@@ -117,7 +140,58 @@ function init(server, prismaClient) {
			}
		});

		ws.on("close", () => {
		ws.on("error", (err) => {
			// Handle WebSocket errors gracefully without crashing
			// Common errors: invalid close codes (1006), connection resets, etc.
			if (
				err.code === "WS_ERR_INVALID_CLOSE_CODE" ||
				err.message?.includes("invalid status code 1006") ||
				err.message?.includes("Invalid WebSocket frame")
			) {
				// 1006 is a special close code indicating abnormal closure
				// It cannot be sent in a close frame, but can occur when connection is lost
				console.log(
					`[agent-ws] connection error for ${apiId} (abnormal closure):`,
					err.message || err.code,
				);
			} else if (
				err.code === "ECONNRESET" ||
				err.code === "EPIPE" ||
				err.message?.includes("read ECONNRESET")
			) {
				// Connection reset errors are common and expected
				console.log(`[agent-ws] connection reset for ${apiId}`);
			} else {
				// Log other errors for debugging
				console.error(
					`[agent-ws] error for ${apiId}:`,
					err.message || err.code || err,
				);
			}

			// Clean up connection on error
			const existing = apiIdToSocket.get(apiId);
			if (existing === ws) {
				apiIdToSocket.delete(apiId);
				connectionMetadata.delete(apiId);
				// Notify subscribers of disconnection
				notifyConnectionChange(apiId, false);
			}

			// Try to close the connection gracefully if still open
			if (
				ws.readyState === WebSocket.OPEN ||
				ws.readyState === WebSocket.CONNECTING
			) {
				try {
					ws.close(1000); // Normal closure
				} catch {
					// Ignore errors when closing
				}
			}
		});

		ws.on("close", (code, reason) => {
			const existing = apiIdToSocket.get(apiId);
			if (existing === ws) {
				apiIdToSocket.delete(apiId);
@@ -126,7 +200,7 @@ function init(server, prismaClient) {
				notifyConnectionChange(apiId, false);
			}
			console.log(
				`[agent-ws] disconnected api_id=${apiId} total=${apiIdToSocket.size}`,
				`[agent-ws] disconnected api_id=${apiId} code=${code} reason=${reason || "none"} total=${apiIdToSocket.size}`,
			);
		});

@@ -181,6 +255,29 @@ function pushUpdateAgent(apiId) {
	safeSend(ws, JSON.stringify({ type: "update_agent" }));
}

function pushIntegrationToggle(apiId, integrationName, enabled) {
	const ws = apiIdToSocket.get(apiId);
	if (ws && ws.readyState === WebSocket.OPEN) {
		safeSend(
			ws,
			JSON.stringify({
				type: "integration_toggle",
				integration: integrationName,
				enabled: enabled,
			}),
		);
		console.log(
			`📤 Pushed integration toggle to agent ${apiId}: ${integrationName} = ${enabled}`,
		);
		return true;
	} else {
		console.log(
			`⚠️ Agent ${apiId} not connected, cannot push integration toggle, please edit config.yml manually`,
		);
		return false;
	}
}
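On the receiving side, the agent (a Go binary, not shown in this diff) gets a small JSON frame. An illustrative handler in JavaScript-style pseudocode — the helper name is hypothetical:

// Illustrative only — the real agent is written in Go.
ws.on("message", (raw) => {
	const msg = JSON.parse(raw);
	if (msg.type === "integration_toggle") {
		// e.g. { type: "integration_toggle", integration: "docker", enabled: true }
		applyIntegrationToggle(msg.integration, msg.enabled); // hypothetical helper
	}
});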

function getConnectionByApiId(apiId) {
	return apiIdToSocket.get(apiId);
}
@@ -314,7 +411,7 @@ async function handleDockerStatusEvent(apiId, message) {
				status: status,
				state: status,
				updated_at: new Date(timestamp || Date.now()),
				last_checked: new Date(),
				last_checked: get_current_time(),
			},
		});

@@ -340,6 +437,7 @@ module.exports = {
	pushReportNow,
	pushSettingsUpdate,
	pushUpdateAgent,
	pushIntegrationToggle,
	pushUpdateNotification,
	pushUpdateNotificationToAll,
	// Expose read-only view of connected agents

@@ -139,15 +139,13 @@ class DockerImageUpdateCheck {
		console.log("🐳 Starting Docker image update check...");

		try {
			// Get all Docker images that have a digest and repository
			// Get all Docker images that have a digest
			// Note: repository is required (non-nullable) in schema, so we don't need to check it
			const images = await prisma.docker_images.findMany({
				where: {
					digest: {
						not: null,
					},
					repository: {
						not: null,
					},
				},
				include: {
					docker_image_updates: true,

@@ -3,6 +3,7 @@ const { redis, redisConnection } = require("./shared/redis");
const { prisma } = require("./shared/prisma");
const agentWs = require("../agentWs");
const { v4: uuidv4 } = require("uuid");
const { get_current_time } = require("../../utils/timezone");

// Import automation classes
const GitHubUpdateCheck = require("./githubUpdateCheck");
@@ -12,6 +13,7 @@ const OrphanedPackageCleanup = require("./orphanedPackageCleanup");
const DockerInventoryCleanup = require("./dockerInventoryCleanup");
const DockerImageUpdateCheck = require("./dockerImageUpdateCheck");
const MetricsReporting = require("./metricsReporting");
const SystemStatistics = require("./systemStatistics");

// Queue names
const QUEUE_NAMES = {
@@ -22,6 +24,7 @@ const QUEUE_NAMES = {
	DOCKER_INVENTORY_CLEANUP: "docker-inventory-cleanup",
	DOCKER_IMAGE_UPDATE_CHECK: "docker-image-update-check",
	METRICS_REPORTING: "metrics-reporting",
	SYSTEM_STATISTICS: "system-statistics",
	AGENT_COMMANDS: "agent-commands",
};

@@ -105,6 +108,9 @@ class QueueManager {
		this.automations[QUEUE_NAMES.METRICS_REPORTING] = new MetricsReporting(
			this,
		);
		this.automations[QUEUE_NAMES.SYSTEM_STATISTICS] = new SystemStatistics(
			this,
		);

		console.log("✅ All automation classes initialized");
	}
@@ -190,6 +196,15 @@ class QueueManager {
			workerOptions,
		);

		// System Statistics Worker
		this.workers[QUEUE_NAMES.SYSTEM_STATISTICS] = new Worker(
			QUEUE_NAMES.SYSTEM_STATISTICS,
			this.automations[QUEUE_NAMES.SYSTEM_STATISTICS].process.bind(
				this.automations[QUEUE_NAMES.SYSTEM_STATISTICS],
			),
			workerOptions,
		);

		// Agent Commands Worker
		this.workers[QUEUE_NAMES.AGENT_COMMANDS] = new Worker(
			QUEUE_NAMES.AGENT_COMMANDS,
@@ -216,8 +231,8 @@ class QueueManager {
					api_id: api_id,
					status: "active",
					attempt_number: job.attemptsMade + 1,
					created_at: new Date(),
					updated_at: new Date(),
					created_at: get_current_time(),
					updated_at: get_current_time(),
				},
			});
			console.log(`📝 Logged job to job_history: ${job.id} (${type})`);
@@ -257,8 +272,8 @@ class QueueManager {
				where: { job_id: job.id },
				data: {
					status: "completed",
					completed_at: new Date(),
					updated_at: new Date(),
					completed_at: get_current_time(),
					updated_at: get_current_time(),
				},
			});
			console.log(`✅ Marked job as completed in job_history: ${job.id}`);
@@ -271,8 +286,8 @@ class QueueManager {
				data: {
					status: "failed",
					error_message: error.message,
					completed_at: new Date(),
					updated_at: new Date(),
					completed_at: get_current_time(),
					updated_at: get_current_time(),
				},
			});
			console.log(`❌ Marked job as failed in job_history: ${job.id}`);
@@ -322,6 +337,7 @@ class QueueManager {
		await this.automations[QUEUE_NAMES.DOCKER_INVENTORY_CLEANUP].schedule();
		await this.automations[QUEUE_NAMES.DOCKER_IMAGE_UPDATE_CHECK].schedule();
		await this.automations[QUEUE_NAMES.METRICS_REPORTING].schedule();
		await this.automations[QUEUE_NAMES.SYSTEM_STATISTICS].schedule();
	}

	/**
@@ -357,6 +373,10 @@ class QueueManager {
		].triggerManual();
	}

	async triggerSystemStatistics() {
		return this.automations[QUEUE_NAMES.SYSTEM_STATISTICS].triggerManual();
	}

	async triggerMetricsReporting() {
		return this.automations[QUEUE_NAMES.METRICS_REPORTING].triggerManual();
	}

backend/src/services/automation/systemStatistics.js (new file, 140 lines)
@@ -0,0 +1,140 @@
const { prisma } = require("./shared/prisma");
const { v4: uuidv4 } = require("uuid");

/**
 * System Statistics Collection Automation
 * Collects aggregated system-wide statistics every 30 minutes
 * for use in package trends charts
 */
class SystemStatistics {
	constructor(queueManager) {
		this.queueManager = queueManager;
		this.queueName = "system-statistics";
	}

	/**
	 * Process system statistics collection job
	 */
	async process(_job) {
		const startTime = Date.now();
		console.log("📊 Starting system statistics collection...");

		try {
			// Calculate unique package counts across all hosts
			const uniquePackagesCount = await prisma.packages.count({
				where: {
					host_packages: {
						some: {
							needs_update: true,
						},
					},
				},
			});

			const uniqueSecurityCount = await prisma.packages.count({
				where: {
					host_packages: {
						some: {
							needs_update: true,
							is_security_update: true,
						},
					},
				},
			});

			// Calculate total unique packages installed on at least one host
			const totalPackages = await prisma.packages.count({
				where: {
					host_packages: {
						some: {}, // At least one host has this package
					},
				},
			});

			// Calculate total hosts
			const totalHosts = await prisma.hosts.count({
				where: {
					status: "active",
				},
			});

			// Calculate hosts needing updates (distinct hosts with packages needing updates)
			const hostsNeedingUpdates = await prisma.hosts.count({
				where: {
					status: "active",
					host_packages: {
						some: {
							needs_update: true,
						},
					},
				},
			});

			// Store statistics in database
			await prisma.system_statistics.create({
				data: {
					id: uuidv4(),
					unique_packages_count: uniquePackagesCount,
					unique_security_count: uniqueSecurityCount,
					total_packages: totalPackages,
					total_hosts: totalHosts,
					hosts_needing_updates: hostsNeedingUpdates,
					timestamp: new Date(),
				},
			});

			const executionTime = Date.now() - startTime;
			console.log(
				`✅ System statistics collection completed in ${executionTime}ms - Unique packages: ${uniquePackagesCount}, Security: ${uniqueSecurityCount}, Total hosts: ${totalHosts}`,
			);

			return {
				success: true,
				uniquePackagesCount,
				uniqueSecurityCount,
				totalPackages,
				totalHosts,
				hostsNeedingUpdates,
				executionTime,
			};
		} catch (error) {
			const executionTime = Date.now() - startTime;
			console.error(
				`❌ System statistics collection failed after ${executionTime}ms:`,
				error.message,
			);
			throw error;
		}
	}

	/**
	 * Schedule recurring system statistics collection (every 30 minutes)
	 */
	async schedule() {
		const job = await this.queueManager.queues[this.queueName].add(
			"system-statistics",
			{},
			{
				repeat: { pattern: "*/30 * * * *" }, // Every 30 minutes
				jobId: "system-statistics-recurring",
			},
		);
		console.log("✅ System statistics collection scheduled (every 30 minutes)");
		return job;
	}

	/**
	 * Trigger manual system statistics collection
	 */
	async triggerManual() {
		const job = await this.queueManager.queues[this.queueName].add(
			"system-statistics-manual",
			{},
			{ priority: 1 },
		);
		console.log("✅ Manual system statistics collection triggered");
		return job;
	}
}

module.exports = SystemStatistics;