diff --git a/tinyagent/hooks/gradio_callback.py b/tinyagent/hooks/gradio_callback.py
index 6e70e44..f3fa21c 100644
--- a/tinyagent/hooks/gradio_callback.py
+++ b/tinyagent/hooks/gradio_callback.py
@@ -5,6 +5,7 @@
 import re
 import shutil
 import time
+import io
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Set, Union
 
@@ -36,6 +37,7 @@ def __init__(
         show_thinking: bool = True,
         show_tool_calls: bool = True,
         logger: Optional[logging.Logger] = None,
+        log_manager: Optional[Any] = None,
     ):
         """
         Initialize the Gradio callback.
@@ -46,6 +48,7 @@
             show_thinking: Whether to show the thinking process
             show_tool_calls: Whether to show tool calls
             logger: Optional logger to use
+            log_manager: Optional LoggingManager instance to capture logs from
         """
         self.logger = logger or logging.getLogger(__name__)
         self.show_thinking = show_thinking
@@ -81,6 +84,37 @@
         # References to Gradio UI components (will be set in create_app)
         self._chatbot_component = None
         self._token_usage_component = None
+
+        # Log stream for displaying logs in the UI
+        self.log_stream = io.StringIO()
+        self._log_component = None
+
+        # Setup logging
+        self.log_manager = log_manager
+        if log_manager:
+            # Create a handler that writes to our StringIO stream
+            self.log_handler = logging.StreamHandler(self.log_stream)
+            self.log_handler.setFormatter(
+                logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(message)s')
+            )
+            self.log_handler.setLevel(logging.DEBUG)
+
+            # Add the handler to the LoggingManager
+            log_manager.configure_handler(
+                self.log_handler,
+                format_string='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
+                level=logging.DEBUG
+            )
+            self.logger.debug("Added log handler to LoggingManager")
+        elif logger:
+            # Fall back to single logger if no LoggingManager is provided
+            self.log_handler = logging.StreamHandler(self.log_stream)
+            self.log_handler.setFormatter(
+                logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(message)s')
+            )
+            self.log_handler.setLevel(logging.DEBUG)
+            logger.addHandler(self.log_handler)
+            self.logger.debug("Added log handler to logger")
 
         self.logger.debug("GradioCallback initialized")
 
@@ -157,7 +191,7 @@ async def _handle_message_add(self, agent: Any, **kwargs: Any) -> None:
 
             # Add to detailed tool call info if not already present by ID
             if not any(tc['id'] == tool_id for tc in self.tool_call_details):
-                self.tool_call_details.append({
+                tool_detail = {
                     "id": tool_id,
                     "name": tool_name,
                     "arguments": formatted_args,
@@ -166,7 +200,25 @@
                     "result_token_count": 0,
                     "timestamp": current_time,
                     "result_timestamp": None
-                })
+                }
+
+                # Special handling for run_python tool - extract code_lines
+                if tool_name == "run_python":
+                    try:
+                        # Look for code in different possible field names
+                        code_content = None
+                        for field in ['code_lines', 'code', 'script', 'python_code']:
+                            if field in parsed_args:
+                                code_content = parsed_args[field]
+                                break
+
+                        if code_content is not None:
+                            tool_detail["code_lines"] = code_content
+                            self.logger.debug(f"Stored code content for run_python tool {tool_id}")
+                    except Exception as e:
+                        self.logger.error(f"Error processing run_python arguments: {e}")
+
+                self.tool_call_details.append(tool_detail)
                 self.logger.debug(f"Added tool call detail: {tool_name} (ID: {tool_id}, Tokens: {token_count})")
 
             # If this is a final_answer or ask_question tool, we'll handle it specially later
@@ -386,20 +438,128 @@ def _get_token_usage_text(self) -> str:
                 f"O {self.token_usage['completion_tokens']} | " +
                 f"Total {self.token_usage['total_tokens']}")
 
