Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,32 @@ REGISTRY_URL=http://localhost:8080
# Port for the approval UI server (default 3000; use 3001 to avoid Grafana conflict)
PORT=3001

# =============================================================================
# Audit Archiver
# =============================================================================

# Database connection (uses primary — archiver runs DDL)
ARCHIVER__DATABASE__URL=postgres://agentauth:agentauth_dev@localhost:5434/agentauth
ARCHIVER__DATABASE__MAX_CONNECTIONS=4
ARCHIVER__DATABASE__CONNECT_TIMEOUT_SECS=5

# Cold storage backend: "s3" or "local_fs"
ARCHIVER__STORAGE__BACKEND=s3
ARCHIVER__STORAGE__S3_BUCKET=agentauth-audit-archive
ARCHIVER__STORAGE__S3_PREFIX=audit-archive/
# Override endpoint for MinIO in local dev
ARCHIVER__STORAGE__S3_ENDPOINT_URL=http://localhost:9000
ARCHIVER__STORAGE__S3_REGION=us-east-1
# Local filesystem path (when backend=local_fs)
# ARCHIVER__STORAGE__LOCAL_PATH=./tmp/audit-archive

# Retention policy
ARCHIVER__RETENTION__HOT_RETENTION_DAYS=90
ARCHIVER__RETENTION__ADVANCE_PARTITION_DAYS=7

# Observability
ARCHIVER__OBSERVABILITY__LOG_LEVEL=info

# =============================================================================
# Development Only
# =============================================================================
Expand Down
11 changes: 11 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,17 @@ proptest = "1.4"
schemars = "0.8"
jsonschema = "0.17"

# Arrow / Parquet (audit archiver)
arrow = { version = "53", default-features = false, features = ["chrono-tz"] }
parquet = { version = "53", default-features = false, features = ["arrow", "zstd"] }

# AWS SDK (S3 cold storage)
aws-sdk-s3 = "1"
aws-config = { version = "1", features = ["behavior-version-latest"] }

# Bytes
bytes = "1"

[workspace.lints.rust]
unsafe_code = "forbid"
missing_docs = "warn"
Expand Down
39 changes: 39 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,44 @@ services:
networks:
- agentauth-net

# MinIO (S3-compatible storage for local development)
minio:
image: minio/minio:latest
container_name: agentauth-minio
command: server /data --console-address ":9001"
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
ports:
- "9000:9000"
- "9001:9001"
volumes:
- minio-data:/data
healthcheck:
test: ["CMD", "mc", "ready", "local"]
interval: 5s
timeout: 5s
retries: 5
networks:
- agentauth-net

# MinIO bucket initializer
minio-init:
image: minio/mc:latest
container_name: agentauth-minio-init
depends_on:
minio:
condition: service_healthy
entrypoint: /bin/sh
command:
- -c
- |
mc alias set local http://minio:9000 minioadmin minioadmin
mc mb local/agentauth-audit-archive --ignore-existing
echo "MinIO bucket created"
networks:
- agentauth-net

volumes:
postgres-primary-data:
postgres-replica-data:
Expand All @@ -277,6 +315,7 @@ volumes:
redis-3-data:
prometheus-data:
grafana-data:
minio-data:

networks:
agentauth-net:
Expand Down
23 changes: 23 additions & 0 deletions services/audit-archiver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,37 @@ chrono = { workspace = true }

# Error handling
anyhow = { workspace = true }
thiserror = { workspace = true }

# Async traits
async-trait = { workspace = true }

# Tracing
tracing = { workspace = true }
tracing-subscriber = { workspace = true }

# Metrics
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }

# Configuration
config = { workspace = true }
dotenvy = { workspace = true }

# Arrow / Parquet
arrow = { workspace = true }
parquet = { workspace = true }

# AWS SDK (S3)
aws-sdk-s3 = { workspace = true }
aws-config = { workspace = true }

# Bytes
bytes = { workspace = true }

[dev-dependencies]
tempfile = "3"
tokio-test = { workspace = true }

[lints]
workspace = true
156 changes: 156 additions & 0 deletions services/audit-archiver/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
//! Configuration for the audit archiver service.

use serde::Deserialize;

/// Top-level archiver configuration, loaded from environment variables
/// with the `ARCHIVER__` prefix.
#[derive(Debug, Clone, Deserialize)]
pub struct ArchiverConfig {
/// Database connection settings.
pub database: DatabaseConfig,
/// Cold storage settings.
pub storage: StorageConfig,
/// Partition retention policy.
#[serde(default)]
pub retention: RetentionConfig,
/// Observability settings.
#[serde(default)]
pub observability: ObservabilityConfig,
}

