Skip to content

Commit

Permalink
[xDS] add support for xRFC TP3 (grpc#38381)
Browse files Browse the repository at this point in the history
As per gRFC A88 (grpc/proposal#466).

Closes grpc#38381

COPYBARA_INTEGRATE_REVIEW=grpc#38381 from markdroth:xrfc_tp3 8831037
PiperOrigin-RevId: 721842312
  • Loading branch information
markdroth authored and copybara-github committed Jan 31, 2025
1 parent bd94588 commit 01d649d
Show file tree
Hide file tree
Showing 4 changed files with 669 additions and 9 deletions.
141 changes: 139 additions & 2 deletions src/core/xds/xds_client/xds_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ class XdsClient::XdsChannel::AdsCall final
absl::string_view serialized_resource,
DecodeContext* context)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
// Processes one entry of a response's resource_errors field: validates
// the entry and, if it names a resource that has cached state, records
// the error on that resource and notifies its watchers.  idx is the
// entry's index within the field and is used in validation error
// messages.  Results are accumulated in context.
void HandleServerReportedResourceError(size_t idx,
absl::string_view resource_name,
absl::Status status,
DecodeContext* context)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
absl::Status DecodeAdsResponse(absl::string_view encoded_response,
DecodeContext* context)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
Expand Down Expand Up @@ -1056,6 +1061,91 @@ void XdsClient::XdsChannel::AdsCall::ParseResource(
context->read_delay_handle);
}

void XdsClient::XdsChannel::AdsCall::HandleServerReportedResourceError(
size_t idx, absl::string_view resource_name, absl::Status status,
DecodeContext* context) {
std::string error_prefix = absl::StrCat(
"resource_errors index ", idx, ": ",
resource_name.empty() ? "" : absl::StrCat(resource_name, ": "));
if (resource_name.empty()) {
context->errors.emplace_back(
absl::StrCat(error_prefix, "resource_name unset"));
++context->num_invalid_resources;
return;
}
if (status.ok()) {
context->errors.emplace_back(
absl::StrCat(error_prefix, "error_detail must be non-OK"));
++context->num_invalid_resources;
return;
}
// Check the resource name.
auto parsed_resource_name =
xds_client()->ParseXdsResourceName(resource_name, context->type);
if (!parsed_resource_name.ok()) {
context->errors.emplace_back(
absl::StrCat(error_prefix, "Cannot parse xDS resource name"));
++context->num_invalid_resources;
return;
}
// Cancel resource-does-not-exist timer, if needed.
auto timer_it = state_map_.find(context->type);
if (timer_it != state_map_.end()) {
auto it = timer_it->second.subscribed_resources.find(
parsed_resource_name->authority);
if (it != timer_it->second.subscribed_resources.end()) {
auto res_it = it->second.find(parsed_resource_name->key);
if (res_it != it->second.end()) {
res_it->second->MarkSeen();
}
}
}
// Lookup the authority in the cache.
auto authority_it =
xds_client()->authority_state_map_.find(parsed_resource_name->authority);
if (authority_it == xds_client()->authority_state_map_.end()) {
return; // Skip resource -- we don't have a subscription for it.
}
AuthorityState& authority_state = authority_it->second;
// Found authority, so look up type.
auto type_it = authority_state.resource_map.find(context->type);
if (type_it == authority_state.resource_map.end()) {
return; // Skip resource -- we don't have a subscription for it.
}
auto& type_map = type_it->second;
// Found type, so look up resource key.
auto it = type_map.find(parsed_resource_name->key);
if (it == type_map.end()) {
return; // Skip resource -- we don't have a subscription for it.
}
ResourceState& resource_state = it->second;
// If needed, record that we've seen this resource.
if (context->type->AllResourcesRequiredInSotW()) {
context->resources_seen[parsed_resource_name->authority].insert(
parsed_resource_name->key);
}
++context->num_invalid_resources;
// Update cache state.
const bool drop_cached_resource =
xds_channel()->server_.FailOnDataErrors() &&
(status.code() == absl::StatusCode::kNotFound ||
status.code() == absl::StatusCode::kPermissionDenied);
resource_state.SetReceivedError(context->version, std::move(status),
context->update_time, drop_cached_resource);
// If there is no cached resource (either because we didn't have one
// or because we just dropped it due to fail_on_data_errors), then notify
// via OnResourceChanged(); otherwise, notify via OnAmbientError().
if (!resource_state.HasResource()) {
xds_client()->NotifyWatchersOnResourceChanged(
resource_state.failed_status(), resource_state.watchers(),
context->read_delay_handle);
} else {
xds_client()->NotifyWatchersOnAmbientError(resource_state.failed_status(),
resource_state.watchers(),
context->read_delay_handle);
}
}

