Skip to content

Commit a399aee

Browse files
committed
feat: InMemoryCatalog::CreateTable & StageCreateTable api
1 parent f952779 commit a399aee

14 files changed

+356
-12
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ set(ICEBERG_SOURCES
8383
util/decimal.cc
8484
util/gzip_internal.cc
8585
util/murmurhash3_internal.cc
86+
util/property_util.cc
8687
util/snapshot_util.cc
8788
util/temporal_util.cc
8889
util/timepoint.cc

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
#include "iceberg/table_identifier.h"
2727
#include "iceberg/table_metadata.h"
2828
#include "iceberg/table_requirement.h"
29+
#include "iceberg/table_requirements.h"
2930
#include "iceberg/table_update.h"
31+
#include "iceberg/transaction.h"
3032
#include "iceberg/util/macros.h"
3133

3234
namespace iceberg {
@@ -318,7 +320,7 @@ Result<std::string> InMemoryNamespace::GetTableMetadataLocation(
318320
ICEBERG_RETURN_UNEXPECTED(ns);
319321
const auto it = ns.value()->table_metadata_locations_.find(table_ident.name);
320322
if (it == ns.value()->table_metadata_locations_.end()) {
321-
return NotFound("{} does not exist", table_ident.name);
323+
return NotFound("Table does not exist: {}", table_ident);
322324
}
323325
return it->second;
324326
}
@@ -405,32 +407,66 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::CreateTable(
405407
const std::string& location,
406408
const std::unordered_map<std::string, std::string>& properties) {
407409
std::unique_lock lock(mutex_);
408-
return NotImplemented("create table");
410+
if (root_namespace_->TableExists(identifier).value_or(false)) {
411+
return AlreadyExists("Table already exists: {}", identifier);
412+
}
413+
414+
std::string base_location =
415+
location.empty() ? warehouse_location_ + "/" + identifier.ToString() : location;
416+
417+
ICEBERG_ASSIGN_OR_RAISE(auto table_metadata, TableMetadata::Make(*schema, *spec, *order,
418+
location, properties));
419+
420+
ICEBERG_ASSIGN_OR_RAISE(
421+
auto metadata_file_location,
422+
TableMetadataUtil::Write(*file_io_, nullptr, "", *table_metadata));
423+
ICEBERG_RETURN_UNEXPECTED(
424+
root_namespace_->UpdateTableMetadataLocation(identifier, metadata_file_location));
425+
return Table::Make(identifier, std::move(table_metadata),
426+
std::move(metadata_file_location), file_io_,
427+
std::static_pointer_cast<Catalog>(shared_from_this()));
409428
}
410429

411430
Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
412431
const TableIdentifier& identifier,
413432
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
414433
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
415434
std::unique_lock lock(mutex_);
416-
ICEBERG_ASSIGN_OR_RAISE(auto base_metadata_location,
417-
root_namespace_->GetTableMetadataLocation(identifier));
418-
419-
ICEBERG_ASSIGN_OR_RAISE(auto base,
420-
TableMetadataUtil::Read(*file_io_, base_metadata_location));
435+
auto base_metadata_location = root_namespace_->GetTableMetadataLocation(identifier);
436+
std::unique_ptr<TableMetadata> base;
437+
std::unique_ptr<TableMetadataBuilder> builder;
438+
ICEBERG_ASSIGN_OR_RAISE(auto is_create, TableRequirements::IsCreate(requirements));
439+
if (is_create) {
440+
if (base_metadata_location.has_value()) {
441+
return AlreadyExists("Table already exists: {}", identifier);
442+
}
443+
int8_t format_version = TableMetadata::kDefaultTableFormatVersion;
444+
for (const auto& update : updates) {
445+
if (update->kind() == TableUpdate::Kind::kUpgradeFormatVersion) {
446+
format_version =
447+
dynamic_cast<const table::UpgradeFormatVersion&>(*update).format_version();
448+
}
449+
}
450+
builder = TableMetadataBuilder::BuildFromEmpty(format_version);
451+
} else {
452+
ICEBERG_RETURN_UNEXPECTED(base_metadata_location);
453+
ICEBERG_ASSIGN_OR_RAISE(
454+
base, TableMetadataUtil::Read(*file_io_, base_metadata_location.value()));
455+
builder = TableMetadataBuilder::BuildFrom(base.get());
456+
}
421457

422458
for (const auto& requirement : requirements) {
423459
ICEBERG_RETURN_UNEXPECTED(requirement->Validate(base.get()));
424460
}
425461

426-
auto builder = TableMetadataBuilder::BuildFrom(base.get());
427462
for (const auto& update : updates) {
428463
update->ApplyTo(*builder);
429464
}
430465
ICEBERG_ASSIGN_OR_RAISE(auto updated, builder->Build());
431466
ICEBERG_ASSIGN_OR_RAISE(
432467
auto new_metadata_location,
433-
TableMetadataUtil::Write(*file_io_, base.get(), base_metadata_location, *updated));
468+
TableMetadataUtil::Write(*file_io_, base.get(), base_metadata_location.value(),
469+
*updated));
434470
ICEBERG_RETURN_UNEXPECTED(
435471
root_namespace_->UpdateTableMetadataLocation(identifier, new_metadata_location));
436472
TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), *updated);
@@ -445,7 +481,20 @@ Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
445481
const std::string& location,
446482
const std::unordered_map<std::string, std::string>& properties) {
447483
std::unique_lock lock(mutex_);
448-
return NotImplemented("stage create table");
484+
if (root_namespace_->TableExists(identifier).value_or(false)) {
485+
return AlreadyExists("Table already exists: {}", identifier);
486+
}
487+
488+
std::string base_location =
489+
location.empty() ? warehouse_location_ + "/" + identifier.ToString() : location;
490+
491+
ICEBERG_ASSIGN_OR_RAISE(
492+
auto table_metadata,
493+
TableMetadata::Make(*schema, *spec, *order, base_location, properties));
494+
ICEBERG_ASSIGN_OR_RAISE(
495+
auto table, StagedTable::Make(identifier, std::move(table_metadata), "", file_io_,
496+
shared_from_this()));
497+
return Transaction::Make(std::move(table), Transaction::Kind::kCreate, false);
449498
}
450499

