mirror of
https://github.com/openobserve/goflow2.git
synced 2025-10-27 01:54:01 +00:00
producer: bugfix of sampling mapping for ipfix (#207)
This commit is contained in:
@@ -182,7 +182,7 @@ func GetTemplateSize(version uint16, template []Field) int {
|
|||||||
|
|
||||||
func DecodeDataSetUsingFields(version uint16, payload *bytes.Buffer, listFields []Field) ([]DataField, error) {
|
func DecodeDataSetUsingFields(version uint16, payload *bytes.Buffer, listFields []Field) ([]DataField, error) {
|
||||||
dataFields := make([]DataField, len(listFields))
|
dataFields := make([]DataField, len(listFields))
|
||||||
for payload.Len() >= GetTemplateSize(version, listFields) {
|
if payload.Len() >= GetTemplateSize(version, listFields) {
|
||||||
|
|
||||||
for i, templateField := range listFields {
|
for i, templateField := range listFields {
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,6 @@ package protoproducer
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -47,19 +46,11 @@ func (s *basicSamplingRateSystem) AddSamplingRate(version uint16, obsDomainId ui
|
|||||||
func (s *basicSamplingRateSystem) GetSamplingRate(version uint16, obsDomainId uint32) (uint32, error) {
|
func (s *basicSamplingRateSystem) GetSamplingRate(version uint16, obsDomainId uint32) (uint32, error) {
|
||||||
s.samplinglock.RLock()
|
s.samplinglock.RLock()
|
||||||
defer s.samplinglock.RUnlock()
|
defer s.samplinglock.RUnlock()
|
||||||
/*samplingVersion, okver := s.sampling[version]
|
|
||||||
if okver {
|
|
||||||
samplingRate, okid := samplingVersion[obsDomainId]
|
|
||||||
if okid {
|
|
||||||
return samplingRate, nil
|
|
||||||
}
|
|
||||||
return 0, errors.New("") // TBC
|
|
||||||
}*/
|
|
||||||
if samplingRate, ok := s.sampling[fmt.Sprintf("%d-%d", version, obsDomainId)]; ok {
|
if samplingRate, ok := s.sampling[fmt.Sprintf("%d-%d", version, obsDomainId)]; ok {
|
||||||
return samplingRate, nil
|
return samplingRate, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0, errors.New("") // TBC // todo: now
|
return 0, fmt.Errorf("sampling rate not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
type SingleSamplingRateSystem struct {
|
type SingleSamplingRateSystem struct {
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ func (p *ProtoProducer) enrich(flowMessageSet []producer.ProducerMessage, cb fun
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProtoProducer) getSamplingRateSystem(args *producer.ProduceArgs) SamplingRateSystem {
|
func (p *ProtoProducer) getSamplingRateSystem(args *producer.ProduceArgs) SamplingRateSystem {
|
||||||
key := args.Src.String()
|
key := args.Src.Addr().String()
|
||||||
p.samplinglock.RLock()
|
p.samplinglock.RLock()
|
||||||
sampling, ok := p.sampling[key]
|
sampling, ok := p.sampling[key]
|
||||||
p.samplinglock.RUnlock()
|
p.samplinglock.RUnlock()
|
||||||
|
|||||||
Reference in New Issue
Block a user