Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1136,10 +1136,6 @@ class FlatMapFieldWriter : public FieldWriter {
return;
}

// Only create keys on first call to write (with valid ranges). Subsequent
// calls must have the same set of keys, otherwise writer will throw.
bool populateMap = currentPassthroughFields_.empty();

const auto& values = flatMapVector->mapValues();
const auto& inMaps = flatMapVector->inMaps();

Expand All @@ -1158,8 +1154,13 @@ class FlatMapFieldWriter : public FieldWriter {
"FlatMapVector keys are not distinct.");
distinctKeySet.insert(key);

auto& writer = populateMap ? createPassthroughValueFieldWriter(key)
: findPassthroughValueFieldWriter(key);
// Only create keys on first call to write (with valid ranges). To
// account for key pruning, let's avoid failing on unfound subsequent
// keys.
auto existingPair = currentPassthroughFields_.find(key);
auto& writer = existingPair != currentPassthroughFields_.end()
? *existingPair->second
: createPassthroughValueFieldWriter(key);

if (inMaps[i]) {
writer.write(values[i], inMaps[i], childRanges);
Expand Down
72 changes: 55 additions & 17 deletions dwio/nimble/velox/selective/FlatMapColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ std::vector<KeyNode<T>> makeKeyNodes(

// Adjust the scan spec according to the output type.
switch (outputType) {
// For a kMap output, just need a scan spec for map keys and one for map
// values.
case FlatMapOutput::kMap: {
// For a kMap and kFlatMap output, just need a scan spec for map keys and
// one for map values.
case FlatMapOutput::kMap:
case FlatMapOutput::kFlatMap: {
keysSpec = scanSpec.getOrCreateChild(
common::Subfield(common::ScanSpec::kMapKeysFieldName));
valuesSpec = scanSpec.getOrCreateChild(
Expand All @@ -78,18 +79,6 @@ std::vector<KeyNode<T>> makeKeyNodes(
valuesSpec->setProjectOut(true);
break;
}
// For a kFlatMap output, need to find the streams (distinct keys) to read
// from the file (nimbleType).
case FlatMapOutput::kFlatMap: {
for (int i = 0; i < childrenCount; ++i) {
auto key = parseKeyValue<T>(nimbleType.nameAt(i));
auto spec = scanSpec.getOrCreateChild(nimbleType.nameAt(i));
spec->setProjectOut(true);
spec->setChannel(i);
childSpecs[key] = spec;
}
break;
}
// For a kStruct output, the streams to be read are part of the scan spec
// already.
case FlatMapOutput::kStruct: {
Expand All @@ -113,7 +102,7 @@ std::vector<KeyNode<T>> makeKeyNodes(
if (auto it = childSpecs.find(node.key);
it != childSpecs.end() && !it->second->isConstant()) {
childSpec = it->second;
} else if (outputType != FlatMapOutput::kMap) {
} else if (outputType == FlatMapOutput::kStruct) {
// Column not selected in 'scanSpec', skipping it.
continue;
} else {
Expand Down Expand Up @@ -246,7 +235,6 @@ class FlatMapColumnReader
for (int i = 0; i < keyNodes_.size(); ++i) {
keyNodes_[i].reader->scanSpec()->setSubscript(i);
children_[i] = keyNodes_[i].reader.get();

rawKeys[i] = keyNodes_[i].key.get();
}
}
Expand Down Expand Up @@ -308,6 +296,56 @@ class FlatMapColumnReader
// decoders.
}

// Same as FlatMapAsMapColumnReader.
void read(int64_t offset, const RowSet& rows, const uint64_t* incomingNulls) {
numReads_ = scanSpec_->newRead();
prepareRead<char>(offset, rows, incomingNulls);
VELOX_DCHECK(!hasDeletion());
auto activeRows = rows;
auto* mapNulls =
nullsInReadRange_ ? nullsInReadRange_->as<uint64_t>() : nullptr;
if (scanSpec_->filter()) {
auto kind = scanSpec_->filter()->kind();
VELOX_CHECK(
kind == velox::common::FilterKind::kIsNull ||
kind == velox::common::FilterKind::kIsNotNull);
filterNulls<int32_t>(
rows, kind == velox::common::FilterKind::kIsNull, false);
if (outputRows_.empty()) {
for (auto* child : children_) {
child->addParentNulls(offset, mapNulls, rows);
}
readOffset_ = offset + rows.back() + 1;
return;
}
activeRows = outputRows_;
}
// Separate the loop to be cache friendly.
for (auto* child : children_) {
advanceFieldReader(child, offset);
}
for (auto* child : children_) {
child->read(offset, activeRows, mapNulls);
child->addParentNulls(offset, mapNulls, rows);
}
readOffset_ = offset + rows.back() + 1;
}

void getValues(const RowSet& rows, VectorPtr* result) override {
SelectiveFlatMapColumnReader::getValues(rows, result);

// After reading the flat map streams recursively, need to read the in map
// buffers.
VELOX_CHECK(result && *result);
auto flatMapVector = (*result)->as<FlatMapVector>();
VELOX_CHECK(flatMapVector);

for (int i = 0; i < keyNodes_.size(); ++i) {
auto& nimbleData = children_[i]->formatData().template as<NimbleData>();
flatMapVector->inMapsAt(i, true) = nimbleData.inMapBuffer();
}
}

private:
std::vector<KeyNode<T>> keyNodes_;
};
Expand Down
39 changes: 38 additions & 1 deletion dwio/nimble/velox/selective/tests/SelectiveNimbleReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1523,7 +1523,6 @@ TEST_P(SelectiveNimbleReaderTest, nativeFlatMap) {

// Dictionary wrapped keys
{
LOG(INFO) << "Dictionary wrapped keys";
testRoundtrip(constructFlatMap(
BaseVector::wrapInDictionary(
nullptr,
Expand Down Expand Up @@ -1564,6 +1563,44 @@ TEST_P(SelectiveNimbleReaderTest, nativeFlatMap) {
3, 0, makeFlatVector<int32_t>({1, 2, 3})))),
"FlatMapVector keys are not distinct.");
}

// Scan spec with map key filter
{
auto inputFlatMap = makeFlatMapVector<int64_t, int64_t>({
{{{1, 100}, {5, 500}, {10, 1000}, {15, 1500}, {20, 2000}}},
{{{2, 200}, {8, 800}, {12, 1200}}},
{{{3, 300}, {7, 700}, {18, 1800}, {25, 2500}}},
});
auto input = makeRowVector({inputFlatMap, inputFlatMap->toMapVector()});

VeloxWriterOptions writerOptions;
writerOptions.flatMapColumns = {"c0"};
auto fileContent =
test::createNimbleFile(*rootPool(), input, writerOptions, true);

// Test with map key filter [5, 12]
auto scanSpec = std::make_shared<common::ScanSpec>("root");
scanSpec->addAllChildFields(*input->type());
scanSpec->childByName("c0")
->childByName(common::ScanSpec::kMapKeysFieldName)
->setFilter(std::make_unique<common::BigintRange>(5, 12, false));
scanSpec->childByName("c1")
->childByName(common::ScanSpec::kMapKeysFieldName)
->setFilter(std::make_unique<common::BigintRange>(5, 12, false));
auto readers = makeReaders(input, fileContent, scanSpec, true);

// Expected output after filtering: only keys in [5, 12] remain
auto expectedFlatMap = makeFlatMapVector<int64_t, int64_t>({
{{{5, 500}, {10, 1000}}},
{{{8, 800}, {12, 1200}}},
{{{7, 700}}},
});
auto expected =
makeRowVector({expectedFlatMap->toMapVector(), expectedFlatMap});
validate(*expected, *readers.rowReader, inputFlatMap->size(), [](auto) {
return true;
});
}
}

TEST_P(SelectiveNimbleReaderTest, mapAsStruct) {
Expand Down
Loading