Add throttle stream operator with trailing-edge semantics#207
Conversation
Add a throttle helper to @effectionx/stream-helpers that emits at most one value per delay window while guaranteeing the final value is never dropped. Uses leading+trailing semantics following the pattern from batch.ts with spawn + timebox for time-windowed upstream consumption.
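The leading+trailing contract can be sketched with plain async iterables. This is an illustration only, not the Effection-based implementation in the PR: it emits the first value immediately, collapses values inside the window to the latest, and never drops the final value (intermediate trailing flushes are elided for brevity).

```typescript
// Sketch of leading+trailing throttle semantics over a plain AsyncIterable.
// The real operator works on Effection streams; this just shows the contract.
async function* throttleSketch<A>(
  source: AsyncIterable<A>,
  delayMS: number,
): AsyncGenerator<A> {
  let windowDeadline = 0;
  let pending: A | undefined;
  let hasPending = false;

  for await (const value of source) {
    const now = Date.now();
    if (now >= windowDeadline) {
      // Leading edge: emit immediately and open a new window.
      hasPending = false; // simplified: intermediate trailing flushes elided
      windowDeadline = now + delayMS;
      yield value;
    } else {
      // Inside the window: keep only the latest value.
      pending = value;
      hasPending = true;
    }
  }

  if (hasPending) {
    // Trailing edge: flush the last value seen once the window closes,
    // so a burst that ends mid-window never loses its final value.
    const remaining = windowDeadline - Date.now();
    if (remaining > 0) {
      await new Promise<void>((r) => setTimeout(() => r(), remaining));
    }
    yield pending as A;
  }
}
```

For a fast burst of `[1, 2, 3]` this yields `1` on the leading edge and `3` on the trailing edge; `2` is absorbed.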
📝 Walkthrough: Adds a new exported `throttle` stream operator.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Consumer
    participant Throttle as Throttle<br/>Transformer
    participant Upstream as Upstream<br/>Stream
    participant Timebox
    Consumer->>Throttle: next()
    Throttle->>Upstream: subscription.next()
    Upstream-->>Throttle: value A
    Throttle->>Timebox: start(delayMS)
    Throttle-->>Consumer: emit A (leading)
    Note over Upstream,Throttle: during window, Throttle continues upstream reads<br/>keeping only the latest as trailing
    Consumer->>Throttle: next() (during window)
    Throttle->>Upstream: subscription.next()
    Upstream-->>Throttle: value B (store as pending)
    Throttle->>Upstream: subscription.next()
    Upstream-->>Throttle: value C (replace pending)
    Timebox-->>Throttle: timeout
    Throttle-->>Consumer: emit C (trailing)
    Throttle->>Upstream: continue loop or propagate completion
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Important: Pre-merge checks failed. Please resolve all errors before merging. Addressing warnings is optional.

❌ Failed checks (1 error)
✅ Passed checks (3 passed)
Use `void` instead of `never` for `TClose` so `source.close()` is callable.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@stream-helpers/throttle.test.ts`:
- Around line 82-93: The test uses wall-clock sleeps (sleep(50)) inside the
faucet.pour generator which is too close to the throttle window (~20ms) and can
flake on CI; update the sleep calls in this test (the sleep(...) invocations
inside the faucet.pour block) to use a larger margin (e.g., sleep(100) or
greater) or add an inline comment documenting the timing assumption (throttle
window ~20ms) so the test is robust under load.
In `@stream-helpers/throttle.ts`:
- Around line 12-16: The throttle function currently treats delayMS <= 0 as
passthrough implicitly; update the implementation or docs to make this explicit:
either add a guard at the top of export function throttle<A>(delayMS: number)
that returns the original stream when delayMS <= 0 (e.g., if (delayMS <= 0)
return (stream) => stream) or add a clear JSDoc note on the throttle function
stating that non-positive delayMS disables throttling and results in passthrough
behavior; reference the throttle function signature and the delayMS parameter
when applying the change.
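The guard this comment asks for can be sketched as a small identity short-circuit. The `Stream` alias below is a placeholder standing in for the real `@effectionx` type, and the windowed branch is elided; only the early-return shape matters here.

```typescript
// Placeholder alias for illustration; the real Stream type comes from
// @effectionx and is operation-based, not an AsyncIterable.
type Stream<A, TClose> = AsyncIterable<A> & { __close?: TClose };

function throttle<A>(
  delayMS: number,
): <TClose>(stream: Stream<A, TClose>) => Stream<A, TClose> {
  if (delayMS <= 0) {
    // Non-positive delay disables throttling: return the identity
    // transformer so upstream keeps its original backpressure semantics.
    return (stream) => stream;
  }
  return (stream) => {
    // Windowed implementation elided; the point is the guard above.
    return stream;
  };
}
```

Returning the input stream unchanged means a zero or negative delay never starts a pump or buffers values.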
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 37f7d7e5-1ff6-4db6-ad2b-38567047aee7
📒 Files selected for processing (3)
- stream-helpers/mod.ts
- stream-helpers/throttle.test.ts
- stream-helpers/throttle.ts
♻️ Duplicate comments (1)
stream-helpers/throttle.test.ts (1)
82-87: ⚠️ Potential issue | 🟠 Major

Replace non-zero `sleep()` waits with deterministic synchronization.

Lines 84, 86, and 112 use wall-clock waits (`sleep(50/60)`) to drive assertions. This is brittle under CI load and violates the no-sleep sync policy for tests. Please switch these to deterministic coordination (state/lifecycle assertions) instead of elapsed-time waits.

Based on learnings: `sleep(0)` is acceptable as a yield point, but non-zero sleeps used for waiting are not.

As per coding guidelines, `.policies/no-sleep-test-sync.md`: tests must not use `sleep()` for waiting; use deterministic helpers.

Also applies to: 112-112
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-helpers/throttle.test.ts` around lines 82 - 87, The test uses non-zero wall-clock sleeps inside the faucet.pour generator (calls to sleep(50)) to drive timing-sensitive assertions; replace those sleeps with deterministic synchronization by introducing a handshake or barrier between producer and test consumer (e.g., yield/ack or a "ready" signal) so the test awaits explicit state change rather than elapsed time; update the generator in throttle.test.ts (the faucet.pour block that calls send and sleep) to emit or yield a deterministic sync token and make the test consume/await that token (or use an existing test helper such as a nextTick/waitForNextYield helper) so assertions run only after the explicit lifecycle event instead of sleep(50).
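The deterministic handshake this prompt describes can be sketched with a tiny checkpoint helper. This is a hypothetical illustration, not a helper from the repo: the producer awaits an explicit release from the test instead of guessing with `sleep(50)`.

```typescript
// Hypothetical checkpoint: wait() suspends until the test calls release(),
// so producer/consumer ordering is driven by state, not elapsed time.
function createCheckpoint(): { wait(): Promise<void>; release(): void } {
  let release!: () => void;
  const reached = new Promise<void>((resolve) => (release = resolve));
  return { wait: () => reached, release };
}
```

A producer would `await cp.wait()` before its second send, and the test calls `cp.release()` once its assertions on the first emission have run.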
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@stream-helpers/throttle.test.ts`:
- Around line 82-87: The test uses non-zero wall-clock sleeps inside the
faucet.pour generator (calls to sleep(50)) to drive timing-sensitive assertions;
replace those sleeps with deterministic synchronization by introducing a
handshake or barrier between producer and test consumer (e.g., yield/ack or a
"ready" signal) so the test awaits explicit state change rather than elapsed
time; update the generator in throttle.test.ts (the faucet.pour block that calls
send and sleep) to emit or yield a deterministic sync token and make the test
consume/await that token (or use an existing test helper such as a
nextTick/waitForNextYield helper) so assertions run only after the explicit
lifecycle event instead of sleep(50).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: c3f646f6-d676-47e9-9baf-480318379ab9
📒 Files selected for processing (1)
stream-helpers/throttle.test.ts
Fix two bugs in the original throttle operator:

1. Leading value was delayed: the absorption loop ran synchronously before returning, deferring the first emission by delayMS.
2. Trailing value emitted too early: it was returned on the immediate next pull with no delay enforcement, allowing two emissions 0ms apart.

The rewrite uses inline absorption deferred to the next next() call:
- Leading: returns immediately, records a windowDeadline
- Next call: absorbs upstream values until deadline, then emits trailing
- Tasks spawned via spawn() live in the subscription scope and survive across next() calls (matching the batch.ts pattern)

Also:
- Add 8 timing-aware tests covering leading/trailing/window/close
- Bump stream-helpers to 0.9.0 (new exported feature)
Replace the deferred-absorption design with a persistent pump task (matching the valve.ts pattern). The pump owns all upstream reads and writes ready-to-emit IteratorResults to a shared ArraySignal buffer. next() simply shifts from the buffer.

This fixes two issues:
1. Stale intermediate values when the consumer is slower than the window — the pump absorbs eagerly regardless of pull timing, so the trailing value is always the latest seen during the window.
2. Subscription deadlocks from spawning/halting pulls in child scopes then re-calling subscription.next() from the parent scope.

Added test: "yields the latest window value when consumer is slower than the window" — sends [1,2,3] within a 100ms window, waits 500ms before the second next(), and asserts 3 (not 2) is returned.
Actionable comments posted: 1
♻️ Duplicate comments (1)
stream-helpers/throttle.ts (1)
23-25: ⚠️ Potential issue | 🟠 Major

Short-circuit non-positive delays before starting the pump.

When `delayMS <= 0`, the window exits immediately, so this task drains upstream as fast as it can and buffers into `output`. That changes backpressure and memory behavior instead of behaving like a no-op.

♻️ Proposed fix

```diff
 export function throttle<A>(
   delayMS: number,
 ): <TClose>(stream: Stream<A, TClose>) => Stream<A, TClose> {
+  if (delayMS <= 0) {
+    return <TClose>(stream: Stream<A, TClose>) => stream;
+  }
   return <TClose>(stream: Stream<A, TClose>): Stream<A, TClose> => ({
```

Also applies to: 53-55
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-helpers/throttle.ts` around lines 23 - 25, The throttle function should short-circuit when delayMS <= 0 instead of starting the pump and buffering; update the exported throttle<A> overload (and the other overload at lines 53-55) to immediately return the identity/no-op stream transformer when delayMS <= 0 so upstream is drained with original backpressure semantics rather than being pumped into an internal buffer; locate the throttle implementation and add an early-return branch that returns the input stream unchanged if delayMS <= 0, referencing the throttle function name and the Stream<A, TClose> return type to find where to modify.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@stream-helpers/throttle.ts`:
- Around line 29-30: The terminal IteratorResult currently gets queued only once
(created via createArraySignal and referenced as output) so after a consumer
shifts it future next() calls hang; modify the throttle/pump logic to stash the
terminal result in a doneResult variable when you enqueue the final { value,
done: true } and, whenever emitting to output (or when consumers call next and
output would be empty), if doneResult is set then replay/emit that doneResult
instead of leaving the queue empty; update the code paths around the pump and
any places that push terminal values (the spots corresponding to the
createArraySignal<IteratorResult<A, TClose>> output usage and the pump/emit
logic) to check doneResult before returning or pushing new items so the terminal
result is durable for subsequent next() calls.
---
Duplicate comments:
In `@stream-helpers/throttle.ts`:
- Around line 23-25: The throttle function should short-circuit when delayMS <=
0 instead of starting the pump and buffering; update the exported throttle<A>
overload (and the other overload at lines 53-55) to immediately return the
identity/no-op stream transformer when delayMS <= 0 so upstream is drained with
original backpressure semantics rather than being pumped into an internal
buffer; locate the throttle implementation and add an early-return branch that
returns the input stream unchanged if delayMS <= 0, referencing the throttle
function name and the Stream<A, TClose> return type to find where to modify.
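The stash-and-replay pattern described in the prompt above can be illustrated with a small self-contained queue. Names here are assumed for illustration; the real code uses Effection's `createArraySignal`, whose `shift()` suspends on an empty queue rather than returning `undefined`.

```typescript
// Illustrative queue: once a terminal IteratorResult is enqueued, later
// shift() calls replay it instead of leaving the consumer with nothing.
type Item<A, TClose> = IteratorResult<A, TClose>;

class DurableDoneQueue<A, TClose> {
  private buffer: Item<A, TClose>[] = [];
  private doneResult: IteratorReturnResult<TClose> | undefined;

  push(item: Item<A, TClose>): void {
    if (item.done) {
      this.doneResult = item; // stash the terminal result
    }
    this.buffer.push(item);
  }

  shift(): Item<A, TClose> | undefined {
    if (this.buffer.length > 0) {
      return this.buffer.shift();
    }
    // Queue drained: replay the terminal result so next() never hangs.
    return this.doneResult;
  }
}
```

With this shape, every `next()` after completion keeps returning the same `{ done: true }` result, matching iterator protocol expectations.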
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: ef9a6061-24d1-4f23-acc5-6aa63f097b21
📒 Files selected for processing (3)
- stream-helpers/package.json
- stream-helpers/throttle.test.ts
- stream-helpers/throttle.ts
stream-helpers/throttle.ts
Outdated
```ts
const output = yield* createArraySignal<IteratorResult<A, TClose>>([]);
```
Persist the terminal result after the queue drains.
The terminal IteratorResult is only queued once. After a consumer shifts that item, later next() calls block forever because the pump has already returned and nothing can push into output again. Keep a doneResult stash and replay it whenever the queue is empty.
🛠️ Proposed fix
```diff
   *[Symbol.iterator]() {
     const subscription = yield* stream;
     const output = yield* createArraySignal<IteratorResult<A, TClose>>([]);
+    let doneResult: IteratorReturnResult<TClose> | undefined;
     // ── pump ──────────────────────────────────────────────────────
     // A persistent background task that owns all upstream reads.
@@
       // ── leading edge ────────────────────────────────────────
       const first = yield* subscription.next();
       if (first.done) {
+        doneResult = first;
         output.push(first);
         return;
       }
@@
         if (tb.value.done) {
           // Stream closed during window — flush trailing, then done
           if (hasTrailing) {
             output.push({ done: false as const, value: trailing as A });
           }
+          doneResult = tb.value;
           output.push(tb.value);
           return;
         }
@@
     return {
       *next() {
+        if (doneResult !== undefined && output.length === 0) {
+          return doneResult;
+        }
         return yield* output.shift();
       },
     };
```

Also applies to: 42-44, 63-69, 85-87
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@stream-helpers/throttle.ts` around lines 29 - 30, The terminal IteratorResult
currently gets queued only once (created via createArraySignal and referenced as
output) so after a consumer shifts it future next() calls hang; modify the
throttle/pump logic to stash the terminal result in a doneResult variable when
you enqueue the final { value, done: true } and, whenever emitting to output (or
when consumers call next and output would be empty), if doneResult is set then
replay/emit that doneResult instead of leaving the queue empty; update the code
paths around the pump and any places that push terminal values (the spots
corresponding to the createArraySignal<IteratorResult<A, TClose>> output usage
and the pump/emit logic) to check doneResult before returning or pushing new
items so the terminal result is durable for subsequent next() calls.
…stantly

The pump eagerly fills an output buffer, so a slow consumer could drain multiple pre-buffered values back-to-back, violating the documented delayMS spacing guarantee.

Fix: add a delay gate in next() that sleeps for the remaining interval when the previous emission was less than delayMS ago. Output items are tagged with a `flush` flag so the stream-completion exception (trailing-before-close) still bypasses the gate.

Added test: "enforces spacing when consumer drains a backlog" — pumps two windows while the consumer is idle, then drains and asserts the gap between consecutive emissions respects delayMS.
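The delay gate this commit describes can be sketched in isolation. Names are illustrative; in the real operator the gate logic lives inside the subscription's `next()`.

```typescript
// Minimal sketch of a spacing gate: before handing out a buffered value,
// wait out whatever remains of the delay window since the last emission.
// Items flagged `flush` (trailing-before-close) bypass the gate.
const sleep = (ms: number) =>
  new Promise<void>((r) => setTimeout(() => r(), ms));

function makeDelayGate(delayMS: number) {
  let lastEmit = Number.NEGATIVE_INFINITY;
  return async function gate(flush: boolean): Promise<void> {
    const remaining = lastEmit + delayMS - Date.now();
    if (!flush && remaining > 0) {
      await sleep(remaining); // enforce spacing when draining a backlog
    }
    lastEmit = Date.now();
  };
}
```

Calling the gate before each emission makes a backlog drain respect the window, while the first emission and any flush-tagged terminal items pass through immediately.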
Replace all non-zero sleep() calls in the main test body with deterministic coordination per no-sleep-test-sync policy:

- "does not emit trailing before delay" — replaced sleep(delay*0.4) checkpoint with timebox assertion proving next() does not resolve within half the window
- "emits at most once per delay window" — removed sleep(delay+20) gap between bursts; is() already syncs, and the pump blocks on its next leading pull
- "yields latest window value" — removed sleep(500); renamed to "buffers the latest window value, not the oldest"; output.shift() blocks until the pump pushes
- "enforces spacing when consumer drains a backlog" — removed sleep(delay*3); same rationale

Fixed Biome formatting on long emissions.push() lines.
♻️ Duplicate comments (2)
stream-helpers/throttle.test.ts (1)
257-263: ⚠️ Potential issue | 🟠 Major

Replace the remaining wall-clock sleeps with deterministic coordination.

The `sleep(50)` calls here are still being used to synchronize the test, so this case can flake under load and it no longer complies with the repo's no-sleep policy. Please drive the later sends with deterministic checkpoints or `timebox()`-based assertions instead of fixed waits.

As per coding guidelines, `.policies/no-sleep-test-sync.md`: tests must not use `sleep()` for waiting; use deterministic helpers.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@stream-helpers/throttle.test.ts` around lines 257-263: The test currently uses wall-clock sleeps inside the faucet.pour generator (the yield* sleep(50) calls) which can flake; replace them with deterministic coordination by driving the later send(2) and send(3) from explicit checkpoints or a test-controlled signal: for example, have faucet.pour accept and yield to a checkpoint or token (e.g., yield* waitCheckpoint('after1')) before calling yield* send(2) and similarly for send(3), or alternatively keep the sends immediate and use a timebox()-based assertion (timebox(() => collectNext(), maxMs)) to assert timing instead of sleeping; update the generator calls around faucet.pour and any helper functions invoked (send, faucet.pour, waitCheckpoint/timebox) so the test no longer calls sleep().

stream-helpers/throttle.ts (1)
41: ⚠️ Potential issue | 🟠 Major

Persist the terminal result after the queue drains.

`done` is still only enqueued once. Because `createArraySignal.shift()` blocks on an empty queue, any `next()` after the first terminal read will hang forever once the pump has returned. Stash the terminal `IteratorReturnResult` and replay it whenever the output queue is empty after completion.

🛠️ Suggested fix

```diff
   const subscription = yield* stream;
   const output = yield* createArraySignal<OutputItem<A, TClose>>([]);
+  let doneResult: IteratorReturnResult<TClose> | undefined;
@@
   const first = yield* subscription.next();
   if (first.done) {
+    doneResult = first;
     output.push({ result: first, flush: true });
     return;
   }
@@
   if (tb.value.done) {
     // Stream closed during window — flush trailing, then done
     if (hasTrailing) {
       output.push({
         result: { done: false as const, value: trailing as A },
         flush: true,
       });
     }
+    doneResult = tb.value;
     output.push({ result: tb.value, flush: true });
     return;
   }
@@
   *next(): Operation<IteratorResult<A, TClose>> {
+    if (doneResult !== undefined && output.valueOf().length === 0) {
+      return doneResult;
+    }
     const { result, flush } = yield* output.shift();
```

Also applies to: 54-56, 78-87, 107-127
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@stream-helpers/throttle.ts` at line 41, The output queue currently enqueues the terminal IteratorReturnResult only once which causes subsequent createArraySignal.shift() calls to block; modify the code around the output created by createArraySignal<OutputItem<A, TClose>> to stash the terminal IteratorReturnResult when the stream/pump completes and, instead of enqueuing it just once, replay that stored terminal result whenever consumers call shift() on an empty queue after completion. Concretely: in the pump/finish path where you currently push or enqueue the `done`/return value, capture that IteratorReturnResult into a local variable (e.g., terminalResult) and change dequeue/shift behavior to return terminalResult immediately if set and the internal array is empty; apply the same stash-and-replay pattern wherever you handle terminal enqueueing for OutputItem (the other similar blocks noted in the comment).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@stream-helpers/throttle.test.ts`:
- Around line 257-263: The test currently uses wall-clock sleeps inside the
faucet.pour generator (the yield* sleep(50) calls) which can flake; replace them
with deterministic coordination by driving the later send(2) and send(3) from
explicit checkpoints or a test-controlled signal: for example, have faucet.pour
accept and yield to a checkpoint or token (e.g., yield*
waitCheckpoint('after1')) before calling yield* send(2) and similarly for
send(3), or alternatively keep the sends immediate and use a timebox()-based
assertion (timebox(() => collectNext(), maxMs)) to assert timing instead of
sleeping; update the generator calls around faucet.pour and any helper functions
invoked (send, faucet.pour, waitCheckpoint/timebox) so the test no longer calls
sleep().
In `@stream-helpers/throttle.ts`:
- Line 41: The output queue currently enqueues the terminal IteratorReturnResult
only once which causes subsequent createArraySignal.shift() calls to block;
modify the code around the output created by createArraySignal<OutputItem<A,
TClose>> to stash the terminal IteratorReturnResult when the stream/pump
completes and, instead of enqueuing it just once, replay that stored terminal
result whenever consumers call shift() on an empty queue after completion.
Concretely: in the pump/finish path where you currently push or enqueue the
`done`/return value, capture that IteratorReturnResult into a local variable
(e.g., terminalResult) and change dequeue/shift behavior to return
terminalResult immediately if set and the internal array is empty; apply the
same stash-and-replay pattern wherever you handle terminal enqueueing for
OutputItem (the other similar blocks noted in the comment).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 74cd98c3-6a04-4371-8689-ded914c9c490
📒 Files selected for processing (2)
- stream-helpers/throttle.test.ts
- stream-helpers/throttle.ts
cowboyd
left a comment
one comment, otherwise, love this
```ts
  }, stream),
);

yield* sleep(0);
```
Why do we need a sleep? Maybe use a resource for your loop?
Made forEach a resource in #209 — how does this look? It'd need to be merged before this PR.
I'd hate to lose synchronous foreach and would prefer it to stay blocking.
I think there are a couple of ways to have our cake and eat it too.
1. Have both foreground and background versions:
   - `forEach<T, TClose>(stream: Stream<T, TClose>, body): Operation<TClose>` -- foreground
   - `useForEach<T, TClose>(stream: Stream<T, TClose>, body): Future<TClose>` -- background
2. Allow `forEach` to accept both a subscription and a stream. That allows you to subscribe first and then spawn safely in the background.
3. Add a "live stream" helper that takes a stream and does (2) for you:

   `useBoundStream(stream: Stream): Stream`

   Not sure what a good name for this is, but it takes a stream, subscribes to it, and then returns a stream that returns that subscription. This ensures that any stream interface can use it in the background while ensuring that it does not miss any items.
Does that indicate enough context? I worry that the programmer (AI or otherwise) might see that and think: "well of course I want to use the stream".
What differentiates this from a normal stream is that it is laid over a "live" subscription.
- `useSubscribedStream()`
- `useLiveStream()`
- `useUniqueStream()`
In the observables world, I believe these are called hot observables. We could use that terminology to reduce naming dissonance for similar concepts.
`useHotStream()`
Sounds steamy.
Motivation
Naive throttling limits how often a stream emits but can silently drop the final value when a burst ends mid-window. As described in "Your Throttling Is Lying to You" by Gábor Koos, this is a subtle but critical failure mode — the last state is often the most important one (resize dimensions, analytics events, etc.).
`@effectionx/stream-helpers` had no throttle operator, so consumers had to roll their own or misuse `batch` for rate limiting.

Approach

Added a `throttle(delayMS)` stream operator with leading + trailing semantics: intermediate values arriving within the `delayMS` window are dropped, only the latest is kept.

Implementation follows the same `spawn` + `timebox` pattern established by `batch.ts` for time-windowed upstream consumption. The `lastPull` task survives across `next()` calls, and a `doneResult` stash ensures trailing → done ordering on stream close.

Files changed:
- `stream-helpers/throttle.ts` — operator implementation
- `stream-helpers/throttle.test.ts` — 5 test cases covering leading/trailing/close/spacing/multi-window
- `stream-helpers/mod.ts` — re-export