Skip to content

Commit

Permalink
fix review issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxim Deb Natkh committed Mar 29, 2024
1 parent 9d44221 commit e23324e
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 67 deletions.
40 changes: 19 additions & 21 deletions cloud/filestore/libs/storage/service/service_actor_writedata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
{
const auto* msg = ev->Get();

if (FAILED(msg->GetStatus())) {
WriteData(ctx, FormatError(msg->GetError()));
if (HasError(msg->GetError())) {
WriteData(ctx, msg->GetError());
return;
}

Expand All @@ -106,12 +106,12 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
ctx,
TFileStoreComponents::SERVICE,
"GenerateBlobIds response received: %s",
GenerateBlobIdsResponse.DebugString().c_str());
GenerateBlobIdsResponse.DebugString().Quote().c_str());

WriteBlobIfNeeded(ctx);
WriteBlobs(ctx);
}

void WriteBlobIfNeeded(const TActorContext& ctx)
void WriteBlobs(const TActorContext& ctx)
{
RemainingBlobsToWrite = GenerateBlobIdsResponse.BlobsSize();
ui64 offset = 0;
Expand Down Expand Up @@ -139,10 +139,8 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
LOG_DEBUG(
ctx,
TFileStoreComponents::SERVICE,
"Sending TEvPut request to blob storage, blobId: %s, size: "
"%lu, proxy: %s",
"Sending TEvPut request to blob storage, blobId: %s, proxy: %s",
blobId.ToString().c_str(),
blobId.BlobSize(),
proxy.ToString().c_str());
SendToBSProxy(ctx, proxy, request.release(), blobId.Cookie());
offset += blobId.BlobSize();
Expand All @@ -156,17 +154,17 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
const auto* msg = ev->Get();

if (msg->Status != NKikimrProto::OK) {
const auto errorReason = FormatError(
MakeError(MAKE_KIKIMR_ERROR(msg->Status), msg->ErrorReason));
const auto error =
MakeError(MAKE_KIKIMR_ERROR(msg->Status), msg->ErrorReason);
LOG_WARN(
ctx,
TFileStoreComponents::SERVICE,
"WriteData error: %s",
errorReason.c_str());
msg->ErrorReason.Quote().c_str());
// We still may receive some responses, but we do not want to
// process them
RemainingBlobsToWrite = std::numeric_limits<ui32>::max();
return WriteData(ctx, errorReason);
return WriteData(ctx, error);
}

