Skip to content

Commit

Permalink
[Blockstore] support DescribeBlocks method for disks with several par…
Browse files Browse the repository at this point in the history
…titions (#662)
  • Loading branch information
BarkovBG committed Mar 27, 2024
1 parent 8b22ae9 commit c234c82
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 1 deletion.
116 changes: 116 additions & 0 deletions cloud/blockstore/libs/storage/volume/model/merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,120 @@ void MergeStripedBitMask(
}
}

void MergeDescribeBlocksResponse(
NProto::TDescribeBlocksResponse& src,
NProto::TDescribeBlocksResponse& dst,
const ui32 blocksPerStripe,
const ui32 blockSize,
const ui32 partitionsCount,
const ui32 partitionId)
{
for (const auto& freshBlockRange: src.GetFreshBlockRanges()) {
SplitFreshBlockRangeFromRelativeToGlobalIndices(
freshBlockRange,
blocksPerStripe,
blockSize,
partitionsCount,
partitionId,
&dst);
}

const auto& srcBlobPieces = src.GetBlobPieces();

for (const auto& blobPiece: srcBlobPieces) {
NProto::TBlobPiece dstBlobPiece;
dstBlobPiece.MutableBlobId()->CopyFrom(blobPiece.GetBlobId());
dstBlobPiece.SetBSGroupId(blobPiece.GetBSGroupId());

for (const auto& srcRange: blobPiece.GetRanges()) {
SplitBlobPieceRangeFromRelativeToGlobalIndices(
srcRange,
blocksPerStripe,
partitionsCount,
partitionId,
&dstBlobPiece);
}
dst.MutableBlobPieces()->Add(std::move(dstBlobPiece));
}
}

void SplitFreshBlockRangeFromRelativeToGlobalIndices(
const NProto::TFreshBlockRange& srcRange,
const ui32 blocksPerStripe,
const ui32 blockSize,
const ui32 partitionsCount,
const ui32 partitionId,
NProto::TDescribeBlocksResponse* dst)
{
const ui32 startIndex = srcRange.GetStartIndex();
ui32 blocksCount = 0;

const char* srcRangePtr = srcRange.GetBlocksContent().Data();
while (blocksCount < srcRange.GetBlocksCount()) {
const auto index = RelativeToGlobalIndex(
blocksPerStripe,
startIndex + blocksCount,
partitionsCount,
partitionId);

const auto stripeRange = CalculateStripeRange(blocksPerStripe, index);

const auto rangeBlocksCount = std::min(
static_cast<ui64>(stripeRange.End) - index + 1,
static_cast<ui64>(srcRange.GetBlocksCount()) - blocksCount);

NProto::TFreshBlockRange dstRange;
dstRange.SetStartIndex(index);
dstRange.SetBlocksCount(rangeBlocksCount);

const ui64 bytesCount = rangeBlocksCount * blockSize;
dstRange.MutableBlocksContent()->resize(bytesCount);
char* dstRangePtr = dstRange.MutableBlocksContent()->begin();
std::memcpy(
dstRangePtr,
srcRangePtr,
bytesCount);

srcRangePtr += bytesCount;
blocksCount += rangeBlocksCount;

dst->MutableFreshBlockRanges()->Add(std::move(dstRange));
}
}

void SplitBlobPieceRangeFromRelativeToGlobalIndices(
const NProto::TRangeInBlob& srcRange,
const ui32 blocksPerStripe,
const ui32 partitionsCount,
const ui32 partitionId,
NProto::TBlobPiece* dstBlobPiece)
{
const ui32 blobOffset = srcRange.GetBlobOffset();
const ui32 blockIndex = srcRange.GetBlockIndex();
ui32 blocksCount = 0;

while (blocksCount < srcRange.GetBlocksCount()) {
const auto index = RelativeToGlobalIndex(
blocksPerStripe,
blockIndex + blocksCount,
partitionsCount,
partitionId);

const auto stripeRange = CalculateStripeRange(blocksPerStripe, index);

const auto rangeBlocksCount = std::min(
static_cast<ui64>(stripeRange.End) - index + 1,
static_cast<ui64>(srcRange.GetBlocksCount()) - blocksCount);

NProto::TRangeInBlob dstRange;
dstRange.SetBlobOffset(blobOffset + blocksCount);
dstRange.SetBlockIndex(index);
dstRange.SetBlocksCount(rangeBlocksCount);

blocksCount += rangeBlocksCount;

dstBlobPiece->MutableRanges()->Add(std::move(dstRange));
}
}

} // namespace NCloud::NBlockStore::NStorage
25 changes: 25 additions & 0 deletions cloud/blockstore/libs/storage/volume/model/merge.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <cloud/blockstore/libs/storage/protos_ydb/volume.pb.h>

#include <cloud/blockstore/libs/common/block_range.h>

