From d679df326380dac348d7ccf184effca6187dc9fc Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Sun, 21 Dec 2025 15:00:45 -0600 Subject: [PATCH 01/11] change to windowed sum ratio metrics --- .../internals/DefaultStateUpdater.java | 96 +++++++++++++++---- .../internals/metrics/StreamsMetricsImpl.java | 1 + .../internals/DefaultStateUpdaterTest.java | 8 +- 3 files changed, 82 insertions(+), 23 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 1bfe5eaceacf4..4e4ffebd2c5f1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -22,12 +22,14 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; -import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.WindowedCount; +import org.apache.kafka.common.metrics.stats.WindowedSum; +import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; @@ -67,8 +69,8 @@ import java.util.stream.Stream; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_DESCRIPTION; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG; 
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATE_DESCRIPTION_PREFIX; public class DefaultStateUpdater implements StateUpdater { @@ -84,6 +86,14 @@ private class StateUpdaterThread extends Thread { private final Map updatingTasks = new ConcurrentHashMap<>(); private final Map pausedTasks = new ConcurrentHashMap<>(); + private final WindowedSum idleTimeWindowedSum = new WindowedSum(); + private final WindowedSum checkpointTimeWindowedSum = new WindowedSum(); + private final WindowedSum activeRestoreTimeWindowedSum = new WindowedSum(); + private final WindowedSum standbyRestoreTimeWindowedSum = new WindowedSum(); + private final MetricConfig metricsConfig; + + private boolean timeWindowInitialized = false; + private long totalCheckpointLatency = 0L; private volatile long fetchDeadlineClientInstanceId = -1L; @@ -95,6 +105,7 @@ public StateUpdaterThread(final String name, super(name); this.changelogReader = changelogReader; this.updaterMetrics = new StateUpdaterMetrics(metrics, name); + this.metricsConfig = metrics.metricsRegistry().config(); } public Collection updatingTasks() { @@ -144,6 +155,7 @@ public long numPausedActiveTasks() { public void run() { log.info("State updater thread started"); try { + initTimeWindowIfNeeded(time.milliseconds()); while (isRunning.get()) { runOnce(); } @@ -713,19 +725,65 @@ private void measureCheckpointLatency(final Runnable actionToMeasure) { private void recordMetrics(final long now, final long totalLatency, final long totalWaitLatency) { final long totalRestoreLatency = Math.max(0L, totalLatency - totalWaitLatency - totalCheckpointLatency); - updaterMetrics.idleRatioSensor.record((double) totalWaitLatency / totalLatency, now); - updaterMetrics.checkpointRatioSensor.record((double) totalCheckpointLatency / totalLatency, now); + recordWindowedSum( + now, + (double) totalWaitLatency, + (double) totalCheckpointLatency, + (double) totalRestoreLatency * 
(changelogReader.isRestoringActive() ? 1.0d : 0.0d), + (double) totalRestoreLatency * (changelogReader.isRestoringActive() ? 0.0d : 1.0d) + ); - if (changelogReader.isRestoringActive()) { - updaterMetrics.activeRestoreRatioSensor.record((double) totalRestoreLatency / totalLatency, now); - updaterMetrics.standbyRestoreRatioSensor.record(0.0d, now); - } else { - updaterMetrics.standbyRestoreRatioSensor.record((double) totalRestoreLatency / totalLatency, now); - updaterMetrics.activeRestoreRatioSensor.record(0.0d, now); - } + recordRatios(now); totalCheckpointLatency = 0L; } + + private void initTimeWindowIfNeeded(final long now) { + if (!timeWindowInitialized) { + idleTimeWindowedSum.record(metricsConfig, 0.0, now); + checkpointTimeWindowedSum.record(metricsConfig, 0.0, now); + activeRestoreTimeWindowedSum.record(metricsConfig, 0.0, now); + standbyRestoreTimeWindowedSum.record(metricsConfig, 0.0, now); + timeWindowInitialized = true; + } + } + + private void recordWindowedSum(final long now, + final double idleTime, + final double checkpointTime, + final double activeRestoreTime, + final double standbyRestoreTime) { + idleTimeWindowedSum.record(metricsConfig, idleTime, now); + checkpointTimeWindowedSum.record(metricsConfig, checkpointTime, now); + activeRestoreTimeWindowedSum.record(metricsConfig, activeRestoreTime, now); + standbyRestoreTimeWindowedSum.record(metricsConfig, standbyRestoreTime, now); + } + + private void recordRatios(final long now) { + final double idleTime = idleTimeWindowedSum.measure(metricsConfig, now); + final double checkpointTime = checkpointTimeWindowedSum.measure(metricsConfig, now); + final double activeRestoreTime = activeRestoreTimeWindowedSum.measure(metricsConfig, now); + final double standbyRestoreTime = standbyRestoreTimeWindowedSum.measure(metricsConfig, now); + + final double totalTime = idleTime + checkpointTime + activeRestoreTime + standbyRestoreTime; + + recordRatio(now, totalTime, idleTime, updaterMetrics.idleRatioSensor); + 
recordRatio(now, totalTime, checkpointTime, updaterMetrics.checkpointRatioSensor); + recordRatio(now, totalTime, activeRestoreTime, updaterMetrics.activeRestoreRatioSensor); + recordRatio(now, totalTime, standbyRestoreTime, updaterMetrics.standbyRestoreRatioSensor); + } + + private void recordRatio(final long now, + final double totalTime, + final double elapsedTime, + final Sensor ratioSensor) { + if (totalTime > 0.0) { + ratioSensor.record(elapsedTime / totalTime, now); + } else { + ratioSensor.record(0.0, now); + } + } + } private final Time time; @@ -1035,10 +1093,10 @@ private Stream streamOfNonPausedTasks() { private class StateUpdaterMetrics { private static final String STATE_LEVEL_GROUP = "stream-state-updater-metrics"; - private static final String IDLE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "being idle"; - private static final String RESTORE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "restoring active tasks"; - private static final String UPDATE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "updating standby tasks"; - private static final String CHECKPOINT_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "checkpointing tasks restored progress"; + private static final String IDLE_RATIO_DESCRIPTION = WINDOWED_RATE_DESCRIPTION_PREFIX + "of the time this thread spend being idle"; + private static final String RESTORE_RATIO_DESCRIPTION = WINDOWED_RATE_DESCRIPTION_PREFIX + "of the time this thread spend restoring active tasks"; + private static final String UPDATE_RATIO_DESCRIPTION = WINDOWED_RATE_DESCRIPTION_PREFIX + "of the time this thread spend updating standby tasks"; + private static final String CHECKPOINT_RATIO_DESCRIPTION = WINDOWED_RATE_DESCRIPTION_PREFIX + "of the time this thread spend checkpointing tasks restored progress"; private static final String RESTORE_RECORDS_RATE_DESCRIPTION = RATE_DESCRIPTION + "records restored"; private static final String RESTORE_RATE_DESCRIPTION = RATE_DESCRIPTION + "restore calls triggered"; @@ -1089,19 +1147,19 @@ private 
StateUpdaterMetrics(final StreamsMetricsImpl metrics, final String threa allMetricNames.push(metricName); this.idleRatioSensor = metrics.threadLevelSensor(threadId, "idle-ratio", RecordingLevel.INFO); - this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); + this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Value()); allSensors.add(this.idleRatioSensor); this.activeRestoreRatioSensor = metrics.threadLevelSensor(threadId, "active-restore-ratio", RecordingLevel.INFO); - this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); + this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Value()); allSensors.add(this.activeRestoreRatioSensor); this.standbyRestoreRatioSensor = metrics.threadLevelSensor(threadId, "standby-update-ratio", RecordingLevel.INFO); - this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); + this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Value()); allSensors.add(this.standbyRestoreRatioSensor); this.checkpointRatioSensor = metrics.threadLevelSensor(threadId, "checkpoint-ratio", RecordingLevel.INFO); - this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg()); + this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Value()); allSensors.add(this.checkpointRatioSensor); this.restoreSensor = metrics.threadLevelSensor(threadId, "restore-records", RecordingLevel.INFO); diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 641fdf9470e21..8485bf08c51e4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -156,6 +156,7 @@ public int hashCode() { public static final String LATENCY_DESCRIPTION_SUFFIX = " in milliseconds"; public static final String RATE_DESCRIPTION_PREFIX = "The average number of "; public static final String RATE_DESCRIPTION_SUFFIX = " per second"; + public static final String WINDOWED_RATE_DESCRIPTION_PREFIX = "The ratio, over a rolling measurement window, "; public static final String RECORD_E2E_LATENCY = "record-e2e-latency"; public static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX = diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index 4f9d1b3c0e6b4..c77342a0b1e13 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -1645,25 +1645,25 @@ public void shouldRecordMetrics() throws Exception { metricName = new MetricName("idle-ratio", "stream-state-updater-metrics", - "The fraction of time the thread spent on being idle", + "The ratio, over a rolling measurement window, of the time this thread spend being idle", tagMap); verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d)); metricName = new MetricName("active-restore-ratio", "stream-state-updater-metrics", - "The fraction of time the thread spent on restoring active tasks", + "The ratio, over a rolling measurement window, 
of the time this thread spend restoring active tasks", tagMap); verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d)); metricName = new MetricName("standby-update-ratio", "stream-state-updater-metrics", - "The fraction of time the thread spent on updating standby tasks", + "The ratio, over a rolling measurement window, of the time this thread spend updating standby tasks", tagMap); verifyMetric(metrics, metricName, is(0.0d)); metricName = new MetricName("checkpoint-ratio", "stream-state-updater-metrics", - "The fraction of time the thread spent on checkpointing tasks restored progress", + "The ratio, over a rolling measurement window, of the time this thread spend checkpointing tasks restored progress", tagMap); verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d)); From cd124a410a35d02805f227c5c108d6882fec61b0 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Sun, 21 Dec 2025 15:53:28 -0600 Subject: [PATCH 02/11] update metric prefix for StreamThread --- .../processor/internals/DefaultStateUpdater.java | 10 +++++----- .../internals/metrics/StreamsMetricsImpl.java | 3 +-- .../processor/internals/metrics/ThreadMetrics.java | 9 +++++---- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 4e4ffebd2c5f1..020b7382b63ad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -70,7 +70,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATE_DESCRIPTION_PREFIX; 
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATIO_DESCRIPTION_PREFIX; public class DefaultStateUpdater implements StateUpdater { @@ -1093,10 +1093,10 @@ private Stream streamOfNonPausedTasks() { private class StateUpdaterMetrics { private static final String STATE_LEVEL_GROUP = "stream-state-updater-metrics"; - private static final String IDLE_RATIO_DESCRIPTION = WINDOWED_RATE_DESCRIPTION_PREFIX + "of the time this thread spend being idle"; - private static final String RESTORE_RATIO_DESCRIPTION = WINDOWED_RATE_DESCRIPTION_PREFIX + "of the time this thread spend restoring active tasks"; - private static final String UPDATE_RATIO_DESCRIPTION = WINDOWED_RATE_DESCRIPTION_PREFIX + "of the time this thread spend updating standby tasks"; - private static final String CHECKPOINT_RATIO_DESCRIPTION = WINDOWED_RATE_DESCRIPTION_PREFIX + "of the time this thread spend checkpointing tasks restored progress"; + private static final String IDLE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spend being idle"; + private static final String RESTORE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spend restoring active tasks"; + private static final String UPDATE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spend updating standby tasks"; + private static final String CHECKPOINT_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spend checkpointing tasks restored progress"; private static final String RESTORE_RECORDS_RATE_DESCRIPTION = RATE_DESCRIPTION + "records restored"; private static final String RESTORE_RATE_DESCRIPTION = RATE_DESCRIPTION + "restore calls triggered"; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 
8485bf08c51e4..b9dc1c2b8466c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -150,13 +150,12 @@ public int hashCode() { public static final String OPERATIONS = " operations"; public static final String TOTAL_DESCRIPTION = "The total number of "; public static final String RATE_DESCRIPTION = "The average per-second number of "; - public static final String RATIO_DESCRIPTION = "The fraction of time the thread spent on "; public static final String AVG_LATENCY_DESCRIPTION = "The average latency of "; public static final String MAX_LATENCY_DESCRIPTION = "The maximum latency of "; public static final String LATENCY_DESCRIPTION_SUFFIX = " in milliseconds"; public static final String RATE_DESCRIPTION_PREFIX = "The average number of "; public static final String RATE_DESCRIPTION_SUFFIX = " per second"; - public static final String WINDOWED_RATE_DESCRIPTION_PREFIX = "The ratio, over a rolling measurement window, "; + public static final String WINDOWED_RATIO_DESCRIPTION_PREFIX = "The ratio, over a rolling measurement window, "; public static final String RECORD_E2E_LATENCY = "record-e2e-latency"; public static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX = diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java index 73fa53ebcdd04..d9701fc6472b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java @@ -32,6 +32,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORDS_SUFFIX; import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATIO_DESCRIPTION_PREFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor; @@ -82,16 +83,16 @@ private ThreadMetrics() {} private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = "The average punctuate latency"; private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = "The maximum punctuate latency"; private static final String PROCESS_RATIO_DESCRIPTION = - "The ratio, over a rolling measurement window, of the time this thread spent " + + WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spent " + "processing active tasks to the total elapsed time in that window."; private static final String PUNCTUATE_RATIO_DESCRIPTION = - "The ratio, over a rolling measurement window, of the time this thread spent " + + WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spent " + "punctuating active tasks to the total elapsed time in that window."; private static final String POLL_RATIO_DESCRIPTION = - "The ratio, over a rolling measurement window, of the time this thread spent " + + WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spent " + "polling records from the consumer to the total elapsed time in that window."; private static final String COMMIT_RATIO_DESCRIPTION = - "The ratio, over a rolling measurement window, of the time this thread spent " + + WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spent " + "committing all tasks to 
the total elapsed time in that window."; private static final String BLOCKED_TIME_DESCRIPTION = "The total time the thread spent blocked on kafka in nanoseconds"; From 04467dce21fe9ffa6f046d2b6a1b01d46d0b333f Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Mon, 22 Dec 2025 11:25:00 -0600 Subject: [PATCH 03/11] convert string to const --- .../streams/processor/internals/DefaultStateUpdater.java | 9 +++++---- .../processor/internals/metrics/StreamsMetricsImpl.java | 1 + .../processor/internals/metrics/ThreadMetrics.java | 9 +++++---- .../processor/internals/DefaultStateUpdaterTest.java | 8 ++++---- 4 files changed, 15 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 020b7382b63ad..93c4ce18308be 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -70,6 +70,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_TIME_UNIT_DESCRIPTION; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATIO_DESCRIPTION_PREFIX; public class DefaultStateUpdater implements StateUpdater { @@ -1093,10 +1094,10 @@ private Stream streamOfNonPausedTasks() { private class StateUpdaterMetrics { private static final String STATE_LEVEL_GROUP = "stream-state-updater-metrics"; - private static final String IDLE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spend being idle"; - private static final String RESTORE_RATIO_DESCRIPTION = 
WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spend restoring active tasks"; - private static final String UPDATE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spend updating standby tasks"; - private static final String CHECKPOINT_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spend checkpointing tasks restored progress"; + private static final String IDLE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "being idle"; + private static final String RESTORE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "restoring active tasks"; + private static final String UPDATE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "updating standby tasks"; + private static final String CHECKPOINT_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "checkpointing tasks restored progress"; private static final String RESTORE_RECORDS_RATE_DESCRIPTION = RATE_DESCRIPTION + "records restored"; private static final String RESTORE_RATE_DESCRIPTION = RATE_DESCRIPTION + "restore calls triggered"; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index b9dc1c2b8466c..a23a50e755aa5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -156,6 +156,7 @@ public int hashCode() { public static final String RATE_DESCRIPTION_PREFIX = "The average number of "; public static final String RATE_DESCRIPTION_SUFFIX = " per second"; public static final String WINDOWED_RATIO_DESCRIPTION_PREFIX = "The ratio, over a rolling measurement window, "; + public 
static final String THREAD_TIME_UNIT_DESCRIPTION = "of the time this thread spent "; public static final String RECORD_E2E_LATENCY = "record-e2e-latency"; public static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX = diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java index d9701fc6472b7..d9e8f668c137d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java @@ -31,6 +31,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORDS_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_TIME_UNIT_DESCRIPTION; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATIO_DESCRIPTION_PREFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor; @@ -83,16 +84,16 @@ private ThreadMetrics() {} private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = "The average punctuate latency"; private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = "The maximum punctuate latency"; private static final String PROCESS_RATIO_DESCRIPTION = - WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spent " + + WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "processing active tasks to the total elapsed time in that window."; private static final String 
PUNCTUATE_RATIO_DESCRIPTION = - WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spent " + + WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "punctuating active tasks to the total elapsed time in that window."; private static final String POLL_RATIO_DESCRIPTION = - WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spent " + + WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "polling records from the consumer to the total elapsed time in that window."; private static final String COMMIT_RATIO_DESCRIPTION = - WINDOWED_RATIO_DESCRIPTION_PREFIX + "of the time this thread spent " + + WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "committing all tasks to the total elapsed time in that window."; private static final String BLOCKED_TIME_DESCRIPTION = "The total time the thread spent blocked on kafka in nanoseconds"; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index c77342a0b1e13..8087e28c8b338 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -1645,25 +1645,25 @@ public void shouldRecordMetrics() throws Exception { metricName = new MetricName("idle-ratio", "stream-state-updater-metrics", - "The ratio, over a rolling measurement window, of the time this thread spend being idle", + "The ratio, over a rolling measurement window, of the time this thread spent being idle", tagMap); verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d)); metricName = new MetricName("active-restore-ratio", "stream-state-updater-metrics", - "The ratio, over a rolling measurement window, of the time this thread spend restoring active tasks", + "The ratio, over a rolling measurement window, 
of the time this thread spent restoring active tasks", tagMap); verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d)); metricName = new MetricName("standby-update-ratio", "stream-state-updater-metrics", - "The ratio, over a rolling measurement window, of the time this thread spend updating standby tasks", + "The ratio, over a rolling measurement window, of the time this thread spent updating standby tasks", tagMap); verifyMetric(metrics, metricName, is(0.0d)); metricName = new MetricName("checkpoint-ratio", "stream-state-updater-metrics", - "The ratio, over a rolling measurement window, of the time this thread spend checkpointing tasks restored progress", + "The ratio, over a rolling measurement window, of the time this thread spent checkpointing tasks restored progress", tagMap); verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d)); From b3887d3b5e40c13071030ae2854cb7bdc779eba3 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Fri, 9 Jan 2026 11:18:20 -0600 Subject: [PATCH 04/11] adjust import order --- .../kafka/streams/processor/internals/DefaultStateUpdater.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 93c4ce18308be..75bd1fed6b280 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -27,9 +27,9 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.metrics.stats.WindowedCount; import org.apache.kafka.common.metrics.stats.WindowedSum; -import org.apache.kafka.common.metrics.stats.Value; import 
org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; From 9f3107b3727cc563c2201f9e2388aa05800f978a Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Tue, 13 Jan 2026 10:45:08 -0600 Subject: [PATCH 05/11] revise total latency --- .../streams/processor/internals/DefaultStateUpdater.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 75bd1fed6b280..659a2aaef8aad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -734,7 +734,7 @@ private void recordMetrics(final long now, final long totalLatency, final long t (double) totalRestoreLatency * (changelogReader.isRestoringActive() ? 
0.0d : 1.0d) ); - recordRatios(now); + recordRatios(now, totalLatency); totalCheckpointLatency = 0L; } @@ -760,14 +760,12 @@ private void recordWindowedSum(final long now, standbyRestoreTimeWindowedSum.record(metricsConfig, standbyRestoreTime, now); } - private void recordRatios(final long now) { + private void recordRatios(final long now, final long totalTime) { final double idleTime = idleTimeWindowedSum.measure(metricsConfig, now); final double checkpointTime = checkpointTimeWindowedSum.measure(metricsConfig, now); final double activeRestoreTime = activeRestoreTimeWindowedSum.measure(metricsConfig, now); final double standbyRestoreTime = standbyRestoreTimeWindowedSum.measure(metricsConfig, now); - final double totalTime = idleTime + checkpointTime + activeRestoreTime + standbyRestoreTime; - recordRatio(now, totalTime, idleTime, updaterMetrics.idleRatioSensor); recordRatio(now, totalTime, checkpointTime, updaterMetrics.checkpointRatioSensor); recordRatio(now, totalTime, activeRestoreTime, updaterMetrics.activeRestoreRatioSensor); From b3d8b6918b4bfdde6ff7753796fed744cb05fbed Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Wed, 14 Jan 2026 09:35:18 -0600 Subject: [PATCH 06/11] add to upgrade doc --- docs/streams/upgrade-guide.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/streams/upgrade-guide.md b/docs/streams/upgrade-guide.md index 1add58f435975..735dd783cb251 100644 --- a/docs/streams/upgrade-guide.md +++ b/docs/streams/upgrade-guide.md @@ -61,6 +61,11 @@ Starting in Kafka Streams 2.6.x, a new processing mode is available, named EOS v Since 2.6.0 release, Kafka Streams depends on a RocksDB version that requires MacOS 10.14 or higher. +## Streams API changes in 4.3.0 + +The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, and `poll-ratio`, along with streams state updater metrics `active-restore-ratio`, `standby-update-ratio`, `idle-ratio`, and `checkpoint-ratio` have been updated.
Each metric now reports, over a rolling measurement window, the ratio of time this thread spends performing the given action (`{action}`) to the total elapsed time in that window. The effective window duration is determined by the metrics configuration: `metrics.sample.window.ms` (per-sample window length) and `metrics.num.samples` (number of rolling windows). + + ## Streams API changes in 4.2.0 Kafka Streams now supports Dead Letter Queue (DLQ). A new config `errors.deadletterqueue.topic.name` allows to specify the name of the DLQ topic. When set and `DefaultProductionExceptionHandler` is used, records that cause exceptions will be forwarded to the DLQ topic. If a custom exception handler is used, it is up to the custom handler to build DLQ records to send, hence, depending on the implementation, the `errors.deadletterqueue.topic.name` configuration may be ignored. `org.apache.kafka.streams.errors.ProductionExceptionHandler$ProductionExceptionHandlerResponse` is deprecated and replaced by `org.apache.kafka.streams.errors.ProductionExceptionHandler$Response`. Methods `handle` and `handleSerializationException` in `org.apache.kafka.streams.errors.ProductionExceptionHandler` are deprecated and replaced by `handleError` and `handleSerializationError` respectively in order to use the new `Response` class. More details can be found in [KIP-1034](https://cwiki.apache.org/confluence/x/HwviEQ).
From ba28acd2ad7ad170df09850148a85aeccd9f9b09 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Wed, 14 Jan 2026 09:38:36 -0600 Subject: [PATCH 07/11] remove unnecessary spacing --- docs/streams/upgrade-guide.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/streams/upgrade-guide.md b/docs/streams/upgrade-guide.md index 735dd783cb251..d94ef3745abc8 100644 --- a/docs/streams/upgrade-guide.md +++ b/docs/streams/upgrade-guide.md @@ -65,7 +65,6 @@ Since 2.6.0 release, Kafka Streams depends on a RocksDB version that requires Ma The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, and `poll-ratio`, along with streams state updater metrics `active-restore-ratio`, `standby-update-ratio`, `idle-ratio`, and `checkpoint-ratio` have been updated. Each metric now reports, over a rolling measurement window, the ratio of time this thread spends performing the given action (`{action}`) to the total elapsed time in that window. The effective window duration is determined by the metrics configuration: `metrics.sample.window.ms` (per-sample window length) and `metrics.num.samples` (number of rolling windows). - ## Streams API changes in 4.2.0 Kafka Streams now supports Dead Letter Queue (DLQ). A new config `errors.deadletterqueue.topic.name` allows to specify the name of the DLQ topic. When set and `DefaultProductionExceptionHandler` is used, records that cause exceptions will be forwarded to the DLQ topic. If a custom exception handler is used, it is up to the custom handler to build DLQ records to send, hence, depending on the implementation, the `errors.deadletterqueue.topic.name` configuration may be ignored. `org.apache.kafka.streams.errors.ProductionExceptionHandler$ProductionExceptionHandlerResponse` is deprecated and replaced by `org.apache.kafka.streams.errors.ProductionExceptionHandler$Response`.
Methods `handle` and `handleSerializationException` in `org.apache.kafka.streams.errors.ProductionExceptionHandler` are deprecated and replaced by `handleError` and `handleSerializationError` respectively in order to use the new `Response` class. More details can be found in [KIP-1034](https://cwiki.apache.org/confluence/x/HwviEQ). From c8e6b5b89dee8d06b6b330c13f77263997534977 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Wed, 14 Jan 2026 09:58:52 -0600 Subject: [PATCH 08/11] remove space --- docs/streams/upgrade-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/streams/upgrade-guide.md b/docs/streams/upgrade-guide.md index d94ef3745abc8..eeb09a330b981 100644 --- a/docs/streams/upgrade-guide.md +++ b/docs/streams/upgrade-guide.md @@ -63,7 +63,7 @@ Since 2.6.0 release, Kafka Streams depends on a RocksDB version that requires Ma ## Streams API changes in 4.3.0 -The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, and `poll-ratio` , along with streams state updater metrics `active-restore-ratio`, `standby-restore-ratio`, `idle-ratio`, and `checkpoint-ratio` have been updated. Each metric now reports, over a rolling measurement window, the ratio of time this thread spends performing the given action (`{action}`) to the total elapsed time in that window. The effective window duration is determined by the metrics configuration: `metrics.sample.window.ms` (per-sample window length)and `metrics.num.samples` (number of rolling windows). +The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, and `poll-ratio`, along with streams state updater metrics `active-restore-ratio`, `standby-restore-ratio`, `idle-ratio`, and `checkpoint-ratio` have been updated. Each metric now reports, over a rolling measurement window, the ratio of time this thread spends performing the given action (`{action}`) to the total elapsed time in that window. 
The effective window duration is determined by the metrics configuration: `metrics.sample.window.ms` (per-sample window length)and `metrics.num.samples` (number of rolling windows). ## Streams API changes in 4.2.0 From bcc1cf2a9f4ba8616357c99909ec68c6eeeaa334 Mon Sep 17 00:00:00 2001 From: Lucy Liu Date: Thu, 15 Jan 2026 16:27:21 -0600 Subject: [PATCH 09/11] Apply suggestions from code review Co-authored-by: Matthias J. Sax --- docs/streams/upgrade-guide.md | 2 +- .../kafka/streams/processor/internals/DefaultStateUpdater.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/streams/upgrade-guide.md b/docs/streams/upgrade-guide.md index eeb09a330b981..1721602b6af1b 100644 --- a/docs/streams/upgrade-guide.md +++ b/docs/streams/upgrade-guide.md @@ -63,7 +63,7 @@ Since 2.6.0 release, Kafka Streams depends on a RocksDB version that requires Ma ## Streams API changes in 4.3.0 -The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, and `poll-ratio`, along with streams state updater metrics `active-restore-ratio`, `standby-restore-ratio`, `idle-ratio`, and `checkpoint-ratio` have been updated. Each metric now reports, over a rolling measurement window, the ratio of time this thread spends performing the given action (`{action}`) to the total elapsed time in that window. The effective window duration is determined by the metrics configuration: `metrics.sample.window.ms` (per-sample window length)and `metrics.num.samples` (number of rolling windows). +The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, and `poll-ratio`, along with streams state updater metrics `active-restore-ratio`, `standby-restore-ratio`, `idle-ratio`, and `checkpoint-ratio` have been updated. Each metric now reports, over a rolling measurement window, the ratio of time this thread spends performing the given action (`{action}`) to the total elapsed time in that window. 
The effective window duration is determined by the metrics configuration: `metrics.sample.window.ms` (per-sample window length) and `metrics.num.samples` (number of rolling windows). ## Streams API changes in 4.2.0 diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 659a2aaef8aad..7bb6e124b2734 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -1092,7 +1092,7 @@ private Stream streamOfNonPausedTasks() { private class StateUpdaterMetrics { private static final String STATE_LEVEL_GROUP = "stream-state-updater-metrics"; - private static final String IDLE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "being idle"; + private static final String IDLE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "being idle"; private static final String RESTORE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "restoring active tasks"; private static final String UPDATE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "updating standby tasks"; private static final String CHECKPOINT_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "checkpointing tasks restored progress"; From e14379d8c59def903ff8c499e7fd4492f920ddf0 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Thu, 15 Jan 2026 16:35:31 -0600 Subject: [PATCH 10/11] remove explicit casting from long to double --- .../streams/processor/internals/DefaultStateUpdater.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 7bb6e124b2734..7370733c5832f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -728,10 +728,10 @@ private void recordMetrics(final long now, final long totalLatency, final long t recordWindowedSum( now, - (double) totalWaitLatency, - (double) totalCheckpointLatency, - (double) totalRestoreLatency * (changelogReader.isRestoringActive() ? 1.0d : 0.0d), - (double) totalRestoreLatency * (changelogReader.isRestoringActive() ? 0.0d : 1.0d) + totalWaitLatency, + totalCheckpointLatency, + totalRestoreLatency * (changelogReader.isRestoringActive() ? 1.0d : 0.0d), + totalRestoreLatency * (changelogReader.isRestoringActive() ? 0.0d : 1.0d) ); recordRatios(now, totalLatency); From 713b125d857cd3583789f3b4a5db4da7af9fd846 Mon Sep 17 00:00:00 2001 From: lucliu1108 Date: Mon, 26 Jan 2026 23:28:00 -0600 Subject: [PATCH 11/11] align name with StreamThread --- .../internals/DefaultStateUpdater.java | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 7370733c5832f..7792a64f88f5c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -91,6 +91,7 @@ private class StateUpdaterThread extends Thread { private final WindowedSum checkpointTimeWindowedSum = new WindowedSum(); private final WindowedSum activeRestoreTimeWindowedSum = new WindowedSum(); private final WindowedSum standbyRestoreTimeWindowedSum = new WindowedSum(); + private final WindowedSum 
runOnceLatencyWindowedSum = new WindowedSum(); private final MetricConfig metricsConfig; private boolean timeWindowInitialized = false; @@ -731,10 +732,11 @@ private void recordMetrics(final long now, final long totalLatency, final long t totalWaitLatency, totalCheckpointLatency, totalRestoreLatency * (changelogReader.isRestoringActive() ? 1.0d : 0.0d), - totalRestoreLatency * (changelogReader.isRestoringActive() ? 0.0d : 1.0d) + totalRestoreLatency * (changelogReader.isRestoringActive() ? 0.0d : 1.0d), + totalLatency ); - recordRatios(now, totalLatency); + recordRatios(now); totalCheckpointLatency = 0L; } @@ -745,6 +747,7 @@ private void initTimeWindowIfNeeded(final long now) { checkpointTimeWindowedSum.record(metricsConfig, 0.0, now); activeRestoreTimeWindowedSum.record(metricsConfig, 0.0, now); standbyRestoreTimeWindowedSum.record(metricsConfig, 0.0, now); + runOnceLatencyWindowedSum.record(metricsConfig, 0.0, now); timeWindowInitialized = true; } } @@ -753,31 +756,31 @@ private void recordWindowedSum(final long now, final double idleTime, final double checkpointTime, final double activeRestoreTime, - final double standbyRestoreTime) { + final double standbyRestoreTime, + final double totalLatency) { idleTimeWindowedSum.record(metricsConfig, idleTime, now); checkpointTimeWindowedSum.record(metricsConfig, checkpointTime, now); activeRestoreTimeWindowedSum.record(metricsConfig, activeRestoreTime, now); standbyRestoreTimeWindowedSum.record(metricsConfig, standbyRestoreTime, now); + runOnceLatencyWindowedSum.record(metricsConfig, totalLatency, now); } - private void recordRatios(final long now, final long totalTime) { - final double idleTime = idleTimeWindowedSum.measure(metricsConfig, now); - final double checkpointTime = checkpointTimeWindowedSum.measure(metricsConfig, now); - final double activeRestoreTime = activeRestoreTimeWindowedSum.measure(metricsConfig, now); - final double standbyRestoreTime = standbyRestoreTimeWindowedSum.measure(metricsConfig, now); + 
private void recordRatios(final long now) { + final double runOnceLatencyWindow = runOnceLatencyWindowedSum.measure(metricsConfig, now); - recordRatio(now, totalTime, idleTime, updaterMetrics.idleRatioSensor); - recordRatio(now, totalTime, checkpointTime, updaterMetrics.checkpointRatioSensor); - recordRatio(now, totalTime, activeRestoreTime, updaterMetrics.activeRestoreRatioSensor); - recordRatio(now, totalTime, standbyRestoreTime, updaterMetrics.standbyRestoreRatioSensor); + recordRatio(now, runOnceLatencyWindow, idleTimeWindowedSum, updaterMetrics.idleRatioSensor); + recordRatio(now, runOnceLatencyWindow, checkpointTimeWindowedSum, updaterMetrics.checkpointRatioSensor); + recordRatio(now, runOnceLatencyWindow, activeRestoreTimeWindowedSum, updaterMetrics.activeRestoreRatioSensor); + recordRatio(now, runOnceLatencyWindow, standbyRestoreTimeWindowedSum, updaterMetrics.standbyRestoreRatioSensor); } private void recordRatio(final long now, - final double totalTime, - final double elapsedTime, + final double runOnceLatencyWindow, + final WindowedSum windowedSum, final Sensor ratioSensor) { - if (totalTime > 0.0) { - ratioSensor.record(elapsedTime / totalTime, now); + if (runOnceLatencyWindow > 0.0) { + final double elapsedTime = windowedSum.measure(metricsConfig, now); + ratioSensor.record(elapsedTime / runOnceLatencyWindow, now); } else { ratioSensor.record(0.0, now); }