mirror of
https://github.com/openobserve/goflow2.git
synced 2025-11-03 05:23:32 +00:00
decoder: fix netflow flowsets decoding (#218)
* decoder was only reading the first FlowSet of those messages, resulting in unaccounted flows Co-authored-by: lspgn <lspgn@users.noreply.github.com>
This commit is contained in:
@@ -282,157 +282,163 @@ func DecodeDataSet(version uint16, payload *bytes.Buffer, listFields []Field) ([
|
|||||||
return records, nil
|
return records, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func DecodeMessageCommon(payload *bytes.Buffer, templates NetFlowTemplateSystem, obsDomainId uint32, size, version uint16) (interface{}, error) {
|
func DecodeMessageCommon(payload *bytes.Buffer, templates NetFlowTemplateSystem, obsDomainId uint32, size, version uint16) (flowSets []interface{}, err error) {
|
||||||
var flowSet interface{}
|
|
||||||
|
|
||||||
for i := 0; ((i < int(size) && version == 9) || version == 10) && payload.Len() > 0; i++ {
|
for i := 0; ((i < int(size) && version == 9) || version == 10) && payload.Len() > 0; i++ {
|
||||||
fsheader := FlowSetHeader{}
|
if flowSet, err := DecodeMessageCommonFlowSet(payload, templates, obsDomainId, version); err != nil {
|
||||||
if err := utils.BinaryDecoder(payload,
|
return flowSets, err
|
||||||
&fsheader.Id,
|
|
||||||
&fsheader.Length,
|
|
||||||
); err != nil {
|
|
||||||
return flowSet, fmt.Errorf("header [%w]", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
nextrelpos := int(fsheader.Length) - binary.Size(fsheader)
|
|
||||||
if nextrelpos < 0 {
|
|
||||||
return flowSet, fmt.Errorf("negative length")
|
|
||||||
}
|
|
||||||
|
|
||||||
if fsheader.Id == 0 && version == 9 {
|
|
||||||
templateReader := bytes.NewBuffer(payload.Next(nextrelpos))
|
|
||||||
records, err := DecodeTemplateSet(version, templateReader)
|
|
||||||
if err != nil {
|
|
||||||
return flowSet, &FlowError{version, "FlowSet", obsDomainId, fsheader.Id, err}
|
|
||||||
}
|
|
||||||
templatefs := TemplateFlowSet{
|
|
||||||
FlowSetHeader: fsheader,
|
|
||||||
Records: records,
|
|
||||||
}
|
|
||||||
|
|
||||||
flowSet = templatefs
|
|
||||||
|
|
||||||
if templates != nil {
|
|
||||||
for _, record := range records {
|
|
||||||
if err := templates.AddTemplate(version, obsDomainId, record.TemplateId, record); err != nil {
|
|
||||||
return flowSet, &FlowError{version, "FlowSet", obsDomainId, fsheader.Id, err}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if fsheader.Id == 1 && version == 9 {
|
|
||||||
templateReader := bytes.NewBuffer(payload.Next(nextrelpos))
|
|
||||||
records, err := DecodeNFv9OptionsTemplateSet(templateReader)
|
|
||||||
if err != nil {
|
|
||||||
return flowSet, &FlowError{version, "NetFlow OptionsTemplateSet", obsDomainId, fsheader.Id, err}
|
|
||||||
}
|
|
||||||
optsTemplatefs := NFv9OptionsTemplateFlowSet{
|
|
||||||
FlowSetHeader: fsheader,
|
|
||||||
Records: records,
|
|
||||||
}
|
|
||||||
flowSet = optsTemplatefs
|
|
||||||
|
|
||||||
if templates != nil {
|
|
||||||
for _, record := range records {
|
|
||||||
if err := templates.AddTemplate(version, obsDomainId, record.TemplateId, record); err != nil {
|
|
||||||
return flowSet, &FlowError{version, "OptionsTemplateSet", obsDomainId, fsheader.Id, err}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if fsheader.Id == 2 && version == 10 {
|
|
||||||
templateReader := bytes.NewBuffer(payload.Next(nextrelpos))
|
|
||||||
records, err := DecodeTemplateSet(version, templateReader)
|
|
||||||
if err != nil {
|
|
||||||
return flowSet, &FlowError{version, "IPFIX TemplateSet", obsDomainId, fsheader.Id, err}
|
|
||||||
}
|
|
||||||
templatefs := TemplateFlowSet{
|
|
||||||
FlowSetHeader: fsheader,
|
|
||||||
Records: records,
|
|
||||||
}
|
|
||||||
flowSet = templatefs
|
|
||||||
|
|
||||||
if templates != nil {
|
|
||||||
for _, record := range records {
|
|
||||||
if err := templates.AddTemplate(version, obsDomainId, record.TemplateId, record); err != nil {
|
|
||||||
return flowSet, &FlowError{version, "IPFIX TemplateSet", obsDomainId, fsheader.Id, err}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if fsheader.Id == 3 && version == 10 {
|
|
||||||
templateReader := bytes.NewBuffer(payload.Next(nextrelpos))
|
|
||||||
records, err := DecodeIPFIXOptionsTemplateSet(templateReader)
|
|
||||||
if err != nil {
|
|
||||||
return flowSet, &FlowError{version, "IPFIX OptionsTemplateSet", obsDomainId, fsheader.Id, err}
|
|
||||||
}
|
|
||||||
optsTemplatefs := IPFIXOptionsTemplateFlowSet{
|
|
||||||
FlowSetHeader: fsheader,
|
|
||||||
Records: records,
|
|
||||||
}
|
|
||||||
flowSet = optsTemplatefs
|
|
||||||
|
|
||||||
if templates != nil {
|
|
||||||
for _, record := range records {
|
|
||||||
if err := templates.AddTemplate(version, obsDomainId, record.TemplateId, record); err != nil {
|
|
||||||
return flowSet, &FlowError{version, "IPFIX OptionsTemplateSet", obsDomainId, fsheader.Id, err}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if fsheader.Id >= 256 {
|
|
||||||
dataReader := bytes.NewBuffer(payload.Next(nextrelpos))
|
|
||||||
|
|
||||||
if templates == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
template, err := templates.GetTemplate(version, obsDomainId, fsheader.Id)
|
|
||||||
if err != nil {
|
|
||||||
return flowSet, &FlowError{version, "Decode", obsDomainId, fsheader.Id, err}
|
|
||||||
}
|
|
||||||
|
|
||||||
switch templatec := template.(type) {
|
|
||||||
case TemplateRecord:
|
|
||||||
records, err := DecodeDataSet(version, dataReader, templatec.Fields)
|
|
||||||
if err != nil {
|
|
||||||
return flowSet, &FlowError{version, "DataSet", obsDomainId, fsheader.Id, err}
|
|
||||||
}
|
|
||||||
datafs := DataFlowSet{
|
|
||||||
FlowSetHeader: fsheader,
|
|
||||||
Records: records,
|
|
||||||
}
|
|
||||||
flowSet = datafs
|
|
||||||
case IPFIXOptionsTemplateRecord:
|
|
||||||
records, err := DecodeOptionsDataSet(version, dataReader, templatec.Scopes, templatec.Options)
|
|
||||||
if err != nil {
|
|
||||||
return flowSet, &FlowError{version, "DataSet", obsDomainId, fsheader.Id, err}
|
|
||||||
}
|
|
||||||
|
|
||||||
datafs := OptionsDataFlowSet{
|
|
||||||
FlowSetHeader: fsheader,
|
|
||||||
Records: records,
|
|
||||||
}
|
|
||||||
flowSet = datafs
|
|
||||||
case NFv9OptionsTemplateRecord:
|
|
||||||
records, err := DecodeOptionsDataSet(version, dataReader, templatec.Scopes, templatec.Options)
|
|
||||||
if err != nil {
|
|
||||||
return flowSet, &FlowError{version, "OptionDataSet", obsDomainId, fsheader.Id, err}
|
|
||||||
}
|
|
||||||
|
|
||||||
datafs := OptionsDataFlowSet{
|
|
||||||
FlowSetHeader: fsheader,
|
|
||||||
Records: records,
|
|
||||||
}
|
|
||||||
flowSet = datafs
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
return flowSet, &FlowError{version, "Decode", obsDomainId, fsheader.Id, fmt.Errorf("ID error")}
|
flowSets = append(flowSets, flowSet)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return flowSets, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func DecodeMessageCommonFlowSet(payload *bytes.Buffer, templates NetFlowTemplateSystem, obsDomainId uint32, version uint16) (flowSet interface{}, err error) {
|
||||||
|
fsheader := FlowSetHeader{}
|
||||||
|
if err := utils.BinaryDecoder(payload,
|
||||||
|
&fsheader.Id,
|
||||||
|
&fsheader.Length,
|
||||||
|
); err != nil {
|
||||||
|
return flowSet, fmt.Errorf("header [%w]", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
nextrelpos := int(fsheader.Length) - binary.Size(fsheader)
|
||||||
|
if nextrelpos < 0 {
|
||||||
|
return flowSet, fmt.Errorf("negative length")
|
||||||
|
}
|
||||||
|
|
||||||
|
if fsheader.Id == 0 && version == 9 {
|
||||||
|
templateReader := bytes.NewBuffer(payload.Next(nextrelpos))
|
||||||
|
records, err := DecodeTemplateSet(version, templateReader)
|
||||||
|
if err != nil {
|
||||||
|
return flowSet, &FlowError{version, "FlowSet", obsDomainId, fsheader.Id, err}
|
||||||
|
}
|
||||||
|
templatefs := TemplateFlowSet{
|
||||||
|
FlowSetHeader: fsheader,
|
||||||
|
Records: records,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
flowSet = templatefs
|
||||||
|
|
||||||
|
if templates != nil {
|
||||||
|
for _, record := range records {
|
||||||
|
if err := templates.AddTemplate(version, obsDomainId, record.TemplateId, record); err != nil {
|
||||||
|
return flowSet, &FlowError{version, "FlowSet", obsDomainId, fsheader.Id, err}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if fsheader.Id == 1 && version == 9 {
|
||||||
|
templateReader := bytes.NewBuffer(payload.Next(nextrelpos))
|
||||||
|
records, err := DecodeNFv9OptionsTemplateSet(templateReader)
|
||||||
|
if err != nil {
|
||||||
|
return flowSet, &FlowError{version, "NetFlow OptionsTemplateSet", obsDomainId, fsheader.Id, err}
|
||||||
|
}
|
||||||
|
optsTemplatefs := NFv9OptionsTemplateFlowSet{
|
||||||
|
FlowSetHeader: fsheader,
|
||||||
|
Records: records,
|
||||||
|
}
|
||||||
|
flowSet = optsTemplatefs
|
||||||
|
|
||||||
|
if templates != nil {
|
||||||
|
for _, record := range records {
|
||||||
|
if err := templates.AddTemplate(version, obsDomainId, record.TemplateId, record); err != nil {
|
||||||
|
return flowSet, &FlowError{version, "OptionsTemplateSet", obsDomainId, fsheader.Id, err}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if fsheader.Id == 2 && version == 10 {
|
||||||
|
templateReader := bytes.NewBuffer(payload.Next(nextrelpos))
|
||||||
|
records, err := DecodeTemplateSet(version, templateReader)
|
||||||
|
if err != nil {
|
||||||
|
return flowSet, &FlowError{version, "IPFIX TemplateSet", obsDomainId, fsheader.Id, err}
|
||||||
|
}
|
||||||
|
templatefs := TemplateFlowSet{
|
||||||
|
FlowSetHeader: fsheader,
|
||||||
|
Records: records,
|
||||||
|
}
|
||||||
|
flowSet = templatefs
|
||||||
|
|
||||||
|
if templates != nil {
|
||||||
|
for _, record := range records {
|
||||||
|
if err := templates.AddTemplate(version, obsDomainId, record.TemplateId, record); err != nil {
|
||||||
|
return flowSet, &FlowError{version, "IPFIX TemplateSet", obsDomainId, fsheader.Id, err}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if fsheader.Id == 3 && version == 10 {
|
||||||
|
templateReader := bytes.NewBuffer(payload.Next(nextrelpos))
|
||||||
|
records, err := DecodeIPFIXOptionsTemplateSet(templateReader)
|
||||||
|
if err != nil {
|
||||||
|
return flowSet, &FlowError{version, "IPFIX OptionsTemplateSet", obsDomainId, fsheader.Id, err}
|
||||||
|
}
|
||||||
|
optsTemplatefs := IPFIXOptionsTemplateFlowSet{
|
||||||
|
FlowSetHeader: fsheader,
|
||||||
|
Records: records,
|
||||||
|
}
|
||||||
|
flowSet = optsTemplatefs
|
||||||
|
|
||||||
|
if templates != nil {
|
||||||
|
for _, record := range records {
|
||||||
|
if err := templates.AddTemplate(version, obsDomainId, record.TemplateId, record); err != nil {
|
||||||
|
return flowSet, &FlowError{version, "IPFIX OptionsTemplateSet", obsDomainId, fsheader.Id, err}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if fsheader.Id >= 256 {
|
||||||
|
dataReader := bytes.NewBuffer(payload.Next(nextrelpos))
|
||||||
|
|
||||||
|
if templates == nil {
|
||||||
|
return flowSet, &FlowError{version, "Templates", obsDomainId, fsheader.Id, fmt.Errorf("No templates")}
|
||||||
|
}
|
||||||
|
|
||||||
|
template, err := templates.GetTemplate(version, obsDomainId, fsheader.Id)
|
||||||
|
if err != nil {
|
||||||
|
return flowSet, &FlowError{version, "Decode", obsDomainId, fsheader.Id, err}
|
||||||
|
}
|
||||||
|
|
||||||
|
switch templatec := template.(type) {
|
||||||
|
case TemplateRecord:
|
||||||
|
records, err := DecodeDataSet(version, dataReader, templatec.Fields)
|
||||||
|
if err != nil {
|
||||||
|
return flowSet, &FlowError{version, "DataSet", obsDomainId, fsheader.Id, err}
|
||||||
|
}
|
||||||
|
datafs := DataFlowSet{
|
||||||
|
FlowSetHeader: fsheader,
|
||||||
|
Records: records,
|
||||||
|
}
|
||||||
|
flowSet = datafs
|
||||||
|
case IPFIXOptionsTemplateRecord:
|
||||||
|
records, err := DecodeOptionsDataSet(version, dataReader, templatec.Scopes, templatec.Options)
|
||||||
|
if err != nil {
|
||||||
|
return flowSet, &FlowError{version, "DataSet", obsDomainId, fsheader.Id, err}
|
||||||
|
}
|
||||||
|
|
||||||
|
datafs := OptionsDataFlowSet{
|
||||||
|
FlowSetHeader: fsheader,
|
||||||
|
Records: records,
|
||||||
|
}
|
||||||
|
flowSet = datafs
|
||||||
|
case NFv9OptionsTemplateRecord:
|
||||||
|
records, err := DecodeOptionsDataSet(version, dataReader, templatec.Scopes, templatec.Options)
|
||||||
|
if err != nil {
|
||||||
|
return flowSet, &FlowError{version, "OptionDataSet", obsDomainId, fsheader.Id, err}
|
||||||
|
}
|
||||||
|
|
||||||
|
datafs := OptionsDataFlowSet{
|
||||||
|
FlowSetHeader: fsheader,
|
||||||
|
Records: records,
|
||||||
|
}
|
||||||
|
flowSet = datafs
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
return flowSet, &FlowError{version, "Decode", obsDomainId, fsheader.Id, fmt.Errorf("ID error")}
|
||||||
}
|
}
|
||||||
return flowSet, nil
|
return flowSet, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func DecodeMessageNetFlow(payload *bytes.Buffer, templates NetFlowTemplateSystem, packetNFv9 *NFv9Packet) error {
|
func DecodeMessageNetFlow(payload *bytes.Buffer, templates NetFlowTemplateSystem, packetNFv9 *NFv9Packet) error {
|
||||||
@@ -449,8 +455,8 @@ func DecodeMessageNetFlow(payload *bytes.Buffer, templates NetFlowTemplateSystem
|
|||||||
/*size = packetNFv9.Count
|
/*size = packetNFv9.Count
|
||||||
packetNFv9.Version = version
|
packetNFv9.Version = version
|
||||||
obsDomainId = packetNFv9.SourceId*/
|
obsDomainId = packetNFv9.SourceId*/
|
||||||
flowSet, err := DecodeMessageCommon(payload, templates, packetNFv9.SourceId, packetNFv9.Count, 9)
|
flowSets, err := DecodeMessageCommon(payload, templates, packetNFv9.SourceId, packetNFv9.Count, 9)
|
||||||
packetNFv9.FlowSets = append(packetNFv9.FlowSets, flowSet)
|
packetNFv9.FlowSets = flowSets
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &DecoderError{"NetFlowV9", err}
|
return &DecoderError{"NetFlowV9", err}
|
||||||
}
|
}
|
||||||
@@ -470,8 +476,8 @@ func DecodeMessageIPFIX(payload *bytes.Buffer, templates NetFlowTemplateSystem,
|
|||||||
/*size = packetIPFIX.Length
|
/*size = packetIPFIX.Length
|
||||||
packetIPFIX.Version = version
|
packetIPFIX.Version = version
|
||||||
obsDomainId = packetIPFIX.ObservationDomainId*/
|
obsDomainId = packetIPFIX.ObservationDomainId*/
|
||||||
flowSet, err := DecodeMessageCommon(payload, templates, packetIPFIX.ObservationDomainId, packetIPFIX.Length, 10)
|
flowSets, err := DecodeMessageCommon(payload, templates, packetIPFIX.ObservationDomainId, packetIPFIX.Length, 10)
|
||||||
packetIPFIX.FlowSets = append(packetIPFIX.FlowSets, flowSet)
|
packetIPFIX.FlowSets = flowSets
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &DecoderError{"IPFIX", err}
|
return &DecoderError{"IPFIX", err}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user