Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-196: block checksums should be calculated after we copy the data from the client's buffer to our buffer, otherwise our client may modify the data between the checksum calculation stage and the writeblob stage #825

Merged
merged 4 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ void TCompactionActor::WriteBlobs(const TActorContext& ctx)
auto request = std::make_unique<TEvPartitionPrivate::TEvWriteBlobRequest>(
rc.DataBlobId,
rc.BlobContent.GetGuardedSgList(),
0, // blockSizeForChecksums
true); // async

if (!RequestInfo->CallContext->LWOrbit.Fork(request->CallContext->LWOrbit)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ void TFlushActor::WriteBlobs(const TActorContext& ctx)
auto request = std::make_unique<TEvPartitionPrivate::TEvWriteBlobRequest>(
req.BlobId,
req.BlobContent.GetGuardedSgList(),
0, // blockSizeForChecksums
true); // async

if (!RequestInfo->CallContext->LWOrbit.Fork(request->CallContext->LWOrbit)) {
Expand Down
24 changes: 23 additions & 1 deletion cloud/blockstore/libs/storage/partition/part_actor_writeblob.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "part_actor.h"

#include <cloud/blockstore/libs/diagnostics/block_digest.h>
#include <cloud/blockstore/libs/diagnostics/critical_events.h>
#include <cloud/blockstore/libs/storage/core/probes.h>
#include <cloud/blockstore/libs/storage/partition/model/fresh_blob.h>
Expand Down Expand Up @@ -42,6 +43,7 @@ class TWriteBlobActor final
TInstant ResponseReceived;
TStorageStatusFlags StorageStatusFlags;
double ApproximateFreeSpaceShare = 0;
TVector<ui32> BlockChecksums;

public:
TWriteBlobActor(
Expand Down Expand Up @@ -141,7 +143,26 @@ void TWriteBlobActor::SendPutRequest(const TActorContext& ctx)
blobContent = std::move(std::get<TString>(Request->Data));
}

Y_ABORT_UNLESS(!blobContent.Empty());
STORAGE_VERIFY(
!blobContent.Empty(),
TWellKnownEntityTypes::TABLET,
TabletId);

if (Request->BlockSizeForChecksums) {
STORAGE_VERIFY(
blobContent.Size() % Request->BlockSizeForChecksums == 0,
TWellKnownEntityTypes::TABLET,
TabletId);

ui32 offset = 0;
while (offset < blobContent.Size()) {
BlockChecksums.push_back(ComputeDefaultDigest({
blobContent.Data() + offset,
Request->BlockSizeForChecksums}));

offset += Request->BlockSizeForChecksums;
}
}

auto request = std::make_unique<TEvBlobStorage::TEvPut>(
MakeBlobId(TabletId, Request->BlobId),
Expand Down Expand Up @@ -242,6 +263,7 @@ void TWriteBlobActor::HandlePutResult(

auto response = std::make_unique<TResponse>();
response->ExecCycles = RequestInfo->GetExecCycles();
response->BlockChecksums = std::move(BlockChecksums);

ReplyAndDie(ctx, std::move(response));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ void TPartitionActor::HandleWriteBlocksCompleted(
if (msg->UnconfirmedBlobsAdded) {
// blobs are confirmed, but AddBlobs request will be executed
// (for this commit) later
State->BlobsConfirmed(commitId);
State->BlobsConfirmed(commitId, std::move(msg->BlobsToConfirm));
Y_DEBUG_ABORT_UNLESS(msg->CollectGarbageBarrierAcquired);
// commit & garbage queue barriers will be released when confirmed
// blobs are added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ void TWriteFreshBlocksActor::WriteBlob(const TActorContext& ctx)
CombinedContext,
blobId,
std::move(BlobContent),
false); // async
0, // blockSizeForChecksums
false); // async

NCloud::Send(
ctx,
Expand Down Expand Up @@ -251,8 +252,10 @@ void TWriteFreshBlocksActor::NotifyCompleted(
using TEvent = TEvPartitionPrivate::TEvWriteBlocksCompleted;
auto ev = std::make_unique<TEvent>(
error,
false, // collectGarbageBarrierAcquired
false); // unconfirmedBlobsAdded
false, // collectGarbageBarrierAcquired
false, // unconfirmedBlobsAdded
TVector<TBlobToConfirm>{} // blobsToConfirm
);

ev->ExecCycles = Requests.front().RequestInfo->GetExecCycles();
ev->TotalCycles = Requests.front().RequestInfo->GetTotalCycles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,17 @@ class TWriteMergedBlocksActor final
};

private:
const ui64 TabletId;
const TActorId Tablet;
const IBlockDigestGeneratorPtr BlockDigestGenerator;
const ui64 CommitId;
const TRequestInfoPtr RequestInfo;
TVector<TWriteBlobRequest> WriteBlobRequests;
TVector<TBlobToConfirm> BlobsToConfirm;
const bool ReplyLocal;
const bool ShouldAddUnconfirmedBlobs = false;
const IWriteBlocksHandlerPtr WriteHandler;
const bool ChecksumsEnabled;
const ui32 BlockSizeForChecksums;

TVector<IProfileLog::TBlockInfo> AffectedBlockInfos;
size_t WriteBlobRequestsCompleted = 0;
Expand All @@ -72,6 +74,7 @@ class TWriteMergedBlocksActor final

public:
TWriteMergedBlocksActor(
const ui64 tabletId,
const TActorId& tablet,
IBlockDigestGeneratorPtr blockDigestGenerator,
ui64 commitId,
Expand All @@ -80,7 +83,7 @@ class TWriteMergedBlocksActor final
bool replyLocal,
bool shouldAddUnconfirmedBlobs,
IWriteBlocksHandlerPtr writeHandler,
bool checksumsEnabled);
ui32 blockSizeForChecksums);

void Bootstrap(const TActorContext& ctx);

Expand All @@ -98,7 +101,7 @@ class TWriteMergedBlocksActor final
void Reply(
const TActorContext& ctx,
TRequestInfo& requestInfo,
IEventBasePtr response);
IEventBasePtr response) const;

private:
STFUNC(StateWork);
Expand All @@ -123,6 +126,7 @@ class TWriteMergedBlocksActor final
////////////////////////////////////////////////////////////////////////////////

TWriteMergedBlocksActor::TWriteMergedBlocksActor(
const ui64 tabletId,
const TActorId& tablet,
IBlockDigestGeneratorPtr blockDigestGenerator,
ui64 commitId,
Expand All @@ -131,16 +135,17 @@ TWriteMergedBlocksActor::TWriteMergedBlocksActor(
bool replyLocal,
bool shouldAddUnconfirmedBlobs,
IWriteBlocksHandlerPtr writeHandler,
bool checksumsEnabled)
: Tablet(tablet)
ui32 blockSizeForChecksums)
: TabletId(tabletId)
, Tablet(tablet)
, BlockDigestGenerator(std::move(blockDigestGenerator))
, CommitId(commitId)
, RequestInfo(std::move(requestInfo))
, WriteBlobRequests(std::move(writeBlobRequests))
, ReplyLocal(replyLocal)
, ShouldAddUnconfirmedBlobs(shouldAddUnconfirmedBlobs)
, WriteHandler(std::move(writeHandler))
, ChecksumsEnabled(checksumsEnabled)
, BlockSizeForChecksums(blockSizeForChecksums)
{}

void TWriteMergedBlocksActor::Bootstrap(const TActorContext& ctx)
Expand Down Expand Up @@ -181,10 +186,6 @@ TGuardedSgList TWriteMergedBlocksActor::BuildBlobContentAndComputeChecksums(
if (digest.Defined()) {
AffectedBlockInfos.push_back({blockIndex, *digest});
}

if (ChecksumsEnabled) {
request.Checksums.push_back(ComputeDefaultDigest(block));
}
}
}
return guardedSgList;
Expand All @@ -198,7 +199,9 @@ void TWriteMergedBlocksActor::WriteBlobs(const TActorContext& ctx)

auto request = std::make_unique<TEvPartitionPrivate::TEvWriteBlobRequest>(
req.BlobId,
std::move(guardedSglist));
std::move(guardedSglist),
BlockSizeForChecksums,
false); // async

