Skip to content

Commit

Permalink
allow forced compaction generated by external script (blockstore-pcom…
Browse files Browse the repository at this point in the history
…pact-tablets) to advance in presense of heavy tablet compaction (#2527)

* let forced compaction to proceed in a presence of heavy tablet compaction

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update
  • Loading branch information
yegorskii authored Dec 10, 2024
1 parent 5329d25 commit 4f5380e
Show file tree
Hide file tree
Showing 19 changed files with 460 additions and 106 deletions.
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/compaction_options.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "compaction_options.h"
26 changes: 26 additions & 0 deletions cloud/blockstore/libs/storage/core/compaction_options.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include <util/system/types.h>

#include <bitset>

namespace NCloud::NBlockStore::NStorage {

////////////////////////////////////////////////////////////////////////////////

enum class ECompactionOption: size_t
{
Full, // non-incremental compaction
Forced, // compaction initiated externally
MaxFieldNumber,
};

constexpr size_t ToBit(ECompactionOption option)
{
return static_cast<size_t>(option);
}

using TCompactionOptions =
std::bitset<ToBit(ECompactionOption::MaxFieldNumber)>;

} // namespace NCloud::NBlockStore::NStorage
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/compaction_type.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "compaction_type.h"
15 changes: 15 additions & 0 deletions cloud/blockstore/libs/storage/core/compaction_type.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

#include <util/system/types.h>

namespace NCloud::NBlockStore::NStorage {

////////////////////////////////////////////////////////////////////////////////

enum class ECompactionType: ui32
{
Forced, // compaction initiated externally
Tablet, // compaction initiated by tablet
};

} // namespace NCloud::NBlockStore::NStorage
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/storage/core/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ GENERATE_ENUM_SERIALIZATION(mount_token.h)
SRCS(
block_handler.cpp
compaction_map.cpp
compaction_options.cpp
compaction_policy.cpp
compaction_type.cpp
config.cpp
disk_counters.cpp
disk_validation.cpp
Expand Down
51 changes: 38 additions & 13 deletions cloud/blockstore/libs/storage/partition/part_actor_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class TCompactionActor final
const ui32 MaxAffectedBlocksPerCompaction;
const IBlockDigestGeneratorPtr BlockDigestGenerator;
const TDuration ReadBlobTimeout;
const ECompactionType CompactionType;

const ui64 CommitId;

Expand Down Expand Up @@ -181,6 +182,7 @@ class TCompactionActor final
ui32 maxAffectedBlocksPerCompaction,
IBlockDigestGeneratorPtr blockDigestGenerator,
TDuration readBlobTimeout,
ECompactionType compactionType,
ui64 commitId,
TVector<TRangeCompactionInfo> rangeCompactionInfos,
TVector<TRequest> requests);
Expand Down Expand Up @@ -242,6 +244,7 @@ TCompactionActor::TCompactionActor(
ui32 maxAffectedBlocksPerCompaction,
IBlockDigestGeneratorPtr blockDigestGenerator,
TDuration readBlobTimeout,
ECompactionType compactionType,
ui64 commitId,
TVector<TRangeCompactionInfo> rangeCompactionInfos,
TVector<TRequest> requests)
Expand All @@ -253,6 +256,7 @@ TCompactionActor::TCompactionActor(
, MaxAffectedBlocksPerCompaction(maxAffectedBlocksPerCompaction)
, BlockDigestGenerator(std::move(blockDigestGenerator))
, ReadBlobTimeout(readBlobTimeout)
, CompactionType(compactionType)
, CommitId(commitId)
, RangeCompactionInfos(std::move(rangeCompactionInfos))
, Requests(std::move(requests))
Expand Down Expand Up @@ -736,6 +740,7 @@ void TCompactionActor::NotifyCompleted(
request->AffectedRanges.push_back(ConvertRangeSafe(rc.BlockRange));
}
request->AffectedBlockInfos = std::move(AffectedBlockInfos);
request->CompactionType = CompactionType;

NCloud::Send(ctx, Tablet, std::move(request));
}
Expand Down Expand Up @@ -1029,7 +1034,9 @@ void TPartitionActor::ChangeRangeCountPerRunIfNeeded(

void TPartitionActor::EnqueueCompactionIfNeeded(const TActorContext& ctx)
{
if (State->GetCompactionState().Status != EOperationStatus::Idle) {
if (State->GetCompactionState(ECompactionType::Tablet).Status !=
EOperationStatus::Idle)
{
// already enqueued
return;
}
Expand Down Expand Up @@ -1125,7 +1132,8 @@ void TPartitionActor::EnqueueCompactionIfNeeded(const TActorContext& ctx)
throttlingAllowed = false;
}

State->GetCompactionState().SetStatus(EOperationStatus::Enqueued);
State->GetCompactionState(ECompactionType::Tablet).SetStatus(
EOperationStatus::Enqueued);

if (Config->GetCompactionCountPerRunIncreasingThreshold()
&& Config->GetCompactionCountPerRunDecreasingThreshold()
Expand Down Expand Up @@ -1176,7 +1184,7 @@ void TPartitionActor::EnqueueCompactionIfNeeded(const TActorContext& ctx)
if (mode == TEvPartitionPrivate::GarbageCompaction
|| !diskGarbageBelowThreshold)
{
request->ForceFullCompaction = true;
request->CompactionOptions.set(ToBit(ECompactionOption::Full));
}

if (throttlingAllowed && Config->GetMaxCompactionDelay()) {
Expand Down Expand Up @@ -1245,13 +1253,21 @@ void TPartitionActor::HandleCompaction(
NCloud::Reply(ctx, requestInfo, std::move(response));
};

if (State->GetCompactionState().Status == EOperationStatus::Started) {
const auto compactionType =
msg->CompactionOptions.test(ToBit(ECompactionOption::Forced)) ?
ECompactionType::Forced:
ECompactionType::Tablet;

if (State->GetCompactionState(compactionType).Status ==
EOperationStatus::Started)
{
replyError(ctx, *requestInfo, E_TRY_AGAIN, "compaction already started");
return;
}

if (!State->IsCompactionAllowed()) {
State->GetCompactionState().SetStatus(EOperationStatus::Idle);
State->GetCompactionState(compactionType).SetStatus(
EOperationStatus::Idle);

replyError(ctx, *requestInfo, E_BS_OUT_OF_SPACE, "all channels readonly");
return;
Expand Down Expand Up @@ -1287,7 +1303,8 @@ void TPartitionActor::HandleCompaction(
}

if (tops.empty() || !tops.front().Stat.BlobCount) {
State->GetCompactionState().SetStatus(EOperationStatus::Idle);
State->GetCompactionState(compactionType).SetStatus(
EOperationStatus::Idle);

replyError(ctx, *requestInfo, S_ALREADY, "nothing to compact");
return;
Expand All @@ -1310,9 +1327,10 @@ void TPartitionActor::HandleCompaction(
State->GetBlocksCount() - 1);

LOG_DEBUG(ctx, TBlockStoreComponents::PARTITION,
"[%lu] Start compaction @%lu (range: %s, blobs: %u, blocks: %u"
"[%lu] Start %s compaction @%lu (range: %s, blobs: %u, blocks: %u"
", reads: %u, blobsread: %u, blocksread: %u, score: %f)",
TabletID(),
compactionType == ECompactionType::Forced ? "forced" : "tablet",
commitId,
DescribeRange(blockRange).c_str(),
x.Stat.BlobCount,
Expand All @@ -1325,7 +1343,7 @@ void TPartitionActor::HandleCompaction(
ranges.emplace_back(rangeIdx, blockRange);
}

State->GetCompactionState().SetStatus(EOperationStatus::Started);
State->GetCompactionState(compactionType).SetStatus(EOperationStatus::Started);

State->GetCommitQueue().AcquireBarrier(commitId);
State->GetCleanupQueue().AcquireBarrier(commitId);
Expand All @@ -1336,7 +1354,7 @@ void TPartitionActor::HandleCompaction(
auto tx = CreateTx<TCompaction>(
requestInfo,
commitId,
msg->ForceFullCompaction,
msg->CompactionOptions,
std::move(ranges));

ui64 minCommitId = State->GetCommitQueue().GetMinCommitId();
Expand Down Expand Up @@ -1401,7 +1419,8 @@ void TPartitionActor::HandleCompactionCompleted(
State->GetCleanupQueue().ReleaseBarrier(commitId);
State->GetGarbageQueue().ReleaseBarrier(commitId);

State->GetCompactionState().SetStatus(EOperationStatus::Idle);
State->GetCompactionState(msg->CompactionType).SetStatus(
EOperationStatus::Idle);

Actors.Erase(ev->Sender);

Expand Down Expand Up @@ -1458,7 +1477,7 @@ void PrepareRangeCompaction(
const TString& folderId,
const TString& diskId,
const ui64 commitId,
const bool forceFullCompaction,
const bool fullCompaction,
const TActorContext& ctx,
const ui64 tabletId,
THashSet<TPartialBlobId, TPartialBlobIdHash>& affectedBlobIds,
Expand Down Expand Up @@ -1503,7 +1522,7 @@ void PrepareRangeCompaction(

if (ready
&& incrementalCompactionEnabled
&& !forceFullCompaction)
&& !fullCompaction)
{
THashMap<TPartialBlobId, ui32, TPartialBlobIdHash> liveBlocks;
for (const auto& m: args.BlockMarks) {
Expand Down Expand Up @@ -1885,7 +1904,7 @@ bool TPartitionActor::PrepareCompaction(
PartitionConfig.GetFolderId(),
PartitionConfig.GetDiskId(),
args.CommitId,
args.ForceFullCompaction,
args.CompactionOptions.test(ToBit(ECompactionOption::Full)),
ctx,
TabletID(),
affectedBlobIds,
Expand Down Expand Up @@ -1962,6 +1981,11 @@ void TPartitionActor::CompleteCompaction(
Config->GetBlobStorageAsyncGetTimeoutSSD() :
Config->GetBlobStorageAsyncGetTimeoutHDD();

const auto compactionType =
args.CompactionOptions.test(ToBit(ECompactionOption::Forced)) ?
ECompactionType::Forced:
ECompactionType::Tablet;

auto actor = NCloud::Register<TCompactionActor>(
ctx,
args.RequestInfo,
Expand All @@ -1972,6 +1996,7 @@ void TPartitionActor::CompleteCompaction(
Config->GetMaxAffectedBlocksPerCompaction(),
BlockDigestGenerator,
readBlobTimeout,
compactionType,
args.CommitId,
std::move(rangeCompactionInfos),
std::move(requests));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ void TForcedCompactionActor::SendCompactionRequest(const TActorContext& ctx)
auto request = std::make_unique<TEvPartitionPrivate::TEvCompactionRequest>(
MakeIntrusive<TCallContext>(),
RangesToCompact[CurrentBlock],
true);
TCompactionOptions().
set(ToBit(ECompactionOption::Forced)).
set(ToBit(ECompactionOption::Full)));

NCloud::Send(ctx, Tablet, std::move(request));
}
Expand Down
30 changes: 24 additions & 6 deletions cloud/blockstore/libs/storage/partition/part_events_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <cloud/blockstore/libs/diagnostics/profile_log.h>
#include <cloud/blockstore/libs/kikimr/components.h>
#include <cloud/blockstore/libs/kikimr/events.h>
#include <cloud/blockstore/libs/storage/core/compaction_options.h>
#include <cloud/blockstore/libs/storage/core/compaction_type.h>
#include <cloud/blockstore/libs/storage/core/request_info.h>
#include <cloud/blockstore/libs/storage/model/channel_data_kind.h>
#include <cloud/blockstore/libs/storage/model/channel_permissions.h>
Expand Down Expand Up @@ -441,14 +443,19 @@ struct TEvPartitionPrivate
{
ECompactionMode Mode = RangeCompaction;
TMaybe<ui32> BlockIndex;
bool ForceFullCompaction = false;
TCompactionOptions CompactionOptions;

TCompactionRequest() = default;

TCompactionRequest(ui32 blockIndex, bool forceFullCompaction)
: Mode(RangeCompaction)
, BlockIndex(blockIndex)
, ForceFullCompaction(forceFullCompaction)
TCompactionRequest(
ui32 blockIndex,
TCompactionOptions compactionOptions)
: BlockIndex(blockIndex)
, CompactionOptions(compactionOptions)
{}

TCompactionRequest(ui32 blockIndex)
: TCompactionRequest(blockIndex, {})
{}

TCompactionRequest(ECompactionMode mode)
Expand Down Expand Up @@ -773,6 +780,17 @@ struct TEvPartitionPrivate
{
};


//
// CompactionCompleted
//

struct TCompactionCompleted
: TOperationCompleted
{
ECompactionType CompactionType;
};

//
// MetadataRebuildCompleted
//
Expand Down Expand Up @@ -880,7 +898,7 @@ struct TEvPartitionPrivate
using TEvWriteBlocksCompleted = TResponseEvent<TWriteBlocksCompleted, EvWriteBlocksCompleted>;
using TEvZeroBlocksCompleted = TResponseEvent<TOperationCompleted, EvZeroBlocksCompleted>;
using TEvFlushCompleted = TResponseEvent<TFlushCompleted, EvFlushCompleted>;
using TEvCompactionCompleted = TResponseEvent<TOperationCompleted, EvCompactionCompleted>;
using TEvCompactionCompleted = TResponseEvent<TCompactionCompleted, EvCompactionCompleted>;
using TEvCollectGarbageCompleted = TResponseEvent<TOperationCompleted, EvCollectGarbageCompleted>;
using TEvForcedCompactionCompleted = TResponseEvent<TForcedCompactionCompleted, EvForcedCompactionCompleted>;
using TEvMetadataRebuildCompleted = TResponseEvent<TOperationCompleted, EvMetadataRebuildCompleted>;
Expand Down
10 changes: 10 additions & 0 deletions cloud/blockstore/libs/storage/partition/part_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,16 @@ ui64 TPartitionState::GetMixedIndexCacheMemSize() const
return MixedIndexCacheAllocator.GetBytesAllocated();
}

////////////////////////////////////////////////////////////////////////////////
// Compaction

TOperationState& TPartitionState::GetCompactionState(ECompactionType type)
{
return type == ECompactionType::Forced ?
ForcedCompactionState.State :
CompactionState;
}

////////////////////////////////////////////////////////////////////////////////

void TPartitionState::SetUsedBlocks(
Expand Down
7 changes: 3 additions & 4 deletions cloud/blockstore/libs/storage/partition/part_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <cloud/blockstore/libs/diagnostics/downtime_history.h>
#include <cloud/blockstore/libs/storage/api/partition.h>
#include <cloud/blockstore/libs/storage/core/compaction_map.h>
#include <cloud/blockstore/libs/storage/core/compaction_type.h>
#include <cloud/blockstore/libs/storage/core/request_buffer.h>
#include <cloud/blockstore/libs/storage/core/request_info.h>
#include <cloud/blockstore/libs/storage/core/ts_ring_buffer.h>
Expand Down Expand Up @@ -78,6 +79,7 @@ struct TForcedCompactionState
ui32 Progress = 0;
ui32 RangesCount = 0;
TString OperationId;
TOperationState State;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -800,10 +802,7 @@ class TPartitionState
TInstant LastCompactionRangeCountPerRunTs;

public:
TOperationState& GetCompactionState()
{
return CompactionState;
}
TOperationState& GetCompactionState(ECompactionType type);

TCompactionMap& GetCompactionMap()
{
Expand Down
7 changes: 4 additions & 3 deletions cloud/blockstore/libs/storage/partition/part_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <cloud/blockstore/libs/storage/api/partition.h>
#include <cloud/blockstore/libs/storage/core/block_handler.h>
#include <cloud/blockstore/libs/storage/core/compaction_map.h>
#include <cloud/blockstore/libs/storage/core/compaction_options.h>
#include <cloud/blockstore/libs/storage/core/request_info.h>
#include <cloud/blockstore/libs/storage/partition/model/blob_to_confirm.h>
#include <cloud/blockstore/libs/storage/partition/model/block.h>
Expand Down Expand Up @@ -430,18 +431,18 @@ struct TTxPartition
{
const TRequestInfoPtr RequestInfo;
const ui64 CommitId;
const bool ForceFullCompaction;
const TCompactionOptions CompactionOptions;

TVector<TRangeCompaction> RangeCompactions;

TCompaction(
TRequestInfoPtr requestInfo,
ui64 commitId,
bool forceFullCompaction,
TCompactionOptions compactionOptions,
const TVector<std::pair<ui32, TBlockRange32>>& ranges)
: RequestInfo(std::move(requestInfo))
, CommitId(commitId)
, ForceFullCompaction(forceFullCompaction)
, CompactionOptions(compactionOptions)
{
RangeCompactions.reserve(ranges.size());
for (const auto& range: ranges) {
Expand Down
Loading

0 comments on commit 4f5380e

Please sign in to comment.