diff --git a/Cargo.lock b/Cargo.lock index fbed0f2e..55160271 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -219,6 +219,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "assert_cmd" version = "2.1.2" @@ -717,12 +727,16 @@ dependencies = [ "chrono", "metrics", "metrics-exporter-prometheus", + "reqwest 0.13.2", "schemars 0.8.22", "serde", "serde_json", + "tempfile", "tokio", + "toml 1.0.6+spec-1.1.0", "tracing", "tracing-subscriber", + "wiremock", ] [[package]] @@ -1670,6 +1684,24 @@ dependencies = [ "syn", ] +[[package]] +name = "deadpool" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0be2b1d1d6ec8d846f05e137292d0b89133caf95ef33695424c09568bdd39b1b" +dependencies = [ + "deadpool-runtime", + "lazy_static", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "der" version = "0.7.10" @@ -3522,6 +3554,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -6870,6 +6912,29 @@ version = "0.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904" +[[package]] +name = "wiremock" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08db1edfb05d9b3c1542e521aea074442088292f00b5f28e435c714a98f85031" +dependencies = [ + "assert-json-diff", + "base64", + "deadpool", + "futures", + "http", + "http-body-util", + "hyper", + "hyper-util", + "log", + "once_cell", + "regex", + "serde", + "serde_json", + "tokio", + "url", +] + [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/crates/auths-cli/Cargo.toml b/crates/auths-cli/Cargo.toml index bb900646..bbcec681 100644 --- a/crates/auths-cli/Cargo.toml +++ b/crates/auths-cli/Cargo.toml @@ -39,7 +39,7 @@ auths-index.workspace = true auths-crypto.workspace = true auths-sdk.workspace = true auths-pairing-protocol.workspace = true -auths-telemetry.workspace = true +auths-telemetry = { workspace = true, features = ["sink-http"] } auths-verifier = { workspace = true, features = ["native"] } auths-infra-git.workspace = true auths-infra-http.workspace = true diff --git a/crates/auths-cli/src/factories/mod.rs b/crates/auths-cli/src/factories/mod.rs index 3b4905a7..24d069a8 100644 --- a/crates/auths-cli/src/factories/mod.rs +++ b/crates/auths-cli/src/factories/mod.rs @@ -7,8 +7,12 @@ use std::time::Duration; use anyhow::Result; use auths_core::config::EnvironmentConfig; +use auths_core::paths::auths_home; use auths_core::signing::{CachedPassphraseProvider, PassphraseProvider}; use auths_sdk::ports::agent::AgentSigningPort; +use auths_telemetry::TelemetryShutdown; +use auths_telemetry::config::{build_sinks_from_config, load_audit_config}; +use auths_telemetry::sinks::composite::CompositeSink; use crate::cli::AuthsCli; use crate::config::{CliConfig, OutputFormat}; @@ -64,6 +68,29 @@ pub fn build_config(cli: &AuthsCli) -> Result { }) } +/// Loads audit sinks from `~/.auths/audit.toml` and initialises the global +/// telemetry pipeline. +/// +/// Returns `None` when no sinks are configured — zero overhead in that case. +/// +/// Usage: +/// ```ignore +/// let _telemetry = auths_cli::factories::init_audit_sinks(); +/// ``` +pub fn init_audit_sinks() -> Option { + let audit_path = match auths_home() { + Ok(h) => h.join("audit.toml"), + Err(_) => return None, + }; + let config = load_audit_config(&audit_path); + let sinks = build_sinks_from_config(&config, |name| std::env::var(name).ok()); + if sinks.is_empty() { + return None; + } + let composite = Arc::new(CompositeSink::new(sinks)); + Some(auths_telemetry::init_telemetry_with_sink(composite)) +} + /// Build the platform-appropriate agent signing provider. /// /// Returns `CliAgentAdapter` on Unix, `NoopAgentProvider` elsewhere. diff --git a/crates/auths-cli/src/main.rs b/crates/auths-cli/src/main.rs index 21f449cc..49695839 100644 --- a/crates/auths-cli/src/main.rs +++ b/crates/auths-cli/src/main.rs @@ -11,7 +11,7 @@ use clap::Parser; use auths_cli::cli::{AuthsCli, RootCommand}; use auths_cli::commands::executable::ExecutableCommand; use auths_cli::config::OutputFormat; -use auths_cli::factories::build_config; +use auths_cli::factories::{build_config, init_audit_sinks}; use auths_cli::ux::format::set_json_mode; fn main() { @@ -21,9 +21,24 @@ fn main() { } } +/// Maps auditable commands to their action name. Returns `None` for commands +/// that don't emit audit events. +fn audit_action(command: &RootCommand) -> Option<&'static str> { + match command { + RootCommand::Init(_) => Some("identity_created"), + RootCommand::Pair(_) => Some("device_paired"), + RootCommand::Device(_) => Some("device_command"), + RootCommand::Verify(_) => Some("commit_verified"), + RootCommand::Signers(_) => Some("signers_command"), + _ => None, + } +} + fn run() -> Result<()> { env_logger::init(); + let _telemetry = init_audit_sinks(); + let cli = AuthsCli::parse(); if cli.help_all { @@ -58,7 +73,9 @@ fn run() -> Result<()> { } }; - match command { + let action = audit_action(&command); + + let result = match command { RootCommand::Init(cmd) => cmd.execute(&ctx), RootCommand::Sign(cmd) => cmd.execute(&ctx), RootCommand::Verify(cmd) => cmd.execute(&ctx), @@ -86,5 +103,14 @@ fn run() -> Result<()> { RootCommand::Config(cmd) => cmd.execute(&ctx), RootCommand::Commit(cmd) => cmd.execute(&ctx), RootCommand::Debug(cmd) => cmd.execute(&ctx), + }; + + if let Some(action) = action { + let status = if result.is_ok() { "success" } else { "failed" }; + let now = chrono::Utc::now().timestamp(); + let event = auths_telemetry::build_audit_event("unknown", action, status, now); + auths_telemetry::emit_telemetry(&event); } + + result } diff --git a/crates/auths-telemetry/Cargo.toml b/crates/auths-telemetry/Cargo.toml index 1df7943b..43f6a6d8 100644 --- a/crates/auths-telemetry/Cargo.toml +++ b/crates/auths-telemetry/Cargo.toml @@ -18,17 +18,23 @@ path = "src/lib.rs" chrono = { version = "0.4", features = ["serde"] } metrics = "0.24" metrics-exporter-prometheus = "0.18.1" +reqwest = { version = "0.13.2", features = ["json"], optional = true } schemars.workspace = true serde = { version = "1", features = ["derive"] } serde_json = "1" +tokio = { workspace = true, optional = true } +toml = "1.0" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } [features] test-utils = [] +sink-http = ["dep:reqwest", "dep:tokio"] [dev-dependencies] +tempfile = "3" tokio = { workspace = true, features = ["full"] } +wiremock = "0.6" [lints] workspace = true diff --git a/crates/auths-telemetry/src/config.rs b/crates/auths-telemetry/src/config.rs new file mode 100644 index 00000000..b8f82c7a --- /dev/null +++ b/crates/auths-telemetry/src/config.rs @@ -0,0 +1,234 @@ +//! Audit sink configuration. +//! +//! Customers define audit sinks in a TOML file (typically `~/.auths/audit.toml`). +//! Each sink entry specifies a type, destination, and optional credentials +//! (resolved from environment variables at runtime). + +use std::fs; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use serde::Deserialize; +use tracing::warn; + +use crate::ports::EventSink; +use crate::sinks::stdout::WriterSink; + +/// Top-level audit configuration. +/// +/// Usage: +/// ```ignore +/// let config = load_audit_config(Path::new("/home/user/.auths/audit.toml")); +/// let sinks = build_sinks_from_config(&config, |name| std::env::var(name).ok()); +/// ``` +#[derive(Debug, Deserialize, Default)] +pub struct AuditConfig { + #[serde(default)] + pub sinks: Vec, +} + +/// Authentication scheme for HTTP sinks. +#[derive(Debug, Deserialize, Clone, Default)] +#[serde(rename_all = "snake_case")] +pub enum AuthScheme { + /// `Authorization: Splunk ` + Splunk, + /// `Authorization: Bearer ` + #[default] + Bearer, + /// Custom header name (e.g. `DD-API-KEY`) + ApiKeyHeader { header: String }, +} + +/// Payload format for HTTP sinks (matches `sinks::http::PayloadFormat`). +#[derive(Debug, Deserialize, Clone)] +#[serde(rename_all = "snake_case")] +pub enum ConfigPayloadFormat { + SplunkHec, + DatadogLogs, + NdJson, +} + +/// Individual sink configuration entry. +#[derive(Debug, Deserialize)] +#[serde(tag = "type")] +pub enum SinkConfig { + #[serde(rename = "http")] + Http { + url: String, + token_env: String, + #[serde(default)] + auth_scheme: AuthScheme, + payload_format: ConfigPayloadFormat, + #[serde(default = "default_batch_size")] + batch_size: usize, + #[serde(default = "default_flush_interval")] + flush_interval_ms: u64, + }, + #[serde(rename = "file")] + File { path: PathBuf }, + #[serde(rename = "stdout")] + Stdout, +} + +fn default_batch_size() -> usize { + 10 +} + +fn default_flush_interval() -> u64 { + 5000 +} + +/// Load audit config from a TOML file. +/// +/// Returns an empty config if the file does not exist. Logs a warning and +/// returns empty config if the file is malformed. +/// +/// Args: +/// * `path`: Path to the audit TOML config file. +/// +/// Usage: +/// ```ignore +/// let config = load_audit_config(Path::new("/home/user/.auths/audit.toml")); +/// ``` +pub fn load_audit_config(path: &Path) -> AuditConfig { + let content = match fs::read_to_string(path) { + Ok(c) => c, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => return AuditConfig::default(), + Err(e) => { + warn!("could not read audit config {}: {e}", path.display()); + return AuditConfig::default(); + } + }; + + match toml::from_str(&content) { + Ok(config) => config, + Err(e) => { + warn!("invalid audit config {}: {e}", path.display()); + AuditConfig::default() + } + } +} + +/// Build concrete [`EventSink`] instances from parsed config. +/// +/// The `resolve_env` closure resolves environment variable names to values. +/// Skips sinks whose env vars are missing (with a warning). Returns sinks +/// ready for [`CompositeSink`](crate::sinks::composite::CompositeSink). +/// +/// Args: +/// * `config`: Parsed audit configuration. +/// * `resolve_env`: Closure that resolves env var names to values. +/// +/// Usage: +/// ```ignore +/// let sinks = build_sinks_from_config(&config, |name| std::env::var(name).ok()); +/// let composite = CompositeSink::new(sinks); +/// ``` +pub fn build_sinks_from_config( + config: &AuditConfig, + resolve_env: impl Fn(&str) -> Option, +) -> Vec> { + let mut sinks: Vec> = Vec::new(); + + for sink_config in &config.sinks { + match sink_config { + SinkConfig::Http { .. } => { + build_http_sink(&mut sinks, sink_config, &resolve_env); + } + SinkConfig::File { path } => { + build_file_sink(&mut sinks, path); + } + SinkConfig::Stdout => { + sinks.push(Arc::new(crate::sinks::stdout::new_stdout_sink())); + } + } + } + + sinks +} + +#[cfg(feature = "sink-http")] +fn build_http_sink( + sinks: &mut Vec>, + sink_config: &SinkConfig, + resolve_env: &dyn Fn(&str) -> Option, +) { + use std::collections::HashMap; + + use crate::sinks::http::{HttpSink, HttpSinkConfig, PayloadFormat}; + + let SinkConfig::Http { + url, + token_env, + auth_scheme, + payload_format, + batch_size, + flush_interval_ms, + } = sink_config + else { + return; + }; + + let Some(token) = resolve_env(token_env) else { + warn!("skipping audit sink: env var '{token_env}' not set"); + return; + }; + + let mut headers = HashMap::new(); + match auth_scheme { + AuthScheme::Splunk => { + headers.insert("Authorization".to_string(), format!("Splunk {token}")); + } + AuthScheme::Bearer => { + headers.insert("Authorization".to_string(), format!("Bearer {token}")); + } + AuthScheme::ApiKeyHeader { header } => { + headers.insert(header.clone(), token); + } + } + + let format = match payload_format { + ConfigPayloadFormat::SplunkHec => PayloadFormat::SplunkHec, + ConfigPayloadFormat::DatadogLogs => PayloadFormat::DatadogLogs, + ConfigPayloadFormat::NdJson => PayloadFormat::NdJson, + }; + + let config = HttpSinkConfig { + url: url.to_string(), + headers, + batch_size: *batch_size, + flush_interval_ms: *flush_interval_ms, + timeout_ms: 2000, + payload_format: format, + }; + + sinks.push(Arc::new(HttpSink::new(config))); +} + +#[cfg(not(feature = "sink-http"))] +fn build_http_sink( + _sinks: &mut Vec>, + _sink_config: &SinkConfig, + _resolve_env: &dyn Fn(&str) -> Option, +) { + warn!("HTTP audit sinks require the 'sink-http' feature; skipping"); +} + +fn build_file_sink(sinks: &mut Vec>, path: &Path) { + if let Some(parent) = path.parent() + && let Err(e) = fs::create_dir_all(parent) + { + warn!("could not create directory {}: {e}", parent.display()); + return; + } + + match fs::OpenOptions::new().create(true).append(true).open(path) { + Ok(file) => { + sinks.push(Arc::new(WriterSink::new(file))); + } + Err(e) => { + warn!("could not open audit log {}: {e}", path.display()); + } + } +} diff --git a/crates/auths-telemetry/src/lib.rs b/crates/auths-telemetry/src/lib.rs index c6956dd7..e8b8d715 100644 --- a/crates/auths-telemetry/src/lib.rs +++ b/crates/auths-telemetry/src/lib.rs @@ -4,6 +4,7 @@ //! deterministic telemetry standard consumed by auths-auth-server, //! auths-registry-server, and auths-chat-server. +pub mod config; pub mod emitter; pub mod event; pub mod logging; diff --git a/crates/auths-telemetry/src/sinks/composite.rs b/crates/auths-telemetry/src/sinks/composite.rs new file mode 100644 index 00000000..ad113ce3 --- /dev/null +++ b/crates/auths-telemetry/src/sinks/composite.rs @@ -0,0 +1,90 @@ +//! Composite sink that fans out events to multiple child sinks. + +use std::sync::Arc; + +use crate::ports::EventSink; + +/// Routes events to multiple [`EventSink`] children simultaneously. +/// +/// Each child receives every emitted payload independently. A panic or failure +/// in one child does not prevent delivery to the remaining children. +/// +/// Args: +/// * `sinks`: The child sinks to fan out to. +/// +/// Usage: +/// ```ignore +/// let composite = CompositeSink::new(vec![sink_a, sink_b]); +/// composite.emit(r#"{"action":"sign"}"#); +/// ``` +pub struct CompositeSink { + sinks: Vec>, +} + +impl CompositeSink { + /// Create a sink that fans out to the given children. + pub fn new(sinks: Vec>) -> Self { + Self { sinks } + } + + /// Create a sink with no children (noop). + pub fn empty() -> Self { + Self { sinks: Vec::new() } + } +} + +impl EventSink for CompositeSink { + fn emit(&self, payload: &str) { + for sink in &self.sinks { + // Isolate panics so one broken child cannot kill siblings + let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + sink.emit(payload); + })); + } + } + + fn flush(&self) { + for sink in &self.sinks { + let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + sink.flush(); + })); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::testing::MemoryEventSink; + + /// Harness that wraps a CompositeSink with a single MemoryEventSink child, + /// exposing `drain()` so the `event_sink_contract_tests!` macro works. + struct ContractHarness { + composite: CompositeSink, + inner: Arc, + } + + impl ContractHarness { + fn new() -> Self { + let inner = Arc::new(MemoryEventSink::new()); + let composite = CompositeSink::new(vec![inner.clone()]); + Self { composite, inner } + } + + fn drain(&self) -> Vec { + self.inner.drain() + } + } + + impl EventSink for ContractHarness { + fn emit(&self, payload: &str) { + self.composite.emit(payload); + } + + fn flush(&self) { + self.composite.flush(); + } + } + + crate::event_sink_contract_tests!(composite_contract, ContractHarness::new()); +} diff --git a/crates/auths-telemetry/src/sinks/http.rs b/crates/auths-telemetry/src/sinks/http.rs new file mode 100644 index 00000000..aea61d81 --- /dev/null +++ b/crates/auths-telemetry/src/sinks/http.rs @@ -0,0 +1,236 @@ +//! Generic HTTP event sink for enterprise SIEM integration. +//! +//! Sends batched audit events to customer-configured HTTP endpoints. +//! Supports Splunk HEC, Datadog Logs, and generic NDJSON formats. +//! Non-blocking: `emit()` pushes to a bounded channel; a background thread +//! batches and POSTs. + +use std::collections::HashMap; +use std::sync::atomic::Ordering; +use std::sync::mpsc as std_mpsc; +use std::time::Duration; + +use crate::emitter::DROPPED_AUDIT_EVENTS; +use crate::ports::EventSink; + +/// Payload serialization format for the HTTP endpoint. +#[derive(Debug, Clone)] +pub enum PayloadFormat { + /// Splunk HTTP Event Collector: concatenated JSON objects. + /// `{"event":"..."}{"event":"..."}` + SplunkHec, + /// Datadog Logs API: JSON array. + /// `[{"message":"..."}, {"message":"..."}]` + DatadogLogs, + /// Newline-delimited JSON (generic). + /// `{"event":"..."}\n{"event":"..."}\n` + NdJson, +} + +/// Configuration for an [`HttpSink`]. +#[derive(Debug, Clone)] +pub struct HttpSinkConfig { + pub url: String, + pub headers: HashMap, + pub batch_size: usize, + pub flush_interval_ms: u64, + pub timeout_ms: u64, + pub payload_format: PayloadFormat, +} + +impl Default for HttpSinkConfig { + fn default() -> Self { + Self { + url: String::new(), + headers: HashMap::new(), + batch_size: 10, + flush_interval_ms: 5000, + timeout_ms: 2000, + payload_format: PayloadFormat::NdJson, + } + } +} + +enum WorkerMsg { + Event(String), + Flush(std_mpsc::SyncSender<()>), + Shutdown, +} + +/// Generic HTTP POST event sink. +/// +/// Events are queued via a bounded channel and delivered by a background thread. +/// Best-effort delivery — HTTP failures are silently dropped. +/// +/// Args: +/// * `config`: Sink configuration (URL, headers, format, batching). +/// +/// Usage: +/// ```ignore +/// let config = HttpSinkConfig { url: "https://splunk.corp:8088/services/collector/event".into(), ..Default::default() }; +/// let sink = HttpSink::new(config); +/// sink.emit(r#"{"action":"sign"}"#); +/// sink.flush(); +/// ``` +pub struct HttpSink { + tx: tokio::sync::mpsc::Sender, + worker_handle: std::sync::Mutex>>, +} + +impl HttpSink { + /// Create a new HTTP sink with the given config. Spawns a background worker thread. + pub fn new(config: HttpSinkConfig) -> Self { + let (tx, rx) = tokio::sync::mpsc::channel::(256); + + let worker_handle = std::thread::Builder::new() + .name("auths-http-sink".into()) + .spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + if let Ok(rt) = rt { + rt.block_on(worker_loop(config, rx)); + } + }); + + Self { + tx, + worker_handle: std::sync::Mutex::new(worker_handle.ok()), + } + } +} + +impl EventSink for HttpSink { + fn emit(&self, payload: &str) { + if self + .tx + .try_send(WorkerMsg::Event(payload.to_string())) + .is_err() + { + DROPPED_AUDIT_EVENTS.fetch_add(1, Ordering::Relaxed); + } + } + + fn flush(&self) { + let (ack_tx, ack_rx) = std_mpsc::sync_channel(0); + if self.tx.try_send(WorkerMsg::Flush(ack_tx)).is_ok() { + let _ = ack_rx.recv_timeout(Duration::from_secs(2)); + } + } +} + +impl Drop for HttpSink { + fn drop(&mut self) { + let _ = self.tx.try_send(WorkerMsg::Shutdown); + if let Ok(mut guard) = self.worker_handle.lock() + && let Some(handle) = guard.take() + { + let _ = handle.join(); + } + } +} + +async fn worker_loop(config: HttpSinkConfig, mut rx: tokio::sync::mpsc::Receiver) { + let client = reqwest::Client::builder() + .timeout(Duration::from_millis(config.timeout_ms)) + .connect_timeout(Duration::from_secs(5)) + .user_agent("auths-telemetry/0.1") + .build(); + + let Ok(client) = client else { return }; + + let mut buffer: Vec = Vec::with_capacity(config.batch_size); + let flush_interval = Duration::from_millis(config.flush_interval_ms); + let mut timer = tokio::time::interval(flush_interval); + timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + // Skip the immediate first tick + timer.tick().await; + + loop { + tokio::select! { + msg = rx.recv() => { + match msg { + Some(WorkerMsg::Event(payload)) => { + buffer.push(payload); + if buffer.len() >= config.batch_size { + send_batch(&client, &config, &mut buffer).await; + } + } + Some(WorkerMsg::Flush(ack)) => { + if !buffer.is_empty() { + send_batch(&client, &config, &mut buffer).await; + } + let _ = ack.send(()); + } + Some(WorkerMsg::Shutdown) | None => { + if !buffer.is_empty() { + send_batch(&client, &config, &mut buffer).await; + } + break; + } + } + } + _ = timer.tick() => { + if !buffer.is_empty() { + send_batch(&client, &config, &mut buffer).await; + } + } + } + } +} + +async fn send_batch(client: &reqwest::Client, config: &HttpSinkConfig, buffer: &mut Vec) { + let body = format_batch(&config.payload_format, buffer); + buffer.clear(); + + let content_type = match config.payload_format { + PayloadFormat::NdJson => "application/x-ndjson", + PayloadFormat::SplunkHec | PayloadFormat::DatadogLogs => "application/json", + }; + + let mut req = client + .post(&config.url) + .header("Content-Type", content_type) + .body(body); + + for (key, value) in &config.headers { + req = req.header(key.as_str(), value.as_str()); + } + + // Best-effort: silently drop HTTP errors + let _ = req.send().await; +} + +/// Serialize a batch of JSON payloads into the format-specific HTTP body. +/// +/// Args: +/// * `format`: The target payload format. +/// * `events`: Raw JSON payload strings from `emit()`. +pub fn format_batch(format: &PayloadFormat, events: &[String]) -> String { + match format { + PayloadFormat::SplunkHec => { + let mut body = String::new(); + for event in events { + body.push_str(r#"{"event":"#); + body.push_str(event); + body.push_str(r#","source":"auths","sourcetype":"auths:audit"}"#); + } + body + } + PayloadFormat::DatadogLogs => { + let entries: Vec = events + .iter() + .map(|e| format!(r#"{{"message":{e},"ddsource":"auths","service":"auths"}}"#)) + .collect(); + format!("[{}]", entries.join(",")) + } + PayloadFormat::NdJson => { + let mut body = String::new(); + for event in events { + body.push_str(event); + body.push('\n'); + } + body + } + } +} diff --git a/crates/auths-telemetry/src/sinks/mod.rs b/crates/auths-telemetry/src/sinks/mod.rs index 35499af1..d198a29b 100644 --- a/crates/auths-telemetry/src/sinks/mod.rs +++ b/crates/auths-telemetry/src/sinks/mod.rs @@ -1,3 +1,6 @@ //! Telemetry sink implementations. +pub mod composite; +#[cfg(feature = "sink-http")] +pub mod http; pub mod stdout; diff --git a/crates/auths-telemetry/tests/cases/composite.rs b/crates/auths-telemetry/tests/cases/composite.rs new file mode 100644 index 00000000..dbcaa574 --- /dev/null +++ b/crates/auths-telemetry/tests/cases/composite.rs @@ -0,0 +1,96 @@ +use std::sync::{Arc, Mutex}; + +use auths_telemetry::EventSink; +use auths_telemetry::sinks::composite::CompositeSink; +use auths_telemetry::sinks::stdout::WriterSink; + +/// Shared buffer for capturing sink output without stdout. +struct SharedBuf(Arc>>); + +impl std::io::Write for SharedBuf { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.lock().unwrap().extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +fn make_capturing_sink() -> (Arc>>, WriterSink) { + let buf = Arc::new(Mutex::new(Vec::new())); + let sink = WriterSink::new(SharedBuf(Arc::clone(&buf))); + (buf, sink) +} + +fn captured_lines(buf: &Arc>>) -> Vec { + let bytes = buf.lock().unwrap().clone(); + String::from_utf8(bytes) + .unwrap() + .lines() + .map(String::from) + .collect() +} + +#[test] +fn fan_out_delivers_to_all_children() { + let (buf_a, sink_a) = make_capturing_sink(); + let (buf_b, sink_b) = make_capturing_sink(); + let composite = CompositeSink::new(vec![Arc::new(sink_a), Arc::new(sink_b)]); + + composite.emit("event-1"); + composite.emit("event-2"); + + assert_eq!(captured_lines(&buf_a), vec!["event-1", "event-2"]); + assert_eq!(captured_lines(&buf_b), vec!["event-1", "event-2"]); +} + +#[test] +fn empty_composite_does_not_panic() { + let composite = CompositeSink::empty(); + composite.emit("payload"); + composite.flush(); +} + +#[test] +fn panicking_child_does_not_block_siblings() { + struct PanicSink; + impl EventSink for PanicSink { + fn emit(&self, _payload: &str) { + panic!("intentional test panic"); + } + fn flush(&self) {} + } + + let (buf, good_sink) = make_capturing_sink(); + let composite = CompositeSink::new(vec![ + Arc::new(PanicSink) as Arc, + Arc::new(good_sink), + ]); + + composite.emit("should-survive"); + assert_eq!(captured_lines(&buf), vec!["should-survive"]); +} + +#[test] +fn flush_fans_out_to_all_children() { + let (buf_a, sink_a) = make_capturing_sink(); + let (buf_b, sink_b) = make_capturing_sink(); + let composite = CompositeSink::new(vec![Arc::new(sink_a), Arc::new(sink_b)]); + + composite.emit("before-flush"); + composite.flush(); + + assert_eq!(captured_lines(&buf_a), vec!["before-flush"]); + assert_eq!(captured_lines(&buf_b), vec!["before-flush"]); +} + +#[test] +fn single_child_composite_forwards_correctly() { + let (buf, sink) = make_capturing_sink(); + let composite = CompositeSink::new(vec![Arc::new(sink)]); + + composite.emit("solo"); + assert_eq!(captured_lines(&buf), vec!["solo"]); +} diff --git a/crates/auths-telemetry/tests/cases/config.rs b/crates/auths-telemetry/tests/cases/config.rs new file mode 100644 index 00000000..b95d0739 --- /dev/null +++ b/crates/auths-telemetry/tests/cases/config.rs @@ -0,0 +1,168 @@ +use std::path::Path; + +use auths_telemetry::config::{AuditConfig, load_audit_config}; + +#[test] +fn missing_file_returns_empty_config() { + let config = load_audit_config(Path::new("/nonexistent/audit.toml")); + assert!(config.sinks.is_empty()); +} + +#[test] +fn empty_file_returns_empty_config() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("audit.toml"); + std::fs::write(&path, "").unwrap(); + + let config = load_audit_config(&path); + assert!(config.sinks.is_empty()); +} + +#[test] +fn malformed_toml_returns_empty_config() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("audit.toml"); + std::fs::write(&path, "this is not valid toml [[[").unwrap(); + + let config = load_audit_config(&path); + assert!(config.sinks.is_empty()); +} + +#[test] +fn parses_stdout_sink() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("audit.toml"); + std::fs::write( + &path, + r#" +[[sinks]] +type = "stdout" +"#, + ) + .unwrap(); + + let config = load_audit_config(&path); + assert_eq!(config.sinks.len(), 1); +} + +#[test] +fn parses_file_sink() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("audit.toml"); + std::fs::write( + &path, + r#" +[[sinks]] +type = "file" +path = "/tmp/auths-audit.jsonl" +"#, + ) + .unwrap(); + + let config = load_audit_config(&path); + assert_eq!(config.sinks.len(), 1); +} + +#[test] +fn parses_http_sink_with_defaults() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("audit.toml"); + std::fs::write( + &path, + r#" +[[sinks]] +type = "http" +url = "https://splunk.corp:8088/services/collector/event" +token_env = "SPLUNK_HEC_TOKEN" +payload_format = "splunk_hec" +auth_scheme = "splunk" +"#, + ) + .unwrap(); + + let config = load_audit_config(&path); + assert_eq!(config.sinks.len(), 1); +} + +#[test] +fn parses_multiple_sinks() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("audit.toml"); + std::fs::write( + &path, + r#" +[[sinks]] +type = "http" +url = "https://splunk.corp:8088/services/collector/event" +token_env = "SPLUNK_HEC_TOKEN" +payload_format = "splunk_hec" + +[[sinks]] +type = "file" +path = "/var/log/auths/audit.jsonl" + +[[sinks]] +type = "stdout" +"#, + ) + .unwrap(); + + let config = load_audit_config(&path); + assert_eq!(config.sinks.len(), 3); +} + +#[test] +fn empty_sinks_array_is_valid() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("audit.toml"); + std::fs::write(&path, "sinks = []").unwrap(); + + let config = load_audit_config(&path); + assert!(config.sinks.is_empty()); +} + +#[test] +fn build_stdout_sink() { + let config = AuditConfig { + sinks: vec![auths_telemetry::config::SinkConfig::Stdout], + }; + let sinks = auths_telemetry::config::build_sinks_from_config(&config, |_| None); + assert_eq!(sinks.len(), 1); +} + +#[test] +fn build_file_sink_creates_parent_dirs() { + let dir = tempfile::tempdir().unwrap(); + let log_path = dir.path().join("subdir").join("audit.jsonl"); + + let config = AuditConfig { + sinks: vec![auths_telemetry::config::SinkConfig::File { + path: log_path.clone(), + }], + }; + let sinks = auths_telemetry::config::build_sinks_from_config(&config, |_| None); + assert_eq!(sinks.len(), 1); + assert!(log_path.parent().unwrap().exists()); +} + +#[test] +fn build_skips_http_sink_when_env_var_missing() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("audit.toml"); + std::fs::write( + &path, + r#" +[[sinks]] +type = "http" +url = "https://example.com/events" +token_env = "SPLUNK_TOKEN" +payload_format = "ndjson" +"#, + ) + .unwrap(); + + let config = load_audit_config(&path); + // Resolver always returns None — simulates missing env var + let sinks = auths_telemetry::config::build_sinks_from_config(&config, |_| None); + assert!(sinks.is_empty(), "should skip sink when env var is missing"); +} diff --git a/crates/auths-telemetry/tests/cases/http_sink.rs b/crates/auths-telemetry/tests/cases/http_sink.rs new file mode 100644 index 00000000..6546f7d0 --- /dev/null +++ b/crates/auths-telemetry/tests/cases/http_sink.rs @@ -0,0 +1,202 @@ +use std::collections::HashMap; +use std::time::Duration; + +use auths_telemetry::EventSink; +use auths_telemetry::sinks::http::{HttpSink, HttpSinkConfig, PayloadFormat, format_batch}; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +fn config_for(server: &MockServer, format: PayloadFormat) -> HttpSinkConfig { + HttpSinkConfig { + url: format!("{}/events", server.uri()), + headers: HashMap::new(), + batch_size: 10, + flush_interval_ms: 60000, + timeout_ms: 2000, + payload_format: format, + } +} + +#[tokio::test] +async fn ndjson_format_delivers_events() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/events")) + .respond_with(ResponseTemplate::new(200)) + .expect(1) + .mount(&server) + .await; + + let mut config = config_for(&server, PayloadFormat::NdJson); + config.batch_size = 2; + + let sink = HttpSink::new(config); + sink.emit(r#"{"action":"sign"}"#); + sink.emit(r#"{"action":"verify"}"#); + + // Give the worker time to flush the batch + tokio::time::sleep(Duration::from_millis(200)).await; + sink.flush(); + drop(sink); + + // wiremock verifies expect(1) on drop +} + +#[tokio::test] +async fn splunk_hec_format_sends_concatenated_json() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/events")) + .respond_with(ResponseTemplate::new(200)) + .expect(1) + .mount(&server) + .await; + + let mut config = config_for(&server, PayloadFormat::SplunkHec); + config.batch_size = 2; + config + .headers + .insert("Authorization".to_string(), "Splunk test-token".to_string()); + + let sink = HttpSink::new(config); + sink.emit(r#"{"action":"sign"}"#); + sink.emit(r#"{"action":"verify"}"#); + + tokio::time::sleep(Duration::from_millis(200)).await; + sink.flush(); + drop(sink); +} + +#[tokio::test] +async fn datadog_format_sends_json_array() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/events")) + .respond_with(ResponseTemplate::new(200)) + .expect(1) + .mount(&server) + .await; + + let mut config = config_for(&server, PayloadFormat::DatadogLogs); + config.batch_size = 2; + config + .headers + .insert("DD-API-KEY".to_string(), "test-key".to_string()); + + let sink = HttpSink::new(config); + sink.emit(r#"{"action":"sign"}"#); + sink.emit(r#"{"action":"verify"}"#); + + tokio::time::sleep(Duration::from_millis(200)).await; + sink.flush(); + drop(sink); +} + +#[tokio::test] +async fn flush_delivers_partial_batch() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/events")) + .respond_with(ResponseTemplate::new(200)) + .expect(1) + .mount(&server) + .await; + + let config = config_for(&server, PayloadFormat::NdJson); + // batch_size is 10, but we only emit 1 event — flush should still deliver + let sink = HttpSink::new(config); + sink.emit(r#"{"action":"sign"}"#); + sink.flush(); + drop(sink); +} + +#[tokio::test] +async fn http_error_does_not_panic() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/events")) + .respond_with(ResponseTemplate::new(500)) + .mount(&server) + .await; + + let mut config = config_for(&server, PayloadFormat::NdJson); + config.batch_size = 1; + + let sink = HttpSink::new(config); + sink.emit(r#"{"action":"sign"}"#); + tokio::time::sleep(Duration::from_millis(200)).await; + sink.emit(r#"{"action":"verify"}"#); + sink.flush(); + drop(sink); + // No panic = success +} + +#[tokio::test] +async fn drop_shuts_down_cleanly() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/events")) + .respond_with(ResponseTemplate::new(200)) + .mount(&server) + .await; + + let config = config_for(&server, PayloadFormat::NdJson); + let sink = HttpSink::new(config); + sink.emit(r#"{"action":"sign"}"#); + drop(sink); + // Clean shutdown = no hang, no panic +} + +#[test] +fn format_batch_splunk_hec_concatenates_objects() { + let events = vec![ + r#"{"action":"sign"}"#.to_string(), + r#"{"action":"verify"}"#.to_string(), + ]; + let body = format_batch(&PayloadFormat::SplunkHec, &events); + assert!( + !body.starts_with('['), + "Splunk HEC must NOT be a JSON array" + ); + assert!( + body.contains(r#"{"event":{"action":"sign"}"#), + "body: {body}" + ); + assert!( + body.contains(r#"{"event":{"action":"verify"}"#), + "body: {body}" + ); +} + +#[test] +fn format_batch_datadog_produces_json_array() { + let events = vec![ + r#"{"action":"sign"}"#.to_string(), + r#"{"action":"verify"}"#.to_string(), + ]; + let body = format_batch(&PayloadFormat::DatadogLogs, &events); + assert!(body.starts_with('['), "Datadog must be a JSON array"); + assert!(body.ends_with(']'), "Datadog must end with ]"); + assert!(body.contains(r#""ddsource":"auths""#), "body: {body}"); +} + +#[test] +fn format_batch_ndjson_uses_newlines() { + let events = vec![ + r#"{"action":"sign"}"#.to_string(), + r#"{"action":"verify"}"#.to_string(), + ]; + let body = format_batch(&PayloadFormat::NdJson, &events); + let lines: Vec<&str> = body.trim_end().split('\n').collect(); + assert_eq!(lines.len(), 2); + assert_eq!(lines[0], r#"{"action":"sign"}"#); + assert_eq!(lines[1], r#"{"action":"verify"}"#); +} + +#[test] +fn format_batch_empty_events_produces_empty_output() { + let events: Vec = vec![]; + assert_eq!(format_batch(&PayloadFormat::NdJson, &events), ""); + assert_eq!(format_batch(&PayloadFormat::SplunkHec, &events), ""); + assert_eq!(format_batch(&PayloadFormat::DatadogLogs, &events), "[]"); +} diff --git a/crates/auths-telemetry/tests/cases/mod.rs b/crates/auths-telemetry/tests/cases/mod.rs index 473c63d7..21a7eb37 100644 --- a/crates/auths-telemetry/tests/cases/mod.rs +++ b/crates/auths-telemetry/tests/cases/mod.rs @@ -1,2 +1,6 @@ +pub mod composite; +pub mod config; pub mod emitter; +#[cfg(feature = "sink-http")] +pub mod http_sink; pub mod schema; diff --git a/deny.toml b/deny.toml index f0329142..08fa264e 100644 --- a/deny.toml +++ b/deny.toml @@ -30,6 +30,7 @@ deny = [ "auths-infra-http", "auths-cli", "auths-mcp-server", + "auths-telemetry", "auths-id", "xtask", "jsonschema", diff --git a/packages/auths-python/Cargo.lock b/packages/auths-python/Cargo.lock index 54d1a326..956256d4 100644 --- a/packages/auths-python/Cargo.lock +++ b/packages/auths-python/Cargo.lock @@ -361,6 +361,7 @@ dependencies = [ "schemars", "serde", "serde_json", + "toml", "tracing", "tracing-subscriber", ]