From c4391e632b86888d7dd3d168cd74b1b82ade6dc8 Mon Sep 17 00:00:00 2001 From: Johannes Geiger Date: Tue, 3 Mar 2026 22:04:09 +0000 Subject: [PATCH 1/3] feat(absolute_to_incremental transformer): #24655 Add a transformer to translate absolute to incremental metrics. --- Cargo.lock | 65 +----- Cargo.toml | 2 + changelog.d/24655.feature.md | 3 + src/transforms/absolute_to_incremental.rs | 231 ++++++++++++++++++++++ src/transforms/mod.rs | 2 + 5 files changed, 243 insertions(+), 60 deletions(-) create mode 100644 changelog.d/24655.feature.md create mode 100644 src/transforms/absolute_to_incremental.rs diff --git a/Cargo.lock b/Cargo.lock index 30537aabbffb5..8558be7f4643b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3950,12 +3950,12 @@ dependencies = [ [[package]] name = "evmap" -version = "11.0.0" +version = "10.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b8874945f036109c72242964c1174cf99434e30cfa45bf45fedc983f50046f8" +checksum = "6e3ea06a83f97d3dc2eb06e51e7a729b418f0717a5558a5c870e3d5156dc558d" dependencies = [ "hashbag", - "left-right", + "slab", "smallvec", ] @@ -4394,21 +4394,6 @@ dependencies = [ "tokio-io", ] -[[package]] -name = "generator" -version = "0.8.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52f04ae4152da20c76fe800fa48659201d5cf627c5149ca0b707b69d7eef6cf9" -dependencies = [ - "cc", - "cfg-if", - "libc", - "log", - "rustversion", - "windows-link 0.2.0", - "windows-result", -] - [[package]] name = "generic-array" version = "0.14.7" @@ -4507,7 +4492,7 @@ dependencies = [ "serde_derive", "serde_json", "simpl", - "smpl_jwt 0.8.0", + "smpl_jwt", "time", "tokio", ] @@ -6194,17 +6179,6 @@ dependencies = [ "spin 0.5.2", ] -[[package]] -name = "left-right" -version = "0.11.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f0c21e4c8ff95f487fb34e6f9182875f42c84cef966d29216bf115d9bba835a" -dependencies = [ - "crossbeam-utils", - "loom", - "slab", -] - [[package]] name = "lexical-core" version = "1.0.6" @@ -6464,19 +6438,6 @@ dependencies = [ "prost-types 0.12.6", ] -[[package]] -name = "loom" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" -dependencies = [ - "cfg-if", - "generator", - "scoped-tls", - "tracing 0.1.44", - "tracing-subscriber", -] - [[package]] name = "lru" version = "0.12.5" @@ -10478,22 +10439,6 @@ dependencies = [ "time", ] -[[package]] -name = "smpl_jwt" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a45432dc6b645c982d4ef68966b4507fb57d98ca67289df780cd7ca4a4369f5e" -dependencies = [ - "base64 0.22.1", - "log", - "openssl", - "serde", - "serde_derive", - "serde_json", - "simpl", - "time", -] - [[package]] name = "snafu" version = "0.7.5" @@ -12688,7 +12633,7 @@ dependencies = [ "serial_test", "similar-asserts", "smallvec", - "smpl_jwt 0.9.0", + "smpl_jwt", "snafu 0.8.9", "snap", "socket2 0.5.10", diff --git a/Cargo.toml b/Cargo.toml index 56e3376b29f02..5798ce4427e8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -749,6 +749,7 @@ transforms-logs = [ "transforms-trace_to_log" ] transforms-metrics = [ + "transforms-absolute_to_incremental", "transforms-aggregate", "transforms-filter", "transforms-incremental_to_absolute", @@ -760,6 +761,7 @@ transforms-metrics = [ "transforms-throttle", ] +transforms-absolute_to_incremental = [] transforms-aggregate = [] transforms-aws_ec2_metadata = ["dep:arc-swap"] transforms-dedupe = ["transforms-impl-dedupe"] diff --git a/changelog.d/24655.feature.md b/changelog.d/24655.feature.md new file mode 100644 index 0000000000000..2ced168bd4290 --- /dev/null +++ b/changelog.d/24655.feature.md @@ -0,0 +1,3 @@ +Adds a transformer to translate absolute into incremental metrics. + +authors: johannesfloriangeiger \ No newline at end of file diff --git a/src/transforms/absolute_to_incremental.rs b/src/transforms/absolute_to_incremental.rs new file mode 100644 index 0000000000000..3ee9504aa279d --- /dev/null +++ b/src/transforms/absolute_to_incremental.rs @@ -0,0 +1,231 @@ +use std::{collections::HashMap, future::ready, pin::Pin, time::Duration}; + +use futures::{Stream, StreamExt}; +use vector_lib::configurable::configurable_component; + +use crate::{ + config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput}, + event::Event, + schema, + sinks::util::buffer::metrics::{MetricSet, NormalizerConfig, NormalizerSettings}, + transforms::{TaskTransform, Transform}, +}; + +/// Configuration for the `absolute_to_incremental` transform. +#[configurable_component(transform( + "absolute_to_incremental", + "Convert absolute metrics to incremental." +))] +#[derive(Clone, Debug, Default)] +#[serde(deny_unknown_fields)] +pub struct AbsoluteToIncrementalConfig { + /// Configuration for the internal metrics cache used to normalize a stream of absolute + /// metrics into incremental metrics. + /// + /// By default, absolute metrics are evicted after 5 minutes of not being updated. The next + /// absolute value will be reset. + #[configurable(derived)] + #[serde(default)] + pub cache: NormalizerConfig, +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct AbsoluteToIncrementalDefaultNormalizerSettings; + +impl NormalizerSettings for AbsoluteToIncrementalDefaultNormalizerSettings { + const MAX_EVENTS: Option = None; + const MAX_BYTES: Option = None; + const TIME_TO_LIVE: Option = Some(300); +} + +pub const fn default_expire_metrics_secs() -> Duration { + Duration::from_secs(120) +} + +impl_generate_config_from_default!(AbsoluteToIncrementalConfig); + +#[async_trait::async_trait] +#[typetag::serde(name = "absolute_to_incremental")] +impl TransformConfig for AbsoluteToIncrementalConfig { + async fn build(&self, _context: &TransformContext) -> crate::Result { + AbsoluteToIncremental::new(self).map(Transform::event_task) + } + + fn input(&self) -> Input { + Input::metric() + } + + fn outputs( + &self, + _: &TransformContext, + _: &[(OutputId, schema::Definition)], + ) -> Vec { + vec![TransformOutput::new(DataType::Metric, HashMap::new())] + } +} + +#[derive(Debug)] +pub struct AbsoluteToIncremental { + data: MetricSet, +} + +impl AbsoluteToIncremental { + pub fn new(config: &AbsoluteToIncrementalConfig) -> crate::Result { + // Create a new MetricSet with the proper cache settings + Ok(Self { + data: MetricSet::new(config.cache.validate()?.into_settings()), + }) + } + + pub fn transform_one(&mut self, event: Event) -> Option { + self.data + .make_incremental(event.as_metric().clone()) + .map(Event::Metric) + } +} + +impl TaskTransform for AbsoluteToIncremental { + fn transform( + self: Box, + task: Pin + Send>>, + ) -> Pin + Send>> + where + Self: 'static, + { + let mut inner = self; + Box::pin(task.filter_map(move |v| ready(inner.transform_one(v)))) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use futures_util::SinkExt; + use similar_asserts::assert_eq; + use vector_lib::config::ComponentKey; + + use super::*; + use crate::event::{ + metric::{MetricKind, MetricValue}, + Metric, + }; + + fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event { + let mut event = Event::Metric(Metric::new(name, kind, value)) + .with_source_id(Arc::new(ComponentKey::from("in"))) + .with_upstream_id(Arc::new(OutputId::from("transform"))); + + event.metadata_mut().set_source_type("unit_test_stream"); + + event + } + + async fn assert_metric_eq( + tx: &mut futures::channel::mpsc::Sender, + mut out_stream: impl Stream + Unpin, + metric: Event, + expected_metric: Event, + ) { + tx.send(metric).await.unwrap(); + if let Some(out_event) = out_stream.next().await { + let result = out_event; + assert_eq!(result, expected_metric); + } else { + panic!("Unexpectedly received None in output stream"); + } + } + + #[tokio::test] + async fn test_absolute_to_incremental() { + let config = toml::from_str::( + r#" +[cache] +max_events = 100 +"#, + ) + .unwrap(); + let absolute_to_incremental = AbsoluteToIncremental::new(&config) + .map(Transform::event_task) + .unwrap(); + let absolute_to_incremental = absolute_to_incremental.into_task(); + let (mut tx, rx) = futures::channel::mpsc::channel(10); + let mut out_stream = absolute_to_incremental.transform_events(Box::pin(rx)); + + let abs_counter_0 = make_metric( + "absolute_counter", + MetricKind::Absolute, + MetricValue::Counter { value: 0.0 }, + ); + tx.send(abs_counter_0).await.unwrap(); + + let abs_counter_1 = make_metric( + "absolute_counter", + MetricKind::Absolute, + MetricValue::Counter { value: 10.0 }, + ); + let expected_abs_counter_1 = make_metric( + "absolute_counter", + MetricKind::Incremental, + MetricValue::Counter { value: 10.0 }, + ); + assert_metric_eq( + &mut tx, + &mut out_stream, + abs_counter_1, + expected_abs_counter_1, + ) + .await; + + let abs_counter_2 = make_metric( + "absolute_counter", + MetricKind::Absolute, + MetricValue::Counter { value: 20.0 }, + ); + let expected_abs_counter_2 = make_metric( + "absolute_counter", + MetricKind::Incremental, + MetricValue::Counter { value: 10.0 }, + ); + assert_metric_eq( + &mut tx, + &mut out_stream, + abs_counter_2, + expected_abs_counter_2, + ) + .await; + + let abs_counter_3 = make_metric( + "absolute_counter", + MetricKind::Absolute, + MetricValue::Counter { value: 30.0 }, + ); + let expected_abs_counter_3 = make_metric( + "absolute_counter", + MetricKind::Incremental, + MetricValue::Counter { value: 10.0 }, + ); + assert_metric_eq( + &mut tx, + &mut out_stream, + abs_counter_3, + expected_abs_counter_3, + ) + .await; + + // Incremental counters are emitted unchanged + let incremental_counter = make_metric( + "incremental_counter", + MetricKind::Incremental, + MetricValue::Counter { value: 42.0 }, + ); + let expected_incremental_counter = incremental_counter.clone(); + assert_metric_eq( + &mut tx, + &mut out_stream, + incremental_counter, + expected_incremental_counter, + ) + .await; + } +} diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index e4f6671828807..378b1e040de25 100644 --- a/src/transforms/mod.rs +++ b/src/transforms/mod.rs @@ -7,6 +7,8 @@ pub mod reduce; #[cfg(feature = "transforms-impl-sample")] pub mod sample; +#[cfg(feature = "transforms-absolute_to_incremental")] +pub mod absolute_to_incremental; #[cfg(feature = "transforms-aggregate")] pub mod aggregate; #[cfg(feature = "transforms-aws_ec2_metadata")] From 89cbf16a637c7622fc7b92358962c6a3ba123ebb Mon Sep 17 00:00:00 2001 From: Johannes Geiger Date: Tue, 3 Mar 2026 22:12:13 +0000 Subject: [PATCH 2/3] #24655: Docs. --- .../generated/absolute_to_incremental.cue | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 website/cue/reference/components/transforms/generated/absolute_to_incremental.cue diff --git a/website/cue/reference/components/transforms/generated/absolute_to_incremental.cue b/website/cue/reference/components/transforms/generated/absolute_to_incremental.cue new file mode 100644 index 0000000000000..33acc28e844bf --- /dev/null +++ b/website/cue/reference/components/transforms/generated/absolute_to_incremental.cue @@ -0,0 +1,32 @@ +package metadata + +generated: components: transforms: absolute_to_incremental: configuration: cache: { + description: """ + Configuration for the internal metrics cache used to normalize a stream of absolute + metrics into incremental metrics. + + By default, absolute metrics are evicted after 5 minutes of not being updated. The next + absolute value will be reset. + """ + required: false + type: object: options: { + max_bytes: { + description: "The maximum size in bytes of the events in the metrics normalizer cache, excluding cache overhead." + required: false + type: uint: unit: "bytes" + } + max_events: { + description: "The maximum number of events of the metrics normalizer cache" + required: false + type: uint: unit: "events" + } + time_to_live: { + description: "The maximum age of a metric not being updated before it is evicted from the metrics normalizer cache." + required: false + type: uint: { + default: 300 + unit: "seconds" + } + } + } +} From b783a822e9eddd0c419f75d0ae81a3855f994453 Mon Sep 17 00:00:00 2001 From: Johannes Geiger Date: Tue, 3 Mar 2026 22:21:51 +0000 Subject: [PATCH 3/3] #24655: Formatting. --- changelog.d/24655.feature.md | 2 +- src/transforms/absolute_to_incremental.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/changelog.d/24655.feature.md b/changelog.d/24655.feature.md index 2ced168bd4290..a07fbe00474fe 100644 --- a/changelog.d/24655.feature.md +++ b/changelog.d/24655.feature.md @@ -1,3 +1,3 @@ Adds a transformer to translate absolute into incremental metrics. -authors: johannesfloriangeiger \ No newline at end of file +authors: johannesfloriangeiger diff --git a/src/transforms/absolute_to_incremental.rs b/src/transforms/absolute_to_incremental.rs index 3ee9504aa279d..6e8b1abf4a1b6 100644 --- a/src/transforms/absolute_to_incremental.rs +++ b/src/transforms/absolute_to_incremental.rs @@ -107,8 +107,8 @@ mod tests { use super::*; use crate::event::{ - metric::{MetricKind, MetricValue}, Metric, + metric::{MetricKind, MetricValue}, }; fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {