Skip to content

fix: Check maintains_input_order in EnforceDistribution#35

Open
rkrishn7 wants to merge 2 commits intobranch-51from
rohan/enforce-dist-maintains-order
Open

fix: Check maintains_input_order in EnforceDistribution#35
rkrishn7 wants to merge 2 commits intobranch-51from
rohan/enforce-dist-maintains-order

Conversation

@rkrishn7
Copy link
Collaborator

@rkrishn7 rkrishn7 commented Mar 20, 2026

In the event that there is no ordering requirement from the parent, and the required distribution is a single partition, we should make sure to check that input order is not maintained before replacing nodes with their order preserving variants.

This is already done when the parent's distribution is unspecified -- we should do the same when it's single/hash partitioned.

Context

This was discovered when debugging a seemingly sub-optimal plan post remote execution optimization (certain details have been omitted):

| physical_plan after RemoteExecutionOptimizerRule           | SortPreservingMergeExec: [product_code@8 ASC NULLS LAST, ticker@12 ASC NULLS LAST, first_trade_date@3 ASC NULLS LAST, last_trade_date@4 ASC NULLS LAST], fetch=101                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|                                                            |   OneOfExec(best=4), costs=[20000008,30000008,30000008,30000008,8], required_input_ordering=[[product_code@8 ASC NULLS LAST]]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|                                                            |     UnionExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|                                                            |       RemoteExec: service_urls=[...]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|                                                            |         ProjectionExec: expr=[...]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|                                                            |           FileScanExec: table = us_futures_contracts, file_count=2, output_ordering=["product_code@1 ASC NULLS LAST", "ticker@0 ASC NULLS LAST", "first_trade_date@3 ASC NULLS LAST", "last_trade_date@4 ASC NULLS LAST"], scan_direction=Native, page_size=1024, fetch=101, plans=[], partitions=[4 (1 files), 5 (1 files)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|                                                            |       RemoteExec: service_urls=[...]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|                                                            |         ProjectionExec: expr=[...]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|                                                            |           FileScanExec: table = us_futures_contracts, file_count=3, output_ordering=["product_code@1 ASC NULLS LAST", "ticker@0 ASC NULLS LAST", "first_trade_date@3 ASC NULLS LAST", "last_trade_date@4 ASC NULLS LAST"], scan_direction=Native, page_size=1024, fetch=101, plans=[], partitions=[0 (1 files), 2 (1 files), 3 (1 files)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|                                                            |       RemoteExec: service_urls=[...]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|                                                            |         ProjectionExec: expr=[...]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|                                                            |           FileScanExec: table = us_futures_contracts, file_count=3, output_ordering=["product_code@1 ASC NULLS LAST", "ticker@0 ASC NULLS LAST", "first_trade_date@3 ASC NULLS LAST", "last_trade_date@4 ASC NULLS LAST"], scan_direction=Native, page_size=1024, fetch=101, plans=[], partitions=[1 (1 files), 6 (1 files), 7 (1 files)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|                                                            |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
| physical_plan after ReverseOrder                           | SAME TEXT AS ABOVE                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| physical_plan after EnforceDistribution                    | SortPreservingMergeExec: [product_code@8 ASC NULLS LAST, ticker@12 ASC NULLS LAST, first_trade_date@3 ASC NULLS LAST, last_trade_date@4 ASC NULLS LAST], fetch=101                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|                                                            |   OneOfExec(best=4), costs=[20000008,30000008,30000008,30000008,8], required_input_ordering=[[product_code@8 ASC NULLS LAST]]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|                                                            |     UnionExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|                                                            |       RemoteExec: service_urls=[...]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|                                                            |         CoalescePartitionsExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|                                                            |           ProjectionExec: expr=[...]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|                                                            |             FileScanExec: table = us_futures_contracts, file_count=2, output_ordering=["product_code@1 ASC NULLS LAST", "ticker@0 ASC NULLS LAST", "first_trade_date@3 ASC NULLS LAST", "last_trade_date@4 ASC NULLS LAST"], scan_direction=Native, page_size=1024, fetch=101, plans=[], partitions=[4 (1 files), 5 (1 files)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|                                                            |       RemoteExec: service_urls=[...]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|                                                            |         CoalescePartitionsExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|                                                            |           ProjectionExec: expr=[...]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|                                                            |             FileScanExec: table = us_futures_contracts, file_count=3, output_ordering=["product_code@1 ASC NULLS LAST", "ticker@0 ASC NULLS LAST", "first_trade_date@3 ASC NULLS LAST", "last_trade_date@4 ASC NULLS LAST"], scan_direction=Native, page_size=1024, fetch=101, plans=[], partitions=[0 (1 files), 2 (1 files), 3 (1 files)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|                                                            |       RemoteExec: service_urls=[...]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|                                                            |         CoalescePartitionsExec                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|                                                            |           ProjectionExec: expr=[...]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|                                                            |             FileScanExec: table = us_futures_contracts, file_count=3, output_ordering=["product_code@1 ASC NULLS LAST", "ticker@0 ASC NULLS LAST", "first_trade_date@3 ASC NULLS LAST", "last_trade_date@4 ASC NULLS LAST"], scan_direction=Native, page_size=1024, fetch=101, plans=[], partitions=[1 (1 files), 6 (1 files), 7 (1 files)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|                                                            |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |

Each child of RemoteExec has a declared input ordering...which the top-level SPM leverages. However, after EnforceDistribution runs, CoalescePartitionsExec is added under RemoteExec, rather than SPM. Since RemoteExec maintains its input ordering, EnforceDistribution should use SortPreservingMergeExec here instead. The CoalescePartitionsExec ends up producing a suboptimal plan that requires a SortExec, which a subsequent run of EnforceSorting adds.

…d does not assert that it maintains its input order. If it does, we should not replace order preserving variants
Copy link
Collaborator

@zhuqi-lucas zhuqi-lucas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, don't forget to cherry-pick to branch-52

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants