From c155729732128ffeba3cb19a44e5cc2124e0f9e5 Mon Sep 17 00:00:00 2001 From: liulx20 <519459125@qq.com> Date: Wed, 18 Mar 2026 19:13:07 +0800 Subject: [PATCH 1/3] make dedup operator cover all column types --- .../common/columns/arrow_context_column.h | 2 - .../execution/common/columns/edge_columns.h | 18 +-- .../common/columns/i_context_column.h | 5 +- .../execution/common/columns/list_columns.h | 6 +- .../execution/common/columns/path_columns.h | 3 +- .../execution/common/columns/struct_columns.h | 2 +- .../execution/common/columns/value_columns.h | 6 +- .../execution/common/columns/vertex_columns.h | 30 +++-- .../common/columns/arrow_context_column.cc | 110 ------------------ .../common/columns/struct_columns.cc | 5 +- .../common/columns/vertex_columns.cc | 19 ++- .../common/operators/retrieve/dedup.cc | 4 +- 12 files changed, 60 insertions(+), 150 deletions(-) diff --git a/include/neug/execution/common/columns/arrow_context_column.h b/include/neug/execution/common/columns/arrow_context_column.h index 8e7514ead..9ad194375 100644 --- a/include/neug/execution/common/columns/arrow_context_column.h +++ b/include/neug/execution/common/columns/arrow_context_column.h @@ -93,8 +93,6 @@ class ArrowArrayContextColumn : public IContextColumn { std::shared_ptr shuffle( const std::vector& offsets) const override; - void generate_dedup_offset(std::vector& offsets) const override; - std::shared_ptr cast_to_value_column() const; private: diff --git a/include/neug/execution/common/columns/edge_columns.h b/include/neug/execution/common/columns/edge_columns.h index 59bede4c8..663776fb5 100644 --- a/include/neug/execution/common/columns/edge_columns.h +++ b/include/neug/execution/common/columns/edge_columns.h @@ -78,9 +78,9 @@ class SDSLEdgeColumn : public IEdgeColumn { inline Direction dir() const override { return dir_; } - void generate_dedup_offset(std::vector& offsets) const override { - // TODO(liulexiao): dedup with property value + bool generate_dedup_offset(std::vector& offsets) const override { ColumnsUtils::generate_dedup_offset(edges_, size(), offsets); + return true; } std::string column_info() const override { @@ -185,8 +185,9 @@ class MSEdgeColumn : public IEdgeColumn { inline size_t size() const override { return total_size_; } - void generate_dedup_offset(std::vector& offsets) const override { - LOG(FATAL) << "not implemented for " << this->column_info(); + bool generate_dedup_offset(std::vector& offsets) const override { + LOG(ERROR) << "not implemented for " << this->column_info(); + return false; } std::string column_info() const override { @@ -352,8 +353,9 @@ class BDSLEdgeColumn : public IEdgeColumn { inline size_t size() const override { return edges_.size(); } - void generate_dedup_offset(std::vector& offsets) const override { + bool generate_dedup_offset(std::vector& offsets) const override { ColumnsUtils::generate_dedup_offset(edges_, size(), offsets); + return true; } std::string column_info() const override { @@ -459,8 +461,9 @@ class SDMLEdgeColumn : public IEdgeColumn { inline size_t size() const override { return edges_.size(); } - void generate_dedup_offset(std::vector& offsets) const override { + bool generate_dedup_offset(std::vector& offsets) const override { ColumnsUtils::generate_dedup_offset(edges_, size(), offsets); + return true; } std::string column_info() const override { @@ -580,8 +583,9 @@ class BDMLEdgeColumn : public IEdgeColumn { inline size_t size() const override { return edges_.size(); } - void generate_dedup_offset(std::vector& offsets) const override { + bool generate_dedup_offset(std::vector& offsets) const override { ColumnsUtils::generate_dedup_offset(edges_, size(), offsets); + return true; } std::string column_info() const override { diff --git a/include/neug/execution/common/columns/i_context_column.h b/include/neug/execution/common/columns/i_context_column.h index f799ac63d..a92d63d6c 100644 --- a/include/neug/execution/common/columns/i_context_column.h +++ b/include/neug/execution/common/columns/i_context_column.h @@ -82,8 +82,9 @@ class IContextColumn { virtual bool is_optional() const { return false; } - virtual void generate_dedup_offset(std::vector& offsets) const { - LOG(FATAL) << "not implemented for " << this->column_info(); + virtual bool generate_dedup_offset(std::vector& offsets) const { + LOG(ERROR) << "not implemented for " << this->column_info(); + return false; } virtual std::pair, diff --git a/include/neug/execution/common/columns/list_columns.h b/include/neug/execution/common/columns/list_columns.h index ba3332ea4..9e7d77e04 100644 --- a/include/neug/execution/common/columns/list_columns.h +++ b/include/neug/execution/common/columns/list_columns.h @@ -64,9 +64,9 @@ class ListColumn : public ListColumnBase { return Value::LIST(elem_type_, std::move(list_values)); } - void generate_dedup_offset(std::vector& offsets) const override { - LOG(FATAL) << "not implemented for " << this->column_info(); - // ColumnsUtils::generate_dedup_offset(data_, data_.size(), offsets); + bool generate_dedup_offset(std::vector& offsets) const override { + LOG(ERROR) << "not implemented for " << this->column_info(); + return false; } std::pair, std::vector> unfold() diff --git a/include/neug/execution/common/columns/path_columns.h b/include/neug/execution/common/columns/path_columns.h index 89b4efafd..d5e1b577b 100644 --- a/include/neug/execution/common/columns/path_columns.h +++ b/include/neug/execution/common/columns/path_columns.h @@ -47,8 +47,9 @@ class PathColumn : public IContextColumn { } inline const Path& get_path(size_t idx) const { return data_[idx]; } - void generate_dedup_offset(std::vector& offsets) const override { + bool generate_dedup_offset(std::vector& offsets) const override { ColumnsUtils::generate_dedup_offset(data_, data_.size(), offsets); + return true; } template diff --git a/include/neug/execution/common/columns/struct_columns.h b/include/neug/execution/common/columns/struct_columns.h index 6c4dc7600..7c38ff6fe 100644 --- a/include/neug/execution/common/columns/struct_columns.h +++ b/include/neug/execution/common/columns/struct_columns.h @@ -49,7 +49,7 @@ class StructColumn : public IContextColumn { const DataType& elem_type() const override { return type_; } Value get_elem(size_t idx) const override; - void generate_dedup_offset(std::vector& offsets) const override; + bool generate_dedup_offset(std::vector& offsets) const override; bool is_optional() const override { return is_optional_; } diff --git a/include/neug/execution/common/columns/value_columns.h b/include/neug/execution/common/columns/value_columns.h index 742f0ce83..5af6643ae 100644 --- a/include/neug/execution/common/columns/value_columns.h +++ b/include/neug/execution/common/columns/value_columns.h @@ -60,9 +60,10 @@ class ValueColumn : public IContextColumn { const std::vector& data() const { return data_; } const std::vector& validity_bitmap() const { return valid_; } - void generate_dedup_offset(std::vector& offsets) const override { + bool generate_dedup_offset(std::vector& offsets) const override { if (!is_optional_) { - return ColumnsUtils::generate_dedup_offset(data_, data_.size(), offsets); + ColumnsUtils::generate_dedup_offset(data_, data_.size(), offsets); + return true; } std::set st; size_t null_index = std::numeric_limits::max(); @@ -79,6 +80,7 @@ class ValueColumn : public IContextColumn { if (null_index != std::numeric_limits::max()) { offsets.push_back(null_index); } + return true; } std::shared_ptr union_col( diff --git a/include/neug/execution/common/columns/vertex_columns.h b/include/neug/execution/common/columns/vertex_columns.h index 63b57f6b5..7b63d0c44 100644 --- a/include/neug/execution/common/columns/vertex_columns.h +++ b/include/neug/execution/common/columns/vertex_columns.h @@ -31,8 +31,7 @@ class IVertexColumn : public IContextColumn { IVertexColumn() : type_(DataType(DataTypeId::kVertex)) {} virtual ~IVertexColumn() = default; - __attribute__((always_inline)) ContextColumnType column_type() - const override { + ContextColumnType column_type() const override { return ContextColumnType::kVertex; } @@ -89,8 +88,7 @@ class SLVertexColumn : public IVertexColumn { std::to_string(size()) + "]"; } - __attribute__((always_inline)) VertexColumnType vertex_column_type() - const override { + VertexColumnType vertex_column_type() const override { return VertexColumnType::kSingle; } @@ -100,8 +98,8 @@ class SLVertexColumn : public IVertexColumn { std::shared_ptr optional_shuffle( const std::vector& offset) const override; - __attribute__((always_inline)) VertexRecord get_vertex( - size_t idx) const override { + __attribute__((always_inline)) VertexRecord + get_vertex(size_t idx) const override { return {label_, vertices_[idx]}; } @@ -116,7 +114,7 @@ class SLVertexColumn : public IVertexColumn { std::shared_ptr union_col( std::shared_ptr other) const override; - void generate_dedup_offset(std::vector& offsets) const override; + bool generate_dedup_offset(std::vector& offsets) const override; std::pair, std::vector>> generate_aggregate_offset() const override; @@ -175,8 +173,7 @@ class MSVertexColumn : public IVertexColumn { std::to_string(size()) + "]"; } - __attribute__((always_inline)) VertexColumnType vertex_column_type() - const override { + VertexColumnType vertex_column_type() const override { return VertexColumnType::kMultiSegment; } @@ -186,8 +183,8 @@ class MSVertexColumn : public IVertexColumn { std::shared_ptr optional_shuffle( const std::vector& offsets) const override; - __attribute__((always_inline)) VertexRecord get_vertex( - size_t idx) const override { + __attribute__((always_inline)) VertexRecord + get_vertex(size_t idx) const override { for (auto& pair : vertices_) { if (idx < pair.second.size()) { return {pair.first, pair.second[idx]}; @@ -234,6 +231,8 @@ class MSVertexColumn : public IVertexColumn { return vertices_[seg_id].second; } + bool generate_dedup_offset(std::vector& offsets) const override; + private: friend class MSVertexColumnBuilder; std::vector>> vertices_; @@ -320,8 +319,7 @@ class MLVertexColumn : public IVertexColumn { std::to_string(size()) + "]"; } - __attribute__((always_inline)) VertexColumnType vertex_column_type() - const override { + VertexColumnType vertex_column_type() const override { return VertexColumnType::kMultiple; } @@ -330,8 +328,8 @@ class MLVertexColumn : public IVertexColumn { std::shared_ptr optional_shuffle( const std::vector& offsets) const override; - __attribute__((always_inline)) VertexRecord get_vertex( - size_t idx) const override { + __attribute__((always_inline)) VertexRecord + get_vertex(size_t idx) const override { return vertices_[idx]; } @@ -353,7 +351,7 @@ class MLVertexColumn : public IVertexColumn { std::set get_labels_set() const override { return labels_; } - void generate_dedup_offset(std::vector& offsets) const override; + bool generate_dedup_offset(std::vector& offsets) const override; private: friend class MLVertexColumnBuilder; diff --git a/src/execution/common/columns/arrow_context_column.cc b/src/execution/common/columns/arrow_context_column.cc index 7c666504a..5f0647b71 100644 --- a/src/execution/common/columns/arrow_context_column.cc +++ b/src/execution/common/columns/arrow_context_column.cc @@ -238,105 +238,6 @@ shuffle_impl( return result_array; } -template -static bool less_than(const std::vector>& columns, - size_t size, size_t offset_a, size_t offset_b) { - auto [array_idx_a, local_offset_a] = - locate_array_and_offset(columns, size, offset_a); - auto [array_idx_b, local_offset_b] = - locate_array_and_offset(columns, size, offset_b); - auto casted_a = - std::static_pointer_cast(columns[array_idx_a]); - auto casted_b = - std::static_pointer_cast(columns[array_idx_b]); - return casted_a->Value(local_offset_a) < casted_b->Value(local_offset_b); -} - -template -static bool equal(const std::vector>& columns, - size_t size, size_t offset_a, size_t offset_b) { - auto [array_idx_a, local_offset_a] = - locate_array_and_offset(columns, size, offset_a); - auto [array_idx_b, local_offset_b] = - locate_array_and_offset(columns, size, offset_b); - auto casted_a = - std::static_pointer_cast(columns[array_idx_a]); - auto casted_b = - std::static_pointer_cast(columns[array_idx_b]); - return casted_a->Value(local_offset_a) == casted_b->Value(local_offset_b); -} - -// Template function to generate dedup offsets -template -static void generate_dedup_offset( - const std::vector>& columns, size_t size, - std::vector& offsets) { - // Create vector of all offsets - std::vector row_indices(size); - std::iota(row_indices.begin(), row_indices.end(), 0); - - // Create comparator that directly uses Arrow arrays - auto compare = [&](size_t a, size_t b) -> bool { - if (equal(columns, size, a, b)) { - return a < b; - } - return less_than(columns, size, a, b); - }; - - // Sort indices using stable_sort to maintain order for equal values - std::stable_sort(row_indices.begin(), row_indices.end(), compare); - - // Extract deduplicated offsets - offsets.clear(); - if (row_indices.empty()) { - return; - } - - offsets.push_back(row_indices[0]); - - for (size_t i = 1; i < row_indices.size(); ++i) { - if (!equal(columns, size, row_indices[i], - row_indices[i - 1])) { - offsets.push_back(row_indices[i]); - } - } -} - -// Define DISPATCH macro to generate dedup offsets based on Arrow type -void dispatch_generate_dedup_offset( - const std::vector>& columns, size_t size, - const std::shared_ptr& arrow_type, - std::vector& offsets) { - // Use Arrow type ID for dispatch - switch (arrow_type->id()) { -#define ARROW_TYPE_DISPATCHER_DEDUP(arrow_type_id, arrow_array_type) \ - case arrow::Type::arrow_type_id: { \ - generate_dedup_offset(columns, size, offsets); \ - break; \ - } - - ARROW_TYPE_DISPATCHER_DEDUP(BOOL, arrow::BooleanArray) - ARROW_TYPE_DISPATCHER_DEDUP(INT64, arrow::Int64Array) - ARROW_TYPE_DISPATCHER_DEDUP(INT32, arrow::Int32Array) - ARROW_TYPE_DISPATCHER_DEDUP(UINT32, arrow::UInt32Array) - ARROW_TYPE_DISPATCHER_DEDUP(UINT64, arrow::UInt64Array) - ARROW_TYPE_DISPATCHER_DEDUP(FLOAT, arrow::FloatArray) - ARROW_TYPE_DISPATCHER_DEDUP(DOUBLE, arrow::DoubleArray) - ARROW_TYPE_DISPATCHER_DEDUP(STRING, arrow::StringArray) - ARROW_TYPE_DISPATCHER_DEDUP(LARGE_STRING, arrow::LargeStringArray) - ARROW_TYPE_DISPATCHER_DEDUP(DATE32, arrow::Date32Array) - ARROW_TYPE_DISPATCHER_DEDUP(DATE64, arrow::Date64Array) - ARROW_TYPE_DISPATCHER_DEDUP(TIMESTAMP, arrow::TimestampArray) - // Interval type has been converted to arrow string type - -#undef ARROW_TYPE_DISPATCHER_DEDUP - - default: - THROW_NOT_SUPPORTED_EXCEPTION("Unsupported arrow type for dedup: " + - arrow_type->ToString()); - } -} - std::shared_ptr ArrowArrayContextColumnBuilder::finish() { return std::make_shared(columns_); } @@ -467,17 +368,6 @@ Value ArrowArrayContextColumn::get_elem(size_t idx) const { } } -void ArrowArrayContextColumn::generate_dedup_offset( - std::vector& offsets) const { - if (columns_.empty()) { - offsets.clear(); - return; - } - - auto arrow_type = columns_[0]->type(); - dispatch_generate_dedup_offset(columns_, size_, arrow_type, offsets); -} - std::shared_ptr ArrowArrayContextColumn::cast_to_value_column() const { auto builder = ColumnsUtils::create_builder(elem_type()); diff --git a/src/execution/common/columns/struct_columns.cc b/src/execution/common/columns/struct_columns.cc index a04088207..d3f04b9dc 100644 --- a/src/execution/common/columns/struct_columns.cc +++ b/src/execution/common/columns/struct_columns.cc @@ -71,8 +71,9 @@ Value StructColumn::get_elem(size_t idx) const { return Value::STRUCT(type_, std::move(struct_values)); } -void StructColumn::generate_dedup_offset(std::vector& offsets) const { - LOG(FATAL) << "not implemented for " << this->column_info(); +bool StructColumn::generate_dedup_offset(std::vector& offsets) const { + LOG(ERROR) << "not implemented for " << this->column_info(); + return false; } StructColumnBuilder::StructColumnBuilder(DataType type) : type_(type) { diff --git a/src/execution/common/columns/vertex_columns.cc b/src/execution/common/columns/vertex_columns.cc index 135e0bb3d..79a033ee0 100644 --- a/src/execution/common/columns/vertex_columns.cc +++ b/src/execution/common/columns/vertex_columns.cc @@ -59,7 +59,7 @@ std::shared_ptr SLVertexColumn::optional_shuffle( return builder.finish(); } -void SLVertexColumn::generate_dedup_offset(std::vector& offsets) const { +bool SLVertexColumn::generate_dedup_offset(std::vector& offsets) const { offsets.clear(); std::vector bitset; @@ -92,6 +92,7 @@ void SLVertexColumn::generate_dedup_offset(std::vector& offsets) const { if (flag) { offsets.push_back(idx); } + return true; } std::pair, std::vector>> @@ -225,6 +226,19 @@ std::shared_ptr MSVertexColumnBuilder::finish() { } } +bool MSVertexColumn::generate_dedup_offset(std::vector& offsets) const { + offsets.clear(); + std::set vset; + size_t len = size(); + for (size_t i = 0; i != len; ++i) { + auto cur = get_vertex(i); + if (vset.find(cur) == vset.end()) { + offsets.push_back(i); + vset.insert(cur); + } + } + return true; +} std::shared_ptr MLVertexColumn::shuffle( const std::vector& offsets) const { MLVertexColumnBuilderOpt builder(this->get_labels_set()); @@ -259,7 +273,7 @@ std::shared_ptr MLVertexColumn::optional_shuffle( return builder.finish(); } -void MLVertexColumn::generate_dedup_offset(std::vector& offsets) const { +bool MLVertexColumn::generate_dedup_offset(std::vector& offsets) const { offsets.clear(); std::set vset; size_t n = vertices_.size(); @@ -270,6 +284,7 @@ void MLVertexColumn::generate_dedup_offset(std::vector& offsets) const { vset.insert(cur); } } + return true; } std::shared_ptr MLVertexColumnBuilder::finish() { diff --git a/src/execution/common/operators/retrieve/dedup.cc b/src/execution/common/operators/retrieve/dedup.cc index 11960f0f2..48367d7af 100644 --- a/src/execution/common/operators/retrieve/dedup.cc +++ b/src/execution/common/operators/retrieve/dedup.cc @@ -33,8 +33,8 @@ neug::result Dedup::dedup(Context&& ctx, std::vector offsets; if (cols.size() == 0) { return ctx; - } else if (cols.size() == 1) { - ctx.get(cols[0])->generate_dedup_offset(offsets); + } + if (cols.size() == 1 && ctx.get(cols[0])->generate_dedup_offset(offsets)) { } else { phmap::flat_hash_set set; for (size_t r_i = 0; r_i < row_num; ++r_i) { From 58e4e98e0a48169dbf68d9338c9ef763ab7f0b23 Mon Sep 17 00:00:00 2001 From: liulx20 <519459125@qq.com> Date: Wed, 18 Mar 2026 19:16:42 +0800 Subject: [PATCH 2/3] format --- .../neug/execution/common/columns/vertex_columns.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/include/neug/execution/common/columns/vertex_columns.h b/include/neug/execution/common/columns/vertex_columns.h index 7b63d0c44..d0e7357f1 100644 --- a/include/neug/execution/common/columns/vertex_columns.h +++ b/include/neug/execution/common/columns/vertex_columns.h @@ -98,8 +98,8 @@ class SLVertexColumn : public IVertexColumn { std::shared_ptr optional_shuffle( const std::vector& offset) const override; - __attribute__((always_inline)) VertexRecord - get_vertex(size_t idx) const override { + __attribute__((always_inline)) VertexRecord get_vertex( + size_t idx) const override { return {label_, vertices_[idx]}; } @@ -183,8 +183,8 @@ class MSVertexColumn : public IVertexColumn { std::shared_ptr optional_shuffle( const std::vector& offsets) const override; - __attribute__((always_inline)) VertexRecord - get_vertex(size_t idx) const override { + __attribute__((always_inline)) VertexRecord get_vertex( + size_t idx) const override { for (auto& pair : vertices_) { if (idx < pair.second.size()) { return {pair.first, pair.second[idx]}; @@ -328,8 +328,8 @@ class MLVertexColumn : public IVertexColumn { std::shared_ptr optional_shuffle( const std::vector& offsets) const override; - __attribute__((always_inline)) VertexRecord - get_vertex(size_t idx) const override { + __attribute__((always_inline)) VertexRecord get_vertex( + size_t idx) const override { return vertices_[idx]; } From c4ed5f6b16a3040718ae61f41bfa3aa5bd9334ac Mon Sep 17 00:00:00 2001 From: liulx20 <519459125@qq.com> Date: Thu, 19 Mar 2026 10:33:42 +0800 Subject: [PATCH 3/3] fix --- .../common/columns/arrow_context_column.h | 5 ++++ .../execution/common/columns/columns_utils.h | 10 ++++--- .../execution/common/columns/edge_columns.h | 8 +++--- .../common/columns/i_context_column.h | 27 +++++++++---------- .../execution/common/columns/list_columns.h | 15 ++--------- .../execution/common/columns/path_columns.h | 2 +- .../execution/common/columns/struct_columns.h | 2 -- .../execution/common/columns/value_columns.h | 2 +- .../common/columns/struct_columns.cc | 5 ---- .../common/columns/vertex_columns.cc | 8 +++++- .../common/operators/retrieve/dedup.cc | 1 + .../common/operators/retrieve/unfold.cc | 2 +- 12 files changed, 41 insertions(+), 46 deletions(-) diff --git a/include/neug/execution/common/columns/arrow_context_column.h b/include/neug/execution/common/columns/arrow_context_column.h index 9ad194375..32c8ba89c 100644 --- a/include/neug/execution/common/columns/arrow_context_column.h +++ b/include/neug/execution/common/columns/arrow_context_column.h @@ -150,6 +150,11 @@ class ArrowStreamContextColumn : public IContextColumn { return suppliers_; } + Value get_elem(size_t idx) const override { + LOG(FATAL) << "get_elem not implemented for arrow stream column"; + return Value(DataType::SQLNULL); + } + private: std::shared_ptr first_batch_; std::vector> suppliers_; diff --git a/include/neug/execution/common/columns/columns_utils.h b/include/neug/execution/common/columns/columns_utils.h index 2dceaf8c9..cbf07c294 100644 --- a/include/neug/execution/common/columns/columns_utils.h +++ b/include/neug/execution/common/columns/columns_utils.h @@ -25,10 +25,14 @@ namespace execution { class ColumnsUtils { public: template - static void generate_dedup_offset(const std::vector& vec, size_t row_num, + static void generate_dedup_offset(const std::vector& vec, std::vector& offsets) { - std::vector row_indices(row_num); - row_indices.resize(row_num); + std::vector row_indices(vec.size()); + if (vec.empty()) { + offsets.clear(); + return; + } + row_indices.resize(vec.size()); std::iota(row_indices.begin(), row_indices.end(), 0); std::sort(row_indices.begin(), row_indices.end(), [&vec](size_t a, size_t b) { diff --git a/include/neug/execution/common/columns/edge_columns.h b/include/neug/execution/common/columns/edge_columns.h index 663776fb5..75f9f6535 100644 --- a/include/neug/execution/common/columns/edge_columns.h +++ b/include/neug/execution/common/columns/edge_columns.h @@ -79,7 +79,7 @@ class SDSLEdgeColumn : public IEdgeColumn { inline Direction dir() const override { return dir_; } bool generate_dedup_offset(std::vector& offsets) const override { - ColumnsUtils::generate_dedup_offset(edges_, size(), offsets); + ColumnsUtils::generate_dedup_offset(edges_, offsets); return true; } @@ -354,7 +354,7 @@ class BDSLEdgeColumn : public IEdgeColumn { inline size_t size() const override { return edges_.size(); } bool generate_dedup_offset(std::vector& offsets) const override { - ColumnsUtils::generate_dedup_offset(edges_, size(), offsets); + ColumnsUtils::generate_dedup_offset(edges_, offsets); return true; } @@ -462,7 +462,7 @@ class SDMLEdgeColumn : public IEdgeColumn { inline size_t size() const override { return edges_.size(); } bool generate_dedup_offset(std::vector& offsets) const override { - ColumnsUtils::generate_dedup_offset(edges_, size(), offsets); + ColumnsUtils::generate_dedup_offset(edges_, offsets); return true; } @@ -584,7 +584,7 @@ class BDMLEdgeColumn : public IEdgeColumn { inline size_t size() const override { return edges_.size(); } bool generate_dedup_offset(std::vector& offsets) const override { - ColumnsUtils::generate_dedup_offset(edges_, size(), offsets); + ColumnsUtils::generate_dedup_offset(edges_, offsets); return true; } diff --git a/include/neug/execution/common/columns/i_context_column.h b/include/neug/execution/common/columns/i_context_column.h index a92d63d6c..adcfcd333 100644 --- a/include/neug/execution/common/columns/i_context_column.h +++ b/include/neug/execution/common/columns/i_context_column.h @@ -45,10 +45,7 @@ class IContextColumn { IContextColumn() = default; virtual ~IContextColumn() = default; - virtual size_t size() const { - LOG(FATAL) << "not implemented for " << this->column_info(); - return 0; - } + virtual size_t size() const = 0; virtual std::string column_info() const = 0; virtual ContextColumnType column_type() const = 0; @@ -57,47 +54,47 @@ class IContextColumn { virtual std::shared_ptr shuffle( const std::vector& offsets) const { - LOG(FATAL) << "not implemented for " << this->column_info(); + LOG(FATAL) << "shuffle not implemented for " << this->column_info(); return nullptr; } virtual std::shared_ptr optional_shuffle( const std::vector& offsets) const { - LOG(FATAL) << "not implemented for " << this->column_info(); + LOG(FATAL) << "optional_shuffle not implemented for " + << this->column_info(); return nullptr; } virtual std::shared_ptr union_col( std::shared_ptr other) const { - LOG(FATAL) << "not implemented for " << this->column_info(); + LOG(FATAL) << "union_col not implemented for " << this->column_info(); return nullptr; } - virtual Value get_elem(size_t idx) const { - LOG(FATAL) << "not implemented for " << this->column_info(); - return Value(elem_type()); - } - + virtual Value get_elem(size_t idx) const = 0; virtual bool has_value(size_t idx) const { return true; } virtual bool is_optional() const { return false; } virtual bool generate_dedup_offset(std::vector& offsets) const { - LOG(ERROR) << "not implemented for " << this->column_info(); + LOG(ERROR) << "generate_dedup_offset not implemented for " + << this->column_info() << ", return false by default"; return false; } virtual std::pair, std::vector>> generate_aggregate_offset() const { - LOG(INFO) << "not implemented for " << this->column_info(); + LOG(INFO) << "generate_aggregate_offset not implemented for " + << this->column_info() << ", return empty by default"; std::shared_ptr col(nullptr); return std::make_pair(col, std::vector>()); } virtual bool order_by_limit(bool asc, size_t limit, std::vector& offsets) const { - LOG(INFO) << "order by limit not implemented for " << this->column_info(); + LOG(ERROR) << "order by limit not implemented for " << this->column_info() + << ", return false by default"; return false; } }; diff --git a/include/neug/execution/common/columns/list_columns.h b/include/neug/execution/common/columns/list_columns.h index 9e7d77e04..6d9c2bc10 100644 --- a/include/neug/execution/common/columns/list_columns.h +++ b/include/neug/execution/common/columns/list_columns.h @@ -19,12 +19,6 @@ namespace neug { namespace execution { -class ListColumnBase : public IContextColumn { - public: - virtual std::pair, std::vector> - unfold() const = 0; -}; - struct list_item { uint64_t offset; uint64_t length; @@ -32,7 +26,7 @@ struct list_item { class ListColumnBuilder; -class ListColumn : public ListColumnBase { +class ListColumn : public IContextColumn { public: explicit ListColumn(DataType type) : elem_type_(type) { std::shared_ptr elem_type_info = @@ -64,13 +58,8 @@ class ListColumn : public ListColumnBase { return Value::LIST(elem_type_, std::move(list_values)); } - bool generate_dedup_offset(std::vector& offsets) const override { - LOG(ERROR) << "not implemented for " << this->column_info(); - return false; - } - std::pair, std::vector> unfold() - const override; + const; std::shared_ptr data_column() const { return datas_; } diff --git a/include/neug/execution/common/columns/path_columns.h b/include/neug/execution/common/columns/path_columns.h index d5e1b577b..506910c1d 100644 --- a/include/neug/execution/common/columns/path_columns.h +++ b/include/neug/execution/common/columns/path_columns.h @@ -48,7 +48,7 @@ class PathColumn : public IContextColumn { inline const Path& get_path(size_t idx) const { return data_[idx]; } bool generate_dedup_offset(std::vector& offsets) const override { - ColumnsUtils::generate_dedup_offset(data_, data_.size(), offsets); + ColumnsUtils::generate_dedup_offset(data_, offsets); return true; } diff --git a/include/neug/execution/common/columns/struct_columns.h b/include/neug/execution/common/columns/struct_columns.h index 7c38ff6fe..050fdaed3 100644 --- a/include/neug/execution/common/columns/struct_columns.h +++ b/include/neug/execution/common/columns/struct_columns.h @@ -49,8 +49,6 @@ class StructColumn : public IContextColumn { const DataType& elem_type() const override { return type_; } Value get_elem(size_t idx) const override; - bool generate_dedup_offset(std::vector& offsets) const override; - bool is_optional() const override { return is_optional_; } bool has_value(size_t idx) const override { diff --git a/include/neug/execution/common/columns/value_columns.h b/include/neug/execution/common/columns/value_columns.h index 5af6643ae..790087c66 100644 --- a/include/neug/execution/common/columns/value_columns.h +++ b/include/neug/execution/common/columns/value_columns.h @@ -62,7 +62,7 @@ class ValueColumn : public IContextColumn { bool generate_dedup_offset(std::vector& offsets) const override { if (!is_optional_) { - ColumnsUtils::generate_dedup_offset(data_, data_.size(), offsets); + ColumnsUtils::generate_dedup_offset(data_, offsets); return true; } std::set st; diff --git a/src/execution/common/columns/struct_columns.cc b/src/execution/common/columns/struct_columns.cc index d3f04b9dc..d5956b945 100644 --- a/src/execution/common/columns/struct_columns.cc +++ b/src/execution/common/columns/struct_columns.cc @@ -71,11 +71,6 @@ Value StructColumn::get_elem(size_t idx) const { return Value::STRUCT(type_, std::move(struct_values)); } -bool StructColumn::generate_dedup_offset(std::vector& offsets) const { - LOG(ERROR) << "not implemented for " << this->column_info(); - return false; -} - StructColumnBuilder::StructColumnBuilder(DataType type) : type_(type) { const auto& child_types = StructType::GetChildTypes(type); for (const auto& child_type : child_types) { diff --git a/src/execution/common/columns/vertex_columns.cc b/src/execution/common/columns/vertex_columns.cc index 79a033ee0..89b880a54 100644 --- a/src/execution/common/columns/vertex_columns.cc +++ b/src/execution/common/columns/vertex_columns.cc @@ -229,10 +229,16 @@ std::shared_ptr MSVertexColumnBuilder::finish() { bool MSVertexColumn::generate_dedup_offset(std::vector& offsets) const { offsets.clear(); std::set vset; + bool null_seen = false; size_t len = size(); for (size_t i = 0; i != len; ++i) { auto cur = get_vertex(i); - if (vset.find(cur) == vset.end()) { + if (cur.vid_ == std::numeric_limits::max()) { + if (!null_seen) { + null_seen = true; + offsets.push_back(i); + } + } else if (vset.find(cur) == vset.end()) { offsets.push_back(i); vset.insert(cur); } diff --git a/src/execution/common/operators/retrieve/dedup.cc b/src/execution/common/operators/retrieve/dedup.cc index 48367d7af..202c696fd 100644 --- a/src/execution/common/operators/retrieve/dedup.cc +++ b/src/execution/common/operators/retrieve/dedup.cc @@ -36,6 +36,7 @@ neug::result Dedup::dedup(Context&& ctx, } if (cols.size() == 1 && ctx.get(cols[0])->generate_dedup_offset(offsets)) { } else { + offsets.clear(); phmap::flat_hash_set set; for (size_t r_i = 0; r_i < row_num; ++r_i) { std::vector bytes; diff --git a/src/execution/common/operators/retrieve/unfold.cc b/src/execution/common/operators/retrieve/unfold.cc index 4109a9175..33e03820b 100644 --- a/src/execution/common/operators/retrieve/unfold.cc +++ b/src/execution/common/operators/retrieve/unfold.cc @@ -30,7 +30,7 @@ neug::result Unfold::unfold(Context&& ctxs, int key, int alias) { LOG(ERROR) << "Unfold column type is not list"; RETURN_INVALID_ARGUMENT_ERROR("Unfold column type is not list"); } - auto list_col = std::dynamic_pointer_cast(col); + auto list_col = std::dynamic_pointer_cast(col); auto [ptr, offsets] = list_col->unfold(); ctxs.set_with_reshuffle(alias, ptr, offsets);