feat: 增加Ping任务管理功能

This commit is contained in:
Akizon77
2025-06-21 16:47:39 +08:00
parent 785f448baf
commit d6b58f6eed
10 changed files with 400 additions and 21 deletions

View File

@@ -1,7 +1,77 @@
package admin
import "github.com/gin-gonic/gin"
import (
"net/http"
func AddPingTast(c *gin.Context) {
"github.com/gin-gonic/gin"
"github.com/komari-monitor/komari/api"
"github.com/komari-monitor/komari/database/tasks"
)
// POST body: clients []string, target, task_type string, interval int
func AddPingTask(c *gin.Context) {
var req struct {
Clients []string `json:"clients" binding:"required"`
Target string `json:"target" binding:"required"`
TaskType string `json:"type" binding:"required"` // icmp, tcp, http
Interval int `json:"interval" binding:"required"` // 间隔时间,单位秒
}
if err := c.ShouldBindJSON(&req); err != nil {
api.RespondError(c, http.StatusBadRequest, "Invalid request data")
return
}
if taskID, err := tasks.AddPingTask(req.Clients, req.Target, req.TaskType, req.Interval); err != nil {
api.RespondError(c, http.StatusInternalServerError, err.Error())
} else {
api.RespondSuccess(c, gin.H{"task_id": taskID})
}
}
// POST body: id []uint
func DeletePingTask(c *gin.Context) {
var req struct {
ID []uint `json:"id" binding:"required"`
}
if err := c.ShouldBindJSON(&req); err != nil {
api.RespondError(c, http.StatusBadRequest, "Invalid request data")
return
}
if err := tasks.DeletePingTask(req.ID); err != nil {
api.RespondError(c, http.StatusInternalServerError, err.Error())
} else {
api.RespondSuccess(c, nil)
}
}
// POST body: id []uint, updates map[string]interface{}
func EditPingTask(c *gin.Context) {
var req struct {
ID []uint `json:"id" binding:"required"`
Updates map[string]interface{} `json:"updates" binding:"required"`
}
if err := c.ShouldBindJSON(&req); err != nil {
api.RespondError(c, http.StatusBadRequest, "Invalid request data")
return
}
if err := tasks.EditPingTask(req.ID, req.Updates); err != nil {
api.RespondError(c, http.StatusInternalServerError, err.Error())
} else {
tasks.DeletePingRecords(req.ID) // 重置记录
api.RespondSuccess(c, nil)
}
}
func GetAllPingTasks(c *gin.Context) {
tasks, err := tasks.GetAllPingTasks()
if err != nil {
api.RespondError(c, http.StatusInternalServerError, err.Error())
return
}
api.RespondSuccess(c, tasks)
}

View File

@@ -15,6 +15,8 @@ import (
"github.com/komari-monitor/komari/api"
"github.com/komari-monitor/komari/common"
"github.com/komari-monitor/komari/database/clients"
"github.com/komari-monitor/komari/database/models"
"github.com/komari-monitor/komari/database/tasks"
"github.com/komari-monitor/komari/utils/notification"
"github.com/komari-monitor/komari/ws"
)
@@ -66,11 +68,12 @@ func WebSocketReport(c *gin.Context) {
},
}
// Upgrade the HTTP connection to a WebSocket connection
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
unsafeConn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"status": "error", "error": "Failed to upgrade to WebSocket"})
return
}
conn := ws.NewSafeConn(unsafeConn)
defer conn.Close()
_, message, err := conn.ReadMessage()
@@ -148,8 +151,24 @@ func WebSocketReport(c *gin.Context) {
}
ws.SetLatestReport(uuid, &report)
case "ping_result":
conn.WriteJSON(gin.H{"status": "pong"})
// TODO: handle ping result
var reqBody struct {
PingTaskID uint `json:"task_id"`
PingResult int `json:"value"`
PingType string `json:"ping_type"`
FinishedAt time.Time `json:"finished_at"`
}
err = json.Unmarshal(message, &reqBody)
if err != nil {
conn.WriteJSON(gin.H{"status": "error", "error": "Invalid ping result format"})
continue
}
pingResult := models.PingRecord{
Client: uuid,
TaskId: reqBody.PingTaskID,
Value: reqBody.PingResult,
Time: reqBody.FinishedAt,
}
tasks.SavePingRecord(pingResult)
default:
log.Printf("Unknown message type: %s", msgType.Type)
conn.WriteJSON(gin.H{"status": "error", "error": "Unknown message type"})

