Skip to content

Commit

Permalink
Add warning message when cluster is leaderless longer than configured…
Browse files Browse the repository at this point in the history
… threshold. Fixes #2548
  • Loading branch information
plorenz committed Jan 9, 2025
1 parent 7b4ca1d commit e77ebaf
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 23 deletions.
13 changes: 13 additions & 0 deletions controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func LoadConfig(path string) (*Config, error) {
controllerConfig.Raft.HeartbeatTimeout = 3 * time.Second
controllerConfig.Raft.LeaderLeaseTimeout = 3 * time.Second
controllerConfig.Raft.CommandHandlerOptions.MaxQueueSize = DefaultRaftCommandHandlerMaxQueueSize
controllerConfig.Raft.WarnWhenLeaderlessFor = time.Minute

if value, found := submap["dataDir"]; found {
controllerConfig.Raft.DataDir = value.(string)
Expand Down Expand Up @@ -298,6 +299,18 @@ func LoadConfig(path string) (*Config, error) {
controllerConfig.Raft.LogLevel = &val
}

// raft.warnWhenLeaderlessFor: how long the cluster may be without a leader
// before a warning is logged. The value must parse as a Go duration; a value
// that parses but is below 10s is reported and ignored, leaving the
// previously assigned setting in place.
if value, found := submap["warnWhenLeaderlessFor"]; found {
	val, err := time.ParseDuration(fmt.Sprintf("%v", value))
	if err != nil {
		// An unparseable duration is a hard configuration error.
		return nil, errors.Wrapf(err, "failed to parse raft.warnWhenLeaderlessFor value '%v'", value)
	}
	if val < 10*time.Second {
		pfxlog.Logger().Infof("invalid value %s for raft.warnWhenLeaderlessFor, must be >= 10s", val)
	} else {
		controllerConfig.Raft.WarnWhenLeaderlessFor = val
	}
}

if value, found := submap["logFile"]; found {
val := fmt.Sprintf("%v", value)
options := *hclog.DefaultOptions
Expand Down
2 changes: 2 additions & 0 deletions controller/config/config_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ type RaftConfig struct {

LogLevel *string
Logger hclog.Logger

WarnWhenLeaderlessFor time.Duration
}
91 changes: 68 additions & 23 deletions controller/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,10 +667,18 @@ func (self *Controller) validateCert() {
}
}

// clusterEventState is the bookkeeping carried between raft observations
// while watching for leadership and read/write mode changes.
type clusterEventState struct {
	// isReadWrite is the most recently observed cluster read/write mode.
	isReadWrite bool

	// hasLeader is true when the last leader observation carried a
	// non-empty leader address.
	hasLeader bool

	// noLeaderAt is the reference time from which the leaderless period
	// is measured.
	noLeaderAt time.Time

	// warningEmitted is set once the leaderless warning has been logged, so
	// that it is not repeated for the same leaderless period.
	warningEmitted bool
}

func (self *Controller) ObserveLeaderChanges() {
self.Raft.RegisterObserver(raft.NewObserver(self.clusterEvents, true, func(o *raft.Observation) bool {
_, ok := o.Data.(raft.RaftState)
return ok
_, isRaftState := o.Data.(raft.RaftState)
_, isLeaderState := o.Data.(raft.LeaderObservation)
return isRaftState || isLeaderState
}))

go func() {
Expand All @@ -679,33 +687,70 @@ func (self *Controller) ObserveLeaderChanges() {
self.handleClusterStateChange(ClusterEventLeadershipGained, newClusterState(true, true))
}

isReadWrite := true

for observation := range self.clusterEvents {
pfxlog.Logger().Tracef("raft observation received: isLeader: %v, isReadWrite: %v", self.isLeader.Load(), isReadWrite)
if raftState, ok := observation.Data.(raft.RaftState); ok {
if raftState == raft.Leader && !self.isLeader.Load() {
self.isLeader.Store(true)
self.handleClusterStateChange(ClusterEventLeadershipGained, newClusterState(true, isReadWrite))
} else if raftState != raft.Leader && self.isLeader.Load() {
self.isLeader.Store(false)
self.handleClusterStateChange(ClusterEventLeadershipLost, newClusterState(false, isReadWrite))
}
} else if state, ok := observation.Data.(mesh.ClusterState); ok {
if state == mesh.ClusterReadWrite {
isReadWrite = true
self.handleClusterStateChange(ClusterEventReadWrite, newClusterState(self.isLeader.Load(), isReadWrite))
} else if state == mesh.ClusterReadOnly {
isReadWrite = false
self.handleClusterStateChange(ClusterEventReadOnly, newClusterState(self.isLeader.Load(), isReadWrite))
leaderAddr, _ := self.Raft.LeaderWithID()

eventState := &clusterEventState{
isReadWrite: true,
hasLeader: leaderAddr != "",
noLeaderAt: time.Now(),
}

ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()

for {
select {
case observation := <-self.clusterEvents:
self.processRaftObservation(observation, eventState)
case <-ticker.C:
if !eventState.warningEmitted && !eventState.hasLeader && time.Since(eventState.noLeaderAt) > self.Config.WarnWhenLeaderlessFor {
pfxlog.Logger().WithField("timeSinceLeader", time.Since(eventState.noLeaderAt).String()).
Warn("cluster running without leader for longer than configured threshold")
eventState.warningEmitted = true
}
}

pfxlog.Logger().Tracef("raft observation processed: isLeader: %v, isReadWrite: %v", self.isLeader.Load(), isReadWrite)
}
}()
}

// processRaftObservation applies one raft observation to the controller's
// leadership flag and to eventState.
//
// Three payload kinds are handled:
//   - raft.RaftState: flips self.isLeader and notifies the cluster state
//     change handlers when leadership is gained or lost.
//   - mesh.ClusterState: records the read/write mode in eventState and
//     notifies the handlers.
//   - raft.LeaderObservation: tracks whether a leader is known. On the
//     transition from having a leader to having none, the leaderless
//     timestamp is restarted and the warning flag is cleared.
//
// Any other payload is only traced.
func (self *Controller) processRaftObservation(observation raft.Observation, eventState *clusterEventState) {
	pfxlog.Logger().Tracef("raft observation received: isLeader: %v, isReadWrite: %v", self.isLeader.Load(), eventState.isReadWrite)

	switch data := observation.Data.(type) {
	case raft.RaftState:
		wasLeader := self.isLeader.Load()
		nowLeader := data == raft.Leader

		switch {
		case nowLeader && !wasLeader:
			self.isLeader.Store(true)
			self.handleClusterStateChange(ClusterEventLeadershipGained, newClusterState(true, eventState.isReadWrite))
		case !nowLeader && wasLeader:
			self.isLeader.Store(false)
			self.handleClusterStateChange(ClusterEventLeadershipLost, newClusterState(false, eventState.isReadWrite))
		}

	case mesh.ClusterState:
		switch data {
		case mesh.ClusterReadWrite:
			eventState.isReadWrite = true
			self.handleClusterStateChange(ClusterEventReadWrite, newClusterState(self.isLeader.Load(), eventState.isReadWrite))
		case mesh.ClusterReadOnly:
			eventState.isReadWrite = false
			self.handleClusterStateChange(ClusterEventReadOnly, newClusterState(self.isLeader.Load(), eventState.isReadWrite))
		}

	case raft.LeaderObservation:
		if data.LeaderAddr != "" {
			eventState.hasLeader = true
		} else if eventState.hasLeader {
			// Leader just went away: start a fresh leaderless period so the
			// warning can fire again for it.
			eventState.hasLeader = false
			eventState.noLeaderAt = time.Now()
			eventState.warningEmitted = false
		}
	}

	pfxlog.Logger().Tracef("raft observation processed: isLeader: %v, isReadWrite: %v", self.isLeader.Load(), eventState.isReadWrite)
}

func (self *Controller) handleClusterStateChange(event ClusterEvent, state ClusterState) {
for _, handler := range self.clusterStateChangeHandlers.Value() {
handler(event, state)
Expand Down

0 comments on commit e77ebaf

Please sign in to comment.