
Merge pull request #2685 from openziti/fix-ha-startup-bugs
Fix bootstrap panic. Fixes #2682. Fix error on peer save. Fixes #2683. Fix duplicate cluster events. Fixes #2684.
plorenz authored Jan 24, 2025
2 parents 8b410b9 + d0c0685 commit e80a109
Showing 4 changed files with 47 additions and 18 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,16 @@
# Release 1.3.1

## What's New

* Bug Fixes

## Component Updates and Bug Fixes

* github.com/openziti/ziti: [v1.3.0 -> v1.3.1](https://github.com/openziti/ziti/compare/v1.3.0...v1.3.1)
* [Issue #2682](https://github.com/openziti/ziti/issues/2682) - HA Controller panics when bootstrapping by setting the db variable in the configuration
* [Issue #2683](https://github.com/openziti/ziti/issues/2683) - Controller fails to save peer configuration on a fresh install
* [Issue #2684](https://github.com/openziti/ziti/issues/2684) - Controller emits duplicate cluster events on startup

# Release 1.3.0

## What's New
8 changes: 7 additions & 1 deletion controller/model/controller_manager.go
@@ -218,14 +218,20 @@ func (self *ControllerManager) PeersConnected(peers []*event.ClusterPeer, peerCo
}
}

pfxlog.Logger().Infof("acting as leader, updating controllers with peers, self: %s, peers: %s", nfpem.FingerprintFromCertificate(selfAsPeer.ServerCert[0]), peerFingerprints)
pfxlog.Logger().Infof("acting as leader, updating controllers with peers, self: %s, peer count: %d, peers: %s",
nfpem.FingerprintFromCertificate(selfAsPeer.ServerCert[0]), len(peers), peerFingerprints)

if !peerConnectedEvent {
// add this controller as a "peer" when leadership is gained
peers = append(peers, selfAsPeer)
}

for _, peer := range peers {
// Use our locally built peer instance to represent ourselves in the list
if peer.Id == selfAsPeer.Id && peer != selfAsPeer {
continue
}

if len(peer.ServerCert) < 1 {
pfxlog.Logger().Errorf("peer %s has no certificate", peer.Id)
continue
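
The loop above now prefers the locally built `selfAsPeer` entry over any copy of this controller that arrives in the `peers` slice, so a stale self-record can no longer replace the freshly built one. Below is a minimal sketch of that dedup pattern; the trimmed-down `Peer` type and the addresses are hypothetical stand-ins for the real `event.ClusterPeer`.

```go
package main

import "fmt"

// Peer is a hypothetical, trimmed-down stand-in for event.ClusterPeer.
type Peer struct {
	Id   string
	Addr string
}

// mergeSelf puts the locally built self entry first and skips any other
// record carrying the same id, so only one entry represents this controller.
func mergeSelf(peers []Peer, self Peer) []Peer {
	out := []Peer{self}
	for _, p := range peers {
		if p.Id == self.Id {
			continue // prefer the locally built self entry over a stale copy
		}
		out = append(out, p)
	}
	return out
}

func main() {
	peers := []Peer{
		{Id: "ctrl1", Addr: "tls:stale-address:1280"}, // stale copy of ourselves
		{Id: "ctrl2", Addr: "tls:peer2:1280"},
	}
	self := Peer{Id: "ctrl1", Addr: "tls:current-address:1280"}
	fmt.Println(mergeSelf(peers, self))
}
```

Skipping rather than overwriting is the simpler choice here: the locally built entry is the one guaranteed to carry this controller's current certificates and address.
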
12 changes: 7 additions & 5 deletions controller/raft/fsm.go
Expand Up @@ -19,6 +19,7 @@ package raft
import (
"bytes"
"compress/gzip"
"errors"
"fmt"
"github.com/hashicorp/raft"
"github.com/michaelquigley/pfxlog"
@@ -27,7 +28,6 @@ import (
"github.com/openziti/ziti/controller/command"
"github.com/openziti/ziti/controller/db"
event2 "github.com/openziti/ziti/controller/event"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.etcd.io/bbolt"
"io"
@@ -153,7 +153,9 @@ func (self *BoltDbFsm) storeConfigurationInRaft(index uint64, servers []raft.Ser
func (self *BoltDbFsm) storeServers(tx *bbolt.Tx, servers []raft.Server) error {
raftBucket := boltz.GetOrCreatePath(tx, db.RootBucket, db.MetadataBucket)
if err := raftBucket.DeleteBucket([]byte(ServersBucket)); err != nil {
return err
if !errors.Is(err, bbolt.ErrBucketNotFound) {
return err
}
}

for _, server := range servers {
@@ -261,7 +263,7 @@ func (self *BoltDbFsm) Apply(log *raft.Log) interface{} {

return err
} else {
return errors.Errorf("log data contained invalid message type. data: %+v", log.Data)
return fmt.Errorf("log data contained invalid message type. data: %+v", log.Data)
}
}
return nil
@@ -278,7 +280,7 @@ func (self *BoltDbFsm) Snapshot() (raft.FSMSnapshot, error) {
}

if err = gzWriter.Close(); err != nil {
return nil, errors.Wrap(err, "error finishing gz compression of raft snapshot")
return nil, fmt.Errorf("error finishing gz compression of raft snapshot (%w)", err)
}

logrus.WithField("id", id).WithField("index", self.indexTracker.Index()).Info("creating snapshot")
@@ -379,7 +381,7 @@ func (self *BoltDbFsm) restoreSnapshotDbFile(path string, snapshot io.ReadCloser

gzReader, err := gzip.NewReader(snapshot)
if err != nil {
return errors.Wrapf(err, "unable to create gz reader for reading raft snapshot during restore")
return fmt.Errorf("unable to create gz reader for reading raft snapshot during restore (%w)", err)
}

if _, err = io.Copy(dbFile, gzReader); err != nil {
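
Two changes run through `fsm.go`: `storeServers` now treats `bbolt.ErrBucketNotFound` as a non-error when clearing the servers bucket (on a fresh install there is nothing to delete yet), and error wrapping moves from `github.com/pkg/errors` to the standard library, using `fmt.Errorf` with `%w`. A minimal sketch of the tolerate-missing-bucket pattern, assuming illustrative bucket names rather than the controller's real `db.RootBucket`/`ServersBucket` constants:

```go
package main

import (
	"errors"
	"fmt"
	"log"

	"go.etcd.io/bbolt"
)

// resetBucket drops and recreates a child bucket, treating "bucket not found"
// as a non-error so the call also works on a freshly created database.
// Bucket names here are illustrative only.
func resetBucket(tx *bbolt.Tx, parent, child []byte) error {
	p, err := tx.CreateBucketIfNotExists(parent)
	if err != nil {
		return fmt.Errorf("unable to create parent bucket (%w)", err)
	}
	if err := p.DeleteBucket(child); err != nil {
		if !errors.Is(err, bbolt.ErrBucketNotFound) {
			return err
		}
	}
	_, err = p.CreateBucket(child)
	return err
}

func main() {
	db, err := bbolt.Open("example.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Update(func(tx *bbolt.Tx) error {
		return resetBucket(tx, []byte("raft"), []byte("servers"))
	}); err != nil {
		log.Fatal(err)
	}
}
```

Because the wrapping uses `%w`, callers can still match underlying sentinel errors with `errors.Is` after the move away from `github.com/pkg/errors`.
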
32 changes: 20 additions & 12 deletions controller/raft/raft.go
@@ -184,7 +184,7 @@ func (self *Controller) initErrorMappers() {

func (self *Controller) RegisterClusterEventHandler(f func(event ClusterEvent, state ClusterState, leaderId string)) {
if self.isLeader.Load() {
f(ClusterEventLeadershipGained, newClusterState(true, !self.Mesh.IsReadOnly()), "")
f(ClusterEventLeadershipGained, newClusterState(true, !self.Mesh.IsReadOnly()), self.env.GetId().Token)
}
self.clusterStateChangeHandlers.Append(f)
}
@@ -698,11 +698,6 @@ func (self *Controller) ObserveLeaderChanges() {
leaderId: string(leaderId),
}

if self.Raft.State() == raft.Leader {
self.isLeader.Store(true)
self.handleClusterStateChange(ClusterEventLeadershipGained, eventState)
}

if eventState.hasLeader {
self.handleClusterStateChange(ClusterEventHasLeader, eventState)
} else {
@@ -712,11 +707,24 @@
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()

first := true

for {
select {
case observation := <-self.clusterEvents:
self.processRaftObservation(observation, eventState)
case <-ticker.C:
if first {
// delay this check because it seems like raft generates observations for leader state, so if we do this
// first we're going to get duplicates
if self.Raft.State() == raft.Leader {
if wasLeader := self.isLeader.Swap(true); !wasLeader {
self.handleClusterStateChange(ClusterEventLeadershipGained, eventState)
}
}
first = false
}

if !eventState.warningEmitted && !eventState.hasLeader && time.Since(eventState.noLeaderAt) > self.Config.WarnWhenLeaderlessFor {
pfxlog.Logger().WithField("timeSinceLeader", time.Since(eventState.noLeaderAt).String()).
Warn("cluster running without leader for longer than configured threshold")
@@ -731,11 +739,11 @@ func (self *Controller) processRaftObservation(observation raft.Observation, eve
pfxlog.Logger().Tracef("raft observation received: isLeader: %v, isReadWrite: %v", self.isLeader.Load(), eventState.isReadWrite)

if raftState, ok := observation.Data.(raft.RaftState); ok {
if raftState == raft.Leader && !self.isLeader.Load() {
self.isLeader.Store(true)
self.handleClusterStateChange(ClusterEventLeadershipGained, eventState)
} else if raftState != raft.Leader && self.isLeader.Load() {
self.isLeader.Store(false)
if raftState == raft.Leader {
if wasLeader := self.isLeader.Swap(true); !wasLeader {
self.handleClusterStateChange(ClusterEventLeadershipGained, eventState)
}
} else if wasLeader := self.isLeader.Swap(false); wasLeader {
self.handleClusterStateChange(ClusterEventLeadershipLost, eventState)
}
}
@@ -798,7 +806,7 @@ func (self *Controller) Bootstrap() error {
firstCheckPassed := false
for {
// make sure this is in a reasonably steady state by waiting a bit longer and checking twice
if self.isLeader.Load() {
if _, leaderId := self.Raft.LeaderWithID(); leaderId != "" {
if firstCheckPassed {
break
} else {
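
The `raft.go` changes replace the unconditional `isLeader.Store` calls with `atomic.Bool.Swap`, so `ClusterEventLeadershipGained` and `ClusterEventLeadershipLost` fire only when leadership actually flips, even if raft reports the same state more than once. The direct `Raft.State()` poll is also deferred to the first ticker pass, and `Bootstrap` now waits for `Raft.LeaderWithID()` to report any leader instead of relying on the local flag. A small sketch of the Swap-as-edge-detector pattern, using a hypothetical `leaderTracker` type rather than the controller's own state:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// leaderTracker demonstrates edge detection with atomic.Bool: Swap returns the
// previous value, so an event fires only when the state actually changes,
// even if the same raft state is observed repeatedly.
type leaderTracker struct {
	isLeader atomic.Bool
}

func (t *leaderTracker) observe(isLeader bool, onGained, onLost func()) {
	if isLeader {
		if wasLeader := t.isLeader.Swap(true); !wasLeader {
			onGained()
		}
	} else if wasLeader := t.isLeader.Swap(false); wasLeader {
		onLost()
	}
}

func main() {
	t := &leaderTracker{}
	gained := func() { fmt.Println("leadership gained") }
	lost := func() { fmt.Println("leadership lost") }

	// Duplicate observations of the same state produce a single event.
	t.observe(true, gained, lost)  // prints "leadership gained"
	t.observe(true, gained, lost)  // no output
	t.observe(false, gained, lost) // prints "leadership lost"
	t.observe(false, gained, lost) // no output
}
```

`Swap` returns the previous value atomically, so two concurrent observations of the same transition cannot both see the flag flip and double-fire the event.
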
