diff --git a/.changeset/error-handling-circuit-breaker.md b/.changeset/error-handling-circuit-breaker.md new file mode 100644 index 00000000..20ec1b87 --- /dev/null +++ b/.changeset/error-handling-circuit-breaker.md @@ -0,0 +1,12 @@ +--- +"@agentxjs/core": minor +"agentxjs": minor +--- + +Add AgentXError top-level error type, circuit breaker, and onError API + +- New `@agentxjs/core/error` module with `AgentXError` class and `CircuitBreaker` +- `AgentXError` is a core-level type (like `AgentXPlatform`) with category, code, recoverability +- Circuit breaker protects against cascading LLM driver failures (5 failures → open → 30s cooldown) +- Message persistence failures now emit `AgentXError` via EventBus instead of being silently swallowed +- New `ax.onError(handler)` top-level API on AgentX interface for structured error handling diff --git a/bdd/journeys/developer/02-getting-started.feature b/bdd/journeys/developer/02-getting-started.feature index 7e1bff5c..9158442c 100644 --- a/bdd/journeys/developer/02-getting-started.feature +++ b/bdd/journeys/developer/02-getting-started.feature @@ -175,6 +175,24 @@ Feature: Getting Started with AgentX SDK """ Then I can display or process the conversation history + # ============================================================================ + # Error Handling + # ============================================================================ + + Scenario: Developer handles errors via top-level onError + Given I have an AgentX instance in any mode + When I register an error handler: + """ + ax.onError((error) => { + console.error(`[${error.category}] ${error.code}: ${error.message}`); + if (!error.recoverable) { + // Circuit is open or fatal error + } + }); + """ + Then all AgentXError instances from any layer are delivered to this handler + And this is independent of stream events and Presentation API + # ============================================================================ # MCP Tools # ============================================================================ diff --git a/bdd/journeys/developer/03-error-handling.feature b/bdd/journeys/developer/03-error-handling.feature new file mode 100644 index 00000000..04c158a1 --- /dev/null +++ b/bdd/journeys/developer/03-error-handling.feature @@ -0,0 +1,89 @@ +@journey @developer +Feature: Error Handling in AgentX SDK + As a developer, I need structured error handling so I can respond to + failures gracefully without digging into stream events. + + # ============================================================================ + # AgentXError — Top-Level Error Type + # ============================================================================ + + Scenario: Developer understands AgentXError as a core type + Given AgentXError is defined in @agentxjs/core/error + Then it is a top-level type like AgentXPlatform + And it extends the standard Error class + And it has these properties: + | property | type | description | + | code | string | e.g. "PERSISTENCE_FAILED"| + | category | "driver" \| "persistence" \| "connection" \| "runtime" | which layer failed | + | recoverable | boolean | should the caller retry | + | context | { agentId?, sessionId?, messageId? } | scope of the error | + | cause | Error (optional) | original error | + + Scenario: Developer imports AgentXError from core + Given I need to handle errors + When I import from @agentxjs/core/error: + """ + import { AgentXError } from "@agentxjs/core/error"; + """ + Then I can use AgentXError for instanceof checks and error creation + + # ============================================================================ + # onError — Top-Level API + # ============================================================================ + + Scenario: Developer subscribes to errors via onError + Given I have an AgentX instance in any mode + When I register an error handler: + """ + ax.onError((error) => { + console.error(`[${error.category}] ${error.code}: ${error.message}`); + if (!error.recoverable) { + // Circuit is open, stop sending requests + } + }); + """ + Then all AgentXError instances from any layer are delivered to this handler + And this is independent of ax.on("error", ...) which handles stream events + And this is independent of presentation.onError which handles UI errors + + # ============================================================================ + # Circuit Breaker — Driver Layer + # ============================================================================ + + Scenario: Circuit breaker protects against cascading LLM failures + Given the LLM API returns consecutive errors + When the failure count reaches the threshold (default: 5) + Then the circuit opens and rejects new requests immediately + And an AgentXError is emitted: + | code | CIRCUIT_OPEN | + | category | driver | + | recoverable | false | + | message | Circuit breaker open: too many failures| + + Scenario: Circuit breaker recovers after cooldown + Given the circuit is open + When the reset timeout elapses (default: 30 seconds) + Then the circuit transitions to half-open + And the next request is allowed through as a probe + And if it succeeds, the circuit closes and normal operation resumes + And if it fails, the circuit re-opens + + # ============================================================================ + # Persistence Errors — No Longer Silent + # ============================================================================ + + Scenario: Persistence failure emits AgentXError instead of silent logging + Given an agent is processing a conversation + When a message fails to persist to the session repository + Then an AgentXError is emitted via onError: + | code | PERSISTENCE_FAILED | + | category | persistence | + | recoverable | true | + And the conversation continues (persistence failure does not crash the agent) + And the error includes context with agentId and sessionId + + Scenario: User message persistence failure stops the request + Given a user sends a message + When the user message fails to persist + Then the request fails with an error (not silently swallowed) + And an AgentXError is emitted with code "PERSISTENCE_FAILED" diff --git a/packages/agentx/README.md b/packages/agentx/README.md index 69f426ca..14fe4edd 100644 --- a/packages/agentx/README.md +++ b/packages/agentx/README.md @@ -98,6 +98,9 @@ interface AgentX { onAny(handler: BusEventHandler): Unsubscribe; subscribe(sessionId: string): void; + // Error handling + onError(handler: (error: AgentXError) => void): Unsubscribe; + // Lifecycle disconnect(): Promise; dispose(): Promise; @@ -158,6 +161,42 @@ const { records } = await ax.rpc<{ records: ImageRecord[] }>("image.list"); const response = await ax.rpc(request.method, request.params); ``` +### Error Handling + +Top-level error handler — receives structured `AgentXError` from all layers (driver, persistence, connection, runtime). Independent of stream events and Presentation API. + +```typescript +import { AgentXError } from "agentxjs"; + +ax.onError((error) => { + console.error(`[${error.category}] ${error.code}: ${error.message}`); + + if (error.code === "CIRCUIT_OPEN") { + // Too many consecutive LLM failures — stop sending requests + } + + if (error.code === "PERSISTENCE_FAILED") { + // Message failed to save — conversation continues but data may be lost + } + + if (!error.recoverable) { + // Fatal error — consider restarting the agent + } +}); +``` + +**AgentXError properties:** + +| Property | Type | Description | +| ------------- | -------- | ------------------------------------ | +| `code` | string | Error code (e.g. `PERSISTENCE_FAILED`) | +| `category` | string | `"driver"` \| `"persistence"` \| `"connection"` \| `"runtime"` | +| `recoverable` | boolean | Whether the caller should retry | +| `context` | object | `{ agentId?, sessionId?, imageId? }` | +| `cause` | Error? | Original error | + +**Built-in circuit breaker:** After 5 consecutive driver failures, the circuit opens and rejects new requests. After 30s cooldown, one probe request is allowed through. Success closes the circuit. + ### Stream Events | Event | Data | Description | diff --git a/packages/agentx/src/LocalClient.ts b/packages/agentx/src/LocalClient.ts index 8b37861a..041b06a9 100644 --- a/packages/agentx/src/LocalClient.ts +++ b/packages/agentx/src/LocalClient.ts @@ -5,6 +5,7 @@ * Implements the same AgentX interface as RemoteClient. */ +import type { AgentXError } from "@agentxjs/core/error"; import type { BusEvent, BusEventHandler, EventBus, Unsubscribe } from "@agentxjs/core/event"; import type { RpcMethod } from "@agentxjs/core/network"; import type { AgentXRuntime } from "@agentxjs/core/runtime"; @@ -77,6 +78,14 @@ export class LocalClient implements AgentX { // No-op for local mode - already subscribed via eventBus } + // ==================== Error Handling ==================== + + onError(handler: (error: AgentXError) => void): Unsubscribe { + return this.runtime.platform.eventBus.on("agentx_error", (event) => { + handler(event.data as AgentXError); + }); + } + // ==================== RPC ==================== async rpc(method: string, params?: unknown): Promise { diff --git a/packages/agentx/src/RemoteClient.ts b/packages/agentx/src/RemoteClient.ts index 0fd8c87b..898df4a6 100644 --- a/packages/agentx/src/RemoteClient.ts +++ b/packages/agentx/src/RemoteClient.ts @@ -5,6 +5,7 @@ * This class focuses on business logic, not protocol details. */ +import type { AgentXError } from "@agentxjs/core/error"; import type { BusEvent, BusEventHandler, EventBus, Unsubscribe } from "@agentxjs/core/event"; import { EventBusImpl } from "@agentxjs/core/event"; import { RpcClient } from "@agentxjs/core/network"; @@ -111,6 +112,14 @@ export class RemoteClient implements AgentX { logger.debug("Subscribed to session", { sessionId }); } + // ==================== Error Handling ==================== + + onError(handler: (error: AgentXError) => void): Unsubscribe { + return this.eventBus.on("agentx_error", (event) => { + handler(event.data as AgentXError); + }); + } + // ==================== RPC ==================== async rpc(method: string, params?: unknown): Promise { diff --git a/packages/agentx/src/index.ts b/packages/agentx/src/index.ts index a44c06b4..ab610c66 100644 --- a/packages/agentx/src/index.ts +++ b/packages/agentx/src/index.ts @@ -106,6 +106,10 @@ export function createAgentX(config?: PlatformConfig): AgentXBuilder { getLocalClient().subscribe(sessionId); }, + onError(handler) { + return getLocalClient().onError(handler); + }, + async disconnect() { await localClient?.disconnect(); }, @@ -155,6 +159,9 @@ export function createAgentX(config?: PlatformConfig): AgentXBuilder { }; } +export type { AgentXErrorCategory, AgentXErrorContext } from "@agentxjs/core/error"; +// Re-export error types +export { AgentXError, AgentXErrorCode } from "@agentxjs/core/error"; // Re-export server export { CommandHandler } from "./CommandHandler"; // Re-export Presentation types and classes diff --git a/packages/agentx/src/types.ts b/packages/agentx/src/types.ts index 3ab376a1..0c5e76b7 100644 --- a/packages/agentx/src/types.ts +++ b/packages/agentx/src/types.ts @@ -3,6 +3,7 @@ */ import type { Message } from "@agentxjs/core/agent"; +import type { AgentXError } from "@agentxjs/core/error"; import type { BusEvent, BusEventHandler, EventBus, Unsubscribe } from "@agentxjs/core/event"; import type { AgentXPlatform } from "@agentxjs/core/runtime"; import type { Presentation, PresentationOptions } from "./presentation"; @@ -334,6 +335,25 @@ export interface AgentX { onAny(handler: BusEventHandler): Unsubscribe; subscribe(sessionId: string): void; + // ==================== Error Handling ==================== + + /** + * Top-level error handler — receives all AgentXError instances from any layer. + * + * Independent of `on("error", ...)` (stream events) and `presentation.onError` (UI errors). + * + * @example + * ```typescript + * ax.onError((error) => { + * console.error(`[${error.category}] ${error.code}: ${error.message}`); + * if (!error.recoverable) { + * // Circuit is open, stop sending requests + * } + * }); + * ``` + */ + onError(handler: (error: AgentXError) => void): Unsubscribe; + // ==================== RPC ==================== /** diff --git a/packages/core/package.json b/packages/core/package.json index 2f0d95b5..a5ed127d 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -70,6 +70,11 @@ "import": "./dist/persistence/index.js", "default": "./dist/persistence/index.js" }, + "./error": { + "types": "./dist/error/index.d.ts", + "import": "./dist/error/index.js", + "default": "./dist/error/index.js" + }, "./network": { "types": "./dist/network/index.d.ts", "import": "./dist/network/index.js", diff --git a/packages/core/src/error/AgentXError.ts b/packages/core/src/error/AgentXError.ts new file mode 100644 index 00000000..3e1bf6b4 --- /dev/null +++ b/packages/core/src/error/AgentXError.ts @@ -0,0 +1,95 @@ +/** + * AgentXError — Top-level error type for the AgentX framework + * + * Like AgentXPlatform, this is a core-level type that all layers use. + * Provides structured error classification with category, code, and recoverability. + */ + +// ============================================================================ +// Types +// ============================================================================ + +/** + * Error category — which layer produced the error + */ +export type AgentXErrorCategory = "driver" | "persistence" | "connection" | "runtime"; + +/** + * Error context — scope information for debugging + */ +export interface AgentXErrorContext { + agentId?: string; + sessionId?: string; + imageId?: string; + containerId?: string; + messageId?: string; +} + +// ============================================================================ +// Error Codes +// ============================================================================ + +/** + * Well-known error codes + */ +export const AgentXErrorCode = { + // Driver + DRIVER_ERROR: "DRIVER_ERROR", + CIRCUIT_OPEN: "CIRCUIT_OPEN", + + // Persistence + PERSISTENCE_FAILED: "PERSISTENCE_FAILED", + + // Connection + CONNECTION_FAILED: "CONNECTION_FAILED", + CONNECTION_TIMEOUT: "CONNECTION_TIMEOUT", + + // Runtime + RUNTIME_ERROR: "RUNTIME_ERROR", +} as const; + +export type AgentXErrorCodeType = (typeof AgentXErrorCode)[keyof typeof AgentXErrorCode]; + +// ============================================================================ +// AgentXError Class +// ============================================================================ + +/** + * AgentXError — structured error for all AgentX layers + * + * @example + * ```typescript + * import { AgentXError } from "@agentxjs/core/error"; + * + * throw new AgentXError({ + * code: "PERSISTENCE_FAILED", + * category: "persistence", + * message: "Failed to persist assistant message", + * recoverable: true, + * context: { agentId: "agent_123", sessionId: "sess_456" }, + * cause: originalError, + * }); + * ``` + */ +export class AgentXError extends Error { + readonly code: string; + readonly category: AgentXErrorCategory; + readonly recoverable: boolean; + readonly context?: AgentXErrorContext; + + constructor(options: { + code: string; + category: AgentXErrorCategory; + message: string; + recoverable: boolean; + context?: AgentXErrorContext; + cause?: Error; + }) { + super(options.message, { cause: options.cause }); + this.name = "AgentXError"; + this.code = options.code; + this.category = options.category; + this.recoverable = options.recoverable; + this.context = options.context; + } +} diff --git a/packages/core/src/error/CircuitBreaker.ts b/packages/core/src/error/CircuitBreaker.ts new file mode 100644 index 00000000..cc50c5fc --- /dev/null +++ b/packages/core/src/error/CircuitBreaker.ts @@ -0,0 +1,139 @@ +/** + * CircuitBreaker — Protects against cascading failures + * + * Three states: + * - CLOSED: Normal operation, requests pass through + * - OPEN: Failures exceeded threshold, requests rejected immediately + * - HALF_OPEN: After cooldown, one probe request allowed through + */ + +import { AgentXError } from "./AgentXError"; + +// ============================================================================ +// Types +// ============================================================================ + +export type CircuitState = "closed" | "open" | "half_open"; + +export interface CircuitBreakerOptions { + /** Number of consecutive failures before opening (default: 5) */ + failureThreshold?: number; + /** Time in ms before transitioning from open to half-open (default: 30000) */ + resetTimeout?: number; +} + +export type CircuitBreakerEventHandler = (state: CircuitState, error?: AgentXError) => void; + +// ============================================================================ +// CircuitBreaker +// ============================================================================ + +export class CircuitBreaker { + private state: CircuitState = "closed"; + private failureCount = 0; + private lastFailureTime = 0; + private readonly failureThreshold: number; + private readonly resetTimeout: number; + private onStateChange?: CircuitBreakerEventHandler; + + constructor(options?: CircuitBreakerOptions) { + this.failureThreshold = options?.failureThreshold ?? 5; + this.resetTimeout = options?.resetTimeout ?? 30_000; + } + + /** + * Register a state change handler + */ + onChange(handler: CircuitBreakerEventHandler): void { + this.onStateChange = handler; + } + + /** + * Check if a request should be allowed through + */ + canExecute(): boolean { + if (this.state === "closed") return true; + + if (this.state === "open") { + // Check if cooldown has elapsed + if (Date.now() - this.lastFailureTime >= this.resetTimeout) { + this.transition("half_open"); + return true; // Allow one probe request + } + return false; + } + + // half_open — allow the probe + return true; + } + + /** + * Record a successful execution + */ + recordSuccess(): void { + if (this.state === "half_open" || this.state === "closed") { + this.failureCount = 0; + if (this.state !== "closed") { + this.transition("closed"); + } + } + } + + /** + * Record a failed execution + */ + recordFailure(error?: Error): void { + this.failureCount++; + this.lastFailureTime = Date.now(); + + if (this.state === "half_open") { + // Probe failed, re-open + this.transition("open", error); + } else if (this.failureCount >= this.failureThreshold) { + this.transition("open", error); + } + } + + /** + * Get current state + */ + getState(): CircuitState { + return this.state; + } + + /** + * Get current failure count + */ + getFailureCount(): number { + return this.failureCount; + } + + /** + * Reset the circuit breaker to closed state + */ + reset(): void { + this.failureCount = 0; + this.lastFailureTime = 0; + this.transition("closed"); + } + + private transition(newState: CircuitState, causeError?: Error): void { + const oldState = this.state; + if (oldState === newState) return; + + this.state = newState; + + let agentXError: AgentXError | undefined; + if (newState === "open") { + agentXError = new AgentXError({ + code: "CIRCUIT_OPEN", + category: "driver", + message: `Circuit breaker open: ${this.failureCount} consecutive failures`, + recoverable: false, + cause: causeError, + }); + } + + this.onStateChange?.(newState, agentXError); + } +} diff --git a/packages/core/src/error/index.ts b/packages/core/src/error/index.ts new file mode 100644 index 00000000..85c7efc1 --- /dev/null +++ b/packages/core/src/error/index.ts @@ -0,0 +1,19 @@ +/** + * @agentxjs/core/error + * + * Top-level error types for the AgentX framework. + */ + +export { + AgentXError, + type AgentXErrorCategory, + AgentXErrorCode, + type AgentXErrorCodeType, + type AgentXErrorContext, +} from "./AgentXError"; +export { + CircuitBreaker, + type CircuitBreakerEventHandler, + type CircuitBreakerOptions, + type CircuitState, +} from "./CircuitBreaker"; diff --git a/packages/core/src/runtime/AgentXRuntime.ts b/packages/core/src/runtime/AgentXRuntime.ts index f921aecf..3719c30e 100644 --- a/packages/core/src/runtime/AgentXRuntime.ts +++ b/packages/core/src/runtime/AgentXRuntime.ts @@ -31,6 +31,8 @@ import type { DriverStreamEvent, ToolDefinition, } from "../driver/types"; +import { AgentXError } from "../error/AgentXError"; +import { CircuitBreaker } from "../error/CircuitBreaker"; import type { BusEvent } from "../event/types"; import { createSession } from "../session/Session"; import type { @@ -54,6 +56,7 @@ interface AgentState { subscriptions: Set<() => void>; driver: Driver; engine: AgentEngine; + circuitBreaker: CircuitBreaker; /** Flag to track if a receive operation is in progress */ isReceiving: boolean; /** Pending message persist promises — flushed at end of receive() */ @@ -180,7 +183,24 @@ export class AgentXRuntimeImpl implements AgentXRuntime { if (category === "message" && output.type !== "user_message") { const message = output.data as Message; const persistPromise = sessionRepository.addMessage(sessionId, message).catch((err) => { + const axError = new AgentXError({ + code: "PERSISTENCE_FAILED", + category: "persistence", + message: `Failed to persist ${output.type}`, + recoverable: true, + context: { agentId, sessionId, imageId, containerId: imageRecord.containerId }, + cause: err instanceof Error ? err : new Error(String(err)), + }); logger.error("Failed to persist message", { type: output.type, error: err }); + eventBus.emit({ + type: "agentx_error", + timestamp: Date.now(), + source: "runtime", + category: "error", + intent: "notification", + data: axError, + context: { agentId, sessionId, imageId, containerId: imageRecord.containerId }, + } as BusEvent); }); const agentState = this.agents.get(agentId); if (agentState) { @@ -208,6 +228,23 @@ export class AgentXRuntimeImpl implements AgentXRuntime { createdAt: Date.now(), }; + // Create circuit breaker for this agent's driver calls + const circuitBreaker = new CircuitBreaker(); + circuitBreaker.onChange((newState, error) => { + logger.warn("Circuit breaker state changed", { agentId, state: newState }); + if (error) { + eventBus.emit({ + type: "agentx_error", + timestamp: Date.now(), + source: "runtime", + category: "error", + intent: "notification", + data: error, + context: { agentId, imageId, containerId: imageRecord.containerId, sessionId }, + } as BusEvent); + } + }); + // Store agent state with driver and engine const state: AgentState = { agent, @@ -215,6 +252,7 @@ export class AgentXRuntimeImpl implements AgentXRuntime { subscriptions: new Set(), driver, engine, + circuitBreaker, isReceiving: false, pendingPersists: [], }; @@ -386,6 +424,22 @@ export class AgentXRuntimeImpl implements AgentXRuntime { throw new Error(`Agent ${agentId} is already processing a message`); } + // Circuit breaker check + if (!state.circuitBreaker.canExecute()) { + throw new AgentXError({ + code: "CIRCUIT_OPEN", + category: "driver", + message: "Circuit breaker open: too many consecutive driver failures", + recoverable: false, + context: { + agentId, + sessionId: state.agent.sessionId, + imageId: state.agent.imageId, + containerId: state.agent.containerId, + }, + }); + } + const actualRequestId = requestId ?? this.generateRequestId(); // Build user message @@ -423,7 +477,12 @@ export class AgentXRuntimeImpl implements AgentXRuntime { // Convert DriverStreamEvent to BusEvent and emit this.handleDriverEvent(state, event, actualRequestId); } + // Driver call succeeded — record success for circuit breaker + state.circuitBreaker.recordSuccess(); } catch (error) { + // Record failure for circuit breaker + state.circuitBreaker.recordFailure(error instanceof Error ? error : undefined); + // Emit error event this.emitEvent( state,