Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions dbms/src/Core/Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,14 @@ void Block::checkNumberOfRows() const
{
auto first_col = data.front();
throw Exception(
fmt::format(
"Sizes of columns doesn't match: {}(id={}): {}, {}(id={}): {}",
first_col.name,
first_col.column_id,
rows,
elem.name,
elem.column_id,
size),
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH,
"Sizes of columns doesn't match: {}(id={}): rows={}, {}(id={}): rows={}",
first_col.name,
first_col.column_id,
rows,
elem.name,
elem.column_id,
size);
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,20 @@ static inline bool atomicReadWrite(
std::tie(decoding_schema_snapshot, block_ptr)
= storage->getSchemaSnapshotAndBlockForDecoding(lock, true, should_handle_version_col);
block_decoding_schema_epoch = decoding_schema_snapshot->decoding_schema_epoch;
#if 0
const auto & table_info = storage->getTableInfo();
LOG_INFO(
Logger::get("dddddddddd"),
"decode with schema snapshot, keyspace={} table_id={} region_id={} epoch={} update_ts={} "
"with_version_column={} columns={}",
rw_ctx.keyspace_id,
rw_ctx.table_id,
region->id(),
block_decoding_schema_epoch,
table_info.update_timestamp,
should_handle_version_col,
table_info.columns.size());
#endif

auto reader = RegionBlockReader(decoding_schema_snapshot);
if (!reader.read(*block_ptr, data_list_read, force_decode))
Expand Down
23 changes: 20 additions & 3 deletions dbms/src/Storages/KVStore/Decode/RegionBlockReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ bool RegionBlockReader::read(Block & block, const ReadList & data_list, bool for
{
try
{
#if 0
for (const auto & data : data_list)
{
LOG_INFO(Logger::get("dddddddddd"), "RegionBlockReader::read data.value={}", data.value->toDebugString());
}
#endif

switch (schema_snapshot->pk_type)
{
case TMTPKType::INT64:
Expand Down Expand Up @@ -177,8 +184,16 @@ struct VersionColResolver<RegionUncommittedDataList>
template <TMTPKType pk_type, typename ReadList>
bool RegionBlockReader::readImpl(Block & block, const ReadList & data_list, bool force_decode)
{
#if 0
LOG_INFO(
Logger::get("dddddddddd"),
"RegionBlockReader::readImpl start, schema_snapshot.column_defines={}",
*schema_snapshot->column_defines);
#endif

VersionColResolver<ReadList> version_col_resolver;
version_col_resolver.check(block, schema_snapshot->column_defines->size());
// The column_ids to read according to schema_snapshot, each elem is (column_id, block_pos)
const auto & read_column_ids = schema_snapshot->getColId2BlockPosMap();
const auto & pk_column_ids = schema_snapshot->pk_column_ids;
const auto & pk_pos_map = schema_snapshot->pk_pos_map;
Expand All @@ -194,12 +209,12 @@ bool RegionBlockReader::readImpl(Block & block, const ReadList & data_list, bool
/// extra handle, del, version column is with column id smaller than other visible column id,
/// so they must exists before all other columns, and we can get them before decoding other columns
ColumnUInt8 * raw_delmark_col = nullptr;
const size_t invalid_column_pos = std::numeric_limits<size_t>::max();
const static size_t INVALID_COLUMN_POS = std::numeric_limits<size_t>::max();
// we cannot figure out extra_handle's column type now, so we just remember it's pos here
size_t extra_handle_column_pos = invalid_column_pos;
size_t extra_handle_column_pos = INVALID_COLUMN_POS;

while (raw_delmark_col == nullptr || version_col_resolver.needBuild()
|| extra_handle_column_pos == invalid_column_pos)
|| extra_handle_column_pos == INVALID_COLUMN_POS)
{
if (column_ids_iter->first == MutSup::delmark_col_id)
{
Expand Down Expand Up @@ -269,6 +284,8 @@ bool RegionBlockReader::readImpl(Block & block, const ReadList & data_list, bool
else
{
// Parse column value from encoded value
// Decode the column_ids from `column_ids_iter` to `read_column_ids.end()`
// and insert into `block` at position starting from `next_column_pos`
if (!appendRowToBlock(
*value_ptr,
column_ids_iter,
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,13 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
{
auto tikv_key = TiKVKey(cmds.keys[i].data, cmds.keys[i].len);
auto tikv_value = TiKVValue(cmds.vals[i].data, cmds.vals[i].len);
#if 0
LOG_INFO(
Logger::get("dddddddddd"),
"Region::handleWriteRaftCmd Put key={}, value={}",
tikv_key.toDebugString(),
tikv_value.toDebugString());
#endif
if (cf == ColumnFamilyType::Write)
{
write_put_key_count++;
Expand Down
86 changes: 84 additions & 2 deletions dbms/src/Storages/KVStore/tests/gtest_region_block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,9 @@ TEST_F(RegionBlockReaderTest, MissingColumnRowV2)
encodeColumns(table_info, fields, RowEncodeVersion::RowV2);
auto new_table_info = getTableInfoWithMoreColumns({MutSup::extra_handle_id}, false);
auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info);
ASSERT_TRUE(decodeAndCheckColumns(new_decoding_schema, false));
// `decodeAndCheckColumns` will return false because "column1" not_null=true but no_default_value=false
// FIXME: Re check the logic here
ASSERT_FALSE(decodeAndCheckColumns(new_decoding_schema, false));
}

TEST_F(RegionBlockReaderTest, MissingColumnRowV1)
Expand All @@ -363,7 +365,9 @@ TEST_F(RegionBlockReaderTest, MissingColumnRowV1)
encodeColumns(table_info, fields, RowEncodeVersion::RowV1);
auto new_table_info = getTableInfoWithMoreColumns({MutSup::extra_handle_id}, false);
auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info);
ASSERT_TRUE(decodeAndCheckColumns(new_decoding_schema, false));
// `decodeAndCheckColumns` will return false because "column1" not_null=true but no_default_value=false
// FIXME: Re check the logic here
ASSERT_FALSE(decodeAndCheckColumns(new_decoding_schema, false));
}

TEST_F(RegionBlockReaderTest, ExtraColumnRowV2)
Expand Down Expand Up @@ -693,4 +697,82 @@ try
CATCH


TEST_F(RegionBlockReaderTest, ReadFromRegionDefaultValue)
try
{
// With this table_info, c1 is filled with "0" according to ori_default
TableInfo table_info(
R"({"cols":[{"id":1,"name":{"L":"c0","O":"c0"},"offset":0,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":4,"Tp":1}},{"id":2,"name":{"L":"handle","O":"handle"},"offset":1,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":515,"Flen":11,"Tp":3}},{"default":"-56083770","id":7,"name":{"L":"c1","O":"c1"},"offset":2,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":1,"Flen":20,"Tp":8}},{"id":4,"name":{"L":"c2","O":"c2"},"offset":3,"origin_default":"0.07954397","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":-1,"Flag":4097,"Flen":12,"Tp":4}},{"id":5,"name":{"L":"c5","O":"c5"},"offset":4,"origin_default":"0","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":10,"Tp":246}},{"default":"247262911","id":6,"name":{"L":"c4","O":"c4"},"offset":5,"origin_default":"247262911","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":10,"Tp":246}}],"id":711,"index_info":[],"is_common_handle":false,"keyspace_id":4294967295,"name":{"L":"t0","O":"t0"},"pk_is_handle":true,"state":5,"tiflash_replica":{"Available":true,"Count":1},"update_timestamp":463845180343844895})",
NullspaceID);

RegionID region_id = 4;
// the start_key and end_key for table_id = 68
String region_start_key(bytesFromHexString("7480000000000002FF7C5F720000000000FA"));
String region_end_key(bytesFromHexString("7480000000000002FF7D00000000000000F8"));
auto region = RegionBench::makeRegionForRange(region_id, region_start_key, region_end_key);
// the hex kv dump from SSTFile
std::vector<std::tuple<std::string_view, std::string_view>> kvs = {
// {
// "7480000000000002FFA95F728000000000FF0000010000000000FAF9901806DEF7FFDA",
// "50A380A08892FFF9B706762C8000040000000405060708000F0016001A00BFDC4011A00000000A0080000000000A008000003CA339"
// "1ABC85",
// },
// {
// "7480000000000002FFA95F728000000000FF0000010000000000FAF9901806DEF7FFD8",
// "50A680A08892FFF9B706762C8000040000000405060708000F0016001A00BFDC4011A00000000A008033E04D600A008000003CA339"
// "1ABC85",
// },
{
"7480000000000002FFA95F728000000000FF0000020000000000FAF9901806DE33FFE8",
"509680B08E92FFF9B706762580000300000004050608000F001600BF720CDD400000000A0080000000010A00800000393C",
},
};
for (const auto & [k, v] : kvs)
{
region->insertDebug("write", TiKVKey(bytesFromHexString(k)), TiKVValue(bytesFromHexString(v)));
}

auto data_list_read = ReadRegionCommitCache(region, true);
ASSERT_TRUE(data_list_read.has_value());

auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info);
{
// force_decode=false can not decode because there are
// missing value for column with not null flag.
auto reader = RegionBlockReader(decoding_schema);
Block res_block = createBlockSortByColumnID(decoding_schema);
ASSERT_FALSE(reader.read(res_block, *data_list_read, false));
}
{
// force_decode=true can decode the block, and filling the default value for c1
auto reader = RegionBlockReader(decoding_schema);
Block res_block = createBlockSortByColumnID(decoding_schema);
ASSERT_TRUE(reader.read(res_block, *data_list_read, true));
// TODO: verify the default value is filled correctly
LOG_INFO(
Logger::get(),
"Decoded block:\n{}",
DB::tests::getColumnsContent(res_block.getColumnsWithTypeAndName()));
}

// With this table_info, c1 does not have the "not null" flag
TableInfo table_info_c1_nullable(
R"({"cols":[{"id":1,"name":{"L":"c0","O":"c0"},"offset":0,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":4,"Tp":1}},{"id":2,"name":{"L":"handle","O":"handle"},"offset":1,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":515,"Flen":11,"Tp":3}},{"id":7,"name":{"L":"c1","O":"c1"},"offset":2,"state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":20,"Tp":8}},{"id":4,"name":{"L":"c2","O":"c2"},"offset":3,"origin_default":"0.07954397","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":-1,"Flag":4097,"Flen":12,"Tp":4}},{"id":5,"name":{"L":"c5","O":"c5"},"offset":4,"origin_default":"0","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":10,"Tp":246}},{"default":"247262911","id":6,"name":{"L":"c4","O":"c4"},"offset":5,"origin_default":"247262911","state":5,"type":{"Charset":"binary","Collate":"binary","Decimal":0,"Flag":0,"Flen":10,"Tp":246}}],"id":681,"index_info":[],"is_common_handle":false,"keyspace_id":4294967295,"name":{"L":"t0","O":"t0"},"pk_is_handle":true,"state":5,"tiflash_replica":{"Available":true,"Count":1},"update_timestamp":463844343842340870})",
NullspaceID);

decoding_schema = getDecodingStorageSchemaSnapshot(table_info_c1_nullable);
{
// force_decode=false should be able to decode because c1 is nullable
auto reader = RegionBlockReader(decoding_schema);
Block res_block = createBlockSortByColumnID(decoding_schema);
ASSERT_TRUE(reader.read(res_block, *data_list_read, false));
// TODO: verify the default value is filled correctly
LOG_INFO(
Logger::get(),
"Decoded block:\n{}",
DB::tests::getColumnsContent(res_block.getColumnsWithTypeAndName()));
}
}
CATCH

} // namespace DB::tests
10 changes: 10 additions & 0 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,16 @@ std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> StorageDeltaMerg
store->getHandle(),
decoding_schema_epoch++,
with_version_column);
#if 0
LOG_INFO(
Logger::get("dddddddddd"),
"Refresh decoding schema snapshot, table_id={} epoch={} update_ts={} with_version_column={} table_info={}",
tidb_table_info.id,
decoding_schema_snapshot->decoding_schema_epoch,
tidb_table_info.update_timestamp,
with_version_column,
tidb_table_info.serialize());
#endif
cache_blocks.clear();
decoding_schema_changed = false;
}
Expand Down
69 changes: 65 additions & 4 deletions dbms/src/TiDB/Decode/RowCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <TiDB/Decode/RowCodec.h>
#include <TiDB/Schema/TiDB.h>

