diff --git a/mgtp/__init__.py b/mgtp/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/mgtp/decision_space.py b/mgtp/decision_space.py
new file mode 100644
index 0000000..a50c9dd
--- /dev/null
+++ b/mgtp/decision_space.py
@@ -0,0 +1,208 @@
+"""decision_space — Deterministic Decision-Space Diff Ledger.
+
+Schema: decision_space_snapshot_v1
+
+{
+  "version": "v1",
+  "variables": [string],
+  "allowed_transitions": [{"from": string, "to": string}],
+  "exclusions": [string],
+  "reason_code_families": {"<family>": [string]}
+}
+
+All operations are deterministic, stdlib-only, fail-closed on schema violation.
+"""
+
+import hashlib
+import json
+
+
+# ---------------------------------------------------------------------------
+# Validation
+# ---------------------------------------------------------------------------
+
+def validate_snapshot(snapshot: dict) -> None:
+    """Validate a decision-space snapshot against the v1 schema.
+
+    Raises ValueError on any schema violation.
+    All checks are deterministic and fail-closed.
+    """
+    if not isinstance(snapshot, dict):
+        raise ValueError("snapshot must be a dict")
+
+    # version
+    if "version" not in snapshot:
+        raise ValueError("missing required field: version")
+    if snapshot["version"] != "v1":
+        raise ValueError(f"unsupported version: {snapshot['version']!r}; expected 'v1'")
+
+    # variables
+    if "variables" not in snapshot:
+        raise ValueError("missing required field: variables")
+    if not isinstance(snapshot["variables"], list):
+        raise ValueError("variables must be a list")
+    for i, v in enumerate(snapshot["variables"]):
+        if not isinstance(v, str):
+            raise ValueError(f"variables[{i}] must be a string, got {type(v).__name__}")
+
+    # allowed_transitions
+    if "allowed_transitions" not in snapshot:
+        raise ValueError("missing required field: allowed_transitions")
+    if not isinstance(snapshot["allowed_transitions"], list):
+        raise ValueError("allowed_transitions must be a list")
+    for i, t in enumerate(snapshot["allowed_transitions"]):
+        if not isinstance(t, dict):
+            raise ValueError(f"allowed_transitions[{i}] must be a dict")
+        if "from" not in t:
+            raise ValueError(f"allowed_transitions[{i}] missing field: from")
+        if "to" not in t:
+            raise ValueError(f"allowed_transitions[{i}] missing field: to")
+        if not isinstance(t["from"], str):
+            raise ValueError(f"allowed_transitions[{i}].from must be a string")
+        if not isinstance(t["to"], str):
+            raise ValueError(f"allowed_transitions[{i}].to must be a string")
+
+    # exclusions
+    if "exclusions" not in snapshot:
+        raise ValueError("missing required field: exclusions")
+    if not isinstance(snapshot["exclusions"], list):
+        raise ValueError("exclusions must be a list")
+    for i, e in enumerate(snapshot["exclusions"]):
+        if not isinstance(e, str):
+            raise ValueError(f"exclusions[{i}] must be a string, got {type(e).__name__}")
+
+    # reason_code_families
+    if "reason_code_families" not in snapshot:
+        raise ValueError("missing required field: reason_code_families")
+    if not isinstance(snapshot["reason_code_families"], dict):
+        raise ValueError("reason_code_families must be a dict")
+    for family, codes in snapshot["reason_code_families"].items():
+        if not isinstance(family, str):
+            raise ValueError(f"reason_code_families key must be a string, got {type(family).__name__}")
+        if not isinstance(codes, list):
+            raise ValueError(f"reason_code_families[{family!r}] must be a list")
+        for j, code in enumerate(codes):
+            if not isinstance(code, str):
+                raise ValueError(
+                    f"reason_code_families[{family!r}][{j}] must be a string, got {type(code).__name__}"
+                )
+
+
+# ---------------------------------------------------------------------------
+# Canonicalization
+# ---------------------------------------------------------------------------
+
+def canonicalize_snapshot(snapshot: dict) -> dict:
+    """Return a deterministically sorted copy of a snapshot.
+
+    - variables, exclusions: sorted lexicographically
+    - allowed_transitions: sorted by (from, to)
+    - reason_code_families: keys sorted, each value list sorted
+    - version: preserved as-is
+
+    Transition dicts are shallow-copied so the result never aliases the input.
+    The input is not validated here, and duplicate list entries are kept, so
+    duplicates affect snapshot_hash even though diff_snapshots compares sets.
+    """
+    return {
+        "version": snapshot["version"],
+        "variables": sorted(snapshot["variables"]),
+        "allowed_transitions": sorted(
+            (dict(t) for t in snapshot["allowed_transitions"]),
+            key=lambda t: (t["from"], t["to"]),
+        ),
+        "exclusions": sorted(snapshot["exclusions"]),
+        "reason_code_families": {
+            family: sorted(codes)
+            for family, codes in sorted(snapshot["reason_code_families"].items())
+        },
+    }
+
+
+# ---------------------------------------------------------------------------
+# Hashing
+# ---------------------------------------------------------------------------
+
+def snapshot_hash(snapshot: dict) -> str:
+    """Return SHA256 hex digest (lower-case) of the canonical JSON of a snapshot.
+
+    The snapshot is canonicalized before hashing, so key insertion order
+    and list order do not affect the result.
+    """
+    canonical = canonicalize_snapshot(snapshot)
+    serialized = json.dumps(canonical, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
+    return hashlib.sha256(serialized.encode("utf-8")).hexdigest()
+
+
+# ---------------------------------------------------------------------------
+# Diff
+# ---------------------------------------------------------------------------
+
+def diff_snapshots(a: dict, b: dict) -> dict:
+    """Compute a deterministic structural diff between two snapshots.
+
+    Both snapshots are validated and canonicalized before diffing.
+
+    Returns:
+        {
+            "variables_added": [str],
+            "variables_removed": [str],
+            "transitions_added": [{"from": str, "to": str}],
+            "transitions_removed": [{"from": str, "to": str}],
+            "exclusions_added": [str],
+            "exclusions_removed": [str],
+            "reason_codes_added": {family: [str]},
+            "reason_codes_removed": {family: [str]}
+        }
+    """
+    validate_snapshot(a)
+    validate_snapshot(b)
+
+    ca = canonicalize_snapshot(a)
+    cb = canonicalize_snapshot(b)
+
+    # variables
+    vars_a = set(ca["variables"])
+    vars_b = set(cb["variables"])
+
+    # transitions — index each dict by its hashable (from, to) tuple
+    def _transition_key(t):
+        return (t["from"], t["to"])
+
+    trans_a = {_transition_key(t): t for t in ca["allowed_transitions"]}
+    trans_b = {_transition_key(t): t for t in cb["allowed_transitions"]}
+
+    # exclusions
+    excl_a = set(ca["exclusions"])
+    excl_b = set(cb["exclusions"])
+
+    # reason_code_families
+    all_families = sorted(set(ca["reason_code_families"]) | set(cb["reason_code_families"]))
+    reason_codes_added = {}
+    reason_codes_removed = {}
+    for family in all_families:
+        codes_a = set(ca["reason_code_families"].get(family, []))
+        codes_b = set(cb["reason_code_families"].get(family, []))
+        added = sorted(codes_b - codes_a)
+        removed = sorted(codes_a - codes_b)
+        if added:
+            reason_codes_added[family] = added
+        if removed:
+            reason_codes_removed[family] = removed
+
+    return {
+        "variables_added": sorted(vars_b - vars_a),
+        "variables_removed": sorted(vars_a - vars_b),
+        "transitions_added": sorted(
+            [trans_b[k] for k in set(trans_b) - set(trans_a)],
+            key=_transition_key,
+        ),
+        "transitions_removed": sorted(
+            [trans_a[k] for k in set(trans_a) - set(trans_b)],
+            key=_transition_key,
+        ),
+        "exclusions_added": sorted(excl_b - excl_a),
+        "exclusions_removed": sorted(excl_a - excl_b),
+        "reason_codes_added": reason_codes_added,
+        "reason_codes_removed": reason_codes_removed,
+    }
diff --git a/scripts/ds_diff.py b/scripts/ds_diff.py
new file mode 100644
index 0000000..3b9d508
--- /dev/null
+++ b/scripts/ds_diff.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python3
+"""ds_diff — Decision-Space Diff CLI.
+
+Usage:
+    python scripts/ds_diff.py snapshot_a.json snapshot_b.json
+
+Output:
+    Hash A: <sha256 hex digest of snapshot A>
+    Hash B: <sha256 hex digest of snapshot B>
+    <diff as indented JSON>
+
+Exit codes:
+    0  success
+    1  validation failure or usage error
+"""
+
+import json
+import sys
+from pathlib import Path
+
+# Allow running from any working directory
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
+
+from mgtp.decision_space import diff_snapshots, snapshot_hash, validate_snapshot
+
+
+def _load_json(path: str) -> dict:
+    """Load JSON from *path*; on any read/parse error print it and exit 1."""
+    try:
+        with open(path, "r", encoding="utf-8") as f:
+            return json.load(f)
+    except (OSError, ValueError) as exc:
+        # ValueError covers both JSONDecodeError and UnicodeDecodeError
+        # (invalid UTF-8), so malformed files never surface as a traceback.
+        print(f"ERROR: could not load {path!r}: {exc}", file=sys.stderr)
+        sys.exit(1)
+
+
+def main(argv=None):
+    """Validate two snapshot files, then print their hashes and their diff."""
+    if argv is None:
+        argv = sys.argv[1:]
+
+    if len(argv) != 2:
+        print("Usage: ds_diff.py snapshot_a.json snapshot_b.json", file=sys.stderr)
+        sys.exit(1)
+
+    path_a, path_b = argv
+
+    snapshot_a = _load_json(path_a)
+    snapshot_b = _load_json(path_b)
+
+    try:
+        validate_snapshot(snapshot_a)
+    except ValueError as exc:
+        print(f"ERROR: snapshot A validation failed: {exc}", file=sys.stderr)
+        sys.exit(1)
+
+    try:
+        validate_snapshot(snapshot_b)
+    except ValueError as exc:
+        print(f"ERROR: snapshot B validation failed: {exc}", file=sys.stderr)
+        sys.exit(1)
+
+    hash_a = snapshot_hash(snapshot_a)
+    hash_b = snapshot_hash(snapshot_b)
+
+    diff = diff_snapshots(snapshot_a, snapshot_b)
+
+    print(f"Hash A: {hash_a}")
+    print(f"Hash B: {hash_b}")
+    print(json.dumps(diff, sort_keys=True, indent=2, ensure_ascii=False))
+
+    sys.exit(0)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/test_decision_space.py b/tests/test_decision_space.py
new file mode 100644
index 0000000..8d2c79c
--- /dev/null
+++ b/tests/test_decision_space.py
@@ -0,0 +1,409 @@
+"""Tests for mgtp.decision_space — Decision-Space Diff Ledger."""
+
+import json
+import sys
+from pathlib import Path
+
+import pytest
+
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
+
+from mgtp.decision_space import (
+    canonicalize_snapshot,
+    diff_snapshots,
+    snapshot_hash,
+    validate_snapshot,
+)
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+VALID_SNAPSHOT = {
+    "version": "v1",
+    "variables": ["x", "y", "z"],
+    "allowed_transitions": [
+        {"from": "GREEN", "to": "AMBER"},
+        {"from": "AMBER", "to": "RED"},
+    ],
+    "exclusions": ["deprecated_var"],
+    "reason_code_families": {
+        "ALLOW": ["allowlist_match"],
+        "REFUSE": ["default_refuse", "denylist_match"],
+    },
+}
+
+VALID_SNAPSHOT_B = {
+    "version": "v1",
+    "variables": ["x", "y", "z", "w"],
+    "allowed_transitions": [
+        {"from": "AMBER", "to": "RED"},
+        {"from": "GREEN", "to": "AMBER"},
+        {"from": "GREEN", "to": "RED"},
+    ],
+    "exclusions": [],
+    "reason_code_families": {
+        "ALLOW": ["allowlist_match", "escalation_override"],
+        "REFUSE": ["default_refuse"],
+    },
+}
+
+
+# ---------------------------------------------------------------------------
+# validate_snapshot — valid cases
+# ---------------------------------------------------------------------------
+
+def test_valid_snapshot_passes():
+    validate_snapshot(VALID_SNAPSHOT)  # must not raise
+
+
+def test_valid_empty_lists():
+    snap = {
+        "version": "v1",
+        "variables": [],
+        "allowed_transitions": [],
+        "exclusions": [],
+        "reason_code_families": {},
+    }
+    validate_snapshot(snap)  # must not raise
+
+
+# ---------------------------------------------------------------------------
+# validate_snapshot — schema violation cases
+# ---------------------------------------------------------------------------
+
+def test_non_dict_raises():
+    with pytest.raises(ValueError, match="must be a dict"):
+        validate_snapshot(["not", "a", "dict"])
+
+
+def test_missing_version_raises():
+    snap = dict(VALID_SNAPSHOT)
+    del snap["version"]
+    with pytest.raises(ValueError, match="version"):
+        validate_snapshot(snap)
+
+
+def test_wrong_version_raises():
+    snap = dict(VALID_SNAPSHOT)
+    snap["version"] = "v2"
+    with pytest.raises(ValueError, match="unsupported version"):
+        validate_snapshot(snap)
+
+
+def test_missing_variables_raises():
+    snap = {k: v for k, v in VALID_SNAPSHOT.items() if k != "variables"}
+    with pytest.raises(ValueError, match="variables"):
+        validate_snapshot(snap)
+
+
+def test_variables_not_list_raises():
+    snap = dict(VALID_SNAPSHOT)
+    snap["variables"] = "not_a_list"
+    with pytest.raises(ValueError, match="variables must be a list"):
+        validate_snapshot(snap)
+
+
+def test_variables_non_string_element_raises():
+    snap = dict(VALID_SNAPSHOT)
+    snap["variables"] = ["ok", 42]
+    with pytest.raises(ValueError, match="variables\\[1\\]"):
+        validate_snapshot(snap)
+
+
+def test_missing_allowed_transitions_raises():
+    snap = {k: v for k, v in VALID_SNAPSHOT.items() if k != "allowed_transitions"}
+    with pytest.raises(ValueError, match="allowed_transitions"):
+        validate_snapshot(snap)
+
+
+def test_allowed_transitions_not_list_raises():
+    snap = dict(VALID_SNAPSHOT)
+    snap["allowed_transitions"] = {"from": "A", "to": "B"}
+    with pytest.raises(ValueError, match="allowed_transitions must be a list"):
+        validate_snapshot(snap)
+
+
+def test_transition_missing_from_raises():
+    snap = dict(VALID_SNAPSHOT)
+    snap["allowed_transitions"] = [{"to": "AMBER"}]
+    with pytest.raises(ValueError, match="from"):
+        validate_snapshot(snap)
+
+
+def test_transition_missing_to_raises():
+    snap = dict(VALID_SNAPSHOT)
+    snap["allowed_transitions"] = [{"from": "GREEN"}]
+    with pytest.raises(ValueError, match="to"):
+        validate_snapshot(snap)
+
+
+def test_transition_from_non_string_raises():
+    snap = dict(VALID_SNAPSHOT)
+    snap["allowed_transitions"] = [{"from": 1, "to": "AMBER"}]
+    with pytest.raises(ValueError, match="from must be a string"):
+        validate_snapshot(snap)
+
+
+def test_transition_to_non_string_raises():
+    snap = dict(VALID_SNAPSHOT)
+    snap["allowed_transitions"] = [{"from": "GREEN", "to": 2}]
+    with pytest.raises(ValueError, match="to must be a string"):
+        validate_snapshot(snap)
+
+
+def test_missing_exclusions_raises():
+    snap = {k: v for k, v in VALID_SNAPSHOT.items() if k != "exclusions"}
+    with pytest.raises(ValueError, match="exclusions"):
+        validate_snapshot(snap)
+
+
+def test_exclusions_not_list_raises():
+    snap = dict(VALID_SNAPSHOT)
+    snap["exclusions"] = "bad"
+    with pytest.raises(ValueError, match="exclusions must be a list"):
+        validate_snapshot(snap)
+
+
+def test_exclusions_non_string_element_raises():
+    snap = dict(VALID_SNAPSHOT)
+    snap["exclusions"] = [True]
+    with pytest.raises(ValueError, match="exclusions\\[0\\]"):
+        validate_snapshot(snap)
+
+
+def test_missing_reason_code_families_raises():
+    snap = {k: v for k, v in VALID_SNAPSHOT.items() if k != "reason_code_families"}
+    with pytest.raises(ValueError, match="reason_code_families"):
+        validate_snapshot(snap)
+
+
+def test_reason_code_families_not_dict_raises():
+    snap = dict(VALID_SNAPSHOT)
+    snap["reason_code_families"] = ["ALLOW"]
+    with pytest.raises(ValueError, match="reason_code_families must be a dict"):
+        validate_snapshot(snap)
+
+
+def test_reason_code_family_value_not_list_raises():
+    snap = dict(VALID_SNAPSHOT)
+    snap["reason_code_families"] = {"ALLOW": "not_a_list"}
+    with pytest.raises(ValueError, match="must be a list"):
+        validate_snapshot(snap)
+
+
+def test_reason_code_family_code_non_string_raises():
+    snap = dict(VALID_SNAPSHOT)
+    snap["reason_code_families"] = {"ALLOW": [99]}
+    with pytest.raises(ValueError, match="must be a string"):
+        validate_snapshot(snap)
+
+
+# ---------------------------------------------------------------------------
+# canonicalize_snapshot
+# ---------------------------------------------------------------------------
+
+def test_canonicalize_sorts_variables():
+    snap = dict(VALID_SNAPSHOT)
+    snap["variables"] = ["z", "a", "m"]
+    result = canonicalize_snapshot(snap)
+    assert result["variables"] == ["a", "m", "z"]
+
+
+def test_canonicalize_sorts_exclusions():
+    snap = dict(VALID_SNAPSHOT)
+    snap["exclusions"] = ["gamma", "alpha", "beta"]
+    result = canonicalize_snapshot(snap)
+    assert result["exclusions"] == ["alpha", "beta", "gamma"]
+
+
+def test_canonicalize_sorts_transitions():
+    snap = dict(VALID_SNAPSHOT)
+    snap["allowed_transitions"] = [
+        {"from": "RED", "to": "GREEN"},
+        {"from": "AMBER", "to": "RED"},
+        {"from": "GREEN", "to": "AMBER"},
+    ]
+    result = canonicalize_snapshot(snap)
+    assert result["allowed_transitions"] == [
+        {"from": "AMBER", "to": "RED"},
+        {"from": "GREEN", "to": "AMBER"},
+        {"from": "RED", "to": "GREEN"},
+    ]
+
+
+def test_canonicalize_sorts_reason_code_families_keys():
+    snap = dict(VALID_SNAPSHOT)
+    snap["reason_code_families"] = {"Z_FAM": ["z_code"], "A_FAM": ["a_code"]}
+    result = canonicalize_snapshot(snap)
+    assert list(result["reason_code_families"].keys()) == ["A_FAM", "Z_FAM"]
+
+
+def test_canonicalize_sorts_reason_codes_within_family():
+    snap = dict(VALID_SNAPSHOT)
+    snap["reason_code_families"] = {"ALLOW": ["z_code", "a_code", "m_code"]}
+    result = canonicalize_snapshot(snap)
+    assert result["reason_code_families"]["ALLOW"] == ["a_code", "m_code", "z_code"]
+
+
+# ---------------------------------------------------------------------------
+# snapshot_hash — stability and determinism
+# ---------------------------------------------------------------------------
+
+def test_hash_is_stable():
+    h1 = snapshot_hash(VALID_SNAPSHOT)
+    h2 = snapshot_hash(VALID_SNAPSHOT)
+    assert h1 == h2
+
+
+def test_hash_is_lowercase_hex():
+    h = snapshot_hash(VALID_SNAPSHOT)
+    assert len(h) == 64
+    assert h == h.lower()
+    assert all(c in "0123456789abcdef" for c in h)
+
+
+def test_hash_independent_of_key_insertion_order():
+    snap_a = {
+        "version": "v1",
+        "variables": ["x"],
+        "allowed_transitions": [],
+        "exclusions": [],
+        "reason_code_families": {},
+    }
+    snap_b = {
+        "exclusions": [],
+        "allowed_transitions": [],
+        "reason_code_families": {},
+        "variables": ["x"],
+        "version": "v1",
+    }
+    assert snapshot_hash(snap_a) == snapshot_hash(snap_b)
+
+
+def test_hash_independent_of_list_order():
+    snap_a = dict(VALID_SNAPSHOT)
+    snap_a["variables"] = ["z", "y", "x"]
+    snap_b = dict(VALID_SNAPSHOT)
+    snap_b["variables"] = ["x", "y", "z"]
+    assert snapshot_hash(snap_a) == snapshot_hash(snap_b)
+
+
+def test_different_snapshots_have_different_hashes():
+    assert snapshot_hash(VALID_SNAPSHOT) != snapshot_hash(VALID_SNAPSHOT_B)
+
+
+# ---------------------------------------------------------------------------
+# diff_snapshots
+# ---------------------------------------------------------------------------
+
+def test_identical_snapshots_produce_empty_diff():
+    diff = diff_snapshots(VALID_SNAPSHOT, VALID_SNAPSHOT)
+    assert diff["variables_added"] == []
+    assert diff["variables_removed"] == []
+    assert diff["transitions_added"] == []
+    assert diff["transitions_removed"] == []
+    assert diff["exclusions_added"] == []
+    assert diff["exclusions_removed"] == []
+    assert diff["reason_codes_added"] == {}
+    assert diff["reason_codes_removed"] == {}
+
+
+def test_diff_detects_variable_added():
+    diff = diff_snapshots(VALID_SNAPSHOT, VALID_SNAPSHOT_B)
+    assert "w" in diff["variables_added"]
+
+
+def test_diff_detects_no_variable_removed_when_b_is_superset():
+    diff = diff_snapshots(VALID_SNAPSHOT, VALID_SNAPSHOT_B)
+    # VALID_SNAPSHOT_B adds "w" but keeps x, y, z
+    assert diff["variables_removed"] == []
+
+
+def test_diff_detects_variable_removed():
+    diff = diff_snapshots(VALID_SNAPSHOT_B, VALID_SNAPSHOT)
+    assert "w" in diff["variables_removed"]
+
+
+def test_diff_detects_transition_added():
+    diff = diff_snapshots(VALID_SNAPSHOT, VALID_SNAPSHOT_B)
+    assert {"from": "GREEN", "to": "RED"} in diff["transitions_added"]
+
+
+def test_diff_detects_transition_removed():
+    diff = diff_snapshots(VALID_SNAPSHOT_B, VALID_SNAPSHOT)
+    assert {"from": "GREEN", "to": "RED"} in diff["transitions_removed"]
+
+
+def test_diff_detects_exclusion_removed():
+    diff = diff_snapshots(VALID_SNAPSHOT, VALID_SNAPSHOT_B)
+    assert "deprecated_var" in diff["exclusions_removed"]
+
+
+def test_diff_detects_exclusion_added():
+    diff = diff_snapshots(VALID_SNAPSHOT_B, VALID_SNAPSHOT)
+    assert "deprecated_var" in diff["exclusions_added"]
+
+
+def test_diff_detects_reason_code_added():
+    diff = diff_snapshots(VALID_SNAPSHOT, VALID_SNAPSHOT_B)
+    assert "escalation_override" in diff["reason_codes_added"].get("ALLOW", [])
+
+
+def test_diff_detects_reason_code_removed():
+    diff = diff_snapshots(VALID_SNAPSHOT, VALID_SNAPSHOT_B)
+    assert "denylist_match" in diff["reason_codes_removed"].get("REFUSE", [])
+
+
+def test_diff_raises_on_invalid_snapshot_a():
+    bad = {"version": "v1"}
+    with pytest.raises(ValueError):
+        diff_snapshots(bad, VALID_SNAPSHOT)
+
+
+def test_diff_raises_on_invalid_snapshot_b():
+    bad = {"version": "v1"}
+    with pytest.raises(ValueError):
+        diff_snapshots(VALID_SNAPSHOT, bad)
+
+
+def test_diff_is_deterministic():
+    d1 = diff_snapshots(VALID_SNAPSHOT, VALID_SNAPSHOT_B)
+    d2 = diff_snapshots(VALID_SNAPSHOT, VALID_SNAPSHOT_B)
+    assert json.dumps(d1, sort_keys=True) == json.dumps(d2, sort_keys=True)
+
+
+def test_diff_no_false_positives_for_unchanged_items():
+    snap_a = {
+        "version": "v1",
+        "variables": ["x", "y"],
+        "allowed_transitions": [{"from": "A", "to": "B"}],
+        "exclusions": ["ex1"],
+        "reason_code_families": {"FAM": ["code1"]},
+    }
+    snap_b = dict(snap_a)
+    snap_b["variables"] = ["x", "y", "z"]  # only add z
+    diff = diff_snapshots(snap_a, snap_b)
+    assert diff["variables_added"] == ["z"]
+    assert diff["variables_removed"] == []
+    assert diff["transitions_added"] == []
+    assert diff["transitions_removed"] == []
+    assert diff["exclusions_added"] == []
+    assert diff["exclusions_removed"] == []
+    assert diff["reason_codes_added"] == {}
+    assert diff["reason_codes_removed"] == {}
+
+
+def test_diff_output_keys_present():
+    diff = diff_snapshots(VALID_SNAPSHOT, VALID_SNAPSHOT_B)
+    expected_keys = {
+        "variables_added",
+        "variables_removed",
+        "transitions_added",
+        "transitions_removed",
+        "exclusions_added",
+        "exclusions_removed",
+        "reason_codes_added",
+        "reason_codes_removed",
+    }
+    assert set(diff.keys()) == expected_keys