if (!RequestInfo->CallContext->LWOrbit.Fork(request->CallContext->LWOrbit)) {
LWTRACK(
Expand All @@ -213,7 +216,8 @@ void TWriteMergedBlocksActor::WriteBlobs(const TActorContext& ctx)
NCloud::Send(
ctx,
Tablet,
std::move(request));
std::move(request),
i);
}
}

Expand Down Expand Up @@ -245,19 +249,20 @@ void TWriteMergedBlocksActor::AddBlobs(
ADD_WRITE_RESULT
);
} else {
TVector<TBlobToConfirm> blobs(Reserve(WriteBlobRequests.size()));
BlobsToConfirm.reserve(WriteBlobRequests.size());

for (const auto& req: WriteBlobRequests) {
blobs.emplace_back(
BlobsToConfirm.emplace_back(
req.BlobId.UniqueId(),
req.WriteRange,
req.Checksums);
// checksums are not ready at this point
TVector<ui32>());
}

request = std::make_unique<TEvPartitionPrivate::TEvAddUnconfirmedBlobsRequest>(
RequestInfo->CallContext,
CommitId,
std::move(blobs));
BlobsToConfirm);
}

SafeToUseOrbit = false;
Expand All @@ -273,10 +278,18 @@ void TWriteMergedBlocksActor::NotifyCompleted(
const NProto::TError& error)
{
using TEvent = TEvPartitionPrivate::TEvWriteBlocksCompleted;
if (!BlockSizeForChecksums) {
// this structure is needed only to transfer block checksums to
// PartState - passing it without checksums will trigger a couple
// of debug asserts and is actually pointless even if we didn't have
// those asserts
BlobsToConfirm.clear();
}
auto ev = std::make_unique<TEvent>(
error,
true, // collectGarbageBarrierAcquired
UnconfirmedBlobsAdded);
UnconfirmedBlobsAdded,
std::move(BlobsToConfirm));

