Refactor template as module (#49)

* Refactor template as module
* Abstraction to allow custom template storage (eg: file, redis, http...)
* Works similarly to Transport and Format
This commit is contained in:
Louis
2023-04-16 06:52:32 +03:00
committed by GitHub
parent 6b3c5f1215
commit c4c4ffb4e4
10 changed files with 492 additions and 104 deletions

View File

@@ -22,6 +22,11 @@ import (
_ "github.com/netsampler/goflow2/transport/file"
_ "github.com/netsampler/goflow2/transport/kafka"
// import various NetFlow/IPFIX templates
"github.com/netsampler/goflow2/decoders/netflow/templates"
_ "github.com/netsampler/goflow2/decoders/netflow/templates/file"
_ "github.com/netsampler/goflow2/decoders/netflow/templates/memory"
"github.com/netsampler/goflow2/utils"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
@@ -39,6 +44,8 @@ var (
LogLevel = flag.String("loglevel", "info", "Log level")
LogFmt = flag.String("logfmt", "normal", "Log formatter")
NetFlowTemplates = flag.String("netflow.templates", "memory", fmt.Sprintf("Choose the format (available: %s)", strings.Join(templates.GetTemplates(), ", ")))
Format = flag.String("format", "json", fmt.Sprintf("Choose the format (available: %s)", strings.Join(format.GetFormats(), ", ")))
Transport = flag.String("transport", "file", fmt.Sprintf("Choose the transport (available: %s)", strings.Join(transport.GetTransports(), ", ")))
@@ -95,6 +102,13 @@ func main() {
}
defer transporter.Close(ctx)
// the following is only useful when parsing NetFlowV9/IPFIX (template-based flow)
templateSystem, err := templates.FindTemplateSystem(ctx, *NetFlowTemplates)
if err != nil {
log.Fatal(err)
}
defer templateSystem.Close(ctx)
switch *LogFmt {
case "json":
log.SetFormatter(&log.JSONFormatter{})
@@ -154,10 +168,11 @@ func main() {
err = sSFlow.FlowRoutine(*Workers, hostname, int(port), *ReusePort)
} else if listenAddrUrl.Scheme == "netflow" {
sNF := &utils.StateNetFlow{
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
Config: config,
Format: formatter,
Transport: transporter,
Logger: log.StandardLogger(),
Config: config,
TemplateSystem: templateSystem,
}
err = sNF.FlowRoutine(*Workers, hostname, int(port), *ReusePort)
} else if listenAddrUrl.Scheme == "nfl" {

View File

@@ -2,20 +2,14 @@ package netflow
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"sync"
"github.com/netsampler/goflow2/decoders/netflow/templates"
"github.com/netsampler/goflow2/decoders/utils"
)
type FlowBaseTemplateSet map[uint16]map[uint32]map[uint16]interface{}
type NetFlowTemplateSystem interface {
GetTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error)
AddTemplate(version uint16, obsDomainId uint32, template interface{})
}
func DecodeNFv9OptionsTemplateSet(payload *bytes.Buffer) ([]NFv9OptionsTemplateRecord, error) {
var records []NFv9OptionsTemplateRecord
var err error
@@ -249,66 +243,11 @@ func DecodeDataSet(version uint16, payload *bytes.Buffer, listFields []Field) ([
return records, nil
}
func (ts *BasicTemplateSystem) GetTemplates() map[uint16]map[uint32]map[uint16]interface{} {
ts.templateslock.RLock()
tmp := ts.templates
ts.templateslock.RUnlock()
return tmp
func DecodeMessage(payload *bytes.Buffer, templates templates.TemplateInterface) (interface{}, error) {
return DecodeMessageContext(context.Background(), payload, "", templates)
}
func (ts *BasicTemplateSystem) AddTemplate(version uint16, obsDomainId uint32, template interface{}) {
ts.templateslock.Lock()
defer ts.templateslock.Unlock()
_, exists := ts.templates[version]
if exists != true {
ts.templates[version] = make(map[uint32]map[uint16]interface{})
}
_, exists = ts.templates[version][obsDomainId]
if exists != true {
ts.templates[version][obsDomainId] = make(map[uint16]interface{})
}
var templateId uint16
switch templateIdConv := template.(type) {
case IPFIXOptionsTemplateRecord:
templateId = templateIdConv.TemplateId
case NFv9OptionsTemplateRecord:
templateId = templateIdConv.TemplateId
case TemplateRecord:
templateId = templateIdConv.TemplateId
}
ts.templates[version][obsDomainId][templateId] = template
}
func (ts *BasicTemplateSystem) GetTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error) {
ts.templateslock.RLock()
defer ts.templateslock.RUnlock()
templatesVersion, okver := ts.templates[version]
if okver {
templatesObsDom, okobs := templatesVersion[obsDomainId]
if okobs {
template, okid := templatesObsDom[templateId]
if okid {
return template, nil
}
}
}
return nil, NewErrorTemplateNotFound(version, obsDomainId, templateId, "info")
}
type BasicTemplateSystem struct {
templates FlowBaseTemplateSet
templateslock *sync.RWMutex
}
func CreateTemplateSystem() *BasicTemplateSystem {
ts := &BasicTemplateSystem{
templates: make(FlowBaseTemplateSet),
templateslock: &sync.RWMutex{},
}
return ts
}
func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (interface{}, error) {
func DecodeMessageContext(ctx context.Context, payload *bytes.Buffer, templateKey string, tpli templates.TemplateInterface) (interface{}, error) {
var size uint16
packetNFv9 := NFv9Packet{}
packetIPFIX := IPFIXPacket{}
@@ -368,9 +307,9 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
flowSet = templatefs
if templates != nil {
if tpli != nil {
for _, record := range records {
templates.AddTemplate(version, obsDomainId, record)
tpli.AddTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, record.TemplateId), record)
}
}
@@ -386,9 +325,9 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
}
flowSet = optsTemplatefs
if templates != nil {
if tpli != nil {
for _, record := range records {
templates.AddTemplate(version, obsDomainId, record)
tpli.AddTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, record.TemplateId), record)
}
}
@@ -404,9 +343,9 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
}
flowSet = templatefs
if templates != nil {
if tpli != nil {
for _, record := range records {
templates.AddTemplate(version, obsDomainId, record)
tpli.AddTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, record.TemplateId), record)
}
}
@@ -422,20 +361,20 @@ func DecodeMessage(payload *bytes.Buffer, templates NetFlowTemplateSystem) (inte
}
flowSet = optsTemplatefs
if templates != nil {
if tpli != nil {
for _, record := range records {
templates.AddTemplate(version, obsDomainId, record)
tpli.AddTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, record.TemplateId), record)
}
}
} else if fsheader.Id >= 256 {
dataReader := bytes.NewBuffer(payload.Next(nextrelpos))
if templates == nil {
if tpli == nil {
continue
}
template, err := templates.GetTemplate(version, obsDomainId, fsheader.Id)
template, err := tpli.GetTemplate(ctx, templates.NewTemplateKey(templateKey, version, obsDomainId, fsheader.Id))
if err == nil {
switch templatec := template.(type) {

View File

@@ -2,13 +2,17 @@ package netflow
import (
"bytes"
"context"
"testing"
"github.com/netsampler/goflow2/decoders/netflow/templates/memory"
"github.com/stretchr/testify/assert"
)
func TestDecodeNetFlowV9(t *testing.T) {
templates := CreateTemplateSystem()
templates := &memory.MemoryDriver{}
templates.Init(context.Background())
// Decode a template
template := []byte{

View File

@@ -0,0 +1,204 @@
package file
import (
"context"
"encoding/json"
"flag"
"fmt"
"github.com/netsampler/goflow2/decoders/netflow"
"github.com/netsampler/goflow2/decoders/netflow/templates"
"github.com/netsampler/goflow2/decoders/netflow/templates/memory"
"os"
"sync"
)
type TemplateFileObject struct {
Key *templates.TemplateKey
Data *TemplateFileData
}
type TemplateFileData struct {
Type string
Data interface{}
}
func (d *TemplateFileData) UnmarshalJSON(b []byte) error {
var s struct {
Type string
Data interface{} `json:"-"`
}
if err := json.Unmarshal(b, &s); err != nil {
return err
}
switch s.Type {
case "NFv9OptionsTemplateRecord":
newS := new(struct {
Type string
Data netflow.NFv9OptionsTemplateRecord
})
if err := json.Unmarshal(b, newS); err != nil {
return err
}
d.Type = newS.Type
d.Data = newS.Data
case "TemplateRecord":
newS := new(struct {
Type string
Data netflow.TemplateRecord
})
if err := json.Unmarshal(b, newS); err != nil {
return err
}
d.Type = newS.Type
d.Data = newS.Data
case "IPFIXOptionsTemplateRecord":
newS := new(struct {
Type string
Data netflow.IPFIXOptionsTemplateRecord
})
if err := json.Unmarshal(b, newS); err != nil {
return err
}
d.Type = newS.Type
d.Data = newS.Data
}
return nil
}
type TemplateFile struct {
Templates []*TemplateFileObject `json:"templates"`
}
func (f *TemplateFile) Add(key *templates.TemplateKey, data interface{}) {
var typeName string
switch data.(type) {
case netflow.NFv9OptionsTemplateRecord:
typeName = "NFv9OptionsTemplateRecord"
case netflow.TemplateRecord:
typeName = "TemplateRecord"
case netflow.IPFIXOptionsTemplateRecord:
typeName = "IPFIXOptionsTemplateRecord"
default:
return
}
f.Templates = append(f.Templates, &TemplateFileObject{
Key: key,
Data: &TemplateFileData{
Type: typeName,
Data: data,
},
})
}
func NewTemplateFile() *TemplateFile {
return &TemplateFile{
Templates: make([]*TemplateFileObject, 0),
}
}
type FileDriver struct {
memDriver *memory.MemoryDriver
path string
lock *sync.Mutex
}
func (d *FileDriver) Prepare() error {
d.memDriver = memory.Driver
d.lock = &sync.Mutex{}
flag.StringVar(&d.path, "netflow.templates.file.path", "./templates.json", "Path of file to store templates")
return nil
}
func (d *FileDriver) Init(ctx context.Context) error {
var err error
if err = d.memDriver.Init(ctx); err != nil {
return err
}
f, err := os.OpenFile(d.path, os.O_RDWR|os.O_CREATE, 0755)
if err != nil {
return err
}
defer f.Close()
dec := json.NewDecoder(f)
tf := NewTemplateFile()
if err = dec.Decode(tf); err != nil {
// log error
}
for _, template := range tf.Templates {
if err := d.memDriver.AddTemplate(ctx, template.Key, template.Data.Data); err != nil {
// log error
continue
}
}
return nil
}
func (d *FileDriver) Close(ctx context.Context) error {
if err := d.memDriver.Close(ctx); err != nil {
return err
}
return nil
}
func (d *FileDriver) ListTemplates(ctx context.Context, ch chan *templates.TemplateKey) error {
return d.memDriver.ListTemplates(ctx, ch)
}
func (d *FileDriver) AddTemplate(ctx context.Context, key *templates.TemplateKey, template interface{}) error {
d.lock.Lock()
defer d.lock.Unlock()
if err := d.memDriver.AddTemplate(ctx, key, template); err != nil {
return err
}
tf := NewTemplateFile()
ch := make(chan *templates.TemplateKey, 5)
go func() {
if err := d.memDriver.ListTemplates(ctx, ch); err != nil {
// log error
close(ch)
}
}()
for key := range ch {
if key == nil {
break
}
if template, err := d.memDriver.GetTemplate(ctx, key); err != nil {
// log error
continue
} else {
tf.Add(key, template)
}
}
tmpPath := fmt.Sprintf("%s-tmp", d.path)
f, err := os.OpenFile(tmpPath, os.O_RDWR|os.O_CREATE, 0755)
if err != nil {
return err
}
enc := json.NewEncoder(f)
if err := enc.Encode(tf); err != nil {
f.Close()
return err
}
return os.Rename(tmpPath, d.path)
}
func (d *FileDriver) GetTemplate(ctx context.Context, key *templates.TemplateKey) (interface{}, error) {
return d.memDriver.GetTemplate(ctx, key)
}
func init() {
d := &FileDriver{}
templates.RegisterTemplateDriver("file", d)
}

View File

@@ -0,0 +1,73 @@
package memory
import (
"context"
"github.com/netsampler/goflow2/decoders/netflow/templates"
"sync"
)
var (
Driver = &MemoryDriver{}
)
type templateData struct {
key *templates.TemplateKey
data interface{}
}
type MemoryDriver struct {
lock *sync.RWMutex
templates map[string]templateData
}
func (d *MemoryDriver) Prepare() error {
// could have an expiry
return nil
}
func (d *MemoryDriver) Init(context.Context) error {
d.lock = &sync.RWMutex{}
d.templates = make(map[string]templateData)
return nil
}
func (d *MemoryDriver) Close(context.Context) error {
return nil
}
func (d *MemoryDriver) ListTemplates(ctx context.Context, ch chan *templates.TemplateKey) error {
d.lock.RLock()
defer d.lock.RUnlock()
for _, v := range d.templates {
select {
case ch <- v.key:
case <-ctx.Done():
return ctx.Err()
}
}
select {
case ch <- nil:
}
return nil
}
func (d *MemoryDriver) AddTemplate(ctx context.Context, key *templates.TemplateKey, template interface{}) error {
d.lock.Lock()
defer d.lock.Unlock()
d.templates[key.String()] = templateData{
key: key,
data: template,
}
return nil
}
func (d *MemoryDriver) GetTemplate(ctx context.Context, key *templates.TemplateKey) (interface{}, error) {
d.lock.RLock()
defer d.lock.RUnlock()
return d.templates[key.String()].data, nil
}
func init() {
templates.RegisterTemplateDriver("memory", Driver)
}

View File

@@ -0,0 +1,139 @@
package templates
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
)
var (
templateDrivers = make(map[string]TemplateDriver) // might be better to change into "factory"
lock = &sync.RWMutex{}
)
type TemplateDriver interface {
TemplateInterface
Prepare() error // Prepare driver (eg: flag registration)
Init(context.Context) error // Initialize driver (eg: parse keying)
Close(context.Context) error // Close drive (eg: close file)
}
type TemplateKey struct {
TemplateKey string
Version uint16
ObsDomainId uint32
TemplateId uint16
}
func NewTemplateKey(templateKey string, version uint16, obsDomainId uint32, templateId uint16) *TemplateKey {
return &TemplateKey{
TemplateKey: templateKey,
Version: version,
ObsDomainId: obsDomainId,
TemplateId: templateId,
}
}
func (k *TemplateKey) String() string {
return fmt.Sprintf("%s-%d-%d-%d", k.TemplateKey, k.Version, k.ObsDomainId, k.TemplateId)
}
func ParseTemplateKey(key string, k *TemplateKey) error {
if k != nil {
return nil
}
var version uint16
var obsDomainId uint32
var templateId uint16
keySplit := strings.Split(key, "-")
if len(keySplit) != 4 {
return fmt.Errorf("template key format is invalid")
}
templateKey := keySplit[0]
if val, err := strconv.ParseUint(keySplit[1], 10, 64); err != nil {
return fmt.Errorf("template key version is invalid")
} else {
version = uint16(val)
}
if val, err := strconv.ParseUint(keySplit[2], 10, 64); err != nil {
fmt.Errorf("template key observation domain I Dis invalid")
} else {
obsDomainId = uint32(val)
}
if val, err := strconv.ParseUint(keySplit[3], 10, 64); err != nil {
fmt.Errorf("template key template ID is invalid")
} else {
templateId = uint16(val)
}
k.TemplateKey = templateKey
k.Version = version
k.ObsDomainId = obsDomainId
k.TemplateId = templateId
return nil
}
type TemplateInterface interface {
ListTemplates(ctx context.Context, ch chan *TemplateKey) error
GetTemplate(ctx context.Context, key *TemplateKey) (interface{}, error)
AddTemplate(ctx context.Context, key *TemplateKey, template interface{}) error // add expiration
}
type TemplateSystem struct {
driver TemplateDriver
}
func (t *TemplateSystem) ListTemplates(ctx context.Context, ch chan *TemplateKey) error {
return t.driver.ListTemplates(ctx, ch)
}
func (t *TemplateSystem) AddTemplate(ctx context.Context, key *TemplateKey, template interface{}) error {
return t.driver.AddTemplate(ctx, key, template)
}
func (t *TemplateSystem) GetTemplate(ctx context.Context, key *TemplateKey) (interface{}, error) {
return t.driver.GetTemplate(ctx, key)
}
func (t *TemplateSystem) Close(ctx context.Context) error {
return t.driver.Close(ctx)
}
func RegisterTemplateDriver(name string, t TemplateDriver) {
lock.Lock()
templateDrivers[name] = t
lock.Unlock()
if err := t.Prepare(); err != nil {
panic(err)
}
}
func FindTemplateSystem(ctx context.Context, name string) (*TemplateSystem, error) {
lock.RLock()
t, ok := templateDrivers[name]
lock.RUnlock()
if !ok {
return nil, fmt.Errorf("Template %s not found", name)
}
err := t.Init(ctx)
return &TemplateSystem{t}, err
}
func GetTemplates() []string {
lock.RLock()
defer lock.RUnlock()
t := make([]string, len(templateDrivers))
var i int
for k, _ := range templateDrivers {
t[i] = k
i++
}
return t
}

View File

@@ -15,6 +15,8 @@ type FormatDriver interface {
Prepare() error // Prepare driver (eg: flag registration)
Init(context.Context) error // Initialize driver (eg: parse keying)
Format(data interface{}) ([]byte, []byte, error) // Send a message
//FormatInterface // set this and remove Format
}
type FormatInterface interface {

View File

@@ -2,13 +2,12 @@ package utils
import (
"bytes"
"encoding/json"
"net/http"
"strconv"
"context"
"sync"
"time"
"github.com/netsampler/goflow2/decoders/netflow"
"github.com/netsampler/goflow2/decoders/netflow/templates"
"github.com/netsampler/goflow2/format"
flowmessage "github.com/netsampler/goflow2/pb"
"github.com/netsampler/goflow2/producer"
@@ -16,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
/*
type TemplateSystem struct {
key string
templates *netflow.BasicTemplateSystem
@@ -49,21 +49,34 @@ func (s *TemplateSystem) AddTemplate(version uint16, obsDomainId uint32, templat
func (s *TemplateSystem) GetTemplate(version uint16, obsDomainId uint32, templateId uint16) (interface{}, error) {
return s.templates.GetTemplate(version, obsDomainId, templateId)
}
*/
type StateNetFlow struct {
stopper
Format format.FormatInterface
Transport transport.TransportInterface
Logger Logger
templateslock *sync.RWMutex
templates map[string]*TemplateSystem
Format format.FormatInterface
Transport transport.TransportInterface
Logger Logger
/*templateslock *sync.RWMutex
templates map[string]*TemplateSystem*/
samplinglock *sync.RWMutex
sampling map[string]producer.SamplingRateSystem
Config *producer.ProducerConfig
configMapped *producer.ProducerConfigMapped
TemplateSystem templates.TemplateInterface
ctx context.Context
}
func NewStateNetFlow() *StateNetFlow {
return &StateNetFlow{
ctx: context.Background(),
samplinglock: &sync.RWMutex{},
sampling: make(map[string]producer.SamplingRateSystem),
}
}
func (s *StateNetFlow) DecodeFlow(msg interface{}) error {
@@ -76,18 +89,6 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error {
samplerAddress = samplerAddress.To4()
}
s.templateslock.RLock()
templates, ok := s.templates[key]
s.templateslock.RUnlock()
if !ok {
templates = &TemplateSystem{
templates: netflow.CreateTemplateSystem(),
key: key,
}
s.templateslock.Lock()
s.templates[key] = templates
s.templateslock.Unlock()
}
s.samplinglock.RLock()
sampling, ok := s.sampling[key]
s.samplinglock.RUnlock()
@@ -104,7 +105,7 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error {
}
timeTrackStart := time.Now()
msgDec, err := netflow.DecodeMessage(buf, templates)
msgDec, err := netflow.DecodeMessageContext(s.ctx, buf, key, s.TemplateSystem)
if err != nil {
switch err.(type) {
case *netflow.ErrorTemplateNotFound:
@@ -340,6 +341,7 @@ func (s *StateNetFlow) DecodeFlow(msg interface{}) error {
return nil
}
/*
func (s *StateNetFlow) ServeHTTPTemplates(w http.ResponseWriter, r *http.Request) {
tmp := make(map[string]map[uint16]map[uint32]map[uint16]interface{})
s.templateslock.RLock()
@@ -357,7 +359,7 @@ func (s *StateNetFlow) InitTemplates() {
s.templateslock = &sync.RWMutex{}
s.sampling = make(map[string]producer.SamplingRateSystem)
s.samplinglock = &sync.RWMutex{}
}
}*/
func (s *StateNetFlow) initConfig() {
s.configMapped = producer.NewProducerConfigMapped(s.Config)
@@ -367,7 +369,9 @@ func (s *StateNetFlow) FlowRoutine(workers int, addr string, port int, reuseport
if err := s.start(); err != nil {
return err
}
s.InitTemplates()
//s.InitTemplates()
s.initConfig()
return UDPStoppableRoutine(s.stopCh, "NetFlow", s.DecodeFlow, workers, addr, port, reuseport, s.Logger)
}
// FlowRoutineCtx?

View File

@@ -20,6 +20,10 @@ type StateNFLegacy struct {
Logger Logger
}
func NewStateNFLegacy() *StateNFLegacy {
return &StateNFLegacy{}
}
func (s *StateNFLegacy) DecodeFlow(msg interface{}) error {
pkt := msg.(BaseMessage)
buf := bytes.NewBuffer(pkt.Payload)

View File

@@ -24,6 +24,10 @@ type StateSFlow struct {
configMapped *producer.ProducerConfigMapped
}
func NewStateSFlow() *StateSFlow {
return &StateSFlow{}
}
func (s *StateSFlow) DecodeFlow(msg interface{}) error {
pkt := msg.(BaseMessage)
buf := bytes.NewBuffer(pkt.Payload)