From ad340607c8a7a7e513a25079d39cb2531a14bdd7 Mon Sep 17 00:00:00 2001 From: affsantos Date: Wed, 18 Mar 2026 11:21:12 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20focused=20profiling=20=E2=80=94=20only?= =?UTF-8?q?=20profile=20changed=20columns?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For wide models (100+ columns), data-diff now profiles only the columns that actually changed, using sqlglot AST analysis. This reduces BQ compute by ~90% for typical changes to wide models. How it works: - parse_columns.py identifies added_columns + expression_changes - is_cte_change_additive() classifies CTE modifications: - Additive (new LEFT JOINs, new columns) → safe for focused profiling - Structural (WHERE/filter/JOIN changes) → falls back to full profiling - EXCEPT DISTINCT row comparison always covers ALL columns as safety net - --full flag to override and profile everything Also syncs upstream improvements: - data-diff.sh: get_affected_columns() helper, column filter in build_profile_query(), profiling mode tracking per model - template.html: focused profiling banner, skip-row styling for non-profiled columns (shown as — instead of misleading zeros) - SKILL.md: document --full flag and focused profiling behavior --- README.md | 9 ++ SKILL.md | 29 +++++- data-diff.sh | 248 +++++++++++++++++++++++++++++++++++++++++------ parse_columns.py | 198 +++++++++++++++++++++++++++++++------ template.html | 95 +++++++++++++++--- 5 files changed, 506 insertions(+), 73 deletions(-) diff --git a/README.md b/README.md index 52362a6..b3c9654 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ Visual data diff for dbt + BigQuery. Compare production vs development data afte ```bash dbt build --select my_model # 1. Build your changes ./data-diff.sh my_model # 2. See what changed → opens HTML report +./data-diff.sh --full my_model # 3. 
(optional) Profile ALL columns ``` Zero configuration — reads your GCP project, dbt project name, and schemas from the manifest automatically. @@ -53,6 +54,14 @@ Compares your dev tables against production across three layers: The HTML report includes summary with risk indicators, per-model cards with column profiles (prod vs dev side-by-side), code diffs, and sample rows. +### Focused Profiling + +By default, only **changed columns** are profiled — using sqlglot AST analysis to detect which columns were added or had their expressions modified. For wide models (100+ columns), this is dramatically faster. + +CTE modifications are classified as **additive** (new LEFT JOINs, new columns — safe for focused profiling) or **structural** (WHERE/filter/JOIN changes — falls back to full profiling automatically). The `EXCEPT DISTINCT` row comparison always covers all columns regardless. + +Use `--full` to force profiling of every column when needed. + ## AI Agent Integration Includes a `SKILL.md` for [pi](https://github.com/mariozechner/pi-coding-agent) and Claude Code. The agent suggests running data-diff after validation passes and summarises findings in chat. diff --git a/SKILL.md b/SKILL.md index c970a56..1e9976f 100644 --- a/SKILL.md +++ b/SKILL.md @@ -48,11 +48,33 @@ Suggest running it when: # Multiple models .agents/skills/data-diff/data-diff.sh "int_order_pricing int_product_inventory" + +# Full profiling (all columns — slower but complete) +.agents/skills/data-diff/data-diff.sh --full int_order_pricing ``` The script prints progress to stderr and the output HTML file path to stdout. It automatically opens the page in the browser on macOS. +### Focused profiling (default) + +By default, data-diff only profiles **columns that changed** (added or +expression-modified) using sqlglot analysis. This is dramatically faster +for wide models (e.g. a 100+ column model profiling only the ~11 that +changed). 
+ +The EXCEPT DISTINCT row comparison still covers **all columns**, so data +changes in non-profiled columns are still caught in the sample rows +section. + +Fallback to full profiling happens automatically when: +- sqlglot analysis fails or is unavailable +- A CTE was removed or structurally modified (WHERE / GROUP BY / HAVING / JOIN changes), or no changed output columns could be identified +- The model is new (no production counterpart) + +Use `--full` to force profiling of all columns when you need complete +statistical comparison (e.g. investigating indirect CTE changes). + ### What happens under the hood | Step | What | Cost | @@ -61,11 +83,12 @@ stdout. It automatically opens the page in the browser on macOS. | 2. Code diff | sqlglot AST parse (local) | Free | | 3. Schema diff | `INFORMATION_SCHEMA` query | 1 fast query / model | | 4. Extract primary keys | Manifest parsing (local) | Free | -| 5. Profile columns | BQ profiling query | 1 query / model / env | -| 6. Sample rows | `EXCEPT DISTINCT` query | 1 query / model | +| 5. Profile columns | BQ profiling query (focused: only changed cols) | 1 query / model / env | +| 6. Sample rows | `EXCEPT DISTINCT` query (all columns) | 1 query / model | | 7–8. Assemble + render | JSON → HTML injection | Free | -**Performance**: ~30–60 seconds for up to 5 models. +**Performance**: ~30–60 seconds for up to 5 models. Focused profiling +significantly speeds up wide models (100+ columns → only changed columns). 
## Interpreting the Output diff --git a/data-diff.sh b/data-diff.sh index 09f8401..192e69c 100755 --- a/data-diff.sh +++ b/data-diff.sh @@ -25,15 +25,26 @@ DIAGRAMS_DIR="${REPO_ROOT}/.data-diff" NUMERIC_TYPES="INT64|FLOAT64|NUMERIC|BIGNUMERIC|DECIMAL|BIGDECIMAL" # ── Parse arguments ───────────────────────────────────────────── +PROFILE_MODE="focused" # default: only profile affected columns + if [[ $# -eq 0 ]]; then - echo "Usage: data-diff.sh " >&2 + echo "Usage: data-diff.sh [--full] " >&2 + echo "" >&2 + echo "Options:" >&2 + echo " --full Profile ALL columns (default: only changed columns)" >&2 echo "" >&2 echo "Examples:" >&2 echo " data-diff.sh int_order_pricing" >&2 echo " data-diff.sh \"int_order_pricing int_product_inventory\"" >&2 + echo " data-diff.sh --full int_order_pricing" >&2 exit 1 fi +if [[ "$1" == "--full" ]]; then + PROFILE_MODE="full" + shift +fi + MODELS="$*" # ── Preflight checks ─────────────────────────────────────────── @@ -195,6 +206,84 @@ else echo '{}' > "$TMPDIR/code_changes.json" fi +# ── Helper: extract affected columns from code changes ────────── +# Returns a newline-separated list of column names to profile, +# or the literal string "ALL" if full profiling is needed. +get_affected_columns() { + local model_base="$1" + + # --full flag overrides focused profiling + if [[ "$PROFILE_MODE" == "full" ]]; then + echo "ALL" + return + fi + + local code_file="$TMPDIR/code_changes.json" + if [[ ! 
-f "$code_file" || "$(cat "$code_file")" == "{}" ]]; then + echo "ALL" + return + fi + + # Find this model in the code_changes array + local model_data + model_data=$(jq -r --arg m "$model_base" \ + '.[] | select(.model == $m) // empty' "$code_file" 2>/dev/null) + + if [[ -z "$model_data" ]]; then + echo "ALL" + return + fi + + # New models always get full profiling + local is_new + is_new=$(echo "$model_data" | jq -r '.is_new // false') + if [[ "$is_new" == "true" ]]; then + echo "ALL" + return + fi + + # No code_changes (parse failed) → full profiling + local code_changes + code_changes=$(echo "$model_data" | jq '.code_changes // empty') + if [[ -z "$code_changes" || "$code_changes" == "null" ]]; then + echo "ALL" + return + fi + + # Get affected columns (added + expression_changes) + local affected + affected=$(echo "$code_changes" | jq -r ' + .affected_columns // [] | .[] + ' 2>/dev/null) + + local has_indirect + has_indirect=$(echo "$code_changes" | jq -r '.has_indirect_changes // false') + + # If CTEs were structurally modified (not just additive changes) + # and we can't determine which columns are affected → full profiling. 
+ if [[ "$has_indirect" == "true" ]]; then + if [[ -z "$affected" ]]; then + # CTE-only changes, no direct column changes → must profile all + echo "ALL" + return + fi + # CTE structural changes + direct column changes → full profiling + # for safety (the CTE change could affect existing columns too) + log " $model_base: structural CTE changes → full profiling for safety" + echo "ALL" + return + fi + + # If nothing changed at all (no columns, no CTEs) → profile all + # (defensive — shouldn't happen for a modified model) + if [[ -z "$affected" ]]; then + echo "ALL" + return + fi + + echo "$affected" +} + # ═══════════════════════════════════════════════════════════════════ # STEP 3: Schema diff via INFORMATION_SCHEMA # ═══════════════════════════════════════════════════════════════════ @@ -289,15 +378,31 @@ step "Step 5: Profile columns" build_profile_query() { local cols_json="$1" local table="$2" + local filter_file="${3:-}" # optional: file with column names to profile (one per line) # Use python to build the query dynamically from column metadata "$PYTHON" -c " import json, sys + cols = json.load(open('$cols_json')) NUMERIC = {'INT64','FLOAT64','NUMERIC','BIGNUMERIC','DECIMAL','BIGDECIMAL'} + +# Load column filter if provided +filter_set = None +filter_path = '$filter_file' +if filter_path: + try: + with open(filter_path) as f: + filter_set = {line.strip() for line in f if line.strip()} + except FileNotFoundError: + pass + parts = ['COUNT(*) AS _row_count'] for c in cols: name = c['column_name'] + # Skip columns not in filter (if a filter is active) + if filter_set is not None and name not in filter_set: + continue safe = name.replace('-','_') parts.append(f'COUNT(DISTINCT \`{name}\`) AS \`{safe}__distinct\`') parts.append(f'COUNTIF(\`{name}\` IS NULL) AS \`{safe}__nulls\`') @@ -305,6 +410,7 @@ for c in cols: parts.append(f'MIN(\`{name}\`) AS \`{safe}__min\`') parts.append(f'MAX(\`{name}\`) AS \`{safe}__max\`') parts.append(f'ROUND(CAST(AVG(\`{name}\`) AS 
FLOAT64), 4) AS \`{safe}__mean\`') + q = 'SELECT\n ' + ',\n '.join(parts) + f'\nFROM $table' print(q) " @@ -322,13 +428,44 @@ for model_base in $MODEL_LIST; do log "$model_base: ⚠️ no columns found — skipping profile" echo '[]' > "$TMPDIR/models/${model_base}_profile_dev.json" echo '[]' > "$TMPDIR/models/${model_base}_profile_prod.json" + echo '"ALL"' > "$TMPDIR/models/${model_base}_profiled_cols.json" continue fi + # Determine which columns to profile (focused vs full) + affected_cols=$(get_affected_columns "$model_base") + filter_file="" + + if [[ "$affected_cols" == "ALL" ]]; then + log "$model_base: profiling ALL $col_count columns" + echo '"ALL"' > "$TMPDIR/models/${model_base}_profiled_cols.json" + else + # Write affected columns to a filter file + filter_file="$TMPDIR/models/${model_base}_col_filter.txt" + echo "$affected_cols" > "$filter_file" + + # Also add columns from schema diff (new columns in schema that + # sqlglot might not have caught — e.g. from Jinja) + if [[ -f "$TMPDIR/models/${model_base}_schema_diff.json" ]]; then + jq -r '.[] | select(.change_type == "added") | .column' \ + "$TMPDIR/models/${model_base}_schema_diff.json" 2>/dev/null \ + >> "$filter_file" || true + fi + + # Deduplicate + sort -u "$filter_file" -o "$filter_file" + affected_count=$(wc -l < "$filter_file" | tr -d ' ') + log "$model_base: focused profiling — $affected_count of $col_count columns" + + # Save the list for HTML rendering + jq -R -s 'split("\n") | map(select(length > 0))' "$filter_file" \ + > "$TMPDIR/models/${model_base}_profiled_cols.json" + fi + # Profile dev dev_table="\`${GCP_PROJECT}\`.\`${DEV_SCHEMA}\`.\`${model_base}\`" - dev_query=$(build_profile_query "$dev_cols_file" "$dev_table") - log "$model_base: profiling dev ($col_count columns)..." + dev_query=$(build_profile_query "$dev_cols_file" "$dev_table" "$filter_file") + log "$model_base: profiling dev..." 
bq query --use_legacy_sql=false --format=json "$dev_query" 2>/dev/null \ > "$TMPDIR/models/${model_base}_profile_dev.json" \ || echo '[{}]' > "$TMPDIR/models/${model_base}_profile_dev.json" @@ -340,8 +477,8 @@ for model_base in $MODEL_LIST; do if [[ "$prod_col_count" -gt 0 ]]; then prod_table="\`${GCP_PROJECT}\`.\`${prod_schema}\`.\`${model_base}\`" - prod_query=$(build_profile_query "$prod_cols_file" "$prod_table") - log "$model_base: profiling prod ($prod_col_count columns)..." + prod_query=$(build_profile_query "$prod_cols_file" "$prod_table" "$filter_file") + log "$model_base: profiling prod..." bq query --use_legacy_sql=false --format=json "$prod_query" 2>/dev/null \ > "$TMPDIR/models/${model_base}_profile_prod.json" \ || echo '[{}]' > "$TMPDIR/models/${model_base}_profile_prod.json" @@ -375,6 +512,7 @@ for model_base in $MODEL_LIST; do || echo '[]' > "$TMPDIR/models/${model_base}_rows_added.json" echo '[]' > "$TMPDIR/models/${model_base}_rows_removed.json" echo '[]' > "$TMPDIR/models/${model_base}_rows_modified_raw.json" + echo '[]' > "$TMPDIR/models/${model_base}_rows_new_columns.json" elif [[ -n "$prod_schema" ]]; then prod_table="\`${GCP_PROJECT}\`.\`${prod_schema}\`.\`${model_base}\`" @@ -486,10 +624,35 @@ for model_base in $MODEL_LIST; do fi fi # end common_cols check + + # Sample rows for newly added columns (PK + new columns only) + added_cols=$(jq -r ' + [.[] | select(.change_type == "added") | .column] | + if length > 0 then map("`" + . + "`") | join(", ") else empty end + ' "$TMPDIR/models/${model_base}_schema_diff.json" 2>/dev/null || true) + + if [[ -n "$added_cols" ]]; then + pk_select_cols=$(cat "$TMPDIR/models/${model_base}_pk.json" | jq -r ' + if length > 0 then map("`" + . + "`") | join(", ") else empty end + ') + if [[ -n "$pk_select_cols" ]]; then + new_col_select="${pk_select_cols}, ${added_cols}" + else + new_col_select="${added_cols}" + fi + log "$model_base: sampling new column data..." 
+ bq query --use_legacy_sql=false --format=json --max_rows=10 \ + "SELECT ${new_col_select} FROM ${dev_table} LIMIT 10" 2>/dev/null \ + > "$TMPDIR/models/${model_base}_rows_new_columns.json" \ + || echo '[]' > "$TMPDIR/models/${model_base}_rows_new_columns.json" + else + echo '[]' > "$TMPDIR/models/${model_base}_rows_new_columns.json" + fi else echo '[]' > "$TMPDIR/models/${model_base}_rows_added.json" echo '[]' > "$TMPDIR/models/${model_base}_rows_removed.json" echo '[]' > "$TMPDIR/models/${model_base}_rows_modified_raw.json" + echo '[]' > "$TMPDIR/models/${model_base}_rows_new_columns.json" fi done @@ -541,8 +704,9 @@ model_names = [f.replace(".json","") for f in os.listdir(models_dir) "_dev_cols.json", "_prod_cols.json", "_schema_diff.json", "_pk.json", "_profile_dev.json", "_profile_prod.json", + "_profiled_cols.json", "_col_filter.txt", "_rows_added.json", "_rows_removed.json", - "_rows_modified_raw.json" + "_rows_modified_raw.json", "_rows_new_columns.json" ] )] @@ -569,12 +733,25 @@ for model_name in sorted(model_names): rows_added_raw = load("_rows_added.json") rows_removed_raw = load("_rows_removed.json") rows_modified_raw = load("_rows_modified_raw.json") + rows_new_columns_raw = load("_rows_new_columns.json") + + # Load profiled columns list ("ALL" or a list of column names) + profiled_cols_raw = load("_profiled_cols.json", default='"ALL"') + if isinstance(profiled_cols_raw, str): + # JSON string "ALL" gets loaded as the string ALL + profiled_cols_set = None # None means all columns were profiled + elif isinstance(profiled_cols_raw, list): + profiled_cols_set = set(profiled_cols_raw) + else: + profiled_cols_set = None + + is_focused = profiled_cols_set is not None # Parse profiles — bq returns a list with one row profile_dev = profile_dev_raw[0] if profile_dev_raw else {} profile_prod = profile_prod_raw[0] if profile_prod_raw else {} - # Row counts + # Row counts (always available — COUNT(*) is in every profile query) dev_row_count = 
int(profile_dev.get("_row_count", 0)) prod_row_count = int(profile_prod.get("_row_count", 0)) if not is_new else None @@ -607,13 +784,8 @@ for model_name in sorted(model_names): is_pk = cname in pk is_schema_add = cname in schema_additions - # Extract dev profile values - dev_distinct = int(profile_dev.get(f"{safe}__distinct", 0)) - dev_nulls = int(profile_dev.get(f"{safe}__nulls", 0)) - dev_null_pct = round(dev_nulls / dev_row_count * 100, 2) if dev_row_count else 0 - dev_min = profile_dev.get(f"{safe}__min") - dev_max = profile_dev.get(f"{safe}__max") - dev_mean = profile_dev.get(f"{safe}__mean") + # Was this column profiled? (focused mode skips unchanged columns) + was_profiled = profiled_cols_set is None or cname in profiled_cols_set # Convert numeric strings def to_num(v): @@ -621,18 +793,29 @@ for model_name in sorted(model_names): try: return float(v) except: return v - dev_profile = { - "distinct_count": dev_distinct, - "null_count": dev_nulls, - "null_pct": dev_null_pct, - "min": to_num(dev_min), - "max": to_num(dev_max), - "mean": to_num(dev_mean) - } + if was_profiled: + # Extract dev profile values + dev_distinct = int(profile_dev.get(f"{safe}__distinct", 0)) + dev_nulls = int(profile_dev.get(f"{safe}__nulls", 0)) + dev_null_pct = round(dev_nulls / dev_row_count * 100, 2) if dev_row_count else 0 + dev_min = profile_dev.get(f"{safe}__min") + dev_max = profile_dev.get(f"{safe}__max") + dev_mean = profile_dev.get(f"{safe}__mean") + + dev_profile = { + "distinct_count": dev_distinct, + "null_count": dev_nulls, + "null_pct": dev_null_pct, + "min": to_num(dev_min), + "max": to_num(dev_max), + "mean": to_num(dev_mean) + } + else: + dev_profile = None # Not profiled — rendered as "—" in HTML # Extract prod profile values (if not new and column exists in prod) prod_profile = None - if not is_new and not is_schema_add and cname in prod_col_map: + if not is_new and not is_schema_add and cname in prod_col_map and was_profiled: prod_distinct = 
int(profile_prod.get(f"{safe}__distinct", 0)) prod_nulls = int(profile_prod.get(f"{safe}__nulls", 0)) prod_null_pct = round(prod_nulls / prod_row_count * 100, 2) if prod_row_count else 0 @@ -651,7 +834,7 @@ for model_name in sorted(model_names): # Determine if data changed is_changed = False - if prod_profile and not is_schema_add: + if prod_profile and dev_profile and not is_schema_add: if (prod_profile["distinct_count"] != dev_profile["distinct_count"] or prod_profile["null_pct"] != dev_profile["null_pct"] or prod_profile["min"] != dev_profile["min"] or @@ -663,7 +846,7 @@ for model_name in sorted(model_names): is_changed = True # Check for null% spikes (risk indicator) - if prod_profile and not is_schema_add: + if prod_profile and dev_profile and not is_schema_add: null_delta = dev_profile["null_pct"] - prod_profile["null_pct"] if abs(null_delta) > 5: risk_indicators.append({ @@ -677,6 +860,7 @@ for model_name in sorted(model_names): "data_type": ctype, "is_primary_key": is_pk, "is_changed": is_changed, + "is_profiled": was_profiled, "prod": prod_profile, "dev": dev_profile } @@ -737,9 +921,14 @@ for model_name in sorted(model_names): sample_rows = { "added": rows_added, "removed": rows_removed, - "modified": rows_modified + "modified": rows_modified, + "new_columns": rows_new_columns_raw or [] } + # Profiling summary + profiled_count = sum(1 for c in column_profiles if c.get("is_profiled", True)) + total_cols = len(column_profiles) + model_obj = { "name": model_name, "layer": meta["layer"], @@ -758,7 +947,12 @@ for model_name in sorted(model_names): "schema_changes": schema_diff, "code_changes": code_changes, "column_profiles": column_profiles, - "sample_rows": sample_rows + "sample_rows": sample_rows, + "profiling": { + "mode": "focused" if is_focused else "full", + "profiled_columns": profiled_count, + "total_columns": total_cols + } } models.append(model_obj) diff --git a/parse_columns.py b/parse_columns.py index 179cdd9..a7d4720 100755 --- 
a/parse_columns.py +++ b/parse_columns.py @@ -80,6 +80,7 @@ def strip_jinja(raw: str) -> str: # AST helpers # --------------------------------------------------------------------------- + def _parse(sql: str) -> exp.Expression: """Parse SQL with BigQuery dialect.""" return sqlglot.parse_one(sql, dialect="bigquery") @@ -124,11 +125,124 @@ def extract_ctes(parsed: exp.Expression) -> dict[str, str]: return ctes +def _extract_cte_nodes(parsed: exp.Expression) -> dict[str, exp.Expression]: + """Extract CTE body AST nodes (not just SQL strings).""" + ctes: dict[str, exp.Expression] = {} + for cte in parsed.find_all(exp.CTE): + ctes[cte.alias] = cte.this + return ctes + + +def is_cte_change_additive( + prod_body: exp.Expression, + dev_body: exp.Expression, +) -> bool: + """Determine if a CTE modification is purely additive. + + A CTE change is "additive" (safe for focused profiling) when: + - WHERE clause unchanged + - GROUP BY / HAVING unchanged + - All existing JOINs preserved (new JOINs may be added) + - New JOINs are LEFT/CROSS (can't filter rows) + - SELECT column expressions unchanged (columns may be added) + + Returns True if the change is additive, False if it's structural + (could affect existing column data). 
+ """ + try: + # Compare WHERE + prod_where = prod_body.find(exp.Where) + dev_where = dev_body.find(exp.Where) + if not _clauses_match(prod_where, dev_where): + return False + + # Compare GROUP BY + if not _clauses_match(prod_body.find(exp.Group), dev_body.find(exp.Group)): + return False + + # Compare HAVING + if not _clauses_match( + prod_body.find(exp.Having), dev_body.find(exp.Having) + ): + return False + + # Compare JOINs — all existing must be preserved, new must be LEFT/CROSS + prod_joins = [_norm(j.sql(dialect="bigquery")) for j in prod_body.find_all(exp.Join)] + dev_joins_nodes = list(dev_body.find_all(exp.Join)) + dev_joins = [_norm(j.sql(dialect="bigquery")) for j in dev_joins_nodes] + + if not all(pj in dev_joins for pj in prod_joins): + return False + + new_join_nodes = [ + j for j in dev_joins_nodes + if _norm(j.sql(dialect="bigquery")) not in prod_joins + ] + for j in new_join_nodes: + side = j.args.get("side", "") + kind = j.args.get("kind", "") + side_str = side if isinstance(side, str) else (side.name if side else "") + kind_str = kind if isinstance(kind, str) else (kind.name if kind else "") + # Only LEFT and CROSS joins are additive (can't remove existing rows) + if side_str.upper() not in ("LEFT", "") or kind_str.upper() == "CROSS": + continue + if side_str.upper() == "" and kind_str.upper() not in ("CROSS", ""): + return False + + # Compare SELECT columns within the CTE (existing expressions unchanged). + # Use _norm_expr to handle alias changes (id → o.id) from added JOINs. 
+ prod_cte_cols, _ = extract_columns(prod_body) + dev_cte_cols, _ = extract_columns(dev_body) + for col_name, prod_expr in prod_cte_cols.items(): + dev_expr = dev_cte_cols.get(col_name) + if dev_expr is None: + return False # column removed + if _norm_expr(prod_expr) != _norm_expr(dev_expr): + return False # expression changed + + return True + except Exception: + return False # any error → assume structural (conservative) + + +def _clauses_match( + prod_clause: exp.Expression | None, + dev_clause: exp.Expression | None, +) -> bool: + """Check if two SQL clauses match (both None or same SQL).""" + if prod_clause is None and dev_clause is None: + return True + if prod_clause is None or dev_clause is None: + return False + return _norm(prod_clause.sql(dialect="bigquery")) == _norm( + dev_clause.sql(dialect="bigquery") + ) + + def _norm(sql: str) -> str: """Normalise SQL for comparison (collapse whitespace, lowercase).""" return " ".join(sql.split()).strip().lower() +def _norm_expr(expr_sql: str) -> str: + """Normalise a column expression for comparison. + + Handles the common case where adding a JOIN forces table aliases + onto existing columns (e.g. ``id`` → ``o.id``). For simple column + references, strips the table qualifier so they compare equal. + For complex expressions, falls back to standard normalisation. 
+ """ + normed = _norm(expr_sql) + # Simple column reference with optional table qualifier: "t.col" → "col" + parts = normed.rsplit(".", 1) + if len(parts) == 2: + table_part, col_part = parts + # Only strip if the table part looks like an alias (simple identifier) + if table_part.replace("`", "").isidentifier(): + return col_part + return normed + + _JINJA_PLACEHOLDER = "__jinja_expr__" @@ -146,6 +260,7 @@ def _has_jinja_artifacts(parsed: exp.Expression) -> bool: # Model-level diff # --------------------------------------------------------------------------- + def diff_model( model_name: str, prod_raw: str | None, @@ -205,9 +320,7 @@ def diff_model( result["is_new"] = True result["columns"] = list(dev_columns.keys()) if dev_has_star: - result["columns_note"] = ( - "SELECT * detected — column list may be incomplete" - ) + result["columns_note"] = "SELECT * detected — column list may be incomplete" result["code_changes"] = None return result @@ -237,15 +350,20 @@ def diff_model( elif dev_expr is None: removed_columns.append(col) elif _norm(prod_expr) != _norm(dev_expr): - expression_changes.append({ - "column": col, - "prod_expression": prod_expr, - "dev_expression": dev_expr, - }) + expression_changes.append( + { + "column": col, + "prod_expression": prod_expr, + "dev_expression": dev_expr, + } + ) # -- CTE diff ------------------------------------------------------ cte_changes: list[dict] = [] + prod_cte_nodes = _extract_cte_nodes(prod_parsed) + dev_cte_nodes = _extract_cte_nodes(dev_parsed) all_ctes = sorted(set(prod_ctes) | set(dev_ctes)) + for cte_name in all_ctes: prod_body = prod_ctes.get(cte_name) dev_body = dev_ctes.get(cte_name) @@ -255,14 +373,36 @@ def diff_model( elif dev_body is None: cte_changes.append({"cte_name": cte_name, "change_type": "removed"}) elif _norm(prod_body) != _norm(dev_body): - cte_changes.append({"cte_name": cte_name, "change_type": "modified"}) + # Determine if the modification is purely additive (safe for + # focused profiling) or 
structural (could affect existing data). + prod_node = prod_cte_nodes.get(cte_name) + dev_node = dev_cte_nodes.get(cte_name) + additive = ( + prod_node is not None + and dev_node is not None + and is_cte_change_additive(prod_node, dev_node) + ) + cte_changes.append({ + "cte_name": cte_name, + "change_type": "modified", + "is_additive": additive, + }) # -- Indirect changes flag ----------------------------------------- - # True when a CTE was modified or removed, meaning data flowing - # through it may differ even if output column expressions are - # unchanged (e.g. WHERE / JOIN / filter changes). + # True when a CTE was modified (structurally, not just additive) or + # removed, meaning data flowing through it may differ even if output + # column expressions look unchanged. + # Additive CTE changes (new columns / LEFT JOINs only) are safe. has_indirect = any( - c["change_type"] in ("modified", "removed") for c in cte_changes + c["change_type"] == "removed" + or (c["change_type"] == "modified" and not c.get("is_additive", False)) + for c in cte_changes + ) + + # Build affected_columns — the columns that need profiling. + # This is the union of added columns and columns whose expression changed. 
+ affected_columns = sorted( + set(added_columns + [c["column"] for c in expression_changes]) ) result["is_new"] = False @@ -272,14 +412,13 @@ def diff_model( "removed_columns": removed_columns, "cte_changes": cte_changes, "has_indirect_changes": has_indirect, + "affected_columns": affected_columns, } # -- Warnings ------------------------------------------------------ warnings: list[str] = [] if prod_has_star or dev_has_star: - warnings.append( - "SELECT * detected — column list may be incomplete" - ) + warnings.append("SELECT * detected — column list may be incomplete") if used_compiled_fallback: warnings.append( "Dev side parsed from compiled SQL (raw_code had " @@ -296,6 +435,7 @@ def diff_model( # Manifest helpers # --------------------------------------------------------------------------- + def _load_manifest(path: str) -> dict: """Load a dbt manifest.json.""" with open(path) as fh: @@ -323,10 +463,7 @@ def _find_node(manifest: dict, model_name: str) -> dict | None: for node_key, node_val in manifest.get("nodes", {}).items(): if not node_key.startswith("model."): continue - if ( - node_val.get("alias") == model_name - or node_key.endswith(f".{model_name}") - ): + if node_val.get("alias") == model_name or node_key.endswith(f".{model_name}"): return node_val return None @@ -348,6 +485,7 @@ def _compiled_path(project_root: Path, node: dict) -> Path | None: # Main # --------------------------------------------------------------------------- + def main() -> None: parser = argparse.ArgumentParser( description="Compare dbt model SQL ASTs between prod and dev.", @@ -397,11 +535,13 @@ def main() -> None: for model_name in model_names: dev_node = _find_node(dev_manifest, model_name) if dev_node is None: - results.append({ - "model": model_name, - "code_changes": None, - "parse_error": f"model '{model_name}' not found in dev manifest", - }) + results.append( + { + "model": model_name, + "code_changes": None, + "parse_error": f"model '{model_name}' not found in dev 
manifest", + } + ) continue dev_raw = dev_node.get("raw_code") or dev_node.get("raw_sql", "") @@ -410,15 +550,11 @@ def main() -> None: if prod_manifest is not None: prod_node = _find_node(prod_manifest, model_name) if prod_node is not None: - prod_raw = prod_node.get("raw_code") or prod_node.get( - "raw_sql", "" - ) + prod_raw = prod_node.get("raw_code") or prod_node.get("raw_sql", "") compiled = _compiled_path(project_root, dev_node) - results.append( - diff_model(model_name, prod_raw, dev_raw, compiled) - ) + results.append(diff_model(model_name, prod_raw, dev_raw, compiled)) # Output --------------------------------------------------------------- json.dump(results, sys.stdout, indent=2) diff --git a/template.html b/template.html index 06244ef..eeda4e2 100644 --- a/template.html +++ b/template.html @@ -563,6 +563,36 @@ .profile-table tr.row-new:hover { background: rgba(5, 150, 105, 0.1); } +.profile-table tr.row-skipped { + opacity: 0.4; +} +.profile-table tr.row-skipped td { + font-style: italic; +} + +/* Focused profiling banner */ +.profiling-banner { + display: flex; + align-items: center; + gap: 8px; + padding: 8px 12px; + margin-bottom: 10px; + border-radius: 6px; + background: var(--surface-recessed); + border: 1px solid var(--border-subtle); + font-size: 12px; + color: var(--text-dim); +} +.profiling-banner .badge { + font-size: 10px; + font-weight: 700; + text-transform: uppercase; + letter-spacing: 0.04em; + padding: 2px 6px; + border-radius: 3px; + background: var(--teal); + color: var(--bg); +} /* Inline delta */ .delta { @@ -982,11 +1012,22 @@ // Column profile table const cols = model.column_profiles; const isNew = model.is_new; + const profiling = model.profiling || { mode: 'full' }; const hasNumeric = cols.some(c => { const d = c.dev || c.prod; return d && (d.min != null || d.max != null || d.mean != null); }); + // Focused profiling banner + if (profiling.mode === 'focused') { + html += `
`; + html += `Focused`; + html += `Profiled ${profiling.profiled_columns} of ${profiling.total_columns} columns `; + html += `(only changed/added). Row comparison still covers all columns.`; + html += ` Re-run with --full for complete profiling.`; + html += `
`; + } + html += `
`; html += `
`; html += ``; @@ -1004,18 +1045,23 @@ html += ``; html += ``; + const na = ''; + cols.forEach((col, ci) => { const p = col.prod || {}; const d = col.dev || {}; + const isProfiled = col.is_profiled !== false; // default true for backwards compat let rowCls = ''; - if (col.is_schema_addition) rowCls = 'row-new'; + if (!isProfiled) rowCls = 'row-skipped'; + else if (col.is_schema_addition) rowCls = 'row-new'; else if (col.is_changed) rowCls = 'row-changed'; // Status dot let dotCls = 'status-dot--same'; let dotTitle = 'Unchanged'; - if (col.is_primary_key) { dotCls = 'status-dot--pk'; dotTitle = 'Primary Key'; } - if (col.is_schema_addition) { dotCls = 'status-dot--added'; dotTitle = 'New Column'; } + if (!isProfiled) { dotCls = 'status-dot--same'; dotTitle = 'Not profiled (unchanged)'; } + else if (col.is_primary_key) { dotCls = 'status-dot--pk'; dotTitle = 'Primary Key'; } + else if (col.is_schema_addition) { dotCls = 'status-dot--added'; dotTitle = 'New Column'; } else if (col.is_changed) { dotCls = 'status-dot--changed'; dotTitle = 'Data Changed'; } html += ``; @@ -1023,21 +1069,30 @@ html += ``; html += ``; - if (isNew) { + if (!isProfiled) { + // Not profiled — show dashes + html += ``; + html += ``; + if (hasNumeric) { + html += ``; + html += ``; + html += ``; + } + } else if (isNew) { html += ``; html += ``; if (hasNumeric) { - html += ``; - html += ``; - html += ``; + html += ``; + html += ``; + html += ``; } } else if (col.is_schema_addition) { html += ``; html += ``; if (hasNumeric) { - html += ``; - html += ``; - html += ``; + html += ``; + html += ``; + html += ``; } } else { html += ``; @@ -1056,14 +1111,19 @@ // Sample rows const sr = model.sample_rows; - const totalSamples = sr.added.length + sr.removed.length + sr.modified.length; + const newColSamples = sr.new_columns || []; + const totalSamples = sr.added.length + sr.removed.length + sr.modified.length + newColSamples.length; if (totalSamples > 0) { html += `
`; html += `Sample Rows · ${totalSamples} shown`; html += `
`; if (sr.added.length > 0) { - html += `
+ Added (${sr.added.length})
`; + if (model.is_new && model.row_count.dev) { + html += `
Sample of ${sr.added.length} from ${fmt(model.row_count.dev)} new rows
`; + } else { + html += `
+ Added (${sr.added.length})
`; + } html += renderSampleTable(sr.added); } @@ -1077,6 +1137,17 @@ html += renderModifiedTable(sr.modified); } + if (newColSamples.length > 0) { + const addedColNames = (model.schema_changes || []) + .filter(sc => sc.change_type === 'added') + .map(sc => sc.column); + const colLabel = addedColNames.length > 0 + ? addedColNames.map(c => `${esc(c)}`).join(', ') + : 'new columns'; + html += `
★ New column data — ${colLabel} (${newColSamples.length} rows)
`; + html += renderSampleTable(newColSamples); + } + html += `
`; }
${esc(col.column)}${col.is_primary_key ? ' 🔑' : ''}${esc(col.data_type)}${na}${na}${na}${na}${na}${fmt(d.distinct_count)}${fmtPct(d.null_pct)}${d.min != null ? fmt(d.min, 2) : ''}${d.max != null ? fmt(d.max, 2) : ''}${d.mean != null ? fmt(d.mean, 2) : ''}${d.min != null ? fmt(d.min, 2) : na}${d.max != null ? fmt(d.max, 2) : na}${d.mean != null ? fmt(d.mean, 2) : na}${fmt(d.distinct_count)}${fmtPct(d.null_pct)}${d.min != null ? fmt(d.min, 2) : ''}${d.max != null ? fmt(d.max, 2) : ''}${d.mean != null ? fmt(d.mean, 2) : ''}${d.min != null ? fmt(d.min, 2) : na}${d.max != null ? fmt(d.max, 2) : na}${d.mean != null ? fmt(d.mean, 2) : na}${profileCellPair(p.distinct_count, d.distinct_count)}