
Commit 99bbb70

Performance test improvements: measure eachBatch rate, time and lag, plus average and max memory usage

1 parent 890f9ec · commit 99bbb70

8 files changed: +395 −160 lines
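The eachBatch offset-lag statistics printed by the changes below (stats.averageOffsetLag, stats.maxOffsetLag) are computed inside runConsumer, which is not part of this diff. Purely to illustrate what "eachBatch lag" refers to, a per-batch offset lag can be derived from a KafkaJS-style eachBatch payload roughly as in the hypothetical sketch below; this is not the runConsumer implementation in this repository:

// Hypothetical sketch only -- not the runConsumer implementation in this repository.
// In a KafkaJS-style eachBatch payload, highWatermark is the offset one past the
// last message available in the partition, so the remaining lag after a batch is
// its distance from the last offset the handler processed.
function batchOffsetLag(batch) {
  const lastProcessed = Number(batch.messages[batch.messages.length - 1].offset);
  return Number(batch.highWatermark) - 1 - lastProcessed;
}
// stats.averageOffsetLag and stats.maxOffsetLag would then be the average and
// maximum of values like this across all batches in the run.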

.gitignore

Lines changed: 2 additions & 0 deletions

@@ -21,3 +21,5 @@ coverage
 .nyc_output/
 *lcov.info
 **/lcov-report
+examples/performance/*.json
+*.log

.semaphore/semaphore.yml

Lines changed: 1 addition & 1 deletion

@@ -191,7 +191,7 @@ blocks:
             - export NODE_OPTIONS='--max-old-space-size=1536'
             - cd examples/performance
             - npm install
-            - bash -c '../../ci/tests/run_perf_test.sh'
+            - node '../../ci/tests/run_perf_test.js'
             - rm -rf ./node_modules

   - name: "Linux amd64: Release"

ci/tests/run_perf_test.js

Lines changed: 147 additions & 0 deletions (new file)

@@ -0,0 +1,147 @@
#!/usr/bin/env node

const { execSync } = require('child_process');

function runCommand(command) {
  try {
    const output = execSync(command, { encoding: 'utf8', stdio: 'pipe' });
    console.log(output);
    return output;
  } catch (error) {
    const errorOutput = error.stdout || error.stderr || error.message;
    console.log(errorOutput);
    return errorOutput;
  }
}

function extractValue(content, pattern) {
  try {
    const lines = content.split('\n');
    const matchingLine = lines.find(line => line.includes(pattern));
    if (matchingLine) {
      const value = matchingLine.split(':')[1]?.trim();
      return Number(value || '');
    }
    return NaN;
  } catch (error) {
    return NaN;
  }
}

function belowThreshold(value, target, threshold = 0.7) {
  if (isNaN(value) || isNaN(target))
    throw new Error(`Invalid number comparison: value=${value}, target=${target}`);
  return value < (target * threshold);
}

function belowTarget(value, target) {
  return belowThreshold(value, target, 1);
}

// Run performance tests and store outputs in memory
console.log('Running Confluent Producer/Consumer test...');
const outputConfluentProducerConsumer = runCommand('MODE=confluent MESSAGE_COUNT=50000 node performance-consolidated.js --create-topics --consumer --producer');

console.log('Running KafkaJS Producer/Consumer test...');
const outputKjsProducerConsumer = runCommand('MODE=kafkajs MESSAGE_COUNT=50000 node performance-consolidated.js --create-topics --consumer --producer');

console.log('Running Confluent CTP test...');
const outputConfluentCtp = runCommand('MODE=confluent MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp');

console.log('Running KafkaJS CTP test...');
const outputKjsCtp = runCommand('MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp');

// Extract Confluent results
const producerConfluent = extractValue(outputConfluentProducerConsumer, '=== Producer Rate:');
const consumerConfluentMessage = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate (eachMessage):');
const consumerConfluentTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachMessage):');
const consumerConfluentBatch = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate (eachBatch):');
const consumerConfluentBatchTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachBatch):');
const consumerConfluentBatchAverageLag = extractValue(outputConfluentProducerConsumer, '=== Average eachBatch lag:');
const consumerConfluentBatchMaxLag = extractValue(outputConfluentProducerConsumer, '=== Max eachBatch lag:');
const consumerConfluentAverageRSS = extractValue(outputConfluentProducerConsumer, '=== Max Average RSS across tests:');
const consumerConfluentMaxRSS = extractValue(outputConfluentProducerConsumer, '=== Max RSS across tests:');
const ctpConfluent = extractValue(outputConfluentCtp, '=== Consume-Transform-Produce Rate:');

// Extract KafkaJS results
const producerKjs = extractValue(outputKjsProducerConsumer, '=== Producer Rate:');
const consumerKjsMessage = extractValue(outputKjsProducerConsumer, '=== Consumer Rate (eachMessage):');
const consumerKjsTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachMessage):');
const consumerKjsBatch = extractValue(outputKjsProducerConsumer, '=== Consumer Rate (eachBatch):');
const consumerKjsBatchTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachBatch):');
const consumerKjsBatchAverageLag = extractValue(outputKjsProducerConsumer, '=== Average eachBatch lag:');
const consumerKjsBatchMaxLag = extractValue(outputKjsProducerConsumer, '=== Max eachBatch lag:');
const consumerKjsAverageRSS = extractValue(outputKjsProducerConsumer, '=== Max Average RSS across tests:');
const consumerKjsMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max RSS across tests:');
const ctpKjs = extractValue(outputKjsCtp, '=== Consume-Transform-Produce Rate:');

// Print results
console.log(`Producer rates: confluent ${producerConfluent}, kafkajs ${producerKjs}`);
console.log(`Consumer rates (eachMessage): confluent ${consumerConfluentMessage}, kafkajs ${consumerKjsMessage}`);
console.log(`Consumption time (eachMessage): confluent ${consumerConfluentTime}, kafkajs ${consumerKjsTime}`);
console.log(`Consumer rates (eachBatch): confluent ${consumerConfluentBatch}, kafkajs ${consumerKjsBatch}`);
console.log(`Consumption time (eachBatch): confluent ${consumerConfluentBatchTime}, kafkajs ${consumerKjsBatchTime}`);
console.log(`Average eachBatch lag: confluent ${consumerConfluentBatchAverageLag}, kafkajs ${consumerKjsBatchAverageLag}`);
console.log(`Max eachBatch lag: confluent ${consumerConfluentBatchMaxLag}, kafkajs ${consumerKjsBatchMaxLag}`);
console.log(`Average RSS: confluent ${consumerConfluentAverageRSS}, kafkajs ${consumerKjsAverageRSS}`);
console.log(`Max RSS: confluent ${consumerConfluentMaxRSS}, kafkajs ${consumerKjsMaxRSS}`);
console.log(`CTP rates: confluent ${ctpConfluent}, kafkajs ${ctpKjs}`);

let errcode = 0;
const maxPerformanceDifference = 0.7;

// Compare against KJS (30% threshold)
if (belowThreshold(producerConfluent, producerKjs, maxPerformanceDifference)) {
  console.log(`Producer rates differ by more than 30%: confluent ${producerConfluent}, kafkajs ${producerKjs}`);
  errcode = 1;
}

if (belowThreshold(consumerConfluentMessage, consumerKjsMessage, maxPerformanceDifference)) {
  console.log(`Consumer rates (eachMessage) differ by more than 30%: confluent ${consumerConfluentMessage}, kafkajs ${consumerKjsMessage}`);
  // FIXME: improve consumer performance at least to KafkaJS level
  errcode = 0;
}

// Lower is better for time
if (belowThreshold(consumerKjsTime, consumerConfluentTime, maxPerformanceDifference)) {
  console.log(`Consumption time (eachMessage) differ by more than 30%: confluent ${consumerConfluentTime}, kafkajs ${consumerKjsTime}`);
  errcode = 0;
}

if (belowThreshold(consumerConfluentBatch, consumerKjsBatch, maxPerformanceDifference)) {
  console.log(`Consumer rates (eachBatch) differ by more than 30%: confluent ${consumerConfluentBatch}, kafkajs ${consumerKjsBatch}`);
  errcode = 0;
}

// Lower is better for time
if (belowThreshold(consumerKjsBatchTime, consumerConfluentBatchTime, maxPerformanceDifference)) {
  console.log(`Consumption time (eachBatch) differ by more than 30%: confluent ${consumerConfluentBatchTime}, kafkajs ${consumerKjsBatchTime}`);
  errcode = 0;
}

if (belowThreshold(ctpConfluent, ctpKjs, maxPerformanceDifference)) {
  console.log(`CTP rates differ by more than 30%: confluent ${ctpConfluent}, kafkajs ${ctpKjs}`);
  errcode = 1;
}

// Compare against target numbers
const TARGET_PRODUCE = process.env.TARGET_PRODUCE_PERFORMANCE || '35';
const TARGET_CONSUME = process.env.TARGET_CONSUME_PERFORMANCE || '18';
const TARGET_CTP = process.env.TARGET_CTP_PERFORMANCE || '0.02';

if (belowTarget(producerConfluent, TARGET_PRODUCE)) {
  console.log(`Confluent producer rate is below target: ${producerConfluent}`);
  errcode = 1;
}

if (belowTarget(consumerConfluentMessage, TARGET_CONSUME)) {
  console.log(`Confluent consumer rate is below target: ${consumerConfluentMessage}`);
  errcode = 1;
}

if (belowTarget(ctpConfluent, TARGET_CTP)) {
  console.log(`Confluent CTP rate is below target: ${ctpConfluent}`);
  errcode = 1;
}

process.exit(errcode);
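To make the gating logic above concrete, here is a small usage sketch of the helpers defined in run_perf_test.js; the sample line and the numbers are hypothetical, chosen only to illustrate the 30% tolerance and the absolute-target check:

// Hypothetical sample resembling performance-consolidated.js output.
const sample = 'other output\n=== Producer Rate:  28.4\nmore output';

// extractValue finds the first line containing the pattern, splits it on ':'
// and converts the second field to a Number.
const rate = extractValue(sample, '=== Producer Rate:');   // 28.4

// belowThreshold flags a regression only below 70% of the reference value:
belowThreshold(rate, 35);   // 28.4 < 35 * 0.7 = 24.5  -> false (within tolerance)
belowThreshold(rate, 45);   // 28.4 < 45 * 0.7 = 31.5  -> true  (treated as a regression)

// belowTarget uses a threshold of 1, i.e. a plain comparison against the target:
belowTarget(rate, '35');    // 28.4 < 35               -> true  (below the absolute target)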

ci/tests/run_perf_test.sh

Lines changed: 0 additions & 64 deletions
This file was deleted.

examples/performance/performance-consolidated.js

Lines changed: 71 additions & 3 deletions

@@ -1,3 +1,4 @@
+const fs = require('fs');
 const mode = process.env.MODE ? process.env.MODE : 'confluent';

 let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether;

@@ -27,6 +28,44 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e
 const produceConsumeLatency = process.argv.includes('--latency');
 const all = process.argv.includes('--all');
 const createTopics = process.argv.includes('--create-topics');
+let maxAverageRSSKB, maxMaxRSSKB;
+const stats = {};
+
+let measures = [];
+let interval;
+const startTrackingMemory = () => {
+  interval = setInterval(() => {
+    const rss = BigInt(process.memoryUsage().rss);
+    measures.push({ rss, timestamp: Date.now() });
+  }, 100);
+};
+
+const datapointToJSON = (m) =>
+  ({ rss: m.rss.toString(), timestamp: m.timestamp.toString() });
+
+const endTrackingMemory = (fileName) => {
+  clearInterval(interval);
+  interval = null;
+  const averageRSS = measures.reduce((sum, m) => sum + m.rss, 0n) / BigInt(measures.length);
+  const averageRSSKB = averageRSS / 1024n;
+  maxAverageRSSKB = !maxAverageRSSKB || averageRSSKB > maxAverageRSSKB ? averageRSSKB : maxAverageRSSKB;
+  console.log(`=== Average RSS: ${averageRSSKB} KB`);
+  const max = measures.reduce((prev, current) => (prev.rss > current.rss) ? prev : current);
+  const maxRSSKB = max.rss / 1024n;
+  maxMaxRSSKB = !maxMaxRSSKB || maxRSSKB > maxMaxRSSKB ? maxRSSKB : maxMaxRSSKB;
+  console.log(`=== Max RSS: ${maxRSSKB} KB at ${new Date(max.timestamp).toISOString()}`);
+  if (fileName) {
+    const measuresJSON = JSON.stringify({
+      measures: measures.map(datapointToJSON),
+      averageRSS: averageRSS.toString(),
+      maxRSS: datapointToJSON(max)
+    }, null, 2);
+    fs.writeFileSync(fileName, measuresJSON);
+  }
+  measures = [];
+}
+
+console.log(`=== Starting Performance Tests - Mode ${mode} ===`);

 if (createTopics || all) {
   console.log("=== Creating Topics (deleting if they exist already):");

@@ -45,18 +84,38 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e
   console.log(` Batch Size: ${batchSize}`);
   console.log(` Compression: ${compression}`);
   console.log(` Warmup Messages: ${warmupMessages}`);
+  startTrackingMemory();
   const producerRate = await runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
+  endTrackingMemory(`producer-memory-${mode}.json`);
   console.log("=== Producer Rate: ", producerRate);
 }

 if (consumer || all) {
   // If user runs this without --producer then they are responsible for seeding the topic.
-  console.log("=== Running Basic Consumer Performance Test:")
+  console.log("=== Running Basic Consumer Performance Test (eachMessage):")
   console.log(` Brokers: ${brokers}`);
   console.log(` Topic: ${topic}`);
   console.log(` Message Count: ${messageCount}`);
-  const consumerRate = await runConsumer(brokers, topic, messageCount);
-  console.log("=== Consumer Rate: ", consumerRate);
+  startTrackingMemory();
+  const consumerRate = await runConsumer(brokers, topic, warmupMessages, messageCount, false, stats);
+  endTrackingMemory(`consumer-memory-message-${mode}.json`);
+  console.log("=== Consumer Rate (eachMessage): ", consumerRate);
+  console.log("=== Consumption time (eachMessage): ", stats.durationSeconds);
+}
+
+if (consumer || all) {
+  // If user runs this without --producer then they are responsible for seeding the topic.
+  console.log("=== Running Basic Consumer Performance Test (eachBatch):")
+  console.log(` Brokers: ${brokers}`);
+  console.log(` Topic: ${topic}`);
+  console.log(` Message Count: ${messageCount}`);
+  startTrackingMemory();
+  const consumerRate = await runConsumer(brokers, topic, warmupMessages, messageCount, true, stats);
+  endTrackingMemory(`consumer-memory-batch-${mode}.json`);
+  console.log("=== Consumer Rate (eachBatch): ", consumerRate);
+  console.log("=== Average eachBatch lag: ", stats.averageOffsetLag);
+  console.log("=== Max eachBatch lag: ", stats.maxOffsetLag);
+  console.log("=== Consumption time (eachBatch): ", stats.durationSeconds);
 }

 if (ctp || all) {

@@ -67,7 +126,9 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e
   console.log(` Message Count: ${messageCount}`);
   // Seed the topic with messages
   await runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
+  startTrackingMemory();
   const ctpRate = await runConsumeTransformProduce(brokers, topic, topic2, warmupMessages, messageCount, messageProcessTimeMs, ctpConcurrency);
+  endTrackingMemory(`consume-transform-produce-${mode}.json`);
   console.log("=== Consume-Transform-Produce Rate: ", ctpRate);
 }

@@ -78,7 +139,9 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e
   console.log(` Message Count: ${messageCount}`);
   console.log(` Consumer Processing Time: ${consumerProcessingTime}`);
   console.log(` Producer Processing Time: ${producerProcessingTime}`);
+  startTrackingMemory();
   const { mean, p50, p90, p95, outliers } = await runProducerConsumerTogether(brokers, topic, messageCount, messageSize, producerProcessingTime, consumerProcessingTime);
+  endTrackingMemory(`producer-consumer-together-${mode}.json`);
   console.log(`=== Produce-To-Consume Latency (ms): Mean: ${mean}, P50: ${p50}, P90: ${p90}, P95: ${p95}`);

   // The presence of outliers invalidates the mean measurement, and rasies concerns as to why there are any.

@@ -87,4 +150,9 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e
     console.log("=== Outliers (ms): ", outliers);
   }
 }
+
+if (maxAverageRSSKB !== undefined && maxMaxRSSKB !== undefined) {
+  console.log(`=== Max Average RSS across tests: ${maxAverageRSSKB}`);
+  console.log(`=== Max RSS across tests: ${maxMaxRSSKB}`);
+}
 })();
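For reference, the memory reports written by endTrackingMemory (for example producer-memory-confluent.json, which the new .gitignore entry keeps out of the repository) have roughly the shape below, shown compactly; the values are hypothetical, and RSS figures are serialized as strings because they are tracked as BigInt:

{
  "measures": [
    { "rss": "104857600", "timestamp": "1717000000000" },
    { "rss": "115343360", "timestamp": "1717000000100" }
  ],
  "averageRSS": "110100480",
  "maxRSS": { "rss": "115343360", "timestamp": "1717000000100" }
}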