namespace {

void MaybeLogDiscoveryResponse(
Expand Down Expand Up @@ -1086,7 +1176,8 @@ absl::Status XdsClient::XdsChannel::AdsCall::DecodeAdsResponse(
}
MaybeLogDiscoveryResponse(xds_client(), xds_client()->def_pool_.ptr(),
response);
// Get the type_url, version, nonce, and number of resources.
// Get the type_url, version, nonce, number of resources, and number
// of errors.
context->type_url = std::string(absl::StripPrefix(
UpbStringToAbsl(
envoy_service_discovery_v3_DiscoveryResponse_type_url(response)),
Expand All @@ -1099,12 +1190,18 @@ absl::Status XdsClient::XdsChannel::AdsCall::DecodeAdsResponse(
const google_protobuf_Any* const* resources =
envoy_service_discovery_v3_DiscoveryResponse_resources(response,
&num_resources);
size_t num_errors = 0;
const envoy_service_discovery_v3_ResourceError* const* errors = nullptr;
if (XdsDataErrorHandlingEnabled()) {
errors = envoy_service_discovery_v3_DiscoveryResponse_resource_errors(
response, &num_errors);
}
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << xds_client() << "] xds server "
<< xds_channel()->server_.server_uri()
<< ": received ADS response: type_url=" << context->type_url
<< ", version=" << context->version << ", nonce=" << context->nonce
<< ", num_resources=" << num_resources;
<< ", num_resources=" << num_resources << ", num_errors=" << num_errors;
context->type = xds_client()->GetResourceTypeLocked(context->type_url);
if (context->type == nullptr) {
return absl::InvalidArgumentError(
Expand Down Expand Up @@ -1149,6 +1246,29 @@ absl::Status XdsClient::XdsChannel::AdsCall::DecodeAdsResponse(
}
ParseResource(i, type_url, resource_name, serialized_resource, context);
}
// Process each error.
for (size_t i = 0; i < num_errors; ++i) {
absl::string_view name;
{
const envoy_service_discovery_v3_ResourceName* resource_name =
envoy_service_discovery_v3_ResourceError_resource_name(errors[i]);
if (resource_name != nullptr) {
name = UpbStringToAbsl(
envoy_service_discovery_v3_ResourceName_name(resource_name));
}
}
absl::Status status;
{
const google_rpc_Status* error_detail =
envoy_service_discovery_v3_ResourceError_error_detail(errors[i]);
if (error_detail != nullptr) {
status = absl::Status(
static_cast<absl::StatusCode>(google_rpc_Status_code(error_detail)),
UpbStringToAbsl(google_rpc_Status_message(error_detail)));
}
}
HandleServerReportedResourceError(i, name, std::move(status), context);
}
return absl::OkStatus();
}

Expand Down Expand Up @@ -1323,6 +1443,20 @@ void XdsClient::ResourceState::SetNacked(const std::string& version,
failed_update_time_ = update_time;
}

// Records that the server reported an error for this resource: stores the
// version, status, and time of the failure and sets the client status to
// RECEIVED_ERROR.  When drop_cached_resource is true, the cached resource
// and its serialized proto are discarded as well; otherwise they are left
// untouched.
void XdsClient::ResourceState::SetReceivedError(const std::string& version,
                                                absl::Status status,
                                                Timestamp update_time,
                                                bool drop_cached_resource) {
  // Failure details.
  failed_update_time_ = update_time;
  failed_status_ = std::move(status);
  failed_version_ = version;
  client_status_ = ClientResourceStatus::RECEIVED_ERROR;
  // Keep the cached copy unless the caller asked for it to be dropped.
  if (!drop_cached_resource) return;
  serialized_proto_.clear();
  resource_.reset();
}

void XdsClient::ResourceState::SetDoesNotExistOnLdsOrCdsDeletion(
const std::string& version, Timestamp update_time,
bool drop_cached_resource) {
Expand Down Expand Up @@ -1359,6 +1493,9 @@ absl::string_view XdsClient::ResourceState::CacheStateString() const {
return "acked";
case ClientResourceStatus::NACKED:
return resource_ != nullptr ? "nacked_but_cached" : "nacked";
case ClientResourceStatus::RECEIVED_ERROR:
return resource_ != nullptr ? "received_error_but_cached"
: "received_error";
case ClientResourceStatus::TIMEOUT:
return "timeout";
}
Expand Down
10 changes: 8 additions & 2 deletions src/core/xds/xds_client/xds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,10 @@ class XdsClient : public DualRefCounted<XdsClient> {
ACKED,
// Client received this resource and replied with NACK.
NACKED,
// Server sent an error for the resource.
RECEIVED_ERROR,
// Client encountered timeout getting resource from server.
// TODO(roth): Remove explicit value when adding RECEIVED_ERROR.
TIMEOUT = 6,
TIMEOUT,
};
static_assert(static_cast<ClientResourceStatus>(envoy_admin_v3_REQUESTED) ==
ClientResourceStatus::REQUESTED);
Expand All @@ -291,6 +292,9 @@ class XdsClient : public DualRefCounted<XdsClient> {
ClientResourceStatus::ACKED);
static_assert(static_cast<ClientResourceStatus>(envoy_admin_v3_NACKED) ==
ClientResourceStatus::NACKED);
static_assert(
static_cast<ClientResourceStatus>(envoy_admin_v3_RECEIVED_ERROR) ==
ClientResourceStatus::RECEIVED_ERROR);
static_assert(static_cast<ClientResourceStatus>(envoy_admin_v3_TIMEOUT) ==
ClientResourceStatus::TIMEOUT);

Expand All @@ -308,6 +312,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
Timestamp update_time);
void SetNacked(const std::string& version, absl::string_view details,
Timestamp update_time, bool drop_cached_resource);
// Sets the client status to RECEIVED_ERROR and records the failed
// version, status, and update time.  If drop_cached_resource is true,
// the cached resource is also discarded.
void SetReceivedError(const std::string& version, absl::Status status,
Timestamp update_time, bool drop_cached_resource);
void SetDoesNotExistOnLdsOrCdsDeletion(const std::string& version,
Timestamp update_time,
bool drop_cached_resource);
Expand Down
24 changes: 23 additions & 1 deletion src/proto/grpc/testing/xds/v3/discovery.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,22 @@ message Status {
repeated google.protobuf.Any details = 3;
}

// Specifies a concrete resource name.  Used by ResourceError to identify
// the resource that an error applies to.
message ResourceName {
// The name of the resource.
string name = 1;
}

// An error associated with a specific resource name, returned to the
// client by the server.  Carried in the resource_errors field of
// DiscoveryResponse.
message ResourceError {
// The name of the resource that the error applies to.
ResourceName resource_name = 1;

// The error reported for the resource.
Status error_detail = 2;
}

// [#protodoc-title: Common discovery API components]

// A DiscoveryRequest requests a set of versioned resources of the same type for
Expand Down Expand Up @@ -83,7 +99,7 @@ message DiscoveryRequest {
Status error_detail = 6;
}

// [#next-free-field: 7]
// [#next-free-field: 8]
message DiscoveryResponse {
// The version of the response data.
string version_info = 1;
Expand Down Expand Up @@ -120,6 +136,12 @@ message DiscoveryResponse {
// DiscoveryRequest bearing the nonce. The nonce is optional and is not
// required for non-stream based xDS implementations.
string nonce = 5;

// Errors associated with specific resources. Clients are expected to
// remember the most recent error for a given resource across responses;
// the error condition is not considered to be cleared until a response is
// received that contains the resource in the 'resources' field.
repeated ResourceError resource_errors = 7;
}

// [#next-free-field: 8]
Expand Down
Loading

0 comments on commit 01d649d

Please sign in to comment.