From 541f86161aece6e33b08f551c1beefe85d7ff8c0 Mon Sep 17 00:00:00 2001 From: atobisze Date: Thu, 16 Jan 2025 15:07:24 +0100 Subject: [PATCH] Check --- src/BUILD | 1 + src/mediapipe_internal/mediapipegraphdefinition.cpp | 5 +++++ src/mediapipe_internal/mediapipegraphdefinition.hpp | 2 ++ src/mediapipe_internal/mediapipegraphexecutor.hpp | 9 +++++++++ 4 files changed, 17 insertions(+) diff --git a/src/BUILD b/src/BUILD index fbf1916903..7d108399ff 100644 --- a/src/BUILD +++ b/src/BUILD @@ -543,6 +543,7 @@ cc_library( "@mediapipe//mediapipe/calculators/tensor:image_to_tensor_calculator", "@mediapipe//mediapipe/modules/holistic_landmark:holistic_landmark_cpu", "libovmsmediapipe_utils", + "@mediapipe//mediapipe/framework:thread_pool_executor", "@mediapipe//mediapipe/calculators/geti/inference:inference_calculators", "@mediapipe//mediapipe/calculators/geti/utils:utils", "@mediapipe//mediapipe/calculators/geti/utils:emptylabel_calculators", diff --git a/src/mediapipe_internal/mediapipegraphdefinition.cpp b/src/mediapipe_internal/mediapipegraphdefinition.cpp index d34094171e..52f921d311 100644 --- a/src/mediapipe_internal/mediapipegraphdefinition.cpp +++ b/src/mediapipe_internal/mediapipegraphdefinition.cpp @@ -194,6 +194,10 @@ MediapipeGraphDefinition::MediapipeGraphDefinition(const std::string name, reporter(std::make_unique(metricConfig, registry, name)) { mgconfig = config; passKfsRequestFlag = false; + if (!sharedThreadPool) { + SPDLOG_ERROR("Created shared Thread Pool XXX"); + sharedThreadPool = std::make_shared(std::thread::hardware_concurrency()); // TODO FIXME should be in MP factory + } } Status MediapipeGraphDefinition::createInputsInfo() { @@ -477,4 +481,5 @@ Status MediapipeGraphDefinition::initializeNodes() { } return StatusCode::OK; } +std::shared_ptr sharedThreadPool; // TODO FIXME should be in MP factory } // namespace ovms diff --git a/src/mediapipe_internal/mediapipegraphdefinition.hpp b/src/mediapipe_internal/mediapipegraphdefinition.hpp index 51cc2101ab..71414430bb 100644 --- a/src/mediapipe_internal/mediapipegraphdefinition.hpp +++ b/src/mediapipe_internal/mediapipegraphdefinition.hpp @@ -34,6 +34,7 @@ #include "mediapipe/framework/calculator_graph.h" #include "mediapipe/framework/port/parse_text_proto.h" #include "mediapipe/framework/port/status.h" +#include "mediapipe/framework/thread_pool_executor.h" #pragma GCC diagnostic pop #include "mediapipegraphconfig.hpp" @@ -53,6 +54,7 @@ struct LLMNodeResources; using PythonNodeResourcesMap = std::unordered_map>; using LLMNodeResourcesMap = std::unordered_map>; +extern std::shared_ptr sharedThreadPool; class MediapipeGraphDefinition { friend MediapipeGraphDefinitionUnloadGuard; diff --git a/src/mediapipe_internal/mediapipegraphexecutor.hpp b/src/mediapipe_internal/mediapipegraphexecutor.hpp index b52bc07037..dbea59a5f4 100644 --- a/src/mediapipe_internal/mediapipegraphexecutor.hpp +++ b/src/mediapipe_internal/mediapipegraphexecutor.hpp @@ -24,6 +24,7 @@ #include #include "../execution_context.hpp" +#include "../logging.hpp" #include "../model_metric_reporter.hpp" #include "../profiler.hpp" #include "../status.hpp" @@ -103,7 +104,11 @@ class MediapipeGraphExecutor { MetricCounterGuard failedRequestsGuard(this->mediapipeServableMetricReporter->getRequestsMetric(executionContext, false)); MetricGaugeGuard currentGraphsGuard(this->mediapipeServableMetricReporter->currentGraphs.get()); ::mediapipe::CalculatorGraph graph; + SPDLOG_ERROR("SetExecutor XXX"); + std::ignore = graph.SetExecutor("", sharedThreadPool); // TODO FIXME + SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} initializationXXXbegin", this->name); MP_RETURN_ON_FAIL(graph.Initialize(this->config), std::string("failed initialization of MediaPipe graph: ") + this->name, StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR); + SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} initializationXXXend", this->name); std::unordered_map outputPollers; for (auto& name : this->outputNames) { if (name.empty()) { @@ -124,7 +129,9 @@ class MediapipeGraphExecutor { inputSidePackets[PYTHON_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(this->pythonNodeResourcesMap).At(STARTING_TIMESTAMP); inputSidePackets[LLM_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(this->llmNodeResourcesMap).At(STARTING_TIMESTAMP); #endif + SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} startRunXXXbegin", this->name); MP_RETURN_ON_FAIL(graph.StartRun(inputSidePackets), std::string("start MediaPipe graph: ") + this->name, StatusCode::MEDIAPIPE_GRAPH_START_ERROR); + SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} startRunXXXend", this->name); ::mediapipe::Packet packet; std::set outputPollersWithReceivedPacket; @@ -216,7 +223,9 @@ class MediapipeGraphExecutor { { OVMS_PROFILE_SCOPE("Mediapipe graph initialization"); // Init + SPDLOG_DEBUG("Start unary KServe request mediapipe graph: {} initializationXXX", this->name); MP_RETURN_ON_FAIL(graph.Initialize(this->config), "graph initialization", StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR); + SPDLOG_DEBUG("Start unary KServe request mediapipe graph: {} initializationXXX ended", this->name); } { OVMS_PROFILE_SCOPE("Mediapipe graph installing packet observers");