From 037c4d582518b507caa3e2a55f9b0f95be7d6f26 Mon Sep 17 00:00:00 2001 From: Zakariye Mohamed Date: Thu, 12 Mar 2026 13:00:50 -0400 Subject: [PATCH] feat: add capability-based task routing Closes #1 Adds Task and Agent classes with capability metadata. TaskRouter now matches tasks to agents by required capabilities - claim_task() iterates the queue and returns the first task the agent can handle. Falls back to FIFO for unregistered agents (backward compatible). --- core/task_router.py | 84 ++++++++++++++++++++++++++++++++++++++++++--- examples/demo.py | 21 ++++++++---- 2 files changed, 94 insertions(+), 11 deletions(-) diff --git a/core/task_router.py b/core/task_router.py index a9ba048..6bff1ce 100644 --- a/core/task_router.py +++ b/core/task_router.py @@ -1,12 +1,86 @@ +class Task: + """A task with optional capability requirements.""" + + def __init__(self, name: str, required_capabilities: list[str] | None = None): + self.name = name + self.required_capabilities = required_capabilities or [] + + def __repr__(self): + caps = ", ".join(self.required_capabilities) if self.required_capabilities else "any" + return f"Task({self.name\!r}, requires={caps})" + + +class Agent: + """An agent with a defined set of capabilities.""" + + def __init__(self, name: str, capabilities: list[str] | None = None): + self.name = name + self.capabilities = set(capabilities or []) + + def can_handle(self, task: Task) -> bool: + """Return True if this agent has all capabilities required by the task.""" + return all(cap in self.capabilities for cap in task.required_capabilities) + + def __repr__(self): + return f"Agent({self.name\!r}, capabilities={sorted(self.capabilities)})" + + class TaskRouter: + """Routes tasks to agents based on capability matching.""" + def __init__(self): - self.tasks = [] + self.tasks: list[Task] = [] + self.agents: dict[str, Agent] = {} - def submit_task(self, task): + def register_agent(self, agent: Agent) -> None: + """Register an agent with its capabilities.""" + self.agents[agent.name] = agent + + def submit_task(self, task: str | Task, required_capabilities: list[str] | None = None) -> None: + """Submit a task to the queue. + + Parameters + ---------- + task : str | Task + Task name string or a Task object. + required_capabilities : list[str] | None + Capabilities required to handle the task. Ignored if task is a Task object. + """ + if isinstance(task, str): + task = Task(task, required_capabilities) self.tasks.append(task) - def claim_task(self, agent_name): + def claim_task(self, agent_name: str) -> dict | None: + """Claim the first task this agent is capable of handling. + + Iterates through the task queue in order and returns the first task + whose required capabilities are all present in the agent's capability set. + Falls back to the first available task if the agent has no capabilities registered. + + Parameters + ---------- + agent_name : str + Name of the claiming agent. + + Returns + ------- + dict | None + A dict with keys 'agent' and 'task', or None if no suitable task exists. + """ if not self.tasks: return None - task = self.tasks.pop(0) - return {"agent": agent_name, "task": task} + + agent = self.agents.get(agent_name) + + if agent is None: + # Unknown agent: fall back to FIFO (original behaviour) + task = self.tasks.pop(0) + return {"agent": agent_name, "task": task} + + # Find first task this agent can handle + for i, task in enumerate(self.tasks): + if agent.can_handle(task): + self.tasks.pop(i) + return {"agent": agent_name, "task": task} + + return None # No suitable task found diff --git a/examples/demo.py b/examples/demo.py index 9d27895..e5efa0f 100644 --- a/examples/demo.py +++ b/examples/demo.py @@ -1,10 +1,19 @@ -from core.task_router import TaskRouter +from core.task_router import Agent, Task, TaskRouter router = TaskRouter() -router.submit_task("analyze_market_data") -router.submit_task("generate_report") +# Register agents with their capabilities +router.register_agent(Agent("analysis_agent", capabilities=["data_analysis", "statistics"])) +router.register_agent(Agent("report_agent", capabilities=["reporting", "pdf_generation"])) +router.register_agent(Agent("general_agent", capabilities=[])) -print("Agent claim 1:", router.claim_task("analysis_agent")) -print("Agent claim 2:", router.claim_task("report_agent")) -print("Agent claim 3:", router.claim_task("idle_agent")) +# Submit tasks with capability requirements +router.submit_task(Task("analyze_market_data", required_capabilities=["data_analysis"])) +router.submit_task(Task("generate_report", required_capabilities=["reporting"])) +router.submit_task(Task("send_notification")) # No specific capability required + +# Agents claim tasks matching their capabilities +print("Claim 1:", router.claim_task("analysis_agent")) # gets analyze_market_data +print("Claim 2:", router.claim_task("report_agent")) # gets generate_report +print("Claim 3:", router.claim_task("general_agent")) # gets send_notification +print("Claim 4:", router.claim_task("analysis_agent")) # None - queue empty