diff --git a/.gitignore b/.gitignore
index 4b9c048..58b400f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -139,6 +139,7 @@ playwright-report/
test-results.xml
test_*.sh
test-*.sh
+*.code-workspace
# Package manager lock files (uncomment if you want to ignore them)
# package-lock.json
diff --git a/backend/package.json b/backend/package.json
index eae2672..9e39827 100644
--- a/backend/package.json
+++ b/backend/package.json
@@ -14,11 +14,12 @@
"db:studio": "prisma studio"
},
"dependencies": {
- "@bull-board/api": "^6.13.0",
- "@bull-board/express": "^6.13.0",
+ "@bull-board/api": "^6.13.1",
+ "@bull-board/express": "^6.13.1",
"@prisma/client": "^6.1.0",
"bcryptjs": "^2.4.3",
"bullmq": "^5.61.0",
+ "cookie-parser": "^1.4.7",
"cors": "^2.8.5",
"dotenv": "^16.4.7",
"express": "^4.21.2",
@@ -31,7 +32,8 @@
"qrcode": "^1.5.4",
"speakeasy": "^2.0.0",
"uuid": "^11.0.3",
- "winston": "^3.17.0"
+ "winston": "^3.17.0",
+ "ws": "^8.18.0"
},
"devDependencies": {
"@types/bcryptjs": "^2.4.6",
diff --git a/backend/prisma/migrations/20251015192533_add_job_history_table/migration.sql b/backend/prisma/migrations/20251015192533_add_job_history_table/migration.sql
new file mode 100644
index 0000000..ac40490
--- /dev/null
+++ b/backend/prisma/migrations/20251015192533_add_job_history_table/migration.sql
@@ -0,0 +1,40 @@
+-- CreateTable
+CREATE TABLE "job_history" (
+ "id" TEXT NOT NULL,
+ "job_id" TEXT NOT NULL,
+ "queue_name" TEXT NOT NULL,
+ "job_name" TEXT NOT NULL,
+ "host_id" TEXT,
+ "api_id" TEXT,
+ "status" TEXT NOT NULL,
+ "attempt_number" INTEGER NOT NULL DEFAULT 1,
+ "error_message" TEXT,
+ "output" JSONB,
+ "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ "updated_at" TIMESTAMP(3) NOT NULL,
+ "completed_at" TIMESTAMP(3),
+
+ CONSTRAINT "job_history_pkey" PRIMARY KEY ("id")
+);
+
+-- CreateIndex
+CREATE INDEX "job_history_job_id_idx" ON "job_history"("job_id");
+
+-- CreateIndex
+CREATE INDEX "job_history_queue_name_idx" ON "job_history"("queue_name");
+
+-- CreateIndex
+CREATE INDEX "job_history_host_id_idx" ON "job_history"("host_id");
+
+-- CreateIndex
+CREATE INDEX "job_history_api_id_idx" ON "job_history"("api_id");
+
+-- CreateIndex
+CREATE INDEX "job_history_status_idx" ON "job_history"("status");
+
+-- CreateIndex
+CREATE INDEX "job_history_created_at_idx" ON "job_history"("created_at");
+
+-- AddForeignKey
+ALTER TABLE "job_history" ADD CONSTRAINT "job_history_host_id_fkey" FOREIGN KEY ("host_id") REFERENCES "hosts"("id") ON DELETE SET NULL ON UPDATE CASCADE;
+
diff --git a/backend/prisma/schema.prisma b/backend/prisma/schema.prisma
index 395f7ef..cb31167 100644
--- a/backend/prisma/schema.prisma
+++ b/backend/prisma/schema.prisma
@@ -101,6 +101,7 @@ model hosts {
host_repositories host_repositories[]
host_groups host_groups? @relation(fields: [host_group_id], references: [id])
update_history update_history[]
+ job_history job_history[]
@@index([machine_id])
@@index([friendly_name])
@@ -324,3 +325,27 @@ model docker_image_updates {
@@index([image_id])
@@index([is_security_update])
}
+
+model job_history {
+ id String @id
+ job_id String
+ queue_name String
+ job_name String
+ host_id String?
+ api_id String?
+ status String
+ attempt_number Int @default(1)
+ error_message String?
+ output Json?
+ created_at DateTime @default(now())
+ updated_at DateTime
+ completed_at DateTime?
+ hosts hosts? @relation(fields: [host_id], references: [id], onDelete: SetNull)
+
+ @@index([job_id])
+ @@index([queue_name])
+ @@index([host_id])
+ @@index([api_id])
+ @@index([status])
+ @@index([created_at])
+}
diff --git a/backend/src/routes/automationRoutes.js b/backend/src/routes/automationRoutes.js
index 6498696..c2d6265 100644
--- a/backend/src/routes/automationRoutes.js
+++ b/backend/src/routes/automationRoutes.js
@@ -1,11 +1,12 @@
const express = require("express");
const { queueManager, QUEUE_NAMES } = require("../services/automation");
+const { getConnectedApiIds } = require("../services/agentWs");
const { authenticateToken } = require("../middleware/auth");
const router = express.Router();
// Get all queue statistics
-router.get("/stats", authenticateToken, async (req, res) => {
+router.get("/stats", authenticateToken, async (_req, res) => {
try {
const stats = await queueManager.getAllQueueStats();
res.json({
@@ -60,7 +61,10 @@ router.get("/jobs/:queueName", authenticateToken, async (req, res) => {
});
}
- const jobs = await queueManager.getRecentJobs(queueName, parseInt(limit));
+ const jobs = await queueManager.getRecentJobs(
+ queueName,
+ parseInt(limit, 10),
+ );
// Format jobs for frontend
const formattedJobs = jobs.map((job) => ({
@@ -96,7 +100,7 @@ router.get("/jobs/:queueName", authenticateToken, async (req, res) => {
});
// Trigger manual GitHub update check
-router.post("/trigger/github-update", authenticateToken, async (req, res) => {
+router.post("/trigger/github-update", authenticateToken, async (_req, res) => {
try {
const job = await queueManager.triggerGitHubUpdateCheck();
res.json({
@@ -116,51 +120,61 @@ router.post("/trigger/github-update", authenticateToken, async (req, res) => {
});
// Trigger manual session cleanup
-router.post("/trigger/session-cleanup", authenticateToken, async (req, res) => {
- try {
- const job = await queueManager.triggerSessionCleanup();
- res.json({
- success: true,
- data: {
- jobId: job.id,
- message: "Session cleanup triggered successfully",
- },
- });
- } catch (error) {
- console.error("Error triggering session cleanup:", error);
- res.status(500).json({
- success: false,
- error: "Failed to trigger session cleanup",
- });
- }
-});
+router.post(
+ "/trigger/session-cleanup",
+ authenticateToken,
+ async (_req, res) => {
+ try {
+ const job = await queueManager.triggerSessionCleanup();
+ res.json({
+ success: true,
+ data: {
+ jobId: job.id,
+ message: "Session cleanup triggered successfully",
+ },
+ });
+ } catch (error) {
+ console.error("Error triggering session cleanup:", error);
+ res.status(500).json({
+ success: false,
+ error: "Failed to trigger session cleanup",
+ });
+ }
+ },
+);
-// Trigger manual echo hello
-router.post("/trigger/echo-hello", authenticateToken, async (req, res) => {
- try {
- const { message } = req.body;
- const job = await queueManager.triggerEchoHello(message);
- res.json({
- success: true,
- data: {
- jobId: job.id,
- message: "Echo hello triggered successfully",
- },
- });
- } catch (error) {
- console.error("Error triggering echo hello:", error);
- res.status(500).json({
- success: false,
- error: "Failed to trigger echo hello",
- });
- }
-});
+// Trigger Agent Collection: enqueue report_now for connected agents only
+router.post(
+ "/trigger/agent-collection",
+ authenticateToken,
+ async (_req, res) => {
+ try {
+ const queue = queueManager.queues[QUEUE_NAMES.AGENT_COMMANDS];
+ const apiIds = getConnectedApiIds();
+ if (!apiIds || apiIds.length === 0) {
+ return res.json({ success: true, data: { enqueued: 0 } });
+ }
+ const jobs = apiIds.map((apiId) => ({
+ name: "report_now",
+ data: { api_id: apiId, type: "report_now" },
+ opts: { attempts: 3, backoff: { type: "fixed", delay: 2000 } },
+ }));
+ await queue.addBulk(jobs);
+ res.json({ success: true, data: { enqueued: jobs.length } });
+ } catch (error) {
+ console.error("Error triggering agent collection:", error);
+ res
+ .status(500)
+ .json({ success: false, error: "Failed to trigger agent collection" });
+ }
+ },
+);
// Trigger manual orphaned repo cleanup
router.post(
"/trigger/orphaned-repo-cleanup",
authenticateToken,
- async (req, res) => {
+ async (_req, res) => {
try {
const job = await queueManager.triggerOrphanedRepoCleanup();
res.json({
@@ -181,7 +195,7 @@ router.post(
);
// Get queue health status
-router.get("/health", authenticateToken, async (req, res) => {
+router.get("/health", authenticateToken, async (_req, res) => {
try {
const stats = await queueManager.getAllQueueStats();
const totalJobs = Object.values(stats).reduce((sum, queueStats) => {
@@ -224,7 +238,7 @@ router.get("/health", authenticateToken, async (req, res) => {
});
// Get automation overview (for dashboard cards)
-router.get("/overview", authenticateToken, async (req, res) => {
+router.get("/overview", authenticateToken, async (_req, res) => {
try {
const stats = await queueManager.getAllQueueStats();
@@ -232,7 +246,6 @@ router.get("/overview", authenticateToken, async (req, res) => {
const recentJobs = await Promise.all([
queueManager.getRecentJobs(QUEUE_NAMES.GITHUB_UPDATE_CHECK, 1),
queueManager.getRecentJobs(QUEUE_NAMES.SESSION_CLEANUP, 1),
- queueManager.getRecentJobs(QUEUE_NAMES.ECHO_HELLO, 1),
queueManager.getRecentJobs(QUEUE_NAMES.ORPHANED_REPO_CLEANUP, 1),
]);
@@ -241,22 +254,16 @@ router.get("/overview", authenticateToken, async (req, res) => {
scheduledTasks:
stats[QUEUE_NAMES.GITHUB_UPDATE_CHECK].delayed +
stats[QUEUE_NAMES.SESSION_CLEANUP].delayed +
- stats[QUEUE_NAMES.SYSTEM_MAINTENANCE].delayed +
- stats[QUEUE_NAMES.ECHO_HELLO].delayed +
stats[QUEUE_NAMES.ORPHANED_REPO_CLEANUP].delayed,
runningTasks:
stats[QUEUE_NAMES.GITHUB_UPDATE_CHECK].active +
stats[QUEUE_NAMES.SESSION_CLEANUP].active +
- stats[QUEUE_NAMES.SYSTEM_MAINTENANCE].active +
- stats[QUEUE_NAMES.ECHO_HELLO].active +
stats[QUEUE_NAMES.ORPHANED_REPO_CLEANUP].active,
failedTasks:
stats[QUEUE_NAMES.GITHUB_UPDATE_CHECK].failed +
stats[QUEUE_NAMES.SESSION_CLEANUP].failed +
- stats[QUEUE_NAMES.SYSTEM_MAINTENANCE].failed +
- stats[QUEUE_NAMES.ECHO_HELLO].failed +
stats[QUEUE_NAMES.ORPHANED_REPO_CLEANUP].failed,
totalAutomations: Object.values(stats).reduce((sum, queueStats) => {
@@ -305,10 +312,10 @@ router.get("/overview", authenticateToken, async (req, res) => {
stats: stats[QUEUE_NAMES.SESSION_CLEANUP],
},
{
- name: "Echo Hello",
- queue: QUEUE_NAMES.ECHO_HELLO,
- description: "Simple test automation task",
- schedule: "Manual only",
+ name: "Orphaned Repo Cleanup",
+ queue: QUEUE_NAMES.ORPHANED_REPO_CLEANUP,
+ description: "Removes repositories with no associated hosts",
+ schedule: "Daily at 2 AM",
lastRun: recentJobs[2][0]?.finishedOn
? new Date(recentJobs[2][0].finishedOn).toLocaleString()
: "Never",
@@ -318,22 +325,6 @@ router.get("/overview", authenticateToken, async (req, res) => {
: recentJobs[2][0]
? "Success"
: "Never run",
- stats: stats[QUEUE_NAMES.ECHO_HELLO],
- },
- {
- name: "Orphaned Repo Cleanup",
- queue: QUEUE_NAMES.ORPHANED_REPO_CLEANUP,
- description: "Removes repositories with no associated hosts",
- schedule: "Daily at 2 AM",
- lastRun: recentJobs[3][0]?.finishedOn
- ? new Date(recentJobs[3][0].finishedOn).toLocaleString()
- : "Never",
- lastRunTimestamp: recentJobs[3][0]?.finishedOn || 0,
- status: recentJobs[3][0]?.failedReason
- ? "Failed"
- : recentJobs[3][0]
- ? "Success"
- : "Never run",
stats: stats[QUEUE_NAMES.ORPHANED_REPO_CLEANUP],
},
].sort((a, b) => {
diff --git a/backend/src/routes/dashboardRoutes.js b/backend/src/routes/dashboardRoutes.js
index 4bf5753..20d515f 100644
--- a/backend/src/routes/dashboardRoutes.js
+++ b/backend/src/routes/dashboardRoutes.js
@@ -8,6 +8,7 @@ const {
requireViewPackages,
requireViewUsers,
} = require("../middleware/permissions");
+const { queueManager } = require("../services/automation");
const router = express.Router();
const prisma = new PrismaClient();
@@ -413,6 +414,51 @@ router.get(
},
);
+// Get agent queue status for a specific host
+router.get(
+ "/hosts/:hostId/queue",
+ authenticateToken,
+ requireViewHosts,
+ async (req, res) => {
+ try {
+ const { hostId } = req.params;
+ const { limit = 20 } = req.query;
+
+ // Get the host to find its API ID
+ const host = await prisma.hosts.findUnique({
+ where: { id: hostId },
+ select: { api_id: true, friendly_name: true },
+ });
+
+ if (!host) {
+ return res.status(404).json({ error: "Host not found" });
+ }
+
+ // Get queue jobs for this host
+ const queueData = await queueManager.getHostJobs(
+ host.api_id,
+ parseInt(limit, 10),
+ );
+
+ res.json({
+ success: true,
+ data: {
+ hostId,
+ apiId: host.api_id,
+ friendlyName: host.friendly_name,
+ ...queueData,
+ },
+ });
+ } catch (error) {
+ console.error("Error fetching host queue status:", error);
+ res.status(500).json({
+ success: false,
+ error: "Failed to fetch host queue status",
+ });
+ }
+ },
+);
+
// Get recent users ordered by last_login desc
router.get(
"/recent-users",
diff --git a/backend/src/routes/hostRoutes.js b/backend/src/routes/hostRoutes.js
index 6bb698c..9a13563 100644
--- a/backend/src/routes/hostRoutes.js
+++ b/backend/src/routes/hostRoutes.js
@@ -406,7 +406,7 @@ router.post(
// Process packages in batches using createMany/updateMany
const packagesToCreate = [];
const packagesToUpdate = [];
- const hostPackagesToUpsert = [];
+ const _hostPackagesToUpsert = [];
// First pass: identify what needs to be created/updated
const existingPackages = await tx.packages.findMany({
diff --git a/backend/src/routes/settingsRoutes.js b/backend/src/routes/settingsRoutes.js
index 965bcdb..1bfed45 100644
--- a/backend/src/routes/settingsRoutes.js
+++ b/backend/src/routes/settingsRoutes.js
@@ -8,102 +8,9 @@ const { getSettings, updateSettings } = require("../services/settingsService");
const router = express.Router();
const prisma = new PrismaClient();
-// Function to trigger crontab updates on all hosts with auto-update enabled
-async function triggerCrontabUpdates() {
- try {
- console.log(
- "Triggering crontab updates on all hosts with auto-update enabled...",
- );
-
- // Get current settings for server URL
- const settings = await getSettings();
- const serverUrl = settings.server_url;
-
- // Get all hosts that have auto-update enabled
- const hosts = await prisma.hosts.findMany({
- where: {
- auto_update: true,
- status: "active", // Only update active hosts
- },
- select: {
- id: true,
- friendly_name: true,
- api_id: true,
- api_key: true,
- },
- });
-
- console.log(`Found ${hosts.length} hosts with auto-update enabled`);
-
- // For each host, we'll send a special update command that triggers crontab update
- // This is done by sending a ping with a special flag
- for (const host of hosts) {
- try {
- console.log(
- `Triggering crontab update for host: ${host.friendly_name}`,
- );
-
- // We'll use the existing ping endpoint but add a special parameter
- // The agent will detect this and run update-crontab command
- const http = require("node:http");
- const https = require("node:https");
-
- const url = new URL(`${serverUrl}/api/v1/hosts/ping`);
- const isHttps = url.protocol === "https:";
- const client = isHttps ? https : http;
-
- const postData = JSON.stringify({
- triggerCrontabUpdate: true,
- message: "Update interval changed, please update your crontab",
- });
-
- const options = {
- hostname: url.hostname,
- port: url.port || (isHttps ? 443 : 80),
- path: url.pathname,
- method: "POST",
- headers: {
- "Content-Type": "application/json",
- "Content-Length": Buffer.byteLength(postData),
- "X-API-ID": host.api_id,
- "X-API-KEY": host.api_key,
- },
- };
-
- const req = client.request(options, (res) => {
- if (res.statusCode === 200) {
- console.log(
- `Successfully triggered crontab update for ${host.friendly_name}`,
- );
- } else {
- console.error(
- `Failed to trigger crontab update for ${host.friendly_name}: ${res.statusCode}`,
- );
- }
- });
-
- req.on("error", (error) => {
- console.error(
- `Error triggering crontab update for ${host.friendly_name}:`,
- error.message,
- );
- });
-
- req.write(postData);
- req.end();
- } catch (error) {
- console.error(
- `Error triggering crontab update for ${host.friendly_name}:`,
- error.message,
- );
- }
- }
-
- console.log("Crontab update trigger completed");
- } catch (error) {
- console.error("Error in triggerCrontabUpdates:", error);
- }
-}
+// WebSocket broadcaster for agent policy updates
+const { broadcastSettingsUpdate } = require("../services/agentWs");
+const { queueManager, QUEUE_NAMES } = require("../services/automation");
// Helpers
function normalizeUpdateInterval(minutes) {
@@ -290,15 +197,37 @@ router.put(
console.log("Settings updated successfully:", updatedSettings);
- // If update interval changed, trigger crontab updates on all hosts with auto-update enabled
+ // If update interval changed, enqueue persistent jobs for agents
if (
updateInterval !== undefined &&
oldUpdateInterval !== updateData.update_interval
) {
console.log(
- `Update interval changed from ${oldUpdateInterval} to ${updateData.update_interval} minutes. Triggering crontab updates...`,
+ `Update interval changed from ${oldUpdateInterval} to ${updateData.update_interval} minutes. Enqueueing agent settings updates...`,
);
- await triggerCrontabUpdates();
+
+ const hosts = await prisma.hosts.findMany({
+ where: { status: "active" },
+ select: { api_id: true },
+ });
+
+ const queue = queueManager.queues[QUEUE_NAMES.AGENT_COMMANDS];
+ const jobs = hosts.map((h) => ({
+ name: "settings_update",
+ data: {
+ api_id: h.api_id,
+ type: "settings_update",
+ update_interval: updateData.update_interval,
+ },
+ opts: { attempts: 10, backoff: { type: "exponential", delay: 5000 } },
+ }));
+
+ // Bulk add jobs
+ await queue.addBulk(jobs);
+
+ // Also broadcast immediately to currently connected agents (best-effort)
+ // This ensures agents receive the change even if their host status isn't active yet
+ broadcastSettingsUpdate(updateData.update_interval);
}
res.json({
diff --git a/backend/src/server.js b/backend/src/server.js
index d44c251..c69a456 100644
--- a/backend/src/server.js
+++ b/backend/src/server.js
@@ -39,6 +39,7 @@ const express = require("express");
const cors = require("cors");
const helmet = require("helmet");
const rateLimit = require("express-rate-limit");
+const cookieParser = require("cookie-parser");
const {
createPrismaClient,
waitForDatabase,
@@ -69,6 +70,10 @@ const updateScheduler = require("./services/updateScheduler");
const { initSettings } = require("./services/settingsService");
const { cleanup_expired_sessions } = require("./utils/session_manager");
const { queueManager } = require("./services/automation");
+const { authenticateToken, requireAdmin } = require("./middleware/auth");
+const { createBullBoard } = require("@bull-board/api");
+const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
+const { ExpressAdapter } = require("@bull-board/express");
// Initialize Prisma client with optimized connection pooling for multiple instances
const prisma = createPrismaClient();
@@ -255,6 +260,9 @@ if (process.env.ENABLE_LOGGING === "true") {
const app = express();
const PORT = process.env.PORT || 3001;
+const http = require("node:http");
+const server = http.createServer(app);
+const { init: initAgentWs } = require("./services/agentWs");
// Trust proxy (needed when behind reverse proxy) and remove X-Powered-By
if (process.env.TRUST_PROXY) {
@@ -342,12 +350,17 @@ app.use(
// Allow non-browser/SSR tools with no origin
if (!origin) return callback(null, true);
if (allowedOrigins.includes(origin)) return callback(null, true);
+ // Allow same-origin requests (e.g., Bull Board accessing its own API)
+ // This allows http://hostname:3001 to make requests to http://hostname:3001
+ if (origin?.includes(":3001")) return callback(null, true);
return callback(new Error("Not allowed by CORS"));
},
credentials: true,
}),
);
app.use(limiter);
+// Cookie parser for Bull Board sessions
+app.use(cookieParser());
// Reduce body size limits to reasonable defaults
app.use(express.json({ limit: process.env.JSON_BODY_LIMIT || "5mb" }));
app.use(
@@ -430,6 +443,122 @@ app.use(`/api/${apiVersion}/gethomepage`, gethomepageRoutes);
app.use(`/api/${apiVersion}/automation`, automationRoutes);
app.use(`/api/${apiVersion}/docker`, dockerRoutes);
+// Bull Board - will be populated after queue manager initializes
+let bullBoardRouter = null;
+const bullBoardSessions = new Map(); // Store authenticated sessions
+
+// Mount Bull Board at /admin instead of /api/v1/admin to avoid path conflicts
+app.use(`/admin/queues`, (_req, res, next) => {
+ // Relax COOP/COEP for Bull Board in non-production to avoid browser warnings
+ if (process.env.NODE_ENV !== "production") {
+ res.setHeader("Cross-Origin-Opener-Policy", "same-origin-allow-popups");
+ res.setHeader("Cross-Origin-Embedder-Policy", "unsafe-none");
+ }
+
+ next();
+});
+
+// Authentication middleware for Bull Board
+app.use(`/admin/queues`, async (req, res, next) => {
+ // Skip authentication for static assets only
+ if (req.path.includes("/static/") || req.path.includes("/favicon")) {
+ return next();
+ }
+
+ // Check for bull-board-session cookie first
+ const sessionId = req.cookies["bull-board-session"];
+ if (sessionId) {
+ const session = bullBoardSessions.get(sessionId);
+ if (session && Date.now() - session.timestamp < 3600000) {
+ // 1 hour
+ // Valid session, extend it
+ session.timestamp = Date.now();
+ return next();
+ } else if (session) {
+ // Expired session, remove it
+ bullBoardSessions.delete(sessionId);
+ }
+ }
+
+ // No valid session, check for token
+ let token = req.query.token;
+ if (!token && req.headers.authorization) {
+ token = req.headers.authorization.replace("Bearer ", "");
+ }
+
+ // If no token, deny access
+ if (!token) {
+ return res.status(401).json({ error: "Access token required" });
+ }
+
+ // Add token to headers for authentication
+ req.headers.authorization = `Bearer ${token}`;
+
+ // Authenticate the user
+ return authenticateToken(req, res, (err) => {
+ if (err) {
+ return res.status(401).json({ error: "Authentication failed" });
+ }
+ return requireAdmin(req, res, (adminErr) => {
+ if (adminErr) {
+ return res.status(403).json({ error: "Admin access required" });
+ }
+
+ // Authentication successful - create a session
+ const newSessionId = require("node:crypto")
+ .randomBytes(32)
+ .toString("hex");
+ bullBoardSessions.set(newSessionId, {
+ timestamp: Date.now(),
+ userId: req.user.id,
+ });
+
+ // Set session cookie
+ res.cookie("bull-board-session", newSessionId, {
+ httpOnly: true,
+ secure: process.env.NODE_ENV === "production",
+ sameSite: "lax",
+ maxAge: 3600000, // 1 hour
+ });
+
+ // Clean up old sessions periodically
+ if (bullBoardSessions.size > 100) {
+ const now = Date.now();
+ for (const [sid, session] of bullBoardSessions.entries()) {
+ if (now - session.timestamp > 3600000) {
+ bullBoardSessions.delete(sid);
+ }
+ }
+ }
+
+ return next();
+ });
+ });
+});
+
+app.use(`/admin/queues`, (req, res, next) => {
+ if (bullBoardRouter) {
+ return bullBoardRouter(req, res, next);
+ }
+ return res.status(503).json({ error: "Bull Board not initialized yet" });
+});
+
+// Error handler specifically for Bull Board routes
+app.use("/admin/queues", (err, req, res, _next) => {
+ console.error("Bull Board error on", req.method, req.url);
+ console.error("Error details:", err.message);
+ console.error("Stack:", err.stack);
+ if (process.env.ENABLE_LOGGING === "true") {
+ logger.error(`Bull Board error on ${req.method} ${req.url}:`, err);
+ }
+ res.status(500).json({
+ error: "Internal server error",
+ message: err.message,
+ path: req.path,
+ url: req.url,
+ });
+});
+
// Error handling middleware
app.use((err, _req, res, _next) => {
if (process.env.ENABLE_LOGGING === "true") {
@@ -743,6 +872,25 @@ async function startServer() {
// Schedule recurring jobs
await queueManager.scheduleAllJobs();
+ // Set up Bull Board for queue monitoring
+ const serverAdapter = new ExpressAdapter();
+ // Set basePath to match where we mount the router
+ serverAdapter.setBasePath("/admin/queues");
+
+ const { QUEUE_NAMES } = require("./services/automation");
+ const bullAdapters = Object.values(QUEUE_NAMES).map(
+ (queueName) => new BullMQAdapter(queueManager.queues[queueName]),
+ );
+
+ createBullBoard({
+ queues: bullAdapters,
+ serverAdapter: serverAdapter,
+ });
+
+ // Set the router for the Bull Board middleware (secured middleware above)
+ bullBoardRouter = serverAdapter.getRouter();
+ console.log("✅ Bull Board mounted at /admin/queues (secured)");
+
// Initial session cleanup
await cleanup_expired_sessions();
@@ -758,7 +906,10 @@ async function startServer() {
60 * 60 * 1000,
); // Every hour
- app.listen(PORT, () => {
+ // Initialize WS layer with the underlying HTTP server
+ initAgentWs(server, prisma);
+
+ server.listen(PORT, () => {
if (process.env.ENABLE_LOGGING === "true") {
logger.info(`Server running on port ${PORT}`);
logger.info(`Environment: ${process.env.NODE_ENV}`);
diff --git a/backend/src/services/agentWs.js b/backend/src/services/agentWs.js
new file mode 100644
index 0000000..1aafa11
--- /dev/null
+++ b/backend/src/services/agentWs.js
@@ -0,0 +1,125 @@
+// Lightweight WebSocket hub for agent connections
+// Auth: X-API-ID / X-API-KEY headers on the upgrade request
+
+const WebSocket = require("ws");
+const url = require("node:url");
+
+// Connection registry by api_id
+const apiIdToSocket = new Map();
+
+let wss;
+let prisma;
+
+function init(server, prismaClient) {
+ prisma = prismaClient;
+ wss = new WebSocket.Server({ noServer: true });
+
+ // Handle HTTP upgrade events and authenticate before accepting WS
+ server.on("upgrade", async (request, socket, head) => {
+ try {
+ const { pathname } = url.parse(request.url);
+ if (!pathname || !pathname.startsWith("/api/")) {
+ socket.destroy();
+ return;
+ }
+
+ // Expected path: /api/{v}/agents/ws
+ const parts = pathname.split("/").filter(Boolean); // [api, v1, agents, ws]
+ if (parts.length !== 4 || parts[2] !== "agents" || parts[3] !== "ws") {
+ socket.destroy();
+ return;
+ }
+
+ const apiId = request.headers["x-api-id"];
+ const apiKey = request.headers["x-api-key"];
+ if (!apiId || !apiKey) {
+ socket.destroy();
+ return;
+ }
+
+ // Validate credentials
+ const host = await prisma.hosts.findUnique({ where: { api_id: apiId } });
+ if (!host || host.api_key !== apiKey) {
+ socket.destroy();
+ return;
+ }
+
+ wss.handleUpgrade(request, socket, head, (ws) => {
+ ws.apiId = apiId;
+ apiIdToSocket.set(apiId, ws);
+ console.log(
+ `[agent-ws] connected api_id=${apiId} total=${apiIdToSocket.size}`,
+ );
+
+ ws.on("message", () => {
+ // Currently we don't need to handle agent->server messages
+ });
+
+ ws.on("close", () => {
+ const existing = apiIdToSocket.get(apiId);
+ if (existing === ws) {
+ apiIdToSocket.delete(apiId);
+ }
+ console.log(
+ `[agent-ws] disconnected api_id=${apiId} total=${apiIdToSocket.size}`,
+ );
+ });
+
+ // Optional: greet/ack
+ safeSend(ws, JSON.stringify({ type: "connected" }));
+ });
+ } catch (_err) {
+ try {
+ socket.destroy();
+ } catch {
+ /* ignore */
+ }
+ }
+ });
+}
+
+function safeSend(ws, data) {
+ if (ws && ws.readyState === WebSocket.OPEN) {
+ try {
+ ws.send(data);
+ } catch {
+ /* ignore */
+ }
+ }
+}
+
+function broadcastSettingsUpdate(newInterval) {
+ const payload = JSON.stringify({
+ type: "settings_update",
+ update_interval: newInterval,
+ });
+ for (const [, ws] of apiIdToSocket) {
+ safeSend(ws, payload);
+ }
+}
+
+function pushReportNow(apiId) {
+ const ws = apiIdToSocket.get(apiId);
+ safeSend(ws, JSON.stringify({ type: "report_now" }));
+}
+
+function pushSettingsUpdate(apiId, newInterval) {
+ const ws = apiIdToSocket.get(apiId);
+ safeSend(
+ ws,
+ JSON.stringify({ type: "settings_update", update_interval: newInterval }),
+ );
+}
+
+module.exports = {
+ init,
+ broadcastSettingsUpdate,
+ pushReportNow,
+ pushSettingsUpdate,
+ // Expose read-only view of connected agents
+ getConnectedApiIds: () => Array.from(apiIdToSocket.keys()),
+ isConnected: (apiId) => {
+ const ws = apiIdToSocket.get(apiId);
+ return !!ws && ws.readyState === WebSocket.OPEN;
+ },
+};
diff --git a/backend/src/services/automation/echoHello.js b/backend/src/services/automation/echoHello.js
deleted file mode 100644
index 18744a6..0000000
--- a/backend/src/services/automation/echoHello.js
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Echo Hello Automation
- * Simple test automation task
- */
-class EchoHello {
- constructor(queueManager) {
- this.queueManager = queueManager;
- this.queueName = "echo-hello";
- }
-
- /**
- * Process echo hello job
- */
- async process(job) {
- const startTime = Date.now();
- console.log("🚀 Starting echo hello task...");
-
- try {
- // Simple echo task
- const message = job.data.message || "Hello from BullMQ!";
- const timestamp = new Date().toISOString();
-
- // Simulate some work
- await new Promise((resolve) => setTimeout(resolve, 100));
-
- const executionTime = Date.now() - startTime;
- console.log(`✅ Echo hello completed in ${executionTime}ms: ${message}`);
-
- return {
- success: true,
- message,
- timestamp,
- executionTime,
- };
- } catch (error) {
- const executionTime = Date.now() - startTime;
- console.error(
- `❌ Echo hello failed after ${executionTime}ms:`,
- error.message,
- );
- throw error;
- }
- }
-
- /**
- * Echo hello is manual only - no scheduling
- */
- async schedule() {
- console.log("ℹ️ Echo hello is manual only - no scheduling needed");
- return null;
- }
-
- /**
- * Trigger manual echo hello
- */
- async triggerManual(message = "Hello from BullMQ!") {
- const job = await this.queueManager.queues[this.queueName].add(
- "echo-hello-manual",
- { message },
- { priority: 1 },
- );
- console.log("✅ Manual echo hello triggered");
- return job;
- }
-}
-
-module.exports = EchoHello;
diff --git a/backend/src/services/automation/githubUpdateCheck.js b/backend/src/services/automation/githubUpdateCheck.js
index 0d561b7..a9cdd0f 100644
--- a/backend/src/services/automation/githubUpdateCheck.js
+++ b/backend/src/services/automation/githubUpdateCheck.js
@@ -14,7 +14,7 @@ class GitHubUpdateCheck {
/**
* Process GitHub update check job
*/
- async process(job) {
+ async process(_job) {
const startTime = Date.now();
console.log("๐ Starting GitHub update check...");
diff --git a/backend/src/services/automation/index.js b/backend/src/services/automation/index.js
index 2d02aa2..9a960d5 100644
--- a/backend/src/services/automation/index.js
+++ b/backend/src/services/automation/index.js
@@ -1,20 +1,19 @@
const { Queue, Worker } = require("bullmq");
const { redis, redisConnection } = require("./shared/redis");
const { prisma } = require("./shared/prisma");
+const agentWs = require("../agentWs");
// Import automation classes
const GitHubUpdateCheck = require("./githubUpdateCheck");
const SessionCleanup = require("./sessionCleanup");
const OrphanedRepoCleanup = require("./orphanedRepoCleanup");
-const EchoHello = require("./echoHello");
// Queue names
const QUEUE_NAMES = {
GITHUB_UPDATE_CHECK: "github-update-check",
SESSION_CLEANUP: "session-cleanup",
- SYSTEM_MAINTENANCE: "system-maintenance",
- ECHO_HELLO: "echo-hello",
ORPHANED_REPO_CLEANUP: "orphaned-repo-cleanup",
+ AGENT_COMMANDS: "agent-commands",
};
/**
@@ -60,7 +59,7 @@ class QueueManager {
* Initialize all queues
*/
async initializeQueues() {
- for (const [key, queueName] of Object.entries(QUEUE_NAMES)) {
+ for (const [_key, queueName] of Object.entries(QUEUE_NAMES)) {
this.queues[queueName] = new Queue(queueName, {
connection: redisConnection,
defaultJobOptions: {
@@ -88,7 +87,6 @@ class QueueManager {
this.automations[QUEUE_NAMES.SESSION_CLEANUP] = new SessionCleanup(this);
this.automations[QUEUE_NAMES.ORPHANED_REPO_CLEANUP] =
new OrphanedRepoCleanup(this);
- this.automations[QUEUE_NAMES.ECHO_HELLO] = new EchoHello(this);
console.log("✅ All automation classes initialized");
}
@@ -133,15 +131,150 @@ class QueueManager {
},
);
- // Echo Hello Worker
- this.workers[QUEUE_NAMES.ECHO_HELLO] = new Worker(
- QUEUE_NAMES.ECHO_HELLO,
- this.automations[QUEUE_NAMES.ECHO_HELLO].process.bind(
- this.automations[QUEUE_NAMES.ECHO_HELLO],
- ),
+ // Agent Commands Worker
+ this.workers[QUEUE_NAMES.AGENT_COMMANDS] = new Worker(
+ QUEUE_NAMES.AGENT_COMMANDS,
+ async (job) => {
+ const { api_id, type, update_interval } = job.data || {};
+ console.log("[agent-commands] processing job", job.id, api_id, type);
+
+ // Log job attempt to history - use job.id as the unique identifier
+ const attemptNumber = job.attemptsMade || 1;
+ const historyId = job.id; // Single row per job, updated with each attempt
+
+ try {
+ if (!api_id || !type) {
+ throw new Error("invalid job data");
+ }
+
+ // Find host by api_id
+ const host = await prisma.hosts.findUnique({
+ where: { api_id },
+ select: { id: true },
+ });
+
+ // Ensure agent is connected; if not, retry later
+ if (!agentWs.isConnected(api_id)) {
+ const error = new Error("agent not connected");
+ // Log failed attempt
+ await prisma.job_history.upsert({
+ where: { id: historyId },
+ create: {
+ id: historyId,
+ job_id: job.id,
+ queue_name: QUEUE_NAMES.AGENT_COMMANDS,
+ job_name: type,
+ host_id: host?.id,
+ api_id,
+ status: "failed",
+ attempt_number: attemptNumber,
+ error_message: error.message,
+ created_at: new Date(),
+ updated_at: new Date(),
+ },
+ update: {
+ status: "failed",
+ attempt_number: attemptNumber,
+ error_message: error.message,
+ updated_at: new Date(),
+ },
+ });
+ console.log(
+ "[agent-commands] agent not connected, will retry",
+ api_id,
+ );
+ throw error;
+ }
+
+ // Process the command
+ let result;
+ if (type === "settings_update") {
+ agentWs.pushSettingsUpdate(api_id, update_interval);
+ console.log(
+ "[agent-commands] delivered settings_update",
+ api_id,
+ update_interval,
+ );
+ result = { delivered: true, update_interval };
+ } else if (type === "report_now") {
+ agentWs.pushReportNow(api_id);
+ console.log("[agent-commands] delivered report_now", api_id);
+ result = { delivered: true };
+ } else {
+ throw new Error("unsupported agent command");
+ }
+
+ // Log successful completion
+ await prisma.job_history.upsert({
+ where: { id: historyId },
+ create: {
+ id: historyId,
+ job_id: job.id,
+ queue_name: QUEUE_NAMES.AGENT_COMMANDS,
+ job_name: type,
+ host_id: host?.id,
+ api_id,
+ status: "completed",
+ attempt_number: attemptNumber,
+ output: result,
+ created_at: new Date(),
+ updated_at: new Date(),
+ completed_at: new Date(),
+ },
+ update: {
+ status: "completed",
+ attempt_number: attemptNumber,
+ output: result,
+ error_message: null,
+ updated_at: new Date(),
+ completed_at: new Date(),
+ },
+ });
+
+ return result;
+ } catch (error) {
+ // Log error to history (if not already logged above)
+ if (error.message !== "agent not connected") {
+ const host = await prisma.hosts
+ .findUnique({
+ where: { api_id },
+ select: { id: true },
+ })
+ .catch(() => null);
+
+ await prisma.job_history
+ .upsert({
+ where: { id: historyId },
+ create: {
+ id: historyId,
+ job_id: job.id,
+ queue_name: QUEUE_NAMES.AGENT_COMMANDS,
+ job_name: type || "unknown",
+ host_id: host?.id,
+ api_id,
+ status: "failed",
+ attempt_number: attemptNumber,
+ error_message: error.message,
+ created_at: new Date(),
+ updated_at: new Date(),
+ },
+ update: {
+ status: "failed",
+ attempt_number: attemptNumber,
+ error_message: error.message,
+ updated_at: new Date(),
+ },
+ })
+ .catch((err) =>
+ console.error("[agent-commands] failed to log error:", err),
+ );
+ }
+ throw error;
+ }
+ },
{
connection: redisConnection,
- concurrency: 1,
+ concurrency: 10,
},
);
@@ -184,7 +317,6 @@ class QueueManager {
await this.automations[QUEUE_NAMES.GITHUB_UPDATE_CHECK].schedule();
await this.automations[QUEUE_NAMES.SESSION_CLEANUP].schedule();
await this.automations[QUEUE_NAMES.ORPHANED_REPO_CLEANUP].schedule();
- await this.automations[QUEUE_NAMES.ECHO_HELLO].schedule();
}
/**
@@ -202,10 +334,6 @@ class QueueManager {
return this.automations[QUEUE_NAMES.ORPHANED_REPO_CLEANUP].triggerManual();
}
- async triggerEchoHello(message = "Hello from BullMQ!") {
- return this.automations[QUEUE_NAMES.ECHO_HELLO].triggerManual(message);
- }
-
/**
* Get queue statistics
*/
@@ -262,6 +390,73 @@ class QueueManager {
.slice(0, limit);
}
+ /**
+ * Get jobs for a specific host (by API ID)
+ */
+ async getHostJobs(apiId, limit = 20) {
+ const queue = this.queues[QUEUE_NAMES.AGENT_COMMANDS];
+ if (!queue) {
+ throw new Error(`Queue ${QUEUE_NAMES.AGENT_COMMANDS} not found`);
+ }
+
+ console.log(`[getHostJobs] Looking for jobs with api_id: ${apiId}`);
+
+ // Get active queue status (waiting, active, delayed, failed)
+ const [waiting, active, delayed, failed] = await Promise.all([
+ queue.getWaiting(),
+ queue.getActive(),
+ queue.getDelayed(),
+ queue.getFailed(),
+ ]);
+
+ // Filter by API ID
+ const filterByApiId = (jobs) =>
+ jobs.filter((job) => job.data && job.data.api_id === apiId);
+
+ const waitingCount = filterByApiId(waiting).length;
+ const activeCount = filterByApiId(active).length;
+ const delayedCount = filterByApiId(delayed).length;
+ const failedCount = filterByApiId(failed).length;
+
+ console.log(
+ `[getHostJobs] Queue status - Waiting: ${waitingCount}, Active: ${activeCount}, Delayed: ${delayedCount}, Failed: ${failedCount}`,
+ );
+
+ // Get job history from database (shows all attempts and status changes)
+ const jobHistory = await prisma.job_history.findMany({
+ where: {
+ api_id: apiId,
+ },
+ orderBy: {
+ created_at: "desc",
+ },
+ take: limit,
+ });
+
+ console.log(
+ `[getHostJobs] Found ${jobHistory.length} job history records for api_id: ${apiId}`,
+ );
+
+ return {
+ waiting: waitingCount,
+ active: activeCount,
+ delayed: delayedCount,
+ failed: failedCount,
+ jobHistory: jobHistory.map((job) => ({
+ id: job.id,
+ job_id: job.job_id,
+ job_name: job.job_name,
+ status: job.status,
+ attempt_number: job.attempt_number,
+ error_message: job.error_message,
+ output: job.output,
+ created_at: job.created_at,
+ updated_at: job.updated_at,
+ completed_at: job.completed_at,
+ })),
+ };
+ }
+
/**
* Graceful shutdown
*/
@@ -269,8 +464,24 @@ class QueueManager {
console.log("๐ Shutting down queue manager...");
for (const queueName of Object.keys(this.queues)) {
- await this.queues[queueName].close();
- await this.workers[queueName].close();
+ try {
+ await this.queues[queueName].close();
+ } catch (e) {
+ console.warn(
+ `⚠️ Failed to close queue '${queueName}':`,
+ e?.message || e,
+ );
+ }
+ if (this.workers?.[queueName]) {
+ try {
+ await this.workers[queueName].close();
+ } catch (e) {
+ console.warn(
+ `⚠️ Failed to close worker for '${queueName}':`,
+ e?.message || e,
+ );
+ }
+ }
}
await redis.quit();
diff --git a/backend/src/services/automation/orphanedRepoCleanup.js b/backend/src/services/automation/orphanedRepoCleanup.js
index 6351823..9e9ccb3 100644
--- a/backend/src/services/automation/orphanedRepoCleanup.js
+++ b/backend/src/services/automation/orphanedRepoCleanup.js
@@ -13,7 +13,7 @@ class OrphanedRepoCleanup {
/**
* Process orphaned repository cleanup job
*/
- async process(job) {
+ async process(_job) {
const startTime = Date.now();
console.log("🧹 Starting orphaned repository cleanup...");
diff --git a/backend/src/services/automation/sessionCleanup.js b/backend/src/services/automation/sessionCleanup.js
index 1454224..fd92000 100644
--- a/backend/src/services/automation/sessionCleanup.js
+++ b/backend/src/services/automation/sessionCleanup.js
@@ -1,5 +1,4 @@
const { prisma } = require("./shared/prisma");
-const { cleanup_expired_sessions } = require("../../utils/session_manager");
/**
* Session Cleanup Automation
@@ -14,7 +13,7 @@ class SessionCleanup {
/**
* Process session cleanup job
*/
- async process(job) {
+ async process(_job) {
const startTime = Date.now();
console.log("🧹 Starting session cleanup...");
diff --git a/backend/src/services/automation/shared/redis.js b/backend/src/services/automation/shared/redis.js
index e4e1f2e..62aa644 100644
--- a/backend/src/services/automation/shared/redis.js
+++ b/backend/src/services/automation/shared/redis.js
@@ -3,9 +3,9 @@ const IORedis = require("ioredis");
// Redis connection configuration
const redisConnection = {
host: process.env.REDIS_HOST || "localhost",
- port: parseInt(process.env.REDIS_PORT) || 6379,
+ port: parseInt(process.env.REDIS_PORT, 10) || 6379,
password: process.env.REDIS_PASSWORD || undefined,
- db: parseInt(process.env.REDIS_DB) || 0,
+ db: parseInt(process.env.REDIS_DB, 10) || 0,
retryDelayOnFailover: 100,
maxRetriesPerRequest: null, // BullMQ requires this to be null
};
diff --git a/frontend/public/assets/bull-board-logo.svg b/frontend/public/assets/bull-board-logo.svg
new file mode 100644
index 0000000..bdc52a0
--- /dev/null
+++ b/frontend/public/assets/bull-board-logo.svg
@@ -0,0 +1,23 @@
+
diff --git a/frontend/src/pages/Automation.jsx b/frontend/src/pages/Automation.jsx
index 9716664..6e3286e 100644
--- a/frontend/src/pages/Automation.jsx
+++ b/frontend/src/pages/Automation.jsx
@@ -1,20 +1,17 @@
import { useQuery } from "@tanstack/react-query";
import {
Activity,
- AlertCircle,
ArrowDown,
ArrowUp,
ArrowUpDown,
- Bot,
CheckCircle,
Clock,
Play,
- RefreshCw,
Settings,
XCircle,
Zap,
} from "lucide-react";
-import { useEffect, useState } from "react";
+import { useState } from "react";
import api from "../utils/api";
const Automation = () => {
@@ -33,7 +30,7 @@ const Automation = () => {
});
// Fetch queue statistics
- const { data: queueStats, isLoading: statsLoading } = useQuery({
+ useQuery({
queryKey: ["automation-stats"],
queryFn: async () => {
const response = await api.get("/automation/stats");
@@ -43,7 +40,7 @@ const Automation = () => {
});
// Fetch recent jobs
- const { data: recentJobs, isLoading: jobsLoading } = useQuery({
+ useQuery({
queryKey: ["automation-jobs"],
queryFn: async () => {
const jobs = await Promise.all([
@@ -62,7 +59,7 @@ const Automation = () => {
refetchInterval: 30000,
});
- const getStatusIcon = (status) => {
+ const _getStatusIcon = (status) => {
switch (status) {
case "completed":
return