# a3s-flow

Workflow engine for agentic platforms — execute JSON-defined DAGs with concurrent wave scheduling, pluggable node types, and full lifecycle control.


## Quick start

```toml
# Cargo.toml
[dependencies]
a3s-flow = "0.3"
tokio = { version = "1", features = ["full"] }
serde_json = "1"
```

```rust
use a3s_flow::{FlowEngine, NodeRegistry};
use serde_json::json;
use std::collections::HashMap;

#[tokio::main]
async fn main() -> a3s_flow::Result<()> {
    let engine = FlowEngine::new(NodeRegistry::with_defaults());

    let definition = json!({
        "nodes": [
            { "id": "fetch", "type": "http-request",
              "data": { "url": "https://api.example.com", "method": "GET" } },
            { "id": "process", "type": "code",
              "data": { "language": "rhai", "code": "#{ status: inputs.fetch.status }" } }
        ],
        "edges": [{ "source": "fetch", "target": "process" }]
    });

    let id = engine.start(&definition, HashMap::new()).await?;
    let state = engine.state(id).await?;
    println!("{state:?}");
    Ok(())
}
```

## Why a3s-flow?

| Feature | Detail |
| --- | --- |
| JSON-native | Workflows are plain JSON — no YAML, no DSL |
| Correct by construction | Cycle detection and reference validation at parse time |
| Concurrent by default | Nodes with no mutual dependency run in the same wave |
| Full lifecycle control | Pause at wave boundaries, resume, or cancel mid-execution |
| Dify-compatible | Built-in nodes match Dify's node types and config schema |
| Extend without forking | Implement the `Node` trait to add any node type |
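"Concurrent by default" means the runner groups the DAG into waves before executing it: wave N holds every node whose dependencies were all satisfied by earlier waves. A minimal std-only sketch of that grouping step (an illustration of the idea, not the crate's internal code):

```rust
use std::collections::{HashMap, HashSet};

/// Group nodes into waves: wave N contains every node whose
/// dependencies were all completed by waves 0..N.
fn waves(nodes: &[&str], edges: &[(&str, &str)]) -> Vec<Vec<String>> {
    let mut indegree: HashMap<&str, usize> = nodes.iter().map(|&n| (n, 0)).collect();
    for &(_, dst) in edges {
        *indegree.get_mut(dst).unwrap() += 1;
    }
    let mut done: HashSet<&str> = HashSet::new();
    let mut result = Vec::new();
    while done.len() < nodes.len() {
        // Every not-yet-done node with indegree 0 is ready now.
        let ready: Vec<&str> = nodes
            .iter()
            .filter(|&&n| !done.contains(n) && indegree[n] == 0)
            .copied()
            .collect();
        if ready.is_empty() {
            panic!("cycle detected"); // a3s-flow rejects cycles at parse time
        }
        for &n in &ready {
            done.insert(n);
            for &(src, dst) in edges {
                if src == n {
                    *indegree.get_mut(dst).unwrap() -= 1;
                }
            }
        }
        result.push(ready.iter().map(|s| s.to_string()).collect());
    }
    result
}

fn main() {
    // a → c and b → c: a and b share wave 0, c runs alone in wave 1.
    let w = waves(&["a", "b", "c"], &[("a", "c"), ("b", "c")]);
    println!("{w:?}");
}
```

Each wave's nodes can then be spawned concurrently (the crate uses a tokio `JoinSet` per wave, per the Architecture section below).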

## Built-in nodes

| Type | Description |
| --- | --- |
| `"noop"` | Pass all upstream inputs through unchanged |
| `"start"` | Declare typed flow inputs with optional defaults |
| `"end"` | Collect outputs via JSON pointer paths |
| `"http-request"` | HTTP GET/POST/PUT/DELETE/PATCH |
| `"if-else"` | Multi-case conditional routing |
| `"template-transform"` | Jinja2 string rendering |
| `"variable-aggregator"` | First non-null fan-in |
| `"code"` | Sandboxed Rhai script |
| `"csv-parse"` | Parse CSV text into a JSON array |
| `"iteration"` | Loop a sub-flow over an array (parallel or sequential) |
| `"sub-flow"` | Execute a named flow as an inline step |
| `"llm"` | OpenAI-compatible chat completion |
| `"question-classifier"` | LLM-powered intent classification |
| `"assign"` | Write key-value pairs into the variable scope |
| `"context-get"` | Read keys from the shared execution context |
| `"context-set"` | Write key-value pairs into the shared context |
| `"parameter-extractor"` | LLM-powered structured extraction |
| `"loop"` | While-loop with a break condition |
| `"list-operator"` | Filter / sort / deduplicate / limit a JSON array |

## Flow definition format

```json
{
  "nodes": [
    { "id": "a", "type": "http-request", "data": { "url": "..." } },
    { "id": "b", "type": "if-else",      "data": { "cases": [...] } }
  ],
  "edges": [
    { "source": "a", "target": "b" }
  ]
}
```

`run_if` guard — conditionally skip a node:

```json
{ "data": { "run_if": { "from": "a", "path": "status", "op": "eq", "value": 200 } } }
```

Operators: `eq`, `ne`, `gt`, `lt`, `gte`, `lte`, `contains`
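The operators map onto ordinary comparisons. A rough sketch of their semantics over integers and strings (an illustration only, not the crate's evaluator, which compares JSON values):

```rust
/// Approximate semantics of the run_if comparison operators,
/// shown here over i64 for simplicity.
fn check_i64(op: &str, actual: i64, expected: i64) -> bool {
    match op {
        "eq" => actual == expected,
        "ne" => actual != expected,
        "gt" => actual > expected,
        "lt" => actual < expected,
        "gte" => actual >= expected,
        "lte" => actual <= expected,
        _ => false, // unknown operator
    }
}

/// "contains" shown as substring match on strings.
fn check_contains(actual: &str, expected: &str) -> bool {
    actual.contains(expected)
}

fn main() {
    // The guard above: run the node only when a.status == 200.
    assert!(check_i64("eq", 200, 200));
    assert!(!check_i64("gte", 199, 200));
    assert!(check_contains("application/json", "json"));
    println!("ok");
}
```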


## Progressive API

Seven levels (L0–L6) — adopt only what you need:

```rust
// L0 — discovery
engine.node_types();
engine.node_descriptors();

// L1 — pre-flight validation (zero-cost, no execution)
engine.validate(&definition);

// L2 — fire and forget
let id = engine.start(&definition, variables).await?;

// L3 — streaming events
let (id, mut rx) = engine.start_streaming(&definition, variables).await?;
while let Ok(event) = rx.recv().await { /* FlowEvent */ }

// L4 — runtime control
engine.pause(id).await?;
engine.resume(id).await?;
engine.terminate(id).await?;

// L5 — shared context (human-in-the-loop, side-channel enrichment)
engine.set_context_entry(id, "approval".into(), json!("granted")).await?;

// L6 — named flows
engine.with_flow_store(flow_store);
engine.start_named("daily-briefing", variables).await?;
```

## Lifecycle state machine

```
                  start()
  ┌──────────────────────────────────────┐
  │               Running                │
  └──────┬───────────────────────────────┘
         │ pause()
         ▼
  ┌──────────────────────┐
  │        Paused        │◄─── resume()
  └──────┬───────────────┘
         │ terminate() / node error / all done
         ▼
  ┌──────────────────────────────────────┐
  │   Completed │ Failed │ Terminated    │
  └──────────────────────────────────────┘
```
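The transition rules behind the `InvalidTransition` error (see Error handling below) can be sketched as a small state enum. This is an illustration of the diagram, not the crate's internal type:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum ExecState { Running, Paused, Completed, Failed, Terminated }

/// Which lifecycle actions are legal from each state, per the
/// diagram above (illustration only, not a3s-flow's internals).
fn apply(state: ExecState, action: &str) -> Result<ExecState, String> {
    use ExecState::*;
    match (state, action) {
        (Running, "pause") => Ok(Paused),
        (Paused, "resume") => Ok(Running),
        (Running | Paused, "terminate") => Ok(Terminated),
        // Completed / Failed / Terminated are terminal.
        (from, action) => Err(format!("cannot {action} a {from:?} execution")),
    }
}

fn main() {
    assert_eq!(apply(ExecState::Running, "pause"), Ok(ExecState::Paused));
    assert_eq!(apply(ExecState::Paused, "resume"), Ok(ExecState::Running));
    assert!(apply(ExecState::Completed, "resume").is_err());
    println!("ok");
}
```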

## Adding a custom node

```rust
use a3s_flow::{ExecContext, FlowError, FlowEngine, Node, NodeRegistry};
use async_trait::async_trait;
use serde_json::{json, Value};
use std::sync::Arc;

struct MyNode;

#[async_trait]
impl Node for MyNode {
    fn node_type(&self) -> &str { "my-node" }
    async fn execute(&self, ctx: ExecContext) -> Result<Value, FlowError> {
        // ... your logic
        Ok(json!({ "result": "done" }))
    }
}

let mut registry = NodeRegistry::with_defaults();
registry.register(Arc::new(MyNode));
let engine = FlowEngine::new(registry);
```

## Architecture

```
DagGraph            NodeRegistry           FlowRunner
────────            ────────────           ──────────
parse JSON     →    type string → Node     execute(ExecContext)
validate DAG        with_defaults()        wave-based concurrency
topo sort                                  JoinSet per wave
```

`ExecContext` passed to every node:

| Field | Content |
| --- | --- |
| `data` | Node config from the flow definition |
| `inputs` | Upstream node outputs (keyed by node ID) |
| `variables` | Global flow variables |
| `context` | Shared mutable KV store across nodes |
| `registry` | Node type registry (for sub-flows) |
| `flow_store` | Named flow storage (optional) |
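For the quick-start flow, the `code` node's context would look roughly like this. The shape is illustrative only; in particular, the exact fields emitted by `http-request` (such as `body`) are assumptions here — only `status` is implied by the quick-start script:

```json
{
  "data":      { "language": "rhai", "code": "#{ status: inputs.fetch.status }" },
  "inputs":    { "fetch": { "status": 200, "body": "..." } },
  "variables": {},
  "context":   {}
}
```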

## Error handling

```rust
use uuid::Uuid;

#[derive(Debug, thiserror::Error)]
pub enum FlowError {
    #[error("invalid definition: {0}")]
    InvalidDefinition(String),

    #[error("cyclic graph")]
    CyclicGraph,

    #[error("unknown node: {0}")]
    UnknownNode(String),

    #[error("node '{node_id}' failed: {reason}")]
    NodeFailed { node_id: String, reason: String },

    #[error("execution not found: {0}")]
    ExecutionNotFound(Uuid),

    #[error("invalid transition: cannot {action} a {from} execution")]
    InvalidTransition { action: String, from: String },

    #[error("execution was terminated")]
    Terminated,

    #[error(transparent)]
    Json(#[from] serde_json::Error),

    #[error("internal: {0}")]
    Internal(String),
}
```

## Reliability

| Feature | Config |
| --- | --- |
| Per-node retry | `data["retry"] = { "max_attempts": 3, "backoff_ms": 500 }` |
| Per-node timeout | `data["timeout_ms"] = 5000` |
| Continue on error | `data["continue_on_error"] = true` — failed nodes output `{ "__error__": "..." }` |
| Concurrency cap | `FlowEngine::new(registry).with_max_concurrency(4)` |
| Partial resume | `runner.resume_from(&prior_result, vars).await?` |
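The retry policy in the table amounts to a bounded retry loop with a fixed backoff between attempts. A minimal std-only sketch of that contract (illustration only; the crate's runner is async and tokio-based):

```rust
use std::{thread, time::Duration};

/// Run `op` up to `max_attempts` times, sleeping `backoff_ms`
/// between failed attempts — the contract behind data["retry"].
fn retry<T, E>(
    max_attempts: u32,
    backoff_ms: u64,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut last_err = None;
    for attempt in 1..=max_attempts {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) => {
                last_err = Some(e);
                if attempt < max_attempts {
                    thread::sleep(Duration::from_millis(backoff_ms));
                }
            }
        }
    }
    // All attempts failed; surface the last error.
    Err(last_err.unwrap())
}

fn main() {
    // Fails twice, succeeds on the third attempt.
    let mut calls = 0;
    let out = retry(3, 1, || {
        calls += 1;
        if calls < 3 { Err("transient") } else { Ok(calls) }
    });
    assert_eq!(out, Ok(3));
    println!("succeeded after {calls} attempts");
}
```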

## Extension points

| Trait / Type | Purpose | Default |
| --- | --- | --- |
| `Node` | Custom node execution logic | 19 built-in types |
| `NodeRegistry` | Type string → `Arc<dyn Node>` | Ships with all built-ins |
| `ExecContext` | Per-node runtime data | — |
| `FlowEngine` | Lifecycle orchestrator | — |
| `ExecutionStore` | Persist execution history | `MemoryExecutionStore` |
| `FlowStore` | Load/save named flow definitions | `MemoryFlowStore` |
| `EventEmitter` | Lifecycle event hooks | `NoopEventEmitter` |
| `FlowEvent` | Cloneable event broadcast | 16 variants |

## Event emitter

```rust
use a3s_flow::{EventEmitter, FlowEngine, NodeRegistry};
use async_trait::async_trait;
use serde_json::Value;
use uuid::Uuid;

struct Logger;

#[async_trait]
impl EventEmitter for Logger {
    async fn on_flow_started(&self, _: Uuid) {}
    async fn on_node_completed(&self, _: Uuid, id: &str, out: &Value) {
        println!("{id}: {out}");
    }
    // the remaining trait methods (16 in total) default to no-op
}

let engine = FlowEngine::new(NodeRegistry::with_defaults())
    .with_event_emitter(std::sync::Arc::new(Logger) as _);
```

## License

MIT — see LICENSE.