Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.queue;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

/**
* JMH benchmark for {@link LockablePool} measuring:
* <ul>
* <li>Isolated checkout/return throughput at varying thread counts</li>
* <li>Mixed workload: concurrent writers + periodic checkoutAll (refresh)</li>
* <li>Aggressive refresh: frequent checkoutAll to stress Phase 3 bulk cleanup</li>
* <li>Writer latency during refresh contention (sample mode)</li>
* </ul>
*/
@Fork(2)
@Warmup(iterations = 2, time = 3)
@Measurement(iterations = 3, time = 5)
@State(Scope.Benchmark)
@SuppressWarnings("unused")
/**
 * JMH benchmarks for {@link LockablePool}, covering:
 * <ul>
 * <li>Isolated checkout/return throughput at varying thread counts</li>
 * <li>Mixed workload: concurrent writers + periodic checkoutAll (refresh)</li>
 * <li>Aggressive refresh: frequent checkoutAll to stress Phase 3 bulk cleanup</li>
 * <li>Writer latency during refresh contention (sample mode)</li>
 * </ul>
 */
@Fork(2)
@Warmup(iterations = 2, time = 3)
@Measurement(iterations = 3, time = 5)
@State(Scope.Benchmark)
@SuppressWarnings("unused")
public class LockablePoolBenchmark {

    /** Stripe count handed to the pool; also sizes the pre-warm loop. */
    @Param({ "4", "8" })
    int concurrency;

    private LockablePool<PoolEntry> pool;

    /**
     * Builds a fresh pool before every iteration and cycles entries through
     * checkout/return so several stripes hold entries before measurement starts.
     */
    @Setup(Level.Iteration)
    public void setup() {
        final AtomicInteger nextId = new AtomicInteger(0);
        pool = new LockablePool<>(() -> new PoolEntry(nextId.getAndIncrement()), LinkedList::new, concurrency);
        // Pre-warm: checkout/return enough times to populate multiple stripes.
        int remaining = concurrency * 4;
        while (remaining-- > 0) {
            final PoolEntry entry = pool.getAndLock();
            pool.releaseAndUnlock(entry);
        }
    }

    // ── Mixed workload: writers + periodic refresh (1s interval) ──

    @Benchmark
    @Group("mixed_7w_1r")
    @GroupThreads(7)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public void writers_7w1r(Blackhole bh) {
        final PoolEntry entry = pool.getAndLock();
        bh.consume(simulateWork(entry));
        pool.releaseAndUnlock(entry);
    }

    @Benchmark
    @Group("mixed_7w_1r")
    @GroupThreads(1)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public List<PoolEntry> refresh_7w1r() throws InterruptedException {
        Thread.sleep(1000);
        return pool.checkoutAll();
    }

    @Benchmark
    @Group("mixed_3w_1r")
    @GroupThreads(3)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public void writers_3w1r(Blackhole bh) {
        final PoolEntry entry = pool.getAndLock();
        bh.consume(simulateWork(entry));
        pool.releaseAndUnlock(entry);
    }

    @Benchmark
    @Group("mixed_3w_1r")
    @GroupThreads(1)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public List<PoolEntry> refresh_3w1r() throws InterruptedException {
        Thread.sleep(1000);
        return pool.checkoutAll();
    }

    // ── Aggressive refresh: 10ms interval to stress checkoutAll Phase 3 ──
    // checkoutAll fires ~100x/sec instead of 1x/sec, amplifying the gap
    // between per-item remove (old) and bulk removeIf (new).

    @Benchmark
    @Group("aggressive_7w_1r")
    @GroupThreads(7)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public void writers_aggressive_7w1r(Blackhole bh) {
        final PoolEntry entry = pool.getAndLock();
        bh.consume(simulateWork(entry));
        pool.releaseAndUnlock(entry);
    }

    @Benchmark
    @Group("aggressive_7w_1r")
    @GroupThreads(1)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public List<PoolEntry> refresh_aggressive_7w1r() {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
        return pool.checkoutAll();
    }

    @Benchmark
    @Group("aggressive_3w_1r")
    @GroupThreads(3)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public void writers_aggressive_3w1r(Blackhole bh) {
        final PoolEntry entry = pool.getAndLock();
        bh.consume(simulateWork(entry));
        pool.releaseAndUnlock(entry);
    }

