Skip to content

Commit

Permalink
Ensure links don't get stuck in dialing status. Only mark link status…
Browse files Browse the repository at this point in the history
… failed if closing link was current. Fixes #1471
  • Loading branch information
plorenz committed Oct 31, 2023
1 parent 386db7c commit eaba704
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 28 deletions.
12 changes: 5 additions & 7 deletions router/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ package forwarder

import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/errorz"
"github.com/openziti/foundation/v2/info"
"github.com/openziti/metrics"
"github.com/openziti/ziti/common/inspect"
"github.com/openziti/ziti/common/pb/ctrl_pb"
"github.com/openziti/ziti/common/trace"
"github.com/openziti/ziti/router/xgress"
"github.com/openziti/ziti/router/xlink"
"github.com/openziti/foundation/v2/errorz"
"github.com/openziti/foundation/v2/info"
"github.com/openziti/metrics"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"time"
Expand Down Expand Up @@ -110,14 +110,12 @@ func (forwarder *Forwarder) HasDestination(address xgress.Address) bool {
}

func (forwarder *Forwarder) RegisterLink(link xlink.LinkDestination) error {
if !forwarder.destinations.addDestinationIfAbsent(xgress.Address(link.Id()), link) {
return errors.Errorf("unable to register link %v as it is already registered", link.Id())
}
forwarder.destinations.addDestination(xgress.Address(link.Id()), link)
return nil
}

func (forwarder *Forwarder) UnregisterLink(link xlink.LinkDestination) {
forwarder.destinations.removeDestination(xgress.Address(link.Id()))
forwarder.destinations.removeDestinationIfMatches(xgress.Address(link.Id()), link)
}

