diff --git a/camel/agents/chat_agent.py b/camel/agents/chat_agent.py index f101679cff..ad6feb4483 100644 --- a/camel/agents/chat_agent.py +++ b/camel/agents/chat_agent.py @@ -1781,8 +1781,18 @@ def summarize( # convert structured output to custom markdown if present if structured_output: # convert structured output to custom markdown + # exclude operation_mode and target_workflow_filename + # if present (used for workflow save logic, not persisted) + exclude_fields = [] + if hasattr(structured_output, 'operation_mode'): + exclude_fields.append('operation_mode') + if hasattr(structured_output, 'target_workflow_filename'): + exclude_fields.append('target_workflow_filename') + summary_content = context_util.structured_output_to_markdown( - structured_data=structured_output, metadata=metadata + structured_data=structured_output, + metadata=metadata, + exclude_fields=exclude_fields if exclude_fields else None, ) if add_user_messages: summary_content = self._append_user_messages_section( @@ -2215,8 +2225,18 @@ async def asummarize( # convert structured output to custom markdown if present if structured_output: # convert structured output to custom markdown + # exclude operation_mode and target_workflow_filename + # if present (used for workflow save logic, not persisted) + exclude_fields = [] + if hasattr(structured_output, 'operation_mode'): + exclude_fields.append('operation_mode') + if hasattr(structured_output, 'target_workflow_filename'): + exclude_fields.append('target_workflow_filename') + summary_content = context_util.structured_output_to_markdown( - structured_data=structured_output, metadata=metadata + structured_data=structured_output, + metadata=metadata, + exclude_fields=exclude_fields if exclude_fields else None, ) if add_user_messages: summary_content = self._append_user_messages_section( diff --git a/camel/benchmarks/browsecomp.py b/camel/benchmarks/browsecomp.py index 3f269388ec..1b9d05e581 100644 --- a/camel/benchmarks/browsecomp.py +++ b/camel/benchmarks/browsecomp.py @@ -55,7 +55,7 @@ class QueryResponse(BaseModel): ) exact_answer: str = Field(description="""your succinct, final answer.""") confidence: str = Field( - description=""" + description=r""" your confidence score between 0|\%| and 100|\%| for your answer. """ ) @@ -92,7 +92,7 @@ class GradingResponse(BaseModel): incorrect.""" ) confidence: str = Field( - description="""The extracted confidence score between 0|\%| + description=r"""The extracted confidence score between 0|\%| and 100|\%| from [response]. Put 100 if there is no confidence score available. """ ) @@ -160,8 +160,8 @@ class EvalResult(BaseModel): {content} """ -GRADER_TEMPLATE = """ -Judge whether the following [response] to [question] is correct or not +GRADER_TEMPLATE = r""" +Judge whether the following [response] to [question] is correct or not based on the precise and unambiguous [correct_answer] below. [question]: {question} @@ -171,26 +171,26 @@ class EvalResult(BaseModel): Your judgement must be in the format and criteria specified below: extracted_final_answer: The final exact answer extracted from the [response]. -Put the extracted answer as 'None' if there is no exact, final answer to +Put the extracted answer as 'None' if there is no exact, final answer to extract from the response. [correct_answer]: {correct_answer} -reasoning: Explain why the extracted_final_answer is correct or incorrect -based on [correct_answer], focusing only on if there are meaningful -differences between [correct_answer] and the extracted_final_answer. -Do not comment on any background to the problem, do not attempt -to solve the problem, do not argue for any answer different +reasoning: Explain why the extracted_final_answer is correct or incorrect +based on [correct_answer], focusing only on if there are meaningful +differences between [correct_answer] and the extracted_final_answer. +Do not comment on any background to the problem, do not attempt +to solve the problem, do not argue for any answer different than [correct_answer], focus only on whether the answers match. -correct: Answer 'yes' if extracted_final_answer matches the -[correct_answer] given above, or is within a small margin of error for -numerical problems. Answer 'no' otherwise, i.e. if there is any -inconsistency, ambiguity, non-equivalency, or if the extracted answer is +correct: Answer 'yes' if extracted_final_answer matches the +[correct_answer] given above, or is within a small margin of error for +numerical problems. Answer 'no' otherwise, i.e. if there is any +inconsistency, ambiguity, non-equivalency, or if the extracted answer is incorrect. -confidence: The extracted confidence score between 0|\%| and 100|\%| +confidence: The extracted confidence score between 0|\%| and 100|\%| from [response]. Put 100 if there is no confidence score available. """.strip() diff --git a/camel/societies/workforce/workflow_memory_manager.py b/camel/societies/workforce/workflow_memory_manager.py index 769ef980a4..c4da27717d 100644 --- a/camel/societies/workforce/workflow_memory_manager.py +++ b/camel/societies/workforce/workflow_memory_manager.py @@ -93,6 +93,14 @@ def __init__( self._role_identifier = role_identifier self.config = config if config is not None else WorkflowConfig() + # mapping of loaded workflow filenames to their full file paths + # populated when workflows are loaded, used to resolve update targets + self._loaded_workflow_paths: Dict[str, str] = {} + + # cached loaded workflow contents for reuse in prompt preparation + # list of dicts with 'filename' and 'content' keys + self._loaded_workflow_contents: List[Dict[str, str]] = [] + def _get_context_utility(self) -> ContextUtility: r"""Get context utility with lazy initialization. @@ -554,8 +562,9 @@ def save_workflow( r"""Save the worker's current workflow memories using agent summarization. - This method generates a workflow summary from the worker agent's - conversation history and saves it to a markdown file. + This method uses a two-pass approach: first generates the workflow + summary to determine operation_mode (update vs create), then saves + to the appropriate file path based on that decision. Args: conversation_accumulator (Optional[ChatAgent]): Optional @@ -570,26 +579,210 @@ def save_workflow( - worker_description (str): Worker description used """ try: - # setup context utility and agent + # pass 1: generate workflow summary (without saving to disk) + summary_result = self.generate_workflow_summary( + conversation_accumulator=conversation_accumulator + ) + + if summary_result["status"] != "success": + return { + "status": "error", + "summary": "", + "file_path": None, + "worker_description": self.description, + "message": f"Failed to generate summary: " + f"{summary_result['status']}", + } + + workflow_summary = summary_result["structured_summary"] + if not workflow_summary: + return { + "status": "error", + "summary": "", + "file_path": None, + "worker_description": self.description, + "message": "No structured summary generated", + } + + # pass 2: save using save_workflow_content which handles + # operation_mode branching + context_util = self._get_context_utility() + result = self.save_workflow_content( + workflow_summary=workflow_summary, + context_utility=context_util, + conversation_accumulator=conversation_accumulator, + ) + + return result + + except Exception as e: + return { + "status": "error", + "summary": "", + "file_path": None, + "worker_description": self.description, + "message": f"Failed to save workflow memories: {e!s}", + } + + def generate_workflow_summary( + self, + conversation_accumulator: Optional[ChatAgent] = None, + ) -> Dict[str, Any]: + r"""Generate a workflow summary without saving to disk. + + This method generates a workflow summary by calling a dedicated + summarizer agent. It does NOT save to disk - only generates the + summary content and structured output. Use this when you need to + inspect the summary (e.g., extract operation_mode) before determining + where to save it. + + Args: + conversation_accumulator (Optional[ChatAgent]): Optional + accumulator agent with collected conversations. If provided, + uses this instead of the main worker agent. + + Returns: + Dict[str, Any]: Result dictionary with: + - structured_summary: WorkflowSummary instance or None + - summary_content: Raw text content + - status: "success" or error message + """ + + result: Dict[str, Any] = { + "structured_summary": None, + "summary_content": "", + "status": "", + } + + try: + # setup context utility context_util = self._get_context_utility() self.worker.set_context_utility(context_util) - # prepare workflow summarization components + # prepare workflow summarization prompt structured_prompt = self._prepare_workflow_prompt() - # check if we should use role_name or let summarize extract - # task_title - clean_name = self._get_sanitized_role_name() - use_role_name_for_filename = not is_generic_role_name(clean_name) - - # if role_name is explicit, use it for filename - # if role_name is generic, pass none to let summarize use - # task_title - filename = ( - self._generate_workflow_filename() - if use_role_name_for_filename - else None + # select agent for summarization + agent_to_summarize = self.worker + if conversation_accumulator is not None: + accumulator_messages, _ = ( + conversation_accumulator.memory.get_context() + ) + if accumulator_messages: + conversation_accumulator.set_context_utility(context_util) + agent_to_summarize = conversation_accumulator + logger.info( + f"Using conversation accumulator with " + f"{len(accumulator_messages)} messages for workflow " + f"summary" + ) + + # get conversation from agent's memory + source_agent = ( + conversation_accumulator + if conversation_accumulator + else self.worker ) + messages, _ = source_agent.memory.get_context() + + if not messages: + result["status"] = "No conversation context available" + return result + + # build conversation text + conversation_text, _ = ( + agent_to_summarize._build_conversation_text_from_messages( + messages, include_summaries=False + ) + ) + + if not conversation_text: + result["status"] = "Conversation context is empty" + return result + + # create or reuse summarizer agent + if agent_to_summarize._context_summary_agent is None: + agent_to_summarize._context_summary_agent = ChatAgent( + system_message=( + "You are a helpful assistant that summarizes " + "conversations" + ), + model=agent_to_summarize.model_backend, + agent_id=f"{agent_to_summarize.agent_id}_context_summarizer", + ) + else: + agent_to_summarize._context_summary_agent.reset() + + # prepare prompt + prompt_text = ( + f"{structured_prompt.rstrip()}\n\n" + f"AGENT CONVERSATION TO BE SUMMARIZED:\n" + f"{conversation_text}" + ) + + # call summarizer agent with structured output + response = agent_to_summarize._context_summary_agent.step( + prompt_text, response_format=WorkflowSummary + ) + + if not response.msgs: + result["status"] = "Failed to generate summary" + return result + + summary_content = response.msgs[-1].content.strip() + structured_output = None + if response.msgs[-1].parsed: + structured_output = response.msgs[-1].parsed + + result.update( + { + "structured_summary": structured_output, + "summary_content": summary_content, + "status": "success", + } + ) + return result + + except Exception as exc: + error_message = f"Failed to generate summary: {exc}" + logger.error(error_message) + result["status"] = error_message + return result + + async def generate_workflow_summary_async( + self, + conversation_accumulator: Optional[ChatAgent] = None, + ) -> Dict[str, Any]: + r"""Asynchronously generate a workflow summary without saving to disk. + + This is the async version of generate_workflow_summary() that uses + astep() for non-blocking LLM calls. It does NOT save to disk - only + generates the summary content and structured output. + + Args: + conversation_accumulator (Optional[ChatAgent]): Optional + accumulator agent with collected conversations. If provided, + uses this instead of the main worker agent. + + Returns: + Dict[str, Any]: Result dictionary with: + - structured_summary: WorkflowSummary instance or None + - summary_content: Raw text content + - status: "success" or error message + """ + result: Dict[str, Any] = { + "structured_summary": None, + "summary_content": "", + "status": "", + } + + try: + # setup context utility + context_util = self._get_context_utility() + self.worker.set_context_utility(context_util) + + # prepare workflow summarization prompt + structured_prompt = self._prepare_workflow_prompt() # select agent for summarization agent_to_summarize = self.worker @@ -606,27 +799,79 @@ def save_workflow( f"summary" ) - # generate and save workflow summary - result = agent_to_summarize.summarize( - filename=filename, - summary_prompt=structured_prompt, - response_format=WorkflowSummary, + # get conversation from agent's memory + source_agent = ( + conversation_accumulator + if conversation_accumulator + else self.worker + ) + messages, _ = source_agent.memory.get_context() + + if not messages: + result["status"] = "No conversation context available" + return result + + # build conversation text + conversation_text, _ = ( + agent_to_summarize._build_conversation_text_from_messages( + messages, include_summaries=False + ) ) - # add worker metadata - result["worker_description"] = self.description + if not conversation_text: + result["status"] = "Conversation context is empty" + return result + + # create or reuse summarizer agent + if agent_to_summarize._context_summary_agent is None: + agent_to_summarize._context_summary_agent = ChatAgent( + system_message=( + "You are a helpful assistant that summarizes " + "conversations" + ), + model=agent_to_summarize.model_backend, + agent_id=f"{agent_to_summarize.agent_id}_context_summarizer", + ) + else: + agent_to_summarize._context_summary_agent.reset() + + # prepare prompt + prompt_text = ( + f"{structured_prompt.rstrip()}\n\n" + f"AGENT CONVERSATION TO BE SUMMARIZED:\n" + f"{conversation_text}" + ) + + # call summarizer agent with structured output (async) + response = await agent_to_summarize._context_summary_agent.astep( + prompt_text, response_format=WorkflowSummary + ) + + if not response.msgs: + result["status"] = "Failed to generate summary" + return result + + summary_content = response.msgs[-1].content.strip() + structured_output = None + if response.msgs[-1].parsed: + structured_output = response.msgs[-1].parsed + + result.update( + { + "structured_summary": structured_output, + "summary_content": summary_content, + "status": "success", + } + ) return result - except Exception as e: - return { - "status": "error", - "summary": "", - "file_path": None, - "worker_description": self.description, - "message": f"Failed to save workflow memories: {e!s}", - } + except Exception as exc: + error_message = f"Failed to generate summary: {exc}" + logger.error(error_message) + result["status"] = error_message + return result - async def save_workflow_content_async( + def save_workflow_content( self, workflow_summary: 'WorkflowSummary', context_utility: Optional[ContextUtility] = None, @@ -697,19 +942,75 @@ def _create_error_result(message: str) -> Dict[str, Any]: # set context utility on worker self.worker.set_context_utility(context_utility) - # determine filename from task_title - task_title = workflow_summary.task_title - clean_title = ContextUtility.sanitize_workflow_filename(task_title) - base_filename = ( - f"{clean_title}{self.config.workflow_filename_suffix}" - if clean_title - else "workflow" + # determine file path based on operation mode + operation_mode = getattr( + workflow_summary, 'operation_mode', 'create' ) + target_filename = getattr( + workflow_summary, 'target_workflow_filename', None + ) + + # validate operation_mode - default to create for unexpected values + if operation_mode not in ("create", "update"): + logger.warning( + f"Unexpected operation_mode '{operation_mode}', " + "defaulting to 'create'." + ) + operation_mode = "create" + + if operation_mode == "update": + # if only one workflow loaded and no target specified, + # assume agent meant that one + has_single_workflow = len(self._loaded_workflow_paths) == 1 + if not target_filename and has_single_workflow: + target_filename = next( + iter(self._loaded_workflow_paths.keys()) + ) + logger.info( + f"Auto-selecting single loaded workflow: " + f"{target_filename}" + ) + + # validate target filename exists in loaded workflows + if ( + target_filename + and target_filename in self._loaded_workflow_paths + ): + # use the stored path for the target workflow + file_path = Path( + self._loaded_workflow_paths[target_filename] + ) + base_filename = target_filename + logger.info(f"Updating existing workflow: {file_path}") + else: + # invalid or missing target, fall back to create mode + available = list(self._loaded_workflow_paths.keys()) + logger.warning( + f"Invalid target_workflow_filename " + f"'{target_filename}', available: {available}. " + "Falling back to create mode." + ) + operation_mode = "create" + + if operation_mode == "create": + # use task_title from summary for filename + task_title = workflow_summary.task_title + clean_title = ContextUtility.sanitize_workflow_filename( + task_title + ) + base_filename = ( + f"{clean_title}{self.config.workflow_filename_suffix}" + if clean_title + else "workflow" + ) + + file_path = ( + context_utility.get_working_directory() + / f"{base_filename}.md" + ) + logger.info(f"Creating new workflow: {file_path}") # check if workflow file already exists to handle versioning - file_path = ( - context_utility.get_working_directory() / f"{base_filename}.md" - ) existing_metadata = self._extract_existing_workflow_metadata( file_path ) @@ -741,8 +1042,12 @@ def _create_error_result(message: str) -> Dict[str, Any]: ) # convert WorkflowSummary to markdown + # exclude operation_mode and target_workflow_filename as they're + # only used for save logic, not persisted in the workflow file summary_content = context_utility.structured_output_to_markdown( - structured_data=workflow_summary, metadata=metadata + structured_data=workflow_summary, + metadata=metadata, + exclude_fields=['operation_mode', 'target_workflow_filename'], ) # save to disk @@ -757,10 +1062,9 @@ def _create_error_result(message: str) -> Dict[str, Any]: f"conversation from a previous session: {summary_content}" ) + status = "success" if save_status == "success" else save_status return { - "status": "success" - if save_status == "success" - else save_status, + "status": status, "summary": formatted_summary, "file_path": str(file_path), "worker_description": self.description, @@ -771,15 +1075,49 @@ def _create_error_result(message: str) -> Dict[str, Any]: f"Failed to save workflow content: {e!s}" ) + async def save_workflow_content_async( + self, + workflow_summary: 'WorkflowSummary', + context_utility: Optional[ContextUtility] = None, + conversation_accumulator: Optional[ChatAgent] = None, + ) -> Dict[str, Any]: + r"""Async wrapper for save_workflow_content. + + Delegates to sync version since file I/O operations are synchronous. + This method exists for API consistency with save_workflow_async(). + + Args: + workflow_summary (WorkflowSummary): Pre-generated workflow summary + object containing task_title, agent_title, etc. + context_utility (Optional[ContextUtility]): Context utility with + correct working directory. If None, uses default. + conversation_accumulator (Optional[ChatAgent]): An optional agent + that holds accumulated conversation history. Used to get + accurate message_count metadata. (default: :obj:`None`) + + Returns: + Dict[str, Any]: Result dictionary with keys: + - status (str): "success" or "error" + - summary (str): Formatted workflow summary + - file_path (str): Path to saved file + - worker_description (str): Worker description used + """ + return self.save_workflow_content( + workflow_summary=workflow_summary, + context_utility=context_utility, + conversation_accumulator=conversation_accumulator, + ) + async def save_workflow_async( self, conversation_accumulator: Optional[ChatAgent] = None ) -> Dict[str, Any]: r"""Asynchronously save the worker's current workflow memories using agent summarization. - This is the async version of save_workflow() that uses asummarize() for - non-blocking LLM calls, enabling parallel summarization of multiple - workers. + This is the async version of save_workflow() that uses a two-pass + approach: first generate the workflow summary (async LLM call), then + save to disk using the appropriate file path based on operation_mode + (update vs create). Args: conversation_accumulator (Optional[ChatAgent]): Optional @@ -794,54 +1132,40 @@ async def save_workflow_async( - worker_description (str): Worker description used """ try: - # setup context utility and agent - context_util = self._get_context_utility() - self.worker.set_context_utility(context_util) - - # prepare workflow summarization components - structured_prompt = self._prepare_workflow_prompt() + # pass 1: generate workflow summary (without saving to disk) + summary_result = await self.generate_workflow_summary_async( + conversation_accumulator=conversation_accumulator + ) - # select agent for summarization - agent_to_summarize = self.worker - if conversation_accumulator is not None: - accumulator_messages, _ = ( - conversation_accumulator.memory.get_context() - ) - if accumulator_messages: - conversation_accumulator.set_context_utility(context_util) - agent_to_summarize = conversation_accumulator - logger.info( - f"Using conversation accumulator with " - f"{len(accumulator_messages)} messages for workflow " - f"summary" - ) + if summary_result["status"] != "success": + return { + "status": "error", + "summary": "", + "file_path": None, + "worker_description": self.description, + "message": f"Failed to generate summary: " + f"{summary_result['status']}", + } - # check if we should use role_name or let asummarize extract - # task_title - clean_name = self._get_sanitized_role_name() - use_role_name_for_filename = not is_generic_role_name(clean_name) - - # generate and save workflow summary - # if role_name is explicit, use it for filename - # if role_name is generic, pass none to let asummarize use - # task_title - filename = ( - self._generate_workflow_filename() - if use_role_name_for_filename - else None - ) + workflow_summary = summary_result["structured_summary"] + if not workflow_summary: + return { + "status": "error", + "summary": "", + "file_path": None, + "worker_description": self.description, + "message": "No structured summary generated", + } - # **KEY CHANGE**: Using asummarize() instead of summarize() - result = await agent_to_summarize.asummarize( - filename=filename, - summary_prompt=structured_prompt, - response_format=WorkflowSummary, + # pass 2: save using save_workflow_content which handles + # operation_mode branching (sync - file I/O doesn't need async) + context_util = self._get_context_utility() + return self.save_workflow_content( + workflow_summary=workflow_summary, + context_utility=context_util, + conversation_accumulator=conversation_accumulator, ) - # add worker metadata - result["worker_description"] = self.description - return result - except Exception as e: return { "status": "error", @@ -1144,6 +1468,9 @@ def _collect_workflow_contents( ) -> List[Dict[str, str]]: r"""Collect and load workflow file contents. + Also populates the _loaded_workflow_paths mapping for use during + workflow save operations (to support update mode). + Args: workflow_files (List[str]): List of workflow file paths to load. @@ -1173,6 +1500,8 @@ def _collect_workflow_contents( workflows_to_load.append( {'filename': filename, 'content': content} ) + # store filename -> full path mapping for update mode + self._loaded_workflow_paths[filename] = file_path logger.info(f"Loaded workflow content: {filename}") else: logger.warning( @@ -1187,6 +1516,36 @@ def _collect_workflow_contents( return workflows_to_load + def _format_workflow_list( + self, workflows_to_load: List[Dict[str, str]] + ) -> str: + r"""Format a list of workflows into a readable string. + + This is a helper method that formats workflow content without + adding outer headers/footers. Used by _format_workflows_for_context + and _prepare_workflow_prompt. + + Args: + workflows_to_load (List[Dict[str, str]]): List of workflow + dicts with 'filename' and 'content' keys. + + Returns: + str: Formatted workflow list string. + """ + if not workflows_to_load: + return "" + + formatted_content = "" + for i, workflow_data in enumerate(workflows_to_load, 1): + formatted_content += ( + f"\n\n{'=' * 60}\n" + f"Workflow {i}: {workflow_data['filename']}\n" + f"{'=' * 60}\n\n" + f"{workflow_data['content']}" + ) + + return formatted_content + def _format_workflows_for_context( self, workflows_to_load: List[Dict[str, str]] ) -> str: @@ -1219,18 +1578,9 @@ def _format_workflows_for_context( "decisions for your current task." ) - # combine all workflows into single content block - combined_content = f"\n\n--- Previous Workflows ---\n{prefix_prompt}\n" - - for i, workflow_data in enumerate(workflows_to_load, 1): - combined_content += ( - f"\n\n{'=' * 60}\n" - f"Workflow {i}: {workflow_data['filename']}\n" - f"{'=' * 60}\n\n" - f"{workflow_data['content']}" - ) - - # add clear ending marker + # combine header, formatted workflows, and footer + combined_content = f"\n\n--- Previous Workflows ---\n{prefix_prompt}" + combined_content += self._format_workflow_list(workflows_to_load) combined_content += "\n\n--- End of Previous Workflows ---\n" return combined_content @@ -1274,6 +1624,7 @@ def _load_workflow_files( r"""Load workflow files and return count of successful loads. Loads all workflows together with a single header to avoid repetition. + Clears and repopulates the _loaded_workflow_paths mapping. Args: workflow_files (List[str]): List of workflow file paths to load. @@ -1285,7 +1636,11 @@ def _load_workflow_files( if not workflow_files: return 0 - # collect workflow contents from files + # clear previous mapping and cached contents before loading + self._loaded_workflow_paths.clear() + self._loaded_workflow_contents.clear() + + # collect workflow contents from files (also populates the mapping) workflows_to_load = self._collect_workflow_contents( workflow_files[:max_workflows] ) @@ -1293,6 +1648,9 @@ def _load_workflow_files( if not workflows_to_load: return 0 + # cache loaded contents for reuse in prompt preparation + self._loaded_workflow_contents = workflows_to_load + # format workflows into context string try: workflow_context = self._format_workflows_for_context( @@ -1341,10 +1699,48 @@ def _generate_workflow_filename(self) -> str: def _prepare_workflow_prompt(self) -> str: r"""Prepare the structured prompt for workflow summarization. + Includes operation mode instructions if workflows were loaded, + guiding the agent to decide whether to update an existing + workflow or create a new one. + Returns: str: Structured prompt for workflow summary. """ workflow_prompt = WorkflowSummary.get_instruction_prompt() + + # add operation mode instructions based on loaded workflows + if self._loaded_workflow_paths: + loaded_filenames = list(self._loaded_workflow_paths.keys()) + + workflow_prompt += ( + "\n\nOPERATION MODE SELECTION:\n" + "You have previously loaded workflow(s). Review them below " + "and decide whether to update one or create a new workflow." + "\n\nDecision rules:\n" + "- If this task is a continuation, improvement, or refinement " + "of a loaded workflow → set operation_mode='update' and " + "target_workflow_filename to that workflow's exact filename\n" + "- If this is a distinctly different task with different " + "goals/tools → set operation_mode='create'\n\n" + "When choosing 'update', select the single most relevant " + "workflow filename. The updated workflow should incorporate " + "learnings from this session.\n\n" + f"Available workflow filenames: {loaded_filenames}" + ) + + # include formatted workflow content for reference + if self._loaded_workflow_contents: + workflow_prompt += "\n\n--- Loaded Workflows Reference ---" + workflow_prompt += self._format_workflow_list( + self._loaded_workflow_contents + ) + workflow_prompt += "\n\n--- End of Loaded Workflows ---" + else: + workflow_prompt += ( + "\n\nOPERATION MODE:\n" + "No workflows were loaded. Set operation_mode='create'." + ) + return StructuredOutputHandler.generate_structured_prompt( base_prompt=workflow_prompt, schema=WorkflowSummary ) diff --git a/camel/societies/workforce/workforce.py b/camel/societies/workforce/workforce.py index ec0fe51c60..d1d83bd5ac 100644 --- a/camel/societies/workforce/workforce.py +++ b/camel/societies/workforce/workforce.py @@ -3030,24 +3030,16 @@ async def save_single_worker( result).""" if isinstance(child, SingleAgentWorker): try: - from camel.utils.context_utils import ( - ContextUtility, - WorkflowSummary, - ) + from camel.utils.context_utils import ContextUtility # TWO-PASS APPROACH FOR ROLE-BASED SAVING: - # Pass 1: Generate summary to get agent_title + # Use WorkflowMemoryManager which has access to + # _loaded_workflow_paths for operation_mode support workflow_manager = child._get_workflow_manager() - summary_prompt = ( - workflow_manager._prepare_workflow_prompt() - ) - # generate summary without saving - # use conversation accumulator if available + # Pass 1: Generate summary using manager (has loaded paths) gen_result = ( - await child.worker.generate_workflow_summary_async( - summary_prompt=summary_prompt, - response_format=WorkflowSummary, + await workflow_manager.generate_workflow_summary_async( conversation_accumulator=( child._conversation_accumulator ), @@ -3080,6 +3072,8 @@ async def save_single_worker( ) # save with correct context and accumulator + # save_workflow_content_async handles operation_mode + # branching result = ( await workflow_manager.save_workflow_content_async( workflow_summary=workflow_summary, diff --git a/camel/storages/vectordb_storages/qdrant.py b/camel/storages/vectordb_storages/qdrant.py index cc9e398b77..d61e8a9054 100644 --- a/camel/storages/vectordb_storages/qdrant.py +++ b/camel/storages/vectordb_storages/qdrant.py @@ -309,7 +309,7 @@ def update_payload( op_info = self._client.set_payload( collection_name=self.collection_name, payload=payload, - points=PointIdsList(points=points), + points=PointIdsList(points=points), # type: ignore[arg-type] **kwargs, ) if op_info.status != UpdateStatus.COMPLETED: @@ -376,7 +376,7 @@ def delete( op_info = self._client.delete( collection_name=self.collection_name, points_selector=PointIdsList( - points=cast(List[Union[int, str, UUID]], ids) + points=cast(List[Union[int, str, UUID]], ids) # type: ignore[arg-type] ), **kwargs, ) diff --git a/camel/storages/vectordb_storages/tidb.py b/camel/storages/vectordb_storages/tidb.py index 5d42c0e60e..5b75e3abbf 100644 --- a/camel/storages/vectordb_storages/tidb.py +++ b/camel/storages/vectordb_storages/tidb.py @@ -107,7 +107,7 @@ def _create_client( ) def _get_table_model(self, collection_name: str) -> Any: - from pytidb.datatype import JSON + from pytidb.datatype import JSON # type: ignore[import-not-found] from pytidb.schema import Field, TableModel, VectorField class VectorDBRecordBase(TableModel, table=False): @@ -130,7 +130,8 @@ def _open_and_create_table(self) -> "Table[Any]": if table is None: table_model = self._get_table_model(self.collection_name) table = self._client.create_table( - schema=table_model, if_exists="skip" + schema=table_model, + if_exists="skip", # type: ignore[call-arg] ) return table diff --git a/camel/utils/context_utils.py b/camel/utils/context_utils.py index 4bedebddb4..9c630e4349 100644 --- a/camel/utils/context_utils.py +++ b/camel/utils/context_utils.py @@ -16,7 +16,7 @@ import re from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Optional +from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Literal, Optional from pydantic import BaseModel, Field @@ -113,6 +113,26 @@ class WorkflowSummary(BaseModel): "'report-generation'.", default_factory=list, ) + operation_mode: Literal["create", "update"] = Field( + description=( + "Whether to create a new workflow file or update an existing one. " + "Use 'update' ONLY if you loaded a workflow that this task is a " + "continuation, improvement, or refinement of AND the task_title " + "should remain the same as the loaded workflow. Use 'create' if " + "this is a new/different task, if the task_title would be " + "different, or if no workflows were loaded." + ), + default="create", + ) + target_workflow_filename: Optional[str] = Field( + description=( + "When operation_mode is 'update', specify the exact filename " + "(without .md extension) of the loaded workflow to update. " + "Must match one of the loaded workflow filenames exactly. " + "Leave empty or null when operation_mode is 'create'." + ), + default=None, + ) @classmethod def get_instruction_prompt(cls) -> str: @@ -141,7 +161,14 @@ def get_instruction_prompt(cls) -> str: 'with hyphens (e.g., "data-analysis", "web-scraping") that ' 'describe the workflow domain, type, and key capabilities to ' 'help future agents discover this workflow when working on ' - 'similar tasks.' + 'similar tasks. ' + 'For operation_mode, decide whether to update an existing ' + 'workflow or create a new one. Use "update" ONLY if you loaded ' + 'a workflow that this task is a continuation or refinement of ' + 'AND the task_title should remain the same. If the task is ' + 'different enough to warrant a different task_title, use "create" ' + 'instead. When using "update", keep the same task_title as the ' + 'loaded workflow and specify target_workflow_filename.' ) @@ -365,6 +392,7 @@ def structured_output_to_markdown( metadata: Optional[Dict[str, Any]] = None, title: Optional[str] = None, field_mappings: Optional[Dict[str, str]] = None, + exclude_fields: Optional[List[str]] = None, ) -> str: r"""Convert any Pydantic BaseModel instance to markdown format. @@ -374,6 +402,8 @@ def structured_output_to_markdown( title: Optional custom title, defaults to model class name field_mappings: Optional mapping of field names to custom section titles + exclude_fields: Optional list of field names to exclude from + the markdown output Returns: str: Markdown formatted content @@ -396,8 +426,12 @@ def structured_output_to_markdown( # Get model fields and values model_dict = structured_data.model_dump() + exclude_set = set(exclude_fields) if exclude_fields else set() for field_name, field_value in model_dict.items(): + # Skip excluded fields + if field_name in exclude_set: + continue # Use custom mapping or convert field name to title case if field_mappings and field_name in field_mappings: section_title = field_mappings[field_name] diff --git a/examples/workforce/workforce_workflow_memory_example.py b/examples/workforce/workforce_workflow_memory_example.py index a6b47c4d25..805ab21617 100644 --- a/examples/workforce/workforce_workflow_memory_example.py +++ b/examples/workforce/workforce_workflow_memory_example.py @@ -49,7 +49,7 @@ def create_math_agent() -> ChatAgent: "mathematical concepts. Use the math tools available to you." ), ) - # Use DEFAULT to automatically use Azure OpenAI (configured in .env) + model = ModelFactory.create( model_platform=ModelPlatformType.DEFAULT, model_type=ModelType.DEFAULT, diff --git a/test/models/test_cerebras_model.py b/test/models/test_cerebras_model.py index 4f78f64b03..94bd922c03 100644 --- a/test/models/test_cerebras_model.py +++ b/test/models/test_cerebras_model.py @@ -30,7 +30,8 @@ ModelType.CEREBRAS_QWEN_3_32B, ], ) -def test_cerebras_model(model_type: ModelType): +def test_cerebras_model(model_type: ModelType, monkeypatch): + monkeypatch.setenv("CEREBRAS_API_KEY", "test_key") model_config_dict = CerebrasConfig().as_dict() model = CerebrasModel(model_type, model_config_dict) assert model.model_type == model_type diff --git a/test/workforce/test_workflow_memory.py b/test/workforce/test_workflow_memory.py index 0ec257e081..1dfdb80d29 100644 --- a/test/workforce/test_workflow_memory.py +++ b/test/workforce/test_workflow_memory.py @@ -202,19 +202,49 @@ class TestSingleAgentWorkerWorkflow: @pytest.mark.asyncio async def test_save_workflow_success(self, temp_context_dir): """Test successful workflow saving.""" + from camel.societies.workforce.workflow_memory_manager import ( + WorkflowMemoryManager, + ) + from camel.utils.context_utils import WorkflowSummary + worker = MockSingleAgentWorker("test_analyst") - # Mock the asummarize method to return success - mock_result = { + # Mock the WorkflowMemoryManager methods that are now used + mock_workflow_summary = WorkflowSummary( + agent_title="test_analyst", + task_title="Test Analysis", + task_description="Test analysis task", + tools=[], + steps=["Step 1"], + tags=["test"], + ) + + mock_gen_result = { + "status": "success", + "structured_summary": mock_workflow_summary, + "summary_content": "Test workflow summary", + } + + mock_save_result = { "status": "success", "summary": "Test workflow summary", "file_path": ( f"{temp_context_dir}/test_analyst_workflow_20250122.md" ), + "worker_description": "test_analyst", } - with patch.object( - worker.worker, 'asummarize', return_value=mock_result + with ( + patch.object( + WorkflowMemoryManager, + 'generate_workflow_summary_async', + return_value=mock_gen_result, + ), + patch.object( + WorkflowMemoryManager, + 'save_workflow_content', + return_value=mock_save_result, + ), ): result = await worker.save_workflow_memories_async() @@ -242,11 +272,17 @@ async def test_save_workflow_non_chat_agent(self): @pytest.mark.asyncio async def test_save_workflow_exception(self, temp_context_dir): """Test workflow saving when exception occurs.""" + from camel.societies.workforce.workflow_memory_manager import ( + WorkflowMemoryManager, + ) + worker = MockSingleAgentWorker("test_analyst") - # Mock asummarize to raise exception + # Mock generate_workflow_summary_async to raise exception with patch.object( - worker.worker, 'asummarize', side_effect=Exception("Test error") + WorkflowMemoryManager, + 'generate_workflow_summary_async', + side_effect=Exception("Test error"), ): result = await worker.save_workflow_memories_async() @@ -386,22 +422,22 @@ async def test_save_workflow_memories_real_execution( """Test save_workflow_memories_async with real internal execution. This test exercises the actual save_workflow_memories_async() - implementation, only mocking the ChatAgent.asummarize() method to - avoid LLM API calls. + implementation, mocking WorkflowMemoryManager methods to avoid + LLM API calls. Tests the following internal behavior: - Workforce iteration through child workers - SingleAgentWorker validation logic - (_validate_workflow_save_requirements) - Context utility setup and agent configuration - Filename generation from worker description - (_generate_workflow_filename) - - Workflow prompt preparation (_prepare_workflow_prompt) - - Agent selection for summarization (_select_agent_for_summarization) - - Worker metadata addition to results - Result processing and error handling in Workforce - Shared context utility setup between Workforce and workers """ + from camel.societies.workforce.workflow_memory_manager import ( + WorkflowMemoryManager, + ) + from camel.utils.context_utils import WorkflowSummary + workforce = Workforce("Test Workforce") worker = MockSingleAgentWorker("data_analyst") workforce._children = [worker] @@ -419,11 +455,9 @@ async def test_save_workflow_memories_real_execution( # Store initial conversation accumulator state initial_accumulator = worker._conversation_accumulator - # Mock only the ChatAgent.generate_workflow_summary_async() method + # Mock WorkflowMemoryManager.generate_workflow_summary_async() # Role-based structure: # workforce_workflows/{role}/{task_title}_workflow.md - from camel.utils.context_utils import WorkflowSummary - mock_workflow_summary = WorkflowSummary( agent_title="test_worker", task_title="Analyze Sales Data", @@ -439,11 +473,28 @@ async def test_save_workflow_memories_real_execution( "summary_content": "Completed data analysis workflow", } - with patch.object( - ChatAgent, - 'generate_workflow_summary_async', - return_value=mock_gen_result, - ) as mock_generate: + mock_save_result = { + "status": "success", + "summary": "Completed data analysis workflow", + "file_path": ( + f"{temp_context_dir}/workforce_workflows/test_worker/" + "analyze_sales_data_workflow.md" + ), + "worker_description": "data_analyst", + } + + with ( + patch.object( + WorkflowMemoryManager, + 'generate_workflow_summary_async', + return_value=mock_gen_result, + ) as mock_generate, + patch.object( + WorkflowMemoryManager, + 'save_workflow_content_async', + return_value=mock_save_result, + ), + ): # This executes the real save_workflow_memories_async() logic results = await workforce.save_workflow_memories_async() @@ -457,17 +508,6 @@ async def test_save_workflow_memories_real_execution( # Verify generate_workflow_summary_async was called mock_generate.assert_called_once() - call_kwargs = mock_generate.call_args[1] - - # Verify structured output format is set (WorkflowSummary) - assert 'response_format' in call_kwargs - from camel.utils.context_utils import WorkflowSummary - - assert call_kwargs['response_format'] == WorkflowSummary - - # Verify summary prompt was prepared - assert 'summary_prompt' in call_kwargs - assert call_kwargs['summary_prompt'] is not None # Verify conversation accumulator cleanup # (it should be None after successful save) @@ -553,33 +593,54 @@ class TestWorkflowIntegration: """Integration tests for workflow functionality.""" @pytest.mark.asyncio - async def test_filename_sanitization(self): + async def test_filename_sanitization(self, temp_context_dir): """Test worker descriptions are properly sanitized for filenames.""" + from camel.societies.workforce.workflow_memory_manager import ( + WorkflowMemoryManager, + ) + from camel.utils.context_utils import WorkflowSummary + worker = MockSingleAgentWorker("Data Analyst & ML Engineer!") - # Mock datetime for consistent testing - with patch( - 'camel.societies.workforce.single_agent_worker.datetime' - ) as mock_datetime: - mock_datetime.datetime.now.return_value.strftime.return_value = ( - "20250122_143022" - ) + mock_workflow_summary = WorkflowSummary( + agent_title="data_analyst_ml_engineer", + task_title="Test Analysis", + task_description="Test analysis task", + tools=[], + steps=["Step 1"], + tags=["test"], + ) - mock_result = { - "status": "success", - "summary": "Test summary", - "file_path": ( - "/path/to/data_analyst_ml_engineer_workflow_" - "20250122_143022.md" - ), - } + mock_gen_result = { + "status": "success", + "structured_summary": mock_workflow_summary, + "summary_content": "Test summary", + } - with patch.object( - worker.worker, 'asummarize', return_value=mock_result - ): - # Verify the filename generation works with special characters - result = await worker.save_workflow_memories_async() - assert result["status"] == "success" + mock_save_result = { + "status": "success", + "summary": "Test summary", + "file_path": ( + f"{temp_context_dir}/data_analyst_ml_engineer_workflow.md" + ), + "worker_description": "Data Analyst & ML Engineer!", + } + + with ( + patch.object( + WorkflowMemoryManager, + 'generate_workflow_summary_async', + return_value=mock_gen_result, + ), + patch.object( + WorkflowMemoryManager, + 'save_workflow_content', + return_value=mock_save_result, + ), + ): + # Verify the filename generation works with special characters + result = await worker.save_workflow_memories_async() + assert result["status"] == "success" @pytest.mark.asyncio async def test_long_description_filename_generation( @@ -590,6 +651,11 @@ async def test_long_description_filename_generation( This test addresses issue #3277 where long worker descriptions caused filesystem errors due to excessive filename lengths. """ + from camel.societies.workforce.workflow_memory_manager import ( + WorkflowMemoryManager, + ) + from camel.utils.context_utils import WorkflowSummary + # create worker with extremely long description (like in eigent.py) long_desc = ( "Developer Agent: A master-level coding assistant with a " @@ -610,14 +676,40 @@ async def test_long_description_filename_generation( enable_workflow_memory=True, ) - # mock the asummarize method - mock_result = { + mock_workflow_summary = WorkflowSummary( + agent_title="developer_agent", + task_title="Code Development Task", + task_description="Developed code", + tools=[], + steps=["Step 1"], + tags=["development"], + ) + + mock_gen_result = { + "status": "success", + "structured_summary": mock_workflow_summary, + "summary_content": "Test workflow summary", + } + + mock_save_result = { "status": "success", "summary": "Test workflow summary", "file_path": f"{temp_context_dir}/developer_agent_workflow.md", + "worker_description": long_desc, } - with patch.object(agent, 'asummarize', return_value=mock_result): + with ( + patch.object( + WorkflowMemoryManager, + 'generate_workflow_summary_async', + return_value=mock_gen_result, + ), + patch.object( + WorkflowMemoryManager, + 'save_workflow_content', + return_value=mock_save_result, + ), + ): result = await worker.save_workflow_memories_async() assert result["status"] == "success" @@ -641,6 +733,11 @@ async def test_filename_generation_with_generic_role_name( When agent has a generic role_name like "assistant", the filename should be based on the task_title from the generated workflow summary. """ + from camel.societies.workforce.workflow_memory_manager import ( + WorkflowMemoryManager, + ) + from camel.utils.context_utils import WorkflowSummary + # create worker with generic role_name sys_msg = BaseMessage.make_assistant_message( role_name="Assistant", # generic role name @@ -662,17 +759,40 @@ async def test_filename_generation_with_generic_role_name( 'system', } - # mock the asummarize method to return a workflow with task_title - mock_result = { + mock_workflow_summary = WorkflowSummary( + agent_title="assistant", + task_title="Analyze sales data", + task_description="Analyzed sales data", + tools=[], + steps=["Step 1"], + tags=["data-analysis"], + ) + + mock_gen_result = { + "status": "success", + "structured_summary": mock_workflow_summary, + "summary_content": "Test workflow summary", + } + + mock_save_result = { "status": "success", "summary": "Test workflow summary", "file_path": f"{temp_context_dir}/analyze_sales_data_workflow.md", - "structured_summary": type( - 'obj', (object,), {'task_title': 'Analyze sales data'} - )(), + "worker_description": "Data analyst worker", } - with patch.object(agent, 'asummarize', return_value=mock_result): + with ( + patch.object( + WorkflowMemoryManager, + 'generate_workflow_summary_async', + return_value=mock_gen_result, + ), + patch.object( + WorkflowMemoryManager, + 'save_workflow_content', + return_value=mock_save_result, + ), + ): # note: in real execution, file would be renamed to use task_title # the test shows that with generic role_name, we rely on task_title result = await worker.save_workflow_memories_async() @@ -1510,6 +1630,11 @@ async def test_accumulator_cleanup_after_successful_save( Validates cleanup to prevent memory leaks and duplicate content in future saves. """ + from camel.societies.workforce.workflow_memory_manager import ( + WorkflowMemoryManager, + ) + from camel.utils.context_utils import WorkflowSummary + sys_msg = BaseMessage.make_assistant_message( role_name="Test Worker", content="You are a test worker." ) @@ -1538,14 +1663,39 @@ async def test_accumulator_cleanup_after_successful_save( ), "Accumulator should exist after processing tasks" # save workflow with mocked summarization - mock_summary_result = { + mock_workflow_summary = WorkflowSummary( + agent_title="test_worker", + task_title="Test Task", + task_description="Test task description", + tools=[], + steps=["Step 1"], + tags=["test"], + ) + + mock_gen_result = { + "status": "success", + "structured_summary": mock_workflow_summary, + "summary_content": "Test summary", + } + + mock_save_result = { "status": "success", "summary": "Test summary", "file_path": f"{temp_context_dir}/test_workflow.md", + "worker_description": "test_worker", } - with patch.object( - ChatAgent, 'asummarize', return_value=mock_summary_result + with ( + patch.object( + WorkflowMemoryManager, + 'generate_workflow_summary_async', + return_value=mock_gen_result, + ), + patch.object( + WorkflowMemoryManager, + 'save_workflow_content', + return_value=mock_save_result, + ), ): result = await worker.save_workflow_memories_async() @@ -2403,90 +2553,3 @@ async def test_extract_metadata_from_existing_workflow( assert metadata.message_count == 42 assert metadata.created_at == "2025-01-15T10:00:00.000000" assert metadata.updated_at == "2025-01-15T12:00:00.000000" - - @pytest.mark.asyncio - async def test_different_workflows_have_independent_versions( - self, temp_context_dir - ): - """Test that different workflow files maintain independent versions.""" - from pathlib import Path - - from camel.societies.workforce.workflow_memory_manager import ( - WorkflowMemoryManager, - ) - from camel.utils.context_utils import ContextUtility, WorkflowSummary - - worker = MockSingleAgentWorker("multi_task_worker") - - role_name = "multi_task_worker" - context_utility = ContextUtility( - working_directory=f"{temp_context_dir}/workforce_workflows/{role_name}", - create_folder=True, - use_session_subfolder=False, - ) - - manager = WorkflowMemoryManager( - worker=worker.worker, - description="Multi-task worker", - context_utility=context_utility, - role_identifier=role_name, - ) - - # create first workflow - workflow1 = WorkflowSummary( - agent_title="multi_task_worker", - task_title="Task Alpha", - task_description="First task", - tools=[], - steps=["Step 1"], - failure_and_recovery_strategies=[], - notes_and_observations="", - tags=["alpha"], - ) - - # create second workflow with different title - workflow2 = WorkflowSummary( - agent_title="multi_task_worker", - task_title="Task Beta", - task_description="Second task", - tools=[], - steps=["Step 1"], - failure_and_recovery_strategies=[], - notes_and_observations="", - tags=["beta"], - ) - - # save workflow1 twice (should be v1 then v2) - result1a = await manager.save_workflow_content_async( - workflow_summary=workflow1, - context_utility=context_utility, - conversation_accumulator=None, - ) - assert result1a["status"] == "success" - file1 = Path(result1a["file_path"]) - assert "workflow_version: 1" in file1.read_text() - - await manager.save_workflow_content_async( - workflow_summary=workflow1, - context_utility=context_utility, - conversation_accumulator=None, - ) - assert "workflow_version: 2" in file1.read_text() - - # save workflow2 (should be v1, independent of workflow1) - result2a = await manager.save_workflow_content_async( - workflow_summary=workflow2, - context_utility=context_utility, - conversation_accumulator=None, - ) - assert result2a["status"] == "success" - file2 = Path(result2a["file_path"]) - - # workflow2 should be v1 (its first save) - assert "workflow_version: 1" in file2.read_text() - - # workflow1 should still be v2 - assert "workflow_version: 2" in file1.read_text() - - # verify they are different files - assert file1 != file2