diff --git a/docs/streams/upgrade-guide.md b/docs/streams/upgrade-guide.md index c7e2938b3f7f9..6e5ccdd23251d 100644 --- a/docs/streams/upgrade-guide.md +++ b/docs/streams/upgrade-guide.md @@ -61,6 +61,10 @@ Starting in Kafka Streams 2.6.x, a new processing mode is available, named EOS v Since 2.6.0 release, Kafka Streams depends on a RocksDB version that requires MacOS 10.14 or higher. +## Streams API changes in 4.3.0 + +The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, and `poll-ratio`, along with streams state updater metrics `active-restore-ratio`, `standby-restore-ratio`, `idle-ratio`, and `checkpoint-ratio` have been updated. Each metric now reports, over a rolling measurement window, the ratio of time this thread spends performing the given action (`{action}`) to the total elapsed time in that window. The effective window duration is determined by the metrics configuration: `metrics.sample.window.ms` (per-sample window length) and `metrics.num.samples` (number of rolling windows). + ## Streams API changes in 4.2.0 ### General Availability for a core feature set of the Streams Rebalance Protocol (KIP-1071) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 1bfe5eaceacf4..7792a64f88f5c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -22,12 +22,14 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor.RecordingLevel; -import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.metrics.stats.WindowedCount; +import org.apache.kafka.common.metrics.stats.WindowedSum; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; @@ -67,8 +69,9 @@ import java.util.stream.Stream; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION; -import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_DESCRIPTION; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_TIME_UNIT_DESCRIPTION; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATIO_DESCRIPTION_PREFIX; public class DefaultStateUpdater implements StateUpdater { @@ -84,6 +87,15 @@ private class StateUpdaterThread extends Thread { private final Map updatingTasks = new ConcurrentHashMap<>(); private final Map pausedTasks = new ConcurrentHashMap<>(); + private final WindowedSum idleTimeWindowedSum = new WindowedSum(); + private final WindowedSum checkpointTimeWindowedSum = new WindowedSum(); + private final WindowedSum activeRestoreTimeWindowedSum = new WindowedSum(); + private final WindowedSum standbyRestoreTimeWindowedSum = new WindowedSum(); + private final WindowedSum runOnceLatencyWindowedSum = new WindowedSum(); + private final MetricConfig metricsConfig; + + private boolean timeWindowInitialized = false; + private long totalCheckpointLatency = 0L; private volatile long fetchDeadlineClientInstanceId = -1L; @@ -95,6 +107,7 @@ public StateUpdaterThread(final String name, super(name); this.changelogReader = changelogReader; this.updaterMetrics = new StateUpdaterMetrics(metrics, name); + this.metricsConfig = metrics.metricsRegistry().config(); } public Collection updatingTasks() { @@ -144,6 +157,7 @@ public long numPausedActiveTasks() { public void run() { log.info("State updater thread started"); try { + initTimeWindowIfNeeded(time.milliseconds()); while (isRunning.get()) { runOnce(); } @@ -713,19 +727,65 @@ private void measureCheckpointLatency(final Runnable actionToMeasure) { private void recordMetrics(final long now, final long totalLatency, final long totalWaitLatency) { final long totalRestoreLatency = Math.max(0L, totalLatency - totalWaitLatency - totalCheckpointLatency); - updaterMetrics.idleRatioSensor.record((double) totalWaitLatency / totalLatency, now); - updaterMetrics.checkpointRatioSensor.record((double) totalCheckpointLatency / totalLatency, now); + recordWindowedSum( + now, + totalWaitLatency, + totalCheckpointLatency, + totalRestoreLatency * (changelogReader.isRestoringActive() ? 1.0d : 0.0d), + totalRestoreLatency * (changelogReader.isRestoringActive() ? 0.0d : 1.0d), + totalLatency + ); - if (changelogReader.isRestoringActive()) { - updaterMetrics.activeRestoreRatioSensor.record((double) totalRestoreLatency / totalLatency, now); - updaterMetrics.standbyRestoreRatioSensor.record(0.0d, now); - } else { - updaterMetrics.standbyRestoreRatioSensor.record((double) totalRestoreLatency / totalLatency, now); - updaterMetrics.activeRestoreRatioSensor.record(0.0d, now); - } + recordRatios(now); totalCheckpointLatency = 0L; } + + private void initTimeWindowIfNeeded(final long now) { + if (!timeWindowInitialized) { + idleTimeWindowedSum.record(metricsConfig, 0.0, now); + checkpointTimeWindowedSum.record(metricsConfig, 0.0, now); + activeRestoreTimeWindowedSum.record(metricsConfig, 0.0, now); + standbyRestoreTimeWindowedSum.record(metricsConfig, 0.0, now); + runOnceLatencyWindowedSum.record(metricsConfig, 0.0, now); + timeWindowInitialized = true; + } + } + + private void recordWindowedSum(final long now, + final double idleTime, + final double checkpointTime, + final double activeRestoreTime, + final double standbyRestoreTime, + final double totalLatency) { + idleTimeWindowedSum.record(metricsConfig, idleTime, now); + checkpointTimeWindowedSum.record(metricsConfig, checkpointTime, now); + activeRestoreTimeWindowedSum.record(metricsConfig, activeRestoreTime, now); + standbyRestoreTimeWindowedSum.record(metricsConfig, standbyRestoreTime, now); + runOnceLatencyWindowedSum.record(metricsConfig, totalLatency, now); + } + + private void recordRatios(final long now) { + final double runOnceLatencyWindow = runOnceLatencyWindowedSum.measure(metricsConfig, now); + + recordRatio(now, runOnceLatencyWindow, idleTimeWindowedSum, updaterMetrics.idleRatioSensor); + recordRatio(now, runOnceLatencyWindow, checkpointTimeWindowedSum, updaterMetrics.checkpointRatioSensor); + recordRatio(now, runOnceLatencyWindow, activeRestoreTimeWindowedSum, updaterMetrics.activeRestoreRatioSensor); + recordRatio(now, runOnceLatencyWindow, standbyRestoreTimeWindowedSum, updaterMetrics.standbyRestoreRatioSensor); + } + + private void recordRatio(final long now, + final double runOnceLatencyWindow, + final WindowedSum windowedSum, + final Sensor ratioSensor) { + if (runOnceLatencyWindow > 0.0) { + final double elapsedTime = windowedSum.measure(metricsConfig, now); + ratioSensor.record(elapsedTime / runOnceLatencyWindow, now); + } else { + ratioSensor.record(0.0, now); + } + } + } private final Time time; @@ -1035,10 +1095,10 @@ private Stream streamOfNonPausedTasks() { private class StateUpdaterMetrics { private static final String STATE_LEVEL_GROUP = "stream-state-updater-metrics"; - private static final String IDLE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "being idle"; - private static final String RESTORE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "restoring active tasks"; - private static final String UPDATE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "updating standby tasks"; - private static final String CHECKPOINT_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "checkpointing tasks restored progress"; + private static final String IDLE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "being idle"; + private static final String RESTORE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "restoring active tasks"; + private static final String UPDATE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "updating standby tasks"; + private static final String CHECKPOINT_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "checkpointing tasks restored progress"; private static final String RESTORE_RECORDS_RATE_DESCRIPTION = RATE_DESCRIPTION + "records restored"; private static final String RESTORE_RATE_DESCRIPTION = RATE_DESCRIPTION + "restore calls triggered"; @@ -1089,19 +1149,19 @@ private StateUpdaterMetrics(final StreamsMetricsImpl metrics, final String threa allMetricNames.push(metricName); this.idleRatioSensor = metrics.threadLevelSensor(threadId, "idle-ratio", RecordingLevel.INFO); - this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); + this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Value()); allSensors.add(this.idleRatioSensor); this.activeRestoreRatioSensor = metrics.threadLevelSensor(threadId, "active-restore-ratio", RecordingLevel.INFO); - this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); + this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Value()); allSensors.add(this.activeRestoreRatioSensor); this.standbyRestoreRatioSensor = metrics.threadLevelSensor(threadId, "standby-update-ratio", RecordingLevel.INFO); - this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Avg()); + this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Value()); allSensors.add(this.standbyRestoreRatioSensor); this.checkpointRatioSensor = metrics.threadLevelSensor(threadId, "checkpoint-ratio", RecordingLevel.INFO); - this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg()); + this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Value()); allSensors.add(this.checkpointRatioSensor); this.restoreSensor = metrics.threadLevelSensor(threadId, "restore-records", RecordingLevel.INFO); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index 641fdf9470e21..a23a50e755aa5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -150,12 +150,13 @@ public int hashCode() { public static final String OPERATIONS = " operations"; public static final String TOTAL_DESCRIPTION = "The total number of "; public static final String RATE_DESCRIPTION = "The average per-second number of "; - public static final String RATIO_DESCRIPTION = "The fraction of time the thread spent on "; public static final String AVG_LATENCY_DESCRIPTION = "The average latency of "; public static final String MAX_LATENCY_DESCRIPTION = "The maximum latency of "; public static final String LATENCY_DESCRIPTION_SUFFIX = " in milliseconds"; public static final String RATE_DESCRIPTION_PREFIX = "The average number of "; public static final String RATE_DESCRIPTION_SUFFIX = " per second"; + public static final String WINDOWED_RATIO_DESCRIPTION_PREFIX = "The ratio, over a rolling measurement window, "; + public static final String THREAD_TIME_UNIT_DESCRIPTION = "of the time this thread spent "; public static final String RECORD_E2E_LATENCY = "record-e2e-latency"; public static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX = diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java index 73fa53ebcdd04..d9e8f668c137d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java @@ -31,7 +31,9 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORDS_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_TIME_UNIT_DESCRIPTION; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATIO_DESCRIPTION_PREFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor; @@ -82,16 +84,16 @@ private ThreadMetrics() {} private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = "The average punctuate latency"; private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = "The maximum punctuate latency"; private static final String PROCESS_RATIO_DESCRIPTION = - "The ratio, over a rolling measurement window, of the time this thread spent " + + WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "processing active tasks to the total elapsed time in that window."; private static final String PUNCTUATE_RATIO_DESCRIPTION = - "The ratio, over a rolling measurement window, of the time this thread spent " + + WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "punctuating active tasks to the total elapsed time in that window."; private static final String POLL_RATIO_DESCRIPTION = - "The ratio, over a rolling measurement window, of the time this thread spent " + + WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "polling records from the consumer to the total elapsed time in that window."; private static final String COMMIT_RATIO_DESCRIPTION = - "The ratio, over a rolling measurement window, of the time this thread spent " + + WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "committing all tasks to the total elapsed time in that window."; private static final String BLOCKED_TIME_DESCRIPTION = "The total time the thread spent blocked on kafka in nanoseconds"; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java index 4f9d1b3c0e6b4..8087e28c8b338 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java @@ -1645,25 +1645,25 @@ public void shouldRecordMetrics() throws Exception { metricName = new MetricName("idle-ratio", "stream-state-updater-metrics", - "The fraction of time the thread spent on being idle", + "The ratio, over a rolling measurement window, of the time this thread spent being idle", tagMap); verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d)); metricName = new MetricName("active-restore-ratio", "stream-state-updater-metrics", - "The fraction of time the thread spent on restoring active tasks", + "The ratio, over a rolling measurement window, of the time this thread spent restoring active tasks", tagMap); verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d)); metricName = new MetricName("standby-update-ratio", "stream-state-updater-metrics", - "The fraction of time the thread spent on updating standby tasks", + "The ratio, over a rolling measurement window, of the time this thread spent updating standby tasks", tagMap); verifyMetric(metrics, metricName, is(0.0d)); metricName = new MetricName("checkpoint-ratio", "stream-state-updater-metrics", - "The fraction of time the thread spent on checkpointing tasks restored progress", + "The ratio, over a rolling measurement window, of the time this thread spent checkpointing tasks restored progress", tagMap); verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d));