Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
c4391e6
feat(absolute_to_incremental transformer): #24655 Add a transformer t…
johannesfloriangeiger Mar 3, 2026
89cbf16
#24655: Docs.
johannesfloriangeiger Mar 3, 2026
b783a82
#24655: Formatting.
johannesfloriangeiger Mar 3, 2026
84a1a9c
Merge branch 'master' into 24655-absolute-to-incremental-transformer
johannesfloriangeiger Mar 3, 2026
96d6187
Merge branch 'master' into 24655-absolute-to-incremental-transformer
johannesfloriangeiger Mar 4, 2026
fd58738
Merge branch 'master' into 24655-absolute-to-incremental-transformer
johannesfloriangeiger Mar 5, 2026
8848288
Merge branch 'master' into 24655-absolute-to-incremental-transformer
johannesfloriangeiger Mar 5, 2026
3279a84
Merge branch 'master' into 24655-absolute-to-incremental-transformer
johannesfloriangeiger Mar 7, 2026
471b1d2
Merge branch 'master' into 24655-absolute-to-incremental-transformer
johannesfloriangeiger Mar 9, 2026
68d74d5
Merge branch 'master' into 24655-absolute-to-incremental-transformer
johannesfloriangeiger Mar 10, 2026
236b2f7
Merge branch 'master' into 24655-absolute-to-incremental-transformer
johannesfloriangeiger Mar 13, 2026
dbc6cec
Merge branch 'master' into 24655-absolute-to-incremental-transformer
johannesfloriangeiger Mar 13, 2026
eba18be
Merge branch 'master' into 24655-absolute-to-incremental-transformer
johannesfloriangeiger Mar 14, 2026
004d0b3
Merge branch 'master' into 24655-absolute-to-incremental-transformer
johannesfloriangeiger Mar 16, 2026
31cf043
Merge branch 'master' into 24655-absolute-to-incremental-transformer
johannesfloriangeiger Mar 18, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,7 @@ transforms-logs = [
"transforms-trace_to_log"
]
transforms-metrics = [
"transforms-absolute_to_incremental",
"transforms-aggregate",
"transforms-filter",
"transforms-incremental_to_absolute",
Expand All @@ -774,6 +775,7 @@ transforms-metrics = [
"transforms-throttle",
]

transforms-absolute_to_incremental = []
transforms-aggregate = []
transforms-aws_ec2_metadata = ["dep:arc-swap"]
transforms-dedupe = ["transforms-impl-dedupe"]
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/24655.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Adds a transformer to translate absolute into incremental metrics.

authors: johannesfloriangeiger
231 changes: 231 additions & 0 deletions src/transforms/absolute_to_incremental.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
use std::{collections::HashMap, future::ready, pin::Pin, time::Duration};

use futures::{Stream, StreamExt};
use vector_lib::configurable::configurable_component;

use crate::{
config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
event::Event,
schema,
sinks::util::buffer::metrics::{MetricSet, NormalizerConfig, NormalizerSettings},
transforms::{TaskTransform, Transform},
};

/// Configuration for the `absolute_to_incremental` transform.
#[configurable_component(transform(
"absolute_to_incremental",
"Convert absolute metrics to incremental."
))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct AbsoluteToIncrementalConfig {
/// Configuration for the internal metrics cache used to normalize a stream of absolute
/// metrics into incremental metrics.
///
/// By default, absolute metrics are evicted after 5 minutes of not being updated. The next
/// absolute value will be reset.
#[configurable(derived)]
#[serde(default)]
pub cache: NormalizerConfig<AbsoluteToIncrementalDefaultNormalizerSettings>,
}

#[derive(Clone, Copy, Debug, Default)]
pub struct AbsoluteToIncrementalDefaultNormalizerSettings;

impl NormalizerSettings for AbsoluteToIncrementalDefaultNormalizerSettings {
const MAX_EVENTS: Option<usize> = None;
const MAX_BYTES: Option<usize> = None;
const TIME_TO_LIVE: Option<u64> = Some(300);
}

pub const fn default_expire_metrics_secs() -> Duration {
Duration::from_secs(120)
}

impl_generate_config_from_default!(AbsoluteToIncrementalConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "absolute_to_incremental")]
impl TransformConfig for AbsoluteToIncrementalConfig {
async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
AbsoluteToIncremental::new(self).map(Transform::event_task)
}

fn input(&self) -> Input {
Input::metric()
}

fn outputs(
&self,
_: &TransformContext,
_: &[(OutputId, schema::Definition)],
) -> Vec<TransformOutput> {
vec![TransformOutput::new(DataType::Metric, HashMap::new())]
}
}

#[derive(Debug)]
pub struct AbsoluteToIncremental {
data: MetricSet,
}

impl AbsoluteToIncremental {
pub fn new(config: &AbsoluteToIncrementalConfig) -> crate::Result<Self> {
// Create a new MetricSet with the proper cache settings
Ok(Self {
data: MetricSet::new(config.cache.validate()?.into_settings()),
})
}

pub fn transform_one(&mut self, event: Event) -> Option<Event> {
self.data
.make_incremental(event.as_metric().clone())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Propagate cached finalizers when normalizing absolute metrics

transform_one forwards each event directly to MetricSet::make_incremental, which caches the first absolute sample of each series and returns None. In that code path, the cached entry retains the original event metadata/finalizers and later updates clone the prior reference before reinserting, so the first sample’s finalizer is never released or merged into emitted events for hot series. With acknowledged sources, that can keep batch notifiers alive indefinitely and stall upstream acknowledgements. Please strip or merge cached finalizers when updating the reference metric so dropped-first samples do not pin ack state.

Useful? React with 👍 / 👎.

.map(Event::Metric)
}
}

