Skip to content

Commit 590f97e

Browse files
committed
Add support for reading JSON files in array format (in addition to newline-delimited JSON)
1 parent b2c29ac commit 590f97e

18 files changed

Lines changed: 557 additions & 29 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-examples/examples/custom_data_source/csv_json_opener.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ async fn json_opener() -> Result<()> {
125125
projected,
126126
FileCompressionType::UNCOMPRESSED,
127127
Arc::new(object_store),
128+
false,
128129
);
129130

130131
let scan_config = FileScanConfigBuilder::new(

datafusion/common/src/config.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3065,6 +3065,22 @@ config_namespace! {
30653065
/// If not specified, the default level for the compression algorithm is used.
30663066
pub compression_level: Option<u32>, default = None
30673067
pub schema_infer_max_rec: Option<usize>, default = None
3068+
/// The format of JSON input files.
3069+
///
3070+
/// When `false` (default), expects newline-delimited JSON (NDJSON):
3071+
/// ```text
3072+
/// {"key1": 1, "key2": "val"}
3073+
/// {"key1": 2, "key2": "vals"}
3074+
/// ```
3075+
///
3076+
/// When `true`, expects JSON array format:
3077+
/// ```text
3078+
/// [
3079+
/// {"key1": 1, "key2": "val"},
3080+
/// {"key1": 2, "key2": "vals"}
3081+
/// ]
3082+
/// ```
3083+
pub format_array: bool, default = false
30683084
}
30693085
}
30703086

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ mod tests {
4747
use datafusion_common::stats::Precision;
4848

4949
use datafusion_common::Result;
50+
use datafusion_datasource::file_compression_type::FileCompressionType;
5051
use futures::StreamExt;
5152
use insta::assert_snapshot;
5253
use object_store::local::LocalFileSystem;
@@ -391,4 +392,276 @@ mod tests {
391392
assert_eq!(metadata.len(), 0);
392393
Ok(())
393394
}
395+
396+
#[tokio::test]
397+
async fn test_json_array_format() -> Result<()> {
398+
let session = SessionContext::new();
399+
let ctx = session.state();
400+
let store = Arc::new(LocalFileSystem::new()) as _;
401+
402+
// Create a temporary file with JSON array format
403+
let tmp_dir = tempfile::TempDir::new()?;
404+
let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
405+
std::fs::write(
406+
&path,
407+
r#"[
408+
{"a": 1, "b": 2.0, "c": true},
409+
{"a": 2, "b": 3.5, "c": false},
410+
{"a": 3, "b": 4.0, "c": true}
411+
]"#,
412+
)?;
413+
414+
// Test with format_array = true
415+
let format = JsonFormat::default().with_format_array(true);
416+
let file_schema = format
417+
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
418+
.await
419+
.expect("Schema inference");
420+
421+
let fields = file_schema
422+
.fields()
423+
.iter()
424+
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
425+
.collect::<Vec<_>>();
426+
assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
427+
428+
Ok(())
429+
}
430+
431+
#[tokio::test]
432+
async fn test_json_array_format_empty() -> Result<()> {
433+
let session = SessionContext::new();
434+
let ctx = session.state();
435+
let store = Arc::new(LocalFileSystem::new()) as _;
436+
437+
let tmp_dir = tempfile::TempDir::new()?;
438+
let path = format!("{}/empty_array.json", tmp_dir.path().to_string_lossy());
439+
std::fs::write(&path, "[]")?;
440+
441+
let format = JsonFormat::default().with_format_array(true);
442+
let result = format
443+
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
444+
.await;
445+
446+
assert!(result.is_err());
447+
assert!(
448+
result
449+
.unwrap_err()
450+
.to_string()
451+
.contains("JSON array is empty")
452+
);
453+
454+
Ok(())
455+
}
456+
457+
#[tokio::test]
458+
async fn test_json_array_format_with_limit() -> Result<()> {
459+
let session = SessionContext::new();
460+
let ctx = session.state();
461+
let store = Arc::new(LocalFileSystem::new()) as _;
462+
463+
let tmp_dir = tempfile::TempDir::new()?;
464+
let path = format!("{}/array_limit.json", tmp_dir.path().to_string_lossy());
465+
std::fs::write(
466+
&path,
467+
r#"[
468+
{"a": 1},
469+
{"a": 2, "b": "extra"}
470+
]"#,
471+
)?;
472+
473+
// Only infer from first record
474+
let format = JsonFormat::default()
475+
.with_format_array(true)
476+
.with_schema_infer_max_rec(1);
477+
478+
let file_schema = format
479+
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
480+
.await
481+
.expect("Schema inference");
482+
483+
// Should only have field "a" since we limited to 1 record
484+
let fields = file_schema
485+
.fields()
486+
.iter()
487+
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
488+
.collect::<Vec<_>>();
489+
assert_eq!(vec!["a: Int64"], fields);
490+
491+
Ok(())
492+
}
493+
494+
#[tokio::test]
495+
async fn test_json_array_format_read_data() -> Result<()> {
496+
let session = SessionContext::new();
497+
let ctx = session.state();
498+
let task_ctx = ctx.task_ctx();
499+
let store = Arc::new(LocalFileSystem::new()) as _;
500+
501+
// Create a temporary file with JSON array format
502+
let tmp_dir = tempfile::TempDir::new()?;
503+
let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
504+
std::fs::write(
505+
&path,
506+
r#"[
507+
{"a": 1, "b": 2.0, "c": true},
508+
{"a": 2, "b": 3.5, "c": false},
509+
{"a": 3, "b": 4.0, "c": true}
510+
]"#,
511+
)?;
512+
513+
let format = JsonFormat::default().with_format_array(true);
514+
515+
// Infer schema
516+
let file_schema = format
517+
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
518+
.await?;
519+
520+
// Scan and read data
521+
let exec = scan_format(
522+
&ctx,
523+
&format,
524+
Some(file_schema),
525+
tmp_dir.path().to_str().unwrap(),
526+
"array.json",
527+
None,
528+
None,
529+
)
530+
.await?;
531+
let batches = collect(exec, task_ctx).await?;
532+
533+
assert_eq!(1, batches.len());
534+
assert_eq!(3, batches[0].num_columns());
535+
assert_eq!(3, batches[0].num_rows());
536+
537+
// Verify data
538+
let array_a = as_int64_array(batches[0].column(0))?;
539+
assert_eq!(
540+
vec![1, 2, 3],
541+
(0..3).map(|i| array_a.value(i)).collect::<Vec<_>>()
542+
);
543+
544+
Ok(())
545+
}
546+
547+
#[tokio::test]
548+
async fn test_json_array_format_with_projection() -> Result<()> {
549+
let session = SessionContext::new();
550+
let ctx = session.state();
551+
let task_ctx = ctx.task_ctx();
552+
let store = Arc::new(LocalFileSystem::new()) as _;
553+
554+
let tmp_dir = tempfile::TempDir::new()?;
555+
let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
556+
std::fs::write(&path, r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#)?;
557+
558+
let format = JsonFormat::default().with_format_array(true);
559+
let file_schema = format
560+
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
561+
.await?;
562+
563+
// Project only column "a"
564+
let exec = scan_format(
565+
&ctx,
566+
&format,
567+
Some(file_schema),
568+
tmp_dir.path().to_str().unwrap(),
569+
"array.json",
570+
Some(vec![0]),
571+
None,
572+
)
573+
.await?;
574+
let batches = collect(exec, task_ctx).await?;
575+
576+
assert_eq!(1, batches.len());
577+
assert_eq!(1, batches[0].num_columns()); // Only 1 column projected
578+
assert_eq!(2, batches[0].num_rows());
579+
580+
Ok(())
581+
}
582+
583+
#[tokio::test]
584+
async fn test_ndjson_read_options_format_array() -> Result<()> {
585+
let ctx = SessionContext::new();
586+
587+
// Create a temporary file with JSON array format
588+
let tmp_dir = tempfile::TempDir::new()?;
589+
let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
590+
std::fs::write(
591+
&path,
592+
r#"[
593+
{"a": 1, "b": "hello"},
594+
{"a": 2, "b": "world"},
595+
{"a": 3, "b": "test"}
596+
]"#,
597+
)?;
598+
599+
// Use NdJsonReadOptions with format_array = true
600+
let options = NdJsonReadOptions::default().format_array(true);
601+
602+
ctx.register_json("json_array_table", &path, options)
603+
.await?;
604+
605+
let result = ctx
606+
.sql("SELECT a, b FROM json_array_table ORDER BY a")
607+
.await?
608+
.collect()
609+
.await?;
610+
611+
assert_snapshot!(batches_to_string(&result), @r"
612+
+---+-------+
613+
| a | b |
614+
+---+-------+
615+
| 1 | hello |
616+
| 2 | world |
617+
| 3 | test |
618+
+---+-------+
619+
");
620+
621+
Ok(())
622+
}
623+
624+
#[tokio::test]
625+
async fn test_ndjson_read_options_format_array_with_compression() -> Result<()> {
626+
use flate2::Compression;
627+
use flate2::write::GzEncoder;
628+
use std::io::Write;
629+
630+
let ctx = SessionContext::new();
631+
632+
// Create a temporary gzip compressed JSON array file
633+
let tmp_dir = tempfile::TempDir::new()?;
634+
let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy());
635+
636+
let json_content = r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#;
637+
let file = std::fs::File::create(&path)?;
638+
let mut encoder = GzEncoder::new(file, Compression::default());
639+
encoder.write_all(json_content.as_bytes())?;
640+
encoder.finish()?;
641+
642+
// Use NdJsonReadOptions with format_array and GZIP compression
643+
let options = NdJsonReadOptions::default()
644+
.format_array(true)
645+
.file_compression_type(FileCompressionType::GZIP)
646+
.file_extension(".json.gz");
647+
648+
ctx.register_json("json_array_gzip", &path, options).await?;
649+
650+
let result = ctx
651+
.sql("SELECT a, b FROM json_array_gzip ORDER BY a")
652+
.await?
653+
.collect()
654+
.await?;
655+
656+
assert_snapshot!(batches_to_string(&result), @r"
657+
+---+-------+
658+
| a | b |
659+
+---+-------+
660+
| 1 | hello |
661+
| 2 | world |
662+
+---+-------+
663+
");
664+
665+
Ok(())
666+
}
394667
}

