Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement HTTP/2 IDLE based eviction #2277

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Add relax data binding support for service and client data binding](https://github.com/ballerina-platform/ballerina-library/issues/7366)
- [Add support for configuring server name to be used in the SSL SNI extension](https://github.com/ballerina-platform/ballerina-library/issues/7435)
- [Add default HTTP listener](https://github.com/ballerina-platform/ballerina-library/issues/7514)
- [Add idle based eviction for HTTP/2 connections](https://github.com/ballerina-platform/ballerina-library/issues/7309)

### Fixed

Expand Down
7 changes: 4 additions & 3 deletions compiler-plugin-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ compileJava {
}

test {
systemProperty "ballerina.offline.flag", "true"
useTestNG()
finalizedBy jacocoTestReport
useTestNG() {
suites 'src/test/resources/testng.xml'
}
testLogging.showStandardStreams = true
testLogging {
events "PASSED", "FAILED", "SKIPPED"
Expand All @@ -105,6 +105,7 @@ test {
}
}
}
finalizedBy jacocoTestReport
}

jacocoTestReport {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
/**
* This class includes tests for Ballerina Http static code analyzer.
*/
class StaticCodeAnalyzerTest {
public class StaticCodeAnalyzerTest {

private static final Path RESOURCE_PACKAGES_DIRECTORY = Paths
.get("src", "test", "resources", "static_code_analyzer", "ballerina_packages").toAbsolutePath();
Expand Down
1 change: 1 addition & 0 deletions compiler-plugin-tests/src/test/resources/testng.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<packages>
<package name="io.ballerina.stdlib.http.compiler.codeaction.*"/>
<package name="io.ballerina.stdlib.http.compiler.completion.*"/>
<package name="io.ballerina.stdlib.http.compiler.staticcodeanalyzer.*"/>
</packages>
<classes>
<class name="io.ballerina.stdlib.http.compiler.CompilerPluginTest"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* {@code Http2ClientChannel} encapsulates the Channel associated with a particular connection.
Expand All @@ -62,6 +63,7 @@ public class Http2ClientChannel {
private Map<String, Http2DataEventListener> dataEventListeners;
private StreamCloseListener streamCloseListener;
private long timeSinceMarkedAsStale = 0;
private AtomicLong timeSinceMarkedAsIdle = new AtomicLong(0);
private AtomicBoolean isStale = new AtomicBoolean(false);

public Http2ClientChannel(Http2ConnectionManager http2ConnectionManager, Http2Connection connection,
Expand Down Expand Up @@ -293,6 +295,7 @@ private class StreamCloseListener extends Http2EventAdapter {
public void onStreamClosed(Http2Stream stream) {
// Channel is no longer exhausted, so we can return it back to the pool
http2ClientChannel.removeInFlightMessage(stream.id());
http2ConnectionManager.markClientChannelAsIdle(http2ClientChannel);
activeStreams.decrementAndGet();
http2ClientChannel.getDataEventListeners().
forEach(dataEventListener -> dataEventListener.onStreamClose(stream.id()));
Expand Down Expand Up @@ -349,4 +352,16 @@ void setTimeSinceMarkedAsStale(long timeSinceMarkedAsStale) {
long getTimeSinceMarkedAsStale() {
return timeSinceMarkedAsStale;
}

void setTimeSinceMarkedAsIdle(long timeSinceMarkedAsIdle) {
this.timeSinceMarkedAsIdle.set(timeSinceMarkedAsIdle);
}

long getTimeSinceMarkedAsIdle() {
return timeSinceMarkedAsIdle.get();
}

HttpRoute getHttpRoute() {
return httpRoute;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@

private final Http2ChannelPool http2ChannelPool = new Http2ChannelPool();
private final BlockingQueue<Http2ClientChannel> http2StaleClientChannels = new LinkedBlockingQueue<>();
private final BlockingQueue<Http2ClientChannel> http2ClientChannels = new LinkedBlockingQueue<>();
private final PoolConfiguration poolConfiguration;
private final ReentrantLock lock = new ReentrantLock();

public Http2ConnectionManager(PoolConfiguration poolConfiguration) {
this.poolConfiguration = poolConfiguration;
initiateConnectionEvictionTask();
initiateStaleConnectionEvictionTask();
initiateIdleConnectionEvictionTask();
}

/**
Expand Down Expand Up @@ -180,7 +182,20 @@
}
}

private void initiateConnectionEvictionTask() {
void removeClosedChannelFromIdlePool(Http2ClientChannel http2ClientChannel) {
if (!http2ClientChannels.remove(http2ClientChannel)) {
logger.warn("Specified channel does not exist in the HTTP2 client channel list.");
}
}

void markClientChannelAsIdle(Http2ClientChannel http2ClientChannel) {
http2ClientChannel.setTimeSinceMarkedAsIdle(System.currentTimeMillis());
if (!http2ClientChannels.contains(http2ClientChannel)) {
http2ClientChannels.add(http2ClientChannel);
}
}

private void initiateStaleConnectionEvictionTask() {
Timer timer = new Timer(true);
TimerTask timerTask = new TimerTask() {
@Override
Expand All @@ -192,27 +207,49 @@
}
} else if ((System.currentTimeMillis() - http2ClientChannel.getTimeSinceMarkedAsStale()) >
poolConfiguration.getMinIdleTimeInStaleState()) {
http2ClientChannel.getInFlightMessages().forEach((streamId, outboundMsgHolder) -> {
Http2MessageStateContext messageStateContext =
outboundMsgHolder.getRequest().getHttp2MessageStateContext();
if (messageStateContext != null) {
messageStateContext.getSenderState().handleConnectionClose(outboundMsgHolder);
}
});
closeInFlightRequests(http2ClientChannel);
closeChannelAndEvict(http2ClientChannel);
}
});
}

public void closeChannelAndEvict(Http2ClientChannel http2ClientChannel) {
removeClosedChannelFromStalePool(http2ClientChannel);
http2ClientChannel.getConnection().close(http2ClientChannel.getChannel().newPromise());
}
};
timer.schedule(timerTask, poolConfiguration.getTimeBetweenStaleEviction(),
poolConfiguration.getTimeBetweenStaleEviction());
}

private void initiateIdleConnectionEvictionTask() {
Timer timer = new Timer(true);
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
http2ClientChannels.forEach(http2ClientChannel -> {
if (poolConfiguration.getMinEvictableIdleTime() == -1) {
if (!http2ClientChannel.hasInFlightMessages()) {
closeChannelAndEvict(http2ClientChannel);

Check warning on line 228 in native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2ConnectionManager.java

View check run for this annotation

Codecov / codecov/patch

native/src/main/java/io/ballerina/stdlib/http/transport/contractimpl/sender/http2/Http2ConnectionManager.java#L228

Added line #L228 was not covered by tests
}
} else if ((System.currentTimeMillis() - http2ClientChannel.getTimeSinceMarkedAsIdle()) >
poolConfiguration.getMinEvictableIdleTime()) {
closeInFlightRequests(http2ClientChannel);
removeClientChannel(http2ClientChannel.getHttpRoute(), http2ClientChannel);
closeChannelAndEvict(http2ClientChannel);
}
});
}
};
timer.schedule(timerTask, poolConfiguration.getTimeBetweenEvictionRuns(),
poolConfiguration.getTimeBetweenEvictionRuns());
}

private static void closeInFlightRequests(Http2ClientChannel http2ClientChannel) {
http2ClientChannel.getInFlightMessages().forEach((streamId, outboundMsgHolder) -> {
Http2MessageStateContext messageStateContext =
outboundMsgHolder.getRequest().getHttp2MessageStateContext();
if (messageStateContext != null) {
messageStateContext.getSenderState().handleConnectionClose(outboundMsgHolder);
}
});
}

private Http2ChannelPool.PerRouteConnectionPool fetchPerRoutePool(HttpRoute httpRoute) {
String key = generateKey(httpRoute);
return this.http2ChannelPool.fetchPerRoutePool(key);
Expand All @@ -222,4 +259,9 @@
return httpRoute.getScheme() + ":" + httpRoute.getHost() + ":" + httpRoute.getPort() + ":" +
httpRoute.getConfigHash();
}

private void closeChannelAndEvict(Http2ClientChannel http2ClientChannel) {
removeClosedChannelFromIdlePool(http2ClientChannel);
http2ClientChannel.getConnection().close(http2ClientChannel.getChannel().newPromise());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Copyright (c) 2025, WSO2 LLC. (http://www.wso2.com).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.ballerina.stdlib.http.transport.http2.connectionpool;

import io.ballerina.stdlib.http.transport.contract.Constants;
import io.ballerina.stdlib.http.transport.contract.HttpClientConnector;
import io.ballerina.stdlib.http.transport.contract.HttpConnectorListener;
import io.ballerina.stdlib.http.transport.contract.HttpWsConnectorFactory;
import io.ballerina.stdlib.http.transport.contract.ServerConnector;
import io.ballerina.stdlib.http.transport.contract.ServerConnectorFuture;
import io.ballerina.stdlib.http.transport.contract.config.ListenerConfiguration;
import io.ballerina.stdlib.http.transport.contract.config.SenderConfiguration;
import io.ballerina.stdlib.http.transport.contract.config.ServerBootstrapConfiguration;
import io.ballerina.stdlib.http.transport.contract.config.TransportsConfiguration;
import io.ballerina.stdlib.http.transport.contract.exceptions.ServerConnectorException;
import io.ballerina.stdlib.http.transport.contractimpl.DefaultHttpWsConnectorFactory;
import io.ballerina.stdlib.http.transport.contractimpl.listener.http2.Http2SourceHandler;
import io.ballerina.stdlib.http.transport.message.HttpCarbonMessage;
import io.ballerina.stdlib.http.transport.message.HttpCarbonResponse;
import io.ballerina.stdlib.http.transport.message.HttpConnectorUtil;
import io.ballerina.stdlib.http.transport.message.HttpMessageDataStreamer;
import io.ballerina.stdlib.http.transport.util.TestUtil;
import io.ballerina.stdlib.http.transport.util.client.http2.MessageGenerator;
import io.ballerina.stdlib.http.transport.util.client.http2.MessageSender;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.HashMap;

import static io.ballerina.stdlib.http.transport.contract.Constants.CHNL_HNDLR_CTX;
import static io.ballerina.stdlib.http.transport.contract.Constants.HTTP_2_0;
import static io.ballerina.stdlib.http.transport.util.TestUtil.HTTP_SCHEME;
import static io.ballerina.stdlib.http.transport.util.TestUtil.SERVER_CONNECTOR_PORT;
import static org.testng.Assert.assertNotNull;

public class ConnectionPoolEvictionTest {

private static final Logger LOG = LoggerFactory.getLogger(ConnectionPoolEvictionTest.class);

private HttpWsConnectorFactory httpWsConnectorFactory;
private ServerConnector serverConnector;

@BeforeClass
public void setup() {
httpWsConnectorFactory = new DefaultHttpWsConnectorFactory();
ListenerConfiguration listenerConfiguration = new ListenerConfiguration();
listenerConfiguration.setPort(SERVER_CONNECTOR_PORT);
listenerConfiguration.setScheme(Constants.HTTP_SCHEME);
listenerConfiguration.setVersion(Constants.HTTP_2_0);
serverConnector = httpWsConnectorFactory
.createServerConnector(new ServerBootstrapConfiguration(new HashMap<>()), listenerConfiguration);
ServerConnectorFuture serverConnectorFuture = serverConnector.start();
TransportsConfiguration transportsConfiguration = new TransportsConfiguration();
SenderConfiguration h2cSenderConfiguration = HttpConnectorUtil.getSenderConfiguration(transportsConfiguration,
Constants.HTTP_SCHEME);
h2cSenderConfiguration.setHttpVersion(Constants.HTTP_2_0);
serverConnectorFuture.setHttpConnectorListener(new Listener());
try {
serverConnectorFuture.sync();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for server connector to start");
}
}

@Test
public void testConnectionEvictionWithUpgrade() throws InterruptedException {
TransportsConfiguration transportsConfiguration = new TransportsConfiguration();
SenderConfiguration senderConfiguration = HttpConnectorUtil.getSenderConfiguration(transportsConfiguration,
Constants.HTTP_SCHEME);
senderConfiguration.getPoolConfiguration().setMinEvictableIdleTime(2000);
senderConfiguration.getPoolConfiguration().setTimeBetweenEvictionRuns(1000);
senderConfiguration.setHttpVersion(HTTP_2_0);
HttpClientConnector client = httpWsConnectorFactory.createHttpClientConnector(
HttpConnectorUtil.getTransportProperties(transportsConfiguration), senderConfiguration);
String firstId = getResponse(client);

Thread.sleep(500);
String secondId = getResponse(client);
Assert.assertEquals(firstId, secondId);

Thread.sleep(5000);
String thirdId = getResponse(client);
Assert.assertNotEquals(firstId, thirdId);
}

@Test
public void testConnectionEvictionWithPriorKnowledge() throws InterruptedException {
TransportsConfiguration transportsConfiguration = new TransportsConfiguration();
SenderConfiguration senderConfiguration = HttpConnectorUtil.getSenderConfiguration(transportsConfiguration,
Constants.HTTP_SCHEME);
senderConfiguration.getPoolConfiguration().setMinEvictableIdleTime(2000);
senderConfiguration.getPoolConfiguration().setTimeBetweenEvictionRuns(1000);
senderConfiguration.setHttpVersion(HTTP_2_0);
senderConfiguration.setForceHttp2(true);
HttpClientConnector client = httpWsConnectorFactory.createHttpClientConnector(
HttpConnectorUtil.getTransportProperties(transportsConfiguration), senderConfiguration);
String firstId = getResponse(client);

Thread.sleep(500);
String secondId = getResponse(client);
Assert.assertEquals(firstId, secondId);

Thread.sleep(5000);
String thirdId = getResponse(client);
Assert.assertNotEquals(firstId, thirdId);
}

private String getResponse(HttpClientConnector client) {
HttpCarbonMessage httpCarbonMessage = MessageGenerator.generateRequest(HttpMethod.GET, null,
SERVER_CONNECTOR_PORT, HTTP_SCHEME);
HttpCarbonMessage response = new MessageSender(client).sendMessage(httpCarbonMessage);
assertNotNull(response);
return TestUtil.getStringFromInputStream(new HttpMessageDataStreamer(response).getInputStream());
}

@AfterClass
public void cleanUp() throws ServerConnectorException {
try {
serverConnector.stop();
httpWsConnectorFactory.shutdown();
} catch (Exception e) {
LOG.warn("Resource clean up is interrupted", e);
}
}

static class Listener implements HttpConnectorListener {

@Override
public void onMessage(HttpCarbonMessage httpRequest) {
Thread.startVirtualThread(() -> {
try {
HttpVersion httpVersion = new HttpVersion(Constants.HTTP_VERSION_2_0, true);
HttpCarbonMessage httpResponse = new HttpCarbonResponse(new DefaultHttpResponse(httpVersion,
HttpResponseStatus.OK));
httpResponse.setHeader(HttpHeaderNames.CONTENT_TYPE.toString(), Constants.TEXT_PLAIN);
httpResponse.setHttpStatusCode(HttpResponseStatus.OK.code());
String id = ((Http2SourceHandler) ((ChannelHandlerContext) httpRequest
.getProperty(CHNL_HNDLR_CTX)).handler()).getChannelHandlerContext()
.channel().id().asLongText();

do {
HttpContent httpContent = httpRequest.getHttpContent();
if (httpContent instanceof LastHttpContent) {
break;
}
} while (true);

HttpContent httpContent = new DefaultLastHttpContent(Unpooled.wrappedBuffer(id.getBytes()));
httpResponse.addHttpContent(httpContent);
httpRequest.respond(httpResponse);
} catch (ServerConnectorException e) {
LOG.error("Error occurred during message notification: {}", e.getMessage());
}
});
}

@Override
public void onError(Throwable throwable) {}
}
}
1 change: 1 addition & 0 deletions native/src/test/resources/testng.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
<class name="io.ballerina.stdlib.http.transport.http2.clienttimeout.TimeoutDuringResponseReceive"/>
<class name="io.ballerina.stdlib.http.transport.http2.Http2WithHttp2ResetContent"/>
<class name="io.ballerina.stdlib.http.transport.http2.Http2WithPriorKnowledgeTestCase"/>
<class name="io.ballerina.stdlib.http.transport.http2.connectionpool.ConnectionPoolEvictionTest"/>
<class name="io.ballerina.stdlib.http.transport.http2.connectionpool.H2ConnectionPoolWithALPN"/>
<class name="io.ballerina.stdlib.http.transport.http2.connectionpool.H2ConnectionPoolWithPriorKnowledge"/>
<class name="io.ballerina.stdlib.http.transport.http2.connectionpool.H2ConnectionPoolWithUpgrade"/>
Expand Down
Loading