From 34a0c1618e6fcf0d509c4553b3236b4aa85c71ca Mon Sep 17 00:00:00 2001 From: Louis Date: Mon, 4 Sep 2023 03:19:41 -0700 Subject: [PATCH] decoders: port reader changes from v1 to v2 (#204) --- decoders/netflow/netflow.go | 52 ++++++++--- decoders/netflowlegacy/netflow.go | 28 +++++- decoders/sflow/sflow.go | 119 ++++++++++++++++++++++---- decoders/utils/utils.go | 116 ++++++++++++++++++++++++- decoders/utils/utils_test.go | 138 ++++++++++++++++++++++++++++++ producer/proto/producer_nf.go | 7 +- 6 files changed, 423 insertions(+), 37 deletions(-) create mode 100644 decoders/utils/utils_test.go diff --git a/decoders/netflow/netflow.go b/decoders/netflow/netflow.go index d7e351e..ee800b5 100644 --- a/decoders/netflow/netflow.go +++ b/decoders/netflow/netflow.go @@ -42,7 +42,11 @@ func DecodeNFv9OptionsTemplateSet(payload *bytes.Buffer) ([]NFv9OptionsTemplateR var err error for payload.Len() >= 4 { optsTemplateRecord := NFv9OptionsTemplateRecord{} - err = utils.BinaryDecoder(payload, &optsTemplateRecord.TemplateId, &optsTemplateRecord.ScopeLength, &optsTemplateRecord.OptionLength) + err = utils.BinaryDecoder(payload, + &optsTemplateRecord.TemplateId, + &optsTemplateRecord.ScopeLength, + &optsTemplateRecord.OptionLength, + ) if err != nil { return records, err } @@ -80,12 +84,17 @@ func DecodeNFv9OptionsTemplateSet(payload *bytes.Buffer) ([]NFv9OptionsTemplateR } func DecodeField(payload *bytes.Buffer, field *Field, pen bool) error { - if err := utils.BinaryDecoder(payload, &field.Type, &field.Length); err != nil { + if err := utils.BinaryDecoder(payload, + &field.Type, + &field.Length, + ); err != nil { return err } if pen && field.Type&0x8000 != 0 { field.PenProvided = true - return utils.BinaryDecoder(payload, &field.Pen) + return utils.BinaryDecoder(payload, + &field.Pen, + ) } return nil } @@ -138,7 +147,10 @@ func DecodeTemplateSet(version uint16, payload *bytes.Buffer) ([]TemplateRecord, var err error for payload.Len() >= 4 { templateRecord := TemplateRecord{} - err = utils.BinaryDecoder(payload, &templateRecord.TemplateId, &templateRecord.FieldCount) + err = utils.BinaryDecoder(payload, + &templateRecord.TemplateId, + &templateRecord.FieldCount, + ) if err != nil { return records, fmt.Errorf("TemplateSet: reading header [%w]", err) } @@ -150,13 +162,18 @@ func DecodeTemplateSet(version uint16, payload *bytes.Buffer) ([]TemplateRecord, fields := make([]Field, int(templateRecord.FieldCount)) // max 65532 which would be 589KB for i := 0; i < int(templateRecord.FieldCount); i++ { field := Field{} - if err := utils.BinaryDecoder(payload, &field.Type, &field.Length); err != nil { + if err := utils.BinaryDecoder(payload, + &field.Type, + &field.Length, + ); err != nil { return records, fmt.Errorf("TemplateSet: reading field [%w]", err) } if version == 10 && field.Type&0x8000 != 0 { field.PenProvided = true field.Type = field.Type ^ 0x8000 - if err := utils.BinaryDecoder(payload, &field.Pen); err != nil { + if err := utils.BinaryDecoder(payload, + &field.Pen, + ); err != nil { return records, fmt.Errorf("TemplateSet: reading enterprise field [%w]", err) } } @@ -190,11 +207,15 @@ func DecodeDataSetUsingFields(version uint16, payload *bytes.Buffer, listFields if templateField.Length == 0xffff { var variableLen8 byte var variableLen16 uint16 - if err := utils.BinaryDecoder(payload, &variableLen8); err != nil { + if err := utils.BinaryDecoder(payload, + &variableLen8, + ); err != nil { return nil, err } if variableLen8 == 0xff { - if err := utils.BinaryDecoder(payload, &variableLen16); err != nil { + if err := utils.BinaryDecoder(payload, + &variableLen16, + ); err != nil { return nil, err } finalLength = int(variableLen16) @@ -266,7 +287,10 @@ func DecodeMessageCommon(payload *bytes.Buffer, templates NetFlowTemplateSystem, for i := 0; ((i < int(size) && version == 9) || version == 10) && payload.Len() > 0; i++ { fsheader := FlowSetHeader{} - if err := utils.BinaryDecoder(payload, &fsheader); err != nil { + if err := utils.BinaryDecoder(payload, + &fsheader.Id, + &fsheader.Length, + ); err != nil { return flowSet, fmt.Errorf("header [%w]", err) } @@ -418,7 +442,8 @@ func DecodeMessageNetFlow(payload *bytes.Buffer, templates NetFlowTemplateSystem &packetNFv9.SystemUptime, &packetNFv9.UnixSeconds, &packetNFv9.SequenceNumber, - &packetNFv9.SourceId); err != nil { + &packetNFv9.SourceId, + ); err != nil { return &DecoderError{"NetFlowV9 header", err} } /*size = packetNFv9.Count @@ -438,7 +463,8 @@ func DecodeMessageIPFIX(payload *bytes.Buffer, templates NetFlowTemplateSystem, &packetIPFIX.Length, &packetIPFIX.ExportTime, &packetIPFIX.SequenceNumber, - &packetIPFIX.ObservationDomainId); err != nil { + &packetIPFIX.ObservationDomainId, + ); err != nil { return &DecoderError{"IPFIX header", err} } /*size = packetIPFIX.Length @@ -455,7 +481,9 @@ func DecodeMessageIPFIX(payload *bytes.Buffer, templates NetFlowTemplateSystem, func DecodeMessageVersion(payload *bytes.Buffer, templates NetFlowTemplateSystem, packetNFv9 *NFv9Packet, packetIPFIX *IPFIXPacket) error { var version uint16 - if err := utils.BinaryDecoder(payload, &version); err != nil { + if err := utils.BinaryDecoder(payload, + &version, + ); err != nil { return &DecoderError{"IPFIX/NetFlowV9 version", err} } diff --git a/decoders/netflowlegacy/netflow.go b/decoders/netflowlegacy/netflow.go index d498b85..a2f029f 100644 --- a/decoders/netflowlegacy/netflow.go +++ b/decoders/netflowlegacy/netflow.go @@ -48,9 +48,35 @@ func DecodeMessage(payload *bytes.Buffer, packet *PacketNetFlowV5) error { packet.Records = make([]RecordsNetFlowV5, int(packet.Count)) // maximum is 65535 which would be 3MB for i := 0; i < int(packet.Count) && payload.Len() >= 48; i++ { record := RecordsNetFlowV5{} - if err := utils.BinaryDecoder(payload, &record); err != nil { + var srcAddr, dstAddr, nextHop uint32 + + if err := utils.BinaryDecoder(payload, + &srcAddr, + &dstAddr, + &nextHop, + &record.Input, + &record.Output, + &record.DPkts, + &record.DOctets, + &record.First, + &record.Last, + &record.SrcPort, + &record.DstPort, + &record.Pad1, + &record.TCPFlags, + &record.Proto, + &record.Tos, + &record.SrcAS, + &record.DstAS, + &record.SrcMask, + &record.DstMask, + &record.Pad2, + ); err != nil { return &DecoderError{err} } + record.SrcAddr = IPAddress(srcAddr) + record.DstAddr = IPAddress(dstAddr) + record.NextHop = IPAddress(nextHop) packet.Records[i] = record } diff --git a/decoders/sflow/sflow.go b/decoders/sflow/sflow.go index 6b9e386..20656d7 100644 --- a/decoders/sflow/sflow.go +++ b/decoders/sflow/sflow.go @@ -70,7 +70,7 @@ func DecodeIP(payload *bytes.Buffer) (uint32, []byte, error) { return ipVersion, ip, fmt.Errorf("DecodeIP: unknown IP version %d", ipVersion) } if payload.Len() >= len(ip) { - if err := utils.BinaryDecoder(payload, &ip); err != nil { + if err := utils.BinaryDecoder(payload, ip); err != nil { return 0, nil, fmt.Errorf("DecodeIP: [%w]", err) } } else { @@ -86,13 +86,47 @@ func DecodeCounterRecord(header *RecordHeader, payload *bytes.Buffer) (CounterRe switch header.DataFormat { case 1: var ifCounters IfCounters - if err := utils.BinaryDecoder(payload, &ifCounters); err != nil { + if err := utils.BinaryDecoder(payload, + &ifCounters.IfIndex, + &ifCounters.IfType, + &ifCounters.IfSpeed, + &ifCounters.IfDirection, + &ifCounters.IfStatus, + &ifCounters.IfInOctets, + &ifCounters.IfInUcastPkts, + &ifCounters.IfInMulticastPkts, + &ifCounters.IfInBroadcastPkts, + &ifCounters.IfInDiscards, + &ifCounters.IfInErrors, + &ifCounters.IfInUnknownProtos, + &ifCounters.IfOutOctets, + &ifCounters.IfOutUcastPkts, + &ifCounters.IfOutMulticastPkts, + &ifCounters.IfOutBroadcastPkts, + &ifCounters.IfOutDiscards, + &ifCounters.IfOutErrors, + &ifCounters.IfPromiscuousMode, + ); err != nil { return counterRecord, &RecordError{header.DataFormat, err} } counterRecord.Data = ifCounters case 2: var ethernetCounters EthernetCounters - if err := utils.BinaryDecoder(payload, ðernetCounters); err != nil { + if err := utils.BinaryDecoder(payload, + ðernetCounters.Dot3StatsAlignmentErrors, + ðernetCounters.Dot3StatsFCSErrors, + ðernetCounters.Dot3StatsSingleCollisionFrames, + ðernetCounters.Dot3StatsMultipleCollisionFrames, + ðernetCounters.Dot3StatsSQETestErrors, + ðernetCounters.Dot3StatsDeferredTransmissions, + ðernetCounters.Dot3StatsLateCollisions, + ðernetCounters.Dot3StatsExcessiveCollisions, + ðernetCounters.Dot3StatsInternalMacTransmitErrors, + ðernetCounters.Dot3StatsCarrierSenseErrors, + ðernetCounters.Dot3StatsFrameTooLongs, + ðernetCounters.Dot3StatsInternalMacReceiveErrors, + ðernetCounters.Dot3StatsSymbolErrors, + ); err != nil { return counterRecord, &RecordError{header.DataFormat, err} } counterRecord.Data = ethernetCounters @@ -113,7 +147,7 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord, switch header.DataFormat { case FORMAT_EXT_SWITCH: extendedSwitch := ExtendedSwitch{} - err := utils.BinaryDecoder(payload, &extendedSwitch) + err := utils.BinaryDecoder(payload, &extendedSwitch.SrcVlan, &extendedSwitch.SrcPriority, &extendedSwitch.DstVlan, &extendedSwitch.DstPriority) if err != nil { return flowRecord, &RecordError{header.DataFormat, err} } @@ -124,7 +158,8 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord, &sampledHeader.Protocol, &sampledHeader.FrameLength, &sampledHeader.Stripped, - &sampledHeader.OriginalLength); err != nil { + &sampledHeader.OriginalLength, + ); err != nil { return flowRecord, &RecordError{header.DataFormat, err} } sampledHeader.HeaderData = payload.Bytes() @@ -136,7 +171,16 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord, DstIP: make([]byte, 4), }, } - if err := utils.BinaryDecoder(payload, &sampledIP.SampledIPBase, &sampledIP.Tos); err != nil { + if err := utils.BinaryDecoder(payload, + &sampledIP.SampledIPBase.Length, + &sampledIP.SampledIPBase.Protocol, + sampledIP.SampledIPBase.SrcIP, + sampledIP.SampledIPBase.DstIP, + &sampledIP.SampledIPBase.SrcPort, + &sampledIP.SampledIPBase.DstPort, + &sampledIP.SampledIPBase.TcpFlags, + &sampledIP.Tos, + ); err != nil { return flowRecord, &RecordError{header.DataFormat, err} } flowRecord.Data = sampledIP @@ -147,7 +191,16 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord, DstIP: make([]byte, 16), }, } - if err := utils.BinaryDecoder(payload, &sampledIP.SampledIPBase, &sampledIP.Priority); err != nil { + if err := utils.BinaryDecoder(payload, + &sampledIP.SampledIPBase.Length, + &sampledIP.SampledIPBase.Protocol, + sampledIP.SampledIPBase.SrcIP, + sampledIP.SampledIPBase.DstIP, + &sampledIP.SampledIPBase.SrcPort, + &sampledIP.SampledIPBase.DstPort, + &sampledIP.SampledIPBase.TcpFlags, + &sampledIP.Priority, + ); err != nil { return flowRecord, &RecordError{header.DataFormat, err} } flowRecord.Data = sampledIP @@ -156,7 +209,10 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord, if extendedRouter.NextHopIPVersion, extendedRouter.NextHop, err = DecodeIP(payload); err != nil { return flowRecord, &RecordError{header.DataFormat, err} } - if err := utils.BinaryDecoder(payload, &extendedRouter.SrcMaskLen, &extendedRouter.DstMaskLen); err != nil { + if err := utils.BinaryDecoder(payload, + &extendedRouter.SrcMaskLen, + &extendedRouter.DstMaskLen, + ); err != nil { return flowRecord, &RecordError{header.DataFormat, err} } flowRecord.Data = extendedRouter @@ -165,13 +221,20 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord, if extendedGateway.NextHopIPVersion, extendedGateway.NextHop, err = DecodeIP(payload); err != nil { return flowRecord, &RecordError{header.DataFormat, err} } - if err := utils.BinaryDecoder(payload, &extendedGateway.AS, &extendedGateway.SrcAS, &extendedGateway.SrcPeerAS, - &extendedGateway.ASDestinations); err != nil { + if err := utils.BinaryDecoder(payload, + &extendedGateway.AS, + &extendedGateway.SrcAS, + &extendedGateway.SrcPeerAS, + &extendedGateway.ASDestinations, + ); err != nil { return flowRecord, &RecordError{header.DataFormat, err} } var asPath []uint32 if extendedGateway.ASDestinations != 0 { - if err := utils.BinaryDecoder(payload, &extendedGateway.ASPathType, &extendedGateway.ASPathLength); err != nil { + if err := utils.BinaryDecoder(payload, + &extendedGateway.ASPathType, + &extendedGateway.ASPathLength, + ); err != nil { return flowRecord, &RecordError{header.DataFormat, err} } // protection for as-path length @@ -190,7 +253,9 @@ func DecodeFlowRecord(header *RecordHeader, payload *bytes.Buffer) (FlowRecord, } extendedGateway.ASPath = asPath - if err := utils.BinaryDecoder(payload, &extendedGateway.CommunitiesLength); err != nil { + if err := utils.BinaryDecoder(payload, + &extendedGateway.CommunitiesLength, + ); err != nil { return flowRecord, &RecordError{header.DataFormat, err} } // protection for communities length @@ -224,7 +289,9 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err format := header.Format var sample interface{} - if err := utils.BinaryDecoder(payload, &header.SampleSequenceNumber); err != nil { + if err := utils.BinaryDecoder(payload, + &header.SampleSequenceNumber, + ); err != nil { return sample, fmt.Errorf("header seq [%w]", err) } seq := header.SampleSequenceNumber @@ -237,7 +304,10 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err header.SourceIdType = sourceId >> 24 header.SourceIdValue = sourceId & 0x00ffffff } else if format == FORMAT_IPV4 || format == FORMAT_IPV6 { - if err := utils.BinaryDecoder(payload, &header.SourceIdType, &header.SourceIdValue); err != nil { + if err := utils.BinaryDecoder(payload, + &header.SourceIdType, + &header.SourceIdValue, + ); err != nil { return sample, &FlowError{format, seq, fmt.Errorf("header source [%w]", err)} } } else { @@ -252,8 +322,14 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err flowSample = FlowSample{ Header: *header, } - if err := utils.BinaryDecoder(payload, &flowSample.SamplingRate, &flowSample.SamplePool, - &flowSample.Drops, &flowSample.Input, &flowSample.Output, &flowSample.FlowRecordsCount); err != nil { + if err := utils.BinaryDecoder(payload, + &flowSample.SamplingRate, + &flowSample.SamplePool, + &flowSample.Drops, + &flowSample.Input, + &flowSample.Output, + &flowSample.FlowRecordsCount, + ); err != nil { return sample, &FlowError{format, seq, fmt.Errorf("raw [%w]", err)} } recordsCount = flowSample.FlowRecordsCount @@ -287,7 +363,8 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err &expandedFlowSample.InputIfValue, &expandedFlowSample.OutputIfFormat, &expandedFlowSample.OutputIfValue, - &expandedFlowSample.FlowRecordsCount); err != nil { + &expandedFlowSample.FlowRecordsCount, + ); err != nil { return sample, &FlowError{format, seq, fmt.Errorf("IPv4 [%w]", err)} } recordsCount = expandedFlowSample.FlowRecordsCount @@ -296,7 +373,10 @@ func DecodeSample(header *SampleHeader, payload *bytes.Buffer) (interface{}, err } for i := 0; i < int(recordsCount) && payload.Len() >= 8; i++ { recordHeader := RecordHeader{} - if err := utils.BinaryDecoder(payload, &recordHeader.DataFormat, &recordHeader.Length); err != nil { + if err := utils.BinaryDecoder(payload, + &recordHeader.DataFormat, + &recordHeader.Length, + ); err != nil { return sample, &FlowError{format, seq, fmt.Errorf("record header [%w]", err)} } if int(recordHeader.Length) > payload.Len() { @@ -361,7 +441,8 @@ func DecodeMessage(payload *bytes.Buffer, packetV5 *Packet) error { &packetV5.SubAgentId, &packetV5.SequenceNumber, &packetV5.Uptime, - &packetV5.SamplesCount); err != nil { + &packetV5.SamplesCount, + ); err != nil { return &DecoderError{err} } if packetV5.SamplesCount > 1000 { diff --git a/decoders/utils/utils.go b/decoders/utils/utils.go index a36e3b2..9c8e597 100644 --- a/decoders/utils/utils.go +++ b/decoders/utils/utils.go @@ -1,16 +1,128 @@ package utils import ( + "bytes" "encoding/binary" + "errors" "io" + "reflect" ) -func BinaryDecoder(payload io.Reader, dests ...interface{}) error { +type BytesBuffer interface { + io.Reader + Next(int) []byte +} + +func BinaryDecoder(payload *bytes.Buffer, dests ...interface{}) error { for _, dest := range dests { - err := binary.Read(payload, binary.BigEndian, dest) + err := BinaryRead(payload, binary.BigEndian, dest) if err != nil { return err } } return nil } +func BinaryRead(payload BytesBuffer, order binary.ByteOrder, data any) error { + // Fast path for basic types and slices. + if n := intDataSize(data); n != 0 { + bs := payload.Next(n) + if len(bs) < n { + return io.ErrUnexpectedEOF + } + switch data := data.(type) { + case *bool: + *data = bs[0] != 0 + case *int8: + *data = int8(bs[0]) + case *uint8: + *data = bs[0] + case *int16: + *data = int16(order.Uint16(bs)) + case *uint16: + *data = order.Uint16(bs) + case *int32: + *data = int32(order.Uint32(bs)) + case *uint32: + *data = order.Uint32(bs) + case *int64: + *data = int64(order.Uint64(bs)) + case *uint64: + *data = order.Uint64(bs) + case []bool: + for i, x := range bs { // Easier to loop over the input for 8-bit values. + data[i] = x != 0 + } + case []int8: + for i, x := range bs { + data[i] = int8(x) + } + case []uint8: + copy(data, bs) + case []int16: + for i := range data { + data[i] = int16(order.Uint16(bs[2*i:])) + } + case []uint16: + for i := range data { + data[i] = order.Uint16(bs[2*i:]) + } + case []int32: + for i := range data { + data[i] = int32(order.Uint32(bs[4*i:])) + } + case []uint32: + for i := range data { + data[i] = order.Uint32(bs[4*i:]) + } + case []int64: + for i := range data { + data[i] = int64(order.Uint64(bs[8*i:])) + } + case []uint64: + for i := range data { + data[i] = order.Uint64(bs[8*i:]) + } + default: + n = 0 // fast path doesn't apply + } + if n != 0 { + return nil + } + } + + return errors.New("binary.Read: invalid type " + reflect.TypeOf(data).String()) +} + +// intDataSize returns the size of the data required to represent the data when encoded. +// It returns zero if the type cannot be implemented by the fast path in Read or Write. +func intDataSize(data any) int { + switch data := data.(type) { + case bool, int8, uint8, *bool, *int8, *uint8: + return 1 + case []bool: + return len(data) + case []int8: + return len(data) + case []uint8: + return len(data) + case int16, uint16, *int16, *uint16: + return 2 + case []int16: + return 2 * len(data) + case []uint16: + return 2 * len(data) + case int32, uint32, *int32, *uint32: + return 4 + case []int32: + return 4 * len(data) + case []uint32: + return 4 * len(data) + case int64, uint64, *int64, *uint64: + return 8 + case []int64: + return 8 * len(data) + case []uint64: + return 8 * len(data) + } + return 0 +} diff --git a/decoders/utils/utils_test.go b/decoders/utils/utils_test.go new file mode 100644 index 0000000..c9f598b --- /dev/null +++ b/decoders/utils/utils_test.go @@ -0,0 +1,138 @@ +package utils + +import ( + "encoding/binary" + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func testBinaryRead(buf BytesBuffer, data any) error { + order := binary.BigEndian + return BinaryRead(buf, order, data) +} + +func testBinaryReadComparison(buf BytesBuffer, data any) error { + order := binary.BigEndian + return binary.Read(buf, order, data) +} + +type benchFct func(buf BytesBuffer, data any) error + +func TestBinaryReadInteger(t *testing.T) { + buf := newTestBuf([]byte{1, 2, 3, 4}) + var dest uint32 + err := testBinaryRead(buf, &dest) + require.NoError(t, err) + assert.Equal(t, uint32(0x1020304), dest) +} + +func TestBinaryReadBytes(t *testing.T) { + buf := newTestBuf([]byte{1, 2, 3, 4}) + dest := make([]byte, 4) + err := testBinaryRead(buf, dest) + require.NoError(t, err) +} + +func TestBinaryReadUints(t *testing.T) { + buf := newTestBuf([]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}) + dest := make([]uint32, 4) + err := testBinaryRead(buf, dest) + require.NoError(t, err) + assert.Equal(t, uint32(0x1020304), dest[0]) +} + +type testBuf struct { + buf []byte + off int +} + +func newTestBuf(data []byte) *testBuf { + return &testBuf{ + buf: data, + } +} + +func (b *testBuf) Next(n int) []byte { + if n > len(b.buf) { + return b.buf + } + return b.buf[0:n] +} + +func (b *testBuf) Reset() { + b.off = 0 +} + +func (b *testBuf) Read(p []byte) (int, error) { + if len(b.buf) == 0 || b.off >= len(b.buf) { + return 0, io.EOF + } + + n := copy(p, b.buf[b.off:]) + b.off += n + return n, nil +} + +func benchBinaryRead(b *testing.B, buf *testBuf, dest any, cmp bool) { + var fct benchFct + if cmp { + fct = testBinaryReadComparison + } else { + fct = testBinaryRead + } + for n := 0; n < b.N; n++ { + fct(buf, dest) + buf.Reset() + } +} + +func BenchmarkBinaryReadIntegerBase(b *testing.B) { + buf := newTestBuf([]byte{1, 2, 3, 4}) + var dest uint32 + benchBinaryRead(b, buf, &dest, false) +} + +func BenchmarkBinaryReadIntegerComparison(b *testing.B) { + buf := newTestBuf([]byte{1, 2, 3, 4}) + var dest uint32 + benchBinaryRead(b, buf, &dest, true) +} + +func BenchmarkBinaryReadByteBase(b *testing.B) { + buf := newTestBuf([]byte{1, 2, 3, 4}) + var dest byte + benchBinaryRead(b, buf, &dest, false) +} + +func BBenchmarkBinaryReadByteComparison(b *testing.B) { + buf := newTestBuf([]byte{1, 2, 3, 4}) + var dest byte + benchBinaryRead(b, buf, &dest, true) +} + +func BenchmarkBinaryReadBytesBase(b *testing.B) { + buf := newTestBuf([]byte{1, 2, 3, 4}) + dest := make([]byte, 4) + benchBinaryRead(b, buf, dest, false) +} + +func BenchmarkBinaryReadBytesComparison(b *testing.B) { + buf := newTestBuf([]byte{1, 2, 3, 4}) + dest := make([]byte, 4) + benchBinaryRead(b, buf, dest, true) +} + +func BenchmarkBinaryReadUintsBase(b *testing.B) { + buf := newTestBuf([]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}) + dest := make([]uint32, 4) + benchBinaryRead(b, buf, dest, false) +} + +func BenchmarkBinaryReadUintsComparison(b *testing.B) { + buf := newTestBuf([]byte{1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}) + dest := make([]uint32, 4) + benchBinaryRead(b, buf, dest, true) +} diff --git a/producer/proto/producer_nf.go b/producer/proto/producer_nf.go index 19aca72..2a62250 100644 --- a/producer/proto/producer_nf.go +++ b/producer/proto/producer_nf.go @@ -8,6 +8,7 @@ import ( "time" "github.com/netsampler/goflow2/v2/decoders/netflow" + "github.com/netsampler/goflow2/v2/decoders/utils" flowmessage "github.com/netsampler/goflow2/v2/pb" "github.com/netsampler/goflow2/v2/producer" ) @@ -77,20 +78,20 @@ func NetFlowPopulate(dataFields []netflow.DataField, typeId uint16, addr interfa exists, value := NetFlowLookFor(dataFields, typeId) if exists && value != nil { valueBytes, ok := value.([]byte) - valueReader := bytes.NewReader(valueBytes) + valueReader := bytes.NewBuffer(valueBytes) if ok { switch addrt := addr.(type) { //case *(net.IP): // *addrt = valueBytes case *(time.Time): t := uint64(0) - if err := binary.Read(valueReader, binary.BigEndian, &t); err != nil { + if err := utils.BinaryRead(valueReader, binary.BigEndian, &t); err != nil { return false, err } t64 := int64(t / 1000) *addrt = time.Unix(t64, 0) default: - if err := binary.Read(valueReader, binary.BigEndian, addr); err != nil { + if err := utils.BinaryRead(valueReader, binary.BigEndian, addr); err != nil { return false, err } }