Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ members = [
]

[workspace.package]
version = "0.34.1"
version = "0.34.2"


[workspace.lints.clippy]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
client = ProducerClient("localhost:3999", "file:///tmp/opsqueue/integer_increment")

input_iter = range(0, 1_000_000)
output_iter = client.run_submission(input_iter, chunk_size=1000)
output_iter = client.run_submission(input_iter, chunk_size=1_000)

# Now do something with the output:
# for x in output_iter:
Expand Down
2 changes: 2 additions & 0 deletions opsqueue/app/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub async fn async_main() {
.expect("Timed out while initiating the database");

moro_local::async_scope!(|scope| {
scope.spawn(db_pool.periodically_checkpoint_wal());

scope.spawn(opsqueue::server::serve_producer_and_consumer(
config,
&server_addr,
Expand Down
34 changes: 32 additions & 2 deletions opsqueue/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,28 @@ impl DBPools {
pub async fn writer_conn(&self) -> sqlx::Result<Writer<NoTransaction>> {
self.write_pool.writer_conn().await
}

/// Performas an explicit, non-passive WAL checkpoint
/// We use the 'TRUNCATE' strategy, which will do the most work but will briefly block the writer *and* all readers
///
Comment on lines +246 to +247
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You use the RESTART strategy.

/// c.f. https://www.sqlite.org/pragma.html#pragma_wal_checkpoint
pub async fn perform_explicit_wal_checkpoint(&self) -> sqlx::Result<()> {
let mut conn = self.writer_conn().await?;
let res: (i32, i32, i32) = sqlx::query_as("PRAGMA wal_checkpoint(RESTART);")
.fetch_one(conn.get_inner())
.await?;
tracing::warn!("WAL checkpoint completed {res:?}");
Ok(())
}

pub async fn periodically_checkpoint_wal(&self) {
const EXPLICIT_WAL_CHECK_INTERVAL: Duration = Duration::from_secs(1);
let mut interval = tokio::time::interval(EXPLICIT_WAL_CHECK_INTERVAL);
loop {
let _ = self.perform_explicit_wal_checkpoint().await;
interval.tick().await;
}
}
}

/// A connection pool.
Expand Down Expand Up @@ -366,9 +388,14 @@ fn db_options(database_filename: &str) -> SqliteConnectOptions {
.synchronous(SqliteSynchronous::Normal) // Full is not needed because we use WAL mode
.busy_timeout(Duration::from_secs(5)) // No query should ever lock for more than 5 seconds
.foreign_keys(true) // By default SQLite does not do foreign key checks; we want them to ensure data consistency
.pragma("mmap_size", "134217728")
// Caching:
.pragma("mmap_size", format!("{}", 128 * 1024 * 1024))
.pragma("cache_size", "-1000000") // Cache size of 10⁶ KiB AKA 1GiB (negative value means measured in KiB rather than in multiples of the page size)
// NOTE: we do _not_ set PRAGMA temp_store = 2 (MEMORY) because as long as the page cache has room those will use memory anyway (and if it is full we need the disk)
// NOTE: we do _not_ set PRAGMA temp_store = 2 (MEMORY) because as long as the page cache has room those will use memory anyway (and if it is full we need the disk)
// WAL checkpointing
.busy_timeout(Duration::from_secs(5)) // Matches the SQLx default (*not* the sqlite-on-its-own default, which is 'error immediately'), but made explicit as it is subject to change
.pragma("wal_autocheckpoint", "0") // Turn the passive autocheckpointing _off_, as we do our own explicit active checkpointing
.pragma("journal_size_limit", format!("{}", 4 * 1024 * 1024)) // Truncate WAL file down to 4 MiB after checkpointing
}

async fn ensure_db_exists(database_filename: &str) {
Expand Down Expand Up @@ -412,6 +439,9 @@ async fn db_connect_read_pool(
}

/// Connect the writer pool.
///
/// It intentionally only contains a single connection
/// as SQLite only allows a single write at a time
async fn db_connect_write_pool(database_filename: &str) -> WriterPool {
SqlitePoolOptions::new()
.min_connections(1)
Expand Down
Loading