Skip to content

Commit 99a906c

Browse files
committed
Use only the 1.5-second cache size estimate, rounded up to a multiple of batch size * concurrency
1 parent 9b556d2 commit 99a906c

File tree

3 files changed

+26
-83
lines changed

3 files changed

+26
-83
lines changed

lib/kafkajs/_consumer.js

Lines changed: 12 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -197,10 +197,6 @@ class Consumer {
197197
* The number of partitions to consume concurrently as set by the user, or 1.
198198
*/
199199
#concurrency = 1;
200-
201-
202-
#runConfig = null;
203-
204200
/**
205201
* Promise that resolves together with last in progress fetch.
206202
* It's set to null when no fetch is in progress.
@@ -958,7 +954,7 @@ class Consumer {
958954
#updateMaxMessageCacheSize() {
959955
if (this.#maxBatchSize === -1) {
960956
// In case of unbounded max batch size it returns all available messages
961-
// for a partition in each batch. Cache is unbounded as well as
957+
// for a partition in each batch. Cache is unbounded given that
962958
// it takes only one call to process each partition.
963959
return;
964960
}
@@ -969,15 +965,11 @@ class Consumer {
969965
const consumptionDurationSeconds = Number(nowNs - this.#lastFetchClockNs) / 1e9;
970966
const messagesPerSecondSingleWorker = this.#lastFetchedMessageCnt / this.#lastFetchedConcurrency / consumptionDurationSeconds;
971967
// Keep enough messages in the cache for 1.5 seconds of concurrent consumption.
972-
this.#messageCacheMaxSize = Math.round(1.5 * messagesPerSecondSingleWorker) * this.#concurrency;
973-
const minCacheSize = this.#runConfig.eachBatch ? this.#maxBatchesSize : this.#concurrency;
974-
if (this.#messageCacheMaxSize < minCacheSize)
975-
// Keep at least one batch or one message per worker.
976-
// It's possible less workers than requested were active in previous run.
977-
this.#messageCacheMaxSize = minCacheSize;
978-
else if (this.#messageCacheMaxSize > minCacheSize * 10)
979-
// Keep at most 10 messages or batches per requested worker.
980-
this.#messageCacheMaxSize = minCacheSize * 10;
968+
// Round up to the nearest multiple of `#maxBatchesSize`.
969+
this.#messageCacheMaxSize = Math.ceil(
970+
Math.round(1.5 * messagesPerSecondSingleWorker) * this.#concurrency
971+
/ this.#maxBatchesSize
972+
) * this.#maxBatchesSize;
981973
}
982974
}
983975

@@ -1110,24 +1102,6 @@ class Consumer {
11101102
this.#messageCacheMaxSize);
11111103
}
11121104

1113-
/**
1114-
* Consumes n messages from the internal consumer.
1115-
* @returns {Promise<import("../..").Message[]>} A promise that resolves to a list of messages. The size of this list is guaranteed to be less than or equal to n.
1116-
* @note this method cannot be used in conjunction with #consumeSingleCached.
1117-
* @private
1118-
*/
1119-
async #consumeN(n) {
1120-
return new Promise((resolve, reject) => {
1121-
this.#internalClient.consume(n, (err, messages) => {
1122-
if (err) {
1123-
reject(createKafkaJsErrorFromLibRdKafkaError(err));
1124-
return;
1125-
}
1126-
resolve(messages);
1127-
});
1128-
});
1129-
}
1130-
11311105
/**
11321106
* Flattens a list of topics with partitions into a list of topic, partition.
11331107
* @param {Array<({topic: string, partitions: Array<number>}|{topic: string, partition: number})>} topics
@@ -1593,16 +1567,18 @@ class Consumer {
15931567
* @private
15941568
*/
15951569
async #runInternal(config) {
1596-
this.#runConfig = config;
1597-
this.#concurrency = config.partitionsConsumedConcurrently;
1598-
this.#maxBatchesSize = this.#maxBatchSize * this.#concurrency;
15991570
const perMessageProcessor = config.eachMessage ? this.#messageProcessor : this.#batchProcessor;
16001571
const fetcher = config.eachMessage
16011572
? (savedIdx) => this.#consumeSingleCached(savedIdx)
16021573
: (savedIdx) => this.#consumeCachedN(savedIdx, this.#maxBatchSize);
1603-
this.#workers = [];
16041574

16051575
await this.#lock.write(async () => {
1576+
this.#workers = [];
1577+
this.#concurrency = config.partitionsConsumedConcurrently;
1578+
this.#maxBatchesSize = (
1579+
config.eachBatch && this.#maxBatchSize > 0 ?
1580+
this.#maxBatchSize :
1581+
1) * this.#concurrency;
16061582

