Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge to stable-23-3 #861

Merged
merged 8 commits into from
Mar 29, 2024
3 changes: 3 additions & 0 deletions cloud/blockstore/config/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -948,4 +948,7 @@ message TStorageServiceConfig
// Timeout for attempts to acquire the shadow disk when writes to the source
// disk are not blocked (in ms).
optional uint32 MaxAcquireShadowDiskTotalTimeoutWhenNonBlocked = 358;

// Max number of volume MetaHistory records displayed on volume monpage.
optional uint32 VolumeMetaHistoryDisplayedRecordLimit = 359;
}
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ TDuration MSeconds(ui32 value)
xxx(VolumeHistoryDuration, TDuration, Days(7) )\
xxx(VolumeHistoryCacheSize, ui32, 100 )\
xxx(DeletedCheckpointHistoryLifetime, TDuration, Days(7) )\
xxx(VolumeMetaHistoryDisplayedRecordLimit, ui32, 30 )\
\
xxx(BytesPerPartition, ui64, 512_TB )\
xxx(BytesPerPartitionSSD, ui64, 512_GB )\
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/core/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ class TStorageConfig

TDuration GetVolumeHistoryDuration() const;
ui32 GetVolumeHistoryCacheSize() const;
ui32 GetVolumeMetaHistoryDisplayedRecordLimit() const;

ui64 GetBytesPerPartition() const;
ui64 GetBytesPerPartitionSSD() const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ void TCompactionActor::WriteBlobs(const TActorContext& ctx)
auto request = std::make_unique<TEvPartitionPrivate::TEvWriteBlobRequest>(
rc.DataBlobId,
rc.BlobContent.GetGuardedSgList(),
0, // blockSizeForChecksums
true); // async

