Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* RecordCoreInternalException.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.foundationdb.record;

import com.apple.foundationdb.annotation.API;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* Exception thrown when an inconsistency in core record layer behavior is detected.
*/
@API(API.Status.STABLE)
@SuppressWarnings("serial")

Check warning on line 32 in fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreInternalException.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreInternalException.java#L32

Warning of type 'serial' should not be suppressed https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3699%2Fohadzeliger%2Fthread-safe-partitioner%3AHEAD&id=168853FE53030B61D9B1BA502C912CBB
public class RecordCoreInternalException extends RecordCoreException {

public RecordCoreInternalException(@Nonnull final String msg, @Nullable final Object... keyValues) {
super(msg, keyValues);
}
}

Check warning on line 38 in fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreInternalException.java

View check run for this annotation

fdb.teamscale.io / Teamscale | Findings

fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/RecordCoreInternalException.java#L33-L38

`RecordCoreInternalException` has inheritance depth of 4 which is deeper than maximum of 2 https://fdb.teamscale.io/findings/details/foundationdb-fdb-record-layer?t=FORK_MR%2F3699%2Fohadzeliger%2Fthread-safe-partitioner%3AHEAD&id=A193D7F6D871B3092816E104FFF31874
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,14 @@ private void insertField(LuceneDocumentFromRecord.DocumentField field, final Doc
}
}

<M extends Message> void writeDocument(final FDBIndexableRecord<M> newRecord, final Map.Entry<Tuple, List<LuceneDocumentFromRecord.DocumentField>> entry, final Integer partitionId) {
try {
writeDocument(entry.getValue(), entry.getKey(), partitionId, newRecord.getPrimaryKey());
} catch (IOException e) {
throw LuceneExceptions.toRecordCoreException("Issue updating new index keys", e, "newRecord", newRecord.getPrimaryKey());
}
}

