diff --git a/extension/json/include/json_read_function.h b/extension/json/include/json_read_function.h index 56baa9d4f..4ec64de21 100644 --- a/extension/json/include/json_read_function.h +++ b/extension/json/include/json_read_function.h @@ -23,6 +23,7 @@ #include "json_options.h" #include "neug/compiler/function/function.h" #include "neug/compiler/function/read_function.h" +#include "neug/compiler/main/metadata_registry.h" #include "neug/execution/execute/ops/batch/batch_update_utils.h" #include "neug/utils/reader/options.h" #include "neug/utils/reader/reader.h" @@ -49,15 +50,20 @@ struct JsonReadFunction { static execution::Context jsonExecFunc( std::shared_ptr state) { - // todo: get file system from vfs manager - LocalFileSystemProvider fsProvider; - auto fileInfo = fsProvider.provide(state->schema.file); - state->schema.file.paths = fileInfo.resolvedPaths; + const auto& vfs = neug::main::MetadataRegistry::getVFS(); + const auto& fs = vfs->Provide(state->schema.file); + auto resolvedPaths = std::vector(); + for (const auto& path : state->schema.file.paths) { + const auto& resolved = fs->glob(path); + resolvedPaths.insert(resolvedPaths.end(), resolved.begin(), + resolved.end()); + } + state->schema.file.paths = std::move(resolvedPaths); auto optionsBuilder = std::make_unique(state); // register JsonDatasetBuilder to the reader to support json array format auto reader = std::make_unique( - state, std::move(optionsBuilder), fileInfo.fileSystem, + state, std::move(optionsBuilder), fs->toArrowFileSystem(), std::make_shared()); execution::Context ctx; auto localState = std::make_shared(); @@ -73,15 +79,20 @@ struct JsonReadFunction { // to be inferred. 
externalSchema.entry = std::make_shared(); externalSchema.file = schema; - // todo: get file system from vfs manager - LocalFileSystemProvider fsProvider; - auto fileInfo = fsProvider.provide(state->schema.file); - state->schema.file.paths = fileInfo.resolvedPaths; + const auto& vfs = neug::main::MetadataRegistry::getVFS(); + const auto& fs = vfs->Provide(state->schema.file); + auto resolvedPaths = std::vector(); + for (const auto& path : state->schema.file.paths) { + const auto& resolved = fs->glob(path); + resolvedPaths.insert(resolvedPaths.end(), resolved.begin(), + resolved.end()); + } + state->schema.file.paths = std::move(resolvedPaths); auto optionsBuilder = std::make_unique(state); // register JsonDatasetBuilder to the reader to support json array format auto reader = std::make_shared( - state, std::move(optionsBuilder), fileInfo.fileSystem, + state, std::move(optionsBuilder), fs->toArrowFileSystem(), std::make_shared()); auto sniffer = std::make_shared(reader); auto sniffResult = sniffer->sniff(); @@ -110,15 +121,21 @@ struct JsonLReadFunction { static execution::Context jsonLExecFunc( std::shared_ptr state) { // todo: get file system from vfs manager - LocalFileSystemProvider fsProvider; - auto fileInfo = fsProvider.provide(state->schema.file); - state->schema.file.paths = fileInfo.resolvedPaths; + const auto& vfs = neug::main::MetadataRegistry::getVFS(); + const auto& fs = vfs->Provide(state->schema.file); + auto resolvedPaths = std::vector(); + for (const auto& path : state->schema.file.paths) { + const auto& resolved = fs->glob(path); + resolvedPaths.insert(resolvedPaths.end(), resolved.begin(), + resolved.end()); + } + state->schema.file.paths = std::move(resolvedPaths); auto optionsBuilder = std::make_unique(state); // Arrow can support jsonl format by default, no need to register other // DatasetBuilder auto reader = std::make_unique( - state, std::move(optionsBuilder), fileInfo.fileSystem); + state, std::move(optionsBuilder), 
fs->toArrowFileSystem()); execution::Context ctx; auto localState = std::make_shared(); reader->read(localState, ctx); @@ -134,15 +151,21 @@ struct JsonLReadFunction { externalSchema.entry = std::make_shared(); externalSchema.file = schema; // todo: get file system from vfs manager - LocalFileSystemProvider fsProvider; - auto fileInfo = fsProvider.provide(state->schema.file); - state->schema.file.paths = fileInfo.resolvedPaths; + const auto& vfs = neug::main::MetadataRegistry::getVFS(); + const auto& fs = vfs->Provide(state->schema.file); + auto resolvedPaths = std::vector(); + for (const auto& path : state->schema.file.paths) { + const auto& resolved = fs->glob(path); + resolvedPaths.insert(resolvedPaths.end(), resolved.begin(), + resolved.end()); + } + state->schema.file.paths = std::move(resolvedPaths); auto optionsBuilder = std::make_unique(state); // Arrow can support jsonl format by default, no need to register other // DatasetBuilder auto reader = std::make_shared( - state, std::move(optionsBuilder), fileInfo.fileSystem); + state, std::move(optionsBuilder), fs->toArrowFileSystem()); auto sniffer = std::make_shared(reader); auto sniffResult = sniffer->sniff(); if (!sniffResult) { diff --git a/extension/json/src/json_export_function.cc b/extension/json/src/json_export_function.cc index e3d6a8836..ffbf8c48d 100644 --- a/extension/json/src/json_export_function.cc +++ b/extension/json/src/json_export_function.cc @@ -25,6 +25,7 @@ #include #include "neug/compiler/function/read_function.h" +#include "neug/compiler/main/metadata_registry.h" #include "neug/generated/proto/response/response.pb.h" #include "neug/utils/exception/exception.h" #include "neug/utils/property/types.h" @@ -426,10 +427,10 @@ static execution::Context jsonExecFunc( if (schema.paths.empty()) { THROW_INVALID_ARGUMENT_EXCEPTION("Schema paths is empty"); } - LocalFileSystemProvider fsProvider; - auto fileInfo = fsProvider.provide(schema, false); + const auto& vfs = 
neug::main::MetadataRegistry::getVFS(); + const auto& fs = vfs->Provide(schema); auto writer = std::make_shared( - schema, fileInfo.fileSystem, entry_schema); + schema, fs->toArrowFileSystem(), entry_schema); auto status = writer->write(ctx, graph); if (!status.ok()) { THROW_IO_EXCEPTION("Export failed: " + status.ToString()); @@ -463,10 +464,10 @@ static execution::Context jsonLExecFunc( if (schema.paths.empty()) { THROW_INVALID_ARGUMENT_EXCEPTION("Schema paths is empty"); } - LocalFileSystemProvider fsProvider; - auto fileInfo = fsProvider.provide(schema, false); + const auto& vfs = neug::main::MetadataRegistry::getVFS(); + const auto& fs = vfs->Provide(schema); auto writer = std::make_shared( - schema, fileInfo.fileSystem, entry_schema); + schema, fs->toArrowFileSystem(), entry_schema); auto status = writer->write(ctx, graph); if (!status.ok()) { THROW_IO_EXCEPTION("Export failed: " + status.ToString()); diff --git a/include/neug/compiler/binder/binder.h b/include/neug/compiler/binder/binder.h index ce2526049..1e3aa3922 100644 --- a/include/neug/compiler/binder/binder.h +++ b/include/neug/compiler/binder/binder.h @@ -187,11 +187,6 @@ class Binder { std::unique_ptr bindCopyToClause( const parser::Statement& statement); - std::unique_ptr bindExportDatabaseClause( - const parser::Statement& statement); - std::unique_ptr bindImportDatabaseClause( - const parser::Statement& statement); - static std::unique_ptr bindAttachDatabase( const parser::Statement& statement); static std::unique_ptr bindDetachDatabase( diff --git a/include/neug/compiler/binder/bound_export_database.h b/include/neug/compiler/binder/bound_export_database.h deleted file mode 100644 index 0a8971b7d..000000000 --- a/include/neug/compiler/binder/bound_export_database.h +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. - */ - -#pragma once -#include "neug/compiler/binder/binder.h" -#include "neug/compiler/binder/bound_statement.h" -#include "neug/compiler/binder/query/bound_regular_query.h" -#include "neug/compiler/common/copier_config/file_scan_info.h" - -namespace neug { -namespace binder { - -struct ExportedTableData { - std::string tableName; - std::unique_ptr regularQuery; - std::vector columnNames; - std::vector columnTypes; - - const std::vector& getColumnTypesRef() const { - return columnTypes; - } - const BoundRegularQuery* getRegularQuery() const { - return regularQuery.get(); - } -}; - -class BoundExportDatabase final : public BoundStatement { - static constexpr common::StatementType type_ = - common::StatementType::EXPORT_DATABASE; - - public: - BoundExportDatabase(std::string filePath, common::FileTypeInfo fileTypeInfo, - std::vector exportData, - common::case_insensitive_map_t csvOption) - : BoundStatement{type_, - BoundStatementResult::createSingleStringColumnResult()}, - exportData(std::move(exportData)), - boundFileInfo(std::move(fileTypeInfo), - std::vector{std::move(filePath)}) { - boundFileInfo.options = std::move(csvOption); - } - - std::string getFilePath() const { return boundFileInfo.filePaths[0]; } - common::FileType getFileType() const { - return boundFileInfo.fileTypeInfo.fileType; - } - common::case_insensitive_map_t getExportOptions() 
const { - return boundFileInfo.options; - } - const common::FileScanInfo* getBoundFileInfo() const { - return &boundFileInfo; - } - const std::vector* getExportData() const { - return &exportData; - } - - private: - std::vector exportData; - common::FileScanInfo boundFileInfo; -}; - -} // namespace binder -} // namespace neug diff --git a/include/neug/compiler/binder/bound_import_database.h b/include/neug/compiler/binder/bound_import_database.h deleted file mode 100644 index 4d480d42b..000000000 --- a/include/neug/compiler/binder/bound_import_database.h +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. 
- */ - -#pragma once -#include "neug/compiler/binder/bound_statement.h" - -namespace neug { -namespace binder { - -class BoundImportDatabase final : public BoundStatement { - public: - BoundImportDatabase(std::string filePath, std::string query, - std::string indexQuery) - : BoundStatement{common::StatementType::IMPORT_DATABASE, - BoundStatementResult::createSingleStringColumnResult()}, - filePath{std::move(filePath)}, - query{std::move(query)}, - indexQuery{std::move(indexQuery)} {} - - std::string getFilePath() const { return filePath; } - std::string getQuery() const { return query; } - std::string getIndexQuery() const { return indexQuery; } - - private: - std::string filePath; - // We concatenate queries based on the schema.cypher, copy.cypher, and - // macro.cypher files generated by exporting the database, resulting in - // queries such as "create node table xxx; create rel table xxx; copy from - // xxxx;". - std::string query; - std::string indexQuery; -}; - -} // namespace binder -} // namespace neug diff --git a/include/neug/compiler/common/file_system/compressed_file_system.h b/include/neug/compiler/common/file_system/compressed_file_system.h deleted file mode 100644 index b36e926f8..000000000 --- a/include/neug/compiler/common/file_system/compressed_file_system.h +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. - */ - -#pragma once - -#include "neug/compiler/common/file_system/file_system.h" -#include "neug/compiler/common/types/types.h" - -namespace neug { -namespace common { - -struct StreamData { - bool refresh = false; - std::unique_ptr inputBuf; - std::unique_ptr outputBuf; - uint8_t* inputBufStart = nullptr; - uint8_t* inputBufEnd = nullptr; - uint8_t* outputBufStart = nullptr; - uint8_t* outputBufEnd = nullptr; - common::idx_t inputBufSize = 0; - common::idx_t outputBufSize = 0; -}; - -struct CompressedFileInfo; - -struct StreamWrapper { - virtual ~StreamWrapper() = default; - virtual void initialize(CompressedFileInfo& file) = 0; - virtual bool read(StreamData& stream_data) = 0; - virtual void close() = 0; -}; - -class CompressedFileSystem : public FileSystem { - public: - virtual std::unique_ptr openCompressedFile( - std::unique_ptr fileInfo) = 0; - virtual std::unique_ptr createStream() = 0; - virtual idx_t getInputBufSize() = 0; - virtual idx_t getOutputBufSize() = 0; - - bool canPerformSeek() const override { return false; } - - protected: - std::vector glob(main::ClientContext* /*context*/, - const std::string& /*path*/) const override { - NEUG_UNREACHABLE; - } - - void readFromFile(FileInfo& /*fileInfo*/, void* /*buffer*/, - uint64_t /*numBytes*/, - uint64_t /*position*/) const override; - - int64_t readFile(FileInfo& fileInfo, void* buf, - size_t numBytes) const override; - - void writeFile(FileInfo& /*fileInfo*/, const uint8_t* /*buffer*/, - uint64_t /*numBytes*/, uint64_t /*offset*/) const override { - NEUG_UNREACHABLE; - } - - void reset(FileInfo& fileInfo) override; - - int64_t seek(FileInfo& /*fileInfo*/, uint64_t /*offset*/, - int /*whence*/) const override { - NEUG_UNREACHABLE; - } - - uint64_t getFileSize(const FileInfo& fileInfo) const override; - - 
void syncFile(const FileInfo& fileInfo) const override; -}; - -struct CompressedFileInfo : public FileInfo { - CompressedFileSystem& compressedFS; - std::unique_ptr childFileInfo; - StreamData streamData; - idx_t currentPos = 0; - std::unique_ptr stream_wrapper; - - CompressedFileInfo(CompressedFileSystem& compressedFS, - std::unique_ptr childFileInfo) - : FileInfo{childFileInfo->path, &compressedFS}, - compressedFS{compressedFS}, - childFileInfo{std::move(childFileInfo)} {} - ~CompressedFileInfo() override { close(); } - - void initialize(); - int64_t readData(void* buffer, size_t numBytes); - void close(); -}; - -} // namespace common -} // namespace neug diff --git a/include/neug/compiler/common/file_system/file_info.h b/include/neug/compiler/common/file_system/file_info.h deleted file mode 100644 index c010b4ad2..000000000 --- a/include/neug/compiler/common/file_system/file_info.h +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. 
- */ - -#pragma once - -#include -#include - -#include "neug/compiler/common/cast.h" -#include "neug/utils/api.h" - -namespace neug { -namespace common { - -class FileSystem; - -struct NEUG_API FileInfo { - FileInfo(std::string path, FileSystem* fileSystem) - : path{std::move(path)}, fileSystem{fileSystem} {} - - virtual ~FileInfo() = default; - - uint64_t getFileSize() const; - - void readFromFile(void* buffer, uint64_t numBytes, uint64_t position); - - int64_t readFile(void* buf, size_t nbyte); - - void writeFile(const uint8_t* buffer, uint64_t numBytes, uint64_t offset); - - void syncFile() const; - - int64_t seek(uint64_t offset, int whence); - - void reset(); - - void truncate(uint64_t size); - - bool canPerformSeek() const; - - template - TARGET* ptrCast() { - return common::neug_dynamic_cast(this); - } - - template - const TARGET* constPtrCast() const { - return common::neug_dynamic_cast(this); - } - - template - const TARGET& constCast() const { - return common::neug_dynamic_cast(*this); - } - - template - TARGET& cast() { - return common::neug_dynamic_cast(*this); - } - - const std::string path; - - FileSystem* fileSystem; -}; - -} // namespace common -} // namespace neug diff --git a/include/neug/compiler/common/file_system/file_system.h b/include/neug/compiler/common/file_system/file_system.h deleted file mode 100644 index 1eb3b5888..000000000 --- a/include/neug/compiler/common/file_system/file_system.h +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. - */ - -#pragma once - -#include -#include - -#include "file_info.h" -#include "neug/compiler/common/assert.h" -#include "neug/compiler/common/cast.h" - -namespace neug { -namespace main { -class ClientContext; -} // namespace main - -namespace common { - -enum class FileLockType : uint8_t { - NO_LOCK = 0, - READ_LOCK = 1, - WRITE_LOCK = 2 -}; - -enum class FileCompressionType : uint8_t { - AUTO_DETECT = 0, - UNCOMPRESSED = 1, - GZIP = 2, - // ZSTD = 3 -}; - -struct FileFlags { - static constexpr uint8_t READ_ONLY = 1 << 0; - static constexpr uint8_t WRITE = 1 << 1; - // Create file if not exists, can only be used together with WRITE - static constexpr uint8_t CREATE_IF_NOT_EXISTS = 1 << 3; - // Always create a new file. If a file exists, the file is truncated. Cannot - // be used together with CREATE_IF_NOT_EXISTS. - static constexpr uint8_t CREATE_AND_TRUNCATE_IF_EXISTS = 1 << 4; - // Temporary file that is not persisted to disk. - static constexpr uint8_t TEMPORARY = 1 << 5; -#ifdef _WIN32 - // Only used in windows to open files in binary mode. 
- static constexpr uint8_t BINARY = 1 << 5; -#endif -}; - -struct FileOpenFlags { - int flags; - FileLockType lockType = FileLockType::NO_LOCK; - FileCompressionType compressionType = FileCompressionType::AUTO_DETECT; - - explicit FileOpenFlags(int flags) : flags{flags} {} -}; - -class NEUG_API FileSystem { - friend struct FileInfo; - - public: - FileSystem() = default; - - explicit FileSystem(std::string homeDir) : homeDir(std::move(homeDir)) {} - - virtual ~FileSystem() = default; - - virtual std::unique_ptr openFile( - const std::string& /*path*/, FileOpenFlags /*flags*/, - main::ClientContext* /*context*/ = nullptr) { - NEUG_UNREACHABLE; - } - - virtual std::vector glob(main::ClientContext* /*context*/, - const std::string& /*path*/) const { - NEUG_UNREACHABLE; - } - - virtual void overwriteFile(const std::string& from, const std::string& to); - - virtual void copyFile(const std::string& from, const std::string& to); - - virtual void createDir(const std::string& dir) const; - - virtual void removeFileIfExists(const std::string& path); - - virtual bool fileOrPathExists(const std::string& path, - main::ClientContext* context = nullptr); - - virtual std::string expandPath(main::ClientContext* context, - const std::string& path) const; - - static std::string joinPath(const std::string& base, const std::string& part); - - static std::string getFileExtension(const std::filesystem::path& path); - - static bool isCompressedFile(const std::filesystem::path& path); - - static std::string getFileName(const std::filesystem::path& path); - - virtual bool canHandleFile(const std::string_view /*path*/) const { - NEUG_UNREACHABLE; - } - - virtual void syncFile(const FileInfo& fileInfo) const = 0; - - virtual bool canPerformSeek() const { return true; } - - template - TARGET* ptrCast() { - return common::neug_dynamic_cast(this); - } - - template - const TARGET* constPtrCast() const { - return common::neug_dynamic_cast(this); - } - - virtual void cleanUP(main::ClientContext* 
/*context*/) {} - - protected: - virtual void readFromFile(FileInfo& fileInfo, void* buffer, uint64_t numBytes, - uint64_t position) const = 0; - - virtual int64_t readFile(FileInfo& fileInfo, void* buf, - size_t numBytes) const = 0; - - virtual void writeFile(FileInfo& fileInfo, const uint8_t* buffer, - uint64_t numBytes, uint64_t offset) const; - - virtual int64_t seek(FileInfo& fileInfo, uint64_t offset, - int whence) const = 0; - - virtual void reset(FileInfo& fileInfo); - - virtual void truncate(FileInfo& fileInfo, uint64_t size) const; - - virtual uint64_t getFileSize(const FileInfo& fileInfo) const = 0; - - static bool isGZIPCompressed(const std::filesystem::path& path); - - std::string homeDir; -}; - -} // namespace common -} // namespace neug diff --git a/include/neug/compiler/common/file_system/local_file_system.h b/include/neug/compiler/common/file_system/local_file_system.h deleted file mode 100644 index bee29742f..000000000 --- a/include/neug/compiler/common/file_system/local_file_system.h +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. 
- */ - -#pragma once - -#include - -#include "file_system.h" - -namespace neug { -namespace common { - -struct LocalFileInfo : public FileInfo { -#ifdef _WIN32 - LocalFileInfo(std::string path, const void* handle, FileSystem* fileSystem) - : FileInfo{std::move(path), fileSystem}, handle{handle} {} -#else - LocalFileInfo(std::string path, const int fd, FileSystem* fileSystem) - : FileInfo{std::move(path), fileSystem}, fd{fd} {} -#endif - - ~LocalFileInfo() override; - -#ifdef _WIN32 - const void* handle; -#else - const int fd; -#endif -}; - -class NEUG_API LocalFileSystem final : public FileSystem { - public: - explicit LocalFileSystem(std::string homeDir) - : FileSystem(std::move(homeDir)) {} - - std::unique_ptr openFile( - const std::string& path, FileOpenFlags flags, - main::ClientContext* context = nullptr) override; - - std::vector glob(main::ClientContext* context, - const std::string& path) const override; - - void overwriteFile(const std::string& from, const std::string& to) override; - - void copyFile(const std::string& from, const std::string& to) override; - - void createDir(const std::string& dir) const override; - - void removeFileIfExists(const std::string& path) override; - - bool fileOrPathExists(const std::string& path, - main::ClientContext* context = nullptr) override; - - std::string expandPath(main::ClientContext* context, - const std::string& path) const override; - - void syncFile(const FileInfo& fileInfo) const override; - - static bool isLocalPath(const std::string& path); - - static bool fileExists(const std::string& filename); - - protected: - void readFromFile(FileInfo& fileInfo, void* buffer, uint64_t numBytes, - uint64_t position) const override; - - int64_t readFile(FileInfo& fileInfo, void* buf, size_t nbyte) const override; - - void writeFile(FileInfo& fileInfo, const uint8_t* buffer, uint64_t numBytes, - uint64_t offset) const override; - - int64_t seek(FileInfo& fileInfo, uint64_t offset, int whence) const override; - - void 
truncate(FileInfo& fileInfo, uint64_t size) const override; - - uint64_t getFileSize(const FileInfo& fileInfo) const override; -}; - -} // namespace common -} // namespace neug diff --git a/include/neug/compiler/common/file_system/virtual_file_system.h b/include/neug/compiler/common/file_system/virtual_file_system.h deleted file mode 100644 index 267341130..000000000 --- a/include/neug/compiler/common/file_system/virtual_file_system.h +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. 
- */ - -#pragma once - -#include -#include -#include - -#include "compressed_file_system.h" -#include "file_system.h" - -namespace neug { -namespace main { -class MetadataManager; -} - -namespace storage { -class BufferManager; -}; -namespace common { - -class NEUG_API VirtualFileSystem final : public FileSystem { - friend class storage::BufferManager; - - public: - VirtualFileSystem(); - explicit VirtualFileSystem(std::string homeDir); - - ~VirtualFileSystem() override; - - void registerFileSystem(std::unique_ptr fileSystem); - - std::unique_ptr openFile( - const std::string& path, FileOpenFlags flags, - main::ClientContext* context = nullptr) override; - - std::vector glob(main::ClientContext* context, - const std::string& path) const override; - - void overwriteFile(const std::string& from, const std::string& to) override; - - void createDir(const std::string& dir) const override; - - void removeFileIfExists(const std::string& path) override; - - bool fileOrPathExists(const std::string& path, - main::ClientContext* context = nullptr) override; - - std::string expandPath(main::ClientContext* context, - const std::string& path) const override; - - void syncFile(const FileInfo& fileInfo) const override; - - void cleanUP(main::ClientContext* context) override; - - protected: - void readFromFile(FileInfo& fileInfo, void* buffer, uint64_t numBytes, - uint64_t position) const override; - - int64_t readFile(FileInfo& fileInfo, void* buf, size_t nbyte) const override; - - void writeFile(FileInfo& fileInfo, const uint8_t* buffer, uint64_t numBytes, - uint64_t offset) const override; - - int64_t seek(FileInfo& fileInfo, uint64_t offset, int whence) const override; - - void truncate(FileInfo& fileInfo, uint64_t size) const override; - - uint64_t getFileSize(const FileInfo& fileInfo) const override; - - private: - FileSystem* findFileSystem(const std::string& path) const; - - FileCompressionType autoDetectCompressionType(const std::string& path) const; - - private: - 
std::vector> subSystems; - std::unique_ptr defaultFS; - std::unordered_map> - compressedFileSystem; -}; - -} // namespace common -} // namespace neug diff --git a/include/neug/compiler/common/serializer/buffered_file.h b/include/neug/compiler/common/serializer/buffered_file.h deleted file mode 100644 index a965060ed..000000000 --- a/include/neug/compiler/common/serializer/buffered_file.h +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. - */ - -#pragma once - -#include - -#include "neug/compiler/common/serializer/reader.h" -#include "neug/compiler/common/serializer/writer.h" - -namespace neug { -namespace common { - -struct FileInfo; - -class BufferedFileWriter final : public Writer { - public: - explicit BufferedFileWriter(FileInfo& fileInfo); - ~BufferedFileWriter() override; - - void write(const uint8_t* data, uint64_t size) override; - - void flush(); - void sync(); - - // Note: this function is reseting next file offset to be written. Make sure - // buffer is empty. 
- void setFileOffset(uint64_t fileOffset) { this->fileOffset = fileOffset; } - uint64_t getFileOffset() const { return fileOffset; } - void resetOffsets() { - fileOffset = 0; - bufferOffset = 0; - } - - uint64_t getFileSize() const; - FileInfo& getFileInfo() const { return fileInfo; } - - protected: - std::unique_ptr buffer; - uint64_t fileOffset, bufferOffset; - FileInfo& fileInfo; -}; - -class BufferedFileReader final : public Reader { - public: - explicit BufferedFileReader(std::unique_ptr fileInfo); - - void read(uint8_t* data, uint64_t size) override; - - bool finished() override; - - private: - std::unique_ptr buffer; - uint64_t fileOffset, bufferOffset; - std::unique_ptr fileInfo; - uint64_t fileSize; - uint64_t bufferSize; - - private: - void readNextPage(); -}; - -} // namespace common -} // namespace neug diff --git a/include/neug/compiler/extension/extension_api.h b/include/neug/compiler/extension/extension_api.h index 4b48b8ab6..b0d72ef71 100644 --- a/include/neug/compiler/extension/extension_api.h +++ b/include/neug/compiler/extension/extension_api.h @@ -18,11 +18,9 @@ #include #include #include -#include -#include "neug/compiler/catalog/catalog.h" -#include "neug/compiler/extension/extension.h" #include "neug/compiler/gopt/g_catalog.h" -#include "neug/compiler/gopt/g_catalog_holder.h" +#include "neug/compiler/main/metadata_manager.h" +#include "neug/compiler/main/metadata_registry.h" namespace neug { namespace extension { @@ -41,11 +39,9 @@ struct ExtensionInfo { class ExtensionAPI { public: - static void setCatalog(neug::catalog::Catalog* catalog); - template static void registerFunction(catalog::CatalogEntryType entryType) { - auto gCatalog = catalog::GCatalogHolder::getGCatalog(); + auto gCatalog = neug::main::MetadataRegistry::getCatalog(); if (gCatalog->containsFunction(&neug::transaction::DUMMY_TRANSACTION, T::name, false)) { return; @@ -57,12 +53,23 @@ class ExtensionAPI { template static void registerFunctionAlias(catalog::CatalogEntryType 
entryType) { - auto gCatalog = catalog::GCatalogHolder::getGCatalog(); + auto gCatalog = neug::main::MetadataRegistry::getCatalog(); gCatalog->addFunctionWithSignature(&neug::transaction::DUMMY_TRANSACTION, entryType, T::name, T::alias::getFunctionSet(), false); } + // Register file system factory for specific protocol. + // For example, register "file" protocol file system factory: + // registerFileSystem("file", [](const reader::FileSchema& schema) { + // return std::make_unique(schema); + // }); + static void registerFileSystem(const std::string& protocol, + neug::fsys::FileSystemFactory factory) { + auto vfs = neug::main::MetadataRegistry::getVFS(); + vfs->Register(protocol, std::move(factory)); + } + static void registerExtension(const ExtensionInfo& info); static const std::unordered_map& getLoadedExtensions(); diff --git a/include/neug/compiler/function/csv_read_function.h b/include/neug/compiler/function/csv_read_function.h index 7f663294b..4275cd5d9 100644 --- a/include/neug/compiler/function/csv_read_function.h +++ b/include/neug/compiler/function/csv_read_function.h @@ -21,6 +21,7 @@ #include #include "neug/compiler/function/function.h" #include "neug/compiler/function/read_function.h" +#include "neug/compiler/main/metadata_registry.h" #include "neug/execution/execute/ops/batch/batch_update_utils.h" #include "neug/utils/exception/exception.h" #include "neug/utils/reader/options.h" @@ -59,11 +60,13 @@ struct CSVReadFunction { auto& options = state->schema.file.options; // convert user-specified 'DELIMITER' to 'DELIM' for arrow csv options, all // options are case insensitive - if (options.contains("DELIMITER")) { - options.insert({"DELIM", options.at("DELIMITER")}); + auto it = options.find("DELIMITER"); + if (it != options.end()) { + options["DELIM"] = it->second; } - if (options.contains("DELIM")) { - auto value = options.at("DELIM"); + it = options.find("DELIM"); + if (it != options.end()) { + auto value = it->second; if (value.size() != 1) { 
THROW_INVALID_ARGUMENT_EXCEPTION( "Delimiter should be a single character: " + value); @@ -85,11 +88,13 @@ struct CSVReadFunction { auto& options = schema.options; // convert user-specified 'DELIMITER' to 'DELIM' for arrow csv options, all // options are case insensitive - if (options.contains("DELIMITER")) { - options.insert({"DELIM", options.at("DELIMITER")}); + auto it = options.find("DELIMITER"); + if (it != options.end()) { + options["DELIM"] = it->second; } - if (options.contains("DELIM")) { - auto value = options.at("DELIM"); + it = options.find("DELIM"); + if (it != options.end()) { + auto value = it->second; if (value.size() != 1) { THROW_INVALID_ARGUMENT_EXCEPTION( "Delimiter should be a single character: " + value); @@ -111,14 +116,19 @@ struct CSVReadFunction { static execution::Context execFunc( std::shared_ptr state) { validateAndConvertExecOptions(state); - // todo: get file system from vfs manager - LocalFileSystemProvider fsProvider; - auto fileInfo = fsProvider.provide(state->schema.file); - state->schema.file.paths = fileInfo.resolvedPaths; + const auto& vfs = neug::main::MetadataRegistry::getVFS(); + const auto& fs = vfs->Provide(state->schema.file); + auto resolvedPaths = std::vector(); + for (const auto& path : state->schema.file.paths) { + const auto& resolved = fs->glob(path); + resolvedPaths.insert(resolvedPaths.end(), resolved.begin(), + resolved.end()); + } + state->schema.file.paths = std::move(resolvedPaths); auto optionsBuilder = std::make_unique(state); auto reader = std::make_unique( - state, std::move(optionsBuilder), fileInfo.fileSystem); + state, std::move(optionsBuilder), fs->toArrowFileSystem()); execution::Context ctx; auto localState = std::make_shared(); reader->read(localState, ctx); @@ -134,14 +144,19 @@ struct CSVReadFunction { externalSchema.entry = std::make_shared(); externalSchema.file = schema; validateAndConvertSniffOptions(externalSchema.file); - // todo: get file system from vfs manager - LocalFileSystemProvider 
fsProvider; - auto fileInfo = fsProvider.provide(state->schema.file); - state->schema.file.paths = fileInfo.resolvedPaths; + const auto& vfs = neug::main::MetadataRegistry::getVFS(); + const auto& fs = vfs->Provide(state->schema.file); + auto resolvedPaths = std::vector(); + for (const auto& path : state->schema.file.paths) { + const auto& resolved = fs->glob(path); + resolvedPaths.insert(resolvedPaths.end(), resolved.begin(), + resolved.end()); + } + state->schema.file.paths = std::move(resolvedPaths); auto optionsBuilder = std::make_unique(state); auto reader = std::make_shared( - state, std::move(optionsBuilder), fileInfo.fileSystem); + state, std::move(optionsBuilder), fs->toArrowFileSystem()); auto sniffer = std::make_shared(reader); auto sniffResult = sniffer->sniff(); if (sniffResult) { diff --git a/include/neug/compiler/function/read_function.h b/include/neug/compiler/function/read_function.h index a89303558..a85989470 100644 --- a/include/neug/compiler/function/read_function.h +++ b/include/neug/compiler/function/read_function.h @@ -30,47 +30,6 @@ namespace neug { namespace function { -template -struct FileInfo { - // Stores the real path(s) of each file. The user-provided path may be a - // pattern path (i.e. /path/to/*.csv), and here we save the resolved real - // paths for each file. - std::vector resolvedPaths; - std::shared_ptr fileSystem; -}; - -template -class FileSystemProvider { - public: - // Return file info according to the protocol and paths specified in - // file_schema - // TODO: support different file systems by VFS manager in the future. - virtual FileInfo provide(const reader::FileSchema& schema, - bool resolvePaths = true) = 0; -}; - -class LocalFileSystemProvider - : public FileSystemProvider { - public: - // Simple implementation of a local file provider; - // TODO: should be replaced with a VFS manager in the future. 
- FileInfo provide(const reader::FileSchema& schema, - bool resolvePaths = true) override { - auto fs = std::make_shared(); - auto& paths = schema.paths; - std::vector resolvedPaths; - if (resolvePaths) { - for (auto& path : paths) { - auto files = neug::execution::ops::match_files_with_pattern(path); - resolvedPaths.insert(resolvedPaths.end(), files.begin(), files.end()); - } - } else { - resolvedPaths = paths; - } - return FileInfo{resolvedPaths, fs}; - } -}; - // The exec function invoked by data source operators to load data from external // data sources. using read_exec_func_t = std::function -#include "neug/compiler/storage/wal/wal.h" #include "neug/compiler/transaction/transaction.h" namespace neug { diff --git a/include/neug/compiler/gopt/g_vfs_holder.h b/include/neug/compiler/gopt/g_vfs_holder.h deleted file mode 100644 index 64cdfd7ba..000000000 --- a/include/neug/compiler/gopt/g_vfs_holder.h +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#pragma once - -#include "neug/compiler/common/file_system/virtual_file_system.h" -#include "neug/utils/exception/exception.h" - -namespace neug { -namespace common { - -class VFSHolder { - private: - static VirtualFileSystem* vfs; - - public: - VFSHolder() = delete; - - static void setVFS(VirtualFileSystem* fileSystem); - - static VirtualFileSystem* getVFS(); -}; - -} // namespace common -} // namespace neug \ No newline at end of file diff --git a/include/neug/compiler/main/metadata_manager.h b/include/neug/compiler/main/metadata_manager.h index 514443a28..69f4a691e 100644 --- a/include/neug/compiler/main/metadata_manager.h +++ b/include/neug/compiler/main/metadata_manager.h @@ -31,6 +31,7 @@ #include "neug/compiler/main/option_config.h" #include "neug/compiler/storage/buffer_manager/memory_manager.h" #include "neug/utils/api.h" +#include "neug/utils/file_sys/file_system.h" namespace neug { namespace common { @@ -84,6 +85,8 @@ class MetadataManager { NEUG_API catalog::Catalog* getCatalog() { return catalog.get(); } + NEUG_API neug::fsys::FileSystemRegistry* getVFS() const { return vfs.get(); } + void updateSchema(const std::filesystem::path& schemaPath); void updateSchema(const std::string& schema); @@ -103,7 +106,7 @@ class MetadataManager { mutable std::atomic_flag statsManagerLock = ATOMIC_FLAG_INIT; std::shared_ptr statsManager; std::unique_ptr memoryManager; - std::unique_ptr vfs; + std::unique_ptr vfs; std::unique_ptr extensionManager; }; diff --git a/include/neug/compiler/gopt/g_catalog_holder.h b/include/neug/compiler/main/metadata_registry.h similarity index 56% rename from include/neug/compiler/gopt/g_catalog_holder.h rename to include/neug/compiler/main/metadata_registry.h index 0a7c246af..36e588979 100644 --- a/include/neug/compiler/gopt/g_catalog_holder.h +++ b/include/neug/compiler/main/metadata_registry.h @@ -16,21 +16,27 @@ #pragma once #include "neug/compiler/gopt/g_catalog.h" -#include "neug/utils/exception/exception.h" +#include 
"neug/compiler/main/metadata_manager.h" namespace neug { -namespace catalog { +namespace main { -class GCatalogHolder { +class MetadataRegistry { private: - static GCatalog* gCatalog; + // MetadataManager is a single instance: there will be only one instance of + // MetadataManager in the lifetime of the database. + static MetadataManager* metadataManager; public: - GCatalogHolder() = delete; + MetadataRegistry() = delete; - static void setGCatalog(GCatalog* catalog); + static void registerMetadata(main::MetadataManager* metadataManager); - static GCatalog* getGCatalog(); + static MetadataManager* getMetadata(); + + static catalog::GCatalog* getCatalog(); + + static neug::fsys::FileSystemRegistry* getVFS(); }; -} // namespace catalog -} // namespace neug \ No newline at end of file +} // namespace main +} // namespace neug diff --git a/include/neug/compiler/planner/gopt_planner.h b/include/neug/compiler/planner/gopt_planner.h index 47a3fdf00..daf38f15c 100644 --- a/include/neug/compiler/planner/gopt_planner.h +++ b/include/neug/compiler/planner/gopt_planner.h @@ -4,16 +4,13 @@ #pragma once #include -#include #include #include #include "neug/compiler/common/case_insensitive_map.h" -#include "neug/compiler/gopt/g_alias_manager.h" -#include "neug/compiler/gopt/g_catalog_holder.h" -#include "neug/compiler/gopt/g_physical_convertor.h" #include "neug/compiler/main/client_context.h" #include "neug/compiler/main/metadata_manager.h" +#include "neug/compiler/main/metadata_registry.h" #include "neug/compiler/planner/graph_planner.h" namespace neug { @@ -32,6 +29,7 @@ class GOptPlanner : public neug::IGraphPlanner { GOptPlanner() : IGraphPlanner() { database = std::make_unique(); ctx = std::make_unique(database.get()); + neug::main::MetadataRegistry::registerMetadata(database.get()); } inline std::string type() const override { return "gopt"; } diff --git a/include/neug/compiler/storage/storage_version_info.h b/include/neug/compiler/storage/storage_version_info.h deleted file
mode 100644 index add074d18..000000000 --- a/include/neug/compiler/storage/storage_version_info.h +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. - */ - -#pragma once - -#include -#include -#include - -#include "neug/utils/api.h" - -namespace neug { -namespace storage { - -using storage_version_t = uint64_t; - -struct StorageVersionInfo { - static std::unordered_map - getStorageVersionInfo() { - return { - {"0.9.0", 37}, {"0.8.0", 36}, {"0.7.1.1", 35}, {"0.7.0", 34}, - {"0.6.0.6", 33}, {"0.6.0.5", 32}, {"0.6.0.2", 31}, {"0.6.0.1", 31}, - {"0.6.0", 28}, {"0.5.0", 28}, {"0.4.2", 27}, {"0.4.1", 27}, - {"0.4.0", 27}, {"0.3.2", 26}, {"0.3.1", 26}, {"0.3.0", 26}, - {"0.2.1", 25}, {"0.2.0", 25}, {"0.1.0", 24}, {"0.0.12.3", 24}, - {"0.0.12.2", 24}, {"0.0.12.1", 24}, {"0.0.12", 23}, {"0.0.11", 23}, - {"0.0.10", 23}, {"0.0.9", 23}, {"0.0.8", 17}, {"0.0.7", 15}, - {"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3", 1}}; - } - - static NEUG_API storage_version_t getStorageVersion(); - - static constexpr const char* MAGIC_BYTES = "NEUG"; -}; - -} // namespace storage -} // namespace neug diff --git a/include/neug/compiler/storage/wal/wal.h b/include/neug/compiler/storage/wal/wal.h 
deleted file mode 100644 index b19ab9369..000000000 --- a/include/neug/compiler/storage/wal/wal.h +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. - */ - -#pragma once - -#include -#include - -#include "neug/compiler/common/enums/rel_direction.h" -#include "neug/compiler/common/file_system/file_info.h" -#include "neug/compiler/common/serializer/buffered_file.h" -#include "neug/compiler/storage/wal/wal_record.h" - -namespace neug { -namespace binder { -struct BoundAlterInfo; -struct BoundCreateTableInfo; -} // namespace binder -namespace common { -class BufferedFileWriter; -class VirtualFileSystem; -class ValueVector; -} // namespace common - -namespace catalog { -class CatalogEntry; -struct SequenceRollbackData; -} // namespace catalog - -namespace storage { -class WALReplayer; -class WAL { - friend class WALReplayer; - - public: - WAL() {} - WAL(const std::string& directory, bool readOnly, - common::VirtualFileSystem* vfs, main::ClientContext* context) {} - - ~WAL() = default; - - void logCreateCatalogEntryRecord(catalog::CatalogEntry* catalogEntry, - bool isInternal); - void logCreateCatalogEntryRecord( - catalog::CatalogEntry* catalogEntry, - std::vector childrenEntries, 
bool isInternal); - void logDropCatalogEntryRecord(uint64_t tableID, - catalog::CatalogEntryType type); - void logAlterCatalogEntryRecord(const binder::BoundAlterInfo* alterInfo); - void logUpdateSequenceRecord(common::sequence_id_t sequenceID, - uint64_t kCount); - - void logTableInsertion(common::table_id_t tableID, - common::TableType tableType, common::row_idx_t numRows, - const std::vector& vectors); - void logNodeDeletion(common::table_id_t tableID, common::offset_t nodeOffset, - common::ValueVector* pkVector); - void logNodeUpdate(common::table_id_t tableID, common::column_id_t columnID, - common::offset_t nodeOffset, - common::ValueVector* propertyVector); - void logRelDelete(common::table_id_t tableID, - common::ValueVector* srcNodeVector, - common::ValueVector* dstNodeVector, - common::ValueVector* relIDVector); - void logRelDetachDelete(common::table_id_t tableID, - common::RelDataDirection direction, - common::ValueVector* srcNodeVector); - void logRelUpdate(common::table_id_t tableID, common::column_id_t columnID, - common::ValueVector* srcNodeVector, - common::ValueVector* dstNodeVector, - common::ValueVector* relIDVector, - common::ValueVector* propertyVector); - void logCopyTableRecord(common::table_id_t tableID); - - void logBeginTransaction(); - void logAndFlushCommit(); - void logRollback(); - void logAndFlushCheckpoint(); - - void clearWAL(); - - void flushAllPages(); - - void addToUpdatedTables(const common::table_id_t nodeTableID) { - updatedTables.insert(nodeTableID); - } - std::unordered_set& getUpdatedTables() { - return updatedTables; - } - - uint64_t getFileSize() const { return bufferedWriter->getFileSize(); } - - private: - void addNewWALRecordNoLock(const WALRecord& walRecord); - - private: - std::unordered_set updatedTables; - std::unique_ptr fileInfo; - std::shared_ptr bufferedWriter; - std::string directory; - std::mutex mtx; - common::VirtualFileSystem* vfs; -}; - -} // namespace storage -} // namespace neug diff --git 
a/include/neug/compiler/storage/wal/wal_record.h b/include/neug/compiler/storage/wal/wal_record.h deleted file mode 100644 index ccdda3075..000000000 --- a/include/neug/compiler/storage/wal/wal_record.h +++ /dev/null @@ -1,438 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. - */ - -#pragma once - -#include - -#include "neug/compiler/binder/ddl/bound_alter_info.h" -#include "neug/compiler/catalog/catalog_entry/catalog_entry.h" -#include "neug/compiler/catalog/catalog_entry/sequence_catalog_entry.h" -#include "neug/compiler/common/enums/rel_direction.h" -#include "neug/compiler/common/enums/table_type.h" -#include "neug/compiler/common/vector/value_vector.h" - -namespace neug { -namespace common { -class Serializer; -class Deserializer; -} // namespace common - -namespace storage { - -enum class WALRecordType : uint8_t { - INVALID_RECORD = - 0, // This is not used for any record. 0 is reserved to detect cases - // where we accidentally read from an empty buffer. 
- BEGIN_TRANSACTION_RECORD = 1, - COMMIT_RECORD = 2, - ROLLBACK_RECORD = 3, - COPY_TABLE_RECORD = 13, - CREATE_CATALOG_ENTRY_RECORD = 14, - DROP_CATALOG_ENTRY_RECORD = 16, - ALTER_TABLE_ENTRY_RECORD = 17, - UPDATE_SEQUENCE_RECORD = 18, - TABLE_INSERTION_RECORD = 30, - NODE_DELETION_RECORD = 31, - NODE_UDPATE_RECORD = 32, - REL_DELETION_RECORD = 33, - REL_DETACH_DELETE_RECORD = 34, - REL_UPDATE_RECORD = 35, - CHECKPOINT_RECORD = 50, -}; - -struct WALRecord { - WALRecordType type = WALRecordType::INVALID_RECORD; - - WALRecord() = default; - explicit WALRecord(WALRecordType type) : type{type} {} - virtual ~WALRecord() = default; - DELETE_COPY_DEFAULT_MOVE(WALRecord); - - virtual void serialize(common::Serializer& serializer) const; - static std::unique_ptr deserialize( - common::Deserializer& deserializer, - const main::ClientContext& clientContext); - - template - const TARGET& constCast() const { - return common::neug_dynamic_cast(*this); - } -}; - -struct BeginTransactionRecord final : WALRecord { - BeginTransactionRecord() - : WALRecord{WALRecordType::BEGIN_TRANSACTION_RECORD} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - common::Deserializer& deserializer); -}; - -struct CommitRecord final : WALRecord { - CommitRecord() : WALRecord{WALRecordType::COMMIT_RECORD} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - common::Deserializer& deserializer); -}; - -struct RollbackRecord final : WALRecord { - RollbackRecord() : WALRecord{WALRecordType::ROLLBACK_RECORD} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - common::Deserializer& deserializer); -}; - -struct CheckpointRecord final : WALRecord { - CheckpointRecord() : WALRecord{WALRecordType::CHECKPOINT_RECORD} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - 
common::Deserializer& deserializer); -}; - -struct CreateCatalogEntryRecord final : WALRecord { - catalog::CatalogEntry* catalogEntry; - std::vector childrenEntries; - std::unique_ptr ownedCatalogEntry; - std::vector> ownedChildrenEntries; - bool isInternal = false; - - CreateCatalogEntryRecord() - : WALRecord{WALRecordType::CREATE_CATALOG_ENTRY_RECORD}, - catalogEntry{nullptr} {} - CreateCatalogEntryRecord(catalog::CatalogEntry* catalogEntry, bool isInternal) - : WALRecord{WALRecordType::CREATE_CATALOG_ENTRY_RECORD}, - catalogEntry{catalogEntry}, - isInternal{isInternal} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - common::Deserializer& deserializer); -}; - -struct CopyTableRecord final : WALRecord { - common::table_id_t tableID; - - CopyTableRecord() - : WALRecord{WALRecordType::COPY_TABLE_RECORD}, - tableID{common::INVALID_TABLE_ID} {} - explicit CopyTableRecord(common::table_id_t tableID) - : WALRecord{WALRecordType::COPY_TABLE_RECORD}, tableID{tableID} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - common::Deserializer& deserializer); -}; - -struct DropCatalogEntryRecord final : WALRecord { - common::oid_t entryID; - catalog::CatalogEntryType entryType; - - DropCatalogEntryRecord() - : WALRecord{WALRecordType::DROP_CATALOG_ENTRY_RECORD}, - entryID{common::INVALID_OID}, - entryType{} {} - DropCatalogEntryRecord(common::table_id_t entryID, - catalog::CatalogEntryType entryType) - : WALRecord{WALRecordType::DROP_CATALOG_ENTRY_RECORD}, - entryID{entryID}, - entryType{entryType} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - common::Deserializer& deserializer); -}; - -struct AlterTableEntryRecord final : WALRecord { - const binder::BoundAlterInfo* alterInfo; - std::unique_ptr ownedAlterInfo; - - AlterTableEntryRecord() - : WALRecord{WALRecordType::ALTER_TABLE_ENTRY_RECORD}, - 
alterInfo{nullptr} {} - explicit AlterTableEntryRecord(const binder::BoundAlterInfo* alterInfo) - : WALRecord{WALRecordType::ALTER_TABLE_ENTRY_RECORD}, - alterInfo{alterInfo} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - common::Deserializer& deserializer); -}; - -struct UpdateSequenceRecord final : WALRecord { - common::sequence_id_t sequenceID; - uint64_t kCount; - - UpdateSequenceRecord() - : WALRecord{WALRecordType::UPDATE_SEQUENCE_RECORD}, - sequenceID{0}, - kCount{0} {} - UpdateSequenceRecord(common::sequence_id_t sequenceID, uint64_t kCount) - : WALRecord{WALRecordType::UPDATE_SEQUENCE_RECORD}, - sequenceID{sequenceID}, - kCount{kCount} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - common::Deserializer& deserializer); -}; - -struct TableInsertionRecord final : WALRecord { - common::table_id_t tableID; - common::TableType tableType; - common::row_idx_t numRows; - std::vector vectors; - std::vector> ownedVectors; - - TableInsertionRecord() - : WALRecord{WALRecordType::TABLE_INSERTION_RECORD}, - tableID{common::INVALID_TABLE_ID}, - tableType{common::TableType::UNKNOWN}, - numRows{0} {} - TableInsertionRecord(common::table_id_t tableID, common::TableType tableType, - common::row_idx_t numRows, - const std::vector& vectors) - : WALRecord{WALRecordType::TABLE_INSERTION_RECORD}, - tableID{tableID}, - tableType{tableType}, - numRows{numRows}, - vectors{vectors} {} - TableInsertionRecord( - common::table_id_t tableID, common::TableType tableType, - common::row_idx_t numRows, - std::vector> vectors) - : WALRecord{WALRecordType::TABLE_INSERTION_RECORD}, - tableID{tableID}, - tableType{tableType}, - numRows{numRows}, - ownedVectors{std::move(vectors)} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - common::Deserializer& deserializer, - const main::ClientContext& clientContext); -}; - 
-struct NodeDeletionRecord final : WALRecord { - common::table_id_t tableID; - common::offset_t nodeOffset; - common::ValueVector* pkVector; - std::unique_ptr ownedPKVector; - - NodeDeletionRecord() - : WALRecord{WALRecordType::NODE_DELETION_RECORD}, - tableID{common::INVALID_TABLE_ID}, - nodeOffset{common::INVALID_OFFSET}, - pkVector{nullptr} {} - NodeDeletionRecord(common::table_id_t tableID, common::offset_t nodeOffset, - common::ValueVector* pkVector) - : WALRecord{WALRecordType::NODE_DELETION_RECORD}, - tableID{tableID}, - nodeOffset{nodeOffset}, - pkVector{pkVector} {} - NodeDeletionRecord(common::table_id_t tableID, common::offset_t nodeOffset, - std::unique_ptr pkVector) - : WALRecord{WALRecordType::NODE_DELETION_RECORD}, - tableID{tableID}, - nodeOffset{nodeOffset}, - pkVector{nullptr}, - ownedPKVector{std::move(pkVector)} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - common::Deserializer& deserializer, - const main::ClientContext& clientContext); -}; - -struct NodeUpdateRecord final : WALRecord { - common::table_id_t tableID; - common::column_id_t columnID; - common::offset_t nodeOffset; - common::ValueVector* propertyVector; - std::unique_ptr ownedPropertyVector; - - NodeUpdateRecord() - : WALRecord{WALRecordType::NODE_UDPATE_RECORD}, - tableID{common::INVALID_TABLE_ID}, - columnID{common::INVALID_COLUMN_ID}, - nodeOffset{common::INVALID_OFFSET}, - propertyVector{nullptr} {} - NodeUpdateRecord(common::table_id_t tableID, common::column_id_t columnID, - common::offset_t nodeOffset, - common::ValueVector* propertyVector) - : WALRecord{WALRecordType::NODE_UDPATE_RECORD}, - tableID{tableID}, - columnID{columnID}, - nodeOffset{nodeOffset}, - propertyVector{propertyVector} {} - NodeUpdateRecord(common::table_id_t tableID, common::column_id_t columnID, - common::offset_t nodeOffset, - std::unique_ptr propertyVector) - : WALRecord{WALRecordType::NODE_UDPATE_RECORD}, - tableID{tableID}, - 
columnID{columnID}, - nodeOffset{nodeOffset}, - propertyVector{nullptr}, - ownedPropertyVector{std::move(propertyVector)} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - common::Deserializer& deserializer, - const main::ClientContext& clientContext); -}; - -struct RelDeletionRecord final : WALRecord { - common::table_id_t tableID; - common::ValueVector* srcNodeIDVector; - common::ValueVector* dstNodeIDVector; - common::ValueVector* relIDVector; - std::unique_ptr ownedSrcNodeIDVector; - std::unique_ptr ownedDstNodeIDVector; - std::unique_ptr ownedRelIDVector; - - RelDeletionRecord() - : WALRecord{WALRecordType::REL_DELETION_RECORD}, - tableID{common::INVALID_TABLE_ID}, - srcNodeIDVector{nullptr}, - dstNodeIDVector{nullptr}, - relIDVector{nullptr} {} - RelDeletionRecord(common::table_id_t tableID, - common::ValueVector* srcNodeIDVector, - common::ValueVector* dstNodeIDVector, - common::ValueVector* relIDVector) - : WALRecord{WALRecordType::REL_DELETION_RECORD}, - tableID{tableID}, - srcNodeIDVector{srcNodeIDVector}, - dstNodeIDVector{dstNodeIDVector}, - relIDVector{relIDVector} {} - RelDeletionRecord(common::table_id_t tableID, - std::unique_ptr srcNodeIDVector, - std::unique_ptr dstNodeIDVector, - std::unique_ptr relIDVector) - : WALRecord{WALRecordType::REL_DELETION_RECORD}, - tableID{tableID}, - srcNodeIDVector{nullptr}, - dstNodeIDVector{nullptr}, - relIDVector{nullptr}, - ownedSrcNodeIDVector{std::move(srcNodeIDVector)}, - ownedDstNodeIDVector{std::move(dstNodeIDVector)}, - ownedRelIDVector{std::move(relIDVector)} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - common::Deserializer& deserializer, - const main::ClientContext& clientContext); -}; - -struct RelDetachDeleteRecord final : WALRecord { - common::table_id_t tableID; - common::RelDataDirection direction; - common::ValueVector* srcNodeIDVector; - std::unique_ptr ownedSrcNodeIDVector; 
- - RelDetachDeleteRecord() - : WALRecord{WALRecordType::REL_DETACH_DELETE_RECORD}, - tableID{common::INVALID_TABLE_ID}, - direction{common::RelDataDirection::FWD}, - srcNodeIDVector{nullptr} {} - RelDetachDeleteRecord(common::table_id_t tableID, - common::RelDataDirection direction, - common::ValueVector* srcNodeIDVector) - : WALRecord{WALRecordType::REL_DETACH_DELETE_RECORD}, - tableID{tableID}, - direction{direction}, - srcNodeIDVector{srcNodeIDVector} {} - RelDetachDeleteRecord(common::table_id_t tableID, - common::RelDataDirection direction, - std::unique_ptr srcNodeIDVector) - : WALRecord{WALRecordType::REL_DETACH_DELETE_RECORD}, - tableID{tableID}, - direction{direction}, - srcNodeIDVector{nullptr}, - ownedSrcNodeIDVector{std::move(srcNodeIDVector)} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - common::Deserializer& deserializer, - const main::ClientContext& clientContext); -}; - -struct RelUpdateRecord final : WALRecord { - common::table_id_t tableID; - common::column_id_t columnID; - common::ValueVector* srcNodeIDVector; - common::ValueVector* dstNodeIDVector; - common::ValueVector* relIDVector; - common::ValueVector* propertyVector; - std::unique_ptr ownedSrcNodeIDVector; - std::unique_ptr ownedDstNodeIDVector; - std::unique_ptr ownedRelIDVector; - std::unique_ptr ownedPropertyVector; - - RelUpdateRecord() - : WALRecord{WALRecordType::REL_UPDATE_RECORD}, - tableID{common::INVALID_TABLE_ID}, - columnID{common::INVALID_COLUMN_ID}, - srcNodeIDVector{nullptr}, - dstNodeIDVector{nullptr}, - relIDVector{nullptr}, - propertyVector{nullptr} {} - RelUpdateRecord(common::table_id_t tableID, common::column_id_t columnID, - common::ValueVector* srcNodeIDVector, - common::ValueVector* dstNodeIDVector, - common::ValueVector* relIDVector, - common::ValueVector* propertyVector) - : WALRecord{WALRecordType::REL_UPDATE_RECORD}, - tableID{tableID}, - columnID{columnID}, - srcNodeIDVector{srcNodeIDVector}, - 
dstNodeIDVector{dstNodeIDVector}, - relIDVector{relIDVector}, - propertyVector{propertyVector} {} - RelUpdateRecord(common::table_id_t tableID, common::column_id_t columnID, - std::unique_ptr srcNodeIDVector, - std::unique_ptr dstNodeIDVector, - std::unique_ptr relIDVector, - std::unique_ptr propertyVector) - : WALRecord{WALRecordType::REL_UPDATE_RECORD}, - tableID{tableID}, - columnID{columnID}, - srcNodeIDVector{nullptr}, - dstNodeIDVector{nullptr}, - relIDVector{nullptr}, - propertyVector{nullptr}, - ownedSrcNodeIDVector{std::move(srcNodeIDVector)}, - ownedDstNodeIDVector{std::move(dstNodeIDVector)}, - ownedRelIDVector{std::move(relIDVector)}, - ownedPropertyVector{std::move(propertyVector)} {} - - void serialize(common::Serializer& serializer) const override; - static std::unique_ptr deserialize( - common::Deserializer& deserializer, - const main::ClientContext& clientContext); -}; - -} // namespace storage -} // namespace neug diff --git a/include/neug/utils/file_sys/file_system.h b/include/neug/utils/file_sys/file_system.h new file mode 100644 index 000000000..9d186de38 --- /dev/null +++ b/include/neug/utils/file_sys/file_system.h @@ -0,0 +1,65 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include "neug/utils/reader/reader.h" + +namespace neug { +namespace fsys { + +// Unified FileSystem interface for different protocols: local, http, s3, oss +class FileSystem { + public: + virtual ~FileSystem() = default; + // to support path regex patterns, i.e. /path/to/*.csv + virtual std::vector glob(const std::string& path) = 0; + // Currently, our read and write interfaces depend on arrow file system, so we + // return arrow file system here + virtual std::unique_ptr toArrowFileSystem() = 0; + // todo: add other methods like OpenFile, GetFileInfo ... +}; + +// Create specific FileSystem instance according to schema. +using FileSystemFactory = + std::function(const reader::FileSchema&)>; + +// Unified management of registered file system factories for different +// protocols +class FileSystemRegistry { + public: + FileSystemRegistry(); + ~FileSystemRegistry() = default; + + // register factory function for specific protocol + void Register(const std::string& protocol, FileSystemFactory factory); + + // create and return specific FileSystem instance according to schema + std::unique_ptr Provide(const reader::FileSchema& schema); + + private: + std::shared_mutex mtx; + std::unordered_map factories_; +}; +} // namespace fsys +} // namespace neug \ No newline at end of file diff --git a/src/compiler/binder/bind/CMakeLists.txt b/src/compiler/binder/bind/CMakeLists.txt index 0038f40f2..34e2ef01a 100644 --- a/src/compiler/binder/bind/CMakeLists.txt +++ b/src/compiler/binder/bind/CMakeLists.txt @@ -20,8 +20,6 @@ add_library( bind_transaction.cpp bind_updating_clause.cpp bind_extension.cpp - bind_export_database.cpp - bind_import_database.cpp bind_use_database.cpp bind_standalone_call_function.cpp bind_table_function.cpp) diff --git a/src/compiler/binder/bind/bind_export_database.cpp b/src/compiler/binder/bind/bind_export_database.cpp deleted file mode 100644 index 
e784da000..000000000 --- a/src/compiler/binder/bind/bind_export_database.cpp +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. - */ - -#include "neug/compiler/binder/bound_export_database.h" -#include "neug/compiler/binder/query/bound_regular_query.h" -#include "neug/compiler/catalog/catalog.h" -#include "neug/compiler/catalog/catalog_entry/index_catalog_entry.h" -#include "neug/compiler/catalog/catalog_entry/node_table_catalog_entry.h" -#include "neug/compiler/catalog/catalog_entry/rel_table_catalog_entry.h" -#include "neug/compiler/common/file_system/virtual_file_system.h" -#include "neug/compiler/common/string_utils.h" -#include "neug/compiler/main/client_context.h" -#include "neug/compiler/parser/parser.h" -#include "neug/compiler/parser/port_db.h" -#include "neug/compiler/parser/query/regular_query.h" -#include "neug/utils/exception/exception.h" - -using namespace neug::binder; -using namespace neug::common; -using namespace neug::parser; -using namespace neug::catalog; -using namespace neug::transaction; -using namespace neug::storage; - -namespace neug { -namespace binder { - -static std::vector getExportInfo( - const Catalog& catalog, main::ClientContext* context, 
Binder* binder) { - auto transaction = context->getTransaction(); - std::vector exportData; - for (auto tableEntry : - catalog.getTableEntries(transaction, false /*useInternal*/)) { - ExportedTableData tableData; - if (binder->bindExportTableData(tableData, *tableEntry, catalog, - transaction)) { - exportData.push_back(std::move(tableData)); - } - } - for (auto indexEntry : catalog.getIndexEntries(transaction)) { - ExportedTableData tableData; - auto tableToExport = indexEntry->getTableEntryToExport(context); - if (tableToExport == nullptr) { - continue; - } - binder->bindExportTableData(tableData, *tableToExport, catalog, - transaction); - exportData.push_back(std::move(tableData)); - } - return exportData; -} - -FileTypeInfo getFileType(case_insensitive_map_t& options) { - auto fileTypeInfo = FileTypeInfo{FileType::PARQUET, "PARQUET"}; - if (options.contains("FORMAT")) { - auto value = options.at("FORMAT"); - if (value.getDataType().getLogicalTypeID() != LogicalTypeID::STRING) { - THROW_BINDER_EXCEPTION("The type of format option must be a string."); - } - auto valueStr = value.getValue(); - StringUtils::toUpper(valueStr); - fileTypeInfo = FileTypeInfo{FileTypeUtils::fromString(valueStr), valueStr}; - options.erase("FORMAT"); - } - return fileTypeInfo; -} - -static void bindExportNodeTableDataQuery(const TableCatalogEntry& entry, - std::string& exportQuery) { - exportQuery = stringFormat("match (a:`{}`) return a.*", entry.getName()); -} - -static void bindExportRelTableDataQuery(const TableCatalogEntry& entry, - std::string& exportQuery, - const Catalog& catalog, - const Transaction* transaction) { - auto relTableEntry = entry.constPtrCast(); - auto& srcTableEntry = - catalog.getTableCatalogEntry(transaction, relTableEntry->getSrcTableID()) - ->constCast(); - auto& dstTableEntry = - catalog.getTableCatalogEntry(transaction, relTableEntry->getDstTableID()) - ->constCast(); - exportQuery = - stringFormat("match (a:`{}`)-[r:`{}`]->(b:`{}`) return a.{},b.{},r.*;", - 
srcTableEntry.getName(), relTableEntry->getName(), - dstTableEntry.getName(), srcTableEntry.getPrimaryKeyName(), - dstTableEntry.getPrimaryKeyName()); -} - -static bool bindExportQuery(std::string& exportQuery, - const TableCatalogEntry& entry, - const Catalog& catalog, - const Transaction* transaction) { - switch (entry.getTableType()) { - case TableType::NODE: { - bindExportNodeTableDataQuery(entry, exportQuery); - } break; - case TableType::REL: { - bindExportRelTableDataQuery(entry, exportQuery, catalog, transaction); - } break; - default: - return false; - } - return true; -} - -bool Binder::bindExportTableData(ExportedTableData& tableData, - const TableCatalogEntry& entry, - const Catalog& catalog, - const Transaction* transaction) { - std::string exportQuery; - tableData.tableName = entry.getName(); - if (!bindExportQuery(exportQuery, entry, catalog, transaction)) { - return false; - } - auto parsedStatement = Parser::parseQuery(exportQuery); - NEUG_ASSERT(parsedStatement.size() == 1); - auto parsedQuery = parsedStatement[0]->constPtrCast(); - clientContext->setUseInternalCatalogEntry(true /* useInternalCatalogEntry */); - auto query = bindQuery(*parsedQuery); - clientContext->setUseInternalCatalogEntry( - false /* useInternalCatalogEntry */); - auto columns = query->getStatementResult()->getColumns(); - for (auto& column : columns) { - auto columnName = - column->hasAlias() ? 
column->getAlias() : column->toString(); - tableData.columnNames.push_back(columnName); - tableData.columnTypes.push_back(column->getDataType().copy()); - } - tableData.regularQuery = std::move(query); - return true; -} - -std::unique_ptr Binder::bindExportDatabaseClause( - const Statement& statement) { - auto& exportDB = statement.constCast(); - auto boundFilePath = clientContext->getVFSUnsafe()->expandPath( - clientContext, exportDB.getFilePath()); - auto exportData = - getExportInfo(*clientContext->getCatalog(), clientContext, this); - auto parsedOptions = bindParsingOptions(exportDB.getParsingOptionsRef()); - auto fileTypeInfo = getFileType(parsedOptions); - switch (fileTypeInfo.fileType) { - case FileType::CSV: - case FileType::PARQUET: - break; - default: - THROW_BINDER_EXCEPTION( - "Export database currently only supports csv and parquet files."); - } - if (fileTypeInfo.fileType != FileType::CSV && parsedOptions.size() != 0) { - THROW_BINDER_EXCEPTION("Only export to csv can have options."); - } - return std::make_unique(boundFilePath, fileTypeInfo, - std::move(exportData), - std::move(parsedOptions)); -} - -} // namespace binder -} // namespace neug diff --git a/src/compiler/binder/bind/bind_extension.cpp b/src/compiler/binder/bind/bind_extension.cpp index 541363e77..dc4f0b4a6 100644 --- a/src/compiler/binder/bind/bind_extension.cpp +++ b/src/compiler/binder/bind/bind_extension.cpp @@ -22,7 +22,7 @@ #include "neug/compiler/binder/binder.h" #include "neug/compiler/binder/bound_extension_statement.h" -#include "neug/compiler/common/file_system/local_file_system.h" + #include "neug/compiler/common/string_utils.h" #include "neug/compiler/extension/extension.h" #include "neug/compiler/parser/extension_statement.h" diff --git a/src/compiler/binder/bind/bind_file_scan.cpp b/src/compiler/binder/bind/bind_file_scan.cpp index f97187043..b7a179ace 100644 --- a/src/compiler/binder/bind/bind_file_scan.cpp +++ b/src/compiler/binder/bind/bind_file_scan.cpp @@ -25,19 
+25,16 @@ #include "neug/compiler/binder/bound_scan_source.h" #include "neug/compiler/binder/expression/expression.h" #include "neug/compiler/binder/expression/literal_expression.h" -#include "neug/compiler/common/file_system/local_file_system.h" -#include "neug/compiler/common/file_system/virtual_file_system.h" #include "neug/compiler/common/string_format.h" #include "neug/compiler/common/string_utils.h" -#include "neug/compiler/extension/extension_manager.h" #include "neug/compiler/function/read_function.h" #include "neug/compiler/function/table/bind_input.h" #include "neug/compiler/gopt/g_type_converter.h" +#include "neug/compiler/main/client_context.h" #include "neug/compiler/parser/expression/parsed_function_expression.h" #include "neug/compiler/parser/scan_source.h" #include "neug/utils/exception/exception.h" #include "neug/utils/exception/message.h" -#include "neug/utils/reader/reader.h" #include "neug/utils/reader/schema.h" using namespace neug::parser; @@ -49,10 +46,22 @@ using namespace neug::catalog; namespace neug { namespace binder { +bool isCompressedFile(const std::filesystem::path& path) { + return StringUtils::getLower(path.extension().string()) == ".gz"; +} + +std::string getFileExtension(const std::filesystem::path& path) { + auto extension = path.extension(); + if (isCompressedFile(path)) { + extension = path.stem().extension(); + } + return extension.string(); +} + FileTypeInfo bindSingleFileType(const main::ClientContext* context, const std::string& filePath) { std::filesystem::path fileName(filePath); - auto extension = context->getVFSUnsafe()->getFileExtension(fileName); + auto extension = getFileExtension(fileName); return FileTypeInfo{ FileTypeUtils::getFileTypeFromExtension(extension), extension.substr(std::min(1, extension.length()))}; diff --git a/src/compiler/binder/bind/bind_import_database.cpp b/src/compiler/binder/bind/bind_import_database.cpp deleted file mode 100644 index ad4d592b6..000000000 --- 
a/src/compiler/binder/bind/bind_import_database.cpp +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. - */ - -#include "neug/compiler/binder/binder.h" -#include "neug/compiler/binder/bound_import_database.h" -#include "neug/compiler/common/copier_config/csv_reader_config.h" -#include "neug/compiler/common/file_system/virtual_file_system.h" -#include "neug/compiler/main/client_context.h" -#include "neug/compiler/parser/copy.h" -#include "neug/compiler/parser/parser.h" -#include "neug/compiler/parser/port_db.h" -#include "neug/utils/exception/exception.h" - -using namespace neug::common; -using namespace neug::parser; - -namespace neug { -namespace binder { - -static std::string getQueryFromFile(VirtualFileSystem* vfs, - const std::string& boundFilePath, - const std::string& fileName, - main::ClientContext* context) { - auto filePath = vfs->joinPath(boundFilePath, fileName); - if (!vfs->fileOrPathExists(filePath, context)) { - if (fileName == PortDBConstants::INDEX_FILE_NAME) { - return ""; - } - THROW_BINDER_EXCEPTION(stringFormat("File {} does not exist.", filePath)); - } - auto fileInfo = vfs->openFile(filePath, FileOpenFlags(FileFlags::READ_ONLY -#ifdef _WIN32 - | 
FileFlags::BINARY -#endif - )); - auto fsize = fileInfo->getFileSize(); - auto buffer = std::make_unique(fsize); - fileInfo->readFile(buffer.get(), fsize); - return std::string(buffer.get(), fsize); -} - -static std::string getColumnNamesToCopy(const CopyFrom& copyFrom) { - std::string columns = ""; - std::string delimiter = ""; - for (auto& column : copyFrom.getCopyColumnInfo().columnNames) { - columns += delimiter; - columns += "`" + column + "`"; - if (delimiter == "") { - delimiter = ","; - } - } - if (columns.empty()) { - return columns; - } - return stringFormat("({})", columns); -} - -static std::string getCopyFilePath(const std::string& boundFilePath, - const std::string& filePath) { - if (filePath[0] == '/' || (std::isalpha(filePath[0]) && filePath[1] == ':')) { - // Note: - // Unix absolute path starts with '/' - // Windows absolute path starts with "[DiskID]://" - // This code path is for backward compatability, we used to export the - // absolute path for csv files to copy.cypher files. - return filePath; - } - - auto path = boundFilePath + "/" + filePath; -#if defined(_WIN32) - // TODO(Ziyi): This is a temporary workaround because our parser requires - // input cypher queries to escape all special characters in string literal. - // E.g. The user input query is: 'IMPORT DATABASE 'C:\\db\\uw''. The parser - // removes any escaped characters and this function accepts the path parameter - // as 'C:\db\uw'. Then the ImportDatabase operator gives the file path to - // antlr4 parser directly without escaping any special characters in the path, - // which causes a parser exception. However, the parser exception is not - // thrown properly which leads to the undefined behaviour. 
- size_t pos = 0; - while ((pos = path.find('\\', pos)) != std::string::npos) { - path.replace(pos, 1, "\\\\"); - pos += 2; - } -#endif - return path; -} - -std::unique_ptr Binder::bindImportDatabaseClause( - const Statement& statement) { - auto& importDB = statement.constCast(); - auto fs = clientContext->getVFSUnsafe(); - auto boundFilePath = fs->expandPath(clientContext, importDB.getFilePath()); - if (!fs->fileOrPathExists(boundFilePath, clientContext)) { - THROW_BINDER_EXCEPTION( - stringFormat("Directory {} does not exist.", boundFilePath)); - } - std::string finalQueryStatements; - finalQueryStatements += getQueryFromFile( - fs, boundFilePath, PortDBConstants::SCHEMA_FILE_NAME, clientContext); - // replace the path in copy from statement with the bound path - auto copyQuery = getQueryFromFile( - fs, boundFilePath, PortDBConstants::COPY_FILE_NAME, clientContext); - if (!copyQuery.empty()) { - auto parsedStatements = Parser::parseQuery(copyQuery); - for (auto& parsedStatement : parsedStatements) { - NEUG_ASSERT(parsedStatement->getStatementType() == - StatementType::COPY_FROM); - auto& copyFromStatement = parsedStatement->constCast(); - NEUG_ASSERT(copyFromStatement.getSource()->type == ScanSourceType::FILE); - auto filePaths = copyFromStatement.getSource() - ->constPtrCast() - ->filePaths; - NEUG_ASSERT(filePaths.size() == 1); - auto fileTypeInfo = bindFileTypeInfo(filePaths); - std::string query; - auto copyFilePath = getCopyFilePath(boundFilePath, filePaths[0]); - auto columnNames = getColumnNamesToCopy(copyFromStatement); - if (fileTypeInfo.fileType == FileType::CSV) { - auto csvConfig = CSVReaderConfig::construct( - bindParsingOptions(copyFromStatement.getParsingOptions())); - query = stringFormat("COPY `{}` {} FROM \"{}\" {};", - copyFromStatement.getTableName(), columnNames, - copyFilePath, csvConfig.option.toCypher()); - } else { - query = stringFormat("COPY `{}` {} FROM \"{}\";", - copyFromStatement.getTableName(), columnNames, - copyFilePath); - } - 
finalQueryStatements += query; - } - } - return std::make_unique( - boundFilePath, finalQueryStatements, - getQueryFromFile(fs, boundFilePath, PortDBConstants::INDEX_FILE_NAME, - clientContext)); -} - -} // namespace binder -} // namespace neug diff --git a/src/compiler/binder/binder.cpp b/src/compiler/binder/binder.cpp index b6694d90f..605b0b42b 100644 --- a/src/compiler/binder/binder.cpp +++ b/src/compiler/binder/binder.cpp @@ -96,12 +96,6 @@ std::unique_ptr Binder::bind(const Statement& statement) { case StatementType::EXTENSION: { boundStatement = bindExtension(statement); } break; - case StatementType::EXPORT_DATABASE: { - boundStatement = bindExportDatabaseClause(statement); - } break; - case StatementType::IMPORT_DATABASE: { - boundStatement = bindImportDatabaseClause(statement); - } break; case StatementType::ATTACH_DATABASE: { boundStatement = bindAttachDatabase(statement); } break; diff --git a/src/compiler/catalog/catalog.cpp b/src/compiler/catalog/catalog.cpp index a4469af81..24e8b5b11 100644 --- a/src/compiler/catalog/catalog.cpp +++ b/src/compiler/catalog/catalog.cpp @@ -32,8 +32,6 @@ #include "neug/compiler/catalog/catalog_entry/scalar_macro_catalog_entry.h" #include "neug/compiler/catalog/catalog_entry/sequence_catalog_entry.h" #include "neug/compiler/catalog/catalog_entry/type_catalog_entry.h" -#include "neug/compiler/common/file_system/virtual_file_system.h" -#include "neug/compiler/common/serializer/buffered_file.h" #include "neug/compiler/common/serializer/deserializer.h" #include "neug/compiler/common/serializer/serializer.h" #include "neug/compiler/common/string_format.h" diff --git a/src/compiler/common/CMakeLists.txt b/src/compiler/common/CMakeLists.txt index 33d04eec1..a4702fe2a 100644 --- a/src/compiler/common/CMakeLists.txt +++ b/src/compiler/common/CMakeLists.txt @@ -6,7 +6,6 @@ add_subdirectory(serializer) add_subdirectory(task_system) add_subdirectory(types) add_subdirectory(vector) -add_subdirectory(file_system) 
add_library(compiler_common OBJECT diff --git a/src/compiler/common/file_system/CMakeLists.txt b/src/compiler/common/file_system/CMakeLists.txt deleted file mode 100644 index 683a8038d..000000000 --- a/src/compiler/common/file_system/CMakeLists.txt +++ /dev/null @@ -1,13 +0,0 @@ -add_library(neug_file_system - OBJECT - compressed_file_system.cpp - file_info.cpp - file_system.cpp - local_file_system.cpp - virtual_file_system.cpp) - -target_link_libraries(neug_file_system Glob) - -set(ALL_OBJECT_FILES - ${ALL_OBJECT_FILES} $ - PARENT_SCOPE) diff --git a/src/compiler/common/file_system/compressed_file_system.cpp b/src/compiler/common/file_system/compressed_file_system.cpp deleted file mode 100644 index c86d44f49..000000000 --- a/src/compiler/common/file_system/compressed_file_system.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. 
- */ - -#include "neug/compiler/common/file_system/compressed_file_system.h" - -#include - -#include "neug/utils/exception/exception.h" - -namespace neug { -namespace common { - -int64_t CompressedFileSystem::readFile(neug::common::FileInfo& fileInfo, - void* buf, size_t numBytes) const { - auto& compressedFileInfo = fileInfo.cast(); - return compressedFileInfo.readData(buf, numBytes); -} - -void CompressedFileSystem::reset(neug::common::FileInfo& fileInfo) { - auto& compressedFileInfo = fileInfo.cast(); - compressedFileInfo.childFileInfo->reset(); - compressedFileInfo.initialize(); -} - -uint64_t CompressedFileSystem::getFileSize( - const neug::common::FileInfo& fileInfo) const { - auto& compressedFileInfo = fileInfo.constCast(); - return compressedFileInfo.childFileInfo->getFileSize(); -} - -void CompressedFileSystem::syncFile( - const neug::common::FileInfo& fileInfo) const { - auto& compressedFileInfo = fileInfo.constCast(); - return compressedFileInfo.childFileInfo->syncFile(); -} - -void CompressedFileSystem::readFromFile(FileInfo& /*fileInfo*/, - void* /*buffer*/, uint64_t /*numBytes*/, - uint64_t /*position*/) const { - THROW_IO_EXCEPTION( - "Only sequential read is allowed in compressed file system."); -} - -void CompressedFileInfo::initialize() { - close(); - streamData.inputBufSize = compressedFS.getInputBufSize(); - streamData.outputBufSize = compressedFS.getOutputBufSize(); - streamData.inputBuf = std::make_unique(streamData.inputBufSize); - streamData.inputBufStart = streamData.inputBuf.get(); - streamData.inputBufEnd = streamData.inputBuf.get(); - streamData.outputBuf = std::make_unique(streamData.outputBufSize); - streamData.outputBufStart = streamData.outputBuf.get(); - streamData.outputBufEnd = streamData.outputBuf.get(); - currentPos = 0; - stream_wrapper = compressedFS.createStream(); - stream_wrapper->initialize(*this); -} - -int64_t CompressedFileInfo::readData(void* buffer, size_t numBytes) { - common::idx_t totalNumBytesRead = 0; - while 
(true) { - if (streamData.outputBufStart != streamData.outputBufEnd) { - auto available = std::min( - numBytes, streamData.outputBufEnd - streamData.outputBufStart); - memcpy(reinterpret_cast(buffer) + totalNumBytesRead, - streamData.outputBufStart, available); - streamData.outputBufStart += available; - totalNumBytesRead += available; - numBytes -= available; - if (numBytes == 0) { - return totalNumBytesRead; - } - } - if (!stream_wrapper) { - return totalNumBytesRead; - } - currentPos += streamData.inputBufEnd - streamData.inputBufStart; - streamData.outputBufStart = streamData.outputBuf.get(); - streamData.outputBufEnd = streamData.outputBuf.get(); - if (streamData.refresh && - (streamData.inputBufEnd == - streamData.inputBuf.get() + streamData.inputBufSize)) { - auto numBytesLeftInBuf = - streamData.inputBufEnd - streamData.inputBufStart; - memmove(streamData.inputBuf.get(), streamData.inputBufStart, - numBytesLeftInBuf); - streamData.inputBufStart = streamData.inputBuf.get(); - auto sz = - childFileInfo->readFile(streamData.inputBufStart + numBytesLeftInBuf, - streamData.inputBufSize - numBytesLeftInBuf); - streamData.inputBufEnd = - streamData.inputBufStart + numBytesLeftInBuf + sz; - if (sz <= 0) { - stream_wrapper.reset(); - break; - } - } - - if (streamData.inputBufStart == streamData.inputBufEnd) { - streamData.inputBufStart = streamData.inputBuf.get(); - streamData.inputBufEnd = streamData.inputBufStart; - auto sz = childFileInfo->readFile(streamData.inputBuf.get(), - streamData.inputBufSize); - if (sz <= 0) { - stream_wrapper.reset(); - break; - } - streamData.inputBufEnd = streamData.inputBufStart + sz; - } - - auto finished = stream_wrapper->read(streamData); - if (finished) { - stream_wrapper.reset(); - } - } - return totalNumBytesRead; -} - -void CompressedFileInfo::close() { - if (stream_wrapper) { - stream_wrapper->close(); - stream_wrapper.reset(); - } - streamData.inputBuf.reset(); - streamData.outputBuf.reset(); - streamData.outputBufStart = 
nullptr; - streamData.outputBufEnd = nullptr; - streamData.inputBufStart = nullptr; - streamData.inputBufEnd = nullptr; - streamData.inputBufSize = 0; - streamData.outputBufSize = 0; - streamData.refresh = false; -} - -} // namespace common -} // namespace neug diff --git a/src/compiler/common/file_system/file_info.cpp b/src/compiler/common/file_system/file_info.cpp deleted file mode 100644 index eccf709ed..000000000 --- a/src/compiler/common/file_system/file_info.cpp +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. 
- */ - -#include "neug/compiler/common/file_system/file_info.h" - -#include "neug/compiler/common/file_system/file_system.h" - -#if defined(_WIN32) -#include -#else -#include -#endif - -namespace neug { -namespace common { - -uint64_t FileInfo::getFileSize() const { - return fileSystem->getFileSize(*this); -} - -void FileInfo::readFromFile(void* buffer, uint64_t numBytes, - uint64_t position) { - fileSystem->readFromFile(*this, buffer, numBytes, position); -} - -int64_t FileInfo::readFile(void* buf, size_t nbyte) { - return fileSystem->readFile(*this, buf, nbyte); -} - -void FileInfo::writeFile(const uint8_t* buffer, uint64_t numBytes, - uint64_t offset) { - fileSystem->writeFile(*this, buffer, numBytes, offset); -} - -void FileInfo::syncFile() const { fileSystem->syncFile(*this); } - -int64_t FileInfo::seek(uint64_t offset, int whence) { - return fileSystem->seek(*this, offset, whence); -} - -void FileInfo::reset() { fileSystem->reset(*this); } - -void FileInfo::truncate(uint64_t size) { fileSystem->truncate(*this, size); } - -bool FileInfo::canPerformSeek() const { return fileSystem->canPerformSeek(); } - -} // namespace common -} // namespace neug diff --git a/src/compiler/common/file_system/file_system.cpp b/src/compiler/common/file_system/file_system.cpp deleted file mode 100644 index c5fb6a9ba..000000000 --- a/src/compiler/common/file_system/file_system.cpp +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. - */ - -#include "neug/compiler/common/file_system/file_system.h" - -#include "neug/compiler/common/string_utils.h" - -namespace neug { -namespace common { - -void FileSystem::overwriteFile(const std::string& /*from*/, - const std::string& /*to*/) { - NEUG_UNREACHABLE; -} - -void FileSystem::copyFile(const std::string& /*from*/, - const std::string& /*to*/) { - NEUG_UNREACHABLE; -} - -void FileSystem::createDir(const std::string& /*dir*/) const { - NEUG_UNREACHABLE; -} - -void FileSystem::removeFileIfExists(const std::string& /*path*/) { - NEUG_UNREACHABLE; -} - -bool FileSystem::fileOrPathExists(const std::string& /*path*/, - main::ClientContext* /*context*/) { - NEUG_UNREACHABLE; -} - -std::string FileSystem::expandPath(main::ClientContext* /*context*/, - const std::string& path) const { - return path; -} - -std::string FileSystem::joinPath(const std::string& base, - const std::string& part) { - return base + "/" + part; -} - -std::string FileSystem::getFileExtension(const std::filesystem::path& path) { - auto extension = path.extension(); - if (isCompressedFile(path)) { - extension = path.stem().extension(); - } - return extension.string(); -} - -bool FileSystem::isCompressedFile(const std::filesystem::path& path) { - return isGZIPCompressed(path); -} - -std::string FileSystem::getFileName(const std::filesystem::path& path) { - return path.filename().string(); -} - -void FileSystem::writeFile(FileInfo& /*fileInfo*/, const uint8_t* /*buffer*/, - uint64_t /*numBytes*/, uint64_t /*offset*/) const { - NEUG_UNREACHABLE; -} - -void FileSystem::truncate(FileInfo& /*fileInfo*/, uint64_t /*size*/) const { - NEUG_UNREACHABLE; -} - -void 
FileSystem::reset(neug::common::FileInfo& fileInfo) { - fileInfo.seek(0, SEEK_SET); -} - -bool FileSystem::isGZIPCompressed(const std::filesystem::path& path) { - return StringUtils::getLower(path.extension().string()) == ".gz"; -} - -} // namespace common -} // namespace neug diff --git a/src/compiler/common/file_system/local_file_system.cpp b/src/compiler/common/file_system/local_file_system.cpp deleted file mode 100644 index c54f19044..000000000 --- a/src/compiler/common/file_system/local_file_system.cpp +++ /dev/null @@ -1,580 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. 
- */ - -#include "neug/compiler/common/file_system/local_file_system.h" - -#include "glob/glob.hpp" -#include "neug/compiler/common/assert.h" -#include "neug/compiler/common/string_format.h" -#include "neug/compiler/common/string_utils.h" -#include "neug/compiler/common/system_message.h" -#include "neug/compiler/main/client_context.h" -#include "neug/compiler/main/settings.h" -#include "neug/utils/exception/exception.h" - -#if defined(_WIN32) -#include -#include -#include -#include "neug/compiler/common/windows_utils.h" -#else -#include -#include "sys/stat.h" -#endif - -#include - -#include - -namespace neug { -namespace common { - -LocalFileInfo::~LocalFileInfo() { -#ifdef _WIN32 - if (handle != nullptr) { - CloseHandle((HANDLE) handle); - } -#else - if (fd != -1) { - close(fd); - } -#endif -} - -static void validateFileFlags(uint8_t flags) { - const bool isRead = flags & FileFlags::READ_ONLY; - const bool isWrite = flags & FileFlags::WRITE; - NEUG_UNUSED(isRead); - NEUG_UNUSED(isWrite); - // Require either READ or WRITE (or both). - NEUG_ASSERT(isRead || isWrite); - // CREATE flags require writing. - NEUG_ASSERT(isWrite || !(flags & FileFlags::CREATE_IF_NOT_EXISTS)); - NEUG_ASSERT(isWrite || !(flags & FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS)); - // CREATE_IF_NOT_EXISTS and CREATE_AND_TRUNCATE_IF_EXISTS flags cannot be - // combined. 
- NEUG_ASSERT(!(flags & FileFlags::CREATE_IF_NOT_EXISTS && - flags & FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS)); -} - -std::unique_ptr LocalFileSystem::openFile( - const std::string& path, FileOpenFlags flags, - main::ClientContext* context) { - auto fullPath = expandPath(context, path); - auto fileFlags = flags.flags; - validateFileFlags(fileFlags); - - int openFlags = 0; - bool readMode = fileFlags & FileFlags::READ_ONLY; - bool writeMode = fileFlags & FileFlags::WRITE; - if (readMode && writeMode) { - openFlags = O_RDWR; - } else if (readMode) { - openFlags = O_RDONLY; - } else if (writeMode) { - openFlags = O_WRONLY; - } else { - // LCOV_EXCL_START - THROW_INTERNAL_EXCEPTION( - "READ, WRITE or both should be specified when opening a file."); - // LCOV_EXCL_STOP - } - if (writeMode) { - NEUG_ASSERT(fileFlags & FileFlags::WRITE); - if (fileFlags & FileFlags::CREATE_IF_NOT_EXISTS) { - openFlags |= O_CREAT; - } else if (fileFlags & FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS) { - openFlags |= O_CREAT | O_TRUNC; - } - } - -#if defined(_WIN32) - auto dwDesiredAccess = 0ul; - int dwCreationDisposition; - if (fileFlags & FileFlags::CREATE_IF_NOT_EXISTS) { - dwCreationDisposition = OPEN_ALWAYS; - } else if (fileFlags & FileFlags::CREATE_AND_TRUNCATE_IF_EXISTS) { - dwCreationDisposition = CREATE_ALWAYS; - } else { - dwCreationDisposition = OPEN_EXISTING; - } - auto dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE; - if (openFlags & (O_CREAT | O_WRONLY | O_RDWR)) { - dwDesiredAccess |= GENERIC_WRITE; - } - // O_RDONLY is 0 in practice, so openFlags & (O_RDONLY | O_RDWR) doesn't work. 
- if (!(openFlags & O_WRONLY)) { - dwDesiredAccess |= GENERIC_READ; - } - if (openFlags & FileFlags::BINARY) { - dwDesiredAccess |= _O_BINARY; - } - - HANDLE handle = - CreateFileA(fullPath.c_str(), dwDesiredAccess, dwShareMode, nullptr, - dwCreationDisposition, FILE_ATTRIBUTE_NORMAL, nullptr); - if (handle == INVALID_HANDLE_VALUE) { - THROW_IO_EXCEPTION(stringFormat( - "Cannot open file. path: {} - Error {}: {}", fullPath, GetLastError(), - std::system_category().message(GetLastError()))); - } - if (flags.lockType != FileLockType::NO_LOCK) { - DWORD dwFlags = flags.lockType == FileLockType::READ_LOCK - ? LOCKFILE_FAIL_IMMEDIATELY - : LOCKFILE_FAIL_IMMEDIATELY | LOCKFILE_EXCLUSIVE_LOCK; - OVERLAPPED overlapped = {0}; - overlapped.Offset = 0; - BOOL rc = LockFileEx(handle, dwFlags, 0 /*reserved*/, 1 /*numBytesLow*/, - 0 /*numBytesHigh*/, &overlapped); - if (!rc) { - THROW_IO_EXCEPTION( - "Could not set lock on file : " + fullPath + "\n" + - "See the docs: https://docs.kuzudb.com/concurrency for " - "more information."); - } - } - return std::make_unique(fullPath, handle, this); -#else - int fd = open(fullPath.c_str(), openFlags, 0644); - if (fd == -1) { - THROW_IO_EXCEPTION( - stringFormat("Cannot open file {}: {}", fullPath, posixErrMessage())); - } - if (flags.lockType != FileLockType::NO_LOCK) { - struct flock fl {}; - memset(&fl, 0, sizeof fl); - fl.l_type = flags.lockType == FileLockType::READ_LOCK ? 
F_RDLCK : F_WRLCK; - fl.l_whence = SEEK_SET; - fl.l_start = 0; - fl.l_len = 0; - int rc = fcntl(fd, F_SETLK, &fl); - if (rc == -1) { - THROW_IO_EXCEPTION( - "Could not set lock on file : " + fullPath + "\n" + - "See the docs: https://docs.kuzudb.com/concurrency for " - "more information."); - } - } - return std::make_unique(fullPath, fd, this); -#endif -} - -std::vector LocalFileSystem::glob(main::ClientContext* context, - const std::string& path) const { - if (path.empty()) { - return std::vector(); - } - std::vector pathsToGlob; - if (path[0] == '/' || (std::isalpha(path[0]) && path[1] == ':')) { - // Note: - // Unix absolute path starts with '/' - // Windows absolute path starts with "[DiskID]://" - pathsToGlob.push_back(path); - } else if (path[0] == '~') { - // Expands home directory - auto homeDirectory = - context->getCurrentSetting(main::HomeDirectorySetting::name) - .getValue(); - pathsToGlob.push_back(homeDirectory + path.substr(1)); - } else { - // Relative path to the file search path. 
- auto globbedPaths = glob::glob(path); - if (!globbedPaths.empty()) { - pathsToGlob.push_back(path); - } else { - auto fileSearchPath = - context->getCurrentSetting(main::FileSearchPathSetting::name) - .getValue(); - if (fileSearchPath != "") { - auto searchPaths = common::StringUtils::split(fileSearchPath, ","); - for (auto& searchPath : searchPaths) { - pathsToGlob.push_back( - common::stringFormat("{}/{}", searchPath, path)); - } - } - } - } - std::vector result; - for (auto& pathToGlob : pathsToGlob) { - for (auto& resultPath : glob::glob(pathToGlob)) { - result.emplace_back(resultPath.string()); - } - } - return result; -} - -void LocalFileSystem::overwriteFile(const std::string& from, - const std::string& to) { - if (!fileOrPathExists(from) || !fileOrPathExists(to)) - return; - std::error_code errorCode; - if (!std::filesystem::copy_file( - from, to, std::filesystem::copy_options::overwrite_existing, - errorCode)) { - // LCOV_EXCL_START - THROW_IO_EXCEPTION( - stringFormat("Error copying file {} to {}. ErrorMessage: {}", from, to, - errorCode.message())); - // LCOV_EXCL_STOP - } -} - -void LocalFileSystem::copyFile(const std::string& from, const std::string& to) { - if (!fileOrPathExists(from)) - return; - std::error_code errorCode; - if (!std::filesystem::copy_file(from, to, std::filesystem::copy_options::none, - errorCode)) { - // LCOV_EXCL_START - THROW_IO_EXCEPTION( - stringFormat("Error copying file {} to {}. ErrorMessage: {}", from, to, - errorCode.message())); - // LCOV_EXCL_STOP - } -} - -void LocalFileSystem::createDir(const std::string& dir) const { - try { - if (std::filesystem::exists(dir)) { - // LCOV_EXCL_START - THROW_IO_EXCEPTION(stringFormat("Directory {} already exists.", dir)); - // LCOV_EXCL_STOP - } - auto directoryToCreate = dir; - if (directoryToCreate.ends_with('/') -#if defined(_WIN32) - || directoryToCreate.ends_with('\\') -#endif - ) { - // This is a known issue with std::filesystem::create_directories. 
(link: - // https://github.com/llvm/llvm-project/issues/60634). We have to manually - // remove the last '/' if the path ends with '/'. (Added the second one - // for windows) - directoryToCreate = - directoryToCreate.substr(0, directoryToCreate.size() - 1); - } - std::error_code errCode; - if (!std::filesystem::create_directories(directoryToCreate, errCode)) { - // LCOV_EXCL_START - THROW_IO_EXCEPTION(stringFormat( - "Directory {} cannot be created. Check if it exists and remove it.", - directoryToCreate)); - // LCOV_EXCL_STOP - } - if (errCode) { - // LCOV_EXCL_START - THROW_IO_EXCEPTION( - stringFormat("Failed to create directory: {}, error message: {}.", - dir, errCode.message())); - // LCOV_EXCL_STOP - } - } catch (std::exception& e) { - // LCOV_EXCL_START - THROW_IO_EXCEPTION(stringFormat("Failed to create directory {} due to: {}", - dir, e.what())); - // LCOV_EXCL_STOP - } -} - -bool isSubdirectory(const std::filesystem::path& base, - const std::filesystem::path& sub) { - try { - // Resolve paths to their canonical form - auto canonicalBase = std::filesystem::canonical(base); - auto canonicalSub = std::filesystem::canonical(sub); - - std::string relative = - std::filesystem::relative(canonicalSub, canonicalBase).string(); - // Size check for a "." result. - // If the path starts with "..", it's not a subdirectory. 
- return !relative.empty() && !(relative.starts_with("..")); - - } catch (const std::filesystem::filesystem_error& e) { - // Handle errors, e.g., if paths don't exist - std::cerr << "Filesystem error: " << e.what() << std::endl; - return false; - } -} - -void LocalFileSystem::removeFileIfExists(const std::string& path) { - if (!fileOrPathExists(path)) { - return; - } - if (!isSubdirectory(homeDir, path)) { - THROW_IO_EXCEPTION(stringFormat( - "Error: Path {} is not within the allowed home directory {}", path, - homeDir)); - } - std::error_code errCode; - bool success = false; - if (std::filesystem::is_directory(path)) { - success = std::filesystem::remove_all(path, errCode); - } else { - success = std::filesystem::remove(path, errCode); - } - if (!success) { - // LCOV_EXCL_START - THROW_IO_EXCEPTION( - stringFormat("Error removing directory or file {}. Error Message: {}", - path, errCode.message())); - // LCOV_EXCL_STOP - } -} - -bool LocalFileSystem::fileOrPathExists(const std::string& path, - main::ClientContext* /*context*/) { - return std::filesystem::exists(path); -} - -#ifndef _WIN32 -bool LocalFileSystem::fileExists(const std::string& filename) { - if (!filename.empty()) { - if (access(filename.c_str(), 0) == 0) { - struct stat status = {}; - stat(filename.c_str(), &status); - if (S_ISREG(status.st_mode)) { - return true; - } - } - } - // if any condition fails - return false; -} -#else -bool LocalFileSystem::fileExists(const std::string& filename) { - auto unicode_path = WindowsUtils::utf8ToUnicode(filename.c_str()); - const wchar_t* wpath = unicode_path.c_str(); - if (_waccess(wpath, 0) == 0) { - struct _stati64 status = {}; - _wstati64(wpath, &status); - if (status.st_mode & S_IFREG) { - return true; - } - } - return false; -} -#endif - -std::string LocalFileSystem::expandPath(main::ClientContext* context, - const std::string& path) const { - auto fullPath = path; - if (path.starts_with('~')) { - fullPath = 
context->getCurrentSetting(main::HomeDirectorySetting::name) - .getValue() + - fullPath.substr(1); - } - return fullPath; -} - -bool LocalFileSystem::isLocalPath(const std::string& path) { - return path.rfind("s3://", 0) != 0 && path.rfind("gs://", 0) != 0 && - path.rfind("gcs://", 0) != 0 && path.rfind("http://", 0) != 0 && - path.rfind("https://", 0) != 0; -} - -void LocalFileSystem::readFromFile(FileInfo& fileInfo, void* buffer, - uint64_t numBytes, uint64_t position) const { - auto localFileInfo = fileInfo.constPtrCast(); -#if defined(_WIN32) - DWORD numBytesRead; - OVERLAPPED overlapped{0, 0, 0, 0}; - overlapped.Offset = position & 0xffffffff; - overlapped.OffsetHigh = position >> 32; - if (!ReadFile((HANDLE) localFileInfo->handle, buffer, numBytes, &numBytesRead, - &overlapped)) { - auto error = GetLastError(); - THROW_IO_EXCEPTION(stringFormat( - "Cannot read from file: {} handle: {} " - "numBytesRead: {} numBytesToRead: {} position: {}. Error {}: {}", - fileInfo.path, (intptr_t) localFileInfo->handle, numBytesRead, numBytes, - position, error, std::system_category().message(error))); - } - if (numBytesRead != numBytes && - fileInfo.getFileSize() != position + numBytesRead) { - THROW_IO_EXCEPTION( - stringFormat("Cannot read from file: {} handle: {} " - "numBytesRead: {} numBytesToRead: {} position: {}", - fileInfo.path, (intptr_t) localFileInfo->handle, - numBytesRead, numBytes, position)); - } -#else - auto numBytesRead = pread(localFileInfo->fd, buffer, numBytes, position); - if ((uint64_t) numBytesRead != numBytes && - localFileInfo->getFileSize() != position + numBytesRead) { - // LCOV_EXCL_START - THROW_IO_EXCEPTION(stringFormat( - "Cannot read from file: {} fileDescriptor: {} " - "numBytesRead: {} numBytesToRead: {} position: {}", - fileInfo.path, localFileInfo->fd, numBytesRead, numBytes, position)); - // LCOV_EXCL_STOP - } -#endif -} - -int64_t LocalFileSystem::readFile(FileInfo& fileInfo, void* buf, - size_t nbyte) const { - auto localFileInfo = 
fileInfo.constPtrCast(); -#if defined(_WIN32) - DWORD numBytesRead; - ReadFile((HANDLE) localFileInfo->handle, buf, nbyte, &numBytesRead, nullptr); - return numBytesRead; -#else - return read(localFileInfo->fd, buf, nbyte); -#endif -} - -void LocalFileSystem::writeFile(FileInfo& fileInfo, const uint8_t* buffer, - uint64_t numBytes, uint64_t offset) const { - auto localFileInfo = fileInfo.constPtrCast(); - uint64_t remainingNumBytesToWrite = numBytes; - uint64_t bufferOffset = 0; - // Split large writes to 1GB at a time - uint64_t maxBytesToWriteAtOnce = 1ull << 30; // 1ull << 30 = 1G - while (remainingNumBytesToWrite > 0) { - uint64_t numBytesToWrite = - std::min(remainingNumBytesToWrite, maxBytesToWriteAtOnce); - -#if defined(_WIN32) - DWORD numBytesWritten; - OVERLAPPED overlapped{0, 0, 0, 0}; - overlapped.Offset = offset & 0xffffffff; - overlapped.OffsetHigh = offset >> 32; - if (!WriteFile((HANDLE) localFileInfo->handle, buffer + bufferOffset, - numBytesToWrite, &numBytesWritten, &overlapped)) { - auto error = GetLastError(); - THROW_IO_EXCEPTION(stringFormat( - "Cannot write to file. path: {} handle: {} offsetToWrite: {} " - "numBytesToWrite: {} numBytesWritten: {}. Error {}: {}.", - fileInfo.path, (intptr_t) localFileInfo->handle, offset, - numBytesToWrite, numBytesWritten, error, - std::system_category().message(error))); - } -#else - auto numBytesWritten = pwrite(localFileInfo->fd, buffer + bufferOffset, - numBytesToWrite, offset); - if (numBytesWritten != (int64_t) numBytesToWrite) { - // LCOV_EXCL_START - THROW_IO_EXCEPTION(stringFormat( - "Cannot write to file. path: {} fileDescriptor: {} offsetToWrite: {} " - "numBytesToWrite: {} numBytesWritten: {}. 
Error: {}", - fileInfo.path, localFileInfo->fd, offset, numBytesToWrite, - numBytesWritten, posixErrMessage())); - // LCOV_EXCL_STOP - } -#endif - remainingNumBytesToWrite -= numBytesWritten; - offset += numBytesWritten; - bufferOffset += numBytesWritten; - } -} - -void LocalFileSystem::syncFile(const FileInfo& fileInfo) const { - auto localFileInfo = fileInfo.constPtrCast(); -#if defined(_WIN32) - // Note that `FlushFileBuffers` returns 0 when fail, while `fsync` returns 0 - // when succeed. - if (FlushFileBuffers((HANDLE) localFileInfo->handle) == 0) { - auto error = GetLastError(); - THROW_IO_EXCEPTION(stringFormat("Failed to sync file {}. Error {}: {}", - fileInfo.path, error, - std::system_category().message(error))); - } -#else - if (fsync(localFileInfo->fd) != 0) { - THROW_IO_EXCEPTION(stringFormat("Failed to sync file {}.", fileInfo.path)); - } -#endif -} - -int64_t LocalFileSystem::seek(FileInfo& fileInfo, uint64_t offset, - int whence) const { - auto localFileInfo = fileInfo.constPtrCast(); -#if defined(_WIN32) - LARGE_INTEGER result; - LARGE_INTEGER offset_; - offset_.QuadPart = offset; - SetFilePointerEx((HANDLE) localFileInfo->handle, offset_, &result, whence); - return result.QuadPart; -#else - return lseek(localFileInfo->fd, offset, whence); -#endif -} - -void LocalFileSystem::truncate(FileInfo& fileInfo, uint64_t size) const { - auto localFileInfo = fileInfo.constPtrCast(); -#if defined(_WIN32) - auto offsetHigh = (LONG) (size >> 32); - LONG* offsetHighPtr = NULL; - if (offsetHigh > 0) - offsetHighPtr = &offsetHigh; - if (SetFilePointer((HANDLE) localFileInfo->handle, size & 0xffffffff, - offsetHighPtr, FILE_BEGIN) == INVALID_SET_FILE_POINTER) { - auto error = GetLastError(); - THROW_IO_EXCEPTION( - stringFormat("Cannot set file pointer for file: {} handle: {} " - "new position: {}. 
Error {}: {}", - fileInfo.path, (intptr_t) localFileInfo->handle, size, - error, std::system_category().message(error))); - } - if (!SetEndOfFile((HANDLE) localFileInfo->handle)) { - auto error = GetLastError(); - THROW_IO_EXCEPTION( - stringFormat("Cannot truncate file: {} handle: {} " - "size: {}. Error {}: {}", - fileInfo.path, (intptr_t) localFileInfo->handle, size, - error, std::system_category().message(error))); - } -#else - if (ftruncate(localFileInfo->fd, size) < 0) { - // LCOV_EXCL_START - THROW_IO_EXCEPTION(stringFormat("Failed to truncate file {}: {}", - fileInfo.path, posixErrMessage())); - // LCOV_EXCL_STOP - } -#endif -} - -uint64_t LocalFileSystem::getFileSize(const FileInfo& fileInfo) const { - auto localFileInfo = fileInfo.constPtrCast(); -#ifdef _WIN32 - LARGE_INTEGER size; - if (!GetFileSizeEx((HANDLE) localFileInfo->handle, &size)) { - auto error = GetLastError(); - THROW_IO_EXCEPTION( - stringFormat("Cannot read size of file. path: {} - Error {}: {}", - fileInfo.path, error, systemErrMessage(error))); - } - return size.QuadPart; -#else - struct stat s {}; - if (fstat(localFileInfo->fd, &s) == -1) { - THROW_IO_EXCEPTION( - stringFormat("Cannot read size of file. path: {} - Error {}: {}", - fileInfo.path, errno, posixErrMessage())); - } - NEUG_ASSERT(s.st_size >= 0); - return s.st_size; -#endif -} - -} // namespace common -} // namespace neug diff --git a/src/compiler/common/file_system/virtual_file_system.cpp b/src/compiler/common/file_system/virtual_file_system.cpp deleted file mode 100644 index 34d8409e5..000000000 --- a/src/compiler/common/file_system/virtual_file_system.cpp +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. - */ - -#include "neug/compiler/common/file_system/virtual_file_system.h" - -#include "neug/compiler/common/assert.h" -#include "neug/compiler/common/file_system/local_file_system.h" -#include "neug/compiler/main/client_context.h" - -namespace neug { -namespace common { - -VirtualFileSystem::VirtualFileSystem() : VirtualFileSystem{""} {} - -VirtualFileSystem::VirtualFileSystem(std::string homeDir) { - defaultFS = std::make_unique(homeDir); -} - -VirtualFileSystem::~VirtualFileSystem() = default; - -void VirtualFileSystem::registerFileSystem( - std::unique_ptr fileSystem) { - subSystems.push_back(std::move(fileSystem)); -} - -FileCompressionType VirtualFileSystem::autoDetectCompressionType( - const std::string& path) const { - if (isGZIPCompressed(path)) { - return FileCompressionType::GZIP; - } - return FileCompressionType::UNCOMPRESSED; -} - -std::unique_ptr VirtualFileSystem::openFile( - const std::string& path, FileOpenFlags flags, - main::ClientContext* context) { - auto compressionType = flags.compressionType; - if (compressionType == FileCompressionType::AUTO_DETECT) { - compressionType = autoDetectCompressionType(path); - } - auto fileHandle = findFileSystem(path)->openFile(path, flags, context); - if (compressionType == FileCompressionType::UNCOMPRESSED) { - return fileHandle; - } - return compressedFileSystem.at(compressionType) - 
->openCompressedFile(std::move(fileHandle)); -} - -std::vector VirtualFileSystem::glob( - main::ClientContext* context, const std::string& path) const { - return findFileSystem(path)->glob(context, path); -} - -void VirtualFileSystem::overwriteFile(const std::string& from, - const std::string& to) { - findFileSystem(from)->overwriteFile(from, to); -} - -void VirtualFileSystem::createDir(const std::string& dir) const { - findFileSystem(dir)->createDir(dir); -} - -void VirtualFileSystem::removeFileIfExists(const std::string& path) { - findFileSystem(path)->removeFileIfExists(path); -} - -bool VirtualFileSystem::fileOrPathExists(const std::string& path, - main::ClientContext* context) { - return findFileSystem(path)->fileOrPathExists(path, context); -} - -std::string VirtualFileSystem::expandPath(main::ClientContext* context, - const std::string& path) const { - return findFileSystem(path)->expandPath(context, path); -} - -void VirtualFileSystem::readFromFile(FileInfo& /*fileInfo*/, void* /*buffer*/, - uint64_t /*numBytes*/, - uint64_t /*position*/) const { - NEUG_UNREACHABLE; -} - -int64_t VirtualFileSystem::readFile(FileInfo& /*fileInfo*/, void* /*buf*/, - size_t /*nbyte*/) const { - NEUG_UNREACHABLE; -} - -void VirtualFileSystem::writeFile(FileInfo& /*fileInfo*/, - const uint8_t* /*buffer*/, - uint64_t /*numBytes*/, - uint64_t /*offset*/) const { - NEUG_UNREACHABLE; -} - -void VirtualFileSystem::syncFile(const FileInfo& fileInfo) const { - findFileSystem(fileInfo.path)->syncFile(fileInfo); -} - -void VirtualFileSystem::cleanUP(main::ClientContext* context) { - for (auto& subSystem : subSystems) { - subSystem->cleanUP(context); - } - defaultFS->cleanUP(context); -} - -int64_t VirtualFileSystem::seek(FileInfo& /*fileInfo*/, uint64_t /*offset*/, - int /*whence*/) const { - NEUG_UNREACHABLE; -} - -void VirtualFileSystem::truncate(FileInfo& /*fileInfo*/, - uint64_t /*size*/) const { - NEUG_UNREACHABLE; -} - -uint64_t VirtualFileSystem::getFileSize(const FileInfo& 
/*fileInfo*/) const { - NEUG_UNREACHABLE; -} - -FileSystem* VirtualFileSystem::findFileSystem(const std::string& path) const { - for (auto& subSystem : subSystems) { - if (subSystem->canHandleFile(path)) { - return subSystem.get(); - } - } - return defaultFS.get(); -} - -} // namespace common -} // namespace neug diff --git a/src/compiler/common/serializer/CMakeLists.txt b/src/compiler/common/serializer/CMakeLists.txt index 8a4872366..7954b111a 100644 --- a/src/compiler/common/serializer/CMakeLists.txt +++ b/src/compiler/common/serializer/CMakeLists.txt @@ -2,7 +2,6 @@ add_library(neug_common_serializer OBJECT serializer.cpp deserializer.cpp - buffered_file.cpp buffered_serializer.cpp) set(ALL_OBJECT_FILES diff --git a/src/compiler/common/serializer/buffered_file.cpp b/src/compiler/common/serializer/buffered_file.cpp deleted file mode 100644 index cf1a7e044..000000000 --- a/src/compiler/common/serializer/buffered_file.cpp +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is originally from the Kùzu project - * (https://github.com/kuzudb/kuzu) Licensed under the MIT License. Modified by - * Zhou Xiaoli in 2025 to support Neug-specific features. 
- */ - -#include "neug/compiler/common/serializer/buffered_file.h" - -#include - -#include "neug/compiler/common/assert.h" -#include "neug/compiler/common/file_system/file_info.h" -#include "neug/compiler/common/system_config.h" -#include "neug/utils/exception/exception.h" - -namespace neug { -namespace common { - -static constexpr uint64_t BUFFER_SIZE = NEUG_PAGE_SIZE; - -BufferedFileWriter::~BufferedFileWriter() { flush(); } - -BufferedFileWriter::BufferedFileWriter(FileInfo& fileInfo) - : buffer(std::make_unique(BUFFER_SIZE)), - fileOffset(0), - bufferOffset(0), - fileInfo(fileInfo) {} - -void BufferedFileWriter::write(const uint8_t* data, uint64_t size) { - if (size > BUFFER_SIZE) { - flush(); - fileInfo.writeFile(data, size, fileOffset); - fileOffset += size; - return; - } - NEUG_ASSERT(size <= BUFFER_SIZE); - if (bufferOffset + size <= BUFFER_SIZE) { - memcpy(&buffer[bufferOffset], data, size); - bufferOffset += size; - } else { - auto toCopy = BUFFER_SIZE - bufferOffset; - memcpy(&buffer[bufferOffset], data, toCopy); - bufferOffset += toCopy; - flush(); - auto remaining = size - toCopy; - memcpy(buffer.get(), data + toCopy, remaining); - bufferOffset += remaining; - } -} - -void BufferedFileWriter::flush() { - if (bufferOffset == 0) { - return; - } - fileInfo.writeFile(buffer.get(), bufferOffset, fileOffset); - fileOffset += bufferOffset; - bufferOffset = 0; - memset(buffer.get(), 0, BUFFER_SIZE); -} - -void BufferedFileWriter::sync() { fileInfo.syncFile(); } - -uint64_t BufferedFileWriter::getFileSize() const { - return fileInfo.getFileSize() + bufferOffset; -} - -BufferedFileReader::BufferedFileReader(std::unique_ptr fileInfo) - : buffer(std::make_unique(BUFFER_SIZE)), - fileOffset(0), - bufferOffset(0), - fileInfo(std::move(fileInfo)), - bufferSize{0} { - fileSize = this->fileInfo->getFileSize(); - readNextPage(); -} - -void BufferedFileReader::read(uint8_t* data, uint64_t size) { - if (size > BUFFER_SIZE) { - // Clear read buffer. 
- fileOffset -= bufferSize; - fileOffset += bufferOffset; - fileInfo->readFromFile(data, size, fileOffset); - fileOffset += size; - bufferOffset = bufferSize; - } else if (bufferOffset + size <= bufferSize) { - memcpy(data, &buffer[bufferOffset], size); - bufferOffset += size; - } else { - auto toCopy = bufferSize - bufferOffset; - memcpy(data, &buffer[bufferOffset], toCopy); - bufferOffset += toCopy; - readNextPage(); - auto remaining = size - toCopy; - memcpy(data + toCopy, buffer.get(), remaining); - bufferOffset += remaining; - } -} - -bool BufferedFileReader::finished() { - return bufferOffset >= bufferSize && fileSize <= fileOffset; -} - -void BufferedFileReader::readNextPage() { - if (fileSize <= fileOffset) { - THROW_RUNTIME_ERROR(stringFormat( - "Reading past the end of the file {} with size {} at offset {}", - fileInfo->path, fileSize, fileOffset)); - } - bufferSize = std::min(fileSize - fileOffset, BUFFER_SIZE); - fileInfo->readFromFile(buffer.get(), bufferSize, fileOffset); - fileOffset += bufferSize; - bufferOffset = 0; -} - -} // namespace common -} // namespace neug diff --git a/src/compiler/extension/extension_manager.cpp b/src/compiler/extension/extension_manager.cpp index 048a673a0..f802572f0 100644 --- a/src/compiler/extension/extension_manager.cpp +++ b/src/compiler/extension/extension_manager.cpp @@ -23,7 +23,6 @@ #include "neug/compiler/extension/extension_manager.h" #include "generated_extension_loader.h" -#include "neug/compiler/common/file_system/virtual_file_system.h" #include "neug/compiler/common/string_utils.h" #include "neug/compiler/extension/extension.h" diff --git a/src/compiler/function/CMakeLists.txt b/src/compiler/function/CMakeLists.txt index 9d58e86fc..c38eeecd7 100644 --- a/src/compiler/function/CMakeLists.txt +++ b/src/compiler/function/CMakeLists.txt @@ -25,7 +25,7 @@ add_library(neug_function vector_date_functions.cpp vector_string_functions.cpp csv_export_function.cpp) -add_dependencies(neug_function neug_plan_proto) 
+add_dependencies(neug_function neug_plan_proto neug_utils) set(ALL_OBJECT_FILES ${ALL_OBJECT_FILES} $ diff --git a/src/compiler/function/csv_export_function.cpp b/src/compiler/function/csv_export_function.cpp index 51139b99a..42fa419de 100644 --- a/src/compiler/function/csv_export_function.cpp +++ b/src/compiler/function/csv_export_function.cpp @@ -21,7 +21,7 @@ */ #include "neug/compiler/function/export/export_function.h" -#include "neug/compiler/function/read_function.h" +#include "neug/compiler/main/metadata_registry.h" #include "neug/utils/writer/writer.h" namespace neug { @@ -34,11 +34,13 @@ static void convertFileSchemaOptions(reader::FileSchema& schema) { // convert user-specified 'DELIMITER' to 'DELIM' for arrow csv options, all // options are case insensitive. Use operator[] so DELIMITER overwrites DELIM // when both are set (avoids silently ignoring DELIMITER). - if (options.contains("DELIMITER")) { - options["DELIM"] = options.at("DELIMITER"); + auto it = options.find("DELIMITER"); + if (it != options.end()) { + options["DELIM"] = it->second; } - if (options.contains("DELIM")) { - auto value = options.at("DELIM"); + it = options.find("DELIM"); + if (it != options.end()) { + auto value = it->second; if (value.size() != 1) { THROW_INVALID_ARGUMENT_EXCEPTION( "Delimiter should be a single character: " + value); @@ -58,10 +60,10 @@ execution::Context writeExecFunc( THROW_INVALID_ARGUMENT_EXCEPTION("Schema paths is empty"); } convertFileSchemaOptions(schema); - LocalFileSystemProvider fsProvider; - auto fileInfo = fsProvider.provide(schema, false); + const auto& vfs = neug::main::MetadataRegistry::getVFS(); + const auto& fs = vfs->Provide(schema); auto writer = std::make_shared( - schema, fileInfo.fileSystem, entry_schema); + schema, fs->toArrowFileSystem(), entry_schema); auto status = writer->write(ctx, graph); if (!status.ok()) { THROW_IO_EXCEPTION("Export failed: " + status.ToString()); diff --git a/src/compiler/gopt/CMakeLists.txt 
b/src/compiler/gopt/CMakeLists.txt index ad81e01c2..af79a01d3 100644 --- a/src/compiler/gopt/CMakeLists.txt +++ b/src/compiler/gopt/CMakeLists.txt @@ -8,8 +8,6 @@ add_library(neug_gopt g_type_registration.cpp g_query_converter.cpp g_precedence.cpp - g_catalog_holder.cpp - g_vfs_holder.cpp ) add_dependencies(neug_gopt neug_plan_proto) diff --git a/src/compiler/gopt/g_catalog_holder.cpp b/src/compiler/gopt/g_catalog_holder.cpp deleted file mode 100644 index 97b536d8e..000000000 --- a/src/compiler/gopt/g_catalog_holder.cpp +++ /dev/null @@ -1,39 +0,0 @@ -/** Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#pragma once - -#include "neug/compiler/gopt/g_catalog_holder.h" - -namespace neug { -namespace catalog { -GCatalog* GCatalogHolder::gCatalog = nullptr; - -void GCatalogHolder::setGCatalog(GCatalog* catalog) { - if (!catalog) { - THROW_CATALOG_EXCEPTION("cannot set GCatalog to holder, it is nullptr"); - } - gCatalog = catalog; -} - -GCatalog* GCatalogHolder::getGCatalog() { - if (!gCatalog) { - THROW_CATALOG_EXCEPTION("cannot get GCatalog from holder, it is not set"); - } - return gCatalog; -} - -} // namespace catalog -} // namespace neug \ No newline at end of file diff --git a/src/compiler/gopt/g_expr_converter.cpp b/src/compiler/gopt/g_expr_converter.cpp index 49ad7dfe3..315c04d5c 100644 --- a/src/compiler/gopt/g_expr_converter.cpp +++ b/src/compiler/gopt/g_expr_converter.cpp @@ -42,7 +42,6 @@ #include "neug/compiler/function/struct/vector_struct_functions.h" #include "neug/compiler/gopt/g_alias_manager.h" #include "neug/compiler/gopt/g_alias_name.h" -#include "neug/compiler/gopt/g_catalog_holder.h" #include "neug/compiler/gopt/g_scalar_type.h" #include "neug/compiler/parser/expression/parsed_expression.h" #include "neug/compiler/parser/expression/parsed_function_expression.h" diff --git a/src/compiler/gopt/g_vfs_holder.cpp b/src/compiler/gopt/g_vfs_holder.cpp deleted file mode 100644 index eded63d6c..000000000 --- a/src/compiler/gopt/g_vfs_holder.cpp +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright 2020 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "neug/compiler/gopt/g_vfs_holder.h" - -namespace neug { -namespace common { - -VirtualFileSystem* VFSHolder::vfs = nullptr; - -void VFSHolder::setVFS(VirtualFileSystem* fileSystem) { - if (!fileSystem) { - THROW_EXCEPTION_WITH_FILE_LINE( - "Cannot set VirtualFileSystem to holder, it is nullptr"); - } - vfs = fileSystem; -} - -VirtualFileSystem* VFSHolder::getVFS() { - if (!vfs) { - THROW_EXCEPTION_WITH_FILE_LINE( - "Cannot get VirtualFileSystem from holder, it is not set"); - } - return vfs; -} - -} // namespace common -} // namespace neug \ No newline at end of file diff --git a/src/compiler/main/CMakeLists.txt b/src/compiler/main/CMakeLists.txt index 002c444f6..89c2f0f99 100644 --- a/src/compiler/main/CMakeLists.txt +++ b/src/compiler/main/CMakeLists.txt @@ -2,6 +2,7 @@ add_library(neug_compiler_main OBJECT client_context.cpp metadata_manager.cpp + metadata_registry.cpp plan_printer.cpp prepared_statement.cpp query_summary.cpp diff --git a/src/compiler/main/client_context.cpp b/src/compiler/main/client_context.cpp index d81bfd8d2..5f1b6263b 100644 --- a/src/compiler/main/client_context.cpp +++ b/src/compiler/main/client_context.cpp @@ -150,10 +150,6 @@ Catalog* ClientContext::getCatalog() const { return localDatabase->catalog.get(); } -VirtualFileSystem* ClientContext::getVFSUnsafe() const { - return localDatabase->vfs.get(); -} - std::string ClientContext::getEnvVariable(const std::string& name) { #if defined(_WIN32) auto envValue = WindowsUtils::utf8ToUnicode(name.c_str()); diff --git a/src/compiler/main/metadata_manager.cpp b/src/compiler/main/metadata_manager.cpp index 9bbda8c8e..8a5ec9f14 100644 --- a/src/compiler/main/metadata_manager.cpp +++ b/src/compiler/main/metadata_manager.cpp @@ -25,7 +25,6 @@ #include "neug/compiler/extension/extension_manager.h" #include "neug/compiler/gopt/g_catalog.h" #include 
"neug/compiler/main/client_context.h" -#include "neug/utils/yaml_utils.h" #if defined(_WIN32) #include @@ -33,9 +32,6 @@ #include #endif -#include "neug/compiler/common/file_system/virtual_file_system.h" -#include "neug/compiler/gopt/g_catalog_holder.h" -#include "neug/compiler/gopt/g_vfs_holder.h" #include "neug/compiler/storage/stats_manager.h" using namespace neug::catalog; @@ -47,14 +43,11 @@ namespace neug { namespace main { MetadataManager::MetadataManager() { - this->vfs = std::make_unique(); - common::VFSHolder::setVFS(this->vfs.get()); + this->vfs = std::make_unique(); this->extensionManager = std::make_unique(); this->memoryManager = std::make_unique(); // the catalog is initialized only once and is empty before data loading this->catalog = std::make_unique(); - catalog::GCatalogHolder::setGCatalog( - this->catalog->ptrCast()); std::string emptyStats = ""; auto statsManager = std::make_shared( emptyStats, this, *this->memoryManager); diff --git a/src/compiler/main/metadata_registry.cpp b/src/compiler/main/metadata_registry.cpp new file mode 100644 index 000000000..ac6cc80c8 --- /dev/null +++ b/src/compiler/main/metadata_registry.cpp @@ -0,0 +1,61 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "neug/compiler/main/metadata_registry.h" + +#include "neug/compiler/gopt/g_catalog.h" +#include "neug/compiler/main/metadata_manager.h" +#include "neug/utils/exception/exception.h" + +namespace neug { +namespace main { + +MetadataManager* MetadataRegistry::metadataManager = nullptr; + +void MetadataRegistry::registerMetadata( + main::MetadataManager* metadataManager) { + if (!metadataManager) { + THROW_INVALID_ARGUMENT_EXCEPTION("Metadata manager is not set"); + } + MetadataRegistry::metadataManager = metadataManager; +} + +MetadataManager* MetadataRegistry::getMetadata() { + if (!metadataManager) { + THROW_INVALID_ARGUMENT_EXCEPTION("Metadata manager is not set"); + } + return metadataManager; +} + +catalog::GCatalog* MetadataRegistry::getCatalog() { + auto metadataManager = MetadataRegistry::getMetadata(); + auto catalog = + dynamic_cast(metadataManager->getCatalog()); + if (!catalog) { + THROW_INVALID_ARGUMENT_EXCEPTION("Catalog is not set"); + } + return catalog; +} + +neug::fsys::FileSystemRegistry* MetadataRegistry::getVFS() { + auto metadataManager = MetadataRegistry::getMetadata(); + auto vfs = metadataManager->getVFS(); + if (!vfs) { + THROW_INVALID_ARGUMENT_EXCEPTION("Virtual file system is not set"); + } + return vfs; +} +} // namespace main +} // namespace neug diff --git a/src/compiler/main/settings.cpp b/src/compiler/main/settings.cpp index ae8b9824a..ea3681938 100644 --- a/src/compiler/main/settings.cpp +++ b/src/compiler/main/settings.cpp @@ -22,7 +22,6 @@ #include "neug/compiler/main/settings.h" -#include "neug/compiler/common/file_system/virtual_file_system.h" #include "neug/compiler/main/client_context.h" #include "neug/utils/exception/exception.h" diff --git a/src/compiler/planner/gopt_planner.cc b/src/compiler/planner/gopt_planner.cc index 1a26eb15c..c0fbc273f 100644 --- a/src/compiler/planner/gopt_planner.cc +++ b/src/compiler/planner/gopt_planner.cc @@ -18,7 +18,7 @@ limitations under the License. 
#include #include "neug/compiler/common/case_insensitive_map.h" #include "neug/compiler/gopt/g_catalog.h" -#include "neug/compiler/gopt/g_catalog_holder.h" +#include "neug/compiler/gopt/g_physical_convertor.h" #include "neug/compiler/gopt/g_result_schema.h" #include "neug/utils/exception/exception.h" diff --git a/src/compiler/transaction/transaction.cpp b/src/compiler/transaction/transaction.cpp index c4256235e..65d6d0573 100644 --- a/src/compiler/transaction/transaction.cpp +++ b/src/compiler/transaction/transaction.cpp @@ -4,7 +4,6 @@ #include "neug/compiler/catalog/catalog_entry/rel_group_catalog_entry.h" #include "neug/compiler/main/client_context.h" #include "neug/compiler/main/option_config.h" -#include "neug/compiler/storage/wal/wal.h" #include "neug/utils/exception/exception.h" using namespace neug::catalog; diff --git a/src/execution/execute/ops/batch/data_export.cc b/src/execution/execute/ops/batch/data_export.cc index 609731878..80d9c0190 100644 --- a/src/execution/execute/ops/batch/data_export.cc +++ b/src/execution/execute/ops/batch/data_export.cc @@ -15,7 +15,7 @@ #include "neug/execution/execute/ops/batch/data_export.h" #include "neug/compiler/function/export/export_function.h" -#include "neug/compiler/gopt/g_catalog_holder.h" +#include "neug/compiler/main/metadata_registry.h" #include "neug/execution/execute/ops/batch/data_source.h" #include "neug/utils/exception/exception.h" #include "neug/utils/reader/reader.h" @@ -68,7 +68,7 @@ neug::result DataExportOprBuilder::Build( const auto& data_export_opr = plan.plan(op_idx).opr().data_export(); std::string extension_name = data_export_opr.extension_name(); auto signatureName = data_export_opr.extension_name(); - auto gCatalog = catalog::GCatalogHolder::getGCatalog(); + auto gCatalog = neug::main::MetadataRegistry::getCatalog(); auto func = gCatalog->getFunctionWithSignature(signatureName); auto writeFunc = func->ptrCast(); auto fileSchema = diff --git a/src/execution/execute/ops/batch/data_source.cc 
b/src/execution/execute/ops/batch/data_source.cc index 93243bdac..d8faf395f 100644 --- a/src/execution/execute/ops/batch/data_source.cc +++ b/src/execution/execute/ops/batch/data_source.cc @@ -22,7 +22,7 @@ #include "neug/compiler/common/assert.h" #include "neug/compiler/function/read_function.h" -#include "neug/compiler/gopt/g_catalog_holder.h" +#include "neug/compiler/main/metadata_registry.h" #include "neug/execution/common/context.h" #include "neug/execution/execute/ops/batch/data_source.h" #include "neug/utils/reader/reader.h" @@ -125,7 +125,7 @@ neug::result DataSourceOprBuilder::Build( // look up read function from catalog auto signatureName = sourcePB.extension_name(); - auto gCatalog = catalog::GCatalogHolder::getGCatalog(); + auto gCatalog = neug::main::MetadataRegistry::getCatalog(); auto func = gCatalog->getFunctionWithSignature(signatureName); auto readFunc = func->ptrCast(); return std::make_pair(std::make_unique(state, readFunc), diff --git a/src/execution/execute/ops/retrieve/procedure_call.cc b/src/execution/execute/ops/retrieve/procedure_call.cc index 3f94907a2..c0df12180 100644 --- a/src/execution/execute/ops/retrieve/procedure_call.cc +++ b/src/execution/execute/ops/retrieve/procedure_call.cc @@ -14,7 +14,7 @@ */ #include "neug/execution/execute/ops/retrieve/procedure_call.h" -#include "neug/compiler/gopt/g_catalog_holder.h" +#include "neug/compiler/main/metadata_registry.h" #include "neug/utils/exception/exception.h" namespace neug { @@ -48,7 +48,7 @@ class ProcedureCallOpr : public IOperator { neug::result ProcedureCallOprBuilder::Build( const neug::Schema& schema, const ContextMeta& ctx_meta, const physical::PhysicalPlan& plan, int op_idx) { - auto gCatalog = catalog::GCatalogHolder::getGCatalog(); + auto gCatalog = neug::main::MetadataRegistry::getCatalog(); auto procedurePB = plan.plan(op_idx).opr().procedure_call(); auto signatureName = procedurePB.query().query_name().name(); auto func = gCatalog->getFunctionWithSignature(signatureName); 
diff --git a/src/execution/expression/expr.cc b/src/execution/expression/expr.cc index b9dd4ad4d..93d71f374 100644 --- a/src/execution/expression/expr.cc +++ b/src/execution/expression/expr.cc @@ -18,7 +18,7 @@ #include "neug/compiler/function/neug_scalar_function.h" #include "neug/compiler/function/scalar_function.h" -#include "neug/compiler/gopt/g_catalog_holder.h" +#include "neug/compiler/main/metadata_registry.h" #include "neug/execution/expression/accessors/const_accessor.h" #include "neug/execution/expression/exprs/arith_expr.h" #include "neug/execution/expression/exprs/case_when.h" @@ -138,7 +138,7 @@ static std::unique_ptr build_expr( const std::string& signature = op.unique_name(); neug::execution::neug_func_exec_t fn = nullptr; - auto gCatalog = catalog::GCatalogHolder::getGCatalog(); + auto gCatalog = neug::main::MetadataRegistry::getCatalog(); auto func = gCatalog->getFunctionWithSignature( &neug::transaction::DUMMY_TRANSACTION, signature); if (!func) { diff --git a/src/utils/file_sys/file_system.cc b/src/utils/file_sys/file_system.cc new file mode 100644 index 000000000..65b209533 --- /dev/null +++ b/src/utils/file_sys/file_system.cc @@ -0,0 +1,100 @@ +/** + * Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "neug/utils/file_sys/file_system.h" + +#include +#include +#include +#include + +#include "neug/execution/execute/ops/batch/batch_update_utils.h" +#include "neug/utils/exception/exception.h" +#include "neug/utils/reader/schema.h" + +namespace neug { +namespace fsys { + +class LocalFileSystem : public FileSystem { + public: + LocalFileSystem() = default; + + std::vector glob(const std::string& path) override { + // Normalize file:// URIs by stripping the scheme before passing to + // match_files_with_pattern, which uses POSIX glob() and std::filesystem + // and expects local paths without URI scheme. + constexpr const char* kFilePrefix = "file://"; + constexpr size_t kFilePrefixLen = 7; + if (path.starts_with(kFilePrefix)) { + std::string local_path = path.substr(kFilePrefixLen); + if (local_path.empty() || local_path[0] != '/') { + local_path = "/" + local_path; + } + return neug::execution::ops::match_files_with_pattern(local_path); + } + return neug::execution::ops::match_files_with_pattern(path); + } + + std::unique_ptr toArrowFileSystem() override { + return std::make_unique(); + } +}; + +FileSystemRegistry::FileSystemRegistry() { + Register("file", [](const reader::FileSchema&) { + return std::make_unique(); + }); +} + +void FileSystemRegistry::Register(const std::string& protocol, + FileSystemFactory factory) { + std::unique_lock lck(mtx); + factories_[protocol] = std::move(factory); +} + +std::unique_ptr FileSystemRegistry::Provide( + const reader::FileSchema& schema) { + std::string protocol = schema.protocol; + if (protocol.empty()) { + const auto& paths = schema.paths; + if (paths.empty()) { + THROW_INVALID_ARGUMENT_EXCEPTION("No file paths provided"); + } + // we assume all paths share the same protocol + const auto& path = paths[0]; + auto pos = path.find("://"); + if (pos != std::string::npos) { + protocol = path.substr(0, pos); + } else { + protocol = "file"; + } + } + + FileSystemFactory factory; + { + std::shared_lock lck(mtx); + 
auto it = factories_.find(protocol); + if (it == factories_.end()) { + THROW_INVALID_ARGUMENT_EXCEPTION("Unsupported file system protocol: " + + protocol); + } + factory = it->second; + } + return factory(schema); +} + +} // namespace fsys +} // namespace neug diff --git a/tests/compiler/gopt_test.h b/tests/compiler/gopt_test.h index 7ed49baa4..95b743985 100644 --- a/tests/compiler/gopt_test.h +++ b/tests/compiler/gopt_test.h @@ -42,12 +42,12 @@ #include "neug/compiler/gopt/g_result_schema.h" #include "neug/compiler/main/client_context.h" #include "neug/compiler/main/metadata_manager.h" +#include "neug/compiler/main/metadata_registry.h" #include "neug/compiler/optimizer/expand_getv_fusion.h" #include "neug/compiler/optimizer/filter_push_down_pattern.h" #include "neug/compiler/planner/gopt_planner.h" #include "neug/compiler/planner/operator/logical_plan_util.h" #include "neug/compiler/storage/buffer_manager/memory_manager.h" -#include "neug/compiler/storage/wal/wal.h" #include "neug/compiler/transaction/transaction.h" #include "neug/utils/service_utils.h" @@ -185,6 +185,7 @@ class GOptTest : public ::testing::Test { void SetUp() override { database = std::make_unique(); ctx = std::make_unique(database.get()); + main::MetadataRegistry::registerMetadata(database.get()); } void TearDown() override { diff --git a/tests/unittest/test_extension.cc b/tests/unittest/test_extension.cc index 7515fdd6e..ed96978aa 100644 --- a/tests/unittest/test_extension.cc +++ b/tests/unittest/test_extension.cc @@ -2,8 +2,6 @@ #include #include #include "column_assertions.h" -#include "neug/compiler/common/file_system/virtual_file_system.h" -#include "neug/compiler/gopt/g_vfs_holder.h" #include "neug/main/neug_db.h" class TestJsonExtension : public ::testing::Test { @@ -16,8 +14,6 @@ class TestJsonExtension : public ::testing::Test { std::string vperson_jsonl; void SetUp() override { - auto vfs_ = std::make_unique(); - neug::common::VFSHolder::setVFS(vfs_.get()); const char* dir = 
std::getenv("FLEX_DATA_DIR"); ASSERT_NE(dir, nullptr) << "FLEX_DATA_DIR environment variable is not set"; ASSERT_NE(dir[0], '\0') << "FLEX_DATA_DIR environment variable is empty";