Added support for the new agent mechanism and Binary

Added bullMQ + redis to the platform for automation and queue mechanism
Added new tabs in host details
This commit is contained in:
Muhammad Ibrahim
2025-10-15 20:56:58 +01:00
parent fdd0cfd619
commit 9a40d5e6ee
23 changed files with 1155 additions and 360 deletions

View File

@@ -0,0 +1,125 @@
// Lightweight WebSocket hub for agent connections
// Auth: X-API-ID / X-API-KEY headers on the upgrade request
const WebSocket = require("ws");
const url = require("node:url");
// Connection registry by api_id
const apiIdToSocket = new Map();
let wss;
let prisma;
function init(server, prismaClient) {
prisma = prismaClient;
wss = new WebSocket.Server({ noServer: true });
// Handle HTTP upgrade events and authenticate before accepting WS
server.on("upgrade", async (request, socket, head) => {
try {
const { pathname } = url.parse(request.url);
if (!pathname || !pathname.startsWith("/api/")) {
socket.destroy();
return;
}
// Expected path: /api/{v}/agents/ws
const parts = pathname.split("/").filter(Boolean); // [api, v1, agents, ws]
if (parts.length !== 4 || parts[2] !== "agents" || parts[3] !== "ws") {
socket.destroy();
return;
}
const apiId = request.headers["x-api-id"];
const apiKey = request.headers["x-api-key"];
if (!apiId || !apiKey) {
socket.destroy();
return;
}
// Validate credentials
const host = await prisma.hosts.findUnique({ where: { api_id: apiId } });
if (!host || host.api_key !== apiKey) {
socket.destroy();
return;
}
wss.handleUpgrade(request, socket, head, (ws) => {
ws.apiId = apiId;
apiIdToSocket.set(apiId, ws);
console.log(
`[agent-ws] connected api_id=${apiId} total=${apiIdToSocket.size}`,
);
ws.on("message", () => {
// Currently we don't need to handle agent->server messages
});
ws.on("close", () => {
const existing = apiIdToSocket.get(apiId);
if (existing === ws) {
apiIdToSocket.delete(apiId);
}
console.log(
`[agent-ws] disconnected api_id=${apiId} total=${apiIdToSocket.size}`,
);
});
// Optional: greet/ack
safeSend(ws, JSON.stringify({ type: "connected" }));
});
} catch (_err) {
try {
socket.destroy();
} catch {
/* ignore */
}
}
});
}
function safeSend(ws, data) {
if (ws && ws.readyState === WebSocket.OPEN) {
try {
ws.send(data);
} catch {
/* ignore */
}
}
}
function broadcastSettingsUpdate(newInterval) {
const payload = JSON.stringify({
type: "settings_update",
update_interval: newInterval,
});
for (const [, ws] of apiIdToSocket) {
safeSend(ws, payload);
}
}
function pushReportNow(apiId) {
const ws = apiIdToSocket.get(apiId);
safeSend(ws, JSON.stringify({ type: "report_now" }));
}
function pushSettingsUpdate(apiId, newInterval) {
const ws = apiIdToSocket.get(apiId);
safeSend(
ws,
JSON.stringify({ type: "settings_update", update_interval: newInterval }),
);
}
module.exports = {
init,
broadcastSettingsUpdate,
pushReportNow,
pushSettingsUpdate,
// Expose read-only view of connected agents
getConnectedApiIds: () => Array.from(apiIdToSocket.keys()),
isConnected: (apiId) => {
const ws = apiIdToSocket.get(apiId);
return !!ws && ws.readyState === WebSocket.OPEN;
},
};

View File

