fix: Replace SSE with polling for WebSocket status to prevent connection pool exhaustion

- Replace persistent SSE connections with lightweight polling (10s interval)
- Optimize WebSocket status fetching using bulk endpoint instead of N individual calls
- Fix N+1 query problem in /dashboard/hosts endpoint (39 queries → 4 queries)
- Increase database connection pool limit from 5 to 50 via environment variables
- Increase Axios timeout from 10s to 30s for complex operations
- Fix malformed WebSocket routes causing 404 on bulk status endpoint

Fixes timeout issues when adding hosts with multiple WebSocket agents connected.
Reduces database connections from 19 persistent SSE connections (plus their retries) to a single polling request every 10 seconds.
This commit is contained in:
Muhammad Ibrahim
2025-10-28 15:33:55 +00:00
parent ae6afb0ef4
commit 4b6f19c28e
3 changed files with 127 additions and 128 deletions

View File

@@ -193,11 +193,16 @@ router.get(
}, },
); );
// Get hosts with their update status // Get hosts with their update status - OPTIMIZED
router.get("/hosts", authenticateToken, requireViewHosts, async (_req, res) => { router.get("/hosts", authenticateToken, requireViewHosts, async (_req, res) => {
try { try {
// Get settings once (outside the loop)
const settings = await prisma.settings.findFirst();
const updateIntervalMinutes = settings?.update_interval || 60;
const thresholdMinutes = updateIntervalMinutes * 2;
// Fetch hosts with groups
const hosts = await prisma.hosts.findMany({ const hosts = await prisma.hosts.findMany({
// Show all hosts regardless of status
select: { select: {
id: true, id: true,
machine_id: true, machine_id: true,
@@ -223,40 +228,45 @@ router.get("/hosts", authenticateToken, requireViewHosts, async (_req, res) => {
}, },
}, },
}, },
_count: {
select: {
host_packages: {
where: {
needs_update: true,
},
},
},
},
}, },
orderBy: { last_update: "desc" }, orderBy: { last_update: "desc" },
}); });
// Get update counts for each host separately // OPTIMIZATION: Get all package counts in 2 batch queries instead of N*2 queries
const hostsWithUpdateInfo = await Promise.all( const hostIds = hosts.map((h) => h.id);
hosts.map(async (host) => {
const updatesCount = await prisma.host_packages.count({ const [updateCounts, totalCounts] = await Promise.all([
// Get update counts for all hosts at once
prisma.host_packages.groupBy({
by: ["host_id"],
where: { where: {
host_id: host.id, host_id: { in: hostIds },
needs_update: true, needs_update: true,
}, },
}); _count: { id: true },
}),
// Get total packages count for this host // Get total counts for all hosts at once
const totalPackagesCount = await prisma.host_packages.count({ prisma.host_packages.groupBy({
by: ["host_id"],
where: { where: {
host_id: host.id, host_id: { in: hostIds },
}, },
}); _count: { id: true },
}),
]);
// Get the agent update interval setting for stale calculation // Create lookup maps for O(1) access
const settings = await prisma.settings.findFirst(); const updateCountMap = new Map(
const updateIntervalMinutes = settings?.update_interval || 60; updateCounts.map((item) => [item.host_id, item._count.id]),
const thresholdMinutes = updateIntervalMinutes * 2; );
const totalCountMap = new Map(
totalCounts.map((item) => [item.host_id, item._count.id]),
);
// Process hosts with counts from maps (no more DB queries!)
const hostsWithUpdateInfo = hosts.map((host) => {
const updatesCount = updateCountMap.get(host.id) || 0;
const totalPackagesCount = totalCountMap.get(host.id) || 0;
// Calculate effective status based on reporting interval // Calculate effective status based on reporting interval
const isStale = moment(host.last_update).isBefore( const isStale = moment(host.last_update).isBefore(
@@ -276,8 +286,7 @@ router.get("/hosts", authenticateToken, requireViewHosts, async (_req, res) => {
isStale, isStale,
effectiveStatus, effectiveStatus,
}; };
}), });
);
res.json(hostsWithUpdateInfo); res.json(hostsWithUpdateInfo);
} catch (error) { } catch (error) {

View File

@@ -11,7 +11,31 @@ const {
const router = express.Router(); const router = express.Router();
// Get WebSocket connection status by api_id (no database access - pure memory lookup) // Get WebSocket connection status for multiple hosts at once (bulk endpoint)
// Bulk WebSocket status lookup: GET /status?apiIds=id1,id2,...
//
// Responds with { success: true, data: { [apiId]: getConnectionInfo(apiId) } }.
// On an unexpected failure it logs the error and responds 500 with
// { success: false, error: "Failed to fetch WebSocket status" }.
router.get("/status", authenticateToken, async (req, res) => {
  try {
    const { apiIds } = req.query;

    // `apiIds` is normally one comma-separated string, but a repeated
    // parameter (?apiIds=a&apiIds=b) may arrive as an array and a bracketed
    // one (?apiIds[x]=1) as an object. Calling .split() directly on those
    // would throw and turn a harmless request into a 500, so normalise first
    // and ignore anything that is not a string.
    const rawValues = Array.isArray(apiIds) ? apiIds : [apiIds];
    const requestedIds = rawValues
      .filter((value) => typeof value === "string")
      .flatMap((value) => value.split(","))
      // Trim the ids themselves (not just the emptiness test) so that
      // "a, b" looks up "b" rather than " b".
      .map((id) => id.trim())
      .filter((id) => id.length > 0);

    // De-duplicate so each id is looked up once. Object.fromEntries defines
    // own properties, so an id such as "__proto__" becomes a regular key
    // instead of silently replacing the map's prototype.
    const statusMap = Object.fromEntries(
      [...new Set(requestedIds)].map((apiId) => [
        apiId,
        getConnectionInfo(apiId),
      ]),
    );

    res.json({
      success: true,
      data: statusMap,
    });
  } catch (error) {
    console.error("Error fetching bulk WebSocket status:", error);
    res.status(500).json({
      success: false,
      error: "Failed to fetch WebSocket status",
    });
  }
});
// Get WebSocket connection status by api_id (single endpoint)
router.get("/status/:apiId", authenticateToken, async (req, res) => { router.get("/status/:apiId", authenticateToken, async (req, res) => {
try { try {
const { apiId } = req.params; const { apiId } = req.params;

View File

@@ -402,105 +402,71 @@ const Hosts = () => {
const token = localStorage.getItem("token"); const token = localStorage.getItem("token");
if (!token) return; if (!token) return;
// Fetch initial WebSocket status for all hosts
// Fetch initial WebSocket status for all hosts // Fetch initial WebSocket status for all hosts
const fetchInitialStatus = async () => { const fetchInitialStatus = async () => {
const statusPromises = hosts const apiIds = hosts
.filter((host) => host.api_id) .filter((host) => host.api_id)
.map(async (host) => { .map((host) => host.api_id);
if (apiIds.length === 0) return;
try { try {
const response = await fetch(`/api/v1/ws/status/${host.api_id}`, { const response = await fetch(
`/api/v1/ws/status?apiIds=${apiIds.join(",")}`,
{
headers: { headers: {
Authorization: `Bearer ${token}`, Authorization: `Bearer ${token}`,
}, },
}); },
);
if (response.ok) { if (response.ok) {
const data = await response.json(); const result = await response.json();
return { apiId: host.api_id, status: data.data }; setWsStatusMap(result.data);
} }
} catch (_error) { } catch (_error) {
// Silently handle errors // Silently handle errors
} }
return {
apiId: host.api_id,
status: { connected: false, secure: false },
};
});
const results = await Promise.all(statusPromises);
const initialStatusMap = {};
results.forEach(({ apiId, status }) => {
initialStatusMap[apiId] = status;
});
setWsStatusMap(initialStatusMap);
}; };
fetchInitialStatus(); fetchInitialStatus();
}, [hosts]); }, [hosts]);
// Subscribe to WebSocket status changes for all hosts via SSE // Subscribe to WebSocket status changes for all hosts via polling (lightweight alternative to SSE)
useEffect(() => { useEffect(() => {
if (!hosts || hosts.length === 0) return; if (!hosts || hosts.length === 0) return;
const token = localStorage.getItem("token"); const token = localStorage.getItem("token");
if (!token) return; if (!token) return;
const eventSources = new Map(); // Use polling instead of SSE to avoid connection pool issues
let isMounted = true; // Poll every 10 seconds instead of 19 persistent connections
const pollInterval = setInterval(() => {
const apiIds = hosts
.filter((host) => host.api_id)
.map((host) => host.api_id);
const connectHost = (apiId) => { if (apiIds.length === 0) return;
if (!isMounted || eventSources.has(apiId)) return;
try { fetch(`/api/v1/ws/status?apiIds=${apiIds.join(",")}`, {
const es = new EventSource( headers: {
`/api/v1/ws/status/${apiId}/stream?token=${encodeURIComponent(token)}`, Authorization: `Bearer ${token}`,
); },
})
es.onmessage = (event) => { .then((response) => response.json())
try { .then((result) => {
const data = JSON.parse(event.data); if (result.success && result.data) {
if (isMounted) { setWsStatusMap(result.data);
setWsStatusMap((prev) => { }
const newMap = { ...prev, [apiId]: data }; })
return newMap; .catch(() => {
// Silently handle errors
}); });
} }, 10000); // Poll every 10 seconds
} catch (_err) {
// Silently handle parse errors
}
};
es.onerror = (_error) => {
console.log(`[SSE] Connection error for ${apiId}, retrying...`);
es?.close();
eventSources.delete(apiId);
if (isMounted) {
// Retry connection after 5 seconds with exponential backoff
setTimeout(() => connectHost(apiId), 5000);
}
};
eventSources.set(apiId, es);
} catch (_err) {
// Silently handle connection errors
}
};
// Connect to all hosts
for (const host of hosts) {
if (host.api_id) {
connectHost(host.api_id);
} else {
}
}
// Cleanup function // Cleanup function
return () => { return () => {
isMounted = false; clearInterval(pollInterval);
for (const es of eventSources.values()) {
es.close();
}
eventSources.clear();
}; };
}, [hosts]); }, [hosts]);