diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index be775e8..a5abfc4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -20,11 +20,33 @@ jobs: with: python-version: ${{ matrix.python-version }} - - name: Install pytest - run: pip install pytest + - name: Install dependencies + run: pip install pytest pytest-cov - - name: Run tests - run: python -m pytest tests/ -v + - name: Run tests with coverage + run: python -m pytest tests/ -v --cov=framework --cov=prepare --cov-report=term-missing --cov-fail-under=70 + + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install linters + run: pip install "ruff>=0.8,<1.0" bandit + + - name: Ruff lint + run: ruff check framework.py prepare.py + + - name: Ruff format check + run: ruff format --check framework.py prepare.py + + - name: Security scan (bandit) + run: bandit -r framework.py prepare.py -ll shellcheck: runs-on: ubuntu-latest diff --git a/framework.py b/framework.py index 890c862..fdac8fd 100644 --- a/framework.py +++ b/framework.py @@ -14,12 +14,15 @@ """ import os +import re import sys import json +import shutil import argparse import subprocess +from collections import Counter from datetime import datetime, timezone -from pathlib import Path +from typing import Optional # --------------------------------------------------------------------------- # Constants @@ -28,6 +31,8 @@ OURO_DIR = ".ouro" STATE_FILE = "state.json" RESULTS_FILE = "ouro-results.tsv" +REFLECTIVE_LOG = "reflective-log.jsonl" +CLAUDE_MD_FILENAME = "CLAUDE.md" STAGES = ["BOUND", "MAP", "PLAN", "BUILD", "VERIFY", "LOOP"] @@ -40,10 +45,37 @@ "architectural": {"max_lines": None, "max_files": None, "phases": None}, } +# Shared BOUND markers — used by both framework.py and prepare.py (DRY) +BOUND_SECTION_MARKERS = ["## BOUND", "# BOUND"] +BOUND_CONTENT_MARKERS = [ + "DANGER ZONE", + 
"DANGER_ZONE", + "NEVER DO", + "NEVER_DO", + "IRON LAW", + "IRON_LAW", +] +BOUND_ALL_MARKERS = BOUND_SECTION_MARKERS + BOUND_CONTENT_MARKERS + +# Template placeholders indicating unfilled CLAUDE.md +TEMPLATE_PLACEHOLDERS = [ + "[PROJECT_NAME]", + "[why it's dangerous]", + "[action]", + "[Invariant 1", +] + +# Magic values extracted as named constants +GIT_TIMEOUT_SECONDS = 10 +HOT_FILE_EDIT_THRESHOLD = 3 +HISTORY_LIMIT = 50 +MAX_RETRY_BEFORE_ESCALATE = 3 + # --------------------------------------------------------------------------- # State management # --------------------------------------------------------------------------- + def load_state(project_path: str, required: bool = True) -> dict: """Load ouro state from .ouro/state.json. @@ -63,27 +95,261 @@ def load_state(project_path: str, required: bool = True) -> dict: return None print(f"Corrupted state file: {state_path}") print(f"Error: {e}") - print(f"Run: python prepare.py init {project_path} (or delete .ouro/ to reset)") + print( + f"Run: python prepare.py init {project_path} (or delete .ouro/ to reset)" + ) sys.exit(1) def save_state(project_path: str, state: dict): - """Save ouro state to .ouro/state.json.""" + """Save ouro state to .ouro/state.json (atomic write).""" state_path = os.path.join(project_path, OURO_DIR, STATE_FILE) state["updated_at"] = datetime.now(timezone.utc).isoformat() - with open(state_path, "w") as f: + tmp_path = state_path + ".tmp" + with open(tmp_path, "w") as f: json.dump(state, f, indent=2) + try: + os.replace(tmp_path, state_path) + except OSError: + shutil.move(tmp_path, state_path) + + +# --------------------------------------------------------------------------- +# CLAUDE.md parsing +# --------------------------------------------------------------------------- + + +def _get_claude_md_path(project_path: str) -> str: + """Return the path to CLAUDE.md within the project.""" + return os.path.join(project_path, CLAUDE_MD_FILENAME) + + +def parse_claude_md(project_path: str) -> 
dict: + """Parse CLAUDE.md into structured BOUND data. + + Returns a dict with: + danger_zones: list[str] — paths/patterns from DANGER ZONES section + never_do: list[str] — prohibitions from NEVER DO section + iron_laws: list[str] — invariants from IRON LAWS section + has_bound: bool — whether any BOUND markers were found + raw_content: str — full file content (empty string if file missing) + """ + result = { + "danger_zones": [], + "never_do": [], + "iron_laws": [], + "has_bound": False, + "raw_content": "", + "parse_source": "none", # "structured", "fallback", or "none" + } + + claude_md = _get_claude_md_path(project_path) + if not os.path.exists(claude_md): + return result + + try: + with open(claude_md, "r", encoding="utf-8") as f: + content = f.read() + except OSError: + return result + + result["raw_content"] = content + result["has_bound"] = any(m in content for m in BOUND_ALL_MARKERS) + + # --- Primary extraction: standard section headers --- + + # Extract DANGER ZONES — lines with backtick-wrapped paths + dz_match = re.search( + r"(?:###?\s*DANGER\s*ZONES?)(.*?)(?=\n###?\s|\Z)", + content, + re.DOTALL | re.IGNORECASE, + ) + if dz_match: + zone_text = dz_match.group(1) + result["danger_zones"] = re.findall(r"`([^`]+)`", zone_text) + + # Extract NEVER DO — list items + nd_match = re.search( + r"(?:###?\s*NEVER\s*DO)(.*?)(?=\n###?\s|\Z)", + content, + re.DOTALL | re.IGNORECASE, + ) + if nd_match: + nd_text = nd_match.group(1) + result["never_do"] = [ + line.strip().lstrip("-*").strip() + for line in nd_text.strip().split("\n") + if line.strip() and line.strip().startswith(("-", "*")) + ] + + # Extract IRON LAWS — list items + il_match = re.search( + r"(?:###?\s*IRON\s*LAWS?)(.*?)(?=\n###?\s|\Z)", + content, + re.DOTALL | re.IGNORECASE, + ) + if il_match: + il_text = il_match.group(1) + result["iron_laws"] = [ + line.strip().lstrip("-*").strip() + for line in il_text.strip().split("\n") + if line.strip() and line.strip().startswith(("-", "*")) + ] + + # Mark 
def _normalize_repo_path(path: str) -> str:
    """Return *path* with forward slashes and without any leading ``./``.

    Zone patterns copied from CLAUDE.md and paths reported by tools do not
    always agree on separators or on a ``./`` prefix; comparing the normalized
    forms keeps such purely cosmetic differences from hiding a match.
    """
    normalized = path.strip().replace("\\", "/")
    while normalized.startswith("./"):
        normalized = normalized[2:]
    return normalized


def _file_in_danger_zone(file_path: str, danger_zones: list) -> Optional[str]:
    """Check if a file path matches any DANGER ZONE pattern.

    Matching is path-segment aware to avoid substring false positives:
    - Zone "auth/" matches "auth/login.py" but NOT "unauthorized.py"
    - Zone "auth/core.py" matches that file, also when nested ("src/auth/core.py")
    - A zone ending with "/" is treated as a directory prefix of the file path

    Backslashes and a leading "./" are ignored on both the file and the zone.

    Args:
        file_path: Path of the changed file (relative to the project root).
        danger_zones: Zone patterns; ``None`` or empty means "no zones".

    Returns:
        The matched zone exactly as it was given, or None if nothing matches.
    """
    if not file_path:
        return None

    norm_file = _normalize_repo_path(file_path)
    if not norm_file:
        return None
    file_segments = norm_file.split("/")

    for zone in danger_zones or ():
        if not zone:
            continue

        norm_zone = _normalize_repo_path(zone)
        if not norm_zone:
            # A zone that is only "./" carries no usable path information.
            continue

        # Exact match
        if norm_file == norm_zone:
            return zone

        # Directory prefix: zone "src/payments/" -> file must start with it
        if norm_zone.endswith("/"):
            if norm_file.startswith(norm_zone):
                return zone
            continue

        # File / directory name: the zone's segments must appear as a
        # contiguous run of whole segments somewhere in the file path.
        zone_segments = norm_zone.split("/")
        span = len(zone_segments)
        for start in range(len(file_segments) - span + 1):
            if file_segments[start : start + span] == zone_segments:
                return zone

    return None


# ---------------------------------------------------------------------------
# Complexity detection
# ---------------------------------------------------------------------------

# Matches "IRON", "IRON_LAW", "IRON-LAWS", "IRONLAW", ... only as a standalone
# token, so names that merely contain the letters (e.g. "environment.py",
# "ironing.py") are not mistaken for IRON LAW areas.
_IRON_LAW_TOKEN = re.compile(r"(?<![A-Z])IRON(?:[ _-]?LAWS?)?(?![A-Z])")


def _mentions_iron_law(path) -> bool:
    """Return True if *path* contains IRON / IRON LAW as a standalone token."""
    return bool(_IRON_LAW_TOKEN.search(str(path).upper()))


def detect_complexity(
    project_path: str, changed_files: list = None, danger_zones: list = None
) -> dict:
    """Detect task complexity based on file count and DANGER ZONE proximity.

    Args:
        project_path: Project root (accepted for interface symmetry; unused).
        changed_files: Paths touched by the current change (default: none).
        danger_zones: DANGER ZONE patterns parsed from CLAUDE.md (default: none).

    Returns:
        A dict with:
            level: str — trivial/simple/complex/architectural
            reason: str — why this level was chosen
            route: dict — the matching COMPLEXITY_ROUTES entry
    """
    if changed_files is None:
        changed_files = []
    if danger_zones is None:
        danger_zones = []

    num_files = len(changed_files)
    dz_touched = [f for f in changed_files if _file_in_danger_zone(f, danger_zones)]

    # Determine level: DANGER ZONE contact dominates plain file counts.
    if dz_touched:
        if any(_mentions_iron_law(path) for path in dz_touched):
            level = "architectural"
            reason = f"Modifies IRON LAW area: {', '.join(dz_touched[:3])}"
        else:
            level = "complex"
            reason = f"Touches DANGER ZONE: {', '.join(dz_touched[:3])}"
    elif num_files <= 1:
        level = "trivial"
        reason = f"{num_files} file(s), no DANGER ZONE contact"
    elif num_files <= 3:
        level = "simple"
        reason = f"{num_files} files, no DANGER ZONE contact"
    else:
        level = "complex"
        reason = f"{num_files} files across multiple areas"

    return {
        "level": level,
        "reason": reason,
        "route": COMPLEXITY_ROUTES[level],
    }
--------------------------------------------------------------------------- # Verification # --------------------------------------------------------------------------- + def run_verification(project_path: str) -> dict: - """Run multi-layer verification checks.""" + """Run multi-layer verification checks (Layer 1 + 2 + 3).""" results = { "timestamp": datetime.now(timezone.utc).isoformat(), "layer1_gates": {}, "layer2_self": {}, + "layer3_review": {}, "overall": "PASS", } - # Layer 1: Gates - results["layer1_gates"] = run_gates(project_path) + # Refresh bound_defined in state — init snapshot may be stale + # (user might have added BOUND to CLAUDE.md after init) + bound_data = parse_claude_md(project_path) + state = load_state(project_path, required=False) + if state and state.get("bound_defined") != bound_data["has_bound"]: + state["bound_defined"] = bound_data["has_bound"] + save_state(project_path, state) + + # Layer 1: Gates (pass cached bound_data to avoid re-parsing) + results["layer1_gates"] = run_gates(project_path, _bound_data=bound_data) # Layer 2: Self-assessment - results["layer2_self"] = run_self_assessment(project_path) + results["layer2_self"] = run_self_assessment(project_path, _bound_data=bound_data) + + # Layer 3: External review triggers + results["layer3_review"] = _check_layer3_triggers( + project_path, results, _bound_data=bound_data + ) # Determine overall verdict - gate_failures = [g for g, v in results["layer1_gates"].items() if v["status"] == "FAIL"] - self_failures = [s for s, v in results["layer2_self"].items() if v["status"] == "FAIL"] + gate_failures = [ + g for g, v in results["layer1_gates"].items() if v["status"] == "FAIL" + ] + self_failures = [ + s for s, v in results["layer2_self"].items() if v["status"] == "FAIL" + ] + review_required = results["layer3_review"].get("required", False) if gate_failures or self_failures: results["overall"] = "FAIL" results["failures"] = gate_failures + self_failures + elif review_required: + 
results["overall"] = "REVIEW" + results["review_reasons"] = results["layer3_review"].get("reasons", []) + else: + # Check if everything is WARN/SKIP with no PASS — project likely not set up + all_statuses = [v["status"] for v in results["layer1_gates"].values()] + [ + v["status"] for v in results["layer2_self"].values() + ] + if all_statuses and "PASS" not in all_statuses: + results["overall"] = "WARN" return results -def run_gates(project_path: str) -> dict: - """Layer 1: Automated gates.""" +def _check_layer3_triggers( + project_path: str, current_results: dict, _bound_data: dict = None +) -> dict: + """Layer 3: Check if external (human) review is required. + + Triggers: + - Changes touch a DANGER ZONE + - IRON LAW needs modification + - 3+ consecutive RETRY verdicts + - Failed Layer 1 gate + """ + review = {"required": False, "reasons": []} + + # Check DANGER ZONE contact via RELEVANCE gate + relevance = current_results.get("layer1_gates", {}).get("RELEVANCE", {}) + dz_files = relevance.get("danger_zone_files", []) + if dz_files: + review["required"] = True + review["reasons"].append(f"DANGER ZONE touched: {', '.join(dz_files[:3])}") + + # Check for Layer 1 gate failures + gate_failures = [ + g + for g, v in current_results.get("layer1_gates", {}).items() + if v["status"] == "FAIL" + ] + if gate_failures: + review["required"] = True + review["reasons"].append(f"Layer 1 gate failed: {', '.join(gate_failures)}") + + # Check consecutive RETRY count from state history + state = load_state(project_path, required=False) + if state: + history = state.get("history", []) + consecutive_retries = 0 + for entry in reversed(history): + if entry.get("verdict") == "RETRY": + consecutive_retries += 1 + else: + break + if consecutive_retries >= MAX_RETRY_BEFORE_ESCALATE: + review["required"] = True + review["reasons"].append( + f"{consecutive_retries} consecutive RETRY verdicts — " + f"mandatory user review" + ) + + # Check complexity level (architectural = always review) + if 
_bound_data is None: + _bound_data = parse_claude_md(project_path) + changed_files = relevance.get("files", []) + if changed_files: + complexity = detect_complexity( + project_path, changed_files, _bound_data["danger_zones"] + ) + if complexity["level"] == "architectural": + review["required"] = True + review["reasons"].append( + f"Architectural complexity: {complexity['reason']}" + ) + + return review + + +def run_gates(project_path: str, _bound_data: dict = None) -> dict: + """Layer 1: Automated gates (EXIST, RELEVANCE, ROOT_CAUSE, RECALL, MOMENTUM).""" gates = {} + if _bound_data is None: + _bound_data = parse_claude_md(project_path) + bound_data = _bound_data + danger_zones = bound_data["danger_zones"] - # EXIST gate: check that key files exist - claude_md = os.path.join(project_path, "CLAUDE.md") + # EXIST gate: check that key files exist + DANGER ZONE awareness + claude_md = _get_claude_md_path(project_path) claude_exists = os.path.exists(claude_md) if claude_exists: gates["EXIST"] = {"status": "PASS", "detail": "CLAUDE.md exists"} else: - # No CLAUDE.md — check if state says BOUND should be defined state = load_state(project_path, required=False) bound_expected = state.get("bound_defined", False) if state else False if bound_expected: - gates["EXIST"] = {"status": "FAIL", "detail": "CLAUDE.md missing but BOUND was expected"} + gates["EXIST"] = { + "status": "FAIL", + "detail": "CLAUDE.md missing but BOUND was expected", + } else: - gates["EXIST"] = {"status": "WARN", "detail": "No CLAUDE.md — define BOUND before BUILD"} - + gates["EXIST"] = { + "status": "WARN", + "detail": "No CLAUDE.md — define BOUND before BUILD", + } - # RELEVANCE gate: check git status for scope + # RELEVANCE gate: check git status for scope + DANGER ZONE overlap + changed_files = [] try: result = subprocess.run( ["git", "status", "--short"], - capture_output=True, text=True, cwd=project_path, timeout=10 + capture_output=True, + text=True, + cwd=project_path, + 
timeout=GIT_TIMEOUT_SECONDS, ) - changed_files = [line.strip().split()[-1] for line in result.stdout.strip().split("\n") if line.strip()] - gates["RELEVANCE"] = { - "status": "PASS", - "detail": f"{len(changed_files)} files changed", - "files": changed_files[:20], - } + changed_files = [ + line.strip().split()[-1] + for line in result.stdout.strip().split("\n") + if line.strip() + ] + + # Check if any changed files are in DANGER ZONES + dz_hits = [] + for f in changed_files: + zone = _file_in_danger_zone(f, danger_zones) + if zone: + dz_hits.append(f"{f} (zone: {zone})") + + if dz_hits: + gates["RELEVANCE"] = { + "status": "WARN", + "detail": f"{len(changed_files)} files changed, " + f"{len(dz_hits)} in DANGER ZONE: {', '.join(dz_hits[:5])}", + "files": changed_files[:20], + "danger_zone_files": dz_hits, + } + else: + gates["RELEVANCE"] = { + "status": "PASS", + "detail": f"{len(changed_files)} files changed", + "files": changed_files[:20], + } except (subprocess.TimeoutExpired, FileNotFoundError): gates["RELEVANCE"] = {"status": "SKIP", "detail": "git not available"} @@ -176,26 +570,62 @@ def run_gates(project_path: str) -> dict: try: result = subprocess.run( ["git", "log", "--name-only", "--pretty=format:", "-10"], - capture_output=True, text=True, cwd=project_path, timeout=10 + capture_output=True, + text=True, + cwd=project_path, + timeout=GIT_TIMEOUT_SECONDS, ) files = [f.strip() for f in result.stdout.strip().split("\n") if f.strip()] - from collections import Counter freq = Counter(files) - hot_files = {f: c for f, c in freq.items() if c >= 3} + hot_files = {f: c for f, c in freq.items() if c >= HOT_FILE_EDIT_THRESHOLD} gates["ROOT_CAUSE"] = { "status": "WARN" if hot_files else "PASS", - "detail": f"Hot files: {', '.join(hot_files.keys())}" if hot_files else "No repeated edits detected", + "detail": ( + f"Hot files: {', '.join(hot_files.keys())}" + if hot_files + else "No repeated edits detected" + ), } except (subprocess.TimeoutExpired, FileNotFoundError): 
gates["ROOT_CAUSE"] = {"status": "SKIP", "detail": "git not available"} + # RECALL gate: verify BOUND constraints are accessible and recently read + if bound_data["has_bound"]: + recall_issues = [] + if not bound_data["danger_zones"]: + recall_issues.append("no DANGER ZONES parsed") + if not bound_data["iron_laws"]: + recall_issues.append("no IRON LAWS parsed") + if recall_issues: + gates["RECALL"] = { + "status": "WARN", + "detail": f"BOUND exists but incomplete: {', '.join(recall_issues)}", + } + else: + gates["RECALL"] = { + "status": "PASS", + "detail": ( + f"BOUND loaded: {len(bound_data['danger_zones'])} zones, " + f"{len(bound_data['never_do'])} prohibitions, " + f"{len(bound_data['iron_laws'])} laws" + ), + } + else: + gates["RECALL"] = { + "status": "WARN", + "detail": "No BOUND defined — constraints may be forgotten", + } + # MOMENTUM gate: check recent commit frequency try: result = subprocess.run( ["git", "log", "--oneline", "-5"], - capture_output=True, text=True, cwd=project_path, timeout=10 + capture_output=True, + text=True, + cwd=project_path, + timeout=GIT_TIMEOUT_SECONDS, ) - commits = [l for l in result.stdout.strip().split("\n") if l.strip()] + commits = [line for line in result.stdout.strip().split("\n") if line.strip()] gates["MOMENTUM"] = { "status": "PASS" if len(commits) >= 2 else "WARN", "detail": f"{len(commits)} recent commits", @@ -206,30 +636,43 @@ def run_gates(project_path: str) -> dict: return gates -def run_self_assessment(project_path: str) -> dict: +def run_self_assessment(project_path: str, _bound_data: dict = None) -> dict: """Layer 2: Self-assessment checks.""" checks = {} - # BOUND compliance: check CLAUDE.md for BOUND section - claude_md = os.path.join(project_path, "CLAUDE.md") + # BOUND compliance: use parse_claude_md() for structured check + bound_data = ( + _bound_data if _bound_data is not None else parse_claude_md(project_path) + ) + claude_md = _get_claude_md_path(project_path) if os.path.exists(claude_md): - try: - 
with open(claude_md, "r", encoding="utf-8") as f: - content = f.read() - has_bound = any(m in content for m in ["## BOUND", "# BOUND", "DANGER ZONE", "IRON LAW"]) + # File exists but parse returned empty content → read error + if not bound_data["raw_content"] and os.path.getsize(claude_md) > 0: checks["bound_compliance"] = { - "status": "PASS" if has_bound else "WARN", - "detail": "BOUND section found" if has_bound else "No BOUND section in CLAUDE.md", + "status": "SKIP", + "detail": "Cannot read CLAUDE.md", + } + elif bound_data["has_bound"]: + checks["bound_compliance"] = { + "status": "PASS", + "detail": "BOUND section found", + } + else: + checks["bound_compliance"] = { + "status": "WARN", + "detail": "No BOUND section in CLAUDE.md", } - except OSError: - checks["bound_compliance"] = {"status": "SKIP", "detail": "Cannot read CLAUDE.md"} else: checks["bound_compliance"] = {"status": "SKIP", "detail": "No CLAUDE.md"} # Test detection test_found = False for root, dirs, files in os.walk(project_path): - dirs[:] = [d for d in dirs if d not in {".git", "node_modules", "__pycache__", ".venv", ".ouro"}] + dirs[:] = [ + d + for d in dirs + if d not in {".git", "node_modules", "__pycache__", ".venv", ".ouro"} + ] for f in files: if "test" in f.lower() or "spec" in f.lower(): test_found = True @@ -248,20 +691,34 @@ def run_self_assessment(project_path: str) -> dict: def print_verification(results: dict): """Print verification results.""" print(f"{'=' * 50}") - print(f" Ouro Loop — Verification") + print(" Ouro Loop — Verification") print(f"{'=' * 50}") print(" Layer 1 — Gates:") for gate, info in results.get("layer1_gates", {}).items(): - icon = {"PASS": "+", "FAIL": "X", "WARN": "!", "SKIP": "-"}.get(info["status"], "?") + icon = {"PASS": "+", "FAIL": "X", "WARN": "!", "SKIP": "-"}.get( + info["status"], "?" 
# ---------------------------------------------------------------------------
# Pattern detection
# ---------------------------------------------------------------------------

