diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index b2b577a..acb7bce 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -16,7 +16,7 @@ default:
     - "test -d /local/gitlab-runner-local-cache || { echo \"FATAL: no local docker cache volume mapped\"; false; }"
     - export XDG_CACHE_HOME=/local/gitlab-runner-local-cache/.cache
     - export CONAN_USER_HOME=/local/gitlab-runner-local-cache
-    - export CONAN_HOME=/local/gitlab-runner-local-cache/.conan2
+    - export CONAN_HOME=/local/gitlab-runner-local-cache/.conan_2.0.16
     - /setup-conan.sh
     - echo "CI_DEPLOY_USER=${CI_DEPLOY_USER}"
     - echo "CI_DEPLOY_PASSWORD=${CI_DEPLOY_PASSWORD}"
diff --git a/script b/script
index b9a0b7d..6d266c2 160000
--- a/script
+++ b/script
@@ -1 +1 @@
-Subproject commit b9a0b7d16ed9b3977622fadd7061d3cab8b9e77d
+Subproject commit 6d266c2de572dde8fcf70085b8fa960084aae663
diff --git a/src/llfs/ioring.test.cpp b/src/llfs/ioring.test.cpp
index e69b48c..18b4b90 100644
--- a/src/llfs/ioring.test.cpp
+++ b/src/llfs/ioring.test.cpp
@@ -228,48 +228,51 @@ TEST(IoRingTest, EnqueueManyHandlers)
 {
   constexpr usize kNumThreads = 3;
   constexpr usize kNumHandlers = 50;
+  constexpr usize kNumIterations = 5000;
 
-  StatusOr<IoRing> io = IoRing::make_new(llfs::MaxQueueDepth{64});
-  ASSERT_TRUE(io.ok()) << BATT_INSPECT(io.status());
+  for (usize n = 0; n < kNumIterations; ++n) {
+    StatusOr<IoRing> io = IoRing::make_new(llfs::MaxQueueDepth{64});
+    ASSERT_TRUE(io.ok()) << BATT_INSPECT(io.status());
 
-  io->on_work_started();
+    io->on_work_started();
 
-  std::array<batt::Status, kNumThreads> status;
-  status.fill(batt::StatusCode::kUnknown);
+    std::array<batt::Status, kNumThreads> status;
+    status.fill(batt::StatusCode::kUnknown);
 
-  std::atomic<bool> begin{false};
-  std::atomic<usize> counter{0};
+    std::atomic<bool> begin{false};
+    std::atomic<usize> counter{0};
 
-  std::vector<std::thread> helper_threads;
-  for (usize i = 0; i < kNumThreads; ++i) {
-    helper_threads.emplace_back([&io, &begin, &status, i] {
-      while (!begin) {
-        std::this_thread::yield();
-      }
-      status[i] = io->run();
-    });
-  }
+    std::vector<std::thread> helper_threads;
+    for (usize i = 0; i < kNumThreads; ++i) {
+      helper_threads.emplace_back([&io, &begin, &status, i] {
+        while (!begin) {
+          std::this_thread::yield();
+        }
+        status[i] = io->run();
+      });
+    }
 
-  for (usize i = 0; i < kNumHandlers; ++i) {
-    io->post([&counter](llfs::StatusOr<i32>) {
-      counter++;
-    });
-  }
+    for (usize i = 0; i < kNumHandlers; ++i) {
+      io->post([&counter](llfs::StatusOr<i32>) {
+        counter++;
+      });
+    }
 
-  begin = true;
+    begin = true;
 
-  io->on_work_finished();
+    io->on_work_finished();
 
-  {
-    usize i = 0;
-    for (std::thread& t : helper_threads) {
-      t.join();
-      EXPECT_TRUE(status[i].ok()) << BATT_INSPECT(status[i]) << BATT_INSPECT(i);
-      ++i;
-    };
-  }
+    {
+      usize i = 0;
+      for (std::thread& t : helper_threads) {
+        t.join();
+        EXPECT_TRUE(status[i].ok()) << BATT_INSPECT(status[i]) << BATT_INSPECT(i);
+        ++i;
+      };
+    }
 
-  EXPECT_EQ(counter, kNumHandlers);
+    EXPECT_EQ(counter, kNumHandlers);
+  }
 }
 
 #ifdef BATT_PLATFORM_IS_LINUX
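Note on the test change above: wrapping the whole body in a `kNumIterations` loop turns a one-shot test into a stress test for the shutdown path, where `on_work_finished()` drops the work count to zero while helper threads may still be entering their blocking waits inside `run()`. Below is a minimal, self-contained sketch (illustrative only, not code from this patch; all names are hypothetical) of the lost-wakeup shape that the loop is designed to reproduce:

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex mu;                   // stands in for IoRingImpl::queue_mutex_
std::condition_variable cv;      // stands in for IoRingImpl::state_change_
std::atomic<int> work_count{1};  // stands in for IoRingImpl::work_count_

void waiter()  // like a thread blocked in wait_for_completions()
{
  std::unique_lock<std::mutex> lock{mu};  // (A1) lock the mutex
  while (work_count.load() != 0) {        // (A2) observe a non-zero count...
    cv.wait(lock);                        // (A3) ...then block.
  }
}

void racy_notifier()  // like the old on_work_finished()
{
  work_count.fetch_sub(1);  // (B1) work count: 1 -> 0
  cv.notify_all();          // (B2) lost if it fires between (A2) and (A3)
}

int main()
{
  // WARNING: this program can hang.  If (B1) and (B2) both land in the window
  // between (A2) and (A3), the waiter sleeps forever -- the lost wakeup that
  // the kNumIterations loop in the test above is hunting for.
  std::thread a{waiter};
  std::thread b{racy_notifier};
  b.join();
  a.join();
}
```

The (A1)/(A2)/(A3)/(B1)/(B2) labels match the interleaving walkthrough in the `IoRingImpl::wake_all()` comment added below.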
diff --git a/src/llfs/ioring_impl.cpp b/src/llfs/ioring_impl.cpp
index afe5357..ebea724 100644
--- a/src/llfs/ioring_impl.cpp
+++ b/src/llfs/ioring_impl.cpp
@@ -139,22 +139,8 @@ void IoRingImpl::on_work_finished() noexcept
 {
   const isize prior_count = this->work_count_.fetch_sub(1);
   if (prior_count == 1) {
-    this->state_change_.notify_all();
+    this->wake_all();
   }
-
-  // Submit a no-op to wake the run loop.
-  //
-  this->submit(
-      no_buffers(),
-
-      /*handler=*/
-      [this](StatusOr<i32>) {
-      },
-
-      /*start_op=*/
-      [](struct io_uring_sqe* sqe, auto&& /*op_handler*/) {
-        io_uring_prep_nop(sqe);
-      });
 }
 
 //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
@@ -217,6 +203,12 @@ Status IoRingImpl::run() noexcept
         // eventfd_read, UNLESS handler is nullptr.  So in that case, wake up a waiting thread.
         //
         if (handler == nullptr) {
+          {
+            // See comment in IoRingImpl::wake_all() for an explanation of why this seemingly
+            // unproductive statement is necessary.
+            //
+            std::unique_lock<std::mutex> queue_lock{this->queue_mutex_};
+          }
           this->state_change_.notify_one();
         }
       });
@@ -276,10 +268,12 @@ Status IoRingImpl::wait_for_ring_event()
     return batt::status_from_retval(retval);
   }
 
-  // If we are stopping, then write to the eventfd to wake up any other threads which might be
-  // blocked inside eventfd_read.
+  // If we are stopping, either because this->stop() was called or because the work count has gone
+  // to zero, then write to the eventfd to wake up any other threads which might be blocked inside
+  // eventfd_read.  (This propagation of the event essentially turns the call to eventfd_write in
+  // IoRingImpl::wake_all() into a broadcast-to-all-threads.)
   //
-  if (this->needs_reset_) {
+  if (!this->can_run()) {
     eventfd_write(this->event_fd_, v);
   }
@@ -290,14 +284,20 @@ Status IoRingImpl::wait_for_ring_event()
 //
 auto IoRingImpl::wait_for_completions() -> StatusOr<CompletionHandler*>
 {
+  const auto is_unblocked = [this] {      // We aren't blocked if any of:
+    return !this->can_run()               //  - the run loop has been stopped
+           || !this->completions_.empty() //  - there are completions to execute
+           || !this->event_wait_.load()   //  - no other thread is waiting for events
+        ;
+  };
+
+  //----- --- -- - - - -
+
   std::unique_lock<std::mutex> queue_lock{this->queue_mutex_};
 
-  this->state_change_.wait(queue_lock, [this] {
-    return !this->can_run()                // block while we `can_run`...
-           || !this->completions_.empty()  // and there are no completions...
-           || !this->event_wait_.load()    // and someone else is in event_wait.
-        ;
-  });
+  while (!is_unblocked()) {
+    this->state_change_.wait(queue_lock);
+  }
 
   return {this->pop_completion_with_lock(queue_lock)};
 }
@@ -438,17 +438,7 @@ StatusOr<usize> IoRingImpl::transfer_completions(CompletionHandler** handler_out
 void IoRingImpl::stop() noexcept
 {
   this->needs_reset_.store(true);
-  this->state_change_.notify_all();
-
-  // Submit a no-op to wake the run loop.
-  //
-  this->submit(
-      no_buffers(),
-      [](StatusOr<i32>) {
-      },
-      [](struct io_uring_sqe* sqe, auto&&) {
-        io_uring_prep_nop(sqe);
-      });
+  this->wake_all();
 }
 
 //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
@@ -464,10 +454,7 @@ void IoRingImpl::invoke_handler(CompletionHandler** handler) noexcept
 {
   auto on_scope_exit = batt::finally([handler, this] {
     *handler = nullptr;
-    const isize prior_count = this->work_count_.fetch_sub(1);
-    if (prior_count == 1) {
-      this->state_change_.notify_all();
-    }
+    this->on_work_finished();
   });
 
   BATT_CHECK((*handler)->result);
@@ -671,6 +658,61 @@ Status IoRingImpl::unregister_files_with_lock(const std::unique_lock<std::mutex>&
   return OkStatus();
 }
 
+//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
+//
+void IoRingImpl::wake_all() noexcept
+{
+  //+++++++++++-+-+--+----- --- -- - - - -
+  // We must create a barrier so that the condition variable wait in wait_for_completions() will
+  // always see either the changes on the current thread made prior to calling wake_all(), _or_ the
+  // call to notify_all().
+  //
+  // wait_for_completions() is called by all threads who lose the race to call eventfd_read to wait
+  // directly on the io_uring event queue.  Its primary mechanism of operation is a condition
+  // variable wait:
+  //
+  //   std::unique_lock<std::mutex> queue_lock{this->queue_mutex_};
+  //
+  //   while (!is_unblocked()) {
+  //     this->state_change_.wait(queue_lock);
+  //   }
+  //
+  // `is_unblocked()` may become true if, for example, the work counter goes from 1 to 0.  In this
+  // case, without the queue_lock barrier, the following interleaving of events would be possible:
+  //
+  //  1. [Thread A] A1: Lock queue_mutex_
+  //  2. [Thread A] A2: Read work counter, observe non-zero (implies may be blocked)
+  //  3. [Thread B] B1: Decrement work counter, 1 -> 0
+  //  4. [Thread B] B2: Notify state_change_ condition variable
+  //  5. [Thread A] A3: Condition wait on state_change_ (atomic unlock-mutex-and-wait-for-notify)
+  //
+  // Since the call to notify_all (B2) happens strictly before the condition wait (A3), Thread A
+  // will wait indefinitely (BUG).
+  //
+  // With the queue_lock barrier in place (B1.5, between B1 and B2), this becomes impossible since
+  // A1..A3 form a critical section, which by definition must be serialized with all other critical
+  // sections (for this->queue_mutex_).  In other words, either B1.5 happens-before A1 or B1.5
+  // happens-after A3:
+  //
+  //  - If B1.5 happens-before A1, then B1 also happens-before A2, which means A2 can't observe the
+  //    pre-B1 value of the work counter.
+  //  - If B1.5 happens-after A3, then B2 also happens-after A3, which means B2 will interrupt the
+  //    condition wait.
+  //
+  {
+    std::unique_lock<std::mutex> queue_lock{this->queue_mutex_};
+  }
+  //+++++++++++-+-+--+----- --- -- - - - -
+
+  // Wake any threads inside this->wait_for_completions().
+  //
+  this->state_change_.notify_all();
+
+  // Wake any thread inside this->wait_for_ring_event().
+  //
+  eventfd_write(this->event_fd_, 1);
+}
+
 }  //namespace llfs
 
 #endif  // LLFS_DISABLE_IO_URING
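Note: to make the interleaving argument in the `wake_all()` comment concrete, here is the fixed counterpart to the racy notifier sketched earlier (again illustrative, not code from this patch). The only change is the empty critical section taken between the state change and the notify, mirroring the `queue_lock` barrier in `wake_all()`:

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex mu;
std::condition_variable cv;
std::atomic<int> work_count{1};

void waiter()
{
  std::unique_lock<std::mutex> lock{mu};  // (A1)
  while (work_count.load() != 0) {        // (A2)
    cv.wait(lock);                        // (A3)
  }
}

void fixed_notifier()
{
  work_count.fetch_sub(1);  // (B1) work count: 1 -> 0

  {
    // (B1.5) The barrier.  The waiter holds `mu` from its predicate check
    // (A2) until wait() atomically releases it (A3), so this critical section
    // must fall either before (A1) or after (A3).  Before (A1): the waiter
    // re-reads the counter and sees 0, so it never blocks.  After (A3): the
    // waiter is already inside wait(), so the notify below wakes it.
    std::unique_lock<std::mutex> barrier{mu};
  }

  cv.notify_all();  // (B2) can no longer be lost
}

int main()
{
  std::thread a{waiter};
  std::thread b{fixed_notifier};
  b.join();
  a.join();  // always terminates: the wakeup cannot be lost
}
```

An alternative would be to hold the lock across `notify_all()`; the empty critical section is equivalent for correctness here and avoids waking threads directly into a contended mutex.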
diff --git a/src/llfs/ioring_impl.hpp b/src/llfs/ioring_impl.hpp
index 1645738..df2072e 100644
--- a/src/llfs/ioring_impl.hpp
+++ b/src/llfs/ioring_impl.hpp
@@ -183,6 +183,17 @@ class IoRingImpl
    */
   void invoke_handler(CompletionHandler** handler) noexcept;
 
+  /** \brief Wakes all threads inside a call to this->run().
+   *
+   * There are two blocking mechanisms which this function needs to address:
+   *
+   *  1. eventfd_read(), used to block waiting for the io_uring to signal an event
+   *     (one thread at a time)
+   *  2. this->state_change_.wait(), a condition variable wait used by all threads
+   *     _not_ waiting in eventfd_read(), to receive notification that they can proceed
+   */
+  void wake_all() noexcept;
+
   //+++++++++++-+-+--+----- --- -- - - - -
   // Protects access to the io_uring context and associated data (registered_fds_, free_fds_,
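Note: the header comment above names eventfd_read() as the first blocking mechanism `wake_all()` must defeat. A plain eventfd (created without EFD_SEMAPHORE) wakes only one reader per write, which is why `wait_for_ring_event()` now re-writes the eventfd whenever `can_run()` is false: each woken reader relays the event to the next. A standalone sketch of that relay pattern (Linux-only; illustrative names, not code from this patch):

```cpp
#include <atomic>
#include <sys/eventfd.h>
#include <thread>
#include <vector>

std::atomic<bool> stopping{false};  // stands in for !IoRingImpl::can_run()

void wait_for_event(int event_fd)  // like wait_for_ring_event()
{
  eventfd_t v = 0;
  eventfd_read(event_fd, &v);  // wakes exactly one blocked reader per write

  if (stopping.load()) {
    // Relay the wakeup so the next blocked reader (if any) also wakes.
    eventfd_write(event_fd, v);
  }
}

int main()
{
  const int event_fd = eventfd(/*initval=*/0, /*flags=*/0);

  std::vector<std::thread> readers;
  for (int i = 0; i < 3; ++i) {
    readers.emplace_back([event_fd] { wait_for_event(event_fd); });
  }

  stopping.store(true);        // like stop()/on_work_finished()...
  eventfd_write(event_fd, 1);  // ...followed by the single write in wake_all()

  for (std::thread& t : readers) {
    t.join();  // all three readers exit, despite only one explicit write
  }
}
```

Combined with the condition-variable broadcast, this gives `wake_all()` its wake-everyone semantics without needing to know how many threads are blocked in each mechanism.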