move checkin to go
This commit is contained in:
@@ -1,6 +1,9 @@
|
||||
import asyncio
|
||||
import datetime as dt
|
||||
import random
|
||||
import tempfile
|
||||
import json
|
||||
import subprocess
|
||||
import urllib.parse
|
||||
from time import sleep
|
||||
from typing import Union
|
||||
@@ -322,4 +325,23 @@ def get_wmi_task() -> None:
|
||||
"pk", "agent_id", "last_seen", "overdue_time", "offline_time"
|
||||
)
|
||||
ids = [i.agent_id for i in agents if i.status == "online"]
|
||||
run_nats_api_cmd("wmi", ids)
|
||||
run_nats_api_cmd("wmi", ids, timeout=45)
|
||||
|
||||
|
||||
@app.task
|
||||
def agent_checkin_task() -> None:
|
||||
db = settings.DATABASES["default"]
|
||||
config = {
|
||||
"key": settings.SECRET_KEY,
|
||||
"natsurl": f"tls://{settings.ALLOWED_HOSTS[0]}:4222",
|
||||
"user": db["USER"],
|
||||
"pass": db["PASSWORD"],
|
||||
"host": db["HOST"],
|
||||
"port": int(db["PORT"]),
|
||||
"dbname": db["NAME"],
|
||||
}
|
||||
with tempfile.NamedTemporaryFile() as fp:
|
||||
with open(fp.name, "w") as f:
|
||||
json.dump(config, f)
|
||||
cmd = ["/usr/local/bin/nats-api", "-c", fp.name, "-m", "checkin"]
|
||||
subprocess.run(cmd, timeout=30)
|
||||
|
@@ -54,10 +54,11 @@ def debug_task(self):
|
||||
@app.on_after_finalize.connect
|
||||
def setup_periodic_tasks(sender, **kwargs):
|
||||
|
||||
from agents.tasks import agent_outages_task
|
||||
from agents.tasks import agent_outages_task, agent_checkin_task
|
||||
from alerts.tasks import unsnooze_alerts
|
||||
from core.tasks import core_maintenance_tasks, cache_db_fields_task
|
||||
|
||||
sender.add_periodic_task(45.0, agent_checkin_task.s())
|
||||
sender.add_periodic_task(60.0, agent_outages_task.s())
|
||||
sender.add_periodic_task(60.0 * 30, core_maintenance_tasks.s())
|
||||
sender.add_periodic_task(60.0 * 60, unsnooze_alerts.s())
|
||||
|
2
go.mod
2
go.mod
@@ -3,6 +3,8 @@ module github.com/wh1te909/tacticalrmm
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/jmoiron/sqlx v1.3.4
|
||||
github.com/lib/pq v1.10.2
|
||||
github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed // indirect
|
||||
github.com/nats-io/nats.go v1.11.0
|
||||
github.com/ugorji/go/codec v1.2.6
|
||||
|
9
go.sum
9
go.sum
@@ -1,3 +1,5 @@
|
||||
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
|
||||
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
|
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
|
||||
@@ -8,6 +10,13 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
|
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/jmoiron/sqlx v1.3.4 h1:wv+0IJZfL5z0uZoUjlpKgHkgaFSYD+r9CfrXjEXsO7w=
|
||||
github.com/jmoiron/sqlx v1.3.4/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
|
||||
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8=
|
||||
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
|
||||
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
||||
github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2BA=
|
||||
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
|
||||
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
|
||||
|
7
main.go
7
main.go
@@ -9,7 +9,7 @@ import (
|
||||
"github.com/wh1te909/tacticalrmm/natsapi"
|
||||
)
|
||||
|
||||
var version = "2.0.0"
|
||||
var version = "2.1.0"
|
||||
|
||||
func main() {
|
||||
ver := flag.Bool("version", false, "Prints version")
|
||||
@@ -27,6 +27,9 @@ func main() {
|
||||
api.MonitorAgents(*config)
|
||||
case "wmi":
|
||||
api.GetWMI(*config)
|
||||
case "checkin":
|
||||
api.CheckIn(*config)
|
||||
default:
|
||||
fmt.Println(version)
|
||||
}
|
||||
|
||||
}
|
||||
|
Binary file not shown.
104
natsapi/tasks.go
104
natsapi/tasks.go
@@ -2,12 +2,15 @@ package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
_ "github.com/lib/pq"
|
||||
nats "github.com/nats-io/nats.go"
|
||||
"github.com/ugorji/go/codec"
|
||||
)
|
||||
@@ -18,6 +21,21 @@ type JsonFile struct {
|
||||
NatsURL string `json:"natsurl"`
|
||||
}
|
||||
|
||||
type DjangoConfig struct {
|
||||
Key string `json:"key"`
|
||||
NatsURL string `json:"natsurl"`
|
||||
User string `json:"user"`
|
||||
Pass string `json:"pass"`
|
||||
Host string `json:"host"`
|
||||
Port int `json:"port"`
|
||||
DBName string `json:"dbname"`
|
||||
}
|
||||
|
||||
type Agent struct {
|
||||
ID int `db:"id"`
|
||||
AgentID string `db:"agent_id"`
|
||||
}
|
||||
|
||||
type Recovery struct {
|
||||
Func string `json:"func"`
|
||||
Data map[string]string `json:"payload"`
|
||||
@@ -87,6 +105,90 @@ func MonitorAgents(file string) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func CheckIn(file string) {
|
||||
var r DjangoConfig
|
||||
|
||||
jret, _ := ioutil.ReadFile(file)
|
||||
err := json.Unmarshal(jret, &r)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
psqlInfo := fmt.Sprintf("host=%s port=%d user=%s "+
|
||||
"password=%s dbname=%s sslmode=disable",
|
||||
r.Host, r.Port, r.User, r.Pass, r.DBName)
|
||||
|
||||
db, err := sqlx.Connect("postgres", psqlInfo)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
db.SetMaxOpenConns(15)
|
||||
|
||||
agent := Agent{}
|
||||
agents := make([]Agent, 0)
|
||||
rows, err := db.Queryx("SELECT agents_agent.id, agents_agent.agent_id FROM agents_agent")
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
err := rows.StructScan(&agent)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
agents = append(agents, agent)
|
||||
}
|
||||
|
||||
var payload []byte
|
||||
ret := codec.NewEncoderBytes(&payload, new(codec.MsgpackHandle))
|
||||
ret.Encode(map[string]string{"func": "ping"})
|
||||
|
||||
opts := setupNatsOptions(r.Key)
|
||||
|
||||
nc, err := nats.Connect(r.NatsURL, opts...)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(agents))
|
||||
|
||||
loc, _ := time.LoadLocation("UTC")
|
||||
now := time.Now().In(loc)
|
||||
|
||||
for _, a := range agents {
|
||||
go func(id string, pk int, nc *nats.Conn, wg *sync.WaitGroup, db *sqlx.DB, now time.Time) {
|
||||
defer wg.Done()
|
||||
|
||||
var resp string
|
||||
var mh codec.MsgpackHandle
|
||||
mh.RawToString = true
|
||||
|
||||
time.Sleep(time.Duration(randRange(100, 1500)) * time.Millisecond)
|
||||
out, err := nc.Request(id, payload, 1*time.Second)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
dec := codec.NewDecoderBytes(out.Data, &mh)
|
||||
if err := dec.Decode(&resp); err == nil {
|
||||
if resp == "pong" {
|
||||
_, err = db.NamedExec(
|
||||
`UPDATE agents_agent SET last_seen=:lastSeen WHERE agents_agent.id=:pk`,
|
||||
map[string]interface{}{"lastSeen": now, "pk": pk},
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}(a.AgentID, a.ID, nc, &wg, db, now)
|
||||
}
|
||||
wg.Wait()
|
||||
db.Close()
|
||||
}
|
||||
|
||||
func GetWMI(file string) {
|
||||
var result JsonFile
|
||||
var payload []byte
|
||||
@@ -115,7 +217,7 @@ func GetWMI(file string) {
|
||||
for _, id := range result.Agents {
|
||||
go func(id string, nc *nats.Conn, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
time.Sleep(time.Duration(randRange(0, 20)) * time.Second)
|
||||
time.Sleep(time.Duration(randRange(0, 28)) * time.Second)
|
||||
nc.Publish(id, payload)
|
||||
}(id, nc, &wg)
|
||||
}
|
||||
|
Reference in New Issue
Block a user