From 3dffb908ceb179a2573ffaa322bc704ddb5baf0f Mon Sep 17 00:00:00 2001 From: Max Malkin Date: Sun, 1 Mar 2026 18:49:10 -0700 Subject: [PATCH 01/12] add workspace dependencies for audit archiver --- Cargo.toml | 11 +++++++++++ services/audit-archiver/Cargo.toml | 23 +++++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index d42a03a..d59e7e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,6 +108,17 @@ proptest = "1.4" schemars = "0.8" jsonschema = "0.17" +# Arrow / Parquet (audit archiver) +arrow = { version = "53", default-features = false, features = ["chrono-tz"] } +parquet = { version = "53", default-features = false, features = ["arrow", "zstd"] } + +# AWS SDK (S3 cold storage) +aws-sdk-s3 = "1" +aws-config = { version = "1", features = ["behavior-version-latest"] } + +# Bytes +bytes = "1" + [workspace.lints.rust] unsafe_code = "forbid" missing_docs = "warn" diff --git a/services/audit-archiver/Cargo.toml b/services/audit-archiver/Cargo.toml index e76a4b1..49dd456 100644 --- a/services/audit-archiver/Cargo.toml +++ b/services/audit-archiver/Cargo.toml @@ -33,14 +33,37 @@ chrono = { workspace = true } # Error handling anyhow = { workspace = true } +thiserror = { workspace = true } + +# Async traits +async-trait = { workspace = true } # Tracing tracing = { workspace = true } tracing-subscriber = { workspace = true } +# Metrics +metrics = { workspace = true } +metrics-exporter-prometheus = { workspace = true } + # Configuration config = { workspace = true } dotenvy = { workspace = true } +# Arrow / Parquet +arrow = { workspace = true } +parquet = { workspace = true } + +# AWS SDK (S3) +aws-sdk-s3 = { workspace = true } +aws-config = { workspace = true } + +# Bytes +bytes = { workspace = true } + +[dev-dependencies] +tempfile = "3" +tokio-test = { workspace = true } + [lints] workspace = true From 82451cb326da80af33b435886083ab32861619a9 Mon Sep 17 00:00:00 2001 From: Max Malkin Date: Sun, 1 Mar 2026 18:49:13 -0700 Subject: 
[PATCH 02/12] add audit archiver error types --- services/audit-archiver/src/error.rs | 32 ++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 services/audit-archiver/src/error.rs diff --git a/services/audit-archiver/src/error.rs b/services/audit-archiver/src/error.rs new file mode 100644 index 0000000..14a9d26 --- /dev/null +++ b/services/audit-archiver/src/error.rs @@ -0,0 +1,32 @@ +//! Error types for the audit archiver service. + +/// Errors that can occur during audit archival operations. +#[derive(Debug, thiserror::Error)] +pub enum ArchiverError { + /// Database error during partition management or data export. + #[error("database error: {0}")] + Database(#[from] sqlx::Error), + + /// Failed to serialize audit events to Parquet format. + #[error("parquet serialization error: {0}")] + Parquet(#[from] parquet::errors::ParquetError), + + /// Failed to build Arrow record batch from audit rows. + #[error("arrow error: {0}")] + Arrow(#[from] arrow::error::ArrowError), + + /// Cold storage upload or verification failed. + #[error("storage error: {0}")] + Storage(String), + + /// Configuration is invalid or missing required fields. + #[error("configuration error: {0}")] + Config(String), + + /// The generated partition name failed validation. + #[error("invalid partition name: {0}")] + InvalidPartitionName(String), +} + +/// Result alias for archiver operations. 
+pub type Result<T> = std::result::Result<T, ArchiverError>; From db980f3459cea7bf7a2a9ffea9da82ec3ccb70a4 Mon Sep 17 00:00:00 2001 From: Max Malkin Date: Sun, 1 Mar 2026 18:49:14 -0700 Subject: [PATCH 03/12] add audit archiver configuration --- services/audit-archiver/src/config.rs | 156 ++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 services/audit-archiver/src/config.rs diff --git a/services/audit-archiver/src/config.rs b/services/audit-archiver/src/config.rs new file mode 100644 index 0000000..7beeb7f --- /dev/null +++ b/services/audit-archiver/src/config.rs @@ -0,0 +1,156 @@ +//! Configuration for the audit archiver service. + +use serde::Deserialize; + +/// Top-level archiver configuration, loaded from environment variables +/// with the `ARCHIVER__` prefix. +#[derive(Debug, Clone, Deserialize)] +pub struct ArchiverConfig { + /// Database connection settings. + pub database: DatabaseConfig, + /// Cold storage settings. + pub storage: StorageConfig, + /// Partition retention policy. + #[serde(default)] + pub retention: RetentionConfig, + /// Observability settings. + #[serde(default)] + pub observability: ObservabilityConfig, +} + +/// Database configuration for the archiver. +/// The archiver only needs primary access (for DDL and reads before drop). +#[derive(Debug, Clone, Deserialize)] +pub struct DatabaseConfig { + /// PostgreSQL primary connection URL. + pub url: String, + /// Maximum number of connections in the pool. + #[serde(default = "default_max_connections")] + pub max_connections: u32, + /// Connection timeout in seconds. + #[serde(default = "default_connect_timeout")] + pub connect_timeout_secs: u64, +} + +/// Cold storage configuration. +#[derive(Debug, Clone, Deserialize)] +pub struct StorageConfig { + /// Storage backend type. + pub backend: StorageBackend, + /// S3 bucket name (required when backend = s3). + pub s3_bucket: Option<String>, + /// S3 key prefix for archived partitions.
+ #[serde(default = "default_s3_prefix")] + pub s3_prefix: String, + /// S3 endpoint URL override (for MinIO in local dev). + pub s3_endpoint_url: Option<String>, + /// S3 region. + #[serde(default = "default_s3_region")] + pub s3_region: String, + /// Local filesystem path (required when backend = local_fs). + pub local_path: Option<String>, +} + +/// Which storage backend to use. +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum StorageBackend { + /// AWS S3 or S3-compatible (MinIO). + S3, + /// Local filesystem (development only). + LocalFs, +} + +/// Partition retention policy. +#[derive(Debug, Clone, Deserialize)] +pub struct RetentionConfig { + /// How many days to keep partitions in PostgreSQL. + #[serde(default = "default_hot_retention")] + pub hot_retention_days: u32, + /// How many days in advance to create future partitions. + #[serde(default = "default_advance_days")] + pub advance_partition_days: u32, +} + +/// Observability settings. +#[derive(Debug, Clone, Deserialize)] +pub struct ObservabilityConfig { + /// Log level filter. + #[serde(default = "default_log_level")] + pub log_level: String, +} + +impl ArchiverConfig { + /// Loads configuration from environment variables with `ARCHIVER__` prefix. + /// + /// # Errors + /// + /// Returns an error if required variables are missing or invalid. + pub fn from_env() -> std::result::Result<Self, config::ConfigError> { + config::Config::builder() + .add_source( + config::Environment::with_prefix("ARCHIVER") + .separator("__") + .try_parsing(true), + ) + .build()?
+ .try_deserialize() + } +} + +impl Default for RetentionConfig { + fn default() -> Self { + Self { + hot_retention_days: default_hot_retention(), + advance_partition_days: default_advance_days(), + } + } +} + +impl Default for ObservabilityConfig { + fn default() -> Self { + Self { + log_level: default_log_level(), + } + } +} + +fn default_max_connections() -> u32 { + 4 +} +fn default_connect_timeout() -> u64 { + 5 +} +fn default_s3_prefix() -> String { + "audit-archive/".to_string() +} +fn default_s3_region() -> String { + "us-east-1".to_string() +} +fn default_hot_retention() -> u32 { + 90 +} +fn default_advance_days() -> u32 { + 7 +} +fn default_log_level() -> String { + "info".to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_retention_defaults() { + let retention = RetentionConfig::default(); + assert_eq!(retention.hot_retention_days, 90); + assert_eq!(retention.advance_partition_days, 7); + } + + #[test] + fn test_observability_defaults() { + let obs = ObservabilityConfig::default(); + assert_eq!(obs.log_level, "info"); + } +} From 4d735cba34fd6103172685ffa24bb5dd35a9f837 Mon Sep 17 00:00:00 2001 From: Max Malkin Date: Sun, 1 Mar 2026 18:49:14 -0700 Subject: [PATCH 04/12] add advisory lock leader election --- services/audit-archiver/src/leader.rs | 92 +++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 services/audit-archiver/src/leader.rs diff --git a/services/audit-archiver/src/leader.rs b/services/audit-archiver/src/leader.rs new file mode 100644 index 0000000..dbe5b10 --- /dev/null +++ b/services/audit-archiver/src/leader.rs @@ -0,0 +1,92 @@ +//! PostgreSQL advisory lock for leader election. +//! +//! Only one archiver instance should run at a time. We use a PostgreSQL +//! advisory lock to ensure this without external coordination. + +use sqlx::PgPool; +use tracing::{info, warn}; + +use crate::error::Result; + +/// Fixed advisory lock ID derived from "AUDITA" (0x41_55_44_49_54_41). 
+/// This avoids collision with other advisory lock users in the same database. +const LOCK_ID: i64 = 0x0041_5544_4954_4100; + +/// A held advisory lock that releases on drop (via explicit `release()`). +pub struct AdvisoryLock<'a> { + pool: &'a PgPool, + held: bool, +} + +impl<'a> AdvisoryLock<'a> { + /// Attempts to acquire the archiver advisory lock. + /// + /// Returns `Ok(Some(lock))` if acquired, `Ok(None)` if another instance + /// holds it. Does not block. + /// + /// # Errors + /// + /// Returns an error if the database query fails. + pub async fn try_acquire(pool: &'a PgPool) -> Result<Option<Self>> { + let row: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)") + .bind(LOCK_ID) + .fetch_one(pool) + .await?; + + if row.0 { + info!(lock_id = LOCK_ID, "acquired advisory lock"); + Ok(Some(Self { pool, held: true })) + } else { + warn!(lock_id = LOCK_ID, "another archiver instance holds the lock"); + Ok(None) + } + } + + /// Releases the advisory lock. + /// + /// # Errors + /// + /// Returns an error if the database query fails. + pub async fn release(mut self) -> Result<()> { + self.release_inner().await + } + + async fn release_inner(&mut self) -> Result<()> { + if self.held { + let _: (bool,) = sqlx::query_as("SELECT pg_advisory_unlock($1)") + .bind(LOCK_ID) + .fetch_one(self.pool) + .await?; + self.held = false; + info!(lock_id = LOCK_ID, "released advisory lock"); + } + Ok(()) + } +} + +impl Drop for AdvisoryLock<'_> { + fn drop(&mut self) { + if self.held { + // Best-effort warning — the lock will be released when the + // connection/session closes anyway, but explicit release is preferred. + warn!("advisory lock dropped without explicit release"); + } + } +} + +/// Returns the advisory lock ID used by the archiver (for testing/logging).
+#[must_use] +#[allow(dead_code)] // Used in tests +pub const fn lock_id() -> i64 { + LOCK_ID +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_lock_id_is_stable() { + assert_eq!(lock_id(), 0x0041_5544_4954_4100); + } +} From e7018c2335be2128234651e97e63d43fe85d66c5 Mon Sep 17 00:00:00 2001 From: Max Malkin Date: Sun, 1 Mar 2026 18:49:15 -0700 Subject: [PATCH 05/12] add partition lifecycle management --- services/audit-archiver/src/partition.rs | 363 +++++++++++++++++++++++ 1 file changed, 363 insertions(+) create mode 100644 services/audit-archiver/src/partition.rs diff --git a/services/audit-archiver/src/partition.rs b/services/audit-archiver/src/partition.rs new file mode 100644 index 0000000..a7a24d3 --- /dev/null +++ b/services/audit-archiver/src/partition.rs @@ -0,0 +1,363 @@ +//! Partition management for the `audit_events` table. +//! +//! Handles listing existing partitions, creating future partitions, +//! and detaching/dropping archived partitions. + +use chrono::{Datelike, NaiveDate}; +use sqlx::PgPool; +use tracing::{info, warn}; + +use crate::error::{ArchiverError, Result}; + +/// Information about an existing partition. +#[derive(Debug, Clone)] +pub struct PartitionInfo { + /// The partition table name (e.g., `audit_events_2025_01`). + pub name: String, + /// The lower bound of the partition range (inclusive). + /// Used by callers to determine partition age for retention decisions. + #[allow(dead_code)] + pub range_start: NaiveDate, + /// The upper bound of the partition range (exclusive). + pub range_end: NaiveDate, +} + +/// Generates the partition table name for a given year and month. +/// +/// Format: `audit_events_YYYY_MM` +#[must_use] +pub fn partition_name(year: i32, month: u32) -> String { + format!("audit_events_{year:04}_{month:02}") +} + +/// Validates that a partition name matches the expected format (public version). +/// +/// # Errors +/// +/// Returns an error if the name doesn't match `audit_events_YYYY_MM`. 
+pub fn validate_partition_name_public(name: &str) -> Result<()> { + validate_partition_name(name) +} + +/// Validates that a partition name matches the expected format. +/// Prevents SQL injection through generated identifiers. +fn validate_partition_name(name: &str) -> Result<()> { + // Partition names are program-generated from chrono::NaiveDate, never from + // user input. This validation is defense-in-depth against programming errors. + let is_valid = name.len() == "audit_events_YYYY_MM".len() + && name.starts_with("audit_events_") + && name[13..17].chars().all(|c| c.is_ascii_digit()) + && name.as_bytes()[17] == b'_' + && name[18..20].chars().all(|c| c.is_ascii_digit()); + + if is_valid { + Ok(()) + } else { + Err(ArchiverError::InvalidPartitionName(name.to_string())) + } +} + +/// Lists all existing partitions of the `audit_events` table. +/// +/// # Errors +/// +/// Returns an error if the database query fails. +pub async fn list_partitions(pool: &PgPool) -> Result<Vec<PartitionInfo>> { + let rows: Vec<(String,)> = sqlx::query_as( + "SELECT child.relname AS partition_name \ + FROM pg_inherits \ + JOIN pg_class parent ON pg_inherits.inhparent = parent.oid \ + JOIN pg_class child ON pg_inherits.inhrelid = child.oid \ + WHERE parent.relname = 'audit_events' \ + ORDER BY child.relname", + ) + .fetch_all(pool) + .await?; + + let mut partitions = Vec::with_capacity(rows.len()); + for (name,) in rows { + if let Some(info) = parse_partition_name(&name) { + partitions.push(info); + } else { + warn!(partition = %name, "skipping partition with unrecognized name format"); + } + } + + Ok(partitions) +} + +/// Parses a partition name into a `PartitionInfo` with date range.
+fn parse_partition_name(name: &str) -> Option<PartitionInfo> { + // Expected format: audit_events_YYYY_MM + if name.len() != 20 || !name.starts_with("audit_events_") { + return None; + } + + let year: i32 = name[13..17].parse().ok()?; + let month: u32 = name[18..20].parse().ok()?; + + let range_start = NaiveDate::from_ymd_opt(year, month, 1)?; + // Next month's first day + let range_end = if month == 12 { + NaiveDate::from_ymd_opt(year + 1, 1, 1)? + } else { + NaiveDate::from_ymd_opt(year, month + 1, 1)? + }; + + Some(PartitionInfo { + name: name.to_string(), + range_start, + range_end, + }) +} + +/// Creates a new monthly partition if it does not already exist. +/// +/// # Safety (SQL injection) +/// +/// The partition name and date boundaries are generated from `chrono::NaiveDate` +/// values, never from user input. The name is validated against a strict regex +/// pattern before use. SQLx does not support parameterized identifiers in DDL, +/// so `format!` is used here — this is safe because the inputs are trusted +/// program-generated values. +/// +/// # Errors +/// +/// Returns an error if the partition name is invalid or the query fails. +pub async fn create_partition(pool: &PgPool, year: i32, month: u32) -> Result<bool> { + let name = partition_name(year, month); + validate_partition_name(&name)?; + + let range_start = NaiveDate::from_ymd_opt(year, month, 1) + .ok_or_else(|| ArchiverError::InvalidPartitionName(name.clone()))?; + let range_end = if month == 12 { + NaiveDate::from_ymd_opt(year + 1, 1, 1) + } else { + NaiveDate::from_ymd_opt(year, month + 1, 1) + } + .ok_or_else(|| ArchiverError::InvalidPartitionName(name.clone()))?; + + // Check if partition already exists + let exists: (bool,) = sqlx::query_as( + "SELECT EXISTS(SELECT 1 FROM pg_class WHERE relname = $1 AND relkind = 'r')", + ) + .bind(&name) + .fetch_one(pool) + .await?; + + if exists.0 { + return Ok(false); + } + + // DDL: partition name and date boundaries are program-generated, not user input.
+ let create_sql = format!( + "CREATE TABLE {name} PARTITION OF audit_events \ + FOR VALUES FROM ('{range_start}') TO ('{range_end}')" + ); + sqlx::query(&create_sql).execute(pool).await?; + + // Grant same permissions as parent table + let grant_sql = format!( + "GRANT SELECT, INSERT ON {name} TO agentauth_service" + ); + sqlx::query(&grant_sql).execute(pool).await?; + + let revoke_sql = format!( + "REVOKE UPDATE, DELETE ON {name} FROM agentauth_service" + ); + sqlx::query(&revoke_sql).execute(pool).await?; + + info!(partition = %name, start = %range_start, end = %range_end, "created partition"); + Ok(true) +} + +/// Detaches a partition from the parent table without blocking writes. +/// +/// Uses `CONCURRENTLY` to avoid holding an ACCESS EXCLUSIVE lock on the +/// parent `audit_events` table. +/// +/// # Errors +/// +/// Returns an error if the partition name is invalid or the query fails. +pub async fn detach_partition(pool: &PgPool, name: &str) -> Result<()> { + validate_partition_name(name)?; + + // DDL: partition name is validated against strict format above. + let sql = format!("ALTER TABLE audit_events DETACH PARTITION {name} CONCURRENTLY"); + sqlx::query(&sql).execute(pool).await?; + + info!(partition = %name, "detached partition from parent table"); + Ok(()) +} + +/// Drops a previously detached partition table. +/// +/// # Errors +/// +/// Returns an error if the partition name is invalid or the query fails. +pub async fn drop_partition(pool: &PgPool, name: &str) -> Result<()> { + validate_partition_name(name)?; + + // DDL: partition name is validated against strict format above. + let sql = format!("DROP TABLE IF EXISTS {name}"); + sqlx::query(&sql).execute(pool).await?; + + info!(partition = %name, "dropped partition table"); + Ok(()) +} + +/// Counts the number of rows in a partition. +/// +/// # Errors +/// +/// Returns an error if the partition name is invalid or the query fails. 
+#[allow(dead_code)] // Utility for diagnostics and future use +pub async fn count_rows(pool: &PgPool, name: &str) -> Result<i64> { + validate_partition_name(name)?; + + // DDL: partition name is validated against strict format above. + let sql = format!("SELECT COUNT(*) FROM {name}"); + let row: (i64,) = sqlx::query_as(&sql).fetch_one(pool).await?; + Ok(row.0) +} + +/// Determines which partitions need to be created based on the current date +/// and the advance window. +#[must_use] +pub fn partitions_to_create(today: NaiveDate, advance_days: u32) -> Vec<(i32, u32)> { + let target_date = today + chrono::Duration::days(i64::from(advance_days)); + let mut result = Vec::new(); + + let mut year = today.year(); + let mut month = today.month(); + + loop { + result.push((year, month)); + + // Advance to next month + if month == 12 { + year += 1; + month = 1; + } else { + month += 1; + } + + let month_start = NaiveDate::from_ymd_opt(year, month, 1); + match month_start { + Some(d) if d <= target_date => {} + _ => break, + } + } + + // Include the month that contains target_date + let target_month_start = NaiveDate::from_ymd_opt(target_date.year(), target_date.month(), 1); + if let Some(tms) = target_month_start { + let entry = (tms.year(), tms.month()); + if !result.contains(&entry) { + result.push(entry); + } + } + + result +} + +/// Determines which partitions are eligible for archival based on retention policy.
+#[must_use] +pub fn partitions_to_archive( + partitions: &[PartitionInfo], + today: NaiveDate, + hot_retention_days: u32, +) -> Vec<PartitionInfo> { + let cutoff = today - chrono::Duration::days(i64::from(hot_retention_days)); + partitions + .iter() + .filter(|p| p.range_end <= cutoff) + .cloned() + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::NaiveDate; + + #[test] + fn test_partition_name_generation() { + assert_eq!(partition_name(2025, 1), "audit_events_2025_01"); + assert_eq!(partition_name(2026, 12), "audit_events_2026_12"); + } + + #[test] + fn test_validate_partition_name_valid() { + assert!(validate_partition_name("audit_events_2025_01").is_ok()); + assert!(validate_partition_name("audit_events_2026_12").is_ok()); + } + + #[test] + fn test_validate_partition_name_invalid() { + assert!(validate_partition_name("audit_events_202_01").is_err()); + assert!(validate_partition_name("other_table_2025_01").is_err()); + assert!(validate_partition_name("audit_events_2025_1").is_err()); + assert!(validate_partition_name("audit_events_abcd_ef").is_err()); + assert!(validate_partition_name("").is_err()); + assert!(validate_partition_name("audit_events_2025_01; DROP TABLE--").is_err()); + } + + #[test] + fn test_parse_partition_name() { + let info = parse_partition_name("audit_events_2025_01"); + assert!(info.is_some()); + let info = info.expect("test: known valid"); + assert_eq!(info.range_start, NaiveDate::from_ymd_opt(2025, 1, 1).expect("test")); + assert_eq!(info.range_end, NaiveDate::from_ymd_opt(2025, 2, 1).expect("test")); + } + + #[test] + fn test_parse_partition_name_december() { + let info = parse_partition_name("audit_events_2025_12"); + assert!(info.is_some()); + let info = info.expect("test: known valid"); + assert_eq!(info.range_end, NaiveDate::from_ymd_opt(2026, 1, 1).expect("test")); + } + + #[test] + fn test_partitions_to_create() { + let today = NaiveDate::from_ymd_opt(2026, 2, 25).expect("test"); + let result = partitions_to_create(today,
7); + // Feb 25 + 7 = Mar 4, so we need Feb and Mar + assert!(result.contains(&(2026, 2))); + assert!(result.contains(&(2026, 3))); + } + + #[test] + fn test_partitions_to_create_month_boundary() { + let today = NaiveDate::from_ymd_opt(2026, 12, 28).expect("test"); + let result = partitions_to_create(today, 7); + // Dec 28 + 7 = Jan 4, so we need Dec and next Jan + assert!(result.contains(&(2026, 12))); + assert!(result.contains(&(2027, 1))); + } + + #[test] + fn test_partitions_to_archive() { + let partitions = vec![ + PartitionInfo { + name: "audit_events_2025_01".to_string(), + range_start: NaiveDate::from_ymd_opt(2025, 1, 1).expect("test"), + range_end: NaiveDate::from_ymd_opt(2025, 2, 1).expect("test"), + }, + PartitionInfo { + name: "audit_events_2026_02".to_string(), + range_start: NaiveDate::from_ymd_opt(2026, 2, 1).expect("test"), + range_end: NaiveDate::from_ymd_opt(2026, 3, 1).expect("test"), + }, + ]; + + let today = NaiveDate::from_ymd_opt(2026, 3, 1).expect("test"); + let to_archive = partitions_to_archive(&partitions, today, 90); + + // 2025_01 ended Feb 1, which is > 90 days before Mar 1 2026 + assert_eq!(to_archive.len(), 1); + assert_eq!(to_archive[0].name, "audit_events_2025_01"); + } +} From 25a7af11287f1f8deafa629406ea74e5740adf1a Mon Sep 17 00:00:00 2001 From: Max Malkin Date: Sun, 1 Mar 2026 18:49:15 -0700 Subject: [PATCH 06/12] add audit event row export from partitions --- services/audit-archiver/src/export.rs | 145 ++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 services/audit-archiver/src/export.rs diff --git a/services/audit-archiver/src/export.rs b/services/audit-archiver/src/export.rs new file mode 100644 index 0000000..62f521a --- /dev/null +++ b/services/audit-archiver/src/export.rs @@ -0,0 +1,145 @@ +//! Export audit event rows from a PostgreSQL partition. +//! +//! Streams rows from a partition table and converts them into a flat +//! struct suitable for Arrow/Parquet conversion. 
+ +use sqlx::PgPool; +use tracing::info; + +use crate::error::Result; +use crate::partition; + +/// A flat representation of an audit event row, with all types converted +/// to Parquet-friendly formats (strings for UUIDs, microseconds for timestamps). +#[derive(Debug, Clone)] +pub struct AuditRow { + /// Event ID (UUID as hyphenated string). + pub id: String, + /// Event type (enum cast to text). + pub event_type: String, + /// Agent ID (nullable UUID string). + pub agent_id: Option<String>, + /// Service provider ID (nullable UUID string). + pub service_provider_id: Option<String>, + /// Human principal ID (nullable UUID string). + pub human_principal_id: Option<String>, + /// Grant ID (nullable UUID string). + pub grant_id: Option<String>, + /// Token JTI (nullable UUID string). + pub token_jti: Option<String>, + /// Event data (JSONB serialized to string). + pub event_data: String, + /// Outcome (e.g., "success", "error"). + pub outcome: String, + /// Error message (nullable). + pub error_message: Option<String>, + /// Source IP (INET cast to text, nullable). + pub source_ip: Option<String>, + /// User agent string (nullable). + pub user_agent: Option<String>, + /// Request ID (nullable UUID string). + pub request_id: Option<String>, + /// Trace ID (nullable). + pub trace_id: Option<String>, + /// Previous event hash (32 bytes). + pub previous_event_hash: Vec<u8>, + /// Row hash (32 bytes). + pub row_hash: Vec<u8>, + /// Registry signature (64 bytes). + pub registry_signature: Vec<u8>, + /// Created at timestamp as microseconds since Unix epoch (UTC). + pub created_at_micros: i64, +} + +/// Intermediate row type for sqlx deserialization.
+#[derive(sqlx::FromRow)] +struct RawAuditRow { + id: uuid::Uuid, + event_type: String, + agent_id: Option<uuid::Uuid>, + service_provider_id: Option<uuid::Uuid>, + human_principal_id: Option<uuid::Uuid>, + grant_id: Option<uuid::Uuid>, + token_jti: Option<uuid::Uuid>, + event_data: serde_json::Value, + outcome: String, + error_message: Option<String>, + source_ip: Option<String>, + user_agent: Option<String>, + request_id: Option<uuid::Uuid>, + trace_id: Option<String>, + previous_event_hash: Vec<u8>, + row_hash: Vec<u8>, + registry_signature: Vec<u8>, + created_at: chrono::DateTime<chrono::Utc>, +} + +impl From<RawAuditRow> for AuditRow { + fn from(raw: RawAuditRow) -> Self { + Self { + id: raw.id.to_string(), + event_type: raw.event_type, + agent_id: raw.agent_id.map(|u| u.to_string()), + service_provider_id: raw.service_provider_id.map(|u| u.to_string()), + human_principal_id: raw.human_principal_id.map(|u| u.to_string()), + grant_id: raw.grant_id.map(|u| u.to_string()), + token_jti: raw.token_jti.map(|u| u.to_string()), + event_data: raw.event_data.to_string(), + outcome: raw.outcome, + error_message: raw.error_message, + source_ip: raw.source_ip, + user_agent: raw.user_agent, + request_id: raw.request_id.map(|u| u.to_string()), + trace_id: raw.trace_id, + previous_event_hash: raw.previous_event_hash, + row_hash: raw.row_hash, + registry_signature: raw.registry_signature, + created_at_micros: raw.created_at.timestamp_micros(), + } + } +} + +/// Exports all rows from a partition in batches. +/// +/// Returns batches of `AuditRow` suitable for Parquet conversion. +/// Each batch contains up to `batch_size` rows. +/// +/// # Errors +/// +/// Returns an error if the partition name is invalid or the query fails. +pub async fn export_partition( + pool: &PgPool, + partition_name: &str, + batch_size: usize, +) -> Result<Vec<Vec<AuditRow>>> { + // Validate partition name before using in SQL + partition::validate_partition_name_public(partition_name)?; + + // DDL: partition name is validated against strict format above, not user input.
+ let sql = format!( + "SELECT id, event_type::text, agent_id, service_provider_id, human_principal_id, \ + grant_id, token_jti, event_data, outcome, error_message, \ + source_ip::text, user_agent, request_id, trace_id, \ + previous_event_hash, row_hash, registry_signature, created_at \ + FROM {partition_name} ORDER BY created_at ASC" + ); + + let raw_rows: Vec<RawAuditRow> = sqlx::query_as(&sql).fetch_all(pool).await?; + + let total = raw_rows.len(); + info!( + partition = %partition_name, + rows = total, + "exported rows from partition" + ); + + let batches: Vec<Vec<AuditRow>> = raw_rows + .into_iter() + .map(AuditRow::from) + .collect::<Vec<_>>() + .chunks(batch_size) + .map(<[AuditRow]>::to_vec) + .collect(); + + Ok(batches) +} From eb636b7102644e2690ee42524176e5cd34210282 Mon Sep 17 00:00:00 2001 From: Max Malkin Date: Sun, 1 Mar 2026 18:49:16 -0700 Subject: [PATCH 07/12] add arrow schema and parquet writer --- services/audit-archiver/src/parquet.rs | 218 +++++++++++++++++++++++++ 1 file changed, 218 insertions(+) create mode 100644 services/audit-archiver/src/parquet.rs diff --git a/services/audit-archiver/src/parquet.rs b/services/audit-archiver/src/parquet.rs new file mode 100644 index 0000000..0ff7a77 --- /dev/null +++ b/services/audit-archiver/src/parquet.rs @@ -0,0 +1,218 @@ +//! Arrow schema definition and Parquet writer for audit events. + +use std::sync::Arc; + +use arrow::array::{ + BinaryBuilder, RecordBatch, StringBuilder, TimestampMicrosecondBuilder, +}; +use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; +use bytes::Bytes; +use parquet::arrow::ArrowWriter; +use parquet::basic::Compression; +use parquet::file::properties::WriterProperties; + +use crate::error::Result; +use crate::export::AuditRow; + +/// Returns the Arrow schema for audit events.
+#[must_use] +pub fn audit_events_schema() -> Schema { + Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("event_type", DataType::Utf8, false), + Field::new("agent_id", DataType::Utf8, true), + Field::new("service_provider_id", DataType::Utf8, true), + Field::new("human_principal_id", DataType::Utf8, true), + Field::new("grant_id", DataType::Utf8, true), + Field::new("token_jti", DataType::Utf8, true), + Field::new("event_data", DataType::Utf8, false), + Field::new("outcome", DataType::Utf8, false), + Field::new("error_message", DataType::Utf8, true), + Field::new("source_ip", DataType::Utf8, true), + Field::new("user_agent", DataType::Utf8, true), + Field::new("request_id", DataType::Utf8, true), + Field::new("trace_id", DataType::Utf8, true), + Field::new("previous_event_hash", DataType::Binary, false), + Field::new("row_hash", DataType::Binary, false), + Field::new("registry_signature", DataType::Binary, false), + Field::new( + "created_at", + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + false, + ), + ]) +} + +/// Converts a batch of `AuditRow`s into an Arrow `RecordBatch`. +/// +/// # Errors +/// +/// Returns an error if the batch cannot be built. 
+pub fn rows_to_record_batch(rows: &[AuditRow], schema: &Arc<Schema>) -> Result<RecordBatch> { + let len = rows.len(); + + let mut id_builder = StringBuilder::with_capacity(len, len * 36); + let mut event_type_builder = StringBuilder::with_capacity(len, len * 20); + let mut agent_id_builder = StringBuilder::with_capacity(len, len * 36); + let mut sp_id_builder = StringBuilder::with_capacity(len, len * 36); + let mut hp_id_builder = StringBuilder::with_capacity(len, len * 36); + let mut grant_id_builder = StringBuilder::with_capacity(len, len * 36); + let mut token_jti_builder = StringBuilder::with_capacity(len, len * 36); + let mut event_data_builder = StringBuilder::with_capacity(len, len * 100); + let mut outcome_builder = StringBuilder::with_capacity(len, len * 10); + let mut error_msg_builder = StringBuilder::with_capacity(len, len * 50); + let mut source_ip_builder = StringBuilder::with_capacity(len, len * 15); + let mut user_agent_builder = StringBuilder::with_capacity(len, len * 50); + let mut request_id_builder = StringBuilder::with_capacity(len, len * 36); + let mut trace_id_builder = StringBuilder::with_capacity(len, len * 32); + let mut prev_hash_builder = BinaryBuilder::with_capacity(len, len * 32); + let mut row_hash_builder = BinaryBuilder::with_capacity(len, len * 32); + let mut sig_builder = BinaryBuilder::with_capacity(len, len * 64); + let mut created_at_builder = TimestampMicrosecondBuilder::with_capacity(len); + + for row in rows { + id_builder.append_value(&row.id); + event_type_builder.append_value(&row.event_type); + append_optional_string(&mut agent_id_builder, row.agent_id.as_deref()); + append_optional_string(&mut sp_id_builder, row.service_provider_id.as_deref()); + append_optional_string(&mut hp_id_builder, row.human_principal_id.as_deref()); + append_optional_string(&mut grant_id_builder, row.grant_id.as_deref()); + append_optional_string(&mut token_jti_builder, row.token_jti.as_deref()); + event_data_builder.append_value(&row.event_data); +
outcome_builder.append_value(&row.outcome); + append_optional_string(&mut error_msg_builder, row.error_message.as_deref()); + append_optional_string(&mut source_ip_builder, row.source_ip.as_deref()); + append_optional_string(&mut user_agent_builder, row.user_agent.as_deref()); + append_optional_string(&mut request_id_builder, row.request_id.as_deref()); + append_optional_string(&mut trace_id_builder, row.trace_id.as_deref()); + prev_hash_builder.append_value(&row.previous_event_hash); + row_hash_builder.append_value(&row.row_hash); + sig_builder.append_value(&row.registry_signature); + created_at_builder.append_value(row.created_at_micros); + } + + let batch = RecordBatch::try_new( + Arc::clone(schema), + vec![ + Arc::new(id_builder.finish()), + Arc::new(event_type_builder.finish()), + Arc::new(agent_id_builder.finish()), + Arc::new(sp_id_builder.finish()), + Arc::new(hp_id_builder.finish()), + Arc::new(grant_id_builder.finish()), + Arc::new(token_jti_builder.finish()), + Arc::new(event_data_builder.finish()), + Arc::new(outcome_builder.finish()), + Arc::new(error_msg_builder.finish()), + Arc::new(source_ip_builder.finish()), + Arc::new(user_agent_builder.finish()), + Arc::new(request_id_builder.finish()), + Arc::new(trace_id_builder.finish()), + Arc::new(prev_hash_builder.finish()), + Arc::new(row_hash_builder.finish()), + Arc::new(sig_builder.finish()), + Arc::new(created_at_builder.finish().with_timezone("UTC")), + ], + )?; + + Ok(batch) +} + +fn append_optional_string(builder: &mut StringBuilder, value: Option<&str>) { + match value { + Some(v) => builder.append_value(v), + None => builder.append_null(), + } +} + +/// Writes record batches to Parquet format in memory and returns the bytes. +/// +/// Uses ZSTD compression and a row group size of 65536. +/// +/// # Errors +/// +/// Returns an error if the Parquet writer fails. 
+pub fn write_parquet(batches: &[RecordBatch], schema: &Arc) -> Result { + let props = WriterProperties::builder() + .set_compression(Compression::ZSTD(parquet::basic::ZstdLevel::default())) + .set_max_row_group_size(65536) + .set_created_by("agentauth-audit-archiver".to_string()) + .build(); + + let mut buf = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buf, Arc::clone(schema), Some(props))?; + + for batch in batches { + writer.write(batch)?; + } + + writer.close()?; + Ok(Bytes::from(buf)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_schema_has_18_fields() { + let schema = audit_events_schema(); + assert_eq!(schema.fields().len(), 18); + } + + #[test] + fn test_schema_id_is_not_nullable() { + let schema = audit_events_schema(); + let id_field = schema.field_with_name("id").expect("test: id field exists"); + assert!(!id_field.is_nullable()); + } + + #[test] + fn test_schema_agent_id_is_nullable() { + let schema = audit_events_schema(); + let field = schema.field_with_name("agent_id").expect("test: field exists"); + assert!(field.is_nullable()); + } + + #[test] + fn test_roundtrip_parquet() { + let schema = Arc::new(audit_events_schema()); + let rows = vec![AuditRow { + id: "550e8400-e29b-41d4-a716-446655440000".to_string(), + event_type: "token_issued".to_string(), + agent_id: Some("agent-1".to_string()), + service_provider_id: None, + human_principal_id: None, + grant_id: None, + token_jti: None, + event_data: "{}".to_string(), + outcome: "success".to_string(), + error_message: None, + source_ip: None, + user_agent: None, + request_id: None, + trace_id: None, + previous_event_hash: vec![0u8; 32], + row_hash: vec![1u8; 32], + registry_signature: vec![2u8; 64], + created_at_micros: 1_700_000_000_000_000, + }]; + + let batch = rows_to_record_batch(&rows, &schema).expect("test: batch creation"); + assert_eq!(batch.num_rows(), 1); + + let bytes = write_parquet(&[batch], &schema).expect("test: parquet write"); + 
assert!(!bytes.is_empty()); + + // Verify we can read it back + let reader = parquet::arrow::arrow_reader::ParquetRecordBatchReader::try_new( + bytes::Bytes::from(bytes.to_vec()), + 1024, + ) + .expect("test: parquet reader"); + + let batches: Vec<_> = reader.into_iter().collect::, _>>().expect("test: read batches"); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 1); + } +} From b7a178526d355ca1e859c7c9dd8caa244799e4aa Mon Sep 17 00:00:00 2001 From: Max Malkin Date: Sun, 1 Mar 2026 18:49:17 -0700 Subject: [PATCH 08/12] add cold storage backends for S3 and local filesystem --- services/audit-archiver/src/storage.rs | 265 +++++++++++++++++++++++++ 1 file changed, 265 insertions(+) create mode 100644 services/audit-archiver/src/storage.rs diff --git a/services/audit-archiver/src/storage.rs b/services/audit-archiver/src/storage.rs new file mode 100644 index 0000000..8aaa29f --- /dev/null +++ b/services/audit-archiver/src/storage.rs @@ -0,0 +1,265 @@ +//! Cold storage backends for archived Parquet files. +//! +//! Provides a `ColdStorage` trait with two implementations: +//! - `S3Storage` for production (AWS S3, MinIO, or any S3-compatible API) +//! - `LocalFsStorage` for local development and testing + +use std::path::PathBuf; + +use bytes::Bytes; +use tracing::info; + +use crate::config::{StorageBackend, StorageConfig}; +use crate::error::{ArchiverError, Result}; + +/// Metadata attached to archived Parquet files. +#[derive(Debug, Clone)] +pub struct ArchiveMetadata { + /// The partition name (e.g., `audit_events_2025_01`). + pub partition_name: String, + /// Number of rows archived. + pub row_count: u64, +} + +/// Trait for cold storage backends. +#[async_trait::async_trait] +pub trait ColdStorage: Send + Sync { + /// Uploads a Parquet file to cold storage. + /// + /// # Errors + /// + /// Returns an error if the upload fails. 
+ async fn upload(&self, key: &str, data: Bytes, metadata: &ArchiveMetadata) -> Result<()>; + + /// Checks if a key already exists in cold storage (for idempotency). + /// + /// # Errors + /// + /// Returns an error if the check fails. + async fn exists(&self, key: &str) -> Result; +} + +/// Creates a `ColdStorage` implementation from configuration. +/// +/// # Errors +/// +/// Returns an error if the configuration is invalid. +pub async fn create_storage(config: &StorageConfig) -> Result> { + match config.backend { + StorageBackend::S3 => { + let bucket = config + .s3_bucket + .as_deref() + .ok_or_else(|| ArchiverError::Config("s3_bucket is required when backend=s3".into()))?; + let storage = S3Storage::new(config, bucket).await?; + Ok(Box::new(storage)) + } + StorageBackend::LocalFs => { + let path = config + .local_path + .as_deref() + .ok_or_else(|| ArchiverError::Config("local_path is required when backend=local_fs".into()))?; + let storage = LocalFsStorage::new(path)?; + Ok(Box::new(storage)) + } + } +} + +/// Generates the storage key for a partition's Parquet file. +/// +/// Uses Hive-style partitioning: `{prefix}year=YYYY/month=MM/audit_events_YYYY_MM.parquet` +#[must_use] +pub fn storage_key(prefix: &str, partition_name: &str) -> String { + // Extract year and month from partition name (audit_events_YYYY_MM) + let year = &partition_name[13..17]; + let month = &partition_name[18..20]; + format!("{prefix}year={year}/month={month}/{partition_name}.parquet") +} + +// ── S3 Storage ──────────────────────────────────────────────────────────── + +/// S3-compatible cold storage (AWS S3, MinIO, GCS via S3 compatibility). +pub struct S3Storage { + client: aws_sdk_s3::Client, + bucket: String, +} + +impl S3Storage { + /// Creates a new S3 storage backend. + /// + /// # Errors + /// + /// Returns an error if the AWS SDK configuration fails. 
    async fn new(config: &StorageConfig, bucket: &str) -> Result<Self> {
        // Base configuration (credentials, profile, etc.) comes from the
        // standard AWS environment; the region is always taken from our config.
        let mut aws_config_builder = aws_config::from_env().region(
            aws_config::Region::new(config.s3_region.clone()),
        );

        // Override endpoint for MinIO or other S3-compatible services
        if let Some(ref endpoint) = config.s3_endpoint_url {
            aws_config_builder = aws_config_builder.endpoint_url(endpoint);
        }

        let aws_config = aws_config_builder.load().await;

        let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
            .force_path_style(true) // Required for MinIO
            .build();

        let client = aws_sdk_s3::Client::from_conf(s3_config);

        Ok(Self {
            client,
            bucket: bucket.to_string(),
        })
    }
}

