Skip to content

Commit

Permalink
Kafka port assignment through discoveryCache (#13197)
Browse files Browse the repository at this point in the history
  • Loading branch information
FloatingCrowbar authored Jan 17, 2025
1 parent 0e5d23d commit db1f75f
Show file tree
Hide file tree
Showing 20 changed files with 621 additions and 130 deletions.
20 changes: 13 additions & 7 deletions ydb/core/discovery/discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ namespace NDiscovery {
if (!CheckEndpointId(endpointId, entry)) {
continue;
}

if (entry.GetSsl()) {
AddEndpoint(cachedMessageSsl, statesSsl, entry);
} else {
Expand All @@ -200,7 +199,6 @@ namespace NDiscovery {
cachedMessageSsl.set_self_location(location);
}
}

return {SerializeResult(cachedMessage), SerializeResult(cachedMessageSsl), std::move(infoEntries)};
}
}
Expand Down Expand Up @@ -235,6 +233,7 @@ namespace NDiscoveryPrivate {

THashMap<TString, TVector<TWaiter>> Requested;
bool Scheduled = false;
TMaybe<TString> EndpointId;

auto Request(const TString& database) {
auto result = Requested.emplace(database, TVector<TWaiter>());
Expand Down Expand Up @@ -279,7 +278,8 @@ namespace NDiscoveryPrivate {

currentCachedMessage = std::make_shared<NDiscovery::TCachedMessageData>(
NDiscovery::CreateCachedMessage(
currentCachedMessage->InfoEntries, std::move(msg->Updates), {}, {}, NameserviceResponse)
currentCachedMessage->InfoEntries, std::move(msg->Updates),
{}, EndpointId.GetOrElse({}), NameserviceResponse)
);

auto it = Requested.find(path);
Expand All @@ -293,7 +293,8 @@ namespace NDiscoveryPrivate {
const auto& path = msg->Path;

auto newCachedData = std::make_shared<NDiscovery::TCachedMessageData>(
NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries), {}, {}, NameserviceResponse)
NDiscovery::CreateCachedMessage({}, std::move(msg->InfoEntries),
{}, EndpointId.GetOrElse({}), NameserviceResponse)
);
newCachedData->Status = msg->Status;

Expand Down Expand Up @@ -372,6 +373,11 @@ namespace NDiscoveryPrivate {
}

public:
TDiscoveryCache() = default;
TDiscoveryCache(const TString& endpointId)
: EndpointId(endpointId)
{
}
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::DISCOVERY_CACHE_ACTOR;
}
Expand Down Expand Up @@ -523,7 +529,7 @@ class TDiscoverer: public TActorBootstrapped<TDiscoverer> {
return true;
default:
return true;
}
}
}

