diff --git a/.github/workflows/gradle-precommit.yml b/.github/workflows/gradle-precommit.yml index dab5b89c08c2..1b181e4de427 100644 --- a/.github/workflows/gradle-precommit.yml +++ b/.github/workflows/gradle-precommit.yml @@ -43,7 +43,7 @@ jobs: uses: ./.github/actions/gradle-caches - name: Run gradle check (without tests) - run: ./gradlew check -x test -Ptask.times=true --max-workers 2 + run: ./gradlew check -x test -Ptask.times=true --max-workers 2 -x :lucene:analysis.tests:renderJavadoc -x :lucene:distribution.tests:renderJavadoc -x :lucene:analysis:morfologik.tests:renderJavadoc # This runs all tests without any other validation checks. tests: @@ -74,7 +74,7 @@ jobs: uses: ./.github/actions/gradle-caches - name: Run gradle tests - run: ./gradlew test "-Ptask.times=true" --max-workers 2 + run: ./gradlew test "-Ptask.times=true" -Plucene.unload.executorPerDirectory=true -Plucene.unload.ttl=1 -Plucene.unload.initial=0 --max-workers 2 - name: Echo settings run: cat gradle.properties diff --git a/gradle/testing/defaults-tests.gradle b/gradle/testing/defaults-tests.gradle index 25a2291bc182..136140e1d6a2 100644 --- a/gradle/testing/defaults-tests.gradle +++ b/gradle/testing/defaults-tests.gradle @@ -31,6 +31,10 @@ allprojects { // completes. // [propName: 'tests.foo', value: "bar", description: "Sets foo in tests."], testOptions = [ + [propName: 'lucene.unload.executorPerDirectory', value: false, description: "Directory instances supply their own unloading executor"], + [propName: 'lucene.unload.ttl', value: '60m', description: "Time since last use at which a resource becomes eligible for unloading"], + [propName: 'lucene.unload.initial', value: '1m', description: "Extra time allotted for first resource use after load/reload"], + [propName: 'lucene.unload.disable', value: 'false', description: "Disables resource unloading"], // asserts, debug output. 
[propName: 'tests.verbose', value: false, description: "Enables verbose mode (emits full test outputs immediately)."], [propName: 'tests.workDir', diff --git a/lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java b/lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java index 0e8352da4451..43a74fc0ea04 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldDocValuesFormat.java @@ -37,6 +37,7 @@ import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.Unloader; import org.apache.lucene.util.IOUtils; /** @@ -300,9 +301,17 @@ public FieldsReader(final SegmentReadState readState) throws IOException { String segmentSuffix = getFullSegmentSuffix(readState.segmentSuffix, getSuffix(formatName, suffix)); if (!formats.containsKey(segmentSuffix)) { - formats.put( - segmentSuffix, - format.fieldsProducer(new SegmentReadState(readState, segmentSuffix))); + SegmentReadState srs = new SegmentReadState(readState, segmentSuffix); + DocValuesProducer dvp; + if (format instanceof Unloader.UnloadAware + && ((Unloader.UnloadAware) format).disableUnload(srs)) { + dvp = format.fieldsProducer(srs); + } else { + dvp = + Unloader.docValuesProducer( + () -> format.fieldsProducer(srs), srs.segmentInfo.dir, srs); + } + formats.put(segmentSuffix, dvp); } fields.put(fieldName, formats.get(segmentSuffix)); } diff --git a/lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldPostingsFormat.java b/lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldPostingsFormat.java index 61749a09284e..fb8607e15059 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldPostingsFormat.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/perfield/PerFieldPostingsFormat.java @@ -43,6 +43,7 @@ import org.apache.lucene.index.SegmentReadState; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.index.Terms; +import org.apache.lucene.index.Unloader; import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.MergedIterator; @@ -323,9 +324,16 @@ public FieldsReader(final SegmentReadState readState) throws IOException { PostingsFormat format = PostingsFormat.forName(formatName); String segmentSuffix = getSuffix(formatName, suffix); if (!formats.containsKey(segmentSuffix)) { - formats.put( - segmentSuffix, - format.fieldsProducer(new SegmentReadState(readState, segmentSuffix))); + SegmentReadState srs = new SegmentReadState(readState, segmentSuffix); + FieldsProducer fp; + if (format instanceof Unloader.UnloadAware + && ((Unloader.UnloadAware) format).disableUnload(srs)) { + fp = format.fieldsProducer(srs); + } else { + fp = + Unloader.fieldsProducer(() -> format.fieldsProducer(srs), srs.directory, srs); + } + formats.put(segmentSuffix, fp); } fields.put(fieldName, formats.get(segmentSuffix)); } diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java b/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java index a63f0a8a5efe..8cc0d747a4d4 100644 --- a/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java +++ b/lucene/core/src/java/org/apache/lucene/index/SegmentCoreReaders.java @@ -30,6 +30,7 @@ import org.apache.lucene.codecs.FieldsProducer; import 
org.apache.lucene.codecs.KnnVectorsReader; import org.apache.lucene.codecs.NormsProducer; +import org.apache.lucene.codecs.PointsFormat; import org.apache.lucene.codecs.PointsReader; import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.codecs.StoredFieldsReader; @@ -149,7 +150,14 @@ protected TermVectorsReader initialValue() { } if (coreFieldInfos.hasPointValues()) { - pointsReader = codec.pointsFormat().fieldsReader(segmentReadState); + PointsFormat pf = codec.pointsFormat(); + if (pf instanceof Unloader.UnloadAware + && ((Unloader.UnloadAware) pf).disableUnload(segmentReadState)) { + pointsReader = pf.fieldsReader(segmentReadState); + } else { + pointsReader = + Unloader.pointsReader(() -> pf.fieldsReader(segmentReadState), dir, segmentReadState); + } } else { pointsReader = null; } diff --git a/lucene/core/src/java/org/apache/lucene/index/Unloader.java b/lucene/core/src/java/org/apache/lucene/index/Unloader.java new file mode 100644 index 000000000000..9f5a8c72e7fc --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/Unloader.java @@ -0,0 +1,1318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.lucene.index; + +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.IOException; +import java.io.PrintStream; +import java.io.UncheckedIOException; +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; +import java.nio.charset.StandardCharsets; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.LongSupplier; +import java.util.function.Supplier; +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.codecs.FieldsProducer; +import org.apache.lucene.codecs.PointsReader; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.UnloaderCoordinationPoint; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOFunction; +import org.apache.lucene.util.IOSupplier; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.ThreadInterruptedException; + +/** Handles thread-safe dynamic unloading and on-demand reloading of backing resource. */ +public class Unloader implements Closeable { + + private InfoStream out = InfoStream.getDefault(); + + private static final DelegateFuture CLOSED = new DelegateFuture<>(null, null); + + static { + CLOSED.complete(null); + } + + private final IOFunction, T> reopen; + + private volatile long lastAccessNanos = System.nanoTime(); + + private final AtomicReference> backing; + private final String description; + private final UnloadHelper reporter; + private final ScheduledExecutorService exec; + + private static final class DelegateFuture extends CompletableFuture> + implements Closeable { + private final boolean unloading; + private final WeakReference> prev; + private final AtomicReference> sentinel; + + @SuppressWarnings("unused") + private volatile T strongRef; + + @SuppressWarnings("unused") + private DelegateFuture hardRef; // kept to prevent collection + + public boolean completeStrong(T value) { + if (super.complete(new WeakReference<>(value))) { + strongRef = value; + return true; + } else { + return false; + } + } + + public T getNowStrong(T valueIfAbsent) { + T ret; + WeakReference extant = super.getNow(null); + if (extant == null) { + return valueIfAbsent; + } else if ((ret = extant.get()) == null) { + throw new NullPointerException(); + } else { + return ret; + } + } + + public T getStrong(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + WeakReference ref = super.get(timeout, unit); + T ret = ref.get(); + if (ret == null) { + throw new NullPointerException(); + } else { + return ret; + } + } + + private DelegateFuture(Object initialSentinel, DelegateFuture prev) { + this.unloading = initialSentinel == null; + this.prev = new WeakReference<>(prev); + if (unloading) { + this.sentinel = null; + hardRef = prev; + whenComplete( + (r, e) -> { + if (e == null) { + // if we've completed 
normally (no exception), then release the hard reference + // (we keep the reference if we complete exceptionally, because we may want to + // retry unloading. This should be rare anyway.) + hardRef = null; + } + }); + } else { + sentinel = new AtomicReference<>(new WeakReference<>(initialSentinel)); + } + } + + /** + * Returns a non-null sentinel object if a reservation was acquired for this instance, or + * {@code null} if already unloaded. Reservation release must be handled elsewhere. + */ + private Object acquire() { + Object ret; + boolean reusedSentinel = true; + WeakReference candidate = sentinel.get(); + while ((ret = candidate.get()) == null) { + if (candidate == UNLOADED_REF) { + // unloaded while we're trying to acquire + return null; + } + ret = new Object(); + WeakReference extant = + sentinel.compareAndExchange(candidate, new WeakReference<>(ret)); + if (extant == candidate) { + reusedSentinel = false; + break; + } else { + candidate = extant; + } + } + if (reusedSentinel) { + INDIRECT_TRACK_COUNT.increment(); + } else { + REFS_COLLECTED.increment(); + } + return ret; + } + + private boolean unload(boolean force) { + assert !unloading; + if (force) { + sentinel.set(UNLOADED_REF); + return true; + } else { + WeakReference candidate = sentinel.get(); + for (; ; ) { + WeakReference extant; + if (candidate == UNLOADED_REF || candidate.get() != null) { + // either already unloaded, or still referenced + return false; + } else if ((extant = sentinel.compareAndExchange(candidate, UNLOADED_REF)) == candidate) { + return true; + } else { + candidate = extant; + } + } + } + } + + @Override + public void close() { + // release so it can be GC'd + strongRef = null; + } + } + + private static final WeakReference UNLOADED_REF = new WeakReference<>(null); + + @SuppressWarnings("unchecked") + private final DelegateFuture closedSentinel = (DelegateFuture) CLOSED; + + private static final LongAdder REFS_COLLECTED = new LongAdder(); + + private static final LongSupplier REFS_COLLECTED_COUNT_SUPPLIER = REFS_COLLECTED::sum; + + /** + * Creates a new unloader to handle unloading and on-demand reloading of a backing resource + * + * @param unloadHelper receives lifecycle callbacks (creation, load/unload, close) and supplies + * the executor used to schedule unload checks + * @param reopen function taking {@code this} as a parameter; returns a newly loaded instance of + * the backing resource, and (if applicable) schedules a task to check eligibility to unload + * at a point in the future (determined by `keepAliveNanos`) + * @param keepAliveNanos the time threshold (in nanos) since last access, at which the backing + * resource will be eligible for unloading + * @param receiveFirstInstance informs the caller (shim resource) of the first backing resource, + * returning a string description. This may be used to initialize state on the shim resource + * according to information about the backing resource. The caller should take care to not + * hold any references to the initial object that would prevent it from being GC'ed. 
+ * @throws IOException e.g., on error reading index + */ + public Unloader( + UnloadHelper unloadHelper, + IOFunction, T> reopen, + long keepAliveNanos, + IOFunction receiveFirstInstance) + throws IOException { + if (INITIALIZED_EXTERNAL.compareAndSet(false, true)) { + unloadHelper.maybeHandleRefQueues( + INITIALIZED_EXTERNAL, INDIRECT_TRACK_COUNT_SUPPLIER, REFS_COLLECTED_COUNT_SUPPLIER); + } + this.reporter = unloadHelper; + this.reopen = reopen; + this.keepAliveNanos = keepAliveNanos; + Object sentinel = new Object(); + DelegateFuture holder = new DelegateFuture<>(sentinel, null); + backing = new AtomicReference<>(holder); + this.exec = unloadHelper.onCreation(this); + T in = reopen.apply(this); + try { + description = receiveFirstInstance.apply(in); + holder.completeStrong(in); + } catch (Throwable t) { + try (in) { + unloadHelper.onClose(); + throw t; + } + } finally { + Reference.reachabilityFence(sentinel); + } + } + + private static final AtomicBoolean INITIALIZED_EXTERNAL = new AtomicBoolean(false); + + /** Sets the infostream for {@link Unloader}. */ + public void setInfoStream(InfoStream out) { + this.out = out; + List deferred; + if (out != InfoStream.NO_OUTPUT + && out.isEnabled("UN") + && (deferred = DEFERRED_INIT_MESSAGES.getAndSet(null)) != null) { + for (String m : deferred) { + out.message("UN", m); + } + } + } + + private final long keepAliveNanos; + + /** This resource has already been unloaded */ + public static final long ALREADY_UNLOADED = -2; + + /** This resource was unloaded as a result of this invocation of {@link #maybeUnload()}. */ + public static final long UNLOADED = -1; + + /** This resource is still referenced, so was not unloaded. */ + public static final long STILL_REFERENCED = 0; + + private final Random unloadRandom = new Random(); // single-threaded access + + private static boolean injectDelay(Random r, int oneIn, int millis) { + if (r.nextInt(oneIn) == 0) { + try { + Thread.sleep(millis); + } catch ( + @SuppressWarnings("unused") + InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + return true; + } + + private static DelegateFuture unloadRef( + AtomicReference> ref, boolean[] unloading) { + DelegateFuture extant; + while (!(extant = ref.get()).unloading) { + if (!extant.unload(false)) { + // still referenced + return null; + } + DelegateFuture candidate = new DelegateFuture<>(null, extant); + if (ref.compareAndSet(extant, candidate)) { + return candidate; + } + } + if (extant == CLOSED) { + throw new AlreadyClosedException(""); + } + // already unloading + unloading[0] = true; + return null; + } + + private static Map.Entry, Object> loadRef( + AtomicReference> ref, boolean[] weCompute) { + DelegateFuture extant; + Object sentinel; + while ((extant = ref.get()).unloading || (sentinel = extant.acquire()) == null) { + if (extant.unloading) { + if (extant == CLOSED) { + throw new AlreadyClosedException(""); + } + Object initialSentinel = new Object(); + DelegateFuture candidate = new DelegateFuture<>(initialSentinel, extant); + if (ref.compareAndSet(extant, candidate)) { + weCompute[0] = true; + return new AbstractMap.SimpleImmutableEntry<>(candidate, initialSentinel); + } + } + } + assert !extant.unloading; + return new AbstractMap.SimpleImmutableEntry<>(extant, sentinel); + } + + private static Map.Entry, Object> retry( + AtomicReference> ref, + final DelegateFuture replace, + boolean[] weCompute) { + DelegateFuture prev = replace.prev.get(); + if (prev == null) { + replace.unload(true); + ref.compareAndSet(replace, new 
DelegateFuture<>(null, replace)); + return loadRef(ref, weCompute); + } + Object sentinel = new Object(); + DelegateFuture candidate = new DelegateFuture<>(sentinel, prev); + DelegateFuture extant = ref.compareAndExchange(replace, candidate); + if (extant == replace) { + weCompute[0] = true; + return new AbstractMap.SimpleImmutableEntry<>(candidate, sentinel); + } else if (!extant.unloading && (sentinel = extant.acquire()) != null) { + return new AbstractMap.SimpleImmutableEntry<>(extant, sentinel); + } else { + replace.unload(true); + ref.compareAndSet(replace, new DelegateFuture<>(null, replace)); + return loadRef(ref, weCompute); + } + } + + /** + * Defaults to true; if true, allows {@link #reporter} to defer unloading (e.g., based on + * historical context, to avoid thrashing) via {@link UnloadHelper#deferUnload(long)}. This + * behavior may be disabled by setting the {@code lucene.unload.adaptiveDefer} sysprop to + * {@code false}. + */ + public static final boolean ADAPTIVE_DEFER = + !"false".equals(System.getProperty("lucene.unload.adaptiveDefer")); + + /** + * Conditionally unloads (closes) the backing resource. Returns {@link #UNLOADED} + * if resources were unloaded, otherwise returns the number of nanos remaining until the resources + * might be eligible for unloading. + * + *
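+ * <p>A caller-side sketch (hypothetical names; the internal unload task in this class does the
+ * equivalent) of interpreting the return value:
+ *
+ * <pre>{@code
+ * long remaining = unloader.maybeUnload();
+ * if (remaining > 0) {
+ *   // used recently; recheck once the remaining keep-alive time has elapsed
+ *   exec.schedule(recheckTask, remaining, TimeUnit.NANOSECONDS);
+ * } else if (remaining == Unloader.STILL_REFERENCED) {
+ *   // eligible by age, but actively referenced; recheck after a full keep-alive period
+ *   exec.schedule(recheckTask, keepAliveNanos, TimeUnit.NANOSECONDS);
+ * } // UNLOADED / ALREADY_UNLOADED: nothing further to schedule
+ * }</pre>
+ *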
<p>
The special value {@link #STILL_REFERENCED} indicates that based on last known access time, + * this resource should be eligible for unloading -- but something (e.g., an outstanding + * reference) prevented unloading. If this happens a lot, it probably indicates an error in logic + * somewhere. + */ + public long maybeUnload() throws IOException { + long nanosSinceLastAccess = System.nanoTime() - lastAccessNanos; + long deferNanos; + if (nanosSinceLastAccess < keepAliveNanos) { + // don't unload + return keepAliveNanos - nanosSinceLastAccess; + } else if (ADAPTIVE_DEFER && (deferNanos = reporter.deferUnload(nanosSinceLastAccess)) > 0) { + return deferNanos; + } + final boolean[] unloaded = new boolean[1]; + DelegateFuture holder = unloadRef(backing, unloaded); + if (holder == null) { + return unloaded[0] ? ALREADY_UNLOADED : STILL_REFERENCED; + } + // try to unload + try { + T weUnloaded = doUnload(holder, unloaded); + holder.complete(new WeakReference<>(weUnloaded)); + if (weUnloaded != null) { + return UNLOADED; + } else { + return unloaded[0] ? ALREADY_UNLOADED : STILL_REFERENCED; + } + } catch (Throwable t) { + holder.completeExceptionally(t); + throw t; + } + } + + private T doUnload(DelegateFuture holder, boolean[] unloaded) throws IOException { + assert injectDelay(unloadRandom, 5, 20); + final DelegateFuture active; + T toClose; + try { + active = holder.prev.get(); + assert active != null; + toClose = active.getNowStrong(null); + } catch ( + @SuppressWarnings("unused") + Exception ex) { + // exception during loading means there's nothing to unload + unloaded[0] = true; + return null; + } + if (toClose == null) { + // this can happen if (and should be _only_ if) we're trying to close + // a value that's still loading for some reason. This might cause a + // problem for foreground threads (resource access or close), but from + // background `maybeUnload()` perspective we don't _want_ to wait for + // it to finish loading. But it's our responsibility to ensure that the + // value _does_ get closed: either by putting it back onto `backing`, + // or as a last resort, waiting for it to load and then closing it. + + // first try to put our value back + if (backing.compareAndSet(holder, active)) { + // if CAS succeeds, it's guaranteed that state has not changed since + // we fetched this value for closing, so we have safely put it back + // as "still referenced" + return null; + } else { + // state has changed; it's our responsibility to wait for and close + // the resource we already pulled. Here we wait an _absurdly_ long + // time, because it's a leak at this point if we don't close the + // resource we've pulled. + try { + toClose = active.getStrong(10, TimeUnit.MINUTES); + } catch (InterruptedException ex) { + // we're probably shutting down + throw new ThreadInterruptedException(ex); + } catch ( + @SuppressWarnings("unused") + TimeoutException ex) { + if (out.isEnabled("UN")) + out.message("UN", "ERROR: stuck waiting to close loading resource!"); + // TODO: we could put this into a queue somewhere that retries closing? + // that said, if properly implemented, this should literally _never_ happen. 
+ return null; + } catch ( + @SuppressWarnings("unused") + ExecutionException ex) { + // exception during loading means there's nothing to unload + unloaded[0] = true; + return null; + } + } + } + try (active) { + toClose.close(); + } + unloaded[0] = true; + return toClose; + } + + private static final long CLOSE_WAIT_FOR_LOAD_SECONDS = 10; + private static final long CLOSE_WAIT_FOR_BACKGROUND_UNLOAD_SECONDS = 1; + + @Override + @SuppressWarnings("try") + public void close() throws IOException { + this.reporter.onClose(); + DelegateFuture holder = backing.getAndSet(closedSentinel); + if (holder == CLOSED) { + throw new AlreadyClosedException(""); + } + if (holder.unloading) { + closeUnloading(holder); + } else { + // it's an active or loading instance + DelegateFuture maybeUnloadingHolder = holder.prev.get(); + assert maybeUnloadingHolder == null || maybeUnloadingHolder.unloading; + try (T toClose = + interruptProtectedGet(holder, CLOSE_WAIT_FOR_LOAD_SECONDS, TimeUnit.SECONDS)) { + closeUnloading(maybeUnloadingHolder); + // if we're closing ourselves, it should also count as an unload. + reporter.onUnload(System.nanoTime() - lastAccessNanos); + return; + } catch ( + @SuppressWarnings("unused") + ExecutionException e) { + // an exception while loading means there's nothing to close, and that's ok + } catch (TimeoutException e) { + // probably deadlock; we have no way to get the value to close + throw new IOException("timed out waiting to close loading value ", e); + } + closeUnloading(maybeUnloadingHolder); + } + } + + @SuppressWarnings("try") + private void closeUnloading(DelegateFuture unloading) throws IOException { + if (unloading == null) { + // must be done + return; + } + DelegateFuture toClose = unloading.prev.get(); + try { + // first, wait for unloading to complete + interruptProtectedGet(unloading, CLOSE_WAIT_FOR_BACKGROUND_UNLOAD_SECONDS, TimeUnit.SECONDS); + return; + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } + // fallthrough to try to close it ourselves + } catch ( + @SuppressWarnings("unused") + TimeoutException e) { + if (out.isEnabled("UN")) + out.message( + "UN", "WARN: timed out waiting for background unload to complete " + description); + // fallthrough to try to close it ourselves + } + if (toClose != null) { + // (if `toClose == null`, we've done all we can) + // otherwise, `toClose` represents the resource that `unloading` was trying + // to close. As a last-ditch effort, we'll try to close it ourselves here. + // The risk here is double-close, but that's preferable to a resource leak. + try { + interruptProtectedGet(toClose, CLOSE_WAIT_FOR_LOAD_SECONDS, TimeUnit.SECONDS).close(); + reporter.onUnload(System.nanoTime() - lastAccessNanos); + } catch ( + @SuppressWarnings("unused") + ExecutionException e) { + // exception during compute means there's nothing to close + } catch (TimeoutException e) { + throw new IOException("timed out waiting to close loading value " + description, e); + } + } + } + + /** + * Circumvents {@link InterruptedException} and forces the calling thread to block for the full + * allotted time. This should only be invoked from synchronous close, where the risk of a resource + * leak outweighs the risk from delayed thread exit. + * + *
<p>
If this method swallows an {@link InterruptedException}, it will re-set the thread's + * interrupted status before returning. + * + *
<p>
TODO: evaluate whether this behavior is actually desired in non-test context. + * + *
<p>
This method may return null! It should only be called (directly or indirectly) from within + * top-level {@link Unloader#close()} code. It is guaranteed to block for the specified amount of + * time, and is thus appropriate for coordination; but it should be considered "best-effort" in + * terms of returning an actual {@link Closeable} value. + */ + private static T interruptProtectedGet( + DelegateFuture future, long longWaitSeconds, TimeUnit timeUnit) + throws ExecutionException, TimeoutException { + boolean interrupted = false; + long now = System.nanoTime(); + final long until = now + timeUnit.toNanos(longWaitSeconds); + long waitNanos; + try { + while ((waitNanos = until - now) >= 0) { + try { + return future.get(waitNanos, TimeUnit.NANOSECONDS).get(); + } catch ( + @SuppressWarnings("unused") + InterruptedException ex) { + interrupted = true; + } + now = System.nanoTime(); + } + throw new TimeoutException(); + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + private static final class CloseableVal implements Supplier, Closeable { + + private final T val; + private Object sentinel; + + private CloseableVal(T val, Object sentinel) { + this.val = val; + this.sentinel = sentinel; + } + + @Override + public T get() { + return val; + } + + @Override + public void close() throws IOException { + Object toRemove = sentinel; + try { + sentinel = null; + } finally { + Reference.reachabilityFence(toRemove); + } + } + } + + private static final long TOTAL_BLOCK_NANOS = TimeUnit.SECONDS.toNanos(10); + + private CloseableVal backing() throws IOException { + boolean[] weCompute = new boolean[1]; + Map.Entry, Object> holder = loadRef(backing, weCompute); + long now = System.nanoTime(); + long until = now + TOTAL_BLOCK_NANOS; + while (!weCompute[0]) { + try { + return new CloseableVal<>( + holder.getKey().getStrong(until - now, TimeUnit.NANOSECONDS), holder.getValue()); + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof IOException) { + throw (IOException) t; + } + holder = retry(backing, holder.getKey(), weCompute); + } catch (InterruptedException e) { + throw new ThreadInterruptedException(e); + } catch ( + @SuppressWarnings("unused") + TimeoutException e) { + throw new IOException("timed out waiting to load backing resource"); + } + now = System.nanoTime(); + } + // we compute the result + boolean successfullyComputed = false; + T candidate = null; + try { + candidate = reopen.apply(this); + holder.getKey().completeStrong(candidate); + successfullyComputed = true; + return new CloseableVal<>(candidate, holder.getValue()); + } catch (Throwable t) { + holder.getKey().completeExceptionally(t); + throw t; + } finally { + if (candidate != null && !successfullyComputed) { + candidate.close(); + } + } + } + + private static final AtomicReference> DEFERRED_INIT_MESSAGES = + new AtomicReference<>(new ArrayList<>()); + + private static final LongAdder INDIRECT_TRACK_COUNT = new LongAdder(); + private static final LongSupplier INDIRECT_TRACK_COUNT_SUPPLIER = INDIRECT_TRACK_COUNT::sum; + + static PointValues.PointTree wrap(PointValues.PointTree pt, Object sentinel) throws IOException { + return new PointValues.PointTree() { + @Override + public PointValues.PointTree clone() { + try { + return wrap(pt.clone(), sentinel); + } catch (IOException e) { + throw new UncheckedIOException("this should never happen", e); + } + } + + @Override + public boolean moveToChild() throws IOException { + return pt.moveToChild(); + } + + @Override + public 
boolean moveToSibling() throws IOException { + return pt.moveToSibling(); + } + + @Override + public boolean moveToParent() throws IOException { + return pt.moveToParent(); + } + + @Override + public byte[] getMinPackedValue() { + return pt.getMinPackedValue(); + } + + @Override + public byte[] getMaxPackedValue() { + return pt.getMaxPackedValue(); + } + + @Override + public long size() { + return pt.size(); + } + + @Override + public void visitDocIDs(PointValues.IntersectVisitor visitor) throws IOException { + try { + pt.visitDocIDs(visitor); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public void visitDocValues(PointValues.IntersectVisitor visitor) throws IOException { + try { + pt.visitDocValues(visitor); + } finally { + Reference.reachabilityFence(sentinel); + } + } + }; + } + + private static final class TrackingPostingsEnum extends FilterLeafReader.FilterPostingsEnum { + private final Object sentinel; + + private TrackingPostingsEnum(PostingsEnum in, Object sentinel) { + super(in); + this.sentinel = sentinel; + } + + @Override + public long cost() { + try { + return super.cost(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + } + + static TermsEnum wrap(TermsEnum te, Object sentinel) throws IOException { + return new FilterLeafReader.FilterTermsEnum(te) { + @Override + public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { + INDIRECT_TRACK_COUNT.increment(); + if (reuse instanceof TrackingPostingsEnum) { + PostingsEnum extantIn = ((TrackingPostingsEnum) reuse).in; + PostingsEnum check = super.postings(extantIn, flags); + if (check == extantIn) { + // same backing, with updated state + return reuse; + } else { + return new TrackingPostingsEnum(check, sentinel); + } + } else { + return new TrackingPostingsEnum(super.postings(reuse, flags), sentinel); + } + } + + @Override + public ImpactsEnum impacts(int flags) throws IOException { + INDIRECT_TRACK_COUNT.increment(); + ImpactsEnum raw = super.impacts(flags); + return new ImpactsEnum() { + @Override + public void advanceShallow(int target) throws IOException { + raw.advanceShallow(target); + } + + @Override + public Impacts getImpacts() throws IOException { + return raw.getImpacts(); + } + + @Override + public int freq() throws IOException { + return raw.freq(); + } + + @Override + public int nextPosition() throws IOException { + return raw.nextPosition(); + } + + @Override + public int startOffset() throws IOException { + return raw.startOffset(); + } + + @Override + public int endOffset() throws IOException { + return raw.endOffset(); + } + + @Override + public BytesRef getPayload() throws IOException { + return raw.getPayload(); + } + + @Override + public int docID() { + return raw.docID(); + } + + @Override + public int nextDoc() throws IOException { + return raw.nextDoc(); + } + + @Override + public int advance(int target) throws IOException { + return raw.advance(target); + } + + @Override + public long cost() { + try { + return raw.cost(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + }; + } + }; + } + + interface RefTrackShim { + V shim(V in, Object sentinel); + } + + V execute(FPIOFunction function) throws IOException { + return execute(function, null, null); + } + + /** + * trackRaw deserves special explanation: where possible, we want to track the raw + * instance, not the shimmed instance. There is an exception however (e.g. 
for {@link + * PointsReader#getValues(String)} and {@link FieldsProducer#terms(String)}), where shared + * instances may be returned. In order to leverage refcounting to determine when a resource is + * eligible for unloading, in such cases we must register/track the reference of our one-off + * shim/wrapper instance. + * + *
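+ * <p>For example, {@link UnloadingDocValuesProducer} registers a one-off wrapper as the tracked
+ * reference, roughly as follows (condensed from its {@code getNumeric} implementation):
+ *
+ * <pre>{@code
+ * return u.execute(
+ *     getNumeric,
+ *     field,
+ *     (raw, sentinel) -> new FilterNumericDocValues(raw) {
+ *       public long cost() {
+ *         try {
+ *           return super.cost();
+ *         } finally {
+ *           // keep the sentinel (and thus the backing resource) reachable for the call
+ *           Reference.reachabilityFence(sentinel);
+ *         }
+ *       }
+ *     });
+ * }</pre>
+ *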
<p>
When this method is called with trackRaw=false, the shim implementation should + * (out of an abundance of caution) override all methods to wrap with try/finally and + * call {@link Reference#reachabilityFence(Object)} in the finally block; i.e.: + * Reference.reachabilityFence(this). This will prevent dead reference analysis from + * collecting the wrapper and unloading the resource after entering the shim method, but before + * completing the call to the raw/backing method. + */ + V execute(FPIOFunction function, K arg, RefTrackShim shim) throws IOException { + try (CloseableVal active = backing()) { + V raw = function.apply(active.get(), arg); + if (raw == null) { + return null; + } else { + return shim.shim(raw, active.sentinel); + } + } finally { + lastAccessNanos = System.nanoTime(); + } + } + + interface FPIOFunction { + V apply(T fp, K arg) throws IOException; + } + + /** + * This should be set to true for Lucene tests (where the only lifecycle hook we have + * is per-{@link Directory}), and set to false from contexts that call {@link + * UnloaderCoordinationPoint#setUnloadHelperSupplier(Supplier)} with an external executor whose + * lifecycle is managed by other means (e.g., from Solr). + */ + public static final boolean EXECUTOR_PER_DIRECTORY; + + /** + * Time threshold at which a resource becomes eligible for unloading. Set this very low (0 or 1) + * for stress testing. + */ + public static final long KEEP_ALIVE_NANOS; + + private static final String DEFAULT_KEEP_ALIVE_SPEC = "60m"; + private static final long DEFAULT_KEEP_ALIVE_NANOS = TimeUnit.MINUTES.toNanos(60); + + /** + * Additional time allowance for the first use of a resource after load/reload. Set this very low + * (e.g., 0) for stress testing. + */ + private static final long INITIAL_NANOS; + + private static final String DEFAULT_INITIAL_SPEC = "1m"; + private static final long DEFAULT_INITIAL_NANOS = TimeUnit.MINUTES.toNanos(1); + + /** + * Visible for testing. This may be used to override/disable resource unloading for specific tests + * that are known to not play well with unloading. + * + *
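+ * <p>For example, a test run can opt out globally via the sysprop wired up in
+ * {@code gradle/testing/defaults-tests.gradle}:
+ *
+ * <pre>{@code
+ * ./gradlew test -Plucene.unload.disable=true
+ * }</pre>
+ *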
<p>
NOTE: this is known to be the case only for tests that evaluate opening an index over a + * directory without communication with the {@link IndexWriter}. In such cases, with an + * active {@link IndexWriter}, there is no way to "refcount" index files to prevent them from + * being deleted. Lucene is ok with this by design, since, once opened, segment readers will hold + * filehandles to files even if they are deleted; but it doesn't play well with {@link Unloader}, + * which has to re-load from disk. This would be problematic for some supported uses of Lucene, + * but not for Lucene as used by Solr, e.g., where {@link DirectoryReader} instances are + * always acquired from (and incRef) {@link IndexWriter}. + */ + static boolean DISABLE; + + static { + DISABLE = "true".equals(System.getProperty("lucene.unload.disable")); + } + + static { + List deferred = DEFERRED_INIT_MESSAGES.get(); + EXECUTOR_PER_DIRECTORY = + "true".equals(System.getProperty("lucene.unload.executorPerDirectory")); // default to false + deferred.add("INFO: set static property EXECUTOR_PER_DIRECTORY=" + EXECUTOR_PER_DIRECTORY); + KEEP_ALIVE_NANOS = + getNanos("lucene.unload.ttl", DEFAULT_KEEP_ALIVE_SPEC, DEFAULT_KEEP_ALIVE_NANOS, deferred); + deferred.add( + "INFO: set static property DEFAULT_KEEP_ALIVE_MILLIS=" + + TimeUnit.NANOSECONDS.toMillis(KEEP_ALIVE_NANOS)); + INITIAL_NANOS = + getNanos("lucene.unload.initial", DEFAULT_INITIAL_SPEC, DEFAULT_INITIAL_NANOS, deferred); + deferred.add( + "INFO: set static property DEFAULT_INITIAL_MILLIS=" + + TimeUnit.NANOSECONDS.toMillis(INITIAL_NANOS)); + } + + private static long getNanos( + String syspropName, String defaultSpec, long defaultNanos, List deferred) { + try { + String unloadSpec = System.getProperty(syspropName, defaultSpec); + try { + return getNanos(unloadSpec); + } catch (IllegalArgumentException ex) { + deferred.add("WARN: bad " + syspropName + " spec: " + unloadSpec + " " + ex); + return defaultNanos; + } + } catch ( + @SuppressWarnings("unused") + Exception ex) { + return defaultNanos; + } + } + + /** + * Returns the number of nanoseconds represented by the specified string spec. + * + *
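+ * <p>For example (per the suffix rules below), {@code getNanos("60m")} returns 60 minutes in
+ * nanos, while the unsuffixed {@code getNanos("500")} returns 500 milliseconds in nanos.
+ *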
<p>
The spec may be suffixed with "s" (seconds), "m" (minutes), "h" (hours), or "d" (days). If + * unsuffixed (a straight numeric value), the spec is interpreted as milliseconds. + */ + public static long getNanos(String unloadSpec) { + if (unloadSpec.isEmpty()) { + throw new IllegalArgumentException("empty unloadSpec"); + } + int endIdx = unloadSpec.length() - 1; + TimeUnit t; + char c = unloadSpec.charAt(endIdx); + switch (c) { + case 's': + t = TimeUnit.SECONDS; + break; + case 'm': + t = TimeUnit.MINUTES; + break; + case 'h': + t = TimeUnit.HOURS; + break; + case 'd': + t = TimeUnit.DAYS; + break; + default: + if (c >= '0' && c <= '9') { + endIdx++; + t = TimeUnit.MILLISECONDS; + } else { + throw new IllegalArgumentException("expected plain numeric arg (millis)"); + } + } + int v = Integer.parseInt(unloadSpec, 0, endIdx, 10); + return t.toNanos(v); + } + + /** + * Passed to {@link Unloader#Unloader(UnloadHelper, IOFunction, long, IOFunction)} ctor. This + * provides a means for reporting unload/reload lifecycle events to higher-level components. This + * can be useful if used in a framework that wants to track metrics around load/unload, or manage + * the underlying components that handle load/unload according to framework lifecycles. + * + *
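+ * <p>A minimal sketch of an implementation (assuming a framework-managed {@code scheduler}):
+ *
+ * <pre>{@code
+ * Unloader.UnloadHelper helper = new Unloader.UnloadHelper() {
+ *   public ScheduledExecutorService onCreation(Unloader u) {
+ *     u.setInfoStream(InfoStream.getDefault());
+ *     // the scheduler's lifecycle is owned by the framework, not the Unloader
+ *     return scheduler;
+ *   }
+ * };
+ * }</pre>
+ *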
<p>
e.g., framework may supply its own {@link ScheduledExecutorService} for running unload + * checks, and may (via {@link #maybeHandleRefQueues(AtomicBoolean, LongSupplier, LongSupplier)}) + * manage the handling of reference tracking as well. + */ + public interface UnloadHelper { + /** + * Called once, from the {@link Unloader#Unloader(UnloadHelper, IOFunction, long, IOFunction)} + * ctor, to provide the {@link ScheduledExecutorService} for scheduling periodic unload tasks. + * The {@link Unloader} under construction is passed as an arg. Implementations should call + * {@link Unloader#setInfoStream(InfoStream)} on the specified instance, and may also be used to + * update metrics about number of created instances, etc. + */ + ScheduledExecutorService onCreation(Unloader u); + + /** + * Called for each load/reload of the backing resource. + * + * @param nanosSincePriorAccess how long it's been since this resource was last accessed before + * reload + * @param loadTime how long it took to load this resource (nanos) + * @param initial true if this was the initial load (as opposed to a reload) + */ + default void onLoad(long nanosSincePriorAccess, long loadTime, boolean initial) {} + + /** + * Called for each unload of the backing resource + * + * @param nanosSinceLastAccess how long it's been since this resource was last accessed before + * unload + */ + default void onUnload(long nanosSinceLastAccess) {} + + /** + * Called when the associated top-level resource is closed. Any backing resources held open at + * time of close will also be unloaded (closed); this will be separately reported via {@link + * #onUnload(long)}. + */ + default void onClose() {} + + /** + * A callback that allows a framework to handle refQueue management (and provides a window into + * the size of the refQueue(s) for metrics purposes). + * + *
<p>Implementations that take responsibility for the refQueues should leave {@code initialized} set to {@code true} while they
handle the refQueues, and should set it back to false if/when they stop + * handling any of the provided refQueues. + * + * @param indirectTrackedCount for metrics; the number of references not tracked directly, but + * tracked via reference to top-level tracked object. + */ + default void maybeHandleRefQueues( + AtomicBoolean initialized, LongSupplier indirectTrackedCount, LongSupplier refsCollected) { + // no-op default impl + } + + /** + * Allows this {@link UnloadHelper} to defer the unloading of the associated resource. This is + * usually used in conjunction with tracking historical behavior of the resource, and may be + * used to avoid thrashing. + * + * @param nanosSinceLastAccess number of nanos since last access of the associated resource + * @return If positive (> 0), represents the number of nanos that unloading + * should be deferred (if <= 0, unloading will not be deferred). + */ + default long deferUnload(long nanosSinceLastAccess) { + return -1; + } + + default boolean disableUnload(Class resourceClass, SegmentReadState srs) { + return false; + } + } + + /** + * {@link UnloadHelper} base impl that handles setting {@link ScheduledExecutorService} and {@link + * InfoStream} on {@link Unloader} callers of {@link #onCreation(Unloader)}. + */ + public abstract static class AbstractUnloadHelper implements UnloadHelper { + private volatile ScheduledExecutorService exec; + private volatile InfoStream infoStream; + + /** + * Provides executor (for scheduling periodic unload tasks) and infoStream for setting on + * calling {@link Unloader}s. Both args must be non-null. + */ + public AbstractUnloadHelper(ScheduledExecutorService exec, InfoStream infoStream) { + this.exec = exec; + this.infoStream = infoStream; + } + + @Override + public ScheduledExecutorService onCreation(Unloader u) { + ScheduledExecutorService ret = exec; + InfoStream infoStream = this.infoStream; + u.setInfoStream(infoStream); + exec = null; + this.infoStream = null; + return ret; + } + } + + private static boolean disableUnload(SegmentReadState srs) { + return srs.context.context == IOContext.Context.MERGE; + } + + /** + * Returns a {@link PointsReader} over the specified {@link SegmentReadState}, conditionally + * wrapped to allow dynamic unloading and on-demand reloading of the backing resource. + * + *
<p>
The backing resource is initially loaded, and will be reloaded if applicable, via the + * provided `open` {@link IOSupplier}. The {@link Directory} is passed only to be used as an + * {@link UnloaderCoordinationPoint}. + * + *
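+ * <p>{@link SegmentCoreReaders} invokes this roughly as:
+ *
+ * <pre>{@code
+ * pointsReader =
+ *     Unloader.pointsReader(() -> pf.fieldsReader(segmentReadState), dir, segmentReadState);
+ * }</pre>
+ *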
<p>
NOTE: the segment files specified by {@link SegmentReadState}, which must be present upon + * initialization, must still be accessible on disk if/when the backing resource is reloaded + * (after having been unloaded). In practice, this means that {@link + * IndexWriter#incRefDeleter(SegmentInfos)} must have been called for the {@link SegmentInfos} + * associated with the specified {@link SegmentReadState}. This happens organically in many + * contexts, but not all -- particularly in tests. + */ + public static PointsReader pointsReader( + IOSupplier open, Directory dir, SegmentReadState srs) throws IOException { + UnloadHelper unloadHelper; + if (disableUnload(srs) + || DISABLE + || (unloadHelper = UnloaderCoordinationPoint.getUnloadHelper(dir)) == null + || unloadHelper.disableUnload(PointsReader.class, srs)) { + return open.get(); + } + String type = PointsReader.class.getSimpleName(); + return new UnloadingPointsReader( + unloadHelper, + (u) -> { + long start = System.nanoTime(); + PointsReader pr = open.get(); + try { + u.exec.schedule( + maybeUnloadTask(u, type, u.reporter), + KEEP_ALIVE_NANOS + INITIAL_NANOS, + TimeUnit.NANOSECONDS); + } catch ( + @SuppressWarnings("unused") + RejectedExecutionException ex) { + // shutting down; log and swallow + if (u.out.isEnabled("UN")) + u.out.message("UN", "WARN: new PointsReader while shutting down"); + } catch (Throwable t) { + try (pr) { + throw t; + } + } + u.reporter.onLoad( + start - u.lastAccessNanos, System.nanoTime() - start, u.description == null); + return pr; + }, + KEEP_ALIVE_NANOS); + } + + /** + * To be implemented by components ({@link org.apache.lucene.codecs.PostingsFormat}, {@link + * org.apache.lucene.codecs.DocValuesFormat} or {@link org.apache.lucene.codecs.PointsFormat}) + * that are aware of resource unloading and need to be able to prevent unloading of associated + * resources. + */ + public interface UnloadAware { + /** + * If this method returns {@code true}, unloading of associated resources should be disabled. + */ + default boolean disableUnload(SegmentReadState state) { + return true; + } + } + + /** + * Returns a {@link FieldsProducer} over the specified {@link SegmentReadState}, conditionally + * wrapped to allow dynamic unloading and on-demand reloading of the backing resource. + * + *
<p>
The backing resource is initially loaded, and will be reloaded if applicable, via the + * provided `open` {@link IOSupplier}. The {@link Directory} is passed only to be used as an + * {@link UnloaderCoordinationPoint}. + * + *
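+ * <p>{@code PerFieldPostingsFormat} invokes this roughly as:
+ *
+ * <pre>{@code
+ * fp = Unloader.fieldsProducer(() -> format.fieldsProducer(srs), srs.directory, srs);
+ * }</pre>
+ *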
<p>
NOTE: the segment files specified by {@link SegmentReadState}, which must be present upon + * initialization, must still be accessible on disk if/when the backing resource is reloaded + * (after having been unloaded). In practice, this means that {@link + * IndexWriter#incRefDeleter(SegmentInfos)} must have been called for the {@link SegmentInfos} + * associated with the specified {@link SegmentReadState}. This happens organically in many + * contexts, but not all -- particularly in tests. + */ + public static FieldsProducer fieldsProducer( + IOSupplier open, Directory dir, SegmentReadState srs) throws IOException { + UnloadHelper unloadHelper; + if (disableUnload(srs) + || DISABLE + || (unloadHelper = UnloaderCoordinationPoint.getUnloadHelper(dir)) == null + || unloadHelper.disableUnload(FieldsProducer.class, srs)) { + return open.get(); + } + String type = FieldsProducer.class.getSimpleName(); + return new UnloadingFieldsProducer( + unloadHelper, + (u) -> { + long start = System.nanoTime(); + FieldsProducer fp = open.get(); + try { + u.exec.schedule( + maybeUnloadTask(u, type, u.reporter), + KEEP_ALIVE_NANOS + INITIAL_NANOS, + TimeUnit.NANOSECONDS); + } catch ( + @SuppressWarnings("unused") + RejectedExecutionException ex) { + // shutting down; log and swallow + if (u.out.isEnabled("UN")) + u.out.message("UN", "WARN: new FieldsProducer while shutting down"); + } catch (Throwable t) { + try (fp) { + throw t; + } + } + u.reporter.onLoad( + start - u.lastAccessNanos, System.nanoTime() - start, u.description == null); + return fp; + }, + KEEP_ALIVE_NANOS); + } + + /** + * Returns a {@link DocValuesProducer} over the specified {@link SegmentReadState}, conditionally + * wrapped to allow dynamic unloading and on-demand reloading of the backing resource. + * + *
<p>
The backing resource is initially loaded, and will be reloaded if applicable, via the + * provided `open` {@link IOSupplier}. The {@link Directory} is passed only to be used as an + * {@link UnloaderCoordinationPoint}. + * + *
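+ * <p>{@code PerFieldDocValuesFormat} invokes this roughly as:
+ *
+ * <pre>{@code
+ * dvp =
+ *     Unloader.docValuesProducer(() -> format.fieldsProducer(srs), srs.segmentInfo.dir, srs);
+ * }</pre>
+ *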
<p>
NOTE: the segment files specified by {@link SegmentReadState}, which must be present upon + * initialization, must still be accessible on disk if/when the backing resource is reloaded + * (after having been unloaded). In practice, this means that {@link + * IndexWriter#incRefDeleter(SegmentInfos)} must have been called for the {@link SegmentInfos} + * associated with the specified {@link SegmentReadState}. This happens organically in many + * contexts, but not all -- particularly in tests. + */ + public static DocValuesProducer docValuesProducer( + IOSupplier open, Directory dir, SegmentReadState srs) throws IOException { + UnloadHelper unloadHelper; + if (disableUnload(srs) + || DISABLE + || (unloadHelper = UnloaderCoordinationPoint.getUnloadHelper(dir)) == null + || unloadHelper.disableUnload(DocValuesProducer.class, srs)) { + return open.get(); + } + String type = DocValuesProducer.class.getSimpleName(); + return new UnloadingDocValuesProducer( + unloadHelper, + (u) -> { + long start = System.nanoTime(); + DocValuesProducer dvp = open.get(); + try { + u.exec.schedule( + maybeUnloadTask(u, type, u.reporter), + KEEP_ALIVE_NANOS + INITIAL_NANOS, + TimeUnit.NANOSECONDS); + } catch ( + @SuppressWarnings("unused") + RejectedExecutionException ex) { + // shutting down; log and swallow + if (u.out.isEnabled("UN")) + u.out.message("UN", "WARN: new DocValuesProducer while shutting down"); + } catch (Throwable t) { + try (dvp) { + throw t; + } + } + u.reporter.onLoad( + start - u.lastAccessNanos, System.nanoTime() - start, u.description == null); + return dvp; + }, + KEEP_ALIVE_NANOS); + } + + private static void printStackTrace(Throwable t, InfoStream out) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + t.printStackTrace(new PrintStream(baos, true, StandardCharsets.UTF_8)); + if (out.isEnabled("UN")) out.message("UN", baos.toString(StandardCharsets.UTF_8)); + } + + private static Callable maybeUnloadTask(Unloader u, String type, UnloadHelper reporter) { + return () -> { + long remaining; + try { + if (u.backing.get() == CLOSED || u.exec.isShutdown()) { + return null; + } + remaining = u.maybeUnload(); + } catch (Throwable t) { + if (!(t instanceof AlreadyClosedException)) { + if (u.out.isEnabled("UN")) + u.out.message( + "UN", + "WARN: exception in maybeUnload(); recheck in " + + TimeUnit.NANOSECONDS.toMillis(KEEP_ALIVE_NANOS) + + "ms " + + t); + printStackTrace(t, u.out); + u.exec.schedule( + maybeUnloadTask(u, type, reporter), KEEP_ALIVE_NANOS, TimeUnit.NANOSECONDS); + } + throw t; + } + if (remaining > 0) { + u.exec.schedule(maybeUnloadTask(u, type, reporter), remaining, TimeUnit.NANOSECONDS); + } else if (remaining == STILL_REFERENCED) { + u.exec.schedule(maybeUnloadTask(u, type, reporter), KEEP_ALIVE_NANOS, TimeUnit.NANOSECONDS); + } else if (remaining == UNLOADED) { + reporter.onUnload(System.nanoTime() - u.lastAccessNanos); + } else if (remaining == ALREADY_UNLOADED) { + // already unloaded + } else { + if (u.out.isEnabled("UN")) + u.out.message("UN", "ERROR: unexpected return value: " + remaining); + } + return null; + }; + } +} diff --git a/lucene/core/src/java/org/apache/lucene/index/UnloadingDocValuesProducer.java b/lucene/core/src/java/org/apache/lucene/index/UnloadingDocValuesProducer.java new file mode 100644 index 000000000000..e6e8421fafa7 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/UnloadingDocValuesProducer.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + +import static org.apache.lucene.index.Unloader.FPIOFunction; + +import java.io.IOException; +import java.lang.ref.Reference; +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.util.IOFunction; +import org.apache.lucene.util.automaton.CompiledAutomaton; + +/** + * A {@link DocValuesProducer} that conditionally unloads (and subsequently reloads on-demand) + * backing resources (via {@link Unloader}). + */ +public class UnloadingDocValuesProducer extends DocValuesProducer { + + private final Unloader u; + + /** + * Creates a new instance + * + * @param reopen opens/reopens the backing resource + * @param keepAliveNanos time threshold (since last access) at which the backing resource is + * eligible to be unloaded + * @throws IOException e.g., on error opening backing resource + */ + public UnloadingDocValuesProducer( + Unloader.UnloadHelper reporter, + IOFunction, DocValuesProducer> reopen, + long keepAliveNanos) + throws IOException { + u = new Unloader<>(reporter, reopen, keepAliveNanos, Object::toString); + } + + @Override + public void close() throws IOException { + u.close(); + } + + private final FPIOFunction checkIntegrity = + (dvp, ignored) -> { + dvp.checkIntegrity(); + return null; + }; + + @Override + public void checkIntegrity() throws IOException { + u.execute(checkIntegrity); + } + + private final FPIOFunction getNumeric = + DocValuesProducer::getNumeric; + + @Override + public NumericDocValues getNumeric(FieldInfo field) throws IOException { + return u.execute( + getNumeric, + field, + (raw, sentinel) -> { + assert sentinel != null : "sentinel must not be null"; + return new FilterNumericDocValues(raw) { + @Override + public long cost() { + try { + return super.cost(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + }; + }); + } + + private final FPIOFunction getBinary = + DocValuesProducer::getBinary; + + @Override + public BinaryDocValues getBinary(FieldInfo field) throws IOException { + return u.execute( + getBinary, + field, + (raw, sentinel) -> { + assert sentinel != null : "sentinel must not be null"; + return new FilterBinaryDocValues(raw) { + @Override + public long cost() { + try { + return super.cost(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + }; + }); + } + + private final FPIOFunction getSorted = + DocValuesProducer::getSorted; + + @Override + public SortedDocValues getSorted(FieldInfo field) throws IOException { + return u.execute( + getSorted, + field, + (rawSorted, sentinel) -> { + assert sentinel != null : "sentinel must not be null"; + // wrap so that we can track refs for returned `TermsEnum` instances + return new FilterSortedDocValues(rawSorted) { + @Override + public TermsEnum intersect(CompiledAutomaton automaton) throws IOException { + return 
Unloader.wrap(super.intersect(automaton), sentinel); + } + + @Override + public TermsEnum termsEnum() throws IOException { + return Unloader.wrap(super.termsEnum(), sentinel); + } + }; + }); + } + + private final FPIOFunction + getSortedNumeric = DocValuesProducer::getSortedNumeric; + + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + return u.execute( + getSortedNumeric, + field, + (raw, sentinel) -> { + assert sentinel != null : "sentinel must not be null"; + return new FilterSortedNumericDocValues(raw) { + @Override + public long cost() { + try { + return super.cost(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + }; + }); + } + + private final FPIOFunction getSortedSet = + DocValuesProducer::getSortedSet; + + @Override + public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { + return u.execute( + getSortedSet, + field, + (rawSorted, sentinel) -> { + assert sentinel != null : "sentinel must not be null"; + // wrap so that we can track refs for returned `TermsEnum` instances + return new FilterSortedSetDocValues(rawSorted) { + @Override + public TermsEnum intersect(CompiledAutomaton automaton) throws IOException { + try { + return Unloader.wrap(super.intersect(automaton), sentinel); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public TermsEnum termsEnum() throws IOException { + try { + return Unloader.wrap(super.termsEnum(), sentinel); + } finally { + Reference.reachabilityFence(sentinel); + } + } + }; + }); + } +} diff --git a/lucene/core/src/java/org/apache/lucene/index/UnloadingFieldsProducer.java b/lucene/core/src/java/org/apache/lucene/index/UnloadingFieldsProducer.java new file mode 100644 index 000000000000..f24a0383ddea --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/UnloadingFieldsProducer.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + +import static org.apache.lucene.index.Unloader.FPIOFunction; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.ref.Reference; +import java.util.Iterator; +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.codecs.FieldsProducer; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.IOFunction; +import org.apache.lucene.util.automaton.CompiledAutomaton; + +/** + * A {@link FieldsProducer} that conditionally unloads (and subsequently reloads on-demand) + * backing resources (via {@link Unloader}). 
+ */ +public class UnloadingFieldsProducer extends FieldsProducer { + + private final int size; + + private final Unloader u; + + /** + * Creates a new instance + * + * @param reopen opens/reopens the backing resource + * @param keepAliveNanos time threshold (since last access) at which the backing resource is + * eligible to be unloaded + * @throws IOException e.g., on error opening backing resource + */ + public UnloadingFieldsProducer( + Unloader.UnloadHelper unloadHelper, + IOFunction, FieldsProducer> reopen, + long keepAliveNanos) + throws IOException { + final int[] size = new int[1]; + u = + new Unloader<>( + unloadHelper, + reopen, + keepAliveNanos, + (fp) -> { + size[0] = fp.size(); + return fp.toString(); + }); + this.size = size[0]; + } + + @Override + public void close() throws IOException { + u.close(); + } + + private final FPIOFunction checkIntegrity = + (fp, ignored) -> { + fp.checkIntegrity(); + return null; + }; + + @Override + public void checkIntegrity() throws IOException { + u.execute(checkIntegrity); + } + + private final FPIOFunction> iterator = + (fp, ignored) -> fp.iterator(); + + @Override + public Iterator iterator() { + try { + return u.execute( + iterator, + null, + (raw, sentinel) -> { + assert sentinel != null : "sentinel must not be null"; + return new Iterator() { + @Override + public boolean hasNext() { + return raw.hasNext(); + } + + @Override + public String next() { + try { + return raw.next(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + }; + }); + } catch (IOException e) { + // this should never happen + throw new UncheckedIOException(e); + } + } + + private final FPIOFunction terms = Fields::terms; + + public static final ThreadLocal FIELD_REQUESTED = new ThreadLocal<>(); + + @Override + public Terms terms(String field) throws IOException { + try { + FIELD_REQUESTED.set(field); + return terms0(field); + } finally { + FIELD_REQUESTED.remove(); + } + } + + private Terms terms0(String field) throws IOException { + return u.execute( + terms, + field, + (rawTerms, sentinel) -> { + assert sentinel != null : "sentinel must not be null"; + // NOTE: we have to wrap here because a reference to the raw value may be + // retained internal to the backing `FieldsProducer`. This can generate a + // profusion of redundant references that never get collected. This is a + // memory leak, and also prevents resources from being unloaded, even when + // they should be eligible for unloading. + // + // This particular rationale for wrapping only applies to `Terms` -- other + // resources are already created as one-offs. 
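+ // (The `sentinel` passed in by `Unloader.execute` is the liveness handle for the + // currently-loaded resource: while it remains strongly reachable, the resource is considered + // in use. The delegating methods below therefore touch it via `Reference.reachabilityFence` + // so that it cannot be deemed unreachable while a call is in flight.)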
+ return new FilterLeafReader.FilterTerms(rawTerms) { + @Override + public TermsEnum iterator() throws IOException { + try { + return Unloader.wrap(super.iterator(), sentinel); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public TermsEnum intersect(CompiledAutomaton compiled, BytesRef startTerm) + throws IOException { + try { + return Unloader.wrap(super.intersect(compiled, startTerm), sentinel); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public long size() throws IOException { + try { + return super.size(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public long getSumTotalTermFreq() throws IOException { + try { + return super.getSumTotalTermFreq(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public long getSumDocFreq() throws IOException { + try { + return super.getSumDocFreq(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public int getDocCount() throws IOException { + try { + return super.getDocCount(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public boolean hasFreqs() { + try { + return super.hasFreqs(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public boolean hasOffsets() { + try { + return super.hasOffsets(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public boolean hasPositions() { + try { + return super.hasPositions(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public boolean hasPayloads() { + try { + return super.hasPayloads(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public Object getStats() throws IOException { + try { + return super.getStats(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public BytesRef getMin() throws IOException { + try { + return super.getMin(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public BytesRef getMax() throws IOException { + try { + return super.getMax(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + }; + }); + } + + @Override + public int size() { + return size; + } +} diff --git a/lucene/core/src/java/org/apache/lucene/index/UnloadingPointsReader.java b/lucene/core/src/java/org/apache/lucene/index/UnloadingPointsReader.java new file mode 100644 index 000000000000..8d6d00fd8258 --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/index/UnloadingPointsReader.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.lucene.index; + +import static org.apache.lucene.index.Unloader.FPIOFunction; + +import java.io.IOException; +import java.lang.ref.Reference; +import org.apache.lucene.codecs.PointsReader; +import org.apache.lucene.util.IOFunction; + +/** + * A {@link PointsReader} that conditionally unloads (and subsequently reloads on-demand) + * backing resources (via {@link Unloader}). + */ +public class UnloadingPointsReader extends PointsReader { + + private final Unloader u; + + /** + * Creates a new instance + * + * @param reopen opens/reopens the backing resource + * @param keepAliveNanos time threshold (since last access) at which the backing resource is + * eligible to be unloaded + * @throws IOException e.g., on error opening backing resource + */ + public UnloadingPointsReader( + Unloader.UnloadHelper unloadHelper, + IOFunction, PointsReader> reopen, + long keepAliveNanos) + throws IOException { + u = new Unloader<>(unloadHelper, reopen, keepAliveNanos, Object::toString); + } + + @Override + public void close() throws IOException { + u.close(); + } + + private final FPIOFunction checkIntegrity = + (pr, ignored) -> { + pr.checkIntegrity(); + return null; + }; + + @Override + public void checkIntegrity() throws IOException { + u.execute(checkIntegrity); + } + + private final FPIOFunction getValues = PointsReader::getValues; + + @Override + public PointValues getValues(String field) throws IOException { + return u.execute( + getValues, + field, + (rawPointValues, sentinel) -> { + assert sentinel != null : "sentinel must not be null!"; + // NOTE: we have to wrap here in order to track derived `PointTree` instances + return new PointValues() { + + @Override + public PointTree getPointTree() throws IOException { + try { + return Unloader.wrap(rawPointValues.getPointTree(), sentinel); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public byte[] getMinPackedValue() throws IOException { + try { + return rawPointValues.getMinPackedValue(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public byte[] getMaxPackedValue() throws IOException { + try { + return rawPointValues.getMaxPackedValue(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public int getNumDimensions() throws IOException { + try { + return rawPointValues.getNumDimensions(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public int getNumIndexDimensions() throws IOException { + try { + return rawPointValues.getNumIndexDimensions(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public int getBytesPerDimension() throws IOException { + try { + return rawPointValues.getBytesPerDimension(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public long size() { + try { + return rawPointValues.size(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + + @Override + public int getDocCount() { + try { + return rawPointValues.getDocCount(); + } finally { + Reference.reachabilityFence(sentinel); + } + } + }; + }); + } +} diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java index dbefd6a03cc9..7f7ee28f401b 100644 --- a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java +++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java @@
-30,13 +30,18 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; import java.util.zip.CRC32; import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.Unloader; import org.apache.lucene.util.BitUtil; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.NamedThreadFactory; /** * A {@link ByteBuffer}-based {@link Directory} implementation that can be used to store index files @@ -49,7 +54,7 @@ * * @lucene.experimental */ -public final class ByteBuffersDirectory extends BaseDirectory { +public final class ByteBuffersDirectory extends BaseDirectory implements UnloaderCoordinationPoint { public static final BiFunction OUTPUT_AS_MANY_BUFFERS = (fileName, output) -> { ByteBuffersDataInput dataInput = output.toDataInput(); @@ -159,6 +164,9 @@ public ByteBuffersDirectory( super(factory); this.outputToInput = Objects.requireNonNull(outputToInput); this.bbOutputSupplier = Objects.requireNonNull(bbOutputSupplier); + if (exec != null) { + unloadHelperSupplier = () -> new Unloader.AbstractUnloadHelper(exec, InfoStream.NO_OUTPUT) {}; + } } @Override @@ -256,9 +264,34 @@ public IndexInput openInput(String name, IOContext context) throws IOException { @Override public void close() throws IOException { isOpen = false; + if (Unloader.EXECUTOR_PER_DIRECTORY) { + exec.shutdownNow(); + } files.clear(); } + private ScheduledExecutorService exec = + Unloader.EXECUTOR_PER_DIRECTORY + ? Executors.newSingleThreadScheduledExecutor( + new NamedThreadFactory("unload@" + System.identityHashCode(this))) + : null; + + private volatile Supplier unloadHelperSupplier; + + @Override + public void setUnloadHelperSupplier(Supplier supplier) { + if (Unloader.EXECUTOR_PER_DIRECTORY) { + throw new IllegalStateException(); + } + this.unloadHelperSupplier = supplier; + } + + @Override + public Unloader.UnloadHelper getUnloadHelper() { + Supplier supplier = unloadHelperSupplier; + return supplier == null ? null : supplier.get(); + } + @Override public Set getPendingDeletions() { return Collections.emptySet(); diff --git a/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java b/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java index f36a651b16b6..07069436738b 100644 --- a/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java +++ b/lucene/core/src/java/org/apache/lucene/store/FSDirectory.java @@ -36,11 +36,17 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; +import org.apache.lucene.index.Unloader; import org.apache.lucene.util.Constants; import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.NamedThreadFactory; /** * Base class for Directory implementations that store index files in the file system. 
new Unloader.AbstractUnloadHelper(exec, InfoStream.NO_OUTPUT) {}; + } } /** @@ -283,9 +292,34 @@ public void syncMetaData() throws IOException { @Override public synchronized void close() throws IOException { isOpen = false; + if (Unloader.EXECUTOR_PER_DIRECTORY) { + exec.shutdownNow(); + } deletePendingFiles(); } + private final ScheduledExecutorService exec = + Unloader.EXECUTOR_PER_DIRECTORY + ? Executors.newSingleThreadScheduledExecutor( + new NamedThreadFactory("unload@" + System.identityHashCode(this))) + : null; + + private volatile Supplier unloadHelperSupplier; + + @Override + public void setUnloadHelperSupplier(Supplier supplier) { + if (Unloader.EXECUTOR_PER_DIRECTORY) { + throw new IllegalStateException(); + } + this.unloadHelperSupplier = supplier; + } + + @Override + public Unloader.UnloadHelper getUnloadHelper() { + Supplier supplier = unloadHelperSupplier; + return supplier == null ? null : supplier.get(); + } + /** * @return the underlying filesystem directory */ diff --git a/lucene/core/src/java/org/apache/lucene/store/MappedByteBufferIndexInputProvider.java b/lucene/core/src/java/org/apache/lucene/store/MappedByteBufferIndexInputProvider.java index 0b3612407a20..c0dfa6013a0c 100644 --- a/lucene/core/src/java/org/apache/lucene/store/MappedByteBufferIndexInputProvider.java +++ b/lucene/core/src/java/org/apache/lucene/store/MappedByteBufferIndexInputProvider.java @@ -37,6 +37,7 @@ import org.apache.lucene.util.Constants; import org.apache.lucene.util.SuppressForbidden; +/** Provides {@link IndexInput} over {@link MappedByteBuffer}s. */ public final class MappedByteBufferIndexInputProvider implements MMapDirectory.MMapIndexInputProvider { diff --git a/lucene/core/src/java/org/apache/lucene/store/UnloaderCoordinationPoint.java b/lucene/core/src/java/org/apache/lucene/store/UnloaderCoordinationPoint.java new file mode 100644 index 000000000000..168b4210484e --- /dev/null +++ b/lucene/core/src/java/org/apache/lucene/store/UnloaderCoordinationPoint.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.store; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; +import org.apache.lucene.index.Unloader; + +/** + * Provides a means of setting and getting a {@link ScheduledExecutorService} for executing + * ancillary tasks. 
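+ * + * <p>Concretely: a caller installs a {@link Supplier} of {@link Unloader.UnloadHelper} (which + * wraps the executor) on a {@link Directory}; code opening resources against that directory then + * retrieves the helper via {@link #getUnloadHelper()}.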
+ */ +public interface UnloaderCoordinationPoint { + Unloader.UnloadHelper getUnloadHelper(); + + void setUnloadHelperSupplier(Supplier helper); + + static boolean setUnloadHelperSupplier( + Directory dir, Supplier helperSupplier) { + dir = FilterDirectory.unwrap(dir); + if (dir instanceof UnloaderCoordinationPoint) { + ((UnloaderCoordinationPoint) dir).setUnloadHelperSupplier(helperSupplier); + return true; + } else { + return false; + } + } + + static Unloader.UnloadHelper getUnloadHelper(Directory dir) { + dir = FilterDirectory.unwrap(dir); + if (dir instanceof UnloaderCoordinationPoint) { + return ((UnloaderCoordinationPoint) dir).getUnloadHelper(); + } else { + return null; + } + } +} diff --git a/lucene/core/src/test/org/apache/lucene/index/TestAddIndexes.java b/lucene/core/src/test/org/apache/lucene/index/TestAddIndexes.java index 8832f4cfde8a..32cb6e78e972 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestAddIndexes.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestAddIndexes.java @@ -1053,6 +1053,7 @@ void close(boolean doWait) throws Throwable { void closeDir() throws Throwable { for (int i = 0; i < NUM_COPY; i++) readers[i].close(); dir2.close(); + dir.close(); } abstract void doBody(int j, Directory[] dirs) throws Throwable; @@ -1464,6 +1465,7 @@ public void testNonCFSLeftovers() throws Exception { assertEquals("Only one compound segment should exist", 1, sis.size()); assertTrue(sis.info(0).info.getUseCompoundFile()); dir.close(); + IOUtils.close(dirs); } private static final class UnRegisteredCodec extends FilterCodec { diff --git a/lucene/core/src/test/org/apache/lucene/index/TestCodecHoldsOpenFiles.java b/lucene/core/src/test/org/apache/lucene/index/TestCodecHoldsOpenFiles.java index 5f1a4cc5e365..60d5f1008503 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestCodecHoldsOpenFiles.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestCodecHoldsOpenFiles.java @@ -27,33 +27,38 @@ public class TestCodecHoldsOpenFiles extends LuceneTestCase { public void test() throws Exception { - BaseDirectoryWrapper d = newDirectory(); - d.setCheckIndexOnClose(false); - // we nuke files, but verify the reader still works - RandomIndexWriter w = new RandomIndexWriter(random(), d); - int numDocs = atLeast(100); - for (int i = 0; i < numDocs; i++) { - Document doc = new Document(); - doc.add(newField("foo", "bar", TextField.TYPE_NOT_STORED)); - doc.add(new IntPoint("doc", i)); - doc.add(new IntPoint("doc2d", i, i)); - doc.add(new NumericDocValuesField("dv", i)); - w.addDocument(doc); - } + Unloader.DISABLE = true; + try { + BaseDirectoryWrapper d = newDirectory(); + d.setCheckIndexOnClose(false); + // we nuke files, but verify the reader still works + RandomIndexWriter w = new RandomIndexWriter(random(), d); + int numDocs = atLeast(100); + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + doc.add(newField("foo", "bar", TextField.TYPE_NOT_STORED)); + doc.add(new IntPoint("doc", i)); + doc.add(new IntPoint("doc2d", i, i)); + doc.add(new NumericDocValuesField("dv", i)); + w.addDocument(doc); + } - IndexReader r = w.getReader(); - w.commit(); - w.close(); + IndexReader r = w.getReader(); + w.commit(); + w.close(); - for (String name : d.listAll()) { - d.deleteFile(name); - } + for (String name : d.listAll()) { + d.deleteFile(name); + } - for (LeafReaderContext cxt : r.leaves()) { - TestUtil.checkReader(cxt.reader()); - } + for (LeafReaderContext cxt : r.leaves()) { + TestUtil.checkReader(cxt.reader()); + } - r.close(); - d.close(); + 
r.close(); + d.close(); + } finally { + Unloader.DISABLE = false; + } } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDirectoryReaderReopen.java b/lucene/core/src/test/org/apache/lucene/index/TestDirectoryReaderReopen.java index 6e993a17473e..6ab38e6aeb3c 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestDirectoryReaderReopen.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestDirectoryReaderReopen.java @@ -1068,6 +1068,8 @@ public void testDeleteIndexFilesWhileReaderStillOpen() throws Exception { () -> { DirectoryReader.openIfChanged(r); }); + r.close(); + dir.close(); } public void testReuseUnchangedLeafReaderOnDVUpdate() throws IOException { diff --git a/lucene/core/src/test/org/apache/lucene/index/TestForTooMuchCloning.java b/lucene/core/src/test/org/apache/lucene/index/TestForTooMuchCloning.java index 7c72b3d2e76a..cf2cdec6b990 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestForTooMuchCloning.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestForTooMuchCloning.java @@ -16,6 +16,8 @@ */ package org.apache.lucene.index; +import java.io.IOException; +import java.io.UncheckedIOException; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.TextField; @@ -65,6 +67,23 @@ public void test() throws Exception { dir.getInputCloneCount() < 500); final IndexSearcher s = newSearcher(r); + + // NOTE: this is a workaround -- holding Terms instances in memory prevent them + // from being GC'd, which prevents the backing FieldsProducer from being aggressively + // unloaded (via `Unloader`), which artificially inflates the number of clones. + @SuppressWarnings("unused") + Terms[] holdTermsHack = + s.getLeafContexts().stream() + .map( + (c) -> { + try { + return c.reader().terms("field"); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }) + .toArray(Terms[]::new); + // important: set this after newSearcher, it might have run checkindex final int cloneCount = dir.getInputCloneCount(); // dir.setVerboseClone(true); @@ -83,5 +102,6 @@ public void test() throws Exception { queryCloneCount < 50); r.close(); dir.close(); + holdTermsHack = null; // ensure it's referenced, so it's not GC'd until now. } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterCommit.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterCommit.java index ff13703eb5d0..4792f26af8e7 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterCommit.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriterCommit.java @@ -383,6 +383,9 @@ public void run() { assertTrue(r2 != r); r.close(); r = r2; + // TODO: we sometimes hit `Unloader`-related FileNotFoundExceptions + // here, because there's no connection between IndexWriter and Reader. + // TODO: Confirm this is only a test issue. 
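+ // (Presumably: an unloaded producer that reloads after the writer has deleted its + // segment files would hit FileNotFoundException, since nothing here incRefs those + // files on the reader's behalf.)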
assertEquals("term=f:" + s + "; r=" + r, 1, r.docFreq(new Term("f", s))); } } while (++iterations < maxIterations); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestLazyProxSkipping.java b/lucene/core/src/test/org/apache/lucene/index/TestLazyProxSkipping.java index 310d8524be91..2ca27cd8df77 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestLazyProxSkipping.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestLazyProxSkipping.java @@ -62,7 +62,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException { } } - private void createIndex(int numHits) throws IOException { + private Directory createIndex(int numHits) throws IOException { int numDocs = 500; final Analyzer analyzer = @@ -106,6 +106,7 @@ public TokenStreamComponents createComponents(String fieldName) { LeafReader reader = getOnlyLeafReader(DirectoryReader.open(directory)); this.searcher = newSearcher(reader); + return directory; } private ScoreDoc[] search() throws IOException { @@ -115,7 +116,7 @@ private ScoreDoc[] search() throws IOException { } private void performTest(int numHits) throws IOException { - createIndex(numHits); + Directory dir = createIndex(numHits); this.seeksCounter = 0; ScoreDoc[] hits = search(); // verify that the right number of docs was found @@ -127,6 +128,7 @@ private void performTest(int numHits) throws IOException { "seeksCounter=" + this.seeksCounter + " numHits=" + numHits, this.seeksCounter <= numHits + 1); searcher.getIndexReader().close(); + dir.close(); } public void testLazySkipping() throws IOException { diff --git a/lucene/core/src/test/org/apache/lucene/index/TestMultiDocValues.java b/lucene/core/src/test/org/apache/lucene/index/TestMultiDocValues.java index e4cb61eca0af..acee0a7d25b6 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestMultiDocValues.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestMultiDocValues.java @@ -55,6 +55,7 @@ public void testNumerics() throws Exception { } } DirectoryReader ir = iw.getReader(); + incRefFiles(iw, ir); iw.forceMerge(1); DirectoryReader ir2 = iw.getReader(); LeafReader merged = getOnlyLeafReader(ir2); @@ -100,6 +101,7 @@ public void testBinary() throws Exception { } } DirectoryReader ir = iw.getReader(); + incRefFiles(iw, ir); iw.forceMerge(1); DirectoryReader ir2 = iw.getReader(); LeafReader merged = getOnlyLeafReader(ir2); @@ -149,6 +151,7 @@ public void testSorted() throws Exception { } } DirectoryReader ir = iw.getReader(); + incRefFiles(iw, ir); iw.forceMerge(1); DirectoryReader ir2 = iw.getReader(); LeafReader merged = getOnlyLeafReader(ir2); @@ -202,6 +205,7 @@ public void testSortedWithLotsOfDups() throws Exception { } } DirectoryReader ir = iw.getReader(); + incRefFiles(iw, ir); iw.forceMerge(1); DirectoryReader ir2 = iw.getReader(); LeafReader merged = getOnlyLeafReader(ir2); @@ -254,6 +258,7 @@ public void testSortedSet() throws Exception { } } DirectoryReader ir = iw.getReader(); + incRefFiles(iw, ir); iw.forceMerge(1); DirectoryReader ir2 = iw.getReader(); LeafReader merged = getOnlyLeafReader(ir2); @@ -320,6 +325,7 @@ public void testSortedSetWithDups() throws Exception { } } DirectoryReader ir = iw.getReader(); + incRefFiles(iw, ir); iw.forceMerge(1); DirectoryReader ir2 = iw.getReader(); LeafReader merged = getOnlyLeafReader(ir2); @@ -385,6 +391,7 @@ public void testSortedNumeric() throws Exception { } } DirectoryReader ir = iw.getReader(); + incRefFiles(iw, ir); iw.forceMerge(1); DirectoryReader ir2 = iw.getReader(); LeafReader merged = 
getOnlyLeafReader(ir2); @@ -446,4 +453,24 @@ private void testRandomAdvanceExact(DocValuesIterator iter1, DocValuesIterator i assertEquals(exists1, exists2); } } + + /** + * In conjunction with {@link Unloader}, certain configurations/implementations of {@link + * DirectoryReader} do not incRef their associated files. This is a problem here because the files + * get deleted (though presumably still referenced as zombie filehandles from + * unclosed/non-unloading resource objects). + * + * <p>To work around this, we manually incRef the files. + * + * <p>
TODO: confirm that in practice this will be a "test-only" problem. + */ + private static void incRefFiles(RandomIndexWriter w, DirectoryReader ir) throws IOException { + ir = FilterDirectoryReader.unwrap(ir); + if (ir instanceof StandardDirectoryReader) { + StandardDirectoryReader sdr = (StandardDirectoryReader) ir; + SegmentInfos segInfos = sdr.segmentInfos; + IndexWriter writer = w.w; + writer.incRefDeleter(segInfos); + } + } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestMultiLevelSkipList.java b/lucene/core/src/test/org/apache/lucene/index/TestMultiLevelSkipList.java index 0b0e6c8759cf..c444574a00b9 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestMultiLevelSkipList.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestMultiLevelSkipList.java @@ -96,6 +96,8 @@ public void testSimpleSkip() throws IOException { // because than more bytes would be read from the freqStream checkSkipTo(tp, 4800, 250); // one skip on level 2 } + reader.close(); + dir.close(); } public void checkSkipTo(PostingsEnum tp, int target, int maxCounter) throws IOException { diff --git a/lucene/core/src/test/org/apache/lucene/index/TestPointValues.java b/lucene/core/src/test/org/apache/lucene/index/TestPointValues.java index 334db5dccc2c..a24fe10f728b 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestPointValues.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestPointValues.java @@ -772,6 +772,7 @@ public void testMergedStatsOneSegmentWithoutPoints() throws IOException { assertNull(PointValues.getMaxPackedValue(reader, "field2")); assertEquals(0, PointValues.getDocCount(reader, "field2")); assertEquals(0, PointValues.size(reader, "field2")); + dir.close(); } public void testMergedStatsAllPointsDeleted() throws IOException { @@ -792,6 +793,7 @@ public void testMergedStatsAllPointsDeleted() throws IOException { assertNull(PointValues.getMaxPackedValue(reader, "field")); assertEquals(0, PointValues.getDocCount(reader, "field")); assertEquals(0, PointValues.size(reader, "field")); + dir.close(); } public void testMergedStats() throws IOException { diff --git a/lucene/core/src/test/org/apache/lucene/index/TestReadOnlyIndex.java b/lucene/core/src/test/org/apache/lucene/index/TestReadOnlyIndex.java index 9ea726f8abff..5de68c6cacdc 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestReadOnlyIndex.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestReadOnlyIndex.java @@ -100,6 +100,7 @@ private Void doTestReadOnlyIndex() throws Exception { assertEquals(1, isearcher.count(phraseQuery)); ireader.close(); + dir.close(); return null; // void } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestSizeBoundedForceMerge.java b/lucene/core/src/test/org/apache/lucene/index/TestSizeBoundedForceMerge.java index e0fe06167871..9a1d159b373b 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestSizeBoundedForceMerge.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestSizeBoundedForceMerge.java @@ -93,6 +93,7 @@ public void testByteSizeLimit() throws Exception { // Should only be 3 segments in the index, because one of them exceeds the size limit sis = SegmentInfos.readLatestCommit(dir); assertEquals(3, sis.size()); + dir.close(); } public void testNumDocsLimit() throws Exception { @@ -125,6 +126,7 @@ public void testNumDocsLimit() throws Exception { // Should only be 3 segments in the index, because one of them exceeds the size limit SegmentInfos sis = SegmentInfos.readLatestCommit(dir); assertEquals(3, sis.size()); + dir.close(); } public 
void testLastSegmentTooLarge() throws Exception { @@ -151,6 +153,7 @@ public void testLastSegmentTooLarge() throws Exception { SegmentInfos sis = SegmentInfos.readLatestCommit(dir); assertEquals(2, sis.size()); + dir.close(); } public void testFirstSegmentTooLarge() throws Exception { @@ -177,6 +180,7 @@ public void testFirstSegmentTooLarge() throws Exception { SegmentInfos sis = SegmentInfos.readLatestCommit(dir); assertEquals(2, sis.size()); + dir.close(); } public void testAllSegmentsSmall() throws Exception { @@ -203,6 +207,7 @@ public void testAllSegmentsSmall() throws Exception { SegmentInfos sis = SegmentInfos.readLatestCommit(dir); assertEquals(1, sis.size()); + dir.close(); } public void testAllSegmentsLarge() throws Exception { @@ -228,6 +233,7 @@ public void testAllSegmentsLarge() throws Exception { SegmentInfos sis = SegmentInfos.readLatestCommit(dir); assertEquals(3, sis.size()); + dir.close(); } public void testOneLargeOneSmall() throws Exception { @@ -254,6 +260,7 @@ public void testOneLargeOneSmall() throws Exception { SegmentInfos sis = SegmentInfos.readLatestCommit(dir); assertEquals(4, sis.size()); + dir.close(); } public void testMergeFactor() throws Exception { @@ -286,6 +293,7 @@ public void testMergeFactor() throws Exception { // max merge docs settings. SegmentInfos sis = SegmentInfos.readLatestCommit(dir); assertEquals(4, sis.size()); + dir.close(); } public void testSingleMergeableSegment() throws Exception { @@ -315,6 +323,7 @@ public void testSingleMergeableSegment() throws Exception { SegmentInfos sis = SegmentInfos.readLatestCommit(dir); assertEquals(3, sis.size()); assertFalse(sis.info(2).hasDeletions()); + dir.close(); } public void testSingleNonMergeableSegment() throws Exception { @@ -339,6 +348,7 @@ public void testSingleNonMergeableSegment() throws Exception { // Verify that the last segment does not have deletions. 
SegmentInfos sis = SegmentInfos.readLatestCommit(dir); assertEquals(1, sis.size()); + dir.close(); } public void testSingleMergeableTooLargeSegment() throws Exception { @@ -367,5 +377,6 @@ public void testSingleMergeableTooLargeSegment() throws Exception { SegmentInfos sis = SegmentInfos.readLatestCommit(dir); assertEquals(1, sis.size()); assertTrue(sis.info(0).hasDeletions()); + dir.close(); } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestTryDelete.java b/lucene/core/src/test/org/apache/lucene/index/TestTryDelete.java index 78d00a1eec4e..e3e9bdf903a1 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestTryDelete.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestTryDelete.java @@ -100,6 +100,7 @@ public void testTryDeleteDocument() throws IOException { topDocs = searcher.search(new TermQuery(new Term("foo", "0")), 100); assertEquals(0, topDocs.totalHits.value); + directory.close(); } public void testTryDeleteDocumentCloseAndReopen() throws IOException { @@ -137,6 +138,7 @@ public void testTryDeleteDocumentCloseAndReopen() throws IOException { topDocs = searcher.search(new TermQuery(new Term("foo", "0")), 100); assertEquals(0, topDocs.totalHits.value); + directory.close(); } public void testDeleteDocuments() throws IOException { @@ -166,5 +168,6 @@ public void testDeleteDocuments() throws IOException { topDocs = searcher.search(new TermQuery(new Term("foo", "0")), 100); assertEquals(0, topDocs.totalHits.value); + directory.close(); } } diff --git a/lucene/core/src/test/org/apache/lucene/index/TestUnloader.java b/lucene/core/src/test/org/apache/lucene/index/TestUnloader.java new file mode 100644 index 000000000000..cd64a10cf9f0 --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/index/TestUnloader.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.lucene.index; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.util.IOFunction; +import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.NamedThreadFactory; + +public class TestUnloader extends LuceneTestCase { + private static final class MyCloseable implements Closeable { + + private final LongAdder tracker; + private final AtomicInteger count; + private final LongAdder closedCt; + private final AtomicInteger exceptionOnClose = new AtomicInteger(); + + private MyCloseable(LongAdder tracker, AtomicInteger count, LongAdder closedCt) { + if (random().nextInt(20) == 0) { + throw new RuntimeException("exception opening"); + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + tracker.increment(); + this.tracker = tracker; + this.count = count; + this.closedCt = closedCt; + } + + @Override + public void close() throws IOException { + int v = random().nextInt(20) == 0 ? 1 : 2; + switch (exceptionOnClose.compareAndExchange(0, v)) { + case 0: + // first time here + break; + case 1: + return; // no exception + case 2: + throw new RuntimeException("exception closing"); + default: + throw new IllegalStateException(); + } + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + tracker.decrement(); + closedCt.increment(); + if (v == 2) { + throw new RuntimeException("exception closing"); + } + } + } + + private static final IOFunction NO_OP = Object::toString; + + public void test() throws IOException, InterruptedException { + for (int i = 0; i < 10; i++) { + System.out.println("do " + i); + doTest(); + } + } + + private static Unloader newInstance( + LongAdder tracker, AtomicInteger count, LongAdder createdCt, LongAdder closedCt) { + while (true) { + try { + return new Unloader<>( + new Unloader.AbstractUnloadHelper(null, InfoStream.NO_OUTPUT) {}, + (unloader) -> { + MyCloseable ret = new MyCloseable(tracker, count, closedCt); + createdCt.increment(); + return ret; + }, + 0, + NO_OP); + } catch ( + @SuppressWarnings("unused") + Exception ex) { + // keep trying + } + } + } + + public void doTest() throws IOException, InterruptedException { + final int nThreadsPerOp = 2; + final LongAdder tracker = new LongAdder(); + final AtomicInteger count = new AtomicInteger(); + final LongAdder createdCt = new LongAdder(); + final LongAdder closedCt = new LongAdder(); + Unloader u = newInstance(tracker, count, createdCt, closedCt); + ExecutorService exec = + Executors.newFixedThreadPool(nThreadsPerOp + 1, new NamedThreadFactory("testUnloader")); + AtomicBoolean complete = new AtomicBoolean(); + LongAdder check = new LongAdder(); + try { + List> futures = new ArrayList<>(nThreadsPerOp + 1); + for (int i = nThreadsPerOp; i > 0; i--) { + futures.add( + exec.submit( + () -> { + while (!complete.get()) { + Thread.sleep(20); + @SuppressWarnings("unused") + Object o = + u.execute( + (a, b) -> { + a.count.incrementAndGet(); + return null; + }); + check.increment(); + } + return 
"execute"; + })); + } + futures.add( + exec.submit( + () -> { + while (!complete.get()) { + try { + u.maybeUnload(); + } catch ( + @SuppressWarnings("unused") + AlreadyClosedException ex) { + break; + } catch (Throwable t) { + if (!"exception closing".equals(t.getMessage())) { + t.printStackTrace(System.err); + } + } + } + return "unload"; + })); + Thread.sleep(1000); // let it run for a while + System.out.println("closing ..."); + try { + u.close(); + } catch (RuntimeException ex) { + if (!"exception closing".equals(ex.getMessage())) { + throw ex; + } + } + int iterations = 0; + AssertionError deferred = null; + boolean eventuallyOk = false; + do { + try { + assertEquals(createdCt.sum(), closedCt.sum()); + assertEquals(0, tracker.sum()); + eventuallyOk = true; + } catch (AssertionError er) { + Thread.sleep(100); + if (deferred == null) { + deferred = er; + } + } + } while (++iterations < 10); + if (deferred != null) { + System.err.println("eventually ok: " + eventuallyOk); + throw deferred; + } + System.out.println( + "closed; tracker=" + + tracker.sum() + + ", count=" + + count.get() + + ", " + + createdCt.sum() + + "?=" + + closedCt.sum()); + long now = System.nanoTime(); + long until = now + TimeUnit.SECONDS.toNanos(30); + for (Future f : futures) { + try { + System.out.println("\t" + f.get(until - now, TimeUnit.NANOSECONDS)); + } catch (Exception ex) { + Throwable cause = ex.getCause(); + String msg = cause == null ? null : cause.getMessage(); + if (!(cause instanceof AlreadyClosedException) + && !"exception opening".equals(msg) + && !"exception closing".equals(msg)) { + System.out.println("\t" + ex); + ex.printStackTrace(System.out); + } + } + now = System.nanoTime(); + } + } finally { + exec.shutdown(); + exec.awaitTermination(5, TimeUnit.SECONDS); + } + } +}