Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9b2748e
remove unused
fabianmurariu Nov 28, 2025
7bde59c
rename pages to segments and refactor some APIs for counting
fabianmurariu Dec 2, 2025
2b7c9f7
added better iterators for new node layout
fabianmurariu Dec 3, 2025
b5df1bb
added first draft of state mapping VID -> value
fabianmurariu Dec 3, 2025
045ecb4
use state index for node state when necessary
fabianmurariu Dec 4, 2025
49852ae
Index is now an enum
fabianmurariu Dec 4, 2025
f430064
changes to task runners to support non contiguous VIDs
fabianmurariu Dec 5, 2025
9b26888
fix test_hits
fabianmurariu Dec 8, 2025
fab89ed
fixes to algorithms and materialize to integrate Index
fabianmurariu Dec 8, 2025
abb695a
fixes for motifs
fabianmurariu Dec 8, 2025
e142476
fixes for dijkstra
fabianmurariu Dec 8, 2025
6a0879f
fixes for algo tests, CCs still remaining
fabianmurariu Dec 8, 2025
74ba68d
refactor edge_loading into a separate module
fabianmurariu Dec 11, 2025
51ff462
refactor load_edges_from_df
fabianmurariu Dec 11, 2025
90b997c
introduce stable ids in parquet loading
fabianmurariu Dec 11, 2025
51bb16e
load_edges_from_df supports non resolution variant
fabianmurariu Dec 12, 2025
991378b
can encode with stable id, decode is broken
fabianmurariu Dec 12, 2025
00409fc
move load nodes in submodule
fabianmurariu Dec 12, 2025
e994b2d
df_loaders refactor progress
fabianmurariu Dec 15, 2025
ab7dd3c
fixes post rebase
fabianmurariu Dec 15, 2025
8f1da08
simple test gets the num nodes correctly
fabianmurariu Dec 17, 2025
02f88c6
fix edge count on parquet decoding
fabianmurariu Dec 17, 2025
0994310
fixes for various counts/comparisons on bulk loading
fabianmurariu Dec 18, 2025
b0eb48e
first end-2-end for stable id encoding passes
fabianmurariu Dec 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ raphtory-core = { version = "0.16.2", path = "raphtory-core", default-features =
raphtory-graphql = { version = "0.16.2", path = "raphtory-graphql", default-features = false }
raphtory-storage = { version = "0.16.2", path = "raphtory-storage", default-features = false }
async-graphql = { version = "7.0.16", features = ["dynamic-schema"] }
bincode = "1.3.3"
bincode = {version = "2", features = ["serde"]}
async-graphql-poem = "7.0.16"
dynamic-graphql = "0.10.1"
derive_more = "2.0.1"
Expand Down Expand Up @@ -100,6 +100,7 @@ num-integer = "0.1"
rand_distr = "0.5.1"
rustc-hash = "2.0.0"
twox-hash = "2.1.0"
tinyvec = { version = "1.10", features = ["serde", "alloc"] }
lock_api = { version = "0.4.11", features = ["arc_lock", "serde"] }
dashmap = { version = "6.0.1", features = ["serde", "rayon"] }
glam = "0.29.0"
Expand Down
31 changes: 13 additions & 18 deletions db4-graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use storage::{
layer_counter::GraphStats,
locked::{
edges::WriteLockedEdgePages, graph_props::WriteLockedGraphPropPages,
nodes::WriteLockedNodePages,
nodes::WriteLockedNodeSegments,
},
},
persist::strategy::{Config, PersistentStrategy},
Expand All @@ -35,7 +35,7 @@ use tempfile::TempDir;
pub struct TemporalGraph<EXT: Config = Extension> {
// mapping between logical and physical ids
pub logical_to_physical: Arc<GIDResolver>,
pub node_count: AtomicUsize,
pub event_counter: AtomicUsize,
storage: Arc<Layer<EXT>>,
graph_dir: Option<GraphDir>,
pub transaction_manager: Arc<TransactionManager>,
Expand Down Expand Up @@ -152,14 +152,13 @@ impl<EXT: PersistentStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>> Temporal

let gid_resolver_dir = path.join("gid_resolver");
let resolver = GIDResolver::new_with_path(&gid_resolver_dir)?;
let node_count = AtomicUsize::new(storage.nodes().num_nodes());
let wal_dir = path.join("wal");
let wal = Arc::new(WalImpl::new(Some(wal_dir))?);

Ok(Self {
graph_dir: Some(path.into()),
event_counter: AtomicUsize::new(resolver.len()),
logical_to_physical: resolver.into(),
node_count,
storage: Arc::new(storage),
transaction_manager: Arc::new(TransactionManager::new(wal.clone())),
wal,
Expand Down Expand Up @@ -205,9 +204,9 @@ impl<EXT: PersistentStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>> Temporal
Ok(Self {
graph_dir,
logical_to_physical,
node_count: AtomicUsize::new(0),
storage: Arc::new(storage),
transaction_manager: Arc::new(TransactionManager::new(wal.clone())),
event_counter: AtomicUsize::new(0),
wal,
})
}
Expand Down Expand Up @@ -375,7 +374,7 @@ pub struct WriteLockedGraph<'a, EXT>
where
EXT: PersistentStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>,
{
pub nodes: WriteLockedNodePages<'a, storage::NS<EXT>>,
pub nodes: WriteLockedNodeSegments<'a, storage::NS<EXT>>,
pub edges: WriteLockedEdgePages<'a, storage::ES<EXT>>,
pub graph_props: WriteLockedGraphPropPages<'a, storage::GS<EXT>>,
pub graph: &'a TemporalGraph<EXT>,
Expand All @@ -397,21 +396,17 @@ impl<'a, EXT: PersistentStrategy<NS = NS<EXT>, ES = ES<EXT>, GS = GS<EXT>>>
self.graph
}

pub fn resize_chunks_to_num_nodes(&mut self, num_nodes: usize) {
if num_nodes == 0 {
return;
pub fn resize_chunks_to_num_nodes(&mut self, max_vid: Option<VID>) {
if let Some(max_vid) = max_vid {
let (chunks_needed, _) = self.graph.storage.nodes().resolve_pos(max_vid);
self.graph.storage().nodes().grow(chunks_needed + 1);
std::mem::take(&mut self.nodes);
self.nodes = self.graph.storage.nodes().write_locked();
}
let (chunks_needed, _) = self.graph.storage.nodes().resolve_pos(VID(num_nodes - 1));
self.graph.storage().nodes().grow(chunks_needed + 1);
std::mem::take(&mut self.nodes);
self.nodes = self.graph.storage.nodes().write_locked();
}

pub fn resize_chunks_to_num_edges(&mut self, num_edges: usize) {
if num_edges == 0 {
return;
}
let (chunks_needed, _) = self.graph.storage.edges().resolve_pos(EID(num_edges - 1));
pub fn resize_chunks_to_num_edges(&mut self, max_eid: EID) {
let (chunks_needed, _) = self.graph.storage.edges().resolve_pos(max_eid);
self.graph.storage().edges().grow(chunks_needed + 1);
std::mem::take(&mut self.edges);
self.edges = self.graph.storage.edges().write_locked();
Expand Down
1 change: 1 addition & 0 deletions db4-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ itertools.workspace = true
thiserror.workspace = true
roaring.workspace = true
sysinfo.workspace = true
tinyvec.workspace = true
proptest = { workspace = true, optional = true }
tempfile = { workspace = true, optional = true }
iter-enum = { workspace = true, features = ["rayon"] }
Expand Down
6 changes: 5 additions & 1 deletion db4-storage/src/api/edges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub trait EdgeSegmentOps: Send + Sync + std::fmt::Debug + 'static {

fn t_len(&self) -> usize;
fn num_layers(&self) -> usize;
// Persistent layer count, not used for up to date counts
fn layer_count(&self, layer_id: usize) -> u32;

fn load(
Expand Down Expand Up @@ -67,7 +68,10 @@ pub trait EdgeSegmentOps: Send + Sync + std::fmt::Debug + 'static {
head_lock: impl DerefMut<Target = MemEdgeSegment>,
) -> Result<(), StorageError>;

fn increment_num_edges(&self) -> u32;
fn increment_num_edges(&self) -> u32 {
self.edges_counter()
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}

fn contains_edge(
&self,
Expand Down
50 changes: 35 additions & 15 deletions db4-storage/src/api/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ use std::{
borrow::Cow,
ops::{Deref, DerefMut, Range},
path::{Path, PathBuf},
sync::Arc,
sync::{Arc, atomic::AtomicU32},
};

use rayon::prelude::*;

use crate::{
LocalPOS,
error::StorageError,
gen_ts::LayerIter,
pages::node_store::increment_and_clamp,
segments::node::segment::MemNodeSegment,
utils::{Iter2, Iter3, Iter4},
};
Expand All @@ -47,12 +50,6 @@ pub trait NodeSegmentOps: Send + Sync + std::fmt::Debug + 'static {

fn t_len(&self) -> usize;

fn event_id(&self) -> i64;

fn increment_event_id(&self, i: i64);

fn decrement_event_id(&self) -> i64;

fn load(
page_id: usize,
node_meta: Arc<Meta>,
Expand Down Expand Up @@ -81,14 +78,6 @@ pub trait NodeSegmentOps: Send + Sync + std::fmt::Debug + 'static {

fn try_head_mut(&self) -> Option<RwLockWriteGuard<'_, MemNodeSegment>>;

fn num_nodes(&self) -> u32 {
self.layer_count(0)
}

fn num_layers(&self) -> usize;

fn layer_count(&self, layer_id: usize) -> u32;

fn notify_write(
&self,
head_lock: impl DerefMut<Target = MemNodeSegment>,
Expand Down Expand Up @@ -128,14 +117,45 @@ pub trait NodeSegmentOps: Send + Sync + std::fmt::Debug + 'static {
&self,
locked_head: impl DerefMut<Target = MemNodeSegment>,
) -> Result<(), StorageError>;

fn nodes_counter(&self) -> &AtomicU32;

fn increment_num_nodes(&self, max_page_len: u32) {
increment_and_clamp(self.nodes_counter(), max_page_len);
}

fn num_nodes(&self) -> u32 {
self.nodes_counter()
.load(std::sync::atomic::Ordering::Relaxed)
}

fn num_layers(&self) -> usize;

fn layer_count(&self, layer_id: usize) -> u32;
}

pub trait LockedNSSegment: std::fmt::Debug + Send + Sync {
type EntryRef<'a>: NodeRefOps<'a>
where
Self: 'a;

fn num_nodes(&self) -> u32;

fn entry_ref<'a>(&'a self, pos: impl Into<LocalPOS>) -> Self::EntryRef<'a>;

fn iter_entries<'a>(&'a self) -> impl Iterator<Item = Self::EntryRef<'a>> + Send + Sync + 'a {
let num_nodes = self.num_nodes();
(0..num_nodes).map(move |vid| self.entry_ref(LocalPOS(vid)))
}

fn par_iter_entries<'a>(
&'a self,
) -> impl ParallelIterator<Item = Self::EntryRef<'a>> + Send + Sync + 'a {
let num_nodes = self.num_nodes();
(0..num_nodes)
.into_par_iter()
.map(move |vid| self.entry_ref(LocalPOS(vid)))
}
}

pub trait NodeEntryOps<'a>: Send + Sync + 'a {
Expand Down
Loading
Loading