Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Blockstore] support DescribeBlocks method for disks with several partitions #662

Merged
merged 3 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions cloud/blockstore/libs/storage/volume/model/merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,120 @@ void MergeStripedBitMask(
}
}

void MergeDescribeBlocksResponse(
BarkovBG marked this conversation as resolved.
Show resolved Hide resolved
NProto::TDescribeBlocksResponse& src,
NProto::TDescribeBlocksResponse& dst,
const ui32 blocksPerStripe,
const ui32 blockSize,
const ui32 partitionsCount,
const ui32 partitionId)
{
for (const auto& freshBlockRange: src.GetFreshBlockRanges()) {
SplitFreshBlockRangeFromRelativeToGlobalIndices(
freshBlockRange,
blocksPerStripe,
blockSize,
partitionsCount,
partitionId,
&dst);
}

const auto& srcBlobPieces = src.GetBlobPieces();

for (const auto& blobPiece: srcBlobPieces) {
NProto::TBlobPiece dstBlobPiece;
dstBlobPiece.MutableBlobId()->CopyFrom(blobPiece.GetBlobId());
dstBlobPiece.SetBSGroupId(blobPiece.GetBSGroupId());

for (const auto& srcRange: blobPiece.GetRanges()) {
SplitBlobPieceRangeFromRelativeToGlobalIndices(
srcRange,
blocksPerStripe,
partitionsCount,
partitionId,
&dstBlobPiece);
}
dst.MutableBlobPieces()->Add(std::move(dstBlobPiece));
}
}

void SplitFreshBlockRangeFromRelativeToGlobalIndices(
const NProto::TFreshBlockRange& srcRange,
const ui32 blocksPerStripe,
const ui32 blockSize,
const ui32 partitionsCount,
const ui32 partitionId,
NProto::TDescribeBlocksResponse* dst)
{
const ui32 startIndex = srcRange.GetStartIndex();
ui32 blocksCount = 0;

const char* srcRangePtr = srcRange.GetBlocksContent().Data();
while (blocksCount < srcRange.GetBlocksCount()) {
const auto index = RelativeToGlobalIndex(
blocksPerStripe,
startIndex + blocksCount,
partitionsCount,
partitionId);

const auto stripeRange = CalculateStripeRange(blocksPerStripe, index);

const auto rangeBlocksCount = std::min(
static_cast<ui64>(stripeRange.End) - index + 1,
static_cast<ui64>(srcRange.GetBlocksCount()) - blocksCount);

NProto::TFreshBlockRange dstRange;
dstRange.SetStartIndex(index);
dstRange.SetBlocksCount(rangeBlocksCount);

const ui64 bytesCount = rangeBlocksCount * blockSize;
dstRange.MutableBlocksContent()->resize(bytesCount);
char* dstRangePtr = dstRange.MutableBlocksContent()->begin();
std::memcpy(
dstRangePtr,
srcRangePtr,
bytesCount);

srcRangePtr += bytesCount;
blocksCount += rangeBlocksCount;

dst->MutableFreshBlockRanges()->Add(std::move(dstRange));
}
}

void SplitBlobPieceRangeFromRelativeToGlobalIndices(
const NProto::TRangeInBlob& srcRange,
const ui32 blocksPerStripe,
const ui32 partitionsCount,
const ui32 partitionId,
NProto::TBlobPiece* dstBlobPiece)
{
const ui32 blobOffset = srcRange.GetBlobOffset();
const ui32 blockIndex = srcRange.GetBlockIndex();
ui32 blocksCount = 0;

while (blocksCount < srcRange.GetBlocksCount()) {
const auto index = RelativeToGlobalIndex(
blocksPerStripe,
blockIndex + blocksCount,
partitionsCount,
partitionId);

const auto stripeRange = CalculateStripeRange(blocksPerStripe, index);

const auto rangeBlocksCount = std::min(
static_cast<ui64>(stripeRange.End) - index + 1,
static_cast<ui64>(srcRange.GetBlocksCount()) - blocksCount);

NProto::TRangeInBlob dstRange;
dstRange.SetBlobOffset(blobOffset + blocksCount);
dstRange.SetBlockIndex(index);
dstRange.SetBlocksCount(rangeBlocksCount);

blocksCount += rangeBlocksCount;

dstBlobPiece->MutableRanges()->Add(std::move(dstRange));
}
}

} // namespace NCloud::NBlockStore::NStorage
25 changes: 25 additions & 0 deletions cloud/blockstore/libs/storage/volume/model/merge.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <cloud/blockstore/libs/storage/protos_ydb/volume.pb.h>

#include <cloud/blockstore/libs/common/block_range.h>