impl TaskTransform<Event> for AbsoluteToIncremental {
fn transform(
self: Box<Self>,
task: Pin<Box<dyn Stream<Item = Event> + Send>>,
) -> Pin<Box<dyn Stream<Item = Event> + Send>>
where
Self: 'static,
{
let mut inner = self;
Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use futures_util::SinkExt;
use similar_asserts::assert_eq;
use vector_lib::config::ComponentKey;

use super::*;
use crate::event::{
Metric,
metric::{MetricKind, MetricValue},
};

fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
let mut event = Event::Metric(Metric::new(name, kind, value))
.with_source_id(Arc::new(ComponentKey::from("in")))
.with_upstream_id(Arc::new(OutputId::from("transform")));

event.metadata_mut().set_source_type("unit_test_stream");

event
}

async fn assert_metric_eq(
tx: &mut futures::channel::mpsc::Sender<Event>,
mut out_stream: impl Stream<Item = Event> + Unpin,
metric: Event,
expected_metric: Event,
) {
tx.send(metric).await.unwrap();
if let Some(out_event) = out_stream.next().await {
let result = out_event;
assert_eq!(result, expected_metric);
} else {
panic!("Unexpectedly received None in output stream");
}
}

#[tokio::test]
async fn test_absolute_to_incremental() {
let config = toml::from_str::<AbsoluteToIncrementalConfig>(
r#"
[cache]
max_events = 100
"#,
)
.unwrap();
let absolute_to_incremental = AbsoluteToIncremental::new(&config)
.map(Transform::event_task)
.unwrap();
let absolute_to_incremental = absolute_to_incremental.into_task();
let (mut tx, rx) = futures::channel::mpsc::channel(10);
let mut out_stream = absolute_to_incremental.transform_events(Box::pin(rx));

let abs_counter_0 = make_metric(
"absolute_counter",
MetricKind::Absolute,
MetricValue::Counter { value: 0.0 },
);
tx.send(abs_counter_0).await.unwrap();

let abs_counter_1 = make_metric(
"absolute_counter",
MetricKind::Absolute,
MetricValue::Counter { value: 10.0 },
);
let expected_abs_counter_1 = make_metric(
"absolute_counter",
MetricKind::Incremental,
MetricValue::Counter { value: 10.0 },
);
assert_metric_eq(
&mut tx,
&mut out_stream,
abs_counter_1,
expected_abs_counter_1,
)
.await;

let abs_counter_2 = make_metric(
"absolute_counter",
MetricKind::Absolute,
MetricValue::Counter { value: 20.0 },
);
let expected_abs_counter_2 = make_metric(
"absolute_counter",
MetricKind::Incremental,
MetricValue::Counter { value: 10.0 },
);
assert_metric_eq(
&mut tx,
&mut out_stream,
abs_counter_2,
expected_abs_counter_2,
)
.await;

let abs_counter_3 = make_metric(
"absolute_counter",
MetricKind::Absolute,
MetricValue::Counter { value: 30.0 },
);
let expected_abs_counter_3 = make_metric(
"absolute_counter",
MetricKind::Incremental,
MetricValue::Counter { value: 10.0 },
);
assert_metric_eq(
&mut tx,
&mut out_stream,
abs_counter_3,
expected_abs_counter_3,
)
.await;

// Incremental counters are emitted unchanged
let incremental_counter = make_metric(
"incremental_counter",
MetricKind::Incremental,
MetricValue::Counter { value: 42.0 },
);
let expected_incremental_counter = incremental_counter.clone();
assert_metric_eq(
&mut tx,
&mut out_stream,
incremental_counter,
expected_incremental_counter,
)
.await;
}
}
2 changes: 2 additions & 0 deletions src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ pub mod reduce;
#[cfg(feature = "transforms-impl-sample")]
pub mod sample;

#[cfg(feature = "transforms-absolute_to_incremental")]
pub mod absolute_to_incremental;
#[cfg(feature = "transforms-aggregate")]
pub mod aggregate;
#[cfg(feature = "transforms-aws_ec2_metadata")]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package metadata

generated: components: transforms: absolute_to_incremental: configuration: cache: {
description: """
Configuration for the internal metrics cache used to normalize a stream of absolute
metrics into incremental metrics.

By default, absolute metrics are evicted after 5 minutes of not being updated. The next
absolute value will be reset.
"""
required: false
type: object: options: {
max_bytes: {
description: "The maximum size in bytes of the events in the metrics normalizer cache, excluding cache overhead."
required: false
type: uint: unit: "bytes"
}
max_events: {
description: "The maximum number of events of the metrics normalizer cache"
required: false
type: uint: unit: "events"
}
time_to_live: {
description: "The maximum age of a metric not being updated before it is evicted from the metrics normalizer cache."
required: false
type: uint: {
default: 300
unit: "seconds"
}
}
}
}
Loading