Skip to content

Commit

Permalink
issue-370: cherry-pick ydb commit to configure configs dispatcher wit…
Browse files Browse the repository at this point in the history
…hout tenant slot broker + adopt blockstore/filestore code (#2433)

* issue-370: cherry-pick ydb commit to configure configs dispatcher without tenant slot broker + adopt blockstore/filestore code

* update

* update
  • Loading branch information
yegorskii authored Nov 8, 2024
1 parent e5858c4 commit ab12a72
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 85 deletions.
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/storage/init/disk_agent/actorsystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ IActorSystemPtr CreateDiskAgentActorSystem(const TDiskAgentActorSystemArgs& daAr
if (daArgs.StorageConfig->GetConfigsDispatcherServiceEnabled()) {
SetupConfigDispatcher(
daArgs.StorageConfig->GetConfigDispatcherSettings(),
daArgs.StorageConfig->GetSchemeShardDir(),
daArgs.StorageConfig->GetNodeType(),
&runConfig.ConfigsDispatcherInitInfo);
runConfig.ConfigsDispatcherInitInfo.InitialConfig = runConfig.AppConfig;
}
Expand Down
2 changes: 2 additions & 0 deletions cloud/blockstore/libs/storage/init/server/actorsystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,8 @@ IActorSystemPtr CreateActorSystem(const TServerActorSystemArgs& sArgs)
if (sArgs.StorageConfig->GetConfigsDispatcherServiceEnabled()) {
SetupConfigDispatcher(
sArgs.StorageConfig->GetConfigDispatcherSettings(),
sArgs.StorageConfig->GetSchemeShardDir(),
sArgs.StorageConfig->GetNodeType(),
&runConfig.ConfigsDispatcherInitInfo);
runConfig.ConfigsDispatcherInitInfo.InitialConfig = runConfig.AppConfig;
}
Expand Down
134 changes: 61 additions & 73 deletions cloud/blockstore/tests/config_dispatcher/test.py
Original file line number Diff line number Diff line change
@@ -1,83 +1,22 @@
import os
import requests
import signal
import time


from cloud.blockstore.config.server_pb2 import TServerConfig, TServerAppConfig, TKikimrServiceConfig
from cloud.blockstore.config.storage_pb2 import TStorageServiceConfig

from cloud.blockstore.tests.python.lib.nbs_runner import LocalNbs
from cloud.blockstore.tests.python.lib.test_base import thread_count, wait_for_nbs_server
from cloud.blockstore.tests.python.lib.config import NbsConfigurator, generate_disk_agent_txt
from cloud.blockstore.tests.python.lib.daemon import start_ydb, start_nbs, start_disk_agent

from contrib.ydb.core.protos import config_pb2

from contrib.ydb.tests.library.harness.kikimr_cluster import kikimr_cluster_factory
from contrib.ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator

from contrib.ydb.core.protos.config_pb2 import TLogConfig

import yatest.common as yatest_common


def test_config_dispatcher():
kikimr_binary_path = yatest_common.binary_path('contrib/ydb/apps/ydbd/ydbd')
configurator = KikimrConfigGenerator(
erasure=None,
binary_path=kikimr_binary_path,
has_cluster_uuid=False,
use_in_memory_pdisks=True,
dynamic_storage_pools=[
dict(name='dynamic_storage_pool:1', kind='hdd', pdisk_user_kind=0),
dict(name='dynamic_storage_pool:2', kind='ssd', pdisk_user_kind=0)
])
kikimr_cluster = kikimr_cluster_factory(configurator=configurator)
kikimr_cluster.start()

server_app_config = TServerAppConfig()
server_app_config.ServerConfig.CopyFrom(TServerConfig())
server_app_config.ServerConfig.ThreadsCount = thread_count()
server_app_config.ServerConfig.StrictContractValidation = False
server_app_config.KikimrServiceConfig.CopyFrom(TKikimrServiceConfig())
server_app_config.ServerConfig.NodeType = 'nbs'

certs_dir = yatest_common.source_path('cloud/blockstore/tests/certs')
server_app_config.ServerConfig.RootCertsFile = os.path.join(certs_dir, 'server.crt')
cert = server_app_config.ServerConfig.Certs.add()
cert.CertFile = os.path.join(certs_dir, 'server.crt')
cert.CertPrivateKeyFile = os.path.join(certs_dir, 'server.key')

pm = yatest_common.network.PortManager()
nbs_port = pm.get_port()
nbs_secure_port = pm.get_port()
kikimr_port = list(kikimr_cluster.nodes.values())[0].port

# file config
storage = TStorageServiceConfig()
storage.ConfigsDispatcherServiceEnabled = True
storage.ConfigDispatcherSettings.AllowList.Names.append('NameserviceConfigItem')

nbs_binary_path = yatest_common.binary_path('cloud/blockstore/apps/server/nbsd')
nbs = LocalNbs(
kikimr_port,
configurator.domains_txt,
server_app_config=server_app_config,
storage_config_patches=[storage],
enable_tls=True,
nbs_secure_port=nbs_secure_port,
nbs_port=nbs_port,
kikimr_binary_path=kikimr_binary_path,
nbs_binary_path=nbs_binary_path)
nbs.start()
wait_for_nbs_server(nbs.nbs_port)

def verify_config_update(ydb, mon_port):

def query_monitoring(url, text):
r = requests.get(url, timeout=10)
r.raise_for_status()
return r.text.find(text) != -1

assert query_monitoring(
f'http://localhost:{nbs.mon_port}/actors/logger?c=1025',
f'http://localhost:{mon_port}/actors/logger?c=1025',
'Sampling rate: 0')

app_config = config_pb2.TAppConfig()
Expand All @@ -90,11 +29,11 @@ def query_monitoring(url, text):
entry.Component = component_to_test
entry.SamplingRate = 1000
app_config.LogConfig.MergeFrom(log_config)
kikimr_cluster.client.add_config_item(app_config)
ydb.client.add_config_item(app_config)

# add new static node
app_config = config_pb2.TAppConfig()
naming_config = configurator.names_txt
naming_config = ydb.config.names_txt
node = naming_config.Node.add(
NodeId=2,
Address='::1',
Expand All @@ -105,7 +44,7 @@ def query_monitoring(url, text):
node.WalleLocation.Rack = 'somewhere'
node.WalleLocation.Body = 1
app_config.NameserviceConfig.MergeFrom(naming_config)
kikimr_cluster.client.add_config_item(app_config)
ydb.client.add_config_item(app_config)

def query_monitoring(url, text):
r = requests.get(url, timeout=10)
Expand All @@ -114,16 +53,65 @@ def query_monitoring(url, text):

# wait for nameservice config update
while True:
if query_monitoring(f'http://localhost:{nbs.mon_port}/actors/dnameserver', 'somewhere'):
if query_monitoring(f'http://localhost:{mon_port}/actors/dnameserver', 'somewhere'):
break
else:
time.sleep(10)

# check that logging config was not changed
result = query_monitoring(
f'http://localhost:{nbs.mon_port}/actors/logger?c=1025',
f'http://localhost:{mon_port}/actors/logger?c=1025',
'Sampling rate: 0')

os.kill(nbs.pid, signal.SIGTERM)
return result


def prepare(ydb, node_type, disable_local_service):
nbs_configurator = NbsConfigurator(ydb)
nbs_configurator.generate_default_nbs_configs()

nbs_configurator.files['storage'].NodeType = node_type
nbs_configurator.files['storage'].DisableLocalService = disable_local_service
nbs_configurator.files['storage'].ConfigsDispatcherServiceEnabled = True
nbs_configurator.files['storage'].ConfigDispatcherSettings.AllowList.Names.append('NameserviceConfigItem')

return nbs_configurator


def setup_and_run_test_for_server(node_type, disable_local_service):
ydb = start_ydb()

nbs = start_nbs(prepare(ydb, node_type, disable_local_service))
result = verify_config_update(ydb, nbs.mon_port)
nbs.kill()

return result


def setup_and_run_test_for_da():
ydb = start_ydb()

nbs = start_nbs(prepare(ydb, 'nbs_control', False))

da_configurator = prepare(ydb, 'disk-agent', True)
da_configurator.files["disk-agent"] = generate_disk_agent_txt(agent_id='')
da = start_disk_agent(da_configurator)

result = verify_config_update(ydb, da.mon_port)

da.kill()
nbs.kill()

return result


def test_nbs_control_config_update():
assert setup_and_run_test_for_server('nbs_control', False) is True


def test_server_config_update():
assert setup_and_run_test_for_server('nbs', True) is True


assert result
def test_da_config_update():
assert setup_and_run_test_for_da() is True
1 change: 1 addition & 0 deletions cloud/blockstore/tests/config_dispatcher/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ PEERDIR(
DEPENDS(
cloud/blockstore/apps/client
cloud/blockstore/apps/server
cloud/blockstore/apps/disk_agent
contrib/ydb/apps/ydbd
)

Expand Down
2 changes: 2 additions & 0 deletions cloud/filestore/libs/storage/init/actorsystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ void TActorSystem::Init()
if (Args.StorageConfig->GetConfigsDispatcherServiceEnabled()) {
SetupConfigDispatcher(
Args.StorageConfig->GetConfigDispatcherSettings(),
Args.StorageConfig->GetSchemeShardDir(),
Args.StorageConfig->GetNodeType(),
&runConfig.ConfigsDispatcherInitInfo);
runConfig.ConfigsDispatcherInitInfo.InitialConfig = runConfig.AppConfig;
}
Expand Down
8 changes: 8 additions & 0 deletions cloud/storage/core/libs/kikimr/config_dispatcher_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,18 @@ using namespace NKikimr::NConfig;

////////////////////////////////////////////////////////////////////////////////

static const TString tenantLabel = "tenant";
static const TString nodeNameLabel = "node_type";

void SetupConfigDispatcher(
const NProto::TConfigDispatcherSettings& settings,
const TString& tenantName,
const TString& nodeType,
NKikimr::NConfig::TConfigsDispatcherInitInfo* config)
{
config->Labels.emplace(tenantLabel, tenantName);
config->Labels.emplace(nodeNameLabel, nodeType);

if (!settings.HasAllowList() && !settings.HasDenyList()) {
return;
}
Expand Down
2 changes: 2 additions & 0 deletions cloud/storage/core/libs/kikimr/config_dispatcher_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ namespace NCloud::NStorage {

void SetupConfigDispatcher(
const NProto::TConfigDispatcherSettings& settings,
const TString& tenantName,
const TString& nodeType,
NKikimr::NConfig::TConfigsDispatcherInitInfo* config);

} // namespace NCloud::NStorage
28 changes: 24 additions & 4 deletions cloud/storage/core/libs/kikimr/config_dispatcher_helpers_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/testing/unittest/registar.h>

#include <util/generic/guid.h>
#include <util/generic/string.h>


#include <variant>

namespace NCloud {
Expand Down Expand Up @@ -42,7 +44,7 @@ Y_UNIT_TEST_SUITE(TConfigDispatcherHelpersTest)
NKikimr::NConfig::TConfigsDispatcherInitInfo config;
NProto::TConfigDispatcherSettings settings;
settings.MutableAllowList()->AddNames("xyz");
SetupConfigDispatcher(settings, &config);
SetupConfigDispatcher(settings, {}, {}, &config);

UNIT_ASSERT(
std::holds_alternative<TAllowList>(config.ItemsServeRules));
Expand All @@ -56,7 +58,7 @@ Y_UNIT_TEST_SUITE(TConfigDispatcherHelpersTest)
NKikimr::NConfig::TConfigsDispatcherInitInfo config;
NProto::TConfigDispatcherSettings settings;
settings.MutableDenyList()->AddNames("xyz");
SetupConfigDispatcher(settings, &config);
SetupConfigDispatcher(settings, {}, {}, &config);

UNIT_ASSERT(
std::holds_alternative<TDenyList>(config.ItemsServeRules));
Expand All @@ -71,7 +73,7 @@ Y_UNIT_TEST_SUITE(TConfigDispatcherHelpersTest)

NKikimr::NConfig::TConfigsDispatcherInitInfo config;
NProto::TConfigDispatcherSettings settings;
SetupConfigDispatcher(settings, &config);
SetupConfigDispatcher(settings, {}, {}, &config);

UNIT_ASSERT(
std::holds_alternative<std::monostate>(config.ItemsServeRules));
Expand All @@ -97,7 +99,7 @@ Y_UNIT_TEST_SUITE(TConfigDispatcherHelpersTest)
NKikimrConsole::TConfigItem_EKind_Name(id));
}

SetupConfigDispatcher(settings, &config);
SetupConfigDispatcher(settings, {}, {}, &config);

UNIT_ASSERT(
std::holds_alternative<TAllowList>(config.ItemsServeRules));
Expand All @@ -106,6 +108,24 @@ Y_UNIT_TEST_SUITE(TConfigDispatcherHelpersTest)
std::get<TAllowList>(config.ItemsServeRules).Items.size());
UNIT_ASSERT_VALUES_EQUAL(0, counter->Val());
}

Y_UNIT_TEST(ShouldFillTenantAndNodeTypeLabels)
{
auto counter = SetupCriticalEvent();

const TString tenant = CreateGuidAsString();
const TString node = CreateGuidAsString();

NKikimr::NConfig::TConfigsDispatcherInitInfo config;
NProto::TConfigDispatcherSettings settings;
SetupConfigDispatcher(settings, tenant, node, &config);

UNIT_ASSERT(
std::holds_alternative<std::monostate>(config.ItemsServeRules));
UNIT_ASSERT_VALUES_EQUAL(0, counter->Val());
UNIT_ASSERT_VALUES_EQUAL(tenant, config.Labels["tenant"]);
UNIT_ASSERT_VALUES_EQUAL(node, config.Labels["node_type"]);
}
}

} // namespace NCloud
8 changes: 7 additions & 1 deletion contrib/ydb/core/cms/console/configs_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,13 @@ void TConfigsDispatcher::Bootstrap()
CurrentConfig,
0,
true,
1);
1,
{},
{},
TNodeInfo{
.Tenant = Labels.contains("tenant") ? Labels.at("tenant") : TString(""),
.NodeType = Labels.contains("node_type") ? Labels.at("node_type") : TString(""),
});
CommonSubscriptionClient = RegisterWithSameMailbox(commonClient);

