diff --git a/Cargo.toml b/Cargo.toml index cc9ed03..251b499 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "indradb-sled" -version = "0.1.0" +version = "4.0.0-RC1" authors = ["Yusuf Simonson "] description = "A sled-backed datastore for IndraDB" homepage = "https://indradb.github.io" @@ -8,6 +8,7 @@ repository = "https://github.com/indradb/sled" keywords = ["graph", "database"] categories = ["database", "database-implementations"] license = "MPL-2.0" +edition = "2021" [lib] name = "indradb_sled" @@ -19,9 +20,11 @@ test-suite = ["indradb-lib/test-suite", "tempfile"] bench-suite = ["indradb-lib/bench-suite", "tempfile"] [dependencies] -chrono = { version = "0.4.19", features = ["serde"] } -indradb-lib = "^2.2.0" -serde_json = "^1.0.57" -sled = { version = "0.34.6", features = ["compression", "no_metrics"] } -tempfile = { version = "^3.2.0", optional = true} -uuid = { version = "~0.8.2", features = ["v1", "serde"] } +chrono = { version = "0.4", features = ["serde"] } +indradb-lib = "4.0" +serde_json = "1.0" +sled = { version = "0.34", features = ["compression"] } +tempfile = { version = "3.10", optional = true } +uuid = { version = "1.9", features = ["v1", "serde"] } +ecow = { version = "0.2.2" } +thiserror = { version = "1.0" } \ No newline at end of file diff --git a/README.md b/README.md index a605476..7b88bdd 100644 --- a/README.md +++ b/README.md @@ -2,4 +2,18 @@ This is an implementation of the IndraDB datastore for sled. -The sled datastore is not production-ready yet. sled itself is pre-1.0, and makes no guarantees about on-disk format stability. Upgrading IndraDB may require you to [manually migrate the sled datastore.](https://docs.rs/sled/0.34.6/sled/struct.Db.html#method.export) Additionally, there is a standing issue that prevents the sled datastore from having the same level of safety as the RocksDB datastore. +## General hints + +The sled datastore is not production-ready yet. sled itself is pre-1.0, and makes no guarantees about on-disk format +stability. +Upgrading IndraDB may require you +to [manually migrate the sled datastore.](https://docs.rs/sled/0.34.6/sled/struct.Db.html#method.export) +Additionally, there is a standing issue that prevents the sled datastore from having the same level of safety as the +RocksDB datastore. +Performance of sled in two-hop-queries is better than rocksdb though. We measured about 40% faster query times. + +## TODO / Notices: + +- This version does index all properties by default (and therefore duplicates the data). + This is adding the benefit of faster queries but also increases the storage requirements. +- Batch operations are not optimized yet, diff --git a/src/datastore.rs b/src/datastore.rs index 1d3f8c3..3adeb5c 100644 --- a/src/datastore.rs +++ b/src/datastore.rs @@ -1,20 +1,17 @@ use std::path::Path; -use std::sync::Arc; -use std::{u64, usize}; -use super::errors::map_err; -use super::managers::*; - -use chrono::offset::Utc; -use indradb::util::next_uuid; -use indradb::{ - BulkInsertItem, Datastore, Edge, EdgeDirection, EdgeKey, EdgeProperties, EdgeProperty, EdgePropertyQuery, - EdgeQuery, NamedProperty, Result, Transaction, Type, Vertex, VertexProperties, VertexProperty, VertexPropertyQuery, - VertexQuery, -}; -use serde_json::Value as JsonValue; +use indradb::{Datastore, Result}; use sled::{Config, Db, Tree}; -use uuid::Uuid; + +use crate::managers::edge_manager::EdgeManager; +use crate::managers::edge_property_manager::EdgePropertyManager; +use crate::managers::edge_range_manager::EdgeRangeManager; +use crate::managers::metadata::MetaDataManager; +use crate::managers::vertex_manager::VertexManager; +use crate::managers::vertex_property_manager::VertexPropertyManager; +use crate::transaction::SledTransaction; + +use super::errors::map_err; #[derive(Copy, Clone, Default, Debug)] pub struct SledConfig { @@ -38,22 +35,27 @@ impl SledConfig { /// Creates a new sled datastore. pub fn open>(self, path: P) -> Result { Ok(SledDatastore { - holder: Arc::new(SledHolder::new(path, self)?), + holder: SledHolder::new(path, self)?, }) } } /// The meat of a Sled datastore pub struct SledHolder { - pub(crate) db: Arc, // Derefs to Tree, holds the vertices + pub(crate) db: Db, // Derefs to Tree, holds the vertices pub(crate) edges: Tree, pub(crate) edge_ranges: Tree, pub(crate) reversed_edge_ranges: Tree, pub(crate) vertex_properties: Tree, pub(crate) edge_properties: Tree, + // for prop-name -> value -> ID prefix-indexed lookup + pub(crate) edge_property_values: Tree, + // for prop-name -> value -> UUID prefix-indexed lookup + pub(crate) vertex_property_values: Tree, + pub(crate) metadata: Tree, } -impl<'ds> SledHolder { +impl SledHolder { /// The meat of a Sled datastore. /// /// # Arguments @@ -78,463 +80,52 @@ impl<'ds> SledHolder { reversed_edge_ranges: map_err(db.open_tree("reversed_edge_ranges"))?, vertex_properties: map_err(db.open_tree("vertex_properties"))?, edge_properties: map_err(db.open_tree("edge_properties"))?, - db: Arc::new(db), + vertex_property_values: map_err(db.open_tree("vertex_property_values"))?, + edge_property_values: map_err(db.open_tree("edge_property_values"))?, + metadata: map_err(db.open_tree("metadata"))?, + db, }) } } /// A datastore that is backed by Sled. pub struct SledDatastore { - pub(crate) holder: Arc, + pub(crate) holder: SledHolder, } -impl<'ds> SledDatastore { +impl SledDatastore { /// Creates a new Sled datastore. /// /// # Arguments /// * `path`: The file path to the Sled database. pub fn new>(path: P) -> Result { Ok(SledDatastore { - holder: Arc::new(SledHolder::new(path, SledConfig::default())?), + holder: SledHolder::new(path, SledConfig::default())?, }) } } impl Datastore for SledDatastore { - type Trans = SledTransaction; - - fn sync(&self) -> Result<()> { - let holder = self.holder.clone(); - let db = holder.db.clone(); - map_err(db.flush())?; - Ok(()) - } - - fn transaction(&self) -> Result { - Ok(SledTransaction::new(self.holder.clone())) - } - - fn bulk_insert(&self, items: I) -> Result<()> + type Transaction<'a> = SledTransaction<'a> where - I: Iterator, - { - let vertex_manager = VertexManager::new(&self.holder); - let edge_manager = EdgeManager::new(&self.holder); - let vertex_property_manager = VertexPropertyManager::new(&self.holder.vertex_properties); - let edge_property_manager = EdgePropertyManager::new(&self.holder.edge_properties); - - for item in items { - match item { - BulkInsertItem::Vertex(ref vertex) => { - vertex_manager.create(vertex)?; - } - BulkInsertItem::Edge(ref key) => { - edge_manager.set(key.outbound_id, &key.t, key.inbound_id, Utc::now())?; - } - BulkInsertItem::VertexProperty(id, ref name, ref value) => { - vertex_property_manager.set(id, name, value)?; - } - BulkInsertItem::EdgeProperty(ref key, ref name, ref value) => { - edge_property_manager.set(key.outbound_id, &key.t, key.inbound_id, name, value)?; - } - } - } - - map_err(self.holder.db.flush())?; - Ok(()) - } -} - -/// A transaction that is backed by Sled. -pub struct SledTransaction { - holder: Arc, -} - -impl SledTransaction { - fn new(holder: Arc) -> Self { - SledTransaction { holder } - } - - #[allow(clippy::needless_collect)] - fn vertex_query_to_iterator<'iter, 'trans: 'iter>( - &'trans self, - q: VertexQuery, - ) -> Result> + 'iter>> { - match q { - VertexQuery::Range(q) => { - let vertex_manager = VertexManager::new(&self.holder); - - let next_uuid = match q.start_id { - Some(start_id) => { - match next_uuid(start_id) { - Ok(next_uuid) => next_uuid, - // If we get an error back, it's because - // `start_id` is the maximum possible value. We - // know that no vertices exist whose ID is greater - // than the maximum possible value, so just return - // an empty list. - Err(_) => return Ok(Box::new(vec![].into_iter())), - } - } - None => Uuid::default(), - }; - - let mut iter: Box>> = - Box::new(vertex_manager.iterate_for_range(next_uuid)); - - if let Some(ref t) = q.t { - iter = Box::new(iter.filter(move |item| match item { - Ok((_, v)) => v == t, - Err(_) => true, - })); - } - - let results: Vec> = iter.take(q.limit as usize).collect(); - Ok(Box::new(results.into_iter())) - } - VertexQuery::Specific(q) => { - let vertex_manager = VertexManager::new(&self.holder); - - let iter = q.ids.into_iter().map(move |id| match vertex_manager.get(id)? { - Some(value) => Ok(Some((id, value))), - None => Ok(None), - }); - - Ok(Box::new(remove_nones_from_iterator(iter))) - } - VertexQuery::Pipe(q) => { - let vertex_manager = VertexManager::new(&self.holder); - let edge_iterator = self.edge_query_to_iterator(*q.inner)?; - let direction = q.direction; - - let iter = edge_iterator.map(move |item| { - let (outbound_id, _, _, inbound_id) = item?; - - let id = match direction { - EdgeDirection::Outbound => outbound_id, - EdgeDirection::Inbound => inbound_id, - }; - - match vertex_manager.get(id)? { - Some(value) => Ok(Some((id, value))), - None => Ok(None), - } - }); - - let mut iter: Box>> = Box::new(remove_nones_from_iterator(iter)); - - if let Some(ref t) = q.t { - iter = Box::new(iter.filter(move |item| match item { - Ok((_, v)) => v == t, - Err(_) => true, - })); - } - - let results: Vec> = iter.take(q.limit as usize).collect(); - Ok(Box::new(results.into_iter())) - } - } - } - - fn edge_query_to_iterator<'iter, 'trans: 'iter>( - &'trans self, - q: EdgeQuery, - ) -> Result> + 'iter>> { - match q { - EdgeQuery::Specific(q) => { - let edge_manager = EdgeManager::new(&self.holder); - - let edges = q.keys.into_iter().map(move |key| { - match edge_manager.get(key.outbound_id, &key.t, key.inbound_id)? { - Some(update_datetime) => { - Ok(Some((key.outbound_id, key.t.clone(), update_datetime, key.inbound_id))) - } - None => Ok(None), - } - }); - - let iterator = remove_nones_from_iterator(edges); - Ok(Box::new(iterator)) - } - EdgeQuery::Pipe(q) => { - let vertex_iterator = self.vertex_query_to_iterator(*q.inner)?; - - let edge_range_manager = match q.direction { - EdgeDirection::Outbound => EdgeRangeManager::new(&self.holder), - EdgeDirection::Inbound => EdgeRangeManager::new_reversed(&self.holder), - }; - - // Ideally we'd use iterators all the way down, but things - // start breaking apart due to conditional expressions not - // returning the same type signature, issues with `Result`s - // and some of the iterators, etc. So at this point, we'll - // just resort to building a vector. - let mut edges: Vec> = Vec::new(); - - for item in vertex_iterator { - let (id, _) = item?; - let edge_iterator = edge_range_manager.iterate_for_range(id, q.t.as_ref(), q.high)?; - - for item in edge_iterator { - match item { - Ok(( - edge_range_first_id, - edge_range_t, - edge_range_update_datetime, - edge_range_second_id, - )) => { - if let Some(low) = q.low { - if edge_range_update_datetime < low { - break; - } - } - - edges.push(match q.direction { - EdgeDirection::Outbound => Ok(( - edge_range_first_id, - edge_range_t, - edge_range_update_datetime, - edge_range_second_id, - )), - EdgeDirection::Inbound => Ok(( - edge_range_second_id, - edge_range_t, - edge_range_update_datetime, - edge_range_first_id, - )), - }) - } - Err(_) => edges.push(item), - } - - if edges.len() == q.limit as usize { - break; - } - } - } - - Ok(Box::new(edges.into_iter())) - } - } - } -} - -impl Transaction for SledTransaction { - fn create_vertex(&self, vertex: &Vertex) -> Result { - let vertex_manager = VertexManager::new(&self.holder); - - if vertex_manager.exists(vertex.id)? { - Ok(false) - } else { - vertex_manager.create(vertex)?; - Ok(true) - } - } - - fn get_vertices>(&self, q: Q) -> Result> { - let iterator = self.vertex_query_to_iterator(q.into())?; - - let mapped = iterator.map(move |item| { - let (id, t) = item?; - let vertex = Vertex::with_id(id, t); - Ok(vertex) - }); - - mapped.collect() - } - - fn delete_vertices>(&self, q: Q) -> Result<()> { - let iterator = self.vertex_query_to_iterator(q.into())?; - let vertex_manager = VertexManager::new(&self.holder); - - for item in iterator { - let (id, _) = item?; - vertex_manager.delete(id)?; - } - - Ok(()) - } - - fn get_vertex_count(&self) -> Result { - let vertex_manager = VertexManager::new(&self.holder); - let iterator = vertex_manager.iterate_for_range(Uuid::default()); - Ok(iterator.count() as u64) - } - - fn create_edge(&self, key: &EdgeKey) -> Result { - let vertex_manager = VertexManager::new(&self.holder); - - if !vertex_manager.exists(key.outbound_id)? || !vertex_manager.exists(key.inbound_id)? { - Ok(false) - } else { - let edge_manager = EdgeManager::new(&self.holder); - edge_manager.set(key.outbound_id, &key.t, key.inbound_id, Utc::now())?; - Ok(true) - } - } - - fn get_edges>(&self, q: Q) -> Result> { - let iterator = self.edge_query_to_iterator(q.into())?; - - let mapped = iterator.map(move |item: Result| { - let (outbound_id, t, update_datetime, inbound_id) = item?; - let key = EdgeKey::new(outbound_id, t, inbound_id); - let edge = Edge::new(key, update_datetime); - Ok(edge) - }); - - mapped.collect() - } - - fn delete_edges>(&self, q: Q) -> Result<()> { - let edge_manager = EdgeManager::new(&self.holder); - let vertex_manager = VertexManager::new(&self.holder); - let iterator = self.edge_query_to_iterator(q.into())?; - - for item in iterator { - let (outbound_id, t, update_datetime, inbound_id) = item?; - - if vertex_manager.get(outbound_id)?.is_some() { - edge_manager.delete(outbound_id, &t, inbound_id, update_datetime)?; - }; - } - Ok(()) - } - - fn get_edge_count(&self, id: Uuid, t: Option<&Type>, direction: EdgeDirection) -> Result { - let edge_range_manager = match direction { - EdgeDirection::Outbound => EdgeRangeManager::new(&self.holder), - EdgeDirection::Inbound => EdgeRangeManager::new_reversed(&self.holder), - }; - - let iter = edge_range_manager.iterate_for_range(id, t, None)?; - let count = iter.count(); - - Ok(count as u64) - } - - fn get_vertex_properties(&self, q: VertexPropertyQuery) -> Result> { - let manager = VertexPropertyManager::new(&self.holder.vertex_properties); - let mut properties = Vec::new(); - - for item in self.vertex_query_to_iterator(q.inner)? { - let (id, _) = item?; - let value = manager.get(id, &q.name)?; - - if let Some(value) = value { - properties.push(VertexProperty::new(id, value)); - } + Self: 'a; + + fn transaction(&self) -> Self::Transaction<'_> { + SledTransaction { + holder: &self.holder, + vertex_manager: VertexManager::new(&self.holder), + edge_manager: EdgeManager::new(&self.holder), + edge_range_manager: EdgeRangeManager::new(&self.holder), + edge_range_manager_rev: EdgeRangeManager::new_reversed(&self.holder), + edge_property_manager: EdgePropertyManager::new( + &self.holder.edge_properties, + &self.holder.edge_property_values, + ), + vertex_property_manager: VertexPropertyManager::new( + &self.holder.vertex_properties, + &self.holder.vertex_property_values, + ), + meta_data_manager: MetaDataManager::new(&self.holder.metadata).unwrap(), } - - Ok(properties) - } - - fn get_all_vertex_properties>(&self, q: Q) -> Result> { - let manager = VertexPropertyManager::new(&self.holder.vertex_properties); - let iterator = self.vertex_query_to_iterator(q.into())?; - - let iter = iterator.map(move |item| { - let (id, t) = item?; - let vertex = Vertex::with_id(id, t); - - let it = manager.iterate_for_owner(id)?; - let props: Result> = it.collect(); - let props_iter = props?.into_iter(); - let props = props_iter - .map(|((_, name), value)| NamedProperty::new(name, value)) - .collect(); - - Ok(VertexProperties::new(vertex, props)) - }); - - iter.collect() } - - fn set_vertex_properties(&self, q: VertexPropertyQuery, value: &JsonValue) -> Result<()> { - let manager = VertexPropertyManager::new(&self.holder.vertex_properties); - - for item in self.vertex_query_to_iterator(q.inner)? { - let (id, _) = item?; - manager.set(id, &q.name, value)?; - } - Ok(()) - } - - fn delete_vertex_properties(&self, q: VertexPropertyQuery) -> Result<()> { - let manager = VertexPropertyManager::new(&self.holder.vertex_properties); - - for item in self.vertex_query_to_iterator(q.inner)? { - let (id, _) = item?; - manager.delete(id, &q.name)?; - } - Ok(()) - } - - fn get_edge_properties(&self, q: EdgePropertyQuery) -> Result> { - let manager = EdgePropertyManager::new(&self.holder.edge_properties); - let mut properties = Vec::new(); - - for item in self.edge_query_to_iterator(q.inner)? { - let (outbound_id, t, _, inbound_id) = item?; - let value = manager.get(outbound_id, &t, inbound_id, &q.name)?; - - if let Some(value) = value { - let key = EdgeKey::new(outbound_id, t, inbound_id); - properties.push(EdgeProperty::new(key, value)); - } - } - - Ok(properties) - } - - fn get_all_edge_properties>(&self, q: Q) -> Result> { - let manager = EdgePropertyManager::new(&self.holder.edge_properties); - let iterator = self.edge_query_to_iterator(q.into())?; - - let iter = iterator.map(move |item| { - let (out_id, t, time, in_id) = item?; - let edge = Edge::new(EdgeKey::new(out_id, t.clone(), in_id), time); - let it = manager.iterate_for_owner(out_id, &t, in_id)?; - let props: Result> = it.collect(); - let props_iter = props?.into_iter(); - let props = props_iter - .map(|((_, _, _, name), value)| NamedProperty::new(name, value)) - .collect(); - - Ok(EdgeProperties::new(edge, props)) - }); - - iter.collect() - } - - fn set_edge_properties(&self, q: EdgePropertyQuery, value: &JsonValue) -> Result<()> { - let manager = EdgePropertyManager::new(&self.holder.edge_properties); - - for item in self.edge_query_to_iterator(q.inner)? { - let (outbound_id, t, _, inbound_id) = item?; - manager.set(outbound_id, &t, inbound_id, &q.name, value)?; - } - Ok(()) - } - - fn delete_edge_properties(&self, q: EdgePropertyQuery) -> Result<()> { - let manager = EdgePropertyManager::new(&self.holder.edge_properties); - - for item in self.edge_query_to_iterator(q.inner)? { - let (outbound_id, t, _, inbound_id) = item?; - manager.delete(outbound_id, &t, inbound_id, &q.name)?; - } - Ok(()) - } -} - -fn remove_nones_from_iterator(iter: I) -> impl Iterator> -where - I: Iterator>>, -{ - iter.filter_map(|item| match item { - Err(err) => Some(Err(err)), - Ok(Some(value)) => Some(Ok(value)), - _ => None, - }) } diff --git a/src/errors.rs b/src/errors.rs index 45e026a..78b0582 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,6 +1,26 @@ +use std::sync::PoisonError; + use indradb::Error as IndraError; use sled::Error as SledError; pub(crate) fn map_err(result: Result) -> Result { - result.map_err(|err| IndraError::Datastore { inner: Box::new(err) }) + result.map_err(|err| IndraError::Datastore(Box::new(err))) +} + +#[derive(Debug, thiserror::Error)] +pub enum DSError { + #[error("Error in locking a RwLock: {0}")] + PoisonError(String), +} + +impl From> for DSError { + fn from(value: PoisonError) -> Self { + DSError::PoisonError(value.to_string()) + } +} + +impl From for IndraError { + fn from(err: DSError) -> Self { + IndraError::Datastore(Box::new(err)) + } } diff --git a/src/lib.rs b/src/lib.rs index 11f4543..58240dd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,57 +3,74 @@ #![cfg_attr(feature = "bench-suite", feature(test))] extern crate chrono; - +extern crate ecow; #[cfg(any(feature = "bench-suite", feature = "test-suite"))] #[macro_use] extern crate indradb; #[cfg(not(any(feature = "bench-suite", feature = "test-suite")))] extern crate indradb; - extern crate serde_json; extern crate sled; #[cfg(any(feature = "bench-suite", feature = "test-suite"))] extern crate tempfile; +extern crate thiserror; extern crate uuid; +use indradb::Edge; + +pub use self::datastore::{SledConfig, SledDatastore}; + mod datastore; mod errors; mod managers; - -pub use self::datastore::{SledConfig, SledDatastore, SledTransaction}; +mod transaction; mod normal_config { + #[cfg(feature = "bench-suite")] full_bench_impl!({ use super::SledDatastore; + use indradb::Database; use tempfile::tempdir; let path = tempdir().unwrap().into_path(); - SledDatastore::new(path).unwrap() + Database::new(SledDatastore::new(path).unwrap()) }); #[cfg(feature = "test-suite")] full_test_impl!({ use super::SledDatastore; + use indradb::Database; use tempfile::tempdir; let path = tempdir().unwrap().into_path(); - SledDatastore::new(path).unwrap() + Database::new(SledDatastore::new(path).unwrap()) }); } mod compression_config { + #[cfg(feature = "bench-suite")] full_bench_impl!({ use super::SledConfig; + use indradb::Database; use tempfile::tempdir; let path = tempdir().unwrap().into_path(); - SledConfig::with_compression(None).open(path).unwrap() + Database::new(SledConfig::with_compression(None).open(path).unwrap()) }); #[cfg(feature = "test-suite")] full_test_impl!({ use super::SledConfig; + use indradb::Database; use tempfile::tempdir; let path = tempdir().unwrap().into_path(); - SledConfig::with_compression(None).open(path).unwrap() + Database::new(SledConfig::with_compression(None).open(path).unwrap()) }); } + +fn reverse_edge(edge: &Edge) -> Edge { + Edge { + outbound_id: edge.inbound_id, + t: edge.t, + inbound_id: edge.outbound_id, + } +} diff --git a/src/managers.rs b/src/managers.rs deleted file mode 100644 index 0fe2567..0000000 --- a/src/managers.rs +++ /dev/null @@ -1,448 +0,0 @@ -use std::io::Cursor; -use std::ops::Deref; -use std::u8; - -use super::errors::map_err; -use crate::datastore::SledHolder; - -use chrono::offset::Utc; -use chrono::DateTime; -use indradb::{util, Result, Type, Vertex}; -use serde_json::Value as JsonValue; -use sled::Result as SledResult; -use sled::{IVec, Iter as DbIterator, Tree}; -use uuid::Uuid; - -pub type OwnedPropertyItem = ((Uuid, String), JsonValue); -pub type VertexItem = (Uuid, Type); -pub type EdgeRangeItem = (Uuid, Type, DateTime, Uuid); -pub type EdgePropertyItem = ((Uuid, Type, Uuid, String), JsonValue); - -fn take_while_prefixed(iterator: DbIterator, prefix: Vec) -> impl Iterator> { - iterator.take_while(move |item| -> bool { - match item { - Ok((k, _)) => k.starts_with(&prefix), - Err(_) => false, - } - }) -} - -pub struct VertexManager<'db: 'tree, 'tree> { - pub holder: &'db SledHolder, - pub tree: &'tree Tree, -} - -impl<'db: 'tree, 'tree> VertexManager<'db, 'tree> { - pub fn new(ds: &'db SledHolder) -> Self { - VertexManager { - holder: ds, - tree: ds.db.deref(), - } - } - - fn key(&self, id: Uuid) -> Vec { - util::build(&[util::Component::Uuid(id)]) - } - - pub fn exists(&self, id: Uuid) -> Result { - Ok(map_err(self.tree.get(&self.key(id)))?.is_some()) - } - - pub fn get(&self, id: Uuid) -> Result> { - match map_err(self.tree.get(&self.key(id)))? { - Some(value_bytes) => { - let mut cursor = Cursor::new(value_bytes.deref()); - Ok(Some(util::read_type(&mut cursor))) - } - None => Ok(None), - } - } - - fn iterate(&self, iterator: DbIterator) -> impl Iterator> + '_ { - iterator.map(move |item| -> Result { - let (k, v) = map_err(item)?; - - let id = { - debug_assert_eq!(k.len(), 16); - let mut cursor = Cursor::new(k); - util::read_uuid(&mut cursor) - }; - - let mut cursor = Cursor::new(v); - let t = util::read_type(&mut cursor); - Ok((id, t)) - }) - } - - pub fn iterate_for_range(&self, id: Uuid) -> impl Iterator> + '_ { - let low_key = util::build(&[util::Component::Uuid(id)]); - let low_key_bytes: &[u8] = low_key.as_ref(); - let iter = self.tree.range(low_key_bytes..); - self.iterate(iter) - } - - pub fn create(&self, vertex: &Vertex) -> Result<()> { - let key = self.key(vertex.id); - map_err(self.tree.insert(&key, util::build(&[util::Component::Type(&vertex.t)])))?; - Ok(()) - } - - pub fn delete(&self, id: Uuid) -> Result<()> { - map_err(self.tree.remove(&self.key(id)))?; - - let vertex_property_manager = VertexPropertyManager::new(&self.holder.vertex_properties); - for item in vertex_property_manager.iterate_for_owner(id)? { - let ((vertex_property_owner_id, vertex_property_name), _) = item?; - vertex_property_manager.delete(vertex_property_owner_id, &vertex_property_name[..])?; - } - - let edge_manager = EdgeManager::new(self.holder); - - { - let edge_range_manager = EdgeRangeManager::new(self.holder); - for item in edge_range_manager.iterate_for_owner(id) { - let (edge_range_outbound_id, edge_range_t, edge_range_update_datetime, edge_range_inbound_id) = item?; - debug_assert_eq!(edge_range_outbound_id, id); - edge_manager.delete( - edge_range_outbound_id, - &edge_range_t, - edge_range_inbound_id, - edge_range_update_datetime, - )?; - } - } - - { - let reversed_edge_range_manager = EdgeRangeManager::new_reversed(self.holder); - for item in reversed_edge_range_manager.iterate_for_owner(id) { - let ( - reversed_edge_range_inbound_id, - reversed_edge_range_t, - reversed_edge_range_update_datetime, - reversed_edge_range_outbound_id, - ) = item?; - debug_assert_eq!(reversed_edge_range_inbound_id, id); - edge_manager.delete( - reversed_edge_range_outbound_id, - &reversed_edge_range_t, - reversed_edge_range_inbound_id, - reversed_edge_range_update_datetime, - )?; - } - } - Ok(()) - } -} - -pub struct EdgeManager<'db: 'tree, 'tree> { - pub holder: &'db SledHolder, - pub tree: &'tree Tree, -} - -impl<'db, 'tree> EdgeManager<'db, 'tree> { - pub fn new(ds: &'db SledHolder) -> Self { - EdgeManager { - holder: ds, - tree: &ds.edges, - } - } - - fn key(&self, outbound_id: Uuid, t: &Type, inbound_id: Uuid) -> Vec { - util::build(&[ - util::Component::Uuid(outbound_id), - util::Component::Type(t), - util::Component::Uuid(inbound_id), - ]) - } - - pub fn get(&self, outbound_id: Uuid, t: &Type, inbound_id: Uuid) -> Result>> { - match map_err(self.tree.get(self.key(outbound_id, t, inbound_id)))? { - Some(value_bytes) => { - let mut cursor = Cursor::new(value_bytes.deref()); - Ok(Some(util::read_datetime(&mut cursor))) - } - None => Ok(None), - } - } - - pub fn set(&self, outbound_id: Uuid, t: &Type, inbound_id: Uuid, new_update_datetime: DateTime) -> Result<()> { - let edge_range_manager = EdgeRangeManager::new(self.holder); - let reversed_edge_range_manager = EdgeRangeManager::new_reversed(self.holder); - - if let Some(update_datetime) = self.get(outbound_id, t, inbound_id)? { - edge_range_manager.delete(outbound_id, t, update_datetime, inbound_id)?; - reversed_edge_range_manager.delete(inbound_id, t, update_datetime, outbound_id)?; - } - - let key = self.key(outbound_id, t, inbound_id); - map_err( - self.tree - .insert(key, util::build(&[util::Component::DateTime(new_update_datetime)])), - )?; - edge_range_manager.set(outbound_id, t, new_update_datetime, inbound_id)?; - reversed_edge_range_manager.set(inbound_id, t, new_update_datetime, outbound_id)?; - Ok(()) - } - - pub fn delete(&self, outbound_id: Uuid, t: &Type, inbound_id: Uuid, update_datetime: DateTime) -> Result<()> { - map_err(self.tree.remove(&self.key(outbound_id, t, inbound_id)))?; - - let edge_range_manager = EdgeRangeManager::new(self.holder); - edge_range_manager.delete(outbound_id, t, update_datetime, inbound_id)?; - - let reversed_edge_range_manager = EdgeRangeManager::new_reversed(self.holder); - reversed_edge_range_manager.delete(inbound_id, t, update_datetime, outbound_id)?; - - let edge_property_manager = EdgePropertyManager::new(&self.holder.edge_properties); - for item in edge_property_manager.iterate_for_owner(outbound_id, t, inbound_id)? { - let ((edge_property_outbound_id, edge_property_t, edge_property_inbound_id, edge_property_name), _) = item?; - edge_property_manager.delete( - edge_property_outbound_id, - &edge_property_t, - edge_property_inbound_id, - &edge_property_name[..], - )?; - } - Ok(()) - } -} - -pub struct EdgeRangeManager<'tree> { - pub tree: &'tree Tree, -} - -impl<'tree> EdgeRangeManager<'tree> { - pub fn new<'db: 'tree>(ds: &'db SledHolder) -> Self { - EdgeRangeManager { tree: &ds.edge_ranges } - } - - pub fn new_reversed<'db: 'tree>(ds: &'db SledHolder) -> Self { - EdgeRangeManager { - tree: &ds.reversed_edge_ranges, - } - } - - fn key(&self, first_id: Uuid, t: &Type, update_datetime: DateTime, second_id: Uuid) -> Vec { - util::build(&[ - util::Component::Uuid(first_id), - util::Component::Type(t), - util::Component::DateTime(update_datetime), - util::Component::Uuid(second_id), - ]) - } - - fn iterate<'it>(&self, iterator: DbIterator, prefix: Vec) -> impl Iterator> + 'it { - let filtered = take_while_prefixed(iterator, prefix); - filtered.map(move |item| -> Result { - let (k, _) = map_err(item)?; - let mut cursor = Cursor::new(k); - let first_id = util::read_uuid(&mut cursor); - let t = util::read_type(&mut cursor); - let update_datetime = util::read_datetime(&mut cursor); - let second_id = util::read_uuid(&mut cursor); - Ok((first_id, t, update_datetime, second_id)) - }) - } - - pub fn iterate_for_range<'iter, 'trans: 'iter>( - &'trans self, - id: Uuid, - t: Option<&Type>, - high: Option>, - ) -> Result> + 'iter>> { - match t { - Some(t) => { - let high = high.unwrap_or_else(|| *util::MAX_DATETIME); - let prefix = util::build(&[util::Component::Uuid(id), util::Component::Type(t)]); - let low_key = util::build(&[ - util::Component::Uuid(id), - util::Component::Type(t), - util::Component::DateTime(high), - ]); - let low_key_bytes: &[u8] = low_key.as_ref(); - let iterator = self.tree.range(low_key_bytes..); - Ok(Box::new(self.iterate(iterator, prefix))) - } - None => { - let prefix = util::build(&[util::Component::Uuid(id)]); - let prefix_bytes: &[u8] = prefix.as_ref(); - let iterator = self.tree.range(prefix_bytes..); - let mapped = self.iterate(iterator, prefix); - - if let Some(high) = high { - // We can filter out `update_datetime`s greater than - // `high` via key prefix filtering, so instead we handle - // it here - after the key has been deserialized. - let filtered = mapped.filter(move |item| { - if let Ok((_, _, update_datetime, _)) = *item { - update_datetime <= high - } else { - true - } - }); - - Ok(Box::new(filtered)) - } else { - Ok(Box::new(mapped)) - } - } - } - } - - pub fn iterate_for_owner<'iter, 'trans: 'iter>( - &'trans self, - id: Uuid, - ) -> impl Iterator> + 'iter { - let prefix: Vec = util::build(&[util::Component::Uuid(id)]); - let iterator = self.tree.scan_prefix(&prefix); - self.iterate(iterator, prefix) - } - - pub fn set(&self, first_id: Uuid, t: &Type, update_datetime: DateTime, second_id: Uuid) -> Result<()> { - let key = self.key(first_id, t, update_datetime, second_id); - map_err(self.tree.insert(&key, &[]))?; - Ok(()) - } - - pub fn delete(&self, first_id: Uuid, t: &Type, update_datetime: DateTime, second_id: Uuid) -> Result<()> { - map_err(self.tree.remove(&self.key(first_id, t, update_datetime, second_id)))?; - Ok(()) - } -} - -pub struct VertexPropertyManager<'tree> { - pub tree: &'tree Tree, -} - -impl<'tree> VertexPropertyManager<'tree> { - pub fn new(tree: &'tree Tree) -> Self { - VertexPropertyManager { tree } - } - - fn key(&self, vertex_id: Uuid, name: &str) -> Vec { - util::build(&[ - util::Component::Uuid(vertex_id), - util::Component::FixedLengthString(name), - ]) - } - - pub fn iterate_for_owner(&self, vertex_id: Uuid) -> Result> + '_> { - let prefix = util::build(&[util::Component::Uuid(vertex_id)]); - let iterator = self.tree.scan_prefix(&prefix); - - Ok(iterator.map(move |item| -> Result { - let (k, v) = map_err(item)?; - let mut cursor = Cursor::new(k); - let owner_id = util::read_uuid(&mut cursor); - debug_assert_eq!(vertex_id, owner_id); - let name = util::read_fixed_length_string(&mut cursor); - let value = serde_json::from_slice(&v)?; - Ok(((owner_id, name), value)) - })) - } - - pub fn get(&self, vertex_id: Uuid, name: &str) -> Result> { - let key = self.key(vertex_id, name); - - match map_err(self.tree.get(&key))? { - Some(value_bytes) => Ok(Some(serde_json::from_slice(&value_bytes)?)), - None => Ok(None), - } - } - - pub fn set(&self, vertex_id: Uuid, name: &str, value: &JsonValue) -> Result<()> { - let key = self.key(vertex_id, name); - let value_json = serde_json::to_vec(value)?; - map_err(self.tree.insert(key.as_slice(), value_json.as_slice()))?; - Ok(()) - } - - pub fn delete(&self, vertex_id: Uuid, name: &str) -> Result<()> { - map_err(self.tree.remove(&self.key(vertex_id, name)))?; - Ok(()) - } -} - -pub struct EdgePropertyManager<'tree> { - pub tree: &'tree Tree, -} - -impl<'tree> EdgePropertyManager<'tree> { - pub fn new(tree: &'tree Tree) -> Self { - EdgePropertyManager { tree } - } - - fn key(&self, outbound_id: Uuid, t: &Type, inbound_id: Uuid, name: &str) -> Vec { - util::build(&[ - util::Component::Uuid(outbound_id), - util::Component::Type(t), - util::Component::Uuid(inbound_id), - util::Component::FixedLengthString(name), - ]) - } - - pub fn iterate_for_owner<'a>( - &'a self, - outbound_id: Uuid, - t: &'a Type, - inbound_id: Uuid, - ) -> Result> + 'a>> { - let prefix = util::build(&[ - util::Component::Uuid(outbound_id), - util::Component::Type(t), - util::Component::Uuid(inbound_id), - ]); - - let iterator = self.tree.scan_prefix(&prefix); - - let mapped = iterator.map(move |item| -> Result { - let (k, v) = map_err(item)?; - let mut cursor = Cursor::new(k); - - let edge_property_outbound_id = util::read_uuid(&mut cursor); - debug_assert_eq!(edge_property_outbound_id, outbound_id); - - let edge_property_t = util::read_type(&mut cursor); - debug_assert_eq!(&edge_property_t, t); - - let edge_property_inbound_id = util::read_uuid(&mut cursor); - debug_assert_eq!(edge_property_inbound_id, inbound_id); - - let edge_property_name = util::read_fixed_length_string(&mut cursor); - - let value = serde_json::from_slice(&v)?; - Ok(( - ( - edge_property_outbound_id, - edge_property_t, - edge_property_inbound_id, - edge_property_name, - ), - value, - )) - }); - - Ok(Box::new(mapped)) - } - - pub fn get(&self, outbound_id: Uuid, t: &Type, inbound_id: Uuid, name: &str) -> Result> { - let key = self.key(outbound_id, t, inbound_id, name); - - match map_err(self.tree.get(&key))? { - Some(ref value_bytes) => Ok(Some(serde_json::from_slice(value_bytes)?)), - None => Ok(None), - } - } - - pub fn set(&self, outbound_id: Uuid, t: &Type, inbound_id: Uuid, name: &str, value: &JsonValue) -> Result<()> { - let key = self.key(outbound_id, t, inbound_id, name); - let value_json = serde_json::to_vec(value)?; - map_err(self.tree.insert(key.as_slice(), value_json.as_slice()))?; - Ok(()) - } - - pub fn delete(&self, outbound_id: Uuid, t: &Type, inbound_id: Uuid, name: &str) -> Result<()> { - map_err(self.tree.remove(&self.key(outbound_id, t, inbound_id, name)))?; - Ok(()) - } -} diff --git a/src/managers/edge_manager.rs b/src/managers/edge_manager.rs new file mode 100644 index 0000000..427ddee --- /dev/null +++ b/src/managers/edge_manager.rs @@ -0,0 +1,80 @@ +use indradb::{util, Edge}; +use sled::{Batch, IVec, Tree}; + +use crate::datastore::SledHolder; +use crate::errors::map_err; +use crate::managers::edge_property_manager::EdgePropertyManager; +use crate::managers::edge_range_manager::EdgeRangeManager; +use crate::reverse_edge; + +pub struct EdgeManager<'db: 'tree, 'tree> { + pub holder: &'db SledHolder, + pub tree: &'tree Tree, +} + +impl<'db, 'tree> EdgeManager<'db, 'tree> { + pub fn new(ds: &'db SledHolder) -> Self { + EdgeManager { + holder: ds, + tree: &ds.edges, + } + } + + fn key(&self, edge: Edge) -> Vec { + util::build(&[ + util::Component::Uuid(edge.outbound_id), + util::Component::Identifier(edge.t), + util::Component::Uuid(edge.inbound_id), + ]) + } + + pub fn count(&self) -> u64 { + self.tree.iter().count() as u64 + } + + pub fn set_batch( + &self, + edge: &Edge, + batch: &mut Batch, + range_batch: &mut Batch, + range_rev_batch: &mut Batch, + ) -> indradb::Result<()> { + let key = self.key(edge.clone()); + batch.insert(key, IVec::default()); + let edge_range_manager = EdgeRangeManager::new(self.holder); + edge_range_manager.set_batch(edge, range_batch)?; + let edge_range_manager_rev = EdgeRangeManager::new_reversed(self.holder); + edge_range_manager_rev.set_batch(&reverse_edge(edge), range_rev_batch)?; + Ok(()) + } + + pub fn set(&self, edge: &Edge) -> indradb::Result<()> { + let edge_range_manager = EdgeRangeManager::new(self.holder); + let reversed_edge_range_manager = EdgeRangeManager::new_reversed(self.holder); + + let key = self.key(edge.clone()); + map_err(self.tree.insert(key, IVec::default()))?; + edge_range_manager.set(edge)?; + reversed_edge_range_manager.set(&reverse_edge(edge))?; + Ok(()) + } + + pub fn delete(&self, edge: &Edge) -> indradb::Result<()> { + map_err(self.tree.remove(self.key(edge.clone())))?; + + let edge_range_manager = EdgeRangeManager::new(self.holder); + edge_range_manager.delete(edge)?; + + let reversed_edge_range_manager = EdgeRangeManager::new_reversed(self.holder); + reversed_edge_range_manager.delete(&reverse_edge(edge))?; + + let edge_property_manager = + EdgePropertyManager::new(&self.holder.edge_properties, &self.holder.edge_property_values); + + for item in edge_property_manager.iterate_for_owner(edge)? { + let ((edge, id), _) = item?; + edge_property_manager.delete(&edge, id)?; + } + Ok(()) + } +} diff --git a/src/managers/edge_property_manager.rs b/src/managers/edge_property_manager.rs new file mode 100644 index 0000000..f44e30f --- /dev/null +++ b/src/managers/edge_property_manager.rs @@ -0,0 +1,191 @@ +use std::collections::HashMap; +use std::io::Cursor; + +use indradb::{util, Edge, Identifier, Json}; +use serde_json::Value as JsonValue; +use sled::{IVec, Tree}; + +use crate::errors::map_err; + +pub type EdgePropertyItem = ((Edge, Identifier), JsonValue); + +pub struct EdgePropertyManager<'tree> { + pub tree: &'tree Tree, + pub value_index_tree: &'tree Tree, +} + +impl<'tree> EdgePropertyManager<'tree> { + pub fn new(tree: &'tree Tree, value_index_tree: &'tree Tree) -> Self { + EdgePropertyManager { tree, value_index_tree } + } + + fn key(&self, edge: &Edge, name: Identifier) -> Vec { + util::build(&[ + util::Component::Uuid(edge.outbound_id), + util::Component::Identifier(edge.t), + util::Component::Uuid(edge.inbound_id), + util::Component::Identifier(name), + ]) + } + + fn read_key(buf: IVec) -> (Edge, Identifier) { + let mut cursor = Cursor::new(buf.as_ref()); + let edge_property_outbound_id = util::read_uuid(&mut cursor); + let edge_property_t = util::read_identifier(&mut cursor); + let edge_property_inbound_id = util::read_uuid(&mut cursor); + let edge_property_name = util::read_identifier(&mut cursor); + let edge = Edge { + outbound_id: edge_property_outbound_id, + t: edge_property_t, + inbound_id: edge_property_inbound_id, + }; + (edge, edge_property_name) + } + + pub fn iterate_for_property_name( + &self, + name: Identifier, + ) -> indradb::Result> + '_> { + let prefix = util::build(&[util::Component::Identifier(name)]); + let iterator = self.value_index_tree.scan_prefix(prefix); + + Ok(iterator.map(move |item| -> indradb::Result { + let (k, _v) = map_err(item)?; + let (_p, _, edge) = Self::read_key_value_index(k); + Ok(edge) + })) + } + + pub fn iterate_for_property_name_and_value( + &'tree self, + name: Identifier, + value: &JsonValue, + ) -> indradb::Result> + 'tree> { + let value = value.clone(); + let prefix = util::build(&[ + util::Component::Identifier(name), + util::Component::Json(&Json::new(value)), + ]); + let iterator = self.value_index_tree.scan_prefix(prefix); + + Ok(iterator.map(move |item| -> indradb::Result { + let (k, _) = map_err(item)?; + let (_p, _, edge) = Self::read_key_value_index(k); + Ok(edge) + })) + } + + pub fn iterate_for_owner<'a>( + &'a self, + edge: &Edge, + ) -> indradb::Result> + 'a>> { + let prefix = util::build(&[ + util::Component::Uuid(edge.outbound_id), + util::Component::Identifier(edge.t), + util::Component::Uuid(edge.inbound_id), + ]); + + let iterator = self.tree.scan_prefix(prefix); + let mapped = iterator.map(move |item| -> indradb::Result { + let (k, v) = map_err(item)?; + let (edge, p_name) = Self::read_key(k); + let value = serde_json::from_slice(&v)?; + Ok(((edge, p_name), value)) + }); + + Ok(Box::new(mapped)) + } + + pub fn get(&self, edge: &Edge, name: Identifier) -> indradb::Result> { + let key = self.key(edge, name); + + match map_err(self.tree.get(key))? { + Some(ref value_bytes) => Ok(Some(serde_json::from_slice(value_bytes)?)), + None => Ok(None), + } + } + + fn key_value_index(edge: &Edge, value: &JsonValue, property_name: Identifier) -> Vec { + util::build(&[ + util::Component::Identifier(property_name), + util::Component::Json(&Json::new(value.clone())), + util::Component::Uuid(edge.outbound_id), + util::Component::Identifier(edge.t), + util::Component::Uuid(edge.inbound_id), + ]) + } + + fn read_key_value_index(buf: IVec) -> (Identifier, u64, Edge) { + let mut cursor = Cursor::new(buf.as_ref()); + let name = util::read_identifier(&mut cursor); + let value = util::read_u64(&mut cursor); + let outbound_id = util::read_uuid(&mut cursor); + let t = util::read_identifier(&mut cursor); + let inbound_id = util::read_uuid(&mut cursor); + ( + name, + value, + Edge { + outbound_id, + t, + inbound_id, + }, + ) + } + + pub fn set_batch( + &self, + edge: &Edge, + batch: &mut sled::Batch, + batch_value: &mut sled::Batch, + property_creation_set: &mut HashMap<(Edge, Identifier), Vec>, + name: Identifier, + value: &JsonValue, + ) -> indradb::Result<()> { + let key = self.key(edge, name); + let value_json = serde_json::to_vec(value)?; + batch.insert(key.clone(), value_json); + let old_value = map_err(self.tree.get(key.clone()))?; + if let Some(old_value) = old_value { + let old_value: Json = serde_json::from_slice(&old_value)?; + let value_key = Self::key_value_index(edge, &old_value, name); + batch_value.remove(value_key.as_slice()); + } + let value_key = Self::key_value_index(edge, value, name); + property_creation_set.insert((edge.clone(), name), value_key); + Ok(()) + } + + pub fn set(&self, edge: &Edge, name: Identifier, value: &JsonValue) -> indradb::Result<()> { + let key = self.key(edge, name); + let value_json = serde_json::to_vec(value)?; + + let old_value = map_err(self.tree.get(key.clone()))?; + if let Some(old_value) = old_value { + let old_value: Json = serde_json::from_slice(&old_value)?; + let value_key = Self::key_value_index(edge, &old_value, name); + map_err(self.value_index_tree.remove(value_key.as_slice()))?; + } + + map_err(self.tree.insert(key.as_slice(), value_json.as_slice()))?; + let value_key = Self::key_value_index(edge, value, name); + + map_err( + self.value_index_tree + .insert(value_key.as_slice(), value_json.as_slice()), + )?; + Ok(()) + } + + pub fn delete(&self, edge: &Edge, name: Identifier) -> indradb::Result<()> { + let old_value = map_err(self.tree.get(self.key(edge, name)))?; + map_err(self.tree.remove(self.key(edge, name)))?; + if let Some(old_value) = old_value { + let old_value: Json = serde_json::from_slice(&old_value)?; + let value_key = Self::key_value_index(edge, &old_value, name); + map_err(self.value_index_tree.remove(value_key.as_slice()))?; + } + + Ok(()) + } +} diff --git a/src/managers/edge_range_manager.rs b/src/managers/edge_range_manager.rs new file mode 100644 index 0000000..0ee1b61 --- /dev/null +++ b/src/managers/edge_range_manager.rs @@ -0,0 +1,92 @@ +use std::io::Cursor; + +use indradb::{util, Edge}; +use sled::{Batch, Iter as DbIterator, Tree}; +use uuid::Uuid; + +use crate::datastore::SledHolder; +use crate::errors::map_err; + +pub struct EdgeRangeManager<'tree> { + pub tree: &'tree Tree, +} + +impl<'tree> EdgeRangeManager<'tree> { + pub fn new<'db: 'tree>(ds: &'db SledHolder) -> Self { + EdgeRangeManager { tree: &ds.edge_ranges } + } + + pub fn new_reversed<'db: 'tree>(ds: &'db SledHolder) -> Self { + EdgeRangeManager { + tree: &ds.reversed_edge_ranges, + } + } + + fn key(&self, edge: &Edge) -> Vec { + util::build(&[ + util::Component::Uuid(edge.outbound_id), + util::Component::Identifier(edge.t), + util::Component::Uuid(edge.inbound_id), + ]) + } + + pub(crate) fn contains(&self, edge: &Edge) -> indradb::Result { + let key = self.key(edge); + map_err(self.tree.contains_key(key)) + } + + fn sled_to_edge(iter: DbIterator) -> impl Iterator> { + iter.map(move |item| { + let (k, _) = map_err(item)?; + let mut cursor = Cursor::new(k); + let outbound_id = util::read_uuid(&mut cursor); + let t = util::read_identifier(&mut cursor); + let inbound_id = util::read_uuid(&mut cursor); + Ok(Edge { + outbound_id, + t, + inbound_id, + }) + }) + } + + pub fn iterate_for_range<'iter, 'trans: 'iter>( + &'trans self, + edge: &Edge, + ) -> impl Iterator> { + let offset = self.key(edge); + let iterator = self.tree.range(offset..); + Self::sled_to_edge(iterator) + } + + pub fn iterate_for_all(&self) -> impl Iterator> { + let iterator = self.tree.iter(); + Self::sled_to_edge(iterator) + } + + pub fn iterate_for_owner<'iter, 'trans: 'iter>( + &'trans self, + id: Uuid, + ) -> impl Iterator> + 'iter { + let prefix: Vec = util::build(&[util::Component::Uuid(id)]); + let iterator = self.tree.scan_prefix(prefix); + Self::sled_to_edge(iterator) + } + + pub fn set(&self, edge: &Edge) -> indradb::Result<()> { + let key = self.key(edge); + map_err(self.tree.insert(key, &[]))?; + Ok(()) + } + + pub fn set_batch(&self, edge: &Edge, batch: &mut Batch) -> indradb::Result<()> { + let key = self.key(edge); + batch.insert(key, &[]); + Ok(()) + } + + pub fn delete(&self, edge: &Edge) -> indradb::Result<()> { + map_err(self.tree.remove(self.key(edge)))?; + Ok(()) + } +} diff --git a/src/managers/metadata.rs b/src/managers/metadata.rs new file mode 100644 index 0000000..82b989e --- /dev/null +++ b/src/managers/metadata.rs @@ -0,0 +1,91 @@ +use std::collections::HashSet; +use std::io::Cursor; +use std::sync::{Arc, RwLock}; + +use indradb::{util, Identifier}; +use sled::Tree; + +use crate::errors::{map_err, DSError}; + +const INDEXED_PROPERTIES: &str = "IndexedProperties"; + +pub struct MetaDataManager<'tree> { + pub tree: &'tree Tree, + indexed_properties: Arc>>, + index_key: Identifier, +} + +impl<'tree> MetaDataManager<'tree> { + pub fn new(tree: &'tree Tree) -> indradb::Result { + let manager = MetaDataManager { + tree, + indexed_properties: Arc::new(RwLock::new(HashSet::new())), + index_key: Identifier::new(INDEXED_PROPERTIES)?, + }; + manager.load()?; + Ok(manager) + } + + pub fn is_indexed(&self, prop: &Identifier) -> indradb::Result { + let indexed_properties = self.indexed_properties.read().map_err(DSError::from)?; + + let is_indexed = indexed_properties.contains(prop.as_str()); + Ok(is_indexed) + } + + pub fn add_index(&self, prop: &Identifier) -> indradb::Result<()> { + { + let mut indexed_properties = self.indexed_properties.write().map_err(DSError::from)?; + if indexed_properties.contains(prop.as_str()) { + return Ok(()); + } + indexed_properties.insert(prop.to_string()); + } + self.sync()?; + Ok(()) + } + + #[allow(dead_code)] + pub fn remove_index(&self, prop: &Identifier) -> indradb::Result<()> { + { + let mut indexed_properties = self.indexed_properties.write().map_err(DSError::from)?; + if !indexed_properties.contains(prop.as_str()) { + return Ok(()); + } + + indexed_properties.remove(prop.as_str()); + } + self.sync()?; + Ok(()) + } + + fn load(&self) -> indradb::Result<()> { + let mut indexed_properties = self.indexed_properties.write().map_err(DSError::from)?; + let all_indexed_prefix = util::build(&[util::Component::Identifier(self.index_key)]); + for index in self.tree.scan_prefix(all_indexed_prefix) { + let (k, _) = map_err(index)?; + let mut cursor = Cursor::new(k); + let _ = util::read_identifier(&mut cursor); + let prop = util::read_identifier(&mut cursor); + + indexed_properties.insert(prop.to_string()); + } + Ok(()) + } + + pub(crate) fn sync(&self) -> indradb::Result<()> { + let all_indexed_prefix = util::build(&[util::Component::Identifier(self.index_key)]); + for index in self.tree.scan_prefix(all_indexed_prefix) { + let (key, _) = map_err(index)?; + map_err(self.tree.remove(key))?; + } + for index in self.indexed_properties.read().map_err(DSError::from)?.iter() { + let key = util::build(&[ + util::Component::Identifier(self.index_key), + util::Component::Identifier(Identifier::new(index)?), + ]); + map_err(self.tree.insert(key, &[]))?; + } + Ok(()) + } +} diff --git a/src/managers/mod.rs b/src/managers/mod.rs new file mode 100644 index 0000000..775f518 --- /dev/null +++ b/src/managers/mod.rs @@ -0,0 +1,6 @@ +pub(crate) mod edge_manager; +pub(crate) mod edge_property_manager; +pub(crate) mod edge_range_manager; +pub(crate) mod metadata; +pub(crate) mod vertex_manager; +pub(crate) mod vertex_property_manager; diff --git a/src/managers/vertex_manager.rs b/src/managers/vertex_manager.rs new file mode 100644 index 0000000..c760239 --- /dev/null +++ b/src/managers/vertex_manager.rs @@ -0,0 +1,115 @@ +use std::io::Cursor; +use std::ops::Deref; + +use indradb::{util, Identifier, Vertex}; +use sled::{Batch, Iter as DbIterator, Tree}; +use uuid::Uuid; + +use crate::datastore::SledHolder; +use crate::errors::map_err; +use crate::managers::edge_manager::EdgeManager; +use crate::managers::edge_range_manager::EdgeRangeManager; +use crate::managers::vertex_property_manager::VertexPropertyManager; + +pub type VertexItem = (Uuid, Identifier); + +pub struct VertexManager<'db: 'tree, 'tree> { + pub holder: &'db SledHolder, + pub tree: &'tree Tree, +} + +impl<'db: 'tree, 'tree> VertexManager<'db, 'tree> { + pub fn new(ds: &'db SledHolder) -> Self { + VertexManager { + holder: ds, + tree: ds.db.deref(), + } + } + + pub fn count(&self) -> u64 { + self.tree.iter().count() as u64 + } + + fn key(&self, id: Uuid) -> Vec { + util::build(&[util::Component::Uuid(id)]) + } + + pub fn exists(&self, id: Uuid) -> indradb::Result { + Ok(map_err(self.tree.get(self.key(id)))?.is_some()) + } + + pub fn get(&self, id: Uuid) -> indradb::Result> { + match map_err(self.tree.get(self.key(id)))? { + Some(value_bytes) => { + let mut cursor = Cursor::new(value_bytes.deref()); + Ok(Some(util::read_identifier(&mut cursor))) + } + None => Ok(None), + } + } + + fn iterate(&self, iterator: DbIterator) -> impl Iterator> + '_ { + iterator.map(move |item| -> indradb::Result { + let (k, v) = map_err(item)?; + + let id = { + debug_assert_eq!(k.len(), 16); + let mut cursor = Cursor::new(k); + util::read_uuid(&mut cursor) + }; + + let mut cursor = Cursor::new(v); + let t = util::read_identifier(&mut cursor); + Ok((id, t)) + }) + } + + pub fn iterate_for_range(&self, id: Uuid) -> impl Iterator> + '_ { + let low_key = util::build(&[util::Component::Uuid(id)]); + let low_key_bytes: &[u8] = low_key.as_ref(); + let iter = self.tree.range(low_key_bytes..); + self.iterate(iter) + } + + pub fn create(&self, vertex: &Vertex) -> indradb::Result { + let key = self.key(vertex.id); + if map_err(self.tree.contains_key(&key))? { + return Ok(false); + } + map_err( + self.tree + .insert(&key, util::build(&[util::Component::Identifier(vertex.t)])), + )?; + Ok(true) + } + + pub fn create_batch(&self, vertex: &Vertex, batch: &mut Batch) -> indradb::Result<()> { + let key = self.key(vertex.id); + batch.insert(key.clone(), util::build(&[util::Component::Identifier(vertex.t)])); + Ok(()) + } + + pub fn delete(&self, id: Uuid) -> indradb::Result<()> { + map_err(self.tree.remove(self.key(id)))?; + + let vertex_property_manager = + VertexPropertyManager::new(&self.holder.vertex_properties, &self.holder.vertex_property_values); + for item in vertex_property_manager.iterate_for_owner(id)? { + let ((vertex_property_owner_id, vertex_property_name), _) = item?; + vertex_property_manager.delete(vertex_property_owner_id, vertex_property_name)?; + } + + let edge_manager = EdgeManager::new(self.holder); + + { + let edge_range_manager = EdgeRangeManager::new(self.holder); + for item in edge_range_manager.iterate_for_owner(id) { + let edge = item?; + debug_assert_eq!(edge.outbound_id, id); + edge_manager.delete(&edge)?; + } + } + + Ok(()) + } +} diff --git a/src/managers/vertex_property_manager.rs b/src/managers/vertex_property_manager.rs new file mode 100644 index 0000000..06a6b77 --- /dev/null +++ b/src/managers/vertex_property_manager.rs @@ -0,0 +1,171 @@ +use std::collections::HashMap; +use std::io::Cursor; + +use indradb::{util, Identifier, Json}; +use serde_json::Value as JsonValue; +use sled::{IVec, Tree}; +use uuid::Uuid; + +use crate::errors::map_err; + +pub type OwnedPropertyItem = ((Uuid, Identifier), JsonValue); + +pub struct VertexPropertyManager<'tree> { + pub tree: &'tree Tree, + pub value_index_tree: &'tree Tree, +} + +impl<'tree> VertexPropertyManager<'tree> { + pub fn new(tree: &'tree Tree, value_index_tree: &'tree Tree) -> Self { + VertexPropertyManager { tree, value_index_tree } + } + + fn key(&self, vertex_id: Uuid, name: Identifier) -> Vec { + util::build(&[util::Component::Uuid(vertex_id), util::Component::Identifier(name)]) + } + + fn key_value_index(vertex_id: &Uuid, value: &JsonValue, property_name: Identifier) -> Vec { + util::build(&[ + util::Component::Identifier(property_name), + util::Component::Json(&Json::new(value.clone())), + util::Component::Uuid(*vertex_id), + ]) + } + + fn read_key_value_index(buf: IVec) -> (Identifier, u64, Uuid) { + let mut cursor = Cursor::new(buf.as_ref()); + let name = util::read_identifier(&mut cursor); + let value = util::read_u64(&mut cursor); + let uuid = util::read_uuid(&mut cursor); + (name, value, uuid) + } + + fn value_iterate_uuids(&self, iterator: sled::Iter) -> impl Iterator> + '_ { + iterator.map(move |item| -> indradb::Result { + let (k, _) = map_err(item)?; + let (_, _, vid) = Self::read_key_value_index(k); + Ok(vid) + }) + } + + pub fn iterate_for_property_name( + &self, + name: Identifier, + ) -> indradb::Result> + '_> { + let prefix = util::build(&[util::Component::Identifier(name)]); + let iterator = self.value_index_tree.scan_prefix(prefix); + Ok(self.value_iterate_uuids(iterator)) + } + + pub fn iterate_for_property_name_and_value( + &self, + name: Identifier, + value: &JsonValue, + ) -> indradb::Result> + '_> { + let prefix = util::build(&[ + util::Component::Identifier(name), + util::Component::Json(&Json::new(value.clone())), + ]); + let iterator = self.value_index_tree.scan_prefix(prefix); + + Ok(self.value_iterate_uuids(iterator)) + } + + pub fn iterate_for_owner( + &self, + vertex_id: Uuid, + ) -> indradb::Result> + '_> { + let prefix = util::build(&[util::Component::Uuid(vertex_id)]); + let iterator = self.tree.scan_prefix(prefix); + + Ok(iterator.map(move |item| -> indradb::Result { + let (k, v) = map_err(item)?; + let mut cursor = Cursor::new(k); + let owner_id = util::read_uuid(&mut cursor); + debug_assert_eq!(vertex_id, owner_id); + let name = util::read_identifier(&mut cursor); + let value = serde_json::from_slice(&v)?; + Ok(((owner_id, name), value)) + })) + } + + pub fn get(&self, vertex_id: Uuid, name: Identifier) -> indradb::Result> { + let key = self.key(vertex_id, name); + + match map_err(self.tree.get(key))? { + Some(value_bytes) => Ok(Some(serde_json::from_slice(&value_bytes)?)), + None => Ok(None), + } + } + + pub fn set_batch( + &self, + vertex_id: Uuid, + batch: &mut sled::Batch, + batch_value: &mut sled::Batch, + property_creation_set: &mut HashMap<(Uuid, Identifier), Vec>, + name: Identifier, + value: &JsonValue, + ) -> indradb::Result<()> { + let key = self.key(vertex_id, name); + let value_json = serde_json::to_vec(value)?; + batch.insert(key.clone(), value_json); + let old_value = map_err(self.tree.get(key.clone()))?; + if let Some(old_value) = old_value { + let old_value: Json = serde_json::from_slice(&old_value)?; + let value_key = Self::key_value_index(&vertex_id, &old_value, name); + batch_value.remove(value_key.as_slice()); + } + let value_key = Self::key_value_index(&vertex_id, value, name); + property_creation_set.insert((vertex_id, name), value_key); + Ok(()) + } + + pub fn set(&self, vertex_id: Uuid, name: Identifier, value: &JsonValue) -> indradb::Result<()> { + let key = self.key(vertex_id, name); + let value_json = serde_json::to_vec(value)?; + + if let Some(old) = map_err(self.tree.get(key.clone()))? { + let old_value = serde_json::from_slice(&old)?; + let value_index_key = Self::key_value_index(&vertex_id, &old_value, name); + map_err(self.value_index_tree.remove(value_index_key))?; + } + + map_err(self.tree.insert(key.as_slice(), value_json.as_slice()))?; + let value_index_key = Self::key_value_index(&vertex_id, value, name); + map_err(self.value_index_tree.insert(value_index_key, value_json.as_slice()))?; + Ok(()) + } + + pub fn delete(&self, vertex_id: Uuid, name: Identifier) -> indradb::Result<()> { + let old_value = map_err(self.tree.get(self.key(vertex_id, name)))?; + map_err(self.tree.remove(self.key(vertex_id, name)))?; + if let Some(old_value) = old_value { + let old_value = serde_json::from_slice(&old_value)?; + let value_index_key = Self::key_value_index(&vertex_id, &old_value, name); + map_err(self.value_index_tree.remove(value_index_key))?; + } + + Ok(()) + } +} +#[cfg(test)] +mod test { + use serde_json::json; + use uuid::{Context, Timestamp}; + + use super::*; + + #[test] + fn test_index_key_and_reco() { + let context = Context::new(24); + let uuid = Uuid::new_v1(Timestamp::now(context), &[1, 2, 3, 4, 5, 6]); + let name = Identifier::new("_changesetID").unwrap(); + let value = json! {"Changesets/25dfc1e7-fdd1-4027-9e98-48a8429a9c70"}; + let key = VertexPropertyManager::key_value_index(&uuid, &value, name); + + let (n, _v, id) = VertexPropertyManager::read_key_value_index(key.into()); + assert_eq!(n, name); + assert_eq!(uuid, id); + } +} diff --git a/src/transaction.rs b/src/transaction.rs new file mode 100644 index 0000000..d4d694c --- /dev/null +++ b/src/transaction.rs @@ -0,0 +1,332 @@ +use std::collections::HashMap; +use std::ops::Deref; + +use indradb::{BulkInsertItem, DynIter, Edge, Error, Identifier, Json, Transaction, Vertex}; +use sled::{Batch, IVec}; +use uuid::Uuid; + +use crate::datastore::SledHolder; +use crate::errors::map_err; +use crate::managers::edge_manager::EdgeManager; +use crate::managers::edge_property_manager::EdgePropertyManager; +use crate::managers::edge_range_manager::EdgeRangeManager; +use crate::managers::metadata::MetaDataManager; +use crate::managers::vertex_manager::VertexManager; +use crate::managers::vertex_property_manager::VertexPropertyManager; + +#[derive(Default)] +struct IndraSledBatch { + pub(crate) vertex_creation_batch: Batch, + pub(crate) edge_creation_batch: Batch, + pub(crate) edge_range_creation_batch: Batch, + pub(crate) edge_range_rev_creation_batch: Batch, + pub(crate) vertex_property_creation_batch: Batch, + pub(crate) vertex_property_value_creation_batch: Batch, + pub(crate) vertex_property_creation_set: HashMap<(Uuid, Identifier), Vec>, + pub(crate) edge_property_creation_batch: Batch, + pub(crate) edge_property_value_creation_batch: Batch, + pub(crate) edge_property_creation_set: HashMap<(Edge, Identifier), Vec>, +} + +impl IndraSledBatch { + fn apply(mut self, holder: &SledHolder) -> indradb::Result<()> { + map_err(holder.db.deref().apply_batch(self.vertex_creation_batch))?; + map_err(holder.edges.apply_batch(self.edge_creation_batch))?; + map_err(holder.edge_ranges.apply_batch(self.edge_range_creation_batch))?; + map_err( + holder + .reversed_edge_ranges + .apply_batch(self.edge_range_rev_creation_batch), + )?; + map_err(holder.edge_properties.apply_batch(self.edge_property_creation_batch))?; + map_err( + holder + .vertex_properties + .apply_batch(self.vertex_property_creation_batch), + )?; + + for (_, key) in self.edge_property_creation_set { + self.edge_property_value_creation_batch.insert(key, IVec::default()); + } + for (_, key) in self.vertex_property_creation_set { + self.vertex_property_value_creation_batch.insert(key, IVec::default()); + } + map_err( + holder + .vertex_property_values + .apply_batch(self.vertex_property_value_creation_batch), + )?; + map_err( + holder + .edge_property_values + .apply_batch(self.edge_property_value_creation_batch), + )?; + Ok(()) + } +} + +/// A transaction that is backed by Sled. +pub struct SledTransaction<'a> { + pub(crate) holder: &'a SledHolder, + pub(crate) vertex_manager: VertexManager<'a, 'a>, + pub(crate) edge_manager: EdgeManager<'a, 'a>, + pub(crate) edge_property_manager: EdgePropertyManager<'a>, + pub(crate) vertex_property_manager: VertexPropertyManager<'a>, + pub(crate) edge_range_manager: EdgeRangeManager<'a>, + pub(crate) edge_range_manager_rev: EdgeRangeManager<'a>, + pub(crate) meta_data_manager: MetaDataManager<'a>, +} + +impl<'a> Transaction<'a> for SledTransaction<'a> { + fn vertex_count(&self) -> u64 { + let vertex_manager = VertexManager::new(self.holder); + vertex_manager.count() + } + fn all_vertices(&'a self) -> indradb::Result> { + let iterator = self.vertex_manager.iterate_for_range(Uuid::default()); + let mapped = iterator.map(move |item| { + let (id, t) = item?; + let vertex = Vertex::with_id(id, t); + Ok::(vertex) + }); + + Ok(Box::new(mapped)) + } + + fn range_vertices(&'a self, offset: Uuid) -> indradb::Result> { + let iter = self + .vertex_manager + .iterate_for_range(offset) + .map(|e| e.map(|v| Vertex::with_id(v.0, v.1))); + Ok(Box::new(iter)) + } + + fn specific_vertices(&'a self, ids: Vec) -> indradb::Result> { + let iter = ids.into_iter().filter_map(move |id| { + let v = self.vertex_manager.get(id).transpose(); + v.map(|v| v.map(|v| Vertex::with_id(id, v))) + }); + Ok(Box::new(iter)) + } + + fn vertex_ids_with_property(&'a self, name: Identifier) -> indradb::Result>> { + if !self.meta_data_manager.is_indexed(&name)? { + return Ok(None); + } + let iter = self.vertex_property_manager.iterate_for_property_name(name)?; + Ok(Some(Box::new(iter))) + } + + fn vertex_ids_with_property_value( + &'a self, + name: Identifier, + value: &Json, + ) -> indradb::Result>> { + if !self.meta_data_manager.is_indexed(&name)? { + return Ok(None); + } + let iter = self + .vertex_property_manager + .iterate_for_property_name_and_value(name, value)?; + Ok(Some(Box::new(iter))) + } + + fn edge_count(&self) -> u64 { + let edge_manager = EdgeManager::new(self.holder); + edge_manager.count() + } + + fn all_edges(&'a self) -> indradb::Result> { + let iter = self.edge_range_manager.iterate_for_all(); + + Ok(Box::new(iter)) + } + + fn range_edges(&'a self, offset: Edge) -> indradb::Result> { + let iter = self.edge_range_manager.iterate_for_range(&offset); + + Ok(Box::new(iter)) + } + + fn range_reversed_edges(&'a self, offset: Edge) -> indradb::Result> { + let iter = self.edge_range_manager_rev.iterate_for_range(&offset); + + Ok(Box::new(iter)) + } + + fn specific_edges(&'a self, edges: Vec) -> indradb::Result> { + let iter: Vec<_> = edges + .into_iter() + .filter(|e| { + let r = self.edge_range_manager.contains(e); + if let Ok(r) = r { + r + } else { + false + } + }) + .map(Ok) + .collect(); + Ok(Box::new(iter.into_iter())) + } + + fn edges_with_property(&'a self, name: Identifier) -> indradb::Result>> { + if !self.meta_data_manager.is_indexed(&name)? { + return Ok(None); + } + let iter = self.edge_property_manager.iterate_for_property_name(name)?; + Ok(Some(Box::new(iter))) + } + + fn edges_with_property_value( + &'a self, + name: Identifier, + value: &Json, + ) -> indradb::Result>> { + if !self.meta_data_manager.is_indexed(&name)? { + return Ok(None); + } + let iter = self + .edge_property_manager + .iterate_for_property_name_and_value(name, value)?; + Ok(Some(Box::new(iter))) + } + + fn vertex_property(&self, vertex: &Vertex, name: Identifier) -> indradb::Result> { + let r = self.vertex_property_manager.get(vertex.id, name)?; + Ok(r.map(|v| v.into())) + } + + fn all_vertex_properties_for_vertex(&'a self, vertex: &Vertex) -> indradb::Result> { + let iter = self.vertex_property_manager.iterate_for_owner(vertex.id)?; + let iter = iter.map(|r| r.map(|((_, name), val)| (name, Json::new(val)))); + Ok(Box::new(iter)) + } + + fn edge_property(&self, edge: &Edge, name: Identifier) -> indradb::Result> { + let result = self.edge_property_manager.get(edge, name)?; + Ok(result.map(Json::new)) + } + + fn all_edge_properties_for_edge(&'a self, edge: &Edge) -> indradb::Result> { + let iter = self.edge_property_manager.iterate_for_owner(edge)?; + let iter = iter.map(|e| e.map(|((_, id), val)| (id, Json::new(val)))); + Ok(Box::new(iter)) + } + + fn delete_vertices(&mut self, vertices: Vec) -> indradb::Result<()> { + for v in vertices { + self.vertex_manager.delete(v.id)? + } + Ok(()) + } + + fn delete_edges(&mut self, edges: Vec) -> indradb::Result<()> { + for item in edges.iter() { + if self.vertex_manager.get(item.outbound_id)?.is_some() { + self.edge_manager.delete(item)?; + }; + } + + Ok(()) + } + + fn delete_vertex_properties(&mut self, props: Vec<(Uuid, Identifier)>) -> indradb::Result<()> { + for (id, prop) in props { + self.vertex_property_manager.delete(id, prop)? + } + Ok(()) + } + + fn delete_edge_properties(&mut self, props: Vec<(Edge, Identifier)>) -> indradb::Result<()> { + for (edge, prop) in props { + self.edge_property_manager.delete(&edge, prop)?; + } + Ok(()) + } + + fn sync(&self) -> indradb::Result<()> { + self.meta_data_manager.sync()?; + let _ = map_err(self.holder.db.flush())?; + Ok(()) + } + + fn create_vertex(&mut self, vertex: &Vertex) -> indradb::Result { + self.vertex_manager.create(vertex) + } + + fn create_edge(&mut self, edge: &Edge) -> indradb::Result { + let outbound_exists = self.vertex_manager.exists(edge.outbound_id)?; + let inbound_exists = self.vertex_manager.exists(edge.inbound_id)?; + + if !outbound_exists || !inbound_exists { + Ok(false) + } else { + self.edge_manager.set(edge)?; + Ok(true) + } + } + + fn bulk_insert(&mut self, items: Vec) -> indradb::Result<()> { + let mut batch = IndraSledBatch::default(); + + for item in items { + match item { + BulkInsertItem::Vertex(v) => { + self.vertex_manager.create_batch(&v, &mut batch.vertex_creation_batch)?; + } + BulkInsertItem::Edge(e) => { + self.edge_manager.set_batch( + &e, + &mut batch.edge_creation_batch, + &mut batch.edge_range_creation_batch, + &mut batch.edge_range_rev_creation_batch, + )?; + } + BulkInsertItem::VertexProperty(id, p, v) => { + self.vertex_property_manager.set_batch( + id, + &mut batch.vertex_property_creation_batch, + &mut batch.vertex_property_value_creation_batch, + &mut batch.vertex_property_creation_set, + p, + &v, + )?; + } + BulkInsertItem::EdgeProperty(e, p, v) => { + self.edge_property_manager.set_batch( + &e, + &mut batch.edge_property_creation_batch, + &mut batch.edge_property_value_creation_batch, + &mut batch.edge_property_creation_set, + p, + &v, + )?; + } + } + } + batch.apply(self.holder)?; + + self.sync()?; + Ok(()) + } + + fn index_property(&mut self, name: Identifier) -> indradb::Result<()> { + self.meta_data_manager.add_index(&name)?; + Ok(()) + } + + fn set_vertex_properties(&mut self, vertices: Vec, name: Identifier, value: &Json) -> indradb::Result<()> { + for v in vertices { + self.vertex_property_manager.set(v, name, value)?; + } + Ok(()) + } + + fn set_edge_properties(&mut self, edges: Vec, name: Identifier, value: &Json) -> indradb::Result<()> { + for edge in edges { + self.edge_property_manager.set(&edge, name, value)?; + } + Ok(()) + } +}