Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build." FORCE)
endif ()

if (NOT DEFINED CMAKE_BUILD_PARALLEL_LEVEL)
include(ProcessorCount)
ProcessorCount(N)
if (N EQUAL 0)
set(N 1)
endif ()
set(CMAKE_BUILD_PARALLEL_LEVEL ${N} CACHE STRING "Number of parallel build jobs")
message("CMAKE BUILD PARALLEL LEVEL: ${CMAKE_BUILD_PARALLEL_LEVEL} (auto-detected)")
else ()
message("CMAKE BUILD PARALLEL LEVEL: ${CMAKE_BUILD_PARALLEL_LEVEL} (from environment)")
endif ()

message("CMAKE BUILD TYPE " ${CMAKE_BUILD_TYPE})
if (CMAKE_BUILD_TYPE STREQUAL "Debug")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g")
Expand Down
15 changes: 15 additions & 0 deletions cpp/src/common/allocator/my_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ struct String {
memcpy(buf_, str.buf_, len_);
return common::E_OK;
}

FORCE_INLINE int dup_from(const char *str, uint32_t len,
common::PageArena &pa) {
len_ = len;
if (UNLIKELY(len_ == 0)) {
return common::E_OK;
}
buf_ = pa.alloc(len_);
if (IS_NULL(buf_)) {
return common::E_OOM;
}
memcpy(buf_, str, len_);
return common::E_OK;
}

