Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/neug/storages/graph/edge_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class EdgeTable {
size_t Capacity() const;

private:
void dropAndCreateNewBundledCSR();
void dropAndCreateNewBundledCSR(std::shared_ptr<ColumnBase> prev_data_col);
void dropAndCreateNewUnbundledCSR(bool delete_property);
std::string get_next_csr_path_suffix();

Expand Down
129 changes: 108 additions & 21 deletions src/storages/graph/edge_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,58 @@ void batch_put_edges_with_default_edata(const std::vector<vid_t>& src_lid,
case DataTypeId::kEmpty:
batch_put_edges_with_default_edata_impl<EmptyType>(src_lid, dst_lid,
EmptyType(), out_csr);
break;
default:
THROW_NOT_SUPPORTED_EXCEPTION("not support edge data type: " +
std::to_string(property_type));
}
}

void batch_put_edges_to_bundled_csr(const std::vector<vid_t>& src_lid,
const std::vector<vid_t>& dst_lid,
DataTypeId property_type,
const std::vector<Property>& edge_data,
CsrBase* out_csr) {
switch (property_type) {
#define TYPE_DISPATCHER(enum_val, type) \
case DataTypeId::enum_val: { \
std::vector<type> typed_data; \
typed_data.reserve(edge_data.size()); \
for (const auto& prop : edge_data) { \
typed_data.emplace_back(PropUtils<type>::to_typed(prop)); \
} \
dynamic_cast<TypedCsrBase<type>*>(out_csr)->batch_put_edges( \
src_lid, dst_lid, typed_data); \
break; \
}
TYPE_DISPATCHER(kBoolean, bool);
TYPE_DISPATCHER(kInt32, int32_t);
TYPE_DISPATCHER(kUInt32, uint32_t);
TYPE_DISPATCHER(kInt64, int64_t);
TYPE_DISPATCHER(kUInt64, uint64_t);
TYPE_DISPATCHER(kFloat, float);
TYPE_DISPATCHER(kDouble, double);
TYPE_DISPATCHER(kDate, Date);
TYPE_DISPATCHER(kTimestampMs, DateTime);
TYPE_DISPATCHER(kInterval, Interval);
#undef TYPE_DISPATCHER
case DataTypeId::kEmpty: {
dynamic_cast<TypedCsrBase<EmptyType>*>(out_csr)->batch_put_edges(
src_lid, dst_lid, {});
break;
}
case DataTypeId::kVarchar: {
THROW_NOT_SUPPORTED_EXCEPTION("not support edge data type: " +
std::to_string(property_type));
break;
}
default:
THROW_NOT_SUPPORTED_EXCEPTION(
"Unsupported edge property type: " +
std::to_string(static_cast<int>(property_type)));
}
}

template <typename T>
std::unique_ptr<CsrBase> create_csr_impl(bool is_mutable,
EdgeStrategy strategy) {
Expand Down Expand Up @@ -191,7 +237,8 @@ static std::unique_ptr<CsrBase> create_csr(bool is_mutable,
return create_csr_impl<EmptyType>(is_mutable, strategy);
}
default: {
LOG(FATAL) << "not support edge data type";
THROW_NOT_SUPPORTED_EXCEPTION("not support edge data type: " +
std::to_string(property_type));
return nullptr;
}
}
Expand Down Expand Up @@ -426,7 +473,8 @@ void batch_add_bundled_edges_impl(CsrBase* out_csr, CsrBase* in_csr,
FOR_EACH_DATA_TYPE_NO_STRING(TYPE_DISPATCHER)
#undef TYPE_DISPATCHER
default:
LOG(FATAL) << "not support edge data type: " << prop_types[0].ToString();
THROW_NOT_SUPPORTED_EXCEPTION("not support edge data type: " +
std::to_string(prop_types[0].id()));
}
}

Expand Down Expand Up @@ -780,8 +828,9 @@ void EdgeTable::AddProperties(const std::vector<std::string>& prop_names,
if (table_->col_num() == 0) {
// NOTE: Rather than check meta_->is_bundled(),we check whether the table
// is empty.
if (meta_->properties.size() == 1) {
dropAndCreateNewBundledCSR();
if (meta_->properties.size() == 1 &&
meta_->properties[0].id() != DataTypeId::kVarchar) {
dropAndCreateNewBundledCSR(nullptr);
} else {
dropAndCreateNewUnbundledCSR(false);
}
Expand Down Expand Up @@ -822,6 +871,14 @@ void EdgeTable::DeleteProperties(const std::vector<std::string>& col_names) {
table_->delete_column(col);
VLOG(1) << "delete column " << col;
}
if (table_->col_num() == 0) {
dropAndCreateNewUnbundledCSR(true);
} else if (table_->col_num() == 1) {
auto remaining_col = table_->get_column_by_id(0);
if (remaining_col->type() != DataTypeId::kVarchar) {
dropAndCreateNewBundledCSR(remaining_col);
}
}
}
}

Expand All @@ -834,9 +891,10 @@ int32_t EdgeTable::AddEdge(vid_t src_lid, vid_t dst_lid,
(edge_data.size() == 0 &&
(meta_->properties.empty() ||
meta_->properties[0] == DataTypeId::kEmpty)));
in_csr_->put_generic_edge(dst_lid, src_lid, edge_data[0], ts, alloc);
Property bundled_data = edge_data.empty() ? Property() : edge_data[0];
in_csr_->put_generic_edge(dst_lid, src_lid, bundled_data, ts, alloc);
oe_offset =
out_csr_->put_generic_edge(src_lid, dst_lid, edge_data[0], ts, alloc);
out_csr_->put_generic_edge(src_lid, dst_lid, bundled_data, ts, alloc);
} else {
if (meta_->properties.size() != edge_data.size()) {
THROW_INVALID_ARGUMENT_EXCEPTION(
Expand Down Expand Up @@ -980,7 +1038,11 @@ size_t EdgeTable::Capacity() const {
return capacity_.load();
}

void EdgeTable::dropAndCreateNewBundledCSR() {
void EdgeTable::dropAndCreateNewBundledCSR(
std::shared_ptr<ColumnBase> remaining_col) {
DataTypeId property_type = (remaining_col == nullptr)
? meta_->properties[0].id()
: remaining_col->type();
auto suffix = get_next_csr_path_suffix();
std::string next_oe_csr_path =
tmp_dir(work_dir_) + "/" +
Expand All @@ -993,26 +1055,46 @@ void EdgeTable::dropAndCreateNewBundledCSR() {
meta_->edge_label_name) +
suffix;

auto edges = out_csr_->batch_export(nullptr);
std::unique_ptr<CsrBase> new_out_csr, new_in_csr;
assert(meta_->properties.size() == 1);
new_out_csr = create_csr(meta_->oe_mutable, meta_->oe_strategy,
meta_->properties[0].id());
new_in_csr = create_csr(meta_->ie_mutable, meta_->ie_strategy,
meta_->properties[0].id());
new_out_csr =
create_csr(meta_->oe_mutable, meta_->oe_strategy, property_type);
new_in_csr = create_csr(meta_->ie_mutable, meta_->ie_strategy, property_type);

new_out_csr->open_in_memory(next_oe_csr_path);
new_in_csr->open_in_memory(next_ie_csr_path);
new_out_csr->resize(out_csr_->size());
new_in_csr->resize(in_csr_->size());

batch_put_edges_with_default_edata(
std::get<0>(edges), std::get<1>(edges), meta_->properties[0].id(),
meta_->default_property_values[0], new_out_csr.get());
batch_put_edges_with_default_edata(
std::get<1>(edges), std::get<0>(edges), meta_->properties[0].id(),
meta_->default_property_values[0], new_in_csr.get());
if (remaining_col == nullptr) {
auto edges = out_csr_->batch_export(nullptr);
batch_put_edges_with_default_edata(
std::get<0>(edges), std::get<1>(edges), property_type,
meta_->default_property_values[0], new_out_csr.get());
batch_put_edges_with_default_edata(
std::get<1>(edges), std::get<0>(edges), property_type,
meta_->default_property_values[0], new_in_csr.get());
} else {
auto row_id_col = std::make_shared<ULongColumn>(StorageStrategy::kMem);
auto edges = out_csr_->batch_export(row_id_col);
std::vector<Property> remaining_data;
remaining_data.reserve(row_id_col->size());
for (size_t i = 0; i < row_id_col->size(); ++i) {
auto row_id = row_id_col->get_view(i);
CHECK_LT(row_id, remaining_col->size());
remaining_data.emplace_back(remaining_col->get_prop(row_id));
}
batch_put_edges_to_bundled_csr(std::get<0>(edges), std::get<1>(edges),
property_type, remaining_data,
new_out_csr.get());
batch_put_edges_to_bundled_csr(std::get<1>(edges), std::get<0>(edges),
property_type, remaining_data,
new_in_csr.get());
}

table_->drop();
table_ = std::make_unique<Table>();
table_idx_.store(0);
capacity_.store(0);
out_csr_->close();
in_csr_->close();
out_csr_ = std::move(new_out_csr);
Expand Down Expand Up @@ -1046,7 +1128,9 @@ void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) {
std::shared_ptr<ColumnBase> prev_data_col = nullptr;

if (!delete_property) {
if (table_->col_num() >= 1) {
if (table_->col_num() >= 1 &&
table_->get_column_by_id(0)->type() != DataTypeId::kVarchar &&
table_->get_column_by_id(0)->type() != DataTypeId::kEmpty) {
prev_data_col = table_->get_column_by_id(0);
}
} else {
Expand All @@ -1061,12 +1145,15 @@ void EdgeTable::dropAndCreateNewUnbundledCSR(bool delete_property) {
table_->resize(prev_data_col->size(), meta_->default_property_values);
table_idx_.store(prev_data_col->size());
EnsureCapacity(prev_data_col->size());
} else if (!delete_property) {
table_->resize(std::get<0>(edges).size(), meta_->default_property_values);
table_idx_.store(std::get<0>(edges).size());
EnsureCapacity(std::get<0>(edges).size());
}
std::vector<uint64_t> row_ids;
for (size_t i = 0; i < std::get<0>(edges).size(); ++i) {
row_ids.push_back(i);
}

std::unique_ptr<CsrBase> new_out_csr, new_in_csr;
if (delete_property) {
new_out_csr =
Expand Down
Loading
Loading