@@ -1,67 +0,0 @@
/**
* Echo Hello Automation
* Simple test automation task
*/
class EchoHello {
constructor(queueManager) {
this.queueManager = queueManager;
this.queueName = "echo-hello";
}
/**
* Process echo hello job
*/
async process(job) {
const startTime = Date.now();
console.log("👋 Starting echo hello task...");
try {
// Simple echo task
const message = job.data.message || "Hello from BullMQ!";
const timestamp = new Date().toISOString();
// Simulate some work
await new Promise((resolve) => setTimeout(resolve, 100));
const executionTime = Date.now() - startTime;
console.log(`✅ Echo hello completed in ${executionTime}ms: ${message}`);
return {
success: true,
message,
timestamp,
executionTime,
};
} catch (error) {
const executionTime = Date.now() - startTime;
console.error(
`❌ Echo hello failed after ${executionTime}ms:`,
error.message,
);
throw error;
}
}
/**
* Echo hello is manual only - no scheduling
*/
async schedule() {
console.log(" Echo hello is manual only - no scheduling needed");
return null;
}
/**
* Trigger manual echo hello
*/
async triggerManual(message = "Hello from BullMQ!") {
const job = await this.queueManager.queues[this.queueName].add(
"echo-hello-manual",
{ message },
{ priority: 1 },
);
console.log("✅ Manual echo hello triggered");
return job;
}
}
module.exports = EchoHello;

View File

