Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/iceberg/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,13 @@ class ICEBERG_EXPORT Catalog {
/// \param identifier a table identifier
/// \param schema a schema
/// \param spec a partition spec
/// \param order a sort order
/// \param location a location for the table; leave empty if unspecified
/// \param properties a string map of table properties
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
virtual Result<std::shared_ptr<Table>> CreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) = 0;

Expand All @@ -131,12 +133,14 @@ class ICEBERG_EXPORT Catalog {
/// \param identifier a table identifier
/// \param schema a schema
/// \param spec a partition spec
/// \param order a sort order
/// \param location a location for the table; leave empty if unspecified
/// \param properties a string map of table properties
/// \return a Transaction to create the table or ErrorKind::kAlreadyExists if the
/// table already exists
virtual Result<std::shared_ptr<Transaction>> StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) = 0;

Expand Down
6 changes: 4 additions & 2 deletions src/iceberg/catalog/memory/in_memory_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
}

Result<std::shared_ptr<Table>> InMemoryCatalog::CreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
std::unique_lock lock(mutex_);
Expand Down Expand Up @@ -439,7 +440,8 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
}

Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
std::unique_lock lock(mutex_);
Expand Down
6 changes: 4 additions & 2 deletions src/iceberg/catalog/memory/in_memory_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class ICEBERG_EXPORT InMemoryCatalog
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const override;

Result<std::shared_ptr<Table>> CreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) override;

Expand All @@ -81,7 +82,8 @@ class ICEBERG_EXPORT InMemoryCatalog
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;

Result<std::shared_ptr<Transaction>> StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) override;

Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/catalog/rest/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ set(ICEBERG_REST_SOURCES
json_internal.cc
resource_paths.cc
rest_catalog.cc
rest_util.cc)
rest_util.cc
types.cc)

set(ICEBERG_REST_STATIC_BUILD_INTERFACE_LIBS)
set(ICEBERG_REST_SHARED_BUILD_INTERFACE_LIBS)
Expand Down
55 changes: 55 additions & 0 deletions src/iceberg/catalog/rest/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "iceberg/catalog/rest/json_internal.h"

#include <memory>
#include <string>
#include <utility>
#include <vector>
Expand All @@ -27,6 +28,8 @@

#include "iceberg/catalog/rest/types.h"
#include "iceberg/json_internal.h"
#include "iceberg/partition_spec.h"
#include "iceberg/sort_order.h"
#include "iceberg/table_identifier.h"
#include "iceberg/util/json_util_internal.h"
#include "iceberg/util/macros.h"
Expand Down Expand Up @@ -336,6 +339,57 @@ Result<ListTablesResponse> ListTablesResponseFromJson(const nlohmann::json& json
return response;
}

nlohmann::json ToJson(const CreateTableRequest& request) {
nlohmann::json json;
json[kName] = request.name;
SetOptionalStringField(json, kLocation, request.location);
if (request.schema) {
json[kSchema] = ToJson(*request.schema);
}
if (request.partition_spec) {
json[kPartitionSpec] = ToJson(*request.partition_spec);
}
if (request.write_order) {
json[kWriteOrder] = ToJson(*request.write_order);
}
if (request.stage_create) {
json[kStageCreate] = request.stage_create;
}
SetContainerField(json, kProperties, request.properties);
return json;
}

