Skip to content

Commit

Permalink
Introduce locality counters for single node tx. (#13415)
Browse files Browse the repository at this point in the history
TotalSingleNodeReqCount    -     increases if tx touches the only one node (all shards located on the same node)
NonLocalSingleNodeReqCount - increases if tx touches the only one node and this node is same with session node (node where located kqp session and grpc request accepted)


### Changelog category <!-- remove all except one -->

* New feature
  • Loading branch information
dcherednik authored Jan 20, 2025
1 parent 345e7b5 commit 1ad3712
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 1 deletion.
3 changes: 3 additions & 0 deletions ydb/core/kqp/counters/kqp_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,9 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
SchedulerDelays = KqpGroup->GetHistogram("NodeScheduler/Delay", NMonitoring::ExponentialHistogram(20, 2, 1));

RowsDuplicationsFound = KqpGroup->GetCounter("RowsDuplicationFound", true);

TotalSingleNodeReqCount = KqpGroup->GetCounter("TotalSingleNodeReqCount", true);
NonLocalSingleNodeReqCount = KqpGroup->GetCounter("NonLocalSingleNodeReqCount", true);
}

::NMonitoring::TDynamicCounterPtr TKqpCounters::GetKqpCounters() const {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/counters/kqp_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,10 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter

NMonitoring::TDynamicCounters::TCounterPtr RowsDuplicationsFound;

// Locality metrics for request
NMonitoring::TDynamicCounters::TCounterPtr TotalSingleNodeReqCount;
NMonitoring::TDynamicCounters::TCounterPtr NonLocalSingleNodeReqCount;

TAlignedPagePoolCounters AllocCounters;

// db counters
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const ui64 shardId = res->GetOrigin();
LastShard = shardId;

ParticipantNodes.emplace(ev->Sender.NodeId());

TShardState* shardState = ShardStates.FindPtr(shardId);
YQL_ENSURE(shardState);

Expand Down Expand Up @@ -2105,7 +2107,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (i64 msc = (i64) Request.MaxShardCount; msc > 0) {
shardsLimit = std::min(shardsLimit, (ui32) msc);
}
size_t shards = datashardTasks.size() + sourceScanPartitionsCount;
const size_t shards = datashardTasks.size() + sourceScanPartitionsCount;

if (shardsLimit > 0 && shards > shardsLimit) {
LOG_W("Too many affected shards: datashardTasks=" << shards << ", limit: " << shardsLimit);
Counters->TxProxyMon->TxResultError->Inc();
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
ShardIdToNodeId = std::move(reply.ShardNodes);
for (auto& [shardId, nodeId] : ShardIdToNodeId) {
ShardsOnNode[nodeId].push_back(shardId);
ParticipantNodes.emplace(nodeId);
}

if (IsDebugLogEnabled()) {
Expand Down Expand Up @@ -1923,6 +1924,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
LOG_I("Full stats: " << response.GetResult().GetStats());
}
}

for (const auto nodeId : ParticipantNodes) {
response.MutableResult()->AddParticipantNodes(nodeId);
}
}

Request.Transactions.crop(0);
Expand Down Expand Up @@ -2055,6 +2060,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
bool CheckDuplicateRows = false;

ui32 StatementResultIndex;

// Track which nodes has been involved during execution
THashSet<ui32> ParticipantNodes;

bool AlreadyReplied = false;
bool EnableReadsMerge = false;

