From f4b68b136a47c5e9e17149d944756bd2a6f40e4b Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 29 Oct 2025 15:21:35 +0100 Subject: [PATCH 01/16] Configurable batch size --- CHANGELOG.md | 9 ++++++ MIGRATION.md | 6 ++-- lib/kafkajs/_consumer.js | 69 +++++++++++----------------------------- types/kafkajs.d.ts | 12 ++++++- 4 files changed, 42 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b5823789..60b57794 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +# confluent-kafka-javascript 1.6.1 + +v1.6.1 is a maintenance release. It is supported for all usage. + +### Enhancements + +1. Configurable batch size through the `js.max.batch.size` property (#389). + + # confluent-kafka-javascript 1.6.0 v1.6.0 is a feature release. It is supported for all usage. diff --git a/MIGRATION.md b/MIGRATION.md index 27ae6438..13cd5621 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -303,9 +303,9 @@ producerRun().then(consumerRun).catch(console.error); - The `heartbeat()` no longer needs to be called by the user in the `eachMessage/eachBatch` callback. Heartbeats are automatically managed by librdkafka. - The `partitionsConsumedConcurrently` is supported by both `eachMessage` and `eachBatch`. - - An API compatible version of `eachBatch` is available, but the batch size calculation is not - as per configured parameters, rather, a constant maximum size is configured internally. This is subject - to change. + - An API compatible version of `eachBatch` is available, maximum batch size + can be configured through the `js.max.batch.size` configuration property + and defaults to 32. The property `eachBatchAutoResolve` is supported. Within the `eachBatch` callback, use of `uncommittedOffsets` is unsupported, and within the returned batch, `offsetLag` and `offsetLagLow` are supported. diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index ada184f8..20d093fb 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -155,17 +155,6 @@ class Consumer { */ #messageCache = null; - /** - * The maximum size of the message cache. - * Will be adjusted dynamically. - */ - #messageCacheMaxSize = 1; - - /** - * Number of times we tried to increase the cache. - */ - #increaseCount = 0; - /** * Whether the user has enabled manual offset management (commits). */ @@ -182,6 +171,11 @@ class Consumer { */ #partitionCount = 0; + /** + * Maximum batch size passed in eachBatch calls. + */ + #maxBatchSize = 32; + /** * Whether worker termination has been scheduled. */ @@ -311,8 +305,6 @@ class Consumer { * consumed messages upto N from the internalClient, but the user has stale'd the cache * after consuming just k (< N) messages. We seek back to last consumed offset + 1. 
*/ this.#messageCache.clear(); - this.#messageCacheMaxSize = 1; - this.#increaseCount = 0; const clearPartitions = this.assignment(); const seeks = []; for (const topicPartition of clearPartitions) { @@ -691,6 +683,17 @@ class Consumer { this.#cacheExpirationTimeoutMs = this.#maxPollIntervalMs; rdKafkaConfig['max.poll.interval.ms'] = this.#maxPollIntervalMs * 2; + if (rdKafkaConfig['js.max.batch.size'] !== undefined) { + const maxBatchSize = +rdKafkaConfig['js.max.batch.size']; + if (!Number.isInteger(maxBatchSize) || (maxBatchSize <= 0 && maxBatchSize !== -1)) { + throw new error.KafkaJSError( + "'js.max.batch.size' must be a positive integer or -1 for unlimited batch size.", + { code: error.ErrorCodes.ERR__INVALID_ARG }); + } + this.#maxBatchSize = maxBatchSize; + delete rdKafkaConfig['js.max.batch.size']; + } + return rdKafkaConfig; } @@ -844,33 +847,6 @@ class Consumer { await this.commitOffsets(); } - /** - * Request a size increase. - * It increases the size by 2x, but only if the size is less than 1024, - * only if the size has been requested to be increased twice in a row. - * @private - */ - #increaseMaxSize() { - if (this.#messageCacheMaxSize === 1024) - return; - this.#increaseCount++; - if (this.#increaseCount <= 1) - return; - this.#messageCacheMaxSize = Math.min(this.#messageCacheMaxSize << 1, 1024); - this.#increaseCount = 0; - } - - /** - * Request a size decrease. - * It decreases the size to 80% of the last received size, with a minimum of 1. - * @param {number} recvdSize - the number of messages received in the last poll. - * @private - */ - #decreaseMaxSize(recvdSize) { - this.#messageCacheMaxSize = Math.max(Math.floor((recvdSize * 8) / 10), 1); - this.#increaseCount = 0; - } - /** * Converts a list of messages returned by node-rdkafka into a message that can be used by the eachBatch callback. * @param {import("../..").Message[]} messages - must not be empty. Must contain messages from the same topic and partition. @@ -1001,11 +977,6 @@ class Consumer { const res = takeFromCache(); this.#lastFetchClockNs = hrtime.bigint(); this.#maxPollIntervalRestart.resolve(); - if (messages.length === this.#messageCacheMaxSize) { - this.#increaseMaxSize(); - } else { - this.#decreaseMaxSize(messages.length); - } return res; } finally { this.#fetchInProgress.resolve(); @@ -1040,7 +1011,7 @@ class Consumer { } return this.#fetchAndResolveWith(() => this.#messageCache.next(), - this.#messageCacheMaxSize); + Number.MAX_SAFE_INTEGER); } /** @@ -1071,7 +1042,7 @@ class Consumer { return this.#fetchAndResolveWith(() => this.#messageCache.nextN(null, size), - this.#messageCacheMaxSize); + Number.MAX_SAFE_INTEGER); } /** @@ -1559,11 +1530,9 @@ class Consumer { async #runInternal(config) { this.#concurrency = config.partitionsConsumedConcurrently; const perMessageProcessor = config.eachMessage ? this.#messageProcessor : this.#batchProcessor; - /* TODO: make this dynamic, based on max batch size / size of last message seen. */ - const maxBatchSize = 32; const fetcher = config.eachMessage ? 
(savedIdx) => this.#consumeSingleCached(savedIdx) - : (savedIdx) => this.#consumeCachedN(savedIdx, maxBatchSize); + : (savedIdx) => this.#consumeCachedN(savedIdx, this.#maxBatchSize); this.#workers = []; await this.#lock.write(async () => { diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts index 6045a77e..9048fb4e 100644 --- a/types/kafkajs.d.ts +++ b/types/kafkajs.d.ts @@ -244,7 +244,17 @@ export interface ConsumerConfig { partitionAssignors?: PartitionAssignors[], } -export type ConsumerGlobalAndTopicConfig = ConsumerGlobalConfig & ConsumerTopicConfig; +export interface JSConsumerConfig { + /** + * Maximum batch size passed in eachBatch calls. + * A value of -1 means no limit. + * + * @default 32 + */ + 'js.max.batch.size'?: string | number +} + +export type ConsumerGlobalAndTopicConfig = ConsumerGlobalConfig & ConsumerTopicConfig & JSConsumerConfig; export interface ConsumerConstructorConfig extends ConsumerGlobalAndTopicConfig { kafkaJS?: ConsumerConfig; From 09c6197cd573525f272c6f54b73588900cf61edf Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Sun, 19 Oct 2025 11:20:27 +0200 Subject: [PATCH 02/16] Keep 1.5s of data in cache --- lib/kafkajs/_consumer.js | 73 ++++++++++++++++++- .../consumer/consumeMessages.spec.js | 24 ++---- 2 files changed, 74 insertions(+), 23 deletions(-) diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 20d093fb..c7ce6289 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -155,6 +155,12 @@ class Consumer { */ #messageCache = null; + /** + * The maximum size of the message cache. + * Will be adjusted dynamically. + */ + #messageCacheMaxSize = 1; + /** * Whether the user has enabled manual offset management (commits). */ @@ -175,6 +181,7 @@ class Consumer { * Maximum batch size passed in eachBatch calls. */ #maxBatchSize = 32; + #maxBatchesSize = 32; /** * Whether worker termination has been scheduled. @@ -191,6 +198,9 @@ class Consumer { */ #concurrency = 1; + + #runConfig = null; + /** * Promise that resolves together with last in progress fetch. * It's set to null when no fetch is in progress. @@ -228,7 +238,15 @@ class Consumer { /** * Last fetch real time clock in nanoseconds. */ - #lastFetchClockNs = 0; + #lastFetchClockNs = 0n; + /** + * Last number of messages fetched. + */ + #lastFetchedMessageCnt = 0n; + /** + * Last fetch concurrency used. + */ + #lastFetchedConcurrency = 0n; /** * List of pending operations to be executed after @@ -691,6 +709,10 @@ class Consumer { { code: error.ErrorCodes.ERR__INVALID_ARG }); } this.#maxBatchSize = maxBatchSize; + this.#maxBatchesSize = maxBatchSize; + if (maxBatchSize === -1) { + this.#messageCacheMaxSize = Number.MAX_SAFE_INTEGER; + } delete rdKafkaConfig['js.max.batch.size']; } @@ -933,6 +955,43 @@ class Consumer { return returnPayload; } + #updateMaxMessageCacheSize() { + if (this.#maxBatchSize === -1) { + return; + } + + const nowNs = hrtime.bigint(); + if (this.#lastFetchedMessageCnt > 0 && this.#lastFetchClockNs > 0n && + nowNs > this.#lastFetchClockNs) { + const consumptionDurationSeconds = Number(nowNs - this.#lastFetchClockNs) / 1e9; + const messagesPerSecondSingleWorker = this.#lastFetchedMessageCnt / this.#lastFetchedConcurrency / consumptionDurationSeconds; + // Keep enough messages in the cache for 1.5 seconds of consumption. + this.#messageCacheMaxSize = Math.round(1.5 * messagesPerSecondSingleWorker) * this.#concurrency; + const minCacheSize = this.#runConfig.eachBatch ? 
this.#maxBatchesSize : this.#concurrency; + if (this.#messageCacheMaxSize < minCacheSize) + this.#messageCacheMaxSize = minCacheSize; + else if (this.#messageCacheMaxSize > minCacheSize * 10) + this.#messageCacheMaxSize = minCacheSize * 10; + if (this.#messageCacheMaxSize < 1024) + this.#messageCacheMaxSize = 1024; + } + } + + #saveFetchStats(messages) { + this.#lastFetchClockNs = hrtime.bigint(); + const partitionsNum = new Map(); + for (const msg of messages) { + const key = partitionKey(msg); + partitionsNum.set(key, 1); + if (partitionsNum.size >= this.#concurrency) { + break; + } + } + this.#lastFetchedConcurrency = partitionsNum.size; + this.#lastFetchedMessageCnt = messages.length; + } + + async #fetchAndResolveWith(takeFromCache, size) { if (this.#fetchInProgress) { await this.#fetchInProgress; @@ -959,6 +1018,8 @@ class Consumer { const fetchResult = new DeferredPromise(); this.#logger.debug(`Attempting to fetch ${size} messages to the message cache`, this.#createConsumerBindingMessageMetadata()); + + this.#updateMaxMessageCacheSize(); this.#internalClient.consume(size, (err, messages) => fetchResult.resolve([err, messages])); @@ -975,7 +1036,7 @@ class Consumer { this.#messageCache.addMessages(messages); const res = takeFromCache(); - this.#lastFetchClockNs = hrtime.bigint(); + this.#saveFetchStats(messages); this.#maxPollIntervalRestart.resolve(); return res; } finally { @@ -1011,7 +1072,7 @@ class Consumer { } return this.#fetchAndResolveWith(() => this.#messageCache.next(), - Number.MAX_SAFE_INTEGER); + this.#messageCacheMaxSize); } /** @@ -1042,7 +1103,7 @@ class Consumer { return this.#fetchAndResolveWith(() => this.#messageCache.nextN(null, size), - Number.MAX_SAFE_INTEGER); + this.#messageCacheMaxSize); } /** @@ -1528,7 +1589,9 @@ class Consumer { * @private */ async #runInternal(config) { + this.#runConfig = config; this.#concurrency = config.partitionsConsumedConcurrently; + this.#maxBatchesSize = this.#maxBatchSize * this.#concurrency; const perMessageProcessor = config.eachMessage ? this.#messageProcessor : this.#batchProcessor; const fetcher = config.eachMessage ? 
(savedIdx) => this.#consumeSingleCached(savedIdx) @@ -1543,6 +1606,8 @@ class Consumer { this.#workerTerminationScheduled = new DeferredPromise(); this.#lastFetchClockNs = hrtime.bigint(); + this.#lastFetchedMessageCnt = 0; + this.#lastFetchedConcurrency = 0; if (this.#pendingOperations.length === 0) { const workersToSpawn = Math.max(1, Math.min(this.#concurrency, this.#partitionCount)); const cacheExpirationLoop = this.#cacheExpirationLoop(); diff --git a/test/promisified/consumer/consumeMessages.spec.js b/test/promisified/consumer/consumeMessages.spec.js index ca9204c6..2fc0ccd3 100644 --- a/test/promisified/consumer/consumeMessages.spec.js +++ b/test/promisified/consumer/consumeMessages.spec.js @@ -642,9 +642,6 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit let errors = false; let receivedMessages = 0; - const batchLengths = [1, 1, 2, - /* cache reset */ - 1, 1]; consumer.run({ partitionsConsumedConcurrently, eachBatchAutoResolve: true, @@ -652,10 +649,7 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit receivedMessages++; try { - expect(event.batch.messages.length) - .toEqual(batchLengths[receivedMessages - 1]); - - if (receivedMessages === 3) { + if (event.batch.messages.length >= 32) { expect(event.isStale()).toEqual(false); await sleep(7500); /* 7.5s 'processing' @@ -732,13 +726,7 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit let errors = false; let receivedMessages = 0; - const batchLengths = [/* first we reach batches of 32 message and fetches of 64 - * max poll interval exceeded happens on second - * 32 messages batch of the 64 msg fetch. */ - 1, 1, 2, 2, 4, 4, 8, 8, 16, 16, 32, 32, 32, 32, - /* max poll interval exceeded, 32 reprocessed + - * 1 new message. */ - 1, 1, 2, 2, 4, 4, 8, 8, 3]; + let firstLongBatchProcessing; consumer.run({ partitionsConsumedConcurrently, eachBatchAutoResolve: true, @@ -746,17 +734,15 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit receivedMessages++; try { - expect(event.batch.messages.length) - .toEqual(batchLengths[receivedMessages - 1]); - - if (receivedMessages === 13) { + if (!firstLongBatchProcessing && event.batch.messages.length >= 32) { expect(event.isStale()).toEqual(false); await sleep(6000); /* 6s 'processing' * cache clearance starts at 7000 */ expect(event.isStale()).toEqual(false); + firstLongBatchProcessing = receivedMessages; } - if ( receivedMessages === 14) { + if (firstLongBatchProcessing && receivedMessages === firstLongBatchProcessing + 1) { expect(event.isStale()).toEqual(false); await sleep(10000); /* 10s 'processing' From df6c83388f6569afb0571f110ff8c9974412fa77 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 29 Oct 2025 15:14:56 +0100 Subject: [PATCH 03/16] Remove minimum 1024 cache size for faster rebalances in case of long processing time --- lib/kafkajs/_consumer.js | 12 +++++--- .../consumer/consumeMessages.spec.js | 28 +++++++++++++------ 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index c7ce6289..77fa73a8 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -957,6 +957,9 @@ class Consumer { #updateMaxMessageCacheSize() { if (this.#maxBatchSize === -1) { + // In case of unbounded max batch size it returns all available messages + // for a partition in each batch. Cache is unbounded as well as + // it takes only one call to process each partition. 
return; } @@ -965,15 +968,16 @@ class Consumer { nowNs > this.#lastFetchClockNs) { const consumptionDurationSeconds = Number(nowNs - this.#lastFetchClockNs) / 1e9; const messagesPerSecondSingleWorker = this.#lastFetchedMessageCnt / this.#lastFetchedConcurrency / consumptionDurationSeconds; - // Keep enough messages in the cache for 1.5 seconds of consumption. + // Keep enough messages in the cache for 1.5 seconds of concurrent consumption. this.#messageCacheMaxSize = Math.round(1.5 * messagesPerSecondSingleWorker) * this.#concurrency; const minCacheSize = this.#runConfig.eachBatch ? this.#maxBatchesSize : this.#concurrency; if (this.#messageCacheMaxSize < minCacheSize) + // Keep at least one batch or one message per worker. + // It's possible less workers than requested were active in previous run. this.#messageCacheMaxSize = minCacheSize; else if (this.#messageCacheMaxSize > minCacheSize * 10) + // Keep at most 10 messages or batches per requested worker. this.#messageCacheMaxSize = minCacheSize * 10; - if (this.#messageCacheMaxSize < 1024) - this.#messageCacheMaxSize = 1024; } } @@ -1018,7 +1022,7 @@ class Consumer { const fetchResult = new DeferredPromise(); this.#logger.debug(`Attempting to fetch ${size} messages to the message cache`, this.#createConsumerBindingMessageMetadata()); - + this.#updateMaxMessageCacheSize(); this.#internalClient.consume(size, (err, messages) => fetchResult.resolve([err, messages])); diff --git a/test/promisified/consumer/consumeMessages.spec.js b/test/promisified/consumer/consumeMessages.spec.js index 2fc0ccd3..5c0df12d 100644 --- a/test/promisified/consumer/consumeMessages.spec.js +++ b/test/promisified/consumer/consumeMessages.spec.js @@ -412,6 +412,11 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit partitions: partitions, }); + // If you have a large consume time and consuming one message at a time, + // you need to have very small batch sizes to keep the concurrency up. + // It's to avoid having a too large cache and postponing the next fetch + // and so the rebalance too much. + const producer = createProducer({}, {'batch.num.messages': '1'}); await producer.connect(); await consumer.connect(); await consumer.subscribe({ topic: topicName }); @@ -448,6 +453,7 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit await producer.send({ topic: topicName, messages }); await maxConcurrentWorkersReached; expect(inProgressMaxValue).toBe(expectedMaxConcurrentWorkers); + await producer.disconnect(); }); it('consume GZIP messages', async () => { @@ -612,6 +618,7 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit let assigns = 0; let revokes = 0; let lost = 0; + let firstBatchProcessing; consumer = createConsumer({ groupId, maxWaitTimeInMs: 100, @@ -649,14 +656,14 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit receivedMessages++; try { - if (event.batch.messages.length >= 32) { - expect(event.isStale()).toEqual(false); - await sleep(7500); - /* 7.5s 'processing' - * doesn't exceed max poll interval. - * Cache reset is transparent */ - expect(event.isStale()).toEqual(false); - } + expect(event.isStale()).toEqual(false); + await sleep(7500); + /* 7.5s 'processing' + * doesn't exceed max poll interval. 
+ * Cache reset is transparent */ + expect(event.isStale()).toEqual(false); + if (firstBatchProcessing === undefined) + firstBatchProcessing = receivedMessages; } catch (e) { console.error(e); errors = true; @@ -680,6 +687,8 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit /* Triggers revocation */ await consumer.disconnect(); + expect(firstBatchProcessing).toBeDefined(); + expect(receivedMessages).toBeGreaterThan(firstBatchProcessing); /* First assignment */ expect(assigns).toEqual(1); /* Revocation on disconnect */ @@ -777,6 +786,9 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit /* Triggers revocation */ await consumer.disconnect(); + expect(firstLongBatchProcessing).toBeDefined(); + expect(receivedMessages).toBeGreaterThan(firstLongBatchProcessing); + /* First assignment + assignment after partitions lost */ expect(assigns).toEqual(2); /* Partitions lost + revocation on disconnect */ From cf45a3c523aadf15ec1705d73176b6e6cf456ef2 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 20 Oct 2025 13:50:32 +0200 Subject: [PATCH 04/16] Changed property name to `js.consumer.max.batch.size` --- CHANGELOG.md | 2 +- MIGRATION.md | 2 +- lib/kafkajs/_consumer.js | 8 ++++---- types/kafkajs.d.ts | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60b57794..3e031d8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ v1.6.1 is a maintenance release. It is supported for all usage. ### Enhancements -1. Configurable batch size through the `js.max.batch.size` property (#389). +1. Configurable batch size through the `js.consumer.max.batch.size` property (#389). # confluent-kafka-javascript 1.6.0 diff --git a/MIGRATION.md b/MIGRATION.md index 13cd5621..26ad9efd 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -304,7 +304,7 @@ producerRun().then(consumerRun).catch(console.error); Heartbeats are automatically managed by librdkafka. - The `partitionsConsumedConcurrently` is supported by both `eachMessage` and `eachBatch`. - An API compatible version of `eachBatch` is available, maximum batch size - can be configured through the `js.max.batch.size` configuration property + can be configured through the `js.consumer.max.batch.size` configuration property and defaults to 32. The property `eachBatchAutoResolve` is supported. 
Within the `eachBatch` callback, use of `uncommittedOffsets` is unsupported, diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 77fa73a8..6a1d8987 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -701,11 +701,11 @@ class Consumer { this.#cacheExpirationTimeoutMs = this.#maxPollIntervalMs; rdKafkaConfig['max.poll.interval.ms'] = this.#maxPollIntervalMs * 2; - if (rdKafkaConfig['js.max.batch.size'] !== undefined) { - const maxBatchSize = +rdKafkaConfig['js.max.batch.size']; + if (rdKafkaConfig['js.consumer.max.batch.size'] !== undefined) { + const maxBatchSize = +rdKafkaConfig['js.consumer.max.batch.size']; if (!Number.isInteger(maxBatchSize) || (maxBatchSize <= 0 && maxBatchSize !== -1)) { throw new error.KafkaJSError( - "'js.max.batch.size' must be a positive integer or -1 for unlimited batch size.", + "'js.consumer.max.batch.size' must be a positive integer or -1 for unlimited batch size.", { code: error.ErrorCodes.ERR__INVALID_ARG }); } this.#maxBatchSize = maxBatchSize; @@ -713,7 +713,7 @@ class Consumer { if (maxBatchSize === -1) { this.#messageCacheMaxSize = Number.MAX_SAFE_INTEGER; } - delete rdKafkaConfig['js.max.batch.size']; + delete rdKafkaConfig['js.consumer.max.batch.size']; } return rdKafkaConfig; diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts index 9048fb4e..a9bb92ac 100644 --- a/types/kafkajs.d.ts +++ b/types/kafkajs.d.ts @@ -251,7 +251,7 @@ export interface JSConsumerConfig { * * @default 32 */ - 'js.max.batch.size'?: string | number + 'js.consumer.max.batch.size'?: string | number } export type ConsumerGlobalAndTopicConfig = ConsumerGlobalConfig & ConsumerTopicConfig & JSConsumerConfig; From 28edb19ecbb758087e07fcc5623cb3cd6543d99c Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 22 Oct 2025 17:21:08 +0200 Subject: [PATCH 05/16] Use only 1.5 seconds cache size estimation aligned at batch size * concurrency --- lib/kafkajs/_consumer.js | 80 +++---------------- .../consumer/consumeMessages.spec.js | 3 +- .../consumer/consumerCacheTests.spec.js | 26 +++--- 3 files changed, 26 insertions(+), 83 deletions(-) diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 6a1d8987..1d067b09 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -197,10 +197,6 @@ class Consumer { * The number of partitions to consume concurrently as set by the user, or 1. */ #concurrency = 1; - - - #runConfig = null; - /** * Promise that resolves together with last in progress fetch. * It's set to null when no fetch is in progress. @@ -958,7 +954,7 @@ class Consumer { #updateMaxMessageCacheSize() { if (this.#maxBatchSize === -1) { // In case of unbounded max batch size it returns all available messages - // for a partition in each batch. Cache is unbounded as well as + // for a partition in each batch. Cache is unbounded given that // it takes only one call to process each partition. return; } @@ -969,15 +965,11 @@ class Consumer { const consumptionDurationSeconds = Number(nowNs - this.#lastFetchClockNs) / 1e9; const messagesPerSecondSingleWorker = this.#lastFetchedMessageCnt / this.#lastFetchedConcurrency / consumptionDurationSeconds; // Keep enough messages in the cache for 1.5 seconds of concurrent consumption. - this.#messageCacheMaxSize = Math.round(1.5 * messagesPerSecondSingleWorker) * this.#concurrency; - const minCacheSize = this.#runConfig.eachBatch ? this.#maxBatchesSize : this.#concurrency; - if (this.#messageCacheMaxSize < minCacheSize) - // Keep at least one batch or one message per worker. 
- // It's possible less workers than requested were active in previous run. - this.#messageCacheMaxSize = minCacheSize; - else if (this.#messageCacheMaxSize > minCacheSize * 10) - // Keep at most 10 messages or batches per requested worker. - this.#messageCacheMaxSize = minCacheSize * 10; + // Round up to the nearest multiple of `#maxBatchesSize`. + this.#messageCacheMaxSize = Math.ceil( + Math.round(1.5 * messagesPerSecondSingleWorker) * this.#concurrency + / this.#maxBatchesSize + ) * this.#maxBatchesSize; } } @@ -1110,24 +1102,6 @@ class Consumer { this.#messageCacheMaxSize); } - /** - * Consumes n messages from the internal consumer. - * @returns {Promise} A promise that resolves to a list of messages. The size of this list is guaranteed to be less than or equal to n. - * @note this method cannot be used in conjunction with #consumeSingleCached. - * @private - */ - async #consumeN(n) { - return new Promise((resolve, reject) => { - this.#internalClient.consume(n, (err, messages) => { - if (err) { - reject(createKafkaJsErrorFromLibRdKafkaError(err)); - return; - } - resolve(messages); - }); - }); - } - /** * Flattens a list of topics with partitions into a list of topic, partition. * @param {Array<({topic: string, partitions: Array}|{topic: string, partition: number})>} topics @@ -1593,16 +1567,18 @@ class Consumer { * @private */ async #runInternal(config) { - this.#runConfig = config; - this.#concurrency = config.partitionsConsumedConcurrently; - this.#maxBatchesSize = this.#maxBatchSize * this.#concurrency; const perMessageProcessor = config.eachMessage ? this.#messageProcessor : this.#batchProcessor; const fetcher = config.eachMessage ? (savedIdx) => this.#consumeSingleCached(savedIdx) : (savedIdx) => this.#consumeCachedN(savedIdx, this.#maxBatchSize); - this.#workers = []; await this.#lock.write(async () => { + this.#workers = []; + this.#concurrency = config.partitionsConsumedConcurrently; + this.#maxBatchesSize = ( + config.eachBatch && this.#maxBatchSize > 0 ? + this.#maxBatchSize : + 1) * this.#concurrency; while (!this.#disconnectStarted) { if (this.#maxPollIntervalRestart.resolved) @@ -1639,38 +1615,6 @@ class Consumer { this.#maxPollIntervalRestart.resolve(); } - /** - * Consumes a single message from the consumer within the given timeout. - * THIS METHOD IS NOT IMPLEMENTED. - * @note This method cannot be used with run(). Either that, or this must be used. - * - * @param {any} args - * @param {number} args.timeout - the timeout in milliseconds, defaults to 1000. - * @returns {import("../..").Message|null} a message, or null if the timeout was reached. - * @private - */ - async consume({ timeout } = { timeout: 1000 }) { - if (this.#state !== ConsumerState.CONNECTED) { - throw new error.KafkaJSError('consume can only be called while connected.', { code: error.ErrorCodes.ERR__STATE }); - } - - if (this.#running) { - throw new error.KafkaJSError('consume() and run() cannot be used together.', { code: error.ErrorCodes.ERR__CONFLICT }); - } - - this.#internalClient.setDefaultConsumeTimeout(timeout); - let m = null; - - try { - const ms = await this.#consumeN(1); - m = ms[0]; - } finally { - this.#internalClient.setDefaultConsumeTimeout(undefined); - } - - throw new error.KafkaJSError('consume() is not implemented.' 
+ m, { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED }); - } - async #commitOffsetsUntilNoStateErr(offsetsToCommit) { let err = { code: error.ErrorCodes.ERR_NO_ERROR }; do { diff --git a/test/promisified/consumer/consumeMessages.spec.js b/test/promisified/consumer/consumeMessages.spec.js index 5c0df12d..8ef27e04 100644 --- a/test/promisified/consumer/consumeMessages.spec.js +++ b/test/promisified/consumer/consumeMessages.spec.js @@ -434,9 +434,8 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit inProgressMaxValue = Math.max(inProgress, inProgressMaxValue); if (inProgressMaxValue >= expectedMaxConcurrentWorkers) { maxConcurrentWorkersReached.resolve(); - } else if (messagesConsumed.length > 2048) { - await sleep(1000); } + await sleep(100); inProgress--; }, }); diff --git a/test/promisified/consumer/consumerCacheTests.spec.js b/test/promisified/consumer/consumerCacheTests.spec.js index f923c65f..5ba94d15 100644 --- a/test/promisified/consumer/consumerCacheTests.spec.js +++ b/test/promisified/consumer/consumerCacheTests.spec.js @@ -90,6 +90,7 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon }); it('is cleared on seek', async () => { + const producer = createProducer({}, {'batch.num.messages': '1'}); await consumer.connect(); await producer.connect(); await consumer.subscribe({ topic: topicName }); @@ -134,6 +135,8 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon expect(messagesConsumed.filter(m => m.partition === 1).map(m => m.message.offset)).toEqual(Array(1024 * 3).fill().map((_, i) => `${i}`)); // partition 2 expect(messagesConsumed.filter(m => m.partition === 2).map(m => m.message.offset)).toEqual(Array(1024 * 3).fill().map((_, i) => `${i}`)); + + await producer.disconnect(); }); it('is cleared before rebalance', async () => { @@ -142,6 +145,8 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon * the consumers are created with the same groupId, we create them here. * TODO: verify correctness of theory. It's conjecture... which solves flakiness. */ let groupId = `consumer-group-id-${secureRandom()}`; + const multiplier = 9; + const numMessages = 16 * multiplier; consumer = createConsumer({ groupId, maxWaitTimeInMs: 100, @@ -156,6 +161,7 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon autoCommit: isAutoCommit, clientId: "consumer2", }); + const producer = createProducer({}, {'batch.num.messages': '1'}); await consumer.connect(); await producer.connect(); @@ -164,7 +170,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon const messagesConsumed = []; const messagesConsumedConsumer1 = []; const messagesConsumedConsumer2 = []; - let consumer2ConsumeRunning = false; consumer.run({ partitionsConsumedConcurrently, @@ -176,18 +181,13 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon { topic: event.topic, partition: event.partition, offset: Number(event.message.offset) + 1 }, ]); - /* Until the second consumer joins, consume messages slowly so as to not consume them all - * before the rebalance triggers. 
*/ - if (messagesConsumed.length > 1024 && !consumer2ConsumeRunning) { - await sleep(10); - } + await sleep(100); } }); /* Evenly distribute 1024*9 messages across 3 partitions */ let i = 0; - const multiplier = 9; - const messages = Array(1024 * multiplier) + const messages = Array(numMessages) .fill() .map(() => { const value = secureRandom(); @@ -198,7 +198,7 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon // Wait for the messages - some of them, before starting the // second consumer. - await waitForMessages(messagesConsumed, { number: 1024 }); + await waitForMessages(messagesConsumed, { number: 16 }); await consumer2.connect(); await consumer2.subscribe({ topic: topicName }); @@ -210,23 +210,23 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon }); await waitFor(() => consumer2.assignment().length > 0, () => null); - consumer2ConsumeRunning = true; /* Now that both consumers have joined, wait for all msgs to be consumed */ - await waitForMessages(messagesConsumed, { number: 1024 * multiplier }); + await waitForMessages(messagesConsumed, { number: numMessages }); /* No extra messages should be consumed. */ await sleep(1000); - expect(messagesConsumed.length).toEqual(1024 * multiplier); + expect(messagesConsumed.length).toEqual(numMessages); /* Check if all messages were consumed. */ expect(messagesConsumed.map(event => (+event.message.offset)).sort((a, b) => a - b)) - .toEqual(Array(1024 * multiplier).fill().map((_, i) => Math.floor(i / 3))); + .toEqual(Array(numMessages).fill().map((_, i) => Math.floor(i / 3))); /* Consumer2 should have consumed at least one message. */ expect(messagesConsumedConsumer2.length).toBeGreaterThan(0); await consumer2.disconnect(); + await producer.disconnect(); }, 60000); it('does not hold up polling for non-message events', async () => { From e34123200b51ea11d47760e2b0eb68be33b83487 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Thu, 23 Oct 2025 11:06:23 +0200 Subject: [PATCH 06/16] Fix for at-least-once guarantee not ensured in case a seek happens on one partition and there are messages being fetched for other partitions --- lib/kafkajs/_consumer.js | 40 +++++++++++++++++++--------------- lib/kafkajs/_consumer_cache.js | 35 ++++++++++++++++++++++++----- 2 files changed, 53 insertions(+), 22 deletions(-) diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 1d067b09..43d5d13a 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -1402,21 +1402,21 @@ class Consumer { return ppc; } - #discardMessages(ms, ppc) { - if (ms) { - let m = ms[0]; - if (m.constructor === Array) { - m = m[0]; - } - ppc = ms[1]; - if (m && !this.#lastConsumedOffsets.has(ppc.key)) { + #returnMessages(ms) { + let m = ms[0]; + // ppc could have been change we must return it as well. + let ppc = ms[1]; + const messagesToReturn = m.constructor === Array ? m : [m]; + const firstMessage = messagesToReturn[0]; + if (firstMessage && !this.#lastConsumedOffsets.has(ppc.key)) { this.#lastConsumedOffsets.set(ppc.key, { - topic: m.topic, - partition: m.partition, - offset: m.offset - 1, + topic: firstMessage.topic, + partition: firstMessage.partition, + offset: firstMessage.offset - 1, }); - } } + + this.#messageCache.returnMessages(messagesToReturn); return ppc; } @@ -1463,11 +1463,15 @@ class Consumer { continue; if (this.#pendingOperations.length) { - ppc = this.#discardMessages(ms, ppc); - break; + /* + * Don't process messages anymore, execute the operations first. 
+ * Return the messages to the cache that will be cleared if needed. + * `ppc` could have been changed, we must return it as well. + */ + ppc = this.#returnMessages(ms); + } else { + ppc = await perMessageProcessor(ms, config); } - - ppc = await perMessageProcessor(ms, config); } catch (e) { /* Since this error cannot be exposed to the user in the current situation, just log and retry. * This is due to restartOnFailure being set to always true. */ @@ -1548,7 +1552,9 @@ class Consumer { * @private */ async #executePendingOperations() { - for (const op of this.#pendingOperations) { + // Execute all pending operations, they could add more operations. + while (this.#pendingOperations.length > 0) { + const op = this.#pendingOperations.shift(); await op(); } this.#pendingOperations = []; diff --git a/lib/kafkajs/_consumer_cache.js b/lib/kafkajs/_consumer_cache.js index 9047688e..33a2e81c 100644 --- a/lib/kafkajs/_consumer_cache.js +++ b/lib/kafkajs/_consumer_cache.js @@ -28,12 +28,19 @@ class PerPartitionMessageCache { } /** - * Adds a message to the cache. + * Adds a message to the cache as last one. */ - _add(message) { + _addLast(message) { this.#cache.addLast(message); } + /** + * Adds a message to the cache as first one. + */ + _addFirst(message) { + this.#cache.addFirst(message); + } + get key() { return this.#key; } @@ -126,7 +133,7 @@ class MessageCache { * * @param {Object} message - the message to add to the cache. */ - #add(message) { + #add(message, head = false) { const key = partitionKey(message); let cache = this.#tpToPpc.get(key); if (!cache) { @@ -135,7 +142,11 @@ class MessageCache { cache._node = this.#availablePartitions.addLast(cache); this.notifyAvailablePartitions(); } - cache._add(message); + if (head) { + cache._addFirst(message); + } else { + cache._addLast(message); + } } get availableSize() { @@ -183,7 +194,21 @@ class MessageCache { */ addMessages(messages) { for (const message of messages) - this.#add(message); + this.#add(message, false); + this.#size += messages.length; + } + + /** + * Return messages to the cache, to be read again. + * + * @param {Array} messages - the messages to return to the cache. + */ + returnMessages(messages) { + let i = messages.length - 1; + while (i >= 0) { + this.#add(messages[i], true); + i--; + } this.#size += messages.length; } From 82a4867418dbc71b7eb777e8b8f6630c9ca26ab2 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 29 Oct 2025 10:28:43 +0100 Subject: [PATCH 07/16] Configurable cache size in milliseconds --- CHANGELOG.md | 4 +++- MIGRATION.md | 4 +++- lib/kafkajs/_consumer.js | 26 ++++++++++++++++++++++---- types/kafkajs.d.ts | 9 ++++++++- 4 files changed, 36 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e031d8e..19b33882 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,9 @@ v1.6.1 is a maintenance release. It is supported for all usage. ### Enhancements -1. Configurable batch size through the `js.consumer.max.batch.size` property (#389). +1. Configurable batch size through the `js.consumer.max.batch.size` property + and cache size through the `js.consumer.max.cache.size.per.worker.ms` + property (#389). # confluent-kafka-javascript 1.6.0 diff --git a/MIGRATION.md b/MIGRATION.md index 26ad9efd..1945ae5d 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -305,7 +305,9 @@ producerRun().then(consumerRun).catch(console.error); - The `partitionsConsumedConcurrently` is supported by both `eachMessage` and `eachBatch`. 
- An API compatible version of `eachBatch` is available, maximum batch size can be configured through the `js.consumer.max.batch.size` configuration property - and defaults to 32. + and defaults to 32. `js.consumer.max.cache.size.per.worker.ms` allows to + configure the cache size estimated based on consumption rate and defaults + to 1.5 seconds. The property `eachBatchAutoResolve` is supported. Within the `eachBatch` callback, use of `uncommittedOffsets` is unsupported, and within the returned batch, `offsetLag` and `offsetLagLow` are supported. diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 43d5d13a..1f316847 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -183,6 +183,14 @@ class Consumer { #maxBatchSize = 32; #maxBatchesSize = 32; + /** + * Maximum cache size in milliseconds per worker. + * Based on the consumer rate estimated through the eachMessage/eachBatch calls. + * + * @default 1500 + */ + #maxCacheSizePerWorkerMs = 1500; + /** * Whether worker termination has been scheduled. */ @@ -711,6 +719,16 @@ class Consumer { } delete rdKafkaConfig['js.consumer.max.batch.size']; } + if (rdKafkaConfig['js.consumer.max.cache.size.per.worker.ms'] !== undefined) { + const maxCacheSizePerWorkerMs = +rdKafkaConfig['js.consumer.max.cache.size.per.worker.ms']; + if (!Number.isInteger(maxCacheSizePerWorkerMs) || (maxCacheSizePerWorkerMs <= 0)) { + throw new error.KafkaJSError( + "'js.consumer.max.cache.size.per.worker.ms' must be a positive integer.", + { code: error.ErrorCodes.ERR__INVALID_ARG }); + } + this.#maxCacheSizePerWorkerMs = maxCacheSizePerWorkerMs; + delete rdKafkaConfig['js.consumer.max.cache.size.per.worker.ms']; + } return rdKafkaConfig; } @@ -962,12 +980,12 @@ class Consumer { const nowNs = hrtime.bigint(); if (this.#lastFetchedMessageCnt > 0 && this.#lastFetchClockNs > 0n && nowNs > this.#lastFetchClockNs) { - const consumptionDurationSeconds = Number(nowNs - this.#lastFetchClockNs) / 1e9; - const messagesPerSecondSingleWorker = this.#lastFetchedMessageCnt / this.#lastFetchedConcurrency / consumptionDurationSeconds; - // Keep enough messages in the cache for 1.5 seconds of concurrent consumption. + const consumptionDurationMilliseconds = Number(nowNs - this.#lastFetchClockNs) / 1e6; + const messagesPerMillisecondSingleWorker = this.#lastFetchedMessageCnt / this.#lastFetchedConcurrency / consumptionDurationMilliseconds; + // Keep enough messages in the cache for this.#maxCacheSizePerWorkerMs of concurrent consumption by all workers. // Round up to the nearest multiple of `#maxBatchesSize`. this.#messageCacheMaxSize = Math.ceil( - Math.round(1.5 * messagesPerSecondSingleWorker) * this.#concurrency + Math.round(this.#maxCacheSizePerWorkerMs * messagesPerMillisecondSingleWorker) * this.#concurrency / this.#maxBatchesSize ) * this.#maxBatchesSize; } diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts index a9bb92ac..c18bbc84 100644 --- a/types/kafkajs.d.ts +++ b/types/kafkajs.d.ts @@ -251,7 +251,14 @@ export interface JSConsumerConfig { * * @default 32 */ - 'js.consumer.max.batch.size'?: string | number + 'js.consumer.max.batch.size'?: string | number, + /** + * Maximum cache size per worker in milliseconds based on the + * consume rate estimated through the eachMessage/eachBatch calls. 
+ * + * @default 1500 + */ + 'js.consumer.max.cache.size.per.worker.ms'?: string | number } export type ConsumerGlobalAndTopicConfig = ConsumerGlobalConfig & ConsumerTopicConfig & JSConsumerConfig; From 03369f426e7ede47f5b76be6e679f0c9b72aae4a Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 29 Oct 2025 13:52:18 +0100 Subject: [PATCH 08/16] Add worker identifier to the payload for better debugging --- lib/kafkajs/_consumer.js | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 1f316847..6dad154f 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -800,7 +800,7 @@ class Consumer { * @returns {import("../../types/kafkajs").EachMessagePayload} * @private */ - #createPayload(message) { + #createPayload(message, worker) { let key = message.key; if (typeof key === 'string') { key = Buffer.from(key); @@ -824,6 +824,7 @@ class Consumer { }, heartbeat: async () => { /* no op */ }, pause: this.pause.bind(this, [{ topic: message.topic, partitions: [message.partition] }]), + _worker: worker, }; } @@ -889,7 +890,7 @@ class Consumer { * @returns {import("../../types/kafkajs").EachBatchPayload} * @private */ - #createBatchPayload(messages) { + #createBatchPayload(messages, worker) { const topic = messages[0].topic; const partition = messages[0].partition; let watermarkOffsets = {}; @@ -954,6 +955,7 @@ class Consumer { _stale: false, _seeked: false, _lastResolvedOffset: { offset: -1, leaderEpoch: -1 }, + _worker: worker, heartbeat: async () => { /* no op */ }, pause: this.pause.bind(this, [{ topic, partitions: [partition] }]), commitOffsetsIfNecessary: this.#eachBatchPayload_commitOffsetsIfNecessary.bind(this), @@ -1290,12 +1292,12 @@ class Consumer { * @returns {Promise} The cache index of the message that was processed. * @private */ - async #messageProcessor(m, config) { + async #messageProcessor(m, config, worker) { let ppc; [m, ppc] = m; let key = partitionKey(m); let eachMessageProcessed = false; - const payload = this.#createPayload(m); + const payload = this.#createPayload(m, worker); try { this.#lastConsumedOffsets.set(key, m); @@ -1351,11 +1353,11 @@ class Consumer { * the passed batch. * @private */ - async #batchProcessor(ms, config) { + async #batchProcessor(ms, config, worker) { let ppc; [ms, ppc] = ms; const key = partitionKey(ms[0]); - const payload = this.#createBatchPayload(ms); + const payload = this.#createBatchPayload(ms, worker); this.#topicPartitionToBatchPayload.set(key, payload); @@ -1474,6 +1476,7 @@ class Consumer { */ async #worker(config, perMessageProcessor, fetcher) { let ppc = null; + let workerId = Math.random().toString().slice(2); while (!this.#workerTerminationScheduled.resolved) { try { const ms = await fetcher(ppc); @@ -1488,7 +1491,7 @@ class Consumer { */ ppc = this.#returnMessages(ms); } else { - ppc = await perMessageProcessor(ms, config); + ppc = await perMessageProcessor(ms, config, workerId); } } catch (e) { /* Since this error cannot be exposed to the user in the current situation, just log and retry. 
From 940bc20b9864c2225fca16615fcc1bece6a901eb Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 20 Oct 2025 12:26:40 +0200 Subject: [PATCH 09/16] Fix for test flakyness --- test/promisified/admin/fetch_offsets.spec.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/promisified/admin/fetch_offsets.spec.js b/test/promisified/admin/fetch_offsets.spec.js index 08e5a81a..c6412530 100644 --- a/test/promisified/admin/fetch_offsets.spec.js +++ b/test/promisified/admin/fetch_offsets.spec.js @@ -132,8 +132,7 @@ describe("fetchOffset function", () => { await consumer.run({ eachMessage: async ({ topic, partition, message }) => { - messagesConsumed.push(message); // Populate messagesConsumed - if (messagesConsumed.length === 5) { + if (messagesConsumed.length === 4) { await consumer.commitOffsets([ { topic, @@ -142,6 +141,7 @@ describe("fetchOffset function", () => { }, ]); } + messagesConsumed.push(message); // Populate messagesConsumed }, }); From bd0537f6f89680801ca1d926607d907f63f3addd Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Thu, 23 Oct 2025 11:07:57 +0200 Subject: [PATCH 10/16] Make `is cleared before rebalance` less flaky in case of increased time before first assignment --- lib/kafkajs/_consumer.js | 4 ++-- .../consumer/consumerCacheTests.spec.js | 14 ++++++-------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 6dad154f..c81d494b 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -1435,7 +1435,7 @@ class Consumer { offset: firstMessage.offset - 1, }); } - + this.#messageCache.returnMessages(messagesToReturn); return ppc; } @@ -1484,7 +1484,7 @@ class Consumer { continue; if (this.#pendingOperations.length) { - /* + /* * Don't process messages anymore, execute the operations first. * Return the messages to the cache that will be cleared if needed. * `ppc` could have been changed, we must return it as well. diff --git a/test/promisified/consumer/consumerCacheTests.spec.js b/test/promisified/consumer/consumerCacheTests.spec.js index 5ba94d15..f62bdb4e 100644 --- a/test/promisified/consumer/consumerCacheTests.spec.js +++ b/test/promisified/consumer/consumerCacheTests.spec.js @@ -90,7 +90,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon }); it('is cleared on seek', async () => { - const producer = createProducer({}, {'batch.num.messages': '1'}); await consumer.connect(); await producer.connect(); await consumer.subscribe({ topic: topicName }); @@ -135,8 +134,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon expect(messagesConsumed.filter(m => m.partition === 1).map(m => m.message.offset)).toEqual(Array(1024 * 3).fill().map((_, i) => `${i}`)); // partition 2 expect(messagesConsumed.filter(m => m.partition === 2).map(m => m.message.offset)).toEqual(Array(1024 * 3).fill().map((_, i) => `${i}`)); - - await producer.disconnect(); }); it('is cleared before rebalance', async () => { @@ -145,7 +142,7 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon * the consumers are created with the same groupId, we create them here. * TODO: verify correctness of theory. It's conjecture... which solves flakiness. 
*/ let groupId = `consumer-group-id-${secureRandom()}`; - const multiplier = 9; + const multiplier = 18; const numMessages = 16 * multiplier; consumer = createConsumer({ groupId, @@ -161,7 +158,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon autoCommit: isAutoCommit, clientId: "consumer2", }); - const producer = createProducer({}, {'batch.num.messages': '1'}); await consumer.connect(); await producer.connect(); @@ -181,11 +177,14 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon { topic: event.topic, partition: event.partition, offset: Number(event.message.offset) + 1 }, ]); - await sleep(100); + // Simulate some processing time so we don't poll all messages + // and put them in the cache before consumer2 joins. + if (messagesConsumedConsumer2.length === 0) + await sleep(100); } }); - /* Evenly distribute 1024*9 messages across 3 partitions */ + /* Evenly distribute numMessages messages across 3 partitions */ let i = 0; const messages = Array(numMessages) .fill() @@ -226,7 +225,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon expect(messagesConsumedConsumer2.length).toBeGreaterThan(0); await consumer2.disconnect(); - await producer.disconnect(); }, 60000); it('does not hold up polling for non-message events', async () => { From dc21a5490f25b38d861d136eabaf470ec4b85e6e Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 29 Oct 2025 09:33:21 +0100 Subject: [PATCH 11/16] Reduce flakyness of 'times out if messages are pending' --- test/promisified/producer/flush.spec.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test/promisified/producer/flush.spec.js b/test/promisified/producer/flush.spec.js index 64a15b9f..e8dec677 100644 --- a/test/promisified/producer/flush.spec.js +++ b/test/promisified/producer/flush.spec.js @@ -71,6 +71,10 @@ describe('Producer > Flush', () => { it('times out if messages are pending', async () => { + const producer = createProducer({ + }, { + 'batch.num.messages': 1, + }); await producer.connect(); let messageSent = false; @@ -82,6 +86,7 @@ describe('Producer > Flush', () => { /* Small timeout */ await expect(producer.flush({ timeout: 1 })).rejects.toThrow(Kafka.KafkaJSTimeout); expect(messageSent).toBe(false); + await producer.disconnect(); } ); From 9ae3f5934a7b300b9f04197e4d36a335777c43ad Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Mon, 20 Oct 2025 12:43:58 +0200 Subject: [PATCH 12/16] 1.6.1-alpha.1 --- ci/update-version.js | 2 +- lib/util.js | 2 +- package-lock.json | 6 +++--- package.json | 2 +- schemaregistry/package.json | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ci/update-version.js b/ci/update-version.js index 531ed2ef..796261f9 100644 --- a/ci/update-version.js +++ b/ci/update-version.js @@ -89,7 +89,7 @@ function getPackageVersion(tag, branch) { // publish with a -devel suffix for EA and RC releases. 
if (tag.prerelease.length > 0) { - baseVersion += '-' + tag.prerelease.join('-'); + baseVersion += '-' + tag.prerelease.join('.'); } console.log(`Package version is "${baseVersion}"`); diff --git a/lib/util.js b/lib/util.js index ae539b46..389c3411 100644 --- a/lib/util.js +++ b/lib/util.js @@ -52,4 +52,4 @@ util.dictToStringList = function (mapOrObject) { return list; }; -util.bindingVersion = '1.6.0'; +util.bindingVersion = '1.6.1-alpha.1'; diff --git a/package-lock.json b/package-lock.json index 45f6f0a3..e930a811 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@confluentinc/kafka-javascript", - "version": "1.6.0", + "version": "1.6.1-alpha.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@confluentinc/kafka-javascript", - "version": "1.6.0", + "version": "1.6.1-alpha.1", "hasInstallScript": true, "license": "MIT", "workspaces": [ @@ -10077,7 +10077,7 @@ }, "schemaregistry": { "name": "@confluentinc/schemaregistry", - "version": "1.6.0", + "version": "1.6.1-alpha.1", "license": "MIT", "dependencies": { "@aws-sdk/client-kms": "^3.637.0", diff --git a/package.json b/package.json index 4d36d2da..0e0e27c3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@confluentinc/kafka-javascript", - "version": "1.6.0", + "version": "1.6.1-alpha.1", "description": "Node.js bindings for librdkafka", "librdkafka": "2.12.0", "librdkafka_win": "2.12.0", diff --git a/schemaregistry/package.json b/schemaregistry/package.json index fe058a56..d6218c6d 100644 --- a/schemaregistry/package.json +++ b/schemaregistry/package.json @@ -1,6 +1,6 @@ { "name": "@confluentinc/schemaregistry", - "version": "1.6.0", + "version": "1.6.1-alpha.1", "description": "Node.js client for Confluent Schema Registry", "main": "dist/index.js", "types": "dist/index.d.ts", From d30ea6d64193a3dbe0c7c79828fa73522142d136 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 29 Oct 2025 16:40:29 +0100 Subject: [PATCH 13/16] fixup CHANGELOG and spaces --- CHANGELOG.md | 2 ++ lib/kafkajs/_consumer.js | 1 + 2 files changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 19b33882..a6b7bb9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ v1.6.1 is a maintenance release. It is supported for all usage. 1. Configurable batch size through the `js.consumer.max.batch.size` property and cache size through the `js.consumer.max.cache.size.per.worker.ms` property (#389). +2. Fix for at-least-once guarantee not ensured in case a seek happens on one +partition and there are messages being fetched about other partitions (#389). # confluent-kafka-javascript 1.6.0 diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index c81d494b..d9661ffb 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -205,6 +205,7 @@ class Consumer { * The number of partitions to consume concurrently as set by the user, or 1. */ #concurrency = 1; + /** * Promise that resolves together with last in progress fetch. * It's set to null when no fetch is in progress. 
From 5a7b93dde696ec9fa26625a0e94df1779f6fa4e2 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 29 Oct 2025 16:42:16 +0100 Subject: [PATCH 14/16] 1.6.1 --- lib/util.js | 2 +- package-lock.json | 6 +++--- package.json | 2 +- schemaregistry/package.json | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/util.js b/lib/util.js index 389c3411..d2dcc9e5 100644 --- a/lib/util.js +++ b/lib/util.js @@ -52,4 +52,4 @@ util.dictToStringList = function (mapOrObject) { return list; }; -util.bindingVersion = '1.6.1-alpha.1'; +util.bindingVersion = '1.6.1'; diff --git a/package-lock.json b/package-lock.json index e930a811..fe9eb280 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@confluentinc/kafka-javascript", - "version": "1.6.1-alpha.1", + "version": "1.6.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@confluentinc/kafka-javascript", - "version": "1.6.1-alpha.1", + "version": "1.6.1", "hasInstallScript": true, "license": "MIT", "workspaces": [ @@ -10077,7 +10077,7 @@ }, "schemaregistry": { "name": "@confluentinc/schemaregistry", - "version": "1.6.1-alpha.1", + "version": "1.6.1", "license": "MIT", "dependencies": { "@aws-sdk/client-kms": "^3.637.0", diff --git a/package.json b/package.json index 0e0e27c3..bebafa9c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@confluentinc/kafka-javascript", - "version": "1.6.1-alpha.1", + "version": "1.6.1", "description": "Node.js bindings for librdkafka", "librdkafka": "2.12.0", "librdkafka_win": "2.12.0", diff --git a/schemaregistry/package.json b/schemaregistry/package.json index d6218c6d..b15a5c8f 100644 --- a/schemaregistry/package.json +++ b/schemaregistry/package.json @@ -1,6 +1,6 @@ { "name": "@confluentinc/schemaregistry", - "version": "1.6.1-alpha.1", + "version": "1.6.1", "description": "Node.js client for Confluent Schema Registry", "main": "dist/index.js", "types": "dist/index.d.ts", From 06e6bcb6a942f3b3a2293c54b7ea27ea7e5392d9 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 29 Oct 2025 16:48:51 +0100 Subject: [PATCH 15/16] FIXME about KIP-848 autocommit issue --- test/promisified/consumer/consumerCacheTests.spec.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/promisified/consumer/consumerCacheTests.spec.js b/test/promisified/consumer/consumerCacheTests.spec.js index f62bdb4e..e5bf1950 100644 --- a/test/promisified/consumer/consumerCacheTests.spec.js +++ b/test/promisified/consumer/consumerCacheTests.spec.js @@ -232,6 +232,10 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon * non-message events like rebalances, etc. Internally, this is to make sure that * we call poll() at least once within max.poll.interval.ms even if the cache is * still full. This depends on us expiring the cache on time. */ + + /* FIXME: this test can be flaky when using KIP-848 protocol and + * auto-commit. To check if there's something to fix about that case. + */ const impatientConsumer = createConsumer({ groupId, maxWaitTimeInMs: 100, From 697a98f4b6cea206fafec8697f070fdc593865ae Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 29 Oct 2025 16:58:50 +0100 Subject: [PATCH 16/16] Updated PR in changelog entry --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a6b7bb9f..f6e5830a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,9 +6,9 @@ v1.6.1 is a maintenance release. It is supported for all usage. 1. 
Configurable batch size through the `js.consumer.max.batch.size` property
   and cache size through the `js.consumer.max.cache.size.per.worker.ms`
-  property (#389).
+  property (#393).
 2. Fix for at-least-once guarantee not ensured in case a seek happens on one
-partition and there are messages being fetched about other partitions (#389).
+partition and there are messages being fetched for other partitions (#393).
 
 # confluent-kafka-javascript 1.6.0
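
Reviewer note (not part of the patch series): below is a minimal usage sketch of the two consumer properties introduced in this PR. The property names, the defaults (32 messages per `eachBatch` call, 1500 ms of cached consumption per worker) and the `-1` "unlimited" sentinel are taken from the diffs above; the import shape, broker address, topic name and group id are illustrative assumptions.

```javascript
// Hedged sketch only — values marked as assumptions are placeholders,
// not part of this PR.
const { KafkaJS } = require('@confluentinc/kafka-javascript');

async function main() {
  // Assumed local broker, for illustration.
  const kafka = new KafkaJS.Kafka({ kafkaJS: { brokers: ['localhost:9092'] } });

  const consumer = kafka.consumer({
    kafkaJS: { groupId: 'example-group' },
    // New in this PR: cap each eachBatch payload at 64 messages
    // (default 32, -1 = no limit).
    'js.consumer.max.batch.size': 64,
    // New in this PR: target ~2000 ms of estimated consumption per worker
    // in the internal message cache (default 1500 ms).
    'js.consumer.max.cache.size.per.worker.ms': 2000,
  });

  await consumer.connect();
  await consumer.subscribe({ topic: 'example-topic' });

  await consumer.run({
    partitionsConsumedConcurrently: 2,
    eachBatch: async ({ batch }) => {
      // batch.messages.length is bounded by js.consumer.max.batch.size.
      for (const message of batch.messages) {
        console.log(batch.topic, batch.partition, message.offset);
      }
    },
  });
}

main().catch(console.error);
```

With the cache sizing introduced in these patches, the cache target is derived from the measured per-worker consumption rate and rounded up to a multiple of (max batch size × concurrency). For example, with `eachBatch`, a 32-message batch cap, `partitionsConsumedConcurrently: 2`, the default 1500 ms window and a measured rate of ~0.2 messages/ms per worker, the target is ceil(round(1500 × 0.2) × 2 / 64) × 64 = 640 cached messages.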