From 4cbda770c9ed99e3824781b0c1ea3390f5a78836 Mon Sep 17 00:00:00 2001
From: Adrien Boitreaud
Date: Sun, 1 Mar 2026 22:28:16 +0100
Subject: [PATCH] capture latency metrics

---
Review notes (this region is ignored when the patch is applied and is not
part of the commit message):

What the hunks below do:
  * Add four static long timestamps: spanStartTimeMs, connectedTimeMs,
    submittedTimeMs and runningTimeMs. createLauncherSpan() sets
    spanStartTimeMs from System.currentTimeMillis() and resets the other
    three to 0 just before publishing the new span.
  * stateChanged() now calls recordStateTimestamp(state), which stores the
    current time the first time CONNECTED, SUBMITTED or RUNNING is seen;
    a non-zero timestamp is kept until createLauncherSpan() resets it.
  * setTimingMetrics(span) is called immediately before span.finish() in
    the shutdown hook, in finishSpan() and in finishSpanWithThrowable().
    It sets spark.launcher.time_to_connected_ms,
    spark.launcher.time_to_submitted_ms and
    spark.launcher.time_to_running_ms to (state timestamp - spanStartTimeMs),
    skipping any state whose timestamp is still 0, and does nothing when
    spanStartTimeMs is not positive.

Points to confirm before merging:
  * NOTE(review): the durations are differences of System.currentTimeMillis()
    readings, i.e. wall-clock time. A clock adjustment between the two
    readings would skew a metric or make it negative. Consider whether a
    monotonic source is wanted here.
  * NOTE(review): the new fields are plain static (not volatile).
    createLauncherSpan(), finishSpan() and finishSpanWithThrowable() are
    static synchronized and stateChanged() synchronizes on
    SparkLauncherListener.class, so those paths share one monitor. Whether
    the shutdown-hook body also runs under that monitor is not visible in
    this diff. TODO confirm.
  * NOTE(review): the line breaks, indentation and blank context lines in
    this copy were reconstructed from the hunk line counts and 2-space Java
    style. Verify whitespace against the target file before applying.

 .../spark/SparkLauncherListener.java          | 45 +++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java
index fc1efb6894c..c926b42b1ef 100644
--- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java
+++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/SparkLauncherListener.java
@@ -25,6 +25,11 @@ public class SparkLauncherListener implements SparkAppHandle.Listener {
 
   private static volatile boolean shutdownHookRegistered = false;
 
+  private static long spanStartTimeMs = 0L;
+  private static long connectedTimeMs = 0L;
+  private static long submittedTimeMs = 0L;
+  private static long runningTimeMs = 0L;
+
   public static synchronized void createLauncherSpan(Object launcher) {
     if (launcherSpan != null) {
       return;
@@ -40,6 +45,10 @@ public static synchronized void createLauncherSpan(Object launcher) {
     span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
     setLauncherConfigTags(span, launcher);
     captureEmrStepId(span);
+    spanStartTimeMs = System.currentTimeMillis();
+    connectedTimeMs = 0L;
+    submittedTimeMs = 0L;
+    runningTimeMs = 0L;
     launcherSpan = span;
 
     if (!shutdownHookRegistered) {
@@ -52,6 +61,7 @@ public static synchronized void createLauncherSpan(Object launcher) {
                       AgentSpan s = launcherSpan;
                       if (s != null) {
                         log.info("Finishing spark.launcher span from shutdown hook");
+                        setTimingMetrics(s);
                         s.finish();
                         launcherSpan = null;
                       }
@@ -70,6 +80,7 @@ public static synchronized void finishSpan(boolean isError, String errorMessage)
       span.setTag(DDTags.ERROR_TYPE, "Spark Launcher Failed");
       span.setTag(DDTags.ERROR_MSG, errorMessage);
     }
+    setTimingMetrics(span);
     span.finish();
     launcherSpan = null;
   }
@@ -82,10 +93,26 @@ public static synchronized void finishSpanWithThrowable(Throwable throwable) {
     if (throwable != null) {
       span.addThrowable(throwable);
     }
+    setTimingMetrics(span);
     span.finish();
     launcherSpan = null;
   }
 
+  private static void setTimingMetrics(AgentSpan span) {
+    if (spanStartTimeMs <= 0L) {
+      return;
+    }
+    if (connectedTimeMs > 0L) {
+      span.setMetric("spark.launcher.time_to_connected_ms", connectedTimeMs - spanStartTimeMs);
+    }
+    if (submittedTimeMs > 0L) {
+      span.setMetric("spark.launcher.time_to_submitted_ms", submittedTimeMs - spanStartTimeMs);
+    }
+    if (runningTimeMs > 0L) {
+      span.setMetric("spark.launcher.time_to_running_ms", runningTimeMs - spanStartTimeMs);
+    }
+  }
+
   @Override
   public void stateChanged(SparkAppHandle handle) {
     synchronized (SparkLauncherListener.class) {
@@ -93,6 +120,7 @@ public void stateChanged(SparkAppHandle handle) {
       AgentSpan span = launcherSpan;
       if (span != null) {
         span.setTag("spark.launcher.app_state", state.toString());
+        recordStateTimestamp(state);
 
         String appId = handle.getAppId();
         if (appId != null) {
@@ -132,6 +160,23 @@ public void infoChanged(SparkAppHandle handle) {
     }
   }
 
+  private static void recordStateTimestamp(SparkAppHandle.State state) {
+    long now = System.currentTimeMillis();
+    if (state == SparkAppHandle.State.CONNECTED) {
+      if (connectedTimeMs == 0L) {
+        connectedTimeMs = now;
+      }
+    } else if (state == SparkAppHandle.State.SUBMITTED) {
+      if (submittedTimeMs == 0L) {
+        submittedTimeMs = now;
+      }
+    } else if (state == SparkAppHandle.State.RUNNING) {
+      if (runningTimeMs == 0L) {
+        runningTimeMs = now;
+      }
+    }
+  }
+
   private static void captureEmrStepId(AgentSpan span) {
     String stepId = EmrUtils.getEmrStepId();
     if (stepId != null) {