diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterable.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterable.java index 73bb1fe08f..8add02e5f0 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterable.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterable.java @@ -72,6 +72,12 @@ public BlockingIterator iterator() { } private static final class SubscriberAndIterator implements Subscriber, BlockingIterator { + /** + * Allows to re-enable cancelling the subscription on {@link #hasNext(long, TimeUnit)} timeout. This flag + * will be removed after a couple releases and no issues identified with the new behavior. + */ + private static final boolean CANCEL_SUBSCRIPTION_ON_HAS_NEXT_TIMEOUT = Boolean + .getBoolean("io.servicetalk.concurrent.api.cancelSubscriptionOnHasNextTimeout"); private static final Logger LOGGER = LoggerFactory.getLogger(SubscriberAndIterator.class); private static final Object CANCELLED_SIGNAL = new Object(); private static final TerminalNotification COMPLETE_NOTIFICATION = complete(); @@ -172,7 +178,9 @@ public boolean hasNext(final long timeout, final TimeUnit unit) throws TimeoutEx next = data.poll(timeout, unit); if (next == null) { terminated = true; - subscription.cancel(); + if (CANCEL_SUBSCRIPTION_ON_HAS_NEXT_TIMEOUT) { + subscription.cancel(); + } throw new TimeoutException("timed out after: " + timeout + " units: " + unit); } requestMoreIfRequired(); diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterableTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterableTest.java index 894b8b309b..f5056cbf59 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterableTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherAsBlockingIterableTest.java @@ -71,7 +71,7 @@ void errorEmittedIsThrown() { DeliberateException de = new DeliberateException(); Iterator iterator = Publisher.failed(de).toIterable().iterator(); assertThat("Item expected but not found.", iterator.hasNext(), is(true)); - assertSame(de, assertThrows(DeliberateException.class, () -> iterator.next())); + assertSame(de, assertThrows(DeliberateException.class, iterator::next)); } @Test @@ -80,7 +80,7 @@ void doubleHashNextWithError() { Iterator iterator = Publisher.failed(de).toIterable().iterator(); assertThat("Item expected but not found.", iterator.hasNext(), is(true)); assertThat("Second hasNext inconsistent with first.", iterator.hasNext(), is(true)); - assertSame(de, assertThrows(DeliberateException.class, () -> iterator.next())); + assertSame(de, assertThrows(DeliberateException.class, iterator::next)); } @Test @@ -93,7 +93,7 @@ void hasNextWithEmpty() { void nextWithEmpty() { Iterator iterator = Publisher.empty().toIterable().iterator(); assertThat("Item not expected but found.", iterator.hasNext(), is(false)); - assertThrows(NoSuchElementException.class, () -> iterator.next()); + assertThrows(NoSuchElementException.class, iterator::next); } @Test @@ -109,7 +109,10 @@ void hasNextWithTimeout() throws Exception { assertThrows(TimeoutException.class, () -> iterator.hasNext(10, MILLISECONDS)); assertThat("Unexpected item found.", iterator.hasNext(-1, MILLISECONDS), is(false)); - assertTrue(subscription.isCancelled()); + + assertThat(subscription.isCancelled(), is(false)); + iterator.close(); + assertThat(subscription.isCancelled(), is(true)); } @Test @@ -124,9 +127,11 @@ void nextWithTimeout() throws Exception { assertThat("Unexpected item found.", iterator.next(-1, MILLISECONDS), is(2)); assertThrows(TimeoutException.class, () -> iterator.next(10, MILLISECONDS)); - assertThat("Unexpected item found.", iterator.hasNext(-1, MILLISECONDS), is(false)); - assertTrue(subscription.isCancelled()); + + assertThat(subscription.isCancelled(), is(false)); + iterator.close(); + assertThat(subscription.isCancelled(), is(true)); } @Test @@ -173,7 +178,7 @@ void nextWithoutHasNextAndTerminal() { source.onNext(2); assertThat("Unexpected item found.", iterator.next(), is(2)); source.onComplete(); - assertThrows(NoSuchElementException.class, () -> iterator.next()); + assertThrows(NoSuchElementException.class, iterator::next); } @Test @@ -234,7 +239,7 @@ void delayOnNextThenError() { DeliberateException de = new DeliberateException(); source.onError(de); assertThat("Item not expected but found.", iterator.hasNext(), is(true)); - Exception e = assertThrows(DeliberateException.class, () -> iterator.next()); + Exception e = assertThrows(DeliberateException.class, iterator::next); assertThat(e, is(de)); } @@ -310,7 +315,7 @@ void queueFullButAccommodatesOnError() { source.onError(de); verifyNextIs(iterator, 1); assertThat("Item expected but not found.", iterator.hasNext(), is(true)); - Exception e = assertThrows(DeliberateException.class, () -> iterator.next()); + Exception e = assertThrows(DeliberateException.class, iterator::next); assertThat(e, sameInstance(de)); } diff --git a/servicetalk-concurrent/src/main/java/io/servicetalk/concurrent/BlockingIterable.java b/servicetalk-concurrent/src/main/java/io/servicetalk/concurrent/BlockingIterable.java index d5a192e2c0..8ca8ed0fa4 100644 --- a/servicetalk-concurrent/src/main/java/io/servicetalk/concurrent/BlockingIterable.java +++ b/servicetalk-concurrent/src/main/java/io/servicetalk/concurrent/BlockingIterable.java @@ -40,6 +40,19 @@ public interface BlockingIterable extends CloseableIterable { @Override BlockingIterator iterator(); + @Override + default void forEach(final Consumer action) { + try (BlockingIterator iterator = iterator()) { + while (iterator.hasNext()) { + action.accept(iterator.next()); + } + } catch (RuntimeException ex) { + throw ex; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + /** * Mimics the behavior of {@link #forEach(Consumer)} but uses the {@code timeoutSupplier} to determine the timeout * value for interactions with the {@link BlockingIterator}. @@ -64,9 +77,14 @@ public interface BlockingIterable extends CloseableIterable { default void forEach(Consumer action, LongSupplier timeoutSupplier, TimeUnit unit) throws TimeoutException { requireNonNull(action); - BlockingIterator iterator = iterator(); - while (iterator.hasNext(timeoutSupplier.getAsLong(), unit)) { - action.accept(iterator.next(timeoutSupplier.getAsLong(), unit)); + try (BlockingIterator iterator = iterator()) { + while (iterator.hasNext(timeoutSupplier.getAsLong(), unit)) { + action.accept(iterator.next(timeoutSupplier.getAsLong(), unit)); + } + } catch (TimeoutException | RuntimeException ex) { + throw ex; + } catch (Exception ex) { + throw new RuntimeException(ex); } } @@ -96,18 +114,24 @@ default void forEach(Consumer action, LongSupplier timeoutSupplier, T */ default void forEach(Consumer action, long timeout, TimeUnit unit) throws TimeoutException { requireNonNull(action); - BlockingIterator iterator = iterator(); - long remainingTimeoutNanos = unit.toNanos(timeout); - long timeStampANanos = nanoTime(); - while (iterator.hasNext(remainingTimeoutNanos, NANOSECONDS)) { - final long timeStampBNanos = nanoTime(); - remainingTimeoutNanos -= timeStampBNanos - timeStampANanos; - // We do not check for timeout expiry here and instead let hasNext(), next() determine what a timeout of - // <= 0 means. It may be that those methods decide to throw a TimeoutException or provide a fallback value. - action.accept(iterator.next(remainingTimeoutNanos, NANOSECONDS)); + try (BlockingIterator iterator = iterator()) { + long remainingTimeoutNanos = unit.toNanos(timeout); + long timeStampANanos = nanoTime(); + while (iterator.hasNext(remainingTimeoutNanos, NANOSECONDS)) { + final long timeStampBNanos = nanoTime(); + remainingTimeoutNanos -= timeStampBNanos - timeStampANanos; + // We do not check for timeout expiry here and instead let hasNext(), next() determine what a timeout + // of <= 0 means. It may be that those methods decide to throw a TimeoutException or provide a + // fallback value. + action.accept(iterator.next(remainingTimeoutNanos, NANOSECONDS)); - timeStampANanos = nanoTime(); - remainingTimeoutNanos -= timeStampANanos - timeStampBNanos; + timeStampANanos = nanoTime(); + remainingTimeoutNanos -= timeStampANanos - timeStampBNanos; + } + } catch (TimeoutException | RuntimeException ex) { + throw ex; + } catch (Exception ex) { + throw new RuntimeException(ex); } }