
feat(cluster): Integrating Open Raft logic into the Redis command flow #244

Open · guozhihao-224 wants to merge 1 commit into feat/raft from feat/perfect-raft

Conversation


@guozhihao-224 guozhihao-224 commented Mar 14, 2026

Summary by CodeRabbit

  • New Features

    • Added multi-node cluster configuration and orchestration scripts to start, stop, and test a 3-node cluster
    • Added data-dir configuration option for customizing storage location
    • Added cluster testing functionality to validate replication across nodes
  • Refactor

    • Refactored storage layer and batch handling to support distributed consensus operations


coderabbitai bot commented Mar 14, 2026

📝 Walkthrough

Walkthrough

This PR introduces Raft cluster orchestration and refactors the storage-to-Raft integration layer. It adds configuration files and scripts for a 3-node Raft cluster setup, implements an AppendLogFunction abstraction to bridge storage and Raft consensus, and updates BinlogBatch to properly commit entries through Raft with async runtime handling.

Changes

  • Cluster Configuration — scripts/cluster/node1.conf, scripts/cluster/node2.conf, scripts/cluster/node3.conf
    Defines network and Raft settings for each of the three cluster nodes, specifying distinct ports (7379–7381), Raft node IDs (1–3), and data directories.
  • Cluster Orchestration Scripts — scripts/cluster/start_cluster.sh, scripts/cluster/stop_cluster.sh, scripts/cluster/test_cluster.sh
    Implements cluster lifecycle management: start_cluster.sh builds the binary, initializes directories, starts three nodes, and optionally initializes Raft members; stop_cluster.sh gracefully shuts down nodes with a timeout; test_cluster.sh validates leader election, replication, and deletes across all nodes.
  • Raft Storage Adapter & Integration — src/raft/src/adapter.rs, src/raft/src/lib.rs, src/server/src/main.rs
    Adds create_append_log_fn to forward binlog writes to RaftApp::client_write asynchronously; exports new raft modules (adapter, node, state_machine); wires the adapter into storage initialization.
  • Storage Batch & Options — src/storage/src/batch.rs, src/storage/src/options.rs, src/storage/src/lib.rs
    Introduces the AppendLogFunction type alias; converts BinlogBatch from a placeholder into a functional implementation with put/delete accumulation, async commit via append_log_fn, and runtime-safe Tokio handling.
  • Storage Mode Selection & Configuration — src/storage/src/redis.rs, src/storage/src/storage.rs, src/conf/src/config.rs
    Adds conditional batch creation (BinlogBatch in Raft mode, RocksBatch otherwise) with RwLock-protected append_log_fn storage; introduces set_append_log_fn_for_all for propagating the adapter; adds a storage_data_dir config field.
  • Configuration & Type Refactoring — src/conf/src/raft_type.rs, src/cmd/src/lib.rs, src/cmd/src/set.rs
    Renames BinlogResponse.message to error_message with a serde alias; deprecates Cmd::to_binlog; removes the to_binlog implementation from SetCmd.
  • Raft Network & Internals — src/raft/src/api.rs, src/raft/src/network.rs, src/raft/src/state_machine.rs
    Reorders imports in api.rs; adds safe base-URL resolution in network.rs (handles missing schemes); adds an EOF newline in state_machine.rs.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Storage
    participant BinlogBatch
    participant AppendLogFn as AppendLogFunction
    participant RaftApp
    participant Consensus

    Client->>Storage: write(key, value)
    alt Raft Mode Active
        Storage->>BinlogBatch: new(db_id, append_log_fn)
        Client->>BinlogBatch: put(key, value)
        BinlogBatch->>BinlogBatch: accumulate entry
        Client->>BinlogBatch: commit()
        BinlogBatch->>AppendLogFn: call with Binlog
        AppendLogFn->>RaftApp: client_write(binlog)
        RaftApp->>Consensus: append_entries
        Consensus-->>RaftApp: response
        RaftApp-->>AppendLogFn: Ok(())
        AppendLogFn-->>BinlogBatch: resolved
    else Local Mode
        Storage->>Storage: create_local_batch (RocksBatch)
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • marsevilspirit
  • AlexStocks

Poem

🐰 A cluster of three nodes hops into place,
With batches and adapters keeping a steady pace,
Raft consensus binds them in silent accord,
Append-log functions bridge storage and lord,
The database now dances—distributed grace!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning — Docstring coverage is 17.65%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

  • Description Check ✅ Passed — Check skipped; CodeRabbit’s high-level summary is enabled.
  • Title Check ✅ Passed — The title clearly and specifically describes the main change: integrating Open Raft logic into the Redis command flow, which aligns with the substantial refactoring across the raft, storage, and server modules.


@github-actions github-actions bot added the ✏️ Feature new feature label Mar 14, 2026
@guozhihao-224

@coderabbitai review


coderabbitai bot commented Mar 14, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (3)
src/storage/src/batch.rs (1)

256-264: Mock function's call counter is unused.

The called Arc<AtomicUsize> is created but not returned from create_mock_append_fn, so tests cannot verify that the append function was actually invoked during commit(). Consider returning the counter or using a different verification approach.

♻️ Suggested fix to enable call verification
-    fn create_mock_append_fn() -> AppendLogFunction {
+    fn create_mock_append_fn() -> (AppendLogFunction, Arc<AtomicUsize>) {
         let called = Arc::new(AtomicUsize::new(0));
         let called_clone = called.clone();
 
-        Arc::new(move |_binlog: Binlog| {
+        let func = Arc::new(move |_binlog: Binlog| {
             called_clone.fetch_add(1, Ordering::SeqCst);
             Box::pin(async { Ok(()) }) as Pin<Box<dyn Future<Output = Result<()>> + Send>>
-        })
+        });
+        (func, called)
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/src/batch.rs` around lines 256 - 264, The mock append function
create_mock_append_fn creates an Arc<AtomicUsize> called called but never
exposes it, so tests can't assert the append was invoked; modify
create_mock_append_fn to return both the AppendLogFunction and the
Arc<AtomicUsize> (or accept a provided counter) so callers can inspect the
counter after commit(), update the closure to increment that shared counter
(called_clone) as it already does, and adjust tests to receive and assert on the
returned counter value to verify the append invocation.
src/server/src/main.rs (1)

262-288: Raft HTTP server failures are silently ignored.

If the Raft HTTP server fails to bind or encounters an error, it's logged but the main Redis server continues running. This could lead to a partially-functional cluster where Redis accepts writes but Raft consensus isn't operational.

Consider propagating startup errors or adding health checks that surface Raft server availability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/src/main.rs` around lines 262 - 288, The Raft HTTP server startup
errors inside the tokio::spawn block (see tokio::spawn, HttpServer::new,
bind(&raft_addr), and server.run().await) are only logged and do not affect the
main process; change this so failures are propagated to the main startup flow:
create a oneshot or mpsc channel before spawning, send an Err(error) back to the
caller if bind or server.run().await fails (or send Ok on success), and have the
main task await the receiver and abort/startup-fail (return Err or panic) when
an error arrives; alternatively remove the detached spawn and await the
server.run().await result directly from the main async context so bind/run
errors propagate out of main.
src/storage/src/redis.rs (1)

351-372: Consider using parking_lot::RwLock instead of std::sync::RwLock.

The .unwrap() calls on the RwLock will panic if the lock becomes poisoned (when a thread panics while holding the lock). Since this codebase already uses parking_lot (visible in imports), consider using parking_lot::RwLock which doesn't have poisoning semantics, or handle the potential poison error gracefully.

♻️ Suggested change to use parking_lot

In imports (line 21 area):

-use std::sync::RwLock;
+use parking_lot::RwLock;

Then the method bodies simplify (no .unwrap() needed):

 pub fn create_batch(&self) -> Result<Box<dyn crate::batch::Batch + '_>> {
-    if let Some(append_log_fn) = self.append_log_fn.read().unwrap().as_ref() {
+    if let Some(append_log_fn) = self.append_log_fn.read().as_ref() {
         return Ok(Box::new(crate::batch::BinlogBatch::new(
             self.index as u32,
             append_log_fn.clone(),
         )));
     }
     self.create_rocks_batch()
 }

 pub fn set_append_log_fn(&self, f: AppendLogFunction) {
-    *self.append_log_fn.write().unwrap() = Some(f);
+    *self.append_log_fn.write() = Some(f);
 }

 pub fn is_raft_mode(&self) -> bool {
-    self.append_log_fn.read().unwrap().is_some()
+    self.append_log_fn.read().is_some()
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/src/redis.rs` around lines 351 - 372, The RwLock usage in
functions like create_batch, create_local_batch, set_append_log_fn, and
is_raft_mode currently calls .read().unwrap()/ .write().unwrap(), which will
panic on a poisoned std::sync::RwLock; switch the type of append_log_fn to
parking_lot::RwLock (import parking_lot::RwLock and update the field type) so
you can call .read()/.write() without handling PoisonError, or alternatively
replace each .unwrap() with proper poison handling (e.g., map_err or recover) to
avoid panics; update code sites referencing append_log_fn (create_batch,
set_append_log_fn, is_raft_mode) to use the chosen approach and ensure
create_rocks_batch/create_local_batch logic remains unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:

In `@scripts/cluster/test_cluster.sh`:
- Around line 142-143: The check currently hardcodes
follower_port=${REDIS_PORTS[1]} which can point at the leader and give false
positives; update the logic to dynamically pick a follower: detect the leader by
querying each port in REDIS_PORTS with `redis-cli -p $port INFO replication`
(check the "role:master" or "role:slave" field), set leader_port to the master
and choose any other port as follower_port (or fail if no follower found), then
run `redis-cli -p $follower_port GET "$key"` into result; replace the hardcoded
REDIS_PORTS[1] usage with this leader-detection + follower selection logic so
the replication check always queries an actual follower.

In `@src/conf/src/raft_type.rs`:
- Around line 74-75: The field `error_message` is annotated with `#[serde(alias
= "message")]` which only affects deserialization and will still serialize as
`error_message`; change the serde attrs on the `error_message` field so it
serializes as `"message"` while still accepting the legacy `"error_message"` on
input, e.g. replace the current attribute with something like `#[serde(rename =
"message", alias = "error_message")]` on the `error_message` field in
`raft_type.rs`.

In `@src/storage/src/batch.rs`:
- Around line 226-238: The current code calls tokio::task::block_in_place
whenever tokio::runtime::Handle::try_current() succeeds, which can panic on a
single-threaded (current_thread) runtime; instead, when try_current() returns
Ok(handle) avoid block_in_place and run the blocking work via
tokio::task::spawn_blocking (or handle.spawn_blocking) so the blocking future
runs on the blocking thread pool and then await/join that spawned task (instead
of calling block_in_place); keep the existing branch that builds a new
current_thread runtime and uses rt.block_on(future) only when try_current()
returns Err, and replace the direct block_in_place path with spawn_blocking
logic around the same future.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 3e8fbd3a-4d28-41ed-83a3-f0a7aaaab681

📥 Commits

Reviewing files that changed from the base of the PR and between 1c58ec5 and 032ae63.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (21)
  • scripts/cluster/node1.conf
  • scripts/cluster/node2.conf
  • scripts/cluster/node3.conf
  • scripts/cluster/start_cluster.sh
  • scripts/cluster/stop_cluster.sh
  • scripts/cluster/test_cluster.sh
  • src/cmd/src/lib.rs
  • src/cmd/src/set.rs
  • src/conf/src/config.rs
  • src/conf/src/raft_type.rs
  • src/raft/src/adapter.rs
  • src/raft/src/api.rs
  • src/raft/src/lib.rs
  • src/raft/src/network.rs
  • src/raft/src/state_machine.rs
  • src/server/src/main.rs
  • src/storage/src/batch.rs
  • src/storage/src/lib.rs
  • src/storage/src/options.rs
  • src/storage/src/redis.rs
  • src/storage/src/storage.rs
💤 Files with no reviewable changes (1)
  • src/cmd/src/set.rs

# Usage: ./start_cluster.sh [--init]
#

set -e

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, locate and read the script file
find . -name "start_cluster.sh" -type f

Repository: arana-db/kiwi

Length of output: 91


🏁 Script executed:

# Read the file to verify the content at lines 7 and 50
cat -n ./scripts/cluster/start_cluster.sh

Repository: arana-db/kiwi

Length of output: 6685


Pipeline failures can be masked with set -e alone; use pipefail for reliability.

Line 50 pipes cargo build into tail -5; with only set -e, the pipeline's exit status depends on tail's success, not cargo's. While the subsequent file existence check (line 51) provides a safety net, relying on post-hoc validation is less reliable than catching the failure immediately.

Proposed fix
-set -e
+set -euo pipefail

And update the build step:

-    cargo build --release --bin kiwi 2>&1 | tail -5
-    if [ ! -f "$PROJECT_ROOT/target/release/kiwi" ]; then
+    if ! cargo build --release --bin kiwi 2>&1 | tail -5; then
         log_error "Build failed!"
         exit 1
     fi
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/cluster/start_cluster.sh` at line 7, The script uses "set -e" which
can mask failures in pipelines (e.g., the pipeline in the build step that runs
"cargo build | tail -5"), so enable pipefail and stricter error handling by
adding "set -o pipefail" (or replace the existing "set -e" with "set -euo
pipefail") at the top of start_cluster.sh; additionally, update the build
pipeline (the "cargo build | tail -5" invocation) to either capture the cargo
exit code explicitly (run cargo build and check its exit status before piping)
or use a construct that preserves the original command's exit status so failed
cargo builds cause the script to exit immediately.

Comment on lines +83 to +97
        for j in {1..10}; do
            if nc -z 127.0.0.1 $redis_port 2>/dev/null; then
                log_success "Node $node_num Redis port $redis_port is ready"
                break
            fi
            sleep 1
        done

        for j in {1..10}; do
            if nc -z 127.0.0.1 $raft_port 2>/dev/null; then
                log_success "Node $node_num Raft port $raft_port is ready"
                break
            fi
            sleep 1
        done

⚠️ Potential issue | 🟠 Major



wait_for_ports needs explicit failure handling for port timeout scenarios.

The function checks Redis and Raft port readiness with 10-second timeouts per port, but silently succeeds even if ports never open. Since the function returns 0 implicitly, the script continues to initialize the cluster despite missing ports, producing false "cluster started" status.

Although set -e is enabled at script startup, it cannot catch this silent failure since the function doesn't return an error code. The fix should track readiness state and explicitly return 1 on timeout so the error propagates correctly.

Proposed fix
-        for j in {1..10}; do
+        local redis_ready=false
+        for j in {1..10}; do
             if nc -z 127.0.0.1 $redis_port 2>/dev/null; then
                 log_success "Node $node_num Redis port $redis_port is ready"
+                redis_ready=true
                 break
             fi
             sleep 1
         done
+        if [ "$redis_ready" = false ]; then
+            log_error "Node $node_num Redis port $redis_port did not become ready"
+            return 1
+        fi
         
-        for j in {1..10}; do
+        local raft_ready=false
+        for j in {1..10}; do
             if nc -z 127.0.0.1 $raft_port 2>/dev/null; then
                 log_success "Node $node_num Raft port $raft_port is ready"
+                raft_ready=true
                 break
             fi
             sleep 1
         done
+        if [ "$raft_ready" = false ]; then
+            log_error "Node $node_num Raft port $raft_port did not become ready"
+            return 1
+        fi
🧰 Tools
🪛 Shellcheck (0.11.0)

[warning] 91-91: j appears unused. Verify use (or export if used externally).

(SC2034)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/cluster/start_cluster.sh` around lines 83 - 97, The wait_for_ports
logic currently loops to check redis_port and raft_port readiness but never
fails the function on timeout; update the wait_for_ports function so each port
loop sets a local "ready" flag (e.g., redis_ready, raft_ready), after each loop
check the flag and if false call log_error (including node_num and port) and
return 1; only return 0 at the end when both redis_ready and raft_ready are
true. Reference the existing symbols: wait_for_ports, node_num, redis_port,
raft_port, log_success, and add log_error + explicit return 1 on timeout so the
caller sees failure.

Comment on lines +26 to +43
        local pid=$(cat "$pid_file")
        if kill -0 $pid 2>/dev/null; then
            log_info "Stopping Node $node_id (PID: $pid)..."
            kill $pid 2>/dev/null || true

            # Wait for graceful shutdown
            for i in {1..10}; do
                if ! kill -0 $pid 2>/dev/null; then
                    break
                fi
                sleep 1
            done

            # Force kill if still running
            if kill -0 $pid 2>/dev/null; then
                log_warn "Force killing Node $node_id..."
                kill -9 $pid 2>/dev/null || true
            fi

⚠️ Potential issue | 🟠 Major



PID reuse risk: process identity is not verified before kill.

If a stale PID file exists from a previous invocation, the PID may have been recycled to an unrelated process, causing it to be terminated. The kill -0 $pid check only verifies that a process with that PID exists, not that it is the correct kiwi node process.

The script should verify process identity before sending kill signals. Additionally, all $pid variables lack proper quoting.

Proposed fix
-        local pid=$(cat "$pid_file")
-        if kill -0 $pid 2>/dev/null; then
+        local pid
+        pid=$(cat "$pid_file")
+        local cmdline
+        cmdline=$(ps -p "$pid" -o args= 2>/dev/null || true)
+        if [[ "$cmdline" != *"kiwi"* ]]; then
+            log_warn "PID $pid is not a kiwi process; skipping"
+            rm -f "$pid_file"
+            return
+        fi
+        if kill -0 "$pid" 2>/dev/null; then
             log_info "Stopping Node $node_id (PID: $pid)..."
-            kill $pid 2>/dev/null || true
+            kill "$pid" 2>/dev/null || true
@@
-                if ! kill -0 $pid 2>/dev/null; then
+                if ! kill -0 "$pid" 2>/dev/null; then
@@
-            if kill -0 $pid 2>/dev/null; then
+            if kill -0 "$pid" 2>/dev/null; then
                 log_warn "Force killing Node $node_id..."
-                kill -9 $pid 2>/dev/null || true
+                kill -9 "$pid" 2>/dev/null || true
             fi
🧰 Tools
🪛 Shellcheck (0.11.0)

[warning] 26-26: Declare and assign separately to avoid masking return values.

(SC2155)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/cluster/stop_cluster.sh` around lines 26 - 43, The PID check in the
stop logic (uses pid_file, log_info, log_warn) only verifies existence and
doesn’t confirm the process is the expected kiwi node; update the shutdown
sequence to first read and quote "$pid", then verify identity by inspecting
/proc/"$pid"/cmdline or using ps -p "$pid" -o comm= (or full args) and match
against the expected process name or pattern for the kiwi node before sending
signals; only proceed with kill/kill -9 if the command-line check matches, and
ensure every occurrence of $pid is quoted (e.g., "$pid") to avoid
word-splitting.

Comment on lines +142 to +143
local follower_port=${REDIS_PORTS[1]}
local result=$(redis-cli -p $follower_port GET "$key" 2>/dev/null)

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's check the file size and structure
wc -l scripts/cluster/test_cluster.sh

Repository: arana-db/kiwi

Length of output: 92


🏁 Script executed:

# Now let's read the relevant section around lines 142-143
sed -n '130,160p' scripts/cluster/test_cluster.sh

Repository: arana-db/kiwi

Length of output: 957


🏁 Script executed:

# Let's also find the test_multiple_writes function and understand its full context
grep -n "test_multiple_writes\|REDIS_PORTS\|LEADER_REDIS_PORT" scripts/cluster/test_cluster.sh | head -30

Repository: arana-db/kiwi

Length of output: 925


🏁 Script executed:

# Let's search for where REDIS_PORTS and LEADER_REDIS_PORT are defined/initialized
grep -n "REDIS_PORTS\|LEADER_REDIS_PORT" scripts/cluster/test_cluster.sh

Repository: arana-db/kiwi

Length of output: 867


🏁 Script executed:

# Let's look at the full context of the function to understand variables better
sed -n '130,160p' scripts/cluster/test_cluster.sh

Repository: arana-db/kiwi

Length of output: 957


🏁 Script executed:

# Let's also check if LEADER_INDEX or similar is available
grep -n "LEADER_INDEX\|LEADER_REDIS_PORT.*=" scripts/cluster/test_cluster.sh | head -20

Repository: arana-db/kiwi

Length of output: 237


Replication check is hardcoded to a fixed node and can yield false positives.

Line 142 always validates against REDIS_PORTS[1] (port 7380); when Node 2 is the leader, this verifies the leader itself rather than follower replication, allowing replication failures to go undetected.

Proposed fix
-        local follower_port=${REDIS_PORTS[1]}
-        local result=$(redis-cli -p $follower_port GET "$key" 2>/dev/null)
-        
-        if [ "$result" == "$value" ]; then
-            success_count=$((success_count + 1))
-        fi
+        local replicated=true
+        for follower_port in "${REDIS_PORTS[@]}"; do
+            if [ "$follower_port" -eq "$LEADER_REDIS_PORT" ]; then
+                continue
+            fi
+            local result
+            result=$(redis-cli -p "$follower_port" GET "$key" 2>/dev/null)
+            if [ "$result" != "$value" ]; then
+                replicated=false
+                break
+            fi
+        done
+        if $replicated; then
+            success_count=$((success_count + 1))
+        fi
🧰 Tools
🪛 Shellcheck (0.11.0)

[warning] 143-143: Declare and assign separately to avoid masking return values.

(SC2155)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@scripts/cluster/test_cluster.sh` around lines 142 - 143, The check currently
hardcodes follower_port=${REDIS_PORTS[1]} which can point at the leader and give
false positives; update the logic to dynamically pick a follower: detect the
leader by querying each port in REDIS_PORTS with `redis-cli -p $port INFO
replication` (check the "role:master" or "role:slave" field), set leader_port to
the master and choose any other port as follower_port (or fail if no follower
found), then run `redis-cli -p $follower_port GET "$key"` into result; replace
the hardcoded REDIS_PORTS[1] usage with this leader-detection + follower
selection logic so the replication check always queries an actual follower.

Comment on lines +74 to +75
#[serde(alias = "message")]
pub error_message: Option<String>,

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

In Serde, does #[serde(alias = "message")] affect serialization output field names, or only deserialization?

💡 Result:

#[serde(alias = "message")] affects deserialization only: it lets Serde accept "message" (in addition to the Rust field name) when reading input, but it does not change the field name used during serialization. To change the serialized output name, use #[serde(rename = "...")] (optionally rename(serialize = "...")). [1]

Source:
[1] Serde “Field attributes” docs (alias and rename) (serde.rs)


🏁 Script executed:

#!/bin/bash
# Read-only check for potential consumers relying on `message` in code/tests/docs.
rg -nP '\b(error_message|message)\b' --type rust

Repository: arana-db/kiwi

Length of output: 40120


serde(alias) affects deserialization only; serialization uses the Rust field name.

This will serialize as error_message instead of message, breaking compatibility with external consumers expecting message in the wire protocol. Use rename to preserve serialization key while maintaining backward-compatible deserialization:

 #[serde(
+    rename = "message",
     alias = "message"
 )]
 pub error_message: Option<String>,

Suggested change
#[serde(alias = "message")]
pub error_message: Option<String>,
#[serde(
rename = "message",
alias = "message"
)]
pub error_message: Option<String>,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/conf/src/raft_type.rs` around lines 74 - 75, The field `error_message` is
annotated with `#[serde(alias = "message")]` which only affects deserialization
and will still serialize as `error_message`; change the serde attrs on the
`error_message` field so it serializes as `"message"` while still accepting the
legacy `"error_message"` on input, e.g. replace the current attribute with
something like `#[serde(rename = "message", alias = "error_message")]` on the
`error_message` field in `raft_type.rs`.

Comment on lines +226 to 238
match tokio::runtime::Handle::try_current() {
Ok(handle) => tokio::task::block_in_place(|| handle.block_on(future)),
Err(_) => {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| crate::error::Error::Batch {
message: format!("Failed to create runtime for Raft commit: {e}"),
location: snafu::Location::new(file!(), line!(), column!()),
})?;
rt.block_on(future)
}
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

Does tokio block_in_place panic on current_thread runtime?

💡 Result:

Yes. tokio::task::block_in_place panics if it’s called from a current_thread (single-threaded) runtime. Use a multi-thread runtime, or use spawn_blocking to offload blocking work instead. [1][2]


Fix panic risk with block_in_place on single-threaded runtimes.

tokio::task::block_in_place will panic if called from within a current_thread runtime. The current code calls block_in_place whenever try_current() succeeds, without verifying whether the runtime is multi-threaded. If this code runs in a single-threaded context (e.g., a current_thread runtime), it will crash.

Use spawn_blocking or check that the runtime is multi-threaded before calling block_in_place. Alternatively, document and enforce that this code must only run within a multi-threaded runtime.

🔧 Example fix using spawn_blocking
-        match tokio::runtime::Handle::try_current() {
-            Ok(handle) => tokio::task::block_in_place(|| handle.block_on(future)),
-            Err(_) => {
+        match tokio::runtime::Handle::try_current() {
+            Ok(handle) => {
+                // Use spawn_blocking to avoid panic on current_thread runtime
+                let (tx, rx) = std::sync::mpsc::channel();
+                handle.spawn(async move {
+                    let result = future.await;
+                    let _ = tx.send(result);
+                });
+                rx.recv().map_err(|_| crate::error::Error::Batch {
+                    message: "Raft commit channel closed".to_string(),
+                    location: snafu::Location::new(file!(), line!(), column!()),
+                })?
+            }
+            Err(_) => {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/src/batch.rs` around lines 226 - 238, The current code calls
tokio::task::block_in_place whenever tokio::runtime::Handle::try_current()
succeeds, which can panic on a single-threaded (current_thread) runtime;
instead, when try_current() returns Ok(handle) avoid block_in_place and run the
blocking work via tokio::task::spawn_blocking (or handle.spawn_blocking) so the
blocking future runs on the blocking thread pool and then await/join that
spawned task (instead of calling block_in_place); keep the existing branch that
builds a new current_thread runtime and uses rt.block_on(future) only when
try_current() returns Err, and replace the direct block_in_place path with
spawn_blocking logic around the same future.

@@ -363,6 +348,29 @@ impl Redis {
)))
}

Contributor

create_batch(), set_append_log_fn(), and is_raft_mode() all call .unwrap(); CI already enables -D clippy::unwrap_used, so these will fail the build.

// current code
if let Some(append_log_fn) = self.append_log_fn.read().unwrap().as_ref() {

// suggested
if let Some(append_log_fn) = self.append_log_fn.read()
    .unwrap_or_else(|e| e.into_inner())
    .as_ref()
{

The same applies to the write().unwrap() calls. unwrap() panics when the RwLock is poisoned; unwrap_or_else(|e| e.into_inner()) recovers the guard instead.
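The poison-recovery pattern above can be exercised in isolation. A minimal std-only sketch — `read_recovering` is a hypothetical helper, not kiwi code:

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Recover the guard even if a previous holder panicked and poisoned the lock.
fn read_recovering<T>(lock: &RwLock<T>) -> std::sync::RwLockReadGuard<'_, T> {
    lock.read().unwrap_or_else(|poisoned| poisoned.into_inner())
}

fn main() {
    let lock = Arc::new(RwLock::new(Some(42u32)));

    // Poison the lock by panicking while holding the write guard.
    let l2 = Arc::clone(&lock);
    let _ = thread::spawn(move || {
        let _guard = l2.write().unwrap();
        panic!("poison the lock");
    })
    .join();

    // A plain .unwrap() would panic here; into_inner() hands back the guard.
    let guard = read_recovering(&lock);
    assert_eq!(*guard, Some(42));
    println!("recovered: {:?}", *guard);
}
```

Poisoning only signals that a writer panicked mid-update; for a field like append_log_fn that is set once and then read, recovering the inner value is safe.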


let future = (self.append_log_fn)(binlog);

match tokio::runtime::Handle::try_current() {
Contributor

commit() uses block_in_place + handle.block_on(future) to wait synchronously for Raft consensus, which carries deadlock risk:

1. If it runs on a tokio current_thread runtime (e.g., in tests), block_in_place panics
2. If the network requests Raft consensus depends on run on the same runtime (e.g., a storage runtime with only a few threads), block_in_place can occupy every worker thread, leaving Raft's internal async tasks with no thread to run on — a deadlock
3. The fallback branch builds a new runtime on every call, which is far too expensive

Consider making commit() async (refactoring the Batch trait accordingly), or ensuring Raft consensus runs on a dedicated runtime isolated from the storage runtime.
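One way to get that isolation without making commit() async is a dedicated consensus worker that owns its own event loop. A hedged, std-only sketch — the Raft call is stubbed out, and in real code the worker thread would own a separate tokio runtime and drive the append-log future on it:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical sketch: a dedicated consensus worker with its own queue, so
// commit() parks only the calling thread, never the storage runtime's workers.
struct RaftCommitter {
    tx: mpsc::Sender<(Vec<u8>, mpsc::Sender<Result<(), String>>)>,
}

impl RaftCommitter {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel::<(Vec<u8>, mpsc::Sender<Result<(), String>>)>();
        thread::spawn(move || {
            for (binlog, reply) in rx {
                // Stand-in for `raft_runtime.block_on(append_log(binlog))`.
                let _ = reply.send(if binlog.is_empty() {
                    Err("empty binlog".to_string())
                } else {
                    Ok(())
                });
            }
        });
        RaftCommitter { tx }
    }

    // Synchronous commit(): enqueue the binlog and wait for the worker's reply.
    fn commit(&self, binlog: Vec<u8>) -> Result<(), String> {
        let (reply_tx, reply_rx) = mpsc::channel();
        self.tx
            .send((binlog, reply_tx))
            .map_err(|e| e.to_string())?;
        reply_rx.recv().map_err(|e| e.to_string())?
    }
}

fn main() {
    let committer = RaftCommitter::new();
    assert_eq!(committer.commit(vec![1, 2, 3]), Ok(()));
    assert!(committer.commit(vec![]).is_err());
    println!("commit ok");
}
```

The point of the design is that blocking happens on the caller's thread via a channel recv, so no executor worker is ever pinned by block_in_place.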

crate::slot_indexer::key_to_slot_id(key) as u32
}
}

Contributor

slot_idx is taken from the first key only; if a batch contains keys in different slots (e.g., MSET), the slot_idx recorded in the binlog is inaccurate.

If state machine replay relies on slot_idx for routing, some keys will be written to the wrong instance.

Suggestions:

1. Either strictly limit a batch to a single slot (validated in put/delete), splitting keys from different slots into separate batches
2. Or drop the single slot_idx from the binlog structure and have each entry carry its own slot
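Option 1 is straightforward to enforce at batch-build time. A minimal sketch — `key_to_slot_id` here is a toy stand-in (byte sum mod 1024), not kiwi's real slot hash:

```rust
// Toy slot function for illustration only; kiwi's real mapping lives in
// crate::slot_indexer::key_to_slot_id.
fn key_to_slot_id(key: &[u8]) -> u32 {
    key.iter().map(|&b| b as u32).sum::<u32>() % 1024
}

// Returns the shared slot for the batch, or an error naming the first
// key that lands in a different slot.
fn batch_slot(keys: &[&[u8]]) -> Result<u32, String> {
    let mut slot = None;
    for key in keys {
        let s = key_to_slot_id(key);
        match slot {
            None => slot = Some(s),
            Some(expected) if expected != s => {
                return Err(format!(
                    "cross-slot batch: key {:?} is in slot {s}, expected {expected}",
                    String::from_utf8_lossy(key)
                ));
            }
            _ => {}
        }
    }
    slot.ok_or_else(|| "empty batch".to_string())
}

fn main() {
    let same: Vec<&[u8]> = vec![b"a", b"a"];
    assert!(batch_slot(&same).is_ok());
    let mixed: Vec<&[u8]> = vec![b"a", b"b"];
    assert!(batch_slot(&mixed).is_err());
    println!("slot checks passed");
}
```

Rejecting the cross-slot case early keeps the single slot_idx in the binlog honest; callers would then split such writes into one batch per slot.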

@@ -154,6 +157,9 @@ impl Config {
)),
})?;
}
Contributor

Config option naming conflict: this PR uses data-dir, while PR #243 (Issue #193) uses db-dir, and both are configuring the RocksDB storage directory.

The naming should be unified. By Redis convention, dir or db-dir is more common; data-dir is easy to confuse with raft-data-dir. This needs coordination with PR #243 on which name to use.

Collaborator Author

Let's merge PR #243 first and treat it as the standard. The changes in this PR mainly address the inability to handle multiple db directories when starting multiple nodes.

pub struct BinlogResponse {
pub success: bool,
pub message: Option<String>,
#[serde(alias = "message")]
Contributor

messageerror_message 是序列化层的 breaking change。#[serde(alias = "message")] 只对反序列化生效,序列化时输出的 JSON key 已经变成 error_message 了。

如果有其他节点还在用旧代码,它们发来的 response 能被正确反序列化(alias 兜底),但本节点发出去的 response 旧节点解析不了。

建议用 #[serde(rename = "message")] 保持序列化兼容,或者确认所有节点同时升级。

let base = match self.target_addr.starts_with("http://")
|| self.target_addr.starts_with("https://")
{
true => self.target_addr.clone(),
Contributor

match bool { true => ..., false => ... } is not idiomatic; clippy emits a clippy::match_bool warning. An if/else is clearer:

let base = if self.target_addr.starts_with("http://")
    || self.target_addr.starts_with("https://")
{
    self.target_addr.clone()
} else {
    format!("http://{}", self.target_addr)
};

message: response
.error_message
.unwrap_or_else(|| "Raft write failed".to_string()),
location: Default::default(),
Contributor

The unwrap_or_else here is fine (it is not a bare .unwrap()), but the error message could be more useful — carry the original error_message content:

Err(storage::error::Error::Batch {
    message: format!(
        "Raft write rejected: {}",
        response.error_message.as_deref().unwrap_or("unknown error")
    ),
    location: Default::default(),
})

Also, location: Default::default() does not record the actual location. If the storage error supports snafu::Location, use snafu::Location::new(file!(), line!(), column!()) instead.


Labels

✏️ Feature new feature

2 participants