diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreInternalException.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreInternalException.java new file mode 100644 index 0000000000..4017204a3f --- /dev/null +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreInternalException.java @@ -0,0 +1,38 @@ +/* + * RecordCoreInternalException.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record; + +import com.apple.foundationdb.annotation.API; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Exception thrown when an inconsistency in core record layer behavior is detected. + */ +@API(API.Status.STABLE) +@SuppressWarnings("serial") +public class RecordCoreInternalException extends RecordCoreException { + + public RecordCoreInternalException(@Nonnull final String msg, @Nullable final Object... keyValues) { + super(msg, keyValues); + } +} diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java index 070be07355..052276254f 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintainer.java @@ -247,6 +247,14 @@ private void insertField(LuceneDocumentFromRecord.DocumentField field, final Doc } } + void writeDocument(final FDBIndexableRecord newRecord, final Map.Entry> entry, final Integer partitionId) { + try { + writeDocument(entry.getValue(), entry.getKey(), partitionId, newRecord.getPrimaryKey()); + } catch (IOException e) { + throw LuceneExceptions.toRecordCoreException("Issue updating new index keys", e, "newRecord", newRecord.getPrimaryKey()); + } + } + @SuppressWarnings("PMD.CloseResource") private void writeDocument(@Nonnull List fields, Tuple groupingKey, @@ -473,30 +481,30 @@ CompletableFuture update(@Nullable FDBIndexableRecord< LOG.trace("update oldFields={}, newFields{}", oldRecordFields, newRecordFields); - // delete old - return AsyncUtil.whenAll(oldRecordFields.keySet().stream().map(t -> { - try { - return tryDelete(Objects.requireNonNull(oldRecord), t); - } catch (IOException e) { - throw LuceneExceptions.toRecordCoreException("Issue deleting", e, "record", Objects.requireNonNull(oldRecord).getPrimaryKey()); - } - }).collect(Collectors.toList())).thenCompose(ignored -> - // update new - AsyncUtil.whenAll(newRecordFields.entrySet().stream().map(entry -> { - try { - return tryDeleteInWriteOnlyMode(Objects.requireNonNull(newRecord), entry.getKey()).thenCompose(countDeleted -> - partitioner.addToAndSavePartitionMetadata(newRecord, entry.getKey(), 
destinationPartitionIdHint).thenApply(partitionId -> {
-                                try {
-                                    writeDocument(entry.getValue(), entry.getKey(), partitionId, newRecord.getPrimaryKey());
-                                } catch (IOException e) {
-                                    throw LuceneExceptions.toRecordCoreException("Issue updating new index keys", e, "newRecord", newRecord.getPrimaryKey());
-                                }
-                                return null;
-                            }));
-                } catch (IOException e) {
-                    throw LuceneExceptions.toRecordCoreException("Issue updating", e, "record", Objects.requireNonNull(newRecord).getPrimaryKey());
-                }
-            }).collect(Collectors.toList())));
+        return AsyncUtil.whenAll(oldRecordFields.keySet().stream()
+                // delete old
+                .map(groupingKey -> tryDelete(Objects.requireNonNull(oldRecord), groupingKey))
+                .collect(Collectors.toList()))
+                .thenCompose(ignored ->
+                        // update new
+                        AsyncUtil.whenAll(newRecordFields.entrySet().stream()
+                                .map(entry -> updateRecord(newRecord, destinationPartitionIdHint, entry))
+                                .collect(Collectors.toList())));
+    }
+
+    /**
+     * Internal utility to write the new document for a single record under one grouping key.
+     * @param newRecord the new record to save
+     * @param destinationPartitionIdHint the partition ID to write to, or {@code null} to let the partitioner choose
+     * @param entry entry from the grouping key to the document fields
+     * @param <M> message
+     * @return a future that completes once the new document has been written
+     */
+    private <M extends Message> CompletableFuture<Void> updateRecord(
+            final FDBIndexableRecord<M> newRecord,
+            final Integer destinationPartitionIdHint,
+            final Map.Entry<Tuple, List<LuceneDocumentFromRecord.DocumentField>> entry) {
+        return tryDeleteInWriteOnlyMode(Objects.requireNonNull(newRecord), entry.getKey()).thenCompose(countDeleted ->
+                partitioner.addToAndSavePartitionMetadata(newRecord, entry.getKey(), destinationPartitionIdHint)
+                        .thenAccept(partitionId -> writeDocument(newRecord, entry, partitionId)));
     }
 
     /**
@@ -509,10 +517,9 @@ CompletableFuture update(@Nullable FDBIndexableRecord<
      * @param groupingKey grouping key
      * @param <M> message
      * @return count of deleted docs
-     * @throws IOException propagated by {@link #tryDelete(FDBIndexableRecord, Tuple)}
      */
     private <M extends Message> CompletableFuture<Integer> tryDeleteInWriteOnlyMode(@Nonnull FDBIndexableRecord<M> record,
-                                                                                     @Nonnull Tuple groupingKey) throws IOException {
+                                                                                     @Nonnull Tuple groupingKey) {
         if (!state.store.isIndexWriteOnly(state.index)) {
             // no op
             return CompletableFuture.completedFuture(0);
@@ -529,29 +536,35 @@ private CompletableFuture tryDeleteInWriteOnlyMode(
      * @param record message
      * @return count of deleted docs: 1 indicates that the record has been deleted, 0 means that either no record was deleted or it was deleted by
      * query.
- * @throws IOException propagated from {@link #deleteDocument(Tuple, Integer, Tuple)} */ private CompletableFuture tryDelete(@Nonnull FDBIndexableRecord record, - @Nonnull Tuple groupingKey) throws IOException { - // non-partitioned - if (!partitioner.isPartitioningEnabled()) { - return CompletableFuture.completedFuture(deleteDocument(groupingKey, null, record.getPrimaryKey())); + @Nonnull Tuple groupingKey) { + try { + // non-partitioned + if (!partitioner.isPartitioningEnabled()) { + return CompletableFuture.completedFuture(deleteDocument(groupingKey, null, record.getPrimaryKey())); + } + } catch (IOException e) { + throw LuceneExceptions.toRecordCoreException("Issue deleting", e, "record", Objects.requireNonNull(record).getPrimaryKey()); } // partitioned - return partitioner.tryGetPartitionInfo(record, groupingKey).thenApply(partitionInfo -> { - if (partitionInfo != null) { - try { - int countDeleted = deleteDocument(groupingKey, partitionInfo.getId(), record.getPrimaryKey()); - if (countDeleted > 0) { - partitioner.decrementCountAndSave(groupingKey, partitionInfo, countDeleted); - } - return countDeleted; - } catch (IOException e) { - throw LuceneExceptions.toRecordCoreException("Issue deleting", e, "record", record.getPrimaryKey()); + return partitioner.tryGetPartitionInfo(record, groupingKey).thenCompose(partitionInfo -> { + if (partitionInfo == null) { + return CompletableFuture.completedFuture(0); + } + try { + int countDeleted = deleteDocument(groupingKey, partitionInfo.getId(), record.getPrimaryKey()); + // this might be 0 when in writeOnly mode, but otherwise should not happen. + if (countDeleted > 0) { + return partitioner.decrementCountAndSave(groupingKey, countDeleted, partitionInfo.getId()) + .thenApply(vignore -> countDeleted); + } else { + return CompletableFuture.completedFuture(countDeleted); } + } catch (IOException e) { + throw LuceneExceptions.toRecordCoreException("Issue deleting", e, "record", record.getPrimaryKey()); } - return 0; }); } diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java index 8af4abcc03..2aa6c273af 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LucenePartitioner.java @@ -33,6 +33,7 @@ import com.apple.foundationdb.record.PipelineOperation; import com.apple.foundationdb.record.RecordCoreArgumentException; import com.apple.foundationdb.record.RecordCoreException; +import com.apple.foundationdb.record.RecordCoreInternalException; import com.apple.foundationdb.record.RecordCursor; import com.apple.foundationdb.record.RecordCursorContinuation; import com.apple.foundationdb.record.RecordCursorEndContinuation; @@ -40,8 +41,10 @@ import com.apple.foundationdb.record.ScanProperties; import com.apple.foundationdb.record.TupleRange; import com.apple.foundationdb.record.cursors.ChainedCursor; +import com.apple.foundationdb.record.locking.LockIdentifier; import com.apple.foundationdb.record.logging.KeyValueLogMessage; import com.apple.foundationdb.record.logging.LogMessageKeys; +import com.apple.foundationdb.record.lucene.codec.LazyOpener; import com.apple.foundationdb.record.lucene.directory.FDBDirectoryManager; import com.apple.foundationdb.record.metadata.Key; import com.apple.foundationdb.record.metadata.RecordType; @@ -81,7 +84,6 @@ import 
java.util.Arrays; import java.util.Collection; import java.util.Comparator; -import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -111,6 +113,7 @@ public class LucenePartitioner { private final int indexPartitionLowWatermark; private final KeyExpression partitioningKeyExpression; private final LuceneRepartitionPlanner repartitionPlanner; + private final LazyOpener directoryManagerSupplier; public LucenePartitioner(@Nonnull IndexMaintainerState state) { this.state = state; @@ -137,6 +140,7 @@ public LucenePartitioner(@Nonnull IndexMaintainerState state) { throw new RecordCoreArgumentException("High watermark must be greater than low watermark"); } this.repartitionPlanner = new LuceneRepartitionPlanner(indexPartitionLowWatermark, indexPartitionHighWatermark); + this.directoryManagerSupplier = LazyOpener.supply(() -> FDBDirectoryManager.getManager(state)); } /** @@ -489,28 +493,30 @@ public CompletableFuture addToAndSavePartitionMetad private CompletableFuture addToAndSavePartitionMetadata(@Nonnull final Tuple groupingKey, @Nonnull final Tuple partitioningKey, @Nullable final Integer assignedPartitionIdOverride) { - - final CompletableFuture assignmentFuture; - if (assignedPartitionIdOverride != null) { - assignmentFuture = getPartitionMetaInfoById(assignedPartitionIdOverride, groupingKey); - } else { - assignmentFuture = getOrCreatePartitionInfo(groupingKey, partitioningKey); - } - return assignmentFuture.thenApply(assignedPartition -> { - // assignedPartition is not null, since a new one is created by the previous call if none exist - LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = Objects.requireNonNull(assignedPartition).toBuilder(); - builder.setCount(assignedPartition.getCount() + 1); - if (isOlderThan(partitioningKey, assignedPartition)) { - // clear the previous key - state.context.ensureActive().clear(partitionMetadataKeyFromPartitioningValue(groupingKey, getPartitionKey(assignedPartition))); - builder.setFrom(ByteString.copyFrom(partitioningKey.pack())); - } - if (isNewerThan(partitioningKey, assignedPartition)) { - builder.setTo(ByteString.copyFrom(partitioningKey.pack())); - } - savePartitionMetadata(groupingKey, builder); - return assignedPartition.getId(); - }); + return state.context.doWithWriteLock(new LockIdentifier(partitionMetadataSubspace(groupingKey)), + () -> { + final CompletableFuture assignmentFuture; + if (assignedPartitionIdOverride != null) { + assignmentFuture = getPartitionMetaInfoById(assignedPartitionIdOverride, groupingKey); + } else { + assignmentFuture = getOrCreatePartitionInfo(groupingKey, partitioningKey); + } + return assignmentFuture.thenApply(assignedPartition -> { + // assignedPartition is not null, since a new one is created by the previous call if none exist + LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = Objects.requireNonNull(assignedPartition).toBuilder(); + builder.setCount(assignedPartition.getCount() + 1); + if (isOlderThan(partitioningKey, assignedPartition)) { + // clear the previous key + state.context.ensureActive().clear(partitionMetadataKeyFromPartitioningValue(groupingKey, getPartitionKey(assignedPartition))); + builder.setFrom(ByteString.copyFrom(partitioningKey.pack())); + } + if (isNewerThan(partitioningKey, assignedPartition)) { + builder.setTo(ByteString.copyFrom(partitioningKey.pack())); + } + savePartitionMetadata(groupingKey, builder); + return assignedPartition.getId(); + }); + }); } /** @@ -525,6 +531,10 @@ byte[] 
partitionMetadataKeyFromPartitioningValue(@Nonnull Tuple groupKey, @Nonnu return state.indexSubspace.pack(partitionMetadataKeyTuple(groupKey, partitionKey)); } + Subspace partitionMetadataSubspace(@Nonnull Tuple groupKey) { + return state.indexSubspace.subspace(groupKey.add(PARTITION_META_SUBSPACE)); + } + private static Tuple partitionMetadataKeyTuple(final @Nonnull Tuple groupKey, @Nonnull Tuple partitionKey) { return groupKey.add(PARTITION_META_SUBSPACE).addAll(partitionKey); } @@ -600,22 +610,30 @@ CompletableFuture decrementCountAndSave(@Nonnull Tuple groupingKey, + int amount, final int partitionId) { + return state.context.doWithWriteLock(new LockIdentifier(partitionMetadataSubspace(groupingKey)), + () -> getPartitionMetaInfoById(partitionId, groupingKey).thenAccept(serialized -> { + if (serialized == null) { + throw new RecordCoreInternalException("Lucene partition metadata changed during delete") + .addLogInfo(LogMessageKeys.INDEX_NAME, state.index.getName()) + .addLogInfo(LogMessageKeys.INDEX_SUBSPACE, state.indexSubspace); + } + LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = Objects.requireNonNull(serialized).toBuilder(); + // note that the to/from of the partition do not get updated, since that would require us to know + // what the next potential boundary value(s) are. The counts, nonetheless, remain valid. + builder.setCount(serialized.getCount() - amount); + + if (builder.getCount() < 0) { + // should never happen + throw new RecordCoreInternalException("Issue updating Lucene partition metadata (resulting count < 0)", + LogMessageKeys.PARTITION_ID, partitionId); + } + savePartitionMetadata(groupingKey, builder); + })); } /** @@ -862,7 +880,11 @@ public CompletableFuture processPartitionRebalancing(@Nonnull final Tup LOGGER.debug(repartitionLogMessage("Repartitioning records", groupingKey, repartitioningContext.countToMove, partitionInfo).toString()); } - return moveDocsFromPartitionThenLog(repartitioningContext, logMessages); + if (repartitioningContext.action == LuceneRepartitionPlanner.RepartitioningAction.REMOVE_EMPTY_PARTITION) { + return CompletableFuture.completedFuture(removeEmptyPartition(repartitioningContext)); + } else { + return moveDocsFromPartitionThenLog(repartitioningContext, logMessages); + } } } // here: no partitions need re-balancing @@ -989,12 +1011,7 @@ private CompletableFuture moveDocsFromPartition(@Nonnull final LuceneRe return CompletableFuture.completedFuture(0); } RepartitionTimings timings = new RepartitionTimings(); - final StoreTimerSnapshot timerSnapshot; - if (LOGGER.isDebugEnabled() && state.context.getTimer() != null) { - timerSnapshot = StoreTimerSnapshot.from(state.context.getTimer()); - } else { - timerSnapshot = null; - } + final StoreTimerSnapshot timerSnapshot = getStoreTimerSnapshot(); timings.startNanos = System.nanoTime(); Collection recordTypes = state.store.getRecordMetaData().recordTypesForIndex(state.index); if (recordTypes.stream().map(RecordType::isSynthetic).distinct().count() > 1) { @@ -1024,7 +1041,7 @@ private CompletableFuture moveDocsFromPartition(@Nonnull final LuceneRe timings.initializationNanos = System.nanoTime(); fetchedRecordsFuture = fetchedRecordsFuture.whenComplete((ignored, throwable) -> cursor.close()); - return fetchedRecordsFuture.thenCompose(records -> { + return fetchedRecordsFuture.thenApply(records -> { timings.searchNanos = System.nanoTime(); if (records.size() == 0) { throw new RecordCoreException("Unexpected error: 0 records fetched. 
repartitionContext {}", repartitioningContext); @@ -1041,7 +1058,7 @@ private CompletableFuture moveDocsFromPartition(@Nonnull final LuceneRe if (LOGGER.isDebugEnabled()) { LOGGER.debug("no records to move, partition {}", partitionInfo); } - return CompletableFuture.completedFuture(0); + return 0; } // reset partition info @@ -1092,43 +1109,143 @@ private CompletableFuture moveDocsFromPartition(@Nonnull final LuceneRe } long updateStart = System.nanoTime(); - Iterator> recordIterator = records.iterator(); final int destinationPartitionId = destinationPartition.getId(); - return AsyncUtil.whileTrue(() -> indexMaintainer.update(null, recordIterator.next(), destinationPartitionId) - .thenApply(ignored -> recordIterator.hasNext()), state.context.getExecutor()) - .thenApply(ignored -> { - if (LOGGER.isDebugEnabled()) { - long updateNanos = System.nanoTime(); - final KeyValueLogMessage logMessage = repartitionLogMessage("Repartitioned records", groupingKey, records.size(), partitionInfo); - logMessage.addKeyAndValue("totalMicros", TimeUnit.NANOSECONDS.toMicros(updateNanos - timings.startNanos)); - logMessage.addKeyAndValue("initializationMicros", TimeUnit.NANOSECONDS.toMicros(timings.initializationNanos - timings.startNanos)); - logMessage.addKeyAndValue("searchMicros", TimeUnit.NANOSECONDS.toMicros(timings.searchNanos - timings.initializationNanos)); - logMessage.addKeyAndValue("clearInfoMicros", TimeUnit.NANOSECONDS.toMicros(timings.clearInfoNanos - timings.searchNanos)); - if (timings.emptyingNanos > 0) { - logMessage.addKeyAndValue("emptyingMicros", TimeUnit.NANOSECONDS.toMicros(timings.emptyingNanos - timings.clearInfoNanos)); - } - if (timings.deleteNanos > 0) { - logMessage.addKeyAndValue("deleteMicros", TimeUnit.NANOSECONDS.toMicros(timings.deleteNanos - timings.clearInfoNanos)); - } - if (timings.metadataUpdateNanos > 0) { - logMessage.addKeyAndValue("metadataUpdateMicros", TimeUnit.NANOSECONDS.toMicros(timings.metadataUpdateNanos - timings.deleteNanos)); - } - if (timings.createPartitionNanos > 0) { - logMessage.addKeyAndValue("createPartitionMicros", TimeUnit.NANOSECONDS.toMicros(timings.createPartitionNanos - endCleanupNanos)); - } - logMessage.addKeyAndValue("updateMicros", TimeUnit.NANOSECONDS.toMicros(updateNanos - updateStart)); - if (timerSnapshot != null && state.context.getTimer() != null) { - logMessage.addKeysAndValues( - StoreTimer.getDifference(state.context.getTimer(), timerSnapshot) - .getKeysAndValues()); - } - LOGGER.debug(logMessage.toString()); - } - return records.size(); - }); + for (FDBIndexableRecord rec : records) { + LuceneDocumentFromRecord.getRecordFields(state.index.getRootExpression(), rec) + .entrySet().forEach(entry -> { + indexMaintainer.writeDocument(rec, entry, destinationPartitionId); + addToAndSavePartitionMetadata(rec, groupingKey, destinationPartitionId); + }); + } + if (LOGGER.isDebugEnabled()) { + long updateNanos = System.nanoTime(); + final KeyValueLogMessage logMessage = repartitionLogMessage("Repartitioned records", groupingKey, records.size(), partitionInfo); + logMessage.addKeyAndValue("totalMicros", TimeUnit.NANOSECONDS.toMicros(updateNanos - timings.startNanos)); + logMessage.addKeyAndValue("initializationMicros", TimeUnit.NANOSECONDS.toMicros(timings.initializationNanos - timings.startNanos)); + logMessage.addKeyAndValue("searchMicros", TimeUnit.NANOSECONDS.toMicros(timings.searchNanos - timings.initializationNanos)); + logMessage.addKeyAndValue("clearInfoMicros", TimeUnit.NANOSECONDS.toMicros(timings.clearInfoNanos - 
timings.searchNanos));
+            if (timings.emptyingNanos > 0) {
+                logMessage.addKeyAndValue("emptyingMicros", TimeUnit.NANOSECONDS.toMicros(timings.emptyingNanos - timings.clearInfoNanos));
+            }
+            if (timings.deleteNanos > 0) {
+                logMessage.addKeyAndValue("deleteMicros", TimeUnit.NANOSECONDS.toMicros(timings.deleteNanos - timings.clearInfoNanos));
+            }
+            if (timings.metadataUpdateNanos > 0) {
+                logMessage.addKeyAndValue("metadataUpdateMicros", TimeUnit.NANOSECONDS.toMicros(timings.metadataUpdateNanos - timings.deleteNanos));
+            }
+            if (timings.createPartitionNanos > 0) {
+                logMessage.addKeyAndValue("createPartitionMicros", TimeUnit.NANOSECONDS.toMicros(timings.createPartitionNanos - endCleanupNanos));
+            }
+            logMessage.addKeyAndValue("updateMicros", TimeUnit.NANOSECONDS.toMicros(updateNanos - updateStart));
+            if (timerSnapshot != null && state.context.getTimer() != null) {
+                logMessage.addKeysAndValues(
+                        StoreTimer.getDifference(state.context.getTimer(), timerSnapshot)
+                                .getKeysAndValues());
+            }
+            LOGGER.debug(logMessage.toString());
+        }
+        return records.size();
         });
     }
+
+    /**
+     * Remove an empty partition from the index.
+     * @param repartitioningContext the context returned from the partitioning planner
+     * @return 0 if no partition was removed, 1 if the empty partition was removed
+     */
+    @VisibleForTesting
+    int removeEmptyPartition(@Nonnull final LuceneRepartitionPlanner.RepartitioningContext repartitioningContext) {
+        // sanity check
+        if (repartitioningContext.countToMove != 0) {
+            if (LOGGER.isWarnEnabled()) {
+                LOGGER.warn("removeEmptyPartition called with invalid countToMove {}", repartitioningContext.countToMove);
+            }
+            return 0;
+        }
+        if ((repartitioningContext.olderPartition == null) && (repartitioningContext.newerPartition == null)) {
+            if (LOGGER.isWarnEnabled()) {
+                LOGGER.warn("removeEmptyPartition called with null neighboring partitions");
+            }
+            return 0;
+        }
+
+        try {
+            if (verifyNoDocumentsInPartition(repartitioningContext.sourcePartition, repartitioningContext.groupingKey)) {
+                clearEmptyPartition(repartitioningContext);
+                return 1;
+            } else {
+                if (LOGGER.isWarnEnabled()) {
+                    LOGGER.warn("removeEmptyPartition called with documents still in the index");
+                }
+                return 0;
+            }
+        } catch (IOException ioe) {
+            // log the error and stop the process - this does not prevent other groups from being rebalanced
+            if (LOGGER.isWarnEnabled()) {
+                LOGGER.warn("removeEmptyPartition failed with an exception", ioe);
+            }
+            return 0;
+        }
+    }
+
+    /**
+     * Return TRUE if the partition is empty (no docs in the actual Lucene index).
+     * @param partitionInfo the partition to check
+     * @param groupingKey the grouping key for the index
+     * @return TRUE if the partition is empty, FALSE otherwise
+     */
+    private boolean verifyNoDocumentsInPartition(@Nonnull LucenePartitionInfoProto.LucenePartitionInfo partitionInfo, final Tuple groupingKey) throws IOException {
+        return directoryManagerSupplier.get().getIndexReader(groupingKey, partitionInfo.getId()).numDocs() == 0;
+    }
+
+    private void clearEmptyPartition(@Nonnull final LuceneRepartitionPlanner.RepartitioningContext repartitioningContext) {
+        RepartitionTimings timings = new RepartitionTimings();
+        final StoreTimerSnapshot timerSnapshot = getStoreTimerSnapshot();
+        timings.startNanos = System.nanoTime();
+
+        final LucenePartitionInfoProto.LucenePartitionInfo partitionInfo = repartitioningContext.sourcePartition;
+        final Tuple groupingKey = repartitioningContext.groupingKey;
+
+        // reset partition info for deleted partition
+        state.context.ensureActive().clear(partitionMetadataKeyFromPartitioningValue(groupingKey, getPartitionKey(partitionInfo)));
+        timings.clearInfoNanos = System.nanoTime();
+
+        // Clear empty data area
+        Range partitionDataRange = Range.startsWith(state.indexSubspace.subspace(groupingKey.add(PARTITION_DATA_SUBSPACE).add(partitionInfo.getId())).pack());
+        state.context.clear(partitionDataRange);
+        timings.emptyingNanos = System.nanoTime();
+
+        if (repartitioningContext.olderPartition != null) {
+            // update other partition's metadata (set "to" from deleted partition)
+            LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = repartitioningContext.olderPartition.toBuilder();
+            builder.setTo(partitionInfo.getTo());
+            savePartitionMetadata(groupingKey, builder);
+        } else {
+            // no older partition - clear the newer partition's metadata key and re-save it with the deleted partition's "from" ("from" is part of the key)
+            state.context.ensureActive().clear(partitionMetadataKeyFromPartitioningValue(groupingKey, getPartitionKey(repartitioningContext.newerPartition)));
+            LucenePartitionInfoProto.LucenePartitionInfo.Builder builder = repartitioningContext.newerPartition.toBuilder();
+            builder.setFrom(partitionInfo.getFrom());
+            savePartitionMetadata(groupingKey, builder);
+        }
+        timings.metadataUpdateNanos = System.nanoTime();
+
+        if (LOGGER.isDebugEnabled()) {
+            long updateNanos = System.nanoTime();
+            final KeyValueLogMessage logMessage = repartitionLogMessage("Removed empty partition", groupingKey, 0, partitionInfo);
+            logMessage.addKeyAndValue("totalMicros", TimeUnit.NANOSECONDS.toMicros(updateNanos - timings.startNanos));
+            logMessage.addKeyAndValue("clearInfoMicros", TimeUnit.NANOSECONDS.toMicros(timings.clearInfoNanos - timings.startNanos));
+            logMessage.addKeyAndValue("emptyingMicros", TimeUnit.NANOSECONDS.toMicros(timings.emptyingNanos - timings.clearInfoNanos));
+            logMessage.addKeyAndValue("metadataUpdateMicros", TimeUnit.NANOSECONDS.toMicros(timings.metadataUpdateNanos - timings.emptyingNanos));
+            if (timerSnapshot != null && state.context.getTimer() != null) {
+                logMessage.addKeysAndValues(
+                        StoreTimer.getDifference(state.context.getTimer(), timerSnapshot)
+                                .getKeysAndValues());
+            }
+            LOGGER.debug(logMessage.toString());
+        }
+    }
+
     /**
      * Get all partition metadata for a given grouping key.
* @@ -1396,6 +1513,15 @@ public RepartitioningLogMessages setRepartitionDocCount(int repartitionDocCount) } } + @Nullable + private StoreTimerSnapshot getStoreTimerSnapshot() { + if (LOGGER.isDebugEnabled() && state.context.getTimer() != null) { + return StoreTimerSnapshot.from(state.context.getTimer()); + } else { + return null; + } + } + /** * Timing information for {@link #moveDocsFromPartition(LuceneRepartitionPlanner.RepartitioningContext)}, to get a * better idea as to what is taking a long time when repartitioning is failing. diff --git a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneRepartitionPlanner.java b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneRepartitionPlanner.java index 877a41284e..e0753d3d53 100644 --- a/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneRepartitionPlanner.java +++ b/fdb-record-layer-lucene/src/main/java/com/apple/foundationdb/record/lucene/LuceneRepartitionPlanner.java @@ -74,6 +74,11 @@ RepartitioningContext determineRepartitioningAction(@Nonnull final Tuple groupin // candidate partition's current doc count final int currentPartitionCount = candidatePartition.getCount(); + if ((currentPartitionCount == 0) && ((olderPartition != null) || newerPartition != null)) { + // remove current empty partition + repartitioningContext.action = RepartitioningAction.REMOVE_EMPTY_PARTITION; + return repartitioningContext; + } if (currentPartitionCount >= indexPartitionLowWatermark && currentPartitionCount <= indexPartitionHighWatermark) { // repartitioning not required return repartitioningContext; @@ -150,7 +155,11 @@ enum RepartitioningAction { /** * partition is under the low watermark, but its immediate neighbors have no capacity to absorb it. */ - NO_CAPACITY_FOR_MERGE + NO_CAPACITY_FOR_MERGE, + /** + * partition is empty, remove from metadata. 
+     */
+    REMOVE_EMPTY_PARTITION
 }
 
 /**
diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java
index 7fbe62d057..ca8c2f3844 100644
--- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java
+++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexMaintenanceTest.java
@@ -55,6 +55,7 @@
 import com.apple.foundationdb.subspace.Subspace;
 import com.apple.foundationdb.tuple.Tuple;
 import com.apple.foundationdb.util.LoggableKeysAndValues;
+import com.apple.test.ParameterizedTestUtils;
 import com.apple.test.RandomizedTestUtils;
 import com.apple.test.SuperSlow;
 import com.apple.test.Tags;
@@ -77,12 +78,15 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayDeque;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -95,6 +99,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -108,7 +113,10 @@
 import static com.apple.foundationdb.record.metadata.Key.Expressions.function;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -900,6 +908,197 @@ void sampledDelete(boolean isSynthetic, boolean isGrouped, long seed) throws IOE
         assertThat(partitionCounts, Matchers.contains(5, 3, 4)));
     }
 
+    static Stream<Arguments> removeEmptyPartitions() {
+        return Stream.of(
+                // In the following, (5, 20) means we delete records 5-20, emptying the second partition,
+                // (15, 25) means we empty the last partition, (0, 10) means we empty the first partition
+                // and (0, 20) means removing the first 2 partitions
+                // (there are 25 records total in each group and the high watermark is 10).
+                Arguments.of(true, true, 987654, 5, 20, 2),
+                Arguments.of(true, true, 987654, 15, 25, 2),
+                Arguments.of(true, true, 987654, 0, 10, 2),
+                Arguments.of(true, true, 987654, 0, 20, 1),
+                Arguments.of(false, false, 543210, 5, 20, 2),
+                Arguments.of(false, false, 543210, 15, 25, 2),
+                Arguments.of(false, false, 543210, 0, 10, 2),
+                Arguments.of(false, false, 543210, 0, 20, 1));
+    }
+
+    /**
+     * Clear a partition and ensure it gets removed.
+ * + * @param isSynthetic whether to use synthetic records + * @param isGrouped whether to use grouped index + * @param seed the random seed + * @param start the first record to delete from each group + * @param end the last record to delete from each group + * @param expectedCount the expected number of remaining partitions + */ + @ParameterizedTest + @MethodSource + void removeEmptyPartitions(boolean isSynthetic, boolean isGrouped, long seed, int start, int end, int expectedCount) throws IOException { + // Test that empty partitions are removed during repartitioning + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager) + .setIsGrouped(isGrouped) + .setIsSynthetic(isSynthetic) + .setPrimaryKeySegmentIndexEnabled(true) + .setPartitionHighWatermark(10) + .build(); + + final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() + .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 2) + .addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0 + .build(); + + // Create multiple partitions with documents + try (FDBRecordContext context = openContext(contextProps)) { + dataModel.saveRecordsToAllGroups(25, context); + commit(context); + } + + explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup); + + // Verify we have 3 partitions (25 docs) initially + dataModel.getPartitionCounts(() -> openContext(contextProps)).forEach((groupingKey, partitionCounts) -> + assertThat(partitionCounts, hasSize(3))); + + // Delete documents + try (FDBRecordContext context = openContext(contextProps)) { + final FDBRecordStore recordStore = dataModel.createOrOpenRecordStore(context); + dataModel.groupingKeys().forEach(groupingKey -> { + List sortedPartitionKeys = dataModel.groupingKeyToPrimaryKeyToPartitionKey.get(groupingKey) + .values().stream().sorted().collect(Collectors.toList()); + // delete enough docs to empty partition + Set partitionKeysToDelete = new HashSet<>(sortedPartitionKeys.subList(start, end)); + dataModel.recordsUnderTest().stream() + .filter(rec -> partitionKeysToDelete.contains(rec.getPartitioningKey())) + .forEach(rec -> rec.deleteRecord(recordStore).join()); + }); + context.commit(); + } + + // Before repartitioning, we should still have 3 partitions (including the empty one) + dataModel.getPartitionCounts(() -> openContext(contextProps)).forEach((groupingKey, partitionCounts) -> { + assertThat(partitionCounts, hasSize(3)); + assertThat(partitionCounts, hasItem(0)); + }); + // Trigger repartitioning - this should remove the empty partition + explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup); + + // After repartitioning, we should have only expectedCount partitions (empty one removed) + dataModel.getPartitionCounts(() -> openContext(contextProps)).forEach((groupingKey, partitionCounts) -> + assertThat(partitionCounts, hasSize(expectedCount))); + + // Validate the index is still consistent + dataModel.validate(() -> openContext(contextProps)); + } + + @ParameterizedTest + @MethodSource("sampledDelete") + void emptyAllPartitions(boolean isSynthetic, boolean isGrouped, long seed) throws IOException { + // Test that empty partitions are removed during repartitioning + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager) + .setIsGrouped(isGrouped) + .setIsSynthetic(isSynthetic) + 
.setPrimaryKeySegmentIndexEnabled(true) + .setPartitionHighWatermark(10) + .build(); + + final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() + .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 2) + .addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0 + .build(); + + // Create multiple partitions with documents + try (FDBRecordContext context = openContext(contextProps)) { + dataModel.saveRecordsToAllGroups(25, context); + commit(context); + } + + explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup); + + // Verify we have 3 partitions (25 docs) initially + dataModel.getPartitionCounts(() -> openContext(contextProps)).forEach((groupingKey, partitionCounts) -> + assertThat(partitionCounts, hasSize(3))); + + // Delete all documents + try (FDBRecordContext context = openContext(contextProps)) { + final FDBRecordStore recordStore = dataModel.createOrOpenRecordStore(context); + dataModel.recordsUnderTest() + .forEach(rec -> rec.deleteRecord(recordStore).join()); + context.commit(); + } + + // Before repartitioning, we should still have 3 partitions (including the empty one) + dataModel.getPartitionCounts(() -> openContext(contextProps)).forEach((groupingKey, partitionCounts) -> + assertThat(partitionCounts, hasSize(3))); + + // Trigger repartitioning - this should remove the empty partition + explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup); + + // After repartitioning, we should have only 1 partition (empty ones removed) + dataModel.getPartitionCounts(() -> openContext(contextProps)).forEach((groupingKey, partitionCounts) -> + assertThat(partitionCounts, hasSize(1))); + + // Validate the index is still consistent + dataModel.validate(() -> openContext(contextProps)); + } + + /** + * Remove some random set of records, commit and merge, ensure that once all records are + * removed we have one empty partition. 
+ */ + @Test + void randomlyRemoveAllRecords() throws IOException { + // Test that empty partitions are removed during repartitioning + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(3663, this::getStoreBuilder, pathManager) + .setIsGrouped(true) + .setIsSynthetic(true) + .setPrimaryKeySegmentIndexEnabled(true) + .setPartitionHighWatermark(10) + .build(); + + final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() + .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 2) + .addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0 + .build(); + + // Create multiple partitions with documents + try (FDBRecordContext context = openContext(contextProps)) { + dataModel.saveRecordsToAllGroups(25, context); + commit(context); + } + + explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup); + + // Iterate and delete 1/2 of records each time + boolean done = false; + int maxLoop = 100; + AtomicInteger currentLoopCount = new AtomicInteger(1); + while (!done) { + try (FDBRecordContext context = openContext(contextProps)) { + final FDBRecordStore recordStore = dataModel.createOrOpenRecordStore(context); + dataModel.recordsUnderTest() + .forEach(rec -> { + if ((dataModel.nextInt(2) == 0) || (currentLoopCount.get() >= maxLoop)) { + rec.deleteRecord(recordStore).join(); + } + }); + context.commit(); + } + explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup); + dataModel.validate(() -> openContext(contextProps)); + currentLoopCount.incrementAndGet(); + // done once all documents have been deleted + done = dataModel.recordsUnderTest().isEmpty(); + } + + // Ensure we only have one empty partition + dataModel.getPartitionCounts(() -> openContext(contextProps)) + .forEach((groupingKey, partitionCounts) -> + assertThat(partitionCounts, contains(0))); + } + static Stream changingEncryptionKey() { return Stream.concat(Stream.of(Arguments.of(true, true, 288513), Arguments.of(false, false, 792025)), @@ -946,12 +1145,16 @@ void changingEncryptionKey(boolean isSynthetic, boolean isGrouped, long seed) { containsString("Un-supported compression version"))); } + private enum PartitionCount { NONE, ONE, MULTIPLE } + private static Stream concurrentParameters() { // only run the individual tests with synthetic during nightly, the mix runs both return Stream.concat(Stream.of(false), TestConfigurationUtils.onlyNightly( IntStream.range(0, 3).boxed().flatMap(i -> Stream.of(true, false)))) - .map(Arguments::of); + .flatMap(isSynthetic -> + Arrays.stream(PartitionCount.values()) + .map(partitionHighWatermark -> Arguments.of(isSynthetic, partitionHighWatermark))); } /** @@ -965,12 +1168,12 @@ private static Stream concurrentParameters() { */ @ParameterizedTest @MethodSource("concurrentParameters") - void concurrentUpdate(final boolean isSynthetic) throws IOException { + void concurrentUpdate(final boolean isSynthetic, PartitionCount partitionCount) throws IOException { concurrentTestWithinTransaction(isSynthetic, (dataModel, recordStore) -> RecordCursor.fromList(dataModel.recordsUnderTest()) .mapPipelined(record -> record.updateOtherValue(recordStore), 10) .asList().join(), - Assertions::assertEquals); + Assertions::assertEquals, noopConsumer(), partitionCount); } /** @@ -984,12 +1187,15 @@ void concurrentUpdate(final boolean isSynthetic) throws IOException { */ @ParameterizedTest @MethodSource("concurrentParameters") - void 
concurrentDelete(final boolean isSynthetic) throws IOException { + void concurrentDelete(final boolean isSynthetic, PartitionCount partitionCount) throws IOException { concurrentTestWithinTransaction(isSynthetic, (dataModel, recordStore) -> RecordCursor.fromList(dataModel.recordsUnderTest()) .mapPipelined(record -> record.deleteRecord(recordStore), 10) .asList().join(), - (inserted, actual) -> assertEquals(0, actual)); + (inserted, actual) -> assertEquals(0, actual), + // Assert that there is only one partition left, and that it has 0 documents + partitionCounts -> partitionCounts.values().forEach(counts -> assertThat(counts, contains(0))), + partitionCount); } /** @@ -1003,18 +1209,20 @@ void concurrentDelete(final boolean isSynthetic) throws IOException { */ @ParameterizedTest @MethodSource("concurrentParameters") - void concurrentInsert(final boolean isSynthetic) throws IOException { + void concurrentInsert(final boolean isSynthetic, final PartitionCount partitionCount) throws IOException { concurrentTestWithinTransaction(isSynthetic, (dataModel, recordStore) -> RecordCursor.fromList(dataModel.recordsUnderTest()) .mapPipelined(record -> { // ignore the record, we're just using that as a count return dataModel.saveRecordAsync(true, recordStore, 1); }, 10) .asList().join(), - (inserted, actual) -> assertEquals(inserted * 2, actual)); + (inserted, actual) -> assertEquals(inserted * 2, actual), noopConsumer(), partitionCount); } private static Stream concurrentMixParameters() { - return Stream.of(true, false).map(Arguments::of); + return ParameterizedTestUtils.cartesianProduct( + ParameterizedTestUtils.booleans("isSynthetic"), + Arrays.stream(PartitionCount.values())); } /** @@ -1028,7 +1236,7 @@ private static Stream concurrentMixParameters() { */ @ParameterizedTest @MethodSource("concurrentMixParameters") - void concurrentMix(final boolean isSynthetic) throws IOException { + void concurrentMix(final boolean isSynthetic, final PartitionCount partitionCount) throws IOException { // We never touch the same record twice. AtomicInteger step = new AtomicInteger(0); AtomicInteger updates = new AtomicInteger(0); @@ -1051,12 +1259,16 @@ void concurrentMix(final boolean isSynthetic) throws IOException { } }, 10) .asList().join(), - (inserted, actual) -> assertEquals(inserted + saves.get() - deletes.get(), actual)); + (inserted, actual) -> assertEquals(inserted + saves.get() - deletes.get(), actual), + noopConsumer(), + partitionCount); } private void concurrentTestWithinTransaction(boolean isSynthetic, final BiConsumer applyChangeConcurrently, - final BiConsumer assertDataModelCount) throws IOException { + final BiConsumer assertDataModelCount, + final Consumer>> assertPartitionCounts, + final PartitionCount partitionCountToPopulate) throws IOException { // Once the two issues noted below are fixed, we should make this parameterized, and run with additional random // configurations. 
AtomicInteger threadCounter = new AtomicInteger(); @@ -1074,9 +1286,22 @@ private void concurrentTestWithinTransaction(boolean isSynthetic, final long seed = 320947L; final boolean isGrouped = true; final boolean primaryKeySegmentIndexEnabled = true; - // LucenePartitioner is not thread safe, and the counts get broken - // See: https://github.com/FoundationDB/fdb-record-layer/issues/2990 - final int partitionHighWatermark = -1; + int partitionHighWatermark; + + switch (partitionCountToPopulate) { + case NONE: + partitionHighWatermark = 0; + break; + case ONE: + partitionHighWatermark = 100_000; + break; + case MULTIPLE: + partitionHighWatermark = 100; + break; + default: + throw new IllegalArgumentException("Unknown value for partitionCountToPopulate:" + partitionCountToPopulate); + } + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager) .setIsGrouped(isGrouped) .setIsSynthetic(isSynthetic) @@ -1120,8 +1345,13 @@ private void concurrentTestWithinTransaction(boolean isSynthetic, commit(context); } + explicitMergeIndex(dataModel.index, contextProps, dataModel.schemaSetup); + assertDataModelCount.accept(recordsPerIteration * loopCount, dataModel.groupingKeyToPrimaryKeyToPartitionKey.values().stream().mapToInt(Map::size).sum()); + if (dataModel.partitionHighWatermark > 0) { + assertPartitionCounts.accept(dataModel.getPartitionCounts(() -> openContext(contextProps))); + } dataModel.validate(() -> openContext(contextProps)); @@ -1350,4 +1580,8 @@ protected RecordLayerPropertyStorage.Builder addDefaultProps(final RecordLayerPr } return super.addDefaultProps(props).addProp(LuceneRecordContextProperties.LUCENE_INDEX_COMPRESSION_ENABLED, true); } + + private Consumer noopConsumer() { + return ignored -> { }; + } } diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java index 52804c016e..57f1b8084b 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestDataModel.java @@ -599,7 +599,8 @@ private SyntheticRecord(@Nonnull final Tuple groupingKey, @Nonnull final Tuple p this.parentPrimaryKey = primaryKey; this.childPrimaryKey = childPrimaryKey; this.syntheticPrimaryKey = syntheticPrimaryKey; - partitioningKey = Tuple.from(timestamp).addAll(primaryKey); + // The partitioning key for the record is the synthetic PK ("-1" + both PKs for constituents) + partitioningKey = Tuple.from(timestamp).addAll(syntheticPrimaryKey); } @Override diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java index 61d297e40f..f9d1a50da1 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneIndexTestValidator.java @@ -64,6 +64,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static 
org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -163,6 +164,7 @@ private void validateUnpartitionedGroup(final Index index, final String universa private void validatePartitionedGroup(final Index index, final String universalSearch, final boolean allowDuplicatePrimaryKeys, final Tuple groupingKey, final int partitionLowWatermark, final int partitionHighWatermark, final List records, final Map> missingDocuments) throws IOException { List partitionInfos = getPartitionMeta(index, groupingKey); + assertThat(partitionInfos.size(), greaterThan(0)); partitionInfos.sort(Comparator.comparing(info -> Tuple.fromBytes(info.getFrom().toByteArray()))); Set usedPartitionIds = new HashSet<>(); Tuple lastToTuple = null; @@ -188,13 +190,13 @@ private void validatePartitionedGroup(final Index index, final String universalS "partitionInfo.count", partitionInfo.getCount())); // if partitionInfo.getCount() is wrong, this can be very confusing, so a different assertion might be // worthwhile + assertThat(records, hasSize(greaterThanOrEqualTo(visitedCount + partitionInfo.getCount()))); final Set expectedPrimaryKeys = Set.copyOf(records.subList(visitedCount, visitedCount + partitionInfo.getCount())); validateDocsInPartition(recordStore, index, partitionInfo.getId(), groupingKey, expectedPrimaryKeys, universalSearch); visitedCount += partitionInfo.getCount(); - assertThat(records.size(), greaterThanOrEqualTo(visitedCount)); validatePrimaryKeySegmentIndex(recordStore, index, groupingKey, partitionInfo.getId(), expectedPrimaryKeys, allowDuplicatePrimaryKeys); expectedPrimaryKeys.forEach(primaryKey -> missingDocuments.get(groupingKey).remove(primaryKey)); @@ -213,7 +215,7 @@ private Tuple validatePartition(final Tuple groupingKey, final int partitionLowW final List partitionInfos, final int partitionIndex, final Set usedPartitionIds, Tuple previousToTuple) { final LucenePartitionInfoProto.LucenePartitionInfo partitionInfo = partitionInfos.get(partitionIndex); - assertTrue(isParititionCountWithinBounds(partitionInfos, partitionIndex, partitionLowWatermark, partitionHighWatermark), + assertTrue(isPartitionCountWithinBounds(partitionInfos, partitionIndex, partitionLowWatermark, partitionHighWatermark), () -> partitionMessage(groupingKey, partitionLowWatermark, partitionHighWatermark, partitionInfos, partitionIndex)); assertTrue(usedPartitionIds.add(partitionInfo.getId()), () -> "Duplicate id: " + partitionInfo); final Tuple fromTuple = Tuple.fromBytes(partitionInfo.getFrom().toByteArray()); @@ -285,10 +287,10 @@ List getPartitionMeta(Index index, } } - boolean isParititionCountWithinBounds(@Nonnull final List partitionInfos, - int currentPartitionIndex, - int lowWatermark, - int highWatermark) { + boolean isPartitionCountWithinBounds(@Nonnull final List partitionInfos, + int currentPartitionIndex, + int lowWatermark, + int highWatermark) { int currentCount = partitionInfos.get(currentPartitionIndex).getCount(); if (currentCount > highWatermark) { return false; @@ -296,10 +298,19 @@ boolean isParititionCountWithinBounds(@Nonnull final List= lowWatermark) { return true; } + if (currentCount == 0) { + if (partitionInfos.size() == 1) { + // we have no documents in the index, one partition must remain + return true; + } else { + // We have an empty partition that should have been deleted + return false; + } + } // here: count < lowWatermark int leftNeighborCapacity = currentPartitionIndex == 0 ? 
0 : getPartitionExtraCapacity(partitionInfos.get(currentPartitionIndex - 1).getCount(), highWatermark); int rightNeighborCapacity = currentPartitionIndex == (partitionInfos.size() - 1) ? 0 : getPartitionExtraCapacity(partitionInfos.get(currentPartitionIndex + 1).getCount(), highWatermark); - + // Ensure that if we have capacity in left and right neighbors, the records are moved away from currentPartition return currentCount > 0 && (leftNeighborCapacity + rightNeighborCapacity) < currentCount; } diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LucenePartitionerTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LucenePartitionerTest.java new file mode 100644 index 0000000000..23da69fe76 --- /dev/null +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LucenePartitionerTest.java @@ -0,0 +1,224 @@ +/* + * LucenePartitionerTest.java + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2015-2025 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.apple.foundationdb.record.lucene; + +import com.apple.foundationdb.record.RecordCoreInternalException; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore; +import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreConcurrentTestBase; +import com.apple.foundationdb.record.provider.foundationdb.properties.RecordLayerPropertyStorage; +import com.apple.foundationdb.tuple.Tuple; +import com.apple.test.BooleanSource; +import com.apple.test.Tags; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Tag(Tags.RequiresFDB) +public class LucenePartitionerTest extends FDBRecordStoreConcurrentTestBase { + @Test + void testDecrementCountNegative() throws Exception { + final long seed = 6647237; + final int repartitionCount = 3; + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager) + .setIsGrouped(true) + .setIsSynthetic(true) + .setPrimaryKeySegmentIndexEnabled(true) + .setPartitionHighWatermark(10) + .build(); + + final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() + .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, repartitionCount) + .addProp(LuceneRecordContextProperties.LUCENE_MAX_DOCUMENTS_TO_MOVE_DURING_REPARTITIONING, dataModel.nextInt(1000) + repartitionCount) + .addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, (double)dataModel.nextInt(10) + 2) // it must be at least 2.0 + .build(); + + dataModel.saveManyRecords(20, () -> openContext(contextProps), dataModel.nextInt(15) + 1); + + 
mergeIndex(contextProps, dataModel); + + // decrement partition counts + try (FDBRecordContext context = openContext(contextProps)) { + FDBRecordStore recordStore = dataModel.createOrOpenRecordStore(context); + final LuceneIndexMaintainer indexMaintainer = (LuceneIndexMaintainer)recordStore.getIndexMaintainer(dataModel.index); + final LucenePartitioner partitioner = indexMaintainer.getPartitioner(); + dataModel.groupingKeyToPrimaryKeyToPartitionKey.keySet().forEach(groupingKey -> { + final LucenePartitionInfoProto.LucenePartitionInfo firstPartition = partitioner.getAllPartitionMetaInfo(groupingKey).join().stream().findFirst().get(); + Assertions.assertThatThrownBy(() -> partitioner.decrementCountAndSave(groupingKey, 5000, firstPartition.getId()).join()) + .hasCauseInstanceOf(RecordCoreInternalException.class); + }); + // Commit here to ensure that the data is not corrupt as a result + context.commit(); + } + + dataModel.validate(() -> openContext(contextProps)); + } + + @ParameterizedTest + @BooleanSource + void testTryRemovingNonEmptyPartition(boolean isGrouped) throws Exception { + // Test that removeEmptyPartition correctly detects and refuses to remove a partition that still has documents + final long seed = 123456; + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager) + .setIsGrouped(isGrouped) + .setIsSynthetic(true) + .setPrimaryKeySegmentIndexEnabled(true) + .setPartitionHighWatermark(10) + .build(); + + final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() + .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 2) + .addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, 2.0) + .build(); + + // Create multiple partitions with documents + try (FDBRecordContext context = openContext(contextProps)) { + dataModel.saveRecordsToAllGroups(25, context); + commit(context); + } + + mergeIndex(contextProps, dataModel); + + // Try to remove a non-empty partition + try (FDBRecordContext context = openContext(contextProps)) { + FDBRecordStore recordStore = dataModel.createOrOpenRecordStore(context); + final LuceneIndexMaintainer indexMaintainer = (LuceneIndexMaintainer)recordStore.getIndexMaintainer(dataModel.index); + final LucenePartitioner partitioner = indexMaintainer.getPartitioner(); + + dataModel.groupingKeyToPrimaryKeyToPartitionKey.keySet().forEach(groupingKey -> { + // Get partitions for this grouping key + final List partitions = + partitioner.getAllPartitionMetaInfo(groupingKey).join(); + Assertions.assertThat(partitions).hasSizeGreaterThan(0); + + // Get the first partition (which has documents in the actual Lucene index) + final LucenePartitionInfoProto.LucenePartitionInfo partitionWithDocs = partitions.get(0); + // Note: We don't check the count metadata field here because it could theoretically be 0 + // even if documents exist in the index (metadata out of sync). The test verifies that + // verifyNoDocumentsInPartition() checks the actual index, not just the metadata. + + // Create a RepartitioningContext for removing this partition + final LucenePartitionInfoProto.LucenePartitionInfo olderPartition = partitions.size() > 1 ? 
partitions.get(1) : null; + final LuceneRepartitionPlanner.RepartitioningContext repartitioningContext = + new LuceneRepartitionPlanner.RepartitioningContext( + groupingKey, + partitions.stream().mapToInt(LucenePartitionInfoProto.LucenePartitionInfo::getId).max().orElse(0), + partitionWithDocs, + olderPartition, + null + ); + repartitioningContext.countToMove = 0; // Set to 0 to indicate removing empty partition + repartitioningContext.emptyingPartition = true; + + // Call removeEmptyPartition - it should return 0 because the partition is not empty + final int result = partitioner.removeEmptyPartition(repartitioningContext); + + // The method should return 0 indicating it did not remove the partition + Assertions.assertThat(result).isZero(); + + // Verify the partition still exists + final List partitionsAfter = + partitioner.getAllPartitionMetaInfo(groupingKey).join(); + Assertions.assertThat(partitionsAfter).hasSize(partitions.size()); + }); + + context.commit(); + } + + // Validate the index is still consistent + dataModel.validate(() -> openContext(contextProps)); + } + + @ParameterizedTest + @BooleanSource + void testMergeNonEmptyPartitionFails(boolean isGrouped) throws Exception { + // Test that a partition with inconsistent metadata does not get removed + final long seed = 36548347; + final LuceneIndexTestDataModel dataModel = new LuceneIndexTestDataModel.Builder(seed, this::getStoreBuilder, pathManager) + .setIsGrouped(isGrouped) + .setIsSynthetic(true) + .setPrimaryKeySegmentIndexEnabled(true) + .setPartitionHighWatermark(10) + .build(); + + final RecordLayerPropertyStorage contextProps = RecordLayerPropertyStorage.newBuilder() + .addProp(LuceneRecordContextProperties.LUCENE_REPARTITION_DOCUMENT_COUNT, 2) + .addProp(LuceneRecordContextProperties.LUCENE_MERGE_SEGMENTS_PER_TIER, 2.0) + .build(); + + // Create multiple partitions with documents + try (FDBRecordContext context = openContext(contextProps)) { + dataModel.saveRecordsToAllGroups(25, context); + commit(context); + } + + mergeIndex(contextProps, dataModel); + + Map partitionsWithZeroCount = new HashMap<>(); + // simulate a partition metadata being 0 (though still has records) + try (FDBRecordContext context = openContext(contextProps)) { + FDBRecordStore recordStore = dataModel.createOrOpenRecordStore(context); + final LuceneIndexMaintainer indexMaintainer = (LuceneIndexMaintainer)recordStore.getIndexMaintainer(dataModel.index); + final LucenePartitioner partitioner = indexMaintainer.getPartitioner(); + + dataModel.groupingKeyToPrimaryKeyToPartitionKey.keySet().forEach(groupingKey -> { + final List partitions = + partitioner.getAllPartitionMetaInfo(groupingKey).join(); + Assertions.assertThat(partitions).hasSizeGreaterThan(0); + // Get the first partition (which has documents in the actual Lucene index) + final LucenePartitionInfoProto.LucenePartitionInfo partition = partitions.get(0); + // zero out the partition's count + partitioner.decrementCountAndSave(groupingKey, partition.getCount(), partition.getId()); + partitionsWithZeroCount.put(groupingKey, partition.getId()); + }); + + context.commit(); + } + + // Merge index (should not remove the partition) + mergeIndex(contextProps, dataModel); + + // Ensure partition does not get removed + try (FDBRecordContext context = openContext(contextProps)) { + FDBRecordStore recordStore = dataModel.createOrOpenRecordStore(context); + final LuceneIndexMaintainer indexMaintainer = (LuceneIndexMaintainer)recordStore.getIndexMaintainer(dataModel.index); + final LucenePartitioner 
partitioner = indexMaintainer.getPartitioner(); + + dataModel.groupingKeyToPrimaryKeyToPartitionKey.keySet().forEach(groupingKey -> { + final List partitions = + partitioner.getAllPartitionMetaInfo(groupingKey).join(); + Assertions.assertThat(partitions).anyMatch(partition -> partition.getId() == partitionsWithZeroCount.get(groupingKey)); + }); + } + } + + private void mergeIndex(final RecordLayerPropertyStorage contextProps, final LuceneIndexTestDataModel dataModel) { + try (FDBRecordContext context = openContext(contextProps)) { + dataModel.explicitMergeIndex(context, null); + context.commit(); + } + } +} diff --git a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneRepartitionPlannerTest.java b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneRepartitionPlannerTest.java index faa0826061..7ef8c01a84 100644 --- a/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneRepartitionPlannerTest.java +++ b/fdb-record-layer-lucene/src/test/java/com/apple/foundationdb/record/lucene/LuceneRepartitionPlannerTest.java @@ -26,6 +26,7 @@ import com.google.protobuf.ByteString; import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import java.util.ArrayList; @@ -34,6 +35,7 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -45,8 +47,18 @@ */ @Tag(Tags.RequiresFDB) public class LuceneRepartitionPlannerTest { + + private static Stream luceneRepartitionPlannerTest() { + return Stream.concat(LuceneIndexTest.simplePartitionConsolidationTest(), + Stream.of( + // ensure empty partition gets removed + Arguments.of(1, 5, 3, new int[] {5, 0, 4}, new int[] {5, 4}, 9002508147645127223L), + // ensure empty partition isn't erroring out + Arguments.of(1, 5, 3, new int[] {0}, new int[] {0}, 9002508147645127223L))); + } + @ParameterizedTest - @MethodSource(value = "com.apple.foundationdb.record.lucene.LuceneIndexTest#simplePartitionConsolidationTest") + @MethodSource public void luceneRepartitionPlannerTest(int lowWatermark, int highWatermark, int repartitionDocumentCount, @@ -76,10 +88,6 @@ public void luceneRepartitionPlannerTest(int lowWatermark, for (int i = 0; i < allPartitions.size(); i++) { LucenePartitionInfoProto.LucenePartitionInfo currentPartitionInfo = allPartitions.get(i); - if (currentPartitionInfo.getCount() == 0) { - continue; - } - Pair neighborPartitions = LucenePartitioner.getPartitionNeighbors(allPartitions, i); @@ -137,6 +145,9 @@ public void luceneRepartitionPlannerTest(int lowWatermark, } totalMoved += actualCountToMove; break; + case REMOVE_EMPTY_PARTITION: + allPartitions.remove(i); + break; default: break; }