Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINFICPP-2243 - In ListenHTTP process incoming request only in onTrigger #1826

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 163 additions & 81 deletions extensions/civetweb/processors/ListenHTTP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ void ListenHTTP::onSchedule(core::ProcessContext& context, core::ProcessSessionF
context.getProperty(BatchSize, batch_size_);
logger_->log_debug("ListenHTTP using {}: {}", BatchSize.name, batch_size_);

handler_ = std::make_unique<Handler>(basePath, &context, std::move(authDNPattern),
std::optional<std::string> flow_id;
if (auto flow_version = context.getProcessorNode()->getFlowIdentifier()) {
flow_id = flow_version->getFlowId();
}

handler_ = std::make_unique<Handler>(basePath, flow_id, context.getProperty<uint64_t>(BufferSize).value_or(0), std::move(authDNPattern),
headersAsAttributesPattern.empty() ? std::nullopt : std::make_optional<utils::Regex>(headersAsAttributesPattern));
server_->addHandler(basePath, handler_.get());

Expand All @@ -174,13 +179,25 @@ ListenHTTP::~ListenHTTP() = default;

/// Runs one scheduling pass of the processor.
/// In order: flow files previously handed to restore() are passed to processFlowFile(),
/// then one incoming flow file is handled, then queued HTTP requests are served.
/// The processor yields according to the condition at the end of the function.
// NOTE(review): two consecutive `if` headers guard the yield() call below, but only two closing
// braces follow them, so the function body is never closed in this text — confirm which
// condition is the intended one before editing.
void ListenHTTP::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
logger_->log_trace("OnTrigger ListenHTTP");
bool restored_processed = false;
// Every flow file returned by file_store_.getNewFlowFiles() counts as work done,
// whether or not processFlowFile() accepts it.
for (auto& ff : file_store_.getNewFlowFiles()) {
restored_processed = true;
// processFlowFile() returning false means the flow file is not kept: remove it from the session.
if (!processFlowFile(ff)) {
session.remove(ff);
}
}
const bool incoming_processed = processIncomingFlowFile(session);
const bool request_processed = processRequestBuffer(session);
if (!incoming_processed && !request_processed) {
// Yield only when none of the three sources above produced any work.
if (!restored_processed && !incoming_processed && !request_processed) {
yield();
}
}

/// Hands a flow file back to this processor by storing it in file_store_;
/// onTrigger() later picks it up through file_store_.getNewFlowFiles().
/// @param flowFile flow file to store; a null pointer is ignored.
void ListenHTTP::restore(const std::shared_ptr<core::FlowFile>& flowFile) {
  if (flowFile) {
    file_store_.put(flowFile);
  }
}

/// @return Whether there was a flow file processed.
bool ListenHTTP::processIncomingFlowFile(core::ProcessSession &session) {
std::shared_ptr<core::FlowFile> flow_file = session.get();
Expand All @@ -191,52 +208,97 @@ bool ListenHTTP::processIncomingFlowFile(core::ProcessSession &session) {
std::string type;
flow_file->getAttribute("http.type", type);

if (type == "response_body" && handler_) {
ResponseBody response;
flow_file->getAttribute("filename", response.uri);
flow_file->getAttribute("mime.type", response.mime_type);
if (response.mime_type.empty()) {
logger_->log_warn("Using default mime type of application/octet-stream for response body file: {}", response.uri);
response.mime_type = "application/octet-stream";
}
response.body = session.readBuffer(flow_file).buffer;
handler_->setResponseBody(response);
if (type == "response_body" && handler_ && processFlowFile(flow_file)) {
session.transfer(flow_file, Self);
} else {
session.remove(flow_file);
}

session.remove(flow_file);
return true;
}

/// Builds a ResponseBody from a flow file and passes it to the handler.
/// The "filename" attribute becomes the response URI and "mime.type" the content type;
/// when no mime type is set, application/octet-stream is used and a warning is logged.
/// @param flow_file flow file whose content is to be served as a response body.
/// @return the result of Handler::setResponseBody() for the assembled response.
bool ListenHTTP::processFlowFile(const std::shared_ptr<core::FlowFile>& flow_file) {
  ResponseBody body_entry;
  body_entry.flow_file = flow_file;

  flow_file->getAttribute("filename", body_entry.uri);
  flow_file->getAttribute("mime.type", body_entry.mime_type);

  const bool mime_type_missing = body_entry.mime_type.empty();
  if (mime_type_missing) {
    logger_->log_warn("Using default mime type of application/octet-stream for response body file: {}", body_entry.uri);
    body_entry.mime_type = "application/octet-stream";
  }

  return handler_->setResponseBody(body_entry);
}

/// Serves queued HTTP requests, up to batch_size_ per call (a batch_size_ of 0 means no per-call limit).
/// Requires handler_ to be set (checked with gsl_Expects).
// NOTE(review): this span holds two variants of the loop body — one that dequeues a
// FlowFileBufferPair and writes the flow file here, and one that dequeues a Handler::Request
// and hands the session to the request's owner — confirm which one is current before editing.
/// @return Whether there was a request processed
bool ListenHTTP::processRequestBuffer(core::ProcessSession& session) {
gsl_Expects(handler_);
std::size_t flow_file_count = 0;
for (; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count) {
FlowFileBufferPair flow_file_buffer_pair;
if (!handler_->dequeueRequest(flow_file_buffer_pair)) {
Handler::Request req;
// Stop as soon as dequeueRequest() reports that nothing could be taken from the buffer.
if (!handler_->dequeueRequest(req)) {
break;
}

auto flow_file = flow_file_buffer_pair.first;
session.add(flow_file);

if (flow_file_buffer_pair.second) {
session.writeBuffer(flow_file, flow_file_buffer_pair.second->getBuffer());
}

session.transfer(flow_file, Success);
// Fulfil the request with a reference to this session plus a promise, then block on that
// promise's future: the loop does not continue until the other side (enqueueRequest() calls
// req_done.set_value()) has finished using the session, or the promise is abandoned.
[&] {
std::promise<void> req_done_promise;
auto res = req_done_promise.get_future();
req.set_value(Handler::RequestValue{std::ref(session), std::move(req_done_promise)});
return res;
}().wait();
}

logger_->log_debug("ListenHTTP transferred {} flow files from HTTP request buffer", flow_file_count);
return flow_file_count > 0;
}

ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext *context, std::string &&auth_dn_regex, std::optional<utils::Regex> &&headers_as_attrs_regex)
namespace {

/// io::InputStream adapter that reads from a civetweb connection via mg_read(),
/// optionally capped at a fixed number of bytes.
class MgConnectionInputStream : public io::InputStream {
public:
/// @param conn connection to read from (stored, not owned by this class as far as this code shows).
/// @param size maximum total number of bytes to read; std::nullopt means no cap.
MgConnectionInputStream(struct mg_connection* conn, std::optional<size_t> size): conn_(conn), netstream_size_limit_(size) {}

/// Reads at most out_buffer.size() bytes, and never more than what is left under the size cap.
/// @return number of bytes read; 0 when mg_read() returns zero or a negative value,
///         so callers cannot tell a read error apart from end of data.
size_t read(std::span<std::byte> out_buffer) override {
// Remaining budget: the cap (or SIZE_MAX when there is none) minus what was already consumed.
const auto read_size_limit = netstream_size_limit_.value_or(std::numeric_limits<size_t>::max()) - netstream_offset_;
// Shrink the destination so mg_read() cannot read past the cap.
const auto limited_out_buf = out_buffer.subspan(0, std::min(out_buffer.size(), read_size_limit));
const auto mg_read_return = mg_read(conn_, limited_out_buf.data(), limited_out_buf.size());
if (mg_read_return <= 0) {
return 0;
}
szaszm marked this conversation as resolved.
Show resolved Hide resolved
netstream_offset_ += gsl::narrow<size_t>(mg_read_return);
return gsl::narrow<size_t>(mg_read_return);
}

private:
struct mg_connection* conn_;
size_t netstream_offset_{0}; // how much has been read from conn_
std::optional<size_t> netstream_size_limit_; // how much can we read from conn_
lordgamez marked this conversation as resolved.
Show resolved Hide resolved
};

/// io::OutputStream adapter that forwards written bytes to a civetweb connection via mg_write().
class MgConnectionOutputStream : public io::OutputStream {
 public:
  /// @param conn connection to write to; only the pointer is stored.
  explicit MgConnectionOutputStream(struct mg_connection* conn): conn_(conn) {}

  /// Writes len bytes starting at value to the connection.
  /// @return number of bytes written, or io::STREAM_ERROR when mg_write() reports
  ///         a closed connection (0) or an error (negative) for a non-empty write.
  size_t write(const uint8_t *value, size_t len) override {
    // An empty write is a successful no-op. Without this guard mg_write() would return 0 for it,
    // which is indistinguishable from "connection closed" and used to be reported as an error.
    if (len == 0) {
      return 0;
    }
    // uint8_t* converts implicitly to const void*, no cast needed.
    const auto mg_write_return = mg_write(conn_, value, len);
    if (mg_write_return <= 0) {
      return io::STREAM_ERROR;
    }
    return gsl::narrow<size_t>(mg_write_return);
  }

 private:
  struct mg_connection* conn_;
};

} // namespace

/// Constructs the HTTP handler for one base path.
/// @param base_uri base URI served by this handler; moved into base_uri_.
/// @param flow_id optional flow id; when set, enqueueRequest() stamps it on created flow files as FLOW_ID.
/// @param buffer_size maximum number of queued requests; enqueueRequest() treats 0 as "no limit".
/// @param auth_dn_regex moved into auth_dn_regex_.
/// @param headers_as_attrs_regex optional regex moved into headers_as_attrs_regex_.
// NOTE(review): the initializer list below ends twice — once with `process_context_(context) {`
// followed by `context->getProperty(...)`, and once with `buffer_size_(buffer_size) {` — while
// `context` is not a parameter of this signature. Confirm which ending is current before editing.
ListenHTTP::Handler::Handler(std::string base_uri, std::optional<std::string> flow_id, uint64_t buffer_size, std::string &&auth_dn_regex, std::optional<utils::Regex> &&headers_as_attrs_regex)
: base_uri_(std::move(base_uri)),
flow_id_(std::move(flow_id)),
auth_dn_regex_(std::move(auth_dn_regex)),
headers_as_attrs_regex_(std::move(headers_as_attrs_regex)),
process_context_(context) {
context->getProperty(BufferSize, buffer_size_);
buffer_size_(buffer_size) {
logger_->log_debug("ListenHTTP using {}: {}", BufferSize.name, buffer_size_);
}

Expand Down Expand Up @@ -269,25 +331,58 @@ void ListenHTTP::Handler::setHeaderAttributes(const mg_request_info *req_info, c
}
}

/// Handles one HTTP request on the calling (server) thread.
/// In the Request-based variant: rejects with 503 when the buffer is full or the handler is not
/// running, otherwise queues a Request, blocks until it is fulfilled with a process session,
/// creates the flow file in that session, writes the HTTP response, and finally signals completion.
// NOTE(review): this span contains two signatures (content_buffer vs. write_body) and two variants
// of several statements (buffer-full check, setHeaderAttributes, writeBody); braces do not
// balance as written. Confirm which variant is current before editing.
void ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const mg_request_info *req_info, std::unique_ptr<io::BufferStream> content_buffer) {
auto flow_file = std::make_shared<FlowFileRecord>();
auto flow_version = process_context_->getProcessorNode()->getFlowIdentifier();
if (flow_version != nullptr) {
flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
void ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const mg_request_info *req_info, bool write_body) {
// A buffer_size_ of 0 disables the limit; otherwise a full buffer is answered with 503.
// NOTE(review): this size check runs before request_mtx_ is taken further down.
if (buffer_size_ != 0 && request_buffer_.size() >= buffer_size_) {
logger_->log_warn("ListenHTTP buffer is full, '{}' request for '{}' uri was dropped", req_info->request_method, req_info->request_uri);
sendHttp503(conn);
return;
} else {
// NOTE(review): this branch is taken when the request is ACCEPTED, yet it logs at warn level
// that the request "was dropped". The message contradicts the control flow and fires for every
// accepted request — looks like leftover debug output; confirm and remove or reword/downgrade.
logger_->log_warn("ListenHTTP buffer is NOT full {}/{}, '{}' request for '{}' uri was dropped", request_buffer_.size() + 1, buffer_size_, req_info->request_method, req_info->request_uri);
}

setHeaderAttributes(req_info, *flow_file);
Request req;
auto req_triggered = req.get_future();

if (buffer_size_ == 0 || request_buffer_.size() < buffer_size_) {
request_buffer_.enqueue(std::make_pair(std::move(flow_file), std::move(content_buffer)));
} else {
logger_->log_warn("ListenHTTP buffer is full, '{}' request for '{}' uri was dropped", req_info->request_method, req_info->request_uri);
// The running_ check and the enqueue are done under the same request_mtx_ lock.
{
std::lock_guard lock(request_mtx_);
if (!running_) {
sendHttp503(conn);
return;
}

request_buffer_.enqueue(std::move(req));
}

// Blocks this thread until the queued Request is fulfilled
// (processRequestBuffer() does so via req.set_value(...)).
auto req_result = req_triggered.get();
// An error result carries no session: answer 503 and fulfil the promise held in the error (ret).
if (!req_result) {
sendHttp503(conn);
req_result.error().ret.set_value();
return;
}

auto& [session, req_done] = *req_result;

auto flow_file = session.get().create();
if (flow_id_) {
flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID, flow_id_.value());
}

if (write_body) {
// Stream the request body from the connection into the flow file content.
session.get().write(flow_file, [&] (auto& out) {
// Only a positive content_length caps the read; otherwise the stream is created without a size limit.
std::optional<size_t> request_size = std::nullopt;
if (req_info->content_length > 0) { request_size = gsl::narrow<size_t>(req_info->content_length); }
MgConnectionInputStream mg_body{conn, request_size};
return minifi::internal::pipe(mg_body, *out);
});
}

setHeaderAttributes(req_info, *flow_file);
mg_printf(conn, "HTTP/1.1 200 OK\r\n");
writeBody(conn, req_info);
writeBody(&session.get(), conn, req_info);

session.get().transfer(flow_file, Success);

// Fulfils the promise whose future processRequestBuffer() is waiting on.
req_done.set_value();
}

bool ListenHTTP::Handler::handlePost(CivetServer* /*server*/, struct mg_connection *conn) {
Expand All @@ -305,7 +400,7 @@ bool ListenHTTP::Handler::handlePost(CivetServer* /*server*/, struct mg_connecti
// Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html)
mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n");

enqueueRequest(conn, req_info, createContentBuffer(conn, req_info));
enqueueRequest(conn, req_info, true);
return true;
}

Expand Down Expand Up @@ -336,7 +431,7 @@ bool ListenHTTP::Handler::handleGet(CivetServer* /*server*/, struct mg_connectio
return true;
}

enqueueRequest(conn, req_info, nullptr);
enqueueRequest(conn, req_info, false);
return true;
}

Expand All @@ -353,7 +448,7 @@ bool ListenHTTP::Handler::handleHead(CivetServer* /*server*/, struct mg_connecti
}

mg_printf(conn, "HTTP/1.1 200 OK\r\n");
writeBody(conn, req_info, false /*include_payload*/);
writeBody(nullptr, conn, req_info);

return true;
}
Expand All @@ -372,26 +467,28 @@ bool ListenHTTP::Handler::handleDelete(CivetServer* /*server*/, struct mg_connec
return true;
}

/// Registers or unregisters the response body served for response.uri, under uri_map_mutex_.
/// An empty body removes the map entry for the URI; a non-empty one stores `response` under it.
/// @return (bool variant) true when the response was registered, false when the entry was erased.
// NOTE(review): this span contains two signatures (void vs. bool) and two variants of the
// emptiness check and length argument (response.body vs. response.flow_file->getSize()).
// The flow_file variant dereferences response.flow_file without a null check — confirm that
// every caller passes a non-null flow file.
void ListenHTTP::Handler::setResponseBody(const ResponseBody& response) {
bool ListenHTTP::Handler::setResponseBody(const ResponseBody& response) {
std::lock_guard<std::mutex> guard(uri_map_mutex_);

if (response.body.empty()) {
if (response.flow_file->getSize() == 0) {
logger_->log_info("Unregistering response body for URI '{}'",
response.uri);
response_uri_map_.erase(response.uri);
return false;
} else {
logger_->log_info("Registering response body for URI '{}' of length {}",
response.uri,
response.body.size());
response.flow_file->getSize());
response_uri_map_[response.uri] = response;
return true;
}
}

/// Tries to take one queued entry out of request_buffer_ into the output argument.
/// @return the result of request_buffer_.tryDequeue(); processRequestBuffer() stops its loop on false.
// NOTE(review): two variants are present here (FlowFileBufferPair& and Request&) — confirm which is current.
bool ListenHTTP::Handler::dequeueRequest(FlowFileBufferPair &flow_file_buffer_pair) {
return request_buffer_.tryDequeue(flow_file_buffer_pair);
bool ListenHTTP::Handler::dequeueRequest(Request& req) {
return request_buffer_.tryDequeue(req);
}

void ListenHTTP::Handler::writeBody(mg_connection *conn, const mg_request_info *req_info, bool include_payload /*=true*/) {
void ListenHTTP::Handler::writeBody(core::ProcessSession* payload_reader, mg_connection *conn, const mg_request_info *req_info) {
const auto &request_uri_str = std::string(req_info->request_uri);

if (request_uri_str.size() > base_uri_.size() + 1) {
Expand All @@ -406,16 +503,19 @@ void ListenHTTP::Handler::writeBody(mg_connection *conn, const mg_request_info *
}
}

if (!response.body.empty()) {
logger_->log_debug("Writing response body of {} bytes for URI: {}", response.body.size(), req_info->request_uri);
if (response.flow_file && response.flow_file->getSize() != 0) {
logger_->log_debug("Writing response body of {} bytes for URI: {}", response.flow_file->getSize(), req_info->request_uri);
mg_printf(conn, "Content-type: ");
mg_printf(conn, "%s", response.mime_type.c_str());
mg_printf(conn, "\r\n");
mg_printf(conn, "Content-length: ");
mg_printf(conn, "%s", std::to_string(response.body.size()).c_str());
mg_printf(conn, "%s", std::to_string(response.flow_file->getSize()).c_str());
mg_printf(conn, "\r\n\r\n");
if (include_payload) {
mg_write(conn, reinterpret_cast<char*>(response.body.data()), response.body.size());
if (payload_reader) {
payload_reader->read(response.flow_file, [&] (auto& content) {
MgConnectionOutputStream out{conn};
return minifi::internal::pipe(*content, out);
});
}
} else {
logger_->log_debug("No response body available for URI: {}", req_info->request_uri);
Expand All @@ -427,36 +527,6 @@ void ListenHTTP::Handler::writeBody(mg_connection *conn, const mg_request_info *
}
}

/// Reads the request body from the connection into a freshly allocated in-memory stream.
/// With a content length of -1 the connection is read until mg_read() returns no more data;
/// with a positive content length at most that many bytes are read; otherwise nothing is read.
/// @param conn connection to read the body from.
/// @param req_info request metadata; only content_length is used.
/// @return the stream holding whatever part of the body could be read.
std::unique_ptr<io::BufferStream> ListenHTTP::Handler::createContentBuffer(struct mg_connection *conn, const struct mg_request_info *req_info) {
  auto body = std::make_unique<io::BufferStream>();
  const int64_t declared_length = req_info->content_length;
  const bool length_unknown = declared_length == -1;
  std::array<uint8_t, 16384> chunk{};
  size_t received = 0;

  for (;;) {
    // Without a declared length, always ask for a full chunk (HTTP/1.1: read until the stream ends).
    size_t wanted = chunk.size();
    if (!length_unknown) {
      if (declared_length <= 0) {
        break;
      }
      const auto total = gsl::narrow<size_t>(declared_length);
      if (received >= total) {
        break;
      }
      const size_t remaining = total - received;
      wanted = remaining < chunk.size() ? remaining : chunk.size();
    }

    // Pull the next piece of the body from the client; stop on end of data or error.
    const auto mg_read_return = mg_read(conn, chunk.data(), wanted);
    if (mg_read_return <= 0) {
      break;
    }
    const auto got = gsl::narrow<size_t>(mg_read_return);

    // Append what was read to the output stream.
    body->write(chunk.data(), got);
    received += got;
  }

  return body;
}

/// @return true when listeningPort is non-empty and its last character is 's'.
bool ListenHTTP::isSecure() const {
  return !listeningPort.empty() && listeningPort.back() == 's';
}
Expand All @@ -469,10 +539,22 @@ std::string ListenHTTP::getPort() const {
}

/// Shuts the listener down: stops the handler (if any), then releases the server and the handler.
void ListenHTTP::notifyStop() {
  if (handler_ != nullptr) {
    handler_->stop();
  }

  // The server was given a raw pointer to the handler (addHandler(..., handler_.get())),
  // so it is released before the handler itself.
  server_.reset();
  handler_.reset();
}

/// Returns the outgoing connections of a relationship as reported by core::Processor,
/// with this processor itself added when the relationship is Self.
/// @param relationship name of the relationship to look up.
/// @return the set of connectables for that relationship.
std::set<core::Connectable*> ListenHTTP::getOutGoingConnections(const std::string &relationship) {
  auto connections = core::Processor::getOutGoingConnections(relationship);
  const bool targets_self = relationship == Self.name;
  if (targets_self) {
    connections.insert(this);
  }
  return connections;
}

REGISTER_RESOURCE(ListenHTTP, Processor);

} // namespace org::apache::nifi::minifi::processors
Loading
Loading