Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check MP thread pool reusage #2970

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ cc_library(
"@mediapipe//mediapipe/calculators/tensor:image_to_tensor_calculator",
"@mediapipe//mediapipe/modules/holistic_landmark:holistic_landmark_cpu",
"libovmsmediapipe_utils",
"@mediapipe//mediapipe/framework:thread_pool_executor",
"@mediapipe//mediapipe/calculators/geti/inference:inference_calculators",
"@mediapipe//mediapipe/calculators/geti/utils:utils",
"@mediapipe//mediapipe/calculators/geti/utils:emptylabel_calculators",
Expand Down
5 changes: 5 additions & 0 deletions src/mediapipe_internal/mediapipegraphdefinition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ MediapipeGraphDefinition::MediapipeGraphDefinition(const std::string name,
reporter(std::make_unique<MediapipeServableMetricReporter>(metricConfig, registry, name)) {
mgconfig = config;
passKfsRequestFlag = false;
if (!sharedThreadPool) {
SPDLOG_ERROR("Created shared Thread Pool XXX");
sharedThreadPool = std::make_shared<mediapipe::ThreadPoolExecutor>(std::thread::hardware_concurrency()); // TODO FIXME should be in MP factory
}
}

Status MediapipeGraphDefinition::createInputsInfo() {
Expand Down Expand Up @@ -477,4 +481,5 @@ Status MediapipeGraphDefinition::initializeNodes() {
}
return StatusCode::OK;
}
std::shared_ptr<mediapipe::ThreadPoolExecutor> sharedThreadPool; // TODO FIXME should be in MP factory
} // namespace ovms
2 changes: 2 additions & 0 deletions src/mediapipe_internal/mediapipegraphdefinition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "mediapipe/framework/calculator_graph.h"
#include "mediapipe/framework/port/parse_text_proto.h"
#include "mediapipe/framework/port/status.h"
#include "mediapipe/framework/thread_pool_executor.h"
#pragma GCC diagnostic pop

#include "mediapipegraphconfig.hpp"
Expand All @@ -53,6 +54,7 @@ struct LLMNodeResources;
using PythonNodeResourcesMap = std::unordered_map<std::string, std::shared_ptr<PythonNodeResources>>;
using LLMNodeResourcesMap = std::unordered_map<std::string, std::shared_ptr<LLMNodeResources>>;

extern std::shared_ptr<mediapipe::ThreadPoolExecutor> sharedThreadPool;
class MediapipeGraphDefinition {
friend MediapipeGraphDefinitionUnloadGuard;

Expand Down
9 changes: 9 additions & 0 deletions src/mediapipe_internal/mediapipegraphexecutor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <vector>

#include "../execution_context.hpp"
#include "../logging.hpp"
#include "../model_metric_reporter.hpp"
#include "../profiler.hpp"
#include "../status.hpp"
Expand Down Expand Up @@ -103,7 +104,11 @@ class MediapipeGraphExecutor {
MetricCounterGuard failedRequestsGuard(this->mediapipeServableMetricReporter->getRequestsMetric(executionContext, false));
MetricGaugeGuard currentGraphsGuard(this->mediapipeServableMetricReporter->currentGraphs.get());
::mediapipe::CalculatorGraph graph;
SPDLOG_ERROR("SetExecutor XXX");
std::ignore = graph.SetExecutor("", sharedThreadPool); // TODO FIXME
SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} initializationXXXbegin", this->name);
MP_RETURN_ON_FAIL(graph.Initialize(this->config), std::string("failed initialization of MediaPipe graph: ") + this->name, StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR);
SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} initializationXXXend", this->name);
std::unordered_map<std::string, ::mediapipe::OutputStreamPoller> outputPollers;
for (auto& name : this->outputNames) {
if (name.empty()) {
Expand All @@ -124,7 +129,9 @@ class MediapipeGraphExecutor {
inputSidePackets[PYTHON_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket<PythonNodeResourcesMap>(this->pythonNodeResourcesMap).At(STARTING_TIMESTAMP);
inputSidePackets[LLM_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket<LLMNodeResourcesMap>(this->llmNodeResourcesMap).At(STARTING_TIMESTAMP);
#endif
SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} startRunXXXbegin", this->name);
MP_RETURN_ON_FAIL(graph.StartRun(inputSidePackets), std::string("start MediaPipe graph: ") + this->name, StatusCode::MEDIAPIPE_GRAPH_START_ERROR);
SPDLOG_ERROR("Start unary KServe request mediapipe graph: {} startRunXXXend", this->name);

::mediapipe::Packet packet;
std::set<std::string> outputPollersWithReceivedPacket;
Expand Down Expand Up @@ -216,7 +223,9 @@ class MediapipeGraphExecutor {
{
OVMS_PROFILE_SCOPE("Mediapipe graph initialization");
// Init
SPDLOG_DEBUG("Start unary KServe request mediapipe graph: {} initializationXXX", this->name);
MP_RETURN_ON_FAIL(graph.Initialize(this->config), "graph initialization", StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR);
SPDLOG_DEBUG("Start unary KServe request mediapipe graph: {} initializationXXX ended", this->name);
}
{
OVMS_PROFILE_SCOPE("Mediapipe graph installing packet observers");
Expand Down