#[async_trait::async_trait]
impl ColdStorage for S3Storage {
    async fn upload(&self, key: &str, data: Bytes, metadata: &ArchiveMetadata) -> Result<()> {
        // `Bytes` converts into the SDK's streaming body without copying.
        // Object metadata lets operators audit an archive (partition, rows)
        // without downloading and parsing the Parquet footer.
        self.client
            .put_object()
            .bucket(&self.bucket)
            .key(key)
            .body(data.into())
            .content_type("application/vnd.apache.parquet")
            .metadata("partition_name", &metadata.partition_name)
            .metadata("row_count", metadata.row_count.to_string())
            .send()
            .await
            .map_err(|e| ArchiverError::Storage(format!("S3 upload failed: {e}")))?;

        info!(
            bucket = %self.bucket,
            key = %key,
            rows = metadata.row_count,
            "uploaded archive to S3"
        );
        Ok(())
    }

    async fn exists(&self, key: &str) -> Result<bool> {
        // HEAD the object: cheaper than GET and sufficient for idempotency
        // and post-upload verification.
        match self
            .client
            .head_object()
            .bucket(&self.bucket)
            .key(key)
            .send()
            .await
        {
            Ok(_) => Ok(true),
            Err(err) => {
                // NotFound means the key doesn't exist. Any other failure
                // (auth, network) must be surfaced, not treated as "absent".
                let is_not_found = err
                    .as_service_error()
                    .is_some_and(aws_sdk_s3::operation::head_object::HeadObjectError::is_not_found);
                if is_not_found {
                    Ok(false)
                } else {
                    Err(ArchiverError::Storage(format!("S3 head_object failed: {err}")))
                }
            }
        }
    }
}

