Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 79 additions & 5 deletions core/task_router.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,86 @@
class Task:
"""A task with optional capability requirements."""

def __init__(self, name: str, required_capabilities: list[str] | None = None):
self.name = name
self.required_capabilities = required_capabilities or []
Comment on lines +4 to +6
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

README advertises Python 3.9+, but these type hints use PEP 604 unions (list[str] | None), which require Python 3.10+. To keep 3.9 compatibility, switch to Optional[...] / Union[...] (or bump the documented minimum Python version).

Copilot uses AI. Check for mistakes.

def __repr__(self):
caps = ", ".join(self.required_capabilities) if self.required_capabilities else "any"
return f"Task({self.name\!r}, requires={caps})"
Comment on lines +8 to +10
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task.__repr__ uses {self.name\!r} (and similar in Agent.__repr__), which is invalid f-string syntax and will raise a SyntaxError. Remove the backslash and use the standard repr conversion {self.name!r} so the module can import/run.

Copilot uses AI. Check for mistakes.


class Agent:
"""An agent with a defined set of capabilities."""

def __init__(self, name: str, capabilities: list[str] | None = None):
self.name = name
self.capabilities = set(capabilities or [])

def can_handle(self, task: Task) -> bool:
"""Return True if this agent has all capabilities required by the task."""
return all(cap in self.capabilities for cap in task.required_capabilities)

def __repr__(self):
return f"Agent({self.name\!r}, capabilities={sorted(self.capabilities)})"
Comment on lines +24 to +25
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agent.__repr__ uses {self.name\!r}, which is invalid f-string syntax and will raise a SyntaxError. Remove the backslash and use {self.name!r}.

Copilot uses AI. Check for mistakes.


class TaskRouter:
"""Routes tasks to agents based on capability matching."""

def __init__(self):
self.tasks = []
self.tasks: list[Task] = []
self.agents: dict[str, Agent] = {}

def submit_task(self, task):
def register_agent(self, agent: Agent) -> None:
"""Register an agent with its capabilities."""
self.agents[agent.name] = agent

def submit_task(self, task: str | Task, required_capabilities: list[str] | None = None) -> None:
"""Submit a task to the queue.

Parameters
----------
task : str | Task
Task name string or a Task object.
required_capabilities : list[str] | None
Capabilities required to handle the task. Ignored if task is a Task object.
"""
if isinstance(task, str):
task = Task(task, required_capabilities)
self.tasks.append(task)

def claim_task(self, agent_name):
def claim_task(self, agent_name: str) -> dict | None:
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This return annotation uses dict | None, which requires Python 3.10+. If TaskMesh targets Python 3.9+ (per README), use Optional[dict[...]] (and import Optional from typing) or update the documented Python requirement.

Copilot uses AI. Check for mistakes.
"""Claim the first task this agent is capable of handling.

Iterates through the task queue in order and returns the first task
whose required capabilities are all present in the agent's capability set.
Falls back to the first available task if the agent has no capabilities registered.
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring says routing “falls back to the first available task if the agent has no capabilities registered,” but the implementation only falls back when the agent is not registered (agent is None). For registered agents with an empty capability set, the router will only match tasks with no required capabilities. Please update the docstring (or adjust logic) so behavior and docs align.

Suggested change
Falls back to the first available task if the agent has no capabilities registered.
Falls back to the first available task if the agent is not registered.

Copilot uses AI. Check for mistakes.

Parameters
----------
agent_name : str
Name of the claiming agent.

Returns
-------
dict | None
A dict with keys 'agent' and 'task', or None if no suitable task exists.
"""
if not self.tasks:
return None
task = self.tasks.pop(0)
return {"agent": agent_name, "task": task}

agent = self.agents.get(agent_name)

if agent is None:
# Unknown agent: fall back to FIFO (original behaviour)
task = self.tasks.pop(0)
return {"agent": agent_name, "task": task}

# Find first task this agent can handle
for i, task in enumerate(self.tasks):
if agent.can_handle(task):
self.tasks.pop(i)
return {"agent": agent_name, "task": task}

return None # No suitable task found
21 changes: 15 additions & 6 deletions examples/demo.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
from core.task_router import TaskRouter
from core.task_router import Agent, Task, TaskRouter

router = TaskRouter()

router.submit_task("analyze_market_data")
router.submit_task("generate_report")
# Register agents with their capabilities
router.register_agent(Agent("analysis_agent", capabilities=["data_analysis", "statistics"]))
router.register_agent(Agent("report_agent", capabilities=["reporting", "pdf_generation"]))
router.register_agent(Agent("general_agent", capabilities=[]))

print("Agent claim 1:", router.claim_task("analysis_agent"))
print("Agent claim 2:", router.claim_task("report_agent"))
print("Agent claim 3:", router.claim_task("idle_agent"))
# Submit tasks with capability requirements
router.submit_task(Task("analyze_market_data", required_capabilities=["data_analysis"]))
router.submit_task(Task("generate_report", required_capabilities=["reporting"]))
router.submit_task(Task("send_notification")) # No specific capability required

# Agents claim tasks matching their capabilities
print("Claim 1:", router.claim_task("analysis_agent")) # gets analyze_market_data
print("Claim 2:", router.claim_task("report_agent")) # gets generate_report
print("Claim 3:", router.claim_task("general_agent")) # gets send_notification
print("Claim 4:", router.claim_task("analysis_agent")) # None - queue empty
Loading