namespace NCloud::NBlockStore::NStorage {
Expand All @@ -15,4 +17,27 @@ void MergeStripedBitMask(
const TString& srcMask,
TString& dstMask);

void MergeDescribeBlocksResponse(
NProto::TDescribeBlocksResponse& src,
NProto::TDescribeBlocksResponse& dst,
const ui32 blocksPerStripe,
const ui32 blockSize,
const ui32 partitionsCount,
const ui32 partitionId);

void SplitFreshBlockRangeFromRelativeToGlobalIndices(
const NProto::TFreshBlockRange& srcRange,
const ui32 blocksPerStripe,
const ui32 blockSize,
const ui32 partitionsCount,
const ui32 partitionId,
NProto::TDescribeBlocksResponse* dst);

void SplitBlobPieceRangeFromRelativeToGlobalIndices(
const NProto::TRangeInBlob& srcRange,
const ui32 blocksPerStripe,
const ui32 partitionsCount,
const ui32 partitionId,
NProto::TBlobPiece* dstBlobPiece);

} // namespace NCloud::NBlockStore::NStorage
65 changes: 65 additions & 0 deletions cloud/blockstore/libs/storage/volume/model/merge_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,71 @@ Y_UNIT_TEST_SUITE(TMergeTest)
}
}

Y_UNIT_TEST(ShouldSplitFreshBlockRangeFromRelativeToGlobalIndices)
{
NProto::TFreshBlockRange freshData;
freshData.SetStartIndex(5);
freshData.SetBlocksCount(8);
freshData.MutableBlocksContent()->append(TString(1024, 'X'));

NProto::TDescribeBlocksResponse dst;
SplitFreshBlockRangeFromRelativeToGlobalIndices(
freshData,
4, // blocksPerStripe
128, // blockSize
2, // partitionsCount
0, // partitionId
&dst);

UNIT_ASSERT_VALUES_EQUAL(3, dst.FreshBlockRangesSize());

const auto& range1 = dst.GetFreshBlockRanges(0);
UNIT_ASSERT_VALUES_EQUAL(9, range1.GetStartIndex());
UNIT_ASSERT_VALUES_EQUAL(3, range1.GetBlocksCount());

const auto& range2 = dst.GetFreshBlockRanges(1);
UNIT_ASSERT_VALUES_EQUAL(16, range2.GetStartIndex());
UNIT_ASSERT_VALUES_EQUAL(4, range2.GetBlocksCount());

TString actualContent;
for (size_t i = 0; i < dst.FreshBlockRangesSize(); ++i) {
const auto& freshRange = dst.GetFreshBlockRanges(i);
actualContent += freshRange.GetBlocksContent();
}

UNIT_ASSERT_VALUES_EQUAL(1024, actualContent.size());
for (size_t i = 0; i < actualContent.size(); i++) {
UNIT_ASSERT_VALUES_EQUAL('X', actualContent[i]);
}
}

Y_UNIT_TEST(ShouldSplitBlobPieceRangeFromRelativeToGlobalIndices)
{
NProto::TRangeInBlob rangeInBlob;
rangeInBlob.SetBlobOffset(10);
rangeInBlob.SetBlockIndex(13);
rangeInBlob.SetBlocksCount(1024);

NProto::TBlobPiece dst;
SplitBlobPieceRangeFromRelativeToGlobalIndices(
rangeInBlob,
4, // blocksPerStripe
2, // partitionsCount
0, // partitionId
&dst);

UNIT_ASSERT_VALUES_EQUAL(257, dst.RangesSize());

const auto& range1 = dst.GetRanges(0);
UNIT_ASSERT_VALUES_EQUAL(10, range1.GetBlobOffset());
UNIT_ASSERT_VALUES_EQUAL(25, range1.GetBlockIndex());
UNIT_ASSERT_VALUES_EQUAL(3, range1.GetBlocksCount());

const auto& range2 = dst.GetRanges(1);
UNIT_ASSERT_VALUES_EQUAL(13, range2.GetBlobOffset());
UNIT_ASSERT_VALUES_EQUAL(32, range2.GetBlockIndex());
UNIT_ASSERT_VALUES_EQUAL(4, range2.GetBlocksCount());
}
}

} // namespace NCloud::NBlockStore::NStorage
10 changes: 10 additions & 0 deletions cloud/blockstore/libs/storage/volume/model/stripe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ ui64 RelativeToGlobalIndex(
return stripe * blocksPerStripe + offsetInStripe;
}

TBlockRange64 CalculateStripeRange(
const ui32 blocksPerStripe,
const ui64 globalIndex)
BarkovBG marked this conversation as resolved.
Show resolved Hide resolved
{
const auto stripeInd = globalIndex / blocksPerStripe;
return TBlockRange64::WithLength(
stripeInd * blocksPerStripe,
blocksPerStripe);
}

