Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions agent/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ def _solve_recursive(
# Stream thinking/text deltas only for top-level calls
if on_content_delta and depth == 0 and hasattr(model, "on_content_delta"):
model.on_content_delta = on_content_delta
# Rate-limit messages fire at all depths
if on_event and hasattr(model, "on_retry"):
model.on_retry = lambda msg, _d=depth, _s=step: self._emit(
f"[d{_d}/s{_s}] {msg}", on_event
)
try:
turn = model.complete(conversation)
except ModelError as exc:
Expand All @@ -333,6 +338,8 @@ def _solve_recursive(
finally:
if hasattr(model, "on_content_delta"):
model.on_content_delta = None
if hasattr(model, "on_retry"):
model.on_retry = None
elapsed = time.monotonic() - t0

if replay_logger:
Expand Down
157 changes: 116 additions & 41 deletions agent/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import json
import socket
import time
import urllib.error
import urllib.request
from dataclasses import dataclass, field
Expand Down Expand Up @@ -95,37 +96,83 @@ def _extract_content(content: object) -> str:
return ""


def _parse_retry_after(exc: urllib.error.HTTPError, default: int = 5) -> int:
"""Parse Retry-After header from an HTTPError, clamped to [1, 120]."""
raw = exc.headers.get("Retry-After")
if raw is None:
return default
try:
return max(1, min(120, int(raw)))
except (ValueError, TypeError):
return default


def _notify_retry(on_retry: "Callable[[str], None] | None", msg: str) -> None:
"""Fire on_retry callback, swallowing any exception."""
if on_retry is not None:
try:
on_retry(msg)
except Exception:
pass


def _sleep_with_countdown(
    seconds: int,
    attempt: int,
    max_attempts: int,
    on_retry: "Callable[[str], None] | None",
) -> None:
    """Block for *seconds*, emitting a once-per-second countdown via *on_retry*."""
    remaining = seconds
    while remaining > 0:
        countdown_msg = (
            f"Rate limited (attempt {attempt}/{max_attempts}). "
            f"Retrying in {remaining}s..."
        )
        _notify_retry(on_retry, countdown_msg)
        time.sleep(1)
        remaining -= 1


def _http_json(
url: str,
method: str,
headers: dict[str, str],
payload: dict[str, Any] | None = None,
timeout_sec: int = 90,
max_rate_limit_retries: int = 5,
) -> dict[str, Any]:
req = urllib.request.Request(
url=url,
data=(json.dumps(payload).encode("utf-8") if payload is not None else None),
headers=headers,
method=method,
)
try:
with urllib.request.urlopen(req, timeout=timeout_sec) as resp:
raw = resp.read().decode("utf-8", errors="replace")
except urllib.error.HTTPError as exc: # pragma: no cover - network path
body = exc.read().decode("utf-8", errors="replace")
raise ModelError(f"HTTP {exc.code} calling {url}: {body}") from exc
except urllib.error.URLError as exc: # pragma: no cover - network path
raise ModelError(f"Connection error calling {url}: {exc}") from exc
except OSError as exc: # pragma: no cover - bare socket.timeout, etc.
raise ModelError(f"Network error calling {url}: {exc}") from exc
for rate_limit_attempt in range(1, max_rate_limit_retries + 1):
req = urllib.request.Request(
url=url,
data=(json.dumps(payload).encode("utf-8") if payload is not None else None),
headers=headers,
method=method,
)
try:
with urllib.request.urlopen(req, timeout=timeout_sec) as resp:
raw = resp.read().decode("utf-8", errors="replace")
except urllib.error.HTTPError as exc:
if exc.code == 429:
retry_after = _parse_retry_after(exc)
_sleep_with_countdown(retry_after, rate_limit_attempt, max_rate_limit_retries, None)
continue
body = exc.read().decode("utf-8", errors="replace")[:8192]
raise ModelError(f"HTTP {exc.code} calling {url}: {body}") from exc
except urllib.error.URLError as exc: # pragma: no cover - network path
raise ModelError(f"Connection error calling {url}: {exc}") from exc
except OSError as exc: # pragma: no cover - bare socket.timeout, etc.
raise ModelError(f"Network error calling {url}: {exc}") from exc

try:
parsed = json.loads(raw)
except json.JSONDecodeError as exc:
raise ModelError(f"Non-JSON response from {url}: {raw[:500]}") from exc
if not isinstance(parsed, dict):
raise ModelError(f"Unexpected non-object JSON from {url}: {type(parsed)!r}")
return parsed
try:
parsed = json.loads(raw)
except json.JSONDecodeError as exc:
raise ModelError(f"Non-JSON response from {url}: {raw[:500]}") from exc
if not isinstance(parsed, dict):
raise ModelError(f"Unexpected non-object JSON from {url}: {type(parsed)!r}")
return parsed

raise ModelError(
f"HTTP 429 calling {url}: rate limited, exhausted {max_rate_limit_retries} retries"
)


def _extend_socket_timeout(resp: Any, timeout: float) -> None:
Expand Down Expand Up @@ -212,32 +259,54 @@ def _http_stream_sse(
stream_timeout: float = 120,
max_retries: int = 3,
on_sse_event: "Callable[[str, dict[str, Any]], None] | None" = None,
on_retry: "Callable[[str], None] | None" = None,
max_rate_limit_retries: int = 5,
) -> list[tuple[str, dict[str, Any]]]:
"""Stream an SSE endpoint with first-byte timeout and retry logic."""
data = json.dumps(payload).encode("utf-8")

last_exc: Exception | None = None
for attempt in range(max_retries):
req = urllib.request.Request(url=url, data=data, headers=headers, method=method)
try:
resp = urllib.request.urlopen(req, timeout=first_byte_timeout)
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
raise ModelError(f"HTTP {exc.code} calling {url}: {body}") from exc
except (socket.timeout, urllib.error.URLError, OSError) as exc:
# Timeout or connection error — retry
last_exc = exc
for rate_limit_attempt in range(1, max_rate_limit_retries + 1):
got_429 = False
retry_after = 0

last_exc: Exception | None = None
for attempt in range(max_retries):
req = urllib.request.Request(url=url, data=data, headers=headers, method=method)
try:
resp = urllib.request.urlopen(req, timeout=first_byte_timeout)
except urllib.error.HTTPError as exc:
if exc.code == 429:
retry_after = _parse_retry_after(exc)
got_429 = True
break
body = exc.read().decode("utf-8", errors="replace")[:8192]
raise ModelError(f"HTTP {exc.code} calling {url}: {body}") from exc
except (socket.timeout, urllib.error.URLError, OSError) as exc:
# Timeout or connection error — retry
last_exc = exc
continue

# First byte received — extend timeout for the rest of the stream
_extend_socket_timeout(resp, stream_timeout)
try:
return _read_sse_events(resp, on_sse_event=on_sse_event)
finally:
resp.close()
else:
if not got_429:
raise ModelError(
f"Timed out after {max_retries} attempts calling {url}: {last_exc}"
)

if got_429:
_sleep_with_countdown(retry_after, rate_limit_attempt, max_rate_limit_retries, on_retry)
continue

# First byte received — extend timeout for the rest of the stream
_extend_socket_timeout(resp, stream_timeout)
try:
return _read_sse_events(resp, on_sse_event=on_sse_event)
finally:
resp.close()
# If we reach here without got_429 and without returning, something went wrong
break # pragma: no cover

raise ModelError(
f"Timed out after {max_retries} attempts calling {url}: {last_exc}"
f"HTTP 429 calling {url}: rate limited, exhausted {max_rate_limit_retries} retries"
)


Expand Down Expand Up @@ -577,6 +646,7 @@ class OpenAICompatibleModel:
strict_tools: bool = True
tool_defs: list[dict[str, Any]] | None = None
on_content_delta: Callable[[str, str], None] | None = None
on_retry: Callable[[str], None] | None = None

def _is_reasoning_model(self) -> bool:
"""OpenAI reasoning models (o-series, gpt-5 series) have different API constraints."""
Expand Down Expand Up @@ -656,6 +726,7 @@ def _forward_delta(_event_type: str, data: dict[str, Any]) -> None:
payload=payload,
stream_timeout=self.timeout_sec,
on_sse_event=sse_cb,
on_retry=self.on_retry,
)
parsed = _accumulate_openai_stream(events)
except ModelError as exc:
Expand All @@ -675,6 +746,7 @@ def _forward_delta(_event_type: str, data: dict[str, Any]) -> None:
payload=payload,
stream_timeout=self.timeout_sec,
on_sse_event=sse_cb,
on_retry=self.on_retry,
)
parsed = _accumulate_openai_stream(events)

