Skip to content

Commit

Permalink
MINIFICPP-2505 SchedulingAgent::scheduled_processors_ isnt thread safe
Browse files Browse the repository at this point in the history
Closes #1910

Signed-off-by: Marton Szasz <[email protected]>
  • Loading branch information
martinzink authored and szaszm committed Dec 29, 2024
1 parent ef27b60 commit 212b145
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 17 deletions.
4 changes: 2 additions & 2 deletions libminifi/include/SchedulingAgent.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ class SchedulingAgent {
};

std::shared_ptr<core::logging::Logger> logger_;
mutable std::mutex watchdog_mtx_; // used to protect the set below
std::set<SchedulingInfo> scheduled_processors_; // set was chosen to avoid iterator invalidation
mutable std::mutex watchdog_mtx_; // used to protect the vector below
std::vector<gsl::not_null<SchedulingInfo*>> scheduled_processors_;
std::unique_ptr<utils::CallBackTimer> watchDogTimer_;
std::chrono::milliseconds alert_time_;
};
Expand Down
33 changes: 18 additions & 15 deletions libminifi/src/SchedulingAgent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
* limitations under the License.
*/
#include "SchedulingAgent.h"

#include <chrono>
#include <memory>
#include <thread>
#include <utility>
#include <memory>

#include "core/Processor.h"
#include "utils/gsl.h"

Expand Down Expand Up @@ -68,18 +70,19 @@ nonstd::expected<void, std::exception_ptr> SchedulingAgent::triggerAndCommit(cor
return {};
}

auto schedule_it = scheduled_processors_.end();

auto processor_scheduling_info = SchedulingInfo(processor);
{
std::lock_guard<std::mutex> lock(watchdog_mtx_);
schedule_it = scheduled_processors_.emplace(processor).first;
scheduled_processors_.push_back(gsl::make_not_null(&processor_scheduling_info));
}

const auto guard = gsl::finally([this, &schedule_it](){
const auto guard = gsl::finally([this, &processor_scheduling_info](){
std::lock_guard<std::mutex> lock(watchdog_mtx_);
scheduled_processors_.erase(schedule_it);
[[maybe_unused]] const auto erased_scheduling_infos_count = std::erase(scheduled_processors_, gsl::make_not_null(&processor_scheduling_info));
gsl_Assert(1 == erased_scheduling_infos_count);
});


processor->incrementActiveTasks();
auto decrement_task = gsl::finally([processor]() { processor->decrementActiveTask(); });

Expand Down Expand Up @@ -107,16 +110,16 @@ nonstd::expected<bool, std::exception_ptr> SchedulingAgent::trigger(core::Proces
return false;
}

auto schedule_it = scheduled_processors_.end();

auto processor_scheduling_info = SchedulingInfo(processor);
{
std::lock_guard<std::mutex> lock(watchdog_mtx_);
schedule_it = scheduled_processors_.emplace(processor).first;
scheduled_processors_.push_back(gsl::make_not_null(&processor_scheduling_info));
}

const auto guard = gsl::finally([this, &schedule_it](){
const auto guard = gsl::finally([this, &processor_scheduling_info](){
std::lock_guard<std::mutex> lock(watchdog_mtx_);
scheduled_processors_.erase(schedule_it);
[[maybe_unused]] const auto erased_scheduling_infos_count = std::erase(scheduled_processors_, gsl::make_not_null(&processor_scheduling_info));
gsl_Assert(1 == erased_scheduling_infos_count);
});

processor->incrementActiveTasks();
Expand All @@ -141,11 +144,11 @@ void SchedulingAgent::watchDogFunc() {
std::lock_guard<std::mutex> lock(watchdog_mtx_);
auto now = std::chrono::steady_clock::now();
for (const auto& info : scheduled_processors_) {
auto elapsed = now - info.last_alert_time_;
auto elapsed = now - info->last_alert_time_;
if (elapsed > alert_time_) {
int64_t elapsed_ms{ std::chrono::duration_cast<std::chrono::milliseconds>(now - info.start_time_).count() };
logger_->log_warn("{}::onTrigger has been running for {} ms in {}", info.name_, elapsed_ms, info.uuid_);
info.last_alert_time_ = now;
int64_t elapsed_ms{ std::chrono::duration_cast<std::chrono::milliseconds>(now - info->start_time_).count() };
logger_->log_warn("{}::onTrigger has been running for {} ms in {}", info->name_, elapsed_ms, info->uuid_);
info->last_alert_time_ = now;
}
}
}
Expand Down

0 comments on commit 212b145

Please sign in to comment.