Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions intake_pack/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""intake_pack — Schema-only definitions for intake-pack v0.1.

Exports:
IntakeRequestSchema, IntakeVerdictSchema (schema.py)
AllowFamily, RefuseFamily, EscalateFamily,
VERDICT_FAMILY (reason_codes.py)
StatusEnvelope, ENVELOPE_VERSION (status_envelope.py)
"""

from .reason_codes import AllowFamily, EscalateFamily, RefuseFamily, VERDICT_FAMILY
from .schema import IntakeRequestSchema, IntakeVerdictSchema, SCHEMA_VERSION
from .status_envelope import StatusEnvelope, ENVELOPE_VERSION

__all__ = [
"IntakeRequestSchema",
"IntakeVerdictSchema",
"SCHEMA_VERSION",
"AllowFamily",
"RefuseFamily",
"EscalateFamily",
"VERDICT_FAMILY",
"StatusEnvelope",
"ENVELOPE_VERSION",
]
41 changes: 41 additions & 0 deletions intake_pack/reason_codes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""reason_codes — Canonical reason-code families for intake-pack v0.1.

Each Enum groups the reason-code strings that may accompany a given verdict
class. Values match the strings emitted by the commit_gate engine so that
consumers can compare by value without a hard engine dependency.

No logic. No external dependencies.
"""

from enum import Enum
from typing import Dict, Type


class AllowFamily(Enum):
"""Reason codes that accompany an ALLOW verdict."""

ALLOWLIST_MATCH = "allowlist_match"
EXPLICIT_GRANT = "explicit_grant"


class RefuseFamily(Enum):
"""Reason codes that accompany a REFUSE verdict."""

DENYLIST_MATCH = "denylist_match"
DEFAULT_REFUSE = "default_refuse"
MISSING_EVIDENCE = "missing_evidence"


class EscalateFamily(Enum):
"""Reason codes that accompany an ESCALATE verdict."""

ESCALATION_MATCH = "escalation_match"
SCOPE_AMBIGUOUS = "scope_ambiguous"


# Maps a verdict string to its corresponding reason-code family.
VERDICT_FAMILY: Dict[str, Type[Enum]] = {
"ALLOW": AllowFamily,
"REFUSE": RefuseFamily,
"ESCALATE": EscalateFamily,
}
54 changes: 54 additions & 0 deletions intake_pack/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""schema — Canonical field specifications for intake-pack v0.1.

Pure data-structure definitions only. No behaviour. No external dependencies.
"""

from typing import List

SCHEMA_VERSION = "0.1"


class IntakeRequestSchema:
"""Field specification for an intake request object.

Fields
------
actor_id str Caller identity.
action_class str Category of the requested action.
context dict Free-form metadata; must be JSON-serialisable.
authority_scope dict Scope key/value pairs used for rule matching.
invariant_hash str SHA-256 of the invariant set in effect.
timestamp_utc str|None ISO-8601 timestamp; optional, excluded from hash.
"""

REQUIRED_FIELDS: List[str] = [
"actor_id",
"action_class",
"context",
"authority_scope",
"invariant_hash",
]
OPTIONAL_FIELDS: List[str] = ["timestamp_utc"]
ALL_FIELDS: List[str] = REQUIRED_FIELDS + OPTIONAL_FIELDS


class IntakeVerdictSchema:
"""Field specification for an intake verdict object.

Fields
------
verdict str One of ALLOW, REFUSE, ESCALATE.
reasons list Sorted list of reason-code strings.
decision_hash str SHA-256 of the canonical decision object.
request_hash str SHA-256 of the canonical request object.
artefact_version str Schema version string.
"""

VERDICT_VALUES: List[str] = ["ALLOW", "REFUSE", "ESCALATE"]
REQUIRED_FIELDS: List[str] = [
"verdict",
"reasons",
"decision_hash",
"request_hash",
"artefact_version",
]
29 changes: 29 additions & 0 deletions intake_pack/status_envelope.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""status_envelope — Standardised status wrapper for intake-pack v0.1.

A pure data container. No I/O. No external dependencies.
"""

from typing import Any, List, NamedTuple, Optional

ENVELOPE_VERSION = "0.1"


class StatusEnvelope(NamedTuple):
"""Standardised response wrapper.

Fields
------
ok bool True if the operation succeeded without error.
status str Verdict tag: ALLOW, REFUSE, or ESCALATE.
reason_codes List[str] Sorted list of reason-code strings.
payload Any|None Inner result object; None on error.
envelope_version str Schema version for this envelope.
error str|None Error message; populated only when ok is False.
"""

ok: bool
status: str
reason_codes: List[str]
payload: Optional[Any]
envelope_version: str = ENVELOPE_VERSION
error: Optional[str] = None
169 changes: 169 additions & 0 deletions test_intake_pack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
"""Tests for intake_pack v0.1 — schema, reason-code families, status envelope."""

import pytest

from intake_pack import (
ENVELOPE_VERSION,
SCHEMA_VERSION,
AllowFamily,
EscalateFamily,
IntakeRequestSchema,
IntakeVerdictSchema,
RefuseFamily,
StatusEnvelope,
VERDICT_FAMILY,
)


