mirror of
https://github.com/openobserve/goflow2.git
synced 2025-10-23 07:11:57 +00:00
feat: handle panic (#254)
This commit is contained in:
@@ -39,6 +39,7 @@ import (
|
|||||||
// core libraries
|
// core libraries
|
||||||
"github.com/netsampler/goflow2/v2/metrics"
|
"github.com/netsampler/goflow2/v2/metrics"
|
||||||
"github.com/netsampler/goflow2/v2/utils"
|
"github.com/netsampler/goflow2/v2/utils"
|
||||||
|
"github.com/netsampler/goflow2/v2/utils/debug"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
@@ -128,6 +129,8 @@ func main() {
|
|||||||
log.Fatalf("producer %s does not exist", *Produce)
|
log.Fatalf("producer %s does not exist", *Produce)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// intercept panic and generate an error
|
||||||
|
flowProducer = debug.WrapPanicProducer(flowProducer)
|
||||||
// wrap producer with Prometheus metrics
|
// wrap producer with Prometheus metrics
|
||||||
flowProducer = metrics.WrapPromProducer(flowProducer)
|
flowProducer = metrics.WrapPromProducer(flowProducer)
|
||||||
|
|
||||||
@@ -273,7 +276,12 @@ func main() {
|
|||||||
l.Errorf("scheme %s does not exist", listenAddrUrl.Scheme)
|
l.Errorf("scheme %s does not exist", listenAddrUrl.Scheme)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
decodeFunc = metrics.PromDecoderWrapper(p.DecodeFlow, listenAddrUrl.Scheme)
|
|
||||||
|
decodeFunc = p.DecodeFlow
|
||||||
|
// intercept panic and generate error
|
||||||
|
decodeFunc = debug.PanicDecoderWrapper(decodeFunc)
|
||||||
|
// wrap decoder with Prometheus metrics
|
||||||
|
decodeFunc = metrics.PromDecoderWrapper(decodeFunc, listenAddrUrl.Scheme)
|
||||||
pipes = append(pipes, p)
|
pipes = append(pipes, p)
|
||||||
|
|
||||||
// starts receivers
|
// starts receivers
|
||||||
@@ -293,6 +301,15 @@ func main() {
|
|||||||
l := l.WithError(err)
|
l := l.WithError(err)
|
||||||
if errors.Is(err, netflow.ErrorTemplateNotFound) {
|
if errors.Is(err, netflow.ErrorTemplateNotFound) {
|
||||||
l.Warn("template error")
|
l.Warn("template error")
|
||||||
|
} else if errors.Is(err, debug.PanicError) {
|
||||||
|
var pErrMsg *debug.PanicErrorMessage
|
||||||
|
if errors.As(err, &pErrMsg) {
|
||||||
|
l = l.WithFields(log.Fields{
|
||||||
|
"message": pErrMsg.Msg,
|
||||||
|
"stacktrace": string(pErrMsg.Stacktrace),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
l.Error("intercepted panic")
|
||||||
} else if errors.Is(err, net.ErrClosed) {
|
} else if errors.Is(err, net.ErrClosed) {
|
||||||
l.Info("closed receiver")
|
l.Info("closed receiver")
|
||||||
} else {
|
} else {
|
||||||
|
23
utils/debug/debug.go
Normal file
23
utils/debug/debug.go
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
package debug
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
PanicError = fmt.Errorf("panic")
|
||||||
|
)
|
||||||
|
|
||||||
|
type PanicErrorMessage struct {
|
||||||
|
Msg interface{}
|
||||||
|
Inner string
|
||||||
|
Stacktrace []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *PanicErrorMessage) Error() string {
|
||||||
|
return fmt.Sprintf("%s", e.Inner)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *PanicErrorMessage) Unwrap() []error {
|
||||||
|
return []error{PanicError}
|
||||||
|
}
|
22
utils/debug/decoder.go
Normal file
22
utils/debug/decoder.go
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
package debug
|
||||||
|
|
||||||
|
import (
|
||||||
|
"runtime/debug"
|
||||||
|
|
||||||
|
"github.com/netsampler/goflow2/v2/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
func PanicDecoderWrapper(wrapped utils.DecoderFunc) utils.DecoderFunc {
|
||||||
|
return func(msg interface{}) (err error) {
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if pErr := recover(); pErr != nil {
|
||||||
|
|
||||||
|
pErrC, _ := pErr.(string)
|
||||||
|
err = &PanicErrorMessage{Msg: msg, Inner: pErrC, Stacktrace: debug.Stack()}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
err = wrapped(msg)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
39
utils/debug/producer.go
Normal file
39
utils/debug/producer.go
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
package debug
|
||||||
|
|
||||||
|
import (
|
||||||
|
"runtime/debug"
|
||||||
|
|
||||||
|
"github.com/netsampler/goflow2/v2/producer"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PanicProducerWrapper struct {
|
||||||
|
wrapped producer.ProducerInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PanicProducerWrapper) Produce(msg interface{}, args *producer.ProduceArgs) (flowMessageSet []producer.ProducerMessage, err error) {
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if pErr := recover(); pErr != nil {
|
||||||
|
|
||||||
|
pErrC, _ := pErr.(string)
|
||||||
|
err = &PanicErrorMessage{Msg: msg, Inner: pErrC, Stacktrace: debug.Stack()}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
flowMessageSet, err = p.wrapped.Produce(msg, args)
|
||||||
|
return flowMessageSet, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PanicProducerWrapper) Close() {
|
||||||
|
p.wrapped.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PanicProducerWrapper) Commit(flowMessageSet []producer.ProducerMessage) {
|
||||||
|
p.wrapped.Commit(flowMessageSet)
|
||||||
|
}
|
||||||
|
|
||||||
|
func WrapPanicProducer(wrapped producer.ProducerInterface) producer.ProducerInterface {
|
||||||
|
return &PanicProducerWrapper{
|
||||||
|
wrapped: wrapped,
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user