mirror of
https://github.com/openobserve/goflow2.git
synced 2025-10-23 07:11:57 +00:00
formatting improved with selectors and text output (#14)
* Improve format registration * less interleaving (json does not require importing protobuf) * generic text renderer * escape for strings in text/json formatter
This commit is contained in:
@@ -19,7 +19,8 @@ import (
|
||||
|
||||
// import various formatters
|
||||
"github.com/netsampler/goflow2/format"
|
||||
"github.com/netsampler/goflow2/format/json"
|
||||
"github.com/netsampler/goflow2/format/common"
|
||||
_ "github.com/netsampler/goflow2/format/json"
|
||||
_ "github.com/netsampler/goflow2/format/protobuf"
|
||||
|
||||
// import various transports
|
||||
@@ -85,8 +86,8 @@ func MapFlow(dbAsn, dbCountry *geoip2.Reader, msg *flowmessage.FlowMessageExt) {
|
||||
}
|
||||
|
||||
func init() {
|
||||
json.AddJSONField("SrcCountry", json.FORMAT_TYPE_STRING)
|
||||
json.AddJSONField("DstCountry", json.FORMAT_TYPE_STRING)
|
||||
common.AddTextField("SrcCountry", common.FORMAT_TYPE_STRING)
|
||||
common.AddTextField("DstCountry", common.FORMAT_TYPE_STRING)
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/netsampler/goflow2/format"
|
||||
_ "github.com/netsampler/goflow2/format/json"
|
||||
_ "github.com/netsampler/goflow2/format/protobuf"
|
||||
_ "github.com/netsampler/goflow2/format/text"
|
||||
|
||||
// import various transports
|
||||
"github.com/netsampler/goflow2/transport"
|
||||
@@ -40,7 +41,6 @@ var (
|
||||
|
||||
Format = flag.String("format", "json", fmt.Sprintf("Choose the format (available: %s)", strings.Join(format.GetFormats(), ", ")))
|
||||
Transport = flag.String("transport", "file", fmt.Sprintf("Choose the transport (available: %s)", strings.Join(transport.GetTransports(), ", ")))
|
||||
//FixedLength = flag.Bool("proto.fixedlen", false, "Enable fixed length protobuf")
|
||||
|
||||
MetricsAddr = flag.String("metrics.addr", ":8080", "Metrics address")
|
||||
MetricsPath = flag.String("metrics.path", "/metrics", "Metrics path")
|
||||
|
56
format/common/hash.go
Normal file
56
format/common/hash.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
fieldsVar string
|
||||
fields []string // Hashing fields
|
||||
|
||||
hashDeclared bool
|
||||
hashDeclaredLock = &sync.Mutex{}
|
||||
)
|
||||
|
||||
func HashFlag() {
|
||||
hashDeclaredLock.Lock()
|
||||
defer hashDeclaredLock.Unlock()
|
||||
|
||||
if hashDeclared {
|
||||
return
|
||||
}
|
||||
hashDeclared = true
|
||||
flag.StringVar(&fieldsVar, "format.hash", "SamplerAddress", "List of fields to do hashing, separated by commas")
|
||||
|
||||
}
|
||||
|
||||
func ManualHashInit() error {
|
||||
fields = strings.Split(fieldsVar, ",")
|
||||
return nil
|
||||
}
|
||||
|
||||
func HashProtoLocal(msg interface{}) string {
|
||||
return HashProto(fields, msg)
|
||||
}
|
||||
|
||||
func HashProto(fields []string, msg interface{}) string {
|
||||
var keyStr string
|
||||
|
||||
if msg != nil {
|
||||
vfm := reflect.ValueOf(msg)
|
||||
vfm = reflect.Indirect(vfm)
|
||||
|
||||
for _, kf := range fields {
|
||||
fieldValue := vfm.FieldByName(kf)
|
||||
if fieldValue.IsValid() {
|
||||
keyStr += fmt.Sprintf("%v-", fieldValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return keyStr
|
||||
}
|
38
format/common/selector.go
Normal file
38
format/common/selector.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
selectorVar string
|
||||
selector []string // Hashing fields
|
||||
selectorMap = make(map[string]bool)
|
||||
|
||||
selectorDeclared bool
|
||||
selectorDeclaredLock = &sync.Mutex{}
|
||||
)
|
||||
|
||||
func SelectorFlag() {
|
||||
selectorDeclaredLock.Lock()
|
||||
defer selectorDeclaredLock.Unlock()
|
||||
|
||||
if selectorDeclared {
|
||||
return
|
||||
}
|
||||
selectorDeclared = true
|
||||
flag.StringVar(&selectorVar, "format.selector", "", "List of fields to do keep in output")
|
||||
}
|
||||
|
||||
func ManualSelectorInit() error {
|
||||
if selectorVar == "" {
|
||||
return nil
|
||||
}
|
||||
selector = strings.Split(selectorVar, ",")
|
||||
for _, v := range selector {
|
||||
selectorMap[v] = true
|
||||
}
|
||||
return nil
|
||||
}
|
263
format/common/text.go
Normal file
263
format/common/text.go
Normal file
@@ -0,0 +1,263 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
FORMAT_TYPE_UNKNOWN = iota
|
||||
FORMAT_TYPE_STRING_FUNC
|
||||
FORMAT_TYPE_STRING
|
||||
FORMAT_TYPE_INTEGER
|
||||
FORMAT_TYPE_IP
|
||||
FORMAT_TYPE_MAC
|
||||
)
|
||||
|
||||
var (
|
||||
EtypeName = map[uint32]string{
|
||||
0x806: "ARP",
|
||||
0x800: "IPv4",
|
||||
0x86dd: "IPv6",
|
||||
}
|
||||
ProtoName = map[uint32]string{
|
||||
1: "ICMP",
|
||||
6: "TCP",
|
||||
17: "UDP",
|
||||
58: "ICMPv6",
|
||||
}
|
||||
IcmpTypeName = map[uint32]string{
|
||||
0: "EchoReply",
|
||||
3: "DestinationUnreachable",
|
||||
8: "Echo",
|
||||
9: "RouterAdvertisement",
|
||||
10: "RouterSolicitation",
|
||||
11: "TimeExceeded",
|
||||
}
|
||||
Icmp6TypeName = map[uint32]string{
|
||||
1: "DestinationUnreachable",
|
||||
2: "PacketTooBig",
|
||||
3: "TimeExceeded",
|
||||
128: "EchoRequest",
|
||||
129: "EchoReply",
|
||||
133: "RouterSolicitation",
|
||||
134: "RouterAdvertisement",
|
||||
}
|
||||
|
||||
TextFields = []string{
|
||||
"Type",
|
||||
"TimeReceived",
|
||||
"SequenceNum",
|
||||
"SamplingRate",
|
||||
"SamplerAddress",
|
||||
"TimeFlowStart",
|
||||
"TimeFlowEnd",
|
||||
"Bytes",
|
||||
"Packets",
|
||||
"SrcAddr",
|
||||
"DstAddr",
|
||||
"Etype",
|
||||
"Proto",
|
||||
"SrcPort",
|
||||
"DstPort",
|
||||
"InIf",
|
||||
"OutIf",
|
||||
"SrcMac",
|
||||
"DstMac",
|
||||
"SrcVlan",
|
||||
"DstVlan",
|
||||
"VlanId",
|
||||
"IngressVrfID",
|
||||
"EgressVrfID",
|
||||
"IPTos",
|
||||
"ForwardingStatus",
|
||||
"IPTTL",
|
||||
"TCPFlags",
|
||||
"IcmpType",
|
||||
"IcmpCode",
|
||||
"IPv6FlowLabel",
|
||||
"FragmentId",
|
||||
"FragmentOffset",
|
||||
"BiFlowDirection",
|
||||
"SrcAS",
|
||||
"DstAS",
|
||||
"NextHop",
|
||||
"NextHopAS",
|
||||
"SrcNet",
|
||||
"DstNet",
|
||||
}
|
||||
TextFieldsTypes = []int{
|
||||
FORMAT_TYPE_STRING_FUNC,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_IP,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_IP,
|
||||
FORMAT_TYPE_IP,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_MAC,
|
||||
FORMAT_TYPE_MAC,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_IP,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
}
|
||||
RenderExtras = []string{
|
||||
"EtypeName",
|
||||
"ProtoName",
|
||||
"IcmpName",
|
||||
}
|
||||
RenderExtraCall = []RenderExtraFunction{
|
||||
RenderExtraFunctionEtypeName,
|
||||
RenderExtraFunctionProtoName,
|
||||
RenderExtraFunctionIcmpName,
|
||||
}
|
||||
)
|
||||
|
||||
func AddTextField(name string, jtype int) {
|
||||
TextFields = append(TextFields, name)
|
||||
TextFieldsTypes = append(TextFieldsTypes, jtype)
|
||||
}
|
||||
|
||||
type RenderExtraFunction func(proto.Message) string
|
||||
|
||||
func RenderExtraFetchNumbers(msg proto.Message, fields []string) []uint64 {
|
||||
vfm := reflect.ValueOf(msg)
|
||||
vfm = reflect.Indirect(vfm)
|
||||
|
||||
values := make([]uint64, len(fields))
|
||||
for i, kf := range fields {
|
||||
fieldValue := vfm.FieldByName(kf)
|
||||
if fieldValue.IsValid() {
|
||||
values[i] = fieldValue.Uint()
|
||||
}
|
||||
}
|
||||
|
||||
return values
|
||||
}
|
||||
|
||||
func RenderExtraFunctionEtypeName(msg proto.Message) string {
|
||||
num := RenderExtraFetchNumbers(msg, []string{"Etype"})
|
||||
return EtypeName[uint32(num[0])]
|
||||
}
|
||||
|
||||
func RenderExtraFunctionProtoName(msg proto.Message) string {
|
||||
num := RenderExtraFetchNumbers(msg, []string{"Proto"})
|
||||
return ProtoName[uint32(num[0])]
|
||||
}
|
||||
func RenderExtraFunctionIcmpName(msg proto.Message) string {
|
||||
num := RenderExtraFetchNumbers(msg, []string{"Proto", "IcmpCode", "IcmpType"})
|
||||
return IcmpCodeType(uint32(num[0]), uint32(num[1]), uint32(num[2]))
|
||||
}
|
||||
|
||||
func IcmpCodeType(proto, icmpCode, icmpType uint32) string {
|
||||
if proto == 1 {
|
||||
return IcmpTypeName[icmpType]
|
||||
} else if proto == 58 {
|
||||
return Icmp6TypeName[icmpType]
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func RenderIP(addr []byte) string {
|
||||
if addr == nil || (len(addr) != 4 && len(addr) != 16) {
|
||||
return ""
|
||||
}
|
||||
|
||||
return net.IP(addr).String()
|
||||
}
|
||||
|
||||
func FormatMessageReflectText(msg proto.Message, ext string) string {
|
||||
return FormatMessageReflectCustom(msg, ext, "", " ", "=", false)
|
||||
}
|
||||
|
||||
func FormatMessageReflectJSON(msg proto.Message, ext string) string {
|
||||
return fmt.Sprintf("{%s}", FormatMessageReflectCustom(msg, ext, "\"", ",", ":", true))
|
||||
}
|
||||
|
||||
func FormatMessageReflectCustom(msg proto.Message, ext, quotes, sep, sign string, null bool) string {
|
||||
fstr := make([]string, len(TextFields)+len(RenderExtras))
|
||||
|
||||
vfm := reflect.ValueOf(msg)
|
||||
vfm = reflect.Indirect(vfm)
|
||||
|
||||
var i int
|
||||
for j, kf := range TextFields {
|
||||
fieldValue := vfm.FieldByName(kf)
|
||||
if fieldValue.IsValid() {
|
||||
|
||||
switch TextFieldsTypes[j] {
|
||||
case FORMAT_TYPE_STRING_FUNC:
|
||||
strMethod := fieldValue.MethodByName("String").Call([]reflect.Value{})
|
||||
fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, kf, quotes, sign, strMethod[0].String())
|
||||
case FORMAT_TYPE_STRING:
|
||||
fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, kf, quotes, sign, fieldValue.String())
|
||||
case FORMAT_TYPE_INTEGER:
|
||||
fstr[i] = fmt.Sprintf("%s%s%s%s%d", quotes, kf, quotes, sign, fieldValue.Uint())
|
||||
case FORMAT_TYPE_IP:
|
||||
ip := fieldValue.Bytes()
|
||||
fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, kf, quotes, sign, RenderIP(ip))
|
||||
case FORMAT_TYPE_MAC:
|
||||
mac := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(mac, fieldValue.Uint())
|
||||
fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, kf, quotes, sign, net.HardwareAddr(mac[2:]).String())
|
||||
default:
|
||||
if null {
|
||||
fstr[i] = fmt.Sprintf("%s%s%s%snull", quotes, kf, quotes, sign)
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
if null {
|
||||
fstr[i] = fmt.Sprintf("%s%s%s%snull", quotes, kf, quotes, sign)
|
||||
}
|
||||
}
|
||||
if len(selectorMap) == 0 || selectorMap[kf] {
|
||||
i++
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
for j, e := range RenderExtras {
|
||||
fstr[i] = fmt.Sprintf("%s%s%s%s%q", quotes, e, quotes, sign, RenderExtraCall[j](msg))
|
||||
if len(selectorMap) == 0 || selectorMap[e] {
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
if len(selectorMap) > 0 {
|
||||
fstr = fstr[0:i]
|
||||
}
|
||||
|
||||
return strings.Join(fstr, sep)
|
||||
}
|
@@ -2,215 +2,27 @@ package json
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/netsampler/goflow2/format"
|
||||
"github.com/netsampler/goflow2/format/protobuf"
|
||||
flowmessage "github.com/netsampler/goflow2/pb"
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
"github.com/netsampler/goflow2/format/common"
|
||||
)
|
||||
|
||||
const (
|
||||
FORMAT_TYPE_UNKNOWN = iota
|
||||
FORMAT_TYPE_STRING_FUNC
|
||||
FORMAT_TYPE_STRING
|
||||
FORMAT_TYPE_INTEGER
|
||||
FORMAT_TYPE_IP
|
||||
FORMAT_TYPE_MAC
|
||||
)
|
||||
|
||||
var (
|
||||
EtypeName = map[uint32]string{
|
||||
0x806: "ARP",
|
||||
0x800: "IPv4",
|
||||
0x86dd: "IPv6",
|
||||
}
|
||||
ProtoName = map[uint32]string{
|
||||
1: "ICMP",
|
||||
6: "TCP",
|
||||
17: "UDP",
|
||||
58: "ICMPv6",
|
||||
}
|
||||
IcmpTypeName = map[uint32]string{
|
||||
0: "EchoReply",
|
||||
3: "DestinationUnreachable",
|
||||
8: "Echo",
|
||||
9: "RouterAdvertisement",
|
||||
10: "RouterSolicitation",
|
||||
11: "TimeExceeded",
|
||||
}
|
||||
Icmp6TypeName = map[uint32]string{
|
||||
1: "DestinationUnreachable",
|
||||
2: "PacketTooBig",
|
||||
3: "TimeExceeded",
|
||||
128: "EchoRequest",
|
||||
129: "EchoReply",
|
||||
133: "RouterSolicitation",
|
||||
134: "RouterAdvertisement",
|
||||
}
|
||||
|
||||
JsonFields = []string{
|
||||
"Type",
|
||||
"TimeReceived",
|
||||
"SequenceNum",
|
||||
"SamplingRate",
|
||||
"SamplerAddress",
|
||||
"TimeFlowStart",
|
||||
"TimeFlowEnd",
|
||||
"Bytes",
|
||||
"Packets",
|
||||
"SrcAddr",
|
||||
"DstAddr",
|
||||
"Etype",
|
||||
"Proto",
|
||||
"SrcPort",
|
||||
"DstPort",
|
||||
"InIf",
|
||||
"OutIf",
|
||||
"SrcMac",
|
||||
"DstMac",
|
||||
"SrcVlan",
|
||||
"DstVlan",
|
||||
"VlanId",
|
||||
"IngressVrfID",
|
||||
"EgressVrfID",
|
||||
"IPTos",
|
||||
"ForwardingStatus",
|
||||
"IPTTL",
|
||||
"TCPFlags",
|
||||
"IcmpType",
|
||||
"IcmpCode",
|
||||
"IPv6FlowLabel",
|
||||
"FragmentId",
|
||||
"FragmentOffset",
|
||||
"BiFlowDirection",
|
||||
"SrcAS",
|
||||
"DstAS",
|
||||
"NextHop",
|
||||
"NextHopAS",
|
||||
"SrcNet",
|
||||
"DstNet",
|
||||
}
|
||||
JsonFieldsTypes = []int{
|
||||
FORMAT_TYPE_STRING_FUNC,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_IP,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_IP,
|
||||
FORMAT_TYPE_IP,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_MAC,
|
||||
FORMAT_TYPE_MAC,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_IP,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
FORMAT_TYPE_INTEGER,
|
||||
}
|
||||
JsonExtras = []string{
|
||||
"EtypeName",
|
||||
"ProtoName",
|
||||
"IcmpName",
|
||||
}
|
||||
JsonExtraCall = []JsonExtraFunction{
|
||||
JsonExtraFunctionEtypeName,
|
||||
JsonExtraFunctionProtoName,
|
||||
JsonExtraFunctionIcmpName,
|
||||
}
|
||||
)
|
||||
|
||||
func AddJSONField(name string, jtype int) {
|
||||
JsonFields = append(JsonFields, name)
|
||||
JsonFieldsTypes = append(JsonFieldsTypes, jtype)
|
||||
}
|
||||
|
||||
type JsonExtraFunction func(proto.Message) string
|
||||
|
||||
func JsonExtraFetchNumbers(msg proto.Message, fields []string) []uint64 {
|
||||
vfm := reflect.ValueOf(msg)
|
||||
vfm = reflect.Indirect(vfm)
|
||||
|
||||
values := make([]uint64, len(fields))
|
||||
for i, kf := range fields {
|
||||
fieldValue := vfm.FieldByName(kf)
|
||||
if fieldValue.IsValid() {
|
||||
values[i] = fieldValue.Uint()
|
||||
}
|
||||
}
|
||||
|
||||
return values
|
||||
}
|
||||
|
||||
func JsonExtraFunctionEtypeName(msg proto.Message) string {
|
||||
num := JsonExtraFetchNumbers(msg, []string{"Etype"})
|
||||
return EtypeName[uint32(num[0])]
|
||||
}
|
||||
func JsonExtraFunctionProtoName(msg proto.Message) string {
|
||||
num := JsonExtraFetchNumbers(msg, []string{"Proto"})
|
||||
return ProtoName[uint32(num[0])]
|
||||
}
|
||||
func JsonExtraFunctionIcmpName(msg proto.Message) string {
|
||||
num := JsonExtraFetchNumbers(msg, []string{"Proto", "IcmpCode", "IcmpType"})
|
||||
return IcmpCodeType(uint32(num[0]), uint32(num[1]), uint32(num[2]))
|
||||
}
|
||||
|
||||
func IcmpCodeType(proto, icmpCode, icmpType uint32) string {
|
||||
if proto == 1 {
|
||||
return IcmpTypeName[icmpType]
|
||||
} else if proto == 58 {
|
||||
return Icmp6TypeName[icmpType]
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type JsonDriver struct {
|
||||
fieldsVar string
|
||||
fields []string // Hashing fields
|
||||
}
|
||||
|
||||
func RenderIP(addr []byte) string {
|
||||
if addr == nil || (len(addr) != 4 && len(addr) != 16) {
|
||||
return ""
|
||||
}
|
||||
|
||||
return net.IP(addr).String()
|
||||
}
|
||||
|
||||
func (d *JsonDriver) Prepare() error {
|
||||
common.HashFlag()
|
||||
common.SelectorFlag()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *JsonDriver) Init(context.Context) error {
|
||||
return protobuf.ManualInit()
|
||||
err := common.ManualHashInit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return common.ManualSelectorInit()
|
||||
}
|
||||
|
||||
func (d *JsonDriver) Format(data interface{}) ([]byte, []byte, error) {
|
||||
@@ -219,151 +31,8 @@ func (d *JsonDriver) Format(data interface{}) ([]byte, []byte, error) {
|
||||
return nil, nil, fmt.Errorf("message is not protobuf")
|
||||
}
|
||||
|
||||
key := protobuf.HashProtoLocal(msg)
|
||||
return []byte(key), []byte(FormatMessageReflect(msg, "")), nil
|
||||
}
|
||||
|
||||
func FormatMessageReflect(msg proto.Message, ext string) string {
|
||||
fstr := make([]string, len(JsonFields)+len(JsonExtras))
|
||||
|
||||
vfm := reflect.ValueOf(msg)
|
||||
vfm = reflect.Indirect(vfm)
|
||||
|
||||
for i, kf := range JsonFields {
|
||||
fieldValue := vfm.FieldByName(kf)
|
||||
if fieldValue.IsValid() {
|
||||
|
||||
switch JsonFieldsTypes[i] {
|
||||
case FORMAT_TYPE_STRING_FUNC:
|
||||
strMethod := fieldValue.MethodByName("String").Call([]reflect.Value{})
|
||||
fstr[i] = fmt.Sprintf("\"%s\":\"%s\"", kf, strMethod[0].String())
|
||||
case FORMAT_TYPE_STRING:
|
||||
fstr[i] = fmt.Sprintf("\"%s\":\"%s\"", kf, fieldValue.String())
|
||||
case FORMAT_TYPE_INTEGER:
|
||||
fstr[i] = fmt.Sprintf("\"%s\":%d", kf, fieldValue.Uint())
|
||||
case FORMAT_TYPE_IP:
|
||||
ip := fieldValue.Bytes()
|
||||
fstr[i] = fmt.Sprintf("\"%s\":\"%s\"", kf, RenderIP(ip))
|
||||
case FORMAT_TYPE_MAC:
|
||||
mac := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(mac, fieldValue.Uint())
|
||||
fstr[i] = fmt.Sprintf("\"%s\":\"%s\"", kf, net.HardwareAddr(mac[2:]).String())
|
||||
default:
|
||||
fstr[i] = fmt.Sprintf("\"%s\":null", kf)
|
||||
}
|
||||
|
||||
} else {
|
||||
fstr[i] = fmt.Sprintf("\"%s\":null", kf)
|
||||
}
|
||||
}
|
||||
|
||||
for i, e := range JsonExtras {
|
||||
fstr[i+len(JsonFields)] = fmt.Sprintf("\"%s\":\"%s\"", e, JsonExtraCall[i](msg))
|
||||
}
|
||||
|
||||
return fmt.Sprintf("{%s}", strings.Join(fstr, ","))
|
||||
}
|
||||
|
||||
func FormatMessage(msg *flowmessage.FlowMessage, ext string) string {
|
||||
srcmac := make([]byte, 8)
|
||||
dstmac := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(srcmac, msg.SrcMac)
|
||||
binary.BigEndian.PutUint64(dstmac, msg.DstMac)
|
||||
srcmac = srcmac[2:8]
|
||||
dstmac = dstmac[2:8]
|
||||
|
||||
b := fmt.Sprintf(
|
||||
"{"+
|
||||
"\"Type\":\"%v\","+
|
||||
"\"TimeReceived\":%d,"+
|
||||
"\"SequenceNum\":%d,"+
|
||||
"\"SamplingRate\":%d,"+
|
||||
"\"SamplerAddress\":\"%v\","+
|
||||
"\"TimeFlowStart\":%d,"+
|
||||
"\"TimeFlowEnd\":%d,"+
|
||||
"\"Bytes\":%d,"+
|
||||
"\"Packets\":%d,"+
|
||||
"\"SrcAddr\":\"%v\","+
|
||||
"\"DstAddr\":\"%v\","+
|
||||
"\"Etype\":%d,"+
|
||||
"\"EtypeName\":\"%s\","+
|
||||
"\"Proto\":%d,"+
|
||||
"\"ProtoName\":\"%s\","+
|
||||
"\"SrcPort\":%d,"+
|
||||
"\"DstPort\":%d,"+
|
||||
"\"InIf\":%d,"+
|
||||
"\"OutIf\":%d,"+
|
||||
"\"SrcMac\":\"%v\","+
|
||||
"\"DstMac\":\"%v\","+
|
||||
"\"SrcVlan\":%d,"+
|
||||
"\"DstVlan\":%d,"+
|
||||
"\"VlanId\":%d,"+
|
||||
"\"IngressVrfID\":%d,"+
|
||||
"\"EgressVrfID\":%d,"+
|
||||
"\"IPTos\":%d,"+
|
||||
"\"ForwardingStatus\":%d,"+
|
||||
"\"IPTTL\":%d,"+
|
||||
"\"TCPFlags\":%d,"+
|
||||
"\"IcmpType\":%d,"+
|
||||
"\"IcmpCode\":%d,"+
|
||||
"\"IcmpName\":\"%s\","+
|
||||
"\"IPv6FlowLabel\":%d,"+
|
||||
"\"FragmentId\":%d,"+
|
||||
"\"FragmentOffset\":%d,"+
|
||||
"\"BiFlowDirection\":\"%v\","+
|
||||
"\"SrcAS\":%d,"+
|
||||
"\"DstAS\":%d,"+
|
||||
"\"NextHop\":\"%v\","+
|
||||
"\"NextHopAS\":%d,"+
|
||||
"\"SrcNet\":%d,"+
|
||||
"\"DstNet\":%d"+
|
||||
"%s}",
|
||||
msg.Type.String(),
|
||||
msg.TimeReceived,
|
||||
msg.SequenceNum,
|
||||
msg.SamplingRate,
|
||||
RenderIP(msg.SamplerAddress),
|
||||
msg.TimeFlowStart,
|
||||
msg.TimeFlowEnd,
|
||||
msg.Bytes,
|
||||
msg.Packets,
|
||||
RenderIP(msg.SrcAddr),
|
||||
RenderIP(msg.DstAddr),
|
||||
msg.Etype,
|
||||
EtypeName[msg.Etype],
|
||||
msg.Proto,
|
||||
ProtoName[msg.Proto],
|
||||
msg.SrcPort,
|
||||
msg.DstPort,
|
||||
msg.InIf,
|
||||
msg.OutIf,
|
||||
net.HardwareAddr(srcmac).String(),
|
||||
net.HardwareAddr(dstmac).String(),
|
||||
msg.SrcVlan,
|
||||
msg.DstVlan,
|
||||
msg.VlanId,
|
||||
msg.IngressVrfID,
|
||||
msg.EgressVrfID,
|
||||
msg.IPTos,
|
||||
msg.ForwardingStatus,
|
||||
msg.IPTTL,
|
||||
msg.TCPFlags,
|
||||
msg.IcmpType,
|
||||
msg.IcmpCode,
|
||||
IcmpCodeType(msg.Proto, msg.IcmpCode, msg.IcmpType),
|
||||
msg.IPv6FlowLabel,
|
||||
msg.FragmentId,
|
||||
msg.FragmentOffset,
|
||||
msg.BiFlowDirection,
|
||||
msg.SrcAS,
|
||||
msg.DstAS,
|
||||
RenderIP(msg.NextHop),
|
||||
msg.NextHopAS,
|
||||
msg.SrcNet,
|
||||
msg.DstNet,
|
||||
ext)
|
||||
|
||||
return b
|
||||
key := common.HashProtoLocal(msg)
|
||||
return []byte(key), []byte(common.FormatMessageReflectJSON(msg, "")), nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@@ -6,13 +6,7 @@ import (
|
||||
"fmt"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/netsampler/goflow2/format"
|
||||
"reflect"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var (
|
||||
fieldsVar string
|
||||
fields []string // Hashing fields
|
||||
"github.com/netsampler/goflow2/format/common"
|
||||
)
|
||||
|
||||
type ProtobufDriver struct {
|
||||
@@ -20,18 +14,13 @@ type ProtobufDriver struct {
|
||||
}
|
||||
|
||||
func (d *ProtobufDriver) Prepare() error {
|
||||
flag.StringVar(&fieldsVar, "format.hash", "SamplerAddress", "List of fields to do hashing, separated by commas")
|
||||
common.HashFlag()
|
||||
flag.BoolVar(&d.fixedLen, "format.protobuf.fixedlen", false, "Prefix the protobuf with message length")
|
||||
return nil
|
||||
}
|
||||
|
||||
func ManualInit() error {
|
||||
fields = strings.Split(fieldsVar, ",")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *ProtobufDriver) Init(context.Context) error {
|
||||
return ManualInit()
|
||||
return common.ManualHashInit()
|
||||
}
|
||||
|
||||
func (d *ProtobufDriver) Format(data interface{}) ([]byte, []byte, error) {
|
||||
@@ -39,7 +28,7 @@ func (d *ProtobufDriver) Format(data interface{}) ([]byte, []byte, error) {
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("message is not protobuf")
|
||||
}
|
||||
key := HashProtoLocal(msg)
|
||||
key := common.HashProtoLocal(msg)
|
||||
|
||||
if !d.fixedLen {
|
||||
b, err := proto.Marshal(msg)
|
||||
@@ -55,25 +44,3 @@ func init() {
|
||||
d := &ProtobufDriver{}
|
||||
format.RegisterFormatDriver("pb", d)
|
||||
}
|
||||
|
||||
func HashProtoLocal(msg interface{}) string {
|
||||
return HashProto(fields, msg)
|
||||
}
|
||||
|
||||
func HashProto(fields []string, msg interface{}) string {
|
||||
var keyStr string
|
||||
|
||||
if msg != nil {
|
||||
vfm := reflect.ValueOf(msg)
|
||||
vfm = reflect.Indirect(vfm)
|
||||
|
||||
for _, kf := range fields {
|
||||
fieldValue := vfm.FieldByName(kf)
|
||||
if fieldValue.IsValid() {
|
||||
keyStr += fmt.Sprintf("%v-", fieldValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return keyStr
|
||||
}
|
||||
|
41
format/text/text.go
Normal file
41
format/text/text.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package text
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/netsampler/goflow2/format"
|
||||
"github.com/netsampler/goflow2/format/common"
|
||||
)
|
||||
|
||||
type TextDriver struct {
|
||||
}
|
||||
|
||||
func (d *TextDriver) Prepare() error {
|
||||
common.HashFlag()
|
||||
common.SelectorFlag()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *TextDriver) Init(context.Context) error {
|
||||
err := common.ManualHashInit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return common.ManualSelectorInit()
|
||||
}
|
||||
|
||||
func (d *TextDriver) Format(data interface{}) ([]byte, []byte, error) {
|
||||
msg, ok := data.(proto.Message)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("message is not protobuf")
|
||||
}
|
||||
|
||||
key := common.HashProtoLocal(msg)
|
||||
return []byte(key), []byte(common.FormatMessageReflectText(msg, "")), nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
d := &TextDriver{}
|
||||
format.RegisterFormatDriver("text", d)
|
||||
}
|
Reference in New Issue
Block a user