mirror of
				https://github.com/openobserve/goflow2.git
				synced 2025-11-03 21:43:13 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			100 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			100 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package utils
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/netsampler/goflow2/decoders/netflowlegacy"
 | 
						|
	"github.com/netsampler/goflow2/format"
 | 
						|
	flowmessage "github.com/netsampler/goflow2/pb"
 | 
						|
	"github.com/netsampler/goflow2/producer"
 | 
						|
	"github.com/netsampler/goflow2/transport"
 | 
						|
	"github.com/prometheus/client_golang/prometheus"
 | 
						|
)
 | 
						|
 | 
						|
type StateNFLegacy struct {
 | 
						|
	Format    format.FormatInterface
 | 
						|
	Transport transport.TransportInterface
 | 
						|
	Logger    Logger
 | 
						|
}
 | 
						|
 | 
						|
func (s *StateNFLegacy) DecodeFlow(msg interface{}) error {
 | 
						|
	pkt := msg.(BaseMessage)
 | 
						|
	buf := bytes.NewBuffer(pkt.Payload)
 | 
						|
	key := pkt.Src.String()
 | 
						|
	samplerAddress := pkt.Src
 | 
						|
	if samplerAddress.To4() != nil {
 | 
						|
		samplerAddress = samplerAddress.To4()
 | 
						|
	}
 | 
						|
 | 
						|
	ts := uint64(time.Now().UTC().Unix())
 | 
						|
	if pkt.SetTime {
 | 
						|
		ts = uint64(pkt.RecvTime.UTC().Unix())
 | 
						|
	}
 | 
						|
 | 
						|
	timeTrackStart := time.Now()
 | 
						|
	msgDec, err := netflowlegacy.DecodeMessage(buf)
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		switch err.(type) {
 | 
						|
		case *netflowlegacy.ErrorVersion:
 | 
						|
			NetFlowErrors.With(
 | 
						|
				prometheus.Labels{
 | 
						|
					"router": key,
 | 
						|
					"error":  "error_version",
 | 
						|
				}).
 | 
						|
				Inc()
 | 
						|
		}
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	switch msgDecConv := msgDec.(type) {
 | 
						|
	case netflowlegacy.PacketNetFlowV5:
 | 
						|
		NetFlowStats.With(
 | 
						|
			prometheus.Labels{
 | 
						|
				"router":  key,
 | 
						|
				"version": "5",
 | 
						|
			}).
 | 
						|
			Inc()
 | 
						|
		NetFlowSetStatsSum.With(
 | 
						|
			prometheus.Labels{
 | 
						|
				"router":  key,
 | 
						|
				"version": "5",
 | 
						|
				"type":    "DataFlowSet",
 | 
						|
			}).
 | 
						|
			Add(float64(msgDecConv.Count))
 | 
						|
	}
 | 
						|
 | 
						|
	var flowMessageSet []*flowmessage.FlowMessage
 | 
						|
	flowMessageSet, err = producer.ProcessMessageNetFlowLegacy(msgDec)
 | 
						|
 | 
						|
	timeTrackStop := time.Now()
 | 
						|
	DecoderTime.With(
 | 
						|
		prometheus.Labels{
 | 
						|
			"name": "NetFlowV5",
 | 
						|
		}).
 | 
						|
		Observe(float64((timeTrackStop.Sub(timeTrackStart)).Nanoseconds()) / 1000)
 | 
						|
 | 
						|
	for _, fmsg := range flowMessageSet {
 | 
						|
		fmsg.TimeReceived = ts
 | 
						|
		fmsg.SamplerAddress = samplerAddress
 | 
						|
 | 
						|
		if s.Format != nil {
 | 
						|
			key, data, err := s.Format.Format(fmsg)
 | 
						|
 | 
						|
			if err != nil && s.Logger != nil {
 | 
						|
				s.Logger.Error(err)
 | 
						|
			}
 | 
						|
			if err == nil && s.Transport != nil {
 | 
						|
				s.Transport.Send(key, data)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *StateNFLegacy) FlowRoutine(workers int, addr string, port int, reuseport bool) error {
 | 
						|
	return UDPRoutine("NetFlowV5", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
 | 
						|
}
 |