diff --git a/modules/tests/test_db/harness/harness.sh b/modules/tests/test_db/harness/harness.sh index 291a538e..c4ddbe6e 100755 --- a/modules/tests/test_db/harness/harness.sh +++ b/modules/tests/test_db/harness/harness.sh @@ -62,6 +62,10 @@ curl -d "delete_check_returned_data:some_key:a" http://$PLAID_LOCATION/webhook/$ curl -d "insert:some_key:some_value" http://$PLAID_LOCATION/webhook/$URL curl -d "delete_check_returned_data:some_key:some_value" http://$PLAID_LOCATION/webhook/$URL # the DB is empty +curl -d "insert_batch:key1=value1|key2=value2|key3=value3" http://$PLAID_LOCATION/webhook/$URL +curl -d "get:key1" http://$PLAID_LOCATION/webhook/$URL +curl -d "get:key2" http://$PLAID_LOCATION/webhook/$URL +curl -d "get:key3" http://$PLAID_LOCATION/webhook/$URL sleep 2 @@ -83,8 +87,11 @@ kill $RH_PID 2>&1 > /dev/null # OK # OK # OK +# value1 +# value2 +# value3 -echo -e "Empty\nEmpty\nfirst_value\nEmpty\nsecond_value\nEmpty\nEmpty\nEmpty\na\naaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\nOK\nOK\nOK\nOK\nOK" > expected.txt +echo -e "Empty\nEmpty\nfirst_value\nEmpty\nsecond_value\nEmpty\nEmpty\nEmpty\na\naaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\nOK\nOK\nOK\nOK\nOK\nvalue1\nvalue2\nvalue3" > expected.txt diff expected.txt $FILE RESULT=$? diff --git a/modules/tests/test_db/src/lib.rs b/modules/tests/test_db/src/lib.rs index 66370c58..b82fe0c0 100644 --- a/modules/tests/test_db/src/lib.rs +++ b/modules/tests/test_db/src/lib.rs @@ -26,6 +26,20 @@ fn handle_post(log: &str) -> Result<(), i32> { "insert" => { plaid::storage::insert(parts[1], parts[2].as_bytes()).unwrap(); } + "insert_batch" => { + // Format: insert_batch:key1=value1|key2=value2|... 
+ let items: Vec<plaid::storage::Item> = parts[1] + .split('|') + .map(|pair| { + let (key, value) = pair.split_once('=').expect("invalid key=value pair"); + plaid::storage::Item { + key: key.to_string(), + value: value.as_bytes().to_vec(), + } + }) + .collect(); + plaid::storage::insert_batch(&items).unwrap(); + } "delete" => { plaid::storage::delete(parts[1]).unwrap(); } diff --git a/modules/tests/test_shared_db_rule_1/harness/harness.sh b/modules/tests/test_shared_db_rule_1/harness/harness.sh index 8def45eb..516e65f3 100755 --- a/modules/tests/test_shared_db_rule_1/harness/harness.sh +++ b/modules/tests/test_shared_db_rule_1/harness/harness.sh @@ -27,6 +27,12 @@ curl -d "delete and check" http://$PLAID_LOCATION/webhook/$URL2 sleep 2 curl -d "read after deletion" http://$PLAID_LOCATION/webhook/$URL1 sleep 2 +curl -d "insert batch and check" http://$PLAID_LOCATION/webhook/$URL2 +sleep 2 +curl -d "read after batch insert" http://$PLAID_LOCATION/webhook/$URL1 +sleep 2 +curl -d "delete batch inserts" http://$PLAID_LOCATION/webhook/$URL2 +sleep 2 curl -d "fill up the db" http://$PLAID_LOCATION/webhook/$URL2 sleep 2 curl -d "write to full db" http://$PLAID_LOCATION/webhook/$URL2 @@ -36,7 +42,7 @@ sleep 2 kill $RH_PID 2>&1 > /dev/null -echo -e "OK\nOK\nOK\nOK\nOK\nOK\nOK\nOK" > expected.txt +echo -e "OK\nOK\nOK\nOK\nOK\nOK\nOK\nOK\nOK\nOK" > expected.txt diff expected.txt $FILE RESULT=$? 
diff --git a/modules/tests/test_shared_db_rule_1/src/lib.rs b/modules/tests/test_shared_db_rule_1/src/lib.rs index 8269de03..e31c47e9 100644 --- a/modules/tests/test_shared_db_rule_1/src/lib.rs +++ b/modules/tests/test_shared_db_rule_1/src/lib.rs @@ -47,6 +47,19 @@ fn main(log: String, _: LogSource) -> Result<(), i32> { } make_named_request("test-response", "OK", HashMap::new()).unwrap(); } + "read after batch insert" => { + for i in 0..3 { + let key = format!("key{i}"); + let expected_value = format!("value{i}"); + + let actual_value = plaid::storage::get_shared(SHARED_DB, &key).unwrap(); + + if expected_value.as_bytes() != actual_value { + panic!("Return value does not match expected value for key {key}"); + } + } + make_named_request("test-response", "OK", HashMap::new()).unwrap(); + } _ => panic!("Got an unexpected log"), } diff --git a/modules/tests/test_shared_db_rule_2/src/lib.rs b/modules/tests/test_shared_db_rule_2/src/lib.rs index 80316554..2a15ca51 100644 --- a/modules/tests/test_shared_db_rule_2/src/lib.rs +++ b/modules/tests/test_shared_db_rule_2/src/lib.rs @@ -1,13 +1,22 @@ use std::collections::HashMap; -use plaid_stl::{entrypoint_with_source, messages::LogSource, network::make_named_request, plaid}; +use plaid_stl::{ + entrypoint_with_source, + messages::LogSource, + network::make_named_request, + plaid::{self, storage::Item}, +}; entrypoint_with_source!(); const SHARED_DB: &str = "shared_db_1"; const RULE_NAME: &str = "test_shared_db_rule_2"; +const BATCH_INSERT_SIZE: usize = 3; + fn main(log: String, _: LogSource) -> Result<(), i32> { + let batch_insert_items = generate_batch_insert_items(); + // Depending on the value of "log", we do different things match log.as_str() { "write and check" => { @@ -59,11 +68,22 @@ fn main(log: String, _: LogSource) -> Result<(), i32> { "[{RULE_NAME}] Writing to a full shared DB, should fail..." 
)); match plaid::storage::insert_shared(SHARED_DB, "another_key", &vec![0u8]) { - Ok(_) => panic!("This should have failed"), + Ok(_) => panic!("Single insert on a full DB should have failed"), + Err(_) => { + plaid::print_debug_string(&format!( + "[{RULE_NAME}] Failed as expected on single item insert" + )); + } + } + match plaid::storage::insert_batch_shared(SHARED_DB, &batch_insert_items) { + Ok(_) => panic!("Batch insert on a full DB should have failed"), Err(_) => { - plaid::print_debug_string(&format!("[{RULE_NAME}] Failed as expected")); + plaid::print_debug_string(&format!( + "[{RULE_NAME}] Failed as expected on batch insert" + )); } } + make_named_request("test-response", "OK", HashMap::new()).unwrap(); } "write to non-existing db" => { @@ -71,15 +91,70 @@ fn main(log: String, _: LogSource) -> Result<(), i32> { "[{RULE_NAME}] Writing to a non-existing shared DB, should fail..." )); match plaid::storage::insert_shared("this_does_not_exist", "some_key", &vec![0u8]) { - Ok(_) => panic!("This should have failed"), + Ok(_) => panic!("Single insert to non-existant DB should have failed"), Err(_) => { - plaid::print_debug_string(&format!("[{RULE_NAME}] Failed as expected")); + plaid::print_debug_string(&format!( + "[{RULE_NAME}] Failed as expected on single item write to non-existent DB" + )); + } + } + match plaid::storage::insert_batch_shared("this_does_not_exist", &batch_insert_items) { + Ok(_) => panic!("Batch insert to non-existant DB should have failed"), + Err(_) => { + plaid::print_debug_string(&format!( + "[{RULE_NAME}] Failed as expected on batch item write to non-existent DB" + )); } } make_named_request("test-response", "OK", HashMap::new()).unwrap(); } + "insert batch and check" => { + plaid::print_debug_string(&format!( + "[{RULE_NAME}] Writing {BATCH_INSERT_SIZE} items to DB..." 
+ )); + + plaid::storage::insert_batch_shared(SHARED_DB, &batch_insert_items).unwrap(); + + for item in batch_insert_items { + let returned_val = plaid::storage::get_shared(SHARED_DB, &item.key).unwrap(); + + if returned_val != item.value { + panic!( + "Returned value does not match expected value for key: {}", + item.key + ) + } + } + make_named_request("test-response", "OK", HashMap::new()).unwrap(); + } + "delete batch inserts" => { + plaid::print_debug_string(&format!( + "[{RULE_NAME}] Deleting {BATCH_INSERT_SIZE} items from DB..." + )); + + for item in batch_insert_items { + plaid::storage::delete_shared(SHARED_DB, &item.key).unwrap(); + } + } _ => panic!("Got an unexpected log"), } Ok(()) } + +fn generate_batch_insert_items() -> Vec<Item> { + let mut items = vec![]; + for i in 0..BATCH_INSERT_SIZE { + let key = format!("key{i}"); + let value = format!("value{i}"); + + let item = Item { + key, + value: value.as_bytes().to_vec(), + }; + + items.push(item) + } + + items +} diff --git a/runtime/plaid-stl/src/plaid/storage.rs b/runtime/plaid-stl/src/plaid/storage.rs index f5c49359..42759555 100644 --- a/runtime/plaid-stl/src/plaid/storage.rs +++ b/runtime/plaid-stl/src/plaid/storage.rs @@ -1,11 +1,20 @@ use std::fmt::Display; +use serde::{Deserialize, Serialize}; + use crate::PlaidFunctionError; pub enum StorageError { BufferSizeMismatch, } +/// A key/value pair for use with batch insert operations. +#[derive(Serialize, Deserialize)] +pub struct Item { + pub key: String, + pub value: Vec<u8>, +} + pub fn insert(key: &str, value: &[u8]) -> Result<Vec<u8>, PlaidFunctionError> { extern "C" { /// Send a request to store this data in whatever persistence system Plaid has configured. @@ -122,6 +131,55 @@ pub fn insert_shared( } } +/// Insert multiple key/value pairs in a single atomic batch operation. 
+pub fn insert_batch(items: &[Item]) -> Result<(), PlaidFunctionError> { + extern "C" { + fn storage_insert_batch(items_buf: *const u8, items_buf_len: usize) -> i32; + } + + let items_json = + serde_json::to_vec(items).map_err(|_| PlaidFunctionError::ErrorCouldNotSerialize)?; + + let result = unsafe { storage_insert_batch(items_json.as_ptr(), items_json.len()) }; + + if result == 0 { + Ok(()) + } else { + Err(result.into()) + } +} + +/// Insert multiple key/value pairs into a shared namespace in a single atomic batch operation. +pub fn insert_batch_shared(namespace: &str, items: &[Item]) -> Result<(), PlaidFunctionError> { + extern "C" { + fn storage_insert_batch_shared( + namespace: *const u8, + namespace_len: usize, + items_buf: *const u8, + items_buf_len: usize, + ) -> i32; + } + + let namespace_bytes = namespace.as_bytes(); + let items_json = + serde_json::to_vec(items).map_err(|_| PlaidFunctionError::ErrorCouldNotSerialize)?; + + let result = unsafe { + storage_insert_batch_shared( + namespace_bytes.as_ptr(), + namespace_bytes.len(), + items_json.as_ptr(), + items_json.len(), + ) + }; + + if result == 0 { + Ok(()) + } else { + Err(result.into()) + } +} + pub fn get(key: &str) -> Result<Vec<u8>, PlaidFunctionError> { extern "C" { fn storage_get(key: *const u8, key_len: usize, data: *const u8, data_len: usize) -> i32; diff --git a/runtime/plaid/src/functions/api.rs b/runtime/plaid/src/functions/api.rs index c54af192..77c24efb 100644 --- a/runtime/plaid/src/functions/api.rs +++ b/runtime/plaid/src/functions/api.rs @@ -666,6 +666,12 @@ pub fn to_api_function( "storage_insert_shared" => { Function::new_typed_with_env(&mut store, &env, super::storage::insert_shared) } + "storage_insert_batch" => { + Function::new_typed_with_env(&mut store, &env, super::storage::insert_batch) + } + "storage_insert_batch_shared" => { + Function::new_typed_with_env(&mut store, &env, super::storage::insert_batch_shared) + } "storage_get" => Function::new_typed_with_env(&mut store, &env, 
super::storage::get), "storage_get_shared" => { Function::new_typed_with_env(&mut store, &env, super::storage::get_shared) diff --git a/runtime/plaid/src/functions/storage/insert.rs b/runtime/plaid/src/functions/storage/insert.rs index beeb1482..aada310e 100644 --- a/runtime/plaid/src/functions/storage/insert.rs +++ b/runtime/plaid/src/functions/storage/insert.rs @@ -1,5 +1,6 @@ use std::sync::{Arc, RwLock}; +use plaid_stl::plaid::storage::Item; use wasmer::{AsStoreRef, FunctionEnvMut, MemoryView, WasmPtr}; use crate::{executor::Env, functions::FunctionErrors, loader::LimitValue, storage::Storage}; @@ -9,6 +10,216 @@ use super::{ safely_write_data_back, }; +/// Insert multiple key/value pairs into the storage system in a single batch operation. +/// +/// The guest passes a single JSON-encoded `Vec<Item>` in `items_buf`. +/// Returns 0 on success. +pub fn insert_batch(env: FunctionEnvMut<Env>, items_buf: WasmPtr<u8>, items_buf_len: u32) -> i32 { + let store = env.as_store_ref(); + let env_data = env.data(); + + let Some(storage) = &env_data.storage else { + return FunctionErrors::ApiNotConfigured as i32; + }; + + let memory_view = match get_memory(&env, &store) { + Ok(memory_view) => memory_view, + Err(e) => { + error!( + "{}: Memory error in storage_insert_batch: {e:?}", + env_data.module.name, + ); + return FunctionErrors::CouldNotGetAdequateMemory as i32; + } + }; + + safely_get_guest_string!(items_json, memory_view, items_buf, items_buf_len, env_data); + + match insert_batch_common( + env_data, + storage, + env_data.module.name.clone(), + items_json, + env_data.module.storage_limit.clone(), + env_data.module.storage_current.clone(), + ) { + Ok(code) => code, + Err(e) => e as i32, + } +} + +/// Insert multiple key/value pairs into a shared namespace in a single batch operation. +/// +/// The guest passes a single JSON-encoded `Vec<Item>` in `items_buf`. +/// Returns 0 on success. 
+pub fn insert_batch_shared( + env: FunctionEnvMut<Env>, + namespace_buf: WasmPtr<u8>, + namespace_buf_len: u32, + items_buf: WasmPtr<u8>, + items_buf_len: u32, +) -> i32 { + let store = env.as_store_ref(); + let env_data = env.data(); + + let Some(storage) = &env_data.storage else { + return FunctionErrors::ApiNotConfigured as i32; + }; + + let Some(shared_dbs) = &storage.shared_dbs else { + return FunctionErrors::OperationNotAllowed as i32; + }; + + let memory_view = match get_memory(&env, &store) { + Ok(memory_view) => memory_view, + Err(e) => { + error!( + "{}: Memory error in storage_insert_batch_shared: {e:?}", + env_data.module.name, + ); + return FunctionErrors::CouldNotGetAdequateMemory as i32; + } + }; + + safely_get_guest_string!( + namespace, + memory_view, + namespace_buf, + namespace_buf_len, + env_data + ); + + let Some(db) = shared_dbs.get(&namespace) else { + return FunctionErrors::SharedDbError as i32; + }; + + if !db.config.rw.contains(&env_data.module.name) { + return FunctionErrors::OperationNotAllowed as i32; + } + + safely_get_guest_string!(items_json, memory_view, items_buf, items_buf_len, env_data); + + match insert_batch_common( + env_data, + storage, + namespace, + items_json, + db.config.size_limit.clone(), + db.used_storage.clone(), + ) { + Ok(code) => code, + Err(e) => e as i32, + } +} + +/// Code common to [`insert_batch`] and [`insert_batch_shared`]. +/// +/// `items_json` is a JSON-encoded `Vec<Item>`. +/// Storage-limit accounting mirrors [`insert_common`]: for each item we subtract the size of any +/// existing data that would be overwritten. 
+fn insert_batch_common( + env_data: &Env, + storage: &Arc<Storage>, + namespace: String, + items_json: String, + storage_limit: LimitValue, + storage_counter: Arc<RwLock<u64>>, +) -> Result<i32, FunctionErrors> { + let items: Vec<Item> = match serde_json::from_str(&items_json) { + Ok(v) => v, + Err(e) => { + error!( + "{}: Failed to deserialize items for storage_insert_batch: {e}", + env_data.module.name, + ); + return Err(FunctionErrors::ErrorCouldNotSerialize); + } + }; + + info!( + "[{}]: batch inserting {} items to namespace {namespace}", + env_data.module.name, + items.len() + ); + + // For a limited namespace, check that the entire batch would fit before writing anything. + match storage_limit { + LimitValue::Unlimited => { + // The storage is unlimited, so we don't check / update any counters and just proceed with the operation + let result = env_data + .api + .clone() + .runtime + .block_on(async move { storage.insert_batch(namespace, items).await }); + + match result { + Ok(()) => Ok(0), + Err(e) => { + error!( + "{}: Storage error during insert_batch: {e}", + env_data.module.name, + ); + Err(FunctionErrors::InternalApiError) + } + } + } + LimitValue::Limited(limit) => { + let mut storage_current = match storage_counter.write() { + Ok(g) => g, + Err(e) => { + error!("Critical error getting a lock on used storage: {e:?}"); + return Err(FunctionErrors::InternalApiError); + } + }; + + // Compute the net byte delta for the whole batch, accounting for any data that would + // be overwritten by keys that already exist. + let mut net_delta: i64 = 0; + for item in &items { + let key_len = item.key.as_bytes().len() as u64; + let existing = fetch_existing_data_size(env_data, storage, &namespace, &item.key)?; + + // New contribution: key + value. Subtract what was already counted. 
+ net_delta += (key_len + item.value.len() as u64) as i64 - existing as i64; + } + + let would_be_used = (*storage_current as i64 + net_delta) as u64; + if would_be_used > limit { + error!( + "{}: Batch insert rejected: would exceed the configured storage limit.", + env_data.module.name, + ); + let _ = env_data.external_logging_system.log_module_error( + env_data.module.name.clone(), + "Batch insert rejected: would exceed the configured storage limit.".to_string(), + vec![], + ); + return Err(FunctionErrors::StorageLimitReached); + } + + let result = env_data + .api + .clone() + .runtime + .block_on(async move { storage.insert_batch(namespace, items).await }); + + match result { + Ok(()) => { + *storage_current = would_be_used; + Ok(0) + } + Err(e) => { + error!( + "{}: Storage error during insert_batch: {e}", + env_data.module.name, + ); + Err(FunctionErrors::InternalApiError) + } + } + } + } +} + /// Store data in the storage system if one is configured pub fn insert( env: FunctionEnvMut<Env>, @@ -116,7 +327,7 @@ pub fn insert_shared( match insert_common( env_data, storage, - env_data.module.name.clone(), + namespace, key, value, memory_view, diff --git a/runtime/plaid/src/functions/storage/mod.rs b/runtime/plaid/src/functions/storage/mod.rs index 243081fb..2fa924d6 100644 --- a/runtime/plaid/src/functions/storage/mod.rs +++ b/runtime/plaid/src/functions/storage/mod.rs @@ -5,7 +5,7 @@ use super::{ pub use delete::{delete, delete_shared}; pub use get::{get, get_shared}; -pub use insert::{insert, insert_shared}; +pub use insert::{insert, insert_batch, insert_batch_shared, insert_shared}; pub use list::{list_keys, list_keys_shared}; macro_rules! 
safely_get_guest_string { diff --git a/runtime/plaid/src/storage/dynamodb/mod.rs b/runtime/plaid/src/storage/dynamodb/mod.rs index dd1de2ff..85213fcc 100644 --- a/runtime/plaid/src/storage/dynamodb/mod.rs +++ b/runtime/plaid/src/storage/dynamodb/mod.rs @@ -5,12 +5,13 @@ use std::collections::HashMap; use async_trait::async_trait; use aws_sdk_dynamodb::{ - types::{AttributeValue, KeyType}, + types::{AttributeValue, KeyType, Put, TransactWriteItem}, Client, }; +use aws_sdk_s3::primitives::Blob; use serde::Deserialize; -use crate::{get_aws_sdk_config, AwsAuthentication}; +use crate::{get_aws_sdk_config, storage::Item, AwsAuthentication}; use super::{StorageError, StorageProvider}; @@ -83,6 +84,30 @@ impl StorageProvider for DynamoDb { true } + async fn insert_batch(&self, namespace: String, items: Vec) -> Result<(), StorageError> { + let write_items = items + .into_iter() + .map(|item| { + let put = Put::builder() + .item(item.key, AttributeValue::B(Blob::new(item.value))) + .table_name(&namespace) + .build() + .map_err(|e| StorageError::BuildError(e.to_string()))?; + + Ok(TransactWriteItem::builder().put(put).build()) + }) + .collect::, StorageError>>()?; + + self.client + .transact_write_items() + .set_transact_items(Some(write_items)) + .send() + .await + .map_err(|e| StorageError::BatchWriteError(e.to_string()))?; + + Ok(()) + } + async fn insert( &self, namespace: String, diff --git a/runtime/plaid/src/storage/in_memory/mod.rs b/runtime/plaid/src/storage/in_memory/mod.rs index 9d31e8cf..562df2c1 100644 --- a/runtime/plaid/src/storage/in_memory/mod.rs +++ b/runtime/plaid/src/storage/in_memory/mod.rs @@ -1,5 +1,7 @@ //! This module provides a way for Plaid to use an in-memory store as a DB. Note - This storage is not persisted across reboots. 
+use crate::storage::Item; + use super::{StorageError, StorageProvider}; use async_trait::async_trait; use std::{collections::HashMap, sync::Arc}; @@ -23,6 +25,16 @@ impl StorageProvider for InMemoryDb { false } + async fn insert_batch(&self, namespace: String, items: Vec<Item>) -> Result<(), StorageError> { + let mut db = self.db.write().await; + let ns = db.entry(namespace).or_default(); + for item in items { + ns.insert(item.key, item.value); + } + + Ok(()) + } + async fn insert( &self, namespace: String, diff --git a/runtime/plaid/src/storage/mod.rs b/runtime/plaid/src/storage/mod.rs index 0e345889..6587909a 100644 --- a/runtime/plaid/src/storage/mod.rs +++ b/runtime/plaid/src/storage/mod.rs @@ -15,6 +15,7 @@ pub mod in_memory; use futures_util::future::join_all; use in_memory::InMemoryDb; +use plaid_stl::plaid::storage::Item; use serde::Deserialize; use crate::loader::LimitValue; @@ -74,6 +75,9 @@ pub enum StorageError { CouldNotAccessStorage(String), Access(String), SharedDbError(String), + Unimplemented { operation: String, provider: String }, + BuildError(String), + BatchWriteError(String), } impl std::fmt::Display for StorageError { @@ -90,6 +94,19 @@ impl std::fmt::Display for StorageError { Self::SharedDbError(ref e) => { write!(f, "Error while attempting an operation on a shared DB: {e}") } + Self::Unimplemented { + operation, + provider, + } => { + write!( + f, + "The {operation} operation is not available for {provider}" + ) + } + Self::BuildError(e) => write!(f, "Failed to build: {e}"), + Self::BatchWriteError(e) => { + write!(f, "Error while attempting to batch write items: {e}") + } } } } @@ -109,6 +126,7 @@ pub trait StorageProvider { key: String, value: Vec<u8>, ) -> Result<Option<Vec<u8>>, StorageError>; + async fn insert_batch(&self, namespace: String, items: Vec<Item>) -> Result<(), StorageError>; /// Get a value by key from the storage provider. 
If the key doesn't exist, then it will /// return Ok(None) signifying the storage provider was successfully able to identify /// the key was not set. @@ -225,6 +243,14 @@ impl Storage { self.database.insert(namespace, key, value).await } + pub async fn insert_batch( + &self, + namespace: String, + items: Vec<Item>, + ) -> Result<(), StorageError> { + self.database.insert_batch(namespace, items).await + } + pub async fn get(&self, namespace: &str, key: &str) -> Result<Option<Vec<u8>>, StorageError> { self.database.get(namespace, key).await } diff --git a/runtime/plaid/src/storage/sled/mod.rs b/runtime/plaid/src/storage/sled/mod.rs index 3c28ba19..674e0b2c 100644 --- a/runtime/plaid/src/storage/sled/mod.rs +++ b/runtime/plaid/src/storage/sled/mod.rs @@ -6,7 +6,7 @@ use serde::Deserialize; use sled::Db; -use super::{StorageError, StorageProvider}; +use super::{Item, StorageError, StorageProvider}; /// Configuration for a Sled DB #[derive(Deserialize)] @@ -33,6 +33,17 @@ impl StorageProvider for Sled { true } + async fn insert_batch( + &self, + _namespace: String, + _items: Vec<Item>, + ) -> Result<(), StorageError> { + Err(StorageError::Unimplemented { + operation: "insert_batch".to_string(), + provider: "Sled".to_string(), + }) + } + async fn insert( &self, namespace: String,