// ── Local Filesystem Storage ────────────────────────────────────────────── 

/// Local filesystem cold storage (development and testing only).
+pub struct LocalFsStorage { + base_path: PathBuf, +} + +impl LocalFsStorage { + /// Creates a new local filesystem storage backend. + /// + /// Creates the base directory if it doesn't exist. + /// + /// # Errors + /// + /// Returns an error if the directory cannot be created. + fn new(path: &str) -> Result { + let base_path = PathBuf::from(path); + std::fs::create_dir_all(&base_path) + .map_err(|e| ArchiverError::Storage(format!("failed to create directory {path}: {e}")))?; + Ok(Self { base_path }) + } +} + +#[async_trait::async_trait] +impl ColdStorage for LocalFsStorage { + async fn upload(&self, key: &str, data: Bytes, metadata: &ArchiveMetadata) -> Result<()> { + let file_path = self.base_path.join(key); + + // Create parent directories + if let Some(parent) = file_path.parent() { + tokio::fs::create_dir_all(parent) + .await + .map_err(|e| ArchiverError::Storage(format!("failed to create dirs: {e}")))?; + } + + tokio::fs::write(&file_path, &data) + .await + .map_err(|e| ArchiverError::Storage(format!("failed to write file: {e}")))?; + + info!( + path = %file_path.display(), + rows = metadata.row_count, + size_bytes = data.len(), + "wrote archive to local filesystem" + ); + Ok(()) + } + + async fn exists(&self, key: &str) -> Result { + let file_path = self.base_path.join(key); + Ok(file_path.exists()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_storage_key_format() { + let key = storage_key("audit-archive/", "audit_events_2025_01"); + assert_eq!( + key, + "audit-archive/year=2025/month=01/audit_events_2025_01.parquet" + ); + } + + #[test] + fn test_storage_key_december() { + let key = storage_key("prefix/", "audit_events_2026_12"); + assert_eq!( + key, + "prefix/year=2026/month=12/audit_events_2026_12.parquet" + ); + } + + #[tokio::test] + async fn test_local_fs_upload_and_exists() { + let tmp = tempfile::tempdir().expect("test: create tmpdir"); + let storage = LocalFsStorage::new(tmp.path().to_str().expect("test: path")) + 
.expect("test: create storage"); + + let key = "year=2025/month=01/test.parquet"; + let data = Bytes::from_static(b"test parquet data"); + let metadata = ArchiveMetadata { + partition_name: "audit_events_2025_01".to_string(), + row_count: 42, + }; + + assert!(!storage.exists(key).await.expect("test: exists check")); + + storage + .upload(key, data, &metadata) + .await + .expect("test: upload"); + + assert!(storage.exists(key).await.expect("test: exists check")); + } +} From 6a817acce255df85e921591a72dc1899cfbd356f Mon Sep 17 00:00:00 2001 From: Max Malkin Date: Sun, 1 Mar 2026 18:49:17 -0700 Subject: [PATCH 09/12] implement audit archiver pipeline --- services/audit-archiver/src/main.rs | 353 +++++++++++++++++++++++++++- 1 file changed, 349 insertions(+), 4 deletions(-) diff --git a/services/audit-archiver/src/main.rs b/services/audit-archiver/src/main.rs index 450b83d..0bccd6c 100644 --- a/services/audit-archiver/src/main.rs +++ b/services/audit-archiver/src/main.rs @@ -1,11 +1,356 @@ //! AgentAuth Audit Archiver Service //! -//! Async audit log compaction and cold storage archival. +//! One-shot job that manages audit log partition lifecycle: +//! 1. Creates future partitions (7 days in advance) +//! 2. Archives expired partitions to Parquet in cold storage +//! 3. Drops archived partitions from PostgreSQL +//! +//! Designed to run as a Kubernetes CronJob with leader election +//! via PostgreSQL advisory locks. 
#![forbid(unsafe_code)]
#![deny(clippy::unwrap_used)]

