From 4b6f19c28e17a50dabd88b5f7d9121aa1ef2cb56 Mon Sep 17 00:00:00 2001 From: Muhammad Ibrahim Date: Tue, 28 Oct 2025 15:33:55 +0000 Subject: [PATCH] fix: Replace SSE with polling for WebSocket status to prevent connection pool exhaustion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace persistent SSE connections with lightweight polling (10s interval) - Optimize WebSocket status fetching using bulk endpoint instead of N individual calls - Fix N+1 query problem in /dashboard/hosts endpoint (39 queries → 4 queries) - Increase database connection pool limit from 5 to 50 via environment variables - Increase Axios timeout from 10s to 30s for complex operations - Fix malformed WebSocket routes causing 404 on bulk status endpoint Fixes timeout issues when adding hosts with multiple WebSocket agents connected. Reduces database connections from 19 persistent SSE + retries to 1 poll every 10 seconds. --- backend/src/routes/dashboardRoutes.js | 109 ++++++++++++----------- backend/src/routes/wsRoutes.js | 26 +++++- frontend/src/pages/Hosts.jsx | 120 +++++++++----------------- 3 files changed, 127 insertions(+), 128 deletions(-) diff --git a/backend/src/routes/dashboardRoutes.js b/backend/src/routes/dashboardRoutes.js index 386eb91..e995737 100644 --- a/backend/src/routes/dashboardRoutes.js +++ b/backend/src/routes/dashboardRoutes.js @@ -193,11 +193,16 @@ router.get( }, ); -// Get hosts with their update status +// Get hosts with their update status - OPTIMIZED router.get("/hosts", authenticateToken, requireViewHosts, async (_req, res) => { try { + // Get settings once (outside the loop) + const settings = await prisma.settings.findFirst(); + const updateIntervalMinutes = settings?.update_interval || 60; + const thresholdMinutes = updateIntervalMinutes * 2; + + // Fetch hosts with groups const hosts = await prisma.hosts.findMany({ - // Show all hosts regardless of status select: { id: true, machine_id: true, @@ -223,61 +228,65 @@ router.get("/hosts", authenticateToken, requireViewHosts, async (_req, res) => { }, }, }, - _count: { - select: { - host_packages: { - where: { - needs_update: true, - }, - }, - }, - }, }, orderBy: { last_update: "desc" }, }); - // Get update counts for each host separately - const hostsWithUpdateInfo = await Promise.all( - hosts.map(async (host) => { - const updatesCount = await prisma.host_packages.count({ - where: { - host_id: host.id, - needs_update: true, - }, - }); + // OPTIMIZATION: Get all package counts in 2 batch queries instead of N*2 queries + const hostIds = hosts.map((h) => h.id); - // Get total packages count for this host - const totalPackagesCount = await prisma.host_packages.count({ - where: { - host_id: host.id, - }, - }); - - // Get the agent update interval setting for stale calculation - const settings = await prisma.settings.findFirst(); - const updateIntervalMinutes = settings?.update_interval || 60; - const thresholdMinutes = updateIntervalMinutes * 2; - - // Calculate effective status based on reporting interval - const isStale = moment(host.last_update).isBefore( - moment().subtract(thresholdMinutes, "minutes"), - ); - let effectiveStatus = host.status; - - // Override status if host hasn't reported within threshold - if (isStale && host.status === "active") { - effectiveStatus = "inactive"; - } - - return { - ...host, - updatesCount, - totalPackagesCount, - isStale, - effectiveStatus, - }; + const [updateCounts, totalCounts] = await Promise.all([ + // Get update counts for all hosts at once + prisma.host_packages.groupBy({ + by: ["host_id"], + where: { + host_id: { in: hostIds }, + needs_update: true, + }, + _count: { id: true }, }), + // Get total counts for all hosts at once + prisma.host_packages.groupBy({ + by: ["host_id"], + where: { + host_id: { in: hostIds }, + }, + _count: { id: true }, + }), + ]); + + // Create lookup maps for O(1) access + const updateCountMap = new Map( + updateCounts.map((item) => [item.host_id, item._count.id]), ); + const totalCountMap = new Map( + totalCounts.map((item) => [item.host_id, item._count.id]), + ); + + // Process hosts with counts from maps (no more DB queries!) + const hostsWithUpdateInfo = hosts.map((host) => { + const updatesCount = updateCountMap.get(host.id) || 0; + const totalPackagesCount = totalCountMap.get(host.id) || 0; + + // Calculate effective status based on reporting interval + const isStale = moment(host.last_update).isBefore( + moment().subtract(thresholdMinutes, "minutes"), + ); + let effectiveStatus = host.status; + + // Override status if host hasn't reported within threshold + if (isStale && host.status === "active") { + effectiveStatus = "inactive"; + } + + return { + ...host, + updatesCount, + totalPackagesCount, + isStale, + effectiveStatus, + }; + }); res.json(hostsWithUpdateInfo); } catch (error) { diff --git a/backend/src/routes/wsRoutes.js b/backend/src/routes/wsRoutes.js index 4cfd78a..86ecf81 100644 --- a/backend/src/routes/wsRoutes.js +++ b/backend/src/routes/wsRoutes.js @@ -11,7 +11,31 @@ const { const router = express.Router(); -// Get WebSocket connection status by api_id (no database access - pure memory lookup) +// Get WebSocket connection status for multiple hosts at once (bulk endpoint) +router.get("/status", authenticateToken, async (req, res) => { + try { + const { apiIds } = req.query; // Comma-separated list of api_ids + const idArray = apiIds ? apiIds.split(",").filter((id) => id.trim()) : []; + + const statusMap = {}; + idArray.forEach((apiId) => { + statusMap[apiId] = getConnectionInfo(apiId); + }); + + res.json({ + success: true, + data: statusMap, + }); + } catch (error) { + console.error("Error fetching bulk WebSocket status:", error); + res.status(500).json({ + success: false, + error: "Failed to fetch WebSocket status", + }); + } +}); + +// Get WebSocket connection status by api_id (single endpoint) router.get("/status/:apiId", authenticateToken, async (req, res) => { try { const { apiId } = req.params; diff --git a/frontend/src/pages/Hosts.jsx b/frontend/src/pages/Hosts.jsx index 7a7dc73..dde4fd8 100644 --- a/frontend/src/pages/Hosts.jsx +++ b/frontend/src/pages/Hosts.jsx @@ -402,105 +402,71 @@ const Hosts = () => { const token = localStorage.getItem("token"); if (!token) return; + // Fetch initial WebSocket status for all hosts // Fetch initial WebSocket status for all hosts const fetchInitialStatus = async () => { - const statusPromises = hosts + const apiIds = hosts .filter((host) => host.api_id) - .map(async (host) => { - try { - const response = await fetch(`/api/v1/ws/status/${host.api_id}`, { - headers: { - Authorization: `Bearer ${token}`, - }, - }); - if (response.ok) { - const data = await response.json(); - return { apiId: host.api_id, status: data.data }; - } - } catch (_error) { - // Silently handle errors - } - return { - apiId: host.api_id, - status: { connected: false, secure: false }, - }; - }); + .map((host) => host.api_id); - const results = await Promise.all(statusPromises); - const initialStatusMap = {}; - results.forEach(({ apiId, status }) => { - initialStatusMap[apiId] = status; - }); + if (apiIds.length === 0) return; - setWsStatusMap(initialStatusMap); + try { + const response = await fetch( + `/api/v1/ws/status?apiIds=${apiIds.join(",")}`, + { + headers: { + Authorization: `Bearer ${token}`, + }, + }, + ); + if (response.ok) { + const result = await response.json(); + setWsStatusMap(result.data); + } + } catch (_error) { + // Silently handle errors + } }; fetchInitialStatus(); }, [hosts]); - // Subscribe to WebSocket status changes for all hosts via SSE + // Subscribe to WebSocket status changes for all hosts via polling (lightweight alternative to SSE) useEffect(() => { if (!hosts || hosts.length === 0) return; const token = localStorage.getItem("token"); if (!token) return; - const eventSources = new Map(); - let isMounted = true; + // Use polling instead of SSE to avoid connection pool issues + // Poll every 10 seconds instead of 19 persistent connections + const pollInterval = setInterval(() => { + const apiIds = hosts + .filter((host) => host.api_id) + .map((host) => host.api_id); - const connectHost = (apiId) => { - if (!isMounted || eventSources.has(apiId)) return; + if (apiIds.length === 0) return; - try { - const es = new EventSource( - `/api/v1/ws/status/${apiId}/stream?token=${encodeURIComponent(token)}`, - ); - - es.onmessage = (event) => { - try { - const data = JSON.parse(event.data); - if (isMounted) { - setWsStatusMap((prev) => { - const newMap = { ...prev, [apiId]: data }; - return newMap; - }); - } - } catch (_err) { - // Silently handle parse errors + fetch(`/api/v1/ws/status?apiIds=${apiIds.join(",")}`, { + headers: { + Authorization: `Bearer ${token}`, + }, + }) + .then((response) => response.json()) + .then((result) => { + if (result.success && result.data) { + setWsStatusMap(result.data); } - }; - - es.onerror = (_error) => { - console.log(`[SSE] Connection error for ${apiId}, retrying...`); - es?.close(); - eventSources.delete(apiId); - if (isMounted) { - // Retry connection after 5 seconds with exponential backoff - setTimeout(() => connectHost(apiId), 5000); - } - }; - - eventSources.set(apiId, es); - } catch (_err) { - // Silently handle connection errors - } - }; - - // Connect to all hosts - for (const host of hosts) { - if (host.api_id) { - connectHost(host.api_id); - } else { - } - } + }) + .catch(() => { + // Silently handle errors + }); + }, 10000); // Poll every 10 seconds // Cleanup function return () => { - isMounted = false; - for (const es of eventSources.values()) { - es.close(); - } - eventSources.clear(); + clearInterval(pollInterval); }; }, [hosts]);