Skip to content

Commit

Permalink
[Single|Completable].toFuture() cancellable handle exception (#2632)
Browse files Browse the repository at this point in the history
Motivation:
[Single|Completable].toFuture() may hang if there is a thread
blocked on get(), a different thread invokes cancel, and the
Cancellable for the Subscriber chain throws.

Modifications:
- Unblock the CountDownLatch in a finally block
- Preserve stack trace from the thread that invokes cancel
  • Loading branch information
Scottmitch authored Jun 29, 2023
1 parent 9823c00 commit 0f49071
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.ThrowableWrapper.isThrowableWrapper;
import static java.util.Objects.requireNonNull;

abstract class SourceToFuture<T> implements Future<T> {

static final Object NULL = new Object();
private static final Object CANCELLED = new Object();

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<SourceToFuture, Object> valueUpdater =
Expand Down Expand Up @@ -65,17 +64,21 @@ public final void onError(final Throwable t) {

@Override
public final boolean cancel(final boolean mayInterruptIfRunning) {
if (valueUpdater.compareAndSet(this, null, CANCELLED)) {
cancellable.cancel();
latch.countDown();
if (value == null && valueUpdater.compareAndSet(this, null,
new CancellationWrapper(new CancellationException("Stacktrace from thread calling cancel()")))) {
try {
cancellable.cancel();
} finally {
latch.countDown();
}
return true;
}
return false;
}

@Override
public final boolean isCancelled() {
return value == CANCELLED;
return CancellationWrapper.isCancellationWrapper(value);
}

@Override
Expand Down Expand Up @@ -122,10 +125,12 @@ private T reportGet(@Nullable final Object value) throws ExecutionException {
if (value instanceof Throwable) {
throw new ExecutionException((Throwable) value);
}
if (value == CANCELLED) {
throw new CancellationException();
if (CancellationWrapper.isCancellationWrapper(value)) {
CancellationException exception = new CancellationException("Stacktrace from thread calling get()");
exception.initCause(((CancellationWrapper) value).exception);
throw exception;
}
if (value instanceof ThrowableWrapper) {
if (isThrowableWrapper(value)) {
return (T) ((ThrowableWrapper) value).unwrap();
}
return (T) value;
Expand Down Expand Up @@ -170,4 +175,16 @@ public void onComplete() {
setValue(NULL);
}
}

private static final class CancellationWrapper {
private final CancellationException exception;

private CancellationWrapper(final CancellationException exception) {
this.exception = exception;
}

static boolean isCancellationWrapper(@Nullable Object o) {
return o != null && o.getClass().equals(CancellationWrapper.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.servicetalk.concurrent.api;

import javax.annotation.Nullable;

/**
* Used to distinguish between a real object and a {@link Throwable} from terminal error.
*/
Expand All @@ -25,6 +27,10 @@ final class ThrowableWrapper {
this.throwable = throwable;
}

static boolean isThrowableWrapper(@Nullable Object o) {
return o != null && o.getClass().equals(ThrowableWrapper.class);
}

Throwable unwrap() {
return throwable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.internal.DeliberateException;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand All @@ -24,25 +25,28 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.utils.internal.PlatformDependent.throwException;
import static io.servicetalk.utils.internal.ThrowableUtils.throwException;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

public abstract class AbstractToFutureTest<T> {

@RegisterExtension
protected static final ExecutorExtension<Executor> EXEC = ExecutorExtension.withCachedExecutor()
.setClassLevel(true);
Expand All @@ -55,7 +59,7 @@ public abstract class AbstractToFutureTest<T> {

protected abstract void completeSource();

protected abstract void failSource(Throwable t);
protected abstract void failSource(@Nullable Throwable t);

@Nullable
protected abstract T expectedResult();
Expand All @@ -67,6 +71,30 @@ void testSubscribed() {
assertThat(isSubscribed(), is(true));
}

@Test
void testCancellableThrows() throws InterruptedException, ExecutionException {
doThrow(DELIBERATE_EXCEPTION).when(mockCancellable).cancel();
Future<T> future = toFuture();
// Since this test is targeting our Future implementation, use a JDK executor to avoid having to use Future
// conversions in this test.
ExecutorService executorService = Executors.newCachedThreadPool();
try {
// Goal is to have future.get() called before future.cancel() to avoid short circuit due to cancel and
// increase likelihood of needing to unblock the thread waiting on future.get().
CountDownLatch latch = new CountDownLatch(1);
Future<Void> f2 = executorService.submit(() -> {
latch.countDown();
assertThrows(CancellationException.class, future::get);
return null;
});
latch.await();
assertThrows(DeliberateException.class, () -> future.cancel(true));
assertThat(f2.get(), nullValue());
} finally {
executorService.shutdownNow();
}
}

@Test
void testSucceeded() throws Exception {
Future<T> future = toFuture();
Expand Down

0 comments on commit 0f49071

Please sign in to comment.