diff --git a/lading/src/observer.rs b/lading/src/observer.rs index 6d1f3495f..8df792ac6 100644 --- a/lading/src/observer.rs +++ b/lading/src/observer.rs @@ -13,8 +13,9 @@ use std::io; use crate::target::TargetPidReceiver; use serde::Deserialize; +/// Linux-specific observer implementation using procfs and cgroups #[cfg(target_os = "linux")] -mod linux; +pub mod linux; #[derive(thiserror::Error, Debug)] /// Errors produced by [`Server`] diff --git a/lading/src/observer/linux.rs b/lading/src/observer/linux.rs index 78dc5826f..7d5c8f78f 100644 --- a/lading/src/observer/linux.rs +++ b/lading/src/observer/linux.rs @@ -1,5 +1,7 @@ -mod cgroup; -mod procfs; +/// Cgroup metrics collection +pub mod cgroup; +/// Procfs metrics collection +pub mod procfs; mod utils; mod wss; @@ -19,8 +21,9 @@ pub enum Error { Wss(#[from] wss::Error), } +/// Combined sampler for procfs, cgroup, and WSS metrics #[derive(Debug)] -pub(crate) struct Sampler { +pub struct Sampler { procfs: procfs::Sampler, cgroup: cgroup::Sampler, wss: Option, @@ -28,7 +31,8 @@ pub(crate) struct Sampler { } impl Sampler { - pub(crate) fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result { + /// Create a new Sampler for the given parent PID + pub fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result { let procfs_sampler = procfs::Sampler::new(parent_pid)?; let cgroup_sampler = cgroup::Sampler::new(parent_pid, labels)?; let wss_sampler = if wss::Sampler::is_available() { @@ -64,7 +68,8 @@ ls -l /sys/kernel/mm/page_idle/bitmap }) } - pub(crate) async fn sample(&mut self) -> Result<(), Error> { + /// Sample all metrics (procfs, cgroup, and optionally WSS) + pub async fn sample(&mut self) -> Result<(), Error> { let sample_smaps = self.tick_counter.is_multiple_of(10); let sample_wss = self.tick_counter.is_multiple_of(60); self.tick_counter += 1; diff --git a/lading/src/observer/linux/cgroup.rs b/lading/src/observer/linux/cgroup.rs index 73de6abd5..139000c44 100644 --- a/lading/src/observer/linux/cgroup.rs +++ b/lading/src/observer/linux/cgroup.rs @@ -1,5 +1,5 @@ /// Code to read cgroup information. -pub(crate) mod v2; +pub mod v2; use std::{collections::VecDeque, io, path::PathBuf}; @@ -33,15 +33,17 @@ struct CgroupInfo { cpu_sampler: cpu::Sampler, } +/// Samples cgroup metrics for a process tree #[derive(Debug)] -pub(crate) struct Sampler { +pub struct Sampler { parent: Process, cgroup_info: FxHashMap, labels: Vec<(String, String)>, } impl Sampler { - pub(crate) fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result { + /// Create a new cgroup Sampler for the given parent PID + pub fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result { let parent = Process::new(parent_pid)?; let cgroup_info = FxHashMap::default(); @@ -52,8 +54,9 @@ impl Sampler { }) } + /// Poll cgroup metrics for all processes in the tree #[allow(clippy::cast_possible_wrap)] - pub(crate) async fn poll(&mut self) -> Result<(), Error> { + pub async fn poll(&mut self) -> Result<(), Error> { // Every sample run we collect all the child processes rooted at the // parent. As noted by the procfs documentation is this done by // dereferencing the `/proc//root` symlink. 
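The observer changes above only widen visibility (`pub(crate)` → `pub`) and add doc comments; the sampling behaviour is unchanged. What they enable is driving the Linux sampler from outside the crate. Below is a minimal sketch of such a caller, assuming `lading` is consumed as a library, that the parent `observer` module is exported from the crate root, and that `Sampler::new` returns `Result<Self, Error>` as implied above (this rendering of the diff drops generic parameters); the one-second cadence and label values are illustrative only.

```rust
// Hypothetical external driver for the now-public observer::linux::Sampler;
// the module path assumes `observer` is exported from the lading crate root,
// and the cadence plus label values are illustrative.
#[cfg(target_os = "linux")]
async fn observe(parent_pid: i32) -> Result<(), lading::observer::linux::Error> {
    use std::time::Duration;

    use lading::observer::linux::Sampler;

    // Labels are attached to every cgroup metric the sampler emits.
    let labels = vec![("service".to_string(), "example-target".to_string())];
    let mut sampler = Sampler::new(parent_pid, labels)?;

    // `sample()` gates the expensive collections itself: smaps on every 10th
    // tick, WSS on every 60th, per the tick counter shown in the diff above.
    let mut interval = tokio::time::interval(Duration::from_secs(1));
    loop {
        interval.tick().await;
        sampler.sample().await?;
    }
}
```

Nothing in the diff requires this pattern; it simply becomes possible because the sampler types are now `pub`.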
diff --git a/lading/src/observer/linux/cgroup/v2.rs b/lading/src/observer/linux/cgroup/v2.rs index 4d435e57d..a71a32e95 100644 --- a/lading/src/observer/linux/cgroup/v2.rs +++ b/lading/src/observer/linux/cgroup/v2.rs @@ -1,6 +1,7 @@ -pub(crate) mod cpu; -pub(crate) mod io; -pub(crate) mod memory; +/// CPU metrics from cgroup v2 +pub mod cpu; +pub mod io; +pub mod memory; use core::f64; use std::{ @@ -12,22 +13,28 @@ use metrics::{counter, gauge}; use tokio::fs; use tracing::{debug, error, warn}; +/// Errors produced by cgroup v2 functions #[derive(thiserror::Error, Debug)] pub enum Error { + /// Wrapper for [`std::io::Error`] #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Integer parsing error #[error("Parse int error: {0}")] ParseInt(#[from] std::num::ParseIntError), + /// Float parsing error #[error("Parse float error: {0}")] ParseFloat(#[from] std::num::ParseFloatError), + /// Cgroup v2 hierarchy not found for process #[error("Cgroup v2 not found")] CgroupV2NotFound, + /// PSI (Pressure Stall Information) parsing error #[error("Parsing PSI error: {0}")] ParsingPsi(String), } /// Determines the cgroup v2 path for a given PID. -pub(crate) async fn get_path(pid: i32) -> Result { +pub async fn get_path(pid: i32) -> Result { let path = format!("/proc/{pid}/cgroup"); let content = fs::read_to_string(path).await?; @@ -52,7 +59,7 @@ pub(crate) async fn get_path(pid: i32) -> Result { /// Polls for any cgroup metrics that can be read, v2 version. #[tracing::instrument(skip_all)] #[allow(clippy::too_many_lines)] -pub(crate) async fn poll(file_path: &Path, labels: &[(String, String)]) -> Result<(), Error> { +pub async fn poll(file_path: &Path, labels: &[(String, String)]) -> Result<(), Error> { // Read all files in the cgroup `path` and create metrics for them. If we // lack permissions to read we skip the file. We do not use ? to allow for // the maximal number of files to be read. diff --git a/lading/src/observer/linux/cgroup/v2/cpu.rs b/lading/src/observer/linux/cgroup/v2/cpu.rs index fb0e98115..694518524 100644 --- a/lading/src/observer/linux/cgroup/v2/cpu.rs +++ b/lading/src/observer/linux/cgroup/v2/cpu.rs @@ -5,12 +5,14 @@ use tokio::fs; #[derive(thiserror::Error, Debug)] /// Errors produced by functions in this module -pub(crate) enum Error { +pub enum Error { /// Wrapper for [`std::io::Error`] #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Float parsing error #[error("Float Parsing: {0}")] ParseFloat(#[from] std::num::ParseFloatError), + /// Integer parsing error #[error("Integer Parsing: {0}")] ParseInt(#[from] std::num::ParseIntError), } @@ -23,25 +25,22 @@ struct Stats { last_instant: Instant, } -#[derive(Debug)] -pub(crate) struct Sampler { - prev: Stats, +/// Samples CPU statistics from cgroup v2 with delta calculations +#[derive(Debug, Default)] +pub struct Sampler { + /// Previous stats for delta calculation. None on first poll. + prev: Option, } impl Sampler { - pub(crate) fn new() -> Self { - Self { - prev: Stats { - usage_usec: 0, - user_usec: 0, - system_usec: 0, - last_instant: Instant::now(), - }, - } + /// Create a new CPU Sampler + #[must_use] + pub fn new() -> Self { + Self { prev: None } } - // Read cgroup CPU data and calculate a percentage of usage. - pub(crate) async fn poll( + /// Read cgroup CPU data and calculate a percentage of usage. 
+ pub async fn poll( &mut self, group_prefix: &Path, labels: &[(String, String)], @@ -80,17 +79,29 @@ impl Sampler { } let now = Instant::now(); - let delta_time = now.duration_since(self.prev.last_instant).as_micros(); - let delta_usage = usage_usec.saturating_sub(self.prev.usage_usec); - let delta_user = user_usec.saturating_sub(self.prev.user_usec); - let delta_system = system_usec.saturating_sub(self.prev.system_usec); - - // Update previous stats and if there's a time delta calculate the CPU - // usage. - self.prev.usage_usec = usage_usec; - self.prev.user_usec = user_usec; - self.prev.system_usec = system_usec; - self.prev.last_instant = now; + let current = Stats { + usage_usec, + user_usec, + system_usec, + last_instant: now, + }; + + // On first poll, just record baseline stats without emitting metrics. + // This avoids a spike where delta = (cumulative_since_container_start - 0). + let Some(ref prev) = self.prev else { + self.prev = Some(current); + return Ok(()); + }; + + let delta_time = now.duration_since(prev.last_instant).as_micros(); + let delta_usage = usage_usec.saturating_sub(prev.usage_usec); + let delta_user = user_usec.saturating_sub(prev.user_usec); + let delta_system = system_usec.saturating_sub(prev.system_usec); + + // Update previous stats for next poll + self.prev = Some(current); + + // Emit metrics if there's a time delta if delta_time > 0 { let delta_time = delta_time as f64; diff --git a/lading/src/observer/linux/cgroup/v2/memory.rs b/lading/src/observer/linux/cgroup/v2/memory.rs index d5f356e83..046ef44ac 100644 --- a/lading/src/observer/linux/cgroup/v2/memory.rs +++ b/lading/src/observer/linux/cgroup/v2/memory.rs @@ -24,7 +24,7 @@ use tracing::warn; /// * `pgdeactivate` /// * `pglazyfree` /// * `pglazyfreed` -pub(crate) fn stat(content: &str, metric_prefix: &str, labels: &[(String, String)]) { +pub fn stat(content: &str, metric_prefix: &str, labels: &[(String, String)]) { let counter_keys = [ "pgfault", "pgmajfault", diff --git a/lading/src/observer/linux/procfs.rs b/lading/src/observer/linux/procfs.rs index dabc1f87c..bd0962141 100644 --- a/lading/src/observer/linux/procfs.rs +++ b/lading/src/observer/linux/procfs.rs @@ -1,7 +1,9 @@ /// Sampler implementation for procfs filesystems -mod memory; -mod stat; -mod uptime; +pub mod memory; +/// Per-process CPU statistics from /proc//stat +pub mod stat; +/// System uptime from /proc/uptime +pub mod uptime; mod vmstat; use std::io; @@ -79,14 +81,16 @@ struct ProcessInfo { stat_sampler: stat::Sampler, } +/// Samples procfs metrics for a process tree #[derive(Debug)] -pub(crate) struct Sampler { +pub struct Sampler { parent: Process, process_info: FxHashMap, } impl Sampler { - pub(crate) fn new(parent_pid: i32) -> Result { + /// Create a new procfs Sampler for the given parent PID + pub fn new(parent_pid: i32) -> Result { let parent = Process::new(parent_pid)?; let process_info = FxHashMap::default(); @@ -96,6 +100,7 @@ impl Sampler { }) } + /// Poll procfs metrics for all processes in the tree #[allow( clippy::similar_names, clippy::too_many_lines, @@ -104,7 +109,7 @@ impl Sampler { clippy::cast_possible_wrap, clippy::cast_lossless )] - pub(crate) async fn poll(&mut self, include_smaps: bool) -> Result<(), Error> { + pub async fn poll(&mut self, include_smaps: bool) -> Result<(), Error> { // A tally of the total RSS and PSS consumed by the parent process and // its children. 
let mut aggr = smaps_rollup::Aggregator::default(); diff --git a/lading/src/observer/linux/procfs/memory.rs b/lading/src/observer/linux/procfs/memory.rs index 99775a7fc..b84a80417 100644 --- a/lading/src/observer/linux/procfs/memory.rs +++ b/lading/src/observer/linux/procfs/memory.rs @@ -1,5 +1,7 @@ -pub(crate) mod smaps; -pub(crate) mod smaps_rollup; +/// Per-process memory regions from /proc//smaps +pub mod smaps; +/// Rolled-up memory statistics from /proc//smaps_rollup +pub mod smaps_rollup; const BYTES_PER_KIBIBYTE: u64 = 1024; diff --git a/lading/src/observer/linux/procfs/memory/smaps_rollup.rs b/lading/src/observer/linux/procfs/memory/smaps_rollup.rs index 17c6b777d..46b5e4a6d 100644 --- a/lading/src/observer/linux/procfs/memory/smaps_rollup.rs +++ b/lading/src/observer/linux/procfs/memory/smaps_rollup.rs @@ -6,24 +6,29 @@ use super::{BYTES_PER_KIBIBYTE, next_token}; #[derive(thiserror::Error, Debug)] /// Errors produced by functions in this module -pub(crate) enum Error { +pub enum Error { /// Wrapper for [`std::io::Error`] #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Integer parsing error #[error("Number Parsing: {0}")] ParseInt(#[from] std::num::ParseIntError), + /// General parsing error #[error("Parsing: {0}")] Parsing(String), } +/// Aggregates memory metrics from smaps_rollup #[derive(Debug, Clone, Copy, Default)] -pub(crate) struct Aggregator { - pub(crate) rss: u64, - pub(crate) pss: u64, +pub struct Aggregator { + /// Resident Set Size in bytes + pub rss: u64, + /// Proportional Set Size in bytes + pub pss: u64, } -// Read `/proc/{pid}/smaps_rollup` and parse it directly into metrics. -pub(crate) async fn poll( +/// Read `/proc/{pid}/smaps_rollup` and parse it directly into metrics. +pub async fn poll( pid: i32, labels: &[(&'static str, String)], aggr: &mut Aggregator, diff --git a/lading/src/observer/linux/procfs/stat.rs b/lading/src/observer/linux/procfs/stat.rs index ad85813c7..36556f683 100644 --- a/lading/src/observer/linux/procfs/stat.rs +++ b/lading/src/observer/linux/procfs/stat.rs @@ -3,16 +3,22 @@ use tokio::fs; use crate::observer::linux::cgroup; +/// Errors produced by functions in this module #[derive(thiserror::Error, Debug)] pub enum Error { + /// Wrapper for [`std::io::Error`] #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Float parsing error #[error("Float Parsing: {0}")] ParseFloat(#[from] std::num::ParseFloatError), + /// Integer parsing error #[error("Integer Parsing: {0}")] ParseInt(#[from] std::num::ParseIntError), + /// Stat file format error #[error("Stat Malformed: {0}")] StatMalformed(&'static str), + /// Cgroup path resolution error #[error("Cgroup get_path: {0}")] Cgroup(#[from] cgroup::v2::Error), } @@ -35,22 +41,26 @@ struct CpuUtilization { system_cpu_millicores: f64, } -#[derive(Debug)] -pub(crate) struct Sampler { +/// Samples CPU statistics from /proc//stat with delta calculations +#[derive(Debug, Clone, Copy)] +pub struct Sampler { ticks_per_second: f64, - prev: Stats, + /// Previous stats for delta calculation. None on first poll. 
+ prev: Option, } impl Sampler { - pub(crate) fn new() -> Self { + /// Create a new Sampler + pub fn new() -> Self { Self { ticks_per_second: unsafe { nix::libc::sysconf(nix::libc::_SC_CLK_TCK) } as f64, - prev: Stats::default(), + prev: None, } } + /// Poll CPU statistics and emit metrics #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] - pub(crate) async fn poll( + pub async fn poll( &mut self, pid: i32, uptime_secs: f64, @@ -78,16 +88,20 @@ impl Sampler { let (cur_pid, utime_ticks, stime_ticks) = parse(&stat_contents)?; assert!(cur_pid == pid); - // Get or initialize the previous stats. Note that the first time this is - // initialized we intentionally set last_instance to now to avoid scheduling - // shenanigans. let cur_stats = Stats { user_ticks: utime_ticks, system_ticks: stime_ticks, uptime_ticks: (uptime_secs * self.ticks_per_second).round() as u64, }; - if let Some(util) = compute_cpu_usage(self.prev, cur_stats, allowed_cores) { + // On first poll, just record baseline stats without emitting metrics. + // This avoids a spike where delta = (cumulative_since_process_start - 0). + let Some(ref prev) = self.prev else { + self.prev = Some(cur_stats); + return Ok(()); + }; + + if let Some(util) = compute_cpu_usage(*prev, cur_stats, allowed_cores) { // NOTE these metric names are paired with names in cgroup/v2/cpu.rs and // must remain consistent. If you change these, change those. gauge!("stat.total_cpu_percentage", labels).set(util.total_cpu_percentage); @@ -103,7 +117,7 @@ impl Sampler { gauge!("stat.cpu_limit_millicores", labels).set(limit_millicores); } - self.prev = cur_stats; + self.prev = Some(cur_stats); Ok(()) } diff --git a/lading/src/observer/linux/procfs/uptime.rs b/lading/src/observer/linux/procfs/uptime.rs index e548be138..b27d62fb9 100644 --- a/lading/src/observer/linux/procfs/uptime.rs +++ b/lading/src/observer/linux/procfs/uptime.rs @@ -15,7 +15,7 @@ pub enum Error { /// Read `/proc/uptime` /// /// Only the first field is used, which is the total uptime in seconds. -pub(crate) async fn poll() -> Result { +pub async fn poll() -> Result { let buf = tokio::fs::read_to_string("/proc/uptime").await?; let uptime_secs = proc_uptime_inner(&buf)?; Ok(uptime_secs) diff --git a/lading_capture/src/formats.rs b/lading_capture/src/formats.rs index 571f09b88..3585ef5e7 100644 --- a/lading_capture/src/formats.rs +++ b/lading_capture/src/formats.rs @@ -24,6 +24,9 @@ pub enum Error { /// IO errors during write operations #[error("IO error: {0}")] Io(#[from] std::io::Error), + /// Rotation not supported for this format + #[error("File rotation not supported for this format")] + RotationNotSupported, } /// Trait for output format implementations @@ -63,6 +66,29 @@ pub trait OutputFormat { /// /// Returns an error if closing fails. fn close(self) -> Result<(), Error>; + + /// Rotate to a new output file + /// + /// Flushes and closes the current file (writing footer for Parquet), then + /// opens a new file at the specified path. This allows continuous metrics + /// collection while producing multiple readable output files. + /// + /// The default implementation returns `RotationNotSupported`. Formats that + /// support rotation (like Parquet with file-based writers) should override. + /// + /// # Arguments + /// + /// * `path` - Path for the new output file + /// + /// # Errors + /// + /// Returns an error if rotation is not supported or if file operations fail. 
+ fn rotate(self, _path: std::path::PathBuf) -> Result + where + Self: Sized, + { + Err(Error::RotationNotSupported) + } } #[cfg(test)] @@ -71,8 +97,7 @@ mod tests { use crate::line::{Line, LineValue, MetricKind}; use approx::relative_eq; use arrow_array::{ - Array, BinaryArray, Float64Array, MapArray, StringArray, StructArray, - TimestampMillisecondArray, UInt64Array, + Array, BinaryArray, Float64Array, StringArray, TimestampMillisecondArray, UInt64Array, }; use bytes::Bytes; use datadog_protos::metrics::Dogsketch; @@ -461,12 +486,23 @@ mod tests { .downcast_ref::() .expect("value_float is Float64Array"); - let labels_array = batch - .column_by_name("labels") - .expect("labels column") - .as_any() - .downcast_ref::() - .expect("labels is MapArray"); + // Collect l_* columns for label extraction (new schema uses flat columns) + let schema = batch.schema(); + let l_columns: Vec<(&str, &StringArray)> = schema + .fields() + .iter() + .filter_map(|field| { + let name = field.name(); + if name.starts_with("l_") { + batch + .column_by_name(name) + .and_then(|c| c.as_any().downcast_ref::()) + .map(|arr| (name.strip_prefix("l_").unwrap_or(name), arr)) + } else { + None + } + }) + .collect(); let value_histogram_array = batch .column_by_name("value_histogram") @@ -498,21 +534,12 @@ mod tests { LineValue::Int(value_int_array.value(row_idx)) }; - let labels_slice: StructArray = labels_array.value(row_idx); - let keys = labels_slice - .column(0) - .as_any() - .downcast_ref::() - .expect("label keys are strings"); - let values = labels_slice - .column(1) - .as_any() - .downcast_ref::() - .expect("label values are strings"); - + // Extract labels from l_* columns let mut labels = FxHashMap::default(); - for i in 0..keys.len() { - labels.insert(keys.value(i).to_string(), values.value(i).to_string()); + for (key, arr) in &l_columns { + if !arr.is_null(row_idx) { + labels.insert((*key).to_string(), arr.value(row_idx).to_string()); + } } let value_histogram = if value_histogram_array.is_null(row_idx) { diff --git a/lading_capture/src/formats/parquet.rs b/lading_capture/src/formats/parquet.rs index be327c59a..5cc652771 100644 --- a/lading_capture/src/formats/parquet.rs +++ b/lading_capture/src/formats/parquet.rs @@ -3,18 +3,27 @@ //! This format buffers metrics in memory and periodically writes them as //! Parquet row groups, providing better compression and query performance than //! JSONL. +//! +//! Labels are stored as top-level columns with `l_` prefix (e.g., `l_container_id`, +//! `l_namespace`). This enables Parquet predicate pushdown for efficient filtering +//! by any label key, unlike the previous `MapArray` approach which required post-read +//! filtering. +//! +//! Schema is determined dynamically at first flush based on label keys discovered +//! in the buffered data. All label columns are nullable Utf8 strings. 
use std::{ - io::{Seek, Write}, + collections::{BTreeMap, BTreeSet}, + fs::File, + io::{BufWriter, Seek, Write}, sync::Arc, }; use arrow_array::{ - ArrayRef, BinaryArray, Float64Array, MapArray, RecordBatch, StringArray, StructArray, - TimestampMillisecondArray, UInt64Array, + ArrayRef, BinaryArray, Float64Array, RecordBatch, StringArray, TimestampMillisecondArray, + UInt64Array, }; -use arrow_buffer::OffsetBuffer; -use arrow_schema::{ArrowError, DataType, Field, Fields, Schema, TimeUnit}; +use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit}; use parquet::{ arrow::ArrowWriter, basic::{Compression, ZstdLevel}, @@ -24,6 +33,52 @@ use parquet::{ use crate::line; +/// Configuration for a single bloom filter column +#[derive(Debug, Clone)] +pub struct BloomFilterColumn { + /// Label name (e.g., `container_id` will be applied to `l_container_id` column) + pub label_name: String, + /// Expected number of distinct values (NDV) for sizing the bloom filter. + /// Higher values create larger filters with lower false positive rates. + pub ndv: u64, +} + +impl BloomFilterColumn { + /// Create a new bloom filter column configuration + #[must_use] + pub fn new(label_name: impl Into, ndv: u64) -> Self { + Self { + label_name: label_name.into(), + ndv, + } + } +} + +/// Configuration for bloom filters on label columns +/// +/// Bloom filters enable efficient query-time filtering by allowing readers +/// to skip row groups that definitely don't contain a target value. +/// This is especially useful for high-cardinality label columns like +/// `container_id` where queries often filter for a specific value. +/// +/// # Example +/// +/// ``` +/// use lading_capture::formats::parquet::{BloomFilterConfig, BloomFilterColumn}; +/// +/// let config = BloomFilterConfig { +/// columns: vec![ +/// BloomFilterColumn::new("container_id", 100), +/// ], +/// }; +/// ``` +#[derive(Debug, Clone, Default)] +pub struct BloomFilterConfig { + /// Label columns to enable bloom filters on. + /// Each column specifies the label name (without `l_` prefix) and expected NDV. + pub columns: Vec, +} + /// Parquet format errors #[derive(thiserror::Error, Debug)] pub enum Error { @@ -43,6 +98,9 @@ pub enum Error { /// Holds reusable allocations for all columns to avoid repeated allocation /// during flush operations. Buffers are cleared after each write, preserving /// capacity for the next batch. +/// +/// Labels are stored per-row as a map, with all unique keys tracked separately +/// to enable dynamic schema generation with `l_` columns. #[derive(Debug)] struct ColumnBuffers { run_ids: Vec, @@ -52,9 +110,12 @@ struct ColumnBuffers { metric_kinds: Vec<&'static str>, values_int: Vec>, values_float: Vec>, - label_keys: Vec, - label_values: Vec, - label_offsets: Vec, + /// Per-row label maps. Each entry contains the labels for one metric row. + /// Using `BTreeMap` for consistent ordering when iterating. + row_labels: Vec>, + /// All unique label keys seen in this batch. Used to generate schema. + /// `BTreeSet` ensures consistent column ordering across runs. 
+ unique_label_keys: BTreeSet, values_histogram: Vec>, } @@ -71,9 +132,8 @@ impl ColumnBuffers { metric_kinds: Vec::with_capacity(INITIAL_CAPACITY), values_int: Vec::with_capacity(INITIAL_CAPACITY), values_float: Vec::with_capacity(INITIAL_CAPACITY), - label_keys: Vec::with_capacity(INITIAL_CAPACITY * 2), - label_values: Vec::with_capacity(INITIAL_CAPACITY * 2), - label_offsets: Vec::with_capacity(INITIAL_CAPACITY + 1), + row_labels: Vec::with_capacity(INITIAL_CAPACITY), + unique_label_keys: BTreeSet::new(), values_histogram: Vec::with_capacity(INITIAL_CAPACITY), } } @@ -87,9 +147,9 @@ impl ColumnBuffers { self.metric_kinds.clear(); self.values_int.clear(); self.values_float.clear(); - self.label_keys.clear(); - self.label_values.clear(); - self.label_offsets.clear(); + self.row_labels.clear(); + // Note: unique_label_keys is NOT cleared - it accumulates across flushes + // to maintain consistent schema within a file self.values_histogram.clear(); } @@ -117,13 +177,13 @@ impl ColumnBuffers { } } - // Add labels for this row + // Store labels for this row and track unique keys + let mut row_map = BTreeMap::new(); for (k, v) in &line.labels { - self.label_keys.push(k.clone()); - self.label_values.push(v.clone()); + self.unique_label_keys.insert(k.clone()); + row_map.insert(k.clone(), v.clone()); } - #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)] - self.label_offsets.push(self.label_keys.len() as i32); + self.row_labels.push(row_map); self.values_histogram.push(line.value_histogram.clone()); } @@ -144,16 +204,31 @@ impl ColumnBuffers { /// /// Buffers metrics in memory. Calling `flush()` writes accumulated metrics as a /// Parquet row group. +/// +/// The schema is determined dynamically at first flush based on label keys +/// discovered in the buffered data. Label columns use the `l_` prefix +/// (e.g., `l_container_id`). 
#[derive(Debug)] pub struct Format { /// Reusable column buffers for building Arrow arrays buffers: ColumnBuffers, - /// Parquet writer - writer: ArrowWriter, - /// Pre-computed Arrow schema - schema: Arc, + /// Parquet writer - created lazily on first flush when schema is known + writer: Option>, + /// The underlying writer, stored until `ArrowWriter` is created + raw_writer: Option, + /// Arrow schema - created on first flush based on discovered label keys + schema: Option>, + /// Ordered list of label keys in schema (for consistent column ordering) + schema_label_keys: Vec, + /// Compression level for Zstd (stored for rotation) + compression_level: i32, + /// Bloom filter configuration (stored for rotation) + bloom_filter_config: BloomFilterConfig, } +/// Label column prefix for flattened labels +const LABEL_COLUMN_PREFIX: &str = "l_"; + impl Format { /// Create a new Parquet format writer /// @@ -164,9 +239,48 @@ impl Format { /// /// # Errors /// - /// Returns error if Arrow writer creation fails + /// Returns error if compression level is invalid pub fn new(writer: W, compression_level: i32) -> Result { - let schema = Arc::new(Schema::new(vec![ + Self::with_bloom_filter(writer, compression_level, BloomFilterConfig::default()) + } + + /// Create a new Parquet format writer with bloom filter configuration + /// + /// # Arguments + /// + /// * `writer` - Writer implementing Write + Seek for Parquet output + /// * `compression_level` - Zstd compression level (1-22) + /// * `bloom_filter_config` - Configuration for bloom filters on label columns + /// + /// # Errors + /// + /// Returns error if compression level is invalid + pub fn with_bloom_filter( + writer: W, + compression_level: i32, + bloom_filter_config: BloomFilterConfig, + ) -> Result { + // Validate compression level early + let _ = ZstdLevel::try_new(compression_level)?; + + Ok(Self { + buffers: ColumnBuffers::new(), + writer: None, + raw_writer: Some(writer), + schema: None, + schema_label_keys: Vec::new(), + compression_level, + bloom_filter_config, + }) + } + + /// Generate schema based on discovered label keys + /// + /// Creates base columns plus `l_` columns for each unique label key. + /// Label columns are nullable Utf8 strings, sorted alphabetically for + /// consistent ordering. 
+ fn generate_schema(label_keys: &BTreeSet) -> (Arc, Vec) { + let mut fields = vec![ Field::new("run_id", DataType::Utf8, false), Field::new( "time", @@ -178,24 +292,29 @@ impl Format { Field::new("metric_kind", DataType::Utf8, false), Field::new("value_int", DataType::UInt64, true), Field::new("value_float", DataType::Float64, true), - Field::new( - "labels", - DataType::Map( - Arc::new(Field::new( - "entries", - DataType::Struct(Fields::from(vec![ - Field::new("key", DataType::Utf8, false), - Field::new("value", DataType::Utf8, false), - ])), - false, - )), - false, - ), - false, - ), - Field::new("value_histogram", DataType::Binary, true), - ])); + ]; + + // Add l_ columns for each label key (sorted by BTreeSet) + let ordered_keys: Vec = label_keys.iter().cloned().collect(); + for key in &ordered_keys { + fields.push(Field::new( + format!("{LABEL_COLUMN_PREFIX}{key}"), + DataType::Utf8, + true, // nullable - not all rows have all labels + )); + } + + fields.push(Field::new("value_histogram", DataType::Binary, true)); + (Arc::new(Schema::new(fields)), ordered_keys) + } + + /// Create writer properties with dictionary encoding for appropriate columns + fn create_writer_properties( + compression_level: i32, + label_keys: &[String], + bloom_filter_config: &BloomFilterConfig, + ) -> Result { // Use Parquet v2 format for better encodings and compression: // // - DELTA_BINARY_PACKED encoding for integers (timestamps, fetch_index) @@ -204,22 +323,60 @@ impl Format { // // Dictionary encoding for low-cardinality columns: // - // - metric_kind: only 2 values ("counter", "gauge") + // - metric_kind: only 3 values ("counter", "gauge", "histogram") // - run_id: one UUID per run - let props = WriterProperties::builder() + // - label columns: often low cardinality (container_id, namespace, etc.) + let mut builder = WriterProperties::builder() .set_writer_version(WriterVersion::PARQUET_2_0) .set_compression(Compression::ZSTD(ZstdLevel::try_new(compression_level)?)) .set_column_dictionary_enabled(ColumnPath::from("metric_kind"), true) - .set_column_dictionary_enabled(ColumnPath::from("run_id"), true) - .build(); + .set_column_dictionary_enabled(ColumnPath::from("run_id"), true); + + // Enable dictionary encoding for all label columns + for key in label_keys { + builder = builder.set_column_dictionary_enabled( + ColumnPath::from(format!("{LABEL_COLUMN_PREFIX}{key}")), + true, + ); + } - let arrow_writer = ArrowWriter::try_new(writer, schema.clone(), Some(props))?; + // Enable bloom filters for configured label columns + // Bloom filters allow readers to skip row groups that definitely don't + // contain the target value, improving query performance significantly. + for bloom_col in &bloom_filter_config.columns { + let column_name = format!("{LABEL_COLUMN_PREFIX}{}", bloom_col.label_name); + let column_path = ColumnPath::from(column_name); - Ok(Self { - buffers: ColumnBuffers::new(), - writer: arrow_writer, - schema, - }) + builder = builder.set_column_bloom_filter_enabled(column_path.clone(), true); + builder = builder.set_column_bloom_filter_ndv(column_path, bloom_col.ndv); + } + + Ok(builder.build()) + } + + /// Initialize the writer with schema based on discovered label keys + /// + /// Called on first flush when we know what label keys exist. 
+ fn initialize_writer(&mut self) -> Result<(), Error> { + let raw_writer = self + .raw_writer + .take() + .expect("raw_writer should be present before initialization"); + + let (schema, ordered_keys) = Self::generate_schema(&self.buffers.unique_label_keys); + let props = Self::create_writer_properties( + self.compression_level, + &ordered_keys, + &self.bloom_filter_config, + )?; + + let arrow_writer = ArrowWriter::try_new(raw_writer, schema.clone(), Some(props))?; + + self.schema = Some(schema); + self.schema_label_keys = ordered_keys; + self.writer = Some(arrow_writer); + + Ok(()) } /// Convert buffered data to Arrow `RecordBatch` @@ -228,43 +385,19 @@ impl Format { /// /// Returns error if `RecordBatch` construction fails fn buffers_to_record_batch(&self) -> Result { + let schema = self + .schema + .as_ref() + .expect("schema should be initialized before creating record batch"); + if self.buffers.is_empty() { - return Ok(RecordBatch::new_empty(self.schema.clone())); + return Ok(RecordBatch::new_empty(schema.clone())); } - // Prepare label offsets with initial 0 - let mut label_offsets = Vec::with_capacity(self.buffers.label_offsets.len() + 1); - label_offsets.push(0i32); - label_offsets.extend_from_slice(&self.buffers.label_offsets); - - // Build the labels map array using pre-allocated buffers - let keys_array = Arc::new(StringArray::from(self.buffers.label_keys.clone())); - let values_array = Arc::new(StringArray::from(self.buffers.label_values.clone())); - let struct_array = StructArray::from(vec![ - ( - Arc::new(Field::new("key", DataType::Utf8, false)), - keys_array as ArrayRef, - ), - ( - Arc::new(Field::new("value", DataType::Utf8, false)), - values_array as ArrayRef, - ), - ]); - - let field = Arc::new(Field::new( - "entries", - DataType::Struct(Fields::from(vec![ - Field::new("key", DataType::Utf8, false), - Field::new("value", DataType::Utf8, false), - ])), - false, - )); + let num_rows = self.buffers.run_ids.len(); - let offsets = OffsetBuffer::new(label_offsets.into()); - let labels_map = MapArray::new(field, offsets, struct_array, None, false); - - // Build arrays directly from pre-allocated buffers - let arrays: Vec = vec![ + // Build base column arrays + let mut arrays: Vec = vec![ Arc::new(StringArray::from(self.buffers.run_ids.clone())), Arc::new(TimestampMillisecondArray::from(self.buffers.times.clone())), Arc::new(UInt64Array::from(self.buffers.fetch_indices.clone())), @@ -272,23 +405,47 @@ impl Format { Arc::new(StringArray::from(self.buffers.metric_kinds.clone())), Arc::new(UInt64Array::from(self.buffers.values_int.clone())), Arc::new(Float64Array::from(self.buffers.values_float.clone())), - Arc::new(labels_map), - Arc::new(BinaryArray::from_opt_vec( - self.buffers - .values_histogram - .iter() - .map(|v| { - if v.is_empty() { - None - } else { - Some(v.as_slice()) - } - }) - .collect(), - )), ]; - Ok(RecordBatch::try_new(self.schema.clone(), arrays)?) 
+ // Build l_ columns for each label key in schema order + for key in &self.schema_label_keys { + let values: Vec> = self + .buffers + .row_labels + .iter() + .map(|row_map| row_map.get(key).map(String::as_str)) + .collect(); + arrays.push(Arc::new(StringArray::from(values))); + } + + // Add histogram column last + arrays.push(Arc::new( + self.buffers + .values_histogram + .iter() + .map(|v| { + if v.is_empty() { + None + } else { + Some(v.as_slice()) + } + }) + .collect::(), + )); + + debug_assert_eq!( + arrays.len(), + schema.fields().len(), + "array count ({}) must match schema field count ({})", + arrays.len(), + schema.fields().len() + ); + debug_assert!( + arrays.iter().all(|a| a.len() == num_rows), + "all arrays must have {num_rows} rows", + ); + + Ok(RecordBatch::try_new(schema.clone(), arrays)?) } /// Write buffered metrics as a Parquet row group @@ -301,8 +458,16 @@ impl Format { return Ok(()); } + // Initialize writer on first flush when we know the label keys + if self.writer.is_none() { + self.initialize_writer()?; + } + let batch = self.buffers_to_record_batch()?; - self.writer.write(&batch)?; + self.writer + .as_mut() + .expect("writer should be initialized") + .write(&batch)?; self.buffers.clear(); Ok(()) } @@ -345,8 +510,12 @@ impl Format { pub fn close(mut self) -> Result<(), Error> { // Write any remaining buffered data as a final row group self.write_parquet()?; - // Close the ArrowWriter which consumes it - self.writer.close()?; + + // Close the ArrowWriter if it was created + if let Some(writer) = self.writer { + writer.close()?; + } + // If writer was never created (no data written), nothing to close Ok(()) } } @@ -365,6 +534,44 @@ impl crate::formats::OutputFormat for Format { } } +impl Format> { + /// Rotate to a new output file + /// + /// Closes the current Parquet file (writing footer) and opens a new file + /// at the specified path with the same compression and bloom filter settings. + /// + /// # Errors + /// + /// Returns an error if closing the current file or creating the new file fails. 
+ pub fn rotate_to(self, path: &std::path::Path) -> Result { + // Store settings before closing + let compression_level = self.compression_level; + let bloom_filter_config = self.bloom_filter_config.clone(); + + // Close current file (writes footer) + self.close()?; + + // Create new file and writer with same settings + let file = File::create(path)?; + let writer = BufWriter::new(file); + let format = Self::with_bloom_filter(writer, compression_level, bloom_filter_config)?; + + Ok(format) + } + + /// Get the compression level for this format + #[must_use] + pub fn compression_level(&self) -> i32 { + self.compression_level + } + + /// Get the bloom filter configuration for this format + #[must_use] + pub fn bloom_filter_config(&self) -> &BloomFilterConfig { + &self.bloom_filter_config + } +} + #[cfg(test)] mod tests { use super::*; @@ -456,4 +663,114 @@ mod tests { assert!(!buffer.get_ref().is_empty(), "should have written data"); } + + #[test] + fn writes_label_columns() { + use arrow_array::{Array, RecordBatchReader}; + use bytes::Bytes; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + + let mut buffer = Cursor::new(Vec::new()); + + { + let mut format = Format::new(&mut buffer, 3).expect("create format"); + + // Write metric with labels + let mut labels = FxHashMap::default(); + labels.insert("container_id".to_string(), "abc123".to_string()); + labels.insert("namespace".to_string(), "default".to_string()); + labels.insert("qos_class".to_string(), "Guaranteed".to_string()); + + let line = Line { + run_id: Uuid::new_v4(), + time: 1000, + fetch_index: 0, + metric_name: "test_metric".into(), + metric_kind: MetricKind::Gauge, + value: LineValue::Float(42.0), + labels, + value_histogram: Vec::new(), + }; + + format.write_metric(&line).expect("write should succeed"); + + // Write another metric with different labels + let mut labels2 = FxHashMap::default(); + labels2.insert("container_id".to_string(), "def456".to_string()); + labels2.insert("namespace".to_string(), "kube-system".to_string()); + // Note: no qos_class label + + let line2 = Line { + run_id: Uuid::new_v4(), + time: 2000, + fetch_index: 1, + metric_name: "test_metric".into(), + metric_kind: MetricKind::Gauge, + value: LineValue::Float(100.0), + labels: labels2, + value_histogram: Vec::new(), + }; + + format.write_metric(&line2).expect("write should succeed"); + format.close().expect("close should succeed"); + } + + // Read back and verify schema has l_* columns + let data = Bytes::from(buffer.into_inner()); + let reader = ParquetRecordBatchReaderBuilder::try_new(data) + .expect("create reader") + .build() + .expect("build reader"); + + let schema = reader.schema(); + + // Check that l_* columns exist (sorted alphabetically) + assert!( + schema.field_with_name("l_container_id").is_ok(), + "should have l_container_id column" + ); + assert!( + schema.field_with_name("l_namespace").is_ok(), + "should have l_namespace column" + ); + assert!( + schema.field_with_name("l_qos_class").is_ok(), + "should have l_qos_class column" + ); + + // Check no labels MapArray column + assert!( + schema.field_with_name("labels").is_err(), + "should NOT have labels column (replaced by l_* columns)" + ); + + // Read data and verify values + let batches: Vec<_> = reader.into_iter().collect(); + assert_eq!(batches.len(), 1, "should have one batch"); + let batch = batches[0].as_ref().expect("batch should be ok"); + + assert_eq!(batch.num_rows(), 2, "should have 2 rows"); + + // Check l_container_id values + let container_col = batch + 
.column_by_name("l_container_id") + .expect("l_container_id column"); + let container_arr = container_col + .as_any() + .downcast_ref::() + .expect("string array"); + assert_eq!(container_arr.value(0), "abc123"); + assert_eq!(container_arr.value(1), "def456"); + + // Check l_qos_class values (second row should be null) + let qos_col = batch + .column_by_name("l_qos_class") + .expect("l_qos_class column"); + let qos_arr = qos_col + .as_any() + .downcast_ref::() + .expect("string array"); + assert_eq!(qos_arr.value(0), "Guaranteed"); + assert!(qos_arr.is_null(1), "second row should have null qos_class"); + } } diff --git a/lading_capture/src/manager.rs b/lading_capture/src/manager.rs index 2ca2a6c9f..b3e095783 100644 --- a/lading_capture/src/manager.rs +++ b/lading_capture/src/manager.rs @@ -17,7 +17,7 @@ use std::{ }; use arc_swap::ArcSwap; -use tokio::{fs, sync::mpsc, time}; +use tokio::{fs, sync::mpsc, sync::oneshot, time}; use crate::{ accumulator, @@ -445,6 +445,27 @@ impl CaptureManager>, RealClock> } } +/// Request to rotate to a new output file +/// +/// Contains the path for the new file and a channel to send the result. +pub struct RotationRequest { + /// Path for the new output file + pub path: PathBuf, + /// Channel to send rotation result (Ok on success, Err on failure) + pub response: oneshot::Sender>, +} + +impl std::fmt::Debug for RotationRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RotationRequest") + .field("path", &self.path) + .finish_non_exhaustive() + } +} + +/// Handle for sending rotation requests to a running [`CaptureManager`] +pub type RotationSender = mpsc::Sender; + impl CaptureManager>, RealClock> { /// Create a new [`CaptureManager`] with file-based Parquet writer /// @@ -460,13 +481,55 @@ impl CaptureManager>, RealCloc experiment_started: lading_signal::Watcher, target_running: lading_signal::Watcher, expiration: Duration, + ) -> Result { + Self::new_parquet_with_bloom_filter( + capture_path, + flush_seconds, + compression_level, + parquet::BloomFilterConfig::default(), + shutdown, + experiment_started, + target_running, + expiration, + ) + .await + } + + /// Create a new [`CaptureManager`] with file-based Parquet writer and bloom filter config + /// + /// # Arguments + /// + /// * `capture_path` - Path to the output Parquet file + /// * `flush_seconds` - How often to flush buffered data + /// * `compression_level` - Zstd compression level (1-22) + /// * `bloom_filter_config` - Configuration for bloom filters on label columns + /// * `shutdown` - Signal to gracefully shut down the capture manager + /// * `experiment_started` - Signal that the experiment has started + /// * `target_running` - Signal that the target is running + /// * `expiration` - Duration after which metrics expire + /// + /// # Errors + /// + /// Function will error if the underlying capture file cannot be opened or + /// if Parquet writer creation fails. 
+ #[allow(clippy::too_many_arguments)] + pub async fn new_parquet_with_bloom_filter( + capture_path: PathBuf, + flush_seconds: u64, + compression_level: i32, + bloom_filter_config: parquet::BloomFilterConfig, + shutdown: lading_signal::Watcher, + experiment_started: lading_signal::Watcher, + target_running: lading_signal::Watcher, + expiration: Duration, ) -> Result { let fp = fs::File::create(&capture_path) .await .map_err(formats::Error::Io)?; let fp = fp.into_std().await; let writer = BufWriter::new(fp); - let format = parquet::Format::new(writer, compression_level)?; + let format = + parquet::Format::with_bloom_filter(writer, compression_level, bloom_filter_config)?; Ok(Self::new_with_format( format, @@ -478,6 +541,183 @@ impl CaptureManager>, RealCloc RealClock::default(), )) } + + /// Run [`CaptureManager`] with file rotation support + /// + /// Similar to [`start`](CaptureManager::start), but also provides a channel + /// for rotation requests. When a rotation request is received, the current + /// Parquet file is finalized (footer written) and a new file is created at + /// the specified path. + /// + /// Returns a tuple of ([`RotationSender`], [`JoinHandle`](tokio::task::JoinHandle)) + /// immediately. The `RotationSender` can be used to trigger rotations while + /// the event loop runs. The `JoinHandle` can be awaited to ensure the + /// CaptureManager has fully drained and closed before shutdown. + /// + /// # Panics + /// + /// Panics if the shutdown watcher is missing (should never happen in normal use). + /// + /// # Errors + /// + /// Returns an error if there is already a global recorder set. + #[allow(clippy::cast_possible_truncation)] + pub async fn start_with_rotation( + mut self, + ) -> Result<(RotationSender, tokio::task::JoinHandle<()>), Error> { + // Create rotation channel - return the sender immediately + let (rotation_tx, rotation_rx) = mpsc::channel::(4); + + // Initialize historical sender + HISTORICAL_SENDER.store(Arc::new(Some(Arc::new(Sender { + snd: self.snd.clone(), + })))); + + self.install()?; + info!("Capture manager installed with rotation support, recording to capture file."); + + // Wait until the target is running then mark time-zero + self.target_running.recv().await; + self.clock.mark_start(); + + let compression_level = self.format.compression_level(); + let bloom_filter_config = self.format.bloom_filter_config().clone(); + + // Run the event loop in a spawned task so we can return the sender immediately + let expiration = self.expiration; + let format = self.format; + let flush_seconds = self.flush_seconds; + let registry = self.registry; + let accumulator = self.accumulator; + let global_labels = self.global_labels; + let clock = self.clock; + let recv = self.recv; + let shutdown = self + .shutdown + .take() + .expect("shutdown watcher must be present"); + + let handle = tokio::spawn(async move { + if let Err(e) = Self::rotation_event_loop( + expiration, + format, + flush_seconds, + registry, + accumulator, + global_labels, + clock, + recv, + shutdown, + rotation_rx, + compression_level, + bloom_filter_config, + ) + .await + { + error!(error = %e, "CaptureManager rotation event loop error"); + } + }); + + Ok((rotation_tx, handle)) + } + + /// Internal event loop with rotation support + #[allow(clippy::too_many_arguments)] + #[allow(clippy::cast_possible_truncation)] + async fn rotation_event_loop( + expiration: Duration, + format: formats::parquet::Format>, + flush_seconds: u64, + registry: Arc>, + accumulator: Accumulator, + global_labels: 
FxHashMap, + clock: RealClock, + mut recv: mpsc::Receiver, + shutdown: lading_signal::Watcher, + mut rotation_rx: mpsc::Receiver, + compression_level: i32, + bloom_filter_config: parquet::BloomFilterConfig, + ) -> Result<(), Error> { + let mut flush_interval = clock.interval(Duration::from_millis(TICK_DURATION_MS as u64)); + let shutdown_wait = shutdown.recv(); + tokio::pin!(shutdown_wait); + + // Create state machine with owned state + let mut state_machine = StateMachine::new( + expiration, + format, + flush_seconds, + registry, + accumulator, + global_labels, + clock, + ); + + // Event loop with rotation support + loop { + let event = tokio::select! { + val = recv.recv() => { + match val { + Some(metric) => Event::MetricReceived(metric), + None => Event::ChannelClosed, + } + } + () = flush_interval.tick() => Event::FlushTick, + Some(rotation_req) = rotation_rx.recv() => { + // Handle rotation inline since it's not a state machine event + let result = Self::handle_rotation( + &mut state_machine, + rotation_req.path, + compression_level, + &bloom_filter_config, + ).await; + // Send result back to caller (ignore send error if receiver dropped) + let _ = rotation_req.response.send(result); + continue; + } + () = &mut shutdown_wait => Event::ShutdownSignaled, + }; + + match state_machine.next(event)? { + Operation::Continue => {} + Operation::Exit => return Ok(()), + } + } + } + + /// Handle a rotation request + async fn handle_rotation( + state_machine: &mut StateMachine< + formats::parquet::Format>, + RealClock, + >, + new_path: PathBuf, + compression_level: i32, + bloom_filter_config: &parquet::BloomFilterConfig, + ) -> Result<(), formats::Error> { + // Create new file and format with same settings + let fp = fs::File::create(&new_path) + .await + .map_err(formats::Error::Io)?; + let fp = fp.into_std().await; + let writer = BufWriter::new(fp); + let new_format = parquet::Format::with_bloom_filter( + writer, + compression_level, + bloom_filter_config.clone(), + )?; + + // Swap formats - this flushes any buffered data + let old_format = state_machine + .replace_format(new_format) + .map_err(|e| formats::Error::Io(io::Error::other(e.to_string())))?; + + // Close old format to write Parquet footer + old_format.close()?; + + info!(path = %new_path.display(), "Rotated to new capture file"); + Ok(()) + } } impl @@ -504,6 +744,40 @@ impl experiment_started: lading_signal::Watcher, target_running: lading_signal::Watcher, expiration: Duration, + ) -> Result { + Self::new_multi_with_bloom_filter( + base_path, + flush_seconds, + compression_level, + parquet::BloomFilterConfig::default(), + shutdown, + experiment_started, + target_running, + expiration, + ) + .await + } + + /// Create a new [`CaptureManager`] with file-based multi-format writer and bloom filter config + /// + /// Writes to both JSONL and Parquet formats simultaneously. The base path + /// is used to generate two output files: `{base_path}.jsonl` and + /// `{base_path}.parquet`. + /// + /// # Errors + /// + /// Function will error if either capture file cannot be opened or if + /// format creation fails. 
+ #[allow(clippy::too_many_arguments)] + pub async fn new_multi_with_bloom_filter( + base_path: PathBuf, + flush_seconds: u64, + compression_level: i32, + bloom_filter_config: parquet::BloomFilterConfig, + shutdown: lading_signal::Watcher, + experiment_started: lading_signal::Watcher, + target_running: lading_signal::Watcher, + expiration: Duration, ) -> Result { let jsonl_path = base_path.with_extension("jsonl"); let parquet_path = base_path.with_extension("parquet"); @@ -520,7 +794,11 @@ impl .map_err(formats::Error::Io)?; let parquet_file = parquet_file.into_std().await; let parquet_writer = BufWriter::new(parquet_file); - let parquet_format = parquet::Format::new(parquet_writer, compression_level)?; + let parquet_format = parquet::Format::with_bloom_filter( + parquet_writer, + compression_level, + bloom_filter_config, + )?; let format = multi::Format::new(jsonl_format, parquet_format); diff --git a/lading_capture/src/manager/state_machine.rs b/lading_capture/src/manager/state_machine.rs index 9d9a2c0fb..5836e2bc3 100644 --- a/lading_capture/src/manager/state_machine.rs +++ b/lading_capture/src/manager/state_machine.rs @@ -258,6 +258,39 @@ impl StateMachine { Ok(Operation::Exit) } + /// Replace the current format with a new one, returning the old format. + /// + /// This method flushes any buffered data before returning the old format. + /// The caller is responsible for closing the old format (to write any + /// footer/metadata) and providing a properly initialized new format. + /// + /// This enables file rotation: the caller can close the old format (writing + /// the Parquet footer), create a new file, and provide the new format. + /// + /// # Errors + /// + /// Returns an error if flushing the current format fails. + /// + /// # Panics + /// + /// Panics if called when no format is present (after shutdown). + pub(crate) fn replace_format(&mut self, new_format: F) -> Result { + // Flush any buffered data in the current format + self.format + .as_mut() + .expect("format must be present during operation") + .flush()?; + + // Swap in the new format and return the old one + let old_format = self + .format + .replace(new_format) + .expect("format must be present during operation"); + + info!("Format replaced for file rotation"); + Ok(old_format) + } + /// Convert an Instant timestamp to `Accumulator` logical tick time. #[inline] fn instant_to_tick(&self, timestamp: Instant) -> u64 { diff --git a/lading_capture/src/validate/parquet.rs b/lading_capture/src/validate/parquet.rs index 2f77f9323..d46fbf980 100644 --- a/lading_capture/src/validate/parquet.rs +++ b/lading_capture/src/validate/parquet.rs @@ -10,9 +10,7 @@ use std::fs::File; use std::hash::{BuildHasher, Hasher}; use std::path::Path; -use arrow_array::{ - Array, MapArray, StringArray, StructArray, TimestampMillisecondArray, UInt64Array, -}; +use arrow_array::{Array, StringArray, TimestampMillisecondArray, UInt64Array}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use crate::validate::ValidationResult; @@ -120,12 +118,23 @@ pub fn validate_parquet>( Error::InvalidColumnType("'metric_name' column is not String".to_string()) })?; - let labels_array = batch - .column_by_name("labels") - .ok_or_else(|| Error::MissingColumn("labels".to_string()))? 
- .as_any() - .downcast_ref::() - .ok_or_else(|| Error::InvalidColumnType("'labels' column is not Map".to_string()))?; + // Collect l_* columns for label extraction (new schema uses flat columns) + let schema = batch.schema(); + let l_columns: Vec<(&str, &StringArray)> = schema + .fields() + .iter() + .filter_map(|field| { + let name = field.name(); + if name.starts_with("l_") { + batch + .column_by_name(name) + .and_then(|c| c.as_any().downcast_ref::()) + .map(|arr| (name.strip_prefix("l_").unwrap_or(name), arr)) + } else { + None + } + }) + .collect(); let metric_kind_array = batch .column_by_name("metric_kind") @@ -172,27 +181,13 @@ pub fn validate_parquet>( fetch_index_to_time.insert(fetch_index, time); } - let labels_slice: StructArray = labels_array.value(row); - let key_array = labels_slice - .column(0) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::InvalidColumnType("Labels keys are not StringArray".to_string()) - })?; - let value_array = labels_slice - .column(1) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::InvalidColumnType("Labels values are not StringArray".to_string()) - })?; - + // Extract labels from l_* columns let mut sorted_labels: BTreeSet = BTreeSet::new(); - for i in 0..key_array.len() { - let key = key_array.value(i); - let value = value_array.value(i); - sorted_labels.insert(format!("{key}:{value}")); + for (key, arr) in &l_columns { + if !arr.is_null(row) { + let value = arr.value(row); + sorted_labels.insert(format!("{key}:{value}")); + } } let mut hasher = hash_builder.build_hasher();
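The manager-side rotation plumbing above exposes a `RotationSender` from `start_with_rotation`, but the diff does not show a caller. Below is a minimal sketch of how an embedding application might rotate the capture file on a timer, assuming the sender was obtained from `start_with_rotation`; the hourly cadence, the `/tmp` path pattern, and the `rotate_hourly` helper name are assumptions, not part of the change.

```rust
// Hypothetical rotation driver; cadence, paths, and this helper's name are
// assumptions layered on top of the RotationRequest/RotationSender API above.
use std::path::PathBuf;
use std::time::Duration;

use lading_capture::manager::{RotationRequest, RotationSender};
use tokio::sync::oneshot;

async fn rotate_hourly(rotation_tx: RotationSender) {
    let mut index = 0u32;
    loop {
        tokio::time::sleep(Duration::from_secs(3600)).await;
        index += 1;

        // Each request carries the next file path plus a oneshot for the result.
        let (response, done) = oneshot::channel();
        let request = RotationRequest {
            path: PathBuf::from(format!("/tmp/capture-{index}.parquet")),
            response,
        };

        // A closed channel means the CaptureManager has shut down.
        if rotation_tx.send(request).await.is_err() {
            break;
        }

        // The manager answers once the old file's footer is written and the
        // new file is installed in the state machine.
        match done.await {
            Ok(Ok(())) => {}
            Ok(Err(e)) => eprintln!("rotation failed: {e}"),
            Err(_) => break, // response dropped; manager is gone
        }
    }
}
```

Because each rotation closes the old `Format` (writing the Parquet footer) before the replacement is handed to the state machine, every rotated file is independently readable, with the flat `l_*` label columns and any configured bloom filters intact.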