LOG_DEBUG(
Expand All @@ -191,16 +189,16 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
request->Record.SetHandle(WriteRequest.GetHandle());
request->Record.SetOffset(WriteRequest.GetOffset());
request->Record.SetLength(WriteRequest.GetBuffer().size());
for (const auto& blobId: GenerateBlobIdsResponse.GetBlobs()) {
request->Record.AddBlobIds()->CopyFrom(blobId.GetBlobId());
for (auto& blob: *GenerateBlobIdsResponse.MutableBlobs()) {
request->Record.AddBlobIds()->Swap(blob.MutableBlobId());
}
request->Record.SetCommitId(GenerateBlobIdsResponse.GetCommitId());

LOG_DEBUG(
ctx,
TFileStoreComponents::SERVICE,
"Sending AddData request to tablet: %s",
request->Record.DebugString().c_str());
request->Record.DebugString().Quote().c_str());

ctx.Send(MakeIndexTabletProxyServiceId(), request.release());
}
Expand All @@ -211,8 +209,8 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
{
auto* msg = ev->Get();

if (FAILED(msg->GetStatus())) {
return WriteData(ctx, FormatError(msg->GetError()));
if (HasError(msg->GetError())) {
return WriteData(ctx, msg->GetError());
}

auto response = std::make_unique<TEvService::TEvWriteDataResponse>();
Expand All @@ -224,7 +222,7 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
/**
* @brief Fallback to regular write if two-stage write fails for any reason
*/
void WriteData(const TActorContext& ctx, const TString& fallbackReason)
void WriteData(const TActorContext& ctx, const NProto::TError& error)
{
LOG_WARN(
ctx,
Expand All @@ -235,7 +233,7 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
WriteRequest.GetHandle(),
WriteRequest.GetOffset(),
WriteRequest.GetBuffer().size(),
fallbackReason.Quote().c_str());
FormatError(error).Quote().c_str());

auto request = std::make_unique<TEvService::TEvWriteDataRequest>();
request->Record = std::move(WriteRequest);
Expand All @@ -250,7 +248,7 @@ class TWriteDataActor final: public TActorBootstrapped<TWriteDataActor>
{
auto* msg = ev->Get();

if (FAILED(msg->GetStatus())) {
if (HasError(msg->GetError())) {
HandleError(ctx, msg->GetError());
return;
}
Expand Down Expand Up @@ -332,7 +330,7 @@ void TStorageServiceActor::HandleWriteData(
if (filestore.GetFeatures().GetThreeStageWriteEnabled() &&
msg->Record.GetOffset() % blockSize == 0 &&
msg->Record.GetBuffer().Size() % blockSize == 0 &&
msg->Record.GetBuffer().Size() >
msg->Record.GetBuffer().Size() >=
filestore.GetFeatures().GetThreeStageWriteThreshold())
{
LOG_DEBUG(
Expand Down
83 changes: 37 additions & 46 deletions cloud/filestore/libs/storage/service/service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2040,9 +2040,9 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest)

TString GenerateValidateData(ui32 size)
{
TString data(size, '\0');
TString data(size, 0);
for (ui32 i = 0; i < size; ++i) {
data[i] = 'A' + (i % 25);
data[i] = 'A' + (i % ('Z' - 'A' + 1));
}
return data;
}
Expand All @@ -2054,7 +2054,6 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest)

ui32 nodeIdx = env.CreateNode("nfs");


TServiceClient service(env.GetRuntime(), nodeIdx);
const TString fs = "test";
service.CreateFileStore(fs, 1000);
Expand All @@ -2076,26 +2075,26 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest)
}

auto headers = service.InitSession(fs, "client");
ui64 nodeId =
service
.CreateNode(headers, TCreateNodeArgs::File(RootNodeId, "file"))
->Record.GetNode()
.GetId();
ui64 handle =
service
.CreateHandle(headers, fs, nodeId, "", TCreateHandleArgs::RDWR)
->Record.GetHandle();
ui64 nodeId = service
.CreateNode(headers, TCreateNodeArgs::File(RootNodeId, "file"))
->Record.GetNode()
.GetId();
ui64 handle = service
.CreateHandle(headers, fs, nodeId, "", TCreateHandleArgs::RDWR)
->Record.GetHandle();

ui32 putRequestCount = 0;
TActorId worker;
env.GetRuntime().SetObserverFunc(
[&](TAutoPtr<IEventHandle>& event)
env.GetRuntime().SetEventFilter(
[&](auto& runtime, auto& event)
{
Y_UNUSED(runtime);
switch (event->GetTypeRewrite()) {
case TEvIndexTablet::EvGenerateBlobIdsRequest: {
if (!worker) {
worker = event->Sender;
}
break;
}
case TEvBlobStorage::EvPut: {
if (event->Sender == worker &&
Expand All @@ -2104,12 +2103,10 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest)
{
++putRequestCount;
}
break;
}

break;
}

return TTestActorRuntime::DefaultObserverFunc(event);
return false;
});

auto& runtime = env.GetRuntime();
Expand All @@ -2130,7 +2127,7 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest)
UNIT_ASSERT_VALUES_EQUAL(1, runtime.GetCounter(TEvIndexTabletPrivate::EvAddBlobRequest));
UNIT_ASSERT_VALUES_EQUAL(0, runtime.GetCounter(TEvIndexTabletPrivate::EvWriteBlobRequest));
UNIT_ASSERT_VALUES_EQUAL(1, runtime.GetCounter(TEvService::EvWriteDataResponse));
UNIT_ASSERT_VALUES_EQUAL(putRequestCount, expectedPutCount);
UNIT_ASSERT_VALUES_EQUAL(expectedPutCount, putRequestCount);
// clang-format on
runtime.ClearCounters();
putRequestCount = 0;
Expand All @@ -2146,19 +2143,19 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest)
DefaultBlockSize * BlockGroupSize * 10,
11);
validateWriteData(0, DefaultBlockSize * BlockGroupSize * 3, 3);
// Currently there are 1 + 1 + 64 + 64 + 64 * 10 + 64 * 3 = 962 blocks
// out of 1000. Therefore, the next write should fail
// Currently the data is written from 0th to (1 + BlockGroupSize * 10) = 641th block
// Therefore, the next write should fail

auto data =
GenerateValidateData(DefaultBlockSize * BlockGroupSize * 30);
GenerateValidateData(DefaultBlockSize * 360);

auto response =
service.AssertWriteDataFailed(headers, fs, nodeId, handle, 0, data);
service.AssertWriteDataFailed(headers, fs, nodeId, handle, DefaultBlockSize * 641, data);
auto error = STATUS_FROM_CODE(response->GetError().GetCode());
UNIT_ASSERT_VALUES_EQUAL(error, (ui32)NProto::E_FS_NOSPC);
UNIT_ASSERT_VALUES_EQUAL((ui32)NProto::E_FS_NOSPC, error);
}