@@ -14,7 +14,7 @@ class GitHubUpdateCheck {
/**
* Process GitHub update check job
*/
async process(job) {
async process(_job) {
const startTime = Date.now();
console.log("🔍 Starting GitHub update check...");

View File

@@ -1,20 +1,19 @@
const { Queue, Worker } = require("bullmq");
const { redis, redisConnection } = require("./shared/redis");
const { prisma } = require("./shared/prisma");
const agentWs = require("../agentWs");
// Import automation classes
const GitHubUpdateCheck = require("./githubUpdateCheck");
const SessionCleanup = require("./sessionCleanup");
const OrphanedRepoCleanup = require("./orphanedRepoCleanup");
const EchoHello = require("./echoHello");
// Queue names
const QUEUE_NAMES = {
GITHUB_UPDATE_CHECK: "github-update-check",
SESSION_CLEANUP: "session-cleanup",
SYSTEM_MAINTENANCE: "system-maintenance",
ECHO_HELLO: "echo-hello",
ORPHANED_REPO_CLEANUP: "orphaned-repo-cleanup",
AGENT_COMMANDS: "agent-commands",
};
/**
@@ -60,7 +59,7 @@ class QueueManager {
* Initialize all queues
*/
async initializeQueues() {
for (const [key, queueName] of Object.entries(QUEUE_NAMES)) {
for (const [_key, queueName] of Object.entries(QUEUE_NAMES)) {
this.queues[queueName] = new Queue(queueName, {
connection: redisConnection,
defaultJobOptions: {
@@ -88,7 +87,6 @@ class QueueManager {
this.automations[QUEUE_NAMES.SESSION_CLEANUP] = new SessionCleanup(this);
this.automations[QUEUE_NAMES.ORPHANED_REPO_CLEANUP] =
new OrphanedRepoCleanup(this);
this.automations[QUEUE_NAMES.ECHO_HELLO] = new EchoHello(this);
console.log("✅ All automation classes initialized");
}
@@ -133,15 +131,150 @@ class QueueManager {
},
);
// Echo Hello Worker
this.workers[QUEUE_NAMES.ECHO_HELLO] = new Worker(
QUEUE_NAMES.ECHO_HELLO,
this.automations[QUEUE_NAMES.ECHO_HELLO].process.bind(
this.automations[QUEUE_NAMES.ECHO_HELLO],
),
// Agent Commands Worker
this.workers[QUEUE_NAMES.AGENT_COMMANDS] = new Worker(
QUEUE_NAMES.AGENT_COMMANDS,
async (job) => {
const { api_id, type, update_interval } = job.data || {};
console.log("[agent-commands] processing job", job.id, api_id, type);
// Log job attempt to history - use job.id as the unique identifier
const attemptNumber = job.attemptsMade || 1;
const historyId = job.id; // Single row per job, updated with each attempt
try {
if (!api_id || !type) {
throw new Error("invalid job data");
}
// Find host by api_id
const host = await prisma.hosts.findUnique({
where: { api_id },
select: { id: true },
});
// Ensure agent is connected; if not, retry later
if (!agentWs.isConnected(api_id)) {
const error = new Error("agent not connected");
// Log failed attempt
await prisma.job_history.upsert({
where: { id: historyId },
create: {
id: historyId,
job_id: job.id,
queue_name: QUEUE_NAMES.AGENT_COMMANDS,
job_name: type,
host_id: host?.id,
api_id,
status: "failed",
attempt_number: attemptNumber,
error_message: error.message,
created_at: new Date(),
updated_at: new Date(),
},
update: {
status: "failed",
attempt_number: attemptNumber,
error_message: error.message,
updated_at: new Date(),
},
});
console.log(
"[agent-commands] agent not connected, will retry",
api_id,
);
throw error;
}
// Process the command
let result;
if (type === "settings_update") {
agentWs.pushSettingsUpdate(api_id, update_interval);
console.log(
"[agent-commands] delivered settings_update",
api_id,
update_interval,
);
result = { delivered: true, update_interval };
} else if (type === "report_now") {
agentWs.pushReportNow(api_id);
console.log("[agent-commands] delivered report_now", api_id);
result = { delivered: true };
} else {
throw new Error("unsupported agent command");
}
// Log successful completion
await prisma.job_history.upsert({
where: { id: historyId },
create: {
id: historyId,
job_id: job.id,
queue_name: QUEUE_NAMES.AGENT_COMMANDS,
job_name: type,
host_id: host?.id,
api_id,
status: "completed",
attempt_number: attemptNumber,
output: result,
created_at: new Date(),
updated_at: new Date(),
completed_at: new Date(),
},
update: {
status: "completed",
attempt_number: attemptNumber,
output: result,
error_message: null,
updated_at: new Date(),
completed_at: new Date(),
},
});
return result;
} catch (error) {
// Log error to history (if not already logged above)
if (error.message !== "agent not connected") {
const host = await prisma.hosts
.findUnique({
where: { api_id },
select: { id: true },
})
.catch(() => null);
await prisma.job_history
.upsert({
where: { id: historyId },
create: {
id: historyId,
job_id: job.id,
queue_name: QUEUE_NAMES.AGENT_COMMANDS,
job_name: type || "unknown",
host_id: host?.id,
api_id,
status: "failed",
attempt_number: attemptNumber,
error_message: error.message,
created_at: new Date(),
updated_at: new Date(),
},
update: {
status: "failed",
attempt_number: attemptNumber,
error_message: error.message,
updated_at: new Date(),
},
})
.catch((err) =>
console.error("[agent-commands] failed to log error:", err),
);
}
throw error;
}
},
{
connection: redisConnection,
concurrency: 1,
concurrency: 10,
},
);
@@ -184,7 +317,6 @@ class QueueManager {
await this.automations[QUEUE_NAMES.GITHUB_UPDATE_CHECK].schedule();
await this.automations[QUEUE_NAMES.SESSION_CLEANUP].schedule();
await this.automations[QUEUE_NAMES.ORPHANED_REPO_CLEANUP].schedule();
await this.automations[QUEUE_NAMES.ECHO_HELLO].schedule();
}
/**
@@ -202,10 +334,6 @@ class QueueManager {
return this.automations[QUEUE_NAMES.ORPHANED_REPO_CLEANUP].triggerManual();
}
async triggerEchoHello(message = "Hello from BullMQ!") {
return this.automations[QUEUE_NAMES.ECHO_HELLO].triggerManual(message);
}
/**
* Get queue statistics
*/
@@ -262,6 +390,73 @@ class QueueManager {
.slice(0, limit);
}
/**
* Get jobs for a specific host (by API ID)
*/
async getHostJobs(apiId, limit = 20) {
const queue = this.queues[QUEUE_NAMES.AGENT_COMMANDS];
if (!queue) {
throw new Error(`Queue ${QUEUE_NAMES.AGENT_COMMANDS} not found`);
}
console.log(`[getHostJobs] Looking for jobs with api_id: ${apiId}`);
// Get active queue status (waiting, active, delayed, failed)
const [waiting, active, delayed, failed] = await Promise.all([
queue.getWaiting(),
queue.getActive(),
queue.getDelayed(),
queue.getFailed(),
]);
// Filter by API ID
const filterByApiId = (jobs) =>
jobs.filter((job) => job.data && job.data.api_id === apiId);
const waitingCount = filterByApiId(waiting).length;
const activeCount = filterByApiId(active).length;
const delayedCount = filterByApiId(delayed).length;
const failedCount = filterByApiId(failed).length;
console.log(
`[getHostJobs] Queue status - Waiting: ${waitingCount}, Active: ${activeCount}, Delayed: ${delayedCount}, Failed: ${failedCount}`,
);
// Get job history from database (shows all attempts and status changes)
const jobHistory = await prisma.job_history.findMany({
where: {
api_id: apiId,
},
orderBy: {
created_at: "desc",
},
take: limit,
});
console.log(
`[getHostJobs] Found ${jobHistory.length} job history records for api_id: ${apiId}`,
);
return {
waiting: waitingCount,
active: activeCount,
delayed: delayedCount,
failed: failedCount,
jobHistory: jobHistory.map((job) => ({
id: job.id,
job_id: job.job_id,
job_name: job.job_name,
status: job.status,
attempt_number: job.attempt_number,
error_message: job.error_message,
output: job.output,
created_at: job.created_at,
updated_at: job.updated_at,
completed_at: job.completed_at,
})),
};
}
/**
* Graceful shutdown
*/
@@ -269,8 +464,24 @@ class QueueManager {
console.log("🛑 Shutting down queue manager...");
for (const queueName of Object.keys(this.queues)) {
await this.queues[queueName].close();
await this.workers[queueName].close();
try {
await this.queues[queueName].close();
} catch (e) {
console.warn(
`⚠️ Failed to close queue '${queueName}':`,
e?.message || e,
);
}
if (this.workers?.[queueName]) {
try {
await this.workers[queueName].close();
} catch (e) {
console.warn(
`⚠️ Failed to close worker for '${queueName}':`,
e?.message || e,
);
}
}
}
await redis.quit();

View File

@@ -13,7 +13,7 @@ class OrphanedRepoCleanup {
/**
* Process orphaned repository cleanup job
*/
async process(job) {
async process(_job) {
const startTime = Date.now();
console.log("🧹 Starting orphaned repository cleanup...");

View File

@@ -1,5 +1,4 @@
const { prisma } = require("./shared/prisma");
const { cleanup_expired_sessions } = require("../../utils/session_manager");
/**
* Session Cleanup Automation
@@ -14,7 +13,7 @@ class SessionCleanup {
/**
* Process session cleanup job
*/
async process(job) {
async process(_job) {
const startTime = Date.now();
console.log("🧹 Starting session cleanup...");

View File

@@ -3,9 +3,9 @@ const IORedis = require("ioredis");
// Redis connection configuration
const redisConnection = {
host: process.env.REDIS_HOST || "localhost",
port: parseInt(process.env.REDIS_PORT) || 6379,
port: parseInt(process.env.REDIS_PORT, 10) || 6379,
password: process.env.REDIS_PASSWORD || undefined,
db: parseInt(process.env.REDIS_DB) || 0,
db: parseInt(process.env.REDIS_DB, 10) || 0,
retryDelayOnFailover: 100,
maxRetriesPerRequest: null, // BullMQ requires this to be null
};