Result<CreateTableRequest> CreateTableRequestFromJson(const nlohmann::json& json) {
CreateTableRequest request;
ICEBERG_ASSIGN_OR_RAISE(request.name, GetJsonValue<std::string>(json, kName));
ICEBERG_ASSIGN_OR_RAISE(request.location,
GetJsonValueOrDefault<std::string>(json, kLocation));
ICEBERG_ASSIGN_OR_RAISE(auto schema, GetJsonValue<nlohmann::json>(json, kSchema));
ICEBERG_ASSIGN_OR_RAISE(request.schema, SchemaFromJson(schema));

if (json.contains(kPartitionSpec)) {
ICEBERG_ASSIGN_OR_RAISE(auto partition_spec,
GetJsonValue<nlohmann::json>(json, kPartitionSpec));
ICEBERG_ASSIGN_OR_RAISE(request.partition_spec,
PartitionSpecFromJson(request.schema, partition_spec,
PartitionSpec::kInitialSpecId));
}
if (json.contains(kWriteOrder)) {
ICEBERG_ASSIGN_OR_RAISE(auto sort_order,
GetJsonValue<nlohmann::json>(json, kWriteOrder));
ICEBERG_ASSIGN_OR_RAISE(request.write_order,
SortOrderFromJson(sort_order, request.schema));
}

ICEBERG_ASSIGN_OR_RAISE(request.stage_create,
GetJsonValueOrDefault<bool>(json, kStageCreate, false));
ICEBERG_ASSIGN_OR_RAISE(
request.properties,
GetJsonValueOrDefault<decltype(request.properties)>(json, kProperties));
ICEBERG_RETURN_UNEXPECTED(request.Validate());
return request;
}

#define ICEBERG_DEFINE_FROM_JSON(Model) \
template <> \
Result<Model> FromJson<Model>(const nlohmann::json& json) { \
Expand All @@ -354,5 +408,6 @@ ICEBERG_DEFINE_FROM_JSON(ListTablesResponse)
ICEBERG_DEFINE_FROM_JSON(LoadTableResult)
ICEBERG_DEFINE_FROM_JSON(RegisterTableRequest)
ICEBERG_DEFINE_FROM_JSON(RenameTableRequest)
ICEBERG_DEFINE_FROM_JSON(CreateTableRequest)

} // namespace iceberg::rest
1 change: 1 addition & 0 deletions src/iceberg/catalog/rest/json_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ ICEBERG_DECLARE_JSON_SERDE(ListTablesResponse)
ICEBERG_DECLARE_JSON_SERDE(LoadTableResult)
ICEBERG_DECLARE_JSON_SERDE(RegisterTableRequest)
ICEBERG_DECLARE_JSON_SERDE(RenameTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)

#undef ICEBERG_DECLARE_JSON_SERDE

Expand Down
1 change: 1 addition & 0 deletions src/iceberg/catalog/rest/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ iceberg_rest_sources = files(
'resource_paths.cc',
'rest_catalog.cc',
'rest_util.cc',
'types.cc',
)
# cpr does not export symbols, so on Windows it must
# be used as a static lib
Expand Down
51 changes: 41 additions & 10 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
#include "iceberg/table.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -77,9 +78,12 @@ Result<CatalogConfig> FetchServerConfig(const ResourcePaths& paths,

RestCatalog::~RestCatalog() = default;

