Skip to content

Commit bf3807f

Browse files
author
Michal Tichák
committed
[core] better kafka metrics
1 parent 7a1243b commit bf3807f

File tree

2 files changed

+39
-37
lines changed

2 files changed

+39
-37
lines changed

common/event/writer.go

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,10 @@ import (
4242
)
4343

4444
var (
45-
log = logger.New(logrus.StandardLogger(), "event")
46-
KAFKAWRITER = "kafka_writer"
47-
KAFKAPREPARE = "kafka_prepare"
45+
log = logger.New(logrus.StandardLogger(), "event")
46+
KAFKAWRITER = "kafka_writer"
47+
KAFKAPREPARE = "kafka_prepare"
48+
writerBatchsize = 100
4849
)
4950

5051
type Writer interface {
@@ -76,7 +77,7 @@ type KafkaWriter struct {
7677
toBatchMessagesChan chan kafka.Message
7778
messageBuffer FifoBuffer[kafka.Message]
7879
// NOTE: we use settable callback in order to be able to test writing of messages via KafkaWriter, without necessity of setting up cluster
79-
writeFunction func([]kafka.Message, *monitoring.Metric)
80+
writeFunction func([]kafka.Message)
8081
runningWorkers sync.WaitGroup
8182
batchingLoopDoneCh chan struct{}
8283
}
@@ -94,19 +95,30 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
9495
Topic: string(topic),
9596
Balancer: &kafka.Hash{},
9697
AllowAutoTopicCreation: true,
98+
BatchTimeout: time.Millisecond, // we are batching the messages ourselves and writing sync into writer, so long timeout is not necessary
99+
BatchSize: writerBatchsize,
97100
},
98-
toBatchMessagesChan: make(chan kafka.Message, 10000),
101+
toBatchMessagesChan: make(chan kafka.Message, 100000),
99102
messageBuffer: NewFifoBuffer[kafka.Message](),
100103
runningWorkers: sync.WaitGroup{},
101104
batchingLoopDoneCh: make(chan struct{}, 1),
102105
}
103106

104-
writer.writeFunction = func(messages []kafka.Message, metric *monitoring.Metric) {
105-
defer monitoring.TimerNS(metric)()
107+
writer.writeFunction = func(messages []kafka.Message) {
108+
metric := writer.newMetric(KAFKAWRITER)
109+
metric.SetFieldUInt64("messages_sent", uint64(len(messages)))
110+
metric.SetFieldUInt64("messages_failed", 0)
111+
112+
metricDuration := writer.newMetric(KAFKAWRITER)
113+
defer monitoring.SendHistogrammable(&metricDuration)
114+
defer monitoring.TimerNS(&metricDuration)()
115+
106116
if err := writer.WriteMessages(context.Background(), messages...); err != nil {
107117
metric.SetFieldUInt64("messages_failed", uint64(len(messages)))
108118
log.Errorf("failed to write %d messages to kafka with error: %v", len(messages), err)
109119
}
120+
121+
monitoring.Send(&metric)
110122
}
111123

112124
go writer.writingLoop()
@@ -138,18 +150,11 @@ func (w *KafkaWriter) writingLoop() {
138150
w.runningWorkers.Done()
139151
return
140152
default:
141-
messagesToSend := w.messageBuffer.PopMultiple(100)
153+
messagesToSend := w.messageBuffer.PopMultiple(uint(writerBatchsize))
142154
if len(messagesToSend) == 0 {
143155
continue
144156
}
145-
146-
metric := w.newMetric(KAFKAWRITER)
147-
metric.SetFieldUInt64("messages_sent", uint64(len(messagesToSend)))
148-
metric.SetFieldUInt64("messages_failed", 0)
149-
150-
w.writeFunction(messagesToSend, &metric)
151-
152-
monitoring.Send(&metric)
157+
w.writeFunction(messagesToSend)
153158
}
154159
}
155160
}
@@ -241,25 +246,23 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
241246

242247
metric := w.newMetric(KAFKAPREPARE)
243248

244-
func() {
245-
defer monitoring.TimerNS(&metric)()
246-
wrappedEvent, key, err := internalEventToKafkaEvent(e, timestamp)
247-
if err != nil {
248-
log.WithField("event", e).
249-
WithField("level", infologger.IL_Support).
250-
Errorf("Failed to convert event to kafka event: %s", err.Error())
251-
return
252-
}
249+
defer monitoring.SendHistogrammable(&metric)
250+
defer monitoring.TimerNS(&metric)()
253251

254-
message, err := kafkaEventToKafkaMessage(wrappedEvent, key)
255-
if err != nil {
256-
log.WithField("event", e).
257-
WithField("level", infologger.IL_Support).
258-
Errorf("Failed to convert kafka event to message: %s", err.Error())
259-
return
260-
}
261-
w.toBatchMessagesChan <- message
262-
}()
252+
wrappedEvent, key, err := internalEventToKafkaEvent(e, timestamp)
253+
if err != nil {
254+
log.WithField("event", e).
255+
WithField("level", infologger.IL_Support).
256+
Errorf("Failed to convert event to kafka event: %s", err.Error())
257+
return
258+
}
263259

264-
monitoring.Send(&metric)
260+
message, err := kafkaEventToKafkaMessage(wrappedEvent, key)
261+
if err != nil {
262+
log.WithField("event", e).
263+
WithField("level", infologger.IL_Support).
264+
Errorf("Failed to convert kafka event to message: %s", err.Error())
265+
return
266+
}
267+
w.toBatchMessagesChan <- message
265268
}

common/event/writer_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ package event
2727
import (
2828
"sync"
2929

30-
"github.com/AliceO2Group/Control/common/monitoring"
3130
pb "github.com/AliceO2Group/Control/common/protos"
3231
. "github.com/onsi/ginkgo/v2"
3332
. "github.com/onsi/gomega"
@@ -47,7 +46,7 @@ var _ = Describe("Writer", func() {
4746
writer.runningWorkers = sync.WaitGroup{}
4847
writer.batchingLoopDoneCh = make(chan struct{}, 1)
4948

50-
writer.writeFunction = func(messages []kafka.Message, _ *monitoring.Metric) {
49+
writer.writeFunction = func(messages []kafka.Message) {
5150
Expect(len(messages)).To(Equal(1))
5251
event := &pb.Event{}
5352
proto.Unmarshal(messages[0].Value, event)

0 commit comments

Comments (0)