diff --git a/include/neug/execution/common/columns/arrow_context_column.h b/include/neug/execution/common/columns/arrow_context_column.h index 8e7514ea..32c8ba89 100644 --- a/include/neug/execution/common/columns/arrow_context_column.h +++ b/include/neug/execution/common/columns/arrow_context_column.h @@ -93,8 +93,6 @@ class ArrowArrayContextColumn : public IContextColumn { std::shared_ptr shuffle( const std::vector& offsets) const override; - void generate_dedup_offset(std::vector& offsets) const override; - std::shared_ptr cast_to_value_column() const; private: @@ -152,6 +150,11 @@ class ArrowStreamContextColumn : public IContextColumn { return suppliers_; } + Value get_elem(size_t idx) const override { + LOG(FATAL) << "get_elem not implemented for arrow stream column"; + return Value(DataType::SQLNULL); + } + private: std::shared_ptr first_batch_; std::vector> suppliers_; diff --git a/include/neug/execution/common/columns/columns_utils.h b/include/neug/execution/common/columns/columns_utils.h index 2dceaf8c..cbf07c29 100644 --- a/include/neug/execution/common/columns/columns_utils.h +++ b/include/neug/execution/common/columns/columns_utils.h @@ -25,10 +25,14 @@ namespace execution { class ColumnsUtils { public: template - static void generate_dedup_offset(const std::vector& vec, size_t row_num, + static void generate_dedup_offset(const std::vector& vec, std::vector& offsets) { - std::vector row_indices(row_num); - row_indices.resize(row_num); + std::vector row_indices(vec.size()); + if (vec.empty()) { + offsets.clear(); + return; + } + row_indices.resize(vec.size()); std::iota(row_indices.begin(), row_indices.end(), 0); std::sort(row_indices.begin(), row_indices.end(), [&vec](size_t a, size_t b) { diff --git a/include/neug/execution/common/columns/edge_columns.h b/include/neug/execution/common/columns/edge_columns.h index 59bede4c..75f9f653 100644 --- a/include/neug/execution/common/columns/edge_columns.h +++ b/include/neug/execution/common/columns/edge_columns.h @@ -78,9 +78,9 @@ class SDSLEdgeColumn : public IEdgeColumn { inline Direction dir() const override { return dir_; } - void generate_dedup_offset(std::vector& offsets) const override { - // TODO(liulexiao): dedup with property value - ColumnsUtils::generate_dedup_offset(edges_, size(), offsets); + bool generate_dedup_offset(std::vector& offsets) const override { + ColumnsUtils::generate_dedup_offset(edges_, offsets); + return true; } std::string column_info() const override { @@ -185,8 +185,9 @@ class MSEdgeColumn : public IEdgeColumn { inline size_t size() const override { return total_size_; } - void generate_dedup_offset(std::vector& offsets) const override { - LOG(FATAL) << "not implemented for " << this->column_info(); + bool generate_dedup_offset(std::vector& offsets) const override { + LOG(ERROR) << "not implemented for " << this->column_info(); + return false; } std::string column_info() const override { @@ -352,8 +353,9 @@ class BDSLEdgeColumn : public IEdgeColumn { inline size_t size() const override { return edges_.size(); } - void generate_dedup_offset(std::vector& offsets) const override { - ColumnsUtils::generate_dedup_offset(edges_, size(), offsets); + bool generate_dedup_offset(std::vector& offsets) const override { + ColumnsUtils::generate_dedup_offset(edges_, offsets); + return true; } std::string column_info() const override { @@ -459,8 +461,9 @@ class SDMLEdgeColumn : public IEdgeColumn { inline size_t size() const override { return edges_.size(); } - void generate_dedup_offset(std::vector& offsets) const override { - ColumnsUtils::generate_dedup_offset(edges_, size(), offsets); + bool generate_dedup_offset(std::vector& offsets) const override { + ColumnsUtils::generate_dedup_offset(edges_, offsets); + return true; } std::string column_info() const override { @@ -580,8 +583,9 @@ class BDMLEdgeColumn : public IEdgeColumn { inline size_t size() const override { return edges_.size(); } - void generate_dedup_offset(std::vector& offsets) const override { - ColumnsUtils::generate_dedup_offset(edges_, size(), offsets); + bool generate_dedup_offset(std::vector& offsets) const override { + ColumnsUtils::generate_dedup_offset(edges_, offsets); + return true; } std::string column_info() const override { diff --git a/include/neug/execution/common/columns/i_context_column.h b/include/neug/execution/common/columns/i_context_column.h index f799ac63..adcfcd33 100644 --- a/include/neug/execution/common/columns/i_context_column.h +++ b/include/neug/execution/common/columns/i_context_column.h @@ -45,10 +45,7 @@ class IContextColumn { IContextColumn() = default; virtual ~IContextColumn() = default; - virtual size_t size() const { - LOG(FATAL) << "not implemented for " << this->column_info(); - return 0; - } + virtual size_t size() const = 0; virtual std::string column_info() const = 0; virtual ContextColumnType column_type() const = 0; @@ -57,46 +54,47 @@ class IContextColumn { virtual std::shared_ptr shuffle( const std::vector& offsets) const { - LOG(FATAL) << "not implemented for " << this->column_info(); + LOG(FATAL) << "shuffle not implemented for " << this->column_info(); return nullptr; } virtual std::shared_ptr optional_shuffle( const std::vector& offsets) const { - LOG(FATAL) << "not implemented for " << this->column_info(); + LOG(FATAL) << "optional_shuffle not implemented for " + << this->column_info(); return nullptr; } virtual std::shared_ptr union_col( std::shared_ptr other) const { - LOG(FATAL) << "not implemented for " << this->column_info(); + LOG(FATAL) << "union_col not implemented for " << this->column_info(); return nullptr; } - virtual Value get_elem(size_t idx) const { - LOG(FATAL) << "not implemented for " << this->column_info(); - return Value(elem_type()); - } - + virtual Value get_elem(size_t idx) const = 0; virtual bool has_value(size_t idx) const { return true; } virtual bool is_optional() const { return false; } - virtual void generate_dedup_offset(std::vector& offsets) const { - LOG(FATAL) << "not implemented for " << this->column_info(); + virtual bool generate_dedup_offset(std::vector& offsets) const { + LOG(ERROR) << "generate_dedup_offset not implemented for " + << this->column_info() << ", return false by default"; + return false; } virtual std::pair, std::vector>> generate_aggregate_offset() const { - LOG(INFO) << "not implemented for " << this->column_info(); + LOG(INFO) << "generate_aggregate_offset not implemented for " + << this->column_info() << ", return empty by default"; std::shared_ptr col(nullptr); return std::make_pair(col, std::vector>()); } virtual bool order_by_limit(bool asc, size_t limit, std::vector& offsets) const { - LOG(INFO) << "order by limit not implemented for " << this->column_info(); + LOG(ERROR) << "order by limit not implemented for " << this->column_info() + << ", return false by default"; return false; } }; diff --git a/include/neug/execution/common/columns/list_columns.h b/include/neug/execution/common/columns/list_columns.h index ba3332ea..6d9c2bc1 100644 --- a/include/neug/execution/common/columns/list_columns.h +++ b/include/neug/execution/common/columns/list_columns.h @@ -19,12 +19,6 @@ namespace neug { namespace execution { -class ListColumnBase : public IContextColumn { - public: - virtual std::pair, std::vector> - unfold() const = 0; -}; - struct list_item { uint64_t offset; uint64_t length; @@ -32,7 +26,7 @@ struct list_item { class ListColumnBuilder; -class ListColumn : public ListColumnBase { +class ListColumn : public IContextColumn { public: explicit ListColumn(DataType type) : elem_type_(type) { std::shared_ptr elem_type_info = @@ -64,13 +58,8 @@ class ListColumn : public ListColumnBase { return Value::LIST(elem_type_, std::move(list_values)); } - void generate_dedup_offset(std::vector& offsets) const override { - LOG(FATAL) << "not implemented for " << this->column_info(); - // ColumnsUtils::generate_dedup_offset(data_, data_.size(), offsets); - } - std::pair, std::vector> unfold() - const override; + const; std::shared_ptr data_column() const { return datas_; } diff --git a/include/neug/execution/common/columns/path_columns.h b/include/neug/execution/common/columns/path_columns.h index 89b4efaf..506910c1 100644 --- a/include/neug/execution/common/columns/path_columns.h +++ b/include/neug/execution/common/columns/path_columns.h @@ -47,8 +47,9 @@ class PathColumn : public IContextColumn { } inline const Path& get_path(size_t idx) const { return data_[idx]; } - void generate_dedup_offset(std::vector& offsets) const override { - ColumnsUtils::generate_dedup_offset(data_, data_.size(), offsets); + bool generate_dedup_offset(std::vector& offsets) const override { + ColumnsUtils::generate_dedup_offset(data_, offsets); + return true; } template diff --git a/include/neug/execution/common/columns/struct_columns.h b/include/neug/execution/common/columns/struct_columns.h index 6c4dc760..050fdaed 100644 --- a/include/neug/execution/common/columns/struct_columns.h +++ b/include/neug/execution/common/columns/struct_columns.h @@ -49,8 +49,6 @@ class StructColumn : public IContextColumn { const DataType& elem_type() const override { return type_; } Value get_elem(size_t idx) const override; - void generate_dedup_offset(std::vector& offsets) const override; - bool is_optional() const override { return is_optional_; } bool has_value(size_t idx) const override { diff --git a/include/neug/execution/common/columns/value_columns.h b/include/neug/execution/common/columns/value_columns.h index 742f0ce8..790087c6 100644 --- a/include/neug/execution/common/columns/value_columns.h +++ b/include/neug/execution/common/columns/value_columns.h @@ -60,9 +60,10 @@ class ValueColumn : public IContextColumn { const std::vector& data() const { return data_; } const std::vector& validity_bitmap() const { return valid_; } - void generate_dedup_offset(std::vector& offsets) const override { + bool generate_dedup_offset(std::vector& offsets) const override { if (!is_optional_) { - return ColumnsUtils::generate_dedup_offset(data_, data_.size(), offsets); + ColumnsUtils::generate_dedup_offset(data_, offsets); + return true; } std::set st; size_t null_index = std::numeric_limits::max(); @@ -79,6 +80,7 @@ class ValueColumn : public IContextColumn { if (null_index != std::numeric_limits::max()) { offsets.push_back(null_index); } + return true; } std::shared_ptr union_col( diff --git a/include/neug/execution/common/columns/vertex_columns.h b/include/neug/execution/common/columns/vertex_columns.h index 63b57f6b..d0e7357f 100644 --- a/include/neug/execution/common/columns/vertex_columns.h +++ b/include/neug/execution/common/columns/vertex_columns.h @@ -31,8 +31,7 @@ class IVertexColumn : public IContextColumn { IVertexColumn() : type_(DataType(DataTypeId::kVertex)) {} virtual ~IVertexColumn() = default; - __attribute__((always_inline)) ContextColumnType column_type() - const override { + ContextColumnType column_type() const override { return ContextColumnType::kVertex; } @@ -89,8 +88,7 @@ class SLVertexColumn : public IVertexColumn { std::to_string(size()) + "]"; } - __attribute__((always_inline)) VertexColumnType vertex_column_type() - const override { + VertexColumnType vertex_column_type() const override { return VertexColumnType::kSingle; } @@ -116,7 +114,7 @@ class SLVertexColumn : public IVertexColumn { std::shared_ptr union_col( std::shared_ptr other) const override; - void generate_dedup_offset(std::vector& offsets) const override; + bool generate_dedup_offset(std::vector& offsets) const override; std::pair, std::vector>> generate_aggregate_offset() const override; @@ -175,8 +173,7 @@ class MSVertexColumn : public IVertexColumn { std::to_string(size()) + "]"; } - __attribute__((always_inline)) VertexColumnType vertex_column_type() - const override { + VertexColumnType vertex_column_type() const override { return VertexColumnType::kMultiSegment; } @@ -234,6 +231,8 @@ class MSVertexColumn : public IVertexColumn { return vertices_[seg_id].second; } + bool generate_dedup_offset(std::vector& offsets) const override; + private: friend class MSVertexColumnBuilder; std::vector>> vertices_; @@ -320,8 +319,7 @@ class MLVertexColumn : public IVertexColumn { std::to_string(size()) + "]"; } - __attribute__((always_inline)) VertexColumnType vertex_column_type() - const override { + VertexColumnType vertex_column_type() const override { return VertexColumnType::kMultiple; } @@ -353,7 +351,7 @@ class MLVertexColumn : public IVertexColumn { std::set get_labels_set() const override { return labels_; } - void generate_dedup_offset(std::vector& offsets) const override; + bool generate_dedup_offset(std::vector& offsets) const override; private: friend class MLVertexColumnBuilder; diff --git a/src/execution/common/columns/arrow_context_column.cc b/src/execution/common/columns/arrow_context_column.cc index 7c666504..5f0647b7 100644 --- a/src/execution/common/columns/arrow_context_column.cc +++ b/src/execution/common/columns/arrow_context_column.cc @@ -238,105 +238,6 @@ shuffle_impl( return result_array; } -template -static bool less_than(const std::vector>& columns, - size_t size, size_t offset_a, size_t offset_b) { - auto [array_idx_a, local_offset_a] = - locate_array_and_offset(columns, size, offset_a); - auto [array_idx_b, local_offset_b] = - locate_array_and_offset(columns, size, offset_b); - auto casted_a = - std::static_pointer_cast(columns[array_idx_a]); - auto casted_b = - std::static_pointer_cast(columns[array_idx_b]); - return casted_a->Value(local_offset_a) < casted_b->Value(local_offset_b); -} - -template -static bool equal(const std::vector>& columns, - size_t size, size_t offset_a, size_t offset_b) { - auto [array_idx_a, local_offset_a] = - locate_array_and_offset(columns, size, offset_a); - auto [array_idx_b, local_offset_b] = - locate_array_and_offset(columns, size, offset_b); - auto casted_a = - std::static_pointer_cast(columns[array_idx_a]); - auto casted_b = - std::static_pointer_cast(columns[array_idx_b]); - return casted_a->Value(local_offset_a) == casted_b->Value(local_offset_b); -} - -// Template function to generate dedup offsets -template -static void generate_dedup_offset( - const std::vector>& columns, size_t size, - std::vector& offsets) { - // Create vector of all offsets - std::vector row_indices(size); - std::iota(row_indices.begin(), row_indices.end(), 0); - - // Create comparator that directly uses Arrow arrays - auto compare = [&](size_t a, size_t b) -> bool { - if (equal(columns, size, a, b)) { - return a < b; - } - return less_than(columns, size, a, b); - }; - - // Sort indices using stable_sort to maintain order for equal values - std::stable_sort(row_indices.begin(), row_indices.end(), compare); - - // Extract deduplicated offsets - offsets.clear(); - if (row_indices.empty()) { - return; - } - - offsets.push_back(row_indices[0]); - - for (size_t i = 1; i < row_indices.size(); ++i) { - if (!equal(columns, size, row_indices[i], - row_indices[i - 1])) { - offsets.push_back(row_indices[i]); - } - } -} - -// Define DISPATCH macro to generate dedup offsets based on Arrow type -void dispatch_generate_dedup_offset( - const std::vector>& columns, size_t size, - const std::shared_ptr& arrow_type, - std::vector& offsets) { - // Use Arrow type ID for dispatch - switch (arrow_type->id()) { -#define ARROW_TYPE_DISPATCHER_DEDUP(arrow_type_id, arrow_array_type) \ - case arrow::Type::arrow_type_id: { \ - generate_dedup_offset(columns, size, offsets); \ - break; \ - } - - ARROW_TYPE_DISPATCHER_DEDUP(BOOL, arrow::BooleanArray) - ARROW_TYPE_DISPATCHER_DEDUP(INT64, arrow::Int64Array) - ARROW_TYPE_DISPATCHER_DEDUP(INT32, arrow::Int32Array) - ARROW_TYPE_DISPATCHER_DEDUP(UINT32, arrow::UInt32Array) - ARROW_TYPE_DISPATCHER_DEDUP(UINT64, arrow::UInt64Array) - ARROW_TYPE_DISPATCHER_DEDUP(FLOAT, arrow::FloatArray) - ARROW_TYPE_DISPATCHER_DEDUP(DOUBLE, arrow::DoubleArray) - ARROW_TYPE_DISPATCHER_DEDUP(STRING, arrow::StringArray) - ARROW_TYPE_DISPATCHER_DEDUP(LARGE_STRING, arrow::LargeStringArray) - ARROW_TYPE_DISPATCHER_DEDUP(DATE32, arrow::Date32Array) - ARROW_TYPE_DISPATCHER_DEDUP(DATE64, arrow::Date64Array) - ARROW_TYPE_DISPATCHER_DEDUP(TIMESTAMP, arrow::TimestampArray) - // Interval type has been converted to arrow string type - -#undef ARROW_TYPE_DISPATCHER_DEDUP - - default: - THROW_NOT_SUPPORTED_EXCEPTION("Unsupported arrow type for dedup: " + - arrow_type->ToString()); - } -} - std::shared_ptr ArrowArrayContextColumnBuilder::finish() { return std::make_shared(columns_); } @@ -467,17 +368,6 @@ Value ArrowArrayContextColumn::get_elem(size_t idx) const { } } -void ArrowArrayContextColumn::generate_dedup_offset( - std::vector& offsets) const { - if (columns_.empty()) { - offsets.clear(); - return; - } - - auto arrow_type = columns_[0]->type(); - dispatch_generate_dedup_offset(columns_, size_, arrow_type, offsets); -} - std::shared_ptr ArrowArrayContextColumn::cast_to_value_column() const { auto builder = ColumnsUtils::create_builder(elem_type()); diff --git a/src/execution/common/columns/struct_columns.cc b/src/execution/common/columns/struct_columns.cc index a0408820..d5956b94 100644 --- a/src/execution/common/columns/struct_columns.cc +++ b/src/execution/common/columns/struct_columns.cc @@ -71,10 +71,6 @@ Value StructColumn::get_elem(size_t idx) const { return Value::STRUCT(type_, std::move(struct_values)); } -void StructColumn::generate_dedup_offset(std::vector& offsets) const { - LOG(FATAL) << "not implemented for " << this->column_info(); -} - StructColumnBuilder::StructColumnBuilder(DataType type) : type_(type) { const auto& child_types = StructType::GetChildTypes(type); for (const auto& child_type : child_types) { diff --git a/src/execution/common/columns/vertex_columns.cc b/src/execution/common/columns/vertex_columns.cc index 135e0bb3..89b880a5 100644 --- a/src/execution/common/columns/vertex_columns.cc +++ b/src/execution/common/columns/vertex_columns.cc @@ -59,7 +59,7 @@ std::shared_ptr SLVertexColumn::optional_shuffle( return builder.finish(); } -void SLVertexColumn::generate_dedup_offset(std::vector& offsets) const { +bool SLVertexColumn::generate_dedup_offset(std::vector& offsets) const { offsets.clear(); std::vector bitset; @@ -92,6 +92,7 @@ void SLVertexColumn::generate_dedup_offset(std::vector& offsets) const { if (flag) { offsets.push_back(idx); } + return true; } std::pair, std::vector>> @@ -225,6 +226,25 @@ std::shared_ptr MSVertexColumnBuilder::finish() { } } +bool MSVertexColumn::generate_dedup_offset(std::vector& offsets) const { + offsets.clear(); + std::set vset; + bool null_seen = false; + size_t len = size(); + for (size_t i = 0; i != len; ++i) { + auto cur = get_vertex(i); + if (cur.vid_ == std::numeric_limits::max()) { + if (!null_seen) { + null_seen = true; + offsets.push_back(i); + } + } else if (vset.find(cur) == vset.end()) { + offsets.push_back(i); + vset.insert(cur); + } + } + return true; +} std::shared_ptr MLVertexColumn::shuffle( const std::vector& offsets) const { MLVertexColumnBuilderOpt builder(this->get_labels_set()); @@ -259,7 +279,7 @@ std::shared_ptr MLVertexColumn::optional_shuffle( return builder.finish(); } -void MLVertexColumn::generate_dedup_offset(std::vector& offsets) const { +bool MLVertexColumn::generate_dedup_offset(std::vector& offsets) const { offsets.clear(); std::set vset; size_t n = vertices_.size(); @@ -270,6 +290,7 @@ void MLVertexColumn::generate_dedup_offset(std::vector& offsets) const { vset.insert(cur); } } + return true; } std::shared_ptr MLVertexColumnBuilder::finish() { diff --git a/src/execution/common/operators/retrieve/dedup.cc b/src/execution/common/operators/retrieve/dedup.cc index 11960f0f..202c696f 100644 --- a/src/execution/common/operators/retrieve/dedup.cc +++ b/src/execution/common/operators/retrieve/dedup.cc @@ -33,9 +33,10 @@ neug::result Dedup::dedup(Context&& ctx, std::vector offsets; if (cols.size() == 0) { return ctx; - } else if (cols.size() == 1) { - ctx.get(cols[0])->generate_dedup_offset(offsets); + } + if (cols.size() == 1 && ctx.get(cols[0])->generate_dedup_offset(offsets)) { } else { + offsets.clear(); phmap::flat_hash_set set; for (size_t r_i = 0; r_i < row_num; ++r_i) { std::vector bytes; diff --git a/src/execution/common/operators/retrieve/unfold.cc b/src/execution/common/operators/retrieve/unfold.cc index 4109a917..33e03820 100644 --- a/src/execution/common/operators/retrieve/unfold.cc +++ b/src/execution/common/operators/retrieve/unfold.cc @@ -30,7 +30,7 @@ neug::result Unfold::unfold(Context&& ctxs, int key, int alias) { LOG(ERROR) << "Unfold column type is not list"; RETURN_INVALID_ARGUMENT_ERROR("Unfold column type is not list"); } - auto list_col = std::dynamic_pointer_cast(col); + auto list_col = std::dynamic_pointer_cast(col); auto [ptr, offsets] = list_col->unfold(); ctxs.set_with_reshuffle(alias, ptr, offsets);