Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 224 additions & 9 deletions agentflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class RunOutputFormat(StrEnum):
JSON = "json"
JSON_SUMMARY = "json-summary"
SUMMARY = "summary"
TEXT = "text"


class SmokePreflightMode(StrEnum):
Expand Down Expand Up @@ -506,6 +507,155 @@ def _run_pipeline_path(path: str, runs_dir: str, max_concurrent_runs: int, outpu
_run_pipeline(_load_pipeline(path), runs_dir, max_concurrent_runs, output)


# ---------------------------------------------------------------------------
# Inline pipeline loading helpers
# ---------------------------------------------------------------------------


def _load_inline_json(text: str) -> object:
from agentflow.loader import load_pipeline_from_text

try:
return load_pipeline_from_text(text, base_dir=Path.cwd())
except (json.JSONDecodeError, ValidationError, ValueError) as exc:
typer.echo(f"Failed to parse inline pipeline JSON:\n{exc}", err=True)
raise typer.Exit(code=1) from exc


def _load_inline_python(code: str) -> object:
from agentflow.loader import load_pipeline_from_text

result = subprocess.run(
[sys.executable, "-c", code],
capture_output=True,
text=True,
cwd=str(Path.cwd()),
)
if result.returncode != 0:
typer.echo(f"Inline Python expression failed:\n{result.stderr.strip()}", err=True)
raise typer.Exit(code=1)
stdout = result.stdout.strip()
if not stdout:
typer.echo("Inline Python expression produced no output on stdout.", err=True)
raise typer.Exit(code=1)
try:
return load_pipeline_from_text(stdout, base_dir=Path.cwd())
except (json.JSONDecodeError, ValidationError, ValueError) as exc:
typer.echo(f"Failed to parse pipeline from inline Python output:\n{exc}", err=True)
raise typer.Exit(code=1) from exc


def _load_inline_expression(expression: str) -> object:
stripped = expression.strip()
if stripped.startswith("{"):
return _load_inline_json(stripped)
return _load_inline_python(stripped)


def _load_from_stdin() -> object:
if sys.stdin.isatty():
typer.echo("Reading pipeline from stdin... (Ctrl+D to end)", err=True)
data = sys.stdin.read()
if not data.strip():
typer.echo("No input received on stdin.", err=True)
raise typer.Exit(code=1)
return _load_inline_expression(data)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run - is auto-detecting stdin as JSON-or-Python and will execute any non-JSON input with python -c. That is a dangerous default for a stdin mode, especially because the new docs explicitly recommend piping remote content with curl ... | agentflow run -.

I reproduced this by piping a short Python snippet into agentflow run - that wrote a marker file and then printed valid pipeline JSON; the marker file was created before the pipeline ran. In other words, a non-JSON stdin payload is not treated as data here, it is executed as code. Please require an explicit opt-in for Python-on-stdin instead of auto-executing everything that does not start with {.



# ---------------------------------------------------------------------------
# Exec command helpers
# ---------------------------------------------------------------------------


def _parse_env_options(env_list: list[str] | None) -> dict[str, str]:
if not env_list:
return {}
parsed: dict[str, str] = {}
for item in env_list:
if "=" not in item:
raise typer.BadParameter(f"Invalid env format: {item!r}. Expected KEY=value.")
key, _, value = item.partition("=")
parsed[key] = value
return parsed


def _build_exec_pipeline(
agent: str,
prompt: str,
*,
model: str | None = None,
tools: str = "read_only",
timeout: int = 1800,
env: dict[str, str] | None = None,
provider: str | None = None,
extra_args: list[str] | None = None,
) -> object:
from agentflow.specs import AgentKind, NodeSpec, PipelineSpec, ToolAccess

try:
agent_kind = AgentKind(agent)
except ValueError:
valid = ", ".join(e.value for e in AgentKind)
raise typer.BadParameter(f"Unknown agent: {agent!r}. Valid agents: {valid}.")

try:
tool_access = ToolAccess(tools)
except ValueError:
raise typer.BadParameter(f"Invalid tools value: {tools!r}. Use 'read_only' or 'read_write'.")

node_kwargs: dict[str, object] = {
"id": "exec",
"agent": agent_kind,
"prompt": prompt,
"tools": tool_access,
"timeout_seconds": timeout,
}
if model is not None:
node_kwargs["model"] = model
if env:
node_kwargs["env"] = env
if provider is not None:
node_kwargs["provider"] = provider
if extra_args:
node_kwargs["extra_args"] = extra_args

return PipelineSpec.model_validate({
"name": f"exec-{agent}",
"nodes": [node_kwargs],
})


def _resolve_exec_output(output: RunOutputFormat) -> RunOutputFormat:
if output != RunOutputFormat.AUTO:
return output
if _stream_supports_tty_summary(err=False):
return RunOutputFormat.TEXT
return RunOutputFormat.JSON


def _echo_exec_result(record: object, *, output: RunOutputFormat, run_dir: object = None) -> None:
resolved = _resolve_exec_output(output)
if resolved == RunOutputFormat.TEXT:
node = getattr(record, "nodes", {}).get("exec")
if node is None:
raise typer.Exit(code=1)
status = _status_value(getattr(node, "status", ""))
if status == "completed":
text = getattr(node, "final_response", None) or getattr(node, "output", None) or ""
typer.echo(text)
else:
# On failure, print whatever output we have, then stderr
text = getattr(node, "final_response", None) or getattr(node, "output", None) or ""
if text:
typer.echo(text, err=True)
for line in getattr(node, "stderr_lines", []) or []:
if isinstance(line, str) and line.strip():
typer.echo(line, err=True)
return
# For json/json-summary/summary, delegate to the standard run output
_echo_run_result(record, output=output, run_dir=run_dir)


def _doctor_report():
return build_local_smoke_doctor_report()

Expand Down Expand Up @@ -2157,9 +2307,63 @@ def inspect(
_echo_inspection(report, output=output)


@app.command("exec")
def exec_command(
agent: str = typer.Argument(help="Agent to run (codex, claude, kimi, gemini, shell, python)."),
prompt: str = typer.Argument(help="Prompt to send to the agent."),
model: str | None = typer.Option(None, "--model", "-m", help="Model override."),
tools: str = typer.Option("read_only", "--tools", "-t", help="Tool access: read_only or read_write."),
timeout: int = typer.Option(1800, "--timeout", help="Timeout in seconds."),
env: list[str] | None = typer.Option(None, "--env", help="Environment variables as KEY=value."),
provider: str | None = typer.Option(None, "--provider", help="Provider alias or config."),
extra_arg: list[str] | None = typer.Option(None, "--extra-arg", help="Extra CLI arguments for the agent."),
runs_dir: str = typer.Option(".agentflow/runs", envvar="AGENTFLOW_RUNS_DIR"),
max_concurrent_runs: int = typer.Option(2, envvar="AGENTFLOW_MAX_CONCURRENT_RUNS"),
output: RunOutputFormat = typer.Option(
RunOutputFormat.AUTO,
"--output",
help="Output format. Defaults to raw text on a terminal and json otherwise.",
),
) -> None:
"""Run a single agent with a prompt and print the response.

Examples:

agentflow exec gemini "What's trending on GitHub?"

agentflow exec claude "Review this code" --tools read_only

agentflow exec codex "Fix the test" --tools read_write -m gpt-4.1

agentflow exec shell "ls -la" --output text
"""
if not prompt.strip():
raise typer.BadParameter("Prompt cannot be empty.")

parsed_env = _parse_env_options(env)
pipeline = _build_exec_pipeline(
agent, prompt,
model=model, tools=tools, timeout=timeout,
env=parsed_env, provider=provider, extra_args=extra_arg,
)

store, orchestrator = _build_runtime(runs_dir, max_concurrent_runs)

async def _run() -> None:
run_record = await orchestrator.submit(pipeline)
completed = await orchestrator.wait(run_record.id, timeout=None)
run_dir = store.run_dir(run_record.id) if hasattr(store, "run_dir") else None
_echo_exec_result(completed, output=output, run_dir=run_dir)
status = _status_value(completed.status)
raise typer.Exit(code=0 if status == "completed" else 1)

asyncio.run(_run())


@app.command()
def run(
path: str,
path: str | None = typer.Argument(None, help="Pipeline file path, or '-' for stdin."),
expression: str | None = typer.Option(None, "-e", "--expression", help="Inline pipeline JSON or Python expression."),
runs_dir: str = typer.Option(".agentflow/runs", envvar="AGENTFLOW_RUNS_DIR"),
max_concurrent_runs: int = typer.Option(2, envvar="AGENTFLOW_MAX_CONCURRENT_RUNS"),
output: RunOutputFormat = typer.Option(
Expand All @@ -2178,14 +2382,25 @@ def run(
help="Print a successful local preflight summary to stderr when preflight runs.",
),
) -> None:
pipeline = _load_pipeline_with_optional_smoke_preflight(
path,
path,
preflight,
output,
show_preflight=show_preflight,
)
_run_pipeline(pipeline, runs_dir, max_concurrent_runs, output)
if expression is not None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new inline/stdin run paths bypass preflight entirely. Only the file-path branch goes through _load_pipeline_with_optional_smoke_preflight(), so run -e ... --preflight always and run - --preflight always skip the existing doctor/preflight gate and jump straight to execution.

I reproduced this with the same Kimi-bootstrapped Claude pipeline in both forms. agentflow run /tmp/pipeline.json --preflight always ... failed immediately with the expected preflight report, but agentflow run -e '<same pipeline>' --preflight always ... skipped preflight and actually launched the run. That changes the safety contract of run in a way that can hide missing local prerequisites until runtime.

if path is not None:
raise typer.BadParameter("Use either a file path or -e/--expression, not both.")
pipeline = _load_inline_expression(expression)
elif path == "-":
pipeline = _load_from_stdin()
elif path is not None:
pipeline = _load_pipeline_with_optional_smoke_preflight(
path,
path,
preflight,
output,
show_preflight=show_preflight,
)
else:
raise typer.BadParameter("Provide a pipeline file path, use '-' for stdin, or pass -e/--expression.")
# For TEXT output (only meaningful for exec), fall back to SUMMARY for run
effective_output = output if output != RunOutputFormat.TEXT else RunOutputFormat.SUMMARY
_run_pipeline(pipeline, runs_dir, max_concurrent_runs, effective_output)


@app.command()
Expand Down
52 changes: 45 additions & 7 deletions skills/agentflow/SKILL.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
---
name: agentflow
description: Build and run multi-agent pipelines using AgentFlow. Use when the user wants to orchestrate codex, claude, or kimi agents in parallel, in sequence, or in iterative loops. Trigger when the user mentions multi-agent workflows, fan-out tasks, code review pipelines, iterative implementation loops, running agents on EC2/ECS, or any task that needs multiple AI agents coordinated together. Also trigger for "agentflow", "pipeline", "graph of agents", "fanout", "shard", or "run codex on remote".
description: Run AI agents and build multi-agent pipelines using AgentFlow. Use when the user wants to call codex, claude, kimi, or gemini agents — either as a single one-off call or orchestrated in parallel, in sequence, or in iterative loops. Trigger when the user mentions running an agent, multi-agent workflows, fan-out tasks, code review pipelines, iterative implementation loops, running agents on EC2/ECS, or any task that needs AI agents coordinated together. Also trigger for "agentflow", "pipeline", "graph of agents", "fanout", "shard", "run codex on remote", "exec agent", or "call gemini/claude/codex". For details on exec and inline execution, read references/exec.md.
---

# AgentFlow

Build multi-agent pipelines where codex, claude, and kimi work together in dependency graphs with parallel fanout, iterative cycles, and remote execution.
Run AI agents directly or build multi-agent pipelines where codex, claude, kimi, and gemini work together in dependency graphs with parallel fanout, iterative cycles, and remote execution.

## Quick Start
## Quick Start: Single Agent Call

The fastest way to run an agent — no files needed:

```bash
agentflow exec gemini "What's trending on GitHub?" --model gemini-3-pro-preview
agentflow exec claude "Explain this codebase" --tools read_only
agentflow exec codex "Fix the failing test" --tools read_write
agentflow exec shell "ls -la"
```

`exec` prints the agent's response directly. Use `--output text` to force raw text, `--output json` for structured output. See `references/exec.md` for all options.

## Quick Start: Pipeline

For multi-step workflows, define a pipeline:

```python
from agentflow import Graph, codex, claude
Expand All @@ -23,17 +38,27 @@ print(g.to_json())

Run: `agentflow run pipeline.py`

Or inline without a file:

```bash
# Inline JSON
agentflow run -e '{"name":"review","nodes":[{"id":"a","agent":"codex","prompt":"Plan the work"},{"id":"b","agent":"claude","prompt":"Implement: {{ nodes.a.output }}","depends_on":["a"]}]}'

# From stdin
python3 pipeline.py | agentflow run -
```

## Imports

```python
from agentflow import Graph, codex, claude, kimi # agents
from agentflow import Graph, codex, claude, kimi, gemini # agents
from agentflow import fanout, merge # parallel shards
from agentflow import shell, python_node, sync # utility nodes
```

## Nodes

Create agent nodes with `codex()`, `claude()`, or `kimi()`. Required: `task_id`, `prompt`.
Create agent nodes with `codex()`, `claude()`, `kimi()`, or `gemini()`. Required: `task_id`, `prompt`.

```python
codex(
Expand Down Expand Up @@ -245,10 +270,23 @@ Graph("name",
## CLI

```bash
agentflow run pipeline.py # run pipeline
agentflow run pipeline.py --output summary
# Single agent call (no file needed)
agentflow exec <agent> "<prompt>" [--model X] [--tools read_only|read_write] [--output text|json]

# Run pipeline from file
agentflow run pipeline.py
agentflow run pipeline.json --output summary

# Run pipeline inline (no file needed)
agentflow run -e '{"name":"q","nodes":[...]}' # inline JSON
agentflow run -e 'from agentflow import ...; print(...)' # inline Python
echo '{"name":"q","nodes":[...]}' | agentflow run - # from stdin

# Inspect and validate
agentflow inspect pipeline.py # show graph structure
agentflow validate pipeline.py # check without running
agentflow templates # list starter templates
agentflow init > pipeline.py # scaffold starter
```

For the full exec reference (all flags, output formats, env vars, examples), read `references/exec.md`.
Loading