diff --git a/libs/core/synchronization/include/hpx/synchronization/condition_variable.hpp b/libs/core/synchronization/include/hpx/synchronization/condition_variable.hpp index d29c9ab67779..420b2da18efc 100644 --- a/libs/core/synchronization/include/hpx/synchronization/condition_variable.hpp +++ b/libs/core/synchronization/include/hpx/synchronization/condition_variable.hpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include @@ -158,7 +157,7 @@ namespace hpx { /// /// \returns \a notify_one returns \a void. /// - void notify_one(error_code& ec = throws) + void notify_one(error_code& ec = throws) const { std::unique_lock l(data_->mtx_); data_->cond_.notify_one(HPX_MOVE(l), ec); @@ -173,7 +172,7 @@ namespace hpx { /// /// \returns \a notify_all returns \a void. /// - void notify_all(error_code& ec = throws) + void notify_all(error_code& ec = throws) const { std::unique_lock l(data_->mtx_); data_->cond_.notify_all(HPX_MOVE(l), ec); @@ -214,8 +213,7 @@ namespace hpx { auto const data = data_; // keep data alive - util::ignore_all_while_checking const ignore_lock; - HPX_UNUSED(ignore_lock); + [[maybe_unused]] util::ignore_all_while_checking const ignore_lock; std::unique_lock l(data->mtx_); unlock_guard> unlock(lock); @@ -321,8 +319,7 @@ namespace hpx { auto const data = data_; // keep data alive - util::ignore_all_while_checking const ignore_lock; - HPX_UNUSED(ignore_lock); + [[maybe_unused]] util::ignore_all_while_checking const ignore_lock; std::unique_lock l(data->mtx_); unlock_guard> unlock(lock); @@ -620,7 +617,7 @@ namespace hpx { /// /// \returns \a notify_one returns \a void. /// - void notify_one(error_code& ec = throws) + void notify_one(error_code& ec = throws) const { std::unique_lock l(data_->mtx_); data_->cond_.notify_one(HPX_MOVE(l), ec); @@ -651,7 +648,7 @@ namespace hpx { /// /// \returns \a notify_all returns \a void. /// - void notify_all(error_code& ec = throws) + void notify_all(error_code& ec = throws) const { std::unique_lock l(data_->mtx_); data_->cond_.notify_all(HPX_MOVE(l), ec); @@ -702,8 +699,7 @@ namespace hpx { auto const data = data_; // keep data alive - util::ignore_all_while_checking const ignore_lock; - HPX_UNUSED(ignore_lock); + [[maybe_unused]] util::ignore_all_while_checking const ignore_lock; std::unique_lock l(data->mtx_); unlock_guard unlock(lock); @@ -819,8 +815,7 @@ namespace hpx { auto const data = data_; // keep data alive - util::ignore_all_while_checking const ignore_lock; - HPX_UNUSED(ignore_lock); + [[maybe_unused]] util::ignore_all_while_checking const ignore_lock; std::unique_lock l(data->mtx_); unlock_guard unlock(lock); @@ -1069,14 +1064,14 @@ namespace hpx { while (!pred()) { - util::ignore_all_while_checking const ignore_lock; - HPX_UNUSED(ignore_lock); + [[maybe_unused]] util::ignore_all_while_checking const + ignore_lock; std::unique_lock l(data->mtx_); if (stoken.stop_requested()) { // pred() has already evaluated to false since we last - // a acquired lock + // an acquired lock return false; } @@ -1164,8 +1159,8 @@ namespace hpx { { bool should_stop; { - util::ignore_all_while_checking const ignore_lock; - HPX_UNUSED(ignore_lock); + [[maybe_unused]] util::ignore_all_while_checking const + ignore_lock; std::unique_lock l(data->mtx_); if (stoken.stop_requested()) diff --git a/libs/core/synchronization/include/hpx/synchronization/detail/condition_variable.hpp b/libs/core/synchronization/include/hpx/synchronization/detail/condition_variable.hpp index d52159055602..169ac7e32439 100644 --- a/libs/core/synchronization/include/hpx/synchronization/detail/condition_variable.hpp +++ b/libs/core/synchronization/include/hpx/synchronization/detail/condition_variable.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2022 Hartmut Kaiser +// Copyright (c) 2007-2023 Hartmut Kaiser // Copyright (c) 2013-2015 Agustin Berge // // SPDX-License-Identifier: BSL-1.0 @@ -55,17 +55,31 @@ namespace hpx::lcos::local::detail { // Return false if no more threads are waiting (returns true if queue is // non-empty). - HPX_CORE_EXPORT bool notify_one(std::unique_lock lock, - threads::thread_priority priority, error_code& ec = throws); + HPX_CORE_EXPORT bool notify_one(std::unique_lock& lock, + threads::thread_priority priority, bool unlock, + error_code& ec = throws); HPX_CORE_EXPORT void notify_all(std::unique_lock lock, threads::thread_priority priority, error_code& ec = throws); + bool notify_one(std::unique_lock lock, + threads::thread_priority priority, error_code& ec = throws) + { + return notify_one(lock, priority, true, ec); + } + bool notify_one( std::unique_lock lock, error_code& ec = throws) { return notify_one( - HPX_MOVE(lock), threads::thread_priority::default_, ec); + lock, threads::thread_priority::default_, true, ec); + } + + bool notify_one_no_unlock( + std::unique_lock& lock, error_code& ec = throws) + { + return notify_one( + lock, threads::thread_priority::default_, false, ec); } void notify_all( diff --git a/libs/core/synchronization/include/hpx/synchronization/shared_mutex.hpp b/libs/core/synchronization/include/hpx/synchronization/shared_mutex.hpp index 24beb12e22db..1b3d5105ac93 100644 --- a/libs/core/synchronization/include/hpx/synchronization/shared_mutex.hpp +++ b/libs/core/synchronization/include/hpx/synchronization/shared_mutex.hpp @@ -11,226 +11,570 @@ #pragma once #include -#include -#include - +#include +#include +#include +#include +#include + +#include +#include #include -namespace hpx { +namespace hpx::detail { - namespace detail { + /////////////////////////////////////////////////////////////////////////// + template + struct shared_mutex_data + { + using mutex_type = Mutex; - template - class shared_mutex + HPX_HOST_DEVICE_CONSTEXPR shared_mutex_data() noexcept + : count_(1) { - private: - using mutex_type = Mutex; + } + + struct state_data + { + std::uint32_t shared_count; + std::uint8_t tag; // ABA protection + bool exclusive; + bool upgrade; + bool exclusive_waiting_blocked; + }; - struct state_data + struct shared_state + { + union { - unsigned shared_count; - bool exclusive; - bool upgrade; - bool exclusive_waiting_blocked; + std::uint64_t value = 0; + state_data data; }; - state_data state; - mutex_type state_change; - hpx::condition_variable shared_cond; - hpx::condition_variable exclusive_cond; - hpx::condition_variable upgrade_cond; + shared_state() = default; + }; - void release_waiters() - { - exclusive_cond.notify_one(); - shared_cond.notify_all(); - } + util::cache_aligned_data_derived> state; - public: - shared_mutex() - : state{0u, false, false, false} - { - } + using condition_variable = lcos::local::detail::condition_variable; - void lock_shared() - { - std::unique_lock lk(state_change); + util::cache_aligned_data_derived state_change; + util::cache_aligned_data_derived shared_cond; + util::cache_aligned_data_derived exclusive_cond; + util::cache_aligned_data_derived upgrade_cond; + + void release_waiters(std::unique_lock& lk) + { + exclusive_cond.notify_one_no_unlock(lk); + shared_cond.notify_all(HPX_MOVE(lk)); + } + + bool set_state(shared_state& s1, shared_state& s) noexcept + { + ++s.data.tag; + return s1.value == state.load(std::memory_order_relaxed).value && + state.compare_exchange_strong(s1, s, std::memory_order_release); + } + + bool set_state(shared_state& s1, shared_state& s, + std::unique_lock& lk) noexcept + { + if (s1.value != state.load(std::memory_order_relaxed).value) + return false; + + ++s.data.tag; - while (state.exclusive || state.exclusive_waiting_blocked) + lk = std::unique_lock(state_change); + if (state.compare_exchange_strong(s1, s, std::memory_order_release)) + return true; + + lk.unlock(); + return false; + } + + void lock_shared() + { + while (true) + { + auto s = state.load(std::memory_order_acquire); + while (s.data.exclusive || s.data.exclusive_waiting_blocked) { - shared_cond.wait(lk); + { + std::unique_lock lk(state_change); + shared_cond.wait(lk); + } + + s = state.load(std::memory_order_acquire); } - ++state.shared_count; + auto s1 = s; + + ++s.data.shared_count; + if (set_state(s1, s)) + { + break; + } } + } - bool try_lock_shared() + bool try_lock_shared() + { + while (true) { - std::unique_lock lk(state_change); - - if (state.exclusive || state.exclusive_waiting_blocked) + auto s = state.load(std::memory_order_acquire); + if (s.data.exclusive || s.data.exclusive_waiting_blocked) + { return false; + } - else + auto s1 = s; + + ++s.data.shared_count; + if (set_state(s1, s)) { - ++state.shared_count; - return true; + break; } } + return true; + } - void unlock_shared() + void unlock_shared() + { + while (true) { - std::unique_lock lk(state_change); + auto s = state.load(std::memory_order_acquire); + auto s1 = s; - if (/*bool const last_reader = */ !--state.shared_count) + if (--s.data.shared_count == 0) { - if (state.upgrade) + if (s.data.upgrade) { - state.upgrade = false; - state.exclusive = true; - - upgrade_cond.notify_one(); + s.data.upgrade = false; + s.data.exclusive = true; + + std::unique_lock lk; + if (set_state(s1, s, lk)) + { + HPX_ASSERT_OWNS_LOCK(lk); + upgrade_cond.notify_one_no_unlock(lk); + release_waiters(lk); + break; + } } else { - state.exclusive_waiting_blocked = false; + s.data.exclusive_waiting_blocked = false; + + std::unique_lock lk; + if (set_state(s1, s, lk)) + { + HPX_ASSERT_OWNS_LOCK(lk); + release_waiters(lk); + break; + } } - - release_waiters(); + } + else if (set_state(s1, s)) + { + break; } } + } - void lock() + void lock() + { + while (true) { - std::unique_lock lk(state_change); - - while (state.shared_count || state.exclusive) + auto s = state.load(std::memory_order_acquire); + while (s.data.shared_count != 0 || s.data.exclusive) { - state.exclusive_waiting_blocked = true; - exclusive_cond.wait(lk); + auto s1 = s; + + s.data.exclusive_waiting_blocked = true; + std::unique_lock lk; + if (set_state(s1, s, lk)) + { + HPX_ASSERT_OWNS_LOCK(lk); + exclusive_cond.wait(lk); + } + + s = state.load(std::memory_order_acquire); } - state.exclusive = true; + auto s1 = s; + + s.data.exclusive = true; + if (set_state(s1, s)) + { + break; + } } + } - bool try_lock() + bool try_lock() + { + while (true) { - std::unique_lock lk(state_change); - - if (state.shared_count || state.exclusive) + auto s = state.load(std::memory_order_acquire); + if (s.data.shared_count || s.data.exclusive) + { return false; + } - else + auto s1 = s; + + s.data.exclusive = true; + if (set_state(s1, s)) { - state.exclusive = true; - return true; + break; } } + return true; + } - void unlock() + void unlock() + { + while (true) { - std::unique_lock lk(state_change); - state.exclusive = false; - state.exclusive_waiting_blocked = false; - release_waiters(); + auto s = state.load(std::memory_order_acquire); + auto s1 = s; + + s.data.exclusive = false; + s.data.exclusive_waiting_blocked = false; + + std::unique_lock lk; + if (set_state(s1, s, lk)) + { + HPX_ASSERT_OWNS_LOCK(lk); + release_waiters(lk); + break; + } } + } - void lock_upgrade() + void lock_upgrade() + { + while (true) { - std::unique_lock lk(state_change); - - while (state.exclusive || state.exclusive_waiting_blocked || - state.upgrade) + auto s = state.load(std::memory_order_acquire); + while (s.data.exclusive || s.data.exclusive_waiting_blocked || + s.data.upgrade) { - shared_cond.wait(lk); + { + std::unique_lock lk(state_change); + shared_cond.wait(lk); + } + + s = state.load(std::memory_order_acquire); } - ++state.shared_count; - state.upgrade = true; + auto s1 = s; + + ++s.data.shared_count = true; + s.data.upgrade = true; + if (set_state(s1, s)) + { + break; + } } + } - bool try_lock_upgrade() + bool try_lock_upgrade() + { + while (true) { - std::unique_lock lk(state_change); - - if (state.exclusive || state.exclusive_waiting_blocked || - state.upgrade) + auto s = state.load(std::memory_order_acquire); + if (s.data.exclusive || s.data.exclusive_waiting_blocked || + s.data.upgrade) { return false; } - ++state.shared_count; - state.upgrade = true; - return true; + auto s1 = s; + + ++s.data.shared_count; + s.data.upgrade = true; + if (set_state(s1, s)) + { + break; + } } + return true; + } - void unlock_upgrade() + void unlock_upgrade() + { + while (true) { - std::unique_lock lk(state_change); - state.upgrade = false; + auto s = state.load(std::memory_order_acquire); + auto s1 = s; + + bool release = false; + s.data.upgrade = false; + if (--s.data.shared_count == 0) + { + s.data.exclusive_waiting_blocked = false; + release = true; + } - if (/*bool const last_reader = */ !--state.shared_count) + if (release) { - state.exclusive_waiting_blocked = false; - release_waiters(); + std::unique_lock lk; + if (set_state(s1, s, lk)) + { + HPX_ASSERT_OWNS_LOCK(lk); + release_waiters(lk); + break; + } + } + else if (set_state(s1, s)) + { + break; } } + } - void unlock_upgrade_and_lock() + void unlock_upgrade_and_lock() + { + while (true) { - std::unique_lock lk(state_change); - --state.shared_count; + auto s = state.load(std::memory_order_acquire); + auto s1 = s; - while (state.shared_count) + --s.data.shared_count; + if (!set_state(s1, s)) { - upgrade_cond.wait(lk); + continue; + } + + s = state.load(std::memory_order_acquire); + while (s.data.shared_count != 0) + { + { + std::unique_lock lk(state_change); + upgrade_cond.wait(lk); + } + s = state.load(std::memory_order_acquire); } - state.upgrade = false; - state.exclusive = true; + s1 = s; + + s.data.upgrade = false; + s.data.exclusive = true; + if (set_state(s1, s)) + { + break; + } } + } - void unlock_and_lock_upgrade() + void unlock_and_lock_upgrade() + { + while (true) { - std::unique_lock lk(state_change); - state.exclusive = false; - state.upgrade = true; - ++state.shared_count; - state.exclusive_waiting_blocked = false; - release_waiters(); + auto s = state.load(std::memory_order_acquire); + auto s1 = s; + + s.data.exclusive = false; + s.data.exclusive_waiting_blocked = false; + s.data.upgrade = true; + ++s.data.shared_count; + + std::unique_lock lk; + if (set_state(s1, s, lk)) + { + HPX_ASSERT_OWNS_LOCK(lk); + release_waiters(lk); + break; + } } + } - void unlock_and_lock_shared() + void unlock_and_lock_shared() + { + while (true) { - std::unique_lock lk(state_change); - state.exclusive = false; - ++state.shared_count; - state.exclusive_waiting_blocked = false; - release_waiters(); + auto s = state.load(std::memory_order_acquire); + auto s1 = s; + + s.data.exclusive = false; + s.data.exclusive_waiting_blocked = false; + ++s.data.shared_count; + + std::unique_lock lk; + if (set_state(s1, s, lk)) + { + HPX_ASSERT_OWNS_LOCK(lk); + release_waiters(lk); + break; + } } + } - bool try_unlock_shared_and_lock() + bool try_unlock_shared_and_lock() + { + while (true) { - std::unique_lock lk(state_change); - if (!state.exclusive && !state.exclusive_waiting_blocked && - !state.upgrade && state.shared_count == 1) + auto s = state.load(std::memory_order_acquire); + if (s.data.exclusive || s.data.exclusive_waiting_blocked || + s.data.upgrade || s.data.shared_count == 1) + { + return false; + } + + auto s1 = s; + + s.data.shared_count = 0; + s.data.exclusive = true; + if (set_state(s1, s)) { - state.shared_count = 0; - state.exclusive = true; - return true; + break; } - return false; } + return true; + } - void unlock_upgrade_and_lock_shared() + void unlock_upgrade_and_lock_shared() + { + while (true) { - std::unique_lock lk(state_change); - state.upgrade = false; - state.exclusive_waiting_blocked = false; - release_waiters(); + auto s = state.load(std::memory_order_acquire); + auto s1 = s; + + s.data.exclusive_waiting_blocked = false; + s.data.upgrade = false; + + std::unique_lock lk; + if (set_state(s1, s, lk)) + { + HPX_ASSERT_OWNS_LOCK(lk); + release_waiters(lk); + break; + } } - }; - } // namespace detail + } + + private: + friend void intrusive_ptr_add_ref(shared_mutex_data* p) noexcept + { + ++p->count_; + } + + friend void intrusive_ptr_release(shared_mutex_data* p) noexcept + { + if (0 == --p->count_) + { + delete p; + } + } + + hpx::util::atomic_count count_; + }; + + template + class shared_mutex + { + private: + using mutex_type = Mutex; + + using data_type = hpx::intrusive_ptr>; + hpx::util::cache_aligned_data_derived data_; + + using shared_state = typename shared_mutex_data::shared_state; + + public: + shared_mutex() + : data_(new shared_mutex_data, false) + { + } + + void lock_shared() + { + auto data = data_; + data->lock_shared(); + } + + bool try_lock_shared() + { + auto data = data_; + return data->try_lock_shared(); + } + + void unlock_shared() + { + auto data = data_; + data->unlock_shared(); + } + + void lock() + { + auto data = data_; + data->lock(); + } + + bool try_lock() + { + auto data = data_; + return data->try_lock(); + } + + void unlock() + { + auto data = data_; + data->unlock(); + } + + void lock_upgrade() + { + auto data = data_; + data->lock_upgrade(); + } + + bool try_lock_upgrade() + { + auto data = data_; + return data->try_lock_upgrade(); + } + + void unlock_upgrade() + { + auto data = data_; + data->unlock_upgrade(); + } + + void unlock_upgrade_and_lock() + { + auto data = data_; + data->unlock_upgrade_and_lock(); + } + + void unlock_and_lock_upgrade() + { + auto data = data_; + data->unlock_and_lock_upgrade(); + } + + void unlock_and_lock_shared() + { + auto data = data_; + data->unlock_and_lock_shared(); + } + + bool try_unlock_shared_and_lock() + { + auto data = data_; + return data->try_unlock_shared_and_lock(); + } + + void unlock_upgrade_and_lock_shared() + { + auto data = data_; + data->unlock_upgrade_and_lock_shared(); + } + }; +} // namespace hpx::detail + +namespace hpx { /// The \a shared_mutex class is a synchronization primitive that can be /// used to protect shared data from being simultaneously accessed by diff --git a/libs/core/synchronization/src/detail/condition_variable.cpp b/libs/core/synchronization/src/detail/condition_variable.cpp index 7b86274f2af0..3db9fa72b0c6 100644 --- a/libs/core/synchronization/src/detail/condition_variable.cpp +++ b/libs/core/synchronization/src/detail/condition_variable.cpp @@ -108,8 +108,8 @@ namespace hpx::lcos::local::detail { // Return false if no more threads are waiting (returns true if queue // is non-empty). - bool condition_variable::notify_one(std::unique_lock lock, - threads::thread_priority /* priority */, error_code& ec) + bool condition_variable::notify_one(std::unique_lock& lock, + threads::thread_priority priority, bool unlock, error_code& ec) { // Caller failing to hold lock 'lock' before calling function #if defined(HPX_MSVC) @@ -138,7 +138,8 @@ namespace hpx::lcos::local::detail { } bool const not_empty = !queue_.empty(); - lock.unlock(); + if (unlock) + lock.unlock(); ctx.resume(); @@ -148,6 +149,9 @@ namespace hpx::lcos::local::detail { if (&ec != &throws) ec = make_success_code(); + if (unlock) + lock.unlock(); + return false; #if defined(HPX_MSVC) diff --git a/libs/core/synchronization/tests/unit/shared_mutex/shared_mutex1.cpp b/libs/core/synchronization/tests/unit/shared_mutex/shared_mutex1.cpp index a36f3cf9d3af..9994085ce797 100644 --- a/libs/core/synchronization/tests/unit/shared_mutex/shared_mutex1.cpp +++ b/libs/core/synchronization/tests/unit/shared_mutex/shared_mutex1.cpp @@ -31,24 +31,25 @@ void test_multiple_readers() { - typedef hpx::shared_mutex shared_mutex_type; - typedef hpx::mutex mutex_type; + using shared_mutex_type = hpx::shared_mutex; + using mutex_type = hpx::mutex; - unsigned const number_of_threads = 10; + constexpr unsigned number_of_threads = 10; test::thread_group pool; - hpx::shared_mutex rw_mutex; - unsigned unblocked_count = 0; - unsigned simultaneous_running_count = 0; unsigned max_simultaneous_running = 0; mutex_type unblocked_count_mutex; - hpx::condition_variable unblocked_condition; mutex_type finish_mutex; std::unique_lock finish_lock(finish_mutex); try { + hpx::shared_mutex rw_mutex; + unsigned unblocked_count = 0; + unsigned simultaneous_running_count = 0; + hpx::condition_variable unblocked_condition; + for (unsigned i = 0; i != number_of_threads; ++i) { pool.create_thread( @@ -86,24 +87,25 @@ void test_multiple_readers() void test_only_one_writer_permitted() { - typedef hpx::shared_mutex shared_mutex_type; - typedef hpx::mutex mutex_type; + using shared_mutex_type = hpx::shared_mutex; + using mutex_type = hpx::mutex; - unsigned const number_of_threads = 10; + constexpr unsigned number_of_threads = 10; test::thread_group pool; - hpx::shared_mutex rw_mutex; unsigned unblocked_count = 0; - unsigned simultaneous_running_count = 0; unsigned max_simultaneous_running = 0; mutex_type unblocked_count_mutex; - hpx::condition_variable unblocked_condition; mutex_type finish_mutex; std::unique_lock finish_lock(finish_mutex); try { + hpx::shared_mutex rw_mutex; + unsigned simultaneous_running_count = 0; + hpx::condition_variable unblocked_condition; + for (unsigned i = 0; i != number_of_threads; ++i) { pool.create_thread( @@ -135,22 +137,23 @@ void test_only_one_writer_permitted() void test_reader_blocks_writer() { - typedef hpx::shared_mutex shared_mutex_type; - typedef hpx::mutex mutex_type; + using shared_mutex_type = hpx::shared_mutex; + using mutex_type = hpx::mutex; test::thread_group pool; - hpx::shared_mutex rw_mutex; unsigned unblocked_count = 0; - unsigned simultaneous_running_count = 0; unsigned max_simultaneous_running = 0; mutex_type unblocked_count_mutex; - hpx::condition_variable unblocked_condition; mutex_type finish_mutex; std::unique_lock finish_lock(finish_mutex); try { + hpx::shared_mutex rw_mutex; + unsigned simultaneous_running_count = 0; + hpx::condition_variable unblocked_condition; + pool.create_thread( test::locking_thread>(rw_mutex, unblocked_count, unblocked_count_mutex, unblocked_condition, @@ -195,25 +198,26 @@ void test_reader_blocks_writer() void test_unlocking_writer_unblocks_all_readers() { - typedef hpx::shared_mutex shared_mutex_type; - typedef hpx::mutex mutex_type; + using shared_mutex_type = hpx::shared_mutex; + using mutex_type = hpx::mutex; test::thread_group pool; hpx::shared_mutex rw_mutex; std::unique_lock write_lock(rw_mutex); - unsigned unblocked_count = 0; - unsigned simultaneous_running_count = 0; unsigned max_simultaneous_running = 0; mutex_type unblocked_count_mutex; - hpx::condition_variable unblocked_condition; mutex_type finish_mutex; std::unique_lock finish_lock(finish_mutex); - unsigned const reader_count = 10; + constexpr unsigned reader_count = 10; try { + unsigned unblocked_count = 0; + unsigned simultaneous_running_count = 0; + hpx::condition_variable unblocked_condition; + for (unsigned i = 0; i != reader_count; ++i) { pool.create_thread( @@ -257,29 +261,30 @@ void test_unlocking_writer_unblocks_all_readers() void test_unlocking_last_reader_only_unblocks_one_writer() { - typedef hpx::shared_mutex shared_mutex_type; - typedef hpx::mutex mutex_type; + using shared_mutex_type = hpx::shared_mutex; + using mutex_type = hpx::mutex; test::thread_group pool; - hpx::shared_mutex rw_mutex; unsigned unblocked_count = 0; - unsigned simultaneous_running_readers = 0; unsigned max_simultaneous_readers = 0; - unsigned simultaneous_running_writers = 0; unsigned max_simultaneous_writers = 0; mutex_type unblocked_count_mutex; - hpx::condition_variable unblocked_condition; mutex_type finish_reading_mutex; std::unique_lock finish_reading_lock(finish_reading_mutex); mutex_type finish_writing_mutex; std::unique_lock finish_writing_lock(finish_writing_mutex); - unsigned const reader_count = 10; - unsigned const writer_count = 10; + constexpr unsigned reader_count = 10; + constexpr unsigned writer_count = 10; try { + hpx::shared_mutex rw_mutex; + unsigned simultaneous_running_readers = 0; + unsigned simultaneous_running_writers = 0; + hpx::condition_variable unblocked_condition; + for (unsigned i = 0; i != reader_count; ++i) { pool.create_thread( @@ -360,7 +365,7 @@ int hpx_main() int main(int argc, char* argv[]) { - // By default this test should run on all available cores + // By default, this test should run on all available cores std::vector const cfg = {"hpx.os_threads=all"}; // Initialize and run HPX diff --git a/libs/core/synchronization/tests/unit/shared_mutex/shared_mutex2.cpp b/libs/core/synchronization/tests/unit/shared_mutex/shared_mutex2.cpp index 9a0cf5fe6ed5..b9ee89713411 100644 --- a/libs/core/synchronization/tests/unit/shared_mutex/shared_mutex2.cpp +++ b/libs/core/synchronization/tests/unit/shared_mutex/shared_mutex2.cpp @@ -1,5 +1,5 @@ // (C) Copyright 2006-7 Anthony Williams -// Copyright (c) 2015-2022 Hartmut Kaiser +// Copyright (c) 2015-2023 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See @@ -29,24 +29,25 @@ void test_only_one_upgrade_lock_permitted() { - typedef hpx::shared_mutex shared_mutex_type; - typedef hpx::mutex mutex_type; + using shared_mutex_type = hpx::shared_mutex; + using mutex_type = hpx::mutex; - unsigned const number_of_threads = 2; + constexpr unsigned number_of_threads = 2; test::thread_group pool; - shared_mutex_type rw_mutex; unsigned unblocked_count = 0; - unsigned simultaneous_running_count = 0; unsigned max_simultaneous_running = 0; mutex_type unblocked_count_mutex; - hpx::condition_variable unblocked_condition; mutex_type finish_mutex; std::unique_lock finish_lock(finish_mutex); try { + shared_mutex_type rw_mutex; + unsigned simultaneous_running_count = 0; + hpx::condition_variable unblocked_condition; + for (unsigned i = 0; i != number_of_threads; ++i) { pool.create_thread( @@ -78,24 +79,25 @@ void test_only_one_upgrade_lock_permitted() void test_can_lock_upgrade_if_currently_locked_shared() { - typedef hpx::shared_mutex shared_mutex_type; - typedef hpx::mutex mutex_type; + using shared_mutex_type = hpx::shared_mutex; + using mutex_type = hpx::mutex; test::thread_group pool; - shared_mutex_type rw_mutex; unsigned unblocked_count = 0; - unsigned simultaneous_running_count = 0; unsigned max_simultaneous_running = 0; mutex_type unblocked_count_mutex; - hpx::condition_variable unblocked_condition; mutex_type finish_mutex; std::unique_lock finish_lock(finish_mutex); - unsigned const reader_count = 10; + constexpr unsigned reader_count = 10; try { + shared_mutex_type rw_mutex; + unsigned simultaneous_running_count = 0; + hpx::condition_variable unblocked_condition; + for (unsigned i = 0; i != reader_count; ++i) { pool.create_thread( @@ -143,18 +145,18 @@ void test_can_lock_upgrade_if_currently_locked_shared() void test_can_lock_upgrade_to_unique_if_currently_locked_upgrade() { - typedef hpx::shared_mutex shared_mutex_type; + using shared_mutex_type = hpx::shared_mutex; shared_mutex_type mtx; hpx::upgrade_lock l(mtx); - hpx::upgrade_to_unique_lock ul(l); + hpx::upgrade_to_unique_lock const ul(l); HPX_TEST(ul.owns_lock()); } void test_if_other_thread_has_write_lock_try_lock_shared_returns_false() { - typedef hpx::shared_mutex shared_mutex_type; - typedef hpx::mutex mutex_type; + using shared_mutex_type = hpx::shared_mutex; + using mutex_type = hpx::mutex; shared_mutex_type rw_mutex; mutex_type finish_mutex; @@ -181,8 +183,8 @@ void test_if_other_thread_has_write_lock_try_lock_shared_returns_false() void test_if_other_thread_has_write_lock_try_lock_upgrade_returns_false() { - typedef hpx::shared_mutex shared_mutex_type; - typedef hpx::mutex mutex_type; + using shared_mutex_type = hpx::shared_mutex; + using mutex_type = hpx::mutex; shared_mutex_type rw_mutex; mutex_type finish_mutex; @@ -209,7 +211,7 @@ void test_if_other_thread_has_write_lock_try_lock_upgrade_returns_false() void test_if_no_thread_has_lock_try_lock_shared_returns_true() { - typedef hpx::shared_mutex shared_mutex_type; + using shared_mutex_type = hpx::shared_mutex; shared_mutex_type rw_mutex; bool const try_succeeded = rw_mutex.try_lock_shared(); @@ -222,7 +224,7 @@ void test_if_no_thread_has_lock_try_lock_shared_returns_true() void test_if_no_thread_has_lock_try_lock_upgrade_returns_true() { - typedef hpx::shared_mutex shared_mutex_type; + using shared_mutex_type = hpx::shared_mutex; shared_mutex_type rw_mutex; bool const try_succeeded = rw_mutex.try_lock_upgrade(); @@ -235,8 +237,8 @@ void test_if_no_thread_has_lock_try_lock_upgrade_returns_true() void test_if_other_thread_has_shared_lock_try_lock_shared_returns_true() { - typedef hpx::shared_mutex shared_mutex_type; - typedef hpx::mutex mutex_type; + using shared_mutex_type = hpx::shared_mutex; + using mutex_type = hpx::mutex; shared_mutex_type rw_mutex; mutex_type finish_mutex; @@ -263,8 +265,8 @@ void test_if_other_thread_has_shared_lock_try_lock_shared_returns_true() void test_if_other_thread_has_shared_lock_try_lock_upgrade_returns_true() { - typedef hpx::shared_mutex shared_mutex_type; - typedef hpx::mutex mutex_type; + using shared_mutex_type = hpx::shared_mutex; + using mutex_type = hpx::mutex; shared_mutex_type rw_mutex; mutex_type finish_mutex; @@ -307,7 +309,7 @@ int hpx_main() int main(int argc, char* argv[]) { - // By default this test should run on all available cores + // By default, this test should run on all available cores std::vector const cfg = {"hpx.os_threads=all"}; // Initialize and run HPX diff --git a/libs/core/synchronization/tests/unit/shared_mutex/shared_mutex_locking_thread.hpp b/libs/core/synchronization/tests/unit/shared_mutex/shared_mutex_locking_thread.hpp index 8e593a57d5ad..f872454ab898 100644 --- a/libs/core/synchronization/tests/unit/shared_mutex/shared_mutex_locking_thread.hpp +++ b/libs/core/synchronization/tests/unit/shared_mutex/shared_mutex_locking_thread.hpp @@ -1,5 +1,5 @@ // (C) Copyright 2008 Anthony Williams -// Copyright (c) 2015-2022 Hartmut Kaiser +// Copyright (c) 2015-2023 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See @@ -45,7 +45,7 @@ namespace test { { } - void operator()() + void operator()() const { // acquire lock Lock lock(rw_mutex); @@ -93,7 +93,7 @@ namespace test { { } - void operator()() + void operator()() const { std::unique_lock lk(rwm); { @@ -124,7 +124,7 @@ namespace test { { } - void operator()() + void operator()() const { std::shared_lock lk(rwm); { diff --git a/libs/core/synchronization/tests/unit/shared_mutex/thread_group.hpp b/libs/core/synchronization/tests/unit/shared_mutex/thread_group.hpp index f98a708af8e9..a95781b36eb1 100644 --- a/libs/core/synchronization/tests/unit/shared_mutex/thread_group.hpp +++ b/libs/core/synchronization/tests/unit/shared_mutex/thread_group.hpp @@ -1,5 +1,5 @@ // (C) Copyright 2007-9 Anthony Williams -// Copyright (c) 2015-2022 Hartmut Kaiser +// Copyright (c) 2015-2023 Hartmut Kaiser // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See @@ -27,10 +27,11 @@ #endif namespace test { + class thread_group { private: - typedef hpx::shared_mutex mutex_type; + using mutex_type = hpx::shared_mutex; public: thread_group() {} @@ -40,16 +41,16 @@ namespace test { ~thread_group() { - for (hpx::thread* t : threads) + for (hpx::thread const* t : threads) delete t; } private: - bool is_this_thread_in() + bool is_this_thread_in() const { - hpx::thread::id id = hpx::this_thread::get_id(); + hpx::thread::id const id = hpx::this_thread::get_id(); std::shared_lock guard(mtx_); - for (hpx::thread* t : threads) + for (hpx::thread const* t : threads) { if (t->get_id() == id) return true; @@ -57,14 +58,14 @@ namespace test { return false; } - bool is_thread_in(hpx::thread* thrd) + bool is_thread_in(hpx::thread const* thrd) const { if (!thrd) return false; - hpx::thread::id id = thrd->get_id(); + hpx::thread::id const id = thrd->get_id(); std::shared_lock guard(mtx_); - for (hpx::thread* t : threads) + for (hpx::thread const* t : threads) { if (t->get_id() == id) return true; @@ -93,7 +94,6 @@ namespace test { "thread_group::add_thread", "resource_deadlock_would_occur: trying to add a " "duplicated thread"); - return; }; std::lock_guard guard(mtx_); @@ -101,7 +101,7 @@ namespace test { } } - void remove_thread(hpx::thread* thrd) + void remove_thread(hpx::thread const* thrd) { std::lock_guard guard(mtx_); std::list::iterator const it = @@ -111,14 +111,13 @@ namespace test { threads.erase(it); } - void join_all() + void join_all() const { if (is_this_thread_in()) { HPX_THROW_EXCEPTION(hpx::error::thread_resource_error, "thread_group::join_all", "resource_deadlock_would_occur: trying joining itself"); - return; } std::shared_lock guard(mtx_); @@ -129,7 +128,7 @@ namespace test { } } - void interrupt_all() + void interrupt_all() const { std::shared_lock guard(mtx_); for (hpx::thread* t : threads) diff --git a/libs/full/components_base/include/hpx/components_base/server/one_size_heap_list.hpp b/libs/full/components_base/include/hpx/components_base/server/one_size_heap_list.hpp index 06715a58a00e..2ea37a40956e 100644 --- a/libs/full/components_base/include/hpx/components_base/server/one_size_heap_list.hpp +++ b/libs/full/components_base/include/hpx/components_base/server/one_size_heap_list.hpp @@ -1,4 +1,4 @@ -// Copyright (c) 1998-2017 Hartmut Kaiser +// Copyright (c) 1998-2023 Hartmut Kaiser // Copyright (c) 2011 Bryce Lelbach // // SPDX-License-Identifier: BSL-1.0 @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -30,7 +31,7 @@ namespace hpx { namespace util { using iterator = typename list_type::iterator; using const_iterator = typename list_type::const_iterator; - using mutex_type = hpx::spinlock; + using mutex_type = hpx::shared_mutex; using heap_parameters = wrapper_heap_base::heap_parameters; @@ -107,8 +108,7 @@ namespace hpx { namespace util { std::string name() const; protected: -// mutable mutex_type mtx_; - mutable pthread_rwlock_t rwlock; + mutable mutex_type rwlock_; list_type heap_list_; private: diff --git a/libs/full/components_base/include/hpx/components_base/server/wrapper_heap.hpp b/libs/full/components_base/include/hpx/components_base/server/wrapper_heap.hpp index 9c820e7c0c92..130890293b52 100644 --- a/libs/full/components_base/include/hpx/components_base/server/wrapper_heap.hpp +++ b/libs/full/components_base/include/hpx/components_base/server/wrapper_heap.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -78,7 +79,7 @@ namespace hpx::components::detail { public: explicit wrapper_heap(char const* class_name, std::size_t count, - heap_parameters parameters); + heap_parameters const& parameters); wrapper_heap(); ~wrapper_heap() override; @@ -112,11 +113,11 @@ namespace hpx::components::detail { protected: char* pool_; heap_parameters const parameters_; - alignas(64) std::atomic first_free_; - alignas(64) std::atomic free_size_; + util::cache_aligned_data_derived> first_free_; + util::cache_aligned_data_derived> free_size_; // these values are used for AGAS registration of all elements of this // managed_component heap - alignas(64) mutable mutex_type mtx_; + mutable util::cache_aligned_data_derived mtx_; naming::gid_type base_gid_; public: @@ -158,7 +159,7 @@ namespace hpx::components::detail { using value_type = T; explicit fixed_wrapper_heap(char const* class_name, std::size_t count, - heap_parameters parameters) + heap_parameters const& parameters) : base_type(class_name, count, parameters) { } diff --git a/libs/full/components_base/include/hpx/components_base/server/wrapper_heap_list.hpp b/libs/full/components_base/include/hpx/components_base/server/wrapper_heap_list.hpp index 544a6b3da16e..e83037233b33 100644 --- a/libs/full/components_base/include/hpx/components_base/server/wrapper_heap_list.hpp +++ b/libs/full/components_base/include/hpx/components_base/server/wrapper_heap_list.hpp @@ -9,8 +9,11 @@ #include #include #include +#include #include +#include +#include #include /////////////////////////////////////////////////////////////////////////////// @@ -25,7 +28,7 @@ namespace hpx::components::detail { using value_type = typename Heap::value_type; using storage_type = std::aligned_storage_t::value>; + std::alignment_of_v>; enum { @@ -44,27 +47,23 @@ namespace hpx::components::detail { get_component_type()), base_type::heap_parameters{ heap_capacity, heap_element_alignment, heap_element_size}, - (Heap*) nullptr) + static_cast(nullptr)) , type_(get_component_type()) { } naming::gid_type get_gid(void* p) { - pthread_rwlock_rdlock(&rwlock); + std::shared_lock sl(rwlock_); - using iterator = typename base_type::const_iterator; - - iterator end = heap_list_.end(); - for (iterator it = heap_list_.begin(); it != end; ++it) + auto const end = heap_list_.end(); + for (auto it = heap_list_.begin(); it != end; ++it) { if ((*it)->did_alloc(p)) { - pthread_rwlock_unlock(&rwlock); return (*it)->get_gid(p, type_); } } - pthread_rwlock_unlock(&rwlock); return naming::invalid_gid; } diff --git a/libs/full/components_base/src/server/one_size_heap_list.cpp b/libs/full/components_base/src/server/one_size_heap_list.cpp index 3033e088cf25..bcf5ab5dc3eb 100644 --- a/libs/full/components_base/src/server/one_size_heap_list.cpp +++ b/libs/full/components_base/src/server/one_size_heap_list.cpp @@ -1,4 +1,4 @@ -// Copyright (c) 1998-2021 Hartmut Kaiser +// Copyright (c) 1998-2023 Hartmut Kaiser // Copyright (c) 2011 Bryce Lelbach // // SPDX-License-Identifier: BSL-1.0 @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -24,6 +25,7 @@ #include #include #include +#include #include namespace hpx { namespace util { @@ -55,37 +57,35 @@ namespace hpx { namespace util { void* p = nullptr; - pthread_rwlock_rdlock(&rwlock); - - if (!heap_list_.empty()) { - for (auto& heap : heap_list_) - { - bool allocated = heap->alloc(&p, count); + std::shared_lock sl(rwlock_); - if (allocated) + if (!heap_list_.empty()) + { + for (auto& heap : heap_list_) { + if (heap->alloc(&p, count)) + { #if defined(HPX_DEBUG) - // Allocation succeeded, update statistics. - alloc_count_ += count; - if (alloc_count_ - free_count_ > max_alloc_count_) - max_alloc_count_ = alloc_count_ - free_count_; + // Allocation succeeded, update statistics. + alloc_count_ += count; + if (alloc_count_ - free_count_ > max_alloc_count_) + max_alloc_count_ = alloc_count_ - free_count_; #endif - pthread_rwlock_unlock(&rwlock); - return p; - } + return p; + } #if defined(HPX_DEBUG) - LOSH_(info).format( - "{1}::alloc: failed to allocate from heap[{2}] " - "(heap[{2}] has allocated {3} objects and has " - "space for {4} more objects)", - name(), heap->heap_count(), heap->size(), - heap->free_size()); + LOSH_(info).format( + "{1}::alloc: failed to allocate from heap[{2}] " + "(heap[{2}] has allocated {3} objects and has " + "space for {4} more objects)", + name(), heap->heap_count(), heap->size(), + heap->free_size()); #endif + } } } - pthread_rwlock_unlock(&rwlock); // Create new heap. bool result = false; @@ -98,11 +98,10 @@ namespace hpx { namespace util { result = heap->alloc((void**) &p, count); // Add the heap into the list -// mtx_.lock(); - pthread_rwlock_wrlock(&rwlock); - heap_list_.push_front(heap); - pthread_rwlock_unlock(&rwlock); -// mtx_.unlock(); + { + std::unique_lock ul(rwlock_); + heap_list_.push_front(heap); + } if (HPX_UNLIKELY(!result || nullptr == p)) { @@ -149,24 +148,22 @@ namespace hpx { namespace util { if (reschedule(p, count)) return; -// mtx_.lock(); - pthread_rwlock_rdlock(&rwlock); -// mtx_.unlock(); - // Find the heap which allocated this pointer. - for (auto& heap : heap_list_) { - bool did_allocate = heap->did_alloc(p); - if (did_allocate) + std::shared_lock sl(rwlock_); + + // Find the heap which allocated this pointer. + for (auto const& heap : heap_list_) { - heap->free(p, count); + if (heap->did_alloc(p)) + { + heap->free(p, count); #if defined(HPX_DEBUG) - free_count_ += count; + free_count_ += count; #endif - pthread_rwlock_unlock(&rwlock); - return; + return; + } } } - pthread_rwlock_unlock(&rwlock); HPX_THROW_EXCEPTION(hpx::error::bad_parameter, name() + "::free", "pointer {1} was not allocated by this {2}", p, name()); @@ -174,16 +171,14 @@ namespace hpx { namespace util { bool one_size_heap_list::did_alloc(void* p) const { - pthread_rwlock_rdlock(&rwlock); - for (typename list_type::value_type const& heap : heap_list_) + std::shared_lock sl(rwlock_); + for (auto const& heap : heap_list_) { if (heap->did_alloc(p)) { - pthread_rwlock_unlock(&rwlock); return true; } } - pthread_rwlock_unlock(&rwlock); return false; } @@ -191,7 +186,7 @@ namespace hpx { namespace util { { if (class_name_.empty()) { - return std::string("one_size_heap_list(unknown)"); + return {"one_size_heap_list(unknown)"}; } return std::string("one_size_heap_list(") + class_name_ + ")"; } diff --git a/libs/full/components_base/src/server/wrapper_heap.cpp b/libs/full/components_base/src/server/wrapper_heap.cpp index 76f6480cece5..354d0590eff1 100644 --- a/libs/full/components_base/src/server/wrapper_heap.cpp +++ b/libs/full/components_base/src/server/wrapper_heap.cpp @@ -67,7 +67,7 @@ namespace hpx::components::detail { /////////////////////////////////////////////////////////////////////////// wrapper_heap::wrapper_heap(char const* class_name, - [[maybe_unused]] std::size_t count, heap_parameters parameters) + [[maybe_unused]] std::size_t count, heap_parameters const& parameters) : pool_(nullptr) , parameters_(parameters) , first_free_(nullptr) @@ -88,15 +88,15 @@ namespace hpx::components::detail { { throw std::bad_alloc(); } + // use the pool's base address as the first gid, this will also // allow for the ids to be locally resolvable base_gid_ = naming::replace_locality_id( - naming::replace_component_type( - naming::gid_type(pool_), 0), + naming::replace_component_type(naming::gid_type(pool_), 0), agas::get_locality_id()); naming::detail::set_credit_for_gid( - base_gid_, std::int64_t(HPX_GLOBALCREDIT_INITIAL)); + base_gid_, static_cast(HPX_GLOBALCREDIT_INITIAL)); } wrapper_heap::wrapper_heap() @@ -177,7 +177,9 @@ namespace hpx::components::detail { alloc_count_ += count; #endif - char* p = first_free_.fetch_add(count * parameters_.element_size, std::memory_order_relaxed); + char* p = first_free_.fetch_add( + static_cast(count * parameters_.element_size), + std::memory_order_relaxed); if (p + num_bytes > pool_ + total_num_bytes) { @@ -219,7 +221,8 @@ namespace hpx::components::detail { #if defined(HPX_DEBUG) free_count_ += count; #endif - size_t current_free_size = free_size_.fetch_add(count, std::memory_order_relaxed) + count; + size_t const current_free_size = + free_size_.fetch_add(count, std::memory_order_relaxed) + count; // release the pool if this one was the last allocated item if (current_free_size == parameters_.capacity) @@ -247,27 +250,12 @@ namespace hpx::components::detail { [[maybe_unused]] util::itt::heap_internal_access hia; HPX_ASSERT(did_alloc(p)); + HPX_ASSERT(base_gid_); -// if (!base_gid_) -// { -// std::unique_lock l(mtx_); -// if (!base_gid_) -// { -// // use the pool's base address as the first gid, this will also -// // allow for the ids to be locally resolvable -// base_gid_ = naming::replace_locality_id( -// naming::replace_component_type( -// naming::gid_type(pool_), type), -// agas::get_locality_id()); -// -// naming::detail::set_credit_for_gid( -// base_gid_, std::int64_t(HPX_GLOBALCREDIT_INITIAL)); -// } -// } naming::gid_type result = base_gid_; - if (type) { - result = naming::replace_component_type( - result, type); + if (type) + { + result = naming::replace_component_type(result, type); } result.set_lsb(p); @@ -289,15 +277,15 @@ namespace hpx::components::detail { bool wrapper_heap::free_pool() { HPX_ASSERT(pool_); - HPX_ASSERT(first_free_ == pool_ + - parameters_.capacity * parameters_.element_size); + HPX_ASSERT(first_free_ == + pool_ + parameters_.capacity * parameters_.element_size); // unbind in AGAS service if (base_gid_) { naming::gid_type base_gid = naming::invalid_gid; { -// std::unique_lock l(mtx_); + std::unique_lock l(mtx_); if (base_gid_) { base_gid = base_gid_; @@ -324,13 +312,19 @@ namespace hpx::components::detail { return false; } - first_free_ = (reinterpret_cast(pool_) % - parameters_.element_alignment == - 0) ? - pool_ : - pool_ + parameters_.element_alignment; + if (reinterpret_cast(pool_) % + parameters_.element_alignment == + 0) + { + first_free_.store(pool_, std::memory_order_relaxed); + } + else + { + first_free_.store(pool_ + parameters_.element_alignment, + std::memory_order_relaxed); + } -// free_size_ = parameters_.capacity; + free_size_.store(parameters_.capacity, std::memory_order_release); LOSH_(info).format("wrapper_heap ({}): init_pool ({}) size: {}.", !class_name_.empty() ? class_name_.c_str() : "", @@ -370,8 +364,9 @@ namespace hpx::components::detail { parameters_.capacity * parameters_.element_size; allocator_type::free(pool_, total_num_bytes); pool_ = nullptr; -// pool_ = first_free_ = nullptr; - free_size_ = 0; + + first_free_.store(nullptr, std::memory_order_relaxed); + free_size_.store(0, std::memory_order_release); } } } // namespace hpx::components::detail