Skip to content

Commit edf83ac

Browse files
author
cellwebb
committed
feat(streaming): simplify to streaming-only implementation
- Remove create_message() method from base provider interface
- Remove all backwards compatibility code and dual-mode logic
- Implement streaming-only architecture across all providers
- Add real-time SSE streaming with proper error handling
- Simplify agent loop to use streaming by default
- Update tests to work with streaming-only interface
- Remove environment variable disable options
- Improve user experience with consistent real-time responses

BREAKING CHANGE: create_message() method removed - use stream_message() only
1 parent cd1ce53 commit edf83ac

7 files changed

Lines changed: 307 additions & 90 deletions

File tree

src/clippy/agent/conversation.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,12 +256,17 @@ def compact_conversation(
256256
# Build temporary conversation for summarization
257257
summarization_conversation = [system_msg] + to_summarize + [summary_request]
258258

259-
# Call LLM to create summary
260-
response = provider.create_message(
259+
# Call LLM to create summary using streaming
260+
response_content = ""
261+
for chunk in provider.stream_message(
261262
messages=summarization_conversation,
262263
tools=[], # No tools needed for summarization
263264
model=model,
264-
)
265+
):
266+
if chunk.get("content"):
267+
response_content += chunk["content"]
268+
269+
response = {"content": response_content}
265270

266271
summary_content = response.get("content", "")
267272
if not summary_content:

src/clippy/agent/loop.py

Lines changed: 66 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,10 @@ def run_agent_loop(
145145

146146
logger.debug(f"Loaded {len(tools)} tools for iteration {iteration}")
147147

148-
# Call provider (returns OpenAI message dict)
148+
# Call provider using streaming
149149
try:
150-
response = config.provider.create_message(
151-
messages=conversation_history,
152-
tools=tools,
153-
model=config.model,
150+
response = _process_streaming_response(
151+
config.provider, conversation_history, tools, config.model, config.console
154152
)
155153

156154
# Track token usage from this API call
@@ -205,12 +203,7 @@ def run_agent_loop(
205203
# Add to conversation history
206204
conversation_history.append(assistant_message)
207205

208-
# Print assistant's text response to the user
209-
if response.get("content"):
210-
content = response["content"]
211-
if isinstance(content, str) and content.strip():
212-
cleaned_content = content.lstrip("\n")
213-
config.console.print(f"\n[bold blue][📎][/bold blue] {escape(cleaned_content)}")
206+
# Note: Response content is already displayed during streaming
214207

215208
# Save conversation automatically after each assistant message
216209
if config.parent_agent is not None:
@@ -288,6 +281,68 @@ def run_agent_loop(
288281
# Note: No maximum iterations limit - loop runs until agent completes or is interrupted
289282

290283

284+
def _process_streaming_response(
285+
provider: LLMProvider,
286+
conversation_history: list[dict[str, Any]],
287+
tools: list[dict[str, Any]],
288+
model: str,
289+
console: ConsoleProtocol,
290+
) -> dict[str, Any]:
291+
"""
292+
Process streaming response from provider and display in real-time.
293+
294+
Args:
295+
provider: LLM provider instance
296+
conversation_history: Current conversation history
297+
tools: List of available tools
298+
model: Model identifier
299+
console: Console for output
300+
301+
Returns:
302+
Final consolidated response
303+
"""
304+
# Start with an empty header for the response
305+
console.print("\n[bold blue][📎][/bold blue] ", end="")
306+
307+
# Track accumulated content and tool calls
308+
accumulated_content = ""
309+
accumulated_tool_calls: list[dict[str, Any]] = []
310+
311+
for chunk in provider.stream_message(conversation_history, tools, model):
312+
# Handle streaming content deltas
313+
if chunk.get("delta") and chunk.get("content"):
314+
# Print the chunk directly for real-time display
315+
console.print(escape(chunk["content"]), end="")
316+
accumulated_content += chunk["content"]
317+
elif chunk.get("content") and not chunk.get("delta"):
318+
# Final full content (for non-streaming models like codex)
319+
accumulated_content = chunk["content"]
320+
console.print(escape(accumulated_content))
321+
elif not chunk.get("delta"):
322+
# Final chunk with complete response
323+
if chunk.get("content"):
324+
accumulated_content = chunk["content"]
325+
if not accumulated_content.strip() and accumulated_tool_calls:
326+
# Only tool calls, no content - print newline
327+
console.print()
328+
329+
# Return the final consolidated response
330+
return {
331+
"role": "assistant",
332+
"content": accumulated_content if accumulated_content else None,
333+
"tool_calls": accumulated_tool_calls if accumulated_tool_calls else None,
334+
"finish_reason": chunk.get("finish_reason"),
335+
}
336+
337+
# If we get here, return what we accumulated
338+
return {
339+
"role": "assistant",
340+
"content": accumulated_content if accumulated_content else None,
341+
"tool_calls": accumulated_tool_calls if accumulated_tool_calls else None,
342+
"finish_reason": "stop",
343+
}
344+
345+
291346
def _display_auto_compaction_notification(console: ConsoleProtocol, stats: dict[str, Any]) -> None:
292347
"""
293348
Display a subtle but informative auto-compaction notification.

src/clippy/llm/base.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from __future__ import annotations
44

5+
from collections.abc import Iterator
56
from dataclasses import dataclass, field
67
from typing import Any
78

@@ -29,23 +30,23 @@ class LLMResponse:
2930
class BaseProvider:
    """Abstract base for all LLM providers.

    Concrete providers implement stream_message() to yield OpenAI-format
    response chunks; there is no non-streaming entry point.
    """

    def stream_message(
        self,
        messages: list[dict[str, Any]],
        tools: list[dict[str, Any]] | None = None,
        model: str = "gpt-5-mini",
        **kwargs: Any,
    ) -> Iterator[dict[str, Any]]:
        """Stream a chat completion.

        Args:
            messages: List of messages in OpenAI format
            tools: Optional list of tool definitions in OpenAI format
            model: Model identifier
            **kwargs: Additional provider-specific arguments

        Yields:
            Streaming response chunks in OpenAI format

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError
5152

src/clippy/llm/http_client.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
from __future__ import annotations
44

5+
import json as _json
56
import logging
7+
from collections.abc import Iterator
68
from typing import Any
79

810
import httpx
@@ -71,6 +73,61 @@ def post_with_retry(
7173
return response
7274

7375

76+
def stream_with_retry(
    client: httpx.Client,
    url: str,
    json: dict[str, Any],
    headers: dict[str, str],
) -> Iterator[dict[str, Any]]:
    """POST with streaming response for SSE.

    Args:
        client: httpx Client instance
        url: URL to POST to
        json: JSON payload
        headers: HTTP headers

    Yields:
        Parsed SSE data chunks

    Raises:
        RetryableHTTPError: For retryable status codes
        httpx.HTTPStatusError: For non-retryable errors
    """
    try:
        with client.stream(
            "POST",
            url,
            json=json,
            headers=headers,
        ) as response:
            if response.status_code in RETRYABLE_STATUS_CODES:
                logger.warning(f"Retryable error {response.status_code} from {url}")
                raise RetryableHTTPError(response.status_code, "Retryable status code")

            response.raise_for_status()

            # Parse SSE (Server-Sent Events) stream
            for line in response.iter_lines():
                if not line.strip():
                    continue  # Skip empty keep-alive lines
                if line.startswith("data:"):
                    # BUGFIX: per the SSE spec the "data:" prefix is followed
                    # by an *optional* single space; accept both forms instead
                    # of silently dropping "data:{...}" lines.
                    data = line[5:]
                    if data.startswith(" "):
                        data = data[1:]
                    if data.strip() == "[DONE]":
                        break
                    try:
                        yield _json.loads(data)
                    except _json.JSONDecodeError:
                        logger.warning(f"Failed to parse SSE data: {data}")
                        continue

    except httpx.HTTPStatusError as e:
        if e.response.status_code in RETRYABLE_STATUS_CODES:
            # BUGFIX: chain the original error so the HTTP failure is not
            # lost from tracebacks.
            # NOTE(review): e.response is a streamed response; .text may
            # require the body to have been read - confirm against httpx
            # behavior for unread streaming responses.
            raise RetryableHTTPError(e.response.status_code, e.response.text) from e
        raise
74131
def create_client(timeout: httpx.Timeout | None = None) -> httpx.Client:
75132
"""Create a configured httpx client.
76133

0 commit comments

Comments
 (0)