Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ arrow-flight = { version = "57.1.0", features = [
] }
arrow-ipc = { version = "57.1.0", default-features = false, features = [
"lz4",
"zstd",
] }
arrow-ord = { version = "57.1.0", default-features = false }
arrow-schema = { version = "57.1.0", default-features = false }
Expand Down Expand Up @@ -187,6 +188,7 @@ strum_macros = "0.27.2"
tempfile = "3"
testcontainers-modules = { version = "0.14" }
tokio = { version = "1.48", features = ["macros", "rt", "sync"] }
tokio-stream = "0.1"
url = "2.5.7"
zstd = { version = "0.13", default-features = false }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ async fn json_opener() -> Result<()> {
projected,
FileCompressionType::UNCOMPRESSED,
Arc::new(object_store),
true,
);

let scan_config = FileScanConfigBuilder::new(
Expand Down
16 changes: 16 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3046,6 +3046,22 @@ config_namespace! {
/// If not specified, the default level for the compression algorithm is used.
pub compression_level: Option<u32>, default = None
pub schema_infer_max_rec: Option<usize>, default = None
/// The JSON format to use when reading files.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JSON feature that was contributed in DataFusion 53

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like DF53 will come soon apache#19692

///
/// When `true` (default), expects newline-delimited JSON (NDJSON):
/// ```text
/// {"key1": 1, "key2": "val"}
/// {"key1": 2, "key2": "vals"}
/// ```
///
/// When `false`, expects JSON array format:
/// ```text
/// [
/// {"key1": 1, "key2": "val"},
/// {"key1": 2, "key2": "vals"}
/// ]
/// ```
pub newline_delimited: bool, default = true
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ impl DataFrame {
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
/// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
/// let df = ctx.read_json("tests/data/unnest.json", JsonReadOptions::default()).await?;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JSON feature that was contributed in DataFusion 53

/// // expand into multiple columns if it's json array, flatten field name if it's nested structure
/// let df = df.unnest_columns(&["b","c","d"])?;
/// let expected = vec![
Expand Down
143 changes: 143 additions & 0 deletions datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,4 +324,147 @@ mod tests {

Ok(())
}

/// Test that ParquetSink exposes rows_written, bytes_written, and
/// elapsed_compute metrics via DataSinkExec.
#[tokio::test]
async fn test_parquet_sink_metrics() -> Result<()> {
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_execution::TaskContext;

    use futures::TryStreamExt;

    let ctx = SessionContext::new();
    let tmp_dir = TempDir::new()?;
    let dest = tmp_dir.path().join("metrics_test.parquet");
    let dest_str = dest.to_str().unwrap();

    // Source table: 100 rows across two non-null Int32 columns.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("val", DataType::Int32, false),
    ]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(Int32Array::from((0..100).collect::<Vec<i32>>())),
            Arc::new(Int32Array::from((100..200).collect::<Vec<i32>>())),
        ],
    )?;
    ctx.register_batch("source", batch)?;

    // Plan the COPY TO statement, then execute partition 0 to drive the sink.
    let plan = ctx
        .sql(&format!("COPY source TO '{dest_str}' STORED AS PARQUET"))
        .await?
        .create_physical_plan()
        .await?;
    let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
    let _batches: Vec<_> = plan.execute(0, task_ctx)?.try_collect().await?;

    // The top-level DataSinkExec must surface ParquetSink's metrics.
    let metrics = plan
        .metrics()
        .expect("DataSinkExec should return metrics from ParquetSink");
    let aggregated = metrics.aggregate_by_name();

    // Helper: locate a metric by name or fail with the expected message.
    let metric = |name: &str| {
        aggregated
            .iter()
            .find(|m| m.value().name() == name)
            .unwrap_or_else(|| panic!("should have {name} metric"))
    };

    // Every source row must be accounted for.
    assert_eq!(
        metric("rows_written").value().as_usize(),
        100,
        "expected 100 rows written"
    );

    // A non-empty parquet file implies some bytes were written.
    let bytes = metric("bytes_written").value().as_usize();
    assert!(bytes > 0, "expected bytes_written > 0, got {bytes}");

    // Encoding work should register non-zero compute time.
    assert!(
        metric("elapsed_compute").value().as_usize() > 0,
        "expected elapsed_compute > 0"
    );

    Ok(())
}

/// Test that ParquetSink metrics work with single_file_parallelism enabled.
#[tokio::test]
async fn test_parquet_sink_metrics_parallel() -> Result<()> {
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_execution::TaskContext;

    use futures::TryStreamExt;

    // Enable the parallel single-file writer path before planning.
    let ctx = SessionContext::new();
    ctx.sql("SET datafusion.execution.parquet.allow_single_file_parallelism = true")
        .await?
        .collect()
        .await?;

    let tmp_dir = TempDir::new()?;
    let dest = tmp_dir.path().join("metrics_parallel.parquet");
    let dest_str = dest.to_str().unwrap();

    // Source table: a single Int32 column holding 50 rows.
    let schema =
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from((0..50).collect::<Vec<i32>>()))],
    )?;
    ctx.register_batch("source2", batch)?;

    // Plan the COPY TO statement and drive partition 0 to completion.
    let plan = ctx
        .sql(&format!("COPY source2 TO '{dest_str}' STORED AS PARQUET"))
        .await?
        .create_physical_plan()
        .await?;
    let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
    let _batches: Vec<_> = plan.execute(0, task_ctx)?.try_collect().await?;

    // Metrics must still be reported when the parallel writer is used.
    let metrics = plan.metrics().expect("DataSinkExec should return metrics");
    let aggregated = metrics.aggregate_by_name();

    let metric = |name: &str| {
        aggregated
            .iter()
            .find(|m| m.value().name() == name)
            .unwrap_or_else(|| panic!("should have {name} metric"))
    };

    assert_eq!(metric("rows_written").value().as_usize(), 50);
    assert!(metric("bytes_written").value().as_usize() > 0);

    Ok(())
}
}
Loading
Loading