Skip to content
3 changes: 2 additions & 1 deletion lading/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ use std::io;
use crate::target::TargetPidReceiver;
use serde::Deserialize;

/// Linux-specific observer implementation using procfs and cgroups
#[cfg(target_os = "linux")]
mod linux;
pub mod linux;

#[derive(thiserror::Error, Debug)]
/// Errors produced by [`Server`]
Expand Down
15 changes: 10 additions & 5 deletions lading/src/observer/linux.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
mod cgroup;
mod procfs;
/// Cgroup metrics collection
pub mod cgroup;
/// Procfs metrics collection
pub mod procfs;
mod utils;
mod wss;

Expand All @@ -19,16 +21,18 @@ pub enum Error {
Wss(#[from] wss::Error),
}

/// Combined sampler for procfs, cgroup, and WSS metrics
#[derive(Debug)]
pub(crate) struct Sampler {
pub struct Sampler {
procfs: procfs::Sampler,
cgroup: cgroup::Sampler,
wss: Option<wss::Sampler>,
tick_counter: u8,
}

impl Sampler {
pub(crate) fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result<Self, Error> {
/// Create a new Sampler for the given parent PID
pub fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result<Self, Error> {
let procfs_sampler = procfs::Sampler::new(parent_pid)?;
let cgroup_sampler = cgroup::Sampler::new(parent_pid, labels)?;
let wss_sampler = if wss::Sampler::is_available() {
Expand Down Expand Up @@ -64,7 +68,8 @@ ls -l /sys/kernel/mm/page_idle/bitmap
})
}

pub(crate) async fn sample(&mut self) -> Result<(), Error> {
/// Sample all metrics (procfs, cgroup, and optionally WSS)
pub async fn sample(&mut self) -> Result<(), Error> {
let sample_smaps = self.tick_counter.is_multiple_of(10);
let sample_wss = self.tick_counter.is_multiple_of(60);
self.tick_counter += 1;
Expand Down
11 changes: 7 additions & 4 deletions lading/src/observer/linux/cgroup.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// Code to read cgroup information.
pub(crate) mod v2;
pub mod v2;

use std::{collections::VecDeque, io, path::PathBuf};

Expand Down Expand Up @@ -33,15 +33,17 @@ struct CgroupInfo {
cpu_sampler: cpu::Sampler,
}

/// Samples cgroup metrics for a process tree
#[derive(Debug)]
pub(crate) struct Sampler {
pub struct Sampler {
parent: Process,
cgroup_info: FxHashMap<PathBuf, CgroupInfo>,
labels: Vec<(String, String)>,
}

impl Sampler {
pub(crate) fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result<Self, Error> {
/// Create a new cgroup Sampler for the given parent PID
pub fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result<Self, Error> {
let parent = Process::new(parent_pid)?;
let cgroup_info = FxHashMap::default();

Expand All @@ -52,8 +54,9 @@ impl Sampler {
})
}

/// Poll cgroup metrics for all processes in the tree
#[allow(clippy::cast_possible_wrap)]
pub(crate) async fn poll(&mut self) -> Result<(), Error> {
pub async fn poll(&mut self) -> Result<(), Error> {
// Every sample run we collect all the child processes rooted at the
// parent. As noted by the procfs documentation is this done by
// dereferencing the `/proc/<pid>/root` symlink.
Expand Down
17 changes: 12 additions & 5 deletions lading/src/observer/linux/cgroup/v2.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub(crate) mod cpu;
pub(crate) mod io;
pub(crate) mod memory;
/// CPU metrics from cgroup v2
pub mod cpu;
pub mod io;
pub mod memory;

use core::f64;
use std::{
Expand All @@ -12,22 +13,28 @@ use metrics::{counter, gauge};
use tokio::fs;
use tracing::{debug, error, warn};

/// Errors produced by cgroup v2 functions
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Wrapper for [`std::io::Error`]
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// Integer parsing error
#[error("Parse int error: {0}")]
ParseInt(#[from] std::num::ParseIntError),
/// Float parsing error
#[error("Parse float error: {0}")]
ParseFloat(#[from] std::num::ParseFloatError),
/// Cgroup v2 hierarchy not found for process
#[error("Cgroup v2 not found")]
CgroupV2NotFound,
/// PSI (Pressure Stall Information) parsing error
#[error("Parsing PSI error: {0}")]
ParsingPsi(String),
}

/// Determines the cgroup v2 path for a given PID.
pub(crate) async fn get_path(pid: i32) -> Result<PathBuf, Error> {
pub async fn get_path(pid: i32) -> Result<PathBuf, Error> {
let path = format!("/proc/{pid}/cgroup");
let content = fs::read_to_string(path).await?;

Expand All @@ -52,7 +59,7 @@ pub(crate) async fn get_path(pid: i32) -> Result<PathBuf, Error> {
/// Polls for any cgroup metrics that can be read, v2 version.
#[tracing::instrument(skip_all)]
#[allow(clippy::too_many_lines)]
pub(crate) async fn poll(file_path: &Path, labels: &[(String, String)]) -> Result<(), Error> {
pub async fn poll(file_path: &Path, labels: &[(String, String)]) -> Result<(), Error> {
// Read all files in the cgroup `path` and create metrics for them. If we
// lack permissions to read we skip the file. We do not use ? to allow for
// the maximal number of files to be read.
Expand Down
63 changes: 37 additions & 26 deletions lading/src/observer/linux/cgroup/v2/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ use tokio::fs;

#[derive(thiserror::Error, Debug)]
/// Errors produced by functions in this module
pub(crate) enum Error {
pub enum Error {
/// Wrapper for [`std::io::Error`]
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// Float parsing error
#[error("Float Parsing: {0}")]
ParseFloat(#[from] std::num::ParseFloatError),
/// Integer parsing error
#[error("Integer Parsing: {0}")]
ParseInt(#[from] std::num::ParseIntError),
}
Expand All @@ -23,25 +25,22 @@ struct Stats {
last_instant: Instant,
}

#[derive(Debug)]
pub(crate) struct Sampler {
prev: Stats,
/// Samples CPU statistics from cgroup v2 with delta calculations
#[derive(Debug, Default)]
pub struct Sampler {
/// Previous stats for delta calculation. None on first poll.
prev: Option<Stats>,
}

impl Sampler {
pub(crate) fn new() -> Self {
Self {
prev: Stats {
usage_usec: 0,
user_usec: 0,
system_usec: 0,
last_instant: Instant::now(),
},
}
/// Create a new CPU Sampler
#[must_use]
pub fn new() -> Self {
Self { prev: None }
}

// Read cgroup CPU data and calculate a percentage of usage.
pub(crate) async fn poll(
/// Read cgroup CPU data and calculate a percentage of usage.
pub async fn poll(
&mut self,
group_prefix: &Path,
labels: &[(String, String)],
Expand Down Expand Up @@ -80,17 +79,29 @@ impl Sampler {
}

let now = Instant::now();
let delta_time = now.duration_since(self.prev.last_instant).as_micros();
let delta_usage = usage_usec.saturating_sub(self.prev.usage_usec);
let delta_user = user_usec.saturating_sub(self.prev.user_usec);
let delta_system = system_usec.saturating_sub(self.prev.system_usec);

// Update previous stats and if there's a time delta calculate the CPU
// usage.
self.prev.usage_usec = usage_usec;
self.prev.user_usec = user_usec;
self.prev.system_usec = system_usec;
self.prev.last_instant = now;
let current = Stats {
usage_usec,
user_usec,
system_usec,
last_instant: now,
};

// On first poll, just record baseline stats without emitting metrics.
// This avoids a spike where delta = (cumulative_since_container_start - 0).
let Some(ref prev) = self.prev else {
self.prev = Some(current);
return Ok(());
};

let delta_time = now.duration_since(prev.last_instant).as_micros();
let delta_usage = usage_usec.saturating_sub(prev.usage_usec);
let delta_user = user_usec.saturating_sub(prev.user_usec);
let delta_system = system_usec.saturating_sub(prev.system_usec);

// Update previous stats for next poll
self.prev = Some(current);

// Emit metrics if there's a time delta
if delta_time > 0 {
let delta_time = delta_time as f64;

Expand Down
2 changes: 1 addition & 1 deletion lading/src/observer/linux/cgroup/v2/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tracing::warn;
/// * `pgdeactivate`
/// * `pglazyfree`
/// * `pglazyfreed`
pub(crate) fn stat(content: &str, metric_prefix: &str, labels: &[(String, String)]) {
pub fn stat(content: &str, metric_prefix: &str, labels: &[(String, String)]) {
let counter_keys = [
"pgfault",
"pgmajfault",
Expand Down
17 changes: 11 additions & 6 deletions lading/src/observer/linux/procfs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
/// Sampler implementation for procfs filesystems
mod memory;
mod stat;
mod uptime;
pub mod memory;
/// Per-process CPU statistics from /proc/<pid>/stat
pub mod stat;
/// System uptime from /proc/uptime
pub mod uptime;
mod vmstat;

use std::io;
Expand Down Expand Up @@ -79,14 +81,16 @@ struct ProcessInfo {
stat_sampler: stat::Sampler,
}

/// Samples procfs metrics for a process tree
#[derive(Debug)]
pub(crate) struct Sampler {
pub struct Sampler {
parent: Process,
process_info: FxHashMap<i32, ProcessInfo>,
}

impl Sampler {
pub(crate) fn new(parent_pid: i32) -> Result<Self, Error> {
/// Create a new procfs Sampler for the given parent PID
pub fn new(parent_pid: i32) -> Result<Self, Error> {
let parent = Process::new(parent_pid)?;
let process_info = FxHashMap::default();

Expand All @@ -96,6 +100,7 @@ impl Sampler {
})
}

/// Poll procfs metrics for all processes in the tree
#[allow(
clippy::similar_names,
clippy::too_many_lines,
Expand All @@ -104,7 +109,7 @@ impl Sampler {
clippy::cast_possible_wrap,
clippy::cast_lossless
)]
pub(crate) async fn poll(&mut self, include_smaps: bool) -> Result<(), Error> {
pub async fn poll(&mut self, include_smaps: bool) -> Result<(), Error> {
// A tally of the total RSS and PSS consumed by the parent process and
// its children.
let mut aggr = smaps_rollup::Aggregator::default();
Expand Down
6 changes: 4 additions & 2 deletions lading/src/observer/linux/procfs/memory.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub(crate) mod smaps;
pub(crate) mod smaps_rollup;
/// Per-process memory regions from /proc/<pid>/smaps
pub mod smaps;
/// Rolled-up memory statistics from /proc/<pid>/smaps_rollup
pub mod smaps_rollup;

const BYTES_PER_KIBIBYTE: u64 = 1024;

Expand Down
17 changes: 11 additions & 6 deletions lading/src/observer/linux/procfs/memory/smaps_rollup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,29 @@ use super::{BYTES_PER_KIBIBYTE, next_token};

#[derive(thiserror::Error, Debug)]
/// Errors produced by functions in this module
pub(crate) enum Error {
pub enum Error {
/// Wrapper for [`std::io::Error`]
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// Integer parsing error
#[error("Number Parsing: {0}")]
ParseInt(#[from] std::num::ParseIntError),
/// General parsing error
#[error("Parsing: {0}")]
Parsing(String),
}

/// Aggregates memory metrics from smaps_rollup
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct Aggregator {
pub(crate) rss: u64,
pub(crate) pss: u64,
pub struct Aggregator {
/// Resident Set Size in bytes
pub rss: u64,
/// Proportional Set Size in bytes
pub pss: u64,
}

// Read `/proc/{pid}/smaps_rollup` and parse it directly into metrics.
pub(crate) async fn poll(
/// Read `/proc/{pid}/smaps_rollup` and parse it directly into metrics.
pub async fn poll(
pid: i32,
labels: &[(&'static str, String)],
aggr: &mut Aggregator,
Expand Down
Loading
Loading