From 9f1030b7c18fee1d3d11724858dbcab9ed3fb89f Mon Sep 17 00:00:00 2001 From: Vladimir Zhuk Date: Thu, 19 Mar 2026 10:34:09 +0100 Subject: [PATCH 1/4] chore(datadog_metrics sink): switch series v2 endpoint to zstd compression Co-Authored-By: Claude Sonnet 4.6 --- ...adog_metrics_zstd_series_v2.enhancement.md | 3 + src/sinks/datadog/metrics/config.rs | 33 +- src/sinks/datadog/metrics/encoder.rs | 814 ++++++++++++------ src/sinks/datadog/metrics/request_builder.rs | 20 +- src/sinks/datadog/metrics/service.rs | 8 +- 5 files changed, 606 insertions(+), 272 deletions(-) create mode 100644 changelog.d/24956_datadog_metrics_zstd_series_v2.enhancement.md diff --git a/changelog.d/24956_datadog_metrics_zstd_series_v2.enhancement.md b/changelog.d/24956_datadog_metrics_zstd_series_v2.enhancement.md new file mode 100644 index 0000000000000..03f1f0507037a --- /dev/null +++ b/changelog.d/24956_datadog_metrics_zstd_series_v2.enhancement.md @@ -0,0 +1,3 @@ +The `datadog_metrics` sink now uses zstd compression when submitting metrics to the Series v2 endpoint (`/api/v2/series`). Series v1 and Sketches continue to use zlib (deflate). + +authors: vladimir-dd diff --git a/src/sinks/datadog/metrics/config.rs b/src/sinks/datadog/metrics/config.rs index e296bd823d60c..4d5674464421d 100644 --- a/src/sinks/datadog/metrics/config.rs +++ b/src/sinks/datadog/metrics/config.rs @@ -87,11 +87,6 @@ impl DatadogMetricsEndpoint { } } - // Gets whether or not this is a series endpoint. - pub const fn is_series(self) -> bool { - matches!(self, Self::Series { .. }) - } - pub(super) const fn payload_limits(self) -> DatadogMetricsPayloadLimits { // from https://docs.datadoghq.com/api/latest/metrics/#submit-metrics let (uncompressed, compressed) = match self { @@ -112,6 +107,32 @@ impl DatadogMetricsEndpoint { compressed, } } + + /// Returns the compression scheme used for this endpoint. 
+ pub(super) const fn compression(self) -> DatadogMetricsCompression { + match self { + Self::Series(SeriesApiVersion::V2) => DatadogMetricsCompression::Zstd, + _ => DatadogMetricsCompression::Zlib, + } + } +} + +/// Selects the compressor for a given Datadog metrics endpoint. +#[derive(Clone, Copy, Debug)] +pub(super) enum DatadogMetricsCompression { + /// zlib (deflate) — used by Series v1 and Sketches. + Zlib, + /// zstd — used by Series v2. + Zstd, +} + +impl DatadogMetricsCompression { + pub(super) const fn content_encoding(self) -> &'static str { + match self { + Self::Zstd => "zstd", + Self::Zlib => "deflate", + } + } } /// Maps Datadog metric endpoints to their actual URI. @@ -270,7 +291,7 @@ impl DatadogMetricsConfig { endpoint_configuration, self.default_namespace.clone(), self.series_api_version, - )?; + ); let protocol = self.get_protocol(dd_common); let sink = DatadogMetricsSink::new( diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index 9f0b71c87ba98..7e99c70567a2b 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -16,7 +16,7 @@ use vector_lib::{ request_metadata::GroupedCountByteSize, }; -use super::config::{DatadogMetricsEndpoint, SeriesApiVersion}; +use super::config::{DatadogMetricsCompression, DatadogMetricsEndpoint, SeriesApiVersion}; use crate::{ common::datadog::{ DatadogMetricType, DatadogPoint, DatadogSeriesMetric, DatadogSeriesMetricMetadata, @@ -47,23 +47,6 @@ mod ddmetric_proto { include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs")); } -#[derive(Debug, Snafu)] -pub enum CreateError { - #[snafu(display("Invalid compressed/uncompressed payload size limits were given"))] - InvalidLimits, -} - -impl CreateError { - /// Gets the telemetry-friendly string version of this error. - /// - /// The value will be a short string with only lowercase letters and underscores. 
- pub const fn as_error_type(&self) -> &'static str { - match self { - Self::InvalidLimits => "invalid_payload_limits", - } - } -} - #[derive(Debug, Snafu)] pub enum EncoderError { #[snafu(display( @@ -133,6 +116,16 @@ impl FinishError { struct EncoderState { writer: Compressor, written: usize, + /// Upper bound on uncompressed bytes sitting in the compressor's internal buffer (written but + /// not yet flushed to `writer.get_ref()`). All compressors may buffer internally: zstd holds + /// up to 128 KB per block, zlib's BufWriter holds up to 4 KB. Since `get_ref().len()` only + /// reflects bytes that have been flushed through all layers, we track this bound to avoid + /// underestimating the compressed payload size. + /// + /// Increases by `n` on each write. Resets to `n` when a new compressed block is detected in + /// `writer.get_ref()` (the triggering write may straddle the block boundary, so `n` is a safe + /// upper bound on what remains buffered after the flush). + buffered_bound: usize, buf: Vec, processed: Vec, byte_size: GroupedCountByteSize, @@ -141,8 +134,9 @@ struct EncoderState { impl Default for EncoderState { fn default() -> Self { Self { - writer: get_compressor(), + writer: Compression::zlib_default().into(), written: 0, + buffered_bound: 0, buf: Vec::with_capacity(1024), processed: Vec::new(), byte_size: telemetry().create_request_count_byte_size(), @@ -164,45 +158,62 @@ pub struct DatadogMetricsEncoder { impl DatadogMetricsEncoder { /// Creates a new `DatadogMetricsEncoder` for the given endpoint. 
- pub fn new( - endpoint: DatadogMetricsEndpoint, - default_namespace: Option, - ) -> Result { + pub fn new(endpoint: DatadogMetricsEndpoint, default_namespace: Option) -> Self { let payload_limits = endpoint.payload_limits(); - Self::with_payload_limits( + + Self { endpoint, - default_namespace, - payload_limits.uncompressed, - payload_limits.compressed, - ) + default_namespace: default_namespace.map(Arc::from), + uncompressed_limit: payload_limits.uncompressed, + compressed_limit: payload_limits.compressed, + state: EncoderState { + writer: endpoint.compression().compressor(), + ..Default::default() + }, + log_schema: log_schema(), + origin_product_value: *ORIGIN_PRODUCT_VALUE, + } } +} +#[cfg(test)] +impl DatadogMetricsEncoder { /// Creates a new `DatadogMetricsEncoder` for the given endpoint, with specific payload limits. + /// + /// Only available in tests; production code always uses the API-defined limits via `new`. pub fn with_payload_limits( endpoint: DatadogMetricsEndpoint, default_namespace: Option, uncompressed_limit: usize, compressed_limit: usize, - ) -> Result { - let (uncompressed_limit, compressed_limit) = - validate_payload_size_limits(endpoint, uncompressed_limit, compressed_limit) - .ok_or(CreateError::InvalidLimits)?; - - Ok(Self { + ) -> Self { + Self { endpoint, default_namespace: default_namespace.map(Arc::from), uncompressed_limit, compressed_limit, - state: EncoderState::default(), + state: EncoderState { + writer: endpoint.compression().compressor(), + ..Default::default() + }, log_schema: log_schema(), origin_product_value: *ORIGIN_PRODUCT_VALUE, - }) + } + } + + /// Returns the current `buffered_bound` value for white-box testing of zstd block-flush reset. 
+ fn buffered_bound(&self) -> usize { + self.state.buffered_bound } } impl DatadogMetricsEncoder { fn reset_state(&mut self) -> EncoderState { - mem::take(&mut self.state) + let new_state = EncoderState { + writer: self.endpoint.compression().compressor(), + ..Default::default() + }; + mem::replace(&mut self.state, new_state) } fn encode_single_metric(&mut self, metric: Metric) -> Result, EncoderError> { @@ -342,24 +353,36 @@ impl DatadogMetricsEncoder { // compressor might have internal state around checksums, etc, that can't be similarly // rolled back. // - // Our strategy is thus: simply take the encoded-but-decompressed size and see if it would - // fit within the compressed limit. In `get_endpoint_payload_size_limits`, we calculate the - // expected maximum overhead of zlib based on all input data being incompressible, which - // zlib ensures will be the worst case as it can figure out whether a compressed or - // uncompressed block would take up more space _before_ choosing which strategy to go with. + // Strategy: split the estimate into two parts: + // 1. Bytes already flushed to the output buffer (`get_ref().len()`) — exact compressed size. + // 2. Bytes still in the compressor's internal buffer plus this new metric — estimated via + // max_compressed_size(buffered_bound + n) (worst-case upper bound). // - // Thus, simply put, we've already accounted for the uncertainty by making our check here - // assume the worst case while our limits assume the worst case _overhead_. Maybe our - // numbers are technically off in the end, but `finish` catches that for us, too. - let compressed_len = self.state.writer.get_ref().len(); - let max_compressed_metric_len = n + max_compressed_overhead_len(n); - if compressed_len + max_compressed_metric_len > self.compressed_limit { + // All compressors may buffer data internally before flushing to the output: zstd buffers + // up to 128 KB per block, zlib's BufWriter holds up to 4 KB. 
`get_ref().len()` only + // reflects bytes that have been flushed through all layers. We track `buffered_bound` — + // an upper bound on uncompressed bytes written but not yet visible in `get_ref()` — and + // include it in the estimate for all compressor types. + let compression = self.endpoint.compression(); + let flushed_compressed = self.state.writer.get_ref().len(); + if flushed_compressed + compression.max_compressed_size(self.state.buffered_bound + n) + > self.compressed_limit + { return Ok(false); } // We should be safe to write our holding buffer to the compressor and store the metric. + // + // Update buffered_bound: if a new block appeared in the output (flushed_compressed grew), + // reset to n — the triggering write may straddle the block boundary, so n is a safe upper + // bound on what remains buffered. Otherwise accumulate. self.state.writer.write_all(&self.state.buf)?; self.state.written += n; + if self.state.writer.get_ref().len() > flushed_compressed { + self.state.buffered_bound = n; + } else { + self.state.buffered_bound += n; + } Ok(true) } @@ -383,7 +406,10 @@ impl DatadogMetricsEncoder { // Make sure we've written our header already. if self.state.written == 0 { match write_payload_header(self.endpoint, &mut self.state.writer) { - Ok(n) => self.state.written += n, + Ok(n) => { + self.state.written += n; + self.state.buffered_bound += n; + } Err(_) => return Ok(Some(metric)), } } @@ -874,75 +900,47 @@ fn generate_series_metrics( }]) } -fn get_compressor() -> Compressor { - // We use the "zlib default" compressor because it's all Datadog supports, and adding it - // generically to `Compression` would make things a little weird because of the conversion trait - // implementations that are also only none vs gzip. - Compression::zlib_default().into() -} - -const fn max_uncompressed_header_len() -> usize { - SERIES_PAYLOAD_HEADER.len() + SERIES_PAYLOAD_FOOTER.len() -} - -// Datadog ingest APIs accept zlib, which is what we're accounting for here. 
By default, zlib -// has a 2 byte header and 4 byte CRC trailer. [1] -// -// [1] https://www.zlib.net/zlib_tech.html -const ZLIB_HEADER_TRAILER: usize = 6; - -const fn max_compression_overhead_len(compressed_limit: usize) -> usize { - // We calculate the overhead as the zlib header/trailer plus the worst case overhead of - // compressing `compressed_limit` bytes, such that we assume all of the data we write may not be - // compressed at all. - ZLIB_HEADER_TRAILER + max_compressed_overhead_len(compressed_limit) -} - -const fn max_compressed_overhead_len(len: usize) -> usize { - // Datadog ingest APIs accept zlib, which is what we're accounting for here. - // - // Deflate, the underlying compression algorithm, has a technique to ensure that input data - // can't be encoded in such a way where it's expanded by a meaningful amount. This technique - // allows storing blocks of uncompressed data with only 5 bytes of overhead per block. - // Technically, the blocks can be up to 65KB in Deflate, but modern zlib implementations use - // block sizes of 16KB. [1][2] - // - // We calculate the overhead of compressing a given `len` bytes as the worst case of that many - // bytes being written to the compressor and being unable to be compressed at all - // - // [1] https://www.zlib.net/zlib_tech.html - // [2] https://www.bolet.org/~pornin/deflate-flush-fr.html - const STORED_BLOCK_SIZE: usize = 16384; - (1 + len.saturating_sub(ZLIB_HEADER_TRAILER) / STORED_BLOCK_SIZE) * 5 -} - -const fn validate_payload_size_limits( - endpoint: DatadogMetricsEndpoint, - uncompressed_limit: usize, - compressed_limit: usize, -) -> Option<(usize, usize)> { - if endpoint.is_series() { - // For series, we need to make sure the uncompressed limit can account for the header/footer - // we would add that wraps the encoded metrics up in the expected JSON object. 
This does - // imply that adding 1 to this limit would be allowed, and obviously we can't encode a - // series metric in a single byte, but this is just a simple sanity check, not an exhaustive - // search of the absolute bare minimum size. - let header_len = max_uncompressed_header_len(); - if uncompressed_limit <= header_len { - return None; +impl DatadogMetricsCompression { + fn compressor(self) -> Compressor { + match self { + Self::Zstd => Compression::zstd_default().into(), + Self::Zlib => Compression::zlib_default().into(), } } - // Get the maximum possible overhead of the compression container, based on the incoming - // _uncompressed_ limit. We use the uncompressed limit because we're calculating the maximum - // overhead in the case that, theoretically, none of the input data was compressible. This - // possibility is essentially impossible, but serves as a proper worst-case-scenario check. - let max_compression_overhead = max_compression_overhead_len(uncompressed_limit); - if compressed_limit <= max_compression_overhead { - return None; + /// Returns the worst-case compressed size of `n` uncompressed bytes. + /// + /// For zlib (deflate), the worst case occurs when data is entirely incompressible and stored in + /// uncompressed blocks (5 bytes overhead per 16 KB block, as per the DEFLATE spec). + /// + /// For zstd, this uses the same formula as `ZSTD_compressBound` from the zstd C library. + const fn max_compressed_size(self, n: usize) -> usize { + match self { + Self::Zlib => { + // Deflate stores incompressible data in uncompressed blocks of up to 16 KB, + // each with 5 bytes of overhead (1 byte header + 2 byte length + 2 byte ~length). + // We subtract the zlib frame (2-byte header + 4-byte CRC trailer) from the block + // count since those bytes are not stored-block data. 
+ // See: https://www.zlib.net/zlib_tech.html + const STORED_BLOCK_SIZE: usize = 16384; + const STORED_BLOCK_OVERHEAD: usize = 5; + const ZLIB_FRAME: usize = 6; + n + (1 + n.saturating_sub(ZLIB_FRAME) / STORED_BLOCK_SIZE) * STORED_BLOCK_OVERHEAD + } + Self::Zstd => { + // zstd_safe::compress_bound is not const, so we use the same formula it uses + // internally: srcSize + (srcSize >> 8) + small correction for inputs < 128 KB. + // See: https://github.com/facebook/zstd/blob/dev/lib/zstd.h (ZSTD_compressBound) + const ZSTD_128KB: usize = 128 << 10; + n + (n >> 8) + + if n < ZSTD_128KB { + (ZSTD_128KB - n) >> 11 + } else { + 0 + } + } + } } - - Some((uncompressed_limit, compressed_limit)) } fn write_payload_header( @@ -983,15 +981,10 @@ fn write_payload_footer( #[cfg(test)] mod tests { - use std::{ - io::{self, copy}, - num::NonZeroU32, - sync::Arc, - }; + use std::{io, num::NonZeroU32, sync::Arc}; - use bytes::{BufMut, Bytes, BytesMut}; + use bytes::{BufMut, Bytes}; use chrono::{DateTime, TimeZone, Timelike, Utc}; - use flate2::read::ZlibDecoder; use proptest::{ arbitrary::any, collection::btree_map, num::f64::POSITIVE as ARB_POSITIVE_F64, prop_assert, proptest, strategy::Strategy, string::string_regex, @@ -1010,19 +1003,30 @@ mod tests { use super::{ DatadogMetricsEncoder, EncoderError, ddmetric_proto, encode_proto_key_and_message, - encode_tags, encode_timestamp, generate_series_metrics, get_compressor, - get_sketch_payload_sketches_field_number, max_compression_overhead_len, - max_uncompressed_header_len, series_to_proto_message, sketch_to_proto_message, - validate_payload_size_limits, write_payload_footer, write_payload_header, + encode_tags, encode_timestamp, generate_series_metrics, + get_sketch_payload_sketches_field_number, series_to_proto_message, sketch_to_proto_message, + write_payload_footer, write_payload_header, }; use crate::{ common::datadog::DatadogMetricType, - sinks::datadog::metrics::{ - config::{DatadogMetricsEndpoint, SeriesApiVersion}, - 
encoder::{DEFAULT_DD_ORIGIN_PRODUCT_VALUE, ORIGIN_PRODUCT_VALUE}, + sinks::{ + datadog::metrics::{ + config::{DatadogMetricsCompression, DatadogMetricsEndpoint, SeriesApiVersion}, + encoder::{DEFAULT_DD_ORIGIN_PRODUCT_VALUE, ORIGIN_PRODUCT_VALUE}, + }, + util::{Compression, Compressor}, }, }; + const fn max_uncompressed_header_len(endpoint: DatadogMetricsEndpoint) -> usize { + match endpoint { + DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => { + super::SERIES_PAYLOAD_HEADER.len() + super::SERIES_PAYLOAD_FOOTER.len() + } + _ => 0, + } + } + fn get_simple_counter() -> Metric { let value = MetricValue::Counter { value: 3.14 }; Metric::new("basic_counter", MetricKind::Incremental, value).with_timestamp(Some(ts())) @@ -1048,8 +1052,8 @@ mod tests { .with_timestamp(Some(ts())) } - fn get_compressed_empty_series_payload() -> Bytes { - let mut compressor = get_compressor(); + fn get_compressed_empty_series_v1_payload() -> Bytes { + let mut compressor = Compressor::from(Compression::zlib_default()); _ = write_payload_header( DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), @@ -1066,14 +1070,108 @@ mod tests { } fn get_compressed_empty_sketches_payload() -> Bytes { - get_compressor().finish().expect("should not fail").freeze() + Compressor::from(Compression::zlib_default()) + .finish() + .expect("should not fail") + .freeze() } - fn decompress_payload(payload: Bytes) -> io::Result { + fn get_compressed_empty_series_v2_payload() -> Bytes { + Compressor::from(Compression::zstd_default()) + .finish() + .expect("should not fail") + .freeze() + } + + fn decompress_zlib_payload(payload: Bytes) -> io::Result { + use bytes::BytesMut; + use flate2::read::ZlibDecoder; + use std::io::copy; let mut decompressor = ZlibDecoder::new(&payload[..]); let mut decompressed = BytesMut::new().writer(); - let result = copy(&mut decompressor, &mut decompressed); - result.map(|_| decompressed.into_inner().freeze()) + copy(&mut decompressor, &mut decompressed)?; + 
Ok(decompressed.into_inner().freeze()) + } + + fn decompress_zstd_payload(payload: Bytes) -> io::Result { + let decompressed = zstd::decode_all(&payload[..])?; + Ok(Bytes::from(decompressed)) + } + + /// Returns the number of bytes added to the compressor's output buffer after writing `n` + /// bytes of high-entropy data. Measures only the *incremental* bytes, not the frame overhead + /// that `finish()` would append (Adler-32 / empty final block for zlib, end frame for zstd). + /// + /// This mirrors how `try_compress_buffer` uses `max_compressed_size`: it checks how many + /// more compressed bytes would be produced, against the current running output length. + /// Compresses `n` bytes of high-entropy (worst-case for compression) data and returns the + /// total output size after `finish()`. + fn total_compressed_len(compression: DatadogMetricsCompression, n: usize) -> usize { + use std::io::Write as _; + // Xorshift64 — period 2^64-1, passes BigCrush, produces statistically random bytes + // that neither zlib nor zstd can compress significantly. + let mut state = 0xdeadbeef_cafebabe_u64; + let data: Vec = (0..n) + .map(|_| { + state ^= state << 13; + state ^= state >> 7; + state ^= state << 17; + state as u8 + }) + .collect(); + let mut compressor = compression.compressor(); + compressor.write_all(&data).expect("write should succeed"); + compressor.finish().expect("finish should succeed").len() + } + + /// Validates that `max_compressed_size(n)` is a true upper bound on the compressed bytes + /// attributable to `n` uncompressed bytes, for both zlib and zstd. + /// + /// We measure `total_compressed_len(n) - total_compressed_len(0)` to strip the fixed frame + /// overhead (header + trailer) written regardless of input size, isolating the bytes + /// contributed by the data itself. + #[test] + fn max_compressed_size_is_upper_bound() { + // zlib stored-block boundary: 16 384 bytes; zstd block boundary: 131 072 bytes. 
+ let test_sizes = [ + 0, 1, 100, 1_000, 16_383, 16_384, 16_385, 32_767, 32_768, 131_071, 131_072, 131_073, + 500_000, + ]; + + let zlib_frame = total_compressed_len(DatadogMetricsCompression::Zlib, 0); + let zstd_frame = total_compressed_len(DatadogMetricsCompression::Zstd, 0); + + // The formula must not overestimate by more than 1% of input + 64 bytes (a small + // constant that covers the zstd correction term for very small inputs). + let max_slack = |n: usize| n / 100 + 64; + + for &n in &test_sizes { + let actual_zlib = total_compressed_len(DatadogMetricsCompression::Zlib, n) - zlib_frame; + let max_zlib = DatadogMetricsCompression::Zlib.max_compressed_size(n); + assert!( + actual_zlib <= max_zlib, + "zlib n={n}: formula underestimates: actual={actual_zlib} > max={max_zlib}" + ); + assert!( + max_zlib - actual_zlib <= max_slack(n), + "zlib n={n}: formula overestimates: slack={} > {}", + max_zlib - actual_zlib, + max_slack(n) + ); + + let actual_zstd = total_compressed_len(DatadogMetricsCompression::Zstd, n) - zstd_frame; + let max_zstd = DatadogMetricsCompression::Zstd.max_compressed_size(n); + assert!( + actual_zstd <= max_zstd, + "zstd n={n}: formula underestimates: actual={actual_zstd} > max={max_zstd}" + ); + assert!( + max_zstd - actual_zstd <= max_slack(n), + "zstd n={n}: formula overestimates: slack={} > {}", + max_zstd - actual_zstd, + max_slack(n) + ); + } } fn ts() -> DateTime { @@ -1149,8 +1247,7 @@ mod tests { #[test] fn incorrect_metric_for_endpoint_causes_error() { // Series metrics can't go to the sketches endpoint. 
- let mut sketch_encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None) - .expect("default payload size limits should be valid"); + let mut sketch_encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None); let series_result = sketch_encoder.try_encode(get_simple_counter()); assert!(matches!( series_result.err(), @@ -1159,8 +1256,7 @@ mod tests { // And sketches can't go to the series endpoint. let mut series_v1_encoder = - DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None) - .expect("default payload size limits should be valid"); + DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None); let sketch_result = series_v1_encoder.try_encode(get_simple_sketch()); assert!(matches!( sketch_result.err(), @@ -1168,8 +1264,7 @@ mod tests { )); let mut series_v2_encoder = - DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None) - .expect("default payload size limits should be valid"); + DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None); let sketch_result = series_v2_encoder.try_encode(get_simple_sketch()); assert!(matches!( sketch_result.err(), @@ -1380,8 +1475,7 @@ mod tests { // This is a simple test where we ensure that a single metric, with the default limits, can // be encoded without hitting any errors. let mut encoder = - DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None) - .expect("default payload size limits should be valid"); + DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None); let counter = get_simple_counter(); let expected = counter.clone(); @@ -1404,8 +1498,7 @@ mod tests { // This is a simple test where we ensure that a single metric, with the default limits, can // be encoded without hitting any errors. 
let mut encoder = - DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None) - .expect("default payload size limits should be valid"); + DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None); let counter = get_simple_counter(); let expected = counter.clone(); @@ -1427,8 +1520,7 @@ mod tests { fn encode_single_sketch_metric_with_default_limits() { // This is a simple test where we ensure that a single metric, with the default limits, can // be encoded without hitting any errors. - let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None) - .expect("default payload size limits should be valid"); + let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None); let sketch = get_simple_sketch(); let expected = sketch.clone(); @@ -1450,8 +1542,7 @@ mod tests { fn encode_empty_sketch() { // This is a simple test where we ensure that a single metric, with the default limits, can // be encoded without hitting any errors. - let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None) - .expect("default payload size limits should be valid"); + let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None); let sketch = Metric::new( "empty", MetricKind::Incremental, @@ -1511,77 +1602,6 @@ mod tests { assert_eq!(normal_buf, incremental_buf); } - #[test] - fn payload_size_limits_series() { - // Get the maximum length of the header/trailer data. - let header_len = max_uncompressed_header_len(); - - // This is too small. - let result = validate_payload_size_limits( - DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), - header_len, - usize::MAX, - ); - assert_eq!(result, None); - - // This is just right. 
- let result = validate_payload_size_limits( - DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), - header_len + 1, - usize::MAX, - ); - assert_eq!(result, Some((header_len + 1, usize::MAX))); - - // Get the maximum compressed overhead length, based on our input uncompressed size. This - // represents the worst case overhead based on the input data (of length usize::MAX, in this - // case) being entirely incompressible. - let compression_overhead_len = max_compression_overhead_len(usize::MAX); - - // This is too small. - let result = validate_payload_size_limits( - DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), - usize::MAX, - compression_overhead_len, - ); - assert_eq!(result, None); - - // This is just right. - let result = validate_payload_size_limits( - DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), - usize::MAX, - compression_overhead_len + 1, - ); - assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1))); - } - - #[test] - fn payload_size_limits_sketches() { - // There's no lower bound on uncompressed size for the sketches payload. - let result = validate_payload_size_limits(DatadogMetricsEndpoint::Sketches, 0, usize::MAX); - assert_eq!(result, Some((0, usize::MAX))); - - // Get the maximum compressed overhead length, based on our input uncompressed size. This - // represents the worst case overhead based on the input data (of length usize::MAX, in this - // case) being entirely incompressible. - let compression_overhead_len = max_compression_overhead_len(usize::MAX); - - // This is too small. - let result = validate_payload_size_limits( - DatadogMetricsEndpoint::Sketches, - usize::MAX, - compression_overhead_len, - ); - assert_eq!(result, None); - - // This is just right. 
- let result = validate_payload_size_limits( - DatadogMetricsEndpoint::Sketches, - usize::MAX, - compression_overhead_len + 1, - ); - assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1))); - } - #[test] fn default_payload_limits_are_endpoint_aware() { let v1 = DatadogMetricsEndpoint::Series(SeriesApiVersion::V1).payload_limits(); @@ -1609,8 +1629,7 @@ mod tests { let mut encoder = DatadogMetricsEncoder::new( DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None, - ) - .expect("default payload size limits should be valid"); + ); let mut next_pending = Vec::new(); let mut hit_limit = false; @@ -1658,14 +1677,14 @@ mod tests { // We manually create the encoder with an arbitrarily low "uncompressed" limit but high // "compressed" limit to exercise the codepath that should avoid encoding a metric when the // uncompressed payload would exceed the limit. - let header_len = max_uncompressed_header_len(); + let header_len = + max_uncompressed_header_len(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1)); let mut encoder = DatadogMetricsEncoder::with_payload_limits( DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None, header_len + 1, usize::MAX, - ) - .expect("payload size limits should be valid"); + ); // Trying to encode a metric that would cause us to exceed our uncompressed limits will // _not_ return an error from `try_encode`, but instead will simply return back the metric @@ -1684,11 +1703,11 @@ mod tests { let (payload, processed) = result.unwrap(); assert_eq!( payload.uncompressed_byte_size, - max_uncompressed_header_len() + max_uncompressed_header_len(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1)) ); assert_eq!( payload.into_payload(), - get_compressed_empty_series_payload() + get_compressed_empty_series_v1_payload() ); assert_eq!(processed.len(), 0); } @@ -1703,8 +1722,7 @@ mod tests { None, 1, usize::MAX, - ) - .expect("payload size limits should be valid"); + ); // Trying to encode a metric that would cause us to exceed our 
uncompressed limits will // _not_ return an error from `try_encode`, but instead will simply return back the metric @@ -1740,8 +1758,7 @@ mod tests { None, uncompressed_limit, compressed_limit, - ) - .expect("payload size limits should be valid"); + ); // Trying to encode a metric that would cause us to exceed our compressed limits will // _not_ return an error from `try_encode`, but instead will simply return back the metric @@ -1760,11 +1777,11 @@ mod tests { let (payload, processed) = result.unwrap(); assert_eq!( payload.uncompressed_byte_size, - max_uncompressed_header_len() + max_uncompressed_header_len(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1)) ); assert_eq!( payload.into_payload(), - get_compressed_empty_series_payload() + get_compressed_empty_series_v1_payload() ); assert_eq!(processed.len(), 0); } @@ -1775,14 +1792,13 @@ mod tests { // "uncompressed" limit to exercise the codepath that should avoid encoding a metric when the // compressed payload would exceed the limit. let uncompressed_limit = 128; - let compressed_limit = 16; + let compressed_limit = 32; let mut encoder = DatadogMetricsEncoder::with_payload_limits( DatadogMetricsEndpoint::Sketches, None, uncompressed_limit, compressed_limit, - ) - .expect("payload size limits should be valid"); + ); // Trying to encode a metric that would cause us to exceed our compressed limits will // _not_ return an error from `try_encode`, but instead will simply return back the metric @@ -1807,6 +1823,284 @@ mod tests { assert_eq!(processed.len(), 0); } + #[test] + fn encode_series_v2_breaks_out_when_limit_reached_compressed() { + // We manually create the encoder with an arbitrarily low "compressed" limit but high + // "uncompressed" limit to exercise the codepath that should avoid encoding a metric when the + // compressed payload would exceed the limit. 
+ let uncompressed_limit = 128; + let compressed_limit = 32; + let mut encoder = DatadogMetricsEncoder::with_payload_limits( + DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), + None, + uncompressed_limit, + compressed_limit, + ); + + // Trying to encode a metric that would cause us to exceed our compressed limits will + // _not_ return an error from `try_encode`, but instead will simply return back the metric + // as it could not be added. + let counter = get_simple_counter(); + let result = encoder.try_encode(counter.clone()); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), Some(counter)); + + // And similarly, since we didn't actually encode a metric, we _should_ be able to finish + // this payload, but it will be empty (effectively, the header/footer will exist) and no + // processed metrics should be returned. + let result = encoder.finish(); + assert!(result.is_ok()); + + let (payload, processed) = result.unwrap(); + assert_eq!(payload.uncompressed_byte_size, 0); + assert_eq!( + payload.into_payload(), + get_compressed_empty_series_v2_payload() + ); + assert_eq!(processed.len(), 0); + } + + #[test] + fn zstd_v2_payload_never_exceeds_512kb_with_incompressible_data() { + // End-to-end regression test using the real 512 KB compressed limit. + // + // Metric names are generated with a xorshift64 PRNG producing random printable ASCII + // (6.5 bits of entropy per byte), making them effectively incompressible for zstd. + // This makes the capacity estimate tight, so the test validates both directions: + // + // Safety (upper bound): payload ≤ 512 KB. + // Without the fix, the encoder ignores zstd's internal 128 KB buffer. When the + // encoder finally stops, finish() flushes that hidden buffer on top of the already + // ~511 KB payload → ~639 KB → TooLarge. + // + // Utilization (lower bound): payload > 95% of 512 KB. + // With incompressible data, actual_compressed ≈ max_cs(uncompressed), so the + // estimate is tight. 
The ~2.5% gap comes from: (1) compressible proto framing
+        // (field tags, timestamps, metadata) that zstd compresses better than max_cs
+        // predicts, (2) the max_cs overhead term (~0.4%), and (3) one-metric stopping
+        // granularity (~1%).
+
+        // xorshift64 PRNG: 'm' plus 4999 random printable ASCII chars per metric name
+        // (0x21..=0x7E, 94 values ≈ 6.55 bits/byte). Long names ensure the random portion
+        // dominates the compressible proto framing, maximizing utilization.
+        const PRINTABLE_START: u8 = 0x21;
+        const PRINTABLE_END: u8 = 0x7E;
+        const PRINTABLE_LEN: u64 = (PRINTABLE_END - PRINTABLE_START + 1) as u64; // 94
+        let mut xor_state = 0xdeadbeef_cafebabe_u64;
+        let mut next_name = || -> String {
+            std::iter::once('m')
+                .chain((0..4999).map(|_| {
+                    xor_state ^= xor_state << 13;
+                    xor_state ^= xor_state >> 7;
+                    xor_state ^= xor_state << 17;
+                    (PRINTABLE_START + (xor_state % PRINTABLE_LEN) as u8) as char
+                }))
+                .collect()
+        };
+
+        let mut encoder =
+            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None);
+
+        let mut accepted = 0usize;
+        loop {
+            let metric = Metric::new(
+                next_name(),
+                MetricKind::Incremental,
+                MetricValue::Counter {
+                    value: (accepted + 1) as f64,
+                },
+            )
+            .with_timestamp(Some(ts()));
+
+            match encoder.try_encode(metric) {
+                Ok(None) => accepted += 1,
+                Ok(Some(_)) => break,
+                Err(e) => panic!("unexpected encoding error: {e}"),
+            }
+        }
+
+        assert!(accepted > 0, "at least one metric must be accepted");
+
+        let compressed_limit = DatadogMetricsEndpoint::Series(SeriesApiVersion::V2)
+            .payload_limits()
+            .compressed;
+
+        let (payload, _) = encoder.finish().unwrap_or_else(|e| {
+            panic!(
+                "finish() returned an error after {accepted} metrics — \
+                the capacity estimate failed to prevent overflow: {e:?}"
+            )
+        });
+        let payload_len = payload.into_payload().len();
+
+        // Safety: the hard limit must never be exceeded.
+ assert!( + payload_len <= compressed_limit, + "payload ({payload_len} bytes) exceeded the {compressed_limit}-byte compressed limit" + ); + + // Utilization: the encoder should use at least 95% of the available capacity. + let min_utilization = compressed_limit * 95 / 100; + assert!( + payload_len > min_utilization, + "payload ({payload_len} bytes) is below 95% of the {compressed_limit}-byte limit \ + ({min_utilization} bytes) — estimate may be over-conservative" + ); + } + + #[test] + fn compressed_limit_is_respected_regardless_of_compressor_internal_buffering() { + // Regression test for zstd's internal buffering hiding the compressed-size check. + // + // zstd buffers up to 128 KB internally before flushing a block to the output Vec. + // The old capacity check used `get_ref().len()` alone as "compressed bytes so far", which + // returns 0 until zstd's first 128 KB block completes. This caused the encoder to accept + // metrics indefinitely — finish() would then return TooLarge, triggering expensive + // re-encoding. + // + // The fix splits the estimate: exact compressed size for flushed blocks, plus + // max_compressed_size for the unflushed portion (bytes still in zstd's internal buffer). + // This is accurate for flushed data and bounded for unflushed data. + // + // At compressed_limit=512, no zstd block will flush (needs 128 KB of input), so + // get_ref().len() stays 0 throughout. The old code would accept all 100 metrics; + // the new code stops after a handful. 
+ let compressed_limit = 512; + let mut encoder = DatadogMetricsEncoder::with_payload_limits( + DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), + None, + 1_000_000, + compressed_limit, + ); + + let mut accepted = 0; + for i in 0..100 { + let metric = Metric::new( + format!("counter_{i:0>20}"), + MetricKind::Incremental, + MetricValue::Counter { + value: (i + 1) as f64, + }, + ) + .with_timestamp(Some(ts())); + match encoder.try_encode(metric) { + Ok(None) => accepted += 1, + Ok(Some(_)) => break, + Err(e) => panic!("unexpected encoding error: {e}"), + } + } + + assert!(accepted > 0, "encoder should accept at least one metric"); + assert!( + accepted < 10, + "encoder accepted too many metrics — compressed limit was likely not enforced (accepted={accepted})" + ); + + let result = encoder.finish(); + assert!( + result.is_ok(), + "finish() must not return TooLarge: {:?}", + result.err() + ); + let (payload, _) = result.unwrap(); + assert!( + payload.into_payload().len() <= compressed_limit, + "payload exceeded compressed_limit" + ); + } + + #[test] + fn zstd_buffered_bound_resets_to_last_metric_size_after_block_flush() { + // White-box test: directly verifies that buffered_bound resets to exactly n (the last + // metric's encoded size) when a zstd block flush occurs, not to 0 or some other value. + // + // buffered_bound is an upper bound on bytes in zstd's internal buffer. On each write it + // accumulates (+= n). When a flush is detected (get_ref().len() grows), it resets to n — + // meaning only the triggering write could straddle the block boundary. + // + // If it reset to 0 instead, subsequent capacity checks would degenerate to + // flushed_compressed + max_cs(n) + // which vastly underestimates for any data written after the flush, re-introducing the + // original blind spot. If it failed to reset at all, the encoder would become + // over-conservative and reject valid metrics after the first flush. + // + // Strategy: + // 1. 
Measure a single metric's encoded size by inspecting buffered_bound after one write. + // 2. Feed metrics into a second encoder (with unlimited limits) until buffered_bound + // *decreases*, which signals a block flush. Assert the post-flush value equals + // exactly one metric's encoded size. + + let make_metric = |i: usize| { + // Fixed-width name (600-char zero-padded) gives a constant per-metric encoded size. + // Value is (i + 1) to ensure it is never 0.0: proto3 omits default (zero) values, + // which would make the first metric smaller than the rest. + Metric::new( + format!("counter_{i:0>600}"), + MetricKind::Incremental, + MetricValue::Counter { + value: (i + 1) as f64, + }, + ) + .with_timestamp(Some(ts())) + }; + + // Step 1: measure a single metric's encoded size. + let metric_size = { + let mut probe = DatadogMetricsEncoder::with_payload_limits( + DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), + None, + usize::MAX, + usize::MAX, + ); + assert!( + probe.try_encode(make_metric(0)).unwrap().is_none(), + "first metric must be accepted" + ); + probe.buffered_bound() + }; + assert!(metric_size > 0, "encoded metric must be non-empty"); + + // Step 2: encode metrics until buffered_bound decreases (= flush detected). + let mut encoder = DatadogMetricsEncoder::with_payload_limits( + DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), + None, + usize::MAX, + usize::MAX, + ); + + let mut prev_buffered = 0usize; + let mut flush_seen = false; + + for i in 0..1000 { + let metric = make_metric(i); + match encoder.try_encode(metric) { + Ok(None) => {} + Ok(Some(_)) => panic!("unexpected rejection at i={i} with unlimited limits"), + Err(e) => panic!("unexpected error at i={i}: {e}"), + } + + let curr = encoder.buffered_bound(); + + if curr < prev_buffered { + // A block flush just occurred: buffered_bound must reset to exactly n. 
+ assert_eq!( + curr, metric_size, + "after block flush, buffered_bound should reset to one metric's encoded size \ + ({metric_size} bytes) but got {curr}" + ); + flush_seen = true; + break; + } + + prev_buffered = curr; + } + + assert!( + flush_seen, + "no zstd block flush detected after 1000 metrics — increase loop bound or metric size" + ); + } + fn arb_counter_metric() -> impl Strategy { let name = string_regex("[a-zA-Z][a-zA-Z0-9_]{8,96}").expect("regex should not be invalid"); let value = ARB_POSITIVE_F64; @@ -1827,7 +2121,7 @@ mod tests { proptest! { #[test] - fn encoding_check_for_payload_limit_edge_cases( + fn encoding_check_for_payload_limit_edge_cases_v1( uncompressed_limit in 0..64_000_000usize, compressed_limit in 0..10_000_000usize, metric in arb_counter_metric(), @@ -1838,25 +2132,51 @@ mod tests { // // We check this with targeted unit tests as well but this is some cheap insurance to // show that we're hopefully not missing any particular corner cases. - let result = DatadogMetricsEncoder::with_payload_limits( + let mut encoder = DatadogMetricsEncoder::with_payload_limits( + DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), + None, + uncompressed_limit, + compressed_limit, + ); + _ = encoder.try_encode(metric); + + if let Ok((payload, _processed)) = encoder.finish() { + let payload = payload.into_payload(); + prop_assert!(payload.len() <= compressed_limit); + + // V1 uses zlib/deflate. 
+ let result = decompress_zlib_payload(payload); + prop_assert!(result.is_ok()); + + let decompressed = result.unwrap(); + prop_assert!(decompressed.len() <= uncompressed_limit); + } + } + + #[test] + fn encoding_check_for_payload_limit_edge_cases_v2( + uncompressed_limit in 0..10_000_000usize, + compressed_limit in 0..1_000_000usize, + metric in arb_counter_metric(), + ) { + let mut encoder = DatadogMetricsEncoder::with_payload_limits( DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None, uncompressed_limit, compressed_limit, ); - if let Ok(mut encoder) = result { - _ = encoder.try_encode(metric); + _ = encoder.try_encode(metric); - if let Ok((payload, _processed)) = encoder.finish() { - let payload = payload.into_payload(); - prop_assert!(payload.len() <= compressed_limit); + if let Ok((payload, _processed)) = encoder.finish() { + let payload = payload.into_payload(); + prop_assert!(payload.len() <= compressed_limit); - let result = decompress_payload(payload); - prop_assert!(result.is_ok()); + // V2 uses zstd. 
+ let result = decompress_zstd_payload(payload); + prop_assert!(result.is_ok()); - let decompressed = result.unwrap(); - prop_assert!(decompressed.len() <= uncompressed_limit); - } + let decompressed = result.unwrap(); + prop_assert!(decompressed.len() <= uncompressed_limit); } } } diff --git a/src/sinks/datadog/metrics/request_builder.rs b/src/sinks/datadog/metrics/request_builder.rs index 3b1c2f4607d39..ba8a7723dbf07 100644 --- a/src/sinks/datadog/metrics/request_builder.rs +++ b/src/sinks/datadog/metrics/request_builder.rs @@ -9,19 +9,13 @@ use vector_lib::{ use super::{ config::{DatadogMetricsEndpoint, DatadogMetricsEndpointConfiguration, SeriesApiVersion}, - encoder::{CreateError, DatadogMetricsEncoder, EncoderError, FinishError}, + encoder::{DatadogMetricsEncoder, EncoderError, FinishError}, service::DatadogMetricsRequest, }; use crate::sinks::util::{IncrementalRequestBuilder, metadata::RequestMetadataBuilder}; #[derive(Debug, Snafu)] pub enum RequestBuilderError { - #[snafu( - context(false), - display("Failed to build the request builder: {source}") - )] - FailedToBuild { source: CreateError }, - #[snafu(context(false), display("Failed to encode metric: {source}"))] FailedToEncode { source: EncoderError }, @@ -40,7 +34,6 @@ impl RequestBuilderError { /// many events were dropped as a result. pub fn into_parts(self) -> (String, &'static str, u64) { match self { - Self::FailedToBuild { source } => (source.to_string(), source.as_error_type(), 0), // Encoding errors always happen at the per-metric level, so we could only ever drop a // single metric/event at a time. 
Self::FailedToEncode { source } => (source.to_string(), source.as_error_type(), 1), @@ -81,18 +74,18 @@ impl DatadogMetricsRequestBuilder { endpoint_configuration: DatadogMetricsEndpointConfiguration, default_namespace: Option, series_api_version: SeriesApiVersion, - ) -> Result { - Ok(Self { + ) -> Self { + Self { endpoint_configuration, series_encoder: DatadogMetricsEncoder::new( DatadogMetricsEndpoint::Series(series_api_version), default_namespace.clone(), - )?, + ), sketches_encoder: DatadogMetricsEncoder::new( DatadogMetricsEndpoint::Sketches, default_namespace, - )?, - }) + ), + } } const fn get_encoder( @@ -247,6 +240,7 @@ impl IncrementalRequestBuilder<((Option>, DatadogMetricsEndpoint), Vec< payload, uri, content_type: ddmetrics_metadata.endpoint.content_type(), + content_encoding: ddmetrics_metadata.endpoint.compression().content_encoding(), finalizers: ddmetrics_metadata.finalizers, metadata: request_metadata, } diff --git a/src/sinks/datadog/metrics/service.rs b/src/sinks/datadog/metrics/service.rs index e40c7cd91c406..024493ed88fc2 100644 --- a/src/sinks/datadog/metrics/service.rs +++ b/src/sinks/datadog/metrics/service.rs @@ -44,6 +44,7 @@ pub struct DatadogMetricsRequest { pub payload: Bytes, pub uri: Uri, pub content_type: &'static str, + pub content_encoding: &'static str, pub finalizers: EventFinalizers, pub metadata: RequestMetadata, } @@ -63,11 +64,6 @@ impl DatadogMetricsRequest { HeaderValue::from_str(&key).expect("API key should be only valid ASCII characters") }, ); - // Requests to the metrics endpoints can be compressed, and there's almost no reason to - // _not_ compress them given tha t metric data, when encoded, is very repetitive. Thus, - // here and through the sink code, we always compress requests. Datadog also only supports - // zlib (DEFLATE) compression, which is why it's hard-coded here vs being set via the common - // `Compression` value that most sinks utilize. 
let request = Request::post(self.uri) .header("DD-API-KEY", api_key) // TODO: The Datadog Agent sends this header to indicate the version of the Go library @@ -82,7 +78,7 @@ impl DatadogMetricsRequest { // this header. .header("DD-Agent-Payload", "4.87.0") .header(CONTENT_TYPE, self.content_type) - .header(CONTENT_ENCODING, "deflate"); + .header(CONTENT_ENCODING, self.content_encoding); request.body(Body::from(self.payload)) } From 48bdb12f7eb9f8398d8a698d2a0758b46ca53f2f Mon Sep 17 00:00:00 2001 From: Vladimir Zhuk Date: Thu, 19 Mar 2026 19:02:30 +0100 Subject: [PATCH 2/4] chore(regression): switch statsd_to_datadog_metrics to ingress_throughput The egress_throughput goal measures compressed bytes received by the blackhole. Switching v2 series from zlib to zstd produces smaller compressed payloads (better compression ratio), which registers as a false regression in egress bytes/sec. ingress_throughput measures how fast Vector consumes statsd data from the generator, which is compression-agnostic and reflects actual pipeline performance. Co-Authored-By: Claude Opus 4.6 (1M context) --- regression/cases/statsd_to_datadog_metrics/experiment.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression/cases/statsd_to_datadog_metrics/experiment.yaml b/regression/cases/statsd_to_datadog_metrics/experiment.yaml index 7c0e069a9eddc..3650e2c1157a2 100644 --- a/regression/cases/statsd_to_datadog_metrics/experiment.yaml +++ b/regression/cases/statsd_to_datadog_metrics/experiment.yaml @@ -1,4 +1,4 @@ -optimization_goal: egress_throughput +optimization_goal: ingress_throughput target: name: vector From e2da674c0d32ecf5497c9d7712e5f54997ee7e4b Mon Sep 17 00:00:00 2001 From: Vladimir Zhuk Date: Fri, 20 Mar 2026 10:11:42 +0100 Subject: [PATCH 3/4] fix(datadog_metrics sink): prevent division-by-zero in proptest with zero limits Start proptest ranges at 1 instead of 0 for uncompressed_limit and compressed_limit. 
The old validate_payload_size_limits rejected zero limits, but with_payload_limits is now infallible, so finish() can panic on division-by-zero when computing recommended_splits. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/sinks/datadog/metrics/encoder.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index 7e99c70567a2b..4743a3ff66575 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -2122,8 +2122,8 @@ mod tests { proptest! { #[test] fn encoding_check_for_payload_limit_edge_cases_v1( - uncompressed_limit in 0..64_000_000usize, - compressed_limit in 0..10_000_000usize, + uncompressed_limit in 1..64_000_000usize, + compressed_limit in 1..10_000_000usize, metric in arb_counter_metric(), ) { // We simply try to encode a single metric into an encoder, and make sure that when we @@ -2155,8 +2155,8 @@ mod tests { #[test] fn encoding_check_for_payload_limit_edge_cases_v2( - uncompressed_limit in 0..10_000_000usize, - compressed_limit in 0..1_000_000usize, + uncompressed_limit in 1..10_000_000usize, + compressed_limit in 1..1_000_000usize, metric in arb_counter_metric(), ) { let mut encoder = DatadogMetricsEncoder::with_payload_limits( From a4f8e56d645c9734c41067f766b1c4282fe7c387 Mon Sep 17 00:00:00 2001 From: Vladimir Zhuk Date: Fri, 20 Mar 2026 11:47:01 +0100 Subject: [PATCH 4/4] chore(datadog_metrics sink): switch sketches endpoint to zstd compression Sketches endpoint now uses zstd instead of zlib, matching Series v2. Only Series v1 remains on zlib. Validated against real Datadog API: 36/36 correctness checks passed, all 18 metrics match between v1 and v2. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- src/sinks/datadog/metrics/config.rs | 8 ++++---- src/sinks/datadog/metrics/encoder.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/sinks/datadog/metrics/config.rs b/src/sinks/datadog/metrics/config.rs index 4d5674464421d..a5fdccede6a14 100644 --- a/src/sinks/datadog/metrics/config.rs +++ b/src/sinks/datadog/metrics/config.rs @@ -111,8 +111,8 @@ impl DatadogMetricsEndpoint { /// Returns the compression scheme used for this endpoint. pub(super) const fn compression(self) -> DatadogMetricsCompression { match self { - Self::Series(SeriesApiVersion::V2) => DatadogMetricsCompression::Zstd, - _ => DatadogMetricsCompression::Zlib, + Self::Series(SeriesApiVersion::V1) => DatadogMetricsCompression::Zlib, + _ => DatadogMetricsCompression::Zstd, } } } @@ -120,9 +120,9 @@ impl DatadogMetricsEndpoint { /// Selects the compressor for a given Datadog metrics endpoint. #[derive(Clone, Copy, Debug)] pub(super) enum DatadogMetricsCompression { - /// zlib (deflate) — used by Series v1 and Sketches. + /// zlib (deflate) — used by Series v1. Zlib, - /// zstd — used by Series v2. + /// zstd — used by Series v2 and Sketches. Zstd, } diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index 4743a3ff66575..ac069af80eaac 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -1070,7 +1070,7 @@ mod tests { } fn get_compressed_empty_sketches_payload() -> Bytes { - Compressor::from(Compression::zlib_default()) + Compressor::from(Compression::zstd_default()) .finish() .expect("should not fail") .freeze()