Skip to content

Commit

Permalink
Subscribable sources: try-catch onSubscribe and cleanup state if any (
Browse files Browse the repository at this point in the history
#3046)

Motivation:

All original sources that implement `handleSubscribe` method should protect from a buggy `Subscriber` that may unexpectedly throw from `onSubscribe`. If they have any state or resource, it should be cleaned up.

Modifications:
- Find all `Publisher`/`Single`/`Completable` that implement `handleSubscribe` and use `handleExceptionFromOnSubscribe` utility;

The following sources clean up their state on exception:
- `FromInputStreamPublisher` closes `InputStream`;
- `SpliceFlatStreamToMetaSingle` cancels upstream `Subscription`;
- `TcpConnector` signals `ConnectionObserver.connectionClosed(t)`;
- `TcpServerBinder` cancels bind `Future` and closes `Channel` if necessary;
- `NettyChannelPublisher` registers `fatalError` and closes `Channel`;
- `DefaultNettyConnection`, `ChannelInitSingle`, `H2ClientParentConnectionContext`, `H2ServerParentConnectionContext` close `Channel`;

Results:

1. Best effort to propagate exception via reactive flow.
2. Cleaning up resources.
  • Loading branch information
idelpivnitskiy authored Aug 21, 2024
1 parent edb05b5 commit 8074930
Show file tree
Hide file tree
Showing 19 changed files with 246 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.internal.DelayedCancellable;

import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static java.util.Objects.requireNonNull;

abstract class AbstractSubmitCompletable extends Completable implements CompletableSource {
Expand All @@ -33,7 +34,12 @@ abstract class AbstractSubmitCompletable extends Completable implements Completa
@Override
protected final void handleSubscribe(final CompletableSource.Subscriber subscriber) {
DelayedCancellable cancellable = new DelayedCancellable();
subscriber.onSubscribe(cancellable);
try {
subscriber.onSubscribe(cancellable);
} catch (Throwable t) {
handleExceptionFromOnSubscribe(subscriber, t);
return;
}
final Cancellable eCancellable;
try {
eCancellable = runExecutor.execute(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.concurrent.Callable;

import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static java.util.Objects.requireNonNull;

abstract class AbstractSubmitSingle<T> extends Single<T> implements SingleSource<T> {
Expand All @@ -35,7 +36,12 @@ abstract class AbstractSubmitSingle<T> extends Single<T> implements SingleSource
@Override
protected final void handleSubscribe(final Subscriber<? super T> subscriber) {
DelayedCancellable cancellable = new DelayedCancellable();
subscriber.onSubscribe(cancellable);
try {
subscriber.onSubscribe(cancellable);
} catch (Throwable t) {
handleExceptionFromOnSubscribe(subscriber, t);
return;
}
final Cancellable eCancellable;
try {
eCancellable = runExecutor.execute(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.concurrent.internal.TerminalNotification;

import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static io.servicetalk.concurrent.internal.TerminalNotification.complete;

/**
Expand All @@ -36,7 +37,12 @@ protected void handleSubscribe(Subscriber subscriber) {
// We used a DelayedCancellable to avoid the case where the Subscriber will synchronously cancel and then
// we would add the subscriber to the queue and possibly never (until termination) dereference the subscriber.
DelayedCancellable delayedCancellable = new DelayedCancellable();
subscriber.onSubscribe(delayedCancellable);
try {
subscriber.onSubscribe(delayedCancellable);
} catch (Throwable t) {
handleExceptionFromOnSubscribe(subscriber, t);
return;
}
if (stack.push(subscriber)) {
delayedCancellable.delayedCancellable(() -> {
// Cancel in this case will just cleanup references from the queue to ensure we don't prevent GC of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,14 @@ public void subscribe(final Subscriber<? super T> subscriber) {
@Override
protected void handleSubscribe(final Subscriber<? super T> subscriber) {
if (subscribedUpdater.compareAndSet(this, 0, 1)) {
final InputStreamPublisherSubscription<T> subscription =
new InputStreamPublisherSubscription<>(stream, subscriber, mapper);
try {
subscriber.onSubscribe(new InputStreamPublisherSubscription<>(stream, subscriber, mapper));
subscriber.onSubscribe(subscription);
} catch (Throwable t) {
handleExceptionFromOnSubscribe(subscriber, t);
// Ownership of the stream is transferred to a new Subscriber, we must close it in case of an error.
subscription.closeStreamOnError(t);
}
} else {
deliverErrorFromSource(subscriber, new DuplicateSubscribeException(null, subscriber));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.function.Consumer;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;

/**
* A {@link Single} which is also a {@link Subscriber}. State of this {@link Single} can be modified by using the
* {@link Subscriber} methods which is forwarded to all existing or subsequent {@link Subscriber}s.
Expand All @@ -37,7 +39,12 @@ protected void handleSubscribe(final Subscriber<? super T> subscriber) {
// We used a DelayedCancellable to avoid the case where the Subscriber will synchronously cancel and then
// we would add the subscriber to the queue and possibly never (until termination) dereference the subscriber.
DelayedCancellable delayedCancellable = new DelayedCancellable();
subscriber.onSubscribe(delayedCancellable);
try {
subscriber.onSubscribe(delayedCancellable);
} catch (Throwable t) {
handleExceptionFromOnSubscribe(subscriber, t);
return;
}
if (stack.push(subscriber)) {
delayedCancellable.delayedCancellable(() -> {
// Cancel in this case will just cleanup references from the queue to ensure we don't prevent GC of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

Expand All @@ -44,7 +45,12 @@ final class TimerCompletable extends Completable implements CompletableSource {
@Override
protected void handleSubscribe(final Subscriber subscriber) {
DelayedCancellable cancellable = new DelayedCancellable();
subscriber.onSubscribe(cancellable);
try {
subscriber.onSubscribe(cancellable);
} catch (Throwable t) {
handleExceptionFromOnSubscribe(subscriber, t);
return;
}
try {
cancellable.delayedCancellable(
timeoutExecutor.schedule(subscriber::onComplete, delayNs, NANOSECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Publisher.fromInputStream;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
Expand Down Expand Up @@ -145,6 +146,32 @@ void closeStreamOnCancelByDefault() throws Exception {
verify(inputStream).close();
}

@Test
void closeStreamOnThrowingSubscriber() throws Exception {
AtomicReference<Throwable> onError = new AtomicReference<>();
toSource(pub).subscribe(new Subscriber<byte[]>() {
@Override
public void onSubscribe(final Subscription subscription) {
throw DELIBERATE_EXCEPTION;
}

@Override
public void onNext(@Nullable final byte[] bytes) {
}

@Override
public void onError(final Throwable t) {
onError.set(t);
}

@Override
public void onComplete() {
}
});
assertThat(onError.get(), is(DELIBERATE_EXCEPTION));
verify(inputStream).close();
}

@Test
void streamClosedAndErrorOnInvalidReqN() throws Exception {
toSource(pub).subscribe(sub1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.channel.ChannelHandler;

import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;
import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.close;

abstract class ChannelInitSingle<T> extends SubscribableSingle<T> {
Expand All @@ -42,7 +43,13 @@ protected final void handleSubscribe(final Subscriber<? super T> subscriber) {
deliverErrorFromSource(subscriber, cause);
return;
}
subscriber.onSubscribe(channel::close);
try {
subscriber.onSubscribe(channel::close);
} catch (Throwable cause) {
close(channel, cause);
handleExceptionFromOnSubscribe(subscriber, cause);
return;
}
// We have to add to the pipeline AFTER we call onSubscribe, because adding to the pipeline may invoke
// callbacks that interact with the subscriber.
channel.pipeline().addLast(newChannelHandler(subscriber));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,13 @@ delayedCancellable, shouldWaitForSslHandshake(sslSession, sslConfig),
deliverErrorFromSource(subscriber, cause);
return;
}
subscriber.onSubscribe(delayedCancellable);
try {
subscriber.onSubscribe(delayedCancellable);
} catch (Throwable cause) {
close(channel, cause);
handleExceptionFromOnSubscribe(subscriber, cause);
return;
}
// We have to add to the pipeline AFTER we call onSubscribe, because adding to the pipeline may invoke
// callbacks that interact with the subscriber.
pipeline.addLast(parentChannelInitializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;
import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.netty.HttpDebugUtils.showPipeline;
import static io.servicetalk.transport.netty.internal.ChannelSet.CHANNEL_CLOSEABLE_KEY;
Expand Down Expand Up @@ -198,7 +199,13 @@ protected void initChannel(final Http2StreamChannel streamChannel) {
deliverErrorFromSource(subscriber, cause);
return;
}
subscriber.onSubscribe(delayedCancellable);
try {
subscriber.onSubscribe(delayedCancellable);
} catch (Throwable cause) {
ChannelCloseUtils.close(channel, cause);
handleExceptionFromOnSubscribe(subscriber, cause);
return;
}
// We have to add to the pipeline AFTER we call onSubscribe, because adding to the pipeline may invoke
// callbacks that interact with the subscriber.
pipeline.addLast(parentChannelInitializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import static io.servicetalk.concurrent.api.Completable.defer;
import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.api.StreamingHttpRequests.newTransportRequest;
Expand Down Expand Up @@ -545,7 +546,12 @@ private static final class SingleSubscriberProcessor extends SubscribableComplet

@Override
protected void handleSubscribe(final Subscriber subscriber) {
subscriber.onSubscribe(this);
try {
subscriber.onSubscribe(this);
} catch (Throwable t) {
handleExceptionFromOnSubscribe(subscriber, t);
return;
}
for (;;) {
final Object cState = state;
if (cState instanceof TerminalNotification) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static io.servicetalk.concurrent.Cancellable.IGNORE_CANCEL;
import static io.servicetalk.concurrent.internal.EmptySubscriptions.EMPTY_SUBSCRIPTION;
import static io.servicetalk.concurrent.internal.SubscriberUtils.checkDuplicateSubscription;
import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -236,7 +237,18 @@ protected void handleSubscribe(PublisherSource.Subscriber<? super Payload> newSu
// newSubscriber.onSubscribe MUST be called before making newSubscriber visible below with the CAS
// on maybePayloadSubUpdater. Otherwise there is a potential for concurrent invocation on the
// Subscriber which is not allowed by the Reactive Streams specification.
newSubscriber.onSubscribe(delayedSubscription);
try {
newSubscriber.onSubscribe(delayedSubscription);
} catch (Throwable t) {
handleExceptionFromOnSubscribe(newSubscriber, t);
if (maybePayloadSubUpdater.compareAndSet(SplicingSubscriber.this, PENDING,
EMPTY_COMPLETED_DELIVERED)) {
final Subscription subscription = rawSubscription;
assert subscription != null : "Expected rawSubscription but got null";
subscription.cancel();
}
return;
}
if (maybePayloadSubUpdater.compareAndSet(SplicingSubscriber.this, PENDING, newSubscriber)) {
assert rawSubscription != null : "Expected rawSubscription but got null";
delayedSubscription.delayedSubscription(rawSubscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.servicetalk.http.netty;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TestPublisher;
Expand All @@ -28,6 +29,8 @@
import org.junit.jupiter.params.provider.ValueSource;

import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Publisher.from;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
Expand Down Expand Up @@ -148,6 +151,44 @@ void cancelDataBeforeDataCompleteShouldDeliverError() {
assertThat(dataSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
}

@Test
void cancelUpstreamIfPayloadSubscriberThrowsFromOnSubscribe() {
Single<Data> op = upstream.liftSyncToSingle(new SpliceFlatStreamToMetaSingle<>(Data::new));
toSource(op).subscribe(dataSubscriber);
upstream.onSubscribe(subscription);
upstream.onNext(metaData);
Data data = dataSubscriber.awaitOnSuccess();
assertThat(data, is(notNullValue()));
assertThat(data.meta(), equalTo(metaData.meta()));
assertFalse(subscription.isCancelled());

Publisher<Payload> payload = data.getPayload();
AtomicReference<Throwable> onError = new AtomicReference<>();
toSource(payload).subscribe(new PublisherSource.Subscriber<Payload>() {
@Override
public void onSubscribe(final PublisherSource.Subscription subscription) {
throw DELIBERATE_EXCEPTION;
}

@Override
public void onNext(@Nullable final Payload payload) {
}

@Override
public void onError(final Throwable t) {
onError.set(t);
}

@Override
public void onComplete() {
}
});
assertTrue(subscription.isCancelled(), "Upstream subscription not cancelled");
assertThat(onError.get(), is(DELIBERATE_EXCEPTION));
toSource(payload).subscribe(dupePayloadSubscriber);
assertThat(dupePayloadSubscriber.awaitOnError(), instanceOf(DuplicateSubscribeException.class));
}

@ParameterizedTest(name = "{displayName} [{index}]: withPayload={0}")
@ValueSource(booleans = {false, true})
void streamErrorAfterPublisherSubscribeShouldDeliverError(boolean withPayload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static io.servicetalk.transport.netty.internal.BuilderUtils.socketChannel;
import static io.servicetalk.transport.netty.internal.BuilderUtils.toNettyAddress;
import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.assignConnectionError;
Expand Down Expand Up @@ -97,8 +98,16 @@ public static <C extends ListenableAsyncCloseable> Single<C> connect(
return new SubscribableSingle<C>() {
@Override
protected void handleSubscribe(final Subscriber<? super C> subscriber) {
ConnectHandler<C> connectHandler = new ConnectHandler<>(subscriber, connectionFactory,
observer.onNewConnection(localAddress, resolvedRemoteAddress));
final ConnectionObserver connectionObserver =
observer.onNewConnection(localAddress, resolvedRemoteAddress);
final ConnectHandler<C> connectHandler;
try {
connectHandler = new ConnectHandler<>(subscriber, connectionFactory, connectionObserver);
} catch (Throwable t) {
connectionObserver.connectionClosed(t);
handleExceptionFromOnSubscribe(subscriber, t);
return;
}
try {
Future<?> connectFuture = connect0(localAddress, resolvedRemoteAddress, config, autoRead,
executionContext, connectHandler);
Expand Down
Loading

0 comments on commit 8074930

Please sign in to comment.