From c04998c8d0914c1069ae52eeeac9847b5a5a0cd8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 23:53:27 +0000 Subject: [PATCH 1/4] Initial plan From e2a23ce56e54f734077b77d13f3e6deb8128b59e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 1 Mar 2026 23:55:49 +0000 Subject: [PATCH 2/4] Add .github/copilot-instructions.md and commitboundary_spec_intake.prompt.md Co-authored-by: LalaSkye <228581229+LalaSkye@users.noreply.github.com> --- .github/copilot-instructions.md | 51 ++++++++++ .../commitboundary_spec_intake.prompt.md | 93 +++++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 .github/copilot-instructions.md create mode 100644 .github/prompts/commitboundary_spec_intake.prompt.md diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 0000000..fe0fbb0 --- /dev/null +++ b/.github/copilot-instructions.md @@ -0,0 +1,51 @@ +# Copilot Instructions for constraint-workshop + +## Repository purpose + +This repository contains small, deterministic control primitives for software systems. +Each primitive is a standalone, testable, auditable brick with no framework dependencies, +no runtime state, and no hidden behavior. + +## Coding standards + +- **stdlib only** — no third-party runtime dependencies unless explicitly approved. +- **Deterministic** — same inputs must always produce the same outputs; no randomness, no time-dependent logic, no global state. +- **No side effects** — primitives must not log, mutate shared state, or perform I/O. +- **Pure Python** — target Python 3.10–3.12. Avoid walrus operators or syntax unavailable on 3.10. +- **Tests mandatory** — every primitive ships with a `pytest` test file. Tests must cover all invariants stated in the module docstring. +- **No ML / no network** — classifiers use phrase matching and regex only. 
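
As a purely illustrative sketch of the coding standards above (the names here are hypothetical — this is not an existing primitive in the repository), a compliant brick might look like:

```python
import re

# Hypothetical example only: a brick that satisfies the standards above.
# Deterministic (no time-dependent logic, no randomness), pure (no I/O,
# no logging, no shared state), stdlib-only (re), testable in isolation.
_HARD_SIGNAL = re.compile(r"\b(must|never|always)\b", re.IGNORECASE)


def classify_posture(text: str) -> str:
    """Return 'HARD' if the text carries a hard-invariant phrase, else 'SOFT'."""
    return "HARD" if _HARD_SIGNAL.search(text) else "SOFT"
```

A matching `pytest` file would assert both branches, covering every invariant stated in the docstring.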
+ +## Module layout conventions + +``` +/ + src// # source package (importable) + tests/ # pytest tests + rules/ # declarative JSON rule files (where applicable) + baselines/ # hash-bound baseline artefacts (where applicable) + README.md # contract + usage +``` + +## Commit Gate specifics + +- Verdicts are exactly: `ALLOW`, `REFUSE`, `ESCALATE` — no new verdict values. +- Scope matching is **exact string match only** — no glob, no regex, no prefix. +- `invariant_hash` is SHA-256 of the declared contract text (hex, lowercase, 64 chars). +- `decision_hash` is SHA-256 of `canonical_request + verdict + sorted(reasons)`. +- Authority drift detection: new allowlist edges with an unchanged `invariant_hash` must **FAIL**. +- The `/prometheus/` directory is observability-only and must never be imported by gate/engine code. + +## Invariant litmus specifics + +- Scoring: `+0.25` hard-invariant signal, `-0.25` cost-curve signal, `+0.15` quantification signal. +- Negation window: 2 words. +- No external dependencies beyond `re`. + +## Pull request checklist + +Before opening a PR, verify: +1. All new primitives have a matching `test_.py`. +2. `pytest` passes with zero failures. +3. No new runtime dependencies introduced. +4. README updated if the public interface changed. +5. If a new `CommitRequest` field is added, update `canonicalise.py` and regenerate baselines. diff --git a/.github/prompts/commitboundary_spec_intake.prompt.md b/.github/prompts/commitboundary_spec_intake.prompt.md new file mode 100644 index 0000000..ccdd031 --- /dev/null +++ b/.github/prompts/commitboundary_spec_intake.prompt.md @@ -0,0 +1,93 @@ +--- +mode: 'agent' +description: 'Intake prompt: generate a v0 specification for the CommitBoundary primitive.' +--- + +# CommitBoundary Spec Intake — v0 + +You are a specification author for the `constraint-workshop` repository. +Your task is to produce a complete, self-contained v0 specification for a new primitive called **CommitBoundary**. 
+ +## Context + +The repository already contains: +- `stop_machine` — a three-state deterministic machine (GREEN → AMBER → RED). +- `authority_gate` — an evidence-ordered access gate (NONE < USER < OWNER < ADMIN). +- `invariant_litmus` — a posture classifier (HARD_INVARIANT / COST_CURVE / EDGE). +- `commit_gate` — a hash-bound commit authority engine (ALLOW / REFUSE / ESCALATE). + +All primitives share these invariants: +- stdlib only, no network, no randomness, no global state. +- Deterministic: same inputs → same outputs. +- Testable in complete isolation. + +## CommitBoundary definition + +A **CommitBoundary** is a declarative, hash-bound envelope that wraps a single logical +commit action and asserts: + +1. **Scope envelope** — the set of resource paths or identifiers the commit is permitted to touch. +2. **Actor binding** — which actor (by `actor_id`) owns this boundary instance. +3. **Action class constraint** — which `action_class` values are permitted inside this boundary. +4. **Boundary hash** — SHA-256 of the canonical (sorted, minified) JSON representation of fields 1–3. + Any mutation to scope, actor, or action constraints must produce a new hash. +5. **Violation detection** — given a `CommitRequest`, return `WITHIN` if the request is fully + contained by the boundary, or `BREACH` if any field falls outside the declared envelope. + +## What to generate + +Produce the following files. Output each file as a fenced code block with its path as the language tag. + +### 1. `commit_boundary/README.md` + +Include: +- One-paragraph description. +- Invariants table (at minimum: boundary_hash immutability, WITHIN/BREACH are the only verdicts, + scope matching is exact-string only, no side effects). +- Input/output field tables for `BoundarySpec` and `BoundaryVerdict`. +- Minimal usage example (Python snippet). +- How to run tests. + +### 2. `commit_boundary/src/commit_boundary/__init__.py` + +Export: `BoundarySpec`, `BoundaryVerdict`, `CommitBoundary`. + +### 3. 
`commit_boundary/src/commit_boundary/boundary.py` + +Implement: +- `BoundarySpec(actor_id, action_classes, scope_paths)` — immutable dataclass. + - `boundary_hash: str` — computed at construction, SHA-256 hex of canonical JSON. +- `BoundaryVerdict` — enum with `WITHIN` and `BREACH`. +- `CommitBoundary(spec: BoundarySpec)` — evaluator class. + - `evaluate(request: dict) -> BoundaryVerdict` — pure, no side effects. + - Returns `BREACH` if `request["actor_id"] != spec.actor_id`. + - Returns `BREACH` if `request["action_class"] not in spec.action_classes`. + - Returns `BREACH` if any path in `request.get("scope_paths", [])` is not in `spec.scope_paths`. + - Returns `WITHIN` otherwise. +- Canonicalisation: sort all collections before hashing; use `json.dumps(..., sort_keys=True, separators=(',',':'))`. + +### 4. `commit_boundary/tests/test_boundary.py` + +Cover all invariants with `pytest` tests: +- `WITHIN` verdict for a fully matching request. +- `BREACH` for wrong `actor_id`. +- `BREACH` for disallowed `action_class`. +- `BREACH` for out-of-scope path. +- Hash immutability: same spec constructed twice has the same `boundary_hash`. +- Hash sensitivity: mutating any spec field changes `boundary_hash`. +- No side effects: calling `evaluate()` does not mutate the spec. + +## Constraints + +- stdlib only (`hashlib`, `json`, `dataclasses`, `enum`). +- No new verdict values beyond `WITHIN` and `BREACH`. +- Scope matching: exact string match, no glob, no regex. +- `boundary_hash` must be 64-character lowercase hex SHA-256. +- Must be deterministic across Python 3.10, 3.11, 3.12. +- Do not import anything from `commit_gate`, `stop_machine`, `authority_gate`, + `invariant_litmus`, or `prometheus`. + +## Output format + +Return only the file contents as fenced code blocks (path as the fence language label). +Do not add prose outside the code blocks. 
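
For orientation, one way the `boundary.py` contract above might be realised — a sketch only, under the stated canonicalisation rules; the authoritative implementation is what the agent generates from this prompt:

```python
import hashlib
import json
from dataclasses import dataclass, field
from enum import Enum


class BoundaryVerdict(Enum):
    WITHIN = "WITHIN"
    BREACH = "BREACH"


@dataclass(frozen=True)
class BoundarySpec:
    actor_id: str
    action_classes: tuple
    scope_paths: tuple
    boundary_hash: str = field(init=False)

    def __post_init__(self):
        # Canonical form: sorted collections, sorted keys, minified JSON,
        # hashed at construction so any field mutation yields a new hash.
        canonical = json.dumps(
            {
                "actor_id": self.actor_id,
                "action_classes": sorted(self.action_classes),
                "scope_paths": sorted(self.scope_paths),
            },
            sort_keys=True,
            separators=(",", ":"),
        )
        object.__setattr__(
            self,
            "boundary_hash",
            hashlib.sha256(canonical.encode("utf-8")).hexdigest(),
        )


class CommitBoundary:
    """Pure evaluator: no side effects, never mutates the spec."""

    def __init__(self, spec: BoundarySpec):
        self.spec = spec

    def evaluate(self, request: dict) -> BoundaryVerdict:
        if request.get("actor_id") != self.spec.actor_id:
            return BoundaryVerdict.BREACH
        if request.get("action_class") not in self.spec.action_classes:
            return BoundaryVerdict.BREACH
        if any(p not in self.spec.scope_paths for p in request.get("scope_paths", [])):
            return BoundaryVerdict.BREACH
        return BoundaryVerdict.WITHIN
```

Scope matching here is exact string membership — no glob, no regex, no prefix — matching the constraints below.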
From 3bcce88396db7f1d073449c477c77f769a0abf74 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 2 Mar 2026 00:00:22 +0000 Subject: [PATCH 3/4] Rewrite rails files: 5 invariants + schemas-first rule in copilot-instructions; schemas+golden-only scope in prompt Co-authored-by: LalaSkye <228581229+LalaSkye@users.noreply.github.com> --- .github/copilot-instructions.md | 16 ++ .../commitboundary_spec_intake.prompt.md | 159 ++++++++++-------- 2 files changed, 101 insertions(+), 74 deletions(-) diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index fe0fbb0..d376cbf 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -41,6 +41,22 @@ no runtime state, and no hidden behavior. - Negation window: 2 words. - No external dependencies beyond `re`. +## Hard decision invariants (enforced by all gate primitives) + +These rules are non-negotiable and must be encoded in every gate, engine, and policy primitive: + +1. **Fail-closed defaults** — if policy, identity, or logging is missing, the decision is `DENY`. +2. **No silent success** — every decision must produce a Receipt; if a Receipt cannot be created, the decision is `DENY`. +3. **No self-approval** — `actor_id` cannot appear as an approver for the same request; if it does, the decision is `DENY`. +4. **Approval required but absent** — if approval is required and no valid approval is present, the decision is `HOLD` (never `ALLOW`). +5. **Tamper-evident log** — Receipts carry `{seq, prev_hash, this_hash}`; any out-of-order sequence or invalid hash linkage must be rejected. + +## Process rules + +- **Prefer schemas + conformance tests before implementation.** Define the JSON schema and golden test vectors first; open a separate PR for the implementation. +- **PRs must be small and reviewable.** One primitive or one contract change per PR. 
+- **No scope expansion.** A PR that adds new verdicts, new primitives, or new cross-module imports without a preceding schema PR must be rejected. + ## Pull request checklist Before opening a PR, verify: diff --git a/.github/prompts/commitboundary_spec_intake.prompt.md b/.github/prompts/commitboundary_spec_intake.prompt.md index ccdd031..124d77b 100644 --- a/.github/prompts/commitboundary_spec_intake.prompt.md +++ b/.github/prompts/commitboundary_spec_intake.prompt.md @@ -1,93 +1,104 @@ --- mode: 'agent' -description: 'Intake prompt: generate a v0 specification for the CommitBoundary primitive.' +description: 'Intake prompt: generate schemas + golden test vectors for CommitBoundary v0 (no implementation).' --- # CommitBoundary Spec Intake — v0 You are a specification author for the `constraint-workshop` repository. -Your task is to produce a complete, self-contained v0 specification for a new primitive called **CommitBoundary**. + +## SCOPE CONSTRAINT — READ FIRST + +You must generate **schemas and golden test vectors only**. +Do NOT generate any Python implementation classes, engine code, or runtime logic. +Do NOT create `BoundarySpec`, `BoundaryVerdict`, or any callable Python class. +Implementation is deferred to a separate, subsequent PR. ## Context -The repository already contains: -- `stop_machine` — a three-state deterministic machine (GREEN → AMBER → RED). -- `authority_gate` — an evidence-ordered access gate (NONE < USER < OWNER < ADMIN). -- `invariant_litmus` — a posture classifier (HARD_INVARIANT / COST_CURVE / EDGE). -- `commit_gate` — a hash-bound commit authority engine (ALLOW / REFUSE / ESCALATE). +The repository contains deterministic control primitives (stdlib-only, no network, no randomness). +All decisions are fail-closed: missing policy → DENY; no Receipt possible → DENY; +self-approval → DENY; approval required but absent → HOLD (never ALLOW). 
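
The tamper-evident log of hard invariant 5 can be sketched as follows — a hedged illustration only; the Receipt field layout beyond `{seq, prev_hash, this_hash}` is an assumption:

```python
import hashlib
import json

GENESIS = "0" * 64  # assumed sentinel for the first receipt's prev_hash


def make_receipt(seq, prev_hash, payload):
    """Build a receipt whose this_hash commits to seq, prev_hash, and payload."""
    body = {"seq": seq, "prev_hash": prev_hash, "payload": payload}
    this_hash = hashlib.sha256(
        json.dumps(body, sort_keys=True, separators=(",", ":")).encode("utf-8")
    ).hexdigest()
    return {**body, "this_hash": this_hash}


def verify_chain(receipts):
    """True only if sequence numbers (0-based here) and hash linkage are intact."""
    prev = GENESIS
    for i, r in enumerate(receipts):
        if r["seq"] != i or r["prev_hash"] != prev:
            return False  # out-of-order sequence or broken linkage
        body = {k: r[k] for k in ("seq", "prev_hash", "payload")}
        expected = hashlib.sha256(
            json.dumps(body, sort_keys=True, separators=(",", ":")).encode("utf-8")
        ).hexdigest()
        if r["this_hash"] != expected:
            return False  # payload or header was tampered with
        prev = r["this_hash"]
    return True
```

Any mutation of an earlier receipt changes its `this_hash`, which breaks every later `prev_hash` link, so tampering and reordering are both detectable.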
+ +## What to generate -All primitives share these invariants: -- stdlib only, no network, no randomness, no global state. -- Deterministic: same inputs → same outputs. -- Testable in complete isolation. +Produce exactly the following files as fenced code blocks with the path as the language label. -## CommitBoundary definition +### 1. `spec/schemas/commit_boundary_spec.schema.json` -A **CommitBoundary** is a declarative, hash-bound envelope that wraps a single logical -commit action and asserts: +A JSON Schema (draft-07) describing a `BoundarySpec` document: +- `actor_id` (string, required) +- `action_classes` (array of strings, minItems 1, unique, required) +- `scope_paths` (array of strings, minItems 1, unique, required) +- `boundary_hash` (string, pattern `^[0-9a-f]{64}$`, required) -1. **Scope envelope** — the set of resource paths or identifiers the commit is permitted to touch. -2. **Actor binding** — which actor (by `actor_id`) owns this boundary instance. -3. **Action class constraint** — which `action_class` values are permitted inside this boundary. -4. **Boundary hash** — SHA-256 of the canonical (sorted, minified) JSON representation of fields 1–3. - Any mutation to scope, actor, or action constraints must produce a new hash. -5. **Violation detection** — given a `CommitRequest`, return `WITHIN` if the request is fully - contained by the boundary, or `BREACH` if any field falls outside the declared envelope. +### 2. `spec/schemas/boundary_verdict.schema.json` -## What to generate +A JSON Schema (draft-07) describing a `BoundaryVerdict` document: +- `verdict` (string, enum `["WITHIN", "BREACH"]`, required) +- `actor_id` (string, required) +- `action_class` (string, required) +- `scope_paths_requested` (array of strings, required) +- `boundary_hash` (string, pattern `^[0-9a-f]{64}$`, required) + +### 3. 
`spec/golden/build1/within.json` + +A golden vector representing a **WITHIN** verdict: +- actor and action class exactly match the spec +- all requested scope_paths are in the spec's scope_paths +- Include a `"_comment"` field: `"Happy path: all fields match"` + +### 4. `spec/golden/build1/breach_actor.json` + +A golden vector representing a **BREACH** verdict caused by wrong `actor_id`. +- Include a `"_comment"` field: `"actor_id mismatch triggers BREACH"` + +### 5. `spec/golden/build1/breach_action.json` + +A golden vector representing a **BREACH** verdict caused by a disallowed `action_class`. +- Include a `"_comment"` field: `"action_class not in spec triggers BREACH"` -Produce the following files. Output each file as a fenced code block with its path as the language tag. - -### 1. `commit_boundary/README.md` - -Include: -- One-paragraph description. -- Invariants table (at minimum: boundary_hash immutability, WITHIN/BREACH are the only verdicts, - scope matching is exact-string only, no side effects). -- Input/output field tables for `BoundarySpec` and `BoundaryVerdict`. -- Minimal usage example (Python snippet). -- How to run tests. - -### 2. `commit_boundary/src/commit_boundary/__init__.py` - -Export: `BoundarySpec`, `BoundaryVerdict`, `CommitBoundary`. - -### 3. `commit_boundary/src/commit_boundary/boundary.py` - -Implement: -- `BoundarySpec(actor_id, action_classes, scope_paths)` — immutable dataclass. - - `boundary_hash: str` — computed at construction, SHA-256 hex of canonical JSON. -- `BoundaryVerdict` — enum with `WITHIN` and `BREACH`. -- `CommitBoundary(spec: BoundarySpec)` — evaluator class. - - `evaluate(request: dict) -> BoundaryVerdict` — pure, no side effects. - - Returns `BREACH` if `request["actor_id"] != spec.actor_id`. - - Returns `BREACH` if `request["action_class"] not in spec.action_classes`. - - Returns `BREACH` if any path in `request.get("scope_paths", [])` is not in `spec.scope_paths`. - - Returns `WITHIN` otherwise. 
-- Canonicalisation: sort all collections before hashing; use `json.dumps(..., sort_keys=True, separators=(',',':'))`. - -### 4. `commit_boundary/tests/test_boundary.py` - -Cover all invariants with `pytest` tests: -- `WITHIN` verdict for a fully matching request. -- `BREACH` for wrong `actor_id`. -- `BREACH` for disallowed `action_class`. -- `BREACH` for out-of-scope path. -- Hash immutability: same spec constructed twice has the same `boundary_hash`. -- Hash sensitivity: mutating any spec field changes `boundary_hash`. -- No side effects: calling `evaluate()` does not mutate the spec. - -## Constraints - -- stdlib only (`hashlib`, `json`, `dataclasses`, `enum`). -- No new verdict values beyond `WITHIN` and `BREACH`. -- Scope matching: exact string match, no glob, no regex. -- `boundary_hash` must be 64-character lowercase hex SHA-256. -- Must be deterministic across Python 3.10, 3.11, 3.12. -- Do not import anything from `commit_gate`, `stop_machine`, `authority_gate`, - `invariant_litmus`, or `prometheus`. +### 6. `spec/golden/build1/breach_scope.json` + +A golden vector representing a **BREACH** verdict caused by an out-of-scope path. +- Include a `"_comment"` field: `"scope_path outside envelope triggers BREACH"` + +### 7. `spec/golden/build2/hold_no_approval.json` + +A golden vector representing a **HOLD** outcome: +- approval is required by the spec but no approval is present in the request +- Include a `"_comment"` field: `"approval required but absent -> HOLD, never ALLOW"` + +### 8. `tests/test_schemas.py` + +A `pytest` test file that: +- Loads `spec/schemas/commit_boundary_spec.schema.json` and `spec/schemas/boundary_verdict.schema.json` using only stdlib (`json`). +- Validates that each golden file in `spec/golden/build1/` and `spec/golden/build2/` is valid JSON (parseable). +- Checks that every golden verdict file contains a `"verdict"` key whose value is one of `["WITHIN", "BREACH", "HOLD"]`. 
+- Checks that every golden file contains a `"_comment"` key. +- Uses `jsonschema` if available, otherwise skips schema-validation tests with `pytest.importorskip`. + +### 9. `tests/test_golden_invariants.py` + +A `pytest` test file that encodes the hard decision invariants as parameterised checks against the golden vectors: +- For every golden file with `"verdict": "WITHIN"`: assert `actor_id`, `action_class`, and all `scope_paths_requested` are present and non-empty. +- For every golden file with `"verdict": "BREACH"`: assert at least one of actor, action, or scope is the cause (field `"breach_reason"` must be present). +- For `hold_no_approval.json`: assert `"verdict"` is `"HOLD"` and `"approval_present"` is `false`. +- No implementation logic — tests assert only the structure and values of the JSON golden files. + +### 10. `README.md` section — append only + +Append a new section at the end of the existing README with heading `## What is inspectable in v0`. + +Content must include: +- One sentence stating that v0 ships schemas and golden test vectors only; implementation is in a future PR. +- A bullet list of the schema files under `spec/schemas/`. +- A bullet list of the golden vector files under `spec/golden/`. +- A bullet list of the test files and what invariant each covers. +- The sentence: "All verdicts are fail-closed: missing policy → DENY; no Receipt → DENY; self-approval → DENY; approval required but absent → HOLD." ## Output format -Return only the file contents as fenced code blocks (path as the fence language label). -Do not add prose outside the code blocks. +Return only fenced code blocks, one per file, with the relative file path as the fence label. +Do not add any prose, explanation, or commentary outside the code blocks. 
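
The structure-only checks described for `tests/test_schemas.py` and `tests/test_golden_invariants.py` above can be sketched against a hypothetical golden vector — every field value below is an illustrative assumption, not repository data:

```python
import json

# Illustrative golden vector in the shape the prompt specifies.
WITHIN_VECTOR = json.dumps({
    "_comment": "Happy path: all fields match",
    "verdict": "WITHIN",
    "actor_id": "alice",
    "action_class": "write",
    "scope_paths_requested": ["src/app.py"],
    "boundary_hash": "a" * 64,
})


def check_golden(raw: str) -> str:
    """Assert only structure and values of the JSON — no implementation logic."""
    doc = json.loads(raw)                                    # must be parseable JSON
    assert "_comment" in doc                                 # every golden file
    assert doc["verdict"] in ("WITHIN", "BREACH", "HOLD")    # closed verdict set
    if doc["verdict"] == "WITHIN":
        assert doc["actor_id"] and doc["action_class"]       # present and non-empty
        assert all(doc["scope_paths_requested"])
    if doc["verdict"] == "BREACH":
        assert "breach_reason" in doc                        # cause must be named
    return doc["verdict"]
```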
+ From 1cca11f02254aab9396f73e959c12821e212576f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 2 Mar 2026 00:09:00 +0000 Subject: [PATCH 4/4] Add decision_space_ledger: deterministic inspection utility for decision-space snapshots Co-authored-by: LalaSkye <228581229+LalaSkye@users.noreply.github.com> --- decision_space_ledger/README.md | 123 ++++ .../src/decision_space_ledger/__init__.py | 33 + .../src/decision_space_ledger/__main__.py | 7 + .../src/decision_space_ledger/canonicalise.py | 46 ++ .../src/decision_space_ledger/cli.py | 70 +++ .../decision_space_snapshot_v1.schema.json | 49 ++ .../src/decision_space_ledger/diff.py | 95 +++ .../src/decision_space_ledger/schema.py | 171 +++++ decision_space_ledger/tests/test_ledger.py | 589 ++++++++++++++++++ 9 files changed, 1183 insertions(+) create mode 100644 decision_space_ledger/README.md create mode 100644 decision_space_ledger/src/decision_space_ledger/__init__.py create mode 100644 decision_space_ledger/src/decision_space_ledger/__main__.py create mode 100644 decision_space_ledger/src/decision_space_ledger/canonicalise.py create mode 100644 decision_space_ledger/src/decision_space_ledger/cli.py create mode 100644 decision_space_ledger/src/decision_space_ledger/decision_space_snapshot_v1.schema.json create mode 100644 decision_space_ledger/src/decision_space_ledger/diff.py create mode 100644 decision_space_ledger/src/decision_space_ledger/schema.py create mode 100644 decision_space_ledger/tests/test_ledger.py diff --git a/decision_space_ledger/README.md b/decision_space_ledger/README.md new file mode 100644 index 0000000..cdea022 --- /dev/null +++ b/decision_space_ledger/README.md @@ -0,0 +1,123 @@ +# decision_space_ledger + +Deterministic inspection utility for versioned decision-space snapshots. + +**Inspection only.** This primitive does not modify any enforcement, gating, authority, or runtime logic. 
It reads snapshots, validates them, canonicalises them, hashes them, and produces structured diffs. + +--- + +## Schema — `decision_space_snapshot_v1` + +| Field | Type | Constraints | +|-------|------|-------------| +| `version` | string | Must be exactly `"v1"` | +| `variables` | array of strings | Unique items | +| `allowed_transitions` | array of `{from, to}` objects | Each item has exactly `from` (string) and `to` (string) | +| `exclusions` | array of strings | Unique items | +| `reason_code_families` | object → array of strings | Each value array has unique items | + +The canonical JSON Schema (draft-07) is in `src/decision_space_ledger/decision_space_snapshot_v1.schema.json`. + +--- + +## Invariants + +| # | Invariant | +|---|-----------| +| 1 | `validate()` raises `ValueError` for any schema violation — no silent pass-through | +| 2 | `canonicalise()` is order-independent: logically equivalent snapshots produce byte-identical output | +| 3 | `canonical_hash()` is exactly 64 lowercase hex characters (SHA-256) | +| 4 | Same inputs always produce the same outputs (deterministic, no randomness, no global state) | +| 5 | `diff()` output lists are always sorted (deterministic across runs) | +| 6 | No side effects — no I/O, no logging, no state mutation | + +--- + +## Interface + +```python +from decision_space_ledger import validate, canonicalise, canonical_hash, diff + +snapshot = { + "version": "v1", + "variables": ["APPROVE", "DENY", "HOLD"], + "allowed_transitions": [ + {"from": "HOLD", "to": "APPROVE"}, + {"from": "HOLD", "to": "DENY"}, + ], + "exclusions": [], + "reason_code_families": { + "approval": ["MANUAL", "AUTO"], + }, +} + +validate(snapshot) # raises ValueError on any violation +h = canonical_hash(snapshot) # 64-char lowercase hex SHA-256 +delta = diff(snapshot, other) # structured diff dict +``` + +--- + +## CLI + +Compare two snapshot files: + +```bash +python -m decision_space_ledger snapshot_a.json snapshot_b.json +``` + +Output (JSON to 
stdout): + +```json +{ + "snapshot_a": {"path": "snapshot_a.json", "hash": ""}, + "snapshot_b": {"path": "snapshot_b.json", "hash": ""}, + "diff": { + "is_identical": false, + "variables_added": ["NEW_STATE"], + "variables_removed": [], + "transitions_added": [{"from": "HOLD", "to": "NEW_STATE"}], + "transitions_removed": [], + "exclusions_added": [], + "exclusions_removed": [], + "reason_code_families_added": [], + "reason_code_families_removed": [], + "reason_code_families_changed": {} + } +} +``` + +Exit codes: `0` success, `1` I/O or validation error, `2` incorrect usage. + +--- + +## Run Tests + +```bash +pytest decision_space_ledger/tests/ -v +``` + +Or from the repository root (tests are discovered automatically): + +```bash +pytest -q +``` + +--- + +## Module layout + +``` +decision_space_ledger/ + src/decision_space_ledger/ + __init__.py # public exports + schema.py # SCHEMA constant + validate() + decision_space_snapshot_v1.schema.json # JSON Schema draft-07 reference + canonicalise.py # canonicalise() + canonical_hash() + diff.py # diff() + cli.py # CLI main() + __main__.py # python -m entry point + tests/ + test_ledger.py # full unit tests (62 tests) + README.md +``` diff --git a/decision_space_ledger/src/decision_space_ledger/__init__.py b/decision_space_ledger/src/decision_space_ledger/__init__.py new file mode 100644 index 0000000..c17ee18 --- /dev/null +++ b/decision_space_ledger/src/decision_space_ledger/__init__.py @@ -0,0 +1,33 @@ +"""Decision-Space Diff Ledger — inspection utility for versioned decision-space snapshots. + +Inspection only. No modification of enforcement, gating, authority, or runtime logic. 
+ +Public API:: + + from decision_space_ledger import validate, canonicalise, canonical_hash, diff + + snapshot = { + "version": "v1", + "variables": ["A", "B"], + "allowed_transitions": [{"from": "A", "to": "B"}], + "exclusions": [], + "reason_code_families": {"approval": ["MANUAL", "AUTO"]}, + } + validate(snapshot) + h = canonical_hash(snapshot) + delta = diff(snapshot, other_snapshot) +""" + +from .canonicalise import canonical_hash, canonicalise +from .diff import diff +from .schema import SCHEMA, validate + +__version__ = "0.1.0" + +__all__ = [ + "SCHEMA", + "canonicalise", + "canonical_hash", + "diff", + "validate", +] diff --git a/decision_space_ledger/src/decision_space_ledger/__main__.py b/decision_space_ledger/src/decision_space_ledger/__main__.py new file mode 100644 index 0000000..2c27f86 --- /dev/null +++ b/decision_space_ledger/src/decision_space_ledger/__main__.py @@ -0,0 +1,7 @@ +"""Allow running as: python -m decision_space_ledger """ + +import sys + +from .cli import main + +sys.exit(main()) diff --git a/decision_space_ledger/src/decision_space_ledger/canonicalise.py b/decision_space_ledger/src/decision_space_ledger/canonicalise.py new file mode 100644 index 0000000..0b94e36 --- /dev/null +++ b/decision_space_ledger/src/decision_space_ledger/canonicalise.py @@ -0,0 +1,46 @@ +"""Deterministic canonicalisation and SHA-256 hashing for decision_space_snapshot_v1. + +Canonicalisation rules: +- ``variables`` — sorted lexicographically. +- ``allowed_transitions`` — sorted by (from, to). +- ``exclusions`` — sorted lexicographically. +- ``reason_code_families`` — each code list sorted lexicographically; + the families object itself uses sorted keys (via json.dumps sort_keys=True). +- Final JSON: sort_keys=True, no whitespace, UTF-8 encoded. + +The hash is SHA-256 of the canonical bytes, returned as a lowercase hex string +(64 characters). 
+""" + +import hashlib +import json + + +def canonicalise(snapshot): + """Return canonical JSON bytes of a validated snapshot. + + All order-independent arrays are sorted before serialisation so that + logically equivalent snapshots produce byte-identical output regardless + of insertion order. + """ + canonical = { + "version": snapshot["version"], + "variables": sorted(snapshot["variables"]), + "allowed_transitions": sorted( + snapshot["allowed_transitions"], + key=lambda t: (t["from"], t["to"]), + ), + "exclusions": sorted(snapshot["exclusions"]), + "reason_code_families": { + family: sorted(codes) + for family, codes in snapshot["reason_code_families"].items() + }, + } + return json.dumps( + canonical, sort_keys=True, separators=(",", ":"), ensure_ascii=False + ).encode("utf-8") + + +def canonical_hash(snapshot): + """Return SHA-256 hex digest (lowercase, 64 chars) of the canonical snapshot.""" + return hashlib.sha256(canonicalise(snapshot)).hexdigest() diff --git a/decision_space_ledger/src/decision_space_ledger/cli.py b/decision_space_ledger/src/decision_space_ledger/cli.py new file mode 100644 index 0000000..f8a6398 --- /dev/null +++ b/decision_space_ledger/src/decision_space_ledger/cli.py @@ -0,0 +1,70 @@ +"""CLI entry point for the decision_space_ledger. + +Usage:: + + python -m decision_space_ledger + +Validates both snapshots, computes their canonical SHA-256 hashes, produces a +structured diff, and writes the result as JSON to stdout. 
+ +Exit codes: + 0 — success + 1 — file I/O or validation error + 2 — incorrect usage +""" + +import json +import sys + +from .canonicalise import canonical_hash +from .diff import diff +from .schema import validate + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + + if len(argv) != 2: + print( + "Usage: python -m decision_space_ledger ", + file=sys.stderr, + ) + return 2 + + path_a, path_b = argv[0], argv[1] + + try: + with open(path_a, encoding="utf-8") as fh: + snap_a = json.load(fh) + except (OSError, json.JSONDecodeError) as exc: + print(f"Error reading {path_a}: {exc}", file=sys.stderr) + return 1 + + try: + with open(path_b, encoding="utf-8") as fh: + snap_b = json.load(fh) + except (OSError, json.JSONDecodeError) as exc: + print(f"Error reading {path_b}: {exc}", file=sys.stderr) + return 1 + + try: + validate(snap_a) + except ValueError as exc: + print(f"Validation error in {path_a}: {exc}", file=sys.stderr) + return 1 + + try: + validate(snap_b) + except ValueError as exc: + print(f"Validation error in {path_b}: {exc}", file=sys.stderr) + return 1 + + result = { + "snapshot_a": {"path": path_a, "hash": canonical_hash(snap_a)}, + "snapshot_b": {"path": path_b, "hash": canonical_hash(snap_b)}, + "diff": diff(snap_a, snap_b), + } + + print(json.dumps(result, indent=2)) + return 0 diff --git a/decision_space_ledger/src/decision_space_ledger/decision_space_snapshot_v1.schema.json b/decision_space_ledger/src/decision_space_ledger/decision_space_snapshot_v1.schema.json new file mode 100644 index 0000000..7b87320 --- /dev/null +++ b/decision_space_ledger/src/decision_space_ledger/decision_space_snapshot_v1.schema.json @@ -0,0 +1,49 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "decision_space_snapshot_v1", + "description": "A versioned snapshot of a decision space: variables, transitions, exclusions, and reason-code families.", + "type": "object", + "required": ["version", "variables", "allowed_transitions", 
"exclusions", "reason_code_families"], + "additionalProperties": false, + "properties": { + "version": { + "type": "string", + "enum": ["v1"], + "description": "Schema version identifier." + }, + "variables": { + "type": "array", + "items": {"type": "string"}, + "uniqueItems": true, + "description": "The set of decision variables in this space." + }, + "allowed_transitions": { + "type": "array", + "items": { + "type": "object", + "required": ["from", "to"], + "additionalProperties": false, + "properties": { + "from": {"type": "string", "description": "Source state."}, + "to": {"type": "string", "description": "Target state."} + } + }, + "description": "Permitted state transitions." + }, + "exclusions": { + "type": "array", + "items": {"type": "string"}, + "uniqueItems": true, + "description": "Variable names or transition identifiers explicitly excluded from the space." + }, + "reason_code_families": { + "type": "object", + "additionalProperties": { + "type": "array", + "items": {"type": "string"}, + "uniqueItems": true + }, + "description": "Named families of reason codes. Each key is a family name; each value is a list of unique reason code strings." + } + } +} diff --git a/decision_space_ledger/src/decision_space_ledger/diff.py b/decision_space_ledger/src/decision_space_ledger/diff.py new file mode 100644 index 0000000..83eb64a --- /dev/null +++ b/decision_space_ledger/src/decision_space_ledger/diff.py @@ -0,0 +1,95 @@ +"""Structured diff between two decision_space_snapshot_v1 instances. + +All diff results are deterministic: lists are sorted lexicographically so the +same logical change always produces identical output. +""" + + +def diff(snapshot_a, snapshot_b): + """Compute a structured diff between two validated snapshots. + + Both snapshots must already have been validated with ``schema.validate``. + + Returns a dict with the following keys: + + ``is_identical`` + True if the two snapshots are logically equivalent. 
+ + ``variables_added`` / ``variables_removed`` + Sorted lists of variable names present in b but not a (or vice versa). + + ``transitions_added`` / ``transitions_removed`` + Sorted lists of ``{"from": ..., "to": ...}`` dicts present in b but + not a (or vice versa). Sorted by (from, to). + + ``exclusions_added`` / ``exclusions_removed`` + Sorted lists of exclusion strings. + + ``reason_code_families_added`` / ``reason_code_families_removed`` + Sorted lists of family names new in b / absent from b. + + ``reason_code_families_changed`` + Dict mapping each changed family name to + ``{"codes_added": [...], "codes_removed": [...]}``. + """ + vars_a = set(snapshot_a["variables"]) + vars_b = set(snapshot_b["variables"]) + + trans_a = {(t["from"], t["to"]) for t in snapshot_a["allowed_transitions"]} + trans_b = {(t["from"], t["to"]) for t in snapshot_b["allowed_transitions"]} + + excl_a = set(snapshot_a["exclusions"]) + excl_b = set(snapshot_b["exclusions"]) + + fam_a = snapshot_a["reason_code_families"] + fam_b = snapshot_b["reason_code_families"] + + families_added = sorted(set(fam_b) - set(fam_a)) + families_removed = sorted(set(fam_a) - set(fam_b)) + + families_changed = {} + for family in sorted(set(fam_a) & set(fam_b)): + codes_a = set(fam_a[family]) + codes_b = set(fam_b[family]) + added = sorted(codes_b - codes_a) + removed = sorted(codes_a - codes_b) + if added or removed: + families_changed[family] = {"codes_added": added, "codes_removed": removed} + + variables_added = sorted(vars_b - vars_a) + variables_removed = sorted(vars_a - vars_b) + + transitions_added = sorted(trans_b - trans_a, key=lambda t: (t[0], t[1])) + transitions_removed = sorted(trans_a - trans_b, key=lambda t: (t[0], t[1])) + + exclusions_added = sorted(excl_b - excl_a) + exclusions_removed = sorted(excl_a - excl_b) + + is_identical = not any([ + variables_added, + variables_removed, + transitions_added, + transitions_removed, + exclusions_added, + exclusions_removed, + families_added, + 
families_removed, + families_changed, + ]) + + return { + "is_identical": is_identical, + "variables_added": variables_added, + "variables_removed": variables_removed, + "transitions_added": [ + {"from": t[0], "to": t[1]} for t in transitions_added + ], + "transitions_removed": [ + {"from": t[0], "to": t[1]} for t in transitions_removed + ], + "exclusions_added": exclusions_added, + "exclusions_removed": exclusions_removed, + "reason_code_families_added": families_added, + "reason_code_families_removed": families_removed, + "reason_code_families_changed": families_changed, + } diff --git a/decision_space_ledger/src/decision_space_ledger/schema.py b/decision_space_ledger/src/decision_space_ledger/schema.py new file mode 100644 index 0000000..ecb0f54 --- /dev/null +++ b/decision_space_ledger/src/decision_space_ledger/schema.py @@ -0,0 +1,171 @@ +"""Schema definition and strict validation for decision_space_snapshot_v1. + +Validates without any third-party dependencies. All checks are explicit and +deterministic. Raises ValueError with a descriptive message on any violation. 
+""" + +SCHEMA = { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "decision_space_snapshot_v1", + "type": "object", + "required": [ + "version", + "variables", + "allowed_transitions", + "exclusions", + "reason_code_families", + ], + "additionalProperties": False, + "properties": { + "version": {"type": "string", "enum": ["v1"]}, + "variables": { + "type": "array", + "items": {"type": "string"}, + "uniqueItems": True, + }, + "allowed_transitions": { + "type": "array", + "items": { + "type": "object", + "required": ["from", "to"], + "additionalProperties": False, + "properties": { + "from": {"type": "string"}, + "to": {"type": "string"}, + }, + }, + }, + "exclusions": { + "type": "array", + "items": {"type": "string"}, + "uniqueItems": True, + }, + "reason_code_families": { + "type": "object", + "additionalProperties": { + "type": "array", + "items": {"type": "string"}, + "uniqueItems": True, + }, + }, + }, +} + +_REQUIRED_KEYS = frozenset(SCHEMA["required"]) +_ALLOWED_KEYS = frozenset(SCHEMA["properties"]) + + +def validate(snapshot): + """Validate a decision_space_snapshot_v1 dict strictly. + + Returns the snapshot unchanged if valid. + Raises ValueError with a descriptive message on any violation. 
+ """ + if not isinstance(snapshot, dict): + raise ValueError( + f"snapshot must be a dict, got {type(snapshot).__name__}" + ) + + extra_keys = set(snapshot) - _ALLOWED_KEYS + if extra_keys: + raise ValueError( + f"unexpected keys in snapshot: {sorted(extra_keys)}" + ) + + for key in sorted(_REQUIRED_KEYS): + if key not in snapshot: + raise ValueError(f"required key missing: '{key}'") + + # version + version = snapshot["version"] + if not isinstance(version, str): + raise ValueError( + f"version must be a string, got {type(version).__name__}" + ) + if version != "v1": + raise ValueError(f"version must be 'v1', got {version!r}") + + # variables + variables = snapshot["variables"] + if not isinstance(variables, list): + raise ValueError( + f"variables must be a list, got {type(variables).__name__}" + ) + for i, v in enumerate(variables): + if not isinstance(v, str): + raise ValueError( + f"variables[{i}] must be a string, got {type(v).__name__}" + ) + if len(variables) != len(set(variables)): + raise ValueError("variables must contain unique items") + + # allowed_transitions + transitions = snapshot["allowed_transitions"] + if not isinstance(transitions, list): + raise ValueError( + f"allowed_transitions must be a list, got {type(transitions).__name__}" + ) + for i, t in enumerate(transitions): + if not isinstance(t, dict): + raise ValueError( + f"allowed_transitions[{i}] must be an object, got {type(t).__name__}" + ) + extra_t = set(t) - {"from", "to"} + if extra_t: + raise ValueError( + f"allowed_transitions[{i}] has unexpected keys: {sorted(extra_t)}" + ) + for field in ("from", "to"): + if field not in t: + raise ValueError( + f"allowed_transitions[{i}] missing required key '{field}'" + ) + if not isinstance(t[field], str): + raise ValueError( + f"allowed_transitions[{i}]['{field}'] must be a string, " + f"got {type(t[field]).__name__}" + ) + + # exclusions + exclusions = snapshot["exclusions"] + if not isinstance(exclusions, list): + raise ValueError( + 
f"exclusions must be a list, got {type(exclusions).__name__}" + ) + for i, e in enumerate(exclusions): + if not isinstance(e, str): + raise ValueError( + f"exclusions[{i}] must be a string, got {type(e).__name__}" + ) + if len(exclusions) != len(set(exclusions)): + raise ValueError("exclusions must contain unique items") + + # reason_code_families + families = snapshot["reason_code_families"] + if not isinstance(families, dict): + raise ValueError( + f"reason_code_families must be an object, got {type(families).__name__}" + ) + for family_name, codes in families.items(): + if not isinstance(family_name, str): + raise ValueError( + f"reason_code_families key must be a string, " + f"got {type(family_name).__name__}" + ) + if not isinstance(codes, list): + raise ValueError( + f"reason_code_families['{family_name}'] must be a list, " + f"got {type(codes).__name__}" + ) + for i, code in enumerate(codes): + if not isinstance(code, str): + raise ValueError( + f"reason_code_families['{family_name}'][{i}] must be a string, " + f"got {type(code).__name__}" + ) + if len(codes) != len(set(codes)): + raise ValueError( + f"reason_code_families['{family_name}'] must contain unique items" + ) + + return snapshot diff --git a/decision_space_ledger/tests/test_ledger.py b/decision_space_ledger/tests/test_ledger.py new file mode 100644 index 0000000..c8b6c57 --- /dev/null +++ b/decision_space_ledger/tests/test_ledger.py @@ -0,0 +1,589 @@ +"""Tests for decision_space_ledger — schema validation, canonicalisation, hashing, diff, CLI.""" + +import json +import sys +import tempfile +from pathlib import Path + +# Add src to path so tests can be run from repository root via pytest -q +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src")) + +from decision_space_ledger import SCHEMA, canonical_hash, canonicalise, diff, validate +from decision_space_ledger.cli import main + + +# --------------------------------------------------------------------------- +# Shared fixtures +# 
--------------------------------------------------------------------------- + +MINIMAL = { + "version": "v1", + "variables": ["A", "B"], + "allowed_transitions": [{"from": "A", "to": "B"}], + "exclusions": [], + "reason_code_families": {}, +} + +FULL = { + "version": "v1", + "variables": ["APPROVE", "DENY", "HOLD", "REVIEW"], + "allowed_transitions": [ + {"from": "REVIEW", "to": "APPROVE"}, + {"from": "REVIEW", "to": "DENY"}, + {"from": "REVIEW", "to": "HOLD"}, + {"from": "HOLD", "to": "APPROVE"}, + {"from": "HOLD", "to": "DENY"}, + ], + "exclusions": ["DENY"], + "reason_code_families": { + "approval": ["MANUAL", "AUTO"], + "rejection": ["POLICY_VIOLATION", "TIMEOUT"], + }, +} + + +def _copy(d): + return json.loads(json.dumps(d)) + + +# --------------------------------------------------------------------------- +# Schema: SCHEMA constant +# --------------------------------------------------------------------------- + +def test_schema_constant_has_required_keys(): + assert SCHEMA["title"] == "decision_space_snapshot_v1" + assert "properties" in SCHEMA + required = SCHEMA["required"] + for key in ("version", "variables", "allowed_transitions", "exclusions", "reason_code_families"): + assert key in required + + +# --------------------------------------------------------------------------- +# Validation: valid inputs +# --------------------------------------------------------------------------- + +def test_validate_minimal_snapshot(): + result = validate(_copy(MINIMAL)) + assert result["version"] == "v1" + + +def test_validate_full_snapshot(): + result = validate(_copy(FULL)) + assert result is not None + + +def test_validate_returns_snapshot_unchanged(): + snap = _copy(MINIMAL) + result = validate(snap) + assert result is snap + + +def test_validate_empty_variables_allowed(): + snap = _copy(MINIMAL) + snap["variables"] = [] + validate(snap) + + +def test_validate_empty_transitions_allowed(): + snap = _copy(MINIMAL) + snap["allowed_transitions"] = [] + validate(snap) + 
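Reviewer note: the validation style these tests exercise is plain stdlib type-checking, no jsonschema dependency. A minimal self-contained sketch of the pattern (the name `check_variables` is illustrative, not part of the patch):

```python
def check_variables(variables):
    # Mirrors the pattern in schema.validate: explicit isinstance checks,
    # uniqueness via a set comparison, ValueError on the first violation.
    if not isinstance(variables, list):
        raise ValueError(f"variables must be a list, got {type(variables).__name__}")
    for i, v in enumerate(variables):
        if not isinstance(v, str):
            raise ValueError(f"variables[{i}] must be a string, got {type(v).__name__}")
    if len(variables) != len(set(variables)):
        raise ValueError("variables must contain unique items")
    return variables


print(check_variables(["A", "B"]))  # ['A', 'B']
try:
    check_variables(["A", "A"])
except ValueError as exc:
    print(f"rejected: {exc}")  # rejected: variables must contain unique items
```

Because each check raises immediately with a field-specific message, `pytest.raises(ValueError, match=...)` can pin every failure mode individually, as the tests below do.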
+ +def test_validate_empty_reason_code_families_allowed(): + validate(_copy(MINIMAL)) + + +# --------------------------------------------------------------------------- +# Validation: invalid inputs — top-level +# --------------------------------------------------------------------------- + +def test_validate_not_a_dict(): + import pytest + with pytest.raises(ValueError, match="dict"): + validate([1, 2, 3]) + + +def test_validate_extra_key_rejected(): + import pytest + snap = _copy(MINIMAL) + snap["unexpected_field"] = "oops" + with pytest.raises(ValueError, match="unexpected keys"): + validate(snap) + + +def test_validate_missing_version(): + import pytest + snap = _copy(MINIMAL) + del snap["version"] + with pytest.raises(ValueError, match="version"): + validate(snap) + + +def test_validate_missing_variables(): + import pytest + snap = _copy(MINIMAL) + del snap["variables"] + with pytest.raises(ValueError, match="variables"): + validate(snap) + + +def test_validate_missing_allowed_transitions(): + import pytest + snap = _copy(MINIMAL) + del snap["allowed_transitions"] + with pytest.raises(ValueError, match="allowed_transitions"): + validate(snap) + + +def test_validate_missing_exclusions(): + import pytest + snap = _copy(MINIMAL) + del snap["exclusions"] + with pytest.raises(ValueError, match="exclusions"): + validate(snap) + + +def test_validate_missing_reason_code_families(): + import pytest + snap = _copy(MINIMAL) + del snap["reason_code_families"] + with pytest.raises(ValueError, match="reason_code_families"): + validate(snap) + + +# --------------------------------------------------------------------------- +# Validation: invalid inputs — version +# --------------------------------------------------------------------------- + +def test_validate_wrong_version(): + import pytest + snap = _copy(MINIMAL) + snap["version"] = "v2" + with pytest.raises(ValueError, match="v1"): + validate(snap) + + +def test_validate_version_not_string(): + import pytest + snap = 
_copy(MINIMAL) + snap["version"] = 1 + with pytest.raises(ValueError, match="string"): + validate(snap) + + +# --------------------------------------------------------------------------- +# Validation: invalid inputs — variables +# --------------------------------------------------------------------------- + +def test_validate_variables_not_list(): + import pytest + snap = _copy(MINIMAL) + snap["variables"] = "A" + with pytest.raises(ValueError, match="list"): + validate(snap) + + +def test_validate_variables_non_string_item(): + import pytest + snap = _copy(MINIMAL) + snap["variables"] = ["A", 42] + with pytest.raises(ValueError, match="string"): + validate(snap) + + +def test_validate_variables_duplicates_rejected(): + import pytest + snap = _copy(MINIMAL) + snap["variables"] = ["A", "A"] + with pytest.raises(ValueError, match="unique"): + validate(snap) + + +# --------------------------------------------------------------------------- +# Validation: invalid inputs — allowed_transitions +# --------------------------------------------------------------------------- + +def test_validate_transitions_not_list(): + import pytest + snap = _copy(MINIMAL) + snap["allowed_transitions"] = "A->B" + with pytest.raises(ValueError, match="list"): + validate(snap) + + +def test_validate_transition_not_dict(): + import pytest + snap = _copy(MINIMAL) + snap["allowed_transitions"] = ["A->B"] + with pytest.raises(ValueError, match="object"): + validate(snap) + + +def test_validate_transition_missing_from(): + import pytest + snap = _copy(MINIMAL) + snap["allowed_transitions"] = [{"to": "B"}] + with pytest.raises(ValueError, match="'from'"): + validate(snap) + + +def test_validate_transition_missing_to(): + import pytest + snap = _copy(MINIMAL) + snap["allowed_transitions"] = [{"from": "A"}] + with pytest.raises(ValueError, match="'to'"): + validate(snap) + + +def test_validate_transition_extra_key_rejected(): + import pytest + snap = _copy(MINIMAL) + snap["allowed_transitions"] = 
[{"from": "A", "to": "B", "weight": 1}] + with pytest.raises(ValueError, match="unexpected keys"): + validate(snap) + + +def test_validate_transition_from_not_string(): + import pytest + snap = _copy(MINIMAL) + snap["allowed_transitions"] = [{"from": 0, "to": "B"}] + with pytest.raises(ValueError, match="string"): + validate(snap) + + +# --------------------------------------------------------------------------- +# Validation: invalid inputs — exclusions +# --------------------------------------------------------------------------- + +def test_validate_exclusions_not_list(): + import pytest + snap = _copy(MINIMAL) + snap["exclusions"] = "X" + with pytest.raises(ValueError, match="list"): + validate(snap) + + +def test_validate_exclusions_non_string_item(): + import pytest + snap = _copy(MINIMAL) + snap["exclusions"] = [99] + with pytest.raises(ValueError, match="string"): + validate(snap) + + +def test_validate_exclusions_duplicates_rejected(): + import pytest + snap = _copy(MINIMAL) + snap["exclusions"] = ["X", "X"] + with pytest.raises(ValueError, match="unique"): + validate(snap) + + +# --------------------------------------------------------------------------- +# Validation: invalid inputs — reason_code_families +# --------------------------------------------------------------------------- + +def test_validate_reason_code_families_not_dict(): + import pytest + snap = _copy(MINIMAL) + snap["reason_code_families"] = ["a"] + with pytest.raises(ValueError, match="object"): + validate(snap) + + +def test_validate_reason_code_family_value_not_list(): + import pytest + snap = _copy(MINIMAL) + snap["reason_code_families"] = {"approval": "MANUAL"} + with pytest.raises(ValueError, match="list"): + validate(snap) + + +def test_validate_reason_code_family_item_not_string(): + import pytest + snap = _copy(MINIMAL) + snap["reason_code_families"] = {"approval": [1, 2]} + with pytest.raises(ValueError, match="string"): + validate(snap) + + +def 
test_validate_reason_code_family_duplicates_rejected(): + import pytest + snap = _copy(MINIMAL) + snap["reason_code_families"] = {"approval": ["MANUAL", "MANUAL"]} + with pytest.raises(ValueError, match="unique"): + validate(snap) + + +# --------------------------------------------------------------------------- +# Canonicalisation +# --------------------------------------------------------------------------- + +def test_canonicalise_returns_bytes(): + result = canonicalise(_copy(MINIMAL)) + assert isinstance(result, bytes) + + +def test_canonicalise_deterministic(): + """Same logical snapshot always produces identical bytes.""" + a = canonicalise(_copy(MINIMAL)) + b = canonicalise(_copy(MINIMAL)) + assert a == b + + +def test_canonicalise_order_independent_variables(): + """variables are sorted, so insertion order must not matter.""" + snap1 = _copy(MINIMAL) + snap1["variables"] = ["B", "A"] + snap2 = _copy(MINIMAL) + snap2["variables"] = ["A", "B"] + assert canonicalise(snap1) == canonicalise(snap2) + + +def test_canonicalise_order_independent_transitions(): + snap1 = _copy(FULL) + snap2 = _copy(FULL) + snap2["allowed_transitions"] = list(reversed(snap2["allowed_transitions"])) + assert canonicalise(snap1) == canonicalise(snap2) + + +def test_canonicalise_order_independent_exclusions(): + snap1 = _copy(FULL) + snap1["exclusions"] = ["Y", "X"] + snap2 = _copy(FULL) + snap2["exclusions"] = ["X", "Y"] + assert canonicalise(snap1) == canonicalise(snap2) + + +def test_canonicalise_order_independent_reason_codes(): + snap1 = _copy(FULL) + snap1["reason_code_families"]["approval"] = ["AUTO", "MANUAL"] + snap2 = _copy(FULL) + snap2["reason_code_families"]["approval"] = ["MANUAL", "AUTO"] + assert canonicalise(snap1) == canonicalise(snap2) + + +def test_canonicalise_different_snapshots_differ(): + snap1 = _copy(MINIMAL) + snap2 = _copy(MINIMAL) + snap2["variables"] = ["A", "B", "C"] + assert canonicalise(snap1) != canonicalise(snap2) + + +# 
--------------------------------------------------------------------------- +# Hashing +# --------------------------------------------------------------------------- + +def test_hash_is_64_char_hex_lowercase(): + h = canonical_hash(_copy(MINIMAL)) + assert isinstance(h, str) + assert len(h) == 64 + assert h == h.lower() + int(h, 16) # must be valid hex + + +def test_hash_deterministic(): + h1 = canonical_hash(_copy(MINIMAL)) + h2 = canonical_hash(_copy(MINIMAL)) + assert h1 == h2 + + +def test_hash_order_independent(): + snap1 = _copy(FULL) + snap2 = _copy(FULL) + snap2["variables"] = list(reversed(snap2["variables"])) + assert canonical_hash(snap1) == canonical_hash(snap2) + + +def test_hash_changes_on_mutation(): + snap1 = _copy(MINIMAL) + snap2 = _copy(MINIMAL) + snap2["variables"] = ["Z"] + assert canonical_hash(snap1) != canonical_hash(snap2) + + +# --------------------------------------------------------------------------- +# Diff +# --------------------------------------------------------------------------- + +def test_diff_identical_snapshots(): + result = diff(_copy(FULL), _copy(FULL)) + assert result["is_identical"] is True + assert result["variables_added"] == [] + assert result["variables_removed"] == [] + assert result["transitions_added"] == [] + assert result["transitions_removed"] == [] + assert result["exclusions_added"] == [] + assert result["exclusions_removed"] == [] + assert result["reason_code_families_added"] == [] + assert result["reason_code_families_removed"] == [] + assert result["reason_code_families_changed"] == {} + + +def test_diff_variable_added(): + a = _copy(MINIMAL) + b = _copy(MINIMAL) + b["variables"] = ["A", "B", "C"] + result = diff(a, b) + assert result["variables_added"] == ["C"] + assert result["variables_removed"] == [] + assert result["is_identical"] is False + + +def test_diff_variable_removed(): + a = _copy(FULL) + b = _copy(FULL) + b["variables"] = [v for v in b["variables"] if v != "HOLD"] + result = diff(a, b) + 
assert "HOLD" in result["variables_removed"] + assert result["is_identical"] is False + + +def test_diff_transition_added(): + a = _copy(MINIMAL) + b = _copy(MINIMAL) + b["allowed_transitions"] = [{"from": "A", "to": "B"}, {"from": "B", "to": "A"}] + result = diff(a, b) + assert {"from": "B", "to": "A"} in result["transitions_added"] + assert result["is_identical"] is False + + +def test_diff_transition_removed(): + a = _copy(FULL) + b = _copy(FULL) + b["allowed_transitions"] = [ + t for t in b["allowed_transitions"] + if not (t["from"] == "HOLD" and t["to"] == "DENY") + ] + result = diff(a, b) + assert {"from": "HOLD", "to": "DENY"} in result["transitions_removed"] + + +def test_diff_exclusion_added(): + a = _copy(MINIMAL) + b = _copy(MINIMAL) + b["exclusions"] = ["X"] + result = diff(a, b) + assert result["exclusions_added"] == ["X"] + + +def test_diff_exclusion_removed(): + a = _copy(FULL) + b = _copy(FULL) + b["exclusions"] = [] + result = diff(a, b) + assert "DENY" in result["exclusions_removed"] + + +def test_diff_family_added(): + a = _copy(MINIMAL) + b = _copy(MINIMAL) + b["reason_code_families"] = {"new_family": ["CODE1"]} + result = diff(a, b) + assert "new_family" in result["reason_code_families_added"] + assert result["is_identical"] is False + + +def test_diff_family_removed(): + a = _copy(FULL) + b = _copy(FULL) + del b["reason_code_families"]["rejection"] + result = diff(a, b) + assert "rejection" in result["reason_code_families_removed"] + + +def test_diff_family_codes_changed(): + a = _copy(FULL) + b = _copy(FULL) + b["reason_code_families"]["approval"] = ["MANUAL", "AUTO", "OVERRIDE"] + result = diff(a, b) + changed = result["reason_code_families_changed"] + assert "approval" in changed + assert "OVERRIDE" in changed["approval"]["codes_added"] + assert changed["approval"]["codes_removed"] == [] + + +def test_diff_is_deterministic(): + """Running diff twice on the same inputs returns the same result.""" + r1 = diff(_copy(FULL), _copy(MINIMAL)) + r2 
= diff(_copy(FULL), _copy(MINIMAL)) + assert r1 == r2 + + +def test_diff_sorted_outputs(): + a = _copy(MINIMAL) + a["variables"] = [] + b = _copy(MINIMAL) + b["variables"] = ["C", "A", "B"] + result = diff(a, b) + assert result["variables_added"] == sorted(result["variables_added"]) + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +def _write_json(path, obj): + Path(path).write_text(json.dumps(obj), encoding="utf-8") + + +def test_cli_compare_identical(capsys): + with tempfile.TemporaryDirectory() as tmp: + a_path = str(Path(tmp) / "a.json") + b_path = str(Path(tmp) / "b.json") + _write_json(a_path, _copy(FULL)) + _write_json(b_path, _copy(FULL)) + rc = main([a_path, b_path]) + assert rc == 0 + out = json.loads(capsys.readouterr().out) + assert out["diff"]["is_identical"] is True + assert len(out["snapshot_a"]["hash"]) == 64 + assert out["snapshot_a"]["hash"] == out["snapshot_b"]["hash"] + + +def test_cli_compare_different(capsys): + with tempfile.TemporaryDirectory() as tmp: + a_path = str(Path(tmp) / "a.json") + b_path = str(Path(tmp) / "b.json") + _write_json(a_path, _copy(MINIMAL)) + b = _copy(FULL) + _write_json(b_path, b) + rc = main([a_path, b_path]) + assert rc == 0 + out = json.loads(capsys.readouterr().out) + assert out["diff"]["is_identical"] is False + assert out["snapshot_a"]["hash"] != out["snapshot_b"]["hash"] + + +def test_cli_no_args(capsys): + rc = main([]) + assert rc == 2 + + +def test_cli_one_arg(capsys): + rc = main(["only_one.json"]) + assert rc == 2 + + +def test_cli_invalid_json(capsys, tmp_path): + bad = tmp_path / "bad.json" + bad.write_text("not json", encoding="utf-8") + good = tmp_path / "good.json" + _write_json(str(good), _copy(MINIMAL)) + rc = main([str(bad), str(good)]) + assert rc == 1 + + +def test_cli_invalid_snapshot(capsys, tmp_path): + invalid = tmp_path / "invalid.json" + _write_json(str(invalid), 
{"version": "v2"}) + good = tmp_path / "good.json" + _write_json(str(good), _copy(MINIMAL)) + rc = main([str(invalid), str(good)]) + assert rc == 1 + + +def test_cli_missing_file(capsys): + rc = main(["/nonexistent/a.json", "/nonexistent/b.json"]) + assert rc == 1
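Reviewer note: `canonicalise.py` is imported by the CLI and the tests but is not included in this hunk. The sketch below is one way to satisfy the contract the tests assert (order-independent bytes, 64-char lowercase hex SHA-256); it is an illustrative assumption, not the actual module.

```python
import hashlib
import json


def canonical_bytes(snapshot):
    # Sort every unordered collection so logically equal snapshots
    # serialise to identical bytes regardless of insertion order.
    canon = {
        "version": snapshot["version"],
        "variables": sorted(snapshot["variables"]),
        "allowed_transitions": sorted(
            (t["from"], t["to"]) for t in snapshot["allowed_transitions"]
        ),
        "exclusions": sorted(snapshot["exclusions"]),
        "reason_code_families": {
            name: sorted(codes)
            for name, codes in snapshot["reason_code_families"].items()
        },
    }
    # sort_keys + compact separators give a stable serialisation.
    return json.dumps(canon, sort_keys=True, separators=(",", ":")).encode("utf-8")


def canonical_hash(snapshot):
    return hashlib.sha256(canonical_bytes(snapshot)).hexdigest()


a = {"version": "v1", "variables": ["B", "A"], "allowed_transitions": [],
     "exclusions": [], "reason_code_families": {"f": ["Y", "X"]}}
b = {"version": "v1", "variables": ["A", "B"], "allowed_transitions": [],
     "exclusions": [], "reason_code_families": {"f": ["X", "Y"]}}
h = canonical_hash(a)
print(canonical_hash(a) == canonical_hash(b))  # True
print(len(h), h == h.lower())                  # 64 True
```

Under these assumptions, `test_canonicalise_order_independent_*` and `test_hash_is_64_char_hex_lowercase` pass by construction: sorting before serialisation is what makes insertion order irrelevant.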