Skip to content

Commit e8bd34f

Browse files
committed
Configurable batch size
1 parent 77556ce commit e8bd34f

File tree

5 files changed

+50
-54
lines changed

5 files changed

+50
-54
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
# confluent-kafka-javascript 1.6.1
2+
3+
v1.6.1 is a maintenance release. It is supported for all usage.
4+
5+
### Enhancements
6+
7+
1. Configurable batch size through the `js.max.batch.size` property (#389).
8+
9+
110
# confluent-kafka-javascript 1.6.0
211

312
v1.6.0 is a feature release. It is supported for all usage.

MIGRATION.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,9 +303,9 @@ producerRun().then(consumerRun).catch(console.error);
303303
- The `heartbeat()` no longer needs to be called by the user in the `eachMessage/eachBatch` callback.
304304
Heartbeats are automatically managed by librdkafka.
305305
- The `partitionsConsumedConcurrently` is supported by both `eachMessage` and `eachBatch`.
306-
- An API compatible version of `eachBatch` is available, but the batch size calculation is not
307-
as per configured parameters, rather, a constant maximum size is configured internally. This is subject
308-
to change.
306+
- An API-compatible version of `eachBatch` is available; the maximum batch size
307+
can be configured through the `js.max.batch.size` configuration property
308+
and defaults to 32.
309309
The property `eachBatchAutoResolve` is supported.
310310
Within the `eachBatch` callback, use of `uncommittedOffsets` is unsupported,
311311
and within the returned batch, `offsetLag` and `offsetLagLow` are supported.

examples/performance/performance-primitives.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ module.exports = {
1919
newCompatibleProducer,
2020
};
2121

22+
23+
const MAX_BATCH_SIZE = process.env.MAX_BATCH_SIZE ? +process.env.MAX_BATCH_SIZE : null;
24+
2225
function baseConfiguration(parameters) {
2326
let ret = {
2427
'client.id': 'kafka-test-performance',
@@ -146,6 +149,10 @@ function newCompatibleConsumer(parameters, eachBatch) {
146149
const autoCommitOpts = autoCommit > 0 ?
147150
{ 'enable.auto.commit': true, 'auto.commit.interval.ms': autoCommit } :
148151
{ 'enable.auto.commit': false };
152+
const jsOpts = {};
153+
if (eachBatch && MAX_BATCH_SIZE !== null) {
154+
jsOpts['js.max.batch.size'] = MAX_BATCH_SIZE;
155+
}
149156

150157
let groupId = eachBatch ? process.env.GROUPID_BATCH : process.env.GROUPID_MESSAGE;
151158
if (!groupId) {
@@ -157,6 +164,7 @@ function newCompatibleConsumer(parameters, eachBatch) {
157164
'auto.offset.reset': 'earliest',
158165
'fetch.queue.backoff.ms': '100',
159166
...autoCommitOpts,
167+
...jsOpts,
160168
});
161169
return new CompatibleConsumer(consumer);
162170
}

lib/kafkajs/_consumer.js

Lines changed: 19 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -155,17 +155,6 @@ class Consumer {
155155
*/
156156
#messageCache = null;
157157

158-
/**
159-
* The maximum size of the message cache.
160-
* Will be adjusted dynamically.
161-
*/
162-
#messageCacheMaxSize = 1;
163-
164-
/**
165-
* Number of times we tried to increase the cache.
166-
*/
167-
#increaseCount = 0;
168-
169158
/**
170159
* Whether the user has enabled manual offset management (commits).
171160
*/
@@ -182,6 +171,11 @@ class Consumer {
182171
*/
183172
#partitionCount = 0;
184173

174+
/**
175+
* Maximum batch size passed in eachBatch calls.
176+
*/
177+
#maxBatchSize = 32;
178+
185179
/**
186180
* Whether worker termination has been scheduled.
187181
*/
@@ -311,8 +305,6 @@ class Consumer {
311305
* consumed messages upto N from the internalClient, but the user has stale'd the cache
312306
* after consuming just k (< N) messages. We seek back to last consumed offset + 1. */
313307
this.#messageCache.clear();
314-
this.#messageCacheMaxSize = 1;
315-
this.#increaseCount = 0;
316308
const clearPartitions = this.assignment();
317309
const seeks = [];
318310
for (const topicPartition of clearPartitions) {
@@ -691,6 +683,17 @@ class Consumer {
691683
this.#cacheExpirationTimeoutMs = this.#maxPollIntervalMs;
692684
rdKafkaConfig['max.poll.interval.ms'] = this.#maxPollIntervalMs * 2;
693685

686+
if (rdKafkaConfig['js.max.batch.size'] !== undefined) {
687+
const maxBatchSize = +rdKafkaConfig['js.max.batch.size'];
688+
if (!Number.isInteger(maxBatchSize) || (maxBatchSize <= 0 && maxBatchSize !== -1)) {
689+
throw new error.KafkaJSError(
690+
"'js.max.batch.size' must be a positive integer or -1 for unlimited batch size.",
691+
{ code: error.ErrorCodes.ERR__INVALID_ARG });
692+
}
693+
this.#maxBatchSize = maxBatchSize;
694+
delete rdKafkaConfig['js.max.batch.size'];
695+
}
696+
694697
return rdKafkaConfig;
695698
}
696699

@@ -844,33 +847,6 @@ class Consumer {
844847
await this.commitOffsets();
845848
}
846849

847-
/**
848-
* Request a size increase.
849-
* It increases the size by 2x, but only if the size is less than 1024,
850-
* only if the size has been requested to be increased twice in a row.
851-
* @private
852-
*/
853-
#increaseMaxSize() {
854-
if (this.#messageCacheMaxSize === 1024)
855-
return;
856-
this.#increaseCount++;
857-
if (this.#increaseCount <= 1)
858-
return;
859-
this.#messageCacheMaxSize = Math.min(this.#messageCacheMaxSize << 1, 1024);
860-
this.#increaseCount = 0;
861-
}
862-
863-
/**
864-
* Request a size decrease.
865-
* It decreases the size to 80% of the last received size, with a minimum of 1.
866-
* @param {number} recvdSize - the number of messages received in the last poll.
867-
* @private
868-
*/
869-
#decreaseMaxSize(recvdSize) {
870-
this.#messageCacheMaxSize = Math.max(Math.floor((recvdSize * 8) / 10), 1);
871-
this.#increaseCount = 0;
872-
}
873-
874850
/**
875851
* Converts a list of messages returned by node-rdkafka into a message that can be used by the eachBatch callback.
876852
* @param {import("../..").Message[]} messages - must not be empty. Must contain messages from the same topic and partition.
@@ -1001,11 +977,6 @@ class Consumer {
1001977
const res = takeFromCache();
1002978
this.#lastFetchClockNs = hrtime.bigint();
1003979
this.#maxPollIntervalRestart.resolve();
1004-
if (messages.length === this.#messageCacheMaxSize) {
1005-
this.#increaseMaxSize();
1006-
} else {
1007-
this.#decreaseMaxSize(messages.length);
1008-
}
1009980
return res;
1010981
} finally {
1011982
this.#fetchInProgress.resolve();
@@ -1040,7 +1011,7 @@ class Consumer {
10401011
}
10411012

10421013
return this.#fetchAndResolveWith(() => this.#messageCache.next(),
1043-
this.#messageCacheMaxSize);
1014+
Number.MAX_SAFE_INTEGER);
10441015
}
10451016

10461017
/**
@@ -1071,7 +1042,7 @@ class Consumer {
10711042

10721043
return this.#fetchAndResolveWith(() =>
10731044
this.#messageCache.nextN(null, size),
1074-
this.#messageCacheMaxSize);
1045+
Number.MAX_SAFE_INTEGER);
10751046
}
10761047

10771048
/**
@@ -1559,11 +1530,9 @@ class Consumer {
15591530
async #runInternal(config) {
15601531
this.#concurrency = config.partitionsConsumedConcurrently;
15611532
const perMessageProcessor = config.eachMessage ? this.#messageProcessor : this.#batchProcessor;
1562-
/* TODO: make this dynamic, based on max batch size / size of last message seen. */
1563-
const maxBatchSize = 32;
15641533
const fetcher = config.eachMessage
15651534
? (savedIdx) => this.#consumeSingleCached(savedIdx)
1566-
: (savedIdx) => this.#consumeCachedN(savedIdx, maxBatchSize);
1535+
: (savedIdx) => this.#consumeCachedN(savedIdx, this.#maxBatchSize);
15671536
this.#workers = [];
15681537

15691538
await this.#lock.write(async () => {

types/kafkajs.d.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,17 @@ export interface ConsumerConfig {
244244
partitionAssignors?: PartitionAssignors[],
245245
}
246246

247-
export type ConsumerGlobalAndTopicConfig = ConsumerGlobalConfig & ConsumerTopicConfig;
247+
export interface JSConsumerConfig {
248+
/**
249+
* Maximum batch size passed in eachBatch calls.
250+
* A value of -1 means no limit.
251+
*
252+
* @default 32
253+
*/
254+
'js.max.batch.size'?: string | number
255+
}
256+
257+
export type ConsumerGlobalAndTopicConfig = ConsumerGlobalConfig & ConsumerTopicConfig & JSConsumerConfig;
248258

249259
export interface ConsumerConstructorConfig extends ConsumerGlobalAndTopicConfig {
250260
kafkaJS?: ConsumerConfig;

0 commit comments

Comments
 (0)