Skip to content

Commit

Permalink
issue-1541: postpone DestroyHandle request if HandleOpsQueue overflows (
Browse files Browse the repository at this point in the history
#2394)

* postpone DestroyHandle request if HandleOpsQueue overflows

* get rid of unnecessary copying

* fix issue

* fix issues
  • Loading branch information
WilyTiger authored Nov 11, 2024
1 parent 34b31ed commit 79223f7
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 69 deletions.
76 changes: 48 additions & 28 deletions cloud/filestore/libs/vfs_fuse/fs_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,47 @@ void TFileSystem::CancelRequest(TCallContextPtr callContext, fuse_req_t req)
CompletionQueue->Complete(req, [&] (fuse_req_t) { return 0; });
}

void TFileSystem::CompleteAsyncDestroyHandle(
TCallContext& callContext,
const NProto::TDestroyHandleResponse& response)
{
const auto& error = response.GetError();
RequestStats->RequestCompleted(callContext, error);

// If destroy request failed, we need to retry it.
// Otherwise, remove it from queue.
if (HasError(error)) {
STORAGE_ERROR(
"DestroyHandle request failed: "
<< "filesystem " << Config->GetFileSystemId()
<< " error: " << FormatError(error));
if (GetErrorKind(error) != EErrorKind::ErrorRetriable) {
ReportAsyncDestroyHandleFailed();
with_lock (HandleOpsQueueLock) {
HandleOpsQueue->Pop();
}
}
} else {
with_lock (HandleOpsQueueLock) {
HandleOpsQueue->Pop();
}
with_lock (DelayedReleaseQueueLock) {
if (!DelayedReleaseQueue.empty()) {
const auto& nextRequest = DelayedReleaseQueue.front();
if (ProcessAsyncRelease(
nextRequest.CallContext,
nextRequest.Req,
nextRequest.Ino,
nextRequest.Fh))
{
DelayedReleaseQueue.pop();
}
}
}
}
ScheduleProcessHandleOpsQueue();
}

