Skip to content

feat(new transform): #24655 Add a transformer to translate absolute to incremental metrics.#24839

Open
johannesfloriangeiger wants to merge 15 commits intovectordotdev:masterfrom
johannesfloriangeiger:24655-absolute-to-incremental-transformer
Open

feat(new transform): #24655 Add a transformer to translate absolute to incremental metrics.#24839
johannesfloriangeiger wants to merge 15 commits intovectordotdev:masterfrom
johannesfloriangeiger:24655-absolute-to-incremental-transformer

Conversation

@johannesfloriangeiger
Copy link
Contributor

Summary

See title, add a transformer that can translate absolute to incremental metrics, the counterpart to the existing "Incremental to Absolute" Transformer.

This addresses the second observed issue in #24655 where the AWS CloudWatch Metric Sink will not properly rotate buffer files when processing absolute counter metrics. My working theory is that because the sink will translate absolute counters to incremental counters the event that gets sent to CloudWatch eventually differs from the event that the sink receives - and thus, the buffer is never rotated because the original metric is never acknowledged. Ideally, the AWS CloudWatch Metric Sink would not buffer incoming metrics until after they are pre-processed (i.e., filter out all unsupported metrics and translate all absolute counters into incremental ones) but I assume that's a bigger piece of Vector architecture change? So, this transformer acts as a workaround as it allows users to pre-process absolute metrics before they reach the sink and thus letting the buffer rotate again properly as the event that gets sent is the same as the sink receives.

Vector configuration

data_dir = "."

[sources.static_metrics]
type = "static_metrics"
metrics = [
    { name = "test_metric_0", kind = "absolute", tags = { }, value.gauge.value = 42 },
    { name = "test_metric_1", kind = "absolute", tags = { }, value.gauge.value = 42 },
    { name = "test_metric_2", kind = "absolute", tags = { }, value.gauge.value = 42 },
    { name = "test_metric_3", kind = "absolute", tags = { }, value.gauge.value = 42 },
    { name = "test_metric_4", kind = "absolute", tags = { }, value.gauge.value = 42 },
    { name = "test_metric_5", kind = "absolute", tags = { }, value.gauge.value = 42 },
    { name = "test_metric_6", kind = "absolute", tags = { }, value.gauge.value = 42 },
    { name = "test_metric_7", kind = "absolute", tags = { }, value.gauge.value = 42 },
    { name = "test_metric_8", kind = "absolute", tags = { }, value.gauge.value = 42 },
    { name = "test_metric_9", kind = "absolute", tags = { }, value.gauge.value = 42 },
    { name = "test_metric_10", kind = "absolute", tags = { }, value.counter.value = 42 },
    { name = "test_metric_11", kind = "absolute", tags = { }, value.counter.value = 42 },
    { name = "test_metric_12", kind = "absolute", tags = { }, value.counter.value = 42 },
    { name = "test_metric_13", kind = "absolute", tags = { }, value.counter.value = 42 },
    { name = "test_metric_14", kind = "absolute", tags = { }, value.counter.value = 42 },
    { name = "test_metric_15", kind = "absolute", tags = { }, value.counter.value = 42 },
    { name = "test_metric_16", kind = "absolute", tags = { }, value.counter.value = 42 },
    { name = "test_metric_17", kind = "absolute", tags = { }, value.counter.value = 42 },
    { name = "test_metric_18", kind = "absolute", tags = { }, value.counter.value = 42 },
    { name = "test_metric_19", kind = "absolute", tags = { }, value.counter.value = 42 },
    { name = "test_metric_20", kind = "incremental", tags = { }, value.counter.value = 42 },
    { name = "test_metric_21", kind = "incremental", tags = { }, value.counter.value = 42 },
    { name = "test_metric_22", kind = "incremental", tags = { }, value.counter.value = 42 },
    { name = "test_metric_23", kind = "incremental", tags = { }, value.counter.value = 42 },
    { name = "test_metric_24", kind = "incremental", tags = { }, value.counter.value = 42 },
    { name = "test_metric_25", kind = "incremental", tags = { }, value.counter.value = 42 },
    { name = "test_metric_26", kind = "incremental", tags = { }, value.counter.value = 42 },
    { name = "test_metric_27", kind = "incremental", tags = { }, value.counter.value = 42 },
    { name = "test_metric_28", kind = "incremental", tags = { }, value.counter.value = 42 },
    { name = "test_metric_29", kind = "incremental", tags = { }, value.counter.value = 42 },
    { name = "test_metric_30", kind = "absolute", tags = { }, value.aggregated_histogram.buckets = [{ le = 10, count = 1, upper_limit = 42.0 }], value.aggregated_histogram.count = 42, value.aggregated_histogram.sum = 42 },
]

