diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c4fd53..933f89c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Sliding-window rate limiting in `DefaultPolicyEngine` per `(principal_id, capability_id)` pair (#39). + Default limits by safety class: 60 READ / 10 WRITE / 2 DESTRUCTIVE per 60s window. + Service-role principals get 10× limits. Configurable via constructor. - GitHub Release step in publish workflow — creates a release with auto-generated notes and artifacts before publishing to PyPI. ### Fixed diff --git a/docs/security.md b/docs/security.md index 06a03f4..8343157 100644 --- a/docs/security.md +++ b/docs/security.md @@ -35,4 +35,8 @@ Consider an agent that obtains a token for `billing.list_invoices` then passes i - The `AGENT_KERNEL_SECRET` must be kept secret. Rotate it if compromised. - The default `InMemoryDriver` has no persistence — suitable for testing only. - PII redaction is heuristic (regex-based). It is not a substitute for proper data governance. -- There is no rate limiting or quota enforcement in v0.1. +- Rate limiting is enforced per `(principal_id, capability_id)` pair using a sliding window. + Default limits: 60 READ / 10 WRITE / 2 DESTRUCTIVE invocations per 60-second window. + Principals with the `"service"` role receive 10× the default limits. Limits are + configurable via `DefaultPolicyEngine(rate_limits=...)`. There is no distributed or + persistent rate-limit state — limits reset on process restart. diff --git a/src/agent_kernel/policy.py b/src/agent_kernel/policy.py index 1271db2..c85e229 100644 --- a/src/agent_kernel/policy.py +++ b/src/agent_kernel/policy.py @@ -3,10 +3,14 @@ from __future__ import annotations import logging +import time +from collections import defaultdict +from collections.abc import Callable +from dataclasses import dataclass from typing import Any, Protocol from .enums import SafetyClass, SensitivityTag -from .errors import PolicyDenied +from .errors import AgentKernelError, PolicyDenied from .models import Capability, CapabilityRequest, PolicyDecision, Principal logger = logging.getLogger(__name__) @@ -18,6 +22,66 @@ _MAX_ROWS_USER = 50 _MAX_ROWS_SERVICE = 500 +# Default rate limits per safety class: (invocations, window_seconds). +_DEFAULT_RATE_LIMITS: dict[SafetyClass, tuple[int, float]] = { + SafetyClass.READ: (60, 60.0), + SafetyClass.WRITE: (10, 60.0), + SafetyClass.DESTRUCTIVE: (2, 60.0), +} + +# Service role multiplier for rate limits. +_SERVICE_RATE_MULTIPLIER = 10 + + +@dataclass(slots=True) +class _RateEntry: + """Timestamps for a single rate-limit key.""" + + timestamps: list[float] + + +class RateLimiter: + """Sliding-window rate limiter using monotonic clock. + + Args: + clock: Callable returning the current time in seconds. + Defaults to :func:`time.monotonic`. + """ + + def __init__(self, clock: Callable[[], float] | None = None) -> None: + self._clock = clock or time.monotonic + self._windows: dict[str, _RateEntry] = defaultdict(lambda: _RateEntry(timestamps=[])) + + def check(self, key: str, limit: int, window_seconds: float) -> bool: + """Return ``True`` if the next invocation would be within the limit. + + Prunes expired timestamps as a side-effect. + + Args: + key: Rate-limit key (e.g. ``"principal:capability"``). + limit: Maximum allowed invocations per window. + window_seconds: Sliding window duration in seconds. + + Returns: + ``True`` if under limit, ``False`` if limit would be exceeded. + """ + now = self._clock() + cutoff = now - window_seconds + entry = self._windows[key] + entry.timestamps = [t for t in entry.timestamps if t > cutoff] + if not entry.timestamps: + del self._windows[key] + return True + return len(entry.timestamps) < limit + + def record(self, key: str) -> None: + """Record an invocation for *key*. + + Args: + key: Rate-limit key. + """ + self._windows[key].timestamps.append(self._clock()) + class PolicyEngine(Protocol): """Interface for a policy engine.""" @@ -61,8 +125,40 @@ class DefaultPolicyEngine: ``"secrets_reader"`` and a justification of at least 15 characters. 6. **max_rows** — 50 for regular users; 500 for principals with the ``"service"`` role. + 7. **Rate limiting** — sliding-window rate limit per + ``(principal_id, capability_id)`` pair, with defaults by safety class. + Principals with the ``"service"`` role get 10× the default limits. """ + def __init__( + self, + *, + rate_limits: dict[SafetyClass, tuple[int, float]] | None = None, + clock: Callable[[], float] | None = None, + ) -> None: + """Initialise the policy engine. + + Args: + rate_limits: Override default rate limits per safety class. + Each value is ``(max_invocations, window_seconds)``. + Partial overrides are merged into the defaults so that + unspecified safety classes retain their default limits. + clock: Monotonic clock callable for rate-limiter. + Defaults to :func:`time.monotonic`. + """ + limits = dict(_DEFAULT_RATE_LIMITS) + if rate_limits is not None: + limits.update(rate_limits) + for sc, (count, window) in limits.items(): + if count < 1 or window <= 0: + raise AgentKernelError( + f"Invalid rate limit for {sc.value}: " + f"limit must be >= 1 and window must be > 0, " + f"got limit={count}, window={window}." + ) + self._rate_limits = limits + self._limiter = RateLimiter(clock=clock) + @staticmethod def _deny(reason: str, *, principal_id: str, capability_id: str) -> PolicyDenied: """Log a policy denial at WARNING and return the exception to raise.""" @@ -197,6 +293,22 @@ def evaluate( else: constraints["max_rows"] = max_rows + # ── Rate limiting ───────────────────────────────────────────────── + + rate_key = f"{pid}:{cid}" + if capability.safety_class in self._rate_limits: + limit, window = self._rate_limits[capability.safety_class] + if "service" in roles: + limit *= _SERVICE_RATE_MULTIPLIER + if not self._limiter.check(rate_key, limit, window): + raise self._deny( + f"Rate limit exceeded: {limit} {capability.safety_class.value} " + f"invocations per {window}s for principal '{pid}'", + principal_id=pid, + capability_id=cid, + ) + self._limiter.record(rate_key) + reason = "Request approved by DefaultPolicyEngine." logger.info( "policy_allowed", diff --git a/tests/test_policy.py b/tests/test_policy.py index 7154915..10d23c8 100644 --- a/tests/test_policy.py +++ b/tests/test_policy.py @@ -2,9 +2,12 @@ from __future__ import annotations +from collections.abc import Callable + import pytest from agent_kernel import ( + AgentKernelError, Capability, DefaultPolicyEngine, PolicyDenied, @@ -13,6 +16,7 @@ SensitivityTag, ) from agent_kernel.models import CapabilityRequest +from agent_kernel.policy import RateLimiter def _req(cap_id: str, **constraints: object) -> CapabilityRequest: @@ -35,31 +39,34 @@ def _cap( ) -engine = DefaultPolicyEngine() +@pytest.fixture() +def engine() -> DefaultPolicyEngine: + """Fresh engine per test to avoid shared rate-limit state.""" + return DefaultPolicyEngine() # ── READ ─────────────────────────────────────────────────────────────────────── -def test_read_allowed_no_roles() -> None: +def test_read_allowed_no_roles(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1") dec = engine.evaluate(_req("cap.r"), _cap("cap.r", SafetyClass.READ), p, justification="") assert dec.allowed is True -def test_read_sets_max_rows_user() -> None: +def test_read_sets_max_rows_user(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["reader"]) dec = engine.evaluate(_req("cap.r"), _cap("cap.r", SafetyClass.READ), p, justification="") assert dec.constraints["max_rows"] == 50 -def test_read_sets_max_rows_service() -> None: +def test_read_sets_max_rows_service(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="svc1", roles=["service"]) dec = engine.evaluate(_req("cap.r"), _cap("cap.r", SafetyClass.READ), p, justification="") assert dec.constraints["max_rows"] == 500 -def test_read_respects_tighter_constraint() -> None: +def test_read_respects_tighter_constraint(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1") dec = engine.evaluate( _req("cap.r", max_rows=5), _cap("cap.r", SafetyClass.READ), p, justification="" @@ -67,7 +74,7 @@ def test_read_respects_tighter_constraint() -> None: assert dec.constraints["max_rows"] == 5 -def test_read_tighter_constraint_cannot_exceed_cap() -> None: +def test_read_tighter_constraint_cannot_exceed_cap(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1") dec = engine.evaluate( _req("cap.r", max_rows=9999), _cap("cap.r", SafetyClass.READ), p, justification="" @@ -78,7 +85,7 @@ def test_read_tighter_constraint_cannot_exceed_cap() -> None: # ── WRITE ────────────────────────────────────────────────────────────────────── -def test_write_denied_no_role() -> None: +def test_write_denied_no_role(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["reader"]) with pytest.raises(PolicyDenied, match="writer.*admin"): engine.evaluate( @@ -89,7 +96,7 @@ def test_write_denied_no_role() -> None: ) -def test_write_denied_short_justification() -> None: +def test_write_denied_short_justification(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["writer"]) with pytest.raises(PolicyDenied, match="justification"): engine.evaluate( @@ -97,7 +104,7 @@ def test_write_denied_short_justification() -> None: ) -def test_write_denied_whitespace_justification() -> None: +def test_write_denied_whitespace_justification(engine: DefaultPolicyEngine) -> None: """Whitespace-only justification must not bypass the length requirement.""" p = Principal(principal_id="u1", roles=["writer"]) with pytest.raises(PolicyDenied, match="justification"): @@ -106,7 +113,7 @@ def test_write_denied_whitespace_justification() -> None: ) -def test_write_allowed_writer_role() -> None: +def test_write_allowed_writer_role(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["writer"]) dec = engine.evaluate( _req("cap.w"), @@ -117,7 +124,7 @@ def test_write_allowed_writer_role() -> None: assert dec.allowed is True -def test_write_allowed_admin_role() -> None: +def test_write_allowed_admin_role(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["admin"]) dec = engine.evaluate( _req("cap.w"), @@ -131,7 +138,7 @@ def test_write_allowed_admin_role() -> None: # ── DESTRUCTIVE ──────────────────────────────────────────────────────────────── -def test_destructive_denied_short_justification() -> None: +def test_destructive_denied_short_justification(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["admin"]) with pytest.raises(PolicyDenied, match="DESTRUCTIVE capabilities require a justification"): engine.evaluate( @@ -142,7 +149,7 @@ def test_destructive_denied_short_justification() -> None: ) -def test_destructive_denied_whitespace_justification() -> None: +def test_destructive_denied_whitespace_justification(engine: DefaultPolicyEngine) -> None: """Whitespace-only justification must not bypass the length requirement.""" p = Principal(principal_id="u1", roles=["admin"]) with pytest.raises(PolicyDenied, match="justification"): @@ -154,7 +161,7 @@ def test_destructive_denied_whitespace_justification() -> None: ) -def test_destructive_denied_no_admin() -> None: +def test_destructive_denied_no_admin(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["writer"]) with pytest.raises(PolicyDenied, match="admin"): engine.evaluate( @@ -165,7 +172,7 @@ def test_destructive_denied_no_admin() -> None: ) -def test_destructive_allowed_admin() -> None: +def test_destructive_allowed_admin(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["admin"]) dec = engine.evaluate( _req("cap.d"), @@ -179,35 +186,35 @@ def test_destructive_allowed_admin() -> None: # ── PII / PCI ────────────────────────────────────────────────────────────────── -def test_pii_requires_tenant() -> None: +def test_pii_requires_tenant(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["reader"]) cap = _cap("cap.pii", SafetyClass.READ, SensitivityTag.PII) with pytest.raises(PolicyDenied, match="tenant"): engine.evaluate(_req("cap.pii"), cap, p, justification="") -def test_pii_allowed_with_tenant() -> None: +def test_pii_allowed_with_tenant(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["reader"], attributes={"tenant": "acme"}) cap = _cap("cap.pii", SafetyClass.READ, SensitivityTag.PII) dec = engine.evaluate(_req("cap.pii"), cap, p, justification="") assert dec.allowed is True -def test_pii_enforces_allowed_fields() -> None: +def test_pii_enforces_allowed_fields(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["reader"], attributes={"tenant": "acme"}) cap = _cap("cap.pii", SafetyClass.READ, SensitivityTag.PII, allowed_fields=["id", "name"]) dec = engine.evaluate(_req("cap.pii"), cap, p, justification="") assert dec.constraints.get("allowed_fields") == ["id", "name"] -def test_pii_reader_skips_allowed_fields() -> None: +def test_pii_reader_skips_allowed_fields(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["reader", "pii_reader"], attributes={"tenant": "acme"}) cap = _cap("cap.pii", SafetyClass.READ, SensitivityTag.PII, allowed_fields=["id", "name"]) dec = engine.evaluate(_req("cap.pii"), cap, p, justification="") assert "allowed_fields" not in dec.constraints -def test_pci_requires_tenant() -> None: +def test_pci_requires_tenant(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["reader"]) cap = _cap("cap.pci", SafetyClass.READ, SensitivityTag.PCI) with pytest.raises(PolicyDenied, match="tenant"): @@ -217,21 +224,21 @@ def test_pci_requires_tenant() -> None: # ── SECRETS ──────────────────────────────────────────────────────────────────── -def test_secrets_denied_no_role() -> None: +def test_secrets_denied_no_role(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["reader"]) cap = _cap("cap.sec", SafetyClass.READ, SensitivityTag.SECRETS) with pytest.raises(PolicyDenied, match="secrets_reader"): engine.evaluate(_req("cap.sec"), cap, p, justification="long enough justification here") -def test_secrets_denied_short_justification() -> None: +def test_secrets_denied_short_justification(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["secrets_reader"]) cap = _cap("cap.sec", SafetyClass.READ, SensitivityTag.SECRETS) with pytest.raises(PolicyDenied, match="justification"): engine.evaluate(_req("cap.sec"), cap, p, justification="too short") -def test_secrets_denied_whitespace_justification() -> None: +def test_secrets_denied_whitespace_justification(engine: DefaultPolicyEngine) -> None: """Whitespace-only justification must not bypass the length requirement.""" p = Principal(principal_id="u1", roles=["secrets_reader"]) cap = _cap("cap.sec", SafetyClass.READ, SensitivityTag.SECRETS) @@ -239,21 +246,21 @@ def test_secrets_denied_whitespace_justification() -> None: engine.evaluate(_req("cap.sec"), cap, p, justification=" ") -def test_secrets_allowed_secrets_reader_role() -> None: +def test_secrets_allowed_secrets_reader_role(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["secrets_reader"]) cap = _cap("cap.sec", SafetyClass.READ, SensitivityTag.SECRETS) dec = engine.evaluate(_req("cap.sec"), cap, p, justification="long enough justification here") assert dec.allowed is True -def test_secrets_allowed_admin_role() -> None: +def test_secrets_allowed_admin_role(engine: DefaultPolicyEngine) -> None: p = Principal(principal_id="u1", roles=["admin"]) cap = _cap("cap.sec", SafetyClass.READ, SensitivityTag.SECRETS) dec = engine.evaluate(_req("cap.sec"), cap, p, justification="long enough justification here") assert dec.allowed is True -def test_secrets_denied_writer_role() -> None: +def test_secrets_denied_writer_role(engine: DefaultPolicyEngine) -> None: """Writer role is insufficient for SECRETS capabilities.""" p = Principal(principal_id="u1", roles=["writer"]) cap = _cap("cap.sec", SafetyClass.READ, SensitivityTag.SECRETS) @@ -264,7 +271,7 @@ def test_secrets_denied_writer_role() -> None: # ── Confused-deputy binding (via token) ──────────────────────────────────────── -def test_max_rows_enforcement() -> None: +def test_max_rows_enforcement(engine: DefaultPolicyEngine) -> None: """max_rows in constraints is capped by the policy ceiling.""" p = Principal(principal_id="u1") dec = engine.evaluate( @@ -273,7 +280,7 @@ def test_max_rows_enforcement() -> None: assert dec.constraints["max_rows"] == 50 -def test_max_rows_invalid_raises_policy_denied() -> None: +def test_max_rows_invalid_raises_policy_denied(engine: DefaultPolicyEngine) -> None: """Non-numeric max_rows raises PolicyDenied, not bare ValueError.""" p = Principal(principal_id="u1") with pytest.raises(PolicyDenied, match="Invalid 'max_rows'"): @@ -285,10 +292,179 @@ def test_max_rows_invalid_raises_policy_denied() -> None: ) -def test_max_rows_negative_clamped_to_zero() -> None: +def test_max_rows_negative_clamped_to_zero(engine: DefaultPolicyEngine) -> None: """Negative max_rows is clamped to 0.""" p = Principal(principal_id="u1") dec = engine.evaluate( _req("cap.r", max_rows=-10), _cap("cap.r", SafetyClass.READ), p, justification="" ) assert dec.constraints["max_rows"] == 0 + + +# ── Rate limiting ───────────────────────────────────────────────────────────────── + + +def _make_clock(start: float = 0.0) -> tuple[list[float], Callable[[], float]]: + """Return a controllable clock: (time_ref, clock_fn). + + Advance time by mutating ``time_ref[0]``. + """ + time_ref = [start] + return time_ref, lambda: time_ref[0] + + +def test_rate_limiter_under_limit() -> None: + """Requests within the limit are allowed.""" + _, clock = _make_clock() + limiter = RateLimiter(clock=clock) + for _ in range(5): + assert limiter.check("k", 5, 60.0) is True + limiter.record("k") + # 6th should be denied + assert limiter.check("k", 5, 60.0) is False + + +def test_rate_limiter_window_expires() -> None: + """Old entries expire and free up capacity.""" + t, clock = _make_clock(0.0) + limiter = RateLimiter(clock=clock) + # Fill window + for _ in range(5): + limiter.check("k", 5, 60.0) + limiter.record("k") + assert limiter.check("k", 5, 60.0) is False + # Advance past window + t[0] = 61.0 + assert limiter.check("k", 5, 60.0) is True + + +def test_read_rate_limit_exceeded() -> None: + """61st READ invocation in 60s raises PolicyDenied.""" + _, clock = _make_clock() + eng = DefaultPolicyEngine(clock=clock) + p = Principal(principal_id="u1") + cap = _cap("cap.r", SafetyClass.READ) + for _ in range(60): + eng.evaluate(_req("cap.r"), cap, p, justification="") + with pytest.raises(PolicyDenied, match="Rate limit exceeded"): + eng.evaluate(_req("cap.r"), cap, p, justification="") + + +def test_write_rate_limit_exceeded() -> None: + """11th WRITE invocation in 60s raises PolicyDenied.""" + _, clock = _make_clock() + eng = DefaultPolicyEngine(clock=clock) + p = Principal(principal_id="u1", roles=["writer"]) + cap = _cap("cap.w", SafetyClass.WRITE) + just = "this is a long enough justification string" + for _ in range(10): + eng.evaluate(_req("cap.w"), cap, p, justification=just) + with pytest.raises(PolicyDenied, match="Rate limit exceeded"): + eng.evaluate(_req("cap.w"), cap, p, justification=just) + + +def test_destructive_rate_limit_exceeded() -> None: + """3rd DESTRUCTIVE invocation in 60s raises PolicyDenied.""" + _, clock = _make_clock() + eng = DefaultPolicyEngine(clock=clock) + p = Principal(principal_id="u1", roles=["admin"]) + cap = _cap("cap.d", SafetyClass.DESTRUCTIVE) + just = "long enough justification" + for _ in range(2): + eng.evaluate(_req("cap.d"), cap, p, justification=just) + with pytest.raises(PolicyDenied, match="Rate limit exceeded"): + eng.evaluate(_req("cap.d"), cap, p, justification=just) + + +def test_rate_limit_per_principal_capability_pair() -> None: + """Rate limits are scoped to (principal_id, capability_id), not global.""" + _, clock = _make_clock() + eng = DefaultPolicyEngine(clock=clock) + p1 = Principal(principal_id="u1") + p2 = Principal(principal_id="u2") + cap = _cap("cap.r", SafetyClass.READ) + # Exhaust u1's limit + for _ in range(60): + eng.evaluate(_req("cap.r"), cap, p1, justification="") + with pytest.raises(PolicyDenied, match="Rate limit exceeded"): + eng.evaluate(_req("cap.r"), cap, p1, justification="") + # u2 is unaffected + eng.evaluate(_req("cap.r"), cap, p2, justification="") + + +def test_service_role_gets_10x_limit() -> None: + """Principals with 'service' role get 10x the default rate limits.""" + _, clock = _make_clock() + eng = DefaultPolicyEngine(clock=clock) + p = Principal(principal_id="svc1", roles=["service"]) + cap = _cap("cap.r", SafetyClass.READ) + # Default READ is 60; service gets 600 + for _ in range(600): + eng.evaluate(_req("cap.r"), cap, p, justification="") + with pytest.raises(PolicyDenied, match="Rate limit exceeded"): + eng.evaluate(_req("cap.r"), cap, p, justification="") + + +def test_rate_limit_configurable() -> None: + """Rate limits are configurable via constructor.""" + _, clock = _make_clock() + eng = DefaultPolicyEngine( + rate_limits={SafetyClass.READ: (3, 10.0)}, + clock=clock, + ) + p = Principal(principal_id="u1") + cap = _cap("cap.r", SafetyClass.READ) + for _ in range(3): + eng.evaluate(_req("cap.r"), cap, p, justification="") + with pytest.raises(PolicyDenied, match="Rate limit exceeded"): + eng.evaluate(_req("cap.r"), cap, p, justification="") + + +def test_partial_rate_limits_preserves_defaults() -> None: + """Partial rate_limits override must not disable defaults for other classes.""" + _, clock = _make_clock() + eng = DefaultPolicyEngine( + rate_limits={SafetyClass.READ: (3, 10.0)}, + clock=clock, + ) + p = Principal(principal_id="u1", roles=["admin"]) + cap_d = _cap("cap.d", SafetyClass.DESTRUCTIVE) + just = "long enough justification" + # DESTRUCTIVE default is 2 per 60s — must still be enforced + for _ in range(2): + eng.evaluate(_req("cap.d"), cap_d, p, justification=just) + with pytest.raises(PolicyDenied, match="Rate limit exceeded"): + eng.evaluate(_req("cap.d"), cap_d, p, justification=just) + + +def test_rate_limit_rejects_zero_limit() -> None: + """Rate limit with limit < 1 raises at construction time.""" + with pytest.raises(AgentKernelError, match="limit must be >= 1"): + DefaultPolicyEngine(rate_limits={SafetyClass.READ: (0, 60.0)}) + + +def test_rate_limit_rejects_non_positive_window() -> None: + """Rate limit with window <= 0 raises at construction time.""" + with pytest.raises(AgentKernelError, match="window must be > 0"): + DefaultPolicyEngine(rate_limits={SafetyClass.WRITE: (10, 0.0)}) + + +def test_rate_limit_window_slides() -> None: + """Old entries expire, allowing new invocations after the window slides.""" + t, clock = _make_clock(0.0) + eng = DefaultPolicyEngine( + rate_limits={SafetyClass.READ: (2, 10.0)}, + clock=clock, + ) + p = Principal(principal_id="u1") + cap = _cap("cap.r", SafetyClass.READ) + # Use both + eng.evaluate(_req("cap.r"), cap, p, justification="") + t[0] = 5.0 + eng.evaluate(_req("cap.r"), cap, p, justification="") + # Blocked + with pytest.raises(PolicyDenied, match="Rate limit exceeded"): + eng.evaluate(_req("cap.r"), cap, p, justification="") + # Advance past first entry's window + t[0] = 11.0 + eng.evaluate(_req("cap.r"), cap, p, justification="") # should succeed