Skip to content

Commit 15928f1

Browse files
committed
Use same producer and different E2E latencies
1 parent 476a38e commit 15928f1

File tree

5 files changed

+112
-49
lines changed

5 files changed

+112
-49
lines changed

ci/tests/run_perf_test.js

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,10 @@ async function main() {
126126
let ctpConfluent, ctpKjs;
127127
let consumerConfluentMessage;
128128
let consumerConfluentMessageRate;
129-
let consumerConfluentMessageAvgLatency;
130-
let consumerConfluentMessageMaxLatency;
129+
let consumerConfluentMessageAvgLatencyT0T1;
130+
let consumerConfluentMessageMaxLatencyT0T1;
131+
let consumerConfluentMessageAvgLatencyT0T2;
132+
let consumerConfluentMessageMaxLatencyT0T2;
131133
let consumerConfluentTime;
132134
let consumerConfluentMessageAverageRSS;
133135
let consumerConfluentMessageMaxRSS;
@@ -137,8 +139,10 @@ async function main() {
137139

138140
let consumerConfluentBatch;
139141
let consumerConfluentBatchRate;
140-
let consumerConfluentBatchAvgLatency;
141-
let consumerConfluentBatchMaxLatency;
142+
let consumerConfluentBatchAvgLatencyT0T1;
143+
let consumerConfluentBatchMaxLatencyT0T1;
144+
let consumerConfluentBatchAvgLatencyT0T2;
145+
let consumerConfluentBatchMaxLatencyT0T2;
142146
let consumerConfluentBatchTime;
143147
let consumerConfluentBatchAverageLag;
144148
let consumerConfluentBatchMaxLag;
@@ -155,8 +159,10 @@ async function main() {
155159
if (consumerModeAll || consumerModeEachMessage) {
156160
consumerConfluentMessage = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate MB/s (eachMessage):');
157161
consumerConfluentMessageRate = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate msg/s (eachMessage):');
158-
consumerConfluentMessageAvgLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency (eachMessage):');
159-
consumerConfluentMessageMaxLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency (eachMessage):');
162+
consumerConfluentMessageAvgLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachMessage):');
163+
consumerConfluentMessageMaxLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachMessage):');
164+
consumerConfluentMessageAvgLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachMessage):');
165+
consumerConfluentMessageMaxLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachMessage):');
160166
consumerConfluentTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachMessage):');
161167
consumerConfluentMessageAverageRSS = extractValue(outputConfluentProducerConsumer, '=== Average consumer-each-message RSS KB:');
162168
consumerConfluentMessageMaxRSS = extractValue(outputConfluentProducerConsumer, '=== Max consumer-each-message RSS KB:');
@@ -167,8 +173,10 @@ async function main() {
167173
if (consumerModeAll || consumerModeEachBatch) {
168174
consumerConfluentBatch = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate MB/s (eachBatch):');
169175
consumerConfluentBatchRate = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate msg/s (eachBatch):');
170-
consumerConfluentBatchAvgLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency (eachBatch):');
171-
consumerConfluentBatchMaxLatency = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency (eachBatch):');
176+
consumerConfluentBatchAvgLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachBatch):');
177+
consumerConfluentBatchMaxLatencyT0T1 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachBatch):');
178+
consumerConfluentBatchAvgLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachBatch):');
179+
consumerConfluentBatchMaxLatencyT0T2 = extractValue(outputConfluentProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachBatch):');
172180
consumerConfluentBatchTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachBatch):');
173181
consumerConfluentBatchAverageLag = extractValue(outputConfluentProducerConsumer, '=== Average eachBatch lag:');
174182
consumerConfluentBatchMaxLag = extractValue(outputConfluentProducerConsumer, '=== Max eachBatch lag:');
@@ -188,8 +196,8 @@ async function main() {
188196
// Extract KafkaJS results
189197
let consumerKjsMessage;
190198
let consumerKjsMessageRate;
191-
let consumerKjsMessageAvgLatency;
192-
let consumerKjsMessageMaxLatency;
199+
let consumerKjsMessageAvgLatencyT0T1;
200+
let consumerKjsMessageMaxLatencyT0T1;
193201
let consumerKjsTime;
194202
let consumerKjsMessageAverageRSS;
195203
let consumerKjsMessageMaxRSS;
@@ -199,8 +207,8 @@ async function main() {
199207

200208
let consumerKjsBatch;
201209
let consumerKjsBatchRate;
202-
let consumerKjsBatchAvgLatency;
203-
let consumerKjsBatchMaxLatency;
210+
let consumerKjsBatchAvgLatencyT0T1;
211+
let consumerKjsBatchMaxLatencyT0T1;
204212
let consumerKjsBatchTime;
205213
let consumerKjsBatchAverageLag;
206214
let consumerKjsBatchMaxLag;
@@ -217,8 +225,10 @@ async function main() {
217225
if (consumerModeAll || consumerModeEachMessage) {
218226
consumerKjsMessage = extractValue(outputKjsProducerConsumer, '=== Consumer Rate MB/s (eachMessage):');
219227
consumerKjsMessageRate = extractValue(outputKjsProducerConsumer, '=== Consumer Rate msg/s (eachMessage):');
220-
consumerKjsMessageAvgLatency = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency (eachMessage):');
221-
consumerKjsMessageMaxLatency = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency (eachMessage):');
228+
consumerKjsMessageAvgLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachMessage):');
229+
consumerKjsMessageMaxLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachMessage):');
230+
consumerKjsMessageAvgLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachMessage):');
231+
consumerKjsMessageMaxLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachMessage):');
222232
consumerKjsMessageAverageRSS = extractValue(outputKjsProducerConsumer, '=== Average consumer-each-message RSS KB:');
223233
consumerKjsMessageMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max consumer-each-message RSS KB:');
224234
consumerKjsMessageAverageBrokerLag = extractValue(outputKjsProducerConsumer, `=== Average broker lag (${groupIdEachMessageKafkaJS}):`);
@@ -229,8 +239,10 @@ async function main() {
229239
consumerKjsTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachMessage):');
230240
consumerKjsBatch = extractValue(outputKjsProducerConsumer, '=== Consumer Rate MB/s (eachBatch):');
231241
consumerKjsBatchRate = extractValue(outputKjsProducerConsumer, '=== Consumer Rate msg/s (eachBatch):');
232-
consumerKjsBatchAvgLatency = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency (eachBatch):');
233-
consumerKjsBatchMaxLatency = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency (eachBatch):');
242+
consumerKjsBatchAvgLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T1 (eachBatch):');
243+
consumerKjsBatchMaxLatencyT0T1 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T1 (eachBatch):');
244+
consumerKjsBatchAvgLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer average E2E latency T0-T2 (eachBatch):');
245+
consumerKjsBatchMaxLatencyT0T2 = extractValue(outputKjsProducerConsumer, '=== Consumer max E2E latency T0-T2 (eachBatch):');
234246
consumerKjsBatchTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachBatch):');
235247
consumerKjsBatchAverageLag = extractValue(outputKjsProducerConsumer, '=== Average eachBatch lag:');
236248
consumerKjsBatchMaxLag = extractValue(outputKjsProducerConsumer, '=== Max eachBatch lag:');
@@ -254,8 +266,12 @@ async function main() {
254266
if (consumerModeAll || consumerModeEachMessage) {
255267
console.log(`Consumer rates MB/s (eachMessage): confluent ${consumerConfluentMessage}, kafkajs ${consumerKjsMessage}`);
256268
console.log(`Consumer rates msg/s (eachMessage): confluent ${consumerConfluentMessageRate}, kafkajs ${consumerKjsMessageRate}`);
257-
console.log(`Consumer average E2E latency (eachMessage): confluent ${consumerConfluentMessageAvgLatency}, kafkajs ${consumerKjsMessageAvgLatency}`);
258-
console.log(`Consumer max E2E latency (eachMessage): confluent ${consumerConfluentMessageMaxLatency}, kafkajs ${consumerKjsMessageMaxLatency}`);
269+
console.log(`Consumer average E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T1}, kafkajs ${consumerKjsMessageAvgLatencyT0T1}`);
270+
console.log(`Consumer max E2E latency T0-T1 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T1}, kafkajs ${consumerKjsMessageMaxLatencyT0T1}`);
271+
if (produceToSecondTopic) {
272+
console.log(`Consumer average E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageAvgLatencyT0T2}, kafkajs ${consumerKjsMessageAvgLatencyT0T2}`);
273+
console.log(`Consumer max E2E latency T0-T2 (eachMessage): confluent ${consumerConfluentMessageMaxLatencyT0T2}, kafkajs ${consumerKjsMessageMaxLatencyT0T2}`);
274+
}
259275
console.log(`Consumption time (eachMessage): confluent ${consumerConfluentTime}, kafkajs ${consumerKjsTime}`);
260276
console.log(`Average RSS (eachMessage): confluent ${consumerConfluentMessageAverageRSS}, kafkajs ${consumerKjsMessageAverageRSS}`);
261277
console.log(`Max RSS (eachMessage): confluent ${consumerConfluentMessageMaxRSS}, kafkajs ${consumerKjsMessageMaxRSS}`);
@@ -266,8 +282,12 @@ async function main() {
266282
if (consumerModeAll || consumerModeEachBatch) {
267283
console.log(`Consumer rates MB/s (eachBatch): confluent ${consumerConfluentBatch}, kafkajs ${consumerKjsBatch}`);
268284
console.log(`Consumer rates msg/s (eachBatch): confluent ${consumerConfluentBatchRate}, kafkajs ${consumerKjsBatchRate}`);
269-
console.log(`Consumer average E2E latency (eachBatch): confluent ${consumerConfluentBatchAvgLatency}, kafkajs ${consumerKjsBatchAvgLatency}`);
270-
console.log(`Consumer max E2E latency (eachBatch): confluent ${consumerConfluentBatchMaxLatency}, kafkajs ${consumerKjsBatchMaxLatency}`);
285+
console.log(`Consumer average E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T1}, kafkajs ${consumerKjsBatchAvgLatencyT0T1}`);
286+
console.log(`Consumer max E2E latency T0-T1 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T1}, kafkajs ${consumerKjsBatchMaxLatencyT0T1}`);
287+
if (produceToSecondTopic) {
288+
console.log(`Consumer average E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchAvgLatencyT0T2}, kafkajs ${consumerKjsBatchAvgLatencyT0T2}`);
289+
console.log(`Consumer max E2E latency T0-T2 (eachBatch): confluent ${consumerConfluentBatchMaxLatencyT0T2}, kafkajs ${consumerKjsBatchMaxLatencyT0T2}`);
290+
}
271291
console.log(`Consumption time (eachBatch): confluent ${consumerConfluentBatchTime}, kafkajs ${consumerKjsBatchTime}`);
272292
console.log(`Average eachBatch lag: confluent ${consumerConfluentBatchAverageLag}, kafkajs ${consumerKjsBatchAverageLag}`);
273293
console.log(`Max eachBatch lag: confluent ${consumerConfluentBatchMaxLag}, kafkajs ${consumerKjsBatchMaxLag}`);

examples/performance/performance-consolidated.js

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ if (mode === 'confluent') {
99
} else {
1010
({ runProducer, runConsumer, runConsumeTransformProduce, runProducerConsumerTogether } = require('./performance-primitives-kafkajs'));
1111
/* createTopics is more reliable in CKJS */
12-
({ runCreateTopics, runLagMonitoring } = require('./performance-primitives'));
12+
({ runCreateTopics, runLagMonitoring, runProducer: runProducerCKJS } = require('./performance-primitives'));
1313
}
1414

1515
const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
@@ -32,6 +32,7 @@ const ctpConcurrency = process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY ? +proc
3232
const consumerProcessingTime = process.env.CONSUMER_PROCESSING_TIME ? +process.env.CONSUMER_PROCESSING_TIME : 100;
3333
const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.env.PRODUCER_PROCESSING_TIME : 100;
3434
const limitRPS = process.env.LIMIT_RPS ? +process.env.LIMIT_RPS : null;
35+
const useCKJSProducerEverywhere = process.env.USE_CKJS_PRODUCER_EVERYWHERE === 'true';
3536
const parameters = {
3637
brokers,
3738
securityProtocol,
@@ -134,8 +135,13 @@ function logParameters(parameters) {
134135
console.log(` Compression: ${compression}`);
135136
console.log(` Limit RPS: ${limitRPS}`);
136137
console.log(` Warmup Messages: ${warmupMessages}`);
138+
console.log(` Use CKJS Producer Everywhere: ${useCKJSProducerEverywhere}`);
137139
startTrackingMemory();
138-
const producerRate = await runProducer(parameters, topic, batchSize,
140+
let runProducerFunction = runProducer;
141+
if (useCKJSProducerEverywhere) {
142+
runProducerFunction = runProducerCKJS;
143+
}
144+
const producerRate = await runProducerFunction(parameters, topic, batchSize,
139145
warmupMessages, messageCount, messageSize, compression,
140146
randomness, limitRPS);
141147
endTrackingMemory('producer', `producer-memory-${mode}.json`);
@@ -153,12 +159,16 @@ function logParameters(parameters) {
153159
const consumerRate = await runConsumer(parameters, topic,
154160
warmupMessages, messageCount,
155161
false, partitionsConsumedConcurrently, stats,
156-
produceToSecondTopic ? topic2 : null, compression);
162+
produceToSecondTopic ? topic2 : null, compression, useCKJSProducerEverywhere);
157163
endTrackingMemory('consumer-each-message', `consumer-memory-message-${mode}.json`);
158164
console.log("=== Consumer Rate MB/s (eachMessage): ", consumerRate);
159165
console.log("=== Consumer Rate msg/s (eachMessage): ", stats.messageRate);
160-
console.log("=== Consumer average E2E latency (eachMessage): ", stats.avgLatency);
161-
console.log("=== Consumer max E2E latency (eachMessage): ", stats.maxLatency);
166+
console.log("=== Consumer average E2E latency T0-T1 (eachMessage): ", stats.avgLatencyT0T1);
167+
console.log("=== Consumer max E2E latency T0-T1 (eachMessage): ", stats.maxLatencyT0T1);
168+
if (produceToSecondTopic) {
169+
console.log("=== Consumer average E2E latency T0-T2 (eachMessage): ", stats.avgLatencyT0T2);
170+
console.log("=== Consumer max E2E latency T0-T2 (eachMessage): ", stats.maxLatencyT0T2);
171+
}
162172
console.log("=== Consumption time (eachMessage): ", stats.durationSeconds);
163173
}
164174

@@ -173,15 +183,19 @@ function logParameters(parameters) {
173183
const consumerRate = await runConsumer(parameters, topic,
174184
warmupMessages, messageCount,
175185
true, partitionsConsumedConcurrently, stats,
176-
produceToSecondTopic ? topic2 : null, compression);
186+
produceToSecondTopic ? topic2 : null, compression, useCKJSProducerEverywhere);
177187
endTrackingMemory('consumer-each-batch', `consumer-memory-batch-${mode}.json`);
178188
console.log("=== Consumer Rate MB/s (eachBatch): ", consumerRate);
179189
console.log("=== Consumer Rate msg/s (eachBatch): ", stats.messageRate);
180190
console.log("=== Average eachBatch lag: ", stats.averageOffsetLag);
181191
console.log("=== Max eachBatch lag: ", stats.maxOffsetLag);
182192
console.log("=== Average eachBatch size: ", stats.averageBatchSize);
183-
console.log("=== Consumer average E2E latency (eachBatch): ", stats.avgLatency);
184-
console.log("=== Consumer max E2E latency (eachBatch): ", stats.maxLatency);
193+
console.log("=== Consumer average E2E latency T0-T1 (eachBatch): ", stats.avgLatencyT0T1);
194+
console.log("=== Consumer max E2E latency T0-T1 (eachBatch): ", stats.maxLatencyT0T1);
195+
if (produceToSecondTopic) {
196+
console.log("=== Consumer average E2E latency T0-T2 (eachBatch): ", stats.avgLatencyT0T2);
197+
console.log("=== Consumer max E2E latency T0-T2 (eachBatch): ", stats.maxLatencyT0T2);
198+
}
185199
console.log("=== Consumption time (eachBatch): ", stats.durationSeconds);
186200
}
187201

0 commit comments

Comments
 (0)