Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 9 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ libc = "0.2" # Needed for mlock (memory pinning) and sched_setaffinity

[dev-dependencies]
assert_no_alloc = { version = "1.1.2" }
criterion = "0.8.2"
criterion = { version = "0.5", features = ["html_reports"] }

[lib]
bench = false # We use the 'benches/' directory

[[bench]]
name = "store_bench"
harness = false

[profile.profiling]
inherits = "release"
debug = true
154 changes: 95 additions & 59 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,52 +1,55 @@
# Roda

Ultrahighperformance, lowlatency state computer for realtime analytics and trading systems. Roda lets you build
deterministic streaming pipelines with cachefriendly dataflows, waitfree reads, and explicit memory boundsideal for
Ultra-high-performance, low-latency state computer for real-time analytics and trading systems. Roda lets you build
deterministic streaming pipelines with cache-friendly dataflows, wait-free reads, and explicit memory boundsideal for
HFT, market microstructure research, telemetry, and any workload where microseconds matter.

> Status: early design and API preview. Examples and tests illustrate the intended DX. Expect rapid iteration and
> Status: Early design and API preview. Examples and tests illustrate the intended DX. Expect rapid iteration and
> breaking changes.

---

## Why Roda?

- Deterministic performance: explicit store sizes, preallocated ring buffers, back‑pressure free write path by design
goals.
- Low latency by construction: reader APIs are designed for zero/constant allocations and predictable access patterns.
- Declarative pipelines: express processing in terms of partitions, reductions, and sliding windows.
- Indexable state: build direct indexes for O(1) lookups into rolling state.
- Simple concurrency model: long‑lived workers with single‑writer/multi‑reader patterns.

## Core concepts

- Engine: orchestrates workers (long‑lived tasks) that advance your pipelines.
- Store<T>: a bounded, cache‑friendly ring buffer that holds your state. You choose the capacity up front.
- push(value): append a new item (typically by a single writer thread)
- reader(): returns a `StoreReader` view appropriate for consumers
- direct_index<Key>(): build a secondary index over the store
- StoreReader<T>: a cursor‑based handle for consuming state from a `Store`.
- next(): advance the cursor to the next available item
- get(), get_at(at), get_last(): retrieve a copy of the state
- get_window::<N>(at): retrieve a fixed‑size window of state
- with(|state| ...), with_at(at, |state| ...), with_last(|state| ...): execute a closure with a borrowed reference
- Aggregator<In, Out, Key = ()>: a partitioned reducer for turning event streams into rolling state.
- from(&reader): set the input source
- to(&mut store): set the output target
- partition_by(|in| Key): assign each input to a partition
- reduce(|idx, in, out| ...): merge an input into the current output for its partition; idx is 0‑based within the
partition window
- Window<In, Out>: a fixed‑size sliding window over the input store.
- from(&reader): set the input source
- to(&mut store): set the output target
- reduce(window_size, |window: &[In]| -> Option<Out>): compute optional output when the window is advanced
- DirectIndex<Key, Value>: build and query secondary indexes over a store for O(1) state lookups.
- compute(|value| Key): manually update the index for the next available item in the store (typically called inside a worker)
- Deterministic performance: Explicit store sizes, preallocated ring buffers, back-pressure free write path by design goals.
- Low latency by construction: Reader APIs are designed for zero/constant allocations and predictable access patterns.
- Declarative pipelines: Express processing in terms of partitions, reductions, and sliding windows.
- Indexable state: Build direct indexes for O(1) lookups into rolling state.
- Simple concurrency model: Long-lived workers with single-writer/multi-reader patterns.

## Core Concepts

- **Engine:** Orchestrates workers (long-lived tasks) that advance your pipelines.
- **Store<T>:** A bounded, cache-friendly ring buffer that holds your state. You choose the capacity up front.
- `push(value)`: Append a new item (typically by a single writer thread).
- `reader()`: Returns a `StoreReader` view appropriate for consumers.
- `direct_index<Key>()`: Build a secondary index over the store.
- **StoreReader<T>:** A cursor-based handle for consuming state from a `Store`.
- `next()`: Advance the cursor to the next available item.
- `get()`, `get_at(at)`, `get_last()`: Retrieve a copy of the state.
- `get_window::<N>(at)`: Retrieve a fixed-size window of state.
- `with(|state| ...)`, `with_at(at, |state| ...)`, `with_last(|state| ...)`: Execute a closure with a borrowed reference.
- **Aggregator<In, Out, Key = ()>:** A partitioned reducer for turning event streams into rolling state.
- `from(&reader)`: Set the input source.
- `to(&mut store)`: Set the output target.
- `partition_by(|in| Key)`: Assign each input to a partition.
- `reduce(|idx, in, out| ...)`: Merge an input into the current output for its partition; `idx` is 0-based within the partition window.
- **Window<In, Out>:** A fixed-size sliding window over the input store.
- `from(&reader)`: Set the input source.
- `to(&mut store)`: Set the output target.
- `reduce(window_size, |window: &[In]| -> Option<Out>)`: Compute optional output when the window is advanced.
- **DirectIndex<Key, Value>:** Build and query secondary indexes over a store for O(1) state lookups.
- `compute(|value| Key)`: Manually update the index for the next available item in the store (typically called inside a worker).

---

For a deep dive into Roda's memory model, zero-copy internals, and execution patterns, see [DESIGN.md](DESIGN.md).

- **Shared-Nothing Strategy:** While data is shared for efficiency, workers maintain independent logic and state to avoid contention.
- **Microsecond Precision:** Built specifically for systems where every microsecond of jitter impacts the bottom line.
- **Cache-Friendly:** Data layout is optimized for CPU cache lines, minimizing cache misses during pipeline execution.
- **Built-in Indexing:** O(1) secondary lookups without the overhead of a general-purpose database.

