Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.LongSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -24,8 +25,12 @@ public class RateByServiceTraceSampler implements Sampler, PrioritySampler, Remo
public static final String SAMPLING_AGENT_RATE = "_dd.agent_psr";

private static final double DEFAULT_RATE = 1.0;
private static final double MAX_RATE_INCREASE_FACTOR = 2.0;
static final long RAMP_UP_INTERVAL_NANOS = 1_000_000_000L;

private volatile RateSamplersByEnvAndService serviceRates = new RateSamplersByEnvAndService();
private long lastCappedNanos;
LongSupplier nanoTimeSupplier = System::nanoTime;

@Override
public <T extends CoreSpan<T>> boolean sample(final T span) {
Expand Down Expand Up @@ -62,6 +67,14 @@ private <T extends CoreSpan<T>> String getSpanEnv(final T span) {
return span.getTag("env", "");
}

/**
 * Tells whether a proposed rate is a large enough jump over the current one to be capped.
 *
 * @param oldRate the rate the proposed value is compared against
 * @param newRate the proposed replacement rate
 * @return {@code true} only when {@code oldRate} is non-zero and {@code newRate} is strictly
 *     greater than {@code oldRate * MAX_RATE_INCREASE_FACTOR}
 */
static boolean shouldCap(double oldRate, double newRate) {
  // A zero baseline has no meaningful multiple, so it is never capped.
  if (oldRate == 0) {
    return false;
  }
  final double ceiling = oldRate * MAX_RATE_INCREASE_FACTOR;
  return newRate > ceiling;
}

/**
 * Computes the highest rate allowed in a single step up from {@code oldRate}.
 *
 * @param oldRate the rate to step up from
 * @return {@code oldRate} multiplied by {@code MAX_RATE_INCREASE_FACTOR}
 */
static double cappedRate(double oldRate) {
  final double ceiling = MAX_RATE_INCREASE_FACTOR * oldRate;
  return ceiling;
}

@Override
public void onResponse(
final String endpoint, final Map<String, Map<String, Number>> responseJson) {
Expand All @@ -72,6 +85,13 @@ public void onResponse(
}

log.debug("Update service sampler rates: {} -> {}", endpoint, responseJson);

final RateSamplersByEnvAndService currentSnapshot = serviceRates;
final long now = nanoTimeSupplier.getAsLong();
final boolean canIncrease =
lastCappedNanos == 0 || (now - lastCappedNanos) >= RAMP_UP_INTERVAL_NANOS;
boolean anyCapped = false;

final TreeMap<String, TreeMap<String, RateSampler>> updatedEnvServiceRates =
new TreeMap<>(String::compareToIgnoreCase);

Expand All @@ -84,17 +104,38 @@ public void onResponse(

EnvAndService envAndService = EnvAndService.fromString(entry.getKey());
if (envAndService.isFallback()) {
double oldRate = currentSnapshot.getFallbackSampler().getSampleRate();
if (canIncrease && shouldCap(oldRate, rate)) {
rate = cappedRate(oldRate);
anyCapped = true;
} else if (!canIncrease && shouldCap(oldRate, rate)) {
rate = oldRate;
}
fallbackSampler = RateByServiceTraceSampler.createRateSampler(rate);
} else {
double oldRate =
currentSnapshot
.getSampler(envAndService.lowerEnv, envAndService.lowerService)
.getSampleRate();
if (canIncrease && shouldCap(oldRate, rate)) {
rate = cappedRate(oldRate);
anyCapped = true;
} else if (!canIncrease && shouldCap(oldRate, rate)) {
rate = oldRate;
}
final double effectiveRate = rate;
Map<String, RateSampler> serviceRates =
updatedEnvServiceRates.computeIfAbsent(
envAndService.lowerEnv, env -> new TreeMap<>(String::compareToIgnoreCase));

serviceRates.computeIfAbsent(
envAndService.lowerService,
service -> RateByServiceTraceSampler.createRateSampler(rate));
service -> RateByServiceTraceSampler.createRateSampler(effectiveRate));
}
}
if (canIncrease && anyCapped) {
lastCappedNanos = now;
}
serviceRates = new RateSamplersByEnvAndService(updatedEnvServiceRates, fallbackSampler);
}

Expand Down Expand Up @@ -128,6 +169,10 @@ private static final class RateSamplersByEnvAndService {
this.fallbackSampler = fallbackSampler;
}

