diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml index 851f0a244f53..bb5f772d09bf 100644 --- a/arrow-json/Cargo.toml +++ b/arrow-json/Cargo.toml @@ -42,7 +42,6 @@ arrow-cast = { workspace = true } arrow-data = { workspace = true } arrow-schema = { workspace = true } half = { version = "2.1", default-features = false } -indexmap = { version = "2.0", default-features = false, features = ["std"] } num-traits = { version = "0.2.19", default-features = false, features = ["std"] } serde_core = { version = "1.0", default-features = false } serde_json = { version = "1.0", default-features = false, features = ["std"] } diff --git a/arrow-json/src/reader/schema.rs b/arrow-json/src/reader/schema.rs index 524e6b2aa560..1558ac132581 100644 --- a/arrow-json/src/reader/schema.rs +++ b/arrow-json/src/reader/schema.rs @@ -15,118 +15,18 @@ // specific language governing permissions and limitations // under the License. -use super::ValueIter; -use arrow_schema::{ArrowError, DataType, Field, Fields, Schema}; -use indexmap::map::IndexMap as HashMap; -use indexmap::set::IndexSet as HashSet; -use serde_json::Value; use std::borrow::Borrow; use std::io::{BufRead, Seek}; -use std::sync::Arc; - -#[derive(Debug, Clone)] -enum InferredType { - Scalar(HashSet), - Array(Box), - Object(HashMap), - Any, -} - -impl InferredType { - fn merge(&mut self, other: InferredType) -> Result<(), ArrowError> { - match (self, other) { - (InferredType::Array(s), InferredType::Array(o)) => { - s.merge(*o)?; - } - (InferredType::Scalar(self_hs), InferredType::Scalar(other_hs)) => { - other_hs.into_iter().for_each(|v| { - self_hs.insert(v); - }); - } - (InferredType::Object(self_map), InferredType::Object(other_map)) => { - for (k, v) in other_map { - self_map.entry(k).or_insert(InferredType::Any).merge(v)?; - } - } - (s @ InferredType::Any, v) => { - *s = v; - } - (_, InferredType::Any) => {} - // convert a scalar type to a single-item scalar array type. - (InferredType::Array(self_inner_type), other_scalar @ InferredType::Scalar(_)) => { - self_inner_type.merge(other_scalar)?; - } - (s @ InferredType::Scalar(_), InferredType::Array(mut other_inner_type)) => { - other_inner_type.merge(s.clone())?; - *s = InferredType::Array(other_inner_type); - } - // incompatible types - (s, o) => { - return Err(ArrowError::JsonError(format!( - "Incompatible type found during schema inference: {s:?} v.s. {o:?}", - ))); - } - } - - Ok(()) - } - - fn is_none_or_any(ty: Option<&Self>) -> bool { - matches!(ty, Some(Self::Any) | None) - } -} - -/// Shorthand for building list data type of `ty` -fn list_type_of(ty: DataType) -> DataType { - DataType::List(Arc::new(Field::new_list_field(ty, true))) -} -/// Coerce data type during inference -/// -/// * `Int64` and `Float64` should be `Float64` -/// * Lists and scalars are coerced to a list of a compatible scalar -/// * All other types are coerced to `Utf8` -fn coerce_data_type(dt: Vec<&DataType>) -> DataType { - let mut dt_iter = dt.into_iter().cloned(); - let dt_init = dt_iter.next().unwrap_or(DataType::Utf8); - - dt_iter.fold(dt_init, |l, r| match (l, r) { - (DataType::Null, o) | (o, DataType::Null) => o, - (DataType::Boolean, DataType::Boolean) => DataType::Boolean, - (DataType::Int64, DataType::Int64) => DataType::Int64, - (DataType::Float64, DataType::Float64) - | (DataType::Float64, DataType::Int64) - | (DataType::Int64, DataType::Float64) => DataType::Float64, - (DataType::List(l), DataType::List(r)) => { - list_type_of(coerce_data_type(vec![l.data_type(), r.data_type()])) - } - // coerce scalar and scalar array into scalar array - (DataType::List(e), not_list) | (not_list, DataType::List(e)) => { - list_type_of(coerce_data_type(vec![e.data_type(), ¬_list])) - } - _ => DataType::Utf8, - }) -} - -fn generate_datatype(t: &InferredType) -> Result { - Ok(match t { - InferredType::Scalar(hs) => coerce_data_type(hs.iter().collect()), - InferredType::Object(spec) => DataType::Struct(generate_fields(spec)?), - InferredType::Array(ele_type) => list_type_of(generate_datatype(ele_type)?), - InferredType::Any => DataType::Null, - }) -} +use arrow_schema::{ArrowError, Schema}; +use serde_json::Value; -fn generate_fields(spec: &HashMap) -> Result { - spec.iter() - .map(|(k, types)| Ok(Field::new(k, generate_datatype(types)?, true))) - .collect() -} +use self::infer::{InferTy, infer_json_type}; +use self::json_type::TapeValue; +use super::tape::TapeDecoder; -/// Generate schema from JSON field names and inferred data types -fn generate_schema(spec: HashMap) -> Result { - Ok(Schema::new(generate_fields(&spec)?)) -} +mod infer; +mod json_type; /// Infer the fields of a JSON file by reading the first n records of the file, with /// `max_read_records` controlling the maximum number of records to read. @@ -144,7 +44,7 @@ fn generate_schema(spec: HashMap) -> Result( /// use flate2::read::GzDecoder; /// use arrow_json::reader::infer_json_schema; /// -/// let mut file = File::open("test/data/mixed_arrays.json.gz").unwrap(); +/// let mut file = File::open("test/data/arrays.json.gz").unwrap(); /// /// // file's cursor's offset at 0 /// let mut reader = BufReader::new(GzDecoder::new(&file)); @@ -201,211 +101,28 @@ pub fn infer_json_schema_from_seekable( /// file.seek(SeekFrom::Start(0)).unwrap(); /// ``` pub fn infer_json_schema( - reader: R, + mut reader: R, max_read_records: Option, ) -> Result<(Schema, usize), ArrowError> { - let mut values = ValueIter::new(reader, max_read_records); - let schema = infer_json_schema_from_iterator(&mut values)?; - Ok((schema, values.record_count())) -} - -fn set_object_scalar_field_type( - field_types: &mut HashMap, - key: &str, - ftype: DataType, -) -> Result<(), ArrowError> { - if InferredType::is_none_or_any(field_types.get(key)) { - field_types.insert(key.to_string(), InferredType::Scalar(HashSet::new())); - } + let mut decoder = SchemaDecoder::new(max_read_records); - match field_types.get_mut(key).unwrap() { - InferredType::Scalar(hs) => { - hs.insert(ftype); - Ok(()) - } - // in case of column contains both scalar type and scalar array type, we convert type of - // this column to scalar array. - scalar_array @ InferredType::Array(_) => { - let mut hs = HashSet::new(); - hs.insert(ftype); - scalar_array.merge(InferredType::Scalar(hs))?; - Ok(()) - } - t => Err(ArrowError::JsonError(format!( - "Expected scalar or scalar array JSON type, found: {t:?}", - ))), - } -} + loop { + let buf = reader.fill_buf()?; + let read = buf.len(); -fn infer_scalar_array_type(array: &[Value]) -> Result { - let mut hs = HashSet::new(); - - for v in array { - match v { - Value::Null => {} - Value::Number(n) => { - if n.is_i64() { - hs.insert(DataType::Int64); - } else { - hs.insert(DataType::Float64); - } - } - Value::Bool(_) => { - hs.insert(DataType::Boolean); - } - Value::String(_) => { - hs.insert(DataType::Utf8); - } - Value::Array(_) | Value::Object(_) => { - return Err(ArrowError::JsonError(format!( - "Expected scalar value for scalar array, got: {v:?}" - ))); - } + if read == 0 { + break; } - } - Ok(InferredType::Scalar(hs)) -} + let decoded = decoder.decode(buf)?; + reader.consume(decoded); -fn infer_nested_array_type(array: &[Value]) -> Result { - let mut inner_ele_type = InferredType::Any; - - for v in array { - match v { - Value::Array(inner_array) => { - inner_ele_type.merge(infer_array_element_type(inner_array)?)?; - } - x => { - return Err(ArrowError::JsonError(format!( - "Got non array element in nested array: {x:?}" - ))); - } + if decoded != read { + break; } } - Ok(InferredType::Array(Box::new(inner_ele_type))) -} - -fn infer_struct_array_type(array: &[Value]) -> Result { - let mut field_types = HashMap::new(); - - for v in array { - match v { - Value::Object(map) => { - collect_field_types_from_object(&mut field_types, map)?; - } - _ => { - return Err(ArrowError::JsonError(format!( - "Expected struct value for struct array, got: {v:?}" - ))); - } - } - } - - Ok(InferredType::Object(field_types)) -} - -fn infer_array_element_type(array: &[Value]) -> Result { - match array.iter().take(1).next() { - None => Ok(InferredType::Any), // empty array, return any type that can be updated later - Some(a) => match a { - Value::Array(_) => infer_nested_array_type(array), - Value::Object(_) => infer_struct_array_type(array), - _ => infer_scalar_array_type(array), - }, - } -} - -fn collect_field_types_from_object( - field_types: &mut HashMap, - map: &serde_json::map::Map, -) -> Result<(), ArrowError> { - for (k, v) in map { - match v { - Value::Array(array) => { - let ele_type = infer_array_element_type(array)?; - - if InferredType::is_none_or_any(field_types.get(k)) { - match ele_type { - InferredType::Scalar(_) => { - field_types.insert( - k.to_string(), - InferredType::Array(Box::new(InferredType::Scalar(HashSet::new()))), - ); - } - InferredType::Object(_) => { - field_types.insert( - k.to_string(), - InferredType::Array(Box::new(InferredType::Object(HashMap::new()))), - ); - } - InferredType::Any | InferredType::Array(_) => { - // set inner type to any for nested array as well - // so it can be updated properly from subsequent type merges - field_types.insert( - k.to_string(), - InferredType::Array(Box::new(InferredType::Any)), - ); - } - } - } - - match field_types.get_mut(k).unwrap() { - InferredType::Array(inner_type) => { - inner_type.merge(ele_type)?; - } - // in case of column contains both scalar type and scalar array type, we - // convert type of this column to scalar array. - field_type @ InferredType::Scalar(_) => { - field_type.merge(ele_type)?; - *field_type = InferredType::Array(Box::new(field_type.clone())); - } - t => { - return Err(ArrowError::JsonError(format!( - "Expected array json type, found: {t:?}", - ))); - } - } - } - Value::Bool(_) => { - set_object_scalar_field_type(field_types, k, DataType::Boolean)?; - } - Value::Null => { - // we treat json as nullable by default when inferring, so just - // mark existence of a field if it wasn't known before - if !field_types.contains_key(k) { - field_types.insert(k.to_string(), InferredType::Any); - } - } - Value::Number(n) => { - if n.is_i64() { - set_object_scalar_field_type(field_types, k, DataType::Int64)?; - } else { - set_object_scalar_field_type(field_types, k, DataType::Float64)?; - } - } - Value::String(_) => { - set_object_scalar_field_type(field_types, k, DataType::Utf8)?; - } - Value::Object(inner_map) => { - if let InferredType::Any = field_types.get(k).unwrap_or(&InferredType::Any) { - field_types.insert(k.to_string(), InferredType::Object(HashMap::new())); - } - match field_types.get_mut(k).unwrap() { - InferredType::Object(inner_field_types) => { - collect_field_types_from_object(inner_field_types, inner_map)?; - } - t => { - return Err(ArrowError::JsonError(format!( - "Expected object json type, found: {t:?}", - ))); - } - } - } - } - } - - Ok(()) + decoder.finish() } /// Infer the fields of a JSON file by reading all items from the JSON Value Iterator. @@ -426,30 +143,89 @@ where I: Iterator>, V: Borrow, { - let mut field_types: HashMap = HashMap::new(); - - for record in value_iter { - match record?.borrow() { - Value::Object(map) => { - collect_field_types_from_object(&mut field_types, map)?; - } - value => { - return Err(ArrowError::JsonError(format!( - "Expected JSON record to be an object, found {value:?}" - ))); - } - }; + value_iter + .into_iter() + .try_fold(InferTy::any(), |ty, record| { + infer_json_type(record?.borrow(), ty) + })? + .into_schema() +} + +struct SchemaDecoder { + decoder: TapeDecoder, + max_read_records: Option, + record_count: usize, + schema: InferTy, +} + +impl SchemaDecoder { + pub fn new(max_read_records: Option) -> Self { + Self { + decoder: TapeDecoder::new(1024, 8), + max_read_records, + record_count: 0, + schema: InferTy::empty_object(), + } + } + + pub fn decode(&mut self, buf: &[u8]) -> Result { + let read = self.decoder.decode(buf)?; + if read != buf.len() { + self.infer_batch()?; + } + Ok(read) + } + + pub fn finish(mut self) -> Result<(Schema, usize), ArrowError> { + self.infer_batch()?; + Ok((self.schema.into_schema()?, self.record_count)) } - generate_schema(field_types) + fn infer_batch(&mut self) -> Result<(), ArrowError> { + let tape = self.decoder.finish()?; + + let remaining_records = self + .max_read_records + .map_or(usize::MAX, |max| max - self.record_count); + + let records = tape + .iter_rows() + .map(|idx| TapeValue::new(&tape, idx)) + .take(remaining_records); + + for record in records { + self.schema = infer_json_type(record, self.schema.clone())?; + self.record_count += 1; + } + + self.decoder.clear(); + Ok(()) + } } #[cfg(test)] mod tests { use super::*; - use flate2::read::GzDecoder; + use std::fs::File; use std::io::{BufReader, Cursor}; + use std::sync::Arc; + + use arrow_schema::{DataType, Field, Fields}; + use flate2::read::GzDecoder; + + /// Shorthand for building list data type of `ty` + fn list_type_of(ty: DataType) -> DataType { + DataType::List(Arc::new(Field::new_list_field(ty, true))) + } + + #[test] + fn test_empty_input_infers_empty_schema() { + let (inferred_schema, n_rows) = infer_json_schema_from_seekable(Cursor::new(b""), None) + .expect("failed to infer schema"); + assert_eq!(inferred_schema, Schema::empty()); + assert_eq!(n_rows, 0); + } #[test] fn test_json_infer_schema() { @@ -457,21 +233,21 @@ mod tests { Field::new("a", DataType::Int64, true), Field::new("b", list_type_of(DataType::Float64), true), Field::new("c", list_type_of(DataType::Boolean), true), - Field::new("d", list_type_of(DataType::Utf8), true), + Field::new("d", DataType::Utf8, true), ]); - let mut reader = BufReader::new(File::open("test/data/mixed_arrays.json").unwrap()); + let mut reader = BufReader::new(File::open("test/data/arrays.json").unwrap()); let (inferred_schema, n_rows) = infer_json_schema_from_seekable(&mut reader, None).unwrap(); assert_eq!(inferred_schema, schema); - assert_eq!(n_rows, 4); + assert_eq!(n_rows, 3); - let file = File::open("test/data/mixed_arrays.json.gz").unwrap(); + let file = File::open("test/data/arrays.json.gz").unwrap(); let mut reader = BufReader::new(GzDecoder::new(&file)); let (inferred_schema, n_rows) = infer_json_schema(&mut reader, None).unwrap(); assert_eq!(inferred_schema, schema); - assert_eq!(n_rows, 4); + assert_eq!(n_rows, 3); } #[test] @@ -600,33 +376,12 @@ mod tests { assert_eq!(small_field.data_type(), &DataType::Float64); } - #[test] - fn test_coercion_scalar_and_list() { - assert_eq!( - list_type_of(DataType::Float64), - coerce_data_type(vec![&DataType::Float64, &list_type_of(DataType::Float64)]) - ); - assert_eq!( - list_type_of(DataType::Float64), - coerce_data_type(vec![&DataType::Float64, &list_type_of(DataType::Int64)]) - ); - assert_eq!( - list_type_of(DataType::Int64), - coerce_data_type(vec![&DataType::Int64, &list_type_of(DataType::Int64)]) - ); - // boolean and number are incompatible, return utf8 - assert_eq!( - list_type_of(DataType::Utf8), - coerce_data_type(vec![&DataType::Boolean, &list_type_of(DataType::Float64)]) - ); - } - #[test] fn test_invalid_json_infer_schema() { let re = infer_json_schema_from_seekable(Cursor::new(b"}"), None); assert_eq!( re.err().unwrap().to_string(), - "Json error: Not valid JSON: expected value at line 1 column 1", + "Json error: Encountered unexpected '}' whilst parsing value" ); } @@ -640,14 +395,14 @@ mod tests { let (inferred_schema, _) = infer_json_schema_from_seekable(Cursor::new(data), None).expect("infer"); let schema = Schema::new(vec![ - Field::new("an", list_type_of(DataType::Null), true), Field::new("in", DataType::Int64, true), - Field::new("n", DataType::Null, true), - Field::new("na", list_type_of(DataType::Null), true), - Field::new("nas", list_type_of(DataType::Utf8), true), Field::new("ni", DataType::Int64, true), Field::new("ns", DataType::Utf8, true), Field::new("sn", DataType::Utf8, true), + Field::new("n", DataType::Null, true), + Field::new("an", list_type_of(DataType::Null), true), + Field::new("na", list_type_of(DataType::Null), true), + Field::new("nas", list_type_of(DataType::Utf8), true), ]); assert_eq!(inferred_schema, schema); } diff --git a/arrow-json/src/reader/schema/infer.rs b/arrow-json/src/reader/schema/infer.rs new file mode 100644 index 000000000000..911be595e8b2 --- /dev/null +++ b/arrow-json/src/reader/schema/infer.rs @@ -0,0 +1,284 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::fmt::Display; +use std::sync::{Arc, LazyLock}; + +use arrow_schema::{ArrowError, DataType, Field, Fields, Schema}; + +use super::json_type::{JsonType, JsonValue}; + +/// Represents an inferred JSON type. +#[derive(Clone, Debug)] +pub(crate) enum InferTy { + Any, + Scalar(ScalarTy), + Array(Arc), + Object(Arc), +} + +/// An inferred scalar type. +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub(crate) enum ScalarTy { + Bool, + Int64, + Float64, + String, +} + +/// A field in an inferred object type. +pub(crate) type InferFields = [(Arc, InferTy)]; + +/// Infers the type of a JSON value, given an expected type. +pub fn infer_json_type<'a, T>(value: T, expected: InferTy) -> Result +where + T: JsonValue<'a>, +{ + match value.get() { + JsonType::Null => Ok(expected), + JsonType::Bool => infer_scalar(ScalarTy::Bool, expected), + JsonType::Int64 => infer_scalar(ScalarTy::Int64, expected), + JsonType::Float64 => infer_scalar(ScalarTy::Float64, expected), + JsonType::String => infer_scalar(ScalarTy::String, expected), + JsonType::Array => infer_array(value.elements(), expected), + JsonType::Object => infer_object(value.fields(), expected), + } +} + +/// Infers the type of a scalar JSON value, given an expected type. +/// +/// Mixed integer and float types will coerce to float, and any other +/// mixtures will coerce to string. +fn infer_scalar(scalar: ScalarTy, expected: InferTy) -> Result { + use ScalarTy::*; + + Ok(InferTy::Scalar(match expected { + InferTy::Any => scalar, + InferTy::Scalar(expect) => match (expect, &scalar) { + (Bool, Bool) => Bool, + (Int64, Int64) => Int64, + (Int64 | Float64, Int64 | Float64) => Float64, + _ => String, + }, + _ => Err(ArrowError::JsonError(format!( + "Expected {expected}, found {scalar}" + )))?, + })) +} + +/// Infers the type of a JSON array, given an expected type. +fn infer_array<'a, I>(mut elements: I, expected: InferTy) -> Result +where + I: Iterator, + I::Item: JsonValue<'a>, +{ + let expected_elem = match expected { + InferTy::Any => InferTy::Any.to_arc(), + InferTy::Array(inner) => inner.clone(), + _ => Err(ArrowError::JsonError(format!( + "Expected {expected}, found an array" + )))?, + }; + + let elem = elements.try_fold((*expected_elem).clone(), |expected, value| { + infer_json_type(value, expected) + })?; + + // If the inferred type is the same as the expected type, + // reuse the expected type and thus any inner `Arc`s it contains, + // to avoid excess heap allocations. + if elem.ptr_eq(&expected_elem) { + Ok(InferTy::Array(expected_elem)) + } else { + Ok(InferTy::Array(elem.to_arc())) + } +} + +/// Infers the type of a JSON object, given an expected type. +fn infer_object<'a, I, T>(fields: I, expected: InferTy) -> Result +where + I: Iterator, + T: JsonValue<'a>, +{ + let expected_fields = match expected { + InferTy::Any => InferTy::empty_fields(), + InferTy::Object(fields) => fields.clone(), + _ => Err(ArrowError::JsonError(format!( + "Expected {expected}, found an object" + )))?, + }; + + let mut num_fields = expected_fields.len(); + let mut substs = HashMap::, InferTy)>::new(); + + for (key, value) in fields { + let existing_field = expected_fields + .iter() + .enumerate() + .find(|(_, (existing_key, _))| &**existing_key == key); + + match existing_field { + Some((field_idx, (key, expect_ty))) => { + let ty = infer_json_type(value, expect_ty.clone())?; + if !ty.ptr_eq(expect_ty) { + substs.insert(field_idx, (key.clone(), ty)); + } + } + None => { + let field_idx = num_fields; + num_fields += 1; + let ty = infer_json_type(value, InferTy::Any)?; + substs.insert(field_idx, (key.into(), ty)); + } + }; + } + + if substs.is_empty() { + return Ok(InferTy::Object(expected_fields)); + } + + let fields = (0..num_fields) + .map(|idx| match substs.remove(&idx) { + Some(subst) => subst, + None => expected_fields[idx].clone(), + }) + .collect(); + + Ok(InferTy::Object(fields)) +} + +macro_rules! memoize { + ($ty:ty, $value:expr) => {{ + static VALUE: LazyLock> = LazyLock::new(|| Arc::new($value)); + VALUE.clone() + }}; +} + +impl InferTy { + /// Returns an `InferTy` that represents any possible JSON type. + pub const fn any() -> Self { + Self::Any + } + + /// Returns an `InferTy` that represents an empty JSON object. + pub fn empty_object() -> Self { + Self::Object(Self::empty_fields()) + } + + /// Returns an `InferFields` with no fields. + fn empty_fields() -> Arc { + memoize!(InferFields, []) + } + + /// Returns this `InferTy` as an `Arc`. + /// + /// To avoid excess heaps allocations, common types are memoized in static variables. + fn to_arc(&self) -> Arc { + use ScalarTy::*; + + match self { + // Use a memoized `Arc` if possible + Self::Any => memoize!(InferTy, InferTy::Any), + Self::Scalar(Bool) => memoize!(InferTy, InferTy::Scalar(Bool)), + Self::Scalar(Int64) => memoize!(InferTy, InferTy::Scalar(Int64)), + Self::Scalar(Float64) => memoize!(InferTy, InferTy::Scalar(Float64)), + Self::Scalar(String) => memoize!(InferTy, InferTy::Scalar(String)), + // Otherwise allocate a new `Arc` + _ => Arc::new(self.clone()), + } + } + + /// Performs a shallow comparison between two types, only checking for + /// pointer equality on any nested `Arc`s. + fn ptr_eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::Any, Self::Any) => true, + (Self::Scalar(lhs), Self::Scalar(rhs)) => lhs == rhs, + (Self::Array(lhs), Self::Array(rhs)) => Arc::ptr_eq(lhs, rhs), + (Self::Object(lhs), Self::Object(rhs)) => Arc::ptr_eq(lhs, rhs), + _ => false, + } + } + + pub fn to_datatype(&self) -> DataType { + match self { + InferTy::Any => DataType::Null, + InferTy::Scalar(s) => s.into_datatype(), + InferTy::Array(elem) => DataType::List(elem.to_list_field().into()), + InferTy::Object(fields) => { + DataType::Struct(fields.iter().map(|(key, ty)| ty.to_field(&**key)).collect()) + } + } + } + + pub fn to_field(&self, name: impl Into) -> Field { + Field::new(name, self.to_datatype(), true) + } + + pub fn to_list_field(&self) -> Field { + Field::new_list_field(self.to_datatype(), true) + } + + pub fn into_schema(self) -> Result { + let InferTy::Object(fields) = self else { + Err(ArrowError::JsonError(format!( + "Expected JSON object, found {self:?}", + )))? + }; + + let fields = fields + .iter() + .map(|(key, ty)| ty.to_field(&**key)) + .collect::(); + + Ok(Schema::new(fields)) + } +} + +impl Display for InferTy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + InferTy::Any => write!(f, "any value"), + InferTy::Scalar(s) => write!(f, "{}", s), + InferTy::Array(_) => write!(f, "an array"), + InferTy::Object(_) => write!(f, "an object"), + } + } +} + +impl ScalarTy { + fn into_datatype(self) -> DataType { + match self { + Self::Bool => DataType::Boolean, + Self::Int64 => DataType::Int64, + Self::Float64 => DataType::Float64, + Self::String => DataType::Utf8, + } + } +} + +impl Display for ScalarTy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ScalarTy::Bool => write!(f, "a boolean"), + ScalarTy::Int64 => write!(f, "an integer"), + ScalarTy::Float64 => write!(f, "a number"), + ScalarTy::String => write!(f, "a string"), + } + } +} diff --git a/arrow-json/src/reader/schema/json_type.rs b/arrow-json/src/reader/schema/json_type.rs new file mode 100644 index 000000000000..266bc00685fe --- /dev/null +++ b/arrow-json/src/reader/schema/json_type.rs @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::reader::tape::{Tape, TapeElement}; + +/// Abstraction of a JSON value that is only concerned with the +/// type of the value rather than the value itself. +pub trait JsonValue<'a> { + /// Gets the type of this JSON value. + fn get(&self) -> JsonType; + + /// Returns an iterator over the elements of this JSON array. + /// + /// Panics if the JSON value is not an array. + fn elements(&self) -> impl Iterator; + + /// Returns an iterator over the fields of this JSON object. + /// + /// Panics if the JSON value is not an object. + fn fields(&self) -> impl Iterator; +} + +/// The type of a JSON value +pub enum JsonType { + Null, + Bool, + Int64, + Float64, + String, + Array, + Object, +} + +#[derive(Copy, Clone, Debug)] +pub struct TapeValue<'a> { + tape: &'a Tape<'a>, + idx: u32, +} + +impl<'a> TapeValue<'a> { + pub fn new(tape: &'a Tape<'a>, idx: u32) -> Self { + Self { tape, idx } + } +} + +impl<'a> JsonValue<'a> for TapeValue<'a> { + fn get(&self) -> JsonType { + match self.tape.get(self.idx) { + TapeElement::Null => JsonType::Null, + TapeElement::False => JsonType::Bool, + TapeElement::True => JsonType::Bool, + TapeElement::I64(_) | TapeElement::I32(_) => JsonType::Int64, + TapeElement::F64(_) | TapeElement::F32(_) => JsonType::Float64, + TapeElement::Number(s) => { + if self.tape.get_string(s).parse::().is_ok() { + JsonType::Int64 + } else { + JsonType::Float64 + } + } + TapeElement::String(_) => JsonType::String, + TapeElement::StartList(_) => JsonType::Array, + TapeElement::EndList(_) => unreachable!(), + TapeElement::StartObject(_) => JsonType::Object, + TapeElement::EndObject(_) => unreachable!(), + } + } + + fn elements(&self) -> impl Iterator { + self.tape + .iter_elements(self.idx) + .map(move |idx| Self { idx, ..*self }) + } + + fn fields(&self) -> impl Iterator { + self.tape + .iter_fields(self.idx) + .map(move |(key, idx)| (key, Self { idx, ..*self })) + } +} + +impl<'a> JsonValue<'a> for &'a serde_json::Value { + fn get(&self) -> JsonType { + use serde_json::Value; + + match self { + Value::Null => JsonType::Null, + Value::Bool(_) => JsonType::Bool, + Value::Number(n) => { + if n.is_i64() { + JsonType::Int64 + } else { + JsonType::Float64 + } + } + Value::String(_) => JsonType::String, + Value::Array(_) => JsonType::Array, + Value::Object(_) => JsonType::Object, + } + } + + fn elements(&self) -> impl Iterator { + use serde_json::Value; + + match self { + Value::Array(elements) => elements.iter(), + _ => panic!("Expected an array"), + } + } + + fn fields(&self) -> impl Iterator { + use serde_json::Value; + + match self { + Value::Object(fields) => fields.iter().map(|(key, value)| (key.as_str(), value)), + _ => panic!("Expected an object"), + } + } +} diff --git a/arrow-json/src/reader/tape.rs b/arrow-json/src/reader/tape.rs index 89ee3f778765..4ea8dce2f668 100644 --- a/arrow-json/src/reader/tape.rs +++ b/arrow-json/src/reader/tape.rs @@ -142,6 +142,55 @@ impl<'a> Tape<'a> { self.num_rows } + /// Iterates over the rows of the tape + pub fn iter_rows(&self) -> impl Iterator { + self.iter_values(0, self.elements.len() as u32) + } + + /// Iterates over the elements of the array starting at `idx` + pub fn iter_elements(&self, idx: u32) -> impl Iterator { + let end = match self.get(idx) { + TapeElement::StartList(end) => end, + elem => panic!("Expected the start of a list, found {:?}", elem), + }; + + self.iter_values(idx, end) + } + + /// Iterates over the fields of the objected starting at `idx` + pub fn iter_fields(&self, idx: u32) -> impl Iterator { + let end = match self.get(idx) { + TapeElement::StartObject(end) => end, + elem => panic!("Expected the start of an object, found {:?}", elem), + }; + + let mut idx = idx + 1; + + std::iter::from_fn(move || { + (idx < end).then(|| { + let key = match self.get(idx) { + TapeElement::String(s) => self.get_string(s), + elem => panic!("Expected a string, found {:?}", elem), + }; + let value_idx = idx + 1; + idx = self.next(value_idx, "field value").unwrap(); + (key, value_idx) + }) + }) + } + + fn iter_values(&self, start: u32, end: u32) -> impl Iterator { + let mut idx = start + 1; + + std::iter::from_fn(move || { + (idx < end).then(|| { + let value_id = idx; + idx = self.next(idx, "value").unwrap(); + value_id + }) + }) + } + /// Serialize the tape element at index `idx` to `out` returning the next field index fn serialize(&self, out: &mut String, idx: u32) -> u32 { match self.get(idx) { diff --git a/arrow-json/src/reader/value_iter.rs b/arrow-json/src/reader/value_iter.rs index f70b893f52a0..350c2290fd78 100644 --- a/arrow-json/src/reader/value_iter.rs +++ b/arrow-json/src/reader/value_iter.rs @@ -30,7 +30,7 @@ use serde_json::Value; /// use arrow_json::reader::ValueIter; /// /// let mut reader = -/// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap()); +/// BufReader::new(File::open("test/data/arrays.json").unwrap()); /// let mut value_reader = ValueIter::new(&mut reader, None); /// for value in value_reader { /// println!("JSON value: {}", value.unwrap()); diff --git a/arrow-json/test/data/arrays.json.gz b/arrow-json/test/data/arrays.json.gz new file mode 100644 index 000000000000..c584e16847ec Binary files /dev/null and b/arrow-json/test/data/arrays.json.gz differ diff --git a/arrow-json/test/data/mixed_arrays.json b/arrow-json/test/data/mixed_arrays.json deleted file mode 100644 index 18987284a5b8..000000000000 --- a/arrow-json/test/data/mixed_arrays.json +++ /dev/null @@ -1,4 +0,0 @@ -{"a":1, "b":[2.0, 1.3, -6.1], "c":[false, true], "d":4.1} -{"a":-10, "b":[2.0, 1.3, -6.1], "c":null, "d":null} -{"a":2, "b":[2.0, null, -6.1], "c":[false, null], "d":"text"} -{"a":3, "b":4, "c": true, "d":[1, false, "array", 2.4]} diff --git a/arrow-json/test/data/mixed_arrays.json.gz b/arrow-json/test/data/mixed_arrays.json.gz deleted file mode 100644 index 0f6040092ff1..000000000000 Binary files a/arrow-json/test/data/mixed_arrays.json.gz and /dev/null differ