From f43f1949a4cb8306a916f832ce4bc71a01cb1e4c Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 26 Sep 2024 10:49:28 -0400 Subject: [PATCH] NIFI-13807: Ensure that we always close InputStream / OutputSTrema for HttpExchange. Verified fix using InvokeHTTP to hit the health endpoint hundreds of thousands of times and verified NiFi heap usage was still at 200 MB after a young GC event --- .../runtime/HealthClusterHttpHandler.java | 43 ++++++++++--------- .../runtime/HealthDiagnosticsHttpHandler.java | 24 ++++++----- .../nifi/runtime/HealthHttpHandler.java | 21 +++++---- .../HealthStatusHistoryHttpHandler.java | 24 ++++++----- .../apache/nifi/util/HttpExchangeUtils.java | 42 ++++++++++++++++++ 5 files changed, 103 insertions(+), 51 deletions(-) create mode 100644 nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/util/HttpExchangeUtils.java diff --git a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthClusterHttpHandler.java b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthClusterHttpHandler.java index a5d472701e41..c2433bc66379 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthClusterHttpHandler.java +++ b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthClusterHttpHandler.java @@ -22,6 +22,7 @@ import org.apache.nifi.cluster.ClusterDetailsFactory; import org.apache.nifi.cluster.ConnectionState; import org.apache.nifi.controller.DecommissionTask; +import org.apache.nifi.util.HttpExchangeUtils; import java.io.IOException; import java.io.OutputStream; @@ -57,28 +58,30 @@ class HealthClusterHttpHandler implements HttpHandler { @Override public void handle(final HttpExchange exchange) throws IOException { + HttpExchangeUtils.drainRequestBody(exchange); + final String requestMethod = exchange.getRequestMethod(); - final OutputStream responseBody = exchange.getResponseBody(); - - if (GET_METHOD.contentEquals(requestMethod)) { - exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN); - final ConnectionState connectionState = getConnectionState(); - final String status = STATUS.formatted(connectionState); - final byte[] response = status.getBytes(StandardCharsets.UTF_8); - final int responseCode = getResponseCode(connectionState); - exchange.sendResponseHeaders(responseCode, response.length); - responseBody.write(response); - } else if (DELETE_METHOD.contentEquals(requestMethod)) { - startDecommission(); - - exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN); - final String status = STATUS.formatted(ConnectionState.OFFLOADING); - final byte[] response = status.getBytes(StandardCharsets.UTF_8); - exchange.sendResponseHeaders(HTTP_ACCEPTED, response.length); - responseBody.write(response); - } else { - exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY); + try (final OutputStream responseBody = exchange.getResponseBody()) { + if (GET_METHOD.contentEquals(requestMethod)) { + exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN); + final ConnectionState connectionState = getConnectionState(); + final String status = STATUS.formatted(connectionState); + final byte[] response = status.getBytes(StandardCharsets.UTF_8); + final int responseCode = getResponseCode(connectionState); + exchange.sendResponseHeaders(responseCode, response.length); + responseBody.write(response); + } else if (DELETE_METHOD.contentEquals(requestMethod)) { + startDecommission(); + + exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN); + final String status = STATUS.formatted(ConnectionState.OFFLOADING); + final byte[] response = status.getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(HTTP_ACCEPTED, response.length); + responseBody.write(response); + } else { + exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY); + } } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthDiagnosticsHttpHandler.java b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthDiagnosticsHttpHandler.java index 3f4e37b81b45..c232724c5940 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthDiagnosticsHttpHandler.java +++ b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthDiagnosticsHttpHandler.java @@ -21,6 +21,7 @@ import org.apache.nifi.NiFiServer; import org.apache.nifi.diagnostics.DiagnosticsDump; import org.apache.nifi.diagnostics.DiagnosticsFactory; +import org.apache.nifi.util.HttpExchangeUtils; import java.io.IOException; import java.io.OutputStream; @@ -54,22 +55,23 @@ class HealthDiagnosticsHttpHandler implements HttpHandler { @Override public void handle(final HttpExchange exchange) throws IOException { - final String requestMethod = exchange.getRequestMethod(); + HttpExchangeUtils.drainRequestBody(exchange); - if (GET_METHOD.contentEquals(requestMethod)) { - exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN); - exchange.sendResponseHeaders(HTTP_OK, STREAM_RESPONSE_BODY); + final String requestMethod = exchange.getRequestMethod(); + try (final OutputStream responseBody = exchange.getResponseBody()) { + if (GET_METHOD.contentEquals(requestMethod)) { + exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN); + exchange.sendResponseHeaders(HTTP_OK, STREAM_RESPONSE_BODY); - final URI requestUri = exchange.getRequestURI(); - final boolean verboseRequested = getVerboseRequested(requestUri); + final URI requestUri = exchange.getRequestURI(); + final boolean verboseRequested = getVerboseRequested(requestUri); - final DiagnosticsFactory diagnosticsFactory = server.getDiagnosticsFactory(); - final DiagnosticsDump diagnosticsDump = diagnosticsFactory.create(verboseRequested); - try (OutputStream responseBody = exchange.getResponseBody()) { + final DiagnosticsFactory diagnosticsFactory = server.getDiagnosticsFactory(); + final DiagnosticsDump diagnosticsDump = diagnosticsFactory.create(verboseRequested); diagnosticsDump.writeTo(responseBody); + } else { + exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY); } - } else { - exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY); } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthHttpHandler.java b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthHttpHandler.java index f37e980fa928..bed568c362a3 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthHttpHandler.java +++ b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthHttpHandler.java @@ -18,6 +18,7 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; +import org.apache.nifi.util.HttpExchangeUtils; import java.io.IOException; import java.io.OutputStream; @@ -42,17 +43,19 @@ class HealthHttpHandler implements HttpHandler { @Override public void handle(final HttpExchange exchange) throws IOException { - final String requestMethod = exchange.getRequestMethod(); + HttpExchangeUtils.drainRequestBody(exchange); - final OutputStream responseBody = exchange.getResponseBody(); + final String requestMethod = exchange.getRequestMethod(); - if (GET_METHOD.contentEquals(requestMethod)) { - exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN); - final byte[] response = STATUS_UP.getBytes(StandardCharsets.UTF_8); - exchange.sendResponseHeaders(HTTP_OK, response.length); - responseBody.write(response); - } else { - exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY); + try (final OutputStream responseBody = exchange.getResponseBody()) { + if (GET_METHOD.contentEquals(requestMethod)) { + exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, TEXT_PLAIN); + final byte[] response = STATUS_UP.getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(HTTP_OK, response.length); + responseBody.write(response); + } else { + exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY); + } } } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthStatusHistoryHttpHandler.java b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthStatusHistoryHttpHandler.java index 6483e0d92bc7..a91485376a42 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthStatusHistoryHttpHandler.java +++ b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthStatusHistoryHttpHandler.java @@ -21,6 +21,7 @@ import org.apache.nifi.NiFiServer; import org.apache.nifi.controller.status.history.StatusHistoryDump; import org.apache.nifi.controller.status.history.StatusHistoryDumpFactory; +import org.apache.nifi.util.HttpExchangeUtils; import java.io.IOException; import java.io.OutputStream; @@ -60,23 +61,24 @@ class HealthStatusHistoryHttpHandler implements HttpHandler { @Override public void handle(final HttpExchange exchange) throws IOException { - final String requestMethod = exchange.getRequestMethod(); + HttpExchangeUtils.drainRequestBody(exchange); - if (GET_METHOD.contentEquals(requestMethod)) { - exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, APPLICATION_JSON); - exchange.sendResponseHeaders(HTTP_OK, STREAM_RESPONSE_BODY); + final String requestMethod = exchange.getRequestMethod(); - final URI requestUri = exchange.getRequestURI(); - final int daysRequested = getDaysRequested(requestUri); + try (final OutputStream responseBody = exchange.getResponseBody()) { + if (GET_METHOD.contentEquals(requestMethod)) { + exchange.getResponseHeaders().set(CONTENT_TYPE_HEADER, APPLICATION_JSON); + exchange.sendResponseHeaders(HTTP_OK, STREAM_RESPONSE_BODY); - final StatusHistoryDumpFactory statusHistoryDumpFactory = server.getStatusHistoryDumpFactory(); - final StatusHistoryDump statusHistoryDump = statusHistoryDumpFactory.create(daysRequested); + final URI requestUri = exchange.getRequestURI(); + final int daysRequested = getDaysRequested(requestUri); - try (OutputStream responseBody = exchange.getResponseBody()) { + final StatusHistoryDumpFactory statusHistoryDumpFactory = server.getStatusHistoryDumpFactory(); + final StatusHistoryDump statusHistoryDump = statusHistoryDumpFactory.create(daysRequested); statusHistoryDump.writeTo(responseBody); + } else { + exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY); } - } else { - exchange.sendResponseHeaders(HTTP_BAD_METHOD, NO_RESPONSE_BODY); } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/util/HttpExchangeUtils.java b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/util/HttpExchangeUtils.java new file mode 100644 index 000000000000..5801b43b482b --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/util/HttpExchangeUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.util; + +import com.sun.net.httpserver.HttpExchange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; + +public class HttpExchangeUtils { + private static final Logger logger = LoggerFactory.getLogger(HttpExchangeUtils.class); + + public static void drainRequestBody(final HttpExchange exchange) { + final byte[] buffer = new byte[4096]; + try (final InputStream in = exchange.getRequestBody()) { + while ((in.read(buffer)) != -1) { + // Ignore the data read, just drain the input stream + } + } catch (final IOException ioe) { + // Since we don't actually care about the contents of the input, we will ignore any Exceptions when reading from it. + logger.debug("Failed to fully drain HttpExchange InputStream from {}", exchange.getRequestURI(), ioe); + } + } + +}