Commit 68cc1e4

Add latency percentiles calculation
1 parent 3816d53 commit 68cc1e4

5 files changed: +145 -12 lines changed

ci/tests/run_perf_test.js

Lines changed: 10 additions & 4 deletions
@@ -90,13 +90,19 @@ async function main() {
 
   if (concurrentRun) {
     console.log(`Running ${modeLabel} Producer/Consumer test (concurrently)...`);
-    const INITIAL_DELAY_MS = 2000;
+    const INITIAL_DELAY_MS = 10000;
     const TERMINATE_TIMEOUT_MS = process.env.TERMINATE_TIMEOUT_MS ? +process.env.TERMINATE_TIMEOUT_MS : 600000;
     // Wait INITIAL_DELAY_MS more to see if all lag is caught up; start earlier than the producer to check
     // E2E latencies more accurately.
-    const TERMINATE_TIMEOUT_MS_CONSUMERS = TERMINATE_TIMEOUT_MS + INITIAL_DELAY_MS * 2;
+    const TERMINATE_TIMEOUT_MS_CONSUMERS = TERMINATE_TIMEOUT_MS + INITIAL_DELAY_MS + 2000;
+    const TERMINATE_TIMEOUT_MS_LAG_MONITORING = TERMINATE_TIMEOUT_MS + 1000;
 
     await runCommand(`MODE=${mode} node performance-consolidated.js --create-topics`);
+
+    console.log(`Waiting 10s after topic creation before starting producer and consumers...`);
+    await new Promise(resolve => setTimeout(resolve, 10000));
+
+    console.log(`Starting producer and consumers...`);
     const allPromises = [];
     allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} INITIAL_DELAY_MS=${INITIAL_DELAY_MS} node performance-consolidated.js --producer`));
     if (consumerModeAll || consumerModeEachMessage) {
@@ -106,10 +112,10 @@ async function main() {
       allPromises.push(runCommand(`MODE=${mode} MESSAGE_COUNT=${messageCount} INITIAL_DELAY_MS=0 TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_BATCH=${groupIdEachBatch} node performance-consolidated.js --consumer-each-batch ${produceToSecondTopicParam}`));
     }
     if (consumerModeAll || consumerModeEachMessage) {
-      allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=0 TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MONITOR=${groupIdEachMessage} node performance-consolidated.js --monitor-lag`));
+      allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=${INITIAL_DELAY_MS} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_LAG_MONITORING} GROUPID_MONITOR=${groupIdEachMessage} node performance-consolidated.js --monitor-lag`));
     }
     if (consumerModeAll || consumerModeEachBatch) {
-      allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=0 TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_CONSUMERS} GROUPID_MONITOR=${groupIdEachBatch} node performance-consolidated.js --monitor-lag`));
+      allPromises.push(runCommand(`MODE=${mode} INITIAL_DELAY_MS=${INITIAL_DELAY_MS} TERMINATE_TIMEOUT_MS=${TERMINATE_TIMEOUT_MS_LAG_MONITORING} GROUPID_MONITOR=${groupIdEachBatch} node performance-consolidated.js --monitor-lag`));
     }
     const results = await Promise.allSettled(allPromises);
     return results.map(r => r.status === 'fulfilled' ? r.value : '').join('\n');
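Note on the timeout arithmetic above: consumers start with no delay so they are already subscribed when the producer begins, the producer and lag monitors wait INITIAL_DELAY_MS, and the consumers get the largest budget so they outlive everything else and can drain remaining lag. A minimal sketch of the resulting deadlines (not part of the commit), assuming the default TERMINATE_TIMEOUT_MS of 600000 and that each budget is measured from each role's own start:

// Illustrative sketch only -- values in ms, using the defaults from the diff.
const INITIAL_DELAY_MS = 10000;       // producer/lag-monitor start delay
const TERMINATE_TIMEOUT_MS = 600000;  // base run budget

// Consumers: start at t=0, budget outlasts the producer by 2s.
const TERMINATE_TIMEOUT_MS_CONSUMERS = TERMINATE_TIMEOUT_MS + INITIAL_DELAY_MS + 2000; // 612000
// Lag monitors: start with the producer, finish 1s after it.
const TERMINATE_TIMEOUT_MS_LAG_MONITORING = TERMINATE_TIMEOUT_MS + 1000;               // 601000

// Approximate timeline:
//   consumers:    [0 ................................ 612000]
//   producer:        [10000 ...... 10000 + 600000 = 610000]
//   lag monitor:     [10000 ...... 10000 + 601000 = 611000]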

examples/performance/performance-consolidated.js

Lines changed: 13 additions & 3 deletions
@@ -17,8 +17,8 @@ const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
 const securityProtocol = process.env.SECURITY_PROTOCOL;
 const saslUsername = process.env.SASL_USERNAME;
 const saslPassword = process.env.SASL_PASSWORD;
-const topic = process.env.KAFKA_TOPIC || 'test-topic';
-const topic2 = process.env.KAFKA_TOPIC2 || 'test-topic2';
+const topic = process.env.KAFKA_TOPIC || `test-topic-${mode}`;
+const topic2 = process.env.KAFKA_TOPIC2 || `test-topic2-${mode}`;
 const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 1000000;
 const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 4096;
 const batchSize = process.env.PRODUCER_BATCH_SIZE ? +process.env.PRODUCER_BATCH_SIZE : 100;
@@ -53,6 +53,13 @@ function logParameters(parameters) {
   }
 }
 
+function printPercentiles(percentiles, type) {
+  for (const { percentile, value, count, total } of percentiles) {
+    const percentileStr = `P${percentile}`.padStart(6, ' ');
+    console.log(`=== Consumer ${percentileStr} E2E latency ${type}: ${value.toFixed(2)} ms (${count}/${total})`);
+  }
+}
+
 (async function () {
   const producer = process.argv.includes('--producer');
   const consumer = process.argv.includes('--consumer');
@@ -169,10 +176,11 @@ function logParameters(parameters) {
     endTrackingMemory('consumer-each-message', `consumer-memory-message-${mode}.json`);
     console.log("=== Consumer Rate MB/s (eachMessage): ", consumerRate);
     console.log("=== Consumer Rate msg/s (eachMessage): ", stats.messageRate);
-    console.log("=== Consumer average E2E latency T0-T1 (eachMessage): ", stats.avgLatencyT0T1);
+    printPercentiles(stats.percentilesTOT1, 'T0-T1 (eachMessage)');
     console.log("=== Consumer max E2E latency T0-T1 (eachMessage): ", stats.maxLatencyT0T1);
     if (produceToSecondTopic) {
       console.log("=== Consumer average E2E latency T0-T2 (eachMessage): ", stats.avgLatencyT0T2);
+      printPercentiles(stats.percentilesTOT2, 'T0-T2 (eachMessage)');
       console.log("=== Consumer max E2E latency T0-T2 (eachMessage): ", stats.maxLatencyT0T2);
     }
     console.log("=== Consumption time (eachMessage): ", stats.durationSeconds);
@@ -197,9 +205,11 @@ function logParameters(parameters) {
     console.log("=== Max eachBatch lag: ", stats.maxOffsetLag);
     console.log("=== Average eachBatch size: ", stats.averageBatchSize);
     console.log("=== Consumer average E2E latency T0-T1 (eachBatch): ", stats.avgLatencyT0T1);
+    printPercentiles(stats.percentilesTOT1, 'T0-T1 (eachBatch)');
     console.log("=== Consumer max E2E latency T0-T1 (eachBatch): ", stats.maxLatencyT0T1);
     if (produceToSecondTopic) {
       console.log("=== Consumer average E2E latency T0-T2 (eachBatch): ", stats.avgLatencyT0T2);
+      printPercentiles(stats.percentilesTOT2, 'T0-T2 (eachBatch)');
       console.log("=== Consumer max E2E latency T0-T2 (eachBatch): ", stats.maxLatencyT0T2);
     }
     console.log("=== Consumption time (eachBatch): ", stats.durationSeconds);

examples/performance/performance-primitives-common.js

Lines changed: 104 additions & 1 deletion
@@ -1,5 +1,6 @@
 const { hrtime } = require('process');
 const { randomBytes } = require('crypto');
+const PERCENTILES = [50, 75, 90, 95, 99, 99.9, 99.99, 100];
 
 const TERMINATE_TIMEOUT_MS = process.env.TERMINATE_TIMEOUT_MS ? +process.env.TERMINATE_TIMEOUT_MS : 600000;
 const AUTO_COMMIT = process.env.AUTO_COMMIT || 'false';
@@ -58,8 +59,86 @@ function genericProduceToTopic(producer, topic, messages) {
   });
 }
 
+
+// We use a simple count-sketch for latency percentiles to avoid storing all latencies in memory,
+// because we're also measuring the memory usage of the consumer as part of the performance tests.
+class LatencyCountSketch {
+  #numBuckets;
+  #minValue;
+  #maxValue;
+  #buckets;
+  #counts;
+  #changeBaseLogarithm;
+  #totalCount = 0;
+  #base;
+
+  constructor({
+    error = 0.01, // 1% error
+    minValue = 0.01, // min 10μs latency
+    maxValue = 60000, // max 60s latency
+  }) {
+    // Each bucket represents [x, x * (1 + error))
+    this.#base = 1 + error;
+    // Change base from natural log to log base this.#base
+    this.#changeBaseLogarithm = Math.log(this.#base);
+    this.#numBuckets = Math.ceil(Math.log(maxValue / minValue) / Math.log(this.#base));
+    this.#maxValue = maxValue;
+
+    this.#buckets = new Array(this.#numBuckets + 2).fill(0);
+    this.#buckets[this.#numBuckets + 1] = Number.POSITIVE_INFINITY;
+    this.#buckets[this.#numBuckets] = this.#maxValue;
+    this.#buckets[0] = 0;
+    let i = this.#numBuckets - 1;
+    let currentValue = maxValue;
+    while (i >= 1) {
+      let nextMinimum = currentValue / this.#base;
+      this.#buckets[i] = nextMinimum;
+      currentValue = nextMinimum;
+      i--;
+    }
+    this.#minValue = this.#buckets[1];
+    this.#counts = new Array(this.#numBuckets + 2).fill(0);
+  }
+
+  add(latency) {
+    let idx = 0;
+    if (latency > 0)
+      idx = Math.ceil(Math.log(latency / this.#minValue) / this.#changeBaseLogarithm);
+    idx = (idx < 0) ? 0 :
+      (idx > this.#buckets.length - 2) ? (this.#buckets.length - 2) :
+      idx;
+
+    this.#counts[idx]++;
+    this.#totalCount++;
+  }
+
+  percentiles(percentilesArray) {
+    const percentileCounts = percentilesArray.map(p => Math.ceil(this.#totalCount * p / 100));
+    const percentileResults = new Array(percentilesArray.length);
+    let totalCountSoFar = 0;
+    let j = 0;
+    let sum = 0;
+    for (let i = 0; i < this.#counts.length; i++) {
+      sum += this.#counts[i];
+    }
+    for (let i = 0; i < percentileCounts.length; i++) {
+      while ((totalCountSoFar < percentileCounts[i]) && (j < this.#counts.length - 1)) {
+        totalCountSoFar += this.#counts[j];
+        j++;
+      }
+      const bucketIndex = (j < this.#counts.length - 1) ? j : this.#counts.length - 2;
+      percentileResults[i] = [this.#buckets[bucketIndex], totalCountSoFar, this.#totalCount];
+    }
+    return percentileResults;
+  }
+}
+
 async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages) {
   const handlers = installHandlers(totalMessageCnt === -1);
+  if (stats) {
+    stats.percentilesTOT1 = new LatencyCountSketch({});
+    stats.percentilesTOT2 = new LatencyCountSketch({});
+  }
   while (true) {
     try {
       await consumer.connect();
@@ -96,7 +175,17 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages) {
       return;
 
     const sentAt = Number(decoder.decode(message.value.slice(0, 13)));
-    const latency = receivedAt - sentAt;
+    let latency = receivedAt - sentAt;
+
+    if (isNaN(latency)) {
+      console.log(`WARN: NaN latency, received message timestamp: ${message.value.slice(0, 13)}`);
+      return;
+    } else if (latency < 0) {
+      console.log(`WARN: negative latency ${latency}, sentAt ${sentAt}, receivedAt ${receivedAt}`);
+      latency = 0;
+    } else if (latency > 60000) {
+      console.log(`WARN: received large latency ${latency}, sentAt ${sentAt}, receivedAt ${receivedAt}`);
+    }
 
     if (!isT0T2) {
       if (!stats.maxLatencyT0T1) {
@@ -106,6 +195,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages) {
         stats.maxLatencyT0T1 = Math.max(stats.maxLatencyT0T1, latency);
         stats.avgLatencyT0T1 = ((stats.avgLatencyT0T1 * (numMessages - 1)) + latency) / numMessages;
       }
+      stats.percentilesTOT1.add(latency);
     } else {
       if (!stats.maxLatencyT0T2) {
        stats.maxLatencyT0T2 = latency;
@@ -114,6 +204,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages) {
         stats.maxLatencyT0T2 = Math.max(stats.maxLatencyT0T2, latency);
         stats.avgLatencyT0T2 = ((stats.avgLatencyT0T2 * (numMessages - 1)) + latency) / numMessages;
       }
+      stats.percentilesTOT2.add(latency);
     }
   };
 
@@ -257,6 +348,18 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eachBatch, partitionsConsumedConcurrently, stats, actionOnMessages) {
     stats.messageRate = durationSeconds > 0 ?
       (messagesMeasured / durationSeconds) : Infinity;
     stats.durationSeconds = durationSeconds;
+    stats.percentilesTOT1 = stats.percentilesTOT1.percentiles(PERCENTILES).map((value, index) => ({
+      percentile: PERCENTILES[index],
+      value: value[0],
+      count: value[1],
+      total: value[2],
+    }));
+    stats.percentilesTOT2 = stats.percentilesTOT2.percentiles(PERCENTILES).map((value, index) => ({
+      percentile: PERCENTILES[index],
+      value: value[0],
+      count: value[1],
+      total: value[2],
+    }));
   }
   removeHandlers(handlers);
   return rate;
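LatencyCountSketch trades a bounded relative error for constant memory: bucket boundaries grow geometrically by a factor of (1 + error), so a reported percentile is off by at most about one bucket width (~1% with the defaults), and the structure holds ceil(log(maxValue / minValue) / log(1 + error)) counters -- roughly 1569 for the default 0.01 ms to 60000 ms range -- no matter how many latencies are recorded. A usage sketch, assuming the class were exported (the commit keeps it module-private to this file):

// Usage sketch, not from the commit. Assumes LatencyCountSketch is in scope.
const sketch = new LatencyCountSketch({}); // defaults: 1% error, 0.01 ms .. 60000 ms
for (let i = 0; i < 100000; i++) {
  sketch.add(5 + Math.random() * 10); // simulated E2E latencies in ms
}
// percentiles() returns [bucketValue, cumulativeCount, totalCount] triples.
for (const [value, count, total] of sketch.percentiles([50, 99, 100])) {
  console.log(`~${value.toFixed(2)} ms covers ${count}/${total} samples`);
}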

examples/performance/performance-primitives-kafkajs.js

Lines changed: 6 additions & 0 deletions
@@ -20,6 +20,8 @@ module.exports = {
   runProducerConsumerTogether,
 };
 
+const IS_HIGHER_LATENCY_BROKER = process.env.IS_HIGHER_LATENCY_BROKER === 'true';
+
 function baseConfiguration(parameters) {
   let ret = {
     clientId: 'kafka-test-performance',
@@ -147,13 +149,17 @@ class CompatibleConsumer {
 
 function newCompatibleConsumer(parameters, eachBatch) {
   const kafka = new Kafka(baseConfiguration(parameters));
+  const higherLatencyBrokerOpts = IS_HIGHER_LATENCY_BROKER ? {
+    maxBytesPerPartition: 8388608
+  } : {};
 
   let groupId = eachBatch ? process.env.GROUPID_BATCH : process.env.GROUPID_MESSAGE;
   if (!groupId) {
     groupId = 'test-group' + Math.random();
   }
   console.log(`New KafkaJS group id: ${groupId}`);
   const consumer = kafka.consumer({
+    ...higherLatencyBrokerOpts,
     groupId,
   });
   return new CompatibleConsumer(consumer);

examples/performance/performance-primitives.js

Lines changed: 12 additions & 4 deletions
@@ -21,6 +21,7 @@ module.exports = {
 
 
 const CONSUMER_MAX_BATCH_SIZE = process.env.CONSUMER_MAX_BATCH_SIZE ? +process.env.CONSUMER_MAX_BATCH_SIZE : null;
+const IS_HIGHER_LATENCY_BROKER = process.env.IS_HIGHER_LATENCY_BROKER === 'true';
 
 function baseConfiguration(parameters) {
   let ret = {
@@ -100,9 +101,13 @@ class CompatibleProducer {
   }
 }
 function newCompatibleProducer(parameters, compression) {
+  const higherLatencyBrokerOpts = IS_HIGHER_LATENCY_BROKER ? {
+    'linger.ms': '100'
+  } : {};
   return new CompatibleProducer(
     new Kafka({
       ...baseConfiguration(parameters),
+      ...higherLatencyBrokerOpts,
       'compression.codec': CompressionTypes[compression],
     }).producer());
 }
@@ -149,10 +154,12 @@ function newCompatibleConsumer(parameters, eachBatch) {
   const autoCommitOpts = autoCommit > 0 ?
     { 'enable.auto.commit': true, 'auto.commit.interval.ms': autoCommit } :
     { 'enable.auto.commit': false };
-  const jsOpts = {};
-  if (eachBatch && CONSUMER_MAX_BATCH_SIZE !== null) {
-    jsOpts['js.consumer.max.batch.size'] = CONSUMER_MAX_BATCH_SIZE;
-  }
+  const jsOpts = (eachBatch && CONSUMER_MAX_BATCH_SIZE !== null) ? {
+    'js.consumer.max.batch.size': CONSUMER_MAX_BATCH_SIZE
+  } : {};
+  const higherLatencyBrokerOpts = IS_HIGHER_LATENCY_BROKER ? {
+    'max.partition.fetch.bytes': '8388608'
+  } : {};
 
   let groupId = eachBatch ? process.env.GROUPID_BATCH : process.env.GROUPID_MESSAGE;
   if (!groupId) {
@@ -165,6 +172,7 @@ function newCompatibleConsumer(parameters, eachBatch) {
     'fetch.queue.backoff.ms': '100',
     ...autoCommitOpts,
     ...jsOpts,
+    ...higherLatencyBrokerOpts,
   });
   return new CompatibleConsumer(consumer);
 }
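Both this file and the KafkaJS variant gate their overrides behind the same conditional-spread pattern: build an options object that is empty unless IS_HIGHER_LATENCY_BROKER is set, then spread it after the base options. 8388608 bytes is 8 MiB per partition per fetch, which amortizes the longer round trip to a higher-latency broker, much as the producer's linger.ms of 100 lets batches fill before sending. A minimal sketch of the pattern (property names copied from the diff; the surrounding code is illustrative only):

// Illustrative only. With the flag unset, the spread contributes nothing and
// the base configuration is used unchanged.
const IS_HIGHER_LATENCY_BROKER = process.env.IS_HIGHER_LATENCY_BROKER === 'true';
const higherLatencyBrokerOpts = IS_HIGHER_LATENCY_BROKER ? {
  'max.partition.fetch.bytes': '8388608', // 8 MiB: fetch more per round trip
} : {};
const consumerConfig = {
  'fetch.queue.backoff.ms': '100',
  ...higherLatencyBrokerOpts, // {} when the flag is off
};
console.log(consumerConfig);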