451500
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
@@ -495,7 +544,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
495544

496545
std::unique_lock lock(mutex_);
497546
if (!root_namespace_->NamespaceExists(identifier.ns)) {
498-
return NoSuchNamespace("table namespace does not exist.");
547+
return NoSuchNamespace("Table namespace does not exist: {}", identifier.ns);
499548
}
500549
if (!root_namespace_->RegisterTable(identifier, metadata_file_location)) {
501550
return UnknownError("The registry failed.");

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ iceberg_sources = files(
105105
'util/decimal.cc',
106106
'util/gzip_internal.cc',
107107
'util/murmurhash3_internal.cc',
108+
'util/property_util.cc',
108109
'util/snapshot_util.cc',
109110
'util/temporal_util.cc',
110111
'util/timepoint.cc',

src/iceberg/table_identifier.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
/// \file iceberg/table_identifier.h
2323
/// A TableIdentifier is a unique identifier for a table
2424

25+
#include <format>
26+
#include <sstream>
2527
#include <string>
2628
#include <vector>
2729

@@ -35,6 +37,15 @@ struct ICEBERG_EXPORT Namespace {
3537
std::vector<std::string> levels;
3638

3739
bool operator==(const Namespace& other) const { return levels == other.levels; }
40+
41+
std::string ToString() const {
42+
std::ostringstream oss;
43+
for (size_t i = 0; i < levels.size(); ++i) {
44+
if (i) oss << '.';
45+
oss << levels[i];
46+
}
47+
return oss.str();
48+
}
3849
};
3950

4051
/// \brief Identifies a table in iceberg catalog.
@@ -53,6 +64,27 @@ struct ICEBERG_EXPORT TableIdentifier {
5364
}
5465
return {};
5566
}
67+
68+
std::string ToString() const { return ns.ToString() + '.' + name; }
5669
};
5770

5871
} // namespace iceberg
72+
73+
namespace std {
74+
75+
template <>
76+
struct formatter<iceberg::Namespace> : std::formatter<std::string> {
77+
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
78+
auto format(const iceberg::Namespace& ns, format_context& ctx) const {
79+
return std::formatter<std::string>::format(ns.ToString(), ctx);
80+
}
81+
};
82+
83+
template <>
84+
struct formatter<iceberg::TableIdentifier> : std::formatter<std::string> {
85+
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
86+
auto format(const iceberg::TableIdentifier& id, format_context& ctx) const {
87+
return std::formatter<std::string>::format(id.ToString(), ctx);
88+
}
89+
};
90+
} // namespace std

src/iceberg/table_metadata.cc

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "iceberg/table_metadata.h"
2121