mod config;
mod error;
mod export;
mod leader;
mod parquet;
mod partition;
mod storage;

use std::sync::Arc;
use std::time::Instant;

use chrono::Utc;
use sqlx::postgres::PgPoolOptions;
use tracing::{error, info, warn};
use tracing_subscriber::EnvFilter;

use crate::config::ArchiverConfig;
use crate::storage::ArchiveMetadata;

/// Pipeline outcome counters for the summary log line.
struct PipelineResult {
    /// Partitions newly created for current/upcoming months.
    partitions_created: u32,
    /// Partitions exported, uploaded, verified, and dropped this run.
    partitions_archived: u32,
    /// Partitions skipped because cold storage already held their archive.
    partitions_skipped: u32,
    /// Total rows exported across all archived partitions.
    total_rows_exported: u64,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();

    // Config is loaded before tracing exists, so a bad config is reported
    // on stderr directly rather than through the (uninitialized) logger.
    let config = ArchiverConfig::from_env().map_err(|e| {
        eprintln!("configuration error: {e}");
        e
    })?;

    init_tracing(&config.observability.log_level);

    info!(
        version = env!("CARGO_PKG_VERSION"),
        service = "audit-archiver",
        "starting audit archiver"
    );

    let started = Instant::now();

    // Connect to PostgreSQL primary
    let pool = PgPoolOptions::new()
        .max_connections(config.database.max_connections)
        .acquire_timeout(std::time::Duration::from_secs(
            config.database.connect_timeout_secs,
        ))
        .connect(&config.database.url)
        .await?;

