Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert primary permits changes and add hook #120398

Merged
merged 2 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.elasticsearch.core.Strings.format;

Expand Down Expand Up @@ -349,4 +350,16 @@ public void afterFilesRestoredFromRepository(IndexShard indexShard) {
}
}
}

@Override
public void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier<ActionListener<Void>> onPermitAcquiredListenerSupplier) {
for (IndexEventListener listener : listeners) {
try {
listener.onAcquirePrimaryOperationPermit(indexShard, onPermitAcquiredListenerSupplier);
} catch (Exception e) {
logger.warn(() -> "[" + indexShard.shardId() + "] failed to invoke the listener on acquiring a primary permit", e);
throw e;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;

import java.util.function.Supplier;

/**
* An index event listener is the primary extension point for plugins and build-in services
* to react / listen to per-index and per-shard events. These listeners are registered per-index
Expand Down Expand Up @@ -190,4 +192,14 @@ default void afterIndexShardRecovery(IndexShard indexShard, ActionListener<Void>
* @param indexShard the shard that is recovering
*/
default void afterFilesRestoredFromRepository(IndexShard indexShard) {}

/**
* Called when a single primary permit is acquired for the given shard (see
* {@link IndexShard#acquirePrimaryOperationPermit(ActionListener, java.util.concurrent.Executor)}).
*
* @param indexShard the shard of which a primary permit is requested
* @param onPermitAcquiredListenerSupplier call this immediately to get a listener when the permit is acquired. The listener must be
* completed in order for the permit to be given to the acquiring operation.
*/
default void onAcquirePrimaryOperationPermit(IndexShard indexShard, Supplier<ActionListener<Void>> onPermitAcquiredListenerSupplier) {}
kingherc marked this conversation as resolved.
Show resolved Hide resolved
}
118 changes: 32 additions & 86 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.replication.PendingReplicationActions;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand Down Expand Up @@ -189,7 +190,6 @@
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.index.shard.IndexShard.PrimaryPermitCheck.CHECK_PRIMARY_MODE;

public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {

Expand Down Expand Up @@ -779,28 +779,10 @@ public void relocated(
final String targetAllocationId,
final BiConsumer<ReplicationTracker.PrimaryContext, ActionListener<Void>> consumer,
final ActionListener<Void> listener
) throws IllegalIndexShardStateException, IllegalStateException {
relocated(targetNodeId, targetAllocationId, consumer, listener, null);
}

/**
* Provides an variant of {@link IndexShard#relocated(String, String, BiConsumer, ActionListener, Releasable)} with an option
* to relocate the shard under externally acquired primary permits.
*
* @param acquiredPrimaryPermits if null, waits until all the primary permits are acquired, otherwise it calls the consumer immediately
*/
public void relocated(
final String targetNodeId,
final String targetAllocationId,
final BiConsumer<ReplicationTracker.PrimaryContext, ActionListener<Void>> consumer,
final ActionListener<Void> listener,
@Nullable final Releasable acquiredPrimaryPermits
) throws IllegalIndexShardStateException, IllegalStateException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
assert acquiredPrimaryPermits == null || indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
: "external primary permits are provided but not held by the shard";
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
ActionListener<Releasable> onAcquired = new ActionListener<>() {
indexShardOperationPermits.blockOperations(new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
boolean success = false;
Expand Down Expand Up @@ -878,13 +860,8 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}
}
};
if (acquiredPrimaryPermits == null) {
// Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it
indexShardOperationPermits.blockOperations(onAcquired, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE);
} else {
ActionListener.completeWith(onAcquired, () -> acquiredPrimaryPermits);
}
}, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by
// CancellableThreads and we want to be able to interrupt it
}
}

Expand Down Expand Up @@ -3592,100 +3569,69 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
);
}

/**
* Check to run before running the primary permit operation
*/
public enum PrimaryPermitCheck {
CHECK_PRIMARY_MODE,
/**
* IMPORTANT: Currently intented to be used only for acquiring primary permits during the recovery of hollow shards.
* Don't disable primary mode checks unless you're really sure.
*/
NONE
}

/**
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
* ActionListener will then be called using the provided executor.
*/
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, Executor executorOnDelay) {
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE);
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false);
}

public void acquirePrimaryOperationPermit(
ActionListener<Releasable> onPermitAcquired,
Executor executorOnDelay,
boolean forceExecution
) {
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, forceExecution, CHECK_PRIMARY_MODE);
}

public void acquirePrimaryOperationPermit(
ActionListener<Releasable> onPermitAcquired,
Executor executorOnDelay,
boolean forceExecution,
PrimaryPermitCheck primaryPermitCheck
) {
verifyNotClosed();
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
indexShardOperationPermits.acquire(
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
executorOnDelay,
forceExecution
);

ActionListener<Releasable> onPermitAcquiredWrapped = onPermitAcquired.delegateFailureAndWrap((delegate, releasable) -> {
final ActionListener<Releasable> wrappedListener = indexShardOperationPermits.wrapContextPreservingActionListener(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to do the context preserving in all cases? I don't think so? i.e. during normal operations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fcofdez I'm not familiar with what operations need the context. I saw it in IndexShardOperationPermits.java and assumed it's needed here as well.

Note that before this PR, the IndexShardOperationPermits.java was the one potentially triggering the final listener of the operation. However, with this PR, we potentially add another blocking step in the plugin (the additional RefCountingListener here). So we can't be sure whatever thread finally triggers the RefCountingListener has the context, thus I elected to made sure we preserve the context as well to "replicate" the original behavior of IndexShardOperationPermits.java.

After this explanation, do you believe we can safely skip the context?

delegate,
executorOnDelay,
forceExecution
);
try (var listeners = new RefCountingListener(wrappedListener.map(unused -> releasable))) {
kingherc marked this conversation as resolved.
Show resolved Hide resolved
indexEventListener.onAcquirePrimaryOperationPermit(this, () -> listeners.acquire());
}
});

indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquiredWrapped), executorOnDelay, forceExecution);
}

