Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: notifier metric alive count #1130

Merged
merged 10 commits into from
Dec 17, 2024
4 changes: 4 additions & 0 deletions cmd/notifier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type notifierConfig struct {
MaxFailAttemptToSendAvailable int `yaml:"max_fail_attempt_to_send_available"`
// Specify log level by entities
SetLogLevel setLogLevelConfig `yaml:"set_log_level"`
// CheckNotifierStateTimeout is the timeout between marking *.alive.count metric based on notifier state.
CheckNotifierStateTimeout string `yaml:"check_notifier_state_timeout"`
}

type selfStateConfig struct {
Expand Down Expand Up @@ -116,6 +118,7 @@ func getDefault() config {
Timezone: "UTC",
ReadBatchSize: int(notifier.NotificationsLimitUnlimited),
MaxFailAttemptToSendAvailable: 3,
CheckNotifierStateTimeout: "10s",
Tetrergeru marked this conversation as resolved.
Show resolved Hide resolved
},
Telemetry: cmd.TelemetryConfig{
Listen: ":8093",
Expand Down Expand Up @@ -202,6 +205,7 @@ func (config *notifierConfig) getSettings(logger moira.Logger) notifier.Config {
MaxFailAttemptToSendAvailable: config.MaxFailAttemptToSendAvailable,
LogContactsToLevel: contacts,
LogSubscriptionsToLevel: subscriptions,
CheckNotifierStateTimeout: to.Duration(config.CheckNotifierStateTimeout),
}
}

Expand Down
10 changes: 9 additions & 1 deletion cmd/notifier/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"
"fmt"
"os"
Expand Down Expand Up @@ -99,6 +100,7 @@ func main() {
}

notifierMetrics := metrics.ConfigureNotifierMetrics(telemetry.Metrics, serviceName)

sender := notifier.NewNotifier(
database,
logger,
Expand All @@ -119,7 +121,7 @@ func main() {

// Start moira self state checker
if config.Notifier.SelfState.getSettings().Enabled {
selfState := selfstate.NewSelfCheckWorker(logger, database, sender, config.Notifier.SelfState.getSettings(), metrics.ConfigureHeartBeatMetrics(telemetry.Metrics))
selfState := selfstate.NewSelfCheckWorker(logger, database, sender, config.Notifier.SelfState.getSettings())
if err := selfState.Start(); err != nil {
logger.Fatal().
Error(err).
Expand Down Expand Up @@ -156,6 +158,12 @@ func main() {
fetchEventsWorker.Start()
defer stopFetchEvents(fetchEventsWorker)

aliveWatcher := notifier.NewAliveWatcher(logger, database, notifierConfig, notifierMetrics)
ctx, cancel := context.WithCancel(context.Background())

aliveWatcher.Start(ctx)
defer cancel()

logger.Info().
String("moira_version", MoiraVersion).
Msg("Moira Notifier Started")
Expand Down
22 changes: 0 additions & 22 deletions metrics/heartbeat.go

This file was deleted.

16 changes: 15 additions & 1 deletion metrics/notifier.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metrics

import "time"
import (
"time"
)

// NotifierMetrics is a collection of metrics used in notifier.
type NotifierMetrics struct {
Expand All @@ -16,6 +18,7 @@ type NotifierMetrics struct {
PlotsBuildDurationMs Histogram
PlotsEvaluateTriggerDurationMs Histogram
fetchNotificationsDurationMs Histogram
notifierIsAlive Meter
}

// ConfigureNotifierMetrics is notifier metrics configurator.
Expand All @@ -33,6 +36,7 @@ func ConfigureNotifierMetrics(registry Registry, prefix string) *NotifierMetrics
PlotsBuildDurationMs: registry.NewHistogram("plots", "build", "duration", "ms"),
PlotsEvaluateTriggerDurationMs: registry.NewHistogram("plots", "evaluate", "trigger", "duration", "ms"),
fetchNotificationsDurationMs: registry.NewHistogram("fetch", "notifications", "duration", "ms"),
notifierIsAlive: registry.NewMeter("", "alive"),
}
}

Expand Down Expand Up @@ -66,3 +70,13 @@ func (metrics *NotifierMetrics) MarkSendersFailedMetrics(contactType string) {
func (metrics *NotifierMetrics) MarkSendingFailed() {
metrics.SendingFailed.Mark(1)
}

// MarkNotifierIsAlive marks metric value.
func (metrics *NotifierMetrics) MarkNotifierIsAlive(isAlive bool) {
if isAlive {
metrics.notifierIsAlive.Mark(1)
return
}

metrics.notifierIsAlive.Mark(0)
}
66 changes: 66 additions & 0 deletions notifier/alive_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package notifier

import (
"context"
"time"

"github.com/moira-alert/moira"
"github.com/moira-alert/moira/metrics"
)

// AliveWatcher is responsible for checking notifier state and marking notifier.alive metrics.
type AliveWatcher struct {
logger moira.Logger
database moira.Database
config Config
Tetrergeru marked this conversation as resolved.
Show resolved Hide resolved
notifierMetrics *metrics.NotifierMetrics
}

// NewAliveWatcher is an initializer for AliveWatcher.
func NewAliveWatcher(
logger moira.Logger,
database moira.Database,
config Config,
notifierMetrics *metrics.NotifierMetrics,
) *AliveWatcher {
return &AliveWatcher{
logger: logger,
database: database,
config: config,
notifierMetrics: notifierMetrics,
}
}

// Start starts the checking loop in separate goroutine.
// Use context.WithCancel, context.WithTimeout etc. to terminate check loop.
func (watcher *AliveWatcher) Start(ctx context.Context) {
go watcher.stateChecker(ctx)
}

func (watcher *AliveWatcher) stateChecker(ctx context.Context) {
watcher.logger.Info().
Interface("check_timeout_seconds", watcher.config.CheckNotifierStateTimeout.Seconds()).
Msg("Moira Notifier alive watcher started")

ticker := time.NewTicker(watcher.config.CheckNotifierStateTimeout)

for {
select {
case <-ctx.Done():
watcher.logger.Info().Msg("Moira Notifier alive watcher stopped")
return
case <-ticker.C:
watcher.checkNotifierState()
}
}
}

func (watcher *AliveWatcher) checkNotifierState() {
state, _ := watcher.database.GetNotifierState()
if state != moira.SelfStateOK {
watcher.notifierMetrics.MarkNotifierIsAlive(false)
return
}

watcher.notifierMetrics.MarkNotifierIsAlive(true)
}
110 changes: 110 additions & 0 deletions notifier/alive_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package notifier

import (
"context"
"errors"
"testing"
"time"

"github.com/moira-alert/moira"
"github.com/moira-alert/moira/metrics"
. "github.com/smartystreets/goconvey/convey"
"go.uber.org/mock/gomock"

mock_moira_alert "github.com/moira-alert/moira/mock/moira-alert"
mock_metrics "github.com/moira-alert/moira/mock/moira-alert/metrics"
)

func initAliveMeter(mockCtrl *gomock.Controller) (*mock_metrics.MockRegistry, *mock_metrics.MockMeter) {
mockRegistry := mock_metrics.NewMockRegistry(mockCtrl)
mockAliveMeter := mock_metrics.NewMockMeter(mockCtrl)

mockRegistry.EXPECT().NewMeter(gomock.Any()).Times(5)
mockRegistry.EXPECT().NewHistogram(gomock.Any()).Times(3)
mockRegistry.EXPECT().NewMeter("", "alive").Return(mockAliveMeter)

return mockRegistry, mockAliveMeter
}

func TestAliveWatcher_checkNotifierState(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

dataBase := mock_moira_alert.NewMockDatabase(mockCtrl)

mockRegistry, mockAliveMeter := initAliveMeter(mockCtrl)
testNotifierMetrics := metrics.ConfigureNotifierMetrics(mockRegistry, "")

aliveWatcher := NewAliveWatcher(nil, dataBase, Config{}, testNotifierMetrics)

Convey("checkNotifierState", t, func() {
Convey("when OK", func() {
dataBase.EXPECT().GetNotifierState().Return(moira.SelfStateOK, nil)
mockAliveMeter.EXPECT().Mark(int64(1))

aliveWatcher.checkNotifierState()
})

Convey("when not OK state and no errors", func() {
notOKStates := []string{moira.SelfStateERROR, "err", "bad", "", "1"}

for _, badState := range notOKStates {
dataBase.EXPECT().GetNotifierState().Return(badState, nil)
mockAliveMeter.EXPECT().Mark(int64(0))

aliveWatcher.checkNotifierState()
}
})

Convey("when not OK state and errors", func() {
notOKState := ""
givenErrors := []error{
errors.New("one error"),
errors.New("another error"),
}

for _, err := range givenErrors {
dataBase.EXPECT().GetNotifierState().Return(notOKState, err)
mockAliveMeter.EXPECT().Mark(int64(0))

aliveWatcher.checkNotifierState()
}
})
})
}

func TestAliveWatcher_Start(t *testing.T) {
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

logger := mock_moira_alert.NewMockLogger(mockCtrl)
eventsBuilder := mock_moira_alert.NewMockEventBuilder(mockCtrl)
logger.EXPECT().Info().Return(eventsBuilder).AnyTimes()

dataBase := mock_moira_alert.NewMockDatabase(mockCtrl)

testConf := Config{
CheckNotifierStateTimeout: time.Second,
}

mockRegistry, mockAliveMeter := initAliveMeter(mockCtrl)
testNotifierMetrics := metrics.ConfigureNotifierMetrics(mockRegistry, "")

aliveWatcher := NewAliveWatcher(logger, dataBase, testConf, testNotifierMetrics)

Convey("AliveWatcher stops on cancel", t, func() {
eventsBuilder.EXPECT().Interface("check_timeout_seconds", testConf.CheckNotifierStateTimeout.Seconds()).Return(eventsBuilder)
eventsBuilder.EXPECT().Msg("Moira Notifier alive watcher started")

eventsBuilder.EXPECT().Msg("Moira Notifier alive watcher stopped")

dataBase.EXPECT().GetNotifierState().Return(moira.SelfStateOK, nil).AnyTimes()
mockAliveMeter.EXPECT().Mark(int64(1)).AnyTimes()

ctx, cancel := context.WithCancel(context.Background())
aliveWatcher.Start(ctx)

time.Sleep(time.Second * 3)
cancel()
})
}
1 change: 1 addition & 0 deletions notifier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ type Config struct {
MaxFailAttemptToSendAvailable int
LogContactsToLevel map[string]string
LogSubscriptionsToLevel map[string]string
CheckNotifierStateTimeout time.Duration
}
17 changes: 5 additions & 12 deletions notifier/selfstate/heartbeat/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,30 @@ package heartbeat
import (
"fmt"

"github.com/moira-alert/moira/metrics"

"github.com/moira-alert/moira"
)

type notifier struct {
db moira.Database
log moira.Logger
metrics *metrics.HeartBeatMetrics
db moira.Database
log moira.Logger
}

func GetNotifier(logger moira.Logger, database moira.Database, metrics *metrics.HeartBeatMetrics) Heartbeater {
func GetNotifier(logger moira.Logger, database moira.Database) Heartbeater {
return &notifier{
db: database,
log: logger,
metrics: metrics,
db: database,
log: logger,
}
}

func (check notifier) Check(int64) (int64, bool, error) {
state, _ := check.db.GetNotifierState()
if state != moira.SelfStateOK {
check.metrics.MarkNotifierIsAlive(false)

check.log.Error().
String("error", check.GetErrorMessage()).
Msg("Notifier is not healthy")

return 0, true, nil
}
check.metrics.MarkNotifierIsAlive(true)

check.log.Debug().
String("state", state).
Expand Down
5 changes: 1 addition & 4 deletions notifier/selfstate/heartbeat/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"testing"
"time"

"github.com/moira-alert/moira/metrics"

"github.com/moira-alert/moira"
mock_moira_alert "github.com/moira-alert/moira/mock/moira-alert"

Expand Down Expand Up @@ -47,7 +45,6 @@ func TestNotifierState(t *testing.T) {
func createNotifierStateTest(t *testing.T) *notifier {
mockCtrl := gomock.NewController(t)
logger, _ := logging.GetLogger("MetricDelay")
metric := metrics.ConfigureHeartBeatMetrics(metrics.NewDummyRegistry())

return GetNotifier(logger, mock_moira_alert.NewMockDatabase(mockCtrl), metric).(*notifier)
return GetNotifier(logger, mock_moira_alert.NewMockDatabase(mockCtrl)).(*notifier)
}
Loading
Loading