Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 8 additions & 42 deletions db4-graph/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::{
io,
path::{Path, PathBuf},
sync::{
atomic::{self, AtomicU64, AtomicUsize},
Arc,
},
sync::{atomic::AtomicUsize, Arc},
};

use raphtory_api::core::{
Expand All @@ -26,11 +23,13 @@ use storage::{
},
persist::strategy::{Config, PersistentStrategy},
resolver::GIDResolverOps,
wal::{GraphWal, TransactionID, Wal},
Extension, GIDResolver, Layer, ReadLockedLayer, WalImpl, ES, GS, NS,
Extension, GIDResolver, Layer, ReadLockedLayer, transaction::TransactionManager,
WalImpl, ES, NS, GS, wal::Wal,
};
use tempfile::TempDir;

mod replay;

#[derive(Debug)]
pub struct TemporalGraph<EXT: Config = Extension> {
// mapping between logical and physical ids
Expand Down Expand Up @@ -83,40 +82,6 @@ impl<'a> From<&'a Path> for GraphDir {
}
}

/// Hands out transaction ids and records transaction boundaries in the
/// write-ahead log.
#[derive(Debug)]
pub struct TransactionManager {
    /// Id that the next call to `begin_transaction` will return
    /// (`fetch_add` yields the value held *before* the increment).
    last_transaction_id: AtomicU64,
    /// Log that receives the begin/end transaction markers.
    wal: Arc<WalImpl>,
}

impl TransactionManager {
    /// Id returned by the first `begin_transaction` call on a fresh manager.
    const STARTING_TRANSACTION_ID: TransactionID = 1;

    /// Creates a manager that logs to `wal` and starts counting at
    /// `STARTING_TRANSACTION_ID`.
    pub fn new(wal: Arc<WalImpl>) -> Self {
        Self {
            last_transaction_id: AtomicU64::new(Self::STARTING_TRANSACTION_ID),
            wal,
        }
    }

    /// Overwrites the id counter with `last_transaction_id`.
    ///
    /// Takes `&self` rather than `self`: a by-value receiver would drop the
    /// manager right after the store, discarding the loaded id, and could not
    /// be called through a shared `Arc<TransactionManager>` at all.
    pub fn load(&self, last_transaction_id: TransactionID) {
        self.last_transaction_id
            .store(last_transaction_id, atomic::Ordering::SeqCst);
    }

    /// Reserves the next transaction id, logs a begin marker for it and
    /// returns the id.
    ///
    /// # Panics
    /// Panics if the begin marker cannot be written to the WAL.
    pub fn begin_transaction(&self) -> TransactionID {
        let transaction_id = self
            .last_transaction_id
            .fetch_add(1, atomic::Ordering::SeqCst);
        self.wal
            .log_begin_transaction(transaction_id)
            .expect("failed to log begin-transaction marker to the WAL");
        transaction_id
    }

    /// Logs an end marker for `transaction_id`.
    ///
    /// # Panics
    /// Panics if the end marker cannot be written to the WAL.
    pub fn end_transaction(&self, transaction_id: TransactionID) {
        self.wal
            .log_end_transaction(transaction_id)
            .expect("failed to log end-transaction marker to the WAL");
    }
}

impl Default for TemporalGraph<Extension> {
fn default() -> Self {
Self::new(Extension::default()).unwrap()
Expand Down Expand Up @@ -161,7 +126,7 @@ impl<EXT: PersistentStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>> Temporal
logical_to_physical: resolver.into(),
node_count,
storage: Arc::new(storage),
transaction_manager: Arc::new(TransactionManager::new(wal.clone())),
transaction_manager: Arc::new(TransactionManager::new()),
wal,
})
}
Expand Down Expand Up @@ -207,7 +172,7 @@ impl<EXT: PersistentStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>> Temporal
logical_to_physical,
node_count: AtomicUsize::new(0),
storage: Arc::new(storage),
transaction_manager: Arc::new(TransactionManager::new(wal.clone())),
transaction_manager: Arc::new(TransactionManager::new()),
wal,
})
}
Expand Down Expand Up @@ -371,6 +336,7 @@ impl<EXT: PersistentStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>> Temporal
}
}

/// Holds write locks across all segments in the graph for fast bulk ingestion.
pub struct WriteLockedGraph<'a, EXT>
where
EXT: PersistentStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>,
Expand Down
131 changes: 131 additions & 0 deletions db4-graph/src/replay.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
//! Implements WAL replay for a `WriteLockedGraph`.
//! Allows for fast replay by making use of one-time lock acquisition for
//! all the segments in the graph.

