diff --git a/cloud/filestore/config/storage.proto b/cloud/filestore/config/storage.proto index 8726d5f64c2..804926b8a36 100644 --- a/cloud/filestore/config/storage.proto +++ b/cloud/filestore/config/storage.proto @@ -231,4 +231,13 @@ message TStorageConfig optional uint32 CleanupThresholdAverage = 342; // Enables the aforementioned threshold. optional bool NewCleanupEnabled = 343; + + // Enables GenerateBlobIds + WriteBlob + AddData instead of WriteBlob + // for writing. + optional bool ThreeStageWriteEnabled = 344; + + // When issuing blob ids, the tablet acquires a collect barrier. In order + // to release it in case of a client disconnect, this timeout is used. + optional uint32 GenerateBlobIdsReleaseCollectBarrierTimeout = 345; + } diff --git a/cloud/filestore/libs/storage/api/tablet.h b/cloud/filestore/libs/storage/api/tablet.h index 4c5cd91a5e5..d88758b18fd 100644 --- a/cloud/filestore/libs/storage/api/tablet.h +++ b/cloud/filestore/libs/storage/api/tablet.h @@ -24,6 +24,8 @@ namespace NCloud::NFileStore::NStorage { xxx(ChangeStorageConfig, __VA_ARGS__) \ xxx(DescribeData, __VA_ARGS__) \ xxx(DescribeSessions, __VA_ARGS__) \ + xxx(GenerateBlobIds, __VA_ARGS__) \ + xxx(AddData, __VA_ARGS__) \ // FILESTORE_TABLET_REQUESTS //////////////////////////////////////////////////////////////////////////////// @@ -65,6 +67,12 @@ struct TEvIndexTablet EvDescribeSessionsRequest = EvBegin + 17, EvDescribeSessionsResponse, + EvGenerateBlobIdsRequest = EvBegin + 19, + EvGenerateBlobIdsResponse, + + EvAddDataRequest = EvBegin + 21, + EvAddDataResponse, + EvEnd }; diff --git a/cloud/filestore/libs/storage/core/config.cpp b/cloud/filestore/libs/storage/core/config.cpp index 93128dfdf14..d30c966af53 100644 --- a/cloud/filestore/libs/storage/core/config.cpp +++ b/cloud/filestore/libs/storage/core/config.cpp @@ -139,11 +139,16 @@ namespace { NCloud::NProto::AUTHORIZATION_IGNORE )\ \ xxx(TwoStageReadEnabled, bool, false )\ + xxx(ThreeStageWriteEnabled, bool, false )\ xxx(EntryTimeout, TDuration, TDuration::Zero() )\ xxx(NegativeEntryTimeout, TDuration, TDuration::Zero() )\ xxx(AttrTimeout, TDuration, TDuration::Zero() )\ xxx(MaxOutOfOrderCompactionMapLoadRequestsInQueue, ui32, 5 )\ xxx(MaxBackpressureErrorsBeforeSuicide, ui32, 1000 )\ + \ + xxx(GenerateBlobIdsReleaseCollectBarrierTimeout, \ + TDuration, \ + TDuration::Seconds(10) )\ // FILESTORE_STORAGE_CONFIG #define FILESTORE_DECLARE_CONFIG(name, type, value) \ diff --git a/cloud/filestore/libs/storage/core/config.h b/cloud/filestore/libs/storage/core/config.h index d2dfae3902c..015ca29137f 100644 --- a/cloud/filestore/libs/storage/core/config.h +++ b/cloud/filestore/libs/storage/core/config.h @@ -181,6 +181,7 @@ class TStorageConfig NCloud::NProto::EAuthorizationMode GetAuthorizationMode() const; bool GetTwoStageReadEnabled() const; + bool GetThreeStageWriteEnabled() const; TDuration GetEntryTimeout() const; TDuration GetNegativeEntryTimeout() const; TDuration GetAttrTimeout() const; @@ -191,6 +192,8 @@ class TStorageConfig ui32 GetMaxBackpressureErrorsBeforeSuicide() const; + TDuration GetGenerateBlobIdsReleaseCollectBarrierTimeout() const; + void Dump(IOutputStream& out) const; void DumpHtml(IOutputStream& out) const; void DumpOverridesHtml(IOutputStream& out) const; diff --git a/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.cpp b/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.cpp new file mode 100644 index 00000000000..d7a08cda887 --- /dev/null +++ b/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.cpp @@ -0,0 +1,140 @@ +#include "tablet_adddata.h" + +#include +#include +#include +#include +#include + +#include + +namespace NCloud::NFileStore::NStorage { + +using namespace NActors; + +//////////////////////////////////////////////////////////////////////////////// + +LWTRACE_USING(FILESTORE_STORAGE_PROVIDER); + +//////////////////////////////////////////////////////////////////////////////// + +TAddDataActor::TAddDataActor( + ITraceSerializerPtr traceSerializer, + TString logTag, + TActorId tablet, + TRequestInfoPtr requestInfo, + ui64 commitId, + TVector blobs, + TWriteRange writeRange) + : TraceSerializer(std::move(traceSerializer)) + , LogTag(std::move(logTag)) + , Tablet(tablet) + , RequestInfo(std::move(requestInfo)) + , CommitId(commitId) + , Blobs(std::move(blobs)) + , WriteRange(writeRange) +{} + +void TAddDataActor::Bootstrap(const TActorContext& ctx) +{ + FILESTORE_TRACK( + RequestReceived_TabletWorker, + RequestInfo->CallContext, + "AddData"); + + AddBlob(ctx); + Become(&TThis::StateWork); +} + +void TAddDataActor::AddBlob(const TActorContext& ctx) +{ + auto request = std::make_unique( + RequestInfo->CallContext); + request->Mode = EAddBlobMode::Write; + request->WriteRanges.push_back(WriteRange); + + for (const auto& blob: Blobs) { + request->MergedBlobs.emplace_back( + blob.BlobId, + blob.Block, + blob.BlocksCount); + } + + NCloud::Send(ctx, Tablet, std::move(request)); +} + +void TAddDataActor::HandleAddBlobResponse( + const TEvIndexTabletPrivate::TEvAddBlobResponse::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + ReplyAndDie(ctx, msg->GetError()); +} + +void TAddDataActor::HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + ReplyAndDie(ctx, MakeError(E_REJECTED, "tablet is shutting down")); +} + +void TAddDataActor::ReplyAndDie( + const TActorContext& ctx, + const NProto::TError& error) +{ + // notify tablet + NCloud::Send( + ctx, + // We try to release commit barrier twice: once for the lock + // acquired by the GenerateBlob request and once for the lock + // acquired by the AddData request. Though, the first lock is + // scheduled to be released, it is better to release it as early + // as possible. + Tablet, + std::make_unique( + CommitId, + 2)); + + FILESTORE_TRACK( + ResponseSent_TabletWorker, + RequestInfo->CallContext, + "AddData"); + + if (RequestInfo->Sender != Tablet) { + auto response = + std::make_unique(error); + LOG_DEBUG( + ctx, + TFileStoreComponents::TABLET_WORKER, + "%s AddData: #%lu completed (%s)", + LogTag.c_str(), + RequestInfo->CallContext->RequestId, + FormatError(response->Record.GetError()).c_str()); + + BuildTraceInfo( + TraceSerializer, + RequestInfo->CallContext, + response->Record); + BuildThrottlerInfo(*RequestInfo->CallContext, response->Record); + + NCloud::Reply(ctx, *RequestInfo, std::move(response)); + } + + Die(ctx); +} + +STFUNC(TAddDataActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); + + HFunc(TEvIndexTabletPrivate::TEvAddBlobResponse, HandleAddBlobResponse); + + default: + HandleUnexpectedEvent(ev, TFileStoreComponents::TABLET_WORKER); + break; + } +} + +} // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.h b/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.h new file mode 100644 index 00000000000..35645e5a8bc --- /dev/null +++ b/cloud/filestore/libs/storage/tablet/actors/tablet_adddata.h @@ -0,0 +1,59 @@ +#include +#include +#include +#include + +#include +#include + +#include + +namespace NCloud::NFileStore::NStorage { + +using namespace NActors; +using namespace NKikimr; + +//////////////////////////////////////////////////////////////////////////////// + +class TAddDataActor final: public TActorBootstrapped +{ +private: + const ITraceSerializerPtr TraceSerializer; + + const TString LogTag; + const TActorId Tablet; + const TRequestInfoPtr RequestInfo; + + const ui64 CommitId; + /*const*/ TVector Blobs; + const TWriteRange WriteRange; + +public: + TAddDataActor( + ITraceSerializerPtr traceSerializer, + TString logTag, + TActorId tablet, + TRequestInfoPtr requestInfo, + ui64 commitId, + TVector blobs, + TWriteRange writeRange); + + void Bootstrap(const TActorContext& ctx); + +private: + STFUNC(StateWork); + + void AddBlob(const TActorContext& ctx); + void HandleAddBlobResponse( + const TEvIndexTabletPrivate::TEvAddBlobResponse::TPtr& ev, + const TActorContext& ctx); + + void HandlePoisonPill( + const TEvents::TEvPoisonPill::TPtr& ev, + const TActorContext& ctx); + + void + ReplyAndDie(const TActorContext& ctx, const NProto::TError& error = {}); +}; + +} // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/actors/ya.make b/cloud/filestore/libs/storage/tablet/actors/ya.make index 91d3be4d7b6..389d926dc02 100644 --- a/cloud/filestore/libs/storage/tablet/actors/ya.make +++ b/cloud/filestore/libs/storage/tablet/actors/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( tablet_writedata.cpp + tablet_adddata.cpp ) PEERDIR( diff --git a/cloud/filestore/libs/storage/tablet/model/garbage_queue.cpp b/cloud/filestore/libs/storage/tablet/model/garbage_queue.cpp index 36e3b14af1f..1c47512b29d 100644 --- a/cloud/filestore/libs/storage/tablet/model/garbage_queue.cpp +++ b/cloud/filestore/libs/storage/tablet/model/garbage_queue.cpp @@ -215,6 +215,12 @@ bool TGarbageQueue::TryReleaseCollectBarrier(ui64 commitId) return true; } +bool TGarbageQueue::IsCollectBarrierAcquired(ui64 commitId) const +{ + auto it = Impl->Barriers.find(commitId); + return it != Impl->Barriers.end() && it->RefCount > 0; +} + ui64 TGarbageQueue::GetCollectCommitId() const { if (Impl->Barriers) { diff --git a/cloud/filestore/libs/storage/tablet/model/garbage_queue.h b/cloud/filestore/libs/storage/tablet/model/garbage_queue.h index 8f78d58bed7..bbdb12da8ab 100644 --- a/cloud/filestore/libs/storage/tablet/model/garbage_queue.h +++ b/cloud/filestore/libs/storage/tablet/model/garbage_queue.h @@ -47,6 +47,7 @@ class TGarbageQueue void AcquireCollectBarrier(ui64 commitId); [[ nodiscard ]] bool TryReleaseCollectBarrier(ui64 commitId); + bool IsCollectBarrierAcquired(ui64 commitId) const; ui64 GetCollectCommitId() const; }; diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor.cpp index a6693bc055f..eebb95eb427 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor.cpp @@ -361,6 +361,12 @@ TIndexTabletActor::ValidateWriteRequest( const NProto::TWriteDataRequest& request, const TByteRange& range); +template NProto::TError +TIndexTabletActor::ValidateWriteRequest( + const TActorContext& ctx, + const NProtoPrivate::TAddDataRequest& request, + const TByteRange& range); + //////////////////////////////////////////////////////////////////////////////// NProto::TError TIndexTabletActor::IsDataOperationAllowed() const @@ -551,6 +557,7 @@ STFUNC(TIndexTabletActor::StateBoot) IgnoreFunc(TEvLocal::TEvTabletMetrics); IgnoreFunc(TEvIndexTabletPrivate::TEvUpdateCounters); IgnoreFunc(TEvIndexTabletPrivate::TEvUpdateLeakyBucketCounters); + IgnoreFunc(TEvIndexTabletPrivate::TEvReleaseCollectBarrier); IgnoreFunc(TEvHiveProxy::TEvReassignTabletResponse); @@ -574,6 +581,7 @@ STFUNC(TIndexTabletActor::StateInit) HFunc(TEvFileStore::TEvUpdateConfig, HandleUpdateConfig); HFunc(TEvIndexTabletPrivate::TEvUpdateCounters, HandleUpdateCounters); HFunc(TEvIndexTabletPrivate::TEvUpdateLeakyBucketCounters, HandleUpdateLeakyBucketCounters); + HFunc(TEvIndexTabletPrivate::TEvReleaseCollectBarrier, HandleReleaseCollectBarrier); FILESTORE_HANDLE_REQUEST(WaitReady, TEvIndexTablet) @@ -601,6 +609,8 @@ STFUNC(TIndexTabletActor::StateWork) HFunc(TEvIndexTabletPrivate::TEvUpdateCounters, HandleUpdateCounters); HFunc(TEvIndexTabletPrivate::TEvUpdateLeakyBucketCounters, HandleUpdateLeakyBucketCounters); + HFunc(TEvIndexTabletPrivate::TEvReleaseCollectBarrier, HandleReleaseCollectBarrier); + HFunc(TEvents::TEvWakeup, HandleWakeup); HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); @@ -638,6 +648,8 @@ STFUNC(TIndexTabletActor::StateZombie) IgnoreFunc(TEvIndexTabletPrivate::TEvUpdateCounters); IgnoreFunc(TEvIndexTabletPrivate::TEvUpdateLeakyBucketCounters); + IgnoreFunc(TEvIndexTabletPrivate::TEvReleaseCollectBarrier); + IgnoreFunc(TEvIndexTabletPrivate::TEvReadDataCompleted); IgnoreFunc(TEvIndexTabletPrivate::TEvWriteDataCompleted); @@ -663,6 +675,7 @@ STFUNC(TIndexTabletActor::StateBroken) switch (ev->GetTypeRewrite()) { IgnoreFunc(TEvIndexTabletPrivate::TEvUpdateCounters); IgnoreFunc(TEvIndexTabletPrivate::TEvUpdateLeakyBucketCounters); + IgnoreFunc(TEvIndexTabletPrivate::TEvReleaseCollectBarrier); HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); HFunc(TEvTablet::TEvTabletDead, HandleTabletDead); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor.h b/cloud/filestore/libs/storage/tablet/tablet_actor.h index 6df591bb01d..5e8cf1e48b7 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor.h +++ b/cloud/filestore/libs/storage/tablet/tablet_actor.h @@ -405,6 +405,10 @@ class TIndexTabletActor final const TEvIndexTabletPrivate::TEvUpdateLeakyBucketCounters::TPtr& ev, const NActors::TActorContext& ctx); + void HandleReleaseCollectBarrier( + const TEvIndexTabletPrivate::TEvReleaseCollectBarrier::TPtr& ev, + const NActors::TActorContext& ctx); + void HandleReadDataCompleted( const TEvIndexTabletPrivate::TEvReadDataCompleted::TPtr& ev, const NActors::TActorContext& ctx); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp new file mode 100644 index 00000000000..e6a5b2c5dce --- /dev/null +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_adddata.cpp @@ -0,0 +1,372 @@ +#include "tablet_actor.h" + +#include +#include + +#include + +#include + +#include +#include +#include + +namespace NCloud::NFileStore::NStorage { + +using namespace NActors; + +using namespace NKikimr; +using namespace NKikimr::NTabletFlatExecutor; + + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +/** + * @param range Aligned byte range + * @returns The vector of sizes of the blobs that the data should be split into. + */ +TVector SplitData(ui32 blockSize, TByteRange range) +{ + TVector blobSizes; + blobSizes.reserve(range.AlignedBlockCount() / BlockGroupSize + 1); + + SplitRange( + range.FirstAlignedBlock(), + range.AlignedBlockCount(), + BlockGroupSize, + [&](ui32 /*blockOffset*/, ui64 blocksCount) + { blobSizes.push_back(blocksCount * blockSize); }); + + return blobSizes; +} + +} // namespace + +class TWriteDataActor; + +//////////////////////////////////////////////////////////////////////////////// + +bool TIndexTabletActor::PrepareTx_AddData( + const TActorContext& ctx, + TTransactionContext& tx, + TTxIndexTablet::TAddData& args) +{ + auto* session = + FindSession(args.ClientId, args.SessionId, args.SessionSeqNo); + if (!session) { + args.Error = ErrorInvalidSession( + args.ClientId, + args.SessionId, + args.SessionSeqNo); + return true; + } + + auto* handle = FindHandle(args.Handle); + if (!handle || handle->Session != session) { + args.Error = ErrorInvalidHandle(args.Handle); + return true; + } + + if (!HasFlag(handle->GetFlags(), NProto::TCreateHandleRequest::E_WRITE)) { + args.Error = ErrorInvalidHandle(args.Handle); + return true; + } + + args.NodeId = handle->GetNodeId(); + ui64 commitId = GetCurrentCommitId(); + + LOG_TRACE( + ctx, + TFileStoreComponents::TABLET, + "%s AddData tx %lu @%lu %s", + LogTag.c_str(), + args.CommitId, + args.NodeId, + args.ByteRange.Describe().c_str()); + + TIndexTabletDatabase db(tx.DB); + + if (!ReadNode(db, args.NodeId, commitId, args.Node)) { + return false; + } + + // TODO: access check + TABLET_VERIFY(args.Node); + if (!HasSpaceLeft(args.Node->Attrs, args.ByteRange.End())) { + args.Error = ErrorNoSpaceLeft(); + return true; + } + + return true; +} + +void TIndexTabletActor::ExecuteTx_AddData( + const TActorContext& ctx, + TTransactionContext& tx, + TTxIndexTablet::TAddData& args) +{ + Y_UNUSED(ctx, tx, args); +} + +void TIndexTabletActor::CompleteTx_AddData( + const TActorContext& ctx, + TTxIndexTablet::TAddData& args) +{ + RemoveTransaction(*args.RequestInfo); + + auto reply = [&](const TActorContext& ctx, TTxIndexTablet::TAddData& args) + { + TABLET_VERIFY(TryReleaseCollectBarrier(args.CommitId)); + TryReleaseCollectBarrier(args.CommitId); + + auto response = + std::make_unique(args.Error); + CompleteResponse( + response->Record, + args.RequestInfo->CallContext, + ctx); + + NCloud::Reply(ctx, *args.RequestInfo, std::move(response)); + }; + + if (HasError(args.Error)) { + reply(ctx, args); + return; + } + + TVector blobs; + SplitRange( + args.ByteRange.FirstAlignedBlock(), + args.ByteRange.AlignedBlockCount(), + BlockGroupSize, + [&](ui32 blockOffset, ui32 blocksCount) + { + TBlock block{ + args.NodeId, + IntegerCast( + args.ByteRange.FirstAlignedBlock() + blockOffset), + // correct CommitId will be assigned later in AddBlobs + InvalidCommitId, + InvalidCommitId}; + blobs.emplace_back( + TPartialBlobId(), // need to generate BlobId later + block, + blocksCount, + /* data buffer */ ""); + }); + + if (blobs.empty() || blobs.size() != args.BlobIds.size()) { + args.Error = MakeError( + E_ARGUMENT, + TStringBuilder() << "blobs count mismatch: expected" << blobs.size() + << " got " << args.BlobIds.size()); + reply(ctx, args); + return; + } + + for (size_t i = 0; i < blobs.size(); ++i) { + auto& targetBlob = blobs[i]; + auto& srcBlob = args.BlobIds[i]; + targetBlob.BlobId = TPartialBlobId( + srcBlob.Generation(), + srcBlob.Step(), + srcBlob.Channel(), + srcBlob.BlobSize(), + srcBlob.Cookie(), + srcBlob.PartId()); + } + auto actor = std::make_unique( + TraceSerializer, + LogTag, + ctx.SelfID, + args.RequestInfo, + args.CommitId, + std::move(blobs), + TWriteRange{args.NodeId, args.ByteRange.End()}); + + auto actorId = NCloud::Register(ctx, std::move(actor)); + WorkerActors.insert(actorId); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TIndexTabletActor::HandleGenerateBlobIds( + const TEvIndexTablet::TEvGenerateBlobIdsRequest::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + + LOG_DEBUG( + ctx, + TFileStoreComponents::TABLET, + "%s %s: %s", + LogTag.c_str(), + "GenerateBlobIds", + DumpMessage(msg->Record).c_str()); + + // It is up to the client to provide an aligned range, but we still verify + // it and reject the request if it is not aligned. + const ui32 blockSize = GetBlockSize(); + if (msg->Record.GetLength() % blockSize != 0 || + msg->Record.GetOffset() % blockSize != 0) + { + auto response = + std::make_unique( + MakeError(E_ARGUMENT, "unaligned range")); + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + + ui64 commitId = GenerateCommitId(); + if (commitId == InvalidCommitId) { + return RebootTabletOnCommitOverflow(ctx, "GenerateBlobIds"); + } + + // We schedule this event for the case if the client does not call AddData. + // Thus we ensure that the collect barrier will be released eventually. + ctx.Schedule( + Config->GetGenerateBlobIdsReleaseCollectBarrierTimeout(), + new TEvIndexTabletPrivate::TEvReleaseCollectBarrier(commitId, 1)); + AcquireCollectBarrier(commitId); + + TByteRange range( + msg->Record.GetOffset(), + msg->Record.GetLength(), + blockSize); + + auto response = + std::make_unique(); + ui64 offset = range.Offset; + for (auto [blobIndex, length]: Enumerate(SplitData(blockSize, range))) { + TPartialBlobId partialBlobId; + // TODO(debnatkh): better selection of channel + + const auto ok = + GenerateBlobId(commitId, length, blobIndex, &partialBlobId); + if (!ok) { + ReassignDataChannelsIfNeeded(ctx); + + auto response = + std::make_unique( + MakeError(E_FS_OUT_OF_SPACE, "failed to generate blobId")); + + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + auto generatedBlob = response->Record.MutableBlobs()->Add(); + LogoBlobIDFromLogoBlobID( + MakeBlobId(TabletID(), partialBlobId), + generatedBlob->MutableBlobId()); + generatedBlob->SetOffset(offset); + generatedBlob->SetBSGroupId(Info()->GroupFor( + partialBlobId.Channel(), + partialBlobId.Generation())); + offset += length; + } + + // TODO(debnatkh): Throttling + + response->Record.SetCommitId(commitId); + + NCloud::Reply(ctx, *ev, std::move(response)); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TIndexTabletActor::HandleAddData( + const TEvIndexTablet::TEvAddDataRequest::TPtr& ev, + const TActorContext& ctx) +{ + auto* msg = ev->Get(); + + if (auto error = IsDataOperationAllowed(); HasError(error)) { + NCloud::Reply( + ctx, + *ev, + std::make_unique( + std::move(error))); + return; + } + + auto commitId = msg->Record.GetCommitId(); + if (!IsCollectBarrierAcquired(commitId)) { + // The client has sent the AddData request too late, after + // the lease has expired. + auto response = std::make_unique( + MakeError(E_REJECTED, "collect barrier expired")); + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + // We acquire the collect barrier for the second time in order to prolong + // the already acquired lease + AcquireCollectBarrier(commitId); + bool txStarted = false; + Y_DEFER + { + // Until the tx is started, it is this method's responsibility to + // release the collect barrier + if (!txStarted) { + TABLET_VERIFY(TryReleaseCollectBarrier(commitId)); + // The second one is used to release the barrier, acquired in + // GenerateBlobIds method. Though it will be eventually released + // upon lease expiration, it is better to release it as soon as + // possible. + TryReleaseCollectBarrier(commitId); + } + }; + + const TByteRange range( + msg->Record.GetOffset(), + msg->Record.GetLength(), + GetBlockSize()); + + auto validator = [&](const NProtoPrivate::TAddDataRequest& request) + { + return ValidateWriteRequest(ctx, request, range); + }; + + if (!AcceptRequest(ev, ctx, validator)) { + return; + } + + auto requestInfo = + CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext); + requestInfo->StartedTs = ctx.Now(); + + TVector blobIds; + for (const auto& blobId: msg->Record.GetBlobIds()) { + blobIds.push_back(LogoBlobIDFromLogoBlobID(blobId)); + } + + if (blobIds.empty()) { + auto response = + std::make_unique(MakeError( + E_ARGUMENT, + "empty list of blobs given in AddData request")); + NCloud::Reply(ctx, *ev, std::move(response)); + } + + LOG_DEBUG( + ctx, + TFileStoreComponents::TABLET, + "%s %s: blobId: %s,... (total: %lu)", + LogTag.c_str(), + "AddData", + blobIds[0].ToString().c_str(), + blobIds.size()); + + AddTransaction(*requestInfo); + + ExecuteTx( + ctx, + std::move(requestInfo), + msg->Record, + range, + std::move(blobIds), + msg->Record.GetCommitId()); + txStarted = true; +} + +} // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_collectgarbage.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_collectgarbage.cpp index ecbffb146d0..58c8b1042a7 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_collectgarbage.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_collectgarbage.cpp @@ -473,4 +473,19 @@ void TIndexTabletActor::HandleCollectGarbageCompleted( EnqueueCollectGarbageIfNeeded(ctx); } +//////////////////////////////////////////////////////////////////////////////// + +void TIndexTabletActor::HandleReleaseCollectBarrier( + const TEvIndexTabletPrivate::TEvReleaseCollectBarrier::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ctx); + auto commitId = ev->Get()->CommitId; + for (ui32 i = 0; i < ev->Get()->Count; ++i) { + // We do not check if the barrier was acquired, because the barrier may + // have already been released by a completed three-stage write operation + TryReleaseCollectBarrier(commitId); + } +} + } // namespace NCloud::NFileStore::NStorage diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp index 1196ce6a58f..378831c527a 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_createsession.cpp @@ -15,6 +15,7 @@ void FillFeatures(const TStorageConfig& config, NProto::TFileStore& fileStore) { auto* features = fileStore.MutableFeatures(); features->SetTwoStageReadEnabled(config.GetTwoStageReadEnabled()); + features->SetThreeStageWriteEnabled(config.GetThreeStageWriteEnabled()); features->SetEntryTimeout(config.GetEntryTimeout().MilliSeconds()); features->SetNegativeEntryTimeout( config.GetNegativeEntryTimeout().MilliSeconds()); diff --git a/cloud/filestore/libs/storage/tablet/tablet_actor_request.cpp b/cloud/filestore/libs/storage/tablet/tablet_actor_request.cpp index 665dc3a927e..1b4cf0daa79 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_actor_request.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_actor_request.cpp @@ -120,6 +120,8 @@ template void TIndexTabletActor::CompleteResponse( FILESTORE_SERVICE(FILESTORE_GENERATE_IMPL, TEvService) FILESTORE_GENERATE_IMPL(DescribeData, TEvIndexTablet) FILESTORE_GENERATE_IMPL(DescribeSessions, TEvIndexTablet) +FILESTORE_GENERATE_IMPL(GenerateBlobIds, TEvIndexTablet) +FILESTORE_GENERATE_IMPL(AddData, TEvIndexTablet) #undef FILESTORE_GENERATE_IMPL diff --git a/cloud/filestore/libs/storage/tablet/tablet_counters.h b/cloud/filestore/libs/storage/tablet/tablet_counters.h index 8be4fa7d47e..389ea9d2001 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_counters.h +++ b/cloud/filestore/libs/storage/tablet/tablet_counters.h @@ -31,6 +31,7 @@ namespace NCloud::NFileStore::NStorage { xxx(CheckpointBlocksCount, __VA_ARGS__) \ xxx(CheckpointBlobsCount, __VA_ARGS__) \ xxx(FreshBytesCount, __VA_ARGS__) \ + xxx(LastCollectCommitId, __VA_ARGS__) \ // FILESTORE_TABLET_STATS #define FILESTORE_TABLET_SIMPLE_COUNTERS(xxx) \ diff --git a/cloud/filestore/libs/storage/tablet/tablet_private.h b/cloud/filestore/libs/storage/tablet/tablet_private.h index 006084e8fcb..15ec02c53c5 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_private.h +++ b/cloud/filestore/libs/storage/tablet/tablet_private.h @@ -580,6 +580,23 @@ struct TEvIndexTabletPrivate TSet Nodes; }; + // + // Release collect barrier + // + + struct TReleaseCollectBarrier + { + // Commit id to release + ui64 CommitId; + // Number of times to perform the release + ui32 Count; + + TReleaseCollectBarrier(ui64 commitId, ui32 count) + : CommitId(commitId) + , Count(count) + {} + }; + // // Events declaration // @@ -597,6 +614,8 @@ struct TEvIndexTabletPrivate EvReadDataCompleted, EvWriteDataCompleted, + EvReleaseCollectBarrier, + EvEnd }; @@ -609,6 +628,9 @@ struct TEvIndexTabletPrivate using TEvUpdateCounters = TRequestEvent; using TEvUpdateLeakyBucketCounters = TRequestEvent; + using TEvReleaseCollectBarrier = + TRequestEvent; + using TEvReadDataCompleted = TResponseEvent; using TEvWriteDataCompleted = TResponseEvent; }; diff --git a/cloud/filestore/libs/storage/tablet/tablet_state.h b/cloud/filestore/libs/storage/tablet/tablet_state.h index 4831ee885b1..795c82e82b5 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_state.h +++ b/cloud/filestore/libs/storage/tablet/tablet_state.h @@ -766,6 +766,7 @@ FILESTORE_DUPCACHE_REQUESTS(FILESTORE_DECLARE_DUPCACHE) void AcquireCollectBarrier(ui64 commitId); bool TryReleaseCollectBarrier(ui64 commitId); + bool IsCollectBarrierAcquired(ui64 commitId) const; ui64 GetCollectCommitId() const; diff --git a/cloud/filestore/libs/storage/tablet/tablet_state_data.cpp b/cloud/filestore/libs/storage/tablet/tablet_state_data.cpp index 7da973679a5..8525d48788f 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_state_data.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_state_data.cpp @@ -927,6 +927,11 @@ bool TIndexTabletState::TryReleaseCollectBarrier(ui64 commitId) return Impl->GarbageQueue.TryReleaseCollectBarrier(commitId); } +bool TIndexTabletState::IsCollectBarrierAcquired(ui64 commitId) const +{ + return Impl->GarbageQueue.IsCollectBarrierAcquired(commitId); +} + ui64 TIndexTabletState::GetCollectCommitId() const { // should not collect after any barrier diff --git a/cloud/filestore/libs/storage/tablet/tablet_tx.h b/cloud/filestore/libs/storage/tablet/tablet_tx.h index af5e3e47196..4d51f412f38 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_tx.h +++ b/cloud/filestore/libs/storage/tablet/tablet_tx.h @@ -97,6 +97,7 @@ namespace NCloud::NFileStore::NStorage { \ xxx(ReadData, __VA_ARGS__) \ xxx(WriteData, __VA_ARGS__) \ + xxx(AddData, __VA_ARGS__) \ xxx(WriteBatch, __VA_ARGS__) \ xxx(AllocateData, __VA_ARGS__) \ \ @@ -1212,6 +1213,42 @@ struct TTxIndexTablet } }; + // + // AddData + // + + struct TAddData : TSessionAware + { + const TRequestInfoPtr RequestInfo; + const ui64 Handle; + const TByteRange ByteRange; + TVector BlobIds; + ui64 CommitId; + + ui64 NodeId = InvalidNodeId; + TMaybe Node; + + TAddData( + TRequestInfoPtr requestInfo, + const NProtoPrivate::TAddDataRequest& request, + TByteRange byteRange, + TVector blobIds, + ui64 commitId) + : TSessionAware(request) + , RequestInfo(std::move(requestInfo)) + , Handle(request.GetHandle()) + , ByteRange(byteRange) + , BlobIds(std::move(blobIds)) + , CommitId(commitId) + {} + + void Clear() + { + NodeId = InvalidNodeId; + Node.Clear(); + } + }; + // // WriteBatch // diff --git a/cloud/filestore/libs/storage/tablet/tablet_ut_data.cpp b/cloud/filestore/libs/storage/tablet/tablet_ut_data.cpp index bb081cf57bf..0d3e79f56ee 100644 --- a/cloud/filestore/libs/storage/tablet/tablet_ut_data.cpp +++ b/cloud/filestore/libs/storage/tablet/tablet_ut_data.cpp @@ -7,6 +7,7 @@ #include +#include #include #include @@ -4322,6 +4323,426 @@ Y_UNIT_TEST_SUITE(TIndexTabletTest_Data) response->GetErrorReason()); } +#define CHECK_GENERATED_BLOB(offset, length, expected) \ + { \ + ui64 currentOffset = offset; \ + TVector expectedSizes = expected; \ + auto blobs = tablet.GenerateBlobIds(node, handle, offset, length); \ + auto commitId = blobs->Record.GetCommitId(); \ + const auto [generation, step] = ParseCommitId(commitId); \ + UNIT_ASSERT_VALUES_EQUAL( \ + blobs->Record.BlobsSize(), \ + expectedSizes.size()); \ + for (size_t i = 0; i < expectedSizes.size(); ++i) { \ + auto generatedBlob = blobs->Record.GetBlobs(i); \ + auto blob = LogoBlobIDFromLogoBlobID(generatedBlob.GetBlobId()); \ + UNIT_ASSERT_VALUES_EQUAL(blob.BlobSize(), expectedSizes[i]); \ + UNIT_ASSERT_VALUES_EQUAL(blob.Generation(), generation); \ + UNIT_ASSERT_VALUES_EQUAL(blob.Step(), step); \ + UNIT_ASSERT_VALUES_EQUAL( \ + generatedBlob.GetOffset(), \ + currentOffset); \ + currentOffset += expectedSizes[i]; \ + } \ + } + + TABLET_TEST(ShouldGenerateBlobIds) + { + auto block = tabletConfig.BlockSize; + + TTestEnv env; + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + ui64 tabletId = env.BootIndexTablet(nodeIdx); + + TIndexTabletClient tablet( + env.GetRuntime(), + nodeIdx, + tabletId, + tabletConfig); + tablet.InitSession("client", "session"); + + auto node = + CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test")); + ui64 handle = CreateHandle(tablet, node); + + CHECK_GENERATED_BLOB(0, block, TVector{block}); + CHECK_GENERATED_BLOB(block, block, TVector{block}); + CHECK_GENERATED_BLOB(3 * block, 2 * block, TVector{2 * block}); + CHECK_GENERATED_BLOB( + BlockGroupSize * block / 2, + block * BlockGroupSize, + (TVector{ + BlockGroupSize * block / 2, + BlockGroupSize * block / 2})); + CHECK_GENERATED_BLOB( + 3 * block * BlockGroupSize, + 3 * block * BlockGroupSize, + (TVector{ + block * BlockGroupSize, + block * BlockGroupSize, + block * BlockGroupSize})); + CHECK_GENERATED_BLOB( + block, + 2 * block * BlockGroupSize, + (TVector{ + block * (BlockGroupSize - 1), + block * BlockGroupSize, + block})); + } + +#undef CHECK_GENERATED_BLOB + + TABLET_TEST(ShouldAcquireLockForCollectGarbageOnGenerateBlobIds) + { + auto block = tabletConfig.BlockSize; + + TTestEnv env; + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + ui64 tabletId = env.BootIndexTablet(nodeIdx); + + TIndexTabletClient tablet( + env.GetRuntime(), + nodeIdx, + tabletId, + tabletConfig); + tablet.InitSession("client", "session"); + + auto moveBarrier = [&tablet, block] + { + auto node = + CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test")); + ui64 handle = CreateHandle(tablet, node); + tablet.WriteData(handle, 0, block, 'a'); + tablet.Flush(); + tablet.DestroyHandle(handle); + tablet.UnlinkNode(RootNodeId, "test", false); + tablet.CollectGarbage(); + }; + moveBarrier(); + + ui64 lastCollectGarbage = 0; + { + auto response = tablet.GetStorageStats(); + const auto& stats = response->Record.GetStats(); + lastCollectGarbage = stats.GetLastCollectCommitId(); + } + UNIT_ASSERT_GT(lastCollectGarbage, 0); + + auto node = + CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test2")); + ui64 handle = CreateHandle(tablet, node); + + auto blobs = tablet.GenerateBlobIds(node, handle, 0, block); + auto commitId = blobs->Record.GetCommitId(); + + moveBarrier(); + { + auto response = tablet.GetStorageStats(); + const auto& stats = response->Record.GetStats(); + UNIT_ASSERT_LE(stats.GetLastCollectCommitId(), commitId); + } + + env.GetRuntime().DispatchEvents({}, TDuration::Seconds(15)); + + moveBarrier(); + // After the GenerateBlobIdsReleaseCollectBarrierTimeout has passed, we + // can observe that the last collect garbage has moved beyond the commit + // id of the generated blob. + { + auto response = tablet.GetStorageStats(); + const auto& stats = response->Record.GetStats(); + UNIT_ASSERT_GT(stats.GetLastCollectCommitId(), commitId); + } + + // Now we validate that the barrier is released even if the TX fails + TVector blobIds; + + NProto::TError error; + error.SetCode(E_REJECTED); + + auto filter = [&](auto& runtime, auto& event) + { + Y_UNUSED(runtime); + + switch (event->GetTypeRewrite()) { + case TEvBlobStorage::EvPutResult: { + using TResponse = TEvBlobStorage::TEvPutResult; + auto* msg = event->template Get(); + if (msg->Id.Channel() >= TIndexTabletSchema::DataChannel) { + blobIds.push_back(msg->Id); + } + return false; + } + case TEvIndexTabletPrivate::EvWriteBlobResponse: { + using TResponse = + TEvIndexTabletPrivate::TEvWriteBlobResponse; + auto* msg = event->template Get(); + auto& e = const_cast(msg->Error); + e.SetCode(E_REJECTED); + return false; + } + } + + return false; + }; + + env.GetRuntime().SetEventFilter(filter); + + auto generateResult = + tablet.GenerateBlobIds(node, handle, 0, block * BlockGroupSize); + commitId = generateResult->Record.GetCommitId(); + + // intercepted blob was successfully written to BlobStorage, yet the + // following operation is expected to fail + tablet.AssertWriteDataFailed(handle, 0, block * BlockGroupSize, 'x'); + + env.GetRuntime().SetEventFilter( + TTestActorRuntimeBase::DefaultFilterFunc); + + // because we use handle + 1 instead of handle, it is expected that the + // handler will fail will fail + tablet.AssertAddDataFailed( + node, + handle + 1, + 0, + block * BlockGroupSize, + blobIds, + commitId); + + moveBarrier(); + { + auto response = tablet.GetStorageStats(); + const auto& stats = response->Record.GetStats(); + // We expect that upon failre the barrier was released, thus moving + // the last collect garbage beyond the commit id of the issued blob + UNIT_ASSERT_GT(stats.GetLastCollectCommitId(), commitId); + } + + node = + CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test3")); + handle = CreateHandle(tablet, node, "", TCreateHandleArgs::RDNLY); + + + // now we do the same thing, but expect HandleAddData to execute tx, yet + // the tx to fail + generateResult = + tablet.GenerateBlobIds(node, handle, 0, block * BlockGroupSize); + commitId = generateResult->Record.GetCommitId(); + + tablet.AssertAddDataFailed( + node, + handle, + 0, + block * BlockGroupSize, + blobIds, + commitId); + + moveBarrier(); + { + auto response = tablet.GetStorageStats(); + const auto& stats = response->Record.GetStats(); + // We expect that upon failre the barrier was released, thus moving + // the last collect garbage beyond the commit id of the issued blob + UNIT_ASSERT_GT(stats.GetLastCollectCommitId(), commitId); + } + } + + TABLET_TEST(ShouldAddData) + { + const auto block = tabletConfig.BlockSize; + + NProto::TStorageConfig storageConfig; + storageConfig.SetCompactionThreshold(999'999); + storageConfig.SetCleanupThreshold(999'999); + storageConfig.SetWriteBlobThreshold(block); + + TTestEnv env({}, std::move(storageConfig)); + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + ui64 tabletId = env.BootIndexTablet(nodeIdx); + + TIndexTabletClient tablet( + env.GetRuntime(), + nodeIdx, + tabletId, + tabletConfig); + tablet.InitSession("client", "session"); + + auto id = CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test")); + auto handle = CreateHandle(tablet, id); + + TVector blobIds; + bool shouldDropPutResult = true; + + // We can't make direct writes to BlobStorage, so we store the blob ids + // from an ordinary write and then use them in AddData + env.GetRuntime().SetObserverFunc( + [&](TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvBlobStorage::EvPutResult: { + // We intercept all PutResult events in order for tablet + // to consider them as written. Nevertheless, these + // blobs are already written and we will use them in + // AddData + auto* msg = event->Get(); + if (msg->Id.Channel() >= + TIndexTabletSchema::DataChannel) { + blobIds.push_back(msg->Id); + } + if (shouldDropPutResult) { + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + TString data(block * BlockGroupSize * 2, '\0'); + for (size_t i = 0; i < data.size(); ++i) { + // 77 and 256 are coprimes + data[i] = static_cast(i % 77); + } + + tablet.SendWriteDataRequest(handle, 0, data.size(), data.data()); + env.GetRuntime().DispatchEvents({}, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(blobIds.size(), 2); + shouldDropPutResult = false; + + // We acquire commitId just so there is something to release on + // completion + auto commitId = tablet.GenerateBlobIds(id, handle, 0, block) + ->Record.GetCommitId(); + + auto id2 = + CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test2")); + auto handle2 = CreateHandle(tablet, id2); + + Sort(blobIds.begin(), blobIds.end()); + + // Now we try to submit the same blobs for another node + auto request = tablet.CreateAddDataRequest( + id2, + handle2, + 0, + data.size(), + blobIds, + commitId); + + tablet.SendRequest(std::move(request)); + env.GetRuntime().DispatchEvents({}, TDuration::Seconds(2)); + + auto readData = tablet.ReadData(handle2, 0, data.size()); + + // After AddData, we should receive AddDataResponse + auto response = tablet.RecvAddDataResponse(); + UNIT_ASSERT_VALUES_EQUAL(S_OK, response->GetStatus()); + + // Use DescribeData to check that proper blobs were added + auto describe = tablet.DescribeData(handle2, 0, data.size()); + UNIT_ASSERT_VALUES_EQUAL(describe->Record.BlobPiecesSize(), 2); + for (auto [i, blobPiece]: Enumerate(describe->Record.GetBlobPieces())) { + UNIT_ASSERT_VALUES_EQUAL(1, blobPiece.RangesSize()); + UNIT_ASSERT_VALUES_EQUAL( + i * (block * BlockGroupSize), + blobPiece.GetRanges(0).GetOffset()); + UNIT_ASSERT_VALUES_EQUAL(0, blobPiece.GetRanges(0).GetBlobOffset()); + UNIT_ASSERT_VALUES_EQUAL( + block * BlockGroupSize, + blobPiece.GetRanges(0).GetLength()); + + auto blobId = LogoBlobIDFromLogoBlobID(blobPiece.GetBlobId()); + UNIT_ASSERT_VALUES_EQUAL(blobId, blobIds[i]); + } + + // validate, that no more BlobStorage requests were made + UNIT_ASSERT_VALUES_EQUAL(blobIds.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(data, readData->Record.GetBuffer()); + } + + TABLET_TEST(ShouldRejectAddDataIfCollectBarrierIsAlreadyReleased) + { + const auto block = tabletConfig.BlockSize; + + NProto::TStorageConfig storageConfig; + storageConfig.SetCompactionThreshold(999'999); + storageConfig.SetCleanupThreshold(999'999); + storageConfig.SetWriteBlobThreshold(block); + + TTestEnv env({}, std::move(storageConfig)); + env.CreateSubDomain("nfs"); + + ui32 nodeIdx = env.CreateNode("nfs"); + ui64 tabletId = env.BootIndexTablet(nodeIdx); + + TIndexTabletClient tablet( + env.GetRuntime(), + nodeIdx, + tabletId, + tabletConfig); + tablet.InitSession("client", "session"); + + auto id = CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test")); + auto handle = CreateHandle(tablet, id); + + TVector blobIds; + + env.GetRuntime().SetObserverFunc( + [&](TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvBlobStorage::EvPutResult: { + auto* msg = event->Get(); + if (msg->Id.Channel() >= + TIndexTabletSchema::DataChannel) { + blobIds.push_back(msg->Id); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + + return TTestActorRuntime::DefaultObserverFunc(event); + }); + + TString data(block * BlockGroupSize * 2, 'x'); + + tablet.SendWriteDataRequest(handle, 0, data.size(), data.data()); + env.GetRuntime().DispatchEvents({}, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(blobIds.size(), 2); + + auto commitId = tablet.GenerateBlobIds(id, handle, 0, block) + ->Record.GetCommitId(); + + // We wait for the collect barrier lease to expire. We expect that the + // following AddData request will be rejected + env.GetRuntime().DispatchEvents({}, TDuration::Seconds(15)); + + auto id2 = + CreateNode(tablet, TCreateNodeArgs::File(RootNodeId, "test2")); + auto handle2 = CreateHandle(tablet, id2); + + Sort(blobIds.begin(), blobIds.end()); + + tablet.SendAddDataRequest( + id2, + handle2, + 0, + data.size(), + blobIds, + commitId); + + auto response = tablet.RecvAddDataResponse(); + UNIT_ASSERT_VALUES_EQUAL(E_REJECTED, response->GetStatus()); + } + #undef TABLET_TEST } diff --git a/cloud/filestore/libs/storage/tablet/ya.make b/cloud/filestore/libs/storage/tablet/ya.make index d008c670d80..765e3080cab 100644 --- a/cloud/filestore/libs/storage/tablet/ya.make +++ b/cloud/filestore/libs/storage/tablet/ya.make @@ -15,6 +15,7 @@ SRCS( tablet_actor_accessnode.cpp tablet_actor_acquirelock.cpp tablet_actor_addblob.cpp + tablet_actor_adddata.cpp tablet_actor_allocatedata.cpp tablet_actor_change_storage_config.cpp tablet_actor_cleanup.cpp diff --git a/cloud/filestore/libs/storage/testlib/tablet_client.h b/cloud/filestore/libs/storage/testlib/tablet_client.h index 9c60fb51f5c..bcc61ea0d13 100644 --- a/cloud/filestore/libs/storage/testlib/tablet_client.h +++ b/cloud/filestore/libs/storage/testlib/tablet_client.h @@ -557,6 +557,44 @@ class TIndexTabletClient return request; } + auto CreateGenerateBlobIdsRequest( + ui64 nodeId, + ui64 handle, + ui64 offset, + ui64 length) + { + auto request = + CreateSessionRequest(); + request->Record.SetNodeId(nodeId); + request->Record.SetHandle(handle); + request->Record.SetOffset(offset); + request->Record.SetLength(length); + return request; + } + + auto CreateAddDataRequest( + ui64 nodeId, + ui64 handle, + ui64 offset, + ui32 length, + const TVector& blobIds, + ui64 commitId) + { + auto request = CreateSessionRequest< + TEvIndexTablet::TEvAddDataRequest>(); + request->Record.SetNodeId(nodeId); + request->Record.SetHandle(handle); + request->Record.SetOffset(offset); + request->Record.SetLength(length); + for (const auto& blobId: blobIds) { + NKikimr::LogoBlobIDFromLogoBlobID( + blobId, + request->Record.MutableBlobIds()->Add()); + } + request->Record.SetCommitId(commitId); + return request; + } + auto CreateAcquireLockRequest( ui64 handle, ui64 owner, diff --git a/cloud/filestore/private/api/protos/tablet.proto b/cloud/filestore/private/api/protos/tablet.proto index 35d8efc8960..8efded8ebca 100644 --- a/cloud/filestore/private/api/protos/tablet.proto +++ b/cloud/filestore/private/api/protos/tablet.proto @@ -117,6 +117,7 @@ message TStorageStats uint64 FreshBytesCount = 109; uint64 AllocatedCompactionRanges = 110; uint64 UsedCompactionRanges = 111; + uint64 LastCollectCommitId = 112; // channel stats uint64 TabletChannelCount = 1000; @@ -368,3 +369,88 @@ message TDescribeSessionsResponse // All tablet sessions. repeated TTabletSessionInfo Sessions = 2; } + +//////////////////////////////////////////////////////////////////////////////// +// GenerateBlobIds request/response. + +message TGenerateBlobIdsRequest +{ + // Optional request headers. + NProto.THeaders Headers = 1; + + // FileSystem identifier. + string FileSystemId = 2; + + // Node. + uint64 NodeId = 3; + + // IO handle. + uint64 Handle = 4; + + // Starting offset for write. Expected to be aligned to the block size. + uint64 Offset = 5; + + // Length of data to write. Expected to be aligned to the block size. + uint64 Length = 6; +} + +message TGeneratedBlob +{ + // Blob id. + NKikimrProto.TLogoBlobID BlobId = 1; + + // Offset + uint64 Offset = 2; + + // Group id. + uint32 BSGroupId = 3; +} + +message TGenerateBlobIdsResponse +{ + // Optional error, set only if error happened. + NCloud.NProto.TError Error = 1; + + // Blob ids, in the same order as in the request. + repeated TGeneratedBlob Blobs = 2; + + // AcquireCollectBarrier has been executed for this commit id. + uint64 CommitId = 4; +} + +//////////////////////////////////////////////////////////////////////////////// + +// ThreeStageWrite request/response. + +message TAddDataRequest +{ + // Optional request headers. + NProto.THeaders Headers = 1; + + // FileSystem identifier. + string FileSystemId = 2; + + // Node. + uint64 NodeId = 3; + + // IO handle. + uint64 Handle = 4; + + // Starting offset for write. + uint64 Offset = 5; + + // Data size. + uint64 Length = 6; + + // Blob ids to be added. Ordered by the offset in the original data. + repeated NKikimrProto.TLogoBlobID BlobIds = 7; + + // Commit id. + uint64 CommitId = 8; +} + +message TAddDataResponse +{ + // Optional error, set only if error happened. + NCloud.NProto.TError Error = 1; +} diff --git a/cloud/filestore/public/api/protos/fs.proto b/cloud/filestore/public/api/protos/fs.proto index ad472c2b515..728df48f9e2 100644 --- a/cloud/filestore/public/api/protos/fs.proto +++ b/cloud/filestore/public/api/protos/fs.proto @@ -17,6 +17,7 @@ message TFileStoreFeatures uint32 EntryTimeout = 2; uint32 NegativeEntryTimeout = 3; uint32 AttrTimeout = 4; + bool ThreeStageWriteEnabled = 5; } message TFileStore