Skip to content

Commit

Permalink
Revert primary permits changes and add hook
Browse files Browse the repository at this point in the history
We would like to avoid using directly primary permits for hollow
shards. So we revert relevant changes, and add a hook into the
function that gets a primary permit with the purpose of a plugin
being able to extend the behavior.

Relates ES-10537
  • Loading branch information
kingherc committed Jan 17, 2025
1 parent d19f3d3 commit af0481e
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.elasticsearch.core.Strings.format;

Expand Down Expand Up @@ -349,4 +350,16 @@ public void afterFilesRestoredFromRepository(IndexShard indexShard) {
}
}
}

@Override
public void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier<ActionListener<Void>> onPermitAcquiredListenerSupplier) {
for (IndexEventListener listener : listeners) {
try {
listener.onAcquirePrimaryOperationPermit(indexShard, onPermitAcquiredListenerSupplier);
} catch (Exception e) {
logger.warn(() -> "[" + indexShard.shardId() + "] failed to invoke the listener on acquiring a primary permit", e);
throw e;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;

import java.util.function.Supplier;

/**
* An index event listener is the primary extension point for plugins and build-in services
* to react / listen to per-index and per-shard events. These listeners are registered per-index
Expand Down Expand Up @@ -190,4 +192,14 @@ default void afterIndexShardRecovery(IndexShard indexShard, ActionListener<Void>
* @param indexShard the shard that is recovering
*/
default void afterFilesRestoredFromRepository(IndexShard indexShard) {}

/**
* Called when a single primary permit is attempted to be acquired for the given shard (see
* {@link IndexShard#acquirePrimaryOperationPermit(ActionListener, java.util.concurrent.Executor)}).
*
* @param indexShard the shard of which a primary permit is requested
* @param onPermitAcquiredListenerSupplier call this immediately to get a listener when the permit is acquired. The listener must be
* completed in order for the permit to be given to the acquiring operation.
*/
default void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier<ActionListener<Void>> onPermitAcquiredListenerSupplier) {}
}
118 changes: 32 additions & 86 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.replication.PendingReplicationActions;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand Down Expand Up @@ -189,7 +190,6 @@
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.index.shard.IndexShard.PrimaryPermitCheck.CHECK_PRIMARY_MODE;

public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {

Expand Down Expand Up @@ -779,28 +779,10 @@ public void relocated(
final String targetAllocationId,
final BiConsumer<ReplicationTracker.PrimaryContext, ActionListener<Void>> consumer,
final ActionListener<Void> listener
) throws IllegalIndexShardStateException, IllegalStateException {
relocated(targetNodeId, targetAllocationId, consumer, listener, null);
}

/**
* Provides an variant of {@link IndexShard#relocated(String, String, BiConsumer, ActionListener, Releasable)} with an option
* to relocate the shard under externally acquired primary permits.
*
* @param acquiredPrimaryPermits if null, waits until all the primary permits are acquired, otherwise it calls the consumer immediately
*/
public void relocated(
final String targetNodeId,
final String targetAllocationId,
final BiConsumer<ReplicationTracker.PrimaryContext, ActionListener<Void>> consumer,
final ActionListener<Void> listener,
@Nullable final Releasable acquiredPrimaryPermits
) throws IllegalIndexShardStateException, IllegalStateException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
assert acquiredPrimaryPermits == null || indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
: "external primary permits are provided but not held by the shard";
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
ActionListener<Releasable> onAcquired = new ActionListener<>() {
indexShardOperationPermits.blockOperations(new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
boolean success = false;
Expand Down Expand Up @@ -878,13 +860,8 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}
}
};
if (acquiredPrimaryPermits == null) {
// Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it
indexShardOperationPermits.blockOperations(onAcquired, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE);
} else {
ActionListener.completeWith(onAcquired, () -> acquiredPrimaryPermits);
}
}, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by
// CancellableThreads and we want to be able to interrupt it
}
}

Expand Down Expand Up @@ -3592,100 +3569,69 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
);
}

/**
* Check to run before running the primary permit operation
*/
public enum PrimaryPermitCheck {
CHECK_PRIMARY_MODE,
/**
* IMPORTANT: Currently intented to be used only for acquiring primary permits during the recovery of hollow shards.
* Don't disable primary mode checks unless you're really sure.
*/
NONE
}

/**
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
* ActionListener will then be called using the provided executor.
*/
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, Executor executorOnDelay) {
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE);
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false);
}

public void acquirePrimaryOperationPermit(
ActionListener<Releasable> onPermitAcquired,
Executor executorOnDelay,
boolean forceExecution
) {
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, forceExecution, CHECK_PRIMARY_MODE);
}

public void acquirePrimaryOperationPermit(
ActionListener<Releasable> onPermitAcquired,
Executor executorOnDelay,
boolean forceExecution,
PrimaryPermitCheck primaryPermitCheck
) {
verifyNotClosed();
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
indexShardOperationPermits.acquire(
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
executorOnDelay,
forceExecution
);

ActionListener<Releasable> onPermitAcquiredWrapped = onPermitAcquired.delegateFailureAndWrap((delegate, releasable) -> {
final ActionListener<Releasable> wrappedListener = indexShardOperationPermits.wrapContextPreservingActionListener(
delegate,
executorOnDelay,
forceExecution
);
try (var listeners = new RefCountingListener(wrappedListener.map(unused -> releasable))) {
indexEventListener.onAcquirePrimaryOperationPermit(this, () -> listeners.acquire());
}
});

indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquiredWrapped), executorOnDelay, forceExecution);
}