use storage::pages::resolve_pos;
use crate::{WriteLockedGraph};
use raphtory_api::core::{
entities::{properties::prop::Prop, EID, GID, VID},
storage::timeindex::TimeIndexEntry,
entities::properties::meta::STATIC_GRAPH_LAYER_ID,
};
use raphtory_core::entities::GidRef;
use storage::{
persist::strategy::PersistentStrategy,
NS, ES, GS,
error::StorageError,
wal::{GraphReplay, TransactionID, LSN},
};
use storage::resolver::GIDResolverOps;

// Replay support for a write-locked graph: a replayed record is applied
// directly through the node and edge segment writers reachable from `self`.
impl<EXT> GraphReplay for WriteLockedGraph<'_, EXT>
where
EXT: PersistentStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>,
{
/// Re-applies one logged "add edge" record to the graph.
///
/// Follows the numbered steps in the body: registers the property ids in the
/// edge meta, registers both node ids with the resolver, registers the layer
/// id in the edge and node layer meta, then writes the edge into the source
/// node segment, the destination node segment and finally the edge segment.
///
/// * `lsn`, `transaction_id` - not read by the current body (see the TODO).
/// * `t` - time entry recorded for the edge.
/// * `src_name` / `src_id`, `dst_name` / `dst_id` - node GIDs and the ids they
///   are mapped to in the resolver.
/// * `eid` - edge id; also used to locate the edge segment and position.
/// * `layer_name` - `None` is registered under the name `"_default"`.
/// * `layer_id` - id registered for that layer name and used for the layer writes.
/// * `props` - `(name, id, value)` triples; the name/id/dtype are registered in
///   the temporal prop mapper and `(id, value)` is written with the edge.
///
/// # Errors
/// Propagates the error from either resolver `set` call.
///
/// # Panics
/// Panics if a node or edge segment lookup via `get_mut` returns `None`.
fn replay_add_edge(
&mut self,
lsn: LSN,
transaction_id: TransactionID,
t: TimeIndexEntry,
src_name: GID,
src_id: VID,
dst_name: GID,
dst_id: VID,
eid: EID,
layer_name: Option<String>,
layer_id: usize,
props: Vec<(String, usize, Prop)>,
) -> Result<(), StorageError> {
// TODO: Check max lsn on disk to see if this record should be replayed.

let temporal_graph = self.graph();
// Page lengths are read up front; they are passed to `resolve_pos` below to
// turn ids into (segment id, position) pairs.
let node_max_page_len = temporal_graph.storage().nodes().max_page_len();
let edge_max_page_len = temporal_graph.storage().edges().max_page_len();

// 1. Insert prop ids into edge meta.
// No need to validate props again since they are already validated before
// being logged to the WAL.
let edge_meta = temporal_graph.edge_meta();
let mut prop_ids_and_values = Vec::new();

for (prop_name, prop_id, prop_value) in props.into_iter() {
let prop_mapper = edge_meta.temporal_prop_mapper();

prop_mapper.set_id_and_dtype(prop_name, prop_id, prop_value.dtype());
prop_ids_and_values.push((prop_id, prop_value));
}

// 2. Insert node ids into resolver.
temporal_graph.logical_to_physical.set(GidRef::from(&src_name), src_id)?;
temporal_graph.logical_to_physical.set(GidRef::from(&dst_name), dst_id)?;

// 3. Insert layer id into the layer meta of both edge and node.
let node_meta = temporal_graph.node_meta();

// A missing layer name falls back to "_default" in both registrations.
edge_meta.layer_meta().set_id(layer_name.as_deref().unwrap_or("_default"), layer_id);
node_meta.layer_meta().set_id(layer_name.as_deref().unwrap_or("_default"), layer_id);

// 4. Grab src writer and add edge data.
let (src_segment_id, src_pos) = resolve_pos(src_id, node_max_page_len);
let num_nodes = src_id.index() + 1;
self.resize_chunks_to_num_nodes(num_nodes); // Create enough segments.

// NOTE(review): the unwrap assumes the resize above made this segment
// available — confirm.
let mut src_writer = self.nodes.get_mut(src_segment_id).unwrap().writer();
src_writer.store_node_id(src_pos, STATIC_GRAPH_LAYER_ID, GidRef::from(&src_name));

// Both checks are made on the source node before any edge is written; the
// results are reused for the destination node and the edge segment below.
let is_new_edge_static = src_writer.get_out_edge(src_pos, dst_id, STATIC_GRAPH_LAYER_ID).is_none();
let is_new_edge_layer = src_writer.get_out_edge(src_pos, dst_id, layer_id).is_none();

// Add the edge to the static graph if it doesn't already exist.
if is_new_edge_static {
src_writer.add_static_outbound_edge(src_pos, dst_id, eid);
}

// Add the edge to the layer if it doesn't already exist, else just record the timestamp.
if is_new_edge_layer {
src_writer.add_outbound_edge(Some(t), src_pos, dst_id, eid.with_layer(layer_id));
} else {
src_writer.update_timestamp(t, src_pos, eid.with_layer(layer_id));
}

// Release the writer for mutable access to dst_writer.
drop(src_writer);

// 5. Grab dst writer and add edge data.
let (dst_segment_id, dst_pos) = resolve_pos(dst_id, node_max_page_len);
let num_nodes = dst_id.index() + 1;
self.resize_chunks_to_num_nodes(num_nodes);

// NOTE(review): same assumption as for the source segment — confirm.
let mut dst_writer = self.nodes.get_mut(dst_segment_id).unwrap().writer();
dst_writer.store_node_id(dst_pos, STATIC_GRAPH_LAYER_ID, GidRef::from(&dst_name));

// Mirror of step 4 on the inbound side, driven by the flags computed there.
if is_new_edge_static {
dst_writer.add_static_inbound_edge(dst_pos, src_id, eid);
}

if is_new_edge_layer {
dst_writer.add_inbound_edge(Some(t), dst_pos, src_id, eid.with_layer(layer_id));
} else {
dst_writer.update_timestamp(t, dst_pos, eid.with_layer(layer_id));
}

drop(dst_writer);

// 6. Grab edge writer and add temporal props & metadata.
let (edge_segment_id, edge_pos) = resolve_pos(eid, edge_max_page_len);
let num_edges = eid.index() + 1;
self.resize_chunks_to_num_edges(num_edges);
let mut edge_writer = self.edges.get_mut(edge_segment_id).unwrap().writer();

// Add edge into the static graph if it doesn't already exist.
if is_new_edge_static {
// NOTE(review): `false` presumably lets the writer count this edge in
// its statistics — confirm against `add_static_edge`.
let already_counted = false;
edge_writer.add_static_edge(Some(edge_pos), src_id, dst_id, already_counted);
}

// Add edge into the specified layer with timestamp and props.
// This call is made whether or not the edge already existed in the layer.
edge_writer.add_edge(t, edge_pos, src_id, dst_id, prop_ids_and_values, layer_id);

Ok(())
}
}
3 changes: 1 addition & 2 deletions db4-storage/src/api/edges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ pub trait EdgeSegmentOps: Send + Sync + std::fmt::Debug + 'static {

