Pipeline Services

Pipeline Services is a locality-aware software architecture framework for composing application behavior as typed, observable pipelines of actions.

It is a middle path between monoliths and microservices: keep modularity and clear execution flow, default to local execution when locality is the right design, and distribute only when distribution is truly warranted.

Pipeline Services is application architecture, not cloud/platform/deployment infrastructure.

Java is the reference implementation in this repository. Python, TypeScript, Rust, Go, C#, and C++ are contract-aligned reference ports with tests and examples. Mojo is a strategic target and experimental runtime-evolution track.

Project status

  • v0.1.0 is an initial public preview focused on the Java reference implementation and the shared portability contract.
  • The non-Java ports are in-repo reference ports, not independently published public releases today.
  • See docs/PROJECT_STATUS.md for the maturity matrix, release surface, and compatibility boundaries.

Release scope for v0.1.0

  • Java is the reference implementation and the primary compatibility anchor.
  • Python, TypeScript, Rust, Go, C#, and C++ validate the portability contract through in-repo tests and examples.
  • Mojo remains manual/experimental for now, and pipeline-disruptor remains experimental and single-threaded today.
  • Standalone publication to Maven Central, PyPI, npm, crates.io, NuGet, or other package registries is explicitly out of scope for this release.

Requirements

  • Java
  • Python
  • Mojo
  • TypeScript
  • C++
  • C#
  • Go
  • Rust

What this framework tries to solve

Most “systems” (services, batch jobs, agents, workflows, trading engines, orchestration) are ultimately a series of actions: validate → enrich → transform → call out to something → decide → persist → observe. The problem is that this logic often ends up scattered across classes, functions, and ad-hoc conventions.

Pipeline Services makes the structure explicit: preActions → actions → postActions, with a consistent execution model across languages. That gives you a clear mental model, better reviewability, and a repeatable way to build complex systems without accumulating accidental complexity.

It’s also a pragmatic response to the “microservices everywhere” era. Many teams have pulled back from microservices due to the hidden costs of distributed systems: operational overhead, cross-service coordination, versioning and deployment complexity, network latency/reliability, harder debugging and tracing, and data consistency challenges. Pipeline Services keeps modularity and composability, but shifts complexity back into a clear in-process design primitive (actions) you can still deploy as a single service, a batch job, or a component inside a larger system.

In a world where a lot of code is generated automatically, pipelines also act as a forcing function: you can constrain generated code into a clean, composable pattern where each unit is an action with a clear input/output contract. You can mix:

  • Local actions (pure transforms / domain logic)
  • Remote actions (HTTP calls with sensible defaults and minimal repetition)
  • Prompt actions (prompt-directed steps that compile into checked-in, testable code per target language)

This approach is driven by the creator’s 25+ years building complex Wall Street systems (trading and high-performance platforms) and identifying pipelines as a highly effective structure for clarity, reusability, robustness, and long-term maintainability.

What you get

  • A simpler system design primitive: a runnable architecture that reads like a story.
  • Reusability by construction: actions are small units you can share across pipelines and ports.
  • Robustness by default: consistent error capture, stop-vs-continue controls, and predictable execution phases.
  • Observability as a plug-in: timings/metrics live in actions (keeps the core clean and portable).
  • Prompt-to-code without chaos: prompts stay as source-of-truth, while generated code stays reviewable, testable, and language-native.

This repo is organized around a shared, language-agnostic behavior contract (docs/PORTABILITY_CONTRACT.md) so ports can stay consistent on:

  • short-circuit semantics
  • exception capture vs continue
  • JSON pipeline configuration shape (actions, $local, $remote, remoteDefaults)
  • built-in timings/metrics hooks

Core model (recommended)

  • A pipeline is an ordered list of actions that transform a context value.
  • Two action shapes are supported across ports:
    • Unary: C → C
    • Control-aware: (C, control) → C (explicit short-circuit + error recording)
  • A pipeline has three phases: pre → main → post.
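
The two action shapes and three phases above can be sketched in plain Java. Note that MiniPipeline, Control, and the method names below are illustrative stand-ins that mirror the contract, not the framework's real pipeline-core API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the core model: unary actions (C -> C) and
// control-aware actions ((C, control) -> C) across pre/main/post phases.
final class MiniPipeline<C> {
    interface StepAction<C> { C apply(C ctx, Control control); }

    // Minimal control object: records an explicit short-circuit request.
    static final class Control {
        private boolean shortCircuited;
        public void shortCircuit() { shortCircuited = true; }
        public boolean isShortCircuited() { return shortCircuited; }
    }

    private final List<StepAction<C>> preActions = new ArrayList<>();
    private final List<StepAction<C>> actions = new ArrayList<>();
    private final List<StepAction<C>> postActions = new ArrayList<>();

    // The unary shape is adapted onto the control-aware shape.
    MiniPipeline<C> addPreAction(UnaryOperator<C> a) { preActions.add((c, ctl) -> a.apply(c)); return this; }
    MiniPipeline<C> addAction(UnaryOperator<C> a) { actions.add((c, ctl) -> a.apply(c)); return this; }
    MiniPipeline<C> addAction(StepAction<C> a) { actions.add(a); return this; }
    MiniPipeline<C> addPostAction(UnaryOperator<C> a) { postActions.add((c, ctl) -> a.apply(c)); return this; }

    C run(C ctx) {
        Control control = new Control();
        for (StepAction<C> a : preActions) ctx = a.apply(ctx, control);
        for (StepAction<C> a : actions) {              // MAIN phase
            ctx = a.apply(ctx, control);
            if (control.isShortCircuited()) break;     // short-circuit stops MAIN only
        }
        for (StepAction<C> a : postActions) ctx = a.apply(ctx, control); // post still runs
        return ctx;
    }
}
```

The sketch shows why short-circuiting is safe to use freely: it only skips remaining MAIN actions, while post-phase actions (audit, metrics) still execute.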

Pipeline execution model

Design goals

  • Simplicity and clarity first: the common path should read like a list of actions (method refs/lambdas or JSON).
  • Portability by contract: behavior is defined once in docs/PORTABILITY_CONTRACT.md and re-implemented per language.
  • Robustness without ceremony: exceptions are captured; stop-vs-continue is a pipeline setting; post-actions still run.
  • Low-friction remote actions: meaningful defaults (remoteDefaults) with per-action overrides; avoid repeating config.
  • Metrics out of the box: timings are captured and can be emitted via a post-action (keeps the core clean).

Java modules (Maven)

pipeline-core        # Pipeline<C>, StepAction<C>, ActionControl<C> (legacy: StepControl<C>), PipelineResult<C>, RuntimePipeline<T>, metrics
pipeline-config      # Minimal JSON loader for unary String pipelines
pipeline-remote      # HTTP action adapter (json GET/POST)
pipeline-prompt      # Prompt-to-code generated actions (optional check-in) + Java helpers
pipeline-api         # Higher-level facade (labels/jumps/beans/inline JSON + optional metrics)
pipeline-disruptor   # Experimental runner wrapper (single-thread for now)
pipeline-examples    # Runnable examples (+ main runner)

Ports

  • Java: reference implementation (src/Java/)
  • Python: contract-aligned in-repo reference port (src/Python/)
  • TypeScript: contract-aligned in-repo reference port (src/typescript/)
  • Rust: contract-aligned in-repo reference port (src/Rust/)
  • Go: contract-aligned in-repo reference port (src/Go/)
  • C#: contract-aligned in-repo reference port (src/CSharp/)
  • C++: contract-aligned in-repo reference port (src/Cpp/)
  • Mojo: strategic target and experimental reference port (src/Mojo/pipeline_services/)

These ports live in-repo to validate the shared contract. v0.1.0 does not imply separate registry publication or identical maturity across every port.

Experimental directories

  • src/Java/pipeline-api-pr/ is an incubating Java API work area that is not part of the release build or the public v0.1.0 compatibility surface. See src/Java/pipeline-api-pr/README.md.
  • statemachine/ is a standalone experiment and is not part of the main Pipeline Services release surface. See statemachine/README.md.
  • archive/ contains historical snapshots and is not part of the supported release surface.

Portability contract

Why Mojo

Mojo is a primary target for a future “fast, portable pipeline runtime” story: compile-time performance, predictable execution, and an ecosystem that can still interop with Python when needed.

This repo includes a Mojo port that follows the shared behavior contract so semantics stay comparable across languages. CI for Mojo remains manual until the toolchain is pinned cleanly for GitHub-hosted runners.

Quick start

Java (reference implementation)

Requirements: Java 21+, Maven 3.9+ (wrapper included)

./mvnw -q clean test

Run all examples:

./mvnw -q -pl pipeline-examples exec:java -Dexec.mainClass=com.pipeline.examples.ExamplesMain

Example (Pipeline<C>)

import com.pipeline.core.Pipeline;
import com.pipeline.core.PipelineResult;
import com.pipeline.examples.steps.PolicySteps;
import com.pipeline.examples.steps.TextSteps;

Pipeline<String> pipeline = new Pipeline<>("clean_text", /*shortCircuitOnException=*/true)
    .addPreAction(PolicySteps::rateLimit)
    .addAction(TextSteps::strip)
    .addAction(TextSteps::normalizeWhitespace)
    .addAction((s, control) -> {
      if (s.length() > 280) {
        control.shortCircuit();        // explicit short-circuit (stops MAIN actions)
        return s.substring(0, 280);
      }
      return s;
    })
    .addPostAction(PolicySteps::audit);

PipelineResult<String> result = pipeline.run("  Hello   World  ");
System.out.println(result.context());

Mojo port

The Mojo toolchain lives under pipeline_services/pixi.toml.

cd pipeline_services
pixi run mojo run -I ../src/Mojo ../src/Mojo/pipeline_services/examples/example01_text_clean.mojo
pixi run mojo run -I ../src/Mojo ../src/Mojo/pipeline_services/examples/example02_json_loader.mojo
pixi run mojo run -I ../src/Mojo ../src/Mojo/pipeline_services/examples/example05_metrics_post_action.mojo

Notes:

  • JSON loading uses a registry for $local actions and supports $remote HTTP actions.

Python port

cd src/Python
python3 -m pipeline_services.examples.example01_text_clean
python3 -m pipeline_services.examples.example02_json_loader
python3 -m pipeline_services.examples.example05_metrics_post_action
python3 -m pipeline_services.examples.benchmark01_pipeline_run

TypeScript port

cd src/typescript
npm ci
npm run build
npm test
node dist/src/pipeline_services/examples/example01_text_clean.js

Rust port

cd src/Rust
cargo test
cargo run --example example01_text_clean

C++ port

cd src/Cpp
cmake -S . -B build
cmake --build build -j
ctest --test-dir build
./build/example01_text_clean

Go port

cd src/Go
go test ./...
go run ./examples/example01_text_clean

C# port

cd src/CSharp
dotnet test ./pipeline_services_tests/PipelineServices.Tests.csproj
dotnet run --project pipeline_services_examples -- example01_text_clean

Prompt-to-code (compile phase)

Prompt-to-code flow

Pipelines can include $prompt actions in the source JSON. $prompt is a compile-time directive: a prompt compiler generates:

  • Per-language compiled pipelines: pipelines/generated/<lang>/<pipeline>.json (with $prompt → $local rewrites)
  • Per-language generated actions (code) that implement the prompt contract
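
Under this flow, a source pipeline carrying a $prompt action might look like the following sketch. Only the $prompt key itself comes from the description above; the prompt text and surrounding fields are illustrative:

```json
{
  "pipeline": "clean_text_prompted",
  "type": "unary",
  "actions": [
    { "$local": "com.pipeline.examples.adapters.TextStripStep" },
    { "$prompt": "Collapse runs of whitespace into single spaces" }
  ]
}
```

After prompt compilation, the compiled JSON under pipelines/generated/<lang>/ would carry a $local reference to the generated action in place of the $prompt entry.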

Run prompt compilation:

python3 tools/prompt_codegen.py --pipelines-dir pipelines

Runtime behavior:

  • If a source pipeline file has no $prompt, loaders run it directly.
  • If a source pipeline file has $prompt, loaders automatically load the compiled JSON from pipelines/generated/<lang>/....
  • If compiled JSON is missing, loaders throw a clear “run prompt codegen” error.

Registering generated actions (per port):

  • Java: com.pipeline.generated.PromptGeneratedActions.register(registry)
  • Python: pipeline_services.generated.register_generated_actions(registry)
  • TypeScript: register_generated_actions(registry) from pipeline_services/generated
  • Rust: pipeline_services::generated::register_generated_actions(&mut registry)
  • Go: generated.RegisterGeneratedActions(registry)
  • C++: pipeline_services::generated::registerGeneratedActions(registry)
  • Mojo: registry = pipeline_services.generated.register_generated_actions(registry)
  • C#: PipelineServices.Generated.PromptGeneratedActions.Register(registry)

Semantics (portable)

  • Explicit short-circuit: inside a StepAction<C>, call control.shortCircuit().
  • shortCircuitOnException=true: an action exception records an error and short-circuits MAIN actions.
  • shortCircuitOnException=false: an action exception records an error and continues.
  • Pre/post actions always run fully (not stopped by short-circuit) by default.
  • No checked exceptions in StepAction<C>; exceptions are captured in PipelineResult<C>.
  • Optional: attach errors to your context via Pipeline.onError((ctx, err) -> /*return updated ctx*/); default is no-op.
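
The stop-vs-continue rules above can be mirrored in a minimal plain-Java sketch (SemanticsSketch is illustrative only, not part of pipeline-core):

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the MAIN-phase exception semantics: exceptions are
// recorded rather than thrown, and the shortCircuitOnException flag decides
// whether remaining MAIN actions still run.
final class SemanticsSketch {
    static String runActions(String ctx, List<UnaryOperator<String>> actions,
                             boolean shortCircuitOnException, List<Exception> errors) {
        for (UnaryOperator<String> action : actions) {
            try {
                ctx = action.apply(ctx);
            } catch (Exception e) {
                errors.add(e);                       // error is captured, never propagated
                if (shortCircuitOnException) break;  // stop remaining MAIN actions
                // otherwise: continue with the next action, keeping the last good ctx
            }
        }
        return ctx;
    }
}
```

Either way, the caller always gets a context value back plus the recorded errors, which is what makes post-actions and result inspection reliable.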

JSON config (portable shape)

Canonical JSON form (across ports):

{
  "pipeline": "json_clean_text",
  "type": "unary",
  "shortCircuitOnException": true,
  "actions": [
    { "$local": "com.pipeline.examples.adapters.TextStripStep" },
    { "$local": "com.pipeline.examples.adapters.TextNormalizeStep" }
  ]
}

Notes:

  • "steps" is accepted as a legacy alias for "actions".
  • Java JSON loader also accepts "preActions"/"postActions" (preferred) with legacy aliases "pre"/"post".
  • Root-level "remoteDefaults" can be used to avoid repeating remote configuration across many "$remote" actions.
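
As a sketch of the root-level "remoteDefaults" idea: the key names below (timeoutMillis, retries, endpoint) are borrowed from the Java RemoteSpec shown later and may differ from the actual loader's accepted keys:

```json
{
  "pipeline": "remote_enrich",
  "type": "unary",
  "remoteDefaults": { "timeoutMillis": 800, "retries": 1 },
  "actions": [
    { "$remote": { "endpoint": "https://httpbin.org/post" } },
    { "$remote": { "endpoint": "https://httpbin.org/anything", "timeoutMillis": 2000 } }
  ]
}
```

Each "$remote" action inherits the defaults and overrides only what differs, so per-action configuration stays minimal.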

Java-only: JSON singleton mode + action lifecycles:

  • Set "singletonMode": true to treat the loaded pipeline definition as reusable across many runs.
  • Per action, set "lifecycle": "shared" | "pooled" | "perRun".
    • "pooled" borrows an instance per invocation and calls ResettableAction.reset() before returning it to the pool.
    • "pool": { "max": 128 } controls the maximum pool size.

Example (pooled $local action):

{
  "pipeline": "singleton_mode_pooled",
  "type": "unary",
  "singletonMode": true,
  "actions": [
    {
      "label": "normalize_whitespace",
      "$local": "com.pipeline.examples.adapters.PooledScratchNormalizeAction",
      "lifecycle": "pooled",
      "pool": { "max": 64 }
    }
  ]
}

And the action implements ResettableAction:

import com.pipeline.core.ResettableAction;
import java.util.function.UnaryOperator;

public final class PooledScratchNormalizeAction implements UnaryOperator<String>, ResettableAction {
  @Override public String apply(String input) { /* normalize using internal scratch buffers */ return input; }
  @Override public void reset() { /* clear internal scratch state between borrows */ }
}

Programmatic pipelines: prefer PipelineProvider when you need singleton/pooled/per-run behavior:

Modes:

  • shared: one pipeline instance reused across runs (actions must be safe to share concurrently)
  • pooled: pipeline instances reused but never shared concurrently
  • perRun: a new pipeline instance is created per run

import com.pipeline.core.Pipeline;
import com.pipeline.core.PipelineProvider;

PipelineProvider<String> provider = PipelineProvider.pooled(
    () -> new Pipeline<String>("programmatic_pooled", true)
        .addAction("normalize", new PooledScratchNormalizeAction()),
    64
);

String out = provider.run("  hello   world  ").context();

If you want to reuse the same pipeline plan while ensuring stateful actions are never shared concurrently, enable pooled local actions. Any action that implements ResettableAction is pooled under a stable key (pipelineName + phase + index + label) and reset() is called after each invocation:

import com.pipeline.core.ActionPoolCache;
import com.pipeline.core.Pipeline;
import com.pipeline.core.PipelineProvider;

ActionPoolCache actionPoolCache = new ActionPoolCache(64);

PipelineProvider<String> provider = PipelineProvider.pooled(
    () -> new Pipeline<String>("programmatic_pooled_actions", true)
        .addAction("normalize", new PooledScratchNormalizeAction()),
    64
).withPooledLocalActions(actionPoolCache);

String out = provider.run("  hello   world  ").context();

If you prefer method references, you can decorate an action with an explicit reset hook:

import com.pipeline.core.Actions;
import com.pipeline.core.Pipeline;

Pipeline<String> pipeline = new Pipeline<String>("ref_style", true)
    .addAction("normalize",
        Actions.resettable(new PooledScratchNormalizeAction(), PooledScratchNormalizeAction::reset));

The Java loader (pipeline-config) is intentionally minimal and currently targets unary String pipelines:

import com.pipeline.config.PipelineJsonLoader;

try (var in = getClass().getResourceAsStream("/pipelines/json_clean_text.json")) {
  var pipeline = PipelineJsonLoader.loadUnary(in);
  System.out.println(pipeline.run("  Hello   World  ").context());
}

Placeholders (Identity)

For iterative development you can use an explicit placeholder action:

  • JSON: { "$local": "identity" } (built-in, no reflection)
  • Programmatic: pipeline.addAction("todo_normalize"); (adds an identity action with that label)

Remote action (HTTP)

Use pipeline-remote to turn an HTTP call into a StepAction<C>:

import com.pipeline.core.Pipeline;
import com.pipeline.remote.http.HttpStep;

record Ctx(String q, String body) {}

var spec = new HttpStep.RemoteSpec<Ctx>();
spec.endpoint = "https://httpbin.org/post";
spec.timeoutMillis = 800;
spec.retries = 1;
spec.toJson = ctx -> "{\"q\":\"" + ctx.q() + "\"}";
spec.fromJson = (ctx, body) -> new Ctx(ctx.q(), body);

var pipeline = new Pipeline<Ctx>("remote_demo", true).addAction(HttpStep.jsonPost(spec));
Ctx out = pipeline.run(new Ctx("hello", null)).context();

If you have many remote actions, use HttpStep.RemoteDefaults so you don’t repeat base URL, timeouts, retries, headers, and client wiring.

Runtime / imperative sessions

RuntimePipeline<T> is an imperative, single-threaded helper for REPL/tools:

import com.pipeline.core.RuntimePipeline;
import com.pipeline.examples.steps.TextSteps;

var runtimePipeline = new RuntimePipeline<>("adhoc_text", /*shortCircuitOnException=*/false, "  Hello   World  ");
runtimePipeline.addAction(TextSteps::strip);
runtimePipeline.addAction(TextSteps::normalizeWhitespace);
System.out.println(runtimePipeline.value());

Advanced: labels + jumps + inline JSON (pipeline-api)

pipeline-api provides a higher-level facade for polling/workflows (labels + Jumps.now/after) and JSON that can target @this / beans. See README-JUMPS.md and README-API-QUICKSTART.md.

Examples

Examples live in pipeline-examples and show:

  • Core Pipeline<C> (pre/actions/post, short-circuit, continue-on-error)
  • JSON loader (PipelineJsonLoader)
  • HTTP remote step (HttpStep)
  • Jump engine + metrics (pipeline-api)
  • Runtime sessions (RuntimePipeline<T>)

About

Functional Pipeline Framework for: Java, Python, Mojo, TypeScript, C++, C#, Go, and Rust
