Add async execution mode for sync and async tool chains #80

@dgenio

Context / Problem

All flow execution is synchronous. Many MCP tools involve I/O operations (HTTP requests, database queries, file operations) where the Python process blocks waiting for responses. For sequential flows this is tolerable, but:

  1. It prevents future parallel execution of independent DAG steps (Design and implement DAG-based flow model with topological execution #10)
  2. It makes ChainWeaver incompatible with async MCP clients (the MCP Python SDK is async-first)
  3. I/O-bound tool chains wait on each call in turn, so waiting time that could overlap is serialized

Since the MCP Python SDK uses asyncio natively, async support is a prerequisite for real MCP integration (#70, #72).
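
A minimal sketch of the third point, using asyncio.sleep() as a stand-in for real I/O. The names fake_io, sequential, concurrent, and timed are hypothetical and exist only for this illustration; they are not ChainWeaver APIs.

```python
import asyncio
import time


async def fake_io(delay: float) -> float:
    # Stands in for an HTTP request or database query.
    await asyncio.sleep(delay)
    return delay


async def sequential(delays):
    # Each call starts only after the previous one has finished.
    return [await fake_io(d) for d in delays]


async def concurrent(delays):
    # All calls wait at the same time; results keep the input order.
    return await asyncio.gather(*(fake_io(d) for d in delays))


def timed(coro_fn, delays):
    # Run one of the coroutine functions above and measure wall-clock time.
    start = time.perf_counter()
    result = asyncio.run(coro_fn(delays))
    return result, time.perf_counter() - start


if __name__ == "__main__":
    for fn in (sequential, concurrent):
        _, elapsed = timed(fn, [0.1, 0.1, 0.1])
        print(f"{fn.__name__}: {elapsed:.2f}s")
```

With independent calls, the sequential version takes roughly the sum of the delays, while the concurrent one takes roughly the longest single delay.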

Proposal

Add async execution support alongside the existing sync API:

# Async tool definition
async_tool = Tool(
    name="fetch",
    description="Fetches data from URL.",
    input_schema=FetchInput,
    output_schema=FetchOutput,
    fn=async_fetch_fn,  # async def async_fetch_fn(inp) -> dict
)

# Async execution
result = await executor.execute_flow_async("my_flow", {"url": "..."})

Core capabilities:

  1. Tool supports async fn — detect whether fn is a coroutine and handle accordingly
  2. FlowExecutor.execute_flow_async() — async version of execute_flow()
  3. Mixed sync/async — flows can contain both sync and async tools; the executor wraps sync tools in asyncio.to_thread() or calls them directly
  4. Backward compatible: execute_flow() (sync) remains unchanged and works for sync-only tools
  5. Foundation for parallel execution — async mode is a prerequisite for running independent DAG branches concurrently
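
The first three capabilities could be sketched roughly as follows. This is not ChainWeaver's implementation: the Tool class here is a stripped-down stand-in (schemas omitted), run_tool and execute_chain_async are hypothetical helpers, and merging each tool's output into a shared context dict is an assumption about how steps pass data.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Tool:
    """Hypothetical stand-in for ChainWeaver's Tool (schemas omitted)."""
    name: str
    fn: Callable[[dict], Any]

    @property
    def is_async(self) -> bool:
        # True when fn was declared with `async def`.
        return inspect.iscoroutinefunction(self.fn)


async def run_tool(tool: Tool, payload: dict) -> dict:
    """Await async tools; run sync tools in a worker thread."""
    if tool.is_async:
        return await tool.fn(payload)
    # asyncio.to_thread() requires Python 3.9+.
    return await asyncio.to_thread(tool.fn, payload)


async def execute_chain_async(tools: list[Tool], initial: dict) -> dict:
    """Run tools in order, merging each output into a shared context (assumed)."""
    context = dict(initial)
    for tool in tools:
        context.update(await run_tool(tool, context))
    return context


def double(inp: dict) -> dict:          # sync tool
    return {"value": inp["value"] * 2}


async def add_one(inp: dict) -> dict:   # async tool
    await asyncio.sleep(0)              # stands in for real I/O
    return {"value": inp["value"] + 1}


if __name__ == "__main__":
    chain = [Tool("double", double), Tool("add_one", add_one)]
    print(asyncio.run(execute_chain_async(chain, {"value": 3})))  # {'value': 7}
```

Offloading every sync tool to a thread is the conservative choice here; calling fast sync tools directly would avoid the thread hop at the cost of blocking the event loop for their duration.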

Acceptance Criteria

  • Tool.fn accepts both sync and async callables
  • Tool.is_async property indicates whether the tool's callable is async
  • FlowExecutor.execute_flow_async() exists and returns ExecutionResult
  • Async tools are awaited; sync tools are called normally (or wrapped in to_thread())
  • Mixed flows (sync + async tools) work correctly
  • execute_flow() (sync) continues to work unchanged for all existing flows
  • ExecutionResult structure is identical for sync and async execution
  • Tests cover: all-async flow, all-sync flow, mixed, error handling in async tools
  • No new required dependencies (uses stdlib asyncio)
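
One possible shape for the "error handling in async tools" test case, using only the standard library. The ExecutionResult fields shown (success, output, error) and the run_async_tool helper are assumptions made for this sketch; the real ExecutionResult may look different.

```python
import asyncio
import unittest
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExecutionResult:
    # Hypothetical fields; the real ExecutionResult may differ.
    success: bool
    output: Optional[dict] = None
    error: Optional[str] = None


async def run_async_tool(fn, payload: dict) -> ExecutionResult:
    """Hypothetical helper: await one async tool and capture its failure."""
    try:
        return ExecutionResult(success=True, output=await fn(payload))
    except Exception as exc:  # report the failure instead of propagating it
        return ExecutionResult(success=False, error=str(exc))


async def ok_tool(inp: dict) -> dict:
    return {"echo": inp["msg"]}


async def failing_tool(inp: dict) -> dict:
    raise ValueError("upstream timeout")


class AsyncToolErrorTests(unittest.IsolatedAsyncioTestCase):
    # IsolatedAsyncioTestCase (Python 3.8+) runs `async def` test methods.
    async def test_success(self):
        result = await run_async_tool(ok_tool, {"msg": "hi"})
        self.assertTrue(result.success)
        self.assertEqual(result.output, {"echo": "hi"})

    async def test_error_is_captured(self):
        result = await run_async_tool(failing_tool, {})
        self.assertFalse(result.success)
        self.assertEqual(result.error, "upstream timeout")


if __name__ == "__main__":
    unittest.main()
```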

Implementation Notes

  • Use inspect.iscoroutinefunction() to detect async callables
  • execute_flow_async() should mirror the structure of execute_flow() but use await
  • For sync tools in async context: either call directly (if they're fast and don't block) or use asyncio.to_thread() (if they might block; requires Python 3.9+)
  • Consider an AsyncFlowExecutor subclass vs. adding execute_flow_async() to existing FlowExecutor
  • The existing _execute_step() helper can be split into _execute_step_sync() and _execute_step_async()
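
A rough sketch of the split suggested in the notes above, adding execute_flow_async() to a single executor class. FlowExecutorSketch is hypothetical and heavily simplified (steps are bare callables, each output becomes the next input), and rejecting async tools on the sync path is an assumption, not a decision taken in this issue.

```python
import asyncio
import inspect
from typing import Any, Callable


class FlowExecutorSketch:
    """Hypothetical, simplified executor; not ChainWeaver's actual class."""

    def __init__(self, steps: list[Callable[[dict], Any]]):
        self.steps = steps

    def _execute_step_sync(self, fn, payload: dict) -> dict:
        if inspect.iscoroutinefunction(fn):
            # Assumption: the sync path rejects async tools rather than
            # starting an event loop on the caller's behalf.
            raise TypeError(f"{fn.__name__} is async; use execute_flow_async()")
        return fn(payload)

    async def _execute_step_async(self, fn, payload: dict, offload: bool = True) -> dict:
        if inspect.iscoroutinefunction(fn):
            return await fn(payload)
        if offload:
            # Keeps the event loop responsive while a sync tool blocks.
            return await asyncio.to_thread(fn, payload)
        return fn(payload)  # direct call for fast, non-blocking tools

    def execute_flow(self, payload: dict) -> dict:
        for fn in self.steps:
            payload = self._execute_step_sync(fn, payload)
        return payload

    async def execute_flow_async(self, payload: dict) -> dict:
        # Same loop as execute_flow(), with await on each step.
        for fn in self.steps:
            payload = await self._execute_step_async(fn, payload)
        return payload


def upper(inp: dict) -> dict:           # sync tool
    return {"text": inp["text"].upper()}


async def exclaim(inp: dict) -> dict:   # async tool
    return {"text": inp["text"] + "!"}


if __name__ == "__main__":
    mixed = FlowExecutorSketch([upper, exclaim])
    print(asyncio.run(mixed.execute_flow_async({"text": "hi"})))  # {'text': 'HI!'}
```

Keeping both entry points on one class makes it easy to check that sync-only flows produce the same result through either path; an AsyncFlowExecutor subclass would instead keep the existing class untouched.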

Dependencies

Tasks

  • Add async detection to Tool class
  • Implement FlowExecutor.execute_flow_async() method
  • Implement _execute_step_async() helper
  • Handle mixed sync/async tool chains
  • Add unit tests in tests/test_async_execution.py
  • Add example: examples/async_flow.py
  • Update README with async usage section

Metadata

Assignees

No one assigned

Labels

  • ai-friendly: Designed for AI-assisted implementation
  • area:executor: Flow execution engine
  • complexity:complex: Significant effort, design review needed
  • priority:high: Must address first within the milestone
  • size:L: Large effort (3-5 days)
  • type:feature: New feature or capability