Become(&TThis::StateInit);
Expand Down
33 changes: 29 additions & 4 deletions contrib/ydb/core/cms/console/console_configs_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class TConfigsSubscriber : public TActorBootstrapped<TConfigsSubscriber> {
bool processYaml,
ui64 version,
const TString &yamlConfig,
const TMap<ui64, TString> &volatileYamlConfigs)
const TMap<ui64, TString> &volatileYamlConfigs,
const std::optional<TNodeInfo> explicitNodeInfo)
: OwnerId(ownerId)
, Cookie(cookie)
, Kinds(kinds)
Expand All @@ -67,6 +68,15 @@ class TConfigsSubscriber : public TActorBootstrapped<TConfigsSubscriber> {
VolatileYamlConfigHashes[id] = THash<TString>()(config);
}
}

if (explicitNodeInfo) {
if (explicitNodeInfo->Tenant) {
Tenant = explicitNodeInfo->Tenant;
} else {
Tenant = "<none>";
}
NodeType = explicitNodeInfo->NodeType;
}
}

static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
Expand All @@ -86,7 +96,12 @@ class TConfigsSubscriber : public TActorBootstrapped<TConfigsSubscriber> {
DomainUid = dinfo->Domains.begin()->second->DomainUid;
StateStorageGroup = dinfo->GetDefaultStateStorageGroup(DomainUid);

SendPoolStatusRequest(ctx);
if (!Tenant) {
SendPoolStatusRequest(ctx);
} else {
Subscribe(ctx);
}

Become(&TThis::StateWork);
}

Expand Down Expand Up @@ -413,9 +428,19 @@ IActor *CreateConfigsSubscriber(
bool processYaml,
ui64 version,
const TString &yamlConfig,
const TMap<ui64, TString> &volatileYamlConfigs)
const TMap<ui64, TString> &volatileYamlConfigs,
const std::optional<TNodeInfo> explicitNodeInfo)
{
return new TConfigsSubscriber(ownerId, cookie, kinds, currentConfig, processYaml, version, yamlConfig, volatileYamlConfigs);
return new TConfigsSubscriber(
ownerId,
cookie,
kinds,
currentConfig,
processYaml,
version,
yamlConfig,
volatileYamlConfigs,
explicitNodeInfo);
}

} // namespace NKikimr::NConsole
Loading

0 comments on commit ab12a72

Please sign in to comment.