Skip to content

Commit

Permalink
backport http-stream-content-size-handler (#120417)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhl-b authored Jan 18, 2025
1 parent fb9b55c commit ddd0ffa
Show file tree
Hide file tree
Showing 5 changed files with 467 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.http.HttpBodyTracer;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.plugins.ActionPlugin;
Expand Down Expand Up @@ -93,10 +92,16 @@
@ESIntegTestCase.ClusterScope(numDataNodes = 1)
public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {

private static final int MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toIntBytes(50);

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
builder.put(HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), new ByteSizeValue(50, ByteSizeUnit.MB));

builder.put(
HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(),
new ByteSizeValue(MAX_CONTENT_LENGTH, ByteSizeUnit.BYTES)
);
return builder.build();
}

Expand Down Expand Up @@ -135,7 +140,7 @@ public void testReceiveAllChunks() throws Exception {
var opaqueId = opaqueId(reqNo);

// this dataset will be compared with one on server side
var dataSize = randomIntBetween(1024, maxContentLength());
var dataSize = randomIntBetween(1024, MAX_CONTENT_LENGTH);
var sendData = Unpooled.wrappedBuffer(randomByteArrayOfLength(dataSize));
sendData.retain();
ctx.clientChannel.writeAndFlush(fullHttpRequest(opaqueId, sendData));
Expand Down Expand Up @@ -243,7 +248,7 @@ public void testServerExceptionMidStream() throws Exception {
public void testClientBackpressure() throws Exception {
try (var ctx = setupClientCtx()) {
var opaqueId = opaqueId(0);
var payloadSize = maxContentLength();
var payloadSize = MAX_CONTENT_LENGTH;
var totalParts = 10;
var partSize = payloadSize / totalParts;
ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize));
Expand Down Expand Up @@ -285,7 +290,7 @@ public void test100Continue() throws Exception {
try (var ctx = setupClientCtx()) {
for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var acceptableContentLength = randomIntBetween(0, maxContentLength());
var acceptableContentLength = randomIntBetween(0, MAX_CONTENT_LENGTH);

// send request header and await 100-continue
var req = httpRequest(id, acceptableContentLength);
Expand Down Expand Up @@ -317,7 +322,7 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
try (var ctx = setupClientCtx()) {
for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var oversized = maxContentLength() + 1;
var oversized = MAX_CONTENT_LENGTH + 1;

// send request header and await 413 too large
var req = httpRequest(id, oversized);
Expand All @@ -333,32 +338,28 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
}
}

// ensures that oversized chunked encoded request has no limits at http layer
// rest handler is responsible for oversized requests
public void testOversizedChunkedEncodingNoLimits() throws Exception {
// ensures that an oversized chunked-encoded request is capped by the maxContentLength limit and answered with 413
public void testOversizedChunkedEncoding() throws Exception {
try (var ctx = setupClientCtx()) {
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var contentSize = maxContentLength() + 1;
var content = randomByteArrayOfLength(contentSize);
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
var chunkedIs = new ChunkedStream(is);
var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
var req = httpRequest(id, 0);
HttpUtil.setTransferEncodingChunked(req, true);

ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
ctx.clientChannel.writeAndFlush(req);
ctx.clientChannel.writeAndFlush(httpChunkedIs);
var handler = ctx.awaitRestChannelAccepted(id);
var consumed = handler.readAllBytes();
assertEquals(contentSize, consumed);
handler.sendResponse(new RestResponse(RestStatus.OK, ""));

var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
assertEquals(HttpResponseStatus.OK, resp.status());
resp.release();
}
var id = opaqueId(0);
var contentSize = MAX_CONTENT_LENGTH + 1;
var content = randomByteArrayOfLength(contentSize);
var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
var chunkedIs = new ChunkedStream(is);
var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
var req = httpRequest(id, 0);
HttpUtil.setTransferEncodingChunked(req, true);

ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
ctx.clientChannel.writeAndFlush(req);
ctx.clientChannel.writeAndFlush(httpChunkedIs);
var handler = ctx.awaitRestChannelAccepted(id);
var consumed = handler.readAllBytes();
assertTrue(consumed <= MAX_CONTENT_LENGTH);

var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
resp.release();
}
}

Expand All @@ -369,7 +370,7 @@ public void testBadRequestReleaseQueuedChunks() throws Exception {
try (var ctx = setupClientCtx()) {
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var contentSize = randomIntBetween(0, maxContentLength());
var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH);
var req = httpRequest(id, contentSize);
var content = randomContent(contentSize, true);

Expand Down Expand Up @@ -405,7 +406,7 @@ public void testHttpClientStats() throws Exception {

for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var contentSize = randomIntBetween(0, maxContentLength());
var contentSize = randomIntBetween(0, MAX_CONTENT_LENGTH);
totalBytesSent += contentSize;
ctx.clientChannel.writeAndFlush(httpRequest(id, contentSize));
ctx.clientChannel.writeAndFlush(randomContent(contentSize, true));
Expand Down Expand Up @@ -485,10 +486,6 @@ private void assertHttpBodyLogging(Function<Ctx, Runnable> test) throws Exceptio
}
}

// Reads the effective HTTP max-content-length from the node's settings,
// so tests stay in sync with whatever nodeSettings() configured.
private int maxContentLength() {
    var nodeSettings = internalCluster().getInstance(Settings.class);
    return HttpHandlingSettings.fromSettings(nodeSettings).maxContentLength();
}

// Builds a per-request opaque id unique within this test run: the test name
// plus the request sequence number, joined by a dash.
private String opaqueId(int reqNo) {
    return String.format("%s-%d", getTestName(), reqNo);
}
Expand Down Expand Up @@ -658,14 +655,22 @@ void sendResponse(RestResponse response) {
int readBytes(int bytes) {
var consumed = 0;
if (recvLast == false) {
while (consumed < bytes) {
stream.next();
var recvChunk = safePoll(recvChunks);
consumed += recvChunk.chunk.length();
recvChunk.chunk.close();
if (recvChunk.isLast) {
recvLast = true;
break;
stream.next();
while (consumed < bytes && streamClosed == false) {
try {
var recvChunk = recvChunks.poll(10, TimeUnit.MILLISECONDS);
if (recvChunk != null) {
consumed += recvChunk.chunk.length();
recvChunk.chunk.close();
if (recvChunk.isLast) {
recvLast = true;
break;
}
stream.next();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpRequestDecoder;

import org.elasticsearch.http.HttpPreRequest;
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
Expand All @@ -27,18 +24,19 @@
/**
* A wrapper around {@link HttpObjectAggregator}. Provides optional content aggregation based on
* predicate. {@link HttpObjectAggregator} also handles Expect: 100-continue and oversized content.
* Unfortunately, Netty does not provide handlers for oversized messages beyond HttpObjectAggregator.
* Provides content size handling for non-aggregated requests too.
*/
public class Netty4HttpAggregator extends HttpObjectAggregator {
private static final Predicate<HttpPreRequest> IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;

private final Predicate<HttpPreRequest> decider;
private final Netty4HttpContentSizeHandler streamContentSizeHandler;
private boolean aggregating = true;
private boolean ignoreContentAfterContinueResponse = false;

public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider, HttpRequestDecoder decoder) {
super(maxContentLength);
this.decider = decider;
this.streamContentSizeHandler = new Netty4HttpContentSizeHandler(decoder, maxContentLength);
}

@Override
Expand All @@ -51,34 +49,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (aggregating || msg instanceof FullHttpRequest) {
super.channelRead(ctx, msg);
} else {
handle(ctx, (HttpObject) msg);
}
}

private void handle(ChannelHandlerContext ctx, HttpObject msg) {
if (msg instanceof HttpRequest request) {
var continueResponse = newContinueResponse(request, maxContentLength(), ctx.pipeline());
if (continueResponse != null) {
// there are 3 responses expected: 100, 413, 417
// on 100 we pass request further and reply to client to continue
// on 413/417 we ignore following content
ctx.writeAndFlush(continueResponse);
var resp = (FullHttpResponse) continueResponse;
if (resp.status() != HttpResponseStatus.CONTINUE) {
ignoreContentAfterContinueResponse = true;
return;
}
HttpUtil.set100ContinueExpected(request, false);
}
ignoreContentAfterContinueResponse = false;
ctx.fireChannelRead(msg);
} else {
var httpContent = (HttpContent) msg;
if (ignoreContentAfterContinueResponse) {
httpContent.release();
} else {
ctx.fireChannelRead(msg);
}
streamContentSizeHandler.channelRead(ctx, msg);
}
}
}
Loading

0 comments on commit ddd0ffa

Please sign in to comment.