From fe169cb2529119fa8ceb82cecfa7a8c808bb1e3a Mon Sep 17 00:00:00 2001 From: Marten Wijnja Date: Wed, 25 Mar 2026 22:56:19 +0100 Subject: [PATCH 1/2] Make explicit non-passive Sqlite WAL checkpoints The default way SQLite in WAL mode performs checkpoints is by waiting for quiet time where there are no readers nor writers. Any read-connection can block this passive WAL checkpointing from making progress. We have observed that in production when having large workloads with consumers working on hundreds of chunks concurrently. Whereas the WAL is not expected to grow much beyond 4MiB (that's when the passive autocheckpointing kicks in), we saw WALs of > 850MiB. At that point, reads slow down significantly and that can cause a failure for the system as a whole. To mitigate this, as per the SQLite docs, this PR: - Disables the passive autocheckpointing (leaving it enabled conflicts with manual checkpointing which can then result in a SQLITE_BUSY) - Runs active checkpointing _every second_. It is very fast when there is little-to-no work to do, but under load it is expected that we'll hit the '1000 page mutations' (AKA the 4MiB default WAL size) within a second. - We use the 'RESTART' strategy, together with a `journal_size_limit` that ensures that the WAL will be trimmed down to the max of 4MiB. That means we don't always fully truncate, nor do we keep it at 'whatever the max happened to be'. Doing this checkpointing does mean that every second there is a tiny timeframe in which both write-tasks and also all read-tasks will have to wait. This is unlikely to cause any problems. We now explicitly configure the busy timeout to be 5 seconds, which was the prior implicit default of SQLx/Rusqlite for good measure. 
--- .../integer_increment_producer.py | 2 +- opsqueue/app/main.rs | 2 ++ opsqueue/src/db/mod.rs | 34 +++++++++++++++++-- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/libs/opsqueue_python/examples/integer_increment/integer_increment_producer.py b/libs/opsqueue_python/examples/integer_increment/integer_increment_producer.py index 41f08b5..fc99d21 100644 --- a/libs/opsqueue_python/examples/integer_increment/integer_increment_producer.py +++ b/libs/opsqueue_python/examples/integer_increment/integer_increment_producer.py @@ -7,7 +7,7 @@ client = ProducerClient("localhost:3999", "file:///tmp/opsqueue/integer_increment") input_iter = range(0, 1_000_000) -output_iter = client.run_submission(input_iter, chunk_size=1000) +output_iter = client.run_submission(input_iter, chunk_size=10_000) # Now do something with the output: # for x in output_iter: diff --git a/opsqueue/app/main.rs b/opsqueue/app/main.rs index 641c813..b11aec4 100644 --- a/opsqueue/app/main.rs +++ b/opsqueue/app/main.rs @@ -47,6 +47,8 @@ pub async fn async_main() { .expect("Timed out while initiating the database"); moro_local::async_scope!(|scope| { + scope.spawn(db_pool.periodically_checkpoint_wal()); + scope.spawn(opsqueue::server::serve_producer_and_consumer( config, &server_addr, diff --git a/opsqueue/src/db/mod.rs b/opsqueue/src/db/mod.rs index 3c3b570..36e210f 100644 --- a/opsqueue/src/db/mod.rs +++ b/opsqueue/src/db/mod.rs @@ -241,6 +241,28 @@ impl DBPools { pub async fn writer_conn(&self) -> sqlx::Result> { self.write_pool.writer_conn().await } + + /// Performs an explicit, non-passive WAL checkpoint + /// We use the 'RESTART' strategy, which does more work than a passive checkpoint and will briefly block the writer *and* all readers + /// + /// c.f. 
https://www.sqlite.org/pragma.html#pragma_wal_checkpoint + pub async fn perform_explicit_wal_checkpoint(&self) -> sqlx::Result<()> { + let mut conn = self.writer_conn().await?; + let res: (i32, i32, i32) = sqlx::query_as("PRAGMA wal_checkpoint(RESTART);") + .fetch_one(conn.get_inner()) + .await?; + tracing::warn!("WAL checkpoint completed {res:?}"); + Ok(()) + } + + pub async fn periodically_checkpoint_wal(&self) { + const EXPLICIT_WAL_CHECK_INTERVAL: Duration = Duration::from_secs(1); + let mut interval = tokio::time::interval(EXPLICIT_WAL_CHECK_INTERVAL); + loop { + let _ = self.perform_explicit_wal_checkpoint().await; + interval.tick().await; + } + } } /// A connection pool. @@ -366,9 +388,14 @@ fn db_options(database_filename: &str) -> SqliteConnectOptions { .synchronous(SqliteSynchronous::Normal) // Full is not needed because we use WAL mode .busy_timeout(Duration::from_secs(5)) // No query should ever lock for more than 5 seconds .foreign_keys(true) // By default SQLite does not do foreign key checks; we want them to ensure data consistency - .pragma("mmap_size", "134217728") + // Caching: + .pragma("mmap_size", format!("{}", 128 * 1024 * 1024)) .pragma("cache_size", "-1000000") // Cache size of 10⁶ KiB AKA 1GiB (negative value means measured in KiB rather than in multiples of the page size) - // NOTE: we do _not_ set PRAGMA temp_store = 2 (MEMORY) because as long as the page cache has room those will use memory anyway (and if it is full we need the disk) + // NOTE: we do _not_ set PRAGMA temp_store = 2 (MEMORY) because as long as the page cache has room those will use memory anyway (and if it is full we need the disk) + // WAL checkpointing + .busy_timeout(Duration::from_secs(5)) // Matches the SQLx default (*not* the sqlite-on-its-own default, which is 'error immediately'), but made explicit as it is subject to change + .pragma("wal_autocheckpoint", "0") // Turn the passive autocheckpointing _off_, as we do our own explicit active checkpointing + 
.pragma("journal_size_limit", format!("{}", 4 * 1024 * 1024)) // Truncate WAL file down to 4 MiB after checkpointing } async fn ensure_db_exists(database_filename: &str) { @@ -412,6 +439,9 @@ async fn db_connect_read_pool( } /// Connect the writer pool. +/// +/// It intentionally only contains a single connection +/// as SQLite only allows a single write at a time async fn db_connect_write_pool(database_filename: &str) -> WriterPool { SqlitePoolOptions::new() .min_connections(1) From 1f005b9e1d38ab7c380adb83283009fa8b8dfd69 Mon Sep 17 00:00:00 2001 From: Marten Wijnja Date: Thu, 26 Mar 2026 11:49:08 +0100 Subject: [PATCH 2/2] Bump version --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- .../examples/integer_increment/integer_increment_producer.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b88f04c..96a8898 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2085,7 +2085,7 @@ dependencies = [ [[package]] name = "opsqueue" -version = "0.34.1" +version = "0.34.2" dependencies = [ "anyhow", "arc-swap", @@ -2139,7 +2139,7 @@ dependencies = [ [[package]] name = "opsqueue_python" -version = "0.34.1" +version = "0.34.2" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 41bd67b..07fd513 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ members = [ ] [workspace.package] -version = "0.34.1" +version = "0.34.2" [workspace.lints.clippy] diff --git a/libs/opsqueue_python/examples/integer_increment/integer_increment_producer.py b/libs/opsqueue_python/examples/integer_increment/integer_increment_producer.py index fc99d21..f8539df 100644 --- a/libs/opsqueue_python/examples/integer_increment/integer_increment_producer.py +++ b/libs/opsqueue_python/examples/integer_increment/integer_increment_producer.py @@ -7,7 +7,7 @@ client = ProducerClient("localhost:3999", "file:///tmp/opsqueue/integer_increment") input_iter = range(0, 1_000_000) -output_iter = client.run_submission(input_iter, chunk_size=10_000) 
+output_iter = client.run_submission(input_iter, chunk_size=1_000) # Now do something with the output: # for x in output_iter: