mirror of
https://github.com/openobserve/goflow2.git
synced 2025-10-23 07:11:57 +00:00
metrics: collect drops on receiver (#226)
This commit is contained in:
@@ -247,10 +247,11 @@ func main() {
|
|||||||
l.Info("starting collection")
|
l.Info("starting collection")
|
||||||
|
|
||||||
cfg := &utils.UDPReceiverConfig{
|
cfg := &utils.UDPReceiverConfig{
|
||||||
Sockets: numSockets,
|
Sockets: numSockets,
|
||||||
Workers: numWorkers,
|
Workers: numWorkers,
|
||||||
QueueSize: queueSize,
|
QueueSize: queueSize,
|
||||||
Blocking: isBlocking,
|
Blocking: isBlocking,
|
||||||
|
ReceiverCallback: metrics.NewReceiverMetric(),
|
||||||
}
|
}
|
||||||
recv, err := utils.NewUDPReceiver(cfg)
|
recv, err := utils.NewUDPReceiver(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -9,6 +9,22 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
MetricReceivedDroppedPackets = prometheus.NewCounterVec(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "flow_dropped_packets_total",
|
||||||
|
Help: "Packets dropped before processing.",
|
||||||
|
Namespace: NAMESPACE,
|
||||||
|
},
|
||||||
|
[]string{"remote_ip", "local_ip", "local_port"},
|
||||||
|
)
|
||||||
|
MetricReceivedDroppedBytes = prometheus.NewCounterVec(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "flow_dropped_bytes_total",
|
||||||
|
Help: "Bytes dropped before processing.",
|
||||||
|
Namespace: NAMESPACE,
|
||||||
|
},
|
||||||
|
[]string{"remote_ip", "local_ip", "local_port"},
|
||||||
|
)
|
||||||
MetricTrafficBytes = prometheus.NewCounterVec(
|
MetricTrafficBytes = prometheus.NewCounterVec(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Name: "flow_traffic_bytes_total",
|
Name: "flow_traffic_bytes_total",
|
||||||
@@ -114,6 +130,9 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
prometheus.MustRegister(MetricReceivedDroppedPackets)
|
||||||
|
prometheus.MustRegister(MetricReceivedDroppedBytes)
|
||||||
|
|
||||||
prometheus.MustRegister(MetricTrafficBytes)
|
prometheus.MustRegister(MetricTrafficBytes)
|
||||||
prometheus.MustRegister(MetricTrafficPackets)
|
prometheus.MustRegister(MetricTrafficPackets)
|
||||||
prometheus.MustRegister(MetricPacketSizeSum)
|
prometheus.MustRegister(MetricPacketSizeSum)
|
||||||
|
32
metrics/receiver.go
Normal file
32
metrics/receiver.go
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/netsampler/goflow2/v2/utils"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ReceiverMetric struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewReceiverMetric() *ReceiverMetric {
|
||||||
|
return &ReceiverMetric{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ReceiverMetric) Dropped(pkt utils.Message) {
|
||||||
|
remote := pkt.Src.Addr().Unmap().String()
|
||||||
|
localIP := pkt.Dst.Addr().Unmap().String()
|
||||||
|
|
||||||
|
port := fmt.Sprintf("%d", pkt.Dst.Port())
|
||||||
|
size := len(pkt.Payload)
|
||||||
|
|
||||||
|
labels := prometheus.Labels{
|
||||||
|
"remote_ip": remote,
|
||||||
|
"local_ip": localIP,
|
||||||
|
"local_port": port,
|
||||||
|
}
|
||||||
|
MetricReceivedDroppedPackets.With(labels).Inc()
|
||||||
|
MetricReceivedDroppedBytes.With(labels).Add(float64(size))
|
||||||
|
}
|
17
utils/udp.go
17
utils/udp.go
@@ -10,6 +10,10 @@ import (
|
|||||||
reuseport "github.com/libp2p/go-reuseport"
|
reuseport "github.com/libp2p/go-reuseport"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type ReceiverCallback interface {
|
||||||
|
Dropped(msg Message)
|
||||||
|
}
|
||||||
|
|
||||||
// Callback used to decode a UDP message
|
// Callback used to decode a UDP message
|
||||||
type DecoderFunc func(msg interface{}) error
|
type DecoderFunc func(msg interface{}) error
|
||||||
|
|
||||||
@@ -48,6 +52,8 @@ type UDPReceiver struct {
|
|||||||
|
|
||||||
workers int
|
workers int
|
||||||
sockets int
|
sockets int
|
||||||
|
|
||||||
|
cb ReceiverCallback
|
||||||
}
|
}
|
||||||
|
|
||||||
type UDPReceiverConfig struct {
|
type UDPReceiverConfig struct {
|
||||||
@@ -55,6 +61,8 @@ type UDPReceiverConfig struct {
|
|||||||
Sockets int
|
Sockets int
|
||||||
Blocking bool
|
Blocking bool
|
||||||
QueueSize int
|
QueueSize int
|
||||||
|
|
||||||
|
ReceiverCallback ReceiverCallback
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewUDPReceiver(cfg *UDPReceiverConfig) (*UDPReceiver, error) {
|
func NewUDPReceiver(cfg *UDPReceiverConfig) (*UDPReceiver, error) {
|
||||||
@@ -80,6 +88,7 @@ func NewUDPReceiver(cfg *UDPReceiverConfig) (*UDPReceiver, error) {
|
|||||||
r.workers = cfg.Workers
|
r.workers = cfg.Workers
|
||||||
dispatchSize = cfg.QueueSize
|
dispatchSize = cfg.QueueSize
|
||||||
r.blocking = cfg.Blocking
|
r.blocking = cfg.Blocking
|
||||||
|
r.cb = cfg.ReceiverCallback
|
||||||
}
|
}
|
||||||
|
|
||||||
if dispatchSize == 0 {
|
if dispatchSize == 0 {
|
||||||
@@ -171,6 +180,14 @@ func (r *UDPReceiver) receive(addr string, port int, started chan bool) error {
|
|||||||
case <-r.q:
|
case <-r.q:
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
|
if r.cb != nil {
|
||||||
|
r.cb.Dropped(Message{
|
||||||
|
Src: pkt.src.AddrPort(),
|
||||||
|
Dst: pkt.dst.AddrPort(),
|
||||||
|
Payload: pkt.payload[0:pkt.size],
|
||||||
|
Received: pkt.received,
|
||||||
|
})
|
||||||
|
}
|
||||||
packetPool.Put(pkt)
|
packetPool.Put(pkt)
|
||||||
// increase counter
|
// increase counter
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user