+    def _format_run_python_tool(self, tool_detail: dict) -> str:
+        """
+        Format run_python tool call with proper markdown formatting for code and output.
+
+        Args:
+            tool_detail: Tool call detail dictionary
+
+        Returns:
+            Formatted markdown string
+        """
+        tool_name = tool_detail["name"]
+        tool_id = tool_detail.get("id", "unknown")
+        code_lines = tool_detail.get("code_lines", [])
+        result = tool_detail.get("result")
+        input_tokens = tool_detail.get("token_count", 0)
+        output_tokens = tool_detail.get("result_token_count", 0)
+        total_tokens = input_tokens + output_tokens
+
+        # Start building the formatted content
+        parts = []
+
+        # Handle different code_lines formats
+        combined_code = ""
+        if code_lines:
+            if isinstance(code_lines, list):
+                # Standard case: list of code lines
+                combined_code = "\n".join(code_lines)
+            elif isinstance(code_lines, str):
+                # Handle case where code_lines is a single string
+                combined_code = code_lines
+            else:
+                # Convert other types to string
+                combined_code = str(code_lines)
+
+            # If we have code content, show it as Python code block
+            if combined_code.strip():
+                parts.append(f"**Python Code:**\n```python\n{combined_code}\n```")
+        else:
+            # Try to extract code from arguments as fallback
+            try:
+                args_dict = json.loads(tool_detail['arguments'])
+                # Check for different possible code field names
+                code_content = None
+                for field in ['code_lines', 'code', 'script', 'python_code']:
+                    if field in args_dict:
+                        code_content = args_dict[field]
+                        break
+
+                if code_content:
+                    if isinstance(code_content, list):
+                        combined_code = "\n".join(code_content)
+                    else:
+                        combined_code = str(code_content)
+
+                    if combined_code.strip():
+                        parts.append(f"**Python Code:**\n```python\n{combined_code}\n```")
+                    else:
+                        # Final fallback to showing raw arguments
+                        parts.append(f"**Input Arguments:**\n```json\n{tool_detail['arguments']}\n```")
+                else:
+                    # No code found, show raw arguments
+                    parts.append(f"**Input Arguments:**\n```json\n{tool_detail['arguments']}\n```")
+            except (json.JSONDecodeError, KeyError):
+                # If we can't parse arguments, show them as-is
+                parts.append(f"**Input Arguments:**\n```json\n{tool_detail['arguments']}\n```")
+
+        # Add the output if available
+        if result is not None:
+            parts.append(f"\n**Output:** ({output_tokens} tokens)")
+
+            try:
+                # Try to parse result as JSON for better formatting
+                result_json = json.loads(result)
+                parts.append(f"```json\n{json.dumps(result_json, indent=2)}\n```")
+            except (json.JSONDecodeError, TypeError):
+                # Handle plain text result
+                if isinstance(result, str):
+                    # Replace escaped newlines with actual newlines for better readability
+                    formatted_result = result.replace("\\n", "\n")
+                    parts.append(f"```\n{formatted_result}\n```")
+                else:
+                    parts.append(f"```\n{str(result)}\n```")
+        else:
+            parts.append(f"\n**Status:** ⏳ Processing...")
+
+        # Add token information
+        parts.append(f"\n**Token Usage:** {total_tokens} total ({input_tokens} input + {output_tokens} output)")
+
+        return "\n".join(parts)
+
     async def interact_with_agent(self, user_input_processed, chatbot_history):
         """
         Process user input, interact with the agent, and stream updates to Gradio UI.
         Each tool call and response will be shown as a separate message.
         """
         self.logger.info(f"Starting interaction for: {user_input_processed[:50]}...")
+
+        # Reset state for new interaction to prevent showing previous content
+        self.thinking_content = ""
+        self.tool_calls = []
+        self.tool_call_details = []
+        self.assistant_text_responses = []
+        self.token_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
+        self.is_running = True
+        self.last_update_yield_time = 0
+        self.logger.debug("Reset interaction state for new conversation turn")
 
         # 1. Add user message to chatbot history as a ChatMessage
         chatbot_history.append(
             ChatMessage(role="user", content=user_input_processed)
         )
 
-        # Initial yield to show user message
-        yield chatbot_history, self._get_token_usage_text()
+        # 2. Add typing indicator immediately after user message
+        typing_message = ChatMessage(
+            role="assistant",
+            content="🤔 Thinking..."
+        )
+        chatbot_history.append(typing_message)
+        typing_message_index = len(chatbot_history) - 1
+
+        # Initial yield to show user message and typing indicator
+        yield chatbot_history, self._get_token_usage_text(), self.log_stream.getvalue() if self._log_component else None
 
         # Kick off the agent in the background
         loop = asyncio.get_event_loop()
@@ -407,7 +567,7 @@
 
         displayed_tool_calls = set()
         displayed_text_responses = set()
-        thinking_message_added = False
+        thinking_removed = False
 
         update_interval = 0.3
         min_yield_interval = 0.2
@@ -420,6 +580,14 @@
             sorted_tool_details = sorted(self.tool_call_details, key=lambda x: x.get("timestamp", 0))
             sorted_text_responses = sorted(self.assistant_text_responses, key=lambda x: x.get("timestamp", 0))
 
