diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index bddc252f1f02e..5bd1de80c87d5 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -69,6 +69,8 @@ bitwidth blackbox Blaupunkt Blusens +bpf +bpftrace buildname buildroot bytestream @@ -518,11 +520,14 @@ ubuntu Umeox UMTS unchunked +uprobe +uprobes upstreaminfo urlencoding useragents usergroups userguide +ustack Verizon vhosts Videocon diff --git a/Cargo.toml b/Cargo.toml index a24afe7d3fa59..31d2042bb786d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -456,7 +456,7 @@ byteorder = "1.5.0" windows-service = "0.8.0" [target.'cfg(unix)'.dependencies] -nix = { version = "0.31", default-features = false, features = ["socket", "signal", "fs"] } +nix = { version = "0.31", default-features = false, features = ["socket", "signal", "fs", "process"] } [target.'cfg(target_os = "linux")'.dependencies] netlink-packet-utils = "0.5.2" @@ -548,6 +548,7 @@ target-x86_64-unknown-linux-musl = ["target-unix"] # Enables features that work only on systems providing `cfg(unix)` unix = ["tikv-jemallocator", "allocation-tracing"] allocation-tracing = ["vector-lib/allocation-tracing"] +component-probes = [] # Enables kubernetes dependencies and shared code. Kubernetes-related sources, # transforms and sinks should depend on this feature. diff --git a/changelog.d/component_probes_bpftrace_cpu_attribution.feature.md b/changelog.d/component_probes_bpftrace_cpu_attribution.feature.md new file mode 100644 index 0000000000000..f4561084d3043 --- /dev/null +++ b/changelog.d/component_probes_bpftrace_cpu_attribution.feature.md @@ -0,0 +1,3 @@ +Added a `component-probes` Cargo feature (disabled by default) that enables bpftrace-based per-component CPU attribution. When enabled, a shared-memory array and uprobe symbol allow external bpftrace scripts to attribute CPU samples to individual Vector components (sources, transforms, sinks). + +authors: connoryy diff --git a/src/internal_telemetry/component_probes.rs b/src/internal_telemetry/component_probes.rs new file mode 100644 index 0000000000000..64fc303e77fde --- /dev/null +++ b/src/internal_telemetry/component_probes.rs @@ -0,0 +1,195 @@ +//! Lightweight bpftrace-based per-component CPU attribution. +//! +//! When the `component-probes` feature is enabled, this module provides a +//! [`ComponentProbesLayer`] that tags each Tokio worker thread with the ID of +//! the currently executing component. External bpftrace scripts read this tag +//! on a profile timer to produce per-component flamegraphs. + +use std::{ + marker::PhantomData, + sync::atomic::{AtomicU32, Ordering}, +}; + +use tracing::{ + Subscriber, + field::{Field, Visit}, + span::{Attributes, Id}, +}; +use tracing_subscriber::{Layer, layer::Context, registry::LookupSpan}; + +/// Returns a leaked `&'static AtomicU32` unique to the current thread. +/// +/// On first access, allocates a byte via `Box::leak` and calls +/// [`vector_register_thread`] so bpftrace can map this thread's TID +/// to the byte's address. The leaked byte is valid for the process lifetime. +fn thread_label() -> &'static AtomicU32 { + thread_local! { + static LABEL: &'static AtomicU32 = { + let label: &'static AtomicU32 = Box::leak(Box::new(AtomicU32::new(0))); + #[cfg(target_os = "linux")] + { + let tid = nix::unistd::gettid().as_raw() as u64; + vector_register_thread(tid, label as *const AtomicU32 as *const u8); + } + label + }; + } + LABEL.with(|l| *l) +} + +/// Uprobe attachment point called once per thread to register the +/// `tid -> label_address` mapping with bpftrace. +#[unsafe(no_mangle)] +#[inline(never)] +#[allow(clippy::missing_const_for_fn)] +pub extern "C" fn vector_register_thread(tid: u64, label_ptr: *const u8) { + std::hint::black_box((tid, label_ptr)); +} + +/// Uprobe attachment point called once per component to register the +/// `group_id -> component_name` mapping with bpftrace. +#[unsafe(no_mangle)] +#[inline(never)] +#[allow(clippy::missing_const_for_fn)] +pub extern "C" fn vector_register_component(id: u32, name_ptr: *const u8, name_len: usize) { + std::hint::black_box((id, name_ptr, name_len)); +} + +/// Next probe group ID. 0 means idle (no component active). +static NEXT_PROBE_ID: AtomicU32 = AtomicU32::new(1); + +/// Stored in span extensions to associate a span with a probe group ID. +struct ProbeGroupId(u32); + +/// Extracts the `component_id` field value from span attributes. +#[derive(Default)] +struct ComponentIdVisitor { + component_id: Option, +} + +impl Visit for ComponentIdVisitor { + fn record_str(&mut self, field: &Field, value: &str) { + if field.name() == "component_id" { + self.component_id = Some(value.to_owned()); + } + } + + fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { + if field.name() == "component_id" { + self.component_id = Some(format!("{value:?}")); + } + } +} + +/// A tracing layer that writes the active component's group ID to a per-thread +/// [`AtomicU32`] on span enter and clears it on exit. +/// +/// Detects component spans via the `component_id` field in `on_new_span`, +/// assigns a unique probe group ID, and registers the mapping with bpftrace +/// via [`vector_register_component`]. Independent of `allocation-tracing`. +pub struct ComponentProbesLayer { + _subscriber: PhantomData, +} + +impl Default for ComponentProbesLayer { + fn default() -> Self { + Self::new() + } +} + +impl ComponentProbesLayer { + #[must_use] + pub const fn new() -> Self { + Self { + _subscriber: PhantomData, + } + } +} + +impl Layer for ComponentProbesLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { + let mut visitor = ComponentIdVisitor::default(); + attrs.record(&mut visitor); + + if let Some(component_id) = visitor.component_id { + let probe_id = NEXT_PROBE_ID.fetch_add(1, Ordering::Relaxed); + if probe_id == 0 { + return; + } + + let id_bytes = component_id.as_bytes(); + vector_register_component(probe_id, id_bytes.as_ptr(), id_bytes.len()); + + if let Some(span) = ctx.span(id) { + span.extensions_mut().insert(ProbeGroupId(probe_id)); + } + } + } + + fn on_enter(&self, id: &Id, ctx: Context<'_, S>) { + if let Some(span) = ctx.span(id) + && let Some(probe) = span.extensions().get::() + { + thread_label().store(probe.0, Ordering::Relaxed); + } + } + + fn on_exit(&self, id: &Id, ctx: Context<'_, S>) { + if let Some(span) = ctx.span(id) + && span.extensions().get::().is_some() + { + thread_label().store(0, Ordering::Relaxed); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn thread_label_store_and_clear() { + let label = thread_label(); + let group_id: u32 = 7; + + label.store(group_id, Ordering::Relaxed); + assert_eq!(label.load(Ordering::Relaxed), group_id); + + label.store(0, Ordering::Relaxed); + assert_eq!(label.load(Ordering::Relaxed), 0); + } + + #[test] + fn thread_label_is_stable() { + let a = thread_label(); + let b = thread_label(); + assert!(std::ptr::eq(a, b), "must return the same address"); + } + + #[test] + fn thread_labels_are_unique() { + use std::sync::mpsc; + let (tx, rx) = mpsc::channel(); + for _ in 0..4 { + let tx = tx.clone(); + std::thread::spawn(move || { + tx.send(thread_label() as *const AtomicU32 as usize) + .unwrap(); + }); + } + drop(tx); + let mut addrs: Vec = rx.iter().collect(); + addrs.sort(); + addrs.dedup(); + assert_eq!(addrs.len(), 4, "each thread must get a distinct address"); + } + + #[test] + fn register_component_does_not_panic() { + let name = b"test_component"; + vector_register_component(1, name.as_ptr(), name.len()); + } +} diff --git a/src/internal_telemetry/mod.rs b/src/internal_telemetry/mod.rs index dfbe500e308c0..095b09e3c5f8e 100644 --- a/src/internal_telemetry/mod.rs +++ b/src/internal_telemetry/mod.rs @@ -2,3 +2,6 @@ #[cfg(feature = "allocation-tracing")] pub mod allocations; + +#[cfg(all(target_os = "linux", feature = "component-probes"))] +pub mod component_probes; diff --git a/src/trace.rs b/src/trace.rs index 6ecec804569ed..a3c06d5d55e74 100644 --- a/src/trace.rs +++ b/src/trace.rs @@ -90,6 +90,14 @@ pub fn init(color: bool, json: bool, levels: &str, internal_log_rate_limit: u64) subscriber.with(allocation_layer) }; + #[cfg(all(target_os = "linux", feature = "component-probes"))] + let subscriber = { + let probes_layer = crate::internal_telemetry::component_probes::ComponentProbesLayer::new() + .with_filter(LevelFilter::ERROR); + + subscriber.with(probes_layer) + }; + if json { let formatter = tracing_subscriber::fmt::layer().json().flatten_event(true); diff --git a/website/content/en/guides/advanced/component-level-cpu-profiling-with-bpftrace.md b/website/content/en/guides/advanced/component-level-cpu-profiling-with-bpftrace.md new file mode 100644 index 0000000000000..50ed35138afe7 --- /dev/null +++ b/website/content/en/guides/advanced/component-level-cpu-profiling-with-bpftrace.md @@ -0,0 +1,81 @@ +--- +title: Component-Level CPU Profiling with bpftrace +description: Use bpftrace to attribute CPU samples to individual Vector components for targeted performance analysis +authors: ["connoryy"] +domain: observability +weight: 1 +tags: ["profiling", "bpftrace", "cpu", "observability", "advanced", "guides", "guide"] +--- + +When investigating CPU usage in a Vector pipeline, standard profiling tools show +which _functions_ are hot but not which _components_ (sources, transforms, +sinks) are responsible. The `component-probes` feature solves this by tagging +each thread with the currently active component so that bpftrace can sample it +externally. + +## Prerequisites + +- Linux with [bpftrace](https://github.com/bpftrace/bpftrace) +- Root or `CAP_BPF` +- Vector built with `--features component-probes` + +## How It Works + +When `component-probes` is enabled, Vector writes the active component's ID to +a per-thread atomic on span enter and clears it on exit. Two `extern "C"` +functions serve as uprobe attachment points: + +- `vector_register_thread(tid, label_ptr)` — maps a thread's TID to the + address of its label (fired once per thread). +- `vector_register_component(group_id, name_ptr, name_len)` — maps a group + ID to a component name (fired once per component). + +## bpftrace Script + +Replace `/path/to/vector` with your binary path: + +```bpf +#!/usr/bin/env bpftrace + +uprobe:/path/to/vector:vector_register_thread { + @tid_to_addr[arg0] = arg1; + @vector_pid = pid; +} + +uprobe:/path/to/vector:vector_register_component { + @names[arg0] = str(arg1, arg2); +} + +profile:hz:997 { + if (@vector_pid != 0 && pid == @vector_pid) { + $addr = @tid_to_addr[tid]; + if ($addr != 0) { + $group_id = *(uint32 *)$addr; + if ($group_id != 0) { + @stacks[@names[$group_id], ustack()] = count(); + } + } + } +} +``` + +This aggregates component-labeled stack traces directly in bpftrace. Start +bpftrace before Vector so it catches the registration uprobes during startup. + +If `ustack()` is not available in your environment, replace the `@stacks` +line with a `printf` to emit raw labeled samples that can be joined with +stack traces from other tools like `perf`: + +```bpf +printf("S %lld %d %s\n", nsecs, tid, @names[$group_id]); +``` + +## Overhead + +- **Per span enter/exit**: one span extension lookup + one relaxed atomic store. +- **Per thread**: 4 bytes via `Box::leak` (never freed — bpftrace reads the + address asynchronously with no synchronization). +- **Per component**: one uprobe call at startup. +- **Sampling**: kernel-side, not charged to Vector. + +When the feature is not enabled, zero extra code is compiled.