Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow forced compaction generated by external script (blockstore-pcompact-tablets) to advance in presense of heavy tablet compaction #2527

Merged
merged 21 commits into from
Dec 10, 2024
Merged
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/compaction_options.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "compaction_options.h"
26 changes: 26 additions & 0 deletions cloud/blockstore/libs/storage/core/compaction_options.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include <util/system/types.h>

#include <bitset>

namespace NCloud::NBlockStore::NStorage {

////////////////////////////////////////////////////////////////////////////////

enum class ECompactionOption: size_t
{
Full, // non-incremental compaction
Forced, // compaction initiated externally
MaxFieldNumber,
};

constexpr size_t ToBit(ECompactionOption option)
{
return static_cast<size_t>(option);
}

using TCompactionOptions =
std::bitset<ToBit(ECompactionOption::MaxFieldNumber)>;

} // namespace NCloud::NBlockStore::NStorage
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/compaction_type.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "compaction_type.h"
15 changes: 15 additions & 0 deletions cloud/blockstore/libs/storage/core/compaction_type.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

#include <util/system/types.h>

namespace NCloud::NBlockStore::NStorage {

////////////////////////////////////////////////////////////////////////////////

enum class ECompactionType: ui32
{
Forced, // compaction initiated externally
Tablet, // compaction initiated by tablet
};

} // namespace NCloud::NBlockStore::NStorage
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/storage/core/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ GENERATE_ENUM_SERIALIZATION(mount_token.h)
SRCS(
block_handler.cpp
compaction_map.cpp
compaction_options.cpp
compaction_policy.cpp
compaction_type.cpp
config.cpp
disk_counters.cpp
disk_validation.cpp
Expand Down
51 changes: 38 additions & 13 deletions cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class TCompactionActor final
const ui32 MaxAffectedBlocksPerCompaction;
const IBlockDigestGeneratorPtr BlockDigestGenerator;
const TDuration ReadBlobTimeout;
const ECompactionType CompactionType;

const ui64 CommitId;

Expand Down Expand Up @@ -181,6 +182,7 @@ class TCompactionActor final
ui32 maxAffectedBlocksPerCompaction,
IBlockDigestGeneratorPtr blockDigestGenerator,
TDuration readBlobTimeout,
ECompactionType compactionType,
ui64 commitId,
TVector<TRangeCompactionInfo> rangeCompactionInfos,
TVector<TRequest> requests);
Expand Down Expand Up @@ -242,6 +244,7 @@ TCompactionActor::TCompactionActor(
ui32 maxAffectedBlocksPerCompaction,
IBlockDigestGeneratorPtr blockDigestGenerator,
TDuration readBlobTimeout,
ECompactionType compactionType,
ui64 commitId,
TVector<TRangeCompactionInfo> rangeCompactionInfos,
TVector<TRequest> requests)
Expand All @@ -253,6 +256,7 @@ TCompactionActor::TCompactionActor(
, MaxAffectedBlocksPerCompaction(maxAffectedBlocksPerCompaction)
, BlockDigestGenerator(std::move(blockDigestGenerator))
, ReadBlobTimeout(readBlobTimeout)
, CompactionType(compactionType)
, CommitId(commitId)
, RangeCompactionInfos(std::move(rangeCompactionInfos))
, Requests(std::move(requests))
Expand Down Expand Up @@ -736,6 +740,7 @@ void TCompactionActor::NotifyCompleted(
request->AffectedRanges.push_back(ConvertRangeSafe(rc.BlockRange));
}
request->AffectedBlockInfos = std::move(AffectedBlockInfos);
request->CompactionType = CompactionType;

NCloud::Send(ctx, Tablet, std::move(request));
}
Expand Down Expand Up @@ -1029,7 +1034,9 @@ void TPartitionActor::ChangeRangeCountPerRunIfNeeded(

void TPartitionActor::EnqueueCompactionIfNeeded(const TActorContext& ctx)
{
if (State->GetCompactionState().Status != EOperationStatus::Idle) {
if (State->GetCompactionState(ECompactionType::Tablet).Status !=
EOperationStatus::Idle)
{
// already enqueued
return;
}
Expand Down Expand Up @@ -1125,7 +1132,8 @@ void TPartitionActor::EnqueueCompactionIfNeeded(const TActorContext& ctx)
throttlingAllowed = false;
}

State->GetCompactionState().SetStatus(EOperationStatus::Enqueued);
State->GetCompactionState(ECompactionType::Tablet).SetStatus(
EOperationStatus::Enqueued);

if (Config->GetCompactionCountPerRunIncreasingThreshold()
&& Config->GetCompactionCountPerRunDecreasingThreshold()
Expand Down Expand Up @@ -1176,7 +1184,7 @@ void TPartitionActor::EnqueueCompactionIfNeeded(const TActorContext& ctx)
if (mode == TEvPartitionPrivate::GarbageCompaction
|| !diskGarbageBelowThreshold)
{
request->ForceFullCompaction = true;
request->CompactionOptions.set(ToBit(ECompactionOption::Full));
}

if (throttlingAllowed && Config->GetMaxCompactionDelay()) {
Expand Down Expand Up @@ -1245,13 +1253,21 @@ void TPartitionActor::HandleCompaction(
NCloud::Reply(ctx, requestInfo, std::move(response));
};

if (State->GetCompactionState().Status == EOperationStatus::Started) {
const auto compactionType =
msg->CompactionOptions.test(ToBit(ECompactionOption::Forced)) ?
ECompactionType::Forced:
ECompactionType::Tablet;

if (State->GetCompactionState(compactionType).Status ==
EOperationStatus::Started)
{
replyError(ctx, *requestInfo, E_TRY_AGAIN, "compaction already started");
return;
}

if (!State->IsCompactionAllowed()) {
State->GetCompactionState().SetStatus(EOperationStatus::Idle);
State->GetCompactionState(compactionType).SetStatus(
EOperationStatus::Idle);

replyError(ctx, *requestInfo, E_BS_OUT_OF_SPACE, "all channels readonly");
return;
Expand Down Expand Up @@ -1287,7 +1303,8 @@ void TPartitionActor::HandleCompaction(
}

if (tops.empty() || !tops.front().Stat.BlobCount) {
State->GetCompactionState().SetStatus(EOperationStatus::Idle);
State->GetCompactionState(compactionType).SetStatus(
EOperationStatus::Idle);

replyError(ctx, *requestInfo, S_ALREADY, "nothing to compact");
return;
Expand All @@ -1310,9 +1327,10 @@ void TPartitionActor::HandleCompaction(
State->GetBlocksCount() - 1);

LOG_DEBUG(ctx, TBlockStoreComponents::PARTITION,
"[%lu] Start compaction @%lu (range: %s, blobs: %u, blocks: %u"
"[%lu] Start %s compaction @%lu (range: %s, blobs: %u, blocks: %u"
", reads: %u, blobsread: %u, blocksread: %u, score: %f)",
TabletID(),
compactionType == ECompactionType::Forced ? "forced" : "tablet",
commitId,
DescribeRange(blockRange).c_str(),
x.Stat.BlobCount,
Expand All @@ -1325,7 +1343,7 @@ void TPartitionActor::HandleCompaction(
ranges.emplace_back(rangeIdx, blockRange);
}

State->GetCompactionState().SetStatus(EOperationStatus::Started);
State->GetCompactionState(compactionType).SetStatus(EOperationStatus::Started);

State->GetCommitQueue().AcquireBarrier(commitId);
State->GetCleanupQueue().AcquireBarrier(commitId);
Expand All @@ -1336,7 +1354,7 @@ void TPartitionActor::HandleCompaction(
auto tx = CreateTx<TCompaction>(
requestInfo,
commitId,
msg->ForceFullCompaction,
msg->CompactionOptions,
std::move(ranges));

ui64 minCommitId = State->GetCommitQueue().GetMinCommitId();
Expand Down Expand Up @@ -1401,7 +1419,8 @@ void TPartitionActor::HandleCompactionCompleted(
State->GetCleanupQueue().ReleaseBarrier(commitId);
State->GetGarbageQueue().ReleaseBarrier(commitId);

State->GetCompactionState().SetStatus(EOperationStatus::Idle);
State->GetCompactionState(msg->CompactionType).SetStatus(
EOperationStatus::Idle);

Actors.Erase(ev->Sender);

Expand Down Expand Up @@ -1458,7 +1477,7 @@ void PrepareRangeCompaction(
const TString& folderId,
const TString& diskId,
const ui64 commitId,
const bool forceFullCompaction,
const bool fullCompaction,
const TActorContext& ctx,
const ui64 tabletId,
THashSet<TPartialBlobId, TPartialBlobIdHash>& affectedBlobIds,
Expand Down Expand Up @@ -1503,7 +1522,7 @@ void PrepareRangeCompaction(

if (ready
&& incrementalCompactionEnabled
&& !forceFullCompaction)
&& !fullCompaction)
{
THashMap<TPartialBlobId, ui32, TPartialBlobIdHash> liveBlocks;
for (const auto& m: args.BlockMarks) {
Expand Down Expand Up @@ -1885,7 +1904,7 @@ bool TPartitionActor::PrepareCompaction(
PartitionConfig.GetFolderId(),
PartitionConfig.GetDiskId(),
args.CommitId,
args.ForceFullCompaction,
args.CompactionOptions.test(ToBit(ECompactionOption::Full)),
ctx,
TabletID(),
affectedBlobIds,
Expand Down Expand Up @@ -1962,6 +1981,11 @@ void TPartitionActor::CompleteCompaction(
Config->GetBlobStorageAsyncGetTimeoutSSD() :
Config->GetBlobStorageAsyncGetTimeoutHDD();

const auto compactionType =
args.CompactionOptions.test(ToBit(ECompactionOption::Forced)) ?
ECompactionType::Forced:
ECompactionType::Tablet;

auto actor = NCloud::Register<TCompactionActor>(
ctx,
args.RequestInfo,
Expand All @@ -1972,6 +1996,7 @@ void TPartitionActor::CompleteCompaction(
Config->GetMaxAffectedBlocksPerCompaction(),
BlockDigestGenerator,
readBlobTimeout,
compactionType,
args.CommitId,
std::move(rangeCompactionInfos),
std::move(requests));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ void TForcedCompactionActor::SendCompactionRequest(const TActorContext& ctx)
auto request = std::make_unique<TEvPartitionPrivate::TEvCompactionRequest>(
MakeIntrusive<TCallContext>(),
RangesToCompact[CurrentBlock],
true);
TCompactionOptions().
set(ToBit(ECompactionOption::Forced)).
set(ToBit(ECompactionOption::Full)));

NCloud::Send(ctx, Tablet, std::move(request));
}
Expand Down
30 changes: 24 additions & 6 deletions cloud/blockstore/libs/storage/partition/part_events_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <cloud/blockstore/libs/diagnostics/profile_log.h>
#include <cloud/blockstore/libs/kikimr/components.h>
#include <cloud/blockstore/libs/kikimr/events.h>
#include <cloud/blockstore/libs/storage/core/compaction_options.h>
#include <cloud/blockstore/libs/storage/core/compaction_type.h>
#include <cloud/blockstore/libs/storage/core/request_info.h>
#include <cloud/blockstore/libs/storage/model/channel_data_kind.h>
#include <cloud/blockstore/libs/storage/model/channel_permissions.h>
Expand Down Expand Up @@ -441,14 +443,19 @@ struct TEvPartitionPrivate
{
ECompactionMode Mode = RangeCompaction;
TMaybe<ui32> BlockIndex;
bool ForceFullCompaction = false;
TCompactionOptions CompactionOptions;

TCompactionRequest() = default;

TCompactionRequest(ui32 blockIndex, bool forceFullCompaction)
: Mode(RangeCompaction)
, BlockIndex(blockIndex)
, ForceFullCompaction(forceFullCompaction)
TCompactionRequest(
ui32 blockIndex,
TCompactionOptions compactionOptions)
: BlockIndex(blockIndex)
, CompactionOptions(compactionOptions)
{}

TCompactionRequest(ui32 blockIndex)
: TCompactionRequest(blockIndex, {})
{}

TCompactionRequest(ECompactionMode mode)
Expand Down Expand Up @@ -773,6 +780,17 @@ struct TEvPartitionPrivate
{
};


//
// CompactionCompleted
//

struct TCompactionCompleted
: TOperationCompleted
{
ECompactionType CompactionType;
};

//
// MetadataRebuildCompleted
//
Expand Down Expand Up @@ -880,7 +898,7 @@ struct TEvPartitionPrivate
using TEvWriteBlocksCompleted = TResponseEvent<TWriteBlocksCompleted, EvWriteBlocksCompleted>;
using TEvZeroBlocksCompleted = TResponseEvent<TOperationCompleted, EvZeroBlocksCompleted>;
using TEvFlushCompleted = TResponseEvent<TFlushCompleted, EvFlushCompleted>;
using TEvCompactionCompleted = TResponseEvent<TOperationCompleted, EvCompactionCompleted>;
using TEvCompactionCompleted = TResponseEvent<TCompactionCompleted, EvCompactionCompleted>;
using TEvCollectGarbageCompleted = TResponseEvent<TOperationCompleted, EvCollectGarbageCompleted>;
using TEvForcedCompactionCompleted = TResponseEvent<TForcedCompactionCompleted, EvForcedCompactionCompleted>;
using TEvMetadataRebuildCompleted = TResponseEvent<TOperationCompleted, EvMetadataRebuildCompleted>;
Expand Down
10 changes: 10 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,16 @@ ui64 TPartitionState::GetMixedIndexCacheMemSize() const
return MixedIndexCacheAllocator.GetBytesAllocated();
}

////////////////////////////////////////////////////////////////////////////////
// Compaction

TOperationState& TPartitionState::GetCompactionState(ECompactionType type)
{
return type == ECompactionType::Forced ?
ForcedCompactionState.State :
CompactionState;
}

////////////////////////////////////////////////////////////////////////////////

void TPartitionState::SetUsedBlocks(
Expand Down
7 changes: 3 additions & 4 deletions cloud/blockstore/libs/storage/partition/part_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <cloud/blockstore/libs/diagnostics/downtime_history.h>
#include <cloud/blockstore/libs/storage/api/partition.h>
#include <cloud/blockstore/libs/storage/core/compaction_map.h>
#include <cloud/blockstore/libs/storage/core/compaction_type.h>
#include <cloud/blockstore/libs/storage/core/request_buffer.h>
#include <cloud/blockstore/libs/storage/core/request_info.h>
#include <cloud/blockstore/libs/storage/core/ts_ring_buffer.h>
Expand Down Expand Up @@ -78,6 +79,7 @@ struct TForcedCompactionState
ui32 Progress = 0;
ui32 RangesCount = 0;
TString OperationId;
TOperationState State;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -800,10 +802,7 @@ class TPartitionState
TInstant LastCompactionRangeCountPerRunTs;

public:
TOperationState& GetCompactionState()
{
return CompactionState;
}
TOperationState& GetCompactionState(ECompactionType type);

TCompactionMap& GetCompactionMap()
{
Expand Down
7 changes: 4 additions & 3 deletions cloud/blockstore/libs/storage/partition/part_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <cloud/blockstore/libs/storage/api/partition.h>
#include <cloud/blockstore/libs/storage/core/block_handler.h>
#include <cloud/blockstore/libs/storage/core/compaction_map.h>
#include <cloud/blockstore/libs/storage/core/compaction_options.h>
#include <cloud/blockstore/libs/storage/core/request_info.h>
#include <cloud/blockstore/libs/storage/partition/model/blob_to_confirm.h>
#include <cloud/blockstore/libs/storage/partition/model/block.h>
Expand Down Expand Up @@ -430,18 +431,18 @@ struct TTxPartition
{
const TRequestInfoPtr RequestInfo;
const ui64 CommitId;
const bool ForceFullCompaction;
const TCompactionOptions CompactionOptions;

TVector<TRangeCompaction> RangeCompactions;

TCompaction(
TRequestInfoPtr requestInfo,
ui64 commitId,
bool forceFullCompaction,
TCompactionOptions compactionOptions,
const TVector<std::pair<ui32, TBlockRange32>>& ranges)
: RequestInfo(std::move(requestInfo))
, CommitId(commitId)
, ForceFullCompaction(forceFullCompaction)
, CompactionOptions(compactionOptions)
{
RangeCompactions.reserve(ranges.size());
for (const auto& range: ranges) {
Expand Down
Loading
Loading