ev->ExecCycles = RequestInfo->GetExecCycles();
ev->TotalCycles = RequestInfo->GetTotalCycles();
Expand Down Expand Up @@ -331,7 +344,7 @@ void TWriteMergedBlocksActor::ReplyAndDie(
void TWriteMergedBlocksActor::Reply(
const TActorContext& ctx,
TRequestInfo& requestInfo,
IEventBasePtr response)
IEventBasePtr response) const
{
if (SafeToUseOrbit) {
LWTRACK(
Expand All @@ -350,20 +363,41 @@ void TWriteMergedBlocksActor::HandleWriteBlobResponse(
const TEvPartitionPrivate::TEvWriteBlobResponse::TPtr& ev,
const TActorContext& ctx)
{
const auto* msg = ev->Get();
auto* msg = ev->Get();

RequestInfo->AddExecCycles(msg->ExecCycles);

if (HandleError(ctx, msg->GetError())) {
return;
}

Y_ABORT_UNLESS(WriteBlobRequestsCompleted < WriteBlobRequests.size());
STORAGE_VERIFY(
ev->Cookie < WriteBlobRequests.size(),
TWellKnownEntityTypes::TABLET,
TabletId);

if (BlobsToConfirm.empty()) {
WriteBlobRequests[ev->Cookie].Checksums =
std::move(msg->BlockChecksums);
} else {
STORAGE_VERIFY(
BlobsToConfirm.size() == WriteBlobRequests.size(),
TWellKnownEntityTypes::TABLET,
TabletId);

BlobsToConfirm[ev->Cookie].Checksums = std::move(msg->BlockChecksums);
}

STORAGE_VERIFY(
WriteBlobRequestsCompleted < WriteBlobRequests.size(),
TWellKnownEntityTypes::TABLET,
TabletId);

if (++WriteBlobRequestsCompleted < WriteBlobRequests.size()) {
return;
}

for (auto context: ForkedCallContexts) {
for (const auto& context: ForkedCallContexts) {
RequestInfo->CallContext->LWOrbit.Join(context->LWOrbit);
}

Expand Down Expand Up @@ -520,6 +554,7 @@ void TPartitionActor::WriteMergedBlocks(

auto actor = NCloud::Register<TWriteMergedBlocksActor>(
ctx,
TabletID(),
SelfId(),
BlockDigestGenerator,
commitId,
Expand All @@ -528,7 +563,7 @@ void TPartitionActor::WriteMergedBlocks(
requestInBuffer.Data.ReplyLocal,
shouldAddUnconfirmedBlobs,
std::move(requestInBuffer.Data.Handler),
checksumsEnabled
checksumsEnabled ? State->GetBlockSize() : 0
qkrorlqr marked this conversation as resolved.
Show resolved Hide resolved
);
Actors.Insert(actor);
}
Expand Down
Loading