From 7286479978f4042f3a69fdcf1dcb46702d72d481 Mon Sep 17 00:00:00 2001 From: nicoferdi96 Date: Wed, 22 Oct 2025 15:16:54 +0200 Subject: [PATCH 1/3] feat: Add CrewAI Flows and LiteAgent support to Galileo event listener - Add support for Flow events (FlowStartedEvent, FlowFinishedEvent, MethodExecutionStartedEvent, MethodExecutionFinishedEvent, MethodExecutionFailedEvent) - Add support for LiteAgent events (LiteAgentExecutionStartedEvent, LiteAgentExecutionCompletedEvent, LiteAgentExecutionErrorEvent) - Implement flow context tracking with _flow_root_id, _current_method_id, and _current_lite_agent_id - Enhance _generate_run_id to handle flow-specific events and agent_info - Update parent run ID logic to properly nest crews within methods and tools/LLMs within lite agents - Add _hash_to_uuid helper method for cleaner UUID generation - Import flow and lite agent event types conditionally based on CrewAI version --- src/galileo/handlers/crewai/handler.py | 386 +++++++++++++++++++------ 1 file changed, 305 insertions(+), 81 deletions(-) diff --git a/src/galileo/handlers/crewai/handler.py b/src/galileo/handlers/crewai/handler.py index 811b1b02b..41bf9d6aa 100644 --- a/src/galileo/handlers/crewai/handler.py +++ b/src/galileo/handlers/crewai/handler.py @@ -25,12 +25,22 @@ AgentExecutionCompletedEvent, AgentExecutionErrorEvent, AgentExecutionStartedEvent, + LiteAgentExecutionCompletedEvent, + LiteAgentExecutionErrorEvent, + LiteAgentExecutionStartedEvent, ) from crewai.events.types.crew_events import ( # pyright: ignore[reportMissingImports] CrewKickoffCompletedEvent, CrewKickoffFailedEvent, CrewKickoffStartedEvent, ) + from crewai.events.types.flow_events import ( # pyright: ignore[reportMissingImports] + FlowFinishedEvent, + FlowStartedEvent, + MethodExecutionFailedEvent, + MethodExecutionFinishedEvent, + MethodExecutionStartedEvent, + ) from crewai.events.types.llm_events import ( # pyright: ignore[reportMissingImports] LLMCallCompletedEvent, LLMCallFailedEvent, @@ -139,6 +149,11 @@ def __init__( integration="crewai", ) + # Flow context tracking + self._flow_root_id: Optional[UUID] = None + self._current_method_id: Optional[UUID] = None + self._current_lite_agent_id: Optional[UUID] = None + # Only call super().__init__() if CrewAI is available if CREWAI_AVAILABLE: super().__init__() @@ -219,6 +234,40 @@ def on_llm_call_completed(source: Any, event: LLMCallCompletedEvent) -> None: def on_llm_call_failed(source: Any, event: LLMCallFailedEvent) -> None: self._handle_llm_call_failed(source, event) + # Flow event handlers + @crewai_event_bus.on(FlowStartedEvent) + def on_flow_started(source: Any, event: FlowStartedEvent) -> None: + self._handle_flow_started(source, event) + + @crewai_event_bus.on(FlowFinishedEvent) + def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None: + self._handle_flow_finished(source, event) + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def on_method_execution_started(source: Any, event: MethodExecutionStartedEvent) -> None: + self._handle_method_execution_started(source, event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def on_method_execution_finished(source: Any, event: MethodExecutionFinishedEvent) -> None: + self._handle_method_execution_finished(source, event) + + @crewai_event_bus.on(MethodExecutionFailedEvent) + def on_method_execution_failed(source: Any, event: MethodExecutionFailedEvent) -> None: + self._handle_method_execution_failed(source, event) + + # Light Agent event handlers + @crewai_event_bus.on(LiteAgentExecutionStartedEvent) + def on_lite_agent_execution_started(source: Any, event: LiteAgentExecutionStartedEvent) -> None: + self._handle_lite_agent_execution_started(source, event) + + @crewai_event_bus.on(LiteAgentExecutionCompletedEvent) + def on_lite_agent_execution_completed(source: Any, event: LiteAgentExecutionCompletedEvent) -> None: + self._handle_lite_agent_execution_completed(source, event) + + @crewai_event_bus.on(LiteAgentExecutionErrorEvent) + def on_lite_agent_execution_error(source: Any, event: LiteAgentExecutionErrorEvent) -> None: + self._handle_lite_agent_execution_error(source, event) + # Memory event handlers (only available in CrewAI >= 0.177.0) if CREWAI_EVENTS_MODULE_AVAILABLE: @@ -254,70 +303,86 @@ def on_memory_retrieval_started(source: Any, event: MemoryRetrievalStartedEvent) def on_memory_retrieval_completed(source: Any, event: MemoryRetrievalCompletedEvent) -> None: self._handle_memory_retrieval_completed(source, event) + def _hash_to_uuid(self, content: str) -> UUID: + """Convert a string to a UUID using MD5 hash.""" + return UUID(hashlib.md5(content.encode()).hexdigest()) + def _generate_run_id(self, source: Any, event: Any) -> UUID: """Generate a consistent UUID for event tracing.""" - # Memory event specific ID generation - if hasattr(event, "query"): # Memory query events - source_type = event.source_type if hasattr(event, "source_type") else "" - query_str = f"memory_query_{event.query}_{source_type}_{getattr(event, 'agent_id', '')}_{getattr(event, 'limit', '')}_{getattr(event, 'score_threshold', '')}" - hash_obj = hashlib.md5(query_str.encode()) - digest = hash_obj.hexdigest() - return UUID(digest) - - if hasattr(event, "value") and getattr(event, "type", "").startswith("memory_save"): # Memory save events - save_str = f"memory_save_{getattr(event, 'agent_id', '')}_{getattr(event, 'task_id', '')}_{getattr(event, 'agent_role', '')}" - hash_obj = hashlib.md5(save_str.encode()) - digest = hash_obj.hexdigest() - return UUID(digest) - - if ( - hasattr(event, "memory_content") or getattr(event, "type", "") == "memory_retrieval_started" - ): # Memory retrieval events - # Use consistent fields for all retrieval events (started/completed) - exclude memory_content to ensure same ID - retrieval_str = f"memory_retrieval_{getattr(event, 'task_id', '')}_{getattr(event, 'agent_id', '')}" - hash_obj = hashlib.md5(retrieval_str.encode()) - digest = hash_obj.hexdigest() - return UUID(digest) + if hasattr(event, "flow_name"): + if getattr(event, "type", "") in ("flow_started", "flow_finished"): + if hasattr(source, "state"): + state_id = getattr(source.state, "id", None) or ( + source.state.get("id") if isinstance(source.state, dict) else None + ) + if state_id: + return state_id if isinstance(state_id, UUID) else UUID(str(state_id)) + + if hasattr(source, "id") and source.id: + return source.id if isinstance(source.id, UUID) else UUID(str(source.id)) + + _logger.warning("Flow has no state.id or source.id, using hash-based ID") + return self._hash_to_uuid(f"flow_{event.flow_name}") + + if hasattr(event, "method_name"): + flow_id = event.flow_name + if hasattr(source, "state"): + state_id = getattr(source.state, "id", None) or ( + source.state.get("id") if isinstance(source.state, dict) else None + ) + if state_id: + flow_id = str(state_id) + elif hasattr(source, "id") and source.id: + flow_id = str(source.id) + + return self._hash_to_uuid(f"method_{flow_id}_{event.method_name}") + + if hasattr(event, "agent_info") and isinstance(event.agent_info, dict): + if hasattr(source, "id") and source.id: + return source.id if isinstance(source.id, UUID) else UUID(str(source.id)) + return self._hash_to_uuid(json.dumps(event.agent_info, sort_keys=True, default=str)) + + if hasattr(event, "query"): + source_type = getattr(event, "source_type", "") + return self._hash_to_uuid( + f"memory_query_{event.query}_{source_type}_{getattr(event, 'agent_id', '')}_{getattr(event, 'limit', '')}_{getattr(event, 'score_threshold', '')}" + ) + + if hasattr(event, "value") and getattr(event, "type", "").startswith("memory_save"): + return self._hash_to_uuid( + f"memory_save_{getattr(event, 'agent_id', '')}_{getattr(event, 'task_id', '')}_{getattr(event, 'agent_role', '')}" + ) + + if hasattr(event, "memory_content") or getattr(event, "type", "") == "memory_retrieval_started": + return self._hash_to_uuid( + f"memory_retrieval_{getattr(event, 'task_id', '')}_{getattr(event, 'agent_id', '')}" + ) if hasattr(source, "id") and source.id: return source.id if hasattr(event, "messages"): - messages = json.dumps(event.to_json().get("messages", [])) - hash_obj = hashlib.md5(messages.encode()) - digest = hash_obj.hexdigest() - return UUID(digest) + return self._hash_to_uuid(json.dumps(event.to_json().get("messages", []))) if hasattr(event, "tool_args"): - # tool_args might be a dict or JSON string - if isinstance(event.tool_args, str): - tool_args = event.tool_args - else: - # Convert dict to JSON string - tool_args = json.dumps(event.tool_args) if isinstance(event.tool_args, dict) else str(event.tool_args) - hash_obj = hashlib.md5(tool_args.encode()) - digest = hash_obj.hexdigest() - return UUID(digest) + tool_args = ( + event.tool_args + if isinstance(event.tool_args, str) + else (json.dumps(event.tool_args) if isinstance(event.tool_args, dict) else str(event.tool_args)) + ) + return self._hash_to_uuid(tool_args) if isinstance(event, dict) and "messages" in event: - # If event is a string, use it directly - messages = json.dumps(event["messages"]) - hash_obj = hashlib.md5(messages.encode()) - digest = hash_obj.hexdigest() - return UUID(digest) - - # Fallback to generating a UUID based on event properties - source_str = f"{getattr(event, 'crew_name', '')}_{getattr(event, 'agent', '')}_{getattr(event, 'task', '')}" - hash_obj = hashlib.md5(source_str.encode()) - digest = hash_obj.hexdigest() - return UUID(digest) + return self._hash_to_uuid(json.dumps(event["messages"])) + + return self._hash_to_uuid( + f"{getattr(event, 'crew_name', '')}_{getattr(event, 'agent', '')}_{getattr(event, 'task', '')}" + ) def _extract_metadata(self, event: Any) -> dict: """Extract metadata from event for span attributes.""" - metadata = {} - metadata["timestamp"] = event.timestamp.isoformat() + metadata = {"timestamp": event.timestamp.isoformat()} - # Add source information if hasattr(event, "source_type") and event.source_type: metadata["source_type"] = event.source_type @@ -330,42 +395,38 @@ def _extract_metadata(self, event: Any) -> dict: def _handle_crew_kickoff_started(self, source: Any, event: "CrewKickoffStartedEvent") -> None: """Handle crew execution start.""" run_id = self._generate_run_id(source, event) - crew_name = event.crew_name if hasattr(event, "crew_name") else "Crew" + crew_name = getattr(event, "crew_name", "Crew") metadata = self._extract_metadata(event) metadata["crew_name"] = crew_name - input = getattr(event, "inputs", {}) + input_data = getattr(event, "inputs", {}) self._handler.start_node( node_type=NodeType.CHAIN.value, - parent_run_id=None, # Root node + parent_run_id=self._current_method_id, run_id=run_id, name=crew_name, - input=serialize_to_str(input) if input else "-", + input=serialize_to_str(input_data) if input_data else "-", metadata=metadata, ) def _update_crew_input(self, run_id: str) -> None: - """Update crew input with task descriptions. if input is not set.""" + """Update crew input with task descriptions if input is not set.""" nodes = self._handler.get_nodes() root_node = nodes.get(str(run_id)) if not root_node: return - input = root_node.span_params.get("input", "-") - tasks = "" + current_input = root_node.span_params.get("input", "-") - if not input or input == "-": + if not current_input or current_input == "-": tasks = "\n".join( - [ - str(node.span_params.get("metadata", {}).get("task_description")) - for node in nodes.values() - if node.span_params.get("metadata", {}).get("task_description") - ] + str(node.span_params.get("metadata", {}).get("task_description")) + for node in nodes.values() + if node.span_params.get("metadata", {}).get("task_description") ) - input = f"Tasks: {tasks}" if tasks else "-" - root_node.span_params["input"] = input + root_node.span_params["input"] = f"Tasks: {tasks}" if tasks else "-" def _handle_crew_kickoff_completed(self, source: Any, event: "CrewKickoffCompletedEvent") -> None: """Handle crew execution completion.""" @@ -387,9 +448,9 @@ def _handle_agent_execution_started(self, source: Any, event: "AgentExecutionSta """Handle agent execution start.""" run_id = self._generate_run_id(source, event) parent_run_id = self._to_uuid(event.task.id) - role = "Unknown Agent" metadata = self._extract_metadata(event) + role = "Unknown Agent" if hasattr(event, "agent") and event.agent: if hasattr(event.agent, "role"): role = event.agent.role @@ -428,31 +489,29 @@ def _handle_agent_execution_error(self, source: Any, event: "AgentExecutionError def _handle_task_started(self, source: Any, event: "TaskStartedEvent") -> None: """Handle task start.""" run_id = self._generate_run_id(source, event) - task = event.task if hasattr(event, "task") else None + task = getattr(event, "task", None) parent_run_id = self._to_uuid(task.agent.crew.id) if task else None metadata = self._extract_metadata(event) + task_name = "Unknown Task" + if task: if hasattr(task, "description"): metadata["task_description"] = task.description + task_name = task.description[:50] + "..." if len(task.description) > 50 else task.description if hasattr(task, "id"): metadata["task_id"] = str(task.id) - task_name = "Unknown Task" - if task and hasattr(task, "description"): - # Use first 50 chars of description as name - task_name = task.description[:50] + "..." if len(task.description) > 50 else task.description - - input = getattr(event, "context", "") - if not input: - input = task.description if task and hasattr(task, "description") else "" + input_data = getattr(event, "context", "") or ( + task.description if task and hasattr(task, "description") else "" + ) self._handler.start_node( node_type=NodeType.CHAIN.value, parent_run_id=parent_run_id, run_id=run_id, name=task_name, - input=serialize_to_str(input) if input else "-", + input=serialize_to_str(input_data) if input_data else "-", metadata=metadata, ) @@ -477,21 +536,22 @@ def _handle_task_failed(self, source: Any, event: "TaskFailedEvent") -> None: def _handle_tool_usage_started(self, source: Any, event: "ToolUsageStartedEvent") -> None: """Handle tool usage start.""" run_id = self._generate_run_id(source, event) - parent_run_id = self._to_uuid(event.agent.id) if event.agent else None - input = getattr(event, "tool_args", {}) + parent_run_id = self._current_lite_agent_id or (self._to_uuid(event.agent.id) if event.agent else None) + + tool_args = getattr(event, "tool_args", {}) tool_name = getattr(event, "tool_name", "Unknown Tool") metadata = self._extract_metadata(event) metadata["tool_name"] = tool_name - if input: - metadata["tool_args"] = str(input) + if tool_args: + metadata["tool_args"] = str(tool_args) self._handler.start_node( node_type=NodeType.TOOL.value, parent_run_id=parent_run_id, run_id=run_id, name=tool_name, - input=serialize_to_str(input) if input else "-", + input=serialize_to_str(tool_args) if tool_args else "-", metadata=metadata, ) @@ -524,7 +584,10 @@ def _to_uuid(self, id: Union[str, None, UUID]) -> Union[UUID, None]: def _handle_llm_call_started(self, source: Any, event: "LLMCallStartedEvent") -> None: """Handle LLM call start.""" run_id = self._generate_run_id(source, event) - parent_run_id = self._to_uuid(event.agent_id) if hasattr(event, "agent_id") else None + parent_run_id = self._current_lite_agent_id or ( + self._to_uuid(event.agent_id) if hasattr(event, "agent_id") else None + ) + llm_name = getattr(source, "model", "Unknown Model") metadata = self._extract_metadata(event) @@ -684,6 +747,167 @@ def _handle_memory_retrieval_completed(self, source: Any, event: "MemoryRetrieva self._handler.end_node(run_id=run_id, output=serialize_to_str(event.memory_content), metadata=metadata) + # Flow event handlers + def _handle_flow_started(self, source: Any, event: "FlowStartedEvent") -> None: + """Handle Flow execution start.""" + run_id = self._generate_run_id(source, event) + self._flow_root_id = run_id + + flow_name = event.flow_name if hasattr(event, "flow_name") else "Flow" + + metadata = self._extract_metadata(event) + metadata["flow_name"] = flow_name + metadata["flow_state_id"] = str(run_id) + + input_data = getattr(event, "inputs", {}) + + self._handler.start_node( + node_type=NodeType.CHAIN.value, + parent_run_id=None, + run_id=run_id, + name=flow_name, + input=serialize_to_str(input_data) if input_data else "-", + metadata=metadata, + ) + + def _handle_flow_finished(self, source: Any, event: "FlowFinishedEvent") -> None: + """Handle Flow execution completion.""" + run_id = self._flow_root_id + + if not run_id: + run_id = self._generate_run_id(source, event) + _logger.debug("Flow finished: regenerated run_id from event") + + output = getattr(event, "result", {}) + self._handler.end_node(run_id=run_id, output=serialize_to_str(output)) + + self._flow_root_id = None + self._current_method_id = None + self._current_lite_agent_id = None + + def _handle_method_execution_started(self, source: Any, event: "MethodExecutionStartedEvent") -> None: + """Handle method execution start.""" + run_id = self._generate_run_id(source, event) + self._current_method_id = run_id + + method_name = event.method_name if hasattr(event, "method_name") else "Method" + + metadata = self._extract_metadata(event) + metadata["method_name"] = method_name + metadata["flow_name"] = event.flow_name + + if hasattr(event, "state"): + metadata["state_type"] = type(event.state).__name__ + if hasattr(event.state, "id"): + metadata["state_id"] = str(event.state.id) + elif isinstance(event.state, dict) and "id" in event.state: + metadata["state_id"] = str(event.state["id"]) + + input_data = getattr(event, "params", None) + if not input_data: + input_data = getattr(event, "state", None) + + self._handler.start_node( + node_type=NodeType.CHAIN.value, + parent_run_id=self._flow_root_id, + run_id=run_id, + name=method_name, + input=serialize_to_str(input_data) if input_data else "-", + metadata=metadata, + ) + + def _handle_method_execution_finished(self, source: Any, event: "MethodExecutionFinishedEvent") -> None: + """Handle method execution completion.""" + run_id = self._current_method_id + + if not run_id: + run_id = self._generate_run_id(source, event) + _logger.debug(f"Method finished: regenerated run_id for {event.method_name}") + + output = getattr(event, "result", None) + metadata = {} + + if hasattr(event, "state"): + metadata["final_state_type"] = type(event.state).__name__ + + self._handler.end_node(run_id=run_id, output=serialize_to_str(output), metadata=metadata) + + self._current_method_id = None + + def _handle_method_execution_failed(self, source: Any, event: "MethodExecutionFailedEvent") -> None: + """Handle method execution failure.""" + run_id = self._current_method_id + + if not run_id: + _logger.warning("Method failed event without current_method_id") + return + + metadata = self._extract_metadata(event) + error = getattr(event, "error", "Unknown error") + metadata["error"] = str(error) + metadata["error_type"] = type(error).__name__ + + self._handler.end_node(run_id=run_id, output=f"Error: {error}", metadata=metadata) + + self._current_method_id = None + + def _handle_lite_agent_execution_started(self, source: Any, event: "LiteAgentExecutionStartedEvent") -> None: + """Handle LiteAgent execution start.""" + run_id = self._generate_run_id(source, event) + self._current_lite_agent_id = run_id + parent_run_id = self._current_method_id + + agent_info = getattr(event, "agent_info", {}) + agent_name = agent_info.get("role", agent_info.get("name", "LiteAgent")) + + metadata = self._extract_metadata(event) + + metadata["agent_info"] = json.dumps(agent_info, default=str) + + if hasattr(event, "tools") and event.tools: + metadata["available_tools"] = [str(getattr(tool, "name", tool)) for tool in event.tools] + + messages = getattr(event, "messages", []) + + self._handler.start_node( + node_type=NodeType.AGENT.value, + parent_run_id=parent_run_id, + run_id=run_id, + name=agent_name, + input=serialize_to_str(messages), + metadata=metadata, + ) + + def _handle_lite_agent_execution_completed(self, source: Any, event: "LiteAgentExecutionCompletedEvent") -> None: + """Handle LiteAgent execution completion.""" + run_id = self._current_lite_agent_id + + if not run_id: + run_id = self._generate_run_id(source, event) + _logger.debug("LiteAgent completed: regenerated run_id from event") + + output = getattr(event, "output", "") + + self._handler.end_node(run_id=run_id, output=serialize_to_str(output)) + + self._current_lite_agent_id = None + + def _handle_lite_agent_execution_error(self, source: Any, event: "LiteAgentExecutionErrorEvent") -> None: + """Handle LiteAgent execution error.""" + run_id = self._current_lite_agent_id + + if not run_id: + run_id = self._generate_run_id(source, event) + _logger.debug("LiteAgent error: regenerated run_id from event") + + metadata = self._extract_metadata(event) + error = getattr(event, "error", "Unknown error") + metadata["error"] = str(error) + + self._handler.end_node(run_id=run_id, output=f"Error: {error}", metadata=metadata) + + self._current_lite_agent_id = None + def lite_llm_usage_callback( self, kwargs: dict, # kwargs to completion From 08d90596f85dbeb1344950ef22ad1acf6684af5c Mon Sep 17 00:00:00 2001 From: nicoferdi96 Date: Wed, 22 Oct 2025 16:27:28 +0200 Subject: [PATCH 2/3] Fix on method handling and inputs for flows Now we are correctly handling state changes and execution methods --- src/galileo/handlers/crewai/handler.py | 70 +++++++++++++++++++++++--- 1 file changed, 63 insertions(+), 7 deletions(-) diff --git a/src/galileo/handlers/crewai/handler.py b/src/galileo/handlers/crewai/handler.py index 41bf9d6aa..3ee7fd4e7 100644 --- a/src/galileo/handlers/crewai/handler.py +++ b/src/galileo/handlers/crewai/handler.py @@ -358,12 +358,16 @@ def _generate_run_id(self, source: Any, event: Any) -> UUID: f"memory_retrieval_{getattr(event, 'task_id', '')}_{getattr(event, 'agent_id', '')}" ) + # For LLM events, always generate unique IDs to avoid conflicts with agents + if hasattr(event, "messages"): + # Include source.id in hash if available to maintain some consistency + source_id = getattr(source, "id", "unknown") if hasattr(source, "id") else "unknown" + messages_hash = json.dumps(event.to_json().get("messages", [])) + return self._hash_to_uuid(f"llm_{source_id}_{messages_hash}") + if hasattr(source, "id") and source.id: return source.id - if hasattr(event, "messages"): - return self._hash_to_uuid(json.dumps(event.to_json().get("messages", []))) - if hasattr(event, "tool_args"): tool_args = ( event.tool_args @@ -391,6 +395,58 @@ def _extract_metadata(self, event: Any) -> dict: return metadata + def _extract_flow_state_input(self, source: Any, event: Any) -> dict: + """Extract FlowState pydantic model as input data.""" + # For method events, prefer event.state as it might be more current + state_to_check = None + if hasattr(event, "state") and event.state is not None: + state_to_check = event.state + elif hasattr(source, "state"): + state_to_check = source.state + + if state_to_check is not None: + if hasattr(state_to_check, "model_dump"): + # Pydantic v2 + try: + return state_to_check.model_dump() + except Exception: + pass + elif hasattr(state_to_check, "dict"): + # Pydantic v1 + try: + return state_to_check.dict() + except Exception: + pass + elif isinstance(state_to_check, dict): + # Already a dict + return state_to_check + else: + # Try to convert to dict + try: + return vars(state_to_check) + except Exception: + pass + + # Fallback to event inputs + event_inputs = getattr(event, "inputs", {}) + if event_inputs: + return event_inputs + + # Last resort - try to get any available data + if hasattr(source, "__dict__"): + source_data = {} + for key, value in source.__dict__.items(): + if not key.startswith("_"): # Skip private attributes + try: + # Try to serialize the value + json.dumps(value, default=str) + source_data[key] = value + except (TypeError, ValueError): + source_data[key] = str(value) + return source_data + + return {} + # Crew event handlers def _handle_crew_kickoff_started(self, source: Any, event: "CrewKickoffStartedEvent") -> None: """Handle crew execution start.""" @@ -759,7 +815,8 @@ def _handle_flow_started(self, source: Any, event: "FlowStartedEvent") -> None: metadata["flow_name"] = flow_name metadata["flow_state_id"] = str(run_id) - input_data = getattr(event, "inputs", {}) + # Capture FlowState pydantic model if available + input_data = self._extract_flow_state_input(source, event) self._handler.start_node( node_type=NodeType.CHAIN.value, @@ -803,9 +860,8 @@ def _handle_method_execution_started(self, source: Any, event: "MethodExecutionS elif isinstance(event.state, dict) and "id" in event.state: metadata["state_id"] = str(event.state["id"]) - input_data = getattr(event, "params", None) - if not input_data: - input_data = getattr(event, "state", None) + # Capture current FlowState for this method + input_data = self._extract_flow_state_input(source, event) self._handler.start_node( node_type=NodeType.CHAIN.value, From f367b11fd02c6398d6126b9144624a38e3533b93 Mon Sep 17 00:00:00 2001 From: nicoferdi96 Date: Wed, 29 Oct 2025 15:55:37 +0100 Subject: [PATCH 3/3] better CrewAI handling - Race condition for event bus events - Better flow handling - including crews inside a flow - Better handling of agents --- src/galileo/handlers/crewai/handler.py | 95 +++++++++++++++++++++++--- 1 file changed, 87 insertions(+), 8 deletions(-) diff --git a/src/galileo/handlers/crewai/handler.py b/src/galileo/handlers/crewai/handler.py index 3ee7fd4e7..47d0bb3e8 100644 --- a/src/galileo/handlers/crewai/handler.py +++ b/src/galileo/handlers/crewai/handler.py @@ -153,6 +153,11 @@ def __init__( self._flow_root_id: Optional[UUID] = None self._current_method_id: Optional[UUID] = None self._current_lite_agent_id: Optional[UUID] = None + self._in_flow_context: bool = False + + # Crew context tracking for proper agent/task hierarchy + self._current_crew_id: Optional[UUID] = None + self._current_agent_id: Optional[UUID] = None # Only call super().__init__() if CrewAI is available if CREWAI_AVAILABLE: @@ -451,6 +456,7 @@ def _extract_flow_state_input(self, source: Any, event: Any) -> dict: def _handle_crew_kickoff_started(self, source: Any, event: "CrewKickoffStartedEvent") -> None: """Handle crew execution start.""" run_id = self._generate_run_id(source, event) + self._current_crew_id = run_id crew_name = getattr(event, "crew_name", "Crew") metadata = self._extract_metadata(event) @@ -458,9 +464,33 @@ def _handle_crew_kickoff_started(self, source: Any, event: "CrewKickoffStartedEv input_data = getattr(event, "inputs", {}) + # Determine parent based on context + parent_run_id = None + + if self._in_flow_context: + # In flow context, crew must have a method parent + if self._current_method_id: + parent_run_id = self._current_method_id + _logger.debug(f"Crew {crew_name} inside flow, parent method: {parent_run_id}") + else: + # Race condition: method hasn't been set yet. In this case try to infer from the crew's source or create a placeholder method + _logger.warning( + f"Crew {crew_name} started in flow context but no current method set - " + f"possible race condition. Will nest under flow root." + ) + # Fall back to flow root to avoid creating separate trace + parent_run_id = self._flow_root_id + if not parent_run_id: + _logger.error( + f"Neither method nor flow root set for crew {crew_name} - crew may be logged incorrectly" + ) + else: + # Standalone crew - root of its own trace + _logger.debug(f"Standalone crew with ID: {run_id}") + self._handler.start_node( node_type=NodeType.CHAIN.value, - parent_run_id=self._current_method_id, + parent_run_id=parent_run_id, run_id=run_id, name=crew_name, input=serialize_to_str(input_data) if input_data else "-", @@ -490,6 +520,7 @@ def _handle_crew_kickoff_completed(self, source: Any, event: "CrewKickoffComplet output = getattr(event, "output", {}) self._update_crew_input(str(run_id)) self._handler.end_node(run_id=run_id, output=serialize_to_str(getattr(output, "raw", output))) + self._current_crew_id = None def _handle_crew_kickoff_failed(self, source: Any, event: "CrewKickoffFailedEvent") -> None: """Handle crew execution failure.""" @@ -498,12 +529,27 @@ def _handle_crew_kickoff_failed(self, source: Any, event: "CrewKickoffFailedEven metadata["error"] = event.error self._handler.end_node(run_id=run_id, output=f"Error: {event.error}", metadata=metadata) + self._current_crew_id = None # Agent event handlers def _handle_agent_execution_started(self, source: Any, event: "AgentExecutionStartedEvent") -> None: """Handle agent execution start.""" run_id = self._generate_run_id(source, event) - parent_run_id = self._to_uuid(event.task.id) + self._current_agent_id = run_id + + task = getattr(event, "task", None) + + # Try multiple methods to get the parent crew ID + parent_run_id = None + if task and hasattr(task, "agent") and hasattr(task.agent, "crew"): + parent_run_id = self._to_uuid(task.agent.crew.id) + + # Fallback to current crew + if not parent_run_id: + parent_run_id = self._current_crew_id + if not parent_run_id: + _logger.warning("Agent started but no crew parent found - agent may not nest correctly") + metadata = self._extract_metadata(event) role = "Unknown Agent" @@ -530,6 +576,7 @@ def _handle_agent_execution_completed(self, source: Any, event: "AgentExecutionC """Handle agent execution completion.""" run_id = self._generate_run_id(source, event) self._handler.end_node(run_id=run_id, output=serialize_to_str(getattr(event, "output", ""))) + self._current_agent_id = None def _handle_agent_execution_error(self, source: Any, event: "AgentExecutionErrorEvent") -> None: """Handle agent execution error.""" @@ -540,13 +587,29 @@ def _handle_agent_execution_error(self, source: Any, event: "AgentExecutionError self._handler.end_node( run_id=run_id, output=f"Error: {getattr(event, 'error', 'Unknown error')}", metadata=metadata ) + self._current_agent_id = None # Task event handlers def _handle_task_started(self, source: Any, event: "TaskStartedEvent") -> None: """Handle task start.""" run_id = self._generate_run_id(source, event) task = getattr(event, "task", None) - parent_run_id = self._to_uuid(task.agent.crew.id) if task else None + + parent_run_id = self._current_agent_id + + # Fallback if agent not set + if not parent_run_id and task: + # Try to get agent ID from task + if hasattr(task, "agent") and hasattr(task.agent, "id"): + parent_run_id = self._to_uuid(task.agent.id) + + # Last resort: use crew as parent + if not parent_run_id and hasattr(task, "agent") and hasattr(task.agent, "crew"): + parent_run_id = self._to_uuid(task.agent.crew.id) + _logger.warning("Task started but agent not found - nesting under crew instead") + + if not parent_run_id: + _logger.warning("Task started but no parent found - task may not nest correctly") metadata = self._extract_metadata(event) task_name = "Unknown Task" @@ -808,6 +871,7 @@ def _handle_flow_started(self, source: Any, event: "FlowStartedEvent") -> None: """Handle Flow execution start.""" run_id = self._generate_run_id(source, event) self._flow_root_id = run_id + self._in_flow_context = True flow_name = event.flow_name if hasattr(event, "flow_name") else "Flow" @@ -838,6 +902,7 @@ def _handle_flow_finished(self, source: Any, event: "FlowFinishedEvent") -> None output = getattr(event, "result", {}) self._handler.end_node(run_id=run_id, output=serialize_to_str(output)) + self._in_flow_context = False self._flow_root_id = None self._current_method_id = None self._current_lite_agent_id = None @@ -863,6 +928,24 @@ def _handle_method_execution_started(self, source: Any, event: "MethodExecutionS # Capture current FlowState for this method input_data = self._extract_flow_state_input(source, event) + # If flow root isn't set yet (race condition), create it now + if not self._flow_root_id: + # Generate the flow root ID from the event's flow information + flow_root_id = self._generate_run_id(source, {"flow_name": event.flow_name, "type": "flow_started"}) + self._flow_root_id = flow_root_id + self._in_flow_context = True + _logger.warning(f"Flow root not set when method {method_name} started - creating flow root retroactively") + + # Create the flow root node + self._handler.start_node( + node_type=NodeType.CHAIN.value, + parent_run_id=None, + run_id=flow_root_id, + name=event.flow_name, + input=serialize_to_str(input_data), + metadata={"flow_name": event.flow_name}, + ) + self._handler.start_node( node_type=NodeType.CHAIN.value, parent_run_id=self._flow_root_id, @@ -965,11 +1048,7 @@ def _handle_lite_agent_execution_error(self, source: Any, event: "LiteAgentExecu self._current_lite_agent_id = None def lite_llm_usage_callback( - self, - kwargs: dict, # kwargs to completion - completion_response: Any, # response from completion - start_time: datetime, - end_time: datetime, + self, kwargs: dict, completion_response: Any, start_time: datetime, end_time: datetime ) -> None: node_id = self._generate_run_id(kwargs, kwargs)