From 9cc15b90a809a1ba3b713f4ad2f14fa5caa5d94c Mon Sep 17 00:00:00 2001 From: Chenrui Wu Date: Sat, 31 Jan 2026 20:20:23 +0800 Subject: [PATCH 1/2] feat(server): implement log rotation based on size and retention - implemented log rotation based on size and retention as the title; - implemented configurable attributions and imported breaking changes; - added units and integration test in logger.rs and intergration mod; - added documentations and imported new dependencies, etc. Details can be looked up at Apache Iggy apache#2452 --- Cargo.lock | 21 + DEPENDENCIES.md | 2 + core/common/src/utils/byte_size.rs | 7 + core/common/src/utils/duration.rs | 29 ++ core/integration/src/test_server.rs | 4 +- .../server/scenarios/log_rotation_scenario.rs | 382 ++++++++++++++ .../integration/tests/server/scenarios/mod.rs | 1 + core/integration/tests/server/specific.rs | 5 +- core/server/Cargo.toml | 2 + core/server/config.toml | 22 +- core/server/src/configs/defaults.rs | 9 +- core/server/src/configs/displays.rs | 8 +- core/server/src/configs/system.rs | 7 +- core/server/src/configs/validators.rs | 47 +- core/server/src/log/logger.rs | 475 +++++++++++++++++- foreign/cpp/tests/e2e/server.toml | 26 +- 16 files changed, 1009 insertions(+), 38 deletions(-) create mode 100644 core/integration/tests/server/scenarios/log_rotation_scenario.rs diff --git a/Cargo.lock b/Cargo.lock index b43d62fe0d..f2806dd022 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3302,6 +3302,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "fs2" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -7642,6 +7652,15 @@ dependencies = [ "byteorder", ] +[[package]] +name = "rolling-file" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8395b4f860856b740f20a296ea2cd4d823e81a2658cf05ef61be22916026a906" +dependencies = [ + "chrono", +] + [[package]] name = "route-recognizer" version = "0.3.1" @@ -8273,6 +8292,7 @@ dependencies = [ "figlet-rs", "figment", "flume 0.12.0", + "fs2", "futures", "hash32 1.0.0", "human-repr", @@ -8297,6 +8317,7 @@ dependencies = [ "reqwest", "ringbuffer", "rmp-serde", + "rolling-file", "rust-embed", "rustls", "rustls-pemfile", diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index a7fd8c256e..924cbd10d3 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -293,6 +293,7 @@ foreign-types-shared: 0.1.1, "Apache-2.0 OR MIT", form_urlencoded: 1.2.2, "Apache-2.0 OR MIT", fragile: 2.0.1, "Apache-2.0", fs-err: 3.2.2, "Apache-2.0 OR MIT", +fs2: 0.4.3, "Apache-2.0 OR MIT", fs_extra: 1.3.0, "MIT", fsevent-sys: 4.1.0, "MIT", funty: 2.0.0, "MIT", @@ -661,6 +662,7 @@ rmcp-macros: 0.14.0, "Apache-2.0", rmp: 0.8.15, "MIT", rmp-serde: 1.3.1, "MIT", roaring: 0.11.3, "Apache-2.0 OR MIT", +rolling-file: 0.2.0, "Apache-2.0 OR MIT", route-recognizer: 0.3.1, "MIT", rsa: 0.9.10, "Apache-2.0 OR MIT", rust-embed: 8.11.0, "MIT", diff --git a/core/common/src/utils/byte_size.rs b/core/common/src/utils/byte_size.rs index 2ccf58beed..230f8b4846 100644 --- a/core/common/src/utils/byte_size.rs +++ b/core/common/src/utils/byte_size.rs @@ -137,6 +137,13 @@ impl IggyByteSize { format!("{:.2}", self.0.get_appropriate_unit(UnitType::Decimal)) } + /// Subtract another IggyByteSize value, return 0 if the result is negative. + pub fn saturating_sub(&self, other: &Self) -> Self { + let self_bytes = self.as_bytes_u64(); + let other_bytes = other.as_bytes_u64(); + IggyByteSize::new(self_bytes.saturating_sub(other_bytes)) + } + /// Calculates the throughput based on the provided duration and returns a human-readable string. pub(crate) fn _as_human_throughput_string(&self, duration: &IggyDuration) -> String { if duration.is_zero() { diff --git a/core/common/src/utils/duration.rs b/core/common/src/utils/duration.rs index cb256808ad..aa6c796a7e 100644 --- a/core/common/src/utils/duration.rs +++ b/core/common/src/utils/duration.rs @@ -29,6 +29,35 @@ use std::{ pub const SEC_IN_MICRO: u64 = 1_000_000; +/// A struct for representing time durations with various utility functions. +/// +/// This struct wraps `std::time::Duration` and uses the `humantime` crate for parsing and formatting +/// human-readable duration strings. It also implements serialization and deserialization via the `serde` crate. +/// +/// # Example +/// +/// ``` +/// use iggy_common::IggyDuration; +/// use std::str::FromStr; +/// +/// let duration = IggyDuration::from(3661_000_000_u64); // 3661 seconds in microseconds +/// assert_eq!(3661, duration.as_secs()); +/// assert_eq!("1h 1m 1s", duration.as_human_time_string()); +/// assert_eq!("1h 1m 1s", format!("{}", duration)); +/// +/// let duration = IggyDuration::from(0_u64); +/// assert_eq!(0, duration.as_secs()); +/// assert_eq!("0s", duration.as_human_time_string()); +/// assert_eq!("0s", format!("{}", duration)); +/// +/// let duration = IggyDuration::from_str("1h 1m 1s").unwrap(); +/// assert_eq!(3661, duration.as_secs()); +/// assert_eq!("1h 1m 1s", duration.as_human_time_string()); +/// assert_eq!("1h 1m 1s", format!("{}", duration)); +/// +/// let duration = IggyDuration::from_str("unlimited").unwrap(); +/// assert_eq!(0, duration.as_secs()); +/// ``` #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub struct IggyDuration { duration: Duration, diff --git a/core/integration/src/test_server.rs b/core/integration/src/test_server.rs index a58ad85a3b..81c68b6197 100644 --- a/core/integration/src/test_server.rs +++ b/core/integration/src/test_server.rs @@ -120,9 +120,9 @@ impl TestServer { Ok(parallelism) => { let available_cpus = parallelism.get(); if available_cpus >= 4 { - let mut rng = rand::thread_rng(); + let mut rng = rand::rng(); let max_start = available_cpus - 4; - let start = rng.gen_range(0..=max_start); + let start = rng.random_range(0..=max_start); let end = start + 4; format!("{}..{}", start, end) } else { diff --git a/core/integration/tests/server/scenarios/log_rotation_scenario.rs b/core/integration/tests/server/scenarios/log_rotation_scenario.rs new file mode 100644 index 0000000000..f5db3ee042 --- /dev/null +++ b/core/integration/tests/server/scenarios/log_rotation_scenario.rs @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::server::scenarios::{PARTITIONS_COUNT, STREAM_NAME, TOPIC_NAME}; +use iggy::prelude::*; +use iggy_common::{ + CompressionAlgorithm, Identifier, IggyByteSize, IggyDuration, IggyExpiry, MaxTopicSize, +}; +use integration::tcp_client::TcpClientFactory; +use integration::test_server::{ClientFactory, IpAddrKind, TestServer, login_root}; +use once_cell::sync::Lazy; +use serial_test::parallel; +use std::collections::HashMap; +use std::path::Path; +use std::time::Duration; +use test_case::test_matrix; +use tokio::fs; +use tokio::sync::Mutex; +use tokio::time::{sleep, timeout}; + +const RETENTION_SECS: u64 = 30; +const OPERATION_TIMEOUT_SECS: u64 = 10; +const OPERATION_LOOP_COUNT: usize = 300; +const FROM_BYTES_TO_KB: u64 = 1000; +const IGGY_LOG_BASE_NAME: &str = "iggy-server.log"; + +static PRINT_LOCK: Lazy> = Lazy::new(|| Mutex::new(())); + +#[derive(Debug)] +pub struct LogRotationTestConfig { + pub name: String, + pub max_single_log_size: IggyByteSize, + pub max_total_log_size: IggyByteSize, + pub rotation_check_interval: IggyDuration, + pub retention: IggyDuration, +} + +fn config_regular_rotation() -> LogRotationTestConfig { + LogRotationTestConfig { + name: "log_regular_rotation".to_string(), + max_single_log_size: IggyByteSize::new(100_000), + max_total_log_size: IggyByteSize::new(400_000), + rotation_check_interval: IggyDuration::ONE_SECOND, + retention: IggyDuration::new_from_secs(RETENTION_SECS), + } +} + +fn config_unlimited_size() -> LogRotationTestConfig { + LogRotationTestConfig { + name: "log_unlimited_size".to_string(), + max_single_log_size: IggyByteSize::new(0), + max_total_log_size: IggyByteSize::new(400_000), + rotation_check_interval: IggyDuration::ONE_SECOND, + retention: IggyDuration::new_from_secs(RETENTION_SECS), + } +} + +fn config_unlimited_archives() -> LogRotationTestConfig { + LogRotationTestConfig { + name: "log_unlimited_archives".to_string(), + max_single_log_size: IggyByteSize::new(100_000), + max_total_log_size: IggyByteSize::new(0), + rotation_check_interval: IggyDuration::ONE_SECOND, + retention: IggyDuration::new_from_secs(RETENTION_SECS), + } +} + +fn config_special_scenario() -> LogRotationTestConfig { + LogRotationTestConfig { + name: "log_special_scenario".to_string(), + max_single_log_size: IggyByteSize::new(0), + max_total_log_size: IggyByteSize::new(0), + rotation_check_interval: IggyDuration::ONE_SECOND, + retention: IggyDuration::new_from_secs(RETENTION_SECS), + } +} + +#[test_matrix( + [config_regular_rotation(), config_unlimited_size(), config_unlimited_archives(), config_special_scenario()] +)] +#[tokio::test] +#[parallel] +async fn log_rotation_should_be_valid(present_log_config: LogRotationTestConfig) { + let mut extra_envs = HashMap::new(); + extra_envs.insert( + "IGGY_SYSTEM_LOGGING_MAX_FILE_SIZE".to_string(), + format!("{}", present_log_config.max_single_log_size), + ); + extra_envs.insert( + "IGGY_SYSTEM_LOGGING_MAX_TOTAL_SIZE".to_string(), + format!("{}", present_log_config.max_total_log_size), + ); + extra_envs.insert( + "IGGY_SYSTEM_LOGGING_ROTATION_CHECK_INTERVAL".to_string(), + format!("{}", present_log_config.rotation_check_interval), + ); + extra_envs.insert( + "IGGY_SYSTEM_LOGGING_RETENTION".to_string(), + format!("{}", present_log_config.retention), + ); + + let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4); + test_server.start(); + + let server_addr = test_server.get_raw_tcp_addr().unwrap(); + let client_factory = TcpClientFactory { + server_addr, + ..Default::default() + }; + + let log_dir = format!("{}/logs", test_server.get_local_data_path()); + + test_server.assert_running(); + run(&client_factory, &log_dir, present_log_config).await; +} + +pub async fn run( + client_factory: &dyn ClientFactory, + log_dir: &str, + present_log_config: LogRotationTestConfig, +) { + let done_status = false; + let present_log_test_title = present_log_config.name.clone(); + let log_path = Path::new(log_dir); + assert!( + log_path.exists() && log_path.is_dir(), + "failed::no_such_directory => {log_dir}", + ); + + let client = init_valid_client(client_factory).await; + assert!( + client.is_ok(), + "failed::client_initialize => {:?}", + client.as_ref().err(), + ); + + let generator_result = generate_enough_logs(client.as_ref().unwrap()).await; + assert!( + generator_result.is_ok(), + "failed::generate_logs => {:?}", + generator_result.as_ref().err(), + ); + + nocapture_observer(log_path, &present_log_test_title, done_status).await; + sleep(present_log_config.rotation_check_interval.get_duration()).await; + + let rotation_result = validate_log_rotation_rules(log_path, present_log_config).await; + assert!( + rotation_result.is_ok(), + "failed::rotation_check => {:?}", + rotation_result.as_ref().err(), + ); + + nocapture_observer(log_path, &present_log_test_title, !done_status).await; +} + +async fn init_valid_client(client_factory: &dyn ClientFactory) -> Result { + let operation_timeout = IggyDuration::new(Duration::from_secs(OPERATION_TIMEOUT_SECS)); + let client_wrapper = timeout( + operation_timeout.get_duration(), + client_factory.create_client(), + ) + .await + .map_err(|_| "ClientWrapper creation timed out")?; + + timeout(operation_timeout.get_duration(), client_wrapper.connect()) + .await + .map_err(|_| "Client connection timed out")? + .map_err(|e| format!("Client connection failed: {e:?}"))?; + + let client = IggyClient::create(client_wrapper, None, None); + timeout(operation_timeout.get_duration(), login_root(&client)) + .await + .map_err(|e| format!("Root user login timed out: {e:?}"))?; + + Ok(client) +} + +/// Loop through the creation and deletion of streams / topics +/// to trigger server business operations, thereby generating +/// sufficient log data to meet the trigger conditions for the +/// log rotation test. +async fn generate_enough_logs(client: &IggyClient) -> Result<(), String> { + for i in 0..OPERATION_LOOP_COUNT { + let stream_name = format!("{STREAM_NAME}-{i}"); + let topic_name = format!("{TOPIC_NAME}-{i}"); + + client + .create_stream(&stream_name) + .await + .map_err(|e| format!("Failed to create {stream_name}: {e}"))?; + + let stream_identifier = Identifier::named(&stream_name) + .map_err(|e| format!("Failed to create stream label {e}"))?; + + client + .create_topic( + &stream_identifier, + &topic_name, + PARTITIONS_COUNT, + CompressionAlgorithm::default(), + None, + IggyExpiry::NeverExpire, + MaxTopicSize::Unlimited, + ) + .await + .map_err(|e| format!("Failed to create topic {topic_name}: {e}"))?; + + client + .delete_stream(&stream_identifier) + .await + .map_err(|e| format!("Failed to remove stream {stream_name}: {e}"))?; + } + + Ok(()) +} + +async fn validate_log_rotation_rules( + log_dir: &Path, + present_log_config: LogRotationTestConfig, +) -> Result<(), String> { + let log_dir_display = log_dir.display(); + let mut dir_entries = fs::read_dir(log_dir) + .await + .map_err(|e| format!("Failed to read log directory '{log_dir_display}': {e}",))?; + + let mut valid_log_files = Vec::new(); + while let Some(entry) = dir_entries.next_entry().await.map_err(|e| { + format!("Failed to read next entry in log directory '{log_dir_display}': {e}",) + })? { + let file_path = entry.path(); + + if !file_path.is_file() { + continue; + } + + let file_name = match file_path.file_name().and_then(|name| name.to_str()) { + Some(name) => name, + None => continue, + }; + + if is_valid_iggy_log_file(file_name) { + valid_log_files.push(file_path); + } + } + + if valid_log_files.is_empty() { + return Err(format!( + "No valid Iggy log files found in directory '{}'. Expected files matching '{}' (original) or '{}.' (archived).", + log_dir_display, IGGY_LOG_BASE_NAME, IGGY_LOG_BASE_NAME + )); + } + + // logger.rs => tracing_appender::non_blocking(file_appender); + // The delay in log writing in Iggy mainly depends on the processing speed + // of background threads and the operating system's I/O scheduling, which + // means that the actual size of written logs may be slightly larger than + // expected. So there ignores tiny minor overflow by comparing integer KB + // values instead of exact bytes. + + let mut total_log_size = IggyByteSize::new(0); + let max_single_kb = present_log_config.max_single_log_size.as_bytes_u64() / FROM_BYTES_TO_KB; + let max_total_kb = present_log_config.max_total_log_size.as_bytes_u64() / FROM_BYTES_TO_KB; + let present_file_amount = valid_log_files.len(); + + if max_single_kb == 0 && present_file_amount > 1 { + return Err(format!( + "Log size should be unlimited if `max_file_size` is set to 0, found {} files.", + present_file_amount, + )); + } else if max_total_kb == 0 && max_single_kb != 0 { + if present_file_amount as u64 <= 1 { + return Err(format!( + "Archives should be unlimited if `max_total_size` is set to 0, found {} files.", + present_file_amount, + )); + } + } else { + for log_file in valid_log_files { + let file_metadata = fs::metadata(&log_file).await.map_err(|e| { + format!( + "Failed to get metadata for file '{}': {}", + log_file.display(), + e + ) + })?; + + let file_size_bytes = file_metadata.len(); + if max_single_kb != 0 { + let current_single_kb = file_size_bytes / FROM_BYTES_TO_KB; + if current_single_kb > max_single_kb { + return Err(format!( + "Single log file exceeds maximum allowed size: '{}'", + log_file.display() + )); + } + } + + total_log_size += IggyByteSize::new(file_size_bytes); + } + } + + let current_total_kb = total_log_size.as_bytes_u64() / FROM_BYTES_TO_KB; + if max_total_kb != 0 && max_single_kb != 0 && current_total_kb > max_total_kb { + return Err(format!( + "Total log size exceeds maximum:{} expected: '{}'KB", + log_dir_display, max_total_kb, + )); + } else if max_total_kb != 0 + && max_single_kb != 0 + && present_file_amount as u64 > max_total_kb / max_single_kb + { + return Err(format!( + "Total log file amount exceeds:{} expected: '{}'", + log_dir_display, + max_total_kb / max_single_kb, + )); + } + + Ok(()) +} + +fn is_valid_iggy_log_file(file_name: &str) -> bool { + if file_name == IGGY_LOG_BASE_NAME { + return true; + } + + let archive_log_prefix = format!("{}.", IGGY_LOG_BASE_NAME); + if file_name.starts_with(&archive_log_prefix) { + let numeric_suffix = &file_name[archive_log_prefix.len()..]; + return !numeric_suffix.is_empty() && numeric_suffix.chars().all(|c| c.is_ascii_digit()); + } + false +} + +/// Solely for manual && direct observation of file status to +/// reduce debugging overhead. Due to the different nature of +/// asynchronous tasks, the output order of scenarios may be +/// mixed, but the mutex can prevent messy terminal output. +async fn nocapture_observer(log_path: &Path, title: &str, done: bool) -> () { + let _lock = PRINT_LOCK.lock().await; + eprintln!( + "\n{:>4}\x1b[33m Size\x1b[0m <-> \x1b[33mPath\x1b[0m && server::specific::log_rotation_should_be_valid::\x1b[33m{}\x1b[0m", + "", title, + ); + + let mut dir_entries = fs::read_dir(log_path).await.unwrap(); + while let Some(entry) = dir_entries.next_entry().await.unwrap() { + let file_path = entry.path(); + if file_path.is_file() { + let meta = fs::metadata(&file_path).await.unwrap(); + eprintln!( + "{:>6} KB <-> {:<50}", + meta.len() / FROM_BYTES_TO_KB, + file_path.display() + ); + } + } + + if done { + eprintln!( + "\n\x1b[32m [Passed]\x1b[0m <-> {:<25} <{:->45}>\n", + title, "", + ); + } +} diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs index 3253493c49..b04866f372 100644 --- a/core/integration/tests/server/scenarios/mod.rs +++ b/core/integration/tests/server/scenarios/mod.rs @@ -29,6 +29,7 @@ pub mod create_message_payload; pub mod cross_protocol_pat_scenario; pub mod delete_segments_scenario; pub mod encryption_scenario; +pub mod log_rotation_scenario; pub mod message_headers_scenario; pub mod message_size_scenario; pub mod offset_scenario; diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs index 3e68494884..afd4a3a0c5 100644 --- a/core/integration/tests/server/specific.rs +++ b/core/integration/tests/server/specific.rs @@ -1,4 +1,5 @@ -/* Licensed to the Apache Software Foundation (ASF) under one +/* + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -134,7 +135,7 @@ async fn tcp_tls_self_signed_scenario_should_be_valid() { .await .expect("Failed to connect TLS client with self-signed cert"); - let client = iggy::clients::client::IggyClient::create(ClientWrapper::Iggy(client), None, None); + let client = IggyClient::create(ClientWrapper::Iggy(client), None, None); tcp_tls_scenario::run(&client).await; } diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 6789e6cb0c..7f84a50b68 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -66,6 +66,7 @@ error_set = { workspace = true } figlet-rs = { workspace = true } figment = { workspace = true } flume = { workspace = true } +fs2 = "0.4.3" futures = { workspace = true } hash32 = { workspace = true } human-repr = { workspace = true } @@ -89,6 +90,7 @@ rand = { workspace = true } reqwest = { workspace = true, features = ["rustls-tls-no-provider"] } ringbuffer = { workspace = true } rmp-serde = { workspace = true } +rolling-file = "0.2.0" rust-embed = { workspace = true, optional = true } rustls = { workspace = true } rustls-pemfile = { workspace = true } diff --git a/core/server/config.toml b/core/server/config.toml index cec71ddc74..3949b9cd5b 100644 --- a/core/server/config.toml +++ b/core/server/config.toml @@ -363,10 +363,24 @@ level = "info" # When enabled, logs are stored in {system.path}/{system.logging.path} (default: local_data/logs). file_enabled = true -# Maximum size of the log files before rotation. -max_size = "512 MB" - -# Time to retain log files before deletion. +# Maximum size of a single log file before rotation occurs. When a log +# file reaches this size, it will be rotated (closed and a new file +# created). This setting works together with max_total_size to control +# log storage. You can set it to 0 to enable unlimited size of single +# log, but all logs will be written to a single file, thus disabling +# log rotation. Please configure 0 with caution, esp. RUST_LOG > debug +max_file_size = "500 MB" + +# Maximum total size of all log files. When this size is reached, +# the oldest log files will be deleted first. Set it to 0 to allow +# an unlimited number of archived logs. This does not disable time +# based log rotation or per-log-file size limits. +max_total_size = "4 GB" + +# Time interval for checking log rotation status. Avoid less than 1s. +rotation_check_interval = "1 h" + +# Time to retain log files before deletion. Avoid less than 1s, too. retention = "7 days" # Interval for printing system information to the log. diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs index af283442e8..ce9a84ab44 100644 --- a/core/server/src/configs/defaults.rs +++ b/core/server/src/configs/defaults.rs @@ -402,7 +402,14 @@ impl Default for LoggingConfig { path: SERVER_CONFIG.system.logging.path.parse().unwrap(), level: SERVER_CONFIG.system.logging.level.parse().unwrap(), file_enabled: SERVER_CONFIG.system.logging.file_enabled, - max_size: SERVER_CONFIG.system.logging.max_size.parse().unwrap(), + max_file_size: SERVER_CONFIG.system.logging.max_file_size.parse().unwrap(), + max_total_size: SERVER_CONFIG.system.logging.max_total_size.parse().unwrap(), + rotation_check_interval: SERVER_CONFIG + .system + .logging + .rotation_check_interval + .parse() + .unwrap(), retention: SERVER_CONFIG.system.logging.retention.parse().unwrap(), sysinfo_print_interval: SERVER_CONFIG .system diff --git a/core/server/src/configs/displays.rs b/core/server/src/configs/displays.rs index 8973b356af..515011c255 100644 --- a/core/server/src/configs/displays.rs +++ b/core/server/src/configs/displays.rs @@ -170,7 +170,7 @@ impl Display for ServerConfig { } impl Display for MessageSaverConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, "{{ enabled: {}, enforce_fsync: {}, interval: {} }}", @@ -249,11 +249,13 @@ impl Display for LoggingConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{{ path: {}, level: {}, file_enabled: {}, max_size: {}, retention: {} }}", + "{{ path: {}, level: {}, file_enabled: {}, max_file_size: {}, max_total_size: {}, rotation_check_interval: {}, retention: {} }}", self.path, self.level, self.file_enabled, - self.max_size.as_human_string_with_zero_as_unlimited(), + self.max_file_size.as_human_string_with_zero_as_unlimited(), + self.max_total_size.as_human_string_with_zero_as_unlimited(), + self.rotation_check_interval, self.retention ) } diff --git a/core/server/src/configs/system.rs b/core/server/src/configs/system.rs index f8c7423ed9..8e12059671 100644 --- a/core/server/src/configs/system.rs +++ b/core/server/src/configs/system.rs @@ -86,7 +86,12 @@ pub struct LoggingConfig { pub level: String, pub file_enabled: bool, #[config_env(leaf)] - pub max_size: IggyByteSize, + pub max_file_size: IggyByteSize, + #[config_env(leaf)] + pub max_total_size: IggyByteSize, + #[config_env(leaf)] + #[serde_as(as = "DisplayFromStr")] + pub rotation_check_interval: IggyDuration, #[config_env(leaf)] #[serde_as(as = "DisplayFromStr")] pub retention: IggyDuration, diff --git a/core/server/src/configs/validators.rs b/core/server/src/configs/validators.rs index 987b0d86aa..ceb0fe30d0 100644 --- a/core/server/src/configs/validators.rs +++ b/core/server/src/configs/validators.rs @@ -22,7 +22,7 @@ use super::server::{ DataMaintenanceConfig, MessageSaverConfig, MessagesMaintenanceConfig, TelemetryConfig, }; use super::sharding::{CpuAllocation, ShardingConfig}; -use super::system::{CompressionConfig, PartitionConfig}; +use super::system::{CompressionConfig, LoggingConfig, PartitionConfig}; use crate::configs::COMPONENT; use crate::configs::server::{MemoryPoolConfig, PersonalAccessTokenConfig, ServerConfig}; use crate::configs::sharding::NumaTopology; @@ -82,6 +82,13 @@ impl Validatable for ServerConfig { format!("{COMPONENT} (error: {e}) - failed to validate cluster config") })?; + self.system + .logging + .validate() + .error(|e: &ConfigurationError| { + format!("{COMPONENT} (error: {e}) - failed to validate logging config") + })?; + let topic_size = match self.system.topic.max_size { MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()), MaxTopicSize::Unlimited => Ok(u64::MAX), @@ -250,6 +257,44 @@ impl Validatable for PersonalAccessTokenConfig { } } +impl Validatable for LoggingConfig { + fn validate(&self) -> Result<(), ConfigurationError> { + if self.level.is_empty() { + error!("system.logging.level is supposed be configured"); + return Err(ConfigurationError::InvalidConfigurationValue); + } + + if self.retention.as_secs() < 1 { + error!( + "Configured system.logging.retention {} is less than minimum 1 second", + self.retention + ); + return Err(ConfigurationError::InvalidConfigurationValue); + } + + if self.rotation_check_interval.as_secs() < 1 { + error!( + "Configured system.logging.rotation_check_interval {} is less than minimum 1 second", + self.rotation_check_interval + ); + return Err(ConfigurationError::InvalidConfigurationValue); + } + + let max_total_size_unlimited = self.max_total_size.as_bytes_u64() == 0; + if !max_total_size_unlimited + && self.max_file_size.as_bytes_u64() > self.max_total_size.as_bytes_u64() + { + error!( + "Configured system.logging.max_total_size {} is less than system.logging.max_file_size {}", + self.max_total_size, self.max_file_size + ); + return Err(ConfigurationError::InvalidConfigurationValue); + } + + Ok(()) + } +} + impl Validatable for MemoryPoolConfig { fn validate(&self) -> Result<(), ConfigurationError> { if self.enabled && self.size == 0 { diff --git a/core/server/src/log/logger.rs b/core/server/src/log/logger.rs index 4c0e0691eb..b9b4d2aa6c 100644 --- a/core/server/src/log/logger.rs +++ b/core/server/src/log/logger.rs @@ -1,4 +1,5 @@ -/* Licensed to the Apache Software Foundation (ASF) under one +/* + * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -21,6 +22,7 @@ use crate::configs::server::{TelemetryConfig, TelemetryTransport}; use crate::configs::system::LoggingConfig; use crate::log::runtime::CompioRuntime; use crate::server_error::LogError; +use iggy_common::{IggyByteSize, IggyDuration}; use opentelemetry::KeyValue; use opentelemetry::global; use opentelemetry::trace::TracerProvider; @@ -30,10 +32,14 @@ use opentelemetry_sdk::Resource; use opentelemetry_sdk::logs::log_processor_with_async_runtime; use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::trace::span_processor_with_async_runtime; +use rolling_file::{BasicRollingFileAppender, RollingConditionBasic}; +use std::fs; use std::io::{self, Write}; use std::path::PathBuf; +use std::sync::atomic::AtomicBool; use std::sync::{Arc, Mutex}; -use tracing::{info, trace}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tracing::{debug, error, info, trace, warn}; use tracing_appender::non_blocking::WorkerGuard; use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::field::{RecordFields, VisitOutput}; @@ -47,15 +53,16 @@ use tracing_subscriber::{ }; const IGGY_LOG_FILE_PREFIX: &str = "iggy-server.log"; +const ONE_HUNDRED_THOUSAND: u64 = 100_000; // Writer that does nothing struct NullWriter; impl Write for NullWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { + fn write(&mut self, buf: &[u8]) -> io::Result { Ok(buf.len()) } - fn flush(&mut self) -> std::io::Result<()> { + fn flush(&mut self) -> io::Result<()> { Ok(()) } } @@ -63,13 +70,13 @@ impl Write for NullWriter { // Wrapper around Arc>> to implement Write struct VecStringWriter(Arc>>); impl Write for VecStringWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { + fn write(&mut self, buf: &[u8]) -> io::Result { let mut lock = self.0.lock().unwrap(); lock.push(String::from_utf8_lossy(buf).into_owned()); Ok(buf.len()) } - fn flush(&mut self) -> std::io::Result<()> { + fn flush(&mut self) -> io::Result<()> { // Just nop, we don't need to flush anything Ok(()) } @@ -128,6 +135,9 @@ pub struct Logging { otel_traces_reload_handle: Option>, early_logs_buffer: Arc>>, + rotation_should_stop: Arc, + rotation_thread: Option>, + rotation_stop_sender: Arc>>>, } impl Logging { @@ -140,7 +150,10 @@ impl Logging { env_filter_reload_handle: None, otel_logs_reload_handle: None, otel_traces_reload_handle: None, + rotation_thread: None, + rotation_stop_sender: Arc::new(Mutex::new(None)), early_logs_buffer: Arc::new(Mutex::new(vec![])), + rotation_should_stop: Arc::new(AtomicBool::new(false)), } } @@ -213,7 +226,7 @@ impl Logging { // Use the rolling appender to avoid having a huge log file. // Make sure logs are dumped to the file during graceful shutdown. - trace!("Logging config: {}", config); + trace!("Logging config: {config}"); // Reload EnvFilter with config level if RUST_LOG is not set. // Config level supports EnvFilter syntax (e.g., "warn,server=debug,iggy=trace"). @@ -232,7 +245,7 @@ impl Logging { }; // Initialize non-blocking stdout layer - let (non_blocking_stdout, stdout_guard) = tracing_appender::non_blocking(std::io::stdout()); + let (non_blocking_stdout, stdout_guard) = tracing_appender::non_blocking(io::stdout()); let stdout_layer = fmt::Layer::default() .with_ansi(true) .event_format(Self::get_log_format()) @@ -252,9 +265,51 @@ impl Logging { let logs_path = if config.file_enabled { let base_directory = PathBuf::from(base_directory); let logs_subdirectory = PathBuf::from(config.path.clone()); - let logs_path = base_directory.join(logs_subdirectory.clone()); - let file_appender = - tracing_appender::rolling::hourly(logs_path.clone(), IGGY_LOG_FILE_PREFIX); + let logs_subdirectory = logs_subdirectory + .canonicalize() + .unwrap_or(logs_subdirectory); + let logs_path = base_directory.join(logs_subdirectory); + + if let Err(e) = fs::create_dir_all(&logs_path) { + warn!("Failed to create logs directory {logs_path:?}: {e}"); + return Err(LogError::FileReloadFailure); + } + + // Check available disk space, at least 10MB + let min_disk_space: u64 = ONE_HUNDRED_THOUSAND * 100; + if let Ok(available_space) = fs2::available_space(&logs_path) { + if available_space < min_disk_space { + warn!( + "Low disk space for logs. Available: {available_space} bytes, Recommended: {min_disk_space} bytes" + ); + } + } else { + warn!("Failed to check available disk space for logs directory: {logs_path:?}"); + } + + let max_files = Self::calculate_max_files(config.max_total_size, config.max_file_size); + + // If max_file_size == 0, then keep interpreting behavior as same + // as fn IggyByteSize::as_human_string_with_zero_as_unlimited do. + // This will cover all log rotations if expecting unlimited. + let mut condition_builder = RollingConditionBasic::new(); + let max_file_size_bytes = config.max_file_size.as_bytes_u64(); + + if max_file_size_bytes != 0 { + condition_builder = condition_builder.max_size(max_file_size_bytes).hourly(); + } + let condition = condition_builder; + + let file_appender = BasicRollingFileAppender::new( + logs_path.join(IGGY_LOG_FILE_PREFIX), + condition, + max_files, + ) + .map_err(|e| { + error!("Failed to create file appender: {e}"); + LogError::FileReloadFailure + })?; + let (mut non_blocking_file, file_guard) = tracing_appender::non_blocking(file_appender); self.dump_to_file(&mut non_blocking_file); @@ -284,9 +339,11 @@ impl Logging { self.init_telemetry(telemetry_config)?; } + self.rotation_thread = self.install_log_rotation_handler(config, logs_path.as_ref()); + if let Some(logs_path) = logs_path { info!( - "Logging initialized, logs will be stored at: {logs_path:?}. Logs will be rotated hourly. Log filter: {log_filter}." + "Logging initialized, logs will be stored at: {logs_path:?}. Logs will be rotated based on size. Log filter: {log_filter}." ); } else { info!("Logging initialized (file output disabled). Log filter: {log_filter}."); @@ -387,8 +444,8 @@ impl Logging { .expect("Failed to modify telemetry traces layer"); info!( - "Telemetry initialized with service name: {}", - telemetry_config.service_name + "Telemetry initialized with service name: {config_service_name}", + config_service_name = telemetry_config.service_name ); Ok(()) } @@ -397,10 +454,6 @@ impl Logging { Format::default().with_thread_names(true) } - fn _install_log_rotation_handler(&self) { - todo!("Implement log rotation handler based on size and retention time"); - } - fn print_build_info() { if option_env!("IGGY_CI_BUILD") == Some("true") { let hash = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown"); @@ -408,8 +461,7 @@ impl Logging { let rust_version = option_env!("VERGEN_RUSTC_SEMVER").unwrap_or("unknown"); let target = option_env!("VERGEN_CARGO_TARGET_TRIPLE").unwrap_or("unknown"); info!( - "Version: {VERSION}, hash: {}, built at: {} using rust version: {} for target: {}", - hash, built_at, rust_version, target + "Version: {VERSION}, hash: {hash}, built at: {built_at} using rust version: {rust_version} for target: {target}" ); } else { info!( @@ -417,6 +469,281 @@ impl Logging { ) } } + + fn calculate_max_files( + max_total_size_bytes: IggyByteSize, + max_file_size_bytes: IggyByteSize, + ) -> usize { + if max_total_size_bytes == 0 { + // If the third attribute of BasicRollingFileAppender::new() + // is `usize::MAX` then it would reach iter capability. + ONE_HUNDRED_THOUSAND as usize + } else if max_file_size_bytes == 0 { + 1 + } else { + let max_files = + max_total_size_bytes.as_bytes_u64() / max_file_size_bytes.as_bytes_u64(); + max_files.clamp(1, ONE_HUNDRED_THOUSAND) as usize + } + } + + fn install_log_rotation_handler( + &self, + config: &LoggingConfig, + logs_path: Option<&PathBuf>, + ) -> Option> { + let logs_path = logs_path?; + let path = logs_path.to_path_buf(); + let max_total_size = config.max_total_size; + let max_file_size = config.max_file_size; + let rotation_check_interval = config.rotation_check_interval; + let retention = config.retention; + let should_stop = Arc::clone(&self.rotation_should_stop); + + let (tx, rx) = std::sync::mpsc::channel::<()>(); + *self.rotation_stop_sender.lock().unwrap() = Some(tx.clone()); + + let handle = std::thread::Builder::new() + .name("log-rotation".to_string()) + .spawn(move || { + Self::run_log_rotation_loop( + path, + retention, + max_total_size, + max_file_size, + rotation_check_interval, + should_stop, + rx, + ) + }) + .expect("Failed to spawn log rotation thread"); + + Some(handle) + } + + fn run_log_rotation_loop( + path: PathBuf, + retention: IggyDuration, + max_total_size: IggyByteSize, + max_file_size: IggyByteSize, + check_interval: IggyDuration, + should_stop: Arc, + rx: std::sync::mpsc::Receiver<()>, + ) { + loop { + if should_stop.load(std::sync::atomic::Ordering::Relaxed) { + debug!("Log rotation thread detected stop flag, exiting"); + break; + } + + match rx.recv_timeout(check_interval.get_duration()) { + Ok(_) => { + debug!("Log rotation thread received channel stop signal, exiting"); + break; + } + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + Self::cleanup_log_files(&path, retention, max_total_size, max_file_size); + } + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => { + warn!("Log rotation channel disconnected, exiting thread"); + break; + } + } + } + + debug!("Log rotation thread exited gracefully"); + } + + fn read_log_files(logs_path: &PathBuf) -> Vec<(fs::DirEntry, SystemTime, Duration, u64)> { + let entries = match fs::read_dir(logs_path) { + Ok(entries) => entries, + Err(e) => { + warn!("Failed to read log directory {logs_path:?}: {e}"); + return Vec::new(); + } + }; + + let mut file_entries = Vec::new(); + + for entry in entries.flatten() { + if let Some(file_name) = entry.file_name().to_str() { + if file_name == IGGY_LOG_FILE_PREFIX { + // Skip the actively written primary log file + continue; + } + if !file_name.starts_with(IGGY_LOG_FILE_PREFIX) { + continue; + } + } else { + continue; + } + + let metadata = match entry.metadata() { + Ok(metadata) => metadata, + Err(e) => { + warn!( + "Failed to get metadata for {entry_path:?}: {e}", + entry_path = entry.path() + ); + continue; + } + }; + + if !metadata.is_file() { + continue; + } + + let modified = match metadata.modified() { + Ok(modified) => modified, + Err(e) => { + warn!( + "Failed to get modification time for {entry_path:?}: {e}", + entry_path = entry.path() + ); + continue; + } + }; + + let elapsed = match modified.duration_since(UNIX_EPOCH) { + Ok(elapsed) => elapsed, + Err(e) => { + warn!( + "Failed to calculate elapsed time for {entry_path:?}: {e}", + entry_path = entry.path() + ); + continue; + } + }; + + let file_size = metadata.len(); + file_entries.push((entry, modified, elapsed, file_size)); + } + + file_entries + } + + fn cleanup_log_files( + logs_path: &PathBuf, + retention: IggyDuration, + max_total_size: IggyByteSize, + max_file_size: IggyByteSize, + ) { + debug!("Starting log cleanup for directory: {logs_path:?}"); + debug!( + "retention: {retention:?}, max_total_size: {max_total_size} bytes, max_single_file_size: {max_file_size} bytes" + ); + + let mut file_entries = Self::read_log_files(logs_path); + debug!( + "Processed {file_entries_len} log files from directory: {logs_path:?}", + file_entries_len = file_entries.len(), + ); + + let mut removed_files_count = 0; + let cutoff = if !retention.is_zero() { + match SystemTime::now().duration_since(UNIX_EPOCH) { + Ok(now) => Some(now - retention.get_duration()), + Err(e) => { + warn!("Failed to get current time: {e}"); + return; + } + } + } else { + None + }; + + let mut expired_file_indices = Vec::new(); + for (idx, tuple) in file_entries.iter().enumerate() { + let entry = &tuple.0; + let elapsed = &tuple.2; + + let mut need_remove = false; + if let Some(cutoff) = &cutoff + && *elapsed < *cutoff + { + need_remove = true; + debug!( + "Mark old log file for remove: {entry_path:?}", + entry_path = entry.path() + ); + } + + if need_remove { + expired_file_indices.push(idx); + } + } + + for &idx in expired_file_indices.iter().rev() { + let entry = &file_entries[idx]; + if fs::remove_file(entry.0.path()).is_ok() { + debug!( + "Removed log file: {entry_path:?}", + entry_path = entry.0.path() + ); + removed_files_count += 1; + file_entries.remove(idx); + } else { + warn!( + "Failed to remove log file {entry_path:?}", + entry_path = entry.0.path() + ); + } + } + + let total_size = file_entries + .iter() + .map(|(_, _, _, size)| IggyByteSize::new(*size)) + .sum::(); + + let notification = |path: &PathBuf, count: &i32| { + if count > &0 { + info!("Logs cleaned up for directory: {path:?}. Removed {count} files."); + } + }; + + // Setting total max log size to 0 disables only total size + // rotation, with other limits remain effective, including + // per-file size limitation, preserving structural order. + if max_total_size == 0 { + notification(logs_path, &removed_files_count); + return; + } + + if total_size > max_total_size { + file_entries.sort_unstable_by_key(|(_, mtime, _, _)| *mtime); + + let mut remaining_size = total_size; + let mut to_remove = Vec::new(); + + for (idx, (_entry, _, _, fsize)) in file_entries.iter().enumerate() { + if remaining_size <= max_total_size { + break; + } + to_remove.push((idx, *fsize)); + remaining_size = remaining_size.saturating_sub(&IggyByteSize::from(*fsize)); + } + + for (idx, fsize) in to_remove.iter().rev() { + let entry = &file_entries[*idx]; + if fs::remove_file(entry.0.path()).is_ok() { + debug!( + "Removed log file (size control): {:?} freed {:.2} MiB", + entry.0.path(), + *fsize as f64 / 1_048_576.0 + ); + removed_files_count += 1; + file_entries.remove(*idx); + } else { + warn!( + "Failed to remove log file for size control: {:?}", + entry.0.path() + ); + } + } + } + + notification(logs_path, &removed_files_count); + } } impl Default for Logging { @@ -425,6 +752,29 @@ impl Default for Logging { } } +impl Drop for Logging { + fn drop(&mut self) { + self.rotation_should_stop + .store(true, std::sync::atomic::Ordering::Relaxed); + debug!("Set rotation_should_stop to true for log rotation thread"); + + if let Ok(sender_guard) = self.rotation_stop_sender.lock() + && let Some(ref sender) = *sender_guard + { + let _ = sender.send(()).map_err(|e| { + warn!("Failed to send stop signal to log rotation thread: {e}"); + }); + } + + if let Some(handle) = self.rotation_thread.take() { + match handle.join() { + Ok(_) => debug!("Log rotation thread joined successfully"), + Err(e) => warn!("Failed to join log rotation thread: {e:?}"), + } + } + } +} + // This is a workaround for a bug with `with_ansi` setting in tracing // Bug thread: https://github.com/tokio-rs/tracing/issues/3116 struct NoAnsiFields {} @@ -440,3 +790,88 @@ impl<'writer> FormatFields<'writer> for NoAnsiFields { a.finish() } } + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[test] + fn test_log_directory_creation() { + let temp_dir = TempDir::new().expect("Failed to create temporary directory"); + let base_path = temp_dir.path().to_str().unwrap().to_string(); + let log_subdir = "test_logs".to_string(); + + let log_path = PathBuf::from(&base_path).join(&log_subdir); + assert!(!log_path.exists()); + fs::create_dir_all(&log_path).expect("Failed to create log directory"); + assert!(log_path.exists()); + } + + #[test] + fn test_disk_space_check() { + let temp_dir = TempDir::new().expect("Failed to create temporary directory"); + let log_path = temp_dir.path(); + let result = fs2::available_space(log_path); + assert!(result.is_ok()); + + let available_space = result.unwrap(); + assert!(available_space > 0); + } + + #[test] + fn test_calculate_max_files() { + assert_eq!( + Logging::calculate_max_files(IggyByteSize::from(100), IggyByteSize::from(0)), + 1 // Enable unlimited size of single log, the value won't be used actually + ); + assert_eq!( + Logging::calculate_max_files(IggyByteSize::from(0), IggyByteSize::from(100)), + ONE_HUNDRED_THOUSAND as usize // Allow an unlimited number of archived logs + ); + assert_eq!( + Logging::calculate_max_files( + IggyByteSize::from(ONE_HUNDRED_THOUSAND * 10), + IggyByteSize::from(1) + ), + ONE_HUNDRED_THOUSAND as usize // Result should be limited to ONE_HUNDRED_THOUSAND by clamp + ); + assert_eq!( + Logging::calculate_max_files(IggyByteSize::from(1000), IggyByteSize::from(100)), + 10 + ); + assert_eq!( + Logging::calculate_max_files(IggyByteSize::from(500), IggyByteSize::from(100)), + 5 + ); + assert_eq!( + Logging::calculate_max_files(IggyByteSize::from(2000), IggyByteSize::from(100)), + 20 + ); + assert_eq!( + Logging::calculate_max_files(IggyByteSize::from(50), IggyByteSize::from(100)), + 1 + ); + } + + #[test] + fn test_cleanup_log_files_functions() { + use std::time::Duration; + let temp_dir = TempDir::new().expect("Failed to create temporary directory"); + let log_path = temp_dir.path().to_path_buf(); + Logging::cleanup_log_files( + &log_path, + IggyDuration::new(Duration::from_secs(3600)), + IggyByteSize::from(2048 * 1024), + IggyByteSize::from(512 * 1024), + ); + } + + #[test] + fn test_logging_creation() { + let logging = Logging::new(); + assert!(logging.stdout_guard.is_none()); + assert!(logging.file_guard.is_none()); + assert!(logging.env_filter_reload_handle.is_none()); + } +} diff --git a/foreign/cpp/tests/e2e/server.toml b/foreign/cpp/tests/e2e/server.toml index de67367aad..fb5f87b354 100644 --- a/foreign/cpp/tests/e2e/server.toml +++ b/foreign/cpp/tests/e2e/server.toml @@ -270,10 +270,28 @@ path = "logs" # Level of logging detail. Options: "debug", "info", "warn", "error". level = "info" -# Maximum size of the log files before rotation. -max_size = "512 MB" - -# Time to retain log files before deletion. +# Whether to write logs to file. When false, logs are only written to stdout. +# When enabled, logs are stored in {system.path}/{system.logging.path} (default: local_data/logs). +file_enabled = true + +# Maximum size of a single log file before rotation occurs. When a log +# file reaches this size, it will be rotated (closed and a new file +# created). This setting works together with max_total_size to control +# log storage. You can set it to 0 to enable unlimited size of single +# log, but all logs will be written to a single file, thus disabling +# log rotation. Please configure 0 with caution, esp. RUST_LOG > debug +max_file_size = "500 MB" + +# Maximum total size of all log files. When this size is reached, +# the oldest log files will be deleted first. Set it to 0 to allow +# an unlimited number of archived logs. This does not disable time +# based log rotation or per-log-file size limits. +max_total_size = "4 GB" + +# Time interval for checking log rotation status. Avoid less than 1s. +rotation_check_interval = "1 h" + +# Time to retain log files before deletion. Avoid less than 1s, too. retention = "7 days" # Interval for printing system information to the log. From 8ae5018b60430c28c575f43ee64cb17dcb9c879b Mon Sep 17 00:00:00 2001 From: Chenrui Wu Date: Sat, 31 Jan 2026 20:59:03 +0800 Subject: [PATCH 2/2] chore(test): remove the useless pub declaration --- .../integration/tests/server/scenarios/log_rotation_scenario.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/integration/tests/server/scenarios/log_rotation_scenario.rs b/core/integration/tests/server/scenarios/log_rotation_scenario.rs index f5db3ee042..69ccccd418 100644 --- a/core/integration/tests/server/scenarios/log_rotation_scenario.rs +++ b/core/integration/tests/server/scenarios/log_rotation_scenario.rs @@ -130,7 +130,7 @@ async fn log_rotation_should_be_valid(present_log_config: LogRotationTestConfig) run(&client_factory, &log_dir, present_log_config).await; } -pub async fn run( +async fn run( client_factory: &dyn ClientFactory, log_dir: &str, present_log_config: LogRotationTestConfig,