View File

@@ -7,6 +7,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/komari-monitor/komari/api"
records "github.com/komari-monitor/komari/database/records"
"github.com/komari-monitor/komari/database/tasks"
)
func GetRecordsByUUID(c *gin.Context) {
@@ -36,3 +37,33 @@ func GetRecordsByUUID(c *gin.Context) {
"count": len(records),
})
}
// GET query: uuid string, hours int
func GetPingRecords(c *gin.Context) {
uuid := c.Query("uuid")
hours := c.Query("hours")
if uuid == "" {
api.RespondError(c, 400, "UUID is required")
return
}
if hours == "" {
hours = "4"
}
hoursInt, err := strconv.Atoi(hours)
if err != nil {
api.RespondError(c, 400, "Invalid hours parameter")
return
}
records, err := tasks.GetPingRecordsByClientAndTime(uuid, time.Now().Add(-time.Duration(hoursInt)*time.Hour), time.Now())
if err != nil {
api.RespondError(c, 500, "Failed to fetch records: "+err.Error())
return
}
api.RespondSuccess(c, gin.H{
"records": records,
"count": len(records),
})
}

View File

@@ -15,10 +15,10 @@ import (
"github.com/komari-monitor/komari/api/admin/clipboard"
log_api "github.com/komari-monitor/komari/api/admin/log"
"github.com/komari-monitor/komari/api/admin/notification"
"github.com/komari-monitor/komari/api/admin/record"
"github.com/komari-monitor/komari/api/admin/test"
"github.com/komari-monitor/komari/api/admin/update"
"github.com/komari-monitor/komari/api/client"
"github.com/komari-monitor/komari/api/record"
"github.com/komari-monitor/komari/cmd/flags"
"github.com/komari-monitor/komari/database/accounts"
"github.com/komari-monitor/komari/database/config"
@@ -43,6 +43,7 @@ var ServerCmd = &cobra.Command{
Short: "Start the server",
Long: `Start the server`,
Run: func(cmd *cobra.Command, args []string) {
// #region 初始化
if err := os.MkdirAll("./data", os.ModePerm); err != nil {
log.Fatalf("Failed to create data directory: %v", err)
}
@@ -90,7 +91,7 @@ var ServerCmd = &cobra.Command{
r.Any("/ping", func(c *gin.Context) {
c.String(200, "pong")
})
// #region 公开路由
r.POST("/api/login", api.Login)
r.GET("/api/me", api.GetMe)
r.GET("/api/clients", ws.GetClients)
@@ -103,7 +104,8 @@ var ServerCmd = &cobra.Command{
r.GET("/api/recent/:uuid", api.GetClientRecentRecords)
r.GET("/api/records/load", record.GetRecordsByUUID)
r.GET("/api/records/ping", record.GetRecordsByUUID)
// #region Agent
tokenAuthrized := r.Group("/api/clients", api.TokenAuthMiddleware())
{
tokenAuthrized.GET("/report", client.WebSocketReport) // websocket
@@ -112,7 +114,7 @@ var ServerCmd = &cobra.Command{
tokenAuthrized.GET("/terminal", client.EstablishConnection)
tokenAuthrized.POST("/task/result", client.TaskResult)
}
// #region 管理员
adminAuthrized := r.Group("/api/admin", api.AdminAuthMiddleware())
{
// test
@@ -201,12 +203,21 @@ var ServerCmd = &cobra.Command{
notificationGroup.POST("/offline/disable", notification.DisableOfflineNotification)
}
pingTaskGroup := adminAuthrized.Group("/ping")
{
pingTaskGroup.GET("/", admin.GetAllPingTasks)
pingTaskGroup.POST("/add", admin.AddPingTask)
pingTaskGroup.POST("/delete", admin.DeletePingTask)
pingTaskGroup.POST("/edit", admin.EditPingTask)
}
}
public.Static(r.Group("/"), func(handlers ...gin.HandlerFunc) {
r.NoRoute(handlers...)
})
// 静态文件服务
// #region 静态文件服务
public.UpdateIndex(conf)
config.Subscribe(func(event config.ConfigEvent) {
public.UpdateIndex(event.New)
@@ -265,7 +276,9 @@ func InitDatabase() {
}
}
// #region 定时任务
func DoScheduledWork() {
tasks.ReloadPingSchedule()
ticker := time.NewTicker(time.Minute * 30)
minute := time.NewTicker(60 * time.Second)
//records.DeleteRecordBefore(time.Now().Add(-time.Hour * 24 * 30))
@@ -277,6 +290,7 @@ func DoScheduledWork() {
records.DeleteRecordBefore(time.Now().Add(-time.Hour * time.Duration(cfg.RecordPreserveTime)))
records.CompactRecord()
tasks.ClearTaskResultsByTimeBefore(time.Now().Add(-time.Hour * time.Duration(cfg.RecordPreserveTime)))
tasks.DeletePingRecordsBefore(time.Now().Add(-time.Hour * time.Duration(cfg.RecordPreserveTime)))
logOperation.RemoveOldLogs()
case <-minute.C:
api.SaveClientReportToDB()

View File

@@ -134,6 +134,8 @@ func GetDBInstance() *gorm.DB {
&models.Clipboard{},
&models.LoadNotification{},
&models.OfflineNotification{},
&models.PingRecord{},
&models.PingTask{},
)
if err != nil {
log.Fatalf("Failed to create tables: %v", err)

View File

@@ -3,17 +3,16 @@ package models
import "time"
type PingRecord struct {
Client string `json:"client" gorm:"type:varchar(36);not null;index;unique;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;foreignKey:client;references:UUID"`
Type string `json:"type" gorm:"type:varchar(12);not null;default:'ping'"` // ping tcping http
Time time.Time `json:"time" gorm:"index;not null;unique"`
Target string `json:"target" gorm:"type:varchar(255);not null"`
Client string `json:"client" gorm:"type:varchar(36);not null;index;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;foreignKey:client;references:UUID"`
TaskId uint `json:"task_id" gorm:"not null;index;constraint:OnDelete:CASCADE,OnUpdate:CASCADE;foreignKey:TaskId;references:Id"`
Time time.Time `json:"time" gorm:"index;not null"`
Value int `json:"value" gorm:"type:int;not null"` // Ping 值,单位毫秒
}
type PingTask struct {
ID uint `json:"id,omitempty" gorm:"primaryKey;autoIncrement"`
Id uint `json:"id,omitempty" gorm:"primaryKey;autoIncrement"`
Clients StringArray `json:"clients" gorm:"type:longtext"`
Type string `json:"type" gorm:"type:varchar(12);not null;default:'ping'"` // ping tcping http
Type string `json:"type" gorm:"type:varchar(12);not null;default:'icmp'"` // icmp tcp http
Target string `json:"target" gorm:"type:varchar(255);not null"`
Interval int `json:"interval" gorm:"type:int;not null;default:60"` // 间隔时间
}

View File

@@ -1,5 +1,100 @@
package tasks
func AddPingTask(clients []string, target, task_type string, interval int) {
import (
"time"
"github.com/komari-monitor/komari/database/dbcore"
"github.com/komari-monitor/komari/database/models"
"github.com/komari-monitor/komari/utils"
"gorm.io/gorm"
)
func AddPingTask(clients []string, target, task_type string, interval int) (uint, error) {
db := dbcore.GetDBInstance()
task := models.PingTask{
Clients: clients,
Type: task_type,
Target: target,
Interval: interval,
}
if err := db.Create(&task).Error; err != nil {
return 0, err
}
ReloadPingSchedule()
return task.Id, nil
}
func DeletePingTask(id []uint) error {
db := dbcore.GetDBInstance()
result := db.Where("id IN ?", id).Delete(&models.PingTask{})
if result.RowsAffected == 0 {
return gorm.ErrRecordNotFound
}
ReloadPingSchedule()
return result.Error
}
func EditPingTask(id []uint, updates map[string]interface{}) error {
db := dbcore.GetDBInstance()
result := db.Model(&models.PingTask{}).Where("id IN ?", id).Updates(updates)
if result.RowsAffected == 0 {
return gorm.ErrRecordNotFound
}
ReloadPingSchedule()
return result.Error
}
func GetAllPingTasks() ([]models.PingTask, error) {
db := dbcore.GetDBInstance()
var tasks []models.PingTask
if err := db.Find(&tasks).Error; err != nil {
return nil, err
}
return tasks, nil
}
func SavePingRecord(record models.PingRecord) error {
db := dbcore.GetDBInstance()
return db.Create(&record).Error
}
func GetPingRecords(client string) ([]models.PingRecord, error) {
db := dbcore.GetDBInstance()
var records []models.PingRecord
if err := db.Where("client = ?", client).Order("time DESC").Find(&records).Error; err != nil {
return nil, err
}
return records, nil
}
func DeletePingRecordsBefore(time time.Time) error {
db := dbcore.GetDBInstance()
err := db.Where("time < ?", time).Delete(&models.PingRecord{}).Error
return err
}
func DeletePingRecords(id []uint) error {
db := dbcore.GetDBInstance()
result := db.Where("task_id IN ?", id).Delete(&models.PingRecord{})
if result.RowsAffected == 0 {
return gorm.ErrRecordNotFound
}
return result.Error
}
func ReloadPingSchedule() error {
db := dbcore.GetDBInstance()
var pingTasks []models.PingTask
if err := db.Find(&pingTasks).Error; err != nil {
return err
}
return utils.ReloadPingSchedule(pingTasks)
}
func GetPingRecordsByClientAndTime(uuid string, start, end time.Time) ([]models.PingRecord, error) {
db := dbcore.GetDBInstance()
var records []models.PingRecord
if err := db.Where("client = ? AND time >= ? AND time <= ?", uuid, start, end).Order("time DESC").Find(&records).Error; err != nil {
return nil, err
}
return records, nil
}

94
utils/pingSchedule.go Normal file
View File

@@ -0,0 +1,94 @@
package utils
import (
"sync"
"time"
"github.com/komari-monitor/komari/database/models"
"github.com/komari-monitor/komari/ws"
)
// PingTaskManager 管理定时器和任务
type PingTaskManager struct {
mu sync.Mutex
tickers map[int]*time.Ticker
tasks map[int][]models.PingTask
stopChan chan struct{}
}
var manager = &PingTaskManager{
tickers: make(map[int]*time.Ticker),
tasks: make(map[int][]models.PingTask),
stopChan: make(chan struct{}),
}
// Reload 重载时间表
func (m *PingTaskManager) Reload(pingTasks []models.PingTask) error {
m.mu.Lock()
defer m.mu.Unlock()
// 停止所有现有定时器
for _, ticker := range m.tickers {
ticker.Stop()
}
m.tickers = make(map[int]*time.Ticker)
m.tasks = make(map[int][]models.PingTask)
// 按Interval分组任务
taskGroups := make(map[int][]models.PingTask)
for _, task := range pingTasks {
taskGroups[task.Interval] = append(taskGroups[task.Interval], task)
}
// 为每个唯一的Interval创建定时器
for interval, tasks := range taskGroups {
ticker := time.NewTicker(time.Duration(interval) * time.Second)
m.tickers[interval] = ticker
m.tasks[interval] = tasks
go func(ticker *time.Ticker, tasks []models.PingTask) {
for {
select {
case <-ticker.C:
for _, task := range tasks {
go executePingTask(task)
}
case <-m.stopChan:
return
}
}
}(ticker, tasks)
}
return nil
}
// executePingTask 执行单个PingTask
func executePingTask(task models.PingTask) {
clients := ws.GetConnectedClients()
var message struct {
TaskID uint `json:"ping_task_id"`
Message string `json:"message"`
Type string `json:"ping_type"`
Target string `json:"ping_target"`
}
for _, clientUUID := range task.Clients {
if conn, exists := clients[clientUUID]; exists {
if conn == nil {
continue
}
message.Message = "ping"
message.TaskID = task.Id
message.Type = task.Type
message.Target = task.Target
if err := conn.WriteJSON(message); err != nil {
continue
}
}
}
}
// ReloadPingSchedule 加载或重载时间表
func ReloadPingSchedule(pingTasks []models.PingTask) error {
return manager.Reload(pingTasks)
}

View File

@@ -8,23 +8,23 @@ import (
)
var (
connectedClients = make(map[string]*websocket.Conn)
connectedClients = make(map[string]*SafeConn)
ConnectedUsers = []*websocket.Conn{}
latestReport = make(map[string]*common.Report)
mu = sync.RWMutex{}
)
func GetConnectedClients() map[string]*websocket.Conn {
func GetConnectedClients() map[string]*SafeConn {
mu.RLock()
defer mu.RUnlock()
clientsCopy := make(map[string]*websocket.Conn)
clientsCopy := make(map[string]*SafeConn)
for k, v := range connectedClients {
clientsCopy[k] = v
}
return clientsCopy
}
func SetConnectedClients(uuid string, conn *websocket.Conn) {
func SetConnectedClients(uuid string, conn *SafeConn) {
mu.Lock()
defer mu.Unlock()
connectedClients[uuid] = conn
@@ -32,6 +32,9 @@ func SetConnectedClients(uuid string, conn *websocket.Conn) {
func DeleteConnectedClients(uuid string) {
mu.Lock()
defer mu.Unlock()
if conn, exists := connectedClients[uuid]; exists {
conn.Close()
}
delete(connectedClients, uuid)
}
func GetLatestReport() map[string]*common.Report {

52
ws/safeConn.go Normal file
View File

@@ -0,0 +1,52 @@
package ws
import (
"sync"
"time"
"github.com/gorilla/websocket"
)
type SafeConn struct {
conn *websocket.Conn
mu sync.Mutex
}
func NewSafeConn(conn *websocket.Conn) *SafeConn {
return &SafeConn{
conn: conn,
mu: sync.Mutex{},
}
}
func (sc *SafeConn) WriteMessage(messageType int, data []byte) error {
sc.mu.Lock()
defer sc.mu.Unlock()
return sc.conn.WriteMessage(messageType, data)
}
func (sc *SafeConn) WriteJSON(v interface{}) error {
sc.mu.Lock()
defer sc.mu.Unlock()
return sc.conn.WriteJSON(v)
}
func (sc *SafeConn) Close() error {
sc.mu.Lock()
defer sc.mu.Unlock()
return sc.conn.Close()
}
func (sc *SafeConn) ReadMessage() (int, []byte, error) {
return sc.conn.ReadMessage()
}
func (sc *SafeConn) ReadJSON(v interface{}) error {
return sc.conn.ReadJSON(v)
}
func (sc *SafeConn) SetReadDeadline(t time.Time) error {
return sc.conn.SetReadDeadline(t)
}
func (sc *SafeConn) GetConn() *websocket.Conn {
sc.mu.Lock()
defer sc.mu.Unlock()
return sc.conn
}