Expand Down Expand Up @@ -771,6 +843,7 @@ class AnthropicModel:
timeout_sec: int = 300
tool_defs: list[dict[str, Any]] | None = None
on_content_delta: Callable[[str, str], None] | None = None
on_retry: Callable[[str], None] | None = None

def create_conversation(self, system_prompt: str, initial_user_message: str) -> Conversation:
messages: list[Any] = [
Expand Down Expand Up @@ -849,6 +922,7 @@ def _forward_delta(_event_type: str, data: dict[str, Any]) -> None:
payload=payload,
stream_timeout=self.timeout_sec,
on_sse_event=sse_cb,
on_retry=self.on_retry,
)
parsed = _accumulate_anthropic_stream(events)
except ModelError as exc:
Expand All @@ -869,6 +943,7 @@ def _forward_delta(_event_type: str, data: dict[str, Any]) -> None:
payload=payload,
stream_timeout=self.timeout_sec,
on_sse_event=sse_cb,
on_retry=self.on_retry,
)
parsed = _accumulate_anthropic_stream(events)

Expand Down
62 changes: 62 additions & 0 deletions docs/brainstorms/2026-02-20-429-rate-limit-handling-brainstorm.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Brainstorm: Graceful 429 Rate Limit Handling

**Date:** 2026-02-20
**Status:** Ready for planning

