diff --git a/docs/changelog.md b/docs/changelog.md
index 7d48970..b237daf 100644
--- a/docs/changelog.md
+++ b/docs/changelog.md
@@ -41,6 +41,19 @@ All notable changes to this project are documented here. For detailed informatio
- Can be combined with `max_usd`: `budget(max_usd=1.00, max_llm_calls=20)`
- Works with fallback: `budget(max_usd=1.00, max_llm_calls=20, fallback={"at_pct": 0.8, "model": "gpt-4o-mini"})`
+**LiteLLM provider adapter**
+
+- Install with `pip install shekel[litellm]`
+- Patches `litellm.completion` and `litellm.acompletion` (sync + async, including streaming)
+- Tracks costs across all 100+ providers LiteLLM supports (Gemini, Cohere, Ollama, Azure, Bedrock, Mistral, and more)
+- Model names with provider prefix (e.g. `gemini/gemini-1.5-flash`) pass through to the pricing engine
+
+**LangGraph integration helper**
+
+- `from shekel.integrations.langgraph import budgeted_graph`
+- `budgeted_graph(max_usd, **kwargs)` – convenience context manager wrapping `budget()` for LangGraph workflows
+- Install with `pip install shekel[langgraph]`
+
## [0.2.5] - 2026-03-11
### Extensible Provider Architecture
diff --git a/docs/extending.md b/docs/extending.md
index 93a0329..f9af92f 100644
--- a/docs/extending.md
+++ b/docs/extending.md
@@ -67,7 +67,7 @@ assert cost == 0.005 # (1000/1000 * 0.002) + (500/1000 * 0.006)
## Supporting New LLM Providers
-Shekel uses a pluggable `ProviderAdapter` pattern. To add support for a new provider (e.g., Cohere, Mistral), implement `ProviderAdapter` and register it – no changes to core Shekel code required.
+Shekel uses a pluggable `ProviderAdapter` pattern. Built-in adapters cover **OpenAI**, **Anthropic**, and **LiteLLM** (which in turn routes to 100+ providers). To add support for a provider not covered by LiteLLM (e.g., a proprietary API or a very new SDK), implement `ProviderAdapter` and register it – no changes to core Shekel code required.
### The ProviderAdapter Interface
diff --git a/docs/how-it-works.md b/docs/how-it-works.md
index 54f4a3b..2ac4a5f 100644
--- a/docs/how-it-works.md
+++ b/docs/how-it-works.md
@@ -40,6 +40,10 @@ with budget(max_usd=1.00):
- `anthropic.resources.messages.Messages.create` (sync)
- `anthropic.resources.messages.AsyncMessages.create` (async)
+**LiteLLM** (when `shekel[litellm]` is installed):
+- `litellm.completion` (sync)
+- `litellm.acompletion` (async)
+
### Patching Implementation
Shekel uses a pluggable `ProviderAdapter` pattern – each provider registers itself in `ADAPTER_REGISTRY`. `shekel/_patch.py` delegates all patching to the registry:
@@ -153,7 +157,7 @@ with budget(max_usd=5.00):
Shekel extracts tokens from API responses:
-**OpenAI:**
+**OpenAI / LiteLLM:**
```python
def _extract_openai_tokens(response):
input_tokens = response.usage.prompt_tokens
@@ -162,6 +166,8 @@ def _extract_openai_tokens(response):
return input_tokens, output_tokens, model
```
+LiteLLM uses the same OpenAI-compatible format regardless of the underlying provider, so the same extraction logic applies.
+
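+For example, a Gemini call routed through LiteLLM still returns OpenAI-shaped usage data, so the extractor above works unchanged (sketch, `messages` elided):
+
+```python
+response = litellm.completion(model="gemini/gemini-1.5-flash", messages=[...])
+input_tokens, output_tokens, model = _extract_openai_tokens(response)
+# model keeps its "gemini/" prefix and passes through to the pricing engine
+```
+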
**Anthropic:**
```python
def _extract_anthropic_tokens(response):
diff --git a/docs/index.md b/docs/index.md
index 8595b09..52e46c0 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -111,7 +111,7 @@ I built shekel so you don't have to learn that lesson yourself.
---
- Works with LangGraph, CrewAI, AutoGen, LlamaIndex, Haystack, and any framework that calls OpenAI or Anthropic.
+ Works with LangGraph, CrewAI, AutoGen, LlamaIndex, Haystack, and any framework that calls OpenAI, Anthropic, or LiteLLM.
- :material-web:{ .lg .middle } **Async & Streaming**
@@ -144,6 +144,12 @@ I built shekel so you don't have to learn that lesson yourself.
pip install shekel[anthropic]
```
+=== "LiteLLM (100+ providers)"
+
+ ```bash
+ pip install shekel[litellm]
+ ```
+
=== "Both"
```bash
@@ -217,7 +223,7 @@ print(f"Remaining: ${b.remaining:.4f}")
## What's New in v0.2.6
-**Breaking-Change Release** – Cleaner API with dict-based fallback, renamed callbacks, removed deprecated parameters, and new call-count budgets.
+**Breaking-Change Release** – Cleaner API with dict-based fallback, renamed callbacks, removed deprecated parameters, new call-count budgets, LiteLLM support, and a LangGraph convenience helper.
@@ -239,6 +245,28 @@ print(f"Remaining: ${b.remaining:.4f}")
New `max_llm_calls` parameter limits by number of LLM API calls, combinable with `max_usd`.
+- :material-transit-connection-variant:{ .lg .middle } **[LiteLLM Support](integrations/litellm.md)**
+
+ ---
+
+    Native adapter for LiteLLM – track costs across 100+ providers (Gemini, Cohere, Ollama, Azure, Bedrock, and more) with zero extra code.
+
+    ```bash
+ pip install shekel[litellm]
+ ```
+
+- :material-graph:{ .lg .middle } **[LangGraph Helper](integrations/langgraph.md)**
+
+ ---
+
+ New `budgeted_graph()` context manager for cleaner LangGraph integration.
+
+ ```python
+ from shekel.integrations.langgraph import budgeted_graph
+ with budgeted_graph(max_usd=0.50) as b:
+ result = app.invoke(state)
+ ```
+
---
@@ -279,6 +307,8 @@ print(f"Remaining: ${b.remaining:.4f}")
Built-in pricing for GPT-4o, GPT-4o-mini, o1, Claude 3.5 Sonnet, Claude 3 Haiku, Gemini 1.5, and more.
+Install `shekel[litellm]` to track costs across 100+ providers through LiteLLM's unified interface.
+
Install `shekel[all-models]` for 400+ models via [tokencost](https://github.com/AgentOps-AI/tokencost).
[See full model list →](models.md)
diff --git a/docs/installation.md b/docs/installation.md
index 148c0f3..469f7d4 100644
--- a/docs/installation.md
+++ b/docs/installation.md
@@ -3,8 +3,9 @@
## Requirements
- Python 3.9 or higher
-- OpenAI SDK (optional) - for OpenAI models
-- Anthropic SDK (optional) - for Anthropic models
+- OpenAI SDK (optional) – for OpenAI models
+- Anthropic SDK (optional) – for Anthropic models
+- LiteLLM (optional) – for 100+ providers via a unified interface
## Install Shekel
@@ -34,6 +35,14 @@ If you're using models from both providers:
pip install shekel[all]
```
+### LiteLLM (100+ Providers)
+
+For access to OpenAI, Anthropic, Gemini, Cohere, Ollama, Azure, Bedrock, and 90+ more through a unified interface:
+
+```bash
+pip install shekel[litellm]
+```
+
### Extended Model Support (400+ Models)
For support of 400+ models via [tokencost](https://github.com/AgentOps-AI/tokencost):
@@ -97,6 +106,7 @@ Shekel has zero required dependencies beyond the Python standard library. The Op
|---------|-----------|---------|
| `openai>=1.0.0` | Optional | Track OpenAI API costs |
| `anthropic>=0.7.0` | Optional | Track Anthropic API costs |
+| `litellm>=1.0.0` | Optional | Track costs via LiteLLM (100+ providers) |
| `tokencost>=0.1.0` | Optional | Support 400+ models |
| `click>=8.0.0` | Optional | CLI tools |
@@ -118,6 +128,14 @@ If you see this error, install the Anthropic SDK:
pip install shekel[anthropic]
```
+### ImportError: No module named 'litellm'
+
+If you see this error, install LiteLLM:
+
+```bash
+pip install shekel[litellm]
+```
+
### Model pricing not found
For models not in shekel's built-in pricing table:
diff --git a/docs/integrations/langgraph.md b/docs/integrations/langgraph.md
index 0862425..a8a6b51 100644
--- a/docs/integrations/langgraph.md
+++ b/docs/integrations/langgraph.md
@@ -8,9 +8,26 @@ Shekel works seamlessly with [LangGraph](https://github.com/langchain-ai/langgra
pip install shekel[openai] "langgraph>=0.2"
```
+## Convenience Helper
+
+Shekel provides a `budgeted_graph()` context manager so you don't need to import `budget` directly:
+
+```python
+from shekel.integrations.langgraph import budgeted_graph
+
+app = graph.compile()
+
+with budgeted_graph(max_usd=0.50, name="research-graph") as b:
+ result = app.invoke({"question": "What is 2+2?", "answer": ""})
+ print(f"Answer: {result['answer']}")
+ print(f"Cost: ${b.spent:.4f}")
+```
+
+It accepts the same keyword arguments as `budget()` (`name`, `warn_at`, `fallback`, `max_llm_calls`, etc.) and yields the active budget object.
+
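+For example, combining a spend cap with an early warning and a fallback model:
+
+```python
+with budgeted_graph(
+    max_usd=0.50,
+    name="research-graph",
+    warn_at=0.8,
+    fallback={"at_pct": 0.9, "model": "gpt-4o-mini"},
+) as b:
+    result = app.invoke({"question": "What is 2+2?", "answer": ""})
+```
+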
## Basic Integration
-Wrap your LangGraph execution with a budget context:
+You can also use `budget()` directly – they are equivalent:
```python
from langgraph.graph import StateGraph, END
diff --git a/docs/integrations/litellm.md b/docs/integrations/litellm.md
new file mode 100644
index 0000000..2e63996
--- /dev/null
+++ b/docs/integrations/litellm.md
@@ -0,0 +1,179 @@
+# LiteLLM Integration
+
+Shekel natively supports [LiteLLM](https://github.com/BerriAI/litellm), the unified gateway that routes to 100+ LLM providers using an OpenAI-compatible interface.
+
+## Installation
+
+```bash
+pip install shekel[litellm]
+```
+
+Or alongside other extras:
+
+```bash
+pip install "shekel[litellm,langfuse]"
+```
+
+## Basic Usage
+
+Wrap any `litellm.completion` call in a budget context – no other changes needed:
+
+```python
+import litellm
+from shekel import budget
+
+with budget(max_usd=0.50) as b:
+ response = litellm.completion(
+ model="gpt-4o-mini",
+ messages=[{"role": "user", "content": "Hello!"}],
+ )
+ print(response.choices[0].message.content)
+
+print(f"Cost: ${b.spent:.4f}")
+```
+
+## Why LiteLLM + Shekel?
+
+LiteLLM routes to OpenAI, Anthropic, Gemini, Cohere, Ollama, Azure, Bedrock, and 90+ more. Shekel tracks the cost of every call regardless of which provider LiteLLM routes to.
+
+```python
+import litellm
+from shekel import budget
+
+with budget(max_usd=2.00) as b:
+ # OpenAI
+ litellm.completion(model="gpt-4o-mini", messages=[...])
+ # Anthropic
+ litellm.completion(model="claude-3-haiku-20240307", messages=[...])
+ # Google Gemini
+ litellm.completion(model="gemini/gemini-1.5-flash", messages=[...])
+
+print(f"Combined cost across providers: ${b.spent:.4f}")
+```
+
+## Async Support
+
+```python
+import asyncio
+import litellm
+from shekel import budget
+
+async def run():
+ async with budget(max_usd=1.00) as b:
+ response = await litellm.acompletion(
+ model="gpt-4o-mini",
+ messages=[{"role": "user", "content": "Hello async!"}],
+ )
+ print(response.choices[0].message.content)
+ print(f"Cost: ${b.spent:.4f}")
+
+asyncio.run(run())
+```
+
+## Streaming
+
+```python
+import litellm
+from shekel import budget
+
+with budget(max_usd=0.50) as b:
+ stream = litellm.completion(
+ model="gpt-4o-mini",
+ messages=[{"role": "user", "content": "Count to 5"}],
+ stream=True,
+ )
+ for chunk in stream:
+ print(chunk.choices[0].delta.content or "", end="", flush=True)
+
+print(f"\nStreaming cost: ${b.spent:.4f}")
+```
+
+## Budget Enforcement
+
+Hard cap and early warnings work exactly as with any other provider:
+
+```python
+import litellm
+from shekel import budget, BudgetExceededError
+
+try:
+ with budget(max_usd=0.10, warn_at=0.8) as b:
+ for i in range(100):
+ litellm.completion(
+ model="gpt-4o",
+ messages=[{"role": "user", "content": f"Question {i}"}],
+ )
+except BudgetExceededError as e:
+ print(f"Stopped at ${e.spent:.4f} after {b.call_count} calls")
+```
+
+## Fallback Models
+
+Switch to a cheaper LiteLLM-routed model when budget runs low:
+
+```python
+import litellm
+from shekel import budget
+
+with budget(
+ max_usd=1.00,
+ fallback={"at_pct": 0.8, "model": "gpt-4o-mini"},
+) as b:
+ response = litellm.completion(
+ model="gpt-4o",
+ messages=[{"role": "user", "content": "Hello"}],
+ )
+
+if b.model_switched:
+ print(f"Switched to {b.fallback['model']} at ${b.switched_at_usd:.4f}")
+```
+
+## How It Works
+
+Shekel's `LiteLLMAdapter` patches the `litellm.completion` and `litellm.acompletion` module-level functions when the first `budget()` context is entered, and restores them when the last one exits.
+
+LiteLLM returns responses in OpenAI-compatible format (`response.usage.prompt_tokens`, `response.usage.completion_tokens`), so token extraction is straightforward regardless of which underlying provider was used.
+
+Model names may include a provider prefix (e.g. `gemini/gemini-1.5-flash`, `anthropic/claude-3-haiku-20240307`). Shekel passes these through to its pricing engine, which falls back to [tokencost](https://github.com/AgentOps-AI/tokencost) for extended model coverage.
+
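+A short sketch (`messages` elided); because the prefixed name passes through, per-model cost summaries keep the provider visible:
+
+```python
+with budget(max_usd=1.00) as b:
+    litellm.completion(model="gemini/gemini-1.5-flash", messages=[...])
+
+print(b.summary())  # costs attributed to the prefixed model name
+```
+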
+## Extended Model Pricing
+
+For accurate pricing on the full range of providers LiteLLM supports:
+
+```bash
+pip install "shekel[litellm,all-models]"
+```
+
+This installs `tokencost`, which covers 400+ models including Gemini, Cohere, Mistral, and many more.
+
+## With LangGraph or CrewAI
+
+LiteLLM can serve as the LLM backend for agent frameworks. Shekel tracks costs regardless:
+
+```python
+from typing import TypedDict
+
+from langgraph.graph import StateGraph, END
+import litellm
+from shekel import budget
+
+class State(TypedDict):
+    question: str
+    answer: str
+
+def call_litellm(state: State) -> dict:
+    response = litellm.completion(
+        model="gemini/gemini-1.5-flash",
+        messages=[{"role": "user", "content": state["question"]}],
+    )
+    return {"answer": response.choices[0].message.content}
+
+graph = StateGraph(State)
+graph.add_node("llm", call_litellm)
+graph.set_entry_point("llm")
+graph.add_edge("llm", END)
+app = graph.compile()
+
+with budget(max_usd=0.50) as b:
+ result = app.invoke({"question": "What is 2+2?", "answer": ""})
+ print(f"Cost: ${b.spent:.4f}")
+```
+
+## Next Steps
+
+- [LangGraph Integration](langgraph.md)
+- [Extending Shekel](../extending.md) – add your own provider adapter
+- [Supported Models](../models.md)
diff --git a/docs/quickstart.md b/docs/quickstart.md
index 2f6ecc5..f7b2a3f 100644
--- a/docs/quickstart.md
+++ b/docs/quickstart.md
@@ -9,7 +9,8 @@ Choose the installation that matches your LLM provider:
```bash
pip install shekel[openai] # For OpenAI
pip install shekel[anthropic] # For Anthropic
-pip install shekel[all] # For both
+pip install shekel[litellm] # For 100+ providers via LiteLLM
+pip install shekel[all] # For OpenAI + Anthropic + LiteLLM
```
## Step 2: Import and Use
@@ -236,18 +237,35 @@ print(f"\nStreaming cost: ${b.spent:.4f}")
## Working with Frameworks
-Shekel works automatically with any framework that uses OpenAI or Anthropic under the hood.
+Shekel works automatically with any framework that uses OpenAI, Anthropic, or LiteLLM under the hood.
-### LangGraph
+### LiteLLM
+
+Track costs across 100+ providers with a single adapter:
```python
-from langgraph.graph import StateGraph, END
+import litellm
from shekel import budget
+with budget(max_usd=0.50) as b:
+ response = litellm.completion(
+ model="gpt-4o-mini",
+ messages=[{"role": "user", "content": "Hello!"}],
+ )
+print(f"Cost: ${b.spent:.4f}")
+```
+
+### LangGraph
+
+Use `budget()` directly, or the convenience `budgeted_graph()` helper:
+
+```python
+from shekel.integrations.langgraph import budgeted_graph
+
# Your graph definition here
app = graph.compile()
-with budget(max_usd=0.50) as b:
+with budgeted_graph(max_usd=0.50, name="my-graph") as b:
result = app.invoke({"question": "What is 2+2?"})
print(f"Graph execution cost: ${b.spent:.4f}")
```
diff --git a/examples/langgraph_demo.py b/examples/langgraph_demo.py
index 0653afb..72f0e99 100644
--- a/examples/langgraph_demo.py
+++ b/examples/langgraph_demo.py
@@ -1,14 +1,17 @@
# Requires: pip install shekel[openai] "langgraph>=0.2"
"""
-Minimal LangGraph demo showing shekel budget enforcement.
+LangGraph demo: budget enforcement with the budgeted_graph() helper.
-This example builds a simple one-node LangGraph that calls OpenAI,
-wrapped in a shekel budget context to track and cap spend.
+Shows three patterns:
+1. budgeted_graph() convenience helper (recommended)
+2. budget() directly – equivalent but more verbose
+3. Fallback model when budget threshold is reached
"""
import os
from shekel import BudgetExceededError, budget
+from shekel.integrations.langgraph import budgeted_graph
def main() -> None:
@@ -46,14 +49,45 @@ def call_llm(state: State) -> State:
graph.add_edge("llm", END)
app = graph.compile()
+ # ------------------------------------------------------------------
+    # 1. budgeted_graph() – recommended convenience helper
+ # ------------------------------------------------------------------
+ print("=== budgeted_graph() helper ===")
try:
- with budget(max_usd=0.10, warn_at=0.8) as b:
+ with budgeted_graph(max_usd=0.10, name="demo", warn_at=0.8) as b:
result = app.invoke({"question": "What is 2+2?", "answer": ""})
print(f"Answer: {result['answer']}")
print(f"Spent: ${b.spent:.4f} / ${b.limit:.2f}")
except BudgetExceededError as e:
print(f"Budget exceeded: {e}")
+ # ------------------------------------------------------------------
+    # 2. budget() directly – same result, more explicit
+ # ------------------------------------------------------------------
+ print("\n=== budget() directly ===")
+ try:
+ with budget(max_usd=0.10) as b:
+ result = app.invoke({"question": "Name a planet.", "answer": ""})
+ print(f"Answer: {result['answer']}")
+ print(f"Spent: ${b.spent:.4f}")
+ except BudgetExceededError as e:
+ print(f"Budget exceeded: {e}")
+
+ # ------------------------------------------------------------------
+ # 3. Fallback model when threshold is reached
+ # ------------------------------------------------------------------
+ print("\n=== Fallback model ===")
+ with budgeted_graph(
+ max_usd=0.001,
+ name="fallback-demo",
+ fallback={"at_pct": 0.5, "model": "gpt-4o-mini"},
+ ) as b:
+ result = app.invoke({"question": "What is the capital of France?", "answer": ""})
+ print(f"Answer: {result['answer']}")
+ if b.model_switched:
+ print(f"Switched to fallback at ${b.switched_at_usd:.6f}")
+ print(f"Total: ${b.spent:.4f}")
+
if __name__ == "__main__":
main()
diff --git a/examples/litellm_basic.py b/examples/litellm_basic.py
new file mode 100644
index 0000000..310e391
--- /dev/null
+++ b/examples/litellm_basic.py
@@ -0,0 +1,136 @@
+# Requires: pip install shekel[litellm]
+"""
+LiteLLM examples: basic tracking, multi-provider, streaming, fallback, async.
+
+LiteLLM routes to 100+ providers (OpenAI, Anthropic, Gemini, Cohere, Ollama,
+Azure, Bedrock, Mistral, and more) through a unified OpenAI-compatible interface.
+Shekel's LiteLLMAdapter patches litellm.completion and litellm.acompletion,
+so every call is tracked automatically – no matter which provider is used.
+"""
+
+import asyncio
+import os
+
+from shekel import BudgetExceededError, budget
+
+
+def main() -> None:
+ try:
+ import litellm
+ except ImportError:
+ print("Run: pip install shekel[litellm]")
+ return
+
+ api_key = os.environ.get("OPENAI_API_KEY")
+ if not api_key:
+ print("Set OPENAI_API_KEY (or the key for your chosen provider) to run this demo.")
+ return
+
+ # ------------------------------------------------------------------
+ # 1. Basic budget tracking
+ # ------------------------------------------------------------------
+ print("=== Basic budget ===")
+ try:
+ with budget(max_usd=0.10, warn_at=0.8) as b:
+ response = litellm.completion(
+ model="gpt-4o-mini",
+ messages=[{"role": "user", "content": "Say hello in one sentence."}],
+ )
+ print(response.choices[0].message.content)
+ print(f"Spent: ${b.spent:.4f}")
+ except BudgetExceededError as e:
+ print(f"Budget exceeded: {e}")
+
+ # ------------------------------------------------------------------
+ # 2. Multi-provider: route to different backends in one budget
+ # ------------------------------------------------------------------
+ print("\n=== Multi-provider under one budget ===")
+ with budget(max_usd=1.00, name="multi-provider") as b:
+ # OpenAI
+ r1 = litellm.completion(
+ model="gpt-4o-mini",
+ messages=[{"role": "user", "content": "Name a color."}],
+ max_tokens=5,
+ )
+ print(f"OpenAI: {r1.choices[0].message.content.strip()}")
+
+        # Anthropic – set ANTHROPIC_API_KEY to run this leg
+ if os.environ.get("ANTHROPIC_API_KEY"):
+ r2 = litellm.completion(
+ model="claude-3-haiku-20240307",
+ messages=[{"role": "user", "content": "Name a fruit."}],
+ max_tokens=5,
+ )
+ print(f"Anthropic: {r2.choices[0].message.content.strip()}")
+
+ print(f"Combined cost: ${b.spent:.4f}")
+ print(b.summary())
+
+ # ------------------------------------------------------------------
+ # 3. Streaming
+ # ------------------------------------------------------------------
+ print("\n=== Streaming ===")
+ with budget(max_usd=0.10) as b:
+ stream = litellm.completion(
+ model="gpt-4o-mini",
+ messages=[{"role": "user", "content": "Count from 1 to 5."}],
+ stream=True,
+ )
+ for chunk in stream:
+ print(chunk.choices[0].delta.content or "", end="", flush=True)
+ print(f"\nStreaming cost: ${b.spent:.4f}")
+
+ # ------------------------------------------------------------------
+ # 4. Fallback to cheaper model
+ # ------------------------------------------------------------------
+ print("\n=== Fallback model ===")
+ with budget(max_usd=0.001, fallback={"at_pct": 0.5, "model": "gpt-4o-mini"}) as b:
+ response = litellm.completion(
+ model="gpt-4o",
+ messages=[{"role": "user", "content": "What is the capital of France?"}],
+ max_tokens=20,
+ )
+ print(response.choices[0].message.content)
+ if b.model_switched:
+ print(f"Switched to fallback at ${b.switched_at_usd:.6f}")
+ print(f"Total: ${b.spent:.4f}")
+
+ # ------------------------------------------------------------------
+ # 5. Call-count limit (useful for rate-limit-aware agents)
+ # ------------------------------------------------------------------
+ print("\n=== Call-count limit ===")
+ questions = ["What is 1+1?", "What is 2+2?", "What is 3+3?", "What is 4+4?"]
+ answered = []
+ try:
+ with budget(max_llm_calls=2) as b:
+ for q in questions:
+ r = litellm.completion(
+ model="gpt-4o-mini",
+ messages=[{"role": "user", "content": q}],
+ max_tokens=5,
+ )
+ answered.append(r.choices[0].message.content.strip())
+ except BudgetExceededError:
+ pass
+ print(f"Answered {len(answered)}/{len(questions)} questions (limit: 2 calls)")
+
+ # ------------------------------------------------------------------
+ # 6. Async usage
+ # ------------------------------------------------------------------
+ print("\n=== Async ===")
+
+ async def async_example() -> None:
+ async with budget(max_usd=0.10) as b:
+ response = await litellm.acompletion(
+ model="gpt-4o-mini",
+ messages=[{"role": "user", "content": "What is Python in one sentence?"}],
+ max_tokens=30,
+ )
+ print(response.choices[0].message.content)
+ print(f"Async cost: ${b.spent:.4f}")
+
+ asyncio.run(async_example())
+
+
+if __name__ == "__main__":
+ main()
diff --git a/mkdocs.yml b/mkdocs.yml
index befdb99..47623db 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -114,11 +114,12 @@ nav:
- Streaming: usage/streaming.md
- Decorators: usage/decorators.md
- Integrations:
- - Langfuse: integrations/langfuse.md
+ - LiteLLM: integrations/litellm.md
- LangGraph: integrations/langgraph.md
- CrewAI: integrations/crewai.md
- OpenAI: integrations/openai.md
- Anthropic: integrations/anthropic.md
+ - Langfuse: integrations/langfuse.md
- Reference:
- CLI Tools: cli.md
- API Reference: api-reference.md
diff --git a/pyproject.toml b/pyproject.toml
index a53c723..17f72b2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -62,7 +62,8 @@ dependencies = []
openai = ["openai>=1.0.0"]
anthropic = ["anthropic>=0.7.0"]
langfuse = ["langfuse>=2.0.0"]
-all = ["openai>=1.0.0", "anthropic>=0.7.0", "langfuse>=2.0.0"]
+litellm = ["litellm>=1.0.0"]
+all = ["openai>=1.0.0", "anthropic>=0.7.0", "langfuse>=2.0.0", "litellm>=1.0.0"]
all-models = ["openai>=1.0.0", "anthropic>=0.7.0", "langfuse>=2.0.0", "tokencost>=0.1.0"]
cli = ["click>=8.0.0"]
dev = [
@@ -102,14 +103,17 @@ benchmark_warmup = false
[tool.black]
line-length = 100
target-version = ["py39"]
+exclude = "examples/"
[tool.isort]
profile = "black"
line_length = 100
+skip_glob = ["examples/*"]
[tool.ruff]
target-version = "py39"
line-length = 100
+exclude = ["examples"]
[tool.ruff.lint]
select = ["E", "F", "I", "UP"]
@@ -127,3 +131,7 @@ ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "langfuse"
ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "litellm"
+ignore_missing_imports = true
diff --git a/shekel/_patch.py b/shekel/_patch.py
index faf2840..e532df8 100644
--- a/shekel/_patch.py
+++ b/shekel/_patch.py
@@ -332,3 +332,89 @@ async def _wrap_anthropic_stream_async(stream: Any) -> Any:
yield event
finally:
_record(input_tokens, output_tokens, model)
+
+
+# ---------------------------------------------------------------------------
+# LiteLLM sync wrapper
+# ---------------------------------------------------------------------------
+
+
+def _litellm_sync_wrapper(*args: Any, **kwargs: Any) -> Any:
+ original = _originals.get("litellm_sync")
+ if original is None:
+ raise RuntimeError("shekel: litellm original not stored")
+
+ active_budget = _context.get_active_budget()
+ if active_budget is not None:
+ _apply_fallback_if_needed(active_budget, kwargs, "litellm")
+
+ if kwargs.get("stream") is True:
+ kwargs.setdefault("stream_options", {})["include_usage"] = True
+ stream = original(*args, **kwargs)
+ return _wrap_litellm_stream(stream)
+
+ response = original(*args, **kwargs)
+ input_tokens, output_tokens, model = _extract_openai_tokens(response)
+ _record(input_tokens, output_tokens, model)
+ return response
+
+
+def _wrap_litellm_stream(stream: Any) -> Generator[Any, None, None]:
+ seen: list[tuple[int, int, str]] = []
+ try:
+ for chunk in stream:
+ if getattr(chunk, "usage", None) is not None:
+ try:
+ it = chunk.usage.prompt_tokens or 0
+ ot = chunk.usage.completion_tokens or 0
+ m = getattr(chunk, "model", None) or "unknown"
+ seen.append((it, ot, m))
+ except AttributeError:
+ pass
+ yield chunk
+ finally:
+ it, ot, m = seen[-1] if seen else (0, 0, "unknown")
+ _record(it, ot, m)
+
+
+# ---------------------------------------------------------------------------
+# LiteLLM async wrapper
+# ---------------------------------------------------------------------------
+
+
+async def _litellm_async_wrapper(*args: Any, **kwargs: Any) -> Any:
+ original = _originals.get("litellm_async")
+ if original is None:
+ raise RuntimeError("shekel: litellm async original not stored")
+
+ active_budget = _context.get_active_budget()
+ if active_budget is not None:
+ _apply_fallback_if_needed(active_budget, kwargs, "litellm")
+
+ if kwargs.get("stream") is True:
+ kwargs.setdefault("stream_options", {})["include_usage"] = True
+ stream = await original(*args, **kwargs)
+ return _wrap_litellm_stream_async(stream)
+
+ response = await original(*args, **kwargs)
+ input_tokens, output_tokens, model = _extract_openai_tokens(response)
+ _record(input_tokens, output_tokens, model)
+ return response
+
+
+async def _wrap_litellm_stream_async(stream: Any) -> Any:
+ seen: list[tuple[int, int, str]] = []
+ try:
+ async for chunk in stream:
+ if getattr(chunk, "usage", None) is not None:
+ try:
+ it = chunk.usage.prompt_tokens or 0
+ ot = chunk.usage.completion_tokens or 0
+ m = getattr(chunk, "model", None) or "unknown"
+ seen.append((it, ot, m))
+ except AttributeError:
+ pass
+ yield chunk
+ finally:
+ it, ot, m = seen[-1] if seen else (0, 0, "unknown")
+ _record(it, ot, m)
diff --git a/shekel/providers/__init__.py b/shekel/providers/__init__.py
index 18edaae..23aec21 100644
--- a/shekel/providers/__init__.py
+++ b/shekel/providers/__init__.py
@@ -16,10 +16,18 @@
ADAPTER_REGISTRY.register(OpenAIAdapter())
ADAPTER_REGISTRY.register(AnthropicAdapter())
+try:
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ ADAPTER_REGISTRY.register(LiteLLMAdapter())
+except ImportError:
+ pass
+
__all__ = [
"ADAPTER_REGISTRY",
"ProviderAdapter",
"ProviderRegistry",
"OpenAIAdapter",
"AnthropicAdapter",
+ "LiteLLMAdapter",
]
diff --git a/shekel/providers/litellm.py b/shekel/providers/litellm.py
new file mode 100644
index 0000000..4a2a398
--- /dev/null
+++ b/shekel/providers/litellm.py
@@ -0,0 +1,90 @@
+"""LiteLLM provider adapter for Shekel LLM cost tracking."""
+
+from __future__ import annotations
+
+from collections.abc import Generator
+from typing import Any
+
+from shekel.providers.base import ProviderAdapter
+
+
+class LiteLLMAdapter(ProviderAdapter):
+ """Adapter for LiteLLM's unified completion API.
+
+ LiteLLM exposes an OpenAI-compatible interface (litellm.completion /
+ litellm.acompletion) that routes to 100+ providers. This adapter patches
+ those module-level functions to track cost inside shekel budgets.
+ """
+
+ def __init__(self) -> None:
+ self._originals: dict[str, Any] = {}
+
+ @property
+ def name(self) -> str:
+ return "litellm"
+
+ def install_patches(self) -> None:
+ """Monkey-patch litellm.completion and litellm.acompletion."""
+ from shekel import _patch
+
+ try:
+ import litellm
+
+ if "litellm_sync" not in _patch._originals:
+ _patch._originals["litellm_sync"] = litellm.completion
+ _patch._originals["litellm_async"] = litellm.acompletion
+ litellm.completion = _patch._litellm_sync_wrapper
+ litellm.acompletion = _patch._litellm_async_wrapper
+ except ImportError:
+ pass
+
+ def remove_patches(self) -> None:
+ """Restore original litellm functions."""
+ from shekel import _patch
+
+ try:
+ import litellm
+
+ if "litellm_sync" in _patch._originals:
+ litellm.completion = _patch._originals.pop("litellm_sync")
+ if "litellm_async" in _patch._originals:
+ litellm.acompletion = _patch._originals.pop("litellm_async")
+ except ImportError:
+ pass
+
+ def extract_tokens(self, response: Any) -> tuple[int, int, str]:
+ """Extract tokens from a LiteLLM non-streaming response.
+
+ LiteLLM uses the OpenAI format: usage.prompt_tokens / usage.completion_tokens.
+ Model names may include a provider prefix (e.g. 'openai/gpt-4o').
+ """
+ try:
+ usage = response.usage
+ if usage is None:
+ model = getattr(response, "model", None) or "unknown"
+ return 0, 0, model
+ input_tokens = usage.prompt_tokens or 0
+ output_tokens = usage.completion_tokens or 0
+ model = getattr(response, "model", None) or "unknown"
+ return input_tokens, output_tokens, model
+ except AttributeError:
+ return 0, 0, "unknown"
+
+ def detect_streaming(self, kwargs: dict[str, Any], response: Any) -> bool:
+ """Detect streaming via the 'stream' kwarg (same as OpenAI)."""
+ return kwargs.get("stream") is True
+
+ def wrap_stream(self, stream: Any) -> Generator[Any, None, tuple[int, int, str]]:
+ """Wrap a LiteLLM streaming response to collect token counts."""
+ seen: list[tuple[int, int, str]] = []
+ for chunk in stream:
+ if getattr(chunk, "usage", None) is not None:
+ try:
+ it = chunk.usage.prompt_tokens or 0
+ ot = chunk.usage.completion_tokens or 0
+ m = getattr(chunk, "model", None) or "unknown"
+ seen.append((it, ot, m))
+ except AttributeError:
+ pass
+ yield chunk
+ return seen[-1] if seen else (0, 0, "unknown")
diff --git a/tests/providers/conftest.py b/tests/providers/conftest.py
index d1587c3..8e30c3a 100644
--- a/tests/providers/conftest.py
+++ b/tests/providers/conftest.py
@@ -57,6 +57,23 @@ def __init__(self, model: str, input_tokens: int):
self.usage = MockUsage(input_tokens=input_tokens)
+class MockLiteLLMResponse:
+ """Mock LiteLLM API response (OpenAI-compatible format)."""
+
+ def __init__(self, model: str, input_tokens: int, output_tokens: int):
+ self.model = model
+ self.usage = MockUsage(prompt_tokens=input_tokens, completion_tokens=output_tokens)
+
+
+class MockLiteLLMChunk:
+ """Mock LiteLLM streaming chunk."""
+
+ def __init__(self, model: str | None = None, usage: MockUsage | None = None):
+ self.model = model
+ self.usage = usage
+ self.choices = []
+
+
class ProviderTestBase:
"""Base class providing mock response factories for provider testing."""
@@ -85,6 +102,23 @@ def make_openai_stream(
usage=MockUsage(prompt_tokens=input_tokens, completion_tokens=output_tokens),
)
+ def make_litellm_response(
+ self, model: str = "gpt-4o", input_tokens: int = 0, output_tokens: int = 0
+ ) -> MockLiteLLMResponse:
+ """Create a mock LiteLLM API response."""
+ return MockLiteLLMResponse(model, input_tokens, output_tokens)
+
+ def make_litellm_stream(
+ self, model: str = "gpt-4o", input_tokens: int = 0, output_tokens: int = 0
+ ) -> Generator[MockLiteLLMChunk, None, None]:
+ """Create a mock LiteLLM streaming response."""
+ yield MockLiteLLMChunk(model=model)
+ yield MockLiteLLMChunk(model=model)
+ yield MockLiteLLMChunk(
+ model=model,
+ usage=MockUsage(prompt_tokens=input_tokens, completion_tokens=output_tokens),
+ )
+
def make_anthropic_stream(
self, model: str = "claude-3-haiku-20240307", input_tokens: int = 0, output_tokens: int = 0
) -> Generator[MockAnthropicEvent, None, None]:
diff --git a/tests/providers/test_litellm_adapter.py b/tests/providers/test_litellm_adapter.py
new file mode 100644
index 0000000..61330bd
--- /dev/null
+++ b/tests/providers/test_litellm_adapter.py
@@ -0,0 +1,265 @@
+"""TDD tests for LiteLLMAdapter."""
+
+from __future__ import annotations
+
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from tests.providers.conftest import MockLiteLLMChunk, ProviderTestBase
+
+
+class TestLiteLLMAdapterBasic(ProviderTestBase):
+
+ def test_name_is_litellm(self):
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ assert adapter.name == "litellm"
+
+ def test_implements_provider_adapter(self):
+ from shekel.providers.base import ProviderAdapter
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ assert isinstance(adapter, ProviderAdapter)
+
+
+class TestLiteLLMTokenExtraction(ProviderTestBase):
+
+ def test_extract_tokens_from_valid_response(self):
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ response = self.make_litellm_response("gpt-4o", 100, 50)
+ input_tok, output_tok, model = adapter.extract_tokens(response)
+ assert input_tok == 100
+ assert output_tok == 50
+ assert model == "gpt-4o"
+
+ def test_extract_tokens_handles_none_usage(self):
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ response = MagicMock()
+ response.usage = None
+ response.model = "gpt-4o"
+ input_tok, output_tok, model = adapter.extract_tokens(response)
+ assert input_tok == 0
+ assert output_tok == 0
+
+ def test_extract_tokens_handles_missing_usage_attribute(self):
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ response = MagicMock(spec=[])
+ input_tok, output_tok, model = adapter.extract_tokens(response)
+ assert input_tok == 0
+ assert output_tok == 0
+ assert model == "unknown"
+
+ def test_extract_tokens_handles_zero_tokens(self):
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ response = self.make_litellm_response("gpt-4o", 0, 0)
+ input_tok, output_tok, model = adapter.extract_tokens(response)
+ assert input_tok == 0
+ assert output_tok == 0
+
+ def test_extract_tokens_handles_litellm_prefixed_model(self):
+ """LiteLLM may return model names like 'openai/gpt-4o'."""
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ response = self.make_litellm_response("openai/gpt-4o", 200, 80)
+ input_tok, output_tok, model = adapter.extract_tokens(response)
+ assert input_tok == 200
+ assert output_tok == 80
+ assert model == "openai/gpt-4o"
+
+
+class TestLiteLLMStreamDetection(ProviderTestBase):
+
+ def test_detect_streaming_true_when_stream_kwarg_set(self):
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ assert adapter.detect_streaming({"stream": True}, None) is True
+
+ def test_detect_streaming_false_when_no_kwarg(self):
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ assert adapter.detect_streaming({}, None) is False
+
+ def test_detect_streaming_false_when_stream_false(self):
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ assert adapter.detect_streaming({"stream": False}, None) is False
+
+
+class TestLiteLLMStreamWrapping(ProviderTestBase):
+
+ def test_wrap_stream_yields_all_chunks(self):
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ stream = self.make_litellm_stream("gpt-4o", 100, 50)
+ chunks = list(adapter.wrap_stream(stream))
+ assert len(chunks) == 3
+
+ def test_wrap_stream_collects_tokens_from_final_chunk(self):
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ stream = self.make_litellm_stream("gpt-4o", 100, 50)
+ gen = adapter.wrap_stream(stream)
+ try:
+ while True:
+ next(gen)
+ except StopIteration as e:
+ input_tok, output_tok, model = e.value
+ assert input_tok == 100
+ assert output_tok == 50
+ assert model == "gpt-4o"
+
+ def test_wrap_stream_returns_unknown_if_no_usage_chunk(self):
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ chunks = [MockLiteLLMChunk(model="gpt-4o", usage=None) for _ in range(3)]
+ gen = adapter.wrap_stream(iter(chunks))
+ try:
+ while True:
+ next(gen)
+ except StopIteration as e:
+ input_tok, output_tok, model = e.value
+ assert (input_tok, output_tok) == (0, 0)
+ assert model == "unknown"
+
+ def test_wrap_stream_handles_chunk_attribute_error(self):
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+
+ class BrokenUsage:
+ def __getattr__(self, name: str) -> None:
+ raise AttributeError(f"broken: {name}")
+
+ def stream_with_bad_usage():
+ chunk1 = MagicMock()
+ chunk1.usage = BrokenUsage()
+ chunk1.model = "gpt-4o"
+ yield chunk1
+ chunk2 = MagicMock()
+ chunk2.usage = None
+ chunk2.model = "gpt-4o"
+ yield chunk2
+
+ gen = adapter.wrap_stream(stream_with_bad_usage())
+ chunks = list(gen)
+ assert len(chunks) == 2
+
+
+class TestLiteLLMPatching(ProviderTestBase):
+
+ def test_install_patches_when_litellm_available(self):
+ import litellm
+
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ original = litellm.completion
+ adapter.install_patches()
+ assert litellm.completion is not original
+ adapter.remove_patches()
+
+ def test_remove_patches_restores_originals(self):
+ import litellm
+
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ original = litellm.completion
+ adapter.install_patches()
+ adapter.remove_patches()
+ assert litellm.completion is original
+
+ def test_install_patches_idempotent(self):
+ """Calling install_patches twice does not double-wrap."""
+ import litellm
+
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ original = litellm.completion
+ adapter.install_patches()
+ after_first = litellm.completion
+ adapter.install_patches() # second call: should be a no-op
+ assert litellm.completion is after_first
+ adapter.remove_patches()
+ assert litellm.completion is original
+
+ def test_install_patches_safe_without_litellm(self):
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ with patch.dict("sys.modules", {"litellm": None}):
+ try:
+ adapter.install_patches()
+ except ImportError:
+ pass # missing litellm is acceptable; any other error should surface
+
+ def test_remove_patches_safe_without_litellm(self):
+ from shekel.providers.litellm import LiteLLMAdapter
+
+ adapter = LiteLLMAdapter()
+ with patch.dict("sys.modules", {"litellm": None}):
+ adapter.remove_patches() # Must not raise
+
+
+class TestLiteLLMCostRecording(ProviderTestBase):
+
+ def test_completion_records_cost(self):
+ """litellm.completion inside budget() records spend."""
+ import litellm
+
+ from shekel import budget
+
+ mock_response = self.make_litellm_response("gpt-4o-mini", 100, 50)
+
+ with patch.object(litellm, "completion", return_value=mock_response):
+ with budget(max_usd=1.0) as b:
+ litellm.completion(model="gpt-4o-mini", messages=[])
+ assert b.spent > 0
+
+ def test_completion_stream_records_cost(self):
+ """litellm.completion(stream=True) inside budget() records spend."""
+ import litellm
+
+ from shekel import budget
+
+ mock_stream = self.make_litellm_stream("gpt-4o-mini", 100, 50)
+
+ with patch.object(litellm, "completion", return_value=mock_stream):
+ with budget(max_usd=1.0) as b:
+ stream = litellm.completion(model="gpt-4o-mini", messages=[], stream=True)
+ for _ in stream:
+ pass
+ assert b.spent > 0
+
+ @pytest.mark.asyncio
+ async def test_acompletion_records_cost(self):
+ """litellm.acompletion inside budget() records spend."""
+ import litellm
+
+ from shekel import budget
+
+ mock_response = self.make_litellm_response("gpt-4o-mini", 100, 50)
+
+ with patch.object(litellm, "acompletion", new=AsyncMock(return_value=mock_response)):
+ with budget(max_usd=1.0) as b:
+ await litellm.acompletion(model="gpt-4o-mini", messages=[])
+ assert b.spent > 0
diff --git a/tests/providers/test_registry.py b/tests/providers/test_registry.py
index 5d2dcac..a72e735 100644
--- a/tests/providers/test_registry.py
+++ b/tests/providers/test_registry.py
@@ -285,3 +285,17 @@ def test_anthropic_adapter_registered(self):
anthropic_adapter = ADAPTER_REGISTRY.get_by_name("anthropic")
assert anthropic_adapter is not None
+
+ def test_litellm_import_error_is_swallowed(self):
+ """Lines 23-24: ImportError when litellm is absent is silently ignored."""
+ import importlib
+ import sys
+ from unittest.mock import patch
+
+ with patch.dict(sys.modules, {"litellm": None, "shekel.providers.litellm": None}):
+ # Re-importing with litellm blocked must not raise
+ import shekel.providers as providers_mod
+
+ importlib.reload(providers_mod)
+
+ # Reload again outside the patch so later tests still see the litellm adapter
+ importlib.reload(providers_mod)
diff --git a/tests/test_patch_coverage.py b/tests/test_patch_coverage.py
new file mode 100644
index 0000000..2252081
--- /dev/null
+++ b/tests/test_patch_coverage.py
@@ -0,0 +1,403 @@
+"""Tests to reach 100% coverage of shekel/_patch.py.
+
+Each test targets specific uncovered lines identified by coverage analysis.
+"""
+
+from __future__ import annotations
+
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+# ---------------------------------------------------------------------------
+# _validate_same_provider (line 69)
+# ---------------------------------------------------------------------------
+
+
+def test_validate_same_provider_anthropic_rejects_openai_model():
+ """Line 69: anthropic provider + openai fallback model raises ValueError."""
+ from shekel._patch import _validate_same_provider
+
+ with pytest.raises(ValueError, match="OpenAI model"):
+ _validate_same_provider("gpt-4o", "anthropic")
+
+
+# ---------------------------------------------------------------------------
+# _extract_openai_tokens (lines 101-102)
+# ---------------------------------------------------------------------------
+
+
+def test_extract_openai_tokens_attribute_error():
+ """Lines 101-102: response with no attributes returns (0, 0, 'unknown')."""
+ from shekel._patch import _extract_openai_tokens
+
+ response = MagicMock(spec=[]) # no attributes at all
+ assert _extract_openai_tokens(response) == (0, 0, "unknown")
+
+
+# ---------------------------------------------------------------------------
+# _record (lines 121-122 and 141-143)
+# ---------------------------------------------------------------------------
+
+
+def test_record_swallows_pricing_exception():
+ """Lines 121-122: if calculate_cost raises, cost falls back to 0.0."""
+ from shekel import budget
+ from shekel._patch import _record
+
+ with budget(max_usd=1.0) as b:
+ with patch("shekel._pricing.calculate_cost", side_effect=RuntimeError("bad")):
+ _record(100, 50, "gpt-4o")
+ assert b.spent == pytest.approx(0.0)
+
+
+def test_record_swallows_adapter_emit_exception():
+ """Lines 141-143: if AdapterRegistry.emit_event raises, exception is swallowed."""
+ from shekel import budget
+ from shekel._patch import _record
+
+ with budget(max_usd=1.0):
+ with patch(
+ "shekel.integrations.AdapterRegistry.emit_event",
+ side_effect=RuntimeError("adapter crash"),
+ ):
+ _record(100, 50, "gpt-4o-mini") # must not raise
+
+
+# ---------------------------------------------------------------------------
+# _openai_sync_wrapper (line 154)
+# ---------------------------------------------------------------------------
+
+
+def test_openai_sync_wrapper_raises_if_no_original():
+ """Line 154: RuntimeError when openai_sync not in _originals."""
+ from shekel._patch import _openai_sync_wrapper
+
+ with patch("shekel._patch._originals", {}):
+ with pytest.raises(RuntimeError, match="openai original not stored"):
+ _openai_sync_wrapper(None)
+
+
+# ---------------------------------------------------------------------------
+# _wrap_openai_stream (lines 182-183)
+# ---------------------------------------------------------------------------
+
+
+def test_wrap_openai_stream_swallows_chunk_attribute_error():
+ """Lines 182-183: chunk whose usage attrs raise AttributeError is handled."""
+ from shekel._patch import _wrap_openai_stream
+
+ class BrokenUsage:
+ def __getattr__(self, name: str) -> None:
+ raise AttributeError(name)
+
+ def stream():
+ chunk = MagicMock()
+ chunk.usage = BrokenUsage()
+ yield chunk
+
+ list(_wrap_openai_stream(stream())) # must not raise
+
+
+# ---------------------------------------------------------------------------
+# _openai_async_wrapper (line 201)
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_openai_async_wrapper_raises_if_no_original():
+ """Line 201: RuntimeError when openai_async not in _originals."""
+ from shekel._patch import _openai_async_wrapper
+
+ with patch("shekel._patch._originals", {}):
+ with pytest.raises(RuntimeError, match="openai async original not stored"):
+ await _openai_async_wrapper(None)
+
+
+# ---------------------------------------------------------------------------
+# _wrap_openai_stream_async (lines 229-230 and 236)
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_wrap_openai_stream_async_swallows_attribute_error():
+ """Lines 229-230: broken usage in async stream chunk is handled."""
+ from shekel._patch import _wrap_openai_stream_async
+
+ class BrokenUsage:
+ def __getattr__(self, name: str) -> None:
+ raise AttributeError(name)
+
+ async def stream():
+ chunk = MagicMock()
+ chunk.usage = BrokenUsage()
+ yield chunk
+
+ async for _ in _wrap_openai_stream_async(stream()):
+ pass
+
+
+@pytest.mark.asyncio
+async def test_wrap_openai_stream_async_no_usage_chunks():
+ """Line 236: when no chunk has usage, falls back to (0, 0, 'unknown')."""
+ from shekel import budget
+ from shekel._patch import _wrap_openai_stream_async
+
+ async def stream():
+ chunk = MagicMock()
+ chunk.usage = None
+ yield chunk
+
+ with budget(max_usd=1.0) as b:
+ async for _ in _wrap_openai_stream_async(stream()):
+ pass
+ assert b.spent == pytest.approx(0.0)
+
+
+# ---------------------------------------------------------------------------
+# _anthropic_sync_wrapper (line 248)
+# ---------------------------------------------------------------------------
+
+
+def test_anthropic_sync_wrapper_raises_if_no_original():
+ """Line 248: RuntimeError when anthropic_sync not in _originals."""
+ from shekel._patch import _anthropic_sync_wrapper
+
+ with patch("shekel._patch._originals", {}):
+ with pytest.raises(RuntimeError, match="anthropic original not stored"):
+ _anthropic_sync_wrapper(None)
+
+
+# ---------------------------------------------------------------------------
+# _wrap_anthropic_stream (lines 277-278 and 282-283)
+# ---------------------------------------------------------------------------
+
+
+def test_wrap_anthropic_stream_swallows_message_start_attribute_error():
+ """Lines 277-278: broken message_start event handled gracefully."""
+ from shekel._patch import _wrap_anthropic_stream
+
+ class BrokenMessage:
+ def __getattr__(self, name: str) -> None:
+ raise AttributeError(name)
+
+ class MessageStartEvent:
+ type = "message_start"
+ message = BrokenMessage()
+
+ list(_wrap_anthropic_stream(iter([MessageStartEvent()])))
+
+
+def test_wrap_anthropic_stream_swallows_message_delta_attribute_error():
+ """Lines 282-283: broken message_delta event handled gracefully."""
+ from shekel._patch import _wrap_anthropic_stream
+
+ class BrokenUsage:
+ def __getattr__(self, name: str) -> None:
+ raise AttributeError(name)
+
+ class MessageDeltaEvent:
+ type = "message_delta"
+ usage = BrokenUsage()
+
+ list(_wrap_anthropic_stream(iter([MessageDeltaEvent()])))
+
+
+# ---------------------------------------------------------------------------
+# _anthropic_async_wrapper (line 297)
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_anthropic_async_wrapper_raises_if_no_original():
+ """Line 297: RuntimeError when anthropic_async not in _originals."""
+ from shekel._patch import _anthropic_async_wrapper
+
+ with patch("shekel._patch._originals", {}):
+ with pytest.raises(RuntimeError, match="anthropic async original not stored"):
+ await _anthropic_async_wrapper(None)
+
+
+# ---------------------------------------------------------------------------
+# _wrap_anthropic_stream_async (lines 325-326 and 330-331)
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_wrap_anthropic_stream_async_swallows_message_start_error():
+ """Lines 325-326: broken message_start in async stream handled."""
+ from shekel._patch import _wrap_anthropic_stream_async
+
+ class BrokenMessage:
+ def __getattr__(self, name: str) -> None:
+ raise AttributeError(name)
+
+ class MessageStartEvent:
+ type = "message_start"
+ message = BrokenMessage()
+
+ async def stream():
+ yield MessageStartEvent()
+
+ async for _ in _wrap_anthropic_stream_async(stream()):
+ pass
+
+
+@pytest.mark.asyncio
+async def test_wrap_anthropic_stream_async_swallows_message_delta_error():
+ """Lines 330-331: broken message_delta in async stream handled."""
+ from shekel._patch import _wrap_anthropic_stream_async
+
+ class BrokenUsage:
+ def __getattr__(self, name: str) -> None:
+ raise AttributeError(name)
+
+ class MessageDeltaEvent:
+ type = "message_delta"
+ usage = BrokenUsage()
+
+ async def stream():
+ yield MessageDeltaEvent()
+
+ async for _ in _wrap_anthropic_stream_async(stream()):
+ pass
+
+
+# ---------------------------------------------------------------------------
+# _litellm_sync_wrapper (line 345)
+# ---------------------------------------------------------------------------
+
+
+def test_litellm_sync_wrapper_raises_if_no_original():
+ """Line 345: RuntimeError when litellm_sync not in _originals."""
+ from shekel._patch import _litellm_sync_wrapper
+
+ with patch("shekel._patch._originals", {}):
+ with pytest.raises(RuntimeError, match="litellm original not stored"):
+ _litellm_sync_wrapper()
+
+
+# ---------------------------------------------------------------------------
+# _wrap_litellm_stream (lines 372-373)
+# ---------------------------------------------------------------------------
+
+
+def test_wrap_litellm_stream_swallows_chunk_attribute_error():
+ """Lines 372-373: broken usage attrs in litellm stream chunk handled."""
+ from shekel._patch import _wrap_litellm_stream
+
+ class BrokenUsage:
+ def __getattr__(self, name: str) -> None:
+ raise AttributeError(name)
+
+ def stream():
+ chunk = MagicMock()
+ chunk.usage = BrokenUsage()
+ yield chunk
+
+ list(_wrap_litellm_stream(stream()))
+
+
+# ---------------------------------------------------------------------------
+# _litellm_async_wrapper (lines 388 and 395-397)
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_litellm_async_wrapper_raises_if_no_original():
+ """Line 388: RuntimeError when litellm_async not in _originals."""
+ from shekel._patch import _litellm_async_wrapper
+
+ with patch("shekel._patch._originals", {}):
+ with pytest.raises(RuntimeError, match="litellm async original not stored"):
+ await _litellm_async_wrapper()
+
+
+@pytest.mark.asyncio
+async def test_litellm_async_wrapper_stream_path():
+ """Lines 395-397: stream=True branch in async wrapper returns an async generator."""
+ from shekel._patch import _litellm_async_wrapper
+
+ async def mock_async_stream():
+ chunk = MagicMock()
+ chunk.usage = MagicMock()
+ chunk.usage.prompt_tokens = 10
+ chunk.usage.completion_tokens = 5
+ chunk.model = "gpt-4o-mini"
+ yield chunk
+
+ original = AsyncMock(return_value=mock_async_stream())
+
+ mock_budget = MagicMock()
+ mock_budget._using_fallback = False
+
+ with patch("shekel._patch._originals", {"litellm_async": original}):
+ with patch("shekel._context.get_active_budget", return_value=mock_budget):
+ stream = await _litellm_async_wrapper(model="gpt-4o-mini", messages=[], stream=True)
+ assert stream is not None
+ # drain the generator to hit the finally block
+ async for _ in stream:
+ pass
+
+
+# ---------------------------------------------------------------------------
+# _wrap_litellm_stream_async (lines 406-420)
+# ---------------------------------------------------------------------------
+
+
+@pytest.mark.asyncio
+async def test_wrap_litellm_stream_async_records_cost():
+ """Lines 406-414: async litellm stream records tokens from usage chunk."""
+ from shekel import budget
+ from shekel._patch import _wrap_litellm_stream_async
+
+ async def stream():
+ chunk = MagicMock()
+ chunk.usage = None
+ yield chunk
+ final = MagicMock()
+ final.usage = MagicMock()
+ final.usage.prompt_tokens = 100
+ final.usage.completion_tokens = 50
+ final.model = "gpt-4o-mini"
+ yield final
+
+ with budget(max_usd=1.0) as b:
+ async for _ in _wrap_litellm_stream_async(stream()):
+ pass
+ assert b.spent > 0
+
+
+@pytest.mark.asyncio
+async def test_wrap_litellm_stream_async_swallows_attribute_error():
+ """Lines 415-416: broken usage in async litellm chunk handled."""
+ from shekel._patch import _wrap_litellm_stream_async
+
+ class BrokenUsage:
+ def __getattr__(self, name: str) -> None:
+ raise AttributeError(name)
+
+ async def stream():
+ chunk = MagicMock()
+ chunk.usage = BrokenUsage()
+ yield chunk
+
+ async for _ in _wrap_litellm_stream_async(stream()):
+ pass
+
+
+@pytest.mark.asyncio
+async def test_wrap_litellm_stream_async_no_usage_fallback():
+ """Line 420: with no usage chunks, falls back to (0, 0, 'unknown')."""
+ from shekel import budget
+ from shekel._patch import _wrap_litellm_stream_async
+
+ async def stream():
+ chunk = MagicMock()
+ chunk.usage = None
+ yield chunk
+
+ with budget(max_usd=1.0) as b:
+ async for _ in _wrap_litellm_stream_async(stream()):
+ pass
+ assert b.spent == pytest.approx(0.0)