def get_value_readonly(key: str | int | float, config_path: str = current_dir + "/ip_address.config") -> str:
    """
    Look up ``key`` through the binding's read-only entry point.

    :param key: Key to look up; it is converted with ``str()`` before the call.
    :param config_path: Path to the client config file; it is made absolute
        before being handed to the binding.
    :return: Whatever ``pybind_kv.get_value_readonly`` returns for the key.
    """
    key_text = str(key)
    resolved_config = os.path.abspath(config_path)
    return pybind_kv.get_value_readonly(key_text, resolved_config)
def summarize_latencies(latencies):
    """
    Summarize a list of per-operation latencies.

    :param latencies: Sequence of numeric latency samples.
    :return: ``{}`` when there are no samples; otherwise a dict with the keys
        ``count``, ``mean``, ``median``, ``p95``, ``p99``, ``min`` and ``max``.
    """
    if not latencies:
        return {}

    if len(latencies) == 1:
        # statistics.quantiles() needs at least two data points on Python
        # versions before 3.13; with one sample every percentile is that sample.
        p95 = p99 = latencies[0]
    else:
        p95 = statistics.quantiles(latencies, n=20)[18]   # approx 95th
        p99 = statistics.quantiles(latencies, n=100)[98]  # approx 99th

    return {
        "count": len(latencies),
        "mean": statistics.mean(latencies),
        "median": statistics.median(latencies),
        "p95": p95,
        "p99": p99,
        "min": min(latencies),
        "max": max(latencies),
    }
def run_mixed_client_worker(
    set_value,
    read_func,
    key_prefix,
    thread_id,
    read_ratio,
    total_ops,
    result_queue,
):
    """
    Issue a mix of reads and writes against a single key and report the results.

    The key is ``f"{key_prefix}_{thread_id}"``. Within every window of 100
    operations the first ``round(read_ratio * 100)`` are reads and the rest are
    writes of ``str(i)``.

    :param set_value: Callable ``(key, value)`` used for writes.
    :param read_func: Callable ``(key)`` used for reads.
    :param key_prefix: Prefix of the key this worker operates on.
    :param thread_id: Worker id, appended to the prefix to form the key.
    :param read_ratio: Fraction of operations that should be reads (0.0 - 1.0).
    :param total_ops: Total number of operations to issue.
    :param result_queue: Queue receiving one dict with ``reads``, ``writes``,
        ``latencies_reads``, ``latencies_writes`` plus the failure counters
        ``read_errors`` and ``write_errors``.
    """
    latencies_reads = []
    latencies_writes = []
    num_reads = 0
    num_writes = 0
    read_errors = 0
    write_errors = 0

    # Use one key per client, pre-populated before measurement
    key = f"{key_prefix}_{thread_id}"

    # round() rather than int(): 0.29 * 100 is 28.999999999999996 in floating
    # point, which int() would truncate to 28 reads per 100 operations.
    reads_per_hundred = round(read_ratio * 100)

    for i in range(total_ops):
        if (i % 100) < reads_per_hundred:
            # perf_counter() is monotonic, so intervals cannot go negative
            # if the wall clock is adjusted mid-run.
            start = time.perf_counter()
            try:
                read_func(key)
                num_reads += 1
            except Exception:
                # Best effort: a failed read must not kill the worker, but it
                # is counted so the caller can see it happened.
                read_errors += 1
            latencies_reads.append(time.perf_counter() - start)
        else:
            value = str(i)
            start = time.perf_counter()
            try:
                set_value(key, value)
                num_writes += 1
            except Exception:
                write_errors += 1
            latencies_writes.append(time.perf_counter() - start)

    result_queue.put({
        "reads": num_reads,
        "writes": num_writes,
        "latencies_reads": latencies_reads,
        "latencies_writes": latencies_writes,
        "read_errors": read_errors,
        "write_errors": write_errors,
    })
