diff --git a/Cargo.lock b/Cargo.lock index 7713086bd..965e88439 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -624,8 +624,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link 0.2.1", ] @@ -1860,9 +1862,13 @@ dependencies = [ "serde_qs", "serde_yaml", "sha2", + "signal-hook", "sysinfo", "tempfile", "thiserror", + "tikv-jemalloc-ctl", + "tikv-jemalloc-sys", + "tikv-jemallocator", "tokio", "tokio-stream", "tokio-util", @@ -1917,6 +1923,7 @@ dependencies = [ "arbitrary", "byte-unit", "bytes", + "chrono", "criterion", "opentelemetry-proto", "proptest", @@ -1927,6 +1934,7 @@ dependencies = [ "serde", "serde_json", "serde_tuple", + "tempfile", "thiserror", "time", "tokio", @@ -3614,6 +3622,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d881a16cf4426aa584979d30bd82cb33429027e42122b169753d6ef1085ed6e2" +dependencies = [ + "libc", + "signal-hook-registry", +] + [[package]] name = "signal-hook-registry" version = "1.4.7" @@ -3796,6 +3814,37 @@ dependencies = [ "ordered-float 2.10.1", ] +[[package]] +name = "tikv-jemalloc-ctl" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "661f1f6a57b3a36dc9174a2c10f19513b4866816e13425d3e418b11cc37bc24c" +dependencies = [ + "libc", + "paste", + "tikv-jemalloc-sys", +] + +[[package]] +name = "tikv-jemalloc-sys" +version = "0.6.1+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd8aa5b2ab86a2cefa406d889139c162cbb230092f7d1d7cbc1716405d852a3b" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0359b4327f954e0567e69fb191cf1436617748813819c94b8cd4a431422d053a" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "time" version = "0.3.44" diff --git a/Dockerfile b/Dockerfile index 54893cefb..f5052ad32 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,14 +9,10 @@ RUN cargo chef prepare --recipe-path recipe.json # Stage 1: Cacher - Build dependencies only FROM docker.io/rust:1.90.0-slim-bookworm AS cacher -ARG SCCACHE_BUCKET -ARG SCCACHE_REGION -ARG AWS_ACCESS_KEY_ID -ARG AWS_SECRET_ACCESS_KEY -ARG AWS_SESSION_TOKEN -ENV CARGO_INCREMENTAL=0 +ENV JEMALLOC_SYS_WITH_PROFILING=1 WORKDIR /app RUN apt-get update && apt-get install -y \ + build-essential \ pkg-config=1.8.1-1 \ libssl-dev=3.0.17-1~deb12u3 \ protobuf-compiler=3.21.12-3 \ @@ -24,37 +20,17 @@ RUN apt-get update && apt-get install -y \ libfuse3-dev=3.14.0-4 \ curl \ && rm -rf /var/lib/apt/lists/* -# Download pre-built sccache binary -RUN case "$(uname -m)" in \ - x86_64) ARCH=x86_64-unknown-linux-musl ;; \ - aarch64) ARCH=aarch64-unknown-linux-musl ;; \ - *) echo "Unsupported architecture" && exit 1 ;; \ - esac && \ - curl -L https://github.com/mozilla/sccache/releases/download/v0.8.2/sccache-v0.8.2-${ARCH}.tar.gz | tar xz && \ - mv sccache-v0.8.2-${ARCH}/sccache /usr/local/cargo/bin/ && \ - rm -rf sccache-v0.8.2-${ARCH} RUN cargo install cargo-chef --version 0.1.73 COPY --from=planner /app/recipe.json 
recipe.json # This layer is cached until Cargo.toml/Cargo.lock change
-# Use BuildKit secrets to pass AWS credentials securely (not exposed in image metadata)
-RUN --mount=type=secret,id=aws_access_key_id \
-    --mount=type=secret,id=aws_secret_access_key \
-    --mount=type=secret,id=aws_session_token \
-    export AWS_ACCESS_KEY_ID=$(cat /run/secrets/aws_access_key_id) && \
-    export AWS_SECRET_ACCESS_KEY=$(cat /run/secrets/aws_secret_access_key) && \
-    export AWS_SESSION_TOKEN=$(cat /run/secrets/aws_session_token) && \
-    export RUSTC_WRAPPER=sccache && \
-    cargo chef cook --release --locked --features logrotate_fs --recipe-path recipe.json
+RUN cargo chef cook --release --locked --features "logrotate_fs jemalloc_profiling" --recipe-path recipe.json
 
 # Stage 2: Builder - Build source code
 FROM docker.io/rust:1.90.0-slim-bookworm AS builder
-ARG SCCACHE_BUCKET
-ARG SCCACHE_REGION
-ENV CARGO_INCREMENTAL=0
-ENV SCCACHE_BUCKET=${SCCACHE_BUCKET}
-ENV SCCACHE_REGION=${SCCACHE_REGION}
+ENV JEMALLOC_SYS_WITH_PROFILING=1
 WORKDIR /app
 RUN apt-get update && apt-get install -y \
+    build-essential \
     pkg-config=1.8.1-1 \
     libssl-dev=3.0.17-1~deb12u3 \
     protobuf-compiler=3.21.12-3 \
@@ -68,23 +44,130 @@
 COPY --from=cacher /usr/local/cargo /usr/local/cargo
 COPY . .
-# Build binary - reuses cached dependencies + sccache
-# Use BuildKit secrets to pass AWS credentials securely (not exposed in image metadata)
-RUN --mount=type=secret,id=aws_access_key_id \
-    --mount=type=secret,id=aws_secret_access_key \
-    --mount=type=secret,id=aws_session_token \
-    export AWS_ACCESS_KEY_ID=$(cat /run/secrets/aws_access_key_id) && \
-    export AWS_SECRET_ACCESS_KEY=$(cat /run/secrets/aws_secret_access_key) && \
-    export AWS_SESSION_TOKEN=$(cat /run/secrets/aws_session_token) && \
-    export RUSTC_WRAPPER=sccache && \
-    cargo build --release --locked --bin lading --features logrotate_fs
+# Build binary - reuses cached dependencies
+RUN cargo build --release --locked --bin lading --features "logrotate_fs jemalloc_profiling"
 
 # Stage 3: Runtime
 FROM docker.io/debian:bookworm-20241202-slim
 RUN apt-get update && apt-get install -y \
     libfuse3-dev=3.14.0-4 \
     fuse3=3.14.0-4 \
+    curl \
+    ca-certificates \
+    procps \
+    gawk \
+    perl \
+    libjemalloc2 \
     && rm -rf /var/lib/apt/lists/*
+RUN ARCH="$(dpkg --print-architecture)" && \
+    url_release="https://github.com/DataDog/ddprof/releases/latest/download/ddprof-${ARCH}" && \
+    curl -L -o /usr/local/bin/ddprof "${url_release}" && \
+    chmod 755 /usr/local/bin/ddprof
+# Lightweight page-table/RSS sampler helper
+RUN cat > /usr/local/bin/pt-sampler <<'EOF' && chmod +x /usr/local/bin/pt-sampler
+#!/bin/sh
+# Usage: pt-sampler <pid> [interval_seconds]
+pid="$1"
+interval="${2:-5}"
+if [ -z "$pid" ]; then
+  echo "usage: pt-sampler <pid> [interval_seconds]" >&2
+  exit 1
+fi
+while :; do
+  ts="$(date -Is)"
+  pt_kb="$(awk '/^PageTables:/ {sum+=$2} END {print sum+0}' /proc/"$pid"/smaps 2>/dev/null)"
+  rss_kb="$(awk '/^VmRSS:/ {print $2}' /proc/"$pid"/status 2>/dev/null)"
+  echo "$ts rss_kb=${rss_kb:-0} pagetables_kb=${pt_kb:-0}"
+  sleep "$interval"
+done
+EOF
+
+# Simple smaps rollup helper to find largest RSS mappings
+RUN cat > /usr/local/bin/smaps-top <<'EOF' && chmod +x /usr/local/bin/smaps-top
+#!/bin/sh
+# Usage: smaps-top <pid> [topN]
+pid="$1"
+top="${2:-5}"
+if [ -z "$pid" ]; then
+  echo "usage: smaps-top <pid> [topN]" >&2
+  exit 1
+fi
+gawk -v top="$top" '
+BEGIN{RS=""; OFS=""; c=0}
+{
+  rss=0; size=0;
+  if (match($0,/Rss:[[:space:]]+([0-9]+)/,a)) rss=a[1];
+  if (match($0,/Size:[[:space:]]+([0-9]+)/,b)) size=b[1];
+  hdr=$0; sub(/\n.*/,"",hdr);
+  blocks[c,"rss"]=rss;
blocks[c,"size"]=size; blocks[c,"hdr"]=hdr; blocks[c,"blk"]=$0; c++; +} +END{ + n=c; + for(i=0;iblocks[max,"rss"]) max=j; + } + if(max!=i){ + tmp_r=blocks[i,"rss"]; tmp_s=blocks[i,"size"]; tmp_h=blocks[i,"hdr"]; tmp_b=blocks[i,"blk"]; + blocks[i,"rss"]=blocks[max,"rss"]; blocks[i,"size"]=blocks[max,"size"]; blocks[i,"hdr"]=blocks[max,"hdr"]; blocks[i,"blk"]=blocks[max,"blk"]; + blocks[max,"rss"]=tmp_r; blocks[max,"size"]=tmp_s; blocks[max,"hdr"]=tmp_h; blocks[max,"blk"]=tmp_b; + } + } + limit=(top /usr/local/bin/jeprof && \ + chmod +x /usr/local/bin/jeprof + +# Default jemalloc profiling configuration: enabled, active, 1MiB sample, dumps under /tmp. +# Note: tikv-jemalloc-sys uses _RJEM_MALLOC_CONF due to the _rjem_ symbol prefix. +# We set both for compatibility. +ENV MALLOC_CONF=prof:true,prof_active:true,lg_prof_sample:20,prof_prefix:/tmp/lading-heap \ + _RJEM_MALLOC_CONF=prof:true,prof_active:true,lg_prof_sample:20,prof_prefix:/tmp/lading-heap + +# Entrypoint wrapper: ensures MALLOC_CONF defaults are present and dump dir exists. +RUN cat > /usr/local/bin/lading-entrypoint.sh <<'EOF' && chmod +x /usr/local/bin/lading-entrypoint.sh +#!/bin/sh +set -e + +# Allow override; if not set, use baked-in default +# tikv-jemalloc-sys uses _RJEM_MALLOC_CONF due to symbol prefix, set both for compatibility +: "${MALLOC_CONF:=prof:true,prof_active:true,lg_prof_sample:20,prof_prefix:/tmp/lading-heap}" +: "${_RJEM_MALLOC_CONF:=$MALLOC_CONF}" +export MALLOC_CONF _RJEM_MALLOC_CONF + +# Best-effort create dump directory from prof_prefix +prefix="$(printf '%s' "$MALLOC_CONF" | tr ',' '\n' | sed -n 's/^prof_prefix://p' | head -n1)" +if [ -n "$prefix" ]; then + dir="$(dirname "$prefix")" + mkdir -p "$dir" 2>/dev/null || true +fi + +exec /usr/bin/lading "$@" +EOF # Smoke test RUN ["/usr/bin/lading", "--help"] -ENTRYPOINT ["/usr/bin/lading"] +ENTRYPOINT ["/usr/local/bin/lading-entrypoint.sh"] diff --git a/examples/lading-logrotatefs.yaml b/examples/lading-logrotatefs.yaml index 08a1da16a..adafa2130 100644 --- a/examples/lading-logrotatefs.yaml +++ b/examples/lading-logrotatefs.yaml @@ -9,7 +9,9 @@ generator: max_depth: 0 variant: "ascii" load_profile: - constant: 1.3MiB + constant: + rate: + bytes_per_second: 1.3MiB maximum_prebuild_cache_size_bytes: 1GiB mount_point: /tmp/logrotate diff --git a/lading/Cargo.toml b/lading/Cargo.toml index a8142ea5c..077e1f65e 100644 --- a/lading/Cargo.toml +++ b/lading/Cargo.toml @@ -96,6 +96,10 @@ tonic-prost = { workspace = true } tracing = { workspace = true } tracing-subscriber = { version = "0.3", features = ["std", "env-filter"] } zstd = "0.13.3" +signal-hook = { version = "0.3", optional = true } +tikv-jemalloc-ctl = { version = "0.6", optional = true, features = ["profiling"] } +tikv-jemallocator = { version = "0.6", optional = true, features = ["profiling"] } +tikv-jemalloc-sys = { version = "0.6", optional = true, features = ["profiling"] } [target.'cfg(target_os = "linux")'.dependencies] procfs = { version = "0.18", default-features = false, features = [] } @@ -111,8 +115,14 @@ prost-build = { workspace = true } tonic-prost-build = { workspace = true } [features] -default = [] +default = ["jemalloc_profiling"] logrotate_fs = ["fuser"] +jemalloc_profiling = [ + "tikv-jemallocator/profiling", + "tikv-jemalloc-ctl/profiling", + "tikv-jemalloc-sys/profiling", + "signal-hook", +] [lib] doctest = false diff --git a/lading/src/bin/lading.rs b/lading/src/bin/lading.rs index 27440964f..4bf068b22 100644 --- a/lading/src/bin/lading.rs +++ b/lading/src/bin/lading.rs @@ -1,5 
diff --git a/lading/src/bin/lading.rs b/lading/src/bin/lading.rs
index 27440964f..4bf068b22 100644
--- a/lading/src/bin/lading.rs
+++ b/lading/src/bin/lading.rs
@@ -1,5 +1,10 @@
 //! Main lading binary for load testing.
 
+// Force jemalloc as global allocator (unix only)
+#[cfg(unix)]
+#[global_allocator]
+static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
+
 use std::{
     env,
     fmt::{self, Display},
@@ -69,6 +74,32 @@ enum Error {
     Registration(#[from] lading_signal::RegisterError),
 }
 
+/// Read current process RSS in bytes from /proc/self/statm (Linux only).
+/// Returns 0 on non-Linux or if reading fails.
+#[cfg(target_os = "linux")]
+fn get_rss_bytes() -> u64 {
+    use std::fs;
+    let page_size = 4096u64; // Typical page size
+    fs::read_to_string("/proc/self/statm")
+        .ok()
+        .and_then(|s| s.split_whitespace().nth(1)?.parse::<u64>().ok())
+        .map(|pages| pages * page_size)
+        .unwrap_or(0)
+}
+
+#[cfg(not(target_os = "linux"))]
+fn get_rss_bytes() -> u64 {
+    0
+}
+
+fn log_rss_main(label: &str) {
+    let rss = get_rss_bytes();
+    if rss > 0 {
+        let rss_mb = rss / (1024 * 1024);
+        info!("[MEMORY] {}: RSS = {} MB ({} bytes)", label, rss_mb, rss);
+    }
+}
+
 fn default_config_path() -> String {
     "/etc/lading/lading.yaml".to_string()
 }
@@ -395,6 +426,9 @@ async fn inner_main(
     let (experiment_started_watcher, experiment_started_broadcast) = lading_signal::signal();
     let (target_running_watcher, target_running_broadcast) = lading_signal::signal();
 
+    #[cfg(all(feature = "jemalloc_profiling", unix))]
+    lading::install_jemalloc_profiling_handler();
+
     // Set up the telemetry sub-system.
     //
     // We support two methods to exflitrate telemetry about the target from rig:
@@ -531,11 +565,23 @@ async fn inner_main(
     //
     // GENERATOR
     //
-    for cfg in config.generator {
+    log_rss_main("Before creating generators");
+    lading::log_jemalloc_stats("Before creating generators");
+    let generator_count = config.generator.len();
+    for (i, cfg) in config.generator.into_iter().enumerate() {
+        info!("[STARTUP] Creating generator {}/{}", i + 1, generator_count);
         let tgt_rcv = tgt_snd.subscribe();
         let generator_server = generator::Server::new(cfg, shutdown_watcher.clone())?;
+        log_rss_main(&format!(
+            "After creating generator {}/{}",
+            i + 1,
+            generator_count
+        ));
+        lading::log_jemalloc_stats(&format!("After generator {}/{}", i + 1, generator_count));
         gsrv_joinset.spawn(generator_server.run(tgt_rcv));
     }
+    log_rss_main("After all generators created");
+    lading::log_jemalloc_stats("After all generators created");
 
     //
     // INSPECTOR
@@ -716,6 +762,12 @@ fn main() -> Result<(), Error> {
         "Lading running with {limit} amount of memory.",
         limit = memory_limit.to_string()
     );
+    if let Ok(limit_v1) = std::fs::read_to_string("/sys/fs/cgroup/memory/memory.limit_in_bytes") {
+        info!("cgroup v1 memory.limit_in_bytes: {}", limit_v1.trim());
+    }
+    if let Ok(max_v2) = std::fs::read_to_string("/sys/fs/cgroup/memory.max") {
+        info!("cgroup v2 memory.max: {}", max_v2.trim());
+    }
 
     // Two-parser fallback logic until CliFlatLegacy is removed
     let args = match CliWithSubcommands::try_parse() {
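One caveat on `get_rss_bytes` above: it assumes a 4 KiB page, which holds on x86-64 but not on every arm64 kernel (16 KiB and 64 KiB pages exist). A sketch of resolving the real value, assuming the `libc` crate is available (it is already in lading's dependency graph via nix):

    // Sketch: query the actual page size instead of hardcoding 4096.
    #[cfg(target_os = "linux")]
    fn page_size_bytes() -> u64 {
        // sysconf(_SC_PAGESIZE) returns -1 on error; fall back to 4096.
        let sz = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
        if sz > 0 { sz as u64 } else { 4096 }
    }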
diff --git a/lading/src/common.rs b/lading/src/common.rs
index 0782fb2f5..c7d3d66e3 100644
--- a/lading/src/common.rs
+++ b/lading/src/common.rs
@@ -14,23 +14,18 @@ pub struct Output {
     pub stdout: Behavior,
 }
 
-#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
+#[derive(Debug, Deserialize, Clone, PartialEq, Eq, Default)]
 #[serde(deny_unknown_fields)]
 #[serde(untagged)]
 /// Defines the [`Output`] behavior for stderr and stdout.
 pub enum Behavior {
     /// Redirect stdout, stderr to /dev/null
+    #[default]
     Quiet,
     /// Write to a location on-disk.
     Log(PathBuf),
 }
 
-impl Default for Behavior {
-    fn default() -> Self {
-        Self::Quiet
-    }
-}
-
 impl fmt::Display for Behavior {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
         match self {
diff --git a/lading/src/generator/common.rs b/lading/src/generator/common.rs
index eeee3d86c..8da9604fc 100644
--- a/lading/src/generator/common.rs
+++ b/lading/src/generator/common.rs
@@ -4,34 +4,72 @@ use byte_unit::Byte;
 use serde::{Deserialize, Serialize};
 use std::num::{NonZeroU16, NonZeroU32};
 
-/// Generator-specific throttle configuration with field names that are specific
-/// to byte-oriented generators.
+/// Unified rate specification; defaults to bytes when `mode` is unset.
+#[derive(Debug, Deserialize, Serialize, PartialEq, Clone, Copy, Default)]
+#[serde(deny_unknown_fields)]
+pub struct RateSpec {
+    /// Throttle mode; defaults to bytes when absent.
+    #[serde(default)]
+    pub mode: Option<ThrottleMode>,
+    /// Bytes per second (bytes mode only).
+    #[serde(default)]
+    pub bytes_per_second: Option<Byte>,
+    /// Blocks per second (blocks mode only).
+    #[serde(default)]
+    pub blocks_per_second: Option<NonZeroU32>,
+}
+
+impl RateSpec {
+    fn resolve(&self) -> Result<(ThrottleMode, NonZeroU32), ThrottleConversionError> {
+        let mode = self.mode.unwrap_or(ThrottleMode::Bytes);
+        match mode {
+            ThrottleMode::Bytes => {
+                let bps = self
+                    .bytes_per_second
+                    .ok_or(ThrottleConversionError::MissingRate)?;
+                let val = bps.as_u128();
+                let val =
+                    u32::try_from(val).map_err(|_| ThrottleConversionError::ValueTooLarge(bps))?;
+                NonZeroU32::new(val)
+                    .map(|n| (ThrottleMode::Bytes, n))
+                    .ok_or(ThrottleConversionError::Zero)
+            }
+            ThrottleMode::Blocks => self
+                .blocks_per_second
+                .map(|n| (ThrottleMode::Blocks, n))
+                .ok_or(ThrottleConversionError::MissingRate),
+        }
+    }
+}
+
+/// Generator-specific throttle configuration unified for bytes or blocks.
 #[derive(Debug, Deserialize, Serialize, PartialEq, Clone, Copy)]
 #[serde(rename_all = "snake_case")]
 #[serde(deny_unknown_fields)]
-pub enum BytesThrottleConfig {
+pub enum ThrottleConfig {
     /// A throttle that allows the generator to produce as fast as possible
     AllOut,
     /// A throttle that attempts stable load
     Stable {
-        /// The bytes per second rate limit (e.g., "1MB", "512KiB")
-        bytes_per_second: Byte,
+        /// Rate specification (bytes or blocks). Defaults to bytes when mode is unset.
+        #[serde(default)]
+        rate: RateSpec,
         /// The timeout in milliseconds for IO operations. Default is 0.
#[serde(default)] timeout_millis: u64, }, /// A throttle that linearly increases load over time Linear { - /// The initial bytes per second (e.g., "100KB") - initial_bytes_per_second: Byte, - /// The maximum bytes per second (e.g., "10MB") - maximum_bytes_per_second: Byte, - /// The rate of change in bytes per second per second - rate_of_change: Byte, + /// The initial rate (bytes or blocks per second) + initial: RateSpec, + /// The maximum rate (bytes or blocks per second) + maximum: RateSpec, + /// The rate of change per second (bytes or blocks per second) + rate_of_change: RateSpec, }, } -/// Error converting `BytesThrottleConfig` to internal throttle config +/// Error converting `ThrottleConfig` to internal throttle config #[derive(Debug, thiserror::Error, Clone, Copy)] pub enum ThrottleConversionError { /// Value exceeds u32 capacity @@ -43,110 +81,176 @@ pub enum ThrottleConversionError { /// Conflicting configuration provided #[error("Cannot specify both throttle config and bytes_per_second")] ConflictingConfig, + /// Missing rate specification + #[error("Rate must be specified for the selected throttle mode")] + MissingRate, + /// Mixed throttle modes in a linear profile + #[error("All rate specs in a linear throttle must use the same mode")] + MixedModes, } -/// Create a throttle from optional config and `bytes_per_second` fallback -/// -/// This function implements the standard throttle creation logic for -/// byte-oriented generators. It handles the interaction between the new -/// `BytesThrottleConfig` and the legacy `bytes_per_second` field. -/// -/// # Decision Logic -/// -/// | `BytesThrottleConfig` | `bytes_per_second` | Result | -/// |---------------------|------------------|--------| -/// | Some(config) | Some(bps) | Error - Conflicting configuration | -/// | Some(config) | None | Use `BytesThrottleConfig` | -/// | None | Some(bps) | Create Stable throttle with `timeout_micros`: 0 | -/// | None | None | `AllOut` throttle (no rate limiting) | -/// -/// # Errors +/// Indicates how a throttle should interpret its token units. +#[derive(Debug, Deserialize, Serialize, PartialEq, Clone, Copy)] +#[serde(rename_all = "snake_case")] +#[serde(deny_unknown_fields)] +pub enum ThrottleMode { + /// Throttle tokens represent bytes. + Bytes, + /// Throttle tokens represent block counts. + Blocks, +} + +/// Wrapper around a throttle and how its tokens should be interpreted. +#[derive(Debug)] +pub(super) struct BlockThrottle { + /// Underlying throttle instance. + inner: lading_throttle::Throttle, + /// Token interpretation mode. + pub mode: ThrottleMode, +} + +impl BlockThrottle { + /// Wait for capacity for a block, interpreting tokens according to `mode`. + pub(super) async fn wait_for_block( + &mut self, + block_cache: &lading_payload::block::Cache, + handle: &lading_payload::block::Handle, + ) -> Result<(), lading_throttle::Error> { + let tokens: NonZeroU32 = match self.mode { + ThrottleMode::Bytes => block_cache.peek_next_size(handle), + ThrottleMode::Blocks => NonZeroU32::new(1).expect("non-zero"), + }; + self.inner.wait_for(tokens).await + } + + /// Divide the underlying throttle capacity by `n`, preserving mode. 
+    pub(super) fn divide(self, n: NonZeroU32) -> Result<Self, lading_throttle::Error> {
+        let throttle = self.inner.divide(n)?;
+        Ok(Self {
+            inner: throttle,
+            mode: self.mode,
+        })
+    }
+
+    /// Get the maximum capacity of the underlying throttle
+    pub(super) fn maximum_capacity(&self) -> u32 {
+        self.inner.maximum_capacity()
+    }
+}
+
+/// Create a throttle from config plus optional legacy bytes-per-second fallback.
 ///
-/// Returns an error if:
-/// - Both config and `bytes_per_second` are provided (conflicting configuration)
-/// - The `bytes_per_second` value exceeds `u32::MAX`
-/// - The `bytes_per_second` value is zero
+/// Returns a [`BlockThrottle`] that carries both the throttle and its mode
+/// (bytes vs blocks).
 pub(super) fn create_throttle(
-    config: Option<&BytesThrottleConfig>,
-    bytes_per_second: Option<&byte_unit::Byte>,
-) -> Result<lading_throttle::Throttle, ThrottleConversionError> {
-    let throttle_config = match (config, bytes_per_second) {
-        (Some(_), Some(_)) => {
-            return Err(ThrottleConversionError::ConflictingConfig);
-        }
-        (Some(tc), None) => tc.try_into()?,
-        (None, Some(bps)) => {
-            let bps_value = bps.as_u128();
-            if bps_value > u128::from(u32::MAX) {
-                return Err(ThrottleConversionError::ValueTooLarge(*bps));
-            }
-            #[allow(clippy::cast_possible_truncation)]
-            let bps_u32 = NonZeroU32::new(bps_value as u32).ok_or(ThrottleConversionError::Zero)?;
-            lading_throttle::Config::Stable {
-                maximum_capacity: bps_u32,
-                timeout_micros: 0,
+    config: Option<&ThrottleConfig>,
+    legacy_bytes_per_second: Option<&byte_unit::Byte>,
+) -> Result<BlockThrottle, ThrottleConversionError> {
+    let fallback = legacy_bytes_per_second.map(|bps| ThrottleConfig::Stable {
+        rate: RateSpec {
+            mode: Some(ThrottleMode::Bytes),
+            bytes_per_second: Some(*bps),
+            blocks_per_second: None,
+        },
+        timeout_millis: 0,
+    });
+
+    let cfg = config
+        .copied()
+        .or(fallback)
+        .unwrap_or(ThrottleConfig::AllOut);
+    let throttle = match cfg {
+        ThrottleConfig::AllOut => {
+            lading_throttle::Throttle::new_with_config(lading_throttle::Config::AllOut)
+        }
+        ThrottleConfig::Stable {
+            rate,
+            timeout_millis,
+        } => {
+            let (_mode, cap) = rate.resolve()?;
+            lading_throttle::Throttle::new_with_config(lading_throttle::Config::Stable {
+                maximum_capacity: cap,
+                timeout_micros: timeout_millis.saturating_mul(1000),
+            })
+        }
+        ThrottleConfig::Linear {
+            initial,
+            maximum,
+            rate_of_change,
+        } => {
+            let (m1, init) = initial.resolve()?;
+            let (m2, max) = maximum.resolve()?;
+            let (m3, rate) = rate_of_change.resolve()?;
+            if m1 != m2 || m1 != m3 {
+                return Err(ThrottleConversionError::MixedModes);
             }
+            lading_throttle::Throttle::new_with_config(lading_throttle::Config::Linear {
+                initial_capacity: init.get(),
+                maximum_capacity: max,
+                rate_of_change: rate.get(),
+            })
         }
-        (None, None) => lading_throttle::Config::AllOut,
     };
-    Ok(lading_throttle::Throttle::new_with_config(throttle_config))
+
+    let mode = match cfg {
+        ThrottleConfig::AllOut => ThrottleMode::Bytes,
+        ThrottleConfig::Stable { rate, .. } => rate.resolve()?.0,
+        ThrottleConfig::Linear {
+            initial,
+            maximum,
+            rate_of_change,
+        } => {
+            let (m1, _) = initial.resolve()?;
+            let (m2, _) = maximum.resolve()?;
+            let (m3, _) = rate_of_change.resolve()?;
+            if m1 != m2 || m1 != m3 {
+                return Err(ThrottleConversionError::MixedModes);
+            }
+            m1
+        }
+    };
+
+    Ok(BlockThrottle {
+        inner: throttle,
+        mode,
+    })
 }
 
-impl TryFrom<&BytesThrottleConfig> for lading_throttle::Config {
+impl TryFrom<&ThrottleConfig> for lading_throttle::Config {
     type Error = ThrottleConversionError;
 
     #[allow(clippy::cast_possible_truncation)]
-    fn try_from(config: &BytesThrottleConfig) -> Result<Self, Self::Error> {
+    fn try_from(config: &ThrottleConfig) -> Result<Self, Self::Error> {
         match config {
-            BytesThrottleConfig::AllOut => Ok(lading_throttle::Config::AllOut),
-            BytesThrottleConfig::Stable {
-                bytes_per_second,
+            ThrottleConfig::AllOut => Ok(lading_throttle::Config::AllOut),
+            ThrottleConfig::Stable {
+                rate,
                 timeout_millis,
             } => {
-                let value = bytes_per_second.as_u128();
-                if value > u128::from(u32::MAX) {
-                    return Err(ThrottleConversionError::ValueTooLarge(*bytes_per_second));
+                let (mode, cap) = rate.resolve()?;
+                if mode != ThrottleMode::Bytes {
+                    return Err(ThrottleConversionError::MixedModes);
                 }
-                let value = value as u32;
-                let value = NonZeroU32::new(value).ok_or(ThrottleConversionError::Zero)?;
                 Ok(lading_throttle::Config::Stable {
-                    maximum_capacity: value,
+                    maximum_capacity: cap,
                     timeout_micros: timeout_millis.saturating_mul(1000),
                 })
             }
-            BytesThrottleConfig::Linear {
-                initial_bytes_per_second,
-                maximum_bytes_per_second,
+            ThrottleConfig::Linear {
+                initial,
+                maximum,
                 rate_of_change,
             } => {
-                let initial = initial_bytes_per_second.as_u128();
-                let maximum = maximum_bytes_per_second.as_u128();
-                let rate = rate_of_change.as_u128();
-
-                if initial > u128::from(u32::MAX) {
-                    return Err(ThrottleConversionError::ValueTooLarge(
-                        *initial_bytes_per_second,
-                    ));
-                }
-                if maximum > u128::from(u32::MAX) {
-                    return Err(ThrottleConversionError::ValueTooLarge(
-                        *maximum_bytes_per_second,
-                    ));
+                let (m1, init) = initial.resolve()?;
+                let (m2, max) = maximum.resolve()?;
+                let (m3, rate) = rate_of_change.resolve()?;
+                if m1 != m2 || m1 != m3 || m1 != ThrottleMode::Bytes {
+                    return Err(ThrottleConversionError::MixedModes);
                 }
-                if rate > u128::from(u32::MAX) {
-                    return Err(ThrottleConversionError::ValueTooLarge(*rate_of_change));
-                }
-
-                let initial = initial as u32;
-                let maximum = maximum as u32;
-                let rate = rate as u32;
-
-                let maximum = NonZeroU32::new(maximum).ok_or(ThrottleConversionError::Zero)?;
-
                 Ok(lading_throttle::Config::Linear {
-                    initial_capacity: initial,
-                    maximum_capacity: maximum,
-                    rate_of_change: rate,
+                    initial_capacity: init.get(),
+                    maximum_capacity: max,
+                    rate_of_change: rate.get(),
                 })
             }
         }
@@ -161,7 +265,7 @@
 /// - Pooled: Multiple concurrent requests with semaphore limiting (HTTP/Splunk
 ///   HEC pattern)
 /// - Workers: Multiple persistent worker tasks (TCP/UDP/Unix pattern)
-#[derive(Debug)]
+#[derive(Debug, Clone, Copy)]
 pub(super) enum ConcurrencyStrategy {
     /// Pool of connections with semaphore limiting concurrent requests
     Pooled {
@@ -193,7 +297,7 @@ impl ConcurrencyStrategy {
     }
 
     /// Get the number of parallel connections for this strategy
-    pub(super) fn connection_count(&self) -> u16 {
+    pub(super) fn connection_count(self) -> u16 {
         match self {
             Self::Pooled { max_connections } => max_connections.get(),
             Self::Workers { count } => count.get(),
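For reference, the YAML shapes the unified `ThrottleConfig` is meant to accept, per the serde derives above. A test-style sketch, assuming `serde_yaml` (already a lading dependency):

    #[test]
    fn throttle_config_accepts_bytes_and_blocks() {
        // `mode` defaults to bytes when absent.
        let y = "stable:\n  rate:\n    bytes_per_second: 1MiB";
        let cfg: ThrottleConfig = serde_yaml::from_str(y).expect("bytes form");
        assert!(matches!(cfg, ThrottleConfig::Stable { .. }));

        // Explicit blocks mode.
        let y = "stable:\n  rate:\n    mode: blocks\n    blocks_per_second: 10";
        let cfg: ThrottleConfig = serde_yaml::from_str(y).expect("blocks form");
        if let ThrottleConfig::Stable { rate, .. } = cfg {
            assert_eq!(rate.mode, Some(ThrottleMode::Blocks));
        }
    }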
diff --git a/lading/src/generator/file_gen/logrotate.rs b/lading/src/generator/file_gen/logrotate.rs
index 5efdbbbb5..1ce741c71 100644
--- a/lading/src/generator/file_gen/logrotate.rs
+++ b/lading/src/generator/file_gen/logrotate.rs
@@ -36,7 +36,8 @@ use lading_payload::block;
 
 use super::General;
 use crate::generator::common::{
-    BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle,
+    BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, ThrottleMode,
+    create_throttle,
 };
 
 /// An enum to allow us to determine what operation caused an IO errror as the
@@ -137,6 +138,7 @@ pub struct Config {
     /// Sets the [`crate::payload::Config`] of this template.
     pub variant: lading_payload::Config,
     /// Defines the number of bytes that written in each log file.
+    #[deprecated(note = "Use load_profile with a bytes_per_second rate instead")]
    bytes_per_second: Option<byte_unit::Byte>,
     /// Defines the maximum internal cache of this log target. `file_gen` will
     /// pre-build its outputs up to the byte capacity specified here.
@@ -147,8 +149,9 @@ pub struct Config {
     /// Whether to use a fixed or streaming block cache
     #[serde(default = "lading_payload::block::default_cache_method")]
     block_cache_method: block::CacheMethod,
-    /// The load throttle configuration
-    pub throttle: Option<BytesThrottleConfig>,
+    /// Throughput profile controlling emission rate (bytes or blocks).
+    #[serde(default)]
+    pub load_profile: Option<ThrottleConfig>,
 }
 
 #[derive(Debug)]
@@ -209,13 +212,22 @@ impl Server {
                 maximum_prebuild_cache_size_bytes.get() as usize,
             )?,
         };
+        info!(
+            cache_bytes = block_cache.total_size(),
+            "logrotate block cache initialized"
+        );
         let block_cache = Arc::new(block_cache);
 
         let mut handles = Vec::new();
         for idx in 0..config.concurrent_logs {
-            let throttle =
-                create_throttle(config.throttle.as_ref(), config.bytes_per_second.as_ref())?;
+            let legacy_bps = {
+                #[allow(deprecated)]
+                {
+                    config.bytes_per_second.as_ref()
+                }
+            };
+            let throughput_throttle = create_throttle(config.load_profile.as_ref(), legacy_bps)?;
 
             let mut dir_path = config.root.clone();
             let depth = rng.random_range(0..config.max_depth);
@@ -234,8 +246,9 @@ impl Server {
                 &basename,
                 config.total_rotations,
                 maximum_bytes_per_log,
+                maximum_block_size.get(),
                 Arc::clone(&block_cache),
-                throttle,
+                throughput_throttle,
                 shutdown.clone(),
                 child_labels,
             );
@@ -283,8 +296,9 @@ struct Child {
     names: Vec<PathBuf>,
     // The soft limit bytes per file that will trigger a rotation.
     maximum_bytes_per_log: NonZeroU32,
+    maximum_block_size: u32,
     block_cache: Arc<block::Cache>,
-    throttle: lading_throttle::Throttle,
+    throttle: BlockThrottle,
     shutdown: lading_signal::Watcher,
     labels: Vec<(String, String)>,
 }
@@ -295,8 +309,9 @@ impl Child {
         basename: &Path,
         total_rotations: u8,
         maximum_bytes_per_log: NonZeroU32,
+        maximum_block_size: u32,
         block_cache: Arc<block::Cache>,
-        throttle: lading_throttle::Throttle,
+        throttle: BlockThrottle,
         shutdown: lading_signal::Watcher,
         labels: Vec<(String, String)>,
     ) -> Self {
@@ -316,6 +331,7 @@ impl Child {
         Self {
             names,
             maximum_bytes_per_log,
+            maximum_block_size,
             block_cache,
             throttle,
             shutdown,
@@ -324,7 +340,11 @@ impl Child {
     }
 
     async fn spin(mut self) -> Result<(), Error> {
-        let buffer_capacity = self.throttle.maximum_capacity() as usize;
+        let mut handle = self.block_cache.handle();
+        let buffer_capacity = match self.throttle.mode {
+            ThrottleMode::Bytes => self.throttle.maximum_capacity() as usize,
+            ThrottleMode::Blocks => self.maximum_block_size as usize,
+        };
         let mut total_bytes_written: u64 = 0;
         let maximum_bytes_per_log: u64 = u64::from(self.maximum_bytes_per_log.get());
@@ -357,21 +377,16 @@ impl Child {
                 })?,
         );
 
-        let mut handle = self.block_cache.handle();
-
         let shutdown_wait = self.shutdown.recv();
         tokio::pin!(shutdown_wait);
         loop {
             // SAFETY: By construction the block cache will never be empty
             // except in the event of a catastrophic failure.
-            let total_bytes = self.block_cache.peek_next_size(&handle);
-
             tokio::select! {
-                result = self.throttle.wait_for(total_bytes) => {
+                result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
                     match result {
                         Ok(()) => {
-                            let block = self.block_cache.advance(&mut handle);
-                            write_bytes(block,
+                            write_bytes(self.block_cache.advance(&mut handle),
                                 &mut fp,
                                 &mut total_bytes_written,
                                 buffer_capacity,
@@ -382,7 +397,10 @@ impl Child {
                                 &self.labels).await?;
                         }
                         Err(err) => {
-                            error!("Throttle request of {} is larger than throttle capacity. Block will be discarded. Error: {}", total_bytes, err);
+                            let total_bytes = self.block_cache.peek_next_size(&handle);
+                            error!(
+                                "Throttle request for block size {total_bytes} failed. Block will be discarded. Error: {err}"
+                            );
                         }
                     }
                 }
diff --git a/lading/src/generator/file_gen/logrotate_fs.rs b/lading/src/generator/file_gen/logrotate_fs.rs
index c07d58ffc..75de02918 100644
--- a/lading/src/generator/file_gen/logrotate_fs.rs
+++ b/lading/src/generator/file_gen/logrotate_fs.rs
@@ -5,6 +5,7 @@
 #![allow(clippy::cast_possible_wrap)]
 
 use crate::generator;
+use crate::generator::common::{RateSpec, ThrottleConfig, ThrottleConversionError, ThrottleMode};
 use fuser::{
     BackgroundSession, FileAttr, Filesystem, MountOption, ReplyAttr, ReplyData, ReplyDirectory,
     ReplyEntry, Request, spawn_mount2,
@@ -13,7 +14,7 @@ use lading_payload::block;
 use metrics::counter;
 use nix::libc::{self, ENOENT};
 use rand::{SeedableRng, rngs::SmallRng};
-use serde::{Deserialize, Serialize};
+use serde::{Deserialize, Deserializer, Serialize};
 use std::{
     collections::HashMap,
     ffi::OsStr,
@@ -55,41 +56,220 @@ pub struct Config {
     maximum_block_size: byte_unit::Byte,
     /// The mount-point for this filesystem
     mount_point: PathBuf,
-    /// The load profile, controlling bytes per second as a function of time.
+    /// The load profile, controlling bytes or blocks per second as a function of time.
     load_profile: LoadProfile,
+    /// Optional throttle profile (bytes or blocks). When set, overrides
+    /// `load_profile`.
+    #[serde(default)]
+    pub throttle: Option<ThrottleConfig>,
 }
 
 /// Profile for load in this filesystem.
-#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
+#[derive(Debug, Serialize, Clone, Copy, PartialEq)]
 #[serde(rename_all = "snake_case")]
 pub enum LoadProfile {
-    /// Constant bytes per second
-    Constant(byte_unit::Byte),
-    /// Linear growth of bytes per second
-    Linear {
-        /// Starting point for bytes per second
-        initial_bytes_per_second: byte_unit::Byte,
-        /// Amount to increase per second
-        rate: byte_unit::Byte,
-    },
+    /// Constant rate (bytes or blocks per second).
+    Constant {
+        /// Rate specification (bytes or blocks).
+        rate: RateSpec,
+    },
+    /// Linear growth of rate (bytes or blocks per second).
+    Linear {
+        /// Starting point for the rate.
+        initial: RateSpec,
+        /// Amount to increase per second.
+        rate_of_change: RateSpec,
+    },
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "snake_case")]
+enum LoadProfileWire {
+    Constant {
+        rate: RateSpec,
+    },
+    Linear {
+        initial: RateSpec,
+        rate_of_change: RateSpec,
+    },
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "snake_case")]
+enum LegacyLoadProfile {
+    Constant(byte_unit::Byte),
+    Linear {
+        initial_bytes_per_second: byte_unit::Byte,
+        rate: byte_unit::Byte,
+    },
+    Blocks {
+        blocks_per_second: NonZeroU32,
+    },
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(untagged)]
+enum LoadProfileCompat {
+    New(LoadProfileWire),
+    Legacy(LegacyLoadProfile),
 }
 
 impl LoadProfile {
-    fn to_model(self) -> model::LoadProfile {
-        // For now, one tick is one second.
-        match self {
-            LoadProfile::Constant(bpt) => model::LoadProfile::Constant(bpt.as_u128() as u64),
-            LoadProfile::Linear {
+    fn from_legacy(profile: &LegacyLoadProfile) -> Self {
+        match profile {
+            LegacyLoadProfile::Constant(bps) => LoadProfile::Constant {
+                rate: RateSpec {
+                    mode: None,
+                    bytes_per_second: Some(*bps),
+                    blocks_per_second: None,
+                },
+            },
+            LegacyLoadProfile::Linear {
                 initial_bytes_per_second,
                 rate,
-            } => model::LoadProfile::Linear {
-                start: initial_bytes_per_second.as_u128() as u64,
-                rate: rate.as_u128() as u64,
+            } => LoadProfile::Linear {
+                initial: RateSpec {
+                    mode: None,
+                    bytes_per_second: Some(*initial_bytes_per_second),
+                    blocks_per_second: None,
+                },
+                rate_of_change: RateSpec {
+                    mode: None,
+                    bytes_per_second: Some(*rate),
+                    blocks_per_second: None,
+                },
+            },
+            LegacyLoadProfile::Blocks { blocks_per_second } => LoadProfile::Constant {
+                rate: RateSpec {
+                    mode: Some(ThrottleMode::Blocks),
+                    bytes_per_second: None,
+                    blocks_per_second: Some(*blocks_per_second),
+                },
             },
         }
     }
 }
 
+impl<'de> Deserialize<'de> for LoadProfile {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let compat = LoadProfileCompat::deserialize(deserializer)?;
+        Ok(match compat {
+            LoadProfileCompat::New(profile) => match profile {
+                LoadProfileWire::Constant { rate } => LoadProfile::Constant { rate },
+                LoadProfileWire::Linear {
+                    initial,
+                    rate_of_change,
+                } => LoadProfile::Linear {
+                    initial,
+                    rate_of_change,
+                },
+            },
+            LoadProfileCompat::Legacy(profile) => LoadProfile::from_legacy(&profile),
+        })
+    }
+}
+
+impl LoadProfile {
+    fn to_model(self) -> Result<model::LoadProfile, ThrottleConversionError> {
+        // For now, one tick is one second.
+        match self {
+            LoadProfile::Constant { rate } => {
+                let (mode, cap) = resolve_rate(&rate)?;
+                match mode {
+                    ThrottleMode::Bytes => Ok(model::LoadProfile::Constant(u64::from(cap.get()))),
+                    ThrottleMode::Blocks => Ok(model::LoadProfile::Blocks {
+                        blocks_per_tick: u64::from(cap.get()),
+                    }),
+                }
+            }
+            LoadProfile::Linear {
+                initial,
+                rate_of_change,
+            } => {
+                let (m1, init) = resolve_rate(&initial)?;
+                let (m2, rate) = resolve_rate(&rate_of_change)?;
+                if m1 != m2 {
+                    return Err(ThrottleConversionError::MixedModes);
+                }
+                match m1 {
+                    ThrottleMode::Bytes => Ok(model::LoadProfile::Linear {
+                        start: u64::from(init.get()),
+                        rate: u64::from(rate.get()),
+                    }),
+                    ThrottleMode::Blocks => Ok(model::LoadProfile::BlocksLinear {
+                        start: u64::from(init.get()),
+                        rate: u64::from(rate.get()),
+                    }),
+                }
+            }
+        }
+    }
+}
+
+fn resolve_rate(rate: &RateSpec) -> Result<(ThrottleMode, NonZeroU32), ThrottleConversionError> {
+    let mode = rate.mode.unwrap_or(ThrottleMode::Bytes);
+    match mode {
+        ThrottleMode::Bytes => {
+            let bps = rate
+                .bytes_per_second
+                .ok_or(ThrottleConversionError::MissingRate)?;
+            let val = bps.as_u128();
+            if val > u128::from(u32::MAX) {
+                return Err(ThrottleConversionError::ValueTooLarge(bps));
+            }
+            NonZeroU32::new(val as u32)
+                .map(|n| (ThrottleMode::Bytes, n))
+                .ok_or(ThrottleConversionError::Zero)
+        }
+        ThrottleMode::Blocks => rate
+            .blocks_per_second
+            .map(|n| (ThrottleMode::Blocks, n))
+            .ok_or(ThrottleConversionError::MissingRate),
+    }
+}
+
+fn load_profile_from_throttle(
+    throttle: &ThrottleConfig,
+) -> Result<model::LoadProfile, ThrottleConversionError> {
+    match throttle {
+        ThrottleConfig::AllOut => Ok(model::LoadProfile::Blocks { blocks_per_tick: 1 }),
+        ThrottleConfig::Stable { rate, .. } => {
+            let (mode, cap) = resolve_rate(rate)?;
+            match mode {
+                ThrottleMode::Bytes => Ok(model::LoadProfile::Constant(u64::from(cap.get()))),
+                ThrottleMode::Blocks => Ok(model::LoadProfile::Blocks {
+                    blocks_per_tick: u64::from(cap.get()),
+                }),
+            }
+        }
+        ThrottleConfig::Linear {
+            initial,
+            maximum,
+            rate_of_change,
+        } => {
+            let (m1, init) = resolve_rate(initial)?;
+            let (m2, _max) = resolve_rate(maximum)?;
+            let (m3, rate) = resolve_rate(rate_of_change)?;
+            if m1 != m2 || m1 != m3 {
+                return Err(ThrottleConversionError::MixedModes);
+            }
+            match m1 {
+                ThrottleMode::Bytes => Ok(model::LoadProfile::Linear {
+                    start: u64::from(init.get()),
+                    rate: u64::from(rate.get()),
+                }),
+                ThrottleMode::Blocks => Ok(model::LoadProfile::BlocksLinear {
+                    start: u64::from(init.get()),
+                    rate: u64::from(rate.get()),
+                }),
+            }
+        }
+    }
+}
+
 #[derive(thiserror::Error, Debug)]
 /// Error for `LogrotateFs`
 pub enum Error {
@@ -99,6 +279,9 @@ pub enum Error {
     /// Creation of payload blocks failed.
     #[error("Block creation error: {0}")]
     Block(#[from] block::Error),
+    /// Throttle conversion error
+    #[error("Throttle configuration error: {0}")]
+    ThrottleConversion(#[from] ThrottleConversionError),
     /// Failed to convert, value is 0
     #[error("Value provided must not be zero")]
     Zero,
@@ -154,10 +337,22 @@ impl Server {
             // divvy this up in the future.
             total_bytes.get() as usize,
         )?;
+        let load_profile = if let Some(throttle) = &config.throttle {
+            load_profile_from_throttle(throttle)?
+        } else {
+            config.load_profile.to_model()?
+        };
 
         let start_time = Instant::now();
         let start_time_system = SystemTime::now();
+        let block_cache_size = block_cache.total_size();
+        info!(
+            "LogrotateFS block cache initialized: requested={}, actual={} bytes, blocks={}",
+            config.maximum_prebuild_cache_size_bytes,
+            block_cache_size,
+            block_cache.len()
+        );
         let state = model::State::new(
             &mut rng,
             start_time.elapsed().as_secs(),
@@ -166,7 +361,7 @@ impl Server {
             block_cache,
             config.max_depth,
             config.concurrent_logs,
-            config.load_profile.to_model(),
+            load_profile,
         );
 
         info!(
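The untagged `LoadProfileCompat` above means the legacy bare-byte spelling and the new nested rate spec should deserialize to the same value. A test-style sketch, assuming `serde_yaml`:

    #[test]
    fn load_profile_accepts_legacy_and_new_forms() {
        // Legacy: bare byte rate, so pre-existing configs keep working.
        let legacy: LoadProfile =
            serde_yaml::from_str("constant: 1.3MiB").expect("legacy form");
        // New: nested rate spec, as in examples/lading-logrotatefs.yaml.
        let new: LoadProfile =
            serde_yaml::from_str("constant:\n  rate:\n    bytes_per_second: 1.3MiB")
                .expect("new form");
        assert_eq!(legacy, new);
    }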
diff --git a/lading/src/generator/file_gen/logrotate_fs/model.rs b/lading/src/generator/file_gen/logrotate_fs/model.rs
index bc5c7d9d9..0829b75fe 100644
--- a/lading/src/generator/file_gen/logrotate_fs/model.rs
+++ b/lading/src/generator/file_gen/logrotate_fs/model.rs
@@ -13,6 +13,17 @@ pub(crate) type Tick = u64;
 /// The identification node number
 pub(crate) type Inode = usize;
 
+/// Parameters describing a file's position in the rotation hierarchy
+#[derive(Debug, Clone, Copy)]
+pub(crate) struct FileHierarchy {
+    /// The parent node of this file
+    pub(crate) parent: Inode,
+    /// The peer of this file (next in rotation sequence)
+    pub(crate) peer: Option<Inode>,
+    /// The group ID shared by all files in the same rotation group
+    pub(crate) group_id: u16,
+}
+
 /// Model representation of a `File`. Does not actually contain any bytes but
 /// stores sufficient metadata to determine access patterns over time.
 #[derive(Debug, Clone)]
@@ -82,6 +93,9 @@ pub(crate) struct File {
     /// starting positions in the cache.
     cache_offset: u64,
 
+    /// Handle for iterating block sizes when simulating block-based writes.
+    block_handle: block::Handle,
+
     /// The random number generator used to generate the cache offset.
     rng: SmallRng,
 }
@@ -112,20 +126,34 @@ pub(crate) fn generate_cache_offset(rng: &mut SmallRng, total_cache_size: u64) -
     rng.random_range(0..total_cache_size)
 }
 
+fn generate_block_handle<R>(rng: &mut R, block_cache: &block::Cache) -> block::Handle
+where
+    R: Rng + ?Sized,
+{
+    let mut handle = block_cache.handle();
+    let len = block_cache.len();
+    if len == 0 {
+        return handle;
+    }
+    let offset = rng.random_range(0..len);
+    for _ in 0..offset {
+        let _ = block_cache.advance(&mut handle);
+    }
+    handle
+}
+
 impl File {
     /// Create a new instance of `File`
     pub(crate) fn new(
         mut rng: SmallRng,
-        parent: Inode,
-        group_id: u16,
+        hierarchy: FileHierarchy,
         bytes_per_tick: u64,
         now: Tick,
-        peer: Option<Inode>,
         total_cache_size: u64,
+        block_handle: block::Handle,
     ) -> Self {
         let cache_offset = generate_cache_offset(&mut rng, total_cache_size);
         Self {
-            parent,
+            parent: hierarchy.parent,
             bytes_written: 0,
             bytes_read: 0,
             access_tick: now,
@@ -136,12 +164,13 @@ impl File {
             read_only: false,
             read_only_since: None,
             ordinal: 0,
-            peer,
-            group_id,
+            peer: hierarchy.peer,
+            group_id: hierarchy.group_id,
             open_handles: 0,
             unlinked: false,
             max_offset_observed: 0,
             cache_offset,
+            block_handle,
             rng,
         }
     }
@@ -295,6 +324,18 @@ pub(crate) enum LoadProfile {
         /// Amount to increase per tick
         rate: u64,
     },
+    /// Constant blocks per tick
+    Blocks {
+        /// Blocks per tick
+        blocks_per_tick: u64,
+    },
+    /// Linear growth of blocks per tick
+    BlocksLinear {
+        /// Starting point for blocks per tick
+        start: u64,
+        /// Amount to increase per tick
+        rate: u64,
+    },
 }
 
 /// The state of the filesystem
@@ -505,16 +546,21 @@ impl State {
 
         // Generate a new SmallRng instance from the states rng to be used in deterministic offset generation
         let child_seed: [u8; 32] = rng.random();
-        let child_rng =
SmallRng::from_seed(child_seed); + let mut child_rng = SmallRng::from_seed(child_seed); + let block_handle = generate_block_handle(&mut child_rng, &state.block_cache); + let hierarchy = FileHierarchy { + parent: current_inode, + peer: None, + group_id, + }; let file = File::new( child_rng, - current_inode, - group_id, + hierarchy, 0, state.now, - None, state.total_cache_size, + block_handle, ); state.nodes.insert(file_inode, Node::File { file }); @@ -581,20 +627,41 @@ impl State { } } + fn blocks_to_bytes( + block_cache: &block::Cache, + blocks_len: u64, + total_cache_size: u64, + handle: &mut block::Handle, + blocks_per_tick: u64, + ) -> u64 { + if blocks_per_tick == 0 { + return 0; + } + if blocks_len == 0 { + return 0; + } + let cycles = blocks_per_tick / blocks_len; + let remainder = blocks_per_tick % blocks_len; + let mut bytes = total_cache_size.saturating_mul(cycles); + for _ in 0..remainder { + let size = block_cache.peek_next_size(handle); + bytes = bytes.saturating_add(u64::from(size.get())); + let _ = block_cache.advance(handle); + } + bytes + } + #[inline] #[allow(clippy::too_many_lines)] fn advance_time_inner(&mut self, now: Tick) { assert!(now >= self.now); - // Compute new global bytes_per_tick, at now - 1. + // Compute new global throughput, at now - 1. let elapsed_ticks = now.saturating_sub(self.initial_tick).saturating_sub(1); - let bytes_per_tick = match &self.load_profile { - LoadProfile::Constant(bytes) => *bytes, - LoadProfile::Linear { start, rate } => { - start.saturating_add(rate.saturating_mul(elapsed_ticks)) - } - }; + let block_cache = &self.block_cache; + let blocks_len = block_cache.len() as u64; + let total_cache_size = block_cache.total_size(); // Update each File's bytes_per_tick but do not advance time, as that is // done later. for node in self.nodes.values_mut() { @@ -602,7 +669,30 @@ impl State { && !file.read_only && !file.unlinked { - file.bytes_per_tick = bytes_per_tick; + file.bytes_per_tick = match &self.load_profile { + LoadProfile::Constant(bytes) => *bytes, + LoadProfile::Linear { start, rate } => { + start.saturating_add(rate.saturating_mul(elapsed_ticks)) + } + LoadProfile::Blocks { blocks_per_tick } => Self::blocks_to_bytes( + block_cache, + blocks_len, + total_cache_size, + &mut file.block_handle, + *blocks_per_tick, + ), + LoadProfile::BlocksLinear { start, rate } => { + let blocks_per_tick = + start.saturating_add(rate.saturating_mul(elapsed_ticks)); + Self::blocks_to_bytes( + block_cache, + blocks_len, + total_cache_size, + &mut file.block_handle, + blocks_per_tick, + ) + } + }; } } @@ -611,7 +701,15 @@ impl State { } for inode in self.inode_scratch.drain(..) { - let (rotated_inode, parent_inode, group_id, ordinal, file_rng, cache_offset) = { + let ( + rotated_inode, + parent_inode, + group_id, + ordinal, + file_rng, + cache_offset, + bytes_per_tick, + ) = { // If the node pointed to by inode doesn't exist, that's a // catastrophic programming error. We just copied all inode to node // pairs. @@ -656,6 +754,7 @@ impl State { file.ordinal, file.rng.clone(), file.cache_offset, + file.bytes_per_tick, ) }; @@ -666,14 +765,20 @@ impl State { // Set bytes_per_tick to current and now to now-1 else we'll never // ramp properly. 
             let new_file_inode = self.next_inode;
+            let mut file_rng = file_rng;
+            let block_handle = generate_block_handle(&mut file_rng, &self.block_cache);
+            let hierarchy = FileHierarchy {
+                parent: parent_inode,
+                peer: Some(rotated_inode),
+                group_id,
+            };
             let mut new_file = File::new(
                 file_rng,
-                parent_inode,
-                group_id,
+                hierarchy,
                 bytes_per_tick,
                 self.now.saturating_sub(1),
-                Some(rotated_inode),
                 self.total_cache_size,
+                block_handle,
             );
             let new_file_cache_offset = new_file.cache_offset;
@@ -1277,20 +1382,25 @@ mod test {
         }
 
         // Property 7: bytes_written are tick accurate
-        for (&inode, node) in &state.nodes {
-            if let Node::File { file } = node {
-                let end_tick = file.read_only_since.unwrap_or(state.now);
-                let expected_bytes = compute_expected_bytes_written(
-                    &state.load_profile,
-                    state.initial_tick,
-                    file.created_tick,
-                    end_tick,
-                );
-                assert_eq!(
-                    file.bytes_written, expected_bytes,
-                    "bytes_written ({}) does not match expected_bytes_written ({expected_bytes}) for file with inode {inode}",
-                    file.bytes_written,
-                );
+        if !matches!(
+            state.load_profile,
+            LoadProfile::Blocks { .. } | LoadProfile::BlocksLinear { .. }
+        ) {
+            for (&inode, node) in &state.nodes {
+                if let Node::File { file } = node {
+                    let end_tick = file.read_only_since.unwrap_or(state.now);
+                    let expected_bytes = compute_expected_bytes_written(
+                        &state.load_profile,
+                        state.initial_tick,
+                        file.created_tick,
+                        end_tick,
+                    );
+                    assert_eq!(
+                        file.bytes_written, expected_bytes,
+                        "bytes_written ({}) does not match expected_bytes_written ({expected_bytes}) for file with inode {inode}",
+                        file.bytes_written,
+                    );
+                }
             }
         }
 
@@ -1392,6 +1502,7 @@ mod test {
                     .saturating_add(rate.saturating_mul(sum_of_terms));
                 total_bytes
             }
+            LoadProfile::Blocks { .. } | LoadProfile::BlocksLinear { .. } => 0,
         }
     }
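The `blocks_to_bytes` conversion in model.rs splits `blocks_per_tick` into whole passes over the cache, each contributing exactly `total_cache_size` bytes, plus a remainder walked block by block through the file's handle. A worked sketch with an illustrative three-block cache:

    // The cycles/remainder split from blocks_to_bytes, on hypothetical sizes.
    fn blocks_to_bytes_worked_example() {
        let block_sizes = [100u64, 200, 300]; // a 3-block cache
        let total_cache_size: u64 = block_sizes.iter().sum(); // 600
        let blocks_per_tick = 7u64;

        let cycles = blocks_per_tick / block_sizes.len() as u64; // 2 full passes
        let remainder = blocks_per_tick % block_sizes.len() as u64; // 1 block left
        let mut bytes = total_cache_size * cycles; // 1200
        for i in 0..remainder as usize {
            bytes += block_sizes[i]; // residual walk from the handle: +100
        }
        assert_eq!(bytes, 1300);
    }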
diff --git a/lading/src/generator/file_gen/traditional.rs b/lading/src/generator/file_gen/traditional.rs
index 72ea23fd3..4589a0d71 100644
--- a/lading/src/generator/file_gen/traditional.rs
+++ b/lading/src/generator/file_gen/traditional.rs
@@ -30,6 +30,7 @@ use tokio::{
     fs,
     io::{AsyncWriteExt, BufWriter},
     task::{JoinError, JoinSet},
+    time::{Duration, Instant},
 };
 use tracing::{error, info};
 
@@ -37,7 +38,8 @@ use lading_payload::{self, block};
 use super::General;
 use crate::generator::common::{
-    BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle,
+    BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, ThrottleMode,
+    create_throttle,
 };
 
 #[derive(thiserror::Error, Debug)]
@@ -103,6 +105,7 @@ pub struct Config {
     /// written _continuously_ per second from this target. Higher bursts are
     /// possible as the internal governor accumulates, up to
     /// `maximum_bytes_burst`.
+    #[deprecated(note = "Use load_profile with a bytes_per_second rate instead")]
     bytes_per_second: Option<byte_unit::Byte>,
     /// Defines the maximum internal cache of this log target. `file_gen` will
     /// pre-build its outputs up to the byte capacity specified here.
@@ -118,8 +121,21 @@ pub struct Config {
     /// tailing software to remove old files.
     #[serde(default = "default_rotation")]
     rotate: bool,
-    /// The load throttle configuration
-    pub throttle: Option<BytesThrottleConfig>,
+    /// Throughput profile controlling emission rate (bytes or blocks).
+    #[serde(default)]
+    pub load_profile: Option<ThrottleConfig>,
+    /// Optional fixed interval between blocks. When set, the generator waits
+    /// this duration before emitting the next block, regardless of byte size.
+    pub block_interval_millis: Option<u64>,
+    /// Flush after each block. Useful when block intervals are large and the
+    /// buffered writer would otherwise delay writes to disk.
+    #[serde(default)]
+    pub flush_each_block: bool,
+    /// Optional starting line offset into the block cache. This advances
+    /// through blocks until the cumulative line count reaches this value,
+    /// then begins emitting from that block. If data point counts are not
+    /// available for a payload, this setting is effectively ignored.
+    pub start_line_index: Option<u64>,
 }
 
 #[derive(Debug)]
@@ -166,8 +182,13 @@ impl Server {
         let file_index = Arc::new(AtomicU32::new(0));
         for _ in 0..config.duplicates {
-            let throttle =
-                create_throttle(config.throttle.as_ref(), config.bytes_per_second.as_ref())?;
+            let legacy_bps = {
+                #[allow(deprecated)]
+                {
+                    config.bytes_per_second.as_ref()
+                }
+            };
+            let throughput_throttle = create_throttle(config.load_profile.as_ref(), legacy_bps)?;
 
             let block_cache = match config.block_cache_method {
                 block::CacheMethod::Fixed => block::Cache::fixed_with_max_overhead(
@@ -190,11 +211,15 @@ impl Server {
             let child = Child {
                 path_template: config.path_template.clone(),
                 maximum_bytes_per_file,
-                throttle,
+                throttle: throughput_throttle,
                 block_cache: Arc::new(block_cache),
+                maximum_block_size: maximum_block_size as u64,
                 file_index: Arc::clone(&file_index),
                 rotate: config.rotate,
                 shutdown: shutdown.clone(),
+                block_interval: config.block_interval_millis.map(Duration::from_millis),
+                flush_each_block: config.flush_each_block,
+                start_line_index: config.start_line_index.unwrap_or(0),
             };
 
             handles.spawn(child.spin());
@@ -264,22 +289,48 @@ impl Server {
 struct Child {
     path_template: String,
     maximum_bytes_per_file: NonZeroU32,
-    throttle: lading_throttle::Throttle,
+    throttle: BlockThrottle,
     block_cache: Arc<block::Cache>,
+    maximum_block_size: u64,
     rotate: bool,
     file_index: Arc<AtomicU32>,
     shutdown: lading_signal::Watcher,
+    block_interval: Option<Duration>,
+    flush_each_block: bool,
+    start_line_index: u64,
 }
 
 impl Child {
+    #[allow(clippy::too_many_lines)]
     pub(crate) async fn spin(mut self) -> Result<(), Error> {
-        let buffer_capacity = self.throttle.maximum_capacity() as usize;
         let mut total_bytes_written: u64 = 0;
         let maximum_bytes_per_file: u64 = u64::from(self.maximum_bytes_per_file.get());
 
         let mut file_index = self.file_index.fetch_add(1, Ordering::Relaxed);
         let mut path = path_from_template(&self.path_template, file_index);
 
+        let mut handle = self.block_cache.handle();
+        if self.start_line_index > 0 {
+            let mut remaining = self.start_line_index;
+            // Walk blocks until we reach or surpass the requested line offset.
+            // If metadata is missing, assume at least one data point to ensure progress.
+            loop {
+                let md = self.block_cache.peek_next_metadata(&handle);
+                let mut lines = md.data_points.unwrap_or(1);
+                if lines == 0 {
+                    lines = 1;
+                }
+                if lines >= remaining {
+                    break;
+                }
+                remaining = remaining.saturating_sub(lines);
+                let _ = self.block_cache.advance(&mut handle);
+            }
+        }
+
+        let buffer_capacity = match self.throttle.mode {
+            ThrottleMode::Bytes => self.throttle.maximum_capacity() as usize,
+            ThrottleMode::Blocks => usize::try_from(self.maximum_block_size).unwrap_or(usize::MAX),
+        };
         let mut fp = BufWriter::with_capacity(
             buffer_capacity,
             fs::OpenOptions::new()
@@ -298,26 +349,43 @@ impl Child {
                 }
             })?,
         );
-
-        let mut handle = self.block_cache.handle();
-
         let shutdown_wait = self.shutdown.recv();
         tokio::pin!(shutdown_wait);
+        let mut next_tick = self
+            .block_interval
+            .as_ref()
+            .map(|dur| Instant::now() + *dur);
         loop {
-            let total_bytes = self.block_cache.peek_next_size(&handle);
-
             tokio::select! {
-                result = self.throttle.wait_for(total_bytes) => {
+                result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
                     match result {
                         Ok(()) => {
                             let block = self.block_cache.advance(&mut handle);
-                            let total_bytes = u64::from(total_bytes.get());
+                            let total_bytes = u64::from(block.total_bytes.get());
+
+                            if let Some(dur) = self.block_interval {
+                                if let Some(deadline) = next_tick {
+                                    tokio::select! {
+                                        () = tokio::time::sleep_until(deadline) => {},
+                                        () = &mut shutdown_wait => {
+                                            fp.flush().await?;
+                                            info!("shutdown signal received");
+                                            return Ok(());
+                                        },
+                                    }
+                                    next_tick = Some(deadline + dur);
+                                } else {
+                                    next_tick = Some(Instant::now() + dur);
+                                }
+                            }
 
                             {
                                 fp.write_all(&block.bytes).await?;
                                 counter!("bytes_written").increment(total_bytes);
                                 total_bytes_written += total_bytes;
                             }
 
+                            if self.flush_each_block {
+                                fp.flush().await?;
+                            }
+
                             if total_bytes_written > maximum_bytes_per_file {
                                 fp.flush().await?;
@@ -351,7 +419,10 @@ impl Child {
                             }
                         }
                         Err(err) => {
-                            error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}");
+                            let total_bytes = self.block_cache.peek_next_size(&handle);
+                            error!(
+                                "Throttle request for block size {total_bytes} failed. Block will be discarded. Error: {err}"
+                            );
                         }
                     }
                 }
diff --git a/lading/src/generator/grpc.rs b/lading/src/generator/grpc.rs
index 8cda3eab1..12b449074 100644
--- a/lading/src/generator/grpc.rs
+++ b/lading/src/generator/grpc.rs
@@ -36,7 +36,7 @@ use lading_payload::block;
 
 use super::General;
 use crate::generator::common::{
-    BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle,
+    BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle,
 };
 
 /// Errors produced by [`Grpc`]
@@ -115,7 +115,7 @@ pub struct Config {
     /// The total number of parallel connections to maintain
     pub parallel_connections: u16,
     /// The load throttle configuration
-    pub throttle: Option<BytesThrottleConfig>,
+    pub throttle: Option<ThrottleConfig>,
 }
 
 /// No-op tonic codec. Sends raw bytes and returns the number of bytes received.
@@ -175,7 +175,7 @@ pub struct Grpc {
     target_uri: Uri,
     rpc_path: PathAndQuery,
     shutdown: lading_signal::Watcher,
-    throttle: lading_throttle::Throttle,
+    throttle: BlockThrottle,
     block_cache: block::Cache,
     metric_labels: Vec<(String, String)>,
 }
@@ -304,10 +304,9 @@ impl Grpc {
         let shutdown_wait = self.shutdown.recv();
         tokio::pin!(shutdown_wait);
         loop {
-            let total_bytes = self.block_cache.peek_next_size(&handle);
-
             tokio::select! {
-                _ = self.throttle.wait_for(total_bytes) => {
+                result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
+                    let _ = result;
                     let block = self.block_cache.advance(&mut handle);
                     let block_length = block.bytes.len();
                     counter!("requests_sent", &self.metric_labels).increment(1);
diff --git a/lading/src/generator/http.rs b/lading/src/generator/http.rs
index 3ba49a64d..889f13361 100644
--- a/lading/src/generator/http.rs
+++ b/lading/src/generator/http.rs
@@ -30,7 +30,7 @@ use lading_payload::block;
 
 use super::General;
 use crate::generator::common::{
-    BytesThrottleConfig, ConcurrencyStrategy, MetricsBuilder, ThrottleConversionError,
+    BlockThrottle, ConcurrencyStrategy, MetricsBuilder, ThrottleConfig, ThrottleConversionError,
     create_throttle,
 };
 
@@ -75,7 +75,7 @@ pub struct Config {
     /// The total number of parallel connections to maintain
     pub parallel_connections: u16,
     /// The load throttle configuration
-    pub throttle: Option<BytesThrottleConfig>,
+    pub throttle: Option<ThrottleConfig>,
 }
 
 #[derive(thiserror::Error, Debug)]
@@ -123,7 +123,7 @@ pub struct Http {
     method: hyper::Method,
     headers: hyper::HeaderMap,
     concurrency: ConcurrencyStrategy,
-    throttle: lading_throttle::Throttle,
+    throttle: BlockThrottle,
     block_cache: Arc<block::Cache>,
     metric_labels: Vec<(String, String)>,
     shutdown: lading_signal::Watcher,
@@ -225,9 +225,8 @@ impl Http {
         let shutdown_wait = self.shutdown.recv();
         tokio::pin!(shutdown_wait);
         loop {
-            let total_bytes = self.block_cache.peek_next_size(&handle);
             tokio::select! {
-                result = self.throttle.wait_for(total_bytes) => {
+                result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
                     match result {
                         Ok(()) => {
                             let block = self.block_cache.advance(&mut handle);
@@ -285,6 +284,7 @@ impl Http {
                             }
                         }
                         Err(err) => {
+                            let total_bytes: std::num::NonZero<u32> = self.block_cache.peek_next_size(&handle);
                             error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}");
                         }
                     }
diff --git a/lading/src/generator/passthru_file.rs b/lading/src/generator/passthru_file.rs
index 5baddd096..f46ecf88d 100644
--- a/lading/src/generator/passthru_file.rs
+++ b/lading/src/generator/passthru_file.rs
@@ -14,7 +14,6 @@ use std::{num::NonZeroU32, path::PathBuf, time::Duration};
 use tokio::{fs, io::AsyncWriteExt};
 
 use byte_unit::Byte;
-use lading_throttle::Throttle;
 use metrics::{counter, gauge};
 use rand::{SeedableRng, rngs::StdRng};
 use serde::{Deserialize, Serialize};
@@ -24,7 +23,7 @@ use lading_payload::block;
 
 use super::General;
 use crate::generator::common::{
-    BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle,
+    BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle,
 };
 
 #[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
@@ -45,7 +44,7 @@ pub struct Config {
     /// The maximum size in bytes of the cache of prebuilt messages
     pub maximum_prebuild_cache_size_bytes: Byte,
     /// The load throttle configuration
-    pub throttle: Option<BytesThrottleConfig>,
+    pub throttle: Option<ThrottleConfig>,
 }
 
 /// Errors produced by [`PassthruFile`].
@@ -74,7 +73,7 @@ pub enum Error {
 /// This generator is responsible for sending data to a file on disk.
 pub struct PassthruFile {
     path: PathBuf,
-    throttle: Throttle,
+    throttle: BlockThrottle,
     block_cache: block::Cache,
     metric_labels: Vec<(String, String)>,
     shutdown: lading_signal::Watcher,
@@ -183,9 +182,9 @@ impl PassthruFile {
                 continue;
             };
 
-            let total_bytes = self.block_cache.peek_next_size(&handle);
             tokio::select! {
{ - _ = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { + let _ = result; let block = self.block_cache.advance(&mut handle); match current_file.write_all(&block.bytes).await { Ok(()) => { diff --git a/lading/src/generator/splunk_hec.rs b/lading/src/generator/splunk_hec.rs index 9cc03e7a3..2e8c8d2ba 100644 --- a/lading/src/generator/splunk_hec.rs +++ b/lading/src/generator/splunk_hec.rs @@ -45,7 +45,7 @@ use lading_payload::block; use super::General; use crate::generator::common::{ - BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle, + BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; static CONNECTION_SEMAPHORE: OnceCell<Semaphore> = OnceCell::new(); @@ -93,7 +93,7 @@ pub struct Config { /// The total number of parallel connections to maintain pub parallel_connections: u16, /// The load throttle configuration - pub throttle: Option<BytesThrottleConfig>, + pub throttle: Option<ThrottleConfig>, } #[derive(thiserror::Error, Debug)] @@ -152,7 +152,7 @@ pub struct SplunkHec { uri: Uri, token: String, parallel_connections: u16, - throttle: lading_throttle::Throttle, + throttle: BlockThrottle, block_cache: Arc<block::Cache>, metric_labels: Vec<(String, String)>, channels: Channels, @@ -288,10 +288,9 @@ impl SplunkHec { .next() .expect("channel should never be empty") .clone(); - let total_bytes = self.block_cache.peek_next_size(&handle); tokio::select! { - result = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { let client = client.clone(); @@ -319,6 +318,7 @@ impl SplunkHec { tokio::spawn(send_hec_request(permit, block_length, labels, channel, client, request, request_shutdown.clone(), uri_clone)); } Err(err) => { + let total_bytes = self.block_cache.peek_next_size(&handle); error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}"); } } diff --git a/lading/src/generator/tcp.rs b/lading/src/generator/tcp.rs index 8bcdb3efe..432e08193 100644 --- a/lading/src/generator/tcp.rs +++ b/lading/src/generator/tcp.rs @@ -32,7 +32,7 @@ use lading_payload::block; use super::General; use crate::generator::common::{ - BytesThrottleConfig, ConcurrencyStrategy, MetricsBuilder, ThrottleConversionError, + BlockThrottle, ConcurrencyStrategy, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; @@ -61,7 +61,7 @@ pub struct Config { #[serde(default = "default_parallel_connections")] pub parallel_connections: u16, /// The load throttle configuration - pub throttle: Option<BytesThrottleConfig>, + pub throttle: Option<ThrottleConfig>, } #[derive(thiserror::Error, Debug)] @@ -216,7 +216,7 @@ impl Tcp { struct TcpWorker { addr: SocketAddr, - throttle: lading_throttle::Throttle, + throttle: BlockThrottle, block_cache: Arc<block::Cache>, metric_labels: Vec<(String, String)>, shutdown: lading_signal::Watcher, @@ -250,9 +250,8 @@ impl TcpWorker { continue; }; - let total_bytes = self.block_cache.peek_next_size(&handle); tokio::select! { - result = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { let block = self.block_cache.advance(&mut handle); @@ -272,6 +271,7 @@ impl TcpWorker { } } Err(err) => { + let total_bytes: std::num::NonZero<u32> = self.block_cache.peek_next_size(&handle); error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded.
Error: {err}"); } } diff --git a/lading/src/generator/trace_agent.rs b/lading/src/generator/trace_agent.rs index c31e8ef73..178b7617b 100644 --- a/lading/src/generator/trace_agent.rs +++ b/lading/src/generator/trace_agent.rs @@ -17,7 +17,7 @@ //! Additional metrics may be emitted by this generator's [throttle]. use super::General; use crate::generator::common::{ - BytesThrottleConfig, ConcurrencyStrategy, MetricsBuilder, ThrottleConversionError, + BlockThrottle, ConcurrencyStrategy, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; use bytes::Bytes; @@ -175,7 +175,7 @@ pub struct Config { /// The total number of parallel connections to maintain pub parallel_connections: u16, /// The load throttle configuration - pub throttle: Option, + pub throttle: Option, } #[derive(thiserror::Error, Debug)] @@ -246,7 +246,7 @@ pub struct TraceAgent { trace_endpoint: Uri, backoff_behavior: BackoffBehavior, concurrency: ConcurrencyStrategy, - throttle: lading_throttle::Throttle, + throttle: BlockThrottle, block_cache: Arc, metric_labels: Vec<(String, String)>, shutdown: lading_signal::Watcher, @@ -353,9 +353,8 @@ impl TraceAgent { let shutdown_wait = self.shutdown.recv(); tokio::pin!(shutdown_wait); loop { - let total_bytes = self.block_cache.peek_next_size(&handle); tokio::select! { - result = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { let block = self.block_cache.advance(&mut handle); @@ -378,6 +377,7 @@ impl TraceAgent { }); } Err(err) => { + let total_bytes = self.block_cache.peek_next_size(&handle); error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}", total_bytes = total_bytes); } } diff --git a/lading/src/generator/udp.rs b/lading/src/generator/udp.rs index 3062ae8e5..b34113a90 100644 --- a/lading/src/generator/udp.rs +++ b/lading/src/generator/udp.rs @@ -32,7 +32,7 @@ use lading_payload::block; use super::General; use crate::generator::common::{ - BytesThrottleConfig, ConcurrencyStrategy, MetricsBuilder, ThrottleConversionError, + BlockThrottle, ConcurrencyStrategy, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; @@ -66,7 +66,7 @@ pub struct Config { #[serde(default = "default_parallel_connections")] pub parallel_connections: u16, /// The load throttle configuration - pub throttle: Option, + pub throttle: Option, } /// Errors produced by [`Udp`]. @@ -219,7 +219,7 @@ impl Udp { struct UdpWorker { addr: SocketAddr, - throttle: lading_throttle::Throttle, + throttle: BlockThrottle, block_cache: Arc, metric_labels: Vec<(String, String)>, shutdown: lading_signal::Watcher, @@ -234,8 +234,6 @@ impl UdpWorker { let shutdown_wait = self.shutdown.recv(); tokio::pin!(shutdown_wait); loop { - let total_bytes = self.block_cache.peek_next_size(&handle); - tokio::select! { conn = UdpSocket::bind("127.0.0.1:0"), if connection.is_none() => { match conn { @@ -253,7 +251,7 @@ impl UdpWorker { } } } - result = self.throttle.wait_for(total_bytes), if connection.is_some() => { + result = self.throttle.wait_for_block(&self.block_cache, &handle), if connection.is_some() => { match result { Ok(()) => { let sock = connection.expect("connection failed"); @@ -275,6 +273,7 @@ impl UdpWorker { } } Err(err) => { + let total_bytes = self.block_cache.peek_next_size(&handle); error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. 
Error: {err}"); } } diff --git a/lading/src/generator/unix_datagram.rs b/lading/src/generator/unix_datagram.rs index 006e0cc78..1d410b102 100644 --- a/lading/src/generator/unix_datagram.rs +++ b/lading/src/generator/unix_datagram.rs @@ -14,7 +14,6 @@ use byte_unit::{Byte, Unit}; use futures::future::join_all; use lading_payload::block; -use lading_throttle::Throttle; use metrics::counter; use rand::{SeedableRng, rngs::StdRng}; use serde::{Deserialize, Serialize}; @@ -29,7 +28,7 @@ use tracing::{debug, error, info}; use super::General; use crate::generator::common::{ - BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle, + BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; fn default_parallel_connections() -> u16 { @@ -68,7 +67,7 @@ pub struct Config { #[serde(default = "default_parallel_connections")] pub parallel_connections: u16, /// The load throttle configuration - pub throttle: Option, + pub throttle: Option, } /// Errors produced by [`UnixDatagram`]. @@ -233,7 +232,7 @@ impl UnixDatagram { #[derive(Debug)] struct Child { path: PathBuf, - throttle: Throttle, + throttle: BlockThrottle, block_cache: Arc, metric_labels: Vec<(String, String)>, shutdown: lading_signal::Watcher, @@ -269,10 +268,8 @@ impl Child { let shutdown_wait = self.shutdown.recv(); tokio::pin!(shutdown_wait); loop { - let total_bytes = self.block_cache.peek_next_size(&handle); - tokio::select! { - result = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { // NOTE When we write into a unix socket it may be that only @@ -298,6 +295,7 @@ impl Child { } } Err(err) => { + let total_bytes = self.block_cache.peek_next_size(&handle); error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}"); } } diff --git a/lading/src/generator/unix_stream.rs b/lading/src/generator/unix_stream.rs index 366f20662..34f7622a2 100644 --- a/lading/src/generator/unix_stream.rs +++ b/lading/src/generator/unix_stream.rs @@ -12,7 +12,6 @@ //! use lading_payload::block; -use lading_throttle::Throttle; use metrics::counter; use rand::{SeedableRng, rngs::StdRng}; use serde::{Deserialize, Serialize}; @@ -27,7 +26,7 @@ use tracing::{debug, error, info, warn}; use super::General; use crate::generator::common::{ - BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle, + BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle, }; fn default_parallel_connections() -> u16 { @@ -59,7 +58,7 @@ pub struct Config { #[serde(default = "default_parallel_connections")] pub parallel_connections: u16, /// The load throttle configuration - pub throttle: Option, + pub throttle: Option, } /// Errors produced by [`UnixStream`]. @@ -229,7 +228,7 @@ impl UnixStream { #[derive(Debug)] struct Child { path: PathBuf, - throttle: Throttle, + throttle: BlockThrottle, block_cache: block::Cache, metric_labels: Vec<(String, String)>, shutdown: lading_signal::Watcher, @@ -268,10 +267,8 @@ impl Child { continue; }; - let total_bytes = self.block_cache.peek_next_size(&handle); - tokio::select! 
{ - result = self.throttle.wait_for(total_bytes) => { + result = self.throttle.wait_for_block(&self.block_cache, &handle) => { match result { Ok(()) => { // NOTE When we write into a unix stream it may be that only @@ -321,7 +318,10 @@ impl Child { } } Err(err) => { - error!("Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}"); + let total_bytes = self.block_cache.peek_next_size(&handle); + error!( + "Throttle request of {total_bytes} is larger than throttle capacity. Block will be discarded. Error: {err}" + ); } } } diff --git a/lading/src/lib.rs b/lading/src/lib.rs index f8d382aab..95639d981 100644 --- a/lading/src/lib.rs +++ b/lading/src/lib.rs @@ -52,3 +52,112 @@ pub fn get_available_memory() -> Byte { let sys = System::new_all(); Byte::from_u64(sys.total_memory()) } + +#[cfg(all(feature = "jemalloc_profiling", unix))] +/// Log jemalloc memory statistics for debugging. +pub fn log_jemalloc_stats(label: &str) { + use std::os::raw::{c_char, c_void}; + use tikv_jemalloc_ctl::epoch; + use tikv_jemalloc_sys as jemalloc_sys; + use tracing::info; + + // Refresh jemalloc stats + let _ = epoch::advance(); + + // Read stats via mallctl + fn read_stat(name: &[u8]) -> usize { + let mut value: usize = 0; + let mut len = std::mem::size_of::<usize>(); + unsafe { + jemalloc_sys::mallctl( + name.as_ptr() as *const c_char, + &mut value as *mut _ as *mut c_void, + &mut len, + std::ptr::null_mut(), + 0, + ); + } + value + } + + let allocated = read_stat(b"stats.allocated\0"); + let active = read_stat(b"stats.active\0"); + let resident = read_stat(b"stats.resident\0"); + let retained = read_stat(b"stats.retained\0"); + let mapped = read_stat(b"stats.mapped\0"); + + info!( + "[JEMALLOC] {}: allocated={} MB, active={} MB, resident={} MB, retained={} MB, mapped={} MB", + label, + allocated / (1024 * 1024), + active / (1024 * 1024), + resident / (1024 * 1024), + retained / (1024 * 1024), + mapped / (1024 * 1024) + ); +} + +#[cfg(not(all(feature = "jemalloc_profiling", unix)))] +/// Stub for non-jemalloc builds. +pub fn log_jemalloc_stats(_label: &str) {} + +#[cfg(all(feature = "jemalloc_profiling", unix))] +/// Install a SIGUSR1 handler that writes jemalloc heap profiles to `/tmp`. +/// +/// This requires running lading with the `jemalloc_profiling` feature enabled +/// and with jemalloc profiling turned on at runtime, for example via +/// `MALLOC_CONF=prof:true,prof_active:true`. When the process receives SIGUSR1, +/// a heap profile will be written to `/tmp/lading-heap-<timestamp>.heap`.
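+///
+/// A minimal usage sketch (hypothetical workflow; assumes a Tokio runtime is
+/// already running, since the handler is spawned via `tokio::task::spawn_blocking`):
+///
+/// ```no_run
+/// lading::install_jemalloc_profiling_handler();
+/// // From a shell: kill -USR1 <lading-pid>
+/// // Then inspect /tmp/lading-heap-<timestamp>.heap, e.g. with jeprof or ddprof.
+/// ```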
+pub fn install_jemalloc_profiling_handler() { + use signal_hook::consts::signal::SIGUSR1; + use signal_hook::iterator::Signals; + use std::ffi::CString; + use std::os::raw::{c_char, c_void}; + use std::time::{SystemTime, UNIX_EPOCH}; + use tikv_jemalloc_ctl::epoch; + use tikv_jemalloc_sys as jemalloc_sys; + use tokio::task; + use tracing::{error, info}; + + task::spawn_blocking(move || { + let mut signals = match Signals::new([SIGUSR1]) { + Ok(sig) => sig, + Err(err) => { + error!(?err, "failed to install jemalloc profiling signal handler"); + return; + } + }; + + for signal in &mut signals { + let _ = epoch::advance(); + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let path = format!("/tmp/lading-heap-{timestamp}.heap"); + let c_path = match CString::new(path.clone()) { + Ok(c) => c, + Err(err) => { + error!(?err, %path, "failed to build CString for heap profile path"); + continue; + } + }; + let mut filename_ptr = c_path.as_ptr(); + // Safety: mallctl expects a pointer to a C string pointer for prof.dump. + let res = unsafe { + jemalloc_sys::mallctl( + b"prof.dump\0".as_ptr() as *const c_char, + std::ptr::null_mut::<c_void>(), + std::ptr::null_mut(), + &mut filename_ptr as *mut _ as *mut c_void, + std::mem::size_of::<*const c_char>(), + ) + }; + if res == 0 { + info!(signal, %path, "wrote jemalloc heap profile"); + } else { + error!(signal, %path, res, "failed to write jemalloc heap profile"); + } + } + }); +} diff --git a/lading_payload/Cargo.toml b/lading_payload/Cargo.toml index f8609248a..294218838 100644 --- a/lading_payload/Cargo.toml +++ b/lading_payload/Cargo.toml @@ -31,11 +31,13 @@ time = { version = "0.3", features = ["formatting"] } tracing = { workspace = true } tokio = { workspace = true } arbitrary = { version = "1", optional = true, features = ["derive"] } +chrono = "0.4.42" [dev-dependencies] proptest = { workspace = true } proptest-derive = { workspace = true } criterion = { version = "0.8", features = ["html_reports"] } +tempfile = { workspace = true } [features] default = [] diff --git a/lading_payload/src/block.rs b/lading_payload/src/block.rs index e565875c2..7fd84861d 100644 --- a/lading_payload/src/block.rs +++ b/lading_payload/src/block.rs @@ -14,6 +14,34 @@ use serde::{Deserialize, Serialize}; use tokio::time::Instant; use tracing::{Level, debug, error, info, span, warn}; +/// Read current process RSS in bytes from /proc/self/statm (Linux only). +/// Returns 0 on non-Linux or if reading fails.
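+/// The conversion below assumes 4 KiB pages; on kernels configured with a
+/// larger page size the reported RSS will be an underestimate.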
+#[cfg(target_os = "linux")] +fn get_rss_bytes() -> u64 { + use std::fs; + let page_size = 4096u64; // Typical page size + fs::read_to_string("/proc/self/statm") + .ok() + .and_then(|s| s.split_whitespace().nth(1)?.parse::().ok()) + .map(|pages| pages * page_size) + .unwrap_or(0) +} + +#[cfg(not(target_os = "linux"))] +fn get_rss_bytes() -> u64 { + 0 +} + +fn log_rss(label: &str) { + let rss = get_rss_bytes(); + if rss > 0 { + let rss_str = Byte::from_u64(rss) + .get_appropriate_unit(byte_unit::UnitType::Binary) + .to_string(); + info!("[MEMORY] {}: RSS = {}", label, rss_str); + } +} + /// Error for block construction #[derive(Debug, thiserror::Error)] pub enum SpinError { @@ -23,6 +51,12 @@ pub enum SpinError { /// Static payload creation error #[error(transparent)] Static(#[from] crate::statik::Error), + /// Static line-rate payload creation error + #[error(transparent)] + StaticLinesPerSecond(#[from] crate::statik_line_rate::Error), + /// Static second-grouped payload creation error + #[error(transparent)] + StaticSecond(#[from] crate::statik_second::Error), /// rng slice is Empty #[error("RNG slice is empty")] EmptyRng, @@ -55,6 +89,12 @@ pub enum Error { /// Static payload creation error #[error(transparent)] Static(#[from] crate::statik::Error), + /// Static line-rate payload creation error + #[error(transparent)] + StaticLinesPerSecond(#[from] crate::statik_line_rate::Error), + /// Static second-grouped payload creation error + #[error(transparent)] + StaticSecond(#[from] crate::statik_second::Error), /// Error for crate deserialization #[error("Deserialization error: {0}")] Deserialize(#[from] crate::Error), @@ -88,6 +128,22 @@ pub struct Block { pub metadata: BlockMetadata, } +impl Block { + /// Estimate the actual heap memory used by this block. + /// This includes the Block struct, Bytes overhead, and payload. + pub fn estimated_heap_bytes(&self) -> usize { + // Block struct size (stack/inline) + let block_struct_size = std::mem::size_of::(); + // Bytes payload (actual data) + let payload_size = self.bytes.len(); + // Bytes has internal Arc overhead (~32 bytes on 64-bit) + let bytes_overhead = 32; + // jemalloc has ~16 bytes overhead per allocation + let alloc_overhead = 16; + block_struct_size + payload_size + bytes_overhead + alloc_overhead + } +} + /// Metadata associated with a Block #[derive(Debug, Clone, Default, Copy)] pub struct BlockMetadata { @@ -170,7 +226,7 @@ pub enum Cache { /// Each independent consumer should create its own Handle by calling /// `Cache::handle()`. Handles maintain their own position in the cache /// and advance independently. -#[derive(Debug)] +#[derive(Debug, Clone)] #[allow(missing_copy_implementations)] // intentionally not Copy to force callers to call `handle`. pub struct Handle { idx: usize, @@ -200,6 +256,9 @@ impl Cache { where R: Rng + ?Sized, { + let payload_name = format!("{:?}", payload).chars().take(50).collect::(); + log_rss(&format!("Before cache construction for {}", payload_name)); + let maximum_block_bytes = if (maximum_block_bytes > u128::from(u32::MAX)) || (maximum_block_bytes > u128::from(total_bytes.get())) { @@ -337,6 +396,42 @@ impl Cache { total_bytes.get(), )? 
} + crate::Config::StaticLinesPerSecond { + static_path, + lines_per_second, + } => { + let span = span!(Level::INFO, "fixed", payload = "static-lines-per-second"); + let _guard = span.enter(); + let mut serializer = + crate::StaticLinesPerSecond::new(static_path, *lines_per_second)?; + construct_block_cache_inner( + &mut rng, + &mut serializer, + maximum_block_bytes, + total_bytes.get(), + )? + } + crate::Config::StaticSecond { + static_path, + timestamp_format, + emit_placeholder, + start_line_index, + } => { + let span = span!(Level::INFO, "fixed", payload = "static-second"); + let _guard = span.enter(); + let mut serializer = crate::StaticSecond::new( + static_path, + timestamp_format, + *emit_placeholder, + *start_line_index, + )?; + construct_block_cache_inner( + &mut rng, + &mut serializer, + maximum_block_bytes, + total_bytes.get(), + )? + } crate::Config::OpentelemetryTraces(config) => { let mut pyld = crate::OpentelemetryTraces::with_config(*config, &mut rng)?; let span = span!(Level::INFO, "fixed", payload = "otel-traces"); @@ -378,6 +473,30 @@ .map(|block| u64::from(block.total_bytes.get())) .sum(); + // Log cache stats and memory after construction + let block_count = blocks.len(); + let cache_size_str = Byte::from_u64(total_cycle_size) + .get_appropriate_unit(byte_unit::UnitType::Binary) + .to_string(); + + // Estimate actual heap usage + let estimated_heap: usize = blocks.iter().map(|b| b.estimated_heap_bytes()).sum(); + let vec_capacity_bytes = blocks.capacity() * std::mem::size_of::<Block>(); + let total_estimated = estimated_heap + vec_capacity_bytes; + let estimated_str = Byte::from_u64(total_estimated as u64) + .get_appropriate_unit(byte_unit::UnitType::Binary) + .to_string(); + + info!( + "[CACHE] Constructed {} blocks, payload size: {}, estimated heap: {}, vec capacity: {} blocks ({} bytes)", + block_count, + cache_size_str, + estimated_str, + blocks.capacity(), + vec_capacity_bytes + ); + log_rss(&format!("After cache construction ({} blocks)", block_count)); + Ok(Self::Fixed { idx: 0, blocks, @@ -409,6 +528,20 @@ } } + /// Get the number of blocks in the cache. + #[must_use] + pub fn len(&self) -> usize { + match self { + Self::Fixed { blocks, .. } => blocks.len(), + } + } + + /// Returns true if the cache has no blocks. + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + /// Get metadata of the next block without advancing. #[must_use] pub fn peek_next_metadata(&self, handle: &Handle) -> BlockMetadata { @@ -530,16 +663,28 @@ where let mut rejected_block_sizes = 0; let mut success_block_sizes = 0; + // Estimate block count: use conservative 4KB average for small-block payloads + // (like static_second), but cap at 1/4 max_block_size for large-block payloads. + // This reduces Vec reallocations during cache construction.
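+ // Worked example (hypothetical numbers): total_bytes = 256 MiB and
+ // max_block_size = 1 MiB give avg_block_estimate = min(4096, 262144).max(1)
+ // = 4096, so the Vec is sized for 65_536 blocks up front rather than grown
+ // by repeated doubling.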
+ let conservative_avg = 4096u32; // 4 KB - typical for static_second + let optimistic_avg = max_block_size / 4; + let avg_block_estimate = conservative_avg.min(optimistic_avg).max(1); + let estimated_blocks = (total_bytes / avg_block_estimate).max(128) as usize; info!( ?max_block_size, ?total_bytes, + ?estimated_blocks, + ?avg_block_estimate, "Constructing requested block cache" ); - let mut block_cache: Vec<Block> = Vec::with_capacity(128); + let mut block_cache: Vec<Block> = Vec::with_capacity(estimated_blocks); let mut bytes_remaining = total_bytes; let start = Instant::now(); let mut next_minute = 1; + let mut next_rss_log_blocks = 1000; // Log RSS every 1000 blocks + + log_rss("Start of cache construction loop"); // Build out the blocks. // @@ -563,6 +708,15 @@ where min_actual_block_size = min_actual_block_size.min(total_bytes); bytes_remaining = bytes_remaining.saturating_sub(total_bytes); block_cache.push(block); + + // Periodic RSS logging during construction + if success_block_sizes == next_rss_log_blocks { + log_rss(&format!( + "After {} blocks, {} remaining, vec cap={}", + success_block_sizes, bytes_remaining, block_cache.capacity() + )); + next_rss_log_blocks *= 2; // Log at 1000, 2000, 4000, 8000... + } } Err(SpinError::EmptyBlock) => { debug!(?block_size, "rejected block"); @@ -636,6 +790,26 @@ where ); } + log_rss(&format!( + "End of cache loop: {} blocks, filled={}, vec_cap={}", + block_cache.len(), + filled_sum_str, + block_cache.capacity() + )); + + // Release excess Vec capacity to reduce memory footprint + let old_cap = block_cache.capacity(); + block_cache.shrink_to_fit(); + if block_cache.capacity() < old_cap { + info!( + "Shrunk block_cache Vec from {} to {} capacity (saved {} bytes)", + old_cap, + block_cache.capacity(), + (old_cap - block_cache.capacity()) * std::mem::size_of::<Block>() + ); + log_rss("After shrink_to_fit"); + } + Ok(block_cache) } } @@ -658,7 +832,16 @@ where { let mut block: Writer<BytesMut> = BytesMut::with_capacity(chunk_size as usize).writer(); serializer.to_bytes(&mut rng, chunk_size as usize, &mut block)?; - let bytes: Bytes = block.into_inner().freeze(); + let inner = block.into_inner(); + // Shrink allocation to actual size to avoid holding onto excess capacity. + // This is critical for small-block payloads like static_second where + // requested chunk_size (up to 5MB) >> actual data (often <10KB). + let bytes: Bytes = if inner.len() < inner.capacity() / 2 { + // Copy to right-sized allocation when we'd waste >50% capacity + Bytes::copy_from_slice(&inner) + } else { + inner.freeze() + }; if bytes.is_empty() { // Blocks should not be empty and if they are empty this is an // error. Caller may choose to handle this however they wish, often it diff --git a/lading_payload/src/lib.rs b/lading_payload/src/lib.rs index 043e2ffdf..08caa468b 100644 --- a/lading_payload/src/lib.rs +++ b/lading_payload/src/lib.rs @@ -27,6 +27,8 @@ pub use opentelemetry::metric::OpentelemetryMetrics; pub use opentelemetry::trace::OpentelemetryTraces; pub use splunk_hec::SplunkHec; pub use statik::Static; +pub use statik_line_rate::StaticLinesPerSecond; +pub use statik_second::StaticSecond; pub use syslog::Syslog5424; pub mod apache_common; @@ -40,6 +42,8 @@ pub mod opentelemetry; pub mod procfs; pub mod splunk_hec; pub mod statik; +pub mod statik_line_rate; +pub mod statik_second; pub mod syslog; pub mod trace_agent; @@ -129,6 +133,31 @@ pub enum Config { /// assumed to be line-oriented but no other claim is made on the file.
static_path: PathBuf, }, + /// Generates static data but limits the number of lines emitted per block + StaticLinesPerSecond { + /// Defines the file path to read static variant data from. Content is + /// assumed to be line-oriented but no other claim is made on the file. + static_path: PathBuf, + /// Number of lines to emit in each generated block + lines_per_second: u32, + }, + /// Generates static data grouped by second; each block contains one + /// second's worth of logs as determined by a parsed timestamp prefix. + StaticSecond { + /// Defines the file path to read static variant data from. Content is + /// assumed to be line-oriented. + static_path: PathBuf, + /// Chrono-compatible timestamp format string used to parse the leading + /// timestamp in each line. + timestamp_format: String, + /// Emit a minimal placeholder block (single newline) for seconds with + /// no lines. When false, empty seconds are skipped. + #[serde(default)] + emit_placeholder: bool, + /// Optional starting line offset; lines before this index are skipped. + #[serde(default)] + start_line_index: Option<u64>, + }, /// Generates a line of printable ascii characters Ascii, /// Generates a json encoded line @@ -167,6 +196,10 @@ pub enum Payload { SplunkHec(splunk_hec::SplunkHec), /// Static file content Static(Static), + /// Static file content with a fixed number of lines emitted per block + StaticLinesPerSecond(StaticLinesPerSecond), + /// Static file content grouped into one-second blocks based on timestamps + StaticSecond(StaticSecond), /// Syslog RFC 5424 format Syslog(Syslog5424), /// OpenTelemetry traces @@ -195,6 +228,8 @@ impl Serialize for Payload { Payload::Json(ser) => ser.to_bytes(rng, max_bytes, writer), Payload::SplunkHec(ser) => ser.to_bytes(rng, max_bytes, writer), Payload::Static(ser) => ser.to_bytes(rng, max_bytes, writer), + Payload::StaticLinesPerSecond(ser) => ser.to_bytes(rng, max_bytes, writer), + Payload::StaticSecond(ser) => ser.to_bytes(rng, max_bytes, writer), Payload::Syslog(ser) => ser.to_bytes(rng, max_bytes, writer), Payload::OtelTraces(ser) => ser.to_bytes(rng, max_bytes, writer), Payload::OtelLogs(ser) => ser.to_bytes(rng, max_bytes, writer), @@ -207,6 +242,8 @@ impl Serialize for Payload { fn data_points_generated(&self) -> Option<u64> { match self { Payload::OtelMetrics(ser) => ser.data_points_generated(), + Payload::StaticLinesPerSecond(ser) => ser.data_points_generated(), + Payload::StaticSecond(ser) => ser.data_points_generated(), // Other implementations use the default None _ => None, } diff --git a/lading_payload/src/splunk_hec.rs b/lading_payload/src/splunk_hec.rs index d2d82e7d3..5b14811c8 100644 --- a/lading_payload/src/splunk_hec.rs +++ b/lading_payload/src/splunk_hec.rs @@ -118,7 +118,7 @@ impl Distribution for StandardUniform { } /// Encoding to be used -#[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Deserialize, Serialize, Debug, Clone, Copy, PartialEq, Eq, Default)] #[serde(deny_unknown_fields)] #[serde(rename_all = "snake_case")] #[cfg_attr(test, derive(proptest_derive::Arbitrary))] @@ -127,15 +127,10 @@ pub enum Encoding { /// Use text-encoded log messages Text, /// Use JSON-encoded log messages + #[default] Json, } -impl Default for Encoding { - fn default() -> Self { - Self::Json - } -} - #[derive(Debug, Default, Clone, Copy)] #[cfg_attr(test, derive(proptest_derive::Arbitrary))] /// Splunk's HEC diff --git a/lading_payload/src/statik_line_rate.rs b/lading_payload/src/statik_line_rate.rs new file mode 100644 index 000000000..d5ddc4c10
--- /dev/null +++ b/lading_payload/src/statik_line_rate.rs @@ -0,0 +1,178 @@ +//! Static file payload that replays a limited number of lines per block. + +use std::{ + fs::{self, OpenOptions}, + io::{BufRead, BufReader, Write}, + num::NonZeroU32, + path::Path, +}; + +use rand::{Rng, seq::IndexedMutRandom}; +use tracing::debug; + +#[derive(Debug)] +struct Source { + lines: Vec<Vec<u8>>, + next_idx: usize, +} + +#[derive(Debug)] +/// Static payload that emits a fixed number of lines each time it is asked to +/// serialize. +pub struct StaticLinesPerSecond { + sources: Vec<Source>, + lines_per_block: NonZeroU32, + last_lines_generated: u64, +} + +#[derive(thiserror::Error, Debug)] +/// Errors produced by [`StaticLinesPerSecond`]. +pub enum Error { + /// IO error + #[error(transparent)] + Io(#[from] std::io::Error), + /// No lines were discovered in the provided path + #[error("No lines found in static path")] + NoLines, + /// The provided `lines_per_second` value was zero + #[error("lines_per_second must be greater than zero")] + ZeroLinesPerSecond, +} + +impl StaticLinesPerSecond { + /// Create a new instance of `StaticLinesPerSecond` + /// + /// # Errors + /// + /// See documentation for [`Error`] + pub fn new(path: &Path, lines_per_second: u32) -> Result<Self, Error> { + let lines_per_block = NonZeroU32::new(lines_per_second).ok_or(Error::ZeroLinesPerSecond)?; + + let mut sources = Vec::with_capacity(16); + + let metadata = fs::metadata(path)?; + if metadata.is_file() { + debug!("Static path {} is a file.", path.display()); + let lines = read_lines(path)?; + sources.push(Source { next_idx: 0, lines }); + } else if metadata.is_dir() { + debug!("Static path {} is a directory.", path.display()); + for entry in fs::read_dir(path)? { + let entry = entry?; + let entry_pth = entry.path(); + debug!("Attempting to open {} as file.", entry_pth.display()); + if let Ok(file) = OpenOptions::new().read(true).open(&entry_pth) { + let lines = read_lines_from_reader(file)?; + sources.push(Source { next_idx: 0, lines }); + } + } + } + + if sources.iter().all(|s| s.lines.is_empty()) { + return Err(Error::NoLines); + } + + Ok(Self { + sources, + lines_per_block, + last_lines_generated: 0, + }) + } +} + +impl crate::Serialize for StaticLinesPerSecond { + fn to_bytes<R, W>( + &mut self, + mut rng: R, + max_bytes: usize, + writer: &mut W, + ) -> Result<(), crate::Error> + where + R: Rng + Sized, + W: Write, + { + self.last_lines_generated = 0; + + let Some(source) = self.sources.choose_mut(&mut rng) else { + return Ok(()); + }; + if source.lines.is_empty() { + return Ok(()); + } + + let mut bytes_written = 0usize; + for _ in 0..self.lines_per_block.get() { + let line = &source.lines[source.next_idx % source.lines.len()]; + let needed = line.len() + 1; // newline + if bytes_written + needed > max_bytes { + break; + } + + writer.write_all(line)?; + writer.write_all(b"\n")?; + bytes_written += needed; + self.last_lines_generated += 1; + source.next_idx = (source.next_idx + 1) % source.lines.len(); + } + + Ok(()) + } + + fn data_points_generated(&self) -> Option<u64> { + Some(self.last_lines_generated) + } +} + +fn read_lines(path: &Path) -> Result<Vec<Vec<u8>>, std::io::Error> { + let file = OpenOptions::new().read(true).open(path)?; + read_lines_from_reader(file) +} + +fn read_lines_from_reader<R: std::io::Read>(reader: R) -> Result<Vec<Vec<u8>>, std::io::Error> { + let mut out = Vec::new(); + let mut reader = BufReader::new(reader); + let mut buf = String::new(); + while { + buf.clear(); + reader.read_line(&mut buf)?
+ } != 0 + { + if buf.ends_with('\n') { + buf.pop(); + if buf.ends_with('\r') { + buf.pop(); + } + } + out.push(buf.as_bytes().to_vec()); + } + Ok(out) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Serialize; + use rand::{SeedableRng, rngs::StdRng}; + use std::{env, fs::File, io::Write}; + + #[test] + fn writes_requested_number_of_lines() { + let mut path = env::temp_dir(); + path.push("static_line_rate_test.txt"); + { + let mut f = File::create(&path).unwrap(); + writeln!(f, "alpha").unwrap(); + writeln!(f, "beta").unwrap(); + writeln!(f, "gamma").unwrap(); + } + + let mut serializer = StaticLinesPerSecond::new(&path, 2).unwrap(); + let mut buf = Vec::new(); + let mut rng = StdRng::seed_from_u64(42); + + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + assert_eq!(buf, b"alpha\nbeta\n"); + // Clean up + let _ = std::fs::remove_file(&path); + } +} diff --git a/lading_payload/src/statik_second.rs b/lading_payload/src/statik_second.rs new file mode 100644 index 000000000..1e5bcd6b8 --- /dev/null +++ b/lading_payload/src/statik_second.rs @@ -0,0 +1,386 @@ +//! Static file payload that emits one second of log lines per block, based on +//! parsing a timestamp at the start of each line. The parsed timestamp is +//! stripped from emitted lines; only the message body is replayed. + +use std::{ + fs::File, + io::{BufRead, BufReader, Write}, + path::{Path, PathBuf}, +}; + +use chrono::{NaiveDateTime, TimeZone, Utc}; +use rand::Rng; +use tracing::{debug, info}; + +#[derive(Debug)] +struct BlockLines { + lines: Vec<Vec<u8>>, +} + +#[derive(thiserror::Error, Debug)] +/// Errors produced by [`StaticSecond`]. +pub enum Error { + /// IO error + #[error(transparent)] + Io(#[from] std::io::Error), + /// No lines were discovered in the provided path + #[error("No lines found in static path")] + NoLines, + /// Timestamp parsing failed for a line + #[error("Failed to parse timestamp from line: {0}")] + Timestamp(String), +} + +#[derive(Debug)] +/// Static payload grouped by second boundaries. +pub struct StaticSecond { + path: PathBuf, + timestamp_format: String, + emit_placeholder: bool, + initial_offset: u64, + lines_to_skip_remaining: u64, + offset_consumed: bool, + reader: BufReader<File>, + carry_first_line: Option<(i64, Vec<u8>)>, + pending_gap: u64, + next_block: Option<BlockLines>, + last_lines_generated: u64, +} + +impl StaticSecond { + /// Create a new instance of `StaticSecond` + /// + /// Lines are grouped into blocks by the second of their timestamp. The + /// timestamp is parsed from the start of the line up to the first + /// whitespace, using `timestamp_format` (chrono strftime syntax). The + /// parsed timestamp is removed from the emitted line, leaving only the + /// remainder of the message. `start_line_index`, when provided, skips that + /// many lines (modulo the total number of available lines) before + /// returning payloads. + /// + /// # Errors + /// + /// Returns an error if the file cannot be read, contains no lines, or a + /// timestamp fails to parse. + pub fn new( + path: &Path, + timestamp_format: &str, + emit_placeholder: bool, + start_line_index: Option<u64>, + ) -> Result<Self, Error> { + let file_size_bytes = File::open(path) + .ok() + .and_then(|f| f.metadata().ok()) + .map(|m| m.len()) + .unwrap_or(0); + + // Validation + counting pass; keeps memory usage low while preserving eager error detection.
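+ // First of two passes over the file: parse every timestamp and count lines
+ // so malformed input fails here, at construction; the streaming pass in
+ // `read_next_block` below re-reads the file and groups lines by second.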
+ let mut validation_reader = BufReader::new(File::open(path)?); + let mut total_lines: u64 = 0; + let mut buf = String::new(); + loop { + buf.clear(); + let bytes = validation_reader.read_line(&mut buf)?; + if bytes == 0 { + break; + } + if buf.trim().is_empty() { + continue; + } + Self::parse_line_with_format(&buf, timestamp_format)?; + total_lines += 1; + } + + if total_lines == 0 { + return Err(Error::NoLines); + } + + let initial_offset = start_line_index.unwrap_or(0) % total_lines; + let reader = BufReader::new(File::open(path)?); + + info!( + "StaticSecond streaming from {} ({} bytes, {} total lines, emit_placeholder={}, start_line_index={})", + path.display(), + file_size_bytes, + total_lines, + emit_placeholder, + initial_offset + ); + + let mut this = Self { + path: path.to_path_buf(), + timestamp_format: timestamp_format.to_owned(), + emit_placeholder, + initial_offset, + lines_to_skip_remaining: initial_offset, + offset_consumed: initial_offset == 0, + reader, + carry_first_line: None, + pending_gap: 0, + next_block: None, + last_lines_generated: 0, + }; + + // Preload first block so we can fail fast on empty/invalid content. + this.fill_next_block()?; + if this.next_block.is_none() { + return Err(Error::NoLines); + } + + Ok(this) + } + + fn parse_line_with_format(line: &str, timestamp_format: &str) -> Result<(i64, Vec<u8>), Error> { + let mut parts = line.splitn(2, char::is_whitespace); + let ts_token = parts.next().unwrap_or(""); + let payload = parts + .next() + .unwrap_or("") + .trim_start() + // Strip trailing newlines so we don't double-append in `to_bytes`. + .trim_end_matches(['\r', '\n']) + .as_bytes() + .to_vec(); + let ts = NaiveDateTime::parse_from_str(ts_token, timestamp_format) + .map_err(|_| Error::Timestamp(line.to_string()))?; + let sec = Utc.from_utc_datetime(&ts).timestamp(); + Ok((sec, payload)) + } + + fn reset_reader(&mut self) -> Result<(), Error> { + let file = File::open(&self.path)?; + self.reader = BufReader::new(file); + self.carry_first_line = None; + self.pending_gap = 0; + self.next_block = None; + self.lines_to_skip_remaining = if self.offset_consumed { + 0 + } else { + self.initial_offset + }; + Ok(()) + } + + fn fill_next_block(&mut self) -> Result<(), Error> { + if self.next_block.is_none() { + self.next_block = self.read_next_block()?; + } + Ok(()) + } + + fn read_next_block(&mut self) -> Result<Option<BlockLines>, Error> { + if self.pending_gap > 0 { + self.pending_gap -= 1; + return Ok(Some(BlockLines { lines: Vec::new() })); + } + + // Pre-allocate for typical log density (reduces reallocation during push) + let mut lines: Vec<Vec<u8>> = Vec::with_capacity(256); + let mut sec: Option<i64> = None; + + if let Some((carry_sec, payload)) = self.carry_first_line.take() { + sec = Some(carry_sec); + lines.push(payload); + } + + let mut buf = String::new(); + loop { + buf.clear(); + let bytes = self.reader.read_line(&mut buf)?; + if bytes == 0 { + break; + } + if buf.trim().is_empty() { + continue; + } + + if self.lines_to_skip_remaining > 0 { + self.lines_to_skip_remaining -= 1; + continue; + } + + let (line_sec, payload) = Self::parse_line_with_format(&buf, &self.timestamp_format)?; + + match sec { + None => { + sec = Some(line_sec); + lines.push(payload); + self.offset_consumed = true; + } + Some(s) if s == line_sec => { + lines.push(payload); + } + Some(s) if s < line_sec => { + if self.emit_placeholder && line_sec > s + 1 { + self.pending_gap = (line_sec - s - 1) as u64; + } + self.carry_first_line = Some((line_sec, payload)); + break; + } + Some(s) => { + debug!("Encountered
out-of-order timestamp: current {s}, new {line_sec}"); + self.carry_first_line = Some((line_sec, payload)); + break; + } + } + } + + if sec.is_none() && lines.is_empty() { + return Ok(None); + } + + Ok(Some(BlockLines { lines })) + } +} + +impl crate::Serialize for StaticSecond { + fn to_bytes<R, W>( + &mut self, + _rng: R, + max_bytes: usize, + writer: &mut W, + ) -> Result<(), crate::Error> + where + R: Rng + Sized, + W: Write, + { + self.last_lines_generated = 0; + + self.fill_next_block()?; + if self.next_block.is_none() { + self.reset_reader()?; + self.fill_next_block()?; + } + + let Some(block) = self.next_block.take() else { + return Ok(()); + }; + + let mut bytes_written = 0usize; + if block.lines.is_empty() { + // When requested, emit a minimal placeholder (one newline) for + // empty seconds to preserve timing gaps without breaking the + // non-zero block invariant. + if self.emit_placeholder && max_bytes > 0 { + writer.write_all(b"\n")?; + } + } else { + for line in &block.lines { + let needed = line.len() + 1; // newline + if bytes_written + needed > max_bytes { + break; + } + writer.write_all(line)?; + writer.write_all(b"\n")?; + bytes_written += needed; + self.last_lines_generated += 1; + } + } + Ok(()) + } + + fn data_points_generated(&self) -> Option<u64> { + Some(self.last_lines_generated) + } +} + +impl From<Error> for crate::Error { + fn from(err: Error) -> Self { + match err { + Error::Io(e) => crate::Error::Io(e), + Error::NoLines | Error::Timestamp(_) => crate::Error::Serialize, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Serialize; + use rand::{SeedableRng, rngs::StdRng}; + use std::{fs::File, io::Write}; + use tempfile::tempdir; + + #[test] + fn removes_timestamp_from_output() { + let temp_dir = tempdir().unwrap(); + let path = temp_dir.path().join("static_second_test.log"); + { + let mut f = File::create(&path).unwrap(); + writeln!(f, "2024-01-01T00:00:00 first").unwrap(); + writeln!(f, "2024-01-01T00:00:00 second").unwrap(); + writeln!(f, "2024-01-01T00:00:01 third").unwrap(); + } + + let mut serializer = StaticSecond::new( + &path, + "%Y-%m-%dT%H:%M:%S", + /* emit_placeholder */ false, + None, + ) + .unwrap(); + let mut rng = StdRng::seed_from_u64(7); + let mut buf = Vec::new(); + + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + assert_eq!(buf, b"first\nsecond\n"); + + buf.clear(); + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + assert_eq!(buf, b"third\n"); + } + + #[test] + fn emits_placeholders_for_missing_seconds() { + let temp_dir = tempdir().unwrap(); + let path = temp_dir.path().join("placeholder_test.log"); + { + let mut f = File::create(&path).unwrap(); + writeln!(f, "2024-01-01T00:00:00 first").unwrap(); + // Intentionally skip 00:00:01 + writeln!(f, "2024-01-01T00:00:02 third").unwrap(); + } + + let mut serializer = StaticSecond::new(&path, "%Y-%m-%dT%H:%M:%S", true, None).unwrap(); + let mut rng = StdRng::seed_from_u64(7); + + let mut buf = Vec::new(); + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + assert_eq!(buf, b"first\n"); + + buf.clear(); + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + // Placeholder newline for the missing second + assert_eq!(buf, b"\n"); + + buf.clear(); + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + assert_eq!(buf, b"third\n"); + } + + #[test] + fn honors_start_line_index_with_wraparound() { + let temp_dir = tempdir().unwrap(); + let path = temp_dir.path().join("start_index_test.log"); + { + let mut f = File::create(&path).unwrap(); + // Two lines in the first
second, one in the second second. + writeln!(f, "2024-01-01T00:00:00 first").unwrap(); + writeln!(f, "2024-01-01T00:00:00 second").unwrap(); + writeln!(f, "2024-01-01T00:00:01 third").unwrap(); + } + + // Skip the first two lines; the stream should begin with "third". + let mut serializer = StaticSecond::new(&path, "%Y-%m-%dT%H:%M:%S", false, Some(2)).unwrap(); + let mut rng = StdRng::seed_from_u64(7); + + let mut buf = Vec::new(); + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + assert_eq!(buf, b"third\n"); + + buf.clear(); + serializer.to_bytes(&mut rng, 1024, &mut buf).unwrap(); + // After wrapping, we return to the beginning of the stream. + assert_eq!(buf, b"first\nsecond\n"); + } +}
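End-to-end, the new payload is exercised the same way the unit tests above do it. A minimal sketch of driving `StaticSecond` directly (hypothetical input path and timestamp format; assumes the `Serialize` trait re-export at the crate root used throughout lib.rs):

    use lading_payload::{Serialize, StaticSecond};
    use rand::{SeedableRng, rngs::StdRng};
    use std::path::Path;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Each call to to_bytes emits one second's worth of lines,
        // timestamps stripped, bounded above by max_bytes.
        let mut payload = StaticSecond::new(
            Path::new("/var/log/sample.log"), // hypothetical input file
            "%Y-%m-%dT%H:%M:%S",
            true, // emit_placeholder: keep empty seconds as "\n" blocks
            None, // start_line_index
        )?;
        let mut rng = StdRng::seed_from_u64(0);
        let mut buf = Vec::new();
        payload.to_bytes(&mut rng, 5 * 1024 * 1024, &mut buf)?;
        Ok(())
    }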