namespace NCloud::NBlockStore::NStorage {
Expand All @@ -15,4 +17,27 @@ void MergeStripedBitMask(
const TString& srcMask,
TString& dstMask);

void MergeDescribeBlocksResponse(
NProto::TDescribeBlocksResponse& src,
NProto::TDescribeBlocksResponse& dst,
const ui32 blocksPerStripe,
const ui32 blockSize,
const ui32 partitionsCount,
const ui32 partitionId);

void SplitFreshBlockRangeFromRelativeToGlobalIndices(
const NProto::TFreshBlockRange& srcRange,
const ui32 blocksPerStripe,
const ui32 blockSize,
const ui32 partitionsCount,
const ui32 partitionId,
NProto::TDescribeBlocksResponse* dst);

void SplitBlobPieceRangeFromRelativeToGlobalIndices(
const NProto::TRangeInBlob& srcRange,
const ui32 blocksPerStripe,
const ui32 partitionsCount,
const ui32 partitionId,
NProto::TBlobPiece* dstBlobPiece);

} // namespace NCloud::NBlockStore::NStorage
65 changes: 65 additions & 0 deletions cloud/blockstore/libs/storage/volume/model/merge_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,71 @@ Y_UNIT_TEST_SUITE(TMergeTest)
}
}

Y_UNIT_TEST(ShouldSplitFreshBlockRangeFromRelativeToGlobalIndices)
{
NProto::TFreshBlockRange freshData;
freshData.SetStartIndex(5);
freshData.SetBlocksCount(8);
freshData.MutableBlocksContent()->append(TString(1024, 'X'));

NProto::TDescribeBlocksResponse dst;
SplitFreshBlockRangeFromRelativeToGlobalIndices(
freshData,
4, // blocksPerStripe
128, // blockSize
2, // partitionsCount
0, // partitionId
&dst);

UNIT_ASSERT_VALUES_EQUAL(3, dst.FreshBlockRangesSize());

const auto& range1 = dst.GetFreshBlockRanges(0);
UNIT_ASSERT_VALUES_EQUAL(9, range1.GetStartIndex());
UNIT_ASSERT_VALUES_EQUAL(3, range1.GetBlocksCount());

const auto& range2 = dst.GetFreshBlockRanges(1);
UNIT_ASSERT_VALUES_EQUAL(16, range2.GetStartIndex());
UNIT_ASSERT_VALUES_EQUAL(4, range2.GetBlocksCount());

TString actualContent;
for (size_t i = 0; i < dst.FreshBlockRangesSize(); ++i) {
const auto& freshRange = dst.GetFreshBlockRanges(i);
actualContent += freshRange.GetBlocksContent();
}

UNIT_ASSERT_VALUES_EQUAL(1024, actualContent.size());
for (size_t i = 0; i < actualContent.size(); i++) {
UNIT_ASSERT_VALUES_EQUAL('X', actualContent[i]);
}
}

Y_UNIT_TEST(ShouldSplitBlobPieceRangeFromRelativeToGlobalIndices)
{
NProto::TRangeInBlob rangeInBlob;
rangeInBlob.SetBlobOffset(10);
rangeInBlob.SetBlockIndex(13);
rangeInBlob.SetBlocksCount(1024);

NProto::TBlobPiece dst;
SplitBlobPieceRangeFromRelativeToGlobalIndices(
rangeInBlob,
4, // blocksPerStripe
2, // partitionsCount
0, // partitionId
&dst);

UNIT_ASSERT_VALUES_EQUAL(257, dst.RangesSize());

const auto& range1 = dst.GetRanges(0);
UNIT_ASSERT_VALUES_EQUAL(10, range1.GetBlobOffset());
UNIT_ASSERT_VALUES_EQUAL(25, range1.GetBlockIndex());
UNIT_ASSERT_VALUES_EQUAL(3, range1.GetBlocksCount());

const auto& range2 = dst.GetRanges(1);
UNIT_ASSERT_VALUES_EQUAL(13, range2.GetBlobOffset());
UNIT_ASSERT_VALUES_EQUAL(32, range2.GetBlockIndex());
UNIT_ASSERT_VALUES_EQUAL(4, range2.GetBlocksCount());
}
}

} // namespace NCloud::NBlockStore::NStorage
10 changes: 10 additions & 0 deletions cloud/blockstore/libs/storage/volume/model/stripe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ ui64 RelativeToGlobalIndex(
return stripe * blocksPerStripe + offsetInStripe;
}

TBlockRange64 CalculateStripeRange(
const ui32 blocksPerStripe,
const ui64 globalIndex)
{
const auto stripeInd = globalIndex / blocksPerStripe;
return TBlockRange64::WithLength(
stripeInd * blocksPerStripe,
blocksPerStripe);
}