public boolean isPrimaryMode() {
assert indexShardOperationPermits.getActiveOperationsCount() != 0 : "must hold permit to check primary mode";
return replicationTracker.isPrimaryMode();
}

public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
acquireAllPrimaryOperationsPermits(onPermitAcquired, timeout, CHECK_PRIMARY_MODE);
}

/**
* Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called.
* It is the responsibility of the caller to close the {@link Releasable}.
*/
public void acquireAllPrimaryOperationsPermits(
final ActionListener<Releasable> onPermitAcquired,
final TimeValue timeout,
final PrimaryPermitCheck primaryPermitCheck
) {
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
verifyNotClosed();
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;

asyncBlockOperations(
wrapPrimaryOperationPermitListener(primaryPermitCheck, onPermitAcquired),
timeout.duration(),
timeout.timeUnit()
);
asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit());
}

/**
* Wraps the action to run on a primary after acquiring permit.
* Wraps the action to run on a primary after acquiring permit. This wrapping is used to check if the shard is in primary mode before
* executing the action.
*
* @param primaryPermitCheck check to run before the primary mode operation
* @param listener the listener to wrap
* @return the wrapped listener
*/
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(
final PrimaryPermitCheck primaryPermitCheck,
final ActionListener<Releasable> listener
) {
return switch (primaryPermitCheck) {
case CHECK_PRIMARY_MODE -> listener.delegateFailure((l, r) -> {
if (isPrimaryMode()) {
l.onResponse(r);
} else {
r.close();
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
}
});
case NONE -> listener;
};
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final ActionListener<Releasable> listener) {
return listener.delegateFailure((l, r) -> {
if (isPrimaryMode()) {
l.onResponse(r);
} else {
r.close();
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
}
});
}

private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
Expand Down Expand Up @@ -3723,7 +3669,7 @@ public void runUnderPrimaryPermit(final Runnable runnable, final Consumer<Except
runnable.run();
}
}, onFailure);
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false, CHECK_PRIMARY_MODE);
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay);
}

private <E extends Exception> void bumpPrimaryTerm(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,32 +216,7 @@ private void innerAcquire(
try {
synchronized (this) {
if (queuedBlockOperations > 0) {
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
final ActionListener<Releasable> wrappedListener;
if (executorOnDelay != null) {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired).delegateFailure(
(l, r) -> executorOnDelay.execute(new ActionRunnable<>(l) {
@Override
public boolean isForceExecution() {
return forceExecution;
}

@Override
protected void doRun() {
listener.onResponse(r);
}

@Override
public void onRejection(Exception e) {
IOUtils.closeWhileHandlingException(r);
super.onRejection(e);
}
})
);
} else {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
}
delayedOperations.add(wrappedListener);
delayedOperations.add(wrapContextPreservingActionListener(onAcquired, executorOnDelay, forceExecution));
return;
} else {
releasable = acquire();
Expand All @@ -255,6 +230,39 @@ public void onRejection(Exception e) {
onAcquired.onResponse(releasable);
}

public <T extends Closeable> ActionListener<T> wrapContextPreservingActionListener(
ActionListener<T> listener,
@Nullable final Executor executorOnDelay,
final boolean forceExecution
) {
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
final ActionListener<T> wrappedListener;
if (executorOnDelay != null) {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, listener).delegateFailure(
(l, r) -> executorOnDelay.execute(new ActionRunnable<>(l) {
@Override
public boolean isForceExecution() {
return forceExecution;
}

@Override
protected void doRun() {
listener.onResponse(r);
}

@Override
public void onRejection(Exception e) {
IOUtils.closeWhileHandlingException(r);
super.onRejection(e);
}
})
);
} else {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, listener);
}
return wrappedListener;
}

private Releasable acquire() throws InterruptedException {
assert Thread.holdsLock(this);
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -790,21 +790,6 @@ public void onFailure(final Exception e) {
}
}, TimeValue.timeValueSeconds(30));
latch.await();

// It's possible to acquire permits if we skip the primary mode check
var permitAcquiredLatch = new CountDownLatch(1);
indexShard.acquirePrimaryOperationPermit(ActionListener.wrap(r -> {
r.close();
permitAcquiredLatch.countDown();
}, Assert::assertNotNull), EsExecutors.DIRECT_EXECUTOR_SERVICE, false, IndexShard.PrimaryPermitCheck.NONE);
safeAwait(permitAcquiredLatch);

var allPermitsAcquiredLatch = new CountDownLatch(1);
indexShard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> {
r.close();
allPermitsAcquiredLatch.countDown();
}, Assert::assertNotNull), TimeValue.timeValueSeconds(30), IndexShard.PrimaryPermitCheck.NONE);
safeAwait(allPermitsAcquiredLatch);
}

if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) {
Expand Down