diff --git a/docs/modules/pipelines/pages/transforms.adoc b/docs/modules/pipelines/pages/transforms.adoc index 1cf920354..915b8ae9f 100644 --- a/docs/modules/pipelines/pages/transforms.adoc +++ b/docs/modules/pipelines/pages/transforms.adoc @@ -4,14 +4,16 @@ {description} +[#_stateless_transforms] == Stateless transforms -Stateless transforms do not have any notion of _state_, meaning that all items must be processed independently of any previous items. +Stateless transforms do not have any notion of _state_, meaning that all items must be processed independently of any previous items. These methods transform the input into the correct shape that is required by further, more complex ones. The key feature of these transforms is that they do not have side-effects and they treat each item in isolation. +[#_map] === map Mapping is the simplest kind of stateless transformation. It simply @@ -23,6 +25,7 @@ stage. StreamStage names = stage.map(name -> name.toLowerCase()); ---- +[#_filter] === filter Similar to `map`, the `filter` is a stateless operator that applies a @@ -32,6 +35,7 @@ predicate to the input to decide whether to pass it to the output. BatchStage names = stage.filter(name -> !name.isEmpty()); ``` +[#_flatMap] === flatMap `flatMap` is equivalent to `map`, with the difference that instead of @@ -754,20 +758,39 @@ on each member, and then it merges the sorted streams into the final output. The first step requires O(n) heap memory, but the second step just receives the data in the correct order and has O(1) memory needs. +[#_mapStateful] === mapStateful -mapStateful is an extension of the simple xref:transforms.adoc#stateless-transforms#map[map] +`mapStateful` is an extension of the xref:transforms.adoc#_map[map] transform. It adds the capability to optionally retain mutable state. -The major use case of stateful mapping is recognizing a pattern in the +* `The non-keyed `mapStateful` operation maintains a single shared state object per processor instance. +The state object is created using `createFn` and is passed to `mapFn` together with each input item. +The `mapFn` function can update the state and emit a single output item or return null if no output is needed. + +* In the keyed `mapStateful` Jet maintains a dedicated state object created by the supplied `createFn` for each grouping key. +Every incoming item is processed by `mapFn`, which receives the current state, the grouping key, and the item. +The function can update the state and emit a single output item (or null if no output is needed). ++ +If a positive `ttl` is configured, Jet automatically evicts inactive state objects based on event time. +Each state tracks the highest timestamp observed for its key. As watermarks advance, Jet evicts states whose timestamps are older than `watermark - ttl`. ++ +When a state is evicted due to TTL expiration, Jet invokes the optional `onEvictFn`, +which can emit a final result derived from the state (for example, a session summary or aggregated total). ++ +In addition to TTL-based eviction, `mapStateful` supports explicit deletion of the state using `deleteStatePredicate`. +This predicate is evaluated after processing each input item. If it returns true, the state is removed immediately, without waiting for TTL expiration. +In this case, `onEvictFn` is not invoked. ++ +Explicit deletion is useful when the logical lifecycle of a state ends before its TTL expires, such as when a session closes, a transaction completes, or a terminal event is observed. +This helps reduce memory usage and avoids retaining unnecessary state under high load. + +The major use case for stateful mapping is recognizing a pattern in the event stream, such as matching start-transaction with end-transaction events based on an event correlation ID. More generally, you can implement any kind of state machine and detect patterns in an input of any complexity. -As with other stateful operations, you can also use a `groupingKey` to -have a unique state per key. - For example, consider a pipeline that matches incoming `TRANSACTION_START` events to `TRANSACTION_END` events which can arrive unordered and when both are received outputs how long the transaction @@ -831,6 +854,125 @@ You will note that we also had to set an expiry time on the states eventually run out of memory as we accumulate more and more transactions. +We can improve the previous example by deleting the state earlier. +We still need TTL to avoid a memory leak if one of the events from the pair is never received, +but we can free memory much earlier in most cases with explicit deletion. + +```java +p.readFrom(KafkaSources.kafka(.., "transaction-events")) + .withNativeTimestamps(0) + .groupingKey(event -> event.getTransactionId()) + .mapStateful(MINUTES.toMillis(10), + () -> new TransactionEvent[2], + (state, id, event) -> { + if (event.type() == TRANSACTION_START) { + state[0] = event; + } else if (event.type() == TRANSACTION_END) { + state[1] = event; + } + if (state[0] != null && state[1] != null) { + // we have both start and end events + long duration = state[1].timestamp() - state[0].timestamp(); + return MapUtil.entry(event.transactionId(), duration); + } + // we don't have both events, do nothing for now. + return null; + }, + (state, id, event)-> { + // we have both start and end events + return state[0] != null && state[1] != null; + } + (state, id, currentWatermark) -> + // if we have not received both events after 10 minutes, + // we will emit a timeout entry + (state[0] == null || state[1] == null) + ? MapUtil.entry(id, TIMED_OUT) + : null + ).writeTo(Sinks.logger()); +``` + +=== flatMapStateful + +`flatMapStateful` is equivalent to xref:transforms.adoc#_mapStateful[mapStateful], with the difference that instead of +one output item you can have an arbitrary number of output items per input item. + +This example processes a stream of transaction events. Each transaction produces multiple events (for example, item updates or intermediate steps). +When a terminal `TRANSACTION_END` event is received, the accumulated state is emitted and immediately evicted using `deleteStatePredicate`, +without waiting for TTL-based eviction. + +```java +StreamStage events = null; + +StreamStage summaries = events + .groupingKey(TransactionEvent::getTransactionId) + .flatMapStateful( + MINUTES.toMillis(10), + TransactionState::new, + (state, txId, event) -> { + switch (event.getType()) { + case ITEM: + state.items.add(event.getItem()); + return Traversers.empty(); + + case TRANSACTION_END: + return Traversers.singleton( + new TransactionSummary(txId, state.items) + ); + + default: + return Traversers.empty(); + } + }, + // delete state immediately after transaction end + (state, txId, event) -> + event.getType() == EventType.TRANSACTION_END, + // TTL-based eviction (only used if TRANSACTION_END is never received) + (state, txId, wm) -> + Traversers.singleton( + new TransactionSummary(txId, state.items) + ) + ); +``` +=== filterStateful + +Similar to xref:transforms.adoc#_mapStateful[mapStateful], `filterStateful` is a stateful operator that applies a +predicate to the input to decide whether to pass it to the output. + +This example processes a stream of user events and allows page view events only while the user session is active. + +* A session becomes active on `LOGIN`. +* `PAGE_VIEW` events are allowed only if the session is active. +* On `LOGOUT`, the session state is immediately evicted using `deleteStatePredicate`. + +```java +StreamStage events = null; + +StreamStage filtered = events + .groupingKey(UserEvent::getUserId) + .filterStateful( + HOURS.toMillis(1), + () -> new boolean[1], // state[0] = sessionActive + (state, event) -> { + switch (event.getType()) { + case LOGIN: + state[0] = true; + return false; // do not emit LOGIN + + case PAGE_VIEW: + return state[0]; // allow only if logged in + + case LOGOUT: + return false; // do not emit LOGOUT + + default: + return false; + } + }, + // delete state immediately on logout + (state, userId, event) -> + event.getType() == EventType.LOGOUT + ); +``` === co-group / join Co-grouping allows to join any number of inputs on a common key, which