Skip to content

Commit

Permalink
health config dynamic update
Browse files Browse the repository at this point in the history
  • Loading branch information
zakhar1i committed Dec 17, 2024
1 parent a005e6d commit be46752
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 12 deletions.
27 changes: 24 additions & 3 deletions example/health/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

const (
mercedesTarget = "mercedes.citaro"
bogdanTarget = "bogdan.A091"

speedMetricID = "speed"
stationsCounterID = "stations"
Expand Down Expand Up @@ -57,13 +58,13 @@ func main() {

wg := &sync.WaitGroup{}
wg.Add(1)
go bus(wg)
go bus(manager, wg)
wg.Wait()

frameworkStop()
}

func bus(wg *sync.WaitGroup) {
func bus(manager health.Manager, wg *sync.WaitGroup) {
log := log.GetLogger()
defer wg.Done()

Expand Down Expand Up @@ -122,6 +123,26 @@ func bus(wg *sync.WaitGroup) {
collector.AddMetricValue(mercedesTarget, speedMetricID, 5.0)
collector.AddMetricValue(mercedesTarget, speedMetricID, 25.0)

time.Sleep(sleeps / 2)

log.Info().Msg("An unregistered vehicle appears at a crossroads")
collector.AddMetricValue(bogdanTarget, speedMetricID, 2)
time.Sleep(sleeps / 2)

log.Info().Msg("Updating the config to monitor it")
cfg := health.NewConfig()
cfg.RegistrationOnCollect = true
cfg.CollectionCycle = 4 * time.Second
manager.UpdateConfig(cfg)
sleeps = 4 * time.Second
time.Sleep(sleeps / 2)

collector.AddMetricValue(bogdanTarget, speedMetricID, 4.0)
collector.AddMetricValue(mercedesTarget, speedMetricID, 5.0)
time.Sleep(sleeps)

collector.AddMetricValue(bogdanTarget, speedMetricID, 7.0)
collector.AddMetricValue(mercedesTarget, speedMetricID, 11.0)
time.Sleep(sleeps)
log.Info().Msg("Bus is keep moving but we don't need to monitor it anymore")
log.Info().Msg("Buses keep moving but we don't need to monitor them anymore")
}
53 changes: 53 additions & 0 deletions health/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package health

import (
"context"
"fmt"
"sync"
"time"

logging "github.com/zenoss/zenoss-go-sdk/health/log"
"github.com/zenoss/zenoss-go-sdk/health/target"
"github.com/zenoss/zenoss-go-sdk/health/utils"
)
Expand All @@ -19,6 +21,8 @@ type Collector interface {
// HeartBeat runs a new goroutine that sends heartbeat data once per collection cycle.
// It returns the cancel that will stop the heartbeat goroutine.
HeartBeat(targetID string) (context.CancelFunc, error)
// UpdateCycleDuration updates heartbeat cycle duration with provided value for all active targets
UpdateCycleDuration(d time.Duration) error
// AddToCounter updates counter with provided value (can be negative).
// Used for both TotalCounters and Counters
AddToCounter(targetID, counterID string, value int32) error
Expand Down Expand Up @@ -79,22 +83,47 @@ func ResetCollectorSingleton() {

type healthCollector struct {
cycleDuration time.Duration
heartbeats sync.Map
mu sync.RWMutex
metricsIn chan<- *TargetMeasurement
done chan struct{}
doneOnce sync.Once
}

type heartbeatTracker struct {
cancel context.CancelFunc
ticker *time.Ticker
}

func (hc *healthCollector) HeartBeat(targetID string) (context.CancelFunc, error) {
select {
case <-hc.done:
return nil, utils.ErrDeadCollector
default:
}

if hb, exists := hc.heartbeats.Load(targetID); exists {
heartbeat := hb.(*heartbeatTracker)
heartbeat.ticker.Stop()
heartbeat.cancel()
hc.heartbeats.Delete(targetID)
}

ctx, cancel := context.WithCancel(context.Background())

go func() {
hc.mu.RLock()
ticker := time.NewTicker(hc.cycleDuration)
hc.mu.RUnlock()
hc.heartbeats.Store(targetID, &heartbeatTracker{
cancel: cancel,
ticker: ticker,
})
defer func() {
ticker.Stop()
hc.heartbeats.Delete(targetID)
}()

for {
select {
case <-ctx.Done():
Expand All @@ -119,6 +148,30 @@ func (hc *healthCollector) HeartBeat(targetID string) (context.CancelFunc, error
return cancel, nil
}

func (hc *healthCollector) UpdateCycleDuration(newDuration time.Duration) error {
if newDuration <= 0 {
return fmt.Errorf("cycle duration must be positive")
}

hc.mu.Lock()
hc.cycleDuration = newDuration
hc.mu.Unlock()

targets := 0
hc.heartbeats.Range(func(_, hb any) bool {
heartbeat := hb.(*heartbeatTracker)
heartbeat.ticker.Reset(newDuration)
targets++
return true
})
if targets > 0 {
logging.GetLogger().Info().Msgf("Updated heartbeat interval to %v for %d targets",
newDuration, targets,
)
}
return nil
}

func (hc *healthCollector) AddToCounter(targetID, counterID string, value int32) error {
select {
case hc.metricsIn <- &TargetMeasurement{
Expand Down
4 changes: 4 additions & 0 deletions health/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ var _ = Describe("Collector", func() {
Ω(hbMeasure).ShouldNot(BeNil())
Ω(hbMeasure.MeasureType).Should(Equal(health.Heartbeat))

// should restart active heartbeat goroutine for existing target
hbCancel, err = collector.HeartBeat(testTargetID)
Ω(err).Should(BeNil())

hbCancel()
})
})
Expand Down
75 changes: 66 additions & 9 deletions health/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func FrameworkStart(ctx context.Context, cfg *Config, m Manager, writer w.Health

StopCollectorSingleton()
m.Shutdown()
close(measurementsCh)
writer.Shutdown()
doneWg.Wait()
}
Expand All @@ -93,6 +94,8 @@ type Manager interface {
ctx context.Context, measureOut <-chan *TargetMeasurement,
healthIn chan<- *target.Health, targetIn chan<- *target.Target,
)
// UpdateConfig applies the new configuration for manager and collector
UpdateConfig(config *Config) error
// Shutdown method closes manager's channels and terminates goroutines
Shutdown()
// IsStarted return the status of the manager
Expand All @@ -119,6 +122,7 @@ func NewManager(_ context.Context, config *Config) Manager {
type healthManager struct {
registry healthRegistry
config *Config
configIn chan<- *Config
healthIn chan<- *target.Health
targetIn chan<- *target.Target

Expand All @@ -129,15 +133,19 @@ type healthManager struct {
// used to wait for manager processes to stop so we can mark started as false in a right time
wg *sync.WaitGroup

mu sync.Mutex
started atomic.Bool
mu sync.Mutex
configMu sync.RWMutex
started atomic.Bool
}

func (hm *healthManager) Start(
ctx context.Context, measureOut <-chan *TargetMeasurement,
healthIn chan<- *target.Health, targetIn chan<- *target.Target,
) {
configCh := make(chan *Config)

hm.mu.Lock()
hm.configIn = configCh
hm.targetIn = targetIn
hm.healthIn = healthIn
hm.stopSig = make(chan struct{})
Expand All @@ -152,7 +160,7 @@ func (hm *healthManager) Start(
hm.wg.Add(1)
go func() {
defer hm.wg.Done()
hm.healthForwarder(ctx, healthIn)
hm.healthForwarder(ctx, configCh, healthIn)
}()

hm.started.Store(true)
Expand All @@ -166,12 +174,56 @@ func (hm *healthManager) Start(
hm.sendTargetsInfo() // if some targets were added before start we need to register them
}

func (hm *healthManager) UpdateConfig(newConfig *Config) error {
if newConfig == nil {
return fmt.Errorf("config should not be nil")
}
if newConfig.CollectionCycle <= 0 {
return fmt.Errorf("collection cycle must be positive")
}
logging.SetLogLevel(newConfig.LogLevel)

var cycleDurUpdated bool
hm.configMu.Lock()
cycleDurUpdated = hm.config.CollectionCycle != newConfig.CollectionCycle
hm.config = newConfig
hm.configMu.Unlock()

if cycleDurUpdated {
coll, err := GetCollectorSingleton()
if err != nil {
return err
}
err = coll.UpdateCycleDuration(newConfig.CollectionCycle)
if err != nil {
return err
}
go func() {
hm.configIn <- newConfig
}()
}
return nil
}

func (hm *healthManager) collectionCycle() time.Duration {
hm.configMu.RLock()
defer hm.configMu.RUnlock()
return hm.config.CollectionCycle
}

func (hm *healthManager) registrationOnCollect() bool {
hm.configMu.RLock()
defer hm.configMu.RUnlock()
return hm.config.RegistrationOnCollect
}

func (hm *healthManager) Shutdown() {
hm.mu.Lock()
defer hm.mu.Unlock()
close(hm.stopSig)
<-hm.stopWait
close(hm.targetIn)
close(hm.configIn)
}

func (hm *healthManager) Done() <-chan struct{} {
Expand Down Expand Up @@ -240,7 +292,7 @@ func (hm *healthManager) updateTargetHealthData(measure *TargetMeasurement) erro

targetHealth, ok := hm.registry.getRawHealthForTarget(measure.TargetID)
if !ok {
if !hm.config.RegistrationOnCollect {
if !hm.registrationOnCollect() {
return utils.ErrTargetNotRegistered
}
targetHealth, err = hm.buildTargetFromMeasure(measure)
Expand Down Expand Up @@ -278,7 +330,7 @@ func (hm *healthManager) updateTargetHealthData(measure *TargetMeasurement) erro

func (hm *healthManager) updateTargetsMetric(tHealth *rawHealth, measure *TargetMeasurement) error {
if !sdk_utils.ListContainsString(tHealth.target.MetricIDs, measure.MeasureID) {
if !hm.config.RegistrationOnCollect {
if !hm.registrationOnCollect() {
return utils.ErrMetricNotRegistered
}
if !tHealth.target.IsMeasureIDUnique(measure.MeasureID) {
Expand All @@ -300,7 +352,7 @@ func (hm *healthManager) updateTargetsCounter(tHealth *rawHealth, measure *Targe
tHealth.totalCounters[measure.MeasureID] += measure.CounterChange
} else {
if !sdk_utils.ListContainsString(tHealth.target.CounterIDs, measure.MeasureID) {
if !hm.config.RegistrationOnCollect {
if !hm.registrationOnCollect() {
return utils.ErrCounterNotRegistered
}
if !tHealth.target.IsMeasureIDUnique(measure.MeasureID) {
Expand Down Expand Up @@ -337,15 +389,20 @@ func (*healthManager) buildTargetFromMeasure(measure *TargetMeasurement) (*rawHe
}

// Calculates raw health data from the registry and forwards all health data to the writer once per cycle
func (hm *healthManager) healthForwarder(ctx context.Context, healthIn chan<- *target.Health) {
func (hm *healthManager) healthForwarder(ctx context.Context, configUpd <-chan *Config, healthIn chan<- *target.Health) {
log := logging.GetLogger()
log.Info().Msgf("Start to send health data to a writer with cycle %v", hm.config.CollectionCycle)
log.Info().Msgf("Start to send health data to a writer with cycle %v", hm.collectionCycle())
defer func() { log.Info().Msg("Finish to send health data to a writer") }()
ticker := time.NewTicker(hm.config.CollectionCycle)
ticker := time.NewTicker(hm.collectionCycle())
for {
select {
case <-ticker.C:
hm.writeHealthResult(healthIn)
case cfg, updated := <-configUpd:
if updated {
ticker.Reset(cfg.CollectionCycle)
logging.GetLogger().Info().Msgf("Updated collection interval to %v", cfg.CollectionCycle)
}
case <-hm.stopSig:
hm.writeHealthResult(healthIn)
close(healthIn)
Expand Down
Loading

0 comments on commit be46752

Please sign in to comment.