fn try_head_mut(&self) -> Option<RwLockWriteGuard<'_, MemEdgeSegment>>;

/// mark segment as dirty without triggering a write
fn mark_dirty(&self);
fn set_dirty(&self, dirty: bool);

/// notify that an edge was added (might need to write to disk)
fn notify_write(
Expand Down
2 changes: 1 addition & 1 deletion db4-storage/src/api/graph_props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ where

fn est_size(&self) -> usize;

fn mark_dirty(&self);
fn set_dirty(&self, dirty: bool);

fn notify_write(
&self,
Expand Down
2 changes: 1 addition & 1 deletion db4-storage/src/api/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub trait NodeSegmentOps: Send + Sync + std::fmt::Debug + 'static {
head_lock: impl DerefMut<Target = MemNodeSegment>,
) -> Result<(), StorageError>;

fn mark_dirty(&self);
fn set_dirty(&self, dirty: bool);

fn check_node(&self, pos: LocalPOS, layer_id: usize) -> bool;

Expand Down
1 change: 1 addition & 0 deletions db4-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub mod persist;
pub mod properties;
pub mod resolver;
pub mod segments;
pub mod transaction;
pub mod utils;
pub mod wal;

Expand Down
51 changes: 28 additions & 23 deletions db4-storage/src/pages/edge_page/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
segments::edge::segment::MemEdgeSegment,
};
use arrow_array::{ArrayRef, BooleanArray};
use raphtory_api::core::entities::{VID, properties::prop::Prop};
use raphtory_api::core::entities::{properties::{meta::STATIC_GRAPH_LAYER_ID, prop::Prop}, VID};
use raphtory_core::{
entities::EID,
storage::timeindex::{AsTime, TimeIndexEntry},
Expand Down Expand Up @@ -45,17 +45,20 @@ impl<'a, MP: DerefMut<Target = MemEdgeSegment> + std::fmt::Debug, ES: EdgeSegmen
dst: VID,
props: impl IntoIterator<Item = (usize, Prop)>,
layer_id: usize,
lsn: u64,
) -> LocalPOS {
let existing_edge = self
let is_new_edge = !self
.page
.contains_edge(edge_pos, layer_id, self.writer.deref());
if !existing_edge {

if is_new_edge {
self.increment_layer_num_edges(layer_id);
}

self.graph_stats.update_time(t.t());

self.writer
.insert_edge_internal(t, edge_pos, src, dst, layer_id, props, lsn);
.insert_edge_internal(t, edge_pos, src, dst, layer_id, props);

edge_pos
}

Expand Down Expand Up @@ -91,7 +94,6 @@ impl<'a, MP: DerefMut<Target = MemEdgeSegment> + std::fmt::Debug, ES: EdgeSegmen
src: VID,
dst: VID,
layer_id: usize,
lsn: u64,
) {
let existing_edge = self
.page
Expand All @@ -101,27 +103,30 @@ impl<'a, MP: DerefMut<Target = MemEdgeSegment> + std::fmt::Debug, ES: EdgeSegmen
}
self.graph_stats.update_time(t.t());
self.writer
.delete_edge_internal(t, edge_pos, src, dst, layer_id, lsn);
.delete_edge_internal(t, edge_pos, src, dst, layer_id);
}

