Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,8 @@ jobs:
- 'raft-kv-memstore-opendal-snapshot-data'
- 'raft-kv-memstore-singlethreaded'
- 'raft-kv-rocksdb'
- 'multi-raft-kv'
- 'multi-raft-sharding'

steps:
- uses: actions/checkout@v4
Expand Down Expand Up @@ -401,13 +403,17 @@ jobs:

- name: Test demo script of examples/${{ matrix.example }}
# The script is not meant for testing. Just to ensure it works but do not
# rely on it.
# rely on it. Skip if test-cluster.sh doesn't exist.
if: ${{ matrix.toolchain == 'stable' }}

shell: bash
run: |
cd examples/${{ matrix.example }}
./test-cluster.sh
if [ -f test-cluster.sh ]; then
./test-cluster.sh
else
echo "No test-cluster.sh found, skipping"
fi

- name: Format
# clippy/format produces different result with stable and nightly. Only with nightly.
Expand Down
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ exclude = [
"examples/raft-kv-memstore-opendal-snapshot-data",
"examples/raft-kv-rocksdb",

"examples/multi-raft-kv",
"examples/multi-raft-sharding",

"rt-monoio",
"rt-compio"
"rt-compio",
"multiraft"
]
16 changes: 15 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ test:
cargo test --features serde
# only crate `tests` has single-term-leader feature
cargo test --features single-term-leader -p tests
# multiraft crate tests
cargo test --manifest-path multiraft/Cargo.toml
$(MAKE) test-examples

check-parallel:
Expand All @@ -40,6 +42,8 @@ test-examples:
cargo test --manifest-path examples/raft-kv-memstore-singlethreaded/Cargo.toml
cargo test --manifest-path examples/raft-kv-rocksdb/Cargo.toml
cargo test --manifest-path examples/rocksstore/Cargo.toml
cargo test --manifest-path examples/multi-raft-kv/Cargo.toml
cargo test --manifest-path examples/multi-raft-sharding/Cargo.toml

bench:
cargo bench --features bench
Expand Down Expand Up @@ -75,19 +79,29 @@ guide:

lint:
cargo fmt
cargo fmt --manifest-path multiraft/Cargo.toml
cargo fmt --manifest-path rt-compio/Cargo.toml
cargo fmt --manifest-path rt-monoio/Cargo.toml
cargo fmt --manifest-path examples/mem-log/Cargo.toml
cargo fmt --manifest-path examples/raft-kv-memstore-network-v2/Cargo.toml
cargo fmt --manifest-path examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml
cargo fmt --manifest-path examples/raft-kv-memstore-singlethreaded/Cargo.toml
cargo fmt --manifest-path examples/raft-kv-memstore/Cargo.toml
cargo fmt --manifest-path examples/raft-kv-rocksdb/Cargo.toml
cargo fmt --manifest-path examples/multi-raft-kv/Cargo.toml
cargo fmt --manifest-path examples/multi-raft-sharding/Cargo.toml
cargo clippy --no-deps --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path examples/mem-log/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path multiraft/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path rt-compio/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path rt-monoio/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path examples/mem-log/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path examples/raft-kv-memstore-network-v2/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path examples/raft-kv-memstore-singlethreaded/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path examples/raft-kv-memstore/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path examples/raft-kv-rocksdb/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path examples/multi-raft-kv/Cargo.toml --all-targets -- -D warnings
cargo clippy --no-deps --manifest-path examples/multi-raft-sharding/Cargo.toml --all-targets -- -D warnings
# Bug: clippy --all-targets reports false warning about unused dep in
# `[dev-dependencies]`:
# https://github.com/rust-lang/rust/issues/72686#issuecomment-635539688
Expand Down
2 changes: 1 addition & 1 deletion examples/mem-log/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ license = "MIT OR Apache-2.0"
repository = "https://github.com/databendlabs/openraft"

[dependencies]
openraft = { path = "../../openraft", features = ["type-alias"] }
openraft = { path = "../../openraft", default-features = false, features = ["type-alias", "tokio-rt"] }

tokio = { version = "1.0", default-features = false, features = ["sync"] }

Expand Down
34 changes: 34 additions & 0 deletions examples/multi-raft-kv/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[package]
name = "multi-raft-kv"
version = "0.1.0"
readme = "README.md"

edition = "2021"
authors = [
"AriesDevil <ariesdevil77@gmail.com>",
]
categories = ["algorithms", "asynchronous", "data-structures"]
description = "An example Multi-Raft distributed key-value store with 3 groups built upon `openraft`."
homepage = "https://github.com/databendlabs/openraft"
keywords = ["raft", "consensus", "multi-raft"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/databendlabs/openraft"

[dependencies]
mem-log = { path = "../mem-log", features = [] }
openraft = { path = "../../openraft", default-features = false, features = ["serde", "type-alias", "tokio-rt"] }
openraft-multi = { path = "../../multiraft" }

futures = { version = "0.3" }
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" }
tokio = { version = "1", default-features = false, features = ["sync"] }
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }


[features]

[package.metadata.docs.rs]
all-features = true

85 changes: 85 additions & 0 deletions examples/multi-raft-kv/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Multi-Raft KV Store Example

This example demonstrates how to use OpenRaft's Multi-Raft support to run multiple independent Raft consensus groups within a single process.

## Overview

The example creates a distributed key-value store with **3 Raft groups**:
- **users** - Stores user data
- **orders** - Stores order data
- **products** - Stores product data

Each group runs its own independent Raft consensus, but they share the same network infrastructure.

## Architecture

