diff --git a/exporter/kafkaexporter/exporter.go b/exporter/kafkaexporter/exporter.go index ff84e0f5ab1..4e7321b4199 100644 --- a/exporter/kafkaexporter/exporter.go +++ b/exporter/kafkaexporter/exporter.go @@ -19,10 +19,28 @@ type MessageSender interface { SendMessages(msgs []*sarama.ProducerMessage) error } +type AsyncMessageSender struct { + sarama.AsyncProducer + logger *fflog.FFLogger +} + +func (a *AsyncMessageSender) SendMessages(msgs []*sarama.ProducerMessage) error { + for len(msgs) > 0 { + select { + case err := <-a.AsyncProducer.Errors(): + a.logger.Warn("Failed to produce message: %w", err) + case a.AsyncProducer.Input() <- msgs[0]: + msgs = msgs[1:] + } + } + return nil +} + // Settings contains Kafka-specific configurations needed for message creation type Settings struct { Topic string `json:"topic"` Addresses []string `json:"addresses"` + Async bool `json:"async"` *sarama.Config } @@ -41,13 +59,15 @@ type Exporter struct { // dialer will create the producer. This field is added for dependency injection during testing as sarama // has the annoying tendency to dial as soon as a producer is created. dialer func(addrs []string, config *sarama.Config) (MessageSender, error) + + logger *fflog.FFLogger } // Export will produce a message to the Kafka topic. The message's value will contain the event encoded in the // selected format. Messages are published synchronously and will error immediately on failure. -func (e *Exporter) Export(_ context.Context, _ *fflog.FFLogger, featureEvents []exporter.FeatureEvent) error { +func (e *Exporter) Export(_ context.Context, logger *fflog.FFLogger, featureEvents []exporter.FeatureEvent) error { if e.sender == nil { - err := e.initializeProducer() + err := e.initializeProducer(logger) if err != nil { return fmt.Errorf("writer: %w", err) } @@ -81,7 +101,7 @@ func (e *Exporter) IsBulk() bool { // initializeProducer runs only once and creates a new producer from the dialer. If the config is not populated a new // one will be created with sensible defaults. -func (e *Exporter) initializeProducer() error { +func (e *Exporter) initializeProducer(logger *fflog.FFLogger) error { if e.Settings.Config == nil { e.Settings.Config = sarama.NewConfig() e.Settings.Config.Producer.Return.Successes = true // Needs to be true for sync producers @@ -90,6 +110,13 @@ func (e *Exporter) initializeProducer() error { if e.dialer == nil { e.dialer = func(addrs []string, config *sarama.Config) (MessageSender, error) { // Adapter for the function to comply with the MessageSender interface return + if e.Settings.Async { + asyncProducer, err := sarama.NewAsyncProducer(addrs, config) + if err != nil { + return nil, err + } + return &AsyncMessageSender{AsyncProducer: asyncProducer, logger: logger}, nil //TODO Close should be called on shutdown + } return sarama.NewSyncProducer(addrs, config) } }