diff --git a/db4-graph/src/lib.rs b/db4-graph/src/lib.rs index 7b7e8b7fa2..2b21a7d232 100644 --- a/db4-graph/src/lib.rs +++ b/db4-graph/src/lib.rs @@ -1,10 +1,7 @@ use std::{ io, path::{Path, PathBuf}, - sync::{ - atomic::{self, AtomicU64, AtomicUsize}, - Arc, - }, + sync::{atomic::AtomicUsize, Arc}, }; use raphtory_api::core::{ @@ -26,11 +23,13 @@ use storage::{ }, persist::strategy::{Config, PersistentStrategy}, resolver::GIDResolverOps, - wal::{GraphWal, TransactionID, Wal}, - Extension, GIDResolver, Layer, ReadLockedLayer, WalImpl, ES, GS, NS, + Extension, GIDResolver, Layer, ReadLockedLayer, transaction::TransactionManager, + WalImpl, ES, NS, GS, wal::Wal, }; use tempfile::TempDir; +mod replay; + #[derive(Debug)] pub struct TemporalGraph { // mapping between logical and physical ids @@ -83,40 +82,6 @@ impl<'a> From<&'a Path> for GraphDir { } } -#[derive(Debug)] -pub struct TransactionManager { - last_transaction_id: AtomicU64, - wal: Arc, -} - -impl TransactionManager { - const STARTING_TRANSACTION_ID: TransactionID = 1; - - pub fn new(wal: Arc) -> Self { - Self { - last_transaction_id: AtomicU64::new(Self::STARTING_TRANSACTION_ID), - wal, - } - } - - pub fn load(self, last_transaction_id: TransactionID) { - self.last_transaction_id - .store(last_transaction_id, atomic::Ordering::SeqCst) - } - - pub fn begin_transaction(&self) -> TransactionID { - let transaction_id = self - .last_transaction_id - .fetch_add(1, atomic::Ordering::SeqCst); - self.wal.log_begin_transaction(transaction_id).unwrap(); - transaction_id - } - - pub fn end_transaction(&self, transaction_id: TransactionID) { - self.wal.log_end_transaction(transaction_id).unwrap(); - } -} - impl Default for TemporalGraph { fn default() -> Self { Self::new(Extension::default()).unwrap() @@ -161,7 +126,7 @@ impl, ES = ES, GS = GS>> Temporal logical_to_physical: resolver.into(), node_count, storage: Arc::new(storage), - transaction_manager: Arc::new(TransactionManager::new(wal.clone())), + transaction_manager: Arc::new(TransactionManager::new()), wal, }) } @@ -207,7 +172,7 @@ impl, ES = ES, GS = GS>> Temporal logical_to_physical, node_count: AtomicUsize::new(0), storage: Arc::new(storage), - transaction_manager: Arc::new(TransactionManager::new(wal.clone())), + transaction_manager: Arc::new(TransactionManager::new()), wal, }) } @@ -371,6 +336,7 @@ impl, ES = ES, GS = GS>> Temporal } } +/// Holds write locks across all segments in the graph for fast bulk ingestion. pub struct WriteLockedGraph<'a, EXT> where EXT: PersistentStrategy, ES = ES, GS = GS>, diff --git a/db4-graph/src/replay.rs b/db4-graph/src/replay.rs new file mode 100644 index 0000000000..b620c4fe83 --- /dev/null +++ b/db4-graph/src/replay.rs @@ -0,0 +1,131 @@ +//! Implements WAL replay for a `WriteLockedGraph`. +//! Allows for fast replay by making use of one-time lock acquisition for +//! all the segments in the graph. + +use storage::pages::resolve_pos; +use crate::{WriteLockedGraph}; +use raphtory_api::core::{ + entities::{properties::prop::Prop, EID, GID, VID}, + storage::timeindex::TimeIndexEntry, + entities::properties::meta::STATIC_GRAPH_LAYER_ID, +}; +use raphtory_core::entities::GidRef; +use storage::{ + persist::strategy::PersistentStrategy, + NS, ES, GS, + error::StorageError, + wal::{GraphReplay, TransactionID, LSN}, +}; +use storage::resolver::GIDResolverOps; + +impl GraphReplay for WriteLockedGraph<'_, EXT> +where + EXT: PersistentStrategy, ES = ES, GS = GS>, +{ + fn replay_add_edge( + &mut self, + lsn: LSN, + transaction_id: TransactionID, + t: TimeIndexEntry, + src_name: GID, + src_id: VID, + dst_name: GID, + dst_id: VID, + eid: EID, + layer_name: Option, + layer_id: usize, + props: Vec<(String, usize, Prop)>, + ) -> Result<(), StorageError> { + // TODO: Check max lsn on disk to see if this record should be replayed. + + let temporal_graph = self.graph(); + let node_max_page_len = temporal_graph.storage().nodes().max_page_len(); + let edge_max_page_len = temporal_graph.storage().edges().max_page_len(); + + // 1. Insert prop ids into edge meta. + // No need to validate props again since they are already validated before + // being logged to the WAL. + let edge_meta = temporal_graph.edge_meta(); + let mut prop_ids_and_values = Vec::new(); + + for (prop_name, prop_id, prop_value) in props.into_iter() { + let prop_mapper = edge_meta.temporal_prop_mapper(); + + prop_mapper.set_id_and_dtype(prop_name, prop_id, prop_value.dtype()); + prop_ids_and_values.push((prop_id, prop_value)); + } + + // 2. Insert node ids into resolver. + temporal_graph.logical_to_physical.set(GidRef::from(&src_name), src_id)?; + temporal_graph.logical_to_physical.set(GidRef::from(&dst_name), dst_id)?; + + // 3. Insert layer id into the layer meta of both edge and node. + let node_meta = temporal_graph.node_meta(); + + edge_meta.layer_meta().set_id(layer_name.as_deref().unwrap_or("_default"), layer_id); + node_meta.layer_meta().set_id(layer_name.as_deref().unwrap_or("_default"), layer_id); + + // 4. Grab src writer and add edge data. + let (src_segment_id, src_pos) = resolve_pos(src_id, node_max_page_len); + let num_nodes = src_id.index() + 1; + self.resize_chunks_to_num_nodes(num_nodes); // Create enough segments. + + let mut src_writer = self.nodes.get_mut(src_segment_id).unwrap().writer(); + src_writer.store_node_id(src_pos, STATIC_GRAPH_LAYER_ID, GidRef::from(&src_name)); + + let is_new_edge_static = src_writer.get_out_edge(src_pos, dst_id, STATIC_GRAPH_LAYER_ID).is_none(); + let is_new_edge_layer = src_writer.get_out_edge(src_pos, dst_id, layer_id).is_none(); + + // Add the edge to the static graph if it doesn't already exist. + if is_new_edge_static { + src_writer.add_static_outbound_edge(src_pos, dst_id, eid); + } + + // Add the edge to the layer if it doesn't already exist, else just record the timestamp. + if is_new_edge_layer { + src_writer.add_outbound_edge(Some(t), src_pos, dst_id, eid.with_layer(layer_id)); + } else { + src_writer.update_timestamp(t, src_pos, eid.with_layer(layer_id)); + } + + // Release the writer for mutable access to dst_writer. + drop(src_writer); + + // 5. Grab dst writer and add edge data. + let (dst_segment_id, dst_pos) = resolve_pos(dst_id, node_max_page_len); + let num_nodes = dst_id.index() + 1; + self.resize_chunks_to_num_nodes(num_nodes); + + let mut dst_writer = self.nodes.get_mut(dst_segment_id).unwrap().writer(); + dst_writer.store_node_id(dst_pos, STATIC_GRAPH_LAYER_ID, GidRef::from(&dst_name)); + + if is_new_edge_static { + dst_writer.add_static_inbound_edge(dst_pos, src_id, eid); + } + + if is_new_edge_layer { + dst_writer.add_inbound_edge(Some(t), dst_pos, src_id, eid.with_layer(layer_id)); + } else { + dst_writer.update_timestamp(t, dst_pos, eid.with_layer(layer_id)); + } + + drop(dst_writer); + + // 6. Grab edge writer and add temporal props & metadata. + let (edge_segment_id, edge_pos) = resolve_pos(eid, edge_max_page_len); + let num_edges = eid.index() + 1; + self.resize_chunks_to_num_edges(num_edges); + let mut edge_writer = self.edges.get_mut(edge_segment_id).unwrap().writer(); + + // Add edge into the static graph if it doesn't already exist. + if is_new_edge_static { + let already_counted = false; + edge_writer.add_static_edge(Some(edge_pos), src_id, dst_id, already_counted); + } + + // Add edge into the specified layer with timestamp and props. + edge_writer.add_edge(t, edge_pos, src_id, dst_id, prop_ids_and_values, layer_id); + + Ok(()) + } +} diff --git a/db4-storage/src/api/edges.rs b/db4-storage/src/api/edges.rs index 61136444cd..905f6ed64b 100644 --- a/db4-storage/src/api/edges.rs +++ b/db4-storage/src/api/edges.rs @@ -58,8 +58,7 @@ pub trait EdgeSegmentOps: Send + Sync + std::fmt::Debug + 'static { fn try_head_mut(&self) -> Option>; - /// mark segment as dirty without triggering a write - fn mark_dirty(&self); + fn set_dirty(&self, dirty: bool); /// notify that an edge was added (might need to write to disk) fn notify_write( diff --git a/db4-storage/src/api/graph_props.rs b/db4-storage/src/api/graph_props.rs index 768aa8b123..a06ab76acc 100644 --- a/db4-storage/src/api/graph_props.rs +++ b/db4-storage/src/api/graph_props.rs @@ -29,7 +29,7 @@ where fn est_size(&self) -> usize; - fn mark_dirty(&self); + fn set_dirty(&self, dirty: bool); fn notify_write( &self, diff --git a/db4-storage/src/api/nodes.rs b/db4-storage/src/api/nodes.rs index ebea776c8a..9f9b2c2283 100644 --- a/db4-storage/src/api/nodes.rs +++ b/db4-storage/src/api/nodes.rs @@ -94,7 +94,7 @@ pub trait NodeSegmentOps: Send + Sync + std::fmt::Debug + 'static { head_lock: impl DerefMut, ) -> Result<(), StorageError>; - fn mark_dirty(&self); + fn set_dirty(&self, dirty: bool); fn check_node(&self, pos: LocalPOS, layer_id: usize) -> bool; diff --git a/db4-storage/src/lib.rs b/db4-storage/src/lib.rs index ebe5bf708b..10f7b74408 100644 --- a/db4-storage/src/lib.rs +++ b/db4-storage/src/lib.rs @@ -43,6 +43,7 @@ pub mod persist; pub mod properties; pub mod resolver; pub mod segments; +pub mod transaction; pub mod utils; pub mod wal; diff --git a/db4-storage/src/pages/edge_page/writer.rs b/db4-storage/src/pages/edge_page/writer.rs index cde4d7fdb3..c8ad00db36 100644 --- a/db4-storage/src/pages/edge_page/writer.rs +++ b/db4-storage/src/pages/edge_page/writer.rs @@ -3,7 +3,7 @@ use crate::{ segments::edge::segment::MemEdgeSegment, }; use arrow_array::{ArrayRef, BooleanArray}; -use raphtory_api::core::entities::{VID, properties::prop::Prop}; +use raphtory_api::core::entities::{properties::{meta::STATIC_GRAPH_LAYER_ID, prop::Prop}, VID}; use raphtory_core::{ entities::EID, storage::timeindex::{AsTime, TimeIndexEntry}, @@ -45,17 +45,20 @@ impl<'a, MP: DerefMut + std::fmt::Debug, ES: EdgeSegmen dst: VID, props: impl IntoIterator, layer_id: usize, - lsn: u64, ) -> LocalPOS { - let existing_edge = self + let is_new_edge = !self .page .contains_edge(edge_pos, layer_id, self.writer.deref()); - if !existing_edge { + + if is_new_edge { self.increment_layer_num_edges(layer_id); } + self.graph_stats.update_time(t.t()); + self.writer - .insert_edge_internal(t, edge_pos, src, dst, layer_id, props, lsn); + .insert_edge_internal(t, edge_pos, src, dst, layer_id, props); + edge_pos } @@ -91,7 +94,6 @@ impl<'a, MP: DerefMut + std::fmt::Debug, ES: EdgeSegmen src: VID, dst: VID, layer_id: usize, - lsn: u64, ) { let existing_edge = self .page @@ -101,27 +103,30 @@ impl<'a, MP: DerefMut + std::fmt::Debug, ES: EdgeSegmen } self.graph_stats.update_time(t.t()); self.writer - .delete_edge_internal(t, edge_pos, src, dst, layer_id, lsn); + .delete_edge_internal(t, edge_pos, src, dst, layer_id); } + /// Adds a static edge to the graph. + /// + /// If `edge_pos` is `None`, a new position is allocated. If `Some`, the provided position + /// is used. + /// Set `already_counted` to `true` when bulk loading to avoid double-counting statistics. pub fn add_static_edge( &mut self, edge_pos: Option, src: impl Into, dst: impl Into, - lsn: u64, - exist: bool, // used when edge_pos is Some but the is not counted, this is used in the bulk loader + already_counted: bool, ) -> LocalPOS { - let layer_id = 0; // assuming layer_id 0 for static edges, adjust as needed - - if edge_pos.is_some() && !exist { + if edge_pos.is_some() && !already_counted { self.page.increment_num_edges(); - self.increment_layer_num_edges(layer_id); + self.increment_layer_num_edges(STATIC_GRAPH_LAYER_ID); } - let edge_pos = edge_pos.unwrap_or_else(|| self.new_local_pos(layer_id)); + let edge_pos = edge_pos.unwrap_or_else(|| self.new_local_pos(STATIC_GRAPH_LAYER_ID)); self.writer - .insert_static_edge_internal(edge_pos, src, dst, layer_id, lsn); + .insert_static_edge_internal(edge_pos, src, dst, STATIC_GRAPH_LAYER_ID); + edge_pos } @@ -131,26 +136,26 @@ impl<'a, MP: DerefMut + std::fmt::Debug, ES: EdgeSegmen edge_pos: LocalPOS, src: VID, dst: VID, - exists: bool, + edge_exists: bool, layer_id: usize, c_props: impl IntoIterator, t_props: impl IntoIterator, - lsn: u64, ) { - if !exists { - self.increment_layer_num_edges(0); + if !edge_exists { + self.increment_layer_num_edges(STATIC_GRAPH_LAYER_ID); self.increment_layer_num_edges(layer_id); + + self.writer + .insert_static_edge_internal(edge_pos, src, dst, STATIC_GRAPH_LAYER_ID); } - self.writer - .insert_static_edge_internal(edge_pos, src, dst, 0, lsn); + self.graph_stats.update_time(t.t()); self.writer .update_const_properties(edge_pos, src, dst, layer_id, c_props); - self.graph_stats.update_time(t.t()); self.writer - .insert_edge_internal(t, edge_pos, src, dst, layer_id, t_props, lsn); + .insert_edge_internal(t, edge_pos, src, dst, layer_id, t_props); } pub fn segment_id(&self) -> usize { diff --git a/db4-storage/src/pages/edge_store.rs b/db4-storage/src/pages/edge_store.rs index 71ed0d1be3..2f676e5246 100644 --- a/db4-storage/src/pages/edge_store.rs +++ b/db4-storage/src/pages/edge_store.rs @@ -17,7 +17,7 @@ use crate::{ segments::edge::segment::MemEdgeSegment, }; use parking_lot::{RwLock, RwLockWriteGuard}; -use raphtory_api::core::entities::{EID, VID, properties::meta::Meta}; +use raphtory_api::core::entities::{properties::meta::{Meta, STATIC_GRAPH_LAYER_ID}, EID, VID}; use raphtory_core::{ entities::{ELID, LayerIds}, storage::timeindex::{AsTime, TimeIndexEntry}, @@ -117,6 +117,10 @@ impl, EXT: Config> EdgeStorageInner &self.layer_counter } + pub fn segments(&self) -> &boxcar::Vec> { + &self.segments + } + pub fn new_with_meta(edges_path: Option, edge_meta: Arc, ext: EXT) -> Self { let free_pages = (0..N).map(RwLock::new).collect::>(); let empty = Self { @@ -130,21 +134,25 @@ impl, EXT: Config> EdgeStorageInner let layer_mapper = empty.edge_meta().layer_meta(); let prop_mapper = empty.edge_meta().temporal_prop_mapper(); let metadata_mapper = empty.edge_meta().metadata_mapper(); + if layer_mapper.num_fields() > 0 || prop_mapper.num_fields() > 0 || metadata_mapper.num_fields() > 0 { - let segment = empty.get_or_create_segment(0); + let segment = empty.get_or_create_segment(STATIC_GRAPH_LAYER_ID); let mut head = segment.head_mut(); + for layer in layer_mapper.ids() { head.get_or_create_layer(layer); } + if prop_mapper.num_fields() > 0 { head.get_or_create_layer(0) .properties_mut() .set_has_properties() } - segment.mark_dirty(); + + segment.set_dirty(true); } empty } @@ -330,9 +338,11 @@ impl, EXT: Config> EdgeStorageInner if let Some(segment) = self.segments.get(segment_id) { return segment; } + let count = self.segments.count(); + if count > segment_id { - // something has allocated the segment, wait for it to be added + // Something has allocated the segment, wait for it to be added. loop { if let Some(segment) = self.segments.get(segment_id) { return segment; @@ -342,7 +352,7 @@ impl, EXT: Config> EdgeStorageInner } } } else { - // we need to create the segment + // We need to create the segment. self.segments.reserve(segment_id + 1 - count); loop { @@ -360,7 +370,7 @@ impl, EXT: Config> EdgeStorageInner if let Some(segment) = self.segments.get(segment_id) { return segment; } else { - // wait for the segment to be created + // Wait for the segment to be created. std::thread::yield_now(); } } diff --git a/db4-storage/src/pages/graph_prop_page/writer.rs b/db4-storage/src/pages/graph_prop_page/writer.rs index 50485d47c8..612a1be9cc 100644 --- a/db4-storage/src/pages/graph_prop_page/writer.rs +++ b/db4-storage/src/pages/graph_prop_page/writer.rs @@ -31,10 +31,9 @@ impl<'a, GS: GraphPropSegmentOps> GraphPropWriter<'a, GS> { lsn: u64, ) { let add = self.mem_segment.add_properties(t, props); - self.mem_segment.layers_mut()[MemGraphPropSegment::DEFAULT_LAYER].set_lsn(lsn); self.graph_props.increment_est_size(add); - self.graph_props.mark_dirty(); + self.graph_props.set_dirty(true); } pub fn check_metadata(&self, props: &[(usize, Prop)]) -> Result<(), StorageError> { @@ -43,10 +42,9 @@ impl<'a, GS: GraphPropSegmentOps> GraphPropWriter<'a, GS> { pub fn update_metadata(&mut self, props: impl IntoIterator, lsn: u64) { let add = self.mem_segment.update_metadata(props); - self.mem_segment.layers_mut()[MemGraphPropSegment::DEFAULT_LAYER].set_lsn(lsn); self.graph_props.increment_est_size(add); - self.graph_props.mark_dirty(); + self.graph_props.set_dirty(true); } } diff --git a/db4-storage/src/pages/graph_prop_store.rs b/db4-storage/src/pages/graph_prop_store.rs index 6e958182c3..0f1aaff698 100644 --- a/db4-storage/src/pages/graph_prop_store.rs +++ b/db4-storage/src/pages/graph_prop_store.rs @@ -66,6 +66,10 @@ impl, EXT: Config> GraphPropStorageInne self.page.entry() } + pub fn segment(&self) -> &Arc { + &self.page + } + pub fn writer(&self) -> GraphPropWriter<'_, GS> { let head = self.page.head_mut(); let graph_props = &self.page; diff --git a/db4-storage/src/pages/locked/edges.rs b/db4-storage/src/pages/locked/edges.rs index a07f03147b..ff01546c1d 100644 --- a/db4-storage/src/pages/locked/edges.rs +++ b/db4-storage/src/pages/locked/edges.rs @@ -79,6 +79,11 @@ impl<'a, ES: EdgeSegmentOps> WriteLockedEdgePages<'a, ES> { Self { writers } } + #[inline] + pub fn get_mut(&mut self, segment_id: usize) -> Option<&mut LockedEdgePage<'a, ES>> { + self.writers.get_mut(segment_id) + } + pub fn par_iter_mut(&mut self) -> rayon::slice::IterMut<'_, LockedEdgePage<'a, ES>> { self.writers.par_iter_mut() } diff --git a/db4-storage/src/pages/locked/graph_props.rs b/db4-storage/src/pages/locked/graph_props.rs index 5ef775dfdb..87d41dc222 100644 --- a/db4-storage/src/pages/locked/graph_props.rs +++ b/db4-storage/src/pages/locked/graph_props.rs @@ -27,10 +27,9 @@ impl<'a, GS: GraphPropSegmentOps> LockedGraphPropPage<'a, GS> { lsn: u64, ) { let add = self.lock.add_properties(t, props); - self.lock.layers_mut()[MemGraphPropSegment::DEFAULT_LAYER].set_lsn(lsn); self.page.increment_est_size(add); - self.page.mark_dirty(); + self.page.set_dirty(true); } /// Add metadata (constant properties) to the graph @@ -41,10 +40,9 @@ impl<'a, GS: GraphPropSegmentOps> LockedGraphPropPage<'a, GS> { /// Update metadata (constant properties) on the graph pub fn update_metadata(&mut self, props: impl IntoIterator, lsn: u64) { let add = self.lock.update_metadata(props); - self.lock.layers_mut()[MemGraphPropSegment::DEFAULT_LAYER].set_lsn(lsn); self.page.increment_est_size(add); - self.page.mark_dirty(); + self.page.set_dirty(true); } } diff --git a/db4-storage/src/pages/locked/nodes.rs b/db4-storage/src/pages/locked/nodes.rs index 48b4fd7f10..78aed9dbd5 100644 --- a/db4-storage/src/pages/locked/nodes.rs +++ b/db4-storage/src/pages/locked/nodes.rs @@ -11,7 +11,7 @@ use rayon::prelude::*; use std::ops::DerefMut; pub struct LockedNodePage<'a, NS> { - page_id: usize, + segment_id: usize, max_page_len: u32, layer_counter: &'a GraphStats, page: &'a NS, @@ -20,14 +20,14 @@ pub struct LockedNodePage<'a, NS> { impl<'a, NS: NodeSegmentOps> LockedNodePage<'a, NS> { pub fn new( - page_id: usize, + segment_id: usize, layer_counter: &'a GraphStats, max_page_len: u32, page: &'a NS, lock: RwLockWriteGuard<'a, MemNodeSegment>, ) -> Self { Self { - page_id, + segment_id, layer_counter, max_page_len, page, @@ -49,14 +49,15 @@ impl<'a, NS: NodeSegmentOps> LockedNodePage<'a, NS> { } #[inline(always)] - pub fn page_id(&self) -> usize { - self.page_id + pub fn segment_id(&self) -> usize { + self.segment_id } #[inline(always)] pub fn resolve_pos(&self, node_id: VID) -> Option { let (page, pos) = resolve_pos(node_id, self.max_page_len); - if page == self.page_id { + + if page == self.segment_id { Some(pos) } else { None @@ -86,6 +87,15 @@ impl<'a, NS: NodeSegmentOps> WriteLockedNodePages<'a, NS> { Self { writers } } + pub fn len(&self) -> usize { + self.writers.len() + } + + #[inline] + pub fn get_mut(&mut self, segment_id: usize) -> Option<&mut LockedNodePage<'a, NS>> { + self.writers.get_mut(segment_id) + } + pub fn par_iter_mut(&mut self) -> rayon::slice::IterMut<'_, LockedNodePage<'a, NS>> { self.writers.par_iter_mut() } @@ -104,10 +114,6 @@ impl<'a, NS: NodeSegmentOps> WriteLockedNodePages<'a, NS> { } } - pub fn len(&self) -> usize { - self.writers.len() - } - pub fn vacuum(&mut self) -> Result<(), StorageError> { for LockedNodePage { page, lock, .. } in &mut self.writers { page.vacuum(lock.deref_mut())?; diff --git a/db4-storage/src/pages/mod.rs b/db4-storage/src/pages/mod.rs index 6be52d59e5..d0b6da23e1 100644 --- a/db4-storage/src/pages/mod.rs +++ b/db4-storage/src/pages/mod.rs @@ -9,8 +9,8 @@ use crate::{ }; use edge_page::writer::EdgeWriter; use edge_store::EdgeStorageInner; +use node_page::writer::{NodeWriter, NodeWriters}; use graph_prop_store::GraphPropStorageInner; -use node_page::writer::{NodeWriter, WriterPair}; use node_store::NodeStorageInner; use parking_lot::RwLockWriteGuard; use raphtory_api::core::{ @@ -247,10 +247,11 @@ impl< let src = src.into(); let dst = dst.into(); let mut session = self.write_session(src, dst, None); + session.set_lsn(lsn); let elid = session - .add_static_edge(src, dst, lsn) + .add_static_edge(src, dst) .map(|eid| eid.with_layer(0)); - session.add_edge_into_layer(t, src, dst, elid, lsn, props); + session.add_edge_into_layer(t, src, dst, elid, props); Ok(elid) } @@ -318,7 +319,7 @@ impl< let (segment, node_pos) = self.nodes.resolve_pos(node); let mut node_writer = self.nodes.writer(segment); let prop_writer = PropsMetaWriter::constant(self.node_meta(), props.into_iter())?; - node_writer.update_c_props(node_pos, layer_id, prop_writer.into_props_const()?, 0); // TODO: LSN + node_writer.update_c_props(node_pos, layer_id, prop_writer.into_props_const()?); Ok(()) } @@ -336,7 +337,7 @@ impl< let mut node_writer = self.nodes.writer(segment); let prop_writer = PropsMetaWriter::temporal(self.node_meta(), props.into_iter())?; - node_writer.add_props(t, node_pos, layer_id, prop_writer.into_props_temporal()?, 0); // TODO: LSN + node_writer.add_props(t, node_pos, layer_id, prop_writer.into_props_temporal()?); Ok(()) } @@ -349,26 +350,30 @@ impl< let (src_chunk, _) = self.nodes.resolve_pos(src); let (dst_chunk, _) = self.nodes.resolve_pos(dst); + // Acquire locks in consistent order (lower chunk ID first) to prevent deadlocks. let node_writers = if src_chunk < dst_chunk { - let src_writer = self.node_writer(src_chunk); - let dst_writer = self.node_writer(dst_chunk); - WriterPair::Different { - src_writer, - dst_writer, - } + let src = self.node_writer(src_chunk); + let dst = self.node_writer(dst_chunk); + + NodeWriters { src, dst: Some(dst) } } else if src_chunk > dst_chunk { - let dst_writer = self.node_writer(dst_chunk); - let src_writer = self.node_writer(src_chunk); - WriterPair::Different { - src_writer, - dst_writer, - } + let dst = self.node_writer(dst_chunk); + let src = self.node_writer(src_chunk); + + NodeWriters { src, dst: Some(dst) } } else { - let writer = self.node_writer(src_chunk); - WriterPair::Same { writer } + let src = self.node_writer(src_chunk); + + NodeWriters { src, dst: None } }; - let edge_writer = e_id.map(|e_id| self.edge_writer(e_id)); + let (_, src_pos) = self.nodes.resolve_pos(src); + let existing_eid = node_writers.src.get_out_edge(src_pos, dst, 0); + + let edge_writer = match e_id.or(existing_eid) { + Some(e_id) => self.edge_writer(e_id), + None => self.get_free_writer(), + }; WriteSession::new(node_writers, edge_writer, self) } @@ -386,22 +391,29 @@ impl< self.nodes().get_or_create_segment(src_chunk); self.nodes().get_or_create_segment(dst_chunk); + // FIXME: This can livelock due to inconsistent lock acquisition order. loop { if let Some(src_writer) = self.nodes().try_writer(src_chunk) { if let Some(dst_writer) = self.nodes().try_writer(dst_chunk) { - break WriterPair::Different { - src_writer, - dst_writer, + break NodeWriters { + src: src_writer, + dst: Some(dst_writer), }; } } } } else { let writer = self.node_writer(src_chunk); - WriterPair::Same { writer } + NodeWriters { src: writer, dst: None } }; - let edge_writer = e_id.map(|e_id| self.edge_writer(e_id)); + let (_, src_pos) = self.nodes.resolve_pos(src); + let existing_eid = node_writers.src.get_out_edge(src_pos, dst, 0); + + let edge_writer = match e_id.or(existing_eid) { + Some(e_id) => self.edge_writer(e_id), + None => self.get_free_writer(), + }; WriteSession::new(node_writers, edge_writer, self) } diff --git a/db4-storage/src/pages/node_page/writer.rs b/db4-storage/src/pages/node_page/writer.rs index 882c97d05e..b54c635569 100644 --- a/db4-storage/src/pages/node_page/writer.rs +++ b/db4-storage/src/pages/node_page/writer.rs @@ -37,9 +37,8 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri src_pos: impl Into, dst: impl Into, e_id: impl Into, - lsn: u64, ) { - self.add_outbound_edge_inner(t, src_pos, dst, e_id, lsn); + self.add_outbound_edge_inner(t, src_pos, dst, e_id); } pub fn add_static_outbound_edge( @@ -47,10 +46,9 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri src_pos: LocalPOS, dst: impl Into, e_id: impl Into, - lsn: u64, ) { let e_id = e_id.into(); - self.add_outbound_edge_inner::(None, src_pos, dst, e_id.with_layer(0), lsn); + self.add_outbound_edge_inner::(None, src_pos, dst, e_id.with_layer(0)); } fn add_outbound_edge_inner( @@ -59,7 +57,6 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri src_pos: impl Into, dst: impl Into, e_id: impl Into, - lsn: u64, ) { let src_pos = src_pos.into(); let dst = dst.into(); @@ -71,7 +68,7 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri let layer_id = e_id.layer(); let (is_new_node, add) = self .mut_segment - .add_outbound_edge(t, src_pos, dst, e_id, lsn); + .add_outbound_edge(t, src_pos, dst, e_id); self.page.increment_est_size(add); if is_new_node && !self.page.check_node(src_pos, layer_id) { @@ -85,9 +82,8 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri dst_pos: impl Into, src: impl Into, e_id: impl Into, - lsn: u64, ) { - self.add_inbound_edge_inner(t, dst_pos, src, e_id, lsn); + self.add_inbound_edge_inner(t, dst_pos, src, e_id); } pub fn add_static_inbound_edge( @@ -95,10 +91,9 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri dst_pos: LocalPOS, src: impl Into, e_id: impl Into, - lsn: u64, ) { let e_id = e_id.into(); - self.add_inbound_edge_inner::(None, dst_pos, src, e_id.with_layer(0), lsn); + self.add_inbound_edge_inner::(None, dst_pos, src, e_id.with_layer(0)); } fn add_inbound_edge_inner( @@ -107,7 +102,6 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri dst_pos: impl Into, src: impl Into, e_id: impl Into, - lsn: u64, ) { let e_id = e_id.into(); let src = src.into(); @@ -118,7 +112,7 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri let dst_pos = dst_pos.into(); let (is_new_node, add) = self .mut_segment - .add_inbound_edge(t, dst_pos, src, e_id, lsn); + .add_inbound_edge(t, dst_pos, src, e_id); self.page.increment_est_size(add); @@ -133,11 +127,9 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri pos: LocalPOS, layer_id: usize, props: impl IntoIterator, - lsn: u64, ) { self.l_counter.update_time(t.t()); let (is_new_node, add) = self.mut_segment.add_props(t, pos, layer_id, props); - self.mut_segment.as_mut()[layer_id].set_lsn(lsn); self.page.increment_est_size(add); if is_new_node && !self.page.check_node(pos, layer_id) { self.l_counter.increment(layer_id); @@ -158,10 +150,8 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri pos: LocalPOS, layer_id: usize, props: impl IntoIterator, - lsn: u64, ) { let (is_new_node, add) = self.mut_segment.update_metadata(pos, layer_id, props); - self.mut_segment.as_mut()[layer_id].set_lsn(lsn); self.page.increment_est_size(add); if is_new_node && !self.page.check_node(pos, layer_id) { self.l_counter.increment(layer_id); @@ -172,9 +162,9 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri self.mut_segment.get_metadata(pos, layer_id, prop_id) } - pub fn update_timestamp(&mut self, t: T, pos: LocalPOS, e_id: ELID, lsn: u64) { + pub fn update_timestamp(&mut self, t: T, pos: LocalPOS, e_id: ELID) { self.l_counter.update_time(t.t()); - let add = self.mut_segment.update_timestamp(t, pos, e_id, lsn); + let add = self.mut_segment.update_timestamp(t, pos, e_id); self.page.increment_est_size(add); } @@ -194,18 +184,17 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> NodeWri layer_id: usize, gid: GidRef<'_>, node_type: usize, - lsn: u64, ) { let node_type = (node_type != 0).then_some(node_type); - self.update_c_props(pos, layer_id, node_info_as_props(Some(gid), node_type), lsn); + self.update_c_props(pos, layer_id, node_info_as_props(Some(gid), node_type)); } - pub fn store_node_id(&mut self, pos: LocalPOS, layer_id: usize, gid: GidRef<'_>, lsn: u64) { - self.update_c_props(pos, layer_id, node_info_as_props(Some(gid), None), lsn); + pub fn store_node_id(&mut self, pos: LocalPOS, layer_id: usize, gid: GidRef<'_>) { + self.update_c_props(pos, layer_id, node_info_as_props(Some(gid), None)); } - pub fn update_deletion_time(&mut self, t: T, node: LocalPOS, e_id: ELID, lsn: u64) { - self.update_timestamp(t, node, e_id, lsn); + pub fn update_deletion_time(&mut self, t: T, node: LocalPOS, e_id: ELID) { + self.update_timestamp(t, node, e_id); } } @@ -231,34 +220,20 @@ impl<'a, MP: DerefMut + 'a, NS: NodeSegmentOps> Drop } } -pub enum WriterPair<'a, MP: DerefMut, NS: NodeSegmentOps> { - Same { - writer: NodeWriter<'a, MP, NS>, - }, - Different { - src_writer: NodeWriter<'a, MP, NS>, - dst_writer: NodeWriter<'a, MP, NS>, - }, + +/// Holds writers for src and dst node segments when adding an edge. +/// If both nodes are in the same segment, `dst` is `None` and `src` is used for both. +pub struct NodeWriters<'a, MP: DerefMut, NS: NodeSegmentOps> { + pub src: NodeWriter<'a, MP, NS>, + pub dst: Option>, } -impl<'a, MP: DerefMut, NS: NodeSegmentOps> WriterPair<'a, MP, NS> { +impl<'a, MP: DerefMut, NS: NodeSegmentOps> NodeWriters<'a, MP, NS> { pub fn get_mut_src(&mut self) -> &mut NodeWriter<'a, MP, NS> { - match self { - WriterPair::Same { writer, .. } => writer, - WriterPair::Different { - src_writer: writer_i, - .. - } => writer_i, - } + &mut self.src } pub fn get_mut_dst(&mut self) -> &mut NodeWriter<'a, MP, NS> { - match self { - WriterPair::Same { writer, .. } => writer, - WriterPair::Different { - dst_writer: writer_j, - .. - } => writer_j, - } + self.dst.as_mut().unwrap_or(&mut self.src) } } diff --git a/db4-storage/src/pages/node_store.rs b/db4-storage/src/pages/node_store.rs index 113112a77a..b94924f1a0 100644 --- a/db4-storage/src/pages/node_store.rs +++ b/db4-storage/src/pages/node_store.rs @@ -159,7 +159,7 @@ impl, EXT: Config> NodeStorageInner .properties_mut() .set_has_properties() } - segment.mark_dirty(); + segment.set_dirty(true); } empty } @@ -354,19 +354,21 @@ impl, EXT: Config> NodeStorageInner if let Some(segment) = self.pages.get(segment_id) { return segment; } + let count = self.pages.count(); + if count > segment_id { - // something has allocated the segment, wait for it to be added + // Something has allocated the segment, wait for it to be added. loop { if let Some(segment) = self.pages.get(segment_id) { return segment; } else { - // wait for the segment to be created + // Wait for the segment to be created. std::thread::yield_now(); } } } else { - // we need to create the segment + // We need to create the segment. self.pages.reserve(segment_id + 1 - count); loop { @@ -385,7 +387,7 @@ impl, EXT: Config> NodeStorageInner if let Some(segment) = self.pages.get(segment_id) { return segment; } else { - // wait for the segment to be created + // Wait for the segment to be created. std::thread::yield_now(); } } diff --git a/db4-storage/src/pages/session.rs b/db4-storage/src/pages/session.rs index 36999acd38..a83bb9f899 100644 --- a/db4-storage/src/pages/session.rs +++ b/db4-storage/src/pages/session.rs @@ -1,14 +1,15 @@ use super::{ - GraphStore, edge_page::writer::EdgeWriter, node_page::writer::WriterPair, resolve_pos, + GraphStore, edge_page::writer::EdgeWriter, node_page::writer::NodeWriters, resolve_pos, }; use crate::{ LocalPOS, api::{edges::EdgeSegmentOps, graph_props::GraphPropSegmentOps, nodes::NodeSegmentOps}, persist::strategy::{Config, PersistentStrategy}, segments::{edge::segment::MemEdgeSegment, node::segment::MemNodeSegment}, + wal::LSN, }; use parking_lot::RwLockWriteGuard; -use raphtory_api::core::{entities::properties::prop::Prop, storage::dict_mapper::MaybeNew}; +use raphtory_api::core::{entities::properties::{meta::STATIC_GRAPH_LAYER_ID, prop::Prop}, storage::dict_mapper::MaybeNew}; use raphtory_core::{ entities::{EID, ELID, VID}, storage::timeindex::AsTime, @@ -21,8 +22,8 @@ pub struct WriteSession< GS: GraphPropSegmentOps, EXT: Config, > { - node_writers: WriterPair<'a, RwLockWriteGuard<'a, MemNodeSegment>, NS>, - edge_writer: Option, ES>>, + node_writers: NodeWriters<'a, RwLockWriteGuard<'a, MemNodeSegment>, NS>, + edge_writer: EdgeWriter<'a, RwLockWriteGuard<'a, MemEdgeSegment>, ES>, graph: &'a GraphStore, } @@ -35,8 +36,8 @@ impl< > WriteSession<'a, NS, ES, GS, EXT> { pub fn new( - node_writers: WriterPair<'a, RwLockWriteGuard<'a, MemNodeSegment>, NS>, - edge_writer: Option, ES>>, + node_writers: NodeWriters<'a, RwLockWriteGuard<'a, MemNodeSegment>, NS>, + edge_writer: EdgeWriter<'a, RwLockWriteGuard<'a, MemEdgeSegment>, ES>, graph: &'a GraphStore, ) -> Self { Self { @@ -56,7 +57,6 @@ impl< src: impl Into, dst: impl Into, edge: MaybeNew, - lsn: u64, props: impl IntoIterator, ) { let src = src.into(); @@ -69,19 +69,15 @@ impl< let (_, src_pos) = self.graph.nodes().resolve_pos(src); let (_, dst_pos) = self.graph.nodes().resolve_pos(dst); - if let Some(writer) = self.edge_writer.as_mut() { - let edge_max_page_len = writer.writer.get_or_create_layer(layer).max_page_len(); - let (_, edge_pos) = resolve_pos(e_id.edge, edge_max_page_len); + let edge_max_page_len = self + .edge_writer + .writer + .get_or_create_layer(layer) + .max_page_len(); + let (_, edge_pos) = resolve_pos(e_id.edge, edge_max_page_len); - writer.add_edge(t, edge_pos, src, dst, props, layer, lsn); - } else { - let mut writer = self.graph.edge_writer(e_id.edge); - let edge_max_page_len = writer.writer.get_or_create_layer(layer).max_page_len(); - let (_, edge_pos) = resolve_pos(e_id.edge, edge_max_page_len); - - writer.add_edge(t, edge_pos, src, dst, props, layer, lsn); - self.edge_writer = Some(writer); // Attach edge_writer to hold onto locks - } + self.edge_writer + .add_edge(t, edge_pos, src, dst, props, layer); let edge_id = edge.inner(); @@ -94,18 +90,18 @@ impl< { self.node_writers .get_mut_src() - .add_outbound_edge(Some(t), src_pos, dst, edge_id, lsn); + .add_outbound_edge(Some(t), src_pos, dst, edge_id); self.node_writers .get_mut_dst() - .add_inbound_edge(Some(t), dst_pos, src, edge_id, lsn); + .add_inbound_edge(Some(t), dst_pos, src, edge_id); } self.node_writers .get_mut_src() - .update_timestamp(t, src_pos, e_id, lsn); + .update_timestamp(t, src_pos, e_id); self.node_writers .get_mut_dst() - .update_timestamp(t, dst_pos, e_id, lsn); + .update_timestamp(t, dst_pos, e_id); } pub fn delete_edge_from_layer( @@ -114,7 +110,6 @@ impl< src: impl Into, dst: impl Into, edge: MaybeNew, - lsn: u64, ) { let src = src.into(); let dst = dst.into(); @@ -126,19 +121,15 @@ impl< let (_, src_pos) = self.graph.nodes().resolve_pos(src); let (_, dst_pos) = self.graph.nodes().resolve_pos(dst); - if let Some(writer) = self.edge_writer.as_mut() { - let edge_max_page_len = writer.writer.get_or_create_layer(layer).max_page_len(); - let (_, edge_pos) = resolve_pos(e_id.edge, edge_max_page_len); - - writer.delete_edge(t, edge_pos, src, dst, layer, lsn); - } else { - let mut writer = self.graph.edge_writer(e_id.edge); - let edge_max_page_len = writer.writer.get_or_create_layer(layer).max_page_len(); - let (_, edge_pos) = resolve_pos(e_id.edge, edge_max_page_len); + let edge_max_page_len = self + .edge_writer + .writer + .get_or_create_layer(layer) + .max_page_len(); + let (_, edge_pos) = resolve_pos(e_id.edge, edge_max_page_len); - writer.delete_edge(t, edge_pos, src, dst, layer, lsn); - self.edge_writer = Some(writer); // Attach edge_writer to hold onto locks - } + self.edge_writer + .delete_edge(t, edge_pos, src, dst, layer); let edge_id = edge.inner(); @@ -155,23 +146,21 @@ impl< src_pos, dst, edge_id, - lsn, ); self.node_writers.get_mut_dst().add_inbound_edge( Some(t), dst_pos, src, edge_id, - lsn, ); } self.node_writers .get_mut_src() - .update_deletion_time(t, src_pos, e_id, lsn); + .update_deletion_time(t, src_pos, e_id); self.node_writers .get_mut_dst() - .update_deletion_time(t, dst_pos, e_id, lsn); + .update_deletion_time(t, dst_pos, e_id); } } @@ -179,52 +168,53 @@ impl< &mut self, src: impl Into, dst: impl Into, - lsn: u64, ) -> MaybeNew { let src = src.into(); let dst = dst.into(); - let layer_id = 0; // static graph goes to layer 0 let (_, src_pos) = self.graph.nodes().resolve_pos(src); let (_, dst_pos) = self.graph.nodes().resolve_pos(dst); - if let Some(e_id) = self + let existing_eid = self .node_writers .get_mut_src() - .get_out_edge(src_pos, dst, layer_id) - { - // If edge_writer is not set, we need to create a new one - if self.edge_writer.is_none() { - self.edge_writer = Some(self.graph.edge_writer(e_id)); - } - let edge_writer = self.edge_writer.as_mut().unwrap(); - let (_, edge_pos) = self.graph.edges().resolve_pos(e_id); + .get_out_edge(src_pos, dst, STATIC_GRAPH_LAYER_ID); - edge_writer.add_static_edge(Some(edge_pos), src, dst, lsn, true); - - MaybeNew::Existing(e_id) - } else { - let mut edge_writer = self.graph.get_free_writer(); - let edge_id = edge_writer.add_static_edge(None, src, dst, lsn, false); - let edge_id = - edge_id.as_eid(edge_writer.segment_id(), self.graph.edges().max_page_len()); + // Edge already exists, so no need to add it again. + if let Some(eid) = existing_eid { + return MaybeNew::Existing(eid) + } - self.edge_writer = Some(edge_writer); // Attach edge_writer to hold onto locks + let edge_pos = None; + let already_counted = false; + let edge_pos = + self.edge_writer.add_static_edge(edge_pos, src, dst, already_counted); + let edge_id = + edge_pos.as_eid(self.edge_writer.segment_id(), self.graph.edges().max_page_len()); - self.node_writers - .get_mut_src() - .add_static_outbound_edge(src_pos, dst, edge_id, lsn); - self.node_writers - .get_mut_dst() - .add_static_inbound_edge(dst_pos, src, edge_id, lsn); + self.node_writers + .get_mut_src() + .add_static_outbound_edge(src_pos, dst, edge_id); + self.node_writers + .get_mut_dst() + .add_static_inbound_edge(dst_pos, src, edge_id); - MaybeNew::New(edge_id) - } + MaybeNew::New(edge_id) } pub fn node_writers( &mut self, - ) -> &mut WriterPair<'a, RwLockWriteGuard<'a, MemNodeSegment>, NS> { + ) -> &mut NodeWriters<'a, RwLockWriteGuard<'a, MemNodeSegment>, NS> { &mut self.node_writers } + + pub fn set_lsn(&mut self, lsn: LSN) { + self.node_writers.src.mut_segment.set_lsn(lsn); + + if let Some(dst) = &mut self.node_writers.dst { + dst.mut_segment.set_lsn(lsn); + } + + self.edge_writer.writer.set_lsn(lsn); + } } diff --git a/db4-storage/src/pages/test_utils/checkers.rs b/db4-storage/src/pages/test_utils/checkers.rs index 44adc815b9..b1d7568b19 100644 --- a/db4-storage/src/pages/test_utils/checkers.rs +++ b/db4-storage/src/pages/test_utils/checkers.rs @@ -58,9 +58,10 @@ pub fn make_graph_from_edges< let layer_id = layer_id.unwrap_or(0); let mut session = graph.write_session(*src, *dst, None); - let eid = session.add_static_edge(*src, *dst, lsn); + session.set_lsn(lsn); + let eid = session.add_static_edge(*src, *dst); let elid = eid.map(|eid| eid.with_layer(layer_id)); - session.add_edge_into_layer(timestamp, *src, *dst, elid, lsn, []); + session.add_edge_into_layer(timestamp, *src, *dst, elid, []); Ok::<_, StorageError>(()) }) @@ -75,9 +76,10 @@ pub fn make_graph_from_edges< let layer_id = layer_id.unwrap_or(0); let mut session = graph.write_session(*src, *dst, None); - let eid = session.add_static_edge(*src, *dst, lsn); + session.set_lsn(lsn); + let eid = session.add_static_edge(*src, *dst); let elid = eid.map(|e| e.with_layer(layer_id)); - session.add_edge_into_layer(timestamp, *src, *dst, elid, lsn, []); + session.add_edge_into_layer(timestamp, *src, *dst, elid, []); Ok::<_, StorageError>(()) }) diff --git a/db4-storage/src/segments/edge/segment.rs b/db4-storage/src/segments/edge/segment.rs index 66c072ef30..f7e5a72923 100644 --- a/db4-storage/src/segments/edge/segment.rs +++ b/db4-storage/src/segments/edge/segment.rs @@ -10,6 +10,7 @@ use crate::{ edge::entry::{MemEdgeEntry, MemEdgeRef}, }, utils::Iter4, + wal::LSN, }; use arrow_array::{ArrayRef, BooleanArray}; use parking_lot::lock_api::ArcRwLockReadGuard; @@ -53,6 +54,7 @@ impl HasRow for EdgeEntry { pub struct MemEdgeSegment { layers: Vec>, est_size: usize, + lsn: LSN, } impl>> From for MemEdgeSegment { @@ -63,7 +65,11 @@ impl>> From for MemEdgeSeg !layers.is_empty(), "MemEdgeSegment must have at least one layer" ); - Self { layers, est_size } + Self { + layers, + est_size, + lsn: 0, + } } } @@ -84,6 +90,7 @@ impl MemEdgeSegment { Self { layers: vec![SegmentContainer::new(segment_id, max_page_len, meta)], est_size: 0, + lsn: 0, } } @@ -130,7 +137,11 @@ impl MemEdgeSegment { } pub fn lsn(&self) -> u64 { - self.layers.iter().map(|seg| seg.lsn()).min().unwrap_or(0) + self.lsn + } + + pub fn set_lsn(&mut self, lsn: u64) { + self.lsn = lsn; } pub fn max_page_len(&self) -> u32 { @@ -207,20 +218,20 @@ impl MemEdgeSegment { dst: VID, layer_id: usize, props: impl IntoIterator, - lsn: u64, ) { // Ensure we have enough layers self.ensure_layer(layer_id); let est_size = self.layers[layer_id].est_size(); - self.layers[layer_id].set_lsn(lsn); let local_row = self.reserve_local_row(edge_pos, src, dst, layer_id); let mut prop_entry: PropMutEntry<'_> = self.layers[layer_id] .properties_mut() .get_mut_entry(local_row); + let ts = TimeIndexEntry::new(t.t(), t.i()); prop_entry.append_t_props(ts, props); + let layer_est_size = self.layers[layer_id].est_size(); self.est_size += layer_est_size.saturating_sub(est_size); } @@ -232,14 +243,12 @@ impl MemEdgeSegment { src: VID, dst: VID, layer_id: usize, - lsn: u64, ) { let t = TimeIndexEntry::new(t.t(), t.i()); // Ensure we have enough layers self.ensure_layer(layer_id); let est_size = self.layers[layer_id].est_size(); - self.layers[layer_id].set_lsn(lsn); let local_row = self.reserve_local_row(edge_pos, src, dst, layer_id); let props = self.layers[layer_id].properties_mut(); @@ -254,14 +263,12 @@ impl MemEdgeSegment { src: impl Into, dst: impl Into, layer_id: usize, - lsn: u64, ) { let src = src.into(); let dst = dst.into(); // Ensure we have enough layers self.ensure_layer(layer_id); - self.layers[layer_id].set_lsn(lsn); let est_size = self.layers[layer_id].est_size(); self.reserve_local_row(edge_pos, src, dst, layer_id); @@ -271,7 +278,7 @@ impl MemEdgeSegment { fn ensure_layer(&mut self, layer_id: usize) { if layer_id >= self.layers.len() { - // Get details from first layer to create consistent new layers + // Get details from first layer to create consistent new layers. if let Some(first_layer) = self.layers.first() { let segment_id = first_layer.segment_id(); let max_page_len = first_layer.max_page_len(); @@ -580,7 +587,7 @@ impl>> EdgeSegmentOps for EdgeSegm .map_or(0, |layer| layer.len()) } - fn mark_dirty(&self) {} + fn set_dirty(&self, _dirty: bool) {} } #[cfg(test)] @@ -607,7 +614,6 @@ mod test { VID(2), 0, vec![(0, Prop::from("test1"))], - 1, ); segment.insert_edge_internal( @@ -617,7 +623,6 @@ mod test { VID(4), 0, vec![(0, Prop::from("test2"))], - 2, ); segment.insert_edge_internal( @@ -627,7 +632,6 @@ mod test { VID(6), 0, vec![(0, Prop::from("test3"))], - 3, ); // Verify edges exist @@ -757,7 +761,6 @@ mod test { VID(2), 0, vec![(0, Prop::from("test1"))], - 1, ); segment1.insert_edge_internal( TimeIndexEntry::new(2, 1), @@ -766,7 +769,6 @@ mod test { VID(4), 0, vec![(0, Prop::from("test2"))], - 1, ); segment1.insert_edge_internal( TimeIndexEntry::new(3, 2), @@ -775,7 +777,6 @@ mod test { VID(6), 0, vec![(0, Prop::from("test3"))], - 1, ); // Equivalent bulk insertion @@ -825,7 +826,6 @@ mod test { VID(2), 0, vec![(0, Prop::from("individual1"))], - 1, ); // Bulk insert some edges @@ -857,7 +857,6 @@ mod test { VID(8), 0, vec![(0, Prop::from("individual2"))], - 1, ); // Another bulk insert @@ -977,14 +976,13 @@ mod test { VID(2), 0, vec![(0, Prop::from("test"))], - 1, ); let est_size1 = segment.est_size(); assert!(est_size1 > 0); - segment.delete_edge_internal(TimeIndexEntry::new(2, 3), LocalPOS(0), VID(5), VID(3), 0, 0); + segment.delete_edge_internal(TimeIndexEntry::new(2, 3), LocalPOS(0), VID(5), VID(3), 0); let est_size2 = segment.est_size(); @@ -1001,7 +999,6 @@ mod test { VID(6), 0, vec![(0, Prop::from("test2"))], - 1, ); let est_size3 = segment.est_size(); @@ -1012,7 +1009,7 @@ mod test { // Insert a static edge - segment.insert_static_edge_internal(LocalPOS(1), 4, 6, 0, 1); + segment.insert_static_edge_internal(LocalPOS(1), 4, 6, 0); let est_size4 = segment.est_size(); assert_eq!( diff --git a/db4-storage/src/segments/graph_prop/mod.rs b/db4-storage/src/segments/graph_prop/mod.rs index 7d20c0624d..d6f98c9038 100644 --- a/db4-storage/src/segments/graph_prop/mod.rs +++ b/db4-storage/src/segments/graph_prop/mod.rs @@ -79,8 +79,8 @@ impl GraphPropSegmentOps for GraphPropSegmentView