Expand Down
11 changes: 11 additions & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,17 @@ class TKqpQueryState : public TNonCopyable {
ui32 StatementResultSize = 0;

TMaybe<TString> CommandTagName;
THashSet<uint32_t> ParticipantNodes;

bool IsLocalExecution(ui32 nodeId) const {
if (RequestEv->GetRequestCtx() == nullptr) {
return false;
}
if (ParticipantNodes.size() == 1) {
return *ParticipantNodes.begin() == nodeId;
}
return false;
}

NKikimrKqp::EQueryAction GetAction() const {
return QueryAction;
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1554,6 +1554,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->QueryStats.Executions.back().Swap(executerResults.MutableStats());
}

if (executerResults.ParticipantNodesSize()) {
for (auto nodeId : executerResults.GetParticipantNodes()) {
QueryState->ParticipantNodes.emplace(nodeId);
}
}

if (response->GetStatus() != Ydb::StatusIds::SUCCESS) {
const auto executionType = ev->ExecutionType;

Expand Down Expand Up @@ -2302,6 +2308,13 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->PoolHandlerActor = Nothing();
}

if (QueryState && QueryState->ParticipantNodes.size() == 1) {
Counters->TotalSingleNodeReqCount->Inc();
if (!QueryState->IsLocalExecution(SelfId().NodeId())) {
Counters->NonLocalSingleNodeReqCount->Inc();
}
}

LOG_I("Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx}
<< " TransactionsToBeAborted.size(): " << (CleanupCtx ? CleanupCtx->TransactionsToBeAborted.size() : 0)
<< " WorkerId: " << (workerId ? *workerId : TActorId())
Expand Down
216 changes: 216 additions & 0 deletions ydb/core/kqp/ut/query/kqp_stats_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
Expand Down Expand Up @@ -637,6 +638,221 @@ Y_UNIT_TEST(SysViewCancelled) {
}
}

Y_UNIT_TEST(OneShardLocalExec) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
{
auto result = session.ExecuteDataQuery(R"(
SELECT * FROM `/Root/KeyValue` WHERE Key = 1;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), 2);
}
{
auto result = session.ExecuteDataQuery(R"(
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1, "1");
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), 3);
}
{
auto result = kikimr.GetQueryClient().ExecuteQuery(R"(
SELECT * FROM `/Root/KeyValue` WHERE Key = 1;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), 4);
}
{
auto result = kikimr.GetQueryClient().ExecuteQuery(R"(
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1, "1");
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), 5);
}
UNIT_ASSERT_VALUES_EQUAL(counters.NonLocalSingleNodeReqCount->Val(), 0);
}

Y_UNIT_TEST(OneShardNonLocalExec) {
TKikimrRunner kikimr(TKikimrSettings().SetNodeCount(2));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
auto monPort = kikimr.GetTestServer().GetRuntime()->GetMonPort();

auto firstNodeId = kikimr.GetTestServer().GetRuntime()->GetFirstNodeId();

TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);

auto expectedTotalSingleNodeReqCount = counters.TotalSingleNodeReqCount->Val();
auto expectedNonLocalSingleNodeReqCount = counters.NonLocalSingleNodeReqCount->Val();

auto drainNode = [monPort](size_t nodeId, bool undrain = false) {
TNetworkAddress addr("localhost", monPort);
TSocket s(addr);
TString url;
if (undrain) {
url = "/tablets/app?TabletID=72057594037968897&node=" + std::to_string(nodeId) + "&page=SetDown&down=0";
} else {
url = "/tablets/app?TabletID=72057594037968897&node=" + std::to_string(nodeId) + "&page=DrainNode";
}
SendMinimalHttpRequest(s, "localhost", url);
TSocketInput si(s);
THttpInput input(&si);
TString firstLine = input.FirstLine();

const auto httpCode = ParseHttpRetCode(firstLine);
UNIT_ASSERT_VALUES_EQUAL(httpCode, 200);
};

auto waitTablets = [&session](size_t nodeId) mutable {
TDescribeTableSettings describeTableSettings =
TDescribeTableSettings()
.WithTableStatistics(true)
.WithPartitionStatistics(true)
.WithShardNodesInfo(true);

bool done = false;
for (int i = 0; i < 10; i++) {
std::unordered_set<ui32> nodeIds;
auto res = session.DescribeTable("Root/EightShard", describeTableSettings)
.ExtractValueSync();

UNIT_ASSERT_EQUAL(res.IsTransportError(), false);
UNIT_ASSERT_EQUAL(res.GetStatus(), EStatus::SUCCESS);
UNIT_ASSERT_VALUES_EQUAL(res.GetTableDescription().GetPartitionsCount(), 8);
UNIT_ASSERT_VALUES_EQUAL(res.GetTableDescription().GetPartitionStats().size(), 8);
for (const auto& s : res.GetTableDescription().GetPartitionStats()) {
nodeIds.emplace(s.LeaderNodeId);
}
if (nodeIds.size() == 1 && *nodeIds.begin() == nodeId) {
done = true;
break;
}
Sleep(TDuration::Seconds(1));
}
UNIT_ASSERT_C(done, "unable to wait tablets move on specific node");
};

