diff --git a/Cargo.toml b/Cargo.toml index b4565050babe5..1e270e662669e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -763,6 +763,7 @@ transforms-logs = [ "transforms-trace_to_log" ] transforms-metrics = [ + "transforms-absolute_to_incremental", "transforms-aggregate", "transforms-filter", "transforms-incremental_to_absolute", @@ -774,6 +775,7 @@ transforms-metrics = [ "transforms-throttle", ] +transforms-absolute_to_incremental = [] transforms-aggregate = [] transforms-aws_ec2_metadata = ["dep:arc-swap"] transforms-dedupe = ["transforms-impl-dedupe"] diff --git a/changelog.d/24655.feature.md b/changelog.d/24655.feature.md new file mode 100644 index 0000000000000..a07fbe00474fe --- /dev/null +++ b/changelog.d/24655.feature.md @@ -0,0 +1,3 @@ +Adds a transformer to translate absolute into incremental metrics. + +authors: johannesfloriangeiger diff --git a/src/transforms/absolute_to_incremental.rs b/src/transforms/absolute_to_incremental.rs new file mode 100644 index 0000000000000..6e8b1abf4a1b6 --- /dev/null +++ b/src/transforms/absolute_to_incremental.rs @@ -0,0 +1,231 @@ +use std::{collections::HashMap, future::ready, pin::Pin, time::Duration}; + +use futures::{Stream, StreamExt}; +use vector_lib::configurable::configurable_component; + +use crate::{ + config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput}, + event::Event, + schema, + sinks::util::buffer::metrics::{MetricSet, NormalizerConfig, NormalizerSettings}, + transforms::{TaskTransform, Transform}, +}; + +/// Configuration for the `absolute_to_incremental` transform. +#[configurable_component(transform( + "absolute_to_incremental", + "Convert absolute metrics to incremental." +))] +#[derive(Clone, Debug, Default)] +#[serde(deny_unknown_fields)] +pub struct AbsoluteToIncrementalConfig { + /// Configuration for the internal metrics cache used to normalize a stream of absolute + /// metrics into incremental metrics. + /// + /// By default, absolute metrics are evicted after 5 minutes of not being updated. The next + /// absolute value will be reset. + #[configurable(derived)] + #[serde(default)] + pub cache: NormalizerConfig, +} + +#[derive(Clone, Copy, Debug, Default)] +pub struct AbsoluteToIncrementalDefaultNormalizerSettings; + +impl NormalizerSettings for AbsoluteToIncrementalDefaultNormalizerSettings { + const MAX_EVENTS: Option = None; + const MAX_BYTES: Option = None; + const TIME_TO_LIVE: Option = Some(300); +} + +pub const fn default_expire_metrics_secs() -> Duration { + Duration::from_secs(120) +} + +impl_generate_config_from_default!(AbsoluteToIncrementalConfig); + +#[async_trait::async_trait] +#[typetag::serde(name = "absolute_to_incremental")] +impl TransformConfig for AbsoluteToIncrementalConfig { + async fn build(&self, _context: &TransformContext) -> crate::Result { + AbsoluteToIncremental::new(self).map(Transform::event_task) + } + + fn input(&self) -> Input { + Input::metric() + } + + fn outputs( + &self, + _: &TransformContext, + _: &[(OutputId, schema::Definition)], + ) -> Vec { + vec![TransformOutput::new(DataType::Metric, HashMap::new())] + } +} + +#[derive(Debug)] +pub struct AbsoluteToIncremental { + data: MetricSet, +} + +impl AbsoluteToIncremental { + pub fn new(config: &AbsoluteToIncrementalConfig) -> crate::Result { + // Create a new MetricSet with the proper cache settings + Ok(Self { + data: MetricSet::new(config.cache.validate()?.into_settings()), + }) + } + + pub fn transform_one(&mut self, event: Event) -> Option { + self.data + .make_incremental(event.as_metric().clone()) + .map(Event::Metric) + } +} + +impl TaskTransform for AbsoluteToIncremental { + fn transform( + self: Box, + task: Pin + Send>>, + ) -> Pin + Send>> + where + Self: 'static, + { + let mut inner = self; + Box::pin(task.filter_map(move |v| ready(inner.transform_one(v)))) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use futures_util::SinkExt; + use similar_asserts::assert_eq; + use vector_lib::config::ComponentKey; + + use super::*; + use crate::event::{ + Metric, + metric::{MetricKind, MetricValue}, + }; + + fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event { + let mut event = Event::Metric(Metric::new(name, kind, value)) + .with_source_id(Arc::new(ComponentKey::from("in"))) + .with_upstream_id(Arc::new(OutputId::from("transform"))); + + event.metadata_mut().set_source_type("unit_test_stream"); + + event + } + + async fn assert_metric_eq( + tx: &mut futures::channel::mpsc::Sender, + mut out_stream: impl Stream + Unpin, + metric: Event, + expected_metric: Event, + ) { + tx.send(metric).await.unwrap(); + if let Some(out_event) = out_stream.next().await { + let result = out_event; + assert_eq!(result, expected_metric); + } else { + panic!("Unexpectedly received None in output stream"); + } + } + + #[tokio::test] + async fn test_absolute_to_incremental() { + let config = toml::from_str::( + r#" +[cache] +max_events = 100 +"#, + ) + .unwrap(); + let absolute_to_incremental = AbsoluteToIncremental::new(&config) + .map(Transform::event_task) + .unwrap(); + let absolute_to_incremental = absolute_to_incremental.into_task(); + let (mut tx, rx) = futures::channel::mpsc::channel(10); + let mut out_stream = absolute_to_incremental.transform_events(Box::pin(rx)); + + let abs_counter_0 = make_metric( + "absolute_counter", + MetricKind::Absolute, + MetricValue::Counter { value: 0.0 }, + ); + tx.send(abs_counter_0).await.unwrap(); + + let abs_counter_1 = make_metric( + "absolute_counter", + MetricKind::Absolute, + MetricValue::Counter { value: 10.0 }, + ); + let expected_abs_counter_1 = make_metric( + "absolute_counter", + MetricKind::Incremental, + MetricValue::Counter { value: 10.0 }, + ); + assert_metric_eq( + &mut tx, + &mut out_stream, + abs_counter_1, + expected_abs_counter_1, + ) + .await; + + let abs_counter_2 = make_metric( + "absolute_counter", + MetricKind::Absolute, + MetricValue::Counter { value: 20.0 }, + ); + let expected_abs_counter_2 = make_metric( + "absolute_counter", + MetricKind::Incremental, + MetricValue::Counter { value: 10.0 }, + ); + assert_metric_eq( + &mut tx, + &mut out_stream, + abs_counter_2, + expected_abs_counter_2, + ) + .await; + + let abs_counter_3 = make_metric( + "absolute_counter", + MetricKind::Absolute, + MetricValue::Counter { value: 30.0 }, + ); + let expected_abs_counter_3 = make_metric( + "absolute_counter", + MetricKind::Incremental, + MetricValue::Counter { value: 10.0 }, + ); + assert_metric_eq( + &mut tx, + &mut out_stream, + abs_counter_3, + expected_abs_counter_3, + ) + .await; + + // Incremental counters are emitted unchanged + let incremental_counter = make_metric( + "incremental_counter", + MetricKind::Incremental, + MetricValue::Counter { value: 42.0 }, + ); + let expected_incremental_counter = incremental_counter.clone(); + assert_metric_eq( + &mut tx, + &mut out_stream, + incremental_counter, + expected_incremental_counter, + ) + .await; + } +} diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index e4f6671828807..378b1e040de25 100644 --- a/src/transforms/mod.rs +++ b/src/transforms/mod.rs @@ -7,6 +7,8 @@ pub mod reduce; #[cfg(feature = "transforms-impl-sample")] pub mod sample; +#[cfg(feature = "transforms-absolute_to_incremental")] +pub mod absolute_to_incremental; #[cfg(feature = "transforms-aggregate")] pub mod aggregate; #[cfg(feature = "transforms-aws_ec2_metadata")] diff --git a/website/cue/reference/components/transforms/generated/absolute_to_incremental.cue b/website/cue/reference/components/transforms/generated/absolute_to_incremental.cue new file mode 100644 index 0000000000000..33acc28e844bf --- /dev/null +++ b/website/cue/reference/components/transforms/generated/absolute_to_incremental.cue @@ -0,0 +1,32 @@ +package metadata + +generated: components: transforms: absolute_to_incremental: configuration: cache: { + description: """ + Configuration for the internal metrics cache used to normalize a stream of absolute + metrics into incremental metrics. + + By default, absolute metrics are evicted after 5 minutes of not being updated. The next + absolute value will be reset. + """ + required: false + type: object: options: { + max_bytes: { + description: "The maximum size in bytes of the events in the metrics normalizer cache, excluding cache overhead." + required: false + type: uint: unit: "bytes" + } + max_events: { + description: "The maximum number of events of the metrics normalizer cache" + required: false + type: uint: unit: "events" + } + time_to_live: { + description: "The maximum age of a metric not being updated before it is evicted from the metrics normalizer cache." + required: false + type: uint: { + default: 300 + unit: "seconds" + } + } + } +}