From 9dbe0d15aa680ea395003278567177a20fcaa60f Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Fri, 19 Dec 2025 10:08:32 -0500 Subject: [PATCH 1/9] feat(observer): Expose observer module as public API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make the observer module and its subcomponents public to allow external crates to reuse the procfs and cgroup v2 parsing logic. Exposed APIs: - observer::linux::Sampler - high-level sampler for a process tree - observer::linux::procfs::{memory, stat, uptime} - procfs parsers - observer::linux::cgroup::v2::{poll, get_path, cpu, memory} - cgroup v2 parsers This enables fine-grained-monitor and similar tools to leverage lading's battle-tested observer implementation without vendoring. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- lading/src/observer.rs | 2 +- lading/src/observer/linux.rs | 10 +++++----- lading/src/observer/linux/cgroup.rs | 8 ++++---- lading/src/observer/linux/cgroup/v2.rs | 10 +++++----- lading/src/observer/linux/cgroup/v2/cpu.rs | 8 ++++---- lading/src/observer/linux/cgroup/v2/memory.rs | 2 +- lading/src/observer/linux/procfs.rs | 12 ++++++------ lading/src/observer/linux/procfs/memory.rs | 4 ++-- .../src/observer/linux/procfs/memory/smaps_rollup.rs | 10 +++++----- lading/src/observer/linux/procfs/stat.rs | 6 +++--- lading/src/observer/linux/procfs/uptime.rs | 2 +- 11 files changed, 37 insertions(+), 37 deletions(-) diff --git a/lading/src/observer.rs b/lading/src/observer.rs index 6d1f3495f..fde6dcc42 100644 --- a/lading/src/observer.rs +++ b/lading/src/observer.rs @@ -14,7 +14,7 @@ use crate::target::TargetPidReceiver; use serde::Deserialize; #[cfg(target_os = "linux")] -mod linux; +pub mod linux; #[derive(thiserror::Error, Debug)] /// Errors produced by [`Server`] diff --git a/lading/src/observer/linux.rs b/lading/src/observer/linux.rs index 78dc5826f..f81239607 100644 --- a/lading/src/observer/linux.rs +++ b/lading/src/observer/linux.rs @@ -1,5 +1,5 @@ -mod cgroup; -mod procfs; +pub mod cgroup; +pub mod procfs; mod utils; mod wss; @@ -20,7 +20,7 @@ pub enum Error { } #[derive(Debug)] -pub(crate) struct Sampler { +pub struct Sampler { procfs: procfs::Sampler, cgroup: cgroup::Sampler, wss: Option, @@ -28,7 +28,7 @@ pub(crate) struct Sampler { } impl Sampler { - pub(crate) fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result { + pub fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result { let procfs_sampler = procfs::Sampler::new(parent_pid)?; let cgroup_sampler = cgroup::Sampler::new(parent_pid, labels)?; let wss_sampler = if wss::Sampler::is_available() { @@ -64,7 +64,7 @@ ls -l /sys/kernel/mm/page_idle/bitmap }) } - pub(crate) async fn sample(&mut self) -> Result<(), Error> { + pub async fn sample(&mut self) -> Result<(), Error> { let sample_smaps = self.tick_counter.is_multiple_of(10); let sample_wss = self.tick_counter.is_multiple_of(60); self.tick_counter += 1; diff --git a/lading/src/observer/linux/cgroup.rs b/lading/src/observer/linux/cgroup.rs index 73de6abd5..5e58d41b2 100644 --- a/lading/src/observer/linux/cgroup.rs +++ b/lading/src/observer/linux/cgroup.rs @@ -1,5 +1,5 @@ /// Code to read cgroup information. -pub(crate) mod v2; +pub mod v2; use std::{collections::VecDeque, io, path::PathBuf}; @@ -34,14 +34,14 @@ struct CgroupInfo { } #[derive(Debug)] -pub(crate) struct Sampler { +pub struct Sampler { parent: Process, cgroup_info: FxHashMap, labels: Vec<(String, String)>, } impl Sampler { - pub(crate) fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result { + pub fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result { let parent = Process::new(parent_pid)?; let cgroup_info = FxHashMap::default(); @@ -53,7 +53,7 @@ impl Sampler { } #[allow(clippy::cast_possible_wrap)] - pub(crate) async fn poll(&mut self) -> Result<(), Error> { + pub async fn poll(&mut self) -> Result<(), Error> { // Every sample run we collect all the child processes rooted at the // parent. As noted by the procfs documentation is this done by // dereferencing the `/proc//root` symlink. diff --git a/lading/src/observer/linux/cgroup/v2.rs b/lading/src/observer/linux/cgroup/v2.rs index 4d435e57d..1a1e48271 100644 --- a/lading/src/observer/linux/cgroup/v2.rs +++ b/lading/src/observer/linux/cgroup/v2.rs @@ -1,6 +1,6 @@ -pub(crate) mod cpu; -pub(crate) mod io; -pub(crate) mod memory; +pub mod cpu; +pub mod io; +pub mod memory; use core::f64; use std::{ @@ -27,7 +27,7 @@ pub enum Error { } /// Determines the cgroup v2 path for a given PID. -pub(crate) async fn get_path(pid: i32) -> Result { +pub async fn get_path(pid: i32) -> Result { let path = format!("/proc/{pid}/cgroup"); let content = fs::read_to_string(path).await?; @@ -52,7 +52,7 @@ pub(crate) async fn get_path(pid: i32) -> Result { /// Polls for any cgroup metrics that can be read, v2 version. #[tracing::instrument(skip_all)] #[allow(clippy::too_many_lines)] -pub(crate) async fn poll(file_path: &Path, labels: &[(String, String)]) -> Result<(), Error> { +pub async fn poll(file_path: &Path, labels: &[(String, String)]) -> Result<(), Error> { // Read all files in the cgroup `path` and create metrics for them. If we // lack permissions to read we skip the file. We do not use ? to allow for // the maximal number of files to be read. diff --git a/lading/src/observer/linux/cgroup/v2/cpu.rs b/lading/src/observer/linux/cgroup/v2/cpu.rs index fb0e98115..fc99128f8 100644 --- a/lading/src/observer/linux/cgroup/v2/cpu.rs +++ b/lading/src/observer/linux/cgroup/v2/cpu.rs @@ -5,7 +5,7 @@ use tokio::fs; #[derive(thiserror::Error, Debug)] /// Errors produced by functions in this module -pub(crate) enum Error { +pub enum Error { /// Wrapper for [`std::io::Error`] #[error("IO error: {0}")] Io(#[from] std::io::Error), @@ -24,12 +24,12 @@ struct Stats { } #[derive(Debug)] -pub(crate) struct Sampler { +pub struct Sampler { prev: Stats, } impl Sampler { - pub(crate) fn new() -> Self { + pub fn new() -> Self { Self { prev: Stats { usage_usec: 0, @@ -41,7 +41,7 @@ impl Sampler { } // Read cgroup CPU data and calculate a percentage of usage. - pub(crate) async fn poll( + pub async fn poll( &mut self, group_prefix: &Path, labels: &[(String, String)], diff --git a/lading/src/observer/linux/cgroup/v2/memory.rs b/lading/src/observer/linux/cgroup/v2/memory.rs index d5f356e83..046ef44ac 100644 --- a/lading/src/observer/linux/cgroup/v2/memory.rs +++ b/lading/src/observer/linux/cgroup/v2/memory.rs @@ -24,7 +24,7 @@ use tracing::warn; /// * `pgdeactivate` /// * `pglazyfree` /// * `pglazyfreed` -pub(crate) fn stat(content: &str, metric_prefix: &str, labels: &[(String, String)]) { +pub fn stat(content: &str, metric_prefix: &str, labels: &[(String, String)]) { let counter_keys = [ "pgfault", "pgmajfault", diff --git a/lading/src/observer/linux/procfs.rs b/lading/src/observer/linux/procfs.rs index dabc1f87c..291957239 100644 --- a/lading/src/observer/linux/procfs.rs +++ b/lading/src/observer/linux/procfs.rs @@ -1,7 +1,7 @@ /// Sampler implementation for procfs filesystems -mod memory; -mod stat; -mod uptime; +pub mod memory; +pub mod stat; +pub mod uptime; mod vmstat; use std::io; @@ -80,13 +80,13 @@ struct ProcessInfo { } #[derive(Debug)] -pub(crate) struct Sampler { +pub struct Sampler { parent: Process, process_info: FxHashMap, } impl Sampler { - pub(crate) fn new(parent_pid: i32) -> Result { + pub fn new(parent_pid: i32) -> Result { let parent = Process::new(parent_pid)?; let process_info = FxHashMap::default(); @@ -104,7 +104,7 @@ impl Sampler { clippy::cast_possible_wrap, clippy::cast_lossless )] - pub(crate) async fn poll(&mut self, include_smaps: bool) -> Result<(), Error> { + pub async fn poll(&mut self, include_smaps: bool) -> Result<(), Error> { // A tally of the total RSS and PSS consumed by the parent process and // its children. let mut aggr = smaps_rollup::Aggregator::default(); diff --git a/lading/src/observer/linux/procfs/memory.rs b/lading/src/observer/linux/procfs/memory.rs index 99775a7fc..8db1da169 100644 --- a/lading/src/observer/linux/procfs/memory.rs +++ b/lading/src/observer/linux/procfs/memory.rs @@ -1,5 +1,5 @@ -pub(crate) mod smaps; -pub(crate) mod smaps_rollup; +pub mod smaps; +pub mod smaps_rollup; const BYTES_PER_KIBIBYTE: u64 = 1024; diff --git a/lading/src/observer/linux/procfs/memory/smaps_rollup.rs b/lading/src/observer/linux/procfs/memory/smaps_rollup.rs index 17c6b777d..d3aada3fe 100644 --- a/lading/src/observer/linux/procfs/memory/smaps_rollup.rs +++ b/lading/src/observer/linux/procfs/memory/smaps_rollup.rs @@ -6,7 +6,7 @@ use super::{BYTES_PER_KIBIBYTE, next_token}; #[derive(thiserror::Error, Debug)] /// Errors produced by functions in this module -pub(crate) enum Error { +pub enum Error { /// Wrapper for [`std::io::Error`] #[error("IO error: {0}")] Io(#[from] std::io::Error), @@ -17,13 +17,13 @@ pub(crate) enum Error { } #[derive(Debug, Clone, Copy, Default)] -pub(crate) struct Aggregator { - pub(crate) rss: u64, - pub(crate) pss: u64, +pub struct Aggregator { + pub rss: u64, + pub pss: u64, } // Read `/proc/{pid}/smaps_rollup` and parse it directly into metrics. -pub(crate) async fn poll( +pub async fn poll( pid: i32, labels: &[(&'static str, String)], aggr: &mut Aggregator, diff --git a/lading/src/observer/linux/procfs/stat.rs b/lading/src/observer/linux/procfs/stat.rs index ad85813c7..6b01ec16e 100644 --- a/lading/src/observer/linux/procfs/stat.rs +++ b/lading/src/observer/linux/procfs/stat.rs @@ -36,13 +36,13 @@ struct CpuUtilization { } #[derive(Debug)] -pub(crate) struct Sampler { +pub struct Sampler { ticks_per_second: f64, prev: Stats, } impl Sampler { - pub(crate) fn new() -> Self { + pub fn new() -> Self { Self { ticks_per_second: unsafe { nix::libc::sysconf(nix::libc::_SC_CLK_TCK) } as f64, prev: Stats::default(), @@ -50,7 +50,7 @@ impl Sampler { } #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] - pub(crate) async fn poll( + pub async fn poll( &mut self, pid: i32, uptime_secs: f64, diff --git a/lading/src/observer/linux/procfs/uptime.rs b/lading/src/observer/linux/procfs/uptime.rs index e548be138..b27d62fb9 100644 --- a/lading/src/observer/linux/procfs/uptime.rs +++ b/lading/src/observer/linux/procfs/uptime.rs @@ -15,7 +15,7 @@ pub enum Error { /// Read `/proc/uptime` /// /// Only the first field is used, which is the total uptime in seconds. -pub(crate) async fn poll() -> Result { +pub async fn poll() -> Result { let buf = tokio::fs::read_to_string("/proc/uptime").await?; let uptime_secs = proc_uptime_inner(&buf)?; Ok(uptime_secs) From 7dc4671687911bd4358f84a27597edaa8f4c29ef Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Fri, 19 Dec 2025 10:35:25 -0500 Subject: [PATCH 2/9] Add documentation to public observer APIs --- lading/src/observer/linux.rs | 3 +++ lading/src/observer/linux/cgroup.rs | 3 +++ lading/src/observer/linux/cgroup/v2.rs | 6 ++++++ lading/src/observer/linux/cgroup/v2/cpu.rs | 6 +++++- lading/src/observer/linux/procfs.rs | 3 +++ .../src/observer/linux/procfs/memory/smaps_rollup.rs | 7 ++++++- lading/src/observer/linux/procfs/stat.rs | 11 ++++++++++- 7 files changed, 36 insertions(+), 3 deletions(-) diff --git a/lading/src/observer/linux.rs b/lading/src/observer/linux.rs index f81239607..2e2ab3f4c 100644 --- a/lading/src/observer/linux.rs +++ b/lading/src/observer/linux.rs @@ -19,6 +19,7 @@ pub enum Error { Wss(#[from] wss::Error), } +/// Combined sampler for procfs, cgroup, and WSS metrics #[derive(Debug)] pub struct Sampler { procfs: procfs::Sampler, @@ -28,6 +29,7 @@ pub struct Sampler { } impl Sampler { + /// Create a new Sampler for the given parent PID pub fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result { let procfs_sampler = procfs::Sampler::new(parent_pid)?; let cgroup_sampler = cgroup::Sampler::new(parent_pid, labels)?; @@ -64,6 +66,7 @@ ls -l /sys/kernel/mm/page_idle/bitmap }) } + /// Sample all metrics (procfs, cgroup, and optionally WSS) pub async fn sample(&mut self) -> Result<(), Error> { let sample_smaps = self.tick_counter.is_multiple_of(10); let sample_wss = self.tick_counter.is_multiple_of(60); diff --git a/lading/src/observer/linux/cgroup.rs b/lading/src/observer/linux/cgroup.rs index 5e58d41b2..139000c44 100644 --- a/lading/src/observer/linux/cgroup.rs +++ b/lading/src/observer/linux/cgroup.rs @@ -33,6 +33,7 @@ struct CgroupInfo { cpu_sampler: cpu::Sampler, } +/// Samples cgroup metrics for a process tree #[derive(Debug)] pub struct Sampler { parent: Process, @@ -41,6 +42,7 @@ pub struct Sampler { } impl Sampler { + /// Create a new cgroup Sampler for the given parent PID pub fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result { let parent = Process::new(parent_pid)?; let cgroup_info = FxHashMap::default(); @@ -52,6 +54,7 @@ impl Sampler { }) } + /// Poll cgroup metrics for all processes in the tree #[allow(clippy::cast_possible_wrap)] pub async fn poll(&mut self) -> Result<(), Error> { // Every sample run we collect all the child processes rooted at the diff --git a/lading/src/observer/linux/cgroup/v2.rs b/lading/src/observer/linux/cgroup/v2.rs index 1a1e48271..5660c22c6 100644 --- a/lading/src/observer/linux/cgroup/v2.rs +++ b/lading/src/observer/linux/cgroup/v2.rs @@ -12,16 +12,22 @@ use metrics::{counter, gauge}; use tokio::fs; use tracing::{debug, error, warn}; +/// Errors produced by cgroup v2 functions #[derive(thiserror::Error, Debug)] pub enum Error { + /// Wrapper for [`std::io::Error`] #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Integer parsing error #[error("Parse int error: {0}")] ParseInt(#[from] std::num::ParseIntError), + /// Float parsing error #[error("Parse float error: {0}")] ParseFloat(#[from] std::num::ParseFloatError), + /// Cgroup v2 hierarchy not found for process #[error("Cgroup v2 not found")] CgroupV2NotFound, + /// PSI (Pressure Stall Information) parsing error #[error("Parsing PSI error: {0}")] ParsingPsi(String), } diff --git a/lading/src/observer/linux/cgroup/v2/cpu.rs b/lading/src/observer/linux/cgroup/v2/cpu.rs index fc99128f8..4d0552757 100644 --- a/lading/src/observer/linux/cgroup/v2/cpu.rs +++ b/lading/src/observer/linux/cgroup/v2/cpu.rs @@ -9,8 +9,10 @@ pub enum Error { /// Wrapper for [`std::io::Error`] #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Float parsing error #[error("Float Parsing: {0}")] ParseFloat(#[from] std::num::ParseFloatError), + /// Integer parsing error #[error("Integer Parsing: {0}")] ParseInt(#[from] std::num::ParseIntError), } @@ -23,12 +25,14 @@ struct Stats { last_instant: Instant, } +/// Samples CPU statistics from cgroup v2 with delta calculations #[derive(Debug)] pub struct Sampler { prev: Stats, } impl Sampler { + /// Create a new CPU Sampler pub fn new() -> Self { Self { prev: Stats { @@ -40,7 +44,7 @@ impl Sampler { } } - // Read cgroup CPU data and calculate a percentage of usage. + /// Read cgroup CPU data and calculate a percentage of usage. pub async fn poll( &mut self, group_prefix: &Path, diff --git a/lading/src/observer/linux/procfs.rs b/lading/src/observer/linux/procfs.rs index 291957239..093363173 100644 --- a/lading/src/observer/linux/procfs.rs +++ b/lading/src/observer/linux/procfs.rs @@ -79,6 +79,7 @@ struct ProcessInfo { stat_sampler: stat::Sampler, } +/// Samples procfs metrics for a process tree #[derive(Debug)] pub struct Sampler { parent: Process, @@ -86,6 +87,7 @@ pub struct Sampler { } impl Sampler { + /// Create a new procfs Sampler for the given parent PID pub fn new(parent_pid: i32) -> Result { let parent = Process::new(parent_pid)?; let process_info = FxHashMap::default(); @@ -96,6 +98,7 @@ impl Sampler { }) } + /// Poll procfs metrics for all processes in the tree #[allow( clippy::similar_names, clippy::too_many_lines, diff --git a/lading/src/observer/linux/procfs/memory/smaps_rollup.rs b/lading/src/observer/linux/procfs/memory/smaps_rollup.rs index d3aada3fe..46b5e4a6d 100644 --- a/lading/src/observer/linux/procfs/memory/smaps_rollup.rs +++ b/lading/src/observer/linux/procfs/memory/smaps_rollup.rs @@ -10,19 +10,24 @@ pub enum Error { /// Wrapper for [`std::io::Error`] #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Integer parsing error #[error("Number Parsing: {0}")] ParseInt(#[from] std::num::ParseIntError), + /// General parsing error #[error("Parsing: {0}")] Parsing(String), } +/// Aggregates memory metrics from smaps_rollup #[derive(Debug, Clone, Copy, Default)] pub struct Aggregator { + /// Resident Set Size in bytes pub rss: u64, + /// Proportional Set Size in bytes pub pss: u64, } -// Read `/proc/{pid}/smaps_rollup` and parse it directly into metrics. +/// Read `/proc/{pid}/smaps_rollup` and parse it directly into metrics. pub async fn poll( pid: i32, labels: &[(&'static str, String)], diff --git a/lading/src/observer/linux/procfs/stat.rs b/lading/src/observer/linux/procfs/stat.rs index 6b01ec16e..53fa601cc 100644 --- a/lading/src/observer/linux/procfs/stat.rs +++ b/lading/src/observer/linux/procfs/stat.rs @@ -3,16 +3,22 @@ use tokio::fs; use crate::observer::linux::cgroup; +/// Errors produced by functions in this module #[derive(thiserror::Error, Debug)] pub enum Error { + /// Wrapper for [`std::io::Error`] #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Float parsing error #[error("Float Parsing: {0}")] ParseFloat(#[from] std::num::ParseFloatError), + /// Integer parsing error #[error("Integer Parsing: {0}")] ParseInt(#[from] std::num::ParseIntError), + /// Stat file format error #[error("Stat Malformed: {0}")] StatMalformed(&'static str), + /// Cgroup path resolution error #[error("Cgroup get_path: {0}")] Cgroup(#[from] cgroup::v2::Error), } @@ -35,13 +41,15 @@ struct CpuUtilization { system_cpu_millicores: f64, } -#[derive(Debug)] +/// Samples CPU statistics from /proc//stat with delta calculations +#[derive(Debug, Clone, Copy)] pub struct Sampler { ticks_per_second: f64, prev: Stats, } impl Sampler { + /// Create a new Sampler pub fn new() -> Self { Self { ticks_per_second: unsafe { nix::libc::sysconf(nix::libc::_SC_CLK_TCK) } as f64, @@ -49,6 +57,7 @@ impl Sampler { } } + /// Poll CPU statistics and emit metrics #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] pub async fn poll( &mut self, From 859db6e10ee14375d70a84d7dcf427ef3125fe2f Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Fri, 19 Dec 2025 12:33:01 -0500 Subject: [PATCH 3/9] docs: Add missing doc comments for public modules Required for #![deny(missing_docs)] when using lading as a library dependency. --- lading/src/observer.rs | 1 + lading/src/observer/linux.rs | 2 ++ lading/src/observer/linux/cgroup/v2.rs | 1 + lading/src/observer/linux/procfs.rs | 2 ++ lading/src/observer/linux/procfs/memory.rs | 2 ++ 5 files changed, 8 insertions(+) diff --git a/lading/src/observer.rs b/lading/src/observer.rs index fde6dcc42..8df792ac6 100644 --- a/lading/src/observer.rs +++ b/lading/src/observer.rs @@ -13,6 +13,7 @@ use std::io; use crate::target::TargetPidReceiver; use serde::Deserialize; +/// Linux-specific observer implementation using procfs and cgroups #[cfg(target_os = "linux")] pub mod linux; diff --git a/lading/src/observer/linux.rs b/lading/src/observer/linux.rs index 2e2ab3f4c..7d5c8f78f 100644 --- a/lading/src/observer/linux.rs +++ b/lading/src/observer/linux.rs @@ -1,4 +1,6 @@ +/// Cgroup metrics collection pub mod cgroup; +/// Procfs metrics collection pub mod procfs; mod utils; mod wss; diff --git a/lading/src/observer/linux/cgroup/v2.rs b/lading/src/observer/linux/cgroup/v2.rs index 5660c22c6..a71a32e95 100644 --- a/lading/src/observer/linux/cgroup/v2.rs +++ b/lading/src/observer/linux/cgroup/v2.rs @@ -1,3 +1,4 @@ +/// CPU metrics from cgroup v2 pub mod cpu; pub mod io; pub mod memory; diff --git a/lading/src/observer/linux/procfs.rs b/lading/src/observer/linux/procfs.rs index 093363173..bd0962141 100644 --- a/lading/src/observer/linux/procfs.rs +++ b/lading/src/observer/linux/procfs.rs @@ -1,6 +1,8 @@ /// Sampler implementation for procfs filesystems pub mod memory; +/// Per-process CPU statistics from /proc//stat pub mod stat; +/// System uptime from /proc/uptime pub mod uptime; mod vmstat; diff --git a/lading/src/observer/linux/procfs/memory.rs b/lading/src/observer/linux/procfs/memory.rs index 8db1da169..b84a80417 100644 --- a/lading/src/observer/linux/procfs/memory.rs +++ b/lading/src/observer/linux/procfs/memory.rs @@ -1,4 +1,6 @@ +/// Per-process memory regions from /proc//smaps pub mod smaps; +/// Rolled-up memory statistics from /proc//smaps_rollup pub mod smaps_rollup; const BYTES_PER_KIBIBYTE: u64 = 1024; From d0aebbea87412f388c5833bc53073981c6523dc1 Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Fri, 19 Dec 2025 14:53:11 -0500 Subject: [PATCH 4/9] feat(capture): add file rotation support for Parquet format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add rotation API to CaptureManager that allows rotating to new output files without stopping the capture. This enables long-running capture sessions to produce multiple readable Parquet files with valid footers. Changes: - Add RotationRequest/RotationSender types for async rotation requests - Add start_with_rotation() that spawns event loop and returns sender - Add replace_format() to StateMachine for IO-agnostic format swapping - Add rotate() trait method stub to OutputFormat (returns error by default) - Add rotate_to() inherent method on parquet Format> The rotation flow: 1. Caller sends RotationRequest with new file path via RotationSender 2. CaptureManager creates new file and format 3. StateMachine.replace_format() flushes and swaps formats 4. Old format is closed (writing Parquet footer) 5. Response sent back to caller 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- lading_capture/src/formats.rs | 26 +++ lading_capture/src/formats/parquet.rs | 33 ++++ lading_capture/src/manager.rs | 179 +++++++++++++++++++- lading_capture/src/manager/state_machine.rs | 33 ++++ 4 files changed, 270 insertions(+), 1 deletion(-) diff --git a/lading_capture/src/formats.rs b/lading_capture/src/formats.rs index 571f09b88..416954d3d 100644 --- a/lading_capture/src/formats.rs +++ b/lading_capture/src/formats.rs @@ -24,6 +24,9 @@ pub enum Error { /// IO errors during write operations #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Rotation not supported for this format + #[error("File rotation not supported for this format")] + RotationNotSupported, } /// Trait for output format implementations @@ -63,6 +66,29 @@ pub trait OutputFormat { /// /// Returns an error if closing fails. fn close(self) -> Result<(), Error>; + + /// Rotate to a new output file + /// + /// Flushes and closes the current file (writing footer for Parquet), then + /// opens a new file at the specified path. This allows continuous metrics + /// collection while producing multiple readable output files. + /// + /// The default implementation returns `RotationNotSupported`. Formats that + /// support rotation (like Parquet with file-based writers) should override. + /// + /// # Arguments + /// + /// * `path` - Path for the new output file + /// + /// # Errors + /// + /// Returns an error if rotation is not supported or if file operations fail. + fn rotate(self, _path: std::path::PathBuf) -> Result + where + Self: Sized, + { + Err(Error::RotationNotSupported) + } } #[cfg(test)] diff --git a/lading_capture/src/formats/parquet.rs b/lading_capture/src/formats/parquet.rs index be327c59a..d90de9e02 100644 --- a/lading_capture/src/formats/parquet.rs +++ b/lading_capture/src/formats/parquet.rs @@ -152,6 +152,8 @@ pub struct Format { writer: ArrowWriter, /// Pre-computed Arrow schema schema: Arc, + /// Compression level for Zstd (stored for rotation) + compression_level: i32, } impl Format { @@ -219,6 +221,7 @@ impl Format { buffers: ColumnBuffers::new(), writer: arrow_writer, schema, + compression_level, }) } @@ -365,6 +368,36 @@ impl crate::formats::OutputFormat for Format { } } +impl Format> { + /// Rotate to a new output file + /// + /// Closes the current Parquet file (writing footer) and opens a new file + /// at the specified path with the same compression settings. + /// + /// # Errors + /// + /// Returns an error if closing the current file or creating the new file fails. + pub fn rotate_to(self, path: std::path::PathBuf) -> Result { + // Store compression level before closing + let compression_level = self.compression_level; + + // Close current file (writes footer) + self.close()?; + + // Create new file and writer + let file = std::fs::File::create(&path)?; + let writer = std::io::BufWriter::new(file); + let format = Self::new(writer, compression_level)?; + + Ok(format) + } + + /// Get the compression level for this format + pub fn compression_level(&self) -> i32 { + self.compression_level + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/lading_capture/src/manager.rs b/lading_capture/src/manager.rs index 2ca2a6c9f..b8cb7d43b 100644 --- a/lading_capture/src/manager.rs +++ b/lading_capture/src/manager.rs @@ -17,7 +17,7 @@ use std::{ }; use arc_swap::ArcSwap; -use tokio::{fs, sync::mpsc, time}; +use tokio::{fs, sync::mpsc, sync::oneshot, time}; use crate::{ accumulator, @@ -445,6 +445,27 @@ impl CaptureManager>, RealClock> } } +/// Request to rotate to a new output file +/// +/// Contains the path for the new file and a channel to send the result. +pub struct RotationRequest { + /// Path for the new output file + pub path: PathBuf, + /// Channel to send rotation result (Ok on success, Err on failure) + pub response: oneshot::Sender>, +} + +impl std::fmt::Debug for RotationRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RotationRequest") + .field("path", &self.path) + .finish_non_exhaustive() + } +} + +/// Handle for sending rotation requests to a running CaptureManager +pub type RotationSender = mpsc::Sender; + impl CaptureManager>, RealClock> { /// Create a new [`CaptureManager`] with file-based Parquet writer /// @@ -478,6 +499,162 @@ impl CaptureManager>, RealCloc RealClock::default(), )) } + + /// Run [`CaptureManager`] with file rotation support + /// + /// Similar to [`start`](CaptureManager::start), but also provides a channel + /// for rotation requests. When a rotation request is received, the current + /// Parquet file is finalized (footer written) and a new file is created at + /// the specified path. + /// + /// Returns a [`RotationSender`] immediately that can be used to trigger + /// rotations while the event loop runs. + /// + /// # Errors + /// + /// Returns an error if there is already a global recorder set. + #[allow(clippy::cast_possible_truncation)] + pub async fn start_with_rotation(mut self) -> Result { + // Create rotation channel - return the sender immediately + let (rotation_tx, rotation_rx) = mpsc::channel::(4); + + // Initialize historical sender + HISTORICAL_SENDER.store(Arc::new(Some(Arc::new(Sender { + snd: self.snd.clone(), + })))); + + self.install()?; + info!("Capture manager installed with rotation support, recording to capture file."); + + // Wait until the target is running then mark time-zero + self.target_running.recv().await; + self.clock.mark_start(); + + let compression_level = self.format.compression_level(); + + // Run the event loop in a spawned task so we can return the sender immediately + let expiration = self.expiration; + let format = self.format; + let flush_seconds = self.flush_seconds; + let registry = self.registry; + let accumulator = self.accumulator; + let global_labels = self.global_labels; + let clock = self.clock; + let recv = self.recv; + let shutdown = self.shutdown.take().expect("shutdown watcher must be present"); + + tokio::spawn(async move { + if let Err(e) = Self::rotation_event_loop( + expiration, + format, + flush_seconds, + registry, + accumulator, + global_labels, + clock, + recv, + shutdown, + rotation_rx, + compression_level, + ) + .await + { + error!(error = %e, "CaptureManager rotation event loop error"); + } + }); + + Ok(rotation_tx) + } + + /// Internal event loop with rotation support + #[allow(clippy::too_many_arguments)] + async fn rotation_event_loop( + expiration: Duration, + format: formats::parquet::Format>, + flush_seconds: u64, + registry: Arc>, + accumulator: Accumulator, + global_labels: FxHashMap, + clock: RealClock, + mut recv: mpsc::Receiver, + shutdown: lading_signal::Watcher, + mut rotation_rx: mpsc::Receiver, + compression_level: i32, + ) -> Result<(), Error> { + let mut flush_interval = clock.interval(Duration::from_millis(TICK_DURATION_MS as u64)); + let shutdown_wait = shutdown.recv(); + tokio::pin!(shutdown_wait); + + // Create state machine with owned state + let mut state_machine = StateMachine::new( + expiration, + format, + flush_seconds, + registry, + accumulator, + global_labels, + clock, + ); + + // Event loop with rotation support + loop { + let event = tokio::select! { + val = recv.recv() => { + match val { + Some(metric) => Event::MetricReceived(metric), + None => Event::ChannelClosed, + } + } + () = flush_interval.tick() => Event::FlushTick, + Some(rotation_req) = rotation_rx.recv() => { + // Handle rotation inline since it's not a state machine event + let result = Self::handle_rotation( + &mut state_machine, + rotation_req.path, + compression_level, + ).await; + // Send result back to caller (ignore send error if receiver dropped) + let _ = rotation_req.response.send(result); + continue; + } + () = &mut shutdown_wait => Event::ShutdownSignaled, + }; + + match state_machine.next(event)? { + Operation::Continue => {} + Operation::Exit => return Ok(()), + } + } + } + + /// Handle a rotation request + async fn handle_rotation( + state_machine: &mut StateMachine< + formats::parquet::Format>, + RealClock, + >, + new_path: PathBuf, + compression_level: i32, + ) -> Result<(), formats::Error> { + // Create new file and format + let fp = fs::File::create(&new_path) + .await + .map_err(formats::Error::Io)?; + let fp = fp.into_std().await; + let writer = BufWriter::new(fp); + let new_format = parquet::Format::new(writer, compression_level)?; + + // Swap formats - this flushes any buffered data + let old_format = state_machine + .replace_format(new_format) + .map_err(|e| formats::Error::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))?; + + // Close old format to write Parquet footer + old_format.close()?; + + info!(path = %new_path.display(), "Rotated to new capture file"); + Ok(()) + } } impl diff --git a/lading_capture/src/manager/state_machine.rs b/lading_capture/src/manager/state_machine.rs index 9d9a2c0fb..5836e2bc3 100644 --- a/lading_capture/src/manager/state_machine.rs +++ b/lading_capture/src/manager/state_machine.rs @@ -258,6 +258,39 @@ impl StateMachine { Ok(Operation::Exit) } + /// Replace the current format with a new one, returning the old format. + /// + /// This method flushes any buffered data before returning the old format. + /// The caller is responsible for closing the old format (to write any + /// footer/metadata) and providing a properly initialized new format. + /// + /// This enables file rotation: the caller can close the old format (writing + /// the Parquet footer), create a new file, and provide the new format. + /// + /// # Errors + /// + /// Returns an error if flushing the current format fails. + /// + /// # Panics + /// + /// Panics if called when no format is present (after shutdown). + pub(crate) fn replace_format(&mut self, new_format: F) -> Result { + // Flush any buffered data in the current format + self.format + .as_mut() + .expect("format must be present during operation") + .flush()?; + + // Swap in the new format and return the old one + let old_format = self + .format + .replace(new_format) + .expect("format must be present during operation"); + + info!("Format replaced for file rotation"); + Ok(old_format) + } + /// Convert an Instant timestamp to `Accumulator` logical tick time. #[inline] fn instant_to_tick(&self, timestamp: Instant) -> u64 { From 5d5134566e3ed4faa85115e2eb7c79785d7859c1 Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Mon, 22 Dec 2025 18:03:19 -0500 Subject: [PATCH 5/9] Fix CPU metric spike on first observation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The CPU Sampler was initializing prev stats to zeros, causing the first delta calculation to be (cumulative_since_container_start - 0) which produces an enormous spike in total_cpu_usage_millicores. Fix by making prev an Option. On first poll, we record baseline stats but skip metric emission. Subsequent polls compute proper deltas. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- lading/src/observer/linux/cgroup/v2/cpu.rs | 49 ++++++++++++---------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/lading/src/observer/linux/cgroup/v2/cpu.rs b/lading/src/observer/linux/cgroup/v2/cpu.rs index 4d0552757..694518524 100644 --- a/lading/src/observer/linux/cgroup/v2/cpu.rs +++ b/lading/src/observer/linux/cgroup/v2/cpu.rs @@ -26,22 +26,17 @@ struct Stats { } /// Samples CPU statistics from cgroup v2 with delta calculations -#[derive(Debug)] +#[derive(Debug, Default)] pub struct Sampler { - prev: Stats, + /// Previous stats for delta calculation. None on first poll. + prev: Option, } impl Sampler { /// Create a new CPU Sampler + #[must_use] pub fn new() -> Self { - Self { - prev: Stats { - usage_usec: 0, - user_usec: 0, - system_usec: 0, - last_instant: Instant::now(), - }, - } + Self { prev: None } } /// Read cgroup CPU data and calculate a percentage of usage. @@ -84,17 +79,29 @@ impl Sampler { } let now = Instant::now(); - let delta_time = now.duration_since(self.prev.last_instant).as_micros(); - let delta_usage = usage_usec.saturating_sub(self.prev.usage_usec); - let delta_user = user_usec.saturating_sub(self.prev.user_usec); - let delta_system = system_usec.saturating_sub(self.prev.system_usec); - - // Update previous stats and if there's a time delta calculate the CPU - // usage. - self.prev.usage_usec = usage_usec; - self.prev.user_usec = user_usec; - self.prev.system_usec = system_usec; - self.prev.last_instant = now; + let current = Stats { + usage_usec, + user_usec, + system_usec, + last_instant: now, + }; + + // On first poll, just record baseline stats without emitting metrics. + // This avoids a spike where delta = (cumulative_since_container_start - 0). + let Some(ref prev) = self.prev else { + self.prev = Some(current); + return Ok(()); + }; + + let delta_time = now.duration_since(prev.last_instant).as_micros(); + let delta_usage = usage_usec.saturating_sub(prev.usage_usec); + let delta_user = user_usec.saturating_sub(prev.user_usec); + let delta_system = system_usec.saturating_sub(prev.system_usec); + + // Update previous stats for next poll + self.prev = Some(current); + + // Emit metrics if there's a time delta if delta_time > 0 { let delta_time = delta_time as f64; From 706bc95ef42f6479e364fa9c7a55b7c7c58e15bb Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Tue, 23 Dec 2025 09:33:41 -0500 Subject: [PATCH 6/9] Fix first-poll CPU spike in procfs/stat.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same fix as cgroup/v2/cpu.rs: make prev an Option and skip metric emission on first poll to avoid computing delta from cumulative-since-process-start minus zero. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- lading/src/observer/linux/procfs/stat.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/lading/src/observer/linux/procfs/stat.rs b/lading/src/observer/linux/procfs/stat.rs index 53fa601cc..36556f683 100644 --- a/lading/src/observer/linux/procfs/stat.rs +++ b/lading/src/observer/linux/procfs/stat.rs @@ -45,7 +45,8 @@ struct CpuUtilization { #[derive(Debug, Clone, Copy)] pub struct Sampler { ticks_per_second: f64, - prev: Stats, + /// Previous stats for delta calculation. None on first poll. + prev: Option, } impl Sampler { @@ -53,7 +54,7 @@ impl Sampler { pub fn new() -> Self { Self { ticks_per_second: unsafe { nix::libc::sysconf(nix::libc::_SC_CLK_TCK) } as f64, - prev: Stats::default(), + prev: None, } } @@ -87,16 +88,20 @@ impl Sampler { let (cur_pid, utime_ticks, stime_ticks) = parse(&stat_contents)?; assert!(cur_pid == pid); - // Get or initialize the previous stats. Note that the first time this is - // initialized we intentionally set last_instance to now to avoid scheduling - // shenanigans. let cur_stats = Stats { user_ticks: utime_ticks, system_ticks: stime_ticks, uptime_ticks: (uptime_secs * self.ticks_per_second).round() as u64, }; - if let Some(util) = compute_cpu_usage(self.prev, cur_stats, allowed_cores) { + // On first poll, just record baseline stats without emitting metrics. + // This avoids a spike where delta = (cumulative_since_process_start - 0). + let Some(ref prev) = self.prev else { + self.prev = Some(cur_stats); + return Ok(()); + }; + + if let Some(util) = compute_cpu_usage(*prev, cur_stats, allowed_cores) { // NOTE these metric names are paired with names in cgroup/v2/cpu.rs and // must remain consistent. If you change these, change those. gauge!("stat.total_cpu_percentage", labels).set(util.total_cpu_percentage); @@ -112,7 +117,7 @@ impl Sampler { gauge!("stat.cpu_limit_millicores", labels).set(limit_millicores); } - self.prev = cur_stats; + self.prev = Some(cur_stats); Ok(()) } From b1d81933f08cb083208baae0b50efed43d71733d Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Wed, 31 Dec 2025 15:06:17 -0500 Subject: [PATCH 7/9] feat(capture): flatten labels to top-level l_* columns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace MapArray-based label storage with flat l_ columns in Parquet output. This enables predicate pushdown for filtering by container_id and other labels, avoiding full file scans. Key changes: - Dynamic schema generation based on discovered label keys - Dictionary encoding for low-cardinality label columns - Lazy ArrowWriter initialization (schema determined at first flush) - Updated validation and round-trip tests for new schema 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- lading_capture/src/formats.rs | 45 +-- lading_capture/src/formats/parquet.rs | 407 ++++++++++++++++++------- lading_capture/src/manager.rs | 14 +- lading_capture/src/validate/parquet.rs | 53 ++-- 4 files changed, 358 insertions(+), 161 deletions(-) diff --git a/lading_capture/src/formats.rs b/lading_capture/src/formats.rs index 416954d3d..3585ef5e7 100644 --- a/lading_capture/src/formats.rs +++ b/lading_capture/src/formats.rs @@ -97,8 +97,7 @@ mod tests { use crate::line::{Line, LineValue, MetricKind}; use approx::relative_eq; use arrow_array::{ - Array, BinaryArray, Float64Array, MapArray, StringArray, StructArray, - TimestampMillisecondArray, UInt64Array, + Array, BinaryArray, Float64Array, StringArray, TimestampMillisecondArray, UInt64Array, }; use bytes::Bytes; use datadog_protos::metrics::Dogsketch; @@ -487,12 +486,23 @@ mod tests { .downcast_ref::() .expect("value_float is Float64Array"); - let labels_array = batch - .column_by_name("labels") - .expect("labels column") - .as_any() - .downcast_ref::() - .expect("labels is MapArray"); + // Collect l_* columns for label extraction (new schema uses flat columns) + let schema = batch.schema(); + let l_columns: Vec<(&str, &StringArray)> = schema + .fields() + .iter() + .filter_map(|field| { + let name = field.name(); + if name.starts_with("l_") { + batch + .column_by_name(name) + .and_then(|c| c.as_any().downcast_ref::()) + .map(|arr| (name.strip_prefix("l_").unwrap_or(name), arr)) + } else { + None + } + }) + .collect(); let value_histogram_array = batch .column_by_name("value_histogram") @@ -524,21 +534,12 @@ mod tests { LineValue::Int(value_int_array.value(row_idx)) }; - let labels_slice: StructArray = labels_array.value(row_idx); - let keys = labels_slice - .column(0) - .as_any() - .downcast_ref::() - .expect("label keys are strings"); - let values = labels_slice - .column(1) - .as_any() - .downcast_ref::() - .expect("label values are strings"); - + // Extract labels from l_* columns let mut labels = FxHashMap::default(); - for i in 0..keys.len() { - labels.insert(keys.value(i).to_string(), values.value(i).to_string()); + for (key, arr) in &l_columns { + if !arr.is_null(row_idx) { + labels.insert((*key).to_string(), arr.value(row_idx).to_string()); + } } let value_histogram = if value_histogram_array.is_null(row_idx) { diff --git a/lading_capture/src/formats/parquet.rs b/lading_capture/src/formats/parquet.rs index d90de9e02..6db3bd38b 100644 --- a/lading_capture/src/formats/parquet.rs +++ b/lading_capture/src/formats/parquet.rs @@ -3,18 +3,27 @@ //! This format buffers metrics in memory and periodically writes them as //! Parquet row groups, providing better compression and query performance than //! JSONL. +//! +//! Labels are stored as top-level columns with `l_` prefix (e.g., `l_container_id`, +//! `l_namespace`). This enables Parquet predicate pushdown for efficient filtering +//! by any label key, unlike the previous `MapArray` approach which required post-read +//! filtering. +//! +//! Schema is determined dynamically at first flush based on label keys discovered +//! in the buffered data. All label columns are nullable Utf8 strings. use std::{ - io::{Seek, Write}, + collections::{BTreeMap, BTreeSet}, + fs::File, + io::{BufWriter, Seek, Write}, sync::Arc, }; use arrow_array::{ - ArrayRef, BinaryArray, Float64Array, MapArray, RecordBatch, StringArray, StructArray, - TimestampMillisecondArray, UInt64Array, + ArrayRef, BinaryArray, Float64Array, RecordBatch, StringArray, TimestampMillisecondArray, + UInt64Array, }; -use arrow_buffer::OffsetBuffer; -use arrow_schema::{ArrowError, DataType, Field, Fields, Schema, TimeUnit}; +use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit}; use parquet::{ arrow::ArrowWriter, basic::{Compression, ZstdLevel}, @@ -43,6 +52,9 @@ pub enum Error { /// Holds reusable allocations for all columns to avoid repeated allocation /// during flush operations. Buffers are cleared after each write, preserving /// capacity for the next batch. +/// +/// Labels are stored per-row as a map, with all unique keys tracked separately +/// to enable dynamic schema generation with `l_` columns. #[derive(Debug)] struct ColumnBuffers { run_ids: Vec, @@ -52,9 +64,12 @@ struct ColumnBuffers { metric_kinds: Vec<&'static str>, values_int: Vec>, values_float: Vec>, - label_keys: Vec, - label_values: Vec, - label_offsets: Vec, + /// Per-row label maps. Each entry contains the labels for one metric row. + /// Using `BTreeMap` for consistent ordering when iterating. + row_labels: Vec>, + /// All unique label keys seen in this batch. Used to generate schema. + /// `BTreeSet` ensures consistent column ordering across runs. + unique_label_keys: BTreeSet, values_histogram: Vec>, } @@ -71,9 +86,8 @@ impl ColumnBuffers { metric_kinds: Vec::with_capacity(INITIAL_CAPACITY), values_int: Vec::with_capacity(INITIAL_CAPACITY), values_float: Vec::with_capacity(INITIAL_CAPACITY), - label_keys: Vec::with_capacity(INITIAL_CAPACITY * 2), - label_values: Vec::with_capacity(INITIAL_CAPACITY * 2), - label_offsets: Vec::with_capacity(INITIAL_CAPACITY + 1), + row_labels: Vec::with_capacity(INITIAL_CAPACITY), + unique_label_keys: BTreeSet::new(), values_histogram: Vec::with_capacity(INITIAL_CAPACITY), } } @@ -87,9 +101,9 @@ impl ColumnBuffers { self.metric_kinds.clear(); self.values_int.clear(); self.values_float.clear(); - self.label_keys.clear(); - self.label_values.clear(); - self.label_offsets.clear(); + self.row_labels.clear(); + // Note: unique_label_keys is NOT cleared - it accumulates across flushes + // to maintain consistent schema within a file self.values_histogram.clear(); } @@ -117,13 +131,13 @@ impl ColumnBuffers { } } - // Add labels for this row + // Store labels for this row and track unique keys + let mut row_map = BTreeMap::new(); for (k, v) in &line.labels { - self.label_keys.push(k.clone()); - self.label_values.push(v.clone()); + self.unique_label_keys.insert(k.clone()); + row_map.insert(k.clone(), v.clone()); } - #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)] - self.label_offsets.push(self.label_keys.len() as i32); + self.row_labels.push(row_map); self.values_histogram.push(line.value_histogram.clone()); } @@ -144,18 +158,29 @@ impl ColumnBuffers { /// /// Buffers metrics in memory. Calling `flush()` writes accumulated metrics as a /// Parquet row group. +/// +/// The schema is determined dynamically at first flush based on label keys +/// discovered in the buffered data. Label columns use the `l_` prefix +/// (e.g., `l_container_id`). #[derive(Debug)] pub struct Format { /// Reusable column buffers for building Arrow arrays buffers: ColumnBuffers, - /// Parquet writer - writer: ArrowWriter, - /// Pre-computed Arrow schema - schema: Arc, + /// Parquet writer - created lazily on first flush when schema is known + writer: Option>, + /// The underlying writer, stored until `ArrowWriter` is created + raw_writer: Option, + /// Arrow schema - created on first flush based on discovered label keys + schema: Option>, + /// Ordered list of label keys in schema (for consistent column ordering) + schema_label_keys: Vec, /// Compression level for Zstd (stored for rotation) compression_level: i32, } +/// Label column prefix for flattened labels +const LABEL_COLUMN_PREFIX: &str = "l_"; + impl Format { /// Create a new Parquet format writer /// @@ -166,9 +191,28 @@ impl Format { /// /// # Errors /// - /// Returns error if Arrow writer creation fails + /// Returns error if compression level is invalid pub fn new(writer: W, compression_level: i32) -> Result { - let schema = Arc::new(Schema::new(vec![ + // Validate compression level early + let _ = ZstdLevel::try_new(compression_level)?; + + Ok(Self { + buffers: ColumnBuffers::new(), + writer: None, + raw_writer: Some(writer), + schema: None, + schema_label_keys: Vec::new(), + compression_level, + }) + } + + /// Generate schema based on discovered label keys + /// + /// Creates base columns plus `l_` columns for each unique label key. + /// Label columns are nullable Utf8 strings, sorted alphabetically for + /// consistent ordering. + fn generate_schema(label_keys: &BTreeSet) -> (Arc, Vec) { + let mut fields = vec![ Field::new("run_id", DataType::Utf8, false), Field::new( "time", @@ -180,24 +224,28 @@ impl Format { Field::new("metric_kind", DataType::Utf8, false), Field::new("value_int", DataType::UInt64, true), Field::new("value_float", DataType::Float64, true), - Field::new( - "labels", - DataType::Map( - Arc::new(Field::new( - "entries", - DataType::Struct(Fields::from(vec![ - Field::new("key", DataType::Utf8, false), - Field::new("value", DataType::Utf8, false), - ])), - false, - )), - false, - ), - false, - ), - Field::new("value_histogram", DataType::Binary, true), - ])); + ]; + + // Add l_ columns for each label key (sorted by BTreeSet) + let ordered_keys: Vec = label_keys.iter().cloned().collect(); + for key in &ordered_keys { + fields.push(Field::new( + format!("{LABEL_COLUMN_PREFIX}{key}"), + DataType::Utf8, + true, // nullable - not all rows have all labels + )); + } + fields.push(Field::new("value_histogram", DataType::Binary, true)); + + (Arc::new(Schema::new(fields)), ordered_keys) + } + + /// Create writer properties with dictionary encoding for appropriate columns + fn create_writer_properties( + compression_level: i32, + label_keys: &[String], + ) -> Result { // Use Parquet v2 format for better encodings and compression: // // - DELTA_BINARY_PACKED encoding for integers (timestamps, fetch_index) @@ -206,23 +254,45 @@ impl Format { // // Dictionary encoding for low-cardinality columns: // - // - metric_kind: only 2 values ("counter", "gauge") + // - metric_kind: only 3 values ("counter", "gauge", "histogram") // - run_id: one UUID per run - let props = WriterProperties::builder() + // - label columns: often low cardinality (container_id, namespace, etc.) + let mut builder = WriterProperties::builder() .set_writer_version(WriterVersion::PARQUET_2_0) .set_compression(Compression::ZSTD(ZstdLevel::try_new(compression_level)?)) .set_column_dictionary_enabled(ColumnPath::from("metric_kind"), true) - .set_column_dictionary_enabled(ColumnPath::from("run_id"), true) - .build(); + .set_column_dictionary_enabled(ColumnPath::from("run_id"), true); + + // Enable dictionary encoding for all label columns + for key in label_keys { + builder = builder.set_column_dictionary_enabled( + ColumnPath::from(format!("{LABEL_COLUMN_PREFIX}{key}")), + true, + ); + } - let arrow_writer = ArrowWriter::try_new(writer, schema.clone(), Some(props))?; + Ok(builder.build()) + } - Ok(Self { - buffers: ColumnBuffers::new(), - writer: arrow_writer, - schema, - compression_level, - }) + /// Initialize the writer with schema based on discovered label keys + /// + /// Called on first flush when we know what label keys exist. + fn initialize_writer(&mut self) -> Result<(), Error> { + let raw_writer = self + .raw_writer + .take() + .expect("raw_writer should be present before initialization"); + + let (schema, ordered_keys) = Self::generate_schema(&self.buffers.unique_label_keys); + let props = Self::create_writer_properties(self.compression_level, &ordered_keys)?; + + let arrow_writer = ArrowWriter::try_new(raw_writer, schema.clone(), Some(props))?; + + self.schema = Some(schema); + self.schema_label_keys = ordered_keys; + self.writer = Some(arrow_writer); + + Ok(()) } /// Convert buffered data to Arrow `RecordBatch` @@ -231,43 +301,19 @@ impl Format { /// /// Returns error if `RecordBatch` construction fails fn buffers_to_record_batch(&self) -> Result { + let schema = self + .schema + .as_ref() + .expect("schema should be initialized before creating record batch"); + if self.buffers.is_empty() { - return Ok(RecordBatch::new_empty(self.schema.clone())); + return Ok(RecordBatch::new_empty(schema.clone())); } - // Prepare label offsets with initial 0 - let mut label_offsets = Vec::with_capacity(self.buffers.label_offsets.len() + 1); - label_offsets.push(0i32); - label_offsets.extend_from_slice(&self.buffers.label_offsets); - - // Build the labels map array using pre-allocated buffers - let keys_array = Arc::new(StringArray::from(self.buffers.label_keys.clone())); - let values_array = Arc::new(StringArray::from(self.buffers.label_values.clone())); - let struct_array = StructArray::from(vec![ - ( - Arc::new(Field::new("key", DataType::Utf8, false)), - keys_array as ArrayRef, - ), - ( - Arc::new(Field::new("value", DataType::Utf8, false)), - values_array as ArrayRef, - ), - ]); - - let field = Arc::new(Field::new( - "entries", - DataType::Struct(Fields::from(vec![ - Field::new("key", DataType::Utf8, false), - Field::new("value", DataType::Utf8, false), - ])), - false, - )); - - let offsets = OffsetBuffer::new(label_offsets.into()); - let labels_map = MapArray::new(field, offsets, struct_array, None, false); + let num_rows = self.buffers.run_ids.len(); - // Build arrays directly from pre-allocated buffers - let arrays: Vec = vec![ + // Build base column arrays + let mut arrays: Vec = vec![ Arc::new(StringArray::from(self.buffers.run_ids.clone())), Arc::new(TimestampMillisecondArray::from(self.buffers.times.clone())), Arc::new(UInt64Array::from(self.buffers.fetch_indices.clone())), @@ -275,23 +321,47 @@ impl Format { Arc::new(StringArray::from(self.buffers.metric_kinds.clone())), Arc::new(UInt64Array::from(self.buffers.values_int.clone())), Arc::new(Float64Array::from(self.buffers.values_float.clone())), - Arc::new(labels_map), - Arc::new(BinaryArray::from_opt_vec( - self.buffers - .values_histogram - .iter() - .map(|v| { - if v.is_empty() { - None - } else { - Some(v.as_slice()) - } - }) - .collect(), - )), ]; - Ok(RecordBatch::try_new(self.schema.clone(), arrays)?) + // Build l_ columns for each label key in schema order + for key in &self.schema_label_keys { + let values: Vec> = self + .buffers + .row_labels + .iter() + .map(|row_map| row_map.get(key).map(String::as_str)) + .collect(); + arrays.push(Arc::new(StringArray::from(values))); + } + + // Add histogram column last + arrays.push(Arc::new( + self.buffers + .values_histogram + .iter() + .map(|v| { + if v.is_empty() { + None + } else { + Some(v.as_slice()) + } + }) + .collect::(), + )); + + debug_assert_eq!( + arrays.len(), + schema.fields().len(), + "array count ({}) must match schema field count ({})", + arrays.len(), + schema.fields().len() + ); + debug_assert!( + arrays.iter().all(|a| a.len() == num_rows), + "all arrays must have {num_rows} rows", + ); + + Ok(RecordBatch::try_new(schema.clone(), arrays)?) } /// Write buffered metrics as a Parquet row group @@ -304,8 +374,16 @@ impl Format { return Ok(()); } + // Initialize writer on first flush when we know the label keys + if self.writer.is_none() { + self.initialize_writer()?; + } + let batch = self.buffers_to_record_batch()?; - self.writer.write(&batch)?; + self.writer + .as_mut() + .expect("writer should be initialized") + .write(&batch)?; self.buffers.clear(); Ok(()) } @@ -348,8 +426,12 @@ impl Format { pub fn close(mut self) -> Result<(), Error> { // Write any remaining buffered data as a final row group self.write_parquet()?; - // Close the ArrowWriter which consumes it - self.writer.close()?; + + // Close the ArrowWriter if it was created + if let Some(writer) = self.writer { + writer.close()?; + } + // If writer was never created (no data written), nothing to close Ok(()) } } @@ -377,7 +459,7 @@ impl Format> { /// # Errors /// /// Returns an error if closing the current file or creating the new file fails. - pub fn rotate_to(self, path: std::path::PathBuf) -> Result { + pub fn rotate_to(self, path: &std::path::Path) -> Result { // Store compression level before closing let compression_level = self.compression_level; @@ -385,14 +467,15 @@ impl Format> { self.close()?; // Create new file and writer - let file = std::fs::File::create(&path)?; - let writer = std::io::BufWriter::new(file); + let file = File::create(path)?; + let writer = BufWriter::new(file); let format = Self::new(writer, compression_level)?; Ok(format) } /// Get the compression level for this format + #[must_use] pub fn compression_level(&self) -> i32 { self.compression_level } @@ -489,4 +572,114 @@ mod tests { assert!(!buffer.get_ref().is_empty(), "should have written data"); } + + #[test] + fn writes_label_columns() { + use arrow_array::{Array, RecordBatchReader}; + use bytes::Bytes; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + + let mut buffer = Cursor::new(Vec::new()); + + { + let mut format = Format::new(&mut buffer, 3).expect("create format"); + + // Write metric with labels + let mut labels = FxHashMap::default(); + labels.insert("container_id".to_string(), "abc123".to_string()); + labels.insert("namespace".to_string(), "default".to_string()); + labels.insert("qos_class".to_string(), "Guaranteed".to_string()); + + let line = Line { + run_id: Uuid::new_v4(), + time: 1000, + fetch_index: 0, + metric_name: "test_metric".into(), + metric_kind: MetricKind::Gauge, + value: LineValue::Float(42.0), + labels, + value_histogram: Vec::new(), + }; + + format.write_metric(&line).expect("write should succeed"); + + // Write another metric with different labels + let mut labels2 = FxHashMap::default(); + labels2.insert("container_id".to_string(), "def456".to_string()); + labels2.insert("namespace".to_string(), "kube-system".to_string()); + // Note: no qos_class label + + let line2 = Line { + run_id: Uuid::new_v4(), + time: 2000, + fetch_index: 1, + metric_name: "test_metric".into(), + metric_kind: MetricKind::Gauge, + value: LineValue::Float(100.0), + labels: labels2, + value_histogram: Vec::new(), + }; + + format.write_metric(&line2).expect("write should succeed"); + format.close().expect("close should succeed"); + } + + // Read back and verify schema has l_* columns + let data = Bytes::from(buffer.into_inner()); + let reader = ParquetRecordBatchReaderBuilder::try_new(data) + .expect("create reader") + .build() + .expect("build reader"); + + let schema = reader.schema(); + + // Check that l_* columns exist (sorted alphabetically) + assert!( + schema.field_with_name("l_container_id").is_ok(), + "should have l_container_id column" + ); + assert!( + schema.field_with_name("l_namespace").is_ok(), + "should have l_namespace column" + ); + assert!( + schema.field_with_name("l_qos_class").is_ok(), + "should have l_qos_class column" + ); + + // Check no labels MapArray column + assert!( + schema.field_with_name("labels").is_err(), + "should NOT have labels column (replaced by l_* columns)" + ); + + // Read data and verify values + let batches: Vec<_> = reader.into_iter().collect(); + assert_eq!(batches.len(), 1, "should have one batch"); + let batch = batches[0].as_ref().expect("batch should be ok"); + + assert_eq!(batch.num_rows(), 2, "should have 2 rows"); + + // Check l_container_id values + let container_col = batch + .column_by_name("l_container_id") + .expect("l_container_id column"); + let container_arr = container_col + .as_any() + .downcast_ref::() + .expect("string array"); + assert_eq!(container_arr.value(0), "abc123"); + assert_eq!(container_arr.value(1), "def456"); + + // Check l_qos_class values (second row should be null) + let qos_col = batch + .column_by_name("l_qos_class") + .expect("l_qos_class column"); + let qos_arr = qos_col + .as_any() + .downcast_ref::() + .expect("string array"); + assert_eq!(qos_arr.value(0), "Guaranteed"); + assert!(qos_arr.is_null(1), "second row should have null qos_class"); + } } diff --git a/lading_capture/src/manager.rs b/lading_capture/src/manager.rs index b8cb7d43b..a6ed106b0 100644 --- a/lading_capture/src/manager.rs +++ b/lading_capture/src/manager.rs @@ -463,7 +463,7 @@ impl std::fmt::Debug for RotationRequest { } } -/// Handle for sending rotation requests to a running CaptureManager +/// Handle for sending rotation requests to a running [`CaptureManager`] pub type RotationSender = mpsc::Sender; impl CaptureManager>, RealClock> { @@ -510,6 +510,10 @@ impl CaptureManager>, RealCloc /// Returns a [`RotationSender`] immediately that can be used to trigger /// rotations while the event loop runs. /// + /// # Panics + /// + /// Panics if the shutdown watcher is missing (should never happen in normal use). + /// /// # Errors /// /// Returns an error if there is already a global recorder set. @@ -541,7 +545,10 @@ impl CaptureManager>, RealCloc let global_labels = self.global_labels; let clock = self.clock; let recv = self.recv; - let shutdown = self.shutdown.take().expect("shutdown watcher must be present"); + let shutdown = self + .shutdown + .take() + .expect("shutdown watcher must be present"); tokio::spawn(async move { if let Err(e) = Self::rotation_event_loop( @@ -568,6 +575,7 @@ impl CaptureManager>, RealCloc /// Internal event loop with rotation support #[allow(clippy::too_many_arguments)] + #[allow(clippy::cast_possible_truncation)] async fn rotation_event_loop( expiration: Duration, format: formats::parquet::Format>, @@ -647,7 +655,7 @@ impl CaptureManager>, RealCloc // Swap formats - this flushes any buffered data let old_format = state_machine .replace_format(new_format) - .map_err(|e| formats::Error::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))?; + .map_err(|e| formats::Error::Io(io::Error::other(e.to_string())))?; // Close old format to write Parquet footer old_format.close()?; diff --git a/lading_capture/src/validate/parquet.rs b/lading_capture/src/validate/parquet.rs index 2f77f9323..d46fbf980 100644 --- a/lading_capture/src/validate/parquet.rs +++ b/lading_capture/src/validate/parquet.rs @@ -10,9 +10,7 @@ use std::fs::File; use std::hash::{BuildHasher, Hasher}; use std::path::Path; -use arrow_array::{ - Array, MapArray, StringArray, StructArray, TimestampMillisecondArray, UInt64Array, -}; +use arrow_array::{Array, StringArray, TimestampMillisecondArray, UInt64Array}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use crate::validate::ValidationResult; @@ -120,12 +118,23 @@ pub fn validate_parquet>( Error::InvalidColumnType("'metric_name' column is not String".to_string()) })?; - let labels_array = batch - .column_by_name("labels") - .ok_or_else(|| Error::MissingColumn("labels".to_string()))? - .as_any() - .downcast_ref::() - .ok_or_else(|| Error::InvalidColumnType("'labels' column is not Map".to_string()))?; + // Collect l_* columns for label extraction (new schema uses flat columns) + let schema = batch.schema(); + let l_columns: Vec<(&str, &StringArray)> = schema + .fields() + .iter() + .filter_map(|field| { + let name = field.name(); + if name.starts_with("l_") { + batch + .column_by_name(name) + .and_then(|c| c.as_any().downcast_ref::()) + .map(|arr| (name.strip_prefix("l_").unwrap_or(name), arr)) + } else { + None + } + }) + .collect(); let metric_kind_array = batch .column_by_name("metric_kind") @@ -172,27 +181,13 @@ pub fn validate_parquet>( fetch_index_to_time.insert(fetch_index, time); } - let labels_slice: StructArray = labels_array.value(row); - let key_array = labels_slice - .column(0) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::InvalidColumnType("Labels keys are not StringArray".to_string()) - })?; - let value_array = labels_slice - .column(1) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::InvalidColumnType("Labels values are not StringArray".to_string()) - })?; - + // Extract labels from l_* columns let mut sorted_labels: BTreeSet = BTreeSet::new(); - for i in 0..key_array.len() { - let key = key_array.value(i); - let value = value_array.value(i); - sorted_labels.insert(format!("{key}:{value}")); + for (key, arr) in &l_columns { + if !arr.is_null(row) { + let value = arr.value(row); + sorted_labels.insert(format!("{key}:{value}")); + } } let mut hasher = hash_builder.build_hasher(); From fc017f16816417f1570d72395910ce0e3641b077 Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Fri, 2 Jan 2026 15:39:08 -0500 Subject: [PATCH 8/9] Add bloom filter configuration API for parquet format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add BloomFilterConfig and BloomFilterColumn types to configure bloom filters on label columns. Bloom filters enable efficient query-time filtering by allowing readers to skip row groups that definitely don't contain a target value. New APIs: - Format::with_bloom_filter() - create writer with bloom filter config - format.bloom_filter_config() - getter for rotation - CaptureManager::new_parquet_with_bloom_filter() - CaptureManager::new_multi_with_bloom_filter() Backwards compatible - existing Format::new() and new_parquet() still work unchanged using BloomFilterConfig::default(). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- lading_capture/src/formats/parquet.rs | 101 ++++++++++++++++++++++++-- lading_capture/src/manager.rs | 97 ++++++++++++++++++++++++- 2 files changed, 189 insertions(+), 9 deletions(-) diff --git a/lading_capture/src/formats/parquet.rs b/lading_capture/src/formats/parquet.rs index 6db3bd38b..5cc652771 100644 --- a/lading_capture/src/formats/parquet.rs +++ b/lading_capture/src/formats/parquet.rs @@ -33,6 +33,52 @@ use parquet::{ use crate::line; +/// Configuration for a single bloom filter column +#[derive(Debug, Clone)] +pub struct BloomFilterColumn { + /// Label name (e.g., `container_id` will be applied to `l_container_id` column) + pub label_name: String, + /// Expected number of distinct values (NDV) for sizing the bloom filter. + /// Higher values create larger filters with lower false positive rates. + pub ndv: u64, +} + +impl BloomFilterColumn { + /// Create a new bloom filter column configuration + #[must_use] + pub fn new(label_name: impl Into, ndv: u64) -> Self { + Self { + label_name: label_name.into(), + ndv, + } + } +} + +/// Configuration for bloom filters on label columns +/// +/// Bloom filters enable efficient query-time filtering by allowing readers +/// to skip row groups that definitely don't contain a target value. +/// This is especially useful for high-cardinality label columns like +/// `container_id` where queries often filter for a specific value. +/// +/// # Example +/// +/// ``` +/// use lading_capture::formats::parquet::{BloomFilterConfig, BloomFilterColumn}; +/// +/// let config = BloomFilterConfig { +/// columns: vec![ +/// BloomFilterColumn::new("container_id", 100), +/// ], +/// }; +/// ``` +#[derive(Debug, Clone, Default)] +pub struct BloomFilterConfig { + /// Label columns to enable bloom filters on. + /// Each column specifies the label name (without `l_` prefix) and expected NDV. + pub columns: Vec, +} + /// Parquet format errors #[derive(thiserror::Error, Debug)] pub enum Error { @@ -176,6 +222,8 @@ pub struct Format { schema_label_keys: Vec, /// Compression level for Zstd (stored for rotation) compression_level: i32, + /// Bloom filter configuration (stored for rotation) + bloom_filter_config: BloomFilterConfig, } /// Label column prefix for flattened labels @@ -193,6 +241,25 @@ impl Format { /// /// Returns error if compression level is invalid pub fn new(writer: W, compression_level: i32) -> Result { + Self::with_bloom_filter(writer, compression_level, BloomFilterConfig::default()) + } + + /// Create a new Parquet format writer with bloom filter configuration + /// + /// # Arguments + /// + /// * `writer` - Writer implementing Write + Seek for Parquet output + /// * `compression_level` - Zstd compression level (1-22) + /// * `bloom_filter_config` - Configuration for bloom filters on label columns + /// + /// # Errors + /// + /// Returns error if compression level is invalid + pub fn with_bloom_filter( + writer: W, + compression_level: i32, + bloom_filter_config: BloomFilterConfig, + ) -> Result { // Validate compression level early let _ = ZstdLevel::try_new(compression_level)?; @@ -203,6 +270,7 @@ impl Format { schema: None, schema_label_keys: Vec::new(), compression_level, + bloom_filter_config, }) } @@ -245,6 +313,7 @@ impl Format { fn create_writer_properties( compression_level: i32, label_keys: &[String], + bloom_filter_config: &BloomFilterConfig, ) -> Result { // Use Parquet v2 format for better encodings and compression: // @@ -271,6 +340,17 @@ impl Format { ); } + // Enable bloom filters for configured label columns + // Bloom filters allow readers to skip row groups that definitely don't + // contain the target value, improving query performance significantly. + for bloom_col in &bloom_filter_config.columns { + let column_name = format!("{LABEL_COLUMN_PREFIX}{}", bloom_col.label_name); + let column_path = ColumnPath::from(column_name); + + builder = builder.set_column_bloom_filter_enabled(column_path.clone(), true); + builder = builder.set_column_bloom_filter_ndv(column_path, bloom_col.ndv); + } + Ok(builder.build()) } @@ -284,7 +364,11 @@ impl Format { .expect("raw_writer should be present before initialization"); let (schema, ordered_keys) = Self::generate_schema(&self.buffers.unique_label_keys); - let props = Self::create_writer_properties(self.compression_level, &ordered_keys)?; + let props = Self::create_writer_properties( + self.compression_level, + &ordered_keys, + &self.bloom_filter_config, + )?; let arrow_writer = ArrowWriter::try_new(raw_writer, schema.clone(), Some(props))?; @@ -454,22 +538,23 @@ impl Format> { /// Rotate to a new output file /// /// Closes the current Parquet file (writing footer) and opens a new file - /// at the specified path with the same compression settings. + /// at the specified path with the same compression and bloom filter settings. /// /// # Errors /// /// Returns an error if closing the current file or creating the new file fails. pub fn rotate_to(self, path: &std::path::Path) -> Result { - // Store compression level before closing + // Store settings before closing let compression_level = self.compression_level; + let bloom_filter_config = self.bloom_filter_config.clone(); // Close current file (writes footer) self.close()?; - // Create new file and writer + // Create new file and writer with same settings let file = File::create(path)?; let writer = BufWriter::new(file); - let format = Self::new(writer, compression_level)?; + let format = Self::with_bloom_filter(writer, compression_level, bloom_filter_config)?; Ok(format) } @@ -479,6 +564,12 @@ impl Format> { pub fn compression_level(&self) -> i32 { self.compression_level } + + /// Get the bloom filter configuration for this format + #[must_use] + pub fn bloom_filter_config(&self) -> &BloomFilterConfig { + &self.bloom_filter_config + } } #[cfg(test)] diff --git a/lading_capture/src/manager.rs b/lading_capture/src/manager.rs index a6ed106b0..aae6319ba 100644 --- a/lading_capture/src/manager.rs +++ b/lading_capture/src/manager.rs @@ -481,13 +481,55 @@ impl CaptureManager>, RealCloc experiment_started: lading_signal::Watcher, target_running: lading_signal::Watcher, expiration: Duration, + ) -> Result { + Self::new_parquet_with_bloom_filter( + capture_path, + flush_seconds, + compression_level, + parquet::BloomFilterConfig::default(), + shutdown, + experiment_started, + target_running, + expiration, + ) + .await + } + + /// Create a new [`CaptureManager`] with file-based Parquet writer and bloom filter config + /// + /// # Arguments + /// + /// * `capture_path` - Path to the output Parquet file + /// * `flush_seconds` - How often to flush buffered data + /// * `compression_level` - Zstd compression level (1-22) + /// * `bloom_filter_config` - Configuration for bloom filters on label columns + /// * `shutdown` - Signal to gracefully shut down the capture manager + /// * `experiment_started` - Signal that the experiment has started + /// * `target_running` - Signal that the target is running + /// * `expiration` - Duration after which metrics expire + /// + /// # Errors + /// + /// Function will error if the underlying capture file cannot be opened or + /// if Parquet writer creation fails. + #[allow(clippy::too_many_arguments)] + pub async fn new_parquet_with_bloom_filter( + capture_path: PathBuf, + flush_seconds: u64, + compression_level: i32, + bloom_filter_config: parquet::BloomFilterConfig, + shutdown: lading_signal::Watcher, + experiment_started: lading_signal::Watcher, + target_running: lading_signal::Watcher, + expiration: Duration, ) -> Result { let fp = fs::File::create(&capture_path) .await .map_err(formats::Error::Io)?; let fp = fp.into_std().await; let writer = BufWriter::new(fp); - let format = parquet::Format::new(writer, compression_level)?; + let format = + parquet::Format::with_bloom_filter(writer, compression_level, bloom_filter_config)?; Ok(Self::new_with_format( format, @@ -535,6 +577,7 @@ impl CaptureManager>, RealCloc self.clock.mark_start(); let compression_level = self.format.compression_level(); + let bloom_filter_config = self.format.bloom_filter_config().clone(); // Run the event loop in a spawned task so we can return the sender immediately let expiration = self.expiration; @@ -563,6 +606,7 @@ impl CaptureManager>, RealCloc shutdown, rotation_rx, compression_level, + bloom_filter_config, ) .await { @@ -588,6 +632,7 @@ impl CaptureManager>, RealCloc shutdown: lading_signal::Watcher, mut rotation_rx: mpsc::Receiver, compression_level: i32, + bloom_filter_config: parquet::BloomFilterConfig, ) -> Result<(), Error> { let mut flush_interval = clock.interval(Duration::from_millis(TICK_DURATION_MS as u64)); let shutdown_wait = shutdown.recv(); @@ -620,6 +665,7 @@ impl CaptureManager>, RealCloc &mut state_machine, rotation_req.path, compression_level, + &bloom_filter_config, ).await; // Send result back to caller (ignore send error if receiver dropped) let _ = rotation_req.response.send(result); @@ -643,14 +689,19 @@ impl CaptureManager>, RealCloc >, new_path: PathBuf, compression_level: i32, + bloom_filter_config: &parquet::BloomFilterConfig, ) -> Result<(), formats::Error> { - // Create new file and format + // Create new file and format with same settings let fp = fs::File::create(&new_path) .await .map_err(formats::Error::Io)?; let fp = fp.into_std().await; let writer = BufWriter::new(fp); - let new_format = parquet::Format::new(writer, compression_level)?; + let new_format = parquet::Format::with_bloom_filter( + writer, + compression_level, + bloom_filter_config.clone(), + )?; // Swap formats - this flushes any buffered data let old_format = state_machine @@ -689,6 +740,40 @@ impl experiment_started: lading_signal::Watcher, target_running: lading_signal::Watcher, expiration: Duration, + ) -> Result { + Self::new_multi_with_bloom_filter( + base_path, + flush_seconds, + compression_level, + parquet::BloomFilterConfig::default(), + shutdown, + experiment_started, + target_running, + expiration, + ) + .await + } + + /// Create a new [`CaptureManager`] with file-based multi-format writer and bloom filter config + /// + /// Writes to both JSONL and Parquet formats simultaneously. The base path + /// is used to generate two output files: `{base_path}.jsonl` and + /// `{base_path}.parquet`. + /// + /// # Errors + /// + /// Function will error if either capture file cannot be opened or if + /// format creation fails. + #[allow(clippy::too_many_arguments)] + pub async fn new_multi_with_bloom_filter( + base_path: PathBuf, + flush_seconds: u64, + compression_level: i32, + bloom_filter_config: parquet::BloomFilterConfig, + shutdown: lading_signal::Watcher, + experiment_started: lading_signal::Watcher, + target_running: lading_signal::Watcher, + expiration: Duration, ) -> Result { let jsonl_path = base_path.with_extension("jsonl"); let parquet_path = base_path.with_extension("parquet"); @@ -705,7 +790,11 @@ impl .map_err(formats::Error::Io)?; let parquet_file = parquet_file.into_std().await; let parquet_writer = BufWriter::new(parquet_file); - let parquet_format = parquet::Format::new(parquet_writer, compression_level)?; + let parquet_format = parquet::Format::with_bloom_filter( + parquet_writer, + compression_level, + bloom_filter_config, + )?; let format = multi::Format::new(jsonl_format, parquet_format); From 0542dd663ad32cf8d79a8208188988aee2240299 Mon Sep 17 00:00:00 2001 From: Scott Opell Date: Thu, 8 Jan 2026 13:48:40 -0500 Subject: [PATCH 9/9] Return JoinHandle from start_with_rotation for graceful shutdown The start_with_rotation method now returns (RotationSender, JoinHandle<()>) instead of just RotationSender. This allows callers to await the JoinHandle to ensure the CaptureManager has fully drained all buffered metrics and closed the output file before the process exits. This is important for short-lived workloads where the 60-tick accumulator window may contain data that would otherwise be lost if the spawned task is aborted during runtime shutdown. Co-Authored-By: Claude Opus 4.5 --- lading_capture/src/manager.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/lading_capture/src/manager.rs b/lading_capture/src/manager.rs index aae6319ba..b3e095783 100644 --- a/lading_capture/src/manager.rs +++ b/lading_capture/src/manager.rs @@ -549,8 +549,10 @@ impl CaptureManager>, RealCloc /// Parquet file is finalized (footer written) and a new file is created at /// the specified path. /// - /// Returns a [`RotationSender`] immediately that can be used to trigger - /// rotations while the event loop runs. + /// Returns a tuple of ([`RotationSender`], [`JoinHandle`](tokio::task::JoinHandle)) + /// immediately. The `RotationSender` can be used to trigger rotations while + /// the event loop runs. The `JoinHandle` can be awaited to ensure the + /// CaptureManager has fully drained and closed before shutdown. /// /// # Panics /// @@ -560,7 +562,9 @@ impl CaptureManager>, RealCloc /// /// Returns an error if there is already a global recorder set. #[allow(clippy::cast_possible_truncation)] - pub async fn start_with_rotation(mut self) -> Result { + pub async fn start_with_rotation( + mut self, + ) -> Result<(RotationSender, tokio::task::JoinHandle<()>), Error> { // Create rotation channel - return the sender immediately let (rotation_tx, rotation_rx) = mpsc::channel::(4); @@ -593,7 +597,7 @@ impl CaptureManager>, RealCloc .take() .expect("shutdown watcher must be present"); - tokio::spawn(async move { + let handle = tokio::spawn(async move { if let Err(e) = Self::rotation_event_loop( expiration, format, @@ -614,7 +618,7 @@ impl CaptureManager>, RealCloc } }); - Ok(rotation_tx) + Ok((rotation_tx, handle)) } /// Internal event loop with rotation support