Result<std::unique_ptr<RestCatalog>> RestCatalog::Make(
const RestCatalogProperties& config) {
Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
const RestCatalogProperties& config, std::shared_ptr<FileIO> file_io) {
ICEBERG_ASSIGN_OR_RAISE(auto uri, config.Uri());
if (!file_io) {
return InvalidArgument("FileIO is required to create RestCatalog");
}
ICEBERG_ASSIGN_OR_RAISE(
auto paths, ResourcePaths::Make(std::string(TrimTrailingSlash(uri)),
config.Get(RestCatalogProperties::kPrefix)));
Expand All @@ -103,14 +107,17 @@ Result<std::unique_ptr<RestCatalog>> RestCatalog::Make(
ICEBERG_ASSIGN_OR_RAISE(auto final_uri, final_config->Uri());
ICEBERG_RETURN_UNEXPECTED(paths->SetBaseUri(std::string(TrimTrailingSlash(final_uri))));

return std::unique_ptr<RestCatalog>(
new RestCatalog(std::move(final_config), std::move(paths), std::move(endpoints)));
return std::shared_ptr<RestCatalog>(
new RestCatalog(std::move(final_config), std::move(file_io), std::move(paths),
std::move(endpoints)));
}

RestCatalog::RestCatalog(std::unique_ptr<RestCatalogProperties> config,
std::shared_ptr<FileIO> file_io,
std::unique_ptr<ResourcePaths> paths,
std::unordered_set<Endpoint> endpoints)
: config_(std::move(config)),
file_io_(std::move(file_io)),
client_(std::make_unique<HttpClient>(config_->ExtractHeaders())),
paths_(std::move(paths)),
name_(config_->Get(RestCatalogProperties::kName)),
Expand Down Expand Up @@ -241,11 +248,33 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
}

Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
[[maybe_unused]] const TableIdentifier& identifier,
[[maybe_unused]] const Schema& schema, [[maybe_unused]] const PartitionSpec& spec,
[[maybe_unused]] const std::string& location,
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
return NotImplemented("Not implemented");
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
ICEBERG_RETURN_UNEXPECTED(CheckEndpoint(supported_endpoints_, Endpoint::CreateTable()));
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns));

CreateTableRequest request{
.name = identifier.name,
.location = location,
.schema = schema,
.partition_spec = spec,
.write_order = order,
.stage_create = false,
.properties = properties,
};

ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));

ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
return Table::Make(identifier, load_result.metadata,
std::move(load_result.metadata_location), file_io_,
shared_from_this());
}

Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
Expand All @@ -257,7 +286,9 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(

Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
[[maybe_unused]] const TableIdentifier& identifier,
[[maybe_unused]] const Schema& schema, [[maybe_unused]] const PartitionSpec& spec,
[[maybe_unused]] const std::shared_ptr<Schema>& schema,
[[maybe_unused]] const std::shared_ptr<PartitionSpec>& spec,
[[maybe_unused]] const std::shared_ptr<SortOrder>& order,
[[maybe_unused]] const std::string& location,
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
return NotImplemented("Not implemented");
Expand Down
20 changes: 13 additions & 7 deletions src/iceberg/catalog/rest/rest_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#pragma once

#include <memory>
#include <set>
#include <string>
#include <unordered_set>

#include "iceberg/catalog.h"
#include "iceberg/catalog/rest/endpoint.h"
Expand All @@ -35,7 +35,8 @@
namespace iceberg::rest {

/// \brief Rest catalog implementation.
class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
public std::enable_shared_from_this<RestCatalog> {
public:
~RestCatalog() override;

Expand All @@ -47,8 +48,10 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
/// \brief Create a RestCatalog instance
///
/// \param config the configuration for the RestCatalog
/// \return a unique_ptr to RestCatalog instance
static Result<std::unique_ptr<RestCatalog>> Make(const RestCatalogProperties& config);
/// \param file_io the FileIO instance to use for table operations
/// \return a shared_ptr to RestCatalog instance
static Result<std::shared_ptr<RestCatalog>> Make(const RestCatalogProperties& config,
std::shared_ptr<FileIO> file_io);

std::string_view name() const override;

Expand All @@ -72,7 +75,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const override;

Result<std::shared_ptr<Table>> CreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) override;

Expand All @@ -82,7 +86,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;

Result<std::shared_ptr<Transaction>> StageCreateTable(
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) override;

Expand All @@ -100,10 +105,11 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {

private:
RestCatalog(std::unique_ptr<RestCatalogProperties> config,
std::unique_ptr<ResourcePaths> paths,
std::shared_ptr<FileIO> file_io, std::unique_ptr<ResourcePaths> paths,
std::unordered_set<Endpoint> endpoints);

std::unique_ptr<RestCatalogProperties> config_;
std::shared_ptr<FileIO> file_io_;
std::unique_ptr<HttpClient> client_;
std::unique_ptr<ResourcePaths> paths_;
std::string name_;
Expand Down
Loading
Loading