    info!("connected to PostgreSQL");

    // Acquire advisory lock — exit cleanly if another instance is running
    let Some(lock) = leader::AdvisoryLock::try_acquire(&pool).await? else {
        info!("another archiver instance is running, exiting");
        return Ok(());
    };

    // Set up a shutdown signal listener for graceful SIGTERM handling.
    // The sender half moves into the task; the pipeline polls the receiver
    // between (and within) steps so a CronJob termination is honored.
    let shutdown = tokio::sync::watch::channel(false);
    let shutdown_rx = shutdown.1.clone();
    tokio::spawn(async move {
        shutdown_signal().await;
        let _ = shutdown.0.send(true);
    });

    // Run the pipeline
    let result = run_pipeline(&config, &pool, &shutdown_rx).await;

    // Release the lock before exiting (best effort — the lock also dies
    // with the session when the pool closes)
    if let Err(e) = lock.release().await {
        error!(error = %e, "failed to release advisory lock");
    }

    pool.close().await;

    match result {
        Ok(outcome) => {
            info!(
                partitions_created = outcome.partitions_created,
                partitions_archived = outcome.partitions_archived,
                partitions_skipped = outcome.partitions_skipped,
                total_rows_exported = outcome.total_rows_exported,
                duration_secs = started.elapsed().as_secs_f64(),
                "archiver completed successfully"
            );
            Ok(())
        }
        Err(e) => {
            error!(
                error = %e,
                duration_secs = started.elapsed().as_secs_f64(),
                "archiver failed"
            );
            Err(e.into())
        }
    }
}

