diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index fef4a5b9e9..1e44a9f28c 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::execution::operators::{copy_array, copy_or_unpack_array, CopyMode}; +use crate::execution::operators::copy_array; use crate::{ errors::CometError, execution::{ @@ -81,8 +81,6 @@ pub struct ScanExec { jvm_fetch_time: Time, /// Time spent in FFI arrow_ffi_time: Time, - /// Whether native code can assume ownership of batches that it receives - arrow_ffi_safe: bool, } impl ScanExec { @@ -91,7 +89,6 @@ impl ScanExec { input_source: Option<Arc<GlobalRef>>, input_source_description: &str, data_types: Vec<DataType>, - arrow_ffi_safe: bool, ) -> Result<Self, CometError> { let metrics_set = ExecutionPlanMetricsSet::default(); let baseline_metrics = BaselineMetrics::new(&metrics_set, 0); @@ -112,7 +109,6 @@ impl ScanExec { data_types.len(), &jvm_fetch_time, &arrow_ffi_time, - arrow_ffi_safe, )?; timer.stop(); batch @@ -143,7 +139,6 @@ impl ScanExec { jvm_fetch_time, arrow_ffi_time, schema, - arrow_ffi_safe, }) } @@ -178,7 +173,6 @@ impl ScanExec { self.data_types.len(), &self.jvm_fetch_time, &self.arrow_ffi_time, - self.arrow_ffi_safe, )?; *current_batch = Some(next_batch); } @@ -195,7 +189,6 @@ impl ScanExec { num_cols: usize, jvm_fetch_time: &Time, arrow_ffi_time: &Time, - arrow_ffi_safe: bool, ) -> Result<InputBatch, CometError> { if exec_context_id == TEST_EXEC_CONTEXT_ID { // This is a unit test. We don't need to call JNI. @@ -264,15 +257,16 @@ impl ScanExec { array }; - let array = if arrow_ffi_safe { - // ownership of this array has been transferred to native - // but we still need to unpack dictionary arrays - copy_or_unpack_array(&array, &CopyMode::UnpackOrClone)?
- } else { - // it is necessary to copy the array because the contents may be - // overwritten on the JVM side in the future - copy_array(&array) - }; + // There are two reasons why it is important to make a deep copy of the array here. + // 1. If the underlying scan is `native_comet` then ownership of the array is + // not transferred to native and the contents of the buffers may be overwritten + // on the JVM side in the future. + // 2. Even if ownership is transferred, the JVM wrapper classes `ArrowArray` + // and `ArrowSchema` will not be released on the JVM side until the release + // callback is invoked when the array is eventually dropped in native code. This + // can lead to GC pressure when the batches are buffered in operators such as + // SortExec. + let array = copy_array(&array); inputs.push(array); diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index ca6e2084e3..0f98bdfa61 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1377,13 +1377,8 @@ impl PhysicalPlanner { }; // The `ScanExec` operator will take actual arrays from Spark during execution - let scan = ScanExec::new( - self.exec_context_id, - input_source, - &scan.source, - data_types, - scan.arrow_ffi_safe, - )?; + let scan = + ScanExec::new(self.exec_context_id, input_source, &scan.source, data_types)?; Ok(( vec![scan.clone()], @@ -2978,7 +2973,6 @@ mod tests { type_info: None, }], source: "".to_string(), - arrow_ffi_safe: false, })), }; @@ -3052,7 +3046,6 @@ mod tests { type_info: None, }], source: "".to_string(), - arrow_ffi_safe: false, })), }; @@ -3259,7 +3252,6 @@ mod tests { op_struct: Some(OpStruct::Scan(spark_operator::Scan { fields: vec![create_proto_datatype()], source: "".to_string(), - arrow_ffi_safe: false, })), } } @@ -3302,7 +3294,6 @@ mod tests { }, ], source: "".to_string(), - arrow_ffi_safe: false, })), }; @@ -3418,7 +3409,6 @@ mod tests { }, ], source: "".to_string(), - arrow_ffi_safe: 
false, })), }; diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 3306ad574d..185c60fdae 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -77,8 +77,6 @@ message Scan { // is purely for informational purposes when viewing native query plans in // debug mode. string source = 2; - // Whether native code can assume ownership of batches that it receives - bool arrow_ffi_safe = 3; } message NativeScan { diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 6dd4548268..2a13c7ea00 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1464,23 +1464,6 @@ object QueryPlanSerde extends Logging with CometExprShim { scanBuilder.setSource(source) } - val ffiSafe = op match { - case _ if isExchangeSink(op) => - // Source of broadcast exchange batches is ArrowStreamReader - // Source of shuffle exchange batches is NativeBatchDecoderIterator - true - case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_COMET => - // native_comet scan reuses mutable buffers - false - case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT => - // native_iceberg_compat scan reuses mutable buffers for constant columns - // https://github.com/apache/datafusion-comet/issues/2152 - false - case _ => - false - } - scanBuilder.setArrowFfiSafe(ffiSafe) - val scanTypes = op.output.flatten { attr => serializeDataType(attr.dataType) }