Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/build-extensions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ on:
workflow_dispatch:
inputs:
extensions:
description: 'Semicolon-separated list of extensions to build (e.g., json)'
description: 'Semicolon-separated list of extensions to build (e.g., json;parquet)'
required: false
default: 'json'
default: 'json;parquet'
version:
description: 'Version tag for the extensions (e.g., 0.1.0)'
required: false
Expand Down Expand Up @@ -51,7 +51,7 @@ jobs:
if [ "${{ github.event_name }}" == "workflow_dispatch" ]; then
EXTENSIONS="${{ github.event.inputs.extensions }}"
else
EXTENSIONS="json"
EXTENSIONS="json;parquet"
fi
echo "extensions=$EXTENSIONS" >> $GITHUB_OUTPUT
echo "Building extensions: $EXTENSIONS"
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/neug-extension-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ jobs:
export TEST_PATH=${GITHUB_WORKSPACE}/tests
cd build/neug_py_bind
ctest -R json_extension_test -V
ctest -R parquet_extension_test -V
export FLEX_DATA_DIR=${GITHUB_WORKSPACE}/example_dataset/tinysnb
ctest -R test_extension -V

Expand All @@ -122,6 +123,7 @@ jobs:
GLOG_v=10 ./build/neug_py_bind/tools/utils/bulk_loader -g ../../example_dataset/comprehensive_graph/graph.yaml -l ../../example_dataset/comprehensive_graph/import.yaml -d /tmp/comprehensive_graph
NEUG_RUN_EXTENSION_TESTS=true python3 -m pytest -sv tests/test_load.py -k "json"
NEUG_RUN_EXTENSION_TESTS=true python3 -m pytest -sv tests/test_export.py -k "json"
NEUG_RUN_EXTENSION_TESTS=true python3 -m pytest -sv tests/test_load.py -k "parquet"

extension_tests_wheel_linux_x86_64:
runs-on: [self-hosted, linux, x64]
Expand Down
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,12 @@ if(BUILD_EXTENSIONS AND "json" IN_LIST BUILD_EXTENSIONS)
message(STATUS "Arrow JSON support enabled for json extension")
endif()

# Configure Arrow Parquet support if parquet extension is enabled
if(BUILD_EXTENSIONS AND "parquet" IN_LIST BUILD_EXTENSIONS)
set(ARROW_PARQUET ON CACHE BOOL "" FORCE)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ARROW_PARQUET itself is a guard in Arrow CMakeList, maybe we should use a different name here, i.e. ARROW_ENABLE_PARQUET

message(STATUS "Arrow Parquet support enabled for parquet extension")
endif()

set(NEUG_USE_SYSTEM_ARROW OFF)
if(DEFINED ENV{CI} AND "$ENV{CI}" STREQUAL "ON")
set(NEUG_USE_SYSTEM_ARROW ON)
Expand Down
87 changes: 80 additions & 7 deletions cmake/BuildArrowAsThirdParty.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ function(build_arrow_as_third_party)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=uninitialized")
endif()
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
# Thrift (Arrow-parquet dependency) emits these warnings
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=unused-function")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=stringop-truncation")

set(ARROW_BUILD_SHARED OFF CACHE BOOL "" FORCE)
set(ARROW_BUILD_STATIC ON CACHE BOOL "" FORCE)
Expand All @@ -72,15 +75,20 @@ function(build_arrow_as_third_party)
if(NOT DEFINED ARROW_JSON)
set(ARROW_JSON OFF CACHE BOOL "" FORCE)
endif()
set(ARROW_PARQUET OFF CACHE BOOL "" FORCE)
set(ARROW_PLASMA OFF CACHE BOOL "" FORCE)
set(ARROW_PYTHON OFF CACHE BOOL "" FORCE)
set(ARROW_S3 OFF CACHE BOOL "" FORCE)
# ARROW_PARQUET is set by the main CMakeLists.txt if parquet extension is enabled
if(NOT DEFINED ARROW_PARQUET)
set(ARROW_PARQUET OFF CACHE BOOL "" FORCE)
endif()
# Enable Snappy and Zlib
set(ARROW_WITH_SNAPPY ON CACHE BOOL "" FORCE)
set(ARROW_WITH_ZLIB ON CACHE BOOL "" FORCE)
set(ARROW_WITH_BZ2 OFF CACHE BOOL "" FORCE)
set(ARROW_WITH_LZ4 OFF CACHE BOOL "" FORCE)
set(ARROW_WITH_SNAPPY OFF CACHE BOOL "" FORCE)
set(ARROW_WITH_ZSTD OFF CACHE BOOL "" FORCE)
set(ARROW_WITH_ZSTD ON CACHE BOOL "" FORCE)
set(ARROW_WITH_BROTLI OFF CACHE BOOL "" FORCE)
set(ARROW_PLASMA OFF CACHE BOOL "" FORCE)
set(ARROW_PYTHON OFF CACHE BOOL "" FORCE)
set(ARROW_S3 OFF CACHE BOOL "" FORCE)
set(ARROW_IPC ON CACHE BOOL "" FORCE)
set(ARROW_BUILD_BENCHMARKS OFF CACHE BOOL "" FORCE)
set(ARROW_BUILD_TESTS OFF CACHE BOOL "" FORCE)
Expand All @@ -104,7 +112,6 @@ function(build_arrow_as_third_party)
# Point Arrow to use the project's RapidJSON
set(RapidJSON_ROOT "${CMAKE_SOURCE_DIR}/third_party/rapidjson" CACHE PATH "" FORCE)
endif()
set(ARROW_WITH_ZLIB OFF CACHE BOOL "" FORCE)
set(ARROW_ENABLE_THREADING ON CACHE BOOL "" FORCE)