## What We're Building

Retry logic for HTTP 429 (rate limit) responses from the Anthropic API. When the
app hits a rate limit, it should wait using the `Retry-After` header and retry
automatically — up to 5 attempts — with a visible countdown in the TUI so the
user knows what's happening.

## Why This Approach

**Problem:** The app currently treats 429 as a fatal error. `_http_stream_sse()`
catches `HTTPError` and immediately raises `ModelError`, which terminates the
engine's solve loop. The user sees an error and has to restart.

**Approach A (inline retry)** was chosen over a retry decorator or model-layer
retry because:

- The existing `_http_stream_sse()` already has a retry loop for connection
errors — extending it for 429 is natural
- Only two transport functions need updating (`_http_stream_sse` and `_http_json`), which
doesn't justify a new abstraction
- Keeps HTTP retry concerns in the HTTP transport layer where they belong

## Key Decisions

1. **Scope: Anthropic 429 only (for now).** Other providers and other transient
errors (5xx) are out of scope. The shared code path means expanding later is
straightforward.

2. **Retry strategy: Respect `Retry-After` header, max 5 attempts.** Anthropic
includes this header in 429 responses. We honor it directly rather than
implementing our own exponential backoff.

3. **UX: Visible countdown.** During the wait, show a live countdown
(e.g., "Rate limited. Retrying in 8s... 7s... 6s...") in the TUI event stream
so the user knows the app hasn't hung.

4. **Both call paths covered.** Retry logic goes in both `_http_stream_sse()`
(streaming LLM calls) and `_http_json()` (model listing).

5. **Failure mode: ModelError as today.** After 5 failed attempts, raise the same
`ModelError` the app raises now. No new exception subclasses.

6. **Implementation: Inline in transport functions.** No decorators, no new
abstractions. Extend the existing retry pattern.

## Affected Code

| File | Function | Change |
|------|----------|--------|
| `agent/model.py` | `_http_stream_sse()` | Add 429 detection in HTTPError handler, parse Retry-After, sleep with countdown, loop |
| `agent/model.py` | `_http_json()` | Same pattern |
| `agent/model.py` | Both functions' signatures | Add optional `on_retry` callback for countdown display |
| `agent/engine.py` | Call sites of above | Thread `on_event` through as `on_retry` callback |

## Open Questions

None — all questions resolved during brainstorming.
Loading