From 127d1a02fb451ca2850c365b29a5bcfedd2860ea Mon Sep 17 00:00:00 2001 From: Taleh Ibrahimli Date: Thu, 12 Feb 2026 11:16:07 +0100 Subject: [PATCH 1/7] improve Readme.md --- README.md | 134 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 79 insertions(+), 55 deletions(-) diff --git a/README.md b/README.md index 0f7cc81..2bb5001 100644 --- a/README.md +++ b/README.md @@ -1,52 +1,55 @@ # Roda -Ultra‑high‑performance, low‑latency state computer for real‑time analytics and trading systems. Roda lets you build -deterministic streaming pipelines with cache‑friendly dataflows, wait‑free reads, and explicit memory bounds — ideal for +Ultra-high-performance, low-latency state computer for real-time analytics and trading systems. Roda lets you build +deterministic streaming pipelines with cache-friendly dataflows, wait-free reads, and explicit memory bounds—ideal for HFT, market microstructure research, telemetry, and any workload where microseconds matter. -> Status: early design and API preview. Examples and tests illustrate the intended DX. Expect rapid iteration and +> Status: Early design and API preview. Examples and tests illustrate the intended DX. Expect rapid iteration and > breaking changes. --- ## Why Roda? -- Deterministic performance: explicit store sizes, preallocated ring buffers, back‑pressure free write path by design - goals. -- Low latency by construction: reader APIs are designed for zero/constant allocations and predictable access patterns. -- Declarative pipelines: express processing in terms of partitions, reductions, and sliding windows. -- Indexable state: build direct indexes for O(1) lookups into rolling state. -- Simple concurrency model: long‑lived workers with single‑writer/multi‑reader patterns. - -## Core concepts - -- Engine: orchestrates workers (long‑lived tasks) that advance your pipelines. -- Store: a bounded, cache‑friendly ring buffer that holds your state. You choose the capacity up front. - - push(value): append a new item (typically by a single writer thread) - - reader(): returns a `StoreReader` view appropriate for consumers - - direct_index(): build a secondary index over the store -- StoreReader: a cursor‑based handle for consuming state from a `Store`. - - next(): advance the cursor to the next available item - - get(), get_at(at), get_last(): retrieve a copy of the state - - get_window::(at): retrieve a fixed‑size window of state - - with(|state| ...), with_at(at, |state| ...), with_last(|state| ...): execute a closure with a borrowed reference -- Aggregator: a partitioned reducer for turning event streams into rolling state. - - from(&reader): set the input source - - to(&mut store): set the output target - - partition_by(|in| Key): assign each input to a partition - - reduce(|idx, in, out| ...): merge an input into the current output for its partition; idx is 0‑based within the - partition window -- Window: a fixed‑size sliding window over the input store. - - from(&reader): set the input source - - to(&mut store): set the output target - - reduce(window_size, |window: &[In]| -> Option): compute optional output when the window is advanced -- DirectIndex: build and query secondary indexes over a store for O(1) state lookups. - - compute(|value| Key): manually update the index for the next available item in the store (typically called inside a worker) +- Deterministic performance: Explicit store sizes, preallocated ring buffers, back-pressure free write path by design goals. +- Low latency by construction: Reader APIs are designed for zero/constant allocations and predictable access patterns. +- Declarative pipelines: Express processing in terms of partitions, reductions, and sliding windows. +- Indexable state: Build direct indexes for O(1) lookups into rolling state. +- Simple concurrency model: Long-lived workers with single-writer/multi-reader patterns. + +## Core Concepts + +- **Engine:** Orchestrates workers (long-lived tasks) that advance your pipelines. +- **Store:** A bounded, cache-friendly ring buffer that holds your state. You choose the capacity up front. + - `push(value)`: Append a new item (typically by a single writer thread). + - `reader()`: Returns a `StoreReader` view appropriate for consumers. + - `direct_index()`: Build a secondary index over the store. +- **StoreReader:** A cursor-based handle for consuming state from a `Store`. + - `next()`: Advance the cursor to the next available item. + - `get()`, `get_at(at)`, `get_last()`: Retrieve a copy of the state. + - `get_window::(at)`: Retrieve a fixed-size window of state. + - `with(|state| ...)`, `with_at(at, |state| ...)`, `with_last(|state| ...)`: Execute a closure with a borrowed reference. +- **Aggregator:** A partitioned reducer for turning event streams into rolling state. + - `from(&reader)`: Set the input source. + - `to(&mut store)`: Set the output target. + - `partition_by(|in| Key)`: Assign each input to a partition. + - `reduce(|idx, in, out| ...)`: Merge an input into the current output for its partition; `idx` is 0-based within the partition window. +- **Window:** A fixed-size sliding window over the input store. + - `from(&reader)`: Set the input source. + - `to(&mut store)`: Set the output target. + - `reduce(window_size, |window: &[In]| -> Option)`: Compute optional output when the window is advanced. +- **DirectIndex:** Build and query secondary indexes over a store for O(1) state lookups. + - `compute(|value| Key)`: Manually update the index for the next available item in the store (typically called inside a worker). --- For a deep dive into Roda's memory model, zero-copy internals, and execution patterns, see [DESIGN.md](DESIGN.md). +- **Shared-Nothing Strategy:** While data is shared for efficiency, workers maintain independent logic and state to avoid contention. +- **Microsecond Precision:** Built specifically for systems where every microsecond of jitter impacts the bottom line. +- **Cache-Friendly:** Data layout is optimized for CPU cache lines, minimizing cache misses during pipeline execution. +- **Built-in Indexing:** O(1) secondary lookups without the overhead of a general-purpose database. + ## Architecture at a Glance Roda is designed as a **Shared-Memory, Single-Writer Multi-Reader (SWMR)** system: @@ -55,26 +58,39 @@ Roda is designed as a **Shared-Memory, Single-Writer Multi-Reader (SWMR)** syste - **Deterministic:** Memory is pre-allocated; no allocations on the hot path. - **Declarative:** Pipelines are built by connecting `Store`, `Aggregator`, and `Window` primitives. -## Quick start +## Features -Using the crate directly: +- **Blazing Fast:** Designed for microsecond-level latency using memory-mapped ring buffers. +- **Zero-Copy:** Data is borrowed directly from shared memory regions; no unnecessary allocations on the hot path. +- **Lock-Free:** Single-Writer Multi-Reader (SWMR) pattern with atomic coordination. +- **Deterministic:** Explicit memory management and pre-allocated stores prevent GC pauses or unexpected heap allocations. +- **Declarative API:** Build complex data processing pipelines using `Aggregator`, `Window`, and `Index` primitives. -```bash -# Run the end‑to‑end example -cargo run --example hello_world +## Quick Start + +Add `roda-state` to your `Cargo.toml`: + +```toml +[dependencies] +roda-state = "0.1" ``` -Or add roda‑state to your own Cargo.toml while working from this repository: +Or if you're working from this repository: ```toml [dependencies] roda-state = { path = "." } ``` -## Example: from ticks to OHLC to trading signals +Run the example: + +```bash +cargo run --example hello_world +``` + +## Example: From Ticks to OHLC to Trading Signals -Below is a trimmed version of examples/hello_world.rs that demonstrates a two‑stage pipeline: aggregate ticks into OHLC -candles, then derive a simple momentum signal via a sliding window. +Below is a trimmed version of `examples/hello_world.rs` that demonstrates a two-stage pipeline: aggregate ticks into OHLC candles, then derive a simple momentum signal via a sliding window. ```rust use bytemuck::{Pod, Zeroable}; @@ -86,7 +102,7 @@ use roda_state::{Aggregator, RodaEngine, Window}; struct Tick { symbol: u64, price: f64, - timestamp: u64 + timestamp: u64, } #[repr(C)] @@ -97,7 +113,7 @@ struct OHLC { high: f64, low: f64, close: f64, - timestamp: u64 + timestamp: u64, } #[repr(C)] @@ -106,13 +122,14 @@ struct Signal { symbol: u64, timestamp: u64, direction: i32, - size: u32 + size: u32, } -#[derive(Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, Pod, Zeroable)] +#[repr(C)] struct TimeKey { symbol: u64, - timestamp: u64 + timestamp: u64, } fn main() { @@ -132,13 +149,16 @@ fn main() { let mut ohlc_pipeline: Aggregator = Aggregator::new(); let mut strategy_pipeline: Window = Window::new(); - // Worker 1: aggregate ticks → OHLC and maintain index + // Worker 1: aggregate ticks -> OHLC and maintain index engine.run_worker(move || { tick_reader.next(); ohlc_pipeline .from(&tick_reader) .to(&mut ohlc_store) - .partition_by(|t| TimeKey { symbol: t.symbol, timestamp: t.timestamp / 100_000 }) + .partition_by(|t| TimeKey { + symbol: t.symbol, + timestamp: t.timestamp / 100_000 + }) .reduce(|i, t, c| { if i == 0 { c.open = t.price; @@ -153,10 +173,14 @@ fn main() { c.close = t.price; } }); - ohlc_index.compute(|c| TimeKey { symbol: c.symbol, timestamp: c.timestamp / 100_000 }); + + ohlc_index.compute(|c| TimeKey { + symbol: c.symbol, + timestamp: c.timestamp / 100_000 + }); }); - // Worker 2: 2‑bar momentum signal + // Worker 2: 2-bar momentum signal engine.run_worker(move || { ohlc_reader.next(); strategy_pipeline @@ -176,14 +200,14 @@ fn main() { } ``` -Explore the full example in examples/hello_world.rs for more context. +Explore the full example in `examples/hello_world.rs` for more context. ## Contributing Contributions are welcome! If you have ideas, issues, or benchmarks: -- Open an issue to discuss the use‑case and constraints -- Keep PRs focused and measured; include micro‑benchmarks when changing hot paths +- Open an issue to discuss the use-case and constraints +- Keep PRs focused and measured; include micro-benchmarks when changing hot paths - Follow the existing code style and formatting ## License From 8db7be1e4c465088fd41df1d776b597b9a74d767 Mon Sep 17 00:00:00 2001 From: Taleh Ibrahimli Date: Thu, 12 Feb 2026 19:06:11 +0100 Subject: [PATCH 2/7] improvements --- README.md | 20 +++++-- examples/hello_world.rs | 20 +++++-- src/aggregator.rs | 16 +++--- src/components.rs | 15 ++++- src/engine.rs | 23 +++++--- src/lib.rs | 3 +- src/storage/mmap_journal.rs | 102 +++++++++++++++++++++++++++++++++ src/storage/mod.rs | 1 + src/store.rs | 111 +++++++++++++++++++++++++++--------- src/window.rs | 7 +-- tests/aggregator_tests.rs | 30 +++++----- tests/index_tests.rs | 26 ++++----- tests/logic_tests.rs | 60 +++++++++++++++++++ tests/push_read_tests.rs | 43 ++++++++++---- tests/window_tests.rs | 26 ++++----- 15 files changed, 392 insertions(+), 111 deletions(-) create mode 100644 src/storage/mmap_journal.rs create mode 100644 src/storage/mod.rs create mode 100644 tests/logic_tests.rs diff --git a/README.md b/README.md index 2bb5001..020f815 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ Below is a trimmed version of `examples/hello_world.rs` that demonstrates a two- ```rust use bytemuck::{Pod, Zeroable}; -use roda_state::components::{Index, Store, StoreReader}; +use roda_state::components::{Engine, Index, Store, StoreOptions, StoreReader}; use roda_state::{Aggregator, RodaEngine, Window}; #[repr(C)] @@ -136,11 +136,23 @@ fn main() { let engine = RodaEngine::new(); // Allocate bounded stores (explicit memory profile) - let tick_store = engine.store::(1_000_000); + let tick_store = engine.store::(StoreOptions { + name: "ticks", + size: 1_000_000, + in_memory: true, + }); let tick_reader = tick_store.reader(); - let mut ohlc_store = engine.store::(10_000); + let mut ohlc_store = engine.store::(StoreOptions { + name: "ohlc", + size: 10_000, + in_memory: true, + }); let ohlc_reader = ohlc_store.reader(); - let mut signal_store = engine.store::(10_000); + let mut signal_store = engine.store::(StoreOptions { + name: "signals", + size: 10_000, + in_memory: true, + }); // Index to locate candles by (symbol, time) let ohlc_index = ohlc_store.direct_index::(); diff --git a/examples/hello_world.rs b/examples/hello_world.rs index 0ab2f7e..7db05a9 100644 --- a/examples/hello_world.rs +++ b/examples/hello_world.rs @@ -1,5 +1,5 @@ use bytemuck::{Pod, Zeroable}; -use roda_state::components::{Index, Store, StoreReader}; +use roda_state::components::{Engine, Index, Store, StoreOptions, StoreReader}; use roda_state::{Aggregator, RodaEngine, Window}; use std::cmp::min; // ============================================================================== @@ -49,11 +49,23 @@ fn main() { let engine = RodaEngine::new(); // A. RESOURCES - let tick_store = engine.store::(1_000_000); + let tick_store = engine.store::(StoreOptions { + name: "ticks", + size: 1_000_000, + in_memory: true, + }); let tick_reader = tick_store.reader(); - let mut ohlc_store = engine.store::(10_000); + let mut ohlc_store = engine.store::(StoreOptions{ + name: "ohlc", + size: 10_000, + in_memory: true, + }); let ohlc_reader = ohlc_store.reader(); - let mut simple_strategy = engine.store::(10_000); + let mut simple_strategy = engine.store::(StoreOptions{ + name: "simple_strategy", + size: 10_000, + in_memory: true, + }); // The Index tracks where specific candles live in the ring buffer let ohlc_index = ohlc_store.direct_index::(); diff --git a/src/aggregator.rs b/src/aggregator.rs index 6900a9c..b0b1e7b 100644 --- a/src/aggregator.rs +++ b/src/aggregator.rs @@ -1,4 +1,4 @@ -use crate::store::{CircularRodaStore, CircularRodaStoreReader}; +use crate::components::{Store, StoreReader}; use bytemuck::Pod; use std::marker::PhantomData; @@ -8,19 +8,19 @@ pub struct Aggregator { pub(crate) _partition_key: PhantomData, } -impl Aggregator { +impl Aggregator { pub fn to( &self, - _p0: &mut CircularRodaStore, + _p0: &mut impl Store, ) -> Aggregator { todo!() } } -impl Aggregator { +impl Aggregator { pub fn from( &self, - _p0: &CircularRodaStoreReader, + _p0: &impl StoreReader, ) -> Aggregator { todo!() } @@ -44,8 +44,10 @@ impl Default } } -impl Aggregator { - pub fn pipe(_source: CircularRodaStore, _target: CircularRodaStore) -> Self { +impl + Aggregator +{ + pub fn pipe(_source: impl Store, _target: impl Store) -> Self { Self { _v: Default::default(), _out_v: Default::default(), diff --git a/src/components.rs b/src/components.rs index c6f9607..8d5ed64 100644 --- a/src/components.rs +++ b/src/components.rs @@ -1,14 +1,25 @@ use crate::index::DirectIndex; use bytemuck::Pod; -pub trait Store { +pub struct StoreOptions { + pub name: &'static str, + pub size: usize, + pub in_memory: bool, +} + +pub trait Engine { + fn run_worker(&self, runnable: impl FnMut() + Send + 'static); + fn store(&self, options: StoreOptions) -> impl Store + 'static; +} + +pub trait Store: Send { type Reader: StoreReader; fn push(&mut self, state: State); fn reader(&self) -> Self::Reader; fn direct_index(&self) -> DirectIndex; } -pub trait StoreReader { +pub trait StoreReader: Send { fn next(&self) -> bool; fn with(&self, handler: impl FnOnce(&State) -> R) -> Option; diff --git a/src/engine.rs b/src/engine.rs index e382c38..4c868a9 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,28 +1,33 @@ -use crate::store::CircularRodaStore; +use crate::components::{Engine, Store, StoreOptions}; +use crate::store::CircularStore; use bytemuck::Pod; use std::thread; -pub struct RodaEngine {} +pub struct RodaEngine { + root_path: &'static str, +} -impl RodaEngine { - pub fn run_worker(&self, mut runnable: impl FnMut() + Send + 'static) { +impl Engine for RodaEngine { + fn run_worker(&self, mut runnable: impl FnMut() + Send + 'static) { thread::spawn(move || { loop { runnable(); } }); } -} -impl RodaEngine { - pub fn store(&self, _size: u32) -> CircularRodaStore { - todo!() + fn store(&self, options: StoreOptions) -> impl Store + 'static { + CircularStore::new(self.root_path, options) } } impl RodaEngine { pub fn new() -> Self { - Self {} + Self { root_path: "data" } + } + + pub fn new_with_root_path(root_path: &'static str) -> Self { + Self { root_path } } } diff --git a/src/lib.rs b/src/lib.rs index b50c89a..8b678dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,9 +4,10 @@ pub mod engine; pub mod index; pub mod store; pub mod window; +mod storage; pub use crate::aggregator::Aggregator; pub use crate::engine::RodaEngine; pub use crate::index::{DirectIndex, RodaDirectIndexReader}; -pub use crate::store::{CircularRodaStore, CircularRodaStoreReader}; +pub use crate::store::{CircularStore, CircularStoreReader}; pub use crate::window::Window; diff --git a/src/storage/mmap_journal.rs b/src/storage/mmap_journal.rs new file mode 100644 index 0000000..5e5d400 --- /dev/null +++ b/src/storage/mmap_journal.rs @@ -0,0 +1,102 @@ +use bytemuck::Pod; +use memmap2::{MmapMut, MmapOptions}; +use std::fs::OpenOptions; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; + +pub(crate) struct MmapJournal { + ptr: *mut u8, + len: usize, + write_index: Arc, +} + +impl MmapJournal { + /// CREATE: Creates a brand new file, truncating any existing data. + pub fn new(path: Option, total_size: usize) -> Result { + let mut mmap = if let Some(p) = &path { + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(p)?; + + file.set_len(total_size as u64)?; + unsafe { MmapOptions::new().map_mut(&file)? } + } else { + MmapOptions::new().len(total_size).map_anon()? + }; + + let ptr = mmap.as_mut_ptr(); + let len = mmap.len(); + Ok(Self { + ptr, + len, + write_index: Arc::new(Default::default()), + }) + } + + /// OPEN: Loads an existing file and maps its current size. + pub fn load(path: PathBuf) -> Result { + let file = OpenOptions::new().read(true).write(true).open(&path)?; + + let mut mmap = unsafe { MmapOptions::new().map_mut(&file)? }; + + let ptr = mmap.as_mut_ptr(); + let len = mmap.len(); + Ok(Self { + ptr, + len, + write_index: Arc::new(Default::default()), + }) + } + + // --- Bytemuck Methods --- + + /// 1. Read (Immutable) + /// Casts bytes at offset to a reference of T. + pub fn read(&self, offset: usize) -> &T { + let end = offset + size_of::(); + assert!(offset + size_of::() <= self.len); + bytemuck::from_bytes(&self.slice()[offset..end]) + } + + pub fn append(&mut self, state: &T) { + let offset = self.write_index.load(std::sync::atomic::Ordering::Relaxed); + let size = std::mem::size_of::(); + let end = offset + size; + + let dest_slice = self.slice_mut(); + + // 1. Check for buffer overflow + assert!(end <= dest_slice.len(), "Append exceeds buffer capacity!"); + + // 2. Perform the write + dest_slice[offset..end].copy_from_slice(bytemuck::bytes_of(state)); + + self.write_index.store(end, std::sync::atomic::Ordering::Release); + } + + fn slice(&self) -> &[u8] { + unsafe { std::slice::from_raw_parts(self.ptr, self.len) } + } + + fn slice_mut(&mut self) -> &mut [u8] { + unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) } + } + + pub(crate) fn get_write_index(&self) -> usize { + self.write_index.load(std::sync::atomic::Ordering::Acquire) + } + + pub(crate) fn reader(&self) -> MmapJournal { + MmapJournal { + ptr: self.ptr, + len: self.len, + write_index: self.write_index.clone(), + } + } +} + +unsafe impl Send for MmapJournal {} diff --git a/src/storage/mod.rs b/src/storage/mod.rs new file mode 100644 index 0000000..c83b146 --- /dev/null +++ b/src/storage/mod.rs @@ -0,0 +1 @@ +pub mod mmap_journal; diff --git a/src/store.rs b/src/store.rs index 6967255..0279f2f 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,62 +1,119 @@ -use crate::components::{Store, StoreReader}; +use crate::components::{Store, StoreOptions, StoreReader}; use crate::index::DirectIndex; +use crate::storage::mmap_journal::MmapJournal; use bytemuck::Pod; -use std::marker::PhantomData; +use std::cell::Cell; +use std::path::PathBuf; -pub struct CircularRodaStore { - pub(crate) _p: PhantomData, +pub struct CircularStore { + storage: MmapJournal, } -pub struct CircularRodaStoreReader { - pub(crate) _p: PhantomData, +pub struct CircularStoreReader { + next_index: Cell, + storage: MmapJournal, } -impl Store for CircularRodaStore { - type Reader = CircularRodaStoreReader; +impl CircularStore { + pub fn new(root_path: &'static str, option: StoreOptions) -> Self { + let storage = if option.in_memory { + MmapJournal::new(None, option.size).unwrap() + } else { + let path: PathBuf = format!("{}/{}.store", root_path, option.name).into(); + if path.exists() { + MmapJournal::load(path).unwrap() + } else { + MmapJournal::new(Some(path), option.size).unwrap() + } + }; - fn push(&mut self, _state: State) { - todo!() + Self { storage } + } +} + +impl Store for CircularStore { + type Reader = CircularStoreReader; + + fn push(&mut self, state: State) { + self.storage.append(&state); } - fn reader(&self) -> CircularRodaStoreReader { - todo!() + fn reader(&self) -> CircularStoreReader { + CircularStoreReader { + next_index: Cell::new(0), + storage: self.storage.reader(), + } } fn direct_index(&self) -> DirectIndex { - todo!() + DirectIndex { + _k: std::marker::PhantomData, + _v: std::marker::PhantomData, + } } } -impl StoreReader for CircularRodaStoreReader { +impl StoreReader for CircularStoreReader { fn next(&self) -> bool { - todo!() + let index_to_read = self.next_index.get(); + let offset = index_to_read * size_of::(); + if offset + size_of::() > self.storage.get_write_index() { + return false; + } + self.next_index.set(index_to_read + 1); + + true } - fn with(&self, _handler: impl FnOnce(&State) -> R) -> Option { - todo!() + fn with(&self, handler: impl FnOnce(&State) -> R) -> Option { + let next_index = self.next_index.get(); + if next_index == 0 { + return None; + } + let current_index = next_index - 1; + let offset = current_index * size_of::(); + Some(handler(self.storage.read(offset))) } - fn with_at(&self, _at: usize, _handler: impl FnOnce(&State) -> R) -> Option { - todo!() + fn with_at(&self, at: usize, handler: impl FnOnce(&State) -> R) -> Option { + let offset = at * size_of::(); + if offset + size_of::() > self.storage.get_write_index() { + return None; + } + Some(handler(self.storage.read(offset))) } - fn with_last(&self, _handler: impl FnOnce(&State) -> R) -> Option { - todo!() + fn with_last(&self, handler: impl FnOnce(&State) -> R) -> Option { + let write_index = self.storage.get_write_index(); + if write_index < size_of::() { + return None; + } + let offset = write_index - size_of::(); + Some(handler(self.storage.read(offset))) } fn get(&self) -> Option { - todo!() + self.with(|s| *s) } - fn get_at(&self, _at: usize) -> Option { - todo!() + fn get_at(&self, at: usize) -> Option { + self.with_at(at, |s| *s) } fn get_last(&self) -> Option { - todo!() + self.with_last(|s| *s) } - fn get_window(&self, _at: usize) -> Option<[State; N]> { - todo!() + fn get_window(&self, at: usize) -> Option<[State; N]> { + let offset = at * size_of::(); + if offset + size_of::() * N > self.storage.get_write_index() { + return None; + } + + let mut window = Vec::with_capacity(N); + for i in 0..N { + window.push(*self.storage.read::(offset + i * size_of::())); + } + window.try_into().ok() } } diff --git a/src/window.rs b/src/window.rs index 2c0aa9a..907f763 100644 --- a/src/window.rs +++ b/src/window.rs @@ -1,5 +1,4 @@ use crate::components::{Store, StoreReader}; -use crate::store::CircularRodaStore; use bytemuck::Pod; use std::marker::PhantomData; @@ -8,7 +7,7 @@ pub struct Window { pub(crate) _out_v: PhantomData, } -impl Window { +impl Window { pub fn from>( &self, _reader: &Reader, @@ -36,8 +35,8 @@ impl Default for Window { } } -impl Window { - pub fn pipe(source: impl StoreReader, target: CircularRodaStore) -> Self { +impl Window { + pub fn pipe(source: impl StoreReader, target: impl Store) -> Self { let _ = source; let _ = target; Self { diff --git a/tests/aggregator_tests.rs b/tests/aggregator_tests.rs index 4e936ea..0a87162 100644 --- a/tests/aggregator_tests.rs +++ b/tests/aggregator_tests.rs @@ -1,5 +1,5 @@ use bytemuck::{Pod, Zeroable}; -use roda_state::components::{Store, StoreReader}; +use roda_state::components::{Engine, Store, StoreOptions, StoreReader}; use roda_state::{Aggregator, RodaEngine}; #[repr(C)] @@ -31,8 +31,8 @@ pub struct GroupKey { #[ignore] fn test_aggregator_count_and_sum() { let engine = RodaEngine::new(); - let mut source = engine.store::(1024); - let mut target = engine.store::(1024); + let mut source = engine.store::(StoreOptions { name: "source", size: 1024, in_memory: true }); + let mut target = engine.store::(StoreOptions { name: "target", size: 1024, in_memory: true }); let source_reader = source.reader(); let target_reader = target.reader(); @@ -75,8 +75,8 @@ fn test_aggregator_count_and_sum() { #[ignore] fn test_aggregator_min_max_tracking() { let engine = RodaEngine::new(); - let mut source = engine.store::(1024); - let mut target = engine.store::(1024); + let mut source = engine.store::(StoreOptions { name: "source", size: 1024, in_memory: true }); + let mut target = engine.store::(StoreOptions { name: "target", size: 1024, in_memory: true }); let source_reader = source.reader(); let target_reader = target.reader(); @@ -128,8 +128,8 @@ fn test_aggregator_min_max_tracking() { #[ignore] fn test_aggregator_multiple_partitions() { let engine = RodaEngine::new(); - let mut source = engine.store::(1024); - let mut target = engine.store::(1024); + let mut source = engine.store::(StoreOptions { name: "source", size: 1024, in_memory: true }); + let mut target = engine.store::(StoreOptions { name: "target", size: 1024, in_memory: true }); let source_reader = source.reader(); let target_reader = target.reader(); @@ -179,8 +179,8 @@ fn test_aggregator_multiple_partitions() { #[ignore] fn test_aggregator_complex_key() { let engine = RodaEngine::new(); - let mut source = engine.store::(1024); - let mut target = engine.store::(1024); + let mut source = engine.store::(StoreOptions { name: "source", size: 1024, in_memory: true }); + let mut target = engine.store::(StoreOptions { name: "target", size: 1024, in_memory: true }); let source_reader = source.reader(); let target_reader = target.reader(); @@ -217,8 +217,8 @@ fn test_aggregator_complex_key() { #[ignore] fn test_aggregator_reset_behavior() { let engine = RodaEngine::new(); - let mut source = engine.store::(10); - let mut target = engine.store::(10); + let mut source = engine.store::(StoreOptions { name: "source", size: 10, in_memory: true }); + let mut target = engine.store::(StoreOptions { name: "target", size: 10, in_memory: true }); let source_reader = source.reader(); let target_reader = target.reader(); @@ -267,8 +267,8 @@ fn test_aggregator_reset_behavior() { #[ignore] fn test_aggregator_large_index() { let engine = RodaEngine::new(); - let mut source = engine.store::(1024); - let mut target = engine.store::(1024); + let mut source = engine.store::(StoreOptions { name: "source", size: 1024, in_memory: true }); + let mut target = engine.store::(StoreOptions { name: "target", size: 1024, in_memory: true }); let source_reader = source.reader(); let target_reader = target.reader(); let mut aggregator: Aggregator = Aggregator::new(); @@ -309,8 +309,8 @@ fn test_aggregator_worker_large() { use std::time::Duration; let engine = RodaEngine::new(); - let mut source = engine.store::(2000); - let mut target = engine.store::(2000); + let mut source = engine.store::(StoreOptions { name: "source", size: 2000, in_memory: true }); + let mut target = engine.store::(StoreOptions { name: "target", size: 2000, in_memory: true }); let source_reader = source.reader(); let target_reader = target.reader(); diff --git a/tests/index_tests.rs b/tests/index_tests.rs index 864ff10..eec757d 100644 --- a/tests/index_tests.rs +++ b/tests/index_tests.rs @@ -1,6 +1,6 @@ use bytemuck::{Pod, Zeroable}; use roda_state::RodaEngine; -use roda_state::components::{Index, IndexReader, Store}; +use roda_state::components::{Engine, Index, IndexReader, Store, StoreOptions}; use std::thread; use std::time::Duration; @@ -15,7 +15,7 @@ struct ComplexKey { #[ignore] fn test_index_multiple_values() { let engine = RodaEngine::new(); - let mut store = engine.store::(1024); + let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); let index = store.direct_index::(); for i in 0..5 { @@ -37,7 +37,7 @@ fn test_index_multiple_values() { #[ignore] fn test_multiple_indices_on_same_store() { let engine = RodaEngine::new(); - let mut store = engine.store::(1024); + let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); let index_double = store.direct_index::(); let index_triple = store.direct_index::(); @@ -58,7 +58,7 @@ fn test_multiple_indices_on_same_store() { #[ignore] fn test_index_complex_key() { let engine = RodaEngine::new(); - let mut store = engine.store::(1024); + let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); let index = store.direct_index::(); store.push(100); @@ -88,7 +88,7 @@ fn test_index_complex_key() { #[ignore] fn test_index_shallow_clone_sharing() { let engine = RodaEngine::new(); - let mut store = engine.store::(1024); + let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); let index = store.direct_index::(); let clone1 = index.reader(); let clone2 = index.reader(); @@ -104,7 +104,7 @@ fn test_index_shallow_clone_sharing() { #[ignore] fn test_index_collision_overwrite() { let engine = RodaEngine::new(); - let mut store = engine.store::(1024); + let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); let index = store.direct_index::(); // Both 10 and 20 will map to key 1 @@ -123,7 +123,7 @@ fn test_index_collision_overwrite() { #[ignore] fn test_index_not_found() { let engine = RodaEngine::new(); - let mut store = engine.store::(1024); + let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); let index = store.direct_index::(); store.push(10); @@ -138,7 +138,7 @@ fn test_index_not_found() { #[ignore] fn test_concurrent_push_and_index() { let engine = RodaEngine::new(); - let mut store = engine.store::(1024); + let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); let index = store.direct_index::(); let index_reader = index.reader(); @@ -168,8 +168,8 @@ fn test_concurrent_push_and_index() { #[ignore] fn test_run_worker_with_multiple_stores() { let engine = RodaEngine::new(); - let mut store_u32 = engine.store::(1024); - let mut store_string = engine.store::<[u8; 16]>(1024); + let mut store_u32 = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); + let mut store_string = engine.store::<[u8; 16]>(StoreOptions { name: "test", size: 1024, in_memory: true }); let index_u32 = store_u32.direct_index::(); let index_string = store_string.direct_index::(); @@ -187,7 +187,7 @@ fn test_run_worker_with_multiple_stores() { let mut bytes = [0u8; 16]; bytes[..5].copy_from_slice(b"hello"); store_string.push(bytes); - index_string.compute(|s| { + index_string.compute(|s: &[u8; 16]| { let len = s.iter().take_while(|&&b| b != 0).count(); len }); @@ -205,7 +205,7 @@ fn test_run_worker_with_multiple_stores() { #[ignore] fn test_multiple_workers_reading_index_only_original_computes() { let engine = RodaEngine::new(); - let mut store = engine.store::(1024); + let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); let index = store.direct_index::(); let reader1 = index.reader(); @@ -230,7 +230,7 @@ fn test_multiple_workers_reading_index_only_original_computes() { #[ignore] fn test_reader_cannot_compute() { let engine = RodaEngine::new(); - let mut store = engine.store::(1024); + let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); let index = store.direct_index::(); let _reader = index.reader(); diff --git a/tests/logic_tests.rs b/tests/logic_tests.rs new file mode 100644 index 0000000..2bdeb39 --- /dev/null +++ b/tests/logic_tests.rs @@ -0,0 +1,60 @@ +use roda_state::RodaEngine; +use roda_state::components::{Engine, Store, StoreOptions, StoreReader}; + +#[test] +fn test_reader_next_and_with_logic() { + let engine = RodaEngine::new(); + let mut store = engine.store::(StoreOptions { + name: "logic_test", + size: 1024, + in_memory: true, + }); + let reader = store.reader(); + + // Initially, next() should be false and with() should be None + assert!(!reader.next()); + assert!(reader.with(|&x| x).is_none()); + + // Push one value + store.push(100); + + // next() should now be true + assert!(reader.next()); + // after next(), with() should return the value + assert_eq!(reader.with(|&x| x), Some(100)); + + // next() should now be false again until another push + assert!(!reader.next()); + // but with() should still return the last successfully read value + assert_eq!(reader.with(|&x| x), Some(100)); + + // Push another value + store.push(200); + + // next() should be true + assert!(reader.next()); + // with() should return the new value + assert_eq!(reader.with(|&x| x), Some(200)); +} + +#[test] +fn test_reader_get_at_and_last() { + let engine = RodaEngine::new(); + let mut store = engine.store::(StoreOptions { + name: "logic_test_2", + size: 1024, + in_memory: true, + }); + let reader = store.reader(); + + store.push(10); + store.push(20); + store.push(30); + + assert_eq!(reader.get_at(0), Some(10)); + assert_eq!(reader.get_at(1), Some(20)); + assert_eq!(reader.get_at(2), Some(30)); + assert_eq!(reader.get_at(3), None); + + assert_eq!(reader.get_last(), Some(30)); +} diff --git a/tests/push_read_tests.rs b/tests/push_read_tests.rs index 1667b50..06d5521 100644 --- a/tests/push_read_tests.rs +++ b/tests/push_read_tests.rs @@ -1,11 +1,14 @@ use roda_state::RodaEngine; -use roda_state::components::{Store, StoreReader}; +use roda_state::components::{Engine, Store, StoreOptions, StoreReader}; #[test] -#[ignore] fn test_push_then_read_single() { let engine = RodaEngine::new(); - let mut store = engine.store::(1024); + let mut store = engine.store::(StoreOptions { + name: "test1", + size: 1024, + in_memory: true, + }); let reader = store.reader(); store.push(42); @@ -15,10 +18,13 @@ fn test_push_then_read_single() { } #[test] -#[ignore] fn test_multiple_push_read_in_order() { let engine = RodaEngine::new(); - let mut store = engine.store::(1024); + let mut store = engine.store::(StoreOptions { + name: "test2", + size: 1024, + in_memory: true, + }); let reader = store.reader(); for v in [1u32, 2, 3, 4, 5] { @@ -32,10 +38,13 @@ fn test_multiple_push_read_in_order() { } #[test] -#[ignore] fn test_interleaved_push_and_read() { let engine = RodaEngine::new(); - let mut store = engine.store::(1024); + let mut store = engine.store::(StoreOptions { + name: "test3", + size: 1024, + in_memory: true, + }); let reader = store.reader(); // Push values; verify FIFO order via get_window @@ -52,12 +61,19 @@ fn test_interleaved_push_and_read() { } #[test] -#[ignore] fn test_stores_are_isolated_by_type() { let engine = RodaEngine::new(); - let mut u_store = engine.store::(1024); - let mut i_store = engine.store::(1024); + let mut u_store = engine.store::(StoreOptions { + name: "u32", + size: 1024, + in_memory: true, + }); + let mut i_store = engine.store::(StoreOptions { + name: "i64", + size: 1024, + in_memory: true, + }); let u_reader = u_store.reader(); let i_reader = i_store.reader(); @@ -76,10 +92,13 @@ fn test_stores_are_isolated_by_type() { } #[test] -#[ignore] fn test_push_after_partial_reads() { let engine = RodaEngine::new(); - let mut store = engine.store::(1024); + let mut store = engine.store::(StoreOptions { + name: "test4", + size: 1024, + in_memory: true, + }); let reader = store.reader(); store.push(100); diff --git a/tests/window_tests.rs b/tests/window_tests.rs index a8387a4..cf954b2 100644 --- a/tests/window_tests.rs +++ b/tests/window_tests.rs @@ -1,5 +1,5 @@ use bytemuck::{Pod, Zeroable}; -use roda_state::components::{Store, StoreReader}; +use roda_state::components::{Engine, Store, StoreOptions, StoreReader}; use roda_state::{RodaEngine, Window}; #[repr(C)] @@ -20,8 +20,8 @@ pub struct Analysis { #[ignore] fn test_window_filling_and_sliding() { let engine = RodaEngine::new(); - let mut source = engine.store::(10); - let mut target = engine.store::(10); + let mut source = engine.store::(StoreOptions { name: "source", size: 10, in_memory: true }); + let mut target = engine.store::(StoreOptions { name: "target", size: 10, in_memory: true }); let source_reader = source.reader(); let target_reader = target.reader(); let mut pipeline = Window::new(); @@ -69,8 +69,8 @@ fn test_window_filling_and_sliding() { #[ignore] fn test_window_size_one() { let engine = RodaEngine::new(); - let mut source = engine.store::(10); - let mut target = engine.store::(10); + let mut source = engine.store::(StoreOptions { name: "source", size: 10, in_memory: true }); + let mut target = engine.store::(StoreOptions { name: "target", size: 10, in_memory: true }); let source_reader = source.reader(); let target_reader = target.reader(); let mut pipeline = Window::new(); @@ -112,8 +112,8 @@ fn test_window_size_one() { #[ignore] fn test_window_large_sliding() { let engine = RodaEngine::new(); - let mut source = engine.store::(100); - let mut target = engine.store::(100); + let mut source = engine.store::(StoreOptions { name: "source", size: 100, in_memory: true }); + let mut target = engine.store::(StoreOptions { name: "target", size: 100, in_memory: true }); let source_reader = source.reader(); let target_reader = target.reader(); let mut pipeline = Window::new(); @@ -166,8 +166,8 @@ fn test_window_worker_large() { use std::time::Duration; let engine = RodaEngine::new(); - let mut source = engine.store::(2000); - let mut target = engine.store::(2000); + let mut source = engine.store::(StoreOptions { name: "source", size: 2000, in_memory: true }); + let mut target = engine.store::(StoreOptions { name: "target", size: 2000, in_memory: true }); let source_reader = source.reader(); let target_reader = target.reader(); let mut pipeline = Window::new(); @@ -210,8 +210,8 @@ fn test_window_worker_large() { #[ignore] fn test_window_max_value() { let engine = RodaEngine::new(); - let mut source = engine.store::(10); - let mut target = engine.store::(10); + let mut source = engine.store::(StoreOptions { name: "source", size: 10, in_memory: true }); + let mut target = engine.store::(StoreOptions { name: "target", size: 10, in_memory: true }); let source_reader = source.reader(); let target_reader = target.reader(); let mut pipeline = Window::new(); @@ -247,8 +247,8 @@ fn test_window_all_none_until_full() { use std::sync::atomic::{AtomicUsize, Ordering}; let engine = RodaEngine::new(); - let mut source = engine.store::(10); - let mut target = engine.store::(10); + let mut source = engine.store::(StoreOptions { name: "source", size: 10, in_memory: true }); + let mut target = engine.store::(StoreOptions { name: "target", size: 10, in_memory: true }); let source_reader = source.reader(); let target_reader = target.reader(); let mut pipeline = Window::new(); From 38a9e11a36a4ad9c7f609bc122f335998eecb737 Mon Sep 17 00:00:00 2001 From: Taleh Ibrahimli Date: Thu, 12 Feb 2026 19:21:12 +0100 Subject: [PATCH 3/7] add bench --- Cargo.toml | 4 + benches/store_bench.rs | 141 ++++++++++++++++++++++++++++++++++++ src/storage/mmap_journal.rs | 47 ++++++++---- src/store.rs | 39 +++++++--- 4 files changed, 205 insertions(+), 26 deletions(-) create mode 100644 benches/store_bench.rs diff --git a/Cargo.toml b/Cargo.toml index 022540a..2577875 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,10 @@ criterion = "0.8.2" [lib] bench = false # We use the 'benches/' directory +[[bench]] +name = "store_bench" +harness = false + [profile.profiling] inherits = "release" debug = true diff --git a/benches/store_bench.rs b/benches/store_bench.rs new file mode 100644 index 0000000..bc6e5ec --- /dev/null +++ b/benches/store_bench.rs @@ -0,0 +1,141 @@ +use std::hint::black_box; +use criterion::{criterion_group, criterion_main, Criterion, Throughput}; +use roda_state::RodaEngine; +use roda_state::components::{Engine, Store, StoreOptions, StoreReader}; +use bytemuck::{Pod, Zeroable}; + +#[derive(Clone, Copy, Zeroable, Pod)] +#[repr(C)] +struct LargeState { + data: [u64; 16], // 128 bytes +} + +fn bench_push(c: &mut Criterion) { + let engine = RodaEngine::new(); + let mut group = c.benchmark_group("push"); + + // 1GB buffer to ensure we don't overflow during benchmarking + let size = 1024 * 1024 * 1024; + let mut store_u64 = engine.store::(StoreOptions { + name: "bench_push_u64", + size, + in_memory: true, + }); + + group.throughput(Throughput::Elements(1)); + group.bench_function("push_u64", |b| { + let mut val = 0u64; + b.iter(|| { + store_u64.push(black_box(val)); + val += 1; + }); + }); + + let mut store_large = engine.store::(StoreOptions { + name: "bench_push_large", + size, + in_memory: true, + }); + + group.bench_function("push_128b", |b| { + let val = LargeState { data: [42; 16] }; + b.iter(|| { + store_large.push(black_box(val)); + }); + }); + + group.finish(); +} + +fn bench_fetch(c: &mut Criterion) { + let engine = RodaEngine::new(); + let mut group = c.benchmark_group("fetch"); + + let size = 1024 * 1024 * 100; // 100MB + let mut store = engine.store::(StoreOptions { + name: "bench_fetch", + size, + in_memory: true, + }); + + // Pre-fill some data + for i in 0..10000 { + store.push(i as u64); + } + let reader = store.reader(); + + group.throughput(Throughput::Elements(1)); + group.bench_function("get_at_u64", |b| { + b.iter(|| { + black_box(reader.get_at(black_box(5000))); + }); + }); + + group.bench_function("get_last_u64", |b| { + b.iter(|| { + black_box(reader.get_last()); + }); + }); + + let mut store_large = engine.store::(StoreOptions { + name: "bench_fetch_large", + size, + in_memory: true, + }); + for _ in 0..10000 { + store_large.push(LargeState { data: [42; 16] }); + } + let reader_large = store_large.reader(); + + group.bench_function("get_at_128b", |b| { + b.iter(|| { + black_box(reader_large.get_at(black_box(5000))); + }); + }); + + group.bench_function("next_get_u64", |b| { + b.iter(|| { + if reader.next() { + black_box(reader.get()); + } + }); + }); + + group.finish(); +} + +fn bench_window(c: &mut Criterion) { + let engine = RodaEngine::new(); + let mut group = c.benchmark_group("window"); + + let size = 1024 * 1024 * 100; // 100MB + let mut store = engine.store::(StoreOptions { + name: "bench_window", + size, + in_memory: true, + }); + + // Pre-fill some data + for i in 0..10000 { + store.push(i as u64); + } + let reader = store.reader(); + + group.throughput(Throughput::Elements(1)); + group.bench_function("get_window_10", |b| { + b.iter(|| { + black_box(reader.get_window::<10>(black_box(5000))); + }); + }); + + group.bench_function("get_window_100", |b| { + b.iter(|| { + black_box(reader.get_window::<100>(black_box(5000))); + }); + }); + + group.finish(); +} + +criterion_group!(benches, bench_push, bench_fetch, bench_window); +criterion_main!(benches); diff --git a/src/storage/mmap_journal.rs b/src/storage/mmap_journal.rs index 5e5d400..a16b044 100644 --- a/src/storage/mmap_journal.rs +++ b/src/storage/mmap_journal.rs @@ -5,13 +5,15 @@ use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::AtomicUsize; -pub(crate) struct MmapJournal { +pub(crate) struct MmapRing { + _mmap: Arc, ptr: *mut u8, len: usize, write_index: Arc, + read_only: bool, } -impl MmapJournal { +impl MmapRing { /// CREATE: Creates a brand new file, truncating any existing data. pub fn new(path: Option, total_size: usize) -> Result { let mut mmap = if let Some(p) = &path { @@ -31,9 +33,11 @@ impl MmapJournal { let ptr = mmap.as_mut_ptr(); let len = mmap.len(); Ok(Self { + _mmap: Arc::new(mmap), ptr, len, write_index: Arc::new(Default::default()), + read_only: false, }) } @@ -46,9 +50,11 @@ impl MmapJournal { let ptr = mmap.as_mut_ptr(); let len = mmap.len(); Ok(Self { + _mmap: Arc::new(mmap), ptr, len, write_index: Arc::new(Default::default()), + read_only: false, }) } @@ -57,25 +63,27 @@ impl MmapJournal { /// 1. Read (Immutable) /// Casts bytes at offset to a reference of T. pub fn read(&self, offset: usize) -> &T { - let end = offset + size_of::(); - assert!(offset + size_of::() <= self.len); - bytemuck::from_bytes(&self.slice()[offset..end]) + let actual_offset = offset % self.len; + let end = actual_offset + size_of::(); + assert!(end <= self.len, "Read crosses buffer boundary - alignment issue?"); + bytemuck::from_bytes(&self.slice()[actual_offset..end]) } pub fn append(&mut self, state: &T) { - let offset = self.write_index.load(std::sync::atomic::Ordering::Relaxed); - let size = std::mem::size_of::(); - let end = offset + size; + let current_pos = self.write_index.load(std::sync::atomic::Ordering::Relaxed); + let size = size_of::(); + let actual_offset = current_pos % self.len; + let end = actual_offset + size; let dest_slice = self.slice_mut(); - // 1. Check for buffer overflow - assert!(end <= dest_slice.len(), "Append exceeds buffer capacity!"); + // Check for boundary crossing + assert!(end <= dest_slice.len(), "Append crosses buffer boundary - alignment issue?"); - // 2. Perform the write - dest_slice[offset..end].copy_from_slice(bytemuck::bytes_of(state)); + // Perform the write + dest_slice[actual_offset..end].copy_from_slice(bytemuck::bytes_of(state)); - self.write_index.store(end, std::sync::atomic::Ordering::Release); + self.write_index.store(current_pos + size, std::sync::atomic::Ordering::Release); } fn slice(&self) -> &[u8] { @@ -83,6 +91,7 @@ impl MmapJournal { } fn slice_mut(&mut self) -> &mut [u8] { + assert!(!self.read_only, "Cannot mutate read-only buffer"); unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) } } @@ -90,13 +99,19 @@ impl MmapJournal { self.write_index.load(std::sync::atomic::Ordering::Acquire) } - pub(crate) fn reader(&self) -> MmapJournal { - MmapJournal { + pub(crate) fn len(&self) -> usize { + self.len + } + + pub(crate) fn reader(&self) -> MmapRing { + MmapRing { + _mmap: self._mmap.clone(), ptr: self.ptr, len: self.len, write_index: self.write_index.clone(), + read_only: true, } } } -unsafe impl Send for MmapJournal {} +unsafe impl Send for MmapRing {} diff --git a/src/store.rs b/src/store.rs index 0279f2f..3fbb60d 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,29 +1,29 @@ use crate::components::{Store, StoreOptions, StoreReader}; use crate::index::DirectIndex; -use crate::storage::mmap_journal::MmapJournal; +use crate::storage::mmap_journal::MmapRing; use bytemuck::Pod; use std::cell::Cell; use std::path::PathBuf; pub struct CircularStore { - storage: MmapJournal, + storage: MmapRing, } pub struct CircularStoreReader { next_index: Cell, - storage: MmapJournal, + storage: MmapRing, } impl CircularStore { pub fn new(root_path: &'static str, option: StoreOptions) -> Self { let storage = if option.in_memory { - MmapJournal::new(None, option.size).unwrap() + MmapRing::new(None, option.size).unwrap() } else { let path: PathBuf = format!("{}/{}.store", root_path, option.name).into(); if path.exists() { - MmapJournal::load(path).unwrap() + MmapRing::load(path).unwrap() } else { - MmapJournal::new(Some(path), option.size).unwrap() + MmapRing::new(Some(path), option.size).unwrap() } }; @@ -35,6 +35,7 @@ impl Store for CircularStore { type Reader = CircularStoreReader; fn push(&mut self, state: State) { + assert!(self.storage.len() >= size_of::(), "Store size {} is too small for State size {}", self.storage.len(), size_of::()); self.storage.append(&state); } @@ -57,10 +58,20 @@ impl StoreReader for CircularStoreReader { fn next(&self) -> bool { let index_to_read = self.next_index.get(); let offset = index_to_read * size_of::(); - if offset + size_of::() > self.storage.get_write_index() { + let write_index = self.storage.get_write_index(); + + if offset + size_of::() > write_index { return false; } - self.next_index.set(index_to_read + 1); + + let min_offset = write_index.saturating_sub(self.storage.len()); + if offset < min_offset { + // Lapped: skip to the oldest available data + let new_index = min_offset / size_of::(); + self.next_index.set(new_index + 1); + } else { + self.next_index.set(index_to_read + 1); + } true } @@ -77,9 +88,13 @@ impl StoreReader for CircularStoreReader { fn with_at(&self, at: usize, handler: impl FnOnce(&State) -> R) -> Option { let offset = at * size_of::(); - if offset + size_of::() > self.storage.get_write_index() { + let write_index = self.storage.get_write_index(); + if offset + size_of::() > write_index { return None; } + if offset < write_index.saturating_sub(self.storage.len()) { + return None; // Data has been overwritten + } Some(handler(self.storage.read(offset))) } @@ -106,9 +121,13 @@ impl StoreReader for CircularStoreReader { fn get_window(&self, at: usize) -> Option<[State; N]> { let offset = at * size_of::(); - if offset + size_of::() * N > self.storage.get_write_index() { + let write_index = self.storage.get_write_index(); + if offset + size_of::() * N > write_index { return None; } + if offset < write_index.saturating_sub(self.storage.len()) { + return None; // Part of the window has been overwritten + } let mut window = Vec::with_capacity(N); for i in 0..N { From 70faf8e6fc5ad9a6991acfe4053e2cc9c559c804 Mon Sep 17 00:00:00 2001 From: Taleh Ibrahimli Date: Thu, 12 Feb 2026 19:22:26 +0100 Subject: [PATCH 4/7] improve tests --- tests/store_circular_tests.rs | 121 ++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 tests/store_circular_tests.rs diff --git a/tests/store_circular_tests.rs b/tests/store_circular_tests.rs new file mode 100644 index 0000000..01a4ab3 --- /dev/null +++ b/tests/store_circular_tests.rs @@ -0,0 +1,121 @@ +use roda_state::RodaEngine; +use roda_state::components::{Engine, Store, StoreOptions, StoreReader}; + +#[test] +fn test_store_wrap_around() { + let engine = RodaEngine::new(); + let size = 64; // 8 u64s + let mut store = engine.store::(StoreOptions { + name: "test_wrap", + size, + in_memory: true, + }); + let reader = store.reader(); + + // Fill the store + for i in 0..8 { + store.push(i as u64); + } + + // Read all + for i in 0..8 { + assert_eq!(reader.get_at(i), Some(i as u64)); + } + + // Push one more, should overwrite index 0 + store.push(8); + + // index 0 should be gone (overwritten by 8) + assert_eq!(reader.get_at(0), None); + // index 8 should be present + assert_eq!(reader.get_at(8), Some(8)); + + // get_last should be 8 + assert_eq!(reader.get_last(), Some(8)); +} + +#[test] +fn test_reader_lapping_catch_up() { + let engine = RodaEngine::new(); + let size = 64; // 8 u64s + let mut store = engine.store::(StoreOptions { + name: "test_lapping", + size, + in_memory: true, + }); + let reader = store.reader(); + + // Push 4 items + for i in 0..4 { + store.push(i as u64); + } + + // Reader is at index 0. Advance it once. + assert!(reader.next()); // reads 0 + assert_eq!(reader.get(), Some(0)); + + // Now push 8 more items, which will overwrite the current position of the reader (index 1) + // Buffer has [8, 9, 10, 11, 4, 5, 6, 7] (logical indices 8, 9, 10, 11, 4, 5, 6, 7) + // Current next_index in reader is 1. + // min_offset for write_index 12*8 = 96 is 96 - 64 = 32. + // index 1 has offset 8. 8 < 32, so it's lapped. + for i in 4..12 { + store.push(i as u64); + } + + // Calling next() should detect lapping and catch up to the oldest available data (index 4) + assert!(reader.next()); + // It should skip 1, 2, 3 and jump to the oldest available element after being lapped. + // My implementation sets it to min_offset / size + 1. + // min_offset = 32. new_index = 32 / 8 = 4. next_index set to 5. + // So get() should return element 4. + assert_eq!(reader.get(), Some(4)); +} + +#[test] +fn test_get_window_lapping() { + let engine = RodaEngine::new(); + let size = 64; // 8 u64s + let mut store = engine.store::(StoreOptions { + name: "test_window_lapping", + size, + in_memory: true, + }); + let reader = store.reader(); + + for i in 0..12 { + store.push(i as u64); + } + // Write index is 12*8 = 96. min_offset is 32 (index 4). + + // Window at index 4 (length 4) should be [4, 5, 6, 7] + let win = reader.get_window::<4>(4).unwrap(); + assert_eq!(win, [4, 5, 6, 7]); + + // Window at index 3 should be None (partially overwritten) + assert!(reader.get_window::<4>(3).is_none()); +} + +#[test] +fn test_large_rolling_push() { + let engine = RodaEngine::new(); + let size = 1024; // 128 u64s + let mut store = engine.store::(StoreOptions { + name: "test_large_rolling", + size, + in_memory: true, + }); + let reader = store.reader(); + + for i in 0..1000 { + store.push(i as u64); + } + + assert_eq!(reader.get_last(), Some(999)); + + // The buffer holds last 128 elements. + // 1000 - 128 = 872. + // Indices 872 to 999 should be available. + assert_eq!(reader.get_at(872), Some(872)); + assert_eq!(reader.get_at(871), None); +} From fe0288147526c32db7d7ee91ea098acd5b9e3a07 Mon Sep 17 00:00:00 2001 From: Taleh Ibrahimli Date: Thu, 12 Feb 2026 20:41:40 +0100 Subject: [PATCH 5/7] improve tests --- Cargo.lock | 36 +++------- Cargo.toml | 2 +- src/store.rs | 8 +-- tests/store_no_alloc_tests.rs | 119 ++++++++++++++++++++++++++++++++++ 4 files changed, 132 insertions(+), 33 deletions(-) create mode 100644 tests/store_no_alloc_tests.rs diff --git a/Cargo.lock b/Cargo.lock index c3872ad..b0914e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,15 +11,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "alloca" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5a7d05ea6aea7e9e64d25b9156ba2fee3fdd659e34e41063cd2fc7cd020d7f4" -dependencies = [ - "cc", -] - [[package]] name = "android_system_properties" version = "0.1.5" @@ -224,24 +215,25 @@ dependencies = [ [[package]] name = "criterion" -version = "0.8.2" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "950046b2aa2492f9a536f5f4f9a3de7b9e2476e575e05bd6c333371add4d98f3" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" dependencies = [ - "alloca", "anes", "cast", "ciborium", "clap", "criterion-plot", + "is-terminal", "itertools", "num-traits", + "once_cell", "oorandom", - "page_size", "plotters", "rayon", "regex", "serde", + "serde_derive", "serde_json", "tinytemplate", "walkdir", @@ -249,9 +241,9 @@ dependencies = [ [[package]] name = "criterion-plot" -version = "0.8.2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8d80a2f4f5b554395e47b5d8305bc3d27813bacb73493eb1001e8f76dae29ea" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" dependencies = [ "cast", "itertools", @@ -436,9 +428,9 @@ dependencies = [ [[package]] name = "itertools" -version = "0.13.0" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" dependencies = [ "either", ] @@ -531,16 +523,6 @@ version = "11.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" -[[package]] -name = "page_size" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30d5b2194ed13191c1999ae0704b7839fb18384fa22e49b57eeaa97d79ce40da" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "parking_lot" version = "0.12.5" diff --git a/Cargo.toml b/Cargo.toml index 2577875..8067b41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ libc = "0.2" # Needed for mlock (memory pinning) and sched_setaffinity [dev-dependencies] assert_no_alloc = { version = "1.1.2" } -criterion = "0.8.2" +criterion = { version = "0.5", features = ["html_reports"] } [lib] bench = false # We use the 'benches/' directory diff --git a/src/store.rs b/src/store.rs index 3fbb60d..bc5f197 100644 --- a/src/store.rs +++ b/src/store.rs @@ -129,10 +129,8 @@ impl StoreReader for CircularStoreReader { return None; // Part of the window has been overwritten } - let mut window = Vec::with_capacity(N); - for i in 0..N { - window.push(*self.storage.read::(offset + i * size_of::())); - } - window.try_into().ok() + Some(std::array::from_fn(|i| { + *self.storage.read::(offset + i * size_of::()) + })) } } diff --git a/tests/store_no_alloc_tests.rs b/tests/store_no_alloc_tests.rs new file mode 100644 index 0000000..2563447 --- /dev/null +++ b/tests/store_no_alloc_tests.rs @@ -0,0 +1,119 @@ +use assert_no_alloc::*; +use roda_state::RodaEngine; +use roda_state::components::{Engine, Store, StoreOptions, StoreReader}; + +#[cfg(debug_assertions)] +#[global_allocator] +static ALLOC: AllocDisabler = AllocDisabler; + +#[test] +fn test_store_push_no_alloc() { + let engine = RodaEngine::new(); + let mut store = engine.store::(StoreOptions { + name: "no_alloc_push", + size: 1024, + in_memory: true, + }); + + assert_no_alloc(|| { + store.push(42); + }); +} + +#[test] +fn test_store_reader_next_no_alloc() { + let engine = RodaEngine::new(); + let mut store = engine.store::(StoreOptions { + name: "no_alloc_next", + size: 1024, + in_memory: true, + }); + store.push(42); + let reader = store.reader(); + + assert_no_alloc(|| { + reader.next(); + }); +} + +#[test] +fn test_store_reader_get_no_alloc() { + let engine = RodaEngine::new(); + let mut store = engine.store::(StoreOptions { + name: "no_alloc_get", + size: 1024, + in_memory: true, + }); + store.push(42); + let reader = store.reader(); + reader.next(); + + assert_no_alloc(|| { + let _ = reader.get(); + }); +} + +#[test] +fn test_store_reader_get_window_no_alloc() { + let engine = RodaEngine::new(); + let mut store = engine.store::(StoreOptions { + name: "no_alloc_window", + size: 1024, + in_memory: true, + }); + store.push(42); + store.push(43); + let reader = store.reader(); + + assert_no_alloc(|| { + let res = reader.get_window::<2>(0).unwrap(); + assert_eq!(res[0], 42); + assert_eq!(res[1], 43); + }); +} + +#[test] +fn test_store_reader_get_at_no_alloc() { + let engine = RodaEngine::new(); + let mut store = engine.store::(StoreOptions { + name: "no_alloc_get_at", + size: 1024, + in_memory: true, + }); + store.push(42); + let reader = store.reader(); + + assert_no_alloc(|| { + let _ = reader.get_at(0); + }); +} + +#[test] +fn test_store_reader_get_last_no_alloc() { + let engine = RodaEngine::new(); + let mut store = engine.store::(StoreOptions { + name: "no_alloc_get_last", + size: 1024, + in_memory: true, + }); + store.push(42); + let reader = store.reader(); + + assert_no_alloc(|| { + let _ = reader.get_last(); + }); +} + +#[test] +fn test_store_direct_index_no_alloc() { + let engine = RodaEngine::new(); + let store = engine.store::(StoreOptions { + name: "no_alloc_direct_index", + size: 1024, + in_memory: true, + }); + + assert_no_alloc(|| { + let _ = store.direct_index::(); + }); +} From 4110a0338febde899740bc9aa6b20429b48a90ea Mon Sep 17 00:00:00 2001 From: Taleh Ibrahimli Date: Thu, 12 Feb 2026 21:13:51 +0100 Subject: [PATCH 6/7] fixes --- benches/store_bench.rs | 10 ++-- examples/hello_world.rs | 8 +-- src/lib.rs | 2 +- src/storage/mmap_journal.rs | 14 +++-- src/store.rs | 9 +++- tests/aggregator_tests.rs | 96 +++++++++++++++++++++++++++-------- tests/index_tests.rs | 66 ++++++++++++++++++++---- tests/store_circular_tests.rs | 10 ++-- tests/window_tests.rs | 77 +++++++++++++++++++++------- 9 files changed, 224 insertions(+), 68 deletions(-) diff --git a/benches/store_bench.rs b/benches/store_bench.rs index bc6e5ec..c7d3763 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -1,8 +1,8 @@ -use std::hint::black_box; -use criterion::{criterion_group, criterion_main, Criterion, Throughput}; +use bytemuck::{Pod, Zeroable}; +use criterion::{Criterion, Throughput, criterion_group, criterion_main}; use roda_state::RodaEngine; use roda_state::components::{Engine, Store, StoreOptions, StoreReader}; -use bytemuck::{Pod, Zeroable}; +use std::hint::black_box; #[derive(Clone, Copy, Zeroable, Pod)] #[repr(C)] @@ -15,7 +15,7 @@ fn bench_push(c: &mut Criterion) { let mut group = c.benchmark_group("push"); // 1GB buffer to ensure we don't overflow during benchmarking - let size = 1024 * 1024 * 1024; + let size = 1024 * 1024 * 1024; let mut store_u64 = engine.store::(StoreOptions { name: "bench_push_u64", size, @@ -127,7 +127,7 @@ fn bench_window(c: &mut Criterion) { black_box(reader.get_window::<10>(black_box(5000))); }); }); - + group.bench_function("get_window_100", |b| { b.iter(|| { black_box(reader.get_window::<100>(black_box(5000))); diff --git a/examples/hello_world.rs b/examples/hello_world.rs index 7db05a9..e21756c 100644 --- a/examples/hello_world.rs +++ b/examples/hello_world.rs @@ -55,13 +55,13 @@ fn main() { in_memory: true, }); let tick_reader = tick_store.reader(); - let mut ohlc_store = engine.store::(StoreOptions{ + let mut ohlc_store = engine.store::(StoreOptions { name: "ohlc", size: 10_000, in_memory: true, }); let ohlc_reader = ohlc_store.reader(); - let mut simple_strategy = engine.store::(StoreOptions{ + let mut simple_strategy = engine.store::(StoreOptions { name: "simple_strategy", size: 10_000, in_memory: true, @@ -71,8 +71,8 @@ fn main() { let ohlc_index = ohlc_store.direct_index::(); // B. PIPELINE - let mut ohlc_pipeline: Aggregator = Aggregator::new(); - let mut simple_strategy_pipeline: Window = Window::new(); + let ohlc_pipeline: Aggregator = Aggregator::new(); + let simple_strategy_pipeline: Window = Window::new(); // C. WORKER engine.run_worker(move || { diff --git a/src/lib.rs b/src/lib.rs index 8b678dc..11f8cdb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,9 +2,9 @@ pub mod aggregator; pub mod components; pub mod engine; pub mod index; +mod storage; pub mod store; pub mod window; -mod storage; pub use crate::aggregator::Aggregator; pub use crate::engine::RodaEngine; diff --git a/src/storage/mmap_journal.rs b/src/storage/mmap_journal.rs index a16b044..c46c1bf 100644 --- a/src/storage/mmap_journal.rs +++ b/src/storage/mmap_journal.rs @@ -61,11 +61,15 @@ impl MmapRing { // --- Bytemuck Methods --- /// 1. Read (Immutable) + /// /// Casts bytes at offset to a reference of T. pub fn read(&self, offset: usize) -> &T { let actual_offset = offset % self.len; let end = actual_offset + size_of::(); - assert!(end <= self.len, "Read crosses buffer boundary - alignment issue?"); + assert!( + end <= self.len, + "Read crosses buffer boundary - alignment issue?" + ); bytemuck::from_bytes(&self.slice()[actual_offset..end]) } @@ -78,12 +82,16 @@ impl MmapRing { let dest_slice = self.slice_mut(); // Check for boundary crossing - assert!(end <= dest_slice.len(), "Append crosses buffer boundary - alignment issue?"); + assert!( + end <= dest_slice.len(), + "Append crosses buffer boundary - alignment issue?" + ); // Perform the write dest_slice[actual_offset..end].copy_from_slice(bytemuck::bytes_of(state)); - self.write_index.store(current_pos + size, std::sync::atomic::Ordering::Release); + self.write_index + .store(current_pos + size, std::sync::atomic::Ordering::Release); } fn slice(&self) -> &[u8] { diff --git a/src/store.rs b/src/store.rs index bc5f197..fe56cb2 100644 --- a/src/store.rs +++ b/src/store.rs @@ -35,7 +35,12 @@ impl Store for CircularStore { type Reader = CircularStoreReader; fn push(&mut self, state: State) { - assert!(self.storage.len() >= size_of::(), "Store size {} is too small for State size {}", self.storage.len(), size_of::()); + assert!( + self.storage.len() >= size_of::(), + "Store size {} is too small for State size {}", + self.storage.len(), + size_of::() + ); self.storage.append(&state); } @@ -59,7 +64,7 @@ impl StoreReader for CircularStoreReader { let index_to_read = self.next_index.get(); let offset = index_to_read * size_of::(); let write_index = self.storage.get_write_index(); - + if offset + size_of::() > write_index { return false; } diff --git a/tests/aggregator_tests.rs b/tests/aggregator_tests.rs index 0a87162..72f9c23 100644 --- a/tests/aggregator_tests.rs +++ b/tests/aggregator_tests.rs @@ -31,12 +31,20 @@ pub struct GroupKey { #[ignore] fn test_aggregator_count_and_sum() { let engine = RodaEngine::new(); - let mut source = engine.store::(StoreOptions { name: "source", size: 1024, in_memory: true }); - let mut target = engine.store::(StoreOptions { name: "target", size: 1024, in_memory: true }); + let mut source = engine.store::(StoreOptions { + name: "source", + size: 1024, + in_memory: true, + }); + let mut target = engine.store::(StoreOptions { + name: "target", + size: 1024, + in_memory: true, + }); let source_reader = source.reader(); let target_reader = target.reader(); - let mut aggregator: Aggregator = Aggregator::new(); + let aggregator: Aggregator = Aggregator::new(); // Run aggregation inside worker engine.run_worker(move || { @@ -75,12 +83,20 @@ fn test_aggregator_count_and_sum() { #[ignore] fn test_aggregator_min_max_tracking() { let engine = RodaEngine::new(); - let mut source = engine.store::(StoreOptions { name: "source", size: 1024, in_memory: true }); - let mut target = engine.store::(StoreOptions { name: "target", size: 1024, in_memory: true }); + let mut source = engine.store::(StoreOptions { + name: "source", + size: 1024, + in_memory: true, + }); + let mut target = engine.store::(StoreOptions { + name: "target", + size: 1024, + in_memory: true, + }); let source_reader = source.reader(); let target_reader = target.reader(); - let mut aggregator: Aggregator = Aggregator::new(); + let aggregator: Aggregator = Aggregator::new(); // Run aggregation inside worker engine.run_worker(move || { @@ -128,12 +144,20 @@ fn test_aggregator_min_max_tracking() { #[ignore] fn test_aggregator_multiple_partitions() { let engine = RodaEngine::new(); - let mut source = engine.store::(StoreOptions { name: "source", size: 1024, in_memory: true }); - let mut target = engine.store::(StoreOptions { name: "target", size: 1024, in_memory: true }); + let mut source = engine.store::(StoreOptions { + name: "source", + size: 1024, + in_memory: true, + }); + let mut target = engine.store::(StoreOptions { + name: "target", + size: 1024, + in_memory: true, + }); let source_reader = source.reader(); let target_reader = target.reader(); - let mut aggregator: Aggregator = Aggregator::new(); + let aggregator: Aggregator = Aggregator::new(); // Run aggregation inside worker engine.run_worker(move || { @@ -179,12 +203,20 @@ fn test_aggregator_multiple_partitions() { #[ignore] fn test_aggregator_complex_key() { let engine = RodaEngine::new(); - let mut source = engine.store::(StoreOptions { name: "source", size: 1024, in_memory: true }); - let mut target = engine.store::(StoreOptions { name: "target", size: 1024, in_memory: true }); + let mut source = engine.store::(StoreOptions { + name: "source", + size: 1024, + in_memory: true, + }); + let mut target = engine.store::(StoreOptions { + name: "target", + size: 1024, + in_memory: true, + }); let source_reader = source.reader(); let target_reader = target.reader(); - let mut aggregator: Aggregator = Aggregator::new(); + let aggregator: Aggregator = Aggregator::new(); // Run aggregation with complex key inside worker engine.run_worker(move || { @@ -217,12 +249,20 @@ fn test_aggregator_complex_key() { #[ignore] fn test_aggregator_reset_behavior() { let engine = RodaEngine::new(); - let mut source = engine.store::(StoreOptions { name: "source", size: 10, in_memory: true }); - let mut target = engine.store::(StoreOptions { name: "target", size: 10, in_memory: true }); + let mut source = engine.store::(StoreOptions { + name: "source", + size: 10, + in_memory: true, + }); + let mut target = engine.store::(StoreOptions { + name: "target", + size: 10, + in_memory: true, + }); let source_reader = source.reader(); let target_reader = target.reader(); - let mut aggregator: Aggregator = Aggregator::new(); + let aggregator: Aggregator = Aggregator::new(); // Run aggregation inside worker engine.run_worker(move || { @@ -267,8 +307,16 @@ fn test_aggregator_reset_behavior() { #[ignore] fn test_aggregator_large_index() { let engine = RodaEngine::new(); - let mut source = engine.store::(StoreOptions { name: "source", size: 1024, in_memory: true }); - let mut target = engine.store::(StoreOptions { name: "target", size: 1024, in_memory: true }); + let mut source = engine.store::(StoreOptions { + name: "source", + size: 1024, + in_memory: true, + }); + let mut target = engine.store::(StoreOptions { + name: "target", + size: 1024, + in_memory: true, + }); let source_reader = source.reader(); let target_reader = target.reader(); let mut aggregator: Aggregator = Aggregator::new(); @@ -309,12 +357,20 @@ fn test_aggregator_worker_large() { use std::time::Duration; let engine = RodaEngine::new(); - let mut source = engine.store::(StoreOptions { name: "source", size: 2000, in_memory: true }); - let mut target = engine.store::(StoreOptions { name: "target", size: 2000, in_memory: true }); + let mut source = engine.store::(StoreOptions { + name: "source", + size: 2000, + in_memory: true, + }); + let mut target = engine.store::(StoreOptions { + name: "target", + size: 2000, + in_memory: true, + }); let source_reader = source.reader(); let target_reader = target.reader(); - let mut aggregator: Aggregator = Aggregator::new(); + let aggregator: Aggregator = Aggregator::new(); engine.run_worker(move || { source_reader.next(); diff --git a/tests/index_tests.rs b/tests/index_tests.rs index eec757d..a2a7d10 100644 --- a/tests/index_tests.rs +++ b/tests/index_tests.rs @@ -15,7 +15,11 @@ struct ComplexKey { #[ignore] fn test_index_multiple_values() { let engine = RodaEngine::new(); - let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); + let mut store = engine.store::(StoreOptions { + name: "test", + size: 1024, + in_memory: true, + }); let index = store.direct_index::(); for i in 0..5 { @@ -37,7 +41,11 @@ fn test_index_multiple_values() { #[ignore] fn test_multiple_indices_on_same_store() { let engine = RodaEngine::new(); - let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); + let mut store = engine.store::(StoreOptions { + name: "test", + size: 1024, + in_memory: true, + }); let index_double = store.direct_index::(); let index_triple = store.direct_index::(); @@ -58,7 +66,11 @@ fn test_multiple_indices_on_same_store() { #[ignore] fn test_index_complex_key() { let engine = RodaEngine::new(); - let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); + let mut store = engine.store::(StoreOptions { + name: "test", + size: 1024, + in_memory: true, + }); let index = store.direct_index::(); store.push(100); @@ -88,7 +100,11 @@ fn test_index_complex_key() { #[ignore] fn test_index_shallow_clone_sharing() { let engine = RodaEngine::new(); - let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); + let mut store = engine.store::(StoreOptions { + name: "test", + size: 1024, + in_memory: true, + }); let index = store.direct_index::(); let clone1 = index.reader(); let clone2 = index.reader(); @@ -104,7 +120,11 @@ fn test_index_shallow_clone_sharing() { #[ignore] fn test_index_collision_overwrite() { let engine = RodaEngine::new(); - let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); + let mut store = engine.store::(StoreOptions { + name: "test", + size: 1024, + in_memory: true, + }); let index = store.direct_index::(); // Both 10 and 20 will map to key 1 @@ -123,7 +143,11 @@ fn test_index_collision_overwrite() { #[ignore] fn test_index_not_found() { let engine = RodaEngine::new(); - let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); + let mut store = engine.store::(StoreOptions { + name: "test", + size: 1024, + in_memory: true, + }); let index = store.direct_index::(); store.push(10); @@ -138,7 +162,11 @@ fn test_index_not_found() { #[ignore] fn test_concurrent_push_and_index() { let engine = RodaEngine::new(); - let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); + let mut store = engine.store::(StoreOptions { + name: "test", + size: 1024, + in_memory: true, + }); let index = store.direct_index::(); let index_reader = index.reader(); @@ -168,8 +196,16 @@ fn test_concurrent_push_and_index() { #[ignore] fn test_run_worker_with_multiple_stores() { let engine = RodaEngine::new(); - let mut store_u32 = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); - let mut store_string = engine.store::<[u8; 16]>(StoreOptions { name: "test", size: 1024, in_memory: true }); + let mut store_u32 = engine.store::(StoreOptions { + name: "test", + size: 1024, + in_memory: true, + }); + let mut store_string = engine.store::<[u8; 16]>(StoreOptions { + name: "test", + size: 1024, + in_memory: true, + }); let index_u32 = store_u32.direct_index::(); let index_string = store_string.direct_index::(); @@ -205,7 +241,11 @@ fn test_run_worker_with_multiple_stores() { #[ignore] fn test_multiple_workers_reading_index_only_original_computes() { let engine = RodaEngine::new(); - let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); + let mut store = engine.store::(StoreOptions { + name: "test", + size: 1024, + in_memory: true, + }); let index = store.direct_index::(); let reader1 = index.reader(); @@ -230,7 +270,11 @@ fn test_multiple_workers_reading_index_only_original_computes() { #[ignore] fn test_reader_cannot_compute() { let engine = RodaEngine::new(); - let mut store = engine.store::(StoreOptions { name: "test", size: 1024, in_memory: true }); + let mut store = engine.store::(StoreOptions { + name: "test", + size: 1024, + in_memory: true, + }); let index = store.direct_index::(); let _reader = index.reader(); diff --git a/tests/store_circular_tests.rs b/tests/store_circular_tests.rs index 01a4ab3..a72f385 100644 --- a/tests/store_circular_tests.rs +++ b/tests/store_circular_tests.rs @@ -29,7 +29,7 @@ fn test_store_wrap_around() { assert_eq!(reader.get_at(0), None); // index 8 should be present assert_eq!(reader.get_at(8), Some(8)); - + // get_last should be 8 assert_eq!(reader.get_last(), Some(8)); } @@ -64,7 +64,7 @@ fn test_reader_lapping_catch_up() { } // Calling next() should detect lapping and catch up to the oldest available data (index 4) - assert!(reader.next()); + assert!(reader.next()); // It should skip 1, 2, 3 and jump to the oldest available element after being lapped. // My implementation sets it to min_offset / size + 1. // min_offset = 32. new_index = 32 / 8 = 4. next_index set to 5. @@ -87,7 +87,7 @@ fn test_get_window_lapping() { store.push(i as u64); } // Write index is 12*8 = 96. min_offset is 32 (index 4). - + // Window at index 4 (length 4) should be [4, 5, 6, 7] let win = reader.get_window::<4>(4).unwrap(); assert_eq!(win, [4, 5, 6, 7]); @@ -112,9 +112,9 @@ fn test_large_rolling_push() { } assert_eq!(reader.get_last(), Some(999)); - + // The buffer holds last 128 elements. - // 1000 - 128 = 872. + // 1000 - 128 = 872. // Indices 872 to 999 should be available. assert_eq!(reader.get_at(872), Some(872)); assert_eq!(reader.get_at(871), None); diff --git a/tests/window_tests.rs b/tests/window_tests.rs index cf954b2..1f737c4 100644 --- a/tests/window_tests.rs +++ b/tests/window_tests.rs @@ -20,8 +20,16 @@ pub struct Analysis { #[ignore] fn test_window_filling_and_sliding() { let engine = RodaEngine::new(); - let mut source = engine.store::(StoreOptions { name: "source", size: 10, in_memory: true }); - let mut target = engine.store::(StoreOptions { name: "target", size: 10, in_memory: true }); + let mut source = engine.store::(StoreOptions { + name: "source", + size: 10, + in_memory: true, + }); + let mut target = engine.store::(StoreOptions { + name: "target", + size: 10, + in_memory: true, + }); let source_reader = source.reader(); let target_reader = target.reader(); let mut pipeline = Window::new(); @@ -69,8 +77,16 @@ fn test_window_filling_and_sliding() { #[ignore] fn test_window_size_one() { let engine = RodaEngine::new(); - let mut source = engine.store::(StoreOptions { name: "source", size: 10, in_memory: true }); - let mut target = engine.store::(StoreOptions { name: "target", size: 10, in_memory: true }); + let mut source = engine.store::(StoreOptions { + name: "source", + size: 10, + in_memory: true, + }); + let mut target = engine.store::(StoreOptions { + name: "target", + size: 10, + in_memory: true, + }); let source_reader = source.reader(); let target_reader = target.reader(); let mut pipeline = Window::new(); @@ -112,8 +128,16 @@ fn test_window_size_one() { #[ignore] fn test_window_large_sliding() { let engine = RodaEngine::new(); - let mut source = engine.store::(StoreOptions { name: "source", size: 100, in_memory: true }); - let mut target = engine.store::(StoreOptions { name: "target", size: 100, in_memory: true }); + let mut source = engine.store::(StoreOptions { + name: "source", + size: 100, + in_memory: true, + }); + let mut target = engine.store::(StoreOptions { + name: "target", + size: 100, + in_memory: true, + }); let source_reader = source.reader(); let target_reader = target.reader(); let mut pipeline = Window::new(); @@ -161,13 +185,17 @@ fn test_window_large_sliding() { #[test] #[ignore] fn test_window_worker_large() { - use std::sync::{Arc, Mutex}; - use std::thread; - use std::time::Duration; - let engine = RodaEngine::new(); - let mut source = engine.store::(StoreOptions { name: "source", size: 2000, in_memory: true }); - let mut target = engine.store::(StoreOptions { name: "target", size: 2000, in_memory: true }); + let mut source = engine.store::(StoreOptions { + name: "source", + size: 2000, + in_memory: true, + }); + let mut target = engine.store::(StoreOptions { + name: "target", + size: 2000, + in_memory: true, + }); let source_reader = source.reader(); let target_reader = target.reader(); let mut pipeline = Window::new(); @@ -210,8 +238,16 @@ fn test_window_worker_large() { #[ignore] fn test_window_max_value() { let engine = RodaEngine::new(); - let mut source = engine.store::(StoreOptions { name: "source", size: 10, in_memory: true }); - let mut target = engine.store::(StoreOptions { name: "target", size: 10, in_memory: true }); + let mut source = engine.store::(StoreOptions { + name: "source", + size: 10, + in_memory: true, + }); + let mut target = engine.store::(StoreOptions { + name: "target", + size: 10, + in_memory: true, + }); let source_reader = source.reader(); let target_reader = target.reader(); let mut pipeline = Window::new(); @@ -245,10 +281,17 @@ fn test_window_max_value() { fn test_window_all_none_until_full() { use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; - let engine = RodaEngine::new(); - let mut source = engine.store::(StoreOptions { name: "source", size: 10, in_memory: true }); - let mut target = engine.store::(StoreOptions { name: "target", size: 10, in_memory: true }); + let mut source = engine.store::(StoreOptions { + name: "source", + size: 10, + in_memory: true, + }); + let mut target = engine.store::(StoreOptions { + name: "target", + size: 10, + in_memory: true, + }); let source_reader = source.reader(); let target_reader = target.reader(); let mut pipeline = Window::new(); From 417e91c6d4db93aa0f0d8e40890b307224e35579 Mon Sep 17 00:00:00 2001 From: Taleh Ibrahimli Date: Thu, 12 Feb 2026 21:55:43 +0100 Subject: [PATCH 7/7] improvements --- src/components.rs | 2 +- src/storage/mmap_journal.rs | 12 ++++++++++++ src/store.rs | 6 ++---- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/components.rs b/src/components.rs index 8d5ed64..b61ebf5 100644 --- a/src/components.rs +++ b/src/components.rs @@ -29,7 +29,7 @@ pub trait StoreReader: Send { fn get(&self) -> Option; fn get_at(&self, at: usize) -> Option; fn get_last(&self) -> Option; - fn get_window(&self, at: usize) -> Option<[State; N]>; + fn get_window(&self, at: usize) -> Option<&[State]>; } pub trait Index { diff --git a/src/storage/mmap_journal.rs b/src/storage/mmap_journal.rs index c46c1bf..e1eba84 100644 --- a/src/storage/mmap_journal.rs +++ b/src/storage/mmap_journal.rs @@ -73,6 +73,18 @@ impl MmapRing { bytemuck::from_bytes(&self.slice()[actual_offset..end]) } + pub(crate) fn read_window(&self, offset: usize) -> &[T] { + let actual_offset = offset % self.len; + let end = actual_offset + size_of::() * N; + assert!( + end <= self.len, + "Read crosses buffer boundary - alignment issue?" + ); + let bytes = &self.slice()[actual_offset..end]; + + bytemuck::cast_slice(bytes) + } + pub fn append(&mut self, state: &T) { let current_pos = self.write_index.load(std::sync::atomic::Ordering::Relaxed); let size = size_of::(); diff --git a/src/store.rs b/src/store.rs index fe56cb2..141b5fc 100644 --- a/src/store.rs +++ b/src/store.rs @@ -124,7 +124,7 @@ impl StoreReader for CircularStoreReader { self.with_last(|s| *s) } - fn get_window(&self, at: usize) -> Option<[State; N]> { + fn get_window(&self, at: usize) -> Option<&[State]> { let offset = at * size_of::(); let write_index = self.storage.get_write_index(); if offset + size_of::() * N > write_index { @@ -134,8 +134,6 @@ impl StoreReader for CircularStoreReader { return None; // Part of the window has been overwritten } - Some(std::array::from_fn(|i| { - *self.storage.read::(offset + i * size_of::()) - })) + Some(self.storage.read_window::(offset)) } }