Composable stream primitives for eser stack. Built on the Web Streams API with a Span-based formatting model that makes output adapter-agnostic.
Source<T> → Layer<I, O> → Layer<I, O> → Sink<T>
(produce) (transform) (transform) (consume)
All data flows through Chunks — wrappers that carry data + metadata (timestamp, kind, channel, annotations).
Instead of baking ANSI codes into output strings, formatting is declared as a lightweight Span tree. Renderers serialize Spans per target:
import * as streams from "@eser/streams";
import * as span from "@eser/streams/span";
// Handler code — format-agnostic
ctx.out.writeln(span.bold("Recipe: "), span.cyan(recipe.name));
ctx.out.writeln(span.dim(`[${recipe.language}]`));
ctx.out.writeln(span.green("✓ Done"));Same Span tree, different output:
| Renderer | bold("hello") |
red("error") |
|---|---|---|
| ANSI (terminal) | \x1b[1mhello\x1b[22m |
\x1b[31merror\x1b[39m |
| Markdown (MCP/HTTP) | **hello** |
error |
| Plain (tests/logs) | hello |
error |
The output() function is the console.log replacement:
import * as streams from "@eser/streams";
// CLI: colored terminal output
const out = streams.output({
renderer: streams.renderers.ansi(),
sink: streams.sinks.stdout(),
});
// MCP: markdown for tool responses
const out = streams.output({
renderer: streams.renderers.markdown(),
sink: streams.sinks.buffer(),
});
// Test: plain text for assertions
const out = streams.output({
renderer: streams.renderers.plain(),
sink: streams.sinks.buffer(),
});
out.write("plain text");
out.writeln(span.bold("bold"), " and ", span.dim("dim"));
await out.flush();
await out.close();import * as span from "@eser/streams/span";
span.text("plain text"); // { kind: "text", value: "plain text" }
span.bold("important"); // { kind: "bold", children: [...] }
span.dim("subtle"); // { kind: "dim", children: [...] }
span.italic("emphasis"); // { kind: "italic", children: [...] }
span.underline("link"); // { kind: "underline", children: [...] }
span.strikethrough("removed"); // { kind: "strikethrough", children: [...] }
span.red("error"); // { kind: "color", color: "red", children: [...] }
span.green("success"); // { kind: "color", color: "green", children: [...] }
span.cyan("info"); // { kind: "color", color: "cyan", children: [...] }
span.yellow("warning"); // { kind: "color", color: "yellow", children: [...] }
span.nl(); // { kind: "newline" }span.table(
["Name", "Language", "Scale"],
[
["fp-pipe", "typescript", "utility"],
["go-service", "go", "project"],
],
);
span.codeBlock("const x = 42;", "typescript");
span.list([
["Install: ", span.dim("deno install")],
["Run: ", span.dim("deno task dev")],
]);Spans compose naturally:
span.bold("Recipe: ", span.cyan(recipe.name));
// { kind: "bold", children: [
// { kind: "text", value: "Recipe: " },
// { kind: "color", color: "cyan", children: [
// { kind: "text", value: "my-recipe" }
// ]}
// ]}The Renderer<T> interface is generic — built-in renderers return strings, but
external packages can implement renderers that return any type.
| Renderer | Import | Use Case |
|---|---|---|
ansi() |
@eser/streams/renderers |
Terminal (colored escape codes) |
markdown() |
@eser/streams/renderers |
MCP tool responses, HTTP APIs |
plain() |
@eser/streams/renderers |
Tests, log files |
import * as renderers from "@eser/streams/renderers";
const renderer = renderers.ansi();
const text = renderer.render([span.bold("hello"), span.text(" world")]);
// "\x1b[1mhello\x1b[22m world\x1b[0m"Any package can implement Renderer<T> to produce a different output type. For
example, @eser/laroux-react provides a React renderer:
import { reactRenderer } from "@eser/laroux-react";
const renderer = reactRenderer(); // Renderer<React.ReactElement>
const element = renderer.render([span.bold("hello")]);
// <strong>hello</strong>import type { Renderer } from "@eser/streams/renderers";
import type { Span } from "@eser/streams/span";
const myRenderer = (): Renderer<MyOutputType> => ({
name: "my-renderer",
render: (spans: readonly Span[]) => {
// Convert Span tree to your target format
},
});For stream processing pipelines:
import * as streams from "@eser/streams";
// Transform pipeline
const items = await streams.pipeline()
.from(streams.sources.values(1, 2, 3, 4, 5))
.through(streams.layers.filter((n) => n > 2))
.through(streams.layers.map((n) => n * 10))
.collect<number>();
// [30, 40, 50]| Sink | Purpose |
|---|---|
stdout() |
Write to process.stdout |
buffer() |
Collect in memory (.items(), .chunks()) |
null() |
Discard all output |
writable(stream) |
Wrap any WritableStream |
multiplex(...sinks) |
Fan-out to multiple sinks |
Handlers use ctx.out (an Output instance) injected via the Task context:
import * as task from "@eser/functions/task";
import * as span from "@eser/streams/span";
type HandlerContext = { readonly out: Output };
const myHandler = (input: Input): task.Task<Output, Error, HandlerContext> =>
task.task(async (ctx) => {
ctx.out.writeln(span.bold("Processing..."));
// ... business logic
ctx.out.writeln(span.green("✓ Done"));
return results.ok(output);
});The adapter provides the Output with the right renderer + sink:
// CLI adapter
const out = streams.output({
renderer: renderers.ansi(),
sink: sinks.stdout(),
});
await task.runTask(myHandler(input), { out });
// MCP adapter
const buf = sinks.buffer();
const out = streams.output({ renderer: renderers.markdown(), sink: buf });
await task.runTask(myHandler(input), { out });
const response = buf.items().join("");Apache-2.0