From 5ae457dff2beba5ca56d7abaa17198bbc8849ad7 Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy <47869999+rkrishn7@users.noreply.github.com> Date: Fri, 20 Mar 2026 12:26:56 -0700 Subject: [PATCH 1/2] fix: Ensure order preserving variants are replaced ONLY when the child does not assert that it maintains its input order. If it does, we should not replace order preserving variants --- datafusion/physical-optimizer/src/enforce_distribution.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index bae8d3688194f..448cdd5ba1732 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -1405,7 +1405,7 @@ pub fn ensure_distribution( &plan, &child.plan, ); - if !streaming_benefit { + if !streaming_benefit && !maintains { child = replace_order_preserving_variants(child, false)?.0; } From 78622d6cd92607de705e22e5e569e6101f193fce Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy <47869999+rkrishn7@users.noreply.github.com> Date: Fri, 20 Mar 2026 13:06:23 -0700 Subject: [PATCH 2/2] Update plan in joins.slt --- datafusion/sqllogictest/test_files/joins.slt | 31 ++++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index b2a9442f0bbdb..80a2a63e68c8f 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3471,22 +3471,21 @@ logical_plan 08)----------TableScan: annotated_data projection=[a, b] physical_plan 01)SortPreservingMergeExec: [a@0 ASC] -02)--SortExec: expr=[a@0 ASC], preserve_partitioning=[true] -03)----ProjectionExec: expr=[a@0 as a, last_value(r.b) ORDER BY [r.a ASC NULLS FIRST]@3 as last_col1] -04)------AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[last_value(r.b) ORDER BY [r.a ASC NULLS FIRST]] -05)--------CoalesceBatchesExec: target_batch_size=2 -06)----------RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 2), input_partitions=2 -07)------------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[last_value(r.b) ORDER BY [r.a ASC NULLS FIRST]] -08)--------------CoalesceBatchesExec: target_batch_size=2 -09)----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)] -10)------------------CoalesceBatchesExec: target_batch_size=2 -11)--------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2 -12)----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -13)------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true -14)------------------CoalesceBatchesExec: target_batch_size=2 -15)--------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2 -16)----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -17)------------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], file_type=csv, has_header=true +02)--ProjectionExec: expr=[a@0 as a, last_value(r.b) ORDER BY [r.a ASC NULLS FIRST]@3 as last_col1] +03)----AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[last_value(r.b) ORDER BY [r.a ASC NULLS FIRST]], ordering_mode=PartiallySorted([0]) +04)------CoalesceBatchesExec: target_batch_size=2 +05)--------RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC +06)----------AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, c@2 as c], aggr=[last_value(r.b) ORDER BY [r.a ASC NULLS FIRST]], ordering_mode=PartiallySorted([0]) +07)------------CoalesceBatchesExec: target_batch_size=2 +08)--------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)] +09)----------------CoalesceBatchesExec: target_batch_size=2 +10)------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2 +11)--------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +12)----------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true +13)----------------CoalesceBatchesExec: target_batch_size=2 +14)------------------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2, preserve_order=true, sort_exprs=a@0 ASC, b@1 ASC NULLS LAST +15)--------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +16)----------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC, b@1 ASC NULLS LAST], file_type=csv, has_header=true query TT EXPLAIN SELECT *