void TFileSystem::ProcessHandleOpsQueue()
{
TGuard g{HandleOpsQueueLock};
Expand Down Expand Up @@ -291,35 +332,14 @@ void TFileSystem::ProcessHandleOpsQueue()
RequestStats->RequestStarted(Log, *callContext);

Session->DestroyHandle(callContext, std::move(request))
.Subscribe([=, ptr = weak_from_this()] (const auto& future) {
const auto& response = future.GetValue();
const auto& error = response.GetError();
if (auto self = ptr.lock()) {
RequestStats->RequestCompleted(*callContext, error);

// If destroy request failed, we need to retry it.
// Otherwise, remove it from queue.
if (HasError(error)) {
STORAGE_ERROR(
"DestroyHandle request failed: "
<< "filesystem " << Config->GetFileSystemId()
<< " error: " << FormatError(error));
if (GetErrorKind(error) != EErrorKind::ErrorRetriable) {
ReportAsyncDestroyHandleFailed();
with_lock(HandleOpsQueueLock) {
HandleOpsQueue->Pop();
}
}
} else {
with_lock(HandleOpsQueueLock) {
HandleOpsQueue->Pop();
// TODO(#1541): check if we have delayed request
// due to queue overflow
}
.Subscribe(
[ptr = weak_from_this(), callContext](const auto& future)
{
const auto& response = future.GetValue();
if (auto self = ptr.lock()) {
self->CompleteAsyncDestroyHandle(*callContext, response);
}
ScheduleProcessHandleOpsQueue();
}
});
});
} else {
// TODO(#1541): process create handle
ReportHandleOpsQueueProcessError(
Expand Down
22 changes: 22 additions & 0 deletions cloud/filestore/libs/vfs_fuse/fs_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ struct TRangeLock

////////////////////////////////////////////////////////////////////////////////

struct TReleaseRequest
{
TCallContextPtr CallContext;
fuse_req_t Req;
fuse_ino_t Ino;
ui64 Fh;
};

////////////////////////////////////////////////////////////////////////////////

class TFileSystem final
: public IFileSystem
, public std::enable_shared_from_this<TFileSystem>
Expand Down Expand Up @@ -75,6 +85,9 @@ class TFileSystem final
THandleOpsQueuePtr HandleOpsQueue;
TMutex HandleOpsQueueLock;

TQueue<TReleaseRequest> DelayedReleaseQueue;
TMutex DelayedReleaseQueueLock;

public:
TFileSystem(
ILoggingServicePtr logging,
Expand Down Expand Up @@ -405,6 +418,15 @@ class TFileSystem final
fuse_req_t req,
const NProto::TNodeAttr& attrs);

bool ProcessAsyncRelease(
TCallContextPtr callContext,
fuse_req_t req,
fuse_ino_t ino,
ui64 fh);
void CompleteAsyncDestroyHandle(
TCallContext& callContext,
const NProto::TDestroyHandleResponse& response);

void ClearDirectoryCache();

void ScheduleProcessHandleOpsQueue();
Expand Down
66 changes: 37 additions & 29 deletions cloud/filestore/libs/vfs_fuse/fs_impl_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,39 @@ void TFileSystem::Open(
});
}

bool TFileSystem::ProcessAsyncRelease(
TCallContextPtr callContext,
fuse_req_t req,
fuse_ino_t ino,
ui64 fh)
{
with_lock (HandleOpsQueueLock) {
const auto res = HandleOpsQueue->AddDestroyRequest(ino, fh);
if (res == THandleOpsQueue::EResult::QueueOverflow) {
STORAGE_DEBUG(
"HandleOpsQueue overflow, can't add destroy handle request to "
"queue #"
<< ino << " @" << fh);
return false;
}
if (res == THandleOpsQueue::EResult::SerializationError) {
TStringBuilder msg;
msg << "Unable to add DestroyHandleRequest to HandleOpsQueue #"
<< ino << " @" << fh << ". Serialization failed";

ReportHandleOpsQueueProcessError(msg);

ReplyError(*callContext, MakeError(E_FAIL, msg), req, 0);
return true;
}
}

STORAGE_DEBUG(
"Destroy handle request added to queue #" << ino << " @" << fh);
ReplyError(*callContext, {}, req, 0);
return true;
}

void TFileSystem::Release(
TCallContextPtr callContext,
fuse_req_t req,
Expand All @@ -135,37 +168,12 @@ void TFileSystem::Release(
}

if (Config->GetAsyncDestroyHandleEnabled()) {
STORAGE_DEBUG("Add destroy handle request to queue #" << ino << " @" << fi->fh);
with_lock(HandleOpsQueueLock) {
const auto& res = HandleOpsQueue->AddDestroyRequest(ino, fi->fh);
if (res == THandleOpsQueue::EResult::QueueOveflow) {
// TODO(#1541): delay request
STORAGE_ERROR("Failed to add destroy handle request to queue");
ReplyError(
*callContext,
MakeError(E_FAIL, "HandleOpsQueue overflow"),
req,
0);
return;
}
if (res == THandleOpsQueue::EResult::SerializationError) {
TStringBuilder msg;
msg << "Unable to add DestroyHandleRequest to HandleOpsQueue #"
<< ino << " @" << fi->fh << ". Serialization failed";

ReportHandleOpsQueueProcessError(msg);

ReplyError(
*callContext,
MakeError(
E_FAIL,
msg),
req,
0);
return;
if (!ProcessAsyncRelease(callContext, req, ino, fi->fh)) {
with_lock (DelayedReleaseQueueLock) {
DelayedReleaseQueue.push(
TReleaseRequest(callContext, req, ino, fi->fh));
}
}
ReplyError(*callContext, {}, req, 0);
return;
}

Expand Down
42 changes: 32 additions & 10 deletions cloud/filestore/libs/vfs_fuse/fs_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1584,7 +1584,7 @@ Y_UNIT_TEST_SUITE(TFileSystemTest)
UNIT_ASSERT_VALUES_EQUAL(1, static_cast<int>(*errorCounter));
}

Y_UNIT_TEST(ShouldFailDestroyHandleRequestIfHandleOpsQueueOverflows)
Y_UNIT_TEST(ShouldPostponeDestroyHandleRequestIfHandleOpsQueueOverflows)
{
NProto::TFileStoreFeatures features;
features.SetAsyncDestroyHandleEnabled(true);
Expand All @@ -1604,33 +1604,55 @@ Y_UNIT_TEST_SUITE(TFileSystemTest)
Y_UNUSED(request);

if (++handlerCalled == 1) {
UNIT_ASSERT_VALUES_EQUAL(handle1, request->GetHandle());
UNIT_ASSERT_VALUES_EQUAL(nodeId1, request->GetNodeId());
return responsePromise;
}

UNIT_ASSERT_C(false, "Handler should not be called 2nd time");
if (handlerCalled == 2) {
UNIT_ASSERT_VALUES_EQUAL(handle2, request->GetHandle());
UNIT_ASSERT_VALUES_EQUAL(nodeId2, request->GetNodeId());
}

return NewPromise<NProto::TDestroyHandleResponse>();
});

bootstrap.Start();

auto counters = bootstrap.Counters
->FindSubgroup("component", "fs_ut")
->FindSubgroup("request", "DestroyHandle");
auto future =
bootstrap.Fuse->SendRequest<TReleaseRequest>(nodeId1, handle1);
UNIT_ASSERT_NO_EXCEPTION(future.GetValue(WaitTimeout));
UNIT_ASSERT_EQUAL(
0,
AtomicGet(counters->GetCounter("InProgress")->GetAtomic()));

future =
bootstrap.Fuse->SendRequest<TReleaseRequest>(nodeId2, handle2);
UNIT_ASSERT_NO_EXCEPTION(future.GetValue(WaitTimeout));

scheduler->RunAllScheduledTasks();
responsePromise.SetValue(NProto::TDestroyHandleResponse{});
// Second request should wait until the first request is processed.
UNIT_ASSERT_EXCEPTION(future.GetValue(WaitTimeout), yexception);
UNIT_ASSERT_EQUAL(
1,
AtomicGet(counters->GetCounter("InProgress")->GetAtomic()));

// Process first request.
scheduler->RunAllScheduledTasks();

responsePromise.SetValue(NProto::TDestroyHandleResponse{});
UNIT_ASSERT_EQUAL(1, handlerCalled);

auto counters = bootstrap.Counters
->FindSubgroup("component", "fs_ut")
->FindSubgroup("request", "DestroyHandle");
UNIT_ASSERT_EQUAL(1, counters->GetCounter("Errors")->GetAtomic());
// After the first request is processed, the second request should be
// completed and added to the HandleOpsQueue.
UNIT_ASSERT_NO_EXCEPTION(future.GetValue(WaitTimeout));
UNIT_ASSERT_EQUAL(
0,
AtomicGet(counters->GetCounter("InProgress")->GetAtomic()));

// Check that second request was added to the queue and processed later.
scheduler->RunAllScheduledTasks();
UNIT_ASSERT_EQUAL(2, handlerCalled);
}
}

Expand Down
2 changes: 1 addition & 1 deletion cloud/filestore/libs/vfs_fuse/handle_ops_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ THandleOpsQueue::EResult THandleOpsQueue::AddDestroyRequest(
}

if (!RequestsToProcess.Push(result)) {
return THandleOpsQueue::EResult::QueueOveflow;
return THandleOpsQueue::EResult::QueueOverflow;
}

return THandleOpsQueue::EResult::Ok;
Expand Down
2 changes: 1 addition & 1 deletion cloud/filestore/libs/vfs_fuse/handle_ops_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class THandleOpsQueue
enum class EResult
{
Ok,
QueueOveflow,
QueueOverflow,
SerializationError,
};

Expand Down

0 comments on commit 79223f7

Please sign in to comment.