predicate) {
+ queue.removeIf(predicate);
+ }
+
/**
* Add an entry to the queue and unlock it, in that order.
*
diff --git a/libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockablePool.java b/libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockablePool.java
index 505c92abd5833..ddc24b364c46e 100644
--- a/libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockablePool.java
+++ b/libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockablePool.java
@@ -81,30 +81,53 @@ public void releaseAndUnlock(T item) {
/**
* Lock and checkout all items from the pool.
+ *
+ * Phase 1: Snapshot the items set under the pool lock.
+ * Phase 2: Lock each item outside the monitor to avoid holding it while blocking on in-flight operations.
+ * Phase 3: Remove checked-out items from the set and bulk-remove from the queue in a single pass.
*
* @return unmodifiable list of all items locked by current thread
* @throws IllegalStateException if the pool is closed
*/
public List<T> checkoutAll() {
ensureOpen();
- List<T> lockedItems = new ArrayList<>();
- List<T> checkedOutItems = new ArrayList<>();
- for (T item : this) {
+
+ // Phase 1: Snapshot
+ List<T> snapshot;
+ synchronized (this) {
+ if (items.isEmpty()) {
+ return Collections.emptyList();
+ }
+ snapshot = new ArrayList<>(items.size());
+ snapshot.addAll(items);
+ }
+
+ // Phase 2: Lock outside monitor
+ for (T item : snapshot) {
item.lock();
- lockedItems.add(item);
}
+
+ // Phase 3: Process + bulk cleanup
+ List<T> checkedOutItems = new ArrayList<>(snapshot.size());
synchronized (this) {
- for (T item : lockedItems) {
+ Set<T> toRemoveFromQueue = Collections.newSetFromMap(new IdentityHashMap<>(snapshot.size()));
+
+ for (T item : snapshot) {
try {
- if (isRegistered(item) && items.remove(item)) {
- availableItems.remove(item);
+ if (items.remove(item)) {
+ toRemoveFromQueue.add(item);
checkedOutItems.add(item);
}
} finally {
item.unlock();
}
}
+
+ if (toRemoveFromQueue.isEmpty() == false) {
+ availableItems.removeIf(toRemoveFromQueue::contains);
+ }
}
+
return Collections.unmodifiableList(checkedOutItems);
}
diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java
index 75d7b37368f0c..5f095eec3eb07 100644
--- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java
+++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java
@@ -13,6 +13,7 @@
import org.opensearch.index.engine.dataformat.DocumentInput;
import org.opensearch.index.mapper.MappedFieldType;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -47,7 +48,7 @@ public CompositeDocumentInput(
) {
this.primaryFormat = Objects.requireNonNull(primaryFormat, "primaryFormat must not be null");
this.primaryDocumentInput = Objects.requireNonNull(primaryDocumentInput, "primaryDocumentInput must not be null");
- this.secondaryDocumentInputs = Map.copyOf(
+ this.secondaryDocumentInputs = Collections.unmodifiableMap(
Objects.requireNonNull(secondaryDocumentInputs, "secondaryDocumentInputs must not be null")
);
}
diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java
index b1080cbe9a63c..7f2d02a3d2b5d 100644
--- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java
+++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java
@@ -31,7 +31,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedHashMap;
+import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -275,7 +275,7 @@ public void deleteFiles(Map> filesToDelete) throws IO
@Override
public CompositeDocumentInput newDocumentInput() {
DocumentInput<?, ?> primaryInput = primaryEngine.newDocumentInput();
- Map<DataFormat, DocumentInput<?, ?>> secondaryInputMap = new LinkedHashMap<>();
+ Map<DataFormat, DocumentInput<?, ?>> secondaryInputMap = new IdentityHashMap<>();
for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
}
diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java
index 309494adf21e9..63c68dbbea0cd 100644
--- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java
+++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java
@@ -21,8 +21,8 @@
import org.opensearch.index.engine.exec.WriterFileSet;
import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.LinkedHashMap;
+import java.util.Collections;
+import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
@@ -44,7 +44,8 @@ class CompositeWriter implements Writer, Lockable {
private static final Logger logger = LogManager.getLogger(CompositeWriter.class);
- private final Map.Entry<DataFormat, Writer<?, ?>> primaryWriter;
+ private final DataFormat primaryFormat;
+ private final Writer<?, ?> primaryWriter;
private final Map<DataFormat, Writer<?, ?>> secondaryWritersByFormat;
private final ReentrantLock lock;
private final long writerGeneration;
@@ -87,16 +88,14 @@ enum WriterState {
this.writerGeneration = writerGeneration;
IndexingExecutionEngine<?, ?> primaryDelegate = engine.getPrimaryDelegate();
- this.primaryWriter = new AbstractMap.SimpleImmutableEntry<>(
- primaryDelegate.getDataFormat(),
- (Writer<?, ?>) primaryDelegate.createWriter(writerGeneration)
- );
+ this.primaryFormat = primaryDelegate.getDataFormat();
+ this.primaryWriter = (Writer<?, ?>) primaryDelegate.createWriter(writerGeneration);
- Map<DataFormat, Writer<?, ?>> secondaries = new LinkedHashMap<>();
+ Map<DataFormat, Writer<?, ?>> secondaries = new IdentityHashMap<>();
for (IndexingExecutionEngine<?, ?> delegate : engine.getSecondaryDelegates()) {
secondaries.put(delegate.getDataFormat(), (Writer<?, ?>) delegate.createWriter(writerGeneration));
}
- this.secondaryWritersByFormat = Map.copyOf(secondaries);
+ this.secondaryWritersByFormat = Collections.unmodifiableMap(secondaries);
this.rowIdGenerator = new RowIdGenerator(CompositeWriter.class.getName());
}
@@ -106,11 +105,11 @@ public WriteResult addDoc(CompositeDocumentInput doc) throws IOException {
throw new IllegalStateException("Cannot add document to writer in state " + state.get());
}
// Write to primary first
- WriteResult primaryResult = primaryWriter.getValue().addDoc(doc.getPrimaryInput());
+ WriteResult primaryResult = primaryWriter.addDoc(doc.getPrimaryInput());
switch (primaryResult) {
- case WriteResult.Success s -> logger.trace("Successfully added document in primary format [{}]", primaryWriter.getKey().name());
+ case WriteResult.Success s -> logger.trace("Successfully added document in primary format [{}]", primaryFormat.name());
case WriteResult.Failure f -> {
- logger.debug("Failed to add document in primary format [{}]", primaryWriter.getKey().name());
+ logger.debug("Failed to add document in primary format [{}]", primaryFormat.name());
return primaryResult;
}
}
@@ -141,8 +140,8 @@ public WriteResult addDoc(CompositeDocumentInput doc) throws IOException {
public FileInfos flush() throws IOException {
FileInfos.Builder builder = FileInfos.builder();
// Flush primary
- Optional<WriterFileSet> primaryWfs = primaryWriter.getValue().flush().getWriterFileSet(primaryWriter.getKey());
- primaryWfs.ifPresent(writerFileSet -> builder.putWriterFileSet(primaryWriter.getKey(), writerFileSet));
+ Optional<WriterFileSet> primaryWfs = primaryWriter.flush().getWriterFileSet(primaryFormat);
+ primaryWfs.ifPresent(writerFileSet -> builder.putWriterFileSet(primaryFormat, writerFileSet));
// Flush secondaries
for (Writer<?, ?> writer : secondaryWritersByFormat.values()) {
FileInfos fileInfos = writer.flush();
@@ -156,7 +155,7 @@ public FileInfos flush() throws IOException {
@Override
public void sync() throws IOException {
- primaryWriter.getValue().sync();
+ primaryWriter.sync();
for (Writer<?, ?> writer : secondaryWritersByFormat.values()) {
writer.sync();
}
@@ -164,7 +163,7 @@ public void sync() throws IOException {
@Override
public void close() throws IOException {
- primaryWriter.getValue().close();
+ primaryWriter.close();
for (Writer<?, ?> writer : secondaryWritersByFormat.values()) {
writer.close();
}