diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 2bc869fec92b..99c6fc49a0f9 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -35,6 +35,10 @@ Improvements * GITHUB#13066: Support getMaxScore of DisjunctionSumScorer for non top level scoring clause (Shintaro Murakami) +* GITHUB#13124: MergeScheduler can now provide an executor for intra-merge parallelism. The first + implementation is the ConcurrentMergeScheduler and the Lucene99HnswVectorsFormat will use it if no other + executor is provided. (Ben Trent) + Optimizations --------------------- diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsFormat.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsFormat.java index 582155d9b76c..bcea40170865 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsFormat.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsFormat.java @@ -24,6 +24,8 @@ import org.apache.lucene.codecs.KnnVectorsReader; import org.apache.lucene.codecs.KnnVectorsWriter; import org.apache.lucene.codecs.lucene90.IndexedDISI; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.MergeScheduler; import org.apache.lucene.index.SegmentReadState; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.search.DocIdSetIterator; @@ -163,7 +165,8 @@ public Lucene99HnswVectorsFormat(int maxConn, int beamWidth) { * @param numMergeWorkers number of workers (threads) that will be used when doing merge. If * larger than 1, a non-null {@link ExecutorService} must be passed as mergeExec * @param mergeExec the {@link ExecutorService} that will be used by ALL vector writers that are - * generated by this format to do the merge + * generated by this format to do the merge. If null, the configured {@link + * MergeScheduler#getIntraMergeExecutor(MergePolicy.OneMerge)} is used. */ public Lucene99HnswVectorsFormat( int maxConn, int beamWidth, int numMergeWorkers, ExecutorService mergeExec) { @@ -184,10 +187,6 @@ public Lucene99HnswVectorsFormat( } this.maxConn = maxConn; this.beamWidth = beamWidth; - if (numMergeWorkers > 1 && mergeExec == null) { - throw new IllegalArgumentException( - "No executor service passed in when " + numMergeWorkers + " merge workers are requested"); - } if (numMergeWorkers == 1 && mergeExec != null) { throw new IllegalArgumentException( "No executor service is needed as we'll use single thread to merge"); diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsWriter.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsWriter.java index bc5039fe31d8..514ddf7a5111 100644 --- a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsWriter.java +++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99HnswVectorsWriter.java @@ -352,7 +352,14 @@ public void mergeOneField(FieldInfo fieldInfo, MergeState mergeState) throws IOE int[][] vectorIndexNodeOffsets = null; if (scorerSupplier.totalVectorCount() > 0) { // build graph - HnswGraphMerger merger = createGraphMerger(fieldInfo, scorerSupplier); + HnswGraphMerger merger = + createGraphMerger( + fieldInfo, + scorerSupplier, + mergeState.intraMergeTaskExecutor == null + ? null + : new TaskExecutor(mergeState.intraMergeTaskExecutor), + numMergeWorkers); for (int i = 0; i < mergeState.liveDocs.length; i++) { merger.addReader( mergeState.knnVectorsReaders[i], mergeState.docMaps[i], mergeState.liveDocs[i]); @@ -496,11 +503,23 @@ private void writeMeta( } private HnswGraphMerger createGraphMerger( - FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) { + FieldInfo fieldInfo, + RandomVectorScorerSupplier scorerSupplier, + TaskExecutor parallelMergeTaskExecutor, + int numParallelMergeWorkers) { if (mergeExec != null) { return new ConcurrentHnswMerger( fieldInfo, scorerSupplier, M, beamWidth, mergeExec, numMergeWorkers); } + if (parallelMergeTaskExecutor != null) { + return new ConcurrentHnswMerger( + fieldInfo, + scorerSupplier, + M, + beamWidth, + parallelMergeTaskExecutor, + numParallelMergeWorkers); + } return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth); } diff --git a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java index 8ffbbd759a40..d5ebd2e8bb1f 100644 --- a/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java @@ -21,9 +21,12 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; +import java.util.concurrent.Executor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.index.MergePolicy.OneMerge; -import org.apache.lucene.internal.tests.ConcurrentMergeSchedulerAccess; import org.apache.lucene.internal.tests.TestSecrets; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; @@ -109,6 +112,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler { private double forceMergeMBPerSec = Double.POSITIVE_INFINITY; + /** The executor provided for intra-merge parallelization */ + protected CachedExecutor intraMergeExecutor; + /** Sole constructor, with all settings set to default values. */ public ConcurrentMergeScheduler() {} @@ -259,6 +265,16 @@ synchronized void removeMergeThread() { assert false : "merge thread " + currentThread + " was not found"; } + @Override + public Executor getIntraMergeExecutor(OneMerge merge) { + assert intraMergeExecutor != null : "scaledExecutor is not initialized"; + // don't do multithreaded merges for small merges + if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB * 1024 * 1024) { + return super.getIntraMergeExecutor(merge); + } + return intraMergeExecutor; + } + @Override public Directory wrapForMerge(OneMerge merge, Directory in) { Thread mergeThread = Thread.currentThread(); @@ -268,6 +284,9 @@ public Directory wrapForMerge(OneMerge merge, Directory in) { } // Return a wrapped Directory which has rate-limited output. + // Note: the rate limiter is only per thread. So, if there are multiple merge threads running + // and throttling is required, each thread will be throttled independently. + // The implication of this, is that the total IO rate could be higher than the target rate. RateLimiter rateLimiter = ((MergeThread) mergeThread).rateLimiter; return new FilterDirectory(in) { @Override @@ -279,14 +298,6 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti // somewhere that is failing to pass down the right IOContext: assert context.context == IOContext.Context.MERGE : "got context=" + context.context; - // Because rateLimiter is bound to a particular merge thread, this method should - // always be called from that context. Verify this. - assert mergeThread == Thread.currentThread() - : "Not the same merge thread, current=" - + Thread.currentThread() - + ", expected=" - + mergeThread; - return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context)); } }; @@ -445,8 +456,15 @@ private static String rateToString(double mbPerSec) { } @Override - public void close() { - sync(); + public void close() throws IOException { + super.close(); + try { + sync(); + } finally { + if (intraMergeExecutor != null) { + intraMergeExecutor.shutdown(); + } + } } /** @@ -510,6 +528,9 @@ public synchronized int mergeThreadCount() { void initialize(InfoStream infoStream, Directory directory) throws IOException { super.initialize(infoStream, directory); initDynamicDefaults(directory); + if (intraMergeExecutor == null) { + intraMergeExecutor = new CachedExecutor(); + } } @Override @@ -755,11 +776,16 @@ void clearSuppressExceptions() { @Override public String toString() { - StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": "); - sb.append("maxThreadCount=").append(maxThreadCount).append(", "); - sb.append("maxMergeCount=").append(maxMergeCount).append(", "); - sb.append("ioThrottle=").append(doAutoIOThrottle); - return sb.toString(); + return getClass().getSimpleName() + + ": " + + "maxThreadCount=" + + maxThreadCount + + ", " + + "maxMergeCount=" + + maxMergeCount + + ", " + + "ioThrottle=" + + doAutoIOThrottle; } private boolean isBacklog(long now, OneMerge merge) { @@ -902,12 +928,64 @@ private static String getSegmentName(MergePolicy.OneMerge merge) { } static { - TestSecrets.setConcurrentMergeSchedulerAccess( - new ConcurrentMergeSchedulerAccess() { - @Override - public void setSuppressExceptions(ConcurrentMergeScheduler cms) { - cms.setSuppressExceptions(); - } - }); + TestSecrets.setConcurrentMergeSchedulerAccess(ConcurrentMergeScheduler::setSuppressExceptions); + } + + /** + * This executor provides intra-merge threads for parallel execution of merge tasks. It provides a + * limited number of threads to execute merge tasks. In particular, if the number of + * `mergeThreads` is equal to `maxThreadCount`, then the executor will execute the merge task in + * the calling thread. + */ + private class CachedExecutor implements Executor { + + private final AtomicInteger activeCount = new AtomicInteger(0); + private final ThreadPoolExecutor executor; + + public CachedExecutor() { + this.executor = + new ThreadPoolExecutor(0, 1024, 1L, TimeUnit.MINUTES, new SynchronousQueue<>()); + } + + void shutdown() { + executor.shutdown(); + } + + @Override + public void execute(Runnable command) { + final boolean isThreadAvailable; + // we need to check if a thread is available before submitting the task to the executor + // synchronize on CMS to get an accurate count of current threads + synchronized (ConcurrentMergeScheduler.this) { + int max = maxThreadCount - mergeThreads.size() - 1; + int value = activeCount.get(); + if (value < max) { + activeCount.incrementAndGet(); + assert activeCount.get() > 0 : "active count must be greater than 0 after increment"; + isThreadAvailable = true; + } else { + isThreadAvailable = false; + } + } + if (isThreadAvailable) { + executor.execute( + () -> { + try { + command.run(); + } catch (Throwable exc) { + if (suppressExceptions == false) { + // suppressExceptions is normally only set during + // testing. + handleMergeException(exc); + } + } finally { + activeCount.decrementAndGet(); + assert activeCount.get() >= 0 : "unexpected negative active count"; + } + }); + } else { + command.run(); + } + } } } diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java index 9b60b44473cc..5dcb0e416db3 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java @@ -3439,7 +3439,14 @@ public void addIndexesReaderMerge(MergePolicy.OneMerge merge) throws IOException } SegmentMerger merger = - new SegmentMerger(readers, segInfo, infoStream, trackingDir, globalFieldNumberMap, context); + new SegmentMerger( + readers, + segInfo, + infoStream, + trackingDir, + globalFieldNumberMap, + context, + mergeScheduler.getIntraMergeExecutor(merge)); if (!merger.shouldMerge()) { return; @@ -5228,7 +5235,13 @@ public int length() { final SegmentMerger merger = new SegmentMerger( - mergeReaders, merge.info.info, infoStream, dirWrapper, globalFieldNumberMap, context); + mergeReaders, + merge.info.info, + infoStream, + dirWrapper, + globalFieldNumberMap, + context, + mergeScheduler.getIntraMergeExecutor(merge)); merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get())); merge.checkAborted(); diff --git a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java index b8df032549be..273493bc22b6 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergePolicy.java @@ -136,14 +136,6 @@ public boolean isAborted() { */ public void pauseNanos(long pauseNanos, PauseReason reason, BooleanSupplier condition) throws InterruptedException { - if (Thread.currentThread() != owner) { - throw new RuntimeException( - "Only the merge owner thread can call pauseNanos(). This thread: " - + Thread.currentThread().getName() - + ", owner thread: " - + owner); - } - long start = System.nanoTime(); AtomicLong timeUpdate = pauseTimesNS.get(reason); pauseLock.lock(); diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java b/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java index 92b046af9389..30ef47f7b931 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java @@ -39,7 +39,7 @@ public class MergeRateLimiter extends RateLimiter { private volatile double mbPerSec; private volatile long minPauseCheckBytes; - private long lastNS; + private AtomicLong lastNS = new AtomicLong(0); private AtomicLong totalBytesWritten = new AtomicLong(); @@ -89,7 +89,7 @@ public long pause(long bytes) throws MergePolicy.MergeAbortedException { // is changed while we were pausing: long paused = 0; long delta; - while ((delta = maybePause(bytes, System.nanoTime())) >= 0) { + while ((delta = maybePause(bytes)) >= 0) { // Keep waiting. paused += delta; } @@ -112,30 +112,45 @@ public long getTotalPausedNS() { * applied. If the thread needs pausing, this method delegates to the linked {@link * OneMergeProgress}. */ - private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException { + private long maybePause(long bytes) throws MergePolicy.MergeAbortedException { // Now is a good time to abort the merge: if (mergeProgress.isAborted()) { throw new MergePolicy.MergeAbortedException("Merge aborted."); } - double rate = mbPerSec; // read from volatile rate once. - double secondsToPause = (bytes / 1024. / 1024.) / rate; - - // Time we should sleep until; this is purely instantaneous - // rate (just adds seconds onto the last time we had paused to); - // maybe we should also offer decayed recent history one? - long targetNS = lastNS + (long) (1000000000 * secondsToPause); - - long curPauseNS = targetNS - curNS; - - // We don't bother with thread pausing if the pause is smaller than 2 msec. - if (curPauseNS <= MIN_PAUSE_NS) { - // Set to curNS, not targetNS, to enforce the instant rate, not - // the "averaged over all history" rate: - lastNS = curNS; + final double rate = mbPerSec; // read from volatile rate once. + final double secondsToPause = (bytes / 1024. / 1024.) / rate; + + AtomicLong curPauseNSSetter = new AtomicLong(); + // While we use updateAndGet to avoid a race condition between multiple threads, this doesn't + // mean + // that multiple threads will end up getting paused at the same time. + // We only pause the calling thread. This means if the upstream caller (e.g. + // ConcurrentMergeScheduler) + // is using multiple intra-threads, they will all be paused independently. + lastNS.updateAndGet( + last -> { + long curNS = System.nanoTime(); + // Time we should sleep until; this is purely instantaneous + // rate (just adds seconds onto the last time we had paused to); + // maybe we should also offer decayed recent history one? + long targetNS = last + (long) (1000000000 * secondsToPause); + long curPauseNS = targetNS - curNS; + // We don't bother with thread pausing if the pause is smaller than 2 msec. + if (curPauseNS <= MIN_PAUSE_NS) { + // Set to curNS, not targetNS, to enforce the instant rate, not + // the "averaged over all history" rate: + curPauseNSSetter.set(0); + return curNS; + } + curPauseNSSetter.set(curPauseNS); + return last; + }); + + if (curPauseNSSetter.get() == 0) { return -1; } - + long curPauseNS = curPauseNSSetter.get(); // Defensive: don't sleep for too long; the loop above will call us again if // we should keep sleeping and the rate may be adjusted in between. if (curPauseNS > MAX_PAUSE_NS) { diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java index 101720488a9e..fb8286b31bd6 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergeScheduler.java @@ -18,10 +18,13 @@ import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import org.apache.lucene.index.MergePolicy.OneMerge; import org.apache.lucene.store.Directory; import org.apache.lucene.store.RateLimitedIndexOutput; import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.SameThreadExecutorService; /** * Expert: {@link IndexWriter} uses an instance implementing this interface to execute the merges @@ -32,6 +35,8 @@ */ public abstract class MergeScheduler implements Closeable { + private final ExecutorService executor = new SameThreadExecutorService(); + /** Sole constructor. (For invocation by subclass constructors, typically implicit.) */ protected MergeScheduler() {} @@ -52,9 +57,20 @@ public Directory wrapForMerge(OneMerge merge, Directory in) { return in; } + /** + * Provides an executor for parallelism during a single merge operation. By default, the method + * returns a {@link SameThreadExecutorService} where all intra-merge actions occur in their + * calling thread. + */ + public Executor getIntraMergeExecutor(OneMerge merge) { + return executor; + } + /** Close this MergeScheduler. */ @Override - public abstract void close() throws IOException; + public void close() throws IOException { + executor.shutdown(); + } /** For messages about merge scheduling */ protected InfoStream infoStream; diff --git a/lucene/core/src/java/org/apache/lucene/index/MergeState.java b/lucene/core/src/java/org/apache/lucene/index/MergeState.java index 6153a57693b3..de3c8d8e4161 100644 --- a/lucene/core/src/java/org/apache/lucene/index/MergeState.java +++ b/lucene/core/src/java/org/apache/lucene/index/MergeState.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; import java.util.Locale; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.FieldsProducer; @@ -84,15 +85,23 @@ public class MergeState { /** InfoStream for debugging messages. */ public final InfoStream infoStream; + /** Executor for intra merge activity */ + public final Executor intraMergeTaskExecutor; + /** Indicates if the index needs to be sorted * */ public boolean needsIndexSort; /** Sole constructor. */ - MergeState(List readers, SegmentInfo segmentInfo, InfoStream infoStream) + MergeState( + List readers, + SegmentInfo segmentInfo, + InfoStream infoStream, + Executor intraMergeTaskExecutor) throws IOException { verifyIndexSort(readers, segmentInfo); this.infoStream = infoStream; int numReaders = readers.size(); + this.intraMergeTaskExecutor = intraMergeTaskExecutor; maxDocs = new int[numReaders]; fieldsProducers = new FieldsProducer[numReaders]; diff --git a/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java b/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java index 0142288d383c..fd6d2a8e81be 100644 --- a/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java +++ b/lucene/core/src/java/org/apache/lucene/index/NoMergeScheduler.java @@ -16,6 +16,7 @@ */ package org.apache.lucene.index; +import java.util.concurrent.Executor; import org.apache.lucene.index.MergePolicy.OneMerge; import org.apache.lucene.store.Directory; @@ -52,4 +53,9 @@ public Directory wrapForMerge(OneMerge merge, Directory in) { public MergeScheduler clone() { return this; } + + @Override + public Executor getIntraMergeExecutor(OneMerge merge) { + throw new UnsupportedOperationException("NoMergeScheduler does not support merges"); + } } diff --git a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java index 7fc2f046a656..4589f7ec53ed 100644 --- a/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java +++ b/lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java @@ -17,7 +17,10 @@ package org.apache.lucene.index; import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.DocValuesConsumer; @@ -28,6 +31,7 @@ import org.apache.lucene.codecs.PointsWriter; import org.apache.lucene.codecs.StoredFieldsWriter; import org.apache.lucene.codecs.TermVectorsWriter; +import org.apache.lucene.search.TaskExecutor; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.InfoStream; @@ -56,13 +60,14 @@ final class SegmentMerger { InfoStream infoStream, Directory dir, FieldInfos.FieldNumbers fieldNumbers, - IOContext context) + IOContext context, + Executor intraMergeTaskExecutor) throws IOException { if (context.context != IOContext.Context.MERGE) { throw new IllegalArgumentException( "IOContext.context should be MERGE; got: " + context.context); } - mergeState = new MergeState(readers, segmentInfo, infoStream); + mergeState = new MergeState(readers, segmentInfo, infoStream, intraMergeTaskExecutor); directory = dir; this.codec = segmentInfo.getCodec(); this.context = context; @@ -130,19 +135,36 @@ MergeState merge() throws IOException { IOContext.READ, segmentWriteState.segmentSuffix); - if (mergeState.mergeFieldInfos.hasNorms()) { - mergeWithLogging(this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged); - } + TaskExecutor taskExecutor = new TaskExecutor(mergeState.intraMergeTaskExecutor); + List> mergingTasks = new ArrayList<>(); + mergingTasks.add( + () -> { + if (mergeState.mergeFieldInfos.hasNorms()) { + mergeWithLogging( + this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged); + } - mergeWithLogging(this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged); + mergeWithLogging( + this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged); + return null; + }); if (mergeState.mergeFieldInfos.hasDocValues()) { - mergeWithLogging( - this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged); + mergingTasks.add( + () -> { + mergeWithLogging( + this::mergeDocValues, segmentWriteState, segmentReadState, "doc values", numMerged); + return null; + }); } if (mergeState.mergeFieldInfos.hasPointValues()) { - mergeWithLogging(this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged); + mergingTasks.add( + () -> { + mergeWithLogging( + this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged); + return null; + }); } if (mergeState.mergeFieldInfos.hasVectorValues()) { @@ -155,9 +177,14 @@ MergeState merge() throws IOException { } if (mergeState.mergeFieldInfos.hasVectors()) { - mergeWithLogging(this::mergeTermVectors, "term vectors"); + mergingTasks.add( + () -> { + mergeWithLogging(this::mergeTermVectors, "term vectors"); + return null; + }); } + taskExecutor.invokeAll(mergingTasks); // write the merged infos mergeWithLogging( this::mergeFieldInfos, segmentWriteState, segmentReadState, "field infos", numMerged); diff --git a/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswQuantizedVectorsFormat.java b/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswQuantizedVectorsFormat.java index 67faf8c67399..889c44ce04d8 100644 --- a/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswQuantizedVectorsFormat.java +++ b/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswQuantizedVectorsFormat.java @@ -181,9 +181,6 @@ public void testLimits() { expectThrows( IllegalArgumentException.class, () -> new Lucene99HnswScalarQuantizedVectorsFormat(20, 100, 0, 0.8f, null)); - expectThrows( - IllegalArgumentException.class, - () -> new Lucene99HnswScalarQuantizedVectorsFormat(20, 100, 100, null, null)); expectThrows( IllegalArgumentException.class, () -> diff --git a/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswVectorsFormat.java b/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswVectorsFormat.java index c444802679a8..493b2cd5b921 100644 --- a/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswVectorsFormat.java +++ b/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestLucene99HnswVectorsFormat.java @@ -49,8 +49,6 @@ public void testLimits() { expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, -1)); expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(512 + 1, 20)); expectThrows(IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 3201)); - expectThrows( - IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 100, 100, null)); expectThrows( IllegalArgumentException.class, () -> new Lucene99HnswVectorsFormat(20, 100, 1, new SameThreadExecutorService())); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java index 4cb1b11d8858..ec2edbe44538 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java @@ -20,17 +20,21 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; +import org.apache.lucene.document.KnnFloatVectorField; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.IndexWriterConfig.OpenMode; @@ -43,6 +47,10 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.SameThreadExecutorService; +import org.apache.lucene.util.StringHelper; +import org.apache.lucene.util.SuppressForbidden; +import org.apache.lucene.util.Version; public class TestConcurrentMergeScheduler extends LuceneTestCase { @@ -90,12 +98,22 @@ protected boolean isOK(Throwable th) { || (th instanceof IllegalStateException && th.getMessage().contains("this writer hit an unrecoverable error")); } + + @Override + // override here to ensure even tiny merges get the parallel executor + public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) { + assert intraMergeExecutor != null : "intraMergeExecutor is not initialized"; + return intraMergeExecutor; + } }); } IndexWriter writer = new IndexWriter(directory, iwc); Document doc = new Document(); Field idField = newStringField("id", "", Field.Store.YES); + KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f}); doc.add(idField); + // Add knn float vectors to test parallel merge + doc.add(knnField); outer: for (int i = 0; i < 10; i++) { @@ -105,6 +123,7 @@ protected boolean isOK(Throwable th) { for (int j = 0; j < 20; j++) { idField.setStringValue(Integer.toString(i * 20 + j)); + knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()}); writer.addDocument(doc); } @@ -226,23 +245,35 @@ public void testNoWaitClose() throws IOException { Directory directory = newDirectory(); Document doc = new Document(); Field idField = newStringField("id", "", Field.Store.YES); + KnnFloatVectorField knnField = new KnnFloatVectorField("knn", new float[] {0.0f, 0.0f}); doc.add(idField); + doc.add(knnField); + IndexWriterConfig iwc = + newIndexWriterConfig(new MockAnalyzer(random())) + // Force excessive merging: + .setMaxBufferedDocs(2) + .setMergePolicy(newLogMergePolicy(100)) + .setCommitOnClose(false); + if (iwc.getMergeScheduler() instanceof ConcurrentMergeScheduler) { + iwc.setMergeScheduler( + new ConcurrentMergeScheduler() { + @Override + // override here to ensure even tiny merges get the parallel executor + public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) { + assert intraMergeExecutor != null : "scaledExecutor is not initialized"; + return intraMergeExecutor; + } + }); + } - IndexWriter writer = - new IndexWriter( - directory, - newIndexWriterConfig(new MockAnalyzer(random())) - . - // Force excessive merging: - setMaxBufferedDocs(2) - .setMergePolicy(newLogMergePolicy(100)) - .setCommitOnClose(false)); + IndexWriter writer = new IndexWriter(directory, iwc); int numIters = TEST_NIGHTLY ? 10 : 3; for (int iter = 0; iter < numIters; iter++) { for (int j = 0; j < 201; j++) { idField.setStringValue(Integer.toString(iter * 201 + j)); + knnField.setVectorValue(new float[] {random().nextFloat(), random().nextFloat()}); writer.addDocument(doc); } @@ -363,6 +394,118 @@ protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) dir.close(); } + public void testSmallMergesDonNotGetThreads() throws IOException { + Directory dir = newDirectory(); + IndexWriterConfig iwc = new IndexWriterConfig(new MockAnalyzer(random())); + iwc.setMaxBufferedDocs(2); + iwc.setMergeScheduler( + new ConcurrentMergeScheduler() { + @Override + protected void doMerge(MergeSource mergeSource, MergePolicy.OneMerge merge) + throws IOException { + assertTrue(this.getIntraMergeExecutor(merge) instanceof SameThreadExecutorService); + super.doMerge(mergeSource, merge); + } + }); + IndexWriter w = new IndexWriter(dir, iwc); + for (int i = 0; i < 10; i++) { + Document doc = new Document(); + doc.add(new StringField("id", "" + i, Field.Store.NO)); + w.addDocument(doc); + } + w.forceMerge(1); + w.close(); + dir.close(); + } + + @SuppressForbidden(reason = "Thread sleep") + public void testIntraMergeThreadPoolIsLimitedByMaxThreads() throws IOException { + ConcurrentMergeScheduler mergeScheduler = new ConcurrentMergeScheduler(); + MergeScheduler.MergeSource mergeSource = + new MergeScheduler.MergeSource() { + @Override + public MergePolicy.OneMerge getNextMerge() { + fail("should not be called"); + return null; + } + + @Override + public void onMergeFinished(MergePolicy.OneMerge merge) { + fail("should not be called"); + } + + @Override + public boolean hasPendingMerges() { + fail("should not be called"); + return false; + } + + @Override + public void merge(MergePolicy.OneMerge merge) throws IOException { + fail("should not be called"); + } + }; + try (Directory dir = newDirectory(); + mergeScheduler) { + MergePolicy.OneMerge merge = + new MergePolicy.OneMerge( + List.of( + new SegmentCommitInfo( + new SegmentInfo( + dir, + Version.LATEST, + null, + "test", + 0, + false, + false, + Codec.getDefault(), + Collections.emptyMap(), + StringHelper.randomId(), + new HashMap<>(), + null), + 0, + 0, + 0, + 0, + 0, + new byte[16]))); + mergeScheduler.initialize(InfoStream.NO_OUTPUT, dir); + mergeScheduler.setMaxMergesAndThreads(6, 6); + Executor executor = mergeScheduler.intraMergeExecutor; + AtomicInteger threadsExecutedOnPool = new AtomicInteger(); + AtomicInteger threadsExecutedOnSelf = new AtomicInteger(); + for (int i = 0; i < 4; i++) { + mergeScheduler.mergeThreads.add( + mergeScheduler.new MergeThread(mergeSource, merge) { + @Override + @SuppressForbidden(reason = "Thread sleep") + public void run() { + executor.execute( + () -> { + if (Thread.currentThread() == this) { + threadsExecutedOnSelf.incrementAndGet(); + } else { + threadsExecutedOnPool.incrementAndGet(); + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + }); + } + for (ConcurrentMergeScheduler.MergeThread thread : mergeScheduler.mergeThreads) { + thread.start(); + } + mergeScheduler.sync(); + assertEquals(3, threadsExecutedOnSelf.get()); + assertEquals(1, threadsExecutedOnPool.get()); + } + } + private static class TrackingCMS extends ConcurrentMergeScheduler { long totMergedBytes; CountDownLatch atLeastOneMerge; diff --git a/lucene/core/src/test/org/apache/lucene/index/TestDoc.java b/lucene/core/src/test/org/apache/lucene/index/TestDoc.java index 4445a649772f..b3d8da8fef85 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestDoc.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestDoc.java @@ -45,6 +45,7 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.util.Bits; import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.SameThreadExecutorService; import org.apache.lucene.util.StringHelper; import org.apache.lucene.util.Version; @@ -237,7 +238,8 @@ private SegmentCommitInfo merge( InfoStream.getDefault(), trackingDir, new FieldInfos.FieldNumbers(null, null, Version.LATEST.major), - context); + context, + new SameThreadExecutorService()); merger.merge(); r1.close(); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java index aea989720de0..04df466df521 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestIndexWriter.java @@ -41,6 +41,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -965,6 +966,7 @@ public void run() { @Override protected boolean isOK(Throwable th) { return th instanceof AlreadyClosedException + || th instanceof RejectedExecutionException || (th instanceof IllegalStateException && th.getMessage() .contains("this writer hit an unrecoverable error")); diff --git a/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java b/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java index 3f09061c6e98..da6fb497e81e 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestSegmentMerger.java @@ -32,6 +32,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.InfoStream; +import org.apache.lucene.util.SameThreadExecutorService; import org.apache.lucene.util.StringHelper; import org.apache.lucene.util.Version; import org.apache.lucene.util.packed.PackedLongValues; @@ -105,7 +106,8 @@ public void testMerge() throws IOException { InfoStream.getDefault(), mergedDir, new FieldInfos.FieldNumbers(null, null, Version.LATEST.major), - newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1)))); + newIOContext(random(), new IOContext(new MergeInfo(-1, -1, false, -1))), + new SameThreadExecutorService()); MergeState mergeState = merger.merge(); int docsMerged = mergeState.segmentInfo.maxDoc(); assertTrue(docsMerged == 2);