Skip to content

Commit

Permalink
Single.repeat: use TerminateRepeatException as a terminal marker (#…
Browse files Browse the repository at this point in the history
…3045)

Motivation:

Currently, `RepeatWhenSingle` uses its own static `END_REPEAT_EXCEPTION`
object as an end marker. That static exception does not discard
suppressed exceptions. If under any circumcises it unintentionally
leaks, it may cause a memory leak if some path adds suppressed
exceptions to it. Also, it's inconsistent with `RepeatStrategies`.

Modifications:

- Use `RepeatStrategies.TerminateRepeatException` instead that discards
suppressed exceptions and doesn't write stacktrace.
- Cache failed `Completable` with `TerminateRepeatException` instance
instead of allocating a new one on every repeat check.

Result:

1. Avoids static exception instance that may attach suppressed
exceptions.
2. Consistency between all repeat operators and strategies.
  • Loading branch information
idelpivnitskiy authored Aug 21, 2024
1 parent 977ac05 commit edb05b5
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
*/
public final class RepeatStrategies {

private static final Completable TERMINATE_REPEAT = failed(TerminateRepeatException.INSTANCE);

/**
* An {@link Exception} instance used to indicate termination of repeats.
*/
Expand Down Expand Up @@ -273,7 +275,7 @@ public static IntFunction<Completable> repeatWithExponentialBackoffDeltaJitter(f
};
}

private static Completable terminateRepeat() {
return failed(TerminateRepeatException.INSTANCE);
static Completable terminateRepeat() {
return TERMINATE_REPEAT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,10 @@
import static io.servicetalk.concurrent.internal.SubscriberUtils.handleExceptionFromOnSubscribe;
import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid;
import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN;
import static io.servicetalk.concurrent.internal.ThrowableUtils.unknownStackTrace;
import static java.lang.Math.max;
import static java.util.Objects.requireNonNull;

final class RepeatWhenSingle<T> extends AbstractNoHandleSubscribePublisher<T> {
private static final Exception END_REPEAT_EXCEPTION =
unknownStackTrace(new Exception(), RepeatWhenSingle.class, "<init>");
static final Completable END_REPEAT_COMPLETABLE = Completable.failed(END_REPEAT_EXCEPTION);
private final Single<T> original;
private final BiIntFunction<? super T, ? extends Completable> repeater;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import static io.servicetalk.concurrent.api.NeverSingle.neverSingle;
import static io.servicetalk.concurrent.api.Publisher.from;
import static io.servicetalk.concurrent.api.Publisher.fromIterable;
import static io.servicetalk.concurrent.api.RepeatWhenSingle.END_REPEAT_COMPLETABLE;
import static io.servicetalk.concurrent.api.RepeatStrategies.terminateRepeat;
import static io.servicetalk.concurrent.api.SingleDoOnUtils.doOnErrorSupplier;
import static io.servicetalk.concurrent.api.SingleDoOnUtils.doOnSubscribeSupplier;
import static io.servicetalk.concurrent.api.SingleDoOnUtils.doOnSuccessSupplier;
Expand Down Expand Up @@ -1002,7 +1002,7 @@ public final Single<T> retryWhen(BiIntFunction<Throwable, ? extends Completable>
* @see <a href="https://reactivex.io/documentation/operators/repeat.html">ReactiveX repeat operator.</a>
*/
public final Publisher<T> repeat(IntPredicate shouldRepeat) {
return repeatWhen((i, __) -> shouldRepeat.test(i) ? Completable.completed() : END_REPEAT_COMPLETABLE);
return repeatWhen((i, __) -> shouldRepeat.test(i) ? Completable.completed() : terminateRepeat());
}

/**
Expand Down Expand Up @@ -1031,7 +1031,7 @@ public final Publisher<T> repeat(IntPredicate shouldRepeat) {
* @see <a href="https://reactivex.io/documentation/operators/repeat.html">ReactiveX repeat operator.</a>
*/
public final Publisher<T> repeat(BiIntPredicate<? super T> shouldRepeat) {
return repeatWhen((i, t) -> shouldRepeat.test(i, t) ? Completable.completed() : END_REPEAT_COMPLETABLE);
return repeatWhen((i, t) -> shouldRepeat.test(i, t) ? Completable.completed() : terminateRepeat());
}

/**
Expand Down

0 comments on commit edb05b5

Please sign in to comment.