Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
626 changes: 425 additions & 201 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ description = "A wait-free, cache-friendly state synchronization engine for HFT.
authors = ["Your Name"]

[dependencies]
spdlog-rs = "0.5.2"
rand = "0.10.0-rc.6"
bytemuck = {version = "1.25.0", features = ["derive"]}
bytemuck = { version = "1.25.0", features = ["derive"] }
memmap2 = "0.9.9"
thiserror = "2.0.18"
crossbeam-utils = "0.8.21"
crossbeam-skiplist = "0.1"
libc = "0.2" # Needed for mlock (memory pinning) and sched_setaffinity
clap = { version = "4.5.57", features = ["derive"] }
hdrhistogram = "7.5"
spdlog-rs = "0.5.2"

[dev-dependencies]
assert_no_alloc = { version = "1.1.2" }
criterion = { version = "0.5", features = ["html_reports"] }
criterion = { version = "0.8.2", features = ["html_reports"] }
dbn = { version = "0.48.0" }
clap = { version = "4.0", features = ["derive"] }

[lib]
bench = false # We use the 'benches/' directory
Expand All @@ -27,7 +27,7 @@ name = "store_bench"
harness = false

[[bench]]
name = "comprehensive_bench"
name = "sensor_bench"
harness = false

[profile.profiling]
Expand Down
159 changes: 92 additions & 67 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Roda

Ultra-high-performance, low-latency state computer for real-time analytics and trading systems. Roda lets you build
Ultra-high-performance, low-latency state computer for real-time analytics and event-driven systems. Roda lets you build
deterministic streaming pipelines with cache-friendly dataflows, wait-free reads, and explicit memory bounds—ideal for
HFT, market microstructure research, telemetry, and any workload where microseconds matter.
IoT, telemetry, industrial automation, and any workload where microseconds matter.

> Status: Early design and API preview. Examples and tests illustrate the intended DX. Expect rapid iteration and
> breaking changes.
Expand Down Expand Up @@ -85,134 +85,159 @@ roda-state = { path = "." }
Run the example:

```bash
cargo run --example hello_world
cargo run --example sensor_test
```

## Example: From Ticks to OHLC to Trading Signals
## Example: From Sensor Readings to Summaries to Alerts

Below is a trimmed version of `examples/hello_world.rs` that demonstrates a two-stage pipeline: aggregate ticks into OHLC candles, then derive a simple momentum signal via a sliding window.
Below is a trimmed version of `examples/sensor_test.rs` that demonstrates a two-stage pipeline: aggregate raw sensor readings into statistical summaries, then derive alerts when anomalies are detected via a sliding window.

