From 5eaace886ed75c3cbcf1b4e90df05a0d387a3ed8 Mon Sep 17 00:00:00 2001 From: Ryan Fowler Date: Fri, 6 Jun 2025 19:26:13 -0700 Subject: [PATCH] Close pool connections concurrently --- src/pool.rs | 17 ++++++++++++----- tests/tests.rs | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/src/pool.rs b/src/pool.rs index e8c133e..7bd44ce 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -78,9 +78,16 @@ impl PoolBuilder { /// Specify the number of sqlite connections to open as part of the pool. /// - /// Defaults to the number of logical CPUs of the current system. + /// Defaults to the number of logical CPUs of the current system. Values + /// less than `1` are clamped to `1`. + /// + /// ``` + /// use async_sqlite::PoolBuilder; + /// + /// let builder = PoolBuilder::new().num_conns(2); + /// ``` pub fn num_conns(mut self, num_conns: usize) -> Self { - self.num_conns = Some(num_conns); + self.num_conns = Some(num_conns.max(1)); self } @@ -197,9 +204,9 @@ impl Pool { /// After this method returns, all calls to `self::conn()` or /// `self::conn_mut()` will return an [`Error::Closed`] error. pub async fn close(&self) -> Result<(), Error> { - for client in self.state.clients.iter() { - client.close().await?; - } + let closes = self.state.clients.iter().map(|client| client.close()); + let res = join_all(closes).await; + res.into_iter().collect::, Error>>()?; Ok(()) } diff --git a/tests/tests.rs b/tests/tests.rs index f96195f..4eaf002 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -84,6 +84,8 @@ async_test!(test_journal_mode); async_test!(test_concurrency); async_test!(test_pool); async_test!(test_pool_conn_for_each); +async_test!(test_pool_close_concurrent); +async_test!(test_pool_num_conns_zero_clamps); async fn test_journal_mode() { let tmp_dir = tempfile::tempdir().unwrap(); @@ -227,3 +229,36 @@ async fn test_pool_conn_for_each() { // cleanup pool.close().await.expect("closing client conn"); } + +async fn test_pool_close_concurrent() { + let tmp_dir = tempfile::tempdir().unwrap(); + let pool = PoolBuilder::new() + .path(tmp_dir.path().join("sqlite.db")) + .num_conns(2) + .open() + .await + .expect("pool unable to be opened"); + + let c1 = pool.close(); + let c2 = pool.close(); + futures_util::future::join_all([c1, c2]) + .await + .into_iter() + .collect::, Error>>() + .expect("closing concurrently"); + + let res = pool.conn(|c| c.execute("SELECT 1", ())).await; + assert!(matches!(res, Err(Error::Closed))); +} + +async fn test_pool_num_conns_zero_clamps() { + let tmp_dir = tempfile::tempdir().unwrap(); + let pool = PoolBuilder::new() + .path(tmp_dir.path().join("clamp.db")) + .num_conns(0) + .open() + .await + .expect("pool unable to be opened"); + let results = pool.conn_for_each(|_| Ok(())).await; + assert_eq!(results.len(), 1); +}