From ab7080ad7483ab57d849817c7345aea819eb712b Mon Sep 17 00:00:00 2001 From: Alexander Rafferty Date: Fri, 27 Feb 2026 11:07:44 +1100 Subject: [PATCH 1/2] Move `ValueIter` into own module, and add public `record_count` function --- arrow-json/src/reader/mod.rs | 2 + arrow-json/src/reader/schema.rs | 80 +-------------------------- arrow-json/src/reader/value_iter.rs | 86 +++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 78 deletions(-) create mode 100644 arrow-json/src/reader/value_iter.rs diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index 786cf9212d04..04271368a4aa 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -150,6 +150,7 @@ use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit}; pub use schema::*; +pub use value_iter::ValueIter; use crate::reader::boolean_array::BooleanArrayDecoder; use crate::reader::decimal_array::DecimalArrayDecoder; @@ -179,6 +180,7 @@ mod string_view_array; mod struct_array; mod tape; mod timestamp_array; +mod value_iter; /// A builder for [`Reader`] and [`Decoder`] pub struct ReaderBuilder { diff --git a/arrow-json/src/reader/schema.rs b/arrow-json/src/reader/schema.rs index fb7d93a85e12..524e6b2aa560 100644 --- a/arrow-json/src/reader/schema.rs +++ b/arrow-json/src/reader/schema.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use super::ValueIter; use arrow_schema::{ArrowError, DataType, Field, Fields, Schema}; use indexmap::map::IndexMap as HashMap; use indexmap::set::IndexSet as HashSet; @@ -127,83 +128,6 @@ fn generate_schema(spec: HashMap) -> Result { - reader: R, - max_read_records: Option, - record_count: usize, - // reuse line buffer to avoid allocation on each record - line_buf: String, -} - -impl ValueIter { - /// Creates a new `ValueIter` - pub fn new(reader: R, max_read_records: Option) -> Self { - Self { - reader, - max_read_records, - record_count: 0, - line_buf: String::new(), - } - } -} - -impl Iterator for ValueIter { - type Item = Result; - - fn next(&mut self) -> Option { - if let Some(max) = self.max_read_records { - if self.record_count >= max { - return None; - } - } - - loop { - self.line_buf.truncate(0); - match self.reader.read_line(&mut self.line_buf) { - Ok(0) => { - // read_line returns 0 when stream reached EOF - return None; - } - Err(e) => { - return Some(Err(ArrowError::JsonError(format!( - "Failed to read JSON record: {e}" - )))); - } - _ => { - let trimmed_s = self.line_buf.trim(); - if trimmed_s.is_empty() { - // ignore empty lines - continue; - } - - self.record_count += 1; - return Some( - serde_json::from_str(trimmed_s) - .map_err(|e| ArrowError::JsonError(format!("Not valid JSON: {e}"))), - ); - } - } - } - } -} - /// Infer the fields of a JSON file by reading the first n records of the file, with /// `max_read_records` controlling the maximum number of records to read. /// @@ -282,7 +206,7 @@ pub fn infer_json_schema( ) -> Result<(Schema, usize), ArrowError> { let mut values = ValueIter::new(reader, max_read_records); let schema = infer_json_schema_from_iterator(&mut values)?; - Ok((schema, values.record_count)) + Ok((schema, values.record_count())) } fn set_object_scalar_field_type( diff --git a/arrow-json/src/reader/value_iter.rs b/arrow-json/src/reader/value_iter.rs new file mode 100644 index 000000000000..848ab8f564e6 --- /dev/null +++ b/arrow-json/src/reader/value_iter.rs @@ -0,0 +1,86 @@ +use std::io::BufRead; + +use arrow_schema::ArrowError; +use serde_json::Value; + +/// JSON file reader that produces a serde_json::Value iterator from a Read trait +/// +/// # Example +/// +/// ``` +/// use std::fs::File; +/// use std::io::BufReader; +/// use arrow_json::reader::ValueIter; +/// +/// let mut reader = +/// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap()); +/// let mut value_reader = ValueIter::new(&mut reader, None); +/// for value in value_reader { +/// println!("JSON value: {}", value.unwrap()); +/// } +/// ``` +#[derive(Debug)] +pub struct ValueIter { + reader: R, + max_read_records: Option, + record_count: usize, + // reuse line buffer to avoid allocation on each record + line_buf: String, +} + +impl ValueIter { + /// Creates a new `ValueIter` + pub fn new(reader: R, max_read_records: Option) -> Self { + Self { + reader, + max_read_records, + record_count: 0, + line_buf: String::new(), + } + } + + /// Returns the number of records this iterator has consumed + pub fn record_count(&self) -> usize { + self.record_count + } +} + +impl Iterator for ValueIter { + type Item = Result; + + fn next(&mut self) -> Option { + if let Some(max) = self.max_read_records { + if self.record_count >= max { + return None; + } + } + + loop { + self.line_buf.truncate(0); + match self.reader.read_line(&mut self.line_buf) { + Ok(0) => { + // read_line returns 0 when stream reached EOF + return None; + } + Err(e) => { + return Some(Err(ArrowError::JsonError(format!( + "Failed to read JSON record: {e}" + )))); + } + _ => { + let trimmed_s = self.line_buf.trim(); + if trimmed_s.is_empty() { + // ignore empty lines + continue; + } + + self.record_count += 1; + return Some( + serde_json::from_str(trimmed_s) + .map_err(|e| ArrowError::JsonError(format!("Not valid JSON: {e}"))), + ); + } + } + } + } +} From baf72c3e201783a08ea6ea79a9300bf2bd17059d Mon Sep 17 00:00:00 2001 From: Alexander Rafferty Date: Sat, 14 Mar 2026 11:24:21 +1100 Subject: [PATCH 2/2] Add license header --- arrow-json/src/reader/value_iter.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/arrow-json/src/reader/value_iter.rs b/arrow-json/src/reader/value_iter.rs index 848ab8f564e6..f70b893f52a0 100644 --- a/arrow-json/src/reader/value_iter.rs +++ b/arrow-json/src/reader/value_iter.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::io::BufRead; use arrow_schema::ArrowError;