mirror of
https://github.com/openobserve/goflow2.git
synced 2025-10-23 07:11:57 +00:00
Kafka: enable compression (#128)
Co-authored-by: Arun Cherla <kcherla@gmail.com>
This commit is contained in:
@@ -32,12 +32,23 @@ type KafkaDriver struct {
|
||||
|
||||
kafkaHashing bool
|
||||
kafkaVersion string
|
||||
kafkaCompressionCodec string
|
||||
|
||||
producer sarama.AsyncProducer
|
||||
|
||||
q chan bool
|
||||
}
|
||||
|
||||
var (
|
||||
compressionCodecs = map[string]sarama.CompressionCodec{
|
||||
strings.ToLower(sarama.CompressionNone.String()): sarama.CompressionNone,
|
||||
strings.ToLower(sarama.CompressionGZIP.String()): sarama.CompressionGZIP,
|
||||
strings.ToLower(sarama.CompressionSnappy.String()): sarama.CompressionSnappy,
|
||||
strings.ToLower(sarama.CompressionLZ4.String()): sarama.CompressionLZ4,
|
||||
strings.ToLower(sarama.CompressionZSTD.String()): sarama.CompressionZSTD,
|
||||
}
|
||||
)
|
||||
|
||||
func (d *KafkaDriver) Prepare() error {
|
||||
flag.BoolVar(&d.kafkaTLS, "transport.kafka.tls", false, "Use TLS to connect to Kafka")
|
||||
|
||||
@@ -54,6 +65,7 @@ func (d *KafkaDriver) Prepare() error {
|
||||
|
||||
//flag.StringVar(&d.kafkaKeying, "transport.kafka.key", "SamplerAddress,DstAS", "Kafka list of fields to do hashing on (partition) separated by commas")
|
||||
flag.StringVar(&d.kafkaVersion, "transport.kafka.version", "2.8.0", "Kafka version")
|
||||
flag.StringVar(&d.kafkaCompressionCodec, "transport.kafka.compression", "", "Kafka default compression")
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -71,6 +83,26 @@ func (d *KafkaDriver) Init(context.Context) error {
|
||||
kafkaConfig.Producer.MaxMessageBytes = d.kafkaMaxMsgBytes
|
||||
kafkaConfig.Producer.Flush.Bytes = d.kafkaFlushBytes
|
||||
kafkaConfig.Producer.Flush.Frequency = d.kafkaFlushFrequency
|
||||
|
||||
if d.kafkaCompressionCodec != "" {
|
||||
/*
|
||||
// when upgrading sarama, replace with:
|
||||
// note: if the library adds more codecs, they will be supported natively
|
||||
var cc *sarama.CompressionCodec
|
||||
|
||||
if err := cc.UnmarshalText([]byte(d.kafkaCompressionCodec)); err != nil {
|
||||
return err
|
||||
}
|
||||
kafkaConfig.Producer.Compression = *cc
|
||||
*/
|
||||
|
||||
if cc, ok := compressionCodecs[strings.ToLower(d.kafkaCompressionCodec)]; !ok {
|
||||
return errors.New("compression codec does not exist")
|
||||
} else {
|
||||
kafkaConfig.Producer.Compression = cc
|
||||
}
|
||||
}
|
||||
|
||||
if d.kafkaTLS {
|
||||
rootCAs, err := x509.SystemCertPool()
|
||||
if err != nil {
|
||||
|
Reference in New Issue
Block a user