2020#include " iceberg/table_metadata.h"
2121
2222#include < algorithm>
23+ #include < atomic>
2324#include < charconv>
2425#include < chrono>
2526#include < cstdint>
3738#include " iceberg/exception.h"
3839#include " iceberg/file_io.h"
3940#include " iceberg/json_internal.h"
41+ #include " iceberg/metrics_config.h"
4042#include " iceberg/partition_field.h"
4143#include " iceberg/partition_spec.h"
4244#include " iceberg/result.h"
5052#include " iceberg/util/gzip_internal.h"
5153#include " iceberg/util/location_util.h"
5254#include " iceberg/util/macros.h"
55+ #include " iceberg/util/property_util.h"
56+ #include " iceberg/util/type_util.h"
5357#include " iceberg/util/uuid.h"
5458namespace iceberg {
5559namespace {
5660const TimePointMs kInvalidLastUpdatedMs = TimePointMs::min();
5761constexpr int32_t kLastAdded = -1 ;
5862constexpr std::string_view kMetadataFolderName = " metadata" ;
63+
// File-local helpers used by TableMetadata::Make.
65+ Result<std::shared_ptr<PartitionSpec>> FreshPartitionSpec (
66+ int32_t spec_id, const PartitionSpec& spec, const Schema& base_schema,
67+ const Schema& fresh_schema, std::function<int32_t ()> next_id) {
68+ std::vector<PartitionField> partition_fields;
69+ for (auto & field : spec.fields ()) {
70+ ICEBERG_ASSIGN_OR_RAISE (auto source_name,
71+ base_schema.FindColumnNameById (field.source_id ()));
72+ int32_t source_id;
73+ if (!source_name.has_value ()) {
74+ // In the case of a source field not found, the column has been deleted.
75+ // This only happens in V1 tables where the reference is still around as a void
76+ // transform
77+ source_id = field.source_id ();
78+ } else {
79+ ICEBERG_ASSIGN_OR_RAISE (auto fresh_field,
80+ fresh_schema.FindFieldByName (source_name.value ()));
81+ if (!fresh_field.has_value ()) [[unlikely]] {
82+ return InvalidSchema (" Partition field {} does not exist in the schema" ,
83+ source_name.value ());
84+ }
85+ source_id = fresh_field.value ().get ().field_id ();
86+ }
87+ partition_fields.emplace_back (source_id, next_id ? next_id () : field.field_id (),
88+ std::string (field.name ()), field.transform ());
89+ }
90+ return PartitionSpec::Make (fresh_schema, spec_id, std::move (partition_fields), false );
91+ }
92+
93+ Result<std::shared_ptr<SortOrder>> FreshSortOrder (int32_t order_id, const Schema& schema,
94+ const SortOrder& order) {
95+ if (order.is_unsorted ()) {
96+ return SortOrder::Unsorted ();
97+ }
98+
99+ std::vector<SortField> fresh_fields;
100+ for (const auto & field : order.fields ()) {
101+ ICEBERG_ASSIGN_OR_RAISE (auto source_name,
102+ schema.FindColumnNameById (field.source_id ()));
103+ if (!source_name.has_value ()) {
104+ return InvalidSchema (" Unable to find source field with ID {} in the old schema" ,
105+ field.source_id ());
106+ }
107+
108+ ICEBERG_ASSIGN_OR_RAISE (auto fresh_field,
109+ schema.FindFieldByName (source_name.value ()));
110+ if (!fresh_field.has_value ()) {
111+ return InvalidSchema (" Unable to find field '{}' in the new schema" ,
112+ source_name.value ());
113+ }
114+
115+ int32_t new_source_id = fresh_field.value ().get ().field_id ();
116+ fresh_fields.emplace_back (new_source_id, field.transform (), field.direction (),
117+ field.null_order ());
118+ }
119+
120+ return SortOrder::Make (order_id, std::move (fresh_fields));
121+ }
59122} // namespace
60123
61124std::string ToString (const SnapshotLogEntry& entry) {
@@ -68,6 +131,53 @@ std::string ToString(const MetadataLogEntry& entry) {
68131 entry.metadata_file );
69132}
70133
134+ Result<std::unique_ptr<TableMetadata>> TableMetadata::Make (
135+ const iceberg::Schema& schema, const iceberg::PartitionSpec& spec,
136+ const iceberg::SortOrder& sort_order, const std::string& location,
137+ const std::unordered_map<std::string, std::string>& properties, int format_version) {
138+ for (const auto & [key, _] : properties) {
139+ if (TableProperties::reserved_properties ().contains (key)) {
140+ return InvalidArgument (
141+ " Table properties should not contain reserved properties, but got {}" , key);
142+ }
143+ }
144+
145+ // Reassign all column ids to ensure consistency
146+ std::atomic<int32_t > last_column_id = 0 ;
147+ auto next_id = [&last_column_id]() -> int32_t { return ++last_column_id; };
148+ ICEBERG_ASSIGN_OR_RAISE (auto fresh_schema,
149+ AssignFreshIds (Schema::kInitialSchemaId , schema, next_id));
150+
151+ // Rebuild the partition spec using the new column ids
152+ std::atomic<int32_t > last_partition_field_id = PartitionSpec::kInvalidPartitionFieldId ;
153+ auto next_partition_field_id = [&last_partition_field_id]() -> int32_t {
154+ return ++last_partition_field_id;
155+ };
156+ ICEBERG_ASSIGN_OR_RAISE (auto fresh_spec,
157+ FreshPartitionSpec (PartitionSpec::kInitialSpecId , spec, schema,
158+ *fresh_schema, next_partition_field_id));
159+
160+ // rebuild the sort order using the new column ids
161+ int32_t fresh_order_id =
162+ sort_order.is_unsorted () ? sort_order.order_id () : SortOrder::kInitialSortOrderId ;
163+ ICEBERG_ASSIGN_OR_RAISE (auto fresh_order,
164+ FreshSortOrder (fresh_order_id, *fresh_schema, sort_order))
165+
166+ // Validata the metrics configuration.
167+ ICEBERG_RETURN_UNEXPECTED (
168+ MetricsConfig::VerifyReferencedColumns (properties, *fresh_schema));
169+
170+ PropertyUtil::ValidateCommitProperties (properties);
171+
172+ return TableMetadataBuilder::BuildFromEmpty (format_version)
173+ ->SetLocation (location)
174+ .SetCurrentSchema (std::move (fresh_schema), last_column_id.load ())
175+ .SetDefaultPartitionSpec (std::move (fresh_spec))
176+ .SetDefaultSortOrder (std::move (fresh_order))
177+ .SetProperties (properties)
178+ .Build ();
179+ }
180+
71181Result<std::shared_ptr<Schema>> TableMetadata::Schema () const {
72182 return SchemaById (current_schema_id);
73183}
@@ -408,6 +518,10 @@ class TableMetadataBuilder::Impl {
408518 const TableMetadata* base () const { return base_; }
409519 const TableMetadata& metadata () const { return metadata_; }
410520
521+ void SetLocation (std::string_view location) {
522+ metadata_.location = std::string (location);
523+ }
524+
411525 void SetMetadataLocation (std::string_view metadata_location) {
412526 metadata_location_ = std::string (metadata_location);
413527 if (base_ != nullptr ) {
@@ -917,7 +1031,8 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveProperties(
9171031}
9181032
9191033TableMetadataBuilder& TableMetadataBuilder::SetLocation (std::string_view location) {
920- throw IcebergError (std::format (" {} not implemented" , __FUNCTION__));
1034+ impl_->SetLocation (location);
1035+ return *this ;
9211036}
9221037
9231038TableMetadataBuilder& TableMetadataBuilder::AddEncryptionKey (
0 commit comments