FlowEngine is an agent-style LLM orchestration framework for building composable, production-ready AI workflows. It provides a zero-external-dependency core with RPC-style action definitions and type-safe response handling.
- 🎯 Agent-style Step Definitions - Context, Guide, Task, Input layers for structured prompts
- 🔧 RPC-style Actions - Automatic JSON Schema generation from Go structs
- 🔄 Session Management - Context chaining with TTL and size limits
- 🔁 Retry with Validation - Configurable retry with hint injection
- 📦 Type-safe Parameters - Generic parameter extraction
- 🧩 Composable Flows - Chain, Decide, Parallel, Iterate primitives
- 🛠️ Function Calling - Native LLM function call support
- 💾 Caching Adapter - Built-in response caching decorator
- 📊 Observability - Logging and metrics middleware
go get github.com/northseadl/flowengine

For Volcengine Ark (Doubao) adapter:
go get github.com/northseadl/flowengine/adapter/ark

package main
import (
"context"
"fmt"
"github.com/northseadl/flowengine"
)
// Define action parameter structs
type SearchParams struct {
Keywords string `json:"keywords" desc:"search keywords"`
MaxCount int `json:"max_count,omitempty" desc:"max results count"`
}
type ChatParams struct {
Response string `json:"response" desc:"direct response to user"`
}
func main() {
// Create a step with actions
step := flowengine.NewStep("intent").
Context("You are an intent classifier for an e-commerce platform").
Task("Classify user query intent and extract parameters").
Input("Query: {{.query}}").
Guide("- Do not guess", "- Be precise").
Action("search", SearchParams{}).
Action("chat", ChatParams{}).
Build()
// Create engine with your adapter
engine := flowengine.New(yourAdapter)
// Run with metadata
ctx := context.Background()
fc, resp, err := engine.Run(ctx, step, flowengine.Metadata{
"query": "I want a red dress under $50",
})
if err != nil {
panic(err)
}
// Type-safe parameter extraction
if params, ok := flowengine.ParamsAs[SearchParams](resp, "search"); ok {
fmt.Printf("Keywords: %s, MaxCount: %d\n", params.Keywords, params.MaxCount)
}
}

// Sequential execution
flow := flowengine.Chain(step1, step2, step3)
// Conditional branching
flow := flowengine.Decide(
flowengine.When("search", searchHandler),
flowengine.When("chat", chatHandler),
flowengine.Otherwise(defaultHandler),
)
// Parallel execution
flow := flowengine.Parallel(step1, step2)
// Iteration until condition
flow := flowengine.Iterate(step, func(r *flowengine.Response) bool {
return r.Is("done")
})

// Run within a session (context is preserved across calls)
fc1, resp1, _ := engine.RunWithSession(ctx, "session-id", step1, metadata)
fc2, resp2, _ := engine.RunWithSession(ctx, "session-id", step2, metadata)
// Delete session when done
engine.DeleteSession(ctx, "session-id")
// Configure session manager
manager := flowengine.NewSessionManager(
flowengine.WithTTL(30 * time.Minute),
flowengine.WithMaxSessions(1000),
)

import (
"github.com/northseadl/flowengine"
"github.com/northseadl/flowengine/adapter/ark"
"github.com/volcengine/volcengine-go-sdk/service/arkruntime"
)
func main() {
// Create Ark client
client := arkruntime.NewClientWithApiKey("your-api-key")
// Create adapter with options
adapter := ark.New(client,
ark.WithModel("your-model-endpoint"),
ark.WithThinking(), // Enable reasoning mode
)
// Create engine
engine := flowengine.New(adapter)
// Use the engine...
}

FlowEngine enforces a structured prompt model:
┌─────────────────────────────────────┐
│ Layer 1: Context (System) │ ← Just-in-time data
│ <context>...</context> │
├─────────────────────────────────────┤
│ Layer 2: Guide (System) │ ← How to operate
│ <guide> │
│ Available actions: ... │
│ Constraints: ... │
│ </guide> │
├─────────────────────────────────────┤
│ Layer 3: Task (User) │ ← What to do
│ <task>...</task> │
│ <input>...</input> │
└─────────────────────────────────────┘
Implement these interfaces to integrate with any LLM provider:
// Basic adapter
type Adapter interface {
Call(ctx context.Context, system, user string) (string, error)
Stream(ctx context.Context, system, user string) (<-chan string, error)
}
// Session-aware adapter
type SessionAdapter interface {
Adapter
CallWithSession(ctx context.Context, system, user, previousResponseID string) (resp, responseID string, err error)
DeleteSession(ctx context.Context, responseID string) error
}
// Function Call adapter
type ToolAdapter interface {
Adapter
CallWithTools(ctx context.Context, system, user string, tools []ToolDef) (*ToolCall, error)
CallWithToolsAndSession(ctx context.Context, system, user, previousResponseID string, tools []ToolDef) (*ToolCall, string, error)
}

// Add logging
adapter = flowengine.WithHooks(adapter, flowengine.CallHook{
OnStart: func(ctx context.Context, system, user string) {
log.Printf("LLM call starting")
},
OnEnd: func(ctx context.Context, system, user, response string, err error) {
log.Printf("LLM call completed")
},
})
// Add caching
cache := flowengine.NewMemoryCache()
adapter = flowengine.WithCache(adapter, cache, true)
// Add metrics
adapter = flowengine.WithMetrics(adapter, yourMetricsCollector)

MIT License - see LICENSE for details.