```rust
use bytemuck::{Pod, Zeroable};
use roda_state::components::{Engine, Index, Store, StoreOptions, StoreReader};
use roda_state::{Aggregator, RodaEngine, Window};
use std::thread;
use std::time::Duration;

#[repr(C)]
#[derive(Clone, Copy, Default, Pod, Zeroable)]
struct Tick {
symbol: u64,
price: f64,
struct Reading {
sensor_id: u64,
value: f64,
timestamp: u64,
}

impl Reading {
fn from(sensor_id: u64, value: f64, timestamp: u64) -> Self {
Self { sensor_id, value, timestamp }
}
}


#[repr(C)]
#[derive(Clone, Copy, Default, Pod, Zeroable)]
struct OHLC {
symbol: u64,
open: f64,
high: f64,
low: f64,
close: f64,
struct Summary {
sensor_id: u64,
min: f64,
max: f64,
avg: f64,
count: u64,
timestamp: u64,
}

#[repr(C)]
#[derive(Clone, Copy, Default, Pod, Zeroable)]
struct Signal {
symbol: u64,
struct Alert {
sensor_id: u64,
timestamp: u64,
direction: i32,
size: u32,
severity: i32,
_pad0: i32,
}

#[derive(Clone, Copy, PartialEq, Eq, Hash, Pod, Zeroable)]
#[repr(C)]
struct TimeKey {
symbol: u64,
struct SensorKey {
sensor_id: u64,
timestamp: u64,
}

fn main() {
let engine = RodaEngine::new();

// Allocate bounded stores (explicit memory profile)
let tick_store = engine.store::<Tick>(StoreOptions {
name: "ticks",
// 1. Allocate bounded stores
let mut reading_store = engine.store::<Reading>(StoreOptions {
name: "readings",
size: 1_000_000,
in_memory: true,
});
let tick_reader = tick_store.reader();
let mut ohlc_store = engine.store::<OHLC>(StoreOptions {
name: "ohlc",
let reading_reader = reading_store.reader();

let mut summary_store = engine.store::<Summary>(StoreOptions {
name: "summaries",
size: 10_000,
in_memory: true,
});
let ohlc_reader = ohlc_store.reader();
let mut signal_store = engine.store::<Signal>(StoreOptions {
name: "signals",
let summary_reader = summary_store.reader();

let mut alert_store = engine.store::<Alert>(StoreOptions {
name: "alerts",
size: 10_000,
in_memory: true,
});
let alert_reader_for_print = alert_store.reader();

// Index to locate candles by (symbol, time)
let ohlc_index = ohlc_store.direct_index::<TimeKey>();
let summary_index = summary_store.direct_index::<SensorKey>();

// Declare pipelines
let mut ohlc_pipeline: Aggregator<Tick, OHLC, TimeKey> = Aggregator::new();
let mut strategy_pipeline: Window<OHLC, Signal> = Window::new();
// 2. Declare pipelines
let summary_pipeline: Aggregator<Reading, Summary, SensorKey> = Aggregator::new();
let alert_pipeline: Window<Summary, Alert> = Window::new();

// Worker 1: aggregate ticks -> OHLC and maintain index
// 3. Worker 1: aggregate readings -> summaries and maintain index
engine.run_worker(move || {
tick_reader.next();
ohlc_pipeline
.from(&tick_reader)
.to(&mut ohlc_store)
.partition_by(|t| TimeKey {
symbol: t.symbol,
timestamp: t.timestamp / 100_000
reading_reader.next();
summary_pipeline
.from(&reading_reader)
.to(&mut summary_store)
.partition_by(|r| SensorKey {
sensor_id: r.sensor_id,
timestamp: r.timestamp / 100_000
})
.reduce(|i, t, c| {
.reduce(|i, r, s| {
if i == 0 {
c.open = t.price;
c.high = t.price;
c.low = t.price;
c.close = t.price;
c.symbol = t.symbol;
c.timestamp = (t.timestamp / 100_000) * 100_000;
*s = Summary {
sensor_id: r.sensor_id,
min: r.value, max: r.value, avg: r.value, count: 1,
timestamp: (r.timestamp / 100_000) * 100_000,
};
} else {
c.high = c.high.max(t.price);
c.low = c.low.min(t.price);
c.close = t.price;
s.min = s.min.min(r.value);
s.max = s.max.max(r.value);
s.avg = (s.avg * s.count as f64 + r.value) / (s.count + 1) as f64;
s.count += 1;
}
});

ohlc_index.compute(|c| TimeKey {
symbol: c.symbol,
timestamp: c.timestamp / 100_000
summary_index.compute(|s| SensorKey {
sensor_id: s.sensor_id,
timestamp: s.timestamp / 100_000
});
});

// Worker 2: 2-bar momentum signal
// 4. Worker 2: alert on average jumps
engine.run_worker(move || {
ohlc_reader.next();
strategy_pipeline
.from(&ohlc_reader)
.to(&mut signal_store)
summary_reader.next();
alert_pipeline
.from(&summary_reader)
.to(&mut alert_store)
.reduce(2, |w| {
let prev = w[0];
let cur = w[1];
(cur.close > prev.close).then(|| Signal {
symbol: cur.symbol,
let (prev, cur) = (w[0], w[1]);
(cur.avg > prev.avg * 1.5).then(|| Alert {
sensor_id: cur.sensor_id,
timestamp: cur.timestamp,
direction: 1,
size: ((cur.close - prev.close) as u32).min(100)
severity: 1,
..Default::default()
})
});
});

// 5. Data Ingestion
reading_store.push(Reading::from(1, 10.0, 10_000));
reading_store.push(Reading::from(1, 12.0, 20_000));
reading_store.push(Reading::from(1, 20.0, 110_000));
reading_store.push(Reading::from(1, 22.0, 120_000));

thread::sleep(Duration::from_millis(100));

// 6. Print Results
while alert_reader_for_print.next() {
if let Some(a) = alert_reader_for_print.get() {
println!("{:?}", a);
}
}
}
```

Explore the full example in `examples/hello_world.rs` for more context.
Explore the full example in `examples/sensor_test.rs` for more context.

## Contributing

Expand Down
Loading