diff --git a/BUILD b/BUILD
index ef5ae9077768c..093cddb95af8f 100644
--- a/BUILD
+++ b/BUILD
@@ -4395,17 +4395,18 @@ grpc_cc_library(
grpc_cc_library(
name = "xds_client",
srcs = [
+ "//src/core:xds/xds_client/lrs_client.cc",
"//src/core:xds/xds_client/xds_api.cc",
"//src/core:xds/xds_client/xds_bootstrap.cc",
"//src/core:xds/xds_client/xds_client.cc",
- "//src/core:xds/xds_client/xds_client_stats.cc",
],
hdrs = [
+ "//src/core:xds/xds_client/lrs_client.h",
"//src/core:xds/xds_client/xds_api.h",
"//src/core:xds/xds_client/xds_bootstrap.h",
"//src/core:xds/xds_client/xds_channel_args.h",
"//src/core:xds/xds_client/xds_client.h",
- "//src/core:xds/xds_client/xds_client_stats.h",
+ "//src/core:xds/xds_client/xds_locality.h",
"//src/core:xds/xds_client/xds_metrics.h",
"//src/core:xds/xds_client/xds_resource_type.h",
"//src/core:xds/xds_client/xds_resource_type_impl.h",
@@ -4464,6 +4465,7 @@ grpc_cc_library(
"//src/core:json",
"//src/core:per_cpu",
"//src/core:ref_counted",
+ "//src/core:ref_counted_string",
"//src/core:time",
"//src/core:upb_utils",
"//src/core:useful",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b401fc53b496f..586898b7ec1d3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2662,10 +2662,10 @@ add_library(grpc
src/core/xds/grpc/xds_routing.cc
src/core/xds/grpc/xds_server_grpc.cc
src/core/xds/grpc/xds_transport_grpc.cc
+ src/core/xds/xds_client/lrs_client.cc
src/core/xds/xds_client/xds_api.cc
src/core/xds/xds_client/xds_bootstrap.cc
src/core/xds/xds_client/xds_client.cc
- src/core/xds/xds_client/xds_client_stats.cc
)
target_compile_features(grpc PUBLIC cxx_std_14)
diff --git a/Makefile b/Makefile
index 17e4b30baa4c2..2496f2c915495 100644
--- a/Makefile
+++ b/Makefile
@@ -1515,10 +1515,10 @@ LIBGRPC_SRC = \
src/core/xds/grpc/xds_routing.cc \
src/core/xds/grpc/xds_server_grpc.cc \
src/core/xds/grpc/xds_transport_grpc.cc \
+ src/core/xds/xds_client/lrs_client.cc \
src/core/xds/xds_client/xds_api.cc \
src/core/xds/xds_client/xds_bootstrap.cc \
src/core/xds/xds_client/xds_client.cc \
- src/core/xds/xds_client/xds_client_stats.cc \
third_party/abseil-cpp/absl/base/internal/cycleclock.cc \
third_party/abseil-cpp/absl/base/internal/low_level_alloc.cc \
third_party/abseil-cpp/absl/base/internal/raw_logging.cc \
diff --git a/Package.swift b/Package.swift
index 64e851e770aa4..26ab8a93c4e53 100644
--- a/Package.swift
+++ b/Package.swift
@@ -2031,6 +2031,8 @@ let package = Package(
"src/core/xds/grpc/xds_server_grpc.h",
"src/core/xds/grpc/xds_transport_grpc.cc",
"src/core/xds/grpc/xds_transport_grpc.h",
+ "src/core/xds/xds_client/lrs_client.cc",
+ "src/core/xds/xds_client/lrs_client.h",
"src/core/xds/xds_client/xds_api.cc",
"src/core/xds/xds_client/xds_api.h",
"src/core/xds/xds_client/xds_bootstrap.cc",
@@ -2038,8 +2040,7 @@ let package = Package(
"src/core/xds/xds_client/xds_channel_args.h",
"src/core/xds/xds_client/xds_client.cc",
"src/core/xds/xds_client/xds_client.h",
- "src/core/xds/xds_client/xds_client_stats.cc",
- "src/core/xds/xds_client/xds_client_stats.h",
+ "src/core/xds/xds_client/xds_locality.h",
"src/core/xds/xds_client/xds_metrics.h",
"src/core/xds/xds_client/xds_resource_type.h",
"src/core/xds/xds_client/xds_resource_type_impl.h",
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index f256a8355c3db..3391b4d9d7a45 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -1256,11 +1256,12 @@ libs:
- src/core/xds/grpc/xds_routing.h
- src/core/xds/grpc/xds_server_grpc.h
- src/core/xds/grpc/xds_transport_grpc.h
+ - src/core/xds/xds_client/lrs_client.h
- src/core/xds/xds_client/xds_api.h
- src/core/xds/xds_client/xds_bootstrap.h
- src/core/xds/xds_client/xds_channel_args.h
- src/core/xds/xds_client/xds_client.h
- - src/core/xds/xds_client/xds_client_stats.h
+ - src/core/xds/xds_client/xds_locality.h
- src/core/xds/xds_client/xds_metrics.h
- src/core/xds/xds_client/xds_resource_type.h
- src/core/xds/xds_client/xds_resource_type_impl.h
@@ -2076,10 +2077,10 @@ libs:
- src/core/xds/grpc/xds_routing.cc
- src/core/xds/grpc/xds_server_grpc.cc
- src/core/xds/grpc/xds_transport_grpc.cc
+ - src/core/xds/xds_client/lrs_client.cc
- src/core/xds/xds_client/xds_api.cc
- src/core/xds/xds_client/xds_bootstrap.cc
- src/core/xds/xds_client/xds_client.cc
- - src/core/xds/xds_client/xds_client_stats.cc
deps:
- upb_json_lib
- upb_textformat_lib
diff --git a/config.m4 b/config.m4
index e8a2b6321f8f1..dd953b0801b77 100644
--- a/config.m4
+++ b/config.m4
@@ -890,10 +890,10 @@ if test "$PHP_GRPC" != "no"; then
src/core/xds/grpc/xds_routing.cc \
src/core/xds/grpc/xds_server_grpc.cc \
src/core/xds/grpc/xds_transport_grpc.cc \
+ src/core/xds/xds_client/lrs_client.cc \
src/core/xds/xds_client/xds_api.cc \
src/core/xds/xds_client/xds_bootstrap.cc \
src/core/xds/xds_client/xds_client.cc \
- src/core/xds/xds_client/xds_client_stats.cc \
src/php/ext/grpc/byte_buffer.c \
src/php/ext/grpc/call.c \
src/php/ext/grpc/call_credentials.c \
diff --git a/config.w32 b/config.w32
index 7a5aa5f8b173f..5a8a5d1dd3dd8 100644
--- a/config.w32
+++ b/config.w32
@@ -855,10 +855,10 @@ if (PHP_GRPC != "no") {
"src\\core\\xds\\grpc\\xds_routing.cc " +
"src\\core\\xds\\grpc\\xds_server_grpc.cc " +
"src\\core\\xds\\grpc\\xds_transport_grpc.cc " +
+ "src\\core\\xds\\xds_client\\lrs_client.cc " +
"src\\core\\xds\\xds_client\\xds_api.cc " +
"src\\core\\xds\\xds_client\\xds_bootstrap.cc " +
"src\\core\\xds\\xds_client\\xds_client.cc " +
- "src\\core\\xds\\xds_client\\xds_client_stats.cc " +
"src\\php\\ext\\grpc\\byte_buffer.c " +
"src\\php\\ext\\grpc\\call.c " +
"src\\php\\ext\\grpc\\call_credentials.c " +
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 4f6a4bc1b1fa3..902195c9fd8c0 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -1370,11 +1370,12 @@ Pod::Spec.new do |s|
'src/core/xds/grpc/xds_routing.h',
'src/core/xds/grpc/xds_server_grpc.h',
'src/core/xds/grpc/xds_transport_grpc.h',
+ 'src/core/xds/xds_client/lrs_client.h',
'src/core/xds/xds_client/xds_api.h',
'src/core/xds/xds_client/xds_bootstrap.h',
'src/core/xds/xds_client/xds_channel_args.h',
'src/core/xds/xds_client/xds_client.h',
- 'src/core/xds/xds_client/xds_client_stats.h',
+ 'src/core/xds/xds_client/xds_locality.h',
'src/core/xds/xds_client/xds_metrics.h',
'src/core/xds/xds_client/xds_resource_type.h',
'src/core/xds/xds_client/xds_resource_type_impl.h',
@@ -2667,11 +2668,12 @@ Pod::Spec.new do |s|
'src/core/xds/grpc/xds_routing.h',
'src/core/xds/grpc/xds_server_grpc.h',
'src/core/xds/grpc/xds_transport_grpc.h',
+ 'src/core/xds/xds_client/lrs_client.h',
'src/core/xds/xds_client/xds_api.h',
'src/core/xds/xds_client/xds_bootstrap.h',
'src/core/xds/xds_client/xds_channel_args.h',
'src/core/xds/xds_client/xds_client.h',
- 'src/core/xds/xds_client/xds_client_stats.h',
+ 'src/core/xds/xds_client/xds_locality.h',
'src/core/xds/xds_client/xds_metrics.h',
'src/core/xds/xds_client/xds_resource_type.h',
'src/core/xds/xds_client/xds_resource_type_impl.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 3ebf0103b56b6..5ef2d92c5c1d9 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -2147,6 +2147,8 @@ Pod::Spec.new do |s|
'src/core/xds/grpc/xds_server_grpc.h',
'src/core/xds/grpc/xds_transport_grpc.cc',
'src/core/xds/grpc/xds_transport_grpc.h',
+ 'src/core/xds/xds_client/lrs_client.cc',
+ 'src/core/xds/xds_client/lrs_client.h',
'src/core/xds/xds_client/xds_api.cc',
'src/core/xds/xds_client/xds_api.h',
'src/core/xds/xds_client/xds_bootstrap.cc',
@@ -2154,8 +2156,7 @@ Pod::Spec.new do |s|
'src/core/xds/xds_client/xds_channel_args.h',
'src/core/xds/xds_client/xds_client.cc',
'src/core/xds/xds_client/xds_client.h',
- 'src/core/xds/xds_client/xds_client_stats.cc',
- 'src/core/xds/xds_client/xds_client_stats.h',
+ 'src/core/xds/xds_client/xds_locality.h',
'src/core/xds/xds_client/xds_metrics.h',
'src/core/xds/xds_client/xds_resource_type.h',
'src/core/xds/xds_client/xds_resource_type_impl.h',
@@ -3450,11 +3451,12 @@ Pod::Spec.new do |s|
'src/core/xds/grpc/xds_routing.h',
'src/core/xds/grpc/xds_server_grpc.h',
'src/core/xds/grpc/xds_transport_grpc.h',
+ 'src/core/xds/xds_client/lrs_client.h',
'src/core/xds/xds_client/xds_api.h',
'src/core/xds/xds_client/xds_bootstrap.h',
'src/core/xds/xds_client/xds_channel_args.h',
'src/core/xds/xds_client/xds_client.h',
- 'src/core/xds/xds_client/xds_client_stats.h',
+ 'src/core/xds/xds_client/xds_locality.h',
'src/core/xds/xds_client/xds_metrics.h',
'src/core/xds/xds_client/xds_resource_type.h',
'src/core/xds/xds_client/xds_resource_type_impl.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 6784a5f4eedcf..ca3b9344c171a 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -2033,6 +2033,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/xds/grpc/xds_server_grpc.h )
s.files += %w( src/core/xds/grpc/xds_transport_grpc.cc )
s.files += %w( src/core/xds/grpc/xds_transport_grpc.h )
+ s.files += %w( src/core/xds/xds_client/lrs_client.cc )
+ s.files += %w( src/core/xds/xds_client/lrs_client.h )
s.files += %w( src/core/xds/xds_client/xds_api.cc )
s.files += %w( src/core/xds/xds_client/xds_api.h )
s.files += %w( src/core/xds/xds_client/xds_bootstrap.cc )
@@ -2040,8 +2042,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/xds/xds_client/xds_channel_args.h )
s.files += %w( src/core/xds/xds_client/xds_client.cc )
s.files += %w( src/core/xds/xds_client/xds_client.h )
- s.files += %w( src/core/xds/xds_client/xds_client_stats.cc )
- s.files += %w( src/core/xds/xds_client/xds_client_stats.h )
+ s.files += %w( src/core/xds/xds_client/xds_locality.h )
s.files += %w( src/core/xds/xds_client/xds_metrics.h )
s.files += %w( src/core/xds/xds_client/xds_resource_type.h )
s.files += %w( src/core/xds/xds_client/xds_resource_type_impl.h )
diff --git a/package.xml b/package.xml
index 341c3a6d8272d..7982ab5359669 100644
--- a/package.xml
+++ b/package.xml
@@ -2015,6 +2015,8 @@
+ <file baseinstalldir="/" name="src/core/xds/xds_client/lrs_client.cc" role="src" />
+ <file baseinstalldir="/" name="src/core/xds/xds_client/lrs_client.h" role="src" />
@@ -2022,8 +2024,7 @@
- <file baseinstalldir="/" name="src/core/xds/xds_client/xds_client_stats.cc" role="src" />
- <file baseinstalldir="/" name="src/core/xds/xds_client/xds_client_stats.h" role="src" />
+ <file baseinstalldir="/" name="src/core/xds/xds_client/xds_locality.h" role="src" />
diff --git a/src/core/BUILD b/src/core/BUILD
index f5949c8e05f17..9483e15f0516c 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -5696,6 +5696,7 @@ grpc_cc_library(
external_deps = [
"absl/base:core_headers",
"absl/cleanup",
+ "absl/container:flat_hash_map",
"absl/functional:bind_front",
"absl/log:check",
"absl/log:log",
diff --git a/src/core/load_balancing/xds/xds_cluster_impl.cc b/src/core/load_balancing/xds/xds_cluster_impl.cc
index 66aed1e65d363..eb1a4b3259a53 100644
--- a/src/core/load_balancing/xds/xds_cluster_impl.cc
+++ b/src/core/load_balancing/xds/xds_cluster_impl.cc
@@ -73,7 +73,7 @@
#include "src/core/xds/grpc/xds_endpoint.h"
#include "src/core/xds/xds_client/xds_bootstrap.h"
#include "src/core/xds/xds_client/xds_client.h"
-#include "src/core/xds/xds_client/xds_client_stats.h"
+#include "src/core/xds/xds_client/xds_locality.h"
namespace grpc_core {
@@ -189,13 +189,13 @@ class XdsClusterImplLb final : public LoadBalancingPolicy {
private:
class StatsSubchannelWrapper final : public DelegatingSubchannel {
public:
- // If load reporting is enabled and we have an XdsClusterLocalityStats
+ // If load reporting is enabled and we have a ClusterLocalityStats
// object, that object already contains the locality label. We
// need to store the locality label directly only in the case where
// load reporting is disabled.
using LocalityData = absl::variant<
RefCountedStringValue /*locality*/,
- RefCountedPtr<XdsClusterLocalityStats> /*locality_stats*/>;
+ RefCountedPtr<LrsClient::ClusterLocalityStats> /*locality_stats*/>;
StatsSubchannelWrapper(
RefCountedPtr<SubchannelInterface> wrapped_subchannel,
@@ -209,20 +209,20 @@ class XdsClusterImplLb final : public LoadBalancingPolicy {
return Match(
locality_data_,
[](RefCountedStringValue locality) { return locality; },
- [](const RefCountedPtr<XdsClusterLocalityStats>& locality_stats) {
+ [](const RefCountedPtr<LrsClient::ClusterLocalityStats>&
+ locality_stats) {
return locality_stats->locality_name()->human_readable_string();
});
}
- XdsClusterLocalityStats* locality_stats() const {
+ LrsClient::ClusterLocalityStats* locality_stats() const {
return Match(
locality_data_,
[](const RefCountedStringValue&) {
- return static_cast<XdsClusterLocalityStats*>(nullptr);
+ return static_cast<LrsClient::ClusterLocalityStats*>(nullptr);
},
- [](const RefCountedPtr<XdsClusterLocalityStats>& locality_stats) {
- return locality_stats.get();
- });
+ [](const RefCountedPtr<LrsClient::ClusterLocalityStats>&
+ locality_stats) { return locality_stats.get(); });
}
const grpc_event_engine::experimental::Slice& hostname() const {
@@ -250,7 +250,7 @@ class XdsClusterImplLb final : public LoadBalancingPolicy {
RefCountedStringValue service_telemetry_label_;
RefCountedStringValue namespace_telemetry_label_;
RefCountedPtr drop_config_;
- RefCountedPtr<XdsClusterDropStats> drop_stats_;
+ RefCountedPtr<LrsClient::ClusterDropStats> drop_stats_;
RefCountedPtr picker_;
};
@@ -304,7 +304,7 @@ class XdsClusterImplLb final : public LoadBalancingPolicy {
RefCountedPtr xds_client_;
// The stats for client-side load reporting.
- RefCountedPtr<XdsClusterDropStats> drop_stats_;
+ RefCountedPtr<LrsClient::ClusterDropStats> drop_stats_;
OrphanablePtr child_policy_;
@@ -324,7 +324,7 @@ class XdsClusterImplLb::Picker::SubchannelCallTracker final
SubchannelCallTracker(
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTracker>
original_subchannel_call_tracker,
- RefCountedPtr<XdsClusterLocalityStats> locality_stats,
+ RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats,
RefCountedPtr call_counter)
: original_subchannel_call_tracker_(
std::move(original_subchannel_call_tracker)),
@@ -380,7 +380,7 @@ class XdsClusterImplLb::Picker::SubchannelCallTracker final
private:
std::unique_ptr<LoadBalancingPolicy::SubchannelCallTracker>
original_subchannel_call_tracker_;
- RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
+ RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats_;
RefCountedPtr call_counter_;
#ifndef NDEBUG
bool started_ = false;
@@ -454,7 +454,7 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
subchannel_wrapper->locality());
}
// Handle load reporting.
- RefCountedPtr<XdsClusterLocalityStats> locality_stats;
+ RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats;
if (subchannel_wrapper->locality_stats() != nullptr) {
locality_stats = subchannel_wrapper->locality_stats()->Ref(
DEBUG_LOCATION, "SubchannelCallTracker");
@@ -618,14 +618,15 @@ absl::Status XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
// Note: We need a drop stats object whenever load reporting is enabled,
// even if we have no EDS drop config, because we also use it when
// reporting circuit breaker drops.
- if (!new_cluster_config.cluster->lrs_load_reporting_server.has_value()) {
+ if (new_cluster_config.cluster->lrs_load_reporting_server == nullptr) {
drop_stats_.reset();
} else if (cluster_resource_ == nullptr ||
old_eds_service_name != new_eds_service_name ||
- cluster_resource_->lrs_load_reporting_server !=
- new_cluster_config.cluster->lrs_load_reporting_server) {
- drop_stats_ = xds_client_->AddClusterDropStats(
- *new_cluster_config.cluster->lrs_load_reporting_server,
+ !LrsServersEqual(
+ cluster_resource_->lrs_load_reporting_server,
+ new_cluster_config.cluster->lrs_load_reporting_server)) {
+ drop_stats_ = xds_client_->lrs_client().AddClusterDropStats(
+ new_cluster_config.cluster->lrs_load_reporting_server,
new_config->cluster_name(), new_eds_service_name);
if (drop_stats_ == nullptr) {
LOG(ERROR)
@@ -819,12 +820,13 @@ RefCountedPtr XdsClusterImplLb::Helper::CreateSubchannel(
// (if load reporting is enabled) the locality stats object, which
// will be used by the picker.
auto locality_name = per_address_args.GetObjectRef<XdsLocalityName>();
- RefCountedPtr<XdsClusterLocalityStats> locality_stats;
- if (parent()->cluster_resource_->lrs_load_reporting_server.has_value()) {
- locality_stats = parent()->xds_client_->AddClusterLocalityStats(
- parent()->cluster_resource_->lrs_load_reporting_server.value(),
- parent()->config_->cluster_name(),
- GetEdsResourceName(*parent()->cluster_resource_), locality_name);
+ RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats;
+ if (parent()->cluster_resource_->lrs_load_reporting_server != nullptr) {
+ locality_stats =
+ parent()->xds_client_->lrs_client().AddClusterLocalityStats(
+ parent()->cluster_resource_->lrs_load_reporting_server,
+ parent()->config_->cluster_name(),
+ GetEdsResourceName(*parent()->cluster_resource_), locality_name);
if (locality_stats == nullptr) {
LOG(ERROR)
<< "[xds_cluster_impl_lb " << parent()
diff --git a/src/core/load_balancing/xds/xds_wrr_locality.cc b/src/core/load_balancing/xds/xds_wrr_locality.cc
index ce3f3079e2293..efdb91844d6ae 100644
--- a/src/core/load_balancing/xds/xds_wrr_locality.cc
+++ b/src/core/load_balancing/xds/xds_wrr_locality.cc
@@ -50,7 +50,7 @@
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/ref_counted_string.h"
#include "src/core/util/validation_errors.h"
-#include "src/core/xds/xds_client/xds_client_stats.h"
+#include "src/core/xds/xds_client/xds_locality.h"
namespace grpc_core {
diff --git a/src/core/xds/grpc/xds_client_grpc.cc b/src/core/xds/grpc/xds_client_grpc.cc
index 81425bb5d21f1..bc47f168b7170 100644
--- a/src/core/xds/grpc/xds_client_grpc.cc
+++ b/src/core/xds/grpc/xds_client_grpc.cc
@@ -242,7 +242,7 @@ absl::StatusOr> GrpcXdsClient::GetOrCreate(
auto channel_args = ChannelArgs::FromC(xds_channel_args);
return MakeRefCounted(
key, std::move(*bootstrap), channel_args,
- MakeOrphanable<GrpcXdsTransportFactory>(channel_args));
+ MakeRefCounted<GrpcXdsTransportFactory>(channel_args));
}
// Otherwise, use the global instance.
MutexLock lock(g_mu);
@@ -265,7 +265,7 @@ absl::StatusOr> GrpcXdsClient::GetOrCreate(
auto channel_args = ChannelArgs::FromC(g_channel_args);
auto xds_client = MakeRefCounted(
key, std::move(*bootstrap), channel_args,
- MakeOrphanable<GrpcXdsTransportFactory>(channel_args));
+ MakeRefCounted<GrpcXdsTransportFactory>(channel_args));
g_xds_client_map->emplace(xds_client->key(), xds_client.get());
GRPC_TRACE_LOG(xds_client, INFO) << "[xds_client " << xds_client.get()
<< "] Created xDS client for key " << key;
@@ -286,21 +286,28 @@ GlobalStatsPluginRegistry::StatsPluginGroup GetStatsPluginGroupForKey(
return GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope);
}
+std::string UserAgentName() {
+ return absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING,
+ GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING);
+}
+
+std::string UserAgentVersion() {
+ return absl::StrCat("C-core ", grpc_version_string(),
+ GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING,
+ GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING);
+}
+
} // namespace
GrpcXdsClient::GrpcXdsClient(
- absl::string_view key, std::unique_ptr<GrpcXdsBootstrap> bootstrap,
+ absl::string_view key, std::shared_ptr<GrpcXdsBootstrap> bootstrap,
const ChannelArgs& args,
- OrphanablePtr<XdsTransportFactory> transport_factory)
+ RefCountedPtr<XdsTransportFactory> transport_factory)
: XdsClient(
- std::move(bootstrap), std::move(transport_factory),
+ bootstrap, transport_factory,
grpc_event_engine::experimental::GetDefaultEventEngine(),
- std::make_unique<MetricsReporter>(*this),
- absl::StrCat("gRPC C-core ", GPR_PLATFORM_STRING,
- GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING),
- absl::StrCat("C-core ", grpc_version_string(),
- GRPC_XDS_USER_AGENT_NAME_SUFFIX_STRING,
- GRPC_XDS_USER_AGENT_VERSION_SUFFIX_STRING),
+ std::make_unique<MetricsReporter>(*this), UserAgentName(),
+ UserAgentVersion(),
std::max(Duration::Zero(),
args.GetDurationFromIntMillis(
GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS)
@@ -314,11 +321,16 @@ GrpcXdsClient::GrpcXdsClient(
[this](CallbackMetricReporter& reporter) {
ReportCallbackMetrics(reporter);
},
- Duration::Seconds(5), kMetricConnected, kMetricResources)) {}
+ Duration::Seconds(5), kMetricConnected, kMetricResources)),
+ lrs_client_(MakeRefCounted<LrsClient>(
+ std::move(bootstrap), UserAgentName(), UserAgentVersion(),
+ std::move(transport_factory),
+ grpc_event_engine::experimental::GetDefaultEventEngine())) {}
void GrpcXdsClient::Orphaned() {
registered_metric_callback_.reset();
XdsClient::Orphaned();
+ lrs_client_.reset();
MutexLock lock(g_mu);
auto it = g_xds_client_map->find(key_);
if (it != g_xds_client_map->end() && it->second == this) {
@@ -326,6 +338,11 @@ void GrpcXdsClient::Orphaned() {
}
}
+void GrpcXdsClient::ResetBackoff() {
+ XdsClient::ResetBackoff();
+ lrs_client_->ResetBackoff();
+}
+
grpc_pollset_set* GrpcXdsClient::interested_parties() const {
return reinterpret_cast<GrpcXdsTransportFactory*>(transport_factory())
->interested_parties();
diff --git a/src/core/xds/grpc/xds_client_grpc.h b/src/core/xds/grpc/xds_client_grpc.h
index 3093bcab1981e..bf6828f20f6d0 100644
--- a/src/core/xds/grpc/xds_client_grpc.h
+++ b/src/core/xds/grpc/xds_client_grpc.h
@@ -34,6 +34,7 @@
#include "src/core/util/useful.h"
#include "src/core/xds/grpc/certificate_provider_store.h"
#include "src/core/xds/grpc/xds_bootstrap_grpc.h"
+#include "src/core/xds/xds_client/lrs_client.h"
#include "src/core/xds/xds_client/xds_client.h"
#include "src/core/xds/xds_client/xds_transport.h"
@@ -61,9 +62,9 @@ class GrpcXdsClient final : public XdsClient {
// that also use certificate_provider_store(), but we should consider
// alternatives for that case as well.
GrpcXdsClient(absl::string_view key,
- std::unique_ptr<GrpcXdsBootstrap> bootstrap,
+ std::shared_ptr<GrpcXdsBootstrap> bootstrap,
const ChannelArgs& args,
- OrphanablePtr<XdsTransportFactory> transport_factory);
+ RefCountedPtr<XdsTransportFactory> transport_factory);
// Helpers for encoding the XdsClient object in channel args.
static absl::string_view ChannelArgName() {
@@ -73,6 +74,8 @@ class GrpcXdsClient final : public XdsClient {
return QsortCompare(a, b);
}
+ void ResetBackoff() override;
+
grpc_pollset_set* interested_parties() const;
CertificateProviderStore& certificate_provider_store() const {
@@ -81,6 +84,8 @@ class GrpcXdsClient final : public XdsClient {
absl::string_view key() const { return key_; }
+ LrsClient& lrs_client() { return *lrs_client_; }
+
// Builds ClientStatusResponse containing all resources from all XdsClients
static grpc_slice DumpAllClientConfigs();
@@ -94,6 +99,7 @@ class GrpcXdsClient final : public XdsClient {
OrphanablePtr certificate_provider_store_;
GlobalStatsPluginRegistry::StatsPluginGroup stats_plugin_group_;
std::unique_ptr registered_metric_callback_;
+ RefCountedPtr<LrsClient> lrs_client_;
};
namespace internal {
diff --git a/src/core/xds/grpc/xds_cluster.cc b/src/core/xds/grpc/xds_cluster.cc
index 1de3379682eb6..2d4c2834ff04b 100644
--- a/src/core/xds/grpc/xds_cluster.cc
+++ b/src/core/xds/grpc/xds_cluster.cc
@@ -49,7 +49,7 @@ std::string XdsClusterResource::ToString() const {
});
contents.push_back(absl::StrCat("lb_policy_config=",
JsonDump(Json::FromArray(lb_policy_config))));
- if (lrs_load_reporting_server.has_value()) {
+ if (lrs_load_reporting_server != nullptr) {
contents.push_back(absl::StrCat("lrs_load_reporting_server_name=",
lrs_load_reporting_server->server_uri()));
}
diff --git a/src/core/xds/grpc/xds_cluster.h b/src/core/xds/grpc/xds_cluster.h
index 7021b61b30089..f733a9328f8aa 100644
--- a/src/core/xds/grpc/xds_cluster.h
+++ b/src/core/xds/grpc/xds_cluster.h
@@ -35,6 +35,15 @@
namespace grpc_core {
+inline bool LrsServersEqual(
+ const std::shared_ptr& lrs_server1,
+ const std::shared_ptr& lrs_server2) {
+ if (lrs_server1 == nullptr) return lrs_server2 == nullptr;
+ if (lrs_server2 == nullptr) return false;
+ // Neither one is null, so compare them.
+ return *lrs_server1 == *lrs_server2;
+}
+
struct XdsClusterResource : public XdsResourceType::ResourceData {
struct Eds {
// If empty, defaults to the cluster name.
@@ -71,8 +80,8 @@ struct XdsClusterResource : public XdsResourceType::ResourceData {
// Note: Remaining fields are not used for aggregate clusters.
// The LRS server to use for load reporting.
- // If not set, load reporting will be disabled.
- absl::optional lrs_load_reporting_server;
+ // If null, load reporting will be disabled.
+ std::shared_ptr lrs_load_reporting_server;
// Tls Context used by clients
CommonTlsContext common_tls_context;
@@ -92,7 +101,8 @@ struct XdsClusterResource : public XdsResourceType::ResourceData {
bool operator==(const XdsClusterResource& other) const {
return type == other.type && lb_policy_config == other.lb_policy_config &&
- lrs_load_reporting_server == other.lrs_load_reporting_server &&
+ LrsServersEqual(lrs_load_reporting_server,
+ other.lrs_load_reporting_server) &&
common_tls_context == other.common_tls_context &&
connection_idle_timeout == other.connection_idle_timeout &&
max_concurrent_requests == other.max_concurrent_requests &&
diff --git a/src/core/xds/grpc/xds_cluster_parser.cc b/src/core/xds/grpc/xds_cluster_parser.cc
index 53a88d6ef9ac0..4b0f2683f1021 100644
--- a/src/core/xds/grpc/xds_cluster_parser.cc
+++ b/src/core/xds/grpc/xds_cluster_parser.cc
@@ -454,7 +454,7 @@ absl::StatusOr> CdsResourceParse(
ValidationErrors::ScopedField field(&errors, ".lrs_server");
errors.AddError("ConfigSource is not self");
}
- cds_update->lrs_load_reporting_server.emplace(
+ cds_update->lrs_load_reporting_server = std::make_shared(
static_cast(context.server));
}
// Protocol options.
diff --git a/src/core/xds/grpc/xds_endpoint.h b/src/core/xds/grpc/xds_endpoint.h
index 191a3b73b4299..4a07e5871c0c5 100644
--- a/src/core/xds/grpc/xds_endpoint.h
+++ b/src/core/xds/grpc/xds_endpoint.h
@@ -29,7 +29,7 @@
#include "src/core/util/ref_counted.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/sync.h"
-#include "src/core/xds/xds_client/xds_client_stats.h"
+#include "src/core/xds/xds_client/xds_locality.h"
#include "src/core/xds/xds_client/xds_resource_type.h"
#include "src/core/xds/xds_client/xds_resource_type_impl.h"
diff --git a/src/core/xds/grpc/xds_transport_grpc.cc b/src/core/xds/grpc/xds_transport_grpc.cc
index 435a07083bad0..5bc7526695975 100644
--- a/src/core/xds/grpc/xds_transport_grpc.cc
+++ b/src/core/xds/grpc/xds_transport_grpc.cc
@@ -41,6 +41,7 @@
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/config/core_configuration.h"
+#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
@@ -68,7 +69,7 @@ namespace grpc_core {
//
GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall(
- RefCountedPtr<GrpcXdsTransportFactory> factory, Channel* channel,
+ WeakRefCountedPtr<GrpcXdsTransportFactory> factory, Channel* channel,
const char* method,
std::unique_ptr event_handler)
: factory_(std::move(factory)), event_handler_(std::move(event_handler)) {
@@ -229,25 +230,24 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
class GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher final
: public AsyncConnectivityStateWatcherInterface {
public:
- explicit StateWatcher(
- std::function<void(absl::Status)> on_connectivity_failure)
- : on_connectivity_failure_(std::move(on_connectivity_failure)) {}
+ explicit StateWatcher(RefCountedPtr<ConnectivityFailureWatcher> watcher)
+ : watcher_(std::move(watcher)) {}
private:
void OnConnectivityStateChange(grpc_connectivity_state new_state,
const absl::Status& status) override {
if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- on_connectivity_failure_(absl::Status(
+ watcher_->OnConnectivityFailure(absl::Status(
status.code(),
absl::StrCat("channel in TRANSIENT_FAILURE: ", status.message())));
}
}
- std::function<void(absl::Status)> on_connectivity_failure_;
+ RefCountedPtr<ConnectivityFailureWatcher> watcher_;
};
//
-// GrpcXdsClient::GrpcXdsTransport
+// GrpcXdsTransportFactory::GrpcXdsTransport
//
namespace {
@@ -264,35 +264,74 @@ RefCountedPtr CreateXdsChannel(const ChannelArgs& args,
} // namespace
GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport(
- GrpcXdsTransportFactory* factory, const XdsBootstrap::XdsServer& server,
- std::function<void(absl::Status)> on_connectivity_failure,
- absl::Status* status)
- : factory_(factory) {
- channel_ = CreateXdsChannel(factory->args_,
+ WeakRefCountedPtr<GrpcXdsTransportFactory> factory,
+ const XdsBootstrap::XdsServer& server, absl::Status* status)
+ : XdsTransport(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
+ ? "GrpcXdsTransport"
+ : nullptr),
+ factory_(std::move(factory)),
+ key_(server.Key()) {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[GrpcXdsTransport " << this << "] created";
+ channel_ = CreateXdsChannel(factory_->args_,
static_cast(server));
CHECK(channel_ != nullptr);
if (channel_->IsLame()) {
*status = absl::UnavailableError("xds client has a lame channel");
- } else {
- watcher_ = new StateWatcher(std::move(on_connectivity_failure));
- channel_->AddConnectivityWatcher(
- GRPC_CHANNEL_IDLE,
- OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
}
}
-void GrpcXdsTransportFactory::GrpcXdsTransport::Orphan() {
- if (!channel_->IsLame()) {
- channel_->RemoveConnectivityWatcher(watcher_);
+GrpcXdsTransportFactory::GrpcXdsTransport::~GrpcXdsTransport() {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[GrpcXdsTransport " << this << "] destroying";
+}
+
+void GrpcXdsTransportFactory::GrpcXdsTransport::Orphaned() {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[GrpcXdsTransport " << this << "] orphaned";
+ {
+ MutexLock lock(&factory_->mu_);
+ auto it = factory_->transports_.find(key_);
+ if (it != factory_->transports_.end() && it->second == this) {
+ factory_->transports_.erase(it);
+ }
}
// Do an async hop before unreffing. This avoids a deadlock upon
// shutdown in the case where the xDS channel is itself an xDS channel
// (e.g., when using one control plane to find another control plane).
- grpc_event_engine::experimental::GetDefaultEventEngine()->Run([this]() {
- ApplicationCallbackExecCtx application_exec_ctx;
- ExecCtx exec_ctx;
- Unref();
- });
+ grpc_event_engine::experimental::GetDefaultEventEngine()->Run(
+ [self = WeakRefAsSubclass<GrpcXdsTransport>()]() mutable {
+ ApplicationCallbackExecCtx application_exec_ctx;
+ ExecCtx exec_ctx;
+ self.reset();
+ });
+}
+
+void GrpcXdsTransportFactory::GrpcXdsTransport::StartConnectivityFailureWatch(
+ RefCountedPtr<ConnectivityFailureWatcher> watcher) {
+ if (channel_->IsLame()) return;
+ auto* state_watcher = new StateWatcher(watcher);
+ {
+ MutexLock lock(&mu_);
+ watchers_.emplace(watcher, state_watcher);
+ }
+ channel_->AddConnectivityWatcher(
+ GRPC_CHANNEL_IDLE,
+ OrphanablePtr<AsyncConnectivityStateWatcherInterface>(state_watcher));
+}
+
+void GrpcXdsTransportFactory::GrpcXdsTransport::StopConnectivityFailureWatch(
+ const RefCountedPtr<ConnectivityFailureWatcher>& watcher) {
+ if (channel_->IsLame()) return;
+ StateWatcher* state_watcher = nullptr;
+ {
+ MutexLock lock(&mu_);
+ auto it = watchers_.find(watcher);
+ if (it == watchers_.end()) return;
+ state_watcher = it->second;
+ watchers_.erase(it);
+ }
+ channel_->RemoveConnectivityWatcher(state_watcher);
}
OrphanablePtr
@@ -300,9 +339,8 @@ GrpcXdsTransportFactory::GrpcXdsTransport::CreateStreamingCall(
const char* method,
std::unique_ptr<StreamingCall::EventHandler> event_handler) {
return MakeOrphanable<GrpcStreamingCall>(
- factory_->RefAsSubclass<GrpcXdsTransportFactory>(DEBUG_LOCATION,
- "StreamingCall"),
- channel_.get(), method, std::move(event_handler));
+ factory_.WeakRef(DEBUG_LOCATION, "StreamingCall"), channel_.get(), method,
+ std::move(event_handler));
}
void GrpcXdsTransportFactory::GrpcXdsTransport::ResetBackoff() {
@@ -336,13 +374,22 @@ GrpcXdsTransportFactory::~GrpcXdsTransportFactory() {
ShutdownInternally();
}
-OrphanablePtr<XdsTransportFactory::XdsTransport>
-GrpcXdsTransportFactory::Create(
- const XdsBootstrap::XdsServer& server,
- std::function<void(absl::Status)> on_connectivity_failure,
- absl::Status* status) {
- return MakeOrphanable<GrpcXdsTransport>(
- this, server, std::move(on_connectivity_failure), status);
+RefCountedPtr<XdsTransportFactory::XdsTransport>
+GrpcXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server,
+ absl::Status* status) {
+ std::string key = server.Key();
+ RefCountedPtr<GrpcXdsTransport> transport;
+ MutexLock lock(&mu_);
+ auto it = transports_.find(key);
+ if (it != transports_.end()) {
+ transport = it->second->RefIfNonZero().TakeAsSubclass<GrpcXdsTransport>();
+ }
+ if (transport == nullptr) {
+ transport = MakeRefCounted<GrpcXdsTransport>(
+ WeakRefAsSubclass<GrpcXdsTransportFactory>(), server, status);
+ transports_.emplace(std::move(key), transport.get());
+ }
+ return transport;
}
} // namespace grpc_core
diff --git a/src/core/xds/grpc/xds_transport_grpc.h b/src/core/xds/grpc/xds_transport_grpc.h
index 90ff36aa41ca4..7615d19f6c8ac 100644
--- a/src/core/xds/grpc/xds_transport_grpc.h
+++ b/src/core/xds/grpc/xds_transport_grpc.h
@@ -21,6 +21,7 @@
#include
#include
+#include "absl/container/flat_hash_map.h"
#include "absl/status/status.h"
#include
@@ -35,6 +36,7 @@
#include "src/core/lib/surface/channel.h"
#include "src/core/util/orphanable.h"
#include "src/core/util/ref_counted_ptr.h"
+#include "src/core/util/sync.h"
#include "src/core/xds/xds_client/xds_bootstrap.h"
#include "src/core/xds/xds_client/xds_transport.h"
@@ -47,18 +49,20 @@ class GrpcXdsTransportFactory final : public XdsTransportFactory {
explicit GrpcXdsTransportFactory(const ChannelArgs& args);
~GrpcXdsTransportFactory() override;
- void Orphan() override { Unref(); }
+ void Orphaned() override {}
- OrphanablePtr Create(
- const XdsBootstrap::XdsServer& server,
- std::function on_connectivity_failure,
- absl::Status* status) override;
+ RefCountedPtr GetTransport(
+ const XdsBootstrap::XdsServer& server, absl::Status* status) override;
grpc_pollset_set* interested_parties() const { return interested_parties_; }
private:
ChannelArgs args_;
grpc_pollset_set* interested_parties_;
+
+ Mutex mu_;
+ absl::flat_hash_map
+ transports_ ABSL_GUARDED_BY(&mu_);
};
class GrpcXdsTransportFactory::GrpcXdsTransport final
@@ -66,12 +70,16 @@ class GrpcXdsTransportFactory::GrpcXdsTransport final
public:
class GrpcStreamingCall;
- GrpcXdsTransport(GrpcXdsTransportFactory* factory,
- const XdsBootstrap::XdsServer& server,
- std::function on_connectivity_failure,
- absl::Status* status);
+ GrpcXdsTransport(WeakRefCountedPtr factory,
+ const XdsBootstrap::XdsServer& server, absl::Status* status);
+ ~GrpcXdsTransport() override;
- void Orphan() override;
+ void Orphaned() override;
+
+ void StartConnectivityFailureWatch(
+ RefCountedPtr watcher) override;
+ void StopConnectivityFailureWatch(
+ const RefCountedPtr& watcher) override;
OrphanablePtr CreateStreamingCall(
const char* method,
@@ -82,15 +90,19 @@ class GrpcXdsTransportFactory::GrpcXdsTransport final
private:
class StateWatcher;
- GrpcXdsTransportFactory* factory_; // Not owned.
+ WeakRefCountedPtr factory_;
+ std::string key_;
RefCountedPtr channel_;
- StateWatcher* watcher_;
+
+ Mutex mu_;
+ absl::flat_hash_map, StateWatcher*>
+ watchers_ ABSL_GUARDED_BY(&mu_);
};
class GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall final
: public XdsTransportFactory::XdsTransport::StreamingCall {
public:
- GrpcStreamingCall(RefCountedPtr factory,
+ GrpcStreamingCall(WeakRefCountedPtr factory,
Channel* channel, const char* method,
std::unique_ptr event_handler);
~GrpcStreamingCall() override;
@@ -107,7 +119,7 @@ class GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall final
static void OnResponseReceived(void* arg, grpc_error_handle /*error*/);
static void OnStatusReceived(void* arg, grpc_error_handle /*error*/);
- RefCountedPtr factory_;
+ WeakRefCountedPtr factory_;
std::unique_ptr event_handler_;
diff --git a/src/core/xds/xds_client/lrs_client.cc b/src/core/xds/xds_client/lrs_client.cc
new file mode 100644
index 0000000000000..b1767a0e03755
--- /dev/null
+++ b/src/core/xds/xds_client/lrs_client.cc
@@ -0,0 +1,1206 @@
+//
+// Copyright 2018 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "src/core/xds/xds_client/lrs_client.h"
+
+#include
+#include
+#include
+#include
+
+#include "absl/cleanup/cleanup.h"
+#include "absl/log/check.h"
+#include "absl/log/log.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/optional.h"
+#include "envoy/config/core/v3/base.upb.h"
+#include "envoy/config/endpoint/v3/load_report.upb.h"
+#include "envoy/service/load_stats/v3/lrs.upb.h"
+#include "envoy/service/load_stats/v3/lrs.upbdefs.h"
+#include "google/protobuf/duration.upb.h"
+#include "upb/base/string_view.h"
+#include "upb/mem/arena.h"
+#include "upb/reflection/def.h"
+#include "upb/text/encode.h"
+
+#include
+
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/util/backoff.h"
+#include "src/core/util/debug_location.h"
+#include "src/core/util/orphanable.h"
+#include "src/core/util/ref_counted_ptr.h"
+#include "src/core/util/sync.h"
+#include "src/core/util/upb_utils.h"
+#include "src/core/util/uri.h"
+#include "src/core/xds/xds_client/xds_api.h"
+#include "src/core/xds/xds_client/xds_bootstrap.h"
+#include "src/core/xds/xds_client/xds_locality.h"
+
+#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
+#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
+#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
+#define GRPC_XDS_RECONNECT_JITTER 0.2
+#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
+
+namespace grpc_core {
+
+using ::grpc_event_engine::experimental::EventEngine;
+
+namespace {
+
+uint64_t GetAndResetCounter(std::atomic* from) {
+ return from->exchange(0, std::memory_order_relaxed);
+}
+
+} // namespace
+
+//
+// LrsClient::ClusterDropStats
+//
+
+LrsClient::ClusterDropStats::ClusterDropStats(
+ RefCountedPtr lrs_client, absl::string_view lrs_server,
+ absl::string_view cluster_name, absl::string_view eds_service_name)
+ : RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
+ ? "ClusterDropStats"
+ : nullptr),
+ lrs_client_(std::move(lrs_client)),
+ lrs_server_(lrs_server),
+ cluster_name_(cluster_name),
+ eds_service_name_(eds_service_name) {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_client_.get() << "] created drop stats " << this
+ << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
+ << eds_service_name_ << "}";
+}
+
+LrsClient::ClusterDropStats::~ClusterDropStats() {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_client_.get() << "] destroying drop stats "
+ << this << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
+ << eds_service_name_ << "}";
+ lrs_client_->RemoveClusterDropStats(lrs_server_, cluster_name_,
+ eds_service_name_, this);
+ lrs_client_.reset(DEBUG_LOCATION, "ClusterDropStats");
+}
+
+LrsClient::ClusterDropStats::Snapshot
+LrsClient::ClusterDropStats::GetSnapshotAndReset() {
+ Snapshot snapshot;
+ snapshot.uncategorized_drops = GetAndResetCounter(&uncategorized_drops_);
+ MutexLock lock(&mu_);
+ snapshot.categorized_drops = std::move(categorized_drops_);
+ return snapshot;
+}
+
+void LrsClient::ClusterDropStats::AddUncategorizedDrops() {
+ uncategorized_drops_.fetch_add(1);
+}
+
+void LrsClient::ClusterDropStats::AddCallDropped(const std::string& category) {
+ MutexLock lock(&mu_);
+ ++categorized_drops_[category];
+}
+
+//
+// LrsClient::ClusterLocalityStats
+//
+
+LrsClient::ClusterLocalityStats::ClusterLocalityStats(
+ RefCountedPtr lrs_client, absl::string_view lrs_server,
+ absl::string_view cluster_name, absl::string_view eds_service_name,
+ RefCountedPtr name)
+ : RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
+ ? "ClusterLocalityStats"
+ : nullptr),
+ lrs_client_(std::move(lrs_client)),
+ lrs_server_(lrs_server),
+ cluster_name_(cluster_name),
+ eds_service_name_(eds_service_name),
+ name_(std::move(name)) {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_client_.get() << "] created locality stats "
+ << this << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
+ << eds_service_name_ << ", "
+ << (name_ == nullptr ? "" : name_->human_readable_string().c_str())
+ << "}";
+}
+
+LrsClient::ClusterLocalityStats::~ClusterLocalityStats() {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_client_.get() << "] destroying locality stats "
+ << this << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
+ << eds_service_name_ << ", "
+ << (name_ == nullptr ? "" : name_->human_readable_string().c_str())
+ << "}";
+ lrs_client_->RemoveClusterLocalityStats(lrs_server_, cluster_name_,
+ eds_service_name_, name_, this);
+ lrs_client_.reset(DEBUG_LOCATION, "ClusterLocalityStats");
+}
+
+LrsClient::ClusterLocalityStats::Snapshot
+LrsClient::ClusterLocalityStats::GetSnapshotAndReset() {
+ Snapshot snapshot;
+ for (auto& percpu_stats : stats_) {
+ Snapshot percpu_snapshot = {
+ GetAndResetCounter(&percpu_stats.total_successful_requests),
+ // Don't reset total_requests_in_progress because it's
+ // not related to a single reporting interval.
+ percpu_stats.total_requests_in_progress.load(std::memory_order_relaxed),
+ GetAndResetCounter(&percpu_stats.total_error_requests),
+ GetAndResetCounter(&percpu_stats.total_issued_requests),
+ {}};
+ {
+ MutexLock lock(&percpu_stats.backend_metrics_mu);
+ percpu_snapshot.backend_metrics = std::move(percpu_stats.backend_metrics);
+ }
+ snapshot += percpu_snapshot;
+ }
+ return snapshot;
+}
+
+void LrsClient::ClusterLocalityStats::AddCallStarted() {
+ Stats& stats = stats_.this_cpu();
+ stats.total_issued_requests.fetch_add(1, std::memory_order_relaxed);
+ stats.total_requests_in_progress.fetch_add(1, std::memory_order_relaxed);
+}
+
+void LrsClient::ClusterLocalityStats::AddCallFinished(
+ const std::map* named_metrics, bool fail) {
+ Stats& stats = stats_.this_cpu();
+ std::atomic& to_increment =
+ fail ? stats.total_error_requests : stats.total_successful_requests;
+ to_increment.fetch_add(1, std::memory_order_relaxed);
+ stats.total_requests_in_progress.fetch_add(-1, std::memory_order_acq_rel);
+ if (named_metrics == nullptr) return;
+ MutexLock lock(&stats.backend_metrics_mu);
+ for (const auto& m : *named_metrics) {
+ stats.backend_metrics[std::string(m.first)] += BackendMetric{1, m.second};
+ }
+}
+
+//
+// Internal class declarations
+//
+
+// A call wrapper that can restart a call upon failure.
+// The template parameter is the kind of wrapped call.
+// TODO(roth): This is basically the same code as in XdsClient, and
+// probably very similar to many other places in the codebase.
+// Consider refactoring this into a common utility library somehow.
+template
+class LrsClient::LrsChannel::RetryableCall final
+ : public InternallyRefCounted> {
+ public:
+ explicit RetryableCall(WeakRefCountedPtr lrs_channel);
+
+ // Disable thread-safety analysis because this method is called via
+ // OrphanablePtr<>, but there's no way to pass the lock annotation
+ // through there.
+ void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
+
+ void OnCallFinishedLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);
+
+ T* call() const { return call_.get(); }
+ LrsChannel* lrs_channel() const { return lrs_channel_.get(); }
+
+ bool IsCurrentCallOnChannel() const;
+
+ private:
+ void StartNewCallLocked();
+ void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);
+
+ void OnRetryTimer();
+
+ // The wrapped LRS call that talks to the LRS server. It's instantiated
+ // every time we start a new call. It's null during call retry backoff.
+ OrphanablePtr call_;
+ // The owning xds channel.
+ WeakRefCountedPtr lrs_channel_;
+
+ // Retry state.
+ BackOff backoff_;
+ absl::optional timer_handle_
+ ABSL_GUARDED_BY(&LrsClient::mu_);
+
+ bool shutting_down_ = false;
+};
+
+// An LRS call to the LRS server.
+class LrsClient::LrsChannel::LrsCall final
+ : public InternallyRefCounted {
+ public:
+ // The ctor and dtor should not be used directly.
+ explicit LrsCall(RefCountedPtr> retryable_call);
+
+ void Orphan() override;
+
+ RetryableCall* retryable_call() { return retryable_call_.get(); }
+ LrsChannel* lrs_channel() const { return retryable_call_->lrs_channel(); }
+ LrsClient* lrs_client() const { return lrs_channel()->lrs_client(); }
+ bool seen_response() const { return seen_response_; }
+
+ private:
+ class StreamEventHandler final
+ : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler {
+ public:
+ explicit StreamEventHandler(RefCountedPtr lrs_call)
+ : lrs_call_(std::move(lrs_call)) {}
+
+ void OnRequestSent(bool /*ok*/) override { lrs_call_->OnRequestSent(); }
+ void OnRecvMessage(absl::string_view payload) override {
+ lrs_call_->OnRecvMessage(payload);
+ }
+ void OnStatusReceived(absl::Status status) override {
+ lrs_call_->OnStatusReceived(std::move(status));
+ }
+
+ private:
+ RefCountedPtr lrs_call_;
+ };
+
+ // A repeating timer for a particular duration.
+ class Timer final : public InternallyRefCounted {
+ public:
+ explicit Timer(RefCountedPtr lrs_call)
+ : lrs_call_(std::move(lrs_call)) {}
+ ~Timer() override { lrs_call_.reset(DEBUG_LOCATION, "LRS timer"); }
+
+ // Disable thread-safety analysis because this method is called via
+ // OrphanablePtr<>, but there's no way to pass the lock annotation
+ // through there.
+ void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
+
+ void ScheduleNextReportLocked()
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);
+
+ private:
+ bool IsCurrentTimerOnCall() const {
+ return this == lrs_call_->timer_.get();
+ }
+ LrsClient* lrs_client() const { return lrs_call_->lrs_client(); }
+
+ void OnNextReportTimer();
+
+ // The owning LRS call.
+ RefCountedPtr lrs_call_;
+
+ absl::optional timer_handle_
+ ABSL_GUARDED_BY(&LrsClient::mu_);
+ };
+
+ void MaybeScheduleNextReportLocked()
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);
+
+ void SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);
+
+ void SendMessageLocked(std::string payload)
+ ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);
+
+ void OnRequestSent();
+ void OnRecvMessage(absl::string_view payload);
+ void OnStatusReceived(absl::Status status);
+
+ bool IsCurrentCallOnChannel() const;
+
+ // The owning RetryableCall<>.
+ RefCountedPtr> retryable_call_;
+
+ OrphanablePtr
+ streaming_call_;
+
+ bool seen_response_ = false;
+ bool send_message_pending_ ABSL_GUARDED_BY(&LrsClient::mu_) = false;
+
+ // Load reporting state.
+ bool send_all_clusters_ = false;
+ std::set cluster_names_; // Asked for by the LRS server.
+ Duration load_reporting_interval_;
+ bool last_report_counters_were_zero_ = false;
+ OrphanablePtr timer_;
+};
+
+//
+// LrsClient::LrsChannel
+//
+
+LrsClient::LrsChannel::LrsChannel(
+ WeakRefCountedPtr lrs_client,
+ std::shared_ptr server)
+ : DualRefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
+ ? "LrsChannel"
+ : nullptr),
+ lrs_client_(std::move(lrs_client)),
+ server_(std::move(server)) {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_client_.get() << "] creating channel " << this
+ << " for server " << server_->server_uri();
+ absl::Status status;
+ transport_ = lrs_client_->transport_factory_->GetTransport(*server_, &status);
+ CHECK(transport_ != nullptr);
+ if (!status.ok()) {
+ LOG(ERROR) << "Error creating LRS channel to " << server_->server_uri()
+ << ": " << status;
+ }
+}
+
+LrsClient::LrsChannel::~LrsChannel() {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_client() << "] destroying lrs channel " << this
+ << " for server " << server_->server_uri();
+ lrs_client_.reset(DEBUG_LOCATION, "LrsChannel");
+}
+
+// This method should only ever be called when holding the lock, but we can't
+// use a ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation, because Orphan() will be
+// called from DualRefCounted::Unref(), which cannot have a lock annotation for
+// a lock in this subclass.
+void LrsClient::LrsChannel::Orphaned() ABSL_NO_THREAD_SAFETY_ANALYSIS {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_client() << "] orphaning lrs channel " << this
+ << " for server " << server_->server_uri();
+ transport_.reset();
+ // All strong refs have been released at this point, so remove ourselves
+ // from the channel map to prevent any subsequent subscription from using
+ // this LrsChannel while it is shutting down.
+ lrs_client_->lrs_channel_map_.erase(server_->Key());
+ lrs_call_.reset();
+}
+
+void LrsClient::LrsChannel::ResetBackoff() { transport_->ResetBackoff(); }
+
+void LrsClient::LrsChannel::MaybeStartLrsCall() {
+ if (lrs_call_ != nullptr) return;
+ lrs_call_ = MakeOrphanable>(
+ WeakRef(DEBUG_LOCATION, "LrsCall"));
+}
+
+void LrsClient::LrsChannel::StopLrsCallLocked() {
+ lrs_client_->load_report_map_.erase(server_->Key());
+ lrs_call_.reset();
+}
+
+//
+// LrsClient::LrsChannel::RetryableCall<>
+//
+
+template
+LrsClient::LrsChannel::RetryableCall::RetryableCall(
+ WeakRefCountedPtr lrs_channel)
+ : lrs_channel_(std::move(lrs_channel)),
+ backoff_(BackOff::Options()
+ .set_initial_backoff(Duration::Seconds(
+ GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS))
+ .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
+ .set_jitter(GRPC_XDS_RECONNECT_JITTER)
+ .set_max_backoff(Duration::Seconds(
+ GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS))) {
+ StartNewCallLocked();
+}
+
+template
+void LrsClient::LrsChannel::RetryableCall::Orphan() {
+ shutting_down_ = true;
+ call_.reset();
+ if (timer_handle_.has_value()) {
+ lrs_channel()->lrs_client()->engine()->Cancel(*timer_handle_);
+ timer_handle_.reset();
+ }
+ this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
+}
+
+template
+void LrsClient::LrsChannel::RetryableCall::OnCallFinishedLocked() {
+ // If we saw a response on the current stream, reset backoff.
+ if (call_->seen_response()) backoff_.Reset();
+ call_.reset();
+ // Start retry timer.
+ StartRetryTimerLocked();
+}
+
+template
+void LrsClient::LrsChannel::RetryableCall::StartNewCallLocked() {
+ if (shutting_down_) return;
+ CHECK(lrs_channel_->transport_ != nullptr);
+ CHECK(call_ == nullptr);
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_channel()->lrs_client() << "] lrs server "
+ << lrs_channel()->server_->server_uri()
+ << ": start new call from retryable call " << this;
+ call_ = MakeOrphanable(
+ this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
+}
+
+template
+void LrsClient::LrsChannel::RetryableCall::StartRetryTimerLocked() {
+ if (shutting_down_) return;
+ const Duration delay = backoff_.NextAttemptDelay();
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_channel()->lrs_client() << "] lrs server "
+ << lrs_channel()->server_->server_uri()
+ << ": call attempt failed; retry timer will fire in " << delay.millis()
+ << "ms.";
+ timer_handle_ = lrs_channel()->lrs_client()->engine()->RunAfter(
+ delay,
+ [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() {
+ ApplicationCallbackExecCtx callback_exec_ctx;
+ ExecCtx exec_ctx;
+ self->OnRetryTimer();
+ });
+}
+
+template
+void LrsClient::LrsChannel::RetryableCall::OnRetryTimer() {
+ MutexLock lock(&lrs_channel_->lrs_client()->mu_);
+ if (timer_handle_.has_value()) {
+ timer_handle_.reset();
+ if (shutting_down_) return;
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_channel()->lrs_client() << "] lrs server "
+ << lrs_channel()->server_->server_uri()
+ << ": retry timer fired (retryable call: " << this << ")";
+ StartNewCallLocked();
+ }
+}
+
+//
+// LrsClient::LrsChannel::LrsCall::Timer
+//
+
+void LrsClient::LrsChannel::LrsCall::Timer::Orphan() {
+ if (timer_handle_.has_value()) {
+ lrs_client()->engine()->Cancel(*timer_handle_);
+ timer_handle_.reset();
+ }
+ Unref(DEBUG_LOCATION, "Orphan");
+}
+
+void LrsClient::LrsChannel::LrsCall::Timer::ScheduleNextReportLocked() {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_client() << "] lrs server "
+ << lrs_call_->lrs_channel()->server_->server_uri()
+ << ": scheduling next load report in "
+ << lrs_call_->load_reporting_interval_;
+ timer_handle_ = lrs_client()->engine()->RunAfter(
+ lrs_call_->load_reporting_interval_,
+ [self = Ref(DEBUG_LOCATION, "timer")]() {
+ ApplicationCallbackExecCtx callback_exec_ctx;
+ ExecCtx exec_ctx;
+ self->OnNextReportTimer();
+ });
+}
+
+void LrsClient::LrsChannel::LrsCall::Timer::OnNextReportTimer() {
+ MutexLock lock(&lrs_client()->mu_);
+ timer_handle_.reset();
+ if (IsCurrentTimerOnCall()) lrs_call_->SendReportLocked();
+}
+
+//
+// LrsClient::LrsChannel::LrsCall
+//
+
+LrsClient::LrsChannel::LrsCall::LrsCall(
+ RefCountedPtr> retryable_call)
+ : InternallyRefCounted(
+ GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) ? "LrsCall" : nullptr),
+ retryable_call_(std::move(retryable_call)) {
+ // Init the LRS call. Note that the call will progress every time there's
+ // activity in lrs_client()->interested_parties_, which is comprised of
+ // the polling entities from client_channel.
+ CHECK_NE(lrs_client(), nullptr);
+ const char* method =
+ "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats";
+ streaming_call_ = lrs_channel()->transport_->CreateStreamingCall(
+ method, std::make_unique(
+ // Passing the initial ref here. This ref will go away when
+ // the StreamEventHandler is destroyed.
+ RefCountedPtr(this)));
+ CHECK(streaming_call_ != nullptr);
+ // Start the call.
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_client() << "] lrs server "
+ << lrs_channel()->server_->server_uri()
+ << ": starting LRS call (lrs_call=" << this
+ << ", streaming_call=" << streaming_call_.get() << ")";
+ // Send the initial request.
+ std::string serialized_payload = lrs_client()->CreateLrsInitialRequest();
+ SendMessageLocked(std::move(serialized_payload));
+ // Read initial response.
+ streaming_call_->StartRecvMessage();
+}
+
+void LrsClient::LrsChannel::LrsCall::Orphan() {
+ timer_.reset();
+ // Note that the initial ref is held by the StreamEventHandler, which
+ // will be destroyed when streaming_call_ is destroyed, which may not happen
+ // here, since there may be other refs held to streaming_call_ by internal
+ // callbacks.
+ streaming_call_.reset();
+}
+
+void LrsClient::LrsChannel::LrsCall::MaybeScheduleNextReportLocked() {
+ // If there are no more registered stats to report, cancel the call.
+ auto it = lrs_client()->load_report_map_.find(lrs_channel()->server_->Key());
+ if (it == lrs_client()->load_report_map_.end() ||
+ it->second.load_report_map.empty()) {
+ it->second.lrs_channel->StopLrsCallLocked();
+ return;
+ }
+ // Don't start if the previous send_message op hasn't completed yet.
+ // If this happens, we'll be called again from OnRequestSent().
+ if (send_message_pending_) return;
+ // Don't start if no LRS response has arrived.
+ if (!seen_response()) return;
+ // If there is no timer, create one.
+ // This happens on the initial response and whenever the interval changes.
+ if (timer_ == nullptr) {
+ timer_ = MakeOrphanable(Ref(DEBUG_LOCATION, "LRS timer"));
+ }
+ // Schedule the next load report.
+ timer_->ScheduleNextReportLocked();
+}
+
+bool LrsClient::LoadReportCountersAreZero(
+ const ClusterLoadReportMap& snapshot) {
+ for (const auto& p : snapshot) {
+ const ClusterLoadReport& cluster_snapshot = p.second;
+ if (!cluster_snapshot.dropped_requests.IsZero()) return false;
+ for (const auto& q : cluster_snapshot.locality_stats) {
+ const ClusterLocalityStats::Snapshot& locality_snapshot = q.second;
+ if (!locality_snapshot.IsZero()) return false;
+ }
+ }
+ return true;
+}
+
+void LrsClient::LrsChannel::LrsCall::SendReportLocked() {
+ // Construct snapshot from all reported stats.
+ ClusterLoadReportMap snapshot = lrs_client()->BuildLoadReportSnapshotLocked(
+ *lrs_channel()->server_, send_all_clusters_, cluster_names_);
+ // Skip client load report if the counters were all zero in the last
+ // report and they are still zero in this one.
+ const bool old_val = last_report_counters_were_zero_;
+ last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
+ if (old_val && last_report_counters_were_zero_) {
+ MaybeScheduleNextReportLocked();
+ return;
+ }
+ // Send a request that contains the snapshot.
+ std::string serialized_payload =
+ lrs_client()->CreateLrsRequest(std::move(snapshot));
+ SendMessageLocked(std::move(serialized_payload));
+}
+
+void LrsClient::LrsChannel::LrsCall::SendMessageLocked(std::string payload) {
+ send_message_pending_ = true;
+ streaming_call_->SendMessage(std::move(payload));
+}
+
+void LrsClient::LrsChannel::LrsCall::OnRequestSent() {
+ MutexLock lock(&lrs_client()->mu_);
+ send_message_pending_ = false;
+ if (IsCurrentCallOnChannel()) MaybeScheduleNextReportLocked();
+}
+
+void LrsClient::LrsChannel::LrsCall::OnRecvMessage(absl::string_view payload) {
+ MutexLock lock(&lrs_client()->mu_);
+ // If we're no longer the current call, ignore the result.
+ if (!IsCurrentCallOnChannel()) return;
+ // Restart the recv operation no matter which branch below returns early.
+ auto cleanup = absl::MakeCleanup(
+ [call = streaming_call_.get()]() { call->StartRecvMessage(); });
+ // Parse the response.
+ bool send_all_clusters = false;
+ std::set new_cluster_names;
+ Duration new_load_reporting_interval;
+ absl::Status status = lrs_client()->ParseLrsResponse(
+ payload, &send_all_clusters, &new_cluster_names,
+ &new_load_reporting_interval);
+ if (!status.ok()) {
+ LOG(ERROR) << "[lrs_client " << lrs_client() << "] lrs server "
+ << lrs_channel()->server_->server_uri()
+ << ": LRS response parsing failed: " << status;
+ return;
+ }
+ seen_response_ = true;
+ if (GRPC_TRACE_FLAG_ENABLED(xds_client)) {
+ LOG(INFO) << "[lrs_client " << lrs_client() << "] lrs server "
+ << lrs_channel()->server_->server_uri()
+ << ": LRS response received, " << new_cluster_names.size()
+ << " cluster names, send_all_clusters=" << send_all_clusters
+ << ", load_report_interval="
+ << new_load_reporting_interval.millis() << "ms";
+ size_t i = 0;
+ for (const auto& name : new_cluster_names) {
+ LOG(INFO) << "[lrs_client " << lrs_client() << "] cluster_name " << i++
+ << ": " << name;
+ }
+ }
+ if (new_load_reporting_interval <
+ Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS)) {
+ new_load_reporting_interval =
+ Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_client() << "] lrs server "
+ << lrs_channel()->server_->server_uri()
+ << ": increased load_report_interval to minimum value "
+ << GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS << "ms";
+ }
+ // Ignore identical update.
+ if (send_all_clusters == send_all_clusters_ &&
+ cluster_names_ == new_cluster_names &&
+ load_reporting_interval_ == new_load_reporting_interval) {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_client() << "] lrs server "
+ << lrs_channel()->server_->server_uri()
+ << ": incoming LRS response identical to current, ignoring.";
+ return;
+ }
+ // If the interval has changed, we'll need to restart the timer below.
+ const bool restart_timer =
+ load_reporting_interval_ != new_load_reporting_interval;
+ // Record the new config.
+ send_all_clusters_ = send_all_clusters;
+ cluster_names_ = std::move(new_cluster_names);
+ load_reporting_interval_ = new_load_reporting_interval;
+ // Restart timer if needed.
+ if (restart_timer) {
+ timer_.reset();
+ MaybeScheduleNextReportLocked();
+ }
+}
+
+void LrsClient::LrsChannel::LrsCall::OnStatusReceived(absl::Status status) {
+ MutexLock lock(&lrs_client()->mu_);
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << lrs_client() << "] lrs server "
+ << lrs_channel()->server_->server_uri()
+ << ": LRS call status received (lrs_channel=" << lrs_channel()
+ << ", lrs_call=" << this << ", streaming_call=" << streaming_call_.get()
+ << "): " << status;
+ // Ignore status from a stale call.
+ if (IsCurrentCallOnChannel()) {
+ // Try to restart the call.
+ retryable_call_->OnCallFinishedLocked();
+ }
+}
+
+bool LrsClient::LrsChannel::LrsCall::IsCurrentCallOnChannel() const {
+ // If the retryable LRS call is null (which only happens when the lrs
+ // channel is shutting down), all the LRS calls are stale.
+ if (lrs_channel()->lrs_call_ == nullptr) return false;
+ return this == lrs_channel()->lrs_call_->call();
+}
+
+//
+// LrsClient
+//
+
+LrsClient::LrsClient(
+ std::shared_ptr bootstrap, std::string user_agent_name,
+ std::string user_agent_version,
+ RefCountedPtr transport_factory,
+ std::shared_ptr engine)
+ : DualRefCounted(
+ GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) ? "LrsClient" : nullptr),
+ bootstrap_(std::move(bootstrap)),
+ user_agent_name_(std::move(user_agent_name)),
+ user_agent_version_(std::move(user_agent_version)),
+ transport_factory_(std::move(transport_factory)),
+ engine_(std::move(engine)) {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << this << "] creating lrs client";
+}
+
+LrsClient::~LrsClient() {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << this << "] destroying lrs client";
+}
+
+void LrsClient::Orphaned() {
+ GRPC_TRACE_LOG(xds_client, INFO)
+ << "[lrs_client " << this << "] shutting down lrs client";
+ MutexLock lock(&mu_);
+ // We may still be sending lingering queued load report data, so don't
+ // just clear the load reporting map, but we do want to clear the refs
+ // we're holding to the LrsChannel objects, to make sure that
+ // everything shuts down properly.
+ for (auto& p : load_report_map_) {
+ p.second.lrs_channel.reset(DEBUG_LOCATION, "LrsClient::Orphan()");
+ }
+}
+
+RefCountedPtr LrsClient::GetOrCreateLrsChannelLocked(
+ std::shared_ptr server, const char* reason) {
+ std::string key = server->Key();
+ auto it = lrs_channel_map_.find(key);
+ if (it != lrs_channel_map_.end()) {
+ return it->second->Ref(DEBUG_LOCATION, reason);
+ }
+ // Channel not found, so create a new one.
+ auto lrs_channel = MakeRefCounted(
+ WeakRef(DEBUG_LOCATION, "LrsChannel"), std::move(server));
+ lrs_channel_map_[std::move(key)] = lrs_channel.get();
+ return lrs_channel;
+}
+
+RefCountedPtr LrsClient::AddClusterDropStats(
+ std::shared_ptr lrs_server,
+ absl::string_view cluster_name, absl::string_view eds_service_name) {
+ auto key =
+ std::make_pair(std::string(cluster_name), std::string(eds_service_name));
+ RefCountedPtr cluster_drop_stats;
+ {
+ MutexLock lock(&mu_);
+ // We jump through some hoops here to make sure that the
+ // absl::string_views stored in the ClusterDropStats object point
+ // to the strings in the load_report_map_ keys, so that
+ // they have the same lifetime.
+ auto server_it =
+ load_report_map_.emplace(lrs_server->Key(), LoadReportServer()).first;
+ if (server_it->second.lrs_channel == nullptr) {
+ server_it->second.lrs_channel = GetOrCreateLrsChannelLocked(
+ lrs_server, "load report map (drop stats)");
+ }
+ auto load_report_it = server_it->second.load_report_map
+ .emplace(std::move(key), LoadReportState())
+ .first;
+ LoadReportState& load_report_state = load_report_it->second;
+ if (load_report_state.drop_stats != nullptr) {
+ cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
+ }
+ if (cluster_drop_stats == nullptr) {
+ if (load_report_state.drop_stats != nullptr) {
+ load_report_state.deleted_drop_stats +=
+ load_report_state.drop_stats->GetSnapshotAndReset();
+ }
+ cluster_drop_stats = MakeRefCounted(
+ Ref(DEBUG_LOCATION, "DropStats"), server_it->first /*lrs_server*/,
+ load_report_it->first.first /*cluster_name*/,
+ load_report_it->first.second /*eds_service_name*/);
+ load_report_state.drop_stats = cluster_drop_stats.get();
+ }
+ server_it->second.lrs_channel->MaybeStartLrsCall();
+ }
+ return cluster_drop_stats;
+}
+
+void LrsClient::RemoveClusterDropStats(
+ absl::string_view lrs_server_key, absl::string_view cluster_name,
+ absl::string_view eds_service_name,
+ LrsClient::ClusterDropStats* cluster_drop_stats) {
+ MutexLock lock(&mu_);
+ auto server_it = load_report_map_.find(lrs_server_key);
+ if (server_it == load_report_map_.end()) return;
+ auto load_report_it = server_it->second.load_report_map.find(
+ std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
+ if (load_report_it == server_it->second.load_report_map.end()) return;
+ LoadReportState& load_report_state = load_report_it->second;
+ if (load_report_state.drop_stats == cluster_drop_stats) {
+ // Record final snapshot in deleted_drop_stats, which will be
+ // added to the next load report.
+ load_report_state.deleted_drop_stats +=
+ load_report_state.drop_stats->GetSnapshotAndReset();
+ load_report_state.drop_stats = nullptr;
+ }
+}
+
+// Obtains a ClusterLocalityStats object for reporting per-locality load
+// for (cluster_name, eds_service_name) to the given LRS server.  Reuses
+// the existing live stats object when one is present; otherwise drains
+// the dying object's final counts and registers a fresh one.  Also
+// ensures the LRS channel exists and its call is started.
+// NOTE(review): template arguments below were lost in extraction and have
+// been restored from the surrounding code (see the matching Remove method
+// and BuildLoadReportSnapshotLocked) — confirm against upstream.
+RefCountedPtr<LrsClient::ClusterLocalityStats>
+LrsClient::AddClusterLocalityStats(
+    std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server,
+    absl::string_view cluster_name, absl::string_view eds_service_name,
+    RefCountedPtr<XdsLocalityName> locality) {
+  auto key =
+      std::make_pair(std::string(cluster_name), std::string(eds_service_name));
+  RefCountedPtr<ClusterLocalityStats> cluster_locality_stats;
+  {
+    MutexLock lock(&mu_);
+    // We jump through some hoops here to make sure that the
+    // absl::string_views stored in the ClusterLocalityStats object point
+    // to the strings in the load_report_map_ keys, so that
+    // they have the same lifetime.
+    auto server_it =
+        load_report_map_.emplace(lrs_server->Key(), LoadReportServer()).first;
+    if (server_it->second.lrs_channel == nullptr) {
+      server_it->second.lrs_channel = GetOrCreateLrsChannelLocked(
+          std::move(lrs_server), "load report map (locality stats)");
+    }
+    auto load_report_it = server_it->second.load_report_map
+                              .emplace(std::move(key), LoadReportState())
+                              .first;
+    LoadReportState& load_report_state = load_report_it->second;
+    LoadReportState::LocalityState& locality_state =
+        load_report_state.locality_stats[locality];
+    // Reuse the existing stats object if it still has strong refs.
+    if (locality_state.locality_stats != nullptr) {
+      cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
+    }
+    if (cluster_locality_stats == nullptr) {
+      // The old object (if any) is being destroyed; preserve its final
+      // counts so they appear in the next report, then create a new one.
+      if (locality_state.locality_stats != nullptr) {
+        locality_state.deleted_locality_stats +=
+            locality_state.locality_stats->GetSnapshotAndReset();
+      }
+      cluster_locality_stats = MakeRefCounted<ClusterLocalityStats>(
+          Ref(DEBUG_LOCATION, "LocalityStats"), server_it->first /*lrs_server*/,
+          load_report_it->first.first /*cluster_name*/,
+          load_report_it->first.second /*eds_service_name*/,
+          std::move(locality));
+      locality_state.locality_stats = cluster_locality_stats.get();
+    }
+    server_it->second.lrs_channel->MaybeStartLrsCall();
+  }
+  return cluster_locality_stats;
+}
+
+// Detaches a ClusterLocalityStats object from the load-report map.  Its
+// final snapshot is preserved in deleted_locality_stats so the data is
+// still included in the next load report.
+// NOTE(review): the template argument on `locality` was lost in
+// extraction; restored as RefCountedPtr<XdsLocalityName> to match the
+// locality_stats map key type used elsewhere in this file.
+void LrsClient::RemoveClusterLocalityStats(
+    absl::string_view lrs_server_key, absl::string_view cluster_name,
+    absl::string_view eds_service_name,
+    const RefCountedPtr<XdsLocalityName>& locality,
+    ClusterLocalityStats* cluster_locality_stats) {
+  MutexLock lock(&mu_);
+  auto server_it = load_report_map_.find(lrs_server_key);
+  if (server_it == load_report_map_.end()) return;
+  auto load_report_it = server_it->second.load_report_map.find(
+      std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
+  if (load_report_it == server_it->second.load_report_map.end()) return;
+  LoadReportState& load_report_state = load_report_it->second;
+  auto locality_it = load_report_state.locality_stats.find(locality);
+  if (locality_it == load_report_state.locality_stats.end()) return;
+  LoadReportState::LocalityState& locality_state = locality_it->second;
+  // Only act if this object is still the registered one; a newer stats
+  // object may already have replaced it.
+  if (locality_state.locality_stats == cluster_locality_stats) {
+    // Record final snapshot in deleted_locality_stats, which will be
+    // added to the next load report.
+    locality_state.deleted_locality_stats +=
+        locality_state.locality_stats->GetSnapshotAndReset();
+    locality_state.locality_stats = nullptr;
+  }
+}
+
+// Resets connection backoff on every LRS channel so that the next
+// reconnect attempt happens immediately.
+void LrsClient::ResetBackoff() {
+  MutexLock lock(&mu_);
+  for (auto& channel_entry : lrs_channel_map_) {
+    channel_entry.second->ResetBackoff();
+  }
+}
+
+// Builds a snapshot of all load-report data destined for the given LRS
+// server, resetting the underlying counters.  Entries whose stats
+// objects have all been deleted are garbage-collected here after their
+// final snapshots have been drained.
+// NOTE(review): the template arguments on `clusters` and `locality_name`
+// were lost in extraction and have been restored from the surrounding
+// code — confirm against upstream.
+LrsClient::ClusterLoadReportMap LrsClient::BuildLoadReportSnapshotLocked(
+    const XdsBootstrap::XdsServer& lrs_server, bool send_all_clusters,
+    const std::set<std::string>& clusters) {
+  GRPC_TRACE_LOG(xds_client, INFO)
+      << "[lrs_client " << this << "] start building load report";
+  ClusterLoadReportMap snapshot_map;
+  auto server_it = load_report_map_.find(lrs_server.Key());
+  if (server_it == load_report_map_.end()) return snapshot_map;
+  auto& load_report_map = server_it->second.load_report_map;
+  for (auto load_report_it = load_report_map.begin();
+       load_report_it != load_report_map.end();) {
+    // Cluster key is cluster and EDS service name.
+    const auto& cluster_key = load_report_it->first;
+    LoadReportState& load_report = load_report_it->second;
+    // If the CDS response for a cluster indicates to use LRS but the
+    // LRS server does not say that it wants reports for this cluster,
+    // then we'll have stats objects here whose data we're not going to
+    // include in the load report. However, we still need to clear out
+    // the data from the stats objects, so that if the LRS server starts
+    // asking for the data in the future, we don't incorrectly include
+    // data from previous reporting intervals in that future report.
+    const bool record_stats =
+        send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
+    ClusterLoadReport snapshot;
+    // Aggregate drop stats.
+    snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
+    if (load_report.drop_stats != nullptr) {
+      snapshot.dropped_requests +=
+          load_report.drop_stats->GetSnapshotAndReset();
+      GRPC_TRACE_LOG(xds_client, INFO)
+          << "[lrs_client " << this << "] cluster=" << cluster_key.first
+          << " eds_service_name=" << cluster_key.second
+          << " drop_stats=" << load_report.drop_stats;
+    }
+    // Aggregate locality stats.
+    for (auto it = load_report.locality_stats.begin();
+         it != load_report.locality_stats.end();) {
+      const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
+      auto& locality_state = it->second;
+      ClusterLocalityStats::Snapshot& locality_snapshot =
+          snapshot.locality_stats[locality_name];
+      locality_snapshot = std::move(locality_state.deleted_locality_stats);
+      if (locality_state.locality_stats != nullptr) {
+        locality_snapshot +=
+            locality_state.locality_stats->GetSnapshotAndReset();
+        // Note: .c_str() dropped on the std::string fields for
+        // consistency with the drop-stats log line above (same output).
+        GRPC_TRACE_LOG(xds_client, INFO)
+            << "[lrs_client " << this
+            << "] cluster=" << cluster_key.first
+            << " eds_service_name=" << cluster_key.second
+            << " locality=" << locality_name->human_readable_string().c_str()
+            << " locality_stats=" << locality_state.locality_stats;
+      }
+      // If the only thing left in this entry was final snapshots from
+      // deleted locality stats objects, remove the entry.
+      if (locality_state.locality_stats == nullptr) {
+        it = load_report.locality_stats.erase(it);
+      } else {
+        ++it;
+      }
+    }
+    // Compute load report interval.
+    const Timestamp now = Timestamp::Now();
+    snapshot.load_report_interval = now - load_report.last_report_time;
+    load_report.last_report_time = now;
+    // Record snapshot.
+    if (record_stats) {
+      snapshot_map[cluster_key] = std::move(snapshot);
+    }
+    // If the only thing left in this entry was final snapshots from
+    // deleted stats objects, remove the entry.
+    if (load_report.locality_stats.empty() &&
+        load_report.drop_stats == nullptr) {
+      load_report_it = load_report_map.erase(load_report_it);
+    } else {
+      ++load_report_it;
+    }
+  }
+  return snapshot_map;
+}
+
+namespace {
+
+// Bundles the objects needed while encoding/decoding LRS protos.
+struct LrsApiContext {
+  LrsClient* client;
+  upb_DefPool* def_pool;
+  upb_Arena* arena;
+};
+
+// Logs a text-format dump of an LRS request when verbose xds tracing is
+// enabled.
+void MaybeLogLrsRequest(
+    const LrsApiContext& context,
+    const envoy_service_load_stats_v3_LoadStatsRequest* request) {
+  if (GRPC_TRACE_FLAG_ENABLED(xds_client) && ABSL_VLOG_IS_ON(2)) {
+    const upb_MessageDef* msg_type =
+        envoy_service_load_stats_v3_LoadStatsRequest_getmsgdef(
+            context.def_pool);
+    char buf[10240];
+    // Restored the template argument on reinterpret_cast, which was lost
+    // in extraction: upb_TextEncode takes a const upb_Message*.
+    upb_TextEncode(reinterpret_cast<const upb_Message*>(request), msg_type,
+                   nullptr, 0, buf, sizeof(buf));
+    VLOG(2) << "[lrs_client " << context.client
+            << "] constructed LRS request: " << buf;
+  }
+}
+
+// Serializes the request to its wire format, allocating from the arena.
+std::string SerializeLrsRequest(
+    const LrsApiContext& context,
+    const envoy_service_load_stats_v3_LoadStatsRequest* request) {
+  size_t output_length;
+  char* output = envoy_service_load_stats_v3_LoadStatsRequest_serialize(
+      request, context.arena, &output_length);
+  return std::string(output, output_length);
+}
+
+} // namespace
+
+// Builds the first message on an LRS stream: a LoadStatsRequest carrying
+// only the node identity (no load data yet), serialized to wire format.
+std::string LrsClient::CreateLrsInitialRequest() {
+  upb::Arena arena;
+  const LrsApiContext context = {this, def_pool_.ptr(), arena.ptr()};
+  envoy_service_load_stats_v3_LoadStatsRequest* request =
+      envoy_service_load_stats_v3_LoadStatsRequest_new(arena.ptr());
+  // Fill in the node identity, including the standard user-agent fields.
+  envoy_config_core_v3_Node* node =
+      envoy_service_load_stats_v3_LoadStatsRequest_mutable_node(request,
+                                                                arena.ptr());
+  PopulateXdsNode(bootstrap_->node(), user_agent_name_, user_agent_version_,
+                  node, arena.ptr());
+  // Advertise support for the send_all_clusters field in LRS responses.
+  envoy_config_core_v3_Node_add_client_features(
+      node,
+      upb_StringView_FromString("envoy.lrs.supports_send_all_clusters"),
+      arena.ptr());
+  MaybeLogLrsRequest(context, request);
+  return SerializeLrsRequest(context, request);
+}
+
+namespace {
+
+// Fills one UpstreamLocalityStats proto from a locality-stats snapshot:
+// the locality identity, the four request counters, and the named
+// backend metrics.
+void LocalityStatsPopulate(
+    const LrsApiContext& context,
+    envoy_config_endpoint_v3_UpstreamLocalityStats* output,
+    const XdsLocalityName& locality_name,
+    const LrsClient::ClusterLocalityStats::Snapshot& snapshot) {
+  // Set locality.  Empty region/zone/sub_zone components are omitted
+  // rather than encoded as empty strings.
+  envoy_config_core_v3_Locality* locality =
+      envoy_config_endpoint_v3_UpstreamLocalityStats_mutable_locality(
+          output, context.arena);
+  if (!locality_name.region().empty()) {
+    envoy_config_core_v3_Locality_set_region(
+        locality, StdStringToUpbString(locality_name.region()));
+  }
+  if (!locality_name.zone().empty()) {
+    envoy_config_core_v3_Locality_set_zone(
+        locality, StdStringToUpbString(locality_name.zone()));
+  }
+  if (!locality_name.sub_zone().empty()) {
+    envoy_config_core_v3_Locality_set_sub_zone(
+        locality, StdStringToUpbString(locality_name.sub_zone()));
+  }
+  // Set total counts.
+  envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_successful_requests(
+      output, snapshot.total_successful_requests);
+  envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_requests_in_progress(
+      output, snapshot.total_requests_in_progress);
+  envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_error_requests(
+      output, snapshot.total_error_requests);
+  envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_issued_requests(
+      output, snapshot.total_issued_requests);
+  // Add backend metrics: one EndpointLoadMetricStats entry per named
+  // metric, carrying the finished-request count and summed value.
+  for (const auto& p : snapshot.backend_metrics) {
+    const std::string& metric_name = p.first;
+    const LrsClient::ClusterLocalityStats::BackendMetric& metric_value =
+        p.second;
+    envoy_config_endpoint_v3_EndpointLoadMetricStats* load_metric =
+        envoy_config_endpoint_v3_UpstreamLocalityStats_add_load_metric_stats(
+            output, context.arena);
+    envoy_config_endpoint_v3_EndpointLoadMetricStats_set_metric_name(
+        load_metric, StdStringToUpbString(metric_name));
+    envoy_config_endpoint_v3_EndpointLoadMetricStats_set_num_requests_finished_with_metric(
+        load_metric, metric_value.num_requests_finished_with_metric);
+    envoy_config_endpoint_v3_EndpointLoadMetricStats_set_total_metric_value(
+        load_metric, metric_value.total_metric_value);
+  }
+}
+
+} // namespace
+
+// Builds a LoadStatsRequest proto carrying the given per-cluster load
+// snapshots (drop stats, locality stats, reporting interval) and
+// serializes it to its wire format.
+std::string LrsClient::CreateLrsRequest(
+    ClusterLoadReportMap cluster_load_report_map) {
+  upb::Arena arena;
+  const LrsApiContext context = {this, def_pool_.ptr(), arena.ptr()};
+  // Create a request.
+  envoy_service_load_stats_v3_LoadStatsRequest* request =
+      envoy_service_load_stats_v3_LoadStatsRequest_new(arena.ptr());
+  // One ClusterStats message per (cluster, EDS service name) pair.
+  for (auto& p : cluster_load_report_map) {
+    const std::string& cluster_name = p.first.first;
+    const std::string& eds_service_name = p.first.second;
+    const ClusterLoadReport& load_report = p.second;
+    // Add cluster stats.
+    envoy_config_endpoint_v3_ClusterStats* cluster_stats =
+        envoy_service_load_stats_v3_LoadStatsRequest_add_cluster_stats(
+            request, arena.ptr());
+    // Set the cluster name.
+    envoy_config_endpoint_v3_ClusterStats_set_cluster_name(
+        cluster_stats, StdStringToUpbString(cluster_name));
+    // Set EDS service name, if non-empty.
+    if (!eds_service_name.empty()) {
+      envoy_config_endpoint_v3_ClusterStats_set_cluster_service_name(
+          cluster_stats, StdStringToUpbString(eds_service_name));
+    }
+    // Add locality stats.  NOTE: the inner loop variable shadows the
+    // outer `p`.
+    for (const auto& p : load_report.locality_stats) {
+      const XdsLocalityName& locality_name = *p.first;
+      const auto& snapshot = p.second;
+      envoy_config_endpoint_v3_UpstreamLocalityStats* locality_stats =
+          envoy_config_endpoint_v3_ClusterStats_add_upstream_locality_stats(
+              cluster_stats, arena.ptr());
+      LocalityStatsPopulate(context, locality_stats, locality_name, snapshot);
+    }
+    // Add dropped requests, summing categorized drops into the total as
+    // we go.  NOTE: this inner `p` also shadows the outer loop variable.
+    uint64_t total_dropped_requests = 0;
+    for (const auto& p : load_report.dropped_requests.categorized_drops) {
+      const std::string& category = p.first;
+      const uint64_t count = p.second;
+      envoy_config_endpoint_v3_ClusterStats_DroppedRequests* dropped_requests =
+          envoy_config_endpoint_v3_ClusterStats_add_dropped_requests(
+              cluster_stats, arena.ptr());
+      envoy_config_endpoint_v3_ClusterStats_DroppedRequests_set_category(
+          dropped_requests, StdStringToUpbString(category));
+      envoy_config_endpoint_v3_ClusterStats_DroppedRequests_set_dropped_count(
+          dropped_requests, count);
+      total_dropped_requests += count;
+    }
+    total_dropped_requests += load_report.dropped_requests.uncategorized_drops;
+    // Set total dropped requests.
+    envoy_config_endpoint_v3_ClusterStats_set_total_dropped_requests(
+        cluster_stats, total_dropped_requests);
+    // Set real load report interval.
+    gpr_timespec timespec = load_report.load_report_interval.as_timespec();
+    google_protobuf_Duration* load_report_interval =
+        envoy_config_endpoint_v3_ClusterStats_mutable_load_report_interval(
+            cluster_stats, arena.ptr());
+    google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec);
+    google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec);
+  }
+  MaybeLogLrsRequest(context, request);
+  return SerializeLrsRequest(context, request);
+}
+
+namespace {
+
+// Logs a text-format dump of an LRS response when verbose xds tracing is
+// enabled.
+void MaybeLogLrsResponse(
+    const LrsApiContext& context,
+    const envoy_service_load_stats_v3_LoadStatsResponse* response) {
+  if (GRPC_TRACE_FLAG_ENABLED(xds_client) && ABSL_VLOG_IS_ON(2)) {
+    const upb_MessageDef* msg_type =
+        envoy_service_load_stats_v3_LoadStatsResponse_getmsgdef(
+            context.def_pool);
+    char buf[10240];
+    // Restored the template argument on reinterpret_cast, which was lost
+    // in extraction: upb_TextEncode takes a const upb_Message*.
+    upb_TextEncode(reinterpret_cast<const upb_Message*>(response), msg_type,
+                   nullptr, 0, buf, sizeof(buf));
+    VLOG(2) << "[lrs_client " << context.client
+            << "] received LRS response: " << buf;
+  }
+}
+
+} // namespace
+
+// Parses a serialized LoadStatsResponse.  On success, fills in whether
+// the server wants reports for all clusters (or, otherwise, the specific
+// cluster names) and the requested load reporting interval.  Returns
+// UNAVAILABLE if the bytes cannot be decoded.
+// NOTE(review): the template argument on `cluster_names` was lost in
+// extraction; restored as std::set<std::string> to match the emplace of
+// UpbStringToStdString below.
+absl::Status LrsClient::ParseLrsResponse(absl::string_view encoded_response,
+                                         bool* send_all_clusters,
+                                         std::set<std::string>* cluster_names,
+                                         Duration* load_reporting_interval) {
+  upb::Arena arena;
+  // Decode the response.
+  const envoy_service_load_stats_v3_LoadStatsResponse* decoded_response =
+      envoy_service_load_stats_v3_LoadStatsResponse_parse(
+          encoded_response.data(), encoded_response.size(), arena.ptr());
+  // Parse the response.
+  if (decoded_response == nullptr) {
+    return absl::UnavailableError("Can't decode response.");
+  }
+  const LrsApiContext context = {this, def_pool_.ptr(), arena.ptr()};
+  MaybeLogLrsResponse(context, decoded_response);
+  // Check send_all_clusters.
+  if (envoy_service_load_stats_v3_LoadStatsResponse_send_all_clusters(
+          decoded_response)) {
+    *send_all_clusters = true;
+  } else {
+    // Store the cluster names.
+    size_t size;
+    const upb_StringView* clusters =
+        envoy_service_load_stats_v3_LoadStatsResponse_clusters(decoded_response,
+                                                               &size);
+    for (size_t i = 0; i < size; ++i) {
+      cluster_names->emplace(UpbStringToStdString(clusters[i]));
+    }
+  }
+  // Get the load report interval.
+  const google_protobuf_Duration* load_reporting_interval_duration =
+      envoy_service_load_stats_v3_LoadStatsResponse_load_reporting_interval(
+          decoded_response);
+  *load_reporting_interval = Duration::FromSecondsAndNanoseconds(
+      google_protobuf_Duration_seconds(load_reporting_interval_duration),
+      google_protobuf_Duration_nanos(load_reporting_interval_duration));
+  return absl::OkStatus();
+}
+
+} // namespace grpc_core
diff --git a/src/core/xds/xds_client/lrs_client.h b/src/core/xds/xds_client/lrs_client.h
new file mode 100644
index 0000000000000..7bda6f68799ef
--- /dev/null
+++ b/src/core/xds/xds_client/lrs_client.h
@@ -0,0 +1,358 @@
+//
+// Copyright 2019 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H
+#define GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H
+
+#include
+#include