ui32 CalculateRequestCount(
const ui32 blocksPerStripe,
const TBlockRange64& original,
Expand Down
4 changes: 4 additions & 0 deletions cloud/blockstore/libs/storage/volume/model/stripe.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ ui64 RelativeToGlobalIndex(
const ui32 partitionCount,
const ui32 partitionId);

TBlockRange64 CalculateStripeRange(
const ui32 blocksPerStripe,
const ui64 globalIndex);

ui32 CalculateRequestCount(
const ui32 blocksPerStripe,
const TBlockRange64& original,
Expand Down
19 changes: 19 additions & 0 deletions cloud/blockstore/libs/storage/volume/model/stripe_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,25 @@ Y_UNIT_TEST_SUITE(TStripeTest)
UNIT_ASSERT_VALUES_EQUAL(75, RelativeToGlobalIndex(10, 25, 3, 1));
}

Y_UNIT_TEST(ShouldCalculateStripeRange)
{
UNIT_ASSERT_VALUES_EQUAL(
DescribeRange(TBlockRange64::WithLength(0, 10)),
DescribeRange(CalculateStripeRange(10, 0)));

UNIT_ASSERT_VALUES_EQUAL(
DescribeRange(TBlockRange64::WithLength(0, 10)),
DescribeRange(CalculateStripeRange(10, 5)));

UNIT_ASSERT_VALUES_EQUAL(
DescribeRange(TBlockRange64::WithLength(10, 10)),
DescribeRange(CalculateStripeRange(10, 15)));

UNIT_ASSERT_VALUES_EQUAL(
DescribeRange(TBlockRange64::WithLength(10, 10)),
DescribeRange(CalculateStripeRange(10, 19)));
}

Y_UNIT_TEST(ShouldCalculateRequestCount)
{
UNIT_ASSERT_VALUES_EQUAL(
Expand Down
23 changes: 23 additions & 0 deletions cloud/blockstore/libs/storage/volume/partition_requests.h
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ class TPartitionRequestActor final
const ui64 VolumeRequestId;
const TBlockRange64 OriginalRange;
const ui32 BlocksPerStripe;
const ui32 BlockSize;
const ui32 PartitionsCount;
TVector<TPartitionRequest<TMethod>> PartitionRequests;
const TRequestTraceInfo TraceInfo;
Expand All @@ -695,6 +696,7 @@ class TPartitionRequestActor final
ui64 volumeRequestId,
TBlockRange64 originalRange,
ui32 blocksPerStripe,
ui32 blockSize,
ui32 partitionsCount,
TVector<TPartitionRequest<TMethod>> partitionRequests,
TRequestTraceInfo traceInfo);
Expand Down Expand Up @@ -865,6 +867,25 @@ class TPartitionRequestActor final
}
}

void Merge(
NProto::TDescribeBlocksResponse& src,
ui32 requestNo,
NProto::TDescribeBlocksResponse& dst)
{
if (FAILED(src.GetError().GetCode())) {
*dst.MutableError() = std::move(*src.MutableError());
return;
}

MergeDescribeBlocksResponse(
src,
dst,
BlocksPerStripe,
BlockSize,
PartitionsCount,
PartitionRequests[requestNo].PartitionId);
}

template <typename T>
void Merge(
T& src,
Expand Down Expand Up @@ -908,6 +929,7 @@ TPartitionRequestActor<TMethod>::TPartitionRequestActor(
ui64 volumeRequestId,
TBlockRange64 originalRange,
ui32 blocksPerStripe,
ui32 blockSize,
ui32 partitionsCount,
TVector<TPartitionRequest<TMethod>> partitionRequests,
TRequestTraceInfo traceInfo)
Expand All @@ -916,6 +938,7 @@ TPartitionRequestActor<TMethod>::TPartitionRequestActor(
, VolumeRequestId(volumeRequestId)
, OriginalRange(originalRange)
, BlocksPerStripe(blocksPerStripe)
, BlockSize(blockSize)
, PartitionsCount(partitionsCount)
, PartitionRequests(std::move(partitionRequests))
, TraceInfo(std::move(traceInfo))
Expand Down
13 changes: 12 additions & 1 deletion cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ namespace {

////////////////////////////////////////////////////////////////////////////////

template <typename T>
constexpr bool IsDescribeBlocksMethod =
std::is_same_v<T, TEvVolume::TDescribeBlocksMethod>;

////////////////////////////////////////////////////////////////////////////////

template <typename TMethod>
bool CanForwardToPartition(ui32 partitionCount)
{
Expand Down Expand Up @@ -127,7 +133,11 @@ bool TVolumeActor::HandleRequest(
return false;
}

if (partitionRequests.size() == 1) {
// Should always forward request via TPartitionRequestActor for
// DesribeBlocks method and multi-partitioned volume.
if (State->GetPartitions().size() == 1 ||
(partitionRequests.size() == 1 && !IsDescribeBlocksMethod<TMethod>))
{
ev->Get()->Record = std::move(partitionRequests.front().Event->Record);
SendRequestToPartition<TMethod>(
ctx,
Expand Down Expand Up @@ -166,6 +176,7 @@ bool TVolumeActor::HandleRequest(
volumeRequestId,
blockRange,
blocksPerStripe,
State->GetBlockSize(),
State->GetPartitions().size(),
std::move(partitionRequests),
TRequestTraceInfo(isTraced, traceTs, TraceSerializer));
Expand Down
Loading
Loading