Skip to content
4 changes: 4 additions & 0 deletions docs/streams/upgrade-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ Starting in Kafka Streams 2.6.x, a new processing mode is available, named EOS v

Since 2.6.0 release, Kafka Streams depends on a RocksDB version that requires MacOS 10.14 or higher.

## Streams API changes in 4.3.0

The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, and `poll-ratio`, along with streams state updater metrics `active-restore-ratio`, `standby-restore-ratio`, `idle-ratio`, and `checkpoint-ratio` have been updated. Each metric now reports, over a rolling measurement window, the ratio of time this thread spends performing the given action (`{action}`) to the total elapsed time in that window. The effective window duration is determined by the metrics configuration: `metrics.sample.window.ms` (per-sample window length) and `metrics.num.samples` (number of rolling windows).

## Streams API changes in 4.2.0

Kafka Streams now supports Dead Letter Queue (DLQ). A new config `errors.deadletterqueue.topic.name` allows to specify the name of the DLQ topic. When set and `DefaultProductionExceptionHandler` is used, records that cause exceptions will be forwarded to the DLQ topic. If a custom exception handler is used, it is up to the custom handler to build DLQ records to send, hence, depending on the implementation, the `errors.deadletterqueue.topic.name` configuration may be ignored. `org.apache.kafka.streams.errors.ProductionExceptionHandler$ProductionExceptionHandlerResponse` is deprecated and replaced by `org.apache.kafka.streams.errors.ProductionExceptionHandler$Response`. Methods `handle` and `handleSerializationException` in `org.apache.kafka.streams.errors.ProductionExceptionHandler` are deprecated and replaced by `handleError` and `handleSerializationError` respectively in order to use the new `Response` class. More details can be found in [KIP-1034](https://cwiki.apache.org/confluence/x/HwviEQ).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -67,8 +69,9 @@
import java.util.stream.Stream;

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_TIME_UNIT_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATIO_DESCRIPTION_PREFIX;

public class DefaultStateUpdater implements StateUpdater {

Expand All @@ -84,6 +87,14 @@ private class StateUpdaterThread extends Thread {
private final Map<TaskId, Task> updatingTasks = new ConcurrentHashMap<>();
private final Map<TaskId, Task> pausedTasks = new ConcurrentHashMap<>();

private final WindowedSum idleTimeWindowedSum = new WindowedSum();
private final WindowedSum checkpointTimeWindowedSum = new WindowedSum();
private final WindowedSum activeRestoreTimeWindowedSum = new WindowedSum();
private final WindowedSum standbyRestoreTimeWindowedSum = new WindowedSum();
private final MetricConfig metricsConfig;

private boolean timeWindowInitialized = false;

private long totalCheckpointLatency = 0L;

private volatile long fetchDeadlineClientInstanceId = -1L;
Expand All @@ -95,6 +106,7 @@ public StateUpdaterThread(final String name,
super(name);
this.changelogReader = changelogReader;
this.updaterMetrics = new StateUpdaterMetrics(metrics, name);
this.metricsConfig = metrics.metricsRegistry().config();
}

public Collection<Task> updatingTasks() {
Expand Down Expand Up @@ -144,6 +156,7 @@ public long numPausedActiveTasks() {
public void run() {
log.info("State updater thread started");
try {
initTimeWindowIfNeeded(time.milliseconds());
while (isRunning.get()) {
runOnce();
}
Expand Down Expand Up @@ -713,19 +726,63 @@ private void measureCheckpointLatency(final Runnable actionToMeasure) {
private void recordMetrics(final long now, final long totalLatency, final long totalWaitLatency) {
final long totalRestoreLatency = Math.max(0L, totalLatency - totalWaitLatency - totalCheckpointLatency);

updaterMetrics.idleRatioSensor.record((double) totalWaitLatency / totalLatency, now);
updaterMetrics.checkpointRatioSensor.record((double) totalCheckpointLatency / totalLatency, now);
recordWindowedSum(
now,
totalWaitLatency,
totalCheckpointLatency,
totalRestoreLatency * (changelogReader.isRestoringActive() ? 1.0d : 0.0d),
totalRestoreLatency * (changelogReader.isRestoringActive() ? 0.0d : 1.0d)
);

if (changelogReader.isRestoringActive()) {
updaterMetrics.activeRestoreRatioSensor.record((double) totalRestoreLatency / totalLatency, now);
updaterMetrics.standbyRestoreRatioSensor.record(0.0d, now);
} else {
updaterMetrics.standbyRestoreRatioSensor.record((double) totalRestoreLatency / totalLatency, now);
updaterMetrics.activeRestoreRatioSensor.record(0.0d, now);
}
recordRatios(now, totalLatency);

totalCheckpointLatency = 0L;
}

private void initTimeWindowIfNeeded(final long now) {
if (!timeWindowInitialized) {
idleTimeWindowedSum.record(metricsConfig, 0.0, now);
checkpointTimeWindowedSum.record(metricsConfig, 0.0, now);
activeRestoreTimeWindowedSum.record(metricsConfig, 0.0, now);
standbyRestoreTimeWindowedSum.record(metricsConfig, 0.0, now);
timeWindowInitialized = true;
}
}

private void recordWindowedSum(final long now,
final double idleTime,
final double checkpointTime,
final double activeRestoreTime,
final double standbyRestoreTime) {
idleTimeWindowedSum.record(metricsConfig, idleTime, now);
checkpointTimeWindowedSum.record(metricsConfig, checkpointTime, now);
activeRestoreTimeWindowedSum.record(metricsConfig, activeRestoreTime, now);
standbyRestoreTimeWindowedSum.record(metricsConfig, standbyRestoreTime, now);
}

private void recordRatios(final long now, final long totalTime) {
final double idleTime = idleTimeWindowedSum.measure(metricsConfig, now);
final double checkpointTime = checkpointTimeWindowedSum.measure(metricsConfig, now);
final double activeRestoreTime = activeRestoreTimeWindowedSum.measure(metricsConfig, now);
final double standbyRestoreTime = standbyRestoreTimeWindowedSum.measure(metricsConfig, now);

recordRatio(now, totalTime, idleTime, updaterMetrics.idleRatioSensor);
recordRatio(now, totalTime, checkpointTime, updaterMetrics.checkpointRatioSensor);
recordRatio(now, totalTime, activeRestoreTime, updaterMetrics.activeRestoreRatioSensor);
recordRatio(now, totalTime, standbyRestoreTime, updaterMetrics.standbyRestoreRatioSensor);
}

private void recordRatio(final long now,
final double totalTime,
final double elapsedTime,
final Sensor ratioSensor) {
if (totalTime > 0.0) {
ratioSensor.record(elapsedTime / totalTime, now);
} else {
ratioSensor.record(0.0, now);
}
}

}

private final Time time;
Expand Down Expand Up @@ -1035,10 +1092,10 @@ private Stream<Task> streamOfNonPausedTasks() {
private class StateUpdaterMetrics {
private static final String STATE_LEVEL_GROUP = "stream-state-updater-metrics";

private static final String IDLE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "being idle";
private static final String RESTORE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "restoring active tasks";
private static final String UPDATE_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "updating standby tasks";
private static final String CHECKPOINT_RATIO_DESCRIPTION = RATIO_DESCRIPTION + "checkpointing tasks restored progress";
private static final String IDLE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "being idle";
private static final String RESTORE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "restoring active tasks";
private static final String UPDATE_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "updating standby tasks";
private static final String CHECKPOINT_RATIO_DESCRIPTION = WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION + "checkpointing tasks restored progress";
private static final String RESTORE_RECORDS_RATE_DESCRIPTION = RATE_DESCRIPTION + "records restored";
private static final String RESTORE_RATE_DESCRIPTION = RATE_DESCRIPTION + "restore calls triggered";

Expand Down Expand Up @@ -1089,19 +1146,19 @@ private StateUpdaterMetrics(final StreamsMetricsImpl metrics, final String threa
allMetricNames.push(metricName);

this.idleRatioSensor = metrics.threadLevelSensor(threadId, "idle-ratio", RecordingLevel.INFO);
this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
this.idleRatioSensor.add(new MetricName("idle-ratio", STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Value());
allSensors.add(this.idleRatioSensor);

this.activeRestoreRatioSensor = metrics.threadLevelSensor(threadId, "active-restore-ratio", RecordingLevel.INFO);
this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
this.activeRestoreRatioSensor.add(new MetricName("active-restore-ratio", STATE_LEVEL_GROUP, RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Value());
allSensors.add(this.activeRestoreRatioSensor);

this.standbyRestoreRatioSensor = metrics.threadLevelSensor(threadId, "standby-update-ratio", RecordingLevel.INFO);
this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
this.standbyRestoreRatioSensor.add(new MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION, threadLevelTags), new Value());
allSensors.add(this.standbyRestoreRatioSensor);

this.checkpointRatioSensor = metrics.threadLevelSensor(threadId, "checkpoint-ratio", RecordingLevel.INFO);
this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg());
this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio", STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Value());
allSensors.add(this.checkpointRatioSensor);

this.restoreSensor = metrics.threadLevelSensor(threadId, "restore-records", RecordingLevel.INFO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,13 @@ public int hashCode() {
public static final String OPERATIONS = " operations";
public static final String TOTAL_DESCRIPTION = "The total number of ";
public static final String RATE_DESCRIPTION = "The average per-second number of ";
public static final String RATIO_DESCRIPTION = "The fraction of time the thread spent on ";
public static final String AVG_LATENCY_DESCRIPTION = "The average latency of ";
public static final String MAX_LATENCY_DESCRIPTION = "The maximum latency of ";
public static final String LATENCY_DESCRIPTION_SUFFIX = " in milliseconds";
public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
public static final String RATE_DESCRIPTION_SUFFIX = " per second";
public static final String WINDOWED_RATIO_DESCRIPTION_PREFIX = "The ratio, over a rolling measurement window, ";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static final String WINDOWED_RATIO_DESCRIPTION_PREFIX = "The ratio, over a rolling measurement window, ";
public static final String WINDOWED_RATIO_DESCRIPTION_PREFIX = "The ratio, over a rolling measurement window, of the time this thread spent ";

is it only used like this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now this prefix consts is only used for thread. But it could be used for describing against something other units in the future.
The name of this const doesn't carry meaning about the entity (e.g.time, thread) described, so I added another const here to address that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good, it’s your call, but I’d keep it simple and let whoever comes next make further adjustments if they’re ever needed.

public static final String THREAD_TIME_UNIT_DESCRIPTION = "of the time this thread spent ";

public static final String RECORD_E2E_LATENCY = "record-e2e-latency";
public static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORDS_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_TIME_UNIT_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.WINDOWED_RATIO_DESCRIPTION_PREFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor;
Expand Down Expand Up @@ -82,16 +84,16 @@ private ThreadMetrics() {}
private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = "The average punctuate latency";
private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = "The maximum punctuate latency";
private static final String PROCESS_RATIO_DESCRIPTION =
"The ratio, over a rolling measurement window, of the time this thread spent " +
WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION +
"processing active tasks to the total elapsed time in that window.";
private static final String PUNCTUATE_RATIO_DESCRIPTION =
"The ratio, over a rolling measurement window, of the time this thread spent " +
WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION +
"punctuating active tasks to the total elapsed time in that window.";
private static final String POLL_RATIO_DESCRIPTION =
"The ratio, over a rolling measurement window, of the time this thread spent " +
WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION +
"polling records from the consumer to the total elapsed time in that window.";
private static final String COMMIT_RATIO_DESCRIPTION =
"The ratio, over a rolling measurement window, of the time this thread spent " +
WINDOWED_RATIO_DESCRIPTION_PREFIX + THREAD_TIME_UNIT_DESCRIPTION +
"committing all tasks to the total elapsed time in that window.";
private static final String BLOCKED_TIME_DESCRIPTION =
"The total time the thread spent blocked on kafka in nanoseconds";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1645,25 +1645,25 @@ public void shouldRecordMetrics() throws Exception {

metricName = new MetricName("idle-ratio",
"stream-state-updater-metrics",
"The fraction of time the thread spent on being idle",
"The ratio, over a rolling measurement window, of the time this thread spent being idle",
tagMap);
verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d));

metricName = new MetricName("active-restore-ratio",
"stream-state-updater-metrics",
"The fraction of time the thread spent on restoring active tasks",
"The ratio, over a rolling measurement window, of the time this thread spent restoring active tasks",
tagMap);
verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d));

metricName = new MetricName("standby-update-ratio",
"stream-state-updater-metrics",
"The fraction of time the thread spent on updating standby tasks",
"The ratio, over a rolling measurement window, of the time this thread spent updating standby tasks",
tagMap);
verifyMetric(metrics, metricName, is(0.0d));

metricName = new MetricName("checkpoint-ratio",
"stream-state-updater-metrics",
"The fraction of time the thread spent on checkpointing tasks restored progress",
"The ratio, over a rolling measurement window, of the time this thread spent checkpointing tasks restored progress",
tagMap);
verifyMetric(metrics, metricName, greaterThanOrEqualTo(0.0d));

Expand Down