// Move all tablets on the node2, we have a grpc connection to node 1
// so all sessions will be created on the node 1
drainNode(firstNodeId);
waitTablets(firstNodeId + 1);

{
auto result = session.ExecuteDataQuery(R"(
SELECT * FROM `/Root/EightShard` WHERE Key = 1;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
}
{
auto result = session.ExecuteDataQuery(R"(
UPSERT INTO `/Root/EightShard` (Key, Data) VALUES (1, 1);
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
}
{
auto result = kikimr.GetQueryClient().ExecuteQuery(R"(
SELECT * FROM `/Root/EightShard` WHERE Key = 1;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
}
{
auto result = kikimr.GetQueryClient().ExecuteQuery(R"(
UPSERT INTO `/Root/EightShard` (Key, Data) VALUES (1, 1);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
}
{
auto result = session.ExecuteDataQuery(R"(
UPDATE `/Root/EightShard` SET Data = 111 WHERE Key = 1;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
}
{
auto result = kikimr.GetQueryClient().ExecuteQuery(R"(
UPDATE `/Root/EightShard` SET Data = 111 WHERE Key = 1;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
}
expectedNonLocalSingleNodeReqCount += 6;
UNIT_ASSERT_VALUES_EQUAL(counters.NonLocalSingleNodeReqCount->Val(), expectedNonLocalSingleNodeReqCount);

// Now resume node 1 and move all tablets on the node1
// so all tablets will be on the same node with session
drainNode(firstNodeId, true);
drainNode(firstNodeId + 1);
waitTablets(firstNodeId);

{
auto result = session.ExecuteDataQuery(R"(
SELECT * FROM `/Root/EightShard` WHERE Key = 1;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
}
{
auto result = session.ExecuteDataQuery(R"(
UPSERT INTO `/Root/EightShard` (Key, Data) VALUES (1, 1);
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
}
{
auto result = kikimr.GetQueryClient().ExecuteQuery(R"(
SELECT * FROM `/Root/EightShard` WHERE Key = 1;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
}
{
auto result = kikimr.GetQueryClient().ExecuteQuery(R"(
UPSERT INTO `/Root/EightShard` (Key, Data) VALUES (1, 1);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
}
{
auto result = session.ExecuteDataQuery(R"(
UPDATE `/Root/EightShard` SET Data = 111 WHERE Key = 1;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
}
{
auto result = kikimr.GetQueryClient().ExecuteQuery(R"(
UPDATE `/Root/EightShard` SET Data = 111 WHERE Key = 1;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
}
{
auto result = session.ExecuteDataQuery(R"(
UPDATE `/Root/EightShard` SET Data = 111 WHERE Key = 1;
SELECT * FROM `/Root/EightShard` WHERE Key = 1;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
}
{
auto result = kikimr.GetQueryClient().ExecuteQuery(R"(
UPDATE `/Root/EightShard` SET Data = 111 WHERE Key = 1;
SELECT * FROM `/Root/EightShard` WHERE Key = 1;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(counters.TotalSingleNodeReqCount->Val(), ++expectedTotalSingleNodeReqCount);
}
// All executions are local - same value of counter
UNIT_ASSERT_VALUES_EQUAL(counters.NonLocalSingleNodeReqCount->Val(), expectedNonLocalSingleNodeReqCount);
}

} // suite

} // namespace NKqp
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ message TExecuterTxResult {
reserved 5; // (deprecated) Stats
optional NYql.NDqProto.TDqExecutionStats Stats = 6;
reserved 7;
repeated uint32 ParticipantNodes = 8;
};

message TExecuterTxResponse {
Expand Down

0 comments on commit 1ad3712

Please sign in to comment.