decoders: port reader changes from v1 to v2 (#204)

This commit is contained in:
Louis
2023-09-04 03:19:41 -07:00
committed by GitHub
parent cfed6da9c1
commit 34a0c1618e
6 changed files with 423 additions and 37 deletions

View File

@@ -42,7 +42,11 @@ func DecodeNFv9OptionsTemplateSet(payload *bytes.Buffer) ([]NFv9OptionsTemplateR
var err error
for payload.Len() >= 4 {
optsTemplateRecord := NFv9OptionsTemplateRecord{}
err = utils.BinaryDecoder(payload, &optsTemplateRecord.TemplateId, &optsTemplateRecord.ScopeLength, &optsTemplateRecord.OptionLength)
err = utils.BinaryDecoder(payload,
&optsTemplateRecord.TemplateId,
&optsTemplateRecord.ScopeLength,
&optsTemplateRecord.OptionLength,
)
if err != nil {
return records, err
}
@@ -80,12 +84,17 @@ func DecodeNFv9OptionsTemplateSet(payload *bytes.Buffer) ([]NFv9OptionsTemplateR
}
func DecodeField(payload *bytes.Buffer, field *Field, pen bool) error {
if err := utils.BinaryDecoder(payload, &field.Type, &field.Length); err != nil {
if err := utils.BinaryDecoder(payload,
&field.Type,
&field.Length,
); err != nil {
return err
}
if pen && field.Type&0x8000 != 0 {
field.PenProvided = true
return utils.BinaryDecoder(payload, &field.Pen)
return utils.BinaryDecoder(payload,
&field.Pen,
)
}
return nil
}
@@ -138,7 +147,10 @@ func DecodeTemplateSet(version uint16, payload *bytes.Buffer) ([]TemplateRecord,
var err error
for payload.Len() >= 4 {
templateRecord := TemplateRecord{}
err = utils.BinaryDecoder(payload, &templateRecord.TemplateId, &templateRecord.FieldCount)
err = utils.BinaryDecoder(payload,
&templateRecord.TemplateId,
&templateRecord.FieldCount,
)
if err != nil {
return records, fmt.Errorf("TemplateSet: reading header [%w]", err)
}
@@ -150,13 +162,18 @@ func DecodeTemplateSet(version uint16, payload *bytes.Buffer) ([]TemplateRecord,
fields := make([]Field, int(templateRecord.FieldCount)) // max 65532 which would be 589KB
for i := 0; i < int(templateRecord.FieldCount); i++ {
field := Field{}
if err := utils.BinaryDecoder(payload, &field.Type, &field.Length); err != nil {
if err := utils.BinaryDecoder(payload,
&field.Type,
&field.Length,
); err != nil {
return records, fmt.Errorf("TemplateSet: reading field [%w]", err)
}
if version == 10 && field.Type&0x8000 != 0 {
field.PenProvided = true
field.Type = field.Type ^ 0x8000
if err := utils.BinaryDecoder(payload, &field.Pen); err != nil {
if err := utils.BinaryDecoder(payload,
&field.Pen,
); err != nil {
return records, fmt.Errorf("TemplateSet: reading enterprise field [%w]", err)
}
}
@@ -190,11 +207,15 @@ func DecodeDataSetUsingFields(version uint16, payload *bytes.Buffer, listFields
if templateField.Length == 0xffff {
var variableLen8 byte
var variableLen16 uint16
if err := utils.BinaryDecoder(payload, &variableLen8); err != nil {
if err := utils.BinaryDecoder(payload,
&variableLen8,
); err != nil {
return nil, err
}
if variableLen8 == 0xff {
if err := utils.BinaryDecoder(payload, &variableLen16); err != nil {
if err := utils.BinaryDecoder(payload,
&variableLen16,
); err != nil {
return nil, err
}
finalLength = int(variableLen16)
@@ -266,7 +287,10 @@ func DecodeMessageCommon(payload *bytes.Buffer, templates NetFlowTemplateSystem,
for i := 0; ((i < int(size) && version == 9) || version == 10) && payload.Len() > 0; i++ {
fsheader := FlowSetHeader{}
if err := utils.BinaryDecoder(payload, &fsheader); err != nil {
if err := utils.BinaryDecoder(payload,
&fsheader.Id,
&fsheader.Length,
); err != nil {
return flowSet, fmt.Errorf("header [%w]", err)
}
@@ -418,7 +442,8 @@ func DecodeMessageNetFlow(payload *bytes.Buffer, templates NetFlowTemplateSystem
&packetNFv9.SystemUptime,
&packetNFv9.UnixSeconds,
&packetNFv9.SequenceNumber,
&packetNFv9.SourceId); err != nil {
&packetNFv9.SourceId,
); err != nil {
return &DecoderError{"NetFlowV9 header", err}
}
/*size = packetNFv9.Count
@@ -438,7 +463,8 @@ func DecodeMessageIPFIX(payload *bytes.Buffer, templates NetFlowTemplateSystem,
&packetIPFIX.Length,
&packetIPFIX.ExportTime,
&packetIPFIX.SequenceNumber,
&packetIPFIX.ObservationDomainId); err != nil {
&packetIPFIX.ObservationDomainId,
); err != nil {
return &DecoderError{"IPFIX header", err}
}
/*size = packetIPFIX.Length
@@ -455,7 +481,9 @@ func DecodeMessageIPFIX(payload *bytes.Buffer, templates NetFlowTemplateSystem,
func DecodeMessageVersion(payload *bytes.Buffer, templates NetFlowTemplateSystem, packetNFv9 *NFv9Packet, packetIPFIX *IPFIXPacket) error {
var version uint16
if err := utils.BinaryDecoder(payload, &version); err != nil {
if err := utils.BinaryDecoder(payload,
&version,
); err != nil {
return &DecoderError{"IPFIX/NetFlowV9 version", err}
}

View File

@@ -48,9 +48,35 @@ func DecodeMessage(payload *bytes.Buffer, packet *PacketNetFlowV5) error {
packet.Records = make([]RecordsNetFlowV5, int(packet.Count)) // maximum is 65535 which would be 3MB
for i := 0; i < int(packet.Count) && payload.Len() >= 48; i++ {
record := RecordsNetFlowV5{}
if err := utils.BinaryDecoder(payload, &record); err != nil {
var srcAddr, dstAddr, nextHop uint32
if err := utils.BinaryDecoder(payload,
&srcAddr,
&dstAddr,
&nextHop,
&record.Input,
&record.Output,
&record.DPkts,
&record.DOctets,
&record.First,
&record.Last,
&record.SrcPort,
&record.DstPort,
&record.Pad1,
&record.TCPFlags,
&record.Proto,
&record.Tos,
&record.SrcAS,
&record.DstAS,
&record.SrcMask,
&record.DstMask,
&record.Pad2,
); err != nil {
return &DecoderError{err}
}
record.SrcAddr = IPAddress(srcAddr)
record.DstAddr = IPAddress(dstAddr)
record.NextHop = IPAddress(nextHop)
packet.Records[i] = record
}

View File

@@ -70,7 +70,7 @@ func DecodeIP(payload *bytes.Buffer) (uint32, []byte, error) {
return ipVersion, ip, fmt.Errorf("DecodeIP: unknown IP version %d", ipVersion)
}
if payload.Len() >= len(ip) {
if err := utils.BinaryDecoder(payload, &ip); err != nil {
if err := utils.BinaryDecoder(payload, ip); err != nil {
return 0, nil, fmt.Errorf("DecodeIP: [%w]", err)
}
} else {
@@ -86,13 +86,47 @@ func DecodeCounterRecord(header *RecordHeader, payload *bytes.Buffer) (CounterRe
switch header.DataFormat {
case 1:
var ifCounters IfCounters
if err := utils.BinaryDecoder(payload, &ifCounters); err != nil {
if err := utils.BinaryDecoder(payload,
&ifCounters.IfIndex,
&ifCounters.IfType,
&ifCounters.IfSpeed,
&ifCounters.IfDirection,
&ifCounters.IfStatus,
&ifCounters.IfInOctets,
&ifCounters.IfInUcastPkts,
&ifCounters.IfInMulticastPkts,
&ifCounters.IfInBroadcastPkts,
&ifCounters.IfInDiscards,
&ifCounters.IfInErrors,
&ifCounters.IfInUnknownProtos,
&ifCounters.IfOutOctets,
&ifCounters.IfOutUcastPkts,
&ifCounters.IfOutMulticastPkts,
&ifCounters.IfOutBroadcastPkts,
&ifCounters.IfOutDiscards,
&ifCounters.IfOutErrors,
&ifCounters.IfPromiscuousMode,
); err != nil {
return counterRecord, &RecordError{header.DataFormat, err}
}
counterRecord.Data = ifCounters
case 2:
var ethernetCounters EthernetCounters
if err := utils.BinaryDecoder(payload, &ethernetCounters); err != nil {
if err := utils.BinaryDecoder(payload,
&ethernetCounters.Dot3StatsAlignmentErrors,
&ethernetCounters.Dot3StatsFCSErrors,
&ethernetCounters.Dot3StatsSingleCollisionFrames,
&ethernetCounters.Dot3StatsMultipleCollisionFrames,
&ethernetCounters.Dot3StatsSQETestErrors,
&ethernetCounters.Dot3StatsDeferredTransmissions,
&ethernetCounters.Dot3StatsLateCollisions,
&ethernetCounters.Dot3StatsExcessiveCollisions,
&ethernetCounters.Dot3StatsInternalMacTransmitErrors,
&ethernetCounters.Dot3StatsCarrierSenseErrors,
&ethernetCounters.Dot3StatsFrameTooLongs,
&ethernetCounters.Dot3StatsInternalMacReceiveErrors,
&ethernetCounters.Dot3StatsSymbolErrors,
); err != nil {
return counterRecord, &RecordError{header.DataFormat, err}
}
counterRecord.Data = ethernetCounters
@@ -113,7 +147,7 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
switch header.DataFormat {
case FORMAT_EXT_SWITCH:
extendedSwitch := ExtendedSwitch{}
err := utils.BinaryDecoder(payload, &extendedSwitch)
err := utils.BinaryDecoder(payload, &extendedSwitch.SrcVlan, &extendedSwitch.SrcPriority, &extendedSwitch.DstVlan, &extendedSwitch.DstPriority)
if err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
@@ -124,7 +158,8 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
&sampledHeader.Protocol,
&sampledHeader.FrameLength,
&sampledHeader.Stripped,
&sampledHeader.OriginalLength); err != nil {
&sampledHeader.OriginalLength,
); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
sampledHeader.HeaderData = payload.Bytes()
@@ -136,7 +171,16 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
DstIP: make([]byte, 4),
},
}
if err := utils.BinaryDecoder(payload, &sampledIP.SampledIPBase, &sampledIP.Tos); err != nil {
if err := utils.BinaryDecoder(payload,
&sampledIP.SampledIPBase.Length,
&sampledIP.SampledIPBase.Protocol,
sampledIP.SampledIPBase.SrcIP,
sampledIP.SampledIPBase.DstIP,
&sampledIP.SampledIPBase.SrcPort,
&sampledIP.SampledIPBase.DstPort,
&sampledIP.SampledIPBase.TcpFlags,
&sampledIP.Tos,
); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
flowRecord.Data = sampledIP
@@ -147,7 +191,16 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
DstIP: make([]byte, 16),
},
}
if err := utils.BinaryDecoder(payload, &sampledIP.SampledIPBase, &sampledIP.Priority); err != nil {
if err := utils.BinaryDecoder(payload,
&sampledIP.SampledIPBase.Length,
&sampledIP.SampledIPBase.Protocol,
sampledIP.SampledIPBase.SrcIP,
sampledIP.SampledIPBase.DstIP,
&sampledIP.SampledIPBase.SrcPort,
&sampledIP.SampledIPBase.DstPort,
&sampledIP.SampledIPBase.TcpFlags,
&sampledIP.Priority,
); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
flowRecord.Data = sampledIP
@@ -156,7 +209,10 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
if extendedRouter.NextHopIPVersion, extendedRouter.NextHop, err = DecodeIP(payload); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
if err := utils.BinaryDecoder(payload, &extendedRouter.SrcMaskLen, &extendedRouter.DstMaskLen); err != nil {
if err := utils.BinaryDecoder(payload,
&extendedRouter.SrcMaskLen,
&extendedRouter.DstMaskLen,
); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
flowRecord.Data = extendedRouter
@@ -165,13 +221,20 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
if extendedGateway.NextHopIPVersion, extendedGateway.NextHop, err = DecodeIP(payload); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
if err := utils.BinaryDecoder(payload, &extendedGateway.AS, &extendedGateway.SrcAS, &extendedGateway.SrcPeerAS,
&extendedGateway.ASDestinations); err != nil {
if err := utils.BinaryDecoder(payload,
&extendedGateway.AS,
&extendedGateway.SrcAS,
&extendedGateway.SrcPeerAS,
&extendedGateway.ASDestinations,
); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
var asPath []uint32
if extendedGateway.ASDestinations != 0 {
if err := utils.BinaryDecoder(payload, &extendedGateway.ASPathType, &extendedGateway.ASPathLength); err != nil {
if err := utils.BinaryDecoder(payload,
&extendedGateway.ASPathType,
&extendedGateway.ASPathLength,
); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
// protection for as-path length
@@ -190,7 +253,9 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord,
}
extendedGateway.ASPath = asPath
if err := utils.BinaryDecoder(payload, &extendedGateway.CommunitiesLength); err != nil {
if err := utils.BinaryDecoder(payload,
&extendedGateway.CommunitiesLength,
); err != nil {
return flowRecord, &RecordError{header.DataFormat, err}
}
// protection for communities length
@@ -224,7 +289,9 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
format := header.Format
var sample interface{}
if err := utils.BinaryDecoder(payload, &header.SampleSequenceNumber); err != nil {
if err := utils.BinaryDecoder(payload,
&header.SampleSequenceNumber,
); err != nil {
return sample, fmt.Errorf("header seq [%w]", err)
}
seq := header.SampleSequenceNumber
@@ -237,7 +304,10 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
header.SourceIdType = sourceId >> 24
header.SourceIdValue = sourceId & 0x00ffffff
} else if format == FORMAT_IPV4 || format == FORMAT_IPV6 {
if err := utils.BinaryDecoder(payload, &header.SourceIdType, &header.SourceIdValue); err != nil {
if err := utils.BinaryDecoder(payload,
&header.SourceIdType,
&header.SourceIdValue,
); err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("header source [%w]", err)}
}
} else {
@@ -252,8 +322,14 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
flowSample = FlowSample{
Header: *header,
}
if err := utils.BinaryDecoder(payload, &flowSample.SamplingRate, &flowSample.SamplePool,
&flowSample.Drops, &flowSample.Input, &flowSample.Output, &flowSample.FlowRecordsCount); err != nil {
if err := utils.BinaryDecoder(payload,
&flowSample.SamplingRate,
&flowSample.SamplePool,
&flowSample.Drops,
&flowSample.Input,
&flowSample.Output,
&flowSample.FlowRecordsCount,
); err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("raw [%w]", err)}
}
recordsCount = flowSample.FlowRecordsCount
@@ -287,7 +363,8 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
&expandedFlowSample.InputIfValue,
&expandedFlowSample.OutputIfFormat,
&expandedFlowSample.OutputIfValue,
&expandedFlowSample.FlowRecordsCount); err != nil {
&expandedFlowSample.FlowRecordsCount,
); err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("IPv4 [%w]", err)}
}
recordsCount = expandedFlowSample.FlowRecordsCount
@@ -296,7 +373,10 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err
}
for i := 0; i < int(recordsCount) && payload.Len() >= 8; i++ {
recordHeader := RecordHeader{}
if err := utils.BinaryDecoder(payload, &recordHeader.DataFormat, &recordHeader.Length); err != nil {
if err := utils.BinaryDecoder(payload,
&recordHeader.DataFormat,
&recordHeader.Length,
); err != nil {
return sample, &FlowError{format, seq, fmt.Errorf("record header [%w]", err)}
}
if int(recordHeader.Length) > payload.Len() {
@@ -361,7 +441,8 @@ func DecodeMessage(payload *bytes.Buffer, packetV5 *Packet) error {
&packetV5.SubAgentId,
&packetV5.SequenceNumber,
&packetV5.Uptime,
&packetV5.SamplesCount); err != nil {
&packetV5.SamplesCount,
); err != nil {
return &DecoderError{err}
}
if packetV5.SamplesCount > 1000 {

View File

@@ -1,16 +1,128 @@
package utils
import (
"bytes"
"encoding/binary"
"errors"
"io"
"reflect"
)
func BinaryDecoder(payload io.Reader, dests ...interface{}) error {
type BytesBuffer interface {
io.Reader
Next(int) []byte
}
func BinaryDecoder(payload *bytes.Buffer, dests ...interface{}) error {
for _, dest := range dests {
err := binary.Read(payload, binary.BigEndian, dest)
err := BinaryRead(payload, binary.BigEndian, dest)
if err != nil {
return err
}
}
return nil
}
func BinaryRead(payload BytesBuffer, order binary.ByteOrder, data any) error {
// Fast path for basic types and slices.
if n := intDataSize(data); n != 0 {
bs := payload.Next(n)
if len(bs) < n {
return io.ErrUnexpectedEOF
}
switch data := data.(type) {
case *bool:
*data = bs[0] != 0
case *int8:
*data = int8(bs[0])
case *uint8:
*data = bs[0]
case *int16:
*data = int16(order.Uint16(bs))
case *uint16:
*data = order.Uint16(bs)
case *int32:
*data = int32(order.Uint32(bs))
case *uint32:
*data = order.Uint32(bs)
case *int64:
*data = int64(order.Uint64(bs))
case *uint64:
*data = order.Uint64(bs)
case []bool:
for i, x := range bs { // Easier to loop over the input for 8-bit values.
data[i] = x != 0
}
case []int8:
for i, x := range bs {
data[i] = int8(x)
}
case []uint8:
copy(data, bs)
case []int16:
for i := range data {
data[i] = int16(order.Uint16(bs[2*i:]))
}
case []uint16:
for i := range data {
data[i] = order.Uint16(bs[2*i:])
}
case []int32:
for i := range data {
data[i] = int32(order.Uint32(bs[4*i:]))
}
case []uint32:
for i := range data {
data[i] = order.Uint32(bs[4*i:])
}
case []int64:
for i := range data {
data[i] = int64(order.Uint64(bs[8*i:]))
}
case []uint64:
for i := range data {
data[i] = order.Uint64(bs[8*i:])
}
default:
n = 0 // fast path doesn't apply
}
if n != 0 {
return nil
}
}
return errors.New("binary.Read: invalid type " + reflect.TypeOf(data).String())
}
// intDataSize returns the size of the data required to represent the data when encoded.
// It returns zero if the type cannot be implemented by the fast path in Read or Write.
func intDataSize(data any) int {
switch data := data.(type) {
case bool, int8, uint8, *bool, *int8, *uint8:
return 1
case []bool:
return len(data)
case []int8:
return len(data)
case []uint8:
return len(data)
case int16, uint16, *int16, *uint16:
return 2
case []int16:
return 2 * len(data)
case []uint16:
return 2 * len(data)
case int32, uint32, *int32, *uint32:
return 4
case []int32:
return 4 * len(data)
case []uint32:
return 4 * len(data)
case int64, uint64, *int64, *uint64:
return 8
case []int64:
return 8 * len(data)
case []uint64:
return 8 * len(data)
}
return 0
}

View File

@@ -0,0 +1,138 @@
package utils
import (
"encoding/binary"
"io"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func testBinaryRead(buf BytesBuffer, data any) error {
order := binary.BigEndian
return BinaryRead(buf, order, data)
}
func testBinaryReadComparison(buf BytesBuffer, data any) error {
order := binary.BigEndian
return binary.Read(buf, order, data)
}
type benchFct func(buf BytesBuffer, data any) error
func TestBinaryReadInteger(t *testing.T) {
buf := newTestBuf([]byte{1, 2, 3, 4})
var dest uint32
err := testBinaryRead(buf, &dest)
require.NoError(t, err)
assert.Equal(t, uint32(0x1020304), dest)
}
func TestBinaryReadBytes(t *testing.T) {
buf := newTestBuf([]byte{1, 2, 3, 4})
dest := make([]byte, 4)
err := testBinaryRead(buf, dest)
require.NoError(t, err)
}
func TestBinaryReadUints(t *testing.T) {
buf := newTestBuf([]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4})
dest := make([]uint32, 4)
err := testBinaryRead(buf, dest)
require.NoError(t, err)
assert.Equal(t, uint32(0x1020304), dest[0])
}
type testBuf struct {
buf []byte
off int
}
func newTestBuf(data []byte) *testBuf {
return &testBuf{
buf: data,
}
}
func (b *testBuf) Next(n int) []byte {
if n > len(b.buf) {
return b.buf
}
return b.buf[0:n]
}
func (b *testBuf) Reset() {
b.off = 0
}
func (b *testBuf) Read(p []byte) (int, error) {
if len(b.buf) == 0 || b.off >= len(b.buf) {
return 0, io.EOF
}
n := copy(p, b.buf[b.off:])
b.off += n
return n, nil
}
func benchBinaryRead(b *testing.B, buf *testBuf, dest any, cmp bool) {
var fct benchFct
if cmp {
fct = testBinaryReadComparison
} else {
fct = testBinaryRead
}
for n := 0; n < b.N; n++ {
fct(buf, dest)
buf.Reset()
}
}
func BenchmarkBinaryReadIntegerBase(b *testing.B) {
buf := newTestBuf([]byte{1, 2, 3, 4})
var dest uint32
benchBinaryRead(b, buf, &dest, false)
}
func BenchmarkBinaryReadIntegerComparison(b *testing.B) {
buf := newTestBuf([]byte{1, 2, 3, 4})
var dest uint32
benchBinaryRead(b, buf, &dest, true)
}
func BenchmarkBinaryReadByteBase(b *testing.B) {
buf := newTestBuf([]byte{1, 2, 3, 4})
var dest byte
benchBinaryRead(b, buf, &dest, false)
}
func BBenchmarkBinaryReadByteComparison(b *testing.B) {
buf := newTestBuf([]byte{1, 2, 3, 4})
var dest byte
benchBinaryRead(b, buf, &dest, true)
}
func BenchmarkBinaryReadBytesBase(b *testing.B) {
buf := newTestBuf([]byte{1, 2, 3, 4})
dest := make([]byte, 4)
benchBinaryRead(b, buf, dest, false)
}
func BenchmarkBinaryReadBytesComparison(b *testing.B) {
buf := newTestBuf([]byte{1, 2, 3, 4})
dest := make([]byte, 4)
benchBinaryRead(b, buf, dest, true)
}
func BenchmarkBinaryReadUintsBase(b *testing.B) {
buf := newTestBuf([]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4})
dest := make([]uint32, 4)
benchBinaryRead(b, buf, dest, false)
}
func BenchmarkBinaryReadUintsComparison(b *testing.B) {
buf := newTestBuf([]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4})
dest := make([]uint32, 4)
benchBinaryRead(b, buf, dest, true)
}

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/netsampler/goflow2/v2/decoders/netflow"
"github.com/netsampler/goflow2/v2/decoders/utils"
flowmessage "github.com/netsampler/goflow2/v2/pb"
"github.com/netsampler/goflow2/v2/producer"
)
@@ -77,20 +78,20 @@ func NetFlowPopulate(dataFields []netflow.DataField, typeId uint16, addr interfa
exists, value := NetFlowLookFor(dataFields, typeId)
if exists && value != nil {
valueBytes, ok := value.([]byte)
valueReader := bytes.NewReader(valueBytes)
valueReader := bytes.NewBuffer(valueBytes)
if ok {
switch addrt := addr.(type) {
//case *(net.IP):
// *addrt = valueBytes
case *(time.Time):
t := uint64(0)
if err := binary.Read(valueReader, binary.BigEndian, &t); err != nil {
if err := utils.BinaryRead(valueReader, binary.BigEndian, &t); err != nil {
return false, err
}
t64 := int64(t / 1000)
*addrt = time.Unix(t64, 0)
default:
if err := binary.Read(valueReader, binary.BigEndian, addr); err != nil {
if err := utils.BinaryRead(valueReader, binary.BigEndian, addr); err != nil {
return false, err
}
}