Fix for IoRing shutdown hang bug. #144

Merged: 4 commits, Mar 15, 2024

Changes from 1 commit
67 changes: 35 additions & 32 deletions src/llfs/ioring.test.cpp
@@ -228,48 +228,51 @@ TEST(IoRingTest, EnqueueManyHandlers)
 {
   constexpr usize kNumThreads = 3;
   constexpr usize kNumHandlers = 50;
+  constexpr usize kNumIterations = 5000;
 
-  StatusOr<IoRing> io = IoRing::make_new(llfs::MaxQueueDepth{64});
-  ASSERT_TRUE(io.ok()) << BATT_INSPECT(io.status());
+  for (usize n = 0; n < kNumIterations; ++n) {
+    StatusOr<IoRing> io = IoRing::make_new(llfs::MaxQueueDepth{64});
+    ASSERT_TRUE(io.ok()) << BATT_INSPECT(io.status());
 
-  io->on_work_started();
+    io->on_work_started();
 
-  std::array<llfs::Status, kNumThreads> status;
-  status.fill(batt::StatusCode::kUnknown);
+    std::array<llfs::Status, kNumThreads> status;
+    status.fill(batt::StatusCode::kUnknown);
 
-  std::atomic<bool> begin{false};
-  std::atomic<i32> counter{0};
+    std::atomic<bool> begin{false};
+    std::atomic<i32> counter{0};
 
-  std::vector<std::thread> helper_threads;
-  for (usize i = 0; i < kNumThreads; ++i) {
-    helper_threads.emplace_back([&io, &begin, &status, i] {
-      while (!begin) {
-        std::this_thread::yield();
-      }
-      status[i] = io->run();
-    });
-  }
+    std::vector<std::thread> helper_threads;
+    for (usize i = 0; i < kNumThreads; ++i) {
+      helper_threads.emplace_back([&io, &begin, &status, i] {
+        while (!begin) {
+          std::this_thread::yield();
+        }
+        status[i] = io->run();
+      });
+    }
 
-  for (usize i = 0; i < kNumHandlers; ++i) {
-    io->post([&counter](llfs::StatusOr<i32>) {
-      counter++;
-    });
-  }
+    for (usize i = 0; i < kNumHandlers; ++i) {
+      io->post([&counter](llfs::StatusOr<i32>) {
+        counter++;
+      });
+    }
 
-  begin = true;
+    begin = true;
 
-  io->on_work_finished();
+    io->on_work_finished();
 
-  {
-    usize i = 0;
-    for (std::thread& t : helper_threads) {
-      t.join();
-      EXPECT_TRUE(status[i].ok()) << BATT_INSPECT(status[i]) << BATT_INSPECT(i);
-      ++i;
-    };
-  }
+    {
+      usize i = 0;
+      for (std::thread& t : helper_threads) {
+        t.join();
+        EXPECT_TRUE(status[i].ok()) << BATT_INSPECT(status[i]) << BATT_INSPECT(i);
+        ++i;
+      };
+    }
 
-  EXPECT_EQ(counter, kNumHandlers);
+    EXPECT_EQ(counter, kNumHandlers);
+  }
 }
 
 #ifdef BATT_PLATFORM_IS_LINUX
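
The only functional change to this test is the new 5000-iteration loop: the shutdown hang was a rare interleaving, so a single pass almost never caught it. A condensed sketch of the sequence each iteration stresses (same llfs names as the test; the single-runner reduction is illustrative only):

    // Each iteration re-creates the ring and races shutdown against the runner.
    StatusOr<IoRing> io = IoRing::make_new(llfs::MaxQueueDepth{64});

    io->on_work_started();       // keep run() alive until the work is done

    std::thread runner{[&io] {
      io->run();                 // must return once the work count reaches zero
    }};

    io->on_work_finished();      // final decrement: 1 -> 0
    runner.join();               // before this fix, the hang manifested here when
                                 // the runner missed the zero-count wakeup
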
117 changes: 81 additions & 36 deletions src/llfs/ioring_impl.cpp
@@ -139,22 +139,8 @@ void IoRingImpl::on_work_finished() noexcept
 
   const isize prior_count = this->work_count_.fetch_sub(1);
   if (prior_count == 1) {
-    this->state_change_.notify_all();
+    this->wake_all();
   }
-
-  // Submit a no-op to wake the run loop.
-  //
-  this->submit(
-      no_buffers(),
-
-      /*handler=*/
-      [this](StatusOr<i32>) {
-      },
-
-      /*start_op=*/
-      [](struct io_uring_sqe* sqe, auto&& /*op_handler*/) {
-        io_uring_prep_nop(sqe);
-      });
 }
 
 //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
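
The work count acts as a reference count on the run loop: run() keeps servicing events while it is positive, and the thread that performs the final decrement is responsible for waking everyone. A minimal usage sketch of the protocol implied by this hunk (assuming, as the diff suggests, that each post() also holds a unit of work until its handler is invoked):

    io->on_work_started();                    // +1: pin the run loop open

    io->post([](StatusOr<i32> /*result*/) {   // posted work is also counted;
      // ... consume the completion ...       // invoke_handler() decrements it
    });                                       // (see the hunk at -466 below)

    io->on_work_finished();                   // -1: if this was the final unit,
                                              // wake_all() releases run()
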
@@ -217,6 +203,12 @@ Status IoRingImpl::run() noexcept
       // eventfd_read, UNLESS handler is nullptr.  So in that case, wake up a waiting thread.
       //
       if (handler == nullptr) {
+        {
+          // See comment in IoRingImpl::wake_all() for an explanation of why this seemingly
+          // unproductive statement is necessary.
+          //
+          std::unique_lock<std::mutex> queue_lock{this->queue_mutex_};
+        }
         this->state_change_.notify_one();
       }
     });
@@ -276,10 +268,12 @@ Status IoRingImpl::wait_for_ring_event()
     return batt::status_from_retval(retval);
   }
 
-  // If we are stopping, then write to the eventfd to wake up any other threads which might be
-  // blocked inside eventfd_read.
+  // If we are stopping, either because this->stop() was called or because the work count has gone
+  // to zero, then write to the eventfd to wake up any other threads which might be blocked inside
+  // eventfd_read.  (This propagation of the event essentially turns the call to eventfd_write in
+  // IoRingImpl::wake_all() into a broadcast-to-all-threads.)
   //
-  if (this->needs_reset_) {
+  if (!this->can_run()) {
     eventfd_write(this->event_fd_, v);
   }
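
Only one thread at a time blocks in eventfd_read(), and a read consumes the counter, so a single eventfd_write() can wake only one reader. The propagation in this hunk turns that single write into a broadcast: each woken thread re-arms the eventfd on its way out for as long as shutdown is in progress. Distilled to a plain Linux eventfd (outside the IoRingImpl context; `stopping` stands in for !can_run()):

    #include <sys/eventfd.h>
    #include <atomic>

    void consume_or_propagate(int event_fd, const std::atomic<bool>& stopping)
    {
      eventfd_t v = 0;
      eventfd_read(event_fd, &v);    // exactly one blocked reader consumes the value

      if (stopping.load()) {
        eventfd_write(event_fd, v);  // re-arm: the next blocked reader wakes too,
      }                              // chaining one write through every thread
    }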

@@ -290,14 +284,20 @@
 //
 auto IoRingImpl::wait_for_completions() -> StatusOr<CompletionHandler*>
 {
+  const auto is_unblocked = [this] {      // We aren't blocked if any of:
+    return !this->can_run()               //  - the run loop has been stopped
+        || !this->completions_.empty()    //  - there are completions to execute
+        || !this->event_wait_.load()      //  - no other thread is waiting for events
+        ;
+  };
+
+  //----- --- -- - - - -
+
   std::unique_lock<std::mutex> queue_lock{this->queue_mutex_};
 
-  this->state_change_.wait(queue_lock, [this] {
-    return !this->can_run()               // block while we `can_run`...
-        || !this->completions_.empty()    // and there are no completions...
-        || !this->event_wait_.load()      // and someone else is in event_wait.
-        ;
-  });
+  while (!is_unblocked()) {
+    this->state_change_.wait(queue_lock);
+  }
 
   return {this->pop_completion_with_lock(queue_lock)};
 }
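
Behaviorally this hunk is a pure refactor: the predicate overload of std::condition_variable::wait is specified as exactly this while loop, so naming the predicate is_unblocked only makes the blocking condition read as documentation (and gives the wake_all() comment below a concrete loop to refer to). For reference, the two forms with generic standard-library stand-ins:

    #include <condition_variable>
    #include <mutex>

    std::mutex m;
    std::condition_variable cv;
    bool done = false;  // protected by m

    void wait_predicate_form(std::unique_lock<std::mutex>& lock)
    {
      cv.wait(lock, [] { return done; });  // old form: predicate overload
    }

    void wait_explicit_form(std::unique_lock<std::mutex>& lock)
    {
      while (!done) {                      // new form: identical semantics,
        cv.wait(lock);                     // predicate spelled out
      }
    }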
@@ -438,17 +438,7 @@ StatusOr<usize> IoRingImpl::transfer_completions(CompletionHandler** handler_out
 void IoRingImpl::stop() noexcept
 {
   this->needs_reset_.store(true);
-  this->state_change_.notify_all();
-
-  // Submit a no-op to wake the run loop.
-  //
-  this->submit(
-      no_buffers(),
-      [](StatusOr<i32>) {
-      },
-      [](struct io_uring_sqe* sqe, auto&&) {
-        io_uring_prep_nop(sqe);
-      });
+  this->wake_all();
 }
 
 //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
@@ -466,7 +456,7 @@ void IoRingImpl::invoke_handler(CompletionHandler** handler) noexcept
     *handler = nullptr;
     const isize prior_count = this->work_count_.fetch_sub(1);
     if (prior_count == 1) {
-      this->state_change_.notify_all();
+      this->wake_all();
    }
   });

@@ -671,6 +661,61 @@ Status IoRingImpl::unregister_files_with_lock(const std::unique_lock<std::mutex>
   return OkStatus();
 }
 
+//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
+//
+void IoRingImpl::wake_all() noexcept
+{
+  //+++++++++++-+-+--+----- --- -- - - - -
+  // We must create a barrier so that the condition variable wait in wait_for_completions() will
+  // always see either the changes on the current thread made prior to calling wake_all(), _or_
+  // the call to notify_all().
+  //
+  // wait_for_completions() is called by all threads that lose the race to call eventfd_read to
+  // wait directly on the io_uring event queue.  Its primary mechanism of operation is a condition
+  // variable wait:
+  //
+  //   std::unique_lock<std::mutex> queue_lock{this->queue_mutex_};
+  //
+  //   while (!is_unblocked()) {
+  //     this->state_change_.wait(queue_lock);
+  //   }
+  //
+  // `is_unblocked()` may become true if, for example, the work counter goes from 1 to 0.  In this
+  // case, without the queue_lock barrier, the following interleaving of events would be possible:
+  //
+  //   1. [Thread A] A1: Lock queue_mutex_
+  //   2. [Thread A] A2: Read work counter, observe non-zero (implies may be blocked)
+  //   3. [Thread B] B1: Decrement work counter, 1 -> 0
+  //   4. [Thread B] B2: Notify state_change_ condition variable
+  //   5. [Thread A] A3: Condition wait on state_change_ (atomic unlock-mutex-and-wait-for-notify)
+  //
+  // Since the call to notify_all (B2) happens strictly before the condition wait (A3), Thread A
+  // will wait indefinitely (BUG).
+  //
+  // With the queue_lock barrier in place (B1.5, between B1 and B2), this becomes impossible,
+  // since A1..A3 form a critical section, which by definition must be serialized with all other
+  // critical sections (for this->queue_mutex_).  In other words, either B1.5 happens-before A1
+  // or B1.5 happens-after A3:
+  //
+  //   - If B1.5 happens-before A1, then B1 also happens-before A2, which means A2 can't observe
+  //     the pre-B1 value of the work counter.
+  //   - If B1.5 happens-after A3, then B2 also happens-after A3, which means B2 will interrupt
+  //     the condition wait.
+  //
+  {
+    std::unique_lock<std::mutex> queue_lock{this->queue_mutex_};
+  }
+  //+++++++++++-+-+--+----- --- -- - - - -
+
+  // Wake any threads inside this->wait_for_completions().
+  //
+  this->state_change_.notify_all();
+
+  // Wake any thread inside this->wait_for_ring_event().
+  //
+  eventfd_write(this->event_fd_, 1);
+}
+
 } //namespace llfs
 
 #endif  // LLFS_DISABLE_IO_URING
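
The empty critical section in wake_all() is a standard idiom for pairing a lock-free state change with a condition variable. Distilled away from IoRingImpl (plain standard-library types; the labels match the comment's interleaving):

    #include <atomic>
    #include <condition_variable>
    #include <mutex>

    std::mutex m;
    std::condition_variable cv;
    std::atomic<int> work_count{1};

    void waiter()
    {
      std::unique_lock<std::mutex> lock{m};
      while (work_count.load() > 0) {   // A2: may read the stale value 1...
        cv.wait(lock);                  // A3: ...but m is held from A2 through A3
      }
    }

    void notifier()
    {
      work_count.fetch_sub(1);          // B1: 1 -> 0, outside the mutex
      {
        std::lock_guard<std::mutex> barrier{m};  // B1.5: empty on purpose; serializes
      }                                          // against the waiter's A1..A3 section
      cv.notify_all();                  // B2: can no longer fall between A2 and A3
    }
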
11 changes: 11 additions & 0 deletions src/llfs/ioring_impl.hpp
@@ -183,6 +183,17 @@
    */
   void invoke_handler(CompletionHandler** handler) noexcept;
 
+  /** \brief Wakes all threads inside a call to this->run().
+   *
+   * There are two blocking mechanisms which this function needs to address:
+   *
+   *  1. eventfd_read(), used to block waiting for the io_uring to signal an event
+   *     (one thread at a time)
+   *  2. this->state_change_.wait(), a condition variable wait used by all threads
+   *     _not_ waiting in eventfd_read(), to receive notification that they can proceed
+   */
+  void wake_all() noexcept;
+
   //+++++++++++-+-+--+----- --- -- - - - -
 
   // Protects access to the io_uring context and associated data (registered_fds_, free_fds_,
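
To make the two mechanisms concrete: at any moment at most one thread inside run() owns the eventfd wait, and every other thread parks on state_change_. A hypothetical outline of that division (not the real run loop; the method names are from this PR, the control flow is illustrative):

    while (impl.can_run()) {
      if (won_race_to_be_event_waiter) {
        impl.wait_for_ring_event();     // site 1: eventfd_read(); woken by the
                                        // eventfd_write() in wake_all()
      } else {
        impl.wait_for_completions();    // site 2: state_change_.wait(); woken by
                                        // the notify_all() in wake_all()
      }
      // ... transfer completions and invoke handlers ...
    }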