Comment on lines 112 to 116
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 cmake_policy(SET CMP0169 OLD) suppresses a CMake 4.x deprecation warning but defers the real fix

Setting CMP0169 OLD silences the deprecation of the legacy FetchContent_Populate() pattern, but this is marked as an error in CMake 4.x by default, meaning the workaround must stay in sync with future CMake releases. The FetchContent_MakeAvailable() API (available since CMake 3.14) is the recommended migration path.

This is acceptable as a short-term compatibility fix, but a follow-up to migrate away from FetchContent_Populate() should be tracked, especially since the project already handles CMake 4.x incompatibilities in other parts of this patch.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged. The cmake_policy(SET CMP0169 OLD) is an intentional short-term compatibility shim to unblock CMake 4.x builds. Migrating the affected FetchContent_Populate() calls to FetchContent_MakeAvailable() is tracked as a follow-up — the current patch keeps the existing FetchContent call sites working without regressions while the proper migration is planned separately.

# Save original flags and set flags to suppress warnings for Arrow build
Expand All @@ -116,6 +123,12 @@ function(build_arrow_as_third_party)
cmake_policy(SET CMP0135 NEW)
endif()

# CMake 3.30+ introduced CMP0169 which deprecates FetchContent_Populate() with declared details.
# CMake 4.x made it an error by default. Set to OLD to preserve the existing pattern.
if(POLICY CMP0169)
cmake_policy(SET CMP0169 OLD)
endif()

message(STATUS "Fetching Arrow ${ARROW_VERSION} from ${ARROW_SOURCE_URL}")

fetchcontent_declare(Arrow
Expand All @@ -127,6 +140,24 @@ function(build_arrow_as_third_party)
FetchContent_GetProperties(Arrow)
if(NOT arrow_POPULATED)
FetchContent_Populate(Arrow)
# CMake 4.x removed support for cmake_minimum_required < 3.5.
# Arrow's bundled ExternalProjects (e.g. snappy) use EP_COMMON_CMAKE_ARGS
# which is built inside Arrow's ThirdpartyToolchain.cmake. We patch that
# file after fetch to append -DCMAKE_POLICY_VERSION_MINIMUM=3.5 so every
# ExternalProject sub-build can configure under CMake 4.x.
if(CMAKE_VERSION VERSION_GREATER_EQUAL "4.0")
set(_toolchain "${arrow_SOURCE_DIR}/cpp/cmake_modules/ThirdpartyToolchain.cmake")
file(READ "${_toolchain}" _toolchain_content)
string(FIND "${_toolchain_content}" "CMAKE_POLICY_VERSION_MINIMUM" _already_patched)
if(_already_patched EQUAL -1)
string(REPLACE
"# if building with a toolchain file, pass that through"
"# CMake 4.x: inject CMAKE_POLICY_VERSION_MINIMUM into all ExternalProject builds\nlist(APPEND EP_COMMON_CMAKE_ARGS \"-DCMAKE_POLICY_VERSION_MINIMUM=3.5\")\n\n# if building with a toolchain file, pass that through"
_toolchain_content "${_toolchain_content}")
file(WRITE "${_toolchain}" "${_toolchain_content}")
message(STATUS "Patched Arrow ThirdpartyToolchain.cmake for CMake 4.x compatibility")
endif()
endif()
add_subdirectory(${arrow_SOURCE_DIR}/cpp ${arrow_BINARY_DIR} EXCLUDE_FROM_ALL)
endif()

Expand Down Expand Up @@ -214,6 +245,33 @@ function(build_arrow_as_third_party)
include_directories(${arrow_SOURCE_DIR}/cpp/src
${arrow_BINARY_DIR}/src)

# Try different possible Arrow Parquet target names
if(TARGET Arrow::parquet_static)
message(STATUS "Found Arrow::parquet_static target")
set(ARROW_PARQUET_LIB Arrow::parquet_static)
elseif(TARGET arrow_parquet_static)
message(STATUS "Found arrow_parquet_static target")
set(ARROW_PARQUET_LIB arrow_parquet_static)
elseif(TARGET parquet_static)
message(STATUS "Found parquet_static target")
set(ARROW_PARQUET_LIB parquet_static)
elseif(TARGET Arrow::parquet)
message(STATUS "Found Arrow::parquet target (using as fallback)")
set(ARROW_PARQUET_LIB Arrow::parquet)
elseif(TARGET parquet)
message(STATUS "Found parquet target (using as fallback)")
set(ARROW_PARQUET_LIB parquet)
else()
message(WARNING "Arrow parquet target not found. Parquet symbols may be unresolved.")
set(ARROW_PARQUET_LIB "")
endif()

if(ARROW_PARQUET_LIB)
list(APPEND ARROW_LIB ${ARROW_PARQUET_LIB})
set(ARROW_LIB ${ARROW_LIB} PARENT_SCOPE)
set(ARROW_PARQUET_LIB ${ARROW_PARQUET_LIB} PARENT_SCOPE)
endif()

# Set additional Arrow variables for compatibility
set(ARROW_FOUND TRUE PARENT_SCOPE)
set(ARROW_LIBRARIES ${ARROW_LIB} PARENT_SCOPE)
Expand Down Expand Up @@ -251,6 +309,21 @@ function(build_arrow_as_third_party)
FILES_MATCHING PATTERN "*.h"
PATTERN "test" EXCLUDE
PATTERN "testing" EXCLUDE)