{ self.est_size.load(Ordering::Relaxed) } - fn mark_dirty(&self) { - self.is_dirty.store(true, Ordering::Relaxed); + fn set_dirty(&self, dirty: bool) { + self.is_dirty.store(dirty, Ordering::Release); } fn notify_write( diff --git a/db4-storage/src/segments/graph_prop/segment.rs b/db4-storage/src/segments/graph_prop/segment.rs index 4e310cad03..2636ce8d9a 100644 --- a/db4-storage/src/segments/graph_prop/segment.rs +++ b/db4-storage/src/segments/graph_prop/segment.rs @@ -1,7 +1,5 @@ use crate::{ - LocalPOS, - error::StorageError, - segments::{HasRow, SegmentContainer}, + error::StorageError, segments::{HasRow, SegmentContainer}, wal::LSN, LocalPOS }; use raphtory_api::core::entities::properties::{meta::Meta, prop::Prop}; use raphtory_core::{ @@ -15,6 +13,7 @@ use std::sync::Arc; pub struct MemGraphPropSegment { /// Layers containing graph properties and metadata. layers: Vec>, + lsn: LSN, } /// A unit-like struct for use with `SegmentContainer`. @@ -49,13 +48,10 @@ impl MemGraphPropSegment { Self { layers: vec![SegmentContainer::new(segment_id, max_page_len, meta)], + lsn: 0, } } - pub fn lsn(&self) -> u64 { - self.layers.iter().map(|seg| seg.lsn()).min().unwrap_or(0) - } - pub fn get_or_create_layer(&mut self, layer_id: usize) -> &mut SegmentContainer { if layer_id >= self.layers.len() { let max_page_len = self.layers[0].max_page_len(); @@ -87,7 +83,15 @@ impl MemGraphPropSegment { pub fn take(&mut self) -> Self { let layers = self.layers.iter_mut().map(|layer| layer.take()).collect(); - Self { layers } + Self { layers, lsn: self.lsn } + } + + pub fn lsn(&self) -> LSN { + self.lsn + } + + pub fn set_lsn(&mut self, lsn: LSN) { + self.lsn = lsn; } pub fn add_properties( diff --git a/db4-storage/src/segments/mod.rs b/db4-storage/src/segments/mod.rs index e0b39c7fc5..c8e660b4e4 100644 --- a/db4-storage/src/segments/mod.rs +++ b/db4-storage/src/segments/mod.rs @@ -157,7 +157,6 @@ pub struct SegmentContainer { max_page_len: u32, properties: Properties, meta: Arc, - lsn: u64, } pub trait HasRow: Default + Send + Sync + Sized { @@ -176,7 +175,6 @@ impl SegmentContainer { max_page_len, properties: Default::default(), meta, - lsn: 0, } } @@ -286,16 +284,6 @@ impl SegmentContainer { self.segment_id } - #[inline(always)] - pub fn lsn(&self) -> u64 { - self.lsn - } - - #[inline(always)] - pub fn set_lsn(&mut self, lsn: u64) { - self.lsn = lsn; - } - pub fn len(&self) -> u32 { self.data.data.len() as u32 } diff --git a/db4-storage/src/segments/node/segment.rs b/db4-storage/src/segments/node/segment.rs index bc7c5bcdd6..832828b990 100644 --- a/db4-storage/src/segments/node/segment.rs +++ b/db4-storage/src/segments/node/segment.rs @@ -1,13 +1,7 @@ use crate::{ - LocalPOS, - api::nodes::{LockedNSSegment, NodeSegmentOps}, - error::StorageError, - loop_lock_write, - persist::strategy::PersistentStrategy, - segments::{ - HasRow, SegmentContainer, - node::entry::{MemNodeEntry, MemNodeRef}, - }, + api::nodes::{LockedNSSegment, NodeSegmentOps}, error::StorageError, loop_lock_write, persist::strategy::PersistentStrategy, segments::{ + node::entry::{MemNodeEntry, MemNodeRef}, HasRow, SegmentContainer + }, wal::LSN, LocalPOS }; use either::Either; use parking_lot::lock_api::ArcRwLockReadGuard; @@ -36,6 +30,7 @@ pub struct MemNodeSegment { segment_id: usize, max_page_len: u32, layers: Vec>, + lsn: LSN, } impl>> From for MemNodeSegment { @@ -51,6 +46,7 @@ impl>> From for MemNodeSegm segment_id, max_page_len, layers, + lsn: 0, } } } @@ -141,8 +137,14 @@ impl MemNodeSegment { self.get_adj(n, layer_id).map_or(0, |adj| adj.degree(dir)) } - pub fn lsn(&self) -> u64 { - self.layers.iter().map(|seg| seg.lsn()).min().unwrap_or(0) + pub fn lsn(&self) -> LSN { + self.lsn + } + + pub fn set_lsn(&mut self, lsn: LSN) { + if lsn > self.lsn { + self.lsn = lsn; + } } pub fn to_vid(&self, pos: LocalPOS) -> VID { @@ -190,6 +192,7 @@ impl MemNodeSegment { segment_id, max_page_len, layers: vec![SegmentContainer::new(segment_id, max_page_len, meta)], + lsn: 0, } } @@ -199,14 +202,12 @@ impl MemNodeSegment { src_pos: LocalPOS, dst: impl Into, e_id: impl Into, - lsn: u64, ) -> (bool, usize) { let dst = dst.into(); let e_id = e_id.into(); let layer_id = e_id.layer(); let layer = self.get_or_create_layer(layer_id); let est_size = layer.est_size(); - layer.set_lsn(lsn); let add_out = layer.reserve_local_row(src_pos); let new_entry = add_out.is_new(); @@ -228,7 +229,6 @@ impl MemNodeSegment { dst_pos: impl Into, src: impl Into, e_id: impl Into, - lsn: u64, ) -> (bool, usize) { let src = src.into(); let e_id = e_id.into(); @@ -237,7 +237,6 @@ impl MemNodeSegment { let layer = self.get_or_create_layer(layer_id); let est_size = layer.est_size(); - layer.set_lsn(lsn); let add_in = layer.reserve_local_row(dst_pos); let new_entry = add_in.is_new(); @@ -268,12 +267,10 @@ impl MemNodeSegment { t: T, node_pos: LocalPOS, e_id: ELID, - lsn: u64, ) -> usize { let layer_id = e_id.layer(); let (est_size, row) = { let segment_container = self.get_or_create_layer(layer_id); //&mut self.layers[e_id.layer()]; - segment_container.set_lsn(lsn); let est_size = segment_container.est_size(); let row = segment_container.reserve_local_row(node_pos).inner().row(); (est_size, row) @@ -483,7 +480,7 @@ impl>> NodeSegmentOps for NodeSegm Ok(()) } - fn mark_dirty(&self) {} + fn set_dirty(&self, _dirty: bool) {} fn check_node(&self, _pos: LocalPOS, _layer_id: usize) -> bool { false @@ -586,7 +583,7 @@ mod test { let est_size1 = segment.est_size(); assert_eq!(est_size1, 0); - writer.add_outbound_edge(Some(1), LocalPOS(1), VID(3), EID(7).with_layer(0), 0); + writer.add_outbound_edge(Some(1), LocalPOS(1), VID(3), EID(7).with_layer(0)); let est_size2 = segment.est_size(); assert!( @@ -594,7 +591,7 @@ mod test { "Estimated size should be greater than 0 after adding an edge" ); - writer.add_inbound_edge(Some(1), LocalPOS(2), VID(4), EID(8).with_layer(0), 0); + writer.add_inbound_edge(Some(1), LocalPOS(2), VID(4), EID(8).with_layer(0)); let est_size3 = segment.est_size(); assert!( @@ -604,7 +601,7 @@ mod test { // no change when adding the same edge again - writer.add_outbound_edge::(None, LocalPOS(1), VID(3), EID(7).with_layer(0), 0); + writer.add_outbound_edge::(None, LocalPOS(1), VID(3), EID(7).with_layer(0)); let est_size4 = segment.est_size(); assert_eq!( est_size4, est_size3, @@ -619,7 +616,7 @@ mod test { .unwrap() .inner(); - writer.update_c_props(LocalPOS(1), 0, [(prop_id, Prop::U64(73))], 0); + writer.update_c_props(LocalPOS(1), 0, [(prop_id, Prop::U64(73))]); let est_size5 = segment.est_size(); assert!( @@ -627,7 +624,7 @@ mod test { "Estimated size should increase after adding constant properties" ); - writer.update_timestamp(17, LocalPOS(1), ELID::new(EID(0), 0), 0); + writer.update_timestamp(17, LocalPOS(1), ELID::new(EID(0), 0)); let est_size6 = segment.est_size(); assert!( @@ -642,7 +639,7 @@ mod test { .unwrap() .inner(); - writer.add_props(42, LocalPOS(1), 0, [(prop_id, Prop::F64(4.13))], 0); + writer.add_props(42, LocalPOS(1), 0, [(prop_id, Prop::F64(4.13))]); let est_size7 = segment.est_size(); assert!( @@ -650,7 +647,7 @@ mod test { "Estimated size should increase after adding temporal properties" ); - writer.add_props(72, LocalPOS(1), 0, [(prop_id, Prop::F64(5.41))], 0); + writer.add_props(72, LocalPOS(1), 0, [(prop_id, Prop::F64(5.41))]); let est_size8 = segment.est_size(); assert!( est_size8 > est_size7, diff --git a/db4-storage/src/transaction/mod.rs b/db4-storage/src/transaction/mod.rs new file mode 100644 index 0000000000..439e5b00de --- /dev/null +++ b/db4-storage/src/transaction/mod.rs @@ -0,0 +1,40 @@ +use std::sync::atomic::{self, AtomicU64}; + +use crate::wal::TransactionID; + +#[derive(Debug)] +pub struct TransactionManager { + last_transaction_id: AtomicU64, +} + +impl TransactionManager { + const STARTING_TRANSACTION_ID: TransactionID = 1; + + pub fn new() -> Self { + Self { + last_transaction_id: AtomicU64::new(Self::STARTING_TRANSACTION_ID), + } + } + + /// Restores the last used transaction ID to the specified value. + /// Intended for using during recovery. + pub fn restore_transaction_id(&self, last_transaction_id: TransactionID) { + self.last_transaction_id + .store(last_transaction_id, atomic::Ordering::SeqCst) + } + + pub fn begin_transaction(&self) -> TransactionID { + self.last_transaction_id + .fetch_add(1, atomic::Ordering::SeqCst) + } + + pub fn end_transaction(&self, _transaction_id: TransactionID) { + // No-op for now. + } +} + +impl Default for TransactionManager { + fn default() -> Self { + Self::new() + } +} diff --git a/db4-storage/src/wal/entry.rs b/db4-storage/src/wal/entry.rs index 71ba54ce4a..7b0b0e6745 100644 --- a/db4-storage/src/wal/entry.rs +++ b/db4-storage/src/wal/entry.rs @@ -1,6 +1,6 @@ use std::path::Path; -use raphtory_api::core::{entities::properties::prop::Prop, storage::dict_mapper::MaybeNew}; +use raphtory_api::core::entities::properties::prop::Prop; use raphtory_core::{ entities::{EID, GID, VID}, storage::timeindex::TimeIndexEntry, @@ -8,84 +8,24 @@ use raphtory_core::{ use crate::{ error::StorageError, - wal::{GraphReplayer, GraphWal, LSN, TransactionID, no_wal::NoWal}, + wal::{GraphReplay, GraphWal, LSN, TransactionID, no_wal::NoWal}, }; impl GraphWal for NoWal { type ReplayEntry = (); - fn log_begin_transaction(&self, _transaction_id: TransactionID) -> Result { - Ok(0) - } - - fn log_end_transaction(&self, _transaction_id: TransactionID) -> Result { - Ok(0) - } - - fn log_add_static_edge( - &self, - _transaction_id: TransactionID, - _t: TimeIndexEntry, - _src: VID, - _dst: VID, - ) -> Result { - Ok(0) - } - fn log_add_edge( &self, _transaction_id: TransactionID, _t: TimeIndexEntry, - _src: VID, - _dst: VID, + _src_name: GID, + _src_id: VID, + _dst_name: GID, + _dst_id: VID, _eid: EID, + _layer_name: Option<&str>, _layer_id: usize, - _props: &[(usize, Prop)], - ) -> Result { - Ok(0) - } - - fn log_node_id( - &self, - _transaction_id: TransactionID, - _gid: GID, - _vid: VID, - ) -> Result { - Ok(0) - } - - fn log_edge_id( - &self, - _transaction_id: TransactionID, - _src: VID, - _dst: VID, - _eid: EID, - _layer_id: usize, - ) -> Result { - Ok(0) - } - - fn log_const_prop_ids>( - &self, - _transaction_id: TransactionID, - _props: &[MaybeNew<(PN, usize, Prop)>], - ) -> Result { - Ok(0) - } - - fn log_temporal_prop_ids>( - &self, - _transaction_id: TransactionID, - _props: &[MaybeNew<(PN, usize, Prop)>], - ) -> Result { - Ok(0) - } - - fn log_layer_id( - &self, - _transaction_id: TransactionID, - _name: &str, - _id: usize, + _props: Vec<(&str, usize, Prop)>, ) -> Result { Ok(0) } @@ -100,7 +40,7 @@ impl GraphWal for NoWal { std::iter::once(Ok((0, ()))) } - fn replay_to_graph( + fn replay_to_graph( _dir: impl AsRef, _graph: &mut G, ) -> Result<(), StorageError> { diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index 7538781b16..9752c6ef4f 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -1,5 +1,5 @@ use crate::error::StorageError; -use raphtory_api::core::{entities::properties::prop::Prop, storage::dict_mapper::MaybeNew}; +use raphtory_api::core::entities::properties::prop::Prop; use raphtory_core::{ entities::{EID, GID, VID}, storage::timeindex::TimeIndexEntry, @@ -46,93 +46,18 @@ pub trait GraphWal { /// ReplayEntry represents the type of the wal entry returned during replay. type ReplayEntry; - fn log_begin_transaction(&self, transaction_id: TransactionID) -> Result; - - fn log_end_transaction(&self, transaction_id: TransactionID) -> Result; - - /// Log a static edge addition. - /// - /// # Arguments - /// - /// * `transaction_id` - The transaction ID - /// * `t` - The timestamp of the edge addition - /// * `src` - The source vertex ID - /// * `dst` - The destination vertex ID - fn log_add_static_edge( - &self, - transaction_id: TransactionID, - t: TimeIndexEntry, - src: VID, - dst: VID, - ) -> Result; - - /// Log an edge addition to a layer with temporal props. - /// - /// # Arguments - /// - /// * `transaction_id` - The transaction ID - /// * `t` - The timestamp of the edge addition - /// * `src` - The source vertex ID - /// * `dst` - The destination vertex ID - /// * `eid` - The edge ID - /// * `layer_id` - The layer ID - /// * `props` - The temporal properties of the edge fn log_add_edge( &self, transaction_id: TransactionID, t: TimeIndexEntry, - src: VID, - dst: VID, - eid: EID, - layer_id: usize, - props: &[(usize, Prop)], - ) -> Result; - - fn log_node_id( - &self, - transaction_id: TransactionID, - gid: GID, - vid: VID, - ) -> Result; - - fn log_edge_id( - &self, - transaction_id: TransactionID, - src: VID, - dst: VID, + src_name: GID, + src_id: VID, + dst_name: GID, + dst_id: VID, eid: EID, + layer_name: Option<&str>, layer_id: usize, - ) -> Result; - - /// Log constant prop name -> prop id mappings. - /// - /// # Arguments - /// - /// * `transaction_id` - The transaction ID - /// * `props` - A slice containing new or existing tuples of (prop name, id, value) - fn log_const_prop_ids>( - &self, - transaction_id: TransactionID, - props: &[MaybeNew<(PN, usize, Prop)>], - ) -> Result; - - /// Log temporal prop name -> prop id mappings. - /// - /// # Arguments - /// - /// * `transaction_id` - The transaction ID - /// * `props` - A slice containing new or existing tuples of (prop name, id, value). - fn log_temporal_prop_ids>( - &self, - transaction_id: TransactionID, - props: &[MaybeNew<(PN, usize, Prop)>], - ) -> Result; - - fn log_layer_id( - &self, - transaction_id: TransactionID, - name: &str, - id: usize, + props: Vec<(&str, usize, Prop)>, ) -> Result; /// Logs a checkpoint record, indicating that all Wal operations upto and including @@ -145,74 +70,26 @@ pub trait GraphWal { ) -> impl Iterator>; /// Replays and applies all the wal entries in the given directory to the given graph. - fn replay_to_graph( + fn replay_to_graph( dir: impl AsRef, graph: &mut G, ) -> Result<(), StorageError>; } -/// Trait for defining callbacks for replaying from wal -pub trait GraphReplayer { - fn replay_begin_transaction( - &self, - lsn: LSN, - transaction_id: TransactionID, - ) -> Result<(), StorageError>; - - fn replay_end_transaction( - &self, - lsn: LSN, - transaction_id: TransactionID, - ) -> Result<(), StorageError>; - - fn replay_add_static_edge( - &self, - lsn: LSN, - transaction_id: TransactionID, - t: TimeIndexEntry, - src: VID, - dst: VID, - ) -> Result<(), StorageError>; - +/// Trait for defining callbacks for replaying from wal. +pub trait GraphReplay { fn replay_add_edge( - &self, + &mut self, lsn: LSN, transaction_id: TransactionID, t: TimeIndexEntry, - src: VID, - dst: VID, + src_name: GID, + src_id: VID, + dst_name: GID, + dst_id: VID, eid: EID, + layer_name: Option, layer_id: usize, - props: &[(usize, Prop)], - ) -> Result<(), StorageError>; - - fn replay_node_id( - &self, - lsn: LSN, - transaction_id: TransactionID, - gid: GID, - vid: VID, - ) -> Result<(), StorageError>; - - fn replay_const_prop_ids>( - &self, - lsn: LSN, - transaction_id: TransactionID, - props: &[MaybeNew<(PN, usize, Prop)>], - ) -> Result<(), StorageError>; - - fn replay_temporal_prop_ids>( - &self, - lsn: LSN, - transaction_id: TransactionID, - props: &[MaybeNew<(PN, usize, Prop)>], - ) -> Result<(), StorageError>; - - fn replay_layer_id( - &self, - lsn: LSN, - transaction_id: TransactionID, - name: &str, - id: usize, + props: Vec<(String, usize, Prop)>, ) -> Result<(), StorageError>; } diff --git a/raphtory-api/src/core/entities/mod.rs b/raphtory-api/src/core/entities/mod.rs index cce5d1c80a..2256f86c6d 100644 --- a/raphtory-api/src/core/entities/mod.rs +++ b/raphtory-api/src/core/entities/mod.rs @@ -64,6 +64,10 @@ impl Default for EID { } impl EID { + pub fn index(&self) -> usize { + self.0 + } + pub fn as_u64(self) -> u64 { self.0 as u64 } diff --git a/raphtory-storage/src/mutation/addition_ops.rs b/raphtory-storage/src/mutation/addition_ops.rs index e918be32d9..cef904562a 100644 --- a/raphtory-storage/src/mutation/addition_ops.rs +++ b/raphtory-storage/src/mutation/addition_ops.rs @@ -5,7 +5,7 @@ use crate::{ MutationError, }, }; -use db4_graph::{TransactionManager, WriteLockedGraph}; +use db4_graph::WriteLockedGraph; use raphtory_api::{ core::{ entities::{ @@ -20,7 +20,7 @@ use raphtory_api::{ inherit::Base, }; use raphtory_core::entities::{nodes::node_ref::NodeRef, ELID}; -use storage::{Extension, WalImpl}; +use storage::{Extension, wal::LSN}; pub trait InternalAdditionOps { type Error: From; @@ -93,12 +93,6 @@ pub trait InternalAdditionOps { meta: &Meta, props: impl Iterator, ) -> Result>, Self::Error>; - - /// TODO: Not sure the below methods belong here... - - fn transaction_manager(&self) -> &TransactionManager; - - fn wal(&self) -> &WalImpl; } pub trait EdgeWriteLock: Send + Sync { @@ -106,7 +100,6 @@ pub trait EdgeWriteLock: Send + Sync { &mut self, src: impl Into, dst: impl Into, - lsn: u64, ) -> MaybeNew; /// add edge update @@ -116,7 +109,6 @@ pub trait EdgeWriteLock: Send + Sync { src: impl Into, dst: impl Into, eid: MaybeNew, - lsn: u64, props: impl IntoIterator, ) -> MaybeNew; @@ -125,12 +117,13 @@ pub trait EdgeWriteLock: Send + Sync { t: TimeIndexEntry, src: impl Into, dst: impl Into, - lsn: u64, layer: usize, ) -> MaybeNew; fn store_src_node_info(&mut self, id: impl Into, node_id: Option); fn store_dst_node_info(&mut self, id: impl Into, node_id: Option); + + fn set_lsn(&mut self, lsn: LSN); } pub trait SessionAdditionOps: Send + Sync { @@ -287,14 +280,6 @@ impl InternalAdditionOps for GraphStorage { Ok(self.mutable()?.validate_gids(gids)?) } - fn transaction_manager(&self) -> &TransactionManager { - self.mutable().unwrap().transaction_manager.as_ref() - } - - fn wal(&self) -> &WalImpl { - self.mutable().unwrap().wal.as_ref() - } - fn resolve_node_and_type( &self, id: NodeRef, @@ -404,16 +389,6 @@ where self.base().validate_gids(gids) } - #[inline] - fn transaction_manager(&self) -> &TransactionManager { - self.base().transaction_manager() - } - - #[inline] - fn wal(&self) -> &WalImpl { - self.base().wal() - } - fn resolve_node_and_type( &self, id: NodeRef, diff --git a/raphtory-storage/src/mutation/addition_ops_ext.rs b/raphtory-storage/src/mutation/addition_ops_ext.rs index 70cba75036..612caef67d 100644 --- a/raphtory-storage/src/mutation/addition_ops_ext.rs +++ b/raphtory-storage/src/mutation/addition_ops_ext.rs @@ -1,8 +1,9 @@ use crate::mutation::{ addition_ops::{EdgeWriteLock, InternalAdditionOps, SessionAdditionOps}, + durability_ops::DurabilityOps, MutationError, }; -use db4_graph::{TemporalGraph, TransactionManager, WriteLockedGraph}; +use db4_graph::{TemporalGraph, WriteLockedGraph}; use raphtory_api::core::{ entities::properties::{ meta::{Meta, NODE_ID_IDX, NODE_TYPE_IDX}, @@ -16,14 +17,15 @@ use raphtory_core::{ nodes::node_ref::{AsNodeRef, NodeRef}, GidRef, EID, ELID, MAX_LAYER, VID, }, - storage::timeindex::TimeIndexEntry, + storage::{timeindex::TimeIndexEntry}, }; use storage::{ pages::{node_page::writer::node_info_as_props, session::WriteSession}, persist::strategy::PersistentStrategy, properties::props_meta_writer::PropsMetaWriter, resolver::GIDResolverOps, - Extension, WalImpl, ES, GS, NS, + Extension, transaction::TransactionManager, WalImpl, ES, NS, GS, + wal::LSN, }; pub struct WriteS<'a, EXT: PersistentStrategy, ES = ES, GS = GS>> { @@ -42,9 +44,8 @@ impl<'a, EXT: PersistentStrategy, ES = ES, GS = GS>> Edge &mut self, src: impl Into, dst: impl Into, - lsn: u64, ) -> MaybeNew { - self.static_session.add_static_edge(src, dst, lsn) + self.static_session.add_static_edge(src, dst) } fn internal_add_edge( @@ -53,11 +54,9 @@ impl<'a, EXT: PersistentStrategy, ES = ES, GS = GS>> Edge src: impl Into, dst: impl Into, eid: MaybeNew, - lsn: u64, props: impl IntoIterator, ) -> MaybeNew { - self.static_session - .add_edge_into_layer(t, src, dst, eid, lsn, props); + self.static_session.add_edge_into_layer(t, src, dst, eid, props); eid } @@ -67,18 +66,17 @@ impl<'a, EXT: PersistentStrategy, ES = ES, GS = GS>> Edge t: TimeIndexEntry, src: impl Into, dst: impl Into, - lsn: u64, layer: usize, ) -> MaybeNew { let src = src.into(); let dst = dst.into(); let eid = self .static_session - .add_static_edge(src, dst, lsn) + .add_static_edge(src, dst) .map(|eid| eid.with_layer_deletion(layer)); self.static_session - .delete_edge_from_layer(t, src, dst, eid, lsn); + .delete_edge_from_layer(t, src, dst, eid); eid } @@ -90,7 +88,7 @@ impl<'a, EXT: PersistentStrategy, ES = ES, GS = GS>> Edge self.static_session .node_writers() .get_mut_src() - .update_c_props(pos, 0, [(NODE_ID_IDX, id.into())], 0); + .update_c_props(pos, 0, [(NODE_ID_IDX, id.into())]); }; } @@ -101,9 +99,13 @@ impl<'a, EXT: PersistentStrategy, ES = ES, GS = GS>> Edge self.static_session .node_writers() .get_mut_dst() - .update_c_props(pos, 0, [(NODE_ID_IDX, id.into())], 0); + .update_c_props(pos, 0, [(NODE_ID_IDX, id.into())]); }; } + + fn set_lsn(&mut self, lsn: LSN) { + self.static_session.set_lsn(lsn); + } } impl<'a> SessionAdditionOps for UnlockedSession<'a> { @@ -259,7 +261,6 @@ impl InternalAdditionOps for TemporalGraph { local_pos, 0, node_info_as_props(id.as_gid_ref().left(), None), - 0, ); MaybeNew::Existing(0) } @@ -275,7 +276,6 @@ impl InternalAdditionOps for TemporalGraph { id.as_gid_ref().left(), Some(node_type_id.inner()).filter(|&id| id != 0), ), - 0, ); node_type_id } @@ -339,7 +339,7 @@ impl InternalAdditionOps for TemporalGraph { ) -> Result<(), Self::Error> { let (segment, node_pos) = self.storage().nodes().resolve_pos(v); let mut node_writer = self.storage().node_writer(segment); - node_writer.add_props(t, node_pos, 0, props, 0); + node_writer.add_props(t, node_pos, 0, props); Ok(()) } @@ -380,7 +380,9 @@ impl InternalAdditionOps for TemporalGraph { Ok(prop_ids) } } +} +impl DurabilityOps for TemporalGraph { fn transaction_manager(&self) -> &TransactionManager { &self.transaction_manager } diff --git a/raphtory-storage/src/mutation/deletion_ops.rs b/raphtory-storage/src/mutation/deletion_ops.rs index 06b934cc3c..0a7b0a4b12 100644 --- a/raphtory-storage/src/mutation/deletion_ops.rs +++ b/raphtory-storage/src/mutation/deletion_ops.rs @@ -36,8 +36,9 @@ impl InternalDeletionOps for db4_graph::TemporalGraph { layer: usize, ) -> Result, Self::Error> { let mut session = self.storage().write_session(src, dst, None); - let edge = session.add_static_edge(src, dst, 0); - session.delete_edge_from_layer(t, src, dst, edge.map(|eid| eid.with_layer(layer)), 0); + session.set_lsn(0); + let edge = session.add_static_edge(src, dst); + session.delete_edge_from_layer(t, src, dst, edge.map(|eid| eid.with_layer(layer))); Ok(edge) } @@ -52,7 +53,7 @@ impl InternalDeletionOps for db4_graph::TemporalGraph { let (src, dst) = writer.get_edge(0, edge_pos).unwrap_or_else(|| { panic!("Internal Error: Edge {eid:?} not found in storage"); }); - writer.delete_edge(t, edge_pos, src, dst, layer, 0); + writer.delete_edge(t, edge_pos, src, dst, layer); Ok(()) } } diff --git a/raphtory-storage/src/mutation/durability_ops.rs b/raphtory-storage/src/mutation/durability_ops.rs new file mode 100644 index 0000000000..34713df7aa --- /dev/null +++ b/raphtory-storage/src/mutation/durability_ops.rs @@ -0,0 +1,37 @@ +use storage::{transaction::TransactionManager, WalImpl}; +use crate::graph::graph::GraphStorage; +use raphtory_api::inherit::Base; + +/// Accessor methods for transactions and write-ahead logging. +pub trait DurabilityOps { + fn transaction_manager(&self) -> &TransactionManager; + + fn wal(&self) -> &WalImpl; +} + +impl DurabilityOps for GraphStorage { + fn transaction_manager(&self) -> &TransactionManager { + self.mutable().unwrap().transaction_manager.as_ref() + } + + fn wal(&self) -> &WalImpl { + self.mutable().unwrap().wal.as_ref() + } +} + +pub trait InheritDurabilityOps: Base {} + +impl DurabilityOps for G +where + G::Base: DurabilityOps, +{ + #[inline] + fn transaction_manager(&self) -> &TransactionManager { + self.base().transaction_manager() + } + + #[inline] + fn wal(&self) -> &WalImpl { + self.base().wal() + } +} diff --git a/raphtory-storage/src/mutation/mod.rs b/raphtory-storage/src/mutation/mod.rs index 44f18037b9..7fb66b7a61 100644 --- a/raphtory-storage/src/mutation/mod.rs +++ b/raphtory-storage/src/mutation/mod.rs @@ -4,6 +4,7 @@ use crate::{ mutation::{ addition_ops::InheritAdditionOps, deletion_ops::InheritDeletionOps, property_addition_ops::InheritPropertyAdditionOps, + durability_ops::InheritDurabilityOps, }, }; use parking_lot::RwLockWriteGuard; @@ -31,6 +32,7 @@ pub mod addition_ops; pub mod addition_ops_ext; pub mod deletion_ops; pub mod property_addition_ops; +pub mod durability_ops; pub type NodeWriterT<'a> = NodeWriter<'a, RwLockWriteGuard<'a, MemNodeSegment>, NS>; pub type EdgeWriterT<'a> = EdgeWriter<'a, RwLockWriteGuard<'a, MemEdgeSegment>, ES>; @@ -70,5 +72,6 @@ pub trait InheritMutationOps: Base {} impl InheritAdditionOps for G {} impl InheritPropertyAdditionOps for G {} impl InheritDeletionOps for G {} +impl InheritDurabilityOps for G {} impl InheritMutationOps for Arc {} diff --git a/raphtory-storage/src/mutation/property_addition_ops.rs b/raphtory-storage/src/mutation/property_addition_ops.rs index 0447d09bf7..65e9dcc681 100644 --- a/raphtory-storage/src/mutation/property_addition_ops.rs +++ b/raphtory-storage/src/mutation/property_addition_ops.rs @@ -87,7 +87,7 @@ impl InternalPropertyAdditionOps for db4_graph::TemporalGraph { let (segment_id, node_pos) = self.storage().nodes().resolve_pos(vid); let mut writer = self.storage().nodes().writer(segment_id); writer.check_metadata(node_pos, 0, &props)?; - writer.update_c_props(node_pos, 0, props, 0); + writer.update_c_props(node_pos, 0, props); Ok(writer) } @@ -98,7 +98,7 @@ impl InternalPropertyAdditionOps for db4_graph::TemporalGraph { ) -> Result, Self::Error> { let (segment_id, node_pos) = self.storage().nodes().resolve_pos(vid); let mut writer = self.storage().nodes().writer(segment_id); - writer.update_c_props(node_pos, 0, props, 0); + writer.update_c_props(node_pos, 0, props); Ok(writer) } diff --git a/raphtory/src/db/api/mutation/addition_ops.rs b/raphtory/src/db/api/mutation/addition_ops.rs index ed319d3d97..9d62c395d8 100644 --- a/raphtory/src/db/api/mutation/addition_ops.rs +++ b/raphtory/src/db/api/mutation/addition_ops.rs @@ -14,10 +14,12 @@ use crate::{ prelude::{GraphViewOps, NodeViewOps}, }; use raphtory_api::core::entities::properties::prop::Prop; +use raphtory_core::entities::GID; use raphtory_storage::mutation::addition_ops::{EdgeWriteLock, InternalAdditionOps}; +use raphtory_storage::mutation::durability_ops::DurabilityOps; use storage::wal::{GraphWal, Wal}; -pub trait AdditionOps: StaticGraphViewOps + InternalAdditionOps> { +pub trait AdditionOps: StaticGraphViewOps + InternalAdditionOps> + DurabilityOps { // TODO: Probably add vector reference here like add /// Add a node to the graph /// @@ -143,7 +145,7 @@ pub trait AdditionOps: StaticGraphViewOps + InternalAdditionOps> + StaticGraphViewOps> AdditionOps for G { +impl> + StaticGraphViewOps + DurabilityOps> AdditionOps for G { fn add_node< V: AsNodeRef, T: TryIntoInputTime, @@ -248,7 +250,6 @@ impl> + StaticGraphViewOps> Addit props: PII, layer: Option<&str>, ) -> Result, GraphError> { - // Log transaction start let transaction_id = self.transaction_manager().begin_transaction(); let session = self.write_session().map_err(|err| err.into())?; @@ -267,19 +268,6 @@ impl> + StaticGraphViewOps> Addit ) .map_err(into_graph_err)?; - // Log prop name -> prop id mappings - self.wal() - .log_temporal_prop_ids(transaction_id, &props_with_status) - .unwrap(); - - let props = props_with_status - .into_iter() - .map(|maybe_new| { - let (_, prop_id, prop) = maybe_new.inner(); - (prop_id, prop) - }) - .collect::>(); - let ti = time_from_input_session(&session, t)?; let src_id = self .resolve_node(src.as_node_ref()) @@ -289,76 +277,78 @@ impl> + StaticGraphViewOps> Addit .map_err(into_graph_err)?; let layer_id = self.resolve_layer(layer).map_err(into_graph_err)?; - // Log node -> node id mappings // FIXME: We are logging node -> node id mappings AFTER they are inserted into the // resolver. Make sure resolver mapping CANNOT get to disk before Wal. - if let Some(gid) = src.as_node_ref().as_gid_ref().left() { - self.wal() - .log_node_id(transaction_id, gid.into(), src_id.inner()) - .unwrap(); - } - - if let Some(gid) = dst.as_node_ref().as_gid_ref().left() { - self.wal() - .log_node_id(transaction_id, gid.into(), dst_id.inner()) - .unwrap(); - } + let src_gid = src.as_node_ref().as_gid_ref().left().map(|gid_ref| GID::from(gid_ref)).unwrap(); + let dst_gid = dst.as_node_ref().as_gid_ref().left().map(|gid_ref| GID::from(gid_ref)).unwrap(); let src_id = src_id.inner(); let dst_id = dst_id.inner(); - // Log layer -> layer id mappings - if let Some(layer) = layer { - self.wal() - .log_layer_id(transaction_id, layer, layer_id.inner()) - .unwrap(); - } - let layer_id = layer_id.inner(); - // Holds all locks for nodes and edge until add_edge_op goes out of scope + // Hold all locks for src node, dst node and edge until add_edge_op goes out of scope. let mut add_edge_op = self .atomic_add_edge(src_id, dst_id, None, layer_id) .map_err(into_graph_err)?; - // Log edge addition - let add_static_edge_lsn = self - .wal() - .log_add_static_edge(transaction_id, ti, src_id, dst_id) - .unwrap(); - let edge_id = add_edge_op.internal_add_static_edge(src_id, dst_id, add_static_edge_lsn); - - // Log edge -> edge id mappings - // NOTE: We log edge id mappings after they are inserted into edge segments. - // This is fine as long as we hold onto segment locks for the entire operation. - let add_edge_lsn = self - .wal() - .log_add_edge( - transaction_id, - ti, - src_id, - dst_id, - edge_id.inner(), - layer_id, - &props, - ) - .unwrap(); + // NOTE: We log edge id after it is inserted into the edge segment. + // This is fine as long as we hold onto the edge segment lock through add_edge_op + // for the entire operation. + let edge_id = add_edge_op.internal_add_static_edge(src_id, dst_id); + + // All names, ids and values have been generated for this operation. + // Create a wal entry to mark it as durable. + let props_for_wal = props_with_status + .iter() + .map(|maybe_new| { + let (prop_name, prop_id, prop) = maybe_new.as_ref().inner(); + (prop_name.as_ref(), *prop_id, prop.clone()) + }) + .collect::>(); + + let lsn = self.wal().log_add_edge( + transaction_id, + ti, + src_gid, + src_id, + dst_gid, + dst_id, + edge_id.inner(), + layer, + layer_id, + props_for_wal, + ).unwrap(); + + let props = props_with_status + .into_iter() + .map(|maybe_new| { + let (_, prop_id, prop) = maybe_new.inner(); + (prop_id, prop) + }) + .collect::>(); + let edge_id = add_edge_op.internal_add_edge( ti, src_id, dst_id, edge_id.map(|eid| eid.with_layer(layer_id)), - add_edge_lsn, props, ); add_edge_op.store_src_node_info(src_id, src.as_node_ref().as_gid_ref().left()); add_edge_op.store_dst_node_info(dst_id, dst.as_node_ref().as_gid_ref().left()); - // Log transaction end + // Update the src, dst and edge segments with the lsn of the wal entry. + add_edge_op.set_lsn(lsn); + self.transaction_manager().end_transaction(transaction_id); - // Flush all wal entries to disk. + // Drop to release all the segment locks. + // FIXME: Make sure segments cannot get to disk before wal entry is flushed. + // drop(add_edge_op); + + // Flush the wal entry to disk. self.wal().sync().unwrap(); Ok(EdgeView::new( diff --git a/raphtory/src/db/api/mutation/deletion_ops.rs b/raphtory/src/db/api/mutation/deletion_ops.rs index e25b1ca190..8157040213 100644 --- a/raphtory/src/db/api/mutation/deletion_ops.rs +++ b/raphtory/src/db/api/mutation/deletion_ops.rs @@ -51,7 +51,7 @@ pub trait DeletionOps: .atomic_add_edge(src_id, dst_id, None, layer_id) .map_err(into_graph_err)?; - let edge_id = add_edge_op.internal_delete_edge(ti, src_id, dst_id, 0, layer_id); + let edge_id = add_edge_op.internal_delete_edge(ti, src_id, dst_id, layer_id); add_edge_op.store_src_node_info(src_id, src.as_node_ref().as_gid_ref().left()); add_edge_op.store_dst_node_info(dst_id, dst.as_node_ref().as_gid_ref().left()); diff --git a/raphtory/src/db/api/storage/storage.rs b/raphtory/src/db/api/storage/storage.rs index 84de4c5e81..e188874f54 100644 --- a/raphtory/src/db/api/storage/storage.rs +++ b/raphtory/src/db/api/storage/storage.rs @@ -6,7 +6,7 @@ use crate::{ }, errors::GraphError, }; -use db4_graph::{TemporalGraph, TransactionManager, WriteLockedGraph}; +use db4_graph::{TemporalGraph, WriteLockedGraph}; use raphtory_api::core::{ entities::{ properties::{ @@ -24,6 +24,7 @@ use raphtory_storage::{ layer_ops::InheritLayerOps, mutation::{ addition_ops::{EdgeWriteLock, InternalAdditionOps, SessionAdditionOps}, + durability_ops::DurabilityOps, addition_ops_ext::{UnlockedSession, WriteS}, deletion_ops::InternalDeletionOps, property_addition_ops::InternalPropertyAdditionOps, @@ -35,7 +36,7 @@ use std::{ path::Path, sync::Arc, }; -use storage::{Extension, WalImpl}; +use storage::{Extension, transaction::TransactionManager, WalImpl, wal::LSN}; #[cfg(feature = "search")] use { @@ -301,9 +302,8 @@ impl EdgeWriteLock for AtomicAddEdgeSession<'_> { &mut self, src: impl Into, dst: impl Into, - lsn: u64, ) -> MaybeNew { - self.session.internal_add_static_edge(src, dst, lsn) + self.session.internal_add_static_edge(src, dst) } fn internal_add_edge( @@ -312,11 +312,10 @@ impl EdgeWriteLock for AtomicAddEdgeSession<'_> { src: impl Into, dst: impl Into, e_id: MaybeNew, - lsn: u64, props: impl IntoIterator, ) -> MaybeNew { self.session - .internal_add_edge(t, src, dst, e_id, lsn, props) + .internal_add_edge(t, src, dst, e_id, props) } fn internal_delete_edge( @@ -324,10 +323,9 @@ impl EdgeWriteLock for AtomicAddEdgeSession<'_> { t: TimeIndexEntry, src: impl Into, dst: impl Into, - lsn: u64, layer: usize, ) -> MaybeNew { - self.session.internal_delete_edge(t, src, dst, lsn, layer) + self.session.internal_delete_edge(t, src, dst, layer) } fn store_src_node_info(&mut self, id: impl Into, node_id: Option) { @@ -337,6 +335,10 @@ impl EdgeWriteLock for AtomicAddEdgeSession<'_> { fn store_dst_node_info(&mut self, id: impl Into, node_id: Option) { self.session.store_dst_node_info(id, node_id); } + + fn set_lsn(&mut self, lsn: LSN) { + self.session.set_lsn(lsn); + } } impl<'a> SessionAdditionOps for StorageWriteSession<'a> { @@ -575,14 +577,6 @@ impl InternalAdditionOps for Storage { Ok(self.graph.validate_gids(gids)?) } - fn transaction_manager(&self) -> &TransactionManager { - self.graph.mutable().unwrap().transaction_manager.as_ref() - } - - fn wal(&self) -> &WalImpl { - self.graph.mutable().unwrap().wal.as_ref() - } - fn resolve_node_and_type( &self, id: NodeRef, @@ -592,6 +586,16 @@ impl InternalAdditionOps for Storage { } } +impl DurabilityOps for Storage { + fn transaction_manager(&self) -> &TransactionManager { + self.graph.mutable().unwrap().transaction_manager.as_ref() + } + + fn wal(&self) -> &WalImpl { + self.graph.mutable().unwrap().wal.as_ref() + } +} + impl InternalPropertyAdditionOps for Storage { type Error = GraphError; diff --git a/raphtory/src/db/api/view/graph.rs b/raphtory/src/db/api/view/graph.rs index f8fc5d3387..1102225cea 100644 --- a/raphtory/src/db/api/view/graph.rs +++ b/raphtory/src/db/api/view/graph.rs @@ -357,17 +357,16 @@ impl<'graph, G: GraphView + 'graph> GraphViewOps<'graph> for G { 0, gid.as_ref(), new_type_id, - 0, ); } else { - writer.store_node_id(node_pos, 0, gid.as_ref(), 0); + writer.store_node_id(node_pos, 0, gid.as_ref()); } graph_storage .write_session()? .set_node(gid.as_ref(), new_id)?; for (t, row) in node.rows() { - writer.add_props(t, node_pos, 0, row, 0); + writer.add_props(t, node_pos, 0, row); } writer.update_c_props( @@ -375,7 +374,6 @@ impl<'graph, G: GraphView + 'graph> GraphViewOps<'graph> for G { 0, node.metadata_ids() .filter_map(|id| node.get_metadata(id).map(|prop| (id, prop))), - 0, ); } } @@ -396,13 +394,13 @@ impl<'graph, G: GraphView + 'graph> GraphViewOps<'graph> for G { if let Some(edge_pos) = shard.resolve_pos(eid) { let mut writer = shard.writer(); // make the edge for the first time - writer.add_static_edge(Some(edge_pos), src, dst, 0, false); + writer.add_static_edge(Some(edge_pos), src, dst, false); for edge in edge.explode_layers() { let layer = layer_map[edge.edge.layer().unwrap()]; for edge in edge.explode() { let t = edge.edge.time().unwrap(); - writer.add_edge(t, edge_pos, src, dst, [], layer, 0); + writer.add_edge(t, edge_pos, src, dst, [], layer); } //TODO: move this in edge.row() for (t, t_props) in edge @@ -422,7 +420,7 @@ impl<'graph, G: GraphView + 'graph> GraphViewOps<'graph> for G { let props = t_props .map(|(_, prop_id, prop)| (prop_id, prop)) .collect::>(); - writer.add_edge(t, edge_pos, src, dst, props, layer, 0); + writer.add_edge(t, edge_pos, src, dst, props, layer); } writer.update_c_props( edge_pos, @@ -443,7 +441,7 @@ impl<'graph, G: GraphView + 'graph> GraphViewOps<'graph> for G { self.layer_ids(), ) { let layer = layer_map[layer]; - writer.delete_edge(t, edge_pos, src, dst, layer, 0); + writer.delete_edge(t, edge_pos, src, dst, layer); } } } @@ -460,12 +458,12 @@ impl<'graph, G: GraphView + 'graph> GraphViewOps<'graph> for G { if let Some(node_pos) = maybe_src_pos { let mut writer = shard.writer(); - writer.add_static_outbound_edge(node_pos, dst_id, eid, 0); + writer.add_static_outbound_edge(node_pos, dst_id, eid); } if let Some(node_pos) = maybe_dst_pos { let mut writer = shard.writer(); - writer.add_static_inbound_edge(node_pos, src_id, eid, 0); + writer.add_static_inbound_edge(node_pos, src_id, eid); } for e in edge.explode_layers() { @@ -477,7 +475,6 @@ impl<'graph, G: GraphView + 'graph> GraphViewOps<'graph> for G { node_pos, dst_id, eid.with_layer(layer), - 0, ); } if let Some(node_pos) = maybe_dst_pos { @@ -487,7 +484,6 @@ impl<'graph, G: GraphView + 'graph> GraphViewOps<'graph> for G { node_pos, src_id, eid.with_layer(layer), - 0, ); } } @@ -498,14 +494,14 @@ impl<'graph, G: GraphView + 'graph> GraphViewOps<'graph> for G { let t = e.time_and_index().expect("exploded edge should have time"); let l = layer_map[e.edge.layer().unwrap()]; - writer.update_timestamp(t, node_pos, eid.with_layer(l), 0); + writer.update_timestamp(t, node_pos, eid.with_layer(l)); } if let Some(node_pos) = maybe_dst_pos { let mut writer = shard.writer(); let t = e.time_and_index().expect("exploded edge should have time"); let l = layer_map[e.edge.layer().unwrap()]; - writer.update_timestamp(t, node_pos, eid.with_layer(l), 0); + writer.update_timestamp(t, node_pos, eid.with_layer(l)); } } @@ -519,11 +515,11 @@ impl<'graph, G: GraphView + 'graph> GraphViewOps<'graph> for G { let layer = layer_map[layer]; if let Some(node_pos) = maybe_src_pos { let mut writer = shard.writer(); - writer.update_timestamp(t, node_pos, eid.with_layer_deletion(layer), 0); + writer.update_timestamp(t, node_pos, eid.with_layer_deletion(layer)); } if let Some(node_pos) = maybe_dst_pos { let mut writer = shard.writer(); - writer.update_timestamp(t, node_pos, eid.with_layer_deletion(layer), 0); + writer.update_timestamp(t, node_pos, eid.with_layer_deletion(layer)); } } } diff --git a/raphtory/src/db/graph/edge.rs b/raphtory/src/db/graph/edge.rs index 6e4add574f..1fa9de0f73 100644 --- a/raphtory/src/db/graph/edge.rs +++ b/raphtory/src/db/graph/edge.rs @@ -40,6 +40,7 @@ use raphtory_storage::{ graph::edges::edge_storage_ops::EdgeStorageOps, mutation::{ addition_ops::{EdgeWriteLock, InternalAdditionOps}, + durability_ops::DurabilityOps, deletion_ops::InternalDeletionOps, property_addition_ops::InternalPropertyAdditionOps, }, @@ -176,7 +177,8 @@ impl< G: StaticGraphViewOps + InternalAdditionOps + InternalPropertyAdditionOps - + InternalDeletionOps, + + InternalDeletionOps + + DurabilityOps, > EdgeView { pub fn delete(&self, t: T, layer: Option<&str>) -> Result<(), GraphError> { @@ -444,7 +446,6 @@ impl EdgeView { src, dst, MaybeNew::New(e_id.with_layer(layer_id)), - 0, props, ); diff --git a/raphtory/src/db/mod.rs b/raphtory/src/db/mod.rs index 54e9c74f6c..63e711afda 100644 --- a/raphtory/src/db/mod.rs +++ b/raphtory/src/db/mod.rs @@ -1,4 +1,3 @@ pub mod api; pub mod graph; -pub mod replay; pub mod task; diff --git a/raphtory/src/db/replay/mod.rs b/raphtory/src/db/replay/mod.rs deleted file mode 100644 index 2c356faa3a..0000000000 --- a/raphtory/src/db/replay/mod.rs +++ /dev/null @@ -1,115 +0,0 @@ -use db4_graph::TemporalGraph; -use raphtory_api::core::{ - entities::{properties::prop::Prop, EID, GID, VID}, - storage::{dict_mapper::MaybeNew, timeindex::TimeIndexEntry}, -}; -use storage::{ - api::edges::EdgeSegmentOps, - error::StorageError, - wal::{GraphReplayer, TransactionID, LSN}, - Extension, -}; - -/// Wrapper struct for implementing GraphReplayer for a TemporalGraph. -/// This is needed to workaround Rust's orphan rule since both ReplayGraph and TemporalGraph -/// are foreign to this crate. -#[derive(Debug)] -pub struct ReplayGraph { - graph: TemporalGraph, -} - -impl ReplayGraph { - pub fn new(graph: TemporalGraph) -> Self { - Self { graph } - } -} - -impl GraphReplayer for ReplayGraph { - fn replay_begin_transaction( - &self, - lsn: LSN, - transaction_id: TransactionID, - ) -> Result<(), StorageError> { - Ok(()) - } - - fn replay_end_transaction( - &self, - lsn: LSN, - transaction_id: TransactionID, - ) -> Result<(), StorageError> { - Ok(()) - } - - fn replay_add_static_edge( - &self, - lsn: LSN, - transaction_id: TransactionID, - t: TimeIndexEntry, - src: VID, - dst: VID, - ) -> Result<(), StorageError> { - Ok(()) - } - - fn replay_add_edge( - &self, - lsn: LSN, - transaction_id: TransactionID, - t: TimeIndexEntry, - src: VID, - dst: VID, - eid: EID, - layer_id: usize, - props: &[(usize, Prop)], - ) -> Result<(), StorageError> { - let edge_segment = self.graph.storage().edges().get_edge_segment(eid); - - match edge_segment { - Some(edge_segment) => { - edge_segment.head().lsn(); - } - _ => {} - } - - Ok(()) - } - - fn replay_node_id( - &self, - lsn: LSN, - transaction_id: TransactionID, - gid: GID, - vid: VID, - ) -> Result<(), StorageError> { - Ok(()) - } - - fn replay_const_prop_ids>( - &self, - lsn: LSN, - transaction_id: TransactionID, - props: &[MaybeNew<(PN, usize, Prop)>], - ) -> Result<(), StorageError> { - Ok(()) - } - - fn replay_temporal_prop_ids>( - &self, - lsn: LSN, - transaction_id: TransactionID, - props: &[MaybeNew<(PN, usize, Prop)>], - ) -> Result<(), StorageError> { - Ok(()) - } - - fn replay_layer_id( - &self, - lsn: LSN, - transaction_id: TransactionID, - name: &str, - id: usize, - ) -> Result<(), StorageError> { - Ok(()) - } -} diff --git a/raphtory/src/io/arrow/df_loaders.rs b/raphtory/src/io/arrow/df_loaders.rs index 7ddeb6cae1..5b5ec145c6 100644 --- a/raphtory/src/io/arrow/df_loaders.rs +++ b/raphtory/src/io/arrow/df_loaders.rs @@ -203,19 +203,18 @@ pub fn load_nodes_from_df< let mut writer = shard.writer(); let t = TimeIndexEntry(time, secondary_index); let layer_id = STATIC_GRAPH_LAYER_ID; - let lsn = 0; update_time(t); writer - .store_node_id_and_node_type(mut_node, layer_id, gid, *node_type, lsn); + .store_node_id_and_node_type(mut_node, layer_id, gid, *node_type); let t_props = prop_cols.iter_row(row); let c_props = metadata_cols .iter_row(row) .chain(shared_metadata.iter().cloned()); - writer.add_props(t, mut_node, layer_id, t_props, lsn); - writer.update_c_props(mut_node, layer_id, c_props, lsn); + writer.add_props(t, mut_node, layer_id, t_props); + writer.update_c_props(mut_node, layer_id, c_props); }; } @@ -415,10 +414,10 @@ pub fn load_edges_from_df