From e8d837a0366f49e188c57622057d1fb915cb709e Mon Sep 17 00:00:00 2001 From: oshultseva Date: Mon, 15 Dec 2025 12:17:52 +0000 Subject: [PATCH 01/11] stateful docs --- docs/modules/pipelines/pages/transforms.adoc | 186 ++++++++++++++++++- 1 file changed, 182 insertions(+), 4 deletions(-) diff --git a/docs/modules/pipelines/pages/transforms.adoc b/docs/modules/pipelines/pages/transforms.adoc index ea1404a6f..35418c45d 100644 --- a/docs/modules/pipelines/pages/transforms.adoc +++ b/docs/modules/pipelines/pages/transforms.adoc @@ -4,17 +4,19 @@ {description} +[#_stateless_transforms] == Stateless transforms -Stateless transforms do not have any notion of _state_, meaning that all items must be processed independently of any previous items. +Stateless transforms do not have any notion of _state_, meaning that all items must be processed independently of any previous items. These methods transform the input into the correct shape that is required by further, -more complex ones. The key feature of these transforms is that +more complex ones.The key feature of these transforms is that they do not have side-effects and they treat each item in isolation. +[#_map] === map -Mapping is the simplest kind of stateless transformation. It simply +Mapping is the simplest kind of stateless transformation.It simply applies a function to the input item, and passes the output to the next stage. @@ -23,6 +25,7 @@ stage. StreamStage names = stage.map(name -> name.toLowerCase()); ---- +[#_filter] === filter Similar to `map`, the `filter` is a stateless operator that applies a @@ -32,6 +35,7 @@ predicate to the input to decide whether to pass it to the output. BatchStage names = stage.filter(name -> !name.isEmpty()); ``` +[#_flatMap] === flatMap `flatMap` is equivalent to `map`, with the difference that instead of @@ -737,9 +741,30 @@ just receives the data in the correct order and has O(1) memory needs. === mapStateful -mapStateful is an extension of the simple xref:transforms.adoc#stateless-transforms#map[map] +`mapStateful` is an extension of the xref:transforms.adoc#_map[map] transform. It adds the capability to optionally retain mutable state. +The non-keyed `mapStateful` operation attaches a stateful mapping stage to a pipeline that maintains a single shared state object per processor instance. +The state object is created using `createFn` and is passed to `mapFn` together with each input item. +The function can update the state and emit a single output item (or null if no output is needed). + +For each grouping key, Jet maintains a dedicated state object created by the supplied `createFn`. +Every incoming item is processed by `mapFn`, which receives the current state, the grouping key, and the item. +The function can update the state and emit a single output item (or null if no output is needed). + +If a positive `ttl` is configured, Jet automatically evicts inactive state objects based on event time. +Each state tracks the highest timestamp observed for its key.As watermarks advance, Jet evicts states whose timestamps are older than watermark - ttl. + +When a state is evicted due to TTL expiration, Jet invokes the optional `onEvictFn`, +which can emit a final result derived from the state (for example, a session summary or aggregated total). + +In addition to TTL-based eviction, `mapStateful` supports explicit early eviction using `forceEvictPredicate`. +This predicate is evaluated after processing each input item. If it returns true, the state is removed immediately, without waiting for TTL expiration. +In this case, `onEvictFn` is not invoked. + +Early eviction is useful when the logical lifecycle of a state ends before its TTL expires, such as when a session closes, a transaction completes, or a terminal event is observed. +This helps reduce memory usage and avoids retaining unnecessary state under high load. + The major use case of stateful mapping is recognizing a pattern in the event stream, such as matching start-transaction with end-transaction events based on an event correlation ID. More generally, you can @@ -812,6 +837,159 @@ You will note that we also had to set an expiry time on the states eventually run out of memory as we accumulate more and more transactions. +Here is an example showing how to improve the previous example by evicting the state earlier: +```java +p.readFrom(KafkaSources.kafka(.., "transaction-events")) + .withNativeTimestamps(0) + .groupingKey(event -> event.getTransactionId()) + .mapStateful(MINUTES.toMillis(10), + () -> new TransactionEvent[2], + (state, id, event) -> { + if (event.type() == TRANSACTION_START) { + state[0] = event; + } else if (event.type() == TRANSACTION_END) { + state[1] = event; + } + if (state[0] != null && state[1] != null) { + // we have both start and end events + long duration = state[1].timestamp() - state[0].timestamp(); + return MapUtil.entry(event.transactionId(), duration); + } + // we don't have both events, do nothing for now. + return null; + }, + (state, id, event)-> { + // End of transaction received; this state is no longer needed + return state[1] != null; + } + (state, id, currentWatermark) -> + // if we have not received both events after 10 minutes, + // we will emit a timeout entry + (state[0] == null || state[1] == null) + ? MapUtil.entry(id, TIMED_OUT) + : null + ).writeTo(Sinks.logger()); +``` + +=== flatMapStateful + +`flatMapStateful` is an extension of xref:transforms.adoc#_flatMap[flatMap] +transform. It adds the capability to optionally retain mutable state. + +The non-keyed `flatMapStateful` operation attaches a stateful flat-mapping stage to a pipeline that maintains a single shared state object per processor instance. +The state object is created using `createFn` and is passed to `flatMapFn` together with each input item. +The function can update the state and emit zero, one, or multiple output items using a `Traverser`. + +For each grouping key, Jet maintains a separate state object created by `createFn`. +For every incoming item, Jet invokes `flatMapFn` with the current state, the grouping key, and the item. +The function can update the state and emit zero, one, or multiple output items via a Traverser. +The state object is included in job snapshots and therefore survives job restarts; for this reason, it must be serializable. + +If a positive `ttl` is configured, Jet automatically evicts inactive state objects based on event time. +Each state object tracks the highest timestamp observed for its key. +When the watermark advances, Jet evicts all states whose timestamps are older than watermark - ttl. +Just before eviction, Jet invokes `onEvictFn`, which can emit final output items derived from the state (for example, summary or punctuation records). + +In addition to TTL-based eviction, `flatMapStateful` supports explicit early eviction using `forceEvictPredicate`. +This predicate is evaluated after each input item is processed. If it returns true, the state is removed immediately, +without waiting for watermark progression, and `onEvictFn` is not invoked for that state. +This is useful in scenarios where the logical lifecycle of the state ends earlier than its TTL, such as when a transaction completes, a session closes, or a terminal event is received. + +Early eviction helps reduce memory usage and avoids retaining large or no-longer-needed state until TTL-based cleanup occurs. + +This example processes a stream of transaction events. Each transaction produces multiple events (for example, item updates or intermediate steps). +When a terminal `TRANSACTION_END` event is received, the accumulated state is emitted and immediately evicted using `forceEvictPredicate`, +without waiting for TTL-based eviction. + +```java +StreamStage events = null; + +StreamStage summaries = events + .groupingKey(TransactionEvent::getTransactionId) + .flatMapStateful( + MINUTES.toMillis(10), + TransactionState::new, + (state, txId, event) -> { + switch (event.getType()) { + case ITEM: + state.items.add(event.getItem()); + return Traversers.empty(); + + case TRANSACTION_END: + return Traversers.singleton( + new TransactionSummary(txId, state.items) + ); + + default: + return Traversers.empty(); + } + }, + // Force eviction immediately after transaction end + (state, txId, event) -> + event.getType() == EventType.TRANSACTION_END, + // TTL-based eviction (only used if TRANSACTION_END is never received) + (state, txId, wm) -> + Traversers.singleton( + new TransactionSummary(txId, state.items) + ) + ); +``` +=== filterStateful +`filterStateful` is an extension of xref:transforms.adoc#_filter[filter] +transform. It adds the capability to optionally retain mutable state. + +The non-keyed `filterStateful` operation attaches a stateful filtering stage to a pipeline that maintains a single shared state object per processor instance. +The state object is created using `createFn` and is passed to `filterFn` together with each input item. +The function can update the state and returns a boolean value that determines whether the item should pass downstream. + +For each grouping key, Jet maintains a separate, user-defined state object created by `createFn`. +For every incoming item, Jet invokes `filterFn` with the current state and the item, +allowing the state to be updated and deciding whether the item should pass downstream. + +If a positive `ttl` is configured, the state is considered stale once its `time-to-live` expires. +Each state object tracks the maximum event timestamp observed for its key. +When processing a new event, Jet compares this timestamp with the current watermark and evicts the state +if it is older than watermark - ttl. + +In addition to TTL-based eviction, `filterStateful` supports explicit early eviction via `forceEvictPredicate`. +This predicate is evaluated after each item is processed and allows the state to be removed immediately +when it is no longer needed, for example after a logout or transaction completion event. +Early eviction helps reduce memory usage and avoids retaining state unnecessarily until the watermark advances far enough to trigger TTL-based cleanup. + +This example processes a stream of user events and allows page view events only while the user session is active. +• A session becomes active on LOGIN +• PAGE_VIEW events are allowed only if the session is active +• On LOGOUT, the session state is immediately evicted using forceEvictPredicate + +```java +StreamStage events = null; + +StreamStage filtered = events + .groupingKey(UserEvent::getUserId) + .filterStateful( + HOURS.toMillis(1), + () -> new boolean[1], // state[0] = sessionActive + (state, event) -> { + switch (event.getType()) { + case LOGIN: + state[0] = true; + return false; // do not emit LOGIN + + case PAGE_VIEW: + return state[0]; // allow only if logged in + + case LOGOUT: + return false; // do not emit LOGOUT + + default: + return false; + } + }, + // Force eviction immediately on logout + (state, userId, event) -> + event.getType() == EventType.LOGOUT + ); +``` === co-group / join Co-grouping allows to join any number of inputs on a common key, which From 1b97a2f524bd585d3d9f1366d87b91b986424cf8 Mon Sep 17 00:00:00 2001 From: Olga Shultseva <150694869+shultseva@users.noreply.github.com> Date: Thu, 18 Dec 2025 14:05:32 +0000 Subject: [PATCH 02/11] Update docs/modules/pipelines/pages/transforms.adoc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Krzysztof Jamróz <79092062+k-jamroz@users.noreply.github.com> --- docs/modules/pipelines/pages/transforms.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/pipelines/pages/transforms.adoc b/docs/modules/pipelines/pages/transforms.adoc index 5485d1d37..30daa37d9 100644 --- a/docs/modules/pipelines/pages/transforms.adoc +++ b/docs/modules/pipelines/pages/transforms.adoc @@ -748,7 +748,7 @@ just receives the data in the correct order and has O(1) memory needs. `mapStateful` is an extension of the xref:transforms.adoc#_map[map] transform. It adds the capability to optionally retain mutable state. -The non-keyed `mapStateful` operation attaches a stateful mapping stage to a pipeline that maintains a single shared state object per processor instance. +The non-keyed `mapStateful` operation maintains a single shared state object per processor instance. The state object is created using `createFn` and is passed to `mapFn` together with each input item. The function can update the state and emit a single output item (or null if no output is needed). From 20b7a2fb2bcccedd8fc49d6d0e84abeceb413db1 Mon Sep 17 00:00:00 2001 From: Olga Shultseva <150694869+shultseva@users.noreply.github.com> Date: Thu, 18 Dec 2025 14:06:12 +0000 Subject: [PATCH 03/11] Update docs/modules/pipelines/pages/transforms.adoc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Krzysztof Jamróz <79092062+k-jamroz@users.noreply.github.com> --- docs/modules/pipelines/pages/transforms.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/pipelines/pages/transforms.adoc b/docs/modules/pipelines/pages/transforms.adoc index 30daa37d9..4ef442442 100644 --- a/docs/modules/pipelines/pages/transforms.adoc +++ b/docs/modules/pipelines/pages/transforms.adoc @@ -750,7 +750,7 @@ transform. It adds the capability to optionally retain mutable state. The non-keyed `mapStateful` operation maintains a single shared state object per processor instance. The state object is created using `createFn` and is passed to `mapFn` together with each input item. -The function can update the state and emit a single output item (or null if no output is needed). +The `mapFn` function can update the state and emit a single output item or return null if no output is needed. For each grouping key, Jet maintains a dedicated state object created by the supplied `createFn`. Every incoming item is processed by `mapFn`, which receives the current state, the grouping key, and the item. From cf3aec5bc1ce016893ac5c08fa2c075f9e27cfad Mon Sep 17 00:00:00 2001 From: Olga Shultseva <150694869+shultseva@users.noreply.github.com> Date: Thu, 18 Dec 2025 14:06:27 +0000 Subject: [PATCH 04/11] Update docs/modules/pipelines/pages/transforms.adoc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Krzysztof Jamróz <79092062+k-jamroz@users.noreply.github.com> --- docs/modules/pipelines/pages/transforms.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/pipelines/pages/transforms.adoc b/docs/modules/pipelines/pages/transforms.adoc index 4ef442442..f511e63d0 100644 --- a/docs/modules/pipelines/pages/transforms.adoc +++ b/docs/modules/pipelines/pages/transforms.adoc @@ -757,7 +757,7 @@ Every incoming item is processed by `mapFn`, which receives the current state, t The function can update the state and emit a single output item (or null if no output is needed). If a positive `ttl` is configured, Jet automatically evicts inactive state objects based on event time. -Each state tracks the highest timestamp observed for its key.As watermarks advance, Jet evicts states whose timestamps are older than watermark - ttl. +Each state tracks the highest timestamp observed for its key. As watermarks advance, Jet evicts states whose timestamps are older than `watermark - ttl`. When a state is evicted due to TTL expiration, Jet invokes the optional `onEvictFn`, which can emit a final result derived from the state (for example, a session summary or aggregated total). From a831a71271a3b8e9e768bf24afe4b13604f5db63 Mon Sep 17 00:00:00 2001 From: Olga Shultseva <150694869+shultseva@users.noreply.github.com> Date: Thu, 18 Dec 2025 14:07:04 +0000 Subject: [PATCH 05/11] Update docs/modules/pipelines/pages/transforms.adoc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Krzysztof Jamróz <79092062+k-jamroz@users.noreply.github.com> --- docs/modules/pipelines/pages/transforms.adoc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/modules/pipelines/pages/transforms.adoc b/docs/modules/pipelines/pages/transforms.adoc index f511e63d0..cdc62badd 100644 --- a/docs/modules/pipelines/pages/transforms.adoc +++ b/docs/modules/pipelines/pages/transforms.adoc @@ -841,7 +841,10 @@ You will note that we also had to set an expiry time on the states eventually run out of memory as we accumulate more and more transactions. -Here is an example showing how to improve the previous example by evicting the state earlier: +Here is an example showing how to improve the previous example by evicting the state earlier. +We still need TTL to avoid memory leak if one of the events from pair is never received, +but thanks to forced eviction we can free memory much earlier in most cases. + ```java p.readFrom(KafkaSources.kafka(.., "transaction-events")) .withNativeTimestamps(0) From c91b1262cce61bc0e832e151b13a35cc15fb060c Mon Sep 17 00:00:00 2001 From: oshultseva Date: Thu, 18 Dec 2025 14:25:56 +0000 Subject: [PATCH 06/11] stateful docs --- docs/modules/pipelines/pages/transforms.adoc | 65 +++++--------------- 1 file changed, 14 insertions(+), 51 deletions(-) diff --git a/docs/modules/pipelines/pages/transforms.adoc b/docs/modules/pipelines/pages/transforms.adoc index cdc62badd..fd4c16b46 100644 --- a/docs/modules/pipelines/pages/transforms.adoc +++ b/docs/modules/pipelines/pages/transforms.adoc @@ -743,6 +743,7 @@ on each member, and then it merges the sorted streams into the final output. The first step requires O(n) heap memory, but the second step just receives the data in the correct order and has O(1) memory needs. +[#_mapStateful] === mapStateful `mapStateful` is an extension of the xref:transforms.adoc#_map[map] @@ -762,11 +763,11 @@ Each state tracks the highest timestamp observed for its key. As watermarks adva When a state is evicted due to TTL expiration, Jet invokes the optional `onEvictFn`, which can emit a final result derived from the state (for example, a session summary or aggregated total). -In addition to TTL-based eviction, `mapStateful` supports explicit early eviction using `forceEvictPredicate`. +In addition to TTL-based eviction, `mapStateful` supports explicit deletion of the state using `deleteStatePredicate`. This predicate is evaluated after processing each input item. If it returns true, the state is removed immediately, without waiting for TTL expiration. In this case, `onEvictFn` is not invoked. -Early eviction is useful when the logical lifecycle of a state ends before its TTL expires, such as when a session closes, a transaction completes, or a terminal event is observed. +Explicit deletion is useful when the logical lifecycle of a state ends before its TTL expires, such as when a session closes, a transaction completes, or a terminal event is observed. This helps reduce memory usage and avoids retaining unnecessary state under high load. The major use case of stateful mapping is recognizing a pattern in the @@ -841,9 +842,9 @@ You will note that we also had to set an expiry time on the states eventually run out of memory as we accumulate more and more transactions. -Here is an example showing how to improve the previous example by evicting the state earlier. +Here is an example showing how to improve the previous example by deletion the state earlier. We still need TTL to avoid memory leak if one of the events from pair is never received, -but thanks to forced eviction we can free memory much earlier in most cases. +but thanks to explicit deletion we can free memory much earlier in most cases. ```java p.readFrom(KafkaSources.kafka(.., "transaction-events")) @@ -880,32 +881,12 @@ p.readFrom(KafkaSources.kafka(.., "transaction-events")) === flatMapStateful -`flatMapStateful` is an extension of xref:transforms.adoc#_flatMap[flatMap] -transform. It adds the capability to optionally retain mutable state. - -The non-keyed `flatMapStateful` operation attaches a stateful flat-mapping stage to a pipeline that maintains a single shared state object per processor instance. -The state object is created using `createFn` and is passed to `flatMapFn` together with each input item. -The function can update the state and emit zero, one, or multiple output items using a `Traverser`. - -For each grouping key, Jet maintains a separate state object created by `createFn`. -For every incoming item, Jet invokes `flatMapFn` with the current state, the grouping key, and the item. -The function can update the state and emit zero, one, or multiple output items via a Traverser. -The state object is included in job snapshots and therefore survives job restarts; for this reason, it must be serializable. - -If a positive `ttl` is configured, Jet automatically evicts inactive state objects based on event time. -Each state object tracks the highest timestamp observed for its key. -When the watermark advances, Jet evicts all states whose timestamps are older than watermark - ttl. -Just before eviction, Jet invokes `onEvictFn`, which can emit final output items derived from the state (for example, summary or punctuation records). - -In addition to TTL-based eviction, `flatMapStateful` supports explicit early eviction using `forceEvictPredicate`. -This predicate is evaluated after each input item is processed. If it returns true, the state is removed immediately, -without waiting for watermark progression, and `onEvictFn` is not invoked for that state. -This is useful in scenarios where the logical lifecycle of the state ends earlier than its TTL, such as when a transaction completes, a session closes, or a terminal event is received. - -Early eviction helps reduce memory usage and avoids retaining large or no-longer-needed state until TTL-based cleanup occurs. +`flatMapStateful` is equivalent to xref:transforms.adoc#_mapStateful[mapStateful], with the difference that instead of +one output item you can have arbitrary number of output items per input +item. This example processes a stream of transaction events. Each transaction produces multiple events (for example, item updates or intermediate steps). -When a terminal `TRANSACTION_END` event is received, the accumulated state is emitted and immediately evicted using `forceEvictPredicate`, +When a terminal `TRANSACTION_END` event is received, the accumulated state is emitted and immediately evicted using `deleteStatePredicate`, without waiting for TTL-based eviction. ```java @@ -931,7 +912,7 @@ StreamStage summaries = events return Traversers.empty(); } }, - // Force eviction immediately after transaction end + // delete state immediately after transaction end (state, txId, event) -> event.getType() == EventType.TRANSACTION_END, // TTL-based eviction (only used if TRANSACTION_END is never received) @@ -942,31 +923,13 @@ StreamStage summaries = events ); ``` === filterStateful -`filterStateful` is an extension of xref:transforms.adoc#_filter[filter] -transform. It adds the capability to optionally retain mutable state. - -The non-keyed `filterStateful` operation attaches a stateful filtering stage to a pipeline that maintains a single shared state object per processor instance. -The state object is created using `createFn` and is passed to `filterFn` together with each input item. -The function can update the state and returns a boolean value that determines whether the item should pass downstream. - -For each grouping key, Jet maintains a separate, user-defined state object created by `createFn`. -For every incoming item, Jet invokes `filterFn` with the current state and the item, -allowing the state to be updated and deciding whether the item should pass downstream. - -If a positive `ttl` is configured, the state is considered stale once its `time-to-live` expires. -Each state object tracks the maximum event timestamp observed for its key. -When processing a new event, Jet compares this timestamp with the current watermark and evicts the state -if it is older than watermark - ttl. - -In addition to TTL-based eviction, `filterStateful` supports explicit early eviction via `forceEvictPredicate`. -This predicate is evaluated after each item is processed and allows the state to be removed immediately -when it is no longer needed, for example after a logout or transaction completion event. -Early eviction helps reduce memory usage and avoids retaining state unnecessarily until the watermark advances far enough to trigger TTL-based cleanup. +Similar to xref:transforms.adoc#_mapStateful[mapStateful], the `filterStateful` is a stateful operator that applies a +predicate to the input to decide whether to pass it to the output. This example processes a stream of user events and allows page view events only while the user session is active. • A session becomes active on LOGIN • PAGE_VIEW events are allowed only if the session is active -• On LOGOUT, the session state is immediately evicted using forceEvictPredicate +• On LOGOUT, the session state is immediately evicted using `deleteStatePredicate` ```java StreamStage events = null; @@ -992,7 +955,7 @@ StreamStage filtered = events return false; } }, - // Force eviction immediately on logout + // delete state immediately on logout (state, userId, event) -> event.getType() == EventType.LOGOUT ); From 97fc6805dfb123118cecb096cfd31218b838638e Mon Sep 17 00:00:00 2001 From: Rob Swain Date: Fri, 19 Dec 2025 16:49:02 +0000 Subject: [PATCH 07/11] copy edit --- docs/modules/pipelines/pages/transforms.adoc | 25 ++++++++++---------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/docs/modules/pipelines/pages/transforms.adoc b/docs/modules/pipelines/pages/transforms.adoc index 433d174ad..1b1d9fd81 100644 --- a/docs/modules/pipelines/pages/transforms.adoc +++ b/docs/modules/pipelines/pages/transforms.adoc @@ -10,13 +10,13 @@ Stateless transforms do not have any notion of _state_, meaning that all items must be processed independently of any previous items. These methods transform the input into the correct shape that is required by further, -more complex ones.The key feature of these transforms is that +more complex ones. The key feature of these transforms is that they do not have side-effects and they treat each item in isolation. [#_map] === map -Mapping is the simplest kind of stateless transformation.It simply +Mapping is the simplest kind of stateless transformation. It simply applies a function to the input item, and passes the output to the next stage. @@ -785,7 +785,7 @@ In this case, `onEvictFn` is not invoked. Explicit deletion is useful when the logical lifecycle of a state ends before its TTL expires, such as when a session closes, a transaction completes, or a terminal event is observed. This helps reduce memory usage and avoids retaining unnecessary state under high load. -The major use case of stateful mapping is recognizing a pattern in the +The major use case for stateful mapping is recognizing a pattern in the event stream, such as matching start-transaction with end-transaction events based on an event correlation ID. More generally, you can implement any kind of state machine and detect patterns in an input of @@ -857,9 +857,9 @@ You will note that we also had to set an expiry time on the states eventually run out of memory as we accumulate more and more transactions. -Here is an example showing how to improve the previous example by deletion the state earlier. -We still need TTL to avoid memory leak if one of the events from pair is never received, -but thanks to explicit deletion we can free memory much earlier in most cases. +We can improve the previous example by deleting the state earlier. +We still need TTL to avoid a memory leak if one of the events from the pair is never received, +but we can free memory much earlier in most cases with explicit deletion. ```java p.readFrom(KafkaSources.kafka(.., "transaction-events")) @@ -897,8 +897,7 @@ p.readFrom(KafkaSources.kafka(.., "transaction-events")) === flatMapStateful `flatMapStateful` is equivalent to xref:transforms.adoc#_mapStateful[mapStateful], with the difference that instead of -one output item you can have arbitrary number of output items per input -item. +one output item you can have an arbitrary number of output items per input item. This example processes a stream of transaction events. Each transaction produces multiple events (for example, item updates or intermediate steps). When a terminal `TRANSACTION_END` event is received, the accumulated state is emitted and immediately evicted using `deleteStatePredicate`, @@ -938,13 +937,15 @@ StreamStage summaries = events ); ``` === filterStateful -Similar to xref:transforms.adoc#_mapStateful[mapStateful], the `filterStateful` is a stateful operator that applies a + +Similar to xref:transforms.adoc#_mapStateful[mapStateful], `filterStateful` is a stateful operator that applies a predicate to the input to decide whether to pass it to the output. This example processes a stream of user events and allows page view events only while the user session is active. -• A session becomes active on LOGIN -• PAGE_VIEW events are allowed only if the session is active -• On LOGOUT, the session state is immediately evicted using `deleteStatePredicate` + +* A session becomes active on `LOGIN`. +* `PAGE_VIEW` events are allowed only if the session is active. +* On `LOGOUT`, the session state is immediately evicted using `deleteStatePredicate`. ```java StreamStage events = null; From 273a45ce45d643cd0298afa62e26e203a894b128 Mon Sep 17 00:00:00 2001 From: Olga Shultseva <150694869+shultseva@users.noreply.github.com> Date: Tue, 23 Dec 2025 10:45:09 +0000 Subject: [PATCH 08/11] Update docs/modules/pipelines/pages/transforms.adoc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Krzysztof Jamróz <79092062+k-jamroz@users.noreply.github.com> --- docs/modules/pipelines/pages/transforms.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/pipelines/pages/transforms.adoc b/docs/modules/pipelines/pages/transforms.adoc index 1b1d9fd81..3ef28a164 100644 --- a/docs/modules/pipelines/pages/transforms.adoc +++ b/docs/modules/pipelines/pages/transforms.adoc @@ -768,7 +768,7 @@ The non-keyed `mapStateful` operation maintains a single shared state object per The state object is created using `createFn` and is passed to `mapFn` together with each input item. The `mapFn` function can update the state and emit a single output item or return null if no output is needed. -For each grouping key, Jet maintains a dedicated state object created by the supplied `createFn`. +In the keyed `mapStateful` Jet maintains a dedicated state object created by the supplied `createFn` for each grouping key. Every incoming item is processed by `mapFn`, which receives the current state, the grouping key, and the item. The function can update the state and emit a single output item (or null if no output is needed). From 452c284084e4a48762cf6b45ce7e49d728a0281e Mon Sep 17 00:00:00 2001 From: Olga Shultseva <150694869+shultseva@users.noreply.github.com> Date: Tue, 23 Dec 2025 10:45:21 +0000 Subject: [PATCH 09/11] Update docs/modules/pipelines/pages/transforms.adoc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Krzysztof Jamróz <79092062+k-jamroz@users.noreply.github.com> --- docs/modules/pipelines/pages/transforms.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/pipelines/pages/transforms.adoc b/docs/modules/pipelines/pages/transforms.adoc index 3ef28a164..2571e6906 100644 --- a/docs/modules/pipelines/pages/transforms.adoc +++ b/docs/modules/pipelines/pages/transforms.adoc @@ -866,7 +866,7 @@ p.readFrom(KafkaSources.kafka(.., "transaction-events")) .withNativeTimestamps(0) .groupingKey(event -> event.getTransactionId()) .mapStateful(MINUTES.toMillis(10), - () -> new TransactionEvent[2], + () -> new TransactionEvent[2], (state, id, event) -> { if (event.type() == TRANSACTION_START) { state[0] = event; From 5b0a80644e026c6ae74710815bb9a85c1f4060be Mon Sep 17 00:00:00 2001 From: Olga Shultseva <150694869+shultseva@users.noreply.github.com> Date: Tue, 23 Dec 2025 10:46:04 +0000 Subject: [PATCH 10/11] Update docs/modules/pipelines/pages/transforms.adoc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Krzysztof Jamróz <79092062+k-jamroz@users.noreply.github.com> --- docs/modules/pipelines/pages/transforms.adoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/modules/pipelines/pages/transforms.adoc b/docs/modules/pipelines/pages/transforms.adoc index 2571e6906..ad60161b3 100644 --- a/docs/modules/pipelines/pages/transforms.adoc +++ b/docs/modules/pipelines/pages/transforms.adoc @@ -882,8 +882,8 @@ p.readFrom(KafkaSources.kafka(.., "transaction-events")) return null; }, (state, id, event)-> { - // End of transaction received; this state is no longer needed - return state[1] != null; + // we have both start and end events + return state[0] != null && state[1] != null; } (state, id, currentWatermark) -> // if we have not received both events after 10 minutes, From 65cd170cf3075d7129d81e30506f40dded707122 Mon Sep 17 00:00:00 2001 From: oshultseva Date: Tue, 23 Dec 2025 11:06:00 +0000 Subject: [PATCH 11/11] stateful docs --- docs/modules/pipelines/pages/transforms.adoc | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/docs/modules/pipelines/pages/transforms.adoc b/docs/modules/pipelines/pages/transforms.adoc index ad60161b3..915b8ae9f 100644 --- a/docs/modules/pipelines/pages/transforms.adoc +++ b/docs/modules/pipelines/pages/transforms.adoc @@ -764,24 +764,24 @@ just receives the data in the correct order and has O(1) memory needs. `mapStateful` is an extension of the xref:transforms.adoc#_map[map] transform. It adds the capability to optionally retain mutable state. -The non-keyed `mapStateful` operation maintains a single shared state object per processor instance. +* `The non-keyed `mapStateful` operation maintains a single shared state object per processor instance. The state object is created using `createFn` and is passed to `mapFn` together with each input item. The `mapFn` function can update the state and emit a single output item or return null if no output is needed. -In the keyed `mapStateful` Jet maintains a dedicated state object created by the supplied `createFn` for each grouping key. +* In the keyed `mapStateful` Jet maintains a dedicated state object created by the supplied `createFn` for each grouping key. Every incoming item is processed by `mapFn`, which receives the current state, the grouping key, and the item. The function can update the state and emit a single output item (or null if no output is needed). - ++ If a positive `ttl` is configured, Jet automatically evicts inactive state objects based on event time. Each state tracks the highest timestamp observed for its key. As watermarks advance, Jet evicts states whose timestamps are older than `watermark - ttl`. - ++ When a state is evicted due to TTL expiration, Jet invokes the optional `onEvictFn`, which can emit a final result derived from the state (for example, a session summary or aggregated total). - ++ In addition to TTL-based eviction, `mapStateful` supports explicit deletion of the state using `deleteStatePredicate`. This predicate is evaluated after processing each input item. If it returns true, the state is removed immediately, without waiting for TTL expiration. In this case, `onEvictFn` is not invoked. - ++ Explicit deletion is useful when the logical lifecycle of a state ends before its TTL expires, such as when a session closes, a transaction completes, or a terminal event is observed. This helps reduce memory usage and avoids retaining unnecessary state under high load. @@ -791,9 +791,6 @@ events based on an event correlation ID. More generally, you can implement any kind of state machine and detect patterns in an input of any complexity. -As with other stateful operations, you can also use a `groupingKey` to -have a unique state per key. - For example, consider a pipeline that matches incoming `TRANSACTION_START` events to `TRANSACTION_END` events which can arrive unordered and when both are received outputs how long the transaction