Skip to content

Commit

Permalink
IGNITE-23769 Compute job cancel race (#5095)
Browse files Browse the repository at this point in the history
  • Loading branch information
valepakh authored Jan 24, 2025
1 parent 175fd9e commit 044313d
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.compute.queue;

/**
 * Internal marker exception that signals a queue entry was found to be already canceled before its job started executing.
 *
 * <p>It is used only as a control-flow signal inside the compute queue: the code that observes it is expected to translate it into a
 * cancellation of the job result rather than expose it to callers. For that reason the exception carries no message, no cause and no
 * stack trace, which makes it cheap to create.
 */
class QueueEntryCanceledException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    /**
     * Creates the marker exception without a message or cause. Suppression stays enabled, but the stack trace is not writable, so no
     * stack walk is performed on construction and {@link #getStackTrace()} returns an empty array.
     */
    QueueEntryCanceledException() {
        super(null, null, true, false);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.internal.compute.state.ComputeStateMachine;
import org.apache.ignite.internal.compute.state.IllegalJobStatusTransition;
import org.apache.ignite.internal.logger.IgniteLogger;
Expand Down Expand Up @@ -91,6 +92,7 @@ public CompletableFuture<R> resultAsync() {

@Override
public boolean cancel() {
executionLock.lock();
try {
stateMachine.cancelingJob(jobId);

Expand All @@ -101,20 +103,17 @@ public boolean cancel() {
}
} catch (IllegalJobStatusTransition e) {
LOG.info("Cannot cancel the job", e);
} finally {
executionLock.unlock();
}
return false;
}

private void cancel(QueueEntry<R> queueEntry) {
executionLock.lock();
try {
if (executor.remove(queueEntry)) {
result.cancel(true);
} else {
queueEntry.interrupt();
}
} finally {
executionLock.unlock();
if (executor.remove(queueEntry)) {
result.cancel(true);
} else {
queueEntry.interrupt();
}
}

Expand Down Expand Up @@ -152,7 +151,19 @@ void run(int numRetries) {

private void run() {
QueueEntry<R> queueEntry = new QueueEntry<>(() -> {
stateMachine.executeJob(jobId);
executionLock.lock();
try {
// If the status is CANCELED here, the job was still in the QUEUED state when cancel was called, but the executor had already
// removed it from the queue. This can happen because the executor takes an entry without acquiring the executionLock.
// Don't transition to the EXECUTING state and don't run the job; throw an exception instead so that the completion handler
// can recognize it and cancel the result.
if (isCanceled()) {
throw new QueueEntryCanceledException();
}
stateMachine.executeJob(jobId);
} finally {
executionLock.unlock();
}

return job.call();
}, priority);
Expand All @@ -173,22 +184,16 @@ private void run() {

queueEntry.toFuture().whenComplete((r, throwable) -> {
if (throwable != null) {
if (retries.decrementAndGet() >= 0) {
if (throwable instanceof QueueEntryCanceledException) {
result.completeExceptionally(new CancellationException());
} else if (queueEntry.isInterrupted()) {
stateMachine.cancelJob(jobId);
result.completeExceptionally(throwable);
} else if (retries.decrementAndGet() >= 0) {
stateMachine.queueJob(jobId);
run();
} else {
try {
if (queueEntry.isInterrupted()) {
stateMachine.cancelJob(jobId);
} else {
stateMachine.failJob(jobId);
}
// TODO: Need to be refactored after https://issues.apache.org/jira/browse/IGNITE-23769
} catch (IllegalJobStatusTransition err) {
throwable.addSuppressed(err);
result.completeExceptionally(throwable);
}

stateMachine.failJob(jobId);
result.completeExceptionally(throwable);
}
} else {
Expand All @@ -202,4 +207,14 @@ private void run() {
}
});
}

/**
 * Tells whether the state machine currently reports this job as {@link JobStatus#CANCELED}.
 *
 * @return {@code true} if a state is known for the job and its status is {@link JobStatus#CANCELED}, {@code false} otherwise.
 */
private boolean isCanceled() {
    JobState current = stateMachine.currentState(jobId);

    // No state known for the job: it cannot be reported as canceled.
    if (current == null) {
        return false;
    }

    return JobStatus.CANCELED == current.status();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.lang.IgniteException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

Expand Down Expand Up @@ -248,6 +249,26 @@ void taskCatchesInterruption() {
assertThat(execution.resultAsync(), willThrow(CancellationException.class));
}

@RepeatedTest(value = 10, failureThreshold = 1)
void cancelExecutionRace() {
    initExecutor(1);

    // The job waits on a latch that is never counted down and swallows the interruption, so it never fails on its own.
    // Cancel is called right after submit, without waiting for any particular job state.
    QueueExecution<Object> submitted = priorityQueueExecutor.submit(() -> {
        CountDownLatch neverReleased = new CountDownLatch(1);
        try {
            neverReleased.await();
        } catch (InterruptedException ignored) {
            // ignored
        }
        return null;
    });

    boolean cancelAccepted = submitted.cancel();
    assertThat(cancelAccepted, is(true));

    // The result must complete with a cancellation and the final state must be CANCELED.
    assertThat(submitted.resultAsync(), willThrow(CancellationException.class));
    assertThat(submitted.state(), is(jobStateWithStatus(CANCELED)));
}

@Test
void taskDoesntCatchInterruption() {
initExecutor(1);
Expand Down Expand Up @@ -329,6 +350,27 @@ void retryTaskFail() {
assertThat(runTimes.get(), is(maxRetries + 1));
}

@Test
void retryTaskCancel() {
    initExecutor(1);

    int maxRetries = 5;
    AtomicInteger invocations = new AtomicInteger();

    // The job records each invocation and then waits on a latch that is never counted down.
    QueueExecution<Object> submitted = priorityQueueExecutor.submit(() -> {
        invocations.incrementAndGet();
        CountDownLatch neverReleased = new CountDownLatch(1);
        neverReleased.await();
        return null;
    }, 0, maxRetries);

    // Cancel only once the job is actually running.
    await().until(submitted::state, jobStateWithStatus(EXECUTING));
    submitted.cancel();

    // The job is expected to end up CANCELED after a single run, even though retries were allowed.
    await().until(submitted::state, jobStateWithStatus(CANCELED));
    assertThat(invocations.get(), is(1));
}

@Test
void retryTaskSuccess() {
initExecutor(1);
Expand Down

0 comments on commit 044313d

Please sign in to comment.