Removed js file for the update checker for github

Added real-time feature for agent status
made some ui improvements on the host details page
This commit is contained in:
Muhammad Ibrahim
2025-10-15 22:15:18 +01:00
parent 9a40d5e6ee
commit 5b77a1328d
9 changed files with 467 additions and 401 deletions

View File

@@ -241,12 +241,15 @@ router.get("/health", authenticateToken, async (_req, res) => {
router.get("/overview", authenticateToken, async (_req, res) => {
try {
const stats = await queueManager.getAllQueueStats();
const { getSettings } = require("../services/settingsService");
const settings = await getSettings();
// Get recent jobs for each queue to show last run times
const recentJobs = await Promise.all([
queueManager.getRecentJobs(QUEUE_NAMES.GITHUB_UPDATE_CHECK, 1),
queueManager.getRecentJobs(QUEUE_NAMES.SESSION_CLEANUP, 1),
queueManager.getRecentJobs(QUEUE_NAMES.ORPHANED_REPO_CLEANUP, 1),
queueManager.getRecentJobs(QUEUE_NAMES.AGENT_COMMANDS, 1),
]);
// Calculate overview metrics
@@ -327,6 +330,22 @@ router.get("/overview", authenticateToken, async (_req, res) => {
: "Never run",
stats: stats[QUEUE_NAMES.ORPHANED_REPO_CLEANUP],
},
{
name: "Collect Host Statistics",
queue: QUEUE_NAMES.AGENT_COMMANDS,
description: "Collects package statistics from all connected agents",
schedule: `Every ${settings.update_interval} minutes (Agent-driven)`,
lastRun: recentJobs[3][0]?.finishedOn
? new Date(recentJobs[3][0].finishedOn).toLocaleString()
: "Never",
lastRunTimestamp: recentJobs[3][0]?.finishedOn || 0,
status: recentJobs[3][0]?.failedReason
? "Failed"
: recentJobs[3][0]
? "Success"
: "Never run",
stats: stats[QUEUE_NAMES.AGENT_COMMANDS],
},
].sort((a, b) => {
// Sort by last run timestamp (most recent first)
// If both have never run (timestamp 0), maintain original order

View File

@@ -0,0 +1,112 @@
const express = require("express");
const { authenticateToken } = require("../middleware/auth");
const {
getConnectionInfo,
subscribeToConnectionChanges,
} = require("../services/agentWs");
const router = express.Router();
// Get WebSocket connection status by api_id (no database access - pure memory lookup)
router.get("/status/:apiId", authenticateToken, async (req, res) => {
try {
const { apiId } = req.params;
// Direct in-memory check - no database query needed
const connectionInfo = getConnectionInfo(apiId);
// Minimal response for maximum speed
res.json({
success: true,
data: connectionInfo,
});
} catch (error) {
console.error("Error fetching WebSocket status:", error);
res.status(500).json({
success: false,
error: "Failed to fetch WebSocket status",
});
}
});
// Server-Sent Events endpoint for real-time status updates (no polling needed!)
router.get("/status/:apiId/stream", async (req, res) => {
try {
const { apiId } = req.params;
// Manual authentication for SSE (EventSource doesn't support custom headers)
const token =
req.query.token || req.headers.authorization?.replace("Bearer ", "");
if (!token) {
return res.status(401).json({ error: "Authentication required" });
}
// Verify token manually
const jwt = require("jsonwebtoken");
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET);
req.user = decoded;
} catch (_err) {
console.error("[SSE] Invalid token for api_id:", apiId);
return res.status(401).json({ error: "Invalid or expired token" });
}
console.log("[SSE] Client connected for api_id:", apiId);
// Set headers for SSE
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no"); // Disable nginx buffering
// Send initial status immediately
const initialInfo = getConnectionInfo(apiId);
res.write(`data: ${JSON.stringify(initialInfo)}\n\n`);
res.flushHeaders(); // Ensure headers are sent immediately
// Subscribe to connection changes for this specific api_id
const unsubscribe = subscribeToConnectionChanges(apiId, (_connected) => {
try {
// Push update to client instantly when status changes
const connectionInfo = getConnectionInfo(apiId);
console.log(
`[SSE] Pushing status change for ${apiId}: connected=${connectionInfo.connected} secure=${connectionInfo.secure}`,
);
res.write(`data: ${JSON.stringify(connectionInfo)}\n\n`);
} catch (err) {
console.error("[SSE] Error writing to stream:", err);
}
});
// Heartbeat to keep connection alive (every 30 seconds)
const heartbeat = setInterval(() => {
try {
res.write(": heartbeat\n\n");
} catch (err) {
console.error("[SSE] Error writing heartbeat:", err);
clearInterval(heartbeat);
}
}, 30000);
// Cleanup on client disconnect
req.on("close", () => {
console.log("[SSE] Client disconnected for api_id:", apiId);
clearInterval(heartbeat);
unsubscribe();
});
// Handle errors
req.on("error", (err) => {
console.error("[SSE] Request error:", err);
clearInterval(heartbeat);
unsubscribe();
});
} catch (error) {
console.error("[SSE] Unexpected error:", error);
if (!res.headersSent) {
res.status(500).json({ error: "Internal server error" });
}
}
});
module.exports = router;

View File

@@ -66,9 +66,8 @@ const autoEnrollmentRoutes = require("./routes/autoEnrollmentRoutes");
const gethomepageRoutes = require("./routes/gethomepageRoutes");
const automationRoutes = require("./routes/automationRoutes");
const dockerRoutes = require("./routes/dockerRoutes");
const updateScheduler = require("./services/updateScheduler");
const wsRoutes = require("./routes/wsRoutes");
const { initSettings } = require("./services/settingsService");
const { cleanup_expired_sessions } = require("./utils/session_manager");
const { queueManager } = require("./services/automation");
const { authenticateToken, requireAdmin } = require("./middleware/auth");
const { createBullBoard } = require("@bull-board/api");
@@ -442,6 +441,7 @@ app.use(
app.use(`/api/${apiVersion}/gethomepage`, gethomepageRoutes);
app.use(`/api/${apiVersion}/automation`, automationRoutes);
app.use(`/api/${apiVersion}/docker`, dockerRoutes);
app.use(`/api/${apiVersion}/ws`, wsRoutes);
// Bull Board - will be populated after queue manager initializes
let bullBoardRouter = null;
@@ -580,10 +580,6 @@ process.on("SIGINT", async () => {
if (process.env.ENABLE_LOGGING === "true") {
logger.info("SIGINT received, shutting down gracefully");
}
if (app.locals.session_cleanup_interval) {
clearInterval(app.locals.session_cleanup_interval);
}
updateScheduler.stop();
await queueManager.shutdown();
await disconnectPrisma(prisma);
process.exit(0);
@@ -593,10 +589,6 @@ process.on("SIGTERM", async () => {
if (process.env.ENABLE_LOGGING === "true") {
logger.info("SIGTERM received, shutting down gracefully");
}
if (app.locals.session_cleanup_interval) {
clearInterval(app.locals.session_cleanup_interval);
}
updateScheduler.stop();
await queueManager.shutdown();
await disconnectPrisma(prisma);
process.exit(0);
@@ -891,21 +883,6 @@ async function startServer() {
bullBoardRouter = serverAdapter.getRouter();
console.log("✅ Bull Board mounted at /admin/queues (secured)");
// Initial session cleanup
await cleanup_expired_sessions();
// Schedule session cleanup every hour
const session_cleanup_interval = setInterval(
async () => {
try {
await cleanup_expired_sessions();
} catch (error) {
console.error("Session cleanup error:", error);
}
},
60 * 60 * 1000,
); // Every hour
// Initialize WS layer with the underlying HTTP server
initAgentWs(server, prisma);
@@ -913,15 +890,8 @@ async function startServer() {
if (process.env.ENABLE_LOGGING === "true") {
logger.info(`Server running on port ${PORT}`);
logger.info(`Environment: ${process.env.NODE_ENV}`);
logger.info("✅ Session cleanup scheduled (every hour)");
}
// Start update scheduler
updateScheduler.start();
});
// Store interval for cleanup on shutdown
app.locals.session_cleanup_interval = session_cleanup_interval;
} catch (error) {
console.error("❌ Failed to start server:", error.message);
process.exit(1);

View File

@@ -7,6 +7,14 @@ const url = require("node:url");
// Connection registry by api_id
const apiIdToSocket = new Map();
// Connection metadata (secure/insecure)
// Map<api_id, { ws: WebSocket, secure: boolean }>
const connectionMetadata = new Map();
// Subscribers for connection status changes (for SSE)
// Map<api_id, Set<callback>>
const connectionChangeSubscribers = new Map();
let wss;
let prisma;
@@ -46,11 +54,21 @@ function init(server, prismaClient) {
wss.handleUpgrade(request, socket, head, (ws) => {
ws.apiId = apiId;
// Detect if connection is secure (wss://) or not (ws://)
const isSecure =
socket.encrypted || request.headers["x-forwarded-proto"] === "https";
apiIdToSocket.set(apiId, ws);
connectionMetadata.set(apiId, { ws, secure: isSecure });
console.log(
`[agent-ws] connected api_id=${apiId} total=${apiIdToSocket.size}`,
`[agent-ws] connected api_id=${apiId} protocol=${isSecure ? "wss" : "ws"} total=${apiIdToSocket.size}`,
);
// Notify subscribers of connection
notifyConnectionChange(apiId, true);
ws.on("message", () => {
// Currently we don't need to handle agent->server messages
});
@@ -59,6 +77,9 @@ function init(server, prismaClient) {
const existing = apiIdToSocket.get(apiId);
if (existing === ws) {
apiIdToSocket.delete(apiId);
connectionMetadata.delete(apiId);
// Notify subscribers of disconnection
notifyConnectionChange(apiId, false);
}
console.log(
`[agent-ws] disconnected api_id=${apiId} total=${apiIdToSocket.size}`,
@@ -111,6 +132,39 @@ function pushSettingsUpdate(apiId, newInterval) {
);
}
// Notify all subscribers when connection status changes
function notifyConnectionChange(apiId, connected) {
const subscribers = connectionChangeSubscribers.get(apiId);
if (subscribers) {
for (const callback of subscribers) {
try {
callback(connected);
} catch (err) {
console.error(`[agent-ws] error notifying subscriber:`, err);
}
}
}
}
// Subscribe to connection status changes for a specific api_id
function subscribeToConnectionChanges(apiId, callback) {
if (!connectionChangeSubscribers.has(apiId)) {
connectionChangeSubscribers.set(apiId, new Set());
}
connectionChangeSubscribers.get(apiId).add(callback);
// Return unsubscribe function
return () => {
const subscribers = connectionChangeSubscribers.get(apiId);
if (subscribers) {
subscribers.delete(callback);
if (subscribers.size === 0) {
connectionChangeSubscribers.delete(apiId);
}
}
};
}
module.exports = {
init,
broadcastSettingsUpdate,
@@ -122,4 +176,15 @@ module.exports = {
const ws = apiIdToSocket.get(apiId);
return !!ws && ws.readyState === WebSocket.OPEN;
},
// Get connection info including protocol (ws/wss)
getConnectionInfo: (apiId) => {
const metadata = connectionMetadata.get(apiId);
if (!metadata) {
return { connected: false, secure: false };
}
const connected = metadata.ws.readyState === WebSocket.OPEN;
return { connected, secure: metadata.secure };
},
// Subscribe to connection status changes (for SSE)
subscribeToConnectionChanges,
};

View File

@@ -1,295 +0,0 @@
const { PrismaClient } = require("@prisma/client");
const { exec } = require("node:child_process");
const { promisify } = require("node:util");
const prisma = new PrismaClient();
const execAsync = promisify(exec);
class UpdateScheduler {
constructor() {
this.isRunning = false;
this.intervalId = null;
this.checkInterval = 24 * 60 * 60 * 1000; // 24 hours in milliseconds
}
// Start the scheduler
start() {
if (this.isRunning) {
console.log("Update scheduler is already running");
return;
}
console.log("🔄 Starting update scheduler...");
this.isRunning = true;
// Run initial check
this.checkForUpdates();
// Schedule regular checks
this.intervalId = setInterval(() => {
this.checkForUpdates();
}, this.checkInterval);
console.log(
`✅ Update scheduler started - checking every ${this.checkInterval / (60 * 60 * 1000)} hours`,
);
}
// Stop the scheduler
stop() {
if (!this.isRunning) {
console.log("Update scheduler is not running");
return;
}
console.log("🛑 Stopping update scheduler...");
this.isRunning = false;
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
}
console.log("✅ Update scheduler stopped");
}
// Check for updates
async checkForUpdates() {
try {
console.log("🔍 Checking for updates...");
// Get settings
const settings = await prisma.settings.findFirst();
const DEFAULT_GITHUB_REPO = "https://github.com/patchMon/patchmon";
const repoUrl = settings?.githubRepoUrl || DEFAULT_GITHUB_REPO;
let owner, repo;
if (repoUrl.includes("git@github.com:")) {
const match = repoUrl.match(/git@github\.com:([^/]+)\/([^/]+)\.git/);
if (match) {
[, owner, repo] = match;
}
} else if (repoUrl.includes("github.com/")) {
const match = repoUrl.match(
/github\.com\/([^/]+)\/([^/]+?)(?:\.git)?$/,
);
if (match) {
[, owner, repo] = match;
}
}
if (!owner || !repo) {
console.log(
"⚠️ Could not parse GitHub repository URL, skipping update check",
);
return;
}
let latestVersion;
const isPrivate = settings.repositoryType === "private";
if (isPrivate) {
// Use SSH for private repositories
latestVersion = await this.checkPrivateRepo(settings, owner, repo);
} else {
// Use GitHub API for public repositories
latestVersion = await this.checkPublicRepo(owner, repo);
}
if (!latestVersion) {
console.log(
"⚠️ Could not determine latest version, skipping update check",
);
return;
}
// Read version from package.json dynamically
let currentVersion = "1.2.9"; // fallback
try {
const packageJson = require("../../package.json");
if (packageJson?.version) {
currentVersion = packageJson.version;
}
} catch (packageError) {
console.warn(
"Could not read version from package.json, using fallback:",
packageError.message,
);
}
const isUpdateAvailable =
this.compareVersions(latestVersion, currentVersion) > 0;
// Update settings with check results
await prisma.settings.update({
where: { id: settings.id },
data: {
last_update_check: new Date(),
update_available: isUpdateAvailable,
latest_version: latestVersion,
},
});
console.log(
`✅ Update check completed - Current: ${currentVersion}, Latest: ${latestVersion}, Update Available: ${isUpdateAvailable}`,
);
} catch (error) {
console.error("❌ Error checking for updates:", error.message);
// Update last check time even on error
try {
const settings = await prisma.settings.findFirst();
if (settings) {
await prisma.settings.update({
where: { id: settings.id },
data: {
last_update_check: new Date(),
update_available: false,
},
});
}
} catch (updateError) {
console.error(
"❌ Error updating last check time:",
updateError.message,
);
}
}
}
// Check private repository using SSH
async checkPrivateRepo(settings, owner, repo) {
try {
let sshKeyPath = settings.sshKeyPath;
// Try to find SSH key if not configured
if (!sshKeyPath) {
const possibleKeyPaths = [
"/root/.ssh/id_ed25519",
"/root/.ssh/id_rsa",
"/home/patchmon/.ssh/id_ed25519",
"/home/patchmon/.ssh/id_rsa",
"/var/www/.ssh/id_ed25519",
"/var/www/.ssh/id_rsa",
];
for (const path of possibleKeyPaths) {
try {
require("node:fs").accessSync(path);
sshKeyPath = path;
break;
} catch {
// Key not found at this path, try next
}
}
}
if (!sshKeyPath) {
throw new Error("No SSH deploy key found");
}
const sshRepoUrl = `git@github.com:${owner}/${repo}.git`;
const env = {
...process.env,
GIT_SSH_COMMAND: `ssh -i ${sshKeyPath} -o StrictHostKeyChecking=no -o IdentitiesOnly=yes`,
};
const { stdout: sshLatestTag } = await execAsync(
`git ls-remote --tags --sort=-version:refname ${sshRepoUrl} | head -n 1 | sed 's/.*refs\\/tags\\///' | sed 's/\\^{}//'`,
{
timeout: 10000,
env: env,
},
);
return sshLatestTag.trim().replace("v", "");
} catch (error) {
console.error("SSH Git error:", error.message);
throw error;
}
}
// Check public repository using GitHub API
async checkPublicRepo(owner, repo) {
try {
const httpsRepoUrl = `https://api.github.com/repos/${owner}/${repo}/releases/latest`;
// Get current version for User-Agent
let currentVersion = "1.2.9"; // fallback
try {
const packageJson = require("../../package.json");
if (packageJson?.version) {
currentVersion = packageJson.version;
}
} catch (packageError) {
console.warn(
"Could not read version from package.json for User-Agent, using fallback:",
packageError.message,
);
}
const response = await fetch(httpsRepoUrl, {
method: "GET",
headers: {
Accept: "application/vnd.github.v3+json",
"User-Agent": `PatchMon-Server/${currentVersion}`,
},
});
if (!response.ok) {
const errorText = await response.text();
if (
errorText.includes("rate limit") ||
errorText.includes("API rate limit")
) {
console.log(
"⚠️ GitHub API rate limit exceeded, skipping update check",
);
return null; // Return null instead of throwing error
}
throw new Error(
`GitHub API error: ${response.status} ${response.statusText}`,
);
}
const releaseData = await response.json();
return releaseData.tag_name.replace("v", "");
} catch (error) {
console.error("GitHub API error:", error.message);
throw error;
}
}
// Compare version strings (semantic versioning)
compareVersions(version1, version2) {
const v1parts = version1.split(".").map(Number);
const v2parts = version2.split(".").map(Number);
const maxLength = Math.max(v1parts.length, v2parts.length);
for (let i = 0; i < maxLength; i++) {
const v1part = v1parts[i] || 0;
const v2part = v2parts[i] || 0;
if (v1part > v2part) return 1;
if (v1part < v2part) return -1;
}
return 0;
}
// Get scheduler status
getStatus() {
return {
isRunning: this.isRunning,
checkInterval: this.checkInterval,
nextCheck: this.isRunning
? new Date(Date.now() + this.checkInterval)
: null,
};
}
}
// Create singleton instance
const updateScheduler = new UpdateScheduler();
module.exports = updateScheduler;