Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NBSNEBIUS-148: implement throttling for fast data path #838

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,17 @@ class TExternalVhostEndpointListener final
args.emplace_back("--read-only");
}

if (epType == EEndpointType::Rdma) {
const auto& profile = volume.GetPerformanceProfile();
args.insert(args.end(), {
"--perf-profile", TStringBuilder()
<< profile.GetMaxReadBandwidth() << ":"
<< profile.GetMaxReadIops() << ":"
<< profile.GetMaxWriteBandwidth() << ":"
<< profile.GetMaxWriteIops()
});
}

TVector<TString> cgroups(
request.GetClientCGroups().begin(),
request.GetClientCGroups().end()
Expand Down
15 changes: 13 additions & 2 deletions cloud/blockstore/libs/endpoints_vhost/external_vhost_server_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ struct TFixture
volume.SetStorageMediaKind(NProto::STORAGE_MEDIA_SSD_NONREPLICATED);
volume.SetIsFastPathEnabled(true);

auto* profile = volume.MutablePerformanceProfile();
profile->SetMaxReadBandwidth(1111);
profile->SetMaxReadIops(2222);
profile->SetMaxWriteBandwidth(3333);
profile->SetMaxWriteIops(4444);

{
auto* device = volume.AddDevices();
device->SetDeviceName("/dev/disk/by-path/pci-0000:00:16.0-sas-phy2-lun-0");
Expand Down Expand Up @@ -477,10 +483,11 @@ Y_UNIT_TEST_SUITE(TExternalEndpointTest)
--device ... 2
--device ... 2
--read-only 1
19
--perf-profile 2
21
*/

UNIT_ASSERT_VALUES_EQUAL(19, create->CmdArgs.size());
UNIT_ASSERT_VALUES_EQUAL(21, create->CmdArgs.size());
UNIT_ASSERT_VALUES_EQUAL("local0", GetArg(create->CmdArgs, "--serial"));

UNIT_ASSERT_VALUES_EQUAL(
Expand All @@ -497,6 +504,10 @@ Y_UNIT_TEST_SUITE(TExternalEndpointTest)

UNIT_ASSERT_VALUES_EQUAL("4096", GetArg(create->CmdArgs, "--block-size"));

UNIT_ASSERT_VALUES_EQUAL(
"1111:2222:3333:4444",
GetArg(create->CmdArgs, "--perf-profile"));

UNIT_ASSERT(FindPtr(create->CmdArgs, "--read-only"));

auto devices = GetArgN(create->CmdArgs, "--device");
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/vhost-server/backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct IBackend: public IStartable
virtual ~IBackend() = default;

virtual vhd_bdev_info Init(const TOptions& options) = 0;
virtual void PrepareStop() = 0;
virtual void ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
Expand Down
5 changes: 5 additions & 0 deletions cloud/blockstore/vhost-server/backend_aio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class TAioBackend final: public IBackend

vhd_bdev_info Init(const TOptions& options) override;
void Start() override;
void PrepareStop() override;
void Stop() override;
void ProcessQueue(
ui32 queueIndex,
Expand Down Expand Up @@ -278,6 +279,10 @@ void TAioBackend::Start()
CompletionThread = std::thread([this] { CompletionThreadFunc(); });
}

void TAioBackend::PrepareStop()
{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Давай здесь тоже сделаем {}

}

void TAioBackend::Stop()
{
STORAGE_INFO("Stopping AIO backend");
Expand Down
5 changes: 5 additions & 0 deletions cloud/blockstore/vhost-server/backend_null.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class TNullBackend final: public IBackend

vhd_bdev_info Init(const TOptions& options) override;
void Start() override;
void PrepareStop() override;
void Stop() override;
void ProcessQueue(
ui32 queueIndex,
Expand Down Expand Up @@ -60,6 +61,10 @@ vhd_bdev_info TNullBackend::Init(const TOptions& options)
void TNullBackend::Start()
{}

void TNullBackend::PrepareStop()
{
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А в Start и Stop используется {}. Давай сделаем одинаково


void TNullBackend::Stop()
{}

Expand Down
90 changes: 71 additions & 19 deletions cloud/blockstore/vhost-server/backend_rdma.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "backend_rdma.h"

#include "backend.h"
#include "throttler.h"

#include <cloud/blockstore/libs/client/config.h>
#include <cloud/blockstore/libs/client/durable.h>
Expand Down Expand Up @@ -36,6 +37,7 @@ namespace {
////////////////////////////////////////////////////////////////////////////////

constexpr ui32 REQUEST_TIMEOUT_MSEC = 86400000;
constexpr TDuration THROTTLE_DELAY = TDuration::MicroSeconds(100);

////////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -132,12 +134,14 @@ class TRdmaBackend final: public IBackend
bool ReadOnly = false;
ui32 BlockSize = 0;
ui32 SectorsToBlockShift = 0;
TVector<IThrottlerPtr> Throttlers;

public:
explicit TRdmaBackend(ILoggingServicePtr logging);

vhd_bdev_info Init(const TOptions& options) override;
void Start() override;
void PrepareStop() override;
void Stop() override;
void ProcessQueue(
ui32 queueIndex,
Expand All @@ -153,6 +157,9 @@ class TRdmaBackend final: public IBackend
TCpuCycles startCycles,
bool isError);
IBlockStorePtr CreateDataClient(IStoragePtr storage);
void ProcessThrottledRequests(ui32 queueIndex, TSimpleStats& queueStats);

void ProcessIo(struct vhd_io *io, TSimpleStats& queueStats);
};

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -171,6 +178,15 @@ vhd_bdev_info TRdmaBackend::Init(const TOptions& options)
Scheduler = CreateScheduler();
Timer = CreateWallClockTimer();

for (ui32 i = 0; i < options.QueueCount; i++) {
Throttlers.push_back(CreateThrottler(
Timer,
options.MaxReadBandwidth / options.QueueCount,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А мы точно хотим безусловно резать полосу между очередями? Может всё-таки сделать один тротлер?

Если очередей насоздавали, но по какой-то причине не утилизируют, то мы занизим скорость пользователю в разы. А если будет даже небольшой дисбаланс, пользователь хотя бы получит всю свою полосу

options.MaxReadIops / options.QueueCount,
options.MaxWriteBandwidth / options.QueueCount,
options.MaxWriteIops / options.QueueCount));
}

ClientId = options.ClientId;
ReadOnly = options.ReadOnly;

Expand Down Expand Up @@ -293,6 +309,13 @@ void TRdmaBackend::Start()
DataClient = CreateDataClient(std::move(storage));
}

void TRdmaBackend::PrepareStop()
{
for (auto& throttler : Throttlers) {
throttler->Stop();
}
}

void TRdmaBackend::Stop()
{
STORAGE_INFO("Stopping RDMA backend");
Expand All @@ -306,31 +329,60 @@ void TRdmaBackend::ProcessQueue(
vhd_request_queue* queue,
TSimpleStats& queueStats)
{
Y_UNUSED(queueIndex);
auto& throttler = Throttlers[queueIndex];

vhd_request req;
while (vhd_dequeue_request(queue, &req)) {

++queueStats.Dequeued;

struct vhd_bdev_io* bio = vhd_get_bdev_io(req.io);
const TCpuCycles now = GetCycleCount();
switch (bio->type) {
case VHD_BDEV_READ:
ProcessReadRequest(req.io, now);
++queueStats.Submitted;
break;
case VHD_BDEV_WRITE:
ProcessWriteRequest(req.io, now);
++queueStats.Submitted;
break;
default:
STORAGE_ERROR(
"Unexpected vhost request type: "
<< static_cast<int>(bio->type));
vhd_complete_bio(req.io, VHD_BDEV_IOERR);
++queueStats.SubFailed;
break;
if (throttler->ThrottleIo(req.io)) {
continue;
}

ProcessIo(req.io, queueStats);
}

ProcessThrottledRequests(queueIndex, queueStats);
}

void TRdmaBackend::ProcessThrottledRequests(
ui32 queueIndex,
TSimpleStats& queueStats)
{
auto& throttler = Throttlers[queueIndex];

while (throttler->HasThrottledIos()) {
auto* io = throttler->ResumeNextThrottledIo();
if (!io) {
::Sleep(THROTTLE_DELAY);
continue;
}

ProcessIo(io, queueStats);
}
}

void TRdmaBackend::ProcessIo(struct vhd_io *io, TSimpleStats& queueStats)
{
struct vhd_bdev_io* bio = vhd_get_bdev_io(io);
const TCpuCycles now = GetCycleCount();
switch (bio->type) {
case VHD_BDEV_READ:
ProcessReadRequest(io, now);
++queueStats.Submitted;
break;
case VHD_BDEV_WRITE:
ProcessWriteRequest(io, now);
++queueStats.Submitted;
break;
default:
STORAGE_ERROR(
"Unexpected vhost request type: "
<< static_cast<int>(bio->type));
vhd_complete_bio(io, VHD_BDEV_IOERR);
++queueStats.SubFailed;
break;
}
}

Expand Down
18 changes: 18 additions & 0 deletions cloud/blockstore/vhost-server/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,24 @@ void TOptions::Parse(int argc, char** argv)
.RequiredArgument("INT")
.StoreResultDef(&RdmaClient.MaxBufferSize);

opts.AddLongOption(
"perf-profile",
"Performance profile "
"(<MaxReadBw>:<MaxReadIops>:<MaxWriteBW>:<MaxWriteIops>)")
.RequiredArgument("STR")
.Handler1T<TString>(
[this](TStringBuf s)
{
TVector<TString> vals;
Split(s.data(), ":", vals);
Y_ENSURE(vals.size() == 4, "invalid format");

MaxReadBandwidth = FromString<ui32>(vals[0]);
MaxReadIops = FromString<ui32>(vals[1]);
MaxWriteBandwidth = FromString<ui32>(vals[2]);
MaxWriteIops = FromString<ui32>(vals[3]);
});

TOptsParseResultException res(&opts, argc, argv);

if (res.FindLongOptParseResult("verbose") && VerboseLevel.empty()) {
Expand Down
5 changes: 5 additions & 0 deletions cloud/blockstore/vhost-server/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ struct TOptions

TString ClientId = "vhost-server";

ui32 MaxReadBandwidth = 0;
ui32 MaxReadIops = 0;
ui32 MaxWriteBandwidth = 0;
ui32 MaxWriteIops = 0;

struct
{
ui32 QueueSize = 256;
Expand Down
50 changes: 50 additions & 0 deletions cloud/blockstore/vhost-server/options_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,56 @@ Y_UNIT_TEST_SUITE(TOptionsTest)
UNIT_ASSERT_VALUES_EQUAL(1111111, options.Layout[1].Offset);
UNIT_ASSERT_VALUES_EQUAL(0, options.Layout[2].Offset);
}

Y_UNIT_TEST(ShouldParseRdmaBackendOptions)
{
TOptions options;

TVector<TString> params {
"binary-path",
"--serial", "id",
"--socket-path", "vhost.sock",
"-q" , "8",
"--disk-id", "disk-id",
"--device-backend", "rdma",
"--block-size", "4096",
"--device", "rdma://host1:port1/uuid1:1111111:0",
"--device", "rdma://host2:port2/uuid2:2222222:0",
"--read-only",
"--perf-profile", "1:2:3:4"
};

TVector<char*> argv;
for (auto& p: params) {
argv.push_back(&p[0]);
}

options.Parse(argv.size(), argv.data());

UNIT_ASSERT_VALUES_EQUAL("vhost.sock", options.SocketPath);
UNIT_ASSERT_VALUES_EQUAL("id", options.Serial);
UNIT_ASSERT_VALUES_EQUAL("disk-id", options.DiskId);
UNIT_ASSERT(options.ReadOnly);
UNIT_ASSERT(!options.NoSync);
UNIT_ASSERT(!options.NoChmod);
UNIT_ASSERT_VALUES_EQUAL(1024, options.BatchSize);
UNIT_ASSERT_VALUES_EQUAL(8, options.QueueCount);
UNIT_ASSERT_VALUES_EQUAL(2, options.Layout.size());
UNIT_ASSERT_VALUES_EQUAL("rdma://host1:port1/uuid1", options.Layout[0].DevicePath);
UNIT_ASSERT_VALUES_EQUAL("rdma://host2:port2/uuid2", options.Layout[1].DevicePath);

UNIT_ASSERT_VALUES_EQUAL(1111111, options.Layout[0].ByteCount);
UNIT_ASSERT_VALUES_EQUAL(2222222, options.Layout[1].ByteCount);

UNIT_ASSERT_VALUES_EQUAL(0, options.Layout[0].Offset);
UNIT_ASSERT_VALUES_EQUAL(0, options.Layout[1].Offset);

UNIT_ASSERT_VALUES_EQUAL(1, options.MaxReadBandwidth);
UNIT_ASSERT_VALUES_EQUAL(2, options.MaxReadIops);
UNIT_ASSERT_VALUES_EQUAL(3, options.MaxWriteBandwidth);
UNIT_ASSERT_VALUES_EQUAL(4, options.MaxWriteIops);
}

}

} // namespace NCloud::NBlockStore::NVHostServer
3 changes: 3 additions & 0 deletions cloud/blockstore/vhost-server/public.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ using IBackendPtr = std::shared_ptr<IBackend>;
struct ICompletionStats;
using ICompletionStatsPtr = std::shared_ptr<ICompletionStats>;

struct IThrottler;
using IThrottlerPtr = std::shared_ptr<IThrottler>;

} // namespace NCloud::NBlockStore::NVHostServer
2 changes: 2 additions & 0 deletions cloud/blockstore/vhost-server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ void TServer::Stop()
{
STORAGE_INFO("Stopping the server");

Backend->PrepareStop();

{
auto promise = NewPromise();
vhd_unregister_blockdev(Handler, [] (void* opaque) {
Expand Down
Loading
Loading