From c95286e6a59a4379242505180a9530ad11cf5ab1 Mon Sep 17 00:00:00 2001 From: "raphael.gavache" Date: Mon, 2 Mar 2026 14:11:18 -0500 Subject: [PATCH 1/2] Add capped sampling rate increases --- .../sampling/RateByServiceTraceSampler.java | 45 +++++- .../RateByServiceTraceSamplerTest.groovy | 129 ++++++++++++++++++ 2 files changed, 172 insertions(+), 2 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/sampling/RateByServiceTraceSampler.java b/dd-trace-core/src/main/java/datadog/trace/common/sampling/RateByServiceTraceSampler.java index 52957b643a0..0bd3610a24b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/sampling/RateByServiceTraceSampler.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/sampling/RateByServiceTraceSampler.java @@ -10,6 +10,7 @@ import java.util.Map; import java.util.TreeMap; import java.util.function.Function; +import java.util.function.LongSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,8 +25,11 @@ public class RateByServiceTraceSampler implements Sampler, PrioritySampler, Remo public static final String SAMPLING_AGENT_RATE = "_dd.agent_psr"; private static final double DEFAULT_RATE = 1.0; + static final long RAMP_UP_INTERVAL_NANOS = 1_000_000_000L; private volatile RateSamplersByEnvAndService serviceRates = new RateSamplersByEnvAndService(); + private long lastCappedNanos; + LongSupplier nanoTimeSupplier = System::nanoTime; @Override public > boolean sample(final T span) { @@ -62,6 +66,16 @@ private > String getSpanEnv(final T span) { return span.getTag("env", ""); } + static double cappedRate(double oldRate, double newRate, boolean canIncrease) { + if (newRate <= oldRate || oldRate == 0) { + return newRate; + } + if (!canIncrease) { + return oldRate; + } + return Math.min(oldRate * 2, newRate); + } + @Override public void onResponse( final String endpoint, final Map> responseJson) { @@ -72,6 +86,13 @@ public void onResponse( } log.debug("Update service sampler rates: {} -> {}", endpoint, responseJson); + + final RateSamplersByEnvAndService currentSnapshot = serviceRates; + final long now = nanoTimeSupplier.getAsLong(); + final boolean canIncrease = + lastCappedNanos == 0 || (now - lastCappedNanos) >= RAMP_UP_INTERVAL_NANOS; + boolean anyCapped = false; + final TreeMap> updatedEnvServiceRates = new TreeMap<>(String::compareToIgnoreCase); @@ -84,17 +105,33 @@ public void onResponse( EnvAndService envAndService = EnvAndService.fromString(entry.getKey()); if (envAndService.isFallback()) { - fallbackSampler = RateByServiceTraceSampler.createRateSampler(rate); + double oldRate = currentSnapshot.getFallbackSampler().getSampleRate(); + double effective = cappedRate(oldRate, rate, canIncrease); + if (effective != rate) { + anyCapped = true; + } + fallbackSampler = RateByServiceTraceSampler.createRateSampler(effective); } else { + double oldRate = + currentSnapshot + .getSampler(envAndService.lowerEnv, envAndService.lowerService) + .getSampleRate(); + double effective = cappedRate(oldRate, rate, canIncrease); + if (effective != rate) { + anyCapped = true; + } Map serviceRates = updatedEnvServiceRates.computeIfAbsent( envAndService.lowerEnv, env -> new TreeMap<>(String::compareToIgnoreCase)); serviceRates.computeIfAbsent( envAndService.lowerService, - service -> RateByServiceTraceSampler.createRateSampler(rate)); + service -> RateByServiceTraceSampler.createRateSampler(effective)); } } + if (anyCapped) { + lastCappedNanos = now; + } serviceRates = new RateSamplersByEnvAndService(updatedEnvServiceRates, fallbackSampler); } @@ -128,6 +165,10 @@ private static final class RateSamplersByEnvAndService { this.fallbackSampler = fallbackSampler; } + RateSampler getFallbackSampler() { + return fallbackSampler; + } + // used in tests only RateSampler getSampler(EnvAndService envAndService) { return getSampler(envAndService.lowerEnv, envAndService.lowerService); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/RateByServiceTraceSamplerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/RateByServiceTraceSamplerTest.groovy index 7d79eb2ee91..c0bd8404825 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/RateByServiceTraceSamplerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/RateByServiceTraceSamplerTest.groovy @@ -222,6 +222,135 @@ class RateByServiceTraceSamplerTest extends DDCoreSpecification { 'manual.keep' | true | PrioritySampling.USER_KEEP } + def "cappedRate returns new rate when decreasing"() { + expect: + RateByServiceTraceSampler.cappedRate(0.8, 0.4, true) == 0.4 + RateByServiceTraceSampler.cappedRate(0.8, 0.4, false) == 0.4 + RateByServiceTraceSampler.cappedRate(0.5, 0.5, true) == 0.5 + RateByServiceTraceSampler.cappedRate(0.5, 0.5, false) == 0.5 + } + + def "cappedRate returns new rate when old rate is zero"() { + expect: + RateByServiceTraceSampler.cappedRate(0.0, 0.5, true) == 0.5 + RateByServiceTraceSampler.cappedRate(0.0, 0.5, false) == 0.5 + } + + def "cappedRate caps increase to 2x when canIncrease"() { + expect: + RateByServiceTraceSampler.cappedRate(0.1, 1.0, true) == 0.2 + RateByServiceTraceSampler.cappedRate(0.2, 1.0, true) == 0.4 + RateByServiceTraceSampler.cappedRate(0.4, 1.0, true) == 0.8 + RateByServiceTraceSampler.cappedRate(0.4, 0.5, true) == 0.5 + } + + def "cappedRate holds old rate when canIncrease is false"() { + expect: + RateByServiceTraceSampler.cappedRate(0.1, 1.0, false) == 0.1 + RateByServiceTraceSampler.cappedRate(0.2, 0.8, false) == 0.2 + } + + def "ramp-up caps rate increases at 2x per interval"() { + setup: + RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler() + long currentTime = 1_000_000_000L + serviceSampler.nanoTimeSupplier = { -> currentTime } + def tolerance = 0.01 + + // Set initial rate to 0.1 + String response = '{"rate_by_service": {"service:foo,env:bar":0.1, "service:,env:":0.1}}' + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + expect: + Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.1) < tolerance + + when: "agent restart sends rate 1.0, first interval" + currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS + response = '{"rate_by_service": {"service:foo,env:bar":1.0, "service:,env:":1.0}}' + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + then: "rate is capped at 2x = 0.2" + Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance + Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.2) < tolerance + + when: "second interval" + currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + then: "rate doubles to 0.4" + Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.4) < tolerance + Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.4) < tolerance + + when: "third interval" + currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + then: "rate doubles to 0.8" + Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.8) < tolerance + Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.8) < tolerance + + when: "fourth interval" + currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + then: "rate reaches target 1.0 (2x=1.6 > 1.0)" + Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 1.0) < tolerance + Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 1.0) < tolerance + } + + def "ramp-down applies immediately"() { + setup: + RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler() + long currentTime = 1_000_000_000L + serviceSampler.nanoTimeSupplier = { -> currentTime } + def tolerance = 0.01 + + // Set initial rate to 0.8 + String response = '{"rate_by_service": {"service:foo,env:bar":0.8, "service:,env:":0.8}}' + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + when: "rate decreases to 0.2" + response = '{"rate_by_service": {"service:foo,env:bar":0.2, "service:,env:":0.2}}' + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + then: "decrease is applied immediately" + Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance + Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.2) < tolerance + } + + def "rate increase blocked during cooldown"() { + setup: + RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler() + long currentTime = 1_000_000_000L + serviceSampler.nanoTimeSupplier = { -> currentTime } + def tolerance = 0.01 + + // Set initial rate to 0.1 + String response = '{"rate_by_service": {"service:foo,env:bar":0.1}}' + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + when: "rate jumps, first capped increase" + currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS + response = '{"rate_by_service": {"service:foo,env:bar":1.0}}' + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + then: "capped to 0.2" + Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance + + when: "try again immediately (within cooldown)" + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + then: "rate stays at 0.2 because cooldown hasn't elapsed" + Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance + + when: "after cooldown elapsed" + currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + then: "rate doubles to 0.4" + Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.4) < tolerance + } + def "not setting forced tracing via tag or setting it wrong value not causing exception"() { setup: def sampler = new RateByServiceTraceSampler() From 866193c87750fd205c5bffe360a8cd74f5300567 Mon Sep 17 00:00:00 2001 From: "raphael.gavache" Date: Thu, 5 Mar 2026 17:18:17 -0500 Subject: [PATCH 2/2] fix --- .../sampling/RateByServiceTraceSampler.java | 34 ++++++---- .../RateByServiceTraceSamplerTest.groovy | 68 ++++++++++++++----- 2 files changed, 71 insertions(+), 31 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/sampling/RateByServiceTraceSampler.java b/dd-trace-core/src/main/java/datadog/trace/common/sampling/RateByServiceTraceSampler.java index 0bd3610a24b..6eb609f4efa 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/sampling/RateByServiceTraceSampler.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/sampling/RateByServiceTraceSampler.java @@ -25,6 +25,7 @@ public class RateByServiceTraceSampler implements Sampler, PrioritySampler, Remo public static final String SAMPLING_AGENT_RATE = "_dd.agent_psr"; private static final double DEFAULT_RATE = 1.0; + private static final double MAX_RATE_INCREASE_FACTOR = 2.0; static final long RAMP_UP_INTERVAL_NANOS = 1_000_000_000L; private volatile RateSamplersByEnvAndService serviceRates = new RateSamplersByEnvAndService(); @@ -66,14 +67,12 @@ private > String getSpanEnv(final T span) { return span.getTag("env", ""); } - static double cappedRate(double oldRate, double newRate, boolean canIncrease) { - if (newRate <= oldRate || oldRate == 0) { - return newRate; - } - if (!canIncrease) { - return oldRate; - } - return Math.min(oldRate * 2, newRate); + static boolean shouldCap(double oldRate, double newRate) { + return oldRate != 0 && newRate > oldRate * MAX_RATE_INCREASE_FACTOR; + } + + static double cappedRate(double oldRate) { + return oldRate * MAX_RATE_INCREASE_FACTOR; } @Override @@ -106,30 +105,35 @@ public void onResponse( EnvAndService envAndService = EnvAndService.fromString(entry.getKey()); if (envAndService.isFallback()) { double oldRate = currentSnapshot.getFallbackSampler().getSampleRate(); - double effective = cappedRate(oldRate, rate, canIncrease); - if (effective != rate) { + if (canIncrease && shouldCap(oldRate, rate)) { + rate = cappedRate(oldRate); anyCapped = true; + } else if (!canIncrease && shouldCap(oldRate, rate)) { + rate = oldRate; } - fallbackSampler = RateByServiceTraceSampler.createRateSampler(effective); + fallbackSampler = RateByServiceTraceSampler.createRateSampler(rate); } else { double oldRate = currentSnapshot .getSampler(envAndService.lowerEnv, envAndService.lowerService) .getSampleRate(); - double effective = cappedRate(oldRate, rate, canIncrease); - if (effective != rate) { + if (canIncrease && shouldCap(oldRate, rate)) { + rate = cappedRate(oldRate); anyCapped = true; + } else if (!canIncrease && shouldCap(oldRate, rate)) { + rate = oldRate; } + final double effectiveRate = rate; Map serviceRates = updatedEnvServiceRates.computeIfAbsent( envAndService.lowerEnv, env -> new TreeMap<>(String::compareToIgnoreCase)); serviceRates.computeIfAbsent( envAndService.lowerService, - service -> RateByServiceTraceSampler.createRateSampler(effective)); + service -> RateByServiceTraceSampler.createRateSampler(effectiveRate)); } } - if (anyCapped) { + if (canIncrease && anyCapped) { lastCappedNanos = now; } serviceRates = new RateSamplersByEnvAndService(updatedEnvServiceRates, fallbackSampler); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/RateByServiceTraceSamplerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/RateByServiceTraceSamplerTest.groovy index c0bd8404825..5844fc9d9ee 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/RateByServiceTraceSamplerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/sampling/RateByServiceTraceSamplerTest.groovy @@ -222,32 +222,31 @@ class RateByServiceTraceSamplerTest extends DDCoreSpecification { 'manual.keep' | true | PrioritySampling.USER_KEEP } - def "cappedRate returns new rate when decreasing"() { + def "shouldCap returns false when rate decreases or stays same"() { expect: - RateByServiceTraceSampler.cappedRate(0.8, 0.4, true) == 0.4 - RateByServiceTraceSampler.cappedRate(0.8, 0.4, false) == 0.4 - RateByServiceTraceSampler.cappedRate(0.5, 0.5, true) == 0.5 - RateByServiceTraceSampler.cappedRate(0.5, 0.5, false) == 0.5 + !RateByServiceTraceSampler.shouldCap(0.8, 0.4) + !RateByServiceTraceSampler.shouldCap(0.5, 0.5) + !RateByServiceTraceSampler.shouldCap(0.5, 1.0) // 1.0 <= 0.5 * 2, no cap needed } - def "cappedRate returns new rate when old rate is zero"() { + def "shouldCap returns false when old rate is zero"() { expect: - RateByServiceTraceSampler.cappedRate(0.0, 0.5, true) == 0.5 - RateByServiceTraceSampler.cappedRate(0.0, 0.5, false) == 0.5 + !RateByServiceTraceSampler.shouldCap(0.0, 0.5) + !RateByServiceTraceSampler.shouldCap(0.0, 1.0) } - def "cappedRate caps increase to 2x when canIncrease"() { + def "shouldCap returns true when new rate exceeds 2x old rate"() { expect: - RateByServiceTraceSampler.cappedRate(0.1, 1.0, true) == 0.2 - RateByServiceTraceSampler.cappedRate(0.2, 1.0, true) == 0.4 - RateByServiceTraceSampler.cappedRate(0.4, 1.0, true) == 0.8 - RateByServiceTraceSampler.cappedRate(0.4, 0.5, true) == 0.5 + RateByServiceTraceSampler.shouldCap(0.1, 1.0) + RateByServiceTraceSampler.shouldCap(0.2, 0.8) + RateByServiceTraceSampler.shouldCap(0.1, 0.3) } - def "cappedRate holds old rate when canIncrease is false"() { + def "cappedRate returns 2x old rate"() { expect: - RateByServiceTraceSampler.cappedRate(0.1, 1.0, false) == 0.1 - RateByServiceTraceSampler.cappedRate(0.2, 0.8, false) == 0.2 + RateByServiceTraceSampler.cappedRate(0.1) == 0.2 + RateByServiceTraceSampler.cappedRate(0.2) == 0.4 + RateByServiceTraceSampler.cappedRate(0.4) == 0.8 } def "ramp-up caps rate increases at 2x per interval"() { @@ -351,6 +350,43 @@ class RateByServiceTraceSamplerTest extends DDCoreSpecification { Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.4) < tolerance } + def "cooldown not reset by blocked increase"() { + setup: + RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler() + long currentTime = 1_000_000_000L + serviceSampler.nanoTimeSupplier = { -> currentTime } + def tolerance = 0.01 + + // Set initial low rate + String response = '{"rate_by_service": {"service:foo,env:bar":0.01}}' + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + expect: + Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.01) < tolerance + + when: "wait for cooldown, apply increase: 0.01 -> 0.02" + currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS + response = '{"rate_by_service": {"service:foo,env:bar":1.0}}' + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + then: "rate is capped at 2x = 0.02" + Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.02) < tolerance + + when: "before cooldown elapses, send another increase - rate should be held and lastCapped NOT reset" + currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS / 2 + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + then: "rate stays at 0.02 (cooldown)" + Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.02) < tolerance + + when: "wait remaining half of cooldown from the original cap - should allow next ramp-up" + currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS / 2 + serviceSampler.onResponse("traces", serializer.fromJson(response)) + + then: "rate doubles to 0.04 because lastCapped was NOT reset by the blocked increase" + Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.04) < tolerance + } + def "not setting forced tracing via tag or setting it wrong value not causing exception"() { setup: def sampler = new RateByServiceTraceSampler()