Add batch, conflate, expand, and extrapolate Flow operators#417
Merged
Add batch, conflate, expand, and extrapolate Flow operators#417
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds four new Flow operators for controlling element emission rates and handling backpressure scenarios: batch/batchWeighted for aggregating elements when downstream is slow, conflate/conflateWithSeed for unbounded aggregation, expand for expanding elements via iterators with freshness bias, and extrapolate for filling gaps between upstream elements.
Changes:
- Adds
batchWeightedas the core batching operator with weighted cost function and configurable seed/aggregate functions - Adds
batch,conflate, andconflateWithSeedas specialized variants delegating tobatchWeighted - Adds
expandfor iterator-based element expansion favoring freshness over completeness - Adds
extrapolatefor emitting upstream elements followed by generated values to fill gaps
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| core/src/main/scala/ox/flow/FlowOps.scala | Implements batchWeighted, batch, conflate, conflateWithSeed, expand, and extrapolate operators with proper async boundaries and error handling |
| core/src/test/scala/ox/flow/FlowOpsBatchTest.scala | Tests count-based batching and empty flow handling |
| core/src/test/scala/ox/flow/FlowOpsBatchWeightedTest.scala | Tests weighted batching with custom cost functions, maximal batch creation, oversized elements, and error propagation from all components |
| core/src/test/scala/ox/flow/FlowOpsConflateTest.scala | Tests unbounded aggregation when downstream is busy, pass-through behavior, and custom seed aggregation |
| core/src/test/scala/ox/flow/FlowOpsExpandTest.scala | Tests iterator expansion with receive-first bias, infinite/finite iterators, and error propagation from upstream/expander/iterator |
| core/src/test/scala/ox/flow/FlowOpsExtrapolateTest.scala | Tests element repetition, iterator switching, initial value emission, and error propagation |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
e201d2c to
4be94d4
Compare
…ators Implements backpressure-aware operators for aggregating, conflating, expanding, and extrapolating flow elements based on downstream demand: - batchWeighted: core operator that batches by weighted cost function - batch: count-based batching (delegates to batchWeighted) - conflate/conflateWithSeed: unbounded aggregation when downstream is slow - expand: iterator-based element expansion with freshness bias - extrapolate: emits upstream elements then fills gaps with generated values Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
4be94d4 to
073ab30
Compare
The parent flow error could propagate to childOutputChannel before value 10 was picked up by the main loop's select, causing receive() to see the error instead of the value. Fix by gating the error on the test having received the value from the output. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
batchWeightedas the core batching operator (weighted cost function with configurable seed/aggregate)batch(count-based, delegates tobatchWeighted),conflate/conflateWithSeed(unbounded aggregation when downstream is slow)expand(iterator-based element expansion with freshness bias) andextrapolate(emits upstream elements then fills gaps with generated values)Test plan
FlowOpsBatchTest— verifies count-based aggregation and empty flowFlowOpsBatchWeightedTest— verifies pass-through, aggregation, maximal batch creation with custom cost, oversized element handling, empty flow, and error propagation from upstream/costFn/seed/aggregateFlowOpsConflateTest— verifies conflate aggregation, pass-through, empty flow, error propagation, and conflateWithSeed with custom seedFlowOpsExpandTest— verifies infinite/finite expanders, iterator replacement, gated upstream, empty flow, and error propagation from upstream/expander/iteratorFlowOpsExtrapolateTest— verifies repetition, element switching, initial value, pass-through, multi-value extrapolation, empty flow (with/without initial), and error propagation🤖 Generated with Claude Code