From 801a11c51fc34228f518083605cf460c5630098b Mon Sep 17 00:00:00 2001 From: zshanhui Date: Thu, 20 Nov 2025 17:10:24 +0800 Subject: [PATCH 01/10] implement log_message in WorkforceLogger; switch print statements to use log_message --- camel/societies/workforce/events.py | 4 + camel/societies/workforce/workforce.py | 139 ++++++++++-------- camel/societies/workforce/workforce_logger.py | 37 ++++- 3 files changed, 115 insertions(+), 65 deletions(-) diff --git a/camel/societies/workforce/events.py b/camel/societies/workforce/events.py index 2800739319..593d421f83 100644 --- a/camel/societies/workforce/events.py +++ b/camel/societies/workforce/events.py @@ -38,6 +38,10 @@ class WorkforceEventBase(BaseModel): default_factory=lambda: datetime.now(timezone.utc) ) +class LogEvent(WorkforceEventBase): + event_type: Literal["log"] = "log" + message: str + level: Literal["info", "warning", "error", "success"] class WorkerCreatedEvent(WorkforceEventBase): event_type: Literal["worker_created"] = "worker_created" diff --git a/camel/societies/workforce/workforce.py b/camel/societies/workforce/workforce.py index 012a8cd74b..7663d39080 100644 --- a/camel/societies/workforce/workforce.py +++ b/camel/societies/workforce/workforce.py @@ -94,6 +94,7 @@ from .events import ( AllTasksCompletedEvent, + LogEvent, TaskAssignedEvent, TaskCompletedEvent, TaskCreatedEvent, @@ -3084,6 +3085,7 @@ async def _find_assignee( return TaskAssignResult(assignments=all_assignments) async def _post_task(self, task: Task, assignee_id: str) -> None: + logger.warning(f'[Workforce._post_task] posting task {task.id} to {assignee_id}') # Record the start time when a task is posted self._task_start_times[task.id] = time.time() @@ -3106,10 +3108,13 @@ async def _post_task(self, task: Task, assignee_id: str) -> None: logger.error( f"Failed to post task {task.id} to {assignee_id}: {e}" ) - print( - f"{Fore.RED}Failed to post task {task.id} to {assignee_id}: " - f"{e}{Fore.RESET}" - ) + for cb in self._callbacks: + cb.log_message( + LogEvent( + message=f"Failed to post task {task.id} to {assignee_id}: {e}", + level="error" + ) + ) async def _post_dependency(self, dependency: Task) -> None: await self._channel.post_dependency(dependency, self.node_id) @@ -3236,7 +3241,12 @@ async def _create_worker_node_for_task(self, task: Task) -> Worker: ) new_node.set_channel(self._channel) - print(f"{Fore.CYAN}{new_node} created.{Fore.RESET}") + for cb in self._callbacks: + cb.log_message( + LogEvent( + message=f"{new_node} created.", + level="success" + )) self._children.append(new_node) @@ -3538,11 +3548,10 @@ async def _handle_failed_task(self, task: Task) -> bool: f"{task.failure_count}/{MAX_TASK_RETRIES}): {detailed_error}" ) - print( - f"{Fore.RED}❌ Task {task.id} failed " - f"(attempt {task.failure_count}/{MAX_TASK_RETRIES}): " - f"{failure_reason}{Fore.RESET}" - ) + for cb in self._callbacks: + cb.log_message(LogEvent( + message=f"❌ Task {task.id} failed (attempt {task.failure_count}/{MAX_TASK_RETRIES}): {failure_reason}", + level="error")) task_failed_event = TaskFailedEvent( task_id=task.id, @@ -3711,10 +3720,10 @@ async def _handle_completed_task(self, task: Task) -> None: tasks_list.pop(i) self._pending_tasks = deque(tasks_list) found_and_removed = True - print( - f"{Fore.GREEN}✅ Task {task.id} completed and removed " - f"from queue.{Fore.RESET}" - ) + for cb in self._callbacks: + cb.log_message(LogEvent( + message=f"✅ Task {task.id} completed and removed from queue.", + level="success")) break if not found_and_removed: @@ -3847,7 +3856,11 
@@ def dump_workforce_logs(self, file_path: str) -> None: cb for cb in self._callbacks if isinstance(cb, WorkforceMetrics) ] if len(metrics_cb) == 0: - print("Logger not initialized. Cannot dump logs.") + for cb in self._callbacks: + cb.log_message(LogEvent( + message="Logger not initialized. Cannot dump logs.", + level="info" + )) return metrics_cb[0].dump_to_json(file_path) # Use logger.info or print, consistent with existing style @@ -4142,25 +4155,25 @@ async def _listen_to_channel(self) -> None: # Do not halt if we have main tasks in queue if len(self.get_main_task_queue()) > 0: - print( - f"{Fore.RED}Task {returned_task.id} has " - f"failed for {MAX_TASK_RETRIES} times " - f"after insufficient results, skipping " - f"that task. Final error: " - f"{returned_task.result or 'Unknown err'}" - f"{Fore.RESET}" - ) + for cb in self._callbacks: + cb.log_message( + LogEvent( + message=f"Task {returned_task.id} has failed for {MAX_TASK_RETRIES} times after insufficient results, skipping that task. Final error: {returned_task.result or 'Unknown error'}", + level="error" + ) + ) + self._skip_requested = True continue - print( - f"{Fore.RED}Task {returned_task.id} has " - f"failed for {MAX_TASK_RETRIES} times after " - f"insufficient results, halting the " - f"workforce. Final error: " - f"{returned_task.result or 'Unknown error'}" - f"{Fore.RESET}" - ) + for cb in self._callbacks: + cb.log_message( + LogEvent( + message=f"Task {returned_task.id} has failed for {MAX_TASK_RETRIES} times after insufficient results, halting the workforce. Final error: {returned_task.result or 'Unknown error'}", + level="error" + ) + ) + await self._graceful_shutdown(returned_task) break except Exception as e: @@ -4185,12 +4198,14 @@ async def _listen_to_channel(self) -> None: # Check retry limit before attempting recovery if returned_task.failure_count >= 2: - print( - f"{Fore.YELLOW}Task {returned_task.id} " - f"completed with low quality score: " - f"{quality_eval.quality_score} " - f"(retry limit reached){Fore.RESET}" - ) + for cb in self._callbacks: + cb.log_message( + LogEvent( + level='warning', + message=f'Task {returned_task.id} completed with low quality score: {quality_eval.quality_score} (retry limit reached)' + ) + ) + await self._handle_completed_task( returned_task ) @@ -4203,14 +4218,10 @@ async def _listen_to_channel(self) -> None: if quality_eval.recovery_strategy else "" ) - print( - f"{Fore.YELLOW}⚠️ Task {returned_task.id} " - f"failed quality check (score: " - f"{quality_eval.quality_score}). " - f"Issues: {', '.join(quality_eval.issues)}. " - f"Recovery: {recovery_action}{Fore.RESET}" - ) - + for cb in self._callbacks: + cb.log_message(LogEvent( + level="warning", + message=f"⚠️ Task {returned_task.id} failed quality check (score: {quality_eval.quality_score}). Issues: {', '.join(quality_eval.issues)}. Recovery: {recovery_action}")) # Mark as failed for recovery returned_task.failure_count += 1 returned_task.state = TaskState.FAILED @@ -4247,11 +4258,13 @@ async def _listen_to_channel(self) -> None: ) continue else: - print( - f"{Fore.CYAN}Task {returned_task.id} " - f"completed successfully (quality score: " - f"{quality_eval.quality_score}).{Fore.RESET}" - ) + for cb in self._callbacks: + cb.log_message( + LogEvent( + level="success", + message=f"Task {returned_task.id} completed successfully (quality score: {quality_eval.quality_score})." 
+ ) + ) await self._handle_completed_task(returned_task) elif returned_task.state == TaskState.FAILED: try: @@ -4261,23 +4274,21 @@ async def _listen_to_channel(self) -> None: # Do not halt if we have main tasks in queue if len(self.get_main_task_queue()) > 0: - print( - f"{Fore.RED}Task {returned_task.id} has " - f"failed for {MAX_TASK_RETRIES} times, " - f"skipping that task. Final error: " - f"{returned_task.result or 'Unknown error'}" - f"{Fore.RESET}" - ) + for cb in self._callbacks: + cb.log_message( + LogEvent( + level="error", + message=f"Task {returned_task.id} has failed for {MAX_TASK_RETRIES} times, skipping that task. Final error: {returned_task.result or 'Unknown error'}" + ) + ) self._skip_requested = True continue - print( - f"{Fore.RED}Task {returned_task.id} has failed " - f"for {MAX_TASK_RETRIES} times, halting " - f"the workforce. Final error: " - f"{returned_task.result or 'Unknown error'}" - f"{Fore.RESET}" - ) + for cb in self._callbacks: + cb.log_message(event=LogEvent( + level="error", + message=f"Task {returned_task.id} has failed for {MAX_TASK_RETRIES} times, halting the workforce. Final error: {returned_task.result or 'Unknown error'}" + )) # Graceful shutdown instead of immediate break await self._graceful_shutdown(returned_task) break diff --git a/camel/societies/workforce/workforce_logger.py b/camel/societies/workforce/workforce_logger.py index b04c134288..f30577a320 100644 --- a/camel/societies/workforce/workforce_logger.py +++ b/camel/societies/workforce/workforce_logger.py @@ -13,11 +13,15 @@ # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= import json from datetime import datetime, timezone +import logging from typing import Any, Dict, List, Optional -from camel.logger import get_logger +from colorama import Fore + +from camel.logger import get_logger, set_log_level from camel.societies.workforce.events import ( AllTasksCompletedEvent, + LogEvent, QueueStatusEvent, TaskAssignedEvent, TaskCompletedEvent, @@ -34,6 +38,12 @@ logger = get_logger(__name__) +_COLOR_MAP = { + "warning": Fore.YELLOW, + "error": Fore.RED, + "success": Fore.CYAN, +} + class WorkforceLogger(WorkforceCallback, WorkforceMetrics): r"""Logs events and metrics for a Workforce instance.""" @@ -50,6 +60,31 @@ def __init__(self, workforce_id: str): self._worker_information: Dict[str, Dict[str, Any]] = {} self._initial_worker_logs: List[Dict[str, Any]] = [] + def _color_message(self, event: LogEvent) -> str: + if event.level not in _COLOR_MAP: + return event.message + + if event.level == "info": + return event.message + color = _COLOR_MAP.get(event.level) + return f"{color}{event.message}{Fore.RESET}" + + def log_message(self, event: LogEvent) -> None: + handler = logging.StreamHandler() + handler.setLevel(logging.INFO) + logger.addHandler(handler) + message = self._color_message(event) + + try: + if event.level in ["info", "success"]: + logger.info(message) + if event.level == "warning": + logger.warning(message) + if event.level == "error": + logger.error(message) + finally: + logger.removeHandler(handler) + def _log_event(self, event_type: str, **kwargs: Any) -> None: r"""Internal method to create and store a log entry. 
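The pattern introduced above replaces direct `print` calls with event emission: the workforce builds a `LogEvent` and hands it to each registered callback's `log_message` hook, and `WorkforceLogger` is just one such consumer that applies colors and forwards to the standard logger. A minimal sketch of an alternative consumer under that assumption — the `CollectingCallback` class below is hypothetical, written only to illustrate the hook:

```python
from camel.societies.workforce.events import LogEvent


class CollectingCallback:
    """Hypothetical callback that records log events instead of printing."""

    def __init__(self) -> None:
        self.events: list[LogEvent] = []

    def log_message(self, event: LogEvent) -> None:
        # Same hook the workforce invokes via `for cb in self._callbacks`.
        self.events.append(event)


callbacks = [CollectingCallback()]
for cb in callbacks:
    cb.log_message(LogEvent(message="Task 1 posted.", level="info"))

assert callbacks[0].events[0].level == "info"
```

Because `LogEvent` carries a plain `level` field rather than ANSI escape codes, consumers other than `WorkforceLogger` (UIs, structured log sinks) can render the message however they like.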
From 46cb07fed32bb544fa3d8b172a83953020317e17 Mon Sep 17 00:00:00 2001 From: zshanhui Date: Fri, 21 Nov 2025 15:40:05 +0800 Subject: [PATCH 02/10] small fixes --- camel/societies/workforce/workforce.py | 1 - camel/societies/workforce/workforce_logger.py | 5 +---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/camel/societies/workforce/workforce.py b/camel/societies/workforce/workforce.py index d06f2a25d4..e393fc068a 100644 --- a/camel/societies/workforce/workforce.py +++ b/camel/societies/workforce/workforce.py @@ -3624,7 +3624,6 @@ async def _find_assignee( return TaskAssignResult(assignments=all_assignments) async def _post_task(self, task: Task, assignee_id: str) -> None: - logger.warning(f'[Workforce._post_task] posting task {task.id} to {assignee_id}') # Record the start time when a task is posted self._task_start_times[task.id] = time.time() diff --git a/camel/societies/workforce/workforce_logger.py b/camel/societies/workforce/workforce_logger.py index f30577a320..baba18fcb4 100644 --- a/camel/societies/workforce/workforce_logger.py +++ b/camel/societies/workforce/workforce_logger.py @@ -61,10 +61,7 @@ def __init__(self, workforce_id: str): self._initial_worker_logs: List[Dict[str, Any]] = [] def _color_message(self, event: LogEvent) -> str: - if event.level not in _COLOR_MAP: - return event.message - - if event.level == "info": + if event.level not in _COLOR_MAP or event.level == 'info': return event.message color = _COLOR_MAP.get(event.level) return f"{color}{event.message}{Fore.RESET}" From 0ea6f6dd1b5b95a7a3cb461128dcabff51eab438 Mon Sep 17 00:00:00 2001 From: Srimadhava K <120107598+sk5268@users.noreply.github.com> Date: Mon, 24 Nov 2025 19:14:17 +0530 Subject: [PATCH 03/10] Feat: Add Cerebras Platform Models. (#3424) Co-authored-by: Saed Bhati <105969318+Saedbhati@users.noreply.github.com> Co-authored-by: JINO ROHIT --- .env.example | 3 + camel/configs/__init__.py | 3 + camel/configs/cerebras_config.py | 96 +++++++++++++++++++++++ camel/models/__init__.py | 2 + camel/models/cerebras_model.py | 83 ++++++++++++++++++++ camel/models/model_factory.py | 2 + camel/types/enums.py | 27 +++++++ examples/models/cerebras_model_example.py | 48 ++++++++++++ test/models/test_cerebras_model.py | 40 ++++++++++ 9 files changed, 304 insertions(+) create mode 100644 camel/configs/cerebras_config.py create mode 100644 camel/models/cerebras_model.py create mode 100644 examples/models/cerebras_model_example.py create mode 100644 test/models/test_cerebras_model.py diff --git a/.env.example b/.env.example index f6c752d5c4..b4c96a3d27 100644 --- a/.env.example +++ b/.env.example @@ -10,6 +10,9 @@ # Models API #=========================================== +# Cerebras API (https://chat.cerebras.ai/) +# CEREBRAS_API_KEY="Fill your API key here" + # OpenAI API (https://platform.openai.com/signup) # OPENAI_API_KEY="Fill your API key here" diff --git a/camel/configs/__init__.py b/camel/configs/__init__.py index 620ab91503..dd3cead2ed 100644 --- a/camel/configs/__init__.py +++ b/camel/configs/__init__.py @@ -17,6 +17,7 @@ from .anthropic_config import ANTHROPIC_API_PARAMS, AnthropicConfig from .base_config import BaseConfig from .bedrock_config import BEDROCK_API_PARAMS, BedrockConfig +from .cerebras_config import CEREBRAS_API_PARAMS, CerebrasConfig from .cohere_config import COHERE_API_PARAMS, CohereConfig from .cometapi_config import COMETAPI_API_PARAMS, CometAPIConfig from .crynux_config import CRYNUX_API_PARAMS, CrynuxConfig @@ -93,6 +94,8 @@ 'SAMBA_CLOUD_API_PARAMS', 'TogetherAIConfig', 
'TOGETHERAI_API_PARAMS', + 'CerebrasConfig', + 'CEREBRAS_API_PARAMS', 'CohereConfig', 'COHERE_API_PARAMS', 'CometAPIConfig', diff --git a/camel/configs/cerebras_config.py b/camel/configs/cerebras_config.py new file mode 100644 index 0000000000..31839c1009 --- /dev/null +++ b/camel/configs/cerebras_config.py @@ -0,0 +1,96 @@ +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +from __future__ import annotations + +from typing import Dict, Optional, Sequence, Union + +from camel.configs.base_config import BaseConfig + + +class CerebrasConfig(BaseConfig): + r"""Defines the parameters for generating chat completions using the + Cerebras OpenAI-compatible API. + + Reference: https://inference-docs.cerebras.ai/resources/openai + + Args: + temperature (float, optional): Sampling temperature to use, between + :obj:`0` and :obj:`2`. Higher values make the output more random, + while lower values make it more focused and deterministic. + (default: :obj:`None`) + + top_p (float, optional): An alternative to sampling with temperature, + called nucleus sampling, where the model considers the results of + the tokens with top_p probability mass. So :obj:`0.1` means only + the tokens comprising the top 10% probability mass are considered. + (default: :obj:`None`) + + response_format (object, optional): An object specifying the format + that the model must output. Setting to {"type": "json_object"} + enables JSON mode, which guarantees the message the model + generates is valid JSON. (default: :obj:`None`) + + stream (bool, optional): If True, partial message deltas will be sent + as data-only server-sent events as they become available. + (default: :obj:`None`) + + stop (str or list, optional): Up to :obj:`4` sequences where the API + will stop generating further tokens. (default: :obj:`None`) + + max_tokens (int, optional): The maximum number of tokens to generate + in the chat completion. The total length of input tokens and + generated tokens is limited by the model's context length. + (default: :obj:`None`) + + user (str, optional): A unique identifier representing your end-user, + which can help Cerebras monitor and detect abuse. + (default: :obj:`None`) + + tools (list[FunctionTool], optional): A list of tools the model may + call. Currently, only functions are supported as a tool. Use this + to provide a list of functions the model may generate JSON inputs + for. A max of 128 functions are supported. + + tool_choice (Union[dict[str, str], str], optional): Controls which (if + any) tool is called by the model. :obj:`"none"` means the model + will not call any tool and instead generates a message. + :obj:`"auto"` means the model can pick between generating a + message or calling one or more tools. :obj:`"required"` means the + model must call one or more tools. 
Specifying a particular tool + via {"type": "function", "function": {"name": "my_function"}} + forces the model to call that tool. :obj:`"none"` is the default + when no tools are present. :obj:`"auto"` is the default if tools + are present. + + reasoning_effort (str, optional): A parameter specifying the level of + reasoning used by certain model types. Valid values are :obj: + `"low"`, :obj:`"medium"`, or :obj:`"high"`. If set, it is only + applied to the model types that support it (e.g., :obj:`o1`, + :obj:`o1mini`, :obj:`o1preview`, :obj:`o3mini`). If not provided + or if the model type does not support it, this parameter is + ignored. (default: :obj:`None`) + """ + + temperature: Optional[float] = None + top_p: Optional[float] = None + stream: Optional[bool] = None + stop: Optional[Union[str, Sequence[str]]] = None + max_tokens: Optional[int] = None + response_format: Optional[Dict] = None + user: Optional[str] = None + tool_choice: Optional[Union[Dict[str, str], str]] = None + reasoning_effort: Optional[str] = None + + +CEREBRAS_API_PARAMS = {param for param in CerebrasConfig.model_fields.keys()} diff --git a/camel/models/__init__.py b/camel/models/__init__.py index 55f728a40d..bab6229706 100644 --- a/camel/models/__init__.py +++ b/camel/models/__init__.py @@ -19,6 +19,7 @@ from .azure_openai_model import AzureOpenAIModel from .base_audio_model import BaseAudioModel from .base_model import BaseModelBackend +from .cerebras_model import CerebrasModel from .cohere_model import CohereModel from .cometapi_model import CometAPIModel from .crynux_model import CrynuxModel @@ -71,6 +72,7 @@ 'GroqModel', 'StubModel', 'ZhipuAIModel', + 'CerebrasModel', 'CohereModel', 'CometAPIModel', 'ModelFactory', diff --git a/camel/models/cerebras_model.py b/camel/models/cerebras_model.py new file mode 100644 index 0000000000..a1859a0f0b --- /dev/null +++ b/camel/models/cerebras_model.py @@ -0,0 +1,83 @@ +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +import os +from typing import Any, Dict, Optional, Union + +from camel.configs import CerebrasConfig +from camel.models.openai_compatible_model import OpenAICompatibleModel +from camel.types import ModelType +from camel.utils import ( + BaseTokenCounter, + api_keys_required, +) + + +class CerebrasModel(OpenAICompatibleModel): + r"""LLM API served by Cerebras in a unified + OpenAICompatibleModel interface. + + Args: + model_type (Union[ModelType, str]): Model for which a backend is + created. + model_config_dict (Optional[Dict[str, Any]], optional): A dictionary + that will be fed into :obj:`openai.ChatCompletion.create()`. + If :obj:`None`, :obj:`CerebrasConfig().as_dict()` will be used. + (default: :obj:`None`) + api_key (Optional[str], optional): The API key for authenticating + with the Cerebras service. (default: :obj:`None`). + url (Optional[str], optional): The URL of the Cerebras service. 
+ (default: :obj:`None`) + token_counter (Optional[BaseTokenCounter], optional): Token counter to + use for the model. If not provided, :obj:`OpenAITokenCounter( + ModelType.GPT_4O_MINI)` will be used. + (default: :obj:`None`) + timeout (Optional[float], optional): The timeout value in seconds for + API calls. If not provided, will fall back to the MODEL_TIMEOUT + environment variable or default to 180 seconds. + (default: :obj:`None`) + max_retries (int, optional): Maximum number of retries for API calls. + (default: :obj:`3`) + **kwargs (Any): Additional arguments to pass to the client + initialization. + """ + + @api_keys_required([("api_key", "CEREBRAS_API_KEY")]) + def __init__( + self, + model_type: Union[ModelType, str], + model_config_dict: Optional[Dict[str, Any]] = None, + api_key: Optional[str] = None, + url: Optional[str] = None, + token_counter: Optional[BaseTokenCounter] = None, + timeout: Optional[float] = None, + max_retries: int = 3, + **kwargs: Any, + ) -> None: + if model_config_dict is None: + model_config_dict = CerebrasConfig().as_dict() + api_key = api_key or os.environ.get("CEREBRAS_API_KEY") + url = url or os.environ.get( + "CEREBRAS_API_BASE_URL", "https://api.cerebras.ai/v1" + ) + timeout = timeout or float(os.environ.get("MODEL_TIMEOUT", 180)) + super().__init__( + model_type=model_type, + model_config_dict=model_config_dict, + api_key=api_key, + url=url, + token_counter=token_counter, + timeout=timeout, + max_retries=max_retries, + **kwargs, + ) diff --git a/camel/models/model_factory.py b/camel/models/model_factory.py index 6d05dcdfe0..ad9a0da89b 100644 --- a/camel/models/model_factory.py +++ b/camel/models/model_factory.py @@ -22,6 +22,7 @@ from camel.models.aws_bedrock_model import AWSBedrockModel from camel.models.azure_openai_model import AzureOpenAIModel from camel.models.base_model import BaseModelBackend +from camel.models.cerebras_model import CerebrasModel from camel.models.cohere_model import CohereModel from camel.models.cometapi_model import CometAPIModel from camel.models.crynux_model import CrynuxModel @@ -89,6 +90,7 @@ class ModelFactory: ModelPlatformType.AZURE: AzureOpenAIModel, ModelPlatformType.ANTHROPIC: AnthropicModel, ModelPlatformType.GROQ: GroqModel, + ModelPlatformType.CEREBRAS: CerebrasModel, ModelPlatformType.COMETAPI: CometAPIModel, ModelPlatformType.NEBIUS: NebiusModel, ModelPlatformType.LMSTUDIO: LMStudioModel, diff --git a/camel/types/enums.py b/camel/types/enums.py index 656ec643c9..61fdbb5fbf 100644 --- a/camel/types/enums.py +++ b/camel/types/enums.py @@ -91,6 +91,12 @@ class ModelType(UnifiedModelType, Enum): GROQ_MIXTRAL_8_7B = "mixtral-8x7b-32768" GROQ_GEMMA_2_9B_IT = "gemma2-9b-it" + # Cerebras platform models + CEREBRAS_GPT_OSS_120B = "gpt-oss-120b" + CEREBRAS_LLAMA_3_1_8B = "llama3.1-8b" + CEREBRAS_LLAMA_3_3_70B = "llama3.3-70b" + CEREBRAS_QWEN_3_32B = "qwen-3-32b" + # Nebius AI Studio platform models NEBIUS_GPT_OSS_120B = "gpt-oss-120b" NEBIUS_GPT_OSS_20B = "gpt-oss-20b" @@ -554,6 +560,7 @@ def support_native_tool_calling(self) -> bool: self.is_together, self.is_sambanova, self.is_groq, + self.is_cerebras, self.is_openrouter, self.is_lmstudio, self.is_sglang, @@ -696,6 +703,16 @@ def is_groq(self) -> bool: ModelType.GROQ_GEMMA_2_9B_IT, } + @property + def is_cerebras(self) -> bool: + r"""Returns whether this type of models is served by Cerebras.""" + return self in { + ModelType.CEREBRAS_GPT_OSS_120B, + ModelType.CEREBRAS_LLAMA_3_1_8B, + ModelType.CEREBRAS_LLAMA_3_3_70B, + ModelType.CEREBRAS_QWEN_3_32B, + } + @property 
def is_nebius(self) -> bool: r"""Returns whether this type of models is served by Nebius AI @@ -1165,6 +1182,7 @@ def token_limit(self) -> int: }: return 4_096 elif self in { + ModelType.CEREBRAS_LLAMA_3_1_8B, ModelType.GPT_4, ModelType.GROQ_LLAMA_3_8B, ModelType.GROQ_LLAMA_3_70B, @@ -1312,6 +1330,9 @@ def token_limit(self) -> int: return 32_768 elif self in { ModelType.MISTRAL_MIXTRAL_8x22B, + ModelType.CEREBRAS_GPT_OSS_120B, + ModelType.CEREBRAS_LLAMA_3_3_70B, + ModelType.CEREBRAS_QWEN_3_32B, ModelType.DEEPSEEK_CHAT, ModelType.DEEPSEEK_REASONER, ModelType.PPIO_DEEPSEEK_R1_TURBO, @@ -1801,6 +1822,7 @@ class ModelPlatformType(Enum): CRYNUX = "crynux" AIHUBMIX = "aihubmix" MINIMAX = "minimax" + CEREBRAS = "cerebras" @classmethod def from_name(cls, name): @@ -1991,6 +2013,11 @@ def is_minimax(self) -> bool: r"""Returns whether this platform is Minimax M2.""" return self is ModelPlatformType.MINIMAX + @property + def is_cerebras(self) -> bool: + r"""Returns whether this platform is Cerebras.""" + return self is ModelPlatformType.CEREBRAS + class AudioModelType(Enum): TTS_1 = "tts-1" diff --git a/examples/models/cerebras_model_example.py b/examples/models/cerebras_model_example.py new file mode 100644 index 0000000000..5734a2c483 --- /dev/null +++ b/examples/models/cerebras_model_example.py @@ -0,0 +1,48 @@ +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= + +from camel.agents import ChatAgent +from camel.configs import CerebrasConfig +from camel.models import ModelFactory +from camel.types import ModelPlatformType, ModelType + +# Create the Cerebras model backend +model = ModelFactory.create( + model_platform=ModelPlatformType.CEREBRAS, + model_type=ModelType.CEREBRAS_LLAMA_3_3_70B, + model_config_dict=CerebrasConfig(temperature=0.2).as_dict(), +) + +sys_msg = "You are a helpful assistant." + +# Set agent +camel_agent = ChatAgent(system_message=sys_msg, model=model) + +user_msg = """Say hi to CAMEL AI, one open-source community + dedicated to the study of autonomous and communicative agents.""" + +# Get response information +response = camel_agent.step(user_msg) +print(response.msgs[0].content) + +''' +=============================================================================== +Hello to the CAMEL AI community. It's great to see a group of like-minded +individuals coming together to explore and advance the field of autonomous and +communicative agents. Your open-source approach is truly commendable, as it +fosters collaboration, innovation, and transparency. I'm excited to learn more +about your projects and initiatives, and I'm happy to help in any way I can. +Keep pushing the boundaries of AI research and development! 
+=============================================================================== +''' diff --git a/test/models/test_cerebras_model.py b/test/models/test_cerebras_model.py new file mode 100644 index 0000000000..4f78f64b03 --- /dev/null +++ b/test/models/test_cerebras_model.py @@ -0,0 +1,40 @@ +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= + +import pytest + +from camel.configs import CerebrasConfig +from camel.models import CerebrasModel +from camel.types import ModelType +from camel.utils import OpenAITokenCounter + + +@pytest.mark.model_backend +@pytest.mark.parametrize( + "model_type", + [ + ModelType.CEREBRAS_GPT_OSS_120B, + ModelType.CEREBRAS_LLAMA_3_1_8B, + ModelType.CEREBRAS_LLAMA_3_3_70B, + ModelType.CEREBRAS_QWEN_3_32B, + ], +) +def test_cerebras_model(model_type: ModelType): + model_config_dict = CerebrasConfig().as_dict() + model = CerebrasModel(model_type, model_config_dict) + assert model.model_type == model_type + assert model.model_config_dict == model_config_dict + assert isinstance(model.token_counter, OpenAITokenCounter) + assert isinstance(model.model_type.value_for_tiktoken, str) + assert isinstance(model.model_type.token_limit, int) From aae8454e36ba2c22d7d01f2d63e5d7261eeeb35c Mon Sep 17 00:00:00 2001 From: camel-docs-bot Date: Mon, 24 Nov 2025 13:45:11 +0000 Subject: [PATCH 04/10] Auto-update documentation after merge [skip ci] --- docs/mintlify/docs.json | 2 + .../camel.configs.cerebras_config.mdx | 27 ++++++++++++ .../reference/camel.models.cerebras_model.mdx | 41 +++++++++++++++++++ docs/mintlify/reference/camel.types.enums.mdx | 20 +++++++++ 4 files changed, 90 insertions(+) create mode 100644 docs/mintlify/reference/camel.configs.cerebras_config.mdx create mode 100644 docs/mintlify/reference/camel.models.cerebras_model.mdx diff --git a/docs/mintlify/docs.json b/docs/mintlify/docs.json index 4d125a9945..e09e23ca5c 100644 --- a/docs/mintlify/docs.json +++ b/docs/mintlify/docs.json @@ -224,6 +224,7 @@ "reference/camel.configs.anthropic_config", "reference/camel.configs.base_config", "reference/camel.configs.bedrock_config", + "reference/camel.configs.cerebras_config", "reference/camel.configs.cohere_config", "reference/camel.configs.cometapi_config", "reference/camel.configs.crynux_config", @@ -309,6 +310,7 @@ "reference/camel.models.azure_openai_model", "reference/camel.models.base_audio_model", "reference/camel.models.base_model", + "reference/camel.models.cerebras_model", "reference/camel.models.cohere_model", "reference/camel.models.cometapi_model", "reference/camel.models.crynux_model", diff --git a/docs/mintlify/reference/camel.configs.cerebras_config.mdx b/docs/mintlify/reference/camel.configs.cerebras_config.mdx new file mode 100644 index 0000000000..1da85e6a6d --- /dev/null +++ b/docs/mintlify/reference/camel.configs.cerebras_config.mdx @@ -0,0 +1,27 @@ + + + + +## CerebrasConfig + +```python +class 
CerebrasConfig(BaseConfig): +``` + +Defines the parameters for generating chat completions using the +Cerebras OpenAI-compatible API. + +Reference: https://inference-docs.cerebras.ai/resources/openai + +**Parameters:** + +- **temperature** (float, optional): Sampling temperature to use, between :obj:`0` and :obj:`2`. Higher values make the output more random, while lower values make it more focused and deterministic. (default: :obj:`None`) +- **top_p** (float, optional): An alternative to sampling with temperature, called nucleus sampling, where the model considers the results of the tokens with top_p probability mass. So :obj:`0.1` means only the tokens comprising the top 10% probability mass are considered. (default: :obj:`None`) +- **response_format** (object, optional): An object specifying the format that the model must output. Setting to `{"type": "json_object"}` enables JSON mode, which guarantees the message the model generates is valid JSON. (default: :obj:`None`) +- **stream** (bool, optional): If True, partial message deltas will be sent as data-only server-sent events as they become available. (default: :obj:`None`) +- **stop** (str or list, optional): Up to :obj:`4` sequences where the API will stop generating further tokens. (default: :obj:`None`) +- **max_tokens** (int, optional): The maximum number of tokens to generate in the chat completion. The total length of input tokens and generated tokens is limited by the model's context length. (default: :obj:`None`) +- **user** (str, optional): A unique identifier representing your end-user, which can help Cerebras monitor and detect abuse. (default: :obj:`None`) +- **tools** (list[FunctionTool], optional): A list of tools the model may call. Currently, only functions are supported as a tool. Use this to provide a list of functions the model may generate JSON inputs for. A max of 128 functions are supported. +- **tool_choice** (Union[dict[str, str], str], optional): Controls which (if any) tool is called by the model. :obj:`"none"` means the model will not call any tool and instead generates a message. :obj:`"auto"` means the model can pick between generating a message or calling one or more tools. :obj:`"required"` means the model must call one or more tools. Specifying a particular tool via `{"type": "function", "function": {"name": "my_function"}}` forces the model to call that tool. :obj:`"none"` is the default when no tools are present. :obj:`"auto"` is the default if tools are present. +- **reasoning_effort** (str, optional): A parameter specifying the level of reasoning used by certain model types. Valid values are :obj: `"low"`, :obj:`"medium"`, or :obj:`"high"`. If set, it is only applied to the model types that support it (e.g., :obj:`o1`, :obj:`o1mini`, :obj:`o1preview`, :obj:`o3mini`). If not provided or if the model type does not support it, this parameter is ignored. (default: :obj:`None`) diff --git a/docs/mintlify/reference/camel.models.cerebras_model.mdx b/docs/mintlify/reference/camel.models.cerebras_model.mdx new file mode 100644 index 0000000000..7641bce74b --- /dev/null +++ b/docs/mintlify/reference/camel.models.cerebras_model.mdx @@ -0,0 +1,41 @@ + + + + +## CerebrasModel + +```python +class CerebrasModel(OpenAICompatibleModel): +``` + +LLM API served by Cerebras in a unified +OpenAICompatibleModel interface. + +**Parameters:** + +- **model_type** (Union[ModelType, str]): Model for which a backend is created. 
+- **model_config_dict** (Optional[Dict[str, Any]], optional): A dictionary that will be fed into :obj:`openai.ChatCompletion.create()`. +  If :obj:`None`, :obj:`CerebrasConfig().as_dict()` will be used. (default: :obj:`None`) +- **api_key** (Optional[str], optional): The API key for authenticating with the Cerebras service. (default: :obj:`None`). +- **url** (Optional[str], optional): The URL of the Cerebras service. (default: :obj:`None`) +- **token_counter** (Optional[BaseTokenCounter], optional): Token counter to use for the model. If not provided, :obj:`OpenAITokenCounter( ModelType.GPT_4O_MINI)` will be used. (default: :obj:`None`) +- **timeout** (Optional[float], optional): The timeout value in seconds for API calls. If not provided, will fall back to the MODEL_TIMEOUT environment variable or default to 180 seconds. (default: :obj:`None`) +- **max_retries** (int, optional): Maximum number of retries for API calls. (default: :obj:`3`) **kwargs (Any): Additional arguments to pass to the client initialization. + + + +### __init__ + +```python +def __init__( + self, + model_type: Union[ModelType, str], + model_config_dict: Optional[Dict[str, Any]] = None, + api_key: Optional[str] = None, + url: Optional[str] = None, + token_counter: Optional[BaseTokenCounter] = None, + timeout: Optional[float] = None, + max_retries: int = 3, + **kwargs: Any +): +``` diff --git a/docs/mintlify/reference/camel.types.enums.mdx b/docs/mintlify/reference/camel.types.enums.mdx index 35da593bb2..7202627c9f 100644 --- a/docs/mintlify/reference/camel.types.enums.mdx +++ b/docs/mintlify/reference/camel.types.enums.mdx @@ -139,6 +139,16 @@ def is_groq(self): Returns whether this type of models is served by Groq. + + +### is_cerebras + +```python +def is_cerebras(self): +``` + +Returns whether this type of models is served by Cerebras. + ### is_nebius @@ -861,6 +871,16 @@ def is_minimax(self): Returns whether this platform is Minimax M2. + + +### is_cerebras + +```python +def is_cerebras(self): +``` + +Returns whether this platform is Cerebras. 
+ ## AudioModelType From 2f773f35675a97bfbc873cfea679c691f05b00e1 Mon Sep 17 00:00:00 2001 From: Hesam Sheikh <41022652+hesamsheikh@users.noreply.github.com> Date: Mon, 24 Nov 2025 16:00:08 +0100 Subject: [PATCH 05/10] [enhance] Make Workflow Save Folder and Filename Semantic (#3388) Co-authored-by: Tao Sun <168447269+fengju0213@users.noreply.github.com> --- camel/agents/chat_agent.py | 363 ++++--- camel/societies/workforce/utils.py | 99 +- .../workforce/workflow_memory_manager.py | 660 +++++++++++- camel/societies/workforce/workforce.py | 205 ++-- camel/utils/context_utils.py | 109 +- .../workforce_workflow_memory_example.py | 19 +- test/workforce/test_workflow_memory.py | 974 +++++++++--------- 7 files changed, 1701 insertions(+), 728 deletions(-) diff --git a/camel/agents/chat_agent.py b/camel/agents/chat_agent.py index 3b75c238eb..97f7c7968e 100644 --- a/camel/agents/chat_agent.py +++ b/camel/agents/chat_agent.py @@ -1679,76 +1679,12 @@ def summarize( result["status"] = status_message return result - # Convert messages to conversation text - conversation_lines = [] - user_messages: List[str] = [] - for message in messages: - role = message.get('role', 'unknown') - content = message.get('content', '') - - # Skip summary messages if include_summaries is False - if not include_summaries and isinstance(content, str): - # Check if this is a summary message by looking for marker - if content.startswith('[CONTEXT_SUMMARY]'): - continue - - # Handle tool call messages (assistant calling tools) - tool_calls = message.get('tool_calls') - if tool_calls and isinstance(tool_calls, (list, tuple)): - for tool_call in tool_calls: - # Handle both dict and object formats - if isinstance(tool_call, dict): - func_name = tool_call.get('function', {}).get( - 'name', 'unknown_tool' - ) - func_args_str = tool_call.get('function', {}).get( - 'arguments', '{}' - ) - else: - # Handle object format (Pydantic or similar) - func_name = getattr( - getattr(tool_call, 'function', None), - 'name', - 'unknown_tool', - ) - func_args_str = getattr( - getattr(tool_call, 'function', None), - 'arguments', - '{}', - ) - - # Parse and format arguments for readability - try: - import json - - args_dict = json.loads(func_args_str) - args_formatted = ', '.join( - f"{k}={v}" for k, v in args_dict.items() - ) - except (json.JSONDecodeError, ValueError, TypeError): - args_formatted = func_args_str - - conversation_lines.append( - f"[TOOL CALL] {func_name}({args_formatted})" - ) - - # Handle tool response messages - elif role == 'tool': - tool_name = message.get('name', 'unknown_tool') - if not content: - content = str(message.get('content', '')) - conversation_lines.append( - f"[TOOL RESULT] {tool_name} → {content}" - ) - - # Handle regular content messages (user/assistant/system) - elif content: - content = str(content) - if role == 'user': - user_messages.append(content) - conversation_lines.append(f"{role}: {content}") - - conversation_text = "\n".join(conversation_lines).strip() + # build conversation text using shared helper + conversation_text, user_messages = ( + self._build_conversation_text_from_messages( + messages, include_summaries + ) + ) if not conversation_text: status_message = ( @@ -1887,6 +1823,216 @@ def summarize( result["status"] = error_message return result + def _build_conversation_text_from_messages( + self, + messages: List[Any], + include_summaries: bool = False, + ) -> tuple[str, List[str]]: + r"""Build conversation text from messages for summarization. 
+ + This is a shared helper method that converts messages to a formatted + conversation text string, handling tool calls, tool results, and + regular messages. + + Args: + messages (List[Any]): List of messages to convert. + include_summaries (bool): Whether to include messages starting + with [CONTEXT_SUMMARY]. (default: :obj:`False`) + + Returns: + tuple[str, List[str]]: A tuple containing: + - Formatted conversation text + - List of user messages extracted from the conversation + """ + # Convert messages to conversation text + conversation_lines = [] + user_messages: List[str] = [] + for message in messages: + role = message.get('role', 'unknown') + content = message.get('content', '') + + # Skip summary messages if include_summaries is False + if not include_summaries and isinstance(content, str): + # Check if this is a summary message by looking for marker + if content.startswith('[CONTEXT_SUMMARY]'): + continue + + # Handle tool call messages (assistant calling tools) + tool_calls = message.get('tool_calls') + if tool_calls and isinstance(tool_calls, (list, tuple)): + for tool_call in tool_calls: + # Handle both dict and object formats + if isinstance(tool_call, dict): + func_name = tool_call.get('function', {}).get( + 'name', 'unknown_tool' + ) + func_args_str = tool_call.get('function', {}).get( + 'arguments', '{}' + ) + else: + # Handle object format (Pydantic or similar) + func_name = getattr( + getattr(tool_call, 'function', None), + 'name', + 'unknown_tool', + ) + func_args_str = getattr( + getattr(tool_call, 'function', None), + 'arguments', + '{}', + ) + + # Parse and format arguments for readability + try: + import json + + args_dict = json.loads(func_args_str) + args_formatted = ', '.join( + f"{k}={v}" for k, v in args_dict.items() + ) + except (json.JSONDecodeError, ValueError, TypeError): + args_formatted = func_args_str + + conversation_lines.append( + f"[TOOL CALL] {func_name}({args_formatted})" + ) + + # Handle tool response messages + elif role == 'tool': + tool_name = message.get('name', 'unknown_tool') + if not content: + content = str(message.get('content', '')) + conversation_lines.append( + f"[TOOL RESULT] {tool_name} → {content}" + ) + + # Handle regular content messages (user/assistant/system) + elif content: + content = str(content) + if role == 'user': + user_messages.append(content) + conversation_lines.append(f"{role}: {content}") + + return "\n".join(conversation_lines).strip(), user_messages + + async def generate_workflow_summary_async( + self, + summary_prompt: Optional[str] = None, + response_format: Optional[Type[BaseModel]] = None, + include_summaries: bool = False, + conversation_accumulator: Optional['ChatAgent'] = None, + ) -> Dict[str, Any]: + r"""Generate a workflow summary without saving to disk. + + This method generates a workflow summary by calling a dedicated + summarizer agent. It does NOT save to disk - only generates the + summary content and structured output. Use this when you need to + inspect the summary (e.g., extract agent_title) before determining + where to save it. + + Args: + summary_prompt (Optional[str]): Custom prompt for the summarizer. + response_format (Optional[Type[BaseModel]]): A Pydantic model + defining the expected structure (e.g., WorkflowSummary). + include_summaries (bool): Whether to include previously generated + summaries. (default: :obj:`False`) + conversation_accumulator (Optional[ChatAgent]): An optional agent + that holds accumulated conversation history. 
If provided, + memory will be retrieved from this agent instead of self. + (default: :obj:`None`) + + Returns: + Dict[str, Any]: Result dictionary with: + - structured_summary: Pydantic model instance + - summary_content: Raw text content + - status: "success" or error message + + """ + result: Dict[str, Any] = { + "structured_summary": None, + "summary_content": "", + "status": "", + } + + try: + # get conversation from accumulator or self + source_agent = ( + conversation_accumulator if conversation_accumulator else self + ) + messages, _ = source_agent.memory.get_context() + + if not messages: + result["status"] = "No conversation context available" + return result + + # build conversation text using shared helper + conversation_text, _ = self._build_conversation_text_from_messages( + messages, include_summaries + ) + + if not conversation_text: + result["status"] = "Conversation context is empty" + return result + + # create or reuse summarizer agent + if self._context_summary_agent is None: + self._context_summary_agent = ChatAgent( + system_message=( + "You are a helpful assistant that summarizes " + "conversations" + ), + model=self.model_backend, + agent_id=f"{self.agent_id}_context_summarizer", + ) + else: + self._context_summary_agent.reset() + + # prepare prompt + if summary_prompt: + prompt_text = ( + f"{summary_prompt.rstrip()}\n\n" + f"AGENT CONVERSATION TO BE SUMMARIZED:\n" + f"{conversation_text}" + ) + else: + prompt_text = build_default_summary_prompt(conversation_text) + + # call summarizer agent + if response_format: + response = await self._context_summary_agent.astep( + prompt_text, response_format=response_format + ) + else: + response = await self._context_summary_agent.astep(prompt_text) + + # handle streaming response + if isinstance(response, AsyncStreamingChatAgentResponse): + response = await response + + if not response.msgs: + result["status"] = "Failed to generate summary" + return result + + summary_content = response.msgs[-1].content.strip() + structured_output = None + if response_format and response.msgs[-1].parsed: + structured_output = response.msgs[-1].parsed + + result.update( + { + "structured_summary": structured_output, + "summary_content": summary_content, + "status": "success", + } + ) + return result + + except Exception as exc: + error_message = f"Failed to generate summary: {exc}" + logger.error(error_message) + result["status"] = error_message + return result + async def asummarize( self, filename: Optional[str] = None, @@ -1924,6 +2070,7 @@ async def asummarize( (full compression). (default: :obj:`False`) add_user_messages (bool): Whether add user messages to summary. 
(default: :obj:`True`) + Returns: Dict[str, Any]: A dictionary containing the summary text, file path, status message, and optionally structured_summary if @@ -1957,76 +2104,12 @@ async def asummarize( result["status"] = status_message return result - # Convert messages to conversation text - conversation_lines = [] - user_messages: List[str] = [] - for message in messages: - role = message.get('role', 'unknown') - content = message.get('content', '') - - # Skip summary messages if include_summaries is False - if not include_summaries and isinstance(content, str): - # Check if this is a summary message by looking for marker - if content.startswith('[CONTEXT_SUMMARY]'): - continue - - # Handle tool call messages (assistant calling tools) - tool_calls = message.get('tool_calls') - if tool_calls and isinstance(tool_calls, (list, tuple)): - for tool_call in tool_calls: - # Handle both dict and object formats - if isinstance(tool_call, dict): - func_name = tool_call.get('function', {}).get( - 'name', 'unknown_tool' - ) - func_args_str = tool_call.get('function', {}).get( - 'arguments', '{}' - ) - else: - # Handle object format (Pydantic or similar) - func_name = getattr( - getattr(tool_call, 'function', None), - 'name', - 'unknown_tool', - ) - func_args_str = getattr( - getattr(tool_call, 'function', None), - 'arguments', - '{}', - ) - - # Parse and format arguments for readability - try: - import json - - args_dict = json.loads(func_args_str) - args_formatted = ', '.join( - f"{k}={v}" for k, v in args_dict.items() - ) - except (json.JSONDecodeError, ValueError, TypeError): - args_formatted = func_args_str - - conversation_lines.append( - f"[TOOL CALL] {func_name}({args_formatted})" - ) - - # Handle tool response messages - elif role == 'tool': - tool_name = message.get('name', 'unknown_tool') - if not content: - content = str(message.get('content', '')) - conversation_lines.append( - f"[TOOL RESULT] {tool_name} → {content}" - ) - - # Handle regular content messages (user/assistant/system) - elif content: - content = str(content) - if role == 'user': - user_messages.append(content) - conversation_lines.append(f"{role}: {content}") - - conversation_text = "\n".join(conversation_lines).strip() + # build conversation text using shared helper + conversation_text, user_messages = ( + self._build_conversation_text_from_messages( + messages, include_summaries + ) + ) if not conversation_text: status_message = ( diff --git a/camel/societies/workforce/utils.py b/camel/societies/workforce/utils.py index 35e0aa22c5..74d9895951 100644 --- a/camel/societies/workforce/utils.py +++ b/camel/societies/workforce/utils.py @@ -17,6 +17,99 @@ from pydantic import BaseModel, Field, field_validator +# generic role names that should trigger fallback in role identification +# used for workflow organization to avoid using generic names as folder names +GENERIC_ROLE_NAMES = frozenset( + {'assistant', 'agent', 'user', 'system', 'worker', 'helper'} +) + + +def is_generic_role_name(role_name: str) -> bool: + r"""Check if a role name is generic and should trigger fallback logic. + + Generic role names are common, non-specific identifiers that don't + provide meaningful information about an agent's actual purpose. + When a role name is generic, fallback logic should be used to find + a more specific identifier (e.g., from LLM-generated agent_title + or description). + + Args: + role_name (str): The role name to check (will be converted to + lowercase for case-insensitive comparison). 
+ + Returns: + bool: True if the role name is generic, False otherwise. + + Example: + >>> is_generic_role_name("assistant") + True + >>> is_generic_role_name("data_analyst") + False + >>> is_generic_role_name("AGENT") + True + """ + return role_name.lower() in GENERIC_ROLE_NAMES + + +class WorkflowMetadata(BaseModel): + r"""Pydantic model for workflow metadata tracking. + + This model defines the formal schema for workflow metadata that tracks + versioning, timestamps, and contextual information about saved workflows. + Used to maintain workflow history and enable proper version management. + """ + + session_id: str = Field( + description="Session identifier for the workflow execution" + ) + working_directory: str = Field( + description="Directory path where the workflow is stored" + ) + created_at: str = Field( + description="ISO timestamp when workflow was first created" + ) + updated_at: str = Field( + description="ISO timestamp of last modification to the workflow" + ) + workflow_version: int = Field( + default=1, description="Version number, increments on updates" + ) + agent_id: str = Field( + description="UUID of the agent that created/updated the workflow" + ) + message_count: int = Field( + description="Number of messages in the workflow conversation" + ) + + +class WorkflowConfig(BaseModel): + r"""Configuration for workflow memory management. + + Centralizes all workflow-related configuration options to avoid scattered + settings across multiple files and methods. + """ + + max_workflows_per_role: int = Field( + default=100, + description="Maximum number of workflows to keep per role folder", + ) + workflow_filename_suffix: str = Field( + default="_workflow", + description="Suffix appended to workflow filenames", + ) + workflow_folder_name: str = Field( + default="workforce_workflows", + description="Base folder name for storing workflows", + ) + enable_versioning: bool = Field( + default=True, + description="Whether to track workflow versions", + ) + default_max_files_to_load: int = Field( + default=3, + description="Default maximum number of workflow files to load", + ) + class WorkerConf(BaseModel): r"""The configuration of a worker.""" @@ -162,8 +255,7 @@ class TaskAnalysisResult(BaseModel): # Common fields - always populated reasoning: str = Field( - description="Explanation for the analysis result or recovery " - "decision" + description="Explanation for the analysis result or recovery decision" ) recovery_strategy: Optional[RecoveryStrategy] = Field( @@ -625,8 +717,7 @@ def wrapper(self, *args, **kwargs): # This should not be reached, but just in case if handle_exceptions: logger.error( - f"Unexpected failure in {func.__name__}: " - f"{last_exception}" + f"Unexpected failure in {func.__name__}: {last_exception}" ) return None else: diff --git a/camel/societies/workforce/workflow_memory_manager.py b/camel/societies/workforce/workflow_memory_manager.py index 0212ae1366..769ef980a4 100644 --- a/camel/societies/workforce/workflow_memory_manager.py +++ b/camel/societies/workforce/workflow_memory_manager.py @@ -24,6 +24,11 @@ from camel.societies.workforce.structured_output_handler import ( StructuredOutputHandler, ) +from camel.societies.workforce.utils import ( + WorkflowConfig, + WorkflowMetadata, + is_generic_role_name, +) from camel.types import OpenAIBackendRole from camel.utils.context_utils import ContextUtility, WorkflowSummary @@ -60,6 +65,11 @@ class WorkflowMemoryManager: description (str): Description of the worker's role. 
context_utility (Optional[ContextUtility]): Shared context utility for workflow operations. If None, creates a new instance. + role_identifier (Optional[str]): Role identifier for organizing + workflows by role. If provided, workflows will be stored in + role-based folders. If None, uses default workforce context. + config (Optional[WorkflowConfig]): Configuration for workflow + management. If None, uses default configuration. """ def __init__( @@ -67,6 +77,8 @@ def __init__( worker: ChatAgent, description: str, context_utility: Optional[ContextUtility] = None, + role_identifier: Optional[str] = None, + config: Optional[WorkflowConfig] = None, ): # validate worker type at initialization if not isinstance(worker, ChatAgent): @@ -78,35 +90,276 @@ def __init__( self.worker = worker self.description = description self._context_utility = context_utility + self._role_identifier = role_identifier + self.config = config if config is not None else WorkflowConfig() def _get_context_utility(self) -> ContextUtility: - r"""Get context utility with lazy initialization.""" + r"""Get context utility with lazy initialization. + + Uses role-based context if role_identifier is set, otherwise falls + back to default workforce shared context. + """ if self._context_utility is None: - self._context_utility = ContextUtility.get_workforce_shared() + if self._role_identifier: + self._context_utility = ( + ContextUtility.get_workforce_shared_by_role( + self._role_identifier + ) + ) + else: + self._context_utility = ContextUtility.get_workforce_shared() return self._context_utility + def _extract_existing_workflow_metadata( + self, file_path: Path + ) -> Optional[WorkflowMetadata]: + r"""Extract metadata from an existing workflow file for versioning. + + This method reads the metadata section from an existing workflow + markdown file to retrieve version number and creation timestamp, + enabling proper version tracking when updating workflows. + + Args: + file_path (Path): Path to the existing workflow file. + + Returns: + Optional[WorkflowMetadata]: WorkflowMetadata instance if file + exists and metadata is successfully parsed, None otherwise. 
+ """ + try: + # check if parent directory exists first + if not file_path.parent.exists(): + return None + + if not file_path.exists(): + return None + + with open(file_path, 'r', encoding='utf-8') as f: + content = f.read() + + # extract metadata section + metadata_match = re.search( + r'## Metadata\s*\n(.*?)(?:\n##|$)', content, re.DOTALL + ) + + if not metadata_match: + return None + + metadata_section = metadata_match.group(1).strip() + + # parse metadata lines (format: "- key: value") + metadata_dict: Dict[str, Any] = {} + for line in metadata_section.split('\n'): + line = line.strip() + if line.startswith('-'): + # remove leading "- " and split on first ":" + line = line[1:].strip() + if ':' in line: + key, value = line.split(':', 1) + key = key.strip() + value = value.strip() + + # convert workflow_version to int + if key == 'workflow_version': + try: + metadata_dict[key] = int(value) + except ValueError: + metadata_dict[key] = 1 + # convert message_count to int + elif key == 'message_count': + try: + metadata_dict[key] = int(value) + except ValueError: + metadata_dict[key] = 0 + else: + metadata_dict[key] = value + + # create WorkflowMetadata instance if we have required fields + required_fields = { + 'session_id', + 'working_directory', + 'created_at', + 'agent_id', + } + if not required_fields.issubset(metadata_dict.keys()): + logger.warning( + f"Existing workflow missing required metadata fields: " + f"{file_path}" + ) + return None + + # ensure we have updated_at and workflow_version + if 'updated_at' not in metadata_dict: + metadata_dict['updated_at'] = metadata_dict['created_at'] + if 'workflow_version' not in metadata_dict: + metadata_dict['workflow_version'] = 1 + if 'message_count' not in metadata_dict: + metadata_dict['message_count'] = 0 + + return WorkflowMetadata(**metadata_dict) + + except Exception as e: + logger.warning( + f"Error extracting workflow metadata from {file_path}: {e}" + ) + return None + + def _try_role_based_loading( + self, + role_name: str, + pattern: Optional[str], + max_files_to_load: int, + use_smart_selection: bool, + ) -> bool: + r"""Try loading workflows from role-based directory structure. + + Args: + role_name (str): Role name to load workflows from. + pattern (Optional[str]): Custom search pattern for workflow files. + max_files_to_load (int): Maximum number of workflow files to load. + use_smart_selection (bool): Whether to use agent-based selection. + + Returns: + bool: True if workflows were successfully loaded, False otherwise. + """ + logger.info(f"Attempting to load workflows for role: {role_name}") + + loaded = self.load_workflows_by_role( + role_name=role_name, + pattern=pattern, + max_files_to_load=max_files_to_load, + use_smart_selection=use_smart_selection, + ) + + return loaded + + def _try_session_based_loading( + self, + session_id: str, + role_name: str, + pattern: Optional[str], + max_files_to_load: int, + use_smart_selection: bool, + ) -> bool: + r"""Try loading workflows from session-based directory (deprecated). + + Args: + session_id (str): Workforce session ID to load from. + role_name (str): Role name (for deprecation warning). + pattern (Optional[str]): Custom search pattern for workflow files. + max_files_to_load (int): Maximum number of workflow files to load. + use_smart_selection (bool): Whether to use agent-based selection. + + Returns: + bool: True if workflows were successfully loaded, False otherwise. 
+ """ + import warnings + + warnings.warn( + f"Session-based workflow loading " + f"(session_id={session_id}) is deprecated. " + f"Workflows are now organized by role in " + f"workforce_workflows/{{role_name}}/ folders. " + f"No workflows found for role '{role_name}'.", + FutureWarning, + stacklevel=2, + ) + + logger.info( + f"Falling back to session-based loading for " + f"session_id={session_id}" + ) + + if use_smart_selection: + return self._session_based_smart_loading( + session_id, max_files_to_load + ) + else: + return self._session_based_pattern_loading( + pattern, session_id, max_files_to_load + ) + + def _session_based_smart_loading( + self, session_id: str, max_files_to_load: int + ) -> bool: + r"""Load workflows from session using smart selection. + + Args: + session_id (str): Session ID to load from. + max_files_to_load (int): Maximum number of files to load. + + Returns: + bool: True if workflows were loaded, False otherwise. + """ + context_util = self._get_context_utility() + workflows_metadata = context_util.get_all_workflows_info(session_id) + + if workflows_metadata: + selected_files, selection_method = self._select_relevant_workflows( + workflows_metadata, + max_files_to_load, + session_id, + ) + + if selected_files: + logger.info( + f"Workflow selection method: {selection_method.value}" + ) + loaded_count = self._load_workflow_files( + selected_files, max_files_to_load + ) + return loaded_count > 0 + + return False + + def _session_based_pattern_loading( + self, + pattern: Optional[str], + session_id: str, + max_files_to_load: int, + ) -> bool: + r"""Load workflows from session using pattern matching. + + Args: + pattern (Optional[str]): Pattern for file matching. + session_id (str): Session ID to load from. + max_files_to_load (int): Maximum number of files to load. + + Returns: + bool: True if workflows were loaded, False otherwise. + """ + workflow_files = self._find_workflow_files(pattern, session_id) + if workflow_files: + loaded_count = self._load_workflow_files( + workflow_files, max_files_to_load + ) + return loaded_count > 0 + + return False + def load_workflows( self, pattern: Optional[str] = None, - max_files_to_load: int = 3, + max_files_to_load: Optional[int] = None, session_id: Optional[str] = None, use_smart_selection: bool = True, ) -> bool: r"""Load workflow memories using intelligent agent-based selection. - This method uses the worker agent to intelligently select the most - relevant workflows based on workflow information (title, description, - tags) rather than simple filename pattern matching. + This method first tries to load workflows from the role-based folder + structure. If no workflows are found and session_id is provided, falls + back to session-based loading (deprecated). Args: pattern (Optional[str]): Legacy parameter for backward compatibility. When use_smart_selection=False, uses this pattern for file matching. Ignored when smart selection is enabled. - max_files_to_load (int): Maximum number of workflow files to load. - (default: :obj:`3`) - session_id (Optional[str]): Specific workforce session ID to load - from. If None, searches across all sessions. + max_files_to_load (Optional[int]): Maximum number of workflow files + to load. If None, uses config.default_max_files_to_load. + (default: :obj:`None`) + session_id (Optional[str]): Deprecated. Specific workforce session + ID to load from using legacy session-based organization. (default: :obj:`None`) use_smart_selection (bool): Whether to use agent-based intelligent workflow selection. 
When True, uses workflow @@ -115,36 +368,142 @@ def load_workflows( Returns: bool: True if workflow memories were successfully loaded, False - otherwise. + otherwise. Check logs for detailed error messages. """ try: + # use config default if not specified + if max_files_to_load is None: + max_files_to_load = self.config.default_max_files_to_load + # reset system message to original state before loading # this prevents duplicate workflow context on multiple calls self.worker.reset_to_original_system_message() + # determine role name to use + role_name = ( + self._role_identifier + if self._role_identifier + else self._get_sanitized_role_name() + ) + + # try role-based loading first + loaded = self._try_role_based_loading( + role_name, pattern, max_files_to_load, use_smart_selection + ) + + if loaded: + logger.info( + f"Successfully loaded workflows for role '{role_name}'" + ) + return True + + # fallback to session-based if session_id is provided + if session_id is not None: + loaded = self._try_session_based_loading( + session_id, + role_name, + pattern, + max_files_to_load, + use_smart_selection, + ) + if loaded: + logger.info( + f"Successfully loaded workflows from session " + f"'{session_id}' (deprecated)" + ) + return True + + logger.info( + f"No workflow files found for role '{role_name}'. " + f"This may be expected if no workflows have been saved yet." + ) + return False + + except Exception as e: + logger.error( + f"Error loading workflow memories for " + f"{self.description}: {e!s}", + exc_info=True, + ) + return False + + def load_workflows_by_role( + self, + role_name: Optional[str] = None, + pattern: Optional[str] = None, + max_files_to_load: Optional[int] = None, + use_smart_selection: bool = True, + ) -> bool: + r"""Load workflow memories from role-based directory structure. + + This method loads workflows from the new role-based folder structure: + workforce_workflows/{role_name}/*.md + + Args: + role_name (Optional[str]): Role name to load workflows from. If + None, uses the worker's role_name or role_identifier. + pattern (Optional[str]): Custom search pattern for workflow files. + Ignored when use_smart_selection=True. + max_files_to_load (Optional[int]): Maximum number of workflow files + to load. If None, uses config.default_max_files_to_load. + (default: :obj:`None`) + use_smart_selection (bool): Whether to use agent-based + intelligent workflow selection. When True, uses workflow + information and LLM to select most relevant workflows. When + False, falls back to pattern matching. (default: :obj:`True`) + + Returns: + bool: True if workflow memories were successfully loaded, False + otherwise. 
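+
+        Example (illustrative; assumes an initialized manager):
+            >>> manager = WorkflowMemoryManager(
+            ...     worker=agent, description="data analyst"
+            ... )
+            >>> manager.load_workflows_by_role(
+            ...     role_name="data_analyst", max_files_to_load=2
+            ... )
+            True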
+ """ + try: + # use config default if not specified + if max_files_to_load is None: + max_files_to_load = self.config.default_max_files_to_load + + # reset system message to original state before loading + self.worker.reset_to_original_system_message() + + # determine role name to use + if role_name is None: + role_name = ( + self._role_identifier or self._get_sanitized_role_name() + ) + # determine which selection method to use if use_smart_selection: # smart selection: use workflow information and agent # intelligence context_util = self._get_context_utility() - workflows_metadata = context_util.get_all_workflows_info( - session_id + + # find workflow files in role-based directory + workflow_files = self._find_workflow_files_by_role( + role_name, pattern ) + # get workflow metadata for smart selection + workflows_metadata = [] + for file_path in workflow_files: + metadata = context_util.extract_workflow_info(file_path) + if metadata: + workflows_metadata.append(metadata) + if not workflows_metadata: - logger.info("No workflow files found") + logger.info( + f"No workflow files found for role: {role_name}" + ) return False # use agent to select most relevant workflows selected_files, selection_method = ( self._select_relevant_workflows( - workflows_metadata, max_files_to_load, session_id + workflows_metadata, max_files_to_load ) ) if not selected_files: logger.info( - f"No workflows selected " + f"No workflows selected for role {role_name} " f"(method: {selection_method.value})" ) return False @@ -161,8 +520,14 @@ def load_workflows( else: # legacy pattern matching approach - workflow_files = self._find_workflow_files(pattern, session_id) + workflow_files = self._find_workflow_files_by_role( + role_name, pattern + ) + if not workflow_files: + logger.info( + f"No workflow files found for role: {role_name}" + ) return False loaded_count = self._load_workflow_files( @@ -173,14 +538,13 @@ def load_workflows( if loaded_count > 0: logger.info( f"Successfully loaded {loaded_count} workflow file(s) for " - f"{self.description}" + f"role {role_name}" ) return loaded_count > 0 except Exception as e: logger.warning( - f"Error loading workflow memories for {self.description}: " - f"{e!s}" + f"Error loading workflow memories for role {role_name}: {e!s}" ) return False @@ -216,12 +580,7 @@ def save_workflow( # check if we should use role_name or let summarize extract # task_title clean_name = self._get_sanitized_role_name() - use_role_name_for_filename = clean_name not in { - 'assistant', - 'agent', - 'user', - 'system', - } + use_role_name_for_filename = not is_generic_role_name(clean_name) # if role_name is explicit, use it for filename # if role_name is generic, pass none to let summarize use @@ -267,6 +626,151 @@ def save_workflow( "message": f"Failed to save workflow memories: {e!s}", } + async def save_workflow_content_async( + self, + workflow_summary: 'WorkflowSummary', + context_utility: Optional[ContextUtility] = None, + conversation_accumulator: Optional[ChatAgent] = None, + ) -> Dict[str, Any]: + r"""Save a pre-generated workflow summary to disk. + + This method takes a pre-generated WorkflowSummary object and saves + it to disk using the provided context utility. It does NOT call the + LLM - just formats and saves the content. Use this for two-pass + workflows where the summary is generated first, then saved to a + location determined by the summary content. + + Args: + workflow_summary (WorkflowSummary): Pre-generated workflow summary + object containing task_title, agent_title, etc. 
+ context_utility (Optional[ContextUtility]): Context utility with + correct working directory. If None, uses default. + conversation_accumulator (Optional[ChatAgent]): An optional agent + that holds accumulated conversation history. Used to get + accurate message_count metadata. (default: :obj:`None`) + + Returns: + Dict[str, Any]: Result dictionary with keys: + - status (str): "success" or "error" + - summary (str): Formatted workflow summary + - file_path (str): Path to saved file + - worker_description (str): Worker description used + """ + + def _create_error_result(message: str) -> Dict[str, Any]: + """helper to create error result dict.""" + return { + "status": "error", + "summary": "", + "file_path": None, + "worker_description": self.description, + "message": message, + } + + try: + # validate workflow_summary input + if not workflow_summary: + return _create_error_result("workflow_summary is required") + + # validate required fields exist + if not hasattr(workflow_summary, 'task_title'): + return _create_error_result( + "workflow_summary must have task_title field" + ) + + if not hasattr(workflow_summary, 'agent_title'): + return _create_error_result( + "workflow_summary must have agent_title field" + ) + + # validate agent_title is not empty + agent_title = getattr(workflow_summary, 'agent_title', '').strip() + if not agent_title: + return _create_error_result( + "workflow_summary.agent_title cannot be empty" + ) + + # use provided context utility or get default + if context_utility is None: + context_utility = self._get_context_utility() + + # set context utility on worker + self.worker.set_context_utility(context_utility) + + # determine filename from task_title + task_title = workflow_summary.task_title + clean_title = ContextUtility.sanitize_workflow_filename(task_title) + base_filename = ( + f"{clean_title}{self.config.workflow_filename_suffix}" + if clean_title + else "workflow" + ) + + # check if workflow file already exists to handle versioning + file_path = ( + context_utility.get_working_directory() / f"{base_filename}.md" + ) + existing_metadata = self._extract_existing_workflow_metadata( + file_path + ) + + # build metadata - get message count from accumulator if available + source_agent = ( + conversation_accumulator + if conversation_accumulator + else self.worker + ) + + # determine version and created_at based on existing metadata + # only increment version if versioning is enabled + if self.config.enable_versioning and existing_metadata: + workflow_version = existing_metadata.workflow_version + 1 + created_at = existing_metadata.created_at + else: + workflow_version = 1 + created_at = None + + metadata = context_utility.get_session_metadata( + workflow_version=workflow_version, created_at=created_at + ) + metadata.update( + { + "agent_id": self.worker.agent_id, + "message_count": len(source_agent.memory.get_context()[0]), + } + ) + + # convert WorkflowSummary to markdown + summary_content = context_utility.structured_output_to_markdown( + structured_data=workflow_summary, metadata=metadata + ) + + # save to disk + save_status = context_utility.save_markdown_file( + base_filename, + summary_content, + ) + + # format summary with context prefix + formatted_summary = ( + f"[CONTEXT_SUMMARY] The following is a summary of our " + f"conversation from a previous session: {summary_content}" + ) + + return { + "status": "success" + if save_status == "success" + else save_status, + "summary": formatted_summary, + "file_path": str(file_path), + "worker_description": 
self.description, + } + + except Exception as e: + return _create_error_result( + f"Failed to save workflow content: {e!s}" + ) + async def save_workflow_async( self, conversation_accumulator: Optional[ChatAgent] = None ) -> Dict[str, Any]: @@ -315,12 +819,7 @@ async def save_workflow_async( # check if we should use role_name or let asummarize extract # task_title clean_name = self._get_sanitized_role_name() - use_role_name_for_filename = clean_name not in { - 'assistant', - 'agent', - 'user', - 'system', - } + use_role_name_for_filename = not is_generic_role_name(clean_name) # generate and save workflow summary # if role_name is explicit, use it for filename @@ -511,6 +1010,11 @@ def _find_workflow_files( ) -> List[str]: r"""Find and return sorted workflow files matching the pattern. + .. note:: + Session-based workflow search will be deprecated in a future + version. Consider using :meth:`_find_workflow_files_by_role` for + role-based organization instead. + Args: pattern (Optional[str]): Custom search pattern for workflow files. If None, uses worker role_name to generate pattern. @@ -521,27 +1025,40 @@ def _find_workflow_files( List[str]: Sorted list of workflow file paths (empty if validation fails). """ + import warnings + + warnings.warn( + "Session-based workflow search is deprecated and will be removed " + "in a future version. Consider using load_workflows_by_role() for " + "role-based organization instead.", + FutureWarning, + stacklevel=2, + ) + # generate filename-safe search pattern from worker role name if pattern is None: # get sanitized role name clean_name = self._get_sanitized_role_name() # check if role_name is generic - generic_names = {'assistant', 'agent', 'user', 'system'} - if clean_name in generic_names: + if is_generic_role_name(clean_name): # for generic role names, search for all workflow files # since filename is based on task_title - pattern = "*_workflow*.md" + pattern = f"*{self.config.workflow_filename_suffix}*.md" else: # for explicit role names, search for role-specific files - pattern = f"{clean_name}_workflow*.md" + pattern = ( + f"{clean_name}{self.config.workflow_filename_suffix}*.md" + ) - # get the base workforce_workflows directory + # get the base workflow directory from config camel_workdir = os.environ.get("CAMEL_WORKDIR") if camel_workdir: - base_dir = os.path.join(camel_workdir, "workforce_workflows") + base_dir = os.path.join( + camel_workdir, self.config.workflow_folder_name + ) else: - base_dir = "workforce_workflows" + base_dir = self.config.workflow_folder_name # search for workflow files in specified or all session directories if session_id: @@ -564,6 +1081,64 @@ def extract_session_timestamp(filepath: str) -> str: workflow_files.sort(key=extract_session_timestamp, reverse=True) return workflow_files + def _find_workflow_files_by_role( + self, role_name: Optional[str] = None, pattern: Optional[str] = None + ) -> List[str]: + r"""Find workflow files in role-based directory structure. + + This method searches for workflows in the new role-based folder + structure: workforce_workflows/{role_name}/*.md + + Args: + role_name (Optional[str]): Role name to search for. If None, + uses the worker's role_name or role_identifier. + pattern (Optional[str]): Custom search pattern for workflow files. + If None, searches for all workflow files in the role directory. + + Returns: + List[str]: Sorted list of workflow file paths by modification time + (most recent first). 
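+
+        Illustrative search path, assuming the default
+        ``workflow_filename_suffix`` of ``_workflow``:
+        ``workforce_workflows/data_analyst/*_workflow*.md``.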
+ """ + # determine role name to use + if role_name is None: + role_name = ( + self._role_identifier or self._get_sanitized_role_name() + ) + + # sanitize role name for filesystem use + clean_role = ContextUtility.sanitize_workflow_filename(role_name) + if not clean_role: + clean_role = "unknown_role" + + # get the base workflow directory from config + camel_workdir = os.environ.get("CAMEL_WORKDIR") + if camel_workdir: + base_dir = os.path.join( + camel_workdir, self.config.workflow_folder_name, clean_role + ) + else: + base_dir = os.path.join( + self.config.workflow_folder_name, clean_role + ) + + # use provided pattern or default to all workflow files + if pattern is None: + pattern = f"*{self.config.workflow_filename_suffix}*.md" + + # search for workflow files in role directory + search_path = str(Path(base_dir) / pattern) + workflow_files = glob.glob(search_path) + + if not workflow_files: + logger.info( + f"No workflow files found in role directory: {base_dir}" + ) + return [] + + # sort by file modification time (most recent first) + workflow_files.sort(key=os.path.getmtime, reverse=True) + return workflow_files + def _collect_workflow_contents( self, workflow_files: List[str] ) -> List[Dict[str, str]]: @@ -655,6 +1230,9 @@ def _format_workflows_for_context( f"{workflow_data['content']}" ) + # add clear ending marker + combined_content += "\n\n--- End of Previous Workflows ---\n" + return combined_content def _add_workflows_to_system_message(self, workflow_context: str) -> bool: @@ -755,10 +1333,10 @@ def _generate_workflow_filename(self) -> str: Returns: str: Sanitized filename without timestamp and without .md - extension. Format: {role_name}_workflow + extension. Format: {role_name}{workflow_filename_suffix} """ clean_name = self._get_sanitized_role_name() - return f"{clean_name}_workflow" + return f"{clean_name}{self.config.workflow_filename_suffix}" def _prepare_workflow_prompt(self) -> str: r"""Prepare the structured prompt for workflow summarization. diff --git a/camel/societies/workforce/workforce.py b/camel/societies/workforce/workforce.py index e393fc068a..e4a5977c56 100644 --- a/camel/societies/workforce/workforce.py +++ b/camel/societies/workforce/workforce.py @@ -41,7 +41,7 @@ from .workforce_metrics import WorkforceMetrics if TYPE_CHECKING: - from camel.utils.context_utils import ContextUtility + from camel.utils.context_utils import ContextUtility, WorkflowSummary from colorama import Fore @@ -578,6 +578,54 @@ def _get_or_create_shared_context_utility( ) return self._shared_context_utility + def _get_role_identifier( + self, + worker: ChatAgent, + description: str, + workflow_summary: Optional['WorkflowSummary'] = None, + ) -> str: + r"""Extract role identifier for organizing workflows. + + Uses priority fallback: role_name → agent_title (from + WorkflowSummary) → sanitized description. + + Args: + worker (ChatAgent): The worker agent to extract role from. + description (str): Worker description to use as fallback. + workflow_summary (Optional[WorkflowSummary]): Optional + WorkflowSummary object that may contain agent_title field. + + Returns: + str: Role identifier for organizing workflows. 
+ """ + from camel.societies.workforce.utils import is_generic_role_name + from camel.utils.context_utils import ContextUtility + + # try worker.role_name first (if not generic) + if hasattr(worker, 'role_name') and worker.role_name: + clean_name = ContextUtility.sanitize_workflow_filename( + worker.role_name + ) + if clean_name and not is_generic_role_name(clean_name): + return clean_name + + # try agent_title from WorkflowSummary (LLM-generated) + if workflow_summary and hasattr(workflow_summary, 'agent_title'): + agent_title = workflow_summary.agent_title + clean_title = ContextUtility.sanitize_workflow_filename( + agent_title + ) + if clean_title and not is_generic_role_name(clean_title): + return clean_title + + # fallback to sanitized truncated description + # truncate long descriptions + max_length = 30 + if len(description) > max_length: + description = description[:max_length] + clean_desc = ContextUtility.sanitize_workflow_filename(description) + return clean_desc if clean_desc else "unknown_role" + def _validate_agent_compatibility( self, agent: ChatAgent, agent_context: str = "agent" ) -> None: @@ -2786,7 +2834,6 @@ def reset(self) -> None: def save_workflow_memories( self, - session_id: Optional[str] = None, ) -> Dict[str, str]: r"""Save workflow memories for all SingleAgentWorker instances in the workforce. @@ -2804,11 +2851,8 @@ def save_workflow_memories( method. Other worker types are skipped. - Args: - session_id (Optional[str]): Custom session ID to use for saving - workflows. If None, auto-generates a timestamped session ID. - Useful for organizing workflows by project or context. - (default: :obj:`None`) + Workflows are organized by agent role in role-based folders: + workforce_workflows/{role_name}/*.md Returns: Dict[str, str]: Dictionary mapping worker node IDs to save results. @@ -2818,15 +2862,12 @@ def save_workflow_memories( Example: >>> workforce = Workforce("My Team") >>> # ... add workers and process tasks ... - >>> # save with auto-generated session id >>> results = workforce.save_workflow_memories() >>> print(results) - {'worker_123': '/path/to/developer_agent_workflow.md', + {'worker_123': 'workforce_workflows/developer/task_workflow.md', 'worker_456': 'error: No conversation context available'} - >>> # save with custom project id - >>> results = workforce.save_workflow_memories( - ... session_id="project_123" - ... ) + >>> # preferred: use async version + >>> results = await workforce.save_workflow_memories_async() Note: For better performance with multiple workers, use the async @@ -2838,56 +2879,22 @@ def save_workflow_memories( :meth:`save_workflow_memories_async`: Async version with parallel processing for significantly better performance. """ + import asyncio import warnings warnings.warn( - "save_workflow_memories() is slow for multiple workers. " - "Consider using save_workflow_memories_async() for parallel " - "processing and ~4x faster performance.", + "save_workflow_memories() is deprecated and slow for multiple " + "workers. 
Use save_workflow_memories_async() instead for parallel " + "processing and significantly better performance.", DeprecationWarning, stacklevel=2, ) - results = {} - - # Get or create shared context utility for this save operation - shared_context_utility = self._get_or_create_shared_context_utility( - session_id=session_id - ) - - for child in self._children: - if isinstance(child, SingleAgentWorker): - try: - # Set the shared context utility for this operation - child._shared_context_utility = shared_context_utility - child.worker.set_context_utility(shared_context_utility) - - result = child.save_workflow_memories() - if result.get("status") == "success": - results[child.node_id] = result.get( - "file_path", "unknown_path" - ) - else: - # Error: check if there's a separate message field, - # otherwise use the status itself - error_msg = result.get( - "message", result.get("status", "Unknown error") - ) - results[child.node_id] = f"error: {error_msg}" - except Exception as e: - results[child.node_id] = f"error: {e!s}" - else: - # Skip non-SingleAgentWorker types - results[child.node_id] = ( - f"skipped: {type(child).__name__} not supported" - ) - - logger.info(f"Workflow save completed for {len(results)} workers") - return results + # delegate to async version using asyncio + return asyncio.run(self.save_workflow_memories_async()) async def save_workflow_memories_async( self, - session_id: Optional[str] = None, ) -> Dict[str, str]: r"""Asynchronously save workflow memories for all SingleAgentWorker instances in the workforce. @@ -2901,11 +2908,8 @@ async def save_workflow_memories_async( save_workflow_memories_async() method in parallel. Other worker types are skipped. - Args: - session_id (Optional[str]): Custom session ID to use for saving - workflows. If None, auto-generates a timestamped session ID. - Useful for organizing workflows by project or context. - (default: :obj:`None`) + Workflows are organized by agent role in role-based folders: + workforce_workflows/{role_name}/*.md Returns: Dict[str, str]: Dictionary mapping worker node IDs to save results. 
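+            On success each value is the saved file path; on failure
+            it is a string of the form ``error: <reason>``.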
@@ -2918,19 +2922,14 @@ async def save_workflow_memories_async( >>> # save with parallel summarization (faster) >>> results = await workforce.save_workflow_memories_async() >>> print(results) - {'worker_123': '/path/to/developer_agent_workflow.md', - 'worker_456': '/path/to/search_agent_workflow.md', - 'worker_789': '/path/to/document_agent_workflow.md'} + {'worker_123': 'workforce_workflows/developer/task_workflow.md', + 'worker_456': 'workforce_workflows/analyst/report_workflow.md', + 'worker_789': 'workforce_workflows/writer/doc_workflow.md'} """ import asyncio results = {} - # Get or create shared context utility for this save operation - shared_context_utility = self._get_or_create_shared_context_utility( - session_id=session_id - ) - # Prepare tasks for parallel execution async def save_single_worker( child: BaseNode, @@ -2939,11 +2938,77 @@ async def save_single_worker( result).""" if isinstance(child, SingleAgentWorker): try: - # Set the shared context utility for this operation - child._shared_context_utility = shared_context_utility - child.worker.set_context_utility(shared_context_utility) + from camel.utils.context_utils import ( + ContextUtility, + WorkflowSummary, + ) + + # TWO-PASS APPROACH FOR ROLE-BASED SAVING: + # Pass 1: Generate summary to get agent_title + workflow_manager = child._get_workflow_manager() + summary_prompt = ( + workflow_manager._prepare_workflow_prompt() + ) + + # generate summary without saving + # use conversation accumulator if available + gen_result = ( + await child.worker.generate_workflow_summary_async( + summary_prompt=summary_prompt, + response_format=WorkflowSummary, + conversation_accumulator=( + child._conversation_accumulator + ), + ) + ) + + if gen_result.get("status") != "success": + error_msg = gen_result.get( + "status", "Failed to generate summary" + ) + return (child.node_id, f"error: {error_msg}") + + workflow_summary = gen_result.get("structured_summary") + if not workflow_summary: + return ( + child.node_id, + "error: No workflow summary generated", + ) + + # Pass 2: Extract agent_title and determine role folder + role_id = self._get_role_identifier( + child.worker, + child.description, + workflow_summary=workflow_summary, + ) + + # create role-based context utility + role_context = ContextUtility.get_workforce_shared_by_role( + role_id + ) + + # save with correct context and accumulator + result = ( + await workflow_manager.save_workflow_content_async( + workflow_summary=workflow_summary, + context_utility=role_context, + conversation_accumulator=( + child._conversation_accumulator + ), + ) + ) + + # clean up accumulator after successful save + if ( + result.get("status") == "success" + and child._conversation_accumulator is not None + ): + logger.info( + "Cleaning up conversation accumulator after " + "workflow summarization" + ) + child._conversation_accumulator = None - result = await child.save_workflow_memories_async() if result.get("status") == "success": return ( child.node_id, diff --git a/camel/utils/context_utils.py b/camel/utils/context_utils.py index 801298abc2..4bedebddb4 100644 --- a/camel/utils/context_utils.py +++ b/camel/utils/context_utils.py @@ -36,6 +36,17 @@ class WorkflowSummary(BaseModel): by future agents for similar tasks. """ + agent_title: str = Field( + description=( + "A concise role or identity describing WHO the agent is " + "and its purpose (≤ 5 words). This represents the agent's " + "capability or specialization, not the specific task. Use " + "lowercase with underscores. 
Examples: 'data_analyst', " + "'python_developer', 'research_assistant', 'content_writer', " + "'sales_analyst', 'customer_support_agent'. This is used to " + "organize workflows by agent role." + ) + ) task_title: str = Field( description="A short, generic title of the main task (≤ 10 words). " "Avoid product- or case-specific names. " @@ -118,6 +129,8 @@ def get_instruction_prompt(cls) -> str: 'conversation and extract the key workflow information ' 'following the provided schema structure. If a field has no ' 'content, still include it per the schema, but keep it empty. ' + 'For agent_title, identify your role/capability based on the task ' + 'you performed (e.g., "data_analyst", "python_developer"). ' 'The length of your workflow must be proportional to the ' 'complexity of the task. Example: If the task is simply ' 'about a simple math problem, the workflow must be short, ' @@ -155,12 +168,14 @@ class ContextUtility: # Class variables for shared session management _shared_sessions: ClassVar[Dict[str, 'ContextUtility']] = {} _default_workforce_session: ClassVar[Optional['ContextUtility']] = None + _role_based_contexts: ClassVar[Dict[str, 'ContextUtility']] = {} def __init__( self, working_directory: Optional[str] = None, session_id: Optional[str] = None, create_folder: bool = True, + use_session_subfolder: bool = True, ): r"""Initialize the ContextUtility. @@ -176,15 +191,23 @@ def __init__( immediately. If False, the folder will be created only when needed (e.g., when saving files). Default is True for backward compatibility. + use_session_subfolder (bool): Whether to append session_id as a + subfolder. If False, files are saved directly to + working_directory without session subfolder. Use False for + role-based organization. Default is True for backward + compatibility. """ self.working_directory_param = working_directory - self._setup_storage(working_directory, session_id, create_folder) + self._setup_storage( + working_directory, session_id, create_folder, use_session_subfolder + ) def _setup_storage( self, working_directory: Optional[str], session_id: Optional[str] = None, create_folder: bool = True, + use_session_subfolder: bool = True, ) -> None: r"""Initialize session-specific storage paths and optionally create directory structure for context file management.""" @@ -199,8 +222,9 @@ def _setup_storage( else: self.working_directory = Path("context_files") - # Create session-specific directory - self.working_directory = self.working_directory / self.session_id + # Create session-specific directory only if requested + if use_session_subfolder: + self.working_directory = self.working_directory / self.session_id # Only create directory if requested if create_folder: @@ -537,18 +561,30 @@ def create_session_directory( session_dir.mkdir(parents=True, exist_ok=True) return session_dir - def get_session_metadata(self) -> Dict[str, Any]: + def get_session_metadata( + self, workflow_version: int = 1, created_at: Optional[str] = None + ) -> Dict[str, Any]: r"""Collect comprehensive session information including identifiers, timestamps, and directory paths for tracking and reference. + Args: + workflow_version (int): Version number of the workflow. Defaults + to 1 for new workflows. (default: :obj:`1`) + created_at (Optional[str]): ISO timestamp when workflow was first + created. If None, uses current timestamp for new workflows. + (default: :obj:`None`) + Returns: Dict[str, Any]: Session metadata including ID, timestamp, - directory. + directory, version, and update timestamp. 
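+
+        Example (illustrative timestamps):
+            >>> utility.get_session_metadata(workflow_version=2)
+            {'session_id': 'session_20250115_100000_000000',
+            'working_directory': 'workforce_workflows/data_analyst',
+            'created_at': '2025-01-15T10:00:00.000000',
+            'updated_at': '2025-01-20T09:30:00.000000',
+            'workflow_version': 2}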
""" + now = datetime.now().isoformat() return { 'session_id': self.session_id, 'working_directory': str(self.working_directory), - 'created_at': datetime.now().isoformat(), + 'created_at': created_at if created_at else now, + 'updated_at': now, + 'workflow_version': workflow_version, } def list_sessions(self, base_dir: Optional[str] = None) -> List[str]: @@ -948,6 +984,11 @@ def get_workforce_shared( ) -> 'ContextUtility': r"""Get or create shared workforce context utility with lazy init. + .. note:: + Session-based workflow storage will be deprecated in a future + version. Consider using :meth:`get_workforce_shared_by_role` for + role-based organization instead. + This method provides a centralized way to access shared context utilities for workforce workflows, ensuring all workforce components use the same session directory. @@ -959,6 +1000,18 @@ def get_workforce_shared( Returns: ContextUtility: Shared context utility instance for workforce. """ + import warnings + + if session_id is not None: + warnings.warn( + "Session-based workflow storage will be deprecated in a " + "future version. Consider using " + "get_workforce_shared_by_role() for role-based " + "organization instead.", + FutureWarning, + stacklevel=2, + ) + if session_id is None: # Use default workforce session if cls._default_workforce_session is None: @@ -991,6 +1044,49 @@ def get_workforce_shared( ) return cls._shared_sessions[session_id] + @classmethod + def get_workforce_shared_by_role( + cls, role_identifier: str + ) -> 'ContextUtility': + r"""Get or create shared workforce context utility based on role. + + This method provides role-based context utilities for workforce + workflows, organizing workflows by agent role instead of session ID. + + Args: + role_identifier (str): Role identifier (e.g., role_name or + agent_title). Will be sanitized for filesystem use. + + Returns: + ContextUtility: Shared context utility instance for the role. + """ + # sanitize role identifier for use as folder name + clean_role = cls.sanitize_workflow_filename(role_identifier) + if not clean_role: + clean_role = "unknown_role" + + # use setdefault to avoid race condition when multiple async tasks + # access the same role simultaneously + if clean_role not in cls._role_based_contexts: + camel_workdir = os.environ.get("CAMEL_WORKDIR") + if camel_workdir: + base_path = os.path.join( + camel_workdir, "workforce_workflows", clean_role + ) + else: + base_path = os.path.join("workforce_workflows", clean_role) + + # setdefault is atomic for dict operations + cls._role_based_contexts.setdefault( + clean_role, + cls( + working_directory=base_path, + create_folder=False, # Don't create folder until needed + use_session_subfolder=False, # No session subfolder + ), + ) + return cls._role_based_contexts[clean_role] + @classmethod def reset_shared_sessions(cls) -> None: r"""Reset shared sessions (useful for testing). @@ -1001,3 +1097,4 @@ def reset_shared_sessions(cls) -> None: """ cls._shared_sessions.clear() cls._default_workforce_session = None + cls._role_based_contexts.clear() diff --git a/examples/workforce/workforce_workflow_memory_example.py b/examples/workforce/workforce_workflow_memory_example.py index 9c73fe1b31..a6b47c4d25 100644 --- a/examples/workforce/workforce_workflow_memory_example.py +++ b/examples/workforce/workforce_workflow_memory_example.py @@ -49,8 +49,9 @@ def create_math_agent() -> ChatAgent: "mathematical concepts. Use the math tools available to you." 
), ) + # Use DEFAULT to automatically use Azure OpenAI (configured in .env) model = ModelFactory.create( - model_platform=ModelPlatformType.OPENAI, + model_platform=ModelPlatformType.DEFAULT, model_type=ModelType.DEFAULT, model_config_dict=ChatGPTConfig().as_dict(), ) @@ -171,6 +172,22 @@ async def demonstrate_second_session(): # Load previous workflows loaded_workflows = workforce.load_workflow_memories() + # Print system messages to verify workflows were loaded + print("\n" + "=" * 80) + print("System messages after loading workflows:") + print("=" * 80) + + for worker in workforce._children: + if hasattr(worker, 'worker') and hasattr( + worker.worker, '_system_message' + ): + print(f"\n{worker.description} system message:") + print("-" * 80) + system_msg_content = worker.worker._system_message.content + # Print first 500 chars to avoid too much output + print(system_msg_content) + print("-" * 80) + # Process new tasks with loaded workflow context new_tasks = [ Task( diff --git a/test/workforce/test_workflow_memory.py b/test/workforce/test_workflow_memory.py index 052f7354a0..0ec257e081 100644 --- a/test/workforce/test_workflow_memory.py +++ b/test/workforce/test_workflow_memory.py @@ -39,6 +39,24 @@ def mock_model_backend(): yield stub_model +@pytest.fixture(autouse=True) +def reset_context_utility(): + """Reset ContextUtility shared state before each test. + + This ensures test isolation by clearing cached ContextUtility instances + that persist across tests. Without this, tests can fail when run together + because they inherit stale ContextUtility instances pointing to deleted + temporary directories from previous tests. + """ + from camel.utils.context_utils import ContextUtility + + # reset before test + ContextUtility.reset_shared_sessions() + yield + # cleanup after test + ContextUtility.reset_shared_sessions() + + class MockSingleAgentWorker(SingleAgentWorker): """A mock worker for testing workflow functionality.""" @@ -94,6 +112,90 @@ def mock_workforce(): return workforce +@pytest.fixture +def workflow_files_dir(temp_context_dir): + """Create workflow files for testing. + + Creates a role-based folder structure with sample workflow files. + """ + import time + + # Create role-based folder structure + role_dir = os.path.join( + temp_context_dir, "workforce_workflows", "test_worker" + ) + os.makedirs(role_dir, exist_ok=True) + + # Create multiple workflow files for testing + workflows = [ + { + "filename": "data_analysis_workflow.md", + "content": """### Task Title +Data Analysis Workflow + +### Task Description +Analyze sales data and generate reports + +### Tags +- data-analysis +- statistics +- reporting + +### Steps +1. Load data +2. Analyze trends +3. Generate report +""", + }, + { + "filename": "web_scraping_workflow.md", + "content": """### Task Title +Web Scraping Workflow + +### Task Description +Scrape website data + +### Tags +- web-scraping +- data-collection + +### Steps +1. Configure scraper +2. Extract data +3. Store results +""", + }, + { + "filename": "database_query_workflow.md", + "content": """### Task Title +Database Query Workflow + +### Task Description +Query database for analysis + +### Tags +- database +- sql +- data-analysis + +### Steps +1. Connect to database +2. Execute queries +3. 
Process results +""", + }, + ] + + # Write workflow files with slight delays to ensure different timestamps + for workflow in workflows: + file_path = os.path.join(role_dir, workflow["filename"]) + with open(file_path, 'w') as f: + f.write(workflow["content"]) + time.sleep(0.01) # Ensure different modification times + + return temp_context_dir + + class TestSingleAgentWorkerWorkflow: """Test workflow functionality for SingleAgentWorker.""" @@ -162,9 +264,9 @@ def test_load_workflow_success( """Test successful workflow loading (legacy pattern matching mode).""" worker = MockSingleAgentWorker("data_analyst") - # Mock file discovery + # Mock file discovery - using role-based structure mock_files = [ - f"{temp_context_dir}/session_123/data_analyst_workflow_20250122.md" + f"{temp_context_dir}/workforce_workflows/test_worker/data_analyst_workflow_20250122.md" ] mock_glob.return_value = mock_files mock_getmtime.return_value = 1234567890 @@ -213,68 +315,29 @@ def test_load_workflow_non_chat_agent(self): with pytest.raises(TypeError, match="Worker must be a ChatAgent"): worker.load_workflow_memories() - @patch('glob.glob') def test_load_workflow_prioritizes_newest_session( - self, mock_glob, temp_context_dir + self, workflow_files_dir ): - """Test that workflow loading prioritizes files from newest sessions - (legacy mode). + """Test workflow loading prioritizes most recent files (legacy). - This test verifies that when multiple workflow files exist across - different sessions, files from the newest session (by session - timestamp) are loaded first, regardless of file modification times. + This test verifies that when multiple workflow files exist, the + most recently modified files are loaded first. """ worker = MockSingleAgentWorker("data_analyst") - # Simulate realistic scenario: multiple workforce sessions over time - # Each session has workflows from different agents - # Session 1 (older): 17:23:56 - # Session 2 (newer): 17:46:50 - mock_files = [ - # Newer session files (data_analyst and other agents) - f"{temp_context_dir}/workforce_workflows/session_20251002_174650_470517/data_analyst_workflow.md", - f"{temp_context_dir}/workforce_workflows/session_20251002_174650_470517/developer_workflow.md", - # Older session files (data_analyst and other agents) - f"{temp_context_dir}/workforce_workflows/session_20251002_172356_365242/data_analyst_workflow.md", - f"{temp_context_dir}/workforce_workflows/session_20251002_172356_365242/researcher_workflow.md", - ] - mock_glob.return_value = mock_files - - # Mock the shared context utility methods - mock_context_utility = MagicMock() - mock_context_utility.load_markdown_file.return_value = ( - "# Workflow content\nThis is workflow data" - ) - mock_context_utility._filter_metadata_from_content.return_value = ( - "This is workflow data" + # Use legacy pattern matching mode + result = worker.load_workflow_memories( + max_workflows=1, use_smart_selection=False ) - with patch( - 'camel.utils.context_utils.ContextUtility.get_workforce_shared', - return_value=mock_context_utility, - ): - # Use legacy pattern matching mode - result = worker.load_workflow_memories( - max_workflows=1, use_smart_selection=False - ) - - assert result is True - - # Verify load was called once - assert mock_context_utility.load_markdown_file.call_count == 1 - - # Get the filename that was loaded (first argument) - loaded_filename = ( - mock_context_utility.load_markdown_file.call_args[0][0] - ) + assert result is True - # The loaded file should be 'data_analyst_workflow' (without .md) - 
# from the newer session (verified by it being loaded first) - assert loaded_filename == "data_analyst_workflow" + # Verify workflow was loaded into system message + system_content = worker.worker._system_message.content + assert "Workflow" in system_content or "workflow" in system_content - @patch('glob.glob') def test_load_workflow_memories_resets_system_message( - self, mock_glob, temp_context_dir + self, workflow_files_dir ): """Test that multiple calls to load_workflow_memories reset system message (legacy mode). @@ -288,138 +351,34 @@ def test_load_workflow_memories_resets_system_message( # get original system message content original_content = worker.worker._original_system_message.content - # mock workflow files - mock_files = [ - f"{temp_context_dir}/workforce_workflows/session_1/data_analyst_workflow.md" - ] - mock_glob.return_value = mock_files - - # mock the shared context utility methods - mock_context_utility = MagicMock() - mock_context_utility.load_markdown_file.return_value = ( - "# Workflow content\nThis is workflow data" - ) - mock_context_utility._filter_metadata_from_content.return_value = ( - "This is workflow data" + # first call to load_workflow_memories (legacy mode) + worker.load_workflow_memories( + max_workflows=1, use_smart_selection=False ) - with patch( - 'camel.utils.context_utils.ContextUtility.get_workforce_shared', - return_value=mock_context_utility, - ): - # first call to load_workflow_memories (legacy mode) - worker.load_workflow_memories( - max_workflows=1, use_smart_selection=False - ) + # verify reset was called (system message matches original) + first_call_system_content = worker.worker._system_message.content - # verify reset was called (system message matches original) - first_call_system_content = worker.worker._system_message.content - - # second call to load_workflow_memories (legacy mode) - worker.load_workflow_memories( - max_workflows=1, use_smart_selection=False - ) - - # after second call, system message should still be based on - # original (not accumulating from first call) - second_call_system_content = worker.worker._system_message.content + # second call to load_workflow_memories (legacy mode) + worker.load_workflow_memories( + max_workflows=1, use_smart_selection=False + ) - # verify both calls resulted in same base system message - # (indicating reset happened before each load) - assert first_call_system_content == second_call_system_content + # after second call, system message should still be based on + # original (not accumulating from first call) + second_call_system_content = worker.worker._system_message.content - # verify the system message contains original content - assert original_content in second_call_system_content + # verify both calls resulted in same base system message + # (indicating reset happened before each load) + assert first_call_system_content == second_call_system_content - # verify load was called twice (once per load_workflow_memories) - assert mock_context_utility.load_markdown_file.call_count == 2 + # verify the system message contains original content + assert original_content in second_call_system_content class TestWorkforceWorkflowMemoryMethods: """Test workflow functionality for Workforce.""" - @pytest.mark.asyncio - async def test_save_workflow_memories_success(self, mock_workforce): - """Test successful workflow saving for all workers.""" - # Mock save_workflow_memories_async for both workers - mock_result_1 = { - "status": "success", - "file_path": "/path/to/data_analyst_workflow.md", - } - 
mock_result_2 = { - "status": "success", - "file_path": "/path/to/python_developer_workflow.md", - } - - with ( - patch.object( - mock_workforce._children[0], - 'save_workflow_memories_async', - return_value=mock_result_1, - ), - patch.object( - mock_workforce._children[1], - 'save_workflow_memories_async', - return_value=mock_result_2, - ), - ): - results = await mock_workforce.save_workflow_memories_async() - - assert len(results) == 2 - assert all("/path/to/" in path for path in results.values()) - - @pytest.mark.asyncio - async def test_save_workflow_memories_mixed_results(self, mock_workforce): - """Test workflow saving with mixed success/failure results.""" - # Mock one success, one failure - mock_result_success = { - "status": "success", - "file_path": "/path/to/data_analyst_workflow.md", - } - mock_result_error = { - "status": "error", - "message": "No conversation context", - } - - with ( - patch.object( - mock_workforce._children[0], - 'save_workflow_memories_async', - return_value=mock_result_success, - ), - patch.object( - mock_workforce._children[1], - 'save_workflow_memories_async', - return_value=mock_result_error, - ), - ): - results = await mock_workforce.save_workflow_memories_async() - - assert len(results) == 2 - assert ( - "/path/to/data_analyst_workflow.md" - in results[mock_workforce._children[0].node_id] - ) - assert ( - "error: No conversation context" - in results[mock_workforce._children[1].node_id] - ) - - @pytest.mark.asyncio - async def test_save_workflow_memories_exception(self, mock_workforce): - """Test workflow saving when exception occurs.""" - with patch.object( - mock_workforce._children[0], - 'save_workflow_memories_async', - side_effect=Exception("Test error"), - ): - results = await mock_workforce.save_workflow_memories_async() - - assert ( - "error: Test error" - in results[mock_workforce._children[0].node_id] - ) - @pytest.mark.asyncio async def test_save_workflow_memories_real_execution( self, temp_context_dir @@ -460,47 +419,51 @@ async def test_save_workflow_memories_real_execution( # Store initial conversation accumulator state initial_accumulator = worker._conversation_accumulator - # Mock only the ChatAgent.asummarize() method (which makes LLM calls) - mock_summary_result = { + # Mock only the ChatAgent.generate_workflow_summary_async() method + # Role-based structure: + # workforce_workflows/{role}/{task_title}_workflow.md + from camel.utils.context_utils import WorkflowSummary + + mock_workflow_summary = WorkflowSummary( + agent_title="test_worker", + task_title="Analyze Sales Data", + task_description="Analyzed sales data for quarterly report", + tools=[], + steps=["Load data", "Analyze trends", "Generate report"], + tags=["data-analysis", "sales"], + ) + + mock_gen_result = { "status": "success", - "summary": "Completed data analysis workflow", - "file_path": ( - f"{temp_context_dir}/workforce_workflows/" - "session_test/data_analyst_workflow.md" - ), - "worker_description": "data_analyst", + "structured_summary": mock_workflow_summary, + "summary_content": "Completed data analysis workflow", } with patch.object( - ChatAgent, 'asummarize', return_value=mock_summary_result - ) as mock_asummarize: + ChatAgent, + 'generate_workflow_summary_async', + return_value=mock_gen_result, + ) as mock_generate: # This executes the real save_workflow_memories_async() logic results = await workforce.save_workflow_memories_async() # Verify Workforce correctly processes worker results assert len(results) == 1 assert worker.node_id in results - assert 
"data_analyst_workflow" in results[worker.node_id] - assert results[worker.node_id] == mock_summary_result["file_path"] + assert "workflow" in results[worker.node_id] + # check that file was saved in role-based directory + assert "test_worker" in results[worker.node_id] + assert "analyze_sales_data_workflow.md" in results[worker.node_id] - # Verify shared context utility was set up correctly - assert worker.worker._context_utility is not None - - # Verify ChatAgent.asummarize was called with correct parameters - # (validates filename generation, prompt preparation, agent - # selection) - mock_asummarize.assert_called_once() - call_kwargs = mock_asummarize.call_args[1] - - # Verify filename generation includes worker role_name - # (not description, as that would be too long) - assert 'filename' in call_kwargs - # MockSingleAgentWorker uses "Test Worker" as role_name - assert 'test_worker_workflow' in call_kwargs['filename'] + # Verify generate_workflow_summary_async was called + mock_generate.assert_called_once() + call_kwargs = mock_generate.call_args[1] # Verify structured output format is set (WorkflowSummary) assert 'response_format' in call_kwargs - assert call_kwargs['response_format'] is not None + from camel.utils.context_utils import WorkflowSummary + + assert call_kwargs['response_format'] == WorkflowSummary # Verify summary prompt was prepared assert 'summary_prompt' in call_kwargs @@ -589,41 +552,6 @@ async def test_workflows_skip_non_single_agent_workers(self): class TestWorkflowIntegration: """Integration tests for workflow functionality.""" - @pytest.mark.asyncio - async def test_end_to_end_workflow_memory(self, temp_context_dir): - """Test complete workflow: save workflow, load in new session.""" - # First session: create workforce and mock workflow saving - workforce1 = Workforce("Test Team") - worker1 = MockSingleAgentWorker("data_analyst") - workforce1._children = [worker1] - - # Mock successful asummarize and save - mock_save_result = { - "status": "success", - "summary": "Data analysis workflow completed", - "file_path": f"{temp_context_dir}/data_analyst_workflow_test.md", - } - - with patch.object( - worker1.worker, 'asummarize', return_value=mock_save_result - ): - save_results = await workforce1.save_workflow_memories_async() - assert ( - save_results[worker1.node_id] == mock_save_result["file_path"] - ) - - # Second session: create new workforce and load workflows - workforce2 = Workforce("Test Team") - worker2 = MockSingleAgentWorker("data_analyst") - workforce2._children = [worker2] - - # Mock successful workflow loading - with patch.object( - worker2, 'load_workflow_memories', return_value=True - ): - load_results = workforce2.load_workflow_memories() - assert load_results[worker2.node_id] is True - @pytest.mark.asyncio async def test_filename_sanitization(self): """Test worker descriptions are properly sanitized for filenames.""" @@ -756,57 +684,6 @@ async def test_filename_generation_with_generic_role_name( or "analyze" in result["file_path"] ) - @pytest.mark.asyncio - async def test_custom_session_id_integration(self, temp_context_dir): - """Test end-to-end workflow with custom session ID. - - This test addresses issue #3277 request for custom session IDs. 
- """ - # create workforce with custom session id - workforce = Workforce("Test Team") - worker = MockSingleAgentWorker("test_analyst") - workforce._children = [worker] - - custom_session = "project_abc123" - - # simulate conversation - user_msg = BaseMessage.make_user_message( - role_name="User", content="Analyze data" - ) - assistant_msg = BaseMessage.make_assistant_message( - role_name="Assistant", content="Analysis complete" - ) - worker.worker.record_message(user_msg) - worker.worker.record_message(assistant_msg) - - # mock the asummarize method - expected_path = ( - f"{temp_context_dir}/workforce_workflows/" - f"{custom_session}/test_worker_workflow.md" - ) - mock_result = { - "status": "success", - "summary": "Test workflow", - "file_path": expected_path, - } - - with patch.object(ChatAgent, 'asummarize', return_value=mock_result): - # save with custom session id - results = await workforce.save_workflow_memories_async( - session_id=custom_session - ) - - # verify results contain the custom session path - assert worker.node_id in results - assert custom_session in results[worker.node_id] - - # verify the shared context utility was created with custom - # session - assert workforce._shared_context_utility is not None - assert ( - workforce._shared_context_utility.session_id == custom_session - ) - class TestSharedContextUtility: """Test shared context utility functionality.""" @@ -905,63 +782,16 @@ def test_single_agent_worker_uses_shared_context(self): class TestSmartWorkflowSelection: """Test smart workflow selection feature.""" - def test_smart_selection_with_metadata(self, temp_context_dir): + def test_smart_selection_with_metadata(self, workflow_files_dir): """Test smart workflow selection using metadata.""" worker = MockSingleAgentWorker("data_analyst") - # Mock workflow metadata - mock_metadata = [ - { - 'title': 'Data Analysis Workflow', - 'description': 'Analyze sales data and generate reports', - 'tags': ['data-analysis', 'statistics', 'reporting'], - 'file_path': ( - f"{temp_context_dir}/session_1/data_analyst_workflow.md" - ), - }, - { - 'title': 'Web Scraping Workflow', - 'description': 'Scrape website data', - 'tags': ['web-scraping', 'data-collection'], - 'file_path': ( - f"{temp_context_dir}/session_1/web_scraper_workflow.md" - ), - }, - { - 'title': 'Database Query Workflow', - 'description': 'Query database for analysis', - 'tags': ['database', 'sql', 'data-analysis'], - 'file_path': ( - f"{temp_context_dir}/session_1/db_analyst_workflow.md" - ), - }, - ] - - # Mock context utility to return info - mock_context_utility = MagicMock() - mock_context_utility.get_all_workflows_info.return_value = ( - mock_metadata - ) - mock_context_utility.load_markdown_file.return_value = ( - "# Workflow content\nThis is workflow data" - ) - mock_context_utility._filter_metadata_from_content.return_value = ( - "This is workflow data" - ) - - # Mock agent response to select first workflow + # Mock agent response to select workflows 1 and 3 mock_agent_response = MagicMock() mock_agent_response.msgs = [MagicMock(content="1, 3")] - # Mock get_workforce_shared to return our mock context utility - with ( - patch( - 'camel.societies.workforce.workflow_memory_manager.ContextUtility.get_workforce_shared', - return_value=mock_context_utility, - ), - patch.object( - worker.worker, 'step', return_value=mock_agent_response - ), + with patch.object( + worker.worker, 'step', return_value=mock_agent_response ): result = worker.load_workflow_memories( max_workflows=2, use_smart_selection=True @@ 
-969,9 +799,12 @@ def test_smart_selection_with_metadata(self, temp_context_dir): # Verify smart selection was used assert result is True - mock_context_utility.get_all_workflows_info.assert_called_once() worker.worker.step.assert_called_once() + # Verify workflows were loaded into system message + system_content = worker.worker._system_message.content + assert "Workflow" in system_content or "workflow" in system_content + # Verify agent was asked to select workflows selection_call = worker.worker.step.call_args[0][0] assert "data_analyst" in selection_call.content @@ -979,115 +812,45 @@ def test_smart_selection_with_metadata(self, temp_context_dir): assert "data-analysis" in selection_call.content def test_smart_selection_no_metadata(self, temp_context_dir): - """Test smart selection when no workflow info found.""" + """Test smart selection when no workflow files exist.""" worker = MockSingleAgentWorker("data_analyst") - # Mock context utility to return empty info - mock_context_utility = MagicMock() - mock_context_utility.get_all_workflows_info.return_value = [] - - with patch( - 'camel.societies.workforce.workflow_memory_manager.ContextUtility.get_workforce_shared', - return_value=mock_context_utility, - ): - result = worker.load_workflow_memories(use_smart_selection=True) + # Don't create any workflow files - should return False + result = worker.load_workflow_memories(use_smart_selection=True) - assert result is False - mock_context_utility.get_all_workflows_info.assert_called_once() + assert result is False - def test_smart_selection_fewer_workflows_than_max(self, temp_context_dir): + def test_smart_selection_fewer_workflows_than_max( + self, workflow_files_dir + ): """Test smart selection when fewer workflows exist than max_files.""" worker = MockSingleAgentWorker("data_analyst") - # Mock only 2 workflows but ask for 5 - mock_metadata = [ - { - 'title': 'Workflow 1', - 'description': 'Test workflow 1', - 'tags': ['test'], - 'file_path': f"{temp_context_dir}/session_1/workflow1.md", - }, - { - 'title': 'Workflow 2', - 'description': 'Test workflow 2', - 'tags': ['test'], - 'file_path': f"{temp_context_dir}/session_1/workflow2.md", - }, - ] - - mock_context_utility = MagicMock() - mock_context_utility.get_all_workflows_info.return_value = ( - mock_metadata - ) - mock_context_utility.load_markdown_file.return_value = ( - "# Workflow content\nThis is workflow data" - ) - mock_context_utility._filter_metadata_from_content.return_value = ( - "This is workflow data" + # Only 3 workflows exist but ask for 5 - should load all 3 + # without agent selection + result = worker.load_workflow_memories( + max_workflows=5, use_smart_selection=True ) - with patch( - 'camel.societies.workforce.workflow_memory_manager.ContextUtility.get_workforce_shared', - return_value=mock_context_utility, - ): - result = worker.load_workflow_memories( - max_workflows=5, use_smart_selection=True - ) + # Should load all 3 workflows without agent selection + assert result is True - # Should load all 2 workflows without agent selection - assert result is True - assert mock_context_utility.load_markdown_file.call_count == 2 + # Verify workflows were loaded + system_content = worker.worker._system_message.content + assert "Workflow" in system_content or "workflow" in system_content def test_smart_selection_agent_selection_failure_fallback( - self, temp_context_dir + self, workflow_files_dir ): """Test fallback to most recent when agent selection fails.""" worker = MockSingleAgentWorker("data_analyst") - mock_metadata = [ 
- { - 'title': 'Workflow 1', - 'description': 'Test', - 'tags': ['test'], - 'file_path': f"{temp_context_dir}/session_1/workflow1.md", - }, - { - 'title': 'Workflow 2', - 'description': 'Test', - 'tags': ['test'], - 'file_path': f"{temp_context_dir}/session_1/workflow2.md", - }, - { - 'title': 'Workflow 3', - 'description': 'Test', - 'tags': ['test'], - 'file_path': f"{temp_context_dir}/session_1/workflow3.md", - }, - ] - - mock_context_utility = MagicMock() - mock_context_utility.get_all_workflows_info.return_value = ( - mock_metadata - ) - mock_context_utility.load_markdown_file.return_value = ( - "# Workflow content\nThis is workflow data" - ) - mock_context_utility._filter_metadata_from_content.return_value = ( - "This is workflow data" - ) - # Mock agent to return invalid response mock_agent_response = MagicMock() mock_agent_response.msgs = [MagicMock(content="invalid response")] - with ( - patch( - 'camel.societies.workforce.workflow_memory_manager.ContextUtility.get_workforce_shared', - return_value=mock_context_utility, - ), - patch.object( - worker.worker, 'step', return_value=mock_agent_response - ), + with patch.object( + worker.worker, 'step', return_value=mock_agent_response ): result = worker.load_workflow_memories( max_workflows=2, use_smart_selection=True @@ -1095,44 +858,23 @@ def test_smart_selection_agent_selection_failure_fallback( # Should fallback to first 2 workflows (most recent) assert result is True - assert mock_context_utility.load_markdown_file.call_count == 2 - def test_smart_selection_memory_cleanup(self, temp_context_dir): + # Verify workflows were loaded + system_content = worker.worker._system_message.content + assert "Workflow" in system_content or "workflow" in system_content + + def test_smart_selection_memory_cleanup(self, workflow_files_dir): """Test that agent memory is cleaned after smart selection.""" worker = MockSingleAgentWorker("data_analyst") - mock_metadata = [ - { - 'title': 'Test Workflow', - 'description': 'Test', - 'tags': ['test'], - 'file_path': f"{temp_context_dir}/session_1/workflow.md", - } - ] + result = worker.load_workflow_memories(use_smart_selection=True) - mock_context_utility = MagicMock() - mock_context_utility.get_all_workflows_info.return_value = ( - mock_metadata - ) - mock_context_utility.load_markdown_file.return_value = ( - "# Workflow content\nThis is workflow data" - ) - mock_context_utility._filter_metadata_from_content.return_value = ( - "This is workflow data" - ) + assert result is True - with patch( - 'camel.societies.workforce.workflow_memory_manager.ContextUtility.get_workforce_shared', - return_value=mock_context_utility, - ): - result = worker.load_workflow_memories(use_smart_selection=True) - - assert result is True - - # Memory should be cleaned (only system message remains) - final_memory = worker.worker.memory.get_context()[0] - # Should have system message only - assert len(final_memory) == 1 + # Memory should be cleaned (only system message remains) + final_memory = worker.worker.memory.get_context()[0] + # Should have system message only + assert len(final_memory) == 1 def test_extract_workflow_info(self, temp_context_dir): """Test info extraction from workflow markdown file.""" @@ -1271,11 +1013,11 @@ def test_practical_smart_selection(self, temp_context_dir): """ import os - # Create session directory in workforce_workflows - session_dir = os.path.join( - temp_context_dir, "workforce_workflows", "session_test" + # Create role-based directory in workforce_workflows + role_dir = os.path.join( + 
temp_context_dir, "workforce_workflows", "test_worker" ) - os.makedirs(session_dir, exist_ok=True) + os.makedirs(role_dir, exist_ok=True) # Create 10 different workflow files with varied content workflows = [ @@ -1349,11 +1091,11 @@ def test_practical_smart_selection(self, temp_context_dir): # Write workflow files for workflow in workflows: - file_path = os.path.join(session_dir, workflow["name"]) + file_path = os.path.join(role_dir, workflow["name"]) content = f"""## Metadata - session_id: session_test -- working_directory: {session_dir} +- working_directory: {role_dir} - created_at: 2025-01-15T10:00:00.000000 - agent_id: test-agent-id - message_count: 5 @@ -2448,3 +2190,303 @@ async def test_save_workflow_with_very_long_conversation( elif result["status"] == "error": # graceful error handling assert "message" in result + + +class TestWorkflowVersioning: + """Test workflow versioning functionality.""" + + @pytest.mark.asyncio + async def test_workflow_version_increments_and_preserves_created_at( + self, temp_context_dir + ): + """Test version increments (v1->v2->v3) and created_at is preserved.""" + import asyncio + import re + from pathlib import Path + + from camel.societies.workforce.workflow_memory_manager import ( + WorkflowMemoryManager, + ) + from camel.utils.context_utils import ContextUtility, WorkflowSummary + + worker = MockSingleAgentWorker("data_analyst") + + role_name = "data_analyst" + context_utility = ContextUtility( + working_directory=f"{temp_context_dir}/workforce_workflows/{role_name}", + create_folder=True, + use_session_subfolder=False, + ) + + manager = WorkflowMemoryManager( + worker=worker.worker, + description="Data analyst", + context_utility=context_utility, + role_identifier=role_name, + ) + + workflow_summary = WorkflowSummary( + agent_title="data_analyst", + task_title="Calculate Sum", + task_description="Calculate sum", + tools=[], + steps=["1. 
Add numbers"], + failure_and_recovery_strategies=[], + notes_and_observations="", + tags=["math"], + ) + + # save v1 + result1 = await manager.save_workflow_content_async( + workflow_summary=workflow_summary, + context_utility=context_utility, + conversation_accumulator=None, + ) + assert result1["status"] == "success" + workflow_file = Path(result1["file_path"]) + content1 = workflow_file.read_text() + assert "workflow_version: 1" in content1 + + created_match = re.search(r"created_at: (.+)", content1) + assert created_match + created_at_v1 = created_match.group(1).strip() + + await asyncio.sleep(0.01) + + # save v2 + await manager.save_workflow_content_async( + workflow_summary=workflow_summary, + context_utility=context_utility, + conversation_accumulator=None, + ) + content2 = workflow_file.read_text() + assert "workflow_version: 2" in content2 + + await asyncio.sleep(0.01) + + # save v3 + await manager.save_workflow_content_async( + workflow_summary=workflow_summary, + context_utility=context_utility, + conversation_accumulator=None, + ) + content3 = workflow_file.read_text() + assert "workflow_version: 3" in content3 + + # verify created_at preserved + created_match_v3 = re.search(r"created_at: (.+)", content3) + assert created_match_v3 + created_at_v3 = created_match_v3.group(1).strip() + assert created_at_v1 == created_at_v3 + + @pytest.mark.asyncio + async def test_workflow_version_disabled(self, temp_context_dir): + """Test that versioning can be disabled via config.""" + from pathlib import Path + + from camel.societies.workforce.utils import WorkflowConfig + from camel.societies.workforce.workflow_memory_manager import ( + WorkflowMemoryManager, + ) + from camel.utils.context_utils import ContextUtility, WorkflowSummary + + worker = MockSingleAgentWorker("developer") + + role_name = "developer" + context_utility = ContextUtility( + working_directory=f"{temp_context_dir}/workforce_workflows/{role_name}", + create_folder=True, + use_session_subfolder=False, + ) + + # create config with versioning disabled + config = WorkflowConfig(enable_versioning=False) + + manager = WorkflowMemoryManager( + worker=worker.worker, + description="Software developer", + context_utility=context_utility, + role_identifier=role_name, + config=config, + ) + + workflow_summary = WorkflowSummary( + agent_title="developer", + task_title="Fix Bug", + task_description="Fix critical production bug", + tools=[], + steps=["1. Identify issue", "2. 
Fix code"], + failure_and_recovery_strategies=[], + notes_and_observations="", + tags=["bug-fix"], + ) + + # save first time + result1 = await manager.save_workflow_content_async( + workflow_summary=workflow_summary, + context_utility=context_utility, + conversation_accumulator=None, + ) + + assert result1["status"] == "success" + workflow_file = Path(result1["file_path"]) + content1 = workflow_file.read_text() + assert "workflow_version: 1" in content1 + + # save again - version should STILL be 1 (not incremented) + result2 = await manager.save_workflow_content_async( + workflow_summary=workflow_summary, + context_utility=context_utility, + conversation_accumulator=None, + ) + + assert result2["status"] == "success" + content2 = workflow_file.read_text() + assert ( + "workflow_version: 1" in content2 + ), "Version should remain 1 when versioning is disabled" + assert "workflow_version: 2" not in content2 + + @pytest.mark.asyncio + async def test_extract_metadata_from_existing_workflow( + self, temp_context_dir + ): + """Test extracting metadata from an existing workflow file.""" + from pathlib import Path + + from camel.societies.workforce.workflow_memory_manager import ( + WorkflowMemoryManager, + ) + + worker = MockSingleAgentWorker("extractor_test") + + manager = WorkflowMemoryManager( + worker=worker.worker, + description="Test worker", + role_identifier="extractor_test", + ) + + # create a workflow file with metadata + workflow_dir = Path(temp_context_dir) / "test_workflows" + workflow_dir.mkdir(parents=True, exist_ok=True) + workflow_file = workflow_dir / "test_workflow.md" + + workflow_content = """## Metadata + +- session_id: test_session_123 +- working_directory: /tmp/test +- created_at: 2025-01-15T10:00:00.000000 +- updated_at: 2025-01-15T12:00:00.000000 +- workflow_version: 3 +- agent_id: agent-uuid-456 +- message_count: 42 + +## WorkflowSummary + +### Task Title +Test Workflow + +### Task Description +This is a test workflow for metadata extraction. 
+""" + + workflow_file.write_text(workflow_content) + + # extract metadata + metadata = manager._extract_existing_workflow_metadata(workflow_file) + + assert metadata is not None, "Should successfully extract metadata" + assert metadata.session_id == "test_session_123" + assert metadata.working_directory == "/tmp/test" + assert metadata.workflow_version == 3 + assert metadata.agent_id == "agent-uuid-456" + assert metadata.message_count == 42 + assert metadata.created_at == "2025-01-15T10:00:00.000000" + assert metadata.updated_at == "2025-01-15T12:00:00.000000" + + @pytest.mark.asyncio + async def test_different_workflows_have_independent_versions( + self, temp_context_dir + ): + """Test that different workflow files maintain independent versions.""" + from pathlib import Path + + from camel.societies.workforce.workflow_memory_manager import ( + WorkflowMemoryManager, + ) + from camel.utils.context_utils import ContextUtility, WorkflowSummary + + worker = MockSingleAgentWorker("multi_task_worker") + + role_name = "multi_task_worker" + context_utility = ContextUtility( + working_directory=f"{temp_context_dir}/workforce_workflows/{role_name}", + create_folder=True, + use_session_subfolder=False, + ) + + manager = WorkflowMemoryManager( + worker=worker.worker, + description="Multi-task worker", + context_utility=context_utility, + role_identifier=role_name, + ) + + # create first workflow + workflow1 = WorkflowSummary( + agent_title="multi_task_worker", + task_title="Task Alpha", + task_description="First task", + tools=[], + steps=["Step 1"], + failure_and_recovery_strategies=[], + notes_and_observations="", + tags=["alpha"], + ) + + # create second workflow with different title + workflow2 = WorkflowSummary( + agent_title="multi_task_worker", + task_title="Task Beta", + task_description="Second task", + tools=[], + steps=["Step 1"], + failure_and_recovery_strategies=[], + notes_and_observations="", + tags=["beta"], + ) + + # save workflow1 twice (should be v1 then v2) + result1a = await manager.save_workflow_content_async( + workflow_summary=workflow1, + context_utility=context_utility, + conversation_accumulator=None, + ) + assert result1a["status"] == "success" + file1 = Path(result1a["file_path"]) + assert "workflow_version: 1" in file1.read_text() + + await manager.save_workflow_content_async( + workflow_summary=workflow1, + context_utility=context_utility, + conversation_accumulator=None, + ) + assert "workflow_version: 2" in file1.read_text() + + # save workflow2 (should be v1, independent of workflow1) + result2a = await manager.save_workflow_content_async( + workflow_summary=workflow2, + context_utility=context_utility, + conversation_accumulator=None, + ) + assert result2a["status"] == "success" + file2 = Path(result2a["file_path"]) + + # workflow2 should be v1 (its first save) + assert "workflow_version: 1" in file2.read_text() + + # workflow1 should still be v2 + assert "workflow_version: 2" in file1.read_text() + + # verify they are different files + assert file1 != file2 From 2609a78f731cbc5c9bd7c25e1bb78fff689e3fde Mon Sep 17 00:00:00 2001 From: camel-docs-bot Date: Mon, 24 Nov 2025 15:01:07 +0000 Subject: [PATCH 06/10] Auto-update documentation after merge [skip ci] --- .../reference/camel.agents.chat_agent.mdx | 25 +++ .../camel.societies.workforce.utils.mdx | 51 +++++ ...ties.workforce.workflow_memory_manager.mdx | 209 +++++++++++++++++- .../camel.societies.workforce.workforce.mdx | 50 +++-- .../reference/camel.utils.context_utils.mdx | 49 +++- 5 files changed, 351 
insertions(+), 33 deletions(-) diff --git a/docs/mintlify/reference/camel.agents.chat_agent.mdx b/docs/mintlify/reference/camel.agents.chat_agent.mdx index 42266a854d..5805742fd7 100644 --- a/docs/mintlify/reference/camel.agents.chat_agent.mdx +++ b/docs/mintlify/reference/camel.agents.chat_agent.mdx @@ -897,6 +897,31 @@ response_format was provided. See Also: :meth:`asummarize`: Async version for non-blocking LLM calls. + + +### _build_conversation_text_from_messages + +```python +def _build_conversation_text_from_messages(self, messages: List[Any], include_summaries: bool = False): +``` + +Build conversation text from messages for summarization. + +This is a shared helper method that converts messages to a formatted +conversation text string, handling tool calls, tool results, and +regular messages. + +**Parameters:** + +- **messages** (List[Any]): List of messages to convert. +- **include_summaries** (bool): Whether to include messages starting with [CONTEXT_SUMMARY]. (default: :obj:`False`) + +**Returns:** + + tuple[str, List[str]]: A tuple containing: +- Formatted conversation text +- List of user messages extracted from the conversation + ### clear_memory diff --git a/docs/mintlify/reference/camel.societies.workforce.utils.mdx b/docs/mintlify/reference/camel.societies.workforce.utils.mdx index 90b39a23b8..8523178199 100644 --- a/docs/mintlify/reference/camel.societies.workforce.utils.mdx +++ b/docs/mintlify/reference/camel.societies.workforce.utils.mdx @@ -1,5 +1,56 @@ + + +## is_generic_role_name + +```python +def is_generic_role_name(role_name: str): +``` + +Check if a role name is generic and should trigger fallback logic. + +Generic role names are common, non-specific identifiers that don't +provide meaningful information about an agent's actual purpose. +When a role name is generic, fallback logic should be used to find +a more specific identifier (e.g., from LLM-generated agent_title +or description). + +**Parameters:** + +- **role_name** (str): The role name to check (will be converted to lowercase for case-insensitive comparison). + +**Returns:** + + bool: True if the role name is generic, False otherwise. + + + +## WorkflowMetadata + +```python +class WorkflowMetadata(BaseModel): +``` + +Pydantic model for workflow metadata tracking. + +This model defines the formal schema for workflow metadata that tracks +versioning, timestamps, and contextual information about saved workflows. +Used to maintain workflow history and enable proper version management. + + + +## WorkflowConfig + +```python +class WorkflowConfig(BaseModel): +``` + +Configuration for workflow memory management. + +Centralizes all workflow-related configuration options to avoid scattered +settings across multiple files and methods. + ## WorkerConf diff --git a/docs/mintlify/reference/camel.societies.workforce.workflow_memory_manager.mdx b/docs/mintlify/reference/camel.societies.workforce.workflow_memory_manager.mdx index 78a0599767..5c1b2dc9ce 100644 --- a/docs/mintlify/reference/camel.societies.workforce.workflow_memory_manager.mdx +++ b/docs/mintlify/reference/camel.societies.workforce.workflow_memory_manager.mdx @@ -37,6 +37,8 @@ workflow management concerns from the core worker task processing logic. - **worker** (ChatAgent): The worker agent that will use workflows. - **description** (str): Description of the worker's role. - **context_utility** (Optional[ContextUtility]): Shared context utility for workflow operations. If None, creates a new instance. 
+- **role_identifier** (Optional[str]): Role identifier for organizing workflows by role. If provided, workflows will be stored in role-based folders. If None, uses default workforce context. +- **config** (Optional[WorkflowConfig]): Configuration for workflow management. If None, uses default configuration. @@ -47,7 +49,9 @@ def __init__( self, worker: ChatAgent, description: str, - context_utility: Optional[ContextUtility] = None + context_utility: Optional[ContextUtility] = None, + role_identifier: Optional[str] = None, + config: Optional[WorkflowConfig] = None ): ``` @@ -61,6 +65,132 @@ def _get_context_utility(self): Get context utility with lazy initialization. +Uses role-based context if role_identifier is set, otherwise falls +back to default workforce shared context. + + + +### _extract_existing_workflow_metadata + +```python +def _extract_existing_workflow_metadata(self, file_path: Path): +``` + +Extract metadata from an existing workflow file for versioning. + +This method reads the metadata section from an existing workflow +markdown file to retrieve version number and creation timestamp, +enabling proper version tracking when updating workflows. + +**Parameters:** + +- **file_path** (Path): Path to the existing workflow file. + +**Returns:** + + Optional[WorkflowMetadata]: WorkflowMetadata instance if file +exists and metadata is successfully parsed, None otherwise. + + + +### _try_role_based_loading + +```python +def _try_role_based_loading( + self, + role_name: str, + pattern: Optional[str], + max_files_to_load: int, + use_smart_selection: bool +): +``` + +Try loading workflows from role-based directory structure. + +**Parameters:** + +- **role_name** (str): Role name to load workflows from. +- **pattern** (Optional[str]): Custom search pattern for workflow files. +- **max_files_to_load** (int): Maximum number of workflow files to load. +- **use_smart_selection** (bool): Whether to use agent-based selection. + +**Returns:** + + bool: True if workflows were successfully loaded, False otherwise. + + + +### _try_session_based_loading + +```python +def _try_session_based_loading( + self, + session_id: str, + role_name: str, + pattern: Optional[str], + max_files_to_load: int, + use_smart_selection: bool +): +``` + +Try loading workflows from session-based directory (deprecated). + +**Parameters:** + +- **session_id** (str): Workforce session ID to load from. +- **role_name** (str): Role name (for deprecation warning). +- **pattern** (Optional[str]): Custom search pattern for workflow files. +- **max_files_to_load** (int): Maximum number of workflow files to load. +- **use_smart_selection** (bool): Whether to use agent-based selection. + +**Returns:** + + bool: True if workflows were successfully loaded, False otherwise. + + + +### _session_based_smart_loading + +```python +def _session_based_smart_loading(self, session_id: str, max_files_to_load: int): +``` + +Load workflows from session using smart selection. + +**Parameters:** + +- **session_id** (str): Session ID to load from. +- **max_files_to_load** (int): Maximum number of files to load. + +**Returns:** + + bool: True if workflows were loaded, False otherwise. + + + +### _session_based_pattern_loading + +```python +def _session_based_pattern_loading( + self, + pattern: Optional[str], + session_id: str, + max_files_to_load: int +): +``` + +Load workflows from session using pattern matching. + +**Parameters:** + +- **pattern** (Optional[str]): Pattern for file matching. +- **session_id** (str): Session ID to load from. 
+- **max_files_to_load** (int): Maximum number of files to load. + +**Returns:** + + bool: True if workflows were loaded, False otherwise. + ### load_workflows @@ -69,7 +199,7 @@ Get context utility with lazy initialization. def load_workflows( self, pattern: Optional[str] = None, - max_files_to_load: int = 3, + max_files_to_load: Optional[int] = None, session_id: Optional[str] = None, use_smart_selection: bool = True ): @@ -77,15 +207,46 @@ def load_workflows( Load workflow memories using intelligent agent-based selection. -This method uses the worker agent to intelligently select the most -relevant workflows based on workflow information (title, description, -tags) rather than simple filename pattern matching. +This method first tries to load workflows from the role-based folder +structure. If no workflows are found and session_id is provided, falls +back to session-based loading (deprecated). **Parameters:** - **pattern** (Optional[str]): Legacy parameter for backward compatibility. When use_smart_selection=False, uses this pattern for file matching. Ignored when smart selection is enabled. -- **max_files_to_load** (int): Maximum number of workflow files to load. (default: :obj:`3`) -- **session_id** (Optional[str]): Specific workforce session ID to load from. If None, searches across all sessions. (default: :obj:`None`) +- **max_files_to_load** (Optional[int]): Maximum number of workflow files to load. If None, uses config.default_max_files_to_load. (default: :obj:`None`) +- **session_id** (Optional[str]): Deprecated. Specific workforce session ID to load from using legacy session-based organization. (default: :obj:`None`) +- **use_smart_selection** (bool): Whether to use agent-based intelligent workflow selection. When True, uses workflow information and LLM to select most relevant workflows. When False, falls back to pattern matching. (default: :obj:`True`) + +**Returns:** + + bool: True if workflow memories were successfully loaded, False +otherwise. Check logs for detailed error messages. + + + +### load_workflows_by_role + +```python +def load_workflows_by_role( + self, + role_name: Optional[str] = None, + pattern: Optional[str] = None, + max_files_to_load: Optional[int] = None, + use_smart_selection: bool = True +): +``` + +Load workflow memories from role-based directory structure. + +This method loads workflows from the new role-based folder structure: +workforce_workflows/\{role_name\}/*.md + +**Parameters:** + +- **role_name** (Optional[str]): Role name to load workflows from. If None, uses the worker's role_name or role_identifier. +- **pattern** (Optional[str]): Custom search pattern for workflow files. Ignored when use_smart_selection=True. +- **max_files_to_load** (Optional[int]): Maximum number of workflow files to load. If None, uses config.default_max_files_to_load. (default: :obj:`None`) - **use_smart_selection** (bool): Whether to use agent-based intelligent workflow selection. When True, uses workflow information and LLM to select most relevant workflows. When False, falls back to pattern matching. (default: :obj:`True`) **Returns:** @@ -177,6 +338,11 @@ def _find_workflow_files(self, pattern: Optional[str], session_id: Optional[str] Find and return sorted workflow files matching the pattern. +.. note:: +Session-based workflow search will be deprecated in a future +version. Consider using :meth:`_find_workflow_files_by_role` for +role-based organization instead. + **Parameters:** - **pattern** (Optional[str]): Custom search pattern for workflow files. 
If None, uses worker role_name to generate pattern. @@ -187,6 +353,33 @@ Find and return sorted workflow files matching the pattern. List[str]: Sorted list of workflow file paths (empty if validation fails). + + +### _find_workflow_files_by_role + +```python +def _find_workflow_files_by_role( + self, + role_name: Optional[str] = None, + pattern: Optional[str] = None +): +``` + +Find workflow files in role-based directory structure. + +This method searches for workflows in the new role-based folder +structure: workforce_workflows/\{role_name\}/*.md + +**Parameters:** + +- **role_name** (Optional[str]): Role name to search for. If None, uses the worker's role_name or role_identifier. +- **pattern** (Optional[str]): Custom search pattern for workflow files. If None, searches for all workflow files in the role directory. + +**Returns:** + + List[str]: Sorted list of workflow file paths by modification time +(most recent first). + ### _collect_workflow_contents @@ -287,7 +480,7 @@ def _generate_workflow_filename(self): **Returns:** str: Sanitized filename without timestamp and without .md -extension. Format: \{role_name\}_workflow +extension. Format: \{role_name\}\{workflow_filename_suffix\} diff --git a/docs/mintlify/reference/camel.societies.workforce.workforce.mdx b/docs/mintlify/reference/camel.societies.workforce.workforce.mdx index bc7a6387a1..7191c48803 100644 --- a/docs/mintlify/reference/camel.societies.workforce.workforce.mdx +++ b/docs/mintlify/reference/camel.societies.workforce.workforce.mdx @@ -153,6 +153,34 @@ unnecessary session folder creation during initialization. ContextUtility: The shared context utility instance. + + +### _get_role_identifier + +```python +def _get_role_identifier( + self, + worker: ChatAgent, + description: str, + workflow_summary: Optional['WorkflowSummary'] = None +): +``` + +Extract role identifier for organizing workflows. + +Uses priority fallback: role_name → agent_title (from +WorkflowSummary) → sanitized description. + +**Parameters:** + +- **worker** (ChatAgent): The worker agent to extract role from. +- **description** (str): Worker description to use as fallback. +- **workflow_summary** (Optional[WorkflowSummary]): Optional WorkflowSummary object that may contain agent_title field. + +**Returns:** + + str: Role identifier for organizing workflows. + ### _validate_agent_compatibility @@ -989,29 +1017,9 @@ be called when the workforce is not running. ### save_workflow_memories ```python -def save_workflow_memories(self, session_id: Optional[str] = None): +def save_workflow_memories(self): ``` -Save workflow memories for all SingleAgentWorker instances in the -workforce. - -.. deprecated:: 0.2.80 -This synchronous method processes workers sequentially, which can -be slow for multiple agents. Use -:meth:`save_workflow_memories_async` -instead for parallel processing and significantly better -performance. - -This method iterates through all child workers and triggers workflow -saving for SingleAgentWorker instances using their -save_workflow_memories() -method. -Other worker types are skipped. - -**Parameters:** - -- **session_id** (Optional[str]): Custom session ID to use for saving workflows. If None, auto-generates a timestamped session ID. Useful for organizing workflows by project or context. (default: :obj:`None`) - **Returns:** Dict[str, str]: Dictionary mapping worker node IDs to save results. 
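The reference changes in this patch fit together as one role-based workflow surface: a per-role shared `ContextUtility` (see the `context_utils` diff that follows), a `WorkflowConfig`, and `WorkflowMemoryManager.load_workflows_by_role`. A minimal usage sketch is shown below; the `ChatAgent` setup and the "data_analyst" role name are illustrative assumptions, while the imports and call signatures follow the documented entries:

```python
from camel.agents import ChatAgent
from camel.societies.workforce.utils import WorkflowConfig
from camel.societies.workforce.workflow_memory_manager import (
    WorkflowMemoryManager,
)
from camel.utils.context_utils import ContextUtility

# Shared context keyed by role rather than session ID; the session-based
# helpers are marked as deprecated in these docs.
context = ContextUtility.get_workforce_shared_by_role("data_analyst")

# Optional tuning: disable versioning so repeated saves keep
# workflow_version: 1 instead of incrementing it.
config = WorkflowConfig(enable_versioning=False)

# Assumed worker setup; any ChatAgent instance works here.
worker = ChatAgent(system_message="You are a data analyst.")

manager = WorkflowMemoryManager(
    worker=worker,
    description="Data analyst",
    context_utility=context,
    role_identifier="data_analyst",
    config=config,
)

# Load up to 3 files from workforce_workflows/data_analyst/*.md, letting
# the agent pick the most relevant ones (use_smart_selection=True is the
# default; max_files_to_load=None falls back to the config default).
loaded = manager.load_workflows_by_role(
    role_name="data_analyst", max_files_to_load=3
)
```

Organizing by role rather than by session lets repeated runs of the same agent find and version a single workflow file per task, which is what the versioning tests earlier in this series exercise.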
diff --git a/docs/mintlify/reference/camel.utils.context_utils.mdx b/docs/mintlify/reference/camel.utils.context_utils.mdx index 6fbec3ee64..a8e5d55868 100644 --- a/docs/mintlify/reference/camel.utils.context_utils.mdx +++ b/docs/mintlify/reference/camel.utils.context_utils.mdx @@ -57,7 +57,8 @@ def __init__( self, working_directory: Optional[str] = None, session_id: Optional[str] = None, - create_folder: bool = True + create_folder: bool = True, + use_session_subfolder: bool = True ): ``` @@ -68,6 +69,7 @@ Initialize the ContextUtility. - **working_directory** (str, optional): The directory path where files will be stored. If not provided, a default directory will be used. - **session_id** (str, optional): The session ID to use. If provided, this instance will use the same session folder as other instances with the same session_id. If not provided, a new session ID will be generated. - **create_folder** (bool): Whether to create the session folder immediately. If False, the folder will be created only when needed (e.g., when saving files). Default is True for backward compatibility. +- **use_session_subfolder** (bool): Whether to append session_id as a subfolder. If False, files are saved directly to working_directory without session subfolder. Use False for role-based organization. Default is True for backward compatibility. @@ -78,7 +80,8 @@ def _setup_storage( self, working_directory: Optional[str], session_id: Optional[str] = None, - create_folder: bool = True + create_folder: bool = True, + use_session_subfolder: bool = True ): ``` @@ -321,13 +324,25 @@ Create a session-specific directory. ### get_session_metadata ```python -def get_session_metadata(self): +def get_session_metadata( + self, + workflow_version: int = 1, + created_at: Optional[str] = None +): ``` +Collect comprehensive session information including identifiers, +timestamps, and directory paths for tracking and reference. + +**Parameters:** + +- **workflow_version** (int): Version number of the workflow. Defaults to 1 for new workflows. (default: :obj:`1`) +- **created_at** (Optional[str]): ISO timestamp when workflow was first created. If None, uses current timestamp for new workflows. (default: :obj:`None`) + **Returns:** Dict[str, Any]: Session metadata including ID, timestamp, -directory. +directory, version, and update timestamp. @@ -514,6 +529,11 @@ def get_workforce_shared(cls, session_id: Optional[str] = None): Get or create shared workforce context utility with lazy init. +.. note:: +Session-based workflow storage will be deprecated in a future +version. Consider using :meth:`get_workforce_shared_by_role` for +role-based organization instead. + This method provides a centralized way to access shared context utilities for workforce workflows, ensuring all workforce components use the same session directory. @@ -526,6 +546,27 @@ use the same session directory. ContextUtility: Shared context utility instance for workforce. + + +### get_workforce_shared_by_role + +```python +def get_workforce_shared_by_role(cls, role_identifier: str): +``` + +Get or create shared workforce context utility based on role. + +This method provides role-based context utilities for workforce +workflows, organizing workflows by agent role instead of session ID. + +**Parameters:** + +- **role_identifier** (str): Role identifier (e.g., role_name or agent_title). Will be sanitized for filesystem use. + +**Returns:** + + ContextUtility: Shared context utility instance for the role. 
+


### reset_shared_sessions

From 5b1bde32895d7282361f93b58f60a881384dc0e7 Mon Sep 17 00:00:00 2001
From: Wendong-Fan <133094783+Wendong-Fan@users.noreply.github.com>
Date: Tue, 25 Nov 2025 17:31:37 +0800
Subject: [PATCH 07/10] feat: Support Claude Opus 4.5 (#3450)

---
 camel/types/enums.py                          |  3 +
 ...net_example.py => claude_model_example.py} | 83 +++++++++++++++++--
 test/models/test_anthropic_model.py           |  1 +
 3 files changed, 82 insertions(+), 5 deletions(-)
 rename examples/models/{claude_4_5_sonnet_example.py => claude_model_example.py} (59%)

diff --git a/camel/types/enums.py b/camel/types/enums.py
index 61fdbb5fbf..6525a1cf89 100644
--- a/camel/types/enums.py
+++ b/camel/types/enums.py
@@ -216,6 +216,7 @@ class ModelType(UnifiedModelType, Enum):
     CLAUDE_3_5_HAIKU = "claude-3-5-haiku-latest"
     CLAUDE_3_7_SONNET = "claude-3-7-sonnet-latest"
     CLAUDE_SONNET_4_5 = "claude-sonnet-4-5"
+    CLAUDE_OPUS_4_5 = "claude-opus-4-5"
     CLAUDE_SONNET_4 = "claude-sonnet-4-20250514"
     CLAUDE_OPUS_4 = "claude-opus-4-20250514"
     CLAUDE_OPUS_4_1 = "claude-opus-4-1-20250805"
@@ -685,6 +686,7 @@ def is_anthropic(self) -> bool:
             ModelType.CLAUDE_3_5_HAIKU,
             ModelType.CLAUDE_3_7_SONNET,
             ModelType.CLAUDE_SONNET_4_5,
+            ModelType.CLAUDE_OPUS_4_5,
             ModelType.CLAUDE_SONNET_4,
             ModelType.CLAUDE_OPUS_4,
             ModelType.CLAUDE_OPUS_4_1,
@@ -1521,6 +1523,7 @@ def token_limit(self) -> int:
             ModelType.CLAUDE_3_5_HAIKU,
             ModelType.CLAUDE_3_7_SONNET,
             ModelType.CLAUDE_SONNET_4_5,
+            ModelType.CLAUDE_OPUS_4_5,
             ModelType.CLAUDE_SONNET_4,
             ModelType.CLAUDE_OPUS_4,
             ModelType.CLAUDE_OPUS_4_1,
diff --git a/examples/models/claude_4_5_sonnet_example.py b/examples/models/claude_model_example.py
similarity index 59%
rename from examples/models/claude_4_5_sonnet_example.py
rename to examples/models/claude_model_example.py
index c0d0093413..2b6ec9d258 100644
--- a/examples/models/claude_4_5_sonnet_example.py
+++ b/examples/models/claude_model_example.py
@@ -12,19 +12,95 @@
 # limitations under the License.
 # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
 
+import os
+
 from camel.agents import ChatAgent
 from camel.configs import AnthropicConfig
 from camel.models import ModelFactory
-from camel.toolkits import FunctionTool
+from camel.toolkits import FunctionTool, TerminalToolkit
 from camel.types import ModelPlatformType, ModelType
 
 """
-Claude Sonnet 4.5 Example
+Claude Model Example
 
 Please set the environment variable:
 export ANTHROPIC_API_KEY="your-api-key-here"
 """
 
+# Define system message
+sys_msg = "You are a helpful AI assistant"
+
+# Get current script directory
+base_dir = os.path.dirname(os.path.abspath(__file__))
+# Define workspace directory for the toolkit
+workspace_dir = os.path.join(
+    os.path.dirname(os.path.dirname(base_dir)), "workspace"
+)
+
+# Set model config
+tools = [
+    *TerminalToolkit(working_directory=workspace_dir).get_tools(),
+]
+# Create Claude Opus 4.5 model
+model_opus_4_5 = ModelFactory.create(
+    model_platform=ModelPlatformType.ANTHROPIC,
+    model_type=ModelType.CLAUDE_OPUS_4_5,
+    model_config_dict=AnthropicConfig(temperature=0.2).as_dict(),
+)
+
+user_msg = """
+Create an interactive HTML webpage that allows users to play with a
+Rubik's Cube, and save it to a local file.
+"""
+
+camel_agent_pro = ChatAgent(
+    system_message=sys_msg, model=model_opus_4_5, tools=tools
+)
+response_pro = camel_agent_pro.step(user_msg)
+print(response_pro.msgs[0].content)
+'''
+===============================================================================
+The interactive Rubik's Cube HTML file has been created successfully!
Here's +what I built: + +## 📁 File: `rubiks_cube.html` (23KB) + +### Features: + +🎮 **Interactive 3D Cube** +- Fully rendered 3D Rubik's Cube using CSS transforms +- Drag to rotate the view (mouse or touch) +- All 6 faces visible with proper colors + +🔄 **Face Rotations** +- **F/B** - Front/Back face +- **U/D** - Up/Down face +- **L/R** - Left/Right face +- **'** versions for counter-clockwise rotations + +⚡ **Actions** +- **Scramble** - Randomly mix the cube with 20 moves +- **Reset** - Return to solved state +- **Undo** - Reverse the last move + +📐 **Net View** +- 2D unfolded view of all cube faces for easier tracking + +⌨️ **Keyboard Support** +- Press F, B, R, L, U, D keys to rotate faces +- Hold Shift for counter-clockwise + +📱 **Responsive Design** +- Works on desktop and mobile devices +- Touch support for rotating the view + +### To use: +Simply open `rubiks_cube.html` in any modern web browser! + +Process finished with exit code 0 +=============================================================================== +''' + # Create Claude Sonnet 4.5 model model = ModelFactory.create( model_platform=ModelPlatformType.ANTHROPIC, @@ -32,9 +108,6 @@ model_config_dict=AnthropicConfig(temperature=0.2).as_dict(), ) -# Define system message -sys_msg = "You are a helpful AI assistant powered by Claude Sonnet 4.5." - # Set agent camel_agent = ChatAgent(system_message=sys_msg, model=model) diff --git a/test/models/test_anthropic_model.py b/test/models/test_anthropic_model.py index a5431cfd64..4f5f43c356 100644 --- a/test/models/test_anthropic_model.py +++ b/test/models/test_anthropic_model.py @@ -34,6 +34,7 @@ ModelType.CLAUDE_3_5_HAIKU, ModelType.CLAUDE_3_7_SONNET, ModelType.CLAUDE_SONNET_4_5, + ModelType.CLAUDE_OPUS_4_5, ModelType.CLAUDE_SONNET_4, ModelType.CLAUDE_OPUS_4, ModelType.CLAUDE_OPUS_4_1, From c101ed71cfa19b739fe52665fc983125e54dcbd3 Mon Sep 17 00:00:00 2001 From: JINO ROHIT Date: Wed, 26 Nov 2025 11:01:30 +0530 Subject: [PATCH 08/10] feat: add user dep packages in terminal toolkit (#3421) Co-authored-by: Saed Bhati <105969318+Saedbhati@users.noreply.github.com> Co-authored-by: Tao Sun <168447269+fengju0213@users.noreply.github.com> --- .../terminal_toolkit/terminal_toolkit.py | 82 +++++++++++++++++++ examples/toolkits/terminal_toolkit.py | 35 ++++++++ 2 files changed, 117 insertions(+) diff --git a/camel/toolkits/terminal_toolkit/terminal_toolkit.py b/camel/toolkits/terminal_toolkit/terminal_toolkit.py index 8888001f27..4ea52a5e4c 100644 --- a/camel/toolkits/terminal_toolkit/terminal_toolkit.py +++ b/camel/toolkits/terminal_toolkit/terminal_toolkit.py @@ -84,6 +84,8 @@ class TerminalToolkit(BaseToolkit): when safe_mode is True. If None, uses default safety rules. clone_current_env (bool): Whether to clone the current Python environment for local execution. Defaults to False. + install_dependencies (List): A list of user specified libraries + to install. 
""" def __init__( @@ -96,6 +98,7 @@ def __init__( safe_mode: bool = True, allowed_commands: Optional[List[str]] = None, clone_current_env: bool = False, + install_dependencies: Optional[List[str]] = None, ): self.use_docker_backend = use_docker_backend self.timeout = timeout @@ -145,6 +148,7 @@ def __init__( self.cloned_env_path: Optional[str] = None self.initial_env_path: Optional[str] = None self.python_executable = sys.executable + self.install_dependencies = install_dependencies or [] self.log_dir = os.path.abspath( session_logs_dir or os.path.join(self.working_dir, "terminal_logs") @@ -228,6 +232,10 @@ def __init__( "- container is already isolated" ) + # Install dependencies + if self.install_dependencies: + self._install_dependencies() + def _setup_cloned_environment(self): r"""Set up a cloned Python environment.""" self.cloned_env_path = os.path.join(self.working_dir, ".venv") @@ -255,6 +263,80 @@ def update_callback(msg: str): "using system Python" ) + def _install_dependencies(self): + r"""Install user specified dependencies in the current environment.""" + if not self.install_dependencies: + return + + logger.info("Installing dependencies...") + + if self.use_docker_backend: + pkg_str = " ".join( + shlex.quote(p) for p in self.install_dependencies + ) + install_cmd = f'sh -lc "pip install {pkg_str}"' + + try: + exec_id = self.docker_api_client.exec_create( + self.container.id, install_cmd + )["Id"] + log = self.docker_api_client.exec_start(exec_id) + logger.info(f"Package installation output:\n{log}") + + # Check exit code to ensure installation succeeded + exec_info = self.docker_api_client.exec_inspect(exec_id) + if exec_info['ExitCode'] != 0: + error_msg = ( + f"Failed to install dependencies in Docker: " + f"{log.decode('utf-8', errors='ignore')}" + ) + logger.error(error_msg) + raise RuntimeError(error_msg) + + logger.info( + "Successfully installed all dependencies in Docker." 
+ ) + except Exception as e: + if not isinstance(e, RuntimeError): + logger.error(f"Docker dependency installation error: {e}") + raise RuntimeError( + f"Docker dependency installation error: {e}" + ) from e + raise + + else: + pip_cmd = [ + self.python_executable, + "-m", + "pip", + "install", + "--upgrade", + *self.install_dependencies, + ] + + try: + subprocess.run( + pip_cmd, + check=True, + cwd=self.working_dir, + capture_output=True, + text=True, + timeout=300, # 5 minutes timeout for installation + ) + logger.info("Successfully installed all dependencies.") + except subprocess.CalledProcessError as e: + logger.error(f"Failed to install dependencies: {e.stderr}") + raise RuntimeError( + f"Failed to install dependencies: {e.stderr}" + ) from e + except subprocess.TimeoutExpired: + logger.error( + "Dependency installation timed out after 5 minutes" + ) + raise RuntimeError( + "Dependency installation timed out after 5 minutes" + ) + def _setup_initial_environment(self): r"""Set up an initial environment with Python 3.10.""" self.initial_env_path = os.path.join(self.working_dir, ".initial_env") diff --git a/examples/toolkits/terminal_toolkit.py b/examples/toolkits/terminal_toolkit.py index f6cfce3055..4ffc928393 100644 --- a/examples/toolkits/terminal_toolkit.py +++ b/examples/toolkits/terminal_toolkit.py @@ -281,6 +281,41 @@ =============================================================================== """ +tools = TerminalToolkit( + working_directory=workspace_dir, install_dependencies=['numpy'] +).get_tools() + +model_config_dict = ChatGPTConfig( + temperature=0.0, +).as_dict() + +model = ModelFactory.create( + model_platform=ModelPlatformType.DEFAULT, + model_type=ModelType.DEFAULT, + model_config_dict=model_config_dict, +) +camel_agent = ChatAgent( + system_message=sys_msg, + model=model, + tools=tools, +) +camel_agent.reset() + +usr_msg = "check my numpy version" + +# Get response information +response = camel_agent.step(usr_msg) +print(str(response.info['tool_calls'])[:1000]) + +""" +=============================================================================== +[ToolCallingRecord(tool_name='shell_exec', args={'id': 'check_numpy_version_1', +'command': 'python3 -c "import numpy; print(numpy.__version__)"', +'block': True}, result='2.2.6', tool_call_id='call_UuL6YGIMv7I4GSOjA8es65aW', +images=None)] +=============================================================================== +""" + # ------Docker backend------ tools = TerminalToolkit( use_docker_backend=True, From 82379493de12a2a9f3ce8b82594dbf68ad729be2 Mon Sep 17 00:00:00 2001 From: camel-docs-bot Date: Wed, 26 Nov 2025 05:32:25 +0000 Subject: [PATCH 09/10] Auto-update documentation after merge [skip ci] --- ....toolkits.terminal_toolkit.terminal_toolkit.mdx | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/docs/mintlify/reference/camel.toolkits.terminal_toolkit.terminal_toolkit.mdx b/docs/mintlify/reference/camel.toolkits.terminal_toolkit.terminal_toolkit.mdx index eb308f0fcf..2b18a14734 100644 --- a/docs/mintlify/reference/camel.toolkits.terminal_toolkit.terminal_toolkit.mdx +++ b/docs/mintlify/reference/camel.toolkits.terminal_toolkit.terminal_toolkit.mdx @@ -31,6 +31,7 @@ in either a local or a sandboxed Docker environment. - **safe_mode** (bool): Whether to apply security checks to commands. Defaults to True. - **allowed_commands** (Optional[List[str]]): List of allowed commands when safe_mode is True. If None, uses default safety rules. 
- **clone_current_env** (bool): Whether to clone the current Python environment for local execution. Defaults to False.
+- **install_dependencies** (Optional[List[str]]): A list of user-specified libraries to install. Defaults to None.
 
 
 
@@ -46,7 +47,8 @@ def __init__(
     session_logs_dir: Optional[str] = None,
     safe_mode: bool = True,
     allowed_commands: Optional[List[str]] = None,
-    clone_current_env: bool = False
+    clone_current_env: bool = False,
+    install_dependencies: Optional[List[str]] = None
 ):
 ```
 
@@ -60,6 +62,16 @@ def _setup_cloned_environment(self):
 
 Set up a cloned Python environment.
 
+
+### _install_dependencies
+
+```python
+def _install_dependencies(self):
+```
+
+Install user-specified dependencies in the current environment.
+
 ### _setup_initial_environment
 

From 41ceeeb42475818223ba080f19d3c800a40636a9 Mon Sep 17 00:00:00 2001
From: Wendong-Fan <133094783+Wendong-Fan@users.noreply.github.com>
Date: Wed, 26 Nov 2025 13:48:16 +0800
Subject: [PATCH 10/10] chore: support wider openai version (#3456)

---
 pyproject.toml |  2 +-
 uv.lock        | 10 +++++-----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index deab785315..a7d30307bf 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -30,7 +30,7 @@ dependencies = [
     "httpx>=0.28.0,<1.0.0dev",
     "psutil>=5.9.8,<6",
     "pillow>=10.1.0,<11.0.0",
-    "openai>=1.86.0,<2",
+    "openai>=1.86.0",
     "websockets>=13.0,<15.1",
     "astor>=0.8.1",
 ]
diff --git a/uv.lock b/uv.lock
index 678b993255..84ab8d6062 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1398,7 +1398,7 @@ requires-dist = [
     { name = "onnxruntime", marker = "extra == 'document-tools'", specifier = "<=1.19.2" },
     { name = "onnxruntime", marker = "extra == 'eigent'", specifier = "<=1.19.2" },
     { name = "onnxruntime", marker = "extra == 'owl'", specifier = "<=1.19.2" },
-    { name = "openai", specifier = ">=1.86.0,<2" },
+    { name = "openai", specifier = ">=1.86.0" },
     { name = "openapi-spec-validator", marker = "extra == 'all'", specifier = ">=0.7.1,<0.8" },
     { name = "openapi-spec-validator", marker = "extra == 'document-tools'", specifier = ">=0.7.1,<0.8" },
     { name = "openapi-spec-validator", marker = "extra == 'owl'", specifier = ">=0.7.1,<0.8" },
@@ -6676,7 +6676,7 @@
 name = "pexpect"
 version = "4.9.0"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
-    { name = "ptyprocess" },
+    { name = "ptyprocess", marker = "sys_platform != 'win32'" },
 ]
 sdist = { url = "https://files.pythonhosted.org/packages/42/92/cc564bf6381ff43ce1f4d06852fc19a2f11d180f23dc32d9588bee2f149d/pexpect-4.9.0.tar.gz", hash = "sha256:ee7d41123f3c9911050ea2c2dac107568dc43b2d3b0c7557a33212c398ead30f", size = 166450, upload-time = "2023-11-25T09:07:26.339Z" }
 wheels = [
@@ -7638,7 +7638,7 @@
 name = "pyobjc-framework-cocoa"
 version = "12.1"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
-    { name = "pyobjc-core" },
+    { name = "pyobjc-core", marker = "sys_platform != 'win32'" },
 ]
 sdist = { url = "https://files.pythonhosted.org/packages/02/a3/16ca9a15e77c061a9250afbae2eae26f2e1579eb8ca9462ae2d2c71e1169/pyobjc_framework_cocoa-12.1.tar.gz", hash = "sha256:5556c87db95711b985d5efdaaf01c917ddd41d148b1e52a0c66b1a2e2c5c1640", size = 2772191, upload-time = "2025-11-14T10:13:02.069Z" }
 wheels = [
@@ -7656,8 +7656,8 @@
 name = "pyobjc-framework-quartz"
 version = "12.1"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
-    { name = "pyobjc-core" },
-    { name = "pyobjc-framework-cocoa" },
+    { name = "pyobjc-core", marker = "sys_platform != 'win32'" },
+    { name = "pyobjc-framework-cocoa", marker =
"sys_platform != 'win32'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/94/18/cc59f3d4355c9456fc945eae7fe8797003c4da99212dd531ad1b0de8a0c6/pyobjc_framework_quartz-12.1.tar.gz", hash = "sha256:27f782f3513ac88ec9b6c82d9767eef95a5cf4175ce88a1e5a65875fee799608", size = 3159099, upload-time = "2025-11-14T10:21:24.31Z" } wheels = [