From a24cb10a2a2bbee7452603e908d6c36e952e6594 Mon Sep 17 00:00:00 2001 From: Travel Planner Developer Date: Fri, 16 Jan 2026 11:13:00 +0200 Subject: [PATCH 01/20] Synchronous version of Transaction Client. Async functions wrapped with block_on. Signed-off-by: Otto Westerlund --- src/lib.rs | 2 ++ src/transaction/mod.rs | 2 ++ src/transaction/sync_client.rs | 66 ++++++++++++++++++++++++++++++++++ 3 files changed, 70 insertions(+) create mode 100644 src/transaction/sync_client.rs diff --git a/src/lib.rs b/src/lib.rs index 60dc2956..b4d4a8cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -159,6 +159,8 @@ pub use crate::transaction::Client as TransactionClient; #[doc(inline)] pub use crate::transaction::Snapshot; #[doc(inline)] +pub use crate::transaction::SyncTransactionClient; +#[doc(inline)] pub use crate::transaction::Transaction; #[doc(inline)] pub use crate::transaction::TransactionOptions; diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 5bc8f0e4..0a765939 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -12,6 +12,7 @@ pub use client::Client; pub(crate) use lock::resolve_locks; pub(crate) use lock::HasLocks; pub use snapshot::Snapshot; +pub use sync_client::SyncTransactionClient; pub use transaction::CheckLevel; #[doc(hidden)] pub use transaction::HeartbeatOption; @@ -28,5 +29,6 @@ pub use lock::LockResolver; pub use lock::ResolveLocksContext; pub use lock::ResolveLocksOptions; mod snapshot; +mod sync_client; #[allow(clippy::module_inception)] mod transaction; diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs new file mode 100644 index 00000000..668d9cda --- /dev/null +++ b/src/transaction/sync_client.rs @@ -0,0 +1,66 @@ +use crate::{ + request::plan::CleanupLocksResult, + transaction::{client::Client, ResolveLocksOptions}, + BoundRange, Config, Result, Snapshot, Timestamp, Transaction, TransactionOptions, +}; +use futures::executor::block_on; + +pub struct SyncTransactionClient { + client: Client, +} 
+ +impl SyncTransactionClient { + pub fn new>(pd_endpoints: Vec) -> Result { + Self::new_with_config(pd_endpoints, Config::default()) + } + + pub fn new_with_config>(pd_endpoints: Vec, config: Config) -> Result { + let client = block_on(Client::new_with_config(pd_endpoints, config))?; + Ok(Self { client }) + } + + pub fn begin_optimistic(&self) -> Result { + block_on(self.client.begin_optimistic()) + } + + pub fn begin_pessimistic(&self) -> Result { + block_on(self.client.begin_pessimistic()) + } + + pub fn begin_with_options(&self, options: TransactionOptions) -> Result { + block_on(self.client.begin_with_options(options)) + } + + pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot { + self.client.snapshot(timestamp, options) + } + + pub fn current_timestamp(&self) -> Result { + block_on(self.client.current_timestamp()) + } + + pub fn gc(&self, safepoint: Timestamp) -> Result { + block_on(self.client.gc(safepoint)) + } + + pub fn cleanup_locks( + &self, + range: impl Into, + safepoint: &Timestamp, + options: ResolveLocksOptions, + ) -> Result { + block_on(self.client.cleanup_locks(range, safepoint, options)) + } + + pub fn unsafe_destroy_range(&self, range: impl Into) -> Result<()> { + block_on(self.client.unsafe_destroy_range(range)) + } +} + +impl Clone for SyncTransactionClient { + fn clone(&self) -> Self { + Self { + client: self.client.clone(), + } + } +} From fda1e8576630824cfa4485d7693d8845e62a740e Mon Sep 17 00:00:00 2001 From: Travel Planner Developer Date: Fri, 16 Jan 2026 12:41:14 +0200 Subject: [PATCH 02/20] sync transaction and snapshot, tests, runtime handling for sync_client Signed-off-by: Otto Westerlund --- src/lib.rs | 4 + src/transaction/mod.rs | 4 + src/transaction/sync_client.rs | 60 +++++-- src/transaction/sync_snapshot.rs | 72 ++++++++ src/transaction/sync_transaction.rs | 127 ++++++++++++++ tests/common/mod.rs | 6 + tests/sync_transaction_tests.rs | 253 ++++++++++++++++++++++++++++ 7 files changed, 509 
insertions(+), 17 deletions(-) create mode 100644 src/transaction/sync_snapshot.rs create mode 100644 src/transaction/sync_transaction.rs create mode 100644 tests/sync_transaction_tests.rs diff --git a/src/lib.rs b/src/lib.rs index b4d4a8cd..2afc1c80 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -159,6 +159,10 @@ pub use crate::transaction::Client as TransactionClient; #[doc(inline)] pub use crate::transaction::Snapshot; #[doc(inline)] +pub use crate::transaction::SyncSnapshot; +#[doc(inline)] +pub use crate::transaction::SyncTransaction; +#[doc(inline)] pub use crate::transaction::SyncTransactionClient; #[doc(inline)] pub use crate::transaction::Transaction; diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 0a765939..cc538945 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -13,6 +13,8 @@ pub(crate) use lock::resolve_locks; pub(crate) use lock::HasLocks; pub use snapshot::Snapshot; pub use sync_client::SyncTransactionClient; +pub use sync_snapshot::SyncSnapshot; +pub use sync_transaction::SyncTransaction; pub use transaction::CheckLevel; #[doc(hidden)] pub use transaction::HeartbeatOption; @@ -30,5 +32,7 @@ pub use lock::ResolveLocksContext; pub use lock::ResolveLocksOptions; mod snapshot; mod sync_client; +mod sync_snapshot; +mod sync_transaction; #[allow(clippy::module_inception)] mod transaction; diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs index 668d9cda..c4e9fe08 100644 --- a/src/transaction/sync_client.rs +++ b/src/transaction/sync_client.rs @@ -1,12 +1,16 @@ use crate::{ request::plan::CleanupLocksResult, - transaction::{client::Client, ResolveLocksOptions}, - BoundRange, Config, Result, Snapshot, Timestamp, Transaction, TransactionOptions, + transaction::{ + client::Client, sync_snapshot::SyncSnapshot, sync_transaction::SyncTransaction, + ResolveLocksOptions, + }, + BoundRange, Config, Result, Timestamp, TransactionOptions, }; -use futures::executor::block_on; +use std::sync::Arc; pub struct 
SyncTransactionClient { client: Client, + runtime: Arc, } impl SyncTransactionClient { @@ -15,32 +19,40 @@ impl SyncTransactionClient { } pub fn new_with_config>(pd_endpoints: Vec, config: Config) -> Result { - let client = block_on(Client::new_with_config(pd_endpoints, config))?; - Ok(Self { client }) + let runtime = + Arc::new(tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime")); + let client = runtime.block_on(Client::new_with_config(pd_endpoints, config))?; + Ok(Self { client, runtime }) } - pub fn begin_optimistic(&self) -> Result { - block_on(self.client.begin_optimistic()) + pub fn begin_optimistic(&self) -> Result { + let inner = self.runtime.block_on(self.client.begin_optimistic())?; + Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime))) } - pub fn begin_pessimistic(&self) -> Result { - block_on(self.client.begin_pessimistic()) + pub fn begin_pessimistic(&self) -> Result { + let inner = self.runtime.block_on(self.client.begin_pessimistic())?; + Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime))) } - pub fn begin_with_options(&self, options: TransactionOptions) -> Result { - block_on(self.client.begin_with_options(options)) + pub fn begin_with_options(&self, options: TransactionOptions) -> Result { + let inner = self + .runtime + .block_on(self.client.begin_with_options(options))?; + Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime))) } - pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot { - self.client.snapshot(timestamp, options) + pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> SyncSnapshot { + let inner = self.client.snapshot(timestamp, options); + SyncSnapshot::new(inner, Arc::clone(&self.runtime)) } pub fn current_timestamp(&self) -> Result { - block_on(self.client.current_timestamp()) + self.runtime.block_on(self.client.current_timestamp()) } pub fn gc(&self, safepoint: Timestamp) -> Result { - block_on(self.client.gc(safepoint)) + 
self.runtime.block_on(self.client.gc(safepoint)) } pub fn cleanup_locks( @@ -49,11 +61,24 @@ impl SyncTransactionClient { safepoint: &Timestamp, options: ResolveLocksOptions, ) -> Result { - block_on(self.client.cleanup_locks(range, safepoint, options)) + self.runtime + .block_on(self.client.cleanup_locks(range, safepoint, options)) } pub fn unsafe_destroy_range(&self, range: impl Into) -> Result<()> { - block_on(self.client.unsafe_destroy_range(range)) + self.runtime + .block_on(self.client.unsafe_destroy_range(range)) + } + + #[cfg(feature = "integration-tests")] + pub fn scan_locks( + &self, + safepoint: &Timestamp, + range: impl Into, + batch_size: u32, + ) -> Result> { + self.runtime + .block_on(self.client.scan_locks(safepoint, range, batch_size)) } } @@ -61,6 +86,7 @@ impl Clone for SyncTransactionClient { fn clone(&self) -> Self { Self { client: self.client.clone(), + runtime: Arc::clone(&self.runtime), } } } diff --git a/src/transaction/sync_snapshot.rs b/src/transaction/sync_snapshot.rs new file mode 100644 index 00000000..ac7268a7 --- /dev/null +++ b/src/transaction/sync_snapshot.rs @@ -0,0 +1,72 @@ +use crate::{BoundRange, Key, KvPair, Result, Snapshot, Value}; +use std::sync::Arc; + +/// A synchronous read-only snapshot. +/// +/// This is a wrapper around the async [`Snapshot`] that provides blocking methods. +/// All operations block the current thread until completed. +pub struct SyncSnapshot { + inner: Snapshot, + runtime: Arc, +} + +impl SyncSnapshot { + pub(crate) fn new(inner: Snapshot, runtime: Arc) -> Self { + Self { inner, runtime } + } + + /// Get the value associated with the given key. + pub fn get(&mut self, key: impl Into) -> Result> { + self.runtime.block_on(self.inner.get(key)) + } + + /// Check whether the key exists. + pub fn key_exists(&mut self, key: impl Into) -> Result { + self.runtime.block_on(self.inner.key_exists(key)) + } + + /// Get the values associated with the given keys. 
+ pub fn batch_get( + &mut self, + keys: impl IntoIterator>, + ) -> Result> { + self.runtime.block_on(self.inner.batch_get(keys)) + } + + /// Scan a range, return at most `limit` key-value pairs that lying in the range. + pub fn scan( + &mut self, + range: impl Into, + limit: u32, + ) -> Result> { + self.runtime.block_on(self.inner.scan(range, limit)) + } + + /// Scan a range, return at most `limit` keys that lying in the range. + pub fn scan_keys( + &mut self, + range: impl Into, + limit: u32, + ) -> Result> { + self.runtime.block_on(self.inner.scan_keys(range, limit)) + } + + /// Similar to scan, but in the reverse direction. + pub fn scan_reverse( + &mut self, + range: impl Into, + limit: u32, + ) -> Result> { + self.runtime.block_on(self.inner.scan_reverse(range, limit)) + } + + /// Similar to scan_keys, but in the reverse direction. + pub fn scan_keys_reverse( + &mut self, + range: impl Into, + limit: u32, + ) -> Result> { + self.runtime + .block_on(self.inner.scan_keys_reverse(range, limit)) + } +} diff --git a/src/transaction/sync_transaction.rs b/src/transaction/sync_transaction.rs new file mode 100644 index 00000000..8f541a4a --- /dev/null +++ b/src/transaction/sync_transaction.rs @@ -0,0 +1,127 @@ +use crate::{ + transaction::Mutation, BoundRange, Key, KvPair, Result, Timestamp, Transaction, Value, +}; +use std::sync::Arc; + +/// A synchronous transaction. +/// +/// This is a wrapper around the async [`Transaction`] that provides blocking methods. +/// All operations block the current thread until completed. +pub struct SyncTransaction { + inner: Transaction, + runtime: Arc, +} + +impl SyncTransaction { + pub(crate) fn new(inner: Transaction, runtime: Arc) -> Self { + Self { inner, runtime } + } + + /// Get the value associated with the given key. + pub fn get(&mut self, key: impl Into) -> Result> { + self.runtime.block_on(self.inner.get(key)) + } + + /// Get the value associated with the given key, and lock the key. 
+ pub fn get_for_update(&mut self, key: impl Into) -> Result> { + self.runtime.block_on(self.inner.get_for_update(key)) + } + + /// Check if the given key exists. + pub fn key_exists(&mut self, key: impl Into) -> Result { + self.runtime.block_on(self.inner.key_exists(key)) + } + + /// Get the values associated with the given keys. + pub fn batch_get( + &mut self, + keys: impl IntoIterator>, + ) -> Result> { + self.runtime.block_on(self.inner.batch_get(keys)) + } + + /// Get the values associated with the given keys, and lock the keys. + pub fn batch_get_for_update( + &mut self, + keys: impl IntoIterator>, + ) -> Result> { + self.runtime.block_on(self.inner.batch_get_for_update(keys)) + } + + /// Scan a range and return the key-value pairs. + pub fn scan( + &mut self, + range: impl Into, + limit: u32, + ) -> Result> { + self.runtime.block_on(self.inner.scan(range, limit)) + } + + /// Scan a range and return only the keys. + pub fn scan_keys( + &mut self, + range: impl Into, + limit: u32, + ) -> Result> { + self.runtime.block_on(self.inner.scan_keys(range, limit)) + } + + /// Scan a range in reverse order. + pub fn scan_reverse( + &mut self, + range: impl Into, + limit: u32, + ) -> Result> { + self.runtime.block_on(self.inner.scan_reverse(range, limit)) + } + + /// Scan keys in a range in reverse order. + pub fn scan_keys_reverse( + &mut self, + range: impl Into, + limit: u32, + ) -> Result> { + self.runtime + .block_on(self.inner.scan_keys_reverse(range, limit)) + } + + /// Set the value associated with the given key. + pub fn put(&mut self, key: impl Into, value: impl Into) -> Result<()> { + self.runtime.block_on(self.inner.put(key, value)) + } + + /// Insert the key-value pair. Returns an error if the key already exists. + pub fn insert(&mut self, key: impl Into, value: impl Into) -> Result<()> { + self.runtime.block_on(self.inner.insert(key, value)) + } + + /// Delete the given key. 
+ pub fn delete(&mut self, key: impl Into) -> Result<()> { + self.runtime.block_on(self.inner.delete(key)) + } + + /// Apply multiple mutations atomically. + pub fn batch_mutate(&mut self, mutations: impl IntoIterator) -> Result<()> { + self.runtime.block_on(self.inner.batch_mutate(mutations)) + } + + /// Lock the given keys without associating any values. + pub fn lock_keys(&mut self, keys: impl IntoIterator>) -> Result<()> { + self.runtime.block_on(self.inner.lock_keys(keys)) + } + + /// Commit the transaction. + pub fn commit(&mut self) -> Result> { + self.runtime.block_on(self.inner.commit()) + } + + /// Rollback the transaction. + pub fn rollback(&mut self) -> Result<()> { + self.runtime.block_on(self.inner.rollback()) + } + + /// Send a heart beat message to keep the transaction alive. + pub fn send_heart_beat(&mut self) -> Result { + self.runtime.block_on(self.inner.send_heart_beat()) + } +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index da37f398..5df73812 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -74,6 +74,12 @@ pub async fn init() -> Result<()> { Ok(()) } +pub fn init_sync() -> Result<()> { + tokio::runtime::Runtime::new() + .expect("Failed to create Tokio runtime") + .block_on(init()) +} + async fn ensure_region_split( keys: impl IntoIterator>, region_count: u32, diff --git a/tests/sync_transaction_tests.rs b/tests/sync_transaction_tests.rs new file mode 100644 index 00000000..dc6b4409 --- /dev/null +++ b/tests/sync_transaction_tests.rs @@ -0,0 +1,253 @@ +#![cfg(feature = "integration-tests")] + +//! Tests for SyncTransactionClient +//! +//! These tests mirror the async TransactionClient tests but use the synchronous API. 
+ +mod common; +use common::*; +use serial_test::serial; +use std::collections::HashMap; +use tikv_client::Config; +use tikv_client::Key; +use tikv_client::Result; +use tikv_client::SyncTransactionClient; +use tikv_client::TransactionOptions; +use tikv_client::Value; + +/// Helper to initialize and return a sync client +fn sync_client() -> Result { + SyncTransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) +} + +#[test] +#[serial] +fn sync_txn_get_timestamp() -> Result<()> { + const COUNT: usize = 1 << 8; + let client = sync_client()?; + + let mut versions = (0..COUNT) + .map(|_| client.current_timestamp()) + .map(|res| res.map(|ts| (ts.physical << 18) + ts.logical)) + .collect::>>()?; + + // Each version should be unique + versions.sort_unstable(); + versions.dedup(); + assert_eq!(versions.len(), COUNT); + Ok(()) +} + +#[test] +#[serial] +fn sync_txn_crud() -> Result<()> { + init_sync()?; + + let client = sync_client()?; + let mut txn = client.begin_optimistic()?; + + // Get non-existent keys + assert!(txn.get("foo".to_owned())?.is_none()); + + // batch_get do not return non-existent entries + assert_eq!( + txn.batch_get(vec!["foo".to_owned(), "bar".to_owned()])? + .count(), + 0 + ); + + txn.put("foo".to_owned(), "bar".to_owned())?; + txn.put("bar".to_owned(), "foo".to_owned())?; + + // Read buffered values + assert_eq!(txn.get("foo".to_owned())?, Some("bar".to_owned().into())); + + let batch_get_res: HashMap = txn + .batch_get(vec!["foo".to_owned(), "bar".to_owned()])? 
+ .map(|pair| (pair.0, pair.1)) + .collect(); + + assert_eq!( + batch_get_res.get(&Key::from("foo".to_owned())), + Some(Value::from("bar".to_owned())).as_ref() + ); + assert_eq!( + batch_get_res.get(&Key::from("bar".to_owned())), + Some(Value::from("foo".to_owned())).as_ref() + ); + + txn.commit()?; + + // Verify the values were committed + let mut txn = client.begin_optimistic()?; + assert_eq!(txn.get("foo".to_owned())?, Some("bar".to_owned().into())); + + // Test delete + txn.delete("foo".to_owned())?; + assert!(txn.get("foo".to_owned())?.is_none()); + txn.commit()?; + + // Verify deletion + let mut txn = client.begin_optimistic()?; + assert!(txn.get("foo".to_owned())?.is_none()); + txn.rollback()?; + + Ok(()) +} + +#[test] +#[serial] +fn sync_txn_begin_pessimistic() -> Result<()> { + init_sync()?; + + let client = sync_client()?; + let mut txn = client.begin_pessimistic()?; + + txn.put("pessimistic_key".to_owned(), "value".to_owned())?; + assert_eq!( + txn.get("pessimistic_key".to_owned())?, + Some("value".to_owned().into()) + ); + + txn.commit()?; + + // Verify committed + let mut txn = client.begin_optimistic()?; + assert_eq!( + txn.get("pessimistic_key".to_owned())?, + Some("value".to_owned().into()) + ); + txn.rollback()?; + + Ok(()) +} + +#[test] +#[serial] +fn sync_txn_snapshot() -> Result<()> { + init_sync()?; + + let client = sync_client()?; + + // Write some data + let mut txn = client.begin_optimistic()?; + txn.put("snapshot_key".to_owned(), "initial".to_owned())?; + txn.commit()?; + + // Get snapshot at current timestamp + let ts = client.current_timestamp()?; + let mut snapshot = client.snapshot(ts, TransactionOptions::new_optimistic()); + + // Verify snapshot reads initial value + assert_eq!( + snapshot.get("snapshot_key".to_owned())?, + Some("initial".to_owned().into()) + ); + + // Modify the value + let mut txn = client.begin_optimistic()?; + txn.put("snapshot_key".to_owned(), "updated".to_owned())?; + txn.commit()?; + + // Snapshot still reads 
old value + assert_eq!( + snapshot.get("snapshot_key".to_owned())?, + Some("initial".to_owned().into()) + ); + + // New transaction reads updated value + let mut txn = client.begin_optimistic()?; + assert_eq!( + txn.get("snapshot_key".to_owned())?, + Some("updated".to_owned().into()) + ); + txn.rollback()?; + + Ok(()) +} + +#[test] +#[serial] +fn sync_txn_begin_with_options() -> Result<()> { + init_sync()?; + + let client = sync_client()?; + let options = TransactionOptions::new_optimistic(); + let mut txn = client.begin_with_options(options)?; + + txn.put("options_key".to_owned(), "value".to_owned())?; + txn.commit()?; + + // Verify + let mut txn = client.begin_optimistic()?; + assert_eq!( + txn.get("options_key".to_owned())?, + Some("value".to_owned().into()) + ); + txn.rollback()?; + + Ok(()) +} + +#[test] +#[serial] +fn sync_txn_rollback() -> Result<()> { + init_sync()?; + + let client = sync_client()?; + + // Write initial value + let mut txn = client.begin_optimistic()?; + txn.put("rollback_key".to_owned(), "initial".to_owned())?; + txn.commit()?; + + // Start new transaction and modify + let mut txn = client.begin_optimistic()?; + txn.put("rollback_key".to_owned(), "modified".to_owned())?; + + // Rollback instead of commit + txn.rollback()?; + + // Verify value is still "initial" + let mut txn = client.begin_optimistic()?; + assert_eq!( + txn.get("rollback_key".to_owned())?, + Some("initial".to_owned().into()) + ); + txn.rollback()?; + + Ok(()) +} + +#[test] +#[serial] +fn sync_txn_clone_client() -> Result<()> { + init_sync()?; + + let client1 = sync_client()?; + let client2 = client1.clone(); + + // Both clients should work independently + let mut txn1 = client1.begin_optimistic()?; + let mut txn2 = client2.begin_optimistic()?; + + txn1.put("clone_key1".to_owned(), "value1".to_owned())?; + txn2.put("clone_key2".to_owned(), "value2".to_owned())?; + + txn1.commit()?; + txn2.commit()?; + + // Verify both writes succeeded + let mut txn = 
client1.begin_optimistic()?; + assert_eq!( + txn.get("clone_key1".to_owned())?, + Some("value1".to_owned().into()) + ); + assert_eq!( + txn.get("clone_key2".to_owned())?, + Some("value2".to_owned().into()) + ); + txn.rollback()?; + + Ok(()) +} From 9d2f1a1fca9deb8d875044b215c0bcc954043900 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Fri, 16 Jan 2026 13:07:28 +0200 Subject: [PATCH 03/20] Add comments and sign-off-by message Signed-off-by: Otto Westerlund --- src/transaction/sync_client.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs index c4e9fe08..3e390cc3 100644 --- a/src/transaction/sync_client.rs +++ b/src/transaction/sync_client.rs @@ -14,10 +14,12 @@ pub struct SyncTransactionClient { } impl SyncTransactionClient { + /// Synchronous version of `TransactionClient::new`. pub fn new>(pd_endpoints: Vec) -> Result { Self::new_with_config(pd_endpoints, Config::default()) } + /// Synchronous version of `TransactionClient::new_with_config`. pub fn new_with_config>(pd_endpoints: Vec, config: Config) -> Result { let runtime = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime")); @@ -25,16 +27,19 @@ impl SyncTransactionClient { Ok(Self { client, runtime }) } + /// Synchronous version of `TransactionClient::begin_optimistic`. pub fn begin_optimistic(&self) -> Result { let inner = self.runtime.block_on(self.client.begin_optimistic())?; Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime))) } + /// Synchronous version of `TransactionClient::begin_pessimistic`. pub fn begin_pessimistic(&self) -> Result { let inner = self.runtime.block_on(self.client.begin_pessimistic())?; Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime))) } + /// Synchronous version of `TransactionClient::begin_with_options`. 
pub fn begin_with_options(&self, options: TransactionOptions) -> Result { let inner = self .runtime @@ -47,14 +52,17 @@ impl SyncTransactionClient { SyncSnapshot::new(inner, Arc::clone(&self.runtime)) } + /// Synchronous version of `TransactionClient::current_timestamp`. pub fn current_timestamp(&self) -> Result { self.runtime.block_on(self.client.current_timestamp()) } + /// Synchronous version of `TransactionClient::gc`. pub fn gc(&self, safepoint: Timestamp) -> Result { self.runtime.block_on(self.client.gc(safepoint)) } + /// Synchronous version of `TransactionClient::cleanup_locks`. pub fn cleanup_locks( &self, range: impl Into, @@ -65,6 +73,7 @@ impl SyncTransactionClient { .block_on(self.client.cleanup_locks(range, safepoint, options)) } + /// Synchronous version of `TransactionClient::unsafe_destroy_range`. pub fn unsafe_destroy_range(&self, range: impl Into) -> Result<()> { self.runtime .block_on(self.client.unsafe_destroy_range(range)) From 83498cfd6b291254a8d05738b126832035700937 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Sat, 17 Jan 2026 11:00:39 +0200 Subject: [PATCH 04/20] change .expect to ? operator for better error handling Signed-off-by: Otto Westerlund --- src/transaction/sync_client.rs | 2 +- tests/common/mod.rs | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs index 3e390cc3..55cb236a 100644 --- a/src/transaction/sync_client.rs +++ b/src/transaction/sync_client.rs @@ -22,7 +22,7 @@ impl SyncTransactionClient { /// Synchronous version of `TransactionClient::new_with_config`. 
pub fn new_with_config>(pd_endpoints: Vec, config: Config) -> Result { let runtime = - Arc::new(tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime")); + Arc::new(tokio::runtime::Runtime::new()?); let client = runtime.block_on(Client::new_with_config(pd_endpoints, config))?; Ok(Self { client, runtime }) } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 5df73812..eb21b3fb 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -75,9 +75,7 @@ pub async fn init() -> Result<()> { } pub fn init_sync() -> Result<()> { - tokio::runtime::Runtime::new() - .expect("Failed to create Tokio runtime") - .block_on(init()) + tokio::runtime::Runtime::new()?.block_on(init()) } async fn ensure_region_split( From d720774f8fd9b5a184896729db025799097da227 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Sat, 17 Jan 2026 11:09:27 +0200 Subject: [PATCH 05/20] add documentation to sync_client public API Signed-off-by: Otto Westerlund --- src/transaction/sync_client.rs | 156 +++++++++++++++++++++++++++++++-- 1 file changed, 147 insertions(+), 9 deletions(-) diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs index 55cb236a..2152298d 100644 --- a/src/transaction/sync_client.rs +++ b/src/transaction/sync_client.rs @@ -8,18 +8,55 @@ use crate::{ }; use std::sync::Arc; +/// Synchronous TiKV transactional client. +/// +/// This is a synchronous wrapper around the async [`TransactionClient`](crate::TransactionClient). +/// All methods block the current thread until completion. +/// +/// For async operations, use [`TransactionClient`](crate::TransactionClient) instead. pub struct SyncTransactionClient { client: Client, runtime: Arc, } impl SyncTransactionClient { - /// Synchronous version of `TransactionClient::new`. + /// Create a synchronous transactional [`SyncTransactionClient`] and connect to the TiKV cluster. 
+ /// + /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for + /// PD must be provided, not the TiKV nodes. It's important to include more than one PD endpoint + /// (include all endpoints, if possible), this helps avoid having a single point of failure. + /// + /// This is a synchronous version of [`TransactionClient::new`](crate::TransactionClient::new). + /// + /// # Examples + /// + /// ```rust,no_run + /// # use tikv_client::SyncTransactionClient; + /// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap(); + /// ``` pub fn new>(pd_endpoints: Vec) -> Result { Self::new_with_config(pd_endpoints, Config::default()) } - /// Synchronous version of `TransactionClient::new_with_config`. + /// Create a synchronous transactional [`SyncTransactionClient`] with a custom configuration. + /// + /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for + /// PD must be provided, not the TiKV nodes. It's important to include more than one PD endpoint + /// (include all endpoints, if possible), this helps avoid having a single point of failure. + /// + /// This is a synchronous version of [`TransactionClient::new_with_config`](crate::TransactionClient::new_with_config). + /// + /// # Examples + /// + /// ```rust,no_run + /// # use tikv_client::{Config, SyncTransactionClient}; + /// # use std::time::Duration; + /// let client = SyncTransactionClient::new_with_config( + /// vec!["192.168.0.100"], + /// Config::default().with_timeout(Duration::from_secs(60)), + /// ) + /// .unwrap(); + /// ``` pub fn new_with_config>(pd_endpoints: Vec, config: Config) -> Result { let runtime = Arc::new(tokio::runtime::Runtime::new()?); @@ -27,19 +64,66 @@ impl SyncTransactionClient { Ok(Self { client, runtime }) } - /// Synchronous version of `TransactionClient::begin_optimistic`. + /// Creates a new optimistic [`SyncTransaction`]. 
+ /// + /// Use the transaction to issue requests like [`get`](SyncTransaction::get) or + /// [`put`](SyncTransaction::put). + /// + /// Write operations do not lock data in TiKV, thus the commit request may fail due to a write + /// conflict. + /// + /// This is a synchronous version of [`TransactionClient::begin_optimistic`](crate::TransactionClient::begin_optimistic). + /// + /// # Examples + /// + /// ```rust,no_run + /// # use tikv_client::SyncTransactionClient; + /// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap(); + /// let mut transaction = client.begin_optimistic().unwrap(); + /// // ... Issue some commands. + /// transaction.commit().unwrap(); + /// ``` pub fn begin_optimistic(&self) -> Result { let inner = self.runtime.block_on(self.client.begin_optimistic())?; Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime))) } - /// Synchronous version of `TransactionClient::begin_pessimistic`. + /// Creates a new pessimistic [`SyncTransaction`]. + /// + /// Write operations will lock the data until committed, thus commit requests should not suffer + /// from write conflicts. + /// + /// This is a synchronous version of [`TransactionClient::begin_pessimistic`](crate::TransactionClient::begin_pessimistic). + /// + /// # Examples + /// + /// ```rust,no_run + /// # use tikv_client::SyncTransactionClient; + /// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap(); + /// let mut transaction = client.begin_pessimistic().unwrap(); + /// // ... Issue some commands. + /// transaction.commit().unwrap(); + /// ``` pub fn begin_pessimistic(&self) -> Result { let inner = self.runtime.block_on(self.client.begin_pessimistic())?; Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime))) } - /// Synchronous version of `TransactionClient::begin_with_options`. + /// Create a new customized [`SyncTransaction`]. 
+ /// + /// This is a synchronous version of [`TransactionClient::begin_with_options`](crate::TransactionClient::begin_with_options). + /// + /// # Examples + /// + /// ```rust,no_run + /// # use tikv_client::{SyncTransactionClient, TransactionOptions}; + /// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap(); + /// let mut transaction = client + /// .begin_with_options(TransactionOptions::default().use_async_commit()) + /// .unwrap(); + /// // ... Issue some commands. + /// transaction.commit().unwrap(); + /// ``` pub fn begin_with_options(&self, options: TransactionOptions) -> Result { let inner = self .runtime @@ -47,22 +131,61 @@ impl SyncTransactionClient { Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime))) } + /// Create a new [`SyncSnapshot`] at the given [`Timestamp`]. + /// + /// This is a synchronous version of [`TransactionClient::snapshot`](crate::TransactionClient::snapshot). + /// + /// # Examples + /// + /// ```rust,no_run + /// # use tikv_client::{SyncTransactionClient, TransactionOptions}; + /// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap(); + /// let timestamp = client.current_timestamp().unwrap(); + /// let snapshot = client.snapshot(timestamp, TransactionOptions::default()); + /// ``` pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> SyncSnapshot { let inner = self.client.snapshot(timestamp, options); SyncSnapshot::new(inner, Arc::clone(&self.runtime)) } - /// Synchronous version of `TransactionClient::current_timestamp`. + /// Retrieve the current [`Timestamp`]. + /// + /// This is a synchronous version of [`TransactionClient::current_timestamp`](crate::TransactionClient::current_timestamp). 
+ /// + /// # Examples + /// + /// ```rust,no_run + /// # use tikv_client::SyncTransactionClient; + /// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap(); + /// let timestamp = client.current_timestamp().unwrap(); + /// ``` pub fn current_timestamp(&self) -> Result { self.runtime.block_on(self.client.current_timestamp()) } - /// Synchronous version of `TransactionClient::gc`. + /// Request garbage collection (GC) of the TiKV cluster. + /// + /// GC deletes MVCC records whose timestamp is lower than the given `safepoint`. We must guarantee + /// that all transactions started before this timestamp had committed. We can keep an active + /// transaction list in application to decide which is the minimal start timestamp of them. + /// + /// For each key, the last mutation record (unless it's a deletion) before `safepoint` is retained. + /// + /// GC is performed by: + /// 1. resolving all locks with timestamp <= `safepoint` + /// 2. updating PD's known safepoint + /// + /// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview). + /// We skip the second step "delete ranges" which is an optimization for TiDB. + /// + /// This is a synchronous version of [`TransactionClient::gc`](crate::TransactionClient::gc). pub fn gc(&self, safepoint: Timestamp) -> Result { self.runtime.block_on(self.client.gc(safepoint)) } - /// Synchronous version of `TransactionClient::cleanup_locks`. + /// Clean up all locks in the specified range. + /// + /// This is a synchronous version of [`TransactionClient::cleanup_locks`](crate::TransactionClient::cleanup_locks). pub fn cleanup_locks( &self, range: impl Into, @@ -73,12 +196,27 @@ impl SyncTransactionClient { .block_on(self.client.cleanup_locks(range, safepoint, options)) } - /// Synchronous version of `TransactionClient::unsafe_destroy_range`. + /// Cleans up all keys in a range and quickly reclaim disk space. + /// + /// The range can span over multiple regions. 
+ /// + /// Note that the request will directly delete data from RocksDB, and all MVCC will be erased. + /// + /// This interface is intended for special scenarios that resemble operations like "drop table" or "drop database" in TiDB. + /// + /// This is a synchronous version of [`TransactionClient::unsafe_destroy_range`](crate::TransactionClient::unsafe_destroy_range). pub fn unsafe_destroy_range(&self, range: impl Into) -> Result<()> { self.runtime .block_on(self.client.unsafe_destroy_range(range)) } + /// Scan all locks in the specified range. + /// + /// This is only available for integration tests. + /// + /// Note: `batch_size` must be >= expected number of locks. + /// + /// This is a synchronous version of [`TransactionClient::scan_locks`](crate::TransactionClient::scan_locks). #[cfg(feature = "integration-tests")] pub fn scan_locks( &self, From 2fad975c4f51effcda97938bcf754165f093b221 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Sat, 17 Jan 2026 11:16:01 +0200 Subject: [PATCH 06/20] add better documentation about snapshot Signed-off-by: Otto Westerlund --- src/transaction/sync_client.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs index 2152298d..457407c1 100644 --- a/src/transaction/sync_client.rs +++ b/src/transaction/sync_client.rs @@ -131,7 +131,18 @@ impl SyncTransactionClient { Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime))) } - /// Create a new [`SyncSnapshot`] at the given [`Timestamp`]. + /// Create a new read-only [`SyncSnapshot`] at the given [`Timestamp`]. + /// + /// A snapshot is a read-only transaction that reads data as if the snapshot was taken at the + /// specified timestamp. It can read operations that happened before the timestamp, but ignores + /// operations after the timestamp. 
+ /// + /// Use snapshots when you need: + /// - Consistent reads across multiple operations without starting a full transaction + /// - Point-in-time reads at a specific timestamp + /// - Read-only access without the overhead of transaction tracking + /// + /// Unlike transactions, snapshots cannot perform write operations (put, delete, etc.). /// /// This is a synchronous version of [`TransactionClient::snapshot`](crate::TransactionClient::snapshot). /// @@ -141,7 +152,10 @@ impl SyncTransactionClient { /// # use tikv_client::{SyncTransactionClient, TransactionOptions}; /// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap(); /// let timestamp = client.current_timestamp().unwrap(); - /// let snapshot = client.snapshot(timestamp, TransactionOptions::default()); + /// let mut snapshot = client.snapshot(timestamp, TransactionOptions::default()); + /// + /// // Read data as it existed at the snapshot timestamp + /// let value = snapshot.get("key".to_owned()).unwrap(); /// ``` pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> SyncSnapshot { let inner = self.client.snapshot(timestamp, options); From a6403de5044f29d43ffa6fbaa383c37b51451d19 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Sun, 18 Jan 2026 10:39:39 +0200 Subject: [PATCH 07/20] remove trailing whitespace Signed-off-by: Otto Westerlund --- src/transaction/sync_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs index 457407c1..85e893d1 100644 --- a/src/transaction/sync_client.rs +++ b/src/transaction/sync_client.rs @@ -153,7 +153,7 @@ impl SyncTransactionClient { /// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap(); /// let timestamp = client.current_timestamp().unwrap(); /// let mut snapshot = client.snapshot(timestamp, TransactionOptions::default()); - /// + /// /// // Read data as it existed at the snapshot timestamp /// let value = 
snapshot.get("key".to_owned()).unwrap(); /// ``` From 63c94d108e3f9c93a022688b12c30fb27f63ba62 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Sun, 18 Jan 2026 11:22:19 +0200 Subject: [PATCH 08/20] change tokio runtime method, add scan feature tests Signed-off-by: Otto Westerlund --- src/transaction/sync_client.rs | 7 +- tests/sync_transaction_tests.rs | 139 ++++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+), 2 deletions(-) diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs index 85e893d1..9402769c 100644 --- a/src/transaction/sync_client.rs +++ b/src/transaction/sync_client.rs @@ -58,8 +58,11 @@ impl SyncTransactionClient { /// .unwrap(); /// ``` pub fn new_with_config>(pd_endpoints: Vec, config: Config) -> Result { - let runtime = - Arc::new(tokio::runtime::Runtime::new()?); + let runtime = Arc::new( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?, + ); let client = runtime.block_on(Client::new_with_config(pd_endpoints, config))?; Ok(Self { client, runtime }) } diff --git a/tests/sync_transaction_tests.rs b/tests/sync_transaction_tests.rs index dc6b4409..a7a5b566 100644 --- a/tests/sync_transaction_tests.rs +++ b/tests/sync_transaction_tests.rs @@ -251,3 +251,142 @@ fn sync_txn_clone_client() -> Result<()> { Ok(()) } + +#[test] +#[serial] +fn sync_txn_scan() -> Result<()> { + init_sync()?; + let client = sync_client()?; + + // Setup: Write test data + let mut txn = client.begin_optimistic()?; + txn.put("key1".to_owned(), "value1".to_owned())?; + txn.put("key2".to_owned(), "value2".to_owned())?; + txn.put("key3".to_owned(), "value3".to_owned())?; + txn.put("key4".to_owned(), "value4".to_owned())?; + txn.commit()?; + + // Test scan in forward order + let mut txn = client.begin_optimistic()?; + let results: Vec<_> = txn + .scan("key1".to_owned().."key4".to_owned(), 10)? 
+ .collect(); + + assert_eq!(results.len(), 3); // key1, key2, key3 (key4 is exclusive) + assert_eq!(results[0].0, Key::from("key1".to_owned())); + assert_eq!(results[0].1, Value::from("value1".to_owned())); + + txn.rollback()?; + Ok(()) +} + +#[test] +#[serial] +fn sync_txn_scan_keys() -> Result<()> { + init_sync()?; + let client = sync_client()?; + + // Setup + let mut txn = client.begin_optimistic()?; + txn.put("scan_k1".to_owned(), "v1".to_owned())?; + txn.put("scan_k2".to_owned(), "v2".to_owned())?; + txn.put("scan_k3".to_owned(), "v3".to_owned())?; + txn.commit()?; + + // Test scan_keys (only keys, no values) + let mut txn = client.begin_optimistic()?; + let keys: Vec<_> = txn + .scan_keys("scan_k1".to_owned()..="scan_k3".to_owned(), 10)? + .collect(); + + assert_eq!(keys.len(), 3); + assert_eq!(keys[0], Key::from("scan_k1".to_owned())); + assert_eq!(keys[1], Key::from("scan_k2".to_owned())); + assert_eq!(keys[2], Key::from("scan_k3".to_owned())); + + txn.rollback()?; + Ok(()) +} + +#[test] +#[serial] +fn sync_txn_scan_reverse() -> Result<()> { + init_sync()?; + let client = sync_client()?; + + // Setup + let mut txn = client.begin_optimistic()?; + txn.put("rev1".to_owned(), "value1".to_owned())?; + txn.put("rev2".to_owned(), "value2".to_owned())?; + txn.put("rev3".to_owned(), "value3".to_owned())?; + txn.commit()?; + + // Test scan_reverse - should return in reverse order + let mut txn = client.begin_optimistic()?; + let results: Vec<_> = txn + .scan_reverse("rev1".to_owned()..="rev3".to_owned(), 10)? 
+ .collect(); + + assert_eq!(results.len(), 3); + // Reverse order: rev3, rev2, rev1 + assert_eq!(results[0].0, Key::from("rev3".to_owned())); + assert_eq!(results[1].0, Key::from("rev2".to_owned())); + assert_eq!(results[2].0, Key::from("rev1".to_owned())); + + txn.rollback()?; + Ok(()) +} + +#[test] +#[serial] +fn sync_txn_scan_keys_reverse() -> Result<()> { + init_sync()?; + let client = sync_client()?; + + // Setup + let mut txn = client.begin_optimistic()?; + txn.put("revkey1".to_owned(), "v1".to_owned())?; + txn.put("revkey2".to_owned(), "v2".to_owned())?; + txn.put("revkey3".to_owned(), "v3".to_owned())?; + txn.commit()?; + + // Test scan_keys_reverse + let mut txn = client.begin_optimistic()?; + let keys: Vec<_> = txn + .scan_keys_reverse("revkey1".to_owned()..="revkey3".to_owned(), 10)? + .collect(); + + assert_eq!(keys.len(), 3); + // Reverse order + assert_eq!(keys[0], Key::from("revkey3".to_owned())); + assert_eq!(keys[1], Key::from("revkey2".to_owned())); + assert_eq!(keys[2], Key::from("revkey1".to_owned())); + + txn.rollback()?; + Ok(()) +} + +#[test] +#[serial] +fn sync_txn_scan_with_limit() -> Result<()> { + init_sync()?; + let client = sync_client()?; + + // Setup: Write more data than we'll scan + let mut txn = client.begin_optimistic()?; + for i in 1..=10 { + txn.put(format!("limit_key{:02}", i), format!("value{}", i))?; + } + txn.commit()?; + + // Test with limit + let mut txn = client.begin_optimistic()?; + let results: Vec<_> = txn + .scan("limit_key00".to_owned().., 5)? 
// Limit to 5 results + .collect(); + + assert_eq!(results.len(), 5); // Should only get 5 results + + txn.rollback()?; + Ok(()) +} \ No newline at end of file From 55bf7a3977644465a2756ab5cf4bc3e85a151394 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Mon, 19 Jan 2026 06:45:36 +0200 Subject: [PATCH 09/20] init_sync change according to pr comment, cargo fmt Signed-off-by: Otto Westerlund --- tests/common/mod.rs | 6 +++++- tests/sync_transaction_tests.rs | 22 +++++++++++----------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index eb21b3fb..855222dc 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -75,7 +75,11 @@ pub async fn init() -> Result<()> { } pub fn init_sync() -> Result<()> { - tokio::runtime::Runtime::new()?.block_on(init()) + if let Ok(handle) = tokio::runtime::Handle::try_current() { + handle.block_on(init()) + } else { + tokio::runtime::Runtime::new()?.block_on(init()) + } } async fn ensure_region_split( diff --git a/tests/sync_transaction_tests.rs b/tests/sync_transaction_tests.rs index a7a5b566..789b8345 100644 --- a/tests/sync_transaction_tests.rs +++ b/tests/sync_transaction_tests.rs @@ -271,11 +271,11 @@ fn sync_txn_scan() -> Result<()> { let results: Vec<_> = txn .scan("key1".to_owned().."key4".to_owned(), 10)? .collect(); - + assert_eq!(results.len(), 3); // key1, key2, key3 (key4 is exclusive) assert_eq!(results[0].0, Key::from("key1".to_owned())); assert_eq!(results[0].1, Value::from("value1".to_owned())); - + txn.rollback()?; Ok(()) } @@ -298,12 +298,12 @@ fn sync_txn_scan_keys() -> Result<()> { let keys: Vec<_> = txn .scan_keys("scan_k1".to_owned()..="scan_k3".to_owned(), 10)? 
.collect(); - + assert_eq!(keys.len(), 3); assert_eq!(keys[0], Key::from("scan_k1".to_owned())); assert_eq!(keys[1], Key::from("scan_k2".to_owned())); assert_eq!(keys[2], Key::from("scan_k3".to_owned())); - + txn.rollback()?; Ok(()) } @@ -326,13 +326,13 @@ fn sync_txn_scan_reverse() -> Result<()> { let results: Vec<_> = txn .scan_reverse("rev1".to_owned()..="rev3".to_owned(), 10)? .collect(); - + assert_eq!(results.len(), 3); // Reverse order: rev3, rev2, rev1 assert_eq!(results[0].0, Key::from("rev3".to_owned())); assert_eq!(results[1].0, Key::from("rev2".to_owned())); assert_eq!(results[2].0, Key::from("rev1".to_owned())); - + txn.rollback()?; Ok(()) } @@ -355,13 +355,13 @@ fn sync_txn_scan_keys_reverse() -> Result<()> { let keys: Vec<_> = txn .scan_keys_reverse("revkey1".to_owned()..="revkey3".to_owned(), 10)? .collect(); - + assert_eq!(keys.len(), 3); // Reverse order assert_eq!(keys[0], Key::from("revkey3".to_owned())); assert_eq!(keys[1], Key::from("revkey2".to_owned())); assert_eq!(keys[2], Key::from("revkey1".to_owned())); - + txn.rollback()?; Ok(()) } @@ -384,9 +384,9 @@ fn sync_txn_scan_with_limit() -> Result<()> { let results: Vec<_> = txn .scan("limit_key00".to_owned().., 5)? 
// Limit to 5 results .collect(); - + assert_eq!(results.len(), 5); // Should only get 5 results - + txn.rollback()?; Ok(()) -} \ No newline at end of file +} From 923546605e45c45a8d60d3598cc6cdae76f90ff4 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Tue, 20 Jan 2026 08:19:34 +0200 Subject: [PATCH 10/20] multi thread scheduler for new_with_config Signed-off-by: Otto Westerlund --- src/transaction/sync_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs index 9402769c..6fde72a6 100644 --- a/src/transaction/sync_client.rs +++ b/src/transaction/sync_client.rs @@ -59,7 +59,7 @@ impl SyncTransactionClient { /// ``` pub fn new_with_config>(pd_endpoints: Vec, config: Config) -> Result { let runtime = Arc::new( - tokio::runtime::Builder::new_current_thread() + tokio::runtime::Builder::new_multi_thread() .enable_all() .build()?, ); From cc0f11908a2094f895224a8f3d7ae94b6b576d31 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Tue, 20 Jan 2026 08:28:47 +0200 Subject: [PATCH 11/20] cleanup docstrings Signed-off-by: Otto Westerlund --- src/transaction/sync_client.rs | 117 ++------------------------------- 1 file changed, 4 insertions(+), 113 deletions(-) diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs index 6fde72a6..5773e135 100644 --- a/src/transaction/sync_client.rs +++ b/src/transaction/sync_client.rs @@ -22,41 +22,14 @@ pub struct SyncTransactionClient { impl SyncTransactionClient { /// Create a synchronous transactional [`SyncTransactionClient`] and connect to the TiKV cluster. /// - /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for - /// PD must be provided, not the TiKV nodes. It's important to include more than one PD endpoint - /// (include all endpoints, if possible), this helps avoid having a single point of failure. 
- /// - /// This is a synchronous version of [`TransactionClient::new`](crate::TransactionClient::new). - /// - /// # Examples - /// - /// ```rust,no_run - /// # use tikv_client::SyncTransactionClient; - /// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap(); - /// ``` + /// See usage example in the documentation of [`TransactionClient::new`](crate::TransactionClient::new). pub fn new>(pd_endpoints: Vec) -> Result { Self::new_with_config(pd_endpoints, Config::default()) } /// Create a synchronous transactional [`SyncTransactionClient`] with a custom configuration. /// - /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for - /// PD must be provided, not the TiKV nodes. It's important to include more than one PD endpoint - /// (include all endpoints, if possible), this helps avoid having a single point of failure. - /// - /// This is a synchronous version of [`TransactionClient::new_with_config`](crate::TransactionClient::new_with_config). - /// - /// # Examples - /// - /// ```rust,no_run - /// # use tikv_client::{Config, SyncTransactionClient}; - /// # use std::time::Duration; - /// let client = SyncTransactionClient::new_with_config( - /// vec!["192.168.0.100"], - /// Config::default().with_timeout(Duration::from_secs(60)), - /// ) - /// .unwrap(); - /// ``` + /// See usage example in the documentation of [`TransactionClient::new_with_config`](crate::TransactionClient::new_with_config). pub fn new_with_config>(pd_endpoints: Vec, config: Config) -> Result { let runtime = Arc::new( tokio::runtime::Builder::new_multi_thread() @@ -72,41 +45,15 @@ impl SyncTransactionClient { /// Use the transaction to issue requests like [`get`](SyncTransaction::get) or /// [`put`](SyncTransaction::put). /// - /// Write operations do not lock data in TiKV, thus the commit request may fail due to a write - /// conflict. 
- /// /// This is a synchronous version of [`TransactionClient::begin_optimistic`](crate::TransactionClient::begin_optimistic). - /// - /// # Examples - /// - /// ```rust,no_run - /// # use tikv_client::SyncTransactionClient; - /// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap(); - /// let mut transaction = client.begin_optimistic().unwrap(); - /// // ... Issue some commands. - /// transaction.commit().unwrap(); - /// ``` pub fn begin_optimistic(&self) -> Result { let inner = self.runtime.block_on(self.client.begin_optimistic())?; Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime))) } /// Creates a new pessimistic [`SyncTransaction`]. - /// - /// Write operations will lock the data until committed, thus commit requests should not suffer - /// from write conflicts. - /// + /// /// This is a synchronous version of [`TransactionClient::begin_pessimistic`](crate::TransactionClient::begin_pessimistic). - /// - /// # Examples - /// - /// ```rust,no_run - /// # use tikv_client::SyncTransactionClient; - /// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap(); - /// let mut transaction = client.begin_pessimistic().unwrap(); - /// // ... Issue some commands. - /// transaction.commit().unwrap(); - /// ``` pub fn begin_pessimistic(&self) -> Result { let inner = self.runtime.block_on(self.client.begin_pessimistic())?; Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime))) @@ -115,18 +62,6 @@ impl SyncTransactionClient { /// Create a new customized [`SyncTransaction`]. /// /// This is a synchronous version of [`TransactionClient::begin_with_options`](crate::TransactionClient::begin_with_options). 
- /// - /// # Examples - /// - /// ```rust,no_run - /// # use tikv_client::{SyncTransactionClient, TransactionOptions}; - /// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap(); - /// let mut transaction = client - /// .begin_with_options(TransactionOptions::default().use_async_commit()) - /// .unwrap(); - /// // ... Issue some commands. - /// transaction.commit().unwrap(); - /// ``` pub fn begin_with_options(&self, options: TransactionOptions) -> Result { let inner = self .runtime @@ -135,31 +70,8 @@ impl SyncTransactionClient { } /// Create a new read-only [`SyncSnapshot`] at the given [`Timestamp`]. - /// - /// A snapshot is a read-only transaction that reads data as if the snapshot was taken at the - /// specified timestamp. It can read operations that happened before the timestamp, but ignores - /// operations after the timestamp. - /// - /// Use snapshots when you need: - /// - Consistent reads across multiple operations without starting a full transaction - /// - Point-in-time reads at a specific timestamp - /// - Read-only access without the overhead of transaction tracking - /// - /// Unlike transactions, snapshots cannot perform write operations (put, delete, etc.). - /// + /// /// This is a synchronous version of [`TransactionClient::snapshot`](crate::TransactionClient::snapshot). 
- /// - /// # Examples - /// - /// ```rust,no_run - /// # use tikv_client::{SyncTransactionClient, TransactionOptions}; - /// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap(); - /// let timestamp = client.current_timestamp().unwrap(); - /// let mut snapshot = client.snapshot(timestamp, TransactionOptions::default()); - /// - /// // Read data as it existed at the snapshot timestamp - /// let value = snapshot.get("key".to_owned()).unwrap(); - /// ``` pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> SyncSnapshot { let inner = self.client.snapshot(timestamp, options); SyncSnapshot::new(inner, Arc::clone(&self.runtime)) @@ -168,33 +80,12 @@ impl SyncTransactionClient { /// Retrieve the current [`Timestamp`]. /// /// This is a synchronous version of [`TransactionClient::current_timestamp`](crate::TransactionClient::current_timestamp). - /// - /// # Examples - /// - /// ```rust,no_run - /// # use tikv_client::SyncTransactionClient; - /// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap(); - /// let timestamp = client.current_timestamp().unwrap(); - /// ``` pub fn current_timestamp(&self) -> Result { self.runtime.block_on(self.client.current_timestamp()) } /// Request garbage collection (GC) of the TiKV cluster. /// - /// GC deletes MVCC records whose timestamp is lower than the given `safepoint`. We must guarantee - /// that all transactions started before this timestamp had committed. We can keep an active - /// transaction list in application to decide which is the minimal start timestamp of them. - /// - /// For each key, the last mutation record (unless it's a deletion) before `safepoint` is retained. - /// - /// GC is performed by: - /// 1. resolving all locks with timestamp <= `safepoint` - /// 2. updating PD's known safepoint - /// - /// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview). 
- /// We skip the second step "delete ranges" which is an optimization for TiDB. - /// /// This is a synchronous version of [`TransactionClient::gc`](crate::TransactionClient::gc). pub fn gc(&self, safepoint: Timestamp) -> Result { self.runtime.block_on(self.client.gc(safepoint)) From b5b389c5aebcee27be2764e6c5fcbc57d8bb9376 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Tue, 20 Jan 2026 08:31:07 +0200 Subject: [PATCH 12/20] rename test cases sync_txn -> txn_sync Signed-off-by: Otto Westerlund --- tests/sync_transaction_tests.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/sync_transaction_tests.rs b/tests/sync_transaction_tests.rs index 789b8345..a923681a 100644 --- a/tests/sync_transaction_tests.rs +++ b/tests/sync_transaction_tests.rs @@ -22,7 +22,7 @@ fn sync_client() -> Result { #[test] #[serial] -fn sync_txn_get_timestamp() -> Result<()> { +fn txn_sync_get_timestamp() -> Result<()> { const COUNT: usize = 1 << 8; let client = sync_client()?; @@ -40,7 +40,7 @@ fn sync_txn_get_timestamp() -> Result<()> { #[test] #[serial] -fn sync_txn_crud() -> Result<()> { +fn txn_sync_crud() -> Result<()> { init_sync()?; let client = sync_client()?; @@ -97,7 +97,7 @@ fn sync_txn_crud() -> Result<()> { #[test] #[serial] -fn sync_txn_begin_pessimistic() -> Result<()> { +fn txn_sync_begin_pessimistic() -> Result<()> { init_sync()?; let client = sync_client()?; @@ -124,7 +124,7 @@ fn sync_txn_begin_pessimistic() -> Result<()> { #[test] #[serial] -fn sync_txn_snapshot() -> Result<()> { +fn txn_sync_snapshot() -> Result<()> { init_sync()?; let client = sync_client()?; @@ -168,7 +168,7 @@ fn sync_txn_snapshot() -> Result<()> { #[test] #[serial] -fn sync_txn_begin_with_options() -> Result<()> { +fn txn_sync_begin_with_options() -> Result<()> { init_sync()?; let client = sync_client()?; @@ -191,7 +191,7 @@ fn sync_txn_begin_with_options() -> Result<()> { #[test] #[serial] -fn sync_txn_rollback() -> Result<()> { +fn 
txn_sync_rollback() -> Result<()> { init_sync()?; let client = sync_client()?; @@ -221,7 +221,7 @@ fn sync_txn_rollback() -> Result<()> { #[test] #[serial] -fn sync_txn_clone_client() -> Result<()> { +fn txn_sync_clone_client() -> Result<()> { init_sync()?; let client1 = sync_client()?; @@ -254,7 +254,7 @@ fn sync_txn_clone_client() -> Result<()> { #[test] #[serial] -fn sync_txn_scan() -> Result<()> { +fn txn_sync_scan() -> Result<()> { init_sync()?; let client = sync_client()?; @@ -282,7 +282,7 @@ fn sync_txn_scan() -> Result<()> { #[test] #[serial] -fn sync_txn_scan_keys() -> Result<()> { +fn txn_sync_scan_keys() -> Result<()> { init_sync()?; let client = sync_client()?; @@ -310,7 +310,7 @@ fn sync_txn_scan_keys() -> Result<()> { #[test] #[serial] -fn sync_txn_scan_reverse() -> Result<()> { +fn txn_sync_scan_reverse() -> Result<()> { init_sync()?; let client = sync_client()?; @@ -339,7 +339,7 @@ fn sync_txn_scan_reverse() -> Result<()> { #[test] #[serial] -fn sync_txn_scan_keys_reverse() -> Result<()> { +fn txn_sync_scan_keys_reverse() -> Result<()> { init_sync()?; let client = sync_client()?; @@ -368,7 +368,7 @@ fn sync_txn_scan_keys_reverse() -> Result<()> { #[test] #[serial] -fn sync_txn_scan_with_limit() -> Result<()> { +fn txn_sync_scan_with_limit() -> Result<()> { init_sync()?; let client = sync_client()?; From 00dc27973495d89c5ea7e289aee0ea60ef7c8201 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Tue, 20 Jan 2026 08:34:54 +0200 Subject: [PATCH 13/20] change init_sync according to review suggestion and add comment about usage limitation Signed-off-by: Otto Westerlund --- tests/common/mod.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 855222dc..a2a1cd51 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -74,12 +74,18 @@ pub async fn init() -> Result<()> { Ok(()) } +/// Initialize the test environment synchronously. 
+/// +/// This function creates its own tokio runtime and must be called from +/// synchronous code only (e.g., at the start of a `#[test]` function). +/// +/// **Do not call this from inside an async function or tokio runtime** - +/// it will panic. If you're already in an async context, use `init()` directly. pub fn init_sync() -> Result<()> { - if let Ok(handle) = tokio::runtime::Handle::try_current() { - handle.block_on(init()) - } else { - tokio::runtime::Runtime::new()?.block_on(init()) - } + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()? + .block_on(init()) } async fn ensure_region_split( From b3ea3faa13531c5d87bd3d879304099ebe739216 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Tue, 20 Jan 2026 12:54:42 +0200 Subject: [PATCH 14/20] remove trailing whitespace Signed-off-by: Otto Westerlund --- src/transaction/sync_client.rs | 4 ++-- tests/common/mod.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs index 5773e135..9c9db213 100644 --- a/src/transaction/sync_client.rs +++ b/src/transaction/sync_client.rs @@ -52,7 +52,7 @@ impl SyncTransactionClient { } /// Creates a new pessimistic [`SyncTransaction`]. - /// + /// /// This is a synchronous version of [`TransactionClient::begin_pessimistic`](crate::TransactionClient::begin_pessimistic). pub fn begin_pessimistic(&self) -> Result { let inner = self.runtime.block_on(self.client.begin_pessimistic())?; @@ -70,7 +70,7 @@ impl SyncTransactionClient { } /// Create a new read-only [`SyncSnapshot`] at the given [`Timestamp`]. - /// + /// /// This is a synchronous version of [`TransactionClient::snapshot`](crate::TransactionClient::snapshot). 
pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> SyncSnapshot { let inner = self.client.snapshot(timestamp, options); diff --git a/tests/common/mod.rs b/tests/common/mod.rs index a2a1cd51..69eddbe9 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -78,8 +78,8 @@ pub async fn init() -> Result<()> { /// /// This function creates its own tokio runtime and must be called from /// synchronous code only (e.g., at the start of a `#[test]` function). -/// -/// **Do not call this from inside an async function or tokio runtime** - +/// +/// **Do not call this from inside an async function or tokio runtime** - /// it will panic. If you're already in an async context, use `init()` directly. pub fn init_sync() -> Result<()> { tokio::runtime::Builder::new_current_thread() From 920f0044236e847936be46aea8253cf4947f43a4 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Tue, 20 Jan 2026 12:55:23 +0200 Subject: [PATCH 15/20] tests for sync_transaction wrapper functions Signed-off-by: Otto Westerlund --- tests/sync_transaction_tests.rs | 280 ++++++++++++++++++++++++++++++++ 1 file changed, 280 insertions(+) diff --git a/tests/sync_transaction_tests.rs b/tests/sync_transaction_tests.rs index a923681a..fbf75080 100644 --- a/tests/sync_transaction_tests.rs +++ b/tests/sync_transaction_tests.rs @@ -8,6 +8,7 @@ mod common; use common::*; use serial_test::serial; use std::collections::HashMap; +use tikv_client::transaction::Mutation; use tikv_client::Config; use tikv_client::Key; use tikv_client::Result; @@ -390,3 +391,282 @@ fn txn_sync_scan_with_limit() -> Result<()> { txn.rollback()?; Ok(()) } + +#[test] +#[serial] +fn txn_sync_insert() -> Result<()> { + init_sync()?; + let client = sync_client()?; + + // Test insert on non-existent key + let mut txn = client.begin_optimistic()?; + txn.insert("insert_key".to_owned(), "value".to_owned())?; + txn.commit()?; + + // Verify insert succeeded + let mut txn = client.begin_optimistic()?; + assert_eq!( + 
txn.get("insert_key".to_owned())?, + Some("value".to_owned().into()) + ); + txn.rollback()?; + + // Test insert on existing key should fail at commit + let mut txn = client.begin_optimistic()?; + txn.insert("insert_key".to_owned(), "new_value".to_owned())?; // This succeeds + let result = txn.commit(); // But commit should fail + assert!(result.is_err()); + + Ok(()) +} + +#[test] +#[serial] +fn txn_sync_batch_mutate() -> Result<()> { + init_sync()?; + let client = sync_client()?; + + // Setup: Write initial data + let mut txn = client.begin_optimistic()?; + txn.put("mutate_key1".to_owned(), "initial1".to_owned())?; + txn.put("mutate_key2".to_owned(), "initial2".to_owned())?; + txn.put("mutate_key3".to_owned(), "initial3".to_owned())?; + txn.commit()?; + + // Test batch_mutate with mix of puts and deletes + let mut txn = client.begin_optimistic()?; + let mutations = vec![ + Mutation::Put( + Key::from("mutate_key1".to_owned()), + Value::from("updated1".to_owned()), + ), + Mutation::Delete(Key::from("mutate_key2".to_owned())), + Mutation::Put( + Key::from("mutate_key4".to_owned()), + Value::from("new4".to_owned()), + ), + ]; + txn.batch_mutate(mutations)?; + txn.commit()?; + + // Verify mutations applied correctly + let mut txn = client.begin_optimistic()?; + assert_eq!( + txn.get("mutate_key1".to_owned())?, + Some("updated1".to_owned().into()) + ); + assert!(txn.get("mutate_key2".to_owned())?.is_none()); // Deleted + assert_eq!( + txn.get("mutate_key3".to_owned())?, + Some("initial3".to_owned().into()) + ); // Unchanged + assert_eq!( + txn.get("mutate_key4".to_owned())?, + Some("new4".to_owned().into()) + ); // New key + txn.rollback()?; + + Ok(()) +} + +#[test] +#[serial] +fn txn_sync_lock_keys() -> Result<()> { + init_sync()?; + let client = sync_client()?; + + // Setup: Write initial data + let mut txn = client.begin_optimistic()?; + txn.put("lock_key1".to_owned(), "value1".to_owned())?; + txn.put("lock_key2".to_owned(), "value2".to_owned())?; + txn.commit()?; + + 
// Test lock_keys - should lock keys without modifying them + let mut txn = client.begin_pessimistic()?; + txn.lock_keys(vec!["lock_key1".to_owned(), "lock_key2".to_owned()])?; + + // Should still be able to read the values + assert_eq!( + txn.get("lock_key1".to_owned())?, + Some("value1".to_owned().into()) + ); + assert_eq!( + txn.get("lock_key2".to_owned())?, + Some("value2".to_owned().into()) + ); + + txn.commit()?; + + // Verify values unchanged + let mut txn = client.begin_optimistic()?; + assert_eq!( + txn.get("lock_key1".to_owned())?, + Some("value1".to_owned().into()) + ); + txn.rollback()?; + + Ok(()) +} + +#[test] +#[serial] +fn txn_sync_get_for_update() -> Result<()> { + init_sync()?; + let client = sync_client()?; + + // Setup: Write initial data + let mut txn = client.begin_optimistic()?; + txn.put("gfu_key".to_owned(), "initial".to_owned())?; + txn.commit()?; + + // Test get_for_update - gets value and locks the key + let mut txn = client.begin_pessimistic()?; + let value = txn.get_for_update("gfu_key".to_owned())?; + assert_eq!(value, Some("initial".to_owned().into())); + + // Update the value after locking + txn.put("gfu_key".to_owned(), "updated".to_owned())?; + txn.commit()?; + + // Verify update succeeded + let mut txn = client.begin_optimistic()?; + assert_eq!( + txn.get("gfu_key".to_owned())?, + Some("updated".to_owned().into()) + ); + txn.rollback()?; + + // Test get_for_update on non-existent key + let mut txn = client.begin_pessimistic()?; + let value = txn.get_for_update("nonexistent_gfu".to_owned())?; + assert!(value.is_none()); + txn.rollback()?; + + Ok(()) +} + +#[test] +#[serial] +fn txn_sync_batch_get_for_update() -> Result<()> { + init_sync()?; + let client = sync_client()?; + + // Setup: Write initial data + let mut txn = client.begin_optimistic()?; + txn.put("bgfu_key1".to_owned(), "value1".to_owned())?; + txn.put("bgfu_key2".to_owned(), "value2".to_owned())?; + txn.put("bgfu_key3".to_owned(), "value3".to_owned())?; + txn.commit()?; 
+ + // Test batch_get_for_update + let mut txn = client.begin_pessimistic()?; + let results = txn.batch_get_for_update(vec![ + "bgfu_key1".to_owned(), + "bgfu_key2".to_owned(), + "bgfu_key3".to_owned(), + "nonexistent".to_owned(), // Non-existent key + ])?; + + // Should only return existing keys + assert_eq!(results.len(), 3); + let result_map: HashMap = results.into_iter().map(|kv| (kv.0, kv.1)).collect(); + + assert_eq!( + result_map.get(&Key::from("bgfu_key1".to_owned())), + Some(&Value::from("value1".to_owned())) + ); + assert_eq!( + result_map.get(&Key::from("bgfu_key2".to_owned())), + Some(&Value::from("value2".to_owned())) + ); + assert_eq!( + result_map.get(&Key::from("bgfu_key3".to_owned())), + Some(&Value::from("value3".to_owned())) + ); + + // Modify values after locking + txn.put("bgfu_key1".to_owned(), "updated1".to_owned())?; + txn.commit()?; + + // Verify update succeeded + let mut txn = client.begin_optimistic()?; + assert_eq!( + txn.get("bgfu_key1".to_owned())?, + Some("updated1".to_owned().into()) + ); + txn.rollback()?; + + Ok(()) +} + +#[test] +#[serial] +fn txn_sync_key_exists() -> Result<()> { + init_sync()?; + let client = sync_client()?; + + // Setup: Write some data + let mut txn = client.begin_optimistic()?; + txn.put("exists_key".to_owned(), "value".to_owned())?; + txn.commit()?; + + // Test key_exists on existing key + let mut txn = client.begin_optimistic()?; + assert!(txn.key_exists("exists_key".to_owned())?); + + // Test key_exists on non-existent key + assert!(!txn.key_exists("nonexistent_key".to_owned())?); + + txn.rollback()?; + + // Test key_exists with buffered operations + let mut txn = client.begin_optimistic()?; + txn.put("buffered_key".to_owned(), "value".to_owned())?; + + // Should see the buffered key as existing + assert!(txn.key_exists("buffered_key".to_owned())?); + + // Test with buffered delete + txn.delete("exists_key".to_owned())?; + assert!(!txn.key_exists("exists_key".to_owned())?); + + txn.rollback()?; + + Ok(()) 
+} + +#[test] +#[serial] +fn txn_sync_send_heart_beat() -> Result<()> { + init_sync()?; + let client = sync_client()?; + + // Start a pessimistic transaction (heart beat is typically used with long-running transactions) + let mut txn = client.begin_pessimistic()?; + + // Put some data + txn.put("heartbeat_key".to_owned(), "value".to_owned())?; + + // Send heart beat to keep transaction alive + let ttl = txn.send_heart_beat()?; + + // TTL should be a positive value (in milliseconds) + assert!(ttl > 0); + + // Can send multiple heart beats + let ttl2 = txn.send_heart_beat()?; + assert!(ttl2 > 0); + + // Commit the transaction + txn.commit()?; + + // Verify data was written + let mut txn = client.begin_optimistic()?; + assert_eq!( + txn.get("heartbeat_key".to_owned())?, + Some("value".to_owned().into()) + ); + txn.rollback()?; + + Ok(()) +} From b5fe3b113aed7134fd587ee3cd8bfb255669c9e4 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Wed, 28 Jan 2026 11:33:22 +0200 Subject: [PATCH 16/20] code review fixes Signed-off-by: Otto Westerlund --- src/common/errors.rs | 6 + src/transaction/sync_client.rs | 174 +++++++++++++-- src/transaction/sync_snapshot.rs | 4 +- tests/sync_transaction_tests.rs | 364 +++++++++++++++++++++++++++++++ 4 files changed, 532 insertions(+), 16 deletions(-) diff --git a/src/common/errors.rs b/src/common/errors.rs index 41e081a2..cd0e19ec 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -114,6 +114,12 @@ pub enum Error { KeyspaceNotFound(String), #[error("Transaction not found error: {:?}", _0)] TxnNotFound(kvrpcpb::TxnNotFound), + /// Attempted to create or use the sync client (including calling its methods) from within a Tokio async runtime context + #[error( + "Cannot use SyncTransactionClient from within a Tokio async runtime context: {0}. \ +Use TransactionClient instead or move SyncTransactionClient usage outside the async context." 
+ )] + NestedRuntimeError(String), } impl From for Error { diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs index 9c9db213..9351f8ca 100644 --- a/src/transaction/sync_client.rs +++ b/src/transaction/sync_client.rs @@ -4,10 +4,62 @@ use crate::{ client::Client, sync_snapshot::SyncSnapshot, sync_transaction::SyncTransaction, ResolveLocksOptions, }, - BoundRange, Config, Result, Timestamp, TransactionOptions, + BoundRange, Config, Error, Result, Timestamp, TransactionOptions, }; use std::sync::Arc; +/// Detects whether a Tokio async runtime is already running on the current thread. +/// +/// When the synchronous transaction client is used from within an existing async +/// runtime, blocking operations (such as `block_on`) can cause deadlocks or other +/// unexpected blocking behavior. This helper checks for a currently active Tokio +/// runtime and returns `Error::NestedRuntimeError` if one is found, allowing callers +/// to detect and handle incorrect use of the synchronous client from within an +/// existing async runtime instead of risking deadlocks or unexpected blocking. +/// +/// Note: checks only for Tokio runtimes, not other async runtimes. +/// +/// # Error Handling +/// +/// If this function returns `Error::NestedRuntimeError`, callers should: +/// - Use the async [`TransactionClient`](crate::TransactionClient) instead of `SyncTransactionClient` +/// - Move the `SyncTransactionClient` creation and usage outside of the async context +/// - Consider restructuring the code to avoid mixing sync and async execution contexts +fn check_nested_runtime() -> Result<()> { + if tokio::runtime::Handle::try_current().is_ok() { + return Err(Error::NestedRuntimeError( + "Nested Tokio runtime detected: cannot use SyncTransactionClient from within an async context. \ + Use the async TransactionClient instead, or create and use SyncTransactionClient outside of any Tokio runtime." 
+                .to_string(),
+        ));
+    }
+    Ok(())
+}
+
+/// Run a `Result`-returning future on the given Tokio runtime with nested-runtime detection.
+///
+/// This is a thin wrapper around [`tokio::runtime::Runtime::block_on`] that first checks
+/// whether a Tokio runtime is already active in the current context. If a nested runtime
+/// is detected, it returns [`Error::NestedRuntimeError`] instead of attempting to block,
+/// which helps prevent potential deadlocks and provides clearer error messages when
+/// `block_on` is misused from within an existing async runtime.
+///
+/// # Returns
+///
+/// - `Ok(T)` with the successful result produced by the provided future when no nested
+///   runtime is detected and the future completes successfully.
+/// - `Err(Error::NestedRuntimeError)` if a Tokio runtime is already active on the current
+///   thread when this function is called.
+/// - `Err(e)` for any other [`Error`] produced either by the future itself or by
+///   `runtime.block_on`.
+fn safe_block_on<F, T>(runtime: &tokio::runtime::Runtime, future: F) -> Result<T>
+where
+    F: std::future::Future<Output = Result<T>>,
+{
+    check_nested_runtime()?;
+    runtime.block_on(future)
+}
+
 /// Synchronous TiKV transactional client.
 ///
 /// This is a synchronous wrapper around the async [`TransactionClient`](crate::TransactionClient).
@@ -31,6 +83,8 @@ impl SyncTransactionClient {
     ///
     /// See usage example in the documentation of [`TransactionClient::new_with_config`](crate::TransactionClient::new_with_config).
     pub fn new_with_config<S: Into<String>>(pd_endpoints: Vec<S>, config: Config) -> Result<Self> {
+        check_nested_runtime()?;
+
         let runtime = Arc::new(
             tokio::runtime::Builder::new_multi_thread()
                 .enable_all()
@@ -47,7 +101,7 @@ impl SyncTransactionClient {
     ///
     /// This is a synchronous version of [`TransactionClient::begin_optimistic`](crate::TransactionClient::begin_optimistic).
pub fn begin_optimistic(&self) -> Result { - let inner = self.runtime.block_on(self.client.begin_optimistic())?; + let inner = safe_block_on(&self.runtime, self.client.begin_optimistic())?; Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime))) } @@ -55,7 +109,7 @@ impl SyncTransactionClient { /// /// This is a synchronous version of [`TransactionClient::begin_pessimistic`](crate::TransactionClient::begin_pessimistic). pub fn begin_pessimistic(&self) -> Result { - let inner = self.runtime.block_on(self.client.begin_pessimistic())?; + let inner = safe_block_on(&self.runtime, self.client.begin_pessimistic())?; Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime))) } @@ -63,9 +117,7 @@ impl SyncTransactionClient { /// /// This is a synchronous version of [`TransactionClient::begin_with_options`](crate::TransactionClient::begin_with_options). pub fn begin_with_options(&self, options: TransactionOptions) -> Result { - let inner = self - .runtime - .block_on(self.client.begin_with_options(options))?; + let inner = safe_block_on(&self.runtime, self.client.begin_with_options(options))?; Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime))) } @@ -81,14 +133,14 @@ impl SyncTransactionClient { /// /// This is a synchronous version of [`TransactionClient::current_timestamp`](crate::TransactionClient::current_timestamp). pub fn current_timestamp(&self) -> Result { - self.runtime.block_on(self.client.current_timestamp()) + safe_block_on(&self.runtime, self.client.current_timestamp()) } /// Request garbage collection (GC) of the TiKV cluster. /// /// This is a synchronous version of [`TransactionClient::gc`](crate::TransactionClient::gc). pub fn gc(&self, safepoint: Timestamp) -> Result { - self.runtime.block_on(self.client.gc(safepoint)) + safe_block_on(&self.runtime, self.client.gc(safepoint)) } /// Clean up all locks in the specified range. 
@@ -100,8 +152,10 @@ impl SyncTransactionClient { safepoint: &Timestamp, options: ResolveLocksOptions, ) -> Result { - self.runtime - .block_on(self.client.cleanup_locks(range, safepoint, options)) + safe_block_on( + &self.runtime, + self.client.cleanup_locks(range, safepoint, options), + ) } /// Cleans up all keys in a range and quickly reclaim disk space. @@ -114,8 +168,7 @@ impl SyncTransactionClient { /// /// This is a synchronous version of [`TransactionClient::unsafe_destroy_range`](crate::TransactionClient::unsafe_destroy_range). pub fn unsafe_destroy_range(&self, range: impl Into) -> Result<()> { - self.runtime - .block_on(self.client.unsafe_destroy_range(range)) + safe_block_on(&self.runtime, self.client.unsafe_destroy_range(range)) } /// Scan all locks in the specified range. @@ -132,8 +185,10 @@ impl SyncTransactionClient { range: impl Into, batch_size: u32, ) -> Result> { - self.runtime - .block_on(self.client.scan_locks(safepoint, range, batch_size)) + safe_block_on( + &self.runtime, + self.client.scan_locks(safepoint, range, batch_size), + ) } } @@ -145,3 +200,94 @@ impl Clone for SyncTransactionClient { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_check_nested_runtime_outside_async() { + // When called outside an async context, should return Ok(()) + let result = check_nested_runtime(); + assert!( + result.is_ok(), + "check_nested_runtime should succeed outside async context" + ); + } + + #[test] + fn test_check_nested_runtime_inside_async() { + // When called inside an async context, should return Err + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let result = check_nested_runtime(); + assert!( + result.is_err(), + "check_nested_runtime should fail inside async context" + ); + + // Verify the error type is correct + match result.unwrap_err() { + Error::NestedRuntimeError(_) => { + // Expected case - test passes + } + other => panic!("Expected NestedRuntimeError, got: {:?}", other), + } + }); + 
} + + #[test] + fn test_safe_block_on_outside_async() { + // safe_block_on should work when called outside an async context + let rt = tokio::runtime::Runtime::new().unwrap(); + let result = safe_block_on(&rt, async { Ok::<_, Error>(42) }); + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 42); + } + + #[test] + fn test_safe_block_on_inside_async() { + // safe_block_on should fail when called inside an async context + let outer_rt = tokio::runtime::Runtime::new().unwrap(); + + // Create the inner runtime OUTSIDE the async context + let inner_rt = tokio::runtime::Runtime::new().unwrap(); + + outer_rt.block_on(async { + let result = safe_block_on(&inner_rt, async { Ok::<_, Error>(42) }); + + assert!( + result.is_err(), + "safe_block_on should fail inside async context" + ); + + // Verify the error type is correct + match result.unwrap_err() { + Error::NestedRuntimeError(_) => { + // Expected case - test passes + } + other => panic!("Expected NestedRuntimeError, got: {:?}", other), + } + }); + } + + #[test] + fn test_nested_runtime_error_matching() { + // Verify that NestedRuntimeError can be matched on programmatically + let outer_rt = tokio::runtime::Runtime::new().unwrap(); + + outer_rt.block_on(async { + let result = check_nested_runtime(); + + assert!(result.is_err()); + + // Demonstrate type-safe error matching + match result.unwrap_err() { + Error::NestedRuntimeError(_) => { + // Expected case - test passes + } + other => panic!("Expected NestedRuntimeError, got: {:?}", other), + } + }); + } +} diff --git a/src/transaction/sync_snapshot.rs b/src/transaction/sync_snapshot.rs index ac7268a7..7f12751e 100644 --- a/src/transaction/sync_snapshot.rs +++ b/src/transaction/sync_snapshot.rs @@ -33,7 +33,7 @@ impl SyncSnapshot { self.runtime.block_on(self.inner.batch_get(keys)) } - /// Scan a range, return at most `limit` key-value pairs that lying in the range. + /// Scan a range, return at most `limit` key-value pairs that lie in the range. 
pub fn scan( &mut self, range: impl Into, @@ -42,7 +42,7 @@ impl SyncSnapshot { self.runtime.block_on(self.inner.scan(range, limit)) } - /// Scan a range, return at most `limit` keys that lying in the range. + /// Scan a range, return at most `limit` keys that lie in the range. pub fn scan_keys( &mut self, range: impl Into, diff --git a/tests/sync_transaction_tests.rs b/tests/sync_transaction_tests.rs index fbf75080..391a5d21 100644 --- a/tests/sync_transaction_tests.rs +++ b/tests/sync_transaction_tests.rs @@ -21,6 +21,91 @@ fn sync_client() -> Result { SyncTransactionClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()) } +#[test] +#[serial] +fn test_sync_client_outside_async_context() -> Result<()> { + // This test verifies that SyncTransactionClient works correctly when called + // from a synchronous context (no Tokio runtime active) + let client = sync_client()?; + + // Should be able to call methods without error + let _timestamp = client.current_timestamp()?; + let mut txn = client.begin_optimistic()?; + + // Must rollback or commit the transaction to avoid panic on drop + txn.rollback()?; + + Ok(()) +} + +#[test] +#[serial] +fn test_sync_client_new_inside_async_context() { + // This test verifies that creating a SyncTransactionClient inside an async + // context returns an error instead of panicking + let runtime = tokio::runtime::Runtime::new().unwrap(); + + runtime.block_on(async { + let result = SyncTransactionClient::new_with_config( + pd_addrs(), + Config::default().with_default_keyspace(), + ); + + // Should return an error, not panic + assert!(result.is_err()); + + // Verify the error type is correct + match result.unwrap_err() { + tikv_client::Error::NestedRuntimeError(_) => { + // Expected case - test passes + } + other => panic!("Expected NestedRuntimeError, got: {:?}", other), + } + }); +} + +#[test] +#[serial] +fn test_sync_client_methods_inside_async_context() -> Result<()> { + // This test verifies that calling 
SyncTransactionClient methods inside an + // async context returns an error instead of panicking. + // The client is created outside the async context, but methods are called inside. + + let client = sync_client()?; + let runtime = tokio::runtime::Runtime::new().unwrap(); + + runtime.block_on(async { + // All of these should return errors when called inside an async context + let timestamp_result = client.current_timestamp(); + assert!( + timestamp_result.is_err(), + "current_timestamp should fail in async context" + ); + + let begin_result = client.begin_optimistic(); + assert!( + begin_result.is_err(), + "begin_optimistic should fail in async context" + ); + + let pessimistic_result = client.begin_pessimistic(); + assert!( + pessimistic_result.is_err(), + "begin_pessimistic should fail in async context" + ); + + // Verify the error type is correct + match timestamp_result { + Err(tikv_client::Error::NestedRuntimeError(_)) => { + // Expected case - test passes + } + other => panic!("Expected NestedRuntimeError, got: {:?}", other), + } + }); + + Ok(()) +} + #[test] #[serial] fn txn_sync_get_timestamp() -> Result<()> { @@ -167,6 +252,285 @@ fn txn_sync_snapshot() -> Result<()> { Ok(()) } +#[test] +#[serial] +fn txn_sync_snapshot_batch_get() -> Result<()> { + init_sync()?; + + let client = sync_client()?; + + // Write test data + let mut txn = client.begin_optimistic()?; + txn.put("snap_batch_k1".to_owned(), "v1".to_owned())?; + txn.put("snap_batch_k2".to_owned(), "v2".to_owned())?; + txn.put("snap_batch_k3".to_owned(), "v3".to_owned())?; + txn.commit()?; + + // Get snapshot + let ts = client.current_timestamp()?; + let mut snapshot = client.snapshot(ts, TransactionOptions::new_optimistic()); + + // Test batch_get + let keys = vec![ + "snap_batch_k1".to_owned(), + "snap_batch_k2".to_owned(), + "snap_batch_k3".to_owned(), + "snap_batch_k_nonexistent".to_owned(), + ]; + let results: Vec<_> = snapshot.batch_get(keys)?.collect(); + + assert_eq!(results.len(), 3); + + // 
Convert results to HashMap for order-agnostic verification
+    let result_map: HashMap<Key, Value> = results
+        .iter()
+        .map(|kv| (kv.0.clone(), kv.1.clone()))
+        .collect();
+
+    // Verify all keys and values (order-agnostic)
+    assert_eq!(
+        result_map.get(&Key::from("snap_batch_k1".to_owned())),
+        Some(&Value::from("v1".to_owned())),
+        "snap_batch_k1 should have value v1"
+    );
+    assert_eq!(
+        result_map.get(&Key::from("snap_batch_k2".to_owned())),
+        Some(&Value::from("v2".to_owned())),
+        "snap_batch_k2 should have value v2"
+    );
+    assert_eq!(
+        result_map.get(&Key::from("snap_batch_k3".to_owned())),
+        Some(&Value::from("v3".to_owned())),
+        "snap_batch_k3 should have value v3"
+    );
+
+    // Verify non-existent key is not in results
+    assert!(
+        !result_map.contains_key(&Key::from("snap_batch_k_nonexistent".to_owned())),
+        "Non-existent key should not be in results"
+    );
+
+    // Test edge case: empty key list
+    let empty_results: Vec<_> = snapshot.batch_get(Vec::<String>::new())?.collect();
+    assert_eq!(
+        empty_results.len(),
+        0,
+        "Empty key list should return no results"
+    );
+
+    // Test edge case: all non-existent keys
+    let nonexistent_keys = vec![
+        "snap_batch_k_fake1".to_owned(),
+        "snap_batch_k_fake2".to_owned(),
+        "snap_batch_k_fake3".to_owned(),
+    ];
+    let nonexistent_results: Vec<_> = snapshot.batch_get(nonexistent_keys)?.collect();
+    assert_eq!(
+        nonexistent_results.len(),
+        0,
+        "All non-existent keys should return no results"
+    );
+
+    // Test edge case: duplicate keys
+    // Ensure that providing duplicate keys yields one result per unique key.
+ let duplicate_keys = vec![ + "snap_batch_k1".to_owned(), + "snap_batch_k2".to_owned(), + "snap_batch_k1".to_owned(), // Duplicate + "snap_batch_k3".to_owned(), + "snap_batch_k2".to_owned(), // Duplicate + ]; + let duplicate_results: Vec<_> = snapshot.batch_get(duplicate_keys)?.collect(); + + // Verify API behavior: duplicate keys in the input result in unique keys in the output + let dup_result_map: HashMap = duplicate_results + .iter() + .map(|kv| (kv.0.clone(), kv.1.clone())) + .collect(); + assert_eq!( + dup_result_map.len(), + 3, + "API returns unique results: duplicate keys in input are deduplicated in output" + ); + assert!( + dup_result_map.contains_key(&Key::from("snap_batch_k1".to_owned())), + "snap_batch_k1 should be present" + ); + assert!( + dup_result_map.contains_key(&Key::from("snap_batch_k2".to_owned())), + "snap_batch_k2 should be present" + ); + assert!( + dup_result_map.contains_key(&Key::from("snap_batch_k3".to_owned())), + "snap_batch_k3 should be present" + ); + + Ok(()) +} + +#[test] +#[serial] +fn txn_sync_snapshot_scan() -> Result<()> { + init_sync()?; + + let client = sync_client()?; + + // Write test data + let mut txn = client.begin_optimistic()?; + txn.put("snap_scan_1".to_owned(), "value1".to_owned())?; + txn.put("snap_scan_2".to_owned(), "value2".to_owned())?; + txn.put("snap_scan_3".to_owned(), "value3".to_owned())?; + txn.put("snap_scan_4".to_owned(), "value4".to_owned())?; + txn.commit()?; + + // Get snapshot + let ts = client.current_timestamp()?; + let mut snapshot = client.snapshot(ts, TransactionOptions::new_optimistic()); + + // Test scan + let results: Vec<_> = snapshot + .scan("snap_scan_1".to_owned().."snap_scan_4".to_owned(), 10)? 
+ .collect(); + + // snap_scan_1, snap_scan_2, snap_scan_3 (snap_scan_4 is exclusive) + assert_eq!(results.len(), 3); + assert_eq!(results[0].0, Key::from("snap_scan_1".to_owned())); + assert_eq!(results[0].1, Value::from("value1".to_owned())); + assert_eq!(results[1].0, Key::from("snap_scan_2".to_owned())); + assert_eq!(results[1].1, Value::from("value2".to_owned())); + assert_eq!(results[2].0, Key::from("snap_scan_3".to_owned())); + assert_eq!(results[2].1, Value::from("value3".to_owned())); + + // Test scan with limit + let results: Vec<_> = snapshot + .scan("snap_scan_1".to_owned()..="snap_scan_4".to_owned(), 2)? + .collect(); + + assert_eq!(results.len(), 2); // Limited to 2 + + Ok(()) +} + +#[test] +#[serial] +fn txn_sync_snapshot_scan_keys() -> Result<()> { + init_sync()?; + + let client = sync_client()?; + + // Write test data + let mut txn = client.begin_optimistic()?; + txn.put("snap_keys_1".to_owned(), "v1".to_owned())?; + txn.put("snap_keys_2".to_owned(), "v2".to_owned())?; + txn.put("snap_keys_3".to_owned(), "v3".to_owned())?; + txn.commit()?; + + // Get snapshot + let ts = client.current_timestamp()?; + let mut snapshot = client.snapshot(ts, TransactionOptions::new_optimistic()); + + // Test scan_keys (only keys, no values) + let keys: Vec<_> = snapshot + .scan_keys("snap_keys_1".to_owned()..="snap_keys_3".to_owned(), 10)? 
+ .collect(); + + assert_eq!(keys.len(), 3); + assert_eq!(keys[0], Key::from("snap_keys_1".to_owned())); + assert_eq!(keys[1], Key::from("snap_keys_2".to_owned())); + assert_eq!(keys[2], Key::from("snap_keys_3".to_owned())); + + Ok(()) +} + +#[test] +#[serial] +fn txn_sync_snapshot_scan_reverse() -> Result<()> { + init_sync()?; + + let client = sync_client()?; + + // Write test data + let mut txn = client.begin_optimistic()?; + txn.put("snap_rev_1".to_owned(), "value1".to_owned())?; + txn.put("snap_rev_2".to_owned(), "value2".to_owned())?; + txn.put("snap_rev_3".to_owned(), "value3".to_owned())?; + txn.put("snap_rev_4".to_owned(), "value4".to_owned())?; + txn.commit()?; + + // Get snapshot + let ts = client.current_timestamp()?; + let mut snapshot = client.snapshot(ts, TransactionOptions::new_optimistic()); + + // Test scan_reverse (reverse order) + let results: Vec<_> = snapshot + .scan_reverse("snap_rev_1".to_owned()..="snap_rev_4".to_owned(), 10)? + .collect(); + + assert_eq!(results.len(), 4); + // Should be in reverse order + assert_eq!(results[0].0, Key::from("snap_rev_4".to_owned())); + assert_eq!(results[1].0, Key::from("snap_rev_3".to_owned())); + assert_eq!(results[2].0, Key::from("snap_rev_2".to_owned())); + assert_eq!(results[3].0, Key::from("snap_rev_1".to_owned())); + + Ok(()) +} + +#[test] +#[serial] +fn txn_sync_snapshot_scan_keys_reverse() -> Result<()> { + init_sync()?; + + let client = sync_client()?; + + // Write test data + let mut txn = client.begin_optimistic()?; + txn.put("snap_krev_a".to_owned(), "v1".to_owned())?; + txn.put("snap_krev_b".to_owned(), "v2".to_owned())?; + txn.put("snap_krev_c".to_owned(), "v3".to_owned())?; + txn.commit()?; + + // Get snapshot + let ts = client.current_timestamp()?; + let mut snapshot = client.snapshot(ts, TransactionOptions::new_optimistic()); + + // Test scan_keys_reverse (keys only, in reverse order) + let keys: Vec<_> = snapshot + .scan_keys_reverse("snap_krev_a".to_owned()..="snap_krev_c".to_owned(), 
10)? + .collect(); + + assert_eq!(keys.len(), 3); + // Should be in reverse order + assert_eq!(keys[0], Key::from("snap_krev_c".to_owned())); + assert_eq!(keys[1], Key::from("snap_krev_b".to_owned())); + assert_eq!(keys[2], Key::from("snap_krev_a".to_owned())); + + Ok(()) +} + +#[test] +#[serial] +fn txn_sync_snapshot_key_exists() -> Result<()> { + init_sync()?; + + let client = sync_client()?; + + // Write test data + let mut txn = client.begin_optimistic()?; + txn.put("snap_exists_key".to_owned(), "value".to_owned())?; + txn.commit()?; + + // Get snapshot + let ts = client.current_timestamp()?; + let mut snapshot = client.snapshot(ts, TransactionOptions::new_optimistic()); + + // Test key_exists + assert!(snapshot.key_exists("snap_exists_key".to_owned())?); + assert!(!snapshot.key_exists("snap_nonexistent_key".to_owned())?); + + Ok(()) +} + #[test] #[serial] fn txn_sync_begin_with_options() -> Result<()> { From 4807831e1bd83e2bdbb5a861cdf619cd6ff888da Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Wed, 28 Jan 2026 11:33:53 +0200 Subject: [PATCH 17/20] remove trailing whitespace Signed-off-by: Otto Westerlund --- src/transaction/sync_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs index 9351f8ca..f6422833 100644 --- a/src/transaction/sync_client.rs +++ b/src/transaction/sync_client.rs @@ -16,7 +16,7 @@ use std::sync::Arc; /// runtime and returns `Error::NestedRuntimeError` if one is found, allowing callers /// to detect and handle incorrect use of the synchronous client from within an /// existing async runtime instead of risking deadlocks or unexpected blocking. -/// +/// /// Note: checks only for Tokio runtimes, not other async runtimes. 
/// /// # Error Handling From 2f6c85fd0811b3099d2d81f2a8466fe165287319 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Wed, 28 Jan 2026 11:39:56 +0200 Subject: [PATCH 18/20] fix compilation error Signed-off-by: Otto Westerlund --- tests/sync_transaction_tests.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/sync_transaction_tests.rs b/tests/sync_transaction_tests.rs index 391a5d21..f6ffea24 100644 --- a/tests/sync_transaction_tests.rs +++ b/tests/sync_transaction_tests.rs @@ -51,15 +51,13 @@ fn test_sync_client_new_inside_async_context() { Config::default().with_default_keyspace(), ); - // Should return an error, not panic - assert!(result.is_err()); - // Verify the error type is correct - match result.unwrap_err() { - tikv_client::Error::NestedRuntimeError(_) => { + match result { + Err(tikv_client::Error::NestedRuntimeError(_)) => { // Expected case - test passes } - other => panic!("Expected NestedRuntimeError, got: {:?}", other), + Err(other) => panic!("Expected NestedRuntimeError, got: {:?}", other), + Ok(_) => panic!("Expected error but got Ok"), } }); } From d625c21d652567da14747943b2695b81ceb42137 Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Wed, 28 Jan 2026 13:40:59 +0200 Subject: [PATCH 19/20] error handling to sync_snapshot and sync_transaction to prevent panics Signed-off-by: Otto Westerlund --- src/transaction/sync_client.rs | 4 ++-- src/transaction/sync_snapshot.rs | 16 ++++++------- src/transaction/sync_transaction.rs | 36 ++++++++++++++--------------- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs index f6422833..eccc43c1 100644 --- a/src/transaction/sync_client.rs +++ b/src/transaction/sync_client.rs @@ -25,7 +25,7 @@ use std::sync::Arc; /// - Use the async [`TransactionClient`](crate::TransactionClient) instead of `SyncTransactionClient` /// - Move the `SyncTransactionClient` creation and usage outside of the async 
context
 /// - Consider restructuring the code to avoid mixing sync and async execution contexts
-fn check_nested_runtime() -> Result<()> {
+pub(crate) fn check_nested_runtime() -> Result<()> {
     if tokio::runtime::Handle::try_current().is_ok() {
         return Err(Error::NestedRuntimeError(
             "Nested Tokio runtime detected: cannot use SyncTransactionClient from within an async context. \
@@ -52,7 +52,7 @@ fn check_nested_runtime() -> Result<()> {
 /// thread when this function is called.
 /// - `Err(e)` for any other [`Error`] produced either by the future itself or by
 /// `runtime.block_on`.
-fn safe_block_on<F, T>(runtime: &tokio::runtime::Runtime, future: F) -> Result<T>
+pub(crate) fn safe_block_on<F, T>(runtime: &tokio::runtime::Runtime, future: F) -> Result<T>
 where
     F: std::future::Future<Output = Result<T>>,
 {
diff --git a/src/transaction/sync_snapshot.rs b/src/transaction/sync_snapshot.rs
index 7f12751e..af2ae15b 100644
--- a/src/transaction/sync_snapshot.rs
+++ b/src/transaction/sync_snapshot.rs
@@ -1,3 +1,4 @@
+use crate::transaction::sync_client::safe_block_on;
 use crate::{BoundRange, Key, KvPair, Result, Snapshot, Value};
 use std::sync::Arc;

@@ -17,12 +18,12 @@ impl SyncSnapshot {
     /// Get the value associated with the given key.
     pub fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
-        self.runtime.block_on(self.inner.get(key))
+        safe_block_on(&self.runtime, self.inner.get(key))
     }

     /// Check whether the key exists.
     pub fn key_exists(&mut self, key: impl Into<Key>) -> Result<bool> {
-        self.runtime.block_on(self.inner.key_exists(key))
+        safe_block_on(&self.runtime, self.inner.key_exists(key))
     }

     /// Get the values associated with the given keys.
@@ -30,7 +31,7 @@ impl SyncSnapshot {
         &mut self,
         keys: impl IntoIterator<Item = impl Into<Key>>,
     ) -> Result<impl Iterator<Item = KvPair>> {
-        self.runtime.block_on(self.inner.batch_get(keys))
+        safe_block_on(&self.runtime, self.inner.batch_get(keys))
     }

     /// Scan a range, return at most `limit` key-value pairs that lie in the range.
@@ -39,7 +40,7 @@ impl SyncSnapshot { range: impl Into<BoundRange>, limit: u32, ) -> Result<Vec<KvPair>> { - self.runtime.block_on(self.inner.scan(range, limit)) + safe_block_on(&self.runtime, self.inner.scan(range, limit)) } /// Scan a range, return at most `limit` keys that lie in the range. @@ -48,7 +49,7 @@ impl SyncSnapshot { range: impl Into<BoundRange>, limit: u32, ) -> Result<Vec<Key>> { - self.runtime.block_on(self.inner.scan_keys(range, limit)) + safe_block_on(&self.runtime, self.inner.scan_keys(range, limit)) } /// Similar to scan, but in the reverse direction. @@ -57,7 +58,7 @@ impl SyncSnapshot { range: impl Into<BoundRange>, limit: u32, ) -> Result<Vec<KvPair>> { - self.runtime.block_on(self.inner.scan_reverse(range, limit)) + safe_block_on(&self.runtime, self.inner.scan_reverse(range, limit)) } /// Similar to scan_keys, but in the reverse direction. @@ -66,7 +67,6 @@ impl SyncSnapshot { range: impl Into<BoundRange>, limit: u32, ) -> Result<Vec<Key>> { - self.runtime - .block_on(self.inner.scan_keys_reverse(range, limit)) + safe_block_on(&self.runtime, self.inner.scan_keys_reverse(range, limit)) } } diff --git a/src/transaction/sync_transaction.rs b/src/transaction/sync_transaction.rs index 8f541a4a..0d089423 100644 --- a/src/transaction/sync_transaction.rs +++ b/src/transaction/sync_transaction.rs @@ -1,3 +1,4 @@ +use crate::transaction::sync_client::safe_block_on; use crate::{ transaction::Mutation, BoundRange, Key, KvPair, Result, Timestamp, Transaction, Value, }; @@ -19,17 +20,17 @@ impl SyncTransaction { /// Get the value associated with the given key. pub fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> { - self.runtime.block_on(self.inner.get(key)) + safe_block_on(&self.runtime, self.inner.get(key)) } /// Get the value associated with the given key, and lock the key. pub fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> { - self.runtime.block_on(self.inner.get_for_update(key)) + safe_block_on(&self.runtime, self.inner.get_for_update(key)) } /// Check if the given key exists.
pub fn key_exists(&mut self, key: impl Into<Key>) -> Result<bool> { - self.runtime.block_on(self.inner.key_exists(key)) + safe_block_on(&self.runtime, self.inner.key_exists(key)) } /// Get the values associated with the given keys. @@ -37,7 +38,7 @@ impl SyncTransaction { &mut self, keys: impl IntoIterator<Item = impl Into<Key>>, ) -> Result<Vec<KvPair>> { - self.runtime.block_on(self.inner.batch_get(keys)) + safe_block_on(&self.runtime, self.inner.batch_get(keys)) } /// Get the values associated with the given keys, and lock the keys. @@ -45,7 +46,7 @@ impl SyncTransaction { &mut self, keys: impl IntoIterator<Item = impl Into<Key>>, ) -> Result<Vec<KvPair>> { - self.runtime.block_on(self.inner.batch_get_for_update(keys)) + safe_block_on(&self.runtime, self.inner.batch_get_for_update(keys)) } /// Scan a range and return the key-value pairs. @@ -54,7 +55,7 @@ impl SyncTransaction { range: impl Into<BoundRange>, limit: u32, ) -> Result<Vec<KvPair>> { - self.runtime.block_on(self.inner.scan(range, limit)) + safe_block_on(&self.runtime, self.inner.scan(range, limit)) } /// Scan a range and return only the keys. @@ -63,7 +64,7 @@ impl SyncTransaction { range: impl Into<BoundRange>, limit: u32, ) -> Result<Vec<Key>> { - self.runtime.block_on(self.inner.scan_keys(range, limit)) + safe_block_on(&self.runtime, self.inner.scan_keys(range, limit)) } /// Scan a range in reverse order. @@ -72,7 +73,7 @@ impl SyncTransaction { range: impl Into<BoundRange>, limit: u32, ) -> Result<Vec<KvPair>> { - self.runtime.block_on(self.inner.scan_reverse(range, limit)) + safe_block_on(&self.runtime, self.inner.scan_reverse(range, limit)) } /// Scan keys in a range in reverse order. @@ -81,47 +82,46 @@ impl SyncTransaction { range: impl Into<BoundRange>, limit: u32, ) -> Result<Vec<Key>> { - self.runtime - .block_on(self.inner.scan_keys_reverse(range, limit)) + safe_block_on(&self.runtime, self.inner.scan_keys_reverse(range, limit)) } /// Set the value associated with the given key.
pub fn put(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> { - self.runtime.block_on(self.inner.put(key, value)) + safe_block_on(&self.runtime, self.inner.put(key, value)) } /// Insert the key-value pair. Returns an error if the key already exists. pub fn insert(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> { - self.runtime.block_on(self.inner.insert(key, value)) + safe_block_on(&self.runtime, self.inner.insert(key, value)) } /// Delete the given key. pub fn delete(&mut self, key: impl Into<Key>) -> Result<()> { - self.runtime.block_on(self.inner.delete(key)) + safe_block_on(&self.runtime, self.inner.delete(key)) } /// Apply multiple mutations atomically. pub fn batch_mutate(&mut self, mutations: impl IntoIterator<Item = Mutation>) -> Result<()> { - self.runtime.block_on(self.inner.batch_mutate(mutations)) + safe_block_on(&self.runtime, self.inner.batch_mutate(mutations)) } /// Lock the given keys without associating any values. pub fn lock_keys(&mut self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> { - self.runtime.block_on(self.inner.lock_keys(keys)) + safe_block_on(&self.runtime, self.inner.lock_keys(keys)) } /// Commit the transaction. pub fn commit(&mut self) -> Result<Option<Timestamp>> { - self.runtime.block_on(self.inner.commit()) + safe_block_on(&self.runtime, self.inner.commit()) } /// Rollback the transaction. pub fn rollback(&mut self) -> Result<()> { - self.runtime.block_on(self.inner.rollback()) + safe_block_on(&self.runtime, self.inner.rollback()) } /// Send a heart beat message to keep the transaction alive.
pub fn send_heart_beat(&mut self) -> Result<u64> { - self.runtime.block_on(self.inner.send_heart_beat()) + safe_block_on(&self.runtime, self.inner.send_heart_beat()) } } From 715426519189870ff2a819548764032ede6cedae Mon Sep 17 00:00:00 2001 From: Otto Westerlund Date: Sun, 15 Feb 2026 16:15:56 +0200 Subject: [PATCH 20/20] eliminate duplicate error message Signed-off-by: Otto Westerlund --- src/common/errors.rs | 4 ++-- src/transaction/sync_client.rs | 6 +----- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/common/errors.rs b/src/common/errors.rs index 3ca8b2a5..f9a99d2f 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -130,8 +130,8 @@ pub enum Error { TxnNotFound(kvrpcpb::TxnNotFound), /// Attempted to create or use the sync client (including calling its methods) from within a Tokio async runtime context #[error( - "Cannot use SyncTransactionClient from within a Tokio async runtime context: {0}. \ -Use TransactionClient instead or move SyncTransactionClient usage outside the async context." + "Nested Tokio runtime detected: cannot use SyncTransactionClient from within an async context. \ +Use the async TransactionClient instead, or create and use SyncTransactionClient outside of any Tokio runtime.{0}" )] NestedRuntimeError(String), } diff --git a/src/transaction/sync_client.rs b/src/transaction/sync_client.rs index eccc43c1..50a25542 100644 --- a/src/transaction/sync_client.rs +++ b/src/transaction/sync_client.rs @@ -27,11 +27,7 @@ use std::sync::Arc; /// - Consider restructuring the code to avoid mixing sync and async execution contexts pub(crate) fn check_nested_runtime() -> Result<()> { if tokio::runtime::Handle::try_current().is_ok() { - return Err(Error::NestedRuntimeError( - "Nested Tokio runtime detected: cannot use SyncTransactionClient from within an async context. \ - Use the async TransactionClient instead, or create and use SyncTransactionClient outside of any Tokio runtime."
- .to_string(), - )); + return Err(Error::NestedRuntimeError(String::new())); } Ok(()) }