diff --git a/Cargo.lock b/Cargo.lock index b88f04c..96a8898 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2085,7 +2085,7 @@ dependencies = [ [[package]] name = "opsqueue" -version = "0.34.1" +version = "0.34.2" dependencies = [ "anyhow", "arc-swap", @@ -2139,7 +2139,7 @@ dependencies = [ [[package]] name = "opsqueue_python" -version = "0.34.1" +version = "0.34.2" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 41bd67b..07fd513 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ members = [ ] [workspace.package] -version = "0.34.1" +version = "0.34.2" [workspace.lints.clippy] diff --git a/libs/opsqueue_python/examples/integer_increment/integer_increment_producer.py b/libs/opsqueue_python/examples/integer_increment/integer_increment_producer.py index 41f08b5..f8539df 100644 --- a/libs/opsqueue_python/examples/integer_increment/integer_increment_producer.py +++ b/libs/opsqueue_python/examples/integer_increment/integer_increment_producer.py @@ -7,7 +7,7 @@ client = ProducerClient("localhost:3999", "file:///tmp/opsqueue/integer_increment") input_iter = range(0, 1_000_000) -output_iter = client.run_submission(input_iter, chunk_size=1000) +output_iter = client.run_submission(input_iter, chunk_size=1_000) # Now do something with the output: # for x in output_iter: diff --git a/opsqueue/app/main.rs b/opsqueue/app/main.rs index 641c813..b11aec4 100644 --- a/opsqueue/app/main.rs +++ b/opsqueue/app/main.rs @@ -47,6 +47,8 @@ pub async fn async_main() { .expect("Timed out while initiating the database"); moro_local::async_scope!(|scope| { + scope.spawn(db_pool.periodically_checkpoint_wal()); + scope.spawn(opsqueue::server::serve_producer_and_consumer( config, &server_addr, diff --git a/opsqueue/src/db/mod.rs b/opsqueue/src/db/mod.rs index 3c3b570..36e210f 100644 --- a/opsqueue/src/db/mod.rs +++ b/opsqueue/src/db/mod.rs @@ -241,6 +241,28 @@ impl DBPools { pub async fn writer_conn(&self) -> sqlx::Result> { self.write_pool.writer_conn().await } + 
+ /// Performs an explicit, non-passive WAL checkpoint + /// We use the 'RESTART' strategy, which forces a full checkpoint and will briefly block the writer *and* all readers + /// + /// cf. https://www.sqlite.org/pragma.html#pragma_wal_checkpoint + pub async fn perform_explicit_wal_checkpoint(&self) -> sqlx::Result<()> { + let mut conn = self.writer_conn().await?; + let res: (i32, i32, i32) = sqlx::query_as("PRAGMA wal_checkpoint(RESTART);") + .fetch_one(conn.get_inner()) + .await?; + tracing::warn!("WAL checkpoint completed {res:?}"); + Ok(()) + } + + pub async fn periodically_checkpoint_wal(&self) { + const EXPLICIT_WAL_CHECK_INTERVAL: Duration = Duration::from_secs(1); + let mut interval = tokio::time::interval(EXPLICIT_WAL_CHECK_INTERVAL); + loop { + let _ = self.perform_explicit_wal_checkpoint().await; + interval.tick().await; + } + } } /// A connection pool. @@ -366,9 +388,14 @@ fn db_options(database_filename: &str) -> SqliteConnectOptions { .synchronous(SqliteSynchronous::Normal) // Full is not needed because we use WAL mode .busy_timeout(Duration::from_secs(5)) // No query should ever lock for more than 5 seconds .foreign_keys(true) // By default SQLite does not do foreign key checks; we want them to ensure data consistency - .pragma("mmap_size", "134217728") + // Caching: + .pragma("mmap_size", format!("{}", 128 * 1024 * 1024)) .pragma("cache_size", "-1000000") // Cache size of 10⁶ KiB AKA 1GiB (negative value means measured in KiB rather than in multiples of the page size) - // NOTE: we do _not_ set PRAGMA temp_store = 2 (MEMORY) because as long as the page cache has room those will use memory anyway (and if it is full we need the disk) + // NOTE: we do _not_ set PRAGMA temp_store = 2 (MEMORY) because as long as the page cache has room those will use memory anyway (and if it is full we need the disk) + // WAL checkpointing + .busy_timeout(Duration::from_secs(5)) // Matches the SQLx default (*not* the sqlite-on-its-own default, which is 'error
immediately'), but made explicit as it is subject to change + .pragma("wal_autocheckpoint", "0") // Turn the passive autocheckpointing _off_, as we do our own explicit active checkpointing + .pragma("journal_size_limit", format!("{}", 4 * 1024 * 1024)) // Truncate WAL file down to 4 MiB after checkpointing } async fn ensure_db_exists(database_filename: &str) { @@ -412,6 +439,9 @@ async fn db_connect_read_pool( } /// Connect the writer pool. +/// +/// It intentionally only contains a single connection +/// as SQLite only allows a single write at a time async fn db_connect_write_pool(database_filename: &str) -> WriterPool { SqlitePoolOptions::new() .min_connections(1)