From 70781b0a987557dbf07c70a3a6e0041c56879927 Mon Sep 17 00:00:00 2001
From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com>
Date: Fri, 26 Jul 2024 13:29:45 -0400
Subject: [PATCH] fix: allow a single node to activate stress relief mode
 during significant load increase (#1256)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Which problem is this PR solving?

Until the trace locality issue is resolved, a single peer can receive a large trace that raises its stress level well above the rest of the cluster's. To address this, we allow an individual refinery to go into stress relief mode when its own stress is too high, even if the cluster's isn't.

## Short description of the changes

- use the max of the individual stress level and the cluster stress level as the overall stress level when calculating stress relief activation and deactivation (see the sketch below)
- record the stress level that determined stress relief activation as the `stress_level` metric
- add tests
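In short, each node now compares the max of its own stress level and the cluster-wide stress level against the activation and deactivation thresholds. Here is a minimal, self-contained sketch of that calculation (illustrative only; the `overallStressLevel` helper and the `main` driver are not part of Refinery, and the real logic lives in `Recalc` in the diff below):

```go
package main

import (
	"fmt"
	"math"
)

// overallStressLevel mirrors the idea in this change: a node acts on the
// max of its own stress level and the cluster-wide level.
func overallStressLevel(individual, cluster uint) uint {
	return uint(math.Max(float64(individual), float64(cluster)))
}

func main() {
	// Thresholds are illustrative; they match the ActivationLevel (80) and
	// DeactivationLevel (65) used in the new test.
	const activateLevel, deactivateLevel uint = 80, 65

	overall := overallStressLevel(90, 20)  // one hot node, otherwise quiet cluster
	fmt.Println(overall >= activateLevel)  // true: this node activates stress relief on its own

	overall = overallStressLevel(10, 70)   // node recovered, cluster still busy
	fmt.Println(overall < deactivateLevel) // false: it stays in stress relief
}
```

Deactivation additionally waits for `MinimumActivationDuration` after the level last exceeded `DeactivationLevel`, which the diff implements via `stayOnUntil`.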
---
 collect/stressRelief.go       | 37 +++++++++++----
 collect/stress_relief_test.go | 87 +++++++++++++++++++++++++++++++++--
 2 files changed, 113 insertions(+), 11 deletions(-)

diff --git a/collect/stressRelief.go b/collect/stressRelief.go
index 587c44f759..cc5fb30c60 100644
--- a/collect/stressRelief.go
+++ b/collect/stressRelief.go
@@ -35,8 +35,8 @@ var _ StressReliever = &MockStressReliever{}
 type MockStressReliever struct {
 	IsStressed              bool
 	SampleDeterministically bool
-	SampleRate              uint
 	ShouldKeep              bool
+	SampleRate              uint
 }
 
 func (m *MockStressReliever) Start() error { return nil }
@@ -99,6 +99,8 @@ type StressRelief struct {
 
 	lock         sync.RWMutex
 	stressLevels map[string]stressReport
+	// only used in tests
+	disableStressLevelReport bool
 }
 
 const StressReliefHealthKey = "stress_relief"
@@ -113,6 +115,7 @@ func (s *StressRelief) Start() error {
 	// register stress level metrics
 	s.RefineryMetrics.Register("cluster_stress_level", "gauge")
 	s.RefineryMetrics.Register("individual_stress_level", "gauge")
+	s.RefineryMetrics.Register("stress_level", "gauge")
 	s.RefineryMetrics.Register("stress_relief_activated", "gauge")
 
 	// We use an algorithms map so that we can name these algorithms, which makes it easier for several things:
@@ -151,19 +154,23 @@ func (s *StressRelief) Start() error {
 
 	// start our monitor goroutine that periodically calls recalc
 	// and also reports that it's healthy
+
 	go func(s *StressRelief) {
 		// only publish stress level if it has changed or if it's been a while since the last publish
+		if s.disableStressLevelReport {
+			return
+		}
 		const maxTicksBetweenReports = 30
 		var (
 			lastLevel   uint = 0
 			tickCounter      = 0
 		)
 
-		tick := time.NewTicker(100 * time.Millisecond)
+		tick := s.Clock.NewTicker(100 * time.Millisecond)
 		defer tick.Stop()
 		for {
 			select {
-			case <-tick.C:
+			case <-tick.Chan():
 				currentLevel := s.Recalc()
 
 				if lastLevel != currentLevel || tickCounter == maxTicksBetweenReports {
@@ -390,7 +397,7 @@ func (s *StressRelief) Recalc() uint {
 			formula = fmt.Sprintf("%s(%v/%v)=%v", c.Algorithm, c.Numerator, c.Denominator, stress)
 		}
 	}
-	s.Logger.Debug().WithField("stress_level", maximumLevel).WithField("stress_formula", s.formula).WithField("reason", reason).Logf("calculated stress level")
+	s.Logger.Debug().WithField("individual_stress_level", maximumLevel).WithField("stress_formula", s.formula).WithField("reason", reason).Logf("calculated stress level")
 
 	s.RefineryMetrics.Gauge("individual_stress_level", float64(maximumLevel))
 	localLevel := uint(maximumLevel)
@@ -401,7 +408,11 @@ func (s *StressRelief) Recalc() uint {
 
 	s.lock.Lock()
 	defer s.lock.Unlock()
-	s.overallStressLevel = clusterStressLevel
+	// The overall stress level is the max of the individual and cluster stress levels
+	// If a single node is under significant stress, it can activate stress relief mode
+	s.overallStressLevel = uint(math.Max(float64(clusterStressLevel), float64(localLevel)))
+	s.RefineryMetrics.Gauge("stress_level", s.overallStressLevel)
+
 	s.reason = reason
 	s.formula = formula
 
@@ -414,18 +425,28 @@ func (s *StressRelief) Recalc() uint {
 
 	// If it's off, should we activate it?
 	if !s.stressed && s.overallStressLevel >= s.activateLevel {
 		s.stressed = true
-		s.Logger.Warn().WithField("cluster_stress_level", s.overallStressLevel).WithField("stress_formula", s.formula).WithField("reason", s.reason).Logf("StressRelief has been activated")
+		s.Logger.Warn().WithFields(map[string]interface{}{
+			"individual_stress_level": localLevel,
+			"cluster_stress_level":    clusterStressLevel,
+			"stress_level":            s.overallStressLevel,
+			"stress_formula":          s.formula,
+			"reason":                  s.reason,
+		}).Logf("StressRelief has been activated")
 	}
 
 	// We want make sure that stress relief is below the deactivate level
 	// for a minimum time after the last time we said it should be, so
 	// whenever it's above that value we push the time out.
 	if s.stressed && s.overallStressLevel >= s.deactivateLevel {
-		s.stayOnUntil = time.Now().Add(s.minDuration)
+		s.stayOnUntil = s.Clock.Now().Add(s.minDuration)
 	}
 	// If it's on, should we deactivate it?
 	if s.stressed && s.overallStressLevel < s.deactivateLevel && s.Clock.Now().After(s.stayOnUntil) {
 		s.stressed = false
-		s.Logger.Warn().WithField("cluster_stress_level", s.overallStressLevel).Logf("StressRelief has been deactivated")
+		s.Logger.Warn().WithFields(map[string]interface{}{
+			"individual_stress_level": localLevel,
+			"cluster_stress_level":    clusterStressLevel,
+			"stress_level":            s.overallStressLevel,
+		}).Logf("StressRelief has been deactivated")
 	}
 }
diff --git a/collect/stress_relief_test.go b/collect/stress_relief_test.go
index 8a8ee99ced..73874a3380 100644
--- a/collect/stress_relief_test.go
+++ b/collect/stress_relief_test.go
@@ -15,6 +15,7 @@ import (
 	"github.com/honeycombio/refinery/metrics"
 	"github.com/honeycombio/refinery/pubsub"
 	"github.com/jonboulle/clockwork"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -101,21 +102,22 @@ func TestStressRelief_Peer(t *testing.T) {
 	require.Eventually(t, func() bool {
 		clock.Advance(time.Second * 1)
 		return sr.Stressed()
-	}, 2*time.Second, 100*time.Millisecond, "stress relief should be false")
+	}, 2*time.Second, 100*time.Millisecond, "stress relief should be active")
 
 	require.Eventually(t, func() bool {
 		// pretend another refinery just started up
 		msg := stressReliefMessage{
-			level:  90,
+			level:  10,
 			peerID: "peer1",
 		}
 		require.NoError(t, channel.Publish(context.Background(), stressReliefTopic, msg.String()))
 		clock.Advance(time.Second * 1)
 		return sr.Stressed()
-	}, 2*time.Second, 100*time.Millisecond, "stress relief should be false")
+	}, 2*time.Second, 100*time.Millisecond, "stress relief should remain activated")
 
 	// now the peer has reported valid stress level
 	// it should be taken into account for the overall stress level
+	sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 5)
 	require.Eventually(t, func() bool {
 		msg := stressReliefMessage{
 			level:  10,
@@ -128,6 +130,85 @@ func TestStressRelief_Peer(t *testing.T) {
 	}, 2*time.Second, 100*time.Millisecond, "stress relief should be false")
 }
 
+func TestStressRelief_OverallStressLevel(t *testing.T) {
+	clock := clockwork.NewFakeClock()
+	sr, stop := newStressRelief(t, clock, nil)
+	defer stop()
+
+	// disable the automatic stress level recalculation
+	sr.disableStressLevelReport = true
+	sr.Start()
+
+	sr.RefineryMetrics.Register("collector_incoming_queue_length", "gauge")
+
+	sr.RefineryMetrics.Store("INCOMING_CAP", 1200)
+
+	cfg := config.StressReliefConfig{
+		Mode:                      "monitor",
+		ActivationLevel:           80,
+		DeactivationLevel:         65,
+		MinimumActivationDuration: config.Duration(5 * time.Second),
+	}
+
+	// On startup, the stress relief should not be active
+	sr.UpdateFromConfig(cfg)
+	require.False(t, sr.Stressed())
+
+	// Test 1
+	// when a single peer's individual stress level is above the activation level
+	// the overall stress level should be above the activation level
+	// and the stress relief should be active
+	sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 965)
+	clock.Advance(time.Second * 1)
+	sr.stressLevels = make(map[string]stressReport, 100)
+	for i := 0; i < 100; i++ {
+		key := fmt.Sprintf("peer%d", i)
+		sr.stressLevels[key] = stressReport{
+			key:       key,
+			level:     10,
+			timestamp: sr.Clock.Now(),
+		}
+	}
+
+	localLevel := sr.Recalc()
+	require.Equal(t, localLevel, sr.overallStressLevel)
+	require.True(t, sr.stressed)
+
+	// Test 2
+	// when a single peer's individual stress level is below the activation level
+	// and the rest of the cluster is above the activation level
+	// the single peer should remain in stress relief mode
+	sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 10)
+	for i := 0; i < 100; i++ {
+		key := fmt.Sprintf("peer%d", i)
+		sr.stressLevels[key] = stressReport{
+			key:       key,
+			level:     85,
+			timestamp: sr.Clock.Now(),
+		}
+	}
+	localLevel = sr.Recalc()
+	require.Greater(t, sr.overallStressLevel, localLevel)
+	require.True(t, sr.stressed)
+
+	// Test 3
+	// Only when both the single peer's individual stress level and the cluster stress
+	// level is below the activation level, the stress relief should be deactivated.
+	sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 10)
+	for i := 0; i < 100; i++ {
+		key := fmt.Sprintf("peer%d", i)
+		sr.stressLevels[key] = stressReport{
+			key:       key,
+			level:     1,
+			timestamp: sr.Clock.Now(),
+		}
+	}
+	clock.Advance(sr.minDuration * 2)
+	localLevel = sr.Recalc()
+	assert.Equal(t, sr.overallStressLevel, localLevel)
+	assert.False(t, sr.stressed)
+}
+
 // TestStressRelief_Sample tests that traces are sampled deterministically
 // by traceID.
 // The test generates 10000 traceIDs and checks that the sampling rate is