From 3a4dfeb9ded54db61c85d3d001a8de8a8ab7833f Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Wed, 24 Jan 2024 13:18:54 +0100 Subject: [PATCH] make thread creation async by using a separate thread for it --- src/Common/ThreadPool.cpp | 196 +++++++++++------- src/Common/ThreadPool.h | 10 + .../MySQL/MySQLBinlogEventsDispatcher.cpp | 2 + .../MySQL/MySQLBinlogEventsDispatcher.h | 2 +- .../Formats/Impl/ParallelParsingInputFormat.h | 2 +- 5 files changed, 135 insertions(+), 77 deletions(-) diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index e596aa154221..3f49a4b37d71 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -130,11 +130,15 @@ ThreadPoolImpl::ThreadPoolImpl( , queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */) , shutdown_on_exception(shutdown_on_exception_) { - std::lock_guard lock(mutex); - jobs.reserve(max_free_threads); - LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"), - "ThreadPoolImpl constructor [Instance Address: {}]: max_threads = {}, max_free_threads = {}, queue_size = {}, StackTrace: {}", - static_cast(this), max_threads, max_free_threads, queue_size, StackTrace().toString()); + // LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"), + // "ThreadPoolImpl constructor [Instance Address: {}]: max_threads = {}, max_free_threads = {}, queue_size = {}, StackTrace: {}", + // static_cast(this), max_threads, max_free_threads, queue_size, StackTrace().toString()); + + // TODO: actually it seems like for a global pool one thread is not enough for a very dynamic thread pool expansion + housekeepeing_thread = std::thread(&ThreadPoolImpl::threadPoolHousekeep, this); + + // jobs.reserve(max_free_threads); + // while (threads.size() < max_free_threads) // { // try @@ -162,20 +166,11 @@ void ThreadPoolImpl::setMaxThreads(size_t value) queue_size = queue_size ? std::max(queue_size, max_threads) : 0; jobs.reserve(queue_size); + desired_pool_size = std::min(max_threads, scheduled_jobs); + if (need_start_threads) { - /// Start new threads while there are more scheduled jobs in the queue and the limit `max_threads` is not reached. - while (!shutdown && threads.size() < std::min(max_threads, scheduled_jobs)) - { - try - { - createThreadNoLock(); - } - catch (...) - { - break; /// failed to start more threads - } - } + threads_cv.notify_one(); } else if (need_finish_free_threads) { @@ -195,12 +190,10 @@ template void ThreadPoolImpl::setMaxFreeThreads(size_t value) { std::lock_guard lock(mutex); - bool need_finish_free_threads = (value < max_free_threads); - max_free_threads = std::min(value, max_threads); - if (need_finish_free_threads) - { + if (current_pool_size > scheduled_jobs + max_free_threads) { + desired_pool_size = std::min(max_threads, scheduled_jobs + max_free_threads); /// Wake up free threads so they can finish themselves. new_job_or_shutdown.notify_all(); } @@ -275,7 +268,7 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: } throw DB::Exception(DB::ErrorCodes::CANNOT_SCHEDULE_TASK, "Cannot schedule a task: {} (threads={}, jobs={})", reason, - threads.size(), scheduled_jobs); + current_pool_size, scheduled_jobs); } else return false; @@ -311,23 +304,10 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: /// Because if an exception would be thrown, we won't notify a thread about job occurrence. /// Check if there are enough threads to process job. - if (threads.size() < std::min(max_threads, scheduled_jobs + 1)) + if (desired_pool_size < std::min(max_threads, scheduled_jobs + 1)) { - // while (threads.size() < std::min(max_threads, scheduled_jobs + 1) ) - // { - try - { - createThreadNoLock(); - } - catch (DB::Exception & e) - { - on_error(e.what()); - } - catch (...) - { - on_error("can not start new thread"); - } - // } + desired_pool_size = std::min(max_threads, scheduled_jobs + 1); + threads_cv.notify_one(); } Stopwatch watch3; @@ -400,10 +380,9 @@ ThreadPoolImpl::~ThreadPoolImpl() /// Note: should not use logger from here, /// because it can be an instance of GlobalThreadPool that is a global variable /// and the destruction order of global variables is unspecified. - LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"), - "ThreadPoolImpl destructor [Instance Address: {}]: threads.size() = {}", - static_cast(this), threads.size()); - + // LOG_ERROR(&Poco::Logger::get("ThreadPoolImpl"), + // "ThreadPoolImpl destructor [Instance Address: {}]: threads.size() = {}", + // static_cast(this), threads.size()); finalize(); onDestroy(); @@ -418,20 +397,12 @@ void ThreadPoolImpl::finalize() /// We don't want threads to remove themselves from `threads` anymore, otherwise `thread.join()` will go wrong below in this function. threads_remove_themselves = false; } + desired_pool_size = 0; + threads_cv.notify_all(); /// Wake up threads so they can finish themselves. new_job_or_shutdown.notify_all(); - - /// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does). - for (auto & thread : threads) - { - thread.join(); - ProfileEvents::increment( - std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks - ); - } - - threads.clear(); + housekeepeing_thread.join(); } template @@ -466,6 +437,68 @@ bool ThreadPoolImpl::finished() const return shutdown; } +template +void ThreadPoolImpl::threadPoolHousekeep() +{ + while (true) { + { + std::unique_lock lock(threads_mutex); + + // Wait for notification or timeout + if (threads_cv.wait_for(lock, std::chrono::seconds(5), [this]{ return desired_pool_size != current_pool_size; })) + { + if (desired_pool_size == 0 && current_pool_size > 0) // shutdown + { + + /// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does). + for (auto & thread : threads) + { + thread.join(); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks + ); + } + + threads.clear(); + break; + } + + while (desired_pool_size > threads.size()) + { + try + { + createThreadNoLock(); + current_pool_size = threads.size(); + } + catch (DB::Exception & e) + { + LOG_ERROR(&Poco::Logger::get("ThreadPool"), + "ThreadPoolImpl createThreadNoLock failed: {}", e.what()); + break; + } + catch (...) + { + LOG_ERROR(&Poco::Logger::get("ThreadPool"), + "ThreadPoolImpl createThreadNoLock failed: unknown exception"); + break; + } + } + // todo: check if we have to shrink the pool + // else if (threads.size() > desired_pool_size) { + // // We have to wake up threads so they can finish themselves. + // new_job_or_shutdown.notify_all(); + // } + } + else + { + // timer expired + // TODO: check if we have to shrink the pool + } + } + } +} + + template void ThreadPoolImpl::worker(typename std::list::iterator thread_it) { @@ -473,6 +506,9 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ CurrentMetrics::Increment metric_pool_threads(metric_threads); bool job_is_done = false; + bool thread_is_not_needed_anymore = false; + bool thread_should_remove_itself = false; + std::exception_ptr exception_from_job; /// We'll run jobs in this worker while there are scheduled jobs and until some special event occurs (e.g. shutdown, or decreasing the number of max_threads). @@ -517,41 +553,51 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ new_job_or_shutdown.notify_all(); /// `shutdown` was set, wake up other threads so they can finish themselves. } - new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads); }); + new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || desired_pool_size < current_pool_size; }); - if (jobs.empty() || threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads)) + if (jobs.empty() || desired_pool_size < current_pool_size ) { // We enter here if: // - either this thread is not needed anymore due to max_free_threads excess; // - or shutdown happened AND all jobs are already handled. - if (threads_remove_themselves) - { - thread_it->detach(); - threads.erase(thread_it); - ProfileEvents::increment( - std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks - ); - } - return; - } - - /// boost::priority_queue does not provide interface for getting non-const reference to an element - /// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority. - job_data = std::move(const_cast(jobs.top())); - jobs.pop(); - /// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them. - if (shutdown) - { - job_is_done = true; - continue; + thread_is_not_needed_anymore = true; // we will remove this thread from the pool after leaving the critical section + thread_should_remove_itself = threads_remove_themselves; // we will not be able to access thread_should_remove_itself after leaving the critical section } + else + { + /// boost::priority_queue does not provide interface for getting non-const reference to an element + /// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority. + job_data = std::move(const_cast(jobs.top())); + jobs.pop(); + /// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them. + if (shutdown) + { + job_is_done = true; + continue; + } + } ProfileEvents::increment( std::is_same_v ? ProfileEvents::GlobalThreadPoolWorkerLockHoldingMicroseconds : ProfileEvents::LocalThreadPoolWorkerLockHoldingMicroseconds, watch.elapsedMicroseconds()); } + if (thread_is_not_needed_anymore) + { + if (thread_should_remove_itself) + { + std::lock_guard lock(threads_mutex); + thread_it->detach(); + threads.erase(thread_it); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks + ); + current_pool_size--; + } + return; + } + ALLOW_ALLOCATIONS_IN_SCOPE; /// Set up tracing context for this thread by its parent context. diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index c312e465b3ff..ad078acb97e0 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -130,10 +130,18 @@ class ThreadPoolImpl const bool shutdown_on_exception = true; boost::heap::priority_queue jobs; + std::list threads; + mutable std::mutex threads_mutex; // used only for threads list manipulations + std::thread housekeepeing_thread; // thread that maintains the number of the threads in pool close to desired (asynchronously) + std::atomic desired_pool_size = 0; + std::atomic current_pool_size = 0; + std::condition_variable threads_cv; + std::exception_ptr first_exception; std::stack on_destroy_callbacks; + template ReturnType scheduleImpl(Job job, Priority priority, std::optional wait_microseconds, bool propagate_opentelemetry_tracing_context = true); @@ -144,6 +152,8 @@ class ThreadPoolImpl void finalize(); void onDestroy(); + + void threadPoolHousekeep(); }; diff --git a/src/Databases/MySQL/MySQLBinlogEventsDispatcher.cpp b/src/Databases/MySQL/MySQLBinlogEventsDispatcher.cpp index 4af307f9c0f9..eff2256286f0 100644 --- a/src/Databases/MySQL/MySQLBinlogEventsDispatcher.cpp +++ b/src/Databases/MySQL/MySQLBinlogEventsDispatcher.cpp @@ -1,6 +1,8 @@ #include "MySQLBinlogEventsDispatcher.h" #include #include +#include + namespace DB::ErrorCodes { diff --git a/src/Databases/MySQL/MySQLBinlogEventsDispatcher.h b/src/Databases/MySQL/MySQLBinlogEventsDispatcher.h index 433796970150..d60577b5923e 100644 --- a/src/Databases/MySQL/MySQLBinlogEventsDispatcher.h +++ b/src/Databases/MySQL/MySQLBinlogEventsDispatcher.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include diff --git a/src/Processors/Formats/Impl/ParallelParsingInputFormat.h b/src/Processors/Formats/Impl/ParallelParsingInputFormat.h index ff97afa83487..4bb11879f8ed 100644 --- a/src/Processors/Formats/Impl/ParallelParsingInputFormat.h +++ b/src/Processors/Formats/Impl/ParallelParsingInputFormat.h @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include #include #include