Merged
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
@@ -12,6 +12,7 @@ bytemuck = {version = "1.25.0", features = ["derive"]}
memmap2 = "0.9.9"
thiserror = "2.0.18"
crossbeam-utils = "0.8.21"
+crossbeam-skiplist = "0.1"
libc = "0.2" # Needed for mlock (memory pinning) and sched_setaffinity

[dev-dependencies]
@@ -25,6 +26,10 @@ bench = false # We use the 'benches/' directory
name = "store_bench"
harness = false

+[[bench]]
+name = "comprehensive_bench"
+harness = false

[profile.profiling]
inherits = "release"
debug = true
11 changes: 6 additions & 5 deletions DESIGN.md
@@ -42,21 +42,22 @@ Workers execute user pipelines in a continuous loop using an **Adaptive Backoff

### 2.2 The Store (The Source of Truth)

-The `CircularRodaStore<T>` is a fixed-capacity circular buffer backed by memory-mapped files.
+The `StoreJournal<T>` is a fixed-capacity append-only buffer backed by memory-mapped files.

* **Memory Layout:** `[ Header (Atomics) | Data Region (T...) | Padding ]`.
* **Write Model:** **Single Writer**. Only one thread (the owner of the `Store` handle) can write, eliminating
write-side contention.
-* **Read Model:** **Multiple Readers**. Each reader (or worker) uses an independent `CircularRodaStoreReader<T>` handle
+* **Read Model:** **Multiple Readers**. Each reader (or worker) uses an independent `StoreJournalReader<T>` handle
that maintains its own
state (cursor).
* **Addressing:** Data is addressed by a monotonic `u64` sequence number (`Cursor`). The physical address is
-  `(Cursor % Capacity) * sizeof(T)`.
+  `Cursor * sizeof(T)`.
* **Full Buffer Policy:** If the store is full, it will panic on the next `push`. No wrapping or overwriting occurs.
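
With the append-only layout described above, the cursor-to-byte mapping is plain multiplication with no modulo. A minimal sketch of the arithmetic, using a hypothetical 16-byte `Tick` payload (not a type from this crate):

```rust
use std::mem::size_of;

// Hypothetical fixed-size payload, for illustration only.
#[repr(C)]
#[derive(Clone, Copy)]
struct Tick {
    price: f64,
    qty: u64,
}

// Physical byte offset of sequence number `cursor` in the data region.
// No `% capacity`: the store never wraps, so the cursor maps directly.
fn physical_offset(cursor: u64) -> u64 {
    cursor * size_of::<Tick>() as u64
}

fn main() {
    assert_eq!(size_of::<Tick>(), 16);
    assert_eq!(physical_offset(0), 0);
    assert_eq!(physical_offset(3), 48); // fourth entry starts 3 * 16 bytes in
    println!("ok");
}
```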

### 2.3 StoreReader & Traits

Roda uses traits to define the behavior of stores and readers, allowing for different implementations (like the default
-`CircularRodaStore`).
+`StoreJournal`).

* **Store Trait:** Defines `push`, `reader`, and `direct_index`.
* **StoreReader Trait:** Defines `next`, `with`, `with_at`, `with_last`, `get`, `get_at`, `get_last`, and `get_window`.
@@ -115,7 +116,7 @@ To guarantee performance and zero-copy safety, Roda imposes several constraints:

Synchronization is achieved without locks using `Acquire/Release` semantics:

-* **Writer:** `buffer[cursor % cap] = data; cursor.store(new_val, Release);`
+* **Writer:** `buffer[cursor] = data; cursor.store(new_val, Release);`
* **Reader:** `while cursor.load(Acquire) > local_cursor { process(); local_cursor++; }`

This ensures that when the reader sees the updated cursor, it is guaranteed to see the data written by the writer.
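
The two lines above can be exercised end to end with std atomics. A minimal single-producer, single-consumer sketch (names like `Shared` and `run` are illustrative, not crate APIs):

```rust
use std::cell::UnsafeCell;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

const N: usize = 1024;

// The writer publishes by storing `cursor` with Release; a reader that
// loads it with Acquire is guaranteed to see the data writes that
// happened before the store.
struct Shared {
    data: Vec<UnsafeCell<u64>>,
    cursor: AtomicU64, // number of published entries
}

// Sound here only because each slot is written exactly once, before it
// is published, and never touched again.
unsafe impl Sync for Shared {}

fn run() -> u64 {
    let shared = Arc::new(Shared {
        data: (0..N).map(|_| UnsafeCell::new(0)).collect(),
        cursor: AtomicU64::new(0),
    });

    let writer = {
        let s = Arc::clone(&shared);
        thread::spawn(move || {
            for i in 0..N as u64 {
                unsafe { *s.data[i as usize].get() = i * 2 }; // write payload first
                s.cursor.store(i + 1, Ordering::Release); // then publish
            }
        })
    };

    // Reader: drain everything the writer has published so far.
    let mut local = 0u64;
    let mut sum = 0u64;
    while local < N as u64 {
        let published = shared.cursor.load(Ordering::Acquire);
        while local < published {
            sum += unsafe { *shared.data[local as usize].get() };
            local += 1;
        }
    }
    writer.join().unwrap();
    sum
}

fn main() {
    println!("{}", run()); // sum of 0, 2, 4, ... up to 2 * (N - 1)
}
```

Swapping `Release`/`Acquire` for `Relaxed` here would make the reader's payload loads a data race: the cursor bump alone no longer orders the slot write before the read.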
6 changes: 3 additions & 3 deletions README.md
@@ -11,7 +11,7 @@ HFT, market microstructure research, telemetry, and any workload where microseco

## Why Roda?

-- Deterministic performance: Explicit store sizes, preallocated ring buffers, back-pressure free write path by design goals.
+- Deterministic performance: Explicit store sizes, preallocated buffers, back-pressure-free write path by design.
- Low latency by construction: Reader APIs are designed for zero/constant allocations and predictable access patterns.
- Declarative pipelines: Express processing in terms of partitions, reductions, and sliding windows.
- Indexable state: Build direct indexes for O(1) lookups into rolling state.
@@ -20,7 +20,7 @@ HFT, market microstructure research, telemetry, and any workload where microseco
## Core Concepts

- **Engine:** Orchestrates workers (long-lived tasks) that advance your pipelines.
-- **Store<T>:** A bounded, cache-friendly ring buffer that holds your state. You choose the capacity up front.
+- **Store<T>:** A bounded, cache-friendly append-only buffer that holds your state. You choose the capacity up front.
- `push(value)`: Append a new item (typically by a single writer thread).
- `reader()`: Returns a `StoreReader` view appropriate for consumers.
- `direct_index<Key>()`: Build a secondary index over the store.
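
As a shape check for the surface above, here is a toy in-process stand-in with the same push/reader split. It is a plain `Vec`, not the crate's memory-mapped, atomic implementation, and its `next` returns `Option<T>` rather than whatever the real `StoreReader` exposes:

```rust
// Toy stand-in for Store<T> / StoreReader: same surface, no mmap, no atomics.
struct ToyStore<T> {
    data: Vec<T>,
    capacity: usize,
}

struct ToyReader {
    cursor: usize, // this reader's private position
}

impl<T: Copy> ToyStore<T> {
    fn with_capacity(capacity: usize) -> Self {
        ToyStore { data: Vec::with_capacity(capacity), capacity }
    }

    // Append-only write path; panics when full, mirroring the documented policy.
    fn push(&mut self, value: T) {
        assert!(self.data.len() < self.capacity, "store is full");
        self.data.push(value);
    }

    fn reader(&self) -> ToyReader {
        ToyReader { cursor: 0 }
    }

    fn get(&self, cursor: usize) -> Option<T> {
        self.data.get(cursor).copied()
    }
}

impl ToyReader {
    // Yield the next published item, if any, and advance the cursor.
    fn next<T: Copy>(&mut self, store: &ToyStore<T>) -> Option<T> {
        let item = store.get(self.cursor)?;
        self.cursor += 1;
        Some(item)
    }
}

fn main() {
    let mut store = ToyStore::with_capacity(4);
    store.push(10u64);
    store.push(20);

    let mut reader = store.reader();
    let mut sum = 0;
    while let Some(v) = reader.next(&store) {
        sum += v;
    }
    println!("{}", sum); // 30
}
```

Each reader owning its own cursor is the point: two readers over one store drain independently, which is what makes the multi-reader side of SWMR coordination-free.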
@@ -60,7 +60,7 @@ Roda is designed as a **Shared-Memory, Single-Writer Multi-Reader (SWMR)** syste

## Features

-- **Blazing Fast:** Designed for microsecond-level latency using memory-mapped ring buffers.
+- **Blazing Fast:** Designed for microsecond-level latency using memory-mapped buffers.
- **Zero-Copy:** Data is borrowed directly from shared memory regions; no unnecessary allocations on the hot path.
- **Lock-Free:** Single-Writer Multi-Reader (SWMR) pattern with atomic coordination.
- **Deterministic:** Explicit memory management and pre-allocated stores prevent GC pauses or unexpected heap allocations.
264 changes: 264 additions & 0 deletions benches/comprehensive_bench.rs
@@ -0,0 +1,264 @@
use bytemuck::{Pod, Zeroable};
use criterion::{Criterion, black_box, criterion_group, criterion_main};
use roda_state::components::{Engine, Index, IndexReader, Store, StoreOptions, StoreReader};
use roda_state::{Aggregator, RodaEngine, Window};

#[derive(Clone, Copy, Zeroable, Pod, Default)]
#[repr(C)]
struct RawData {
    id: u32,
    _pad: u32,
    value: f64,
}

#[derive(Clone, Copy, Zeroable, Pod, Default)]
#[repr(C)]
struct AggregatedData {
    id: u32,
    _pad: u32,
    sum: f64,
    count: u64,
}

fn bench_index(c: &mut Criterion) {
    let engine = RodaEngine::new();
    let mut group = c.benchmark_group("index");

    let size = 16 * 1024 * 1024 * 1024;
    let mut store = engine.store::<RawData>(StoreOptions {
        name: "bench_index_store",
        size,
        in_memory: true,
    });

    // Fill data
    for i in 0..10000 {
        store.push(RawData {
            id: i as u32,
            value: i as f64,
            ..Default::default()
        });
    }

    let index = store.direct_index::<u32>();

    group.bench_function("index_compute_10k", |b| {
        b.iter(|| {
            let reader = store.reader();
            let index = store.direct_index::<u32>();
            while reader.next() {
                index.compute(|data| data.id);
            }
        });
    });

    // Pre-compute index for lookup bench
    let reader = store.reader();
    while reader.next() {
        index.compute(|data| data.id);
    }
    let index_reader = index.reader();

    group.bench_function("index_lookup", |b| {
        let mut i = 0u32;
        b.iter(|| {
            black_box(index_reader.get(&(i % 10000)));
            i += 1;
        });
    });

    group.bench_function("index_incremental_compute", |b| {
        let mut i = 10000u32;
        let reader = store.reader();
        // Skip already pushed
        for _ in 0..10000 {
            reader.next();
        }

        b.iter(|| {
            store.push(RawData {
                id: i,
                value: i as f64,
                ..Default::default()
            });
            reader.next();
            index.compute(|data| data.id);
            i += 1;
        });
    });

    group.finish();
}

fn bench_aggregator(c: &mut Criterion) {
    let engine = RodaEngine::new();
    let mut group = c.benchmark_group("aggregator");

    for num_partitions in [10, 100, 1000] {
        let mut source = engine.store::<RawData>(StoreOptions {
            name: "bench_agg_source",
            size: 8 * 1024 * 1024 * 1024,
            in_memory: true,
        });
        let mut target = engine.store::<AggregatedData>(StoreOptions {
            name: "bench_agg_target",
            size: 8 * 1024 * 1024 * 1024,
            in_memory: true,
        });

        let source_reader = source.reader();
        let aggregator: Aggregator<RawData, AggregatedData, u32> = Aggregator::new();

        group.bench_function(
            format!("aggregator_reduce_step_{}_partitions", num_partitions),
            |b| {
                let mut i = 0u32;
                b.iter(|| {
                    source.push(RawData {
                        id: i % num_partitions,
                        value: 1.0,
                        ..Default::default()
                    });
                    source_reader.next();
                    aggregator
                        .from(&source_reader)
                        .to(&mut target)
                        .partition_by(|r| r.id)
                        .reduce(|_idx, r, s| {
                            s.id = r.id;
                            s.sum += r.value;
                            s.count += 1;
                        });
                    i += 1;
                });
            },
        );
    }

    group.finish();
}

fn bench_window(c: &mut Criterion) {
    let engine = RodaEngine::new();
    let mut group = c.benchmark_group("window_component");

    let size = 8 * 1024 * 1024 * 1024;
    let mut source = engine.store::<RawData>(StoreOptions {
        name: "bench_window_source",
        size,
        in_memory: true,
    });
    let mut target = engine.store::<RawData>(StoreOptions {
        name: "bench_window_target",
        size,
        in_memory: true,
    });

    let source_reader = source.reader();
    let window: Window<RawData, RawData> = Window::new();

    for window_size in [10, 100] {
        group.bench_function(format!("window_reduce_size_{}", window_size), |b| {
            let mut i = 0u32;
            b.iter(|| {
                source.push(RawData {
                    id: i,
                    value: i as f64,
                    ..Default::default()
                });
                source_reader.next();
                window
                    .from(&source_reader)
                    .to(&mut target)
                    .reduce(window_size, |data| {
                        let sum: f64 = data.iter().map(|d| d.value).sum();
                        Some(RawData {
                            id: data.last().unwrap().id,
                            value: sum / data.len() as f64,
                            ..Default::default()
                        })
                    });
                i += 1;
            });
        });
    }

    group.finish();
}

fn bench_mixed(c: &mut Criterion) {
    let engine = RodaEngine::new();
    let mut group = c.benchmark_group("mixed_pipeline");

    let size = 8 * 1024 * 1024 * 1024;
    let mut s1 = engine.store::<RawData>(StoreOptions {
        name: "mixed_s1",
        size,
        in_memory: true,
    });
    let mut s2 = engine.store::<AggregatedData>(StoreOptions {
        name: "mixed_s2",
        size,
        in_memory: true,
    });
    let mut s3 = engine.store::<AggregatedData>(StoreOptions {
        name: "mixed_s3",
        size,
        in_memory: true,
    });

    let r1 = s1.reader();
    let r2 = s2.reader();

    let aggregator: Aggregator<RawData, AggregatedData, u32> = Aggregator::new();
    let window: Window<AggregatedData, AggregatedData> = Window::new();

    group.bench_function("mixed_push_agg_window", |b| {
        let mut i = 0u32;
        b.iter(|| {
            // Push to S1
            s1.push(RawData {
                id: i % 10,
                value: 1.0,
                ..Default::default()
            });

            // Aggregator: S1 -> S2
            r1.next();
            aggregator
                .from(&r1)
                .to(&mut s2)
                .partition_by(|r| r.id)
                .reduce(|_idx, r, s| {
                    s.id = r.id;
                    s.sum += r.value;
                    s.count += 1;
                });

            // Window: S2 -> S3
            r2.next();
            window.from(&r2).to(&mut s3).reduce(5, |data| {
                let sum: f64 = data.iter().map(|d| d.sum).sum();
                Some(AggregatedData {
                    id: 0, // Mixed
                    sum,
                    count: data.iter().map(|d| d.count).sum(),
                    ..Default::default()
                })
            });

            i += 1;
        });
    });

    group.finish();
}

criterion_group!(
    benches,
    bench_index,
    bench_aggregator,
    bench_window,
    bench_mixed
);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion benches/store_bench.rs
@@ -15,7 +15,7 @@ fn bench_push(c: &mut Criterion) {
    let mut group = c.benchmark_group("push");

    // 1GB buffer to ensure we don't overflow during benchmarking
-    let size = 1024 * 1024 * 1024;
+    let size = 16 * 1024 * 1024 * 1024;
    let mut store_u64 = engine.store::<u64>(StoreOptions {
        name: "bench_push_u64",
        size,
2 changes: 1 addition & 1 deletion examples/hello_world.rs
@@ -26,7 +26,7 @@ pub struct OHLC {
}

#[repr(C)]
-#[derive(Debug, Clone, Copy, Default, Pod, Zeroable)]
+#[derive(Debug, Clone, Copy, Default, Pod, Zeroable, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TimeKey {
pub symbol: u64,
pub timestamp: u64,
2 changes: 1 addition & 1 deletion scripts/check.sh
@@ -5,7 +5,7 @@ set -e
echo "Running rustfmt..."
cargo fmt --all --check

-echo "Running clippy..." // temporary disabled, slows down active development, will be reenabled
+echo "Running clippy..."
cargo clippy -- -D warnings

echo "Running tests..."