/// Database configuration for the archiver.
/// The archiver only needs primary access (for DDL and reads before drop).
#[derive(Debug, Clone, Deserialize)]
pub struct DatabaseConfig {
/// PostgreSQL primary connection URL.
pub url: String,
/// Maximum number of connections in the pool.
#[serde(default = "default_max_connections")]
pub max_connections: u32,
/// Connection timeout in seconds.
#[serde(default = "default_connect_timeout")]
pub connect_timeout_secs: u64,
}

/// Cold storage configuration.
#[derive(Debug, Clone, Deserialize)]
pub struct StorageConfig {
/// Storage backend type.
pub backend: StorageBackend,
/// S3 bucket name (required when backend = s3).
pub s3_bucket: Option<String>,
/// S3 key prefix for archived partitions.
#[serde(default = "default_s3_prefix")]
pub s3_prefix: String,
/// S3 endpoint URL override (for MinIO in local dev).
pub s3_endpoint_url: Option<String>,
/// S3 region.
#[serde(default = "default_s3_region")]
pub s3_region: String,
/// Local filesystem path (required when backend = local_fs).
pub local_path: Option<String>,
}

/// Which storage backend to use.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StorageBackend {
/// AWS S3 or S3-compatible (MinIO).
S3,
/// Local filesystem (development only).
LocalFs,
}

/// Partition retention policy.
#[derive(Debug, Clone, Deserialize)]
pub struct RetentionConfig {
/// How many days to keep partitions in PostgreSQL.
#[serde(default = "default_hot_retention")]
pub hot_retention_days: u32,
/// How many days in advance to create future partitions.
#[serde(default = "default_advance_days")]
pub advance_partition_days: u32,
}

/// Observability settings.
#[derive(Debug, Clone, Deserialize)]
pub struct ObservabilityConfig {
/// Log level filter.
#[serde(default = "default_log_level")]
pub log_level: String,
}

impl ArchiverConfig {
/// Loads configuration from environment variables with `ARCHIVER__` prefix.
///
/// # Errors
///
/// Returns an error if required variables are missing or invalid.
pub fn from_env() -> std::result::Result<Self, config::ConfigError> {
config::Config::builder()
.add_source(
config::Environment::with_prefix("ARCHIVER")
.separator("__")
.try_parsing(true),
)
.build()?
.try_deserialize()
}
}

impl Default for RetentionConfig {
fn default() -> Self {
Self {
hot_retention_days: default_hot_retention(),
advance_partition_days: default_advance_days(),
}
}
}

impl Default for ObservabilityConfig {
fn default() -> Self {
Self {
log_level: default_log_level(),
}
}
}

fn default_max_connections() -> u32 {
4
}
fn default_connect_timeout() -> u64 {
5
}
fn default_s3_prefix() -> String {
"audit-archive/".to_string()
}
fn default_s3_region() -> String {
"us-east-1".to_string()
}
fn default_hot_retention() -> u32 {
90
}
fn default_advance_days() -> u32 {
7
}
fn default_log_level() -> String {
"info".to_string()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_retention_defaults() {
let retention = RetentionConfig::default();
assert_eq!(retention.hot_retention_days, 90);
assert_eq!(retention.advance_partition_days, 7);
}

#[test]
fn test_observability_defaults() {
let obs = ObservabilityConfig::default();
assert_eq!(obs.log_level, "info");
}
}
32 changes: 32 additions & 0 deletions services/audit-archiver/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//! Error types for the audit archiver service.

/// Errors that can occur during audit archival operations.
#[derive(Debug, thiserror::Error)]
pub enum ArchiverError {
/// Database error during partition management or data export.
#[error("database error: {0}")]
Database(#[from] sqlx::Error),

/// Failed to serialize audit events to Parquet format.
#[error("parquet serialization error: {0}")]
Parquet(#[from] parquet::errors::ParquetError),

/// Failed to build Arrow record batch from audit rows.
#[error("arrow error: {0}")]
Arrow(#[from] arrow::error::ArrowError),

/// Cold storage upload or verification failed.
#[error("storage error: {0}")]
Storage(String),

/// Configuration is invalid or missing required fields.
#[error("configuration error: {0}")]
Config(String),

/// The generated partition name failed validation.
#[error("invalid partition name: {0}")]
InvalidPartitionName(String),
}

/// Result alias for archiver operations.
pub type Result<T> = std::result::Result<T, ArchiverError>;
Loading
Loading