```
+-----------------------------------------------------------------------+
| Node 1 |
| +-------------------+ +-------------------+ +-------------------+ |
| | Group "users" | | Group "orders" | | Group "products" | |
| | (Raft Instance) | | (Raft Instance) | | (Raft Instance) | |
| +-------------------+ +-------------------+ +-------------------+ |
| | | | |
| +----------------------+----------------------+ |
| | |
| +--------+--------+ |
| | Router | |
| | (shared network)| |
| +-----------------+ |
+------------------------------------------------------------------------+
|
Network Connection
|
+------------------------------------------------------------------------+
| Node 2 |
| +-------------------+ +-------------------+ +-------------------+ |
| | Group "users" | | Group "orders" | | Group "products" | |
| | (Raft Instance) | | (Raft Instance) | | (Raft Instance) | |
| +-------------------+ +-------------------+ +-------------------+ |
+------------------------------------------------------------------------+
```

## Key Concepts

### GroupId
A string identifier that uniquely identifies each Raft group (e.g., "users", "orders", "products").

### Shared Network
Multiple Raft groups share the same network infrastructure (`Router`), reducing connection overhead. Messages are routed to the correct group using the `group_id`.

### Independent Consensus
Each group runs its own Raft consensus independently:
- Separate log storage
- Separate state machine
- Separate leader election
- Separate membership

## Running the Test

```bash
# Run the integration test
cargo test -p multi-raft-kv test_multi_raft_cluster -- --nocapture

# With debug logging
RUST_LOG=debug cargo test -p multi-raft-kv test_multi_raft_cluster -- --nocapture
```

## Code Structure

```
multi-raft-kv/
├── Cargo.toml
├── README.md
├── src/
│ ├── lib.rs # Type definitions and group constants
│ ├── app.rs # Application handler for each group
│ ├── api.rs # API handlers (read, write, raft operations)
│ ├── network.rs # Network implementation with group routing
│ ├── router.rs # Message router for (node_id, group_id)
│ └── store.rs # State machine storage
└── tests/
└── cluster/
├── main.rs
└── test_cluster.rs # Integration tests
```
111 changes: 111 additions & 0 deletions examples/multi-raft-kv/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use std::collections::BTreeMap;
use std::collections::BTreeSet;

use openraft::raft::TransferLeaderRequest;
use openraft::BasicNode;
use openraft::ReadPolicy;

use crate::app::GroupApp;
use crate::decode;
use crate::encode;
use crate::typ::*;
use crate::NodeId;

/// Write a key-value pair to the group's state machine
pub async fn write(app: &mut GroupApp, req: String) -> String {
let res = app.raft.client_write(decode(&req)).await;
encode(res)
}

/// Read a value from the group's state machine using linearizable read
pub async fn read(app: &mut GroupApp, req: String) -> String {
let key: String = decode(&req);

let ret = app.raft.get_read_linearizer(ReadPolicy::ReadIndex).await;

let res = match ret {
Ok(linearizer) => {
linearizer.await_ready(&app.raft).await.unwrap();

let state_machine = app.state_machine.state_machine.lock().await;
let value = state_machine.data.get(&key).cloned();

let res: Result<String, RaftError<LinearizableReadError>> = Ok(value.unwrap_or_default());
res
}
Err(e) => Err(e),
};
encode(res)
}

// ============================================================================
// Raft Protocol API
// ============================================================================

/// Handle vote request
pub async fn vote(app: &mut GroupApp, req: String) -> String {
let res = app.raft.vote(decode(&req)).await;
encode(res)
}

/// Handle append entries request
pub async fn append(app: &mut GroupApp, req: String) -> String {
let res = app.raft.append_entries(decode(&req)).await;
encode(res)
}

/// Receive a snapshot and install it
pub async fn snapshot(app: &mut GroupApp, req: String) -> String {
let (vote, snapshot_meta, snapshot_data): (Vote, SnapshotMeta, SnapshotData) = decode(&req);
let snapshot = Snapshot {
meta: snapshot_meta,
snapshot: snapshot_data,
};
let res = app.raft.install_full_snapshot(vote, snapshot).await.map_err(RaftError::<Infallible>::Fatal);
encode(res)
}

/// Handle transfer leader request
pub async fn transfer_leader(app: &mut GroupApp, req: String) -> String {
let transfer_req: TransferLeaderRequest<crate::TypeConfig> = decode(&req);
let res = app.raft.handle_transfer_leader(transfer_req).await;
encode(res)
}

// ============================================================================
// Management API
// ============================================================================

/// Add a node as **Learner** to this group.
///
/// This should be done before adding a node as a member into the cluster
/// (by calling `change-membership`)
pub async fn add_learner(app: &mut GroupApp, req: String) -> String {
let node_id: NodeId = decode(&req);
let node = BasicNode { addr: "".to_string() };
let res = app.raft.add_learner(node_id, node, true).await;
encode(res)
}

/// Changes specified learners to members, or remove members from this group.
pub async fn change_membership(app: &mut GroupApp, req: String) -> String {
let node_ids: BTreeSet<NodeId> = decode(&req);
let res = app.raft.change_membership(node_ids, false).await;
encode(res)
}

/// Initialize a single-node cluster for this group.
pub async fn init(app: &mut GroupApp) -> String {
let mut nodes = BTreeMap::new();
nodes.insert(app.node_id, BasicNode { addr: "".to_string() });
let res = app.raft.initialize(nodes).await;
encode(res)
}

/// Get the latest metrics of this Raft group
pub async fn metrics(app: &mut GroupApp) -> String {
let metrics = app.raft.metrics().borrow().clone();

let res: Result<RaftMetrics, Infallible> = Ok(metrics);
encode(res)
}
Loading