ui32 CalculateRequestCount(
const ui32 blocksPerStripe,
const TBlockRange64& original,
Expand Down
4 changes: 4 additions & 0 deletions cloud/blockstore/libs/storage/volume/model/stripe.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ ui64 RelativeToGlobalIndex(
const ui32 partitionCount,
const ui32 partitionId);

TBlockRange64 CalculateStripeRange(
const ui32 blocksPerStripe,
const ui64 globalIndex);

ui32 CalculateRequestCount(
const ui32 blocksPerStripe,
const TBlockRange64& original,
Expand Down
19 changes: 19 additions & 0 deletions cloud/blockstore/libs/storage/volume/model/stripe_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,25 @@ Y_UNIT_TEST_SUITE(TStripeTest)
UNIT_ASSERT_VALUES_EQUAL(75, RelativeToGlobalIndex(10, 25, 3, 1));
}

Y_UNIT_TEST(ShouldCalculateStripeRange)
{
UNIT_ASSERT_VALUES_EQUAL(
DescribeRange(TBlockRange64::WithLength(0, 10)),
DescribeRange(CalculateStripeRange(10, 0)));

UNIT_ASSERT_VALUES_EQUAL(
DescribeRange(TBlockRange64::WithLength(0, 10)),
DescribeRange(CalculateStripeRange(10, 5)));

UNIT_ASSERT_VALUES_EQUAL(
DescribeRange(TBlockRange64::WithLength(10, 10)),
DescribeRange(CalculateStripeRange(10, 15)));

UNIT_ASSERT_VALUES_EQUAL(
DescribeRange(TBlockRange64::WithLength(10, 10)),
DescribeRange(CalculateStripeRange(10, 19)));
}

Y_UNIT_TEST(ShouldCalculateRequestCount)
{
UNIT_ASSERT_VALUES_EQUAL(
Expand Down
23 changes: 23 additions & 0 deletions cloud/blockstore/libs/storage/volume/partition_requests.h
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ class TPartitionRequestActor final
const ui64 VolumeRequestId;
const TBlockRange64 OriginalRange;
const ui32 BlocksPerStripe;
const ui32 BlockSize;
const ui32 PartitionsCount;
TVector<TPartitionRequest<TMethod>> PartitionRequests;
const TRequestTraceInfo TraceInfo;
Expand All @@ -695,6 +696,7 @@ class TPartitionRequestActor final
ui64 volumeRequestId,
TBlockRange64 originalRange,
ui32 blocksPerStripe,
ui32 blockSize,
ui32 partitionsCount,
TVector<TPartitionRequest<TMethod>> partitionRequests,
TRequestTraceInfo traceInfo);
Expand Down Expand Up @@ -865,6 +867,25 @@ class TPartitionRequestActor final
}
}

void Merge(
NProto::TDescribeBlocksResponse& src,
ui32 requestNo,
NProto::TDescribeBlocksResponse& dst)
{
if (FAILED(src.GetError().GetCode())) {
*dst.MutableError() = std::move(*src.MutableError());
return;
}

MergeDescribeBlocksResponse(
src,
dst,
BlocksPerStripe,
BlockSize,
PartitionsCount,
PartitionRequests[requestNo].PartitionId);
}

template <typename T>
void Merge(
T& src,
Expand Down Expand Up @@ -908,6 +929,7 @@ TPartitionRequestActor<TMethod>::TPartitionRequestActor(
ui64 volumeRequestId,
TBlockRange64 originalRange,
ui32 blocksPerStripe,
ui32 blockSize,
ui32 partitionsCount,
TVector<TPartitionRequest<TMethod>> partitionRequests,
TRequestTraceInfo traceInfo)
Expand All @@ -916,6 +938,7 @@ TPartitionRequestActor<TMethod>::TPartitionRequestActor(
, VolumeRequestId(volumeRequestId)
, OriginalRange(originalRange)
, BlocksPerStripe(blocksPerStripe)
, BlockSize(blockSize)
, PartitionsCount(partitionsCount)
, PartitionRequests(std::move(partitionRequests))
, TraceInfo(std::move(traceInfo))
Expand Down
13 changes: 12 additions & 1 deletion cloud/blockstore/libs/storage/volume/volume_actor_forward.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ namespace {

////////////////////////////////////////////////////////////////////////////////

template <typename T>
constexpr bool IsDescribeBlocksMethod =
std::is_same_v<T, TEvVolume::TDescribeBlocksMethod>;

////////////////////////////////////////////////////////////////////////////////

template <typename TMethod>
bool CanForwardToPartition(ui32 partitionCount)
{
Expand Down Expand Up @@ -127,7 +133,11 @@ bool TVolumeActor::HandleRequest(
return false;
}

if (partitionRequests.size() == 1) {
// Should always forward request via TPartitionRequestActor for
// DesribeBlocks method and multi-partitioned volume.
if (State->GetPartitions().size() == 1 ||
(partitionRequests.size() == 1 && !IsDescribeBlocksMethod<TMethod>))
{
ev->Get()->Record = std::move(partitionRequests.front().Event->Record);
SendRequestToPartition<TMethod>(
ctx,
Expand Down Expand Up @@ -166,6 +176,7 @@ bool TVolumeActor::HandleRequest(
volumeRequestId,
blockRange,
blocksPerStripe,
State->GetBlockSize(),
State->GetPartitions().size(),
std::move(partitionRequests),
TRequestTraceInfo(isTraced, traceTs, TraceSerializer));
Expand Down
Loading

0 comments on commit c234c82

Please sign in to comment.