diff --git a/.github/workflows/build-extensions.yml b/.github/workflows/build-extensions.yml index 55dcd5e04..ce5269624 100644 --- a/.github/workflows/build-extensions.yml +++ b/.github/workflows/build-extensions.yml @@ -4,9 +4,9 @@ on: workflow_dispatch: inputs: extensions: - description: 'Semicolon-separated list of extensions to build (e.g., json)' + description: 'Semicolon-separated list of extensions to build (e.g., json;parquet)' required: false - default: 'json' + default: 'json;parquet' version: description: 'Version tag for the extensions (e.g., 0.1.0)' required: false @@ -51,7 +51,7 @@ jobs: if [ "${{ github.event_name }}" == "workflow_dispatch" ]; then EXTENSIONS="${{ github.event.inputs.extensions }}" else - EXTENSIONS="json" + EXTENSIONS="json;parquet" fi echo "extensions=$EXTENSIONS" >> $GITHUB_OUTPUT echo "Building extensions: $EXTENSIONS" diff --git a/.github/workflows/neug-extension-test.yml b/.github/workflows/neug-extension-test.yml index 8a14e25c5..a0b76b7e6 100644 --- a/.github/workflows/neug-extension-test.yml +++ b/.github/workflows/neug-extension-test.yml @@ -108,6 +108,7 @@ jobs: export TEST_PATH=${GITHUB_WORKSPACE}/tests cd build/neug_py_bind ctest -R json_extension_test -V + ctest -R parquet_extension_test -V export FLEX_DATA_DIR=${GITHUB_WORKSPACE}/example_dataset/tinysnb ctest -R test_extension -V @@ -122,6 +123,7 @@ jobs: GLOG_v=10 ./build/neug_py_bind/tools/utils/bulk_loader -g ../../example_dataset/comprehensive_graph/graph.yaml -l ../../example_dataset/comprehensive_graph/import.yaml -d /tmp/comprehensive_graph NEUG_RUN_EXTENSION_TESTS=true python3 -m pytest -sv tests/test_load.py -k "json" NEUG_RUN_EXTENSION_TESTS=true python3 -m pytest -sv tests/test_export.py -k "json" + NEUG_RUN_EXTENSION_TESTS=true python3 -m pytest -sv tests/test_load.py -k "parquet" extension_tests_wheel_linux_x86_64: runs-on: [self-hosted, linux, x64] diff --git a/CMakeLists.txt b/CMakeLists.txt index c756c30a0..d70b98287 100644 --- a/CMakeLists.txt +++ 
b/CMakeLists.txt @@ -589,6 +589,15 @@ if(BUILD_EXTENSIONS AND "json" IN_LIST BUILD_EXTENSIONS) message(STATUS "Arrow JSON support enabled for json extension") endif() +# Configure Arrow Parquet support if parquet extension is enabled. +# The entry is FORCEd into the cache, so reset it when "parquet" is not requested; otherwise a stale ON survives reconfigures. +if(BUILD_EXTENSIONS AND "parquet" IN_LIST BUILD_EXTENSIONS) + set(ARROW_ENABLE_PARQUET ON CACHE BOOL "" FORCE) + message(STATUS "Arrow Parquet support enabled for parquet extension") +else() + set(ARROW_ENABLE_PARQUET OFF CACHE BOOL "" FORCE) +endif() + set(NEUG_USE_SYSTEM_ARROW OFF) if(DEFINED ENV{CI} AND "$ENV{CI}" STREQUAL "ON") set(NEUG_USE_SYSTEM_ARROW ON) diff --git a/cmake/BuildArrowAsThirdParty.cmake b/cmake/BuildArrowAsThirdParty.cmake index 3a9011d61..86459f968 100644 --- a/cmake/BuildArrowAsThirdParty.cmake +++ b/cmake/BuildArrowAsThirdParty.cmake @@ -52,6 +52,12 @@ function(build_arrow_as_third_party) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=uninitialized") endif() set(CMAKE_POSITION_INDEPENDENT_CODE ON) + # Thrift (Arrow-parquet dependency) emits these warnings + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=unused-function") + # -Wstringop-truncation is a GCC-only diagnostic; Clang flags the option itself as unknown + if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=stringop-truncation") + endif() set(ARROW_BUILD_SHARED OFF CACHE BOOL "" FORCE) set(ARROW_BUILD_STATIC ON CACHE BOOL "" FORCE) @@ -72,15 +78,23 @@ function(build_arrow_as_third_party) if(NOT DEFINED ARROW_JSON) set(ARROW_JSON OFF CACHE BOOL "" FORCE) endif() - set(ARROW_PARQUET OFF CACHE BOOL "" FORCE) - set(ARROW_PLASMA OFF CACHE BOOL "" FORCE) - set(ARROW_PYTHON OFF CACHE BOOL "" FORCE) - set(ARROW_S3 OFF CACHE BOOL "" FORCE) + # Translate ARROW_ENABLE_PARQUET (our flag set in CMakeLists.txt) to Arrow's + # own ARROW_PARQUET variable so Arrow actually builds Parquet support. 
+ if(ARROW_ENABLE_PARQUET) + set(ARROW_PARQUET ON CACHE BOOL "" FORCE) + else() + set(ARROW_PARQUET OFF CACHE BOOL "" FORCE) + endif() + # Enable the compression codecs used by Parquet files (Snappy, Zlib, Zstd) + set(ARROW_WITH_SNAPPY ON CACHE BOOL "" FORCE) + set(ARROW_WITH_ZLIB ON CACHE BOOL "" FORCE) set(ARROW_WITH_BZ2 OFF CACHE BOOL "" FORCE) set(ARROW_WITH_LZ4 OFF CACHE BOOL "" FORCE) - set(ARROW_WITH_SNAPPY OFF CACHE BOOL "" FORCE) - set(ARROW_WITH_ZSTD OFF CACHE BOOL "" FORCE) + set(ARROW_WITH_ZSTD ON CACHE BOOL "" FORCE) set(ARROW_WITH_BROTLI OFF CACHE BOOL "" FORCE) + set(ARROW_PLASMA OFF CACHE BOOL "" FORCE) + set(ARROW_PYTHON OFF CACHE BOOL "" FORCE) + set(ARROW_S3 OFF CACHE BOOL "" FORCE) set(ARROW_IPC ON CACHE BOOL "" FORCE) set(ARROW_BUILD_BENCHMARKS OFF CACHE BOOL "" FORCE) set(ARROW_BUILD_TESTS OFF CACHE BOOL "" FORCE) @@ -104,7 +118,6 @@ function(build_arrow_as_third_party) # Point Arrow to use the project's RapidJSON set(RapidJSON_ROOT "${CMAKE_SOURCE_DIR}/third_party/rapidjson" CACHE PATH "" FORCE) endif() - set(ARROW_WITH_ZLIB OFF CACHE BOOL "" FORCE) set(ARROW_ENABLE_THREADING ON CACHE BOOL "" FORCE) # Save original flags and set flags to suppress warnings for Arrow build @@ -116,6 +129,12 @@ function(build_arrow_as_third_party) cmake_policy(SET CMP0135 NEW) endif() + # CMake 3.30+ introduced CMP0169 which deprecates FetchContent_Populate() with declared details. + # CMake 4.x made it an error by default. Set to OLD to preserve the existing pattern. + if(POLICY CMP0169) + cmake_policy(SET CMP0169 OLD) + endif() + message(STATUS "Fetching Arrow ${ARROW_VERSION} from ${ARROW_SOURCE_URL}") fetchcontent_declare(Arrow @@ -127,6 +146,24 @@ function(build_arrow_as_third_party) FetchContent_GetProperties(Arrow) if(NOT arrow_POPULATED) FetchContent_Populate(Arrow) + # CMake 4.x removed support for cmake_minimum_required < 3.5. + # Arrow's bundled ExternalProjects (e.g. snappy) use EP_COMMON_CMAKE_ARGS + # which is built inside Arrow's ThirdpartyToolchain.cmake. 
We patch that + # file after fetch to append -DCMAKE_POLICY_VERSION_MINIMUM=3.5 so every + # ExternalProject sub-build can configure under CMake 4.x. + if(CMAKE_VERSION VERSION_GREATER_EQUAL "4.0") + set(_toolchain "${arrow_SOURCE_DIR}/cpp/cmake_modules/ThirdpartyToolchain.cmake") + file(READ "${_toolchain}" _toolchain_content) + string(FIND "${_toolchain_content}" "CMAKE_POLICY_VERSION_MINIMUM" _already_patched) + if(_already_patched EQUAL -1) + string(REPLACE + "# if building with a toolchain file, pass that through" + "# CMake 4.x: inject CMAKE_POLICY_VERSION_MINIMUM into all ExternalProject builds\nlist(APPEND EP_COMMON_CMAKE_ARGS \"-DCMAKE_POLICY_VERSION_MINIMUM=3.5\")\n\n# if building with a toolchain file, pass that through" + _toolchain_content "${_toolchain_content}") + file(WRITE "${_toolchain}" "${_toolchain_content}") + message(STATUS "Patched Arrow ThirdpartyToolchain.cmake for CMake 4.x compatibility") + endif() + endif() add_subdirectory(${arrow_SOURCE_DIR}/cpp ${arrow_BINARY_DIR} EXCLUDE_FROM_ALL) endif() @@ -214,6 +251,36 @@ function(build_arrow_as_third_party) include_directories(${arrow_SOURCE_DIR}/cpp/src ${arrow_BINARY_DIR}/src) + # Try different possible Arrow Parquet target names + if(TARGET Arrow::parquet_static) + message(STATUS "Found Arrow::parquet_static target") + set(ARROW_PARQUET_LIB Arrow::parquet_static) + elseif(TARGET arrow_parquet_static) + message(STATUS "Found arrow_parquet_static target") + set(ARROW_PARQUET_LIB arrow_parquet_static) + elseif(TARGET parquet_static) + message(STATUS "Found parquet_static target") + set(ARROW_PARQUET_LIB parquet_static) + elseif(TARGET Arrow::parquet) + message(STATUS "Found Arrow::parquet target (using as fallback)") + set(ARROW_PARQUET_LIB Arrow::parquet) + elseif(TARGET parquet) + message(STATUS "Found parquet target (using as fallback)") + set(ARROW_PARQUET_LIB parquet) + else() + # Only warn when Parquet was requested; without it no parquet target is expected. 
+ if(ARROW_ENABLE_PARQUET) + message(WARNING "Arrow parquet target not found. 
Parquet symbols may be unresolved.") + endif() + set(ARROW_PARQUET_LIB "") + endif() + + if(ARROW_PARQUET_LIB) + list(APPEND ARROW_LIB ${ARROW_PARQUET_LIB}) + set(ARROW_LIB ${ARROW_LIB} PARENT_SCOPE) + set(ARROW_PARQUET_LIB ${ARROW_PARQUET_LIB} PARENT_SCOPE) + endif() + # Set additional Arrow variables for compatibility set(ARROW_FOUND TRUE PARENT_SCOPE) set(ARROW_LIBRARIES ${ARROW_LIB} PARENT_SCOPE) @@ -251,6 +318,21 @@ function(build_arrow_as_third_party) FILES_MATCHING PATTERN "*.h" PATTERN "test" EXCLUDE PATTERN "testing" EXCLUDE) + + # Install Parquet headers if Parquet is enabled + if(ARROW_ENABLE_PARQUET) + install(DIRECTORY ${arrow_SOURCE_DIR}/cpp/src/parquet + DESTINATION include + FILES_MATCHING PATTERN "*.h" + PATTERN "test" EXCLUDE + PATTERN "testing" EXCLUDE) + + install(DIRECTORY ${arrow_BINARY_DIR}/src/parquet + DESTINATION include + FILES_MATCHING PATTERN "*.h" + PATTERN "test" EXCLUDE + PATTERN "testing" EXCLUDE) + endif() else() # list(APPEND ICEBERG_SYSTEM_DEPENDENCIES Arrow) diff --git a/doc/source/extensions/load_parquet.md b/doc/source/extensions/load_parquet.md new file mode 100644 index 000000000..fe1fad56e --- /dev/null +++ b/doc/source/extensions/load_parquet.md @@ -0,0 +1,79 @@ +# Parquet Extension + +Apache Parquet is a columnar storage format widely used in data engineering and analytics workloads. NeuG supports Parquet file import functionality through the Extension framework. After loading the Parquet Extension, users can directly load external Parquet files using the `LOAD FROM` syntax. + +## Install Extension + +```cypher +INSTALL PARQUET; +``` + +## Load Extension + +```cypher +LOAD PARQUET; +``` + +## Using Parquet Extension + +`LOAD FROM` reads Parquet files and exposes their columns for querying. Schema is automatically inferred from the Parquet file metadata by default. 
+ +### Parquet Format Options + +The following options control how Parquet files are read: + +| Option | Type | Default | Description | +| ------------------------ | ----- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- | +| `buffered_stream` | bool | `true` | Enable buffered I/O stream for improved sequential read performance. | +| `pre_buffer` | bool | `false` | Pre-buffer column data before decoding. Recommended for high-latency filesystems such as S3. | +| `enable_io_coalescing` | bool | `true` | Selects the Arrow I/O read coalescing (hole-filling cache) strategy used to reduce I/O overhead when reading non-contiguous byte ranges. When `true`, uses lazy coalescing; when `false`, uses eager coalescing. | +| `parquet_batch_rows` | int64 | `65536` | Number of rows per Arrow record batch when converting Parquet row groups into in-memory batches. | + +### Query Examples + +#### Basic Parquet Loading + +Load all columns from a Parquet file: + +```cypher +LOAD FROM "person.parquet" +RETURN *; +``` + +#### Specifying Batch Size + +Tune memory usage by adjusting the number of rows read per batch: + +```cypher +LOAD FROM "person.parquet" (parquet_batch_rows=8192) +RETURN *; +``` + +#### Choosing the I/O Coalescing Strategy + +Set `enable_io_coalescing=false` to switch from the default lazy coalescing to eager coalescing, for workloads that benefit from pre-fetching contiguous data: + +```cypher +LOAD FROM "person.parquet" (enable_io_coalescing=false) +RETURN *; +``` + +#### Column Projection + +Return only specific columns from Parquet data: + +```cypher +LOAD FROM "person.parquet" +RETURN fName, age; +``` + +#### Column Aliases + +Use `AS` to assign aliases to columns: + +```cypher +LOAD FROM "person.parquet" +RETURN fName AS name, age AS years; +``` + +> **Note:** All relational operations supported by `LOAD FROM` — including type conversion, WHERE filtering, aggregation, sorting, and limiting — work the same way with Parquet files. 
See the [LOAD FROM reference](../data_io/load_data) for the complete list of operations. diff --git a/example_dataset/comprehensive_graph/csv_to_parquet.py b/example_dataset/comprehensive_graph/csv_to_parquet.py new file mode 100644 index 000000000..7dbfa89c9 --- /dev/null +++ b/example_dataset/comprehensive_graph/csv_to_parquet.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 +# Copyright 2020 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Preprocess comprehensive_graph CSV files into Parquet format. + +Outputs are written to example_dataset/comprehensive_graph/parquet/. + +Notes: + - interval_property is kept as a Parquet STRING column (pyarrow CSV inference). + NeuG's INTERVAL text format ("1year2months3days...") has no writable Parquet + native interval equivalent via pyarrow, so string is the simplest lossless form. + - Columns whose CSV inference would pick the wrong Arrow type (INT32/UINT32/UINT64/FLOAT and + the edge DATETIME weight) are typed explicitly; all remaining columns keep pyarrow's inferred types. +""" + +import os +import sys + +try: + import pyarrow as pa + import pyarrow.csv as pa_csv + import pyarrow.parquet as pq +except ImportError: + print("Error: pyarrow is required. Install with: pip install pyarrow", file=sys.stderr) + sys.exit(1) + +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +OUTPUT_DIR = os.path.join(SCRIPT_DIR, "parquet") + +# Edge files: all have duplicate src/dst column both named 'node_a.id'. 
+# Rename the first two columns to src_id / dst_id for uniqueness in Parquet. +EDGE_FILES = { + "rel_a.csv", +} + +# All CSV files to convert +CSV_FILES = [ + "node_a.csv", + "rel_a.csv", +] + + +def convert(csv_filename: str) -> None: + """Convert one pipe-delimited CSV in SCRIPT_DIR to OUTPUT_DIR/<name>.parquet (OUTPUT_DIR must exist).""" + csv_path = os.path.join(SCRIPT_DIR, csv_filename) + parquet_filename = os.path.splitext(csv_filename)[0] + ".parquet" + parquet_path = os.path.join(OUTPUT_DIR, parquet_filename) + + read_opts = pa_csv.ReadOptions() + parse_opts = pa_csv.ParseOptions(delimiter="|") + # timestamp_parsers: let Arrow auto-detect date/datetime columns. + # Explicitly type columns where CSV auto-inference picks wrong types: + # - i32_property: inferred as int64 (pyarrow defaults all integers to int64) + # - u32/u64_property: inferred as double (values overflow int64) + # - f32_property: inferred as double (pyarrow has no float32 inference) + node_column_types = { + "i32_property": pa.int32(), + "u32_property": pa.uint32(), + "u64_property": pa.uint64(), + "f32_property": pa.float32(), + } if csv_filename in ("node_a.csv", "node_b.csv") else {} + edge_column_types = { + "i32_weight": pa.int32(), + # datetime_weight values are date-only strings ("2023-05-17"); pyarrow infers + # date32[day] by default, but DT_DATETIME requires timestamp[ms]. 
+ "datetime_weight": pa.timestamp("ms"), + } if csv_filename in EDGE_FILES else {} + column_types = {**node_column_types, **edge_column_types} + convert_opts = pa_csv.ConvertOptions( + timestamp_parsers=["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"], + column_types=column_types, + ) + + table = pa_csv.read_csv( + csv_path, + read_options=read_opts, + parse_options=parse_opts, + convert_options=convert_opts, + ) + + # Rename duplicate src/dst columns in edge files + if csv_filename in EDGE_FILES: + old_names = table.schema.names + new_names = ["src_id" if i == 0 else "dst_id" if i == 1 else n + for i, n in enumerate(old_names)] + table = table.rename_columns(new_names) + print(f" renamed src/dst columns: {old_names[:2]} -> ['src_id', 'dst_id']") + + # interval_property is kept as-is: pyarrow CSV infers it as string + pq.write_table(table, parquet_path) + print(f" {csv_filename} -> parquet/{parquet_filename} ({table.num_rows} rows, {len(table.schema)} cols)") + + +def main(): + """Create OUTPUT_DIR and convert every file listed in CSV_FILES, skipping those that are missing.""" + os.makedirs(OUTPUT_DIR, exist_ok=True) + print(f"Output directory: {OUTPUT_DIR}\n") + + for csv_file in CSV_FILES: + csv_path = os.path.join(SCRIPT_DIR, csv_file) + if not os.path.exists(csv_path): + print(f"[SKIP] {csv_file} not found") + continue + print(f"Converting {csv_file} ...") + convert(csv_file) + + print("\nDone.") + + +if __name__ == "__main__": + main() diff --git a/example_dataset/comprehensive_graph/parquet/node_a.parquet b/example_dataset/comprehensive_graph/parquet/node_a.parquet new file mode 100644 index 000000000..4510e6d9f Binary files /dev/null and b/example_dataset/comprehensive_graph/parquet/node_a.parquet differ diff --git a/example_dataset/comprehensive_graph/parquet/rel_a.parquet b/example_dataset/comprehensive_graph/parquet/rel_a.parquet new file mode 100644 index 000000000..9a2e0bab8 Binary files /dev/null and b/example_dataset/comprehensive_graph/parquet/rel_a.parquet differ 
diff --git a/example_dataset/tinysnb/parquet/eMeets.parquet 
b/example_dataset/tinysnb/parquet/eMeets.parquet new file mode 100644 index 000000000..95aa5a815 Binary files /dev/null and b/example_dataset/tinysnb/parquet/eMeets.parquet differ diff --git a/example_dataset/tinysnb/parquet/vPerson.parquet b/example_dataset/tinysnb/parquet/vPerson.parquet new file mode 100644 index 000000000..3284e14e3 Binary files /dev/null and b/example_dataset/tinysnb/parquet/vPerson.parquet differ diff --git a/extension/CMakeLists.txt b/extension/CMakeLists.txt index d4829394c..85407aac6 100644 --- a/extension/CMakeLists.txt +++ b/extension/CMakeLists.txt @@ -124,5 +124,7 @@ endfunction() # Add json extension add_extension_if_enabled("json") +# Add parquet extension +add_extension_if_enabled("parquet") message(STATUS "=== Extension configuration complete ===") \ No newline at end of file diff --git a/extension/parquet/CMakeLists.txt b/extension/parquet/CMakeLists.txt new file mode 100644 index 000000000..ecd87a7ad --- /dev/null +++ b/extension/parquet/CMakeLists.txt @@ -0,0 +1,46 @@ +# Collect source files +file(GLOB PARQUET_EXTENSION_SOURCES + "${CMAKE_CURRENT_SOURCE_DIR}/src/*.cc" + "${CMAKE_CURRENT_SOURCE_DIR}/src/*.cpp") +set(PARQUET_EXTENSION_OBJECT_FILES ${PARQUET_EXTENSION_SOURCES}) + +# Use the extension build system (should be called before add_subdirectory(test)) +build_extension_lib("parquet") + +# Set include directories +target_include_directories(neug_parquet_extension PRIVATE + ${CMAKE_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${CMAKE_CURRENT_BINARY_DIR}) + +if(ARROW_INCLUDE_DIRS) + target_include_directories(neug_parquet_extension SYSTEM PRIVATE ${ARROW_INCLUDE_DIRS}) +endif() + +# Link libraries +# Core Arrow symbols are already in neug (which links arrow_static, +# arrow_dataset_static, etc.). However, the linker strips unreferenced symbols +# from static archives — both arrow::parquet::* and arrow::dataset::Parquet* symbols +# are stripped because neug itself doesn't use them. 
+# NOTE(review): the target_link_libraries() call below links only `neug`; the libraries +# described here are passed solely to the test target (EXTRA_LINK_LIBS) - confirm intent: +# - arrow_dataset_objlib: provides arrow::dataset::ParquetFileFormat, +# ParquetFragmentScanOptions, and other dataset symbols used by the extension. +# +# Additionally, link ARROW_PARQUET_LIB (the Parquet static library) with +# arrow::parquet::* symbols (e.g., ParquetFileReader, ArrowReaderProperties). +# +# Using OBJECT library targets (resolved at build time) instead of file(GLOB) +# for .o files (which fails on clean builds since GLOB runs at configure time). +target_link_libraries(neug_parquet_extension PRIVATE + neug +) + +# Build test by compiling extension sources directly (no link to neug_parquet_extension) +if(BUILD_TEST) + add_extension_test(NAME parquet + TEST_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/tests/parquet_test.cpp + EXTRA_LINK_LIBS arrow_dataset_objlib ${ARROW_PARQUET_LIB} + ) +endif() + diff --git a/extension/parquet/include/parquet_options.h b/extension/parquet/include/parquet_options.h new file mode 100644 index 000000000..0da07abca --- /dev/null +++ b/extension/parquet/include/parquet_options.h @@ -0,0 +1,99 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include "neug/utils/reader/options.h" +#include "neug/utils/reader/reader.h" + +namespace neug { +namespace reader { + +/** + * @brief Parquet-specific parse options + * + * These options control Parquet file reading behavior: + * - buffered_stream: Enable buffered I/O stream for better performance (default: true) + * - pre_buffer: Pre-buffer data for high-latency filesystems like S3 (default: false) + * - enable_io_coalescing: Selects the Arrow I/O read coalescing (hole-filling cache) strategy for + * improved performance when reading non-contiguous ranges (default: true). + * When true, uses lazy coalescing (CacheOptions::LazyDefaults); when false, uses + * eager coalescing (CacheOptions::Defaults). + * - row_batch_size (option key PARQUET_BATCH_ROWS): Number of rows per Arrow batch when converting from Parquet (default: 65536) + * + */ +struct ParquetParseOptions { + Option buffered_stream = + Option::BoolOption("BUFFERED_STREAM", true); + Option pre_buffer = + Option::BoolOption("PRE_BUFFER", false); + Option enable_io_coalescing = + Option::BoolOption("ENABLE_IO_COALESCING", true); + Option row_batch_size = + Option::Int64Option("PARQUET_BATCH_ROWS", 65536); +}; + +/** + * @brief Parquet-specific implementation of Arrow scan options builder + * + * This class extends ArrowOptionsBuilder to provide Parquet-specific + * functionality: + * - buildFragmentOptions(): Builds ParquetFragmentScanOptions with the buffered-stream, + * buffer-size, batch-size, threading, pre-buffer and cache options. 
+ * - buildFileFormat(): Builds ParquetFileFormat + */ +class ArrowParquetOptionsBuilder : public ArrowOptionsBuilder { + public: + /** + * @brief Constructs an ArrowParquetOptionsBuilder with the given shared state + * @param state The shared read state containing Parquet schema and configuration + */ + explicit ArrowParquetOptionsBuilder(std::shared_ptr state) + : ArrowOptionsBuilder(state){}; + + virtual ArrowOptions build() const override; + + protected: + /** + * @brief Builds Parquet-specific fragment scan options + * + * Creates ParquetFragmentScanOptions with: + * - Reader properties (buffered stream, buffer size) and Arrow reader properties (batch size, threads, pre-buffer, cache options). + * + * @return ParquetFragmentScanOptions instance + */ + std::shared_ptr buildFragmentOptions() + const; + + /** + * @brief Builds ParquetFileFormat from scan options + * + * Installs options.fragment_scan_options as the format's default fragment scan options + * when they are ParquetFragmentScanOptions; otherwise returns a default ParquetFileFormat. + * + * @param options The scan options containing fragment_scan_options + * @return ParquetFileFormat instance configured with reader properties + */ + std::shared_ptr buildFileFormat( + const arrow::dataset::ScanOptions& options) const; +}; + +} // namespace reader +} // namespace neug diff --git a/extension/parquet/include/parquet_read_function.h b/extension/parquet/include/parquet_read_function.h new file mode 100644 index 000000000..1d0cbc5a5 --- /dev/null +++ b/extension/parquet/include/parquet_read_function.h @@ -0,0 +1,111 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include "parquet_options.h" +#include "neug/compiler/function/function.h" +#include "neug/compiler/function/read_function.h" +#include "neug/execution/execute/ops/batch/batch_update_utils.h" +#include "neug/utils/reader/options.h" +#include "neug/utils/reader/reader.h" +#include "neug/utils/reader/schema.h" +#include "neug/utils/reader/sniffer.h" + +namespace neug { +namespace function { + +struct ParquetReadFunction { + static constexpr const char* name = "PARQUET_SCAN"; + + static function_set getFunctionSet() { + auto typeIDs = + std::vector{common::LogicalTypeID::STRING}; + auto readFunction = std::make_unique(name, typeIDs); + readFunction->execFunc = execFunc; + readFunction->sniffFunc = sniffFunc; + function_set functionSet; + functionSet.push_back(std::move(readFunction)); + return functionSet; + } + + static execution::Context execFunc( + std::shared_ptr state) { + // Resolve the input paths and obtain the file system from the local provider + LocalFileSystemProvider fsProvider; + auto fileInfo = fsProvider.provide(state->schema.file); + state->schema.file.paths = fileInfo.resolvedPaths; + + // Create Parquet-specific options builder + auto optionsBuilder = + std::make_unique(state); + + // Create Arrow reader with Parquet options + auto reader = std::make_unique( + state, std::move(optionsBuilder), fileInfo.fileSystem); + + // Execute read operation. + // ArrowReader::read() throws exceptions (via THROW_IO_EXCEPTION / + // THROW_INVALID_ARGUMENT_EXCEPTION) on all Arrow error paths, so + // errors are propagated to the caller rather than silently swallowed. 
+ execution::Context ctx; + auto localState = std::make_shared(); + reader->read(localState, ctx); + return ctx; + } + + static std::shared_ptr sniffFunc( + const reader::FileSchema& schema) { + auto state = std::make_shared(); + auto& externalSchema = state->schema; + + // Create table entry schema with empty column names and types, + // which need to be inferred from Parquet metadata + externalSchema.entry = std::make_shared(); + externalSchema.file = schema; + + // Resolve file paths through the local file system provider + LocalFileSystemProvider fsProvider; + auto fileInfo = fsProvider.provide(state->schema.file); + state->schema.file.paths = fileInfo.resolvedPaths; + + // Create Parquet-specific options builder + auto optionsBuilder = + std::make_unique(state); + + // Create Arrow reader with Parquet options + auto reader = std::make_shared( + state, std::move(optionsBuilder), fileInfo.fileSystem); + + // Create sniffer to infer schema from Parquet metadata + auto sniffer = std::make_shared(reader); + auto sniffResult = sniffer->sniff(); + + if (!sniffResult) { + LOG(ERROR) << "Failed to sniff Parquet schema: " + << sniffResult.error().ToString(); + THROW_IO_EXCEPTION("Failed to sniff Parquet schema: " + + sniffResult.error().ToString()); + } + return sniffResult.value(); + } +}; + +} // namespace function +} // namespace neug diff --git a/extension/parquet/src/parquet_extension.cpp b/extension/parquet/src/parquet_extension.cpp new file mode 100644 index 000000000..5b3c2d405 --- /dev/null +++ b/extension/parquet/src/parquet_extension.cpp @@ -0,0 +1,49 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "glog/logging.h" +#include "neug/compiler/extension/extension_api.h" +#include "neug/utils/exception/exception.h" + +#include "parquet_read_function.h" + +extern "C" { + +void Init() { + LOG(INFO) << "[parquet extension] init called"; + + try { + // Register the PARQUET_SCAN table function (ParquetReadFunction) + neug::extension::ExtensionAPI::registerFunction< + neug::function::ParquetReadFunction>( + neug::catalog::CatalogEntryType::TABLE_FUNCTION_ENTRY); + + neug::extension::ExtensionAPI::registerExtension( + neug::extension::ExtensionInfo{ + "parquet", "Provides functions to read Parquet files."}); + + LOG(INFO) << "[parquet extension] functions registered successfully"; + } catch (const std::exception& e) { + THROW_EXCEPTION_WITH_FILE_LINE("[parquet extension] registration failed: " + + std::string(e.what())); + } catch (...) { + THROW_EXCEPTION_WITH_FILE_LINE( + "[parquet extension] registration failed: unknown exception"); + } +} + +const char* Name() { return "PARQUET"; } + +} // extern "C" diff --git a/extension/parquet/src/parquet_options.cc b/extension/parquet/src/parquet_options.cc new file mode 100644 index 000000000..2c54f559d --- /dev/null +++ b/extension/parquet/src/parquet_options.cc @@ -0,0 +1,148 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "parquet_options.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "neug/utils/exception/exception.h" +#include "neug/utils/reader/options.h" +#include "neug/utils/reader/reader.h" +#include "neug/utils/reader/schema.h" + +namespace neug { +namespace reader { + +ArrowOptions ArrowParquetOptionsBuilder::build() const { + if (!state) { + THROW_INVALID_ARGUMENT_EXCEPTION("State is null"); + } + + auto scanOptions = std::make_shared(); + + // Build format-specific fragment scan options + auto fragment_scan_options = buildFragmentOptions(); + scanOptions->fragment_scan_options = fragment_scan_options; + if (!state->schema.entry) { + THROW_INVALID_ARGUMENT_EXCEPTION("Entry schema is null"); + } + scanOptions->dataset_schema = createSchema(*state->schema.entry); + + // Build file format using scan options + auto fileFormat = buildFileFormat(*scanOptions); + + // Create ArrowOptions with both scanOptions and fileFormat + ArrowOptions arrowOptions; + arrowOptions.scanOptions = scanOptions; + arrowOptions.fileFormat = fileFormat; + return arrowOptions; +} + +std::shared_ptr +ArrowParquetOptionsBuilder::buildFragmentOptions() const { + if (!state) { + THROW_INVALID_ARGUMENT_EXCEPTION("State is null"); + } + if (!state->schema.entry) { + THROW_INVALID_ARGUMENT_EXCEPTION("Entry schema is null"); + } + + auto fragment_scan_options = + std::make_shared(); + + const FileSchema& fileSchema = state->schema.file; + auto& options = fileSchema.options; + ParquetParseOptions parquetOpts; + ReadOptions 
readOpts; + + // Configure Parquet-specific reader properties + auto reader_properties = std::make_shared(); + + // Enable buffered stream if configured + if (parquetOpts.buffered_stream.get(options)) { + reader_properties->enable_buffered_stream(); + } + + // NOTE(review): the I/O buffer size (bytes) is read from the generic ReadOptions::batch_size - confirm that option is in bytes + int64_t buffer_size = readOpts.batch_size.get(options); + reader_properties->set_buffer_size(buffer_size); + + fragment_scan_options->reader_properties = reader_properties; + + // Configure Arrow-specific reader properties + auto arrow_reader_properties = std::make_shared(); + + // Set Arrow row batch size (number of rows per batch) + int64_t row_batch_size = parquetOpts.row_batch_size.get(options); + arrow_reader_properties->set_batch_size(row_batch_size); + + // Use threads setting from general read options + arrow_reader_properties->set_use_threads(readOpts.use_threads.get(options)); + + // Configure pre-buffering for high-latency filesystems + arrow_reader_properties->set_pre_buffer(parquetOpts.pre_buffer.get(options)); + + // Configure caching via Arrow I/O coalescing (hole-filling cache). + // When enable_io_coalescing=true (default), use lazy coalescing which only + // loads explicitly-requested byte ranges (CacheOptions::LazyDefaults). + // When false, use eager coalescing which pre-fetches data more aggressively + // (CacheOptions::Defaults). 
+ if (parquetOpts.enable_io_coalescing.get(options)) { + arrow_reader_properties->set_cache_options( + arrow::io::CacheOptions::LazyDefaults()); + } else { + arrow_reader_properties->set_cache_options( + arrow::io::CacheOptions::Defaults()); + } + + fragment_scan_options->arrow_reader_properties = arrow_reader_properties; + + return fragment_scan_options; +} + +std::shared_ptr +ArrowParquetOptionsBuilder::buildFileFormat( + const arrow::dataset::ScanOptions& options) const { + auto fileFormat = std::make_shared(); + auto fragmentOpts = options.fragment_scan_options; + if (!fragmentOpts) { + LOG(WARNING) + << "fragment_scan_options is null in ScanOptions, Parquet reader " + "will use default configuration"; + return fileFormat; + } + + auto parquetFragmentOpts = + std::dynamic_pointer_cast( + fragmentOpts); + if (!parquetFragmentOpts) { + LOG(WARNING) << "fragment_scan_options is not ParquetFragmentScanOptions, " + "reader will use default configuration"; + return fileFormat; + } + + fileFormat->default_fragment_scan_options = options.fragment_scan_options; + return fileFormat; +} + +} // namespace reader +} // namespace neug diff --git a/extension/parquet/tests/parquet_test.cpp b/extension/parquet/tests/parquet_test.cpp new file mode 100644 index 000000000..c0f8c6c3c --- /dev/null +++ b/extension/parquet/tests/parquet_test.cpp @@ -0,0 +1,841 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "neug/compiler/common/case_insensitive_map.h" +#include "neug/execution/common/columns/arrow_context_column.h" +#include "neug/execution/common/context.h" +#include "neug/generated/proto/plan/basic_type.pb.h" +#include "neug/utils/reader/options.h" +#include "neug/utils/reader/reader.h" +#include "neug/utils/reader/schema.h" + +#include "../../extension/parquet/include/parquet_options.h" + +namespace neug { +namespace test { + +static constexpr const char* PARQUET_TEST_DIR = "/tmp/parquet_test"; + +class ParquetTest : public ::testing::Test { + public: + void SetUp() override { + if (std::filesystem::exists(PARQUET_TEST_DIR)) { + std::filesystem::remove_all(PARQUET_TEST_DIR); + } + std::filesystem::create_directories(PARQUET_TEST_DIR); + } + + void TearDown() override { + if (std::filesystem::exists(PARQUET_TEST_DIR)) { + std::filesystem::remove_all(PARQUET_TEST_DIR); + } + } + + // Helper function to create a simple Parquet file + void createSimpleParquetFile(const std::string& filename) { + // Create Arrow schema + auto schema = arrow::schema({ + arrow::field("id", arrow::int64()), + arrow::field("name", arrow::utf8()), + arrow::field("value", arrow::float64()) + }); + + // Create data + arrow::Int64Builder id_builder; + arrow::StringBuilder name_builder; + arrow::DoubleBuilder value_builder; + + ASSERT_TRUE(id_builder.Append(1).ok()); + ASSERT_TRUE(id_builder.Append(2).ok()); + ASSERT_TRUE(id_builder.Append(3).ok()); + + ASSERT_TRUE(name_builder.Append("Alice").ok()); + ASSERT_TRUE(name_builder.Append("Bob").ok()); + ASSERT_TRUE(name_builder.Append("Charlie").ok()); + + ASSERT_TRUE(value_builder.Append(10.5).ok()); + ASSERT_TRUE(value_builder.Append(20.3).ok()); + ASSERT_TRUE(value_builder.Append(30.7).ok()); + + std::shared_ptr id_array; + std::shared_ptr name_array; + std::shared_ptr value_array; + + ASSERT_TRUE(id_builder.Finish(&id_array).ok()); + 
ASSERT_TRUE(name_builder.Finish(&name_array).ok()); + ASSERT_TRUE(value_builder.Finish(&value_array).ok()); + + // Create table + auto table = arrow::Table::Make(schema, {id_array, name_array, value_array}); + + // Write to Parquet file + std::string filepath = std::string(PARQUET_TEST_DIR) + "/" + filename; + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(filepath)); + + PARQUET_THROW_NOT_OK( + parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 3)); + } + + // Helper function to create DataType as shared_ptr + std::shared_ptr<::common::DataType> createInt64Type() { + auto type = std::make_shared<::common::DataType>(); + type->set_primitive_type(::common::PrimitiveType::DT_SIGNED_INT64); + return type; + } + + std::shared_ptr<::common::DataType> createStringType() { + auto type = std::make_shared<::common::DataType>(); + auto strType = std::make_unique<::common::String>(); + auto varChar = std::make_unique<::common::String::VarChar>(); + strType->set_allocated_var_char(varChar.release()); + type->set_allocated_string(strType.release()); + return type; + } + + std::shared_ptr<::common::DataType> createDoubleType() { + auto type = std::make_shared<::common::DataType>(); + type->set_primitive_type(::common::PrimitiveType::DT_DOUBLE); + return type; + } + + std::shared_ptr<::common::DataType> createInt32Type() { + auto type = std::make_shared<::common::DataType>(); + type->set_primitive_type(::common::PrimitiveType::DT_SIGNED_INT32); + return type; + } + + std::shared_ptr<::common::DataType> createBoolType() { + auto type = std::make_shared<::common::DataType>(); + type->set_primitive_type(::common::PrimitiveType::DT_BOOL); + return type; + } + + // Helper function to create ReadSharedState + std::shared_ptr createSharedState( + const std::string& parquetFile, + const std::vector& columnNames, + const std::vector>& columnTypes, + const common::case_insensitive_map_t& options = {}) { + auto 
sharedState = std::make_shared(); + + auto entrySchema = std::make_shared(); + entrySchema->columnNames = columnNames; + entrySchema->columnTypes = columnTypes; + + // Create FileSchema + reader::FileSchema fileSchema; + fileSchema.paths = {std::string(PARQUET_TEST_DIR) + "/" + parquetFile}; + fileSchema.format = "parquet"; + fileSchema.options = options; + + // Create ExternalSchema + reader::ExternalSchema externalSchema; + externalSchema.entry = entrySchema; + externalSchema.file = fileSchema; + + sharedState->schema = std::move(externalSchema); + + return sharedState; + } + + std::shared_ptr createParquetReader( + const std::shared_ptr& sharedState) { + auto fileSystem = std::make_shared(); + auto optionsBuilder = + std::make_unique(sharedState); + return std::make_shared( + sharedState, std::move(optionsBuilder), std::move(fileSystem)); + } +}; + +// ============================================================================= +// Test Suite 1: Options Translation Tests +// Verify that Neug options are correctly translated to Arrow Parquet configuration +// ============================================================================= + +TEST_F(ParquetTest, TestOptionsBuilder_BuildsValidParquetFragmentScanOptions) { + createSimpleParquetFile("test_options.parquet"); + + auto sharedState = createSharedState( + "test_options.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + // Verify the builder creates ParquetFragmentScanOptions (not generic FragmentScanOptions) + ASSERT_NE(options.scanOptions, nullptr); + ASSERT_NE(options.scanOptions->fragment_scan_options, nullptr); + + auto parquetFragmentOpts = std::dynamic_pointer_cast( + options.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts, nullptr) + << "Extension should create ParquetFragmentScanOptions, not generic 
FragmentScanOptions"; + + // Verify reader_properties and arrow_reader_properties are initialized + EXPECT_NE(parquetFragmentOpts->reader_properties, nullptr) + << "Extension should initialize reader_properties"; + EXPECT_NE(parquetFragmentOpts->arrow_reader_properties, nullptr) + << "Extension should initialize arrow_reader_properties"; +} + +TEST_F(ParquetTest, TestOptionsTranslation_BufferSize) { + createSimpleParquetFile("test_buffer.parquet"); + + // Test custom buffer_size option + const int64_t custom_buffer_size = 2048; + auto sharedState = createSharedState( + "test_buffer.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"batch_size", std::to_string(custom_buffer_size)}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + auto parquetFragmentOpts = std::dynamic_pointer_cast( + options.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts, nullptr); + ASSERT_NE(parquetFragmentOpts->reader_properties, nullptr); + + // Verify the Neug batch_size option is correctly translated to Arrow buffer_size + EXPECT_EQ(parquetFragmentOpts->reader_properties->buffer_size(), custom_buffer_size) + << "Extension should translate batch_size option to Arrow buffer_size"; +} + +TEST_F(ParquetTest, TestOptionsTranslation_ParquetBatchRows) { + createSimpleParquetFile("test_batch_rows.parquet"); + + // Test PARQUET_BATCH_ROWS option + const int64_t custom_batch_rows = 4096; + auto sharedState = createSharedState( + "test_batch_rows.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"PARQUET_BATCH_ROWS", std::to_string(custom_batch_rows)}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + auto parquetFragmentOpts = std::dynamic_pointer_cast( + options.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts, nullptr); + 
ASSERT_NE(parquetFragmentOpts->arrow_reader_properties, nullptr); + + // Verify PARQUET_BATCH_ROWS is translated to Arrow batch_size + EXPECT_EQ(parquetFragmentOpts->arrow_reader_properties->batch_size(), custom_batch_rows) + << "Extension should translate PARQUET_BATCH_ROWS to Arrow batch_size"; +} + +TEST_F(ParquetTest, TestOptionsTranslation_PreBuffer) { + createSimpleParquetFile("test_prebuffer.parquet"); + + // Test PRE_BUFFER=true (default is false) + auto sharedState = createSharedState( + "test_prebuffer.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"PRE_BUFFER", "true"}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + auto parquetFragmentOpts = std::dynamic_pointer_cast( + options.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts, nullptr); + ASSERT_NE(parquetFragmentOpts->arrow_reader_properties, nullptr); + + // Verify PRE_BUFFER option is translated + EXPECT_TRUE(parquetFragmentOpts->arrow_reader_properties->pre_buffer()) + << "Extension should translate PRE_BUFFER=true to Arrow pre_buffer setting"; +} + +TEST_F(ParquetTest, TestOptionsTranslation_UseThreads) { + createSimpleParquetFile("test_threads.parquet"); + + // Test parallel=false (use_threads) + auto sharedState = createSharedState( + "test_threads.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"parallel", "false"}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + auto parquetFragmentOpts = std::dynamic_pointer_cast( + options.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts, nullptr); + ASSERT_NE(parquetFragmentOpts->arrow_reader_properties, nullptr); + + // Verify parallel option is translated to use_threads + EXPECT_FALSE(parquetFragmentOpts->arrow_reader_properties->use_threads()) + << "Extension should 
translate parallel=false to use_threads=false"; +} + +TEST_F(ParquetTest, TestOptionsTranslation_IoCoalescing) { + createSimpleParquetFile("test_cache.parquet"); + + // Test ENABLE_IO_COALESCING=true (default) — should use LazyDefaults (lazy=true) + auto sharedState1 = createSharedState( + "test_cache.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"ENABLE_IO_COALESCING", "true"}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder1(sharedState1); + auto options1 = optionsBuilder1.build(); + + auto parquetFragmentOpts1 = std::dynamic_pointer_cast( + options1.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts1, nullptr); + ASSERT_NE(parquetFragmentOpts1->arrow_reader_properties, nullptr); + + // Verify lazy coalescing is enabled when ENABLE_IO_COALESCING=true + auto cache_opts1 = parquetFragmentOpts1->arrow_reader_properties->cache_options(); + EXPECT_TRUE(cache_opts1.lazy) + << "Extension should use LazyDefaults (lazy=true) when ENABLE_IO_COALESCING=true"; + + // Test ENABLE_IO_COALESCING=false — should use Defaults (lazy=false, eager coalescing) + auto sharedState2 = createSharedState( + "test_cache.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"ENABLE_IO_COALESCING", "false"}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder2(sharedState2); + auto options2 = optionsBuilder2.build(); + + auto parquetFragmentOpts2 = std::dynamic_pointer_cast( + options2.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts2, nullptr); + + auto cache_opts2 = parquetFragmentOpts2->arrow_reader_properties->cache_options(); + EXPECT_FALSE(cache_opts2.lazy) + << "Extension should use Defaults (lazy=false) when ENABLE_IO_COALESCING=false"; +} + +TEST_F(ParquetTest, TestOptionsTranslation_DefaultValues) { + createSimpleParquetFile("test_defaults.parquet"); + + // Create state without any options - should use defaults + auto 
sharedState = createSharedState( + "test_defaults.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + auto parquetFragmentOpts = std::dynamic_pointer_cast( + options.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts, nullptr); + ASSERT_NE(parquetFragmentOpts->arrow_reader_properties, nullptr); + + // Verify default values are applied + // Default PARQUET_BATCH_ROWS = 65536 + EXPECT_EQ(parquetFragmentOpts->arrow_reader_properties->batch_size(), 65536) + << "Extension should use default PARQUET_BATCH_ROWS=65536"; + + // Default PRE_BUFFER = false + EXPECT_FALSE(parquetFragmentOpts->arrow_reader_properties->pre_buffer()) + << "Extension should use default PRE_BUFFER=false"; + + // Default parallel/use_threads = true + EXPECT_TRUE(parquetFragmentOpts->arrow_reader_properties->use_threads()) + << "Extension should use default parallel=true"; +} + +TEST_F(ParquetTest, TestFileFormatConfiguration_SharesFragmentOptions) { + createSimpleParquetFile("test_format.parquet"); + + auto sharedState = createSharedState( + "test_format.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"PARQUET_BATCH_ROWS", "2048"}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + // Verify FileFormat is ParquetFileFormat + ASSERT_NE(options.fileFormat, nullptr); + auto parquetFileFormat = std::dynamic_pointer_cast( + options.fileFormat); + ASSERT_NE(parquetFileFormat, nullptr) + << "Extension should create ParquetFileFormat"; + + // Verify the FileFormat shares the same fragment_scan_options as ScanOptions + // This ensures consistency in configuration + EXPECT_EQ(parquetFileFormat->default_fragment_scan_options, + options.scanOptions->fragment_scan_options) + << "Extension should set 
ParquetFileFormat's default_fragment_scan_options to match ScanOptions"; +} + +// ============================================================================= +// Test Suite 2: Type Mapping Tests +// Verify type conversion between Neug DataType and Arrow types +// ============================================================================= + +TEST_F(ParquetTest, TestTypeMapping_StringToLargeUtf8) { + createSimpleParquetFile("test_string_type.parquet"); + + // Neug uses STRING type, Arrow Parquet may have utf8 + // Extension should convert to large_utf8 for consistency + auto sharedState = createSharedState( + "test_string_type.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"batch_read", "false"}}); + + auto reader = createParquetReader(sharedState); + auto localState = std::make_shared(); + execution::Context ctx; + reader->read(localState, ctx); + + // Verify string column is converted to large_utf8 + auto col1 = ctx.columns[1]; + ASSERT_EQ(col1->column_type(), execution::ContextColumnType::kArrowArray); + auto arrayColumn1 = std::dynamic_pointer_cast(col1); + auto arrowType1 = arrayColumn1->GetArrowType(); + + EXPECT_TRUE(arrowType1->Equals(arrow::large_utf8())) + << "Extension should convert Arrow utf8 to large_utf8 for Neug STRING type. 
" + << "Got: " << arrowType1->ToString(); +} + +TEST_F(ParquetTest, TestTypeMapping_PreserveNumericTypes) { + // Create Parquet file with various numeric types + auto schema = arrow::schema({ + arrow::field("int32_col", arrow::int32()), + arrow::field("int64_col", arrow::int64()), + arrow::field("double_col", arrow::float64()), + arrow::field("bool_col", arrow::boolean()) + }); + + arrow::Int32Builder int32_builder; + arrow::Int64Builder int64_builder; + arrow::DoubleBuilder double_builder; + arrow::BooleanBuilder bool_builder; + + ASSERT_TRUE(int32_builder.Append(42).ok()); + ASSERT_TRUE(int64_builder.Append(9223372036854775807LL).ok()); + ASSERT_TRUE(double_builder.Append(3.14159).ok()); + ASSERT_TRUE(bool_builder.Append(true).ok()); + + std::shared_ptr arrays[4]; + ASSERT_TRUE(int32_builder.Finish(&arrays[0]).ok()); + ASSERT_TRUE(int64_builder.Finish(&arrays[1]).ok()); + ASSERT_TRUE(double_builder.Finish(&arrays[2]).ok()); + ASSERT_TRUE(bool_builder.Finish(&arrays[3]).ok()); + + auto table = arrow::Table::Make(schema, {arrays[0], arrays[1], arrays[2], arrays[3]}); + + std::string filepath = std::string(PARQUET_TEST_DIR) + "/test_numeric_types.parquet"; + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(filepath)); + PARQUET_THROW_NOT_OK( + parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 1)); + + // Read with Neug types + auto sharedState = createSharedState( + "test_numeric_types.parquet", + {"int32_col", "int64_col", "double_col", "bool_col"}, + {createInt32Type(), createInt64Type(), createDoubleType(), createBoolType()}, + {{"batch_read", "false"}}); + + auto reader = createParquetReader(sharedState); + auto localState = std::make_shared(); + execution::Context ctx; + reader->read(localState, ctx); + + EXPECT_EQ(ctx.col_num(), 4); + EXPECT_EQ(ctx.row_num(), 1); + + // Verify types are preserved correctly + auto col0 = std::dynamic_pointer_cast(ctx.columns[0]); + 
EXPECT_TRUE(col0->GetArrowType()->Equals(arrow::int32())) + << "Extension should preserve int32 type mapping"; + + auto col1 = std::dynamic_pointer_cast(ctx.columns[1]); + EXPECT_TRUE(col1->GetArrowType()->Equals(arrow::int64())) + << "Extension should preserve int64 type mapping"; + + auto col2 = std::dynamic_pointer_cast(ctx.columns[2]); + EXPECT_TRUE(col2->GetArrowType()->Equals(arrow::float64())) + << "Extension should preserve double type mapping"; + + auto col3 = std::dynamic_pointer_cast(ctx.columns[3]); + EXPECT_TRUE(col3->GetArrowType()->Equals(arrow::boolean())) + << "Extension should preserve boolean type mapping"; +} + +// ============================================================================= +// Test Suite 3: Integration with Neug Query System +// Verify filter pushdown and column pruning work through the extension +// ============================================================================= + +TEST_F(ParquetTest, TestIntegration_ColumnPruning) { + // Create Parquet file with 4 columns + auto schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("name", arrow::utf8()), + arrow::field("score", arrow::float64()), + arrow::field("grade", arrow::utf8()) + }); + + arrow::Int32Builder id_builder; + arrow::StringBuilder name_builder, grade_builder; + arrow::DoubleBuilder score_builder; + + ASSERT_TRUE(id_builder.Append(1).ok()); + ASSERT_TRUE(name_builder.Append("Alice").ok()); + ASSERT_TRUE(score_builder.Append(95.5).ok()); + ASSERT_TRUE(grade_builder.Append("A").ok()); + + std::shared_ptr id_array, name_array, score_array, grade_array; + ASSERT_TRUE(id_builder.Finish(&id_array).ok()); + ASSERT_TRUE(name_builder.Finish(&name_array).ok()); + ASSERT_TRUE(score_builder.Finish(&score_array).ok()); + ASSERT_TRUE(grade_builder.Finish(&grade_array).ok()); + + auto table = arrow::Table::Make(schema, {id_array, name_array, score_array, grade_array}); + + std::string filepath = std::string(PARQUET_TEST_DIR) + "/test_pruning.parquet"; 
+ std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(filepath)); + PARQUET_THROW_NOT_OK( + parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 1)); + + // Set up shared state with skipColumns + auto sharedState = std::make_shared(); + auto entrySchema = std::make_shared(); + entrySchema->columnNames = {"id", "name", "score", "grade"}; + entrySchema->columnTypes = {createInt32Type(), createStringType(), + createDoubleType(), createStringType()}; + + reader::FileSchema fileSchema; + fileSchema.paths = {filepath}; + fileSchema.format = "parquet"; + fileSchema.options = {{"batch_read", "false"}}; + + reader::ExternalSchema externalSchema; + externalSchema.entry = entrySchema; + externalSchema.file = fileSchema; + sharedState->schema = std::move(externalSchema); + + // Neug's column pruning: skip "name" column + sharedState->skipColumns = {"name"}; + + auto reader = createParquetReader(sharedState); + auto localState = std::make_shared(); + execution::Context ctx; + reader->read(localState, ctx); + + // Verify extension translates skipColumns to Arrow projection + // Should have 3 columns (id, score, grade - "name" is skipped) + EXPECT_EQ(ctx.col_num(), 3) + << "Extension should translate Neug's skipColumns to Arrow column projection"; + EXPECT_EQ(sharedState->columnNum(), 3) + << "Extension should update columnNum after pruning"; +} + +TEST_F(ParquetTest, TestIntegration_FilterPushdown) { + // Create Parquet file with test data + auto schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("score", arrow::float64()) + }); + + arrow::Int32Builder id_builder; + arrow::DoubleBuilder score_builder; + + std::vector> test_data = { + {1, 95.5}, + {2, 87.0}, + {3, 92.5}, + {4, 78.0}, + {5, 98.0} + }; + + for (const auto& [id, score] : test_data) { + ASSERT_TRUE(id_builder.Append(id).ok()); + ASSERT_TRUE(score_builder.Append(score).ok()); + } + + std::shared_ptr id_array, score_array; + 
ASSERT_TRUE(id_builder.Finish(&id_array).ok()); + ASSERT_TRUE(score_builder.Finish(&score_array).ok()); + + auto table = arrow::Table::Make(schema, {id_array, score_array}); + + std::string filepath = std::string(PARQUET_TEST_DIR) + "/test_filter.parquet"; + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(filepath)); + PARQUET_THROW_NOT_OK( + parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 5)); + + // Create Neug filter expression: score > 90.0 + auto filterExpr = std::make_shared<::common::Expression>(); + + auto var_opr = filterExpr->add_operators(); + auto var = var_opr->mutable_var(); + var->mutable_tag()->set_name("score"); + + auto gt_opr = filterExpr->add_operators(); + gt_opr->set_logical(::common::Logical::GT); + + auto const_opr = filterExpr->add_operators(); + const_opr->mutable_const_()->set_f64(90.0); + + // Set up shared state with filter + auto sharedState = std::make_shared(); + auto entrySchema = std::make_shared(); + entrySchema->columnNames = {"id", "score"}; + entrySchema->columnTypes = {createInt32Type(), createDoubleType()}; + + reader::FileSchema fileSchema; + fileSchema.paths = {filepath}; + fileSchema.format = "parquet"; + fileSchema.options = {{"batch_read", "false"}}; + + reader::ExternalSchema externalSchema; + externalSchema.entry = entrySchema; + externalSchema.file = fileSchema; + sharedState->schema = std::move(externalSchema); + sharedState->skipRows = filterExpr; // Neug's filter expression + + auto reader = createParquetReader(sharedState); + auto localState = std::make_shared(); + execution::Context ctx; + reader->read(localState, ctx); + + // Verify extension translates Neug filter to Arrow filter + EXPECT_EQ(ctx.col_num(), 2); + EXPECT_EQ(ctx.row_num(), 3) + << "Extension should translate Neug's skipRows filter to Arrow filter pushdown. 
" + << "Should filter to 3 rows with score > 90.0"; + + // Verify the filtered data + auto col1 = std::dynamic_pointer_cast(ctx.columns[1]); + ASSERT_NE(col1, nullptr); + const auto& columns = col1->GetColumns(); + ASSERT_FALSE(columns.empty()); + auto scoreArray = std::static_pointer_cast(columns[0]); + + // All scores should be > 90.0 + for (int64_t i = 0; i < scoreArray->length(); ++i) { + EXPECT_GT(scoreArray->Value(i), 90.0) + << "Extension's filter translation should result in all scores > 90.0"; + } +} + +TEST_F(ParquetTest, TestIntegration_BatchReadMode) { + createSimpleParquetFile("test_batch_mode.parquet"); + + // Test with batch_read=true (streaming mode) + auto sharedState = createSharedState( + "test_batch_mode.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"batch_read", "true"}}); + + auto reader = createParquetReader(sharedState); + auto localState = std::make_shared(); + execution::Context ctx; + reader->read(localState, ctx); + + EXPECT_EQ(ctx.col_num(), 3); + // Verify extension translates batch_read option to streaming column type + auto col0 = ctx.columns[0]; + EXPECT_EQ(col0->column_type(), execution::ContextColumnType::kArrowStream) + << "Extension should use ArrowStream column type when batch_read=true"; + + // Test with batch_read=false (full read mode) + auto sharedState2 = createSharedState( + "test_batch_mode.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"batch_read", "false"}}); + + auto reader2 = createParquetReader(sharedState2); + auto localState2 = std::make_shared(); + execution::Context ctx2; + reader2->read(localState2, ctx2); + + auto col0_2 = ctx2.columns[0]; + EXPECT_EQ(col0_2->column_type(), execution::ContextColumnType::kArrowArray) + << "Extension should use ArrowArray column type when batch_read=false"; +} + +TEST_F(ParquetTest, TestIntegration_CombinedFilterAndProjection) { + // Create Parquet file + auto 
schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("name", arrow::utf8()), + arrow::field("score", arrow::float64()), + arrow::field("grade", arrow::utf8()) + }); + + arrow::Int32Builder id_builder; + arrow::StringBuilder name_builder, grade_builder; + arrow::DoubleBuilder score_builder; + + std::vector> test_data = { + {1, "Alice", 95.5, "A"}, + {2, "Bob", 87.0, "B"}, + {3, "Charlie", 92.5, "A"}, + {4, "David", 78.0, "C"} + }; + + for (const auto& [id, name, score, grade] : test_data) { + ASSERT_TRUE(id_builder.Append(id).ok()); + ASSERT_TRUE(name_builder.Append(name).ok()); + ASSERT_TRUE(score_builder.Append(score).ok()); + ASSERT_TRUE(grade_builder.Append(grade).ok()); + } + + std::shared_ptr id_array, name_array, score_array, grade_array; + ASSERT_TRUE(id_builder.Finish(&id_array).ok()); + ASSERT_TRUE(name_builder.Finish(&name_array).ok()); + ASSERT_TRUE(score_builder.Finish(&score_array).ok()); + ASSERT_TRUE(grade_builder.Finish(&grade_array).ok()); + + auto table = arrow::Table::Make(schema, {id_array, name_array, score_array, grade_array}); + + std::string filepath = std::string(PARQUET_TEST_DIR) + "/test_combined.parquet"; + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(filepath)); + PARQUET_THROW_NOT_OK( + parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 4)); + + // Create Neug filter: score > 90.0 + auto filterExpr = std::make_shared<::common::Expression>(); + auto var_opr = filterExpr->add_operators(); + var_opr->mutable_var()->mutable_tag()->set_name("score"); + auto gt_opr = filterExpr->add_operators(); + gt_opr->set_logical(::common::Logical::GT); + auto const_opr = filterExpr->add_operators(); + const_opr->mutable_const_()->set_f64(90.0); + + // Set up shared state with both filter and column pruning + auto sharedState = std::make_shared(); + auto entrySchema = std::make_shared(); + entrySchema->columnNames = {"id", "name", "score", "grade"}; + 
entrySchema->columnTypes = {createInt32Type(), createStringType(), + createDoubleType(), createStringType()}; + + reader::FileSchema fileSchema; + fileSchema.paths = {filepath}; + fileSchema.format = "parquet"; + fileSchema.options = {{"batch_read", "false"}}; + + reader::ExternalSchema externalSchema; + externalSchema.entry = entrySchema; + externalSchema.file = fileSchema; + sharedState->schema = std::move(externalSchema); + sharedState->skipColumns = {"name"}; // Prune "name" column + sharedState->skipRows = filterExpr; // Filter score > 90.0 + + auto reader = createParquetReader(sharedState); + auto localState = std::make_shared(); + execution::Context ctx; + reader->read(localState, ctx); + + // Verify extension correctly combines filter and projection + EXPECT_EQ(ctx.col_num(), 3) + << "Extension should apply column pruning (3 of 4 columns)"; + EXPECT_EQ(ctx.row_num(), 2) + << "Extension should apply filter (2 rows with score > 90.0)"; + EXPECT_EQ(sharedState->columnNum(), 3) + << "Extension should update columnNum after pruning"; +} + +// ============================================================================= +// Test Suite 4: Multi-file Handling +// Verify extension correctly handles multiple Parquet files +// ============================================================================= + +TEST_F(ParquetTest, TestMultiFile_ExplicitPaths) { + // Create multiple Parquet files + for (int fileIdx = 0; fileIdx < 3; ++fileIdx) { + auto schema = arrow::schema({arrow::field("id", arrow::int32())}); + arrow::Int32Builder builder; + + for (int i = 0; i < 10; ++i) { + ASSERT_TRUE(builder.Append(fileIdx * 10 + i).ok()); + } + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto table = arrow::Table::Make(schema, {array}); + + std::string filepath = std::string(PARQUET_TEST_DIR) + "/test_multi_" + + std::to_string(fileIdx) + ".parquet"; + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, 
arrow::io::FileOutputStream::Open(filepath)); + PARQUET_THROW_NOT_OK( + parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 10)); + } + + // Extension should handle multiple explicit file paths + auto sharedState = std::make_shared(); + auto entrySchema = std::make_shared(); + entrySchema->columnNames = {"id"}; + entrySchema->columnTypes = {createInt32Type()}; + + reader::FileSchema fileSchema; + fileSchema.paths = { + std::string(PARQUET_TEST_DIR) + "/test_multi_0.parquet", + std::string(PARQUET_TEST_DIR) + "/test_multi_1.parquet", + std::string(PARQUET_TEST_DIR) + "/test_multi_2.parquet" + }; + fileSchema.format = "parquet"; + fileSchema.options = {{"batch_read", "false"}}; + + reader::ExternalSchema externalSchema; + externalSchema.entry = entrySchema; + externalSchema.file = fileSchema; + sharedState->schema = std::move(externalSchema); + + auto reader = createParquetReader(sharedState); + auto localState = std::make_shared(); + execution::Context ctx; + reader->read(localState, ctx); + + EXPECT_EQ(ctx.col_num(), 1); + EXPECT_EQ(ctx.row_num(), 30) + << "Extension should correctly read and concatenate multiple Parquet files"; +} + +// End of Test Suites + +} // namespace test +} // namespace neug diff --git a/scripts/install_deps.sh b/scripts/install_deps.sh index 8b32b1642..b6e0a432e 100644 --- a/scripts/install_deps.sh +++ b/scripts/install_deps.sh @@ -225,14 +225,15 @@ install_arrow_from_source() { -DARROW_HDFS=OFF \ -DARROW_ORC=OFF \ -DARROW_JSON=ON \ - -DARROW_PARQUET=OFF \ + -DARROW_PARQUET=ON \ -DARROW_PLASMA=OFF \ -DARROW_PYTHON=OFF \ -DARROW_S3=OFF \ -DARROW_WITH_BZ2=OFF \ -DARROW_WITH_LZ4=OFF \ - -DARROW_WITH_SNAPPY=OFF \ - -DARROW_WITH_ZSTD=OFF \ + -DARROW_WITH_SNAPPY=ON \ + -DARROW_WITH_ZLIB=ON \ + -DARROW_WITH_ZSTD=ON \ -DARROW_WITH_BROTLI=OFF \ -DARROW_IPC=ON \ -DARROW_BUILD_BENCHMARKS=OFF \ diff --git a/tools/python_bind/example/complex_test.py b/tools/python_bind/example/complex_test.py index fbcb9e86e..e2acccb32 100644 --- 
a/tools/python_bind/example/complex_test.py +++ b/tools/python_bind/example/complex_test.py @@ -73,6 +73,9 @@ def fail(msg, err=None): JSONL_FILE = os.path.join( REPO_ROOT, "example_dataset", "tinysnb", "json", "vPerson.jsonl" ) +PARQUET_FILE = os.path.join( + REPO_ROOT, "example_dataset", "tinysnb", "parquet", "vPerson.parquet" +) def run_statement(conn, desc, statement): @@ -227,6 +230,67 @@ def _projection(rows): fail("Export LOAD result to JSONL", e) +def run_parquet_extension_suite(db_parquet, conn_parquet, db_path_parquet): + statements = [ + ("LOAD PARQUET succeeded", "LOAD PARQUET;"), + ] + + for desc, stmt in statements: + run_statement(conn_parquet, desc, stmt) + + if not os.path.isfile(PARQUET_FILE): + fail(f"Parquet file not found: {PARQUET_FILE}") + else: + + def _load_all(rows): + print(f" Parquet: loaded {len(rows)} rows from vPerson.parquet") + if rows: + print(f" First row sample: {rows[0]}") + assert len(rows) > 0, "Expected at least 1 row" + return f"LOAD FROM Parquet file returned {len(rows)} rows" + + run_query_with_handler( + conn_parquet, + "LOAD FROM Parquet file", + f'LOAD FROM "{PARQUET_FILE}" RETURN *;', + _load_all, + print_traceback=True, + ) + + def _projection(rows): + print(f" Column projection (fName, age): {len(rows)} rows") + if rows: + print(f" Sample: {rows[0]}") + assert len(rows) > 0, "Expected at least 1 row" + assert len(rows[0]) == 2, "Should return only 2 columns" + return "Parquet column projection" + + run_query_with_handler( + conn_parquet, + "Parquet column projection", + f'LOAD FROM "{PARQUET_FILE}" RETURN fName, age;', + _projection, + ) + + def _filter(rows): + print(f" WHERE age > 30: {len(rows)} rows") + for row in rows: + assert row[1] > 30, f"age {row[1]} should be > 30" + return f"Parquet WHERE filter returned {len(rows)} rows" + + run_query_with_handler( + conn_parquet, + "Parquet WHERE filter (age > 30)", + f'LOAD FROM "{PARQUET_FILE}" WHERE age > 30 RETURN fName, age;', + _filter, + ) + + 
conn_parquet.close() + db_parquet.close() + ok("Closed Parquet extension test database") + shutil.rmtree(db_path_parquet, ignore_errors=True) + + def run_json_extension_suite(db_json, conn_json, db_path_json): statements = [ ("LOAD JSON succeeded", "LOAD JSON;"), @@ -589,6 +653,32 @@ def _network_stats(): if db_json is not None and conn_json is not None: run_json_extension_suite(db_json, conn_json, db_path_json) +# ================================================================ +# 6. Extensions — Parquet Extension +# ================================================================ +section("6. Extensions — Parquet Extension (Install / Load / Query)") + +_run_ext_tests = os.environ.get("NEUG_RUN_EXTENSION_TESTS", "").strip().lower() +_run_ext_tests = _run_ext_tests in ("1", "true", "on", "yes") + +if not _run_ext_tests: + print(" (skipped: set NEUG_RUN_EXTENSION_TESTS=1 to run extension tests)") +else: + conn_parquet = None + db_path_parquet = tempfile.mkdtemp(prefix="neug_parquet_ext_") + try: + db_parquet = neug.Database(db_path_parquet) + conn_parquet = db_parquet.connect() + ok( + f"Created persistent database for Parquet extension test at {db_path_parquet}" + ) + except Exception as e: + fail("Create database for Parquet extension", e) + db_parquet = None + + if db_parquet is not None and conn_parquet is not None: + run_parquet_extension_suite(db_parquet, conn_parquet, db_path_parquet) + # ================================================================ # Summary # ================================================================ diff --git a/tools/python_bind/tests/test_load.py b/tools/python_bind/tests/test_load.py index 9e6b89b4b..465ab7026 100644 --- a/tools/python_bind/tests/test_load.py +++ b/tools/python_bind/tests/test_load.py @@ -67,6 +67,23 @@ def get_tinysnb_dataset_path(): return "/tmp/tinysnb" +def get_comprehensive_graph_path(): + """Get the path to comprehensive_graph dataset CSV files.""" + current_file = os.path.abspath(__file__) + tests_dir = 
os.path.dirname(current_file) + python_bind_dir = os.path.dirname(tests_dir) + tools_dir = os.path.dirname(python_bind_dir) + workspace_root = os.path.dirname(tools_dir) + + comprehensive_path = os.path.join( + workspace_root, "example_dataset", "comprehensive_graph" + ) + if os.path.exists(comprehensive_path): + return comprehensive_path + + return "/tmp/comprehensive_graph" + + class TestLoadFrom: """Test cases for LOAD FROM functionality with tinysnb dataset.""" @@ -957,6 +974,314 @@ def test_load_from_jsonl_with_complex_where_conditions(self): assert height > 1.0, f"height {height} should be > 1.0" assert isinstance(fname, str), "fName should be string" + @extension_test + def test_load_from_parquet_basic_return_all(self): + """Test basic LOAD FROM Parquet with RETURN *.""" + # load vertex data + parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + # Load parquet extension + self.conn.execute("load parquet") + + query = f""" + LOAD FROM "{parquet_path}" + RETURN * + """ + result = self.conn.execute(query) + + records = list(result) + # vPerson.parquet should have 8 data rows (same as CSV/JSON) + assert len(records) == 8, f"Expected 8 records, got {len(records)}" + + # Check first record structure (should have all columns) + first_record = records[0] + assert len(first_record) == 16, f"Expected 16 columns, got {len(first_record)}" + + # load edge data + parquet_path = os.path.join(self.tinysnb_path, "parquet", "eMeets.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + # Load parquet extension + self.conn.execute("load parquet") + + query = f""" + LOAD FROM "{parquet_path}" + RETURN * + """ + result = self.conn.execute(query) + + records = list(result) + # eMeets.parquet should have 7 data rows + assert len(records) == 7, f"Expected 7 records, got {len(records)}" + + # Check first 
record structure (should have all columns) + first_record = records[0] + assert len(first_record) == 5, f"Expected 5 columns, got {len(first_record)}" + + @extension_test + def test_load_from_parquet_return_specific_columns(self): + """Test LOAD FROM Parquet with column projection.""" + parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + self.conn.execute("load parquet") + + query = f""" + LOAD FROM "{parquet_path}" + RETURN fName, age + """ + result = self.conn.execute(query) + + records = list(result) + assert len(records) == 8, f"Expected 8 records, got {len(records)}" + + # Check that only specified columns are returned + first_record = records[0] + assert len(first_record) == 2, "Should return only 2 columns" + assert isinstance(first_record[0], str), "fName should be string" + assert isinstance(first_record[1], int), "age should be integer" + + @extension_test + def test_load_from_parquet_with_where(self): + """Test LOAD FROM Parquet with WHERE clause filtering (predicate pushdown).""" + parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + self.conn.execute("load parquet") + + # Test with WHERE clause (predicate pushdown) + query = f""" + LOAD FROM "{parquet_path}" + WHERE age > 30 + RETURN fName, age + """ + result = self.conn.execute(query) + + records = list(result) + assert len(records) > 0, "Should return at least one record" + + # Verify all returned records satisfy the condition + for record in records: + fname, age = record + assert age > 30, f"Age {age} should be greater than 30" + assert isinstance(fname, str), "fName should be string" + + @extension_test + def test_load_from_parquet_with_multiple_where_conditions(self): + """Test LOAD FROM Parquet with multiple WHERE conditions.""" + parquet_path = 
os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + self.conn.execute("load parquet") + + # Test with multiple conditions: age > 25 AND age < 40 AND gender = 1 + query = f""" + LOAD FROM "{parquet_path}" + WHERE age > 25 AND age < 40 AND gender = 1 + RETURN fName, age, gender, eyeSight + """ + result = self.conn.execute(query) + + records = list(result) + assert len(records) > 0, "Should return at least one record" + + # Verify all returned records satisfy all conditions + for record in records: + fname, age, gender, eye_sight = record + assert 25 < age < 40, f"Age {age} should be between 25 and 40" + assert gender == 1, f"Gender {gender} should be 1" + assert isinstance(fname, str), "fName should be string" + assert isinstance(eye_sight, (int, float)), "eyeSight should be numeric" + + @extension_test + def test_load_from_parquet_with_order_by(self): + """Test LOAD FROM Parquet with ORDER BY clause.""" + parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + self.conn.execute("load parquet") + + # Test ORDER BY + query = f""" + LOAD FROM "{parquet_path}" + RETURN fName, age + ORDER BY age ASC + """ + result = self.conn.execute(query) + + records = list(result) + assert len(records) == 8, f"Expected 8 records, got {len(records)}" + + # Verify records are ordered by age ascending + ages = [record[1] for record in records] + assert ages == sorted(ages), f"Ages should be sorted ascending: {ages}" + + @extension_test + def test_load_from_parquet_with_complex_where_conditions(self): + """Test LOAD FROM Parquet with complex WHERE conditions (age, eyeSight, height).""" + parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + 
self.conn.execute("load parquet") + + # Test with multiple conditions: age >= 30 AND eyeSight >= 5.0 AND height > 1.0 + query = f""" + LOAD FROM "{parquet_path}" + WHERE age >= 30 AND eyeSight >= 5.0 AND height > 1.0 + RETURN fName, age, eyeSight, height + """ + result = self.conn.execute(query) + + records = list(result) + # May return 0 or more records depending on data + assert len(records) >= 0, "Should execute successfully" + + # Verify all returned records satisfy all conditions + for record in records: + fname, age, eye_sight, height = record + assert age >= 30, f"Age {age} should be >= 30" + assert eye_sight >= 5.0, f"eyeSight {eye_sight} should be >= 5.0" + assert height > 1.0, f"height {height} should be > 1.0" + assert isinstance(fname, str), "fName should be string" + + def test_load_from_comprehensive_graph_csv(self): + """Test LOAD FROM CSV auto-infers typed values without explicit CAST + using example_dataset/comprehensive_graph/node_a.csv (pipe-delimited). + Verifies NeuG correctly infers: INT32, INT64, UINT32, UINT64, FLOAT, + DOUBLE, STRING, DATE, TIMESTAMP, INTERVAL from raw CSV. 
+ """ + comprehensive_path = get_comprehensive_graph_path() + p = os.path.join(comprehensive_path, "node_a.csv") + if not os.path.exists(p): + pytest.skip(f"node_a.csv not found: {p}") + + result = self.conn.execute( + f'LOAD FROM "{p}" (delim="|") ' + 'RETURN id, i32_property, i64_property, u32_property, CAST(u64_property, "UINT64") as u64_property, ' + "f32_property, f64_property, str_property, " + "date_property, datetime_property, interval_property " + "ORDER BY id LIMIT 1" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 # id: INT64 + assert rows[0][1] == -123456789 # i32_property: INT32 + assert rows[0][2] == 9223372036854775807 # i64_property: INT64_MAX + assert rows[0][3] == 4294967295 # u32_property: UINT32 + assert rows[0][4] == 18446744073709551615 # u64_property: UINT64 + assert abs(rows[0][5] - 3.1415927) < 1e-6 # f32_property: FLOAT + assert abs(rows[0][6] - 2.718281828459045) < 1e-9 # f64_property: DOUBLE + assert str(rows[0][7]) == "test_string_0" # str_property: STRING + assert str(rows[0][8]) == "2023-01-15" # date_property: DATE + assert str(rows[0][9]).startswith( + "2023-01-15 00:00:00" + ) # datetime_property: TIMESTAMP + assert ( + str(rows[0][10]) == "1year2months3days4hours5minutes6seconds" + ) # interval_property: INTERVAL + + # --- rel_a.csv --- + # Row 0: node_a.id=0, node_a.id=3, double_weight=3.141593, i32_weight=42, + # i64_weight=-1234567890123456789, datetime_weight=2023-05-17 + # First two columns share name 'node_a.id'; rename them to avoid conflict. 
+ p_rel = os.path.join(comprehensive_path, "rel_a.csv") + if not os.path.exists(p_rel): + pytest.skip(f"rel_a.csv not found: {p_rel}") + + result = self.conn.execute( + f'LOAD FROM "{p_rel}" (delim="|") ' + "RETURN f0 as src_id, f1 as dst_id, f2 as double_weight, f3 as i32_weight, " + "f4 as i64_weight, f5 as datetime_weight " + "LIMIT 1" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 + assert rows[0][1] == 3 + assert abs(rows[0][2] - 3.141593) < 1e-9 # double_weight: DOUBLE + assert rows[0][3] == 42 # i32_weight: INT32 + assert rows[0][4] == -1234567890123456789 # i64_weight: INT64 + assert str(rows[0][5]) == "2023-05-17" # datetime_weight: DATE + + @extension_test + def test_load_from_comprehensive_graph_parquet(self): + """Test LOAD FROM Parquet covers all NeuG-supported data types + using example_dataset/comprehensive_graph/parquet/. + node_a covers: INT64, INT32, UINT32, UINT64, FLOAT, DOUBLE, STRING, DATE, DATETIME. + rel_a covers: edge src/dst IDs plus DOUBLE, INT32, INT64, DATETIME edge properties. + interval_property is stored as a Parquet STRING: no standard Parquet type maps to NeuG INTERVAL. + Run example_dataset/comprehensive_graph/csv_to_parquet.py to generate parquet/ first. + """ + comprehensive_path = get_comprehensive_graph_path() + parquet_dir = os.path.join(comprehensive_path, "parquet") + if not os.path.exists(parquet_dir): + pytest.skip( + f"parquet/ dir not found: {parquet_dir}. " + "Run example_dataset/comprehensive_graph/csv_to_parquet.py to generate." 
+ ) + + self.conn.execute("load parquet") + + # --- node_a.parquet --- + # 11 rows; row 0 from node_a.csv: + # id=0, i32_property=-123456789, i64_property=9223372036854775807(INT64_MAX), + # u32_property=4294967295, u64_property=18446744073709551615, + # f32_property=3.1415927, f64_property=2.718281828459045, + # str_property=test_string_0, date_property=2023-01-15, datetime_property=2023-01-15, + # interval_property=1year2months3days4hours5minutes6seconds (stored as Parquet STRING) + p = os.path.join(parquet_dir, "node_a.parquet") + result = self.conn.execute( + f'LOAD FROM "{p}" ' + f"RETURN id, i32_property, i64_property, u32_property, u64_property, " + f"f32_property, f64_property, str_property, date_property, datetime_property, interval_property " + f"ORDER BY id LIMIT 1" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 # id: INT64 + assert rows[0][1] == -123456789 # i32_property: INT32 + assert rows[0][2] == 9223372036854775807 # i64_property: INT64_MAX + assert rows[0][3] == 4294967295 # u32_property: UINT32 + assert rows[0][4] == 18446744073709551615 # u64_property: UINT64_MAX + assert abs(rows[0][5] - 3.1415927) < 1e-6 # f32_property: FLOAT32 + assert abs(rows[0][6] - 2.718281828459045) < 1e-9 # f64_property: DOUBLE + assert str(rows[0][7]) == "test_string_0" # str_property: STRING + assert str(rows[0][8]) == "2023-01-15" # date_property: DATE + assert str(rows[0][9]).startswith( + "2023-01-15 00:00:00" + ) # datetime_property: TIMESTAMP + assert ( + str(rows[0][10]) == "1year2months3days4hours5minutes6seconds" + ) # interval_property: STRING for now + + # --- rel_a.parquet --- + # 10 rows; row 0 from rel_a.csv: + # src_id=0, dst_id=3, double_weight=3.141593, i32_weight=42, + # i64_weight=-1234567890123456789, datetime_weight=2023-05-17 + p = os.path.join(parquet_dir, "rel_a.parquet") + result = self.conn.execute( + f'LOAD FROM "{p}" ' + f"RETURN src_id, dst_id, double_weight, i32_weight, i64_weight, datetime_weight " + f"ORDER BY src_id 
LIMIT 1" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 and rows[0][1] == 3 # src_id, dst_id: INT64 + assert abs(rows[0][2] - 3.141593) < 1e-9 # double_weight: DOUBLE + assert rows[0][3] == 42 # i32_weight: INT32 + assert rows[0][4] == -1234567890123456789 # i64_weight: INT64 + assert str(rows[0][5]).startswith( + "2023-05-17 00:00:00" + ) # datetime_weight: TIMESTAMP + class TestCopyFrom: """Test cases for COPY FROM functionality with schema creation and data verification.""" @@ -1291,3 +1616,339 @@ def test_copy_from_node_reordered_all_columns(self): assert records[0][0] == 0 and records[0][2] == "Alice" and records[0][1] == 35 assert records[1][0] == 2 and records[1][2] == "Bob" and records[1][1] == 30 assert records[2][0] == 3 and records[2][2] == "Carol" and records[2][1] == 45 + + @extension_test + def test_copy_from_node_parquet_with_column_remapping(self): + parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + create_schema = """ + CREATE NODE TABLE person_parquet_remap ( + ID INT64, + age INT64, + fName STRING, + gender INT64, + eyeSight DOUBLE, + isStudent BOOLEAN, + PRIMARY KEY (ID) + ) + """ + + self.conn.execute(create_schema) + + self.conn.execute("load parquet") + + copy_query = f""" + COPY person_parquet_remap FROM ( + LOAD FROM "{parquet_path}" + RETURN ID, age, fName, gender, eyeSight, isStudent + ) + """ + self.conn.execute(copy_query) + + query = "MATCH (p:person_parquet_remap) RETURN p.ID, p.age, p.fName, p.gender, p.eyeSight ORDER BY p.ID LIMIT 3" + result = self.conn.execute(query) + + records = list(result) + + assert len(records) >= 3, "Should have loaded at least 3 persons" + # Verify first record (ID=0, Alice, age=35, eyeSight=5.0) + assert records[0][0] == 0, "First person ID should be 0" + assert records[0][1] == 35, "Alice's age should be 35" + assert records[0][2] == "Alice", "First 
person name should be Alice" + assert records[0][3] == 1, "Alice's gender should be 1" + assert records[0][4] == 5.0, "Alice's eyeSight should be 5.0" + + @extension_test + def test_copy_from_edge_parquet_with_column_remapping(self): + """Test COPY FROM for edge table with column remapping using Parquet files.""" + person_parquet = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + meets_parquet = os.path.join(self.tinysnb_path, "parquet", "eMeets.parquet") + if not os.path.exists(person_parquet) or not os.path.exists(meets_parquet): + pytest.skip("Parquet files not found") + + # Load parquet extension + self.conn.execute("load parquet") + + # Create node table schema + create_person_schema = """ + CREATE NODE TABLE person ( + ID INT64, + fName STRING, + gender INT64, + age INT64, + PRIMARY KEY (ID) + ) + """ + self.conn.execute(create_person_schema) + + # Create edge table schema + # Parquet file order: from, to, location, times, data + # Schema order: from, to, times, location, data (different order) + create_meets_schema = """ + CREATE REL TABLE meets ( + FROM person TO person, + times INT64, + location STRING, + data STRING + ) + """ + self.conn.execute(create_meets_schema) + + # Copy person nodes first + copy_person = f""" + COPY person FROM ( + LOAD FROM "{person_parquet}" + RETURN ID, fName, gender, age + ) + """ + self.conn.execute(copy_person) + + # Copy meets edges with column remapping + # Parquet: from, to, location, times, data + # Schema expects: from, to, times, location, data + copy_meets = f""" + COPY meets FROM ( + LOAD FROM "{meets_parquet}" + RETURN from, to, times, location, data + ) + """ + self.conn.execute(copy_meets) + + # Verify data with MATCH query + query = """ + MATCH (a:person)-[m:meets]->(b:person) + RETURN a.ID, b.ID, m.times, m.location + ORDER BY a.ID, b.ID + LIMIT 3 + """ + result = self.conn.execute(query) + records = list(result) + + assert len(records) > 0, "Should have loaded at least one meets relationship" + # 
Verify first relationship (0->2, Alice meets Bob) + assert records[0][0] == 0, "Source person ID should be 0" + assert records[0][1] == 2, "Target person ID should be 2" + assert records[0][2] == 5, "Times should be 5" + assert records[0][3] is not None, "Location should not be None" + + def test_copy_from_comprehensive_graph_csv(self): + """Test COPY FROM CSV using comprehensive_graph node_a.csv (node) and rel_a.csv (edge). + Covers all NeuG types: INT32, INT64, UINT32, UINT64, FLOAT, DOUBLE, + STRING, DATE, TIMESTAMP, INTERVAL for nodes; + DOUBLE, INT32, INT64, TIMESTAMP for edges. + """ + comprehensive_path = get_comprehensive_graph_path() + node_csv = os.path.join(comprehensive_path, "node_a.csv") + rel_csv = os.path.join(comprehensive_path, "rel_a.csv") + if not os.path.exists(node_csv): + pytest.skip(f"node_a.csv not found: {node_csv}") + + # --- node: CREATE + COPY --- + self.conn.execute( + """ + CREATE NODE TABLE cg_node_a ( + id INT64, + i32_property INT32, + i64_property INT64, + u32_property UINT32, + u64_property UINT64, + f32_property FLOAT, + f64_property DOUBLE, + str_property STRING, + date_property DATE, + datetime_property TIMESTAMP, + interval_property INTERVAL, + PRIMARY KEY (id) + ) + """ + ) + self.conn.execute( + f""" + COPY cg_node_a FROM ( + LOAD FROM "{node_csv}" (delim="|") + RETURN id, CAST(i32_property, 'INT32') as i32_property, + i64_property, CAST(u32_property, 'UINT32') as u32_property, CAST(u64_property, 'UINT64') as u64_property, + CAST(f32_property, 'FLOAT') as f32_property, f64_property, str_property, + date_property, datetime_property, interval_property + ) + """ + ) + + result = self.conn.execute( + "MATCH (n:cg_node_a) WHERE n.id = 0 " + "RETURN n.id, n.i32_property, n.i64_property, n.u32_property, n.u64_property, " + "n.f32_property, n.f64_property, n.str_property, n.date_property, " + "n.datetime_property, n.interval_property" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 # id: INT64 + assert 
rows[0][1] == -123456789 # i32_property: INT32 + assert rows[0][2] == 9223372036854775807 # i64_property: INT64_MAX + assert rows[0][3] == 4294967295 # u32_property: UINT32 + assert ( + abs(rows[0][4] - 1.8446744073709552e19) < 1.0 + ) # u64_property: UINT64_MAX (as float) + assert abs(rows[0][5] - 3.1415927) < 1e-6 # f32_property: FLOAT + assert abs(rows[0][6] - 2.718281828459045) < 1e-9 # f64_property: DOUBLE + assert rows[0][7] == "test_string_0" # str_property: STRING + assert str(rows[0][8]) == "2023-01-15" # date_property: DATE + assert str(rows[0][9]).startswith("2023-01-15") # datetime_property: TIMESTAMP + assert rows[0][10] is not None # interval_property: INTERVAL + + # --- edge: CREATE + COPY --- + if not os.path.exists(rel_csv): + return + self.conn.execute( + """ + CREATE REL TABLE cg_rel_a ( + FROM cg_node_a TO cg_node_a, + double_weight DOUBLE, + i32_weight INT32, + i64_weight INT64, + datetime_weight TIMESTAMP + ) + """ + ) + # rel_a.csv has duplicate 'node_a.id' column names; rename them to avoid conflict + # need some explicit casting (e.g., f3 to INT32, otherwise, it will be imported as int64) + self.conn.execute( + f""" + COPY cg_rel_a FROM ( + LOAD FROM "{rel_csv}" (delim="|") + RETURN f0 as src, f1 as dst, + f2 as double_weight, CAST(f3, 'INT32') as i32_weight, + f4 as i64_weight, CAST(f5, 'TIMESTAMP') as datetime_weight + ) + """ + ) + + result = self.conn.execute( + "MATCH (a:cg_node_a)-[r:cg_rel_a]->(b:cg_node_a) WHERE a.id = 0 " + "RETURN a.id, b.id, r.double_weight, r.i32_weight, r.i64_weight, r.datetime_weight " + "LIMIT 1" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 and rows[0][1] == 3 # src_id=0, dst_id=3 + assert abs(rows[0][2] - 3.141593) < 1e-9 # double_weight: DOUBLE + assert rows[0][3] == 42 # i32_weight: INT32 + assert rows[0][4] == -1234567890123456789 # i64_weight: INT64 + assert str(rows[0][5]).startswith( + "2023-05-17 00:00:00" + ) # datetime_weight: TIMESTAMP + + @extension_test + def 
test_copy_from_comprehensive_graph_parquet(self): + """Test COPY FROM Parquet using comprehensive_graph node_a.parquet (node) + and rel_a.parquet (edge). + interval_property is stored as STRING in Parquet (no native Parquet interval type). + Run example_dataset/comprehensive_graph/csv_to_parquet.py first. + """ + comprehensive_path = get_comprehensive_graph_path() + parquet_dir = os.path.join(comprehensive_path, "parquet") + node_parquet = os.path.join(parquet_dir, "node_a.parquet") + rel_parquet = os.path.join(parquet_dir, "rel_a.parquet") + if not (os.path.exists(node_parquet) and os.path.exists(rel_parquet)): + pytest.skip( + f"node_a.parquet/rel_a.parquet not found in {parquet_dir}. " + "Run example_dataset/comprehensive_graph/csv_to_parquet.py to generate." + ) + + self.conn.execute("load parquet") + + # --- node: CREATE + COPY --- + # interval_property is not supported yet; define it as STRING + self.conn.execute( + """ + CREATE NODE TABLE cg_node_a ( + id INT64, + i32_property INT32, + i64_property INT64, + u32_property UINT32, + u64_property UINT64, + f32_property FLOAT, + f64_property DOUBLE, + str_property STRING, + date_property DATE, + datetime_property TIMESTAMP, + interval_property STRING, + PRIMARY KEY (id) + ) + """ + ) + self.conn.execute( + f""" + COPY cg_node_a FROM ( + LOAD FROM "{node_parquet}" + RETURN id, i32_property, i64_property, u32_property, u64_property, + f32_property, f64_property, str_property, + date_property, datetime_property, interval_property + ) + """ + ) + + result = self.conn.execute( + "MATCH (n:cg_node_a) WHERE n.id = 0 " + "RETURN n.id, n.i32_property, n.i64_property, n.u32_property, n.u64_property, " + "n.f32_property, n.f64_property, n.str_property, n.date_property, " + "n.datetime_property, n.interval_property" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 # id: INT64 + assert rows[0][1] == -123456789 # i32_property: INT32 + assert rows[0][2] == 9223372036854775807 # i64_property: INT64_MAX + assert rows[0][3] == 4294967295 # u32_property: UINT32 
+ assert ( + abs(rows[0][4] - 1.8446744073709552e19) < 1.0 + ) # u64_property: UINT64_MAX (as float) + assert abs(rows[0][5] - 3.1415927) < 1e-6 # f32_property: FLOAT + assert abs(rows[0][6] - 2.718281828459045) < 1e-9 # f64_property: DOUBLE + assert rows[0][7] == "test_string_0" # str_property: STRING + assert str(rows[0][8]) == "2023-01-15" # date_property: DATE + assert str(rows[0][9]).startswith( + "2023-01-15 00:00:00" + ) # datetime_property: TIMESTAMP + assert ( + rows[0][10] == "1year2months3days4hours5minutes6seconds" + ) # interval_property: STRING + + # --- edge: CREATE + COPY --- + self.conn.execute( + """ + CREATE REL TABLE cg_rel_a ( + FROM cg_node_a TO cg_node_a, + double_weight DOUBLE, + i32_weight INT32, + i64_weight INT64, + datetime_weight TIMESTAMP + ) + """ + ) + # rel_a.parquet has src_id / dst_id (renamed from node_a.id during preprocessing) + self.conn.execute( + f""" + COPY cg_rel_a FROM ( + LOAD FROM "{rel_parquet}" + RETURN src_id, dst_id, double_weight, i32_weight, i64_weight, datetime_weight + ) + """ + ) + + result = self.conn.execute( + "MATCH (a:cg_node_a)-[r:cg_rel_a]->(b:cg_node_a) WHERE a.id = 0 " + "RETURN a.id, b.id, r.double_weight, r.i32_weight, r.i64_weight, r.datetime_weight " + "LIMIT 1" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 and rows[0][1] == 3 # src_id=0, dst_id=3 + assert abs(rows[0][2] - 3.141593) < 1e-9 # double_weight: DOUBLE + assert rows[0][3] == 42 # i32_weight: INT32 + assert rows[0][4] == -1234567890123456789 # i64_weight: INT64 + assert str(rows[0][5]).startswith( + "2023-05-17 00:00:00" + ) # datetime_weight: TIMESTAMP