From 690bc59bd7cef8c7bb844223c2436cad17473f8a Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Fri, 13 Mar 2026 15:32:40 +0800 Subject: [PATCH 01/13] support for reading parquet in extension --- CMakeLists.txt | 6 + cmake/BuildArrowAsThirdParty.cmake | 60 +- doc/source/extensions/load_parquet.md | 80 ++ .../comprehensive_graph/csv_to_parquet.py | 122 +++ .../parquet/node_a.parquet | Bin 0 -> 5075 bytes .../comprehensive_graph/parquet/rel_a.parquet | Bin 0 -> 2860 bytes .../tinysnb/parquet/eMeets.parquet | Bin 0 -> 4412 bytes .../tinysnb/parquet/vPerson.parquet | Bin 0 -> 12392 bytes extension/CMakeLists.txt | 2 + extension/parquet/CMakeLists.txt | 46 + extension/parquet/include/parquet_options.h | 99 ++ .../parquet/include/parquet_read_function.h | 108 +++ extension/parquet/src/parquet_extension.cpp | 49 + extension/parquet/src/parquet_options.cc | 144 +++ extension/parquet/tests/parquet_test.cpp | 866 ++++++++++++++++++ tools/python_bind/example/complex_test.py | 83 ++ tools/python_bind/tests/test_load.py | 671 ++++++++++++++ 17 files changed, 2333 insertions(+), 3 deletions(-) create mode 100644 doc/source/extensions/load_parquet.md create mode 100644 example_dataset/comprehensive_graph/csv_to_parquet.py create mode 100644 example_dataset/comprehensive_graph/parquet/node_a.parquet create mode 100644 example_dataset/comprehensive_graph/parquet/rel_a.parquet create mode 100644 example_dataset/tinysnb/parquet/eMeets.parquet create mode 100644 example_dataset/tinysnb/parquet/vPerson.parquet create mode 100644 extension/parquet/CMakeLists.txt create mode 100644 extension/parquet/include/parquet_options.h create mode 100644 extension/parquet/include/parquet_read_function.h create mode 100644 extension/parquet/src/parquet_extension.cpp create mode 100644 extension/parquet/src/parquet_options.cc create mode 100644 extension/parquet/tests/parquet_test.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index a8857bf8c..2a5391b49 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -593,6 +593,12 @@ if(BUILD_EXTENSIONS AND "json" IN_LIST BUILD_EXTENSIONS) message(STATUS "Arrow JSON support enabled for json extension") endif() +# Configure Arrow Parquet support if parquet extension is enabled +if(BUILD_EXTENSIONS AND "parquet" IN_LIST BUILD_EXTENSIONS) + set(ARROW_PARQUET ON CACHE BOOL "" FORCE) + message(STATUS "Arrow Parquet support enabled for parquet extension") +endif() + set(NEUG_USE_SYSTEM_ARROW OFF) if(DEFINED ENV{CI} AND "$ENV{CI}" STREQUAL "ON") set(NEUG_USE_SYSTEM_ARROW ON) diff --git a/cmake/BuildArrowAsThirdParty.cmake b/cmake/BuildArrowAsThirdParty.cmake index 3a9011d61..ea70cfa99 100644 --- a/cmake/BuildArrowAsThirdParty.cmake +++ b/cmake/BuildArrowAsThirdParty.cmake @@ -52,6 +52,9 @@ function(build_arrow_as_third_party) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=uninitialized") endif() set(CMAKE_POSITION_INDEPENDENT_CODE ON) + # Thrift (Arrow-parquet dependency) emits these warnings + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=unused-function") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=stringop-truncation") set(ARROW_BUILD_SHARED OFF CACHE BOOL "" FORCE) set(ARROW_BUILD_STATIC ON CACHE BOOL "" FORCE) @@ -72,13 +75,23 @@ function(build_arrow_as_third_party) if(NOT DEFINED ARROW_JSON) set(ARROW_JSON OFF CACHE BOOL "" FORCE) endif() - set(ARROW_PARQUET OFF CACHE BOOL "" FORCE) + # ARROW_PARQUET is set by the main CMakeLists.txt if parquet extension is enabled + if(NOT DEFINED ARROW_PARQUET) + set(ARROW_PARQUET OFF CACHE BOOL "" FORCE) + endif() + # Enable Snappy and Zlib for Parquet if needed, otherwise disable them + if(ARROW_PARQUET) + set(ARROW_WITH_SNAPPY ON CACHE BOOL "" FORCE) + set(ARROW_WITH_ZLIB ON CACHE BOOL "" FORCE) + else() + set(ARROW_WITH_SNAPPY OFF CACHE BOOL "" FORCE) + set(ARROW_WITH_ZLIB OFF CACHE BOOL "" FORCE) + endif() set(ARROW_PLASMA OFF CACHE BOOL "" FORCE) set(ARROW_PYTHON OFF CACHE BOOL "" FORCE) set(ARROW_S3 OFF CACHE BOOL "" FORCE) set(ARROW_WITH_BZ2 OFF CACHE BOOL "" FORCE) set(ARROW_WITH_LZ4 OFF CACHE BOOL "" FORCE) - set(ARROW_WITH_SNAPPY OFF CACHE BOOL "" FORCE) set(ARROW_WITH_ZSTD OFF CACHE BOOL "" FORCE) set(ARROW_WITH_BROTLI OFF CACHE BOOL "" FORCE) set(ARROW_IPC ON CACHE BOOL "" FORCE) @@ -104,7 +117,6 @@ function(build_arrow_as_third_party) # Point Arrow to use the project's RapidJSON set(RapidJSON_ROOT "${CMAKE_SOURCE_DIR}/third_party/rapidjson" CACHE PATH "" FORCE) endif() - set(ARROW_WITH_ZLIB OFF CACHE BOOL "" FORCE) set(ARROW_ENABLE_THREADING ON CACHE BOOL "" FORCE) # Save original flags and set flags to suppress warnings for Arrow build @@ -214,6 +226,33 @@ function(build_arrow_as_third_party) include_directories(${arrow_SOURCE_DIR}/cpp/src ${arrow_BINARY_DIR}/src) + # Try different possible Arrow Parquet target names + if(TARGET Arrow::parquet_static) + message(STATUS "Found Arrow::parquet_static target") + set(ARROW_PARQUET_LIB Arrow::parquet_static) + elseif(TARGET arrow_parquet_static) + message(STATUS "Found arrow_parquet_static target") + set(ARROW_PARQUET_LIB arrow_parquet_static) + elseif(TARGET parquet_static) + message(STATUS "Found parquet_static target") + set(ARROW_PARQUET_LIB parquet_static) + elseif(TARGET Arrow::parquet) + message(STATUS "Found Arrow::parquet target (using as fallback)") + set(ARROW_PARQUET_LIB Arrow::parquet) + elseif(TARGET parquet) + message(STATUS "Found parquet target (using as fallback)") + set(ARROW_PARQUET_LIB parquet) + else() + message(WARNING "Arrow parquet target not found. Parquet symbols may be unresolved.") + set(ARROW_PARQUET_LIB "") + endif() + + if(ARROW_PARQUET_LIB) + list(APPEND ARROW_LIB ${ARROW_PARQUET_LIB}) + set(ARROW_LIB ${ARROW_LIB} PARENT_SCOPE) + set(ARROW_PARQUET_LIB ${ARROW_PARQUET_LIB} PARENT_SCOPE) + endif() + # Set additional Arrow variables for compatibility set(ARROW_FOUND TRUE PARENT_SCOPE) set(ARROW_LIBRARIES ${ARROW_LIB} PARENT_SCOPE) @@ -251,6 +290,21 @@ function(build_arrow_as_third_party) FILES_MATCHING PATTERN "*.h" PATTERN "test" EXCLUDE PATTERN "testing" EXCLUDE) + + # Install Parquet headers if Parquet is enabled + if(ARROW_PARQUET) + install(DIRECTORY ${arrow_SOURCE_DIR}/cpp/src/parquet + DESTINATION include + FILES_MATCHING PATTERN "*.h" + PATTERN "test" EXCLUDE + PATTERN "testing" EXCLUDE) + + install(DIRECTORY ${arrow_BINARY_DIR}/src/parquet + DESTINATION include + FILES_MATCHING PATTERN "*.h" + PATTERN "test" EXCLUDE + PATTERN "testing" EXCLUDE) + endif() else() # list(APPEND ICEBERG_SYSTEM_DEPENDENCIES Arrow) diff --git a/doc/source/extensions/load_parquet.md b/doc/source/extensions/load_parquet.md new file mode 100644 index 000000000..8e1717a01 --- /dev/null +++ b/doc/source/extensions/load_parquet.md @@ -0,0 +1,80 @@ +# Parquet Extension + +Apache Parquet is a columnar storage format widely used in data engineering and analytics workloads. NeuG supports Parquet file import functionality through the Extension framework. After loading the Parquet Extension, users can directly load external Parquet files using the `LOAD FROM` syntax. + +## Install Extension + +```cypher +INSTALL PARQUET; +``` + +## Load Extension + +```cypher +LOAD PARQUET; +``` + +## Using Parquet Extension + +`LOAD FROM` reads Parquet files and exposes their columns for querying. Schema is automatically inferred from the Parquet file metadata by default. + +### Parquet Format Options + +The following options control how Parquet files are read: + +| Option | Type | Default | Description | +| ---------------------- | ----- | ------- | ---------------------------------------------------------------------------------------------------- | +| `use_embedded_schema` | bool | `true` | Use the schema embedded in the Parquet file metadata. Set to `false` to infer schema independently. | +| `buffered_stream` | bool | `true` | Enable buffered I/O stream for improved sequential read performance. | +| `pre_buffer` | bool | `false` | Pre-buffer column data before decoding. Recommended for high-latency filesystems such as S3. | +| `cache_decompressed` | bool | `true` | Cache decompressed column chunks to accelerate repeated reads of the same data. | +| `parquet_batch_rows` | int64 | `65536` | Number of rows per Arrow record batch when converting Parquet row groups into in-memory batches. | + +### Query Examples + +#### Basic Parquet Loading + +Load all columns from a Parquet file: + +```cypher +LOAD FROM "person.parquet" +RETURN *; +``` + +#### Specifying Batch Size + +Tune memory usage by adjusting the number of rows read per batch: + +```cypher +LOAD FROM "person.parquet" (parquet_batch_rows=8192) +RETURN *; +``` + +#### Disabling Embedded Schema + +Force schema inference instead of using the schema stored in Parquet metadata: + +```cypher +LOAD FROM "person.parquet" (use_embedded_schema=false) +RETURN *; +``` + +#### Column Projection + +Return only specific columns from Parquet data: + +```cypher +LOAD FROM "person.parquet" +RETURN fName, age; +``` + +#### Column Aliases + +Use `AS` to assign aliases to columns: + +```cypher +LOAD FROM "person.parquet" +RETURN fName AS name, age AS years; +``` + +> **Note:** All relational operations supported by `LOAD FROM` — including type conversion, WHERE filtering, aggregation, sorting, and limiting — work the same way with Parquet files. See the [LOAD FROM reference](../data_io/load_data) for the complete list of operations. diff --git a/example_dataset/comprehensive_graph/csv_to_parquet.py b/example_dataset/comprehensive_graph/csv_to_parquet.py new file mode 100644 index 000000000..7dbfa89c9 --- /dev/null +++ b/example_dataset/comprehensive_graph/csv_to_parquet.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +# Copyright 2020 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Preprocess comprehensive_graph CSV files into Parquet format. + +Outputs are written to example_dataset/comprehensive_graph/parquet/. + +Notes: + - interval_property is kept as a Parquet STRING column (pyarrow CSV inference). + NeuG's INTERVAL text format ("1year2months3days...") has no writable Parquet + native interval equivalent via pyarrow, so string is the simplest lossless form. + - All other columns (INT32/INT64/UINT32/UINT64/FLOAT/DOUBLE/STRING/DATE/DATETIME) + are preserved with explicitly typed Arrow columns. +""" + +import os +import sys + +try: + import pyarrow as pa + import pyarrow.csv as pa_csv + import pyarrow.parquet as pq +except ImportError: + print("Error: pyarrow is required. Install with: pip install pyarrow") + sys.exit(1) + +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +OUTPUT_DIR = os.path.join(SCRIPT_DIR, "parquet") + +# Edge files: all have duplicate src/dst column both named 'node_a.id'. +# Rename the first two columns to src_id / dst_id for uniqueness in Parquet. +EDGE_FILES = { + "rel_a.csv", +} + +# All CSV files to convert +CSV_FILES = [ + "node_a.csv", + "rel_a.csv", +] + + +def convert(csv_filename: str) -> None: + csv_path = os.path.join(SCRIPT_DIR, csv_filename) + parquet_filename = os.path.splitext(csv_filename)[0] + ".parquet" + parquet_path = os.path.join(OUTPUT_DIR, parquet_filename) + + read_opts = pa_csv.ReadOptions() + parse_opts = pa_csv.ParseOptions(delimiter="|") + # timestamp_parsers: let Arrow auto-detect date/datetime columns. + # Explicitly type columns where CSV auto-inference picks wrong types: + # - i32_property: inferred as int64 (pyarrow defaults all integers to int64) + # - u32/u64_property: inferred as double (values overflow int64) + # - f32_property: inferred as double (pyarrow has no float32 inference) + node_column_types = { + "i32_property": pa.int32(), + "u32_property": pa.uint32(), + "u64_property": pa.uint64(), + "f32_property": pa.float32(), + } if csv_filename in ("node_a.csv", "node_b.csv") else {} + edge_column_types = { + "i32_weight": pa.int32(), + # datetime_weight values are date-only strings ("2023-05-17"); pyarrow infers + # date32[day] by default, but DT_DATETIME requires timestamp[ms]. + "datetime_weight": pa.timestamp("ms"), + } if csv_filename in EDGE_FILES else {} + column_types = {**node_column_types, **edge_column_types} + convert_opts = pa_csv.ConvertOptions( + timestamp_parsers=["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"], + column_types=column_types, + ) + + table = pa_csv.read_csv( + csv_path, + read_options=read_opts, + parse_options=parse_opts, + convert_options=convert_opts, + ) + + # Rename duplicate src/dst columns in edge files + if csv_filename in EDGE_FILES: + old_names = table.schema.names + new_names = ["src_id" if i == 0 else "dst_id" if i == 1 else n + for i, n in enumerate(old_names)] + table = table.rename_columns(new_names) + print(f" renamed src/dst columns: {old_names[:2]} -> ['src_id', 'dst_id']") + + # interval_property is kept as-is: pyarrow CSV infers it as string + pq.write_table(table, parquet_path) + print(f" {csv_filename} -> parquet/{parquet_filename} ({table.num_rows} rows, {len(table.schema)} cols)") + + +def main(): + os.makedirs(OUTPUT_DIR, exist_ok=True) + print(f"Output directory: {OUTPUT_DIR}\n") + + for csv_file in CSV_FILES: + csv_path = os.path.join(SCRIPT_DIR, csv_file) + if not os.path.exists(csv_path): + print(f"[SKIP] {csv_file} not found") + continue + print(f"Converting {csv_file} ...") + convert(csv_file) + + print("\nDone.") + + +if __name__ == "__main__": + main() diff --git a/example_dataset/comprehensive_graph/parquet/node_a.parquet b/example_dataset/comprehensive_graph/parquet/node_a.parquet new file mode 100644 index 0000000000000000000000000000000000000000..4510e6d9fde79caae09f0018c799d1c51e824dcc GIT binary patch literal 5075 zcmcgw3v5%@89w*sIyk8_oW$3=2B!<9xeY48uRNh^e4Tf2ieu$Lf?5c+iA_l2*iK9` zb!2FxjHcZ~oz{hpQVF4yrX3BV>KGkpMeA0TF6~xUu`+c~MNCW-bx{Xx)$IR|?Zl6p zj!6qgzUTR$@BH8Y|K}k)B&Uhz_`?kEw(}aEYvC$K&@&uTT=_gU>>Ry@Fn%Krskw-(3|o+Xf}p&$g>4!$MEweXp1<|8*>_Wo@> zIcUB-@X3~SOsVG_e}2odA7mYQhj`BCRXnxr*4!V)H>7anuvg$v9=OG|4i9~BxG4S;Ja}qM#w?tG8k?T$zX0+NBQsk z+*HQFqYOlNv-p^R@tIAheoM?lKEI3&_r4&b( zfs0cy7gLI0B1e?CnLd#)#QEk&O~ z5;)+3NhVHjs^WY)nC#QH8SOM9{#YeW+@()KlhqmgxIf;Hun9>iLp~fOWVU+mk6bnH zW`5(^*z+wf4CuF-Un;xN^TxpB$zMEe9(|Kd)b@t(`A{L6_;m9MQ_ zZWRKiGq0`uW91VM=bk;7KmOncAd^f(;ILQVPzGWmt|jlKbGdq68QXa~lD!n!dB4jM zFJ!~cTNGKA4tVUD1&_VzYu7xUc}8J9zhTdgLx(@D)VOWNqMb#~zugXXzt&EMp)`-di zd6`~$BfR}}J)b#mgk*oh*1O1x$F<_D2DaX9uOw@fKePPad7~U-tXZH?$P)~04Tb`~ zzV5BY2eSz=VP?k6f>|+UC76|BR)$$QW);cKhiJ=+N2?GLnnro{Z?4qysu+9FBk|z* zEKNJfif31dhjbXt34kg1vYZp*dU7KnjdnubgRi!UkiAgeguYCuw?Tad>c^l=rY3MG zEpRv=<9eYbPipq=h0`a?-4*gAvWf)ur+MPOJP7Osuq0?%J`mVP=Y8n7v58FoYU+D% z969cHUEUAn*(vMs$DtHPH~sqwD9OyjX$PRZx;k(DW7vPA`L|`}8{~mqJ|`LxOWDx)$gu(<^U^1FIKKt86qy$k&@>4hZp&Sn;2$#H0Bb;!(x_ z>vFQ@*_XzjVq^Eq>&(^rKr7R*rz>FY?e7bDgBEw!aIo0hKNtv>^!oY+Az({`9#?;# zJD3il78DKQz9{Wg7&%i3V^XW2WoBx*ph;0jc&3dQGqSTgh>=-iRKrklIYqM=v$hch zQ^c%gYDpO*awfCl9+Jc8viB)1#$d){%%es#MYlvWNj5Zux2g=aGnpx%z?+Oz-V{^a zh@-Hg%yoC6aA6l^`}#tjz);t&#GpIHX+KF5e{-Ms(Y?5pZfh+$6YJh8Awqi4<@NM- z@mhGa6|ndcgqIvt5}r?yv4qz|R4V>LE?8lo%*}_#ZB#t|Mk~<6n3awx5k3BjAbck6 ziaRg~ufneQQ>1{{^$3m1CGNxmm3>!i(bT{!$gz**=y57cM$}ZRMxnJDonCi(Ngu}I z61`BxJM`)k|BtURaS5J3;u5_>#5;7gi%WWmic58yiFfjD5bxL#seMbY)rd;%*a{6H zI=!HhoX$<{>mw(oFuBZtsjRz7lB8z%Fm05iuw?6&Y9*;#s_LeFHk%|hVGRTuu!MH3 zgjNuHT0tDfp}Q#z!&S|Q*e=z;5Vos(a9#kjdb(6OR$)MrHtno17_eDW3(|BdHn2*Q z4XZZ)_CDt>S6`=hdu>J7)mstn+}vpNAXME>AW`lz3>ge4SmmX&=z0{NHYFz(oprUw z&dpB$_V%_NuKLCyS6$n%yBp8vdy5#w`UYw6> zNDVFbej#i zWCS=+dp1aA^L&~ppGGP`tF%^)d0ody)2>9Dg8zYf2gX4pHA_8Fel{s$Us-+JEsN<> zL2=q0fI{utu(&?;t&8Z>9MK1(72@l5^y!k*r(q;&AIV0QXrl20m^D?A_@k;W#9zc; z{-%*sf9;O(k!q+tU{Wi(3hY*o`v?*!P4OU$;@fC{96m;w&EE+*Py@fh{BUG{(V2h< z>6=8z_jd&b20fu7m)~F16$tc)^M^cvps&9#-(+5ATxZmshF8K@|41qP>E?jH^Zo-z CIJmz6 literal 0 HcmV?d00001 diff --git a/example_dataset/comprehensive_graph/parquet/rel_a.parquet b/example_dataset/comprehensive_graph/parquet/rel_a.parquet new file mode 100644 index 0000000000000000000000000000000000000000..9a2e0bab876cefb28d32f377744fa9dd52884a00 GIT binary patch literal 2860 zcmcguO-vg{6rNqyX6;~bkjGkAd!SLXL2UxY4iQwTVAlKwLmd~zz@f-s11=^QLolL$ zDTjtokfI7zL`e`r9D3-X2ZYq0h^UR4ibGWO&;wDVHngdtN~tPU6>U=7H|yQ7u;$PM zjF@>dZ{ECl-}h#p36NkJ=5PZ0{Wu?^LWC+1vN4FU@rZ@aK{wm@(>W;9Osd2Pi@4}G zc3?X;W3wpmW_3s4)d_N<;w)O&%|FS)=wNb8lXW%a2uV|{h&fle9hqH%c{thK8H-@n z>2NNx&IwjZNc{it?>Y{E5M~h!b`S_D00YkeayiJtpj=BXhfDyEhi(CgoEo4uCZJ+T(_-Ag?g^3ggFxN*)^) z-o5$N#FOVO)z^leTePm4Zhi6D=DqoazpAR6LZ8lzJ*c{xe`7wi@%QRg6CYu!7m0OV zE_t|m$Bx7|SDsX<)~i565q?3RY0a;|Yo2M4MlH}_Kb^@mXR>=*M=yd#dSDT;Rn4_6 zGI*gY9_>s;W4(PNO3K$*=UrC1%}cYnz=zT49#>`MBY<1|@IvXUF24RET>dNMEpIEG zMv!aaX*z9z^(08xLY4GsZ>G}*yd2#TIJyUy>sDaD6W<(Yq(z&ESrwl(=Jn{oJ!G7Z z8EGj`T0SHSI#0X&W(5Puo!(Av^>6PuCl397|MA%JW@G32vJF1kFE=WFSP%JK%?uh{ zy!83?E&Sf2xBs*sIo#v_DShvwZ@0ud?bn|Easv&C_Y1~@4#ohL*E|Tk=AZIy)Eb~q zQ0Q}qx8JxmcIu=aI^=bYmHsJ^?&d>YjUM;7Co2^T44wrlGMNpy(Z|fONd^MAHU8lT z44-!~RWbu&DfGu>5OXixoCe|eI(Z4iWY@+N!}GTvEt!fd{K@{y^jzzaf2Tki=|EsN z13OUDl=DcbX-2Jq4&^1RK#A0*R?0Dka z^;S88(%Ib?9SGw*sCY+Nt&*yXOwB>q3OhT$JE)bQ)4kQq30(#=irpBF7W$_PMG?RQ zFAkUJitl$J-~TFlVC89l>cqi(g3`ggZ0B6Q(;@NBA8ihVT%wRig27iqFJ~UQ@CjE+ zTY0Zc2ssO#wkASSMD8VZg!B?$ucDK|pqb8mgfvnKYfZwcovMbZhf{Rcl!8Ha(`kSX zitCz+($UgVSjfIUMX7kE7pqM%?WnJFceDi)U7>S55zo0qho{d?>boGw3NWiB%>kHF z_h%cGcpH?%TIdiT^;+NWX{oF9c7)>R!ws+t&@fd7;K1LLJ|5C8xG literal 0 HcmV?d00001 diff --git a/example_dataset/tinysnb/parquet/eMeets.parquet b/example_dataset/tinysnb/parquet/eMeets.parquet new file mode 100644 index 0000000000000000000000000000000000000000..95aa5a8158e7a0f756d64bdc03ce25be234b56cd GIT binary patch literal 4412 zcmcgwJC7UJ6&`Y5a8iH$VK@c{ff=hFT#W6=O6)$%{DxuD472B9yn8-o4Srkj^X>UQ^E1xacE2pc2G|dyaON(%$!;D7 zeG^X5=ZY`^=KcyQaCh!|Vcz$wt^MK6hmqfV583sD)LmxtAh>?j?HH^#upjudH}JhD zVnuvE1^OTgzA&tpf$1)M7zXyAv*{A+qrQdKy^P<(Z1}f6XSQ53*W520xyJ@)J(@@a zeTakuJzAy$WBVh=Bk$UO?nM5&aqE!R?ofCB=L!7R6ZvL6@_G#%TE=(>@=AwZd)U8w zCh7WZdI$7BeGp6Y(L^kjyxh1G;$!It(PWHI`0w0L#Ew(ZbS#;?-1Z!O#K$-e#JEep ze?Q1aPh#=QEze#d5lzMTOU&_8z=(1nu$bUu=Kju%mf-f&DHa@?&*zC zMO*8bGz@ROfj8dBKYWpYZyx>_;!3|Q_VL8)^!f%S((B(^pO&c~$`oUPW6$|u9W0dT zgfpMiWhwy7JN0(=Hme)9W~jYMoqpww{BtMrb{i7AjDv|?rZOuXeB)tXLt?jSt;(1A zGOU@3I(ja5^uA&C!k>noz6{?04OZcJl~%-g64By0G&oSDLCT&*y25SlxX7qm@-WzOYtO8&7J(iATxYeHR6 z^fTO|CEHa+yVIqWq-#=F?2fUJtg+bbc82KCK<)P6X-6u@$Ko+A42hn&Im1LAAAZFI zcS)u4sP?JTv4dajeS+hp^^g#94nG`U5RxRB7ReFPBH0$DWioU8Arj$?%P*dvI(wKY z3grug3<;crF9m@wJp^+DewksTmbz)w&j`s4;e6(SVCem3zM8D_<$+YF+LC#uJ<>9g zIxcoakQJ^wEx|0dB}35#ZP*fwR9mV&L7Bc%8}&+AqREL$TQ<*uMuzTwAZLYR;Z*Jk z+2RW&UmYUOS)Egm-{4PrN{#E+c}I7%KGoT8)HoewC9~R-vZ$|VHEW}rI=@11J_okv zUeptnl$=MKh^Ew17U17BF9g&LHgZF?dJ5B@$meW`-^4DWHO=Bca`YNvt=enmPNx2VA7hD+ zYcxJ9?1os!%h%_W55C)SLBCQ8`Vjm$20tvxJm<@+Y|f`RDL*!Jj-a%NWhVy!%ALXI(hE8;U~7?$2hxbdr1D9%s1vPz}4-;}+tG;l2S*$Urqr48aarUd_%dJ7I*%dKze0f*x{Gyt7>!%L! z5sdP@A#i<{@SuehzdBYxY{cv2mCjGOEBGe-E zglUa8b=fRi5}%|PXIgStx)!u6r}II-BZSAPi2W*agG=a6Fi;?KhOH78nF`Wq6T|*C z8Q4P!jUC0y-k`kl7$$6o=K`hXskEaIml<3Zoc!ayIg!Wq3&Wc1&!5mkw24kzH6d9P z%HP&4ZqZ(_i`v6Y&pY}Z)LZ?uH5V!aLQV_cpelJqt=5#HRuJn|MFW|#7f*Y z>^0c$1@F~!lhkk%JlpP?SW+;4?^<2&Tj^ig=nt)vPi4;(Qx7%0ilBy!ao!-oCv+ja z(FNg-QLgUHXmQ;3mMqE)BNZAuJc#ThN$V%{dqbVP7|bi0tWA`6m6*@FYb{Tt+GCC|2**>awEMus7?j zP5R~G8}8n=6Ce)%P;QnfZC_lSHTkMl+eI8 z2?t4Ba-16=yfA>r;WGh)cm~`-*IoUFTlCOtJkeV0nCQo}ermLP1TeSCfwfBS6Y@P+w*_+wlys5#E;N<+C@%op?o zc+KK^*+PGCf&OR=f;6f+#(-&%uaVTBtx`ic%BUaBs9cz3y(|Ymj`Nd#cno@Z9v%bm zIOpYkj`N!#*2}S8KB$X_;f?e1bN#UgiJ2iT+~T_7xuv)#*7=|PkbdOu%qb43Sg93c zv8MOv8y44(EcCk<>9<^HUO7&qX>cD{sFy9R6#X%oz5s@K7Mx{6UY_-E+;u*1-N!mE z+JQhmJM8mtzKM$iT$miUUGuQ~bsvA7=f-VI13r$t=JY`m-f@k{kkM%_JQwtGJm6u= z4s+N2{K!Q&j{x#Wr5FuPTh^Mz+Kz{d+ zz#X5bp@@~Lhxf>0qY6=a3gx=Z$(D**y{1$>Ai}XDv^(G_m9<*o+6*!6_tdMGfbI{C zUMKw2AJ1@{r>f-2np}OBj1nPgBEidZX_H9X;dZgst9rT^7K z|Jx-5=F>Z1it*1$>aBL!urrNHfgO%z0$In9lV?I%8#!+GGXaOg$uZ%ql^nD4*=NXe z9v=f#CdkY=Ty`!yPP&$%6~{3g&$L-tliaiIK)bBlwu97xv98$_N~B-W9Z;S*MC*dQ zj6lbLylrv4WuZU6O#f^e0n)3{&WThClqBddyX9XHWWotBRhfuhRUd< zXiT)C$l2$}(SotqVW4IzDk{M74d+r(%i2kYJE~OWLnUj=jK?aXj4=>oW>I05i_EQt z!rZB-qGpD{b34ZyaF6epC@}!_ldBz|98ibJs3Ou~de7>52L|C4`qSqTOn-neQ!IZ^ zQom|r`RD6Sgd56~P?p(&G=O{dyltEfLANqHVTNO~S^LNsIZTY&h9~D3KTtyqIp+*# z0dq3u3<6<}ylBs6ZMM;2Xgz3ikYUH3e-0SX8f(p(!Drjq0dcpp-@E~&&t^LyUC5z4 zQtsx;6{WfkbCyym;jH?R)%A&m{^A<_eXpKX$0#=i1!ky^+o1e_oX6P|4l)5h6ACh6 zKI<5r4l#b73Hz8(Xk>JnW99-354j(3jLysjfyjizOepLaB^QUoOfW$CWI{b&4;mnIU0ZX?>C{c8UE}hs%BpK z{N|e|9SJuxBv3}-_HE*re4?l*Uj(zZG6~r^|v?s=MlHPrOs_{DRe=7W2745 zeLd8vZw3oWySjmm*dr+6=IDgus+fV@Lp}%kW#WZ*!uuDskg>cv# zUSK0$|57yMjYNWr-e4pWjx72@tiW+J$h{=8iX!F6EBAA(kWe3u-zvUwku8`1s=Td zcLWi918Uih!%(ty0@N$jR~R)B7**_7AK zZw~hs3Ax@J5xV1Ky~%PpCnhJpbe`q`27UJX`_tcdKh19XDfY;xyhlBYE#j$c&CYQ1 z^8cc`KhbKMKI4j$KIP(#KBdBkKJ@~F{y$$xUB6*mPTKDJkLTg=Y=UtQ?u(i%R^6XF zUz#ey@#Xy&a%Ht%(yCL}Jv%Q=?Ewv*rYfSASEd-xlm>ehAZT?}g|})=tkmF@#n0t^ zcnsl31wC>PvhbqodjTs~nreHs?85F$exaA9yY)h?v5(A6)wN=-EbAWBOZ$yBqb%K1 zay8^xNvVl&1|)*`OH+yl;zccw+HP$|y`iZGUC#7+F%@4#?R&gv()zq8YPFyrqtN3+ zo809B0_Xt)o;1Un!sp&bJ8g2GsV58AU`;=YHk#|u`|P!2(4IWB_F=QpUJt9!q*N}e zMlf5{$6?tzcQ&)9pGYIliJ61y&}gYi>@)H-Ft$!?jkdZueFk^cvS^x=U5v*;**-co zTJ2%=nSILi>^^-o8t&(wtnI$@I-_|Lfs^&1>u}6yy^ndamfNRIM!P+%leKDcd}FkK zjL^w?q92kNjd$oLYfnF3?l;6rrF@7?A1IY79D*Y%$6sTwaZ|7ewr1ikCgSl^4+!_LkVxD~UDvnu z?$hI~Ca5PJ`*?4d6xQ;w817i6#AMe(gY7PSzytcJWRzA}NiPiU_jt zd*yswTPa8-S&bJD^YK!kAf>jDOkGahudXf##hfou$Y~ou6Aqj22f2lKFus_t#urwO zA9WeWW?X*!&N!O)ziFgJCknPtZ%0{70F6kvMMbgdzt-A>i%AuH?bRwg749z zv@a3J#gI>Mc%BOEgZ{nD-8ixhK5~ajGR!LJ2-X`fg^OY-c~4%z{8}cRD1Z)j`!4vq zTrFl&fqN34*q2H)&RejUqbQj)_9bnDU(@*}{;G9(ahuh#=Ttcsh5kzv+88afRpfVm> z+de&3DV1cmQ;C8cTWaUT?%l&Yar{a+sYgjZ+z|O>L+|hY+!u;sdMz#aRw|iv9P2%OpQ2iG zESdCMn|Bs{rYfoArkIk;&9Q6ZPco26uI*>|f<9i%^1GC&N_>=EyVuNLV}6&USh(d@LYeqe+HSe!tWl& zjZbdETHVN`*EUdY!l!mzZh{<_)7I22s8b+MkG5jTMiM@fzRsVO-&(E|-9z=tG3$H} z|E$Kh7)y5MJVU(%YeqNUEdNetPUMd3OZim$tXX6FR#d2y4xr}1b)6K0KQ=8z;Sa`e zJ$9(1qK#ZB?9<0&Muptkmv}#}`EGB<>$eyD_v0GWRtqbg+$zCX=kdG*HC?>1dQYg| zF0O=ec@0IkAS|+|?5?5mJ1#6heZ7I+U(b zcn*a18OHvKn&TpBPKBDV#0rb+$o{>nv$F_^r6}I7L^|>7#`oYB;c|R(57)P74+EQ~ z*@M~HHQXNFn)oauP8U84NY!%hTr9*k!Cp9z+5@*0FG8j}-gWdY!5%|i+CSLdOK^}w zte6Vdg%#BP>c;=l9y{5;t|FbnZPb2N=xd)ttS-DUl-nh>A;$`+?pF~%gyOw=)WT~K z;|19Um%i1*k0_K#r)ENLd6bCT=&oORA&<7Ah$R#-2_;dt{YCV?osv}{vIvqmQz5D0 zS~_aJFU8FJ3)_bpTE{)7BO5~Yly&&R`Mix)8Tt!RW9reCFTp{c)n#Pw);(x+bq}?N zx0b)=`N=2Ip0)wwVY?|W2spCEtycSHe)r>zhgN0}*G|YPgtnpM0i6O&9lt|;v|=Wb z>iE;I5BCb}3qd3=0GQA|hbD&RrH^F&wUVPx-X&xxR-&n$N X0t>(9hTr}_(GQV^KgU>y|6}k!@kDEn literal 0 HcmV?d00001 diff --git a/extension/CMakeLists.txt b/extension/CMakeLists.txt index d4829394c..85407aac6 100644 --- a/extension/CMakeLists.txt +++ b/extension/CMakeLists.txt @@ -124,5 +124,7 @@ endfunction() # Add json extension add_extension_if_enabled("json") +# Add parquet extension +add_extension_if_enabled("parquet") message(STATUS "=== Extension configuration complete ===") \ No newline at end of file diff --git a/extension/parquet/CMakeLists.txt b/extension/parquet/CMakeLists.txt new file mode 100644 index 000000000..ecd87a7ad --- /dev/null +++ b/extension/parquet/CMakeLists.txt @@ -0,0 +1,46 @@ +# Collect source files +file(GLOB PARQUET_EXTENSION_SOURCES + "${CMAKE_CURRENT_SOURCE_DIR}/src/*.cc" + "${CMAKE_CURRENT_SOURCE_DIR}/src/*.cpp") +set(PARQUET_EXTENSION_OBJECT_FILES ${PARQUET_EXTENSION_SOURCES}) + +# Use the extension build system (should be called before add_subdirectory(test)) +build_extension_lib("parquet") + +# Set include directories +target_include_directories(neug_parquet_extension PRIVATE + ${CMAKE_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${CMAKE_CURRENT_BINARY_DIR}) + +if(ARROW_INCLUDE_DIRS) + target_include_directories(neug_parquet_extension PRIVATE SYSTEM ${ARROW_INCLUDE_DIRS}) +endif() + +# Link libraries +# Core Arrow symbols are already in neug (which links arrow_static, +# arrow_dataset_static, etc.). However, the linker strips unreferenced symbols +# from static archives — both arrow::parquet::* and arrow::dataset::Parquet* symbols +# are stripped because neug itself doesn't use them. +# +# Link the Arrow OBJECT libraries directly to include these symbols: +# - arrow_dataset_objlib: provides arrow::dataset::ParquetFileFormat, +# ParquetFragmentScanOptions, and other dataset symbols used by the extension. +# +# Additionally, link ARROW_PARQUET_LIB (the Parquet static library) with +# arrow::parquet::* symbols (e.g., ParquetFileReader, ArrowReaderProperties). +# +# Using OBJECT library targets (resolved at build time) instead of file(GLOB) +# for .o files (which fails on clean builds since GLOB runs at configure time). +target_link_libraries(neug_parquet_extension PRIVATE + neug +) + +# Build test by compiling extension sources directly (no link to neug_parquet_extension) +if(BUILD_TEST) + add_extension_test(NAME parquet + TEST_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/tests/parquet_test.cpp + EXTRA_LINK_LIBS arrow_dataset_objlib ${ARROW_PARQUET_LIB} + ) +endif() + diff --git a/extension/parquet/include/parquet_options.h b/extension/parquet/include/parquet_options.h new file mode 100644 index 000000000..2949cbe7f --- /dev/null +++ b/extension/parquet/include/parquet_options.h @@ -0,0 +1,99 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include "neug/utils/reader/options.h" +#include "neug/utils/reader/reader.h" + +namespace neug { +namespace reader { + +/** + * @brief Parquet-specific parse options + * + * These options control Parquet file reading behavior: + * - use_embedded_schema: Use schema from Parquet metadata (default: true) + * - buffered_stream: Enable buffered I/O stream for better performance (default: true) + * - pre_buffer: Pre-buffer data for high-latency filesystems like S3 (default: false) + * - cache_decompressed: Cache decompressed data for repeated reads (default: true) + * - row_batch_size: Number of rows per Arrow batch when converting from Parquet (default: 65536) + * + */ +struct ParquetParseOptions { + Option use_embedded_schema = + Option::BoolOption("USE_EMBEDDED_SCHEMA", true); + Option buffered_stream = + Option::BoolOption("BUFFERED_STREAM", true); + Option pre_buffer = + Option::BoolOption("PRE_BUFFER", false); + Option cache_decompressed = + Option::BoolOption("CACHE_DECOMPRESSED", true); + Option row_batch_size = + Option::Int64Option("PARQUET_BATCH_ROWS", 65536); +}; + +/** + * @brief Parquet-specific implementation of Arrow scan options builder + * + * This class extends ArrowOptionsBuilder to provide Parquet-specific + * functionality: + * - buildFragmentOptions(): Builds ParquetFragmentScanOptions with options + * for parallel reading, dictionary encoding, etc. + * - buildFileFormat(): Builds ParquetFileFormat + */ +class ArrowParquetOptionsBuilder : public ArrowOptionsBuilder { + public: + /** + * @brief Constructs an ArrowParquetOptionsBuilder with the given shared state + * @param state The shared read state containing Parquet schema and configuration + */ + explicit ArrowParquetOptionsBuilder(std::shared_ptr state) + : ArrowOptionsBuilder(state){}; + + virtual ArrowOptions build() const override; + + protected: + /** + * @brief Builds Parquet-specific fragment scan options + * + * Creates ParquetFragmentScanOptions with: + * - Reader properties: parallel reading, dictionary encoding, etc. + * + * @return ParquetFragmentScanOptions instance + */ + std::shared_ptr buildFragmentOptions() + const; + + /** + * @brief Builds ParquetFileFormat from scan options + * + * Extracts reader properties from the ParquetFragmentScanOptions + * and configures the ParquetFileFormat. + * + * @param options The scan options containing fragment_scan_options + * @return ParquetFileFormat instance configured with reader properties + */ + std::shared_ptr buildFileFormat( + const arrow::dataset::ScanOptions& options) const; +}; + +} // namespace reader +} // namespace neug diff --git a/extension/parquet/include/parquet_read_function.h b/extension/parquet/include/parquet_read_function.h new file mode 100644 index 000000000..a0c7ed247 --- /dev/null +++ b/extension/parquet/include/parquet_read_function.h @@ -0,0 +1,108 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include "parquet_options.h" +#include "neug/compiler/function/function.h" +#include "neug/compiler/function/read_function.h" +#include "neug/execution/execute/ops/batch/batch_update_utils.h" +#include "neug/utils/reader/options.h" +#include "neug/utils/reader/reader.h" +#include "neug/utils/reader/schema.h" +#include "neug/utils/reader/sniffer.h" + +namespace neug { +namespace function { + +struct ParquetReadFunction { + static constexpr const char* name = "PARQUET_SCAN"; + + static function_set getFunctionSet() { + auto typeIDs = + std::vector{common::LogicalTypeID::STRING}; + auto readFunction = std::make_unique(name, typeIDs); + readFunction->execFunc = execFunc; + readFunction->sniffFunc = sniffFunc; + function_set functionSet; + functionSet.push_back(std::move(readFunction)); + return functionSet; + } + + static execution::Context execFunc( + std::shared_ptr state) { + // Get file system from provider + LocalFileSystemProvider fsProvider; + auto fileInfo = fsProvider.provide(state->schema.file); + state->schema.file.paths = fileInfo.resolvedPaths; + + // Create Parquet-specific options builder + auto optionsBuilder = + std::make_unique(state); + + // Create Arrow reader with Parquet options + auto reader = std::make_unique( + state, std::move(optionsBuilder), fileInfo.fileSystem); + + // Execute read operation + execution::Context ctx; + auto localState = std::make_shared(); + reader->read(localState, ctx); + return ctx; + } + + static std::shared_ptr sniffFunc( + const reader::FileSchema& schema) { + auto state = std::make_shared(); + auto& externalSchema = state->schema; + + // Create table entry schema with empty column names and types, + // which need to be inferred from Parquet metadata + externalSchema.entry = std::make_shared(); + externalSchema.file = schema; + + // Resolve file paths + LocalFileSystemProvider fsProvider; + auto fileInfo = fsProvider.provide(state->schema.file); + state->schema.file.paths = fileInfo.resolvedPaths; + + // Create Parquet-specific options builder + auto optionsBuilder = + std::make_unique(state); + + // Create Arrow reader with Parquet options + auto reader = std::make_shared( + state, std::move(optionsBuilder), fileInfo.fileSystem); + + // Create sniffer to infer schema from Parquet metadata + auto sniffer = std::make_shared(reader); + auto sniffResult = sniffer->sniff(); + + if (!sniffResult) { + LOG(ERROR) << "Failed to sniff Parquet schema: " + << sniffResult.error().ToString(); + THROW_IO_EXCEPTION("Failed to sniff Parquet schema: " + + sniffResult.error().ToString()); + } + return sniffResult.value(); + } +}; + +} // namespace function +} // namespace neug diff --git a/extension/parquet/src/parquet_extension.cpp b/extension/parquet/src/parquet_extension.cpp new file mode 100644 index 000000000..5b3c2d405 --- /dev/null +++ b/extension/parquet/src/parquet_extension.cpp @@ -0,0 +1,49 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "glog/logging.h" +#include "neug/compiler/extension/extension_api.h" +#include "neug/utils/exception/exception.h" + +#include "parquet_read_function.h" + +extern "C" { + +void Init() { + LOG(INFO) << "[parquet extension] init called"; + + try { + // Register Parquet read function (based on ReadFunction pattern) + neug::extension::ExtensionAPI::registerFunction< + neug::function::ParquetReadFunction>( + neug::catalog::CatalogEntryType::TABLE_FUNCTION_ENTRY); + + neug::extension::ExtensionAPI::registerExtension( + neug::extension::ExtensionInfo{ + "parquet", "Provides functions to read Parquet files."}); + + LOG(INFO) << "[parquet extension] functions registered successfully"; + } catch (const std::exception& e) { + THROW_EXCEPTION_WITH_FILE_LINE("[parquet extension] registration failed: " + + std::string(e.what())); + } catch (...) { + THROW_EXCEPTION_WITH_FILE_LINE( + "[parquet extension] registration failed: unknown exception"); + } +} + +const char* Name() { return "PARQUET"; } + +} // extern "C" diff --git a/extension/parquet/src/parquet_options.cc b/extension/parquet/src/parquet_options.cc new file mode 100644 index 000000000..3463cb91a --- /dev/null +++ b/extension/parquet/src/parquet_options.cc @@ -0,0 +1,144 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "parquet_options.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "neug/utils/exception/exception.h" +#include "neug/utils/reader/options.h" +#include "neug/utils/reader/reader.h" +#include "neug/utils/reader/schema.h" + +namespace neug { +namespace reader { + +ArrowOptions ArrowParquetOptionsBuilder::build() const { + if (!state) { + THROW_INVALID_ARGUMENT_EXCEPTION("State is null"); + } + + auto scanOptions = std::make_shared(); + + // Build format-specific fragment scan options + auto fragment_scan_options = buildFragmentOptions(); + scanOptions->fragment_scan_options = fragment_scan_options; + if (!state->schema.entry) { + THROW_INVALID_ARGUMENT_EXCEPTION("Entry schema is null"); + } + scanOptions->dataset_schema = createSchema(*state->schema.entry); + + // Build file format using scan options + auto fileFormat = buildFileFormat(*scanOptions); + + // Create ArrowOptions with both scanOptions and fileFormat + ArrowOptions arrowOptions; + arrowOptions.scanOptions = scanOptions; + arrowOptions.fileFormat = fileFormat; + return arrowOptions; +} + +std::shared_ptr +ArrowParquetOptionsBuilder::buildFragmentOptions() const { + if (!state) { + THROW_INVALID_ARGUMENT_EXCEPTION("State is null"); + } + if (!state->schema.entry) { + THROW_INVALID_ARGUMENT_EXCEPTION("Entry schema is null"); + } + + auto fragment_scan_options = + std::make_shared(); + + const FileSchema& fileSchema = state->schema.file; + auto& options = fileSchema.options; + ParquetParseOptions parquetOpts; + ReadOptions readOpts; + + // Configure Parquet-specific reader properties + auto reader_properties = std::make_shared(); + + // Enable buffered stream if configured + if (parquetOpts.buffered_stream.get(options)) { + reader_properties->enable_buffered_stream(); + } + + // Set I/O buffer size in bytes + int64_t buffer_size = readOpts.batch_size.get(options); + reader_properties->set_buffer_size(buffer_size); + + fragment_scan_options->reader_properties = reader_properties; + + // Configure Arrow-specific reader properties + auto arrow_reader_properties = std::make_shared(); + + // Set Arrow row batch size (number of rows per batch) + int64_t row_batch_size = parquetOpts.row_batch_size.get(options); + arrow_reader_properties->set_batch_size(row_batch_size); + + // Use threads setting from general read options + arrow_reader_properties->set_use_threads(readOpts.use_threads.get(options)); + + // Configure pre-buffering for high-latency filesystems + arrow_reader_properties->set_pre_buffer(parquetOpts.pre_buffer.get(options)); + + // Configure caching of decompressed data + if (parquetOpts.cache_decompressed.get(options)) { + arrow_reader_properties->set_cache_options( + arrow::io::CacheOptions::LazyDefaults()); + } else { + arrow_reader_properties->set_cache_options( + arrow::io::CacheOptions::Defaults()); + } + + fragment_scan_options->arrow_reader_properties = arrow_reader_properties; + + return fragment_scan_options; +} + +std::shared_ptr +ArrowParquetOptionsBuilder::buildFileFormat( + const arrow::dataset::ScanOptions& options) const { + auto fileFormat = std::make_shared(); + auto fragmentOpts = options.fragment_scan_options; + if (!fragmentOpts) { + LOG(WARNING) + << "fragment_scan_options is null in ScanOptions, Parquet reader " + "will use default configuration"; + return fileFormat; + } + + auto parquetFragmentOpts = + std::dynamic_pointer_cast( + fragmentOpts); + if (!parquetFragmentOpts) { + LOG(WARNING) << "fragment_scan_options is not ParquetFragmentScanOptions, " + "reader will use default configuration"; + return fileFormat; + } + + fileFormat->default_fragment_scan_options = options.fragment_scan_options; + return fileFormat; +} + +} // namespace reader +} // namespace neug diff --git a/extension/parquet/tests/parquet_test.cpp b/extension/parquet/tests/parquet_test.cpp new file mode 100644 index 000000000..8bd41273b --- /dev/null +++ b/extension/parquet/tests/parquet_test.cpp @@ -0,0 +1,866 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "neug/compiler/common/case_insensitive_map.h" +#include "neug/execution/common/columns/arrow_context_column.h" +#include "neug/execution/common/context.h" +#include "neug/generated/proto/plan/basic_type.pb.h" +#include "neug/utils/reader/options.h" +#include "neug/utils/reader/reader.h" +#include "neug/utils/reader/schema.h" + +#include "../../extension/parquet/include/parquet_options.h" + +namespace neug { +namespace test { + +static constexpr const char* PARQUET_TEST_DIR = "/tmp/parquet_test"; + +class ParquetTest : public ::testing::Test { + public: + void SetUp() override { + if (std::filesystem::exists(PARQUET_TEST_DIR)) { + std::filesystem::remove_all(PARQUET_TEST_DIR); + } + std::filesystem::create_directories(PARQUET_TEST_DIR); + } + + void TearDown() override { + if (std::filesystem::exists(PARQUET_TEST_DIR)) { + std::filesystem::remove_all(PARQUET_TEST_DIR); + } + } + + // Helper function to create a simple Parquet file + void createSimpleParquetFile(const std::string& filename) { + // Create Arrow schema + auto schema = arrow::schema({ + arrow::field("id", arrow::int64()), + arrow::field("name", arrow::utf8()), + arrow::field("value", arrow::float64()) + }); + + // Create data + arrow::Int64Builder id_builder; + arrow::StringBuilder name_builder; + arrow::DoubleBuilder value_builder; + + ASSERT_TRUE(id_builder.Append(1).ok()); + ASSERT_TRUE(id_builder.Append(2).ok()); + ASSERT_TRUE(id_builder.Append(3).ok()); + + ASSERT_TRUE(name_builder.Append("Alice").ok()); + ASSERT_TRUE(name_builder.Append("Bob").ok()); + ASSERT_TRUE(name_builder.Append("Charlie").ok()); + + ASSERT_TRUE(value_builder.Append(10.5).ok()); + ASSERT_TRUE(value_builder.Append(20.3).ok()); + ASSERT_TRUE(value_builder.Append(30.7).ok()); + + std::shared_ptr id_array; + std::shared_ptr name_array; + std::shared_ptr value_array; + + ASSERT_TRUE(id_builder.Finish(&id_array).ok()); + ASSERT_TRUE(name_builder.Finish(&name_array).ok()); + ASSERT_TRUE(value_builder.Finish(&value_array).ok()); + + // Create table + auto table = arrow::Table::Make(schema, {id_array, name_array, value_array}); + + // Write to Parquet file + std::string filepath = std::string(PARQUET_TEST_DIR) + "/" + filename; + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(filepath)); + + PARQUET_THROW_NOT_OK( + parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 3)); + } + + // Helper function to create DataType as shared_ptr + std::shared_ptr<::common::DataType> createInt64Type() { + auto type = std::make_shared<::common::DataType>(); + type->set_primitive_type(::common::PrimitiveType::DT_SIGNED_INT64); + return type; + } + + std::shared_ptr<::common::DataType> createStringType() { + auto type = std::make_shared<::common::DataType>(); + auto strType = std::make_unique<::common::String>(); + auto varChar = std::make_unique<::common::String::VarChar>(); + strType->set_allocated_var_char(varChar.release()); + type->set_allocated_string(strType.release()); + return type; + } + + std::shared_ptr<::common::DataType> createDoubleType() { + auto type = std::make_shared<::common::DataType>(); + type->set_primitive_type(::common::PrimitiveType::DT_DOUBLE); + return type; + } + + std::shared_ptr<::common::DataType> createInt32Type() { + auto type = std::make_shared<::common::DataType>(); + type->set_primitive_type(::common::PrimitiveType::DT_SIGNED_INT32); + return type; + } + + std::shared_ptr<::common::DataType> createBoolType() { + auto type = std::make_shared<::common::DataType>(); + type->set_primitive_type(::common::PrimitiveType::DT_BOOL); + return type; + } + + // Helper function to create ReadSharedState + std::shared_ptr createSharedState( + const std::string& parquetFile, + const std::vector& columnNames, + const std::vector>& columnTypes, + const common::case_insensitive_map_t& options = {}) { + auto sharedState = std::make_shared(); + + auto entrySchema = std::make_shared(); + entrySchema->columnNames = columnNames; + entrySchema->columnTypes = columnTypes; + + // Create FileSchema + reader::FileSchema fileSchema; + fileSchema.paths = {std::string(PARQUET_TEST_DIR) + "/" + parquetFile}; + fileSchema.format = "parquet"; + fileSchema.options = options; + + // Create ExternalSchema + reader::ExternalSchema externalSchema; + externalSchema.entry = entrySchema; + externalSchema.file = fileSchema; + + sharedState->schema = std::move(externalSchema); + + return sharedState; + } + + std::shared_ptr createParquetReader( + const std::shared_ptr& sharedState) { + auto fileSystem = std::make_shared(); + auto optionsBuilder = + std::make_unique(sharedState); + return std::make_shared( + sharedState, std::move(optionsBuilder), std::move(fileSystem)); + } +}; + +// ============================================================================= +// Test Suite 1: Options Translation Tests +// Verify that Neug options are correctly translated to Arrow Parquet configuration +// ============================================================================= + +TEST_F(ParquetTest, TestOptionsBuilder_BuildsValidParquetFragmentScanOptions) { + createSimpleParquetFile("test_options.parquet"); + + auto sharedState = createSharedState( + "test_options.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + // Verify the builder creates ParquetFragmentScanOptions (not generic FragmentScanOptions) + ASSERT_NE(options.scanOptions, nullptr); + ASSERT_NE(options.scanOptions->fragment_scan_options, nullptr); + + auto parquetFragmentOpts = std::dynamic_pointer_cast( + options.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts, nullptr) + << "Extension should create ParquetFragmentScanOptions, not generic FragmentScanOptions"; + + // Verify reader_properties and arrow_reader_properties are initialized + EXPECT_NE(parquetFragmentOpts->reader_properties, nullptr) + << "Extension should initialize reader_properties"; + EXPECT_NE(parquetFragmentOpts->arrow_reader_properties, nullptr) + << "Extension should initialize arrow_reader_properties"; +} + +TEST_F(ParquetTest, TestOptionsTranslation_BufferSize) { + createSimpleParquetFile("test_buffer.parquet"); + + // Test custom buffer_size option + const int64_t custom_buffer_size = 2048; + auto sharedState = createSharedState( + "test_buffer.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"batch_size", std::to_string(custom_buffer_size)}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + auto parquetFragmentOpts = std::dynamic_pointer_cast( + options.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts, nullptr); + ASSERT_NE(parquetFragmentOpts->reader_properties, nullptr); + + // Verify the Neug batch_size option is correctly translated to Arrow buffer_size + EXPECT_EQ(parquetFragmentOpts->reader_properties->buffer_size(), custom_buffer_size) + << "Extension should translate batch_size option to Arrow buffer_size"; +} + +TEST_F(ParquetTest, TestOptionsTranslation_ParquetBatchRows) { + createSimpleParquetFile("test_batch_rows.parquet"); + + // Test PARQUET_BATCH_ROWS option + const int64_t custom_batch_rows = 4096; + auto sharedState = createSharedState( + "test_batch_rows.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"PARQUET_BATCH_ROWS", std::to_string(custom_batch_rows)}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + auto parquetFragmentOpts = std::dynamic_pointer_cast( + options.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts, nullptr); + ASSERT_NE(parquetFragmentOpts->arrow_reader_properties, nullptr); + + // Verify PARQUET_BATCH_ROWS is translated to Arrow batch_size + EXPECT_EQ(parquetFragmentOpts->arrow_reader_properties->batch_size(), custom_batch_rows) + << "Extension should translate PARQUET_BATCH_ROWS to Arrow batch_size"; +} + +TEST_F(ParquetTest, TestOptionsTranslation_BufferedStream) { + createSimpleParquetFile("test_buffered.parquet"); + + // Test BUFFERED_STREAM=false (default is true) + auto sharedState = createSharedState( + "test_buffered.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"BUFFERED_STREAM", "false"}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + auto parquetFragmentOpts = std::dynamic_pointer_cast( + options.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts, nullptr); + ASSERT_NE(parquetFragmentOpts->reader_properties, nullptr); + + // When BUFFERED_STREAM=false, buffered stream should NOT be enabled + // Note: We verify this by checking that the option was processed + // (Arrow doesn't expose a getter for is_buffered_stream_enabled) + EXPECT_NE(parquetFragmentOpts->reader_properties, nullptr) + << "Extension should configure reader_properties based on BUFFERED_STREAM option"; +} + +TEST_F(ParquetTest, TestOptionsTranslation_PreBuffer) { + createSimpleParquetFile("test_prebuffer.parquet"); + + // Test PRE_BUFFER=true (default is false) + auto sharedState = createSharedState( + "test_prebuffer.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"PRE_BUFFER", "true"}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + auto parquetFragmentOpts = std::dynamic_pointer_cast( + options.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts, nullptr); + ASSERT_NE(parquetFragmentOpts->arrow_reader_properties, nullptr); + + // Verify PRE_BUFFER option is translated + EXPECT_TRUE(parquetFragmentOpts->arrow_reader_properties->pre_buffer()) + << "Extension should translate PRE_BUFFER=true to Arrow pre_buffer setting"; +} + +TEST_F(ParquetTest, TestOptionsTranslation_UseThreads) { + createSimpleParquetFile("test_threads.parquet"); + + // Test parallel=false (use_threads) + auto sharedState = createSharedState( + "test_threads.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"parallel", "false"}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + auto parquetFragmentOpts = std::dynamic_pointer_cast( + options.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts, nullptr); + ASSERT_NE(parquetFragmentOpts->arrow_reader_properties, nullptr); + + // Verify parallel option is translated to use_threads + EXPECT_FALSE(parquetFragmentOpts->arrow_reader_properties->use_threads()) + << "Extension should translate parallel=false to use_threads=false"; +} + +TEST_F(ParquetTest, TestOptionsTranslation_CacheOptions) { + createSimpleParquetFile("test_cache.parquet"); + + // Test CACHE_DECOMPRESSED=true (should use LazyDefaults) + auto sharedState1 = createSharedState( + "test_cache.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"CACHE_DECOMPRESSED", "true"}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder1(sharedState1); + auto options1 = optionsBuilder1.build(); + + auto parquetFragmentOpts1 = std::dynamic_pointer_cast( + options1.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts1, nullptr); + ASSERT_NE(parquetFragmentOpts1->arrow_reader_properties, nullptr); + + // Verify cache options are set (extension should configure LazyDefaults vs Defaults) + auto cache_opts1 = parquetFragmentOpts1->arrow_reader_properties->cache_options(); + EXPECT_TRUE(cache_opts1.lazy) + << "Extension should use LazyDefaults when CACHE_DECOMPRESSED=true"; + + // Test CACHE_DECOMPRESSED=false (should use Defaults) + auto sharedState2 = createSharedState( + "test_cache.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"CACHE_DECOMPRESSED", "false"}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder2(sharedState2); + auto options2 = optionsBuilder2.build(); + + auto parquetFragmentOpts2 = std::dynamic_pointer_cast( + options2.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts2, nullptr); + + auto cache_opts2 = parquetFragmentOpts2->arrow_reader_properties->cache_options(); + EXPECT_FALSE(cache_opts2.lazy) + << "Extension should use Defaults when CACHE_DECOMPRESSED=false"; +} + +TEST_F(ParquetTest, TestOptionsTranslation_DefaultValues) { + createSimpleParquetFile("test_defaults.parquet"); + + // Create state without any options - should use defaults + auto sharedState = createSharedState( + "test_defaults.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + auto parquetFragmentOpts = std::dynamic_pointer_cast( + options.scanOptions->fragment_scan_options); + ASSERT_NE(parquetFragmentOpts, nullptr); + ASSERT_NE(parquetFragmentOpts->arrow_reader_properties, nullptr); + + // Verify default values are applied + // Default PARQUET_BATCH_ROWS = 65536 + EXPECT_EQ(parquetFragmentOpts->arrow_reader_properties->batch_size(), 65536) + << "Extension should use default PARQUET_BATCH_ROWS=65536"; + + // Default PRE_BUFFER = false + EXPECT_FALSE(parquetFragmentOpts->arrow_reader_properties->pre_buffer()) + << "Extension should use default PRE_BUFFER=false"; + + // Default parallel/use_threads = true + EXPECT_TRUE(parquetFragmentOpts->arrow_reader_properties->use_threads()) + << "Extension should use default parallel=true"; +} + +TEST_F(ParquetTest, TestFileFormatConfiguration_SharesFragmentOptions) { + createSimpleParquetFile("test_format.parquet"); + + auto sharedState = createSharedState( + "test_format.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"PARQUET_BATCH_ROWS", "2048"}}); + + reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); + auto options = optionsBuilder.build(); + + // Verify FileFormat is ParquetFileFormat + ASSERT_NE(options.fileFormat, nullptr); + auto parquetFileFormat = std::dynamic_pointer_cast( + options.fileFormat); + ASSERT_NE(parquetFileFormat, nullptr) + << "Extension should create ParquetFileFormat"; + + // Verify the FileFormat shares the same fragment_scan_options as ScanOptions + // This ensures consistency in configuration + EXPECT_EQ(parquetFileFormat->default_fragment_scan_options, + options.scanOptions->fragment_scan_options) + << "Extension should set ParquetFileFormat's default_fragment_scan_options to match ScanOptions"; +} + +// ============================================================================= +// Test Suite 2: Type Mapping Tests +// Verify type conversion between Neug DataType and Arrow types +// ============================================================================= + +TEST_F(ParquetTest, TestTypeMapping_StringToLargeUtf8) { + createSimpleParquetFile("test_string_type.parquet"); + + // Neug uses STRING type, Arrow Parquet may have utf8 + // Extension should convert to large_utf8 for consistency + auto sharedState = createSharedState( + "test_string_type.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"batch_read", "false"}}); + + auto reader = createParquetReader(sharedState); + auto localState = std::make_shared(); + execution::Context ctx; + reader->read(localState, ctx); + + // Verify string column is converted to large_utf8 + auto col1 = ctx.columns[1]; + ASSERT_EQ(col1->column_type(), execution::ContextColumnType::kArrowArray); + auto arrayColumn1 = std::dynamic_pointer_cast(col1); + auto arrowType1 = arrayColumn1->GetArrowType(); + + EXPECT_TRUE(arrowType1->Equals(arrow::large_utf8())) + << "Extension should convert Arrow utf8 to large_utf8 for Neug STRING type. " + << "Got: " << arrowType1->ToString(); +} + +TEST_F(ParquetTest, TestTypeMapping_PreserveNumericTypes) { + // Create Parquet file with various numeric types + auto schema = arrow::schema({ + arrow::field("int32_col", arrow::int32()), + arrow::field("int64_col", arrow::int64()), + arrow::field("double_col", arrow::float64()), + arrow::field("bool_col", arrow::boolean()) + }); + + arrow::Int32Builder int32_builder; + arrow::Int64Builder int64_builder; + arrow::DoubleBuilder double_builder; + arrow::BooleanBuilder bool_builder; + + ASSERT_TRUE(int32_builder.Append(42).ok()); + ASSERT_TRUE(int64_builder.Append(9223372036854775807LL).ok()); + ASSERT_TRUE(double_builder.Append(3.14159).ok()); + ASSERT_TRUE(bool_builder.Append(true).ok()); + + std::shared_ptr arrays[4]; + ASSERT_TRUE(int32_builder.Finish(&arrays[0]).ok()); + ASSERT_TRUE(int64_builder.Finish(&arrays[1]).ok()); + ASSERT_TRUE(double_builder.Finish(&arrays[2]).ok()); + ASSERT_TRUE(bool_builder.Finish(&arrays[3]).ok()); + + auto table = arrow::Table::Make(schema, {arrays[0], arrays[1], arrays[2], arrays[3]}); + + std::string filepath = std::string(PARQUET_TEST_DIR) + "/test_numeric_types.parquet"; + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(filepath)); + PARQUET_THROW_NOT_OK( + parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 1)); + + // Read with Neug types + auto sharedState = createSharedState( + "test_numeric_types.parquet", + {"int32_col", "int64_col", "double_col", "bool_col"}, + {createInt32Type(), createInt64Type(), createDoubleType(), createBoolType()}, + {{"batch_read", "false"}}); + + auto reader = createParquetReader(sharedState); + auto localState = std::make_shared(); + execution::Context ctx; + reader->read(localState, ctx); + + EXPECT_EQ(ctx.col_num(), 4); + EXPECT_EQ(ctx.row_num(), 1); + + // Verify types are preserved correctly + auto col0 = std::dynamic_pointer_cast(ctx.columns[0]); + EXPECT_TRUE(col0->GetArrowType()->Equals(arrow::int32())) + << "Extension should preserve int32 type mapping"; + + auto col1 = std::dynamic_pointer_cast(ctx.columns[1]); + EXPECT_TRUE(col1->GetArrowType()->Equals(arrow::int64())) + << "Extension should preserve int64 type mapping"; + + auto col2 = std::dynamic_pointer_cast(ctx.columns[2]); + EXPECT_TRUE(col2->GetArrowType()->Equals(arrow::float64())) + << "Extension should preserve double type mapping"; + + auto col3 = std::dynamic_pointer_cast(ctx.columns[3]); + EXPECT_TRUE(col3->GetArrowType()->Equals(arrow::boolean())) + << "Extension should preserve boolean type mapping"; +} + +// ============================================================================= +// Test Suite 3: Integration with Neug Query System +// Verify filter pushdown and column pruning work through the extension +// ============================================================================= + +TEST_F(ParquetTest, TestIntegration_ColumnPruning) { + // Create Parquet file with 4 columns + auto schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("name", arrow::utf8()), + arrow::field("score", arrow::float64()), + arrow::field("grade", arrow::utf8()) + }); + + arrow::Int32Builder id_builder; + arrow::StringBuilder name_builder, grade_builder; + arrow::DoubleBuilder score_builder; + + ASSERT_TRUE(id_builder.Append(1).ok()); + ASSERT_TRUE(name_builder.Append("Alice").ok()); + ASSERT_TRUE(score_builder.Append(95.5).ok()); + ASSERT_TRUE(grade_builder.Append("A").ok()); + + std::shared_ptr id_array, name_array, score_array, grade_array; + ASSERT_TRUE(id_builder.Finish(&id_array).ok()); + ASSERT_TRUE(name_builder.Finish(&name_array).ok()); + ASSERT_TRUE(score_builder.Finish(&score_array).ok()); + ASSERT_TRUE(grade_builder.Finish(&grade_array).ok()); + + auto table = arrow::Table::Make(schema, {id_array, name_array, score_array, grade_array}); + + std::string filepath = std::string(PARQUET_TEST_DIR) + "/test_pruning.parquet"; + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(filepath)); + PARQUET_THROW_NOT_OK( + parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 1)); + + // Set up shared state with skipColumns + auto sharedState = std::make_shared(); + auto entrySchema = std::make_shared(); + entrySchema->columnNames = {"id", "name", "score", "grade"}; + entrySchema->columnTypes = {createInt32Type(), createStringType(), + createDoubleType(), createStringType()}; + + reader::FileSchema fileSchema; + fileSchema.paths = {filepath}; + fileSchema.format = "parquet"; + fileSchema.options = {{"batch_read", "false"}}; + + reader::ExternalSchema externalSchema; + externalSchema.entry = entrySchema; + externalSchema.file = fileSchema; + sharedState->schema = std::move(externalSchema); + + // Neug's column pruning: skip "name" column + sharedState->skipColumns = {"name"}; + + auto reader = createParquetReader(sharedState); + auto localState = std::make_shared(); + execution::Context ctx; + reader->read(localState, ctx); + + // Verify extension translates skipColumns to Arrow projection + // Should have 3 columns (id, score, grade - "name" is skipped) + EXPECT_EQ(ctx.col_num(), 3) + << "Extension should translate Neug's skipColumns to Arrow column projection"; + EXPECT_EQ(sharedState->columnNum(), 3) + << "Extension should update columnNum after pruning"; +} + +TEST_F(ParquetTest, TestIntegration_FilterPushdown) { + // Create Parquet file with test data + auto schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("score", arrow::float64()) + }); + + arrow::Int32Builder id_builder; + arrow::DoubleBuilder score_builder; + + std::vector> test_data = { + {1, 95.5}, + {2, 87.0}, + {3, 92.5}, + {4, 78.0}, + {5, 98.0} + }; + + for (const auto& [id, score] : test_data) { + ASSERT_TRUE(id_builder.Append(id).ok()); + ASSERT_TRUE(score_builder.Append(score).ok()); + } + + std::shared_ptr id_array, score_array; + ASSERT_TRUE(id_builder.Finish(&id_array).ok()); + ASSERT_TRUE(score_builder.Finish(&score_array).ok()); + + auto table = arrow::Table::Make(schema, {id_array, score_array}); + + std::string filepath = std::string(PARQUET_TEST_DIR) + "/test_filter.parquet"; + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(filepath)); + PARQUET_THROW_NOT_OK( + parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 5)); + + // Create Neug filter expression: score > 90.0 + auto filterExpr = std::make_shared<::common::Expression>(); + + auto var_opr = filterExpr->add_operators(); + auto var = var_opr->mutable_var(); + var->mutable_tag()->set_name("score"); + + auto gt_opr = filterExpr->add_operators(); + gt_opr->set_logical(::common::Logical::GT); + + auto const_opr = filterExpr->add_operators(); + const_opr->mutable_const_()->set_f64(90.0); + + // Set up shared state with filter + auto sharedState = std::make_shared(); + auto entrySchema = std::make_shared(); + entrySchema->columnNames = {"id", "score"}; + entrySchema->columnTypes = {createInt32Type(), createDoubleType()}; + + reader::FileSchema fileSchema; + fileSchema.paths = {filepath}; + fileSchema.format = "parquet"; + fileSchema.options = {{"batch_read", "false"}}; + + reader::ExternalSchema externalSchema; + externalSchema.entry = entrySchema; + externalSchema.file = fileSchema; + sharedState->schema = std::move(externalSchema); + sharedState->skipRows = filterExpr; // Neug's filter expression + + auto reader = createParquetReader(sharedState); + auto localState = std::make_shared(); + execution::Context ctx; + reader->read(localState, ctx); + + // Verify extension translates Neug filter to Arrow filter + EXPECT_EQ(ctx.col_num(), 2); + EXPECT_EQ(ctx.row_num(), 3) + << "Extension should translate Neug's skipRows filter to Arrow filter pushdown. " + << "Should filter to 3 rows with score > 90.0"; + + // Verify the filtered data + auto col1 = std::dynamic_pointer_cast(ctx.columns[1]); + ASSERT_NE(col1, nullptr); + const auto& columns = col1->GetColumns(); + ASSERT_FALSE(columns.empty()); + auto scoreArray = std::static_pointer_cast(columns[0]); + + // All scores should be > 90.0 + for (int64_t i = 0; i < scoreArray->length(); ++i) { + EXPECT_GT(scoreArray->Value(i), 90.0) + << "Extension's filter translation should result in all scores > 90.0"; + } +} + +TEST_F(ParquetTest, TestIntegration_BatchReadMode) { + createSimpleParquetFile("test_batch_mode.parquet"); + + // Test with batch_read=true (streaming mode) + auto sharedState = createSharedState( + "test_batch_mode.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"batch_read", "true"}}); + + auto reader = createParquetReader(sharedState); + auto localState = std::make_shared(); + execution::Context ctx; + reader->read(localState, ctx); + + EXPECT_EQ(ctx.col_num(), 3); + // Verify extension translates batch_read option to streaming column type + auto col0 = ctx.columns[0]; + EXPECT_EQ(col0->column_type(), execution::ContextColumnType::kArrowStream) + << "Extension should use ArrowStream column type when batch_read=true"; + + // Test with batch_read=false (full read mode) + auto sharedState2 = createSharedState( + "test_batch_mode.parquet", + {"id", "name", "value"}, + {createInt64Type(), createStringType(), createDoubleType()}, + {{"batch_read", "false"}}); + + auto reader2 = createParquetReader(sharedState2); + auto localState2 = std::make_shared(); + execution::Context ctx2; + reader2->read(localState2, ctx2); + + auto col0_2 = ctx2.columns[0]; + EXPECT_EQ(col0_2->column_type(), execution::ContextColumnType::kArrowArray) + << "Extension should use ArrowArray column type when batch_read=false"; +} + +TEST_F(ParquetTest, TestIntegration_CombinedFilterAndProjection) { + // Create Parquet file + auto schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("name", arrow::utf8()), + arrow::field("score", arrow::float64()), + arrow::field("grade", arrow::utf8()) + }); + + arrow::Int32Builder id_builder; + arrow::StringBuilder name_builder, grade_builder; + arrow::DoubleBuilder score_builder; + + std::vector> test_data = { + {1, "Alice", 95.5, "A"}, + {2, "Bob", 87.0, "B"}, + {3, "Charlie", 92.5, "A"}, + {4, "David", 78.0, "C"} + }; + + for (const auto& [id, name, score, grade] : test_data) { + ASSERT_TRUE(id_builder.Append(id).ok()); + ASSERT_TRUE(name_builder.Append(name).ok()); + ASSERT_TRUE(score_builder.Append(score).ok()); + ASSERT_TRUE(grade_builder.Append(grade).ok()); + } + + std::shared_ptr id_array, name_array, score_array, grade_array; + ASSERT_TRUE(id_builder.Finish(&id_array).ok()); + ASSERT_TRUE(name_builder.Finish(&name_array).ok()); + ASSERT_TRUE(score_builder.Finish(&score_array).ok()); + ASSERT_TRUE(grade_builder.Finish(&grade_array).ok()); + + auto table = arrow::Table::Make(schema, {id_array, name_array, score_array, grade_array}); + + std::string filepath = std::string(PARQUET_TEST_DIR) + "/test_combined.parquet"; + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(filepath)); + PARQUET_THROW_NOT_OK( + parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 4)); + + // Create Neug filter: score > 90.0 + auto filterExpr = std::make_shared<::common::Expression>(); + auto var_opr = filterExpr->add_operators(); + var_opr->mutable_var()->mutable_tag()->set_name("score"); + auto gt_opr = filterExpr->add_operators(); + gt_opr->set_logical(::common::Logical::GT); + auto const_opr = filterExpr->add_operators(); + const_opr->mutable_const_()->set_f64(90.0); + + // Set up shared state with both filter and column pruning + auto sharedState = std::make_shared(); + auto entrySchema = std::make_shared(); + entrySchema->columnNames = {"id", "name", "score", "grade"}; + entrySchema->columnTypes = {createInt32Type(), createStringType(), + createDoubleType(), createStringType()}; + + reader::FileSchema fileSchema; + fileSchema.paths = {filepath}; + fileSchema.format = "parquet"; + fileSchema.options = {{"batch_read", "false"}}; + + reader::ExternalSchema externalSchema; + externalSchema.entry = entrySchema; + externalSchema.file = fileSchema; + sharedState->schema = std::move(externalSchema); + sharedState->skipColumns = {"name"}; // Prune "name" column + sharedState->skipRows = filterExpr; // Filter score > 90.0 + + auto reader = createParquetReader(sharedState); + auto localState = std::make_shared(); + execution::Context ctx; + reader->read(localState, ctx); + + // Verify extension correctly combines filter and projection + EXPECT_EQ(ctx.col_num(), 3) + << "Extension should apply column pruning (3 of 4 columns)"; + EXPECT_EQ(ctx.row_num(), 2) + << "Extension should apply filter (2 rows with score > 90.0)"; + EXPECT_EQ(sharedState->columnNum(), 3) + << "Extension should update columnNum after pruning"; +} + +// ============================================================================= +// Test Suite 4: Multi-file Handling +// Verify extension correctly handles multiple Parquet files +// ============================================================================= + +TEST_F(ParquetTest, TestMultiFile_ExplicitPaths) { + // Create multiple Parquet files + for (int fileIdx = 0; fileIdx < 3; ++fileIdx) { + auto schema = arrow::schema({arrow::field("id", arrow::int32())}); + arrow::Int32Builder builder; + + for (int i = 0; i < 10; ++i) { + ASSERT_TRUE(builder.Append(fileIdx * 10 + i).ok()); + } + + std::shared_ptr array; + ASSERT_TRUE(builder.Finish(&array).ok()); + auto table = arrow::Table::Make(schema, {array}); + + std::string filepath = std::string(PARQUET_TEST_DIR) + "/test_multi_" + + std::to_string(fileIdx) + ".parquet"; + std::shared_ptr outfile; + PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(filepath)); + PARQUET_THROW_NOT_OK( + parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 10)); + } + + // Extension should handle multiple explicit file paths + auto sharedState = std::make_shared(); + auto entrySchema = std::make_shared(); + entrySchema->columnNames = {"id"}; + entrySchema->columnTypes = {createInt32Type()}; + + reader::FileSchema fileSchema; + fileSchema.paths = { + std::string(PARQUET_TEST_DIR) + "/test_multi_0.parquet", + std::string(PARQUET_TEST_DIR) + "/test_multi_1.parquet", + std::string(PARQUET_TEST_DIR) + "/test_multi_2.parquet" + }; + fileSchema.format = "parquet"; + fileSchema.options = {{"batch_read", "false"}}; + + reader::ExternalSchema externalSchema; + externalSchema.entry = entrySchema; + externalSchema.file = fileSchema; + sharedState->schema = std::move(externalSchema); + + auto reader = createParquetReader(sharedState); + auto localState = std::make_shared(); + execution::Context ctx; + reader->read(localState, ctx); + + EXPECT_EQ(ctx.col_num(), 1); + EXPECT_EQ(ctx.row_num(), 30) + << "Extension should correctly read and concatenate multiple Parquet files"; +} + +// End of Test Suites + +} // namespace test +} // namespace neug diff --git a/tools/python_bind/example/complex_test.py b/tools/python_bind/example/complex_test.py index 3af0a1195..e86b3c231 100644 --- a/tools/python_bind/example/complex_test.py +++ b/tools/python_bind/example/complex_test.py @@ -72,6 +72,9 @@ def fail(msg, err=None): JSONL_FILE = os.path.join( REPO_ROOT, "example_dataset", "tinysnb", "json", "vPerson.jsonl" ) +PARQUET_FILE = os.path.join( + REPO_ROOT, "example_dataset", "tinysnb", "parquet", "vPerson.parquet" +) def run_statement(conn, desc, statement): @@ -188,6 +191,68 @@ def _projection(rows): ) +def run_parquet_extension_suite(db_parquet, conn_parquet, db_path_parquet): + statements = [ + ("INSTALL PARQUET succeeded", "INSTALL PARQUET;"), + ("LOAD PARQUET succeeded", "LOAD PARQUET;"), + ] + + for desc, stmt in statements: + run_statement(conn_parquet, desc, stmt) + + if not os.path.isfile(PARQUET_FILE): + fail(f"Parquet file not found: {PARQUET_FILE}") + else: + + def _load_all(rows): + print(f" Parquet: loaded {len(rows)} rows from vPerson.parquet") + if rows: + print(f" First row sample: {rows[0]}") + assert len(rows) > 0, "Expected at least 1 row" + return f"LOAD FROM Parquet file returned {len(rows)} rows" + + run_query_with_handler( + conn_parquet, + "LOAD FROM Parquet file", + f'LOAD FROM "{PARQUET_FILE}" RETURN *;', + _load_all, + print_traceback=True, + ) + + def _projection(rows): + print(f" Column projection (fName, age): {len(rows)} rows") + if rows: + print(f" Sample: {rows[0]}") + assert len(rows) > 0, "Expected at least 1 row" + assert len(rows[0]) == 2, "Should return only 2 columns" + return "Parquet column projection" + + run_query_with_handler( + conn_parquet, + "Parquet column projection", + f'LOAD FROM "{PARQUET_FILE}" RETURN fName, age;', + _projection, + ) + + def _filter(rows): + print(f" WHERE age > 30: {len(rows)} rows") + for row in rows: + assert row[1] > 30, f"age {row[1]} should be > 30" + return f"Parquet WHERE filter returned {len(rows)} rows" + + run_query_with_handler( + conn_parquet, + "Parquet WHERE filter (age > 30)", + f'LOAD FROM "{PARQUET_FILE}" WHERE age > 30 RETURN fName, age;', + _filter, + ) + + conn_parquet.close() + db_parquet.close() + ok("Closed Parquet extension test database") + shutil.rmtree(db_path_parquet, ignore_errors=True) + + def run_json_extension_suite(db_json, conn_json, db_path_json): statements = [ ("INSTALL JSON succeeded", "INSTALL JSON;"), @@ -545,6 +610,24 @@ def _network_stats(): if db_json is not None and conn_json is not None: run_json_extension_suite(db_json, conn_json, db_path_json) +# ================================================================ +# 6. Extensions — Parquet Extension +# ================================================================ +section("6. Extensions — Parquet Extension (Install / Load / Query)") + +conn_parquet = None +db_path_parquet = tempfile.mkdtemp(prefix="neug_parquet_ext_") +try: + db_parquet = neug.Database(db_path_parquet) + conn_parquet = db_parquet.connect() + ok(f"Created persistent database for Parquet extension test at {db_path_parquet}") +except Exception as e: + fail("Create database for Parquet extension", e) + db_parquet = None + +if db_parquet is not None and conn_parquet is not None: + run_parquet_extension_suite(db_parquet, conn_parquet, db_path_parquet) + # ================================================================ # Summary # ================================================================ diff --git a/tools/python_bind/tests/test_load.py b/tools/python_bind/tests/test_load.py index e1b0ff1af..d82ca10c1 100644 --- a/tools/python_bind/tests/test_load.py +++ b/tools/python_bind/tests/test_load.py @@ -36,6 +36,16 @@ not JSON_TESTS_ENABLED, reason="JSON tests disabled by default; set NEUG_RUN_JSON_TESTS=1 to enable.", ) +PARQUET_TESTS_ENABLED = os.environ.get("NEUG_RUN_PARQUET_TESTS", "").lower() in ( + "1", + "true", + "yes", + "on", +) +parquet_test = pytest.mark.skipif( + not PARQUET_TESTS_ENABLED, + reason="Parquet tests disabled by default; set NEUG_RUN_PARQUET_TESTS=1 to enable.", +) def get_tinysnb_dataset_path(): @@ -67,6 +77,23 @@ def get_tinysnb_dataset_path(): return "/tmp/tinysnb" +def get_comprehensive_graph_path(): + """Get the path to comprehensive_graph dataset CSV files.""" + current_file = os.path.abspath(__file__) + tests_dir = os.path.dirname(current_file) + python_bind_dir = os.path.dirname(tests_dir) + tools_dir = os.path.dirname(python_bind_dir) + workspace_root = os.path.dirname(tools_dir) + + comprehensive_path = os.path.join( + workspace_root, "example_dataset", "comprehensive_graph" + ) + if os.path.exists(comprehensive_path): + return comprehensive_path + + return "/tmp/comprehensive_graph" + + class TestLoadFrom: """Test cases for LOAD FROM functionality with tinysnb dataset.""" @@ -957,6 +984,314 @@ def test_load_from_jsonl_with_complex_where_conditions(self): assert height > 1.0, f"height {height} should be > 1.0" assert isinstance(fname, str), "fName should be string" + @parquet_test + def test_load_from_parquet_basic_return_all(self): + """Test basic LOAD FROM Parquet with RETURN *.""" + # load vertex data + parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + # Load parquet extension + self.conn.execute("load parquet") + + query = f""" + LOAD FROM "{parquet_path}" + RETURN * + """ + result = self.conn.execute(query) + + records = list(result) + # vPerson.parquet should have 8 data rows (same as CSV/JSON) + assert len(records) == 8, f"Expected 8 records, got {len(records)}" + + # Check first record structure (should have all columns) + first_record = records[0] + assert len(first_record) == 16, f"Expected 16 columns, got {len(first_record)}" + + # load edge data + parquet_path = os.path.join(self.tinysnb_path, "parquet", "eMeets.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + # Load parquet extension + self.conn.execute("load parquet") + + query = f""" + LOAD FROM "{parquet_path}" + RETURN * + """ + result = self.conn.execute(query) + + records = list(result) + # eMeets.parquet should have 7 data rows + assert len(records) == 7, f"Expected 7 records, got {len(records)}" + + # Check first record structure (should have all columns) + first_record = records[0] + assert len(first_record) == 5, f"Expected 5 columns, got {len(first_record)}" + + @parquet_test + def test_load_from_parquet_return_specific_columns(self): + """Test LOAD FROM Parquet with column projection.""" + parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + self.conn.execute("load parquet") + + query = f""" + LOAD FROM "{parquet_path}" + RETURN fName, age + """ + result = self.conn.execute(query) + + records = list(result) + assert len(records) == 8, f"Expected 8 records, got {len(records)}" + + # Check that only specified columns are returned + first_record = records[0] + assert len(first_record) == 2, "Should return only 2 columns" + assert isinstance(first_record[0], str), "fName should be string" + assert isinstance(first_record[1], int), "age should be integer" + + @parquet_test + def test_load_from_parquet_with_where(self): + """Test LOAD FROM Parquet with WHERE clause filtering (predicate pushdown).""" + parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + self.conn.execute("load parquet") + + # Test with WHERE clause (predicate pushdown) + query = f""" + LOAD FROM "{parquet_path}" + WHERE age > 30 + RETURN fName, age + """ + result = self.conn.execute(query) + + records = list(result) + assert len(records) > 0, "Should return at least one record" + + # Verify all returned records satisfy the condition + for record in records: + fname, age = record + assert age > 30, f"Age {age} should be greater than 30" + assert isinstance(fname, str), "fName should be string" + + @parquet_test + def test_load_from_parquet_with_multiple_where_conditions(self): + """Test LOAD FROM Parquet with multiple WHERE conditions.""" + parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + self.conn.execute("load parquet") + + # Test with multiple conditions: age > 25 AND age < 40 AND gender = 1 + query = f""" + LOAD FROM "{parquet_path}" + WHERE age > 25 AND age < 40 AND gender = 1 + RETURN fName, age, gender, eyeSight + """ + result = self.conn.execute(query) + + records = list(result) + assert len(records) > 0, "Should return at least one record" + + # Verify all returned records satisfy all conditions + for record in records: + fname, age, gender, eye_sight = record + assert 25 < age < 40, f"Age {age} should be between 25 and 40" + assert gender == 1, f"Gender {gender} should be 1" + assert isinstance(fname, str), "fName should be string" + assert isinstance(eye_sight, (int, float)), "eyeSight should be numeric" + + @parquet_test + def test_load_from_parquet_with_order_by(self): + """Test LOAD FROM Parquet with ORDER BY clause.""" + parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + self.conn.execute("load parquet") + + # Test ORDER BY + query = f""" + LOAD FROM "{parquet_path}" + RETURN fName, age + ORDER BY age ASC + """ + result = self.conn.execute(query) + + records = list(result) + assert len(records) == 8, f"Expected 8 records, got {len(records)}" + + # Verify records are ordered by age ascending + ages = [record[1] for record in records] + assert ages == sorted(ages), f"Ages should be sorted ascending: {ages}" + + @parquet_test + def test_load_from_parquet_with_complex_where_conditions(self): + """Test LOAD FROM Parquet with complex WHERE conditions (age, eyeSight, height).""" + parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + self.conn.execute("load parquet") + + # Test with multiple conditions: age >= 30 AND eyeSight >= 5.0 AND height > 1.0 + query = f""" + LOAD FROM "{parquet_path}" + WHERE age >= 30 AND eyeSight >= 5.0 AND height > 1.0 + RETURN fName, age, eyeSight, height + """ + result = self.conn.execute(query) + + records = list(result) + # May return 0 or more records depending on data + assert len(records) >= 0, "Should execute successfully" + + # Verify all returned records satisfy all conditions + for record in records: + fname, age, eye_sight, height = record + assert age >= 30, f"Age {age} should be >= 30" + assert eye_sight >= 5.0, f"eyeSight {eye_sight} should be >= 5.0" + assert height > 1.0, f"height {height} should be > 1.0" + assert isinstance(fname, str), "fName should be string" + + def test_load_from_comprehensive_graph_csv(self): + """Test LOAD FROM CSV auto-infers typed values without explicit CAST + using example_dataset/comprehensive_graph/node_a.csv (pipe-delimited). + Verifies NeuG correctly infers: INT32, INT64, UINT32, UINT64, FLOAT, + DOUBLE, STRING, DATE, TIMESTAMP, INTERVAL from raw CSV. + """ + comprehensive_path = get_comprehensive_graph_path() + p = os.path.join(comprehensive_path, "node_a.csv") + if not os.path.exists(p): + pytest.skip(f"node_a.csv not found: {p}") + + result = self.conn.execute( + f'LOAD FROM "{p}" (delim="|") ' + 'RETURN id, i32_property, i64_property, u32_property, CAST(u64_property, "UINT64") as u64_property, ' + "f32_property, f64_property, str_property, " + "date_property, datetime_property, interval_property " + "ORDER BY id LIMIT 1" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 # id: INT64 + assert rows[0][1] == -123456789 # i32_property: INT32 + assert rows[0][2] == 9223372036854775807 # i64_property: INT64_MAX + assert rows[0][3] == 4294967295 # u32_property: UINT32 + assert rows[0][4] == 18446744073709551615 # u64_property: UINT64 + assert abs(rows[0][5] - 3.1415927) < 1e-6 # f32_property: FLOAT + assert abs(rows[0][6] - 2.718281828459045) < 1e-9 # f64_property: DOUBLE + assert str(rows[0][7]) == "test_string_0" # str_property: STRING + assert str(rows[0][8]) == "2023-01-15" # date_property: DATE + assert str(rows[0][9]).startswith( + "2023-01-15 00:00:00" + ) # datetime_property: TIMESTAMP + assert ( + str(rows[0][10]) == "1year2months3days4hours5minutes6seconds" + ) # interval_property: INTERVAL + + # --- rel_a.csv --- + # Row 0: node_a.id=0, node_a.id=3, double_weight=3.141593, i32_weight=42, + # i64_weight=-1234567890123456789, datetime_weight=2023-05-17 + # First two columns share name 'node_a.id'; rename them to avoid conflict. + p_rel = os.path.join(comprehensive_path, "rel_a.csv") + if not os.path.exists(p_rel): + pytest.skip(f"rel_a.csv not found: {p_rel}") + + result = self.conn.execute( + f'LOAD FROM "{p_rel}" (delim="|") ' + "RETURN f0 as src_id, f1 as dst_id, f2 as double_weight, f3 as i32_weight, " + "f4 as i64_weight, f5 as datetime_weight " + "LIMIT 1" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 + assert rows[0][1] == 3 + assert abs(rows[0][2] - 3.141593) < 1e-9 # double_weight: DOUBLE + assert rows[0][3] == 42 # i32_weight: INT32 + assert rows[0][4] == -1234567890123456789 # i64_weight: INT64 + assert str(rows[0][5]) == "2023-05-17" # datetime_weight: DATE + + @parquet_test + def test_load_from_comprehensive_graph_parquet(self): + """Test LOAD FROM Parquet covers all NeuG-supported data types + using example_dataset/comprehensive_graph/parquet/. + node_a covers: INT64, INT32, UINT32, UINT64, FLOAT, DOUBLE, STRING, DATE, DATETIME. + rel_a covers: edge src/dst IDs plus DOUBLE, INT32, INT64, DATETIME edge properties. + interval_property is excluded from node files: no standard Parquet type maps to NeuG INTERVAL. + Run example_dataset/comprehensive_graph/csv_to_parquet.py to generate parquet/ first. + """ + comprehensive_path = get_comprehensive_graph_path() + parquet_dir = os.path.join(comprehensive_path, "parquet") + if not os.path.exists(parquet_dir): + pytest.skip( + f"parquet/ dir not found: {parquet_dir}. " + "Run example_dataset/comprehensive_graph/csv_to_parquet.py to generate." + ) + + self.conn.execute("load parquet") + + # --- node_a.parquet --- + # 11 rows; row 0 from node_a.csv: + # id=0, i32_property=-123456789, i64_property=9223372036854775807(INT64_MAX), + # u32_property=4294967295, u64_property=18446744073709551615, + # f32_property=3.1415927, f64_property=2.718281828459045, + # str_property=test_string_0, date_property=2023-01-15, datetime_property=2023-01-15, + # interval_property=1year2months3days4hours5minutes6seconds (stored as Parquet STRING) + p = os.path.join(parquet_dir, "node_a.parquet") + result = self.conn.execute( + f'LOAD FROM "{p}" ' + f"RETURN id, i32_property, i64_property, u32_property, u64_property, " + f"f32_property, f64_property, str_property, date_property, datetime_property, interval_property " + f"LIMIT 1" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 # id: INT64 + assert rows[0][1] == -123456789 # i32_property: INT32 + assert rows[0][2] == 9223372036854775807 # i64_property: INT64_MAX + assert rows[0][3] == 4294967295 # u32_property: UINT32 + assert rows[0][4] == 18446744073709551615 # u64_property: UINT64_MAX + assert abs(rows[0][5] - 3.1415927) < 1e-6 # f32_property: FLOAT32 + assert abs(rows[0][6] - 2.718281828459045) < 1e-9 # f64_property: DOUBLE + assert str(rows[0][7]) == "test_string_0" # str_property: STRING + assert str(rows[0][8]) == "2023-01-15" # date_property: DATE + assert str(rows[0][9]).startswith( + "2023-01-15 00:00:00" + ) # datetime_property: TIMESTAMP + assert ( + str(rows[0][10]) == "1year2months3days4hours5minutes6seconds" + ) # interval_property: STRING for now + + # --- rel_a.parquet --- + # 10 rows; row 0 from rel_a.csv: + # src_id=0, dst_id=3, double_weight=3.141593, i32_weight=42, + # i64_weight=-1234567890123456789, datetime_weight=2023-05-17 + p = os.path.join(parquet_dir, "rel_a.parquet") + result = self.conn.execute( + f'LOAD FROM "{p}" ' + f"RETURN src_id, dst_id, double_weight, i32_weight, i64_weight, datetime_weight " + f"ORDER BY src_id LIMIT 1" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 and rows[0][1] == 3 # src_id, dst_id: INT64 + assert abs(rows[0][2] - 3.141593) < 1e-9 # double_weight: DOUBLE + assert rows[0][3] == 42 # i32_weight: INT32 + assert rows[0][4] == -1234567890123456789 # i64_weight: INT64 + assert str(rows[0][5]).startswith( + "2023-05-17 00:00:00" + ) # datetime_weight: TIMESTAMP + class TestCopyFrom: """Test cases for COPY FROM functionality with schema creation and data verification.""" @@ -1291,3 +1626,339 @@ def test_copy_from_node_reordered_all_columns(self): assert records[0][0] == 0 and records[0][2] == "Alice" and records[0][1] == 35 assert records[1][0] == 2 and records[1][2] == "Bob" and records[1][1] == 30 assert records[2][0] == 3 and records[2][2] == "Carol" and records[2][1] == 45 + + @parquet_test + def test_copy_from_node_parquet_with_column_remapping(self): + parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + if not os.path.exists(parquet_path): + pytest.skip(f"Parquet file not found: {parquet_path}") + + create_schema = """ + CREATE NODE TABLE person_parquet_remap ( + ID INT64, + age INT64, + fName STRING, + gender INT64, + eyeSight DOUBLE, + isStudent BOOLEAN, + PRIMARY KEY (ID) + ) + """ + + self.conn.execute(create_schema) + + self.conn.execute("load parquet") + + copy_query = f""" + COPY person_parquet_remap FROM ( + LOAD FROM "{parquet_path}" + RETURN ID, age, fName, gender, eyeSight, isStudent + ) + """ + self.conn.execute(copy_query) + + query = "MATCH (p:person_parquet_remap) RETURN p.ID, p.age, p.fName, p.gender, p.eyeSight ORDER BY p.ID LIMIT 3" + result = self.conn.execute(query) + + records = list(result) + + assert len(records) >= 3, "Should have loaded at least 3 persons" + # Verify first record (ID=0, Alice, age=35, eyeSight=5.0) + assert records[0][0] == 0, "First person ID should be 0" + assert records[0][1] == 35, "Alice's age should be 35" + assert records[0][2] == "Alice", "First person name should be Alice" + assert records[0][3] == 1, "Alice's gender should be 1" + assert records[0][4] == 5.0, "Alice's eyeSight should be 5.0" + + @parquet_test + def test_copy_from_edge_parquet_with_column_remapping(self): + """Test COPY FROM for edge table with column remapping using Parquet files.""" + person_parquet = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") + meets_parquet = os.path.join(self.tinysnb_path, "parquet", "eMeets.parquet") + if not os.path.exists(person_parquet) or not os.path.exists(meets_parquet): + pytest.skip("Parquet files not found") + + # Load parquet extension + self.conn.execute("load parquet") + + # Create node table schema + create_person_schema = """ + CREATE NODE TABLE person ( + ID INT64, + fName STRING, + gender INT64, + age INT64, + PRIMARY KEY (ID) + ) + """ + self.conn.execute(create_person_schema) + + # Create edge table schema + # Parquet file order: from, to, location, times, data + # Schema order: from, to, times, location, data (different order) + create_meets_schema = """ + CREATE REL TABLE meets ( + FROM person TO person, + times INT64, + location STRING, + data STRING + ) + """ + self.conn.execute(create_meets_schema) + + # Copy person nodes first + copy_person = f""" + COPY person FROM ( + LOAD FROM "{person_parquet}" + RETURN ID, fName, gender, age + ) + """ + self.conn.execute(copy_person) + + # Copy meets edges with column remapping + # Parquet: from, to, location, times, data + # Schema expects: from, to, times, location, data + copy_meets = f""" + COPY meets FROM ( + LOAD FROM "{meets_parquet}" + RETURN from, to, times, location, data + ) + """ + self.conn.execute(copy_meets) + + # Verify data with MATCH query + query = """ + MATCH (a:person)-[m:meets]->(b:person) + RETURN a.ID, b.ID, m.times, m.location + ORDER BY a.ID, b.ID + LIMIT 3 + """ + result = self.conn.execute(query) + records = list(result) + + assert len(records) > 0, "Should have loaded at least one meets relationship" + # Verify first relationship (0->2, Alice meets Bob) + assert records[0][0] == 0, "Source person ID should be 0" + assert records[0][1] == 2, "Target person ID should be 2" + assert records[0][2] == 5, "Times should be 5" + assert records[0][3] is not None, "Location should not be None" + + def test_copy_from_comprehensive_graph_csv(self): + """Test COPY FROM CSV using comprehensive_graph node_a.csv (node) and rel_a.csv (edge). + Covers all NeuG types: INT32, INT64, UINT32, UINT64, FLOAT, DOUBLE, + STRING, DATE, TIMESTAMP, INTERVAL for nodes; + DOUBLE, INT32, INT64, TIMESTAMP for edges. + """ + comprehensive_path = get_comprehensive_graph_path() + node_csv = os.path.join(comprehensive_path, "node_a.csv") + rel_csv = os.path.join(comprehensive_path, "rel_a.csv") + if not os.path.exists(node_csv): + pytest.skip(f"node_a.csv not found: {node_csv}") + + # --- node: CREATE + COPY --- + self.conn.execute( + """ + CREATE NODE TABLE cg_node_a ( + id INT64, + i32_property INT32, + i64_property INT64, + u32_property UINT32, + u64_property UINT64, + f32_property FLOAT, + f64_property DOUBLE, + str_property STRING, + date_property DATE, + datetime_property TIMESTAMP, + interval_property INTERVAL, + PRIMARY KEY (id) + ) + """ + ) + self.conn.execute( + f""" + COPY cg_node_a FROM ( + LOAD FROM "{node_csv}" (delim="|") + RETURN id, CAST(i32_property, 'INT32') as i32_property, + i64_property, CAST(u32_property, 'UINT32') as u32_property, CAST(u64_property, 'UINT64') as u64_property, + CAST(f32_property, 'FLOAT') as f32_property, f64_property, str_property, + date_property, datetime_property, interval_property + ) + """ + ) + + result = self.conn.execute( + "MATCH (n:cg_node_a) WHERE n.id = 0 " + "RETURN n.id, n.i32_property, n.i64_property, n.u32_property, n.u64_property, " + "n.f32_property, n.f64_property, n.str_property, n.date_property, " + "n.datetime_property, n.interval_property" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 # id: INT64 + assert rows[0][1] == -123456789 # i32_property: INT32 + assert rows[0][2] == 9223372036854775807 # i64_property: INT64_MAX + assert rows[0][3] == 4294967295 # u32_property: UINT32 + assert ( + abs(rows[0][4] - 1.8446744073709552e19) < 1.0 + ) # u64_property: UINT64_MAX (as float) + assert abs(rows[0][5] - 3.1415927) < 1e-6 # f32_property: FLOAT + assert abs(rows[0][6] - 2.718281828459045) < 1e-9 # f64_property: DOUBLE + assert rows[0][7] == "test_string_0" # str_property: STRING + assert str(rows[0][8]) == "2023-01-15" # date_property: DATE + assert str(rows[0][9]).startswith("2023-01-15") # datetime_property: TIMESTAMP + assert rows[0][10] is not None # interval_property: INTERVAL + + # --- edge: CREATE + COPY --- + if not os.path.exists(rel_csv): + return + self.conn.execute( + """ + CREATE REL TABLE cg_rel_a ( + FROM cg_node_a TO cg_node_a, + double_weight DOUBLE, + i32_weight INT32, + i64_weight INT64, + datetime_weight TIMESTAMP + ) + """ + ) + # rel_a.csv has duplicate 'node_a.id' column names; rename them to avoid conflict + # need some explicit casting (e.g., f3 to INT32, otherwise, it will be imported as int64) + self.conn.execute( + f""" + COPY cg_rel_a FROM ( + LOAD FROM "{rel_csv}" (delim="|") + RETURN f0 as src, f1 as dst, + f2 as double_weight, CAST(f3, 'INT32') as i32_weight, + f4 as i64_weight, CAST(f5, 'TIMESTAMP') as datetime_weight + ) + """ + ) + + result = self.conn.execute( + "MATCH (a:cg_node_a)-[r:cg_rel_a]->(b:cg_node_a) WHERE a.id = 0 " + "RETURN a.id, b.id, r.double_weight, r.i32_weight, r.i64_weight, r.datetime_weight " + "LIMIT 1" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 and rows[0][1] == 3 # src_id=0, dst_id=3 + assert abs(rows[0][2] - 3.141593) < 1e-9 # double_weight: DOUBLE + assert rows[0][3] == 42 # i32_weight: INT32 + assert rows[0][4] == -1234567890123456789 # i64_weight: INT64 + assert str(rows[0][5]).startswith( + "2023-05-17 00:00:00" + ) # datetime_weight: TIMESTAMP + + @parquet_test + def test_copy_from_comprehensive_graph_parquet(self): + """Test COPY FROM Parquet using comprehensive_graph node_a.parquet (node) + and rel_a.parquet (edge). + interval_property is stored as STRING in Parquet (no native Parquet interval type). + Run example_dataset/comprehensive_graph/csv_to_parquet.py first. + """ + comprehensive_path = get_comprehensive_graph_path() + parquet_dir = os.path.join(comprehensive_path, "parquet") + node_parquet = os.path.join(parquet_dir, "node_a.parquet") + rel_parquet = os.path.join(parquet_dir, "rel_a.parquet") + if not os.path.exists(parquet_dir): + pytest.skip( + f"parquet/ not found: {parquet_dir}. " + "Run example_dataset/comprehensive_graph/csv_to_parquet.py to generate." + ) + + self.conn.execute("load parquet") + + # --- node: CREATE + COPY --- + # interval_property is not supported yet. Defined it as STRING + self.conn.execute( + """ + CREATE NODE TABLE cg_node_a ( + id INT64, + i32_property INT32, + i64_property INT64, + u32_property UINT32, + u64_property UINT64, + f32_property FLOAT, + f64_property DOUBLE, + str_property STRING, + date_property DATE, + datetime_property TIMESTAMP, + interval_property STRING, + PRIMARY KEY (id) + ) + """ + ) + self.conn.execute( + f""" + COPY cg_node_a FROM ( + LOAD FROM "{node_parquet}" + RETURN id, i32_property, i64_property, u32_property, u64_property, + f32_property, f64_property, str_property, + date_property, datetime_property, interval_property + ) + """ + ) + + result = self.conn.execute( + "MATCH (n:cg_node_a) WHERE n.id = 0 " + "RETURN n.id, n.i32_property, n.i64_property, n.u32_property, n.u64_property, " + "n.f32_property, n.f64_property, n.str_property, n.date_property, " + "n.datetime_property, n.interval_property" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 # id: INT64 + assert rows[0][1] == -123456789 # i32_property: INT32 + assert rows[0][2] == 9223372036854775807 # i64_property: INT64_MAX + assert rows[0][3] == 4294967295 # u32_property: UINT32 + assert ( + abs(rows[0][4] - 1.8446744073709552e19) < 1.0 + ) # u64_property: UINT64_MAX (as float) + assert abs(rows[0][5] - 3.1415927) < 1e-6 # f32_property: FLOAT + assert abs(rows[0][6] - 2.718281828459045) < 1e-9 # f64_property: DOUBLE + assert rows[0][7] == "test_string_0" # str_property: STRING + assert str(rows[0][8]) == "2023-01-15" # date_property: DATE + assert str(rows[0][9]).startswith( + "2023-01-15 00:00:00" + ) # datetime_property: TIMESTAMP + assert ( + rows[0][10] == "1year2months3days4hours5minutes6seconds" + ) # interval_property: STRING + + # --- edge: CREATE + COPY --- + self.conn.execute( + """ + CREATE REL TABLE cg_rel_a ( + FROM cg_node_a TO cg_node_a, + double_weight DOUBLE, + i32_weight INT32, + i64_weight INT64, + datetime_weight TIMESTAMP + ) + """ + ) + # rel_a.parquet has src_id / dst_id (renamed from node_a.id during preprocessing) + self.conn.execute( + f""" + COPY cg_rel_a FROM ( + LOAD FROM "{rel_parquet}" + RETURN src_id, dst_id, double_weight, i32_weight, i64_weight, datetime_weight + ) + """ + ) + + result = self.conn.execute( + "MATCH (a:cg_node_a)-[r:cg_rel_a]->(b:cg_node_a) WHERE a.id = 0 " + "RETURN a.id, b.id, r.double_weight, r.i32_weight, r.i64_weight, r.datetime_weight " + "LIMIT 1" + ) + rows = list(result) + assert len(rows) == 1 + assert rows[0][0] == 0 and rows[0][1] == 3 # src_id=0, dst_id=3 + assert abs(rows[0][2] - 3.141593) < 1e-9 # double_weight: DOUBLE + assert rows[0][3] == 42 # i32_weight: INT32 + assert rows[0][4] == -1234567890123456789 # i64_weight: INT64 + assert str(rows[0][5]).startswith( + "2023-05-17 00:00:00" + ) # datetime_weight: TIMESTAMP \ No newline at end of file From 5675122559e53725882a1a8ec1d15ad5cba6cbd7 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Mon, 16 Mar 2026 17:21:13 +0800 Subject: [PATCH 02/13] cmake: fix Arrow bundled ExternalProject builds under CMake 4.x --- cmake/BuildArrowAsThirdParty.cmake | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/cmake/BuildArrowAsThirdParty.cmake b/cmake/BuildArrowAsThirdParty.cmake index ea70cfa99..a31308b4f 100644 --- a/cmake/BuildArrowAsThirdParty.cmake +++ b/cmake/BuildArrowAsThirdParty.cmake @@ -128,6 +128,12 @@ function(build_arrow_as_third_party) cmake_policy(SET CMP0135 NEW) endif() + # CMake 3.30+ introduced CMP0169 which deprecates FetchContent_Populate() with declared details. + # CMake 4.x made it an error by default. Set to OLD to preserve the existing pattern. + if(POLICY CMP0169) + cmake_policy(SET CMP0169 OLD) + endif() + message(STATUS "Fetching Arrow ${ARROW_VERSION} from ${ARROW_SOURCE_URL}") fetchcontent_declare(Arrow @@ -139,6 +145,24 @@ function(build_arrow_as_third_party) FetchContent_GetProperties(Arrow) if(NOT arrow_POPULATED) FetchContent_Populate(Arrow) + # CMake 4.x removed support for cmake_minimum_required < 3.5. + # Arrow's bundled ExternalProjects (e.g. snappy) use EP_COMMON_CMAKE_ARGS + # which is built inside Arrow's ThirdpartyToolchain.cmake. We patch that + # file after fetch to append -DCMAKE_POLICY_VERSION_MINIMUM=3.5 so every + # ExternalProject sub-build can configure under CMake 4.x. + if(CMAKE_VERSION VERSION_GREATER_EQUAL "4.0") + set(_toolchain "${arrow_SOURCE_DIR}/cpp/cmake_modules/ThirdpartyToolchain.cmake") + file(READ "${_toolchain}" _toolchain_content) + string(FIND "${_toolchain_content}" "CMAKE_POLICY_VERSION_MINIMUM" _already_patched) + if(_already_patched EQUAL -1) + string(REPLACE + "# if building with a toolchain file, pass that through" + "# CMake 4.x: inject CMAKE_POLICY_VERSION_MINIMUM into all ExternalProject builds\nlist(APPEND EP_COMMON_CMAKE_ARGS \"-DCMAKE_POLICY_VERSION_MINIMUM=3.5\")\n\n# if building with a toolchain file, pass that through" + _toolchain_content "${_toolchain_content}") + file(WRITE "${_toolchain}" "${_toolchain_content}") + message(STATUS "Patched Arrow ThirdpartyToolchain.cmake for CMake 4.x compatibility") + endif() + endif() add_subdirectory(${arrow_SOURCE_DIR}/cpp ${arrow_BINARY_DIR} EXCLUDE_FROM_ALL) endif() From a8732e4902465ff494bbed4433f9bde597ff37ef Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Mon, 16 Mar 2026 17:26:06 +0800 Subject: [PATCH 03/13] minor fix --- tools/python_bind/tests/test_load.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/python_bind/tests/test_load.py b/tools/python_bind/tests/test_load.py index d82ca10c1..25a8a3809 100644 --- a/tools/python_bind/tests/test_load.py +++ b/tools/python_bind/tests/test_load.py @@ -1961,4 +1961,4 @@ def test_copy_from_comprehensive_graph_parquet(self): assert rows[0][4] == -1234567890123456789 # i64_weight: INT64 assert str(rows[0][5]).startswith( "2023-05-17 00:00:00" - ) # datetime_weight: TIMESTAMP \ No newline at end of file + ) # datetime_weight: TIMESTAMP From 1fb1d56bdb8f4430fc340d0640a04195a0965d7e Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 17 Mar 2026 10:47:04 +0800 Subject: [PATCH 04/13] add parquet tests in ci workflows --- .github/workflows/build-extensions.yml | 6 +++--- .github/workflows/neug-extension-test.yml | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build-extensions.yml b/.github/workflows/build-extensions.yml index b2504dce8..e3c723464 100644 --- a/.github/workflows/build-extensions.yml +++ b/.github/workflows/build-extensions.yml @@ -4,9 +4,9 @@ on: workflow_dispatch: inputs: extensions: - description: 'Semicolon-separated list of extensions to build (e.g., json)' + description: 'Semicolon-separated list of extensions to build (e.g., json;parquet)' required: false - default: 'json' + default: 'json;parquet' version: description: 'Version tag for the extensions (e.g., 0.1.0)' required: false @@ -51,7 +51,7 @@ jobs: if [ "${{ github.event_name }}" == "workflow_dispatch" ]; then EXTENSIONS="${{ github.event.inputs.extensions }}" else - EXTENSIONS="json" + EXTENSIONS="json;parquet" fi echo "extensions=$EXTENSIONS" >> $GITHUB_OUTPUT echo "Building extensions: $EXTENSIONS" diff --git a/.github/workflows/neug-extension-test.yml b/.github/workflows/neug-extension-test.yml index 6ae62f2fe..672913c83 100644 --- a/.github/workflows/neug-extension-test.yml +++ b/.github/workflows/neug-extension-test.yml @@ -100,6 +100,7 @@ jobs: export TEST_PATH=${GITHUB_WORKSPACE}/tests cd build/neug_py_bind ctest -R json_extension_test -V + ctest -R parquet_extension_test -V export FLEX_DATA_DIR=${GITHUB_WORKSPACE}/example_dataset/tinysnb ctest -R test_extension -V @@ -110,5 +111,7 @@ jobs: pip3 install -r requirements_dev.txt export FLEX_DATA_DIR=${GITHUB_WORKSPACE}/example_dataset/tinysnb export NEUG_RUN_JSON_TESTS=true + export NEUG_RUN_PARQUET_TESTS=true GLOG_v=10 ./build/neug_py_bind/tools/utils/bulk_loader -g ../../example_dataset/tinysnb/graph.yaml -l ../../example_dataset/tinysnb/import.yaml -d /tmp/tinysnb - python3 -m pytest -sv tests/test_load.py -k "json" \ No newline at end of file + python3 -m pytest -sv tests/test_load.py -k "json" + python3 -m pytest -sv tests/test_load.py -k "parquet" From 2de213791fe308a6ff1711df5d40206c664ca0bf Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 17 Mar 2026 15:44:08 +0800 Subject: [PATCH 05/13] fix(deps): enable Arrow Parquet and Snappy codec support in CI build --- scripts/install_deps.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/scripts/install_deps.sh b/scripts/install_deps.sh index 8d25ee6cf..493c1c70d 100644 --- a/scripts/install_deps.sh +++ b/scripts/install_deps.sh @@ -225,13 +225,14 @@ install_arrow_from_source() { -DARROW_HDFS=OFF \ -DARROW_ORC=OFF \ -DARROW_JSON=ON \ - -DARROW_PARQUET=OFF \ + -DARROW_PARQUET=ON \ -DARROW_PLASMA=OFF \ -DARROW_PYTHON=OFF \ -DARROW_S3=OFF \ -DARROW_WITH_BZ2=OFF \ -DARROW_WITH_LZ4=OFF \ - -DARROW_WITH_SNAPPY=OFF \ + -DARROW_WITH_SNAPPY=ON \ + -DARROW_WITH_ZLIB=ON \ -DARROW_WITH_ZSTD=OFF \ -DARROW_WITH_BROTLI=OFF \ -DARROW_IPC=ON \ From ff02825102d95a05544862909e535c9b843fa89e Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 17 Mar 2026 16:06:14 +0800 Subject: [PATCH 06/13] fix: address PR review comments on parquet extension - Remove use_embedded_schema option (was declared but never read by buildFragmentOptions(); Arrow always uses the embedded Parquet schema by default and provides no simple API to override this) - Rename cache_decompressed -> enable_io_coalescing to accurately reflect what arrow::io::CacheOptions controls (I/O hole-filling / read coalescing, not decompressed column chunk caching); update comments and docs accordingly - Add comment in execFunc clarifying that ArrowReader::read() propagates all errors via exceptions, not silent logging - Remove TestOptionsTranslation_BufferedStream test which could not meaningfully verify the disabled case (Arrow exposes no getter for is_buffered_stream_enabled); rename TestOptionsTranslation_CacheOptions -> TestOptionsTranslation_IoCoalescing and update option keys to match the renamed ENABLE_IO_COALESCING option --- doc/source/extensions/load_parquet.md | 19 ++++----- extension/parquet/include/parquet_options.h | 12 +++--- .../parquet/include/parquet_read_function.h | 5 ++- extension/parquet/src/parquet_options.cc | 8 +++- extension/parquet/tests/parquet_test.cpp | 41 ++++--------------- 5 files changed, 33 insertions(+), 52 deletions(-) diff --git a/doc/source/extensions/load_parquet.md b/doc/source/extensions/load_parquet.md index 8e1717a01..fe1fad56e 100644 --- a/doc/source/extensions/load_parquet.md +++ b/doc/source/extensions/load_parquet.md @@ -22,13 +22,12 @@ LOAD PARQUET; The following options control how Parquet files are read: -| Option | Type | Default | Description | -| ---------------------- | ----- | ------- | ---------------------------------------------------------------------------------------------------- | -| `use_embedded_schema` | bool | `true` | Use the schema embedded in the Parquet file metadata. Set to `false` to infer schema independently. | -| `buffered_stream` | bool | `true` | Enable buffered I/O stream for improved sequential read performance. | -| `pre_buffer` | bool | `false` | Pre-buffer column data before decoding. Recommended for high-latency filesystems such as S3. | -| `cache_decompressed` | bool | `true` | Cache decompressed column chunks to accelerate repeated reads of the same data. | -| `parquet_batch_rows` | int64 | `65536` | Number of rows per Arrow record batch when converting Parquet row groups into in-memory batches. | +| Option | Type | Default | Description | +| ------------------------ | ----- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- | +| `buffered_stream` | bool | `true` | Enable buffered I/O stream for improved sequential read performance. | +| `pre_buffer` | bool | `false` | Pre-buffer column data before decoding. Recommended for high-latency filesystems such as S3. | +| `enable_io_coalescing` | bool | `true` | Enable Arrow I/O read coalescing (hole-filling cache) to reduce I/O overhead when reading non-contiguous byte ranges. When `true`, uses lazy coalescing; when `false`, uses eager coalescing. | +| `parquet_batch_rows` | int64 | `65536` | Number of rows per Arrow record batch when converting Parquet row groups into in-memory batches. | ### Query Examples @@ -50,12 +49,12 @@ LOAD FROM "person.parquet" (parquet_batch_rows=8192) RETURN *; ``` -#### Disabling Embedded Schema +#### Enabling I/O Coalescing -Force schema inference instead of using the schema stored in Parquet metadata: +Enable eager I/O coalescing for workloads that benefit from pre-fetching contiguous data: ```cypher -LOAD FROM "person.parquet" (use_embedded_schema=false) +LOAD FROM "person.parquet" (enable_io_coalescing=false) RETURN *; ``` diff --git a/extension/parquet/include/parquet_options.h b/extension/parquet/include/parquet_options.h index 2949cbe7f..0da07abca 100644 --- a/extension/parquet/include/parquet_options.h +++ b/extension/parquet/include/parquet_options.h @@ -30,22 +30,22 @@ namespace reader { * @brief Parquet-specific parse options * * These options control Parquet file reading behavior: - * - use_embedded_schema: Use schema from Parquet metadata (default: true) * - buffered_stream: Enable buffered I/O stream for better performance (default: true) * - pre_buffer: Pre-buffer data for high-latency filesystems like S3 (default: false) - * - cache_decompressed: Cache decompressed data for repeated reads (default: true) + * - enable_io_coalescing: Enable Arrow I/O read coalescing (hole-filling cache) for + * improved performance when reading non-contiguous ranges (default: true). + * When true, uses lazy coalescing (CacheOptions::LazyDefaults); when false, uses + * eager coalescing (CacheOptions::Defaults). * - row_batch_size: Number of rows per Arrow batch when converting from Parquet (default: 65536) * */ struct ParquetParseOptions { - Option use_embedded_schema = - Option::BoolOption("USE_EMBEDDED_SCHEMA", true); Option buffered_stream = Option::BoolOption("BUFFERED_STREAM", true); Option pre_buffer = Option::BoolOption("PRE_BUFFER", false); - Option cache_decompressed = - Option::BoolOption("CACHE_DECOMPRESSED", true); + Option enable_io_coalescing = + Option::BoolOption("ENABLE_IO_COALESCING", true); Option row_batch_size = Option::Int64Option("PARQUET_BATCH_ROWS", 65536); }; diff --git a/extension/parquet/include/parquet_read_function.h b/extension/parquet/include/parquet_read_function.h index a0c7ed247..1d0cbc5a5 100644 --- a/extension/parquet/include/parquet_read_function.h +++ b/extension/parquet/include/parquet_read_function.h @@ -60,7 +60,10 @@ struct ParquetReadFunction { auto reader = std::make_unique( state, std::move(optionsBuilder), fileInfo.fileSystem); - // Execute read operation + // Execute read operation. + // ArrowReader::read() throws exceptions (via THROW_IO_EXCEPTION / + // THROW_INVALID_ARGUMENT_EXCEPTION) on all Arrow error paths, so + // errors are propagated to the caller rather than silently swallowed. execution::Context ctx; auto localState = std::make_shared(); reader->read(localState, ctx); diff --git a/extension/parquet/src/parquet_options.cc b/extension/parquet/src/parquet_options.cc index 3463cb91a..2c54f559d 100644 --- a/extension/parquet/src/parquet_options.cc +++ b/extension/parquet/src/parquet_options.cc @@ -101,8 +101,12 @@ ArrowParquetOptionsBuilder::buildFragmentOptions() const { // Configure pre-buffering for high-latency filesystems arrow_reader_properties->set_pre_buffer(parquetOpts.pre_buffer.get(options)); - // Configure caching of decompressed data - if (parquetOpts.cache_decompressed.get(options)) { + // Configure caching via Arrow I/O coalescing (hole-filling cache). + // When enable_io_coalescing=true (default), use lazy coalescing which only + // loads explicitly-requested byte ranges (CacheOptions::LazyDefaults). + // When false, use eager coalescing which pre-fetches data more aggressively + // (CacheOptions::Defaults). + if (parquetOpts.enable_io_coalescing.get(options)) { arrow_reader_properties->set_cache_options( arrow::io::CacheOptions::LazyDefaults()); } else { diff --git a/extension/parquet/tests/parquet_test.cpp b/extension/parquet/tests/parquet_test.cpp index 8bd41273b..c0f8c6c3c 100644 --- a/extension/parquet/tests/parquet_test.cpp +++ b/extension/parquet/tests/parquet_test.cpp @@ -253,31 +253,6 @@ TEST_F(ParquetTest, TestOptionsTranslation_ParquetBatchRows) { << "Extension should translate PARQUET_BATCH_ROWS to Arrow batch_size"; } -TEST_F(ParquetTest, TestOptionsTranslation_BufferedStream) { - createSimpleParquetFile("test_buffered.parquet"); - - // Test BUFFERED_STREAM=false (default is true) - auto sharedState = createSharedState( - "test_buffered.parquet", - {"id", "name", "value"}, - {createInt64Type(), createStringType(), createDoubleType()}, - {{"BUFFERED_STREAM", "false"}}); - - reader::ArrowParquetOptionsBuilder optionsBuilder(sharedState); - auto options = optionsBuilder.build(); - - auto parquetFragmentOpts = std::dynamic_pointer_cast( - options.scanOptions->fragment_scan_options); - ASSERT_NE(parquetFragmentOpts, nullptr); - ASSERT_NE(parquetFragmentOpts->reader_properties, nullptr); - - // When BUFFERED_STREAM=false, buffered stream should NOT be enabled - // Note: We verify this by checking that the option was processed - // (Arrow doesn't expose a getter for is_buffered_stream_enabled) - EXPECT_NE(parquetFragmentOpts->reader_properties, nullptr) - << "Extension should configure reader_properties based on BUFFERED_STREAM option"; -} - TEST_F(ParquetTest, TestOptionsTranslation_PreBuffer) { createSimpleParquetFile("test_prebuffer.parquet"); @@ -324,15 +299,15 @@ TEST_F(ParquetTest, TestOptionsTranslation_UseThreads) { << "Extension should translate parallel=false to use_threads=false"; } -TEST_F(ParquetTest, TestOptionsTranslation_CacheOptions) { +TEST_F(ParquetTest, TestOptionsTranslation_IoCoalescing) { createSimpleParquetFile("test_cache.parquet"); - // Test CACHE_DECOMPRESSED=true (should use LazyDefaults) + // Test ENABLE_IO_COALESCING=true (default) — should use LazyDefaults (lazy=true) auto sharedState1 = createSharedState( "test_cache.parquet", {"id", "name", "value"}, {createInt64Type(), createStringType(), createDoubleType()}, - {{"CACHE_DECOMPRESSED", "true"}}); + {{"ENABLE_IO_COALESCING", "true"}}); reader::ArrowParquetOptionsBuilder optionsBuilder1(sharedState1); auto options1 = optionsBuilder1.build(); @@ -342,17 +317,17 @@ TEST_F(ParquetTest, TestOptionsTranslation_CacheOptions) { ASSERT_NE(parquetFragmentOpts1, nullptr); ASSERT_NE(parquetFragmentOpts1->arrow_reader_properties, nullptr); - // Verify cache options are set (extension should configure LazyDefaults vs Defaults) + // Verify lazy coalescing is enabled when ENABLE_IO_COALESCING=true auto cache_opts1 = parquetFragmentOpts1->arrow_reader_properties->cache_options(); EXPECT_TRUE(cache_opts1.lazy) - << "Extension should use LazyDefaults when CACHE_DECOMPRESSED=true"; + << "Extension should use LazyDefaults (lazy=true) when ENABLE_IO_COALESCING=true"; - // Test CACHE_DECOMPRESSED=false (should use Defaults) + // Test ENABLE_IO_COALESCING=false — should use Defaults (lazy=false, eager coalescing) auto sharedState2 = createSharedState( "test_cache.parquet", {"id", "name", "value"}, {createInt64Type(), createStringType(), createDoubleType()}, - {{"CACHE_DECOMPRESSED", "false"}}); + {{"ENABLE_IO_COALESCING", "false"}}); reader::ArrowParquetOptionsBuilder optionsBuilder2(sharedState2); auto options2 = optionsBuilder2.build(); @@ -363,7 +338,7 @@ TEST_F(ParquetTest, TestOptionsTranslation_CacheOptions) { auto cache_opts2 = parquetFragmentOpts2->arrow_reader_properties->cache_options(); EXPECT_FALSE(cache_opts2.lazy) - << "Extension should use Defaults when CACHE_DECOMPRESSED=false"; + << "Extension should use Defaults (lazy=false) when ENABLE_IO_COALESCING=false"; } TEST_F(ParquetTest, TestOptionsTranslation_DefaultValues) { From 03686a335e47c271dd1b2efffdea76391bcaebe7 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Wed, 18 Mar 2026 12:29:45 +0800 Subject: [PATCH 07/13] enable ARROW_WITH_SNAPPY, ARROW_WITH_ZLIB, and ARROW_WITH_ZSTD --- cmake/BuildArrowAsThirdParty.cmake | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/cmake/BuildArrowAsThirdParty.cmake b/cmake/BuildArrowAsThirdParty.cmake index a31308b4f..ff8d9cf7e 100644 --- a/cmake/BuildArrowAsThirdParty.cmake +++ b/cmake/BuildArrowAsThirdParty.cmake @@ -79,21 +79,16 @@ function(build_arrow_as_third_party) if(NOT DEFINED ARROW_PARQUET) set(ARROW_PARQUET OFF CACHE BOOL "" FORCE) endif() - # Enable Snappy and Zlib for Parquet if needed, otherwise disable them - if(ARROW_PARQUET) - set(ARROW_WITH_SNAPPY ON CACHE BOOL "" FORCE) - set(ARROW_WITH_ZLIB ON CACHE BOOL "" FORCE) - else() - set(ARROW_WITH_SNAPPY OFF CACHE BOOL "" FORCE) - set(ARROW_WITH_ZLIB OFF CACHE BOOL "" FORCE) - endif() - set(ARROW_PLASMA OFF CACHE BOOL "" FORCE) - set(ARROW_PYTHON OFF CACHE BOOL "" FORCE) - set(ARROW_S3 OFF CACHE BOOL "" FORCE) + # Enable Snappy and Zlib + set(ARROW_WITH_SNAPPY ON CACHE BOOL "" FORCE) + set(ARROW_WITH_ZLIB ON CACHE BOOL "" FORCE) set(ARROW_WITH_BZ2 OFF CACHE BOOL "" FORCE) set(ARROW_WITH_LZ4 OFF CACHE BOOL "" FORCE) - set(ARROW_WITH_ZSTD OFF CACHE BOOL "" FORCE) + set(ARROW_WITH_ZSTD ON CACHE BOOL "" FORCE) set(ARROW_WITH_BROTLI OFF CACHE BOOL "" FORCE) + set(ARROW_PLASMA OFF CACHE BOOL "" FORCE) + set(ARROW_PYTHON OFF CACHE BOOL "" FORCE) + set(ARROW_S3 OFF CACHE BOOL "" FORCE) set(ARROW_IPC ON CACHE BOOL "" FORCE) set(ARROW_BUILD_BENCHMARKS OFF CACHE BOOL "" FORCE) set(ARROW_BUILD_TESTS OFF CACHE BOOL "" FORCE) From 501e5f2ee154f8576bbbfd7482ca9e071ff47679 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Wed, 18 Mar 2026 12:37:32 +0800 Subject: [PATCH 08/13] some fix --- .github/workflows/neug-extension-test.yml | 4 +-- scripts/install_deps.sh | 2 +- tools/python_bind/tests/test_load.py | 31 ++++++++--------------- 3 files changed, 12 insertions(+), 25 deletions(-) diff --git a/.github/workflows/neug-extension-test.yml b/.github/workflows/neug-extension-test.yml index bda3209c7..84e4f3758 100644 --- a/.github/workflows/neug-extension-test.yml +++ b/.github/workflows/neug-extension-test.yml @@ -118,12 +118,12 @@ jobs: pip3 install -r requirements.txt pip3 install -r requirements_dev.txt export FLEX_DATA_DIR=${GITHUB_WORKSPACE}/example_dataset/tinysnb - export NEUG_RUN_PARQUET_TESTS=true GLOG_v=10 ./build/neug_py_bind/tools/utils/bulk_loader -g ../../example_dataset/tinysnb/graph.yaml -l ../../example_dataset/tinysnb/import.yaml -d /tmp/tinysnb export FLEX_DATA_DIR=${GITHUB_WORKSPACE}/example_dataset/comprehensive_graph GLOG_v=10 ./build/neug_py_bind/tools/utils/bulk_loader -g ../../example_dataset/comprehensive_graph/graph.yaml -l ../../example_dataset/comprehensive_graph/import.yaml -d /tmp/comprehensive_graph NEUG_RUN_EXTENSION_TESTS=true python3 -m pytest -sv tests/test_load.py -k "json" NEUG_RUN_EXTENSION_TESTS=true python3 -m pytest -sv tests/test_export.py -k "json" + NEUG_RUN_EXTENSION_TESTS=true python3 -m pytest -sv tests/test_load.py -k "parquet" extension_tests_wheel_linux_x86_64: runs-on: [self-hosted, linux, x64] @@ -280,5 +280,3 @@ jobs: rm -rf ${TMP_DIR}/test_example GLOG_v=10 python3 tools/python_bind/example/simple_example.py example_dataset/modern_graph ${TMP_DIR}/test_example NEUG_RUN_EXTENSION_TESTS=1 python3 tools/python_bind/example/complex_test.py - - python3 -m pytest -sv tests/test_load.py -k "parquet" diff --git a/scripts/install_deps.sh b/scripts/install_deps.sh index 493c1c70d..e8aad75d5 100644 --- a/scripts/install_deps.sh +++ b/scripts/install_deps.sh @@ -233,7 +233,7 @@ install_arrow_from_source() { -DARROW_WITH_LZ4=OFF \ -DARROW_WITH_SNAPPY=ON \ -DARROW_WITH_ZLIB=ON \ - -DARROW_WITH_ZSTD=OFF \ + -DARROW_WITH_ZSTD=ON \ -DARROW_WITH_BROTLI=OFF \ -DARROW_IPC=ON \ -DARROW_BUILD_BENCHMARKS=OFF \ diff --git a/tools/python_bind/tests/test_load.py b/tools/python_bind/tests/test_load.py index 8c7e23c19..ca6997c7e 100644 --- a/tools/python_bind/tests/test_load.py +++ b/tools/python_bind/tests/test_load.py @@ -36,17 +36,6 @@ not EXTENSION_TESTS_ENABLED, reason="Extension tests disabled by default; set NEUG_RUN_EXTENSION_TESTS=1 to enable.", ) -PARQUET_TESTS_ENABLED = os.environ.get("NEUG_RUN_PARQUET_TESTS", "").lower() in ( - "1", - "true", - "yes", - "on", -) -parquet_test = pytest.mark.skipif( - not PARQUET_TESTS_ENABLED, - reason="Parquet tests disabled by default; set NEUG_RUN_PARQUET_TESTS=1 to enable.", -) - def get_tinysnb_dataset_path(): """Get the path to tinysnb dataset CSV files.""" @@ -984,7 +973,7 @@ def test_load_from_jsonl_with_complex_where_conditions(self): assert height > 1.0, f"height {height} should be > 1.0" assert isinstance(fname, str), "fName should be string" - @parquet_test + @extension_test def test_load_from_parquet_basic_return_all(self): """Test basic LOAD FROM Parquet with RETURN *.""" # load vertex data @@ -1031,7 +1020,7 @@ def test_load_from_parquet_basic_return_all(self): first_record = records[0] assert len(first_record) == 5, f"Expected 5 columns, got {len(first_record)}" - @parquet_test + @extension_test def test_load_from_parquet_return_specific_columns(self): """Test LOAD FROM Parquet with column projection.""" parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") @@ -1055,7 +1044,7 @@ def test_load_from_parquet_return_specific_columns(self): assert isinstance(first_record[0], str), "fName should be string" assert isinstance(first_record[1], int), "age should be integer" - @parquet_test + @extension_test def test_load_from_parquet_with_where(self): """Test LOAD FROM Parquet with WHERE clause filtering (predicate pushdown).""" parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") @@ -1081,7 +1070,7 @@ def test_load_from_parquet_with_where(self): assert age > 30, f"Age {age} should be greater than 30" assert isinstance(fname, str), "fName should be string" - @parquet_test + @extension_test def test_load_from_parquet_with_multiple_where_conditions(self): """Test LOAD FROM Parquet with multiple WHERE conditions.""" parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") @@ -1109,7 +1098,7 @@ def test_load_from_parquet_with_multiple_where_conditions(self): assert isinstance(fname, str), "fName should be string" assert isinstance(eye_sight, (int, float)), "eyeSight should be numeric" - @parquet_test + @extension_test def test_load_from_parquet_with_order_by(self): """Test LOAD FROM Parquet with ORDER BY clause.""" parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") @@ -1133,7 +1122,7 @@ def test_load_from_parquet_with_order_by(self): ages = [record[1] for record in records] assert ages == sorted(ages), f"Ages should be sorted ascending: {ages}" - @parquet_test + @extension_test def test_load_from_parquet_with_complex_where_conditions(self): """Test LOAD FROM Parquet with complex WHERE conditions (age, eyeSight, height).""" parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") @@ -1221,7 +1210,7 @@ def test_load_from_comprehensive_graph_csv(self): assert rows[0][4] == -1234567890123456789 # i64_weight: INT64 assert str(rows[0][5]) == "2023-05-17" # datetime_weight: DATE - @parquet_test + @extension_test def test_load_from_comprehensive_graph_parquet(self): """Test LOAD FROM Parquet covers all NeuG-supported data types using example_dataset/comprehensive_graph/parquet/. @@ -1627,7 +1616,7 @@ def test_copy_from_node_reordered_all_columns(self): assert records[1][0] == 2 and records[1][2] == "Bob" and records[1][1] == 30 assert records[2][0] == 3 and records[2][2] == "Carol" and records[2][1] == 45 - @parquet_test + @extension_test def test_copy_from_node_parquet_with_column_remapping(self): parquet_path = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") if not os.path.exists(parquet_path): @@ -1670,7 +1659,7 @@ def test_copy_from_node_parquet_with_column_remapping(self): assert records[0][3] == 1, "Alice's gender should be 1" assert records[0][4] == 5.0, "Alice's eyeSight should be 5.0" - @parquet_test + @extension_test def test_copy_from_edge_parquet_with_column_remapping(self): """Test COPY FROM for edge table with column remapping using Parquet files.""" person_parquet = os.path.join(self.tinysnb_path, "parquet", "vPerson.parquet") @@ -1850,7 +1839,7 @@ def test_copy_from_comprehensive_graph_csv(self): "2023-05-17 00:00:00" ) # datetime_weight: TIMESTAMP - @parquet_test + @extension_test def test_copy_from_comprehensive_graph_parquet(self): """Test COPY FROM Parquet using comprehensive_graph node_a.parquet (node) and rel_a.parquet (edge). From 96907b6a32dcfda352ee5087f1e5fc223786fc4f Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Wed, 18 Mar 2026 13:30:22 +0800 Subject: [PATCH 09/13] format --- tools/python_bind/tests/test_load.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/python_bind/tests/test_load.py b/tools/python_bind/tests/test_load.py index ca6997c7e..465ab7026 100644 --- a/tools/python_bind/tests/test_load.py +++ b/tools/python_bind/tests/test_load.py @@ -37,6 +37,7 @@ reason="Extension tests disabled by default; set NEUG_RUN_EXTENSION_TESTS=1 to enable.", ) + def get_tinysnb_dataset_path(): """Get the path to tinysnb dataset CSV files.""" # Try to get from environment variable first From ec524302a3c0e6b4ca4fe81d7d43dae5b648948a Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Wed, 18 Mar 2026 14:33:28 +0800 Subject: [PATCH 10/13] run parquet in complex test only when enabled --- tools/python_bind/example/complex_test.py | 28 ++++++++++++++--------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/tools/python_bind/example/complex_test.py b/tools/python_bind/example/complex_test.py index 36284e6ee..8c165ee79 100644 --- a/tools/python_bind/example/complex_test.py +++ b/tools/python_bind/example/complex_test.py @@ -659,18 +659,24 @@ def _network_stats(): # ================================================================ section("6. Extensions — Parquet Extension (Install / Load / Query)") -conn_parquet = None -db_path_parquet = tempfile.mkdtemp(prefix="neug_parquet_ext_") -try: - db_parquet = neug.Database(db_path_parquet) - conn_parquet = db_parquet.connect() - ok(f"Created persistent database for Parquet extension test at {db_path_parquet}") -except Exception as e: - fail("Create database for Parquet extension", e) - db_parquet = None +_run_ext_tests = os.environ.get("NEUG_RUN_EXTENSION_TESTS", "").strip().lower() +_run_ext_tests = _run_ext_tests in ("1", "true", "on", "yes") + +if not _run_ext_tests: + print(" (skipped: set NEUG_RUN_EXTENSION_TESTS=1 to run extension tests)") +else: + conn_parquet = None + db_path_parquet = tempfile.mkdtemp(prefix="neug_parquet_ext_") + try: + db_parquet = neug.Database(db_path_parquet) + conn_parquet = db_parquet.connect() + ok(f"Created persistent database for Parquet extension test at {db_path_parquet}") + except Exception as e: + fail("Create database for Parquet extension", e) + db_parquet = None -if db_parquet is not None and conn_parquet is not None: - run_parquet_extension_suite(db_parquet, conn_parquet, db_path_parquet) + if db_parquet is not None and conn_parquet is not None: + run_parquet_extension_suite(db_parquet, conn_parquet, db_path_parquet) # ================================================================ # Summary From df9eb377b0dbd64246e92812f94bbccfbf6d5d62 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Wed, 18 Mar 2026 15:43:03 +0800 Subject: [PATCH 11/13] format --- tools/python_bind/example/complex_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tools/python_bind/example/complex_test.py b/tools/python_bind/example/complex_test.py index 8c165ee79..d4f29270f 100644 --- a/tools/python_bind/example/complex_test.py +++ b/tools/python_bind/example/complex_test.py @@ -670,7 +670,9 @@ def _network_stats(): try: db_parquet = neug.Database(db_path_parquet) conn_parquet = db_parquet.connect() - ok(f"Created persistent database for Parquet extension test at {db_path_parquet}") + ok( + f"Created persistent database for Parquet extension test at {db_path_parquet}" + ) except Exception as e: fail("Create database for Parquet extension", e) db_parquet = None From 5bd8466d6af155c122b1759bbcac239988e8070b Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Thu, 19 Mar 2026 12:05:41 +0800 Subject: [PATCH 12/13] remove install parquet in complex test --- tools/python_bind/example/complex_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/python_bind/example/complex_test.py b/tools/python_bind/example/complex_test.py index d4f29270f..e2acccb32 100644 --- a/tools/python_bind/example/complex_test.py +++ b/tools/python_bind/example/complex_test.py @@ -232,7 +232,6 @@ def _projection(rows): def run_parquet_extension_suite(db_parquet, conn_parquet, db_path_parquet): statements = [ - ("INSTALL PARQUET succeeded", "INSTALL PARQUET;"), ("LOAD PARQUET succeeded", "LOAD PARQUET;"), ] From 2dc7466a19f1f0f11579f3e71ad639d344e86486 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Fri, 20 Mar 2026 16:01:42 +0800 Subject: [PATCH 13/13] minor fix --- CMakeLists.txt | 2 +- cmake/BuildArrowAsThirdParty.cmake | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 2a5391b49..0786ecd64 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -595,7 +595,7 @@ endif() # Configure Arrow Parquet support if parquet extension is enabled if(BUILD_EXTENSIONS AND "parquet" IN_LIST BUILD_EXTENSIONS) - set(ARROW_PARQUET ON CACHE BOOL "" FORCE) + set(ARROW_ENABLE_PARQUET ON CACHE BOOL "" FORCE) message(STATUS "Arrow Parquet support enabled for parquet extension") endif() diff --git a/cmake/BuildArrowAsThirdParty.cmake b/cmake/BuildArrowAsThirdParty.cmake index ff8d9cf7e..86459f968 100644 --- a/cmake/BuildArrowAsThirdParty.cmake +++ b/cmake/BuildArrowAsThirdParty.cmake @@ -75,8 +75,11 @@ function(build_arrow_as_third_party) if(NOT DEFINED ARROW_JSON) set(ARROW_JSON OFF CACHE BOOL "" FORCE) endif() - # ARROW_PARQUET is set by the main CMakeLists.txt if parquet extension is enabled - if(NOT DEFINED ARROW_PARQUET) + # Translate ARROW_ENABLE_PARQUET (our flag set in CMakeLists.txt) to Arrow's + # own ARROW_PARQUET variable so Arrow actually builds Parquet support. + if(ARROW_ENABLE_PARQUET) + set(ARROW_PARQUET ON CACHE BOOL "" FORCE) + else() set(ARROW_PARQUET OFF CACHE BOOL "" FORCE) endif() # Enable Snappy and Zlib @@ -311,7 +314,7 @@ function(build_arrow_as_third_party) PATTERN "testing" EXCLUDE) # Install Parquet headers if Parquet is enabled - if(ARROW_PARQUET) + if(ARROW_ENABLE_PARQUET) install(DIRECTORY ${arrow_SOURCE_DIR}/cpp/src/parquet DESTINATION include FILES_MATCHING PATTERN "*.h"