
Commit cefcf3f

Use only the 1.5-second cache size estimation, aligned to batch size * concurrency
1 parent 9b556d2 commit cefcf3f

3 files changed: +22 -60 lines


lib/kafkajs/_consumer.js (12 additions, 50 deletions)
@@ -197,10 +197,6 @@ class Consumer {
    * The number of partitions to consume concurrently as set by the user, or 1.
    */
   #concurrency = 1;
-
-
-  #runConfig = null;
-
   /**
    * Promise that resolves together with last in progress fetch.
    * It's set to null when no fetch is in progress.
@@ -958,7 +954,7 @@ class Consumer {
   #updateMaxMessageCacheSize() {
     if (this.#maxBatchSize === -1) {
       // In case of unbounded max batch size it returns all available messages
-      // for a partition in each batch. Cache is unbounded as well as
+      // for a partition in each batch. Cache is unbounded given that
       // it takes only one call to process each partition.
       return;
     }
@@ -969,15 +965,11 @@ class Consumer {
     const consumptionDurationSeconds = Number(nowNs - this.#lastFetchClockNs) / 1e9;
     const messagesPerSecondSingleWorker = this.#lastFetchedMessageCnt / this.#lastFetchedConcurrency / consumptionDurationSeconds;
     // Keep enough messages in the cache for 1.5 seconds of concurrent consumption.
-    this.#messageCacheMaxSize = Math.round(1.5 * messagesPerSecondSingleWorker) * this.#concurrency;
-    const minCacheSize = this.#runConfig.eachBatch ? this.#maxBatchesSize : this.#concurrency;
-    if (this.#messageCacheMaxSize < minCacheSize)
-      // Keep at least one batch or one message per worker.
-      // It's possible less workers than requested were active in previous run.
-      this.#messageCacheMaxSize = minCacheSize;
-    else if (this.#messageCacheMaxSize > minCacheSize * 10)
-      // Keep at most 10 messages or batches per requested worker.
-      this.#messageCacheMaxSize = minCacheSize * 10;
+    // Round up to the nearest multiple of `#maxBatchesSize`.
+    this.#messageCacheMaxSize = Math.ceil(
+      Math.round(1.5 * messagesPerSecondSingleWorker) * this.#concurrency
+      / this.#maxBatchesSize
+    ) * this.#maxBatchesSize;
   }
 }
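
For illustration, here is a self-contained sketch of the new sizing rule with assumed sample numbers (the throughput, concurrency, and batch size below are made up; only the formula mirrors the diff):

// Sketch of the 1.5-second cache sizing, aligned to a multiple of maxBatchesSize.
// All concrete numbers are assumptions for the example, not values from the commit.
const messagesPerSecondSingleWorker = 400;   // measured single-worker throughput
const concurrency = 2;                       // partitionsConsumedConcurrently
const maxBatchesSize = 32 * concurrency;     // eachBatch mode with maxBatchSize = 32

const raw = Math.round(1.5 * messagesPerSecondSingleWorker) * concurrency;     // 600 * 2 = 1200
const messageCacheMaxSize = Math.ceil(raw / maxBatchesSize) * maxBatchesSize;  // ceil(1200 / 64) * 64 = 1216

The earlier lower and upper clamps (between one and ten batches or messages per requested worker) are dropped; the estimate is now simply rounded up to a whole multiple of #maxBatchesSize.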

@@ -1593,16 +1585,18 @@ class Consumer {
    * @private
    */
   async #runInternal(config) {
-    this.#runConfig = config;
-    this.#concurrency = config.partitionsConsumedConcurrently;
-    this.#maxBatchesSize = this.#maxBatchSize * this.#concurrency;
     const perMessageProcessor = config.eachMessage ? this.#messageProcessor : this.#batchProcessor;
     const fetcher = config.eachMessage
       ? (savedIdx) => this.#consumeSingleCached(savedIdx)
       : (savedIdx) => this.#consumeCachedN(savedIdx, this.#maxBatchSize);
-    this.#workers = [];

     await this.#lock.write(async () => {
+      this.#workers = [];
+      this.#concurrency = config.partitionsConsumedConcurrently;
+      this.#maxBatchesSize = (
+        config.eachBatch && this.#maxBatchSize > 0 ?
+        this.#maxBatchSize :
+        1) * this.#concurrency;

       while (!this.#disconnectStarted) {
         if (this.#maxPollIntervalRestart.resolved)
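
The new #maxBatchesSize expression can be read as "one scheduling unit per worker". A hypothetical helper mirroring that ternary (the function name and parameter shape are illustrative, not part of the codebase):

// Hypothetical helper, mirroring the ternary in the hunk above.
function computeMaxBatchesSize({ eachBatch, maxBatchSize, concurrency }) {
  // With eachBatch and a bounded batch size, one unit is a full batch;
  // otherwise (eachMessage, or an unbounded batch size) one unit is a single message.
  const unit = (eachBatch && maxBatchSize > 0) ? maxBatchSize : 1;
  return unit * concurrency;
}

computeMaxBatchesSize({ eachBatch: true, maxBatchSize: 32, concurrency: 2 });  // 64
computeMaxBatchesSize({ eachBatch: false, maxBatchSize: 32, concurrency: 2 }); // 2

The hunk also moves this initialization inside #lock.write, so it now happens only once the write lock is held rather than before it.
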
@@ -1639,38 +1633,6 @@ class Consumer {
       this.#maxPollIntervalRestart.resolve();
   }

-  /**
-   * Consumes a single message from the consumer within the given timeout.
-   * THIS METHOD IS NOT IMPLEMENTED.
-   * @note This method cannot be used with run(). Either that, or this must be used.
-   *
-   * @param {any} args
-   * @param {number} args.timeout - the timeout in milliseconds, defaults to 1000.
-   * @returns {import("../..").Message|null} a message, or null if the timeout was reached.
-   * @private
-   */
-  async consume({ timeout } = { timeout: 1000 }) {
-    if (this.#state !== ConsumerState.CONNECTED) {
-      throw new error.KafkaJSError('consume can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
-    }
-
-    if (this.#running) {
-      throw new error.KafkaJSError('consume() and run() cannot be used together.', { code: error.ErrorCodes.ERR__CONFLICT });
-    }
-
-    this.#internalClient.setDefaultConsumeTimeout(timeout);
-    let m = null;
-
-    try {
-      const ms = await this.#consumeN(1);
-      m = ms[0];
-    } finally {
-      this.#internalClient.setDefaultConsumeTimeout(undefined);
-    }
-
-    throw new error.KafkaJSError('consume() is not implemented.' + m, { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
-  }
-
   async #commitOffsetsUntilNoStateErr(offsetsToCommit) {
     let err = { code: error.ErrorCodes.ERR_NO_ERROR };
     do {
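
With the never-implemented consume() stub removed, consumption goes through run(). A hedged usage sketch in the KafkaJS-style API this consumer exposes (the topic name and handler body are illustrative):

// Illustrative only: the run()-based consumption path.
await consumer.connect();
await consumer.subscribe({ topic: 'example-topic' });
await consumer.run({
  partitionsConsumedConcurrently: 2,
  eachMessage: async ({ topic, partition, message }) => {
    console.log(`${topic}[${partition}] @ ${message.offset}: ${message.value?.toString()}`);
  },
});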

test/promisified/consumer/consumeMessages.spec.js (1 addition, 2 deletions)
@@ -434,9 +434,8 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
           inProgressMaxValue = Math.max(inProgress, inProgressMaxValue);
           if (inProgressMaxValue >= expectedMaxConcurrentWorkers) {
             maxConcurrentWorkersReached.resolve();
-          } else if (messagesConsumed.length > 2048) {
-            await sleep(1000);
           }
+          await sleep(100);
           inProgress--;
         },
       });

test/promisified/consumer/consumerCacheTests.spec.js (9 additions, 8 deletions)
@@ -142,6 +142,8 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
      * the consumers are created with the same groupId, we create them here.
      * TODO: verify correctness of theory. It's conjecture... which solves flakiness. */
     let groupId = `consumer-group-id-${secureRandom()}`;
+    const multiplier = 9;
+    const numMessages = 16 * multiplier;
     consumer = createConsumer({
       groupId,
       maxWaitTimeInMs: 100,
@@ -178,16 +180,15 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon

         /* Until the second consumer joins, consume messages slowly so as to not consume them all
          * before the rebalance triggers. */
-        if (messagesConsumed.length > 1024 && !consumer2ConsumeRunning) {
-          await sleep(10);
+        if (!consumer2ConsumeRunning) {
+          await sleep(100);
         }
       }
     });

     /* Evenly distribute 1024*9 messages across 3 partitions */
     let i = 0;
-    const multiplier = 9;
-    const messages = Array(1024 * multiplier)
+    const messages = Array(numMessages)
       .fill()
       .map(() => {
         const value = secureRandom();
@@ -198,7 +199,7 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon

     // Wait for the messages - some of them, before starting the
     // second consumer.
-    await waitForMessages(messagesConsumed, { number: 1024 });
+    await waitForMessages(messagesConsumed, { number: 16 });

     await consumer2.connect();
     await consumer2.subscribe({ topic: topicName });
@@ -213,15 +214,15 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
     consumer2ConsumeRunning = true;

     /* Now that both consumers have joined, wait for all msgs to be consumed */
-    await waitForMessages(messagesConsumed, { number: 1024 * multiplier });
+    await waitForMessages(messagesConsumed, { number: numMessages });

     /* No extra messages should be consumed. */
     await sleep(1000);
-    expect(messagesConsumed.length).toEqual(1024 * multiplier);
+    expect(messagesConsumed.length).toEqual(numMessages);

     /* Check if all messages were consumed. */
     expect(messagesConsumed.map(event => (+event.message.offset)).sort((a, b) => a - b))
-      .toEqual(Array(1024 * multiplier).fill().map((_, i) => Math.floor(i / 3)));
+      .toEqual(Array(numMessages).fill().map((_, i) => Math.floor(i / 3)));

     /* Consumer2 should have consumed at least one message. */
     expect(messagesConsumedConsumer2.length).toBeGreaterThan(0);
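
The offset assertion relies on the messages being spread evenly across the 3 partitions (per the test's own comment), so every offset from 0 to numMessages / 3 - 1 appears exactly three times in the sorted list. A quick check with the new constants (16 * 9 = 144 messages):

// What the test expects after sorting all consumed offsets numerically.
const multiplier = 9;
const numMessages = 16 * multiplier;  // 144 messages over 3 partitions
const expected = Array(numMessages).fill().map((_, i) => Math.floor(i / 3));
// expected = [0, 0, 0, 1, 1, 1, ..., 47, 47, 47]; each partition ends at offset 47.
console.log(expected.length, expected[expected.length - 1]);  // 144 47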
