diff --git a/cloud/blockstore/libs/storage/core/compaction_options.cpp b/cloud/blockstore/libs/storage/core/compaction_options.cpp new file mode 100644 index 00000000000..a3d3d81dc16 --- /dev/null +++ b/cloud/blockstore/libs/storage/core/compaction_options.cpp @@ -0,0 +1 @@ +#include "compaction_options.h" diff --git a/cloud/blockstore/libs/storage/core/compaction_options.h b/cloud/blockstore/libs/storage/core/compaction_options.h new file mode 100644 index 00000000000..34d017a2740 --- /dev/null +++ b/cloud/blockstore/libs/storage/core/compaction_options.h @@ -0,0 +1,26 @@ +#pragma once + +#include + +#include + +namespace NCloud::NBlockStore::NStorage { + +//////////////////////////////////////////////////////////////////////////////// + +enum class ECompactionOption: size_t +{ + Full, // non-incremental compaction + Forced, // compaction initiated externally + MaxFieldNumber, +}; + +constexpr size_t ToBit(ECompactionOption option) +{ + return static_cast(option); +} + +using TCompactionOptions = + std::bitset; + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/core/compaction_type.cpp b/cloud/blockstore/libs/storage/core/compaction_type.cpp new file mode 100644 index 00000000000..30277d31822 --- /dev/null +++ b/cloud/blockstore/libs/storage/core/compaction_type.cpp @@ -0,0 +1 @@ +#include "compaction_type.h" diff --git a/cloud/blockstore/libs/storage/core/compaction_type.h b/cloud/blockstore/libs/storage/core/compaction_type.h new file mode 100644 index 00000000000..b42f29d9a87 --- /dev/null +++ b/cloud/blockstore/libs/storage/core/compaction_type.h @@ -0,0 +1,15 @@ +#pragma once + +#include + +namespace NCloud::NBlockStore::NStorage { + +//////////////////////////////////////////////////////////////////////////////// + +enum class ECompactionType: ui32 +{ + Forced, // compaction initiated externally + Tablet, // compaction initiated by tablet +}; + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/core/ya.make b/cloud/blockstore/libs/storage/core/ya.make index e0566aef3b7..6467cd97190 100644 --- a/cloud/blockstore/libs/storage/core/ya.make +++ b/cloud/blockstore/libs/storage/core/ya.make @@ -5,7 +5,9 @@ GENERATE_ENUM_SERIALIZATION(mount_token.h) SRCS( block_handler.cpp compaction_map.cpp + compaction_options.cpp compaction_policy.cpp + compaction_type.cpp config.cpp disk_counters.cpp disk_validation.cpp diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp index 4b7c27e591f..70f75adf578 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp @@ -149,6 +149,7 @@ class TCompactionActor final const ui32 MaxAffectedBlocksPerCompaction; const IBlockDigestGeneratorPtr BlockDigestGenerator; const TDuration ReadBlobTimeout; + const ECompactionType CompactionType; const ui64 CommitId; @@ -181,6 +182,7 @@ class TCompactionActor final ui32 maxAffectedBlocksPerCompaction, IBlockDigestGeneratorPtr blockDigestGenerator, TDuration readBlobTimeout, + ECompactionType compactionType, ui64 commitId, TVector rangeCompactionInfos, TVector requests); @@ -242,6 +244,7 @@ TCompactionActor::TCompactionActor( ui32 maxAffectedBlocksPerCompaction, IBlockDigestGeneratorPtr blockDigestGenerator, TDuration readBlobTimeout, + ECompactionType compactionType, ui64 commitId, TVector rangeCompactionInfos, TVector requests) @@ -253,6 +256,7 @@ TCompactionActor::TCompactionActor( , MaxAffectedBlocksPerCompaction(maxAffectedBlocksPerCompaction) , BlockDigestGenerator(std::move(blockDigestGenerator)) , ReadBlobTimeout(readBlobTimeout) + , CompactionType(compactionType) , CommitId(commitId) , RangeCompactionInfos(std::move(rangeCompactionInfos)) , Requests(std::move(requests)) @@ -736,6 +740,7 @@ void TCompactionActor::NotifyCompleted( request->AffectedRanges.push_back(ConvertRangeSafe(rc.BlockRange)); } request->AffectedBlockInfos = std::move(AffectedBlockInfos); + request->CompactionType = CompactionType; NCloud::Send(ctx, Tablet, std::move(request)); } @@ -1029,7 +1034,9 @@ void TPartitionActor::ChangeRangeCountPerRunIfNeeded( void TPartitionActor::EnqueueCompactionIfNeeded(const TActorContext& ctx) { - if (State->GetCompactionState().Status != EOperationStatus::Idle) { + if (State->GetCompactionState(ECompactionType::Tablet).Status != + EOperationStatus::Idle) + { // already enqueued return; } @@ -1125,7 +1132,8 @@ void TPartitionActor::EnqueueCompactionIfNeeded(const TActorContext& ctx) throttlingAllowed = false; } - State->GetCompactionState().SetStatus(EOperationStatus::Enqueued); + State->GetCompactionState(ECompactionType::Tablet).SetStatus( + EOperationStatus::Enqueued); if (Config->GetCompactionCountPerRunIncreasingThreshold() && Config->GetCompactionCountPerRunDecreasingThreshold() @@ -1176,7 +1184,7 @@ void TPartitionActor::EnqueueCompactionIfNeeded(const TActorContext& ctx) if (mode == TEvPartitionPrivate::GarbageCompaction || !diskGarbageBelowThreshold) { - request->ForceFullCompaction = true; + request->CompactionOptions.set(ToBit(ECompactionOption::Full)); } if (throttlingAllowed && Config->GetMaxCompactionDelay()) { @@ -1245,13 +1253,21 @@ void TPartitionActor::HandleCompaction( NCloud::Reply(ctx, requestInfo, std::move(response)); }; - if (State->GetCompactionState().Status == EOperationStatus::Started) { + const auto compactionType = + msg->CompactionOptions.test(ToBit(ECompactionOption::Forced)) ? + ECompactionType::Forced: + ECompactionType::Tablet; + + if (State->GetCompactionState(compactionType).Status == + EOperationStatus::Started) + { replyError(ctx, *requestInfo, E_TRY_AGAIN, "compaction already started"); return; } if (!State->IsCompactionAllowed()) { - State->GetCompactionState().SetStatus(EOperationStatus::Idle); + State->GetCompactionState(compactionType).SetStatus( + EOperationStatus::Idle); replyError(ctx, *requestInfo, E_BS_OUT_OF_SPACE, "all channels readonly"); return; @@ -1287,7 +1303,8 @@ void TPartitionActor::HandleCompaction( } if (tops.empty() || !tops.front().Stat.BlobCount) { - State->GetCompactionState().SetStatus(EOperationStatus::Idle); + State->GetCompactionState(compactionType).SetStatus( + EOperationStatus::Idle); replyError(ctx, *requestInfo, S_ALREADY, "nothing to compact"); return; @@ -1310,9 +1327,10 @@ void TPartitionActor::HandleCompaction( State->GetBlocksCount() - 1); LOG_DEBUG(ctx, TBlockStoreComponents::PARTITION, - "[%lu] Start compaction @%lu (range: %s, blobs: %u, blocks: %u" + "[%lu] Start %s compaction @%lu (range: %s, blobs: %u, blocks: %u" ", reads: %u, blobsread: %u, blocksread: %u, score: %f)", TabletID(), + compactionType == ECompactionType::Forced ? "forced" : "tablet", commitId, DescribeRange(blockRange).c_str(), x.Stat.BlobCount, @@ -1325,7 +1343,7 @@ void TPartitionActor::HandleCompaction( ranges.emplace_back(rangeIdx, blockRange); } - State->GetCompactionState().SetStatus(EOperationStatus::Started); + State->GetCompactionState(compactionType).SetStatus(EOperationStatus::Started); State->GetCommitQueue().AcquireBarrier(commitId); State->GetCleanupQueue().AcquireBarrier(commitId); @@ -1336,7 +1354,7 @@ void TPartitionActor::HandleCompaction( auto tx = CreateTx( requestInfo, commitId, - msg->ForceFullCompaction, + msg->CompactionOptions, std::move(ranges)); ui64 minCommitId = State->GetCommitQueue().GetMinCommitId(); @@ -1401,7 +1419,8 @@ void TPartitionActor::HandleCompactionCompleted( State->GetCleanupQueue().ReleaseBarrier(commitId); State->GetGarbageQueue().ReleaseBarrier(commitId); - State->GetCompactionState().SetStatus(EOperationStatus::Idle); + State->GetCompactionState(msg->CompactionType).SetStatus( + EOperationStatus::Idle); Actors.Erase(ev->Sender); @@ -1458,7 +1477,7 @@ void PrepareRangeCompaction( const TString& folderId, const TString& diskId, const ui64 commitId, - const bool forceFullCompaction, + const bool fullCompaction, const TActorContext& ctx, const ui64 tabletId, THashSet& affectedBlobIds, @@ -1503,7 +1522,7 @@ void PrepareRangeCompaction( if (ready && incrementalCompactionEnabled - && !forceFullCompaction) + && !fullCompaction) { THashMap liveBlocks; for (const auto& m: args.BlockMarks) { @@ -1885,7 +1904,7 @@ bool TPartitionActor::PrepareCompaction( PartitionConfig.GetFolderId(), PartitionConfig.GetDiskId(), args.CommitId, - args.ForceFullCompaction, + args.CompactionOptions.test(ToBit(ECompactionOption::Full)), ctx, TabletID(), affectedBlobIds, @@ -1962,6 +1981,11 @@ void TPartitionActor::CompleteCompaction( Config->GetBlobStorageAsyncGetTimeoutSSD() : Config->GetBlobStorageAsyncGetTimeoutHDD(); + const auto compactionType = + args.CompactionOptions.test(ToBit(ECompactionOption::Forced)) ? + ECompactionType::Forced: + ECompactionType::Tablet; + auto actor = NCloud::Register( ctx, args.RequestInfo, @@ -1972,6 +1996,7 @@ void TPartitionActor::CompleteCompaction( Config->GetMaxAffectedBlocksPerCompaction(), BlockDigestGenerator, readBlobTimeout, + compactionType, args.CommitId, std::move(rangeCompactionInfos), std::move(requests)); diff --git a/cloud/blockstore/libs/storage/partition/part_actor_compactrange.cpp b/cloud/blockstore/libs/storage/partition/part_actor_compactrange.cpp index ac5f341f8de..4fd42e99e44 100644 --- a/cloud/blockstore/libs/storage/partition/part_actor_compactrange.cpp +++ b/cloud/blockstore/libs/storage/partition/part_actor_compactrange.cpp @@ -93,7 +93,9 @@ void TForcedCompactionActor::SendCompactionRequest(const TActorContext& ctx) auto request = std::make_unique( MakeIntrusive(), RangesToCompact[CurrentBlock], - true); + TCompactionOptions(). + set(ToBit(ECompactionOption::Forced)). + set(ToBit(ECompactionOption::Full))); NCloud::Send(ctx, Tablet, std::move(request)); } diff --git a/cloud/blockstore/libs/storage/partition/part_events_private.h b/cloud/blockstore/libs/storage/partition/part_events_private.h index d6af907df2e..29652a9a112 100644 --- a/cloud/blockstore/libs/storage/partition/part_events_private.h +++ b/cloud/blockstore/libs/storage/partition/part_events_private.h @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include #include #include @@ -441,14 +443,19 @@ struct TEvPartitionPrivate { ECompactionMode Mode = RangeCompaction; TMaybe BlockIndex; - bool ForceFullCompaction = false; + TCompactionOptions CompactionOptions; TCompactionRequest() = default; - TCompactionRequest(ui32 blockIndex, bool forceFullCompaction) - : Mode(RangeCompaction) - , BlockIndex(blockIndex) - , ForceFullCompaction(forceFullCompaction) + TCompactionRequest( + ui32 blockIndex, + TCompactionOptions compactionOptions) + : BlockIndex(blockIndex) + , CompactionOptions(compactionOptions) + {} + + TCompactionRequest(ui32 blockIndex) + : TCompactionRequest(blockIndex, {}) {} TCompactionRequest(ECompactionMode mode) @@ -773,6 +780,17 @@ struct TEvPartitionPrivate { }; + + // + // CompactionCompleted + // + + struct TCompactionCompleted + : TOperationCompleted + { + ECompactionType CompactionType; + }; + // // MetadataRebuildCompleted // @@ -880,7 +898,7 @@ struct TEvPartitionPrivate using TEvWriteBlocksCompleted = TResponseEvent; using TEvZeroBlocksCompleted = TResponseEvent; using TEvFlushCompleted = TResponseEvent; - using TEvCompactionCompleted = TResponseEvent; + using TEvCompactionCompleted = TResponseEvent; using TEvCollectGarbageCompleted = TResponseEvent; using TEvForcedCompactionCompleted = TResponseEvent; using TEvMetadataRebuildCompleted = TResponseEvent; diff --git a/cloud/blockstore/libs/storage/partition/part_state.cpp b/cloud/blockstore/libs/storage/partition/part_state.cpp index 9c0c7c2ce5c..f8037c7c322 100644 --- a/cloud/blockstore/libs/storage/partition/part_state.cpp +++ b/cloud/blockstore/libs/storage/partition/part_state.cpp @@ -1008,6 +1008,16 @@ ui64 TPartitionState::GetMixedIndexCacheMemSize() const return MixedIndexCacheAllocator.GetBytesAllocated(); } +//////////////////////////////////////////////////////////////////////////////// +// Compaction + +TOperationState& TPartitionState::GetCompactionState(ECompactionType type) +{ + return type == ECompactionType::Forced ? + ForcedCompactionState.State : + CompactionState; +} + //////////////////////////////////////////////////////////////////////////////// void TPartitionState::SetUsedBlocks( diff --git a/cloud/blockstore/libs/storage/partition/part_state.h b/cloud/blockstore/libs/storage/partition/part_state.h index 6e3c93b25e8..20575413688 100644 --- a/cloud/blockstore/libs/storage/partition/part_state.h +++ b/cloud/blockstore/libs/storage/partition/part_state.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -78,6 +79,7 @@ struct TForcedCompactionState ui32 Progress = 0; ui32 RangesCount = 0; TString OperationId; + TOperationState State; }; //////////////////////////////////////////////////////////////////////////////// @@ -800,10 +802,7 @@ class TPartitionState TInstant LastCompactionRangeCountPerRunTs; public: - TOperationState& GetCompactionState() - { - return CompactionState; - } + TOperationState& GetCompactionState(ECompactionType type); TCompactionMap& GetCompactionMap() { diff --git a/cloud/blockstore/libs/storage/partition/part_tx.h b/cloud/blockstore/libs/storage/partition/part_tx.h index 2c65e545da2..68fea44117b 100644 --- a/cloud/blockstore/libs/storage/partition/part_tx.h +++ b/cloud/blockstore/libs/storage/partition/part_tx.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -430,18 +431,18 @@ struct TTxPartition { const TRequestInfoPtr RequestInfo; const ui64 CommitId; - const bool ForceFullCompaction; + const TCompactionOptions CompactionOptions; TVector RangeCompactions; TCompaction( TRequestInfoPtr requestInfo, ui64 commitId, - bool forceFullCompaction, + TCompactionOptions compactionOptions, const TVector>& ranges) : RequestInfo(std::move(requestInfo)) , CommitId(commitId) - , ForceFullCompaction(forceFullCompaction) + , CompactionOptions(compactionOptions) { RangeCompactions.reserve(ranges.size()); for (const auto& range: ranges) { diff --git a/cloud/blockstore/libs/storage/partition/part_ut.cpp b/cloud/blockstore/libs/storage/partition/part_ut.cpp index 545a24aa774..ee4a63c6aab 100644 --- a/cloud/blockstore/libs/storage/partition/part_ut.cpp +++ b/cloud/blockstore/libs/storage/partition/part_ut.cpp @@ -15,11 +15,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include #include @@ -728,14 +730,10 @@ class TPartitionClient return std::make_unique(); } - std::unique_ptr CreateCompactionRequest() + template + std::unique_ptr CreateCompactionRequest(TArgs&&... args) { - return std::make_unique(); - } - - std::unique_ptr CreateCompactionRequest(ui32 blockIndex, bool forceFullCompaction = false) - { - return std::make_unique(blockIndex, forceFullCompaction); + return std::make_unique(std::forward(args)...); } std::unique_ptr CreateMetadataRebuildBlockCountRequest( @@ -4069,19 +4067,19 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } } - Y_UNIT_TEST(ShouldCreateBlobsForEveryWrittenRangeDuringForcedCompaction) + void DoTestFullCompaction(bool forced) { constexpr ui32 rangesCount = 5; - auto runtime = PrepareTestActorRuntime(DefaultConfig(), rangesCount * 1024); + auto storageConfig = DefaultConfig(); + storageConfig.SetWriteBlobThreshold(1_MB); + auto runtime = PrepareTestActorRuntime(storageConfig, rangesCount * 1024); TPartitionClient partition(*runtime); partition.WaitReady(); for (ui32 range = 0; range < rangesCount; ++range) { partition.WriteBlocks( - TBlockRange32::MakeClosedInterval( - range * 1024 + 100, - range * 1024 + 1000), + TBlockRange32::WithLength(range * 1024, 1024), 1); } partition.Flush(); @@ -4089,8 +4087,13 @@ Y_UNIT_TEST_SUITE(TPartitionTest) auto response = partition.StatPartition(); auto oldStats = response->Record.GetStats(); + TCompactionOptions options; + options.set(ToBit(ECompactionOption::Full)); + if (forced) { + options.set(ToBit(ECompactionOption::Forced)); + } for (ui32 range = 0; range < rangesCount; ++range) { - partition.Compaction(range * 1024); + partition.Compaction(range * 1024, options); } response = partition.StatPartition(); @@ -4101,11 +4104,23 @@ Y_UNIT_TEST_SUITE(TPartitionTest) ); } - Y_UNIT_TEST(ShouldNotCreateBlobsForEmptyRangesDuringForcedCompaction) + Y_UNIT_TEST(ShouldCreateBlobsForEveryWrittenRangeDuringFullCompaction) + { + DoTestFullCompaction(false); + } + + Y_UNIT_TEST(ShouldCreateBlobsForEveryWrittenRangeDuringForcedFullCompaction) + { + DoTestFullCompaction(true); + } + + void DoTestEmptyRangesFullCompaction(bool forced) { constexpr ui32 rangesCount = 5; constexpr ui32 emptyRange = 2; - auto runtime = PrepareTestActorRuntime(DefaultConfig(), rangesCount * 1024); + auto storageConfig = DefaultConfig(); + storageConfig.SetWriteBlobThreshold(1_MB); + auto runtime = PrepareTestActorRuntime(storageConfig, rangesCount * 1024); TPartitionClient partition(*runtime); partition.WaitReady(); @@ -4113,9 +4128,7 @@ Y_UNIT_TEST_SUITE(TPartitionTest) for (ui32 range = 0; range < rangesCount; ++range) { if (range != emptyRange) { partition.WriteBlocks( - TBlockRange32::MakeClosedInterval( - range * 1024 + 100, - range * 1024 + 1000), + TBlockRange32::WithLength(range * 1024, 900), 1); } } @@ -4124,8 +4137,14 @@ Y_UNIT_TEST_SUITE(TPartitionTest) auto response = partition.StatPartition(); auto oldStats = response->Record.GetStats(); + TCompactionOptions options; + options.set(ToBit(ECompactionOption::Full)); + if (forced) { + options.set(ToBit(ECompactionOption::Forced)); + } + for (ui32 range = 0; range < rangesCount; ++range) { - partition.Compaction(range * 1024); + partition.Compaction(range * 1024, options); } response = partition.StatPartition(); @@ -4136,6 +4155,16 @@ Y_UNIT_TEST_SUITE(TPartitionTest) ); } + Y_UNIT_TEST(ShouldNotCreateBlobsForEmptyRangesDuringFullCompaction) + { + DoTestEmptyRangesFullCompaction(false); + } + + Y_UNIT_TEST(ShouldNotCreateBlobsForEmptyRangesDuringForcedFullCompaction) + { + DoTestEmptyRangesFullCompaction(true); + } + Y_UNIT_TEST(ShouldCorrectlyMarkFirstBlockInBlobIfItIsTheSameAsLastBlockInPreviousBlob) { auto config = DefaultConfig(); @@ -5066,7 +5095,8 @@ Y_UNIT_TEST_SUITE(TPartitionTest) event->Get(); UNIT_ASSERT_VALUES_EQUAL( incrementalCompactionExpected, - !request->ForceFullCompaction + !request->CompactionOptions.test( + ToBit(ECompactionOption::Full)) ); compactionRequest.reset(event.Release()); @@ -7203,13 +7233,19 @@ Y_UNIT_TEST_SUITE(TPartitionTest) for (ui32 i = 0; i < 4; ++i) { partition.WriteBlocks(0, 100); - auto compResponse = partition.CompactRange(0, 100); - op.push_back(compResponse->Record.GetOperationId()); - } + partition.SendCompactRangeRequest(0, 100); - for (ui32 i = 0; i < 4; ++i) - { - partition.GetCompactionStatus(op[i]); + { + TDispatchOptions options; + options.FinalEvents.emplace_back( + TEvPartitionPrivate::EvForcedCompactionCompleted, + 1); + runtime->DispatchEvents(options); + } + + auto response = partition.RecvCompactRangeResponse(); + UNIT_ASSERT_VALUES_EQUAL(S_OK, response->GetStatus()); + op.push_back(response->Record.GetOperationId()); } runtime->AdvanceCurrentTime(CompactOpHistoryDuration + TDuration::Seconds(1)); @@ -11214,6 +11250,88 @@ Y_UNIT_TEST_SUITE(TPartitionTest) } UNIT_ASSERT_VALUES_EQUAL(1, failedReadBlob); } + + Y_UNIT_TEST(ShouldAllowForcedCompactionRequestsInPresenseOfTabletCompaction) + { + constexpr ui32 rangesCount = 5; + auto runtime = PrepareTestActorRuntime(DefaultConfig(), rangesCount * 1024); + + TPartitionClient partition(*runtime); + partition.WaitReady(); + + for (ui32 range = 0; range < rangesCount; ++range) { + partition.WriteBlocks( + TBlockRange32::WithLength(range * 1024, 1024), + 1); + } + partition.Flush(); + + bool steal = true; + runtime->SetEventFilter([&] + (TTestActorRuntimeBase& runtime, TAutoPtr& ev) + { + Y_UNUSED(runtime); + + if (ev->GetTypeRewrite() == TEvPartitionPrivate::EvCompactionCompleted && + steal) + { + steal = false; + return true; + } + return false; + }); + + partition.SendCompactionRequest( + 0, + TCompactionOptions()); + partition.Compaction( + 0, + TCompactionOptions(). + set(ToBit(ECompactionOption::Forced))); + } + + Y_UNIT_TEST(ShouldAllowOnlyOneForcedCompactionRequestAtATime) + { + constexpr ui32 rangesCount = 5; + auto runtime = PrepareTestActorRuntime(DefaultConfig(), rangesCount * 1024); + + TPartitionClient partition(*runtime); + partition.WaitReady(); + + for (ui32 range = 0; range < rangesCount; ++range) { + partition.WriteBlocks( + TBlockRange32::WithLength(range * 1024, 1024), + 1); + } + partition.Flush(); + + bool steal = true; + runtime->SetEventFilter([&] + (TTestActorRuntimeBase& runtime, TAutoPtr& ev) + { + Y_UNUSED(runtime); + + if (ev->GetTypeRewrite() == TEvPartitionPrivate::EvCompactionCompleted && + steal) + { + steal = false; + return true; + } + return false; + }); + + partition.SendCompactionRequest( + 0, + TCompactionOptions(). + set(ToBit(ECompactionOption::Forced))); + partition.SendCompactionRequest( + 0, + TCompactionOptions(). + set(ToBit(ECompactionOption::Forced))); + + auto response = partition.RecvCompactionResponse(); + UNIT_ASSERT_VALUES_EQUAL(E_TRY_AGAIN, response->GetStatus()); + } } } // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/partition2/part2_actor_compaction.cpp b/cloud/blockstore/libs/storage/partition2/part2_actor_compaction.cpp index d6a9a01fc6c..429a92cd5a0 100644 --- a/cloud/blockstore/libs/storage/partition2/part2_actor_compaction.cpp +++ b/cloud/blockstore/libs/storage/partition2/part2_actor_compaction.cpp @@ -73,6 +73,7 @@ class TCompactionActor final const ui64 CommitId; const TBlockRange32 BlockRange; const TDuration ReadBlobTimeout; + const ECompactionType CompactionType; TGarbageInfo GarbageInfo; TAffectedBlobInfos AffectedBlobInfos; ui32 BlobsSkipped; @@ -109,6 +110,7 @@ class TCompactionActor final ui64 commitId, const TBlockRange32& blockRange, TDuration readBlobTimeout, + ECompactionType compactionType, TGarbageInfo garbageInfo, TAffectedBlobInfos affectedBlobInfos, ui32 blobsSkipped, @@ -165,6 +167,7 @@ TCompactionActor::TCompactionActor( ui64 commitId, const TBlockRange32& blockRange, TDuration readBlobTimeout, + ECompactionType compactionType, TGarbageInfo garbageInfo, TAffectedBlobInfos affectedBlobInfos, ui32 blobsSkipped, @@ -179,6 +182,7 @@ TCompactionActor::TCompactionActor( , CommitId(commitId) , BlockRange(blockRange) , ReadBlobTimeout(readBlobTimeout) + , CompactionType(compactionType) , GarbageInfo(std::move(garbageInfo)) , AffectedBlobInfos(std::move(affectedBlobInfos)) , BlobsSkipped(blobsSkipped) @@ -420,6 +424,7 @@ void TCompactionActor::NotifyCompleted( request->AffectedRanges = std::move(AffectedRanges); request->AffectedBlockInfos = std::move(AffectedBlockInfos); request->BlockCommitIds = std::move(BlockCommitIds); + request->CompactionType = CompactionType; NCloud::Send(ctx, Tablet, std::move(request)); } @@ -628,7 +633,9 @@ class TCompactionVisitor final void TPartitionActor::EnqueueCompactionIfNeeded(const TActorContext& ctx) { - if (State->GetCompactionStatus() != EOperationStatus::Idle) { + if (State->GetCompactionStatus(ECompactionType::Tablet) != + EOperationStatus::Idle) + { // compaction already enqueued return; } @@ -685,7 +692,6 @@ void TPartitionActor::EnqueueCompactionIfNeeded(const TActorContext& ctx) return; } - TEvPartitionPrivate::ECompactionMode compactionMode; if (rangeScore > 0) { compactionMode = TEvPartitionPrivate::RangeCompaction; @@ -696,7 +702,9 @@ void TPartitionActor::EnqueueCompactionIfNeeded(const TActorContext& ctx) return; } - State->SetCompactionStatus(EOperationStatus::Enqueued); + State->SetCompactionStatus( + ECompactionType::Tablet, + EOperationStatus::Enqueued); auto request = std::make_unique( MakeIntrusive(CreateRequestId()), @@ -747,13 +755,18 @@ void TPartitionActor::HandleCompaction( NCloud::Reply(ctx, requestInfo, std::move(response)); }; - if (State->GetCompactionStatus() == EOperationStatus::Started) { - replyError(ctx, *requestInfo, E_TRY_AGAIN, "compaction already in progress"); + const auto compactionType = + msg->CompactionOptions.test(ToBit(ECompactionOption::Forced)) ? + ECompactionType::Forced: + ECompactionType::Tablet; + + if (State->GetCompactionStatus(compactionType) == EOperationStatus::Started) { + replyError(ctx, *requestInfo, E_TRY_AGAIN, "compaction already started"); return; } if (!State->IsCompactionAllowed()) { - State->SetCompactionStatus(EOperationStatus::Idle); + State->SetCompactionStatus(compactionType, EOperationStatus::Idle); replyError(ctx, *requestInfo, E_BS_OUT_OF_SPACE, "all channels readonly"); return; @@ -788,7 +801,7 @@ void TPartitionActor::HandleCompaction( } if (!rangeStat.BlobCount && !garbageInfo.BlobCounters) { - State->SetCompactionStatus(EOperationStatus::Idle); + State->SetCompactionStatus(compactionType, EOperationStatus::Idle); replyError(ctx, *requestInfo, S_ALREADY, "nothing to compact"); return; @@ -817,14 +830,13 @@ void TPartitionActor::HandleCompaction( rangeStat.ReadRequestBlobCount, rangeStat.ReadRequestBlockCount, rangeStat.CompactionScore.Score, - msg->ForceFullCompaction + msg->CompactionOptions.test(ToBit(ECompactionOption::Forced)) ); tx = CreateTx( requestInfo, blockRange, - msg->ForceFullCompaction - ); + msg->CompactionOptions); } else { LOG_DEBUG(ctx, TBlockStoreComponents::PARTITION, "[%lu] Start compaction (blobs: %s)", @@ -834,7 +846,8 @@ void TPartitionActor::HandleCompaction( tx = CreateTx(requestInfo, std::move(garbageInfo)); } - State->SetCompactionStatus(EOperationStatus::Started); + + State->SetCompactionStatus(compactionType, EOperationStatus::Started); AddTransaction(*requestInfo); @@ -868,7 +881,8 @@ void TPartitionActor::HandleCompactionCompleted( PartCounters->RequestCounters.Compaction.AddRequest(d.MicroSeconds()); State->ReleaseCollectBarrier(msg->CommitId); - State->SetCompactionStatus(EOperationStatus::Idle); + + State->SetCompactionStatus(msg->CompactionType, EOperationStatus::Idle); Actors.erase(ev->Sender); @@ -955,7 +969,7 @@ bool TPartitionActor::PrepareCompaction( visitor.Finish(); - if (!args.ForceFullCompaction) { + if (!args.CompactionOptions.test(ToBit(ECompactionOption::Full))) { Sort( args.Blobs.begin(), args.Blobs.end(), @@ -1177,6 +1191,12 @@ void TPartitionActor::CompleteCompaction( Config->GetBlobStorageAsyncGetTimeoutSSD() : Config->GetBlobStorageAsyncGetTimeoutHDD(); + + const auto compactionType = + args.CompactionOptions.test(ToBit(ECompactionOption::Forced)) ? + ECompactionType::Forced: + ECompactionType::Tablet; + auto actor = NCloud::Register( ctx, args.RequestInfo, @@ -1187,6 +1207,7 @@ void TPartitionActor::CompleteCompaction( args.CommitId, args.BlockRange, readBlobTimeout, + compactionType, std::move(args.GarbageInfo), std::move(args.AffectedBlobInfos), args.BlobsSkipped, diff --git a/cloud/blockstore/libs/storage/partition2/part2_actor_compactrange.cpp b/cloud/blockstore/libs/storage/partition2/part2_actor_compactrange.cpp index 8b9e82bfab2..74f30cb89a7 100644 --- a/cloud/blockstore/libs/storage/partition2/part2_actor_compactrange.cpp +++ b/cloud/blockstore/libs/storage/partition2/part2_actor_compactrange.cpp @@ -93,8 +93,9 @@ void TForcedCompactionActor::SendCompactionRequest(const TActorContext& ctx) auto request = std::make_unique( MakeIntrusive(), RangesToCompact[CurrentBlock], - true // forceFullCompaction - ); + TCompactionOptions(). + set(ToBit(ECompactionOption::Full)). + set(ToBit(ECompactionOption::Forced))); NCloud::Send(ctx, Tablet, std::move(request)); } diff --git a/cloud/blockstore/libs/storage/partition2/part2_events_private.h b/cloud/blockstore/libs/storage/partition2/part2_events_private.h index bf35df53dae..76ba7e7fe9d 100644 --- a/cloud/blockstore/libs/storage/partition2/part2_events_private.h +++ b/cloud/blockstore/libs/storage/partition2/part2_events_private.h @@ -7,6 +7,8 @@ #include #include #include +#include +#include #include #include #include @@ -295,20 +297,26 @@ struct TEvPartitionPrivate { ECompactionMode Mode = RangeCompaction; TMaybe BlockIndex; - bool ForceFullCompaction = false; + TCompactionOptions CompactionOptions; TGarbageInfo GarbageInfo; TCompactionRequest() = default; + TCompactionRequest( + ui32 blockIndex, + TCompactionOptions compactionOptions) + : BlockIndex(blockIndex) + , CompactionOptions(compactionOptions) + {} + + TCompactionRequest(ui32 blockIndex) + : TCompactionRequest(blockIndex, {}) + {} + TCompactionRequest(ECompactionMode mode) : Mode(mode) {} - TCompactionRequest(ui32 blockIndex, bool forceFullCompaction) - : Mode(RangeCompaction) - , BlockIndex(blockIndex) - , ForceFullCompaction(forceFullCompaction) - {} TCompactionRequest(TGarbageInfo garbageInfo) : Mode(GarbageCompaction) @@ -516,6 +524,16 @@ struct TEvPartitionPrivate { }; + // + // CompactionCompleted + // + + struct TCompactionCompleted + : TOperationCompleted + { + ECompactionType CompactionType; + }; + // // ForcedCleanupCompleted // @@ -571,7 +589,7 @@ struct TEvPartitionPrivate using TEvWriteBlocksCompleted = TResponseEvent; using TEvZeroBlocksCompleted = TResponseEvent; using TEvFlushCompleted = TResponseEvent; - using TEvCompactionCompleted = TResponseEvent; + using TEvCompactionCompleted = TResponseEvent; using TEvCollectGarbageCompleted = TResponseEvent; using TEvForcedCompactionCompleted = TResponseEvent; using TEvForcedCleanupCompleted = TResponseEvent; diff --git a/cloud/blockstore/libs/storage/partition2/part2_state.cpp b/cloud/blockstore/libs/storage/partition2/part2_state.cpp index f7723dfdd61..f5a82ab40f1 100644 --- a/cloud/blockstore/libs/storage/partition2/part2_state.cpp +++ b/cloud/blockstore/libs/storage/partition2/part2_state.cpp @@ -1638,6 +1638,28 @@ void TPartitionState::ResetCompactionMap( flush(); } +//////////////////////////////////////////////////////////////////////////////// +// Compaction + +EOperationStatus TPartitionState::GetCompactionStatus( + ECompactionType type) const +{ + const auto& state = type == ECompactionType::Forced ? + ForcedCompactionState.State : + CompactionState; + return state.GetStatus(); +} + +void TPartitionState::SetCompactionStatus( + ECompactionType type, + EOperationStatus status) +{ + auto& state = type == ECompactionType::Forced ? + ForcedCompactionState.State : + CompactionState; + state.SetStatus(status); +} + //////////////////////////////////////////////////////////////////////////////// // Stats diff --git a/cloud/blockstore/libs/storage/partition2/part2_state.h b/cloud/blockstore/libs/storage/partition2/part2_state.h index e024c195209..48679aeb148 100644 --- a/cloud/blockstore/libs/storage/partition2/part2_state.h +++ b/cloud/blockstore/libs/storage/partition2/part2_state.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -102,6 +103,7 @@ struct TForcedCompactionState ui32 Progress = 0; ui32 RangeCount = 0; TString OperationId; + TOperationState State; }; //////////////////////////////////////////////////////////////////////////////// @@ -898,15 +900,8 @@ class TPartitionState TCompactionMap CompactionMap; public: - EOperationStatus GetCompactionStatus() const - { - return CompactionState.GetStatus(); - } - - void SetCompactionStatus(EOperationStatus status) - { - CompactionState.SetStatus(status); - } + EOperationStatus GetCompactionStatus(ECompactionType type) const; + void SetCompactionStatus(ECompactionType type, EOperationStatus status); TCompactionMap& GetCompactionMap() { diff --git a/cloud/blockstore/libs/storage/partition2/part2_tx.h b/cloud/blockstore/libs/storage/partition2/part2_tx.h index e44fdd48e8b..0c4a6111710 100644 --- a/cloud/blockstore/libs/storage/partition2/part2_tx.h +++ b/cloud/blockstore/libs/storage/partition2/part2_tx.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -242,7 +243,7 @@ struct TTxPartition { const TRequestInfoPtr RequestInfo; const TBlockRange32 BlockRange; - const bool ForceFullCompaction; + const TCompactionOptions CompactionOptions; TGarbageInfo GarbageInfo; ui64 CommitId = 0; @@ -254,17 +255,16 @@ struct TTxPartition TCompaction( TRequestInfoPtr requestInfo, const TBlockRange32& blockRange, - bool forceFullCompaction) + TCompactionOptions compactionOptions) : RequestInfo(std::move(requestInfo)) , BlockRange(blockRange) - , ForceFullCompaction(forceFullCompaction) + , CompactionOptions(compactionOptions) {} TCompaction( TRequestInfoPtr requestInfo, TGarbageInfo garbageInfo) : RequestInfo(std::move(requestInfo)) - , ForceFullCompaction(false) , GarbageInfo(std::move(garbageInfo)) {} diff --git a/cloud/blockstore/libs/storage/partition2/part2_ut.cpp b/cloud/blockstore/libs/storage/partition2/part2_ut.cpp index c5b29ccab4e..3e631ab56d7 100644 --- a/cloud/blockstore/libs/storage/partition2/part2_ut.cpp +++ b/cloud/blockstore/libs/storage/partition2/part2_ut.cpp @@ -674,9 +674,10 @@ class TPartitionClient return std::make_unique(); } - std::unique_ptr CreateCompactionRequest() + template + std::unique_ptr CreateCompactionRequest(TArgs&&... args) { - return std::make_unique(); + return std::make_unique(std::forward(args)...); } std::unique_ptr CreateCompactionRequest( @@ -691,16 +692,6 @@ class TPartitionClient return request; } - std::unique_ptr CreateCompactionRequest( - ui32 blockIndex, - bool forceFullCompaction = false) - { - return std::make_unique( - blockIndex, - forceFullCompaction - ); - } - std::unique_ptr CreateCleanupRequest( TEvPartitionPrivate::ECleanupMode mode = TEvPartitionPrivate::DirtyBlobCleanup) { @@ -4997,13 +4988,19 @@ Y_UNIT_TEST_SUITE(TPartition2Test) for (ui32 i = 0; i < 4; ++i) { partition.WriteBlocks(0, 100); - auto compResponse = partition.CompactRange(0, 100); - op.push_back(compResponse->Record.GetOperationId()); - } + partition.SendCompactRangeRequest(0, 100); - for (ui32 i = 0; i < 4; ++i) - { - partition.GetCompactionStatus(op[i]); + { + TDispatchOptions options; + options.FinalEvents.emplace_back( + TEvPartitionPrivate::EvForcedCompactionCompleted, + 1); + runtime->DispatchEvents(options); + } + + auto response = partition.RecvCompactRangeResponse(); + UNIT_ASSERT_VALUES_EQUAL(S_OK, response->GetStatus()); + op.push_back(response->Record.GetOperationId()); } runtime->AdvanceCurrentTime(CompactOpHistoryDuration + TDuration::Seconds(1)); @@ -7084,6 +7081,88 @@ Y_UNIT_TEST_SUITE(TPartition2Test) } UNIT_ASSERT_VALUES_EQUAL(1, failedReadBlob); } + + Y_UNIT_TEST(ShouldAllowForcedCompactionRequestsInPresenseOfTabletCompaction) + { + constexpr ui32 rangesCount = 5; + auto runtime = PrepareTestActorRuntime(DefaultConfig(), rangesCount * 1024); + + TPartitionClient partition(*runtime); + partition.WaitReady(); + + for (ui32 range = 0; range < rangesCount; ++range) { + partition.WriteBlocks( + TBlockRange32::WithLength(range * 1024, 1024), + 1); + } + partition.Flush(); + + bool steal = true; + runtime->SetEventFilter([&] + (TTestActorRuntimeBase& runtime, TAutoPtr& ev) + { + Y_UNUSED(runtime); + + if (ev->GetTypeRewrite() == TEvPartitionPrivate::EvCompactionCompleted && + steal) + { + steal = false; + return true; + } + return false; + }); + + partition.SendCompactionRequest( + 0, + TCompactionOptions()); + partition.Compaction( + 0, + TCompactionOptions(). + set(ToBit(ECompactionOption::Forced))); + } + + Y_UNIT_TEST(ShouldAllowOnlyOneForcedCompactionRequestAtATime) + { + constexpr ui32 rangesCount = 5; + auto runtime = PrepareTestActorRuntime(DefaultConfig(), rangesCount * 1024); + + TPartitionClient partition(*runtime); + partition.WaitReady(); + + for (ui32 range = 0; range < rangesCount; ++range) { + partition.WriteBlocks( + TBlockRange32::WithLength(range * 1024, 1024), + 1); + } + partition.Flush(); + + bool steal = true; + runtime->SetEventFilter([&] + (TTestActorRuntimeBase& runtime, TAutoPtr& ev) + { + Y_UNUSED(runtime); + + if (ev->GetTypeRewrite() == TEvPartitionPrivate::EvCompactionCompleted && + steal) + { + steal = false; + return true; + } + return false; + }); + + partition.SendCompactionRequest( + 0, + TCompactionOptions(). + set(ToBit(ECompactionOption::Forced))); + partition.SendCompactionRequest( + 0, + TCompactionOptions(). + set(ToBit(ECompactionOption::Forced))); + + auto response = partition.RecvCompactionResponse(); + UNIT_ASSERT_VALUES_EQUAL(E_TRY_AGAIN, response->GetStatus()); + } } } // namespace NCloud::NBlockStore::NStorage::NPartition2