Skip to content

Commit

Permalink
NBSNEBIUS-148: implement throttling for fast data path
Browse files Browse the repository at this point in the history
  • Loading branch information
budevg committed Mar 28, 2024
1 parent 9acbbb0 commit 3aa34cf
Show file tree
Hide file tree
Showing 16 changed files with 501 additions and 21 deletions.
11 changes: 11 additions & 0 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,17 @@ class TExternalVhostEndpointListener final
args.emplace_back("--read-only");
}

if (epType == EEndpointType::Rdma) {
const auto& profile = volume.GetPerformanceProfile();
args.insert(args.end(), {
"--perf-profile", TStringBuilder()
<< profile.GetMaxReadBandwidth() << ":"
<< profile.GetMaxReadIops() << ":"
<< profile.GetMaxWriteBandwidth() << ":"
<< profile.GetMaxWriteIops()
});
}

TVector<TString> cgroups(
request.GetClientCGroups().begin(),
request.GetClientCGroups().end()
Expand Down
15 changes: 13 additions & 2 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ struct TFixture
volume.SetStorageMediaKind(NProto::STORAGE_MEDIA_SSD_NONREPLICATED);
volume.SetIsFastPathEnabled(true);

auto* profile = volume.MutablePerformanceProfile();
profile->SetMaxReadBandwidth(1111);
profile->SetMaxReadIops(2222);
profile->SetMaxWriteBandwidth(3333);
profile->SetMaxWriteIops(4444);

{
auto* device = volume.AddDevices();
device->SetDeviceName("/dev/disk/by-path/pci-0000:00:16.0-sas-phy2-lun-0");
Expand Down Expand Up @@ -477,10 +483,11 @@ Y_UNIT_TEST_SUITE(TExternalEndpointTest)
--device ... 2
--device ... 2
--read-only 1
19
--perf-profile 2
21
*/

UNIT_ASSERT_VALUES_EQUAL(19, create->CmdArgs.size());
UNIT_ASSERT_VALUES_EQUAL(21, create->CmdArgs.size());
UNIT_ASSERT_VALUES_EQUAL("local0", GetArg(create->CmdArgs, "--serial"));

UNIT_ASSERT_VALUES_EQUAL(
Expand All @@ -497,6 +504,10 @@ Y_UNIT_TEST_SUITE(TExternalEndpointTest)

UNIT_ASSERT_VALUES_EQUAL("4096", GetArg(create->CmdArgs, "--block-size"));

UNIT_ASSERT_VALUES_EQUAL(
"1111:2222:3333:4444",
GetArg(create->CmdArgs, "--perf-profile"));

UNIT_ASSERT(FindPtr(create->CmdArgs, "--read-only"));

auto devices = GetArgN(create->CmdArgs, "--device");
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/vhost-server/backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct IBackend: public IStartable
virtual ~IBackend() = default;

virtual vhd_bdev_info Init(const TOptions& options) = 0;
virtual void PrepareStop() = 0;
virtual void ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
Expand Down
5 changes: 5 additions & 0 deletions cloud/blockstore/vhost-server/backend_aio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class TAioBackend final: public IBackend

vhd_bdev_info Init(const TOptions& options) override;
void Start() override;
void PrepareStop() override;
void Stop() override;
void ProcessQueue(
ui32 queueIndex,
Expand Down Expand Up @@ -278,6 +279,10 @@ void TAioBackend::Start()
CompletionThread = std::thread([this] { CompletionThreadFunc(); });
}

void TAioBackend::PrepareStop()
{
}

void TAioBackend::Stop()
{
STORAGE_INFO("Stopping AIO backend");
Expand Down
5 changes: 5 additions & 0 deletions cloud/blockstore/vhost-server/backend_null.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class TNullBackend final: public IBackend

vhd_bdev_info Init(const TOptions& options) override;
void Start() override;
void PrepareStop() override;
void Stop() override;
void ProcessQueue(
ui32 queueIndex,
Expand Down Expand Up @@ -60,6 +61,10 @@ vhd_bdev_info TNullBackend::Init(const TOptions& options)
void TNullBackend::Start()
{}

void TNullBackend::PrepareStop()
{
}

void TNullBackend::Stop()
{}

Expand Down
90 changes: 71 additions & 19 deletions cloud/blockstore/vhost-server/backend_rdma.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "backend_rdma.h"

#include "backend.h"
#include "throttler.h"

#include <cloud/blockstore/libs/client/config.h>
#include <cloud/blockstore/libs/client/durable.h>
Expand Down Expand Up @@ -36,6 +37,7 @@ namespace {
////////////////////////////////////////////////////////////////////////////////

constexpr ui32 REQUEST_TIMEOUT_MSEC = 86400000;
constexpr TDuration THROTTLE_DELAY = TDuration::MicroSeconds(100);

////////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -132,12 +134,14 @@ class TRdmaBackend final: public IBackend
bool ReadOnly = false;
ui32 BlockSize = 0;
ui32 SectorsToBlockShift = 0;
TVector<IThrottlerPtr> Throttlers;

public:
explicit TRdmaBackend(ILoggingServicePtr logging);

vhd_bdev_info Init(const TOptions& options) override;
void Start() override;
void PrepareStop() override;
void Stop() override;
void ProcessQueue(
ui32 queueIndex,
Expand All @@ -153,6 +157,9 @@ class TRdmaBackend final: public IBackend
TCpuCycles startCycles,
bool isError);
IBlockStorePtr CreateDataClient(IStoragePtr storage);
void ProcessThrottledRequests(ui32 queueIndex, TSimpleStats& queueStats);

void ProcessIo(struct vhd_io *io, TSimpleStats& queueStats);
};

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -171,6 +178,15 @@ vhd_bdev_info TRdmaBackend::Init(const TOptions& options)
Scheduler = CreateScheduler();
Timer = CreateWallClockTimer();

for (ui32 i = 0; i < options.QueueCount; i++) {
Throttlers.push_back(CreateThrottler(
Timer,
options.MaxReadBandwidth / options.QueueCount,
options.MaxReadIops / options.QueueCount,
options.MaxWriteBandwidth / options.QueueCount,
options.MaxWriteIops / options.QueueCount));
}

ClientId = options.ClientId;
ReadOnly = options.ReadOnly;

Expand Down Expand Up @@ -293,6 +309,13 @@ void TRdmaBackend::Start()
DataClient = CreateDataClient(std::move(storage));
}

void TRdmaBackend::PrepareStop()
{
for (auto& throttler : Throttlers) {
throttler->Stop();
}
}

void TRdmaBackend::Stop()
{
STORAGE_INFO("Stopping RDMA backend");
Expand All @@ -306,31 +329,60 @@ void TRdmaBackend::ProcessQueue(
vhd_request_queue* queue,
TSimpleStats& queueStats)
{
Y_UNUSED(queueIndex);
auto& throttler = Throttlers[queueIndex];

vhd_request req;
while (vhd_dequeue_request(queue, &req)) {

++queueStats.Dequeued;

struct vhd_bdev_io* bio = vhd_get_bdev_io(req.io);
const TCpuCycles now = GetCycleCount();
switch (bio->type) {
case VHD_BDEV_READ:
ProcessReadRequest(req.io, now);
++queueStats.Submitted;
break;
case VHD_BDEV_WRITE:
ProcessWriteRequest(req.io, now);
++queueStats.Submitted;
break;
default:
STORAGE_ERROR(
"Unexpected vhost request type: "
<< static_cast<int>(bio->type));
vhd_complete_bio(req.io, VHD_BDEV_IOERR);
++queueStats.SubFailed;
break;
if (throttler->ThrottleIo(req.io)) {
continue;
}

ProcessIo(req.io, queueStats);
}

ProcessThrottledRequests(queueIndex, queueStats);
}

void TRdmaBackend::ProcessThrottledRequests(
ui32 queueIndex,
TSimpleStats& queueStats)
{
auto& throttler = Throttlers[queueIndex];

while (throttler->HasThrottledIos()) {
auto* io = throttler->ResumeNextThrottledIo();
if (!io) {
::Sleep(THROTTLE_DELAY);
continue;
}

ProcessIo(io, queueStats);
}
}

void TRdmaBackend::ProcessIo(struct vhd_io *io, TSimpleStats& queueStats)
{
struct vhd_bdev_io* bio = vhd_get_bdev_io(io);
const TCpuCycles now = GetCycleCount();
switch (bio->type) {
case VHD_BDEV_READ:
ProcessReadRequest(io, now);
++queueStats.Submitted;
break;
case VHD_BDEV_WRITE:
ProcessWriteRequest(io, now);
++queueStats.Submitted;
break;
default:
STORAGE_ERROR(
"Unexpected vhost request type: "
<< static_cast<int>(bio->type));
vhd_complete_bio(io, VHD_BDEV_IOERR);
++queueStats.SubFailed;
break;
}
}

Expand Down
18 changes: 18 additions & 0 deletions cloud/blockstore/vhost-server/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,24 @@ void TOptions::Parse(int argc, char** argv)
.RequiredArgument("INT")
.StoreResultDef(&RdmaClient.MaxBufferSize);

opts.AddLongOption(
"perf-profile",
"Performance profile "
"(<MaxReadBw>:<MaxReadIops>:<MaxWriteBW>:<MaxWriteIops>)")
.RequiredArgument("STR")
.Handler1T<TString>(
[this](TStringBuf s)
{
TVector<TString> vals;
Split(s.data(), ":", vals);
Y_ENSURE(vals.size() == 4, "invalid format");

MaxReadBandwidth = FromString<ui32>(vals[0]);
MaxReadIops = FromString<ui32>(vals[1]);
MaxWriteBandwidth = FromString<ui32>(vals[2]);
MaxWriteIops = FromString<ui32>(vals[3]);
});

TOptsParseResultException res(&opts, argc, argv);

if (res.FindLongOptParseResult("verbose") && VerboseLevel.empty()) {
Expand Down
5 changes: 5 additions & 0 deletions cloud/blockstore/vhost-server/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ struct TOptions

TString ClientId = "vhost-server";

ui32 MaxReadBandwidth = 0;
ui32 MaxReadIops = 0;
ui32 MaxWriteBandwidth = 0;
ui32 MaxWriteIops = 0;

struct
{
ui32 QueueSize = 256;
Expand Down
50 changes: 50 additions & 0 deletions cloud/blockstore/vhost-server/options_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,56 @@ Y_UNIT_TEST_SUITE(TOptionsTest)
UNIT_ASSERT_VALUES_EQUAL(1111111, options.Layout[1].Offset);
UNIT_ASSERT_VALUES_EQUAL(0, options.Layout[2].Offset);
}

Y_UNIT_TEST(ShouldParseRdmaBackendOptions)
{
TOptions options;

TVector<TString> params {
"binary-path",
"--serial", "id",
"--socket-path", "vhost.sock",
"-q" , "8",
"--disk-id", "disk-id",
"--device-backend", "rdma",
"--block-size", "4096",
"--device", "rdma://host1:port1/uuid1:1111111:0",
"--device", "rdma://host2:port2/uuid2:2222222:0",
"--read-only",
"--perf-profile", "1:2:3:4"
};

TVector<char*> argv;
for (auto& p: params) {
argv.push_back(&p[0]);
}

options.Parse(argv.size(), argv.data());

UNIT_ASSERT_VALUES_EQUAL("vhost.sock", options.SocketPath);
UNIT_ASSERT_VALUES_EQUAL("id", options.Serial);
UNIT_ASSERT_VALUES_EQUAL("disk-id", options.DiskId);
UNIT_ASSERT(options.ReadOnly);
UNIT_ASSERT(!options.NoSync);
UNIT_ASSERT(!options.NoChmod);
UNIT_ASSERT_VALUES_EQUAL(1024, options.BatchSize);
UNIT_ASSERT_VALUES_EQUAL(8, options.QueueCount);
UNIT_ASSERT_VALUES_EQUAL(2, options.Layout.size());
UNIT_ASSERT_VALUES_EQUAL("rdma://host1:port1/uuid1", options.Layout[0].DevicePath);
UNIT_ASSERT_VALUES_EQUAL("rdma://host2:port2/uuid2", options.Layout[1].DevicePath);

UNIT_ASSERT_VALUES_EQUAL(1111111, options.Layout[0].ByteCount);
UNIT_ASSERT_VALUES_EQUAL(2222222, options.Layout[1].ByteCount);

UNIT_ASSERT_VALUES_EQUAL(0, options.Layout[0].Offset);
UNIT_ASSERT_VALUES_EQUAL(0, options.Layout[1].Offset);

UNIT_ASSERT_VALUES_EQUAL(1, options.MaxReadBandwidth);
UNIT_ASSERT_VALUES_EQUAL(2, options.MaxReadIops);
UNIT_ASSERT_VALUES_EQUAL(3, options.MaxWriteBandwidth);
UNIT_ASSERT_VALUES_EQUAL(4, options.MaxWriteIops);
}

}

} // namespace NCloud::NBlockStore::NVHostServer
3 changes: 3 additions & 0 deletions cloud/blockstore/vhost-server/public.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ using IBackendPtr = std::shared_ptr<IBackend>;
struct ICompletionStats;
using ICompletionStatsPtr = std::shared_ptr<ICompletionStats>;

struct IThrottler;
using IThrottlerPtr = std::shared_ptr<IThrottler>;

} // namespace NCloud::NBlockStore::NVHostServer
2 changes: 2 additions & 0 deletions cloud/blockstore/vhost-server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ void TServer::Stop()
{
STORAGE_INFO("Stopping the server");

Backend->PrepareStop();

{
auto promise = NewPromise();
vhd_unregister_blockdev(Handler, [] (void* opaque) {
Expand Down
Loading

0 comments on commit 3aa34cf

Please sign in to comment.