/// Runs the full archival pipeline.
///
/// Checks the shutdown flag between steps; a requested shutdown returns the
/// partial result rather than an error.
async fn run_pipeline(
    config: &ArchiverConfig,
    pool: &sqlx::PgPool,
    shutdown: &tokio::sync::watch::Receiver<bool>,
) -> error::Result<PipelineResult> {
    let mut result = PipelineResult {
        partitions_created: 0,
        partitions_archived: 0,
        partitions_skipped: 0,
        total_rows_exported: 0,
    };

    // Step A: Ensure future partitions exist
    if *shutdown.borrow() {
        warn!("shutdown requested, skipping partition creation");
        return Ok(result);
    }

    let today = Utc::now().date_naive();
    ensure_future_partitions(pool, &config.retention, today, &mut result).await;

    // Step B: Archive expired partitions
    if *shutdown.borrow() {
        warn!("shutdown requested, skipping archival");
        return Ok(result);
    }

    archive_expired(config, pool, shutdown, today, &mut result).await?;

    Ok(result)
}

/// Creates partitions for the current and upcoming months.
+async fn ensure_future_partitions( + pool: &sqlx::PgPool, + retention: &config::RetentionConfig, + today: chrono::NaiveDate, + result: &mut PipelineResult, +) { + let needed = partition::partitions_to_create(today, retention.advance_partition_days); + + for (year, month) in &needed { + match partition::create_partition(pool, *year, *month).await { + Ok(true) => result.partitions_created += 1, + Ok(false) => {} // Already exists + Err(e) => { + warn!(year, month, error = %e, "failed to create partition, continuing"); + } + } + } + + info!( + created = result.partitions_created, + checked = needed.len(), + "partition creation step complete" + ); +} + +/// Archives and drops partitions that have exceeded the hot retention window. +async fn archive_expired( + config: &ArchiverConfig, + pool: &sqlx::PgPool, + shutdown: &tokio::sync::watch::Receiver, + today: chrono::NaiveDate, + result: &mut PipelineResult, +) -> error::Result<()> { + let existing = partition::list_partitions(pool).await?; + let to_archive = + partition::partitions_to_archive(&existing, today, config.retention.hot_retention_days); + + if to_archive.is_empty() { + info!("no partitions eligible for archival"); + return Ok(()); + } + + info!(count = to_archive.len(), "partitions eligible for archival"); + + let cold_storage = storage::create_storage(&config.storage).await?; + let schema = Arc::new(parquet::audit_events_schema()); + + for partition_info in &to_archive { + if *shutdown.borrow() { + warn!(partition = %partition_info.name, "shutdown requested, stopping archival"); + break; + } + + let key = storage::storage_key(&config.storage.s3_prefix, &partition_info.name); + archive_single_partition(pool, &*cold_storage, &schema, partition_info, &key, result) + .await; + } + + Ok(()) +} + +/// Archives a single partition: export, compress, upload, then drop. 
async fn archive_single_partition(
    pool: &sqlx::PgPool,
    cold_storage: &dyn storage::ColdStorage,
    schema: &Arc<arrow::datatypes::Schema>,
    partition_info: &partition::PartitionInfo,
    key: &str,
    result: &mut PipelineResult,
) {
    // Idempotency: skip if already archived (a previous run may have
    // uploaded but crashed before dropping the partition)
    match cold_storage.exists(key).await {
        Ok(true) => {
            info!(partition = %partition_info.name, key, "already archived, dropping partition");
            drop_partition_with_warning(pool, &partition_info.name).await;
            result.partitions_skipped += 1;
            return;
        }
        Ok(false) => {} // Proceed with export
        Err(e) => {
            warn!(partition = %partition_info.name, error = %e, "failed to check archive, skipping");
            return;
        }
    }

    // Export rows from PostgreSQL in pages of 1000
    let batches = match export::export_partition(pool, &partition_info.name, 1000).await {
        Ok(b) => b,
        Err(e) => {
            error!(partition = %partition_info.name, error = %e, "failed to export partition");
            return;
        }
    };

    let row_count: u64 = batches.iter().map(|b| b.len() as u64).sum();

    if row_count == 0 {
        info!(partition = %partition_info.name, "partition is empty, dropping");
        drop_partition_with_warning(pool, &partition_info.name).await;
        result.partitions_archived += 1;
        return;
    }

    // Convert to Arrow. Abort this partition on the first conversion
    // failure: silently skipping a failed batch and then dropping the
    // partition below would permanently lose those rows.
    let mut record_batches = Vec::with_capacity(batches.len());
    for rows in &batches {
        match parquet::rows_to_record_batch(rows, schema) {
            Ok(rb) => record_batches.push(rb),
            Err(e) => {
                error!(partition = %partition_info.name, error = %e, "record batch error");
                return;
            }
        }
    }

    let parquet_bytes = match parquet::write_parquet(&record_batches, schema) {
        Ok(bytes) => bytes,
        Err(e) => {
            error!(partition = %partition_info.name, error = %e, "parquet write failed");
            return;
        }
    };

    let metadata = ArchiveMetadata {
        partition_name: partition_info.name.clone(),
        row_count,
    };

    // Upload, then verify the object is readable before any destructive step
    if let Err(e) = cold_storage.upload(key, parquet_bytes, &metadata).await {
        error!(partition = %partition_info.name, error = %e, "upload failed");
        return;
    }

    match cold_storage.exists(key).await {
        Ok(true) => {}
        Ok(false) => {
            error!(partition = %partition_info.name, key, "upload verification failed");
            return;
        }
        Err(e) => {
            error!(partition = %partition_info.name, error = %e, "verification check failed");
            return;
        }
    }

    // Only after the archive is verified do we drop the hot partition
    drop_partition_with_warning(pool, &partition_info.name).await;

    result.partitions_archived += 1;
    result.total_rows_exported += row_count;

    info!(partition = %partition_info.name, rows = row_count, "partition archived and dropped");
}

/// Detaches and drops a partition, logging a warning on failure.
///
/// Failures here are non-fatal: the partition stays hot and the next run
/// will take the idempotent "already archived" path and retry the drop.
async fn drop_partition_with_warning(pool: &sqlx::PgPool, name: &str) {
    if let Err(e) = partition::detach_partition(pool, name).await {
        warn!(partition = %name, error = %e, "failed to detach partition");
        return;
    }
    if let Err(e) = partition::drop_partition(pool, name).await {
        warn!(partition = %name, error = %e, "failed to drop partition");
    }
}

/// Initializes structured JSON tracing.
///
/// `RUST_LOG` (if set) overrides the configured level; both falling back
/// to `info` on parse failure.
fn init_tracing(log_level: &str) {
    let filter = EnvFilter::try_from_default_env()
        .or_else(|_| EnvFilter::try_new(log_level))
        .unwrap_or_else(|_| EnvFilter::new("info"));

    tracing_subscriber::fmt()
        .with_env_filter(filter)
        .json()
        .with_target(true)
        .with_thread_ids(false)
        .init();
}

/// Waits for a shutdown signal (Ctrl+C or SIGTERM).
#[allow(clippy::expect_used)] // Signal handler setup is infallible in practice;
                              // panicking here is appropriate since the process
                              // cannot function without signal handling.
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    // On non-Unix targets there is no SIGTERM; this branch never resolves.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select!
{ + () = ctrl_c => info!("received Ctrl+C"), + () = terminate => info!("received SIGTERM"), + } } From aa4e0ffb392f29a2a2b3ca9cb473c2bbea697b0c Mon Sep 17 00:00:00 2001 From: Max Malkin Date: Sun, 1 Mar 2026 18:49:18 -0700 Subject: [PATCH 10/12] add MinIO service for local S3-compatible storage --- docker-compose.yml | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index 97cc75e..0af39cc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -269,6 +269,44 @@ services: networks: - agentauth-net + # MinIO (S3-compatible storage for local development) + minio: + image: minio/minio:latest + container_name: agentauth-minio + command: server /data --console-address ":9001" + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + ports: + - "9000:9000" + - "9001:9001" + volumes: + - minio-data:/data + healthcheck: + test: ["CMD", "mc", "ready", "local"] + interval: 5s + timeout: 5s + retries: 5 + networks: + - agentauth-net + + # MinIO bucket initializer + minio-init: + image: minio/mc:latest + container_name: agentauth-minio-init + depends_on: + minio: + condition: service_healthy + entrypoint: /bin/sh + command: + - -c + - | + mc alias set local http://minio:9000 minioadmin minioadmin + mc mb local/agentauth-audit-archive --ignore-existing + echo "MinIO bucket created" + networks: + - agentauth-net + volumes: postgres-primary-data: postgres-replica-data: @@ -277,6 +315,7 @@ volumes: redis-3-data: prometheus-data: grafana-data: + minio-data: networks: agentauth-net: From 365f50fa6318439e3002f4f56630d4b21f8c05e3 Mon Sep 17 00:00:00 2001 From: Max Malkin Date: Sun, 1 Mar 2026 18:49:19 -0700 Subject: [PATCH 11/12] add audit archiver environment variables --- .env.example | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/.env.example b/.env.example index 4a6b3b4..783ae7b 100644 --- a/.env.example +++ b/.env.example @@ -138,6 +138,32 @@ 
REGISTRY_URL=http://localhost:8080 # Port for the approval UI server (default 3000; use 3001 to avoid Grafana conflict) PORT=3001 +# ============================================================================= +# Audit Archiver +# ============================================================================= + +# Database connection (uses primary — archiver runs DDL) +ARCHIVER__DATABASE__URL=postgres://agentauth:agentauth_dev@localhost:5434/agentauth +ARCHIVER__DATABASE__MAX_CONNECTIONS=4 +ARCHIVER__DATABASE__CONNECT_TIMEOUT_SECS=5 + +# Cold storage backend: "s3" or "local_fs" +ARCHIVER__STORAGE__BACKEND=s3 +ARCHIVER__STORAGE__S3_BUCKET=agentauth-audit-archive +ARCHIVER__STORAGE__S3_PREFIX=audit-archive/ +# Override endpoint for MinIO in local dev +ARCHIVER__STORAGE__S3_ENDPOINT_URL=http://localhost:9000 +ARCHIVER__STORAGE__S3_REGION=us-east-1 +# Local filesystem path (when backend=local_fs) +# ARCHIVER__STORAGE__LOCAL_PATH=./tmp/audit-archive + +# Retention policy +ARCHIVER__RETENTION__HOT_RETENTION_DAYS=90 +ARCHIVER__RETENTION__ADVANCE_PARTITION_DAYS=7 + +# Observability +ARCHIVER__OBSERVABILITY__LOG_LEVEL=info + # ============================================================================= # Development Only # ============================================================================= From f53eea35110c84dd9a223dae2379d728942eee2a Mon Sep 17 00:00:00 2001 From: Max Malkin Date: Sun, 1 Mar 2026 18:53:16 -0700 Subject: [PATCH 12/12] remove unused count_rows to fix SQL interpolation CI check --- services/audit-archiver/src/partition.rs | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/services/audit-archiver/src/partition.rs b/services/audit-archiver/src/partition.rs index a7a24d3..ee3a807 100644 --- a/services/audit-archiver/src/partition.rs +++ b/services/audit-archiver/src/partition.rs @@ -206,21 +206,6 @@ pub async fn drop_partition(pool: &PgPool, name: &str) -> Result<()> { Ok(()) } -/// Counts the number of rows in a partition. 
-/// -/// # Errors -/// -/// Returns an error if the partition name is invalid or the query fails. -#[allow(dead_code)] // Utility for diagnostics and future use -pub async fn count_rows(pool: &PgPool, name: &str) -> Result { - validate_partition_name(name)?; - - // DDL: partition name is validated against strict format above. - let sql = format!("SELECT COUNT(*) FROM {name}"); - let row: (i64,) = sqlx::query_as(&sql).fetch_one(pool).await?; - Ok(row.0) -} - /// Determines which partitions need to be created based on the current date /// and the advance window. #[must_use]