FORCE_INLINE int build_from(const String &s1, const String &s2,
common::PageArena &pa) {
len_ = s1.len_ + s2.len_;
Expand Down
20 changes: 6 additions & 14 deletions cpp/src/common/record.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ struct DataPoint {
int64_t i64_val_;
float float_val_;
double double_val_;
common::String *str_val_;
} u_;
TextType text_val_;
common::String text_val_;

DataPoint(const std::string &measurement_name, bool b)
: measurement_name_(measurement_name), text_val_() {
Expand All @@ -82,19 +81,12 @@ struct DataPoint {
u_.double_val_ = d;
}

DataPoint(const std::string &measurement_name, common::String &str,
common::PageArena &pa)
DataPoint(const std::string &measurement_name, common::String str)
: measurement_name_(measurement_name), text_val_() {
char *p_buf = (char *)pa.alloc(sizeof(common::String));
u_.str_val_ = new (p_buf) common::String();
u_.str_val_->dup_from(str, pa);
text_val_.buf_ = str.buf_;
text_val_.len_ = str.len_;
}

// DataPoint(const std::string &measurement_name, Text &text),
// : measurement_name_(measurement_name),
// data_type_(common::TEXT),
// text_val_(text) {}

DataPoint(const std::string &measurement_name)
: isnull(true), measurement_name_(measurement_name) {}
void set_i32(int32_t i32) {
Expand Down Expand Up @@ -126,7 +118,7 @@ struct TsRecord {
}

TsRecord(const std::string &device_name, const int64_t &timestamp)
: device_id_(device_name), timestamp_(timestamp) {
: timestamp_(timestamp), device_id_(device_name) {
pa.init(512, common::MOD_TSFILE_READER);
}

Expand All @@ -150,7 +142,7 @@ template <>
inline int TsRecord::add_point(const std::string &measurement_name,
common::String val) {
int ret = common::E_OK;
points_.emplace_back(DataPoint(measurement_name, val, pa));
points_.emplace_back(DataPoint(measurement_name, val));
return ret;
}

Expand Down
1 change: 1 addition & 0 deletions cpp/src/common/row_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ struct Field {
switch (type_) {
case common::TSDataType::BOOLEAN:
return value_.bval_;
case common::TSDataType::DATE:
case common::TSDataType::INT32:
return value_.ival_;
case common::TSDataType::TIMESTAMP:
Expand Down
60 changes: 20 additions & 40 deletions cpp/src/cwrapper/tsfile_cwrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,18 @@ TsRecord _ts_record_new(const char* device_id, Timestamp timestamp,
return common::E_OK; \
}

ERRNO _insert_data_into_ts_record_by_name_string_with_len(
TsRecord data, const char* measurement_name, const char* value,
const uint32_t value_len) {
auto* record = (storage::TsRecord*)data;
if (record->points_.size() + 1 > record->points_.capacity())
return common::E_BUF_NOT_ENOUGH;
common::String str_value;
str_value.dup_from(value, value_len, record->pa);
record->add_point(measurement_name, str_value);
return common::E_OK;
}

INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(int32_t);
INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(int64_t);
INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(bool);
Expand Down Expand Up @@ -344,7 +356,7 @@ ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns,
}

bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) {
auto* r = static_cast<storage::TableResultSet*>(result_set);
auto* r = static_cast<storage::ResultSet*>(result_set);
bool has_next = true;
int ret = common::E_OK;
ret = r->next(has_next);
Expand All @@ -358,7 +370,7 @@ bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) {
#define TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(type) \
type tsfile_result_set_get_value_by_name_##type(ResultSet result_set, \
const char* column_name) { \
auto* r = static_cast<storage::TableResultSet*>(result_set); \
auto* r = static_cast<storage::ResultSet*>(result_set); \
std::string column_name_(column_name); \
return r->get_value<type>(column_name_); \
}
Expand All @@ -370,7 +382,7 @@ TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(float);
TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(double);
char* tsfile_result_set_get_value_by_name_string(ResultSet result_set,
const char* column_name) {
auto* r = static_cast<storage::TableResultSet*>(result_set);
auto* r = static_cast<storage::ResultSet*>(result_set);
std::string column_name_(column_name);
common::String* ret = r->get_value<common::String*>(column_name_);
// Caller should free return's char* 's space.
Expand All @@ -385,7 +397,7 @@ char* tsfile_result_set_get_value_by_name_string(ResultSet result_set,
#define TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(type) \
type tsfile_result_set_get_value_by_index_##type(ResultSet result_set, \
uint32_t column_index) { \
auto* r = static_cast<storage::TableResultSet*>(result_set); \
auto* r = static_cast<storage::ResultSet*>(result_set); \
return r->get_value<type>(column_index); \
}

Expand All @@ -397,7 +409,7 @@ TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(bool);

char* tsfile_result_set_get_value_by_index_string(ResultSet result_set,
uint32_t column_index) {
auto* r = static_cast<storage::TableResultSet*>(result_set);
auto* r = static_cast<storage::ResultSet*>(result_set);
common::String* ret = r->get_value<common::String*>(column_index);
// Caller should free return's char* 's space.
char* dup = (char*)malloc(ret->len_ + 1);
Expand All @@ -410,18 +422,18 @@ char* tsfile_result_set_get_value_by_index_string(ResultSet result_set,

bool tsfile_result_set_is_null_by_name(ResultSet result_set,
const char* column_name) {
auto* r = static_cast<storage::TableResultSet*>(result_set);
auto* r = static_cast<storage::ResultSet*>(result_set);
return r->is_null(column_name);
}

bool tsfile_result_set_is_null_by_index(const ResultSet result_set,
const uint32_t column_index) {
auto* r = static_cast<storage::TableResultSet*>(result_set);
auto* r = static_cast<storage::ResultSet*>(result_set);
return r->is_null(column_index);
}

ResultSetMetaData tsfile_result_set_get_metadata(ResultSet result_set) {
auto* r = static_cast<storage::TableResultSet*>(result_set);
auto* r = static_cast<storage::ResultSet*>(result_set);
if (result_set == NULL) {
return ResultSetMetaData();
}
Expand Down Expand Up @@ -463,38 +475,6 @@ int tsfile_result_set_metadata_get_column_num(ResultSetMetaData result_set) {
return result_set.column_num;
}

// TableSchema tsfile_reader_get_table_schema(TsFileReader reader,
// const char *table_name) {
// // TODO: Implement get table schema with tsfile reader.
// return TableSchema();
// }
//
// DeviceSchema tsfile_reader_get_device_schema(TsFileReader reader,
// const char *device_id) {
// auto *r = static_cast<storage::TsFileReader *>(reader);
// std::vector<storage::MeasurementSchema> measurement_schemas;
// r->get_timeseries_schema(
// std::make_shared<storage::StringArrayDeviceID>(device_id),
// measurement_schemas);
// DeviceSchema schema;
// schema.device_name = strdup(device_id);
// schema.timeseries_num = measurement_schemas.size();
// schema.timeseries_schema = static_cast<TimeseriesSchema *>(
// malloc(sizeof(TimeseriesSchema) * schema.timeseries_num));
// for (int i = 0; i < schema.timeseries_num; i++) {
// schema.timeseries_schema[i].timeseries_name =
// strdup(measurement_schemas[i].measurement_name_.c_str());
// schema.timeseries_schema[i].data_type =
// static_cast<TSDataType>(measurement_schemas[i].data_type_);
// schema.timeseries_schema[i].compression =
// static_cast<CompressionType>(
// measurement_schemas[i].compression_type_);
// schema.timeseries_schema[i].encoding =
// static_cast<TSEncoding>(measurement_schemas[i].encoding_);
// }
// return schema;
// }

TableSchema tsfile_reader_get_table_schema(TsFileReader reader,
const char* table_name) {
auto* r = static_cast<storage::TsFileReader*>(reader);
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/cwrapper/tsfile_cwrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ typedef enum {
TS_DATATYPE_DOUBLE = 4,
TS_DATATYPE_TEXT = 5,
TS_DATATYPE_VECTOR = 6,
TS_DATATYPE_DATE = 9,
TS_DATATYPE_BLOB = 10,
TS_DATATYPE_STRING = 11,
TS_DATATYPE_NULL_TYPE = 254,
TS_DATATYPE_INVALID = 255
Expand Down Expand Up @@ -637,6 +639,10 @@ INSERT_DATA_INTO_TS_RECORD_BY_NAME(bool);
INSERT_DATA_INTO_TS_RECORD_BY_NAME(float);
INSERT_DATA_INTO_TS_RECORD_BY_NAME(double);

ERRNO _insert_data_into_ts_record_by_name_string_with_len(
TsRecord data, const char* measurement_name, const char* value,
const uint32_t value_len);

// Write a tablet into a device.
ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet);

Expand Down
43 changes: 22 additions & 21 deletions cpp/src/reader/qds_with_timegenerator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ void SeriesScanStream::pop_front(int64_t beyond_this_time) {
int64_t SeriesScanStream::read_timestamp() {
uint32_t ret_len = 0;
bool is_null = false;
char *data = col_iter_->read(&ret_len, &is_null);
char* data = col_iter_->read(&ret_len, &is_null);
ASSERT(ret_len == 8);
return *(int64_t *)data;
return *(int64_t*)data;
}

// get value object pointer at time @target_timestamp
// if no such TV exists, return nullptr
void *ValueAt::at(int64_t target_timestamp) {
void* ValueAt::at(int64_t target_timestamp) {
ASSERT(ssi_ != nullptr);
if (cur_time_ > target_timestamp) {
return nullptr;
Expand All @@ -120,10 +120,10 @@ void *ValueAt::at(int64_t target_timestamp) {
uint32_t ret_len = 0;
while (true) {
while (!time_col_iter_->end()) {
char *iter_time_ptr = time_col_iter_->read(&ret_len);
cur_time_ = *(int64_t *)iter_time_ptr;
char* iter_time_ptr = time_col_iter_->read(&ret_len);
cur_time_ = *(int64_t*)iter_time_ptr;
if (cur_time_ == target_timestamp) {
char *val_obj_ptr = value_col_iter_->read(&ret_len);
char* val_obj_ptr = value_col_iter_->read(&ret_len);
time_col_iter_->next();
value_col_iter_->next();
return val_obj_ptr;
Expand Down Expand Up @@ -174,7 +174,7 @@ void ValueAt::destroy() {
#ifdef DEBUG_SE
int depth = 0;
struct DG {
explicit DG(int &depth) : depth_(depth) { depth_++; }
explicit DG(int& depth) : depth_(depth) { depth_++; }
~DG() { depth_--; }
std::string get_indent() {
std::string s;
Expand All @@ -183,7 +183,7 @@ struct DG {
}
return s;
}
int &depth_;
int& depth_;
};
#endif

Expand Down Expand Up @@ -283,7 +283,7 @@ void Node::next_timestamp(int64_t beyond_this_time) {
}
}

int QDSWithTimeGenerator::init(TsFileIOReader *io_reader, QueryExpression *qe) {
int QDSWithTimeGenerator::init(TsFileIOReader* io_reader, QueryExpression* qe) {
pa_.reset();
pa_.init(512, common::MOD_TSFILE_READER);
int ret = common::E_OK; // cppcheck-suppress unreadVariable
Expand All @@ -294,7 +294,7 @@ int QDSWithTimeGenerator::init(TsFileIOReader *io_reader, QueryExpression *qe) {
std::vector<common::TSDataType> data_types;
column_names.reserve(paths.size());
data_types.reserve(paths.size());
for (const auto &path : paths) {
for (const auto& path : paths) {
column_names.push_back(path.full_path_);
}
index_lookup_.insert({"time", 0});
Expand All @@ -318,7 +318,7 @@ int QDSWithTimeGenerator::init(TsFileIOReader *io_reader, QueryExpression *qe) {
return ret;
}

void destroy_node(Node *node) {
void destroy_node(Node* node) {
if (node->left_) {
destroy_node(node->left_);
}
Expand Down Expand Up @@ -348,7 +348,7 @@ void QDSWithTimeGenerator::close() {
pa_.destroy();
}

int QDSWithTimeGenerator::next(bool &has_next) {
int QDSWithTimeGenerator::next(bool& has_next) {
if (tree_ == nullptr) {
has_next = false;
return E_OK;
Expand All @@ -367,8 +367,8 @@ int QDSWithTimeGenerator::next(bool &has_next) {
#endif

for (size_t i = 0; i < value_at_vec_.size(); i++) {
ValueAt &va = value_at_vec_[i];
void *val_obj_ptr = va.at(timestamp);
ValueAt& va = value_at_vec_[i];
void* val_obj_ptr = va.at(timestamp);
row_record_->get_field(i + 1)->set_value(va.data_type_, val_obj_ptr,
get_len(va.data_type_), pa_);
}
Expand All @@ -381,26 +381,27 @@ int QDSWithTimeGenerator::next(bool &has_next) {
return E_OK;
}

bool QDSWithTimeGenerator::is_null(const std::string &column_name) {
bool QDSWithTimeGenerator::is_null(const std::string& column_name) {
auto iter = index_lookup_.find(column_name);
if (iter == index_lookup_.end()) {
return true;
} else {
return is_null(iter->second);
return is_null(iter->second + 1);
}
}

bool QDSWithTimeGenerator::is_null(uint32_t column_index) {
return row_record_->get_field(column_index) == nullptr;
return row_record_->get_field(column_index - 1) == nullptr ||
row_record_->get_field(column_index - 1)->type_ == NULL_TYPE;
}

RowRecord *QDSWithTimeGenerator::get_row_record() { return row_record_; }
RowRecord* QDSWithTimeGenerator::get_row_record() { return row_record_; }

std::shared_ptr<ResultSetMetadata> QDSWithTimeGenerator::get_metadata() {
return result_set_metadata_;
}

int QDSWithTimeGenerator::construct_node_tree(Expression *expr, Node *&node) {
int QDSWithTimeGenerator::construct_node_tree(Expression* expr, Node*& node) {
int ret = E_OK;
if (expr->type_ == AND_EXPR || expr->type_ == OR_EXPR) {
if (expr->type_ == AND_EXPR) {
Expand All @@ -412,8 +413,8 @@ int QDSWithTimeGenerator::construct_node_tree(Expression *expr, Node *&node) {
} else if (RET_FAIL(construct_node_tree(expr->right_, node->right_))) {
}
} else if (expr->type_ == SERIES_EXPR) {
Node *leaf = new Node(LEAF_NODE);
Path &path = expr->series_path_;
Node* leaf = new Node(LEAF_NODE);
Path& path = expr->series_path_;
int ret = io_reader_->alloc_ssi(path.device_id_, path.measurement_,
leaf->sss_.ssi_, pa_, expr->filter_);
if (E_OK == ret) {
Expand Down
Loading
Loading