// Module-level dependencies and tunables for the performance-test consumer.
const { hrtime } = require('process');
const { randomBytes } = require('crypto');
// Percentiles reported for end-to-end latency; must stay ascending for readable output.
const PERCENTILES = [50, 75, 90, 95, 99, 99.9, 99.99, 100];

// How long to wait before force-terminating, overridable via env (ms); default 10 minutes.
const TERMINATE_TIMEOUT_MS = process.env.TERMINATE_TIMEOUT_MS ? +process.env.TERMINATE_TIMEOUT_MS : 600000;
// NOTE(review): kept as a string ('false'/'true'), matching the original — consumers of
// this flag presumably compare strings; verify before changing to a boolean.
const AUTO_COMMIT = process.env.AUTO_COMMIT || 'false';
@@ -58,8 +59,78 @@ function genericProduceToTopic(producer, topic, messages) {
5859 } ) ;
5960}
6061
62+
// We use a simple count-sketch for latency percentiles to avoid storing all
// latencies in memory, because we're also measuring the memory usage of the
// consumer as part of the performance tests.
//
// Latencies are placed into logarithmically-spaced buckets so each bucket
// covers values within a fixed relative error of its boundary, giving O(1)
// inserts and O(buckets) percentile queries with bounded memory.
class LatencyCountSketch {
  #numBuckets;
  #maxValue;
  #buckets;             // ascending boundary values; [0] = 0, [last] = +Infinity sentinel
  #counts;              // per-bucket observation counts, parallel to #buckets
  #findNextMinimum;     // ratio between adjacent boundaries: (1 - error) / (1 + error)
  #changeBaseLogarithm; // ln(ratio), used to change log base for O(1) bucket lookup
  #totalCount = 0;

  /**
   * @param {object} [options]
   * @param {number} [options.numBuckets=600] Number of logarithmic buckets.
   * @param {number} [options.error=0.01] Relative error per bucket (1%).
   * @param {number} [options.maxValue=60000] Largest representable latency (ms).
   */
  constructor({
    numBuckets = 600,
    error = 0.01, // 1% error
    maxValue = 60000, // max 60s latency
  } = {}) { // default {} so `new LatencyCountSketch()` works too
    // Each bucket represents [x * (1 - error), x * (1 + error))
    this.#numBuckets = numBuckets;
    this.#findNextMinimum = 1 / (1 + error) * (1 - error);
    // Change base from natural log to log base findNextMinimum
    this.#changeBaseLogarithm = Math.log(this.#findNextMinimum);

    this.#maxValue = maxValue;
    this.#buckets = new Array(this.#numBuckets + 2).fill(0);
    this.#buckets[this.#numBuckets + 1] = Number.POSITIVE_INFINITY;
    this.#buckets[this.#numBuckets] = this.#maxValue;
    this.#buckets[0] = 0;
    // Fill boundaries downward from maxValue: each one is the previous
    // boundary scaled by the constant ratio.
    let i = this.#numBuckets - 1;
    let currentValue = maxValue;
    while (i >= 1) {
      const nextMinimum = currentValue * this.#findNextMinimum;
      this.#buckets[i] = nextMinimum;
      currentValue = nextMinimum;
      i--;
    }
    this.#counts = new Array(this.#numBuckets + 2).fill(0);
  }

  /**
   * Record one latency observation in O(1).
   * @param {number} latency Latency in the same unit as maxValue (ms).
   */
  add(latency) {
    let idx = Math.floor(this.#numBuckets - Math.log(latency / this.#maxValue) / this.#changeBaseLogarithm);
    // Math.log of a negative latency yields NaN, which would increment a
    // phantom `counts[NaN]` property; count such inputs in the lowest bucket.
    if (Number.isNaN(idx)) idx = 0;
    // Clamp into range; values above maxValue saturate at the maxValue bucket
    // rather than the +Infinity sentinel.
    idx = idx < 0 ? 0 :
      idx > this.#buckets.length - 2 ? this.#buckets.length - 2 :
        idx;

    this.#counts[idx]++;
    this.#totalCount++;
  }

  /**
   * Estimate latency values for the requested percentiles.
   * @param {number[]} percentilesArray Percentiles in [0, 100], in any order
   *   (the original implementation required ascending order and silently
   *   returned wrong values otherwise — the cursor never rewound).
   * @returns {number[]} Bucket-boundary estimates, positionally matching the input.
   */
  percentiles(percentilesArray) {
    // Visit requests in ascending percentile order so the single forward
    // cursor is correct; results are written back to their original slots.
    const order = percentilesArray
      .map((_, i) => i)
      .sort((a, b) => percentilesArray[a] - percentilesArray[b]);
    const percentileResults = new Array(percentilesArray.length);
    let totalCountSoFar = 0;
    let j = 0;
    for (const i of order) {
      const targetCount = Math.ceil(this.#totalCount * percentilesArray[i] / 100);
      while (totalCountSoFar < targetCount && j < this.#counts.length - 1) {
        totalCountSoFar += this.#counts[j];
        j++;
      }
      const bucketIndex = j < this.#counts.length - 1 ? j : this.#counts.length - 2;
      percentileResults[i] = this.#buckets[bucketIndex];
    }
    return percentileResults;
  }
}
127+
61128async function runConsumer ( consumer , topic , warmupMessages , totalMessageCnt , eachBatch , partitionsConsumedConcurrently , stats , actionOnMessages ) {
62129 const handlers = installHandlers ( totalMessageCnt === - 1 ) ;
130+ if ( stats ) {
131+ stats . percentilesTOT1 = new LatencyCountSketch ( { } ) ;
132+ stats . percentilesTOT2 = new LatencyCountSketch ( { } ) ;
133+ }
63134 while ( true ) {
64135 try {
65136 await consumer . connect ( ) ;
@@ -106,6 +177,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
106177 stats . maxLatencyT0T1 = Math . max ( stats . maxLatencyT0T1 , latency ) ;
107178 stats . avgLatencyT0T1 = ( ( stats . avgLatencyT0T1 * ( numMessages - 1 ) ) + latency ) / numMessages ;
108179 }
180+ stats . percentilesTOT1 . add ( latency ) ;
109181 } else {
110182 if ( ! stats . maxLatencyT0T2 ) {
111183 stats . maxLatencyT0T2 = latency ;
@@ -114,6 +186,7 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
114186 stats . maxLatencyT0T2 = Math . max ( stats . maxLatencyT0T2 , latency ) ;
115187 stats . avgLatencyT0T2 = ( ( stats . avgLatencyT0T2 * ( numMessages - 1 ) ) + latency ) / numMessages ;
116188 }
189+ stats . percentilesTOT2 . add ( latency ) ;
117190 }
118191 } ;
119192
@@ -257,6 +330,14 @@ async function runConsumer(consumer, topic, warmupMessages, totalMessageCnt, eac
257330 stats . messageRate = durationSeconds > 0 ?
258331 ( messagesMeasured / durationSeconds ) : Infinity ;
259332 stats . durationSeconds = durationSeconds ;
333+ stats . percentilesTOT1 = stats . percentilesTOT1 . percentiles ( PERCENTILES ) . map ( ( value , index ) => ( {
334+ percentile : PERCENTILES [ index ] ,
335+ value,
336+ } ) ) ;
337+ stats . percentilesTOT2 = stats . percentilesTOT2 . percentiles ( PERCENTILES ) . map ( ( value , index ) => ( {
338+ percentile : PERCENTILES [ index ] ,
339+ value,
340+ } ) ) ;
260341 }
261342 removeHandlers ( handlers ) ;
262343 return rate ;
0 commit comments