Skip to content

Commit

Permalink
[RlsLB] Fix Deadlock (grpc#37459)
Browse files Browse the repository at this point in the history
Internal bug: b/357864682

A lock-ordering inversion was noticed, with the following stacks:
```
[mutex.cc : 1418] RAW: Potential Mutex deadlock:
        @ 0x564f4ce62fe5 absl::lts_20240116::DebugOnlyDeadlockCheck()
        @ 0x564f4ce632dc absl::lts_20240116::Mutex::Lock()
        @ 0x564f4be5886c absl::lts_20240116::MutexLock::MutexLock()
        @ 0x564f4be968c5 grpc::internal::OpenTelemetryPluginImpl::RemoveCallback()
        @ 0x564f4cd097b8 grpc_core::RegisteredMetricCallback::~RegisteredMetricCallback()
        @ 0x564f4c1f1216 std::default_delete<>::operator()()
        @ 0x564f4c1f157f std::__uniq_ptr_impl<>::reset()
        @ 0x564f4c1ee967 std::unique_ptr<>::reset()
        @ 0x564f4c352f44 grpc_core::GrpcXdsClient::Orphaned()
        @ 0x564f4c25dad1 grpc_core::DualRefCounted<>::Unref()
        @ 0x564f4c4653ed grpc_core::RefCountedPtr<>::reset()
        @ 0x564f4c463c73 grpc_core::XdsClusterDropStats::~XdsClusterDropStats()
        @ 0x564f4c463d02 grpc_core::XdsClusterDropStats::~XdsClusterDropStats()
        @ 0x564f4c25efa9 grpc_core::UnrefDelete::operator()<>()
        @ 0x564f4c25d5f0 grpc_core::RefCounted<>::Unref()
        @ 0x564f4c25c2d9 grpc_core::RefCountedPtr<>::~RefCountedPtr()
        @ 0x564f4c25b1d8 grpc_core::(anonymous namespace)::XdsClusterImplLb::Picker::~Picker()
        @ 0x564f4c25b240 grpc_core::(anonymous namespace)::XdsClusterImplLb::Picker::~Picker()
        @ 0x564f4c12c71a grpc_core::UnrefDelete::operator()<>()
        @ 0x564f4c1292ac grpc_core::DualRefCounted<>::WeakUnref()
        @ 0x564f4c124fb8 grpc_core::DualRefCounted<>::Unref()
        @ 0x564f4c11f029 grpc_core::RefCountedPtr<>::~RefCountedPtr()
        @ 0x564f4c14e958 grpc_core::(anonymous namespace)::OutlierDetectionLb::Picker::~Picker()
        @ 0x564f4c14e980 grpc_core::(anonymous namespace)::OutlierDetectionLb::Picker::~Picker()
        @ 0x564f4c12c71a grpc_core::UnrefDelete::operator()<>()
        @ 0x564f4c1292ac grpc_core::DualRefCounted<>::WeakUnref()
        @ 0x564f4c124fb8 grpc_core::DualRefCounted<>::Unref()
        @ 0x564f4c11f029 grpc_core::RefCountedPtr<>::~RefCountedPtr()
        @ 0x564f4c26bafc std::pair<>::~pair()
        @ 0x564f4c26bb28 __gnu_cxx::new_allocator<>::destroy<>()
        @ 0x564f4c26b88f std::allocator_traits<>::destroy<>()
        @ 0x564f4c26b297 std::_Rb_tree<>::_M_destroy_node()
        @ 0x564f4c26abfb std::_Rb_tree<>::_M_drop_node()
        @ 0x564f4c26a926 std::_Rb_tree<>::_M_erase()
        @ 0x564f4c26a6f0 std::_Rb_tree<>::~_Rb_tree()
        @ 0x564f4c26a62a std::map<>::~map()
        @ 0x564f4c2691a4 grpc_core::(anonymous namespace)::XdsClusterManagerLb::ClusterPicker::~ClusterPicker()
        @ 0x564f4c2691cc grpc_core::(anonymous namespace)::XdsClusterManagerLb::ClusterPicker::~ClusterPicker()
        @ 0x564f4c12c71a grpc_core::UnrefDelete::operator()<>()
        @ 0x564f4c1292ac grpc_core::DualRefCounted<>::WeakUnref()

[mutex.cc : 1428] RAW: Acquiring absl::Mutex 0x564f4f22ad40 while holding  0x7f939834bb70; a cycle in the historical lock ordering graph has been observed
[mutex.cc : 1432] RAW: Cycle:
[mutex.cc : 1446] RAW: mutex@0x564f4f22ad40 stack:
        @ 0x564f4ce62fe5 absl::lts_20240116::DebugOnlyDeadlockCheck()
        @ 0x564f4ce632dc absl::lts_20240116::Mutex::Lock()
        @ 0x564f4be5886c absl::lts_20240116::MutexLock::MutexLock()
        @ 0x564f4be96124 grpc::internal::OpenTelemetryPluginImpl::AddCallback()
        @ 0x564f4cd096f0 grpc_core::RegisteredMetricCallback::RegisteredMetricCallback()
        @ 0x564f4c1f111b std::make_unique<>()
        @ 0x564f4c3564b0 grpc_core::GlobalStatsPluginRegistry::StatsPluginGroup::RegisterCallback<>()
        @ 0x564f4c352dea grpc_core::GrpcXdsClient::GrpcXdsClient()
        @ 0x564f4c355bc6 grpc_core::MakeRefCounted<>()
        @ 0x564f4c3525f2 grpc_core::GrpcXdsClient::GetOrCreate()
        @ 0x564f4c28f8f8 grpc_core::(anonymous namespace)::XdsResolver::StartLocked()
        @ 0x564f4c2f5f82 grpc_core::(anonymous namespace)::GoogleCloud2ProdResolver::StartXdsResolver()
        @ 0x564f4c2f515d grpc_core::(anonymous namespace)::GoogleCloud2ProdResolver::ZoneQueryDone()
        @ 0x564f4c2f496b grpc_core::(anonymous namespace)::GoogleCloud2ProdResolver::StartLocked()::{lambda()#1}::operator()()::{lambda()#1}::operator()()
        @ 0x564f4c2f80f6 std::__invoke_impl<>()
        @ 0x564f4c2f7b9d _ZSt10__invoke_rIvRZZN9grpc_core12_GLOBAL__N_124GoogleCloud2ProdResolver11StartLockedEvENUlNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEN4absl12lts_202401168StatusOrIS8_EEE_clES8_SC_EUlvE_J...
        @ 0x564f4c2f748c std::_Function_handler<>::_M_invoke()
        @ 0x564f4b8ad682 std::function<>::operator()()
        @ 0x564f4cd1c6bf grpc_core::WorkSerializer::LegacyWorkSerializer::Run()
        @ 0x564f4cd1dae4 grpc_core::WorkSerializer::Run()
        @ 0x564f4c2f4b0b grpc_core::(anonymous namespace)::GoogleCloud2ProdResolver::StartLocked()::{lambda()#1}::operator()()
        @ 0x564f4c2f8dc7 absl::lts_20240116::base_internal::Callable::Invoke<>()
        @ 0x564f4c2f8cb8 absl::lts_20240116::base_internal::invoke<>()
        @ 0x564f4c2f8b16 absl::lts_20240116::internal_any_invocable::InvokeR<>()
        @ 0x564f4c2f8a0c absl::lts_20240116::internal_any_invocable::LocalInvoker<>()
        @ 0x564f4c2fb88d absl::lts_20240116::internal_any_invocable::Impl<>::operator()()
        @ 0x564f4c2fb1f3 grpc_core::GcpMetadataQuery::OnDone()
        @ 0x564f4cd75a72 exec_ctx_run()
        @ 0x564f4cd75ba9 grpc_core::ExecCtx::Flush()
        @ 0x564f4cc8ee1d end_worker()
        @ 0x564f4cc8f304 pollset_work()
        @ 0x564f4cc5dcaf pollset_work()
        @ 0x564f4cc69220 grpc_pollset_work()
        @ 0x564f4cbe7733 cq_pluck()
        @ 0x564f4cbe7ad5 grpc_completion_queue_pluck
        @ 0x564f4bc61d96 grpc::CompletionQueue::Pluck()
        @ 0x564f4bfdb055 grpc::ClientReader<>::ClientReader<>()
        @ 0x564f4bfd6035 grpc::internal::ClientReaderFactory<>::Create<>()
        @ 0x564f4bfc322b google::storage::v2::Storage::Stub::ReadObjectRaw()
        @ 0x564f4bf9934b google::storage::v2::Storage::Stub::ReadObject()

[mutex.cc : 1446] RAW: mutex@0x7f939834bb70 stack:
        @ 0x564f4ce62fe5 absl::lts_20240116::DebugOnlyDeadlockCheck()
        @ 0x564f4ce632dc absl::lts_20240116::Mutex::Lock()
        @ 0x564f4be5886c absl::lts_20240116::MutexLock::MutexLock()
        @ 0x564f4c1ce9eb grpc_core::(anonymous namespace)::RlsLb::RlsLb()::{lambda()#1}::operator()()
        @ 0x564f4c1e794c absl::lts_20240116::base_internal::Callable::Invoke<>()
        @ 0x564f4c1e72c1 absl::lts_20240116::base_internal::invoke<>()
        @ 0x564f4c1e6af1 absl::lts_20240116::internal_any_invocable::InvokeR<>()
        @ 0x564f4c1e5d6c absl::lts_20240116::internal_any_invocable::LocalInvoker<>()
        @ 0x564f4be9d0c8 absl::lts_20240116::internal_any_invocable::Impl<>::operator()()
        @ 0x564f4be9b4ff grpc_core::RegisteredMetricCallback::Run()
        @ 0x564f4bea07ae grpc::internal::OpenTelemetryPluginImpl::CallbackGaugeState<>::CallbackGaugeCallback()
        @ 0x564f4bf844de opentelemetry::v1::sdk::metrics::ObservableRegistry::Observe()
        @ 0x564f4bf56529 opentelemetry::v1::sdk::metrics::Meter::Collect()
        @ 0x564f4bf8c1d5 opentelemetry::v1::sdk::metrics::MetricCollector::Collect()::{lambda()#1}::operator()()
        @ 0x564f4bf8c5ac opentelemetry::v1::nostd::function_ref<>::BindTo<>()::{lambda()#1}::operator()()
        @ 0x564f4bf8c5e8 opentelemetry::v1::nostd::function_ref<>::BindTo<>()::{lambda()#1}::_FUN()
        @ 0x564f4bf7604d opentelemetry::v1::nostd::function_ref<>::operator()()
        @ 0x564f4bf74ad9 opentelemetry::v1::sdk::metrics::MeterContext::ForEachMeter()
        @ 0x564f4bf8c457 opentelemetry::v1::sdk::metrics::MetricCollector::Collect()
        @ 0x564f4bf4a7fe opentelemetry::v1::sdk::metrics::MetricReader::Collect()
        @ 0x564f4bed5e24 opentelemetry::v1::exporter::metrics::PrometheusCollector::Collect()
        @ 0x564f4bef004f prometheus::detail::CollectMetrics()
        @ 0x564f4beec26d prometheus::detail::MetricsHandler::handleGet()
        @ 0x564f4bf1cd8b CivetServer::requestHandler()
        @ 0x564f4bf35e7b handle_request
        @ 0x564f4bf29534 handle_request_stat_log
        @ 0x564f4bf39b3f process_new_connection
        @ 0x564f4bf3a448 worker_thread_run
        @ 0x564f4bf3a57f worker_thread
        @ 0x7f93e9137ea7 start_thread

[mutex.cc : 1454] RAW: dying due to potential deadlock
Aborted
```

From these stacks, it looks like we end up holding the `RlsLb` policy's lock while removing a callback from the gRPC OpenTelemetry plugin, which is a lock-ordering inversion. The correct order is `OpenTelemetry` -> `gRPC OpenTelemetry plugin` -> `gRPC component such as RLS/XdsClient`.

A common pattern we employ for metrics is to unregister callbacks when the corresponding component object is orphaned/destroyed (unreffed). Note that removing a callback requires acquiring a lock inside the gRPC OpenTelemetry plugin. To avoid deadlocks, `RlsLb` removes its callback from outside its critical region; however, `RlsLb` also owns refs to child policies, which in turn hold refs to `XdsClient`. The lock-ordering inversion occurred because those child policies were unreffed while still inside the critical region.

This PR is an alternative fix for this problem (the original fix was grpc#37425).
Verified that it fixes the bug.

Closes grpc#37459

COPYBARA_INTEGRATE_REVIEW=grpc#37459 from yashykt:FixDeadlocks ec7fbcf
PiperOrigin-RevId: 663360427
  • Loading branch information
yashykt authored and copybara-github committed Aug 15, 2024
1 parent a09aaf0 commit 7407dbf
Showing 1 changed file with 90 additions and 33 deletions.
123 changes: 90 additions & 33 deletions src/core/load_balancing/rls/rls.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ class RlsLb final : public LoadBalancingPolicy {
// is called after releasing it.
//
// Both methods grab the data they need from the parent object.
void StartUpdate() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
void StartUpdate(OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
absl::Status MaybeFinishUpdate() ABSL_LOCKS_EXCLUDED(&RlsLb::mu_);

void ExitIdleLocked() {
Expand Down Expand Up @@ -397,14 +398,14 @@ class RlsLb final : public LoadBalancingPolicy {
};

// Note: We are forced to disable lock analysis here because
// Orphan() is called by Unref() which is called by RefCountedPtr<>, which
// Orphaned() is called by Unref() which is called by RefCountedPtr<>, which
// cannot have lock annotations for this particular caller.
void Orphaned() override ABSL_NO_THREAD_SAFETY_ANALYSIS;

RefCountedPtr<RlsLb> lb_policy_;
std::string target_;

bool is_shutdown_ = false;
bool is_shutdown_ = false; // Protected by WorkSerializer

OrphanablePtr<ChildPolicyHandler> child_policy_;
RefCountedPtr<LoadBalancingPolicy::Config> pending_config_;
Expand Down Expand Up @@ -503,12 +504,25 @@ class RlsLb final : public LoadBalancingPolicy {
// Returns a list of child policy wrappers on which FinishUpdate()
// needs to be called after releasing the lock.
std::vector<ChildPolicyWrapper*> OnRlsResponseLocked(
ResponseInfo response, std::unique_ptr<BackOff> backoff_state)
ResponseInfo response, std::unique_ptr<BackOff> backoff_state,
OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

// Moves entry to the end of the LRU list.
void MarkUsed() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

// Takes entries from child_policy_wrappers_ and appends them to the end
// of \a child_policy_wrappers.
void TakeChildPolicyWrappers(
std::vector<RefCountedPtr<ChildPolicyWrapper>>* child_policy_wrappers)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_) {
child_policy_wrappers->insert(
child_policy_wrappers->end(),
std::make_move_iterator(child_policy_wrappers_.begin()),
std::make_move_iterator(child_policy_wrappers_.end()));
child_policy_wrappers_.clear();
}

private:
class BackoffTimer final : public InternallyRefCounted<BackoffTimer> {
public:
Expand Down Expand Up @@ -566,19 +580,24 @@ class RlsLb final : public LoadBalancingPolicy {
// the caller. Otherwise, the entry found is returned to the caller. The
// entry returned to the user is considered recently used and its order in
// the LRU list of the cache is updated.
Entry* FindOrInsert(const RequestKey& key)
Entry* FindOrInsert(const RequestKey& key,
std::vector<RefCountedPtr<ChildPolicyWrapper>>*
child_policy_wrappers_to_delete)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

// Resizes the cache. If the new cache size is greater than the current size
// of the cache, do nothing. Otherwise, evict the oldest entries that
// exceed the new size limit of the cache.
void Resize(size_t bytes) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
void Resize(size_t bytes, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
child_policy_wrappers_to_delete)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

// Resets backoff of all the cache entries.
void ResetAllBackoff() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

// Shutdown the cache; clean-up and orphan all the stored cache entries.
void Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
GRPC_MUST_USE_RESULT std::vector<RefCountedPtr<ChildPolicyWrapper>>
Shutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

void ReportMetricsLocked(CallbackMetricReporter& reporter)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);
Expand All @@ -594,7 +613,9 @@ class RlsLb final : public LoadBalancingPolicy {

// Evicts oversized cache elements when the current size is greater than
// the specified limit.
void MaybeShrinkSize(size_t bytes)
void MaybeShrinkSize(size_t bytes,
std::vector<RefCountedPtr<ChildPolicyWrapper>>*
child_policy_wrappers_to_delete)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&RlsLb::mu_);

RlsLb* lb_policy_;
Expand Down Expand Up @@ -857,7 +878,8 @@ absl::optional<Json> InsertOrUpdateChildPolicyField(const std::string& field,
return Json::FromArray(std::move(array));
}

void RlsLb::ChildPolicyWrapper::StartUpdate() {
void RlsLb::ChildPolicyWrapper::StartUpdate(
OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete) {
ValidationErrors errors;
auto child_policy_config = InsertOrUpdateChildPolicyField(
lb_policy_->config_->child_policy_config_target_field_name(), target_,
Expand All @@ -880,7 +902,7 @@ void RlsLb::ChildPolicyWrapper::StartUpdate() {
pending_config_.reset();
picker_ = MakeRefCounted<TransientFailurePicker>(
absl::UnavailableError(config.status().message()));
child_policy_.reset();
*child_policy_to_delete = std::move(child_policy_);
} else {
pending_config_ = std::move(*config);
}
Expand Down Expand Up @@ -934,9 +956,9 @@ void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
<< ": UpdateState(state=" << ConnectivityStateName(state)
<< ", status=" << status << ", picker=" << picker.get() << ")";
}
if (wrapper_->is_shutdown_) return;
{
MutexLock lock(&wrapper_->lb_policy_->mu_);
if (wrapper_->is_shutdown_) return;
// TODO(roth): It looks like this ignores subsequent TF updates that
// might change the status used to fail picks, which seems wrong.
if (wrapper_->connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
Expand All @@ -946,7 +968,8 @@ void RlsLb::ChildPolicyWrapper::ChildPolicyHelper::UpdateState(
wrapper_->connectivity_state_ = state;
DCHECK(picker != nullptr);
if (picker != nullptr) {
wrapper_->picker_ = std::move(picker);
// We want to unref the picker after we release the lock.
wrapper_->picker_.swap(picker);
}
}
wrapper_->lb_policy_->UpdatePickerLocked();
Expand Down Expand Up @@ -1194,18 +1217,19 @@ RlsLb::Cache::Entry::Entry(RefCountedPtr<RlsLb> lb_policy,
lb_policy_->cache_.lru_list_.end(), key)) {}

void RlsLb::Cache::Entry::Orphan() {
// We should be holding RlsLB::mu_.
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_.get() << "] cache entry=" << this << " "
<< lru_iterator_->ToString() << ": cache entry evicted";
is_shutdown_ = true;
lb_policy_->cache_.lru_list_.erase(lru_iterator_);
lru_iterator_ = lb_policy_->cache_.lru_list_.end(); // Just in case.
CHECK(child_policy_wrappers_.empty());
backoff_state_.reset();
if (backoff_timer_ != nullptr) {
backoff_timer_.reset();
lb_policy_->UpdatePickerAsync();
}
child_policy_wrappers_.clear();
Unref(DEBUG_LOCATION, "Orphan");
}

Expand Down Expand Up @@ -1284,7 +1308,8 @@ void RlsLb::Cache::Entry::MarkUsed() {

std::vector<RlsLb::ChildPolicyWrapper*>
RlsLb::Cache::Entry::OnRlsResponseLocked(
ResponseInfo response, std::unique_ptr<BackOff> backoff_state) {
ResponseInfo response, std::unique_ptr<BackOff> backoff_state,
OrphanablePtr<ChildPolicyHandler>* child_policy_to_delete) {
// Move the entry to the end of the LRU list.
MarkUsed();
// If the request failed, store the failed status and update the
Expand Down Expand Up @@ -1345,7 +1370,7 @@ RlsLb::Cache::Entry::OnRlsResponseLocked(
if (it == lb_policy_->child_policy_map_.end()) {
auto new_child = MakeRefCounted<ChildPolicyWrapper>(
lb_policy_.Ref(DEBUG_LOCATION, "ChildPolicyWrapper"), target);
new_child->StartUpdate();
new_child->StartUpdate(child_policy_to_delete);
child_policies_to_finish_update.push_back(new_child.get());
new_child_policy_wrappers.emplace_back(std::move(new_child));
} else {
Expand Down Expand Up @@ -1382,12 +1407,15 @@ RlsLb::Cache::Entry* RlsLb::Cache::Find(const RequestKey& key) {
return it->second.get();
}

RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) {
RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(
const RequestKey& key, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
child_policy_wrappers_to_delete) {
auto it = map_.find(key);
// If not found, create new entry.
if (it == map_.end()) {
size_t entry_size = EntrySizeForKey(key);
MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size));
MaybeShrinkSize(size_limit_ - std::min(size_limit_, entry_size),
child_policy_wrappers_to_delete);
Entry* entry = new Entry(
lb_policy_->RefAsSubclass<RlsLb>(DEBUG_LOCATION, "CacheEntry"), key);
map_.emplace(key, OrphanablePtr<Entry>(entry));
Expand All @@ -1405,11 +1433,13 @@ RlsLb::Cache::Entry* RlsLb::Cache::FindOrInsert(const RequestKey& key) {
return it->second.get();
}

void RlsLb::Cache::Resize(size_t bytes) {
void RlsLb::Cache::Resize(size_t bytes,
std::vector<RefCountedPtr<ChildPolicyWrapper>>*
child_policy_wrappers_to_delete) {
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_ << "] resizing cache to " << bytes << " bytes";
size_limit_ = bytes;
MaybeShrinkSize(size_limit_);
MaybeShrinkSize(size_limit_, child_policy_wrappers_to_delete);
}

void RlsLb::Cache::ResetAllBackoff() {
Expand All @@ -1419,7 +1449,12 @@ void RlsLb::Cache::ResetAllBackoff() {
lb_policy_->UpdatePickerAsync();
}

void RlsLb::Cache::Shutdown() {
std::vector<RefCountedPtr<RlsLb::ChildPolicyWrapper>> RlsLb::Cache::Shutdown() {
std::vector<RefCountedPtr<ChildPolicyWrapper>>
child_policy_wrappers_to_delete;
for (auto& entry : map_) {
entry.second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete);
}
map_.clear();
lru_list_.clear();
if (cleanup_timer_handle_.has_value() &&
Expand All @@ -1429,6 +1464,7 @@ void RlsLb::Cache::Shutdown() {
<< "[rlslb " << lb_policy_ << "] cache cleanup timer canceled";
}
cleanup_timer_handle_.reset();
return child_policy_wrappers_to_delete;
}

void RlsLb::Cache::ReportMetricsLocked(CallbackMetricReporter& reporter) {
Expand Down Expand Up @@ -1464,12 +1500,15 @@ void RlsLb::Cache::StartCleanupTimer() {
void RlsLb::Cache::OnCleanupTimer() {
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << lb_policy_ << "] cache cleanup timer fired";
std::vector<RefCountedPtr<ChildPolicyWrapper>>
child_policy_wrappers_to_delete;
MutexLock lock(&lb_policy_->mu_);
if (!cleanup_timer_handle_.has_value()) return;
if (lb_policy_->is_shutdown_) return;
for (auto it = map_.begin(); it != map_.end();) {
if (GPR_UNLIKELY(it->second->ShouldRemove() && it->second->CanEvict())) {
size_ -= it->second->Size();
it->second->TakeChildPolicyWrappers(&child_policy_wrappers_to_delete);
it = map_.erase(it);
} else {
++it;
Expand All @@ -1483,7 +1522,9 @@ size_t RlsLb::Cache::EntrySizeForKey(const RequestKey& key) {
return (key.Size() * 2) + sizeof(Entry);
}

void RlsLb::Cache::MaybeShrinkSize(size_t bytes) {
void RlsLb::Cache::MaybeShrinkSize(
size_t bytes, std::vector<RefCountedPtr<ChildPolicyWrapper>>*
child_policy_wrappers_to_delete) {
while (size_ > bytes) {
auto lru_it = lru_list_.begin();
if (GPR_UNLIKELY(lru_it == lru_list_.end())) break;
Expand All @@ -1494,6 +1535,7 @@ void RlsLb::Cache::MaybeShrinkSize(size_t bytes) {
<< "[rlslb " << lb_policy_ << "] LRU eviction: removing entry "
<< map_it->second.get() << " " << lru_it->ToString();
size_ -= map_it->second->Size();
map_it->second->TakeChildPolicyWrappers(child_policy_wrappers_to_delete);
map_.erase(map_it);
}
GRPC_TRACE_LOG(rls_lb, INFO)
Expand Down Expand Up @@ -1814,13 +1856,18 @@ void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) {
<< "[rlslb " << lb_policy_.get() << "] rls_request=" << this << " "
<< key_.ToString() << ": response info: " << response.ToString();
std::vector<ChildPolicyWrapper*> child_policies_to_finish_update;
std::vector<RefCountedPtr<ChildPolicyWrapper>>
child_policy_wrappers_to_delete;
OrphanablePtr<ChildPolicyHandler> child_policy_to_delete;
{
MutexLock lock(&lb_policy_->mu_);
if (lb_policy_->is_shutdown_) return;
rls_channel_->ReportResponseLocked(response.status.ok());
Cache::Entry* cache_entry = lb_policy_->cache_.FindOrInsert(key_);
Cache::Entry* cache_entry =
lb_policy_->cache_.FindOrInsert(key_, &child_policy_wrappers_to_delete);
child_policies_to_finish_update = cache_entry->OnRlsResponseLocked(
std::move(response), std::move(backoff_state_));
std::move(response), std::move(backoff_state_),
&child_policy_to_delete);
lb_policy_->request_map_.erase(key_);
}
// Now that we've released the lock, finish the update on any newly
Expand Down Expand Up @@ -1999,6 +2046,9 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
}
}
// Now grab the lock to swap out the state it guards.
std::vector<RefCountedPtr<ChildPolicyWrapper>>
child_policy_wrappers_to_delete;
OrphanablePtr<ChildPolicyHandler> child_policy_to_delete;
{
MutexLock lock(&mu_);
// Swap out RLS channel if needed.
Expand All @@ -2010,19 +2060,20 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
// Resize cache if needed.
if (old_config == nullptr ||
config_->cache_size_bytes() != old_config->cache_size_bytes()) {
cache_.Resize(static_cast<size_t>(config_->cache_size_bytes()));
cache_.Resize(static_cast<size_t>(config_->cache_size_bytes()),
&child_policy_wrappers_to_delete);
}
// Start update of child policies if needed.
if (update_child_policies) {
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << this << "] starting child policy updates";
for (auto& p : child_policy_map_) {
p.second->StartUpdate();
p.second->StartUpdate(&child_policy_to_delete);
}
} else if (created_default_child) {
GRPC_TRACE_LOG(rls_lb, INFO)
<< "[rlslb " << this << "] starting default child policy update";
default_child_policy_->StartUpdate();
default_child_policy_->StartUpdate(&child_policy_to_delete);
}
}
// Now that we've released the lock, finish update of child policies.
Expand Down Expand Up @@ -2097,14 +2148,20 @@ void RlsLb::ResetBackoffLocked() {
void RlsLb::ShutdownLocked() {
GRPC_TRACE_LOG(rls_lb, INFO) << "[rlslb " << this << "] policy shutdown";
registered_metric_callback_.reset();
MutexLock lock(&mu_);
is_shutdown_ = true;
config_.reset(DEBUG_LOCATION, "ShutdownLocked");
RefCountedPtr<ChildPolicyWrapper> child_policy_to_delete;
std::vector<RefCountedPtr<ChildPolicyWrapper>>
child_policy_wrappers_to_delete;
OrphanablePtr<RlsChannel> rls_channel_to_delete;
{
MutexLock lock(&mu_);
is_shutdown_ = true;
config_.reset(DEBUG_LOCATION, "ShutdownLocked");
child_policy_wrappers_to_delete = cache_.Shutdown();
request_map_.clear();
rls_channel_to_delete = std::move(rls_channel_);
child_policy_to_delete = std::move(default_child_policy_);
}
channel_args_ = ChannelArgs();
cache_.Shutdown();
request_map_.clear();
rls_channel_.reset();
default_child_policy_.reset();
}

void RlsLb::UpdatePickerAsync() {
Expand Down

0 comments on commit 7407dbf

Please sign in to comment.