2222
#include <algorithm>
23+
#include <atomic>
2324
#include <charconv>
2425
#include <chrono>
2526
#include <cstdint>
@@ -36,6 +37,7 @@
3637
#include "iceberg/exception.h"
3738
#include "iceberg/file_io.h"
3839
#include "iceberg/json_internal.h"
40+
#include "iceberg/metrics_config.h"
3941
#include "iceberg/partition_spec.h"
4042
#include "iceberg/result.h"
4143
#include "iceberg/schema.h"
@@ -47,12 +49,73 @@
4749
#include "iceberg/util/gzip_internal.h"
4850
#include "iceberg/util/location_util.h"
4951
#include "iceberg/util/macros.h"
52+
#include "iceberg/util/property_util.h"
53+
#include "iceberg/util/type_util.h"
5054
#include "iceberg/util/uuid.h"
5155
namespace iceberg {
5256
namespace {
5357
const TimePointMs kInvalidLastUpdatedMs = TimePointMs::min();
5458
constexpr int32_t kLastAdded = -1;
5559
constexpr std::string_view kMetadataFolderName = "metadata";
60+
61+
// TableMetadata private static methods
62+
Result<std::shared_ptr<PartitionSpec>> FreshPartitionSpec(
63+
int32_t spec_id, const PartitionSpec& spec, const Schema& base_schema,
64+
const Schema& fresh_schema, std::function<int32_t()> next_id) {
65+
std::vector<PartitionField> partition_fields;
66+
for (auto& field : spec.fields()) {
67+
ICEBERG_ASSIGN_OR_RAISE(auto source_name,
68+
base_schema.FindColumnNameById(field.source_id()));
69+
int32_t source_id;
70+
if (!source_name.has_value()) {
71+
// In the case of a source field not found, the column has been deleted.
72+
// This only happens in V1 tables where the reference is still around as a void
73+
// transform
74+
source_id = field.source_id();
75+
} else {
76+
ICEBERG_ASSIGN_OR_RAISE(auto fresh_field,
77+
fresh_schema.FindFieldByName(source_name.value()));
78+
if (!fresh_field.has_value()) [[unlikely]] {
79+
return InvalidSchema("Partition field {} does not exist in the schema",
80+
source_name.value());
81+
}
82+
source_id = fresh_field.value().get().field_id();
83+
}
84+
partition_fields.emplace_back(source_id, next_id ? next_id() : field.field_id(),
85+
std::string(field.name()), field.transform());
86+
}
87+
return PartitionSpec::Make(fresh_schema, spec_id, std::move(partition_fields), false);
88+
}
89+
90+
Result<std::shared_ptr<SortOrder>> FreshSortOrder(int32_t order_id, const Schema& schema,
91+
const SortOrder& order) {
92+
if (order.is_unsorted()) {
93+
return SortOrder::Unsorted();
94+
}
95+
96+
std::vector<SortField> fresh_fields;
97+
for (const auto& field : order.fields()) {
98+
ICEBERG_ASSIGN_OR_RAISE(auto source_name,
99+
schema.FindColumnNameById(field.source_id()));
100+
if (!source_name.has_value()) {
101+
return InvalidSchema("Unable to find source field with ID {} in the old schema",
102+
field.source_id());
103+
}
104+
105+
ICEBERG_ASSIGN_OR_RAISE(auto fresh_field,
106+
schema.FindFieldByName(source_name.value()));
107+
if (!fresh_field.has_value()) {
108+
return InvalidSchema("Unable to find field '{}' in the new schema",
109+
source_name.value());
110+
}
111+
112+
int32_t new_source_id = fresh_field.value().get().field_id();
113+
fresh_fields.emplace_back(new_source_id, field.transform(), field.direction(),
114+
field.null_order());
115+
}
116+
117+
return SortOrder::Make(order_id, std::move(fresh_fields));
118+
}
56119
} // namespace
57120

58121
std::string ToString(const SnapshotLogEntry& entry) {
@@ -65,6 +128,53 @@ std::string ToString(const MetadataLogEntry& entry) {
65128
entry.metadata_file);
66129
}
67130

131+
Result<std::unique_ptr<TableMetadata>> TableMetadata::Make(
132+
const iceberg::Schema& schema, const iceberg::PartitionSpec& spec,
133+
const iceberg::SortOrder& sort_order, const std::string& location,
134+
const std::unordered_map<std::string, std::string>& properties, int format_version) {
135+
for (const auto& [key, _] : properties) {
136+
if (TableProperties::reserved_properties().contains(key)) {
137+
return InvalidArgument(
138+
"Table properties should not contain reserved properties, but got {}", key);
139+
}
140+
}
141+
142+
// Reassign all column ids to ensure consistency
143+
std::atomic<int32_t> last_column_id = 0;
144+
auto next_id = [&last_column_id]() -> int32_t { return ++last_column_id; };
145+
ICEBERG_ASSIGN_OR_RAISE(auto fresh_schema,
146+
AssignFreshIds(Schema::kInitialSchemaId, schema, next_id));
147+
148+
// Rebuild the partition spec using the new column ids
149+
std::atomic<int32_t> last_partition_field_id = PartitionSpec::kInvalidPartitionFieldId;
150+
auto next_partition_field_id = [&last_partition_field_id]() -> int32_t {
151+
return ++last_partition_field_id;
152+
};
153+
ICEBERG_ASSIGN_OR_RAISE(auto fresh_spec,
154+
FreshPartitionSpec(PartitionSpec::kInitialSpecId, spec, schema,
155+
*fresh_schema, next_partition_field_id));
156+
157+
// rebuild the sort order using the new column ids
158+
int32_t fresh_order_id =
159+
sort_order.is_unsorted() ? sort_order.order_id() : SortOrder::kInitialSortOrderId;
160+
ICEBERG_ASSIGN_OR_RAISE(auto fresh_order,
161+
FreshSortOrder(fresh_order_id, *fresh_schema, sort_order))
162+
163+
// Validata the metrics configuration.
164+
ICEBERG_RETURN_UNEXPECTED(
165+
MetricsConfig::VerifyReferencedColumns(properties, *fresh_schema));
166+
167+
PropertyUtil::ValidateCommitProperties(properties);
168+
169+
return TableMetadataBuilder::BuildFromEmpty(format_version)
170+
->SetLocation(location)
171+
.SetCurrentSchema(std::move(fresh_schema), last_column_id.load())
172+
.SetDefaultPartitionSpec(std::move(fresh_spec))
173+
.SetDefaultSortOrder(std::move(fresh_order))
174+
.SetProperties(properties)
175+
.Build();
176+
}
177+
68178
Result<std::shared_ptr<Schema>> TableMetadata::Schema() const {
69179
return SchemaById(current_schema_id);
70180
}
@@ -405,6 +515,10 @@ class TableMetadataBuilder::Impl {
405515
const TableMetadata* base() const { return base_; }
406516
const TableMetadata& metadata() const { return metadata_; }
407517

518+
void SetLocation(std::string_view location) {
519+
metadata_.location = std::string(location);
520+
}
521+
408522
void SetMetadataLocation(std::string_view metadata_location) {
409523
metadata_location_ = std::string(metadata_location);
410524
if (base_ != nullptr) {
@@ -826,7 +940,8 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveProperties(
826940
}
827941

828942
TableMetadataBuilder& TableMetadataBuilder::SetLocation(std::string_view location) {
829-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
943+
impl_->SetLocation(location);
944+
return *this;
830945
}
831946

832947
TableMetadataBuilder& TableMetadataBuilder::AddEncryptionKey(

src/iceberg/table_metadata.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ struct ICEBERG_EXPORT TableMetadata {
124124
/// A `long` higher than all assigned row IDs
125125
int64_t next_row_id;
126126

127+
static Result<std::unique_ptr<TableMetadata>> Make(
128+
const iceberg::Schema& schema, const iceberg::PartitionSpec& spec,
129+
const iceberg::SortOrder& sort_order, const std::string& location,
130+
const std::unordered_map<std::string, std::string>& properties,
131+
int format_version = kDefaultTableFormatVersion);
132+
127133
/// \brief Get the current schema, return NotFoundError if not found
128134
Result<std::shared_ptr<iceberg::Schema>> Schema() const;
129135
/// \brief Get the current schema by ID, return NotFoundError if not found

src/iceberg/table_properties.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ const std::unordered_set<std::string>& TableProperties::reserved_properties() {
3131
return kReservedProperties;
3232
}
3333

34+
const std::unordered_set<std::string>& TableProperties::commit_properties() {
35+
static const std::unordered_set<std::string> kCommitProperties = {
36+
kCommitNumRetries.key(), kCommitMinRetryWaitMs.key(), kCommitMaxRetryWaitMs.key(),
37+
kCommitTotalRetryTimeMs.key()};
38+
return kCommitProperties;
39+
}
40+
3441
TableProperties TableProperties::default_properties() { return {}; }
3542

3643
TableProperties TableProperties::FromMap(

src/iceberg/table_properties.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,9 @@ class ICEBERG_EXPORT TableProperties : public ConfigBase<TableProperties> {
286286
/// \return The set of reserved property keys
287287
static const std::unordered_set<std::string>& reserved_properties();
288288

289+
/// \brief Get the set of commit table property keys.
290+
static const std::unordered_set<std::string>& commit_properties();
291+
289292
/// \brief Create a default TableProperties instance.
290293
///
291294
/// \return A unique pointer to a TableProperties instance with default values

0 commit comments

Comments
 (0)