diff --git a/extensions/prometheus/PrometheusMetricsPublisher.cpp b/extensions/prometheus/PrometheusMetricsPublisher.cpp index 36f2391660..f4a010f082 100644 --- a/extensions/prometheus/PrometheusMetricsPublisher.cpp +++ b/extensions/prometheus/PrometheusMetricsPublisher.cpp @@ -85,8 +85,9 @@ std::vector>> Prom metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_metrics); } if (metric_classes_str && !metric_classes_str->empty()) { - auto metric_classes = utils::string::split(*metric_classes_str, ","); - for (const std::string& clazz : metric_classes) { + auto metric_classes = utils::string::splitAndTrimRemovingEmpty(*metric_classes_str, ","); + std::unordered_set unique_metric_classes{metric_classes.begin(), metric_classes.end()}; + for (const std::string& clazz : unique_metric_classes) { auto response_nodes = response_node_loader_->loadResponseNodes(clazz); if (response_nodes.empty()) { logger_->log_warn("Metric class '{}' could not be loaded.", clazz); diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h index 9eb55f413e..31e8ac93f4 100644 --- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h +++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h @@ -69,9 +69,11 @@ class ResponseNodeLoader { mutable std::mutex root_mutex_; mutable std::mutex component_metrics_mutex_; mutable std::mutex system_metrics_mutex_; + mutable std::mutex initialization_mutex_; core::ProcessGroup* root_{}; std::unordered_map> component_metrics_; std::unordered_map system_metrics_; + std::unordered_set initialized_metrics_; std::shared_ptr configuration_; std::vector> repository_metric_sources_; std::shared_ptr flow_configuration_; diff --git a/libminifi/src/c2/C2MetricsPublisher.cpp b/libminifi/src/c2/C2MetricsPublisher.cpp index a58b72bf40..3e90d36fd8 100644 --- a/libminifi/src/c2/C2MetricsPublisher.cpp +++ b/libminifi/src/c2/C2MetricsPublisher.cpp @@ -41,8 +41,9 @@ namespace org::apache::nifi::minifi::c2 { void C2MetricsPublisher::loadNodeClasses(const std::string& class_definitions, const state::response::SharedResponseNode& new_node) { gsl_Expects(response_node_loader_); - auto classes = utils::string::split(class_definitions, ","); - for (const std::string& clazz : classes) { + auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ","); + std::unordered_set unique_classes{classes.begin(), classes.end()}; + for (const std::string& clazz : unique_classes) { auto response_nodes = response_node_loader_->loadResponseNodes(clazz); if (response_nodes.empty()) { continue; @@ -60,9 +61,10 @@ void C2MetricsPublisher::loadC2ResponseConfiguration(const std::string &prefix) return; } - std::vector classes = utils::string::split(class_definitions, ","); + auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ","); + std::unordered_set unique_classes{classes.begin(), classes.end()}; - for (const std::string& metricsClass : classes) { + for (const std::string& metricsClass : unique_classes) { try { std::string option = utils::string::join_pack(prefix, ".", metricsClass); std::string classOption = option + ".classes"; @@ -97,9 +99,10 @@ state::response::SharedResponseNode C2MetricsPublisher::loadC2ResponseConfigurat if (!configuration_->get(prefix, class_definitions)) { return prev_node; } - std::vector classes = utils::string::split(class_definitions, ","); + auto classes = utils::string::splitAndTrimRemovingEmpty(class_definitions, ","); + std::unordered_set unique_classes{classes.begin(), classes.end()}; - for (const std::string& metricsClass : classes) { + for (const std::string& metricsClass : unique_classes) { try { std::string option = utils::string::join_pack(prefix, ".", metricsClass); std::string classOption = option + ".classes"; @@ -111,8 +114,9 @@ state::response::SharedResponseNode C2MetricsPublisher::loadC2ResponseConfigurat } state::response::SharedResponseNode new_node = gsl::make_not_null(std::make_shared(name)); if (name.find(',') != std::string::npos) { - std::vector sub_classes = utils::string::split(name, ","); - for (const std::string& subClassStr : classes) { + auto sub_classes = utils::string::splitAndTrimRemovingEmpty(name, ","); + std::unordered_set unique_sub_classes{sub_classes.begin(), sub_classes.end()}; + for (const std::string& subClassStr : unique_sub_classes) { auto node = loadC2ResponseConfiguration(subClassStr, prev_node); static_cast(prev_node.get())->add_node(node); } @@ -198,9 +202,10 @@ void C2MetricsPublisher::loadMetricNodes() { std::string class_csv; std::lock_guard guard{metrics_mutex_}; if (configuration_->get(minifi::Configuration::nifi_c2_root_classes, class_csv)) { - std::vector classes = utils::string::split(class_csv, ","); + auto classes = utils::string::splitAndTrimRemovingEmpty(class_csv, ","); + std::unordered_set unique_classes{classes.begin(), classes.end()}; - for (const std::string& clazz : classes) { + for (const std::string& clazz : unique_classes) { auto response_nodes = response_node_loader_->loadResponseNodes(clazz); if (response_nodes.empty()) { continue; diff --git a/libminifi/src/core/state/LogMetricsPublisher.cpp b/libminifi/src/core/state/LogMetricsPublisher.cpp index e31428a5ab..bb91a67a5a 100644 --- a/libminifi/src/core/state/LogMetricsPublisher.cpp +++ b/libminifi/src/core/state/LogMetricsPublisher.cpp @@ -99,9 +99,10 @@ void LogMetricsPublisher::loadMetricNodes() { metric_classes_str = configuration_->get(minifi::Configuration::nifi_metrics_publisher_metrics); } if (metric_classes_str && !metric_classes_str->empty()) { - auto metric_classes = utils::string::split(*metric_classes_str, ","); + auto metric_classes = utils::string::splitAndTrimRemovingEmpty(*metric_classes_str, ","); + std::unordered_set unique_metric_classes{metric_classes.begin(), metric_classes.end()}; std::lock_guard lock(response_nodes_mutex_); - for (const std::string& clazz : metric_classes) { + for (const std::string& clazz : unique_metric_classes) { auto loaded_response_nodes = response_node_loader_->loadResponseNodes(clazz); if (loaded_response_nodes.empty()) { logger_->log_warn("Metric class '{}' could not be loaded.", clazz); diff --git a/libminifi/src/core/state/MetricsPublisherFactory.cpp b/libminifi/src/core/state/MetricsPublisherFactory.cpp index aba0ddf11d..d44d706dca 100644 --- a/libminifi/src/core/state/MetricsPublisherFactory.cpp +++ b/libminifi/src/core/state/MetricsPublisherFactory.cpp @@ -41,9 +41,10 @@ std::vector>> createMetricsPubli const std::shared_ptr& configuration, const std::shared_ptr& response_node_loader) { if (auto metrics_publisher_class_str = configuration->get(minifi::Configure::nifi_metrics_publisher_class)) { std::vector>> publishers; - auto publisher_classes = minifi::utils::string::split(*metrics_publisher_class_str, ","); - publishers.reserve(publisher_classes.size()); - for (const auto& publisher_class : publisher_classes) { + auto publisher_classes = minifi::utils::string::splitAndTrimRemovingEmpty(*metrics_publisher_class_str, ","); + std::unordered_set unique_publisher_classes{publisher_classes.begin(), publisher_classes.end()}; + publishers.reserve(unique_publisher_classes.size()); + for (const auto& publisher_class : unique_publisher_classes) { publishers.push_back(createMetricsPublisher(publisher_class, configuration, response_node_loader)); } return publishers; diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp index 68b74da48d..e52ed79c5e 100644 --- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp +++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp @@ -42,6 +42,10 @@ ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr configuration, } void ResponseNodeLoader::clearConfigRoot() { + { + std::lock_guard guard(initialization_mutex_); + initialized_metrics_.clear(); + } { std::lock_guard guard(system_metrics_mutex_); system_metrics_.clear(); @@ -238,6 +242,10 @@ std::vector ResponseNodeLoader::loadResponseNodes(const std: } for (const auto& response_node : response_nodes) { + std::lock_guard guard(initialization_mutex_); + if (initialized_metrics_.contains(response_node->getName())) { + continue; + } initializeRepositoryMetrics(response_node); initializeQueueMetrics(response_node); initializeAgentIdentifier(response_node); @@ -247,6 +255,7 @@ std::vector ResponseNodeLoader::loadResponseNodes(const std: initializeConfigurationChecksums(response_node); initializeFlowMonitor(response_node); initializeAssetInformation(response_node); + initialized_metrics_.insert(response_node->getName()); } return response_nodes; } diff --git a/libminifi/test/integration/C2MetricsTest.cpp b/libminifi/test/integration/C2MetricsTest.cpp index 39a0a19c75..6065b5ecf6 100644 --- a/libminifi/test/integration/C2MetricsTest.cpp +++ b/libminifi/test/integration/C2MetricsTest.cpp @@ -128,6 +128,8 @@ class MetricsHandler: public HeartbeatHandler { static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) { return runtime_metrics.HasMember("deviceInfo") && + runtime_metrics["deviceInfo"]["systemInfo"].HasMember("operatingSystem") && + runtime_metrics["deviceInfo"]["networkInfo"].HasMember("hostname") && runtime_metrics.HasMember("flowInfo") && runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") && runtime_metrics["flowInfo"].HasMember("queues") && @@ -135,11 +137,15 @@ class MetricsHandler: public HeartbeatHandler { runtime_metrics["flowInfo"]["queues"].HasMember("2438e3c8-015a-1000-79ca-83af40ec1997") && runtime_metrics["flowInfo"]["components"].HasMember("FlowController") && runtime_metrics["flowInfo"]["components"].HasMember("GetTCP") && - runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute"); + runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") && + runtime_metrics.HasMember("agentInfo") && + runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size"); } static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& runtime_metrics) { return runtime_metrics.HasMember("deviceInfo") && + runtime_metrics["deviceInfo"]["systemInfo"].HasMember("operatingSystem") && + runtime_metrics["deviceInfo"]["networkInfo"].HasMember("hostname") && runtime_metrics.HasMember("flowInfo") && runtime_metrics["flowInfo"].HasMember("versionedFlowSnapshotURI") && runtime_metrics["flowInfo"].HasMember("queues") && @@ -147,7 +153,9 @@ class MetricsHandler: public HeartbeatHandler { runtime_metrics["flowInfo"]["queues"].HasMember("8368e3c8-015a-1003-52ca-83af40ec1332") && runtime_metrics["flowInfo"]["components"].HasMember("FlowController") && runtime_metrics["flowInfo"]["components"].HasMember("GenerateFlowFile") && - runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute"); + runtime_metrics["flowInfo"]["components"].HasMember("LogAttribute") && + runtime_metrics.HasMember("agentInfo") && + runtime_metrics["agentInfo"]["status"]["repositories"]["ff"].HasMember("size"); } static bool verifyLoadMetrics(const rapidjson::Value& load_metrics) { @@ -186,11 +194,12 @@ class MetricsHandler: public HeartbeatHandler { TEST_CASE("C2MetricsTest", "[c2test]") { std::atomic_bool metrics_updated_successfully{false}; VerifyC2Metrics harness(metrics_updated_successfully); + harness.getConfiguration()->set("nifi.c2.root.classes", "FlowInformation,AgentInformation"); harness.getConfiguration()->set("nifi.c2.root.class.definitions", "metrics"); harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.name", "metrics"); harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics", "runtimemetrics,loadmetrics,processorMetrics"); harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.name", "RuntimeMetrics"); - harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes", "DeviceInfoNode,FlowInformation"); + harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.runtimemetrics.classes", "DeviceInfoNode,FlowInformation,AssetInformation,DeviceInfoNode,AgentInformation"); harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.name", "LoadMetrics"); harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.loadmetrics.classes", "QueueMetrics,RepositoryMetrics"); harness.getConfiguration()->set("nifi.c2.root.class.definitions.metrics.metrics.processorMetrics.name", "ProcessorMetrics"); diff --git a/libminifi/test/unit/LogMetricsPublisherTests.cpp b/libminifi/test/unit/LogMetricsPublisherTests.cpp index 511113d378..1801bfadf5 100644 --- a/libminifi/test/unit/LogMetricsPublisherTests.cpp +++ b/libminifi/test/unit/LogMetricsPublisherTests.cpp @@ -96,9 +96,9 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs" publisher_.initialize(configuration_, response_node_loader_); publisher_.loadMetricNodes(); using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - std::string expected_log = R"([info] { - "LogMetrics": { - "RepositoryMetrics": { + std::string expected_log_1 = R"([info] { + "LogMetrics": {)"; + std::string expected_log_2 = R"("RepositoryMetrics": { "provenancerepository": { "running": "false", "full": "false", @@ -117,10 +117,12 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify multiple metric nodes in logs" "rocksDbTableReadersSize": "0", "rocksDbAllMemoryTablesSize": "2048" } - }, - "deviceInfo": { + })"; + std::string expected_log_3 = R"("deviceInfo": { "identifier":)"; - REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log)); + REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_1)); + REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_2)); + REQUIRE(verifyLogLinePresenceInPollTime(5s, expected_log_3)); } TEST_CASE_METHOD(LogPublisherTestFixture, "Verify reloading different metrics", "[LogMetricsPublisher]") { @@ -218,7 +220,7 @@ TEST_CASE_METHOD(LogPublisherTestFixture, "Verify changing log level property fo publisher_.initialize(configuration_, response_node_loader_); publisher_.loadMetricNodes(); using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime; - std::string expected_log = R"([debug] { + std::string expected_log = R"([info] { "LogMetrics": { "RepositoryMetrics": { "provenancerepository": {