From ae794c8e5bada63d63b13f21849b50b0198ef2d7 Mon Sep 17 00:00:00 2001 From: qiuwenhao Date: Sat, 13 Sep 2025 15:24:12 +0800 Subject: [PATCH] feat(kq): add batch push methods for Kafka producer --- kq/pusher.go | 60 +++++++++++++++++++++++++++++ kq/pusher_test.go | 97 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+) diff --git a/kq/pusher.go b/kq/pusher.go index b254d4c..b4911c7 100644 --- a/kq/pusher.go +++ b/kq/pusher.go @@ -15,6 +15,12 @@ import ( type ( PushOption func(options *pushOptions) + // KeyValue represents a key-value pair for batch sending + KeyValue struct { + Key string + Value string + } + Pusher struct { topic string producer kafkaWriter @@ -178,3 +184,57 @@ func WithSyncPush() PushOption { options.syncPush = true } } + +// BatchPush sends multiple messages to the Kafka topic. +// It generates timestamp-based keys for each message automatically. +func (p *Pusher) BatchPush(ctx context.Context, msgs []string) error { + if len(msgs) == 0 { + return nil + } + + keyValues := make([]KeyValue, len(msgs)) + baseTime := time.Now().UnixNano() + for i, msg := range msgs { + keyValues[i] = KeyValue{ + Key: strconv.FormatInt(baseTime+int64(i), 10), + Value: msg, + } + } + return p.BatchPushWithKeys(ctx, keyValues) +} + +// BatchPushWithKeys sends multiple key-value pairs to the Kafka topic. +func (p *Pusher) BatchPushWithKeys(ctx context.Context, keyValues []KeyValue) error { + if len(keyValues) == 0 { + return nil + } + + msgs := make([]kafka.Message, len(keyValues)) + for i, kv := range keyValues { + msg := kafka.Message{ + Key: []byte(kv.Key), + Value: []byte(kv.Value), + } + + // wrap message into message carrier for tracing + mc := internal.NewMessageCarrier(internal.NewMessage(&msg)) + // inject trace context into message + otel.GetTextMapPropagator().Inject(ctx, mc) + + msgs[i] = msg + } + + // Handle sync vs async mode + if p.executor != nil { + // Async mode: use executor for batching + for _, msg := range msgs { + if err := p.executor.Add(msg, len(msg.Value)); err != nil { + return err + } + } + return nil + } else { + // Sync mode: send directly + return p.producer.WriteMessages(ctx, msgs...) + } +} diff --git a/kq/pusher_test.go b/kq/pusher_test.go index 72b380d..0627421 100644 --- a/kq/pusher_test.go +++ b/kq/pusher_test.go @@ -139,3 +139,100 @@ func TestPusher_PushWithKey_Error(t *testing.T) { assert.Equal(t, expectedError, err) mockWriter.AssertExpectations(t) } + +func TestPusher_BatchPush(t *testing.T) { + mockWriter := new(mockKafkaWriter) + pusher := &Pusher{ + producer: mockWriter, + topic: "test-topic", + } + + ctx := context.Background() + msgs := []string{"message1", "message2", "message3"} + + mockWriter.On("WriteMessages", ctx, mock.MatchedBy(func(msgs []kafka.Message) bool { + return len(msgs) == 3 && + string(msgs[0].Value) == "message1" && + string(msgs[1].Value) == "message2" && + string(msgs[2].Value) == "message3" + })).Return(nil) + + err := pusher.BatchPush(ctx, msgs) + assert.NoError(t, err) + mockWriter.AssertExpectations(t) +} + +func TestPusher_BatchPushWithKeys(t *testing.T) { + mockWriter := new(mockKafkaWriter) + pusher := &Pusher{ + producer: mockWriter, + topic: "test-topic", + } + + ctx := context.Background() + keyValues := []KeyValue{ + {Key: "key1", Value: "value1"}, + {Key: "key2", Value: "value2"}, + } + + mockWriter.On("WriteMessages", ctx, mock.MatchedBy(func(msgs []kafka.Message) bool { + return len(msgs) == 2 && + string(msgs[0].Key) == "key1" && string(msgs[0].Value) == "value1" && + string(msgs[1].Key) == "key2" && string(msgs[1].Value) == "value2" + })).Return(nil) + + err := pusher.BatchPushWithKeys(ctx, keyValues) + assert.NoError(t, err) + mockWriter.AssertExpectations(t) +} + +func TestPusher_BatchPush_EmptyMessages(t *testing.T) { + mockWriter := new(mockKafkaWriter) + pusher := &Pusher{ + producer: mockWriter, + topic: "test-topic", + } + + ctx := context.Background() + var msgs []string + + err := pusher.BatchPush(ctx, msgs) + assert.NoError(t, err) + // Should not call WriteMessages for empty slice + mockWriter.AssertNotCalled(t, "WriteMessages") +} + +func TestPusher_BatchPushWithKeys_EmptyKeyValues(t *testing.T) { + mockWriter := new(mockKafkaWriter) + pusher := &Pusher{ + producer: mockWriter, + topic: "test-topic", + } + + ctx := context.Background() + var keyValues []KeyValue + + err := pusher.BatchPushWithKeys(ctx, keyValues) + assert.NoError(t, err) + // Should not call WriteMessages for empty slice + mockWriter.AssertNotCalled(t, "WriteMessages") +} + +func TestPusher_BatchPush_Error(t *testing.T) { + mockWriter := new(mockKafkaWriter) + pusher := &Pusher{ + producer: mockWriter, + topic: "test-topic", + } + + ctx := context.Background() + msgs := []string{"message1", "message2"} + + expectedError := errors.New("batch write error") + mockWriter.On("WriteMessages", ctx, mock.AnythingOfType("[]kafka.Message")).Return(expectedError) + + err := pusher.BatchPush(ctx, msgs) + assert.Error(t, err) + assert.Equal(t, expectedError, err) + mockWriter.AssertExpectations(t) +}