mirror of
https://github.com/openobserve/goflow2.git
synced 2025-11-04 22:03:21 +00:00
Merge pull request #48 from netsampler/feature/kafka-flush-control
Add flags to control Kafka Flush parameters
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
sarama "github.com/Shopify/sarama"
|
sarama "github.com/Shopify/sarama"
|
||||||
"github.com/netsampler/goflow2/transport"
|
"github.com/netsampler/goflow2/transport"
|
||||||
@@ -23,6 +24,9 @@ type KafkaDriver struct {
|
|||||||
kafkaTopic string
|
kafkaTopic string
|
||||||
kafkaSrv string
|
kafkaSrv string
|
||||||
kafkaBrk string
|
kafkaBrk string
|
||||||
|
kafkaMaxMsgBytes int
|
||||||
|
kafkaFlushBytes int
|
||||||
|
kafkaFlushFrequency time.Duration
|
||||||
|
|
||||||
kafkaLogErrors bool
|
kafkaLogErrors bool
|
||||||
|
|
||||||
@@ -41,6 +45,9 @@ func (d *KafkaDriver) Prepare() error {
|
|||||||
flag.StringVar(&d.kafkaTopic, "transport.kafka.topic", "flow-messages", "Kafka topic to produce to")
|
flag.StringVar(&d.kafkaTopic, "transport.kafka.topic", "flow-messages", "Kafka topic to produce to")
|
||||||
flag.StringVar(&d.kafkaSrv, "transport.kafka.srv", "", "SRV record containing a list of Kafka brokers (or use brokers)")
|
flag.StringVar(&d.kafkaSrv, "transport.kafka.srv", "", "SRV record containing a list of Kafka brokers (or use brokers)")
|
||||||
flag.StringVar(&d.kafkaBrk, "transport.kafka.brokers", "127.0.0.1:9092,[::1]:9092", "Kafka brokers list separated by commas")
|
flag.StringVar(&d.kafkaBrk, "transport.kafka.brokers", "127.0.0.1:9092,[::1]:9092", "Kafka brokers list separated by commas")
|
||||||
|
flag.IntVar(&d.kafkaMaxMsgBytes, "transport.kafka.maxmsgbytes", 1000000, "Kafka max message bytes")
|
||||||
|
flag.IntVar(&d.kafkaFlushBytes, "transport.kafka.flushbytes", int(sarama.MaxRequestSize), "Kafka flush bytes")
|
||||||
|
flag.DurationVar(&d.kafkaFlushFrequency, "transport.kafka.flushfreq", time.Second*5, "Kafka flush frequency")
|
||||||
|
|
||||||
flag.BoolVar(&d.kafkaLogErrors, "transport.kafka.log.err", false, "Log Kafka errors")
|
flag.BoolVar(&d.kafkaLogErrors, "transport.kafka.log.err", false, "Log Kafka errors")
|
||||||
flag.BoolVar(&d.kafkaHashing, "transport.kafka.hashing", false, "Enable partition hashing")
|
flag.BoolVar(&d.kafkaHashing, "transport.kafka.hashing", false, "Enable partition hashing")
|
||||||
@@ -61,6 +68,9 @@ func (d *KafkaDriver) Init(context.Context) error {
|
|||||||
kafkaConfig.Version = kafkaConfigVersion
|
kafkaConfig.Version = kafkaConfigVersion
|
||||||
kafkaConfig.Producer.Return.Successes = false
|
kafkaConfig.Producer.Return.Successes = false
|
||||||
kafkaConfig.Producer.Return.Errors = d.kafkaLogErrors
|
kafkaConfig.Producer.Return.Errors = d.kafkaLogErrors
|
||||||
|
kafkaConfig.Producer.MaxMessageBytes = d.kafkaMaxMsgBytes
|
||||||
|
kafkaConfig.Producer.Flush.Bytes = d.kafkaFlushBytes
|
||||||
|
kafkaConfig.Producer.Flush.Frequency = d.kafkaFlushFrequency
|
||||||
if d.kafkaTLS {
|
if d.kafkaTLS {
|
||||||
rootCAs, err := x509.SystemCertPool()
|
rootCAs, err := x509.SystemCertPool()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user