#include "Common/FieldVisitors.h"


namespace DB
{
Expand Down Expand Up @@ -412,10 +414,15 @@ inline bool addDefaultValueToColumnIfPossible(
// fallthrough to fill default value when force_decode
}

if (column_info.hasNoDefaultValueFlag() && column_info.hasNotNullFlag())
if (column_info.hasNotNullFlag())
{
if (!force_decode)
{
// This is a Column that does not have encoded datum in the value, but it is defined as NOT NULL.
// It could be a row encoded by newer schema after turning `NOT NULL` to `NULLABLE`.
// Return false to trigger schema sync when `force_decode==false`.
return false;
}
// Else the row does not contain this "not null" / "no default value" column,
// it could be a row encoded by older schema.
// fallthrough to fill default value when force_decode
Expand Down Expand Up @@ -459,8 +466,18 @@ bool appendRowV2ToBlockImpl(
raw_value,
num_not_null_columns,
value_offsets);
#if 0
LOG_INFO(
Logger::get("dddddddddd"),
"not_null_column_ids={} null_column_ids={}",
not_null_column_ids,
null_column_ids);
#endif

size_t values_start_pos = cursor;
// how many not null columns have been processed
size_t idx_not_null = 0;
// how many null columns have been processed
size_t idx_null = 0;
// Merge ordered not null/null columns to keep order.
while (idx_not_null < not_null_column_ids.size() || idx_null < null_column_ids.size())
Expand All @@ -479,33 +496,59 @@ bool appendRowV2ToBlockImpl(

auto next_datum_column_id = is_null ? null_column_ids[idx_null] : not_null_column_ids[idx_not_null];
const auto next_column_id = column_ids_iter->first;
#if 0
LOG_INFO(
Logger::get("dddddddddd"),
"next_column_id={} next_datum_column_id={} force_decode={}",
next_column_id,
next_datum_column_id,
force_decode);
#endif
if (next_column_id > next_datum_column_id)
{
// The next column id to read is bigger than the column id of next datum in encoded row.
// The next_column_id to read is bigger than the next_datum_column_id in encoded row.
// It means this is the datum of extra column. May happen when reading after dropping
// a column.
// For `force_decode == false`, we should return false to let upper layer trigger schema sync.
if (!force_decode)
return false;
// Ignore the extra column and continue to parse other datum
// For `force_decode == true`, we just skip this extra column and continue to parse other datum.
if (is_null)
idx_null++;
else
idx_not_null++;
}
else if (next_column_id < next_datum_column_id)
{
// The next column id to read is less than the column id of next datum in encoded row.
// The next_column_id to read is less than the next_datum_column_id in encoded row.
// It means this is the datum of missing column. May happen when reading after adding
// a column.
// Fill with default value and continue to read data for next column id.
const auto & column_info = column_infos[column_ids_iter->second];
#if 0
LOG_INFO(
Logger::get("dddddddddd"),
"appendRowV2ToBlockImpl: fill default value for missing column,"
" next_column_id={} next_datum_column_id={} block_column_pos={}"
" column_info={{name={} id={} not_null={} no_default_val={} default_value={}}}",
next_column_id,
next_datum_column_id,
block_column_pos,
column_info.name,
column_info.id,
column_info.hasNotNullFlag(),
column_info.hasNoDefaultValueFlag(),
applyVisitor(FieldVisitorToString(), column_info.defaultValueToField()));
#endif
if (!addDefaultValueToColumnIfPossible(
column_info,
block,
block_column_pos,
ignore_pk_if_absent,
force_decode))
{
return false;
}
column_ids_iter++;
block_column_pos++;
}
Expand Down Expand Up @@ -570,18 +613,36 @@ bool appendRowV2ToBlockImpl(
block_column_pos++;
}
}
// There are more columns to read other than the datum encoded in the row.
while (column_ids_iter != column_ids_iter_end)
{
// Skip if the column is the same as `pk_handle_id`. The value of column
// `pk_handle_id` will be filled in upper layer but not in this function.
if (column_ids_iter->first != pk_handle_id)
{
const auto & column_info = column_infos[column_ids_iter->second];
#if 0
LOG_INFO(
Logger::get("dddddddddd"),
"appendRowV2ToBlockImpl: fill default value for missing column,"
" block_column_pos={}"
" column_info={{name={} id={} not_null={} no_default_val={} default_value={}}}",
block_column_pos,
column_info.name,
column_info.id,
column_info.hasNotNullFlag(),
column_info.hasNoDefaultValueFlag(),
applyVisitor(FieldVisitorToString(), column_info.defaultValueToField()));
#endif
if (!addDefaultValueToColumnIfPossible(
column_info,
block,
block_column_pos,
ignore_pk_if_absent,
force_decode))
{
return false;
}
}
column_ids_iter++;
block_column_pos++;
Expand Down