diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2ConnectionLivenessHandlerTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2ConnectionLivenessHandlerTest.java index 6bdb97dab..557227432 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2ConnectionLivenessHandlerTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2ConnectionLivenessHandlerTest.java @@ -16,12 +16,15 @@ package reactor.netty.http.client; import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http2.DefaultHttp2PingFrame; import io.netty.handler.codec.http2.Http2FrameCodec; import io.netty.handler.codec.http2.Http2FrameCodecBuilder; import io.netty.handler.codec.http2.Http2PingFrame; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.SelfSignedCertificate; import org.junit.jupiter.api.BeforeAll; @@ -30,11 +33,13 @@ import reactor.netty.BaseHttpTest; import reactor.netty.DisposableServer; import reactor.netty.NettyPipeline; -import reactor.netty.http.Http2SslContextSpec; +import reactor.netty.resources.ConnectionProvider; +import javax.net.ssl.SSLException; import java.security.cert.CertificateException; import java.time.Duration; import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; import java.util.function.BiConsumer; @@ -52,28 +57,30 @@ class Http2ConnectionLivenessHandlerTest extends BaseHttpTest { static SelfSignedCertificate ssc; + static SslContext sslServer; + static SslContext sslClient; @BeforeAll - static void createSelfSignedCertificate() throws CertificateException { + static void createSelfSignedCertificate() throws CertificateException, SSLException { ssc = new SelfSignedCertificate(); + sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) + .build(); + sslClient = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); } @Test void successReceiveResponse() { DisposableServer disposableServer = createServer() .protocol(H2) - .secure(spec -> spec.sslContext( - Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()) - )) + .secure(spec -> spec.sslContext(sslServer)) .handle((req, resp) -> resp.sendString(Mono.just("Test"))) .bindNow(); String result = createClient(disposableServer::address) .protocol(H2) - .secure(spec -> spec.sslContext( - Http2SslContextSpec.forClient() - .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)) - )) + .secure(spec -> spec.sslContext(sslClient)) .get() .uri("/") .responseSingle((resp, bytes) -> bytes.asString()) @@ -89,9 +96,7 @@ void noPingCheckWhenNotConfigured() { DisposableServer disposableServer = createServer() .protocol(H2) .maxKeepAliveRequests(1) - .secure(spec -> spec.sslContext( - Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()) - )) + .secure(spec -> spec.sslContext(sslServer)) .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forServer() .autoAckPingFrame(false) @@ -107,10 +112,7 @@ void noPingCheckWhenNotConfigured() { Channel channel = createClient(disposableServer::address) .protocol(H2) .keepAlive(true) - .secure(spec -> spec.sslContext( - Http2SslContextSpec.forClient() - .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)) - )) + .secure(spec -> spec.sslContext(sslClient)) .get() .uri("/") .responseConnection((conn, receiver) -> Mono.just(receiver.channel())) @@ -125,19 +127,20 @@ void noPingCheckWhenNotConfigured() { } @Test - void closePingFrameIfDelayed() { + void closeConnectionIfPingFrameDelayed() { Http2PingFrameHandler handler = new Http2PingFrameHandler( (ctx, frame) -> Mono.delay(Duration.ofMillis(150)) - .doOnNext(unUsed -> ctx.writeAndFlush(new DefaultHttp2PingFrame(frame.content(), true))) + .doOnNext( + unUsed -> ctx.writeAndFlush(new DefaultHttp2PingFrame(frame.content(), true)) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) + ) .subscribe() ); DisposableServer disposableServer = createServer() .protocol(H2) .maxKeepAliveRequests(1) - .secure(spec -> spec.sslContext( - Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()) - )) + .secure(spec -> spec.sslContext(sslServer)) .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forServer() .autoAckPingFrame(false) @@ -153,10 +156,55 @@ void closePingFrameIfDelayed() { Channel channel = createClient(disposableServer::address) .protocol(H2) .keepAlive(true) - .secure(spec -> spec.sslContext( - Http2SslContextSpec.forClient() - .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)) - )) + .secure(spec -> spec.sslContext(sslClient)) + .http2Settings(builder -> { + builder.pingInterval(Duration.ofMillis(100)); + }) + .get() + .uri("/") + .responseConnection((conn, receiver) -> Mono.just(receiver.channel())) + .single() + .block(); + + Mono.delay(Duration.ofMillis(600)) + .block(); + + assertThat(handler.getReceivedPingTimes()).hasSize(1); + assertThat(channel.parent().isOpen()).isFalse(); + } + + @Test + void closeConnectionInPoolIfPingFrameDelayed() { + Http2PingFrameHandler handler = new Http2PingFrameHandler( + (ctx, frame) -> Mono.delay(Duration.ofMillis(150)) + .doOnNext( + unUsed -> ctx.writeAndFlush(new DefaultHttp2PingFrame(frame.content(), true)) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE) + ) + .subscribe() + ); + + DisposableServer disposableServer = createServer() + .protocol(H2) + .maxKeepAliveRequests(1) + .secure(spec -> spec.sslContext(sslServer)) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forServer() + .autoAckPingFrame(false) + .autoAckSettingsFrame(true) + .build(); + + channel.pipeline().replace(NettyPipeline.HttpCodec, NettyPipeline.HttpCodec, http2FrameCodec); + channel.pipeline().addLast(handler); + }) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + ConnectionProvider pool = ConnectionProvider.create("closeConnectionInPoolIfPingFrameDelayed", 1); + Channel channel = createClient(pool, disposableServer::address) + .protocol(H2) + .keepAlive(true) + .secure(spec -> spec.sslContext(sslClient)) .http2Settings(builder -> { builder.pingInterval(Duration.ofMillis(100)); }) @@ -180,9 +228,7 @@ void ackPingFrameWithinInterval() { DisposableServer disposableServer = createServer() .protocol(H2) .maxKeepAliveRequests(1) - .secure(spec -> spec.sslContext( - Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()) - )) + .secure(spec -> spec.sslContext(sslServer)) .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forServer() .autoAckPingFrame(false) @@ -198,10 +244,48 @@ void ackPingFrameWithinInterval() { Channel channel = createClient(disposableServer::address) .protocol(H2) .keepAlive(true) - .secure(spec -> spec.sslContext( - Http2SslContextSpec.forClient() - .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)) - )) + .secure(spec -> spec.sslContext(sslClient)) + .http2Settings(builder -> { + builder.pingInterval(Duration.ofMillis(100)); + }) + .get() + .uri("/") + .responseConnection((conn, receiver) -> Mono.just(receiver.channel())) + .single() + .block(); + + Mono.delay(Duration.ofSeconds(1)) + .block(); + + assertThat(handler.getReceivedPingTimes()).hasSizeGreaterThanOrEqualTo(2); + assertThat(channel.parent().isOpen()).isTrue(); + } + + @Test + void connectionRetentionInPoolOnPingFrameAck() { + Http2PingFrameHandler handler = new Http2PingFrameHandler(); + + DisposableServer disposableServer = createServer() + .protocol(H2) + .maxKeepAliveRequests(1) + .secure(spec -> spec.sslContext(sslServer)) + .doOnChannelInit((connectionObserver, channel, remoteAddress) -> { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forServer() + .autoAckPingFrame(false) + .autoAckSettingsFrame(true) + .build(); + + channel.pipeline().replace(NettyPipeline.HttpCodec, NettyPipeline.HttpCodec, http2FrameCodec); + channel.pipeline().addLast(handler); + }) + .handle((req, resp) -> resp.sendString(Mono.just("Test"))) + .bindNow(); + + ConnectionProvider pool = ConnectionProvider.create("connectionRetentionInPoolOnPingFrameAck", 1); + Channel channel = createClient(pool, disposableServer::address) + .protocol(H2) + .keepAlive(true) + .secure(spec -> spec.sslContext(sslClient)) .http2Settings(builder -> { builder.pingInterval(Duration.ofMillis(100)); }) @@ -211,21 +295,23 @@ void ackPingFrameWithinInterval() { .single() .block(); - Mono.delay(Duration.ofMillis(1000)) + Mono.delay(Duration.ofSeconds(1)) .block(); assertThat(handler.getReceivedPingTimes()).hasSizeGreaterThanOrEqualTo(2); assertThat(channel.parent().isOpen()).isTrue(); } - private static class Http2PingFrameHandler extends SimpleChannelInboundHandler { + private static final class Http2PingFrameHandler extends SimpleChannelInboundHandler { - private List receivedPingTimes = new ArrayList<>(); + private final List receivedPingTimes = new ArrayList<>(); private final BiConsumer consumer; private Http2PingFrameHandler() { - this.consumer = (ctx, frame) -> ctx.writeAndFlush(new DefaultHttp2PingFrame(frame.content(), true)); + this.consumer = (ctx, frame) -> + ctx.writeAndFlush(new DefaultHttp2PingFrame(frame.content(), true)) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } private Http2PingFrameHandler(BiConsumer consumer) { @@ -234,7 +320,7 @@ private Http2PingFrameHandler(BiConsumer @Override protected void channelRead0(ChannelHandlerContext ctx, Http2PingFrame frame) throws InterruptedException { - receivedPingTimes.add(LocalDateTime.now()); + receivedPingTimes.add(LocalDateTime.now(ZoneId.systemDefault())); consumer.accept(ctx, frame); }