diff --git a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java index 250cb81183899..f3d58fe4b051f 100644 --- a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.function.Consumer; +import java.util.function.Supplier; import static org.elasticsearch.core.Strings.format; @@ -349,4 +350,16 @@ public void afterFilesRestoredFromRepository(IndexShard indexShard) { } } } + + @Override + public void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier> onPermitAcquiredListenerSupplier) { + for (IndexEventListener listener : listeners) { + try { + listener.onAcquirePrimaryOperationPermit(indexShard, onPermitAcquiredListenerSupplier); + } catch (Exception e) { + logger.warn(() -> "[" + indexShard.shardId() + "] failed to invoke the listener on acquiring a primary permit", e); + throw e; + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java index 4e55a2e9599d5..93270e82c60a2 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java @@ -17,6 +17,8 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason; +import java.util.function.Supplier; + /** * An index event listener is the primary extension point for plugins and build-in services * to react / listen to per-index and per-shard events. These listeners are registered per-index @@ -190,4 +192,14 @@ default void afterIndexShardRecovery(IndexShard indexShard, ActionListener * @param indexShard the shard that is recovering */ default void afterFilesRestoredFromRepository(IndexShard indexShard) {} + + /** + * Called when a single primary permit is attempted to be acquired for the given shard (see + * {@link IndexShard#acquirePrimaryOperationPermit(ActionListener, java.util.concurrent.Executor)}). + * + * @param indexShard the shard of which a primary permit is requested + * @param onPermitAcquiredListenerSupplier call this immediately to get a listener when the permit is acquired. The listener must be + * completed in order for the permit to be given to the acquiring operation. + */ + default void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier> onPermitAcquiredListenerSupplier) {} } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f52ea41d811c0..ab1c936d1c469 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -34,6 +34,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.replication.PendingReplicationActions; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -189,7 +190,6 @@ import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; -import static org.elasticsearch.index.shard.IndexShard.PrimaryPermitCheck.CHECK_PRIMARY_MODE; public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard { @@ -779,28 +779,10 @@ public void relocated( final String targetAllocationId, final BiConsumer> consumer, final ActionListener listener - ) throws IllegalIndexShardStateException, IllegalStateException { - relocated(targetNodeId, targetAllocationId, consumer, listener, null); - } - - /** - * Provides an variant of {@link IndexShard#relocated(String, String, BiConsumer, ActionListener, Releasable)} with an option - * to relocate the shard under externally acquired primary permits. - * - * @param acquiredPrimaryPermits if null, waits until all the primary permits are acquired, otherwise it calls the consumer immediately - */ - public void relocated( - final String targetNodeId, - final String targetAllocationId, - final BiConsumer> consumer, - final ActionListener listener, - @Nullable final Releasable acquiredPrimaryPermits ) throws IllegalIndexShardStateException, IllegalStateException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; - assert acquiredPrimaryPermits == null || indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED - : "external primary permits are provided but not held by the shard"; try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) { - ActionListener onAcquired = new ActionListener<>() { + indexShardOperationPermits.blockOperations(new ActionListener<>() { @Override public void onResponse(Releasable releasable) { boolean success = false; @@ -878,13 +860,8 @@ public void onFailure(Exception e) { listener.onFailure(e); } } - }; - if (acquiredPrimaryPermits == null) { - // Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it - indexShardOperationPermits.blockOperations(onAcquired, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); - } else { - ActionListener.completeWith(onAcquired, () -> acquiredPrimaryPermits); - } + }, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by + // CancellableThreads and we want to be able to interrupt it } } @@ -3592,48 +3569,35 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { ); } - /** - * Check to run before running the primary permit operation - */ - public enum PrimaryPermitCheck { - CHECK_PRIMARY_MODE, - /** - * IMPORTANT: Currently intented to be used only for acquiring primary permits during the recovery of hollow shards. - * Don't disable primary mode checks unless you're really sure. - */ - NONE - } - /** * Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided * ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided * ActionListener will then be called using the provided executor. */ public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, Executor executorOnDelay) { - acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE); + acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false); } public void acquirePrimaryOperationPermit( ActionListener onPermitAcquired, Executor executorOnDelay, boolean forceExecution - ) { - acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, forceExecution, CHECK_PRIMARY_MODE); - } - - public void acquirePrimaryOperationPermit( - ActionListener onPermitAcquired, - Executor executorOnDelay, - boolean forceExecution, - PrimaryPermitCheck primaryPermitCheck ) { verifyNotClosed(); assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting; - indexShardOperationPermits.acquire( - wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired), - executorOnDelay, - forceExecution - ); + + ActionListener onPermitAcquiredWrapped = onPermitAcquired.delegateFailureAndWrap((delegate, releasable) -> { + final ActionListener wrappedListener = indexShardOperationPermits.wrapContextPreservingActionListener( + delegate, + executorOnDelay, + forceExecution + ); + try (var listeners = new RefCountingListener(wrappedListener.map(unused -> releasable))) { + indexEventListener.onAcquirePrimaryOperationPermit(this, () -> listeners.acquire()); + } + }); + + indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquiredWrapped), executorOnDelay, forceExecution); } public boolean isPrimaryMode() { @@ -3641,51 +3605,33 @@ public boolean isPrimaryMode() { return replicationTracker.isPrimaryMode(); } - public void acquireAllPrimaryOperationsPermits(final ActionListener onPermitAcquired, final TimeValue timeout) { - acquireAllPrimaryOperationsPermits(onPermitAcquired, timeout, CHECK_PRIMARY_MODE); - } - /** * Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called. * It is the responsibility of the caller to close the {@link Releasable}. */ - public void acquireAllPrimaryOperationsPermits( - final ActionListener onPermitAcquired, - final TimeValue timeout, - final PrimaryPermitCheck primaryPermitCheck - ) { + public void acquireAllPrimaryOperationsPermits(final ActionListener onPermitAcquired, final TimeValue timeout) { verifyNotClosed(); assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting; - asyncBlockOperations( - wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired), - timeout.duration(), - timeout.timeUnit() - ); + asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit()); } /** - * Wraps the action to run on a primary after acquiring permit. + * Wraps the action to run on a primary after acquiring permit. This wrapping is used to check if the shard is in primary mode before + * executing the action. * - * @param primaryPermitCheck check to run before the primary mode operation * @param listener the listener to wrap * @return the wrapped listener */ - private ActionListener wrapPrimaryOperationPermitListener( - final PrimaryPermitCheck primaryPermitCheck, - final ActionListener listener - ) { - return switch (primaryPermitCheck) { - case CHECK_PRIMARY_MODE -> listener.delegateFailure((l, r) -> { - if (isPrimaryMode()) { - l.onResponse(r); - } else { - r.close(); - l.onFailure(new ShardNotInPrimaryModeException(shardId, state)); - } - }); - case NONE -> listener; - }; + private ActionListener wrapPrimaryOperationPermitListener(final ActionListener listener) { + return listener.delegateFailure((l, r) -> { + if (isPrimaryMode()) { + l.onResponse(r); + } else { + r.close(); + l.onFailure(new ShardNotInPrimaryModeException(shardId, state)); + } + }); } private void asyncBlockOperations(ActionListener onPermitAcquired, long timeout, TimeUnit timeUnit) { @@ -3723,7 +3669,7 @@ public void runUnderPrimaryPermit(final Runnable runnable, final Consumer void bumpPrimaryTerm( diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 94ac4f4aca096..79f5d054df30d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -216,32 +216,7 @@ private void innerAcquire( try { synchronized (this) { if (queuedBlockOperations > 0) { - final Supplier contextSupplier = threadPool.getThreadContext().newRestorableContext(false); - final ActionListener wrappedListener; - if (executorOnDelay != null) { - wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired).delegateFailure( - (l, r) -> executorOnDelay.execute(new ActionRunnable<>(l) { - @Override - public boolean isForceExecution() { - return forceExecution; - } - - @Override - protected void doRun() { - listener.onResponse(r); - } - - @Override - public void onRejection(Exception e) { - IOUtils.closeWhileHandlingException(r); - super.onRejection(e); - } - }) - ); - } else { - wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired); - } - delayedOperations.add(wrappedListener); + delayedOperations.add(wrapContextPreservingActionListener(onAcquired, executorOnDelay, forceExecution)); return; } else { releasable = acquire(); @@ -255,6 +230,39 @@ public void onRejection(Exception e) { onAcquired.onResponse(releasable); } + public ActionListener wrapContextPreservingActionListener( + ActionListener listener, + @Nullable final Executor executorOnDelay, + final boolean forceExecution + ) { + final Supplier contextSupplier = threadPool.getThreadContext().newRestorableContext(false); + final ActionListener wrappedListener; + if (executorOnDelay != null) { + wrappedListener = new ContextPreservingActionListener<>(contextSupplier, listener).delegateFailure( + (l, r) -> executorOnDelay.execute(new ActionRunnable<>(l) { + @Override + public boolean isForceExecution() { + return forceExecution; + } + + @Override + protected void doRun() { + listener.onResponse(r); + } + + @Override + public void onRejection(Exception e) { + IOUtils.closeWhileHandlingException(r); + super.onRejection(e); + } + }) + ); + } else { + wrappedListener = new ContextPreservingActionListener<>(contextSupplier, listener); + } + return wrappedListener; + } + private Releasable acquire() throws InterruptedException { assert Thread.holdsLock(this); if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 0bbf63b07c4cb..7d436ab5d8d22 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -790,21 +790,6 @@ public void onFailure(final Exception e) { } }, TimeValue.timeValueSeconds(30)); latch.await(); - - // It's possible to acquire permits if we skip the primary mode check - var permitAcquiredLatch = new CountDownLatch(1); - indexShard.acquirePrimaryOperationPermit(ActionListener.wrap(r -> { - r.close(); - permitAcquiredLatch.countDown(); - }, Assert::assertNotNull), EsExecutors.DIRECT_EXECUTOR_SERVICE, false, IndexShard.PrimaryPermitCheck.NONE); - safeAwait(permitAcquiredLatch); - - var allPermitsAcquiredLatch = new CountDownLatch(1); - indexShard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> { - r.close(); - allPermitsAcquiredLatch.countDown(); - }, Assert::assertNotNull), TimeValue.timeValueSeconds(30), IndexShard.PrimaryPermitCheck.NONE); - safeAwait(allPermitsAcquiredLatch); } if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) {