fix: allow a single node to activate stress relief mode during significant load increase (#1256)

<!--
Thank you for contributing to the project! 💜
Please make sure to:
- Chat with us first if this is a big change
  - Open a new issue (or comment on an existing one)
- We want to make sure you don't spend time implementing something we
might have to say No to
- Add unit tests
- Mention any relevant issues in the PR description (e.g. "Fixes #123")

Please see our [OSS process
document](https://github.com/honeycombio/home/blob/main/honeycomb-oss-lifecycle-and-practices.md#)
to get an idea of how we operate.
-->

## Which problem is this PR solving?

Without resolving the trace locality issue, a single peer can receive a
large trace that raises its stress level significantly above the rest of
the cluster's. To address this, we allow individual refineries to go into
stress relief mode when their own stress is too high, even if the
cluster's is not.

## Short description of the changes

- use the max of the individual stress level and the cluster stress level as the overall stress level when calculating stress relief activation and deactivation (see the sketch below)
- record the stress level that determined stress relief activation as the `stress_level` metric
- add tests
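
The core of the change is a one-line rule: the overall stress level is `max(individual, cluster)`. Here is a minimal sketch of that rule in Go (the helper name `overallStressLevel` is illustrative only; the real logic lives in `Recalc()` in `collect/stressRelief.go` below):

```go
// overallStressLevel sketches the new activation rule, assuming stress
// levels are percentages in the range [0, 100]. A single overloaded node
// can now push the overall level past the activation threshold on its
// own, even when the rest of the cluster is idle.
func overallStressLevel(individual, cluster uint) uint {
	if individual > cluster {
		return individual
	}
	return cluster
}
```

For example, a node at individual level 90 in a cluster averaging 10 yields an overall level of 90, above the activation threshold of 80 used in the tests below.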
VinozzZ authored Jul 26, 2024
1 parent fa870c1 commit 70781b0
Showing 2 changed files with 113 additions and 11 deletions.
37 changes: 29 additions & 8 deletions collect/stressRelief.go
@@ -35,8 +35,8 @@ var _ StressReliever = &MockStressReliever{}
 type MockStressReliever struct {
 	IsStressed              bool
 	SampleDeterministically bool
-	SampleRate              uint
 	ShouldKeep              bool
+	SampleRate              uint
 }

 func (m *MockStressReliever) Start() error { return nil }
@@ -99,6 +99,8 @@ type StressRelief struct {

 	lock         sync.RWMutex
 	stressLevels map[string]stressReport
+	// only used in tests
+	disableStressLevelReport bool
 }

 const StressReliefHealthKey = "stress_relief"
@@ -113,6 +115,7 @@ func (s *StressRelief) Start() error {
 	// register stress level metrics
 	s.RefineryMetrics.Register("cluster_stress_level", "gauge")
 	s.RefineryMetrics.Register("individual_stress_level", "gauge")
+	s.RefineryMetrics.Register("stress_level", "gauge")
 	s.RefineryMetrics.Register("stress_relief_activated", "gauge")

 	// We use an algorithms map so that we can name these algorithms, which makes it easier for several things:
@@ -151,19 +154,23 @@ func (s *StressRelief) Start() error {

 	// start our monitor goroutine that periodically calls recalc
 	// and also reports that it's healthy

 	go func(s *StressRelief) {
 		// only publish stress level if it has changed or if it's been a while since the last publish
+		if s.disableStressLevelReport {
+			return
+		}
 		const maxTicksBetweenReports = 30
 		var (
 			lastLevel   uint = 0
 			tickCounter      = 0
 		)

-		tick := time.NewTicker(100 * time.Millisecond)
+		tick := s.Clock.NewTicker(100 * time.Millisecond)
 		defer tick.Stop()
 		for {
 			select {
-			case <-tick.C:
+			case <-tick.Chan():
 				currentLevel := s.Recalc()

 				if lastLevel != currentLevel || tickCounter == maxTicksBetweenReports {
@@ -390,7 +397,7 @@ func (s *StressRelief) Recalc() uint {
 			formula = fmt.Sprintf("%s(%v/%v)=%v", c.Algorithm, c.Numerator, c.Denominator, stress)
 		}
 	}
-	s.Logger.Debug().WithField("stress_level", maximumLevel).WithField("stress_formula", s.formula).WithField("reason", reason).Logf("calculated stress level")
+	s.Logger.Debug().WithField("individual_stress_level", maximumLevel).WithField("stress_formula", s.formula).WithField("reason", reason).Logf("calculated stress level")

 	s.RefineryMetrics.Gauge("individual_stress_level", float64(maximumLevel))
 	localLevel := uint(maximumLevel)
@@ -401,7 +408,11 @@
 	s.lock.Lock()
 	defer s.lock.Unlock()

-	s.overallStressLevel = clusterStressLevel
+	// The overall stress level is the max of the individual and cluster stress levels
+	// If a single node is under significant stress, it can activate stress relief mode
+	s.overallStressLevel = uint(math.Max(float64(clusterStressLevel), float64(localLevel)))
+	s.RefineryMetrics.Gauge("stress_level", s.overallStressLevel)
+
 	s.reason = reason
 	s.formula = formula

@@ -414,18 +425,28 @@ func (s *StressRelief) Recalc() uint {
 	// If it's off, should we activate it?
 	if !s.stressed && s.overallStressLevel >= s.activateLevel {
 		s.stressed = true
-		s.Logger.Warn().WithField("cluster_stress_level", s.overallStressLevel).WithField("stress_formula", s.formula).WithField("reason", s.reason).Logf("StressRelief has been activated")
+		s.Logger.Warn().WithFields(map[string]interface{}{
+			"individual_stress_level": localLevel,
+			"cluster_stress_level":    clusterStressLevel,
+			"stress_level":            s.overallStressLevel,
+			"stress_formula":          s.formula,
+			"reason":                  s.reason,
+		}).Logf("StressRelief has been activated")
 	}
 	// We want to make sure that stress relief is below the deactivate level
 	// for a minimum time after the last time we said it should be, so
 	// whenever it's above that value we push the time out.
 	if s.stressed && s.overallStressLevel >= s.deactivateLevel {
-		s.stayOnUntil = time.Now().Add(s.minDuration)
+		s.stayOnUntil = s.Clock.Now().Add(s.minDuration)
 	}
 	// If it's on, should we deactivate it?
 	if s.stressed && s.overallStressLevel < s.deactivateLevel && s.Clock.Now().After(s.stayOnUntil) {
 		s.stressed = false
-		s.Logger.Warn().WithField("cluster_stress_level", s.overallStressLevel).Logf("StressRelief has been deactivated")
+		s.Logger.Warn().WithFields(map[string]interface{}{
+			"individual_stress_level": localLevel,
+			"cluster_stress_level":    clusterStressLevel,
+			"stress_level":            s.overallStressLevel,
+		}).Logf("StressRelief has been deactivated")
 	}
 }

87 changes: 84 additions & 3 deletions collect/stress_relief_test.go
@@ -15,6 +15,7 @@ import (
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/pubsub"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

@@ -101,21 +102,22 @@ func TestStressRelief_Peer(t *testing.T) {
 	require.Eventually(t, func() bool {
 		clock.Advance(time.Second * 1)
 		return sr.Stressed()
-	}, 2*time.Second, 100*time.Millisecond, "stress relief should be false")
+	}, 2*time.Second, 100*time.Millisecond, "stress relief should be active")

 	require.Eventually(t, func() bool {
 		// pretend another refinery just started up
 		msg := stressReliefMessage{
-			level:  90,
+			level:  10,
 			peerID: "peer1",
 		}
 		require.NoError(t, channel.Publish(context.Background(), stressReliefTopic, msg.String()))
 		clock.Advance(time.Second * 1)
 		return sr.Stressed()
-	}, 2*time.Second, 100*time.Millisecond, "stress relief should be false")
+	}, 2*time.Second, 100*time.Millisecond, "stress relief should remain activated")

 	// now the peer has reported valid stress level
 	// it should be taken into account for the overall stress level
+	sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 5)
 	require.Eventually(t, func() bool {
 		msg := stressReliefMessage{
 			level:  10,
@@ -128,6 +130,85 @@
 	}, 2*time.Second, 100*time.Millisecond, "stress relief should be false")
 }

+func TestStressRelief_OverallStressLevel(t *testing.T) {
+	clock := clockwork.NewFakeClock()
+	sr, stop := newStressRelief(t, clock, nil)
+	defer stop()
+
+	// disable the automatic stress level recalculation
+	sr.disableStressLevelReport = true
+	sr.Start()
+
+	sr.RefineryMetrics.Register("collector_incoming_queue_length", "gauge")
+
+	sr.RefineryMetrics.Store("INCOMING_CAP", 1200)
+
+	cfg := config.StressReliefConfig{
+		Mode:                      "monitor",
+		ActivationLevel:           80,
+		DeactivationLevel:         65,
+		MinimumActivationDuration: config.Duration(5 * time.Second),
+	}
+
+	// On startup, the stress relief should not be active
+	sr.UpdateFromConfig(cfg)
+	require.False(t, sr.Stressed())
+
+	// Test 1
+	// when a single peer's individual stress level is above the activation level
+	// the overall stress level should be above the activation level
+	// and the stress relief should be active
+	sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 965)
+	clock.Advance(time.Second * 1)
+	sr.stressLevels = make(map[string]stressReport, 100)
+	for i := 0; i < 100; i++ {
+		key := fmt.Sprintf("peer%d", i)
+		sr.stressLevels[key] = stressReport{
+			key:       key,
+			level:     10,
+			timestamp: sr.Clock.Now(),
+		}
+	}
+
+	localLevel := sr.Recalc()
+	require.Equal(t, localLevel, sr.overallStressLevel)
+	require.True(t, sr.stressed)
+
+	// Test 2
+	// when a single peer's individual stress level is below the activation level
+	// and the rest of the cluster is above the activation level
+	// the single peer should remain in stress relief mode
+	sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 10)
+	for i := 0; i < 100; i++ {
+		key := fmt.Sprintf("peer%d", i)
+		sr.stressLevels[key] = stressReport{
+			key:       key,
+			level:     85,
+			timestamp: sr.Clock.Now(),
+		}
+	}
+	localLevel = sr.Recalc()
+	require.Greater(t, sr.overallStressLevel, localLevel)
+	require.True(t, sr.stressed)
+
+	// Test 3
+	// Only when both the single peer's individual stress level and the cluster
+	// stress level are below the activation level should stress relief deactivate.
+	sr.RefineryMetrics.Gauge("collector_incoming_queue_length", 10)
+	for i := 0; i < 100; i++ {
+		key := fmt.Sprintf("peer%d", i)
+		sr.stressLevels[key] = stressReport{
+			key:       key,
+			level:     1,
+			timestamp: sr.Clock.Now(),
+		}
+	}
+	clock.Advance(sr.minDuration * 2)
+	localLevel = sr.Recalc()
+	assert.Equal(t, sr.overallStressLevel, localLevel)
+	assert.False(t, sr.stressed)
+}

 // TestStressRelief_Sample tests that traces are sampled deterministically
 // by traceID.
 // The test generates 10000 traceIDs and checks that the sampling rate is
