[X-2182] Upgrade DF to 52.3 #34
Conversation
Pull request overview
Updates the repository to align with Apache DataFusion 52.3 behaviors and APIs, including execution-plan node id annotation, JSON array-format support, improved Parquet metrics/pruning, and refreshed test expectations/changelogs.
Changes:
- Add execution-plan `node_id` plumbing and annotate physical plans with node ids.
- Extend JSON reading to support both NDJSON and JSON array format (`newline_delimited=false`), including proto/config support.
- Enhance Parquet pruning/metrics (limit-based row group pruning, "fully matched" stats) and update SQL logic tests/docs accordingly.
Reviewed changes
Copilot reviewed 106 out of 110 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| docs/source/user-guide/sql/scalar_functions.md | Updates NVL docs to match new behavior/plan output. |
| docs/source/user-guide/explain-usage.md | Documents new limit_pruned_row_groups metric. |
| dev/changelog/48.0.1.md | Adds 48.0.1 changelog. |
| dev/changelog/48.0.0.md | Adjusts DF 48.0.0 changelog counts/entries. |
| dev/changelog/44.0.0.md | Adjusts DF 44.0.0 changelog counts/entries. |
| datafusion/sqllogictest/test_files/unnest.slt | Updates physical plan expectations. |
| datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part | Updates filter ordering in plan expectation. |
| datafusion/sqllogictest/test_files/string/string_view.slt | Updates NVL explain plan expectation. |
| datafusion/sqllogictest/test_files/select.slt | Updates coalesce behavior/expected failures. |
| datafusion/sqllogictest/test_files/repartition_subset_satisfaction.slt | Updates repartition/SPM plan expectations. |
| datafusion/sqllogictest/test_files/preserve_file_partitioning.slt | Updates repartition/SPM plan expectations. |
| datafusion/sqllogictest/test_files/nvl.slt | Removes NVL short-circuiting plan/behavior assertions. |
| datafusion/sqllogictest/test_files/limit_pruning.slt | Adds new SLT coverage for limit row-group pruning metrics. |
| datafusion/sqllogictest/test_files/json.slt | Adds SLT coverage for JSON array format option. |
| datafusion/sqllogictest/test_files/joins.slt | Updates join plan expectations. |
| datafusion/sqllogictest/test_files/join_disable_repartition_joins.slt | Updates join plan expectations (Coalesce/Filter placement). |
| datafusion/sqllogictest/test_files/group_by.slt | Updates repartition order-preserving plan expectations. |
| datafusion/sqllogictest/test_files/datetime/timestamps.slt | Formatting-only change (blank line). |
| datafusion/sqllogictest/test_files/copy.slt | Adds EXPLAIN ANALYZE metrics expectations for Parquet sink. |
| datafusion/sqllogictest/test_files/agg_func_substitute.slt | Updates repartition order-preserving plan expectations. |
| datafusion/pruning/src/pruning_predicate.rs | Formatting-only change. |
| datafusion/proto/tests/cases/roundtrip_physical_plan.rs | Annotates node ids for roundtrip equality. |
| datafusion/proto/tests/cases/roundtrip_logical_plan.rs | Switches to JsonReadOptions in tests. |
| datafusion/proto/src/logical_plan/file_formats.rs | Adds newline-delimited JSON option to proto conversion. |
| datafusion/proto/src/lib.rs | Documents node_id workaround in bytes roundtrip example. |
| datafusion/proto/src/generated/datafusion_proto_common.rs | Regenerates proto structs with newline_delimited. |
| datafusion/proto-common/src/to_proto/mod.rs | Serializes newline_delimited in JsonOptions. |
| datafusion/proto-common/src/generated/prost.rs | Regenerates prost output with newline_delimited. |
| datafusion/proto-common/src/generated/pbjson.rs | Regenerates pbjson serde for newline_delimited. |
| datafusion/proto-common/src/from_proto/mod.rs | Deserializes newline_delimited with default true. |
| datafusion/proto-common/proto/datafusion_common.proto | Adds newline_delimited field to JsonOptions message. |
| datafusion/physical-plan/src/work_table.rs | Adds with_node_id implementation. |
| datafusion/physical-plan/src/windows/window_agg_exec.rs | Adds with_node_id implementation. |
| datafusion/physical-plan/src/unnest.rs | Adds with_node_id implementation. |
| datafusion/physical-plan/src/union.rs | Adds with_node_id implementation. |
| datafusion/physical-plan/src/streaming.rs | Adds with_node_id implementation. |
| datafusion/physical-plan/src/spill/spill_pool.rs | Updates spill metrics assertions to reflect incremental bytes. |
| datafusion/physical-plan/src/spill/mod.rs | Lowers log level; updates spill metrics tests; adds new test. |
| datafusion/physical-plan/src/spill/in_progress_spill_file.rs | Updates spill metrics accounting to track incremental bytes. |
| datafusion/physical-plan/src/sorts/sort_preserving_merge.rs | Adds with_preserve_order and with_node_id. |
| datafusion/physical-plan/src/sorts/sort.rs | Adds with_node_id. |
| datafusion/physical-plan/src/sorts/partial_sort.rs | Adds with_node_id. |
| datafusion/physical-plan/src/repartition/mod.rs | Adds with_node_id. |
| datafusion/physical-plan/src/recursive_query.rs | Adds with_node_id. |
| datafusion/physical-plan/src/projection.rs | Adds with_node_id and with_preserve_order. |
| datafusion/physical-plan/src/placeholder_row.rs | Adds with_node_id. |
| datafusion/physical-plan/src/node_id.rs | New: helper to annotate node ids across plans. |
| datafusion/physical-plan/src/limit.rs | Adds required-ordering tracking and with_node_id. |
| datafusion/physical-plan/src/lib.rs | Exports new node_id module. |
| datafusion/physical-plan/src/joins/symmetric_hash_join.rs | Adds with_node_id. |
| datafusion/physical-plan/src/joins/sort_merge_join/exec.rs | Adds with_node_id. |
| datafusion/physical-plan/src/joins/hash_join/exec.rs | Adds with_node_id. |
| datafusion/physical-plan/src/joins/cross_join.rs | Adds with_node_id. |
| datafusion/physical-plan/src/filter.rs | Adds with_node_id and with_preserve_order. |
| datafusion/physical-plan/src/execution_plan.rs | Adds with_node_id/with_preserve_order APIs; stores node_id in PlanProperties. |
| datafusion/physical-plan/src/empty.rs | Adds with_node_id. |
| datafusion/physical-plan/src/display.rs | Temporarily disables printing node_id in display output. |
| datafusion/physical-plan/src/coalesce_partitions.rs | Adds with_node_id and with_preserve_order. |
| datafusion/physical-plan/src/analyze.rs | Adds with_node_id. |
| datafusion/physical-optimizer/src/limit_pushdown.rs | Threads “preserve order” through limit pushdown. |
| datafusion/physical-optimizer/src/ensure_coop.rs | Refines cooperative wrapping logic; adds extensive tests. |
| datafusion/physical-optimizer/src/enforce_sorting/mod.rs | Preserves ordering requirements when replacing Sort+fetch with Limit. |
| datafusion/physical-optimizer/src/enforce_distribution.rs | Refines handling of order-preserving variants, fetch, and streaming benefits. |
| datafusion/physical-expr/src/window/window_expr.rs | Formatting-only change. |
| datafusion/physical-expr-common/src/metrics/value.rs | Extends pruning metrics with “fully matched” counter + formatting. |
| datafusion/optimizer/src/push_down_filter.rs | Adds predicate inference based on equalities; adjusts join predicate handling. |
| datafusion/optimizer/src/optimize_projections/mod.rs | Refactors TableScan rewrite for clarity. |
| datafusion/optimizer/src/common_subexpr_eliminate.rs | Updates CSE behavior for short-circuiting scalar functions. |
| datafusion/functions/src/core/nvl2.rs | Implements NVL2 execution directly (no simplify-to-case). |
| datafusion/functions/src/core/nvl.rs | Implements NVL directly; updates docs/aliases/tests. |
| datafusion/functions/src/core/coalesce.rs | Implements COALESCE execution directly (no simplify-to-case). |
| datafusion/ffi/src/session/mod.rs | Updates expected plan debug output including node_id. |
| datafusion/expr/src/udf.rs | Removes conditional_arguments API; keeps short_circuits. |
| datafusion/expr/src/lib.rs | Re-exports SerializableAccumulator. |
| datafusion/expr/src/expr_rewriter/mod.rs | Adds replace_col_with_expr. |
| datafusion/expr-common/src/accumulator.rs | Adds SerializableAccumulator trait and hook on Accumulator. |
| datafusion/execution/src/runtime_env.rs | Exposes spilling progress API. |
| datafusion/execution/src/disk_manager.rs | Tracks active spill files + exposes SpillingProgress. |
| datafusion/datasource/src/source.rs | Adds with_node_id to DataSourceExec. |
| datafusion/datasource/src/sink.rs | Adds with_node_id to DataSinkExec. |
| datafusion/datasource-parquet/src/row_group_filter.rs | Adds “fully matched” detection + limit pruning logic. |
| datafusion/datasource-parquet/src/opener.rs | Calls row-group limit pruning when limit is present. |
| datafusion/datasource-parquet/src/metrics.rs | Adds limit_pruned_row_groups and bloom-filter matched counter. |
| datafusion/datasource-parquet/src/file_format.rs | Adds ParquetSink write metrics and exposes them via DataSink metrics. |
| datafusion/datasource-json/src/source.rs | Adds JSON array reading pipeline (stream + file), options, and tests. |
| datafusion/datasource-json/src/mod.rs | Exports new utils module. |
| datafusion/datasource-json/src/file_format.rs | Adds newline-delimited option, schema inference for JSON array format. |
| datafusion/datasource-json/Cargo.toml | Adds deps needed for JSON array format pipeline. |
| datafusion/core/tests/physical_optimizer/enforce_distribution.rs | Updates test for new return type of replace_order_preserving_variants. |
| datafusion/core/tests/parquet/row_group_pruning.rs | Adds limit pruning tests + fully-matched statistics expectations. |
| datafusion/core/tests/parquet/mod.rs | Extends pruning metric aggregation with fully-matched + limit pruning. |
| datafusion/core/tests/expr_api/mod.rs | Removes NVL2 “must simplify to case” test. |
| datafusion/core/tests/dataframe/mod.rs | Updates JSON read options type usage. |
| datafusion/core/tests/dataframe/dataframe_functions.rs | Removes NVL2 short-circuit test. |
| datafusion/core/tests/data/json_empty_array.json | Adds test fixture. |
| datafusion/core/tests/data/json_array.json | Adds test fixture. |
| datafusion/core/src/prelude.rs | Exports JsonReadOptions instead of NdJsonReadOptions. |
| datafusion/core/src/execution/session_state.rs | Annotates node ids on created physical plans. |
| datafusion/core/src/execution/context/json.rs | Switches API to JsonReadOptions. |
| datafusion/core/src/datasource/physical_plan/json.rs | Updates tests to use JsonReadOptions. |
| datafusion/core/src/datasource/listing/table.rs | Updates tests to use JsonReadOptions. |
| datafusion/core/src/datasource/file_format/options.rs | Introduces JsonReadOptions and deprecates NdJsonReadOptions. |
| datafusion/core/src/datasource/file_format/json.rs | Adds JSON array format tests and updates existing tests. |
| datafusion/core/src/dataframe/parquet.rs | Adds tests validating ParquetSink metrics. |
| datafusion/core/src/dataframe/mod.rs | Updates doc example to use JsonReadOptions. |
| datafusion/common/src/config.rs | Adds newline_delimited config field for JSON options. |
| datafusion-examples/examples/custom_data_source/csv_json_opener.rs | Updates JsonOpener constructor to pass newline_delimited. |
| Cargo.toml | Enables arrow-ipc zstd; adds tokio-stream dependency. |
| Cargo.lock | Updates lockfile for new deps. |
```rust
fn with_node_id(
    self: Arc<Self>,
    node_id: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let mut new_plan = DataSourceExec::new(Arc::clone(&self.data_source));
    let new_props = new_plan.cache.clone().with_node_id(node_id);
    new_plan.cache = new_props;
    Ok(Some(Arc::new(new_plan)))
```
DataSourceExec::with_node_id rebuilds the exec via DataSourceExec::new(...), which recomputes cache from the data source and can drop any plan-property overrides applied on the existing exec (e.g. via with_constraints / with_partitioning). Prefer cloning self and only updating cache with the new node_id to avoid changing semantics/properties during node id annotation.
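The suggestion above can be sketched with a minimal, self-contained model. The types below are simplified stand-ins (the real `DataSourceExec` and `PlanProperties` have many more fields); the point is only the difference between rebuilding the node (which recomputes `cache` and drops earlier overrides) and cloning it (which preserves them):

```rust
// Simplified stand-in for PlanProperties: only a constraints override
// and the new node_id field are modeled.
#[derive(Clone, Debug, PartialEq)]
struct PlanProperties {
    constraints: Vec<String>,
    node_id: Option<usize>,
}

impl PlanProperties {
    fn with_node_id(mut self, node_id: usize) -> Self {
        self.node_id = Some(node_id);
        self
    }
}

#[derive(Clone, Debug)]
struct DataSourceExec {
    cache: PlanProperties,
}

impl DataSourceExec {
    // Rebuilding, as in the reviewed code: `cache` is recomputed from the
    // data source, so overrides applied earlier (e.g. constraints) are lost.
    fn rebuilt_with_node_id(&self, node_id: usize) -> DataSourceExec {
        let recomputed = PlanProperties { constraints: vec![], node_id: None };
        DataSourceExec { cache: recomputed.with_node_id(node_id) }
    }

    // Suggested fix: clone `self` and update only the node id on `cache`.
    fn cloned_with_node_id(&self, node_id: usize) -> DataSourceExec {
        let mut new_plan = self.clone();
        new_plan.cache = new_plan.cache.clone().with_node_id(node_id);
        new_plan
    }
}

fn main() {
    let exec = DataSourceExec {
        cache: PlanProperties {
            constraints: vec!["unique(a)".to_string()], // override set earlier
            node_id: None,
        },
    };
    // Rebuilding drops the constraint override; cloning keeps it.
    assert!(exec.rebuilt_with_node_id(7).cache.constraints.is_empty());
    let cloned = exec.cloned_with_node_id(7);
    assert_eq!(cloned.cache.constraints, vec!["unique(a)".to_string()]);
    assert_eq!(cloned.cache.node_id, Some(7));
}
```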
```rust
fn with_node_id(
    self: Arc<Self>,
    node_id: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let mut new_plan =
        GlobalLimitExec::new(Arc::clone(self.input()), self.skip, self.fetch);
    let new_props = new_plan.cache.clone().with_node_id(node_id);
    new_plan.cache = new_props;
    Ok(Some(Arc::new(new_plan)))
}
```
GlobalLimitExec::with_node_id reconstructs the node with GlobalLimitExec::new(...), which resets the new required_ordering field back to None. This can silently drop order-preservation requirements set earlier (e.g. when a SortExec is removed and replaced by a LimitExec). Ensure the new plan copies required_ordering from self when rebuilding.
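A simplified sketch of that fix, using stand-in types (a real ordering requirement is not a `Vec<String>`, and the real exec holds an input plan): when rebuilding the node to attach an id, carry `required_ordering` over from `self` instead of letting the constructor reset it:

```rust
#[derive(Clone, Debug, PartialEq)]
struct GlobalLimitExec {
    skip: usize,
    fetch: Option<usize>,
    // Stand-in for the real ordering-requirement type.
    required_ordering: Option<Vec<String>>,
    node_id: Option<usize>,
}

impl GlobalLimitExec {
    fn new(skip: usize, fetch: Option<usize>) -> Self {
        // The constructor initializes required_ordering to None,
        // as in the reviewed code.
        GlobalLimitExec { skip, fetch, required_ordering: None, node_id: None }
    }

    fn with_node_id(&self, node_id: usize) -> Self {
        let mut new_plan = GlobalLimitExec::new(self.skip, self.fetch);
        // The fix: copy the ordering requirement from `self` so it is
        // not silently dropped during node id annotation.
        new_plan.required_ordering = self.required_ordering.clone();
        new_plan.node_id = Some(node_id);
        new_plan
    }
}

fn main() {
    let mut limit = GlobalLimitExec::new(0, Some(10));
    // Set when a SortExec with fetch was replaced by this limit.
    limit.required_ordering = Some(vec!["a ASC".to_string()]);
    let annotated = limit.with_node_id(3);
    assert_eq!(annotated.required_ordering, Some(vec!["a ASC".to_string()]));
    assert_eq!(annotated.node_id, Some(3));
}
```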
```rust
} else if let Expr::Column(col) = right.as_ref() {
    // Only add to map if left side is a literal
    if matches!(left.as_ref(), Expr::Literal(_, _)) {
        equality_map.insert(col.clone(), *right.clone());
        final_predicates.push(predicate.clone());
    }
```
In infer_predicates_from_equalities, the literal = column case inserts the column expression into equality_map (*right.clone()), but it should store the literal (*left.clone()). As written, substitutions like t1.b > t2.b with 3 = t2.b will incorrectly rewrite to t1.b > t2.b (no-op) instead of t1.b > 3.
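A minimal model of the corrected logic, using a toy `Expr` enum (hypothetical, not DataFusion's real `Expr`): for a `literal = column` predicate, the map entry must point the column at the literal on the left, so later substitution actually replaces the column:

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Expr {
    Column(String),
    Literal(i64),
}

// Build column -> literal mappings from equality predicates.
fn build_equality_map(predicates: &[(Expr, Expr)]) -> HashMap<String, Expr> {
    let mut equality_map = HashMap::new();
    for (left, right) in predicates {
        match (left, right) {
            // column = literal: map the column to the right-hand literal.
            (Expr::Column(col), Expr::Literal(_)) => {
                equality_map.insert(col.clone(), right.clone());
            }
            // literal = column: store the *left-hand literal*, not the
            // column itself (the bug flagged in the review).
            (Expr::Literal(_), Expr::Column(col)) => {
                equality_map.insert(col.clone(), left.clone());
            }
            _ => {}
        }
    }
    equality_map
}

fn main() {
    // 3 = t2.b should map t2.b -> 3, so t1.b > t2.b can rewrite to t1.b > 3.
    let map = build_equality_map(&[(Expr::Literal(3), Expr::Column("t2.b".into()))]);
    assert_eq!(map.get("t2.b"), Some(&Expr::Literal(3)));
}
```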
```rust
// MAX: disable this for now since we don't need it displayed + it fails many DF tests
//if let Some(node_id) = plan.properties().node_id() {
//    write!(self.f, ", node_id={}", node_id)?;
//}
```
This commented-out block includes a personal note ("MAX:") and a disabled code snippet. Consider replacing with a neutral TODO referencing an issue/PR, or remove the comment entirely, to keep the codebase professional and avoid carrying dead code.
```rust
/// If not specified, the default level for the compression algorithm is used.
pub compression_level: Option<u32>, default = None
pub schema_infer_max_rec: Option<usize>, default = None
/// The JSON format to use when reading files.
```
json feature which has been contributed to DF 53
```diff
 /// # async fn main() -> Result<()> {
 /// let ctx = SessionContext::new();
-/// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
+/// let df = ctx.read_json("tests/data/unnest.json", JsonReadOptions::default()).await?;
```
json feature which has been contributed to DF 53
```rust
use object_store::local::LocalFileSystem;
use regex::Regex;
use rstest::rstest;
// ==================== Test Helpers ====================
```
json feature which has been contributed to DF 53
```rust
    self
}

/// Set whether to read as newline-delimited JSON.
```
json feature which has been contributed to DF 53
```rust
    self.total_matched
}

pub fn total_fully_matched(&self) -> usize {
```
limit feature which has been merged to DF 53
```rust
}

// Set the expected fully matched row groups by statistics
fn with_fully_matched_by_stats(
```
limit feature which has been merged to DF 53
```rust
    .with_type(MetricType::SUMMARY)
    .pruning_metrics("row_groups_pruned_bloom_filter", partition);

let limit_pruned_row_groups = MetricBuilder::new(metrics)
```
limit feature which has been merged to DF 53
```rust
    &self.is_fully_matched
}

/// Prunes the access plan based on the limit and fully contained row groups.
```
limit feature which has been merged to DF 53
```rust
}

/// Returns the current spilling progress
pub fn spilling_progress(&self) -> SpillingProgress {
```
in progress feature which has been merged to DF 53
```diff
 assert_eq!(
     format!("{physical_plan:?}"),
-    "EmptyExec { schema: Schema { fields: [], metadata: {} }, partitions: 1, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { map: {}, classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, oeq_cache: OrderingEquivalenceCache { normal_cls: OrderingEquivalenceClass { orderings: [] }, leading_map: {} }, constraints: Constraints { inner: [] }, schema: Schema { fields: [], metadata: {} } }, partitioning: UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, evaluation_type: Lazy, scheduling_type: Cooperative, output_ordering: None } }"
+    "EmptyExec { schema: Schema { fields: [], metadata: {} }, partitions: 1, cache: PlanProperties { eq_properties: EquivalenceProperties { eq_group: EquivalenceGroup { map: {}, classes: [] }, oeq_class: OrderingEquivalenceClass { orderings: [] }, oeq_cache: OrderingEquivalenceCache { normal_cls: OrderingEquivalenceClass { orderings: [] }, leading_map: {} }, constraints: Constraints { inner: [] }, schema: Schema { fields: [], metadata: {} } }, partitioning: UnknownPartitioning(1), emission_type: Incremental, boundedness: Bounded, evaluation_type: Lazy, scheduling_type: Cooperative, output_ordering: None, node_id: Some(0) } }"
```
node id feature
```rust
///
/// Only applicable to single-child operators; returns false for multi-child
/// operators (e.g. joins) where child substitution semantics are ambiguous.
fn preserving_order_enables_streaming(
```
enforce sort for streaming
**xudong963** left a comment
Looks like all the diffs are the custom features from us?
```rust
        equality_map.insert(col.clone(), *right.clone());
        final_predicates.push(predicate.clone());
    }
} else if let Expr::Column(col) = right.as_ref() {
```
When processing 3 = col_b (literal on left), this maps col_b → col_b instead of col_b → 3. The substitution will be a no-op for this case, meaning predicates inferred from literal = column equalities won't actually substitute.
I didn't find the method in upstream; it might have been refactored already.
Fixed it and added more unit tests in the latest PR.
Thanks @xudong963 for the review, I will do the final check next Monday.
No description provided.