Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ All notable changes to the AxonFlow Python SDK will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [4.2.0] - 2026-03-16

### Added

- `get_circuit_breaker_status()` — query active circuit breaker circuits and emergency stop state
- `get_circuit_breaker_history(limit)` — retrieve circuit breaker trip/reset audit trail
- `get_circuit_breaker_config(tenant_id)` — get effective circuit breaker config (global or tenant-specific)
- `update_circuit_breaker_config(config)` — update per-tenant circuit breaker thresholds

---

## [4.1.0] - 2026-03-14

### Added
Expand Down
11 changes: 11 additions & 0 deletions axonflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@
BudgetStatus,
CacheConfig,
CancelPlanResponse,
CircuitBreakerConfig,
CircuitBreakerConfigUpdate,
CircuitBreakerHistoryEntry,
CircuitBreakerHistoryResponse,
CircuitBreakerStatusResponse,
ClientRequest,
ClientResponse,
CodeArtifact,
Expand Down Expand Up @@ -312,6 +317,12 @@
# Audit Tool Call types (Issue #1260)
"AuditToolCallRequest",
"AuditToolCallResponse",
# Circuit Breaker Observability types (Issue #1176)
"CircuitBreakerStatusResponse",
"CircuitBreakerHistoryEntry",
"CircuitBreakerHistoryResponse",
"CircuitBreakerConfig",
"CircuitBreakerConfigUpdate",
# Execution Replay types
"ExecutionSummary",
"ExecutionSnapshot",
Expand Down
197 changes: 197 additions & 0 deletions axonflow/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@
BudgetStatus,
CacheConfig,
CancelPlanResponse,
CircuitBreakerConfig,
CircuitBreakerConfigUpdate,
CircuitBreakerHistoryEntry,
CircuitBreakerHistoryResponse,
CircuitBreakerStatusResponse,
ClientRequest,
ClientResponse,
ConnectorHealthStatus,
Expand Down Expand Up @@ -1850,6 +1855,171 @@ async def audit_tool_call(
timestamp=response["timestamp"],
)

# =========================================================================
# Circuit Breaker Observability Methods
# =========================================================================

async def get_circuit_breaker_status(self) -> CircuitBreakerStatusResponse:
"""Get all active circuit breaker circuits.

Returns the current state of all circuit breakers, including which
circuits are open (tripped) and whether any emergency stop is active.

Returns:
CircuitBreakerStatusResponse with active circuits and counts.

Raises:
AxonFlowError: If the request fails.

Example:
>>> status = await client.get_circuit_breaker_status()
>>> print(f"{status.count} active circuits")
>>> if status.emergency_stop_active:
... print("Emergency stop is active!")
"""
if self._config.debug:
self._logger.debug("Getting circuit breaker status")

response = await self._request("GET", "/api/v1/circuit-breaker/status")
data = response.get("data", response)

return CircuitBreakerStatusResponse(
active_circuits=data.get("active_circuits") or [],
count=data.get("count", 0),
emergency_stop_active=data.get("emergency_stop_active", False),
)

async def get_circuit_breaker_history(
self,
limit: int | None = None,
) -> CircuitBreakerHistoryResponse:
"""Get circuit breaker history for audit trail.

Returns the history of circuit breaker state transitions, including
trips, resets, and auto-recovery events.

Args:
limit: Maximum number of history entries to return.

Returns:
CircuitBreakerHistoryResponse with history entries.

Raises:
AxonFlowError: If the request fails.

Example:
>>> history = await client.get_circuit_breaker_history(limit=50)
>>> for entry in history.history:
... print(f"{entry.scope}/{entry.scope_id}: {entry.state}")
"""
if self._config.debug:
self._logger.debug(
"Getting circuit breaker history",
limit=limit,
)

path = "/api/v1/circuit-breaker/history"
if limit is not None:
path = f"{path}?limit={limit}"

response = await self._request("GET", path)
data = response.get("data", response)

history = [CircuitBreakerHistoryEntry(**entry) for entry in (data.get("history") or [])]

return CircuitBreakerHistoryResponse(
history=history,
count=data.get("count", 0),
)

async def get_circuit_breaker_config(
self,
tenant_id: str | None = None,
) -> CircuitBreakerConfig:
"""Get circuit breaker configuration (global or tenant-specific).

Args:
tenant_id: If provided, returns tenant-specific config with
any overrides applied. Otherwise returns global defaults.

Returns:
CircuitBreakerConfig with thresholds and recovery settings.

Raises:
AxonFlowError: If the request fails.

Example:
>>> config = await client.get_circuit_breaker_config()
>>> print(f"Error threshold: {config.error_threshold}")
>>> tenant_config = await client.get_circuit_breaker_config(
... tenant_id="tenant-123"
... )
"""
if self._config.debug:
self._logger.debug(
"Getting circuit breaker config",
tenant_id=tenant_id,
)

path = "/api/v1/circuit-breaker/config"
if tenant_id is not None:
path = f"{path}?tenant_id={tenant_id}"

response = await self._request("GET", path)
data = response.get("data", response)

return CircuitBreakerConfig(**data)

async def update_circuit_breaker_config(
self,
config: CircuitBreakerConfigUpdate,
) -> dict[str, Any]:
"""Update per-tenant circuit breaker configuration.

Sets tenant-specific overrides for circuit breaker thresholds and
recovery behavior.

Args:
config: Configuration update with tenant_id and override values.

Returns:
Server response confirming the update.

Raises:
ValueError: If tenant_id is empty.
AxonFlowError: If the request fails.

Example:
>>> from axonflow.types import CircuitBreakerConfigUpdate
>>> result = await client.update_circuit_breaker_config(
... CircuitBreakerConfigUpdate(
... tenant_id="tenant-123",
... error_threshold=10,
... violation_threshold=5,
... )
... )
"""
if not config.tenant_id or not config.tenant_id.strip():
msg = "tenant_id is required and cannot be empty"
raise ValueError(msg)

if self._config.debug:
self._logger.debug(
"Updating circuit breaker config",
tenant_id=config.tenant_id,
)

request_body = config.model_dump(by_alias=True, exclude_none=True)

response = await self._request(
"PUT",
"/api/v1/circuit-breaker/config",
json_data=request_body,
)

result: dict[str, Any] = response.get("data", response)
return result

# =========================================================================
# Audit Log Read Methods
# =========================================================================
Expand Down Expand Up @@ -6271,6 +6441,33 @@ def audit_tool_call(
"""Record a non-LLM tool call in the audit trail."""
return self._run_sync(self._async_client.audit_tool_call(request))

# Circuit Breaker Observability sync wrappers

def get_circuit_breaker_status(self) -> CircuitBreakerStatusResponse:
"""Get all active circuit breaker circuits."""
return self._run_sync(self._async_client.get_circuit_breaker_status())

def get_circuit_breaker_history(
self,
limit: int | None = None,
) -> CircuitBreakerHistoryResponse:
"""Get circuit breaker history for audit trail."""
return self._run_sync(self._async_client.get_circuit_breaker_history(limit=limit))

def get_circuit_breaker_config(
self,
tenant_id: str | None = None,
) -> CircuitBreakerConfig:
"""Get circuit breaker config (global or tenant-specific)."""
return self._run_sync(self._async_client.get_circuit_breaker_config(tenant_id=tenant_id))

def update_circuit_breaker_config(
self,
config: CircuitBreakerConfigUpdate,
) -> dict[str, Any]:
"""Update per-tenant circuit breaker config."""
return self._run_sync(self._async_client.update_circuit_breaker_config(config))

# Policy CRUD sync wrappers

def list_static_policies(
Expand Down
82 changes: 82 additions & 0 deletions axonflow/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1168,3 +1168,85 @@ class AuditToolCallResponse(BaseModel):
audit_id: str = Field(description="Unique ID for the audit entry")
status: str = Field(description="Recording status (e.g., recorded)")
timestamp: str = Field(description="Timestamp when the audit entry was recorded")


# =========================================================================
# Circuit Breaker Observability Types
# =========================================================================


class CircuitBreakerStatusResponse(BaseModel):
"""Response from circuit breaker status endpoint."""

model_config = ConfigDict(populate_by_name=True)

active_circuits: list[dict[str, Any]] = Field(
default_factory=list, description="List of active (open) circuits"
)
count: int = Field(description="Number of active circuits")
emergency_stop_active: bool = Field(description="Whether any circuit is open")


class CircuitBreakerHistoryEntry(BaseModel):
"""A single circuit breaker history entry."""

model_config = ConfigDict(populate_by_name=True)

id: str = Field(description="Circuit ID")
org_id: str = Field(description="Organization ID")
scope: str = Field(description="Circuit scope (global, tenant, client, policy)")
scope_id: str = Field(default="", description="Scope identifier")
state: str = Field(description="Circuit state (closed, open, half_open)")
trip_reason: str | None = Field(default=None, description="Why the circuit was tripped")
tripped_by: str | None = Field(default=None, description="Who/what tripped the circuit")
tripped_at: str | None = Field(default=None, description="When the circuit was tripped")
expires_at: str | None = Field(default=None, description="When the circuit will auto-reset")
reset_by: str | None = Field(default=None, description="Who reset the circuit")
reset_at: str | None = Field(default=None, description="When the circuit was reset")
error_count: int = Field(default=0, description="Number of errors in current window")
violation_count: int = Field(default=0, description="Number of violations in current window")


class CircuitBreakerHistoryResponse(BaseModel):
"""Response from circuit breaker history endpoint."""

model_config = ConfigDict(populate_by_name=True)

history: list[CircuitBreakerHistoryEntry] = Field(
default_factory=list, description="Circuit history entries"
)
count: int = Field(description="Number of history entries")


class CircuitBreakerConfig(BaseModel):
"""Circuit breaker configuration (effective for a tenant or global)."""

model_config = ConfigDict(populate_by_name=True)

source: str = Field(description="Config source: 'global' or 'tenant'")
error_threshold: int = Field(description="Error threshold for auto-trip")
violation_threshold: int = Field(description="Policy violation threshold")
window_seconds: int = Field(description="Sliding window duration in seconds")
default_timeout_seconds: int = Field(description="Default circuit open timeout in seconds")
max_timeout_seconds: int = Field(description="Maximum allowed timeout in seconds")
enable_auto_recovery: bool = Field(description="Whether auto-recovery is enabled")
tenant_id: str | None = Field(default=None, description="Tenant ID if tenant-specific")
overrides: dict[str, Any] | None = Field(default=None, description="Tenant-specific overrides")


class CircuitBreakerConfigUpdate(BaseModel):
"""Request to update per-tenant circuit breaker config."""

model_config = ConfigDict(populate_by_name=True)

tenant_id: str = Field(description="Tenant ID to configure")
error_threshold: int | None = Field(default=None, description="Override error threshold")
violation_threshold: int | None = Field(
default=None, description="Override violation threshold"
)
window_seconds: int | None = Field(default=None, description="Override window duration")
default_timeout_seconds: int | None = Field(
default=None, description="Override default timeout"
)
max_timeout_seconds: int | None = Field(default=None, description="Override max timeout")
enable_auto_recovery: bool | None = Field(default=None, description="Override auto-recovery")
Loading
Loading