func (forwarder *Forwarder) Route(ctrlId string, route *ctrl_pb.Route) error {
Expand Down
10 changes: 6 additions & 4 deletions router/forwarder/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,6 @@ func (dt *destinationTable) addDestination(addr xgress.Address, destination Dest
dt.destinations.Set(string(addr), destination)
}

func (dt *destinationTable) addDestinationIfAbsent(addr xgress.Address, destination Destination) bool {
return dt.destinations.SetIfAbsent(string(addr), destination)
}

func (dt *destinationTable) getDestination(addr xgress.Address) (Destination, bool) {
if dst, found := dt.destinations.Get(string(addr)); found {
return dst, true
Expand All @@ -130,6 +126,12 @@ func (dt *destinationTable) removeDestination(addr xgress.Address) {
dt.destinations.Remove(string(addr))
}

func (dt *destinationTable) removeDestinationIfMatches(addr xgress.Address, destination Destination) {
dt.destinations.RemoveCb(string(addr), func(key string, v Destination, exists bool) bool {
return exists && destination == v
})
}

func (dt *destinationTable) linkDestinationToCircuit(circuitId string, address xgress.Address) {
var addresses []xgress.Address
if i, found := dt.xgress.Get(circuitId); found {
Expand Down
3 changes: 2 additions & 1 deletion router/handler_link/close.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func (self *closeHandler) HandleClose(ch channel.Channel) {
WithField("linkId", self.link.Id()).
WithField("routerId", self.link.DestinationId())

self.forwarder.UnregisterLink(self.link)

// ensure that both parts of a split link are closed, if one side closes
go func() {
_ = self.link.Close()
Expand All @@ -72,7 +74,6 @@ func (self *closeHandler) HandleClose(ch channel.Channel) {
})
})

self.forwarder.UnregisterLink(self.link)
close(self.closeNotify)
}
}
6 changes: 3 additions & 3 deletions router/link/link_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package link
import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2"
"github.com/openziti/ziti/controller/idgen"
"github.com/openziti/foundation/v2/stringz"
"github.com/openziti/ziti/common/inspect"
"github.com/openziti/ziti/common/pb/ctrl_pb"
"github.com/openziti/ziti/controller/idgen"
"github.com/openziti/ziti/router/xlink"
"github.com/openziti/foundation/v2/stringz"
"github.com/pkg/errors"
"sync/atomic"
"time"
Expand Down Expand Up @@ -255,7 +255,7 @@ func (self *updateLinkState) Handle(registry *linkRegistryImpl) {
}

state.status = self.status
if state.status == StatusQueueFailed || state.status == StatusDialFailed {
if state.status == StatusDialFailed {
state.dialFailed(registry)
}
}
Expand Down
54 changes: 42 additions & 12 deletions router/link/link_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2"
"github.com/openziti/channel/v2/protobufs"
"github.com/openziti/foundation/v2/goroutines"
"github.com/openziti/ziti/common/inspect"
"github.com/openziti/ziti/common/pb/ctrl_pb"
"github.com/openziti/ziti/router/env"
"github.com/openziti/ziti/router/xlink"
"github.com/openziti/foundation/v2/goroutines"
"github.com/sirupsen/logrus"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -165,9 +165,9 @@ func (self *linkRegistryImpl) LinkClosed(link xlink.Xlink) {
defer self.Unlock()
if val := self.linkMap[link.Key()]; val == link {
delete(self.linkMap, link.Key())
self.updateLinkStateClosed(link) // only update link state to closed if this was the current link
}
delete(self.linkByIdMap, link.Id())
self.updateLinkStateClosed(link)
}

func (self *linkRegistryImpl) Shutdown() {
Expand Down Expand Up @@ -325,13 +325,31 @@ func (self *linkRegistryImpl) evaluateLinkStateQueue() {
}

func (self *linkRegistryImpl) evaluateDestinations() {
for _, dest := range self.destinations {
// TODO: When do we drop destinations? Should we ask the controller after the router has been
// unhealthy for a while and it doesn't have any established links? Do this on exponential backoff?
// Should the controller send router removed messages?
for destId, dest := range self.destinations {
hasEstablishedLinks := false
for _, state := range dest.linkMap {
// verify that links marked as established have an open link. There's a small chance that a link established
// and link closed could be processed out of order if the event queue is full. This way, it will eventually
// get fixed.
if state.status == StatusEstablished {
link, _ := self.GetLink(state.linkKey)
if link == nil || link.IsClosed() {
state.retryDelay = time.Duration(0)
state.nextDial = time.Now()
} else {
hasEstablishedLinks = true
}
}

self.evaluateLinkState(state)
}

// we are notified of deleted routers. In case we're unreachable while a router is deleted,
// we will also stop trying to contact unhealthy routers after a period. If a destination
// has nothing to dial, it should also be removed
if len(dest.linkMap) == 0 || (!dest.healthy && !hasEstablishedLinks && time.Since(dest.unhealthyAt) > 8*time.Hour) {
delete(self.destinations, destId)
}
}
}

Expand All @@ -343,14 +361,17 @@ func (self *linkRegistryImpl) evaluateLinkState(state *linkState) {
if couldDial {
state.status = StatusDialing
state.dialAttempts++
log.Info("queuing link to dial")

err := self.env.GetLinkDialerPool().QueueOrError(func() {
link, _ := self.GetLink(state.linkKey)
if link != nil {
log.Warn("link already present, but link status still pending")
log.Info("link already present, attempting to mark established")
self.updateLinkStateEstablished(link)
return
}

log.Info("dialing link")
link, err := state.dialer.Dial(state)
if err != nil {
log.WithError(err).Error("error dialing link")
Expand All @@ -360,14 +381,23 @@ func (self *linkRegistryImpl) evaluateLinkState(state *linkState) {
})
return
}
self.DialSucceeded(link)

existing, success := self.DialSucceeded(link)
if !success {
if existing != nil {
self.updateLinkStateEstablished(link)
} else {
self.queueEvent(&updateLinkState{
linkState: state,
status: StatusDialFailed,
})
}
}
})
if err != nil {
log.WithError(err).Error("unable to queue link dial, see pool error")
self.queueEvent(&updateLinkState{
linkState: state,
status: StatusQueueFailed,
})
state.status = StatusQueueFailed
state.dialFailed(self)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion router/xgress_edge/hosted.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (registry *hostedServiceRegistry) cleanupDuplicates(newest *edgeTerminator)
registry.services.Range(func(key, value interface{}) bool {
terminator := value.(*edgeTerminator)
if terminator != newest && newest.token == terminator.token && newest.instance == terminator.instance {
terminator.close(true, "duplicate terminator") // don't notify, channel is already closed, we can't send messages
terminator.close(false, "duplicate terminator") // don't notify, channel is already closed, we can't send messages
registry.services.Delete(key)
pfxlog.Logger().WithField("routerId", terminator.edgeClientConn.listener.id.Token).
WithField("sessionToken", terminator.token).
Expand Down

0 comments on commit eaba704

Please sign in to comment.