public boolean isPrimaryMode() {
assert indexShardOperationPermits.getActiveOperationsCount() != 0 : "must hold permit to check primary mode";
return replicationTracker.isPrimaryMode();
}

public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
acquireAllPrimaryOperationsPermits(onPermitAcquired, timeout, CHECK_PRIMARY_MODE);
}

/**
* Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called.
* It is the responsibility of the caller to close the {@link Releasable}.
*/
public void acquireAllPrimaryOperationsPermits(
final ActionListener<Releasable> onPermitAcquired,
final TimeValue timeout,
final PrimaryPermitCheck primaryPermitCheck
) {
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
verifyNotClosed();
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;

asyncBlockOperations(
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
timeout.duration(),
timeout.timeUnit()
);
asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit());
}

/**
* Wraps the action to run on a primary after acquiring permit.
* Wraps the action to run on a primary after acquiring permit. This wrapping is used to check if the shard is in primary mode before
* executing the action.
*
* @param primaryPermitCheck check to run before the primary mode operation
* @param listener the listener to wrap
* @return the wrapped listener
*/
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(
final PrimaryPermitCheck primaryPermitCheck,
final ActionListener<Releasable> listener
) {
return switch (primaryPermitCheck) {
case CHECK_PRIMARY_MODE -> listener.delegateFailure((l, r) -> {
if (isPrimaryMode()) {
l.onResponse(r);
} else {
r.close();
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
}
});
case NONE -> listener;
};
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final ActionListener<Releasable> listener) {
return listener.delegateFailure((l, r) -> {
if (isPrimaryMode()) {
l.onResponse(r);
} else {
r.close();
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
}
});
}

private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
Expand Down Expand Up @@ -3723,7 +3669,7 @@ public void runUnderPrimaryPermit(final Runnable runnable, final Consumer<Except
runnable.run();
}
}, onFailure);
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE);
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay);
}

private <E extends Exception> void bumpPrimaryTerm(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,32 +216,7 @@ private void innerAcquire(
try {
synchronized (this) {
if (queuedBlockOperations > 0) {
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
final ActionListener<Releasable> wrappedListener;
if (executorOnDelay != null) {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired).delegateFailure(
(l, r) -> executorOnDelay.execute(new ActionRunnable<>(l) {
@Override
public boolean isForceExecution() {
return forceExecution;
}

@Override
protected void doRun() {
listener.onResponse(r);
}

@Override
public void onRejection(Exception e) {
IOUtils.closeWhileHandlingException(r);
super.onRejection(e);
}
})
);
} else {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
}
delayedOperations.add(wrappedListener);
delayedOperations.add(wrapContextPreservingActionListener(onAcquired, executorOnDelay, forceExecution));
return;
} else {
releasable = acquire();
Expand All @@ -255,6 +230,39 @@ public void onRejection(Exception e) {
onAcquired.onResponse(releasable);
}

public <T extends Closeable> ActionListener<T> wrapContextPreservingActionListener(
ActionListener<T> listener,
@Nullable final Executor executorOnDelay,
final boolean forceExecution
) {
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
final ActionListener<T> wrappedListener;
if (executorOnDelay != null) {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, listener).delegateFailure(
(l, r) -> executorOnDelay.execute(new ActionRunnable<>(l) {
@Override
public boolean isForceExecution() {
return forceExecution;
}

@Override
protected void doRun() {
listener.onResponse(r);
}

@Override
public void onRejection(Exception e) {
IOUtils.closeWhileHandlingException(r);
super.onRejection(e);
}
})
);
} else {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, listener);
}
return wrappedListener;
}

private Releasable acquire() throws InterruptedException {
assert Thread.holdsLock(this);
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -790,21 +790,6 @@ public void onFailure(final Exception e) {
}
}, TimeValue.timeValueSeconds(30));
latch.await();

// It's possible to acquire permits if we skip the primary mode check
var permitAcquiredLatch = new CountDownLatch(1);
indexShard.acquirePrimaryOperationPermit(ActionListener.wrap(r -> {
r.close();
permitAcquiredLatch.countDown();
}, Assert::assertNotNull), EsExecutors.DIRECT_EXECUTOR_SERVICE, false, IndexShard.PrimaryPermitCheck.NONE);
safeAwait(permitAcquiredLatch);

var allPermitsAcquiredLatch = new CountDownLatch(1);
indexShard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> {
r.close();
allPermitsAcquiredLatch.countDown();
}, Assert::assertNotNull), TimeValue.timeValueSeconds(30), IndexShard.PrimaryPermitCheck.NONE);
safeAwait(allPermitsAcquiredLatch);
}

if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) {
Expand Down

0 comments on commit af0481e

Please sign in to comment.