if (!RequestInfo->CallContext->LWOrbit.Fork(request->CallContext->LWOrbit)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ void TFlushActor::WriteBlobs(const TActorContext& ctx)
auto request = std::make_unique<TEvPartitionPrivate::TEvWriteBlobRequest>(
req.BlobId,
req.BlobContent.GetGuardedSgList(),
0, // blockSizeForChecksums
true); // async

if (!RequestInfo->CallContext->LWOrbit.Fork(request->CallContext->LWOrbit)) {
Expand Down
24 changes: 23 additions & 1 deletion cloud/blockstore/libs/storage/partition/part_actor_writeblob.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "part_actor.h"

#include <cloud/blockstore/libs/diagnostics/block_digest.h>
#include <cloud/blockstore/libs/diagnostics/critical_events.h>
#include <cloud/blockstore/libs/storage/core/probes.h>
#include <cloud/blockstore/libs/storage/partition/model/fresh_blob.h>
Expand Down Expand Up @@ -42,6 +43,7 @@ class TWriteBlobActor final
TInstant ResponseReceived;
TStorageStatusFlags StorageStatusFlags;
double ApproximateFreeSpaceShare = 0;
TVector<ui32> BlockChecksums;

public:
TWriteBlobActor(
Expand Down Expand Up @@ -141,7 +143,26 @@ void TWriteBlobActor::SendPutRequest(const TActorContext& ctx)
blobContent = std::move(std::get<TString>(Request->Data));
}

Y_ABORT_UNLESS(!blobContent.Empty());
STORAGE_VERIFY(
!blobContent.Empty(),
TWellKnownEntityTypes::TABLET,
TabletId);

if (Request->BlockSizeForChecksums) {
STORAGE_VERIFY(
blobContent.Size() % Request->BlockSizeForChecksums == 0,
TWellKnownEntityTypes::TABLET,
TabletId);

ui32 offset = 0;
while (offset < blobContent.Size()) {
BlockChecksums.push_back(ComputeDefaultDigest({
blobContent.Data() + offset,
Request->BlockSizeForChecksums}));

offset += Request->BlockSizeForChecksums;
}
}

auto request = std::make_unique<TEvBlobStorage::TEvPut>(
MakeBlobId(TabletId, Request->BlobId),
Expand Down Expand Up @@ -242,6 +263,7 @@ void TWriteBlobActor::HandlePutResult(

auto response = std::make_unique<TResponse>();
response->ExecCycles = RequestInfo->GetExecCycles();
response->BlockChecksums = std::move(BlockChecksums);

ReplyAndDie(ctx, std::move(response));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ void TPartitionActor::HandleWriteBlocksCompleted(
if (msg->UnconfirmedBlobsAdded) {
// blobs are confirmed, but AddBlobs request will be executed
// (for this commit) later
State->BlobsConfirmed(commitId);
State->BlobsConfirmed(commitId, std::move(msg->BlobsToConfirm));
Y_DEBUG_ABORT_UNLESS(msg->CollectGarbageBarrierAcquired);
// commit & garbage queue barriers will be released when confirmed
// blobs are added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ void TWriteFreshBlocksActor::WriteBlob(const TActorContext& ctx)
CombinedContext,
blobId,
std::move(BlobContent),
false); // async
0, // blockSizeForChecksums
false); // async

NCloud::Send(
ctx,
Expand Down Expand Up @@ -251,8 +252,10 @@ void TWriteFreshBlocksActor::NotifyCompleted(
using TEvent = TEvPartitionPrivate::TEvWriteBlocksCompleted;
auto ev = std::make_unique<TEvent>(
error,
false, // collectGarbageBarrierAcquired
false); // unconfirmedBlobsAdded
false, // collectGarbageBarrierAcquired
false, // unconfirmedBlobsAdded
TVector<TBlobToConfirm>{} // blobsToConfirm
);

ev->ExecCycles = Requests.front().RequestInfo->GetExecCycles();
ev->TotalCycles = Requests.front().RequestInfo->GetTotalCycles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,17 @@ class TWriteMergedBlocksActor final
};

private:
const ui64 TabletId;
const TActorId Tablet;
const IBlockDigestGeneratorPtr BlockDigestGenerator;
const ui64 CommitId;
const TRequestInfoPtr RequestInfo;
TVector<TWriteBlobRequest> WriteBlobRequests;
TVector<TBlobToConfirm> BlobsToConfirm;
const bool ReplyLocal;
const bool ShouldAddUnconfirmedBlobs = false;
const IWriteBlocksHandlerPtr WriteHandler;
const bool ChecksumsEnabled;
const ui32 BlockSizeForChecksums;

TVector<IProfileLog::TBlockInfo> AffectedBlockInfos;
size_t WriteBlobRequestsCompleted = 0;
Expand All @@ -72,6 +74,7 @@ class TWriteMergedBlocksActor final

public:
TWriteMergedBlocksActor(
const ui64 tabletId,
const TActorId& tablet,
IBlockDigestGeneratorPtr blockDigestGenerator,
ui64 commitId,
Expand All @@ -80,7 +83,7 @@ class TWriteMergedBlocksActor final
bool replyLocal,
bool shouldAddUnconfirmedBlobs,
IWriteBlocksHandlerPtr writeHandler,
bool checksumsEnabled);
ui32 blockSizeForChecksums);

void Bootstrap(const TActorContext& ctx);

Expand All @@ -98,7 +101,7 @@ class TWriteMergedBlocksActor final
void Reply(
const TActorContext& ctx,
TRequestInfo& requestInfo,
IEventBasePtr response);
IEventBasePtr response) const;

private:
STFUNC(StateWork);
Expand All @@ -123,6 +126,7 @@ class TWriteMergedBlocksActor final
////////////////////////////////////////////////////////////////////////////////

TWriteMergedBlocksActor::TWriteMergedBlocksActor(
const ui64 tabletId,
const TActorId& tablet,
IBlockDigestGeneratorPtr blockDigestGenerator,
ui64 commitId,
Expand All @@ -131,16 +135,17 @@ TWriteMergedBlocksActor::TWriteMergedBlocksActor(
bool replyLocal,
bool shouldAddUnconfirmedBlobs,
IWriteBlocksHandlerPtr writeHandler,
bool checksumsEnabled)
: Tablet(tablet)
ui32 blockSizeForChecksums)
: TabletId(tabletId)
, Tablet(tablet)
, BlockDigestGenerator(std::move(blockDigestGenerator))
, CommitId(commitId)
, RequestInfo(std::move(requestInfo))
, WriteBlobRequests(std::move(writeBlobRequests))
, ReplyLocal(replyLocal)
, ShouldAddUnconfirmedBlobs(shouldAddUnconfirmedBlobs)
, WriteHandler(std::move(writeHandler))
, ChecksumsEnabled(checksumsEnabled)
, BlockSizeForChecksums(blockSizeForChecksums)
{}

void TWriteMergedBlocksActor::Bootstrap(const TActorContext& ctx)
Expand Down Expand Up @@ -181,10 +186,6 @@ TGuardedSgList TWriteMergedBlocksActor::BuildBlobContentAndComputeChecksums(
if (digest.Defined()) {
AffectedBlockInfos.push_back({blockIndex, *digest});
}

if (ChecksumsEnabled) {
request.Checksums.push_back(ComputeDefaultDigest(block));
}
}
}
return guardedSgList;
Expand All @@ -198,7 +199,9 @@ void TWriteMergedBlocksActor::WriteBlobs(const TActorContext& ctx)

auto request = std::make_unique<TEvPartitionPrivate::TEvWriteBlobRequest>(
req.BlobId,
std::move(guardedSglist));
std::move(guardedSglist),
BlockSizeForChecksums,
false); // async

