新增了GPU监控数据接收存储

接收 agent 发送的 GPU 监控数据。
参考 Record 实现,新增 GPURecord 表,用于存储 GPU 监控数据及其压缩后的长期数据。
This commit is contained in:
kdwycz
2025-09-15 20:23:16 +08:00
parent 354903b97b
commit 0f7cd5f438
9 changed files with 482 additions and 18 deletions

3
.gitignore vendored
View File

@@ -30,3 +30,6 @@ go.work.sum
# env file
.env
# build
komari

View File

@@ -37,6 +37,7 @@ var TerminalSessions = make(map[string]*TerminalSession)
func SaveClientReportToDB() error {
lastMinute := time.Now().Add(-time.Minute).Unix()
var records []models.Record
var gpuRecords []models.GPURecord
// 遍历所有客户端记录
for uuid, x := range Records.Items() {
@@ -65,10 +66,16 @@ func SaveClientReportToDB() error {
if len(filtered) > 0 {
r := utils.AverageReport(uuid, time.Now(), filtered, 0.3)
records = append(records, r)
// 使用与其他数据相同的聚合逻辑处理GPU数据
gpuAggregated := utils.AverageGPUReports(uuid, time.Now(), filtered, 0.3)
gpuRecords = append(gpuRecords, gpuAggregated...)
}
}
// 批量插入数据库前去重client与time共同构成唯一键
db := dbcore.GetDBInstance()
if len(records) > 0 {
unique := make(map[string]models.Record)
for _, rec := range records {
@@ -79,13 +86,30 @@ func SaveClientReportToDB() error {
for _, rec := range unique {
deduped = append(deduped, rec)
}
db := dbcore.GetDBInstance()
if err := db.Model(&models.Record{}).Create(&deduped).Error; err != nil {
log.Printf("Failed to save records to database: %v", err)
return err
}
}
// 批量插入GPU记录
if len(gpuRecords) > 0 {
// GPU记录也需要去重防止重复插入
gpuUnique := make(map[string]models.GPURecord)
for _, rec := range gpuRecords {
key := rec.Client + "_" + strconv.Itoa(rec.DeviceIndex) + "_" + strconv.FormatInt(rec.Time.ToTime().Unix(), 10)
gpuUnique[key] = rec
}
var gpuDeduped []models.GPURecord
for _, rec := range gpuUnique {
gpuDeduped = append(gpuDeduped, rec)
}
if err := db.Model(&models.GPURecord{}).Create(&gpuDeduped).Error; err != nil {
log.Printf("Failed to save GPU records to database: %v", err)
return err
}
}
return nil
}

View File

@@ -59,37 +59,73 @@ func GetRecordsByUUID(c *gin.Context) {
// 验证 load_type 参数
validLoadTypes := map[string]bool{
"cpu": true, "gpu": true, "ram": true, "swap": true,
"cpu": true, "ram": true, "swap": true,
"load": true, "temp": true, "disk": true, "network": true,
"process": true, "connections": true, "all": true, "": true,
}
if !validLoadTypes[loadType] {
api.RespondError(c, 400, "Invalid load_type parameter")
return
}
records, err := records.GetRecordsByClientAndTime(uuid, time.Now().Add(-time.Duration(hoursInt)*time.Hour), time.Now())
clientRecords, err := records.GetRecordsByClientAndTime(uuid, time.Now().Add(-time.Duration(hoursInt)*time.Hour), time.Now())
if err != nil {
api.RespondError(c, 500, "Failed to fetch records: "+err.Error())
return
}
// 根据 load_type 过滤返回的数据
// 准备基本响应
response := gin.H{
"records": clientRecords,
"count": len(clientRecords),
}
// 如果有load_type过滤应用过滤
if loadType != "" && loadType != "all" {
filteredRecords := filterRecordsByLoadType(records, loadType)
api.RespondSuccess(c, gin.H{
filteredRecords := filterRecordsByLoadType(clientRecords, loadType)
response = gin.H{
"records": filteredRecords,
"count": len(filteredRecords),
"load_type": loadType,
})
} else {
// 返回所有数据(向后兼容)
api.RespondSuccess(c, gin.H{
"records": records,
"count": len(records),
})
}
}
// 自动检测是否有GPU数据并附加到响应中
if loadType == "" || loadType == "all" || loadType == "gpu" {
gpuRecords, err := records.GetGPURecordsByClientAndTime(uuid, time.Now().Add(-time.Duration(hoursInt)*time.Hour), time.Now())
if err == nil && len(gpuRecords) > 0 {
// 按设备索引分组数据,构建简化的设备结构
gpuDevices := make(map[string]interface{})
for _, record := range gpuRecords {
deviceKey := strconv.Itoa(record.DeviceIndex)
// 如果设备还没有初始化,创建设备信息
if gpuDevices[deviceKey] == nil {
gpuDevices[deviceKey] = gin.H{
"device_index": record.DeviceIndex,
"device_name": record.DeviceName,
"records": []models.GPURecord{},
}
}
// 添加记录到设备
device := gpuDevices[deviceKey].(gin.H)
records := device["records"].([]models.GPURecord)
device["records"] = append(records, record)
gpuDevices[deviceKey] = device
}
// 添加优化后的GPU数据结构到响应
response["gpu_devices"] = gpuDevices
response["has_gpu_data"] = true
} else {
response["has_gpu_data"] = false
}
}
api.RespondSuccess(c, response)
}
// filterRecordsByLoadType 根据 load_type 过滤记录,只返回相关字段

View File

@@ -69,6 +69,7 @@ type Report struct {
Disk DiskReport `json:"disk"`
Network NetworkReport `json:"network"`
Connections ConnectionsReport `json:"connections"`
GPU *GPUDetailReport `json:"gpu,omitempty"` // 新增GPU详细信息
Uptime int64 `json:"uptime"`
Process int `json:"process"`
Message string `json:"message"`
@@ -83,6 +84,23 @@ type CPUReport struct {
Usage float64 `json:"usage,omitempty"`
}
// GPUDetailReport is the detailed GPU section of a report: a summary plus
// one entry per GPU device.
type GPUDetailReport struct {
Count int `json:"count"` // Number of GPUs
AverageUsage float64 `json:"average_usage"` // Average usage
DetailedInfo []GPUDeviceInfo `json:"detailed_info"` // Detailed information for each GPU
}
// GPUDeviceInfo describes a single GPU device.
type GPUDeviceInfo struct {
Name string `json:"name"` // GPU model name
MemoryTotal int64 `json:"memory_total"` // Total video memory (bytes)
MemoryUsed int64 `json:"memory_used"` // Used video memory (bytes)
Utilization float64 `json:"utilization"` // GPU utilization (%)
Temperature int `json:"temperature"` // GPU temperature (°C)
}
// 保持向后兼容的旧GPUReport结构
type GPUReport struct {
Name string `json:"name,omitempty"`
Usage float64 `json:"usage,omitempty"`

View File

@@ -144,11 +144,37 @@ func SaveClientReport(clientUUID string, report common.Report) (err error) {
return fmt.Errorf("failed to save Record: %v", err)
}
// 保存GPU详细记录到独立表
currentTime := time.Now()
if report.GPU != nil && len(report.GPU.DetailedInfo) > 0 {
for idx, gpu := range report.GPU.DetailedInfo {
gpuRecord := models.GPURecord{
Client: clientUUID,
Time: models.FromTime(currentTime),
DeviceIndex: idx,
DeviceName: gpu.Name,
MemTotal: gpu.MemoryTotal,
MemUsed: gpu.MemoryUsed,
Utilization: float32(gpu.Utilization),
Temperature: gpu.Temperature,
}
if err := db.Create(&gpuRecord).Error; err != nil {
return fmt.Errorf("failed to save GPU record: %v", err)
}
}
}
// 计算平均GPU使用率用于向后兼容
averageGPUUsage := float32(0)
if report.GPU != nil && len(report.GPU.DetailedInfo) > 0 {
averageGPUUsage = float32(report.GPU.AverageUsage)
}
Record := models.Record{
Client: clientUUID,
Time: models.FromTime(time.Now()),
Time: models.FromTime(currentTime),
Cpu: float32(report.CPU.Usage),
Gpu: 0, // Report 未提供 GPU Usage,设为 0与原 nil 行为类似)
Gpu: averageGPUUsage, // 使用平均GPU使用率
Ram: report.Ram.Used,
RamTotal: report.Ram.Total,
Swap: report.Swap.Used,

View File

@@ -254,6 +254,7 @@ func GetDBInstance() *gorm.DB {
&models.User{},
&models.Client{},
&models.Record{},
&models.GPURecord{},
&models.Config{},
&models.Log{},
&models.Clipboard{},
@@ -274,6 +275,12 @@ func GetDBInstance() *gorm.DB {
if err != nil {
log.Printf("Failed to create records_long_term table, it may already exist: %v", err)
}
err = instance.Table("gpu_records_long_term").AutoMigrate(
&models.GPURecord{},
)
if err != nil {
log.Printf("Failed to create gpu_records_long_term table, it may already exist: %v", err)
}
err = instance.AutoMigrate(
&models.Session{},
)

View File

@@ -93,6 +93,18 @@ type Record struct {
//Uptime int64 `json:"uptime" gorm:"type:bigint"`
}
// GPURecord logs individual GPU metrics over time.
// Each row is one sample (or one aggregated sample) of a single GPU device
// belonging to a client. The struct declares no primary key; Client, Time and
// DeviceIndex each carry their own index.
type GPURecord struct {
Client string `json:"client" gorm:"type:varchar(36);index"` // Client UUID
Time LocalTime `json:"time" gorm:"index"` // Time of the record
DeviceIndex int `json:"device_index" gorm:"index"` // GPU device index (0,1,2...)
DeviceName string `json:"device_name" gorm:"type:varchar(100)"` // GPU model
MemTotal int64 `json:"mem_total" gorm:"type:bigint"` // Total video memory (bytes)
MemUsed int64 `json:"mem_used" gorm:"type:bigint"` // Used video memory (bytes)
Utilization float32 `json:"utilization" gorm:"type:decimal(5,2)"` // GPU utilization (%)
Temperature int `json:"temperature"` // GPU temperature (°C)
}
// StringArray represents a slice of strings stored as JSON in the database
// StringArray 存储为 JSON 的字符串切片类型
type StringArray []string

View File

@@ -18,14 +18,61 @@ func RecordOne(rec models.Record) error {
return db.Create(&rec).Error
}
// RecordGPU inserts a single GPU record through the shared database instance
// and returns the error reported by the insert, if any.
func RecordGPU(rec models.GPURecord) error {
	return dbcore.GetDBInstance().Create(&rec).Error
}
// DeleteAll wipes every monitoring table: the long-term and raw tables for
// both regular records and GPU records. The statements run one after another
// and the first failure aborts the sequence and is returned.
func DeleteAll() error {
	db := dbcore.GetDBInstance()

	// Executed in this exact order; records is cleared last.
	statements := [...]string{
		"DELETE FROM records_long_term",
		"DELETE FROM gpu_records_long_term",
		"DELETE FROM gpu_records",
		"DELETE FROM records",
	}
	for _, stmt := range statements {
		if err := db.Exec(stmt).Error; err != nil {
			return err
		}
	}
	return nil
}
// GetGPURecordsByClientAndTime returns the GPU records of client uuid whose
// time lies within [start, end].
//
// Two sources are combined: the raw GPU record table, which is only queried
// for the part of the range newer than four hours (plus a one-minute margin),
// and the gpu_records_long_term table, which is queried for the whole range.
// Each source is ordered by time and then device index. Long-term rows are
// placed before raw rows so that the merged slice runs from oldest to newest
// (the compaction job in this file only writes long-term rows for samples
// older than four hours).
//
// A failure of the raw query is returned to the caller. A failure of the
// long-term query is logged and treated as "no long-term data": the raw rows
// are returned with a nil error.
func GetGPURecordsByClientAndTime(uuid string, start, end time.Time) ([]models.GPURecord, error) {
	db := dbcore.GetDBInstance()

	fourHoursAgo := time.Now().Add(-4*time.Hour - time.Minute)

	var recentRecords []models.GPURecord
	if end.After(fourHoursAgo) {
		// Clamp the lower bound so the raw table is never asked for data older
		// than the recent window.
		recentStart := start
		if recentStart.Before(fourHoursAgo) {
			recentStart = fourHoursAgo
		}
		err := db.Where("client = ? AND time >= ? AND time <= ?", uuid, recentStart, end).
			Order("time ASC, device_index ASC").Find(&recentRecords).Error
		if err != nil {
			log.Printf("Error fetching recent GPU records for client %s between %s and %s: %v", uuid, recentStart, end, err)
			return nil, err
		}
	}

	var longTermRecords []models.GPURecord
	err := db.Table("gpu_records_long_term").Where("client = ? AND time >= ? AND time <= ?", uuid, start, end).
		Order("time ASC, device_index ASC").Find(&longTermRecords).Error
	if err != nil {
		// Best effort: long-term history is optional, keep serving recent data.
		log.Printf("Error fetching long-term GPU records for client %s between %s and %s: %v", uuid, start, end, err)
		return recentRecords, nil
	}

	// Older (long-term) rows first, newer (raw) rows last.
	var records []models.GPURecord
	records = append(records, longTermRecords...)
	records = append(records, recentRecords...)
	return records, nil
}
func GetLatestRecord(uuid string) (Record []models.Record, err error) {
db := dbcore.GetDBInstance()
err = db.Where("client = ?", uuid).Order("time DESC").Limit(1).Find(&Record).Error
@@ -35,6 +82,8 @@ func GetLatestRecord(uuid string) (Record []models.Record, err error) {
// DeleteRecordBefore deletes every row with time earlier than before from the
// four monitoring tables: records_long_term, gpu_records_long_term, the raw
// GPU record table and the raw record table.
//
// All four deletes are always attempted, even if an earlier one fails. Each
// failure is logged, and the first error encountered is returned; nil means
// every delete succeeded.
func DeleteRecordBefore(before time.Time) error {
	db := dbcore.GetDBInstance()

	var firstErr error
	track := func(target string, err error) {
		if err == nil {
			return
		}
		log.Printf("Error deleting %s before %s: %v", target, before, err)
		if firstErr == nil {
			firstErr = err
		}
	}

	track("records_long_term",
		db.Table("records_long_term").Where("time < ?", before).Delete(&models.Record{}).Error)
	track("gpu_records_long_term",
		db.Table("gpu_records_long_term").Where("time < ?", before).Delete(&models.GPURecord{}).Error)
	track("GPU records",
		db.Where("time < ?", before).Delete(&models.GPURecord{}).Error)
	track("records",
		db.Where("time < ?", before).Delete(&models.Record{}).Error)

	return firstErr
}
@@ -117,6 +166,12 @@ func CompactRecord() error {
return err
}
err = migrateGPURecords(db)
if err != nil {
log.Printf("Error migrating GPU records: %v", err)
return err
}
if flags.DatabaseType == "sqlite" {
if err := db.Exec("VACUUM").Error; err != nil {
log.Printf("Error vacuuming database: %v", err)
@@ -304,3 +359,147 @@ func migrateOldRecords(db *gorm.DB) error {
return nil
})
}
// migrateGPURecords compacts raw GPU records older than four hours into the
// gpu_records_long_term table.
//
// Raw rows are grouped by client, device index, device name and a 15-minute
// time slot. For every group one long-term row is written whose metrics are
// the 70th percentile (linearly interpolated) of the grouped samples. If a
// long-term row already exists for the same client, device index and slot it
// is overwritten, otherwise a new row is created. Finally, raw rows older than
// five hours are deleted; rows between four and five hours old stay in the raw
// table and are compacted again on later runs, which is why the overwrite
// branch exists.
//
// The writes and the delete run in a single transaction on db. Any database
// error is returned and rolls the transaction back.
func migrateGPURecords(db *gorm.DB) error {
	fourHoursAgo := time.Now().Add(-4 * time.Hour)

	// Load every raw GPU record older than four hours.
	var gpuRecords []models.GPURecord
	if err := db.Where("time < ?", fourHoursAgo).Find(&gpuRecords).Error; err != nil {
		return err
	}
	if len(gpuRecords) == 0 {
		return nil
	}

	// Group by client + device index + 15-minute slot (+ device name).
	type gpuGroupKey struct {
		Client      string
		DeviceIndex int
		TimeSlot    time.Time
		DeviceName  string
	}
	// Samples are stored as float64 because the percentile is computed in
	// float64 for every metric anyway.
	type gpuGroupData struct {
		MemTotal    []float64
		MemUsed     []float64
		Utilization []float64
		Temperature []float64
	}

	groupedGPUs := make(map[gpuGroupKey]*gpuGroupData)
	for _, record := range gpuRecords {
		key := gpuGroupKey{
			Client:      record.Client,
			DeviceIndex: record.DeviceIndex,
			TimeSlot:    record.Time.ToTime().Truncate(15 * time.Minute),
			DeviceName:  record.DeviceName,
		}
		data, ok := groupedGPUs[key]
		if !ok {
			data = &gpuGroupData{}
			groupedGPUs[key] = data
		}
		data.MemTotal = append(data.MemTotal, float64(record.MemTotal))
		data.MemUsed = append(data.MemUsed, float64(record.MemUsed))
		data.Utilization = append(data.Utilization, float64(record.Utilization))
		data.Temperature = append(data.Temperature, float64(record.Temperature))
	}

	// getPercentile returns the linearly interpolated percentile (0..1) of
	// values, or 0 for an empty slice. It sorts values in place; every slice
	// is only used once, so no copy is needed.
	getPercentile := func(values []float64, percentile float64) float64 {
		if len(values) == 0 {
			return 0
		}
		sort.Float64s(values)
		last := len(values) - 1
		index := float64(last) * percentile
		lowerIndex := int(index)
		if lowerIndex >= last {
			return values[last]
		}
		frac := index - float64(lowerIndex)
		return values[lowerIndex] + frac*(values[lowerIndex+1]-values[lowerIndex])
	}

	// Same percentile as the one used in this function before the rewrite.
	const highPercentile = 0.7

	return db.Transaction(func(tx *gorm.DB) error {
		for key, data := range groupedGPUs {
			// Does a long-term row for this client/device/slot already exist?
			var existingCount int64
			if err := tx.Table("gpu_records_long_term").Where("client = ? AND device_index = ? AND time = ?",
				key.Client, key.DeviceIndex, key.TimeSlot).Count(&existingCount).Error; err != nil {
				return err
			}

			compressedGPU := models.GPURecord{
				Client:      key.Client,
				DeviceIndex: key.DeviceIndex,
				Time:        models.FromTime(key.TimeSlot),
				DeviceName:  key.DeviceName,
				MemTotal:    int64(getPercentile(data.MemTotal, highPercentile)),
				MemUsed:     int64(getPercentile(data.MemUsed, highPercentile)),
				Utilization: float32(getPercentile(data.Utilization, highPercentile)),
				Temperature: int(int64(getPercentile(data.Temperature, highPercentile))),
			}

			if existingCount > 0 {
				// Select("*") forces every column to be written. Without it
				// GORM skips zero-value struct fields, so a recomputed value
				// of 0 (e.g. 0% utilization) would leave the stale value in
				// place.
				if err := tx.Table("gpu_records_long_term").Where("client = ? AND device_index = ? AND time = ?",
					key.Client, key.DeviceIndex, key.TimeSlot).Select("*").Updates(&compressedGPU).Error; err != nil {
					return err
				}
				continue
			}

			if err := tx.Table("gpu_records_long_term").Create(&compressedGPU).Error; err != nil {
				return err
			}
		}

		// Drop raw rows that are more than five hours old; the extra hour
		// keeps a safety margin of already-compacted raw data.
		return tx.Where("time < ?", fourHoursAgo.Add(-1*time.Hour)).Delete(&models.GPURecord{}).Error
	})
}

View File

@@ -49,13 +49,19 @@ func AverageReport(uuid string, time time.Time, records []common.Report, topPerc
}
}
var sumCPU, sumLOAD float32
var sumCPU, sumLOAD, sumGPU float32
var sumRAM, sumRAMTotal, sumSWAP, sumSWAPTotal, sumDISK, sumDISKTotal, sumNETIn, sumNETOut, sumNETTotalUp, sumNETTotalDown int64
var sumPROCESS, sumConnections, sumConnectionsUDP int
if topPercentage > 0 && topPercentage <= 1 {
sumCPU, _ = sumAndSort(func(r common.Report) float32 { return float32(r.CPU.Usage) }, nil, true)
sumLOAD, _ = sumAndSort(func(r common.Report) float32 { return float32(r.Load.Load1) }, nil, true)
sumGPU, _ = sumAndSort(func(r common.Report) float32 {
if r.GPU != nil {
return float32(r.GPU.AverageUsage)
}
return 0
}, nil, true)
_, sumRAM = sumAndSort(nil, func(r common.Report) int64 { return r.Ram.Used }, false)
//_, sumRAMTotal = sumAndSort(nil, nil, func(r common.Report) int64 { return r.Ram.Total }, false)
@@ -79,6 +85,9 @@ func AverageReport(uuid string, time time.Time, records []common.Report, topPerc
for _, r := range records {
sumCPU += float32(r.CPU.Usage)
sumLOAD += float32(r.Load.Load1)
if r.GPU != nil {
sumGPU += float32(r.GPU.AverageUsage)
}
sumRAM += r.Ram.Used
sumRAMTotal += r.Ram.Total
sumSWAP += r.Swap.Used
@@ -100,7 +109,7 @@ func AverageReport(uuid string, time time.Time, records []common.Report, topPerc
Client: uuid,
Time: models.FromTime(time),
Cpu: sumCPU / float32(recordsToAverageCount),
Gpu: 0, // 保持原始行为
Gpu: sumGPU / float32(recordsToAverageCount), // 计算GPU平均使用率
Ram: sumRAM / int64(recordsToAverageCount),
RamTotal: records[0].Ram.Total,
Swap: sumSWAP / int64(recordsToAverageCount),
@@ -120,6 +129,136 @@ func AverageReport(uuid string, time time.Time, records []common.Report, topPerc
return newRecord
}
// AverageGPUReports aggregates the per-device GPU samples found in reports
// into one models.GPURecord per device index.
//
// Parameters:
//   - uuid: client identifier stored in every returned record.
//   - time: timestamp stored in every returned record.
//   - reports: reports to aggregate; reports without GPU details are ignored.
//     A device is identified by its position in GPU.DetailedInfo, and the
//     name of the first sample seen for that position is kept.
//   - topPercentage: when in (0, 1], every metric is the mean of its highest
//     topPercentage share of samples (at least one sample). Any other value
//     yields the plain mean over all samples. Integer metrics use integer
//     division.
//
// The returned records are ordered by ascending device index. An empty,
// non-nil slice is returned when reports is empty; nil is returned when no
// report carries GPU details.
func AverageGPUReports(uuid string, time time.Time, reports []common.Report, topPercentage float64) []models.GPURecord {
	if len(reports) == 0 {
		return []models.GPURecord{}
	}

	// deviceSamples collects every sample of one GPU device.
	type deviceSamples struct {
		name        string
		memTotal    []int64
		memUsed     []int64
		utilization []float64
		temperature []int64
	}

	byDevice := make(map[int]*deviceSamples)
	for _, report := range reports {
		if report.GPU == nil {
			continue
		}
		for idx, gpu := range report.GPU.DetailedInfo {
			samples, ok := byDevice[idx]
			if !ok {
				samples = &deviceSamples{name: gpu.Name}
				byDevice[idx] = samples
			}
			samples.memTotal = append(samples.memTotal, gpu.MemoryTotal)
			samples.memUsed = append(samples.memUsed, gpu.MemoryUsed)
			samples.utilization = append(samples.utilization, gpu.Utilization)
			samples.temperature = append(samples.temperature, int64(gpu.Temperature))
		}
	}

	useTop := topPercentage > 0 && topPercentage <= 1

	// keepCount returns how many of n (> 0) samples take part in the mean.
	keepCount := func(n int) int {
		if !useTop {
			return n
		}
		k := int(float64(n) * topPercentage)
		if k == 0 {
			k = 1
		}
		return k
	}

	// meanFloat averages the highest samples (or all of them). It sorts
	// values in place, which is safe because the slices are local.
	meanFloat := func(values []float64) float64 {
		if len(values) == 0 {
			return 0
		}
		if useTop {
			sort.Float64s(values)
			values = values[len(values)-keepCount(len(values)):]
		}
		var sum float64
		for _, v := range values {
			sum += v
		}
		return sum / float64(len(values))
	}

	// meanInt is the integer counterpart of meanFloat.
	meanInt := func(values []int64) int64 {
		if len(values) == 0 {
			return 0
		}
		if useTop {
			sort.Slice(values, func(i, j int) bool { return values[i] > values[j] })
			values = values[:keepCount(len(values))]
		}
		var sum int64
		for _, v := range values {
			sum += v
		}
		return sum / int64(len(values))
	}

	// Map iteration order is random; sort the indices so the output order is
	// deterministic.
	indices := make([]int, 0, len(byDevice))
	for idx := range byDevice {
		indices = append(indices, idx)
	}
	sort.Ints(indices)

	var result []models.GPURecord
	for _, idx := range indices {
		samples := byDevice[idx]
		result = append(result, models.GPURecord{
			Client:      uuid,
			Time:        models.FromTime(time),
			DeviceIndex: idx,
			DeviceName:  samples.name,
			MemTotal:    meanInt(samples.memTotal),
			MemUsed:     meanInt(samples.memUsed),
			Utilization: float32(meanFloat(samples.utilization)),
			Temperature: int(meanInt(samples.temperature)),
		})
	}
	return result
}
func DataMasking(str string, private []string) string {
if str == "" || len(private) == 0 {
return str