# Install Parquet headers if Parquet is enabled
if(ARROW_PARQUET)
install(DIRECTORY ${arrow_SOURCE_DIR}/cpp/src/parquet
DESTINATION include
FILES_MATCHING PATTERN "*.h"
PATTERN "test" EXCLUDE
PATTERN "testing" EXCLUDE)

install(DIRECTORY ${arrow_BINARY_DIR}/src/parquet
DESTINATION include
FILES_MATCHING PATTERN "*.h"
PATTERN "test" EXCLUDE
PATTERN "testing" EXCLUDE)
endif()

else()
# list(APPEND ICEBERG_SYSTEM_DEPENDENCIES Arrow)
Expand Down
79 changes: 79 additions & 0 deletions doc/source/extensions/load_parquet.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Parquet Extension

Apache Parquet is a columnar storage format widely used in data engineering and analytics workloads. NeuG supports Parquet file import functionality through the Extension framework. After loading the Parquet Extension, users can directly load external Parquet files using the `LOAD FROM` syntax.

## Install Extension

```cypher
INSTALL PARQUET;
```

## Load Extension

```cypher
LOAD PARQUET;
```

## Using Parquet Extension

`LOAD FROM` reads Parquet files and exposes their columns for querying. Schema is automatically inferred from the Parquet file metadata by default.

### Parquet Format Options

The following options control how Parquet files are read:

| Option | Type | Default | Description |
| ------------------------ | ----- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------- |
| `buffered_stream` | bool | `true` | Enable buffered I/O stream for improved sequential read performance. |
| `pre_buffer` | bool | `false` | Pre-buffer column data before decoding. Recommended for high-latency filesystems such as S3. |
| `enable_io_coalescing` | bool | `true` | Enable Arrow I/O read coalescing (hole-filling cache) to reduce I/O overhead when reading non-contiguous byte ranges. When `true`, uses lazy coalescing; when `false`, uses eager coalescing. |
| `parquet_batch_rows` | int64 | `65536` | Number of rows per Arrow record batch when converting Parquet row groups into in-memory batches. |

### Query Examples

#### Basic Parquet Loading

Load all columns from a Parquet file:

```cypher
LOAD FROM "person.parquet"
RETURN *;
```

#### Specifying Batch Size

Tune memory usage by adjusting the number of rows read per batch:

```cypher
LOAD FROM "person.parquet" (parquet_batch_rows=8192)
RETURN *;
```

#### Enabling I/O Coalescing

Enable eager I/O coalescing for workloads that benefit from pre-fetching contiguous data:

```cypher
LOAD FROM "person.parquet" (enable_io_coalescing=false)
RETURN *;
```

#### Column Projection

Return only specific columns from Parquet data:

```cypher
LOAD FROM "person.parquet"
RETURN fName, age;
```

#### Column Aliases

Use `AS` to assign aliases to columns:

```cypher
LOAD FROM "person.parquet"
RETURN fName AS name, age AS years;
```

> **Note:** All relational operations supported by `LOAD FROM` — including type conversion, WHERE filtering, aggregation, sorting, and limiting — work the same way with Parquet files. See the [LOAD FROM reference](../data_io/load_data) for the complete list of operations.
122 changes: 122 additions & 0 deletions example_dataset/comprehensive_graph/csv_to_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/usr/bin/env python3
# Copyright 2020 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Preprocess comprehensive_graph CSV files into Parquet format.

