From c4c4ffb4e481d7b9ba216b074551fadffa77fe4a Mon Sep 17 00:00:00 2001 From: Louis Date: Sun, 16 Apr 2023 06:52:32 +0300 Subject: [PATCH] Refactor template as module (#49) * Refactor template as module * Abstraction to allow custom template storage (eg: file, redis, http...) * Works similarly to Transport and Format --- cmd/goflow2/main.go | 23 ++- decoders/netflow/netflow.go | 91 ++------- decoders/netflow/netflow_test.go | 6 +- decoders/netflow/templates/file/file.go | 204 ++++++++++++++++++++ decoders/netflow/templates/memory/memory.go | 73 +++++++ decoders/netflow/templates/templates.go | 139 +++++++++++++ format/format.go | 2 + utils/netflow.go | 50 ++--- utils/nflegacy.go | 4 + utils/sflow.go | 4 + 10 files changed, 492 insertions(+), 104 deletions(-) create mode 100644 decoders/netflow/templates/file/file.go create mode 100644 decoders/netflow/templates/memory/memory.go create mode 100644 decoders/netflow/templates/templates.go diff --git a/cmd/goflow2/main.go b/cmd/goflow2/main.go index 63345d4..1767f55 100644 --- a/cmd/goflow2/main.go +++ b/cmd/goflow2/main.go @@ -22,6 +22,11 @@ import ( _ "github.com/netsampler/goflow2/transport/file" _ "github.com/netsampler/goflow2/transport/kafka" + // import various NetFlow/IPFIX templates + "github.com/netsampler/goflow2/decoders/netflow/templates" + _ "github.com/netsampler/goflow2/decoders/netflow/templates/file" + _ "github.com/netsampler/goflow2/decoders/netflow/templates/memory" + "github.com/netsampler/goflow2/utils" "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" @@ -39,6 +44,8 @@ var ( LogLevel = flag.String("loglevel", "info", "Log level") LogFmt = flag.String("logfmt", "normal", "Log formatter") + NetFlowTemplates = flag.String("netflow.templates", "memory", fmt.Sprintf("Choose the format (available: %s)", strings.Join(templates.GetTemplates(), ", "))) + Format = flag.String("format", "json", fmt.Sprintf("Choose the format (available: %s)", strings.Join(format.GetFormats(), ", "))) Transport = flag.String("transport", "file", fmt.Sprintf("Choose the transport (available: %s)", strings.Join(transport.GetTransports(), ", "))) @@ -95,6 +102,13 @@ func main() { } defer transporter.Close(ctx) + // the following is only useful when parsing NetFlowV9/IPFIX (template-based flow) + templateSystem, err := templates.FindTemplateSystem(ctx, *NetFlowTemplates) + if err != nil { + log.Fatal(err) + } + defer templateSystem.Close(ctx) + switch *LogFmt { case "json": log.SetFormatter(&log.JSONFormatter{}) @@ -154,10 +168,11 @@ func main() { err = sSFlow.FlowRoutine(*Workers, hostname, int(port), *ReusePort) } else if listenAddrUrl.Scheme == "netflow" { sNF := &utils.StateNetFlow{ - Format: formatter, - Transport: transporter, - Logger: log.StandardLogger(), - Config: config, + Format: formatter, + Transport: transporter, + Logger: log.StandardLogger(), + Config: config, + TemplateSystem: templateSystem, } err = sNF.FlowRoutine(*Workers, hostname, int(port), *ReusePort) } else if listenAddrUrl.Scheme == "nfl" { diff --git a/decoders/netflow/netflow.go b/decoders/netflow/netflow.go index 156dd00..0e332d6 100644 --- a/decoders/netflow/netflow.go +++ b/decoders/netflow/netflow.go @@ -2,20 +2,14 @@ package netflow import ( "bytes" + "context" "encoding/binary" "fmt" - "sync" + "github.com/netsampler/goflow2/decoders/netflow/templates" "github.com/netsampler/goflow2/decoders/utils" ) -type FlowBaseTemplateSet map[uint16]map[uint32]map[uint16]interface{} - -type NetFlowTemplateSystem interface { - GetTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error) - AddTemplate(version uint16, obsDomainId uint32, template interface{}) -} - func DecodeNFv9OptionsTemplateSet(payload *bytes.Buffer) ([]NFv9OptionsTemplateRecord, error) { var records []NFv9OptionsTemplateRecord var err error @@ -249,66 +243,11 @@ func DecodeDataSet(version uint16, payload *bytes.Buffer, listFields []Field) ([ return records, nil } -func (ts *BasicTemplateSystem) GetTemplates() map[uint16]map[uint32]map[uint16]interface{} { - ts.templateslock.RLock() - tmp := ts.templates - ts.templateslock.RUnlock() - return tmp +func DecodeMessage(payload *bytes.Buffer, templates templates.TemplateInterface) (interface{}, error) { + return DecodeMessageContext(context.Background(), payload, "", templates) } -func (ts *BasicTemplateSystem) AddTemplate(version uint16, obsDomainId uint32, template interface{}) { - ts.templateslock.Lock() - defer ts.templateslock.Unlock() - _, exists := ts.templates[version] - if exists != true { - ts.templates[version] = make(map[uint32]map[uint16]interface{}) - } - _, exists = ts.templates[version][obsDomainId] - if exists != true { - ts.templates[version][obsDomainId] = make(map[uint16]interface{}) - } - var templateId uint16 - switch templateIdConv := template.(type) { - case IPFIXOptionsTemplateRecord: - templateId = templateIdConv.TemplateId - case NFv9OptionsTemplateRecord: - templateId = templateIdConv.TemplateId - case TemplateRecord: - templateId = templateIdConv.TemplateId - } - ts.templates[version][obsDomainId][templateId] = template -} - -func (ts *BasicTemplateSystem) GetTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error) { - ts.templateslock.RLock() - defer ts.templateslock.RUnlock() - templatesVersion, okver := ts.templates[version] - if okver { - templatesObsDom, okobs := templatesVersion[obsDomainId] - if okobs { - template, okid := templatesObsDom[templateId] - if okid { - return template, nil - } - } - } - return nil, NewErrorTemplateNotFound(version, obsDomainId, templateId, "info") -} - -type BasicTemplateSystem struct { - templates FlowBaseTemplateSet - templateslock *sync.RWMutex -} - -func CreateTemplateSystem() *BasicTemplateSystem { - ts := &BasicTemplateSystem{ - templates: make(FlowBaseTemplateSet), - templateslock: &sync.RWMutex{}, - } - return ts -} - -func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (interface{}, error) { +func DecodeMessageContext(ctx context.Context, payload *bytes.Buffer, templateKey string, tpli templates.TemplateInterface) (interface{}, error) { var size uint16 packetNFv9 := NFv9Packet{} packetIPFIX := IPFIXPacket{} @@ -368,9 +307,9 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte flowSet = templatefs - if templates != nil { + if tpli != nil { for _, record := range records { - templates.AddTemplate(version, obsDomainId, record) + tpli.AddTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, record.TemplateId), record) } } @@ -386,9 +325,9 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte } flowSet = optsTemplatefs - if templates != nil { + if tpli != nil { for _, record := range records { - templates.AddTemplate(version, obsDomainId, record) + tpli.AddTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, record.TemplateId), record) } } @@ -404,9 +343,9 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte } flowSet = templatefs - if templates != nil { + if tpli != nil { for _, record := range records { - templates.AddTemplate(version, obsDomainId, record) + tpli.AddTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, record.TemplateId), record) } } @@ -422,20 +361,20 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte } flowSet = optsTemplatefs - if templates != nil { + if tpli != nil { for _, record := range records { - templates.AddTemplate(version, obsDomainId, record) + tpli.AddTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, record.TemplateId), record) } } } else if fsheader.Id >= 256 { dataReader := bytes.NewBuffer(payload.Next(nextrelpos)) - if templates == nil { + if tpli == nil { continue } - template, err := templates.GetTemplate(version, obsDomainId, fsheader.Id) + template, err := tpli.GetTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, fsheader.Id)) if err == nil { switch templatec := template.(type) { diff --git a/decoders/netflow/netflow_test.go b/decoders/netflow/netflow_test.go index 5097c7d..2c5e8bd 100644 --- a/decoders/netflow/netflow_test.go +++ b/decoders/netflow/netflow_test.go @@ -2,13 +2,17 @@ package netflow import ( "bytes" + "context" "testing" + "github.com/netsampler/goflow2/decoders/netflow/templates/memory" + "github.com/stretchr/testify/assert" ) func TestDecodeNetFlowV9(t *testing.T) { - templates := CreateTemplateSystem() + templates := &memory.MemoryDriver{} + templates.Init(context.Background()) // Decode a template template := []byte{ diff --git a/decoders/netflow/templates/file/file.go b/decoders/netflow/templates/file/file.go new file mode 100644 index 0000000..476babb --- /dev/null +++ b/decoders/netflow/templates/file/file.go @@ -0,0 +1,204 @@ +package file + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "github.com/netsampler/goflow2/decoders/netflow" + "github.com/netsampler/goflow2/decoders/netflow/templates" + "github.com/netsampler/goflow2/decoders/netflow/templates/memory" + "os" + "sync" +) + +type TemplateFileObject struct { + Key *templates.TemplateKey + Data *TemplateFileData +} + +type TemplateFileData struct { + Type string + Data interface{} +} + +func (d *TemplateFileData) UnmarshalJSON(b []byte) error { + var s struct { + Type string + Data interface{} `json:"-"` + } + if err := json.Unmarshal(b, &s); err != nil { + return err + } + + switch s.Type { + case "NFv9OptionsTemplateRecord": + newS := new(struct { + Type string + Data netflow.NFv9OptionsTemplateRecord + }) + if err := json.Unmarshal(b, newS); err != nil { + return err + } + d.Type = newS.Type + d.Data = newS.Data + case "TemplateRecord": + newS := new(struct { + Type string + Data netflow.TemplateRecord + }) + if err := json.Unmarshal(b, newS); err != nil { + return err + } + d.Type = newS.Type + d.Data = newS.Data + case "IPFIXOptionsTemplateRecord": + newS := new(struct { + Type string + Data netflow.IPFIXOptionsTemplateRecord + }) + if err := json.Unmarshal(b, newS); err != nil { + return err + } + d.Type = newS.Type + d.Data = newS.Data + } + + return nil +} + +type TemplateFile struct { + Templates []*TemplateFileObject `json:"templates"` +} + +func (f *TemplateFile) Add(key *templates.TemplateKey, data interface{}) { + var typeName string + + switch data.(type) { + case netflow.NFv9OptionsTemplateRecord: + typeName = "NFv9OptionsTemplateRecord" + case netflow.TemplateRecord: + typeName = "TemplateRecord" + case netflow.IPFIXOptionsTemplateRecord: + typeName = "IPFIXOptionsTemplateRecord" + default: + return + } + + f.Templates = append(f.Templates, &TemplateFileObject{ + Key: key, + Data: &TemplateFileData{ + Type: typeName, + Data: data, + }, + }) +} + +func NewTemplateFile() *TemplateFile { + return &TemplateFile{ + Templates: make([]*TemplateFileObject, 0), + } +} + +type FileDriver struct { + memDriver *memory.MemoryDriver + path string + lock *sync.Mutex +} + +func (d *FileDriver) Prepare() error { + d.memDriver = memory.Driver + d.lock = &sync.Mutex{} + flag.StringVar(&d.path, "netflow.templates.file.path", "./templates.json", "Path of file to store templates") + return nil +} + +func (d *FileDriver) Init(ctx context.Context) error { + var err error + if err = d.memDriver.Init(ctx); err != nil { + return err + } + + f, err := os.OpenFile(d.path, os.O_RDWR|os.O_CREATE, 0755) + if err != nil { + return err + } + defer f.Close() + dec := json.NewDecoder(f) + tf := NewTemplateFile() + if err = dec.Decode(tf); err != nil { + // log error + } + for _, template := range tf.Templates { + if err := d.memDriver.AddTemplate(ctx, template.Key, template.Data.Data); err != nil { + // log error + continue + } + } + + return nil +} + +func (d *FileDriver) Close(ctx context.Context) error { + if err := d.memDriver.Close(ctx); err != nil { + return err + } + return nil +} + +func (d *FileDriver) ListTemplates(ctx context.Context, ch chan *templates.TemplateKey) error { + return d.memDriver.ListTemplates(ctx, ch) +} + +func (d *FileDriver) AddTemplate(ctx context.Context, key *templates.TemplateKey, template interface{}) error { + d.lock.Lock() + defer d.lock.Unlock() + if err := d.memDriver.AddTemplate(ctx, key, template); err != nil { + return err + } + + tf := NewTemplateFile() + + ch := make(chan *templates.TemplateKey, 5) + go func() { + if err := d.memDriver.ListTemplates(ctx, ch); err != nil { + // log error + close(ch) + } + }() + for key := range ch { + if key == nil { + break + } + if template, err := d.memDriver.GetTemplate(ctx, key); err != nil { + // log error + continue + } else { + tf.Add(key, template) + } + + } + + tmpPath := fmt.Sprintf("%s-tmp", d.path) + f, err := os.OpenFile(tmpPath, os.O_RDWR|os.O_CREATE, 0755) + if err != nil { + return err + } + + enc := json.NewEncoder(f) + if err := enc.Encode(tf); err != nil { + f.Close() + return err + } + + return os.Rename(tmpPath, d.path) +} + +func (d *FileDriver) GetTemplate(ctx context.Context, key *templates.TemplateKey) (interface{}, error) { + return d.memDriver.GetTemplate(ctx, key) +} + +func init() { + d := &FileDriver{} + templates.RegisterTemplateDriver("file", d) +} diff --git a/decoders/netflow/templates/memory/memory.go b/decoders/netflow/templates/memory/memory.go new file mode 100644 index 0000000..0f16b24 --- /dev/null +++ b/decoders/netflow/templates/memory/memory.go @@ -0,0 +1,73 @@ +package memory + +import ( + "context" + "github.com/netsampler/goflow2/decoders/netflow/templates" + "sync" +) + +var ( + Driver = &MemoryDriver{} +) + +type templateData struct { + key *templates.TemplateKey + data interface{} +} + +type MemoryDriver struct { + lock *sync.RWMutex + templates map[string]templateData +} + +func (d *MemoryDriver) Prepare() error { + // could have an expiry + return nil +} + +func (d *MemoryDriver) Init(context.Context) error { + d.lock = &sync.RWMutex{} + d.templates = make(map[string]templateData) + return nil +} + +func (d *MemoryDriver) Close(context.Context) error { + return nil +} + +func (d *MemoryDriver) ListTemplates(ctx context.Context, ch chan *templates.TemplateKey) error { + d.lock.RLock() + defer d.lock.RUnlock() + for _, v := range d.templates { + select { + case ch <- v.key: + case <-ctx.Done(): + return ctx.Err() + } + } + select { + case ch <- nil: + } + return nil +} + +func (d *MemoryDriver) AddTemplate(ctx context.Context, key *templates.TemplateKey, template interface{}) error { + d.lock.Lock() + defer d.lock.Unlock() + + d.templates[key.String()] = templateData{ + key: key, + data: template, + } + return nil +} + +func (d *MemoryDriver) GetTemplate(ctx context.Context, key *templates.TemplateKey) (interface{}, error) { + d.lock.RLock() + defer d.lock.RUnlock() + return d.templates[key.String()].data, nil +} + +func init() { + templates.RegisterTemplateDriver("memory", Driver) +} diff --git a/decoders/netflow/templates/templates.go b/decoders/netflow/templates/templates.go new file mode 100644 index 0000000..525e6b1 --- /dev/null +++ b/decoders/netflow/templates/templates.go @@ -0,0 +1,139 @@ +package templates + +import ( + "context" + "fmt" + "strconv" + "strings" + "sync" +) + +var ( + templateDrivers = make(map[string]TemplateDriver) // might be better to change into "factory" + lock = &sync.RWMutex{} +) + +type TemplateDriver interface { + TemplateInterface + + Prepare() error // Prepare driver (eg: flag registration) + Init(context.Context) error // Initialize driver (eg: parse keying) + Close(context.Context) error // Close drive (eg: close file) +} + +type TemplateKey struct { + TemplateKey string + Version uint16 + ObsDomainId uint32 + TemplateId uint16 +} + +func NewTemplateKey(templateKey string, version uint16, obsDomainId uint32, templateId uint16) *TemplateKey { + return &TemplateKey{ + TemplateKey: templateKey, + Version: version, + ObsDomainId: obsDomainId, + TemplateId: templateId, + } +} + +func (k *TemplateKey) String() string { + return fmt.Sprintf("%s-%d-%d-%d", k.TemplateKey, k.Version, k.ObsDomainId, k.TemplateId) +} + +func ParseTemplateKey(key string, k *TemplateKey) error { + if k != nil { + return nil + } + var version uint16 + var obsDomainId uint32 + var templateId uint16 + + keySplit := strings.Split(key, "-") + if len(keySplit) != 4 { + return fmt.Errorf("template key format is invalid") + } + templateKey := keySplit[0] + if val, err := strconv.ParseUint(keySplit[1], 10, 64); err != nil { + return fmt.Errorf("template key version is invalid") + } else { + version = uint16(val) + } + if val, err := strconv.ParseUint(keySplit[2], 10, 64); err != nil { + fmt.Errorf("template key observation domain I Dis invalid") + } else { + obsDomainId = uint32(val) + } + if val, err := strconv.ParseUint(keySplit[3], 10, 64); err != nil { + fmt.Errorf("template key template ID is invalid") + } else { + templateId = uint16(val) + } + + k.TemplateKey = templateKey + k.Version = version + k.ObsDomainId = obsDomainId + k.TemplateId = templateId + + return nil +} + +type TemplateInterface interface { + ListTemplates(ctx context.Context, ch chan *TemplateKey) error + GetTemplate(ctx context.Context, key *TemplateKey) (interface{}, error) + AddTemplate(ctx context.Context, key *TemplateKey, template interface{}) error // add expiration +} + +type TemplateSystem struct { + driver TemplateDriver +} + +func (t *TemplateSystem) ListTemplates(ctx context.Context, ch chan *TemplateKey) error { + return t.driver.ListTemplates(ctx, ch) +} + +func (t *TemplateSystem) AddTemplate(ctx context.Context, key *TemplateKey, template interface{}) error { + return t.driver.AddTemplate(ctx, key, template) +} + +func (t *TemplateSystem) GetTemplate(ctx context.Context, key *TemplateKey) (interface{}, error) { + return t.driver.GetTemplate(ctx, key) +} + +func (t *TemplateSystem) Close(ctx context.Context) error { + return t.driver.Close(ctx) +} + +func RegisterTemplateDriver(name string, t TemplateDriver) { + lock.Lock() + templateDrivers[name] = t + lock.Unlock() + + if err := t.Prepare(); err != nil { + panic(err) + } +} + +func FindTemplateSystem(ctx context.Context, name string) (*TemplateSystem, error) { + lock.RLock() + t, ok := templateDrivers[name] + lock.RUnlock() + if !ok { + return nil, fmt.Errorf("Template %s not found", name) + } + + err := t.Init(ctx) + return &TemplateSystem{t}, err +} + +func GetTemplates() []string { + lock.RLock() + defer lock.RUnlock() + t := make([]string, len(templateDrivers)) + var i int + for k, _ := range templateDrivers { + t[i] = k + i++ + } + return t +} diff --git a/format/format.go b/format/format.go index 5edf057..66e5231 100644 --- a/format/format.go +++ b/format/format.go @@ -15,6 +15,8 @@ type FormatDriver interface { Prepare() error // Prepare driver (eg: flag registration) Init(context.Context) error // Initialize driver (eg: parse keying) Format(data interface{}) ([]byte, []byte, error) // Send a message + + //FormatInterface // set this and remove Format } type FormatInterface interface { diff --git a/utils/netflow.go b/utils/netflow.go index 28bbf15..cabe063 100644 --- a/utils/netflow.go +++ b/utils/netflow.go @@ -2,13 +2,12 @@ package utils import ( "bytes" - "encoding/json" - "net/http" - "strconv" + "context" "sync" "time" "github.com/netsampler/goflow2/decoders/netflow" + "github.com/netsampler/goflow2/decoders/netflow/templates" "github.com/netsampler/goflow2/format" flowmessage "github.com/netsampler/goflow2/pb" "github.com/netsampler/goflow2/producer" @@ -16,6 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +/* type TemplateSystem struct { key string templates *netflow.BasicTemplateSystem @@ -49,21 +49,34 @@ func (s *TemplateSystem) AddTemplate(version uint16, obsDomainId uint32, templat func (s *TemplateSystem) GetTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error) { return s.templates.GetTemplate(version, obsDomainId, templateId) } +*/ type StateNetFlow struct { stopper - Format format.FormatInterface - Transport transport.TransportInterface - Logger Logger - templateslock *sync.RWMutex - templates map[string]*TemplateSystem + Format format.FormatInterface + Transport transport.TransportInterface + Logger Logger + /*templateslock *sync.RWMutex + templates map[string]*TemplateSystem*/ samplinglock *sync.RWMutex sampling map[string]producer.SamplingRateSystem Config *producer.ProducerConfig configMapped *producer.ProducerConfigMapped + + TemplateSystem templates.TemplateInterface + + ctx context.Context +} + +func NewStateNetFlow() *StateNetFlow { + return &StateNetFlow{ + ctx: context.Background(), + samplinglock: &sync.RWMutex{}, + sampling: make(map[string]producer.SamplingRateSystem), + } } func (s *StateNetFlow) DecodeFlow(msg interface{}) error { @@ -76,18 +89,6 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error { samplerAddress = samplerAddress.To4() } - s.templateslock.RLock() - templates, ok := s.templates[key] - s.templateslock.RUnlock() - if !ok { - templates = &TemplateSystem{ - templates: netflow.CreateTemplateSystem(), - key: key, - } - s.templateslock.Lock() - s.templates[key] = templates - s.templateslock.Unlock() - } s.samplinglock.RLock() sampling, ok := s.sampling[key] s.samplinglock.RUnlock() @@ -104,7 +105,7 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error { } timeTrackStart := time.Now() - msgDec, err := netflow.DecodeMessage(buf, templates) + msgDec, err := netflow.DecodeMessageContext(s.ctx, buf, key, s.TemplateSystem) if err != nil { switch err.(type) { case *netflow.ErrorTemplateNotFound: @@ -340,6 +341,7 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error { return nil } +/* func (s *StateNetFlow) ServeHTTPTemplates(w http.ResponseWriter, r *http.Request) { tmp := make(map[string]map[uint16]map[uint32]map[uint16]interface{}) s.templateslock.RLock() @@ -357,7 +359,7 @@ func (s *StateNetFlow) InitTemplates() { s.templateslock = &sync.RWMutex{} s.sampling = make(map[string]producer.SamplingRateSystem) s.samplinglock = &sync.RWMutex{} -} +}*/ func (s *StateNetFlow) initConfig() { s.configMapped = producer.NewProducerConfigMapped(s.Config) @@ -367,7 +369,9 @@ func (s *StateNetFlow) FlowRoutine(workers int, addr string, port int, reuseport if err := s.start(); err != nil { return err } - s.InitTemplates() + //s.InitTemplates() s.initConfig() return UDPStoppableRoutine(s.stopCh, "NetFlow", s.DecodeFlow, workers, addr, port, reuseport, s.Logger) } + +// FlowRoutineCtx? diff --git a/utils/nflegacy.go b/utils/nflegacy.go index 9b9f0be..dcfc36d 100644 --- a/utils/nflegacy.go +++ b/utils/nflegacy.go @@ -20,6 +20,10 @@ type StateNFLegacy struct { Logger Logger } +func NewStateNFLegacy() *StateNFLegacy { + return &StateNFLegacy{} +} + func (s *StateNFLegacy) DecodeFlow(msg interface{}) error { pkt := msg.(BaseMessage) buf := bytes.NewBuffer(pkt.Payload) diff --git a/utils/sflow.go b/utils/sflow.go index 4ebe68d..27223bc 100644 --- a/utils/sflow.go +++ b/utils/sflow.go @@ -24,6 +24,10 @@ type StateSFlow struct { configMapped *producer.ProducerConfigMapped } +func NewStateSFlow() *StateSFlow { + return &StateSFlow{} +} + func (s *StateSFlow) DecodeFlow(msg interface{}) error { pkt := msg.(BaseMessage) buf := bytes.NewBuffer(pkt.Payload)