From d0c06854d471d83ff368f07af0d10f7ccd1a7179 Mon Sep 17 00:00:00 2001
From: Paul Lorenz
Date: Thu, 23 Jan 2025 15:59:30 -0500
Subject: [PATCH] Fix bootstrap panic. Fixes #2682

Fix error on peer save. Fixes #2683.
Fix duplicate cluster events. Fixes #2684
---
 CHANGELOG.md                           | 13 +++++++++++
 controller/model/controller_manager.go |  8 ++++++-
 controller/raft/fsm.go                 | 12 ++++++----
 controller/raft/raft.go                | 32 ++++++++++++++++----------
 4 files changed, 47 insertions(+), 18 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 61f352737..d2acb8814 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,16 @@
+# Release 1.3.1
+
+## What's New
+
+* Bug Fixes
+
+## Component Updates and Bug Fixes
+
+* github.com/openziti/ziti: [v1.3.0 -> v1.3.1](https://github.com/openziti/ziti/compare/v1.3.0...v1.3.1)
+  * [Issue #2682](https://github.com/openziti/ziti/issues/2682) - HA Controller panics when bootstrapping by setting the db variable in the configuration
+  * [Issue #2683](https://github.com/openziti/ziti/issues/2683) - Controller fails to save peer configuration on a fresh install
+  * [Issue #2684](https://github.com/openziti/ziti/issues/2684) - Controller emits duplicate cluster events on startup
+
 # Release 1.3.0
 
 ## What's New
diff --git a/controller/model/controller_manager.go b/controller/model/controller_manager.go
index 71a2e184d..a5a97548f 100644
--- a/controller/model/controller_manager.go
+++ b/controller/model/controller_manager.go
@@ -218,7 +218,8 @@ func (self *ControllerManager) PeersConnected(peers []*event.ClusterPeer, peerCo
         }
     }
 
-    pfxlog.Logger().Infof("acting as leader, updating controllers with peers, self: %s, peers: %s", nfpem.FingerprintFromCertificate(selfAsPeer.ServerCert[0]), peerFingerprints)
+    pfxlog.Logger().Infof("acting as leader, updating controllers with peers, self: %s, peer count: %d, peers: %s",
+        nfpem.FingerprintFromCertificate(selfAsPeer.ServerCert[0]), len(peers), peerFingerprints)
 
     if !peerConnectedEvent {
         // add this controller as a "peer" when leadership is gained
@@ -226,6 +227,11 @@ func (self *ControllerManager) PeersConnected(peers []*event.ClusterPeer, peerCo
     }
 
     for _, peer := range peers {
+        // Use our locally built peer instance to represent ourselves in the list
+        if peer.Id == selfAsPeer.Id && peer != selfAsPeer {
+            continue
+        }
+
         if len(peer.ServerCert) < 1 {
             pfxlog.Logger().Errorf("peer %s has no certificate", peer.Id)
             continue
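
The PeersConnected change above hinges on one idea: when leadership is gained the controller adds a locally built record for itself, so any copy of the same node arriving from the mesh should be skipped. A minimal, standalone sketch of that dedup, assuming an illustrative Peer type and filterSelf helper rather than ziti's actual types:

```go
package main

import "fmt"

// Peer is a stand-in for a cluster peer record; only the fields needed
// for the sketch are included.
type Peer struct {
	Id   string
	Addr string
}

// filterSelf keeps the locally built selfAsPeer entry and drops any other
// record that describes the same node, mirroring the check added in
// PeersConnected: same Id, but not the instance built locally.
func filterSelf(selfAsPeer *Peer, peers []*Peer) []*Peer {
	var out []*Peer
	for _, peer := range peers {
		if peer.Id == selfAsPeer.Id && peer != selfAsPeer {
			continue // same node reported by the mesh; prefer the local copy
		}
		out = append(out, peer)
	}
	return out
}

func main() {
	self := &Peer{Id: "ctrl1", Addr: "tls:127.0.0.1:6262"}
	peers := []*Peer{
		self, // added locally when leadership was gained
		{Id: "ctrl1", Addr: "tls:127.0.0.1:6262"}, // duplicate of self from the mesh
		{Id: "ctrl2", Addr: "tls:127.0.0.1:6363"},
	}
	// Prints ctrl1 and ctrl2 once each; the mesh-reported duplicate is dropped.
	for _, p := range filterSelf(self, peers) {
		fmt.Println(p.Id, p.Addr)
	}
}
```
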
diff --git a/controller/raft/fsm.go b/controller/raft/fsm.go
index cad7f717a..52d98c7b7 100644
--- a/controller/raft/fsm.go
+++ b/controller/raft/fsm.go
@@ -19,6 +19,7 @@ package raft
 import (
     "bytes"
     "compress/gzip"
+    "errors"
     "fmt"
     "github.com/hashicorp/raft"
     "github.com/michaelquigley/pfxlog"
@@ -27,7 +28,6 @@ import (
     "github.com/openziti/ziti/controller/command"
     "github.com/openziti/ziti/controller/db"
     event2 "github.com/openziti/ziti/controller/event"
-    "github.com/pkg/errors"
     "github.com/sirupsen/logrus"
     "go.etcd.io/bbolt"
     "io"
@@ -153,7 +153,9 @@ func (self *BoltDbFsm) storeConfigurationInRaft(index uint64, servers []raft.Ser
 func (self *BoltDbFsm) storeServers(tx *bbolt.Tx, servers []raft.Server) error {
     raftBucket := boltz.GetOrCreatePath(tx, db.RootBucket, db.MetadataBucket)
     if err := raftBucket.DeleteBucket([]byte(ServersBucket)); err != nil {
-        return err
+        if !errors.Is(err, bbolt.ErrBucketNotFound) {
+            return err
+        }
     }
 
     for _, server := range servers {
@@ -261,7 +263,7 @@ func (self *BoltDbFsm) Apply(log *raft.Log) interface{} {
             return err
         } else {
-            return errors.Errorf("log data contained invalid message type. data: %+v", log.Data)
+            return fmt.Errorf("log data contained invalid message type. data: %+v", log.Data)
         }
     }
     return nil
@@ -278,7 +280,7 @@ func (self *BoltDbFsm) Snapshot() (raft.FSMSnapshot, error) {
     }
 
     if err = gzWriter.Close(); err != nil {
-        return nil, errors.Wrap(err, "error finishing gz compression of raft snapshot")
+        return nil, fmt.Errorf("error finishing gz compression of raft snapshot (%w)", err)
     }
 
     logrus.WithField("id", id).WithField("index", self.indexTracker.Index()).Info("creating snapshot")
@@ -379,7 +381,7 @@ func (self *BoltDbFsm) restoreSnapshotDbFile(path string, snapshot io.ReadCloser
     gzReader, err := gzip.NewReader(snapshot)
     if err != nil {
-        return errors.Wrapf(err, "unable to create gz reader for reading raft snapshot during restore")
+        return fmt.Errorf("unable to create gz reader for reading raft snapshot during restore (%w)", err)
     }
 
     if _, err = io.Copy(dbFile, gzReader); err != nil {
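
The storeServers change works because bbolt reports a missing child bucket from DeleteBucket as ErrBucketNotFound, which is the expected state on a fresh install where the servers bucket has never been written. A standalone sketch of the same pattern, using made-up bucket names rather than ziti's boltz path helpers:

```go
package main

import (
	"errors"
	"log"

	"go.etcd.io/bbolt"
)

// resetServers clears and recreates a "servers" bucket under a "raft" parent.
// On a brand new database the child bucket does not exist yet, so the
// ErrBucketNotFound returned by DeleteBucket is tolerated rather than
// failing the whole save.
func resetServers(db *bbolt.DB) error {
	return db.Update(func(tx *bbolt.Tx) error {
		parent, err := tx.CreateBucketIfNotExists([]byte("raft"))
		if err != nil {
			return err
		}
		if err := parent.DeleteBucket([]byte("servers")); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
			return err
		}
		_, err = parent.CreateBucket([]byte("servers"))
		return err
	})
}

func main() {
	db, err := bbolt.Open("example.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Works both on a fresh database and on one that already has the bucket.
	if err := resetServers(db); err != nil {
		log.Fatal(err)
	}
}
```
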
diff --git a/controller/raft/raft.go b/controller/raft/raft.go
index 2a5982104..f32a2e6b2 100644
--- a/controller/raft/raft.go
+++ b/controller/raft/raft.go
@@ -184,7 +184,7 @@ func (self *Controller) initErrorMappers() {
 
 func (self *Controller) RegisterClusterEventHandler(f func(event ClusterEvent, state ClusterState, leaderId string)) {
     if self.isLeader.Load() {
-        f(ClusterEventLeadershipGained, newClusterState(true, !self.Mesh.IsReadOnly()), "")
+        f(ClusterEventLeadershipGained, newClusterState(true, !self.Mesh.IsReadOnly()), self.env.GetId().Token)
     }
     self.clusterStateChangeHandlers.Append(f)
 }
@@ -698,11 +698,6 @@ func (self *Controller) ObserveLeaderChanges() {
         leaderId: string(leaderId),
     }
 
-    if self.Raft.State() == raft.Leader {
-        self.isLeader.Store(true)
-        self.handleClusterStateChange(ClusterEventLeadershipGained, eventState)
-    }
-
     if eventState.hasLeader {
         self.handleClusterStateChange(ClusterEventHasLeader, eventState)
     } else {
@@ -712,11 +707,24 @@ func (self *Controller) ObserveLeaderChanges() {
     ticker := time.NewTicker(time.Second * 5)
     defer ticker.Stop()
 
+    first := true
+
     for {
         select {
         case observation := <-self.clusterEvents:
             self.processRaftObservation(observation, eventState)
         case <-ticker.C:
+            if first {
+                // delay this check because it seems like raft generates observations for leader state, so if we do this
+                // first we're going to get duplicates
+                if self.Raft.State() == raft.Leader {
+                    if wasLeader := self.isLeader.Swap(true); !wasLeader {
+                        self.handleClusterStateChange(ClusterEventLeadershipGained, eventState)
+                    }
+                }
+                first = false
+            }
+
             if !eventState.warningEmitted && !eventState.hasLeader && time.Since(eventState.noLeaderAt) > self.Config.WarnWhenLeaderlessFor {
                 pfxlog.Logger().WithField("timeSinceLeader", time.Since(eventState.noLeaderAt).String()).
                     Warn("cluster running without leader for longer than configured threshold")
@@ -731,11 +739,11 @@ func (self *Controller) processRaftObservation(observation raft.Observation, eve
     pfxlog.Logger().Tracef("raft observation received: isLeader: %v, isReadWrite: %v", self.isLeader.Load(), eventState.isReadWrite)
 
     if raftState, ok := observation.Data.(raft.RaftState); ok {
-        if raftState == raft.Leader && !self.isLeader.Load() {
-            self.isLeader.Store(true)
-            self.handleClusterStateChange(ClusterEventLeadershipGained, eventState)
-        } else if raftState != raft.Leader && self.isLeader.Load() {
-            self.isLeader.Store(false)
+        if raftState == raft.Leader {
+            if wasLeader := self.isLeader.Swap(true); !wasLeader {
+                self.handleClusterStateChange(ClusterEventLeadershipGained, eventState)
+            }
+        } else if wasLeader := self.isLeader.Swap(false); wasLeader {
             self.handleClusterStateChange(ClusterEventLeadershipLost, eventState)
         }
     }
@@ -798,7 +806,7 @@ func (self *Controller) Bootstrap() error {
     firstCheckPassed := false
     for {
         // make sure this is in a reasonably steady state by waiting a bit longer and checking twice
-        if self.isLeader.Load() {
+        if _, leaderId := self.Raft.LeaderWithID(); leaderId != "" {
             if firstCheckPassed {
                 break
             } else {
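
The raft.go changes converge on a single pattern for avoiding duplicate cluster events: treat the leadership flag as an atomic check-and-set, so an event fires only when the flag actually flips, no matter how many code paths (a raft observation and the delayed ticker check) report the same state. A small sketch of that idea, assuming an illustrative leaderTracker type that is not part of the ziti codebase:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// leaderTracker records whether this node already considers itself leader.
type leaderTracker struct {
	isLeader atomic.Bool
}

// noteLeaderState stores the reported state and returns true only when it
// is a real transition, i.e. the stored flag actually changed. Swap makes
// the check-and-set atomic, so two code paths reporting the same state
// cannot both see a transition.
func (t *leaderTracker) noteLeaderState(leader bool) bool {
	return t.isLeader.Swap(leader) != leader
}

func main() {
	var t leaderTracker

	fmt.Println(t.noteLeaderState(true))  // true: leadership gained, emit event
	fmt.Println(t.noteLeaderState(true))  // false: duplicate report, no event
	fmt.Println(t.noteLeaderState(false)) // true: leadership lost, emit event
}
```
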