refactor natsapi
This commit is contained in:
@@ -1,6 +1,9 @@
|
||||
import asyncio
|
||||
import datetime as dt
|
||||
import json
|
||||
import random
|
||||
import subprocess
|
||||
import tempfile
|
||||
from time import sleep
|
||||
from typing import Union
|
||||
|
||||
@@ -252,3 +255,48 @@ def run_script_email_results_task(
|
||||
server.quit()
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
|
||||
|
||||
def _get_nats_config() -> dict:
|
||||
return {
|
||||
"key": settings.SECRET_KEY,
|
||||
"natsurl": f"tls://{settings.ALLOWED_HOSTS[0]}:4222",
|
||||
}
|
||||
|
||||
|
||||
@app.task
|
||||
def monitor_agents_task() -> None:
|
||||
agents = Agent.objects.only(
|
||||
"pk", "agent_id", "last_seen", "overdue_time", "offline_time"
|
||||
)
|
||||
ret = [i.agent_id for i in agents if i.status != "online"]
|
||||
config = _get_nats_config()
|
||||
config["agents"] = ret
|
||||
with tempfile.NamedTemporaryFile() as fp:
|
||||
with open(fp.name, "w") as f:
|
||||
json.dump(config, f)
|
||||
|
||||
cmd = ["/usr/local/bin/nats-api", "-c", fp.name, "-m", "monitor"]
|
||||
try:
|
||||
subprocess.run(cmd, capture_output=True, timeout=30)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
|
||||
|
||||
@app.task
|
||||
def get_wmi_task() -> None:
|
||||
agents = Agent.objects.only(
|
||||
"pk", "agent_id", "last_seen", "overdue_time", "offline_time"
|
||||
)
|
||||
ret = [i.agent_id for i in agents if i.status == "online"]
|
||||
config = _get_nats_config()
|
||||
config["agents"] = ret
|
||||
with tempfile.NamedTemporaryFile() as fp:
|
||||
with open(fp.name, "w") as f:
|
||||
json.dump(config, f)
|
||||
|
||||
cmd = ["/usr/local/bin/nats-api", "-c", fp.name, "-m", "wmi"]
|
||||
try:
|
||||
subprocess.run(cmd, capture_output=True, timeout=30)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
|
@@ -1,5 +0,0 @@
|
||||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class NatsapiConfig(AppConfig):
|
||||
name = "natsapi"
|
@@ -1,36 +0,0 @@
|
||||
from django.conf import settings
|
||||
from model_bakery import baker
|
||||
|
||||
from tacticalrmm.test import TacticalTestCase
|
||||
|
||||
|
||||
class TestNatsAPIViews(TacticalTestCase):
|
||||
def setUp(self):
|
||||
self.authenticate()
|
||||
self.setup_coresettings()
|
||||
|
||||
def test_nats_agents(self):
|
||||
baker.make_recipe(
|
||||
"agents.online_agent", version=settings.LATEST_AGENT_VER, _quantity=14
|
||||
)
|
||||
|
||||
baker.make_recipe(
|
||||
"agents.offline_agent", version=settings.LATEST_AGENT_VER, _quantity=6
|
||||
)
|
||||
baker.make_recipe(
|
||||
"agents.overdue_agent", version=settings.LATEST_AGENT_VER, _quantity=5
|
||||
)
|
||||
|
||||
url = "/natsapi/online/agents/"
|
||||
r = self.client.get(url)
|
||||
self.assertEqual(r.status_code, 200)
|
||||
self.assertEqual(len(r.json()["agent_ids"]), 14)
|
||||
|
||||
url = "/natsapi/offline/agents/"
|
||||
r = self.client.get(url)
|
||||
self.assertEqual(r.status_code, 200)
|
||||
self.assertEqual(len(r.json()["agent_ids"]), 11)
|
||||
|
||||
url = "/natsapi/asdjaksdasd/agents/"
|
||||
r = self.client.get(url)
|
||||
self.assertEqual(r.status_code, 400)
|
@@ -1,9 +0,0 @@
|
||||
from django.urls import path
|
||||
|
||||
from . import views
|
||||
|
||||
urlpatterns = [
|
||||
path("natsinfo/", views.nats_info),
|
||||
path("<str:stat>/agents/", views.NatsAgents.as_view()),
|
||||
path("logcrash/", views.LogCrash.as_view()),
|
||||
]
|
@@ -1,60 +0,0 @@
|
||||
from django.conf import settings
|
||||
from django.shortcuts import get_object_or_404
|
||||
from django.utils import timezone as djangotime
|
||||
from loguru import logger
|
||||
from rest_framework.decorators import (
|
||||
api_view,
|
||||
authentication_classes,
|
||||
permission_classes,
|
||||
)
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.views import APIView
|
||||
|
||||
from agents.models import Agent
|
||||
from tacticalrmm.utils import notify_error
|
||||
|
||||
logger.configure(**settings.LOG_CONFIG)
|
||||
|
||||
|
||||
@api_view()
|
||||
@permission_classes([])
|
||||
@authentication_classes([])
|
||||
def nats_info(request):
|
||||
return Response({"user": "tacticalrmm", "password": settings.SECRET_KEY})
|
||||
|
||||
|
||||
class NatsAgents(APIView):
|
||||
authentication_classes = [] # type: ignore
|
||||
permission_classes = [] # type: ignore
|
||||
|
||||
def get(self, request, stat: str):
|
||||
if stat not in ["online", "offline"]:
|
||||
return notify_error("invalid request")
|
||||
|
||||
ret: list[str] = []
|
||||
agents = Agent.objects.only(
|
||||
"pk", "agent_id", "version", "last_seen", "overdue_time", "offline_time"
|
||||
)
|
||||
if stat == "online":
|
||||
ret = [i.agent_id for i in agents if i.status == "online"]
|
||||
else:
|
||||
ret = [i.agent_id for i in agents if i.status != "online"]
|
||||
|
||||
return Response({"agent_ids": ret})
|
||||
|
||||
|
||||
class LogCrash(APIView):
|
||||
authentication_classes = [] # type: ignore
|
||||
permission_classes = [] # type: ignore
|
||||
|
||||
def post(self, request):
|
||||
agent = get_object_or_404(Agent, agent_id=request.data["agentid"])
|
||||
agent.last_seen = djangotime.now()
|
||||
agent.save(update_fields=["last_seen"])
|
||||
|
||||
if hasattr(settings, "DEBUGTEST") and settings.DEBUGTEST:
|
||||
logger.info(
|
||||
f"Detected crashed tacticalagent service on {agent.hostname} v{agent.version}, attempting recovery"
|
||||
)
|
||||
|
||||
return Response("ok")
|
@@ -35,6 +35,14 @@ app.conf.beat_schedule = {
|
||||
"task": "agents.tasks.auto_self_agent_update_task",
|
||||
"schedule": crontab(minute=35, hour="*"),
|
||||
},
|
||||
"monitor-agents": {
|
||||
"task": "agents.tasks.monitor_agents_task",
|
||||
"schedule": crontab(minute="*/7"),
|
||||
},
|
||||
"get-wmi": {
|
||||
"task": "agents.tasks.get_wmi_task",
|
||||
"schedule": crontab(minute="*/18"),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
|
@@ -15,7 +15,6 @@ def get_debug_info():
|
||||
|
||||
|
||||
EXCLUDE_PATHS = (
|
||||
"/natsapi",
|
||||
"/api/v3",
|
||||
"/logs/auditlogs",
|
||||
f"/{settings.ADMIN_URL}",
|
||||
|
@@ -61,7 +61,6 @@ INSTALLED_APPS = [
|
||||
"logs",
|
||||
"scripts",
|
||||
"alerts",
|
||||
"natsapi",
|
||||
]
|
||||
|
||||
if not "AZPIPELINE" in os.environ:
|
||||
|
@@ -23,7 +23,6 @@ urlpatterns = [
|
||||
path("scripts/", include("scripts.urls")),
|
||||
path("alerts/", include("alerts.urls")),
|
||||
path("accounts/", include("accounts.urls")),
|
||||
path("natsapi/", include("natsapi.urls")),
|
||||
]
|
||||
|
||||
if hasattr(settings, "ADMIN_ENABLED") and settings.ADMIN_ENABLED:
|
||||
|
2
go.mod
2
go.mod
@@ -3,9 +3,7 @@ module github.com/wh1te909/tacticalrmm
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/go-resty/resty/v2 v2.5.0
|
||||
github.com/nats-io/nats.go v1.10.1-0.20210107160453-a133396829fc
|
||||
github.com/ugorji/go/codec v1.2.4
|
||||
golang.org/x/net v0.0.0-20210119194325-5f4716e94777 // indirect
|
||||
golang.org/x/sys v0.0.0-20210122235752-a8b976e07c7b // indirect
|
||||
)
|
||||
|
9
go.sum
9
go.sum
@@ -1,5 +1,3 @@
|
||||
github.com/go-resty/resty/v2 v2.5.0 h1:WFb5bD49/85PO7WgAjZ+/TJQ+Ty1XOcWEfD1zIFCM1c=
|
||||
github.com/go-resty/resty/v2 v2.5.0/go.mod h1:B88+xCTEwvfD94NOuE6GS1wMlnoKNY8eEiNizfNwOwA=
|
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
|
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
|
||||
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
|
||||
@@ -45,22 +43,15 @@ golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPh
|
||||
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E=
|
||||
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210119194325-5f4716e94777 h1:003p0dJM77cxMSyCPFphvZf/Y5/NXf5fzg6ufd1/Oew=
|
||||
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191022100944-742c48ecaeb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210122235752-a8b976e07c7b h1:HSSdksA3iHk8fuZz7C7+A6tDgtIRF+7FSXu5TgK09I8=
|
||||
golang.org/x/sys v0.0.0-20210122235752-a8b976e07c7b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI=
|
||||
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
|
17
main.go
17
main.go
@@ -1,6 +1,6 @@
|
||||
package main
|
||||
|
||||
// env CGO_ENABLED=0 go build -v -a -ldflags "-s -w" -o nats-api
|
||||
// env CGO_ENABLED=0 go build -ldflags "-s -w" -o nats-api
|
||||
|
||||
import (
|
||||
"flag"
|
||||
@@ -9,13 +9,12 @@ import (
|
||||
"github.com/wh1te909/tacticalrmm/natsapi"
|
||||
)
|
||||
|
||||
var version = "1.1.1"
|
||||
var version = "2.0.0"
|
||||
|
||||
func main() {
|
||||
ver := flag.Bool("version", false, "Prints version")
|
||||
apiHost := flag.String("api-host", "", "django full base url")
|
||||
natsHost := flag.String("nats-host", "", "nats full connection string")
|
||||
debug := flag.Bool("debug", false, "Debug")
|
||||
mode := flag.String("m", "", "Mode")
|
||||
config := flag.String("c", "", "config file")
|
||||
flag.Parse()
|
||||
|
||||
if *ver {
|
||||
@@ -23,5 +22,11 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
api.Listen(*apiHost, *natsHost, version, *debug)
|
||||
switch *mode {
|
||||
case "monitor":
|
||||
api.MonitorAgents(*config)
|
||||
case "wmi":
|
||||
api.GetWMI(*config)
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -1,82 +0,0 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-resty/resty/v2"
|
||||
nats "github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
var rClient = resty.New()
|
||||
|
||||
func getAPI(apihost, natshost string) (string, string, error) {
|
||||
if apihost != "" && natshost != "" {
|
||||
return apihost, natshost, nil
|
||||
}
|
||||
|
||||
f, err := os.Open(`/etc/nginx/sites-available/rmm.conf`)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
scanner := bufio.NewScanner(f)
|
||||
for scanner.Scan() {
|
||||
if strings.Contains(scanner.Text(), "server_name") && !strings.Contains(scanner.Text(), "301") {
|
||||
r := strings.NewReplacer("server_name", "", ";", "")
|
||||
s := strings.ReplaceAll(r.Replace(scanner.Text()), " ", "")
|
||||
return fmt.Sprintf("https://%s/natsapi", s), fmt.Sprintf("tls://%s:4222", s), nil
|
||||
}
|
||||
}
|
||||
return "", "", errors.New("unable to parse api from nginx conf")
|
||||
}
|
||||
|
||||
func Listen(apihost, natshost, version string, debug bool) {
|
||||
api, natsurl, err := getAPI(apihost, natshost)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
log.Printf("Tactical Nats API Version %s\n", version)
|
||||
log.Println("Api base url: ", api)
|
||||
log.Println("Nats connection url: ", natsurl)
|
||||
|
||||
rClient.SetHostURL(api)
|
||||
rClient.SetTimeout(10 * time.Second)
|
||||
natsinfo, err := rClient.R().SetResult(&NatsInfo{}).Get("/natsinfo/")
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
if natsinfo.IsError() {
|
||||
log.Fatalln(natsinfo.String())
|
||||
}
|
||||
|
||||
opts := []nats.Option{
|
||||
nats.Name("TacticalRMM"),
|
||||
nats.UserInfo(natsinfo.Result().(*NatsInfo).User,
|
||||
natsinfo.Result().(*NatsInfo).Password),
|
||||
nats.ReconnectWait(time.Second * 5),
|
||||
nats.RetryOnFailedConnect(true),
|
||||
nats.MaxReconnects(-1),
|
||||
nats.ReconnectBufSize(-1),
|
||||
}
|
||||
|
||||
nc, err := nats.Connect(natsurl, opts...)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go getWMI(rClient, nc)
|
||||
go monitorAgents(rClient, nc)
|
||||
wg.Wait()
|
||||
}
|
Binary file not shown.
139
natsapi/tasks.go
139
natsapi/tasks.go
@@ -1,15 +1,42 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-resty/resty/v2"
|
||||
nats "github.com/nats-io/nats.go"
|
||||
"github.com/ugorji/go/codec"
|
||||
)
|
||||
|
||||
func monitorAgents(c *resty.Client, nc *nats.Conn) {
|
||||
type JsonFile struct {
|
||||
Agents []string `json:"agents"`
|
||||
Key string `json:"key"`
|
||||
NatsURL string `json:"natsurl"`
|
||||
}
|
||||
|
||||
type Recovery struct {
|
||||
Func string `json:"func"`
|
||||
Data map[string]string `json:"payload"`
|
||||
}
|
||||
|
||||
func setupNatsOptions(key string) []nats.Option {
|
||||
opts := []nats.Option{
|
||||
nats.Name("TacticalRMM"),
|
||||
nats.UserInfo("tacticalrmm", key),
|
||||
nats.ReconnectWait(time.Second * 2),
|
||||
nats.RetryOnFailedConnect(true),
|
||||
nats.MaxReconnects(3),
|
||||
nats.ReconnectBufSize(-1),
|
||||
}
|
||||
return opts
|
||||
}
|
||||
|
||||
func MonitorAgents(file string) {
|
||||
var result JsonFile
|
||||
var payload, recPayload []byte
|
||||
var mh codec.MsgpackHandle
|
||||
mh.RawToString = true
|
||||
@@ -22,56 +49,80 @@ func monitorAgents(c *resty.Client, nc *nats.Conn) {
|
||||
Data: map[string]string{"mode": "tacagent"},
|
||||
})
|
||||
|
||||
tick := time.NewTicker(7 * time.Minute)
|
||||
for range tick.C {
|
||||
var wg sync.WaitGroup
|
||||
agentids, _ := c.R().SetResult(&AgentIDS{}).Get("/offline/agents/")
|
||||
ids := agentids.Result().(*AgentIDS).IDs
|
||||
wg.Add(len(ids))
|
||||
var resp string
|
||||
|
||||
for _, id := range ids {
|
||||
go func(id string, nc *nats.Conn, wg *sync.WaitGroup, c *resty.Client) {
|
||||
defer wg.Done()
|
||||
out, err := nc.Request(id, payload, 1*time.Second)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
dec := codec.NewDecoderBytes(out.Data, &mh)
|
||||
if err := dec.Decode(&resp); err == nil {
|
||||
// if the agent is respoding to pong from the rpc service but is not showing as online (handled by tacticalagent service)
|
||||
// then tacticalagent service is hung. forcefully restart it
|
||||
if resp == "pong" {
|
||||
nc.Publish(id, recPayload)
|
||||
p := map[string]string{"agentid": id}
|
||||
c.R().SetBody(p).Post("/logcrash/")
|
||||
}
|
||||
}
|
||||
}(id, nc, &wg, c)
|
||||
}
|
||||
wg.Wait()
|
||||
jret, _ := ioutil.ReadFile(file)
|
||||
err := json.Unmarshal(jret, &result)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
opts := setupNatsOptions(result.Key)
|
||||
|
||||
nc, err := nats.Connect(result.NatsURL, opts...)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var resp string
|
||||
wg.Add(len(result.Agents))
|
||||
|
||||
for _, id := range result.Agents {
|
||||
go func(id string, nc *nats.Conn, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
out, err := nc.Request(id, payload, 1*time.Second)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
dec := codec.NewDecoderBytes(out.Data, &mh)
|
||||
if err := dec.Decode(&resp); err == nil {
|
||||
// if the agent is respoding to pong from the rpc service but is not showing as online (handled by tacticalagent service)
|
||||
// then tacticalagent service is hung. forcefully restart it
|
||||
if resp == "pong" {
|
||||
nc.Publish(id, recPayload)
|
||||
}
|
||||
}
|
||||
}(id, nc, &wg)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func getWMI(c *resty.Client, nc *nats.Conn) {
|
||||
func GetWMI(file string) {
|
||||
var result JsonFile
|
||||
var payload []byte
|
||||
var mh codec.MsgpackHandle
|
||||
mh.RawToString = true
|
||||
ret := codec.NewEncoderBytes(&payload, new(codec.MsgpackHandle))
|
||||
ret.Encode(map[string]string{"func": "wmi"})
|
||||
|
||||
tick := time.NewTicker(18 * time.Minute)
|
||||
for range tick.C {
|
||||
agentids, _ := c.R().SetResult(&AgentIDS{}).Get("/online/agents/")
|
||||
ids := agentids.Result().(*AgentIDS).IDs
|
||||
chunks := makeChunks(ids, 40)
|
||||
|
||||
for _, id := range chunks {
|
||||
for _, chunk := range id {
|
||||
nc.Publish(chunk, payload)
|
||||
time.Sleep(time.Duration(randRange(50, 400)) * time.Millisecond)
|
||||
}
|
||||
time.Sleep(15 * time.Second)
|
||||
}
|
||||
jret, _ := ioutil.ReadFile(file)
|
||||
err := json.Unmarshal(jret, &result)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
|
||||
opts := setupNatsOptions(result.Key)
|
||||
|
||||
nc, err := nats.Connect(result.NatsURL, opts...)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(result.Agents))
|
||||
|
||||
for _, id := range result.Agents {
|
||||
go func(id string, nc *nats.Conn, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
time.Sleep(time.Duration(randRange(0, 20)) * time.Second)
|
||||
nc.Publish(id, payload)
|
||||
}(id, nc, &wg)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func randRange(min, max int) int {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
return rand.Intn(max-min) + min
|
||||
}
|
||||
|
@@ -1,15 +0,0 @@
|
||||
package api
|
||||
|
||||
type NatsInfo struct {
|
||||
User string `json:"user"`
|
||||
Password string `json:"password"`
|
||||
}
|
||||
|
||||
type AgentIDS struct {
|
||||
IDs []string `json:"agent_ids"`
|
||||
}
|
||||
|
||||
type Recovery struct {
|
||||
Func string `json:"func"`
|
||||
Data map[string]string `json:"payload"`
|
||||
}
|
@@ -1,23 +0,0 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
)
|
||||
|
||||
func makeChunks(ids []string, chunkSize int) [][]string {
|
||||
var chunks [][]string
|
||||
for i := 0; i < len(ids); i += chunkSize {
|
||||
end := i + chunkSize
|
||||
if end > len(ids) {
|
||||
end = len(ids)
|
||||
}
|
||||
chunks = append(chunks, ids[i:end])
|
||||
}
|
||||
return chunks
|
||||
}
|
||||
|
||||
func randRange(min, max int) int {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
return rand.Intn(max-min) + min
|
||||
}
|
Reference in New Issue
Block a user