+ """ + mixes = [0.0, 0.5, 0.9, 0.99] # fraction of reads + num_clients = 8 + total_ops_per_client = 200 + + results = [] + + # Pre-populate keys so both PBFT and learner know about them + for cid in range(num_clients): + key = f"EXP2_{cid}" + try: + set_value(key, "0") + except Exception as e: + print(f"Pre-populate failed for {key}: {e}") + + for read_ratio in mixes: + print(f"\n=== Exp2: read_ratio={read_ratio} ===") + + for mode in ["pbft", "learner"]: + if mode == "pbft": + read_func = lambda k: get_value(k) + else: + read_func = lambda k: get_value_readonly(k) + + result_queue = queue.Queue() + threads = [] + + start_wall = time.time() + for cid in range(num_clients): + t = threading.Thread( + target=run_mixed_client_worker, + args=( + set_value, + read_func, + "EXP2", + cid, + read_ratio, + total_ops_per_client, + result_queue, + ), + ) + t.start() + threads.append(t) + + for t in threads: + t.join() + end_wall = time.time() + + # Aggregate + total_reads = 0 + total_writes = 0 + all_read_lat = [] + all_write_lat = [] + while not result_queue.empty(): + r = result_queue.get() + total_reads += r["reads"] + total_writes += r["writes"] + all_read_lat.extend(r["latencies_reads"]) + all_write_lat.extend(r["latencies_writes"]) + + duration = end_wall - start_wall + read_throughput = total_reads / duration if duration > 0 else 0 + write_throughput = total_writes / duration if duration > 0 else 0 + + stats_reads = summarize_latencies(all_read_lat) + stats_writes = summarize_latencies(all_write_lat) + + results.append({ + "mode": mode, + "read_ratio": read_ratio, + "read_throughput": read_throughput, + "write_throughput": write_throughput, + "latency_reads": stats_reads, + "latency_writes": stats_writes, + }) + + print(f"Mode={mode}, read_ratio={read_ratio}") + print(f" read_throughput={read_throughput:.2f} rps") + print(f" write_throughput={write_throughput:.2f} wps") + print(f" read latency: {stats_reads}") + print(f" write latency: {stats_writes}") + + return 
# Experiment 3: Staleness and Fallback Rate
def exp3_staleness_and_fallback(set_value, get_value, get_value_readonly,
                                num_rounds=100, sleep_between_writes=0.1):
    """
    Measure how often learner reads differ from PBFT reads (staleness)
    and how often the learner read raises (counted as a fallback).

    Each round writes ``value_<i>`` to a fixed key, sleeps, reads the key via
    ``get_value`` (treated as ground truth) and then via ``get_value_readonly``.

    :param set_value: Callable ``(key, value)`` performing a normal write.
    :param get_value: Callable ``(key)`` performing the PBFT read.
    :param get_value_readonly: Callable ``(key)`` performing the learner read.
    :param num_rounds: Number of write/read rounds (default 100).
    :param sleep_between_writes: Seconds to sleep after each write (default 0.1).
    :return: Dict with ``total``, ``equal``, ``stale``, ``fallback``,
        ``latency_learner`` and ``latency_pbft``.
    """
    base_key = "EXP3_KEY"

    num_equal = 0
    num_stale = 0
    num_fallback = 0
    learner_latencies = []
    pbft_latencies = []

    for i in range(num_rounds):
        # Write a fresh value through PBFT (normal write)
        set_value(base_key, f"value_{i}")
        time.sleep(sleep_between_writes)  # simulate blocks committing over time

        # Ground truth read via PBFT
        start_p = time.time()
        truth = get_value(base_key)
        pbft_latencies.append(time.time() - start_p)

        # Try learner read; latency is recorded whether or not it raises.
        start_l = time.time()
        try:
            learner_val = get_value_readonly(base_key)
        except Exception:
            # interpret as fallback / failure to serve from learner
            learner_latencies.append(time.time() - start_l)
            num_fallback += 1
            continue
        learner_latencies.append(time.time() - start_l)

        # NOTE(review): the pybind binding appears to return "" instead of
        # raising when the read fails, so such failures would land in the
        # "stale" bucket here - confirm whether "" should count as a fallback.
        if learner_val == truth:
            num_equal += 1
        else:
            # learner responded but value != PBFT truth => stale
            num_stale += 1

    total = num_rounds
    stats_learner = summarize_latencies(learner_latencies)
    stats_pbft = summarize_latencies(pbft_latencies)

    def fraction(count):
        # Avoid ZeroDivisionError when the experiment is run with zero rounds.
        return count / total if total > 0 else 0.0

    print("\n=== Exp3: Staleness & Fallback ===")
    print(f"Total trials: {total}")
    print(f" equal (fresh) : {num_equal} ({fraction(num_equal):.2%})")
    print(f" stale responses : {num_stale} ({fraction(num_stale):.2%})")
    print(f" fallbacks : {num_fallback} ({fraction(num_fallback):.2%})")
    print(f" learner latency : {stats_learner}")
    print(f" PBFT latency : {stats_pbft}")

    return {
        "total": total,
        "equal": num_equal,
        "stale": num_stale,
        "fallback": num_fallback,
        "latency_learner": stats_learner,
        "latency_pbft": stats_pbft,
    }
exp4_fault_injection_sanity(set_value, get_value, get_value_readonly): + """ + Assumes that on the server side you have configured one replica + to send bad digests to the learner. + + From the client side, we: + - write monotone increasing integers to a key, + - read via PBFT as ground truth, + - read via learner, and + - check for impossible values. + """ + num_rounds = 100 + fallback_thresholds = [0.1, 0.01, 0.001, 0.0001] # seconds + all_results = [] + + for th_idx, th in enumerate(fallback_thresholds): + key = f"EXP4_KEY_{th_idx}" + + print(f"Testing with fallback threshold: {th} seconds") + impossible_values = [] + mismatches = 0 + fallbacks = 0 + + pbft_latencies = [] + learner_latencies = [] + highest_seen = -1 + + for i in range(num_rounds): + # Write monotone integer + set_value(key, str(i)) + time.sleep(0.1) + + # Ground truth + time0 = time.time() + truth = get_value(key) + time1 = time.time() + # Took pbft this time to get value + pbft_latencies.append(time1 - time0) + + time2 = time.time() + val = get_value_readonly(key) + time3 = time.time() + # Took learner this time to get value + learner_latency = time3 - time2 + learner_latencies.append(learner_latency) + + if learner_latency > th: + # If learner took too long, consider it a fallback + fallbacks += 1 + + if val != truth: + mismatches += 1 + # If you never wrote val at all, this is impossible + try: + int_val = int(val) + except ValueError: + # non-integer, clearly impossible in this setup + impossible_values.append(val) + continue + + if int_val < 0 or int_val > i: + impossible_values.append(val) + + # Rollback check: learner should not move backwards in value + if int_val < highest_seen: + impossible_values.append( + f"rollback: was {highest_seen}, now {int_val}" + ) + else: + highest_seen = max(highest_seen, int_val) + + print("\n=== Exp4: Fault Injection (Timing-based) ===") + print(f"Threshold: {th} seconds") + print(f"Total rounds: {num_rounds}") + print(f"Fallback reads (slow): 
def plot_exp1_latency(exp1_results, metric="median"):
    """
    Plot one read-latency statistic against the number of clients, one line
    per mode, and save it as ``exp1_latency_<metric>.pdf`` under ``PLOTS_DIR``.

    :param exp1_results: List of dicts from exp1_read_latency(); each has
        "mode" ("pbft" or "learner"), "num_clients", "throughput" and
        "latency_stats" (a dict keyed by statistic name).
    :param metric: Key of "latency_stats" to plot, e.g. "median", "p95", "p99".
    """
    modes = ["pbft", "learner"]
    num_clients = sorted({r["num_clients"] for r in exp1_results})

    fig = plt.figure()
    try:
        for mode in modes:
            ys = []
            for c in num_clients:
                entry = next(
                    r for r in exp1_results
                    if r["mode"] == mode and r["num_clients"] == c
                )
                ys.append(entry["latency_stats"][metric])
            plt.plot(num_clients, ys, marker="o", label=mode)

        plt.xlabel("Number of clients")
        plt.ylabel(f"Read latency ({metric}) [seconds]")
        plt.title(f"Experiment 1: {metric} read latency vs clients")
        plt.legend()
        plt.grid(True)
        plt.tight_layout()

        filename = os.path.join(PLOTS_DIR, f"exp1_latency_{metric}.pdf")
        plt.savefig(filename, dpi=300)
    finally:
        # pyplot keeps every figure alive until it is closed; release it even
        # when a lookup or the save fails (plot_exp4_fault_summary closes too).
        plt.close(fig)
def plot_exp2_write_throughput(exp2_results):
    """
    Plot write throughput against read ratio, one line per mode, and save it
    as ``exp2_write_throughput.pdf`` under ``PLOTS_DIR``.

    :param exp2_results: List of dicts from exp2_mixed_workload(); each has
        "mode" ("pbft" or "learner"), "read_ratio", "read_throughput",
        "write_throughput", "latency_reads" and "latency_writes".
    """
    modes = ["pbft", "learner"]
    read_ratios = sorted({r["read_ratio"] for r in exp2_results})

    fig = plt.figure()
    try:
        for mode in modes:
            ys = []
            for rr in read_ratios:
                entry = next(
                    r for r in exp2_results
                    if r["mode"] == mode and r["read_ratio"] == rr
                )
                ys.append(entry["write_throughput"])
            plt.plot(read_ratios, ys, marker="o", label=mode)

        plt.xlabel("Read ratio (fraction of operations that are reads)")
        plt.ylabel("Write throughput [ops/sec]")
        plt.title("Experiment 2: write throughput vs read ratio")
        plt.legend()
        plt.grid(True)
        plt.tight_layout()

        filename = os.path.join(PLOTS_DIR, "exp2_write_throughput.pdf")
        plt.savefig(filename, dpi=300)
    finally:
        # pyplot keeps every figure alive until it is closed; release it even
        # when a lookup or the save fails.
        plt.close(fig)
def plot_exp3_latency(exp3_results):
    """
    Compare learner vs PBFT latency from Experiment 3 as grouped bars of the
    median and p95 statistics, saved as ``exp3_latency.pdf`` under ``PLOTS_DIR``.

    :param exp3_results: Dict with "latency_learner" and "latency_pbft", each
        a stats dict that contains "median" and "p95".
    """
    stats_learner = exp3_results["latency_learner"]
    stats_pbft = exp3_results["latency_pbft"]

    metrics = ["median", "p95"]
    x = range(len(metrics))

    pbft_vals = [stats_pbft[m] for m in metrics]
    learner_vals = [stats_learner[m] for m in metrics]

    width = 0.35

    fig = plt.figure()
    try:
        # Two bars per metric, offset half a bar width either side of the tick.
        plt.bar([xi - width/2 for xi in x], pbft_vals, width, label="pbft")
        plt.bar([xi + width/2 for xi in x], learner_vals, width, label="learner")

        plt.xticks(list(x), metrics)
        plt.ylabel("Latency [seconds]")
        plt.title("Experiment 3: Latency comparison (PBFT vs learner)")
        plt.legend()
        plt.grid(axis="y")
        plt.tight_layout()

        filename = os.path.join(PLOTS_DIR, "exp3_latency.pdf")
        plt.savefig(filename, dpi=300)
    finally:
        # pyplot keeps every figure alive until it is closed; release it even
        # when the save fails.
        plt.close(fig)
+ + exp4_results is a list of dicts, each like: + { + "threshold": th, + "num_rounds": num_rounds, + "mismatches": mismatches, + "fallbacks": fallbacks, + "impossible_values": [...], + "learner_latencies": {...}, + "pbft_latencies": {...}, + } + """ + import matplotlib.pyplot as plt + import os + + # Extract per-threshold stats + thresholds = [r["threshold"] for r in exp4_results] + mismatches = [r["mismatches"] for r in exp4_results] + fallbacks = [r["fallbacks"] for r in exp4_results] + impossible_counts = [len(r["impossible_values"]) for r in exp4_results] + + x = list(range(len(thresholds))) + width = 0.25 + + plt.figure() + + # Grouped bars: one group per threshold + plt.bar([i - width for i in x], mismatches, width, label="Mismatches") + plt.bar(x, fallbacks, width, label="Fallbacks") + plt.bar([i + width for i in x], impossible_counts, width, label="Impossible values") + + # X-axis labels = thresholds + plt.xticks(x, [str(th) for th in thresholds], rotation=15) + plt.xlabel("Fallback threshold (seconds)") + plt.ylabel("Count") + plt.title("Experiment 4: Learner behavior under faulty digest injection") + plt.legend() + plt.grid(axis="y") + plt.tight_layout() + + filename = os.path.join(PLOTS_DIR, "exp4_fault_summary.pdf") + plt.savefig(filename, dpi=300) + plt.close() + + +PLOTS_DIR = "plots" + +def ensure_plots_dir(): + os.makedirs(PLOTS_DIR, exist_ok=True) + +if __name__ == "__main__": + ensure_plots_dir() + + set_value, get_value, get_value_readonly = build_kv_binding() + + # Run experiments + # exp1_results = exp1_read_latency(set_value, get_value, get_value_readonly) + # plot_exp1_latency(exp1_results, metric="median") + # plot_exp1_latency(exp1_results, metric="p95") + # plot_exp1_throughput(exp1_results) + + # exp2_results = exp2_mixed_workload(set_value, get_value, get_value_readonly) + # plot_exp2_write_throughput(exp2_results) + # plot_exp2_read_throughput(exp2_results) + + # exp3_results = exp3_staleness_and_fallback(set_value, get_value, 
get_value_readonly) + # plot_exp3_staleness(exp3_results) + # plot_exp3_latency(exp3_results) + + # Only run exp4 when there is a faulty-replica setup ready + exp4_results = exp4_fault_injection_sanity(set_value, get_value, get_value_readonly) + plot_exp4_fault_summary(exp4_results) + + + + + + + + diff --git a/api/plots/exp1_latency_median.pdf b/api/plots/exp1_latency_median.pdf new file mode 100644 index 000000000..e7117ed32 Binary files /dev/null and b/api/plots/exp1_latency_median.pdf differ diff --git a/api/plots/exp1_latency_p95.pdf b/api/plots/exp1_latency_p95.pdf new file mode 100644 index 000000000..5463a15be Binary files /dev/null and b/api/plots/exp1_latency_p95.pdf differ diff --git a/api/plots/exp1_throughput.pdf b/api/plots/exp1_throughput.pdf new file mode 100644 index 000000000..4f9c3aaa0 Binary files /dev/null and b/api/plots/exp1_throughput.pdf differ diff --git a/api/plots/exp2_read_throughput.pdf b/api/plots/exp2_read_throughput.pdf new file mode 100644 index 000000000..86d069684 Binary files /dev/null and b/api/plots/exp2_read_throughput.pdf differ diff --git a/api/plots/exp2_write_throughput.pdf b/api/plots/exp2_write_throughput.pdf new file mode 100644 index 000000000..6b3c8d042 Binary files /dev/null and b/api/plots/exp2_write_throughput.pdf differ diff --git a/api/plots/exp3_latency.pdf b/api/plots/exp3_latency.pdf new file mode 100644 index 000000000..3f850269b Binary files /dev/null and b/api/plots/exp3_latency.pdf differ diff --git a/api/plots/exp3_staleness.pdf b/api/plots/exp3_staleness.pdf new file mode 100644 index 000000000..7a06e8929 Binary files /dev/null and b/api/plots/exp3_staleness.pdf differ diff --git a/api/plots/exp4_fault_summary.pdf b/api/plots/exp4_fault_summary.pdf new file mode 100644 index 000000000..135adb757 Binary files /dev/null and b/api/plots/exp4_fault_summary.pdf differ diff --git a/api/pybind_kv_service.cpp b/api/pybind_kv_service.cpp index 013683fbd..1454ebbbe 100644 --- a/api/pybind_kv_service.cpp +++ 
b/api/pybind_kv_service.cpp @@ -48,6 +48,18 @@ std::string get(std::string key, std::string config_path) { } } +std::string get_ro(std::string key, std::string config_path) { + ResDBConfig config = GenerateResDBConfig(config_path); + config.SetClientTimeoutMs(100000); + KVClient client(config); + auto result_ptr = client.GetReadOnly(key); + if (result_ptr) { + return *result_ptr; + } else { + return ""; + } +} + bool set(std::string key, std::string value, std::string config_path) { ResDBConfig config = GenerateResDBConfig(config_path); config.SetClientTimeoutMs(100000); @@ -62,5 +74,6 @@ bool set(std::string key, std::string value, std::string config_path) { PYBIND11_MODULE(pybind_kv, m) { m.def("get", &get, "A function that gets a value from the key-value store"); + m.def("get_value_readonly", &get_ro, "A function that gets a value from the key-value store with no tracking"); m.def("set", &set, "A function that sets a value in the key-value store"); } diff --git a/entrypoint.sh b/entrypoint.sh index dbd4ca2c0..5f45e4829 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -19,5 +19,21 @@ # under the License. 
# -./service/tools/kv/server_tools/start_kv_service.sh -tail -f /dev/null +./service/tools/kv/server_tools/generate_config.sh +./service/tools/kv/server_tools/start_kv_service_learner.sh + +if [[ "$2" =~ ^[0-9]+$ ]]; then + lines=$2 +else + lines=2 +fi + +if [ "$1" = "-w" ]; then + watch -n 1 tail -n "$lines" logs/*.log +elif [ "$1" = "-l" ]; then + tail -f logs/learner.log +elif [ "$1" = "-a" ]; then + tail -f logs/*.log +else + tail -f /dev/null +fi diff --git a/executor/kv/kv_executor.cpp b/executor/kv/kv_executor.cpp index c587f592c..3703a635e 100644 --- a/executor/kv/kv_executor.cpp +++ b/executor/kv/kv_executor.cpp @@ -48,6 +48,8 @@ std::unique_ptr KVExecutor::ExecuteRequest( Set(kv_request.key(), kv_request.value()); } else if (kv_request.cmd() == KVRequest::GET) { kv_response.set_value(Get(kv_request.key())); + } else if (kv_request.cmd() == KVRequest::GET_READ_ONLY) { + kv_response.set_value(GetReadOnly(kv_request.key())); } else if (kv_request.cmd() == KVRequest::GETALLVALUES) { kv_response.set_value(GetAllValues()); } else if (kv_request.cmd() == KVRequest::GETRANGE) { @@ -99,6 +101,8 @@ std::unique_ptr KVExecutor::ExecuteData( Set(kv_request.key(), kv_request.value()); } else if (kv_request.cmd() == KVRequest::GET) { kv_response.set_value(Get(kv_request.key())); + } else if (kv_request.cmd() == KVRequest::GET_READ_ONLY) { + kv_response.set_value(GetReadOnly(kv_request.key())); } else if (kv_request.cmd() == KVRequest::GETALLVALUES) { kv_response.set_value(GetAllValues()); } else if (kv_request.cmd() == KVRequest::GETRANGE) { @@ -144,6 +148,12 @@ std::string KVExecutor::Get(const std::string& key) { return storage_->GetValue(key); } +std::string KVExecutor::GetReadOnly(const std::string& key) { + LOG(ERROR)<<" get read only key:"<GetValue(key); +} + std::string KVExecutor::GetAllValues() { return storage_->GetAllValues(); } // Get values on a range of keys diff --git a/executor/kv/kv_executor.h b/executor/kv/kv_executor.h index fef125972..f7a85e2c6 
100644 --- a/executor/kv/kv_executor.h +++ b/executor/kv/kv_executor.h @@ -43,6 +43,7 @@ class KVExecutor : public TransactionManager { protected: virtual void Set(const std::string& key, const std::string& value); std::string Get(const std::string& key); + std::string GetReadOnly(const std::string& key); std::string GetAllValues(); std::string GetRange(const std::string& min_key, const std::string& max_key); diff --git a/interface/kv/kv_client.cpp b/interface/kv/kv_client.cpp index 25526395b..a4bde62bc 100644 --- a/interface/kv/kv_client.cpp +++ b/interface/kv/kv_client.cpp @@ -47,6 +47,19 @@ std::unique_ptr KVClient::Get(const std::string& key) { return std::make_unique(response.value()); } +std::unique_ptr KVClient::GetReadOnly(const std::string& key) { + KVRequest request; + request.set_cmd(KVRequest::GET_READ_ONLY); + request.set_key(key); + KVResponse response; + int ret = SendReadOnlyRequest(request, &response); + if (ret != 0) { + LOG(ERROR) << "send request fail, ret:" << ret; + return nullptr; + } + return std::make_unique(response.value()); +} + std::unique_ptr KVClient::GetAllValues() { KVRequest request; request.set_cmd(KVRequest::GETALLVALUES); diff --git a/interface/kv/kv_client.h b/interface/kv/kv_client.h index 52cbcab19..cc3cc9435 100644 --- a/interface/kv/kv_client.h +++ b/interface/kv/kv_client.h @@ -58,6 +58,7 @@ class KVClient : public TransactionConstructor { // above. 
int Set(const std::string& key, const std::string& data); std::unique_ptr Get(const std::string& key); + std::unique_ptr GetReadOnly(const std::string& key); std::unique_ptr GetAllValues(); std::unique_ptr GetRange(const std::string& min_key, const std::string& max_key); diff --git a/interface/rdbc/transaction_constructor.cpp b/interface/rdbc/transaction_constructor.cpp index be42224aa..afea31871 100644 --- a/interface/rdbc/transaction_constructor.cpp +++ b/interface/rdbc/transaction_constructor.cpp @@ -85,5 +85,46 @@ int TransactionConstructor::SendRequest( } return -1; } +// NEW: Send a read-only request. If learner nodes are configured, route the +// request to a learner; otherwise fall back to a regular replica. + +int TransactionConstructor::SendReadOnlyRequest( + const google::protobuf::Message& message, Request::Type type) { + const auto& learners = config_.GetLearnerInfos(); + if (!learners.empty()) { + NetChannel::SetDestReplicaInfo(learners[0]); + } else { + NetChannel::SetDestReplicaInfo(config_.GetReplicaInfos()[0]); + } + + return NetChannel::SendRequest(message, type, false); +} + +// NEW: Send a read-only request and wait for a response. Routed to learners +// when available, with fallback to replicas. 
+int TransactionConstructor::SendReadOnlyRequest( + const google::protobuf::Message& message, + google::protobuf::Message* response, Request::Type type) { + const auto& learners = config_.GetLearnerInfos(); + if (!learners.empty()) { + NetChannel::SetDestReplicaInfo(learners[0]); + } else { + NetChannel::SetDestReplicaInfo(config_.GetReplicaInfos()[0]); + } + + int ret = NetChannel::SendRequest(message, type, true); + if (ret == 0) { + std::string resp_str; + int recv_ret = NetChannel::RecvRawMessageData(&resp_str); + if (recv_ret >= 0) { + if (!response->ParseFromString(resp_str)) { + LOG(ERROR) << "parse response fail:" << resp_str.size(); + return -2; + } + return 0; + } + } + return -1; +} } // namespace resdb diff --git a/interface/rdbc/transaction_constructor.h b/interface/rdbc/transaction_constructor.h index 39754a846..bb05b6173 100644 --- a/interface/rdbc/transaction_constructor.h +++ b/interface/rdbc/transaction_constructor.h @@ -39,7 +39,15 @@ class TransactionConstructor : public NetChannel { int SendRequest(const google::protobuf::Message& message, google::protobuf::Message* response, Request::Type type = Request::TYPE_CLIENT_REQUEST); + + // NEW: Send read-only request (routed to learners if available). + int SendReadOnlyRequest(const google::protobuf::Message& message, + Request::Type type = Request::TYPE_CLIENT_REQUEST); + // NEW: Send read-only request and wait for a response. + int SendReadOnlyRequest(const google::protobuf::Message& message, + google::protobuf::Message* response, + Request::Type type = Request::TYPE_CLIENT_REQUEST); private: absl::StatusOr GetResponseData(const Response& response); diff --git a/logs/.gitkeep b/logs/.gitkeep new file mode 100644 index 000000000..5fb255256 --- /dev/null +++ b/logs/.gitkeep @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# \ No newline at end of file diff --git a/platform/config/resdb_config.cpp b/platform/config/resdb_config.cpp index 23129a258..1ed26c0ad 100644 --- a/platform/config/resdb_config.cpp +++ b/platform/config/resdb_config.cpp @@ -58,6 +58,14 @@ ResDBConfig::ResDBConfig(const ResConfigData& config_data, break; } } + learners_.clear(); + for (const auto& learner : config_data_.learner_info()) { + learners_.push_back(learner); + } + + if (config_data_.block_size() > 0) { + block_size_ = config_data_.block_size(); + } if (config_data_.view_change_timeout_ms() == 0) { config_data_.set_view_change_timeout_ms(viewchange_commit_timeout_ms_); } @@ -95,11 +103,27 @@ void ResDBConfig::SetConfigData(const ResConfigData& config_data) { break; } } + learners_.clear(); + for (const auto& learner : config_data_.learner_info()) { + learners_.push_back(learner); + } + if (config_data_.block_size() > 0) { + block_size_ = config_data_.block_size(); + } if (config_data_.view_change_timeout_ms() == 0) { config_data_.set_view_change_timeout_ms(viewchange_commit_timeout_ms_); } } +const std::vector& ResDBConfig::GetLearnerInfos() const { + return learners_; +} + +void ResDBConfig::AddLearnerInfo(const ReplicaInfo& learner) { + *config_data_.add_learner_info() = learner; + learners_.push_back(learner); +} + KeyInfo 
ResDBConfig::GetPrivateKey() const { return private_key_; } CertificateInfo ResDBConfig::GetPublicKeyCertificateInfo() const { diff --git a/platform/config/resdb_config.h b/platform/config/resdb_config.h index 9867c6d5a..931a8ec93 100644 --- a/platform/config/resdb_config.h +++ b/platform/config/resdb_config.h @@ -121,9 +121,19 @@ class ResDBConfig { uint32_t GetViewchangeCommitTimeout() const; void SetViewchangeCommitTimeout(uint64_t timeout_ms); + // learner + int GetBlockSize() const { return block_size_; } + void SetBlockSize(int bs) { block_size_ = bs; } + const std::vector& GetLearnerInfos() const; + void AddLearnerInfo(const ReplicaInfo&); + private: ResConfigData config_data_; std::vector replicas_; + + // NEW: Learner nodes for read only fast path + // std::vector learners_; + ReplicaInfo self_info_; const KeyInfo private_key_; const CertificateInfo public_key_cert_info_; @@ -147,6 +157,10 @@ class ResDBConfig { uint32_t input_worker_num_ = 5; uint32_t output_worker_num_ = 5; uint32_t client_batch_num_ = 100; + + // learner + uint32_t block_size_ = 10; + std::vector learners_; }; } // namespace resdb diff --git a/platform/consensus/ordering/pbft/BUILD b/platform/consensus/ordering/pbft/BUILD index 6f4fdff70..2f2c5de00 100644 --- a/platform/consensus/ordering/pbft/BUILD +++ b/platform/consensus/ordering/pbft/BUILD @@ -45,6 +45,8 @@ cc_library( ":lock_free_collector_pool", ":transaction_utils", "//platform/networkstrate:replica_communicator", + "//interface/rdbc:net_channel", + "//proto/kv:kv_cc_proto", ], ) @@ -103,7 +105,8 @@ cc_library( hdrs = ["commitment.h"], deps = [ ":message_manager", - ":response_manager", + ":response_manager", + "//common/crypto:hash", "//common/utils", "//platform/common/queue:batch_queue", "//platform/config:resdb_config", diff --git a/platform/consensus/ordering/pbft/commitment.cpp b/platform/consensus/ordering/pbft/commitment.cpp index 4d048331e..78ef4df98 100644 --- a/platform/consensus/ordering/pbft/commitment.cpp +++ 
b/platform/consensus/ordering/pbft/commitment.cpp @@ -23,6 +23,8 @@ #include #include "common/utils/utils.h" +#include "common/crypto/hash.h" + #include "platform/consensus/ordering/pbft/transaction_utils.h" namespace resdb { @@ -237,6 +239,7 @@ int Commitment::ProcessProposeMsg(std::unique_ptr context, // If receive 2f+1 prepare message, broadcast a commit message. int Commitment::ProcessPrepareMsg(std::unique_ptr context, std::unique_ptr request) { + if (context == nullptr || context->signature.signature().empty()) { LOG(ERROR) << "user request doesn't contain signature, reject"; return -2; @@ -322,6 +325,24 @@ int Commitment::PostProcessExecutedMsg() { // LOG(ERROR)<<"send back to proxy:"<proxy_id(); batch_resp->SerializeToString(request.mutable_data()); replica_communicator_->SendMessage(request, request.proxy_id()); + uint64_t set_cnt = batch_resp->set_count(); + uint64_t read_cnt = batch_resp->read_count(); + uint64_t delete_cnt = batch_resp->delete_count(); + + // send an update to learners after every block size execution + if (request.seq() % config_.GetBlockSize() == 0) { + SendUpdateToLearners(request.seq()); + } + + // // Also mirror the executed request to every learner so they can consume + // // the committed stream. 
+ // for (const auto& learner : config_.GetLearnerInfos()) { + // LOG(ERROR) << "Forward commit to learner id " << learner.id() + // << " seq " << request.seq() + // << " set=" << set_cnt << " read=" << read_cnt + // << " delete=" << delete_cnt; + // replica_communicator_->SendMessage(request, learner.id()); + // } } return 0; } @@ -330,4 +351,101 @@ DuplicateManager* Commitment::GetDuplicateManager() { return duplicate_manager_.get(); } +void Commitment::SendUpdateToLearners(int seq_num) { + + // get stored requests from message manager + std::vector> reqs_to_learner = message_manager_->getLearnerUpdateRequests(); + + std::string raw_data; + + // adds all requests into one batch variable + RequestBatch batch; + for (const auto& r : reqs_to_learner) { + batch.add_requests()->Swap(r.get()); + } + + // turn the entire batch variable into string + batch.SerializeToString(&raw_data); + + // generate hash of entire batch + std::string block_hash = resdb::utils::CalculateSHA256Hash(raw_data); + + // generate correct row of Vandermode matrix + std::vector A_i = gen_A_row(); + + // perform message distribution algorithm + std::vector F_i; + + uint32_t excess_bytes = 0; + while (raw_data.size() % A_i.size() != 0) { + raw_data += "0"; + excess_bytes++; + } + + uint32_t iter = 0; + uint32_t c_ik = 0; + + for (int d = 0; d < raw_data.size(); d++) { // data MUST BE A MULTIPLE OF m + c_ik = (c_ik + A_i[iter] * (uint8_t)static_cast(raw_data[d])) % 257; + + iter++; + if (iter == A_i.size()) { + F_i.push_back(c_ik); + + c_ik = 0; + iter = 0; + } + } + + // add data to learner update message and send it + LearnerUpdate learnerUpdate; + learnerUpdate.mutable_data()->Reserve(static_cast(F_i.size())); + for (uint32_t v : F_i) { + learnerUpdate.add_data(v); + } + learnerUpdate.set_block_hash(block_hash); + learnerUpdate.set_seq(seq_num); + learnerUpdate.set_sender_id(config_.GetSelfInfo().id()); + learnerUpdate.set_excess_bytes(excess_bytes); + + for (const auto& learner : 
config_.GetLearnerInfos()) { + replica_communicator_->SendMessage(learnerUpdate, learner.id()); + } + +} + +std::vector Commitment::gen_A_row() { + + int n = config_.GetReplicaNum(); + int m = config_.GetMinClientReceiveNum(); + int p = 257; + + std::vector A_i(m); + + int i = config_.GetSelfInfo().id(); + + for (int j = 0; j < m; j++) { + + int exp = j; + int base = i; + int mod = p; + int result = 1; + + while (exp > 0) { + if (exp & 1) { + result = (result * base) % mod; + } + + base = (base * base) % mod; + exp >>= 1; + } + + A_i[j] = result; + + } + + return A_i; + +} + } // namespace resdb diff --git a/platform/consensus/ordering/pbft/commitment.h b/platform/consensus/ordering/pbft/commitment.h index 03d77cf3d..4cdd4933d 100644 --- a/platform/consensus/ordering/pbft/commitment.h +++ b/platform/consensus/ordering/pbft/commitment.h @@ -59,6 +59,10 @@ class Commitment { protected: virtual int PostProcessExecutedMsg(); + // distributed message functions + std::vector gen_A_row(); + void SendUpdateToLearners(int seq_num); + protected: ResDBConfig config_; MessageManager* message_manager_; @@ -74,6 +78,7 @@ class Commitment { std::mutex mutex_; std::unique_ptr duplicate_manager_; + }; } // namespace resdb diff --git a/platform/consensus/ordering/pbft/message_manager.cpp b/platform/consensus/ordering/pbft/message_manager.cpp index cc5e187c6..eb88ed114 100644 --- a/platform/consensus/ordering/pbft/message_manager.cpp +++ b/platform/consensus/ordering/pbft/message_manager.cpp @@ -22,6 +22,7 @@ #include #include "common/utils/utils.h" +#include "proto/kv/kv.pb.h" namespace resdb { @@ -48,6 +49,47 @@ MessageManager::MessageManager( resp_msg->set_seq(request->seq()); resp_msg->set_current_view(request->current_view()); resp_msg->set_primary_id(GetCurrentPrimary()); + uint64_t set_cnt = 0; + uint64_t read_cnt = 0; + uint64_t delete_cnt = 0; + BatchUserRequest batch_request; + if (batch_request.ParseFromString(request->data())) { + for (const auto& user_req : 
batch_request.user_requests()) { + resdb::KVRequest kv_request; + if (!kv_request.ParseFromString(user_req.request().data())) { + continue; + } + switch (kv_request.cmd()) { + case resdb::KVRequest::SET: + case resdb::KVRequest::SET_WITH_VERSION: + ++set_cnt; + break; + case resdb::KVRequest::GET: + case resdb::KVRequest::GETALLVALUES: + case resdb::KVRequest::GETRANGE: + case resdb::KVRequest::GET_WITH_VERSION: + case resdb::KVRequest::GET_ALL_ITEMS: + case resdb::KVRequest::GET_KEY_RANGE: + case resdb::KVRequest::GET_HISTORY: + case resdb::KVRequest::GET_TOP: + ++read_cnt; + break; + default: + break; + } + } + } + resp_msg->set_set_count(set_cnt); + resp_msg->set_read_count(read_cnt); + resp_msg->set_delete_count(delete_cnt); + + // copying request for learner update sake + auto request_copy = std::unique_ptr(request->New()); + request_copy->CopyFrom(*request); + + // adds request to the vector of requests waiting to be sent to learners + learner_update_requests_.push_back(std::move(request_copy)); + if (transaction_executor_->NeedResponse() && resp_msg->proxy_id() != 0) { queue_.Push(std::move(resp_msg)); @@ -290,4 +332,8 @@ LockFreeCollectorPool* MessageManager::GetCollectorPool() { return collector_pool_.get(); } +std::vector> MessageManager::getLearnerUpdateRequests() { + return std::move(learner_update_requests_); +} + } // namespace resdb diff --git a/platform/consensus/ordering/pbft/message_manager.h b/platform/consensus/ordering/pbft/message_manager.h index c51fd22a5..ddb6337f6 100644 --- a/platform/consensus/ordering/pbft/message_manager.h +++ b/platform/consensus/ordering/pbft/message_manager.h @@ -112,6 +112,9 @@ class MessageManager { LockFreeCollectorPool* GetCollectorPool(); + // function to retrieved stored learner updates + std::vector> getLearnerUpdateRequests(); + private: bool IsValidMsg(const Request& request); @@ -139,6 +142,8 @@ class MessageManager { std::mutex lct_lock_; std::map last_committed_time_; + + std::vector> 
learner_update_requests_; }; } // namespace resdb diff --git a/platform/consensus/ordering/pbft/response_manager.cpp b/platform/consensus/ordering/pbft/response_manager.cpp index f490c0039..2876bfbcb 100644 --- a/platform/consensus/ordering/pbft/response_manager.cpp +++ b/platform/consensus/ordering/pbft/response_manager.cpp @@ -22,6 +22,8 @@ #include #include "common/utils/utils.h" +#include "interface/rdbc/net_channel.h" +#include "proto/kv/kv.pb.h" namespace resdb { @@ -72,6 +74,10 @@ ResponseManager::ResponseManager(const ResDBConfig& config, } global_stats_ = Stats::GetGlobalStats(); send_num_ = 0; + + if (!config_.GetLearnerInfos().empty()) { + learner_info_ = config_.GetLearnerInfos()[0]; + } } ResponseManager::~ResponseManager() { @@ -102,6 +108,10 @@ std::vector> ResponseManager::FetchContextList( int ResponseManager::NewUserRequest(std::unique_ptr context, std::unique_ptr user_request) { + if (ForwardReadOnlyToLearner(context.get(), user_request.get())) { + return 0; + } + if (!user_request->need_response()) { context->client = nullptr; } @@ -407,4 +417,51 @@ void ResponseManager::MonitoringClientTimeOut() { } } } + +bool ResponseManager::ForwardReadOnlyToLearner(Context* context, + Request* user_request) { + if (!learner_info_.has_value() || context == nullptr || + context->client == nullptr || user_request == nullptr) { + return false; + } + if (user_request->type() != Request::TYPE_CLIENT_REQUEST) { + return false; + } + + KVRequest kv_request; + if (!kv_request.ParseFromString(user_request->data())) { + return false; + } + if (kv_request.cmd() != KVRequest::GET_READ_ONLY) { + return false; + } + + NetChannel learner_channel(learner_info_->ip(), learner_info_->port()); + learner_channel.SetRecvTimeout(100000); // 100ms + + std::string payload; + if (!kv_request.SerializeToString(&payload)) { + return false; + } + + if (learner_channel.SendRawMessageData(payload) != 0) { + return false; + } + + // Await learner response; if nothing returns, fall back to 
PBFT. + std::string learner_resp; + if (learner_channel.RecvRawMessageData(&learner_resp) <= 0) { + return false; + } + + KVResponse learner_response; + if (!learner_response.ParseFromString(learner_resp)) { + return false; + } + + if (context->client->SendRawMessageData(learner_resp) != 0) { + return false; + } + return true; +} } // namespace resdb diff --git a/platform/consensus/ordering/pbft/response_manager.h b/platform/consensus/ordering/pbft/response_manager.h index 2972171eb..53df26a24 100644 --- a/platform/consensus/ordering/pbft/response_manager.h +++ b/platform/consensus/ordering/pbft/response_manager.h @@ -19,6 +19,8 @@ #pragma once #include +#include +#include #include "platform/config/resdb_config.h" #include "platform/consensus/ordering/pbft/lock_free_collector_pool.h" @@ -82,6 +84,7 @@ class ResponseManager { void ResponseTimer(std::string hash); void MonitoringClientTimeOut(); std::unique_ptr GetTimeOutRequest(std::string hash); + bool ForwardReadOnlyToLearner(Context* context, Request* user_request); private: ResDBConfig config_; @@ -104,6 +107,7 @@ class ResponseManager { sem_t request_sent_signal_; uint64_t highest_seq_; uint64_t highest_seq_primary_id_; + std::optional learner_info_; }; } // namespace resdb diff --git a/platform/learner/BUILD b/platform/learner/BUILD new file mode 100644 index 000000000..852205e4f --- /dev/null +++ b/platform/learner/BUILD @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "learner_lib", + srcs = [ + "learner.cpp", + ], + hdrs = ["learner.h"], + deps = [ + "//chain/storage:leveldb", + "//common:json", + "//platform/common/network:tcp_socket", + "//platform/proto:resdb_cc_proto", + "//proto/kv:kv_cc_proto", + "@com_github_google_glog//:glog", + "//common/crypto:hash", + ], +) + +cc_binary( + name = "learner", + srcs = ["learner_main.cpp"], + deps = [ + ":learner_lib", + ], +) diff --git a/platform/learner/learner.config b/platform/learner/learner.config new file mode 100644 index 000000000..363cabf1a --- /dev/null +++ b/platform/learner/learner.config @@ -0,0 +1,12 @@ +{ + "ip": "127.0.0.1", + "port": 25000, + "block_size": 10, + "replicas": [ + { "id": 1, "ip": "127.0.0.1", "port": 20000 }, + { "id": 2, "ip": "127.0.0.1", "port": 20001 }, + { "id": 3, "ip": "127.0.0.1", "port": 20002 }, + { "id": 4, "ip": "127.0.0.1", "port": 20003 } + ] +} + \ No newline at end of file diff --git a/platform/learner/learner.cpp b/platform/learner/learner.cpp new file mode 100644 index 000000000..7a5354440 --- /dev/null +++ b/platform/learner/learner.cpp @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "platform/learner/learner.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "chain/storage/leveldb.h" +#include "platform/common/network/socket.h" +#include "platform/common/network/tcp_socket.h" +#include "platform/proto/resdb.pb.h" +#include "proto/kv/kv.pb.h" +#include "platform/statistic/stats.h" +#include "common/crypto/hash.h" + +namespace { + +std::string DefaultConfigPath() { return "platform/learner/learner.config"; } + +LearnerConfig ParseLearnerConfig(const std::string& config_path) { + std::ifstream config_stream(config_path); + if (!config_stream.is_open()) { + throw std::runtime_error("Unable to open learner config: " + config_path); + } + + nlohmann::json config_json = + nlohmann::json::parse(config_stream, nullptr, true, true); + LearnerConfig config; + config.ip = config_json.value("ip", config.ip); + config.port = config_json.value("port", 0); + config.block_size = config_json.value("block_size", 0); + config.db_path = config_json.value("db_path", ""); + if (config.db_path.empty() && config.port > 0) { + config.db_path = std::to_string(config.port) + "_db/"; + } + + if (config_json.contains("replicas") && config_json["replicas"].is_array()) { + for (const auto& replica_json : config_json["replicas"]) { + resdb::ReplicaInfo replica_info; + replica_info.set_id(replica_json.value("id", 0)); + replica_info.set_ip(replica_json.value("ip", "127.0.0.1")); + replica_info.set_port(replica_json.value("port", 0)); + if (replica_info.port() != 0) { 
+ config.replicas.push_back(replica_info); + } + } + config.total_replicas = config.replicas.size(); + config.min_needed = (config.total_replicas - 1) / 3 + 1; + } + + if (config.port <= 0) { + throw std::runtime_error("Learner config must include a valid port"); + } + if (config.block_size <= 0) { + throw std::runtime_error("Learner config must include a positive block_size"); + } + return config; +} + +} // namespace + +Learner::Learner(const std::string& config_path) + : config_(LoadConfig(config_path)) { + InitializeStorage(); + resdb::Stats::GetGlobalStats()->Stop(); +} + +LearnerConfig Learner::LoadConfig(const std::string& config_path) { + const std::string path = config_path.empty() ? DefaultConfigPath() : config_path; + return ParseLearnerConfig(path); +} + +void Learner::Run() { + resdb::TcpSocket server; + if (server.Listen(config_.ip, config_.port) != 0) { + throw std::runtime_error("Learner failed to bind to " + config_.ip + ":" + + std::to_string(config_.port)); + } + + is_running_.store(true); + metrics_thread_ = std::thread(&Learner::MetricsLoop, this); + + while (is_running_.load()) { + auto client = server.Accept(); + if (!client) { + continue; + } + std::thread(&Learner::HandleClient, this, std::move(client)).detach(); + } +} + +void Learner::HandleClient(std::unique_ptr socket) { + // std::cout << "[Learner] replica connected" << std::endl; + while (true) { + void* buffer = nullptr; + size_t len = 0; + int ret = socket->Recv(&buffer, &len); + if (ret <= 0) { + if (buffer != nullptr) { + free(buffer); + } + break; + } + + std::string payload(static_cast(buffer), len); + free(buffer); + if (!ProcessBroadcast(socket.get(), payload)) { + break; + } + } + // std::cout << "[Learner] replica disconnected" << std::endl; +} + +bool Learner::ProcessBroadcast(resdb::Socket* socket, + const std::string& payload) { + + resdb::ResDBMessage envelope; + if (!envelope.ParseFromString(payload)) { + + resdb::KVRequest kv_request; + if 
(kv_request.ParseFromString(payload)) { + if (HandleReadOnlyRequest(socket, kv_request)) { + LOG(INFO) << "served read-only key=" << kv_request.key(); + return false; + } + ++total_messages_; + total_bytes_.fetch_add(payload.size()); + last_type_.store(kv_request.cmd()); + last_seq_.store(0); + last_sender_.store(-1); + last_payload_bytes_.store(payload.size()); + return false; + } + ++total_messages_; + total_bytes_.fetch_add(payload.size()); + last_type_.store(-1); + last_seq_.store(0); + last_sender_.store(-1); + last_payload_bytes_.store(payload.size()); + return true; + } + + resdb::LearnerUpdate request; + if (request.ParseFromString(envelope.data())) { + + if (request.sender_id() == 0) return true; + + HandleLearnerUpdate(request); + + ++total_messages_; + total_bytes_.fetch_add(payload.size()); + + return true; + } + + ++total_messages_; + total_bytes_.fetch_add(envelope.data().size()); + last_type_.store(-1); + last_seq_.store(0); + last_sender_.store(-1); + last_payload_bytes_.store(envelope.data().size()); + return true; +} + +void Learner::HandleLearnerUpdate(resdb::LearnerUpdate learnerUpdate) { + + std::lock_guard lock(m); + + std::string block_hash = learnerUpdate.block_hash(); + int c_seq = learnerUpdate.seq(); + int sender_id = learnerUpdate.sender_id(); + int excess_bytes = learnerUpdate.excess_bytes(); + + int blockIndex = c_seq / config_.block_size - 1; + + // fill sequence status with "havent started" if variable doesnt exist yet + while (sequence_status.size() < blockIndex + 1) { + sequence_status.push_back(0); + } + + switch (sequence_status[blockIndex]) { + case 0: { + hashCounts.push_back(std::tuple(blockIndex, block_hash, 1)); + learnerUpdates.push_back(learnerUpdate); + sequence_status[blockIndex] = 1; + + if (1 >= config_.min_needed) { + sequence_status[blockIndex] = 2; + } + + break; + } + case 1: { + } + case 2: { + std::tuple *valid_hc = nullptr; + + if (sequence_status[blockIndex] == 1) { + learnerUpdates.push_back(learnerUpdate); + 
+ std::tuple *curr_hc = nullptr; + for (std::tuple& hc : hashCounts) { + if (std::get<0>(hc) != blockIndex) continue; + + if (std::get<1>(hc) == block_hash) { + std::get<2>(hc) = std::get<2>(hc) + 1; + curr_hc = &hc; + break; + } + } + + if (curr_hc == nullptr) { + hashCounts.push_back(std::tuple(blockIndex, block_hash, 1)); + curr_hc = &hashCounts.back(); + } + + if (std::get<2>(*curr_hc) >= config_.min_needed) { + sequence_status[blockIndex] = 2; + valid_hc = curr_hc; + } else { + break; + } + } + + if (valid_hc == nullptr) { // will run if input case: 2 + for (std::tuple& hc : hashCounts) { + if (std::get<0>(hc) != blockIndex) continue; + + if (std::get<2>(hc) >= config_.min_needed && std::get<1>(hc) == block_hash) { // only set if hash is valid and hash matches received hash + learnerUpdates.push_back(learnerUpdate); + valid_hc = &hc; + break; + } + } + } + + if (std::get<1>(*valid_hc) == block_hash) { + + std::vector lus; + for (resdb::LearnerUpdate lu : learnerUpdates) { + if (lu.block_hash() == block_hash) { + lus.push_back(lu); + } + } + + bool first = true; + std::vector choice(lus.size(), false); + for (int i = 0; i < config_.min_needed; i++) { + choice[i] = true; + } + + while (GetNextCombination(choice, first)) { + first = false; + if (!choice[choice.size()-1]) continue; // include the current received message + + std::vector inds; + std::vector rep_ids; + for (int i = 0; i < choice.size(); i++) { + if (choice[i]) { + inds.push_back(i); + rep_ids.push_back(lus[i].sender_id()); + } + } + + std::vector> A = gen_A(rep_ids); + + std::vector> Ainv = invertMatrix(A, 257); + + std::string raw_bytes; + for (auto i_iter = 0; i_iter < learnerUpdate.data().size(); i_iter++) { // iterate through each block of m bytes (assumption, lengths are equal) + + for (auto b_iter = 0; b_iter < config_.min_needed; b_iter++) { // iterate through each byte + + uint32_t mid = 0; + for (int m_iter = 0; m_iter < config_.min_needed; m_iter++) { // do dot product + + mid = (mid + 
Ainv[b_iter][m_iter] * lus[inds[m_iter]].data()[i_iter]) % 257; + + } + + raw_bytes.push_back(static_cast(mid)); + + } + + } + + for (int i = 0; i < excess_bytes; i++) { + raw_bytes.pop_back(); + } + + // std::ofstream out("learner_resTEST.txt"); + // for (int i = 0; i < raw_bytes.size(); i++) { + // for (int j=1; j < 256; j*=2) { + // out << (raw_bytes[i] & j) << " "; + // } + // } + + std::string final_hash = resdb::utils::CalculateSHA256Hash(raw_bytes); + + if (final_hash == block_hash) { // we have reconstructed the batch + + LOG(INFO) << "RECONSTRUCTED " << c_seq << " FROM " << rep_ids[0] << " " << rep_ids[1]; + + resdb::RequestBatch batch; + if (batch.ParseFromArray(raw_bytes.data(), static_cast(raw_bytes.size()))) { + resdb::BatchUserRequest batch_request; + for (int i = 0; i < batch.requests_size(); ++i) { + batch_request.ParseFromString(batch.requests()[i].data()); + + for (int j = 0; j < batch_request.user_requests().size(); j++) { + ExecuteRequest(batch_request.user_requests()[j].request().data()); + } + } + + } + + sequence_status[blockIndex] = 3; + return; + } + + } + + } + break; + } + + } + +} + + +void Learner::ExecuteRequest(std::string requestStr) { + resdb::KVRequest kv_request; + if (!kv_request.ParseFromString(requestStr)) { + return; + } + + if (kv_request.cmd() == resdb::KVRequest::SET) { + storage_->SetValue(kv_request.key(), kv_request.value()); + if (known_keys_.find(kv_request.key()) == known_keys_.end()) { + known_keys_.insert(kv_request.key()); + } + } else if (kv_request.cmd() == resdb::KVRequest::SET_WITH_VERSION) { + storage_->SetValueWithVersion(kv_request.key(), kv_request.value(), kv_request.version()); + if (known_keys_.find(kv_request.key()) == known_keys_.end()) { + known_keys_.insert(kv_request.key()); + } + } +} + +uint32_t Learner::modpow(uint32_t a, uint32_t e, uint32_t p) { + uint32_t r = 1; + while (e > 0) { + if (e & 1) r = (r * a) % p; + a = (a * a) % p; + e >>= 1; + } + return r; +} + +uint32_t Learner::modinv(uint32_t 
x, uint32_t p) { + return modpow(x, p - 2, p); +} + +std::vector> Learner::invertMatrix(std::vector> A, int p) { + int n = A.size(); + + // Form augmented matrix [A | I] + std::vector> aug(n, std::vector(2*n)); + for (int i = 0; i < n; i++) { + for (int j = 0; j < n; j++) { + aug[i][j] = A[i][j] % p; + + aug[i][n+j] = 0; + if (i == j) aug[i][n+j] = 1; + } + + } + + // Gauss-Jordan + for (int col = 0; col < n; col++) { + + // Find pivot row + int pivot = col; + while (pivot < n && aug[pivot][col] == 0) pivot++; + if (pivot == n) throw std::runtime_error("Matrix is not invertible mod p"); + + swap(aug[col], aug[pivot]); + + // Normalize pivot row + int32_t inv = modinv(aug[col][col], p); + for (int j = 0; j < 2*n; j++) + aug[col][j] = (aug[col][j] * inv) % p; + + // Eliminate other rows + for (int i = 0; i < n; i++) { + if (i == col) continue; + int32_t factor = aug[i][col]; + for (int j = 0; j < 2*n; j++) { + int32_t temp = (aug[i][j] - factor * aug[col][j]) % p; + if (temp < 0) temp += p; + aug[i][j] = temp; + } + } + + } + + // Extract inverse matrix + std::vector> invA(n, std::vector(n)); + for (int i = 0; i < n; i++) + for (int j = 0; j < n; j++) + invA[i][j] = aug[i][n+j]; + + return invA; +} + +std::vector> Learner::gen_A(std::vector inds) { + + int n = inds.size(); + int m = config_.min_needed; + int p = 257; + + std::vector> A(n,std::vector(m)); + + for (int i = 0; i < n; i++) { + + for (int j = 0; j < m; j++) { + + int exp = j; + int base = inds[i]; + int mod = p; + int result = 1; + + while (exp > 0) { + if (exp & 1) { + result = (result * base) % mod; + } + + base = (base * base) % mod; + exp >>= 1; + } + + A[i][j] = result; + + } + + } + + return A; + +} + +bool Learner::GetNextCombination(std::vector &choice, bool first) { + if (first) return true; + + int stage = 0; + int counter = 0; + for (int i = choice.size() - 1; i >= 0; i--) { + if (stage == 0) { + if (choice[i] == 1) { + counter++; + choice[i] = 0; + } else { + stage = 1; + } + } else if (stage 
== 1) { + if (choice[i] == 1) { + counter++; + choice[i] = 0; + stage = 2; + + while (counter > 0) { + choice[i+counter] = 1; + counter--; + } + } + } + } + + return stage == 2; +} + +void Learner::MetricsLoop() const { + uint64_t last_messages = 0; + uint64_t last_bytes = 0; + while (is_running_.load()) { + uint64_t current_messages = total_messages_.load(); + uint64_t current_bytes = total_bytes_.load(); + if (current_messages != last_messages || current_bytes != last_bytes) { + PrintMetrics(); + last_messages = current_messages; + last_bytes = current_bytes; + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } +} + +bool Learner::HandleReadOnlyRequest(resdb::Socket* socket, + const resdb::KVRequest& request) const { + if (request.cmd() != resdb::KVRequest::GET_READ_ONLY || storage_ == nullptr) { + return false; + } + + if (known_keys_.find(request.key()) == known_keys_.end()) { + return false; + } + + std::string value = storage_->GetValue(request.key()); + if (value.empty()) { + return false; + } + + resdb::KVResponse response; + response.set_key(request.key()); + response.set_value(value); + + std::string resp_str; + if (!response.SerializeToString(&resp_str)) { + LOG(ERROR) << "[Learner] failed to serialize KVResponse"; + return false; + } + + if (socket->Send(resp_str) < 0) { + LOG(ERROR) << "[Learner] failed to send read-only response"; + return false; + } + + return true; +} + +void Learner::PrintMetrics() const { + LOG(INFO) << "msgs=" << total_messages_.load() + << " bytes=" << total_bytes_.load() + << " sets=" << total_sets_.load() + << " reads=" << total_reads_.load() + << " deletes=" << total_deletes_.load() + << " last_type=" << last_type_.load() + << " last_seq=" << last_seq_.load() + << " sender=" << last_sender_.load() + << " payload=" << last_payload_bytes_.load() << "B"; +} + +void Learner::InitializeStorage() { + if (config_.db_path.empty()) { + config_.db_path = std::to_string(config_.port) + "_db/"; + } + storage_ = 
resdb::storage::NewResLevelDB(config_.db_path); + if (!storage_) { + throw std::runtime_error("Learner failed to initialize storage at " + + config_.db_path); + } + known_keys_.clear(); + storage_->SetValue("test", "from learner db"); + storage_->Flush(); + known_keys_.insert("test"); + VLOG(1) << "[Learner] Initialized local DB at " << config_.db_path + << " with test key"; +} \ No newline at end of file diff --git a/platform/learner/learner.h b/platform/learner/learner.h new file mode 100644 index 000000000..567bfaf2d --- /dev/null +++ b/platform/learner/learner.h @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include + +#include "chain/storage/storage.h" +#include "proto/kv/kv.pb.h" +#include "platform/proto/resdb.pb.h" +#include "platform/proto/replica_info.pb.h" +#include "common/crypto/hash.h" + +namespace resdb { +class Socket; +class Request; +} + +struct LearnerConfig { + std::string ip = "127.0.0.1"; + int port = 0; + int block_size = 0; + std::string db_path; + std::vector replicas; + int total_replicas = 0; + int min_needed = 0; +}; + +class Learner { + public: + explicit Learner(const std::string& config_path); + void Run(); + +private: + static LearnerConfig LoadConfig(const std::string& config_path); + void HandleClient(std::unique_ptr socket); + // Returns true if the connection should remain open, false if it should be closed. + bool ProcessBroadcast(resdb::Socket* socket, + const std::string& payload); + bool HandleReadOnlyRequest(resdb::Socket* socket, + const resdb::KVRequest& request) const; + void MetricsLoop() const; + void PrintMetrics() const; + void InitializeStorage(); + + void HandleLearnerUpdate(resdb::LearnerUpdate learnerUpdate); + uint32_t modpow(uint32_t a, uint32_t e, uint32_t p); + uint32_t modinv(uint32_t x, uint32_t p); + std::vector> invertMatrix(std::vector> A, int p); + std::vector> gen_A(std::vector inds); + bool GetNextCombination(std::vector &choice, bool first); + void ExecuteRequest(std::string requestStr); + +private: + LearnerConfig config_; + std::atomic is_running_{false}; + mutable std::atomic total_messages_{0}; + mutable std::atomic total_bytes_{0}; + mutable std::atomic total_sets_{0}; + mutable std::atomic total_reads_{0}; + mutable std::atomic total_deletes_{0}; + mutable std::atomic last_type_{-1}; + mutable std::atomic last_seq_{0}; + mutable std::atomic last_sender_{-1}; + mutable std::atomic last_payload_bytes_{0}; + mutable std::thread metrics_thread_; + mutable std::unique_ptr storage_; + std::unordered_set known_keys_; + 
+ std::vector sequence_status; // 0 = not started, 1 = unknown valid hash, 2 = known valid hash, 3 = completed + std::vector learnerUpdates; + std::vector> hashCounts; + std::mutex m; +}; diff --git a/platform/learner/learner_main.cpp b/platform/learner/learner_main.cpp new file mode 100644 index 000000000..911664c22 --- /dev/null +++ b/platform/learner/learner_main.cpp @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "platform/learner/learner.h" + +#include +#include +#include + +#include + +std::string ResolveConfigPath(int argc, char** argv) { + std::string config_path = "platform/learner/learner.config"; + for (int i = 1; i < argc; ++i) { + if (std::string(argv[i]) == "-config" && i + 1 < argc) { + config_path = argv[i + 1]; + break; + } + } + return config_path; +} + +int main(int argc, char** argv) { + try { + google::InitGoogleLogging(argv[0]); + FLAGS_logtostderr = 1; + Learner learner(ResolveConfigPath(argc, argv)); + learner.Run(); + } catch (const std::exception& e) { + std::cerr << "Failed to start learner: " << e.what() << std::endl; + return 1; + } + return 0; +} diff --git a/platform/networkstrate/consensus_manager.cpp b/platform/networkstrate/consensus_manager.cpp index b3fb10625..399f6a518 100644 --- a/platform/networkstrate/consensus_manager.cpp +++ b/platform/networkstrate/consensus_manager.cpp @@ -47,6 +47,11 @@ ConsensusManager::ConsensusManager(const ResDBConfig& config) config_.GetPrivateKey(), config_.GetPublicKeyCertificateInfo()); } bc_client_ = GetReplicaClient(config_.GetReplicaInfos(), true); + + // Add learners + for (const auto& learner : config_.GetLearnerInfos()) { + AddNewClient(learner); + } } ConsensusManager::~ConsensusManager() { diff --git a/platform/networkstrate/replica_communicator.cpp b/platform/networkstrate/replica_communicator.cpp index f1521acb4..44bb8fee0 100644 --- a/platform/networkstrate/replica_communicator.cpp +++ b/platform/networkstrate/replica_communicator.cpp @@ -206,16 +206,12 @@ int ReplicaCommunicator::SendMessage(const google::protobuf::Message& message) { int ReplicaCommunicator::SendMessage(const google::protobuf::Message& message, const ReplicaInfo& replica_info) { - return SendSingleMessage(message, replica_info); - - if (is_use_long_conn_) { - std::string data = NetChannel::GetRawMessageString(message, verifier_); - BroadcastData broadcast_data; - broadcast_data.add_data()->swap(data); - return 
SendMessageFromPool(broadcast_data, {replica_info}); - } else { + // Learners/clients aren't part of the replica pool, so send directly on the + // configured port instead of the long-connection (+10000) channel. + if (!IsInPool(replica_info)) { return SendMessageInternal(message, {replica_info}); } + return SendSingleMessage(message, replica_info); } int ReplicaCommunicator::SendBatchMessage( diff --git a/platform/proto/replica_info.proto b/platform/proto/replica_info.proto index eaeac73e1..a5c4ff09e 100644 --- a/platform/proto/replica_info.proto +++ b/platform/proto/replica_info.proto @@ -34,6 +34,9 @@ message ReplicaInfo { message RegionInfo { repeated ReplicaInfo replica_info = 1; int32 region_id = 2; + +// NEW: Learner nodes for read-only routing + repeated ReplicaInfo learner_info = 3; } message ResConfigData{ @@ -58,12 +61,16 @@ message ResConfigData{ optional bool enable_resview = 23; optional bool enable_faulty_switch = 24; -// for hotstuff. + // for hotstuff. optional bool use_chain_hotstuff = 9; optional int32 max_client_complaint_num = 21; optional int32 duplicate_check_frequency_useconds = 22; + + // learner + optional int32 block_size = 33; + repeated ReplicaInfo learner_info = 34; } message ReplicaStates { diff --git a/platform/proto/resdb.proto b/platform/proto/resdb.proto index 47edac383..10b05fb68 100644 --- a/platform/proto/resdb.proto +++ b/platform/proto/resdb.proto @@ -91,6 +91,18 @@ message Request { bytes data_hash = 26; } +message RequestBatch { + repeated Request requests = 1; +} + +message LearnerUpdate { + repeated uint32 data = 1; + bytes block_hash = 2; + uint64 seq = 3; + uint32 sender_id = 4; + uint32 excess_bytes = 5; +} + // The response message containing response message ResponseData { bytes data = 1; @@ -131,6 +143,9 @@ message BatchUserResponse { uint64 local_id = 7; bytes hash = 8; int32 primary_id = 9; + uint64 set_count = 10; + uint64 read_count = 11; + uint64 delete_count = 12; } message HeartBeatInfo{ @@ -206,4 +221,3 @@ 
message QueryResponse { message CustomQueryResponse { bytes resp_str = 1; } - diff --git a/proto/kv/kv.proto b/proto/kv/kv.proto index 750058e75..99893309b 100644 --- a/proto/kv/kv.proto +++ b/proto/kv/kv.proto @@ -34,6 +34,7 @@ message KVRequest { GET_KEY_RANGE = 8; GET_HISTORY = 9; GET_TOP = 10; + GET_READ_ONLY = 11; } CMD cmd = 1; string key = 2; diff --git a/scripts/deploy/config/pbft.config b/scripts/deploy/config/pbft.config index 93e2ba29e..16145bdc4 100644 --- a/scripts/deploy/config/pbft.config +++ b/scripts/deploy/config/pbft.config @@ -1,5 +1,5 @@ { - "clientBatchNum": 100, + "clientBatchNum": 10, "enable_viewchange": true, "recovery_enabled": true, "max_client_complaint_num":10, diff --git a/service/tools/config/interface/service.config b/service/tools/config/interface/service.config index d357ba735..2214ed91a 100644 --- a/service/tools/config/interface/service.config +++ b/service/tools/config/interface/service.config @@ -1,3 +1,2 @@ -5 127.0.0.1 10005 - +5 127.0.0.1 20005 diff --git a/service/tools/config/server.config b/service/tools/config/server.config new file mode 100644 index 000000000..16ace299b --- /dev/null +++ b/service/tools/config/server.config @@ -0,0 +1,5 @@ + +1 127.0.0.1 20001 +2 127.0.0.1 20002 +3 127.0.0.1 20003 +4 127.0.0.1 20004 diff --git a/service/tools/config/server/server.config b/service/tools/config/server/server.config index 740bae3ab..f8e0008e9 100644 --- a/service/tools/config/server/server.config +++ b/service/tools/config/server/server.config @@ -1,38 +1,36 @@ { - region : { - replica_info : { - id:1, - ip:"127.0.0.1", - port: 10001, - }, - replica_info : { - id:2, - ip:"127.0.0.1", - port: 10002, - }, - replica_info : { - id:3, - ip:"127.0.0.1", - port: 10003, - }, - replica_info : { - id:4, - ip:"127.0.0.1", - port: 10004, - }, - region_id: 1, - }, - self_region_id:1, - leveldb_info : { - write_buffer_size_mb:128, - write_batch_size:1, - enable_block_cache: true, - block_cache_capacity: 100 - }, - 
require_txn_validation:true, - enable_viewchange:false, - enable_resview:true, - enable_faulty_switch:false -} - - + "region": [ + { + "replicaInfo": [ + { + "id": "1", + "ip": "127.0.0.1", + "port": 20001 + }, + { + "id": "2", + "ip": "127.0.0.1", + "port": 20002 + }, + { + "id": "3", + "ip": "127.0.0.1", + "port": 20003 + }, + { + "id": "4", + "ip": "127.0.0.1", + "port": 20004 + } + ] + } + ], + "learnerInfo": [ + { + "id": 0, + "ip": "127.0.0.1", + "port": 25000 + } + ], + "block_size": 10 +} \ No newline at end of file diff --git a/service/tools/contract/api_tools/example_contract/token.json b/service/tools/contract/api_tools/example_contract/token.json index eaeaa36ff..ebac04a78 100644 --- a/service/tools/contract/api_tools/example_contract/token.json +++ b/service/tools/contract/api_tools/example_contract/token.json @@ -1,3 +1,4 @@ + { "contracts": { "token.sol:Token": { diff --git a/service/tools/data/cert/cert_1.cert b/service/tools/data/cert/cert_1.cert index cb2ffd016..a0a6ede63 100644 Binary files a/service/tools/data/cert/cert_1.cert and b/service/tools/data/cert/cert_1.cert differ diff --git a/service/tools/data/cert/cert_2.cert b/service/tools/data/cert/cert_2.cert index 81b874353..d69a4a492 100644 --- a/service/tools/data/cert/cert_2.cert +++ b/service/tools/data/cert/cert_2.cert @@ -1,7 +1,7 @@ $ - u>��,����Nb+R�m����.�dQ|;V;݅ | -6 + u>��,����Nb+R�m����.�dQ|;V;݅ } +7 $ - 󗑉���Ej�K' =��zp�@r����E�����" 127.0.0.1(�NB@,'�GJ�|�R;�'��m@�:�#9�ĝ�'[� J��Z}_��,��Iʟ9'n)%� -�n|G  \ No newline at end of file + 󗑉���Ej�K' =��zp�@r����E�����" 127.0.0.1(��B@Ol� +Ɖ瀥5��&eJ�X�B��O.kP��'y�ᆰy�:�k��I}m���=�.��5 �$�  \ No newline at end of file diff --git a/service/tools/data/cert/cert_3.cert b/service/tools/data/cert/cert_3.cert index f7f811218..58899e575 100644 --- a/service/tools/data/cert/cert_3.cert +++ b/service/tools/data/cert/cert_3.cert @@ -1,7 +1,7 @@ $ - u>��,����Nb+R�m����.�dQ|;V;݅ | -6 + u>��,����Nb+R�m����.�dQ|;V;݅ } +7 $ |�a9� 
#��Y����,����Nb+R�m����.�dQ|;V;݅ | -6 + u>��,����Nb+R�m����.�dQ|;V;݅ } +7 $ - { aLx2٢�<�jQf+S�b��֐�±��2" 127.0.0.1(�NB@N�뀒�U�U����zi�W�}2Ub����X����1&�I��5�>=��y��{�֐6]g�w��  \ No newline at end of file + { aLx2٢�<�jQf+S�b��֐�±��2" 127.0.0.1(��B@Y�)X��>�~l(}�Z������{���yqa���YX^���0$N'C"Tz��}��\-��  \ No newline at end of file diff --git a/service/tools/data/cert/cert_5.cert b/service/tools/data/cert/cert_5.cert index b04bf7a91..a1ebad5af 100644 --- a/service/tools/data/cert/cert_5.cert +++ b/service/tools/data/cert/cert_5.cert @@ -1,8 +1,7 @@ $ - u>��,����Nb+R�m����.�dQ|;V;݅ ~ -8 + u>��,����Nb+R�m����.�dQ|;V;݅  +9 $ �H��Y -1����hܘ�hT�]%%꡽ey��V" 127.0.0.1(�NB@P���$4�����L v�t��`�#�+g��X�7i -��L]൮! ���]k!d{q�����P�  \ No newline at end of file +1����hܘ�hT�]%%꡽ey��V" 127.0.0.1(��B@! VH`� NTô0 ��1�o�N9��PT�j�k��|���U�U���t��@&��n�}*�W�  \ No newline at end of file diff --git a/service/tools/kv/server_tools/generate_config.sh b/service/tools/kv/server_tools/generate_config.sh index 5f4944345..1727c382e 100755 --- a/service/tools/kv/server_tools/generate_config.sh +++ b/service/tools/kv/server_tools/generate_config.sh @@ -30,5 +30,32 @@ CERT_PATH=$PWD/service/tools/data/cert/ CONFIG_PATH=$PWD/service/tools/config/ PORT_BASE=20000 CLIENT_NUM=1 +export LEARNER_CONFIG_PATH=$PWD/platform/learner/learner.config ./service/tools/config/generate_config.sh ${WORKSPACE} ${CERT_PATH} ${CERT_PATH} ${CONFIG_PATH} ${CERT_PATH} ${CLIENT_NUM} ${PORT_BASE} ${iplist[@]} + +python3 - "$CONFIG_PATH/server/server.config" <<'PY' +import json +import os +import sys + +cfg_path = sys.argv[1] +with open(cfg_path) as f: + cfg = json.load(f) + +learner_config_path = os.environ.get("LEARNER_CONFIG_PATH") +if learner_config_path and os.path.exists(learner_config_path): + with open(learner_config_path) as lf: + learner_cfg = json.load(lf) + learner_info = { + "id": int(learner_cfg.get("node_id", 0)), + "ip": learner_cfg.get("ip", "127.0.0.1"), + "port": int(learner_cfg.get("port", 0)), + } + 
cfg["learnerInfo"] = [learner_info] + if "block_size" in learner_cfg: + cfg["block_size"] = int(learner_cfg["block_size"]) + +with open(cfg_path, "w") as f: + json.dump(cfg, f, indent=2) +PY diff --git a/service/tools/kv/server_tools/start_kv_service.sh b/service/tools/kv/server_tools/start_kv_service.sh index e02731336..c0e1759d7 100755 --- a/service/tools/kv/server_tools/start_kv_service.sh +++ b/service/tools/kv/server_tools/start_kv_service.sh @@ -24,9 +24,8 @@ WORK_PATH=$PWD CERT_PATH=${WORK_PATH}/service/tools/data/cert/ bazel build //service/kv:kv_service $@ -nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node1.key.pri $CERT_PATH/cert_1.cert > server0.log & -nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node2.key.pri $CERT_PATH/cert_2.cert > server1.log & -nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node3.key.pri $CERT_PATH/cert_3.cert > server2.log & -nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node4.key.pri $CERT_PATH/cert_4.cert > server3.log & - -nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node5.key.pri $CERT_PATH/cert_5.cert > client.log & +nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node1.key.pri $CERT_PATH/cert_1.cert > logs/server0.log & +nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node2.key.pri $CERT_PATH/cert_2.cert > logs/server1.log & +nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node3.key.pri $CERT_PATH/cert_3.cert > logs/server2.log & +nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node4.key.pri $CERT_PATH/cert_4.cert > logs/server3.log & +nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node5.key.pri $CERT_PATH/cert_5.cert > logs/client.log & diff --git a/service/tools/kv/server_tools/start_kv_service_learner.sh b/service/tools/kv/server_tools/start_kv_service_learner.sh new file mode 100755 index 000000000..e390e9a0f --- /dev/null +++ b/service/tools/kv/server_tools/start_kv_service_learner.sh @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +killall -9 kv_service +killall -9 learner + +SERVER_PATH=./bazel-bin/service/kv/kv_service +SERVER_CONFIG=service/tools/config/server/server.config +WORK_PATH=$PWD +CERT_PATH=${WORK_PATH}/service/tools/data/cert/ +LEARNER_PATH=./bazel-bin/platform/learner/learner +LEARNER_CONFIG=$PWD/platform/learner/learner.config + +# start both builds +bazel build //platform/learner:learner $@ +bazel build //service/kv:kv_service $@ + +nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node1.key.pri $CERT_PATH/cert_1.cert > logs/server0.log & +nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node2.key.pri $CERT_PATH/cert_2.cert > logs/server1.log & +nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node3.key.pri $CERT_PATH/cert_3.cert > logs/server2.log & +nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node4.key.pri $CERT_PATH/cert_4.cert > logs/server3.log & +nohup $SERVER_PATH $SERVER_CONFIG $CERT_PATH/node5.key.pri $CERT_PATH/cert_5.cert > logs/client.log & + +# learner +nohup $LEARNER_PATH -config $LEARNER_CONFIG > logs/learner.log & diff --git a/service/tools/utxo/wallet_tool/cpp/client_config.config b/service/tools/utxo/wallet_tool/cpp/client_config.config index e1bcc9054..c7b0cd314 100644 --- a/service/tools/utxo/wallet_tool/cpp/client_config.config +++ 
b/service/tools/utxo/wallet_tool/cpp/client_config.config @@ -1 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + 5 127.0.0.1 10005 diff --git a/service/tools/utxo/wallet_tool/cpp/server_config0.config b/service/tools/utxo/wallet_tool/cpp/server_config0.config index ee48298e4..17308dfc6 100644 --- a/service/tools/utxo/wallet_tool/cpp/server_config0.config +++ b/service/tools/utxo/wallet_tool/cpp/server_config0.config @@ -1 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + 1 127.0.0.1 10001 diff --git a/service/utxo/config/server_config.config b/service/utxo/config/server_config.config index ec284753a..3ab099068 100644 --- a/service/utxo/config/server_config.config +++ b/service/utxo/config/server_config.config @@ -1,3 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + { region : { replica_info : { diff --git a/service/utxo/config/utxo_config.config b/service/utxo/config/utxo_config.config index 237ad2822..31a9e7656 100644 --- a/service/utxo/config/utxo_config.config +++ b/service/utxo/config/utxo_config.config @@ -1,3 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# { genesis_transactions: {