Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 41 additions & 18 deletions extension/json/include/json_read_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "json_options.h"
#include "neug/compiler/function/function.h"
#include "neug/compiler/function/read_function.h"
#include "neug/compiler/main/metadata_registry.h"
#include "neug/execution/execute/ops/batch/batch_update_utils.h"
#include "neug/utils/reader/options.h"
#include "neug/utils/reader/reader.h"
Expand All @@ -49,15 +50,20 @@ struct JsonReadFunction {

static execution::Context jsonExecFunc(
std::shared_ptr<reader::ReadSharedState> state) {
// todo: get file system from vfs manager
LocalFileSystemProvider fsProvider;
auto fileInfo = fsProvider.provide(state->schema.file);
state->schema.file.paths = fileInfo.resolvedPaths;
const auto& vfs = neug::main::MetadataRegistry::getVFS();
const auto& fs = vfs->Provide(state->schema.file);
auto resolvedPaths = std::vector<std::string>();
for (const auto& path : state->schema.file.paths) {
const auto& resolved = fs->glob(path);
resolvedPaths.insert(resolvedPaths.end(), resolved.begin(),
resolved.end());
}
state->schema.file.paths = std::move(resolvedPaths);
auto optionsBuilder =
std::make_unique<reader::ArrowJsonOptionsBuilder>(state);
// register JsonDatasetBuilder to the reader to support json array format
auto reader = std::make_unique<reader::ArrowReader>(
state, std::move(optionsBuilder), fileInfo.fileSystem,
state, std::move(optionsBuilder), fs->toArrowFileSystem(),
std::make_shared<reader::JsonDatasetBuilder>());
execution::Context ctx;
auto localState = std::make_shared<reader::ReadLocalState>();
Expand All @@ -73,15 +79,20 @@ struct JsonReadFunction {
// to be inferred.
externalSchema.entry = std::make_shared<reader::TableEntrySchema>();
externalSchema.file = schema;
// todo: get file system from vfs manager
LocalFileSystemProvider fsProvider;
auto fileInfo = fsProvider.provide(state->schema.file);
state->schema.file.paths = fileInfo.resolvedPaths;
const auto& vfs = neug::main::MetadataRegistry::getVFS();
const auto& fs = vfs->Provide(state->schema.file);
auto resolvedPaths = std::vector<std::string>();
for (const auto& path : state->schema.file.paths) {
const auto& resolved = fs->glob(path);
resolvedPaths.insert(resolvedPaths.end(), resolved.begin(),
resolved.end());
}
state->schema.file.paths = std::move(resolvedPaths);
auto optionsBuilder =
std::make_unique<reader::ArrowJsonOptionsBuilder>(state);
// register JsonDatasetBuilder to the reader to support json array format
auto reader = std::make_shared<reader::ArrowReader>(
state, std::move(optionsBuilder), fileInfo.fileSystem,
state, std::move(optionsBuilder), fs->toArrowFileSystem(),
std::make_shared<reader::JsonDatasetBuilder>());
auto sniffer = std::make_shared<reader::ArrowSniffer>(reader);
auto sniffResult = sniffer->sniff();
Expand Down Expand Up @@ -110,15 +121,21 @@ struct JsonLReadFunction {
static execution::Context jsonLExecFunc(
std::shared_ptr<reader::ReadSharedState> state) {
// todo: get file system from vfs manager
LocalFileSystemProvider fsProvider;
auto fileInfo = fsProvider.provide(state->schema.file);
state->schema.file.paths = fileInfo.resolvedPaths;
const auto& vfs = neug::main::MetadataRegistry::getVFS();
const auto& fs = vfs->Provide(state->schema.file);
auto resolvedPaths = std::vector<std::string>();
for (const auto& path : state->schema.file.paths) {
const auto& resolved = fs->glob(path);
resolvedPaths.insert(resolvedPaths.end(), resolved.begin(),
resolved.end());
}
state->schema.file.paths = std::move(resolvedPaths);
auto optionsBuilder =
std::make_unique<reader::ArrowJsonOptionsBuilder>(state);
// Arrow can support jsonl format by default, no need to register other
// DatasetBuilder
auto reader = std::make_unique<reader::ArrowReader>(
state, std::move(optionsBuilder), fileInfo.fileSystem);
state, std::move(optionsBuilder), fs->toArrowFileSystem());
execution::Context ctx;
auto localState = std::make_shared<reader::ReadLocalState>();
reader->read(localState, ctx);
Expand All @@ -134,15 +151,21 @@ struct JsonLReadFunction {
externalSchema.entry = std::make_shared<reader::TableEntrySchema>();
externalSchema.file = schema;
// todo: get file system from vfs manager
LocalFileSystemProvider fsProvider;
auto fileInfo = fsProvider.provide(state->schema.file);
state->schema.file.paths = fileInfo.resolvedPaths;
const auto& vfs = neug::main::MetadataRegistry::getVFS();
const auto& fs = vfs->Provide(state->schema.file);
auto resolvedPaths = std::vector<std::string>();
for (const auto& path : state->schema.file.paths) {
const auto& resolved = fs->glob(path);
resolvedPaths.insert(resolvedPaths.end(), resolved.begin(),
resolved.end());
}
state->schema.file.paths = std::move(resolvedPaths);
auto optionsBuilder =
std::make_unique<reader::ArrowJsonOptionsBuilder>(state);
// Arrow can support jsonl format by default, no need to register other
// DatasetBuilder
auto reader = std::make_shared<reader::ArrowReader>(
state, std::move(optionsBuilder), fileInfo.fileSystem);
state, std::move(optionsBuilder), fs->toArrowFileSystem());
auto sniffer = std::make_shared<reader::ArrowSniffer>(reader);
auto sniffResult = sniffer->sniff();
if (!sniffResult) {
Expand Down
13 changes: 7 additions & 6 deletions extension/json/src/json_export_function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <string>

#include "neug/compiler/function/read_function.h"
#include "neug/compiler/main/metadata_registry.h"
#include "neug/generated/proto/response/response.pb.h"
#include "neug/utils/exception/exception.h"
#include "neug/utils/property/types.h"
Expand Down Expand Up @@ -426,10 +427,10 @@ static execution::Context jsonExecFunc(
if (schema.paths.empty()) {
THROW_INVALID_ARGUMENT_EXCEPTION("Schema paths is empty");
}
LocalFileSystemProvider fsProvider;
auto fileInfo = fsProvider.provide(schema, false);
const auto& vfs = neug::main::MetadataRegistry::getVFS();
const auto& fs = vfs->Provide(schema);
auto writer = std::make_shared<neug::writer::ArrowJsonArrayExportWriter>(
schema, fileInfo.fileSystem, entry_schema);
schema, fs->toArrowFileSystem(), entry_schema);
auto status = writer->write(ctx, graph);
if (!status.ok()) {
THROW_IO_EXCEPTION("Export failed: " + status.ToString());
Expand Down Expand Up @@ -463,10 +464,10 @@ static execution::Context jsonLExecFunc(
if (schema.paths.empty()) {
THROW_INVALID_ARGUMENT_EXCEPTION("Schema paths is empty");
}
LocalFileSystemProvider fsProvider;
auto fileInfo = fsProvider.provide(schema, false);
const auto& vfs = neug::main::MetadataRegistry::getVFS();
const auto& fs = vfs->Provide(schema);
auto writer = std::make_shared<neug::writer::ArrowJsonLExportWriter>(
schema, fileInfo.fileSystem, entry_schema);
schema, fs->toArrowFileSystem(), entry_schema);
auto status = writer->write(ctx, graph);
if (!status.ok()) {
THROW_IO_EXCEPTION("Export failed: " + status.ToString());
Expand Down
5 changes: 0 additions & 5 deletions include/neug/compiler/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,6 @@ class Binder {
std::unique_ptr<BoundStatement> bindCopyToClause(
const parser::Statement& statement);

std::unique_ptr<BoundStatement> bindExportDatabaseClause(
const parser::Statement& statement);
std::unique_ptr<BoundStatement> bindImportDatabaseClause(
const parser::Statement& statement);

static std::unique_ptr<BoundStatement> bindAttachDatabase(
const parser::Statement& statement);
static std::unique_ptr<BoundStatement> bindDetachDatabase(
Expand Down
82 changes: 0 additions & 82 deletions include/neug/compiler/binder/bound_export_database.h

This file was deleted.

54 changes: 0 additions & 54 deletions include/neug/compiler/binder/bound_import_database.h

This file was deleted.

Loading
Loading