Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/iceberg/expression/manifest_evaluator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,10 @@ Result<std::unique_ptr<ManifestEvaluator>> ManifestEvaluator::MakePartitionFilte
std::shared_ptr<Expression> expr, const std::shared_ptr<PartitionSpec>& spec,
const Schema& schema, bool case_sensitive) {
ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec->PartitionType(schema));
auto field_span = partition_type->fields();
std::vector<SchemaField> fields(field_span.begin(), field_span.end());
auto partition_schema = std::make_shared<Schema>(fields);
ICEBERG_ASSIGN_OR_RAISE(auto rewrite_expr, RewriteNot::Visit(std::move(expr)));
ICEBERG_ASSIGN_OR_RAISE(auto partition_expr,
Binder::Bind(*partition_schema, rewrite_expr, case_sensitive));
ICEBERG_ASSIGN_OR_RAISE(
auto partition_expr,
Binder::Bind(*partition_type->ToSchema(), rewrite_expr, case_sensitive));
return std::unique_ptr<ManifestEvaluator>(
new ManifestEvaluator(std::move(partition_expr)));
}
Expand Down
21 changes: 10 additions & 11 deletions src/iceberg/expression/residual_evaluator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/transform.h"
#include "iceberg/type.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand All @@ -42,8 +43,7 @@ class ResidualVisitor : public BoundVisitor<std::shared_ptr<Expression>> {
const StructLike& partition_data,
bool case_sensitive) {
ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec.PartitionType(schema));
auto partition_schema = FromStructType(std::move(*partition_type), std::nullopt);
return ResidualVisitor(spec, schema, std::move(partition_schema), partition_data,
return ResidualVisitor(spec, schema, std::move(partition_type), partition_data,
case_sensitive);
}

Expand Down Expand Up @@ -202,17 +202,17 @@ class ResidualVisitor : public BoundVisitor<std::shared_ptr<Expression>> {

private:
ResidualVisitor(const PartitionSpec& spec, const Schema& schema,
std::unique_ptr<Schema> partition_schema,
std::unique_ptr<StructType> partition_type,
const StructLike& partition_data, bool case_sensitive)
: spec_(spec),
schema_(schema),
partition_schema_(std::move(partition_schema)),
partition_type_(std::move(partition_type)),
partition_data_(partition_data),
case_sensitive_(case_sensitive) {}

const PartitionSpec& spec_;
const Schema& schema_;
std::unique_ptr<Schema> partition_schema_;
std::unique_ptr<StructType> partition_type_;
const StructLike& partition_data_;
bool case_sensitive_;
};
Expand All @@ -235,6 +235,7 @@ Result<std::shared_ptr<Expression>> ResidualVisitor::Predicate(
// Not associated with a partition field, can't be evaluated
return pred;
}
auto schema = partition_type_->ToSchema();

for (const auto& part : parts) {
// Check the strict projection
Expand All @@ -243,9 +244,8 @@ Result<std::shared_ptr<Expression>> ResidualVisitor::Predicate(
std::shared_ptr<Expression> strict_result = nullptr;

if (strict_projection != nullptr) {
ICEBERG_ASSIGN_OR_RAISE(
auto bound_strict,
strict_projection->Bind(*partition_schema_, case_sensitive_));
ICEBERG_ASSIGN_OR_RAISE(auto bound_strict,
strict_projection->Bind(*schema, case_sensitive_));
if (bound_strict->is_bound_predicate()) {
ICEBERG_ASSIGN_OR_RAISE(
strict_result, BoundVisitor::Predicate(
Expand All @@ -268,9 +268,8 @@ Result<std::shared_ptr<Expression>> ResidualVisitor::Predicate(
std::shared_ptr<Expression> inclusive_result = nullptr;

if (inclusive_projection != nullptr) {
ICEBERG_ASSIGN_OR_RAISE(
auto bound_inclusive,
inclusive_projection->Bind(*partition_schema_, case_sensitive_));
ICEBERG_ASSIGN_OR_RAISE(auto bound_inclusive,
inclusive_projection->Bind(*schema, case_sensitive_));

if (bound_inclusive->is_bound_predicate()) {
ICEBERG_ASSIGN_OR_RAISE(
Expand Down
10 changes: 5 additions & 5 deletions src/iceberg/row/struct_like.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,20 @@ StructLikeAccessor::StructLikeAccessor(std::shared_ptr<Type> type,
return std::get<std::shared_ptr<StructLike>>(first_level_field)->GetField(pos1);
};
} else if (!position_path.empty()) {
accessor_ = [position_path](const StructLike& struct_like) -> Result<Scalar> {
accessor_ = [this](const StructLike& struct_like) -> Result<Scalar> {
std::vector<std::shared_ptr<StructLike>> backups;
const StructLike* current_struct_like = &struct_like;
for (size_t i = 0; i < position_path.size() - 1; ++i) {
for (size_t i = 0; i < position_path_.size() - 1; ++i) {
ICEBERG_ASSIGN_OR_RAISE(auto field,
current_struct_like->GetField(position_path[i]));
current_struct_like->GetField(position_path_[i]));
if (!std::holds_alternative<std::shared_ptr<StructLike>>(field)) {
return InvalidSchema("Encountered non-struct in the position path [{}]",
position_path);
position_path_);
}
backups.push_back(std::get<std::shared_ptr<StructLike>>(field));
current_struct_like = backups.back().get();
}
return current_struct_like->GetField(position_path.back());
return current_struct_like->GetField(position_path_.back());
};
} else {
accessor_ = [](const StructLike&) -> Result<Scalar> {
Expand Down
11 changes: 7 additions & 4 deletions src/iceberg/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,22 @@ class ICEBERG_EXPORT Schema : public StructType {

std::string ToString() const override;

/// \brief Find the SchemaField by field name.
/// \brief Recursively find the SchemaField by field name.
///
/// Short names for maps and lists are included for any name that does not conflict with
/// a canonical name. For example, a list, 'l', of structs with field 'x' will produce
/// short name 'l.x' in addition to canonical name 'l.element.x'. a map 'm', if its
/// short name 'l.x' in addition to canonical name 'l.element.x'. A map 'm', if its
/// value includes a struct with field 'x', will produce short name 'm.x' in addition to
/// canonical name 'm.value.x'
/// canonical name 'm.value.x'.
/// FIXME: Currently only handles ASCII lowercase conversion; extend to support
/// non-ASCII characters (e.g., using std::towlower or ICU)
Result<std::optional<std::reference_wrapper<const SchemaField>>> FindFieldByName(
std::string_view name, bool case_sensitive = true) const;

/// \brief Find the SchemaField by field id.
/// \brief Recursively find the SchemaField by field id.
///
/// \param field_id The id of the field to find.
/// \return The field with the given id, or std::nullopt if not found.
Result<std::optional<std::reference_wrapper<const SchemaField>>> FindFieldById(
int32_t field_id) const;

Expand Down
4 changes: 2 additions & 2 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,13 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co

// Get the table schema and partition type
ICEBERG_ASSIGN_OR_RAISE(auto current_schema, context_.table_metadata->Schema());
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<StructType> partition_schema,
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<StructType> partition_type,
partition_spec->PartitionType(*current_schema));

for (const auto& manifest_file : manifest_files) {
ICEBERG_ASSIGN_OR_RAISE(
auto manifest_reader,
ManifestReader::Make(manifest_file, file_io_, partition_schema));
ManifestReader::Make(manifest_file, file_io_, partition_type));
ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());

// TODO(gty404): filter manifests using partition spec and filter expression
Expand Down
7 changes: 3 additions & 4 deletions src/iceberg/test/manifest_writer_versions_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,9 @@ class ManifestWriterVersionsTest : public ::testing::Test {
}

std::vector<ManifestEntry> ReadManifest(const ManifestFile& manifest_file) {
auto partition_schema_result = spec_->PartitionType(*schema_);
EXPECT_THAT(partition_schema_result, IsOk());
std::shared_ptr<StructType> partition_type =
std::move(partition_schema_result.value());
auto partition_type_result = spec_->PartitionType(*schema_);
EXPECT_THAT(partition_type_result, IsOk());
std::shared_ptr<StructType> partition_type = std::move(partition_type_result.value());
auto reader_result = ManifestReader::Make(manifest_file, file_io_, partition_type);
EXPECT_THAT(reader_result, IsOk());

Expand Down
21 changes: 21 additions & 0 deletions src/iceberg/type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <utility>

#include "iceberg/exception.h"
#include "iceberg/schema.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep
#include "iceberg/util/macros.h"
#include "iceberg/util/string_util.h"
Expand All @@ -48,6 +49,7 @@ std::string StructType::ToString() const {
repr += ">";
return repr;
}

std::span<const SchemaField> StructType::fields() const { return fields_; }
Result<std::optional<NestedType::SchemaFieldConstRef>> StructType::GetFieldById(
int32_t field_id) const {
Expand All @@ -56,13 +58,15 @@ Result<std::optional<NestedType::SchemaFieldConstRef>> StructType::GetFieldById(
if (it == field_by_id.get().end()) return std::nullopt;
return it->second;
}

/// \brief Look up a direct child field of this struct by its position.
///
/// \param index Zero-based position of the field within this struct.
/// \return The field at `index`, or an InvalidArgument error when `index` is
/// negative or not less than the number of fields.
Result<std::optional<NestedType::SchemaFieldConstRef>> StructType::GetFieldByIndex(
    int32_t index) const {
  const bool out_of_range =
      index < 0 || static_cast<size_t>(index) >= fields_.size();
  if (out_of_range) {
    return InvalidArgument("Invalid index {} to get field from struct", index);
  }
  return fields_[index];
}

Result<std::optional<NestedType::SchemaFieldConstRef>> StructType::GetFieldByName(
std::string_view name, bool case_sensitive) const {
if (case_sensitive) {
Expand All @@ -81,13 +85,19 @@ Result<std::optional<NestedType::SchemaFieldConstRef>> StructType::GetFieldByNam
}
return std::nullopt;
}

std::unique_ptr<Schema> StructType::ToSchema() const {
return std::make_unique<Schema>(fields_);
}

/// \brief Compare this struct with another type.
///
/// Two types are equal here only when `other` is also a struct and both hold
/// equal field lists.
bool StructType::Equals(const Type& other) const {
  if (other.type_id() == TypeId::kStruct) {
    // The type id check above makes the downcast valid.
    const auto& rhs = static_cast<const StructType&>(other);
    return fields_ == rhs.fields_;
  }
  return false;
}

Result<std::unordered_map<int32_t, StructType::SchemaFieldConstRef>>
StructType::InitFieldById(const StructType& self) {
std::unordered_map<int32_t, SchemaFieldConstRef> field_by_id;
Expand All @@ -100,6 +110,7 @@ StructType::InitFieldById(const StructType& self) {
}
return field_by_id;
}

Result<std::unordered_map<std::string_view, StructType::SchemaFieldConstRef>>
StructType::InitFieldByName(const StructType& self) {
std::unordered_map<std::string_view, StructType::SchemaFieldConstRef> field_by_name;
Expand All @@ -113,6 +124,7 @@ StructType::InitFieldByName(const StructType& self) {
}
return field_by_name;
}

Result<std::unordered_map<std::string, StructType::SchemaFieldConstRef>>
StructType::InitFieldByLowerCaseName(const StructType& self) {
std::unordered_map<std::string, SchemaFieldConstRef> field_by_lowercase_name;
Expand Down Expand Up @@ -146,6 +158,7 @@ std::string ListType::ToString() const {
repr += ">";
return repr;
}

/// \brief Expose the list's element field as a span of length one.
std::span<const SchemaField> ListType::fields() const { return {&element_, 1}; }
Result<std::optional<NestedType::SchemaFieldConstRef>> ListType::GetFieldById(
int32_t field_id) const {
Expand All @@ -154,13 +167,15 @@ Result<std::optional<NestedType::SchemaFieldConstRef>> ListType::GetFieldById(
}
return std::nullopt;
}

/// \brief Look up the list's child field by position.
///
/// \param index Position of the requested field; only 0 (the element) is valid.
/// \return The element field for index 0, otherwise an InvalidArgument error.
Result<std::optional<NestedType::SchemaFieldConstRef>> ListType::GetFieldByIndex(
    int index) const {
  if (index != 0) {
    return InvalidArgument("Invalid index {} to get field from list", index);
  }
  return std::cref(element_);
}

Result<std::optional<NestedType::SchemaFieldConstRef>> ListType::GetFieldByName(
std::string_view name, bool case_sensitive) const {
if (case_sensitive) {
Expand All @@ -174,6 +189,7 @@ Result<std::optional<NestedType::SchemaFieldConstRef>> ListType::GetFieldByName(
}
return std::nullopt;
}

bool ListType::Equals(const Type& other) const {
if (other.type_id() != TypeId::kList) {
return false;
Expand All @@ -195,6 +211,7 @@ MapType::MapType(SchemaField key, SchemaField value)
// fields_ holds the key field at index 0 and the value field at index 1.
const SchemaField& MapType::key() const { return fields_[0]; }
const SchemaField& MapType::value() const { return fields_[1]; }
// Returns the type id constant declared for MapType.
TypeId MapType::type_id() const { return kTypeId; }

std::string MapType::ToString() const {
// XXX: work around Clang/libc++: "<{}>" in a format string appears to get
// parsed as {<>} or something; split up the format string to avoid that
Expand All @@ -204,6 +221,7 @@ std::string MapType::ToString() const {
repr += ">";
return repr;
}

std::span<const SchemaField> MapType::fields() const { return fields_; }
Result<std::optional<NestedType::SchemaFieldConstRef>> MapType::GetFieldById(
int32_t field_id) const {
Expand All @@ -214,6 +232,7 @@ Result<std::optional<NestedType::SchemaFieldConstRef>> MapType::GetFieldById(
}
return std::nullopt;
}

Result<std::optional<NestedType::SchemaFieldConstRef>> MapType::GetFieldByIndex(
int32_t index) const {
if (index == 0) {
Expand All @@ -223,6 +242,7 @@ Result<std::optional<NestedType::SchemaFieldConstRef>> MapType::GetFieldByIndex(
}
return InvalidArgument("Invalid index {} to get field from map", index);
}

Result<std::optional<NestedType::SchemaFieldConstRef>> MapType::GetFieldByName(
std::string_view name, bool case_sensitive) const {
if (case_sensitive) {
Expand All @@ -241,6 +261,7 @@ Result<std::optional<NestedType::SchemaFieldConstRef>> MapType::GetFieldByName(
}
return std::nullopt;
}

bool MapType::Equals(const Type& other) const {
if (other.type_id() != TypeId::kMap) {
return false;
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/type.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ class ICEBERG_EXPORT StructType : public NestedType {
std::string_view name, bool case_sensitive) const override;
using NestedType::GetFieldByName;

std::unique_ptr<Schema> ToSchema() const;

protected:
bool Equals(const Type& other) const override;

Expand Down
Loading