[transforms.absolute_counter_metrics_filter]
inputs = ["static_metrics"]
type = "filter"
condition = '.type == "counter" && .kind == "absolute"'

[transforms.absolute_counter_to_incremental]
type = "absolute_to_incremental"
inputs = ["absolute_counter_metrics_filter"]

[transforms.other_metrics_filter]
inputs = ["static_metrics"]
type = "filter"
condition = '.type == "gauge" || (.type == "counter" && .kind == "incremental")'

[sinks.aws_cloudwatch_metrics]
type = "aws_cloudwatch_metrics"
inputs = ["absolute_counter_to_incremental", "other_metrics_filter"]
default_namespace = "default"
healthcheck.enabled = true
buffer.type = "disk"
buffer.max_size = 2097184

How did you test this PR?

Manually, see above configuration and observe that the buffer files rotate correctly.

Change Type

  • Bug fix
  • New feature
  • Dependencies
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

Notes

  • Please read our Vector contributor resources.
  • Do not hesitate to use @vectordotdev/vector to reach out to us regarding this PR.
  • Some CI checks run only after we manually approve them.
    • We recommend adding a pre-push hook, please see this template.
    • Alternatively, we recommend running the following locally before pushing to the remote branch:
      • make fmt
      • make check-clippy (if there are failures it's possible some of them can be fixed with make clippy-fix)
      • make test
  • After a review is requested, please avoid force pushes to help us review incrementally.
    • Feel free to push as many commits as you want. They will be squashed into one before merging.
    • For example, you can run git merge origin master and git push.
  • If this PR introduces changes Vector dependencies (modifies Cargo.lock), please
    run make build-licenses to regenerate the license inventory and commit the changes (if any). More details here.

@johannesfloriangeiger johannesfloriangeiger requested review from a team as code owners March 3, 2026 22:20
@github-actions github-actions bot added domain: transforms Anything related to Vector's transform components domain: external docs Anything related to Vector's external, public documentation labels Mar 3, 2026
@johannesfloriangeiger johannesfloriangeiger changed the title feat(absolute_to_incremental transformer): #24655 Add a transformer to translate absolute to incremental metrics. feat(new transform): #24655 Add a transformer to translate absolute to incremental metrics. Mar 3, 2026
@pront
Copy link
Member

pront commented Mar 18, 2026

@codex review

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 004d0b3543

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".


pub fn transform_one(&mut self, event: Event) -> Option<Event> {
self.data
.make_incremental(event.as_metric().clone())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Propagate cached finalizers when normalizing absolute metrics

transform_one forwards each event directly to MetricSet::make_incremental, which caches the first absolute sample of each series and returns None. In that code path, the cached entry retains the original event metadata/finalizers and later updates clone the prior reference before reinserting, so the first sample’s finalizer is never released or merged into emitted events for hot series. With acknowledged sources, that can keep batch notifiers alive indefinitely and stall upstream acknowledgements. Please strip or merge cached finalizers when updating the reference metric so dropped-first samples do not pin ack state.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

domain: external docs Anything related to Vector's external, public documentation domain: transforms Anything related to Vector's transform components

Projects

None yet

Development

Successfully merging this pull request may close these issues.

AWS CloudWatch metric sink not cleaning up buffer files consistently

3 participants