Skip to content

Commit

Permalink
Do not cancel subscription on BlockingIterable#hasNext(long, TimeUnit)
Browse files Browse the repository at this point in the history
This makes sure that if this method, or in extend next(long, TimeUnit) is
called and times out, the caller is able to retry the operation and also
the upstream source will not be cancelled.

In the context of a Blocking Streaming server, this means that if a timeout
is thrown on the incoming request, the outgoing response can still be modified
since the underlying socket will not be immediately closed.

It also aligns the semantics with Single#toFuture where a blocking get with
a timeout on the future also does not cancel the upstream Single.

A (temporary) system property is introduced which allows to fall back to the
old behavior should incompatibilities be discovered in the wild.

A note for the reader who wonders how to close the subscription now: the close()
method always did cancel the subscription and continues to do so.
  • Loading branch information
daschl committed Dec 5, 2024
1 parent 74dc8c1 commit c4ebe43
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public BlockingIterator<T> iterator() {
}

private static final class SubscriberAndIterator<T> implements Subscriber<T>, BlockingIterator<T> {
/**
* Allows to re-enable cancelling the subscription on {@link #hasNext(long, TimeUnit)} timeout. This flag
* will be removed after a couple releases and no issues identified with the new behavior.
*/
private static final boolean CANCEL_SUBSCRIPTION_ON_HAS_NEXT_TIMEOUT = Boolean.parseBoolean(
System.getProperty("io.servicetalk.concurrent.api.cancelSubscriptionOnHasNextTimeout", "false"));
private static final Logger LOGGER = LoggerFactory.getLogger(SubscriberAndIterator.class);
private static final Object CANCELLED_SIGNAL = new Object();
private static final TerminalNotification COMPLETE_NOTIFICATION = complete();
Expand Down Expand Up @@ -172,7 +178,9 @@ public boolean hasNext(final long timeout, final TimeUnit unit) throws TimeoutEx
next = data.poll(timeout, unit);
if (next == null) {
terminated = true;
subscription.cancel();
if (CANCEL_SUBSCRIPTION_ON_HAS_NEXT_TIMEOUT) {
subscription.cancel();
}
throw new TimeoutException("timed out after: " + timeout + " units: " + unit);
}
requestMoreIfRequired();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void errorEmittedIsThrown() {
DeliberateException de = new DeliberateException();
Iterator<Integer> iterator = Publisher.<Integer>failed(de).toIterable().iterator();
assertThat("Item expected but not found.", iterator.hasNext(), is(true));
assertSame(de, assertThrows(DeliberateException.class, () -> iterator.next()));
assertSame(de, assertThrows(DeliberateException.class, iterator::next));
}

@Test
Expand All @@ -80,7 +80,7 @@ void doubleHashNextWithError() {
Iterator<Integer> iterator = Publisher.<Integer>failed(de).toIterable().iterator();
assertThat("Item expected but not found.", iterator.hasNext(), is(true));
assertThat("Second hasNext inconsistent with first.", iterator.hasNext(), is(true));
assertSame(de, assertThrows(DeliberateException.class, () -> iterator.next()));
assertSame(de, assertThrows(DeliberateException.class, iterator::next));
}

@Test
Expand All @@ -93,7 +93,7 @@ void hasNextWithEmpty() {
void nextWithEmpty() {
Iterator<Integer> iterator = Publisher.<Integer>empty().toIterable().iterator();
assertThat("Item not expected but found.", iterator.hasNext(), is(false));
assertThrows(NoSuchElementException.class, () -> iterator.next());
assertThrows(NoSuchElementException.class, iterator::next);
}

@Test
Expand All @@ -109,7 +109,10 @@ void hasNextWithTimeout() throws Exception {

assertThrows(TimeoutException.class, () -> iterator.hasNext(10, MILLISECONDS));
assertThat("Unexpected item found.", iterator.hasNext(-1, MILLISECONDS), is(false));
assertTrue(subscription.isCancelled());

assertThat(subscription.isCancelled(), is(false));
iterator.close();
assertThat(subscription.isCancelled(), is(true));
}

@Test
Expand All @@ -124,9 +127,11 @@ void nextWithTimeout() throws Exception {
assertThat("Unexpected item found.", iterator.next(-1, MILLISECONDS), is(2));

assertThrows(TimeoutException.class, () -> iterator.next(10, MILLISECONDS));

assertThat("Unexpected item found.", iterator.hasNext(-1, MILLISECONDS), is(false));
assertTrue(subscription.isCancelled());

assertThat(subscription.isCancelled(), is(false));
iterator.close();
assertThat(subscription.isCancelled(), is(true));
}

@Test
Expand Down Expand Up @@ -173,7 +178,7 @@ void nextWithoutHasNextAndTerminal() {
source.onNext(2);
assertThat("Unexpected item found.", iterator.next(), is(2));
source.onComplete();
assertThrows(NoSuchElementException.class, () -> iterator.next());
assertThrows(NoSuchElementException.class, iterator::next);
}

@Test
Expand Down Expand Up @@ -234,7 +239,7 @@ void delayOnNextThenError() {
DeliberateException de = new DeliberateException();
source.onError(de);
assertThat("Item not expected but found.", iterator.hasNext(), is(true));
Exception e = assertThrows(DeliberateException.class, () -> iterator.next());
Exception e = assertThrows(DeliberateException.class, iterator::next);
assertThat(e, is(de));
}

Expand Down Expand Up @@ -310,7 +315,7 @@ void queueFullButAccommodatesOnError() {
source.onError(de);
verifyNextIs(iterator, 1);
assertThat("Item expected but not found.", iterator.hasNext(), is(true));
Exception e = assertThrows(DeliberateException.class, () -> iterator.next());
Exception e = assertThrows(DeliberateException.class, iterator::next);
assertThat(e, sameInstance(de));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public interface BlockingIterable<T> extends CloseableIterable<T> {
default void forEach(Consumer<? super T> action, LongSupplier timeoutSupplier, TimeUnit unit)
throws TimeoutException {
requireNonNull(action);
// TODO: should we here try/catch with close explicitly now?
BlockingIterator<T> iterator = iterator();
while (iterator.hasNext(timeoutSupplier.getAsLong(), unit)) {
action.accept(iterator.next(timeoutSupplier.getAsLong(), unit));
Expand Down Expand Up @@ -96,6 +97,8 @@ default void forEach(Consumer<? super T> action, LongSupplier timeoutSupplier, T
*/
default void forEach(Consumer<? super T> action, long timeout, TimeUnit unit) throws TimeoutException {
requireNonNull(action);
// TODO: should we here try/catch with close explicitly now?

BlockingIterator<T> iterator = iterator();
long remainingTimeoutNanos = unit.toNanos(timeout);
long timeStampANanos = nanoTime();
Expand Down

0 comments on commit c4ebe43

Please sign in to comment.