Skip to content

Commit bfcc616

Browse files
committed
parquet: refine encrypted bloom filter deserialize helpers
1 parent bf9ffb5 commit bfcc616

1 file changed

Lines changed: 104 additions & 88 deletions

File tree

cpp/src/parquet/bloom_filter.cc

Lines changed: 104 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,11 @@ void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
9595
this->hasher_ = std::make_unique<XxHasher>();
9696
}
9797

98-
static constexpr uint32_t kBloomFilterHeaderSizeGuess = 256;
98+
namespace {
99+
100+
constexpr uint32_t kBloomFilterHeaderSizeGuess = 256;
99101

100-
static ::arrow::Status ValidateBloomFilterHeader(
101-
const format::BloomFilterHeader& header) {
102+
::arrow::Status ValidateBloomFilterHeader(const format::BloomFilterHeader& header) {
102103
if (!header.algorithm.__isset.BLOCK) {
103104
return ::arrow::Status::Invalid(
104105
"Unsupported Bloom filter algorithm: ", header.algorithm, ".");
@@ -124,6 +125,104 @@ static ::arrow::Status ValidateBloomFilterHeader(
124125
return ::arrow::Status::OK();
125126
}
126127

128+
// Reads an encrypted Bloom filter from `input` and returns the decoded filter.
//
// The stream is consumed in two parts: a length-prefixed header ciphertext
// (decrypted and Thrift-deserialized with `header_decryptor`), followed by the
// bitset ciphertext (decrypted with `bitset_decryptor`).
//
// `properties` supplies the memory pool used for the temporary buffers and for
// the returned filter. `bloom_filter_length`, when set, must equal the header
// ciphertext length plus the bitset ciphertext length.
//
// Throws ParquetException on a short read, a length mismatch, a header that
// fails to deserialize or validate, or a bitset whose decrypted size differs
// from the size announced in the header.
//
// NOTE(review): both decryptors are dereferenced/used unconditionally below;
// presumably the caller only takes this path when both are non-null -- confirm.
BlockSplitBloomFilter DeserializeEncrypted(const ReaderProperties& properties,
129+
ArrowInputStream* input,
130+
std::optional<int64_t> bloom_filter_length,
131+
Decryptor* header_decryptor,
132+
Decryptor* bitset_decryptor) {
133+
// Encrypted path: header and bitset are encrypted separately.
134+
ThriftDeserializer deserializer(properties);
135+
format::BloomFilterHeader header;
136+
137+
// Read the length-prefixed ciphertext for the header.
// Only the first kCiphertextLengthSize bytes are read here; they are enough to
// learn how long the whole header ciphertext is.
138+
PARQUET_ASSIGN_OR_THROW(auto length_buf, input->Read(kCiphertextLengthSize));
139+
if (ARROW_PREDICT_FALSE(length_buf->size() < kCiphertextLengthSize)) {
140+
std::stringstream ss;
141+
ss << "Bloom filter header read failed: expected " << kCiphertextLengthSize
142+
<< " bytes, got " << length_buf->size();
143+
throw ParquetException(ss.str());
144+
}
145+
146+
const int64_t header_cipher_total_len =
147+
ParseCiphertextTotalLength(length_buf->data(), length_buf->size());
// The length is later narrowed to uint32_t for DeserializeMessage, so reject
// anything that does not fit in int32 first.
148+
if (ARROW_PREDICT_FALSE(header_cipher_total_len >
149+
std::numeric_limits<int32_t>::max())) {
150+
throw ParquetException("Bloom filter header ciphertext length overflows int32");
151+
}
// The header alone cannot be longer than the whole serialized Bloom filter.
152+
if (bloom_filter_length && header_cipher_total_len > *bloom_filter_length) {
153+
throw ParquetException(
154+
"Bloom filter length less than encrypted bloom filter header length");
155+
}
156+
// Read the full header ciphertext and decrypt the Thrift header.
// The length prefix was already consumed from the stream, so it is copied to
// the front of the buffer and only the remaining bytes are read after it.
// NOTE(review): there is no visible check that header_cipher_total_len is at
// least kCiphertextLengthSize; if ParseCiphertextTotalLength can return less,
// the memcpy would overrun the buffer and header_cipher_remaining would be
// negative -- confirm the helper's contract.
157+
auto header_cipher_buf =
158+
AllocateBuffer(properties.memory_pool(), header_cipher_total_len);
159+
std::memcpy(header_cipher_buf->mutable_data(), length_buf->data(),
160+
kCiphertextLengthSize);
161+
const int64_t header_cipher_remaining = header_cipher_total_len - kCiphertextLengthSize;
162+
PARQUET_ASSIGN_OR_THROW(auto read_size, input->Read(header_cipher_remaining,
163+
header_cipher_buf->mutable_data() +
164+
kCiphertextLengthSize));
165+
if (ARROW_PREDICT_FALSE(read_size < header_cipher_remaining)) {
166+
std::stringstream ss;
167+
ss << "Bloom filter header read failed: expected " << header_cipher_remaining
168+
<< " bytes, got " << read_size;
169+
throw ParquetException(ss.str());
170+
}
171+
// DeserializeMessage takes the length by pointer; the DCHECK below expects it
// to still equal the full header ciphertext length after the call.
172+
uint32_t header_cipher_len = static_cast<uint32_t>(header_cipher_total_len);
173+
try {
174+
deserializer.DeserializeMessage(header_cipher_buf->data(), &header_cipher_len,
175+
&header, header_decryptor);
176+
DCHECK_EQ(header_cipher_len, header_cipher_total_len);
177+
} catch (std::exception& e) {
// Re-throw any failure as a ParquetException that keeps the original message.
178+
std::stringstream ss;
179+
ss << "Deserializing bloom filter header failed.\n" << e.what();
180+
throw ParquetException(ss.str());
181+
}
182+
PARQUET_THROW_NOT_OK(ValidateBloomFilterHeader(header));
183+
184+
// Derive the expected bitset ciphertext size from the header's numBytes and
// check the combined size against the caller-supplied length, if any.
const int32_t bloom_filter_size = header.numBytes;
185+
const int32_t bitset_cipher_len = bitset_decryptor->CiphertextLength(bloom_filter_size);
186+
const int64_t total_cipher_len =
187+
header_cipher_total_len + static_cast<int64_t>(bitset_cipher_len);
188+
if (bloom_filter_length && *bloom_filter_length != total_cipher_len) {
189+
std::stringstream ss;
190+
ss << "Bloom filter length (" << bloom_filter_length.value()
191+
<< ") does not match the actual bloom filter (size: " << total_cipher_len << ").";
192+
throw ParquetException(ss.str());
193+
}
194+
195+
// Read and decrypt the bitset bytes.
196+
PARQUET_ASSIGN_OR_THROW(auto bitset_cipher_buf, input->Read(bitset_cipher_len));
197+
if (ARROW_PREDICT_FALSE(bitset_cipher_buf->size() < bitset_cipher_len)) {
198+
std::stringstream ss;
199+
ss << "Bloom filter bitset read failed: expected " << bitset_cipher_len
200+
<< " bytes, got " << bitset_cipher_buf->size();
201+
throw ParquetException(ss.str());
202+
}
203+
204+
// Cross-check: the plaintext size implied by the ciphertext length must match
// the size announced in the header before the output buffer is allocated.
const int32_t bitset_plain_len =
205+
bitset_decryptor->PlaintextLength(static_cast<int32_t>(bitset_cipher_len));
206+
if (ARROW_PREDICT_FALSE(bitset_plain_len != bloom_filter_size)) {
207+
throw ParquetException("Bloom filter bitset size does not match header");
208+
}
209+
210+
auto bitset_plain_buf = AllocateBuffer(properties.memory_pool(), bitset_plain_len);
211+
int32_t decrypted_len =
212+
bitset_decryptor->Decrypt(bitset_cipher_buf->span_as<const uint8_t>(),
213+
bitset_plain_buf->mutable_span_as<uint8_t>());
// A decrypted length other than the expected plaintext length is an error.
214+
if (ARROW_PREDICT_FALSE(decrypted_len != bitset_plain_len)) {
215+
throw ParquetException("Bloom filter bitset decryption failed");
216+
}
217+
218+
// Initialize the bloom filter from the decrypted bitset.
219+
BlockSplitBloomFilter bloom_filter(properties.memory_pool());
220+
bloom_filter.Init(bitset_plain_buf->data(), bloom_filter_size);
221+
return bloom_filter;
222+
}
223+
224+
} // namespace
225+
127226
BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
128227
const ReaderProperties& properties, ArrowInputStream* input,
129228
std::optional<int64_t> bloom_filter_length, Decryptor* header_decryptor,
@@ -133,91 +232,8 @@ BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
133232
throw ParquetException(
134233
"Bloom filter decryptors must be both provided or both null");
135234
}
136-
137-
// Encrypted path: header and bitset are encrypted separately.
138-
ThriftDeserializer deserializer(properties);
139-
format::BloomFilterHeader header;
140-
141-
// Read the length-prefixed ciphertext for the header.
142-
PARQUET_ASSIGN_OR_THROW(auto length_buf, input->Read(kCiphertextLengthSize));
143-
if (ARROW_PREDICT_FALSE(length_buf->size() < kCiphertextLengthSize)) {
144-
throw ParquetException("Bloom filter header read failed: not enough data");
145-
}
146-
147-
const int64_t header_cipher_total_len =
148-
ParseCiphertextTotalLength(length_buf->data(), length_buf->size());
149-
if (ARROW_PREDICT_FALSE(header_cipher_total_len >
150-
std::numeric_limits<int32_t>::max())) {
151-
throw ParquetException("Bloom filter header ciphertext length overflows int32");
152-
}
153-
if (bloom_filter_length && header_cipher_total_len > *bloom_filter_length) {
154-
throw ParquetException(
155-
"Bloom filter length less than encrypted bloom filter header length");
156-
}
157-
// Read the full header ciphertext and decrypt the Thrift header.
158-
auto header_cipher_buf =
159-
AllocateBuffer(properties.memory_pool(), header_cipher_total_len);
160-
std::memcpy(header_cipher_buf->mutable_data(), length_buf->data(),
161-
kCiphertextLengthSize);
162-
const int64_t header_cipher_remaining =
163-
header_cipher_total_len - kCiphertextLengthSize;
164-
PARQUET_ASSIGN_OR_THROW(
165-
auto read_size,
166-
input->Read(header_cipher_remaining,
167-
header_cipher_buf->mutable_data() + kCiphertextLengthSize));
168-
if (ARROW_PREDICT_FALSE(read_size < header_cipher_remaining)) {
169-
throw ParquetException("Bloom filter header read failed: not enough data");
170-
}
171-
172-
uint32_t header_cipher_len = static_cast<uint32_t>(header_cipher_total_len);
173-
try {
174-
deserializer.DeserializeMessage(header_cipher_buf->data(), &header_cipher_len,
175-
&header, header_decryptor);
176-
DCHECK_EQ(header_cipher_len, header_cipher_total_len);
177-
} catch (std::exception& e) {
178-
std::stringstream ss;
179-
ss << "Deserializing bloom filter header failed.\n" << e.what();
180-
throw ParquetException(ss.str());
181-
}
182-
PARQUET_THROW_NOT_OK(ValidateBloomFilterHeader(header));
183-
184-
const int32_t bloom_filter_size = header.numBytes;
185-
const int32_t bitset_cipher_len =
186-
bitset_decryptor->CiphertextLength(bloom_filter_size);
187-
const int64_t total_cipher_len =
188-
header_cipher_total_len + static_cast<int64_t>(bitset_cipher_len);
189-
if (bloom_filter_length && *bloom_filter_length != total_cipher_len) {
190-
std::stringstream ss;
191-
ss << "Bloom filter length (" << bloom_filter_length.value()
192-
<< ") does not match the actual bloom filter (size: " << total_cipher_len
193-
<< ").";
194-
throw ParquetException(ss.str());
195-
}
196-
197-
// Read and decrypt the bitset bytes.
198-
PARQUET_ASSIGN_OR_THROW(auto bitset_cipher_buf, input->Read(bitset_cipher_len));
199-
if (ARROW_PREDICT_FALSE(bitset_cipher_buf->size() < bitset_cipher_len)) {
200-
throw ParquetException("Bloom filter read failed: not enough data");
201-
}
202-
203-
const int32_t bitset_plain_len =
204-
bitset_decryptor->PlaintextLength(static_cast<int32_t>(bitset_cipher_len));
205-
if (ARROW_PREDICT_FALSE(bitset_plain_len != bloom_filter_size)) {
206-
throw ParquetException("Bloom filter bitset size does not match header");
207-
}
208-
209-
auto bitset_plain_buf = AllocateBuffer(properties.memory_pool(), bitset_plain_len);
210-
int32_t decrypted_len =
211-
bitset_decryptor->Decrypt(bitset_cipher_buf->span_as<const uint8_t>(),
212-
bitset_plain_buf->mutable_span_as<uint8_t>());
213-
if (ARROW_PREDICT_FALSE(decrypted_len != bitset_plain_len)) {
214-
throw ParquetException("Bloom filter bitset decryption failed");
215-
}
216-
217-
// Initialize the bloom filter from the decrypted bitset.
218-
BlockSplitBloomFilter bloom_filter(properties.memory_pool());
219-
bloom_filter.Init(bitset_plain_buf->data(), bloom_filter_size);
220-
return bloom_filter;
235+
return DeserializeEncrypted(properties, input, bloom_filter_length, header_decryptor,
236+
bitset_decryptor);
221237
}
222238

223239
ThriftDeserializer deserializer(properties);

0 commit comments

Comments
 (0)