Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2503 Remove C2 metric duplication in heartbeat #1908

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions extensions/prometheus/PrometheusMetricsPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ std::vector<gsl::not_null<std::shared_ptr<state::PublishedMetricProvider>>> Prom
metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_metrics);
}
if (metric_classes_str && !metric_classes_str->empty()) {
auto metric_classes = utils::string::split(*metric_classes_str, ",");
for (const std::string& clazz : metric_classes) {
auto metric_classes = utils::string::splitAndTrimRemovingEmpty(*metric_classes_str, ",");
std::unordered_set<std::string> unique_metric_classes{metric_classes.begin(), metric_classes.end()};
for (const std::string& clazz : unique_metric_classes) {
auto response_nodes = response_node_loader_->loadResponseNodes(clazz);
if (response_nodes.empty()) {
logger_->log_warn("Metric class '{}' could not be loaded.", clazz);
Expand Down
2 changes: 2 additions & 0 deletions libminifi/include/core/state/nodes/ResponseNodeLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ class ResponseNodeLoader {
mutable std::mutex root_mutex_;
mutable std::mutex component_metrics_mutex_;
mutable std::mutex system_metrics_mutex_;
mutable std::mutex initialization_mutex_;
core::ProcessGroup* root_{};
std::unordered_map<std::string, std::vector<SharedResponseNode>> component_metrics_;
std::unordered_map<std::string, SharedResponseNode> system_metrics_;
std::unordered_set<std::string> initialized_metrics_;
std::shared_ptr<Configure> configuration_;
std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repository_metric_sources_;
std::shared_ptr<core::FlowConfiguration> flow_configuration_;
Expand Down
25 changes: 15 additions & 10 deletions libminifi/src/c2/C2MetricsPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ namespace org::apache::nifi::minifi::c2 {

void C2MetricsPublisher::loadNodeClasses(const std::string& class_definitions, const state::response::SharedResponseNode& new_node) {
gsl_Expects(response_node_loader_);
auto classes = utils::string::split(class_definitions, ",");
for (const std::string& clazz : classes) {
auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ",");
std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};
for (const std::string& clazz : unique_classes) {
auto response_nodes = response_node_loader_->loadResponseNodes(clazz);
if (response_nodes.empty()) {
continue;
Expand All @@ -60,9 +61,10 @@ void C2MetricsPublisher::loadC2ResponseConfiguration(const std::string &prefix)
return;
}

std::vector<std::string> classes = utils::string::split(class_definitions, ",");
auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ",");
std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};

for (const std::string& metricsClass : classes) {
for (const std::string& metricsClass : unique_classes) {
try {
std::string option = utils::string::join_pack(prefix, ".", metricsClass);
std::string classOption = option + ".classes";
Expand Down Expand Up @@ -97,9 +99,10 @@ state::response::SharedResponseNode C2MetricsPublisher::loadC2ResponseConfigurat
if (!configuration_->get(prefix, class_definitions)) {
return prev_node;
}
std::vector<std::string> classes = utils::string::split(class_definitions, ",");
auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ",");
std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};

for (const std::string& metricsClass : classes) {
for (const std::string& metricsClass : unique_classes) {
try {
std::string option = utils::string::join_pack(prefix, ".", metricsClass);
std::string classOption = option + ".classes";
Expand All @@ -111,8 +114,9 @@ state::response::SharedResponseNode C2MetricsPublisher::loadC2ResponseConfigurat
}
state::response::SharedResponseNode new_node = gsl::make_not_null(std::make_shared<state::response::ObjectNode>(name));
if (name.find(',') != std::string::npos) {
std::vector<std::string> sub_classes = utils::string::split(name, ",");
for (const std::string& subClassStr : classes) {
auto sub_classes = utils::string::splitAndTrimRemovingEmpty(name, ",");
std::unordered_set<std::string> unique_sub_classes{sub_classes.begin(), sub_classes.end()};
for (const std::string& subClassStr : unique_sub_classes) {
auto node = loadC2ResponseConfiguration(subClassStr, prev_node);
static_cast<state::response::ObjectNode*>(prev_node.get())->add_node(node);
}
Expand Down Expand Up @@ -198,9 +202,10 @@ void C2MetricsPublisher::loadMetricNodes() {
std::string class_csv;
std::lock_guard<std::mutex> guard{metrics_mutex_};
if (configuration_->get(minifi::Configuration::nifi_c2_root_classes, class_csv)) {
std::vector<std::string> classes = utils::string::split(class_csv, ",");
auto classes = utils::string::splitAndTrimRemovingEmpty(class_csv, ",");
std::unordered_set<std::string> unique_classes{classes.begin(), classes.end()};

for (const std::string& clazz : classes) {
for (const std::string& clazz : unique_classes) {
auto response_nodes = response_node_loader_->loadResponseNodes(clazz);
if (response_nodes.empty()) {
continue;
Expand Down
5 changes: 3 additions & 2 deletions libminifi/src/core/state/LogMetricsPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,10 @@ void LogMetricsPublisher::loadMetricNodes() {
metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_metrics);
}
if (metric_classes_str && !metric_classes_str->empty()) {
auto metric_classes = utils::string::split(*metric_classes_str, ",");
auto metric_classes = utils::string::splitAndTrimRemovingEmpty(*metric_classes_str, ",");
std::unordered_set<std::string> unique_metric_classes{metric_classes.begin(), metric_classes.end()};
std::lock_guard<std::mutex> lock(response_nodes_mutex_);
for (const std::string& clazz : metric_classes) {
for (const std::string& clazz : unique_metric_classes) {
auto loaded_response_nodes = response_node_loader_->loadResponseNodes(clazz);
if (loaded_response_nodes.empty()) {
logger_->log_warn("Metric class '{}' could not be loaded.", clazz);
Expand Down
7 changes: 4 additions & 3 deletions libminifi/src/core/state/MetricsPublisherFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ std::vector<gsl::not_null<std::unique_ptr<MetricsPublisher>>> createMetricsPubli
const std::shared_ptr<Configure>& configuration, const std::shared_ptr<state::response::ResponseNodeLoader>& response_node_loader) {
if (auto metrics_publisher_class_str = configuration->get(minifi::Configure::nifi_metrics_publisher_class)) {
std::vector<gsl::not_null<std::unique_ptr<MetricsPublisher>>> publishers;
auto publisher_classes = minifi::utils::string::split(*metrics_publisher_class_str, ",");
publishers.reserve(publisher_classes.size());
for (const auto& publisher_class : publisher_classes) {
auto publisher_classes = minifi::utils::string::splitAndTrimRemovingEmpty(*metrics_publisher_class_str, ",");
std::unordered_set<std::string> unique_publisher_classes{publisher_classes.begin(), publisher_classes.end()};
publishers.reserve(unique_publisher_classes.size());
for (const auto& publisher_class : unique_publisher_classes) {
publishers.push_back(createMetricsPublisher(publisher_class, configuration, response_node_loader));
}
return publishers;
Expand Down
9 changes: 9 additions & 0 deletions libminifi/src/core/state/nodes/ResponseNodeLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr<Configure> configuration,
}

void ResponseNodeLoader::clearConfigRoot() {
{
std::lock_guard<std::mutex> guard(initialization_mutex_);
initialized_metrics_.clear();
}
{
std::lock_guard<std::mutex> guard(system_metrics_mutex_);
system_metrics_.clear();
Expand Down Expand Up @@ -237,7 +241,11 @@ std::vector<SharedResponseNode> ResponseNodeLoader::loadResponseNodes(const std:
return {};
}

std::lock_guard<std::mutex> guard(initialization_mutex_);
for (const auto& response_node : response_nodes) {
if (initialized_metrics_.contains(response_node->getName())) {
continue;
}
initializeRepositoryMetrics(response_node);
initializeQueueMetrics(response_node);
initializeAgentIdentifier(response_node);
Expand All @@ -247,6 +255,7 @@ std::vector<SharedResponseNode> ResponseNodeLoader::loadResponseNodes(const std:
initializeConfigurationChecksums(response_node);
initializeFlowMonitor(response_node);
initializeAssetInformation(response_node);
initialized_metrics_.insert(response_node->getName());
}
return response_nodes;
}
Expand Down
15 changes: 12 additions & 3 deletions libminifi/test/integration/C2MetricsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,26 +128,34 @@ class MetricsHandler: public HeartbeatHandler {

static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
return runtime_metrics.HasMember("deviceInfo") &&
runtime_metrics["deviceInfo"]["systemInfo"].HasMember("operatingSystem") &&
runtime_metrics["deviceInfo"]["networkInfo"].HasMember("hostname") &&
runtime_metrics.HasMember("flowInfo") &&
runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
runtime_metrics["flowInfo"].HasMember("queues") &&
runtime_metrics["flowInfo"].HasMember("components") &&
runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997") &&
runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") &&
runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute");
runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") &&
runtime_metrics.HasMember("agentInfo") &&
runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size");
}

static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& runtime_metrics) {
return runtime_metrics.HasMember("deviceInfo") &&
runtime_metrics["deviceInfo"]["systemInfo"].HasMember("operatingSystem") &&
runtime_metrics["deviceInfo"]["networkInfo"].HasMember("hostname") &&
runtime_metrics.HasMember("flowInfo") &&
runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") &&
runtime_metrics["flowInfo"].HasMember("queues") &&
runtime_metrics["flowInfo"].HasMember("components") &&
runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332") &&
runtime_metrics["flowInfo"]["components"].HasMember("FlowController") &&
runtime_metrics["flowInfo"]["components"].HasMember("GenerateFlowFile") &&
runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute");
runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") &&
runtime_metrics.HasMember("agentInfo") &&
runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size");
}

static bool verifyLoadMetrics(const rapidjson::Value& load_metrics) {
Expand Down Expand Up @@ -186,11 +194,12 @@ class MetricsHandler: public HeartbeatHandler {
TEST_CASE("C2MetricsTest", "[c2test]") {
std::atomic_bool metrics_updated_successfully{false};
VerifyC2Metrics harness(metrics_updated_successfully);
harness.getConfiguration()->set("nifi.c2.root.classes", "FlowInformation,AgentInformation");
harness.getConfiguration()->set("nifi.c2.root.class.definitions", "metrics");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.name", "metrics");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics", "runtimemetrics,loadmetrics,processorMetrics");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.name", "RuntimeMetrics");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes", "DeviceInfoNode,FlowInformation");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes", "DeviceInfoNode,FlowInformation,AssetInformation,DeviceInfoNode,AgentInformation");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name", "LoadMetrics");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes", "QueueMetrics,RepositoryMetrics");
harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name", "ProcessorMetrics");
Expand Down
16 changes: 9 additions & 7 deletions libminifi/test/unit/LogMetricsPublisherTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs"
publisher_.initialize(configuration_, response_node_loader_);
publisher_.loadMetricNodes();
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
std::string expected_log = R"([info] {
"LogMetrics": {
"RepositoryMetrics": {
std::string expected_log_1 = R"([info] {
"LogMetrics": {)";
std::string expected_log_2 = R"("RepositoryMetrics": {
"provenancerepository": {
"running": "false",
"full": "false",
Expand All @@ -117,10 +117,12 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs"
"rocksDbTableReadersSize": "0",
"rocksDbAllMemoryTablesSize": "2048"
}
},
"deviceInfo": {
})";
std::string expected_log_3 = R"("deviceInfo": {
"identifier":)";
REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log));
REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_1));
REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_2));
REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_3));
}

TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics", "[LogMetricsPublisher]") {
Expand Down Expand Up @@ -218,7 +220,7 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property fo
publisher_.initialize(configuration_, response_node_loader_);
publisher_.loadMetricNodes();
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
std::string expected_log = R"([debug] {
std::string expected_log = R"([info] {
"LogMetrics": {
"RepositoryMetrics": {
"provenancerepository": {
Expand Down