refactor: v2 (#150)

Louis
2023-08-09 19:47:20 -07:00
committed by GitHub
parent 1298b9408d
commit ae56e41786
75 changed files with 4905 additions and 4828 deletions


@@ -1,171 +0,0 @@
package utils
import (
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
)
var (
MetricTrafficBytes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_traffic_bytes",
Help: "Bytes received by the application.",
},
[]string{"remote_ip", "local_ip", "local_port", "type"},
)
MetricTrafficPackets = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_traffic_packets",
Help: "Packets received by the application.",
},
[]string{"remote_ip", "local_ip", "local_port", "type"},
)
MetricPacketSizeSum = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "flow_traffic_summary_size_bytes",
Help: "Summary of packet size.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"remote_ip", "local_ip", "local_port", "type"},
)
DecoderStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_decoder_count",
Help: "Decoder processed count.",
},
[]string{"worker", "name"},
)
DecoderErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_decoder_error_count",
Help: "Decoder processed error count.",
},
[]string{"worker", "name"},
)
DecoderTime = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "flow_summary_decoding_time_us",
Help: "Decoding time summary.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"name"},
)
DecoderProcessTime = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "flow_summary_processing_time_us",
Help: "Processing time summary.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"name"},
)
NetFlowStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_process_nf_count",
Help: "NetFlows processed.",
},
[]string{"router", "version"},
)
NetFlowErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_process_nf_errors_count",
Help: "NetFlows processed errors.",
},
[]string{"router", "error"},
)
NetFlowSetRecordsStatsSum = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_process_nf_flowset_records_sum",
Help: "NetFlows FlowSets sum of records.",
},
[]string{"router", "version", "type"}, // data-template, data, opts...
)
NetFlowSetStatsSum = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_process_nf_flowset_sum",
Help: "NetFlows FlowSets sum.",
},
[]string{"router", "version", "type"}, // data-template, data, opts...
)
NetFlowTimeStatsSum = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "flow_process_nf_delay_summary_seconds",
Help: "NetFlows time difference between time of flow and processing.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"router", "version"},
)
NetFlowTemplatesStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_process_nf_templates_count",
Help: "NetFlows Template count.",
},
[]string{"router", "version", "obs_domain_id", "template_id", "type"}, // options/template
)
SFlowStats = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_process_sf_count",
Help: "sFlows processed.",
},
[]string{"router", "agent", "version"},
)
SFlowErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_process_sf_errors_count",
Help: "sFlows processed errors.",
},
[]string{"router", "error"},
)
SFlowSampleStatsSum = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_process_sf_samples_sum",
Help: "SFlows samples sum.",
},
[]string{"router", "agent", "version", "type"}, // counter, flow, expanded...
)
SFlowSampleRecordsStatsSum = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "flow_process_sf_samples_records_sum",
Help: "SFlows samples sum of records.",
},
[]string{"router", "agent", "version", "type"}, // data-template, data, opts...
)
)
func init() {
prometheus.MustRegister(MetricTrafficBytes)
prometheus.MustRegister(MetricTrafficPackets)
prometheus.MustRegister(MetricPacketSizeSum)
prometheus.MustRegister(DecoderStats)
prometheus.MustRegister(DecoderErrors)
prometheus.MustRegister(DecoderTime)
prometheus.MustRegister(DecoderProcessTime)
prometheus.MustRegister(NetFlowStats)
prometheus.MustRegister(NetFlowErrors)
prometheus.MustRegister(NetFlowSetRecordsStatsSum)
prometheus.MustRegister(NetFlowSetStatsSum)
prometheus.MustRegister(NetFlowTimeStatsSum)
prometheus.MustRegister(NetFlowTemplatesStats)
prometheus.MustRegister(SFlowStats)
prometheus.MustRegister(SFlowErrors)
prometheus.MustRegister(SFlowSampleStatsSum)
prometheus.MustRegister(SFlowSampleRecordsStatsSum)
}
func DefaultAccountCallback(name string, id int, start, end time.Time) {
DecoderProcessTime.With(
prometheus.Labels{
"name": name,
}).
Observe(float64((end.Sub(start)).Nanoseconds()) / 1000)
DecoderStats.With(
prometheus.Labels{
"worker": strconv.Itoa(id),
"name": name,
}).
Inc()
}


@@ -1,377 +0,0 @@
package utils
import (
"bytes"
"context"
"sync"
"time"
"github.com/netsampler/goflow2/decoders/netflow"
"github.com/netsampler/goflow2/decoders/netflow/templates"
"github.com/netsampler/goflow2/format"
flowmessage "github.com/netsampler/goflow2/pb"
"github.com/netsampler/goflow2/producer"
"github.com/netsampler/goflow2/transport"
"github.com/prometheus/client_golang/prometheus"
)
/*
type TemplateSystem struct {
key string
templates *netflow.BasicTemplateSystem
}
func (s *TemplateSystem) AddTemplate(version uint16, obsDomainId uint32, template interface{}) {
s.templates.AddTemplate(version, obsDomainId, template)
typeStr := "options_template"
var templateId uint16
switch templateIdConv := template.(type) {
case netflow.IPFIXOptionsTemplateRecord:
templateId = templateIdConv.TemplateId
case netflow.NFv9OptionsTemplateRecord:
templateId = templateIdConv.TemplateId
case netflow.TemplateRecord:
templateId = templateIdConv.TemplateId
typeStr = "template"
}
NetFlowTemplatesStats.With(
prometheus.Labels{
"router": s.key,
"version": strconv.Itoa(int(version)),
"obs_domain_id": strconv.Itoa(int(obsDomainId)),
"template_id": strconv.Itoa(int(templateId)),
"type": typeStr,
}).
Inc()
}
func (s *TemplateSystem) GetTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error) {
return s.templates.GetTemplate(version, obsDomainId, templateId)
}
*/
type StateNetFlow struct {
stopper
Format format.FormatInterface
Transport transport.TransportInterface
Logger Logger
/*templateslock *sync.RWMutex
templates map[string]*TemplateSystem*/
samplinglock *sync.RWMutex
sampling map[string]producer.SamplingRateSystem
Config *producer.ProducerConfig
configMapped *producer.ProducerConfigMapped
TemplateSystem templates.TemplateInterface
ctx context.Context
}
func NewStateNetFlow() *StateNetFlow {
return &StateNetFlow{
ctx: context.Background(),
samplinglock: &sync.RWMutex{},
sampling: make(map[string]producer.SamplingRateSystem),
}
}
func (s *StateNetFlow) DecodeFlow(msg interface{}) error {
pkt := msg.(BaseMessage)
buf := bytes.NewBuffer(pkt.Payload)
key := pkt.Src.String()
samplerAddress := pkt.Src
if samplerAddress.To4() != nil {
samplerAddress = samplerAddress.To4()
}
s.samplinglock.RLock()
sampling, ok := s.sampling[key]
s.samplinglock.RUnlock()
if !ok {
sampling = producer.CreateSamplingSystem()
s.samplinglock.Lock()
s.sampling[key] = sampling
s.samplinglock.Unlock()
}
ts := uint64(time.Now().UTC().Unix())
if pkt.SetTime {
ts = uint64(pkt.RecvTime.UTC().Unix())
}
timeTrackStart := time.Now()
msgDec, err := netflow.DecodeMessageContext(s.ctx, buf, key, netflow.TemplateWrapper{s.ctx, key, s.TemplateSystem})
if err != nil {
switch err.(type) {
case *netflow.ErrorTemplateNotFound:
NetFlowErrors.With(
prometheus.Labels{
"router": key,
"error": "template_not_found",
}).
Inc()
default:
NetFlowErrors.With(
prometheus.Labels{
"router": key,
"error": "error_decoding",
}).
Inc()
}
return err
}
var flowMessageSet []*flowmessage.FlowMessage
switch msgDecConv := msgDec.(type) {
case netflow.NFv9Packet:
NetFlowStats.With(
prometheus.Labels{
"router": key,
"version": "9",
}).
Inc()
for _, fs := range msgDecConv.FlowSets {
switch fsConv := fs.(type) {
case netflow.TemplateFlowSet:
NetFlowSetStatsSum.With(
prometheus.Labels{
"router": key,
"version": "9",
"type": "TemplateFlowSet",
}).
Inc()
NetFlowSetRecordsStatsSum.With(
prometheus.Labels{
"router": key,
"version": "9",
"type": "TemplateFlowSet",
}).
Add(float64(len(fsConv.Records)))
case netflow.NFv9OptionsTemplateFlowSet:
NetFlowSetStatsSum.With(
prometheus.Labels{
"router": key,
"version": "9",
"type": "OptionsTemplateFlowSet",
}).
Inc()
NetFlowSetRecordsStatsSum.With(
prometheus.Labels{
"router": key,
"version": "9",
"type": "OptionsTemplateFlowSet",
}).
Add(float64(len(fsConv.Records)))
case netflow.OptionsDataFlowSet:
NetFlowSetStatsSum.With(
prometheus.Labels{
"router": key,
"version": "9",
"type": "OptionsDataFlowSet",
}).
Inc()
NetFlowSetRecordsStatsSum.With(
prometheus.Labels{
"router": key,
"version": "9",
"type": "OptionsDataFlowSet",
}).
Add(float64(len(fsConv.Records)))
case netflow.DataFlowSet:
NetFlowSetStatsSum.With(
prometheus.Labels{
"router": key,
"version": "9",
"type": "DataFlowSet",
}).
Inc()
NetFlowSetRecordsStatsSum.With(
prometheus.Labels{
"router": key,
"version": "9",
"type": "DataFlowSet",
}).
Add(float64(len(fsConv.Records)))
}
}
flowMessageSet, err = producer.ProcessMessageNetFlowConfig(msgDecConv, sampling, s.configMapped)
for _, fmsg := range flowMessageSet {
fmsg.TimeReceived = ts
fmsg.SamplerAddress = samplerAddress
timeDiff := fmsg.TimeReceived - fmsg.TimeFlowEnd
NetFlowTimeStatsSum.With(
prometheus.Labels{
"router": key,
"version": "9",
}).
Observe(float64(timeDiff))
}
case netflow.IPFIXPacket:
NetFlowStats.With(
prometheus.Labels{
"router": key,
"version": "10",
}).
Inc()
for _, fs := range msgDecConv.FlowSets {
switch fsConv := fs.(type) {
case netflow.TemplateFlowSet:
NetFlowSetStatsSum.With(
prometheus.Labels{
"router": key,
"version": "10",
"type": "TemplateFlowSet",
}).
Inc()
NetFlowSetRecordsStatsSum.With(
prometheus.Labels{
"router": key,
"version": "10",
"type": "TemplateFlowSet",
}).
Add(float64(len(fsConv.Records)))
case netflow.IPFIXOptionsTemplateFlowSet:
NetFlowSetStatsSum.With(
prometheus.Labels{
"router": key,
"version": "10",
"type": "OptionsTemplateFlowSet",
}).
Inc()
NetFlowSetRecordsStatsSum.With(
prometheus.Labels{
"router": key,
"version": "10",
"type": "OptionsTemplateFlowSet",
}).
Add(float64(len(fsConv.Records)))
case netflow.OptionsDataFlowSet:
NetFlowSetStatsSum.With(
prometheus.Labels{
"router": key,
"version": "10",
"type": "OptionsDataFlowSet",
}).
Inc()
NetFlowSetRecordsStatsSum.With(
prometheus.Labels{
"router": key,
"version": "10",
"type": "OptionsDataFlowSet",
}).
Add(float64(len(fsConv.Records)))
case netflow.DataFlowSet:
NetFlowSetStatsSum.With(
prometheus.Labels{
"router": key,
"version": "10",
"type": "DataFlowSet",
}).
Inc()
NetFlowSetRecordsStatsSum.With(
prometheus.Labels{
"router": key,
"version": "10",
"type": "DataFlowSet",
}).
Add(float64(len(fsConv.Records)))
}
}
flowMessageSet, err = producer.ProcessMessageNetFlowConfig(msgDecConv, sampling, s.configMapped)
for _, fmsg := range flowMessageSet {
fmsg.TimeReceived = ts
fmsg.SamplerAddress = samplerAddress
timeDiff := fmsg.TimeReceived - fmsg.TimeFlowEnd
NetFlowTimeStatsSum.With(
prometheus.Labels{
"router": key,
"version": "10",
}).
Observe(float64(timeDiff))
}
}
timeTrackStop := time.Now()
DecoderTime.With(
prometheus.Labels{
"name": "NetFlow",
}).
Observe(float64((timeTrackStop.Sub(timeTrackStart)).Nanoseconds()) / 1000)
for _, fmsg := range flowMessageSet {
if s.Format != nil {
key, data, err := s.Format.Format(fmsg)
if err != nil && s.Logger != nil {
s.Logger.Error(err)
}
if err == nil && s.Transport != nil {
err = s.Transport.Send(key, data)
if err != nil {
s.Logger.Error(err)
}
}
}
}
return nil
}
/*
func (s *StateNetFlow) ServeHTTPTemplates(w http.ResponseWriter, r *http.Request) {
tmp := make(map[string]map[uint16]map[uint32]map[uint16]interface{})
s.templateslock.RLock()
for key, templatesrouterstr := range s.templates {
templatesrouter := templatesrouterstr.templates.GetTemplates()
tmp[key] = templatesrouter
}
s.templateslock.RUnlock()
enc := json.NewEncoder(w)
enc.Encode(tmp)
}
func (s *StateNetFlow) InitTemplates() {
s.templates = make(map[string]*TemplateSystem)
s.templateslock = &sync.RWMutex{}
s.sampling = make(map[string]producer.SamplingRateSystem)
s.samplinglock = &sync.RWMutex{}
}*/
func (s *StateNetFlow) initConfig() {
s.configMapped = producer.NewProducerConfigMapped(s.Config)
}
func (s *StateNetFlow) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
if err := s.start(); err != nil {
return err
}
//s.InitTemplates()
s.initConfig()
return UDPStoppableRoutine(s.stopCh, "NetFlow", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
}
// FlowRoutineCtx?


@@ -1,111 +0,0 @@
package utils
import (
"bytes"
"time"
"github.com/netsampler/goflow2/decoders/netflowlegacy"
"github.com/netsampler/goflow2/format"
flowmessage "github.com/netsampler/goflow2/pb"
"github.com/netsampler/goflow2/producer"
"github.com/netsampler/goflow2/transport"
"github.com/prometheus/client_golang/prometheus"
)
type StateNFLegacy struct {
stopper
Format format.FormatInterface
Transport transport.TransportInterface
Logger Logger
}
func NewStateNFLegacy() *StateNFLegacy {
return &StateNFLegacy{}
}
func (s *StateNFLegacy) DecodeFlow(msg interface{}) error {
pkt := msg.(BaseMessage)
buf := bytes.NewBuffer(pkt.Payload)
key := pkt.Src.String()
samplerAddress := pkt.Src
if samplerAddress.To4() != nil {
samplerAddress = samplerAddress.To4()
}
ts := uint64(time.Now().UTC().Unix())
if pkt.SetTime {
ts = uint64(pkt.RecvTime.UTC().Unix())
}
timeTrackStart := time.Now()
msgDec, err := netflowlegacy.DecodeMessage(buf)
if err != nil {
switch err.(type) {
case *netflowlegacy.ErrorVersion:
NetFlowErrors.With(
prometheus.Labels{
"router": key,
"error": "error_version",
}).
Inc()
}
return err
}
switch msgDecConv := msgDec.(type) {
case netflowlegacy.PacketNetFlowV5:
NetFlowStats.With(
prometheus.Labels{
"router": key,
"version": "5",
}).
Inc()
NetFlowSetStatsSum.With(
prometheus.Labels{
"router": key,
"version": "5",
"type": "DataFlowSet",
}).
Add(float64(msgDecConv.Count))
}
var flowMessageSet []*flowmessage.FlowMessage
flowMessageSet, err = producer.ProcessMessageNetFlowLegacy(msgDec)
timeTrackStop := time.Now()
DecoderTime.With(
prometheus.Labels{
"name": "NetFlowV5",
}).
Observe(float64((timeTrackStop.Sub(timeTrackStart)).Nanoseconds()) / 1000)
for _, fmsg := range flowMessageSet {
fmsg.TimeReceived = ts
fmsg.SamplerAddress = samplerAddress
if s.Format != nil {
key, data, err := s.Format.Format(fmsg)
if err != nil && s.Logger != nil {
s.Logger.Error(err)
}
if err == nil && s.Transport != nil {
err = s.Transport.Send(key, data)
if err != nil {
s.Logger.Error(err)
}
}
}
}
return nil
}
func (s *StateNFLegacy) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
if err := s.start(); err != nil {
return err
}
return UDPStoppableRoutine(s.stopCh, "NetFlowV5", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
}

utils/pipe.go (new file, 224 lines)

@@ -0,0 +1,224 @@
package utils
import (
"bytes"
"fmt"
"sync"
"github.com/netsampler/goflow2/v2/decoders/netflow"
"github.com/netsampler/goflow2/v2/decoders/netflowlegacy"
"github.com/netsampler/goflow2/v2/decoders/sflow"
"github.com/netsampler/goflow2/v2/decoders/utils"
"github.com/netsampler/goflow2/v2/format"
"github.com/netsampler/goflow2/v2/producer"
"github.com/netsampler/goflow2/v2/transport"
"github.com/netsampler/goflow2/v2/utils/templates"
)
type FlowPipe interface {
DecodeFlow(msg interface{}) error
Close()
}
type flowpipe struct {
format format.FormatInterface
transport transport.TransportInterface
producer producer.ProducerInterface
netFlowTemplater templates.TemplateSystemGenerator
}
type PipeConfig struct {
Format format.FormatInterface
Transport transport.TransportInterface
Producer producer.ProducerInterface
NetFlowTemplater templates.TemplateSystemGenerator
}
func (p *flowpipe) formatSend(flowMessageSet []producer.ProducerMessage) error {
for _, msg := range flowMessageSet {
// todo: pass normal
if p.format != nil {
key, data, err := p.format.Format(msg)
if err != nil {
return err
}
if p.transport != nil {
if err = p.transport.Send(key, data); err != nil {
return err
}
}
// send to pool for reuse
}
}
return nil
}
func (p *flowpipe) parseConfig(cfg *PipeConfig) {
p.format = cfg.Format
p.transport = cfg.Transport
p.producer = cfg.Producer
if cfg.NetFlowTemplater != nil {
p.netFlowTemplater = cfg.NetFlowTemplater
} else {
p.netFlowTemplater = templates.DefaultTemplateGenerator
}
}
type SFlowPipe struct {
flowpipe
}
type NetFlowPipe struct {
flowpipe
templateslock *sync.RWMutex
templates map[string]netflow.NetFlowTemplateSystem
}
type PipeMessageError struct {
Message *Message
Err error
}
func (e *PipeMessageError) Error() string {
return fmt.Sprintf("message from %s %s", e.Message.Src.String(), e.Err.Error())
}
func (e *PipeMessageError) Unwrap() error {
return e.Err
}
func NewSFlowPipe(cfg *PipeConfig) *SFlowPipe {
p := &SFlowPipe{}
p.parseConfig(cfg)
return p
}
func (p *SFlowPipe) Close() {
}
func (p *SFlowPipe) DecodeFlow(msg interface{}) error {
pkt, ok := msg.(*Message)
if !ok {
return fmt.Errorf("flow is not *Message")
}
buf := bytes.NewBuffer(pkt.Payload)
//key := pkt.Src.String()
var packet sflow.Packet
if err := sflow.DecodeMessageVersion(buf, &packet); err != nil {
return &PipeMessageError{pkt, err}
}
args := producer.ProduceArgs{
Src: pkt.Src,
Dst: pkt.Dst,
TimeReceived: pkt.Received,
SamplerAddress: pkt.Src.Addr(),
}
if p.producer == nil {
return nil
}
flowMessageSet, err := p.producer.Produce(&packet, &args)
defer p.producer.Commit(flowMessageSet)
if err != nil {
return &PipeMessageError{pkt, err}
}
return p.formatSend(flowMessageSet)
}
func NewNetFlowPipe(cfg *PipeConfig) *NetFlowPipe {
p := &NetFlowPipe{
templateslock: &sync.RWMutex{},
templates: make(map[string]netflow.NetFlowTemplateSystem),
}
p.parseConfig(cfg)
return p
}
func (p *NetFlowPipe) DecodeFlow(msg interface{}) error {
pkt, ok := msg.(*Message)
if !ok {
return fmt.Errorf("flow is not *Message")
}
buf := bytes.NewBuffer(pkt.Payload)
key := pkt.Src.String()
p.templateslock.RLock()
templates, ok := p.templates[key]
p.templateslock.RUnlock()
if !ok {
templates = p.netFlowTemplater(key)
p.templateslock.Lock()
p.templates[key] = templates
p.templateslock.Unlock()
}
var packetV5 netflowlegacy.PacketNetFlowV5
var packetNFv9 netflow.NFv9Packet
var packetIPFIX netflow.IPFIXPacket
// decode the version
var version uint16
if err := utils.BinaryDecoder(buf, &version); err != nil {
return &PipeMessageError{pkt, err}
}
switch version {
case 5:
packetV5.Version = 5
if err := netflowlegacy.DecodeMessage(buf, &packetV5); err != nil {
return &PipeMessageError{pkt, err}
}
case 9:
packetNFv9.Version = 9
if err := netflow.DecodeMessageNetFlow(buf, templates, &packetNFv9); err != nil {
return &PipeMessageError{pkt, err}
}
case 10:
packetIPFIX.Version = 10
if err := netflow.DecodeMessageIPFIX(buf, templates, &packetIPFIX); err != nil {
return &PipeMessageError{pkt, err}
}
default:
return &PipeMessageError{pkt, fmt.Errorf("Not a NetFlow packet")}
}
var flowMessageSet []producer.ProducerMessage
var err error
args := producer.ProduceArgs{
Src: pkt.Src,
Dst: pkt.Dst,
TimeReceived: pkt.Received,
SamplerAddress: pkt.Src.Addr(),
}
if p.producer == nil {
return nil
}
switch version {
case 5:
flowMessageSet, err = p.producer.Produce(&packetV5, &args)
case 9:
flowMessageSet, err = p.producer.Produce(&packetNFv9, &args)
case 10:
flowMessageSet, err = p.producer.Produce(&packetIPFIX, &args)
}
defer p.producer.Commit(flowMessageSet)
if err != nil {
return &PipeMessageError{pkt, err}
}
return p.formatSend(flowMessageSet)
}
func (p *NetFlowPipe) Close() {
}
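
As a usage note rather than part of the diff above: DecodeFlow wraps failures in *PipeMessageError, which keeps the source Message and exposes Unwrap, so callers can recover both the sender and the underlying decoder error. A minimal sketch, assuming the github.com/netsampler/goflow2/v2/utils import path; the logDecodeError helper is hypothetical, not part of this commit:

package main

import (
	"errors"
	"log"

	"github.com/netsampler/goflow2/v2/utils"
)

// logDecodeError is a hypothetical helper: if err is a *PipeMessageError,
// report the sender address and the wrapped cause, otherwise log it as-is.
func logDecodeError(err error) {
	var pErr *utils.PipeMessageError
	if errors.As(err, &pErr) {
		log.Printf("decode failed for %s: %v", pErr.Message.Src, pErr.Unwrap())
		return
	}
	if err != nil {
		log.Printf("decode failed: %v", err)
	}
}

func main() {
	p := utils.NewSFlowPipe(&utils.PipeConfig{})
	defer p.Close()
	// A two-byte payload is not a valid sFlow datagram, so DecodeFlow
	// returns a *PipeMessageError wrapping the decoder error.
	logDecodeError(p.DecodeFlow(&utils.Message{Payload: []byte{0x00, 0x01}}))
}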


@@ -1,170 +0,0 @@
package utils
import (
"bytes"
"net"
"time"
"github.com/netsampler/goflow2/decoders/sflow"
"github.com/netsampler/goflow2/format"
flowmessage "github.com/netsampler/goflow2/pb"
"github.com/netsampler/goflow2/producer"
"github.com/netsampler/goflow2/transport"
"github.com/prometheus/client_golang/prometheus"
)
type StateSFlow struct {
stopper
Format format.FormatInterface
Transport transport.TransportInterface
Logger Logger
Config *producer.ProducerConfig
configMapped *producer.ProducerConfigMapped
}
func NewStateSFlow() *StateSFlow {
return &StateSFlow{}
}
func (s *StateSFlow) DecodeFlow(msg interface{}) error {
pkt := msg.(BaseMessage)
buf := bytes.NewBuffer(pkt.Payload)
key := pkt.Src.String()
ts := uint64(time.Now().UTC().Unix())
if pkt.SetTime {
ts = uint64(pkt.RecvTime.UTC().Unix())
}
timeTrackStart := time.Now()
msgDec, err := sflow.DecodeMessage(buf)
if err != nil {
switch err.(type) {
case *sflow.ErrorVersion:
SFlowErrors.With(
prometheus.Labels{
"router": key,
"error": "error_version",
}).
Inc()
case *sflow.ErrorIPVersion:
SFlowErrors.With(
prometheus.Labels{
"router": key,
"error": "error_ip_version",
}).
Inc()
case *sflow.ErrorDataFormat:
SFlowErrors.With(
prometheus.Labels{
"router": key,
"error": "error_data_format",
}).
Inc()
default:
SFlowErrors.With(
prometheus.Labels{
"router": key,
"error": "error_decoding",
}).
Inc()
}
return err
}
switch msgDecConv := msgDec.(type) {
case sflow.Packet:
agentStr := net.IP(msgDecConv.AgentIP).String()
SFlowStats.With(
prometheus.Labels{
"router": key,
"agent": agentStr,
"version": "5",
}).
Inc()
for _, samples := range msgDecConv.Samples {
typeStr := "unknown"
countRec := 0
switch samplesConv := samples.(type) {
case sflow.FlowSample:
typeStr = "FlowSample"
countRec = len(samplesConv.Records)
case sflow.CounterSample:
typeStr = "CounterSample"
if samplesConv.Header.Format == 4 {
typeStr = "Expanded" + typeStr
}
countRec = len(samplesConv.Records)
case sflow.ExpandedFlowSample:
typeStr = "ExpandedFlowSample"
countRec = len(samplesConv.Records)
}
SFlowSampleStatsSum.With(
prometheus.Labels{
"router": key,
"agent": agentStr,
"version": "5",
"type": typeStr,
}).
Inc()
SFlowSampleRecordsStatsSum.With(
prometheus.Labels{
"router": key,
"agent": agentStr,
"version": "5",
"type": typeStr,
}).
Add(float64(countRec))
}
}
var flowMessageSet []*flowmessage.FlowMessage
flowMessageSet, err = producer.ProcessMessageSFlowConfig(msgDec, s.configMapped)
timeTrackStop := time.Now()
DecoderTime.With(
prometheus.Labels{
"name": "sFlow",
}).
Observe(float64((timeTrackStop.Sub(timeTrackStart)).Nanoseconds()) / 1000)
for _, fmsg := range flowMessageSet {
fmsg.TimeReceived = ts
fmsg.TimeFlowStart = ts
fmsg.TimeFlowEnd = ts
if s.Format != nil {
key, data, err := s.Format.Format(fmsg)
if err != nil && s.Logger != nil {
s.Logger.Error(err)
}
if err == nil && s.Transport != nil {
err = s.Transport.Send(key, data)
if err != nil {
s.Logger.Error(err)
}
}
}
}
return nil
}
func (s *StateSFlow) initConfig() {
s.configMapped = producer.NewProducerConfigMapped(s.Config)
}
func (s *StateSFlow) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
if err := s.start(); err != nil {
return err
}
s.initConfig()
return UDPStoppableRoutine(s.stopCh, "sFlow", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
}


@@ -1,92 +0,0 @@
package utils
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestDecodeFlowExpandedSFlow(t *testing.T) {
msg := BaseMessage{
Src: []byte{},
Port: 1,
Payload: getExpandedSFlowDecode(),
}
s := &StateSFlow{}
assert.Nil(t, s.DecodeFlow(msg))
}
func getExpandedSFlowDecode() []byte {
return []byte{
0, 0, 0, 5, 0, 0, 0, 1, 1, 2, 3, 4, 0, 0, 0, 0, 5, 167, 139, 219, 5, 118,
138, 184, 0, 0, 0, 6, 0, 0, 0, 3, 0, 0, 0, 220, 2, 144, 194, 214, 0, 0, 0, 0,
0, 5, 6, 164, 0, 0, 3, 255, 6, 6, 189, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5,
6, 164, 0, 0, 0, 0, 0, 5, 6, 171, 0, 0, 0, 2, 0, 0, 3, 233, 0, 0, 0, 6,
0, 0, 5, 7, 0, 0, 0, 0, 0, 0, 5, 7, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0,
0, 144, 0, 0, 0, 1, 0, 0, 5, 234, 0, 0, 0, 4, 0, 0, 0, 128, 8, 6, 168, 250,
146, 253, 116, 131, 239, 8, 101, 183, 129, 0, 5, 7, 8, 0, 9, 0, 5, 212, 0, 2, 4, 0,
3, 6, 252, 8, 9, 187, 169, 1, 4, 7, 186, 201, 1, 187, 249, 6, 160, 7, 5, 240, 6, 4,
4, 0, 0, 6, 0, 123, 119, 210, 0, 0, 165, 105, 7, 171, 145, 234, 102, 0, 252, 187, 162, 227,
104, 188, 126, 232, 156, 164, 2, 115, 6, 100, 0, 185, 6, 4, 119, 5, 213, 1, 215, 208, 8, 4,
118, 183, 241, 225, 130, 186, 2, 250, 220, 153, 189, 3, 4, 4, 1, 8, 210, 119, 172, 9, 164, 233,
1, 8, 171, 226, 196, 195, 3, 152, 9, 5, 6, 181, 4, 7, 0, 0, 0, 3, 0, 0, 0, 220,
9, 107, 215, 156, 0, 0, 0, 0, 0, 5, 6, 165, 0, 0, 3, 255, 226, 123, 0, 100, 0, 0,
0, 0, 0, 0, 0, 0, 0, 5, 6, 165, 0, 0, 0, 0, 0, 5, 6, 164, 0, 0, 0, 2,
0, 0, 3, 233, 0, 0, 0, 6, 0, 0, 3, 184, 0, 0, 0, 0, 0, 0, 3, 184, 0, 0,
0, 0, 0, 0, 0, 1, 0, 0, 0, 144, 0, 0, 0, 1, 0, 0, 5, 190, 0, 0, 0, 4,
0, 0, 0, 128, 116, 131, 239, 8, 101, 183, 144, 226, 186, 134, 8, 1, 129, 0, 3, 184, 8, 0,
9, 0, 5, 168, 7, 127, 4, 0, 4, 6, 163, 211, 185, 9, 220, 7, 0, 254, 3, 8, 0, 9,
130, 136, 179, 1, 2, 2, 7, 5, 250, 4, 128, 6, 0, 1, 7, 1, 0, 0, 1, 1, 8, 0,
6, 9, 250, 9, 4, 113, 121, 4, 160, 125, 0, 4, 9, 209, 241, 194, 190, 148, 161, 186, 6, 192,
246, 190, 170, 2, 238, 190, 128, 221, 223, 1, 218, 225, 3, 9, 7, 226, 220, 231, 127, 3, 3, 252,
7, 9, 161, 247, 218, 8, 8, 174, 133, 4, 213, 245, 149, 218, 5, 4, 200, 128, 139, 5, 0, 115,
0, 0, 0, 3, 0, 0, 0, 220, 2, 144, 194, 215, 0, 0, 0, 0, 0, 5, 6, 164, 0, 0,
3, 255, 6, 6, 253, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 6, 164, 0, 0, 0, 0,
0, 5, 6, 171, 0, 0, 0, 2, 0, 0, 3, 233, 0, 0, 0, 6, 0, 0, 0, 104, 0, 0,
0, 0, 0, 0, 0, 104, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 144, 0, 0, 0, 1,
0, 0, 5, 242, 0, 0, 0, 4, 0, 0, 0, 128, 116, 131, 239, 7, 9, 1, 116, 131, 239, 8,
101, 183, 129, 0, 0, 104, 8, 0, 9, 0, 5, 220, 152, 143, 4, 0, 1, 6, 5, 179, 9, 187,
191, 101, 190, 2, 144, 182, 0, 0, 130, 4, 252, 4, 160, 192, 138, 8, 219, 124, 128, 6, 0, 235,
180, 213, 0, 0, 1, 1, 8, 0, 9, 124, 6, 1, 9, 1, 252, 3, 194, 8, 195, 209, 115, 1,
5, 152, 204, 2, 6, 4, 1, 119, 254, 9, 1, 170, 0, 192, 2, 7, 190, 9, 149, 5, 101, 2,
128, 122, 0, 190, 1, 109, 188, 175, 4, 8, 152, 1, 142, 108, 2, 100, 2, 124, 125, 195, 5, 8,
233, 126, 7, 4, 243, 4, 3, 153, 0, 0, 0, 3, 0, 0, 0, 220, 5, 1, 150, 6, 0, 0,
0, 0, 0, 5, 6, 167, 0, 0, 3, 255, 6, 5, 105, 220, 0, 0, 0, 0, 0, 0, 0, 0,
0, 5, 6, 167, 0, 0, 0, 0, 0, 5, 6, 164, 0, 0, 0, 2, 0, 0, 3, 233, 0, 0,
0, 6, 0, 0, 5, 7, 0, 0, 0, 0, 0, 0, 5, 7, 0, 0, 0, 0, 0, 0, 0, 1,
0, 0, 0, 144, 0, 0, 0, 1, 0, 0, 2, 2, 0, 0, 0, 4, 0, 0, 0, 128, 116, 131,
239, 8, 101, 183, 152, 3, 130, 1, 196, 153, 129, 0, 5, 7, 8, 0, 9, 0, 2, 0, 0, 0,
4, 0, 126, 7, 119, 188, 185, 9, 221, 8, 2, 116, 144, 0, 9, 139, 3, 112, 2, 0, 8, 124,
255, 251, 0, 0, 131, 2, 0, 0, 0, 246, 3, 3, 107, 5, 0, 0, 0, 0, 9, 173, 2, 217,
6, 248, 0, 0, 9, 173, 2, 217, 8, 248, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 6, 9, 153,
215, 157, 0, 255, 0, 8, 1, 0, 9, 8, 9, 6, 164, 103, 9, 5, 0, 0, 0, 3, 0, 0,
0, 152, 5, 201, 2, 175, 0, 0, 0, 0, 0, 5, 6, 5, 0, 0, 3, 255, 1, 8, 9, 1,
0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 6, 5, 0, 0, 0, 0, 0, 5, 6, 164, 0, 0,
0, 2, 0, 0, 3, 233, 0, 0, 0, 6, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 6, 0, 0, 0, 1, 0, 0, 0, 4, 0, 0,
0, 4, 0, 0, 0, 0, 116, 131, 239, 8, 101, 183, 218, 177, 4, 251, 217, 207, 8, 0, 9, 0,
0, 8, 0, 0, 0, 0, 9, 7, 8, 161, 106, 3, 109, 6, 185, 9, 220, 215, 0, 123, 9, 184,
0, 8, 116, 122, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 3, 130, 6,
0, 0, 0, 3, 0, 0, 0, 220, 2, 144, 194, 216, 0, 0, 0, 0, 0, 5, 6, 164, 0, 0,
3, 255, 6, 7, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5, 6, 164, 0, 0, 0, 0,
0, 5, 6, 165, 0, 0, 0, 2, 0, 0, 3, 233, 0, 0, 0, 6, 0, 0, 3, 202, 0, 0,
0, 0, 0, 0, 3, 202, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 144, 0, 0, 0, 1,
0, 0, 5, 242, 0, 0, 0, 4, 0, 0, 0, 128, 144, 226, 186, 135, 4, 241, 116, 131, 239, 8,
101, 183, 129, 0, 3, 202, 8, 0, 9, 0, 5, 220, 147, 0, 4, 0, 7, 6, 225, 131, 1, 159,
7, 185, 195, 181, 170, 8, 9, 117, 7, 175, 8, 3, 191, 135, 190, 150, 196, 102, 0, 6, 0, 119,
116, 113, 0, 0, 201, 244, 240, 206, 2, 117, 4, 139, 8, 4, 240, 223, 247, 123, 6, 0, 239, 0,
9, 116, 152, 153, 191, 0, 124, 2, 7, 8, 3, 178, 166, 150, 3, 218, 163, 175, 121, 8, 4, 210,
4, 5, 166, 5, 178, 1, 6, 222, 172, 186, 6, 241, 232, 8, 188, 192, 2, 220, 128, 1, 8, 7,
194, 130, 220, 5, 2, 0, 158, 195, 0, 4, 3, 2, 160, 158, 157, 2, 102, 3, 7, 3, 0, 0,
1, 3, 3, 4, 1, 1, 4, 2, 187, 255, 188, 3, 4, 138, 9, 180, 104, 233, 212, 239, 123, 237,
112, 8, 133, 129, 152, 138, 7, 195, 8, 171, 237, 3, 4, 223, 116, 214, 151, 9, 151, 102, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0,
}
}


@@ -1,33 +0,0 @@
package utils
import (
"errors"
)
// ErrAlreadyStarted happens when you try to start a flow routine twice
var ErrAlreadyStarted = errors.New("the routine is already started")
// stopper mechanism, common for all the flow routines
type stopper struct {
stopCh chan struct{}
}
func (s *stopper) start() error {
if s.stopCh != nil {
return ErrAlreadyStarted
}
s.stopCh = make(chan struct{})
return nil
}
func (s *stopper) Shutdown() {
if s.stopCh != nil {
select {
case <-s.stopCh:
default:
close(s.stopCh)
}
s.stopCh = nil
}
}


@@ -1,51 +0,0 @@
package utils
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestStopper(t *testing.T) {
r := routine{}
require.False(t, r.Running)
require.NoError(t, r.StartRoutine())
assert.True(t, r.Running)
r.Shutdown()
assert.Eventually(t, func() bool {
return r.Running == false
}, time.Second, time.Millisecond)
// after shutdown, we can start it again
require.NoError(t, r.StartRoutine())
assert.True(t, r.Running)
}
func TestStopper_CannotStartTwice(t *testing.T) {
r := routine{}
require.False(t, r.Running)
require.NoError(t, r.StartRoutine())
assert.ErrorIs(t, r.StartRoutine(), ErrAlreadyStarted)
}
type routine struct {
stopper
Running bool
}
func (p *routine) StartRoutine() error {
if err := p.start(); err != nil {
return err
}
p.Running = true
waitForGoRoutine := make(chan struct{})
go func() {
close(waitForGoRoutine)
<-p.stopCh
p.Running = false
}()
<-waitForGoRoutine
return nil
}


@@ -0,0 +1,14 @@
package templates
import (
"github.com/netsampler/goflow2/v2/decoders/netflow"
)
// Function that creates template systems.
// This is meant to be used by a pipe.
type TemplateSystemGenerator func(key string) netflow.NetFlowTemplateSystem
// Default template generator
func DefaultTemplateGenerator(key string) netflow.NetFlowTemplateSystem {
return netflow.CreateTemplateSystem()
}
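
For illustration (a sketch under assumptions, not code from this commit): a custom generator matching the TemplateSystemGenerator signature can be injected through PipeConfig.NetFlowTemplater. The logging wrapper below is hypothetical, and the github.com/netsampler/goflow2/v2/utils import path is assumed.

package main

import (
	"log"

	"github.com/netsampler/goflow2/v2/decoders/netflow"
	"github.com/netsampler/goflow2/v2/utils"
	"github.com/netsampler/goflow2/v2/utils/templates"
)

// loggingTemplateGenerator is a hypothetical TemplateSystemGenerator: it logs
// the first time a router key is seen, then falls back to the default
// in-memory template system.
func loggingTemplateGenerator(key string) netflow.NetFlowTemplateSystem {
	log.Printf("allocating template system for router %s", key)
	return templates.DefaultTemplateGenerator(key)
}

func main() {
	// The generator is picked up by NewNetFlowPipe via PipeConfig.NetFlowTemplater.
	// Format, Transport and Producer are left nil for brevity, so packets are
	// decoded but not serialized or exported.
	p := utils.NewNetFlowPipe(&utils.PipeConfig{
		NetFlowTemplater: loggingTemplateGenerator,
	})
	defer p.Close()
	// p.DecodeFlow would be handed to a UDP receiver in a real setup.
}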

utils/udp.go (new file, 278 lines)

@@ -0,0 +1,278 @@
package utils
import (
"fmt"
"net"
"net/netip"
"sync"
"time"
reuseport "github.com/libp2p/go-reuseport"
)
// Callback used to decode a UDP message
type DecoderFunc func(msg interface{}) error
type udpPacket struct {
src *net.UDPAddr
dst *net.UDPAddr
size int
payload []byte
received time.Time
}
type Message struct {
Src netip.AddrPort
Dst netip.AddrPort
Payload []byte
Received time.Time
}
var packetPool = sync.Pool{
New: func() any {
return &udpPacket{
payload: make([]byte, 9000),
}
},
}
type UDPReceiver struct {
ready chan bool
q chan bool
wg *sync.WaitGroup
dispatch chan *udpPacket
errCh chan error // linked to receiver, never closed
decodersCnt int
blocking bool
workers int
sockets int
}
type UDPReceiverConfig struct {
Workers int
Sockets int
Blocking bool
QueueSize int
}
func NewUDPReceiver(cfg *UDPReceiverConfig) (*UDPReceiver, error) {
r := &UDPReceiver{
wg: &sync.WaitGroup{},
sockets: 2,
workers: 2,
ready: make(chan bool),
errCh: make(chan error),
}
dispatchSize := 1000000
if cfg != nil {
if cfg.Sockets <= 0 {
cfg.Sockets = 1
}
if cfg.Workers <= 0 {
cfg.Workers = cfg.Sockets
}
r.sockets = cfg.Sockets
r.workers = cfg.Workers
dispatchSize = cfg.QueueSize
r.blocking = cfg.Blocking
}
if dispatchSize == 0 {
r.dispatch = make(chan *udpPacket) // synchronous mode
} else {
r.dispatch = make(chan *udpPacket, dispatchSize)
}
err := r.init()
return r, err
}
// Initialize channels that are related to a session
// Once the user calls Stop, they can restart the capture
func (r *UDPReceiver) init() error {
r.q = make(chan bool)
r.decodersCnt = 0
select {
case <-r.ready:
return fmt.Errorf("receiver is already stopped")
default:
close(r.ready)
}
return nil
}
func (r *UDPReceiver) logError(err error) {
select {
case r.errCh <- err:
default:
}
}
func (r *UDPReceiver) Errors() <-chan error {
return r.errCh
}
func (r *UDPReceiver) receive(addr string, port int, started chan bool) error {
pconn, err := reuseport.ListenPacket("udp", fmt.Sprintf("%s:%d", addr, port))
close(started)
if err != nil {
return err
}
q := make(chan bool)
// function to quit
go func() {
select {
case <-q: // if routine has exited before
case <-r.q: // upon general close
}
pconn.Close()
}()
defer close(q)
udpconn, ok := pconn.(*net.UDPConn)
if !ok {
return err
}
localAddr, _ := udpconn.LocalAddr().(*net.UDPAddr)
for {
pkt := packetPool.Get().(*udpPacket)
pkt.size, pkt.src, err = udpconn.ReadFromUDP(pkt.payload)
if err != nil {
packetPool.Put(pkt)
return err
}
pkt.dst = localAddr
pkt.received = time.Now().UTC()
if pkt.size == 0 {
// error
continue
}
if r.blocking {
// does not drop
// if combined with synchronous mode
select {
case r.dispatch <- pkt:
case <-r.q:
return nil
}
} else {
select {
case r.dispatch <- pkt:
case <-r.q:
return nil
default:
packetPool.Put(pkt)
// increase counter
}
}
}
}
type ReceiverError struct {
Err error
}
func (e *ReceiverError) Error() string {
return "receiver: " + e.Err.Error()
}
func (e *ReceiverError) Unwrap() error {
return e.Err
}
// Start the processing routines
func (r *UDPReceiver) decoders(workers int, decodeFunc DecoderFunc) error {
for i := 0; i < workers; i++ {
r.wg.Add(1)
r.decodersCnt += 1
go func() {
defer r.wg.Done()
for pkt := range r.dispatch {
if pkt == nil {
return
}
if decodeFunc != nil {
msg := Message{
Src: pkt.src.AddrPort(),
Dst: pkt.dst.AddrPort(),
Payload: pkt.payload[0:pkt.size],
Received: pkt.received,
}
if err := decodeFunc(&msg); err != nil {
r.logError(&ReceiverError{err})
}
}
packetPool.Put(pkt)
}
}()
}
return nil
}
// Starts the UDP receiving workers
func (r *UDPReceiver) receivers(sockets int, addr string, port int) error {
for i := 0; i < sockets; i++ {
r.wg.Add(1)
started := make(chan bool)
go func() {
defer r.wg.Done()
if err := r.receive(addr, port, started); err != nil {
r.logError(&ReceiverError{err})
}
}()
<-started
}
return nil
}
// Start UDP receivers and the processing routines
func (r *UDPReceiver) Start(addr string, port int, decodeFunc DecoderFunc) error {
select {
case <-r.ready:
r.ready = make(chan bool)
default:
return fmt.Errorf("receiver is already started")
}
if err := r.decoders(r.workers, decodeFunc); err != nil {
return err
}
if err := r.receivers(r.sockets, addr, port); err != nil {
return err
}
return nil
}
// Stops the routines
func (r *UDPReceiver) Stop() error {
select {
case <-r.q:
default:
close(r.q)
}
for i := 0; i < r.decodersCnt; i++ {
r.dispatch <- nil
}
r.wg.Wait()
return r.init() // recreates the closed channels
}
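
To show how the new receiver and pipe fit together, a minimal wiring sketch, assuming the github.com/netsampler/goflow2/v2/utils import path, an example port of 2055, and no producer/format/transport configured (decoded packets are simply discarded):

package main

import (
	"log"
	"time"

	"github.com/netsampler/goflow2/v2/utils"
)

func main() {
	// One socket and one worker; Blocking plus QueueSize 0 means the
	// dispatch channel is synchronous and packets are never dropped.
	recv, err := utils.NewUDPReceiver(&utils.UDPReceiverConfig{
		Sockets:  1,
		Workers:  1,
		Blocking: true,
	})
	if err != nil {
		log.Fatal(err)
	}

	// NetFlowPipe keeps a per-router template system and dispatches
	// NetFlow v5/v9 and IPFIX packets. With no Producer set, packets
	// are decoded and then discarded.
	pipe := utils.NewNetFlowPipe(&utils.PipeConfig{})
	defer pipe.Close()

	// Drain asynchronous receiver and decoder errors; the error channel
	// is never closed, so this goroutine runs for the program's lifetime.
	go func() {
		for err := range recv.Errors() {
			log.Println(err)
		}
	}()

	// Bind the socket(s) and start decoding.
	if err := recv.Start("127.0.0.1", 2055, pipe.DecodeFlow); err != nil {
		log.Fatal(err)
	}

	// A real collector would wait for a shutdown signal here.
	time.Sleep(time.Minute)

	if err := recv.Stop(); err != nil {
		log.Println(err)
	}
}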

utils/udp_test.go (new file, 63 lines)

@@ -0,0 +1,63 @@
package utils
import (
"fmt"
"net"
"testing"
//"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestUDPReceiver(t *testing.T) {
addr := "[::1]"
port, err := getFreeUDPPort()
require.NoError(t, err)
t.Logf("starting UDP receiver on %s:%d\n", addr, port)
r, err := NewUDPReceiver(nil)
require.NoError(t, err)
require.NoError(t, r.Start(addr, port, nil))
sendMessage := func(msg string) error {
conn, err := net.Dial("udp", fmt.Sprintf("%s:%d", addr, port))
if err != nil {
return err
}
defer conn.Close()
_, err = conn.Write([]byte(msg))
return err
}
require.NoError(t, sendMessage("message"))
t.Log("sending message\n")
require.NoError(t, r.Stop())
}
func TestUDPClose(t *testing.T) {
addr := "[::1]"
port, err := getFreeUDPPort()
require.NoError(t, err)
t.Logf("starting UDP receiver on %s:%d\n", addr, port)
r, err := NewUDPReceiver(nil)
require.NoError(t, err)
require.NoError(t, r.Start(addr, port, nil))
require.NoError(t, r.Stop())
require.NoError(t, r.Start(addr, port, nil))
require.Error(t, r.Start(addr, port, nil))
require.NoError(t, r.Stop())
require.Error(t, r.Stop())
}
func getFreeUDPPort() (int, error) {
a, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
if err != nil {
return 0, err
}
l, err := net.ListenUDP("udp", a)
if err != nil {
return 0, err
}
defer l.Close()
return l.LocalAddr().(*net.UDPAddr).Port, nil
}


@@ -1,234 +0,0 @@
package utils
import (
"errors"
"fmt"
"io"
"net"
"strconv"
"sync"
"time"
reuseport "github.com/libp2p/go-reuseport"
decoder "github.com/netsampler/goflow2/decoders"
"github.com/netsampler/goflow2/decoders/netflow"
flowmessage "github.com/netsampler/goflow2/pb"
"github.com/netsampler/goflow2/producer"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"
)
type ProducerConfig *producer.ProducerConfig
func LoadMapping(f io.Reader) (ProducerConfig, error) {
config := &producer.ProducerConfig{}
dec := yaml.NewDecoder(f)
err := dec.Decode(config)
return config, err
}
func GetServiceAddresses(srv string) (addrs []string, err error) {
_, srvs, err := net.LookupSRV("", "", srv)
if err != nil {
return nil, errors.New(fmt.Sprintf("Service discovery: %v\n", err))
}
for _, srv := range srvs {
addrs = append(addrs, net.JoinHostPort(srv.Target, strconv.Itoa(int(srv.Port))))
}
return addrs, nil
}
type Logger interface {
Printf(string, ...interface{})
Errorf(string, ...interface{})
Warnf(string, ...interface{})
Warn(...interface{})
Error(...interface{})
Debug(...interface{})
Debugf(string, ...interface{})
Infof(string, ...interface{})
Fatalf(string, ...interface{})
}
type BaseMessage struct {
Src net.IP
Port int
Payload []byte
SetTime bool
RecvTime time.Time
}
type Transport interface {
Send([]*flowmessage.FlowMessage)
}
type Formatter interface {
Format([]*flowmessage.FlowMessage)
}
/*
type DefaultLogTransport struct {
}
func (s *DefaultLogTransport) Publish(msgs []*flowmessage.FlowMessage) {
for _, msg := range msgs {
fmt.Printf("%v\n", FlowMessageToString(msg))
}
}
type DefaultJSONTransport struct {
}
func (s *DefaultJSONTransport) Publish(msgs []*flowmessage.FlowMessage) {
for _, msg := range msgs {
fmt.Printf("%v\n", FlowMessageToJSON(msg))
}
}
*/
type DefaultErrorCallback struct {
Logger Logger
}
func (cb *DefaultErrorCallback) Callback(name string, id int, start, end time.Time, err error) {
if _, ok := err.(*netflow.ErrorTemplateNotFound); ok {
return
}
if cb.Logger != nil {
cb.Logger.Errorf("Error from: %v (%v) duration: %v. %v", name, id, end.Sub(start), err)
}
}
func UDPRoutine(name string, decodeFunc decoder.DecoderFunc, workers int, addr string, port int, sockReuse bool, logger Logger) error {
return UDPStoppableRoutine(make(chan struct{}), name, decodeFunc, workers, addr, port, sockReuse, logger)
}
// UDPStoppableRoutine runs a UDPRoutine that can be stopped by closing the stopCh passed as argument
func UDPStoppableRoutine(stopCh <-chan struct{}, name string, decodeFunc decoder.DecoderFunc, workers int, addr string, port int, sockReuse bool, logger Logger) error {
ecb := DefaultErrorCallback{
Logger: logger,
}
decoderParams := decoder.DecoderParams{
DecoderFunc: decodeFunc,
DoneCallback: DefaultAccountCallback,
ErrorCallback: ecb.Callback,
}
processor := decoder.CreateProcessor(workers, decoderParams, name)
processor.Start()
addrUDP := net.UDPAddr{
IP: net.ParseIP(addr),
Port: port,
}
var udpconn *net.UDPConn
var err error
if sockReuse {
pconn, err := reuseport.ListenPacket("udp", addrUDP.String())
if err != nil {
return err
}
defer pconn.Close()
var ok bool
udpconn, ok = pconn.(*net.UDPConn)
if !ok {
return err
}
} else {
udpconn, err = net.ListenUDP("udp", &addrUDP)
if err != nil {
return err
}
defer udpconn.Close()
}
payload := make([]byte, 9000)
localIP := addrUDP.IP.String()
if addrUDP.IP == nil {
localIP = ""
}
type udpData struct {
size int
pktAddr *net.UDPAddr
payload []byte
}
udpDataCh := make(chan udpData)
defer close(udpDataCh)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
u := udpData{}
u.size, u.pktAddr, _ = udpconn.ReadFromUDP(payload)
if u.size == 0 { // Ignore 0 byte packets.
continue
}
u.payload = make([]byte, u.size)
copy(u.payload, payload[0:u.size])
select {
case <-stopCh:
return
default:
udpDataCh <- u
}
}
}()
func() {
for {
select {
case u := <-udpDataCh:
process(u.size, u.payload, u.pktAddr, processor, localIP, addrUDP, name)
case <-stopCh:
return
}
}
}()
for _ = range udpDataCh {
// drain
}
wg.Wait()
return nil
}
func process(size int, payload []byte, pktAddr *net.UDPAddr, processor decoder.Processor, localIP string, addrUDP net.UDPAddr, name string) {
baseMessage := BaseMessage{
Src: pktAddr.IP,
Port: pktAddr.Port,
Payload: payload,
}
processor.ProcessMessage(baseMessage)
MetricTrafficBytes.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
}).
Add(float64(size))
MetricTrafficPackets.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
}).
Inc()
MetricPacketSizeSum.With(
prometheus.Labels{
"remote_ip": pktAddr.IP.String(),
"local_ip": localIP,
"local_port": strconv.Itoa(addrUDP.Port),
"type": name,
}).
Observe(float64(size))
}


@@ -1,93 +0,0 @@
package utils
import (
"fmt"
"net"
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCancelUDPRoutine(t *testing.T) {
testTimeout := time.After(10 * time.Second)
port, err := getFreeUDPPort()
require.NoError(t, err)
dp := dummyFlowProcessor{}
go func() {
require.NoError(t, dp.FlowRoutine("127.0.0.1", port))
}()
// wait a little to give the server time to accept requests
time.Sleep(100 * time.Millisecond)
sendMessage := func(msg string) error {
conn, err := net.Dial("udp", fmt.Sprintf("127.0.0.1:%d", port))
if err != nil {
return err
}
defer conn.Close()
_, err = conn.Write([]byte(msg))
return err
}
require.NoError(t, sendMessage("message 1"))
require.NoError(t, sendMessage("message 2"))
require.NoError(t, sendMessage("message 3"))
readMessage := func() string {
select {
case msg := <-dp.receivedMessages:
return string(msg.(BaseMessage).Payload)
case <-testTimeout:
require.Fail(t, "test timed out while waiting for message")
return ""
}
}
// in UDP, messages might arrive out of order or duplicated, so we just verify they arrive
// to avoid flaky tests
require.Contains(t, []string{"message 1", "message 2", "message 3"}, readMessage())
require.Contains(t, []string{"message 1", "message 2", "message 3"}, readMessage())
require.Contains(t, []string{"message 1", "message 2", "message 3"}, readMessage())
dp.Shutdown()
time.Sleep(100 * time.Millisecond)
_ = sendMessage("no more messages should be processed")
select {
case msg := <-dp.receivedMessages:
assert.Fail(t, fmt.Sprint(msg))
default:
// everything is correct
}
}
type dummyFlowProcessor struct {
stopper
receivedMessages chan interface{}
}
func (d *dummyFlowProcessor) FlowRoutine(host string, port int) error {
_ = d.start()
d.receivedMessages = make(chan interface{})
return UDPStoppableRoutine(d.stopCh, "test_udp", func(msg interface{}) error {
d.receivedMessages <- msg
return nil
}, 3, host, port, false, logrus.StandardLogger())
}
func getFreeUDPPort() (int, error) {
a, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
if err != nil {
return 0, err
}
l, err := net.ListenUDP("udp", a)
if err != nil {
return 0, err
}
defer l.Close()
return l.LocalAddr().(*net.UDPAddr).Port, nil
}