Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,39 @@ on:
pull_request:

jobs:
forbidden-file-drift-guard:
runs-on: ubuntu-latest
permissions:
contents: read
if: github.event_name == 'pull_request'
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Fail if forbidden files are modified
run: |
BASE="${{ github.event.pull_request.base.sha }}"
HEAD="${{ github.event.pull_request.head.sha }}"
CHANGED=$(git diff --name-only "$BASE" "$HEAD")
FORBIDDEN_TOUCHED=""
while IFS= read -r f; do
case "$f" in
authority_gate.py|stop_machine.py|commit_gate/*)
FORBIDDEN_TOUCHED="$FORBIDDEN_TOUCHED $f"
;;
esac
done <<< "$CHANGED"
if [ -n "$FORBIDDEN_TOUCHED" ]; then
echo "ERROR: forbidden files modified in this PR:$FORBIDDEN_TOUCHED"
exit 1
fi
echo "OK: no forbidden files modified."

tests:
runs-on: ubuntu-latest
permissions:
contents: read
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12"]
Expand Down
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
__pycache__/
*.py[cod]
*.pyo
.pytest_cache/
*.egg-info/
dist/
build/
.eggs/
111 changes: 111 additions & 0 deletions mgtp/evaluator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""mgtp/evaluator.py — Fail-closed MGTP verdict resolver.

Consumes authority_gate.AuthorityGate; does not redefine it.

Fail-closed rules (non-negotiable):
1. verdict == ALLOW requires provided_evidence is not None.
2. decision_time must fall within [request.timestamp, request.timestamp + window].
Outside that window → REFUSED (fail-closed).
"""

from datetime import datetime, timezone
from typing import Optional

from authority_gate import Decision, Evidence, AuthorityGate

from .types import AuthorityContext, DecisionRecord, TransitionOutcome, TransitionRequest

# Authority window: maximum seconds between request timestamp and decision_time.
AUTHORITY_WINDOW_SECONDS = 3600 # 1 hour; conservative, never relaxed upward here

_ISO_FMT = "%Y-%m-%dT%H:%M:%SZ"


def _parse_utc(ts: str) -> datetime:
"""Parse an ISO-8601 UTC timestamp string to an aware datetime."""
try:
return datetime.strptime(ts, _ISO_FMT).replace(tzinfo=timezone.utc)
except ValueError:
raise ValueError(f"Timestamp must be ISO-8601 UTC (YYYY-MM-DDTHH:MM:SSZ), got: {ts!r}")


def _within_authority_window(request_ts: str, decision_ts: str, window_seconds: int) -> bool:
"""Return True iff decision_ts is within [request_ts, request_ts + window_seconds]."""
req_dt = _parse_utc(request_ts)
dec_dt = _parse_utc(decision_ts)
delta = (dec_dt - req_dt).total_seconds()
return 0 <= delta <= window_seconds


def evaluate(
request: TransitionRequest,
context: AuthorityContext,
provided_evidence: Optional[Evidence],
decision_time: str,
authority_window_seconds: int = AUTHORITY_WINDOW_SECONDS,
) -> DecisionRecord:
"""Evaluate a TransitionRequest and return a DecisionRecord.

Fail-closed: any uncertainty or missing evidence yields REFUSED.
"""
# --- Guard: decision_time within authority window ---
if not _within_authority_window(request.timestamp, decision_time, authority_window_seconds):
return DecisionRecord(
transition_id=request.transition_id,
verdict=TransitionOutcome.REFUSED,
reasons=("decision_time_outside_authority_window",),
decision_time=decision_time,
authority_basis=context.authority_basis,
)

# --- Guard: evidence must not be None for any non-REFUSED path ---
if provided_evidence is None:
return DecisionRecord(
transition_id=request.transition_id,
verdict=TransitionOutcome.REFUSED,
reasons=("missing_evidence",),
decision_time=decision_time,
authority_basis=context.authority_basis,
)

# --- Resolve required evidence level from authority_basis ---
try:
required = Evidence[context.authority_basis]
except KeyError:
return DecisionRecord(
transition_id=request.transition_id,
verdict=TransitionOutcome.REFUSED,
reasons=("unknown_authority_basis",),
decision_time=decision_time,
authority_basis=context.authority_basis,
)

gate = AuthorityGate(required)
gate_decision = gate.check(provided_evidence)

if gate_decision is Decision.DENY:
return DecisionRecord(
transition_id=request.transition_id,
verdict=TransitionOutcome.REFUSED,
reasons=("insufficient_evidence",),
decision_time=decision_time,
authority_basis=context.authority_basis,
)

# Evidence is sufficient; apply risk-class supervision rules.
if request.irreversible or request.trust_boundary_crossed:
return DecisionRecord(
transition_id=request.transition_id,
verdict=TransitionOutcome.SUPERVISED,
reasons=("irreversible_or_trust_boundary",),
decision_time=decision_time,
authority_basis=context.authority_basis,
)

return DecisionRecord(
transition_id=request.transition_id,
verdict=TransitionOutcome.APPROVED,
reasons=("evidence_sufficient",),
decision_time=decision_time,
authority_basis=context.authority_basis,
)
37 changes: 36 additions & 1 deletion mgtp/types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import hashlib
import json
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from typing import Optional, Tuple


class RiskClass(str, Enum):
Expand Down Expand Up @@ -32,3 +34,36 @@ class AuthorityContext:
actor_id: str
authority_basis: str # map to Evidence enum name e.g. "OWNER"
tenant_id: str


@dataclass(frozen=True)
class DecisionRecord:
"""Immutable, canonical artefact recording an MGTP evaluation decision.

Fields are sorted alphabetically in the canonical representation to
guarantee byte-for-byte stability across Python versions and runs.
"""

transition_id: str
verdict: TransitionOutcome
reasons: Tuple[str, ...] # must be pre-sorted by caller
decision_time: str # ISO-8601 timestamp; injected, never generated here
authority_basis: str

def canonical_bytes(self) -> bytes:
"""Return deterministic UTF-8 JSON bytes (sorted keys, no whitespace)."""
obj = {
"authority_basis": self.authority_basis,
"decision_time": self.decision_time,
"reasons": list(self.reasons),
"transition_id": self.transition_id,
"verdict": self.verdict.value,
}
return json.dumps(
obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False
).encode("utf-8")

@property
def decision_hash(self) -> str:
"""SHA-256 hex digest (lower-case) of canonical_bytes()."""
return hashlib.sha256(self.canonical_bytes()).hexdigest()
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""conftest.py — Add repository root to sys.path for tests in this directory."""
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
Loading