From b3e081568a76828f5d378a311f2b2916013b9bed Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Wed, 17 Dec 2025 11:42:51 +0800 Subject: [PATCH 1/3] feat: implement transaction api - Introduce `Transaction` class to manage multi-operation table updates. - Add `Table::NewTransaction()` and `Table::NewUpdateProperties()` to initiate updates. - Move `PendingUpdate` and `UpdateProperties` to `src/iceberg/update/` and refactor them to use the transaction mechanism. - Add `StagedTable` to represent tables with uncommitted changes. - Update `InMemoryCatalog` and `RestCatalog` to align with the new update flow. - Refactor `Table` to support metadata refresh and location management within transactions. - Add comprehensive tests for transactions and property updates. --- example/demo_example.cc | 8 +- src/iceberg/CMakeLists.txt | 2 + src/iceberg/catalog.h | 6 +- .../catalog/memory/in_memory_catalog.cc | 21 +- .../catalog/memory/in_memory_catalog.h | 6 +- src/iceberg/catalog/rest/rest_catalog.cc | 6 +- src/iceberg/catalog/rest/rest_catalog.h | 6 +- src/iceberg/meson.build | 5 +- src/iceberg/pending_update.h | 72 ------ src/iceberg/table.cc | 102 +++++++- src/iceberg/table.h | 93 ++++--- src/iceberg/table_metadata.cc | 8 + src/iceberg/table_metadata.h | 9 + src/iceberg/table_update.cc | 6 +- src/iceberg/table_update.h | 59 ++++- src/iceberg/test/CMakeLists.txt | 29 ++- src/iceberg/test/in_memory_catalog_test.cc | 52 ++-- src/iceberg/test/meson.build | 6 +- src/iceberg/test/mock_catalog.h | 6 +- src/iceberg/test/mock_io.h | 42 ++++ src/iceberg/test/table_test.cc | 196 ++++++++------- src/iceberg/test/transaction_test.cc | 90 +++++++ src/iceberg/test/update_properties_test.cc | 226 +++++++----------- src/iceberg/test/update_test_base.h | 75 ++++++ src/iceberg/transaction.cc | 111 +++++++++ src/iceberg/transaction.h | 66 +++-- src/iceberg/type_fwd.h | 87 ++++--- src/iceberg/update/meson.build | 21 ++ src/iceberg/update/pending_update.cc | 38 +++ src/iceberg/update/pending_update.h | 87 +++++++ src/iceberg/update/update_properties.cc | 88 ++++--- src/iceberg/update/update_properties.h | 35 +-- 32 files changed, 1125 insertions(+), 539 deletions(-) delete mode 100644 src/iceberg/pending_update.h create mode 100644 src/iceberg/test/mock_io.h create mode 100644 src/iceberg/test/transaction_test.cc create mode 100644 src/iceberg/test/update_test_base.h create mode 100644 src/iceberg/transaction.cc create mode 100644 src/iceberg/update/meson.build create mode 100644 src/iceberg/update/pending_update.cc create mode 100644 src/iceberg/update/pending_update.h diff --git a/example/demo_example.cc b/example/demo_example.cc index c333c7971..ab011feec 100644 --- a/example/demo_example.cc +++ b/example/demo_example.cc @@ -58,7 +58,13 @@ int main(int argc, char** argv) { } auto table = std::move(load_result.value()); - auto scan_result = table->NewScan()->Build(); + auto scan_builder = table->NewScan(); + if (!scan_builder.has_value()) { + std::cerr << "Failed to create scan builder: " << scan_builder.error().message + << std::endl; + return 1; + } + auto scan_result = scan_builder.value()->Build(); if (!scan_result.has_value()) { std::cerr << "Failed to build scan: " << scan_result.error().message << std::endl; return 1; diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index a0d93967f..9c25015c3 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -72,9 +72,11 @@ set(ICEBERG_SOURCES table_requirements.cc table_scan.cc table_update.cc + transaction.cc transform.cc transform_function.cc type.cc + update/pending_update.cc update/update_properties.cc util/bucket_util.cc util/conversions.cc diff --git a/src/iceberg/catalog.h b/src/iceberg/catalog.h index 6c4957ade..08965df81 100644 --- a/src/iceberg/catalog.h +++ b/src/iceberg/catalog.h @@ -110,7 +110,7 @@ class ICEBERG_EXPORT Catalog { /// \param location a location for the table; leave empty if unspecified /// \param properties a string map of table properties /// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists - virtual Result> CreateTable( + virtual Result> CreateTable( const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, const std::string& location, const std::unordered_map& properties) = 0; @@ -121,7 +121,7 @@ class ICEBERG_EXPORT Catalog { /// \param requirements a list of table requirements /// \param updates a list of table updates /// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists - virtual Result> UpdateTable( + virtual Result> UpdateTable( const TableIdentifier& identifier, const std::vector>& requirements, const std::vector>& updates) = 0; @@ -175,7 +175,7 @@ class ICEBERG_EXPORT Catalog { /// \param identifier a table identifier /// \return instance of Table implementation referred to by identifier or /// ErrorKind::kNoSuchTable if the table does not exist - virtual Result> LoadTable(const TableIdentifier& identifier) = 0; + virtual Result> LoadTable(const TableIdentifier& identifier) = 0; /// \brief Register a table with the catalog if it does not exist /// diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc index dc6d9d009..a0c143c5e 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.cc +++ b/src/iceberg/catalog/memory/in_memory_catalog.cc @@ -399,7 +399,7 @@ Result> InMemoryCatalog::ListTables( return table_idents; } -Result> InMemoryCatalog::CreateTable( +Result> InMemoryCatalog::CreateTable( const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, const std::string& location, const std::unordered_map& properties) { @@ -407,7 +407,7 @@ Result> InMemoryCatalog::CreateTable( return NotImplemented("create table"); } -Result> InMemoryCatalog::UpdateTable( +Result> InMemoryCatalog::UpdateTable( const TableIdentifier& identifier, const std::vector>& requirements, const std::vector>& updates) { @@ -434,9 +434,8 @@ Result> InMemoryCatalog::UpdateTable( root_namespace_->UpdateTableMetadataLocation(identifier, new_metadata_location)); TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), *updated); - return std::make_unique(identifier, std::move(updated), - std::move(new_metadata_location), file_io_, - std::static_pointer_cast(shared_from_this())); + return Table::Make(identifier, std::move(updated), std::move(new_metadata_location), + file_io_, shared_from_this()); } Result> InMemoryCatalog::StageCreateTable( @@ -464,7 +463,7 @@ Status InMemoryCatalog::RenameTable(const TableIdentifier& from, return NotImplemented("rename table"); } -Result> InMemoryCatalog::LoadTable( +Result> InMemoryCatalog::LoadTable( const TableIdentifier& identifier) { if (!file_io_) [[unlikely]] { return InvalidArgument("file_io is not set for catalog {}", catalog_name_); @@ -479,9 +478,8 @@ Result> InMemoryCatalog::LoadTable( ICEBERG_ASSIGN_OR_RAISE(auto metadata, TableMetadataUtil::Read(*file_io_, metadata_location)); - return std::make_unique
(identifier, std::move(metadata), - std::move(metadata_location), file_io_, - std::static_pointer_cast(shared_from_this())); + return Table::Make(identifier, std::move(metadata), std::move(metadata_location), + file_io_, shared_from_this()); } Result> InMemoryCatalog::RegisterTable( @@ -500,9 +498,8 @@ Result> InMemoryCatalog::RegisterTable( if (!root_namespace_->RegisterTable(identifier, metadata_file_location)) { return UnknownError("The registry failed."); } - return std::make_unique
(identifier, std::move(metadata), metadata_file_location, - file_io_, - std::static_pointer_cast(shared_from_this())); + return Table::Make(identifier, std::move(metadata), metadata_file_location, file_io_, + shared_from_this()); } } // namespace iceberg diff --git a/src/iceberg/catalog/memory/in_memory_catalog.h b/src/iceberg/catalog/memory/in_memory_catalog.h index e6a9acbce..dd72dd89c 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.h +++ b/src/iceberg/catalog/memory/in_memory_catalog.h @@ -70,12 +70,12 @@ class ICEBERG_EXPORT InMemoryCatalog Result> ListTables(const Namespace& ns) const override; - Result> CreateTable( + Result> CreateTable( const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, const std::string& location, const std::unordered_map& properties) override; - Result> UpdateTable( + Result> UpdateTable( const TableIdentifier& identifier, const std::vector>& requirements, const std::vector>& updates) override; @@ -91,7 +91,7 @@ class ICEBERG_EXPORT InMemoryCatalog Status RenameTable(const TableIdentifier& from, const TableIdentifier& to) override; - Result> LoadTable(const TableIdentifier& identifier) override; + Result> LoadTable(const TableIdentifier& identifier) override; Result> RegisterTable( const TableIdentifier& identifier, diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index b9dfaafc7..0d14ea386 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -240,7 +240,7 @@ Result> RestCatalog::ListTables( return NotImplemented("Not implemented"); } -Result> RestCatalog::CreateTable( +Result> RestCatalog::CreateTable( [[maybe_unused]] const TableIdentifier& identifier, [[maybe_unused]] const Schema& schema, [[maybe_unused]] const PartitionSpec& spec, [[maybe_unused]] const std::string& location, @@ -248,7 +248,7 @@ Result> RestCatalog::CreateTable( return NotImplemented("Not implemented"); } -Result> RestCatalog::UpdateTable( +Result> RestCatalog::UpdateTable( [[maybe_unused]] const TableIdentifier& identifier, [[maybe_unused]] const std::vector>& requirements, [[maybe_unused]] const std::vector>& updates) { @@ -278,7 +278,7 @@ Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from, return NotImplemented("Not implemented"); } -Result> RestCatalog::LoadTable( +Result> RestCatalog::LoadTable( [[maybe_unused]] const TableIdentifier& identifier) { return NotImplemented("Not implemented"); } diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index c8ddca587..266168274 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -71,12 +71,12 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog { Result> ListTables(const Namespace& ns) const override; - Result> CreateTable( + Result> CreateTable( const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec, const std::string& location, const std::unordered_map& properties) override; - Result> UpdateTable( + Result> UpdateTable( const TableIdentifier& identifier, const std::vector>& requirements, const std::vector>& updates) override; @@ -92,7 +92,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog { Status DropTable(const TableIdentifier& identifier, bool purge) override; - Result> LoadTable(const TableIdentifier& identifier) override; + Result> LoadTable(const TableIdentifier& identifier) override; Result> RegisterTable( const TableIdentifier& identifier, diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index d473d72e1..21a41dcf2 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -94,9 +94,11 @@ iceberg_sources = files( 'table_requirements.cc', 'table_scan.cc', 'table_update.cc', + 'transaction.cc', 'transform.cc', 'transform_function.cc', 'type.cc', + 'update/pending_update.cc', 'update/update_properties.cc', 'util/bucket_util.cc', 'util/conversions.cc', @@ -175,7 +177,6 @@ install_headers( 'name_mapping.h', 'partition_field.h', 'partition_spec.h', - 'pending_update.h', 'result.h', 'schema_field.h', 'schema.h', @@ -196,7 +197,6 @@ install_headers( 'transform.h', 'type_fwd.h', 'type.h', - 'update/update_properties.h', ], subdir: 'iceberg', ) @@ -205,6 +205,7 @@ subdir('catalog') subdir('expression') subdir('manifest') subdir('row') +subdir('update') subdir('util') if get_option('tests').enabled() diff --git a/src/iceberg/pending_update.h b/src/iceberg/pending_update.h deleted file mode 100644 index 8370db141..000000000 --- a/src/iceberg/pending_update.h +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -/// \file iceberg/pending_update.h -/// API for table changes using builder pattern - -#include "iceberg/iceberg_export.h" -#include "iceberg/result.h" -#include "iceberg/type_fwd.h" -#include "iceberg/util/error_collector.h" - -namespace iceberg { - -/// \brief Base class for table metadata changes using builder pattern -/// -/// This base class allows storing different types of PendingUpdate operations -/// in the same collection (e.g., in Transaction). It provides the common Commit() -/// interface that all updates share. -/// -/// This matches the Java Iceberg pattern where BaseTransaction stores a -/// List without type parameters. -class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { - public: - virtual ~PendingUpdate() = default; - - /// \brief Verify that the changes are valid and apply them. - /// \return Status::OK if the changes are valid, or an error: - /// - ValidationFailed: if pending changes cannot be applied - /// - InvalidArgument: if pending changes are conflicting - virtual Status Apply() = 0; - - /// \brief Apply and commit the pending changes to the table - /// - /// Changes are committed by calling the underlying table's commit operation. - /// - /// Once the commit is successful, the updated table will be refreshed. - /// - /// \return Status::OK if the commit was successful, or an error: - /// - ValidationFailed: if update cannot be applied to current metadata - /// - CommitFailed: if update cannot be committed due to conflicts - /// - CommitStateUnknown: if commit success state is unknown - virtual Status Commit() = 0; - - // Non-copyable, movable - PendingUpdate(const PendingUpdate&) = delete; - PendingUpdate& operator=(const PendingUpdate&) = delete; - PendingUpdate(PendingUpdate&&) noexcept = default; - PendingUpdate& operator=(PendingUpdate&&) noexcept = default; - - protected: - PendingUpdate() = default; -}; - -} // namespace iceberg diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index 09ff7bda1..b5a1e582e 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -21,16 +21,40 @@ #include "iceberg/catalog.h" #include "iceberg/partition_spec.h" +#include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/sort_order.h" #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" #include "iceberg/table_scan.h" +#include "iceberg/transaction.h" #include "iceberg/update/update_properties.h" #include "iceberg/util/macros.h" namespace iceberg { +Result> Table::Make(TableIdentifier identifier, + std::shared_ptr metadata, + std::string metadata_location, + std::shared_ptr io, + std::shared_ptr catalog) { + if (metadata == nullptr) [[unlikely]] { + return InvalidArgument("Metadata cannot be null"); + } + if (metadata_location.empty()) [[unlikely]] { + return InvalidArgument("Metadata location cannot be empty"); + } + if (io == nullptr) [[unlikely]] { + return InvalidArgument("FileIO cannot be null"); + } + if (catalog == nullptr) [[unlikely]] { + return InvalidArgument("Catalog cannot be null"); + } + return std::shared_ptr
(new Table(std::move(identifier), std::move(metadata), + std::move(metadata_location), std::move(io), + std::move(catalog))); +} + Table::~Table() = default; Table::Table(TableIdentifier identifier, std::shared_ptr metadata, @@ -46,10 +70,6 @@ Table::Table(TableIdentifier identifier, std::shared_ptr metadata const std::string& Table::uuid() const { return metadata_->table_uuid; } Status Table::Refresh() { - if (!catalog_) { - return NotSupported("Refresh is not supported for table without a catalog"); - } - ICEBERG_ASSIGN_OR_RAISE(auto refreshed_table, catalog_->LoadTable(identifier_)); if (metadata_location_ != refreshed_table->metadata_file_location()) { metadata_ = std::move(refreshed_table->metadata_); @@ -110,18 +130,78 @@ const std::vector& Table::history() const { return metadata_->snapshot_log; } -std::unique_ptr Table::UpdateProperties() const { - return std::make_unique(identifier_, catalog_, metadata_); +const std::shared_ptr& Table::io() const { return io_; } + +const std::shared_ptr& Table::metadata() const { return metadata_; } + +const std::shared_ptr& Table::catalog() const { return catalog_; } + +Result> Table::NewScan() const { + return std::make_unique(metadata_, io_); } -std::unique_ptr Table::NewTransaction() const { - throw NotImplemented("Table::NewTransaction is not implemented"); +Result> Table::NewTransaction() { + // Create a brand new transaction object for the table. Users are expected to commit the + // transaction manually. + return Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/false); } -const std::shared_ptr& Table::io() const { return io_; } +Result> Table::NewUpdateProperties() { + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return transaction->NewUpdateProperties(); +} -std::unique_ptr Table::NewScan() const { - return std::make_unique(metadata_, io_); +Result> StagedTable::Make( + TableIdentifier identifier, std::shared_ptr metadata, + std::string metadata_location, std::shared_ptr io, + std::shared_ptr catalog) { + if (metadata == nullptr) [[unlikely]] { + return InvalidArgument("Metadata cannot be null"); + } + if (io == nullptr) [[unlikely]] { + return InvalidArgument("FileIO cannot be null"); + } + if (catalog == nullptr) [[unlikely]] { + return InvalidArgument("Catalog cannot be null"); + } + return std::shared_ptr( + new StagedTable(std::move(identifier), std::move(metadata), + std::move(metadata_location), std::move(io), std::move(catalog))); +} + +StagedTable::~StagedTable() = default; + +Result> StagedTable::NewScan() const { + return NotSupported("Cannot scan a staged table"); +} + +Result> StaticTable::Make( + TableIdentifier identifier, std::shared_ptr metadata, + std::string metadata_location, std::shared_ptr io) { + if (metadata == nullptr) [[unlikely]] { + return InvalidArgument("Metadata cannot be null"); + } + if (io == nullptr) [[unlikely]] { + return InvalidArgument("FileIO cannot be null"); + } + return std::shared_ptr( + new StaticTable(std::move(identifier), std::move(metadata), + std::move(metadata_location), std::move(io), /*catalog=*/nullptr)); +} + +StaticTable::~StaticTable() = default; + +Status StaticTable::Refresh() { return NotSupported("Cannot refresh a static table"); } + +Result> StaticTable::NewTransaction() { + return NotSupported("Cannot create a transaction for a static table"); +} + +Result> StaticTable::NewUpdateProperties() { + return NotSupported("Cannot create an update properties for a static table"); } } // namespace iceberg diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 98e6b998c..57f8c43af 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -34,20 +34,21 @@ namespace iceberg { /// \brief Represents an Iceberg table -class ICEBERG_EXPORT Table { +class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ public: - ~Table(); - /// \brief Construct a table. /// \param[in] identifier The identifier of the table. /// \param[in] metadata The metadata for the table. /// \param[in] metadata_location The location of the table metadata file. /// \param[in] io The FileIO to read and write table data and metadata files. - /// \param[in] catalog The catalog that this table belongs to. If null, the table will - /// be read-only. - Table(TableIdentifier identifier, std::shared_ptr metadata, - std::string metadata_location, std::shared_ptr io, - std::shared_ptr catalog); + /// \param[in] catalog The catalog that this table belongs to. + static Result> Make(TableIdentifier identifier, + std::shared_ptr metadata, + std::string metadata_location, + std::shared_ptr io, + std::shared_ptr catalog); + + virtual ~Table(); /// \brief Return the identifier of this table const TableIdentifier& name() const { return identifier_; } @@ -55,9 +56,6 @@ class ICEBERG_EXPORT Table { /// \brief Returns the UUID of the table const std::string& uuid() const; - /// \brief Refresh the current table metadata - Status Refresh(); - /// \brief Return the schema for this table, return NotFoundError if not found Result> schema() const; @@ -107,34 +105,38 @@ class ICEBERG_EXPORT Table { const std::vector>& snapshots() const; /// \brief Get the snapshot history of this table - /// - /// \return a vector of history entries const std::vector& history() const; - /// \brief Create a new UpdateProperties to update table properties and commit the - /// changes - /// - /// \return a new UpdateProperties instance - virtual std::unique_ptr UpdateProperties() const; + /// \brief Returns a FileIO to read and write table data and metadata files + const std::shared_ptr& io() const; + + /// \brief Returns the current metadata for this table + const std::shared_ptr& metadata() const; + + /// \brief Returns the catalog that this table belongs to + const std::shared_ptr& catalog() const; + + /// \brief Refresh the current table metadata + virtual Status Refresh(); /// \brief Create a new table scan builder for this table /// /// Once a table scan builder is created, it can be refined to project columns and /// filter data. - virtual std::unique_ptr NewScan() const; + virtual Result> NewScan() const; - /// \brief Create a new transaction for this table - /// - /// \return a pointer to the new Transaction - virtual std::unique_ptr NewTransaction() const; + /// \brief Create a new Transaction to commit multiple table operations at once. + virtual Result> NewTransaction(); - /// \brief Returns a FileIO to read and write table data and metadata files - const std::shared_ptr& io() const; + /// \brief Create a new UpdateProperties to update table properties and commit the + /// changes. + virtual Result> NewUpdateProperties(); - /// \brief Returns the current metadata for this table - const std::shared_ptr& metadata() const; + protected: + Table(TableIdentifier identifier, std::shared_ptr metadata, + std::string metadata_location, std::shared_ptr io, + std::shared_ptr catalog); - private: const TableIdentifier identifier_; std::shared_ptr metadata_; std::string metadata_location_; @@ -143,4 +145,39 @@ class ICEBERG_EXPORT Table { std::unique_ptr metadata_cache_; }; +class ICEBERG_EXPORT StagedTable : public Table { + public: + static Result> Make( + TableIdentifier identifier, std::shared_ptr metadata, + std::string metadata_location, std::shared_ptr io, + std::shared_ptr catalog); + + ~StagedTable() override; + + Status Refresh() override { return {}; } + + Result> NewScan() const override; + + private: + using Table::Table; +}; + +class ICEBERG_EXPORT StaticTable : public Table { + public: + static Result> Make( + TableIdentifier identifier, std::shared_ptr metadata, + std::string metadata_location, std::shared_ptr io); + + ~StaticTable() override; + + Status Refresh() override; + + Result> NewTransaction() override; + + Result> NewUpdateProperties() override; + + private: + using Table::Table; +}; + } // namespace iceberg diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 4e814e027..3d5f4cf1b 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -790,4 +790,12 @@ Result> TableMetadataBuilder::Build() { return std::make_unique(std::move(impl_->metadata)); } +const std::vector>& TableMetadataBuilder::changes() const { + return impl_->changes; +} + +const TableMetadata* TableMetadataBuilder::base() const { return impl_->base; } + +const TableMetadata* TableMetadataBuilder::current() const { return &impl_->metadata; } + } // namespace iceberg diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index f428fd34f..bdfea2309 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -419,6 +419,15 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// \return A Result containing the constructed TableMetadata or an error Result> Build(); + /// \brief Return the changes made to the table metadata + const std::vector>& changes() const; + + /// \brief Return the base metadata without any changes + const TableMetadata* base() const; + + /// \brief Return the current metadata with staged changes applied + const TableMetadata* current() const; + /// \brief Destructor ~TableMetadataBuilder() override; diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc index 90f7de622..87fcd1349 100644 --- a/src/iceberg/table_update.cc +++ b/src/iceberg/table_update.cc @@ -23,6 +23,10 @@ #include "iceberg/table_metadata.h" #include "iceberg/table_requirements.h" +namespace iceberg { +TableUpdate::~TableUpdate() = default; +} + namespace iceberg::table { // AssignUUID @@ -38,7 +42,7 @@ void AssignUUID::GenerateRequirements(TableUpdateContext& context) const { // UpgradeFormatVersion void UpgradeFormatVersion::ApplyTo(TableMetadataBuilder& builder) const { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + builder.UpgradeFormatVersion(format_version_); } void UpgradeFormatVersion::GenerateRequirements(TableUpdateContext& context) const { diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h index 93b48cf27..a040cb36c 100644 --- a/src/iceberg/table_update.h +++ b/src/iceberg/table_update.h @@ -40,7 +40,30 @@ namespace iceberg { /// represents a specific type of update operation. class ICEBERG_EXPORT TableUpdate { public: - virtual ~TableUpdate() = default; + enum class Kind : uint8_t { + kAssignUUID, + kUpgradeFormatVersion, + kAddSchema, + kSetCurrentSchema, + kAddPartitionSpec, + kSetDefaultPartitionSpec, + kRemovePartitionSpecs, + kRemoveSchemas, + kAddSortOrder, + kSetDefaultSortOrder, + kAddSnapshot, + kRemoveSnapshots, + kRemoveSnapshotRef, + kSetSnapshotRef, + kSetProperties, + kRemoveProperties, + kSetLocation, + }; + + virtual ~TableUpdate(); + + /// \brief Return the kind of this update. + virtual Kind kind() const = 0; /// \brief Apply this update to a TableMetadataBuilder /// @@ -74,6 +97,8 @@ class ICEBERG_EXPORT AssignUUID : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kAssignUUID; } + private: std::string uuid_; }; @@ -90,6 +115,8 @@ class ICEBERG_EXPORT UpgradeFormatVersion : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kUpgradeFormatVersion; } + private: int8_t format_version_; }; @@ -108,6 +135,8 @@ class ICEBERG_EXPORT AddSchema : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kAddSchema; } + private: std::shared_ptr schema_; int32_t last_column_id_; @@ -124,6 +153,8 @@ class ICEBERG_EXPORT SetCurrentSchema : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kSetCurrentSchema; } + private: int32_t schema_id_; }; @@ -140,6 +171,8 @@ class ICEBERG_EXPORT AddPartitionSpec : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kAddPartitionSpec; } + private: std::shared_ptr spec_; }; @@ -155,6 +188,8 @@ class ICEBERG_EXPORT SetDefaultPartitionSpec : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kSetDefaultPartitionSpec; } + private: int32_t spec_id_; }; @@ -171,6 +206,8 @@ class ICEBERG_EXPORT RemovePartitionSpecs : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kRemovePartitionSpecs; } + private: std::vector spec_ids_; }; @@ -187,6 +224,8 @@ class ICEBERG_EXPORT RemoveSchemas : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kRemoveSchemas; } + private: std::vector schema_ids_; }; @@ -203,6 +242,8 @@ class ICEBERG_EXPORT AddSortOrder : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kAddSortOrder; } + private: std::shared_ptr sort_order_; }; @@ -218,6 +259,8 @@ class ICEBERG_EXPORT SetDefaultSortOrder : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kSetDefaultSortOrder; } + private: int32_t sort_order_id_; }; @@ -234,6 +277,8 @@ class ICEBERG_EXPORT AddSnapshot : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kAddSnapshot; } + private: std::shared_ptr snapshot_; }; @@ -250,6 +295,8 @@ class ICEBERG_EXPORT RemoveSnapshots : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kRemoveSnapshots; } + private: std::vector snapshot_ids_; }; @@ -265,6 +312,8 @@ class ICEBERG_EXPORT RemoveSnapshotRef : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kRemoveSnapshotRef; } + private: std::string ref_name_; }; @@ -298,6 +347,8 @@ class ICEBERG_EXPORT SetSnapshotRef : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kSetSnapshotRef; } + private: std::string ref_name_; int64_t snapshot_id_; @@ -319,6 +370,8 @@ class ICEBERG_EXPORT SetProperties : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kSetProperties; } + private: std::unordered_map updated_; }; @@ -335,6 +388,8 @@ class ICEBERG_EXPORT RemoveProperties : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kRemoveProperties; } + private: std::vector removed_; }; @@ -350,6 +405,8 @@ class ICEBERG_EXPORT SetLocation : public TableUpdate { void GenerateRequirements(TableUpdateContext& context) const override; + Kind kind() const override { return Kind::kSetLocation; } + private: std::string location_; }; diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 2c4e0f512..2af7d1c4e 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -55,39 +55,36 @@ endfunction() add_iceberg_test(schema_test SOURCES name_mapping_test.cc - schema_test.cc - schema_field_test.cc - type_test.cc - transform_test.cc partition_field_test.cc partition_spec_test.cc partition_value_test.cc + schema_field_test.cc + schema_test.cc + schema_util_test.cc sort_field_test.cc sort_order_test.cc - snapshot_test.cc - schema_util_test.cc) + transform_test.cc + type_test.cc) add_iceberg_test(table_test SOURCES - json_internal_test.cc metrics_config_test.cc - schema_json_test.cc - table_test.cc + snapshot_test.cc table_metadata_builder_test.cc table_requirement_test.cc table_requirements_test.cc - table_update_test.cc - update_properties_test.cc) + table_test.cc + table_update_test.cc) add_iceberg_test(expression_test SOURCES aggregate_test.cc expression_test.cc expression_visitor_test.cc - literal_test.cc - manifest_evaluator_test.cc inclusive_metrics_evaluator_test.cc inclusive_metrics_evaluator_with_transform_test.cc + literal_test.cc + manifest_evaluator_test.cc predicate_test.cc projections_test.cc residual_evaluator_test.cc @@ -151,6 +148,12 @@ if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(scan_test USE_BUNDLE SOURCES file_scan_task_test.cc) + add_iceberg_test(table_update_test + USE_BUNDLE + SOURCES + transaction_test.cc + update_properties_test.cc) + endif() if(ICEBERG_BUILD_REST) diff --git a/src/iceberg/test/in_memory_catalog_test.cc b/src/iceberg/test/in_memory_catalog_test.cc index d1a8ccef9..194d6da51 100644 --- a/src/iceberg/test/in_memory_catalog_test.cc +++ b/src/iceberg/test/in_memory_catalog_test.cc @@ -36,6 +36,7 @@ #include "iceberg/table_update.h" #include "iceberg/test/matchers.h" #include "iceberg/test/mock_catalog.h" +#include "iceberg/test/mock_io.h" #include "iceberg/test/test_resource.h" #include "iceberg/util/uuid.h" @@ -128,22 +129,21 @@ TEST_F(InMemoryCatalogTest, RefreshTable) { std::vector{SchemaField::MakeRequired(1, "x", int64())}, /*schema_id=*/1); - std::shared_ptr io; - + auto io = std::make_shared(); auto catalog = std::make_shared(); // Mock 1st call to LoadTable EXPECT_CALL(*catalog, LoadTable(::testing::_)) .WillOnce(::testing::Return( - std::make_unique
(table_ident, - std::make_shared(TableMetadata{ - .schemas = {schema}, - .current_schema_id = 1, - .current_snapshot_id = 1, - .snapshots = {std::make_shared(Snapshot{ - .snapshot_id = 1, - .sequence_number = 1, - })}}), - "s3://location/1.json", io, catalog))); + Table::Make(table_ident, + std::make_shared( + TableMetadata{.schemas = {schema}, + .current_schema_id = 1, + .current_snapshot_id = 1, + .snapshots = {std::make_shared(Snapshot{ + .snapshot_id = 1, + .sequence_number = 1, + })}}), + "s3://location/1.json", io, catalog))); auto load_table_result = catalog->LoadTable(table_ident); ASSERT_THAT(load_table_result, IsOk()); auto loaded_table = std::move(load_table_result.value()); @@ -152,20 +152,20 @@ TEST_F(InMemoryCatalogTest, RefreshTable) { // Mock 2nd call to LoadTable EXPECT_CALL(*catalog, LoadTable(::testing::_)) .WillOnce(::testing::Return( - std::make_unique
(table_ident, - std::make_shared(TableMetadata{ - .schemas = {schema}, - .current_schema_id = 1, - .current_snapshot_id = 2, - .snapshots = {std::make_shared(Snapshot{ - .snapshot_id = 1, - .sequence_number = 1, - }), - std::make_shared(Snapshot{ - .snapshot_id = 2, - .sequence_number = 2, - })}}), - "s3://location/2.json", io, catalog))); + Table::Make(table_ident, + std::make_shared( + TableMetadata{.schemas = {schema}, + .current_schema_id = 1, + .current_snapshot_id = 2, + .snapshots = {std::make_shared(Snapshot{ + .snapshot_id = 1, + .sequence_number = 1, + }), + std::make_shared(Snapshot{ + .snapshot_id = 2, + .sequence_number = 2, + })}}), + "s3://location/2.json", io, catalog))); auto refreshed_result = loaded_table->Refresh(); ASSERT_THAT(refreshed_result, IsOk()); // check table is refreshed diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index f12b43afa..5ccab940c 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -37,7 +37,6 @@ iceberg_tests = { 'schema_field_test.cc', 'schema_test.cc', 'schema_util_test.cc', - 'snapshot_test.cc', 'sort_field_test.cc', 'sort_order_test.cc', 'transform_test.cc', @@ -46,14 +45,13 @@ iceberg_tests = { }, 'table_test': { 'sources': files( - 'json_internal_test.cc', 'metrics_config_test.cc', - 'schema_json_test.cc', + 'snapshot_test.cc', 'table_metadata_builder_test.cc', 'table_requirement_test.cc', + 'table_requirements_test.cc', 'table_test.cc', 'table_update_test.cc', - 'update_properties_test.cc', ), }, 'expression_test': { diff --git a/src/iceberg/test/mock_catalog.h b/src/iceberg/test/mock_catalog.h index 46f01c8db..1f43cfab7 100644 --- a/src/iceberg/test/mock_catalog.h +++ b/src/iceberg/test/mock_catalog.h @@ -55,12 +55,12 @@ class MockCatalog : public Catalog { MOCK_METHOD((Result>), ListTables, (const Namespace&), (const, override)); - MOCK_METHOD((Result>), CreateTable, + MOCK_METHOD((Result>), CreateTable, (const TableIdentifier&, const Schema&, const PartitionSpec&, const std::string&, (const std::unordered_map&)), (override)); - MOCK_METHOD((Result>), UpdateTable, + MOCK_METHOD((Result>), UpdateTable, (const TableIdentifier&, (const std::vector>&), (const std::vector>&)), @@ -78,7 +78,7 @@ class MockCatalog : public Catalog { MOCK_METHOD(Status, RenameTable, (const TableIdentifier&, const TableIdentifier&), (override)); - MOCK_METHOD((Result>), LoadTable, (const TableIdentifier&), + MOCK_METHOD((Result>), LoadTable, (const TableIdentifier&), (override)); MOCK_METHOD((Result>), RegisterTable, diff --git a/src/iceberg/test/mock_io.h b/src/iceberg/test/mock_io.h new file mode 100644 index 000000000..c9f38e505 --- /dev/null +++ b/src/iceberg/test/mock_io.h @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/file_io.h" + +namespace iceberg { + +class MockFileIO : public FileIO { + public: + MockFileIO() = default; + ~MockFileIO() override = default; + + MOCK_METHOD((Result), ReadFile, + (const std::string&, std::optional), (override)); + + MOCK_METHOD(Status, WriteFile, (const std::string&, std::string_view), (override)); + + MOCK_METHOD(Status, DeleteFile, (const std::string&), (override)); +}; + +} // namespace iceberg diff --git a/src/iceberg/test/table_test.cc b/src/iceberg/test/table_test.cc index 59710e0f6..e445d9011 100644 --- a/src/iceberg/test/table_test.cc +++ b/src/iceberg/test/table_test.cc @@ -19,99 +19,129 @@ #include "iceberg/table.h" +#include #include -#include -#include "iceberg/partition_spec.h" #include "iceberg/schema.h" -#include "iceberg/snapshot.h" +#include "iceberg/schema_field.h" #include "iceberg/table_metadata.h" #include "iceberg/test/matchers.h" -#include "iceberg/test/test_resource.h" +#include "iceberg/test/mock_catalog.h" +#include "iceberg/test/mock_io.h" namespace iceberg { -TEST(Table, TableV1) { - ICEBERG_UNWRAP_OR_FAIL(auto metadata, - ReadTableMetadataFromResource("TableMetadataV1Valid.json")); - TableIdentifier tableIdent{.ns = {}, .name = "test_table_v1"}; - Table table(tableIdent, std::move(metadata), "s3://bucket/test/location/meta/", nullptr, - nullptr); - ASSERT_EQ(table.name().name, "test_table_v1"); - - // Check table schema - auto schema = table.schema(); - ASSERT_TRUE(schema.has_value()); - ASSERT_EQ(schema.value()->fields().size(), 3); - auto schemas = table.schemas(); - ASSERT_TRUE(schemas->get().empty()); - - // Check table spec - auto spec = table.spec(); - ASSERT_TRUE(spec.has_value()); - auto specs = table.specs(); - ASSERT_EQ(1UL, specs->get().size()); - - // Check table sort_order - auto sort_order = table.sort_order(); - ASSERT_TRUE(sort_order.has_value()); - auto sort_orders = table.sort_orders(); - ASSERT_EQ(1UL, sort_orders->get().size()); - - // Check table location - auto location = table.location(); - ASSERT_EQ(location, "s3://bucket/test/location"); - - // Check table snapshots - auto snapshots = table.snapshots(); - ASSERT_TRUE(snapshots.empty()); - - auto io = table.io(); - ASSERT_TRUE(io == nullptr); +template +struct TableTraits; + +template <> +struct TableTraits
{ + static constexpr bool kRefreshSupported = true; + static constexpr bool kTransactionSupported = true; + + static Result> Make(const TableIdentifier& ident, + std::shared_ptr metadata, + const std::string& location, + std::shared_ptr io, + std::shared_ptr catalog) { + return Table::Make(ident, std::move(metadata), location, std::move(io), + std::move(catalog)); + } +}; + +template <> +struct TableTraits { + static constexpr bool kRefreshSupported = false; + static constexpr bool kTransactionSupported = false; + + static Result> Make( + const TableIdentifier& ident, std::shared_ptr metadata, + const std::string& location, std::shared_ptr io, std::shared_ptr) { + return StaticTable::Make(ident, std::move(metadata), location, std::move(io)); + } +}; + +template <> +struct TableTraits { + static constexpr bool kRefreshSupported = true; + static constexpr bool kTransactionSupported = true; + + static Result> Make( + const TableIdentifier& ident, std::shared_ptr metadata, + const std::string& location, std::shared_ptr io, + std::shared_ptr catalog) { + return StagedTable::Make(ident, std::move(metadata), location, std::move(io), + std::move(catalog)); + } +}; + +template +class TypedTableTest : public ::testing::Test { + protected: + using Traits = TableTraits; + + void SetUp() override { + io_ = std::make_shared(); + catalog_ = std::make_shared(); + + auto schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int64()), + SchemaField::MakeOptional(2, "name", string())}, + 1); + metadata_ = std::make_shared( + TableMetadata{.format_version = 2, .schemas = {schema}, .current_schema_id = 1}); + } + + Result> MakeTable(const std::string& name) { + TableIdentifier ident{.ns = Namespace{.levels = {"db"}}, .name = name}; + return Traits::Make(ident, metadata_, "s3://bucket/meta.json", io_, catalog_); + } + + std::shared_ptr io_; + std::shared_ptr catalog_; + std::shared_ptr metadata_; +}; + +using TableTypes = ::testing::Types; +TYPED_TEST_SUITE(TypedTableTest, TableTypes); + +TYPED_TEST(TypedTableTest, BasicMetadata) { + ICEBERG_UNWRAP_OR_FAIL(auto table, this->MakeTable("test_table")); + + EXPECT_EQ(table->name().name, "test_table"); + EXPECT_EQ(table->name().ns.levels, (std::vector{"db"})); + EXPECT_EQ(table->metadata()->format_version, 2); + EXPECT_EQ(table->metadata()->schemas.size(), 1); } -TEST(Table, TableV2) { - ICEBERG_UNWRAP_OR_FAIL(auto metadata, - ReadTableMetadataFromResource("TableMetadataV2Valid.json")); - TableIdentifier tableIdent{.ns = {}, .name = "test_table_v2"}; - - Table table(tableIdent, std::move(metadata), "s3://bucket/test/location/meta/", nullptr, - nullptr); - ASSERT_EQ(table.name().name, "test_table_v2"); - - // Check table schema - auto schema = table.schema(); - ASSERT_TRUE(schema.has_value()); - ASSERT_EQ(schema.value()->fields().size(), 3); - auto schemas = table.schemas(); - ASSERT_FALSE(schemas->get().empty()); - - // Check partition spec - auto spec = table.spec(); - ASSERT_TRUE(spec.has_value()); - auto specs = table.specs(); - ASSERT_EQ(1UL, specs->get().size()); - - // Check sort order - auto sort_order = table.sort_order(); - ASSERT_TRUE(sort_order.has_value()); - auto sort_orders = table.sort_orders(); - ASSERT_EQ(1UL, sort_orders->get().size()); - - // Check table location - auto location = table.location(); - ASSERT_EQ(location, "s3://bucket/test/location"); - - // Check snapshot - auto snapshots = table.snapshots(); - ASSERT_EQ(2UL, snapshots.size()); - auto snapshot = table.current_snapshot(); - ASSERT_TRUE(snapshot.has_value()); - snapshot = table.SnapshotById(snapshot.value()->snapshot_id); - ASSERT_TRUE(snapshot.has_value()); - auto invalid_snapshot_id = 9999; - snapshot = table.SnapshotById(invalid_snapshot_id); - ASSERT_FALSE(snapshot.has_value()); +TYPED_TEST(TypedTableTest, Refresh) { + using Traits = typename TestFixture::Traits; + ICEBERG_UNWRAP_OR_FAIL(auto table, this->MakeTable("test_table")); + + if constexpr (Traits::kRefreshSupported) { + if constexpr (std::is_same_v) { + TableIdentifier ident{.ns = Namespace{.levels = {"db"}}, .name = "test_table"}; + ICEBERG_UNWRAP_OR_FAIL(auto refreshed, + Table::Make(ident, this->metadata_, "s3://bucket/meta2.json", + this->io_, this->catalog_)); + EXPECT_CALL(*this->catalog_, LoadTable(::testing::_)) + .WillOnce(::testing::Return(refreshed)); + } + EXPECT_THAT(table->Refresh(), IsOk()); + } else { + EXPECT_THAT(table->Refresh(), IsError(ErrorKind::kNotSupported)); + } +} + +TYPED_TEST(TypedTableTest, NewTransaction) { + using Traits = typename TestFixture::Traits; + ICEBERG_UNWRAP_OR_FAIL(auto table, this->MakeTable("test_table")); + + if constexpr (Traits::kTransactionSupported) { + EXPECT_THAT(table->NewTransaction(), IsOk()); + } else { + EXPECT_THAT(table->NewTransaction(), IsError(ErrorKind::kNotSupported)); + } } } // namespace iceberg diff --git a/src/iceberg/test/transaction_test.cc b/src/iceberg/test/transaction_test.cc new file mode 100644 index 000000000..b8a55d83c --- /dev/null +++ b/src/iceberg/test/transaction_test.cc @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/transaction.h" + +#include "iceberg/table.h" +#include "iceberg/table_update.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/update/update_properties.h" + +namespace iceberg { + +class TransactionTest : public UpdateTestBase {}; + +TEST_F(TransactionTest, CreateTransaction) { + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + EXPECT_NE(txn, nullptr); + EXPECT_EQ(txn->table(), table_); +} + +TEST_F(TransactionTest, UpdatePropertiesInTransaction) { + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties()); + + update->Set("key1", "value1"); + EXPECT_THAT(update->Apply(), IsOk()); +} + +TEST_F(TransactionTest, CommitEmptyTransaction) { + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + EXPECT_THAT(txn->Commit(), IsOk()); +} + +TEST_F(TransactionTest, CommitTransactionWithPropertyUpdate) { + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto update, txn->NewUpdateProperties()); + + update->Set("txn.property", "txn.value"); + EXPECT_THAT(update->Commit(), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit()); + EXPECT_NE(updated_table, nullptr); + + // Reload table and verify the property was set + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + const auto& props = reloaded->properties().configs(); + EXPECT_EQ(props.at("txn.property"), "txn.value"); +} + +TEST_F(TransactionTest, MultipleUpdatesInTransaction) { + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + + // First update + ICEBERG_UNWRAP_OR_FAIL(auto update1, txn->NewUpdateProperties()); + update1->Set("key1", "value1"); + EXPECT_THAT(update1->Commit(), IsOk()); + + // Second update + ICEBERG_UNWRAP_OR_FAIL(auto update2, txn->NewUpdateProperties()); + update2->Set("key2", "value2"); + EXPECT_THAT(update2->Commit(), IsOk()); + + // Commit transaction + ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit()); + + // Verify both properties were set + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + const auto& props = reloaded->properties().configs(); + EXPECT_EQ(props.at("key1"), "value1"); + EXPECT_EQ(props.at("key2"), "value2"); +} + +} // namespace iceberg diff --git a/src/iceberg/test/update_properties_test.cc b/src/iceberg/test/update_properties_test.cc index 13cfec831..1084f08fd 100644 --- a/src/iceberg/test/update_properties_test.cc +++ b/src/iceberg/test/update_properties_test.cc @@ -19,188 +19,122 @@ #include "iceberg/update/update_properties.h" -#include -#include -#include - -#include -#include - -#include "iceberg/file_format.h" -#include "iceberg/result.h" -#include "iceberg/schema.h" -#include "iceberg/schema_field.h" #include "iceberg/table.h" -#include "iceberg/table_identifier.h" -#include "iceberg/table_metadata.h" +#include "iceberg/table_update.h" #include "iceberg/test/matchers.h" -#include "iceberg/test/mock_catalog.h" +#include "iceberg/test/update_test_base.h" namespace iceberg { -class UpdatePropertiesTest : public ::testing::Test { - protected: - void SetUp() override { - // Create a simple schema - SchemaField f(1, "col1", std::make_shared(), false); - schema_ = std::make_shared(std::vector{f}, 1); - - // Create basic table metadata - metadata_ = std::make_shared(); - metadata_->schemas.push_back(schema_); - - // Create catalog and table identifier - catalog_ = std::make_shared(); - identifier_ = TableIdentifier(Namespace({"test"}), "table"); - } - - std::shared_ptr schema_; - std::shared_ptr metadata_; - std::shared_ptr catalog_; - TableIdentifier identifier_; -}; +class UpdatePropertiesTest : public UpdateTestBase {}; TEST_F(UpdatePropertiesTest, EmptyUpdates) { - UpdateProperties update(identifier_, catalog_, metadata_); - - auto result = update.Commit(); - EXPECT_THAT(result, IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); + EXPECT_THAT(update->Commit(), IsOk()); } TEST_F(UpdatePropertiesTest, SetProperty) { - UpdateProperties update(identifier_, catalog_, metadata_); - update.Set("key1", "value1").Set("key2", "value2"); + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); + update->Set("key1", "value1").Set("key2", "value2"); - auto result = update.Apply(); - EXPECT_THAT(result, IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.updates.size(), 1); + EXPECT_EQ(result.updates[0]->kind(), table::SetProperties::Kind::kSetProperties); } TEST_F(UpdatePropertiesTest, RemoveProperty) { - UpdateProperties update(identifier_, catalog_, metadata_); - update.Remove("key1").Remove("key2"); + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); + update->Remove("key1").Remove("key2"); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.updates.size(), 1); + EXPECT_EQ(result.updates[0]->kind(), table::RemoveProperties::Kind::kRemoveProperties); +} + +TEST_F(UpdatePropertiesTest, SetThenRemoveSameKey) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); + update->Set("key1", "value1").Remove("key1"); - auto result = update.Apply(); - EXPECT_THAT(result, IsOk()); + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("already marked for update")); } -TEST_F(UpdatePropertiesTest, SetRemoveConflict) { - { - // Set a property that is already marked for removal - UpdateProperties update(identifier_, catalog_, metadata_); - update.Set("key1", "value1").Remove("key1"); - - auto result = update.Apply(); - EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); - EXPECT_THAT(result, HasErrorMessage("already marked for update")); - } - - { - // Remove a property that is already marked for update - UpdateProperties update(identifier_, catalog_, metadata_); - update.Remove("key1").Set("key1", "value1"); - - auto result = update.Apply(); - EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); - EXPECT_THAT(result, HasErrorMessage("already marked for removal")); - } +TEST_F(UpdatePropertiesTest, RemoveThenSetSameKey) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); + update->Remove("key1").Set("key1", "value1"); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("already marked for removal")); } -TEST_F(UpdatePropertiesTest, UpgradeFormatVersion) { - { - // Valid format-version upgrade - UpdateProperties update(identifier_, catalog_, metadata_); - update.Set("format-version", "2"); - - auto result = update.Apply(); - EXPECT_THAT(result, IsOk()); - } - - { - // Format-version is not a valid integer - UpdateProperties update(identifier_, catalog_, metadata_); - update.Set("format-version", "invalid"); - - auto result = update.Apply(); - EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); - EXPECT_THAT(result, HasErrorMessage("Invalid format version")); - } - - { - // Format-version is out of range - UpdateProperties update(identifier_, catalog_, metadata_); - update.Set("format-version", "5000000000"); - - auto result = update.Apply(); - EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); - EXPECT_THAT(result, HasErrorMessage("out of range")); - } - - { - // Format-version not supported - UpdateProperties update(identifier_, catalog_, metadata_); - update.Set("format-version", - std::to_string(TableMetadata::kSupportedTableFormatVersion + 1)); - - auto result = update.Apply(); - EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); - EXPECT_THAT(result, HasErrorMessage("unsupported format version")); - } +TEST_F(UpdatePropertiesTest, UpgradeFormatVersionValid) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); + update->Set("format-version", "2"); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.updates.size(), 1); + EXPECT_EQ(result.updates[0]->kind(), + table::UpgradeFormatVersion::Kind::kUpgradeFormatVersion); } -TEST_F(UpdatePropertiesTest, InvalidTable) { - { - // catalog is null - UpdateProperties update(identifier_, nullptr, metadata_); +TEST_F(UpdatePropertiesTest, UpgradeFormatVersionInvalidString) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); + update->Set("format-version", "invalid"); - auto result = update.Apply(); - EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); - EXPECT_THAT(result, HasErrorMessage("Catalog is required")); - } + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("Invalid format version")); +} - { - // metadata is null - UpdateProperties update(identifier_, catalog_, nullptr); +TEST_F(UpdatePropertiesTest, UpgradeFormatVersionOutOfRange) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); + update->Set("format-version", "5000000000"); - auto result = update.Apply(); - EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); - EXPECT_THAT(result, HasErrorMessage("Base table metadata is required")); - } + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("out of range")); } -TEST_F(UpdatePropertiesTest, Commit) { - { - // Successful commit - UpdateProperties update(identifier_, catalog_, metadata_); - update.Set("key1", "value1"); +TEST_F(UpdatePropertiesTest, UpgradeFormatVersionUnsupported) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); + update->Set("format-version", + std::to_string(TableMetadata::kSupportedTableFormatVersion + 1)); - EXPECT_CALL(*catalog_, UpdateTable).Times(1).WillOnce(::testing::Return(nullptr)); + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("unsupported format version")); +} - auto result = update.Commit(); - EXPECT_THAT(result, IsOk()); - } +TEST_F(UpdatePropertiesTest, CommitSuccess) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); + update->Set("new.property", "new.value"); - { - // Failed commit - UpdateProperties update(identifier_, catalog_, metadata_); - update.Set("key1", "value1"); + EXPECT_THAT(update->Commit(), IsOk()); - EXPECT_CALL(*catalog_, UpdateTable) - .WillOnce(::testing::Return(CommitFailed("Commit update failed"))); - auto result = update.Commit(); - EXPECT_THAT(result, IsError(ErrorKind::kCommitFailed)); - } + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + const auto& props = reloaded->properties().configs(); + EXPECT_EQ(props.at("new.property"), "new.value"); } TEST_F(UpdatePropertiesTest, FluentInterface) { - UpdateProperties update(identifier_, catalog_, metadata_); + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); + auto& ref = update->Set("key1", "value1").Remove("key2"); - auto& ref = update.Set("key1", "value1").Remove("key2"); + EXPECT_EQ(&ref, update.get()); + EXPECT_THAT(update->Apply(), IsOk()); +} - // Should return reference to itself - EXPECT_EQ(&ref, &update); +TEST_F(UpdatePropertiesTest, SetAndRemoveDifferentKeys) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateProperties()); + update->Set("key1", "value1").Remove("key2"); + EXPECT_THAT(update->Commit(), IsOk()); - auto result = update.Apply(); - EXPECT_THAT(result, IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + const auto& props = reloaded->properties().configs(); + EXPECT_EQ(props.at("key1"), "value1"); + EXPECT_FALSE(props.contains("key2")); } } // namespace iceberg diff --git a/src/iceberg/test/update_test_base.h b/src/iceberg/test/update_test_base.h new file mode 100644 index 000000000..c78dc4d0e --- /dev/null +++ b/src/iceberg/test/update_test_base.h @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/catalog/memory/in_memory_catalog.h" +#include "iceberg/table.h" +#include "iceberg/table_identifier.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/test_resource.h" +#include "iceberg/util/uuid.h" + +namespace iceberg { + +// Base test fixture for table update operations +class UpdateTestBase : public ::testing::Test { + protected: + void SetUp() override { + file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + catalog_ = + InMemoryCatalog::Make("test_catalog", file_io_, "/warehouse/", /*properties=*/{}); + + // Arrow MockFS cannot automatically create directories. + auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>( + static_cast(*file_io_).fs()); + ASSERT_TRUE(arrow_fs != nullptr); + ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok()); + + // Write table metadata to the table location. + auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json", + table_location_, Uuid::GenerateV7().ToString()); + ICEBERG_UNWRAP_OR_FAIL(auto metadata, + ReadTableMetadataFromResource("TableMetadataV2Valid.json")); + metadata->location = table_location_; + ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata), + IsOk()); + + // Register the table in the catalog. + ICEBERG_UNWRAP_OR_FAIL(table_, + catalog_->RegisterTable(table_ident_, metadata_location)); + } + + const TableIdentifier table_ident_{.name = "test_table"}; + const std::string table_location_{"/warehouse/test_table"}; + std::shared_ptr file_io_; + std::shared_ptr catalog_; + std::shared_ptr
table_; +}; + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc new file mode 100644 index 000000000..08f430415 --- /dev/null +++ b/src/iceberg/transaction.cc @@ -0,0 +1,111 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "iceberg/transaction.h" + +#include "iceberg/catalog.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/table_requirement.h" +#include "iceberg/table_requirements.h" +#include "iceberg/table_update.h" +#include "iceberg/update/update_properties.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Transaction::Transaction(std::shared_ptr
table, Kind kind, bool auto_commit) + : table_(std::move(table)), + kind_(kind), + auto_commit_(auto_commit), + metadata_builder_(TableMetadataBuilder::BuildFrom(table_->metadata().get())) {} + +Transaction::~Transaction() = default; + +Result> Transaction::Make(std::shared_ptr
table, + Kind kind, bool auto_commit) { + if (!table || !table->catalog()) [[unlikely]] { + return InvalidArgument("Table and catalog cannot be null"); + } + return std::shared_ptr( + new Transaction(std::move(table), kind, auto_commit)); +} + +const TableMetadata* Transaction::base() const { return metadata_builder_->base(); } + +const TableMetadata* Transaction::current() const { return metadata_builder_->current(); } + +Status Transaction::AddUpdate(const std::shared_ptr& update) { + if (!last_update_committed_) { + return InvalidArgument("Cannot add update when previous update is not committed"); + } + pending_updates_.emplace_back(std::weak_ptr(update)); + last_update_committed_ = false; + return {}; +} + +Status Transaction::Apply(std::vector> updates) { + for (const auto& update : updates) { + update->ApplyTo(*metadata_builder_); + } + + last_update_committed_ = true; + + if (auto_commit_) { + ICEBERG_RETURN_UNEXPECTED(Commit()); + } + + return {}; +} + +Result> Transaction::Commit() { + if (!last_update_committed_) { + return InvalidArgument( + "Cannot commit transaction when previous update is not committed"); + } + + const auto& updates = metadata_builder_->changes(); + if (updates.empty()) { + return table_; + } + + std::vector> requirements; + switch (kind_) { + case Kind::kCreate: { + ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForCreateTable(updates)); + } break; + case Kind::kUpdate: { + ICEBERG_ASSIGN_OR_RAISE(requirements, TableRequirements::ForUpdateTable( + *metadata_builder_->base(), updates)); + + } break; + } + + // XXX: we should handle commit failure and retry here. + return table_->catalog()->UpdateTable(table_->name(), requirements, updates); +} + +Result> Transaction::NewUpdateProperties() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_properties, + UpdateProperties::Make(shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_properties)); + return update_properties; +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 72ba5182c..3a13638cd 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -21,6 +21,7 @@ #pragma once #include +#include #include "iceberg/iceberg_export.h" #include "iceberg/result.h" @@ -29,28 +30,61 @@ namespace iceberg { /// \brief A transaction for performing multiple updates to a table -class ICEBERG_EXPORT Transaction { +class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this { public: - virtual ~Transaction() = default; + enum class Kind : uint8_t { kCreate, kUpdate }; + + ~Transaction(); + + /// \brief Create a new transaction + static Result> Make(std::shared_ptr
table, + Kind kind, bool auto_commit); /// \brief Return the Table that this transaction will update - /// - /// \return this transaction's table - virtual const std::shared_ptr
& table() const = 0; + const std::shared_ptr
& table() const { return table_; } - /// \brief Create a new append API to add files to this table - /// - /// \return a new AppendFiles - virtual std::shared_ptr NewAppend() = 0; + /// \brief Return the base metadata without any changes + const TableMetadata* base() const; - /// \brief Apply the pending changes from all actions and commit - /// - /// This method applies all pending data operations and metadata updates in the - /// transaction and commits them to the table in a single atomic operation. + /// \brief Return the current metadata with staged changes applied + const TableMetadata* current() const; + + /// \brief Apply the pending changes from all actions and commit. /// - /// \return Status::OK if the transaction was committed successfully, or an error - /// status if validation failed or the commit encountered conflicts - virtual Status CommitTransaction() = 0; + /// \return Updated table if the transaction was committed successfully, or an error: + /// - ValidationFailed: if any update cannot be applied to the current table metadata. + /// - CommitFailed: if the updates cannot be committed due to conflicts. + Result> Commit(); + + /// \brief Create a new UpdateProperties to update table properties and commit the + /// changes. + Result> NewUpdateProperties(); + + private: + Transaction(std::shared_ptr
table, Kind kind, bool auto_commit); + + Status AddUpdate(const std::shared_ptr& update); + + /// \brief Apply the pending changes to current table. + Status Apply(std::vector> updates); + + friend class PendingUpdate; // Need to access the Apply method. + + private: + // The table that this transaction will update. + std::shared_ptr
table_; + // The kind of this transaction. + const Kind kind_; + // Whether to auto-commit the transaction when updates are applied. + // This is useful when a temporary transaction is created for a single operation. + const bool auto_commit_; + // To make the state simple, we require updates are added and committed in order. + bool last_update_committed_ = true; + // Keep track of all created pending updates. Use weak_ptr to avoid circular references. + // This is useful to retry failed updates. + std::vector> pending_updates_; + // Accumulated updates from all pending updates. + std::unique_ptr metadata_builder_; }; } // namespace iceberg diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 0e1867f60..133a7043c 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -57,6 +57,7 @@ enum class TimeUnit { kMicrosecond, }; +/// \brief Data type family. class BinaryType; class BooleanType; class DateType; @@ -69,12 +70,7 @@ class LongType; class ListType; class MapType; class NestedType; -class PartitionField; -class PartitionSpec; -class PartitionValues; class PrimitiveType; -class Schema; -class SchemaField; class StringType; class StructType; class TimeType; @@ -84,84 +80,103 @@ class TimestampTzType; class Type; class UuidType; -struct Namespace; -struct TableIdentifier; +/// \brief Data values. +class Decimal; +class Uuid; -class Catalog; -class FileIO; -class LocationProvider; +/// \brief Schema. +class Schema; +class SchemaField; + +/// \brief Partition spec and values. +class PartitionField; +class PartitionSpec; +class PartitionValues; + +/// \brief Sort order. class SortField; class SortOrder; -class Table; -class TableProperties; -class Transaction; + +/// \brief Name mapping. +struct MappedField; +class MappedFields; +class NameMapping; + +/// \brief Transform. +enum class TransformType; class Transform; class TransformFunction; -struct PartitionStatisticsFile; -struct Snapshot; -struct SnapshotRef; +/// \brief Table identifier. +struct Namespace; +struct TableIdentifier; +/// \brief Table metadata. +enum class SnapshotRefType; struct MetadataLogEntry; +struct PartitionStatisticsFile; +struct Snapshot; struct SnapshotLogEntry; - +struct SnapshotRef; struct StatisticsFile; struct TableMetadata; -struct MappedField; -class MappedFields; -class NameMapping; - -enum class SnapshotRefType; -enum class TransformType; -enum class ManifestContent; - -class Decimal; -class Uuid; - +/// \brief Expression. +class BoundPredicate; class Expression; class Literal; - -class BoundPredicate; class UnboundPredicate; +/// \brief Scan. class DataTableScan; class FileScanTask; class ScanTask; class TableScan; class TableScanBuilder; +/// \brief Manifest. +enum class ManifestContent; struct DataFile; struct ManifestEntry; struct ManifestFile; struct ManifestList; struct PartitionFieldSummary; - -class PartitionSummary; - class ManifestListReader; class ManifestListWriter; class ManifestReader; class ManifestWriter; +class PartitionSummary; +/// \brief File I/O. struct ReaderOptions; struct WriterOptions; +class FileIO; class Reader; class Writer; +/// \brief Row-based data structures. class ArrayLike; class MapLike; class StructLike; class StructLikeAccessor; +/// \brief Catalog +class Catalog; +class LocationProvider; + +/// \brief Table. +class Table; +class TableProperties; + +/// \brief Table update. +class TableMetadataBuilder; class TableUpdate; class TableRequirement; -class TableMetadataBuilder; class TableUpdateContext; +class Transaction; +/// \brief Update family. class PendingUpdate; -template -class PendingUpdateTyped; class UpdateProperties; /// ---------------------------------------------------------------------------- diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build new file mode 100644 index 000000000..38502b14e --- /dev/null +++ b/src/iceberg/update/meson.build @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +install_headers( + ['pending_update.h', 'update_properties.h'], + subdir: 'iceberg/update', +) diff --git a/src/iceberg/update/pending_update.cc b/src/iceberg/update/pending_update.cc new file mode 100644 index 000000000..4dbc67884 --- /dev/null +++ b/src/iceberg/update/pending_update.cc @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/pending_update.h" + +#include "iceberg/table_update.h" +#include "iceberg/transaction.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +PendingUpdate::PendingUpdate(std::shared_ptr transaction) + : transaction_(std::move(transaction)) {} + +PendingUpdate::~PendingUpdate() = default; + +Status PendingUpdate::Commit() { + ICEBERG_ASSIGN_OR_RAISE(auto apply_result, Apply()); + return transaction_->Apply(std::move(apply_result.updates)); +} + +} // namespace iceberg diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h new file mode 100644 index 000000000..c4618400d --- /dev/null +++ b/src/iceberg/update/pending_update.h @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/update/pending_update.h +/// API for table changes using builder pattern + +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/util/error_collector.h" + +namespace iceberg { + +/// \brief Base class for all kinds of table metadata updates. +/// +/// Any created `PendingUpdate` instance is tracked by the `Transaction` instance +/// and commit is also delegated to the `Transaction` instance. +/// +/// \note Implementations are expected to use builder pattern and errors +/// should be handled by the ErrorCollector base class. +class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { + public: + enum class Kind : uint8_t { + kUpdateProperties, + }; + + /// \brief Return the kind of this pending update. + virtual Kind kind() const = 0; + + struct ApplyResult { + std::vector> updates; + }; + + /// \brief Apply the pending changes and return the uncommitted changes for validation. + /// + /// \note This does not result in a permanent update. + /// \return The uncommitted changes that would be committed by calling Commit(), or an + /// error: + /// - ValidationFailed: the pending changes cannot be applied to the current + /// metadata + /// - InvalidArgument: if pending changes are conflicting or invalid + virtual Result Apply() = 0; + + /// \brief Apply the pending changes and commit. + /// + /// \return An OK status if the commit was successful, or an error: + /// - ValidationFailed: if it cannot be applied to the current table metadata. + /// - CommitFailed: if it cannot be committed due to conflicts. + /// - CommitStateUnknown: unknown status, no cleanup should be done. + virtual Status Commit(); + + // Non-copyable, movable + PendingUpdate(const PendingUpdate&) = delete; + PendingUpdate& operator=(const PendingUpdate&) = delete; + PendingUpdate(PendingUpdate&&) noexcept = default; + PendingUpdate& operator=(PendingUpdate&&) noexcept = default; + + ~PendingUpdate() override; + + protected: + explicit PendingUpdate(std::shared_ptr transaction); + + std::shared_ptr transaction_; +}; + +} // namespace iceberg diff --git a/src/iceberg/update/update_properties.cc b/src/iceberg/update/update_properties.cc index a4dcd1548..22fca9d5b 100644 --- a/src/iceberg/update/update_properties.cc +++ b/src/iceberg/update/update_properties.cc @@ -19,28 +19,33 @@ #include "iceberg/update/update_properties.h" +#include #include #include +#include -#include "iceberg/catalog.h" #include "iceberg/metrics_config.h" #include "iceberg/result.h" -#include "iceberg/table.h" -#include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" -#include "iceberg/table_requirements.h" #include "iceberg/table_update.h" +#include "iceberg/transaction.h" #include "iceberg/util/macros.h" namespace iceberg { -UpdateProperties::UpdateProperties(TableIdentifier identifier, - std::shared_ptr catalog, - std::shared_ptr base) - : identifier_(std::move(identifier)), - catalog_(std::move(catalog)), - base_metadata_(std::move(base)) {} +Result> UpdateProperties::Make( + std::shared_ptr transaction) { + if (!transaction) [[unlikely]] { + return InvalidArgument("Cannot create UpdateProperties without a transaction"); + } + return std::shared_ptr(new UpdateProperties(std::move(transaction))); +} + +UpdateProperties::UpdateProperties(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) {} + +UpdateProperties::~UpdateProperties() = default; UpdateProperties& UpdateProperties::Set(const std::string& key, const std::string& value) { @@ -70,65 +75,56 @@ UpdateProperties& UpdateProperties::Remove(const std::string& key) { return *this; } -Status UpdateProperties::Apply() { - if (!catalog_) { - return InvalidArgument("Catalog is required to apply property updates"); - } - if (!base_metadata_) { +Result UpdateProperties::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + const auto* base_metadata = transaction_->base(); + if (!base_metadata) { return InvalidArgument("Base table metadata is required to apply property updates"); } - ICEBERG_RETURN_UNEXPECTED(CheckErrors()); - auto iter = updates_.find(TableProperties::kFormatVersion.key()); if (iter != updates_.end()) { - try { - int parsed_version = std::stoi(iter->second); - if (parsed_version > TableMetadata::kSupportedTableFormatVersion) { - return InvalidArgument( - "Cannot upgrade table to unsupported format version: v{} (supported: v{})", - parsed_version, TableMetadata::kSupportedTableFormatVersion); - } - format_version_ = static_cast(parsed_version); - } catch (const std::invalid_argument& e) { - return InvalidArgument("Invalid format version '{}': not a valid integer", - iter->second); - } catch (const std::out_of_range& e) { - return InvalidArgument("Format version '{}' is out of range", iter->second); + int parsed_version = 0; + const auto& val = iter->second; + auto [ptr, ec] = std::from_chars(val.data(), val.data() + val.size(), parsed_version); + + if (ec == std::errc::invalid_argument) { + return InvalidArgument("Invalid format version '{}': not a valid integer", val); + } else if (ec == std::errc::result_out_of_range) { + return InvalidArgument("Format version '{}' is out of range", val); + } + + if (parsed_version > TableMetadata::kSupportedTableFormatVersion) { + return InvalidArgument( + "Cannot upgrade table to unsupported format version: v{} (supported: v{})", + parsed_version, TableMetadata::kSupportedTableFormatVersion); } + format_version_ = static_cast(parsed_version); updates_.erase(iter); } - if (auto schema = base_metadata_->Schema(); schema.has_value()) { + if (auto schema = base_metadata->Schema(); schema.has_value()) { ICEBERG_RETURN_UNEXPECTED( MetricsConfig::VerifyReferencedColumns(updates_, *schema.value())); } - return {}; -} - -Status UpdateProperties::Commit() { - ICEBERG_RETURN_UNEXPECTED(Apply()); - std::vector> updates; + ApplyResult result; if (!updates_.empty()) { - updates.emplace_back(std::make_unique(std::move(updates_))); + result.updates.emplace_back( + std::make_unique(std::move(updates_))); } if (!removals_.empty()) { - updates.emplace_back(std::make_unique( + result.updates.emplace_back(std::make_unique( std::vector{removals_.begin(), removals_.end()})); } if (format_version_.has_value()) { - updates.emplace_back( + result.updates.emplace_back( std::make_unique(format_version_.value())); }; - if (!updates.empty()) { - ICEBERG_ASSIGN_OR_RAISE(auto requirements, - TableRequirements::ForUpdateTable(*base_metadata_, updates)); - ICEBERG_RETURN_UNEXPECTED(catalog_->UpdateTable(identifier_, requirements, updates)); - } - return {}; + return result; } } // namespace iceberg diff --git a/src/iceberg/update/update_properties.h b/src/iceberg/update/update_properties.h index 0f1adf76a..fc8f46f1a 100644 --- a/src/iceberg/update/update_properties.h +++ b/src/iceberg/update/update_properties.h @@ -20,28 +20,24 @@ #pragma once #include +#include #include #include #include -#include "iceberg/file_format.h" #include "iceberg/iceberg_export.h" -#include "iceberg/pending_update.h" -#include "iceberg/table_identifier.h" #include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" namespace iceberg { /// \brief Updates table properties. class ICEBERG_EXPORT UpdateProperties : public PendingUpdate { public: - /// \brief Constructs a UpdateProperties for the specified table. - /// - /// \param identifier The table identifier - /// \param catalog The catalog containing the table - /// \param metadata The current table metadata - UpdateProperties(TableIdentifier identifier, std::shared_ptr catalog, - std::shared_ptr base); + static Result> Make( + std::shared_ptr transaction); + + ~UpdateProperties() override; /// \brief Sets a property key to a specified value. /// @@ -57,25 +53,12 @@ class ICEBERG_EXPORT UpdateProperties : public PendingUpdate { /// \return Reference to this UpdateProperties for chaining UpdateProperties& Remove(const std::string& key); - /// \brief Applies the property changes without committing them. - /// - /// Validates the pending property changes but does not commit them to the table. - /// This method can be used to validate changes before actually committing them. - /// - /// \return Status::OK if the changes are valid, or an error if validation fails - Status Apply() override; + Kind kind() const final { return Kind::kUpdateProperties; } - /// \brief Commits the property changes to the table. - /// - /// Validates the changes and applies them to the table through the catalog. - /// - /// \return OK if the changes are valid and committed successfully, or an error - Status Commit() override; + Result Apply() final; private: - TableIdentifier identifier_; - std::shared_ptr catalog_; - std::shared_ptr base_metadata_; + explicit UpdateProperties(std::shared_ptr transaction); std::unordered_map updates_; std::unordered_set removals_; From abe874a5cad003b8ed6627e9448d8bf249709b79 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Thu, 18 Dec 2025 20:45:19 +0800 Subject: [PATCH 2/3] address feedback --- src/iceberg/table.h | 7 +++++-- src/iceberg/table_metadata.h | 6 +++--- src/iceberg/transaction.h | 2 +- src/iceberg/update/update_properties.cc | 5 ++--- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 57f8c43af..efe175828 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -145,7 +145,8 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this
{ std::unique_ptr metadata_cache_; }; -class ICEBERG_EXPORT StagedTable : public Table { +/// \brief A table created by stage-create and not yet committed. +class ICEBERG_EXPORT StagedTable final : public Table { public: static Result> Make( TableIdentifier identifier, std::shared_ptr metadata, @@ -162,7 +163,9 @@ class ICEBERG_EXPORT StagedTable : public Table { using Table::Table; }; -class ICEBERG_EXPORT StaticTable : public Table { +/// \brief A read-only table. + +class ICEBERG_EXPORT StaticTable final : public Table { public: static Result> Make( TableIdentifier identifier, std::shared_ptr metadata, diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index bdfea2309..f93929171 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -419,13 +419,13 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { /// \return A Result containing the constructed TableMetadata or an error Result> Build(); - /// \brief Return the changes made to the table metadata + /// \brief Returns the changes made to the table metadata const std::vector>& changes() const; - /// \brief Return the base metadata without any changes + /// \brief Returns the base metadata without any changes const TableMetadata* base() const; - /// \brief Return the current metadata with staged changes applied + /// \brief Returns the current metadata with staged changes applied const TableMetadata* current() const; /// \brief Destructor diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 3a13638cd..035c6f8ca 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -43,7 +43,7 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this& table() const { return table_; } - /// \brief Return the base metadata without any changes + /// \brief Returns the base metadata without any changes const TableMetadata* base() const; /// \brief Return the current metadata with staged changes applied diff --git a/src/iceberg/update/update_properties.cc b/src/iceberg/update/update_properties.cc index 22fca9d5b..81c6db9fe 100644 --- a/src/iceberg/update/update_properties.cc +++ b/src/iceberg/update/update_properties.cc @@ -78,7 +78,7 @@ UpdateProperties& UpdateProperties::Remove(const std::string& key) { Result UpdateProperties::Apply() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); - const auto* base_metadata = transaction_->base(); + const auto* base_metadata = transaction_->current(); if (!base_metadata) { return InvalidArgument("Base table metadata is required to apply property updates"); } @@ -112,8 +112,7 @@ Result UpdateProperties::Apply() { ApplyResult result; if (!updates_.empty()) { - result.updates.emplace_back( - std::make_unique(std::move(updates_))); + result.updates.emplace_back(std::make_unique(updates_)); } if (!removals_.empty()) { result.updates.emplace_back(std::make_unique( From 56e8251537967b613fe682dea4a788cceefb7e31 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Thu, 18 Dec 2025 21:37:39 +0800 Subject: [PATCH 3/3] use const ref for current --- src/iceberg/table_metadata.cc | 2 +- src/iceberg/table_metadata.h | 2 +- src/iceberg/transaction.cc | 2 +- src/iceberg/transaction.h | 2 +- src/iceberg/update/update_properties.cc | 7 +------ 5 files changed, 5 insertions(+), 10 deletions(-) diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 3d5f4cf1b..61bb8e089 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -796,6 +796,6 @@ const std::vector>& TableMetadataBuilder::changes() const TableMetadata* TableMetadataBuilder::base() const { return impl_->base; } -const TableMetadata* TableMetadataBuilder::current() const { return &impl_->metadata; } +const TableMetadata& TableMetadataBuilder::current() const { return impl_->metadata; } } // namespace iceberg diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index f93929171..f7c260114 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -426,7 +426,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector { const TableMetadata* base() const; /// \brief Returns the current metadata with staged changes applied - const TableMetadata* current() const; + const TableMetadata& current() const; /// \brief Destructor ~TableMetadataBuilder() override; diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index 08f430415..ca39ec043 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -49,7 +49,7 @@ Result> Transaction::Make(std::shared_ptr
ta const TableMetadata* Transaction::base() const { return metadata_builder_->base(); } -const TableMetadata* Transaction::current() const { return metadata_builder_->current(); } +const TableMetadata& Transaction::current() const { return metadata_builder_->current(); } Status Transaction::AddUpdate(const std::shared_ptr& update) { if (!last_update_committed_) { diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 035c6f8ca..36328026b 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -47,7 +47,7 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this UpdateProperties::Apply() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); - const auto* base_metadata = transaction_->current(); - if (!base_metadata) { - return InvalidArgument("Base table metadata is required to apply property updates"); - } - auto iter = updates_.find(TableProperties::kFormatVersion.key()); if (iter != updates_.end()) { int parsed_version = 0; @@ -105,7 +100,7 @@ Result UpdateProperties::Apply() { updates_.erase(iter); } - if (auto schema = base_metadata->Schema(); schema.has_value()) { + if (auto schema = transaction_->current().Schema(); schema.has_value()) { ICEBERG_RETURN_UNEXPECTED( MetricsConfig::VerifyReferencedColumns(updates_, *schema.value())); }