Y_UNIT_TEST(ShouldNotUseThreeStageWriteForSmallOrUnaligned)
Y_UNIT_TEST(ShouldNotUseThreeStageWriteForSmallOrUnalignedRequests)
{
TTestEnv env;
env.CreateSubDomain("nfs");
Expand All @@ -2182,15 +2179,13 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest)
}

auto headers = service.InitSession(fs, "client");
ui64 nodeId =
service
.CreateNode(headers, TCreateNodeArgs::File(RootNodeId, "file"))
->Record.GetNode()
.GetId();
ui64 handle =
service
.CreateHandle(headers, fs, nodeId, "", TCreateHandleArgs::RDWR)
->Record.GetHandle();
ui64 nodeId = service
.CreateNode(headers, TCreateNodeArgs::File(RootNodeId, "file"))
->Record.GetNode()
.GetId();
ui64 handle = service
.CreateHandle(headers, fs, nodeId, "", TCreateHandleArgs::RDWR)
->Record.GetHandle();

auto& runtime = env.GetRuntime();

Expand All @@ -2216,7 +2211,7 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest)
validateWriteData(1, 128_KB);
}

Y_UNIT_TEST(ShouldFallbackThreeStageWrite)
Y_UNIT_TEST(ShouldFallbackThreeStageWriteToSimpleWrite)
{
TTestEnv env;
env.CreateSubDomain("nfs");
Expand Down Expand Up @@ -2259,17 +2254,13 @@ Y_UNIT_TEST_SUITE(TStorageServiceTest)
}

auto headers = service.InitSession(fs, "client");

ui64 nodeId =
service
.CreateNode(headers, TCreateNodeArgs::File(RootNodeId, "file"))
->Record.GetNode()
.GetId();

ui64 handle =
service
.CreateHandle(headers, fs, nodeId, "", TCreateHandleArgs::RDWR)
->Record.GetHandle();
ui64 nodeId = service
.CreateNode(headers, TCreateNodeArgs::File(RootNodeId, "file"))
->Record.GetNode()
.GetId();
ui64 handle = service
.CreateHandle(headers, fs, nodeId, "", TCreateHandleArgs::RDWR)
->Record.GetHandle();

// GenerateBlobIdsResponse fails
TString data = GenerateValidateData(256_KB);
Expand Down

0 comments on commit e23324e

Please sign in to comment.