From 1359a3f71bd5b9cc650e6f1a072184284a54f81a Mon Sep 17 00:00:00 2001
From: Jan Teske
Date: Tue, 25 Apr 2023 12:36:04 +0200
Subject: [PATCH] join: count effort on inputs to the `results` closure

Previously, the join operator's effort counting was based on the outputs
of the passed `results` closure. This had the unfortunate effect that
fueling would become ineffective in cases where the `results` closure
always returns empty iterators. In such scenarios, the join operator
would only yield once it has exhausted its inputs, negatively impacting
concurrent operators and possibly application interactivity.

Having the `results` closure return empty iterators is useful when the
caller does not care about the results of a join anymore (e.g. when the
dataflow is shutting down). By returning nothing from `results`, the
updates queued up before the join can be drained quickly, without
feeding additional updates to downstream operators.

This commit attempts to improve the situation by changing the way the
join operator counts its effort. Instead of counting the number of
`results` outputs, we can count the number of inputs. That way, even if
`results` decides to stop emitting updates, join fueling continues to
work as expected.

The new behavior is consistent with the half join operator from
`dogsdogsdogs`, which also uses the input, rather than the output, of
the `output_func` for its work counting.
---
 src/operators/join.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/operators/join.rs b/src/operators/join.rs
index 8710ac07b..7a2172df2 100644
--- a/src/operators/join.rs
+++ b/src/operators/join.rs
@@ -720,6 +720,7 @@ where
 
                     // populate `temp` with the results in the best way we know how.
                     thinker.think(|v1,v2,t,r1,r2| {
+                        effort += 1;
                         let key = batch.key(batch_storage);
                         for (d, t, r) in logic(key, v1, v2, &t, r1, r2) {
                             temp.push(((d, t), r));
@@ -733,7 +734,6 @@ where
                     //       consolidation, and then deposit results in `session`.
                     crate::consolidation::consolidate(temp);
 
-                    effort += temp.len();
                     for ((d, t), r) in temp.drain(..) {
                         session.give((d, t, r));
                     }
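
The effect described in the commit message can be reproduced with a small standalone sketch. This is not the code from `src/operators/join.rs`: `drain_with_fuel`, `Counting`, and the `u32` element type are hypothetical names invented for illustration, and the loop only mimics the general shape of a fueled operator (do work until `effort` reaches `fuel`, then yield). It compares charging effort per output against charging effort per input when `logic` returns an empty iterator.

```rust
use std::collections::VecDeque;

/// Which quantity is charged against the fuel budget (hypothetical type).
#[derive(Clone, Copy, PartialEq)]
enum Counting {
    /// Old behavior: charge one unit per record produced by `logic`.
    Outputs,
    /// New behavior: charge one unit per invocation of `logic`.
    Inputs,
}

/// Simplified stand-in for a fueled operator: consumes queued inputs until
/// `fuel` is spent or the queue is empty, and returns how many inputs were
/// consumed before the operator would yield.
fn drain_with_fuel<L, I>(
    inputs: &mut VecDeque<u32>,
    fuel: usize,
    counting: Counting,
    mut logic: L,
) -> usize
where
    L: FnMut(u32) -> I,
    I: IntoIterator<Item = u32>,
{
    let mut effort = 0;
    let mut consumed = 0;
    let mut temp: Vec<u32> = Vec::new();

    while effort < fuel {
        let input = match inputs.pop_front() {
            Some(input) => input,
            None => break,
        };
        consumed += 1;
        if counting == Counting::Inputs {
            effort += 1;
        }
        temp.extend(logic(input));
        if counting == Counting::Outputs {
            effort += temp.len();
        }
        // A real operator would hand `temp` downstream here.
        temp.clear();
    }
    consumed
}

fn main() {
    let fuel = 10;

    // `logic` drops everything, as a caller might do during shutdown.
    let mut queue: VecDeque<u32> = (0..1000).collect();
    let by_outputs =
        drain_with_fuel(&mut queue, fuel, Counting::Outputs, |_| std::iter::empty::<u32>());

    let mut queue: VecDeque<u32> = (0..1000).collect();
    let by_inputs =
        drain_with_fuel(&mut queue, fuel, Counting::Inputs, |_| std::iter::empty::<u32>());

    println!("counting outputs: consumed {} inputs before yielding", by_outputs);
    println!("counting inputs:  consumed {} inputs before yielding", by_inputs);
}
```

In this sketch, counting outputs never accumulates any effort when `logic` emits nothing, so the loop runs until the queue is empty. Counting inputs stops after `fuel` invocations regardless of what `logic` returns.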