## Architecture at a Glance

Roda is designed as a **Shared-Memory, Single-Writer Multi-Reader (SWMR)** system:
Expand All @@ -55,38 +58,51 @@ Roda is designed as a **Shared-Memory, Single-Writer Multi-Reader (SWMR)** syste
- **Deterministic:** Memory is pre-allocated; no allocations on the hot path.
- **Declarative:** Pipelines are built by connecting `Store`, `Aggregator`, and `Window` primitives.

## Quick start
## Features

Using the crate directly:
- **Blazing Fast:** Designed for microsecond-level latency using memory-mapped ring buffers.
- **Zero-Copy:** Data is borrowed directly from shared memory regions; no unnecessary allocations on the hot path.
- **Lock-Free:** Single-Writer Multi-Reader (SWMR) pattern with atomic coordination.
- **Deterministic:** Explicit memory management and pre-allocated stores prevent GC pauses or unexpected heap allocations.
- **Declarative API:** Build complex data processing pipelines using `Aggregator`, `Window`, and `Index` primitives.

```bash
# Run the end‑to‑end example
cargo run --example hello_world
## Quick Start

Add `roda-state` to your `Cargo.toml`:

```toml
[dependencies]
roda-state = "0.1"
```

Or add roda‑state to your own Cargo.toml while working from this repository:
Or if you're working from this repository:

```toml
[dependencies]
roda-state = { path = "." }
```

## Example: from ticks to OHLC to trading signals
Run the example:

Below is a trimmed version of examples/hello_world.rs that demonstrates a two‑stage pipeline: aggregate ticks into OHLC
candles, then derive a simple momentum signal via a sliding window.
```bash
cargo run --example hello_world
```

## Example: From Ticks to OHLC to Trading Signals

Below is a trimmed version of `examples/hello_world.rs` that demonstrates a two-stage pipeline: aggregate ticks into OHLC candles, then derive a simple momentum signal via a sliding window.

```rust
use bytemuck::{Pod, Zeroable};
use roda_state::components::{Index, Store, StoreReader};
use roda_state::components::{Engine, Index, Store, StoreOptions, StoreReader};
use roda_state::{Aggregator, RodaEngine, Window};

#[repr(C)]
#[derive(Clone, Copy, Default, Pod, Zeroable)]
struct Tick {
symbol: u64,
price: f64,
timestamp: u64
timestamp: u64,
}

#[repr(C)]
Expand All @@ -97,7 +113,7 @@ struct OHLC {
high: f64,
low: f64,
close: f64,
timestamp: u64
timestamp: u64,
}

#[repr(C)]
Expand All @@ -106,24 +122,37 @@ struct Signal {
symbol: u64,
timestamp: u64,
direction: i32,
size: u32
size: u32,
}

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Pod, Zeroable)]
#[repr(C)]
struct TimeKey {
symbol: u64,
timestamp: u64
timestamp: u64,
}

fn main() {
let engine = RodaEngine::new();

// Allocate bounded stores (explicit memory profile)
let tick_store = engine.store::<Tick>(1_000_000);
let tick_store = engine.store::<Tick>(StoreOptions {
name: "ticks",
size: 1_000_000,
in_memory: true,
});
let tick_reader = tick_store.reader();
let mut ohlc_store = engine.store::<OHLC>(10_000);
let mut ohlc_store = engine.store::<OHLC>(StoreOptions {
name: "ohlc",
size: 10_000,
in_memory: true,
});
let ohlc_reader = ohlc_store.reader();
let mut signal_store = engine.store::<Signal>(10_000);
let mut signal_store = engine.store::<Signal>(StoreOptions {
name: "signals",
size: 10_000,
in_memory: true,
});

// Index to locate candles by (symbol, time)
let ohlc_index = ohlc_store.direct_index::<TimeKey>();
Expand All @@ -132,13 +161,16 @@ fn main() {
let mut ohlc_pipeline: Aggregator<Tick, OHLC, TimeKey> = Aggregator::new();
let mut strategy_pipeline: Window<OHLC, Signal> = Window::new();

// Worker 1: aggregate ticks OHLC and maintain index
// Worker 1: aggregate ticks -> OHLC and maintain index
engine.run_worker(move || {
tick_reader.next();
ohlc_pipeline
.from(&tick_reader)
.to(&mut ohlc_store)
.partition_by(|t| TimeKey { symbol: t.symbol, timestamp: t.timestamp / 100_000 })
.partition_by(|t| TimeKey {
symbol: t.symbol,
timestamp: t.timestamp / 100_000
})
.reduce(|i, t, c| {
if i == 0 {
c.open = t.price;
Expand All @@ -153,10 +185,14 @@ fn main() {
c.close = t.price;
}
});
ohlc_index.compute(|c| TimeKey { symbol: c.symbol, timestamp: c.timestamp / 100_000 });

ohlc_index.compute(|c| TimeKey {
symbol: c.symbol,
timestamp: c.timestamp / 100_000
});
});

// Worker 2: 2bar momentum signal
// Worker 2: 2-bar momentum signal
engine.run_worker(move || {
ohlc_reader.next();
strategy_pipeline
Expand All @@ -176,14 +212,14 @@ fn main() {
}
```

Explore the full example in examples/hello_world.rs for more context.
Explore the full example in `examples/hello_world.rs` for more context.

## Contributing

Contributions are welcome! If you have ideas, issues, or benchmarks:

- Open an issue to discuss the usecase and constraints
- Keep PRs focused and measured; include microbenchmarks when changing hot paths
- Open an issue to discuss the use-case and constraints
- Keep PRs focused and measured; include micro-benchmarks when changing hot paths
- Follow the existing code style and formatting

## License
Expand Down
Loading