/// Adds a static edge to the graph.
///
/// If `edge_pos` is `None`, a new position is allocated. If `Some`, the provided position
/// is used.
/// Set `already_counted` to `true` when bulk loading to avoid double-counting statistics.
pub fn add_static_edge(
&mut self,
edge_pos: Option<LocalPOS>,
src: impl Into<VID>,
dst: impl Into<VID>,
lsn: u64,
exist: bool, // used when edge_pos is Some but the is not counted, this is used in the bulk loader
already_counted: bool,
) -> LocalPOS {
let layer_id = 0; // assuming layer_id 0 for static edges, adjust as needed

if edge_pos.is_some() && !exist {
if edge_pos.is_some() && !already_counted {
self.page.increment_num_edges();
self.increment_layer_num_edges(layer_id);
self.increment_layer_num_edges(STATIC_GRAPH_LAYER_ID);
}

let edge_pos = edge_pos.unwrap_or_else(|| self.new_local_pos(layer_id));
let edge_pos = edge_pos.unwrap_or_else(|| self.new_local_pos(STATIC_GRAPH_LAYER_ID));
self.writer
.insert_static_edge_internal(edge_pos, src, dst, layer_id, lsn);
.insert_static_edge_internal(edge_pos, src, dst, STATIC_GRAPH_LAYER_ID);

edge_pos
}

Expand All @@ -131,26 +136,26 @@ impl<'a, MP: DerefMut<Target = MemEdgeSegment> + std::fmt::Debug, ES: EdgeSegmen
edge_pos: LocalPOS,
src: VID,
dst: VID,
exists: bool,
edge_exists: bool,
layer_id: usize,
c_props: impl IntoIterator<Item = (usize, Prop)>,
t_props: impl IntoIterator<Item = (usize, Prop)>,
lsn: u64,
) {
if !exists {
self.increment_layer_num_edges(0);
if !edge_exists {
self.increment_layer_num_edges(STATIC_GRAPH_LAYER_ID);
self.increment_layer_num_edges(layer_id);

self.writer
.insert_static_edge_internal(edge_pos, src, dst, STATIC_GRAPH_LAYER_ID);
}

self.writer
.insert_static_edge_internal(edge_pos, src, dst, 0, lsn);
self.graph_stats.update_time(t.t());

self.writer
.update_const_properties(edge_pos, src, dst, layer_id, c_props);

self.graph_stats.update_time(t.t());
self.writer
.insert_edge_internal(t, edge_pos, src, dst, layer_id, t_props, lsn);
.insert_edge_internal(t, edge_pos, src, dst, layer_id, t_props);
}

pub fn segment_id(&self) -> usize {
Expand Down
Loading
Loading