mirror of
https://github.com/komari-monitor/komari.git
synced 2025-10-23 03:31:56 +00:00
114 lines
2.6 KiB
Go
114 lines
2.6 KiB
Go
package utils
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/komari-monitor/komari/database/models"
|
|
"github.com/komari-monitor/komari/ws"
|
|
)
|
|
|
|
// PingTaskManager 管理定时器和任务
|
|
type PingTaskManager struct {
|
|
mu sync.Mutex
|
|
cancelFunc context.CancelFunc
|
|
tasks map[int][]models.PingTask
|
|
}
|
|
|
|
var manager = &PingTaskManager{
|
|
tasks: make(map[int][]models.PingTask),
|
|
}
|
|
|
|
// Reload 重载时间表
|
|
func (m *PingTaskManager) Reload(pingTasks []models.PingTask) error {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
if m.cancelFunc != nil {
|
|
m.cancelFunc()
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
m.cancelFunc = cancel
|
|
|
|
m.tasks = make(map[int][]models.PingTask)
|
|
|
|
// 按Interval分组任务
|
|
taskGroups := make(map[int][]models.PingTask)
|
|
for _, task := range pingTasks {
|
|
if task.Interval <= 0 {
|
|
continue
|
|
}
|
|
taskGroups[task.Interval] = append(taskGroups[task.Interval], task)
|
|
}
|
|
|
|
// 为每个唯一的Interval创建协程
|
|
for interval, tasks := range taskGroups {
|
|
m.tasks[interval] = tasks
|
|
go m.runPreciseLoop(ctx, time.Duration(interval)*time.Second, tasks)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *PingTaskManager) runPreciseLoop(ctx context.Context, interval time.Duration, tasks []models.PingTask) {
|
|
// Start the first timer.
|
|
timer := time.NewTimer(interval)
|
|
|
|
// This will be the reference point for all future ticks.
|
|
// By adding the interval to this time, we avoid accumulating execution delays.
|
|
nextTick := time.Now().Add(interval)
|
|
defer timer.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-timer.C:
|
|
onlineClients := ws.GetConnectedClients()
|
|
for _, task := range tasks {
|
|
go executePingTask(ctx, task, onlineClients)
|
|
}
|
|
|
|
nextTick = nextTick.Add(interval)
|
|
timer.Reset(time.Until(nextTick))
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// executePingTask 执行单个PingTask
|
|
func executePingTask(ctx context.Context, task models.PingTask, onlineClients map[string]*ws.SafeConn) {
|
|
var message struct {
|
|
TaskID uint `json:"ping_task_id"`
|
|
Message string `json:"message"`
|
|
Type string `json:"ping_type"`
|
|
Target string `json:"ping_target"`
|
|
}
|
|
|
|
message.Message = "ping"
|
|
message.TaskID = task.Id
|
|
message.Type = task.Type
|
|
message.Target = task.Target
|
|
|
|
for _, clientUUID := range task.Clients {
|
|
select {
|
|
case <-ctx.Done():
|
|
// Context was canceled, stop sending pings.
|
|
return
|
|
default:
|
|
// Context is still active, continue.
|
|
}
|
|
|
|
if conn, exists := onlineClients[clientUUID]; exists && conn != nil {
|
|
if err := conn.WriteJSON(message); err != nil {
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ReloadPingSchedule 加载或重载时间表
|
|
func ReloadPingSchedule(pingTasks []models.PingTask) error {
|
|
return manager.Reload(pingTasks)
|
|
}
|