Skip to content

Commit db2da69

Browse files
committed
graph, node, store: Add progress reporting for dump and restore
Add DumpReporter and RestoreReporter callback traits following the established PruneReporter pattern. The store layer calls trait methods at key moments (table start/finish, batch completion, schema creation, finalization) while the CLI provides concrete implementations using indicatif spinners that show per-table progress with row counts and elapsed time.
1 parent c768b58 commit db2da69

File tree

11 files changed

+440
-48
lines changed

11 files changed

+440
-48
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ chrono = "0.4.43"
4646
bs58 = "0.5.1"
4747
clap = { version = "4.5.4", features = ["derive", "env", "wrap_help"] }
4848
clap_complete = "4"
49+
console = "0.16"
4950
derive_more = { version = "2.1.1", default-features = false }
5051
diesel = { version = "2.2.7", features = [
5152
"postgres",
@@ -70,6 +71,7 @@ graphman-server = { path = "./server/graphman" }
7071
graphman = { path = "./core/graphman" }
7172
graphman-store = { path = "./core/graphman_store" }
7273
graphql-tools = "0.5.1"
74+
indicatif = "0.18"
7375
Inflector = "0.11.3"
7476
itertools = "0.14.0"
7577
lazy_static = "1.5.0"

gnd/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,9 @@ reqwest = { workspace = true }
5757
thiserror = { workspace = true }
5858

5959
# Console output
60-
indicatif = "0.18"
61-
console = "0.16"
6260
similar = "2"
61+
indicatif = { workspace = true }
62+
console = { workspace = true }
6363

6464
# Code generation
6565
graphql-tools = { workspace = true }

graph/src/components/store/mod.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -978,6 +978,69 @@ pub trait PruneReporter: Send + 'static {
978978
fn finish(&mut self) {}
979979
}
980980

981+
/// Callbacks for `SubgraphStore.dump` so that callers can report progress
982+
/// of the dump procedure to users
983+
#[allow(unused_variables)]
984+
pub trait DumpReporter: Send + 'static {
985+
/// Called at the start with deployment hash and total entity table count.
986+
fn start(&mut self, deployment: &str, table_count: usize) {}
987+
988+
/// Called before dumping an entity table. `rows_approx` is estimated
989+
/// from the vid range (max_vid - min_vid + 1), 0 if empty.
990+
fn start_table(&mut self, table: &str, rows_approx: usize) {}
991+
992+
/// Called after a batch of rows has been written to Parquet.
993+
fn batch_dumped(&mut self, table: &str, rows: usize) {}
994+
995+
/// Called after an entity table has been fully dumped.
996+
fn finish_table(&mut self, table: &str, rows: usize) {}
997+
998+
/// Called before dumping data_sources$.
999+
fn start_data_sources(&mut self) {}
1000+
1001+
/// Called after data_sources$ has been dumped.
1002+
fn finish_data_sources(&mut self, rows: usize) {}
1003+
1004+
/// Called when the entire dump has completed.
1005+
fn finish(&mut self) {}
1006+
}
1007+
1008+
/// Callbacks for `SubgraphStore.restore` so that callers can report
1009+
/// progress of the restore procedure to users
1010+
#[allow(unused_variables)]
1011+
pub trait RestoreReporter: Send + 'static {
1012+
/// Called at the start with deployment hash and total entity table count.
1013+
fn start(&mut self, deployment: &str, table_count: usize) {}
1014+
1015+
/// Called when creating the schema.
1016+
fn start_create_schema(&mut self) {}
1017+
fn finish_create_schema(&mut self) {}
1018+
1019+
/// Called before importing an entity table. `total_rows` is the sum
1020+
/// of row counts across all chunks for this table.
1021+
fn start_table(&mut self, table: &str, total_rows: usize) {}
1022+
1023+
/// Called after a batch of rows has been inserted.
1024+
fn batch_imported(&mut self, table: &str, rows: usize) {}
1025+
1026+
/// Called when a table is skipped because it was already restored.
1027+
fn skip_table(&mut self, table: &str) {}
1028+
1029+
/// Called after an entity table has been fully imported.
1030+
fn finish_table(&mut self, table: &str, rows: usize) {}
1031+
1032+
/// Called before/after importing data_sources$.
1033+
fn start_data_sources(&mut self, total_rows: usize) {}
1034+
fn finish_data_sources(&mut self, rows: usize) {}
1035+
1036+
/// Called during finalization (sequence resets, head block set).
1037+
fn start_finalize(&mut self) {}
1038+
fn finish_finalize(&mut self) {}
1039+
1040+
/// Called when the entire restore has completed.
1041+
fn finish(&mut self) {}
1042+
}
1043+
9811044
/// Select how pruning should be done
9821045
#[derive(Clone, Copy, Debug, Display, PartialEq)]
9831046
pub enum PruningStrategy {

node/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ diesel = { workspace = true }
4040
diesel-async = { workspace = true }
4141
prometheus = { version = "0.14.0", features = ["push"] }
4242
json-structural-diff = { version = "0.2", features = ["colorize"] }
43+
indicatif = { workspace = true }
44+
console = { workspace = true }
4345

4446
# Dependencies related to Amp subgraphs
4547
tokio-util.workspace = true

node/src/manager/commands/dump.rs

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,117 @@
11
use std::fs;
22
use std::sync::Arc;
3+
use std::time::Instant;
34

5+
use console::style;
6+
use graph::components::store::DumpReporter;
47
use graph::prelude::anyhow::Result;
8+
use indicatif::ProgressBar;
59

610
use graph_store_postgres::{ConnectionPool, SubgraphStore};
711

812
use crate::manager::deployment::DeploymentSearch;
913

14+
struct DumpProgress {
15+
spinner: ProgressBar,
16+
start: Instant,
17+
table_start: Instant,
18+
table_rows: usize,
19+
}
20+
21+
impl DumpProgress {
22+
fn new() -> Self {
23+
let spinner = ProgressBar::new_spinner();
24+
spinner.enable_steady_tick(std::time::Duration::from_millis(80));
25+
Self {
26+
spinner,
27+
start: Instant::now(),
28+
table_start: Instant::now(),
29+
table_rows: 0,
30+
}
31+
}
32+
}
33+
34+
impl DumpReporter for DumpProgress {
35+
fn start(&mut self, deployment: &str, table_count: usize) {
36+
self.start = Instant::now();
37+
self.spinner.set_message(format!(
38+
"Dumping deployment {} ({} tables)",
39+
style(deployment).cyan(),
40+
table_count,
41+
));
42+
}
43+
44+
fn start_table(&mut self, table: &str, rows_approx: usize) {
45+
self.table_start = Instant::now();
46+
self.table_rows = 0;
47+
self.spinner
48+
.set_message(format!("{:<32} ~{} rows", table, format_count(rows_approx),));
49+
}
50+
51+
fn batch_dumped(&mut self, table: &str, rows: usize) {
52+
self.table_rows += rows;
53+
self.spinner.set_message(format!(
54+
"{:<32} {} rows",
55+
table,
56+
format_count(self.table_rows),
57+
));
58+
}
59+
60+
fn finish_table(&mut self, table: &str, rows: usize) {
61+
let elapsed = self.table_start.elapsed().as_secs();
62+
let line = format!(
63+
" {} {:<32} {:>10} rows {:>4}s",
64+
style("\u{2714}").green(),
65+
table,
66+
format_count(rows),
67+
elapsed,
68+
);
69+
self.spinner.suspend(|| println!("{line}"));
70+
}
71+
72+
fn start_data_sources(&mut self) {
73+
self.table_start = Instant::now();
74+
self.table_rows = 0;
75+
self.spinner.set_message("data_sources$".to_string());
76+
}
77+
78+
fn finish_data_sources(&mut self, rows: usize) {
79+
let elapsed = self.table_start.elapsed().as_secs();
80+
let line = format!(
81+
" {} {:<32} {:>10} rows {:>4}s",
82+
style("\u{2714}").green(),
83+
"data_sources$",
84+
format_count(rows),
85+
elapsed,
86+
);
87+
self.spinner.suspend(|| println!("{line}"));
88+
}
89+
90+
fn finish(&mut self) {
91+
let elapsed = self.start.elapsed().as_secs();
92+
self.spinner.finish_with_message(format!(
93+
"{} Dump complete ({}s)",
94+
style("\u{2714}").green(),
95+
elapsed
96+
));
97+
}
98+
}
99+
100+
fn format_count(n: usize) -> String {
101+
if n == 0 {
102+
return "0".to_string();
103+
}
104+
let s = n.to_string();
105+
let mut result = String::new();
106+
for (i, c) in s.chars().rev().enumerate() {
107+
if i > 0 && i % 3 == 0 {
108+
result.push(',');
109+
}
110+
result.push(c);
111+
}
112+
result.chars().rev().collect()
113+
}
114+
10115
pub async fn run(
11116
subgraph_store: Arc<SubgraphStore>,
12117
primary_pool: ConnectionPool,
@@ -18,6 +123,7 @@ pub async fn run(
18123

19124
let loc = deployment.locate_unique(&primary_pool).await?;
20125

21-
subgraph_store.dump(&loc, directory).await?;
126+
let reporter = Box::new(DumpProgress::new());
127+
subgraph_store.dump(&loc, directory, reporter).await?;
22128
Ok(())
23129
}

0 commit comments

Comments
 (0)