From 0171654da161f44a62a8129bac487e8f0eb661f3 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 18 Dec 2025 16:37:57 +0800 Subject: [PATCH 1/5] fix null point. --- cpp/src/common/tsblock/tsblock.h | 12 - cpp/src/encoding/dictionary_decoder.h | 24 +- cpp/src/encoding/gorilla_decoder.h | 162 ++++++------- cpp/src/encoding/int32_rle_decoder.h | 36 +-- cpp/src/encoding/plain_decoder.h | 28 ++- cpp/src/encoding/sprintz_encoder.h | 2 +- cpp/src/reader/aligned_chunk_reader.cc | 45 +++- cpp/src/reader/aligned_chunk_reader.h | 99 ++++---- cpp/src/reader/result_set.h | 5 +- cpp/src/utils/errno_define.h | 4 +- cpp/src/utils/util_define.h | 7 + .../table_view/tsfile_reader_table_test.cc | 215 ++++++++++++++++++ 12 files changed, 441 insertions(+), 198 deletions(-) diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h index dce94f8ad..859ad393d 100644 --- a/cpp/src/common/tsblock/tsblock.h +++ b/cpp/src/common/tsblock/tsblock.h @@ -76,18 +76,6 @@ class TsBlock { capacity_ += extend_size; } - // need to call flush_row_count after using colappender - FORCE_INLINE int flush_row_count(uint32_t row_count) { - int errnum = E_OK; - if (row_count_ == 0) { - row_count_ = row_count; - } else if (row_count_ != row_count) { - LOGE("Inconsistent number of rows in two columns"); - errnum = E_TSBLOCK_DATA_INCONSISTENCY; - } - return errnum; - } - FORCE_INLINE void fill_trailling_nulls() { for (uint32_t i = 0; i < get_column_count(); ++i) { for (uint32_t j = vectors_[i]->get_row_num(); j < row_count_; ++j) { diff --git a/cpp/src/encoding/dictionary_decoder.h b/cpp/src/encoding/dictionary_decoder.h index 0dce5a2b0..5f64b5873 100644 --- a/cpp/src/encoding/dictionary_decoder.h +++ b/cpp/src/encoding/dictionary_decoder.h @@ -37,39 +37,39 @@ class DictionaryDecoder : public Decoder { public: ~DictionaryDecoder() override = default; - bool has_remaining(const common::ByteStream &buffer) { + bool has_remaining(const common::ByteStream& buffer) override { return (!entry_index_.empty() && value_decoder_.has_next_package()) || buffer.has_remaining(); } - int read_boolean(bool &ret_value, common::ByteStream &in) override { + int read_boolean(bool& ret_value, common::ByteStream& in) override { return common::E_TYPE_NOT_MATCH; } - int read_int32(int32_t &ret_value, common::ByteStream &in) override { + int read_int32(int32_t& ret_value, common::ByteStream& in) override { return common::E_TYPE_NOT_MATCH; } - int read_int64(int64_t &ret_value, common::ByteStream &in) override { + int read_int64(int64_t& ret_value, common::ByteStream& in) override { return common::E_TYPE_NOT_MATCH; } - int read_float(float &ret_value, common::ByteStream &in) override { + int read_float(float& ret_value, common::ByteStream& in) override { return common::E_TYPE_NOT_MATCH; } - int read_double(double &ret_value, common::ByteStream &in) override { + int read_double(double& ret_value, common::ByteStream& in) override { return common::E_TYPE_NOT_MATCH; } - int read_String(common::String &ret_value, common::PageArena &pa, - common::ByteStream &in) { + int read_String(common::String& ret_value, common::PageArena& pa, + common::ByteStream& in) override { auto std_str = read_string(in); return ret_value.dup_from(std_str, pa); } void init() { value_decoder_.init(); } - void reset() { + void reset() override { value_decoder_.reset(); entry_index_.clear(); } - std::string read_string(common::ByteStream &buffer) { + std::string read_string(common::ByteStream& buffer) { if (entry_index_.empty()) { init_map(buffer); } @@ -77,14 +77,14 @@ class DictionaryDecoder : public Decoder { return entry_index_[code]; } - bool has_next(common::ByteStream &buffer) { + bool has_next(common::ByteStream& buffer) { if (entry_index_.empty()) { init_map(buffer); } return value_decoder_.has_next(buffer); } - int init_map(common::ByteStream &buffer) { + int init_map(common::ByteStream& buffer) { int ret = common::E_OK; int length = 0; if (RET_FAIL(common::SerializationUtil::read_var_int(length, buffer))) { diff --git a/cpp/src/encoding/gorilla_decoder.h b/cpp/src/encoding/gorilla_decoder.h index e2b620634..4429f2b5e 100644 --- a/cpp/src/encoding/gorilla_decoder.h +++ b/cpp/src/encoding/gorilla_decoder.h @@ -49,12 +49,12 @@ class GorillaDecoder : public Decoder { } FORCE_INLINE bool has_next() { return has_next_; } - FORCE_INLINE bool has_remaining(const common::ByteStream &buffer) { + FORCE_INLINE bool has_remaining(const common::ByteStream& buffer) override { return buffer.has_remaining() || has_next(); } // If empty, cache 8 bits from in_stream to 'buffer_'. - void flush_byte_if_empty(common::ByteStream &in) { + void flush_byte_if_empty(common::ByteStream& in) { if (bits_left_ == 0) { uint32_t read_len = 0; in.read_buf(&buffer_, 1, read_len); @@ -63,7 +63,7 @@ class GorillaDecoder : public Decoder { } // Reads the next bit and returns true if the next bit is 1, otherwise 0. - bool read_bit(common::ByteStream &in) { + bool read_bit(common::ByteStream& in) { bool bit = ((buffer_ >> (bits_left_ - 1)) & 1) == 1; bits_left_--; flush_byte_if_empty(in); @@ -76,7 +76,7 @@ class GorillaDecoder : public Decoder { * @bits: How many next bits are reader from the stream * return: long value that was reader from the stream */ - int64_t read_long(int bits, common::ByteStream &in) { + int64_t read_long(int bits, common::ByteStream& in) { int64_t value = 0; while (bits > 0) { if (bits > bits_left_ || bits == 8) { @@ -101,7 +101,7 @@ class GorillaDecoder : public Decoder { } // Read the control bits - uint8_t read_next_control_bit(int max_bits, common::ByteStream &in) { + uint8_t read_next_control_bit(int max_bits, common::ByteStream& in) { uint8_t value = 0x00; for (int i = 0; i < max_bits; i++) { value <<= 1; @@ -114,18 +114,18 @@ class GorillaDecoder : public Decoder { return value; } - T read_next(common::ByteStream &in); - virtual T cache_next(common::ByteStream &in); - T decode(common::ByteStream &in); + T read_next(common::ByteStream& in); + virtual T cache_next(common::ByteStream& in); + T decode(common::ByteStream& in); // interface from Decoder - int read_boolean(bool &ret_value, common::ByteStream &in); - int read_int32(int32_t &ret_value, common::ByteStream &in); - int read_int64(int64_t &ret_value, common::ByteStream &in); - int read_float(float &ret_value, common::ByteStream &in); - int read_double(double &ret_value, common::ByteStream &in); - int read_String(common::String &ret_value, common::PageArena &pa, - common::ByteStream &in); + int read_boolean(bool& ret_value, common::ByteStream& in) override; + int read_int32(int32_t& ret_value, common::ByteStream& in) override; + int read_int64(int64_t& ret_value, common::ByteStream& in) override; + int read_float(float& ret_value, common::ByteStream& in) override; + int read_double(double& ret_value, common::ByteStream& in) override; + int read_String(common::String& ret_value, common::PageArena& pa, + common::ByteStream& in) override; public: common::TSEncoding type_; @@ -140,7 +140,7 @@ class GorillaDecoder : public Decoder { template <> FORCE_INLINE int32_t -GorillaDecoder::read_next(common::ByteStream &in) { +GorillaDecoder::read_next(common::ByteStream& in) { uint8_t control_bits = read_next_control_bit(2, in); uint8_t significant_bits = 0; int32_t xor_value = 0; @@ -172,7 +172,7 @@ GorillaDecoder::read_next(common::ByteStream &in) { template <> FORCE_INLINE int64_t -GorillaDecoder::read_next(common::ByteStream &in) { +GorillaDecoder::read_next(common::ByteStream& in) { uint8_t control_bits = read_next_control_bit(2, in); uint8_t significant_bits = 0; @@ -208,7 +208,7 @@ GorillaDecoder::read_next(common::ByteStream &in) { template <> FORCE_INLINE int32_t -GorillaDecoder::cache_next(common::ByteStream &in) { +GorillaDecoder::cache_next(common::ByteStream& in) { read_next(in); if (stored_value_ == GORILLA_ENCODING_ENDING_INTEGER) { has_next_ = false; @@ -218,7 +218,7 @@ GorillaDecoder::cache_next(common::ByteStream &in) { template <> FORCE_INLINE int64_t -GorillaDecoder::cache_next(common::ByteStream &in) { +GorillaDecoder::cache_next(common::ByteStream& in) { read_next(in); if (stored_value_ == GORILLA_ENCODING_ENDING_LONG) { has_next_ = false; @@ -227,7 +227,7 @@ GorillaDecoder::cache_next(common::ByteStream &in) { } template <> -FORCE_INLINE int32_t GorillaDecoder::decode(common::ByteStream &in) { +FORCE_INLINE int32_t GorillaDecoder::decode(common::ByteStream& in) { int32_t ret_value = stored_value_; if (UNLIKELY(!first_value_was_read_)) { flush_byte_if_empty(in); @@ -240,7 +240,7 @@ FORCE_INLINE int32_t GorillaDecoder::decode(common::ByteStream &in) { } template <> -FORCE_INLINE int64_t GorillaDecoder::decode(common::ByteStream &in) { +FORCE_INLINE int64_t GorillaDecoder::decode(common::ByteStream& in) { int64_t ret_value = stored_value_; if (UNLIKELY(!first_value_was_read_)) { flush_byte_if_empty(in); @@ -254,18 +254,18 @@ FORCE_INLINE int64_t GorillaDecoder::decode(common::ByteStream &in) { class FloatGorillaDecoder : public GorillaDecoder { public: - int read_boolean(bool &ret_value, common::ByteStream &in); - int read_int32(int32_t &ret_value, common::ByteStream &in); - int read_int64(int64_t &ret_value, common::ByteStream &in); - int read_float(float &ret_value, common::ByteStream &in); - int read_double(double &ret_value, common::ByteStream &in); + int read_boolean(bool& ret_value, common::ByteStream& in); + int read_int32(int32_t& ret_value, common::ByteStream& in); + int read_int64(int64_t& ret_value, common::ByteStream& in); + int read_float(float& ret_value, common::ByteStream& in); + int read_double(double& ret_value, common::ByteStream& in); - float decode(common::ByteStream &in) { + float decode(common::ByteStream& in) { int32_t value_int = GorillaDecoder::decode(in); return common::int_to_float(value_int); } - int32_t cache_next(common::ByteStream &in) { + int32_t cache_next(common::ByteStream& in) { read_next(in); if (stored_value_ == common::float_to_int(GORILLA_ENCODING_ENDING_FLOAT)) { @@ -277,18 +277,18 @@ class FloatGorillaDecoder : public GorillaDecoder { class DoubleGorillaDecoder : public GorillaDecoder { public: - int read_boolean(bool &ret_value, common::ByteStream &in); - int read_int32(int32_t &ret_value, common::ByteStream &in); - int read_int64(int64_t &ret_value, common::ByteStream &in); - int read_float(float &ret_value, common::ByteStream &in); - int read_double(double &ret_value, common::ByteStream &in); + int read_boolean(bool& ret_value, common::ByteStream& in); + int read_int32(int32_t& ret_value, common::ByteStream& in); + int read_int64(int64_t& ret_value, common::ByteStream& in); + int read_float(float& ret_value, common::ByteStream& in); + int read_double(double& ret_value, common::ByteStream& in); - double decode(common::ByteStream &in) { + double decode(common::ByteStream& in) { int64_t value_long = GorillaDecoder::decode(in); return common::long_to_double(value_long); } - int64_t cache_next(common::ByteStream &in) { + int64_t cache_next(common::ByteStream& in) { read_next(in); if (stored_value_ == common::double_to_long(GORILLA_ENCODING_ENDING_DOUBLE)) { @@ -303,126 +303,126 @@ typedef GorillaDecoder LongGorillaDecoder; // wrap as Decoder interface template <> -FORCE_INLINE int IntGorillaDecoder::read_boolean(bool &ret_value, - common::ByteStream &in) { +FORCE_INLINE int IntGorillaDecoder::read_boolean(bool& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } template <> -FORCE_INLINE int IntGorillaDecoder::read_int32(int32_t &ret_value, - common::ByteStream &in) { +FORCE_INLINE int IntGorillaDecoder::read_int32(int32_t& ret_value, + common::ByteStream& in) { ret_value = decode(in); return common::E_OK; } template <> -FORCE_INLINE int IntGorillaDecoder::read_int64(int64_t &ret_value, - common::ByteStream &in) { +FORCE_INLINE int IntGorillaDecoder::read_int64(int64_t& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } template <> -FORCE_INLINE int IntGorillaDecoder::read_float(float &ret_value, - common::ByteStream &in) { +FORCE_INLINE int IntGorillaDecoder::read_float(float& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } template <> -FORCE_INLINE int IntGorillaDecoder::read_double(double &ret_value, - common::ByteStream &in) { +FORCE_INLINE int IntGorillaDecoder::read_double(double& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } template <> -FORCE_INLINE int IntGorillaDecoder::read_String(common::String &ret_value, - common::PageArena &pa, - common::ByteStream &in) { +FORCE_INLINE int IntGorillaDecoder::read_String(common::String& ret_value, + common::PageArena& pa, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } template <> -FORCE_INLINE int LongGorillaDecoder::read_boolean(bool &ret_value, - common::ByteStream &in) { +FORCE_INLINE int LongGorillaDecoder::read_boolean(bool& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } template <> -FORCE_INLINE int LongGorillaDecoder::read_int32(int32_t &ret_value, - common::ByteStream &in) { +FORCE_INLINE int LongGorillaDecoder::read_int32(int32_t& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } template <> -FORCE_INLINE int LongGorillaDecoder::read_int64(int64_t &ret_value, - common::ByteStream &in) { +FORCE_INLINE int LongGorillaDecoder::read_int64(int64_t& ret_value, + common::ByteStream& in) { ret_value = decode(in); return common::E_OK; } template <> -FORCE_INLINE int LongGorillaDecoder::read_float(float &ret_value, - common::ByteStream &in) { +FORCE_INLINE int LongGorillaDecoder::read_float(float& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } template <> -FORCE_INLINE int LongGorillaDecoder::read_double(double &ret_value, - common::ByteStream &in) { +FORCE_INLINE int LongGorillaDecoder::read_double(double& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } template <> -FORCE_INLINE int LongGorillaDecoder::read_String(common::String &ret_value, - common::PageArena &pa, - common::ByteStream &in) { +FORCE_INLINE int LongGorillaDecoder::read_String(common::String& ret_value, + common::PageArena& pa, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } -FORCE_INLINE int FloatGorillaDecoder::read_boolean(bool &ret_value, - common::ByteStream &in) { +FORCE_INLINE int FloatGorillaDecoder::read_boolean(bool& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } -FORCE_INLINE int FloatGorillaDecoder::read_int32(int32_t &ret_value, - common::ByteStream &in) { +FORCE_INLINE int FloatGorillaDecoder::read_int32(int32_t& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } -FORCE_INLINE int FloatGorillaDecoder::read_int64(int64_t &ret_value, - common::ByteStream &in) { +FORCE_INLINE int FloatGorillaDecoder::read_int64(int64_t& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } -FORCE_INLINE int FloatGorillaDecoder::read_float(float &ret_value, - common::ByteStream &in) { +FORCE_INLINE int FloatGorillaDecoder::read_float(float& ret_value, + common::ByteStream& in) { ret_value = decode(in); return common::E_OK; } -FORCE_INLINE int FloatGorillaDecoder::read_double(double &ret_value, - common::ByteStream &in) { +FORCE_INLINE int FloatGorillaDecoder::read_double(double& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } -FORCE_INLINE int DoubleGorillaDecoder::read_boolean(bool &ret_value, - common::ByteStream &in) { +FORCE_INLINE int DoubleGorillaDecoder::read_boolean(bool& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } -FORCE_INLINE int DoubleGorillaDecoder::read_int32(int32_t &ret_value, - common::ByteStream &in) { +FORCE_INLINE int DoubleGorillaDecoder::read_int32(int32_t& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } -FORCE_INLINE int DoubleGorillaDecoder::read_int64(int64_t &ret_value, - common::ByteStream &in) { +FORCE_INLINE int DoubleGorillaDecoder::read_int64(int64_t& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } -FORCE_INLINE int DoubleGorillaDecoder::read_float(float &ret_value, - common::ByteStream &in) { +FORCE_INLINE int DoubleGorillaDecoder::read_float(float& ret_value, + common::ByteStream& in) { ASSERT(false); return common::E_NOT_SUPPORT; } -FORCE_INLINE int DoubleGorillaDecoder::read_double(double &ret_value, - common::ByteStream &in) { +FORCE_INLINE int DoubleGorillaDecoder::read_double(double& ret_value, + common::ByteStream& in) { ret_value = decode(in); return common::E_OK; } diff --git a/cpp/src/encoding/int32_rle_decoder.h b/cpp/src/encoding/int32_rle_decoder.h index 647f095a7..757a92599 100644 --- a/cpp/src/encoding/int32_rle_decoder.h +++ b/cpp/src/encoding/int32_rle_decoder.h @@ -38,9 +38,9 @@ class Int32RleDecoder : public Decoder { bool is_length_and_bitwidth_readed_; int current_count_; common::ByteStream byte_cache_; - int32_t *current_buffer_; - Int32Packer *packer_; - uint8_t *tmp_buf_; + int32_t* current_buffer_; + Int32Packer* packer_; + uint8_t* tmp_buf_; public: Int32RleDecoder() @@ -55,30 +55,30 @@ class Int32RleDecoder : public Decoder { tmp_buf_(nullptr) {} ~Int32RleDecoder() override { destroy(); } - bool has_remaining(const common::ByteStream &buffer) override { + bool has_remaining(const common::ByteStream& buffer) override { return buffer.has_remaining() || has_next_package(); } - int read_boolean(bool &ret_value, common::ByteStream &in) { + int read_boolean(bool& ret_value, common::ByteStream& in) override { int32_t bool_value; read_int32(bool_value, in); ret_value = bool_value == 0 ? false : true; return common::E_OK; } - int read_int32(int32_t &ret_value, common::ByteStream &in) override { + int read_int32(int32_t& ret_value, common::ByteStream& in) override { ret_value = static_cast(read_int(in)); return common::E_OK; } - int read_int64(int64_t &ret_value, common::ByteStream &in) override { + int read_int64(int64_t& ret_value, common::ByteStream& in) override { return common::E_TYPE_NOT_MATCH; } - int read_float(float &ret_value, common::ByteStream &in) override { + int read_float(float& ret_value, common::ByteStream& in) override { return common::E_TYPE_NOT_MATCH; } - int read_double(double &ret_value, common::ByteStream &in) override { + int read_double(double& ret_value, common::ByteStream& in) override { return common::E_TYPE_NOT_MATCH; } - int read_String(common::String &ret_value, common::PageArena &pa, - common::ByteStream &in) override { + int read_String(common::String& ret_value, common::PageArena& pa, + common::ByteStream& in) override { return common::E_TYPE_NOT_MATCH; } @@ -91,7 +91,7 @@ class Int32RleDecoder : public Decoder { current_count_ = 0; } - bool has_next(common::ByteStream &buffer) { + bool has_next(common::ByteStream& buffer) { if (current_count_ > 0 || buffer.remaining_size() > 0 || has_next_package()) { return true; @@ -103,7 +103,7 @@ class Int32RleDecoder : public Decoder { return current_count_ > 0 || byte_cache_.remaining_size() > 0; } - int32_t read_int(common::ByteStream &buffer) { + int32_t read_int(common::ByteStream& buffer) { if (!is_length_and_bitwidth_readed_) { // start to reader a new rle+bit-packing pattern read_length_and_bitwidth(buffer); @@ -153,7 +153,7 @@ class Int32RleDecoder : public Decoder { if (current_buffer_ != nullptr) { common::mem_free(current_buffer_); } - current_buffer_ = static_cast( + current_buffer_ = static_cast( common::mem_alloc(sizeof(int32_t) * bit_packed_group_count * 8, common::MOD_DECODER_OBJ)); if (IS_NULL(current_buffer_)) { @@ -179,7 +179,7 @@ class Int32RleDecoder : public Decoder { return ret; } - int read_length_and_bitwidth(common::ByteStream &buffer) { + int read_length_and_bitwidth(common::ByteStream& buffer) { int ret = common::E_OK; if (RET_FAIL( common::SerializationUtil::read_var_uint(length_, buffer))) { @@ -189,18 +189,18 @@ class Int32RleDecoder : public Decoder { common::mem_free(tmp_buf_); } tmp_buf_ = - (uint8_t *)common::mem_alloc(length_, common::MOD_DECODER_OBJ); + (uint8_t*)common::mem_alloc(length_, common::MOD_DECODER_OBJ); if (tmp_buf_ == nullptr) { return common::E_OOM; } uint32_t ret_read_len = 0; - if (RET_FAIL(buffer.read_buf((uint8_t *)tmp_buf_, length_, + if (RET_FAIL(buffer.read_buf((uint8_t*)tmp_buf_, length_, ret_read_len))) { return ret; } else if (length_ != ret_read_len) { ret = common::E_PARTIAL_READ; } - byte_cache_.wrap_from((char *)tmp_buf_, length_); + byte_cache_.wrap_from((char*)tmp_buf_, length_); is_length_and_bitwidth_readed_ = true; uint8_t tmp_bit_width; common::SerializationUtil::read_ui8(tmp_bit_width, byte_cache_); diff --git a/cpp/src/encoding/plain_decoder.h b/cpp/src/encoding/plain_decoder.h index d1c6969e7..27e8006f4 100644 --- a/cpp/src/encoding/plain_decoder.h +++ b/cpp/src/encoding/plain_decoder.h @@ -27,34 +27,38 @@ namespace storage { class PlainDecoder : public Decoder { public: ~PlainDecoder() override = default; - FORCE_INLINE void reset() { /* do nothing */ - } - FORCE_INLINE bool has_remaining(const common::ByteStream &buffer) { + FORCE_INLINE void reset() override { /* do nothing */ } + FORCE_INLINE bool has_remaining(const common::ByteStream& buffer) override { return buffer.has_remaining(); } - FORCE_INLINE int read_boolean(bool &ret_bool, common::ByteStream &in) { - return common::SerializationUtil::read_ui8((uint8_t &)ret_bool, in); + FORCE_INLINE int read_boolean(bool& ret_bool, + common::ByteStream& in) override { + return common::SerializationUtil::read_ui8((uint8_t&)ret_bool, in); } - FORCE_INLINE int read_int32(int32_t &ret_int32, common::ByteStream &in) { + FORCE_INLINE int read_int32(int32_t& ret_int32, + common::ByteStream& in) override { return common::SerializationUtil::read_var_int(ret_int32, in); } - FORCE_INLINE int read_int64(int64_t &ret_int64, common::ByteStream &in) { + FORCE_INLINE int read_int64(int64_t& ret_int64, + common::ByteStream& in) override { return common::SerializationUtil::read_i64(ret_int64, in); } - FORCE_INLINE int read_float(float &ret_float, common::ByteStream &in) { + FORCE_INLINE int read_float(float& ret_float, + common::ByteStream& in) override { return common::SerializationUtil::read_float(ret_float, in); } - FORCE_INLINE int read_double(double &ret_double, common::ByteStream &in) { + FORCE_INLINE int read_double(double& ret_double, + common::ByteStream& in) override { return common::SerializationUtil::read_double(ret_double, in); } - FORCE_INLINE int read_String(common::String &ret_String, - common::PageArena &pa, - common::ByteStream &in) { + FORCE_INLINE int read_String(common::String& ret_String, + common::PageArena& pa, + common::ByteStream& in) override { return common::SerializationUtil::read_mystring(ret_String, &pa, in); } }; diff --git a/cpp/src/encoding/sprintz_encoder.h b/cpp/src/encoding/sprintz_encoder.h index 309344804..04906f12b 100644 --- a/cpp/src/encoding/sprintz_encoder.h +++ b/cpp/src/encoding/sprintz_encoder.h @@ -34,7 +34,7 @@ class SprintzEncoder : public Encoder { predict_method_ = method; } - virtual void reset() { + virtual void reset() override { byte_cache_.reset(); is_first_cached_ = false; group_num_ = 0; diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc index b39db564c..e4779184f 100644 --- a/cpp/src/reader/aligned_chunk_reader.cc +++ b/cpp/src/reader/aligned_chunk_reader.cc @@ -253,6 +253,12 @@ int AlignedChunkReader::get_cur_page_header(ChunkMeta*& chunk_meta, int cur_page_header_serialized_size = 0; // TODO: configurable int retry_read_want_size = 1024; + if (chunk_visit_offset - chunk_header.serialized_size_ >= + chunk_header.data_size_) { + cur_page_header.reset(); + return E_OK; + } + do { in_stream.mark_read_pos(); cur_page_header.reset(); @@ -434,6 +440,11 @@ int AlignedChunkReader::decode_cur_value_page_data() { char* value_buf = nullptr; uint32_t value_buf_size = 0; + if (cur_value_page_header_.compressed_size_ == 0) { + value_in_.wrap_from(value_buf, 0); + return E_OK; + } + // Step 2: do uncompress if (IS_SUCC(ret)) { value_compressed_buf = @@ -521,10 +532,10 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock( uint32_t mask = 1 << 7; \ int64_t time = 0; \ CppType value; \ - while (time_decoder_->has_remaining(time_in) && \ - value_decoder_->has_remaining(value_in)) { \ + while (time_decoder_->has_remaining(time_in)) { \ cur_value_index++; \ - if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & \ + if (value_page_col_notnull_bitmap_.empty() || \ + ((value_page_col_notnull_bitmap_[cur_value_index / 8] & \ 0xFF) & \ (mask >> (cur_value_index % 8))) == 0) { \ ret = time_decoder_->read_int64(time, time_in); \ @@ -539,6 +550,10 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock( row_appender.append_null(1); \ continue; \ } \ + assert(value_decoder_->has_remaining(value_in)); \ + if (!value_decoder_->has_remaining(value_in)) { \ + return common::E_DATA_INCONSISTENCY; \ + } \ if (UNLIKELY(!row_appender.add_row())) { \ ret = E_OVERFLOW; \ cur_value_index--; \ @@ -565,10 +580,10 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK( uint32_t mask = 1 << 7; int64_t time = 0; int32_t value; - while (time_decoder_->has_remaining(time_in) && - value_decoder_->has_remaining(value_in)) { + while (time_decoder_->has_remaining(time_in)) { cur_value_index++; - if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) & + if (value_page_col_notnull_bitmap_.empty() || + ((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) & (mask >> (cur_value_index % 8))) == 0) { ret = time_decoder_->read_int64(time, time_in); if (ret != E_OK) { @@ -582,6 +597,10 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK( row_appender.append_null(1); continue; } + assert(value_decoder_->has_remaining(value_in)); + if (!value_decoder_->has_remaining(value_in)) { + return common::E_DATA_INCONSISTENCY; + } if (UNLIKELY(!row_appender.add_row())) { ret = E_OVERFLOW; cur_value_index--; @@ -654,14 +673,20 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK( int64_t time = 0; common::String value; uint32_t mask = 1 << 7; - while (time_decoder_->has_remaining(time_in) && - value_decoder_->has_remaining(value_in)) { + while (time_decoder_->has_remaining(time_in)) { cur_value_index++; bool should_read_data = true; - if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) & - (mask >> (cur_value_index % 8))) == 0) { + if (value_page_col_notnull_bitmap_.empty() || + (value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) & + (mask >> (cur_value_index % 8)) == 0) { should_read_data = false; } + if (should_read_data) { + assert(value_decoder_->has_remaining(value_in)); + if (!value_decoder_->has_remaining(value_in)) { + return E_DATA_INCONSISTENCY; + } + } if (UNLIKELY(!row_appender.add_row())) { ret = E_OVERFLOW; cur_value_index--; diff --git a/cpp/src/reader/aligned_chunk_reader.h b/cpp/src/reader/aligned_chunk_reader.h index 7bf290470..aefb7bc58 100644 --- a/cpp/src/reader/aligned_chunk_reader.h +++ b/cpp/src/reader/aligned_chunk_reader.h @@ -57,8 +57,8 @@ class AlignedChunkReader : public IChunkReader { time_uncompressed_buf_(nullptr), value_uncompressed_buf_(nullptr), cur_value_index(-1) {} - int init(ReadFile *read_file, common::String m_name, - common::TSDataType data_type, Filter *time_filter) override; + int init(ReadFile* read_file, common::String m_name, + common::TSDataType data_type, Filter* time_filter) override; void reset() override; void destroy() override; ~AlignedChunkReader() override = default; @@ -67,43 +67,46 @@ class AlignedChunkReader : public IChunkReader { return prev_value_page_not_finish() || (value_chunk_visit_offset_ - value_chunk_header_.serialized_size_ < - value_chunk_header_.data_size_); + value_chunk_header_.data_size_) || + prev_time_page_not_finish() || + (time_chunk_visit_offset_ - time_chunk_header_.serialized_size_ < + time_chunk_header_.data_size_); } - ChunkHeader &get_chunk_header() override { return value_chunk_header_; } - int load_by_aligned_meta(ChunkMeta *time_meta, - ChunkMeta *value_meta) override; + ChunkHeader& get_chunk_header() override { return value_chunk_header_; } + int load_by_aligned_meta(ChunkMeta* time_meta, + ChunkMeta* value_meta) override; - int get_next_page(common::TsBlock *tsblock, Filter *oneshoot_filter, - common::PageArena &pa) override; + int get_next_page(common::TsBlock* tsblock, Filter* oneshoot_filter, + common::PageArena& pa) override; private: FORCE_INLINE bool chunk_has_only_one_page( - const ChunkHeader &chunk_header) const { + const ChunkHeader& chunk_header) const { return (chunk_header.chunk_type_ & ONLY_ONE_PAGE_CHUNK_HEADER_MARKER) == ONLY_ONE_PAGE_CHUNK_HEADER_MARKER; } - int alloc_compressor_and_decoder(storage::Decoder *&decoder, - storage::Compressor *&compressor, + int alloc_compressor_and_decoder(storage::Decoder*& decoder, + storage::Compressor*& compressor, common::TSEncoding encoding, common::TSDataType data_type, common::CompressionType compression_type); - int get_cur_page_header(ChunkMeta *&chunk_meta, - common::ByteStream &in_stream_, - PageHeader &cur_page_header_, - uint32_t &chunk_visit_offset, - ChunkHeader &chunk_header); - int read_from_file_and_rewrap(common::ByteStream &in_stream_, - ChunkMeta *&chunk_meta, - uint32_t &chunk_visit_offset, - int32_t &file_data_buf_size, + int get_cur_page_header(ChunkMeta*& chunk_meta, + common::ByteStream& in_stream_, + PageHeader& cur_page_header_, + uint32_t& chunk_visit_offset, + ChunkHeader& chunk_header); + int read_from_file_and_rewrap(common::ByteStream& in_stream_, + ChunkMeta*& chunk_meta, + uint32_t& chunk_visit_offset, + int32_t& file_data_buf_size, int want_size = 0, bool may_shrink = true); - bool cur_page_statisify_filter(Filter *filter); + bool cur_page_statisify_filter(Filter* filter); int skip_cur_page(); int decode_cur_time_page_data(); int decode_cur_value_page_data(); - int decode_time_value_buf_into_tsblock(common::TsBlock *&ret_tsblock, - Filter *filter, - common::PageArena *pa); + int decode_time_value_buf_into_tsblock(common::TsBlock*& ret_tsblock, + Filter* filter, + common::PageArena* pa); bool prev_time_page_not_finish() const { return (time_decoder_ && time_decoder_->has_remaining(time_in_)) || time_in_.has_remaining(); @@ -114,25 +117,25 @@ class AlignedChunkReader : public IChunkReader { value_in_.has_remaining(); } - int decode_tv_buf_into_tsblock_by_datatype(common::ByteStream &time_in, - common::ByteStream &value_in, - common::TsBlock *ret_tsblock, - Filter *filter, - common::PageArena *pa); - int i32_DECODE_TYPED_TV_INTO_TSBLOCK(common::ByteStream &time_in, - common::ByteStream &value_in, - common::RowAppender &row_appender, - Filter *filter); - int STRING_DECODE_TYPED_TV_INTO_TSBLOCK(common::ByteStream &time_in, - common::ByteStream &value_in, - common::RowAppender &row_appender, - common::PageArena &pa, - Filter *filter); + int decode_tv_buf_into_tsblock_by_datatype(common::ByteStream& time_in, + common::ByteStream& value_in, + common::TsBlock* ret_tsblock, + Filter* filter, + common::PageArena* pa); + int i32_DECODE_TYPED_TV_INTO_TSBLOCK(common::ByteStream& time_in, + common::ByteStream& value_in, + common::RowAppender& row_appender, + Filter* filter); + int STRING_DECODE_TYPED_TV_INTO_TSBLOCK(common::ByteStream& time_in, + common::ByteStream& value_in, + common::RowAppender& row_appender, + common::PageArena& pa, + Filter* filter); private: - ReadFile *read_file_; - ChunkMeta *time_chunk_meta_; - ChunkMeta *value_chunk_meta_; + ReadFile* read_file_; + ChunkMeta* time_chunk_meta_; + ChunkMeta* value_chunk_meta_; common::String measurement_name_; ChunkHeader time_chunk_header_; // TODO: support reading more than one measurement in AlignedChunkReader. @@ -161,16 +164,16 @@ class AlignedChunkReader : public IChunkReader { uint32_t value_chunk_visit_offset_; // Statistic *page_statistic_; - Compressor *time_compressor_; - Compressor *value_compressor_; - Filter *time_filter_; + Compressor* time_compressor_; + Compressor* value_compressor_; + Filter* time_filter_; - Decoder *time_decoder_; - Decoder *value_decoder_; + Decoder* time_decoder_; + Decoder* value_decoder_; common::ByteStream time_in_; common::ByteStream value_in_; - char *time_uncompressed_buf_; - char *value_uncompressed_buf_; + char* time_uncompressed_buf_; + char* value_uncompressed_buf_; std::vector value_page_col_notnull_bitmap_; uint32_t value_page_data_num_; int32_t cur_value_index; diff --git a/cpp/src/reader/result_set.h b/cpp/src/reader/result_set.h index 228a93330..87303cef4 100644 --- a/cpp/src/reader/result_set.h +++ b/cpp/src/reader/result_set.h @@ -261,6 +261,8 @@ class ResultSetIterator { if (result_set_) { int ret = result_set_->next(has_next); ASSERT(ret == 0); + // TODO:handle error in hasNext. + (void)ret; if (has_next) { cached_record_ = result_set_->get_row_record(); } else { @@ -297,7 +299,8 @@ inline ResultSetIterator ResultSet::iterator() { return ResultSetIterator(this); } -static void print_table_result_set(storage::ResultSet* table_result_set) { +static MAYBE_UNUSED void print_table_result_set( + storage::ResultSet* table_result_set) { if (table_result_set == nullptr) { std::cout << "TableResultSet is nullptr" << std::endl; return; diff --git a/cpp/src/utils/errno_define.h b/cpp/src/utils/errno_define.h index df16c5fe7..f52cead99 100644 --- a/cpp/src/utils/errno_define.h +++ b/cpp/src/utils/errno_define.h @@ -43,9 +43,7 @@ const int E_COND_ERR = 19; const int E_OVERFLOW = 20; const int E_NO_MORE_DATA = 21; const int E_OUT_OF_ORDER = 22; -const int E_TSBLOCK_TYPE_NOT_SUPPORTED = 23; -const int E_TSBLOCK_DATA_INCONSISTENCY = 24; -const int E_DDL_UNKNOWN_TYPE = 25; +const int E_DATA_INCONSISTENCY = 24; const int E_TYPE_NOT_SUPPORTED = 26; const int E_TYPE_NOT_MATCH = 27; const int E_FILE_OPEN_ERR = 28; diff --git a/cpp/src/utils/util_define.h b/cpp/src/utils/util_define.h index 9667de76f..2796dfb0f 100644 --- a/cpp/src/utils/util_define.h +++ b/cpp/src/utils/util_define.h @@ -25,6 +25,13 @@ /* ======== unsued ======== */ #define UNUSED(v) ((void)(v)) +#if __cplusplus >= 201703L +#define MAYBE_UNUSED [[maybe_unused]] +#elif defined(__GNUC__) || defined(__clang__) +#define MAYBE_UNUSED __attribute__((unused)) +#else +#define MAYBE_UNUSED +#endif /* ======== inline ======== */ #ifdef __GNUC__ diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc index b123ef69b..c59c0259b 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc @@ -477,3 +477,218 @@ TEST_F(TsFileTableReaderTest, TestDecoder) { reader.destroy_query_data_set(table_result_set); reader.close(); } + +void test_null_table(WriteFile* write_file, int max_rows, + std::function insert_data_into_tablet, + std::function check) { + std::string table_name = "t1"; + auto* schema = new storage::TableSchema( + table_name, + { + common::ColumnSchema("id1", common::TSDataType::STRING, + common::CompressionType::UNCOMPRESSED, + common::TSEncoding::PLAIN, + common::ColumnCategory::TAG), + common::ColumnSchema("id2", common::TSDataType::STRING, + common::CompressionType::UNCOMPRESSED, + common::TSEncoding::PLAIN, + common::ColumnCategory::TAG), + common::ColumnSchema("s1", common::TSDataType::INT64, + common::CompressionType::UNCOMPRESSED, + common::TSEncoding::PLAIN, + common::ColumnCategory::FIELD), + common::ColumnSchema("s2", common::TSDataType::INT32, + common::CompressionType::UNCOMPRESSED, + common::TSEncoding::PLAIN, + common::ColumnCategory::FIELD), + common::ColumnSchema("s3", common::TSDataType::FLOAT, + common::CompressionType::UNCOMPRESSED, + common::TSEncoding::PLAIN, + common::ColumnCategory::FIELD), + common::ColumnSchema("s4", common::TSDataType::DOUBLE, + common::CompressionType::UNCOMPRESSED, + common::TSEncoding::PLAIN, + common::ColumnCategory::FIELD), + }); + uint64_t memory_threshold = 128 * 1024 * 1024; + auto* writer = + new storage::TsFileTableWriter(write_file, schema, memory_threshold); + storage::Tablet tablet( + { + "id1", + "id2", + "s1", + "s2", + "s3", + "s4", + }, + { + common::TSDataType::STRING, + common::TSDataType::STRING, + common::TSDataType::INT64, + common::TSDataType::INT32, + common::TSDataType::FLOAT, + common::TSDataType::DOUBLE, + }, + max_rows); + insert_data_into_tablet(&tablet, max_rows); + writer->write_table(tablet); + writer->flush(); + writer->close(); + delete writer; + delete schema; + storage::TsFileReader reader; + reader.open(write_file->get_file_path()); + std::vector columns; + std::int64_t start_time = INT64_MIN; + std::int64_t end_time = INT64_MAX; + storage::ResultSet* temp_ret = nullptr; + reader.query(table_name, {"id1", "id2", "s1", "s2", "s3", "s4"}, start_time, + end_time, temp_ret); + auto ret = dynamic_cast(temp_ret); + std::cout << std::endl; + check(ret, max_rows); + ret->close(); + reader.close(); +} + +TEST_F(TsFileTableReaderTest, TestNullInTable) { + // 1. In some rows, all FIELD columns are empty. + test_null_table( + &write_file_, 10, + [](Tablet* tablet, int max_rows) { + for (int row = 0; row < max_rows; row++) { + int64_t timestamp = row; + tablet->add_timestamp(row, timestamp); + tablet->add_value(row, "id1", "id1"); + tablet->add_value(row, "id2", "id2"); + if (row % 2 == 0) { + tablet->add_value(row, "s1", static_cast(row)); + tablet->add_value(row, "s2", 1); + tablet->add_value(row, "s3", 1.1f); + tablet->add_value(row, "s4", 1.2); + } + } + }, + [](TableResultSet* result, int max_rows) { + bool has_next = false; + int line = 0; + while ((result->next(has_next)) == common::E_OK && has_next) { + line++; + if (result->get_value(1) % 2 != 0) { + ASSERT_TRUE(result->is_null("s1")); + ASSERT_TRUE(result->is_null("s2")); + ASSERT_TRUE(result->is_null("s3")); + ASSERT_TRUE(result->is_null("s4")); + } + ASSERT_FALSE(result->is_null("id1")); + ASSERT_FALSE(result->is_null("id2")); + } + ASSERT_EQ(line, max_rows); + }); +} + +TEST_F(TsFileTableReaderTest, TestNullInTable2) { + // 2. In some rows, the TAG column is entirely empty, + // and in some rows, all FIELD columns are empty. + test_null_table( + &write_file_, 10, + [](Tablet* tablet, int max_rows) { + for (int row = 0; row < max_rows; row++) { + int64_t timestamp = row; + tablet->add_timestamp(row, timestamp); + if (row % 2 == 0) { + tablet->add_value(row, "id1", "id1"); + tablet->add_value(row, "id2", "id2"); + } else { + tablet->add_value(row, "s1", static_cast(row)); + tablet->add_value(row, "s2", 1); + tablet->add_value(row, "s3", 1.1f); + tablet->add_value(row, "s4", 1.2); + } + } + }, + [](TableResultSet* result, int max_rows) { + bool has_next = false; + int line = 0; + while ((result->next(has_next)) == common::E_OK && has_next) { + line++; + bool even = result->get_value(1) % 2 == 0; + ASSERT_EQ(result->is_null("s1"), even); + ASSERT_EQ(result->is_null("s2"), even); + ASSERT_EQ(result->is_null("s3"), even); + ASSERT_EQ(result->is_null("s4"), even); + ASSERT_EQ(result->is_null("id1"), !even); + ASSERT_EQ(result->is_null("id2"), !even); + } + ASSERT_EQ(line, max_rows); + }); +} + +TEST_F(TsFileTableReaderTest, TestNullInTable3) { + // 3. In some rows, the TAG and Field columns are entirely empty, + test_null_table( + &write_file_, 10, + [](Tablet* tablet, int max_rows) { + for (int row = 0; row < max_rows; row++) { + int64_t timestamp = row; + tablet->add_timestamp(row, timestamp); + if (row % 2 == 0) { + tablet->add_value(row, "id1", "id1"); + tablet->add_value(row, "id2", "id2"); + tablet->add_value(row, "s1", static_cast(row)); + tablet->add_value(row, "s2", 1); + tablet->add_value(row, "s3", 1.1f); + tablet->add_value(row, "s4", 1.2); + } + } + }, + [](TableResultSet* result, int max_rows) { + bool has_next = false; + int line = 0; + while ((result->next(has_next)) == common::E_OK && has_next) { + line++; + bool odd = result->get_value(1) % 2 != 0; + ASSERT_EQ(result->is_null("s1"), odd); + ASSERT_EQ(result->is_null("s2"), odd); + ASSERT_EQ(result->is_null("s3"), odd); + ASSERT_EQ(result->is_null("s4"), odd); + ASSERT_EQ(result->is_null("id1"), odd); + ASSERT_EQ(result->is_null("id2"), odd); + } + ASSERT_EQ(line, max_rows); + }); +} + +TEST_F(TsFileTableReaderTest, TestNullInTable4) { + // 3. In some rows, the TAG and Field columns are entirely empty, + test_null_table( + &write_file_, 1000000, + [](Tablet* tablet, int max_rows) { + for (int row = 0; row < max_rows; row++) { + int64_t timestamp = row; + tablet->add_timestamp(row, timestamp); + tablet->add_value(row, "id1", "id1"); + tablet->add_value(row, "id2", "id2"); + if (row < 10) { + tablet->add_value(row, "s1", static_cast(row)); + tablet->add_value(row, "s2", 1); + tablet->add_value(row, "s3", 1.1f); + tablet->add_value(row, "s4", 1.2); + } + } + }, + [](TableResultSet* result, int max_rows) { + bool has_next = false; + int line = 0; + while ((result->next(has_next)) == common::E_OK && has_next) { + line++; + bool available = result->get_value(1) < 10; + ASSERT_EQ(!result->is_null("s1"), available); + ASSERT_EQ(!result->is_null("s2"), available); + ASSERT_EQ(!result->is_null("s3"), available); + ASSERT_EQ(!result->is_null("s4"), available); + } + ASSERT_EQ(line, max_rows); + }); +} From 082a6b796880c4fb27a2b2947fe38a6cc255ef30 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 18 Dec 2025 18:46:50 +0800 Subject: [PATCH 2/5] tmp code. --- cpp/src/common/tsfile_common.h | 2 +- cpp/src/encoding/plain_decoder.h | 3 ++- cpp/src/reader/aligned_chunk_reader.cc | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h index 39cd027ef..12b30523b 100644 --- a/cpp/src/common/tsfile_common.h +++ b/cpp/src/common/tsfile_common.h @@ -589,7 +589,7 @@ class AlignedTimeseriesIndex : public ITimeseriesIndex { return value_ts_idx_->get_measurement_name(); } virtual common::TSDataType get_data_type() const { - return time_ts_idx_->get_data_type(); + return value_ts_idx_->get_data_type(); } virtual Statistic* get_statistic() const { return value_ts_idx_->get_statistic(); diff --git a/cpp/src/encoding/plain_decoder.h b/cpp/src/encoding/plain_decoder.h index 27e8006f4..c2627f71d 100644 --- a/cpp/src/encoding/plain_decoder.h +++ b/cpp/src/encoding/plain_decoder.h @@ -27,7 +27,8 @@ namespace storage { class PlainDecoder : public Decoder { public: ~PlainDecoder() override = default; - FORCE_INLINE void reset() override { /* do nothing */ } + FORCE_INLINE void reset() override { /* do nothing */ + } FORCE_INLINE bool has_remaining(const common::ByteStream& buffer) override { return buffer.has_remaining(); } diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc index e4779184f..cf3ca8326 100644 --- a/cpp/src/reader/aligned_chunk_reader.cc +++ b/cpp/src/reader/aligned_chunk_reader.cc @@ -673,7 +673,7 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK( int64_t time = 0; common::String value; uint32_t mask = 1 << 7; - while (time_decoder_->has_remaining(time_in)) { + while (time_decoder_->has_remaining(time_in) && value_decoder_->has_remaining(value_in)) { cur_value_index++; bool should_read_data = true; if (value_page_col_notnull_bitmap_.empty() || From 2c601533b7ea5cf4ddbe90f69ec3cab52a48cdba Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 18 Dec 2025 20:54:37 +0800 Subject: [PATCH 3/5] fix. --- cpp/src/common/tsfile_common.h | 2 +- cpp/src/reader/aligned_chunk_reader.cc | 8 +------- .../table_view/tsfile_reader_table_test.cc | 18 ++++++++++++++++-- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h index 12b30523b..39cd027ef 100644 --- a/cpp/src/common/tsfile_common.h +++ b/cpp/src/common/tsfile_common.h @@ -589,7 +589,7 @@ class AlignedTimeseriesIndex : public ITimeseriesIndex { return value_ts_idx_->get_measurement_name(); } virtual common::TSDataType get_data_type() const { - return value_ts_idx_->get_data_type(); + return time_ts_idx_->get_data_type(); } virtual Statistic* get_statistic() const { return value_ts_idx_->get_statistic(); diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc index cf3ca8326..e448989a3 100644 --- a/cpp/src/reader/aligned_chunk_reader.cc +++ b/cpp/src/reader/aligned_chunk_reader.cc @@ -673,7 +673,7 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK( int64_t time = 0; common::String value; uint32_t mask = 1 << 7; - while (time_decoder_->has_remaining(time_in) && value_decoder_->has_remaining(value_in)) { + while (time_decoder_->has_remaining(time_in)) { cur_value_index++; bool should_read_data = true; if (value_page_col_notnull_bitmap_.empty() || @@ -681,12 +681,6 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK( (mask >> (cur_value_index % 8)) == 0) { should_read_data = false; } - if (should_read_data) { - assert(value_decoder_->has_remaining(value_in)); - if (!value_decoder_->has_remaining(value_in)) { - return E_DATA_INCONSISTENCY; - } - } if (UNLIKELY(!row_appender.add_row())) { ret = E_OVERFLOW; cur_value_index--; diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc index c59c0259b..4c4bef5cb 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc @@ -509,6 +509,10 @@ void test_null_table(WriteFile* write_file, int max_rows, common::CompressionType::UNCOMPRESSED, common::TSEncoding::PLAIN, common::ColumnCategory::FIELD), + common::ColumnSchema("s5", common::TSDataType::STRING, + common::CompressionType::UNCOMPRESSED, + common::TSEncoding::PLAIN, + common::ColumnCategory::FIELD), }); uint64_t memory_threshold = 128 * 1024 * 1024; auto* writer = @@ -521,6 +525,7 @@ void test_null_table(WriteFile* write_file, int max_rows, "s2", "s3", "s4", + "s5", }, { common::TSDataType::STRING, @@ -529,6 +534,7 @@ void test_null_table(WriteFile* write_file, int max_rows, common::TSDataType::INT32, common::TSDataType::FLOAT, common::TSDataType::DOUBLE, + common::TSDataType::STRING, }, max_rows); insert_data_into_tablet(&tablet, max_rows); @@ -543,8 +549,8 @@ void test_null_table(WriteFile* write_file, int max_rows, std::int64_t start_time = INT64_MIN; std::int64_t end_time = INT64_MAX; storage::ResultSet* temp_ret = nullptr; - reader.query(table_name, {"id1", "id2", "s1", "s2", "s3", "s4"}, start_time, - end_time, temp_ret); + reader.query(table_name, {"id1", "id2", "s1", "s2", "s3", "s4", "s5"}, + start_time, end_time, temp_ret); auto ret = dynamic_cast(temp_ret); std::cout << std::endl; check(ret, max_rows); @@ -567,6 +573,7 @@ TEST_F(TsFileTableReaderTest, TestNullInTable) { tablet->add_value(row, "s2", 1); tablet->add_value(row, "s3", 1.1f); tablet->add_value(row, "s4", 1.2); + tablet->add_value(row, "s5", "test"); } } }, @@ -580,6 +587,7 @@ TEST_F(TsFileTableReaderTest, TestNullInTable) { ASSERT_TRUE(result->is_null("s2")); ASSERT_TRUE(result->is_null("s3")); ASSERT_TRUE(result->is_null("s4")); + ASSERT_TRUE(result->is_null("s5")); } ASSERT_FALSE(result->is_null("id1")); ASSERT_FALSE(result->is_null("id2")); @@ -605,6 +613,7 @@ TEST_F(TsFileTableReaderTest, TestNullInTable2) { tablet->add_value(row, "s2", 1); tablet->add_value(row, "s3", 1.1f); tablet->add_value(row, "s4", 1.2); + tablet->add_value(row, "s5", "test"); } } }, @@ -618,6 +627,7 @@ TEST_F(TsFileTableReaderTest, TestNullInTable2) { ASSERT_EQ(result->is_null("s2"), even); ASSERT_EQ(result->is_null("s3"), even); ASSERT_EQ(result->is_null("s4"), even); + ASSERT_EQ(result->is_null("s5"), even); ASSERT_EQ(result->is_null("id1"), !even); ASSERT_EQ(result->is_null("id2"), !even); } @@ -640,6 +650,7 @@ TEST_F(TsFileTableReaderTest, TestNullInTable3) { tablet->add_value(row, "s2", 1); tablet->add_value(row, "s3", 1.1f); tablet->add_value(row, "s4", 1.2); + tablet->add_value(row, "s5", "test"); } } }, @@ -653,6 +664,7 @@ TEST_F(TsFileTableReaderTest, TestNullInTable3) { ASSERT_EQ(result->is_null("s2"), odd); ASSERT_EQ(result->is_null("s3"), odd); ASSERT_EQ(result->is_null("s4"), odd); + ASSERT_EQ(result->is_null("s5"), odd); ASSERT_EQ(result->is_null("id1"), odd); ASSERT_EQ(result->is_null("id2"), odd); } @@ -675,6 +687,7 @@ TEST_F(TsFileTableReaderTest, TestNullInTable4) { tablet->add_value(row, "s2", 1); tablet->add_value(row, "s3", 1.1f); tablet->add_value(row, "s4", 1.2); + tablet->add_value(row, "s5", "test"); } } }, @@ -688,6 +701,7 @@ TEST_F(TsFileTableReaderTest, TestNullInTable4) { ASSERT_EQ(!result->is_null("s2"), available); ASSERT_EQ(!result->is_null("s3"), available); ASSERT_EQ(!result->is_null("s4"), available); + ASSERT_EQ(!result->is_null("s5"), available); } ASSERT_EQ(line, max_rows); }); From 78835a8e6c70f5d07e5b79fc3775f50c1c537493 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Fri, 19 Dec 2025 10:38:49 +0800 Subject: [PATCH 4/5] fix code. --- .github/workflows/unit-test-cpp.yml | 1 + .github/workflows/unit-test-python.yml | 1 + cpp/src/reader/aligned_chunk_reader.cc | 12 ++++++++++-- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/.github/workflows/unit-test-cpp.yml b/.github/workflows/unit-test-cpp.yml index d4172f8b4..15a629914 100644 --- a/.github/workflows/unit-test-cpp.yml +++ b/.github/workflows/unit-test-cpp.yml @@ -111,6 +111,7 @@ jobs: if [[ "$RUNNER_OS" == "Linux" ]]; then sudo update-alternatives --install /usr/bin/clang-format clang-format /usr/bin/clang-format-17 100 sudo update-alternatives --set clang-format /usr/bin/clang-format-17 + sudo apt-get update sudo apt-get install -y uuid-dev elif [[ "$RUNNER_OS" == "Windows" ]]; then choco install llvm --version 17.0.6 --force diff --git a/.github/workflows/unit-test-python.yml b/.github/workflows/unit-test-python.yml index 72be32363..f1f799b7b 100644 --- a/.github/workflows/unit-test-python.yml +++ b/.github/workflows/unit-test-python.yml @@ -82,6 +82,7 @@ jobs: if [[ "$RUNNER_OS" == "Linux" ]]; then sudo update-alternatives --install /usr/bin/clang-format clang-format /usr/bin/clang-format-17 100 sudo update-alternatives --set clang-format /usr/bin/clang-format-17 + sudo apt-get update sudo apt-get install -y uuid-dev elif [[ "$RUNNER_OS" == "Windows" ]]; then choco install llvm --version 17.0.6 --force diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc index e448989a3..14250e7f8 100644 --- a/cpp/src/reader/aligned_chunk_reader.cc +++ b/cpp/src/reader/aligned_chunk_reader.cc @@ -677,10 +677,18 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK( cur_value_index++; bool should_read_data = true; if (value_page_col_notnull_bitmap_.empty() || - (value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) & - (mask >> (cur_value_index % 8)) == 0) { + ((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) & + (mask >> (cur_value_index % 8))) == 0) { should_read_data = false; } + + if (should_read_data) { + assert(value_decoder_->has_remaining(value_in)); + if (!value_decoder_->has_remaining(value_in)) { + return E_DATA_INCONSISTENCY; + } + } + if (UNLIKELY(!row_appender.add_row())) { ret = E_OVERFLOW; cur_value_index--; From 4dbffc36c41bd0246f106b5b7f9f1ab02d7253ad Mon Sep 17 00:00:00 2001 From: colin Date: Fri, 19 Dec 2025 11:10:32 +0800 Subject: [PATCH 5/5] fix memory leak. --- cpp/test/reader/table_view/tsfile_reader_table_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc index 4c4bef5cb..c281de413 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc @@ -555,6 +555,7 @@ void test_null_table(WriteFile* write_file, int max_rows, std::cout << std::endl; check(ret, max_rows); ret->close(); + reader.destroy_query_data_set(ret); reader.close(); }