# ---------------------------------------------------------------------------
# Schema tests
# ---------------------------------------------------------------------------


def test_schema_version():
assert SCHEMA_VERSION == "0.1"


def test_intake_request_schema_required_fields():
assert "actor_id" in IntakeRequestSchema.REQUIRED_FIELDS
assert "action_class" in IntakeRequestSchema.REQUIRED_FIELDS
assert "context" in IntakeRequestSchema.REQUIRED_FIELDS
assert "authority_scope" in IntakeRequestSchema.REQUIRED_FIELDS
assert "invariant_hash" in IntakeRequestSchema.REQUIRED_FIELDS


def test_intake_request_schema_optional_fields():
assert "timestamp_utc" in IntakeRequestSchema.OPTIONAL_FIELDS


def test_intake_request_schema_all_fields():
assert set(IntakeRequestSchema.ALL_FIELDS) == set(
IntakeRequestSchema.REQUIRED_FIELDS + IntakeRequestSchema.OPTIONAL_FIELDS
)


def test_intake_verdict_schema_verdict_values():
assert set(IntakeVerdictSchema.VERDICT_VALUES) == {"ALLOW", "REFUSE", "ESCALATE"}


def test_intake_verdict_schema_required_fields():
assert "verdict" in IntakeVerdictSchema.REQUIRED_FIELDS
assert "reasons" in IntakeVerdictSchema.REQUIRED_FIELDS
assert "decision_hash" in IntakeVerdictSchema.REQUIRED_FIELDS
assert "request_hash" in IntakeVerdictSchema.REQUIRED_FIELDS
assert "artefact_version" in IntakeVerdictSchema.REQUIRED_FIELDS


# ---------------------------------------------------------------------------
# Reason-code family tests
# ---------------------------------------------------------------------------


def test_allow_family_values():
assert AllowFamily.ALLOWLIST_MATCH.value == "allowlist_match"
assert AllowFamily.EXPLICIT_GRANT.value == "explicit_grant"


def test_refuse_family_values():
assert RefuseFamily.DENYLIST_MATCH.value == "denylist_match"
assert RefuseFamily.DEFAULT_REFUSE.value == "default_refuse"
assert RefuseFamily.MISSING_EVIDENCE.value == "missing_evidence"


def test_escalate_family_values():
assert EscalateFamily.ESCALATION_MATCH.value == "escalation_match"
assert EscalateFamily.SCOPE_AMBIGUOUS.value == "scope_ambiguous"


def test_verdict_family_mapping_keys():
assert set(VERDICT_FAMILY.keys()) == {"ALLOW", "REFUSE", "ESCALATE"}


def test_verdict_family_mapping_types():
assert VERDICT_FAMILY["ALLOW"] is AllowFamily
assert VERDICT_FAMILY["REFUSE"] is RefuseFamily
assert VERDICT_FAMILY["ESCALATE"] is EscalateFamily


def test_engine_reason_codes_are_in_families():
"""Engine-emitted reason codes must exist in the appropriate family."""
engine_allow = "allowlist_match"
engine_refuse_denylist = "denylist_match"
engine_refuse_default = "default_refuse"
engine_escalate = "escalation_match"

allow_values = {m.value for m in AllowFamily}
refuse_values = {m.value for m in RefuseFamily}
escalate_values = {m.value for m in EscalateFamily}

assert engine_allow in allow_values
assert engine_refuse_denylist in refuse_values
assert engine_refuse_default in refuse_values
assert engine_escalate in escalate_values


# ---------------------------------------------------------------------------
# Status envelope tests
# ---------------------------------------------------------------------------


def test_envelope_version():
assert ENVELOPE_VERSION == "0.1"


def test_status_envelope_allow():
env = StatusEnvelope(
ok=True,
status="ALLOW",
reason_codes=["allowlist_match"],
payload={"verdict": "ALLOW"},
)
assert env.ok is True
assert env.status == "ALLOW"
assert env.reason_codes == ["allowlist_match"]
assert env.payload == {"verdict": "ALLOW"}
assert env.envelope_version == ENVELOPE_VERSION
assert env.error is None


def test_status_envelope_refuse():
env = StatusEnvelope(
ok=False,
status="REFUSE",
reason_codes=["default_refuse"],
payload=None,
error="No matching rule found.",
)
assert env.ok is False
assert env.status == "REFUSE"
assert env.error == "No matching rule found."
assert env.payload is None


def test_status_envelope_escalate():
env = StatusEnvelope(
ok=True,
status="ESCALATE",
reason_codes=["escalation_match"],
payload={"verdict": "ESCALATE"},
)
assert env.status == "ESCALATE"
assert env.reason_codes == ["escalation_match"]


def test_status_envelope_is_immutable():
"""StatusEnvelope is a NamedTuple and therefore immutable."""
env = StatusEnvelope(
ok=True,
status="ALLOW",
reason_codes=[],
payload=None,
)
with pytest.raises(AttributeError):
env.ok = False # type: ignore[misc]


def test_status_envelope_default_version():
"""envelope_version defaults to ENVELOPE_VERSION constant."""
env = StatusEnvelope(ok=True, status="ALLOW", reason_codes=[], payload=None)
assert env.envelope_version == "0.1"