Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not cancel subscription on BlockingIterable#hasNext(long, TimeUnit) #3128

Merged
merged 2 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public BlockingIterator<T> iterator() {
}

private static final class SubscriberAndIterator<T> implements Subscriber<T>, BlockingIterator<T> {
/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth adding that this is false by default, ie not enabled.

* Allows to re-enable cancelling the subscription on {@link #hasNext(long, TimeUnit)} timeout. This flag
* will be removed after a couple releases and no issues identified with the new behavior.
*/
private static final boolean CANCEL_SUBSCRIPTION_ON_HAS_NEXT_TIMEOUT = Boolean
.getBoolean("io.servicetalk.concurrent.api.cancelSubscriptionOnHasNextTimeout");
private static final Logger LOGGER = LoggerFactory.getLogger(SubscriberAndIterator.class);
private static final Object CANCELLED_SIGNAL = new Object();
private static final TerminalNotification COMPLETE_NOTIFICATION = complete();
Expand Down Expand Up @@ -172,7 +178,9 @@ public boolean hasNext(final long timeout, final TimeUnit unit) throws TimeoutEx
next = data.poll(timeout, unit);
if (next == null) {
terminated = true;
subscription.cancel();
if (CANCEL_SUBSCRIPTION_ON_HAS_NEXT_TIMEOUT) {
subscription.cancel();
}
throw new TimeoutException("timed out after: " + timeout + " units: " + unit);
}
requestMoreIfRequired();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void errorEmittedIsThrown() {
DeliberateException de = new DeliberateException();
Iterator<Integer> iterator = Publisher.<Integer>failed(de).toIterable().iterator();
assertThat("Item expected but not found.", iterator.hasNext(), is(true));
assertSame(de, assertThrows(DeliberateException.class, () -> iterator.next()));
assertSame(de, assertThrows(DeliberateException.class, iterator::next));
}

@Test
Expand All @@ -80,7 +80,7 @@ void doubleHashNextWithError() {
Iterator<Integer> iterator = Publisher.<Integer>failed(de).toIterable().iterator();
assertThat("Item expected but not found.", iterator.hasNext(), is(true));
assertThat("Second hasNext inconsistent with first.", iterator.hasNext(), is(true));
assertSame(de, assertThrows(DeliberateException.class, () -> iterator.next()));
assertSame(de, assertThrows(DeliberateException.class, iterator::next));
}

@Test
Expand All @@ -93,7 +93,7 @@ void hasNextWithEmpty() {
void nextWithEmpty() {
Iterator<Integer> iterator = Publisher.<Integer>empty().toIterable().iterator();
assertThat("Item not expected but found.", iterator.hasNext(), is(false));
assertThrows(NoSuchElementException.class, () -> iterator.next());
assertThrows(NoSuchElementException.class, iterator::next);
}

@Test
Expand All @@ -109,7 +109,10 @@ void hasNextWithTimeout() throws Exception {

assertThrows(TimeoutException.class, () -> iterator.hasNext(10, MILLISECONDS));
assertThat("Unexpected item found.", iterator.hasNext(-1, MILLISECONDS), is(false));
assertTrue(subscription.isCancelled());

assertThat(subscription.isCancelled(), is(false));
iterator.close();
assertThat(subscription.isCancelled(), is(true));
}

@Test
Expand All @@ -124,9 +127,11 @@ void nextWithTimeout() throws Exception {
assertThat("Unexpected item found.", iterator.next(-1, MILLISECONDS), is(2));

assertThrows(TimeoutException.class, () -> iterator.next(10, MILLISECONDS));

assertThat("Unexpected item found.", iterator.hasNext(-1, MILLISECONDS), is(false));
assertTrue(subscription.isCancelled());

assertThat(subscription.isCancelled(), is(false));
iterator.close();
assertThat(subscription.isCancelled(), is(true));
}

@Test
Expand Down Expand Up @@ -173,7 +178,7 @@ void nextWithoutHasNextAndTerminal() {
source.onNext(2);
assertThat("Unexpected item found.", iterator.next(), is(2));
source.onComplete();
assertThrows(NoSuchElementException.class, () -> iterator.next());
assertThrows(NoSuchElementException.class, iterator::next);
}

@Test
Expand Down Expand Up @@ -234,7 +239,7 @@ void delayOnNextThenError() {
DeliberateException de = new DeliberateException();
source.onError(de);
assertThat("Item not expected but found.", iterator.hasNext(), is(true));
Exception e = assertThrows(DeliberateException.class, () -> iterator.next());
Exception e = assertThrows(DeliberateException.class, iterator::next);
assertThat(e, is(de));
}

Expand Down Expand Up @@ -310,7 +315,7 @@ void queueFullButAccommodatesOnError() {
source.onError(de);
verifyNextIs(iterator, 1);
assertThat("Item expected but not found.", iterator.hasNext(), is(true));
Exception e = assertThrows(DeliberateException.class, () -> iterator.next());
Exception e = assertThrows(DeliberateException.class, iterator::next);
assertThat(e, sameInstance(de));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ public interface BlockingIterable<T> extends CloseableIterable<T> {
@Override
BlockingIterator<T> iterator();

@Override
default void forEach(final Consumer<? super T> action) {
try (BlockingIterator<T> iterator = iterator()) {
while (iterator.hasNext()) {
action.accept(iterator.next());
}
} catch (RuntimeException ex) {
throw ex;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

/**
* Mimics the behavior of {@link #forEach(Consumer)} but uses the {@code timeoutSupplier} to determine the timeout
* value for interactions with the {@link BlockingIterator}.
Expand All @@ -64,9 +77,14 @@ public interface BlockingIterable<T> extends CloseableIterable<T> {
default void forEach(Consumer<? super T> action, LongSupplier timeoutSupplier, TimeUnit unit)
throws TimeoutException {
requireNonNull(action);
BlockingIterator<T> iterator = iterator();
while (iterator.hasNext(timeoutSupplier.getAsLong(), unit)) {
action.accept(iterator.next(timeoutSupplier.getAsLong(), unit));
try (BlockingIterator<T> iterator = iterator()) {
while (iterator.hasNext(timeoutSupplier.getAsLong(), unit)) {
action.accept(iterator.next(timeoutSupplier.getAsLong(), unit));
}
} catch (TimeoutException | RuntimeException ex) {
throw ex;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

Expand Down Expand Up @@ -96,18 +114,24 @@ default void forEach(Consumer<? super T> action, LongSupplier timeoutSupplier, T
*/
default void forEach(Consumer<? super T> action, long timeout, TimeUnit unit) throws TimeoutException {
requireNonNull(action);
BlockingIterator<T> iterator = iterator();
long remainingTimeoutNanos = unit.toNanos(timeout);
long timeStampANanos = nanoTime();
while (iterator.hasNext(remainingTimeoutNanos, NANOSECONDS)) {
final long timeStampBNanos = nanoTime();
remainingTimeoutNanos -= timeStampBNanos - timeStampANanos;
// We do not check for timeout expiry here and instead let hasNext(), next() determine what a timeout of
// <= 0 means. It may be that those methods decide to throw a TimeoutException or provide a fallback value.
action.accept(iterator.next(remainingTimeoutNanos, NANOSECONDS));
try (BlockingIterator<T> iterator = iterator()) {
long remainingTimeoutNanos = unit.toNanos(timeout);
long timeStampANanos = nanoTime();
while (iterator.hasNext(remainingTimeoutNanos, NANOSECONDS)) {
final long timeStampBNanos = nanoTime();
remainingTimeoutNanos -= timeStampBNanos - timeStampANanos;
// We do not check for timeout expiry here and instead let hasNext(), next() determine what a timeout
// of <= 0 means. It may be that those methods decide to throw a TimeoutException or provide a
// fallback value.
action.accept(iterator.next(remainingTimeoutNanos, NANOSECONDS));

timeStampANanos = nanoTime();
remainingTimeoutNanos -= timeStampANanos - timeStampBNanos;
timeStampANanos = nanoTime();
remainingTimeoutNanos -= timeStampANanos - timeStampBNanos;
}
} catch (TimeoutException | RuntimeException ex) {
throw ex;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

Expand Down
Loading