@SuppressWarnings("PMD.CloseResource")
private void writeDocument(@Nonnull List<LuceneDocumentFromRecord.DocumentField> fields,
Tuple groupingKey,
Expand Down Expand Up @@ -473,30 +481,30 @@ <M extends Message> CompletableFuture<Void> update(@Nullable FDBIndexableRecord<

LOG.trace("update oldFields={}, newFields{}", oldRecordFields, newRecordFields);

// delete old
return AsyncUtil.whenAll(oldRecordFields.keySet().stream().map(t -> {
try {
return tryDelete(Objects.requireNonNull(oldRecord), t);
} catch (IOException e) {
throw LuceneExceptions.toRecordCoreException("Issue deleting", e, "record", Objects.requireNonNull(oldRecord).getPrimaryKey());
}
}).collect(Collectors.toList())).thenCompose(ignored ->
// update new
AsyncUtil.whenAll(newRecordFields.entrySet().stream().map(entry -> {
try {
return tryDeleteInWriteOnlyMode(Objects.requireNonNull(newRecord), entry.getKey()).thenCompose(countDeleted ->
partitioner.addToAndSavePartitionMetadata(newRecord, entry.getKey(), destinationPartitionIdHint).thenApply(partitionId -> {
try {
writeDocument(entry.getValue(), entry.getKey(), partitionId, newRecord.getPrimaryKey());
} catch (IOException e) {
throw LuceneExceptions.toRecordCoreException("Issue updating new index keys", e, "newRecord", newRecord.getPrimaryKey());
}
return null;
}));
} catch (IOException e) {
throw LuceneExceptions.toRecordCoreException("Issue updating", e, "record", Objects.requireNonNull(newRecord).getPrimaryKey());
}
}).collect(Collectors.toList())));
return AsyncUtil.whenAll(oldRecordFields.keySet().stream()
// delete old
.map(groupingKey -> tryDelete(Objects.requireNonNull(oldRecord), groupingKey))
.collect(Collectors.toList()))
.thenCompose(ignored ->
// update new
AsyncUtil.whenAll(newRecordFields.entrySet().stream()
.map(entry -> updateRecord(newRecord, destinationPartitionIdHint, entry))
.collect(Collectors.toList())));
}

/**
* Internal utility to update a single record.
* @param newRecord the new record to save
* @param destinationPartitionIdHint partition ID
* @param entry entry from the grouping key to the document fields
*/
private <M extends Message> CompletableFuture<Void> updateRecord(
final FDBIndexableRecord<M> newRecord,
final Integer destinationPartitionIdHint,
final Map.Entry<Tuple, List<LuceneDocumentFromRecord.DocumentField>> entry) {
return tryDeleteInWriteOnlyMode(Objects.requireNonNull(newRecord), entry.getKey()).thenCompose(countDeleted ->
partitioner.addToAndSavePartitionMetadata(newRecord, entry.getKey(), destinationPartitionIdHint)
.thenAccept(partitionId -> writeDocument(newRecord, entry, partitionId)));
}

/**
Expand All @@ -509,10 +517,9 @@ <M extends Message> CompletableFuture<Void> update(@Nullable FDBIndexableRecord<
* @param groupingKey grouping key
* @param <M> message
* @return count of deleted docs
* @throws IOException propagated by {@link #tryDelete(FDBIndexableRecord, Tuple)}
*/
private <M extends Message> CompletableFuture<Integer> tryDeleteInWriteOnlyMode(@Nonnull FDBIndexableRecord<M> record,
@Nonnull Tuple groupingKey) throws IOException {
@Nonnull Tuple groupingKey) {
if (!state.store.isIndexWriteOnly(state.index)) {
// no op
return CompletableFuture.completedFuture(0);
Expand All @@ -529,29 +536,35 @@ private <M extends Message> CompletableFuture<Integer> tryDeleteInWriteOnlyMode(
* @param <M> record message
* @return count of deleted docs: 1 indicates that the record has been deleted, 0 means that either no record was deleted or it was deleted by
* query.
* @throws IOException propagated from {@link #deleteDocument(Tuple, Integer, Tuple)}
*/
private <M extends Message> CompletableFuture<Integer> tryDelete(@Nonnull FDBIndexableRecord<M> record,
@Nonnull Tuple groupingKey) throws IOException {
// non-partitioned
if (!partitioner.isPartitioningEnabled()) {
return CompletableFuture.completedFuture(deleteDocument(groupingKey, null, record.getPrimaryKey()));
@Nonnull Tuple groupingKey) {
try {
// non-partitioned
if (!partitioner.isPartitioningEnabled()) {
return CompletableFuture.completedFuture(deleteDocument(groupingKey, null, record.getPrimaryKey()));
}
} catch (IOException e) {
throw LuceneExceptions.toRecordCoreException("Issue deleting", e, "record", Objects.requireNonNull(record).getPrimaryKey());
}

// partitioned
return partitioner.tryGetPartitionInfo(record, groupingKey).thenApply(partitionInfo -> {
if (partitionInfo != null) {
try {
int countDeleted = deleteDocument(groupingKey, partitionInfo.getId(), record.getPrimaryKey());
if (countDeleted > 0) {
partitioner.decrementCountAndSave(groupingKey, partitionInfo, countDeleted);
}
return countDeleted;
} catch (IOException e) {
throw LuceneExceptions.toRecordCoreException("Issue deleting", e, "record", record.getPrimaryKey());
return partitioner.tryGetPartitionInfo(record, groupingKey).thenCompose(partitionInfo -> {
if (partitionInfo == null) {
return CompletableFuture.completedFuture(0);
}
try {
int countDeleted = deleteDocument(groupingKey, partitionInfo.getId(), record.getPrimaryKey());
// this might be 0 when in writeOnly mode, but otherwise should not happen.
if (countDeleted > 0) {
return partitioner.decrementCountAndSave(groupingKey, countDeleted, partitionInfo.getId())
.thenApply(vignore -> countDeleted);
} else {
return CompletableFuture.completedFuture(countDeleted);
}
} catch (IOException e) {
throw LuceneExceptions.toRecordCoreException("Issue deleting", e, "record", record.getPrimaryKey());
}
return 0;
});
}

Expand Down
Loading
Loading