
Conversation

@emasab
Contributor

@emasab emasab commented Oct 29, 2025

Closes #286.

Batch size is now configurable through the `js.consumer.max.batch.size` property (-1 for an unlimited batch size). Even when unlimited, batch sizes are still bounded by the librdkafka consumer buffer. `js.consumer.max.cache.size.per.worker.ms` configures how much data is kept in the buffer, expressed as milliseconds of consumption at the estimated consume rate.
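Based on that description, setting these properties might look like the sketch below. The property names come from this PR; the dotted-key config shape follows the librdkafka convention this library uses, and the specific values are arbitrary examples, not recommendations.

```javascript
// Sketch: consumer configuration with the new JS-specific properties.
// Values here are illustrative examples only.
const consumerConfig = {
  'bootstrap.servers': 'localhost:9092',
  'group.id': 'my-group',
  // Max messages delivered per batch; -1 removes the JS-side limit
  // (librdkafka's own consumer buffer still bounds the batch).
  'js.consumer.max.batch.size': 64,
  // Target amount of buffered data, expressed as milliseconds of
  // consumption at the estimated consume rate.
  'js.consumer.max.cache.size.per.worker.ms': 1500,
};

console.log(consumerConfig['js.consumer.max.batch.size']);
```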

Checklist

  • Contains customer-facing changes? Yes: the new `js` consumer properties.

Test & Review

Automated tests and running the performance example.

Copilot AI review requested due to automatic review settings October 29, 2025 15:58
@emasab emasab requested review from a team as code owners October 29, 2025 15:58

Copilot AI left a comment


Pull Request Overview

This PR implements configurable batch size for consumer operations through new JavaScript-specific properties js.consumer.max.batch.size and js.consumer.max.cache.size.per.worker.ms. The changes include a new cache size calculation algorithm based on consumption rate estimation and fixes for message cache management during rebalances.

Key changes:

  • Adds configuration options for batch size (-1 for unlimited) and cache size based on consumption rate
  • Replaces the fixed cache increase/decrease logic with dynamic size calculation based on message consumption rate
  • Introduces message return mechanism to preserve at-least-once delivery guarantees during pending operations
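The rate-based sizing described above could be sketched roughly as follows. This is a hypothetical helper, not the PR's actual implementation: the function name, parameters, and rounding behavior are assumptions for illustration.

```javascript
// Hypothetical sketch of rate-based cache sizing.
// Estimate messages/ms from recent consumption, then size the cache
// to hold `maxCacheSizeMs` worth of messages for each worker.
function computeCacheSize(messagesConsumed, elapsedMs, maxCacheSizeMs, concurrency) {
  const ratePerMs = messagesConsumed / Math.max(elapsedMs, 1);
  // Keep at least one message per worker so consumption never stalls.
  const perWorker = Math.max(1, Math.round(ratePerMs * maxCacheSizeMs));
  return perWorker * concurrency;
}

// e.g. 1000 msgs in 1000 ms, a 1500 ms target, 2 workers:
console.log(computeCacheSize(1000, 1000, 1500, 2)); // 3000
```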

Reviewed Changes

Copilot reviewed 12 out of 14 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| lib/kafkajs/_consumer.js | Implements new configuration properties, dynamic cache sizing, and message return logic |
| lib/kafkajs/_consumer_cache.js | Adds methods to prepend messages back to cache for reprocessing |
| test/promisified/consumer/consumerCacheTests.spec.js | Updates test parameters and timing to work with new batch size behavior |
| test/promisified/consumer/consumeMessages.spec.js | Adjusts tests for dynamic batch sizes and adds producer disconnect calls |
| test/promisified/admin/fetch_offsets.spec.js | Fixes timing issue with offset commit test |
| test/promisified/producer/flush.spec.js | Adds missing producer initialization and cleanup |
| MIGRATION.md | Documents new configuration properties |
| CHANGELOG.md | Adds release notes for v1.6.1 |
| package.json, schemaregistry/package.json, lib/util.js | Version bumps to 1.6.1 |
| ci/update-version.js | Fixes prerelease version separator |


    firstLongBatchProcessing = receivedMessages;
    }
    if (receivedMessages === 14) {
    if (firstLongBatchProcessing && receivedMessages === firstLongBatchProcessing + 1) {

Copilot AI Oct 29, 2025


Potential logic error: the condition checks receivedMessages === firstLongBatchProcessing + 1, but firstLongBatchProcessing is set inside a condition that may be true multiple times. If multiple batches have >= 32 messages, firstLongBatchProcessing will be set multiple times, causing this condition to trigger incorrectly.
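One way to address this, sketched below: record only the first batch that crosses the long-batch threshold, so the `+ 1` check cannot re-trigger on later long batches. The names and the threshold of 32 are taken from the comment above; the structure is illustrative, not the repository's actual test code.

```javascript
// Illustrative fix: set firstLongBatchProcessing at most once.
let firstLongBatchProcessing = null;

function onBatch(receivedMessages, batchLength) {
  if (batchLength >= 32 && firstLongBatchProcessing === null) {
    firstLongBatchProcessing = receivedMessages; // recorded only for the FIRST long batch
  }
  // True only for the single batch right after the first long one.
  return firstLongBatchProcessing !== null &&
         receivedMessages === firstLongBatchProcessing + 1;
}
```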

Comment on lines +1000 to +1006
    for (const msg of messages) {
      const key = partitionKey(msg);
      partitionsNum.set(key, 1);
      if (partitionsNum.size >= this.#concurrency) {
        break;
      }
    }

Copilot AI Oct 29, 2025


The loop counts unique partition keys but always sets the value to 1. This could be simplified by using a Set instead of a Map, which would make the intent clearer and slightly improve performance.
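The suggested simplification, as a standalone sketch: a `Set` captures "unique partitions seen" directly, without the dummy map value. The free-function form and the name `countPartitionsUpTo` are hypothetical, adapted so the snippet runs outside the class.

```javascript
// Count unique partition keys, stopping early once `concurrency`
// distinct partitions have been seen (more would not add workers).
function countPartitionsUpTo(messages, partitionKey, concurrency) {
  const partitions = new Set();
  for (const msg of messages) {
    partitions.add(partitionKey(msg));
    if (partitions.size >= concurrency) {
      break; // no need to scan further messages
    }
  }
  return partitions.size;
}

const msgs = [{ partition: 0 }, { partition: 1 }, { partition: 0 }, { partition: 2 }];
console.log(countPartitionsUpTo(msgs, (m) => String(m.partition), 2)); // 2
```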

@sonarqube-confluent

Quality Gate failed

Failed conditions
3 New issues
1 Security Hotspot
69.4% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube


Contributor

@milindl milindl left a comment


Minor comments; looks generally OK, and I'd seen the previous PR.

    @@ -1,3 +1,16 @@
    # confluent-kafka-javascript 1.6.1

    v1.6.1 is a maintenance release. It is supported for all usage.
Contributor


It should be 1.7.0

Comment on lines +308 to +310
    and defaults to 32. `js.consumer.max.cache.size.per.worker.ms` allows to
    configure the cache size estimated based on consumption rate and defaults
    to 1.5 seconds.
Contributor


Suggested change, from:

    and defaults to 32. `js.consumer.max.cache.size.per.worker.ms` allows to
    configure the cache size estimated based on consumption rate and defaults
    to 1.5 seconds.

to:

    and defaults to 32. `js.consumer.max.cache.size.per.worker.ms` allows to
    configure the cache size estimated based on consumption rate and defaults to the cache being sized to 1.5s worth of messages.

Contributor


Not sure about this comment exactly, but at the moment I feel that `js.consumer.max.cache.size.per.worker.ms` is not explained fully.
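For what it's worth, a worked example of the assumed semantics, derived from the PR description (the cache is sized to hold N milliseconds' worth of messages at the observed consume rate); this reading is an assumption, not confirmed documentation:

```javascript
// Assumed semantics: cache per worker holds `maxCacheSizeMs` worth of
// messages at the observed consumption rate.
const observedRateMsgPerSec = 10000; // e.g. a measured consume rate
const maxCacheSizeMs = 1500;         // js.consumer.max.cache.size.per.worker.ms
const cachedMessages = observedRateMsgPerSec * (maxCacheSizeMs / 1000);
console.log(cachedMessages); // 15000
```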

    ppc = this.#discardMessages(ms, ppc);
    break;
    /*
     * Don't process messages anymore, execute the operations first.
Contributor


Great catch!


Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

batch size limit is severely limiting compared to KafkaJS

3 participants