diff --git a/collect/stressRelief.go b/collect/stressRelief.go index 587c44f759..cc5fb30c60 100644 --- a/collect/stressRelief.go +++ b/collect/stressRelief.go @@ -35,8 +35,8 @@ var _ StressReliever = &MockStressReliever{} type MockStressReliever struct { IsStressed bool SampleDeterministically bool - SampleRate uint ShouldKeep bool + SampleRate uint } func (m *MockStressReliever) Start() error { return nil } @@ -99,6 +99,8 @@ type StressRelief struct { lock sync.RWMutex stressLevels map[string]stressReport + // only used in tests + disableStressLevelReport bool } const StressReliefHealthKey = "stress_relief" @@ -113,6 +115,7 @@ func (s *StressRelief) Start() error { // register stress level metrics s.RefineryMetrics.Register("cluster_stress_level", "gauge") s.RefineryMetrics.Register("individual_stress_level", "gauge") + s.RefineryMetrics.Register("stress_level", "gauge") s.RefineryMetrics.Register("stress_relief_activated", "gauge") // We use an algorithms map so that we can name these algorithms, which makes it easier for several things: @@ -151,19 +154,23 @@ func (s *StressRelief) Start() error { // start our monitor goroutine that periodically calls recalc // and also reports that it's healthy + go func(s *StressRelief) { // only publish stress level if it has changed or if it's been a while since the last publish + if s.disableStressLevelReport { + return + } const maxTicksBetweenReports = 30 var ( lastLevel uint = 0 tickCounter = 0 ) - tick := time.NewTicker(100 * time.Millisecond) + tick := s.Clock.NewTicker(100 * time.Millisecond) defer tick.Stop() for { select { - case <-tick.C: + case <-tick.Chan(): currentLevel := s.Recalc() if lastLevel != currentLevel || tickCounter == maxTicksBetweenReports { @@ -390,7 +397,7 @@ func (s *StressRelief) Recalc() uint { formula = fmt.Sprintf("%s(%v/%v)=%v", c.Algorithm, c.Numerator, c.Denominator, stress) } } - s.Logger.Debug().WithField("stress_level", maximumLevel).WithField("stress_formula", 
s.formula).WithField("reason", reason).Logf("calculated stress level") + s.Logger.Debug().WithField("individual_stress_level", maximumLevel).WithField("stress_formula", s.formula).WithField("reason", reason).Logf("calculated stress level") s.RefineryMetrics.Gauge("individual_stress_level", float64(maximumLevel)) localLevel := uint(maximumLevel) @@ -401,7 +408,11 @@ func (s *StressRelief) Recalc() uint { s.lock.Lock() defer s.lock.Unlock() - s.overallStressLevel = clusterStressLevel + // The overall stress level is the max of the individual and cluster stress levels + // If a single node is under significant stress, it can activate stress relief mode + s.overallStressLevel = uint(math.Max(float64(clusterStressLevel), float64(localLevel))) + s.RefineryMetrics.Gauge("stress_level", s.overallStressLevel) + s.reason = reason s.formula = formula @@ -414,18 +425,28 @@ func (s *StressRelief) Recalc() uint { // If it's off, should we activate it? if !s.stressed && s.overallStressLevel >= s.activateLevel { s.stressed = true - s.Logger.Warn().WithField("cluster_stress_level", s.overallStressLevel).WithField("stress_formula", s.formula).WithField("reason", s.reason).Logf("StressRelief has been activated") + s.Logger.Warn().WithFields(map[string]interface{}{ + "individual_stress_level": localLevel, + "cluster_stress_level": clusterStressLevel, + "stress_level": s.overallStressLevel, + "stress_formula": s.formula, + "reason": s.reason, + }).Logf("StressRelief has been activated") } // We want make sure that stress relief is below the deactivate level // for a minimum time after the last time we said it should be, so // whenever it's above that value we push the time out. if s.stressed && s.overallStressLevel >= s.deactivateLevel { - s.stayOnUntil = time.Now().Add(s.minDuration) + s.stayOnUntil = s.Clock.Now().Add(s.minDuration) } // If it's on, should we deactivate it? 
if s.stressed && s.overallStressLevel < s.deactivateLevel && s.Clock.Now().After(s.stayOnUntil) { s.stressed = false - s.Logger.Warn().WithField("cluster_stress_level", s.overallStressLevel).Logf("StressRelief has been deactivated") + s.Logger.Warn().WithFields(map[string]interface{}{ + "individual_stress_level": localLevel, + "cluster_stress_level": clusterStressLevel, + "stress_level": s.overallStressLevel, + }).Logf("StressRelief has been deactivated") } } diff --git a/collect/stress_relief_test.go b/collect/stress_relief_test.go index 8a8ee99ced..73874a3380 100644 --- a/collect/stress_relief_test.go +++ b/collect/stress_relief_test.go @@ -15,6 +15,7 @@ import ( "github.com/honeycombio/refinery/metrics" "github.com/honeycombio/refinery/pubsub" "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -101,21 +102,22 @@ func TestStressRelief_Peer(t *testing.T) { require.Eventually(t, func() bool { clock.Advance(time.Second * 1) return sr.Stressed() - }, 2*time.Second, 100*time.Millisecond, "stress relief should be false") + }, 2*time.Second, 100*time.Millisecond, "stress relief should be active") require.Eventually(t, func() bool { // pretend another refinery just started up msg := stressReliefMessage{ - level: 90, + level: 10, peerID: "peer1", } require.NoError(t, channel.Publish(context.Background(), stressReliefTopic, msg.String())) clock.Advance(time.Second * 1) return sr.Stressed() - }, 2*time.Second, 100*time.Millisecond, "stress relief should be false") + }, 2*time.Second, 100*time.Millisecond, "stress relief should be remain activated") // now the peer has reported valid stress level // it should be taken into account for the overall stress level + sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 5) require.Eventually(t, func() bool { msg := stressReliefMessage{ level: 10, @@ -128,6 +130,85 @@ func TestStressRelief_Peer(t *testing.T) { }, 2*time.Second, 100*time.Millisecond, "stress 
relief should be false") } +func TestStressRelief_OverallStressLevel(t *testing.T) { + clock := clockwork.NewFakeClock() + sr, stop := newStressRelief(t, clock, nil) + defer stop() + + // disable the background monitor goroutine so this test drives Recalc() explicitly + sr.disableStressLevelReport = true + sr.Start() + + sr.RefineryMetrics.Register("collector_incoming_queue_length", "gauge") + + sr.RefineryMetrics.Store("INCOMING_CAP", 1200) + + cfg := config.StressReliefConfig{ + Mode: "monitor", + ActivationLevel: 80, + DeactivationLevel: 65, + MinimumActivationDuration: config.Duration(5 * time.Second), + } + + // On startup, the stress relief should not be active + sr.UpdateFromConfig(cfg) + require.False(t, sr.Stressed()) + + // Test 1 + // when a single peer's individual stress level is above the activation level + // the overall stress level should be above the activation level + // and the stress relief should be active + sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 965) + clock.Advance(time.Second * 1) + sr.stressLevels = make(map[string]stressReport, 100) + for i := 0; i < 100; i++ { + key := fmt.Sprintf("peer%d", i) + sr.stressLevels[key] = stressReport{ + key: key, + level: 10, + timestamp: sr.Clock.Now(), + } + } + + localLevel := sr.Recalc() + require.Equal(t, localLevel, sr.overallStressLevel) + require.True(t, sr.stressed) + + // Test 2 + // when a single peer's individual stress level is below the activation level + // and the rest of the cluster is above the activation level + // the single peer should remain in stress relief mode + sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 10) + for i := 0; i < 100; i++ { + key := fmt.Sprintf("peer%d", i) + sr.stressLevels[key] = stressReport{ + key: key, + level: 85, + timestamp: sr.Clock.Now(), + } + } + localLevel = sr.Recalc() + require.Greater(t, sr.overallStressLevel, localLevel) + require.True(t, sr.stressed) + + // Test 3 + // Only when both the single peer's individual stress level and the cluster 
stress + // level are below the activation level, the stress relief should be deactivated. + sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 10) + for i := 0; i < 100; i++ { + key := fmt.Sprintf("peer%d", i) + sr.stressLevels[key] = stressReport{ + key: key, + level: 1, + timestamp: sr.Clock.Now(), + } + } + clock.Advance(sr.minDuration * 2) + localLevel = sr.Recalc() + assert.Equal(t, sr.overallStressLevel, localLevel) + assert.False(t, sr.stressed) +} + // TestStressRelief_Sample tests that traces are sampled deterministically // by traceID. // The test generates 10000 traceIDs and checks that the sampling rate is