void MaybeReply() {
Expand Down Expand Up @@ -621,8 +627,8 @@ IActor* CreateDiscoverer(
return new TDiscoverer(f, database, replyTo, cacheId);
}

IActor* CreateDiscoveryCache() {
return new NDiscoveryPrivate::TDiscoveryCache();
IActor* CreateDiscoveryCache(const TString& endpointId) {
return new NDiscoveryPrivate::TDiscoveryCache(endpointId);
}

}
2 changes: 1 addition & 1 deletion ydb/core/discovery/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,6 @@ IActor* CreateDiscoverer(
const TActorId& cacheId);

// Used to reduce number of requests to Board
IActor* CreateDiscoveryCache();
IActor* CreateDiscoveryCache(const TString& endpointId = {});

}
6 changes: 6 additions & 0 deletions ydb/core/driver_lib/run/config_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ void TRunCommandConfigParser::SetupLastGetOptForConfigFiles(NLastGetopt::TOpts&
opts.AddLongOption("grpc-file", "gRPC config file").OptionalArgument("PATH");
opts.AddLongOption("grpc-port", "enable gRPC server on port").RequiredArgument("PORT");
opts.AddLongOption("grpcs-port", "enable gRPC SSL server on port").RequiredArgument("PORT");
opts.AddLongOption("kafka-port", "enable kafka proxy server on port").OptionalArgument("PORT");
opts.AddLongOption("grpc-public-host", "set public gRPC host for discovery").RequiredArgument("HOST");
opts.AddLongOption("grpc-public-port", "set public gRPC port for discovery").RequiredArgument("PORT");
opts.AddLongOption("grpcs-public-port", "set public gRPC SSL port for discovery").RequiredArgument("PORT");
Expand Down Expand Up @@ -165,6 +166,11 @@ void TRunCommandConfigParser::ParseConfigFiles(const NLastGetopt::TOptsParseResu
conf.SetSslPort(FromString<ui16>(res.Get("grpcs-port")));
}

if (res.Has("kafka-port")) {
auto& conf = *Config.AppConfig.MutableKafkaProxyConfig();
conf.SetListeningPort(FromString<ui16>(res.Get("kafka-port")));
}

if (res.Has("grpc-public-host")) {
auto& conf = *Config.AppConfig.MutableGRpcConfig();
conf.SetPublicHost(res.Get("grpc-public-host"));
Expand Down
23 changes: 21 additions & 2 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include <ydb/core/control/immediate_control_board_actor.h>

#include <ydb/core/driver_lib/version/version.h>
#include <ydb/core/discovery/discovery.h>

#include <ydb/core/grpc_services/grpc_mon.h>
#include <ydb/core/grpc_services/grpc_request_proxy.h>
Expand Down Expand Up @@ -1714,6 +1715,18 @@ void TGRpcServicesInitializer::InitializeServices(NActors::TActorSystemSetup* se
endpoints.push_back(std::move(desc));
}

if (Config.GetKafkaProxyConfig().GetEnableKafkaProxy()) {
const auto& kakfaConfig = Config.GetKafkaProxyConfig();
TIntrusivePtr<NGRpcService::TGrpcEndpointDescription> desc = new NGRpcService::TGrpcEndpointDescription();
desc->Address = config.GetPublicHost() ? config.GetPublicHost() : address;
desc->Port = kakfaConfig.GetListeningPort();
desc->Ssl = kakfaConfig.HasSslCertificate();

desc->EndpointId = NGRpcService::KafkaEndpointId;
endpoints.push_back(std::move(desc));

}

for (auto &sx : config.GetExtEndpoints()) {
const TString &localAddress = sx.GetHost() ? (sx.GetHost() != "[::]" ? sx.GetHost() : FQDNHostName()) : address;
if (const ui32 port = sx.GetPort()) {
Expand Down Expand Up @@ -2743,10 +2756,16 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu
settings.PrivateKeyFile = Config.GetKafkaProxyConfig().GetKey();

setup->LocalServices.emplace_back(
TActorId(),
TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig()),
NKafka::MakeKafkaDiscoveryCacheID(),
TActorSetupCmd(CreateDiscoveryCache(NGRpcService::KafkaEndpointId),
TMailboxType::HTSwap, appData->UserPoolId)
);
setup->LocalServices.emplace_back(
TActorId(),
TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings, Config.GetKafkaProxyConfig(),
NKafka::MakeKafkaDiscoveryCacheID()),
TMailboxType::HTSwap, appData->UserPoolId)
);

IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{appData->Counters});
setup->LocalServices.emplace_back(
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/grpc_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ inline TActorId CreateGrpcPublisherServiceActorId() {
return actorId;
}

const static TString KafkaEndpointId = "KafkaProxy";
}
}
10 changes: 7 additions & 3 deletions ydb/core/kafka_proxy/actors/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ struct TContext {

NKikimr::NPQ::TRlContext RlContext;

bool Authenticated() {
return !RequireAuthentication || AuthenticationStep == SUCCESS;
bool Authenticated() {
return !RequireAuthentication || AuthenticationStep == SUCCESS;
}

TActorId DiscoveryCacheActor;
};

template<std::derived_from<TApiMessage> T>
Expand Down Expand Up @@ -165,7 +167,9 @@ inline TString GetUserSerializedToken(std::shared_ptr<TContext> context) {

NActors::IActor* CreateKafkaApiVersionsActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TApiVersionsRequestData>& message);
NActors::IActor* CreateKafkaInitProducerIdActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TInitProducerIdRequestData>& message);
NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TMetadataRequestData>& message);
NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId,
const TMessagePtr<TMetadataRequestData>& message,
const TActorId& discoveryCacheActor);
NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context);
NActors::IActor* CreateKafkaReadSessionActor(const TContext::TPtr context, ui64 cookie);
NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TSaslHandshakeRequestData>& message);
Expand Down
Loading

0 comments on commit db1f75f

Please sign in to comment.