Skip to content

Commit

Permalink
Merge branch 'master' into local_creds
Browse files Browse the repository at this point in the history
  • Loading branch information
erm-g committed Aug 9, 2024
2 parents a69f4b4 + d7861e8 commit f64532e
Show file tree
Hide file tree
Showing 30 changed files with 378 additions and 547 deletions.
2 changes: 1 addition & 1 deletion grpc.def

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion include/grpc/support/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ typedef enum gpr_log_severity {
GPRAPI void gpr_log(const char* file, int line, gpr_log_severity severity,
const char* format, ...) GPR_PRINT_FORMAT_CHECK(4, 5);

GPRAPI int gpr_should_log(gpr_log_severity severity);
/** Deprecated. **/
GPRAPI int absl_vlog2_enabled();

GPRAPI void gpr_log_verbosity_init(void);

Expand Down
2 changes: 1 addition & 1 deletion src/core/ext/transport/binder/wire_format/wire_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ void WireWriterImpl::TryScheduleTransaction() {
} else {
// It is common to fill `kFlowControlWindowSize` completely because
// transactions are send at faster rate than the other end of transport
// can handle it, so here we use `GPR_DEBUG` log level.
// can handle it, so here we use VLOG(2).
VLOG(2) << "Some work cannot be scheduled yet due to slow ack from the "
"other end of transport. This transport might be blocked if "
"this number don't go down. pending_outgoing_tx_.size() = "
Expand Down
69 changes: 30 additions & 39 deletions src/core/load_balancing/grpclb/grpclb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1174,17 +1174,16 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
if (response.client_stats_report_interval != Duration::Zero()) {
client_stats_report_interval_ = std::max(
Duration::Seconds(1), response.client_stats_report_interval);
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Received initial LB response message; client load "
"reporting interval = "
<< client_stats_report_interval_.millis()
<< " milliseconds";
}
} else if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Received initial LB response message; client load "
"reporting NOT enabled";
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Received initial LB response message; client load "
"reporting interval = "
<< client_stats_report_interval_.millis() << " milliseconds";
} else {
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Received initial LB response message; client load "
"reporting NOT enabled";
}
seen_initial_response_ = true;
break;
Expand All @@ -1193,13 +1192,11 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
CHECK_NE(lb_call_, nullptr);
auto serverlist_wrapper =
MakeRefCounted<Serverlist>(std::move(response.serverlist));
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Serverlist with "
<< serverlist_wrapper->serverlist().size()
<< " servers received:\n"
<< serverlist_wrapper->AsText();
}
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Serverlist with " << serverlist_wrapper->serverlist().size()
<< " servers received:\n"
<< serverlist_wrapper->AsText();
seen_serverlist_ = true;
// Start sending client load report only after we start using the
// serverlist returned from the current LB call.
Expand All @@ -1213,11 +1210,10 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked() {
// Check if the serverlist differs from the previous one.
if (grpclb_policy()->serverlist_ != nullptr &&
*grpclb_policy()->serverlist_ == *serverlist_wrapper) {
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Incoming server list identical to current, "
"ignoring.";
}
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << grpclb_policy() << "] lb_calld=" << this
<< ": Incoming server list identical to current, "
"ignoring.";
} else { // New serverlist.
// Dispose of the fallback.
// TODO(roth): Ideally, we should stay in fallback mode until we
Expand Down Expand Up @@ -1457,11 +1453,10 @@ GrpcLb::GrpcLb(Args args)
GRPC_ARG_GRPCLB_SUBCHANNEL_CACHE_INTERVAL_MS)
.value_or(Duration::Milliseconds(
GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS)))) {
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << this << "] Will use '"
<< std::string(channel_control_helper()->GetAuthority())
<< "' as the server name for LB request.";
}
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << this << "] Will use '"
<< std::string(channel_control_helper()->GetAuthority())
<< "' as the server name for LB request.";
}

void GrpcLb::ShutdownLocked() {
Expand Down Expand Up @@ -1542,9 +1537,7 @@ class GrpcLb::NullLbTokenEndpointIterator final
};

absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << this << "] received update";
}
GRPC_TRACE_LOG(glb, INFO) << "[grpclb " << this << "] received update";
const bool is_initial_update = lb_channel_ == nullptr;
config_ = args.config.TakeAsSubclass<GrpcLbConfig>();
CHECK(config_ != nullptr);
Expand Down Expand Up @@ -1656,11 +1649,10 @@ void GrpcLb::StartBalancerCallLocked() {
// Init the LB call data.
CHECK(lb_calld_ == nullptr);
lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << this
<< "] Query for backends (lb_channel: " << lb_channel_.get()
<< ", lb_calld: " << lb_calld_.get() << ")";
}
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << this
<< "] Query for backends (lb_channel: " << lb_channel_.get()
<< ", lb_calld: " << lb_calld_.get() << ")";
lb_calld_->StartQuery();
}

