diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..e14c839
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,9 @@
+# Changelog
+
+## v0.2.1 - 2026-02-17
+
+- Hardened advisory feed ingestion by rejecting insecure `http://` remote references.
+- Enforced explicit advisory feed schema (`aixv.advisory-feed/v1`) for sync operations.
+- Added `record_id` safety validation (`^[A-Za-z0-9._-]{1,128}$`) to prevent path traversal footguns.
+- Added tests for insecure feed rejection and unsafe record ID rejection.
+
diff --git a/README.md b/README.md
index 17d143b..dfabbf1 100644
--- a/README.md
+++ b/README.md
@@ -1,12 +1,11 @@
-# AIXV - AI Integrity Exchange & Verification
+# AIXV - AI Integrity eXchange & Verification
[](https://github.com/aixv-org/aixv/actions/workflows/ci.yml)
[](https://github.com/aixv-org/aixv/actions/workflows/conformance.yml)
@@ -15,7 +14,7 @@
[](https://pypi.org/project/aixv)
-AIXV is an open standard for AI artifact attestation, provenance, rollback, compromise detection, and investigation.
+**AIXV** is an open standard for AI artifact attestation, provenance, rollback, compromise detection, and investigation.
In practical terms, AIXV helps organizations answer high-stakes questions before deploying or accepting AI artifacts:
- What exactly is this artifact?
@@ -26,20 +25,20 @@ In practical terms, AIXV helps organizations answer high-stakes questions before
AIXV is built for three audiences that need shared, verifiable answers:
-- Technical teams: deterministic verification and machine-readable admission decisions.
-- Enterprise and public-sector risk owners: auditable evidence, policy controls, and incident traceability.
-- Policy, governance, and assurance functions: explicit trust assumptions, conformance checks, and compatibility contracts.
+- **Technical teams**: deterministic verification and machine-readable admission decisions.
+- **Enterprise and public-sector risk owners**: auditable evidence, policy controls, and incident traceability.
+- **Policy, governance, and assurance functions**: explicit trust assumptions, conformance checks, and compatibility contracts.
AIXV composes Sigstore cryptographic primitives and adds AI-native semantics:
-- artifact typing,
-- lineage graphs,
+- Artifact typing,
+- Lineage graphs,
- ML-specific attestations,
-- advisory/recall workflows,
-- and policy-driven verification.
+- Advisory/recall workflows,
+- Policy-driven verification.
## Release Posture
-Current maturity: **Pre-alpha**.
+Current maturity: **Pre-Alpha**
This repository is a functional preview of the AIXV standard, but not yet a final ratified standard release.
@@ -71,16 +70,16 @@ For security and procurement reviews, the strongest immediate signals are:
- `docs/AIXV_STANDARD.md`
- `docs/NORMATIVE_CORE.md`
-- `docs/QUALITY_BAR.md`
+- `docs/QUALITY.md`
- `docs/THREAT_MODEL.md`
-- `SECURITY.md`
- `docs/COMPATIBILITY.md`
- `docs/TERMINOLOGY.md`
- `docs/REGISTRIES.md`
-- `docs/PROFILES.md`
+- `docs/ASSURANCE_LEVELS.md`
- `docs/CONFORMANCE.md`
- `docs/GOVERNANCE.md`
- `docs/REPO_CONTROLS.md`
+- `SECURITY.md`
- `RELEASE.md`
## Installation
@@ -102,29 +101,67 @@ aixv policy create --input policy.json --sign
aixv verify model.safetensors \
--policy .aixv/policies/policy.json \
--policy-trusted-subject security-policy@aixv.org \
+ --assurance-level level-2 \
--json
# 4) Run conformance checks
aixv conformance --json
+
+# 5) Optional: enforce signed-and-trusted attestations in lineage/export flows
+aixv provenance model.safetensors \
+ --require-signed-attestations \
+ --trusted-attestation-subject ci-attestations@aixv.org \
+ --json
```
## CLI Surface
```bash
+# Version
aixv version
+
+# Signing
aixv sign model.safetensors --identity-token-env SIGSTORE_ID_TOKEN
+
+# Verification
aixv verify model.safetensors --identity alice@example.com --issuer https://accounts.google.com
+
+# Attestation
aixv attest model.safetensors --predicate training --input training.json
+
+# Provenance
aixv provenance model.safetensors --depth 3
+aixv provenance model.safetensors --view explain --depth 3 --json
+
+# Advisory
aixv advisory create --advisory-id ADV-2026-0001 --severity critical --input advisory.json --sign
aixv advisory verify .aixv/advisories/ADV-2026-0001.json --trusted-subject security@aixv.org
+aixv advisory sync --feed advisory-feed.json --trusted-subject security@aixv.org --max-bundle-age-days 30
+
+# Policy
aixv policy create --input policy.json --sign
aixv policy verify .aixv/policies/policy.json --trusted-subject security-policy@aixv.org
+aixv policy template --assurance-level level-2 --json
+aixv policy migrate --input policy.json --to-assurance-level level-3 --max-bundle-age-days 30
+
+# Record
aixv record create --kind waiver --record-id WVR-2026-01 --input waiver.json --sign
aixv record verify .aixv/policies/policy.json --kind policy --trusted-subject security-policy@aixv.org
+
+# Bundle
+aixv bundle create --input bundle.json --sign
+aixv bundle verify .aixv/records/bundle/bundle-main.json --trusted-subject release@aixv.org
+
+# Conformance
aixv conformance --json
-aixv rollback model-v2.safetensors --to sha256:...
+
+# Rollback
+aixv rollback model-v2.safetensors --to sha256:... --identity-token-env SIGSTORE_ID_TOKEN
+
+# Export
aixv export model.safetensors --format in-toto
+aixv export model.safetensors --format slsa --json
+aixv export model.safetensors --format ml-bom --json
```
## Policy Example
diff --git a/SECURITY.md b/SECURITY.md
index 64ceda5..2fd81cf 100644
--- a/SECURITY.md
+++ b/SECURITY.md
@@ -56,4 +56,4 @@ The following are non-negotiable invariants:
This repository is pre-alpha. Use in production only with explicit risk acceptance.
-For production pilots, minimum controls are defined in `docs/QUALITY_BAR.md`.
+For production pilots, minimum controls are defined in `docs/QUALITY.md`.
diff --git a/docs/AIXV_STANDARD.md b/docs/AIXV_STANDARD.md
index 0e32863..6c55ece 100644
--- a/docs/AIXV_STANDARD.md
+++ b/docs/AIXV_STANDARD.md
@@ -216,17 +216,20 @@ Policy outputs:
- Breaking changes require new major version URI.
- Verifiers SHOULD support at least one prior major version during migration.
-## 13. Interoperability Profiles
+## 13. Interoperability Assurance Levels
-AIXV profiles define strict subsets for predictable exchange:
-- `core-minimal`: sign + verify + training lineage
-- `core-enterprise`: adds advisories, policy decisions, rollback attestations
-- `core-regulated`: adds immutable retention and evidence export requirements
+AIXV assurance levels define strict subsets for predictable exchange:
+- `level-1`: sign + verify + training lineage
+- `level-2`: adds advisories, policy decisions, rollback attestations
+- `level-3`: adds immutable retention and evidence export requirements
+
+These levels are ordinal assurance tiers and do not encode assumptions about any
+specific sector, procurement regime, or legal jurisdiction.
Export targets:
- in-toto statements
- SLSA provenance mappings
-- ML-BOM (SPDX/CycloneDX extension profile)
+- ML-BOM (SPDX/CycloneDX extension format)
## 14. Security Requirements
@@ -234,6 +237,7 @@ Export targets:
- Clock skew tolerance MUST be explicit in verifier config.
- Digest algorithm agility MUST be designed in (`sha256` required in v1).
- Replay protection SHOULD check transparency log integrated time and bundle uniqueness.
+- Remote advisory ingestion SHOULD enforce monotonic integrated time per advisory ID and reject stale bundles by configured age.
- All critical security decisions MUST be auditable with machine-readable reason codes.
## 15. Reference CLI Contract (v1)
@@ -242,7 +246,8 @@ Export targets:
- `aixv attest <artifact> --predicate <alias|uri> --input <file>`
- `aixv verify <artifact> [--policy <file>]`
- `aixv provenance <artifact> [--depth N]`
-- `aixv advisory create|verify|list ...`
+- `aixv advisory create|verify|list|sync ...`
+- `aixv policy template|migrate|create|verify ...`
- `aixv rollback <artifact> --to <digest>`
- `aixv export <artifact> --format in-toto|slsa|ml-bom|aixv`
diff --git a/docs/ASSURANCE_LEVELS.md b/docs/ASSURANCE_LEVELS.md
new file mode 100644
index 0000000..4003396
--- /dev/null
+++ b/docs/ASSURANCE_LEVELS.md
@@ -0,0 +1,39 @@
+# AIXV Assurance Levels (v1 Draft)
+
+Assurance levels are constrained operating modes that define required controls.
+Levels are ordinal only; they do not imply any specific industry, jurisdiction,
+or regulatory framework.
+
+## level-1
+
+Required:
+- Signed artifact verification bound to trusted subject identities.
+- Deterministic JSON output for automation.
+- Policy validation (`aixv.policy/v1`) when policy input is provided.
+- Optional enforcement gate: `verify --assurance-level level-1`.
+
+## level-2
+
+Includes `level-1`, plus:
+- Signed policy verification enabled by default.
+- Signed advisory verification for policy-driven advisory enforcement.
+- AdmissionDecision output persisted/logged by deployment systems.
+- Enforcement gate: `verify --assurance-level level-2` requires:
+ - `--policy`,
+ - signed policy verification enabled,
+ - `require_signed_advisories=true`,
+ - configured advisory trust subjects.
+
+## level-3
+
+Includes `level-2`, plus:
+- Evidence retention requirements for signatures, policy records, advisories, and decisions.
+- Immutable audit trail integration requirements.
+- Formal conformance testing requirement before production use.
+- Enforcement gate: `verify --assurance-level level-3` additionally requires:
+ - `max_bundle_age_days`,
+ - `require_no_active_advisories=true`.
+
+Operational helpers:
+- `policy template --assurance-level <level>` generates a baseline policy payload.
+- `policy migrate --to-assurance-level <level>` upgrades existing policy payloads while preserving explicit trust roots.
diff --git a/docs/CONFORMANCE.md b/docs/CONFORMANCE.md
index 27381e8..18b0fcb 100644
--- a/docs/CONFORMANCE.md
+++ b/docs/CONFORMANCE.md
@@ -45,3 +45,10 @@ Runtime conformance report:
- `policy.fixture.valid.v1`: valid policy fixture is accepted.
- `policy.fixture.invalid.v1`: invalid policy fixture is rejected.
- `record.fixture.policy.v1`: signed-record policy fixture parses with expected kind.
+- `policy.unknown-field.reject.v1`: unknown policy fields are rejected.
+- `policy.advisory-trust.subject-fallback.v1`: advisory trust roots fall back to policy `subject`.
+- `advisory.signed-policy.filtering.v1`: when signed advisories are required, untrusted advisories are ignored and trusted advisories drive enforcement.
+- `bundle.schema.validation.v1`: bundle records validate canonical multi-artifact membership semantics.
+- `advisory.sync.replay-freshness.v1`: advisory feed ingestion rejects replayed/stale integrated times and stale bundles.
+- `crypto.invalid-bundle.artifact.reject.v1`: malformed artifact signature bundles are rejected fail-closed.
+- `crypto.invalid-bundle.statement.reject.v1`: malformed DSSE statement bundles are rejected fail-closed.
diff --git a/docs/CRITIQUES_AND_DECISIONS.md b/docs/CRITIQUES_AND_DECISIONS.md
index ac72f0c..ad84bcb 100644
--- a/docs/CRITIQUES_AND_DECISIONS.md
+++ b/docs/CRITIQUES_AND_DECISIONS.md
@@ -72,7 +72,7 @@ Decision:
- Evidence remains queryable via stored attestations/advisories/signatures.
Status:
-- Implemented as explicit rollback event record; lineage expansion ongoing.
+- Implemented as explicit rollback event record and signed by default (`rollback --sign` default true); lineage expansion ongoing.
## 7. "Over-flexible tooling leads to insecure defaults"
@@ -88,8 +88,66 @@ Decision:
Status:
- Implemented; further hardening planned with signed remote feeds.
+## 8. "Signed attestations are emitted but not enforced downstream"
+
+Risk:
+- Provenance/export views could include unsigned or untrusted attestation claims without explicit operator intent.
+
+Decision:
+- Added strict mode for lineage/export flows:
+ - `--require-signed-attestations`
+ - trusted signer subject/issuer constraints for attestation verification.
+- Signed attestation verification uses Sigstore DSSE verification and requires payload match with stored statements.
+
+Status:
+- Implemented in `provenance` and `export` CLI flows.
+
+## 9. "Assurance levels are documented but not enforced"
+
+Risk:
+- Organizations cannot reliably prove they are operating in a specific assurance level.
+
+Decision:
+- Added verifier assurance-level gates:
+ - `verify --assurance-level level-1`
+ - `verify --assurance-level level-2`
+ - `verify --assurance-level level-3`
+- Levels are ordinal control tiers, not labels for specific sectors or regulatory regimes.
+- Assurance-level gates enforce required policy controls and signed-control-plane constraints.
+
+Status:
+- Implemented.
+
+## 10. "Lineage investigation needs explicit trace/impact/explain modes"
+
+Risk:
+- Operators cannot quickly pivot from compromised ancestor to impacted descendants or produce compact explanations.
+
+Decision:
+- `provenance` now supports:
+ - `--view trace` (ancestors),
+ - `--view impact` (descendants),
+ - `--view explain` (condensed trust/advisory evidence).
+
+Status:
+- Implemented.
+
+## 11. "Single-file trust is not enough for model releases"
+
+Risk:
+- Real releases ship multiple artifacts (weights, tokenizer, config); single-artifact trust checks are insufficient.
+
+Decision:
+- Added strict signed bundle records (`kind=bundle`, `aixv.bundle/v1`) with:
+ - canonical digest normalization,
+ - primary-member consistency,
+ - signature verification and optional required-member checks.
+
+Status:
+- Implemented via `bundle create` and `bundle verify`.
+
## Next hardening steps
-1. Add signed remote advisory feed ingestion with replay/freshness protection.
-2. Add conformance test vectors for every failure mode above.
-3. Add compatibility profile gates (`core-minimal`, `core-enterprise`, `core-regulated`).
+1. Add authenticated advisory feed discovery/rotation (beyond direct feed URL ingestion).
+2. Add conformance vectors for additional malformed feed and rollback edge cases.
+3. Add organization-ready policy packs built on `policy template`/`policy migrate`.
diff --git a/docs/PROFILES.md b/docs/PROFILES.md
deleted file mode 100644
index d256610..0000000
--- a/docs/PROFILES.md
+++ /dev/null
@@ -1,24 +0,0 @@
-# AIXV Profiles (v1 Draft)
-
-Profiles are constrained operating modes that define required controls.
-
-## core-minimal
-
-Required:
-- Signed artifact verification bound to trusted subject identities.
-- Deterministic JSON output for automation.
-- Policy validation (`aixv.policy/v1`) when policy input is provided.
-
-## core-enterprise
-
-Includes `core-minimal`, plus:
-- Signed policy verification enabled by default.
-- Signed advisory verification for policy-driven advisory enforcement.
-- AdmissionDecision output persisted/logged by deployment systems.
-
-## core-regulated
-
-Includes `core-enterprise`, plus:
-- Evidence retention requirements for signatures, policy records, advisories, and decisions.
-- Immutable audit trail integration requirements.
-- Formal conformance testing requirement before production use.
diff --git a/docs/QUALITY_BAR.md b/docs/QUALITY.md
similarity index 69%
rename from docs/QUALITY_BAR.md
rename to docs/QUALITY.md
index f1b77b1..c0a8262 100644
--- a/docs/QUALITY_BAR.md
+++ b/docs/QUALITY.md
@@ -1,6 +1,6 @@
-# AIXV Quality Bar (Core)
+# AIXV Quality (Core)
-This is the minimal bar for production-grade provenance verification in AIXV.
+This document defines core quality requirements for production-grade provenance verification in AIXV.
## 1. Cryptographic Integrity
@@ -12,6 +12,10 @@ This is the minimal bar for production-grade provenance verification in AIXV.
- Policy is treated as a signed record (`kind=policy`) and can be verified independently.
- Advisories are treated as signed records (`kind=advisory`) and can be required-signed by policy.
+- Multi-artifact bundles are represented as signed bundle records (`kind=bundle`, `aixv.bundle/v1`).
+- When `require_signed_advisories=true`, only signed-and-trusted advisories influence admission outcomes.
+- Advisory feed ingestion (`advisory sync`) rejects replayed/stale updates and supports max-age freshness checks.
+- Rollback records are signed by default (`rollback --sign`), preserving append-only evidence.
- Admission decisions are explicit and machine-readable (`allow|deny`, violations, evidence).
## 3. Schema Stability
@@ -36,4 +40,4 @@ This is the minimal bar for production-grade provenance verification in AIXV.
- Commands support deterministic JSON output (`--json`) for CI/CD and admission hooks.
- Decision outcomes are reproducible from signed evidence + policy.
-
+- Verifier assurance-level gates (`level-1|level-2|level-3`) provide explicit assurance mode enforcement.
diff --git a/docs/REGISTRIES.md b/docs/REGISTRIES.md
index 36677e3..c38d0b9 100644
--- a/docs/REGISTRIES.md
+++ b/docs/REGISTRIES.md
@@ -5,6 +5,7 @@
Core kinds:
- `policy`
- `advisory`
+- `bundle`
Provisional kinds:
- `waiver`
@@ -15,6 +16,9 @@ Registration guidance:
- New kinds should be lowercase kebab-case.
- Kind semantics must define expected payload schema and trust implications.
+Record ID guidance:
+- `record_id` should match `^[A-Za-z0-9._-]{1,128}$`.
+
## 2. Predicate URI Registry
Current predicate aliases:
@@ -42,3 +46,19 @@ Allowed status values:
- `active`
- `mitigated`
- `withdrawn`
+
+## 5. Advisory Feed Schema
+
+Feed schema:
+- `aixv.advisory-feed/v1`
+
+Required fields:
+- `entries[]` where each entry includes:
+ - `record` (or `record_url`): advisory signed-record JSON reference
+ - `bundle` (or `bundle_url`): Sigstore bundle JSON reference
+
+Operational semantics:
+- Each entry must verify against trusted signer subjects before import.
+- Remote feed and entry URLs must use `https://` (or be local file paths).
+- Replay/stale updates are rejected when integrated time does not advance.
+- Optional freshness guard rejects bundles older than configured max age.
diff --git a/src/aixv/__init__.py b/src/aixv/__init__.py
index d3ec452..3ced358 100644
--- a/src/aixv/__init__.py
+++ b/src/aixv/__init__.py
@@ -1 +1 @@
-__version__ = "0.2.0"
+__version__ = "0.2.1"
diff --git a/src/aixv/cli.py b/src/aixv/cli.py
index 780cdfd..106be93 100644
--- a/src/aixv/cli.py
+++ b/src/aixv/cli.py
@@ -1,6 +1,9 @@
import json
+from datetime import datetime, timezone
from pathlib import Path
-from typing import Any, Dict, Optional
+from tempfile import TemporaryDirectory
+from typing import Any, Dict, List, Optional
+from urllib.request import urlopen
import typer
from rich.console import Console
@@ -9,13 +12,17 @@
from aixv.conformance import run_conformance_checks
from aixv.core import (
PREDICATE_ALIASES,
+ advisory_trust_constraints_from_policy,
create_attestation_record,
create_record,
- detect_parents_from_training_attestations,
ensure_artifact,
evaluate_admission,
+ evaluate_advisory_sync_guards,
+ evaluate_assurance_level_requirements,
+ export_attestations_as_ml_bom,
+ export_attestations_as_slsa,
list_advisories,
- load_attestations_for_digest,
+ load_attestation_records_for_digest,
load_signed_record,
make_statement,
normalize_sha256_digest,
@@ -27,11 +34,15 @@
sha256_file,
sign_artifact_with_sigstore,
sign_statement_with_sigstore,
+ trace_training_lineage_descendants,
+ trace_training_lineage_parents,
validate_policy_payload,
validate_predicate,
+ validate_record_id,
validate_record_payload,
verify_artifact_with_sigstore,
verify_signed_record,
+ verify_statement_with_sigstore,
write_json,
)
@@ -39,9 +50,11 @@
advisory_app = typer.Typer(help="Advisory and recall operations")
policy_app = typer.Typer(help="Policy operations")
record_app = typer.Typer(help="Generic signed-record operations")
+bundle_app = typer.Typer(help="Multi-artifact bundle operations")
app.add_typer(advisory_app, name="advisory")
app.add_typer(policy_app, name="policy")
app.add_typer(record_app, name="record")
+app.add_typer(bundle_app, name="bundle")
console = Console()
@@ -62,6 +75,152 @@ def _parse_csv_values(raw: Optional[str]) -> list:
return [s.strip() for s in raw.split(",") if s.strip()]
+def _is_remote_ref(ref: str) -> bool:
+ return ref.startswith("https://") or ref.startswith("http://")
+
+
+def _read_bytes_from_ref(ref: str) -> bytes:
+ if ref.startswith("http://"):
+ raise ValueError("insecure remote reference: only https:// is allowed")
+ if _is_remote_ref(ref):
+ with urlopen(ref, timeout=20) as response: # nosec B310 - expected remote feed access
+ return response.read()
+ return Path(ref).read_bytes()
+
+
+def _read_json_from_ref(ref: str) -> Dict[str, Any]:
+ raw = _read_bytes_from_ref(ref)
+ payload = json.loads(raw.decode("utf-8"))
+ if not isinstance(payload, dict):
+ raise ValueError("feed payload must be a JSON object")
+ return payload
+
+
+def _parse_advisory_feed_entries(payload: Dict[str, Any]) -> List[Dict[str, str]]:
+ entries_raw: Any = payload.get("entries")
+ if isinstance(entries_raw, list):
+ entries = entries_raw
+ else:
+ raise ValueError("feed payload must include entries[]")
+
+ out: List[Dict[str, str]] = []
+ for index, entry in enumerate(entries):
+ if not isinstance(entry, dict):
+ raise ValueError(f"feed entry {index} must be an object")
+ record_ref = entry.get("record") or entry.get("record_url")
+ bundle_ref = entry.get("bundle") or entry.get("bundle_url")
+ if not isinstance(record_ref, str) or not record_ref.strip():
+ raise ValueError(f"feed entry {index} missing record reference")
+ if not isinstance(bundle_ref, str) or not bundle_ref.strip():
+ raise ValueError(f"feed entry {index} missing bundle reference")
+ out.append({"record": record_ref.strip(), "bundle": bundle_ref.strip()})
+ return out
+
+
+def _advisory_sync_state_path(root: Path) -> Path:
+ return root / ".aixv" / "advisory-sync-state.json"
+
+
+def _load_advisory_sync_state(root: Path) -> Dict[str, Any]:
+ path = _advisory_sync_state_path(root)
+ if not path.exists():
+ return {"schema": "aixv.advisory-sync-state/v1", "entries": {}}
+ payload = read_json(str(path))
+ if not isinstance(payload, dict):
+ raise ValueError("invalid advisory sync state format")
+ entries = payload.get("entries")
+ if not isinstance(entries, dict):
+ raise ValueError("invalid advisory sync state entries")
+ return payload
+
+
+def _save_advisory_sync_state(root: Path, payload: Dict[str, Any]) -> None:
+ write_json(_advisory_sync_state_path(root), payload)
+
+
+def _resolve_bundle_reference(record_path: Path, bundle_ref: Optional[str]) -> Optional[Path]:
+ if not bundle_ref:
+ return None
+ path = Path(bundle_ref)
+ if not path.is_absolute():
+ path = record_path.parent / path
+ return path
+
+
+def _assess_attestation_trust(
+ *,
+ entries: list,
+ require_signed: bool,
+ trusted_subjects: list,
+ trusted_issuers: list,
+ staging: bool,
+ offline: bool,
+) -> Dict[str, Any]:
+ if require_signed and len(trusted_subjects) < 1:
+ raise ValueError(
+ "signed attestation verification requires trusted_attestation_subject or trusted_attestation_subjects"
+ )
+
+ enriched: list = []
+ violations: list = []
+ signed_count = 0
+ verified_count = 0
+
+ for entry in entries:
+ statement = entry.get("statement", {})
+ attestation_path = Path(entry["path"])
+ bundle_path = _resolve_bundle_reference(
+ attestation_path, entry.get("signature_bundle_path")
+ )
+
+ trust: Dict[str, Any] = {
+ "signed": bool(bundle_path),
+ "signed_and_trusted": False,
+ "signature_bundle_path": str(bundle_path) if bundle_path else None,
+ }
+ if bundle_path:
+ signed_count += 1
+ if len(trusted_subjects) > 0:
+ if not bundle_path.exists():
+ trust["error"] = f"signature bundle not found: {bundle_path}"
+ else:
+ try:
+ verification = verify_statement_with_sigstore(
+ statement=statement,
+ bundle_in=bundle_path,
+ subject=None,
+ issuer=None,
+ allow_subjects=trusted_subjects,
+ allow_issuers=trusted_issuers,
+ staging=staging,
+ offline=offline,
+ )
+ trust["signed_and_trusted"] = True
+ trust["signature_verification"] = verification
+ verified_count += 1
+ except Exception as exc:
+ trust["error"] = str(exc)
+ if require_signed and not trust["signed_and_trusted"]:
+ violations.append(f"attestation is not signed and trusted: {attestation_path}")
+
+ statement_payload: Dict[str, Any] = (
+ dict(statement) if isinstance(statement, dict) else {"statement": statement}
+ )
+ statement_payload["_trust"] = trust
+ enriched.append(statement_payload)
+
+ return {
+ "attestations": enriched,
+ "summary": {
+ "total": len(entries),
+ "signed": signed_count,
+ "signed_and_trusted": verified_count,
+ "unsigned_or_untrusted": len(entries) - verified_count,
+ },
+ "violations": violations,
+ }
+
+
def _create_and_optionally_sign_record(
*,
kind: str,
@@ -175,6 +334,11 @@ def verify(
policy_trusted_issuers: Optional[str] = typer.Option(
None, help="Comma-separated trusted signer issuers for policy file"
),
+ assurance_level: Optional[str] = typer.Option(
+ None,
+ "--assurance-level",
+ help="Assurance level gate: level-1|level-2|level-3",
+ ),
staging: bool = typer.Option(False, help="Use Sigstore staging instance"),
offline: bool = typer.Option(False, help="Use cached trust root only"),
json_output: bool = typer.Option(False, "--json", help="Deterministic JSON output"),
@@ -223,6 +387,15 @@ def verify(
if issuer is not None:
effective_policy["issuer"] = issuer
+ assurance_level_violations = evaluate_assurance_level_requirements(
+ assurance_level=assurance_level,
+ policy_provided=policy is not None,
+ require_signed_policy=require_signed_policy,
+ policy=effective_policy,
+ )
+ if assurance_level_violations:
+ raise ValueError("; ".join(assurance_level_violations))
+
identity_constraints_present = bool(subject or effective_policy.get("allow_subjects"))
if not identity_constraints_present:
raise ValueError(
@@ -239,16 +412,25 @@ def verify(
staging=staging,
offline=offline,
)
- advisory_trusted_subjects = effective_policy.get(
- "advisory_allow_subjects"
- ) or effective_policy.get("allow_subjects", [])
- advisory_trusted_issuers = effective_policy.get(
- "advisory_allow_issuers"
- ) or effective_policy.get("allow_issuers", [])
+ advisory_trust = advisory_trust_constraints_from_policy(effective_policy)
+ advisory_trusted_subjects = advisory_trust["subjects"]
+ advisory_trusted_issuers = advisory_trust["issuers"]
+ if (
+ bool(effective_policy.get("require_signed_advisories", False))
+ and (
+ effective_policy.get("require_no_active_advisories")
+ or effective_policy.get("deny_advisory_severity_at_or_above") is not None
+ )
+ and len(advisory_trusted_subjects) < 1
+ ):
+ raise ValueError(
+ "policy requires advisory trust roots; set advisory_allow_subjects "
+ "or subject/allow_subjects"
+ )
advisories = list_advisories(
_root_dir(),
result["digest"],
- require_signed=bool(effective_policy.get("require_signed_advisories", False)),
+ require_signed=False,
trusted_subjects=advisory_trusted_subjects,
trusted_issuers=advisory_trusted_issuers,
staging=staging,
@@ -262,6 +444,8 @@ def verify(
result["policy_violations"] = decision.violations
result["admission_decision"] = decision.decision
result["admission_evidence"] = decision.evidence
+ if assurance_level:
+ result["assurance_level"] = assurance_level
if decision.decision != "allow":
result["ok"] = False
@@ -310,13 +494,6 @@ def attest(
)
write_json(statement_path, statement)
- stored_path = create_attestation_record(
- root=_root_dir(),
- artifact=artifact_path,
- predicate_uri=predicate_uri,
- statement=statement,
- )
-
result: Dict[str, Any] = {
"ok": True,
"artifact": str(artifact_path),
@@ -324,13 +501,13 @@ def attest(
"predicate": predicate,
"predicate_uri": predicate_uri,
"statement_path": str(statement_path),
- "attestation_store_path": str(stored_path),
"schema_validated": predicate_uri in PREDICATE_ALIASES.values(),
}
+ signature_bundle_path: Optional[str] = None
if sign:
bundle_out = statement_path.with_name(f"{statement_path.name}.sigstore.json")
- result["signature"] = sign_statement_with_sigstore(
+ signature = sign_statement_with_sigstore(
statement=statement,
bundle_out=bundle_out,
identity_token=identity_token,
@@ -339,6 +516,17 @@ def attest(
staging=staging,
offline=offline,
)
+ signature_bundle_path = signature["bundle_path"]
+ result["signature"] = signature
+
+ stored_path = create_attestation_record(
+ root=_root_dir(),
+ artifact=artifact_path,
+ predicate_uri=predicate_uri,
+ statement=statement,
+ signature_bundle_path=signature_bundle_path,
+ )
+ result["attestation_store_path"] = str(stored_path)
_emit(result, json_output)
except Exception as e:
@@ -350,27 +538,82 @@ def attest(
def provenance(
artifact: str = typer.Argument(..., help="Path to artifact"),
depth: int = typer.Option(3, help="Lineage traversal depth"),
+ view: str = typer.Option("trace", help="Lineage view: trace|impact|explain"),
+ require_signed_attestations: bool = typer.Option(
+ False, help="Require all matching attestations to be signed and trusted"
+ ),
+ trusted_attestation_subject: Optional[str] = typer.Option(
+ None, help="Trusted attestation signer subject"
+ ),
+ trusted_attestation_subjects: Optional[str] = typer.Option(
+ None, help="Comma-separated trusted attestation signer subjects"
+ ),
+ trusted_attestation_issuers: Optional[str] = typer.Option(
+ None, help="Comma-separated trusted attestation signer issuers"
+ ),
+ staging: bool = typer.Option(False, help="Use Sigstore staging instance"),
+ offline: bool = typer.Option(False, help="Use cached trust root only"),
json_output: bool = typer.Option(False, "--json", help="Deterministic JSON output"),
):
"""Return local lineage view from stored attestations."""
- _ = depth
try:
+ if view not in {"trace", "impact", "explain"}:
+ raise ValueError(f"unsupported provenance view: {view}")
artifact_path = ensure_artifact(artifact)
digest = sha256_file(artifact_path)
- attestations = load_attestations_for_digest(_root_dir(), digest)
- parents = detect_parents_from_training_attestations(attestations)
- advisories = list_advisories(_root_dir(), digest)
- _emit(
- {
- "ok": True,
- "artifact": str(artifact_path),
- "artifact_digest": digest,
- "attestation_count": len(attestations),
- "parents": parents,
- "advisory_count": len(advisories),
- },
- json_output,
+ entries = load_attestation_records_for_digest(_root_dir(), digest)
+ subject_list = []
+ if trusted_attestation_subject:
+ subject_list.append(trusted_attestation_subject)
+ subject_list.extend(_parse_csv_values(trusted_attestation_subjects))
+ issuer_list = _parse_csv_values(trusted_attestation_issuers)
+ trust_report = _assess_attestation_trust(
+ entries=entries,
+ require_signed=require_signed_attestations,
+ trusted_subjects=subject_list,
+ trusted_issuers=issuer_list,
+ staging=staging,
+ offline=offline,
)
+ attestations = trust_report["attestations"]
+ advisories = list_advisories(_root_dir(), digest)
+ trusted_active_advisories = [
+ a
+ for a in advisories
+ if a.get("status") == "active"
+ and isinstance(a.get("_trust"), dict)
+ and a.get("_trust", {}).get("signed_and_trusted") is True
+ ]
+ payload: Dict[str, Any] = {
+ "ok": len(trust_report["violations"]) < 1,
+ "view": view,
+ "artifact": str(artifact_path),
+ "artifact_digest": digest,
+ "attestation_count": len(attestations),
+ "attestation_trust": trust_report["summary"],
+ "attestation_violations": trust_report["violations"],
+ "advisory_count": len(advisories),
+ "trusted_active_advisory_count": len(trusted_active_advisories),
+ }
+
+ if view in {"trace", "explain"}:
+ payload["parents"] = trace_training_lineage_parents(_root_dir(), digest, depth)
+ if view in {"impact", "explain"}:
+ payload["descendants"] = trace_training_lineage_descendants(_root_dir(), digest, depth)
+ if view == "explain":
+ explanation: List[str] = []
+ if trust_report["violations"]:
+ explanation.append("attestation trust violations present")
+ if trusted_active_advisories:
+ explanation.append("trusted active advisories present")
+ if not explanation:
+ explanation.append("no immediate trust or advisory blockers detected")
+ payload["explain"] = explanation
+ _emit(payload, json_output)
+ if trust_report["violations"]:
+ raise typer.Exit(code=1)
+ except typer.Exit:
+ raise
except Exception as e:
_emit({"ok": False, "error": str(e), "command": "provenance"}, json_output)
raise typer.Exit(code=1)
@@ -411,15 +654,64 @@ def sign_delta(
def rollback(
artifact: str = typer.Argument(..., help="Artifact being rolled back"),
to: str = typer.Option(..., help="Target digest or artifact path"),
+ sign: bool = typer.Option(
+ True,
+ "--sign/--no-sign",
+ help="Sign rollback record with Sigstore (default: enabled)",
+ ),
+ identity_token: Optional[str] = typer.Option(None, help="OIDC token for keyless signing"),
+ identity_token_env: str = typer.Option(
+ "SIGSTORE_ID_TOKEN", help="Environment variable containing OIDC token"
+ ),
+ interactive_oidc: bool = typer.Option(
+ False, help="Acquire OIDC token interactively via browser"
+ ),
+ staging: bool = typer.Option(False, help="Use Sigstore staging instance"),
+ offline: bool = typer.Option(False, help="Use cached trust root only"),
json_output: bool = typer.Option(False, "--json", help="Deterministic JSON output"),
):
- """Create a rollback event record."""
+ """Create a rollback event record (signed by default)."""
try:
artifact_path = ensure_artifact(artifact)
- record = rollback_record(artifact_path, to)
+ rollback_payload = rollback_record(artifact_path, to)
out = artifact_path.with_name(f"{artifact_path.name}.rollback.json")
- write_json(out, record)
- _emit({"ok": True, "rollback_record_path": str(out), "record": record}, json_output)
+ if not sign:
+ write_json(out, rollback_payload)
+ _emit(
+ {
+ "ok": True,
+ "rollback_record_path": str(out),
+ "record": rollback_payload,
+ "signed": False,
+ },
+ json_output,
+ )
+ return
+
+ timestamp_token = (
+ rollback_payload["timestamp"]
+ .replace(":", "")
+ .replace("-", "")
+ .replace("+", "")
+ .replace("T", "t")
+ )
+ digest_token = rollback_payload["artifact_digest"].split(":", 1)[1][:12]
+ record_id = f"RBK-{timestamp_token}-{digest_token}"
+ result = _create_and_optionally_sign_record(
+ kind="rollback",
+ record_id=record_id,
+ payload=rollback_payload,
+ output=str(out),
+ sign=True,
+ identity_token=identity_token,
+ identity_token_env=identity_token_env,
+ interactive_oidc=interactive_oidc,
+ staging=staging,
+ offline=offline,
+ )
+ result["rollback_record_path"] = result["path"]
+ result["signed"] = True
+ _emit(result, json_output)
except Exception as e:
_emit({"ok": False, "error": str(e), "command": "rollback"}, json_output)
raise typer.Exit(code=1)
@@ -429,21 +721,82 @@ def rollback(
def export(
artifact: str = typer.Argument(..., help="Path to artifact"),
format: str = typer.Option(..., help="Export format: in-toto|slsa|ml-bom|aixv"),
+ require_signed_attestations: bool = typer.Option(
+ False, help="Require all exported attestations to be signed and trusted"
+ ),
+ trusted_attestation_subject: Optional[str] = typer.Option(
+ None, help="Trusted attestation signer subject"
+ ),
+ trusted_attestation_subjects: Optional[str] = typer.Option(
+ None, help="Comma-separated trusted attestation signer subjects"
+ ),
+ trusted_attestation_issuers: Optional[str] = typer.Option(
+ None, help="Comma-separated trusted attestation signer issuers"
+ ),
+ staging: bool = typer.Option(False, help="Use Sigstore staging instance"),
+ offline: bool = typer.Option(False, help="Use cached trust root only"),
json_output: bool = typer.Option(False, "--json", help="Deterministic JSON output"),
):
"""Export locally available evidence in requested format."""
try:
artifact_path = ensure_artifact(artifact)
digest = sha256_file(artifact_path)
- attestations = load_attestations_for_digest(_root_dir(), digest)
+ entries = load_attestation_records_for_digest(_root_dir(), digest)
+ subject_list = []
+ if trusted_attestation_subject:
+ subject_list.append(trusted_attestation_subject)
+ subject_list.extend(_parse_csv_values(trusted_attestation_subjects))
+ issuer_list = _parse_csv_values(trusted_attestation_issuers)
+ trust_report = _assess_attestation_trust(
+ entries=entries,
+ require_signed=require_signed_attestations,
+ trusted_subjects=subject_list,
+ trusted_issuers=issuer_list,
+ staging=staging,
+ offline=offline,
+ )
+ attestations = trust_report["attestations"]
if format not in {"in-toto", "slsa", "ml-bom", "aixv"}:
raise ValueError(f"unsupported format: {format}")
- payload = {
- "format": format,
- "artifact_digest": digest,
- "attestations": attestations,
- }
- _emit({"ok": True, "export": payload}, json_output)
+ payload: Dict[str, Any]
+ if format == "in-toto":
+ payload = {
+ "format": "in-toto",
+ "artifact_digest": digest,
+ "statements": attestations,
+ }
+ elif format == "slsa":
+ payload = {
+ "format": "slsa",
+ "provenance": export_attestations_as_slsa(
+ artifact_digest=digest,
+ artifact_name=artifact_path.name,
+ attestations=attestations,
+ ),
+ }
+ elif format == "ml-bom":
+ payload = {
+ "format": "ml-bom",
+ "bom": export_attestations_as_ml_bom(
+ artifact_digest=digest,
+ artifact_name=artifact_path.name,
+ attestations=attestations,
+ ),
+ }
+ else:
+ payload = {
+ "format": "aixv",
+ "artifact_digest": digest,
+ "attestations": attestations,
+ }
+ payload["attestation_trust"] = trust_report["summary"]
+ payload["attestation_violations"] = trust_report["violations"]
+ ok = len(trust_report["violations"]) < 1
+ _emit({"ok": ok, "export": payload}, json_output)
+ if not ok:
+ raise typer.Exit(code=1)
+ except typer.Exit:
+ raise
except Exception as e:
_emit({"ok": False, "error": str(e), "command": "export"}, json_output)
raise typer.Exit(code=1)
@@ -555,6 +908,8 @@ def advisory_verify(
"""Verify advisory record signature and signer trust constraints."""
try:
advisory_path = ensure_artifact(advisory_record)
+ advisory = load_signed_record(str(advisory_path), expected_kind="advisory")
+ validate_predicate(PREDICATE_ALIASES["advisory"], advisory.payload)
bundle_in = resolve_signature_bundle_path(advisory_path, bundle)
if not bundle_in.exists():
raise FileNotFoundError(f"bundle not found: {bundle_in}")
@@ -573,6 +928,8 @@ def advisory_verify(
staging=staging,
offline=offline,
)
+ result["record_kind"] = advisory.kind
+ result["record_id"] = advisory.record_id
result["ok"] = True
_emit(result, json_output)
except Exception as e:
@@ -580,6 +937,143 @@ def advisory_verify(
raise typer.Exit(code=1)
+@advisory_app.command("sync")
+def advisory_sync(
+ feed: str = typer.Option(
+ ...,
+ help="Path/URL to advisory feed JSON (schema aixv.advisory-feed/v1 with entries[])",
+ ),
+ trusted_subject: Optional[str] = typer.Option(None, help="Trusted advisory signer subject"),
+ trusted_subjects: Optional[str] = typer.Option(
+ None, help="Comma-separated trusted advisory signer subjects"
+ ),
+ trusted_issuers: Optional[str] = typer.Option(
+ None, help="Comma-separated trusted advisory signer issuers"
+ ),
+ max_bundle_age_days: Optional[int] = typer.Option(
+ None, help="Reject advisories with bundle integrated time older than N days"
+ ),
+ staging: bool = typer.Option(False, help="Use Sigstore staging instance"),
+ offline: bool = typer.Option(False, help="Use cached trust root only"),
+ json_output: bool = typer.Option(False, "--json", help="Deterministic JSON output"),
+):
+ """Ingest signed advisories from a feed with replay/freshness protection."""
+ try:
+ subject_list = []
+ if trusted_subject:
+ subject_list.append(trusted_subject)
+ subject_list.extend(_parse_csv_values(trusted_subjects))
+ issuer_list = _parse_csv_values(trusted_issuers)
+ if len(subject_list) < 1:
+ raise ValueError("trusted subject constraints required")
+ if max_bundle_age_days is not None and max_bundle_age_days < 1:
+ raise ValueError("max_bundle_age_days must be >= 1")
+
+ feed_payload = _read_json_from_ref(feed)
+ if feed_payload.get("schema") != "aixv.advisory-feed/v1":
+ raise ValueError("feed schema must be aixv.advisory-feed/v1")
+ entries = _parse_advisory_feed_entries(feed_payload)
+
+ root = _root_dir()
+ advisories_dir = root / ".aixv" / "advisories"
+ advisories_dir.mkdir(parents=True, exist_ok=True)
+ sync_state = _load_advisory_sync_state(root)
+ sync_entries = sync_state.setdefault("entries", {})
+ if not isinstance(sync_entries, dict):
+ raise ValueError("invalid advisory sync state entries")
+
+ results: List[Dict[str, Any]] = []
+ imported = 0
+ for entry in entries:
+ record_ref = entry["record"]
+ bundle_ref = entry["bundle"]
+ try:
+ record_bytes = _read_bytes_from_ref(record_ref)
+ bundle_bytes = _read_bytes_from_ref(bundle_ref)
+ with TemporaryDirectory() as tmp:
+ tmp_dir = Path(tmp)
+ tmp_record = tmp_dir / "record.json"
+ tmp_bundle = tmp_dir / "record.sigstore.json"
+ tmp_record.write_bytes(record_bytes)
+ tmp_bundle.write_bytes(bundle_bytes)
+
+ advisory = load_signed_record(str(tmp_record), expected_kind="advisory")
+ validate_predicate(PREDICATE_ALIASES["advisory"], advisory.payload)
+ verification = verify_signed_record(
+ record_path=tmp_record,
+ bundle_path=tmp_bundle,
+ trusted_subjects=subject_list,
+ trusted_issuers=issuer_list,
+ staging=staging,
+ offline=offline,
+ )
+
+ safe_advisory_id = validate_record_id(advisory.record_id)
+ previous_integrated = None
+ existing = sync_entries.get(safe_advisory_id)
+ if isinstance(existing, dict):
+ token = existing.get("integrated_time")
+ if isinstance(token, str):
+ previous_integrated = token
+ guard_violations = evaluate_advisory_sync_guards(
+ integrated_time=verification.get("integrated_time"),
+ previous_integrated_time=previous_integrated,
+ max_age_days=max_bundle_age_days,
+ )
+ if guard_violations:
+ raise ValueError("; ".join(guard_violations))
+
+ record_out = advisories_dir / f"{safe_advisory_id}.json"
+ bundle_out = advisories_dir / f"{safe_advisory_id}.json.sigstore.json"
+ record_out.write_bytes(record_bytes)
+ bundle_out.write_bytes(bundle_bytes)
+
+ sync_entries[safe_advisory_id] = {
+ "integrated_time": verification.get("integrated_time"),
+ "updated_at": datetime.now(tz=timezone.utc).replace(microsecond=0).isoformat(),
+ "source_record": record_ref,
+ "source_bundle": bundle_ref,
+ }
+ imported += 1
+ results.append(
+ {
+ "advisory_id": safe_advisory_id,
+ "status": "imported",
+ "integrated_time": verification.get("integrated_time"),
+ "path": str(record_out),
+ "bundle_path": str(bundle_out),
+ }
+ )
+ except Exception as exc:
+ results.append(
+ {
+ "record": record_ref,
+ "bundle": bundle_ref,
+ "status": "rejected",
+ "error": str(exc),
+ }
+ )
+
+ _save_advisory_sync_state(root, sync_state)
+ rejected = [r for r in results if r.get("status") == "rejected"]
+ payload = {
+ "ok": len(rejected) == 0,
+ "feed": feed,
+ "entry_count": len(entries),
+ "imported_count": imported,
+ "rejected_count": len(rejected),
+ "results": results,
+ }
+ _emit(payload, json_output)
+ if rejected:
+ raise typer.Exit(code=1)
+ except typer.Exit:
+ raise
+ except Exception as e:
+ _emit({"ok": False, "error": str(e), "command": "advisory sync"}, json_output)
+ raise typer.Exit(code=1)
+
+
@record_app.command("verify")
def record_verify(
record: str = typer.Argument(..., help="Path to signed record JSON"),
@@ -637,7 +1131,9 @@ def record_verify(
@record_app.command("create")
def record_create(
- kind: str = typer.Option(..., help="Record kind (e.g., policy, advisory, waiver, incident)"),
+ kind: str = typer.Option(
+ ..., help="Record kind (e.g., policy, advisory, bundle, waiver, incident)"
+ ),
record_id: str = typer.Option(..., help="Record identifier"),
input: str = typer.Option(..., "--input", "-i", help="Path to record payload JSON"),
output: Optional[str] = typer.Option(None, help="Output path for signed record JSON"),
@@ -682,6 +1178,186 @@ def record_create(
raise typer.Exit(code=1)
+def _policy_template_for_assurance_level(
+ *,
+ assurance_level: str,
+ advisory_subject_overrides: List[str],
+ max_bundle_age_days: Optional[int],
+) -> Dict[str, Any]:
+ if assurance_level not in {"level-1", "level-2", "level-3"}:
+ raise ValueError(f"unsupported assurance level: {assurance_level}")
+ if max_bundle_age_days is not None and max_bundle_age_days < 1:
+ raise ValueError("max_bundle_age_days must be >= 1")
+
+ policy: Dict[str, Any] = {
+ "policy_type": "aixv.policy/v1",
+ "allow_subjects": ["REPLACE_WITH_TRUSTED_SUBJECT"],
+ }
+ if assurance_level == "level-1":
+ policy["require_signed_advisories"] = False
+ if assurance_level in {"level-2", "level-3"}:
+ policy["require_signed_advisories"] = True
+ policy["advisory_allow_subjects"] = (
+ advisory_subject_overrides
+ if advisory_subject_overrides
+ else ["REPLACE_WITH_TRUSTED_ADVISORY_SIGNER"]
+ )
+ if assurance_level == "level-3":
+ if max_bundle_age_days is None:
+ raise ValueError("level-3 template requires --max-bundle-age-days")
+ policy["max_bundle_age_days"] = max_bundle_age_days
+ policy["require_no_active_advisories"] = True
+
+ validated = validate_policy_payload(policy)
+ violations = evaluate_assurance_level_requirements(
+ assurance_level=assurance_level,
+ policy_provided=True,
+ require_signed_policy=True,
+ policy=validated,
+ )
+ if violations:
+ raise ValueError("; ".join(violations))
+ return validated
+
+
+def _migrate_policy_to_assurance_level(
+ *,
+ policy: Dict[str, Any],
+ assurance_level: str,
+ advisory_subject_overrides: List[str],
+ max_bundle_age_days: Optional[int],
+) -> Dict[str, Any]:
+ if assurance_level not in {"level-1", "level-2", "level-3"}:
+ raise ValueError(f"unsupported assurance level: {assurance_level}")
+ if max_bundle_age_days is not None and max_bundle_age_days < 1:
+ raise ValueError("max_bundle_age_days must be >= 1")
+
+ migrated: Dict[str, Any] = dict(policy)
+ if assurance_level in {"level-2", "level-3"}:
+ migrated["require_signed_advisories"] = True
+ if advisory_subject_overrides:
+ migrated["advisory_allow_subjects"] = advisory_subject_overrides
+ advisory_trust = advisory_trust_constraints_from_policy(migrated)
+ if len(advisory_trust["subjects"]) < 1:
+ raise ValueError(
+ "level-2/level-3 migration requires advisory trust subjects; "
+ "set policy subject/allow_subjects or pass --advisory-trusted-subject(s)"
+ )
+ if assurance_level == "level-3":
+ migrated["require_no_active_advisories"] = True
+ if max_bundle_age_days is not None:
+ migrated["max_bundle_age_days"] = max_bundle_age_days
+ elif migrated.get("max_bundle_age_days") is None:
+ raise ValueError(
+ "level-3 migration requires max_bundle_age_days; pass --max-bundle-age-days"
+ )
+
+ validated = validate_policy_payload(migrated)
+ violations = evaluate_assurance_level_requirements(
+ assurance_level=assurance_level,
+ policy_provided=True,
+ require_signed_policy=True,
+ policy=validated,
+ )
+ if violations:
+ raise ValueError("; ".join(violations))
+ return validated
+
+
+@policy_app.command("template")
+def policy_template(
+ assurance_level: str = typer.Option(..., "--assurance-level", help="level-1|level-2|level-3"),
+ advisory_trusted_subject: Optional[str] = typer.Option(
+ None, help="Trusted advisory signer subject"
+ ),
+ advisory_trusted_subjects: Optional[str] = typer.Option(
+ None, help="Comma-separated trusted advisory signer subjects"
+ ),
+ max_bundle_age_days: Optional[int] = typer.Option(None, help="Required for level-3 templates"),
+ output: Optional[str] = typer.Option(None, help="Optional output path for policy template"),
+ json_output: bool = typer.Option(False, "--json", help="Deterministic JSON output"),
+):
+ """Generate a baseline policy template for an assurance level."""
+ try:
+ advisory_subjects: List[str] = []
+ if advisory_trusted_subject:
+ advisory_subjects.append(advisory_trusted_subject)
+ advisory_subjects.extend(_parse_csv_values(advisory_trusted_subjects))
+ policy = _policy_template_for_assurance_level(
+ assurance_level=assurance_level,
+ advisory_subject_overrides=advisory_subjects,
+ max_bundle_age_days=max_bundle_age_days,
+ )
+ out_path = Path(output) if output else None
+ if out_path is not None:
+ write_json(out_path, policy)
+ _emit(
+ {
+ "ok": True,
+ "assurance_level": assurance_level,
+ "policy": policy,
+ "path": str(out_path) if out_path else None,
+ },
+ json_output,
+ )
+ except Exception as e:
+ _emit({"ok": False, "error": str(e), "command": "policy template"}, json_output)
+ raise typer.Exit(code=1)
+
+
+@policy_app.command("migrate")
+def policy_migrate(
+ input: str = typer.Option(..., "--input", "-i", help="Path to policy JSON or policy record"),
+ to_assurance_level: str = typer.Option(
+ ..., "--to-assurance-level", help="level-1|level-2|level-3"
+ ),
+ advisory_trusted_subject: Optional[str] = typer.Option(
+ None, help="Trusted advisory signer subject override"
+ ),
+ advisory_trusted_subjects: Optional[str] = typer.Option(
+ None, help="Comma-separated trusted advisory signer subject overrides"
+ ),
+ max_bundle_age_days: Optional[int] = typer.Option(
+ None, help="Required when migrating to level-3 without existing freshness policy"
+ ),
+ output: Optional[str] = typer.Option(None, help="Output path for migrated policy JSON"),
+ json_output: bool = typer.Option(False, "--json", help="Deterministic JSON output"),
+):
+ """Migrate an existing policy payload to a target assurance level."""
+ try:
+ input_path = ensure_artifact(input)
+ base_policy = policy_from_file(str(input_path))
+ advisory_subjects: List[str] = []
+ if advisory_trusted_subject:
+ advisory_subjects.append(advisory_trusted_subject)
+ advisory_subjects.extend(_parse_csv_values(advisory_trusted_subjects))
+ migrated = _migrate_policy_to_assurance_level(
+ policy=base_policy,
+ assurance_level=to_assurance_level,
+ advisory_subject_overrides=advisory_subjects,
+ max_bundle_age_days=max_bundle_age_days,
+ )
+ out_path = (
+ Path(output)
+ if output
+ else input_path.with_name(f"{input_path.stem}.{to_assurance_level}.policy.json")
+ )
+ write_json(out_path, migrated)
+ _emit(
+ {
+ "ok": True,
+ "input": str(input_path),
+ "assurance_level": to_assurance_level,
+ "path": str(out_path),
+ "policy": migrated,
+ },
+ json_output,
+ )
+ except Exception as e:
+ _emit({"ok": False, "error": str(e), "command": "policy migrate"}, json_output)
+ raise typer.Exit(code=1)
+
+
@policy_app.command("create")
def policy_create(
input: str = typer.Option(..., "--input", "-i", help="Path to policy JSON payload"),
@@ -778,6 +1454,118 @@ def policy_verify(
raise typer.Exit(code=1)
+@bundle_app.command("create")
+def bundle_create(
+ input: str = typer.Option(..., "--input", "-i", help="Path to bundle payload JSON"),
+ bundle_id: Optional[str] = typer.Option(None, help="Bundle identifier override"),
+ output: Optional[str] = typer.Option(None, help="Output path for bundle record"),
+ sign: bool = typer.Option(False, help="Sign bundle record with Sigstore"),
+ identity_token: Optional[str] = typer.Option(None, help="OIDC token for keyless signing"),
+ identity_token_env: str = typer.Option(
+ "SIGSTORE_ID_TOKEN", help="Environment variable containing OIDC token"
+ ),
+ interactive_oidc: bool = typer.Option(
+ False, help="Acquire OIDC token interactively via browser"
+ ),
+ staging: bool = typer.Option(False, help="Use Sigstore staging instance"),
+ offline: bool = typer.Option(False, help="Use cached trust root only"),
+ json_output: bool = typer.Option(False, "--json", help="Deterministic JSON output"),
+):
+ """Create a strict multi-artifact bundle record."""
+ try:
+ input_path = ensure_artifact(input)
+ payload = read_json(str(input_path))
+ if bundle_id is not None:
+ payload["bundle_id"] = bundle_id
+ validated = validate_record_payload("bundle", payload)
+ resolved_bundle_id = validated["bundle_id"]
+ result = _create_and_optionally_sign_record(
+ kind="bundle",
+ record_id=resolved_bundle_id,
+ payload=validated,
+ output=output,
+ sign=sign,
+ identity_token=identity_token,
+ identity_token_env=identity_token_env,
+ interactive_oidc=interactive_oidc,
+ staging=staging,
+ offline=offline,
+ )
+ result["bundle_id"] = resolved_bundle_id
+ _emit(result, json_output)
+ except Exception as e:
+ _emit({"ok": False, "error": str(e), "command": "bundle create"}, json_output)
+ raise typer.Exit(code=1)
+
+
+@bundle_app.command("verify")
+def bundle_verify(
+ bundle_record: str = typer.Argument(..., help="Path to bundle record JSON"),
+ bundle: Optional[str] = typer.Option(None, help="Sigstore bundle path for bundle record"),
+ require_member: Optional[str] = typer.Option(
+ None, help="Required member digest (sha256:...) or path to local file"
+ ),
+ trusted_subject: Optional[str] = typer.Option(None, help="Trusted bundle signer subject"),
+ trusted_subjects: Optional[str] = typer.Option(
+ None, help="Comma-separated trusted bundle signer subjects"
+ ),
+ trusted_issuers: Optional[str] = typer.Option(
+ None, help="Comma-separated trusted bundle signer issuers"
+ ),
+ staging: bool = typer.Option(False, help="Use Sigstore staging instance"),
+ offline: bool = typer.Option(False, help="Use cached trust root only"),
+ json_output: bool = typer.Option(False, "--json", help="Deterministic JSON output"),
+):
+ """Verify signed bundle record and optional required membership."""
+ try:
+ record_path = ensure_artifact(bundle_record)
+ bundle_record_obj = load_signed_record(str(record_path), expected_kind="bundle")
+ validated_bundle = validate_record_payload("bundle", bundle_record_obj.payload)
+ bundle_in = resolve_signature_bundle_path(record_path, bundle)
+ if not bundle_in.exists():
+ raise FileNotFoundError(f"bundle not found: {bundle_in}")
+ subject_list = []
+ if trusted_subject:
+ subject_list.append(trusted_subject)
+ subject_list.extend(_parse_csv_values(trusted_subjects))
+ issuer_list = _parse_csv_values(trusted_issuers)
+ if len(subject_list) < 1:
+ raise ValueError("trusted subject constraints required")
+ signature_verification = verify_signed_record(
+ record_path=record_path,
+ bundle_path=bundle_in,
+ trusted_subjects=subject_list,
+ trusted_issuers=issuer_list,
+ staging=staging,
+ offline=offline,
+ )
+
+ required_digest: Optional[str] = None
+ if require_member:
+ candidate_path = Path(require_member)
+ if candidate_path.exists() and candidate_path.is_file():
+ required_digest = sha256_file(candidate_path)
+ else:
+ required_digest = normalize_sha256_digest(require_member)
+ if required_digest not in validated_bundle.get("members", []):
+ raise ValueError(f"required member digest not found in bundle: {required_digest}")
+
+ _emit(
+ {
+ "ok": True,
+ "bundle_path": str(record_path),
+ "bundle_record_id": bundle_record_obj.record_id,
+ "bundle_payload": validated_bundle,
+ "required_member": required_digest,
+ "signature_verification": signature_verification,
+ },
+ json_output,
+ )
+ except Exception as e:
+ _emit({"ok": False, "error": str(e), "command": "bundle verify"}, json_output)
+ raise typer.Exit(code=1)
+
+
@app.command()
def version(
json_output: bool = typer.Option(False, "--json", help="Deterministic JSON output"),
diff --git a/src/aixv/conformance.py b/src/aixv/conformance.py
index f49a035..460cb03 100644
--- a/src/aixv/conformance.py
+++ b/src/aixv/conformance.py
@@ -3,11 +3,21 @@
import json
from datetime import datetime, timezone
from pathlib import Path
+from tempfile import TemporaryDirectory
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, ConfigDict, Field
-from aixv.core import load_signed_record, validate_policy_payload
+from aixv.core import (
+ advisory_trust_constraints_from_policy,
+ evaluate_advisory_policy,
+ evaluate_advisory_sync_guards,
+ load_signed_record,
+ validate_bundle_payload,
+ validate_policy_payload,
+ verify_artifact_with_sigstore,
+ verify_statement_with_sigstore,
+)
FIXTURES_DIR = Path("docs/conformance/fixtures")
@@ -135,12 +145,238 @@ def _check_signed_record_fixture() -> ConformanceCheck:
)
+def _check_policy_unknown_field_rejected() -> ConformanceCheck:
+ payload = {
+ "policy_type": "aixv.policy/v1",
+ "allow_subjects": ["alice@example.com"],
+ "unexpected": True,
+ }
+ try:
+ validate_policy_payload(payload)
+ except Exception:
+ return ConformanceCheck(
+ check_id="policy.unknown-field.reject.v1",
+ status="pass",
+ evidence={"expected_rejection": True},
+ )
+ return ConformanceCheck(
+ check_id="policy.unknown-field.reject.v1",
+ status="fail",
+ evidence={},
+ error="policy unknown field was accepted",
+ )
+
+
+def _check_advisory_trust_subject_fallback() -> ConformanceCheck:
+ policy = {
+ "policy_type": "aixv.policy/v1",
+ "subject": "security-policy@aixv.org",
+ "require_signed_advisories": True,
+ "require_no_active_advisories": True,
+ }
+ constraints = advisory_trust_constraints_from_policy(policy)
+ if constraints["subjects"] == ["security-policy@aixv.org"]:
+ return ConformanceCheck(
+ check_id="policy.advisory-trust.subject-fallback.v1",
+ status="pass",
+ evidence={"subjects": constraints["subjects"]},
+ )
+ return ConformanceCheck(
+ check_id="policy.advisory-trust.subject-fallback.v1",
+ status="fail",
+ evidence={"subjects": constraints["subjects"]},
+ error="advisory trust root fallback did not include policy subject",
+ )
+
+
+def _check_signed_advisory_policy_semantics() -> ConformanceCheck:
+ policy = {
+ "policy_type": "aixv.policy/v1",
+ "allow_subjects": ["security-policy@aixv.org"],
+ "require_signed_advisories": True,
+ "require_no_active_advisories": True,
+ }
+ untrusted_only = [
+ {"status": "active", "_trust": {"signed_and_trusted": False}},
+ ]
+ trusted_active = [
+ {"status": "active", "_trust": {"signed_and_trusted": True}},
+ ]
+ untrusted_violations = evaluate_advisory_policy(policy=policy, advisories=untrusted_only)
+ trusted_violations = evaluate_advisory_policy(policy=policy, advisories=trusted_active)
+ if len(untrusted_violations) < 1 and len(trusted_violations) > 0:
+ return ConformanceCheck(
+ check_id="advisory.signed-policy.filtering.v1",
+ status="pass",
+ evidence={
+ "untrusted_violations": untrusted_violations,
+ "trusted_violations": trusted_violations,
+ },
+ )
+ return ConformanceCheck(
+ check_id="advisory.signed-policy.filtering.v1",
+ status="fail",
+ evidence={
+ "untrusted_violations": untrusted_violations,
+ "trusted_violations": trusted_violations,
+ },
+ error="signed advisory policy semantics failed",
+ )
+
+
+def _check_invalid_artifact_bundle_rejected() -> ConformanceCheck:
+ with TemporaryDirectory() as tmp:
+ artifact = Path(tmp) / "artifact.bin"
+ bundle = Path(tmp) / "artifact.bin.sigstore.json"
+ artifact.write_bytes(b"aixv")
+ bundle.write_text("{}", encoding="utf-8")
+ try:
+ verify_artifact_with_sigstore(
+ artifact=artifact,
+ bundle_in=bundle,
+ subject="alice@example.com",
+ issuer=None,
+ allow_subjects=[],
+ allow_issuers=[],
+ staging=False,
+ offline=True,
+ )
+ except Exception:
+ return ConformanceCheck(
+ check_id="crypto.invalid-bundle.artifact.reject.v1",
+ status="pass",
+ evidence={"expected_rejection": True},
+ )
+ return ConformanceCheck(
+ check_id="crypto.invalid-bundle.artifact.reject.v1",
+ status="fail",
+ evidence={},
+ error="invalid artifact bundle unexpectedly verified",
+ )
+
+
+def _check_invalid_statement_bundle_rejected() -> ConformanceCheck:
+ statement = {
+ "_type": "https://in-toto.io/Statement/v1",
+ "subject": [{"name": "artifact.bin", "digest": {"sha256": "0" * 64}}],
+ "predicateType": "https://aixv.org/attestation/training/v1",
+ "predicate": {
+ "parent_models": [{"digest": f"sha256:{'0' * 64}"}],
+ "datasets": [],
+ "training_run": {
+ "framework": "pytorch",
+ "framework_version": "2.2.0",
+ "code_digest": f"sha256:{'0' * 64}",
+ "environment_digest": f"sha256:{'0' * 64}",
+ },
+ "hyperparameters": {},
+ },
+ }
+ with TemporaryDirectory() as tmp:
+ bundle = Path(tmp) / "statement.sigstore.json"
+ bundle.write_text("{}", encoding="utf-8")
+ try:
+ verify_statement_with_sigstore(
+ statement=statement,
+ bundle_in=bundle,
+ subject="alice@example.com",
+ issuer=None,
+ allow_subjects=[],
+ allow_issuers=[],
+ staging=False,
+ offline=True,
+ )
+ except Exception:
+ return ConformanceCheck(
+ check_id="crypto.invalid-bundle.statement.reject.v1",
+ status="pass",
+ evidence={"expected_rejection": True},
+ )
+ return ConformanceCheck(
+ check_id="crypto.invalid-bundle.statement.reject.v1",
+ status="fail",
+ evidence={},
+ error="invalid statement bundle unexpectedly verified",
+ )
+
+
+def _check_bundle_schema_validation() -> ConformanceCheck:
+ payload = {
+ "bundle_type": "aixv.bundle/v1",
+ "bundle_id": "bundle-main",
+ "primary": "1" * 64,
+ "members": [f"sha256:{'2' * 64}"],
+ }
+ try:
+ validated = validate_bundle_payload(payload)
+ except Exception as exc:
+ return ConformanceCheck(
+ check_id="bundle.schema.validation.v1",
+ status="fail",
+ evidence={"payload": payload},
+ error=str(exc),
+ )
+ if validated["primary"] not in validated["members"]:
+ return ConformanceCheck(
+ check_id="bundle.schema.validation.v1",
+ status="fail",
+ evidence={"validated": validated},
+ error="bundle primary digest missing from members",
+ )
+ return ConformanceCheck(
+ check_id="bundle.schema.validation.v1",
+ status="pass",
+ evidence={"member_count": len(validated["members"])},
+ )
+
+
+def _check_advisory_sync_replay_and_freshness() -> ConformanceCheck:
+ replay_violations = evaluate_advisory_sync_guards(
+ integrated_time="2026-02-16T00:00:00+00:00",
+ previous_integrated_time="2026-02-16T00:00:00+00:00",
+ max_age_days=None,
+ )
+ stale_violations = evaluate_advisory_sync_guards(
+ integrated_time="2026-01-01T00:00:00+00:00",
+ previous_integrated_time=None,
+ max_age_days=7,
+ now=datetime(2026, 2, 1, tzinfo=timezone.utc),
+ )
+ replay_blocked = any("replay/stale" in v for v in replay_violations)
+ stale_blocked = any("max_age_days" in v for v in stale_violations)
+ if replay_blocked and stale_blocked:
+ return ConformanceCheck(
+ check_id="advisory.sync.replay-freshness.v1",
+ status="pass",
+ evidence={
+ "replay_violations": replay_violations,
+ "stale_violations": stale_violations,
+ },
+ )
+ return ConformanceCheck(
+ check_id="advisory.sync.replay-freshness.v1",
+ status="fail",
+ evidence={
+ "replay_violations": replay_violations,
+ "stale_violations": stale_violations,
+ },
+ error="advisory sync replay/freshness guards failed",
+ )
+
+
def run_conformance_checks() -> ConformanceReport:
checks = [
_check_fixtures_exist(),
_check_policy_valid_fixture(),
_check_policy_invalid_fixture(),
_check_signed_record_fixture(),
+ _check_policy_unknown_field_rejected(),
+ _check_advisory_trust_subject_fallback(),
+ _check_signed_advisory_policy_semantics(),
+ _check_bundle_schema_validation(),
+ _check_advisory_sync_replay_and_freshness(),
+ _check_invalid_artifact_bundle_rejected(),
+ _check_invalid_statement_bundle_rejected(),
]
overall_status = "pass" if all(c.status == "pass" for c in checks) else "fail"
return ConformanceReport(
diff --git a/src/aixv/core.py b/src/aixv/core.py
index ad2ccb7..256d7bf 100644
--- a/src/aixv/core.py
+++ b/src/aixv/core.py
@@ -3,6 +3,8 @@
import hashlib
import json
import os
+import re
+from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
@@ -16,6 +18,7 @@
from cryptography.x509.oid import NameOID
from pydantic import BaseModel, ConfigDict, Field, ValidationError
from sigstore.dsse import Statement
+from sigstore.hashes import HashAlgorithm, Hashed
from sigstore.models import Bundle, ClientTrustConfig
from sigstore.oidc import IdentityToken, Issuer
from sigstore.sign import SigningContext
@@ -31,6 +34,9 @@
ALLOWED_SEVERITIES = {"low", "medium", "high", "critical"}
ALLOWED_ADVISORY_STATUS = {"active", "mitigated", "withdrawn"}
SEVERITY_RANK = {"low": 1, "medium": 2, "high": 3, "critical": 4}
+ATTESTATION_RECORD_SCHEMA = "aixv.attestation-record/v1"
+ALLOWED_ASSURANCE_LEVELS = {"level-1", "level-2", "level-3"}
+RECORD_ID_PATTERN = re.compile(r"^[A-Za-z0-9._-]{1,128}$")
class ParentModel(BaseModel):
@@ -101,6 +107,26 @@ def model_post_init(self, __context: Any) -> None:
raise ValueError(f"invalid status: {self.status}")
+class ArtifactBundle(BaseModel):
+ bundle_type: str = "aixv.bundle/v1"
+ bundle_id: str
+ primary: str
+ members: List[str] = Field(min_length=1)
+
+ model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")
+
+ def model_post_init(self, __context: Any) -> None:
+ self.primary = normalize_sha256_digest(self.primary)
+ normalized_members: List[str] = []
+ for member in self.members:
+ digest = normalize_sha256_digest(member)
+ if digest not in normalized_members:
+ normalized_members.append(digest)
+ if self.primary not in normalized_members:
+ normalized_members.append(self.primary)
+ self.members = normalized_members
+
+
class VerifyPolicy(BaseModel):
policy_type: str = "aixv.policy/v1"
subject: Optional[str] = None
@@ -181,6 +207,28 @@ def normalize_sha256_digest(value: str) -> str:
return f"sha256:{token}"
+def _normalize_string_list(raw: Any) -> List[str]:
+ if not isinstance(raw, list):
+ return []
+ out: List[str] = []
+ for item in raw:
+ if not isinstance(item, str):
+ continue
+ value = item.strip()
+ if value and value not in out:
+ out.append(value)
+ return out
+
+
+def validate_record_id(record_id: str) -> str:
+ token = record_id.strip()
+ if not RECORD_ID_PATTERN.fullmatch(token):
+ raise ValueError(
+ "invalid record_id: must match ^[A-Za-z0-9._-]{1,128}$ and must not include path separators"
+ )
+ return token
+
+
def ensure_artifact(path: str) -> Path:
artifact = Path(path)
if not artifact.exists():
@@ -202,7 +250,7 @@ def write_json(path: Path, payload: Dict[str, Any]) -> None:
f.write("\n")
-def sha256_file(path: Path) -> str:
+def _sha256_file_digest(path: Path) -> bytes:
h = hashlib.sha256()
with path.open("rb") as f:
while True:
@@ -210,7 +258,15 @@ def sha256_file(path: Path) -> str:
if not chunk:
break
h.update(chunk)
- return f"sha256:{h.hexdigest()}"
+ return h.digest()
+
+
+def sha256_file(path: Path) -> str:
+ return f"sha256:{_sha256_file_digest(path).hex()}"
+
+
+def sigstore_hashed_input(path: Path) -> Hashed:
+ return Hashed(algorithm=HashAlgorithm.SHA2_256, digest=_sha256_file_digest(path))
def bundle_path_for(artifact: Path) -> Path:
@@ -255,6 +311,75 @@ def validate_policy_payload(raw: Dict[str, Any]) -> Dict[str, Any]:
return parsed.model_dump()
+def validate_bundle_payload(raw: Dict[str, Any]) -> Dict[str, Any]:
+ try:
+ parsed = ArtifactBundle.model_validate(raw)
+ except ValidationError as e:
+ raise ValueError(str(e))
+ return parsed.model_dump()
+
+
+def advisory_trust_constraints_from_policy(policy: Dict[str, Any]) -> Dict[str, List[str]]:
+ subjects = _normalize_string_list(policy.get("advisory_allow_subjects"))
+ issuers = _normalize_string_list(policy.get("advisory_allow_issuers"))
+
+ if not subjects:
+ subjects = _normalize_string_list(policy.get("allow_subjects"))
+ if not subjects:
+ subject = policy.get("subject")
+ if isinstance(subject, str) and subject.strip():
+ subjects = [subject.strip()]
+
+ if not issuers:
+ issuers = _normalize_string_list(policy.get("allow_issuers"))
+ if not issuers:
+ issuer = policy.get("issuer")
+ if isinstance(issuer, str) and issuer.strip():
+ issuers = [issuer.strip()]
+
+ return {"subjects": subjects, "issuers": issuers}
+
+
+def evaluate_assurance_level_requirements(
+ *,
+ assurance_level: Optional[str],
+ policy_provided: bool,
+ require_signed_policy: bool,
+ policy: Dict[str, Any],
+) -> List[str]:
+ if assurance_level is None:
+ return []
+ if assurance_level not in ALLOWED_ASSURANCE_LEVELS:
+ return [f"unsupported assurance level: {assurance_level}"]
+
+ violations: List[str] = []
+ if assurance_level in {"level-2", "level-3"}:
+ if not policy_provided:
+ violations.append(f"assurance level {assurance_level} requires --policy")
+ if not require_signed_policy:
+ violations.append(
+ f"assurance level {assurance_level} requires signed policy verification"
+ )
+ if not bool(policy.get("require_signed_advisories", False)):
+ violations.append(
+ f"assurance level {assurance_level} requires require_signed_advisories=true"
+ )
+ advisory_trust = advisory_trust_constraints_from_policy(policy)
+ if len(advisory_trust["subjects"]) < 1:
+ violations.append(
+ f"assurance level {assurance_level} requires advisory trust subjects via "
+ "advisory_allow_subjects or subject/allow_subjects"
+ )
+
+ if assurance_level == "level-3":
+ if policy.get("max_bundle_age_days") is None:
+ violations.append("assurance level level-3 requires max_bundle_age_days")
+ if not bool(policy.get("require_no_active_advisories", False)):
+ violations.append("assurance level level-3 requires require_no_active_advisories=true")
+
+ return violations
+
+
def resolve_predicate_uri(predicate: str) -> str:
return PREDICATE_ALIASES.get(predicate, predicate)
@@ -332,12 +457,13 @@ def sign_artifact_with_sigstore(
else ClientTrustConfig.production(offline=offline)
)
ctx = SigningContext.from_trust_config(trust_config)
+ hashed_input = sigstore_hashed_input(artifact)
with ctx.signer(token, cache=True) as signer:
- bundle = signer.sign_artifact(artifact.read_bytes())
+ bundle = signer.sign_artifact(hashed_input)
bundle_out.write_text(bundle.to_json(), encoding="utf-8")
return {
"bundle_path": str(bundle_out),
- "digest": sha256_file(artifact),
+ "digest": f"sha256:{hashed_input.digest.hex()}",
"identity": {"subject": token.identity, "issuer": token.issuer},
"staging": staging,
}
@@ -378,6 +504,19 @@ def sign_statement_with_sigstore(
}
+def _bundle_integrated_time(bundle: Bundle) -> Optional[str]:
+ if bundle.log_entry and bundle.log_entry._inner.integrated_time:
+ return (
+ datetime.fromtimestamp(
+ bundle.log_entry._inner.integrated_time,
+ tz=timezone.utc,
+ )
+ .replace(microsecond=0)
+ .isoformat()
+ )
+ return None
+
+
def verify_artifact_with_sigstore(
*,
artifact: Path,
@@ -399,23 +538,15 @@ def verify_artifact_with_sigstore(
allow_subjects=allow_subjects or [],
allow_issuers=allow_issuers or [],
)
- verifier.verify_artifact(artifact.read_bytes(), bundle, identity_policy)
+ hashed_input = sigstore_hashed_input(artifact)
+ verifier.verify_artifact(hashed_input, bundle, identity_policy)
cert = bundle.signing_certificate
subject_candidates = _extract_subject_candidates(cert)
issuer_value = _extract_oidc_issuer(cert)
- integrated_time = None
- if bundle.log_entry and bundle.log_entry._inner.integrated_time:
- integrated_time = (
- datetime.fromtimestamp(
- bundle.log_entry._inner.integrated_time,
- tz=timezone.utc,
- )
- .replace(microsecond=0)
- .isoformat()
- )
+ integrated_time = _bundle_integrated_time(bundle)
return {
"verified": True,
- "digest": sha256_file(artifact),
+ "digest": f"sha256:{hashed_input.digest.hex()}",
"bundle_path": str(bundle_in),
"expected_subject": subject,
"expected_issuer": issuer,
@@ -426,6 +557,56 @@ def verify_artifact_with_sigstore(
}
+def verify_statement_with_sigstore(
+ *,
+ statement: Dict[str, Any],
+ bundle_in: Path,
+ subject: Optional[str],
+ issuer: Optional[str],
+ allow_subjects: Optional[List[str]] = None,
+ allow_issuers: Optional[List[str]] = None,
+ staging: bool,
+ offline: bool,
+) -> Dict[str, Any]:
+ bundle = Bundle.from_json(bundle_in.read_text(encoding="utf-8"))
+ verifier = (
+ Verifier.staging(offline=offline) if staging else Verifier.production(offline=offline)
+ )
+ identity_policy = build_sigstore_identity_policy(
+ subject=subject,
+ issuer=issuer,
+ allow_subjects=allow_subjects or [],
+ allow_issuers=allow_issuers or [],
+ )
+ payload_type, payload_bytes = verifier.verify_dsse(bundle, identity_policy)
+ try:
+ verified_statement = json.loads(payload_bytes.decode("utf-8"))
+ except Exception as exc:
+ raise ValueError(f"verified DSSE payload is not valid UTF-8 JSON: {exc}") from exc
+ if not isinstance(verified_statement, dict):
+ raise ValueError("verified DSSE payload is not a JSON object statement")
+ if verified_statement.get("_type") != "https://in-toto.io/Statement/v1":
+ raise ValueError("verified DSSE payload is not an in-toto Statement/v1")
+ if verified_statement != statement:
+ raise ValueError("verified DSSE payload does not match expected statement content")
+
+ cert = bundle.signing_certificate
+ subject_candidates = _extract_subject_candidates(cert)
+ issuer_value = _extract_oidc_issuer(cert)
+ integrated_time = _bundle_integrated_time(bundle)
+ return {
+ "verified": True,
+ "bundle_path": str(bundle_in),
+ "payload_type": payload_type,
+ "expected_subject": subject,
+ "expected_issuer": issuer,
+ "actual_subjects": subject_candidates,
+ "actual_issuer": issuer_value,
+ "integrated_time": integrated_time,
+ "staging": staging,
+ }
+
+
def build_sigstore_identity_policy(
*,
subject: Optional[str],
@@ -459,29 +640,189 @@ def create_attestation_record(
artifact: Path,
predicate_uri: str,
statement: Dict[str, Any],
+ signature_bundle_path: Optional[str] = None,
) -> Path:
digest = sha256_file(artifact).replace(":", "_")
parts = predicate_uri.rstrip("/").split("/")
key = f"{parts[-2]}.{parts[-1]}" if len(parts) >= 2 else parts[-1]
path = attestation_store(root) / f"{digest}.{key}.json"
- write_json(path, statement)
+ write_json(
+ path,
+ {
+ "schema": ATTESTATION_RECORD_SCHEMA,
+ "created_at": now_iso(),
+ "predicate_type": predicate_uri,
+ "statement": statement,
+ "signature_bundle_path": signature_bundle_path,
+ },
+ )
return path
-def load_attestations_for_digest(root: Path, digest: str) -> List[Dict[str, Any]]:
+def _statement_has_subject_digest(statement: Dict[str, Any], digest: str) -> bool:
+ target = normalize_sha256_digest(digest)
+ subjects = statement.get("subject", [])
+ if not isinstance(subjects, list):
+ return False
+ for subject in subjects:
+ if not isinstance(subject, dict):
+ continue
+ subject_digest = subject.get("digest")
+ if not isinstance(subject_digest, dict):
+ continue
+ token = subject_digest.get("sha256")
+ if not isinstance(token, str):
+ continue
+ try:
+ if normalize_sha256_digest(token) == target:
+ return True
+ except ValueError:
+ continue
+ return False
+
+
+def load_attestation_records_for_digest(root: Path, digest: str) -> List[Dict[str, Any]]:
needle = digest.replace(":", "_")
+ target = normalize_sha256_digest(digest)
results: List[Dict[str, Any]] = []
store = attestation_store(root)
if not store.exists():
return results
for path in sorted(store.glob(f"{needle}.*.json")):
try:
- results.append(read_json(str(path)))
+ raw = read_json(str(path))
+ except Exception:
+ continue
+ statement: Optional[Dict[str, Any]] = None
+ signature_bundle_path: Optional[str] = None
+ if isinstance(raw, dict) and raw.get("schema") == ATTESTATION_RECORD_SCHEMA:
+ statement_candidate = raw.get("statement")
+ if isinstance(statement_candidate, dict):
+ statement = statement_candidate
+ bundle_candidate = raw.get("signature_bundle_path")
+ if isinstance(bundle_candidate, str) and bundle_candidate.strip():
+ signature_bundle_path = bundle_candidate
+ elif isinstance(raw, dict):
+ statement = raw
+ bundle_candidate = raw.get("signature_bundle_path")
+ if isinstance(bundle_candidate, str) and bundle_candidate.strip():
+ signature_bundle_path = bundle_candidate
+ if not isinstance(statement, dict):
+ continue
+ if not _statement_has_subject_digest(statement, target):
+ continue
+ if not signature_bundle_path:
+ default_bundle = path.with_name(f"{path.name}.sigstore.json")
+ if default_bundle.exists():
+ signature_bundle_path = str(default_bundle)
+ results.append(
+ {
+ "path": str(path),
+ "statement": statement,
+ "signature_bundle_path": signature_bundle_path,
+ }
+ )
+ return results
+
+
+def load_attestations_for_digest(root: Path, digest: str) -> List[Dict[str, Any]]:
+ results: List[Dict[str, Any]] = []
+ for entry in load_attestation_records_for_digest(root, digest):
+ statement = entry.get("statement")
+ if isinstance(statement, dict):
+ results.append(statement)
+ return results
+
+
+def load_all_attestation_records(root: Path) -> List[Dict[str, Any]]:
+ results: List[Dict[str, Any]] = []
+ store = attestation_store(root)
+ if not store.exists():
+ return results
+ for path in sorted(store.glob("*.json")):
+ try:
+ raw = read_json(str(path))
except Exception:
continue
+ statement: Optional[Dict[str, Any]] = None
+ signature_bundle_path: Optional[str] = None
+ if isinstance(raw, dict) and raw.get("schema") == ATTESTATION_RECORD_SCHEMA:
+ candidate = raw.get("statement")
+ if isinstance(candidate, dict):
+ statement = candidate
+ bundle_candidate = raw.get("signature_bundle_path")
+ if isinstance(bundle_candidate, str) and bundle_candidate.strip():
+ signature_bundle_path = bundle_candidate
+ elif isinstance(raw, dict):
+ statement = raw
+ bundle_candidate = raw.get("signature_bundle_path")
+ if isinstance(bundle_candidate, str) and bundle_candidate.strip():
+ signature_bundle_path = bundle_candidate
+ if not isinstance(statement, dict):
+ continue
+ if not signature_bundle_path:
+ default_bundle = path.with_name(f"{path.name}.sigstore.json")
+ if default_bundle.exists():
+ signature_bundle_path = str(default_bundle)
+ results.append(
+ {
+ "path": str(path),
+ "statement": statement,
+ "signature_bundle_path": signature_bundle_path,
+ }
+ )
return results
+def summarize_training_lineage_from_attestations(
+ attestations: List[Dict[str, Any]],
+) -> Dict[str, Any]:
+ parent_digests: List[str] = []
+ dataset_digests: List[str] = []
+ training_runs: List[Dict[str, Any]] = []
+
+ for att in attestations:
+ if att.get("predicateType") != PREDICATE_ALIASES["training"]:
+ continue
+ predicate = att.get("predicate", {})
+ if not isinstance(predicate, dict):
+ continue
+ for parent in predicate.get("parent_models", []):
+ if not isinstance(parent, dict):
+ continue
+ token = parent.get("digest")
+ if not isinstance(token, str):
+ continue
+ try:
+ digest = normalize_sha256_digest(token)
+ except ValueError:
+ continue
+ if digest not in parent_digests:
+ parent_digests.append(digest)
+ for dataset in predicate.get("datasets", []):
+ if not isinstance(dataset, dict):
+ continue
+ token = dataset.get("digest")
+ if not isinstance(token, str):
+ continue
+ try:
+ digest = normalize_sha256_digest(token)
+ except ValueError:
+ continue
+ if digest not in dataset_digests:
+ dataset_digests.append(digest)
+ run = predicate.get("training_run")
+ if isinstance(run, dict):
+ training_runs.append(run)
+
+ return {
+ "parent_digests": parent_digests,
+ "dataset_digests": dataset_digests,
+ "training_run_count": len(training_runs),
+ "training_runs": training_runs,
+ }
+
+
def create_advisory_record(
*,
root: Path,
@@ -553,31 +894,38 @@ def list_advisories(
continue
if normalized_candidate == target:
signed_and_trusted = False
+ bundle_ref: Optional[Path] = None
if bundle_path and isinstance(bundle_path, str):
- bundle_ref = Path(bundle_path)
- if not bundle_ref.is_absolute():
- bundle_ref = path.parent / bundle_ref
- if bundle_ref.exists():
- try:
- verify_artifact_with_sigstore(
- artifact=path,
- bundle_in=bundle_ref,
- subject=None,
- issuer=None,
- allow_subjects=trusted_subjects or [],
- allow_issuers=trusted_issuers or [],
- staging=staging,
- offline=offline,
- )
- signed_and_trusted = True
- except Exception:
- signed_and_trusted = False
+ candidate_ref = Path(bundle_path)
+ if not candidate_ref.is_absolute():
+ candidate_ref = path.parent / candidate_ref
+ if candidate_ref.exists():
+ bundle_ref = candidate_ref
+ if bundle_ref is None:
+ default_bundle = path.with_name(f"{path.name}.sigstore.json")
+ if default_bundle.exists():
+ bundle_ref = default_bundle
+ if bundle_ref is not None and bundle_ref.exists():
+ try:
+ verify_artifact_with_sigstore(
+ artifact=path,
+ bundle_in=bundle_ref,
+ subject=None,
+ issuer=None,
+ allow_subjects=trusted_subjects or [],
+ allow_issuers=trusted_issuers or [],
+ staging=staging,
+ offline=offline,
+ )
+ signed_and_trusted = True
+ except Exception:
+ signed_and_trusted = False
if require_signed and not signed_and_trusted:
break
advisory = dict(advisory)
advisory["_trust"] = {
"signed_and_trusted": signed_and_trusted,
- "signature_bundle_path": bundle_path,
+ "signature_bundle_path": str(bundle_ref) if bundle_ref else bundle_path,
}
out.append(advisory)
break
@@ -618,7 +966,8 @@ def resolve_signature_bundle_path(record_path: Path, explicit_bundle: Optional[s
bundle_ref = Path(record.signature_bundle_path)
if not bundle_ref.is_absolute():
bundle_ref = record_path.parent / bundle_ref
- return bundle_ref
+ if bundle_ref.exists():
+ return bundle_ref
except Exception:
pass
return record_path.with_name(f"{record_path.name}.sigstore.json")
@@ -655,6 +1004,8 @@ def validate_record_payload(kind: str, payload: Dict[str, Any]) -> Dict[str, Any
return validate_policy_payload(payload)
if kind == "advisory":
return validate_predicate(PREDICATE_ALIASES["advisory"], payload)
+ if kind == "bundle":
+ return validate_bundle_payload(payload)
return payload
@@ -717,13 +1068,18 @@ def create_record(
output_path: Optional[str] = None,
signature_bundle_path: Optional[str] = None,
) -> Path:
+ normalized_record_id = validate_record_id(record_id)
record = create_signed_record_payload(
kind=kind,
- record_id=record_id,
+ record_id=normalized_record_id,
payload=payload,
signature_bundle_path=signature_bundle_path,
)
- path = Path(output_path) if output_path else record_store(root, kind) / f"{record_id}.json"
+ path = (
+ Path(output_path)
+ if output_path
+ else record_store(root, kind) / f"{normalized_record_id}.json"
+ )
write_json(path, record)
return path
@@ -750,6 +1106,142 @@ def detect_parents_from_training_attestations(
return parents
+def trace_training_lineage_parents(
+ root: Path, artifact_digest: str, depth: int
+) -> List[Dict[str, Any]]:
+ if depth < 1:
+ return []
+ start = normalize_sha256_digest(artifact_digest)
+ visited = {start}
+ frontier = deque([(start, 1)])
+ out: List[Dict[str, Any]] = []
+
+ while frontier:
+ child_digest, current_depth = frontier.popleft()
+ attestations = load_attestations_for_digest(root, child_digest)
+ parents = detect_parents_from_training_attestations(attestations)
+ for parent in parents:
+ if not isinstance(parent, dict):
+ continue
+ candidate = parent.get("digest")
+ if not isinstance(candidate, str):
+ continue
+ try:
+ normalized_parent = normalize_sha256_digest(candidate)
+ except ValueError:
+ continue
+ entry = dict(parent)
+ entry["digest"] = normalized_parent
+ entry["child_digest"] = child_digest
+ entry["depth"] = current_depth
+ out.append(entry)
+ if current_depth < depth and normalized_parent not in visited:
+ visited.add(normalized_parent)
+ frontier.append((normalized_parent, current_depth + 1))
+
+ return out
+
+
+def trace_training_lineage_descendants(
+ root: Path, artifact_digest: str, depth: int
+) -> List[Dict[str, Any]]:
+ if depth < 1:
+ return []
+ start = normalize_sha256_digest(artifact_digest)
+ edges: Dict[str, List[str]] = {}
+ for entry in load_all_attestation_records(root):
+ statement = entry.get("statement")
+ if not isinstance(statement, dict):
+ continue
+ subjects = statement.get("subject", [])
+ if not isinstance(subjects, list):
+ continue
+ subject_digests: List[str] = []
+ for subject in subjects:
+ if not isinstance(subject, dict):
+ continue
+ digest_map = subject.get("digest")
+ if not isinstance(digest_map, dict):
+ continue
+ token = digest_map.get("sha256")
+ if not isinstance(token, str):
+ continue
+ try:
+ subject_digests.append(normalize_sha256_digest(token))
+ except ValueError:
+ continue
+ if not subject_digests:
+ continue
+ summary = summarize_training_lineage_from_attestations([statement])
+ parent_digests = summary["parent_digests"]
+ for parent in parent_digests:
+ edges.setdefault(parent, [])
+ for child in subject_digests:
+ if child not in edges[parent]:
+ edges[parent].append(child)
+
+ visited = {start}
+ frontier = deque([(start, 1)])
+ out: List[Dict[str, Any]] = []
+
+ while frontier:
+ parent, current_depth = frontier.popleft()
+ for child in edges.get(parent, []):
+ out.append({"digest": child, "parent_digest": parent, "depth": current_depth})
+ if current_depth < depth and child not in visited:
+ visited.add(child)
+ frontier.append((child, current_depth + 1))
+
+ return out
+
+
+def _sha256_hex_token(digest: str) -> str:
+ return normalize_sha256_digest(digest).split(":", 1)[1]
+
+
+def export_attestations_as_slsa(
+ *, artifact_digest: str, artifact_name: str, attestations: List[Dict[str, Any]]
+) -> Dict[str, Any]:
+ summary = summarize_training_lineage_from_attestations(attestations)
+ materials = summary["parent_digests"] + summary["dataset_digests"]
+ out_materials: List[Dict[str, Any]] = []
+ for digest in materials:
+ out_materials.append({"uri": digest, "digest": {"sha256": _sha256_hex_token(digest)}})
+ return {
+ "predicateType": "https://slsa.dev/provenance/v1",
+ "subject": [
+ {"name": artifact_name, "digest": {"sha256": _sha256_hex_token(artifact_digest)}}
+ ],
+ "buildDefinition": {
+ "buildType": "aixv/training",
+ "externalParameters": {"attestation_count": len(attestations)},
+ "resolvedDependencies": out_materials,
+ },
+ "runDetails": {
+ "builder": {"id": "aixv"},
+ "metadata": {"training_run_count": summary["training_run_count"]},
+ },
+ }
+
+
+def export_attestations_as_ml_bom(
+ *, artifact_digest: str, artifact_name: str, attestations: List[Dict[str, Any]]
+) -> Dict[str, Any]:
+ summary = summarize_training_lineage_from_attestations(attestations)
+ components: List[Dict[str, Any]] = [
+ {"name": artifact_name, "digest": artifact_digest, "role": "model"}
+ ]
+ for digest in summary["parent_digests"]:
+ components.append({"digest": digest, "role": "parent-model"})
+ for digest in summary["dataset_digests"]:
+ components.append({"digest": digest, "role": "dataset"})
+ return {
+ "bom_format": "aixv.ml-bom/v1",
+ "component_count": len(components),
+ "components": components,
+ }
+
+
def _extract_subject_candidates(cert: Any) -> List[str]:
out: List[str] = []
try:
@@ -861,13 +1353,68 @@ def evaluate_freshness_policy(
return violations
+def evaluate_advisory_sync_guards(
+ *,
+ integrated_time: Optional[str],
+ previous_integrated_time: Optional[str],
+ max_age_days: Optional[int],
+ now: Optional[datetime] = None,
+) -> List[str]:
+ violations: List[str] = []
+ if integrated_time is None:
+ return ["advisory sync requires bundle integrated time"]
+ try:
+ observed = datetime.fromisoformat(integrated_time)
+ except Exception:
+ return ["advisory sync integrated_time is invalid"]
+ if observed.tzinfo is None:
+ observed = observed.replace(tzinfo=timezone.utc)
+ observed = observed.astimezone(timezone.utc)
+
+ if previous_integrated_time is not None:
+ try:
+ previous = datetime.fromisoformat(previous_integrated_time)
+ if previous.tzinfo is None:
+ previous = previous.replace(tzinfo=timezone.utc)
+ previous = previous.astimezone(timezone.utc)
+ if observed <= previous:
+ violations.append(
+ "advisory replay/stale update detected: integrated_time did not advance"
+ )
+ except Exception:
+ violations.append("advisory sync state integrated_time is invalid")
+
+ if max_age_days is not None:
+ if max_age_days < 1:
+ violations.append("max_age_days must be >= 1")
+ else:
+ now_ts = now or datetime.now(tz=timezone.utc)
+ if now_ts.tzinfo is None:
+ now_ts = now_ts.replace(tzinfo=timezone.utc)
+ age = now_ts.astimezone(timezone.utc) - observed
+ if age.total_seconds() > int(max_age_days) * 86400:
+ violations.append(f"advisory bundle age exceeds max_age_days ({max_age_days})")
+
+ return violations
+
+
def evaluate_advisory_policy(
*,
policy: Dict[str, Any],
advisories: List[Dict[str, Any]],
) -> List[str]:
violations: List[str] = []
- active = [a for a in advisories if a.get("status") == "active"]
+ require_signed = bool(policy.get("require_signed_advisories", False))
+ considered = advisories
+ if require_signed:
+ considered = [
+ a
+ for a in advisories
+ if isinstance(a.get("_trust"), dict)
+ and a.get("_trust", {}).get("signed_and_trusted") is True
+ ]
+
+ active = [a for a in considered if a.get("status") == "active"]
if policy.get("require_no_active_advisories") and active:
violations.append("active advisories present while require_no_active_advisories=true")
diff --git a/tests/test_cli_contract.py b/tests/test_cli_contract.py
index 1319f5e..d36d7d2 100644
--- a/tests/test_cli_contract.py
+++ b/tests/test_cli_contract.py
@@ -105,3 +105,405 @@ def test_verify_missing_bundle_json_failure_contract() -> None:
"error": "bundle not found: model.safetensors.sigstore.json",
"ok": False,
}
+
+
+def test_advisory_verify_rejects_non_advisory_record() -> None:
+ with runner.isolated_filesystem():
+ policy_record = {
+ "schema": "aixv.signed-record/v1",
+ "kind": "policy",
+ "record_id": "policy-main",
+ "created_at": "2026-02-16T00:00:00+00:00",
+ "payload": {
+ "policy_type": "aixv.policy/v1",
+ "allow_subjects": ["alice@example.com"],
+ },
+ "signature_bundle_path": "policy.bundle.json",
+ }
+ Path("policy.json").write_text(json.dumps(policy_record), encoding="utf-8")
+
+ result = runner.invoke(app, ["advisory", "verify", "policy.json", "--json"])
+ assert result.exit_code == 1
+ out = json.loads(result.stdout)
+ assert out["ok"] is False
+ assert out["command"] == "advisory verify"
+ assert "expected record kind advisory" in out["error"]
+
+
+def test_provenance_require_signed_attestations_needs_trusted_subjects() -> None:
+ with runner.isolated_filesystem():
+ Path("model.safetensors").write_bytes(b"hello")
+ result = runner.invoke(
+ app,
+ [
+ "provenance",
+ "model.safetensors",
+ "--require-signed-attestations",
+ "--json",
+ ],
+ )
+ assert result.exit_code == 1
+ out = json.loads(result.stdout)
+ assert out["ok"] is False
+ assert out["command"] == "provenance"
+ assert "signed attestation verification requires" in out["error"]
+
+
+def test_export_require_signed_attestations_needs_trusted_subjects() -> None:
+ with runner.isolated_filesystem():
+ Path("model.safetensors").write_bytes(b"hello")
+ result = runner.invoke(
+ app,
+ [
+ "export",
+ "model.safetensors",
+ "--format",
+ "in-toto",
+ "--require-signed-attestations",
+ "--json",
+ ],
+ )
+ assert result.exit_code == 1
+ out = json.loads(result.stdout)
+ assert out["ok"] is False
+ assert out["command"] == "export"
+ assert "signed attestation verification requires" in out["error"]
+
+
+def test_rollback_default_signed_requires_identity_token() -> None:
+ with runner.isolated_filesystem():
+ Path("model.safetensors").write_bytes(b"hello")
+ result = runner.invoke(
+ app,
+ ["rollback", "model.safetensors", "--to", "sha256:abc", "--json"],
+ )
+ assert result.exit_code == 1
+ out = json.loads(result.stdout)
+ assert out["ok"] is False
+ assert out["command"] == "rollback"
+ assert "missing OIDC token" in out["error"]
+
+
+def test_rollback_no_sign_emits_plain_record() -> None:
+ with runner.isolated_filesystem():
+ Path("model.safetensors").write_bytes(b"hello")
+ result = runner.invoke(
+ app,
+ [
+ "rollback",
+ "model.safetensors",
+ "--to",
+ "sha256:abc",
+ "--no-sign",
+ "--json",
+ ],
+ )
+ assert result.exit_code == 0
+ out = json.loads(result.stdout)
+ assert out["ok"] is True
+ assert out["signed"] is False
+ path = Path(out["rollback_record_path"])
+ assert path.exists()
+ payload = json.loads(path.read_text(encoding="utf-8"))
+ assert payload["event_type"] == "rollback"
+
+
+def test_verify_level_2_requires_policy() -> None:
+ with runner.isolated_filesystem():
+ Path("model.safetensors").write_bytes(b"hello")
+ Path("model.safetensors.sigstore.json").write_text("{}", encoding="utf-8")
+ result = runner.invoke(
+ app,
+ [
+ "verify",
+ "model.safetensors",
+ "--identity",
+ "alice@example.com",
+ "--assurance-level",
+ "level-2",
+ "--json",
+ ],
+ )
+ assert result.exit_code == 1
+ out = json.loads(result.stdout)
+ assert out["ok"] is False
+ assert out["command"] == "verify"
+ assert "requires --policy" in out["error"]
+
+
+def test_provenance_impact_and_explain_views() -> None:
+ with runner.isolated_filesystem():
+ Path("model.safetensors").write_bytes(b"hello")
+ impact_result = runner.invoke(
+ app,
+ [
+ "provenance",
+ "model.safetensors",
+ "--view",
+ "impact",
+ "--json",
+ ],
+ )
+ assert impact_result.exit_code == 0
+ impact_payload = json.loads(impact_result.stdout)
+ assert impact_payload["view"] == "impact"
+ assert isinstance(impact_payload["descendants"], list)
+
+ explain_result = runner.invoke(
+ app,
+ [
+ "provenance",
+ "model.safetensors",
+ "--view",
+ "explain",
+ "--json",
+ ],
+ )
+ assert explain_result.exit_code == 0
+ explain_payload = json.loads(explain_result.stdout)
+ assert explain_payload["view"] == "explain"
+ assert isinstance(explain_payload["explain"], list)
+
+
+def test_export_slsa_and_ml_bom_shapes() -> None:
+ with runner.isolated_filesystem():
+ Path("model.safetensors").write_bytes(b"hello")
+
+ slsa = runner.invoke(
+ app,
+ ["export", "model.safetensors", "--format", "slsa", "--json"],
+ )
+ assert slsa.exit_code == 0
+ slsa_payload = json.loads(slsa.stdout)
+ assert slsa_payload["ok"] is True
+ assert slsa_payload["export"]["format"] == "slsa"
+ assert "provenance" in slsa_payload["export"]
+
+ ml_bom = runner.invoke(
+ app,
+ ["export", "model.safetensors", "--format", "ml-bom", "--json"],
+ )
+ assert ml_bom.exit_code == 0
+ ml_bom_payload = json.loads(ml_bom.stdout)
+ assert ml_bom_payload["ok"] is True
+ assert ml_bom_payload["export"]["format"] == "ml-bom"
+ assert "bom" in ml_bom_payload["export"]
+
+
+def test_bundle_create_validates_and_normalizes_members() -> None:
+ with runner.isolated_filesystem():
+ payload = {
+ "bundle_type": "aixv.bundle/v1",
+ "bundle_id": "bundle-main",
+ "primary": "1" * 64,
+ "members": [f"sha256:{'2' * 64}"],
+ }
+ Path("bundle.json").write_text(json.dumps(payload), encoding="utf-8")
+ result = runner.invoke(
+ app,
+ ["bundle", "create", "--input", "bundle.json", "--json"],
+ )
+ assert result.exit_code == 0
+ out = json.loads(result.stdout)
+ record = json.loads(Path(out["path"]).read_text(encoding="utf-8"))
+ assert record["kind"] == "bundle"
+ members = record["payload"]["members"]
+ assert f"sha256:{'1' * 64}" in members
+ assert f"sha256:{'2' * 64}" in members
+
+
+def test_bundle_verify_requires_trusted_subject_constraints() -> None:
+ with runner.isolated_filesystem():
+ bundle_record = {
+ "schema": "aixv.signed-record/v1",
+ "kind": "bundle",
+ "record_id": "bundle-main",
+ "created_at": "2026-02-16T00:00:00+00:00",
+ "payload": {
+ "bundle_type": "aixv.bundle/v1",
+ "bundle_id": "bundle-main",
+ "primary": f"sha256:{'1' * 64}",
+ "members": [f"sha256:{'1' * 64}", f"sha256:{'2' * 64}"],
+ },
+ "signature_bundle_path": "bundle.sigstore.json",
+ }
+ Path("bundle.record.json").write_text(json.dumps(bundle_record), encoding="utf-8")
+ Path("bundle.sigstore.json").write_text("{}", encoding="utf-8")
+ result = runner.invoke(
+ app,
+ ["bundle", "verify", "bundle.record.json", "--json"],
+ )
+ assert result.exit_code == 1
+ out = json.loads(result.stdout)
+ assert out["ok"] is False
+ assert out["command"] == "bundle verify"
+ assert "trusted subject constraints required" in out["error"]
+
+
+def test_policy_template_level_3_requires_max_bundle_age() -> None:
+ result = runner.invoke(
+ app,
+ ["policy", "template", "--assurance-level", "level-3", "--json"],
+ )
+ assert result.exit_code == 1
+ out = json.loads(result.stdout)
+ assert out["ok"] is False
+ assert out["command"] == "policy template"
+ assert "max-bundle-age-days" in out["error"]
+
+
+def test_policy_migrate_level_3_requires_bundle_age_if_missing() -> None:
+ with runner.isolated_filesystem():
+ Path("policy.json").write_text(
+ json.dumps(
+ {
+ "policy_type": "aixv.policy/v1",
+ "allow_subjects": ["alice@example.com"],
+ }
+ ),
+ encoding="utf-8",
+ )
+ result = runner.invoke(
+ app,
+ [
+ "policy",
+ "migrate",
+ "--input",
+ "policy.json",
+ "--to-assurance-level",
+ "level-3",
+ "--json",
+ ],
+ )
+ assert result.exit_code == 1
+ out = json.loads(result.stdout)
+ assert out["ok"] is False
+ assert out["command"] == "policy migrate"
+ assert "max_bundle_age_days" in out["error"]
+
+
+def test_advisory_sync_rejects_replay_integrated_time(monkeypatch) -> None:
+ with runner.isolated_filesystem():
+ advisory_record = {
+ "schema": "aixv.signed-record/v1",
+ "kind": "advisory",
+ "record_id": "ADV-2026-0001",
+ "created_at": "2026-02-16T00:00:00+00:00",
+ "payload": {
+ "advisory_id": "ADV-2026-0001",
+ "affected": [{"digest": f"sha256:{'1' * 64}"}],
+ "severity": "high",
+ "status": "active",
+ "reason_code": "model-compromise",
+ "recommended_actions": ["rollback"],
+ },
+ "signature_bundle_path": None,
+ }
+ Path("remote-advisory.json").write_text(json.dumps(advisory_record), encoding="utf-8")
+ Path("remote-advisory.sigstore.json").write_text("{}", encoding="utf-8")
+ Path("feed.json").write_text(
+ json.dumps(
+ {
+ "schema": "aixv.advisory-feed/v1",
+ "entries": [
+ {
+ "record": "remote-advisory.json",
+ "bundle": "remote-advisory.sigstore.json",
+ }
+ ],
+ }
+ ),
+ encoding="utf-8",
+ )
+
+ def _fake_verify_signed_record(*args, **kwargs):
+ return {
+ "verified": True,
+ "integrated_time": "2026-02-16T00:00:00+00:00",
+ "actual_subjects": ["security@aixv.org"],
+ "actual_issuer": "https://accounts.google.com",
+ }
+
+ monkeypatch.setattr("aixv.cli.verify_signed_record", _fake_verify_signed_record)
+
+ first = runner.invoke(
+ app,
+ [
+ "advisory",
+ "sync",
+ "--feed",
+ "feed.json",
+ "--trusted-subject",
+ "security@aixv.org",
+ "--json",
+ ],
+ )
+ assert first.exit_code == 0
+ first_out = json.loads(first.stdout)
+ assert first_out["ok"] is True
+ assert first_out["imported_count"] == 1
+
+ second = runner.invoke(
+ app,
+ [
+ "advisory",
+ "sync",
+ "--feed",
+ "feed.json",
+ "--trusted-subject",
+ "security@aixv.org",
+ "--json",
+ ],
+ )
+ assert second.exit_code == 1
+ second_out = json.loads(second.stdout)
+ assert second_out["ok"] is False
+ assert second_out["rejected_count"] == 1
+ assert "replay/stale" in second_out["results"][0]["error"]
+
+
+def test_advisory_sync_rejects_http_feed_reference() -> None:
+ result = runner.invoke(
+ app,
+ [
+ "advisory",
+ "sync",
+ "--feed",
+ "http://example.com/advisory-feed.json",
+ "--trusted-subject",
+ "security@aixv.org",
+ "--json",
+ ],
+ )
+ assert result.exit_code == 1
+ out = json.loads(result.stdout)
+ assert out["ok"] is False
+ assert out["command"] == "advisory sync"
+ assert "only https://" in out["error"]
+
+
+def test_record_create_rejects_unsafe_record_id() -> None:
+ with runner.isolated_filesystem():
+ Path("waiver.json").write_text(
+ json.dumps({"reason": "temporary exception"}),
+ encoding="utf-8",
+ )
+ result = runner.invoke(
+ app,
+ [
+ "record",
+ "create",
+ "--kind",
+ "waiver",
+ "--record-id",
+ "../escape",
+ "--input",
+ "waiver.json",
+ "--json",
+ ],
+ )
+ assert result.exit_code == 1
+ out = json.loads(result.stdout)
+ assert out["ok"] is False
+ assert out["command"] == "record create"
+ assert "invalid record_id" in out["error"]
diff --git a/tests/test_conformance_command.py b/tests/test_conformance_command.py
index 00a4be6..4eee812 100644
--- a/tests/test_conformance_command.py
+++ b/tests/test_conformance_command.py
@@ -54,7 +54,14 @@ def test_conformance_command_passes_with_required_fixtures() -> None:
assert payload["ok"] is True
assert payload["schema"] == "aixv.conformance-report/v1"
assert payload["overall_status"] == "pass"
- assert len(payload["checks"]) >= 4
+ check_ids = {c["check_id"] for c in payload["checks"]}
+ assert "policy.unknown-field.reject.v1" in check_ids
+ assert "policy.advisory-trust.subject-fallback.v1" in check_ids
+ assert "advisory.signed-policy.filtering.v1" in check_ids
+ assert "bundle.schema.validation.v1" in check_ids
+ assert "advisory.sync.replay-freshness.v1" in check_ids
+ assert "crypto.invalid-bundle.artifact.reject.v1" in check_ids
+ assert "crypto.invalid-bundle.statement.reject.v1" in check_ids
def test_conformance_command_fails_when_fixtures_missing() -> None:
diff --git a/tests/test_core_primitives.py b/tests/test_core_primitives.py
index 9426810..2efe6a2 100644
--- a/tests/test_core_primitives.py
+++ b/tests/test_core_primitives.py
@@ -1,14 +1,28 @@
+from datetime import datetime, timezone
from pathlib import Path
import pytest
from aixv.core import (
+ advisory_trust_constraints_from_policy,
+ create_attestation_record,
create_record,
create_signed_record_payload,
evaluate_admission,
+ evaluate_advisory_policy,
+ evaluate_advisory_sync_guards,
+ evaluate_assurance_level_requirements,
+ export_attestations_as_ml_bom,
+ export_attestations_as_slsa,
+ load_attestation_records_for_digest,
+ load_attestations_for_digest,
load_signed_record,
normalize_sha256_digest,
+ sha256_file,
+ trace_training_lineage_descendants,
+ trace_training_lineage_parents,
validate_policy_payload,
+ validate_record_payload,
)
@@ -70,3 +84,228 @@ def test_evaluate_admission_returns_deny_on_policy_violation() -> None:
)
assert decision.decision == "deny"
assert len(decision.violations) >= 1
+
+
+def test_advisory_trust_constraints_fall_back_to_subject() -> None:
+ constraints = advisory_trust_constraints_from_policy(
+ {
+ "policy_type": "aixv.policy/v1",
+ "subject": "security-policy@aixv.org",
+ "issuer": "https://accounts.google.com",
+ }
+ )
+ assert constraints["subjects"] == ["security-policy@aixv.org"]
+ assert constraints["issuers"] == ["https://accounts.google.com"]
+
+
+def test_evaluate_advisory_policy_uses_only_trusted_when_signed_required() -> None:
+ policy = {
+ "policy_type": "aixv.policy/v1",
+ "allow_subjects": ["security@aixv.org"],
+ "require_signed_advisories": True,
+ "require_no_active_advisories": True,
+ }
+ unsigned_active = [{"status": "active", "_trust": {"signed_and_trusted": False}}]
+ trusted_active = [{"status": "active", "_trust": {"signed_and_trusted": True}}]
+ assert evaluate_advisory_policy(policy=policy, advisories=unsigned_active) == []
+ assert evaluate_advisory_policy(policy=policy, advisories=trusted_active) == [
+ "active advisories present while require_no_active_advisories=true"
+ ]
+
+
+def test_evaluate_advisory_sync_guards_rejects_replay() -> None:
+ violations = evaluate_advisory_sync_guards(
+ integrated_time="2026-02-16T00:00:00+00:00",
+ previous_integrated_time="2026-02-16T00:00:00+00:00",
+ max_age_days=None,
+ )
+ assert any("replay/stale" in v for v in violations)
+
+
+def test_evaluate_advisory_sync_guards_rejects_stale_bundle() -> None:
+ violations = evaluate_advisory_sync_guards(
+ integrated_time="2026-01-01T00:00:00+00:00",
+ previous_integrated_time=None,
+ max_age_days=7,
+ now=datetime(2026, 2, 1, tzinfo=timezone.utc),
+ )
+ assert any("max_age_days" in v for v in violations)
+
+
+def test_attestation_record_roundtrip_preserves_statement_and_bundle(tmp_path: Path) -> None:
+ artifact = tmp_path / "model.safetensors"
+ artifact.write_bytes(b"hello")
+ digest = sha256_file(artifact)
+ statement = {
+ "_type": "https://in-toto.io/Statement/v1",
+ "subject": [{"name": artifact.name, "digest": {"sha256": digest.split(":", 1)[1]}}],
+ "predicateType": "https://aixv.org/attestation/training/v1",
+ "predicate": {
+ "parent_models": [{"digest": digest}],
+ "datasets": [],
+ "training_run": {
+ "framework": "pytorch",
+ "framework_version": "2.2.0",
+ "code_digest": digest,
+ "environment_digest": digest,
+ },
+ "hyperparameters": {},
+ },
+ }
+ create_attestation_record(
+ root=tmp_path,
+ artifact=artifact,
+ predicate_uri=statement["predicateType"],
+ statement=statement,
+ signature_bundle_path="example.statement.sigstore.json",
+ )
+ records = load_attestation_records_for_digest(tmp_path, digest)
+ assert len(records) == 1
+ assert records[0]["statement"]["predicateType"] == statement["predicateType"]
+ assert records[0]["signature_bundle_path"] == "example.statement.sigstore.json"
+
+ statements = load_attestations_for_digest(tmp_path, digest)
+ assert len(statements) == 1
+ assert statements[0]["subject"][0]["digest"]["sha256"] == digest.split(":", 1)[1]
+
+
+def test_trace_training_lineage_parents_honors_depth(tmp_path: Path) -> None:
+ leaf = tmp_path / "leaf.safetensors"
+ parent = tmp_path / "parent.safetensors"
+ root = tmp_path / "root.safetensors"
+ leaf.write_bytes(b"leaf")
+ parent.write_bytes(b"parent")
+ root.write_bytes(b"root")
+
+ leaf_digest = sha256_file(leaf)
+ parent_digest = sha256_file(parent)
+ root_digest = sha256_file(root)
+
+ create_attestation_record(
+ root=tmp_path,
+ artifact=leaf,
+ predicate_uri="https://aixv.org/attestation/training/v1",
+ statement={
+ "_type": "https://in-toto.io/Statement/v1",
+ "subject": [{"name": leaf.name, "digest": {"sha256": leaf_digest.split(":", 1)[1]}}],
+ "predicateType": "https://aixv.org/attestation/training/v1",
+ "predicate": {
+ "parent_models": [{"digest": parent_digest}],
+ "datasets": [],
+ "training_run": {
+ "framework": "pytorch",
+ "framework_version": "2.2.0",
+ "code_digest": leaf_digest,
+ "environment_digest": leaf_digest,
+ },
+ "hyperparameters": {},
+ },
+ },
+ )
+ create_attestation_record(
+ root=tmp_path,
+ artifact=parent,
+ predicate_uri="https://aixv.org/attestation/training/v1",
+ statement={
+ "_type": "https://in-toto.io/Statement/v1",
+ "subject": [
+ {"name": parent.name, "digest": {"sha256": parent_digest.split(":", 1)[1]}}
+ ],
+ "predicateType": "https://aixv.org/attestation/training/v1",
+ "predicate": {
+ "parent_models": [{"digest": root_digest}],
+ "datasets": [],
+ "training_run": {
+ "framework": "pytorch",
+ "framework_version": "2.2.0",
+ "code_digest": parent_digest,
+ "environment_digest": parent_digest,
+ },
+ "hyperparameters": {},
+ },
+ },
+ )
+
+ one_hop = trace_training_lineage_parents(tmp_path, leaf_digest, depth=1)
+ two_hop = trace_training_lineage_parents(tmp_path, leaf_digest, depth=2)
+
+ assert len(one_hop) == 1
+ assert one_hop[0]["digest"] == parent_digest
+ assert one_hop[0]["depth"] == 1
+
+ assert len(two_hop) == 2
+ assert {entry["digest"] for entry in two_hop} == {parent_digest, root_digest}
+
+ descendants = trace_training_lineage_descendants(tmp_path, root_digest, depth=2)
+ assert {entry["digest"] for entry in descendants} == {parent_digest, leaf_digest}
+
+
+def test_assurance_level_requirements_for_level_3_policy() -> None:
+ violations = evaluate_assurance_level_requirements(
+ assurance_level="level-3",
+ policy_provided=True,
+ require_signed_policy=True,
+ policy={
+ "policy_type": "aixv.policy/v1",
+ "allow_subjects": ["alice@example.com"],
+ "require_signed_advisories": True,
+ "require_no_active_advisories": True,
+ "max_bundle_age_days": 7,
+ },
+ )
+ assert violations == []
+
+
+def test_export_adapters_emit_expected_shapes() -> None:
+ digest = f"sha256:{'1' * 64}"
+ parent = f"sha256:{'2' * 64}"
+ dataset = f"sha256:{'3' * 64}"
+ attestations = [
+ {
+ "_type": "https://in-toto.io/Statement/v1",
+ "subject": [
+ {"name": "model.safetensors", "digest": {"sha256": digest.split(":", 1)[1]}}
+ ],
+ "predicateType": "https://aixv.org/attestation/training/v1",
+ "predicate": {
+ "parent_models": [{"digest": parent}],
+ "datasets": [{"digest": dataset, "split": "train"}],
+ "training_run": {
+ "framework": "pytorch",
+ "framework_version": "2.2.0",
+ "code_digest": digest,
+ "environment_digest": digest,
+ },
+ "hyperparameters": {},
+ },
+ }
+ ]
+ slsa = export_attestations_as_slsa(
+ artifact_digest=digest,
+ artifact_name="model.safetensors",
+ attestations=attestations,
+ )
+ assert slsa["predicateType"] == "https://slsa.dev/provenance/v1"
+ assert len(slsa["buildDefinition"]["resolvedDependencies"]) == 2
+
+ bom = export_attestations_as_ml_bom(
+ artifact_digest=digest,
+ artifact_name="model.safetensors",
+ attestations=attestations,
+ )
+ assert bom["bom_format"] == "aixv.ml-bom/v1"
+ assert bom["component_count"] == 3
+
+
+def test_bundle_payload_validation_normalizes_primary_into_members() -> None:
+ payload = validate_record_payload(
+ "bundle",
+ {
+ "bundle_type": "aixv.bundle/v1",
+ "bundle_id": "bundle-main",
+ "primary": "1" * 64,
+ "members": [f"sha256:{'2' * 64}"],
+ },
+ )
+ assert payload["primary"] == f"sha256:{'1' * 64}"
+ assert payload["members"] == [f"sha256:{'2' * 64}", f"sha256:{'1' * 64}"]