Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions crates/solti-observe/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,28 @@
//! Observability primitives for the Solti task execution system.
//!
//! ## Modules
//!
//! | Module | Feature | Description |
//! |------------------------------------|-----------------|----------------------------------------------------------|
//! | [`LoggerConfig`] / [`init_logger`] | - | Configurable tracing subscriber (text / json / journald) |
//! | [`TracingEventSubscriber`] | `subscriber` | Logs taskvisor events via [`tracing`] |
//! | [`timezone_sync`] | `timezone-sync` | Periodic task that re-detects the local UTC offset |
//!
//! ## Quick start
//!
//! ```rust,ignore
//! use solti_observe::{LoggerConfig, LoggerLevel, init_local_offset, init_logger};
//!
//! // Must be called before spawning threads (tokio runtime).
//! init_local_offset();
//!
//! let cfg = LoggerConfig {
//! level: LoggerLevel::new("info")?,
//! ..Default::default()
//! };
//! init_logger(&cfg)?;
//! ```

mod logger;
pub use logger::*;

Expand Down
2 changes: 0 additions & 2 deletions crates/solti-observe/src/logger/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,3 @@ pub enum LoggerError {
#[error("Invalid log level: {0}")]
InvalidLevel(String),
}

pub type LoggerResult<T> = Result<T, LoggerError>;
22 changes: 10 additions & 12 deletions crates/solti-observe/src/logger/log.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,38 @@
use tracing::Subscriber;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt};

use crate::logger::{
config::LoggerConfig,
error::{LoggerError, LoggerResult},
object::LoggerRfc3339,
};
use crate::logger::{config::LoggerConfig, error::LoggerError, object::LoggerRfc3339};