Outputs are written to example_dataset/comprehensive_graph/parquet/.

Notes:
- interval_property is kept as a Parquet STRING column (pyarrow CSV inference).
NeuG's INTERVAL text format ("1year2months3days...") has no writable Parquet
native interval equivalent via pyarrow, so string is the simplest lossless form.
- All other columns (INT32/INT64/UINT32/UINT64/FLOAT/DOUBLE/STRING/DATE/DATETIME)
are preserved with explicitly typed Arrow columns.
"""

import os
import sys

try:
import pyarrow as pa
import pyarrow.csv as pa_csv
import pyarrow.parquet as pq
except ImportError:
print("Error: pyarrow is required. Install with: pip install pyarrow")
sys.exit(1)

# Directory containing this script; CSV inputs are resolved relative to it.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# Converted Parquet files are written into a "parquet/" subdirectory.
OUTPUT_DIR = os.path.join(SCRIPT_DIR, "parquet")

# Edge files: all have duplicate src/dst column both named 'node_a.id'.
# Rename the first two columns to src_id / dst_id for uniqueness in Parquet.
EDGE_FILES = {
    "rel_a.csv",
}

# All CSV files to convert
CSV_FILES = [
    "node_a.csv",
    "rel_a.csv",
]


def convert(csv_filename: str) -> None:
    """Convert one pipe-delimited CSV in SCRIPT_DIR into a Parquet file in OUTPUT_DIR.

    Column types that pyarrow's CSV inference gets wrong are overridden
    explicitly; edge files additionally get their duplicate src/dst columns
    renamed to src_id / dst_id so Parquet column names stay unique.
    """
    source_path = os.path.join(SCRIPT_DIR, csv_filename)
    parquet_filename = os.path.splitext(csv_filename)[0] + ".parquet"
    target_path = os.path.join(OUTPUT_DIR, parquet_filename)

    # Explicitly type columns where CSV auto-inference picks wrong types:
    # - i32_property: inferred as int64 (pyarrow defaults all integers to int64)
    # - u32/u64_property: inferred as double (values overflow int64)
    # - f32_property: inferred as double (pyarrow has no float32 inference)
    column_types = {}
    if csv_filename in ("node_a.csv", "node_b.csv"):
        column_types.update({
            "i32_property": pa.int32(),
            "u32_property": pa.uint32(),
            "u64_property": pa.uint64(),
            "f32_property": pa.float32(),
        })
    if csv_filename in EDGE_FILES:
        column_types.update({
            "i32_weight": pa.int32(),
            # datetime_weight values are date-only strings ("2023-05-17"); pyarrow
            # infers date32[day] by default, but DT_DATETIME requires timestamp[ms].
            "datetime_weight": pa.timestamp("ms"),
        })

    # timestamp_parsers lets Arrow recognize both datetime and date-only strings.
    table = pa_csv.read_csv(
        source_path,
        read_options=pa_csv.ReadOptions(),
        parse_options=pa_csv.ParseOptions(delimiter="|"),
        convert_options=pa_csv.ConvertOptions(
            timestamp_parsers=["%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"],
            column_types=column_types,
        ),
    )

    # Edge files carry duplicate src/dst column names; rename positions 0 and 1.
    if csv_filename in EDGE_FILES:
        old_names = table.schema.names
        renamed = []
        for position, column in enumerate(old_names):
            if position == 0:
                renamed.append("src_id")
            elif position == 1:
                renamed.append("dst_id")
            else:
                renamed.append(column)
        table = table.rename_columns(renamed)
        print(f"  renamed src/dst columns: {old_names[:2]} -> ['src_id', 'dst_id']")

    # interval_property is kept as-is: pyarrow CSV infers it as string
    pq.write_table(table, target_path)
    print(f"  {csv_filename} -> parquet/{parquet_filename} ({table.num_rows} rows, {len(table.schema)} cols)")

def main():
    """Convert every file in CSV_FILES, skipping any that are missing on disk."""
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    print(f"Output directory: {OUTPUT_DIR}\n")

    for name in CSV_FILES:
        # Tolerate absent inputs so partial datasets still convert cleanly.
        if not os.path.exists(os.path.join(SCRIPT_DIR, name)):
            print(f"[SKIP] {name} not found")
            continue
        print(f"Converting {name} ...")
        convert(name)

    print("\nDone.")


if __name__ == "__main__":
    main()
Binary file not shown.
Binary file not shown.
Binary file added example_dataset/tinysnb/parquet/eMeets.parquet
Binary file not shown.
Binary file added example_dataset/tinysnb/parquet/vPerson.parquet
Binary file not shown.
Loading
Loading