Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions include/neug/execution/common/columns/arrow_context_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ class ArrowArrayContextColumn : public IContextColumn {
std::shared_ptr<IContextColumn> shuffle(
const std::vector<size_t>& offsets) const override;

void generate_dedup_offset(std::vector<size_t>& offsets) const override;

std::shared_ptr<IContextColumn> cast_to_value_column() const;

private:
Expand Down Expand Up @@ -152,6 +150,11 @@ class ArrowStreamContextColumn : public IContextColumn {
return suppliers_;
}

Value get_elem(size_t idx) const override {
LOG(FATAL) << "get_elem not implemented for arrow stream column";
return Value(DataType::SQLNULL);
}

private:
std::shared_ptr<arrow::RecordBatch> first_batch_;
std::vector<std::shared_ptr<IRecordBatchSupplier>> suppliers_;
Expand Down
10 changes: 7 additions & 3 deletions include/neug/execution/common/columns/columns_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ namespace execution {
class ColumnsUtils {
public:
template <typename T>
static void generate_dedup_offset(const std::vector<T>& vec, size_t row_num,
static void generate_dedup_offset(const std::vector<T>& vec,
std::vector<size_t>& offsets) {
std::vector<size_t> row_indices(row_num);
row_indices.resize(row_num);
std::vector<size_t> row_indices(vec.size());
if (vec.empty()) {
offsets.clear();
return;
}
row_indices.resize(vec.size());
std::iota(row_indices.begin(), row_indices.end(), 0);
std::sort(row_indices.begin(), row_indices.end(),
[&vec](size_t a, size_t b) {
Expand Down
26 changes: 15 additions & 11 deletions include/neug/execution/common/columns/edge_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ class SDSLEdgeColumn : public IEdgeColumn {

inline Direction dir() const override { return dir_; }

void generate_dedup_offset(std::vector<size_t>& offsets) const override {
// TODO(liulexiao): dedup with property value
ColumnsUtils::generate_dedup_offset(edges_, size(), offsets);
bool generate_dedup_offset(std::vector<size_t>& offsets) const override {
ColumnsUtils::generate_dedup_offset(edges_, offsets);
return true;
}

std::string column_info() const override {
Expand Down Expand Up @@ -185,8 +185,9 @@ class MSEdgeColumn : public IEdgeColumn {

inline size_t size() const override { return total_size_; }

void generate_dedup_offset(std::vector<size_t>& offsets) const override {
LOG(FATAL) << "not implemented for " << this->column_info();
bool generate_dedup_offset(std::vector<size_t>& offsets) const override {
LOG(ERROR) << "not implemented for " << this->column_info();
return false;
}

std::string column_info() const override {
Expand Down Expand Up @@ -352,8 +353,9 @@ class BDSLEdgeColumn : public IEdgeColumn {

inline size_t size() const override { return edges_.size(); }

void generate_dedup_offset(std::vector<size_t>& offsets) const override {
ColumnsUtils::generate_dedup_offset(edges_, size(), offsets);
bool generate_dedup_offset(std::vector<size_t>& offsets) const override {
ColumnsUtils::generate_dedup_offset(edges_, offsets);
return true;
}

std::string column_info() const override {
Expand Down Expand Up @@ -459,8 +461,9 @@ class SDMLEdgeColumn : public IEdgeColumn {

inline size_t size() const override { return edges_.size(); }

void generate_dedup_offset(std::vector<size_t>& offsets) const override {
ColumnsUtils::generate_dedup_offset(edges_, size(), offsets);
bool generate_dedup_offset(std::vector<size_t>& offsets) const override {
ColumnsUtils::generate_dedup_offset(edges_, offsets);
return true;
}

std::string column_info() const override {
Expand Down Expand Up @@ -580,8 +583,9 @@ class BDMLEdgeColumn : public IEdgeColumn {

inline size_t size() const override { return edges_.size(); }

void generate_dedup_offset(std::vector<size_t>& offsets) const override {
ColumnsUtils::generate_dedup_offset(edges_, size(), offsets);
bool generate_dedup_offset(std::vector<size_t>& offsets) const override {
ColumnsUtils::generate_dedup_offset(edges_, offsets);
return true;
}

std::string column_info() const override {
Expand Down
30 changes: 14 additions & 16 deletions include/neug/execution/common/columns/i_context_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,7 @@ class IContextColumn {
IContextColumn() = default;
virtual ~IContextColumn() = default;

virtual size_t size() const {
LOG(FATAL) << "not implemented for " << this->column_info();
return 0;
}
virtual size_t size() const = 0;

virtual std::string column_info() const = 0;
virtual ContextColumnType column_type() const = 0;
Expand All @@ -57,46 +54,47 @@ class IContextColumn {

virtual std::shared_ptr<IContextColumn> shuffle(
const std::vector<size_t>& offsets) const {
LOG(FATAL) << "not implemented for " << this->column_info();
LOG(FATAL) << "shuffle not implemented for " << this->column_info();
return nullptr;
}

virtual std::shared_ptr<IContextColumn> optional_shuffle(
const std::vector<size_t>& offsets) const {
LOG(FATAL) << "not implemented for " << this->column_info();
LOG(FATAL) << "optional_shuffle not implemented for "
<< this->column_info();
return nullptr;
}

virtual std::shared_ptr<IContextColumn> union_col(
std::shared_ptr<IContextColumn> other) const {
LOG(FATAL) << "not implemented for " << this->column_info();
LOG(FATAL) << "union_col not implemented for " << this->column_info();
return nullptr;
}

virtual Value get_elem(size_t idx) const {
LOG(FATAL) << "not implemented for " << this->column_info();
return Value(elem_type());
}

virtual Value get_elem(size_t idx) const = 0;
virtual bool has_value(size_t idx) const { return true; }

virtual bool is_optional() const { return false; }

virtual void generate_dedup_offset(std::vector<size_t>& offsets) const {
LOG(FATAL) << "not implemented for " << this->column_info();
virtual bool generate_dedup_offset(std::vector<size_t>& offsets) const {
LOG(ERROR) << "generate_dedup_offset not implemented for "
<< this->column_info() << ", return false by default";
return false;
}

virtual std::pair<std::shared_ptr<IContextColumn>,
std::vector<std::vector<size_t>>>
generate_aggregate_offset() const {
LOG(INFO) << "not implemented for " << this->column_info();
LOG(INFO) << "generate_aggregate_offset not implemented for "
<< this->column_info() << ", return empty by default";
std::shared_ptr<IContextColumn> col(nullptr);
return std::make_pair(col, std::vector<std::vector<size_t>>());
}

virtual bool order_by_limit(bool asc, size_t limit,
std::vector<size_t>& offsets) const {
LOG(INFO) << "order by limit not implemented for " << this->column_info();
LOG(ERROR) << "order by limit not implemented for " << this->column_info()
<< ", return false by default";
return false;
}
};
Expand Down
15 changes: 2 additions & 13 deletions include/neug/execution/common/columns/list_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,14 @@
namespace neug {
namespace execution {

class ListColumnBase : public IContextColumn {
public:
virtual std::pair<std::shared_ptr<IContextColumn>, std::vector<size_t>>
unfold() const = 0;
};

struct list_item {
uint64_t offset;
uint64_t length;
};

class ListColumnBuilder;

class ListColumn : public ListColumnBase {
class ListColumn : public IContextColumn {
public:
explicit ListColumn(DataType type) : elem_type_(type) {
std::shared_ptr<ExtraTypeInfo> elem_type_info =
Expand Down Expand Up @@ -64,13 +58,8 @@ class ListColumn : public ListColumnBase {
return Value::LIST(elem_type_, std::move(list_values));
}

void generate_dedup_offset(std::vector<size_t>& offsets) const override {
LOG(FATAL) << "not implemented for " << this->column_info();
// ColumnsUtils::generate_dedup_offset(data_, data_.size(), offsets);
}

std::pair<std::shared_ptr<IContextColumn>, std::vector<size_t>> unfold()
const override;
const;

std::shared_ptr<IContextColumn> data_column() const { return datas_; }

Expand Down
5 changes: 3 additions & 2 deletions include/neug/execution/common/columns/path_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ class PathColumn : public IContextColumn {
}
inline const Path& get_path(size_t idx) const { return data_[idx]; }

void generate_dedup_offset(std::vector<size_t>& offsets) const override {
ColumnsUtils::generate_dedup_offset(data_, data_.size(), offsets);
bool generate_dedup_offset(std::vector<size_t>& offsets) const override {
ColumnsUtils::generate_dedup_offset(data_, offsets);
return true;
}

template <typename FUNC>
Expand Down
2 changes: 0 additions & 2 deletions include/neug/execution/common/columns/struct_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ class StructColumn : public IContextColumn {
const DataType& elem_type() const override { return type_; }
Value get_elem(size_t idx) const override;

void generate_dedup_offset(std::vector<size_t>& offsets) const override;

bool is_optional() const override { return is_optional_; }

bool has_value(size_t idx) const override {
Expand Down
6 changes: 4 additions & 2 deletions include/neug/execution/common/columns/value_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ class ValueColumn : public IContextColumn {
const std::vector<T>& data() const { return data_; }
const std::vector<bool>& validity_bitmap() const { return valid_; }

void generate_dedup_offset(std::vector<size_t>& offsets) const override {
bool generate_dedup_offset(std::vector<size_t>& offsets) const override {
if (!is_optional_) {
return ColumnsUtils::generate_dedup_offset(data_, data_.size(), offsets);
ColumnsUtils::generate_dedup_offset(data_, offsets);
return true;
}
std::set<T> st;
size_t null_index = std::numeric_limits<size_t>::max();
Expand All @@ -79,6 +80,7 @@ class ValueColumn : public IContextColumn {
if (null_index != std::numeric_limits<size_t>::max()) {
offsets.push_back(null_index);
}
return true;
}

std::shared_ptr<IContextColumn> union_col(
Expand Down
18 changes: 8 additions & 10 deletions include/neug/execution/common/columns/vertex_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ class IVertexColumn : public IContextColumn {
IVertexColumn() : type_(DataType(DataTypeId::kVertex)) {}
virtual ~IVertexColumn() = default;

__attribute__((always_inline)) ContextColumnType column_type()
const override {
ContextColumnType column_type() const override {
return ContextColumnType::kVertex;
}

Expand Down Expand Up @@ -89,8 +88,7 @@ class SLVertexColumn : public IVertexColumn {
std::to_string(size()) + "]";
}

__attribute__((always_inline)) VertexColumnType vertex_column_type()
const override {
VertexColumnType vertex_column_type() const override {
return VertexColumnType::kSingle;
}

Expand All @@ -116,7 +114,7 @@ class SLVertexColumn : public IVertexColumn {
std::shared_ptr<IContextColumn> union_col(
std::shared_ptr<IContextColumn> other) const override;

void generate_dedup_offset(std::vector<size_t>& offsets) const override;
bool generate_dedup_offset(std::vector<size_t>& offsets) const override;

std::pair<std::shared_ptr<IContextColumn>, std::vector<std::vector<size_t>>>
generate_aggregate_offset() const override;
Expand Down Expand Up @@ -175,8 +173,7 @@ class MSVertexColumn : public IVertexColumn {
std::to_string(size()) + "]";
}

__attribute__((always_inline)) VertexColumnType vertex_column_type()
const override {
VertexColumnType vertex_column_type() const override {
return VertexColumnType::kMultiSegment;
}

Expand Down Expand Up @@ -234,6 +231,8 @@ class MSVertexColumn : public IVertexColumn {
return vertices_[seg_id].second;
}

bool generate_dedup_offset(std::vector<size_t>& offsets) const override;

private:
friend class MSVertexColumnBuilder;
std::vector<std::pair<label_t, std::vector<vid_t>>> vertices_;
Expand Down Expand Up @@ -320,8 +319,7 @@ class MLVertexColumn : public IVertexColumn {
std::to_string(size()) + "]";
}

__attribute__((always_inline)) VertexColumnType vertex_column_type()
const override {
VertexColumnType vertex_column_type() const override {
return VertexColumnType::kMultiple;
}

Expand Down Expand Up @@ -353,7 +351,7 @@ class MLVertexColumn : public IVertexColumn {

std::set<label_t> get_labels_set() const override { return labels_; }

void generate_dedup_offset(std::vector<size_t>& offsets) const override;
bool generate_dedup_offset(std::vector<size_t>& offsets) const override;

private:
friend class MLVertexColumnBuilder;
Expand Down
Loading
Loading