# Pattern detection thresholds
CONSECUTIVE_FAIL_THRESHOLD = 2
VELOCITY_WINDOW = 5
DRIFT_DIRECTORY_THRESHOLD = 5
# Fewer entries than this make the two-halves comparison meaningless: with
# 4-5 entries a single RETRY produces a misleading DECELERATING signal.
MIN_TREND_ENTRIES = 6
# Pass-rate swing between the halves needed to report a trend change.
TREND_SWING_THRESHOLD = 0.3
# Number of trailing entries inspected for a stuck loop.
STUCK_LOOP_WINDOW = 3

_FAILING_VERDICTS = ("FAIL", "RETRY")


def _pass_rate(entries: list) -> float:
    """Return the fraction of *entries* (non-empty) whose verdict is PASS."""
    return sum(1 for e in entries if e.get("verdict") == "PASS") / len(entries)


def detect_patterns(history: list, current_gates: dict = None) -> dict:
    """Analyze history to detect behavioral patterns.

    This is the "Pattern" layer of the reflective log: it summarizes recurring
    behavior for whoever starts the next iteration.

    Args:
        history: Chronological list of dict entries carrying "verdict" and
            "stage" keys (oldest first).
        current_gates: Optional Layer 1 gate results; only the ROOT_CAUSE
            "detail" text and RELEVANCE "danger_zone_files" are read.

    Returns:
        A dict with:
            consecutive_failures: int — FAIL/RETRY verdicts in a row at the tail
            stuck_loop: bool — the last 3 entries share one stage and all
                failed (FAIL or RETRY)
            velocity_trend: str — ACCELERATING / STABLE / DECELERATING /
                STALLED, or UNKNOWN with fewer than 6 recent entries
            hot_files: list — file names parsed from the ROOT_CAUSE gate detail
            drift_signal: bool — RELEVANCE gate reports DANGER ZONE files
            retry_rate: float — fraction (0.0-1.0) of RETRY verdicts among
                the last VELOCITY_WINDOW entries
    """
    patterns = {
        "consecutive_failures": 0,
        "stuck_loop": False,
        "velocity_trend": "UNKNOWN",
        "hot_files": [],
        "drift_signal": False,
        "retry_rate": 0.0,
    }

    # Gate-based signals are available even with an empty history.
    if current_gates:
        root_cause = current_gates.get("ROOT_CAUSE") or {}
        detail = root_cause.get("detail") or ""
        if isinstance(detail, str) and "Hot files:" in detail:
            listed = detail.replace("Hot files: ", "")
            patterns["hot_files"] = [
                name.strip() for name in listed.split(",") if name.strip()
            ]

        relevance = current_gates.get("RELEVANCE") or {}
        if relevance.get("danger_zone_files"):
            patterns["drift_signal"] = True

    if not history:
        return patterns

    # Consecutive failures, counted backwards from the newest entry.
    for entry in reversed(history):
        if entry.get("verdict") not in _FAILING_VERDICTS:
            break
        patterns["consecutive_failures"] += 1

    # Retry rate in the recent window (never empty here: history is non-empty).
    window = history[-VELOCITY_WINDOW:]
    retries = sum(1 for e in window if e.get("verdict") == "RETRY")
    patterns["retry_rate"] = retries / len(window)

    # Velocity trend: compare pass rates of the two halves of recent history.
    # Slicing already copes with histories shorter than the window.
    recent = history[-VELOCITY_WINDOW * 2 :]
    if len(recent) >= MIN_TREND_ENTRIES:
        mid = len(recent) // 2
        second_pass_rate = _pass_rate(recent[mid:])
        diff = second_pass_rate - _pass_rate(recent[:mid])
        if diff > TREND_SWING_THRESHOLD:
            patterns["velocity_trend"] = "ACCELERATING"
        elif diff < -TREND_SWING_THRESHOLD:
            patterns["velocity_trend"] = "DECELERATING"
        elif second_pass_rate == 0:
            patterns["velocity_trend"] = "STALLED"
        else:
            patterns["velocity_trend"] = "STABLE"

    # Stuck loop: the same stage failing in each of the last few entries.
    if len(history) >= STUCK_LOOP_WINDOW:
        tail = history[-STUCK_LOOP_WINDOW:]
        same_stage = len({e.get("stage") for e in tail}) == 1
        all_failing = all(e.get("verdict") in _FAILING_VERDICTS for e in tail)
        patterns["stuck_loop"] = same_stage and all_failing

    return patterns


# ---------------------------------------------------------------------------
# Reflective logging (three-layer structured log)
# ---------------------------------------------------------------------------