+            # Remove typing indicator once we have actual content to show
+            if not thinking_removed and (sorted_text_responses or sorted_tool_details):
+                # Remove the typing indicator
+                if typing_message_index < len(chatbot_history):
+                    chatbot_history.pop(typing_message_index)
+                    thinking_removed = True
+                    self.logger.debug("Removed typing indicator")
+
             # ➜ New assistant text chunks
             for resp in sorted_text_responses:
                 content = resp["content"]
@@ -430,22 +598,6 @@
                     displayed_text_responses.add(content)
                     self.logger.debug(f"Added new text response: {content[:50]}...")
 
-            # ➜ Thinking placeholder (optional)
-            if self.show_thinking and self.thinking_content \
-               and not thinking_message_added \
-               and not displayed_text_responses:
-                thinking_msg = (
-                    "Working on it...\n\n"
-                    "```"
-                    f"{self.thinking_content}"
-                    "```"
-                )
-                chatbot_history.append(
-                    ChatMessage(role="assistant", content=thinking_msg)
-                )
-                thinking_message_added = True
-                self.logger.debug("Added thinking message")
-
             # ➜ Show tool calls with "working..." status when they start
             if self.show_tool_calls:
                 for tool in sorted_tool_details:
@@ -455,11 +607,18 @@
                     # If we haven't displayed this tool call yet
                     if tid not in displayed_tool_calls and tid not in in_progress_tool_calls:
                         in_tok = tool.get("token_count", 0)
+
                         # Create "working..." message for this tool call
-                        body = (
-                            f"**Input Arguments:**\n```json\n{tool['arguments']}\n```\n\n"
-                            f"**Output:** ⏳ Working...\n"
-                        )
+                        if tname == "run_python":
+                            # Special formatting for run_python
+                            body = self._format_run_python_tool(tool)
+                        else:
+                            # Standard formatting for other tools
+                            body = (
+                                f"**Input Arguments:**\n```json\n{tool['arguments']}\n```\n\n"
+                                f"**Output:** ⏳ Working...\n"
+                            )
+
                         # Add to chatbot with "working" status
                         msg = ChatMessage(
                             role="assistant",
@@ -483,10 +642,16 @@
                         tot_tok = in_tok + out_tok
 
                         # Update the message with completed status and result
-                        body = (
-                            f"**Input Arguments:**\n```json\n{tool['arguments']}\n```\n\n"
-                            f"**Output:** ({out_tok} tokens)\n```json\n{tool['result']}\n```\n"
-                        )
+                        if tname == "run_python":
+                            # Special formatting for completed run_python
+                            body = self._format_run_python_tool(tool)
+                        else:
+                            # Standard formatting for other completed tools
+                            body = (
+                                f"**Input Arguments:**\n```json\n{tool['arguments']}\n```\n\n"
+                                f"**Output:** ({out_tok} tokens)\n```json\n{tool['result']}\n```\n"
+                            )
+
                         # Update the existing message
                         chatbot_history[pos] = ChatMessage(
                             role="assistant",
@@ -501,13 +666,19 @@
                         del in_progress_tool_calls[tid]
                         self.logger.debug(f"Updated tool call to completed: {tname}")
 
-            # yield updated history + token usage
+            # yield updated history + token usage + logs
            token_text = self._get_token_usage_text()
-            yield chatbot_history, token_text
+            logs = self.log_stream.getvalue() if self._log_component else None
+            yield chatbot_history, token_text, logs
             self.last_update_yield_time = now
 
             await asyncio.sleep(update_interval)
 
+        # Remove typing indicator if still present when agent finishes
+        if not thinking_removed and typing_message_index < len(chatbot_history):
+            chatbot_history.pop(typing_message_index)
+            self.logger.debug("Removed typing indicator at end")
+
         # once the agent_task is done, add its final result if any
         try:
             final_text = await agent_task
@@ -521,8 +692,9 @@
             )
             self.logger.debug(f"Added final result: {final_text[:50]}...")
 
-        # final token usage
-        yield chatbot_history, self._get_token_usage_text()
+        # final token usage and logs
+        logs = self.log_stream.getvalue() if self._log_component else None
+        yield chatbot_history, self._get_token_usage_text(), logs
 
     def _format_response(self, response_text):
         """
@@ -673,9 +845,9 @@ def create_app(self, agent: TinyAgent, title: str = "TinyAgent Chat", descriptio
 
         # Footer
         gr.Markdown(
-                "