Skip to content

Commit

Permalink
Add new parallel merge task executor for parallel actions within a si…
Browse files Browse the repository at this point in the history
…ngle merge action (#13190)

This commit adds a new interface to all MergeScheduler classes that allows the scheduler to provide an Executor for intra-merge parallelism. The first sub-class to satisfy this new interface is the ConcurrentMergeScheduler (CMS). In particular, the CMS will limit the amount of parallelism based on the current number of mergeThreads and the configured maxThread options.

Parallelism is now added by default to the SegmentMerger where each merging task is executed in parallel with the others.

Additionally, the Lucene99 HNSW codec will use the provided MergeScheduler executor if no executor is provided directly.

Relates to: #12740
Relates to: #9626

This is a take 2 of: #13124
  • Loading branch information
benwtrent committed Mar 21, 2024
1 parent b062ed8 commit 6d908b8
Show file tree
Hide file tree
Showing 17 changed files with 409 additions and 87 deletions.
4 changes: 4 additions & 0 deletions lucene/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ Improvements

* GITHUB#13066: Support getMaxScore of DisjunctionSumScorer for non top level scoring clause (Shintaro Murakami)

* GITHUB#13124: MergeScheduler can now provide an executor for intra-merge parallelism. The first
implementation is the ConcurrentMergeScheduler and the Lucene99HnswVectorsFormat will use it if no other
executor is provided. (Ben Trent)

Optimizations
---------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.lucene.codecs.KnnVectorsReader;
import org.apache.lucene.codecs.KnnVectorsWriter;
import org.apache.lucene.codecs.lucene90.IndexedDISI;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.search.DocIdSetIterator;
Expand Down Expand Up @@ -163,7 +165,8 @@ public Lucene99HnswVectorsFormat(int maxConn, int beamWidth) {
* @param numMergeWorkers number of workers (threads) that will be used when doing merge. If
* larger than 1, a non-null {@link ExecutorService} must be passed as mergeExec
* @param mergeExec the {@link ExecutorService} that will be used by ALL vector writers that are
* generated by this format to do the merge
* generated by this format to do the merge. If null, the configured {@link
* MergeScheduler#getIntraMergeExecutor(MergePolicy.OneMerge)} is used.
*/
public Lucene99HnswVectorsFormat(
int maxConn, int beamWidth, int numMergeWorkers, ExecutorService mergeExec) {
Expand All @@ -184,10 +187,6 @@ public Lucene99HnswVectorsFormat(
}
this.maxConn = maxConn;
this.beamWidth = beamWidth;
if (numMergeWorkers > 1 && mergeExec == null) {
throw new IllegalArgumentException(
"No executor service passed in when " + numMergeWorkers + " merge workers are requested");
}
if (numMergeWorkers == 1 && mergeExec != null) {
throw new IllegalArgumentException(
"No executor service is needed as we'll use single thread to merge");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,14 @@ public void mergeOneField(FieldInfo fieldInfo, MergeState mergeState) throws IOE
int[][] vectorIndexNodeOffsets = null;
if (scorerSupplier.totalVectorCount() > 0) {
// build graph
HnswGraphMerger merger = createGraphMerger(fieldInfo, scorerSupplier);
HnswGraphMerger merger =
createGraphMerger(
fieldInfo,
scorerSupplier,
mergeState.intraMergeTaskExecutor == null
? null
: new TaskExecutor(mergeState.intraMergeTaskExecutor),
numMergeWorkers);
for (int i = 0; i < mergeState.liveDocs.length; i++) {
merger.addReader(
mergeState.knnVectorsReaders[i], mergeState.docMaps[i], mergeState.liveDocs[i]);
Expand Down Expand Up @@ -496,11 +503,23 @@ private void writeMeta(
}

private HnswGraphMerger createGraphMerger(
FieldInfo fieldInfo, RandomVectorScorerSupplier scorerSupplier) {
FieldInfo fieldInfo,
RandomVectorScorerSupplier scorerSupplier,
TaskExecutor parallelMergeTaskExecutor,
int numParallelMergeWorkers) {
if (mergeExec != null) {
return new ConcurrentHnswMerger(
fieldInfo, scorerSupplier, M, beamWidth, mergeExec, numMergeWorkers);
}
if (parallelMergeTaskExecutor != null) {
return new ConcurrentHnswMerger(
fieldInfo,
scorerSupplier,
M,
beamWidth,
parallelMergeTaskExecutor,
numParallelMergeWorkers);
}
return new IncrementalHnswGraphMerger(fieldInfo, scorerSupplier, M, beamWidth);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.index.MergePolicy.OneMerge;
import org.apache.lucene.internal.tests.ConcurrentMergeSchedulerAccess;
import org.apache.lucene.internal.tests.TestSecrets;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
Expand Down Expand Up @@ -109,6 +112,9 @@ public class ConcurrentMergeScheduler extends MergeScheduler {

private double forceMergeMBPerSec = Double.POSITIVE_INFINITY;

/** The executor provided for intra-merge parallelization */
protected CachedExecutor intraMergeExecutor;

/** Sole constructor, with all settings set to default values. */
public ConcurrentMergeScheduler() {}

Expand Down Expand Up @@ -259,6 +265,16 @@ synchronized void removeMergeThread() {
assert false : "merge thread " + currentThread + " was not found";
}

@Override
public Executor getIntraMergeExecutor(OneMerge merge) {
assert intraMergeExecutor != null : "scaledExecutor is not initialized";
// don't do multithreaded merges for small merges
if (merge.estimatedMergeBytes < MIN_BIG_MERGE_MB * 1024 * 1024) {
return super.getIntraMergeExecutor(merge);
}
return intraMergeExecutor;
}

@Override
public Directory wrapForMerge(OneMerge merge, Directory in) {
Thread mergeThread = Thread.currentThread();
Expand All @@ -268,6 +284,9 @@ public Directory wrapForMerge(OneMerge merge, Directory in) {
}

// Return a wrapped Directory which has rate-limited output.
// Note: the rate limiter is only per thread. So, if there are multiple merge threads running
// and throttling is required, each thread will be throttled independently.
// The implication of this, is that the total IO rate could be higher than the target rate.
RateLimiter rateLimiter = ((MergeThread) mergeThread).rateLimiter;
return new FilterDirectory(in) {
@Override
Expand All @@ -279,14 +298,6 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti
// somewhere that is failing to pass down the right IOContext:
assert context.context == IOContext.Context.MERGE : "got context=" + context.context;

// Because rateLimiter is bound to a particular merge thread, this method should
// always be called from that context. Verify this.
assert mergeThread == Thread.currentThread()
: "Not the same merge thread, current="
+ Thread.currentThread()
+ ", expected="
+ mergeThread;

return new RateLimitedIndexOutput(rateLimiter, in.createOutput(name, context));
}
};
Expand Down Expand Up @@ -445,8 +456,15 @@ private static String rateToString(double mbPerSec) {
}

@Override
public void close() {
sync();
public void close() throws IOException {
super.close();
try {
sync();
} finally {
if (intraMergeExecutor != null) {
intraMergeExecutor.shutdown();
}
}
}

/**
Expand Down Expand Up @@ -510,6 +528,9 @@ public synchronized int mergeThreadCount() {
void initialize(InfoStream infoStream, Directory directory) throws IOException {
super.initialize(infoStream, directory);
initDynamicDefaults(directory);
if (intraMergeExecutor == null) {
intraMergeExecutor = new CachedExecutor();
}
}

@Override
Expand Down Expand Up @@ -755,11 +776,16 @@ void clearSuppressExceptions() {

@Override
public String toString() {
StringBuilder sb = new StringBuilder(getClass().getSimpleName() + ": ");
sb.append("maxThreadCount=").append(maxThreadCount).append(", ");
sb.append("maxMergeCount=").append(maxMergeCount).append(", ");
sb.append("ioThrottle=").append(doAutoIOThrottle);
return sb.toString();
return getClass().getSimpleName()
+ ": "
+ "maxThreadCount="
+ maxThreadCount
+ ", "
+ "maxMergeCount="
+ maxMergeCount
+ ", "
+ "ioThrottle="
+ doAutoIOThrottle;
}

private boolean isBacklog(long now, OneMerge merge) {
Expand Down Expand Up @@ -902,12 +928,64 @@ private static String getSegmentName(MergePolicy.OneMerge merge) {
}

static {
TestSecrets.setConcurrentMergeSchedulerAccess(
new ConcurrentMergeSchedulerAccess() {
@Override
public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
cms.setSuppressExceptions();
}
});
TestSecrets.setConcurrentMergeSchedulerAccess(ConcurrentMergeScheduler::setSuppressExceptions);
}

/**
* This executor provides intra-merge threads for parallel execution of merge tasks. It provides a
* limited number of threads to execute merge tasks. In particular, if the number of
* `mergeThreads` is equal to `maxThreadCount`, then the executor will execute the merge task in
* the calling thread.
*/
private class CachedExecutor implements Executor {

private final AtomicInteger activeCount = new AtomicInteger(0);
private final ThreadPoolExecutor executor;

public CachedExecutor() {
this.executor =
new ThreadPoolExecutor(0, 1024, 1L, TimeUnit.MINUTES, new SynchronousQueue<>());
}

void shutdown() {
executor.shutdown();
}

@Override
public void execute(Runnable command) {
final boolean isThreadAvailable;
// we need to check if a thread is available before submitting the task to the executor
// synchronize on CMS to get an accurate count of current threads
synchronized (ConcurrentMergeScheduler.this) {
int max = maxThreadCount - mergeThreads.size() - 1;
int value = activeCount.get();
if (value < max) {
activeCount.incrementAndGet();
assert activeCount.get() > 0 : "active count must be greater than 0 after increment";
isThreadAvailable = true;
} else {
isThreadAvailable = false;
}
}
if (isThreadAvailable) {
executor.execute(
() -> {
try {
command.run();
} catch (Throwable exc) {
if (suppressExceptions == false) {
// suppressExceptions is normally only set during
// testing.
handleMergeException(exc);
}
} finally {
activeCount.decrementAndGet();
assert activeCount.get() >= 0 : "unexpected negative active count";
}
});
} else {
command.run();
}
}
}
}
17 changes: 15 additions & 2 deletions lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3439,7 +3439,14 @@ public void addIndexesReaderMerge(MergePolicy.OneMerge merge) throws IOException
}

SegmentMerger merger =
new SegmentMerger(readers, segInfo, infoStream, trackingDir, globalFieldNumberMap, context);
new SegmentMerger(
readers,
segInfo,
infoStream,
trackingDir,
globalFieldNumberMap,
context,
mergeScheduler.getIntraMergeExecutor(merge));

if (!merger.shouldMerge()) {
return;
Expand Down Expand Up @@ -5228,7 +5235,13 @@ public int length() {

final SegmentMerger merger =
new SegmentMerger(
mergeReaders, merge.info.info, infoStream, dirWrapper, globalFieldNumberMap, context);
mergeReaders,
merge.info.info,
infoStream,
dirWrapper,
globalFieldNumberMap,
context,
mergeScheduler.getIntraMergeExecutor(merge));
merge.info.setSoftDelCount(Math.toIntExact(softDeleteCount.get()));
merge.checkAborted();

Expand Down
8 changes: 0 additions & 8 deletions lucene/core/src/java/org/apache/lucene/index/MergePolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,6 @@ public boolean isAborted() {
*/
public void pauseNanos(long pauseNanos, PauseReason reason, BooleanSupplier condition)
throws InterruptedException {
if (Thread.currentThread() != owner) {
throw new RuntimeException(
"Only the merge owner thread can call pauseNanos(). This thread: "
+ Thread.currentThread().getName()
+ ", owner thread: "
+ owner);
}

long start = System.nanoTime();
AtomicLong timeUpdate = pauseTimesNS.get(reason);
pauseLock.lock();
Expand Down
53 changes: 34 additions & 19 deletions lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class MergeRateLimiter extends RateLimiter {
private volatile double mbPerSec;
private volatile long minPauseCheckBytes;

private long lastNS;
private AtomicLong lastNS = new AtomicLong(0);

private AtomicLong totalBytesWritten = new AtomicLong();

Expand Down Expand Up @@ -89,7 +89,7 @@ public long pause(long bytes) throws MergePolicy.MergeAbortedException {
// is changed while we were pausing:
long paused = 0;
long delta;
while ((delta = maybePause(bytes, System.nanoTime())) >= 0) {
while ((delta = maybePause(bytes)) >= 0) {
// Keep waiting.
paused += delta;
}
Expand All @@ -112,30 +112,45 @@ public long getTotalPausedNS() {
* applied. If the thread needs pausing, this method delegates to the linked {@link
* OneMergeProgress}.
*/
private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedException {
private long maybePause(long bytes) throws MergePolicy.MergeAbortedException {
// Now is a good time to abort the merge:
if (mergeProgress.isAborted()) {
throw new MergePolicy.MergeAbortedException("Merge aborted.");
}

double rate = mbPerSec; // read from volatile rate once.
double secondsToPause = (bytes / 1024. / 1024.) / rate;

// Time we should sleep until; this is purely instantaneous
// rate (just adds seconds onto the last time we had paused to);
// maybe we should also offer decayed recent history one?
long targetNS = lastNS + (long) (1000000000 * secondsToPause);

long curPauseNS = targetNS - curNS;

// We don't bother with thread pausing if the pause is smaller than 2 msec.
if (curPauseNS <= MIN_PAUSE_NS) {
// Set to curNS, not targetNS, to enforce the instant rate, not
// the "averaged over all history" rate:
lastNS = curNS;
final double rate = mbPerSec; // read from volatile rate once.
final double secondsToPause = (bytes / 1024. / 1024.) / rate;

AtomicLong curPauseNSSetter = new AtomicLong();
// While we use updateAndGet to avoid a race condition between multiple threads, this doesn't
// mean
// that multiple threads will end up getting paused at the same time.
// We only pause the calling thread. This means if the upstream caller (e.g.
// ConcurrentMergeScheduler)
// is using multiple intra-threads, they will all be paused independently.
lastNS.updateAndGet(
last -> {
long curNS = System.nanoTime();
// Time we should sleep until; this is purely instantaneous
// rate (just adds seconds onto the last time we had paused to);
// maybe we should also offer decayed recent history one?
long targetNS = last + (long) (1000000000 * secondsToPause);
long curPauseNS = targetNS - curNS;
// We don't bother with thread pausing if the pause is smaller than 2 msec.
if (curPauseNS <= MIN_PAUSE_NS) {
// Set to curNS, not targetNS, to enforce the instant rate, not
// the "averaged over all history" rate:
curPauseNSSetter.set(0);
return curNS;
}
curPauseNSSetter.set(curPauseNS);
return last;
});

if (curPauseNSSetter.get() == 0) {
return -1;
}

long curPauseNS = curPauseNSSetter.get();
// Defensive: don't sleep for too long; the loop above will call us again if
// we should keep sleeping and the rate may be adjusted in between.
if (curPauseNS > MAX_PAUSE_NS) {
Expand Down
Loading

0 comments on commit 6d908b8

Please sign in to comment.