16071583
while (!this.#disconnectStarted) {
16081584
if (this.#maxPollIntervalRestart.resolved)
@@ -1639,38 +1615,6 @@ class Consumer {
16391615
this.#maxPollIntervalRestart.resolve();
16401616
}
16411617

1642-
/**
1643-
* Consumes a single message from the consumer within the given timeout.
1644-
* THIS METHOD IS NOT IMPLEMENTED.
1645-
* @note This method cannot be used with run(). Either that, or this must be used.
1646-
*
1647-
* @param {any} args
1648-
* @param {number} args.timeout - the timeout in milliseconds, defaults to 1000.
1649-
* @returns {import("../..").Message|null} a message, or null if the timeout was reached.
1650-
* @private
1651-
*/
1652-
async consume({ timeout } = { timeout: 1000 }) {
1653-
if (this.#state !== ConsumerState.CONNECTED) {
1654-
throw new error.KafkaJSError('consume can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
1655-
}
1656-
1657-
if (this.#running) {
1658-
throw new error.KafkaJSError('consume() and run() cannot be used together.', { code: error.ErrorCodes.ERR__CONFLICT });
1659-
}
1660-
1661-
this.#internalClient.setDefaultConsumeTimeout(timeout);
1662-
let m = null;
1663-
1664-
try {
1665-
const ms = await this.#consumeN(1);
1666-
m = ms[0];
1667-
} finally {
1668-
this.#internalClient.setDefaultConsumeTimeout(undefined);
1669-
}
1670-
1671-
throw new error.KafkaJSError('consume() is not implemented.' + m, { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
1672-
}
1673-
16741618
async #commitOffsetsUntilNoStateErr(offsetsToCommit) {
16751619
let err = { code: error.ErrorCodes.ERR_NO_ERROR };
16761620
do {

test/promisified/consumer/consumeMessages.spec.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -434,9 +434,8 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
434434
inProgressMaxValue = Math.max(inProgress, inProgressMaxValue);
435435
if (inProgressMaxValue >= expectedMaxConcurrentWorkers) {
436436
maxConcurrentWorkersReached.resolve();
437-
} else if (messagesConsumed.length > 2048) {
438-
await sleep(1000);
439437
}
438+
await sleep(100);
440439
inProgress--;
441440
},
442441
});

test/promisified/consumer/consumerCacheTests.spec.js

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
9090
});
9191

9292
it('is cleared on seek', async () => {
93+
const producer = createProducer({}, {'batch.num.messages': '1'});
9394
await consumer.connect();
9495
await producer.connect();
9596
await consumer.subscribe({ topic: topicName });
@@ -134,6 +135,8 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
134135
expect(messagesConsumed.filter(m => m.partition === 1).map(m => m.message.offset)).toEqual(Array(1024 * 3).fill().map((_, i) => `${i}`));
135136
// partition 2
136137
expect(messagesConsumed.filter(m => m.partition === 2).map(m => m.message.offset)).toEqual(Array(1024 * 3).fill().map((_, i) => `${i}`));
138+
139+
await producer.disconnect();
137140
});
138141

139142
it('is cleared before rebalance', async () => {
@@ -142,6 +145,8 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
142145
* the consumers are created with the same groupId, we create them here.
143146
* TODO: verify correctness of theory. It's conjecture... which solves flakiness. */
144147
let groupId = `consumer-group-id-${secureRandom()}`;
148+
const multiplier = 9;
149+
const numMessages = 16 * multiplier;
145150
consumer = createConsumer({
146151
groupId,
147152
maxWaitTimeInMs: 100,
@@ -156,6 +161,7 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
156161
autoCommit: isAutoCommit,
157162
clientId: "consumer2",
158163
});
164+
const producer = createProducer({}, {'batch.num.messages': '1'});
159165

160166
await consumer.connect();
161167
await producer.connect();
@@ -164,7 +170,6 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
164170
const messagesConsumed = [];
165171
const messagesConsumedConsumer1 = [];
166172
const messagesConsumedConsumer2 = [];
167-
let consumer2ConsumeRunning = false;
168173

169174
consumer.run({
170175
partitionsConsumedConcurrently,
@@ -176,18 +181,13 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
176181
{ topic: event.topic, partition: event.partition, offset: Number(event.message.offset) + 1 },
177182
]);
178183

179-
/* Until the second consumer joins, consume messages slowly so as to not consume them all
180-
* before the rebalance triggers. */
181-
if (messagesConsumed.length > 1024 && !consumer2ConsumeRunning) {
182-
await sleep(10);
183-
}
184+
await sleep(100);
184185
}
185186
});
186187

187188
/* Evenly distribute numMessages (16 * multiplier) messages across 3 partitions */
188189
let i = 0;
189-
const multiplier = 9;
190-
const messages = Array(1024 * multiplier)
190+
const messages = Array(numMessages)
191191
.fill()
192192
.map(() => {
193193
const value = secureRandom();
@@ -198,7 +198,7 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
198198

199199
// Wait for the messages - some of them, before starting the
200200
// second consumer.
201-
await waitForMessages(messagesConsumed, { number: 1024 });
201+
await waitForMessages(messagesConsumed, { number: 16 });
202202

203203
await consumer2.connect();
204204
await consumer2.subscribe({ topic: topicName });
@@ -210,23 +210,23 @@ describe.each(cases)('Consumer message cache - isAutoCommit = %s - partitionsCon
210210
});
211211

212212
await waitFor(() => consumer2.assignment().length > 0, () => null);
213-
consumer2ConsumeRunning = true;
214213

215214
/* Now that both consumers have joined, wait for all msgs to be consumed */
216-
await waitForMessages(messagesConsumed, { number: 1024 * multiplier });
215+
await waitForMessages(messagesConsumed, { number: numMessages });
217216

218217
/* No extra messages should be consumed. */
219218
await sleep(1000);
220-
expect(messagesConsumed.length).toEqual(1024 * multiplier);
219+
expect(messagesConsumed.length).toEqual(numMessages);
221220

222221
/* Check if all messages were consumed. */
223222
expect(messagesConsumed.map(event => (+event.message.offset)).sort((a, b) => a - b))
224-
.toEqual(Array(1024 * multiplier).fill().map((_, i) => Math.floor(i / 3)));
223+
.toEqual(Array(numMessages).fill().map((_, i) => Math.floor(i / 3)));
225224

226225
/* Consumer2 should have consumed at least one message. */
227226
expect(messagesConsumedConsumer2.length).toBeGreaterThan(0);
228227

229228
await consumer2.disconnect();
229+
await producer.disconnect();
230230
}, 60000);
231231

232232
it('does not hold up polling for non-message events', async () => {

0 commit comments

Comments
 (0)