154 changes: 148 additions & 6 deletions docs/modules/pipelines/pages/transforms.adoc
Expand Up @@ -4,14 +4,16 @@

{description}

[#_stateless_transforms]
== Stateless transforms

Stateless transforms do not have any notion of _state_, meaning that all items must be processed independently of any previous items.

These transforms reshape the input into the form required by later,
more complex transforms. Their key feature is that
they have no side effects and treat each item in isolation.

[#_map]
=== map

Mapping is the simplest kind of stateless transformation. It simply
Expand All @@ -23,6 +25,7 @@ stage.
StreamStage<String> names = stage.map(name -> name.toLowerCase());
----

[#_filter]
=== filter

Similar to `map`, the `filter` is a stateless operator that applies a
Expand All @@ -32,6 +35,7 @@ predicate to the input to decide whether to pass it to the output.
BatchStage<String> names = stage.filter(name -> !name.isEmpty());
```

[#_flatMap]
=== flatMap

`flatMap` is equivalent to `map`, with the difference that instead of
Expand Down Expand Up @@ -754,20 +758,39 @@ on each member, and then it merges the sorted streams into the final
output. The first step requires O(n) heap memory, but the second step
just receives the data in the correct order and has O(1) memory needs.
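
The merge step described above can be illustrated with a small plain-Java sketch. It is not Jet's implementation, and `SortedMergeSketch` and `merge` are hypothetical names used only here. It assumes every input is already sorted and shows why the merge needs to buffer only one head item per input.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration class; not part of the Jet API.
final class SortedMergeSketch {

    // The current head item of one sorted input, plus the rest of that input.
    private static final class Head<T> {
        final T value;
        final Iterator<T> rest;

        Head(T value, Iterator<T> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    // Merges inputs that are each already sorted according to `order`.
    // The queue never holds more than one item per input.
    static <T> List<T> merge(List<List<T>> sortedInputs, Comparator<? super T> order) {
        PriorityQueue<Head<T>> heads =
                new PriorityQueue<>((a, b) -> order.compare(a.value, b.value));
        for (List<T> input : sortedInputs) {
            Iterator<T> it = input.iterator();
            if (it.hasNext()) {
                heads.add(new Head<>(it.next(), it));
            }
        }
        // The result is collected in a list only to keep the sketch testable;
        // a streaming merge would emit each item downstream instead.
        List<T> out = new ArrayList<>();
        while (!heads.isEmpty()) {
            Head<T> smallest = heads.poll();
            out.add(smallest.value);
            if (smallest.rest.hasNext()) {
                heads.add(new Head<>(smallest.rest.next(), smallest.rest));
            }
        }
        return out;
    }
}
```

The buffered data grows with the number of inputs, not with the number of items, which matches the constant memory need stated above for a fixed number of members.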

[#_mapStateful]
=== mapStateful

`mapStateful` is an extension of the xref:transforms.adoc#_map[map]
transform. It adds the capability to optionally retain mutable state.

* The non-keyed `mapStateful` operation maintains a single shared state object per processor instance.
The state object is created using `createFn` and is passed to `mapFn` together with each input item.
The `mapFn` function can update the state and emit a single output item or return null if no output is needed.

* In the keyed `mapStateful`, Jet maintains a dedicated state object, created by the supplied `createFn`, for each grouping key.
Every incoming item is processed by `mapFn`, which receives the current state, the grouping key, and the item.
The function can update the state and emit a single output item (or null if no output is needed).
+
If a positive `ttl` is configured, Jet automatically evicts inactive state objects based on event time.
Each state tracks the highest timestamp observed for its key. As watermarks advance, Jet evicts states whose timestamps are older than `watermark - ttl`.
+
When a state is evicted due to TTL expiration, Jet invokes the optional `onEvictFn`,
which can emit a final result derived from the state (for example, a session summary or aggregated total).
+
In addition to TTL-based eviction, `mapStateful` supports explicit deletion of the state using `deleteStatePredicate`.
This predicate is evaluated after processing each input item. If it returns true, the state is removed immediately, without waiting for TTL expiration.
In this case, `onEvictFn` is not invoked.
+
Explicit deletion is useful when the logical lifecycle of a state ends before its TTL expires, such as when a session closes, a transaction completes, or a terminal event is observed.
This helps reduce memory usage and avoids retaining unnecessary state under high load.
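
The state lifecycle described above can be modelled in a few lines of plain Java. This is a simplified sketch, not Jet code: `KeyedStateModel`, `onItem`, `onWatermark`, and the `terminal` flag are hypothetical names, the state is just a per-key counter, and time is in arbitrary units. It assumes that explicit deletion produces no eviction output and that TTL eviction applies to states whose highest timestamp is older than `watermark - ttl`, as stated in the text.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of keyed state with TTL eviction and explicit deletion.
final class KeyedStateModel {

    // Per-key state: an item counter and the highest timestamp seen for the key.
    static final class State {
        int count;
        long lastTimestamp = Long.MIN_VALUE;
    }

    private final long ttl;
    private final Map<String, State> states = new HashMap<>();
    private final List<String> evictionOutput = new ArrayList<>();

    KeyedStateModel(long ttl) {
        this.ttl = ttl;
    }

    // Plays the role of mapFn followed by deleteStatePredicate.
    int onItem(String key, long timestamp, boolean terminal) {
        State s = states.computeIfAbsent(key, k -> new State()); // role of createFn
        s.count++;
        s.lastTimestamp = Math.max(s.lastTimestamp, timestamp);
        if (terminal) {
            // Explicit deletion: the state goes away at once, with no eviction output.
            states.remove(key);
        }
        return s.count;
    }

    // Evicts every state whose highest timestamp is older than watermark - ttl.
    void onWatermark(long watermark) {
        Iterator<Map.Entry<String, State>> it = states.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, State> e = it.next();
            if (e.getValue().lastTimestamp < watermark - ttl) {
                // Role of onEvictFn: emit a final result derived from the state.
                evictionOutput.add(e.getKey() + ":" + e.getValue().count);
                it.remove();
            }
        }
    }

    boolean hasState(String key) {
        return states.containsKey(key);
    }

    List<String> evictionOutput() {
        return evictionOutput;
    }
}
```

In this model, a key that receives a terminal item never appears in the eviction output, while a key that goes quiet is summarized once the watermark passes its last timestamp by more than the TTL.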

The major use case for stateful mapping is recognizing a pattern in the
event stream, such as matching start-transaction with end-transaction
events based on an event correlation ID. More generally, you can
implement any kind of state machine and detect patterns in an input of
any complexity.

As with other stateful operations, you can also use a `groupingKey` to
have a unique state per key.

For example, consider a pipeline that matches incoming
`TRANSACTION_START` events to `TRANSACTION_END` events, which can arrive
out of order, and, once both have been received, outputs how long the transaction
Expand Down Expand Up @@ -831,6 +854,125 @@ You will note that we also had to set an expiry time on the states
eventually run out of memory as we accumulate more and more
transactions.

We can improve the previous example by deleting the state earlier.
We still need TTL to avoid a memory leak if one of the events from the pair is never received,
but we can free memory much earlier in most cases with explicit deletion.

```java
p.readFrom(KafkaSources.kafka(.., "transaction-events"))
 .withNativeTimestamps(0)
 .groupingKey(event -> event.getTransactionId())
 .mapStateful(MINUTES.toMillis(10),
   () -> new TransactionEvent[2],
   (state, id, event) -> {
       if (event.type() == TRANSACTION_START) {
           state[0] = event;
       } else if (event.type() == TRANSACTION_END) {
           state[1] = event;
       }
       if (state[0] != null && state[1] != null) {
           // we have both start and end events
           long duration = state[1].timestamp() - state[0].timestamp();
           return MapUtil.entry(event.transactionId(), duration);
       }
       // we don't have both events, do nothing for now.
       return null;
   },
   // deleteStatePredicate: delete the state as soon as the pair is complete
   (state, id, event) -> {
       // we have both start and end events
       return state[0] != null && state[1] != null;
   },
   // onEvictFn: called only when the state expires through TTL
   (state, id, currentWatermark) ->
       // if we have not received both events after 10 minutes,
       // we will emit a timeout entry
       (state[0] == null || state[1] == null)
           ? MapUtil.entry(id, TIMED_OUT)
           : null
 ).writeTo(Sinks.logger());
```

=== flatMapStateful

`flatMapStateful` is equivalent to xref:transforms.adoc#_mapStateful[mapStateful], with the difference that instead of
one output item you can have an arbitrary number of output items per input item.

This example processes a stream of transaction events. Each transaction produces multiple events (for example, item updates or intermediate steps).
When a terminal `TRANSACTION_END` event is received, the accumulated state is emitted and immediately evicted using `deleteStatePredicate`,
without waiting for TTL-based eviction.

```java
StreamStage<TransactionEvent> events = null;

StreamStage<TransactionSummary> summaries = events
    .groupingKey(TransactionEvent::getTransactionId)
    .flatMapStateful(
        MINUTES.toMillis(10),
        TransactionState::new,
        (state, txId, event) -> {
            switch (event.getType()) {
                case ITEM:
                    state.items.add(event.getItem());
                    return Traversers.empty();

                case TRANSACTION_END:
                    return Traversers.singleton(
                        new TransactionSummary(txId, state.items)
                    );

                default:
                    return Traversers.empty();
            }
        },
        // delete state immediately after transaction end
        (state, txId, event) ->
            event.getType() == EventType.TRANSACTION_END,
        // TTL-based eviction (only used if TRANSACTION_END is never received)
        (state, txId, wm) ->
            Traversers.singleton(
                new TransactionSummary(txId, state.items)
            )
    );
```

Review comment (Contributor, on the `deleteStatePredicate` lambda): This example is less than ideal if events are allowed out of order and some may come after `TRANSACTION_END`.

Reply (Contributor Author): Of course, it's not ideal; it's just a synthetic example meant to demonstrate how to use this method, without overcomplicating it with additional details that aren't relevant to the purpose.

=== filterStateful

Similar to xref:transforms.adoc#_mapStateful[mapStateful], `filterStateful` is a stateful operator that applies a
predicate to the input to decide whether to pass it to the output.

This example processes a stream of user events and allows page view events only while the user session is active.

* A session becomes active on `LOGIN`.
* `PAGE_VIEW` events are allowed only if the session is active.
* On `LOGOUT`, the session state is immediately evicted using `deleteStatePredicate`.

```java
StreamStage<UserEvent> events = null;

StreamStage<UserEvent> filtered = events
    .groupingKey(UserEvent::getUserId)
    .filterStateful(
        HOURS.toMillis(1),
        () -> new boolean[1], // state[0] = sessionActive
        (state, event) -> {
            switch (event.getType()) {
                case LOGIN:
                    state[0] = true;
                    return false; // do not emit LOGIN

                case PAGE_VIEW:
                    return state[0]; // allow only if logged in

                case LOGOUT:
                    return false; // do not emit LOGOUT

                default:
                    return false;
            }
        },
        // delete state immediately on logout
        (state, userId, event) ->
            event.getType() == EventType.LOGOUT
    );
```

=== co-group / join

Co-grouping allows you to join any number of inputs on a common key, which