diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 51f69e83c..76df92f33 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -55,6 +55,18 @@ if (NOT CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build." FORCE) endif () +if (NOT DEFINED CMAKE_BUILD_PARALLEL_LEVEL) + include(ProcessorCount) + ProcessorCount(N) + if (N EQUAL 0) + set(N 1) + endif () + set(CMAKE_BUILD_PARALLEL_LEVEL ${N} CACHE STRING "Number of parallel build jobs") + message("CMAKE BUILD PARALLEL LEVEL: ${CMAKE_BUILD_PARALLEL_LEVEL} (auto-detected)") +else () + message("CMAKE BUILD PARALLEL LEVEL: ${CMAKE_BUILD_PARALLEL_LEVEL} (from environment)") +endif () + message("CMAKE BUILD TYPE " ${CMAKE_BUILD_TYPE}) if (CMAKE_BUILD_TYPE STREQUAL "Debug") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g") diff --git a/cpp/src/common/allocator/my_string.h b/cpp/src/common/allocator/my_string.h index ef27f2d3c..f3ec60a3f 100644 --- a/cpp/src/common/allocator/my_string.h +++ b/cpp/src/common/allocator/my_string.h @@ -78,6 +78,21 @@ struct String { memcpy(buf_, str.buf_, len_); return common::E_OK; } + + FORCE_INLINE int dup_from(const char *str, uint32_t len, + common::PageArena &pa) { + len_ = len; + if (UNLIKELY(len_ == 0)) { + return common::E_OK; + } + buf_ = pa.alloc(len_); + if (IS_NULL(buf_)) { + return common::E_OOM; + } + memcpy(buf_, str, len_); + return common::E_OK; + } + FORCE_INLINE int build_from(const String &s1, const String &s2, common::PageArena &pa) { len_ = s1.len_ + s2.len_; diff --git a/cpp/src/common/record.h b/cpp/src/common/record.h index 8c729f688..a8449c658 100644 --- a/cpp/src/common/record.h +++ b/cpp/src/common/record.h @@ -53,9 +53,8 @@ struct DataPoint { int64_t i64_val_; float float_val_; double double_val_; - common::String *str_val_; } u_; - TextType text_val_; + common::String text_val_; DataPoint(const std::string &measurement_name, bool b) : measurement_name_(measurement_name), text_val_() { @@ -82,19 +81,12 @@ struct DataPoint { u_.double_val_ = d; } - DataPoint(const std::string &measurement_name, common::String &str, - common::PageArena &pa) + DataPoint(const std::string &measurement_name, common::String str) : measurement_name_(measurement_name), text_val_() { - char *p_buf = (char *)pa.alloc(sizeof(common::String)); - u_.str_val_ = new (p_buf) common::String(); - u_.str_val_->dup_from(str, pa); + text_val_.buf_ = str.buf_; + text_val_.len_ = str.len_; } - // DataPoint(const std::string &measurement_name, Text &text), - // : measurement_name_(measurement_name), - // data_type_(common::TEXT), - // text_val_(text) {} - DataPoint(const std::string &measurement_name) : isnull(true), measurement_name_(measurement_name) {} void set_i32(int32_t i32) { @@ -126,7 +118,7 @@ struct TsRecord { } TsRecord(const std::string &device_name, const int64_t ×tamp) - : device_id_(device_name), timestamp_(timestamp) { + : timestamp_(timestamp), device_id_(device_name) { pa.init(512, common::MOD_TSFILE_READER); } @@ -150,7 +142,7 @@ template <> inline int TsRecord::add_point(const std::string &measurement_name, common::String val) { int ret = common::E_OK; - points_.emplace_back(DataPoint(measurement_name, val, pa)); + points_.emplace_back(DataPoint(measurement_name, val)); return ret; } diff --git a/cpp/src/common/row_record.h b/cpp/src/common/row_record.h index 5ff5e232f..713cabc83 100644 --- a/cpp/src/common/row_record.h +++ b/cpp/src/common/row_record.h @@ -103,6 +103,7 @@ struct Field { switch (type_) { case common::TSDataType::BOOLEAN: return value_.bval_; + case common::TSDataType::DATE: case common::TSDataType::INT32: return value_.ival_; case common::TSDataType::TIMESTAMP: diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 8a8f462b8..4c38daa96 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -280,6 +280,18 @@ TsRecord _ts_record_new(const char* device_id, Timestamp timestamp, return common::E_OK; \ } +ERRNO _insert_data_into_ts_record_by_name_string_with_len( + TsRecord data, const char* measurement_name, const char* value, + const uint32_t value_len) { + auto* record = (storage::TsRecord*)data; + if (record->points_.size() + 1 > record->points_.capacity()) + return common::E_BUF_NOT_ENOUGH; + common::String str_value; + str_value.dup_from(value, value_len, record->pa); + record->add_point(measurement_name, str_value); + return common::E_OK; +} + INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(int32_t); INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(int64_t); INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(bool); @@ -344,7 +356,7 @@ ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, } bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) { - auto* r = static_cast(result_set); + auto* r = static_cast(result_set); bool has_next = true; int ret = common::E_OK; ret = r->next(has_next); @@ -358,7 +370,7 @@ bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) { #define TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(type) \ type tsfile_result_set_get_value_by_name_##type(ResultSet result_set, \ const char* column_name) { \ - auto* r = static_cast(result_set); \ + auto* r = static_cast(result_set); \ std::string column_name_(column_name); \ return r->get_value(column_name_); \ } @@ -370,7 +382,7 @@ TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(float); TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(double); char* tsfile_result_set_get_value_by_name_string(ResultSet result_set, const char* column_name) { - auto* r = static_cast(result_set); + auto* r = static_cast(result_set); std::string column_name_(column_name); common::String* ret = r->get_value(column_name_); // Caller should free return's char* 's space. @@ -385,7 +397,7 @@ char* tsfile_result_set_get_value_by_name_string(ResultSet result_set, #define TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(type) \ type tsfile_result_set_get_value_by_index_##type(ResultSet result_set, \ uint32_t column_index) { \ - auto* r = static_cast(result_set); \ + auto* r = static_cast(result_set); \ return r->get_value(column_index); \ } @@ -397,7 +409,7 @@ TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(bool); char* tsfile_result_set_get_value_by_index_string(ResultSet result_set, uint32_t column_index) { - auto* r = static_cast(result_set); + auto* r = static_cast(result_set); common::String* ret = r->get_value(column_index); // Caller should free return's char* 's space. char* dup = (char*)malloc(ret->len_ + 1); @@ -410,18 +422,18 @@ char* tsfile_result_set_get_value_by_index_string(ResultSet result_set, bool tsfile_result_set_is_null_by_name(ResultSet result_set, const char* column_name) { - auto* r = static_cast(result_set); + auto* r = static_cast(result_set); return r->is_null(column_name); } bool tsfile_result_set_is_null_by_index(const ResultSet result_set, const uint32_t column_index) { - auto* r = static_cast(result_set); + auto* r = static_cast(result_set); return r->is_null(column_index); } ResultSetMetaData tsfile_result_set_get_metadata(ResultSet result_set) { - auto* r = static_cast(result_set); + auto* r = static_cast(result_set); if (result_set == NULL) { return ResultSetMetaData(); } @@ -463,38 +475,6 @@ int tsfile_result_set_metadata_get_column_num(ResultSetMetaData result_set) { return result_set.column_num; } -// TableSchema tsfile_reader_get_table_schema(TsFileReader reader, -// const char *table_name) { -// // TODO: Implement get table schema with tsfile reader. -// return TableSchema(); -// } -// -// DeviceSchema tsfile_reader_get_device_schema(TsFileReader reader, -// const char *device_id) { -// auto *r = static_cast(reader); -// std::vector measurement_schemas; -// r->get_timeseries_schema( -// std::make_shared(device_id), -// measurement_schemas); -// DeviceSchema schema; -// schema.device_name = strdup(device_id); -// schema.timeseries_num = measurement_schemas.size(); -// schema.timeseries_schema = static_cast( -// malloc(sizeof(TimeseriesSchema) * schema.timeseries_num)); -// for (int i = 0; i < schema.timeseries_num; i++) { -// schema.timeseries_schema[i].timeseries_name = -// strdup(measurement_schemas[i].measurement_name_.c_str()); -// schema.timeseries_schema[i].data_type = -// static_cast(measurement_schemas[i].data_type_); -// schema.timeseries_schema[i].compression = -// static_cast( -// measurement_schemas[i].compression_type_); -// schema.timeseries_schema[i].encoding = -// static_cast(measurement_schemas[i].encoding_); -// } -// return schema; -// } - TableSchema tsfile_reader_get_table_schema(TsFileReader reader, const char* table_name) { auto* r = static_cast(reader); diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 32f85aa4f..093b413ed 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -35,6 +35,8 @@ typedef enum { TS_DATATYPE_DOUBLE = 4, TS_DATATYPE_TEXT = 5, TS_DATATYPE_VECTOR = 6, + TS_DATATYPE_DATE = 9, + TS_DATATYPE_BLOB = 10, TS_DATATYPE_STRING = 11, TS_DATATYPE_NULL_TYPE = 254, TS_DATATYPE_INVALID = 255 @@ -637,6 +639,10 @@ INSERT_DATA_INTO_TS_RECORD_BY_NAME(bool); INSERT_DATA_INTO_TS_RECORD_BY_NAME(float); INSERT_DATA_INTO_TS_RECORD_BY_NAME(double); +ERRNO _insert_data_into_ts_record_by_name_string_with_len( + TsRecord data, const char* measurement_name, const char* value, + const uint32_t value_len); + // Write a tablet into a device. ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet); diff --git a/cpp/src/reader/qds_with_timegenerator.cc b/cpp/src/reader/qds_with_timegenerator.cc index 61dd974b3..3df3defbc 100644 --- a/cpp/src/reader/qds_with_timegenerator.cc +++ b/cpp/src/reader/qds_with_timegenerator.cc @@ -92,14 +92,14 @@ void SeriesScanStream::pop_front(int64_t beyond_this_time) { int64_t SeriesScanStream::read_timestamp() { uint32_t ret_len = 0; bool is_null = false; - char *data = col_iter_->read(&ret_len, &is_null); + char* data = col_iter_->read(&ret_len, &is_null); ASSERT(ret_len == 8); - return *(int64_t *)data; + return *(int64_t*)data; } // get value object pointer at time @target_timestamp // if no such TV exists, return nullptr -void *ValueAt::at(int64_t target_timestamp) { +void* ValueAt::at(int64_t target_timestamp) { ASSERT(ssi_ != nullptr); if (cur_time_ > target_timestamp) { return nullptr; @@ -120,10 +120,10 @@ void *ValueAt::at(int64_t target_timestamp) { uint32_t ret_len = 0; while (true) { while (!time_col_iter_->end()) { - char *iter_time_ptr = time_col_iter_->read(&ret_len); - cur_time_ = *(int64_t *)iter_time_ptr; + char* iter_time_ptr = time_col_iter_->read(&ret_len); + cur_time_ = *(int64_t*)iter_time_ptr; if (cur_time_ == target_timestamp) { - char *val_obj_ptr = value_col_iter_->read(&ret_len); + char* val_obj_ptr = value_col_iter_->read(&ret_len); time_col_iter_->next(); value_col_iter_->next(); return val_obj_ptr; @@ -174,7 +174,7 @@ void ValueAt::destroy() { #ifdef DEBUG_SE int depth = 0; struct DG { - explicit DG(int &depth) : depth_(depth) { depth_++; } + explicit DG(int& depth) : depth_(depth) { depth_++; } ~DG() { depth_--; } std::string get_indent() { std::string s; @@ -183,7 +183,7 @@ struct DG { } return s; } - int &depth_; + int& depth_; }; #endif @@ -283,7 +283,7 @@ void Node::next_timestamp(int64_t beyond_this_time) { } } -int QDSWithTimeGenerator::init(TsFileIOReader *io_reader, QueryExpression *qe) { +int QDSWithTimeGenerator::init(TsFileIOReader* io_reader, QueryExpression* qe) { pa_.reset(); pa_.init(512, common::MOD_TSFILE_READER); int ret = common::E_OK; // cppcheck-suppress unreadVariable @@ -294,7 +294,7 @@ int QDSWithTimeGenerator::init(TsFileIOReader *io_reader, QueryExpression *qe) { std::vector data_types; column_names.reserve(paths.size()); data_types.reserve(paths.size()); - for (const auto &path : paths) { + for (const auto& path : paths) { column_names.push_back(path.full_path_); } index_lookup_.insert({"time", 0}); @@ -318,7 +318,7 @@ int QDSWithTimeGenerator::init(TsFileIOReader *io_reader, QueryExpression *qe) { return ret; } -void destroy_node(Node *node) { +void destroy_node(Node* node) { if (node->left_) { destroy_node(node->left_); } @@ -348,7 +348,7 @@ void QDSWithTimeGenerator::close() { pa_.destroy(); } -int QDSWithTimeGenerator::next(bool &has_next) { +int QDSWithTimeGenerator::next(bool& has_next) { if (tree_ == nullptr) { has_next = false; return E_OK; @@ -367,8 +367,8 @@ int QDSWithTimeGenerator::next(bool &has_next) { #endif for (size_t i = 0; i < value_at_vec_.size(); i++) { - ValueAt &va = value_at_vec_[i]; - void *val_obj_ptr = va.at(timestamp); + ValueAt& va = value_at_vec_[i]; + void* val_obj_ptr = va.at(timestamp); row_record_->get_field(i + 1)->set_value(va.data_type_, val_obj_ptr, get_len(va.data_type_), pa_); } @@ -381,26 +381,27 @@ int QDSWithTimeGenerator::next(bool &has_next) { return E_OK; } -bool QDSWithTimeGenerator::is_null(const std::string &column_name) { +bool QDSWithTimeGenerator::is_null(const std::string& column_name) { auto iter = index_lookup_.find(column_name); if (iter == index_lookup_.end()) { return true; } else { - return is_null(iter->second); + return is_null(iter->second + 1); } } bool QDSWithTimeGenerator::is_null(uint32_t column_index) { - return row_record_->get_field(column_index) == nullptr; + return row_record_->get_field(column_index - 1) == nullptr || + row_record_->get_field(column_index - 1)->type_ == NULL_TYPE; } -RowRecord *QDSWithTimeGenerator::get_row_record() { return row_record_; } +RowRecord* QDSWithTimeGenerator::get_row_record() { return row_record_; } std::shared_ptr QDSWithTimeGenerator::get_metadata() { return result_set_metadata_; } -int QDSWithTimeGenerator::construct_node_tree(Expression *expr, Node *&node) { +int QDSWithTimeGenerator::construct_node_tree(Expression* expr, Node*& node) { int ret = E_OK; if (expr->type_ == AND_EXPR || expr->type_ == OR_EXPR) { if (expr->type_ == AND_EXPR) { @@ -412,8 +413,8 @@ int QDSWithTimeGenerator::construct_node_tree(Expression *expr, Node *&node) { } else if (RET_FAIL(construct_node_tree(expr->right_, node->right_))) { } } else if (expr->type_ == SERIES_EXPR) { - Node *leaf = new Node(LEAF_NODE); - Path &path = expr->series_path_; + Node* leaf = new Node(LEAF_NODE); + Path& path = expr->series_path_; int ret = io_reader_->alloc_ssi(path.device_id_, path.measurement_, leaf->sss_.ssi_, pa_, expr->filter_); if (E_OK == ret) { diff --git a/cpp/src/reader/qds_without_timegenerator.cc b/cpp/src/reader/qds_without_timegenerator.cc index fb2ef6c6b..90c782131 100644 --- a/cpp/src/reader/qds_without_timegenerator.cc +++ b/cpp/src/reader/qds_without_timegenerator.cc @@ -25,8 +25,8 @@ using namespace common; namespace storage { -int QDSWithoutTimeGenerator::init(TsFileIOReader *io_reader, - QueryExpression *qe) { +int QDSWithoutTimeGenerator::init(TsFileIOReader* io_reader, + QueryExpression* qe) { int ret = E_OK; // cppcheck-suppress unreadVariable pa_.reset(); pa_.init(512, common::MOD_TSFILE_READER); @@ -40,14 +40,14 @@ int QDSWithoutTimeGenerator::init(TsFileIOReader *io_reader, std::vector data_types; column_names.reserve(origin_path_count); data_types.reserve(origin_path_count); - Expression *global_time_expression = qe->expression_; - Filter *global_time_filter = nullptr; + Expression* global_time_expression = qe->expression_; + Filter* global_time_filter = nullptr; if (global_time_expression != nullptr) { global_time_filter = global_time_expression->filter_; } index_lookup_.insert({"time", 0}); for (size_t i = 0; i < origin_path_count; i++) { - TsFileSeriesScanIterator *ssi = nullptr; + TsFileSeriesScanIterator* ssi = nullptr; ret = io_reader_->alloc_ssi(paths[i].device_id_, paths[i].measurement_, ssi, pa_, global_time_filter); if (ret != 0) { @@ -99,7 +99,7 @@ void QDSWithoutTimeGenerator::close() { ssi_vec_[i]->revert_tsblock(); } for (size_t i = 0; i < ssi_vec_.size(); i++) { - TsFileSeriesScanIterator *ssi = ssi_vec_[i]; + TsFileSeriesScanIterator* ssi = ssi_vec_[i]; io_reader_->revert_ssi(ssi); } ssi_vec_.clear(); @@ -110,7 +110,7 @@ void QDSWithoutTimeGenerator::close() { pa_.destroy(); } -int QDSWithoutTimeGenerator::next(bool &has_next) { +int QDSWithoutTimeGenerator::next(bool& has_next) { row_record_->reset(); if (heap_time_.size() == 0) { has_next = false; @@ -125,12 +125,12 @@ int QDSWithoutTimeGenerator::next(bool &has_next) { for (uint32_t i = 0; i < count; ++i) { uint32_t len = 0; auto val_datatype = value_iters_[iter->second]->get_data_type(); - void *val_ptr = value_iters_[iter->second]->read(&len); + void* val_ptr = value_iters_[iter->second]->read(&len); row_record_->get_field(iter->second + 1) ->set_value(val_datatype, val_ptr, len, pa_); value_iters_[iter->second]->next(); if (!time_iters_[iter->second]->end()) { - int64_t timev = *(int64_t *)(time_iters_[iter->second]->read(&len)); + int64_t timev = *(int64_t*)(time_iters_[iter->second]->read(&len)); heap_time_.insert(std::make_pair(timev, iter->second)); time_iters_[iter->second]->next(); } else { @@ -144,20 +144,21 @@ int QDSWithoutTimeGenerator::next(bool &has_next) { return E_OK; } -bool QDSWithoutTimeGenerator::is_null(const std::string &column_name) { +bool QDSWithoutTimeGenerator::is_null(const std::string& column_name) { auto iter = index_lookup_.find(column_name); if (iter == index_lookup_.end()) { return true; } else { - return is_null(iter->second); + return is_null(iter->second + 1); } } bool QDSWithoutTimeGenerator::is_null(uint32_t column_index) { - return row_record_->get_field(column_index) == nullptr; + return row_record_->get_field(column_index - 1) == nullptr || + row_record_->get_field(column_index - 1)->type_ == NULL_TYPE; } -RowRecord *QDSWithoutTimeGenerator::get_row_record() { return row_record_; } +RowRecord* QDSWithoutTimeGenerator::get_row_record() { return row_record_; } std::shared_ptr QDSWithoutTimeGenerator::get_metadata() { return result_set_metadata_; @@ -176,7 +177,7 @@ int QDSWithoutTimeGenerator::get_next_tsblock(uint32_t index, bool alloc_mem) { if (IS_SUCC(ret)) { time_iters_[index] = new ColIterator(0, tsblocks_[index]); uint32_t len = 0; - int64_t time = *(int64_t *)(time_iters_[index]->read(&len)); + int64_t time = *(int64_t*)(time_iters_[index]->read(&len)); time_iters_[index]->next(); heap_time_.insert(std::pair(time, index)); value_iters_[index] = new ColIterator(1, tsblocks_[index]); diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 3ae5cf28a..20c747f92 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -657,7 +657,7 @@ int TsFileWriter::write_point(ChunkWriter *chunk_writer, int64_t timestamp, case common::BLOB: case common::TEXT: case common::STRING: - return chunk_writer->write(timestamp, *point.u_.str_val_); + return chunk_writer->write(timestamp, point.text_val_); default: return E_INVALID_DATA_POINT; } @@ -689,7 +689,7 @@ int TsFileWriter::write_point_aligned(ValueChunkWriter *value_chunk_writer, case common::BLOB: case common::TEXT: case common::STRING: - return value_chunk_writer->write(timestamp, point.u_.str_val_, + return value_chunk_writer->write(timestamp, point.text_val_, isnull); default: return E_INVALID_DATA_POINT; diff --git a/cpp/test/cwrapper/cwrapper_test.cc b/cpp/test/cwrapper/cwrapper_test.cc index 90a93fb42..8c1fd82a7 100644 --- a/cpp/test/cwrapper/cwrapper_test.cc +++ b/cpp/test/cwrapper/cwrapper_test.cc @@ -19,6 +19,17 @@ #include #include #include + +#include "common/row_record.h" +#include "cwrapper/tsfile_cwrapper.h" +#include "reader/result_set.h" +#include "reader/tsfile_reader.h" +#include "writer/tsfile_writer.h" + +namespace storage { +class TsFileReader; +} + extern "C" { #include "cwrapper/errno_define_c.h" #include "cwrapper/tsfile_cwrapper.h" @@ -28,23 +39,138 @@ extern "C" { #include "utils/errno_define.h" namespace cwrapper { -class CWrapperTest : public testing::Test {}; - -// TEST_F(CWrapperTest, RegisterTimeSeries) { -// ERRNO code = 0; -// char* temperature = strdup("temperature"); -// TimeseriesSchema ts_schema{temperature, TS_DATATYPE_INT32, -// TS_ENCODING_PLAIN, -// TS_COMPRESSION_UNCOMPRESSED}; -// remove("cwrapper_register_timeseries.tsfile"); -// TsFileWriter writer = -// tsfile_writer_new("cwrapper_register_timeseries.tsfile", &code); -// ASSERT_EQ(code, 0); -// code = tsfile_writer_register_timeseries(writer, "device1", &ts_schema); -// ASSERT_EQ(code, 0); -// free(temperature); -// tsfile_writer_close(writer); -// } +class CWrapperTest : public testing::Test { + public: + static void ASSERT_OK(ERRNO code, const char* msg = "") { + ASSERT_EQ(code, RET_OK) << msg; + } +}; + +TEST_F(CWrapperTest, TestForPythonInterfaceInsert) { + ERRNO code = 0; + const char* filename = "cwrapper_for_python.tsfile"; + remove(filename); // Clean up any existing file + + // Device and measurement definitions + char* device_id = strdup("root.device1"); + char* str_measurement_id = strdup("str_measurement"); + char* text_measurement_id = strdup("text_measurement"); + char* date_measurement_id = strdup("date_measurement"); + + // Define time series schemas for different data types + timeseries_schema str_measurement; + str_measurement.timeseries_name = str_measurement_id; + str_measurement.compression = TS_COMPRESSION_UNCOMPRESSED; + str_measurement.data_type = TS_DATATYPE_STRING; + str_measurement.encoding = TS_ENCODING_PLAIN; + + timeseries_schema text_measurement; + text_measurement.timeseries_name = text_measurement_id; + text_measurement.compression = TS_COMPRESSION_UNCOMPRESSED; + text_measurement.data_type = TS_DATATYPE_TEXT; + text_measurement.encoding = TS_ENCODING_PLAIN; + + timeseries_schema date_measurement; + date_measurement.timeseries_name = date_measurement_id; + date_measurement.compression = TS_COMPRESSION_UNCOMPRESSED; + date_measurement.data_type = TS_DATATYPE_DATE; + date_measurement.encoding = TS_ENCODING_PLAIN; + + // Create TsFile writer + auto* writer = (storage::TsFileWriter*)_tsfile_writer_new( + filename, 128 * 1024 * 1024, &code); + ASSERT_OK(code, "create writer failed"); + + // Register time series with the writer + ASSERT_OK( + _tsfile_writer_register_timeseries(writer, device_id, &str_measurement), + "register timeseries failed"); + + ASSERT_OK(_tsfile_writer_register_timeseries(writer, device_id, + &text_measurement), + "register timeseries failed"); + + ASSERT_OK(_tsfile_writer_register_timeseries(writer, device_id, + &date_measurement), + "register timeseries failed"); + + // Create a new time series record + auto* record = (storage::TsRecord*)_ts_record_new(device_id, 0, 3); + + // Insert string data + const char* test_str = "test_string"; + ASSERT_OK(_insert_data_into_ts_record_by_name_string_with_len( + record, str_measurement_id, test_str, strlen(test_str)), + "insert data failed"); + + // Insert text data + const char* test_text = "test_text"; + ASSERT_OK(_insert_data_into_ts_record_by_name_string_with_len( + record, text_measurement_id, test_text, strlen(test_text)), + "insert data failed"); + + // Insert date data - NOTE: There's a bug here, should use + // date_measurement_id + int32_t test_date = 20251118; + ASSERT_OK(_insert_data_into_ts_record_by_name_int32_t( + record, date_measurement_id, test_date), + "insert data failed"); + + // Write the record to file and close writer + ASSERT_OK(_tsfile_writer_write_ts_record(writer, record), + "write record failed"); + ASSERT_OK(_tsfile_writer_flush(writer), "flush failed"); + ASSERT_OK(_tsfile_writer_close(writer), "close writer failed"); + _free_tsfile_ts_record(reinterpret_cast(&record)); + // Create reader to verify the written data + auto* reader = (storage::TsFileReader*)tsfile_reader_new(filename, &code); + ASSERT_OK(code, "create reader failed"); + + // Query the data we just wrote + char* sensors[] = {str_measurement_id, text_measurement_id, + date_measurement_id}; + auto* result = (storage::ResultSet*)_tsfile_reader_query_device( + reader, device_id, sensors, 3, 0, 100, &code); + ASSERT_OK(code, "query device failed"); + + // Verify the retrieved data matches what we inserted + bool has_next = false; + int row_count = 0; + while (result->next(has_next) == common::E_OK && has_next) { + // Verify timestamp + EXPECT_EQ(result->get_value(1), row_count); + + // Verify string data + const common::String* str = result->get_value(2); + EXPECT_EQ(strlen(test_str), str->len_); + const char* ret_char = + tsfile_result_set_get_value_by_index_string(result, 2); + EXPECT_EQ(strcmp(test_str, ret_char), 0); + free((void*)ret_char); + + // Verify text data + const common::String* text = result->get_value(3); + EXPECT_EQ(strlen(test_text), text->len_); + const char* ret_text = + tsfile_result_set_get_value_by_index_string(result, 3); + EXPECT_EQ(strcmp(test_text, ret_text), 0); + free((void*)ret_text); + + // Verify date data + int32_t ret_date = + tsfile_result_set_get_value_by_index_int32_t(result, 4); + EXPECT_EQ(test_date, ret_date); + + row_count++; + } + free_tsfile_result_set(reinterpret_cast(&result)); + + ASSERT_OK(tsfile_reader_close(reader), "close reader failed"); + free(device_id); + free(str_measurement_id); + free(text_measurement_id); + free(date_measurement_id); +} TEST_F(CWrapperTest, WriterFlushTabletAndReadData) { ERRNO code = 0; diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc index 4386b1835..1df2e47b7 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc @@ -456,6 +456,7 @@ TEST_F(TsFileTableReaderTest, TestDecoder) { ResultSet* ret = nullptr; int ret_value = reader.query("test_table", columns, INT64_MIN, INT64_MAX, ret); + ASSERT_EQ(ret_value, E_OK); auto* table_result_set = (storage::TableResultSet*)ret; bool has_next = false; int cur_lin = 0; diff --git a/python/setup.py b/python/setup.py index 329cc2aae..cd3699818 100644 --- a/python/setup.py +++ b/python/setup.py @@ -25,7 +25,7 @@ from setuptools import setup, Extension from setuptools.command.build_ext import build_ext -version = "2.1.0.dev0" +version = "2.2.0.dev" system = platform.system() diff --git a/python/tests/test_write_and_read.py b/python/tests/test_write_and_read.py index bb1e2b84a..8846348f2 100644 --- a/python/tests/test_write_and_read.py +++ b/python/tests/test_write_and_read.py @@ -16,8 +16,6 @@ # under the License. # -import os - import numpy as np import pandas as pd import pytest @@ -36,34 +34,57 @@ def test_row_record_write_and_read(): try: + if os.path.exists("record_write_and_read.tsfile"): + os.remove("record_write_and_read.tsfile") writer = TsFileWriter("record_write_and_read.tsfile") writer.register_timeseries("root.device1", TimeseriesSchema("level1", TSDataType.INT64)) writer.register_timeseries("root.device1", TimeseriesSchema("level2", TSDataType.DOUBLE)) - writer.register_timeseries("root.device2", TimeseriesSchema("level1", TSDataType.INT32)) + writer.register_timeseries("root.device1", TimeseriesSchema("level3", TSDataType.INT32)) + writer.register_timeseries("root.device1", TimeseriesSchema("level4", TSDataType.STRING)) + writer.register_timeseries("root.device1", TimeseriesSchema("level5", TSDataType.TEXT)) + writer.register_timeseries("root.device1", TimeseriesSchema("level6", TSDataType.BLOB)) + writer.register_timeseries("root.device1", TimeseriesSchema("level7", TSDataType.DATE)) + + max_row_num = 10 - max_row_num = 1000 for i in range(max_row_num): row = RowRecord("root.device1", i, [Field("level1", i + 1, TSDataType.INT64), - Field("level2", i * 1.1, TSDataType.DOUBLE)]) - writer.write_row_record(row) - row = RowRecord("root.device2", i, - [Field("level1", i + 1, TSDataType.INT32)]) + Field("level2", i * 1.1, TSDataType.DOUBLE), + Field("level3", i * 2, TSDataType.INT32), + Field("level4", f"string_value_{i}", TSDataType.STRING), + Field("level5", f"text_value_{i}", TSDataType.TEXT), + Field("level6", f"blob_data_{i}".encode('utf-8'), TSDataType.BLOB), + Field("level7", i, TSDataType.DATE)]) writer.write_row_record(row) writer.close() reader = TsFileReader("record_write_and_read.tsfile") - result = reader.query_timeseries("root.device1", ["level1", "level2"], 10, 100) - i = 10 - while result.next(): - print(result.get_value_by_index(1)) - print(reader.get_active_query_result()) + result = reader.query_timeseries( + "root.device1", + ["level1", "level2", "level3", "level4", "level5", "level6", "level7"], + 0, + 100, + ) + assert len(reader.get_active_query_result()) == 1 + + for row_num in range(max_row_num): + assert result.next() + assert result.get_value_by_index(1) == row_num + assert result.get_value_by_index(2) == row_num + 1 + assert result.get_value_by_index(3) == pytest.approx(row_num * 1.1) + assert result.get_value_by_index(4) == row_num * 2 + assert result.get_value_by_index(5) == f"string_value_{row_num}" + assert result.get_value_by_index(6) == f"text_value_{row_num}" + assert result.get_value_by_index(7) == f"blob_data_{row_num}" + assert result.get_value_by_index(8) == row_num + + assert not result.next() + assert len(reader.get_active_query_result()) == 1 result.close() - result2 = reader.query_table_on_tree(["level1", "level2"], 20, 50) - print(result2.read_data_frame()) - result2.close() print(reader.get_active_query_result()) + assert len(reader.get_active_query_result()) == 0 reader.close() @@ -72,7 +93,6 @@ def test_row_record_write_and_read(): if os.path.exists("record_write_and_read.tsfile"): os.remove("record_write_and_read.tsfile") - def test_tree_query_to_dataframe_variants(): file_path = "tree_query_to_dataframe.tsfile" device_ids = [ @@ -226,7 +246,6 @@ def _extract_device(row, path_columns): if os.path.exists(file_path): os.remove(file_path) - def test_get_all_timeseries_schemas(): file_path = "get_all_timeseries_schema.tsfile" device_ids = [ @@ -290,12 +309,10 @@ def test_get_all_timeseries_schemas(): if os.path.exists(file_path): os.remove(file_path) - -@pytest.mark.skip(reason="API not match") def test_tablet_write_and_read(): try: - if os.path.exists("record_write_and_read.tsfile"): - os.remove("record_write_and_read.tsfile") + if os.path.exists("tablet_write_and_read.tsfile"): + os.remove("tablet_write_and_read.tsfile") writer = TsFileWriter("tablet_write_and_read.tsfile") measurement_num = 30 for i in range(measurement_num): @@ -324,9 +341,8 @@ def test_tablet_write_and_read(): while result.next(): assert result.is_null_by_index(1) == False assert result.get_value_by_index(1) == row_num - # Here, the data retrieval uses the table model's API, - # which might be incompatible. Therefore, it is better to skip it for now. assert result.get_value_by_name("level0") == row_num + assert result.get_value_by_index(2) == row_num row_num = row_num + 1 assert row_num == max_row_num @@ -338,12 +354,13 @@ def test_tablet_write_and_read(): if os.path.exists("tablet_write_and_read.tsfile"): os.remove("tablet_write_and_read.tsfile") - def test_table_writer_and_reader(): table = TableSchema("test_table", [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)]) try: + if os.path.exists("table_write.tsfile"): + os.remove("table_write.tsfile") with TsFileTableWriter("table_write.tsfile", table) as writer: tablet = Tablet(["device", "value"], [TSDataType.STRING, TSDataType.DOUBLE], 100) @@ -392,7 +409,6 @@ def test_table_writer_and_reader(): if os.path.exists("table_write.tsfile"): os.remove("table_write.tsfile") - def test_query_result_detach_from_reader(): try: ## Prepare data @@ -423,7 +439,6 @@ def test_query_result_detach_from_reader(): if os.path.exists("query_result_detach_from_reader.tsfile"): os.remove("query_result_detach_from_reader.tsfile") - def test_lower_case_name(): if os.path.exists("lower_case_name.tsfile"): os.remove("lower_case_name.tsfile") @@ -447,7 +462,6 @@ def test_lower_case_name(): assert data_frame.shape == (100, 3) assert data_frame["value"].sum() == 5445.0 - def test_tsfile_config(): from tsfile import get_tsfile_config, set_tsfile_config @@ -500,7 +514,6 @@ def test_tsfile_config(): with pytest.raises(NotSupportedError): set_tsfile_config({"time_compress_type_": Compressor.PAA}) - def test_tsfile_to_df(): table = TableSchema("test_table", [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), @@ -533,3 +546,13 @@ def test_tsfile_to_df(): to_dataframe("table_write_to_df.tsfile", "test_table", ["device1"]) finally: os.remove("table_write_to_df.tsfile") + + +import os + +if __name__ == "__main__": + os.chdir(os.path.dirname(os.path.abspath(__file__))) + pytest.main([ + "test_write_and_read.py::test_row_record_write_and_read", + "-s", "-v" + ]) diff --git a/python/tsfile/constants.py b/python/tsfile/constants.py index 72ac434bf..97bc36132 100644 --- a/python/tsfile/constants.py +++ b/python/tsfile/constants.py @@ -35,7 +35,7 @@ class TSDataType(IntEnum): def to_py_type(self): if self == TSDataType.BOOLEAN: return bool - elif self == TSDataType.INT32: + elif self == TSDataType.INT32 or self == TSDataType.DATE: return int elif self == TSDataType.INT64: return int diff --git a/python/tsfile/field.py b/python/tsfile/field.py index ae46c6282..26d02c5f7 100644 --- a/python/tsfile/field.py +++ b/python/tsfile/field.py @@ -75,6 +75,7 @@ def get_int_value(self): if ( self.data_type != TSDataType.INT32 + and self.data_type != TSDataType.DATE and self.data_type != TSDataType.INT64 and self.data_type != TSDataType.FLOAT and self.data_type != TSDataType.DOUBLE @@ -178,10 +179,21 @@ def get_string_value(self): return str(self.value) # BLOB elif self.data_type == TSDataType.BLOB: - return str(hex(int.from_bytes(self.value, byteorder="big"))) + return self.value else: return str(self.get_object_value(self.data_type)) + def get_bytes_value(self): + if self.value is None: + return None + if self.data_type is None: + raise NoneDataTypeException("None Data Type Exception!") + + if self.data_type == TSDataType.BLOB: + return self.value + else: + raise TypeError("get_bytes_value() only supports BLOB data type") + def __str__(self): return self.get_string_value() diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index d35be96e3..a2f621d4b 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -42,6 +42,8 @@ cdef extern from "./tsfile_cwrapper.h": TS_DATATYPE_DOUBLE = 4 TS_DATATYPE_TEXT = 5 TS_DATATYPE_VECTOR = 6 + TS_DATATYPE_DATE = 9 + TS_DATATYPE_BLOB = 10 TS_DATATYPE_STRING = 11 TS_DATATYPE_NULL_TYPE = 254 TS_DATATYPE_INVALID = 255 @@ -159,7 +161,9 @@ cdef extern from "./tsfile_cwrapper.h": ErrorCode _insert_data_into_ts_record_by_name_double(TsRecord data, const char *measurement_name, const double value); ErrorCode _insert_data_into_ts_record_by_name_bool(TsRecord data, const char *measurement_name, const bint value); - + ErrorCode _insert_data_into_ts_record_by_name_string_with_len(TsRecord data, const char *measurement_name, + const char *value, + const uint32_t value_len); void _free_tsfile_ts_record(TsRecord * record); # resulSet : query data from tsfile reader diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index 7a9aa889e..10441041c 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -23,8 +23,8 @@ from libc.stdlib cimport free from libc.stdlib cimport malloc from libc.string cimport strdup from cpython.exc cimport PyErr_SetObject -from cpython.unicode cimport PyUnicode_AsUTF8String, PyUnicode_AsUTF8 -from cpython.bytes cimport PyBytes_AsString +from cpython.unicode cimport PyUnicode_AsUTF8String, PyUnicode_AsUTF8, PyUnicode_AsUTF8AndSize +from cpython.bytes cimport PyBytes_AsString, PyBytes_AsStringAndSize from tsfile.exceptions import ERROR_MAPPING from tsfile.schema import ResultSetMetaData as ResultSetMetaDataPy @@ -97,8 +97,10 @@ cdef dict TS_DATA_TYPE_MAP = { TSDataTypePy.INT64: TSDataType.TS_DATATYPE_INT64, TSDataTypePy.FLOAT: TSDataType.TS_DATATYPE_FLOAT, TSDataTypePy.DOUBLE: TSDataType.TS_DATATYPE_DOUBLE, + TSDataTypePy.DATE: TSDataType.TS_DATATYPE_DATE, TSDataTypePy.TEXT: TSDataType.TS_DATATYPE_TEXT, - TSDataTypePy.STRING: TSDataType.TS_DATATYPE_STRING + TSDataTypePy.STRING: TSDataType.TS_DATATYPE_STRING, + TSDataTypePy.BLOB: TSDataType.TS_DATATYPE_BLOB } cdef dict TS_ENCODING_MAP = { @@ -278,8 +280,8 @@ cdef Tablet to_c_tablet(object tablet): if value[row] is not None: tablet_add_value_by_index_double(ctablet, row, col, value[row]) - # STRING - elif data_type == TS_DATATYPE_STRING: + # STRING or TEXT or BLOB + elif data_type == TS_DATATYPE_STRING or data_type == TS_DATATYPE_TEXT or data_type == TS_DATATYPE_BLOB: for row in range(max_row_num): if value[row] is not None: py_value = value[row] @@ -293,7 +295,10 @@ cdef TsRecord to_c_record(object row_record): cdef int field_num = row_record.get_fields_num() cdef int64_t timestamp = row_record.get_timestamp() cdef bytes device_id_bytes = PyUnicode_AsUTF8String(row_record.get_device_id()) - cdef const char * device_id = device_id_bytes + cdef const char* device_id = device_id_bytes + cdef const char* str_ptr + cdef char* blob_ptr + cdef Py_ssize_t str_len cdef TsRecord record cdef int i cdef TSDataType data_type @@ -302,11 +307,9 @@ cdef TsRecord to_c_record(object row_record): field = row_record.get_fields()[i] data_type = to_c_data_type(field.get_data_type()) if data_type == TS_DATATYPE_BOOLEAN: - _insert_data_into_ts_record_by_name_bool(record, PyUnicode_AsUTF8(field.get_field_name()), - field.get_bool_value()) - elif data_type == TS_DATATYPE_INT32: - _insert_data_into_ts_record_by_name_int32_t(record, PyUnicode_AsUTF8(field.get_field_name()), - field.get_int_value()) + _insert_data_into_ts_record_by_name_bool(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_bool_value()) + elif data_type == TS_DATATYPE_INT32 or data_type == TS_DATATYPE_DATE: + _insert_data_into_ts_record_by_name_int32_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_int_value()) elif data_type == TS_DATATYPE_INT64: _insert_data_into_ts_record_by_name_int64_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_long_value()) @@ -314,9 +317,15 @@ cdef TsRecord to_c_record(object row_record): _insert_data_into_ts_record_by_name_double(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_double_value()) elif data_type == TS_DATATYPE_FLOAT: - _insert_data_into_ts_record_by_name_float(record, PyUnicode_AsUTF8(field.get_field_name()), - field.get_float_value()) - + _insert_data_into_ts_record_by_name_float(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_float_value()) + elif data_type == TS_DATATYPE_TEXT or data_type == TS_DATATYPE_STRING: + str_ptr = PyUnicode_AsUTF8AndSize(field.get_string_value(), &str_len) + _insert_data_into_ts_record_by_name_string_with_len(record, PyUnicode_AsUTF8(field.get_field_name()), str_ptr, str_len) + elif data_type == TS_DATATYPE_BLOB or data_type == TS_DATATYPE_TEXT or data_type == TS_DATATYPE_STRING: + if PyBytes_AsStringAndSize(field.get_string_value(), &blob_ptr, &str_len) < 0: + raise ValueError("blob not legal") + _insert_data_into_ts_record_by_name_string_with_len(record, PyUnicode_AsUTF8(field.get_field_name()), + blob_ptr, str_len) return record # Free c structs' space diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index 6cc6b0042..3b9616399 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -159,10 +159,11 @@ cdef class ResultSetPy: self.check_result_set_invalid() # Well when we check is null, id from 0, so there index -1. if tsfile_result_set_is_null_by_index(self.result, index): + print("get value by index and check is null") return None # data type in metadata is an array, id from 0. data_type = self.metadata.get_data_type(index) - if data_type == TSDataTypePy.INT32: + if data_type == TSDataTypePy.INT32 or data_type == TSDataTypePy.DATE: return tsfile_result_set_get_value_by_index_int32_t(self.result, index) elif data_type == TSDataTypePy.INT64: return tsfile_result_set_get_value_by_index_int64_t(self.result, index) @@ -172,14 +173,14 @@ cdef class ResultSetPy: return tsfile_result_set_get_value_by_index_double(self.result, index) elif data_type == TSDataTypePy.BOOLEAN: return tsfile_result_set_get_value_by_index_bool(self.result, index) - elif data_type == TSDataTypePy.STRING or data_type == TSDataTypePy.TEXT: + elif data_type == TSDataTypePy.STRING or data_type == TSDataTypePy.TEXT or data_type == TSDataTypePy.BLOB: try: string = tsfile_result_set_get_value_by_index_string(self.result, index) - py_str = string.decode('utf-8') - return py_str + if string == NULL: + return None + return string.decode('utf-8') finally: - if string != NULL: - free(string) + pass def get_value_by_name(self, column_name : str): """