if (!RequestInfo->CallContext->LWOrbit.Fork(request->CallContext->LWOrbit)) {
LWTRACK(
Expand All @@ -213,7 +216,8 @@ void TWriteMergedBlocksActor::WriteBlobs(const TActorContext& ctx)
NCloud::Send(
ctx,
Tablet,
std::move(request));
std::move(request),
i);
}
}

Expand Down Expand Up @@ -245,19 +249,20 @@ void TWriteMergedBlocksActor::AddBlobs(
ADD_WRITE_RESULT
);
} else {
TVector<TBlobToConfirm> blobs(Reserve(WriteBlobRequests.size()));
BlobsToConfirm.reserve(WriteBlobRequests.size());

for (const auto& req: WriteBlobRequests) {
blobs.emplace_back(
BlobsToConfirm.emplace_back(
req.BlobId.UniqueId(),
req.WriteRange,
req.Checksums);
// checksums are not ready at this point
TVector<ui32>());
}

request = std::make_unique<TEvPartitionPrivate::TEvAddUnconfirmedBlobsRequest>(
RequestInfo->CallContext,
CommitId,
std::move(blobs));
BlobsToConfirm);
}

SafeToUseOrbit = false;
Expand All @@ -273,10 +278,18 @@ void TWriteMergedBlocksActor::NotifyCompleted(
const NProto::TError& error)
{
using TEvent = TEvPartitionPrivate::TEvWriteBlocksCompleted;
if (!BlockSizeForChecksums) {
// this structure is needed only to transfer block checksums to
// PartState - passing it without checksums will trigger a couple
// of debug asserts and is actually pointless even if we didn't have
// those asserts
BlobsToConfirm.clear();
}
auto ev = std::make_unique<TEvent>(
error,
true, // collectGarbageBarrierAcquired
UnconfirmedBlobsAdded);
UnconfirmedBlobsAdded,
std::move(BlobsToConfirm));

ev->ExecCycles = RequestInfo->GetExecCycles();
ev->TotalCycles = RequestInfo->GetTotalCycles();
Expand Down Expand Up @@ -331,7 +344,7 @@ void TWriteMergedBlocksActor::ReplyAndDie(
void TWriteMergedBlocksActor::Reply(
const TActorContext& ctx,
TRequestInfo& requestInfo,
IEventBasePtr response)
IEventBasePtr response) const
{
if (SafeToUseOrbit) {
LWTRACK(
Expand All @@ -350,20 +363,41 @@ void TWriteMergedBlocksActor::HandleWriteBlobResponse(
const TEvPartitionPrivate::TEvWriteBlobResponse::TPtr& ev,
const TActorContext& ctx)
{
const auto* msg = ev->Get();
auto* msg = ev->Get();

RequestInfo->AddExecCycles(msg->ExecCycles);

if (HandleError(ctx, msg->GetError())) {
return;
}

Y_ABORT_UNLESS(WriteBlobRequestsCompleted < WriteBlobRequests.size());
STORAGE_VERIFY(
ev->Cookie < WriteBlobRequests.size(),
TWellKnownEntityTypes::TABLET,
TabletId);

if (BlobsToConfirm.empty()) {
WriteBlobRequests[ev->Cookie].Checksums =
std::move(msg->BlockChecksums);
} else {
STORAGE_VERIFY(
BlobsToConfirm.size() == WriteBlobRequests.size(),
TWellKnownEntityTypes::TABLET,
TabletId);

BlobsToConfirm[ev->Cookie].Checksums = std::move(msg->BlockChecksums);
}

STORAGE_VERIFY(
WriteBlobRequestsCompleted < WriteBlobRequests.size(),
TWellKnownEntityTypes::TABLET,
TabletId);

if (++WriteBlobRequestsCompleted < WriteBlobRequests.size()) {
return;
}

for (auto context: ForkedCallContexts) {
for (const auto& context: ForkedCallContexts) {
RequestInfo->CallContext->LWOrbit.Join(context->LWOrbit);
}

Expand Down Expand Up @@ -520,6 +554,7 @@ void TPartitionActor::WriteMergedBlocks(

auto actor = NCloud::Register<TWriteMergedBlocksActor>(
ctx,
TabletID(),
SelfId(),
BlockDigestGenerator,
commitId,
Expand All @@ -528,7 +563,7 @@ void TPartitionActor::WriteMergedBlocks(
requestInBuffer.Data.ReplyLocal,
shouldAddUnconfirmedBlobs,
std::move(requestInBuffer.Data.Handler),
checksumsEnabled
checksumsEnabled ? State->GetBlockSize() : 0
);
Actors.Insert(actor);
}
Expand Down
Loading