/// Initializes text logger.
pub fn logger_text(cfg: &LoggerConfig) -> LoggerResult<()> {
pub fn logger_text(cfg: &LoggerConfig) -> Result<(), LoggerError> {
let filter = cfg.level.to_env_filter();
let timer = LoggerRfc3339::new(cfg.tz);
let fmt_layer = fmt::layer()
.with_ansi(cfg.should_use_color())
.with_target(cfg.with_targets)
.with_timer(LoggerRfc3339);
.with_timer(timer);

let subscriber = tracing_subscriber::registry().with(filter).with(fmt_layer);
init_subscriber(subscriber)
}

/// Initializes JSON (structured) logger.
pub fn logger_json(cfg: &LoggerConfig) -> LoggerResult<()> {
pub fn logger_json(cfg: &LoggerConfig) -> Result<(), LoggerError> {
let filter = cfg.level.to_env_filter();
let timer = LoggerRfc3339::new(cfg.tz);
let fmt_layer = fmt::layer()
.json()
.with_ansi(false)
.with_target(cfg.with_targets)
.with_timer(LoggerRfc3339);
.with_timer(timer);

let subscriber = tracing_subscriber::registry().with(filter).with(fmt_layer);
init_subscriber(subscriber)
}

/// Initializes journald logger (Linux only).
#[cfg(target_os = "linux")]
pub fn logger_journald(cfg: &LoggerConfig) -> LoggerResult<()> {
pub fn logger_journald(cfg: &LoggerConfig) -> Result<(), LoggerError> {
let filter = cfg.level.to_env_filter();
let journald =
tracing_journald::layer().map_err(|e| LoggerError::JournaldInitFailed(e.to_string()))?;
Expand All @@ -45,12 +43,12 @@ pub fn logger_journald(cfg: &LoggerConfig) -> LoggerResult<()> {

/// Stub for journald on non-Linux platforms.
#[cfg(not(all(target_os = "linux")))]
pub fn logger_journald(_cfg: &LoggerConfig) -> LoggerResult<()> {
pub fn logger_journald(_cfg: &LoggerConfig) -> Result<(), LoggerError> {
Err(LoggerError::JournaldNotSupported)
}

/// Installs the subscriber as the global default.
fn init_subscriber<S>(subscriber: S) -> LoggerResult<()>
fn init_subscriber<S>(subscriber: S) -> Result<(), LoggerError>
where
S: Subscriber + Send + Sync + 'static,
{
Expand Down
5 changes: 3 additions & 2 deletions crates/solti-observe/src/logger/object/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use serde::{Deserialize, Serialize, Serializer};
use crate::logger::LoggerError;

/// Output format for the logger.
/// - `Text` β€” human-friendly, colored (when enabled) text logs.
/// - `Json` β€” structured JSON logs for machines / log collectors.
/// - `Text` - colored (when enabled) text logs.
/// - `Json` β€” structured JSON logs.
/// - `Journald` β€” logs are sent to systemd-journald (Linux only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
Expand All @@ -29,6 +29,7 @@ impl FromStr for LoggerFormat {
type Err = LoggerError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let norm = s.trim().to_ascii_lowercase();

match norm.as_str() {
"text" => Ok(Self::Text),
"json" => Ok(Self::Json),
Expand Down
42 changes: 26 additions & 16 deletions crates/solti-observe/src/logger/object/rfc3339.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,40 @@
use std::fmt;

use time::{OffsetDateTime, format_description::well_known::Rfc3339};
use time::{OffsetDateTime, UtcOffset, format_description::well_known::Rfc3339};
use tracing_subscriber::fmt::{format::Writer, time::FormatTime};

use crate::logger::object::timezone::get_or_detect_local_offset;
use crate::logger::object::timezone::{LoggerTimeZone, get_or_detect_local_offset};

/// Dynamic RFC3339 timestamp formatter with local timezone support.
/// Dynamic RFC3339 timestamp formatter that respects [`LoggerTimeZone`].
///
/// Reads the current local offset on every invocation, allowing timezone
/// changes to be reflected in logs without subscriber reinitialization.
///
/// Falls back to UTC if offset detection fails.
/// - [`LoggerTimeZone::Utc`] β€” always formats as `…+00:00`
/// - [`LoggerTimeZone::Local`] β€” reads the cached local offset on every call,
/// so DST changes picked up by [`crate::timezone_sync`] are reflected
/// without subscriber reinitialization.
#[derive(Debug, Clone, Copy)]
pub struct LoggerRfc3339;
pub struct LoggerRfc3339 {
tz: LoggerTimeZone,
}

impl LoggerRfc3339 {
/// Create a formatter for the given timezone setting.
pub fn new(tz: LoggerTimeZone) -> Self {
Self { tz }
}
}

impl FormatTime for LoggerRfc3339 {
fn format_time(&self, w: &mut Writer<'_>) -> fmt::Result {
let local = OffsetDateTime::now_utc().to_offset(get_or_detect_local_offset());
let offset = match self.tz {
LoggerTimeZone::Utc => UtcOffset::UTC,
LoggerTimeZone::Local => get_or_detect_local_offset(),
};

let ts = OffsetDateTime::now_utc().to_offset(offset);

match local.format(&Rfc3339) {
Ok(ts) => {
write!(w, "{} ", ts)
}
Err(_) => {
write!(w, "<invalid-time> ")
}
match ts.format(&Rfc3339) {
Ok(s) => write!(w, "{s} "),
Err(_) => write!(w, "<invalid-time> "),
}
}
}
13 changes: 11 additions & 2 deletions crates/solti-observe/src/logger/object/timezone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,14 @@ pub fn init_local_offset() {
if let Ok(mut guard) = LOCAL_OFFSET.write() {
*guard = offset;
}
let _ = INIT_DONE.set(());
}

/// Synchronizes local offset.
/// Re-detects the system UTC offset and updates the global cache.
///
/// Called periodically by the [`crate::timezone_sync`] task.
/// If detection fails (common in multi-threaded contexts on Unix),
/// the existing cached offset is preserved and no error is returned.
pub(crate) fn sync_local_offset() -> Result<(), LoggerError> {
match UtcOffset::current_local_offset() {
Ok(new_offset) => {
Expand All @@ -118,7 +123,11 @@ pub(crate) fn sync_local_offset() -> Result<(), LoggerError> {
}
}

/// Returns current local offset for timestamp formatting.
/// Returns the cached local offset for timestamp formatting.
///
/// On first call (if [`init_local_offset`] was never called) attempts a
/// one-shot detection. On failure prints a warning to stderr and falls
/// back to UTC.
pub(crate) fn get_or_detect_local_offset() -> UtcOffset {
INIT_DONE.get_or_init(|| match UtcOffset::current_local_offset() {
Ok(detected) => {
Expand Down
51 changes: 30 additions & 21 deletions crates/solti-observe/src/logger/tasks/timezone_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,33 @@ use tracing::debug;

use crate::logger::object::timezone::sync_local_offset;

/// Logical slot name used for timezone sync task.
///
/// Ensures that only one sync operation runs at any time.
/// Logical slot name for the timezone sync task.
pub const TZ_SYNC_SLOT: &str = "solti-logger-tz-sync";

/// Per-attempt timeout in milliseconds.
///
/// If syncing the timezone offset takes longer than this limit,
/// the task is considered failed and restart/backoff logic applies.
/// Per-attempt timeout (ms).
pub const TZ_SYNC_TIMEOUT_MS: u64 = 60_000;

/// Delay between successful sync attempts in milliseconds.
///
/// This defines the periodic nature of the timezone-sync task.
pub const TZ_SYNC_RETRY_MS: u64 = 3_600_000;
/// Interval between successful sync attempts (ms).
pub const TZ_SYNC_PERIOD_MS: u64 = 3_600_000;

/// Initial backoff delay on failure (ms).
const BACKOFF_FIRST_MS: u64 = 5_000;

/// Build the timezone sync task and its model-level specification.
/// Maximum backoff delay on repeated failures (ms).
const BACKOFF_MAX_MS: u64 = 300_000;

/// Backoff multiplier per consecutive failure.
const BACKOFF_FACTOR: f64 = 2.0;

/// Builds the timezone sync task and its supervision specification.
///
/// Returns a `(TaskRef, CreateSpec)` pair ready to be submitted to a [`taskvisor::Supervisor`] via `submit_with_task`.
///
/// Returns:
/// - [`TaskRef`] β€” executable task body.
/// - [`CreateSpec`] β€” restart/backoff/admission policy and slot binding.
/// # Behaviour
/// - On **success**: next run is scheduled after [`TZ_SYNC_PERIOD_MS`].
/// - On **failure**: exponential backoff from 5 s to 5 min with equal jitter.
/// - **Admission**: [`AdmissionStrategy::Replace`] β€” duplicate submissions
/// cancel the in-flight attempt and reschedule.
pub fn timezone_sync() -> (TaskRef, CreateSpec) {
let task: TaskRef = TaskFn::arc(TZ_SYNC_SLOT, |ctx: CancellationToken| async move {
debug!("timezone sync started");
Expand All @@ -49,18 +55,21 @@ pub fn timezone_sync() -> (TaskRef, CreateSpec) {

let backoff = BackoffStrategy {
jitter: JitterStrategy::Equal,
first_ms: TZ_SYNC_TIMEOUT_MS,
max_ms: TZ_SYNC_TIMEOUT_MS,
factor: 1.0,
first_ms: BACKOFF_FIRST_MS,
max_ms: BACKOFF_MAX_MS,
factor: BACKOFF_FACTOR,
};

let spec = CreateSpec {
restart: RestartStrategy::periodic(TZ_SYNC_PERIOD_MS),
slot: TZ_SYNC_SLOT.to_string(),
timeout_ms: TZ_SYNC_TIMEOUT_MS,
restart: RestartStrategy::periodic(TZ_SYNC_RETRY_MS),
backoff,

admission: AdmissionStrategy::Replace,
kind: TaskKind::None,
labels: RunnerLabels::default(),
kind: TaskKind::None,
backoff,
};

(task, spec)
}
Loading
Loading