From 56df399a128f365d1139263bcf0fe1d3e9a0a74e Mon Sep 17 00:00:00 2001 From: Paul Lorenz Date: Tue, 31 Oct 2023 00:41:12 -0400 Subject: [PATCH] Ensure links don't get stuck in dialing status. Only mark link status failed if closing link was current. Fixes #1471 --- router/forwarder/forwarder.go | 12 ++++---- router/forwarder/tables.go | 6 ++++ router/handler_link/close.go | 3 +- router/link/link_events.go | 6 ++-- router/link/link_registry.go | 54 +++++++++++++++++++++++++++-------- router/xgress_edge/hosted.go | 2 +- 6 files changed, 59 insertions(+), 24 deletions(-) diff --git a/router/forwarder/forwarder.go b/router/forwarder/forwarder.go index 1be4eff6a6..3274d7270e 100644 --- a/router/forwarder/forwarder.go +++ b/router/forwarder/forwarder.go @@ -18,14 +18,14 @@ package forwarder import ( "github.com/michaelquigley/pfxlog" + "github.com/openziti/foundation/v2/errorz" + "github.com/openziti/foundation/v2/info" + "github.com/openziti/metrics" "github.com/openziti/ziti/common/inspect" "github.com/openziti/ziti/common/pb/ctrl_pb" "github.com/openziti/ziti/common/trace" "github.com/openziti/ziti/router/xgress" "github.com/openziti/ziti/router/xlink" - "github.com/openziti/foundation/v2/errorz" - "github.com/openziti/foundation/v2/info" - "github.com/openziti/metrics" "github.com/pkg/errors" "github.com/sirupsen/logrus" "time" @@ -110,14 +110,12 @@ func (forwarder *Forwarder) HasDestination(address xgress.Address) bool { } func (forwarder *Forwarder) RegisterLink(link xlink.LinkDestination) error { - if !forwarder.destinations.addDestinationIfAbsent(xgress.Address(link.Id()), link) { - return errors.Errorf("unable to register link %v as it is already registered", link.Id()) - } + forwarder.destinations.addDestination(xgress.Address(link.Id()), link) return nil } func (forwarder *Forwarder) UnregisterLink(link xlink.LinkDestination) { - forwarder.destinations.removeDestination(xgress.Address(link.Id())) + forwarder.destinations.removeDestinationIfMatches(xgress.Address(link.Id()), link) } func (forwarder *Forwarder) Route(ctrlId string, route *ctrl_pb.Route) error { diff --git a/router/forwarder/tables.go b/router/forwarder/tables.go index a4ffdce5f8..2543290c8b 100644 --- a/router/forwarder/tables.go +++ b/router/forwarder/tables.go @@ -130,6 +130,12 @@ func (dt *destinationTable) removeDestination(addr xgress.Address) { dt.destinations.Remove(string(addr)) } +func (dt *destinationTable) removeDestinationIfMatches(addr xgress.Address, destination Destination) { + dt.destinations.RemoveCb(string(addr), func(key string, v Destination, exists bool) bool { + return exists && destination == v + }) +} + func (dt *destinationTable) linkDestinationToCircuit(circuitId string, address xgress.Address) { var addresses []xgress.Address if i, found := dt.xgress.Get(circuitId); found { diff --git a/router/handler_link/close.go b/router/handler_link/close.go index 4676ea6965..8803fed9de 100644 --- a/router/handler_link/close.go +++ b/router/handler_link/close.go @@ -52,6 +52,8 @@ func (self *closeHandler) HandleClose(ch channel.Channel) { WithField("linkId", self.link.Id()). WithField("routerId", self.link.DestinationId()) + self.forwarder.UnregisterLink(self.link) + // ensure that both parts of a split link are closed, if one side closes go func() { _ = self.link.Close() @@ -72,7 +74,6 @@ func (self *closeHandler) HandleClose(ch channel.Channel) { }) }) - self.forwarder.UnregisterLink(self.link) close(self.closeNotify) } } diff --git a/router/link/link_events.go b/router/link/link_events.go index 6524e38cfe..9392d7a44a 100644 --- a/router/link/link_events.go +++ b/router/link/link_events.go @@ -19,11 +19,11 @@ package link import ( "github.com/michaelquigley/pfxlog" "github.com/openziti/channel/v2" - "github.com/openziti/ziti/controller/idgen" + "github.com/openziti/foundation/v2/stringz" "github.com/openziti/ziti/common/inspect" "github.com/openziti/ziti/common/pb/ctrl_pb" + "github.com/openziti/ziti/controller/idgen" "github.com/openziti/ziti/router/xlink" - "github.com/openziti/foundation/v2/stringz" "github.com/pkg/errors" "sync/atomic" "time" @@ -255,7 +255,7 @@ func (self *updateLinkState) Handle(registry *linkRegistryImpl) { } state.status = self.status - if state.status == StatusQueueFailed || state.status == StatusDialFailed { + if state.status == StatusDialFailed { state.dialFailed(registry) } } diff --git a/router/link/link_registry.go b/router/link/link_registry.go index 84bd8fd796..f1f311eaeb 100644 --- a/router/link/link_registry.go +++ b/router/link/link_registry.go @@ -22,11 +22,11 @@ import ( "github.com/michaelquigley/pfxlog" "github.com/openziti/channel/v2" "github.com/openziti/channel/v2/protobufs" + "github.com/openziti/foundation/v2/goroutines" "github.com/openziti/ziti/common/inspect" "github.com/openziti/ziti/common/pb/ctrl_pb" "github.com/openziti/ziti/router/env" "github.com/openziti/ziti/router/xlink" - "github.com/openziti/foundation/v2/goroutines" "github.com/sirupsen/logrus" "sync" "sync/atomic" @@ -165,9 +165,9 @@ func (self *linkRegistryImpl) LinkClosed(link xlink.Xlink) { defer self.Unlock() if val := self.linkMap[link.Key()]; val == link { delete(self.linkMap, link.Key()) + self.updateLinkStateClosed(link) // only update link state to closed if this was the current link } delete(self.linkByIdMap, link.Id()) - self.updateLinkStateClosed(link) } func (self *linkRegistryImpl) Shutdown() { @@ -325,13 +325,31 @@ func (self *linkRegistryImpl) evaluateLinkStateQueue() { } func (self *linkRegistryImpl) evaluateDestinations() { - for _, dest := range self.destinations { - // TODO: When do we drop destinations? Should we ask the controller after the router has been - // unhealthy for a while and it doesn't have any established links? Do this on exponential backoff? - // Should the controller send router removed messages? + for destId, dest := range self.destinations { + hasEstablishedLinks := false for _, state := range dest.linkMap { + // verify that links marked as established have an open link. There's a small chance that a link established + // and link closed could be processed out of order if the event queue is full. This way, it will eventually + // get fixed. + if state.status == StatusEstablished { + link, _ := self.GetLink(state.linkKey) + if link == nil || link.IsClosed() { + state.retryDelay = time.Duration(0) + state.nextDial = time.Now() + } else { + hasEstablishedLinks = true + } + } + self.evaluateLinkState(state) } + + // we are notified of deleted routers. In case we're unreachable while a router is deleted, + // we will also stop trying to contact unhealthy routers after a period. If a destination + // has nothing to dial, it should also be removed + if len(dest.linkMap) == 0 || (!dest.healthy && !hasEstablishedLinks && time.Since(dest.unhealthyAt) > 8*time.Hour) { + delete(self.destinations, destId) + } } } @@ -343,14 +361,17 @@ func (self *linkRegistryImpl) evaluateLinkState(state *linkState) { if couldDial { state.status = StatusDialing state.dialAttempts++ + log.Info("queuing link to dial") err := self.env.GetLinkDialerPool().QueueOrError(func() { link, _ := self.GetLink(state.linkKey) if link != nil { - log.Warn("link already present, but link status still pending") + log.Info("link already present, attempting to mark established") + self.updateLinkStateEstablished(link) return } + log.Info("dialing link") link, err := state.dialer.Dial(state) if err != nil { log.WithError(err).Error("error dialing link") @@ -360,14 +381,23 @@ func (self *linkRegistryImpl) evaluateLinkState(state *linkState) { }) return } - self.DialSucceeded(link) + + existing, success := self.DialSucceeded(link) + if !success { + if existing != nil { + self.updateLinkStateEstablished(link) + } else { + self.queueEvent(&updateLinkState{ + linkState: state, + status: StatusDialFailed, + }) + } + } }) if err != nil { log.WithError(err).Error("unable to queue link dial, see pool error") - self.queueEvent(&updateLinkState{ - linkState: state, - status: StatusQueueFailed, - }) + state.status = StatusQueueFailed + state.dialFailed(self) } } } diff --git a/router/xgress_edge/hosted.go b/router/xgress_edge/hosted.go index 26187677db..b7a6f0161e 100644 --- a/router/xgress_edge/hosted.go +++ b/router/xgress_edge/hosted.go @@ -66,7 +66,7 @@ func (registry *hostedServiceRegistry) cleanupDuplicates(newest *edgeTerminator) registry.services.Range(func(key, value interface{}) bool { terminator := value.(*edgeTerminator) if terminator != newest && newest.token == terminator.token && newest.instance == terminator.instance { - terminator.close(true, "duplicate terminator") // don't notify, channel is already closed, we can't send messages + terminator.close(false, "duplicate terminator") // don't notify, channel is already closed, we can't send messages registry.services.Delete(key) pfxlog.Logger().WithField("routerId", terminator.edgeClientConn.listener.id.Token). WithField("sessionToken", terminator.token).