
Commit 4ad7f02

Keep 1.5s of data in cache
1 parent e8bd34f commit 4ad7f02

2 files changed: +74 −23 lines changed
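
The change below replaces the previously unbounded fetches (which requested up to Number.MAX_SAFE_INTEGER messages) with a cache cap derived from the throughput observed on the previous fetch: keep roughly 1.5 seconds' worth of messages per worker, bounded by a per-mode minimum, a 10x cap, and a floor of 1024. Here is a minimal standalone sketch of that rule, assuming plain arguments in place of the consumer's private fields; the function name and parameters are illustrative only, not part of the library API:

// Illustrative sketch of the sizing rule; in the consumer it lives in the
// private #updateMaxMessageCacheSize() method and reads private fields.
// The real method also skips the calculation entirely when
// js.max.batch.size is -1 (cache left at Number.MAX_SAFE_INTEGER).
function targetCacheSize({ lastFetchedMessageCnt, lastFetchedConcurrency,
                           elapsedNs, concurrency, minCacheSize }) {
  // Observed per-worker throughput since the previous fetch completed.
  // elapsedNs may be a BigInt from hrtime.bigint(); Number() handles both.
  const seconds = Number(elapsedNs) / 1e9;
  const perWorkerRate = lastFetchedMessageCnt / lastFetchedConcurrency / seconds;

  // Keep roughly 1.5 seconds of messages buffered for every worker.
  let size = Math.round(1.5 * perWorkerRate) * concurrency;

  // Clamp to [minCacheSize, 10 * minCacheSize], then apply the 1024 floor.
  size = Math.max(size, minCacheSize);
  size = Math.min(size, minCacheSize * 10);
  return Math.max(size, 1024);
}

The diff in lib/kafkajs/_consumer.js below shows the real implementation, which also records fetch statistics (#saveFetchStats) after each fetch to feed the next calculation; a worked numeric example follows that diff.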

lib/kafkajs/_consumer.js

Lines changed: 69 additions & 4 deletions
@@ -155,6 +155,12 @@ class Consumer {
    */
   #messageCache = null;
 
+  /**
+   * The maximum size of the message cache.
+   * Will be adjusted dynamically.
+   */
+  #messageCacheMaxSize = 1;
+
   /**
    * Whether the user has enabled manual offset management (commits).
    */
@@ -175,6 +181,7 @@ class Consumer {
    * Maximum batch size passed in eachBatch calls.
    */
   #maxBatchSize = 32;
+  #maxBatchesSize = 32;
 
   /**
    * Whether worker termination has been scheduled.
@@ -191,6 +198,9 @@ class Consumer {
    */
   #concurrency = 1;
 
+
+  #runConfig = null;
+
   /**
    * Promise that resolves together with last in progress fetch.
    * It's set to null when no fetch is in progress.
@@ -228,7 +238,15 @@ class Consumer {
   /**
    * Last fetch real time clock in nanoseconds.
    */
-  #lastFetchClockNs = 0;
+  #lastFetchClockNs = 0n;
+  /**
+   * Last number of messages fetched.
+   */
+  #lastFetchedMessageCnt = 0n;
+  /**
+   * Last fetch concurrency used.
+   */
+  #lastFetchedConcurrency = 0n;
 
   /**
    * List of pending operations to be executed after
@@ -691,6 +709,10 @@ class Consumer {
         { code: error.ErrorCodes.ERR__INVALID_ARG });
     }
     this.#maxBatchSize = maxBatchSize;
+    this.#maxBatchesSize = maxBatchSize;
+    if (maxBatchSize === -1) {
+      this.#messageCacheMaxSize = Number.MAX_SAFE_INTEGER;
+    }
     delete rdKafkaConfig['js.max.batch.size'];
   }
 
@@ -933,6 +955,43 @@ class Consumer {
     return returnPayload;
   }
 
+  #updateMaxMessageCacheSize() {
+    if (this.#maxBatchSize === -1) {
+      return;
+    }
+
+    const nowNs = hrtime.bigint();
+    if (this.#lastFetchedMessageCnt > 0 && this.#lastFetchClockNs > 0n &&
+        nowNs > this.#lastFetchClockNs) {
+      const consumptionDurationSeconds = Number(nowNs - this.#lastFetchClockNs) / 1e9;
+      const messagesPerSecondSingleWorker = this.#lastFetchedMessageCnt / this.#lastFetchedConcurrency / consumptionDurationSeconds;
+      // Keep enough messages in the cache for 1.5 seconds of consumption.
+      this.#messageCacheMaxSize = Math.round(1.5 * messagesPerSecondSingleWorker) * this.#concurrency;
+      const minCacheSize = this.#runConfig.eachBatch ? this.#maxBatchesSize : this.#concurrency;
+      if (this.#messageCacheMaxSize < minCacheSize)
+        this.#messageCacheMaxSize = minCacheSize;
+      else if (this.#messageCacheMaxSize > minCacheSize * 10)
+        this.#messageCacheMaxSize = minCacheSize * 10;
+      if (this.#messageCacheMaxSize < 1024)
+        this.#messageCacheMaxSize = 1024;
+    }
+  }
+
+  #saveFetchStats(messages) {
+    this.#lastFetchClockNs = hrtime.bigint();
+    const partitionsNum = new Map();
+    for (const msg of messages) {
+      const key = partitionKey(msg);
+      partitionsNum.set(key, 1);
+      if (partitionsNum.size >= this.#concurrency) {
+        break;
+      }
+    }
+    this.#lastFetchedConcurrency = partitionsNum.size;
+    this.#lastFetchedMessageCnt = messages.length;
+  }
+
+
   async #fetchAndResolveWith(takeFromCache, size) {
     if (this.#fetchInProgress) {
       await this.#fetchInProgress;
@@ -959,6 +1018,8 @@ class Consumer {
     const fetchResult = new DeferredPromise();
     this.#logger.debug(`Attempting to fetch ${size} messages to the message cache`,
       this.#createConsumerBindingMessageMetadata());
+
+    this.#updateMaxMessageCacheSize();
     this.#internalClient.consume(size, (err, messages) =>
       fetchResult.resolve([err, messages]));
 
@@ -975,7 +1036,7 @@ class Consumer {
 
     this.#messageCache.addMessages(messages);
     const res = takeFromCache();
-    this.#lastFetchClockNs = hrtime.bigint();
+    this.#saveFetchStats(messages);
     this.#maxPollIntervalRestart.resolve();
     return res;
   } finally {
@@ -1011,7 +1072,7 @@ class Consumer {
     }
 
     return this.#fetchAndResolveWith(() => this.#messageCache.next(),
-      Number.MAX_SAFE_INTEGER);
+      this.#messageCacheMaxSize);
   }
 
   /**
@@ -1042,7 +1103,7 @@ class Consumer {
 
     return this.#fetchAndResolveWith(() =>
       this.#messageCache.nextN(null, size),
-      Number.MAX_SAFE_INTEGER);
+      this.#messageCacheMaxSize);
   }
 
   /**
@@ -1528,7 +1589,9 @@ class Consumer {
    * @private
    */
   async #runInternal(config) {
+    this.#runConfig = config;
     this.#concurrency = config.partitionsConsumedConcurrently;
+    this.#maxBatchesSize = this.#maxBatchSize * this.#concurrency;
     const perMessageProcessor = config.eachMessage ? this.#messageProcessor : this.#batchProcessor;
     const fetcher = config.eachMessage
       ? (savedIdx) => this.#consumeSingleCached(savedIdx)
@@ -1543,6 +1606,8 @@ class Consumer {
 
     this.#workerTerminationScheduled = new DeferredPromise();
     this.#lastFetchClockNs = hrtime.bigint();
+    this.#lastFetchedMessageCnt = 0;
+    this.#lastFetchedConcurrency = 0;
     if (this.#pendingOperations.length === 0) {
       const workersToSpawn = Math.max(1, Math.min(this.#concurrency, this.#partitionCount));
       const cacheExpirationLoop = this.#cacheExpirationLoop();
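
To make the interaction of the three bounds concrete, here is a small standalone replay of the arithmetic in #updateMaxMessageCacheSize() with made-up figures (illustrative only, not measurements): an eachBatch run with js.max.batch.size = 32 and partitionsConsumedConcurrently = 4, where the previous fetch delivered 6000 messages spread over 4 partitions and was drained in 0.75 seconds.

// Hypothetical figures; they only illustrate how the bounds combine.
const concurrency = 4;                     // partitionsConsumedConcurrently
const maxBatchesSize = 32 * concurrency;   // #maxBatchesSize set in #runInternal(): 128
const lastFetchedMessageCnt = 6000;        // messages in the previous fetch
const lastFetchedConcurrency = 4;          // distinct partitions seen in that fetch
const elapsedSeconds = 0.75;               // time since the previous fetch completed

const perWorkerRate = lastFetchedMessageCnt / lastFetchedConcurrency / elapsedSeconds; // 2000 msg/s
let size = Math.round(1.5 * perWorkerRate) * concurrency;  // 3000 * 4 = 12000
const minCacheSize = maxBatchesSize;       // eachBatch path: 128

if (size < minCacheSize) size = minCacheSize;
else if (size > minCacheSize * 10) size = minCacheSize * 10; // capped at 1280
if (size < 1024) size = 1024;              // floor does not apply here

console.log(size); // 1280 messages requested on the next fetch

Note that in the eachMessage path the minimum is just #concurrency, so for typical concurrency values the 10x cap lands below 1024 and the final floor ends up determining the cache size.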

test/promisified/consumer/consumeMessages.spec.js

Lines changed: 5 additions & 19 deletions
@@ -642,20 +642,14 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
 
     let errors = false;
     let receivedMessages = 0;
-    const batchLengths = [1, 1, 2,
-      /* cache reset */
-      1, 1];
     consumer.run({
       partitionsConsumedConcurrently,
       eachBatchAutoResolve: true,
       eachBatch: async (event) => {
         receivedMessages++;
 
         try {
-          expect(event.batch.messages.length)
-            .toEqual(batchLengths[receivedMessages - 1]);
-
-          if (receivedMessages === 3) {
+          if (event.batch.messages.length >= 32) {
             expect(event.isStale()).toEqual(false);
             await sleep(7500);
             /* 7.5s 'processing'
@@ -732,31 +726,23 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
 
     let errors = false;
     let receivedMessages = 0;
-    const batchLengths = [/* first we reach batches of 32 message and fetches of 64
-                           * max poll interval exceeded happens on second
-                           * 32 messages batch of the 64 msg fetch. */
-                          1, 1, 2, 2, 4, 4, 8, 8, 16, 16, 32, 32, 32, 32,
-                          /* max poll interval exceeded, 32 reprocessed +
-                           * 1 new message. */
-                          1, 1, 2, 2, 4, 4, 8, 8, 3];
+    let firstLongBatchProcessing;
     consumer.run({
       partitionsConsumedConcurrently,
       eachBatchAutoResolve: true,
       eachBatch: async (event) => {
         receivedMessages++;
 
         try {
-          expect(event.batch.messages.length)
-            .toEqual(batchLengths[receivedMessages - 1]);
-
-          if (receivedMessages === 13) {
+          if (!firstLongBatchProcessing && event.batch.messages.length >= 32) {
             expect(event.isStale()).toEqual(false);
             await sleep(6000);
             /* 6s 'processing'
              * cache clearance starts at 7000 */
             expect(event.isStale()).toEqual(false);
+            firstLongBatchProcessing = receivedMessages;
           }
-          if ( receivedMessages === 14) {
+          if (firstLongBatchProcessing && receivedMessages === firstLongBatchProcessing + 1) {
             expect(event.isStale()).toEqual(false);
             await sleep(10000);
             /* 10s 'processing'
