flow is working

This commit is contained in:
Prabhat Sharma
2024-01-14 08:03:25 -08:00
parent ecd8290848
commit a3a1e30e4f
3 changed files with 50 additions and 12 deletions

View File

@@ -22,6 +22,7 @@ import (
// import various transports // import various transports
"github.com/netsampler/goflow2/v2/transport" "github.com/netsampler/goflow2/v2/transport"
_ "github.com/netsampler/goflow2/v2/transport/file" _ "github.com/netsampler/goflow2/v2/transport/file"
_ "github.com/netsampler/goflow2/v2/transport/http"
_ "github.com/netsampler/goflow2/v2/transport/kafka" _ "github.com/netsampler/goflow2/v2/transport/kafka"
"github.com/oschwald/geoip2-golang" "github.com/oschwald/geoip2-golang"

View File

@@ -30,6 +30,7 @@ import (
"github.com/netsampler/goflow2/v2/transport" "github.com/netsampler/goflow2/v2/transport"
_ "github.com/netsampler/goflow2/v2/transport/file" _ "github.com/netsampler/goflow2/v2/transport/file"
_ "github.com/netsampler/goflow2/v2/transport/kafka" _ "github.com/netsampler/goflow2/v2/transport/kafka"
_ "github.com/netsampler/goflow2/v2/transport/http"
// various producers // various producers
"github.com/netsampler/goflow2/v2/producer" "github.com/netsampler/goflow2/v2/producer"

View File

@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"flag" "flag"
"fmt"
"net/http" "net/http"
"sync" "sync"
@@ -12,36 +13,71 @@ import (
type HTTPDriver struct { type HTTPDriver struct {
httpDestination string httpDestination string
httpAuthHeader string
httpAuthCredentials string
lock *sync.RWMutex lock *sync.RWMutex
q chan bool q chan bool
batchSize int
batchData [][]byte
} }
func (d *HTTPDriver) Prepare() error { func (d *HTTPDriver) Prepare() error {
flag.StringVar(&d.httpDestination, "transport.http", "", "HTTP endpoint for output") flag.StringVar(&d.httpDestination, "transport.http.destination", "", "HTTP endpoint for output")
flag.StringVar(&d.httpAuthHeader, "transport.http.auth.header", "", "HTTP header to set for credentials")
flag.StringVar(&d.httpAuthCredentials, "transport.http.auth.credentials", "", "credentials for the header")
flag.IntVar(&d.batchSize, "transport.http.batchSize", 1000, "Batch size for sending records")
if d.batchSize <= 0 {
d.batchSize = 1000 // default batch size
}
return nil return nil
} }
func (d *HTTPDriver) Init() error { func (d *HTTPDriver) Init() error {
d.q = make(chan bool, 1) d.q = make(chan bool, 1)
d.batchData = make([][]byte, 0, d.batchSize)
return nil return nil
} }
func (d *HTTPDriver) Send(key, data []byte) error { func (d *HTTPDriver) Send(key, data []byte) error {
d.lock.RLock() d.lock.RLock()
httpDestination := d.httpDestination httpDestination := d.httpDestination
httpAuthHeader := d.httpAuthHeader
httpAuthCredentials := d.httpAuthCredentials
d.lock.RUnlock() d.lock.RUnlock()
jsonData, err := json.Marshal(data) d.batchData = append(d.batchData, data)
fmt.Println("batchData len:", len(d.batchData))
if len(d.batchData) >= d.batchSize {
jsonData, err := json.Marshal(d.batchData)
if err != nil { if err != nil {
return err return err
} }
resp, err := http.Post(httpDestination, "application/json", bytes.NewBuffer(jsonData)) fmt.Println("jsonData:", string(jsonData))
req, err := http.NewRequest("POST", httpDestination, bytes.NewBuffer(jsonData))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
fmt.Println("httpAuthHeader:", httpAuthHeader)
fmt.Println("httpAuthCredentials:", httpAuthCredentials)
req.Header.Set(httpAuthHeader, httpAuthCredentials)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close() defer resp.Body.Close()
// reset batchData
d.batchData = d.batchData[:0]
}
return nil return nil
} }