feat: enable native_datafusion scan in auto mode #3781
andygrove merged 22 commits into apache:main
auto scan mode now uses native_datafusion scan, which does not support row index generation, so these tests should be skipped for auto mode just as they are for native_datafusion mode.
…usion mode Remove IgnoreCometNativeDataFusion annotations from 3.5.8 Spark SQL test diff for issues that have been closed: apache#3311, apache#3313, apache#3314, apache#3315, apache#3320, apache#3401.
…ve_datafusion mode" This reverts commit d7fd22e.
…_datafusion The auto scan mode now tries native_datafusion first and falls back to native_iceberg_compat if the scan cannot be converted, rather than always using native_iceberg_compat.
Remove IgnoreCometNativeDataFusion tags for issues that have been resolved and closed: apache#3312, apache#3313, apache#3314, apache#3315.
This reverts commit 96622cf.
dev/diffs/3.4.3.diff
@@ -2377,7 +2377,7 @@ index 351c6d698fc..583d9225cca 100644
 test(s"invalid row index column type - ${conf.desc}") {
+ // native_datafusion Parquet scan does not support row index generation.
should we point to github issue?
.orElse {
  // clear explain info tags from the failed nativeDataFusionScan
  // attempt so they don't leak into the fallback path
  scanExec.unsetTagValue(CometExplainInfo.EXTENSION_INFO)
just wondering why we don't unset tags for failed in variants below?
if fallback reasons are added for native_datafusion then we need to remove them before trying native_iceberg_compat, otherwise the plan still falls back to Spark.
I feel we should keep the reason, so that we know why we used native_iceberg_compat instead of native_datafusion
If we keep a fallback reason then we will fall back to Spark. The goal was to use native_iceberg_compat for the cases that native_datafusion cannot support.
An alternative approach is for auto mode to just try native_datafusion and then fall back to Spark, rather than try native_iceberg_compat first.
I updated this PR to just use native_datafusion in auto mode. It is likely that I will need to fix some test assumptions as well, but will wait for CI to run first.
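The trade-off discussed in this thread can be sketched with a toy model. All names below (`ScanNode`, `tryNativeDataFusion`, `tryIcebergCompat`) are hypothetical stand-ins rather than Comet's actual classes, and the sketch assumes only what the comments above state: a failed attempt records a fallback reason, and any recorded reason causes the plan to fall back to Spark.

```scala
import scala.collection.mutable

object AutoScanSketch {
  // Hypothetical stand-in for a Spark scan node carrying explain-info tags.
  final class ScanNode(val usesRowIndex: Boolean) {
    val fallbackReasons: mutable.Set[String] = mutable.Set.empty
  }

  // Assumption: a failed conversion attempt records a reason on the node.
  def tryNativeDataFusion(node: ScanNode): Option[String] =
    if (node.usesRowIndex) {
      node.fallbackReasons += "native_datafusion: row index not supported"
      None
    } else {
      Some("native_datafusion")
    }

  // Assumption: any recorded reason keeps the node on Spark.
  def tryIcebergCompat(node: ScanNode): Option[String] =
    if (node.fallbackReasons.nonEmpty) None else Some("native_iceberg_compat")

  // Earlier revision: clear the tags, then try the second native scan.
  def chainWithClear(node: ScanNode): String =
    tryNativeDataFusion(node)
      .orElse {
        node.fallbackReasons.clear()
        tryIcebergCompat(node)
      }
      .getOrElse("spark")

  // Same chain, but keeping the reason: the leftover tag forces Spark.
  def chainKeepingReasons(node: ScanNode): String =
    tryNativeDataFusion(node).orElse(tryIcebergCompat(node)).getOrElse("spark")

  // Final revision: a single native attempt, then Spark.
  def autoMode(node: ScanNode): String =
    tryNativeDataFusion(node).getOrElse("spark")
}
```

In this model, keeping the reason and reaching the second native scan are mutually exclusive, which is why the final revision drops the second attempt and keeps the reason visible.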
…diffs Add reference to apache#3432 in ParquetRowIndexSuite assume() comments across all three Spark version diffs.
.get(conf) == CometConf.SCAN_NATIVE_DATAFUSION) {
val scan = CometConf.COMET_NATIVE_SCAN_IMPL.get(conf)
val isNativeDataFusionScan =
  scan == CometConf.SCAN_NATIVE_DATAFUSION || scan == CometConf.SCAN_AUTO
nit: this is not strictly correct since we could still fallback to native_iceberg_compat when mode is AUTO
With the latest commits, auto mode no longer falls back to native_iceberg_compat
… shuffle tests native_datafusion scan needs CometExecRule.transform() to wrap it in CometNativeExec, so it cannot work when COMET_EXEC_ENABLED is false. Add a guard in nativeDataFusionScan() to return None in this case, matching the existing pattern for v2 Iceberg scans. Update shuffle test suites to use COMET_EXEC_ENABLED=true so they run with auto mode (native_datafusion) scans. Remove stale assume guards that checked for explicit native_datafusion config but didn't account for auto mode now using native_datafusion.
Auto mode now falls back to Spark directly instead of native_iceberg_compat. Document that native_datafusion scan requires spark.comet.exec.enabled=true.
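A minimal sketch of the guard described in these two commits, assuming a simplified configuration object. `Conf`, `selectScan`, and the string results are hypothetical; only the early-return shape and the name `nativeDataFusionScan` follow the snippets in this PR.

```scala
object ExecGuardSketch {
  // Hypothetical, simplified view of the two settings involved.
  final case class Conf(execEnabled: Boolean, scanImpl: String)

  // Returns the chosen scan name, or None to leave the scan to Spark.
  def nativeDataFusionScan(conf: Conf, scanSupported: Boolean): Option[String] = {
    // Guard: with exec disabled, nothing wraps the scan in a native exec node.
    if (!conf.execEnabled) {
      return None
    }
    if (!scanSupported) {
      return None
    }
    Some("native_datafusion")
  }

  // Auto mode behaves like native_datafusion and falls back to Spark directly.
  def selectScan(conf: Conf, scanSupported: Boolean): String = conf.scanImpl match {
    case "auto" | "native_datafusion" =>
      nativeDataFusionScan(conf, scanSupported).getOrElse("spark")
    case other => other
  }
}
```

Under these assumptions, a suite that sets `COMET_EXEC_ENABLED` to `false` never exercises the native scan in auto mode, which matches the reason the shuffle suites were switched to `true`.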
@parthchandra @comphead @mbutrovich Thanks for the feedback so far. I simplified this PR so that …
  CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> asyncShuffleEnable.toString,
  CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString,
- CometConf.COMET_EXEC_ENABLED.key -> "false",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
native_datafusion scan cannot work if COMET_EXEC_ENABLED is disabled
  return None
}
if (!CometNativeScan.isSupported(scanExec)) {
  return None
should we also add withInfo here?
yes, good point, I will add this
actually, nm, withInfo is called from within isSupported already
comphead left a comment
Thanks @andygrove it is lgtm overall
mbutrovich left a comment
Just one overall question that applies to the Spark SQL tests.
Row index tests were being skipped for native_datafusion and auto scan modes. Instead of skipping, add CometNativeScanExec to the pattern match in ParquetRowIndexSuite so the test correctly counts partitions and output rows when the v1 scan uses CometNativeScanExec. Also fix FileBasedDataSourceSuite line length violation in 3.5.8 diff.
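The pattern-match change described in this commit can be illustrated with a toy plan tree. The node types below (`SparkFileScan`, `NativeScan`, `Project`) and the helper `findScan` are hypothetical and only mimic the idea of adding one more case to a partial function; they are not the classes used in ParquetRowIndexSuite.

```scala
object RowIndexPlanSketch {
  // Hypothetical plan nodes standing in for Spark and Comet scan operators.
  sealed trait PlanNode { def children: Seq[PlanNode] }
  final case class SparkFileScan(partitions: Int) extends PlanNode {
    val children: Seq[PlanNode] = Nil
  }
  final case class NativeScan(partitions: Int) extends PlanNode {
    val children: Seq[PlanNode] = Nil
  }
  final case class Project(child: PlanNode) extends PlanNode {
    val children: Seq[PlanNode] = Seq(child)
  }

  // Depth-first search for the first node the partial function accepts.
  def findScan[T](plan: PlanNode)(pf: PartialFunction[PlanNode, T]): Option[T] =
    pf.lift(plan).orElse(
      plan.children.iterator.map(c => findScan(c)(pf)).collectFirst { case Some(v) => v })

  // Before: only the Spark scan is recognized, so a native plan yields None.
  def partitionsBefore(plan: PlanNode): Option[Int] =
    findScan(plan) { case SparkFileScan(n) => n }

  // After: the native scan is matched as well.
  def partitionsAfter(plan: PlanNode): Option[Int] =
    findScan(plan) {
      case SparkFileScan(n) => n
      case NativeScan(n) => n
    }
}
```

With a plan such as `Project(NativeScan(4))`, the first matcher finds nothing while the second returns the partition count, which is the gap the commit closes instead of skipping the tests.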
mbutrovich left a comment
Thanks @andygrove! Glad we’re making progress cleaning up the scan code.
Comet's NativeBatchReader throws RuntimeException instead of SparkException for invalid row index column types. Skip the test for SCAN_NATIVE_DATAFUSION and SCAN_AUTO modes. See apache#3886
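Why an exception-type mismatch fails such a test can be shown with a small sketch. `EngineException`, `throwsA`, `sparkReader`, and `nativeReader` are hypothetical names; `EngineException` merely stands in for the exception type the Spark test expects.

```scala
import scala.reflect.ClassTag

object ExceptionTypeSketch {
  // Hypothetical stand-in for the exception type the test expects.
  final class EngineException(msg: String) extends Exception(msg)

  // Returns true only if `body` throws an exception that is an instance of T.
  def throwsA[T <: Throwable](body: => Any)(implicit ct: ClassTag[T]): Boolean =
    try {
      body
      false
    } catch {
      case t: Throwable => ct.runtimeClass.isInstance(t)
    }

  // Same error message, different exception types.
  def sparkReader(): Unit = throw new EngineException("invalid row index column type")
  def nativeReader(): Unit = throw new RuntimeException("invalid row index column type")
}
```

A check written against `EngineException` passes for `sparkReader()` and fails for `nativeReader()` even though both report the same problem, so the test is skipped until the exception types line up.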
Merged. Thanks @comphead @mbutrovich
Which issue does this PR close?
Part of #3321
Rationale for this change
Improve performance with the default config. The native_datafusion scan does not have the FFI roundtrip overhead that native_iceberg_compat has.
What changes are included in this PR?
- Auto scan mode now tries native_datafusion first, falling back to Spark when not supported (no longer falls back to native_iceberg_compat)
- Add COMET_EXEC_ENABLED guard to nativeDataFusionScan() since the scan node requires CometExecRule.transform() to wrap it in CometNativeExec
- Update shuffle test suites to use COMET_EXEC_ENABLED=true so they run with auto mode (native_datafusion) scans
- Remove stale assume(COMET_NATIVE_SCAN_IMPL != SCAN_NATIVE_DATAFUSION) guards from shuffle tests
How are these changes tested?
Existing tests. Shuffle test suites (CometShuffleSuite, CometAsyncShuffleSuite, CometNativeShuffleSuite) updated and verified to pass with auto mode native_datafusion scans.