Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions native/core/src/parquet/parquet_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ fn parquet_convert_array(
.with_timezone(Arc::clone(tz)),
))
}
// Keep scan-time nested parquet conversion aligned with Spark's legacy
// array<Date> -> array<Int> behavior without affecting scalar Date -> Int casts.
(Date32, Int32) => Ok(new_null_array(to_type, array.len())),
(Map(_, ordered_from), Map(_, ordered_to)) if ordered_from == ordered_to =>
parquet_convert_map_to_map(array.as_map(), to_type, parquet_options, *ordered_to)
,
Expand Down
54 changes: 47 additions & 7 deletions native/spark-expr/src/conversion_funcs/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ pub(crate) fn cast_array(
};

let cast_result = match (&from_type, to_type) {
// Null arrays carry no concrete values, so Arrow's native cast can change only the
// logical type while preserving length and nullness.
(Null, _) => Ok(cast_with_options(&array, to_type, &native_cast_options)?),
(Utf8, Boolean) => spark_cast_utf8_to_boolean::<i32>(&array, eval_mode),
(LargeUtf8, Boolean) => spark_cast_utf8_to_boolean::<i64>(&array, eval_mode),
(Utf8, Timestamp(_, _)) => {
Expand Down Expand Up @@ -366,8 +369,19 @@ pub(crate) fn cast_array(
cast_options,
)?),
(List(_), Utf8) => Ok(cast_array_to_string(array.as_list(), cast_options)?),
(List(_), List(_)) if can_cast_types(&from_type, to_type) => {
Ok(cast_with_options(&array, to_type, &CAST_OPTIONS)?)
(List(_), List(to)) => {
let list_array = array.as_list::<i32>();
let casted_values = match (list_array.values().data_type(), to.data_type()) {
// Spark legacy array casts produce null elements for array<Date> -> array<Int>.
(Date32, Int32) => new_null_array(to.data_type(), list_array.values().len()),
_ => cast_array(Arc::clone(list_array.values()), to.data_type(), cast_options)?,
};
Ok(Arc::new(ListArray::new(
Arc::clone(to),
list_array.offsets().clone(),
casted_values,
list_array.nulls().cloned(),
)) as ArrayRef)
}
(Map(_, _), Map(_, _)) => Ok(cast_map_to_map(&array, &from_type, to_type, cast_options)?),
(UInt8 | UInt16 | UInt32 | UInt64, Int8 | Int16 | Int32 | Int64)
Expand Down Expand Up @@ -803,7 +817,8 @@ fn cast_binary_formatter(value: &[u8]) -> String {
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::StringArray;
use arrow::array::{ListArray, NullArray, StringArray};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::TimestampMicrosecondType;
use arrow::datatypes::{Field, Fields};
#[test]
Expand Down Expand Up @@ -929,8 +944,6 @@ mod tests {

#[test]
fn test_cast_string_array_to_string() {
use arrow::array::ListArray;
use arrow::buffer::OffsetBuffer;
let values_array =
StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("a"), None, None]);
let offsets_buffer = OffsetBuffer::<i32>::new(vec![0, 3, 5, 6, 6].into());
Expand All @@ -955,8 +968,6 @@ mod tests {

#[test]
fn test_cast_i32_array_to_string() {
use arrow::array::ListArray;
use arrow::buffer::OffsetBuffer;
let values_array = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(1), None, None]);
let offsets_buffer = OffsetBuffer::<i32>::new(vec![0, 3, 5, 6, 6].into());
let item_field = Arc::new(Field::new("item", DataType::Int32, true));
Expand All @@ -977,4 +988,33 @@ mod tests {
assert_eq!(r#"[null]"#, string_array.value(2));
assert_eq!(r#"[]"#, string_array.value(3));
}

#[test]
fn test_cast_array_of_nulls_to_array() {
    // Build a list<null> input with offsets [0, 2, 3, 3]:
    // rows are [null, null], [null], and [] — no top-level list nulls.
    let source_field = Arc::new(Field::new("item", DataType::Null, true));
    let source: ArrayRef = Arc::new(ListArray::new(
        source_field,
        OffsetBuffer::<i32>::new(vec![0, 2, 3, 3].into()),
        Arc::new(NullArray::new(3)),
        None,
    ));

    // Casting list<null> -> list<int32> should keep length/offsets and
    // yield all-null Int32 values.
    let target_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
    let options = SparkCastOptions::new(EvalMode::Legacy, "UTC", false);
    let casted = cast_array(source, &target_type, &options).unwrap();

    let list = casted.as_list::<i32>();
    assert_eq!(list.len(), 3);
    assert_eq!(list.value_offsets(), &[0, 2, 3, 3]);

    let elements = list.values().as_primitive::<Int32Type>();
    assert_eq!(elements.len(), 3);
    assert_eq!(elements.null_count(), 3);
    assert!(elements.iter().all(|element| element.is_none()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {

(fromType, toType) match {
case (dt: ArrayType, _: ArrayType) if dt.elementType == NullType => Compatible()
case (ArrayType(DataTypes.DateType, _), ArrayType(toElementType, _))
if toElementType != DataTypes.IntegerType && toElementType != DataTypes.StringType =>
unsupported(fromType, toType)
case (dt: ArrayType, DataTypes.StringType) if dt.elementType == DataTypes.BinaryType =>
Incompatible()
case (dt: ArrayType, DataTypes.StringType) =>
Expand Down
Loading
Loading