♻️ Make forEach a resource for guaranteed stream subscription#209
♻️ Make forEach a resource for guaranteed stream subscription#209
Conversation
📝 WalkthroughWalkthroughThe Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes 🚥 Pre-merge checks | ✅ 4✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
commit: |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@stream-helpers/for-each.ts`:
- Around line 42-53: The spawned Task currently doesn't own the subscription
because subscription is acquired in the outer resource scope (subscription is
set before spawn), so halt() on the returned Task stops the child but leaves the
subscription alive; move the subscription acquisition into the spawned child
(i.e., call yield* stream.next()/subscribe inside the generator passed to spawn)
so the spawned Operation<TClose> owns and cleans up the subscription, mirroring
drain()’s semantics (see resource(), spawn(), subscription, task), and add a
unit test that calls task.halt() early to assert the subscription is closed when
the task halts.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 5d8da522-afc1-4044-a920-1878860256d0
📒 Files selected for processing (8)
stream-helpers/batch.test.tsstream-helpers/for-each.test.tsstream-helpers/for-each.tsstream-helpers/reduce.test.tsstream-helpers/take-until.test.tsstream-helpers/take-while.test.tsstream-helpers/take.test.tswatch/test/watch.test.ts
| return resource(function* (provide) { | ||
| const subscription = yield* stream; | ||
| const task = yield* spawn(function* (): Operation<TClose> { | ||
| let next = yield* subscription.next(); | ||
| while (!next.done) { | ||
| yield* fn(next.value); | ||
| next = yield* subscription.next(); | ||
| } | ||
| return next.value; | ||
| }, | ||
| }; | ||
| }); | ||
| yield* provide(task); | ||
| }); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# Find and examine the for-each.ts file
find . -name "for-each.ts" -type fRepository: thefrontside/effectionx
Length of output: 95
🏁 Script executed:
# Also locate any test files for forEach
find . -name "*for-each*" -o -name "*forEach*" | grep -E "\.(test|spec)\.(ts|tsx)?"Repository: thefrontside/effectionx
Length of output: 100
🏁 Script executed:
# Check the directory structure for stream-helpers
ls -la stream-helpers/ 2>/dev/null || find . -type d -name "stream-helpers"Repository: thefrontside/effectionx
Length of output: 2466
🏁 Script executed:
cat -n stream-helpers/for-each.tsRepository: thefrontside/effectionx
Length of output: 2071
🏁 Script executed:
cat -n stream-helpers/for-each.test.tsRepository: thefrontside/effectionx
Length of output: 1521
🏁 Script executed:
cat -n stream-helpers/drain.tsRepository: thefrontside/effectionx
Length of output: 1482
🏁 Script executed:
# Search for patterns where provide() is used with spawn
rg "provide\(" stream-helpers/ -B 5 -A 2Repository: thefrontside/effectionx
Length of output: 890
🏁 Script executed:
# Look for tests that halt or cancel tasks from stream operations
rg "\.halt\(\)" stream-helpers/ -B 3 -A 3Repository: thefrontside/effectionx
Length of output: 523
🏁 Script executed:
# Check other stream operations for comparison with forEach pattern
cat -n stream-helpers/reduce.tsRepository: thefrontside/effectionx
Length of output: 1868
🏁 Script executed:
# Look for documentation or tests about resource lifecycle
rg "resource\(" stream-helpers/ -B 2 -A 5Repository: thefrontside/effectionx
Length of output: 1046
🏁 Script executed:
# Check policies directory for guidance
find . -path "*policies*" -name "*.md" | head -20Repository: thefrontside/effectionx
Length of output: 702
🏁 Script executed:
# Look at the stream-helpers README
cat stream-helpers/README.md | head -100Repository: thefrontside/effectionx
Length of output: 2706
🏁 Script executed:
# Search for other resource patterns with spawn
rg "spawn\(" . -B 10 -A 10 | grep -A 15 "resource"Repository: thefrontside/effectionx
Length of output: 6048
🏁 Script executed:
# Check if there are examples of task.halt() in tests
rg "task\.halt\|\.halt\(\)" . --type ts -B 5 -A 5 | head -80Repository: thefrontside/effectionx
Length of output: 49
🏁 Script executed:
# Read the relevant policies
cat .policies/ergonomics.mdRepository: thefrontside/effectionx
Length of output: 4267
🏁 Script executed:
cat .policies/structured-concurrency.mdRepository: thefrontside/effectionx
Length of output: 3956
🏁 Script executed:
# Look for any examples of Task halt behavior in tests
rg "\.halt\(\)" . --type ts -A 3 -B 3Repository: thefrontside/effectionx
Length of output: 4099
🏁 Script executed:
# Check parallel.ts to see how similar resource/spawn patterns handle this
cat -n ./fx/parallel.ts | head -80Repository: thefrontside/effectionx
Length of output: 3030
🏁 Script executed:
# Verify Effection's resource lifecycle semantics
rg "resource\(" . --type ts -A 8 | grep -A 8 "provide" | head -50Repository: thefrontside/effectionx
Length of output: 2468
🏁 Script executed:
# Check if there are any comments or docs about this pattern in Effection
rg "resource.*spawn.*provide" . -C 5Repository: thefrontside/effectionx
Length of output: 49
🏁 Script executed:
# Look for any issues or discussions about task.halt with resources
find . -name "*.md" -type f -exec grep -l "halt\|cancellation\|cleanup" {} \; | grep -E "README|GUIDE|DESIGN"Repository: thefrontside/effectionx
Length of output: 187
🏁 Script executed:
# Check the README files for any discussion of resource/task lifecycle
cat ./effect-ts/README.md | grep -A 10 -B 5 "resource\|halt\|cleanup" | head -40Repository: thefrontside/effectionx
Length of output: 1403
🏁 Script executed:
# Look for how forEach should work by checking if there are any examples
rg "forEach" . --type ts -B 5 -A 10 | grep -E "(yield|task|halt)" -A 3 -B 3 | head -80Repository: thefrontside/effectionx
Length of output: 3895
The returned Task doesn't own the subscription lifetime.
Line 43 subscribes in the resource scope, but line 44 spawns a child task that only consumes from the subscription. Halting the returned Task stops the drain loop but leaves the resource scope mounted until the caller's scope exits, keeping the subscription alive. This violates .policies/ergonomics.md ("Helpers must preserve structured concurrency semantics") and .policies/structured-concurrency.md ("Cancellation, cleanup, and task lifetimes should be obvious and enforced").
Compare this to drain() (line 27–32), where the subscription lifetime equals the operation lifetime—clear and correct. Move the subscription acquisition into the spawned child task, or document that the returned Task doesn't own cleanup and callers must scope the entire operation. Add a test for early task.halt() to verify the intended semantics.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@stream-helpers/for-each.ts` around lines 42 - 53, The spawned Task currently
doesn't own the subscription because subscription is acquired in the outer
resource scope (subscription is set before spawn), so halt() on the returned
Task stops the child but leaves the subscription alive; move the subscription
acquisition into the spawned child (i.e., call yield* stream.next()/subscribe
inside the generator passed to spawn) so the spawned Operation<TClose> owns and
cleans up the subscription, mirroring drain()’s semantics (see resource(),
spawn(), subscription, task), and add a unit test that calls task.halt() early
to assert the subscription is closed when the task halts.
Motivation
forEachas a blockingOperation<TClose>requires callers to wrap it inspawn()+sleep(0)when used as a background consumer — the sleep is needed to yield control so the spawned task can subscribe before values are sent. This pattern appears inthrottle.test.ts,batch.test.ts, andwatch.test.ts.As discussed in #207, making
forEacha resource eliminates this by guaranteeing the stream subscription is established before the caller resumes.Approach
Rewrote
forEachto returnOperation<Task<TClose>>using Effection'sresource():provide)provide)Task<TClose>handle — caller resumes with subscription activeBackground consumers become simpler — no
spawn()orsleep(0)needed:Blocking consumers await the task:
Files changed:
stream-helpers/for-each.ts— resource rewritestream-helpers/for-each.test.ts— double yield (2 sites)stream-helpers/batch.test.ts— remove spawn+sleep (2 sites)stream-helpers/reduce.test.ts— double yield (3 sites)stream-helpers/take.test.ts— double yield (5 sites)stream-helpers/take-while.test.ts— double yield (5 sites)stream-helpers/take-until.test.ts— double yield (5 sites)watch/test/watch.test.ts— remove spawn wrapper (1 site)Summary by CodeRabbit
Refactoring
forEachhelper to return a Task handle instead of close value directly.Tests