REFLECTIVE_LOG_LIMIT = 30  # keep last N entries
+ """ + state = load_state(project_path, required=False) or {} + bound_data = parse_claude_md(project_path) + gates = verification.get("layer1_gates", {}) + layer3 = verification.get("layer3_review", {}) + + # Collect changed files from RELEVANCE gate + changed_files = gates.get("RELEVANCE", {}).get("files", []) + + # Detect complexity + complexity = detect_complexity( + project_path, changed_files, bound_data["danger_zones"] + ) + + # Detect patterns from history + history = state.get("history", []) + patterns = detect_patterns(history, gates) + + entry = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "iteration": len(history) + 1, + # Layer 1 — WHAT (facts) + "what": { + "stage": state.get("current_stage", "UNKNOWN"), + "phase": f"{state.get('current_phase', '?')}/{state.get('total_phases', '?')}", + "verdict": verdict, + "overall": verification.get("overall", "UNKNOWN"), + "gates": { + gate: { + "status": info.get("status", "?"), + "detail": info.get("detail", ""), + } + for gate, info in gates.items() + }, + "changed_files": changed_files[:10], + "danger_zone_contact": gates.get("RELEVANCE", {}).get( + "danger_zone_files", [] + ), + "bound_violations": sum( + 1 for v in gates.values() if v.get("status") == "FAIL" + ), + "review_required": layer3.get("required", False), + }, + # Layer 2 — WHY (decisions and causal chain) + "why": { + "complexity": complexity["level"], + "complexity_reason": complexity["reason"], + "review_reasons": layer3.get("reasons", []), + "bound_state": { + "danger_zones": len(bound_data["danger_zones"]), + "never_do": len(bound_data["never_do"]), + "iron_laws": len(bound_data["iron_laws"]), + }, + "notes": notes, + }, + # Layer 3 — PATTERN (self-awareness) + "pattern": { + "consecutive_failures": patterns["consecutive_failures"], + "stuck_loop": patterns["stuck_loop"], + "velocity_trend": patterns["velocity_trend"], + "retry_rate": round(patterns["retry_rate"], 2), + "hot_files": patterns["hot_files"], + "drift_signal": 
def _load_log_entries(log_path: str) -> list:
    """Parse a JSONL reflective log into a list of dict entries (oldest first).

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped, so one damaged line never hides the rest of the log
    and callers can rely on every entry supporting ``.get``.

    Raises:
        OSError: if the file does not exist or cannot be read.
    """
    entries = []
    with open(log_path, "r", encoding="utf-8") as fh:
        for raw in fh:
            raw = raw.strip()
            if not raw:
                continue
            try:
                record = json.loads(raw)
            except json.JSONDecodeError:
                continue
            if isinstance(record, dict):
                entries.append(record)
    return entries


def write_reflective_log(project_path: str, entry: dict):
    """Add a reflective log entry to .ouro/reflective-log.jsonl.

    Each line of the file is a self-contained JSON object. On every write the
    existing entries are re-read, the new one is appended, the list is trimmed
    to the newest REFLECTIVE_LOG_LIMIT entries and the file is rewritten via a
    temporary file so readers never observe a half-written log.

    Filesystem errors while writing are reported as a warning on stdout
    instead of being raised; a non-serializable *entry* raises before any
    file is touched.
    """
    log_path = os.path.join(project_path, OURO_DIR, REFLECTIVE_LOG)
    os.makedirs(os.path.dirname(log_path), exist_ok=True)

    # Read existing entries (a missing file simply means this is the first).
    entries = []
    try:
        entries = _load_log_entries(log_path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        # Best effort: keep going, but say that older entries may be lost.
        print(f"Warning: Could not read reflective log: {exc}")

    entries.append(entry)

    # Trim to limit
    entries = entries[-REFLECTIVE_LOG_LIMIT:]

    # Serialize before opening the temp file so a bad entry leaves no debris.
    lines = [json.dumps(item, ensure_ascii=False) + "\n" for item in entries]

    # Write back (atomic-ish: write to tmp then rename)
    tmp_path = log_path + ".tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as fh:
            fh.writelines(lines)
        try:
            os.replace(tmp_path, log_path)
        except OSError:
            # Fallback for cross-device moves (Docker volumes, NFS, etc.)
            shutil.move(tmp_path, log_path)
    except OSError as exc:
        print(f"Warning: Could not write reflective log: {exc}")
        # Clean up temp file if write or move failed
        if os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass


def read_reflective_log(project_path: str, last_n: int = 5) -> list:
    """Read the last N entries from the reflective log.

    Args:
        project_path: Project root containing the .ouro directory.
        last_n: Maximum number of entries to return; zero or a negative
            value yields an empty list.

    Returns:
        A list of dicts, newest last. Empty when the log is missing or
        unreadable.
    """
    # Guard first: entries[-0:] would be the *whole* list, not an empty one.
    if last_n <= 0:
        return []

    log_path = os.path.join(project_path, OURO_DIR, REFLECTIVE_LOG)
    try:
        entries = _load_log_entries(log_path)
    except OSError:
        return []

    return entries[-last_n:]
0):.0%}" + ) + if pattern.get("stuck_loop"): + print(" STUCK LOOP DETECTED") + if pattern.get("hot_files"): + print(f" hot: {', '.join(pattern['hot_files'][:3])}") + + if alerts: + for alert in alerts: + print(f" >> {alert}") + + # Overall trend + if len(entries) >= 3: + verdicts = [e.get("what", {}).get("verdict") for e in entries] + pass_count = sum(1 for v in verdicts if v == "PASS") + fail_count = sum(1 for v in verdicts if v in ("FAIL", "RETRY")) + print( + f"\n Trend: {pass_count} PASS / {fail_count} FAIL in last {len(entries)}" + ) + + last_pattern = entries[-1].get("pattern", {}) + velocity = last_pattern.get("velocity_trend", "UNKNOWN") + print(f" Velocity: {velocity}") + + print(f"\n{'=' * 60}") + + # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- + def log_phase_result(project_path: str, verdict: str, notes: str = ""): - """Log a phase result to ouro-results.tsv and update state.""" + """Log a phase result to ouro-results.tsv, state history, and reflective log.""" state = load_state(project_path) phase = state.get("current_phase") total = state.get("total_phases", 0) @@ -293,34 +1118,53 @@ def log_phase_result(project_path: str, verdict: str, notes: str = ""): # Count bound violations gate_results = results.get("layer1_gates", {}) - bound_violations = sum(1 for v in gate_results.values() if v.get("status") == "FAIL") + bound_violations = sum( + 1 for v in gate_results.values() if v.get("status") == "FAIL" + ) - # Log to TSV + # Log to TSV (with error handling for filesystem issues) results_path = os.path.join(project_path, RESULTS_FILE) - with open(results_path, "a") as f: - f.write(f"{phase_str}\t{verdict}\t{bound_violations}\t" - f"N/A\tnone\t{notes}\n") + try: + with open(results_path, "a") as f: + f.write(f"{phase_str}\t{verdict}\t{bound_violations}\tN/A\tnone\t{notes}\n") + except OSError as e: + print(f"Warning: Could not write to 
{results_path}: {e}") + + # Build reflective entry BEFORE updating state (so iteration count is correct) + reflective_entry = build_reflective_entry(project_path, verdict, results, notes) # Update state history - state.setdefault("history", []).append({ - "timestamp": datetime.now(timezone.utc).isoformat(), - "stage": state.get("current_stage", "UNKNOWN"), - "phase": phase_str, - "verdict": verdict, - "bound_violations": bound_violations, - "notes": notes, - }) - - # Keep last 50 history entries - state["history"] = state["history"][-50:] + state.setdefault("history", []).append( + { + "timestamp": datetime.now(timezone.utc).isoformat(), + "stage": state.get("current_stage", "UNKNOWN"), + "phase": phase_str, + "verdict": verdict, + "bound_violations": bound_violations, + "notes": notes, + } + ) + + # Keep last N history entries + state["history"] = state["history"][-HISTORY_LIMIT:] save_state(project_path, state) + # Write reflective log after state is saved + write_reflective_log(project_path, reflective_entry) + print(f"Logged: {phase_str} — {verdict}") + # Print alerts if any + if reflective_entry.get("alerts"): + for alert in reflective_entry["alerts"]: + print(f" >> {alert}") + + # --------------------------------------------------------------------------- # Phase advancement # --------------------------------------------------------------------------- + def advance_phase(project_path: str): """Advance to the next phase.""" state = load_state(project_path) @@ -343,35 +1187,36 @@ def advance_phase(project_path: str): save_state(project_path, state) print(f"Advanced to phase {phase + 1}/{total}") + # --------------------------------------------------------------------------- # BOUND check # --------------------------------------------------------------------------- + def check_bound(project_path: str): """Check BOUND compliance in CLAUDE.md.""" - claude_md = os.path.join(project_path, "CLAUDE.md") + claude_md = _get_claude_md_path(project_path) if not 
os.path.exists(claude_md): print("No CLAUDE.md found. BOUND not defined.") print("Run: python prepare.py template claude") return - with open(claude_md, "r", encoding="utf-8") as f: - content = f.read() + bound_data = parse_claude_md(project_path) + content = bound_data["raw_content"] # Detect template placeholders — template has keywords but no real content - template_markers = ["[PROJECT_NAME]", "[why it's dangerous]", "[action]", "[Invariant 1"] - is_template = any(marker in content for marker in template_markers) + is_template = any(marker in content for marker in TEMPLATE_PLACEHOLDERS) if is_template: print(f"{'=' * 50}") - print(f" Ouro Loop — BOUND Check") + print(" Ouro Loop — BOUND Check") print(f"{'=' * 50}") - print(f" [!] CLAUDE.md is still a template — fill in real BOUND values") - print(f" Edit CLAUDE.md to replace [placeholders] with actual boundaries") + print(" [!] CLAUDE.md is still a template — fill in real BOUND values") + print(" Edit CLAUDE.md to replace [placeholders] with actual boundaries") print(f"{'=' * 50}") return print(f"{'=' * 50}") - print(f" Ouro Loop — BOUND Check") + print(" Ouro Loop — BOUND Check") print(f"{'=' * 50}") sections = { @@ -387,6 +1232,24 @@ def check_bound(project_path: str): if not found: all_defined = False + # Show parsed BOUND details + if bound_data["danger_zones"]: + print(f"\n Parsed DANGER ZONES: {len(bound_data['danger_zones'])}") + for dz in bound_data["danger_zones"][:5]: + print(f" - {dz}") + if bound_data["iron_laws"]: + print(f" Parsed IRON LAWS: {len(bound_data['iron_laws'])}") + for il in bound_data["iron_laws"][:5]: + print(f" - {il}") + + source = bound_data.get("parse_source", "none") + if source == "fallback": + print( + "\n [!] Parse source: fallback (prose-style CLAUDE.md, results may be noisy)" + ) + elif source == "structured": + print("\n Parse source: structured") + print() if all_defined: print(" BOUND fully defined. 
Ready for BUILD.") @@ -397,6 +1260,7 @@ def check_bound(project_path: str): print(f"{'=' * 50}") + # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- @@ -417,19 +1281,35 @@ def check_bound(project_path: str): # log log_parser = subparsers.add_parser("log", help="Log phase result") - log_parser.add_argument("verdict", choices=["PASS", "FAIL", "RETRY", "SKIP"], - help="Phase verdict") + log_parser.add_argument( + "verdict", choices=["PASS", "FAIL", "RETRY", "SKIP"], help="Phase verdict" + ) log_parser.add_argument("--notes", default="", help="Notes for this phase") log_parser.add_argument("--path", default=".", help="Project directory") # advance advance_parser = subparsers.add_parser("advance", help="Advance to next phase") - advance_parser.add_argument("path", nargs="?", default=".", help="Project directory") + advance_parser.add_argument( + "path", nargs="?", default=".", help="Project directory" + ) # bound-check bound_parser = subparsers.add_parser("bound-check", help="Check BOUND compliance") bound_parser.add_argument("path", nargs="?", default=".", help="Project directory") + # reflect + reflect_parser = subparsers.add_parser("reflect", help="Show reflective log") + reflect_parser.add_argument( + "path", nargs="?", default=".", help="Project directory" + ) + reflect_parser.add_argument( + "-n", + "--last", + type=int, + default=5, + help="Number of entries to show (default: 5)", + ) + args = parser.parse_args() if args.command is None: @@ -442,8 +1322,10 @@ def check_bound(project_path: str): results = run_verification(args.path) print_verification(results) elif args.command == "log": - log_phase_result(getattr(args, 'path', '.'), args.verdict, args.notes) + log_phase_result(getattr(args, "path", "."), args.verdict, args.notes) elif args.command == "advance": advance_phase(args.path) elif args.command == "bound-check": check_bound(args.path) + elif 
args.command == "reflect": + print_reflective_summary(args.path, args.last) diff --git a/hooks/bound-guard.sh b/hooks/bound-guard.sh index f7e8987..9d4a01d 100755 --- a/hooks/bound-guard.sh +++ b/hooks/bound-guard.sh @@ -11,10 +11,14 @@ CWD=$(echo "$INPUT" | jq -r '.cwd // empty') # No file path = not a file edit, allow [ -z "$FILE_PATH" ] && exit 0 -# Find CLAUDE.md +# Find CLAUDE.md — recursive upward search (up to 5 levels) CLAUDE_MD="" -for candidate in "$CWD/CLAUDE.md" "$CWD/../CLAUDE.md"; do - [ -f "$candidate" ] && CLAUDE_MD="$candidate" && break +SEARCH_DIR="$CWD" +for _ in 1 2 3 4 5; do + [ -f "$SEARCH_DIR/CLAUDE.md" ] && CLAUDE_MD="$SEARCH_DIR/CLAUDE.md" && break + PARENT=$(dirname "$SEARCH_DIR") + [ "$PARENT" = "$SEARCH_DIR" ] && break # reached filesystem root + SEARCH_DIR="$PARENT" done # No CLAUDE.md = no BOUND defined, allow (warn via stderr) @@ -33,13 +37,24 @@ DANGER_ZONES=$(sed -n '/### DANGER ZONES/,/### /p' "$CLAUDE_MD" \ # Make FILE_PATH relative to CWD for matching REL_PATH="${FILE_PATH#"$CWD"/}" -# Check if the file matches any DANGER ZONE +# Check if the file matches any DANGER ZONE (path-segment aware) +# Matches Python-side _file_in_danger_zone() behavior: +# - Directory zone "auth/" matches "auth/login.py" but NOT "unauthorized.py" +# - File zone "auth/core.py" matches exact path segments while IFS= read -r zone; do [ -z "$zone" ] && continue - if [[ "$REL_PATH" == $zone* || "$REL_PATH" == *"$zone"* ]]; then - # Match found — block via exit 2, stderr goes to agent as feedback - echo "DANGER ZONE: '$REL_PATH' matches bound '$zone' in CLAUDE.md. You must escalate to the user before modifying this file. Do NOT retry this edit without explicit user approval." >&2 - exit 2 + if [[ "$zone" == */ ]]; then + # Directory zone: only prefix match + if [[ "$REL_PATH" == "$zone"* ]]; then + echo "DANGER ZONE: '$REL_PATH' matches bound '$zone' in CLAUDE.md. You must escalate to the user before modifying this file. 
Do NOT retry this edit without explicit user approval." >&2 + exit 2 + fi + else + # File zone: exact match or path-segment match (zone appears after a /) + if [[ "$REL_PATH" == "$zone" || "$REL_PATH" == *"/$zone" || "$REL_PATH" == *"/$zone/"* ]]; then + echo "DANGER ZONE: '$REL_PATH' matches bound '$zone' in CLAUDE.md. You must escalate to the user before modifying this file. Do NOT retry this edit without explicit user approval." >&2 + exit 2 + fi fi done <<< "$DANGER_ZONES" diff --git a/hooks/momentum-gate.sh b/hooks/momentum-gate.sh new file mode 100755 index 0000000..9a35e3b --- /dev/null +++ b/hooks/momentum-gate.sh @@ -0,0 +1,77 @@ +#!/bin/bash +# Ouro Loop — MOMENTUM Gate (PostToolUse: Edit|Write|Read) +# +# Tracks the read/write ratio of tool calls. If the agent has been reading +# far more than writing (ratio > 3:1 in last 10 actions), it may be stuck. +# Warns the agent to make progress — write something, even if imperfect. + +INPUT=$(cat) +TOOL_NAME=$(echo "$INPUT" | jq -r '.tool_name // empty') +CWD=$(echo "$INPUT" | jq -r '.cwd // empty') + +[ -z "$TOOL_NAME" ] && exit 0 + +# State file to track read/write counts +STATE_DIR="${CWD}/.ouro" +TRACKER="${STATE_DIR}/momentum-tracker.json" + +# Ensure state dir exists +mkdir -p "$STATE_DIR" 2>/dev/null + +# Initialize tracker if missing +if [ ! 
-f "$TRACKER" ]; then + echo '{"reads": 0, "writes": 0, "actions": 0}' > "$TRACKER" +fi + +# Classify the tool action +IS_WRITE=false +IS_READ=false + +case "$TOOL_NAME" in + Edit|Write|NotebookEdit) + IS_WRITE=true + ;; + Read|Glob|Grep) + IS_READ=true + ;; +esac + +# Update counts +if [ "$IS_READ" = true ]; then + jq '.reads += 1 | .actions += 1' "$TRACKER" > "${TRACKER}.tmp" \ + && mv "${TRACKER}.tmp" "$TRACKER" +elif [ "$IS_WRITE" = true ]; then + jq '.writes += 1 | .actions += 1' "$TRACKER" > "${TRACKER}.tmp" \ + && mv "${TRACKER}.tmp" "$TRACKER" +else + # Other tools — increment actions only + jq '.actions += 1' "$TRACKER" > "${TRACKER}.tmp" \ + && mv "${TRACKER}.tmp" "$TRACKER" +fi + +# Check ratio every 10 actions +ACTIONS=$(jq -r '.actions // 0' "$TRACKER") +READS=$(jq -r '.reads // 0' "$TRACKER") +WRITES=$(jq -r '.writes // 0' "$TRACKER") + +if [ "$ACTIONS" -ge 10 ]; then + # Reset counter for next window + jq '.reads = 0 | .writes = 0 | .actions = 0' "$TRACKER" > "${TRACKER}.tmp" \ + && mv "${TRACKER}.tmp" "$TRACKER" + + if [ "$WRITES" -eq 0 ] && [ "$READS" -gt 3 ]; then + cat << EOF +{ + "additionalContext": "[MOMENTUM] ${READS} reads, ${WRITES} writes in last 10 actions. You may be stuck in analysis paralysis. Stop reading and write something — a test, a stub, a prototype. Iterate." +} +EOF + elif [ "$WRITES" -gt 0 ] && [ "$READS" -ge "$((WRITES * 3))" ]; then + cat << EOF +{ + "additionalContext": "[MOMENTUM] Read/write ratio is ${READS}:${WRITES} (above 3:1 threshold). Consider making forward progress — write code, don't just read." 
+} +EOF + fi +fi + +exit 0 diff --git a/hooks/recall-gate.sh b/hooks/recall-gate.sh index 736560b..27a2db7 100755 --- a/hooks/recall-gate.sh +++ b/hooks/recall-gate.sh @@ -7,8 +7,12 @@ CWD=$(cat | jq -r '.cwd // empty') CLAUDE_MD="" -for candidate in "$CWD/CLAUDE.md" "$CWD/../CLAUDE.md"; do - [ -f "$candidate" ] && CLAUDE_MD="$candidate" && break +SEARCH_DIR="$CWD" +for _ in 1 2 3 4 5; do + [ -f "$SEARCH_DIR/CLAUDE.md" ] && CLAUDE_MD="$SEARCH_DIR/CLAUDE.md" && break + PARENT=$(dirname "$SEARCH_DIR") + [ "$PARENT" = "$SEARCH_DIR" ] && break + SEARCH_DIR="$PARENT" done if [ -z "$CLAUDE_MD" ]; then @@ -17,7 +21,7 @@ if [ -z "$CLAUDE_MD" ]; then fi # Extract the BOUND section (everything between ## BOUND and the next ##) -BOUND_SECTION=$(sed -n '/^## BOUND/,/^## [^B]/p' "$CLAUDE_MD" | head -50) +BOUND_SECTION=$(sed -n '/^## BOUND/,/^## [^B]/p' "$CLAUDE_MD") if [ -n "$BOUND_SECTION" ]; then echo "[RECALL] Context compacting. Re-injecting BOUND constraints:" diff --git a/hooks/settings.json.template b/hooks/settings.json.template index 475f945..63a81bb 100644 --- a/hooks/settings.json.template +++ b/hooks/settings.json.template @@ -25,6 +25,11 @@ "type": "command", "command": "$OURO_LOOP_DIR/hooks/root-cause-tracker.sh", "timeout": 5 + }, + { + "type": "command", + "command": "$OURO_LOOP_DIR/hooks/momentum-gate.sh", + "timeout": 5 } ] } diff --git a/prepare.py b/prepare.py index 9528843..b8881ec 100644 --- a/prepare.py +++ b/prepare.py @@ -17,38 +17,77 @@ import shutil import argparse from datetime import datetime, timezone -from pathlib import Path from collections import Counter +# Import shared constants from framework.py (DRY) +from framework import ( + OURO_DIR, + STATE_FILE, + RESULTS_FILE, + CLAUDE_MD_FILENAME, + parse_claude_md, +) + # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- - -OURO_DIR = ".ouro" -STATE_FILE = "state.json" -RESULTS_FILE = 
"ouro-results.tsv" TEMPLATES_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates") MODULES_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "modules") # File extensions to language mapping LANG_MAP = { - ".py": "Python", ".js": "JavaScript", ".ts": "TypeScript", - ".tsx": "TypeScript (React)", ".jsx": "JavaScript (React)", - ".rs": "Rust", ".go": "Go", ".java": "Java", ".kt": "Kotlin", - ".swift": "Swift", ".rb": "Ruby", ".php": "PHP", ".c": "C", - ".cpp": "C++", ".h": "C/C++ Header", ".cs": "C#", - ".sol": "Solidity", ".move": "Move", ".vy": "Vyper", - ".sql": "SQL", ".sh": "Shell", ".md": "Markdown", - ".yml": "YAML", ".yaml": "YAML", ".toml": "TOML", - ".json": "JSON", ".html": "HTML", ".css": "CSS", - ".scss": "SCSS", ".svelte": "Svelte", ".vue": "Vue", + ".py": "Python", + ".js": "JavaScript", + ".ts": "TypeScript", + ".tsx": "TypeScript (React)", + ".jsx": "JavaScript (React)", + ".rs": "Rust", + ".go": "Go", + ".java": "Java", + ".kt": "Kotlin", + ".swift": "Swift", + ".rb": "Ruby", + ".php": "PHP", + ".c": "C", + ".cpp": "C++", + ".h": "C/C++ Header", + ".cs": "C#", + ".sol": "Solidity", + ".move": "Move", + ".vy": "Vyper", + ".sql": "SQL", + ".sh": "Shell", + ".md": "Markdown", + ".yml": "YAML", + ".yaml": "YAML", + ".toml": "TOML", + ".json": "JSON", + ".html": "HTML", + ".css": "CSS", + ".scss": "SCSS", + ".svelte": "Svelte", + ".vue": "Vue", } # Directories to skip during scanning SKIP_DIRS = { - ".git", ".ouro", "node_modules", "__pycache__", ".venv", - "venv", ".next", "build", "dist", "target", ".idea", ".vscode", - "vendor", "Pods", ".build", "DerivedData", ".cache", + ".git", + ".ouro", + "node_modules", + "__pycache__", + ".venv", + "venv", + ".next", + "build", + "dist", + "target", + ".idea", + ".vscode", + "vendor", + "Pods", + ".build", + "DerivedData", + ".cache", } # Marker files that indicate project type @@ -74,94 +113,115 @@ # Scan # 
--------------------------------------------------------------------------- -def scan_project(project_path: str) -> dict: - """Scan a project directory and return a structured summary.""" - project_path = os.path.abspath(project_path) - if not os.path.isdir(project_path): - print(f"Error: {project_path} is not a directory") - sys.exit(1) - result = { - "path": project_path, - "name": os.path.basename(project_path), - "scanned_at": datetime.now(timezone.utc).isoformat(), - "project_types": [], +def _detect_project_types(project_path: str) -> list: + """Detect project types from marker files.""" + types = [] + for marker, ptype in PROJECT_MARKERS.items(): + if os.path.exists(os.path.join(project_path, marker)): + types.append(ptype) + return types + + +def _scan_files(project_path: str) -> dict: + """Walk the project tree and collect file statistics. + + Returns a dict with: languages, file_count, dir_count, total_lines, + has_claude_md, has_tests, has_ci. + """ + stats = { "languages": Counter(), "file_count": 0, "dir_count": 0, "total_lines": 0, - "top_directories": [], "has_claude_md": False, "has_tests": False, "has_ci": False, - "bound_detected": False, - "danger_zones": [], } - # Detect project types - for marker, ptype in PROJECT_MARKERS.items(): - if os.path.exists(os.path.join(project_path, marker)): - result["project_types"].append(ptype) - - # Walk the project for root, dirs, files in os.walk(project_path): - # Filter out skip directories dirs[:] = [d for d in dirs if d not in SKIP_DIRS] - result["dir_count"] += len(dirs) + stats["dir_count"] += len(dirs) rel_root = os.path.relpath(root, project_path) for fname in files: filepath = os.path.join(root, fname) - result["file_count"] += 1 + stats["file_count"] += 1 # Language detection ext = os.path.splitext(fname)[1].lower() if ext in LANG_MAP: - result["languages"][LANG_MAP[ext]] += 1 - - # Count lines for code files - if ext in LANG_MAP: + stats["languages"][LANG_MAP[ext]] += 1 + # Count lines for code 
files try: with open(filepath, "r", encoding="utf-8", errors="ignore") as f: - result["total_lines"] += sum(1 for _ in f) + stats["total_lines"] += sum(1 for _ in f) except (OSError, PermissionError): pass # Special file detection - if fname == "CLAUDE.md": - result["has_claude_md"] = True - # Check for BOUND markers - try: - with open(filepath, "r", encoding="utf-8") as f: - content = f.read() - if any(marker in content for marker in - ["DANGER ZONE", "NEVER DO", "IRON LAW", - "## BOUND", "# BOUND"]): - result["bound_detected"] = True - except (OSError, PermissionError): - pass + if fname == CLAUDE_MD_FILENAME: + stats["has_claude_md"] = True # Test detection if "test" in fname.lower() or "spec" in fname.lower(): - result["has_tests"] = True + stats["has_tests"] = True # CI detection if rel_root in [".github/workflows", ".circleci", ".gitlab-ci"]: - result["has_ci"] = True + stats["has_ci"] = True + + return stats - # Top-level directories + +def _get_top_directories(project_path: str) -> list: + """List top-level directories, excluding SKIP_DIRS.""" try: - top_dirs = sorted([ - d for d in os.listdir(project_path) - if os.path.isdir(os.path.join(project_path, d)) and d not in SKIP_DIRS - ]) - result["top_directories"] = top_dirs + return sorted( + [ + d + for d in os.listdir(project_path) + if os.path.isdir(os.path.join(project_path, d)) and d not in SKIP_DIRS + ] + ) except OSError: - pass + return [] + - # Convert Counter to dict for JSON serialization - result["languages"] = dict(result["languages"].most_common(10)) +def scan_project(project_path: str) -> dict: + """Scan a project directory and return a structured summary. + + Orchestrates sub-functions for project type detection, file scanning, + BOUND detection, and directory listing (SRP). 
+ """ + project_path = os.path.abspath(project_path) + if not os.path.isdir(project_path): + print(f"Error: {project_path} is not a directory") + sys.exit(1) + + # File statistics + file_stats = _scan_files(project_path) + + # BOUND detection via shared parser + bound_data = parse_claude_md(project_path) + + result = { + "path": project_path, + "name": os.path.basename(project_path), + "scanned_at": datetime.now(timezone.utc).isoformat(), + "project_types": _detect_project_types(project_path), + "languages": dict(file_stats["languages"].most_common(10)), + "file_count": file_stats["file_count"], + "dir_count": file_stats["dir_count"], + "total_lines": file_stats["total_lines"], + "top_directories": _get_top_directories(project_path), + "has_claude_md": file_stats["has_claude_md"], + "has_tests": file_stats["has_tests"], + "has_ci": file_stats["has_ci"], + "bound_detected": bound_data["has_bound"], + "danger_zones": bound_data["danger_zones"], + } return result @@ -169,7 +229,7 @@ def scan_project(project_path: str) -> dict: def print_scan_report(scan: dict): """Print a human-readable scan report.""" print(f"{'=' * 60}") - print(f" Ouro Loop — Project Scan") + print(" Ouro Loop — Project Scan") print(f"{'=' * 60}") print(f" Project: {scan['name']}") print(f" Path: {scan['path']}") @@ -202,11 +262,15 @@ def print_scan_report(scan: dict): # Recommendations recommendations = [] if not scan["bound_detected"]: - recommendations.append("Define BOUND (DANGER ZONES, NEVER DO, IRON LAWS) before building") + recommendations.append( + "Define BOUND (DANGER ZONES, NEVER DO, IRON LAWS) before building" + ) if not scan["has_tests"]: recommendations.append("Add tests — VERIFY stage requires testable assertions") if not scan["has_claude_md"]: - recommendations.append("Create CLAUDE.md with BOUND section (use: python prepare.py template claude)") + recommendations.append( + "Create CLAUDE.md with BOUND section (use: python prepare.py template claude)" + ) if not scan["has_ci"]: 
recommendations.append("Consider adding CI for automated Layer 2 verification") @@ -217,10 +281,12 @@ def print_scan_report(scan: dict): print(f" {i}. {rec}") print() + # --------------------------------------------------------------------------- # Init # --------------------------------------------------------------------------- + def init_ouro(project_path: str): """Initialize .ouro/ directory with initial state.""" ouro_path = os.path.join(project_path, OURO_DIR) @@ -256,7 +322,9 @@ def init_ouro(project_path: str): results_path = os.path.join(project_path, RESULTS_FILE) if not os.path.exists(results_path): with open(results_path, "w") as f: - f.write("phase\tverdict\tbound_violations\ttest_pass_rate\tscope_deviation\tnotes\n") + f.write( + "phase\tverdict\tbound_violations\ttest_pass_rate\tscope_deviation\tnotes\n" + ) print(f"Ouro initialized at {ouro_path}") print(f" State: {state_path}") @@ -270,6 +338,7 @@ def init_ouro(project_path: str): else: print("BOUND detected. Ready to start the Ouro Loop.") + # --------------------------------------------------------------------------- # Templates # --------------------------------------------------------------------------- @@ -280,6 +349,7 @@ def init_ouro(project_path: str): "verify": "verify-checklist.md.template", } + def install_template(template_type: str, project_path: str): """Copy a template to the project directory.""" if template_type not in TEMPLATE_MAP: @@ -306,6 +376,7 @@ def install_template(template_type: str, project_path: str): print(f"Template installed: {dst}") print(f" Edit this file to define your project's {template_type} configuration.") + # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- @@ -318,20 +389,27 @@ def install_template(template_type: str, project_path: str): # scan scan_parser = subparsers.add_parser("scan", help="Scan project structure") - 
scan_parser.add_argument("path", nargs="?", default=".", - help="Project directory to scan (default: current dir)") + scan_parser.add_argument( + "path", + nargs="?", + default=".", + help="Project directory to scan (default: current dir)", + ) # init init_parser = subparsers.add_parser("init", help="Initialize .ouro/ directory") - init_parser.add_argument("path", nargs="?", default=".", - help="Project directory (default: current dir)") + init_parser.add_argument( + "path", nargs="?", default=".", help="Project directory (default: current dir)" + ) # template tmpl_parser = subparsers.add_parser("template", help="Install a template") - tmpl_parser.add_argument("type", choices=TEMPLATE_MAP.keys(), - help="Template type to install") - tmpl_parser.add_argument("path", nargs="?", default=".", - help="Project directory (default: current dir)") + tmpl_parser.add_argument( + "type", choices=TEMPLATE_MAP.keys(), help="Template type to install" + ) + tmpl_parser.add_argument( + "path", nargs="?", default=".", help="Project directory (default: current dir)" + ) args = parser.parse_args() diff --git a/pyproject.toml b/pyproject.toml index 5a1c381..c17b1e9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,3 +34,6 @@ py-modules = ["framework", "prepare"] [tool.setuptools.package-data] "*" = ["*.md", "*.sh", "*.json"] + +[tool.pytest.ini_options] +testpaths = ["tests"] diff --git a/tests/test_coverage_gaps.py b/tests/test_coverage_gaps.py new file mode 100644 index 0000000..e5be013 --- /dev/null +++ b/tests/test_coverage_gaps.py @@ -0,0 +1,870 @@ +""" +Tests covering previously untested code paths identified by coverage gap analysis. 
+ +Covers: +- run_verification() returning "REVIEW" overall +- _check_layer3_triggers() architectural complexity branch +- detect_complexity() architectural level +- build_reflective_entry() all 4 alert types (DRIFT, HOT FILES, SLOWING, STALLED) +- write_reflective_log() OSError recovery paths +- log_phase_result() TSV write OSError warning +- print_verification() REVIEW output branch +- print_reflective_summary() conditional branches (DZ, notes, stuck, hot files) +- parse_claude_md() star (*) list markers, DANGER ZONE singular form, end-of-file +- _scan_files() CI detection for .circleci and .gitlab-ci +- _file_in_danger_zone() edge cases (exact match, empty zone string) +- detect_patterns() velocity_trend UNKNOWN with < 4 entries +- read_reflective_log() last_n larger than actual entries +- show_status() edge cases +- check_bound() parsed details output + +Run with: + python3 -m pytest tests/test_coverage_gaps.py -v +""" + +import json +import os +import shutil +import sys +import tempfile +import unittest +from io import StringIO +from unittest.mock import patch, MagicMock + +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, PROJECT_ROOT) + +import framework +import prepare + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_tmp() -> str: + return tempfile.mkdtemp() + + +def _write(path: str, content: str = ""): + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + f.write(content) + + +def _make_state(project_path: str, **overrides) -> dict: + state = { + "version": "0.1.0", + "project_name": "test-project", + "current_stage": "BUILD", + "current_phase": 1, + "total_phases": 3, + "bound_defined": False, + "history": [], + } + state.update(overrides) + ouro_dir = os.path.join(project_path, ".ouro") + os.makedirs(ouro_dir, exist_ok=True) + 
with open(os.path.join(ouro_dir, "state.json"), "w") as f: + json.dump(state, f, indent=2) + return state + + +# --------------------------------------------------------------------------- +# run_verification() → overall = "REVIEW" +# --------------------------------------------------------------------------- + +class TestRunVerificationReviewPath(unittest.TestCase): + """run_verification() returns overall='REVIEW' when review is required + but no gates have failed.""" + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + @patch("framework.subprocess.run") + def test_review_when_dz_touched_no_gate_fail(self, mock_run): + """DANGER ZONE contact with no FAIL gates → overall = REVIEW.""" + _write(os.path.join(self.tmp, "CLAUDE.md"), + "## BOUND\n### DANGER ZONES\n- `src/pay/` — payments\n" + "### IRON LAWS\n- Rule 1\n") + # Git status returns a file in DZ + def side_effect(cmd, **kwargs): + if "status" in cmd: + return MagicMock(stdout=" M src/pay/stripe.py\n", returncode=0) + return MagicMock(stdout="", returncode=0) + mock_run.side_effect = side_effect + + results = framework.run_verification(self.tmp) + self.assertEqual(results["overall"], "REVIEW") + self.assertIn("review_reasons", results) + self.assertTrue(len(results["review_reasons"]) > 0) + + @patch("framework.subprocess.run") + def test_fail_takes_priority_over_review(self, mock_run): + """Gate FAIL + review required → overall = FAIL, not REVIEW.""" + mock_run.return_value = MagicMock(stdout="", returncode=0) + _make_state(self.tmp, bound_defined=True) + _write(os.path.join(self.tmp, "CLAUDE.md"), "## BOUND\nrules\n") + + # Force a deterministic FAIL by mocking run_gates + fake_gates = { + "EXIST": {"status": "FAIL", "detail": "forced"}, + "RELEVANCE": { + "status": "WARN", + "files": ["src/x.py"], + "danger_zone_files": ["src/x.py (zone: src/)"], + }, + } + with patch("framework.run_gates", return_value=fake_gates): + results = 
framework.run_verification(self.tmp) + # FAIL must take priority even when review is also required + self.assertEqual(results["overall"], "FAIL") + self.assertIn("EXIST", results["failures"]) + + +# --------------------------------------------------------------------------- +# _check_layer3_triggers() — architectural complexity +# --------------------------------------------------------------------------- + +class TestLayer3ArchitecturalComplexity(unittest.TestCase): + """architectural complexity triggers Layer 3 review.""" + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_architectural_complexity_triggers_review(self): + """Files touching IRON area should trigger architectural review.""" + _write(os.path.join(self.tmp, "CLAUDE.md"), + "## BOUND\n### DANGER ZONES\n- `IRON_config/` — law config\n" + "### IRON LAWS\n- Law 1\n") + results = { + "layer1_gates": { + "RELEVANCE": { + "status": "PASS", + "files": ["IRON_config/rules.py"], + }, + } + } + review = framework._check_layer3_triggers(self.tmp, results) + self.assertTrue(review["required"]) + self.assertTrue(any("Architectural" in r for r in review["reasons"])) + + def test_empty_changed_files_skips_complexity(self): + results = { + "layer1_gates": { + "RELEVANCE": {"status": "PASS", "files": []}, + } + } + review = framework._check_layer3_triggers(self.tmp, results) + # Should not crash and should not trigger architectural + arch_reasons = [r for r in review["reasons"] if "Architectural" in r] + self.assertEqual(len(arch_reasons), 0) + + def test_multiple_triggers_accumulate_reasons(self): + """Multiple trigger conditions produce multiple reasons.""" + _make_state(self.tmp, history=[ + {"verdict": "RETRY", "stage": "BUILD"}, + {"verdict": "RETRY", "stage": "BUILD"}, + {"verdict": "RETRY", "stage": "BUILD"}, + ]) + results = { + "layer1_gates": { + "EXIST": {"status": "FAIL", "detail": "missing"}, + "RELEVANCE": { + "status": "WARN", 
+ "files": ["a.py"], + "danger_zone_files": ["a.py (zone: src/)"], + }, + } + } + review = framework._check_layer3_triggers(self.tmp, results) + self.assertTrue(review["required"]) + self.assertGreaterEqual(len(review["reasons"]), 3) + + +# --------------------------------------------------------------------------- +# detect_complexity() — architectural level +# --------------------------------------------------------------------------- + +class TestDetectComplexityArchitectural(unittest.TestCase): + """detect_complexity() correctly identifies architectural level.""" + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_iron_in_dz_triggers_architectural(self): + result = framework.detect_complexity( + self.tmp, + ["IRON_config/rules.py"], + ["IRON_config/"] + ) + self.assertEqual(result["level"], "architectural") + self.assertIn("IRON LAW", result["reason"]) + + def test_exactly_four_files_is_complex(self): + """4 files without DZ contact → complex (boundary: > 3).""" + result = framework.detect_complexity( + self.tmp, + ["a.py", "b.py", "c.py", "d.py"], + [] + ) + self.assertEqual(result["level"], "complex") + + def test_default_none_args(self): + """None args default to empty lists without crash.""" + result = framework.detect_complexity(self.tmp) + self.assertEqual(result["level"], "trivial") + + +# --------------------------------------------------------------------------- +# build_reflective_entry() — all alert types +# --------------------------------------------------------------------------- + +class TestBuildReflectiveEntryAlerts(unittest.TestCase): + """All alert types are correctly triggered.""" + + def setUp(self): + self.tmp = _make_tmp() + _make_state(self.tmp) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def _build_with_gates(self, gates, history=None): + if history: + _make_state(self.tmp, history=history) + verification = { + "layer1_gates": 
gates, + "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": "PASS", + } + return framework.build_reflective_entry( + self.tmp, "PASS", verification + ) + + def test_drift_alert(self): + gates = { + "RELEVANCE": { + "status": "WARN", + "files": ["src/payments/x.py"], + "danger_zone_files": ["src/payments/x.py"], + } + } + entry = self._build_with_gates(gates) + self.assertTrue(any("DRIFT" in a for a in entry["alerts"])) + + def test_hot_files_alert(self): + gates = { + "ROOT_CAUSE": { + "status": "WARN", + "detail": "Hot files: framework.py, prepare.py", + } + } + entry = self._build_with_gates(gates) + self.assertTrue(any("HOT FILES" in a for a in entry["alerts"])) + + def test_slowing_alert(self): + """DECELERATING velocity → SLOWING alert (requires 6+ entries, > 0.3 swing).""" + history = ( + [{"verdict": "PASS", "stage": "BUILD"}] * 3 + + [{"verdict": "FAIL", "stage": "BUILD"}] * 3 + ) + entry = self._build_with_gates({}, history=history) + self.assertTrue(any("SLOWING" in a for a in entry["alerts"])) + + def test_stalled_alert(self): + """All failures → STALLED alert (requires 6+ entries).""" + history = [{"verdict": "FAIL", "stage": "BUILD"}] * 6 + entry = self._build_with_gates({}, history=history) + self.assertTrue(any("STALLED" in a for a in entry["alerts"])) + + def test_no_alerts_when_healthy(self): + entry = self._build_with_gates({}) + self.assertEqual(entry["alerts"], []) + + def test_bound_violations_counted_in_what(self): + gates = { + "EXIST": {"status": "FAIL", "detail": "missing"}, + "RECALL": {"status": "FAIL", "detail": "no bound"}, + } + verification = { + "layer1_gates": gates, + "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": "FAIL", + } + entry = framework.build_reflective_entry( + self.tmp, "FAIL", verification + ) + self.assertEqual(entry["what"]["bound_violations"], 2) + + def test_review_required_in_what(self): + verification = { + "layer1_gates": {}, + 
"layer2_self": {}, + "layer3_review": {"required": True, "reasons": ["DZ touched"]}, + "overall": "REVIEW", + } + entry = framework.build_reflective_entry( + self.tmp, "PASS", verification + ) + self.assertTrue(entry["what"]["review_required"]) + + +# --------------------------------------------------------------------------- +# write_reflective_log() — OSError recovery +# --------------------------------------------------------------------------- + +class TestWriteReflectiveLogOSError(unittest.TestCase): + """write_reflective_log() handles filesystem errors gracefully.""" + + def setUp(self): + self.tmp = _make_tmp() + os.makedirs(os.path.join(self.tmp, ".ouro"), exist_ok=True) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_oserror_on_write_prints_warning(self): + """OSError during write prints warning but doesn't crash.""" + entry = {"test": True} + original_open = open + + def fail_on_tmp(path, *args, **kwargs): + if str(path).endswith(".tmp"): + raise OSError("disk full") + return original_open(path, *args, **kwargs) + + buf = StringIO() + with patch("builtins.open", side_effect=fail_on_tmp), \ + patch("sys.stdout", buf): + framework.write_reflective_log(self.tmp, entry) + self.assertIn("Warning", buf.getvalue()) + + def test_ouro_dir_auto_created(self): + """write creates .ouro/ if it doesn't exist.""" + new_tmp = _make_tmp() + try: + entry = {"test": True} + framework.write_reflective_log(new_tmp, entry) + self.assertTrue(os.path.exists( + os.path.join(new_tmp, ".ouro", "reflective-log.jsonl") + )) + finally: + shutil.rmtree(new_tmp, ignore_errors=True) + + def test_non_ascii_content_preserved(self): + """Non-ASCII characters are preserved in JSONL output.""" + entry = {"notes": "Chinese characters: \u4e2d\u6587\u6d4b\u8bd5, symbol: \u2705"} + framework.write_reflective_log(self.tmp, entry) + entries = framework.read_reflective_log(self.tmp) + self.assertIn("\u4e2d\u6587", entries[0]["notes"]) + + +# 
--------------------------------------------------------------------------- +# log_phase_result() — TSV OSError path +# --------------------------------------------------------------------------- + +class TestLogPhaseResultTSVError(unittest.TestCase): + """log_phase_result() handles TSV write failures gracefully.""" + + def setUp(self): + self.tmp = _make_tmp() + _make_state(self.tmp) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + @patch("framework.subprocess.run") + def test_tsv_oserror_prints_warning_continues(self, mock_run): + mock_run.return_value = MagicMock(stdout="", returncode=0) + # Make results file path point to a directory (can't write) + results_dir = os.path.join(self.tmp, "ouro-results.tsv") + os.makedirs(results_dir, exist_ok=True) + + buf = StringIO() + with patch("sys.stdout", buf): + framework.log_phase_result(self.tmp, "PASS") + output = buf.getvalue() + self.assertIn("Warning", output) + # But log still completes + self.assertIn("Logged", output) + + +# --------------------------------------------------------------------------- +# print_verification() — REVIEW path +# --------------------------------------------------------------------------- + +class TestPrintVerificationReview(unittest.TestCase): + """print_verification() correctly formats REVIEW overall status.""" + + def _capture(self, results): + buf = StringIO() + with patch("sys.stdout", buf): + framework.print_verification(results) + return buf.getvalue() + + def test_review_overall_shows_action(self): + results = { + "layer1_gates": {"EXIST": {"status": "PASS", "detail": "ok"}}, + "layer2_self": {}, + "layer3_review": { + "required": True, + "reasons": ["DANGER ZONE touched: src/pay/"], + }, + "overall": "REVIEW", + } + output = self._capture(results) + self.assertIn("Human review required", output) + self.assertIn("REQUIRED", output) + self.assertIn("DANGER ZONE", output) + + def test_layer3_not_required_shows_text(self): + results = { + "layer1_gates": {}, 
+ "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": "PASS", + } + output = self._capture(results) + self.assertIn("Not required", output) + + +# --------------------------------------------------------------------------- +# print_reflective_summary() — conditional branches +# --------------------------------------------------------------------------- + +class TestPrintReflectiveSummaryBranches(unittest.TestCase): + """Conditional output branches in print_reflective_summary().""" + + def setUp(self): + self.tmp = _make_tmp() + os.makedirs(os.path.join(self.tmp, ".ouro"), exist_ok=True) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def _capture(self, last_n=5): + buf = StringIO() + with patch("sys.stdout", buf): + framework.print_reflective_summary(self.tmp, last_n) + return buf.getvalue() + + def _write_entry(self, **overrides): + base = { + "timestamp": "2026-01-01T12:00:00+00:00", + "iteration": 1, + "what": {"stage": "BUILD", "phase": "1/3", "verdict": "PASS", + "overall": "PASS", "gates": {}, + "changed_files": [], "danger_zone_contact": [], + "bound_violations": 0, "review_required": False}, + "why": {"complexity": "simple", "complexity_reason": "", + "review_reasons": [], + "bound_state": {"danger_zones": 0, "never_do": 0, "iron_laws": 0}, + "notes": ""}, + "pattern": {"consecutive_failures": 0, "stuck_loop": False, + "velocity_trend": "STABLE", "retry_rate": 0.0, + "hot_files": [], "drift_signal": False}, + "alerts": [], + } + # Deep merge overrides + for key, val in overrides.items(): + if isinstance(val, dict) and key in base and isinstance(base[key], dict): + base[key].update(val) + else: + base[key] = val + framework.write_reflective_log(self.tmp, base) + + def test_dz_contact_shown(self): + self._write_entry(what={ + "stage": "BUILD", "phase": "1/3", "verdict": "PASS", + "overall": "PASS", "gates": {}, + "changed_files": [], "bound_violations": 0, + "review_required": False, + 
"danger_zone_contact": ["src/payments/stripe.py"], + }) + output = self._capture() + self.assertIn("DZ contact", output) + + def test_notes_shown(self): + self._write_entry(why={ + "complexity": "simple", "complexity_reason": "", + "review_reasons": [], + "bound_state": {"danger_zones": 0, "never_do": 0, "iron_laws": 0}, + "notes": "important context here", + }) + output = self._capture() + self.assertIn("important context here", output) + + def test_stuck_loop_shown(self): + self._write_entry(pattern={ + "consecutive_failures": 3, "stuck_loop": True, + "velocity_trend": "STALLED", "retry_rate": 1.0, + "hot_files": [], "drift_signal": False, + }) + output = self._capture() + self.assertIn("STUCK LOOP DETECTED", output) + + def test_hot_files_shown(self): + self._write_entry(pattern={ + "consecutive_failures": 0, "stuck_loop": False, + "velocity_trend": "STABLE", "retry_rate": 0.0, + "hot_files": ["framework.py", "prepare.py"], "drift_signal": False, + }) + output = self._capture() + self.assertIn("hot:", output) + self.assertIn("framework.py", output) + + def test_trend_not_shown_with_two_entries(self): + self._write_entry() + self._write_entry(iteration=2) + output = self._capture() + self.assertNotIn("Trend:", output) + + +# --------------------------------------------------------------------------- +# parse_claude_md() — edge cases +# --------------------------------------------------------------------------- + +class TestParseClaude_EdgeCases(unittest.TestCase): + """parse_claude_md() edge cases: star markers, singular form, end-of-file.""" + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_star_list_markers_in_never_do(self): + content = ("### NEVER DO\n" + "* Never use float for money\n" + "* Never skip tests\n") + _write(os.path.join(self.tmp, "CLAUDE.md"), content) + result = framework.parse_claude_md(self.tmp) + self.assertEqual(len(result["never_do"]), 2) + self.assertIn("Never 
use float for money", result["never_do"]) + + def test_star_list_markers_in_iron_laws(self): + content = ("### IRON LAWS\n" + "* All API responses include request_id\n" + "* Coverage > 90%\n") + _write(os.path.join(self.tmp, "CLAUDE.md"), content) + result = framework.parse_claude_md(self.tmp) + self.assertEqual(len(result["iron_laws"]), 2) + + def test_singular_danger_zone_header(self): + content = ("### DANGER ZONE\n" + "- `src/core/` — core logic\n") + _write(os.path.join(self.tmp, "CLAUDE.md"), content) + result = framework.parse_claude_md(self.tmp) + self.assertEqual(result["danger_zones"], ["src/core/"]) + + def test_section_at_end_of_file_no_trailing_header(self): + content = ("### IRON LAWS\n" + "- Must always log audit trail\n" + "- Coverage never below 80%\n") + _write(os.path.join(self.tmp, "CLAUDE.md"), content) + result = framework.parse_claude_md(self.tmp) + self.assertEqual(len(result["iron_laws"]), 2) + + def test_empty_file_has_bound_false(self): + _write(os.path.join(self.tmp, "CLAUDE.md"), "") + result = framework.parse_claude_md(self.tmp) + self.assertFalse(result["has_bound"]) + + def test_whitespace_only_file(self): + _write(os.path.join(self.tmp, "CLAUDE.md"), " \n\n \n") + result = framework.parse_claude_md(self.tmp) + self.assertFalse(result["has_bound"]) + + def test_danger_zone_without_backticks_not_extracted(self): + content = ("### DANGER ZONES\n" + "- src/payments/ — no backticks\n") + _write(os.path.join(self.tmp, "CLAUDE.md"), content) + result = framework.parse_claude_md(self.tmp) + self.assertEqual(result["danger_zones"], []) + + def test_h2_bound_header_triggers_has_bound(self): + content = "## BOUND\nSome rules here\n" + _write(os.path.join(self.tmp, "CLAUDE.md"), content) + result = framework.parse_claude_md(self.tmp) + self.assertTrue(result["has_bound"]) + + def test_h1_bound_header_triggers_has_bound(self): + content = "# BOUND\nSome rules here\n" + _write(os.path.join(self.tmp, "CLAUDE.md"), content) + result = 
framework.parse_claude_md(self.tmp) + self.assertTrue(result["has_bound"]) + + +# --------------------------------------------------------------------------- +# _file_in_danger_zone() — edge cases +# --------------------------------------------------------------------------- + +class TestFileInDangerZoneEdgeCases(unittest.TestCase): + + def test_exact_equality(self): + result = framework._file_in_danger_zone( + "src/payments/", ["src/payments/"] + ) + self.assertEqual(result, "src/payments/") + + def test_empty_zone_string_skipped(self): + """Empty string zone is safely skipped (no false positive).""" + result = framework._file_in_danger_zone("any/file.py", [""]) + self.assertIsNone(result) + + def test_multiple_zones_returns_first_match(self): + result = framework._file_in_danger_zone( + "src/payments/stripe.py", + ["src/", "src/payments/"] + ) + # Should return first matching zone + self.assertEqual(result, "src/") + + +# --------------------------------------------------------------------------- +# detect_patterns() — velocity UNKNOWN with < 6 entries +# --------------------------------------------------------------------------- + +class TestDetectPatternsEdgeCases(unittest.TestCase): + + def test_velocity_unknown_with_three_entries(self): + history = [ + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + ] + result = framework.detect_patterns(history) + self.assertEqual(result["velocity_trend"], "UNKNOWN") + + def test_velocity_unknown_with_five_entries(self): + """5 entries is below the 6-entry threshold — still UNKNOWN.""" + history = [ + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + ] + result = framework.detect_patterns(history) + self.assertEqual(result["velocity_trend"], "UNKNOWN") + + def 
test_velocity_unknown_with_one_entry(self): + history = [{"verdict": "PASS", "stage": "BUILD"}] + result = framework.detect_patterns(history) + self.assertEqual(result["velocity_trend"], "UNKNOWN") + + def test_retry_rate_window_larger_than_history(self): + history = [{"verdict": "RETRY", "stage": "BUILD"}] + result = framework.detect_patterns(history) + self.assertAlmostEqual(result["retry_rate"], 1.0) + + def test_root_cause_no_hot_files_prefix(self): + """Detail without 'Hot files:' prefix → empty hot_files.""" + gates = { + "ROOT_CAUSE": { + "status": "PASS", + "detail": "No repeated edits detected", + } + } + result = framework.detect_patterns([], gates) + self.assertEqual(result["hot_files"], []) + + +# --------------------------------------------------------------------------- +# read_reflective_log() — last_n edge cases +# --------------------------------------------------------------------------- + +class TestReadReflectiveLogEdgeCases(unittest.TestCase): + + def setUp(self): + self.tmp = _make_tmp() + os.makedirs(os.path.join(self.tmp, ".ouro"), exist_ok=True) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_last_n_larger_than_entries(self): + for i in range(3): + framework.write_reflective_log( + self.tmp, {"iteration": i} + ) + entries = framework.read_reflective_log(self.tmp, last_n=100) + self.assertEqual(len(entries), 3) + + def test_read_oserror_returns_empty(self): + log_path = os.path.join(self.tmp, ".ouro", "reflective-log.jsonl") + _write(log_path, '{"a": 1}\n') + original_open = open + + def fail_open(path, *args, **kwargs): + if "reflective-log" in str(path): + raise OSError("permission denied") + return original_open(path, *args, **kwargs) + + with patch("builtins.open", side_effect=fail_open): + entries = framework.read_reflective_log(self.tmp) + self.assertEqual(entries, []) + + +# --------------------------------------------------------------------------- +# _scan_files() — CI detection for .circleci and 
.gitlab-ci +# --------------------------------------------------------------------------- + +class TestScanFilesCIDetection(unittest.TestCase): + """CI detection covers all three CI directory conventions.""" + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_circleci_detected(self): + _write(os.path.join(self.tmp, ".circleci", "config.yml"), "version: 2\n") + scan = prepare.scan_project(self.tmp) + self.assertTrue(scan["has_ci"]) + + def test_gitlab_ci_detected(self): + # .gitlab-ci is a directory check in rel_root comparison + _write(os.path.join(self.tmp, ".gitlab-ci", "pipeline.yml"), "stages:\n") + scan = prepare.scan_project(self.tmp) + self.assertTrue(scan["has_ci"]) + + def test_github_workflows_detected(self): + _write(os.path.join(self.tmp, ".github", "workflows", "ci.yml"), "name: CI\n") + scan = prepare.scan_project(self.tmp) + self.assertTrue(scan["has_ci"]) + + +# --------------------------------------------------------------------------- +# show_status() — edge cases +# --------------------------------------------------------------------------- + +class TestShowStatusEdgeCases(unittest.TestCase): + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_missing_project_name_shows_unknown(self): + _make_state(self.tmp) + state_path = os.path.join(self.tmp, ".ouro", "state.json") + with open(state_path) as f: + state = json.load(f) + del state["project_name"] + with open(state_path, "w") as f: + json.dump(state, f) + + buf = StringIO() + with patch("sys.stdout", buf): + framework.show_status(self.tmp) + self.assertIn("Unknown", buf.getvalue()) + + def test_phase_not_none_but_total_zero_shows_na(self): + _make_state(self.tmp, current_phase=1, total_phases=0) + buf = StringIO() + with patch("sys.stdout", buf): + framework.show_status(self.tmp) + self.assertIn("N/A", buf.getvalue()) + + +# 
--------------------------------------------------------------------------- +# check_bound() — parsed details output +# --------------------------------------------------------------------------- + +class TestCheckBoundParsedDetails(unittest.TestCase): + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_shows_parsed_danger_zones(self): + content = ("## BOUND\n" + "### DANGER ZONES\n" + "- `src/payments/` — payments\n" + "- `migrations/` — DB schema\n" + "### NEVER DO\n- Never delete\n" + "### IRON LAWS\n- Coverage > 90%\n") + _write(os.path.join(self.tmp, "CLAUDE.md"), content) + buf = StringIO() + with patch("sys.stdout", buf): + framework.check_bound(self.tmp) + output = buf.getvalue() + self.assertIn("Parsed DANGER ZONES: 2", output) + self.assertIn("src/payments/", output) + + def test_shows_parsed_iron_laws(self): + content = ("## BOUND\n" + "### DANGER ZONES\n- `src/` — core\n" + "### NEVER DO\n- Never skip\n" + "### IRON LAWS\n" + "- All APIs include request_id\n" + "- Coverage > 90%\n") + _write(os.path.join(self.tmp, "CLAUDE.md"), content) + buf = StringIO() + with patch("sys.stdout", buf): + framework.check_bound(self.tmp) + output = buf.getvalue() + self.assertIn("Parsed IRON LAWS: 2", output) + + def test_truncates_at_five_danger_zones(self): + zones = "\n".join(f"- `zone{i}/` — zone {i}" for i in range(8)) + content = f"## BOUND\n### DANGER ZONES\n{zones}\n### NEVER DO\n- X\n### IRON LAWS\n- Y\n" + _write(os.path.join(self.tmp, "CLAUDE.md"), content) + buf = StringIO() + with patch("sys.stdout", buf): + framework.check_bound(self.tmp) + output = buf.getvalue() + # Should show "Parsed DANGER ZONES: 8" but only list 5 + self.assertIn("Parsed DANGER ZONES: 8", output) + self.assertIn("zone4/", output) + self.assertNotIn("zone5/", output) + + +# --------------------------------------------------------------------------- +# log_phase_result() with SKIP verdict +# 
--------------------------------------------------------------------------- + +class TestLogSkipVerdict(unittest.TestCase): + + def setUp(self): + self.tmp = _make_tmp() + _make_state(self.tmp) + results_path = os.path.join(self.tmp, "ouro-results.tsv") + with open(results_path, "w") as f: + f.write("phase\tverdict\n") + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + @patch("framework.subprocess.run") + def test_skip_verdict_logged(self, mock_run): + mock_run.return_value = MagicMock(stdout="", returncode=0) + buf = StringIO() + with patch("sys.stdout", buf): + framework.log_phase_result(self.tmp, "SKIP", "skipped phase") + self.assertIn("Logged", buf.getvalue()) + # Verify TSV has SKIP + with open(os.path.join(self.tmp, "ouro-results.tsv")) as f: + content = f.read() + self.assertIn("SKIP", content) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_edge_case_fixes.py b/tests/test_edge_case_fixes.py new file mode 100644 index 0000000..10c98e5 --- /dev/null +++ b/tests/test_edge_case_fixes.py @@ -0,0 +1,268 @@ +""" +Regression tests for the 4 behavioral edge cases found in E2E testing: + +1. init snapshot bound_defined doesn't update → verify now refreshes it +2. Single RETRY triggers DECELERATING → requires 6+ entries + 0.3 threshold +3. All WARN/SKIP gates → overall was PASS, now correctly WARN +4. 
DZ substring match "auth" vs "unauthorized.py" → path-segment matching + +Run with: + python3 -m pytest tests/test_edge_case_fixes.py -v +""" + +import json +import os +import shutil +import sys +import tempfile +import unittest +from unittest.mock import patch, MagicMock + +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, PROJECT_ROOT) + +import framework + + +def _make_tmp(): + return tempfile.mkdtemp() + + +def _write(path, content=""): + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + f.write(content) + + +def _make_state(project_path, **overrides): + state = { + "version": "0.1.0", + "project_name": "test", + "current_stage": "BUILD", + "current_phase": 1, + "total_phases": 3, + "bound_defined": False, + "history": [], + } + state.update(overrides) + ouro_dir = os.path.join(project_path, ".ouro") + os.makedirs(ouro_dir, exist_ok=True) + with open(os.path.join(ouro_dir, "state.json"), "w") as f: + json.dump(state, f, indent=2) + return state + + +# --------------------------------------------------------------------------- +# Edge Case 1: bound_defined refresh +# --------------------------------------------------------------------------- + +class TestBoundDefinedRefresh(unittest.TestCase): + """verify refreshes bound_defined when CLAUDE.md is added after init.""" + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + @patch("framework.subprocess.run") + def test_bound_defined_updated_when_claude_md_added(self, mock_run): + """Init without BOUND → add CLAUDE.md → verify updates state.""" + mock_run.return_value = MagicMock(stdout="", returncode=0) + _make_state(self.tmp, bound_defined=False) + + # Initially no BOUND + state = framework.load_state(self.tmp) + self.assertFalse(state["bound_defined"]) + + # Now add CLAUDE.md with BOUND markers + _write(os.path.join(self.tmp, "CLAUDE.md"), + "## BOUND\n### 
DANGER ZONES\n- `src/` — core\n" + "### IRON LAWS\n- Rule 1\n") + + # Run verify — should refresh bound_defined + framework.run_verification(self.tmp) + + state = framework.load_state(self.tmp) + self.assertTrue(state["bound_defined"]) + + @patch("framework.subprocess.run") + def test_bound_defined_cleared_when_claude_md_removed(self, mock_run): + """Remove CLAUDE.md → verify clears bound_defined.""" + mock_run.return_value = MagicMock(stdout="", returncode=0) + _write(os.path.join(self.tmp, "CLAUDE.md"), "## BOUND\nstuff\n") + _make_state(self.tmp, bound_defined=True) + + # Remove CLAUDE.md + os.remove(os.path.join(self.tmp, "CLAUDE.md")) + + framework.run_verification(self.tmp) + + state = framework.load_state(self.tmp) + self.assertFalse(state["bound_defined"]) + + +# --------------------------------------------------------------------------- +# Edge Case 2: Single RETRY no longer triggers DECELERATING +# --------------------------------------------------------------------------- + +class TestVelocitySensitivity(unittest.TestCase): + """Single RETRY among passes should not trigger DECELERATING.""" + + def test_single_retry_among_passes_is_unknown(self): + """[PASS, PASS, PASS, RETRY] has only 4 entries → UNKNOWN.""" + history = [ + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "RETRY", "stage": "BUILD"}, + ] + result = framework.detect_patterns(history) + self.assertEqual(result["velocity_trend"], "UNKNOWN") + + def test_single_retry_among_five_passes_is_unknown(self): + """[PASS, PASS, PASS, PASS, RETRY] has 5 entries → still UNKNOWN.""" + history = [ + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "RETRY", "stage": "BUILD"}, + ] + result = framework.detect_patterns(history) + self.assertEqual(result["velocity_trend"], "UNKNOWN") + + def 
test_genuine_deceleration_requires_clear_signal(self): + """3 PASS then 3 FAIL → genuine DECELERATING (swing = 1.0 > 0.3).""" + history = ( + [{"verdict": "PASS", "stage": "BUILD"}] * 3 + + [{"verdict": "FAIL", "stage": "BUILD"}] * 3 + ) + result = framework.detect_patterns(history) + self.assertEqual(result["velocity_trend"], "DECELERATING") + + def test_mild_variation_is_stable(self): + """Alternating results within threshold → STABLE.""" + history = ( + [{"verdict": "PASS", "stage": "BUILD"}] * 2 + + [{"verdict": "FAIL", "stage": "BUILD"}] * 1 + + [{"verdict": "PASS", "stage": "BUILD"}] * 2 + + [{"verdict": "FAIL", "stage": "BUILD"}] * 1 + ) + result = framework.detect_patterns(history) + # first half: [P,P,F] = 0.67, second half: [P,P,F] = 0.67 + # diff = 0.0, within ±0.3 threshold → STABLE + self.assertEqual(result["velocity_trend"], "STABLE") + + +# --------------------------------------------------------------------------- +# Edge Case 3: All WARN/SKIP → overall WARN +# --------------------------------------------------------------------------- + +class TestOverallWarnWhenNoPass(unittest.TestCase): + """Empty/unconfigured project should get overall=WARN, not PASS.""" + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + @patch("framework.subprocess.run", + side_effect=FileNotFoundError("no git")) + def test_all_warn_skip_returns_overall_warn(self, mock_run): + """No CLAUDE.md, no git, no tests → all WARN/SKIP → overall WARN.""" + results = framework.run_verification(self.tmp) + self.assertEqual(results["overall"], "WARN") + + @patch("framework.subprocess.run") + def test_mix_of_pass_and_warn_returns_pass(self, mock_run): + """At least one PASS gate → overall PASS (not WARN).""" + mock_run.return_value = MagicMock(stdout="", returncode=0) + _write(os.path.join(self.tmp, "CLAUDE.md"), + "## BOUND\n### DANGER ZONES\n- `src/` — core\n" + "### IRON LAWS\n- Rule 1\n") + _write(os.path.join(self.tmp, 
"test_x.py"), "") + results = framework.run_verification(self.tmp) + # EXIST=PASS, so overall should be PASS not WARN + self.assertEqual(results["overall"], "PASS") + + +# --------------------------------------------------------------------------- +# Edge Case 4: DZ substring false positive +# --------------------------------------------------------------------------- + +class TestDangerZonePathSegmentMatching(unittest.TestCase): + """Path-segment matching prevents false positives.""" + + def test_auth_dir_does_not_match_unauthorized_file(self): + """Zone 'auth/' must NOT match 'unauthorized.py'.""" + result = framework._file_in_danger_zone("unauthorized.py", ["auth/"]) + self.assertIsNone(result) + + def test_auth_dir_does_not_match_authenticated_dir(self): + """Zone 'auth/' must NOT match 'authenticated/login.py'.""" + result = framework._file_in_danger_zone( + "authenticated/login.py", ["auth/"] + ) + self.assertIsNone(result) + + def test_auth_dir_matches_auth_subpath(self): + """Zone 'auth/' correctly matches 'auth/login.py'.""" + result = framework._file_in_danger_zone("auth/login.py", ["auth/"]) + self.assertEqual(result, "auth/") + + def test_auth_dir_matches_nested_auth(self): + """Zone 'auth/' does NOT match 'src/auth/login.py' — prefix must match.""" + # "auth/" as a directory prefix only matches paths STARTING with "auth/" + result = framework._file_in_danger_zone("src/auth/login.py", ["auth/"]) + # This should NOT match since "src/auth/login.py" doesn't start with "auth/" + self.assertIsNone(result) + + def test_full_path_zone_matches_exactly(self): + """Zone 'src/auth/' matches 'src/auth/login.py'.""" + result = framework._file_in_danger_zone( + "src/auth/login.py", ["src/auth/"] + ) + self.assertEqual(result, "src/auth/") + + def test_file_zone_matches_exact_file(self): + """Zone 'auth/core.py' matches 'auth/core.py' exactly.""" + result = framework._file_in_danger_zone( + "auth/core.py", ["auth/core.py"] + ) + self.assertEqual(result, "auth/core.py") + + 
def test_file_zone_matches_in_nested_path(self): + """Zone 'auth/core.py' matches 'src/auth/core.py' via segment match.""" + result = framework._file_in_danger_zone( + "src/auth/core.py", ["auth/core.py"] + ) + self.assertEqual(result, "auth/core.py") + + def test_file_zone_does_not_match_partial_name(self): + """Zone 'core.py' does NOT match 'hardcore.py'.""" + result = framework._file_in_danger_zone("hardcore.py", ["core.py"]) + self.assertIsNone(result) + + def test_migration_dir_matches_subfiles(self): + """Zone 'migrations/' matches 'migrations/001_init.sql'.""" + result = framework._file_in_danger_zone( + "migrations/001_init.sql", ["migrations/"] + ) + self.assertEqual(result, "migrations/") + + def test_migration_dir_does_not_match_my_migrations(self): + """Zone 'migrations/' does NOT match 'my_migrations/x.sql'.""" + result = framework._file_in_danger_zone( + "my_migrations/x.sql", ["migrations/"] + ) + self.assertIsNone(result) + + def test_empty_file_path_returns_none(self): + result = framework._file_in_danger_zone("", ["auth/"]) + self.assertIsNone(result) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_framework.py b/tests/test_framework.py index eec3d65..6383eb6 100644 --- a/tests/test_framework.py +++ b/tests/test_framework.py @@ -632,11 +632,16 @@ def test_overall_pass_when_no_failures(self): self.assertEqual(results["overall"], "PASS") def test_overall_fail_when_gate_fails(self): - # Force EXIST gate to FAIL by expecting BOUND but no CLAUDE.md + # Force EXIST gate to FAIL by mocking run_gates directly _make_state(self.tmp, bound_defined=True) + _write(os.path.join(self.tmp, "CLAUDE.md"), "## BOUND\nrules\n") mock_result = MagicMock() mock_result.stdout = "" - with patch("framework.subprocess.run", return_value=mock_result): + fake_gates = { + "EXIST": {"status": "FAIL", "detail": "forced fail for test"}, + } + with patch("framework.subprocess.run", return_value=mock_result), \ + patch("framework.run_gates", 
return_value=fake_gates): results = framework.run_verification(self.tmp) self.assertEqual(results["overall"], "FAIL") @@ -664,10 +669,17 @@ def test_result_has_layer2_self(self): self.assertIsInstance(results["layer2_self"], dict) def test_failures_key_populated_on_fail(self): + # Force a genuine FAIL: create CLAUDE.md with bound markers so + # bound_defined stays True, then mock run_gates to return a FAIL. _make_state(self.tmp, bound_defined=True) + _write(os.path.join(self.tmp, "CLAUDE.md"), "## BOUND\nrules\n") mock_result = MagicMock() mock_result.stdout = "" - with patch("framework.subprocess.run", return_value=mock_result): + fake_gates = { + "EXIST": {"status": "FAIL", "detail": "forced fail for test"}, + } + with patch("framework.subprocess.run", return_value=mock_result), \ + patch("framework.run_gates", return_value=fake_gates): results = framework.run_verification(self.tmp) self.assertIn("failures", results) self.assertIsInstance(results["failures"], list) diff --git a/tests/test_framework_extra.py b/tests/test_framework_extra.py index bd7d7e8..d61ec60 100644 --- a/tests/test_framework_extra.py +++ b/tests/test_framework_extra.py @@ -306,15 +306,18 @@ def test_bound_violations_zero_when_no_gate_fails(self): self.assertEqual(data_row[2], "0") # bound_violations column def test_bound_violations_incremented_when_gate_fails(self): - # Force EXIST gate to FAIL + # Force gate FAIL by mocking run_gates directly _make_state(self.tmp, current_phase=1, total_phases=3, bound_defined=True) + _write(os.path.join(self.tmp, "CLAUDE.md"), "## BOUND\nrules\n") results_path = os.path.join(self.tmp, "ouro-results.tsv") with open(results_path, "w") as f: f.write("phase\tverdict\tbound_violations\ttest_pass_rate\tscope_deviation\tnotes\n") mock_result = MagicMock() mock_result.stdout = "" - with patch("framework.subprocess.run", return_value=mock_result): + fake_gates = {"EXIST": {"status": "FAIL", "detail": "forced"}} + with patch("framework.subprocess.run", 
return_value=mock_result), \ + patch("framework.run_gates", return_value=fake_gates): framework.log_phase_result(self.tmp, "FAIL") with open(results_path) as f: diff --git a/tests/test_new_features.py b/tests/test_new_features.py new file mode 100644 index 0000000..1518ebf --- /dev/null +++ b/tests/test_new_features.py @@ -0,0 +1,609 @@ +""" +Tests for new framework features: +- parse_claude_md() parser +- detect_complexity() routing +- _file_in_danger_zone() helper +- RECALL gate +- Layer 3 verification triggers +- advance_phase() edge cases +- run_verification() coordination +- prepare.py integration (init → scan → template) + +Run with: + python3 -m pytest tests/test_new_features.py -v +""" + +import json +import os +import shutil +import sys +import tempfile +import unittest +from io import StringIO +from unittest.mock import patch, MagicMock + +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, PROJECT_ROOT) + +import framework +import prepare + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_tmp() -> str: + return tempfile.mkdtemp() + + +def _write(path: str, content: str = ""): + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + f.write(content) + + +def _make_state(project_path: str, **overrides) -> dict: + state = { + "version": "0.1.0", + "project_name": "test-project", + "current_stage": "BUILD", + "current_phase": 1, + "total_phases": 3, + "bound_defined": False, + "history": [], + } + state.update(overrides) + ouro_dir = os.path.join(project_path, ".ouro") + os.makedirs(ouro_dir, exist_ok=True) + with open(os.path.join(ouro_dir, "state.json"), "w") as f: + json.dump(state, f, indent=2) + return state + + +FULL_CLAUDE_MD = """\ +# CLAUDE.md + +## BOUND + +### DANGER ZONES +- `src/payments/` — payment processing logic +- 
`migrations/` — database migrations +- `auth/core.py` — authentication core + +### NEVER DO +- Never use float for money calculations +- Never delete migration files +- Never bypass auth checks + +### IRON LAWS +- All API responses must include request_id +- Test coverage must stay above 90% +- All database queries must use parameterized statements +""" + + +# --------------------------------------------------------------------------- +# parse_claude_md() +# --------------------------------------------------------------------------- + +class TestParseClaude(unittest.TestCase): + """parse_claude_md() correctly extracts BOUND sections.""" + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_full_parse_extracts_danger_zones(self): + _write(os.path.join(self.tmp, "CLAUDE.md"), FULL_CLAUDE_MD) + result = framework.parse_claude_md(self.tmp) + self.assertEqual(result["danger_zones"], + ["src/payments/", "migrations/", "auth/core.py"]) + + def test_full_parse_extracts_never_do(self): + _write(os.path.join(self.tmp, "CLAUDE.md"), FULL_CLAUDE_MD) + result = framework.parse_claude_md(self.tmp) + self.assertEqual(len(result["never_do"]), 3) + self.assertIn("Never use float for money calculations", + result["never_do"]) + + def test_full_parse_extracts_iron_laws(self): + _write(os.path.join(self.tmp, "CLAUDE.md"), FULL_CLAUDE_MD) + result = framework.parse_claude_md(self.tmp) + self.assertEqual(len(result["iron_laws"]), 3) + self.assertIn("All API responses must include request_id", + result["iron_laws"]) + + def test_has_bound_true_when_markers_present(self): + _write(os.path.join(self.tmp, "CLAUDE.md"), FULL_CLAUDE_MD) + result = framework.parse_claude_md(self.tmp) + self.assertTrue(result["has_bound"]) + + def test_has_bound_false_when_no_markers(self): + _write(os.path.join(self.tmp, "CLAUDE.md"), "# Just a project\n") + result = framework.parse_claude_md(self.tmp) + 
self.assertFalse(result["has_bound"]) + + def test_missing_file_returns_empty(self): + result = framework.parse_claude_md(self.tmp) + self.assertFalse(result["has_bound"]) + self.assertEqual(result["danger_zones"], []) + self.assertEqual(result["never_do"], []) + self.assertEqual(result["iron_laws"], []) + + def test_raw_content_populated(self): + content = "# Test\n## BOUND\nstuff" + _write(os.path.join(self.tmp, "CLAUDE.md"), content) + result = framework.parse_claude_md(self.tmp) + self.assertEqual(result["raw_content"], content) + + def test_oserror_returns_empty(self): + _write(os.path.join(self.tmp, "CLAUDE.md"), FULL_CLAUDE_MD) + original_open = open + + def patched_open(path, *args, **kwargs): + if "CLAUDE.md" in str(path): + raise OSError("cannot read") + return original_open(path, *args, **kwargs) + + with patch("builtins.open", side_effect=patched_open): + result = framework.parse_claude_md(self.tmp) + self.assertFalse(result["has_bound"]) + + def test_partial_bound_only_danger_zones(self): + content = "## BOUND\n### DANGER ZONES\n- `src/core/` — core logic\n" + _write(os.path.join(self.tmp, "CLAUDE.md"), content) + result = framework.parse_claude_md(self.tmp) + self.assertEqual(result["danger_zones"], ["src/core/"]) + self.assertEqual(result["never_do"], []) + self.assertEqual(result["iron_laws"], []) + + def test_danger_zone_with_multiple_backtick_items(self): + content = ("### DANGER ZONES\n" + "- `src/a/` — thing A\n" + "- `src/b/` — thing B\n") + _write(os.path.join(self.tmp, "CLAUDE.md"), content) + result = framework.parse_claude_md(self.tmp) + self.assertEqual(result["danger_zones"], ["src/a/", "src/b/"]) + + +# --------------------------------------------------------------------------- +# _file_in_danger_zone() +# --------------------------------------------------------------------------- + +class TestFileInDangerZone(unittest.TestCase): + """_file_in_danger_zone() matches file paths against zones.""" + + def test_exact_prefix_match(self): + 
result = framework._file_in_danger_zone( + "src/payments/stripe.py", ["src/payments/"] + ) + self.assertEqual(result, "src/payments/") + + def test_file_zone_segment_match_in_nested_path(self): + """File zone 'auth/core.py' matches 'lib/auth/core.py' via segment subsequence.""" + result = framework._file_in_danger_zone( + "lib/auth/core.py", ["auth/core.py"] + ) + self.assertEqual(result, "auth/core.py") + + def test_no_match_returns_none(self): + result = framework._file_in_danger_zone( + "src/utils/helpers.py", ["src/payments/", "migrations/"] + ) + self.assertIsNone(result) + + def test_empty_zones_returns_none(self): + result = framework._file_in_danger_zone("any/file.py", []) + self.assertIsNone(result) + + +# --------------------------------------------------------------------------- +# detect_complexity() +# --------------------------------------------------------------------------- + +class TestDetectComplexity(unittest.TestCase): + """detect_complexity() routes based on file count and DANGER ZONE.""" + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_trivial_single_file_no_dz(self): + result = framework.detect_complexity(self.tmp, ["file.py"], []) + self.assertEqual(result["level"], "trivial") + + def test_trivial_no_files(self): + result = framework.detect_complexity(self.tmp, [], []) + self.assertEqual(result["level"], "trivial") + + def test_simple_two_files_no_dz(self): + result = framework.detect_complexity( + self.tmp, ["a.py", "b.py"], [] + ) + self.assertEqual(result["level"], "simple") + + def test_simple_three_files_no_dz(self): + result = framework.detect_complexity( + self.tmp, ["a.py", "b.py", "c.py"], [] + ) + self.assertEqual(result["level"], "simple") + + def test_complex_many_files_no_dz(self): + files = [f"f{i}.py" for i in range(5)] + result = framework.detect_complexity(self.tmp, files, []) + self.assertEqual(result["level"], "complex") + + def 
test_complex_when_dz_touched(self): + result = framework.detect_complexity( + self.tmp, + ["src/payments/stripe.py"], + ["src/payments/"] + ) + self.assertEqual(result["level"], "complex") + + def test_result_has_route_dict(self): + result = framework.detect_complexity(self.tmp, ["a.py"], []) + self.assertIn("route", result) + self.assertIn("max_lines", result["route"]) + + def test_result_has_reason(self): + result = framework.detect_complexity(self.tmp, ["a.py"], []) + self.assertIsInstance(result["reason"], str) + self.assertGreater(len(result["reason"]), 0) + + +# --------------------------------------------------------------------------- +# RECALL gate +# --------------------------------------------------------------------------- + +class TestRecallGate(unittest.TestCase): + """RECALL gate checks BOUND constraint accessibility.""" + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + @patch("framework.subprocess.run") + def test_recall_pass_when_full_bound(self, mock_run): + mock_run.return_value = MagicMock(stdout="", returncode=0) + _write(os.path.join(self.tmp, "CLAUDE.md"), FULL_CLAUDE_MD) + gates = framework.run_gates(self.tmp) + self.assertIn("RECALL", gates) + self.assertEqual(gates["RECALL"]["status"], "PASS") + + @patch("framework.subprocess.run") + def test_recall_warn_when_no_bound(self, mock_run): + mock_run.return_value = MagicMock(stdout="", returncode=0) + gates = framework.run_gates(self.tmp) + self.assertEqual(gates["RECALL"]["status"], "WARN") + + @patch("framework.subprocess.run") + def test_recall_warn_incomplete_bound(self, mock_run): + mock_run.return_value = MagicMock(stdout="", returncode=0) + # BOUND exists but no IRON LAWS + content = "## BOUND\n### DANGER ZONES\n- `src/` — core\n" + _write(os.path.join(self.tmp, "CLAUDE.md"), content) + gates = framework.run_gates(self.tmp) + self.assertEqual(gates["RECALL"]["status"], "WARN") + self.assertIn("incomplete", 
gates["RECALL"]["detail"]) + + @patch("framework.subprocess.run") + def test_recall_detail_shows_counts(self, mock_run): + mock_run.return_value = MagicMock(stdout="", returncode=0) + _write(os.path.join(self.tmp, "CLAUDE.md"), FULL_CLAUDE_MD) + gates = framework.run_gates(self.tmp) + self.assertIn("3 zones", gates["RECALL"]["detail"]) + self.assertIn("3 laws", gates["RECALL"]["detail"]) + + +# --------------------------------------------------------------------------- +# Layer 3 verification triggers +# --------------------------------------------------------------------------- + +class TestLayer3Triggers(unittest.TestCase): + """_check_layer3_triggers() detects human review conditions.""" + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_no_triggers_returns_not_required(self): + results = { + "layer1_gates": { + "EXIST": {"status": "PASS"}, + "RELEVANCE": {"status": "PASS", "files": []}, + } + } + review = framework._check_layer3_triggers(self.tmp, results) + self.assertFalse(review["required"]) + + def test_danger_zone_files_trigger_review(self): + results = { + "layer1_gates": { + "RELEVANCE": { + "status": "WARN", + "files": ["src/payments/stripe.py"], + "danger_zone_files": ["src/payments/stripe.py"], + }, + } + } + review = framework._check_layer3_triggers(self.tmp, results) + self.assertTrue(review["required"]) + self.assertTrue(any("DANGER ZONE" in r for r in review["reasons"])) + + def test_gate_failure_triggers_review(self): + results = { + "layer1_gates": { + "EXIST": {"status": "FAIL", "detail": "missing"}, + "RELEVANCE": {"status": "PASS", "files": []}, + } + } + review = framework._check_layer3_triggers(self.tmp, results) + self.assertTrue(review["required"]) + self.assertTrue(any("gate failed" in r for r in review["reasons"])) + + def test_consecutive_retries_trigger_review(self): + _make_state(self.tmp, history=[ + {"verdict": "RETRY", "stage": "BUILD"}, + {"verdict": 
"RETRY", "stage": "BUILD"}, + {"verdict": "RETRY", "stage": "BUILD"}, + ]) + results = { + "layer1_gates": { + "RELEVANCE": {"status": "PASS", "files": []}, + } + } + review = framework._check_layer3_triggers(self.tmp, results) + self.assertTrue(review["required"]) + self.assertTrue(any("RETRY" in r for r in review["reasons"])) + + def test_two_retries_do_not_trigger(self): + _make_state(self.tmp, history=[ + {"verdict": "RETRY", "stage": "BUILD"}, + {"verdict": "RETRY", "stage": "BUILD"}, + ]) + results = { + "layer1_gates": { + "RELEVANCE": {"status": "PASS", "files": []}, + } + } + review = framework._check_layer3_triggers(self.tmp, results) + # Only RETRY check — no other triggers + retry_reasons = [r for r in review["reasons"] if "RETRY" in r] + self.assertEqual(len(retry_reasons), 0) + + +# --------------------------------------------------------------------------- +# run_verification() coordination +# --------------------------------------------------------------------------- + +class TestRunVerificationCoordination(unittest.TestCase): + """run_verification() correctly combines Layer 1+2+3 results.""" + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + @patch("framework.subprocess.run") + def test_all_pass_returns_pass(self, mock_run): + mock_run.return_value = MagicMock(stdout="", returncode=0) + _write(os.path.join(self.tmp, "CLAUDE.md"), FULL_CLAUDE_MD) + results = framework.run_verification(self.tmp) + self.assertEqual(results["overall"], "PASS") + + @patch("framework.subprocess.run") + def test_has_layer3_key(self, mock_run): + mock_run.return_value = MagicMock(stdout="", returncode=0) + results = framework.run_verification(self.tmp) + self.assertIn("layer3_review", results) + + @patch("framework.subprocess.run") + def test_gate_fail_sets_overall_fail(self, mock_run): + mock_run.return_value = MagicMock(stdout="", returncode=0) + _make_state(self.tmp, bound_defined=True) + 
_write(os.path.join(self.tmp, "CLAUDE.md"), "## BOUND\nrules\n") + fake_gates = {"EXIST": {"status": "FAIL", "detail": "forced"}} + with patch("framework.run_gates", return_value=fake_gates): + results = framework.run_verification(self.tmp) + self.assertEqual(results["overall"], "FAIL") + self.assertIn("EXIST", results.get("failures", [])) + + @patch("framework.subprocess.run") + def test_failures_list_populated(self, mock_run): + mock_run.return_value = MagicMock(stdout="", returncode=0) + _make_state(self.tmp, bound_defined=True) + _write(os.path.join(self.tmp, "CLAUDE.md"), "## BOUND\nrules\n") + fake_gates = {"EXIST": {"status": "FAIL", "detail": "forced"}} + with patch("framework.run_gates", return_value=fake_gates): + results = framework.run_verification(self.tmp) + self.assertIsInstance(results.get("failures"), list) + + +# --------------------------------------------------------------------------- +# advance_phase() edge cases +# --------------------------------------------------------------------------- + +class TestAdvancePhaseEdgeCases(unittest.TestCase): + """Edge cases for advance_phase().""" + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_advance_with_total_zero(self): + """total_phases=0 and current_phase=0 → goes to LOOP.""" + _make_state(self.tmp, current_phase=0, total_phases=0) + buf = StringIO() + with patch("sys.stdout", buf): + framework.advance_phase(self.tmp) + state = framework.load_state(self.tmp) + self.assertEqual(state["current_stage"], "LOOP") + + def test_advance_at_boundary_exactly_equal(self): + """current_phase == total_phases → LOOP transition.""" + _make_state(self.tmp, current_phase=3, total_phases=3) + buf = StringIO() + with patch("sys.stdout", buf): + framework.advance_phase(self.tmp) + state = framework.load_state(self.tmp) + self.assertEqual(state["current_stage"], "LOOP") + self.assertIsNone(state["current_phase"]) + + def 
test_advance_none_phase_prints_message(self): + """No phase plan → message printed, state unchanged.""" + _make_state(self.tmp, current_phase=None, total_phases=0) + buf = StringIO() + with patch("sys.stdout", buf): + framework.advance_phase(self.tmp) + output = buf.getvalue() + self.assertIn("No phase plan", output) + + def test_advance_increments_correctly(self): + """Normal case: phase increments by 1.""" + _make_state(self.tmp, current_phase=2, total_phases=5) + buf = StringIO() + with patch("sys.stdout", buf): + framework.advance_phase(self.tmp) + state = framework.load_state(self.tmp) + self.assertEqual(state["current_phase"], 3) + self.assertEqual(state["current_stage"], "BUILD") + + +# --------------------------------------------------------------------------- +# log_phase_result() TSV error handling +# --------------------------------------------------------------------------- + +class TestLogPhaseResultErrorHandling(unittest.TestCase): + """log_phase_result() handles TSV write failures gracefully.""" + + def setUp(self): + self.tmp = _make_tmp() + _make_state(self.tmp) + # Create the results TSV + results_path = os.path.join(self.tmp, "ouro-results.tsv") + with open(results_path, "w") as f: + f.write("phase\tverdict\n") + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + @patch("framework.subprocess.run") + def test_log_succeeds_normally(self, mock_run): + mock_run.return_value = MagicMock(stdout="", returncode=0) + buf = StringIO() + with patch("sys.stdout", buf): + framework.log_phase_result(self.tmp, "PASS", "test note") + self.assertIn("Logged", buf.getvalue()) + + +# --------------------------------------------------------------------------- +# prepare.py integration tests +# --------------------------------------------------------------------------- + +class TestPrepareIntegration(unittest.TestCase): + """Integration test: init → scan → template flow.""" + + def setUp(self): + self.tmp = _make_tmp() + + def tearDown(self): + 
shutil.rmtree(self.tmp, ignore_errors=True) + + def test_init_then_scan_consistent(self): + """After init, scan should show bound_detected matching state.""" + _write(os.path.join(self.tmp, "CLAUDE.md"), FULL_CLAUDE_MD) + buf = StringIO() + with patch("sys.stdout", buf): + prepare.init_ouro(self.tmp) + + # State says bound_defined=True + with open(os.path.join(self.tmp, ".ouro", "state.json")) as f: + state = json.load(f) + self.assertTrue(state["bound_defined"]) + + # Scan also says bound_detected=True + scan = prepare.scan_project(self.tmp) + self.assertTrue(scan["bound_detected"]) + + def test_scan_populates_danger_zones(self): + """scan_project() now fills danger_zones from parse_claude_md().""" + _write(os.path.join(self.tmp, "CLAUDE.md"), FULL_CLAUDE_MD) + scan = prepare.scan_project(self.tmp) + self.assertEqual(scan["danger_zones"], + ["src/payments/", "migrations/", "auth/core.py"]) + + def test_scan_empty_danger_zones_without_claude_md(self): + scan = prepare.scan_project(self.tmp) + self.assertEqual(scan["danger_zones"], []) + + def test_template_then_scan_detects_bound(self): + """After installing claude template, scan detects BOUND.""" + buf = StringIO() + with patch("sys.stdout", buf): + prepare.install_template("claude", self.tmp) + + scan = prepare.scan_project(self.tmp) + self.assertTrue(scan["has_claude_md"]) + # Template has BOUND markers + self.assertTrue(scan["bound_detected"]) + + def test_init_scan_template_full_cycle(self): + """Full cycle: template → init → scan → verify consistency.""" + # Install template + buf = StringIO() + with patch("sys.stdout", buf): + prepare.install_template("claude", self.tmp) + prepare.init_ouro(self.tmp) + + # Verify state exists and is valid + state_path = os.path.join(self.tmp, ".ouro", "state.json") + self.assertTrue(os.path.exists(state_path)) + with open(state_path) as f: + state = json.load(f) + self.assertEqual(state["current_stage"], "BOUND") + + # Verify scan matches + scan = 
prepare.scan_project(self.tmp) + self.assertTrue(scan["has_claude_md"]) + + +# --------------------------------------------------------------------------- +# Shared constants +# --------------------------------------------------------------------------- + +class TestSharedConstants(unittest.TestCase): + """Shared constants are accessible from both framework and prepare.""" + + def test_bound_all_markers_includes_section_markers(self): + for marker in framework.BOUND_SECTION_MARKERS: + self.assertIn(marker, framework.BOUND_ALL_MARKERS) + + def test_bound_all_markers_includes_content_markers(self): + for marker in framework.BOUND_CONTENT_MARKERS: + self.assertIn(marker, framework.BOUND_ALL_MARKERS) + + def test_claude_md_filename_constant(self): + self.assertEqual(framework.CLAUDE_MD_FILENAME, "CLAUDE.md") + + def test_magic_constants_are_positive(self): + self.assertGreater(framework.GIT_TIMEOUT_SECONDS, 0) + self.assertGreater(framework.HOT_FILE_EDIT_THRESHOLD, 0) + self.assertGreater(framework.HISTORY_LIMIT, 0) + self.assertGreater(framework.MAX_RETRY_BEFORE_ESCALATE, 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_reflective_log.py b/tests/test_reflective_log.py new file mode 100644 index 0000000..d85bad9 --- /dev/null +++ b/tests/test_reflective_log.py @@ -0,0 +1,716 @@ +""" +Tests for the three-layer reflective logging system: +- detect_patterns() — behavioral pattern detection +- build_reflective_entry() — three-layer structured entry construction +- write_reflective_log() / read_reflective_log() — JSONL persistence +- print_reflective_summary() — human-readable output +- Integration with log_phase_result() + +Run with: + python3 -m pytest tests/test_reflective_log.py -v +""" + +import json +import os +import shutil +import sys +import tempfile +import unittest +from io import StringIO +from unittest.mock import patch, MagicMock + +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, 
PROJECT_ROOT) + +import framework + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_tmp() -> str: + return tempfile.mkdtemp() + + +def _write(path: str, content: str = ""): + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + f.write(content) + + +def _make_state(project_path: str, **overrides) -> dict: + state = { + "version": "0.1.0", + "project_name": "test-project", + "current_stage": "BUILD", + "current_phase": 1, + "total_phases": 3, + "bound_defined": False, + "history": [], + } + state.update(overrides) + ouro_dir = os.path.join(project_path, ".ouro") + os.makedirs(ouro_dir, exist_ok=True) + with open(os.path.join(ouro_dir, "state.json"), "w") as f: + json.dump(state, f, indent=2) + return state + + +FULL_CLAUDE_MD = """\ +# CLAUDE.md + +## BOUND + +### DANGER ZONES +- `src/payments/` — payment processing logic +- `migrations/` — database migrations + +### NEVER DO +- Never use float for money calculations +- Never delete migration files + +### IRON LAWS +- All API responses must include request_id +- Test coverage must stay above 90% +""" + + +# --------------------------------------------------------------------------- +# detect_patterns() +# --------------------------------------------------------------------------- + +class TestDetectPatterns(unittest.TestCase): + """detect_patterns() identifies behavioral patterns in history.""" + + def test_empty_history_returns_defaults(self): + result = framework.detect_patterns([]) + self.assertEqual(result["consecutive_failures"], 0) + self.assertFalse(result["stuck_loop"]) + self.assertEqual(result["velocity_trend"], "UNKNOWN") + self.assertEqual(result["hot_files"], []) + self.assertFalse(result["drift_signal"]) + self.assertEqual(result["retry_rate"], 0.0) + + def test_consecutive_failures_counted_from_tail(self): + history = [ + 
{"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + {"verdict": "RETRY", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + ] + result = framework.detect_patterns(history) + self.assertEqual(result["consecutive_failures"], 3) + + def test_pass_breaks_consecutive_count(self): + history = [ + {"verdict": "FAIL", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + ] + result = framework.detect_patterns(history) + self.assertEqual(result["consecutive_failures"], 1) + + def test_retry_rate_calculated(self): + history = [ + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "RETRY", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "RETRY", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + ] + result = framework.detect_patterns(history) + self.assertAlmostEqual(result["retry_rate"], 0.4, places=1) + + def test_stuck_loop_detected(self): + history = [ + {"verdict": "FAIL", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + ] + result = framework.detect_patterns(history) + self.assertTrue(result["stuck_loop"]) + + def test_stuck_loop_not_detected_with_different_stages(self): + history = [ + {"verdict": "FAIL", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "VERIFY"}, + {"verdict": "FAIL", "stage": "BUILD"}, + ] + result = framework.detect_patterns(history) + self.assertFalse(result["stuck_loop"]) + + def test_stuck_loop_not_detected_with_pass(self): + history = [ + {"verdict": "FAIL", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + ] + result = framework.detect_patterns(history) + self.assertFalse(result["stuck_loop"]) + + def test_velocity_stable_with_consistent_passes(self): + history = [{"verdict": "PASS", "stage": "BUILD"}] * 6 + result = framework.detect_patterns(history) + self.assertEqual(result["velocity_trend"], "STABLE") + + 
def test_velocity_unknown_with_few_entries(self): + """< 6 entries → UNKNOWN (avoids false DECELERATING from single RETRY).""" + history = [ + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "RETRY", "stage": "BUILD"}, + ] + result = framework.detect_patterns(history) + self.assertEqual(result["velocity_trend"], "UNKNOWN") + + def test_velocity_decelerating(self): + history = [ + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + ] + result = framework.detect_patterns(history) + self.assertEqual(result["velocity_trend"], "DECELERATING") + + def test_velocity_accelerating(self): + history = [ + {"verdict": "FAIL", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + {"verdict": "PASS", "stage": "BUILD"}, + ] + result = framework.detect_patterns(history) + self.assertEqual(result["velocity_trend"], "ACCELERATING") + + def test_velocity_stalled(self): + history = [{"verdict": "FAIL", "stage": "BUILD"}] * 6 + result = framework.detect_patterns(history) + self.assertEqual(result["velocity_trend"], "STALLED") + + def test_hot_files_extracted_from_gates(self): + gates = { + "ROOT_CAUSE": { + "status": "WARN", + "detail": "Hot files: framework.py, prepare.py", + } + } + result = framework.detect_patterns([], gates) + self.assertEqual(result["hot_files"], ["framework.py", "prepare.py"]) + + def test_drift_signal_from_danger_zone_files(self): + gates = { + "RELEVANCE": { + "status": "WARN", + "danger_zone_files": ["src/payments/stripe.py"], + } + } + result = framework.detect_patterns([], gates) + self.assertTrue(result["drift_signal"]) + + def 
test_no_drift_without_dz_files(self): + gates = { + "RELEVANCE": { + "status": "PASS", + "files": ["utils.py"], + } + } + result = framework.detect_patterns([], gates) + self.assertFalse(result["drift_signal"]) + + +# --------------------------------------------------------------------------- +# build_reflective_entry() +# --------------------------------------------------------------------------- + +class TestBuildReflectiveEntry(unittest.TestCase): + """build_reflective_entry() constructs a valid three-layer entry.""" + + def setUp(self): + self.tmp = _make_tmp() + _make_state(self.tmp) + + def tearDown(self): + shutil.rmtree(self.tmp, ignore_errors=True) + + def test_entry_has_three_layers(self): + verification = { + "layer1_gates": {}, + "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": "PASS", + } + entry = framework.build_reflective_entry( + self.tmp, "PASS", verification, "test note" + ) + self.assertIn("what", entry) + self.assertIn("why", entry) + self.assertIn("pattern", entry) + + def test_entry_has_timestamp(self): + verification = { + "layer1_gates": {}, + "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": "PASS", + } + entry = framework.build_reflective_entry( + self.tmp, "PASS", verification + ) + self.assertIn("timestamp", entry) + self.assertIsInstance(entry["timestamp"], str) + + def test_what_layer_contains_verdict(self): + verification = { + "layer1_gates": {}, + "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": "PASS", + } + entry = framework.build_reflective_entry( + self.tmp, "FAIL", verification + ) + self.assertEqual(entry["what"]["verdict"], "FAIL") + + def test_what_layer_contains_gates(self): + verification = { + "layer1_gates": { + "EXIST": {"status": "PASS", "detail": "ok"}, + "RECALL": {"status": "WARN", "detail": "incomplete"}, + }, + "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": 
"PASS", + } + entry = framework.build_reflective_entry( + self.tmp, "PASS", verification + ) + self.assertIn("EXIST", entry["what"]["gates"]) + self.assertEqual(entry["what"]["gates"]["EXIST"]["status"], "PASS") + + def test_why_layer_contains_complexity(self): + verification = { + "layer1_gates": { + "RELEVANCE": {"status": "PASS", "detail": "2 files", "files": ["a.py", "b.py"]}, + }, + "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": "PASS", + } + entry = framework.build_reflective_entry( + self.tmp, "PASS", verification + ) + self.assertIn(entry["why"]["complexity"], + ["trivial", "simple", "complex", "architectural"]) + + def test_why_layer_contains_bound_state(self): + _write(os.path.join(self.tmp, "CLAUDE.md"), FULL_CLAUDE_MD) + verification = { + "layer1_gates": {}, + "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": "PASS", + } + entry = framework.build_reflective_entry( + self.tmp, "PASS", verification + ) + self.assertEqual(entry["why"]["bound_state"]["danger_zones"], 2) + self.assertEqual(entry["why"]["bound_state"]["iron_laws"], 2) + + def test_why_layer_contains_notes(self): + verification = { + "layer1_gates": {}, + "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": "PASS", + } + entry = framework.build_reflective_entry( + self.tmp, "PASS", verification, "important context" + ) + self.assertEqual(entry["why"]["notes"], "important context") + + def test_pattern_layer_has_required_fields(self): + verification = { + "layer1_gates": {}, + "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": "PASS", + } + entry = framework.build_reflective_entry( + self.tmp, "PASS", verification + ) + pattern = entry["pattern"] + self.assertIn("consecutive_failures", pattern) + self.assertIn("stuck_loop", pattern) + self.assertIn("velocity_trend", pattern) + self.assertIn("retry_rate", pattern) + self.assertIn("hot_files", 
pattern) + self.assertIn("drift_signal", pattern) + + def test_alerts_empty_when_healthy(self): + verification = { + "layer1_gates": {}, + "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": "PASS", + } + entry = framework.build_reflective_entry( + self.tmp, "PASS", verification + ) + self.assertEqual(entry["alerts"], []) + + def test_alerts_populated_when_stuck(self): + _make_state(self.tmp, history=[ + {"verdict": "FAIL", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + {"verdict": "FAIL", "stage": "BUILD"}, + ]) + verification = { + "layer1_gates": {}, + "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": "FAIL", + } + entry = framework.build_reflective_entry( + self.tmp, "FAIL", verification + ) + self.assertTrue(len(entry["alerts"]) > 0) + self.assertTrue(any("STUCK" in a for a in entry["alerts"])) + + def test_alerts_on_consecutive_escalate(self): + _make_state(self.tmp, history=[ + {"verdict": "RETRY", "stage": "BUILD"}, + {"verdict": "RETRY", "stage": "BUILD"}, + {"verdict": "RETRY", "stage": "BUILD"}, + ]) + verification = { + "layer1_gates": {}, + "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": "FAIL", + } + entry = framework.build_reflective_entry( + self.tmp, "RETRY", verification + ) + self.assertTrue(any("ESCALATE" in a for a in entry["alerts"])) + + def test_entry_is_json_serializable(self): + verification = { + "layer1_gates": { + "EXIST": {"status": "PASS", "detail": "ok"}, + }, + "layer2_self": {}, + "layer3_review": {"required": False, "reasons": []}, + "overall": "PASS", + } + entry = framework.build_reflective_entry( + self.tmp, "PASS", verification + ) + # Must not raise + serialized = json.dumps(entry, ensure_ascii=False) + roundtrip = json.loads(serialized) + self.assertEqual(roundtrip["what"]["verdict"], "PASS") + + +# --------------------------------------------------------------------------- +# write_reflective_log() / 
# read_reflective_log()
# ---------------------------------------------------------------------------

class TestReflectiveLogPersistence(unittest.TestCase):
    """JSONL persistence for the reflective log."""

    def setUp(self):
        # Only the .ouro directory is created; the log file itself is not.
        self.tmp = _make_tmp()
        os.makedirs(os.path.join(self.tmp, ".ouro"), exist_ok=True)

    def tearDown(self):
        shutil.rmtree(self.tmp, ignore_errors=True)

    def test_write_creates_file(self):
        """Writing one entry creates .ouro/reflective-log.jsonl."""
        entry = {"what": {"verdict": "PASS"}, "timestamp": "2026-01-01T00:00:00"}
        framework.write_reflective_log(self.tmp, entry)
        log_path = os.path.join(self.tmp, ".ouro", "reflective-log.jsonl")
        self.assertTrue(os.path.exists(log_path))

    def test_write_then_read_roundtrip(self):
        """An entry that was written is returned unchanged by the reader."""
        entry = {
            "what": {"verdict": "PASS"},
            "why": {"complexity": "simple"},
            "pattern": {"stuck_loop": False},
            "timestamp": "2026-01-01T00:00:00",
        }
        framework.write_reflective_log(self.tmp, entry)
        entries = framework.read_reflective_log(self.tmp)
        self.assertEqual(len(entries), 1)
        self.assertEqual(entries[0]["what"]["verdict"], "PASS")

    def test_multiple_writes_append(self):
        """Successive writes append in order instead of overwriting."""
        for i in range(3):
            entry = {"iteration": i, "timestamp": f"2026-01-0{i+1}T00:00:00"}
            framework.write_reflective_log(self.tmp, entry)
        entries = framework.read_reflective_log(self.tmp, last_n=10)
        self.assertEqual(len(entries), 3)
        self.assertEqual(entries[0]["iteration"], 0)
        self.assertEqual(entries[2]["iteration"], 2)

    def test_read_last_n_returns_subset(self):
        """last_n returns only the newest entries, oldest first."""
        for i in range(10):
            entry = {"iteration": i, "timestamp": f"2026-01-{i+1:02d}T00:00:00"}
            framework.write_reflective_log(self.tmp, entry)
        entries = framework.read_reflective_log(self.tmp, last_n=3)
        self.assertEqual(len(entries), 3)
        self.assertEqual(entries[0]["iteration"], 7)
        self.assertEqual(entries[2]["iteration"], 9)

    def test_log_trimmed_to_limit(self):
        """Writing past REFLECTIVE_LOG_LIMIT drops the oldest entries."""
        limit = framework.REFLECTIVE_LOG_LIMIT
        extra = 10
        for i in range(limit + extra):
            entry = {"iteration": i, "timestamp": f"2026-01-01T00:{i:02d}:00"}
            framework.write_reflective_log(self.tmp, entry)
        # Ask for more than the limit so the reader never caps the result
        # itself, whatever value the constant has.
        entries = framework.read_reflective_log(self.tmp, last_n=limit + extra)
        self.assertEqual(len(entries), limit)
        # Oldest entries were trimmed
        self.assertEqual(entries[0]["iteration"], extra)

    def test_read_empty_returns_empty_list(self):
        """Reading before any write returns an empty list."""
        entries = framework.read_reflective_log(self.tmp)
        self.assertEqual(entries, [])

    def test_read_nonexistent_dir_returns_empty_list(self):
        """Reading from a project path that does not exist returns []."""
        # A path under the temp dir is guaranteed missing on every platform.
        missing = os.path.join(self.tmp, "nonexistent", "path")
        entries = framework.read_reflective_log(missing)
        self.assertEqual(entries, [])

    def test_corrupted_lines_skipped(self):
        """Lines that are not valid JSON are ignored by the reader."""
        log_path = os.path.join(self.tmp, ".ouro", "reflective-log.jsonl")
        with open(log_path, "w", encoding="utf-8") as f:
            f.write('{"iteration": 0}\n')
            f.write('NOT VALID JSON\n')
            f.write('{"iteration": 1}\n')
        entries = framework.read_reflective_log(self.tmp, last_n=10)
        self.assertEqual(len(entries), 2)
        self.assertEqual(entries[0]["iteration"], 0)
        self.assertEqual(entries[1]["iteration"], 1)

    def test_each_line_is_valid_json(self):
        """An entry with an embedded newline still occupies one line."""
        entry = {
            "what": {"verdict": "PASS"},
            "why": {"notes": "line with\nnewline should be escaped"},
            "timestamp": "2026-01-01T00:00:00",
        }
        framework.write_reflective_log(self.tmp, entry)
        log_path = os.path.join(self.tmp, ".ouro", "reflective-log.jsonl")
        with open(log_path, encoding="utf-8") as f:
            lines = f.readlines()
        self.assertEqual(len(lines), 1)
        parsed = json.loads(lines[0])
        self.assertIn("\n", parsed["why"]["notes"])


# ---------------------------------------------------------------------------
# print_reflective_summary()
# ---------------------------------------------------------------------------

class TestPrintReflectiveSummary(unittest.TestCase):
    """print_reflective_summary() formats output correctly."""

    def setUp(self):
        self.tmp = _make_tmp()
        os.makedirs(os.path.join(self.tmp, ".ouro"), exist_ok=True)

    def tearDown(self):
        shutil.rmtree(self.tmp, ignore_errors=True)

    def _capture(self, project_path, last_n=5) -> str:
        """Run print_reflective_summary() and return what it printed."""
        buf = StringIO()
        with patch("sys.stdout", buf):
            framework.print_reflective_summary(project_path, last_n)
        return buf.getvalue()

    def test_empty_log_shows_message(self):
        """With no log file the summary says so."""
        output = self._capture(self.tmp)
        self.assertIn("No reflective log", output)

    def test_single_entry_shows_what_why_pattern(self):
        """A full entry prints the WHAT / WHY / PATTERN section labels."""
        entry = {
            "timestamp": "2026-01-01T12:00:00+00:00",
            "iteration": 1,
            "what": {
                "stage": "BUILD",
                "phase": "1/3",
                "verdict": "PASS",
                "overall": "PASS",
                "gates": {"EXIST": {"status": "PASS", "detail": "ok"}},
                "changed_files": [],
                "danger_zone_contact": [],
                "bound_violations": 0,
                "review_required": False,
            },
            "why": {
                "complexity": "simple",
                "complexity_reason": "2 files",
                "review_reasons": [],
                "bound_state": {"danger_zones": 2, "never_do": 2, "iron_laws": 2},
                "notes": "",
            },
            "pattern": {
                "consecutive_failures": 0,
                "stuck_loop": False,
                "velocity_trend": "STABLE",
                "retry_rate": 0.0,
                "hot_files": [],
                "drift_signal": False,
            },
            "alerts": [],
        }
        framework.write_reflective_log(self.tmp, entry)
        output = self._capture(self.tmp)
        self.assertIn("WHAT:", output)
        self.assertIn("WHY:", output)
        self.assertIn("PATTERN:", output)
        self.assertIn("BUILD", output)
        self.assertIn("PASS", output)

    def test_alerts_shown_in_output(self):
        """Alerts stored on an entry appear in the printed summary."""
        entry = {
            "timestamp": "2026-01-01T12:00:00+00:00",
            "iteration": 1,
            "what": {
                "stage": "BUILD",
                "phase": "1/3",
                "verdict": "FAIL",
                "overall": "FAIL",
                "gates": {},
                "changed_files": [],
                "danger_zone_contact": [],
                "bound_violations": 0,
                "review_required": False,
            },
            "why": {
                "complexity": "simple",
                "complexity_reason": "",
                "review_reasons": [],
                "bound_state": {"danger_zones": 0, "never_do": 0, "iron_laws": 0},
                "notes": "",
            },
            "pattern": {
                "consecutive_failures": 3,
                "stuck_loop": True,
                "velocity_trend": "STALLED",
                "retry_rate": 1.0,
                "hot_files": [],
                "drift_signal": False,
            },
            "alerts": ["STUCK: same stage failing 3+ times"],
        }
        framework.write_reflective_log(self.tmp, entry)
        output = self._capture(self.tmp)
        self.assertIn("STUCK", output)

    def test_trend_shown_with_multiple_entries(self):
        """With several entries the summary includes Trend and Velocity."""
        for i in range(5):
            entry = {
                "timestamp": f"2026-01-0{i+1}T12:00:00+00:00",
                "iteration": i + 1,
                "what": {
                    "stage": "BUILD",
                    "phase": f"{i+1}/5",
                    "verdict": "PASS",
                    "overall": "PASS",
                    "gates": {},
                    "changed_files": [],
                    "danger_zone_contact": [],
                    "bound_violations": 0,
                    "review_required": False,
                },
                "why": {
                    "complexity": "simple",
                    "complexity_reason": "",
                    "review_reasons": [],
                    "bound_state": {"danger_zones": 0, "never_do": 0, "iron_laws": 0},
                    "notes": "",
                },
                "pattern": {
                    "consecutive_failures": 0,
                    "stuck_loop": False,
                    "velocity_trend": "STABLE",
                    "retry_rate": 0.0,
                    "hot_files": [],
                    "drift_signal": False,
                },
                "alerts": [],
            }
            framework.write_reflective_log(self.tmp, entry)
        output = self._capture(self.tmp)
        self.assertIn("Trend:", output)
        self.assertIn("Velocity:", output)


# ---------------------------------------------------------------------------
# Integration: log_phase_result() writes reflective log
# ---------------------------------------------------------------------------

class TestLogPhaseResultReflectiveIntegration(unittest.TestCase):
    """log_phase_result() now also writes to the reflective log."""

    def setUp(self):
        # A state file plus a results TSV containing only the header row.
        self.tmp = _make_tmp()
        _make_state(self.tmp)
        results_path = os.path.join(self.tmp, "ouro-results.tsv")
        with open(results_path, "w", encoding="utf-8") as f:
            f.write("phase\tverdict\n")

    def tearDown(self):
        shutil.rmtree(self.tmp, ignore_errors=True)

    def _log(self, *args) -> str:
        """Call log_phase_result(self.tmp, *args) and return its stdout."""
        buf = StringIO()
        with patch("sys.stdout", buf):
            framework.log_phase_result(self.tmp, *args)
        return buf.getvalue()

    @patch("framework.subprocess.run")
    def test_log_creates_reflective_entry(self, mock_run):
        """One log_phase_result() call yields one reflective entry."""
        mock_run.return_value = MagicMock(stdout="", returncode=0)
        self._log("PASS", "test integration")
        entries = framework.read_reflective_log(self.tmp)
        self.assertEqual(len(entries), 1)
        self.assertEqual(entries[0]["what"]["verdict"], "PASS")

    @patch("framework.subprocess.run")
    def test_log_reflective_has_three_layers(self, mock_run):
        """The persisted entry has what / why / pattern plus alerts."""
        mock_run.return_value = MagicMock(stdout="", returncode=0)
        self._log("PASS")
        entries = framework.read_reflective_log(self.tmp)
        entry = entries[0]
        self.assertIn("what", entry)
        self.assertIn("why", entry)
        self.assertIn("pattern", entry)
        self.assertIn("alerts", entry)

    @patch("framework.subprocess.run")
    def test_multiple_logs_create_multiple_entries(self, mock_run):
        """Each call appends its own entry, in call order."""
        mock_run.return_value = MagicMock(stdout="", returncode=0)
        self._log("PASS", "first")
        self._log("FAIL", "second")
        self._log("RETRY", "third")
        entries = framework.read_reflective_log(self.tmp)
        self.assertEqual(len(entries), 3)
        self.assertEqual(entries[0]["what"]["verdict"], "PASS")
        self.assertEqual(entries[1]["what"]["verdict"], "FAIL")
        self.assertEqual(entries[2]["what"]["verdict"], "RETRY")

    @patch("framework.subprocess.run")
    def test_log_alerts_printed_to_stdout(self, mock_run):
        """Alerts raised for the new entry are echoed with a '>>' marker."""
        mock_run.return_value = MagicMock(stdout="", returncode=0)
        # Create history with consecutive failures to trigger alerts
        _make_state(self.tmp, history=[
            {"verdict": "FAIL", "stage": "BUILD"},
            {"verdict": "FAIL", "stage": "BUILD"},
            {"verdict": "FAIL", "stage": "BUILD"},
        ])
        output = self._log("FAIL", "still failing")
        self.assertIn(">>", output)

    @patch("framework.subprocess.run")
    def test_reflective_notes_preserved(self, mock_run):
        """Notes passed to log_phase_result() land in why['notes']."""
        mock_run.return_value = MagicMock(stdout="", returncode=0)
        self._log("PASS", "important context here")
        entries = framework.read_reflective_log(self.tmp)
        self.assertEqual(entries[0]["why"]["notes"], "important context here")

    @patch("framework.subprocess.run")
    def test_reflective_iteration_increments(self, mock_run):
        """Iteration numbers start at 1 and increase by one per call."""
        mock_run.return_value = MagicMock(stdout="", returncode=0)
        self._log("PASS", "first")
        self._log("PASS", "second")
        entries = framework.read_reflective_log(self.tmp)
        # Iteration numbers should increment
        self.assertEqual(entries[0]["iteration"], 1)
        self.assertEqual(entries[1]["iteration"], 2)


if __name__ == "__main__":
    unittest.main()