    @Benchmark
    @Group("aggressive_3w_1r")
    @GroupThreads(1)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public List<PoolEntry> refresh_aggressive_3w1r() {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
        return pool.checkoutAll();
    }

    // ── Writer latency during refresh contention (sample mode) ──
    // Captures the per-operation latency distribution, exposing tail-latency
    // spikes caused by checkoutAll holding the pool lock.

    @Benchmark
    @Group("latency_7w_1r")
    @GroupThreads(7)
    @BenchmarkMode(Mode.SampleTime)
    @OutputTimeUnit(TimeUnit.NANOSECONDS)
    public void writers_latency_7w1r(Blackhole bh) {
        final PoolEntry entry = pool.getAndLock();
        bh.consume(simulateWork(entry));
        pool.releaseAndUnlock(entry);
    }

    @Benchmark
    @Group("latency_7w_1r")
    @GroupThreads(1)
    @BenchmarkMode(Mode.SampleTime)
    @OutputTimeUnit(TimeUnit.NANOSECONDS)
    public List<PoolEntry> refresh_latency_7w1r() {
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10));
        return pool.checkoutAll();
    }

    // ── Isolated: pure writer throughput (no refresh contention) ──

    @Benchmark
    @Group("writers_only_4t")
    @GroupThreads(4)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public void writersOnly_4t(Blackhole bh) {
        final PoolEntry entry = pool.getAndLock();
        bh.consume(simulateWork(entry));
        pool.releaseAndUnlock(entry);
    }

    @Benchmark
    @Group("writers_only_8t")
    @GroupThreads(8)
    @BenchmarkMode(Mode.Throughput)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public void writersOnly_8t(Blackhole bh) {
        final PoolEntry entry = pool.getAndLock();
        bh.consume(simulateWork(entry));
        pool.releaseAndUnlock(entry);
    }

    // ── Isolated checkoutAll throughput (no writers) ──
    // Measures checkoutAll cost directly against a pre-populated pool.

    @Benchmark
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.NANOSECONDS)
    public List<PoolEntry> checkoutAll_isolated() {
        // The previous invocation drained the pool; cycle entries back in first.
        for (int i = 0; i < concurrency; i++) {
            final PoolEntry entry = pool.getAndLock();
            pool.releaseAndUnlock(entry);
        }
        return pool.checkoutAll();
    }

    /**
     * Cheap, deterministic CPU burn (xorshift-style mixing) so writer threads
     * do a little work per checkout without touching shared state.
     */
    private static long simulateWork(PoolEntry entry) {
        long state = entry.hashCode();
        int rounds = 20;
        while (rounds-- > 0) {
            state ^= (state << 13);
            state ^= (state >> 7);
            state ^= (state << 17);
        }
        return state;
    }

    /** Minimal pool entry whose lock operations delegate to a {@link ReentrantLock}. */
    static final class PoolEntry implements Lockable {
        final int id;
        private final ReentrantLock mutex = new ReentrantLock();

        PoolEntry(int id) {
            this.id = id;
        }

        @Override
        public void lock() {
            mutex.lock();
        }

        @Override
        public boolean tryLock() {
            return mutex.tryLock();
        }

        @Override
        public void unlock() {
            mutex.unlock();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,24 @@ boolean remove(T entry) {
}
return false;
}

/**
 * Removes every entry matching the given predicate in one pass over the stripes.
 * Each stripe's lock is acquired exactly once; all matches inside that stripe
 * are dropped before the lock is released and the next stripe is visited.
 *
 * @param predicate the condition for removal
 */
void removeIf(Predicate<T> predicate) {
    for (int stripe = 0; stripe < concurrency; stripe++) {
        final Lock stripeLock = locks[stripe];
        final Queue<T> stripeQueue = queues[stripe];
        stripeLock.lock();
        try {
            stripeQueue.removeIf(predicate);
        } finally {
            stripeLock.unlock();
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -66,6 +67,15 @@ public boolean remove(T entry) {
return queue.remove(entry);
}

/**
 * Removes all entries matching the given predicate from the backing queue.
 * <p>
 * NOTE(review): this variant delegates to a single {@code queue}; the original
 * "single pass across all stripes" wording belonged to the striped overload and
 * did not apply here.
 *
 * @param predicate the condition for removal
 */
public void removeIf(Predicate<T> predicate) {
queue.removeIf(predicate);
}

/**
* Add an entry to the queue and unlock it, in that order.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,30 +81,53 @@ public void releaseAndUnlock(T item) {

/**
 * Lock and checkout all items from the pool.
 * <p>
 * Phase 1: Snapshot the items set under the pool lock.
 * Phase 2: Lock each item outside the monitor to avoid holding it while blocking on in-flight operations.
 * Phase 3: Remove checked-out items from the set and bulk-remove from the queue in a single pass.
 * <p>
 * NOTE(review): this span is a diff rendering that interleaves the pre-change and
 * post-change implementations without +/- markers; reconcile before compiling.
 *
 * @return unmodifiable list of all items locked by current thread
 * @throws IllegalStateException if the pool is closed
 */
public List<T> checkoutAll() {
ensureOpen();
// NOTE(review): the next three lines appear to be the REMOVED (pre-change) code:
// checkedOutItems is re-declared below and the iteration source changed to a snapshot.
List<T> lockedItems = new ArrayList<>();
List<T> checkedOutItems = new ArrayList<>();
for (T item : this) {

// Phase 1: snapshot the live item set while holding the pool monitor,
// so locking (which may block) can happen outside it.
List<T> snapshot;
synchronized (this) {
if (items.isEmpty()) {
return Collections.emptyList();
}
snapshot = new ArrayList<>(items.size());
snapshot.addAll(items);
}

// Phase 2: acquire each item's lock without holding the pool monitor.
for (T item : snapshot) {
item.lock();
// NOTE(review): lockedItems bookkeeping looks like pre-change code — confirm
// whether the new implementation still tracks locked items separately.
lockedItems.add(item);
}

// Phase 3: under the monitor, unregister checked-out items and bulk-remove
// them from the availability queue in one pass.
List<T> checkedOutItems = new ArrayList<>(snapshot.size());
synchronized (this) {
// NOTE(review): two loop headers follow (lockedItems vs snapshot) — one is the
// removed line, one the added line; only one should survive.
for (T item : lockedItems) {
// Identity-based set: dedupe/membership by reference, matching pool identity semantics.
Set<T> toRemoveFromQueue = Collections.newSetFromMap(new IdentityHashMap<>(snapshot.size()));

for (T item : snapshot) {
try {
// NOTE(review): the isRegistered/availableItems.remove pair below appears to be
// the removed per-item cleanup; the plain items.remove + bulk removeIf replaces it.
if (isRegistered(item) && items.remove(item)) {
availableItems.remove(item);
if (items.remove(item)) {
toRemoveFromQueue.add(item);
checkedOutItems.add(item);
}
} finally {
// Always release the item lock taken in Phase 2, whether or not it was checked out.
item.unlock();
}
}

// Single bulk pass over the queue instead of one remove() per item.
if (toRemoveFromQueue.isEmpty() == false) {
availableItems.removeIf(toRemoveFromQueue::contains);
}
}

return Collections.unmodifiableList(checkedOutItems);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.index.engine.dataformat.DocumentInput;
import org.opensearch.index.mapper.MappedFieldType;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -47,7 +48,7 @@ public CompositeDocumentInput(
) {
this.primaryFormat = Objects.requireNonNull(primaryFormat, "primaryFormat must not be null");
this.primaryDocumentInput = Objects.requireNonNull(primaryDocumentInput, "primaryDocumentInput must not be null");
this.secondaryDocumentInputs = Map.copyOf(
this.secondaryDocumentInputs = Collections.unmodifiableMap(
Objects.requireNonNull(secondaryDocumentInputs, "secondaryDocumentInputs must not be null")
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -275,7 +275,7 @@ public void deleteFiles(Map<String, Collection<String>> filesToDelete) throws IO
@Override
public CompositeDocumentInput newDocumentInput() {
DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
Map<DataFormat, DocumentInput<?>> secondaryInputMap = new IdentityHashMap<>();
for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
}
Expand Down
Loading
Loading