Expand Down Expand Up @@ -1695,9 +1687,8 @@ void GrpcLb::StartBalancerCallRetryTimerLocked() {
void GrpcLb::OnBalancerCallRetryTimerLocked() {
lb_call_retry_timer_handle_.reset();
if (!shutting_down_ && lb_calld_ == nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(glb)) {
LOG(INFO) << "[grpclb " << this << "] Restarting call to LB server";
}
GRPC_TRACE_LOG(glb, INFO)
<< "[grpclb " << this << "] Restarting call to LB server";
StartBalancerCallLocked();
}
}
Expand Down
48 changes: 20 additions & 28 deletions src/core/load_balancing/health_check_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,9 @@ void HealthProducer::HealthChecker::OnConnectivityStateChangeLocked(

void HealthProducer::HealthChecker::NotifyWatchersLocked(
grpc_connectivity_state state, absl::Status status) {
if (GRPC_TRACE_FLAG_ENABLED(health_check_client)) {
LOG(INFO) << "HealthProducer " << producer_.get() << " HealthChecker "
<< this << ": reporting state " << ConnectivityStateName(state)
<< " to watchers";
}
GRPC_TRACE_LOG(health_check_client, INFO)
<< "HealthProducer " << producer_.get() << " HealthChecker " << this
<< ": reporting state " << ConnectivityStateName(state) << " to watchers";
work_serializer_->Schedule(
[self = Ref(), state, status = std::move(status)]() {
MutexLock lock(&self->producer_->mu_);
Expand Down Expand Up @@ -285,11 +283,10 @@ class HealthProducer::HealthChecker::HealthStreamEventHandler final
void SetHealthStatusLocked(SubchannelStreamClient* client,
grpc_connectivity_state state,
const char* reason) {
if (GRPC_TRACE_FLAG_ENABLED(health_check_client)) {
LOG(INFO) << "HealthCheckClient " << client
<< ": setting state=" << ConnectivityStateName(state)
<< " reason=" << reason;
}
GRPC_TRACE_LOG(health_check_client, INFO)
<< "HealthCheckClient " << client
<< ": setting state=" << ConnectivityStateName(state)
<< " reason=" << reason;
health_checker_->OnHealthWatchStatusChange(
state, state == GRPC_CHANNEL_TRANSIENT_FAILURE
? absl::UnavailableError(reason)
Expand All @@ -300,11 +297,9 @@ class HealthProducer::HealthChecker::HealthStreamEventHandler final
};

void HealthProducer::HealthChecker::StartHealthStreamLocked() {
if (GRPC_TRACE_FLAG_ENABLED(health_check_client)) {
LOG(INFO) << "HealthProducer " << producer_.get() << " HealthChecker "
<< this << ": creating HealthClient for \""
<< health_check_service_name_ << "\"";
}
GRPC_TRACE_LOG(health_check_client, INFO)
<< "HealthProducer " << producer_.get() << " HealthChecker " << this
<< ": creating HealthClient for \"" << health_check_service_name_ << "\"";
stream_client_ = MakeOrphanable<SubchannelStreamClient>(
producer_->connected_subchannel_, producer_->subchannel_->pollset_set(),
std::make_unique<HealthStreamEventHandler>(Ref()),
Expand Down Expand Up @@ -356,9 +351,8 @@ void HealthProducer::Start(RefCountedPtr<Subchannel> subchannel) {
}

void HealthProducer::Orphaned() {
if (GRPC_TRACE_FLAG_ENABLED(health_check_client)) {
LOG(INFO) << "HealthProducer " << this << ": shutting down";
}
GRPC_TRACE_LOG(health_check_client, INFO)
<< "HealthProducer " << this << ": shutting down";
{
MutexLock lock(&mu_);
health_checkers_.clear();
Expand Down Expand Up @@ -406,11 +400,10 @@ void HealthProducer::RemoveWatcher(

void HealthProducer::OnConnectivityStateChange(grpc_connectivity_state state,
const absl::Status& status) {
if (GRPC_TRACE_FLAG_ENABLED(health_check_client)) {
LOG(INFO) << "HealthProducer " << this
<< ": subchannel state update: state="
<< ConnectivityStateName(state) << " status=" << status;
}
GRPC_TRACE_LOG(health_check_client, INFO)
<< "HealthProducer " << this
<< ": subchannel state update: state=" << ConnectivityStateName(state)
<< " status=" << status;
MutexLock lock(&mu_);
state_ = state;
status_ = status;
Expand All @@ -432,11 +425,10 @@ void HealthProducer::OnConnectivityStateChange(grpc_connectivity_state state,
//

HealthWatcher::~HealthWatcher() {
if (GRPC_TRACE_FLAG_ENABLED(health_check_client)) {
LOG(INFO) << "HealthWatcher " << this << ": unregistering from producer "
<< producer_.get() << " (health_check_service_name=\""
<< health_check_service_name_.value_or("N/A") << "\")";
}
GRPC_TRACE_LOG(health_check_client, INFO)
<< "HealthWatcher " << this << ": unregistering from producer "
<< producer_.get() << " (health_check_service_name=\""
<< health_check_service_name_.value_or("N/A") << "\")";
if (producer_ != nullptr) {
producer_->RemoveWatcher(this, health_check_service_name_);
}
Expand Down
63 changes: 26 additions & 37 deletions src/core/load_balancing/outlier_detection/outlier_detection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,10 @@ OutlierDetectionLb::Picker::Picker(OutlierDetectionLb* outlier_detection_lb,
RefCountedPtr<SubchannelPicker> picker,
bool counting_enabled)
: picker_(std::move(picker)), counting_enabled_(counting_enabled) {
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << outlier_detection_lb
<< "] constructed new picker " << this << " and counting "
<< "is " << (counting_enabled ? "enabled" : "disabled");
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << outlier_detection_lb
<< "] constructed new picker " << this << " and counting "
<< "is " << (counting_enabled ? "enabled" : "disabled");
}

LoadBalancingPolicy::PickResult OutlierDetectionLb::Picker::Pick(
Expand Down Expand Up @@ -574,9 +573,8 @@ LoadBalancingPolicy::PickResult OutlierDetectionLb::Picker::Pick(

OutlierDetectionLb::OutlierDetectionLb(Args args)
: LoadBalancingPolicy(std::move(args)) {
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << this << "] created";
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << this << "] created";
}

OutlierDetectionLb::~OutlierDetectionLb() {
Expand All @@ -586,9 +584,8 @@ OutlierDetectionLb::~OutlierDetectionLb() {
}

void OutlierDetectionLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << this << "] shutting down";
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << this << "] shutting down";
ejection_timer_.reset();
shutting_down_ = true;
// Remove the child policy's interested_parties pollset_set from the
Expand All @@ -612,9 +609,8 @@ void OutlierDetectionLb::ResetBackoffLocked() {
}

absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << this << "] Received update";
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << this << "] Received update";
auto old_config = std::move(config_);
// Update config.
config_ = args.config.TakeAsSubclass<OutlierDetectionLbConfig>();
Expand All @@ -627,9 +623,8 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
ejection_timer_.reset();
} else if (ejection_timer_ == nullptr) {
// No timer running. Start it now.
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << this << "] starting timer";
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << this << "] starting timer";
ejection_timer_ = MakeOrphanable<EjectionTimer>(
RefAsSubclass<OutlierDetectionLb>(), Timestamp::Now());
for (const auto& p : endpoint_state_map_) {
Expand Down Expand Up @@ -687,11 +682,9 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
key, MakeRefCounted<EndpointState>(std::move(subchannels)));
} else if (!config_->CountingEnabled()) {
// If counting is not enabled, reset state.
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << this
<< "] counting disabled; disabling ejection for "
<< key.ToString();
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << this
<< "] counting disabled; disabling ejection for " << key.ToString();
it->second->DisableEjection();
}
});
Expand Down Expand Up @@ -931,17 +924,14 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
const double success_rate_stdev_factor =
static_cast<double>(config.success_rate_ejection->stdev_factor) / 1000;
double ejection_threshold = mean - stdev * success_rate_stdev_factor;
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << parent_.get()
<< "] stdev=" << stdev
<< ", ejection_threshold=" << ejection_threshold;
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << parent_.get() << "] stdev=" << stdev
<< ", ejection_threshold=" << ejection_threshold;
for (auto& candidate : success_rate_ejection_candidates) {
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << parent_.get()
<< "] checking candidate " << candidate.first
<< ": success_rate=" << candidate.second;
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << parent_.get()
<< "] checking candidate " << candidate.first
<< ": success_rate=" << candidate.second;
if (candidate.second < ejection_threshold) {
uint32_t random_key = absl::Uniform(bit_gen_, 1, 100);
double current_percent =
Expand Down Expand Up @@ -979,11 +969,10 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
<< config.failure_percentage_ejection->enforcement_percentage;
}
for (auto& candidate : failure_percentage_ejection_candidates) {
if (GRPC_TRACE_FLAG_ENABLED(outlier_detection_lb)) {
LOG(INFO) << "[outlier_detection_lb " << parent_.get()
<< "] checking candidate " << candidate.first
<< ": success_rate=" << candidate.second;
}
GRPC_TRACE_LOG(outlier_detection_lb, INFO)
<< "[outlier_detection_lb " << parent_.get()
<< "] checking candidate " << candidate.first
<< ": success_rate=" << candidate.second;
// Extra check to make sure success rate algorithm didn't already
// eject this backend.
if (candidate.first->ejection_time().has_value()) continue;
Expand Down
Loading

0 comments on commit f64532e

Please sign in to comment.