datafusion/core/src/datasource/file_format/options.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,9 @@ pub struct NdJsonReadOptions<'a> {
465465
pub infinite: bool,
466466
/// Indicates how the file is sorted
467467
pub file_sort_order: Vec<Vec<SortExpr>>,
468+
/// Whether the JSON file is in array format `[{...}, {...}]` instead of
469+
/// line-delimited format. Defaults to `false`.
470+
pub format_array: bool,
468471
}
469472

470473
impl Default for NdJsonReadOptions<'_> {
@@ -477,6 +480,7 @@ impl Default for NdJsonReadOptions<'_> {
477480
file_compression_type: FileCompressionType::UNCOMPRESSED,
478481
infinite: false,
479482
file_sort_order: vec![],
483+
format_array: false,
480484
}
481485
}
482486
}
@@ -529,6 +533,13 @@ impl<'a> NdJsonReadOptions<'a> {
529533
self.schema_infer_max_records = schema_infer_max_records;
530534
self
531535
}
536+
537+
/// Specify whether the JSON file is in array format `[{...}, {...}]`
538+
/// instead of line-delimited format.
539+
pub fn format_array(mut self, format_array: bool) -> Self {
540+
self.format_array = format_array;
541+
self
542+
}
532543
}
533544

534545
#[async_trait]
@@ -663,7 +674,8 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
663674
let file_format = JsonFormat::default()
664675
.with_options(table_options.json)
665676
.with_schema_infer_max_rec(self.schema_infer_max_records)
666-
.with_file_compression_type(self.file_compression_type.to_owned());
677+
.with_file_compression_type(self.file_compression_type.to_owned())
678+
.with_format_array(self.format_array);
667679

668680
ListingOptions::new(Arc::new(file_format))
669681
.with_file_extension(self.file_extension)
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
[
2+
{"a": 1, "b": "hello"},
3+
{"a": 2, "b": "world"},
4+
{"a": 3, "b": "test"}
5+
]
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
[]

datafusion/datasource-json/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ datafusion-physical-plan = { workspace = true }
4444
datafusion-session = { workspace = true }
4545
futures = { workspace = true }
4646
object_store = { workspace = true }
47+
serde_json = { workspace = true }
4748
tokio = { workspace = true }
4849

4950
# Note: add additional linter rules in lib.rs.

0 commit comments

Comments (0)