/**
 * Exposes the fallback sampler held by this snapshot.
 *
 * @return the {@code fallbackSampler} field of this instance
 */
RateSampler getFallbackSampler() {
  return this.fallbackSampler;
}

// used in tests only
RateSampler getSampler(EnvAndService envAndService) {
return getSampler(envAndService.lowerEnv, envAndService.lowerService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,171 @@ class RateByServiceTraceSamplerTest extends DDCoreSpecification {
'manual.keep' | true | PrioritySampling.USER_KEEP
}

// Lower, equal, and exactly-doubled rates must all pass through uncapped.
def "shouldCap returns false when rate decreases or stays same"() {
  expect:
  !RateByServiceTraceSampler.shouldCap(previous, requested)

  where:
  previous | requested
  0.8      | 0.4
  0.5      | 0.5
  0.5      | 1.0 // 1.0 <= 0.5 * 2, no cap needed
}

// A zero baseline never triggers the cap, whatever the requested rate is.
def "shouldCap returns false when old rate is zero"() {
  expect:
  !RateByServiceTraceSampler.shouldCap(0.0, requested)

  where:
  requested << [0.5, 1.0]
}

// Any requested rate strictly above twice the previous one must be flagged for capping.
def "shouldCap returns true when new rate exceeds 2x old rate"() {
  expect:
  RateByServiceTraceSampler.shouldCap(previous, requested)

  where:
  previous | requested
  0.1      | 1.0
  0.2      | 0.8
  0.1      | 0.3
}

// cappedRate is expected to return exactly double its argument.
def "cappedRate returns 2x old rate"() {
  expect:
  RateByServiceTraceSampler.cappedRate(previous) == doubled

  where:
  previous | doubled
  0.1      | 0.2
  0.2      | 0.4
  0.4      | 0.8
}

// Feeds the sampler a target rate of 1.0 from a starting rate of 0.1, advancing a fake clock by
// one ramp-up interval before each response, and checks the rate climbs 0.2 -> 0.4 -> 0.8 -> 1.0
// for both the named service and the fallback entry.
def "ramp-up caps rate increases at 2x per interval"() {
  setup:
  def epsilon = 0.01
  long fakeNanos = 1_000_000_000L
  RateByServiceTraceSampler sampler = new RateByServiceTraceSampler()
  sampler.nanoTimeSupplier = { -> fakeNanos }

  // Set initial rate to 0.1
  String payload = '{"rate_by_service": {"service:foo,env:bar":0.1, "service:,env:":0.1}}'
  sampler.onResponse("traces", serializer.fromJson(payload))

  expect:
  Math.abs(sampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.1) < epsilon

  when: "agent restart sends rate 1.0, first interval"
  fakeNanos = fakeNanos + RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS
  payload = '{"rate_by_service": {"service:foo,env:bar":1.0, "service:,env:":1.0}}'
  sampler.onResponse("traces", serializer.fromJson(payload))

  then: "rate is capped at 2x = 0.2"
  Math.abs(sampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < epsilon
  Math.abs(sampler.serviceRates.getFallbackSampler().sampleRate - 0.2) < epsilon

  when: "second interval"
  fakeNanos = fakeNanos + RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS
  sampler.onResponse("traces", serializer.fromJson(payload))

  then: "rate doubles to 0.4"
  Math.abs(sampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.4) < epsilon
  Math.abs(sampler.serviceRates.getFallbackSampler().sampleRate - 0.4) < epsilon

  when: "third interval"
  fakeNanos = fakeNanos + RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS
  sampler.onResponse("traces", serializer.fromJson(payload))

  then: "rate doubles to 0.8"
  Math.abs(sampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.8) < epsilon
  Math.abs(sampler.serviceRates.getFallbackSampler().sampleRate - 0.8) < epsilon

  when: "fourth interval"
  fakeNanos = fakeNanos + RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS
  sampler.onResponse("traces", serializer.fromJson(payload))

  then: "rate reaches target 1.0 (2x=1.6 > 1.0)"
  Math.abs(sampler.serviceRates.getSampler("bar", "foo").sampleRate - 1.0) < epsilon
  Math.abs(sampler.serviceRates.getFallbackSampler().sampleRate - 1.0) < epsilon
}

// Sends 0.8 followed by 0.2 without advancing the fake clock and checks that the lower rate is
// in effect right away for both the named service and the fallback entry.
def "ramp-down applies immediately"() {
  setup:
  def epsilon = 0.01
  long fakeNanos = 1_000_000_000L
  RateByServiceTraceSampler sampler = new RateByServiceTraceSampler()
  sampler.nanoTimeSupplier = { -> fakeNanos }

  // Set initial rate to 0.8
  String highRates = '{"rate_by_service": {"service:foo,env:bar":0.8, "service:,env:":0.8}}'
  sampler.onResponse("traces", serializer.fromJson(highRates))

  when: "rate decreases to 0.2"
  String lowRates = '{"rate_by_service": {"service:foo,env:bar":0.2, "service:,env:":0.2}}'
  sampler.onResponse("traces", serializer.fromJson(lowRates))

  then: "decrease is applied immediately"
  Math.abs(sampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < epsilon
  Math.abs(sampler.serviceRates.getFallbackSampler().sampleRate - 0.2) < epsilon
}

// After one capped increase (0.1 -> 0.2), a second response at the same fake-clock instant must
// leave the rate at 0.2; once a full ramp-up interval has passed the rate may double to 0.4.
def "rate increase blocked during cooldown"() {
  setup:
  def epsilon = 0.01
  long fakeNanos = 1_000_000_000L
  RateByServiceTraceSampler sampler = new RateByServiceTraceSampler()
  sampler.nanoTimeSupplier = { -> fakeNanos }
  String fullRate = '{"rate_by_service": {"service:foo,env:bar":1.0}}'

  // Set initial rate to 0.1
  String lowRate = '{"rate_by_service": {"service:foo,env:bar":0.1}}'
  sampler.onResponse("traces", serializer.fromJson(lowRate))

  when: "rate jumps, first capped increase"
  fakeNanos = fakeNanos + RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS
  sampler.onResponse("traces", serializer.fromJson(fullRate))

  then: "capped to 0.2"
  Math.abs(sampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < epsilon

  when: "try again immediately (within cooldown)"
  sampler.onResponse("traces", serializer.fromJson(fullRate))

  then: "rate stays at 0.2 because cooldown hasn't elapsed"
  Math.abs(sampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < epsilon

  when: "after cooldown elapsed"
  fakeNanos = fakeNanos + RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS
  sampler.onResponse("traces", serializer.fromJson(fullRate))

  then: "rate doubles to 0.4"
  Math.abs(sampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.4) < epsilon
}

// Verifies that a blocked increase does not restart the cooldown: after a capped step
// (0.01 -> 0.02), a response half an interval later is held at 0.02, and a response another
// half interval later (one full interval after the capped step) is allowed to double to 0.04.
def "cooldown not reset by blocked increase"() {
  setup:
  RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler()
  long currentTime = 1_000_000_000L
  serviceSampler.nanoTimeSupplier = { -> currentTime }
  // The rates checked here are 0.01, 0.02 and 0.04, so the tolerance has to be well below the
  // 0.01 gap between neighbouring expected values; otherwise the checks barely discriminate.
  def tolerance = 0.001

  // Set initial low rate
  String response = '{"rate_by_service": {"service:foo,env:bar":0.01}}'
  serviceSampler.onResponse("traces", serializer.fromJson(response))

  expect:
  Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.01) < tolerance

  when: "wait for cooldown, apply increase: 0.01 -> 0.02"
  currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS
  response = '{"rate_by_service": {"service:foo,env:bar":1.0}}'
  serviceSampler.onResponse("traces", serializer.fromJson(response))

  then: "rate is capped at 2x = 0.02"
  Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.02) < tolerance

  when: "before cooldown elapses, send another increase - rate should be held and lastCapped NOT reset"
  currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS / 2
  serviceSampler.onResponse("traces", serializer.fromJson(response))

  then: "rate stays at 0.02 (cooldown)"
  Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.02) < tolerance

  when: "wait remaining half of cooldown from the original cap - should allow next ramp-up"
  currentTime += RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS / 2
  serviceSampler.onResponse("traces", serializer.fromJson(response))

  then: "rate doubles to 0.04 because lastCapped was NOT reset by the blocked increase"
  Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.04) < tolerance
}

def "not setting forced tracing via tag or setting it wrong value not causing exception"() {
setup:
def sampler = new RateByServiceTraceSampler()
Expand Down
Loading