65 changes: 62 additions & 3 deletions packages/pipeline/README.md
# Pipeline

A framework for transforming large RDF datasets, primarily using [SPARQL](https://www.w3.org/TR/sparql11-query/) queries, with TypeScript for the parts that are hard to express in SPARQL alone.

- **SPARQL-native.** Data transformations are plain SPARQL query files — portable, transparent, testable and version-controlled.
- **Composable.** `Executor` is an interface: wrap a SPARQL executor in custom TypeScript to handle edge cases such as date parsing or string normalisation (see [Executor](#executor)).
- **Extensible.** A plugin system lets packages like [@lde/pipeline-void](../pipeline-void) (or your own plugins) hook into the pipeline lifecycle.

## Components

### Executor

Generates RDF triples. The built-in `SparqlConstructExecutor` runs a SPARQL CONSTRUCT query with template substitution and variable bindings:

```typescript
const executor = new SparqlConstructExecutor({
  query: 'CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }',
});
```

`Executor` is an interface, so you can implement your own for logic that's hard to express in pure SPARQL — for example, cleaning up messy date notations or converting locale-specific dates to ISO 8601. The decorator pattern lets you wrap a SPARQL executor and post-process its quad stream in TypeScript:

```typescript
import { DataFactory } from 'n3';
import type { Quad } from '@rdfjs/types';
import type { Dataset, Distribution } from '@lde/dataset';
import {
  type Executor,
  type ExecuteOptions,
  NotSupported,
} from '@lde/pipeline';

class TransformExecutor implements Executor {
  constructor(
    private readonly inner: Executor,
    private readonly transform: (
      quads: AsyncIterable<Quad>,
      dataset: Dataset,
    ) => AsyncIterable<Quad>,
  ) {}

  async execute(
    dataset: Dataset,
    distribution: Distribution,
    options?: ExecuteOptions,
  ): Promise<AsyncIterable<Quad> | NotSupported> {
    const result = await this.inner.execute(dataset, distribution, options);
    // Pass through unchanged if the inner executor cannot handle this distribution.
    if (result instanceof NotSupported) return result;
    return this.transform(result, dataset);
  }
}
```

Then use it to wrap any SPARQL executor:

```typescript
new Stage({
  name: 'dates',
  executors: new TransformExecutor(
    await SparqlConstructExecutor.fromFile('dates.rq'),
    async function* (quads) {
      for await (const quad of quads) {
        if (quad.object.termType === 'Literal' && isMessyDate(quad.object)) {
          // Replace the messy literal with a clean xsd:date literal.
          const cleaned = DataFactory.literal(
            parseDutchDate(quad.object.value),
            DataFactory.namedNode('http://www.w3.org/2001/XMLSchema#date'),
          );
          // Preserve the graph term so named-graph data stays in its graph.
          yield DataFactory.quad(quad.subject, quad.predicate, cleaned, quad.graph);
        } else {
          yield quad;
        }
      }
    },
  ),
});
```

This keeps SPARQL doing the heavy lifting while TypeScript handles the edge cases. See [@lde/pipeline-void](../pipeline-void)'s `VocabularyExecutor` for a real-world example of this pattern.
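The `isMessyDate` and `parseDutchDate` helpers in the example above are hypothetical; the pipeline does not ship them. A minimal sketch, assuming Dutch-style `DD-MM-YYYY` input and written against plain string values (the stage example passes the literal itself, so there you would check and convert `literal.value`):

```typescript
// Hypothetical helpers for the example above; adapt to your own data.
// Matches Dutch-style day-month-year dates such as "31-12-2024".
const DUTCH_DATE = /^(\d{1,2})-(\d{1,2})-(\d{4})$/;

function isMessyDate(value: string): boolean {
  return DUTCH_DATE.test(value);
}

// Convert e.g. "31-12-2024" to the ISO 8601 form "2024-12-31".
function parseDutchDate(value: string): string {
  const match = DUTCH_DATE.exec(value);
  if (match === null) throw new Error(`Not a recognised date: ${value}`);
  const [, day, month, year] = match;
  return `${year}-${month.padStart(2, '0')}-${day.padStart(2, '0')}`;
}
```

In practice you may also want to check the literal's datatype, not just its lexical form, before rewriting it.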

### Writer

Writes generated quads to a destination: