mirror of
https://github.com/9technologygroup/patchmon.net.git
synced 2025-11-13 02:17:05 +00:00
Fixed repo count issue
Refactored code to remove duplicate backend api endpoints for counting Improved connection persistence issues Improved database connection pooling issues Fixed redis connection efficiency Changed version to 1.3.0 Fixed GO binary detection based on package manager rather than OS
This commit is contained in:
@@ -52,7 +52,7 @@ class GitHubUpdateCheck {
|
||||
}
|
||||
|
||||
// Read version from package.json
|
||||
let currentVersion = "1.2.7"; // fallback
|
||||
let currentVersion = "1.3.0"; // fallback
|
||||
try {
|
||||
const packageJson = require("../../../package.json");
|
||||
if (packageJson?.version) {
|
||||
|
||||
@@ -99,16 +99,27 @@ class QueueManager {
|
||||
* Initialize all workers
|
||||
*/
|
||||
async initializeWorkers() {
|
||||
// Optimized worker options to reduce Redis connections
|
||||
const workerOptions = {
|
||||
connection: redisConnection,
|
||||
concurrency: 1, // Keep concurrency low to reduce connections
|
||||
// Connection optimization
|
||||
maxStalledCount: 1,
|
||||
stalledInterval: 30000,
|
||||
// Reduce connection churn
|
||||
settings: {
|
||||
stalledInterval: 30000,
|
||||
maxStalledCount: 1,
|
||||
},
|
||||
};
|
||||
|
||||
// GitHub Update Check Worker
|
||||
this.workers[QUEUE_NAMES.GITHUB_UPDATE_CHECK] = new Worker(
|
||||
QUEUE_NAMES.GITHUB_UPDATE_CHECK,
|
||||
this.automations[QUEUE_NAMES.GITHUB_UPDATE_CHECK].process.bind(
|
||||
this.automations[QUEUE_NAMES.GITHUB_UPDATE_CHECK],
|
||||
),
|
||||
{
|
||||
connection: redisConnection,
|
||||
concurrency: 1,
|
||||
},
|
||||
workerOptions,
|
||||
);
|
||||
|
||||
// Session Cleanup Worker
|
||||
@@ -117,10 +128,7 @@ class QueueManager {
|
||||
this.automations[QUEUE_NAMES.SESSION_CLEANUP].process.bind(
|
||||
this.automations[QUEUE_NAMES.SESSION_CLEANUP],
|
||||
),
|
||||
{
|
||||
connection: redisConnection,
|
||||
concurrency: 1,
|
||||
},
|
||||
workerOptions,
|
||||
);
|
||||
|
||||
// Orphaned Repo Cleanup Worker
|
||||
@@ -129,10 +137,7 @@ class QueueManager {
|
||||
this.automations[QUEUE_NAMES.ORPHANED_REPO_CLEANUP].process.bind(
|
||||
this.automations[QUEUE_NAMES.ORPHANED_REPO_CLEANUP],
|
||||
),
|
||||
{
|
||||
connection: redisConnection,
|
||||
concurrency: 1,
|
||||
},
|
||||
workerOptions,
|
||||
);
|
||||
|
||||
// Orphaned Package Cleanup Worker
|
||||
@@ -141,167 +146,33 @@ class QueueManager {
|
||||
this.automations[QUEUE_NAMES.ORPHANED_PACKAGE_CLEANUP].process.bind(
|
||||
this.automations[QUEUE_NAMES.ORPHANED_PACKAGE_CLEANUP],
|
||||
),
|
||||
{
|
||||
connection: redisConnection,
|
||||
concurrency: 1,
|
||||
},
|
||||
workerOptions,
|
||||
);
|
||||
|
||||
// Agent Commands Worker
|
||||
this.workers[QUEUE_NAMES.AGENT_COMMANDS] = new Worker(
|
||||
QUEUE_NAMES.AGENT_COMMANDS,
|
||||
async (job) => {
|
||||
const { api_id, type, update_interval } = job.data || {};
|
||||
console.log("[agent-commands] processing job", job.id, api_id, type);
|
||||
const { api_id, type } = job.data;
|
||||
console.log(`Processing agent command: ${type} for ${api_id}`);
|
||||
|
||||
// Log job attempt to history - use job.id as the unique identifier
|
||||
const attemptNumber = job.attemptsMade || 1;
|
||||
const historyId = job.id; // Single row per job, updated with each attempt
|
||||
|
||||
try {
|
||||
if (!api_id || !type) {
|
||||
throw new Error("invalid job data");
|
||||
}
|
||||
|
||||
// Find host by api_id
|
||||
const host = await prisma.hosts.findUnique({
|
||||
where: { api_id },
|
||||
select: { id: true },
|
||||
});
|
||||
|
||||
// Ensure agent is connected; if not, retry later
|
||||
if (!agentWs.isConnected(api_id)) {
|
||||
const error = new Error("agent not connected");
|
||||
// Log failed attempt
|
||||
await prisma.job_history.upsert({
|
||||
where: { id: historyId },
|
||||
create: {
|
||||
id: historyId,
|
||||
job_id: job.id,
|
||||
queue_name: QUEUE_NAMES.AGENT_COMMANDS,
|
||||
job_name: type,
|
||||
host_id: host?.id,
|
||||
api_id,
|
||||
status: "failed",
|
||||
attempt_number: attemptNumber,
|
||||
error_message: error.message,
|
||||
created_at: new Date(),
|
||||
updated_at: new Date(),
|
||||
},
|
||||
update: {
|
||||
status: "failed",
|
||||
attempt_number: attemptNumber,
|
||||
error_message: error.message,
|
||||
updated_at: new Date(),
|
||||
},
|
||||
});
|
||||
console.log(
|
||||
"[agent-commands] agent not connected, will retry",
|
||||
api_id,
|
||||
);
|
||||
throw error;
|
||||
}
|
||||
|
||||
// Process the command
|
||||
let result;
|
||||
if (type === "settings_update") {
|
||||
agentWs.pushSettingsUpdate(api_id, update_interval);
|
||||
console.log(
|
||||
"[agent-commands] delivered settings_update",
|
||||
api_id,
|
||||
update_interval,
|
||||
);
|
||||
result = { delivered: true, update_interval };
|
||||
} else if (type === "report_now") {
|
||||
agentWs.pushReportNow(api_id);
|
||||
console.log("[agent-commands] delivered report_now", api_id);
|
||||
result = { delivered: true };
|
||||
} else {
|
||||
throw new Error("unsupported agent command");
|
||||
}
|
||||
|
||||
// Log successful completion
|
||||
await prisma.job_history.upsert({
|
||||
where: { id: historyId },
|
||||
create: {
|
||||
id: historyId,
|
||||
job_id: job.id,
|
||||
queue_name: QUEUE_NAMES.AGENT_COMMANDS,
|
||||
job_name: type,
|
||||
host_id: host?.id,
|
||||
api_id,
|
||||
status: "completed",
|
||||
attempt_number: attemptNumber,
|
||||
output: result,
|
||||
created_at: new Date(),
|
||||
updated_at: new Date(),
|
||||
completed_at: new Date(),
|
||||
},
|
||||
update: {
|
||||
status: "completed",
|
||||
attempt_number: attemptNumber,
|
||||
output: result,
|
||||
error_message: null,
|
||||
updated_at: new Date(),
|
||||
completed_at: new Date(),
|
||||
},
|
||||
});
|
||||
|
||||
return result;
|
||||
} catch (error) {
|
||||
// Log error to history (if not already logged above)
|
||||
if (error.message !== "agent not connected") {
|
||||
const host = await prisma.hosts
|
||||
.findUnique({
|
||||
where: { api_id },
|
||||
select: { id: true },
|
||||
})
|
||||
.catch(() => null);
|
||||
|
||||
await prisma.job_history
|
||||
.upsert({
|
||||
where: { id: historyId },
|
||||
create: {
|
||||
id: historyId,
|
||||
job_id: job.id,
|
||||
queue_name: QUEUE_NAMES.AGENT_COMMANDS,
|
||||
job_name: type || "unknown",
|
||||
host_id: host?.id,
|
||||
api_id,
|
||||
status: "failed",
|
||||
attempt_number: attemptNumber,
|
||||
error_message: error.message,
|
||||
created_at: new Date(),
|
||||
updated_at: new Date(),
|
||||
},
|
||||
update: {
|
||||
status: "failed",
|
||||
attempt_number: attemptNumber,
|
||||
error_message: error.message,
|
||||
updated_at: new Date(),
|
||||
},
|
||||
})
|
||||
.catch((err) =>
|
||||
console.error("[agent-commands] failed to log error:", err),
|
||||
);
|
||||
}
|
||||
throw error;
|
||||
// Send command via WebSocket based on type
|
||||
if (type === "report_now") {
|
||||
agentWs.pushReportNow(api_id);
|
||||
} else if (type === "settings_update") {
|
||||
// For settings update, we need additional data
|
||||
const { update_interval } = job.data;
|
||||
agentWs.pushSettingsUpdate(api_id, update_interval);
|
||||
} else {
|
||||
console.error(`Unknown agent command type: ${type}`);
|
||||
}
|
||||
},
|
||||
{
|
||||
connection: redisConnection,
|
||||
concurrency: 10,
|
||||
},
|
||||
workerOptions,
|
||||
);
|
||||
|
||||
// Add error handling for all workers
|
||||
Object.values(this.workers).forEach((worker) => {
|
||||
worker.on("error", (error) => {
|
||||
console.error("Worker error:", error);
|
||||
});
|
||||
});
|
||||
|
||||
console.log("✅ All workers initialized");
|
||||
console.log(
|
||||
"✅ All workers initialized with optimized connection settings",
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
const { PrismaClient } = require("@prisma/client");
|
||||
const { getPrismaClient } = require("../../../config/prisma");
|
||||
|
||||
const prisma = new PrismaClient();
|
||||
const prisma = getPrismaClient();
|
||||
|
||||
module.exports = { prisma };
|
||||
|
||||
@@ -1,17 +1,56 @@
|
||||
const IORedis = require("ioredis");
|
||||
|
||||
// Redis connection configuration
|
||||
// Redis connection configuration with connection pooling
|
||||
const redisConnection = {
|
||||
host: process.env.REDIS_HOST || "localhost",
|
||||
port: parseInt(process.env.REDIS_PORT, 10) || 6379,
|
||||
password: process.env.REDIS_PASSWORD || undefined,
|
||||
username: process.env.REDIS_USER || undefined,
|
||||
db: parseInt(process.env.REDIS_DB, 10) || 0,
|
||||
// Connection pooling settings
|
||||
lazyConnect: true,
|
||||
keepAlive: 30000,
|
||||
connectTimeout: 30000, // Increased from 10s to 30s
|
||||
commandTimeout: 30000, // Increased from 5s to 30s
|
||||
enableReadyCheck: false,
|
||||
// Reduce connection churn
|
||||
family: 4, // Force IPv4
|
||||
// Retry settings
|
||||
retryDelayOnClusterDown: 300,
|
||||
retryDelayOnFailover: 100,
|
||||
maxRetriesPerRequest: null, // BullMQ requires this to be null
|
||||
// Connection pool settings
|
||||
maxLoadingTimeout: 30000,
|
||||
};
|
||||
|
||||
// Create Redis connection
|
||||
const redis = new IORedis(redisConnection);
|
||||
// Create Redis connection with singleton pattern
|
||||
let redisInstance = null;
|
||||
|
||||
module.exports = { redis, redisConnection };
|
||||
function getRedisConnection() {
|
||||
if (!redisInstance) {
|
||||
redisInstance = new IORedis(redisConnection);
|
||||
|
||||
// Handle graceful shutdown
|
||||
process.on("beforeExit", async () => {
|
||||
await redisInstance.quit();
|
||||
});
|
||||
|
||||
process.on("SIGINT", async () => {
|
||||
await redisInstance.quit();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
process.on("SIGTERM", async () => {
|
||||
await redisInstance.quit();
|
||||
process.exit(0);
|
||||
});
|
||||
}
|
||||
|
||||
return redisInstance;
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
redis: getRedisConnection(),
|
||||
redisConnection,
|
||||
getRedisConnection,
|
||||
};
|
||||
|
||||
@@ -33,7 +33,7 @@ async function checkPublicRepo(owner, repo) {
|
||||
try {
|
||||
const httpsRepoUrl = `https://api.github.com/repos/${owner}/${repo}/releases/latest`;
|
||||
|
||||
let currentVersion = "1.2.7"; // fallback
|
||||
let currentVersion = "1.3.0"; // fallback
|
||||
try {
|
||||
const packageJson = require("../../../package.json");
|
||||
if (packageJson?.version) {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
const { PrismaClient } = require("@prisma/client");
|
||||
const { getPrismaClient } = require("../config/prisma");
|
||||
const { v4: uuidv4 } = require("uuid");
|
||||
|
||||
const prisma = new PrismaClient();
|
||||
const prisma = getPrismaClient();
|
||||
|
||||
// Cached settings instance
|
||||
let cachedSettings = null;
|
||||
|
||||
Reference in New Issue
Block a user