Skip to content

Commit 77556ce

Browse files
committed
Auto commit on batch end
1 parent 9f4461f commit 77556ce

File tree

3 files changed

+26
-0
lines changed

3 files changed

+26
-0
lines changed

examples/performance/performance-primitives-common.js

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const { randomBytes } = require('crypto');
33

44
const TERMINATE_TIMEOUT_MS = process.env.TERMINATE_TIMEOUT_MS ? +process.env.TERMINATE_TIMEOUT_MS : 600000;
55
const AUTO_COMMIT = process.env.AUTO_COMMIT || 'false';
6+
const AUTO_COMMIT_ON_BATCH_END = process.env.AUTO_COMMIT_ON_BATCH_END === 'true';
67
let autoCommit;
78
if (AUTO_COMMIT && AUTO_COMMIT === 'false')
89
autoCommit = null;
@@ -136,6 +137,14 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
136137
await actionOnMessages([message]);
137138
updateLatency(Date.now(), messagesMeasured, message, true);
138139
}
140+
141+
if (autoCommit !== null && AUTO_COMMIT_ON_BATCH_END) {
142+
await consumer.commitOffsetsOnBatchEnd([{
143+
topic,
144+
partition,
145+
offset: (Number(message.offset) + 1).toString(),
146+
}]);
147+
}
139148
}
140149
}
141150
if (eachBatch) {
@@ -144,6 +153,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
144153
eachBatch: async ({ batch }) => {
145154
const messagesBeforeBatch = messagesReceived;
146155
const topic = batch.topic;
156+
const partition = batch.partition;
147157
let messagesBase;
148158
let messages;
149159
messagesReceived += batch.messages.length;
@@ -190,6 +200,14 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
190200
}
191201
}
192202
}
203+
204+
if (autoCommit !== null && AUTO_COMMIT_ON_BATCH_END) {
205+
await consumer.commitOffsetsOnBatchEnd([{
206+
topic,
207+
partition,
208+
offset: (Number(batch.lastOffset()) + 1).toString(),
209+
}]);
210+
}
193211
}
194212
};
195213
}

examples/performance/performance-primitives-kafkajs.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ class CompatibleConsumer {
128128
return this.consumer.pause(topics);
129129
}
130130

131+
commitOffsetsOnBatchEnd(offsets) {
132+
// do nothing, done by KafkaJS after each* if autoCommit is enabled
133+
}
134+
131135
run(opts) {
132136
const autoCommit = getAutoCommit();
133137
const autoCommitOpts = autoCommit > 0 ?

examples/performance/performance-primitives.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ class CompatibleConsumer {
129129
return this.consumer.pause(topics);
130130
}
131131

132+
commitOffsetsOnBatchEnd(offsets) {
133+
return this.consumer.commitOffsets(offsets);
134+
}
135+
132136
run(opts) {
133137
return this.consumer.run({
134138
...opts

0 commit comments

Comments
 (0)