Skip to content

Commit

Permalink
Fix terminator id race condition. Fixes #1685
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Jan 19, 2024
1 parent cae2dc5 commit 26c6263
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 33 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ New metrics:
* [Issue #120](https://github.com/openziti/channel/issues/120) - Allow handling new underlay instances with function instead of channel

* github.com/openziti/edge-api: [v0.26.6 -> v0.26.7](https://github.com/openziti/edge-api/compare/v0.26.6...v0.26.7)
* github.com/openziti/sdk-golang: [v0.22.0 -> v0.22.5](https://github.com/openziti/sdk-golang/compare/v0.22.0...v0.22.5)
* github.com/openziti/sdk-golang: [v0.22.0 -> v0.22.6](https://github.com/openziti/sdk-golang/compare/v0.22.0...v0.22.6)
* github.com/openziti/secretstream: [v0.1.14 -> v0.1.15](https://github.com/openziti/secretstream/compare/v0.1.14...v0.1.15)
* github.com/openziti/ziti: [v0.31.4 -> v0.32.0](https://github.com/openziti/ziti/compare/v0.31.4...v0.32.0)
* [Issue #1685](https://github.com/openziti/ziti/issues/1685) - Race condition where we try to create terminator after client connection is closed
* [Issue #1678](https://github.com/openziti/ziti/issues/1678) - Add link validation utility
* [Issue #1673](https://github.com/openziti/ziti/issues/1673) - xgress dialers not getting passed xgress config
* [Issue #1669](https://github.com/openziti/ziti/issues/1669) - Make sure link accepts are not single threaded
* [Issue #1657](https://github.com/openziti/ziti/issues/1657) - Add api session rate limiter

Expand Down
20 changes: 14 additions & 6 deletions controller/handler_edge_ctrl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ import (

"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2"
"github.com/openziti/ziti/controller/env"
"github.com/openziti/ziti/controller/model"
"github.com/openziti/ziti/controller/db"
"github.com/openziti/ziti/controller/network"
"github.com/openziti/ziti/controller/xt"
"github.com/openziti/ziti/common/logcontext"
"github.com/openziti/foundation/v2/stringz"
"github.com/openziti/identity"
"github.com/openziti/sdk-golang/ziti/edge"
"github.com/openziti/storage/boltz"
"github.com/openziti/ziti/common/logcontext"
"github.com/openziti/ziti/controller/db"
"github.com/openziti/ziti/controller/env"
"github.com/openziti/ziti/controller/model"
"github.com/openziti/ziti/controller/network"
"github.com/openziti/ziti/controller/xt"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -367,6 +367,14 @@ func (self *baseSessionRequestContext) verifyTerminator(terminatorId string, bin
return nil
}

func (self *baseSessionRequestContext) verifyTerminatorId(id string) {
if self.err == nil {
if id == "" {
self.err = invalidTerminator("provided terminator id is blank")
}
}
}

func (self *baseSessionRequestContext) updateTerminator(terminator *network.Terminator, request UpdateTerminatorRequest, ctx *change.Context) {
if self.err == nil {
checker := fields.UpdatedFieldsMap{}
Expand Down
1 change: 1 addition & 0 deletions controller/handler_edge_ctrl/create_terminator_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (self *createTerminatorV2Handler) CreateTerminatorV2(ctx *CreateTerminatorV
if !ctx.loadRouter() {
return
}
ctx.verifyTerminatorId(ctx.req.Address)
ctx.loadSession(ctx.req.SessionToken)
ctx.checkSessionType(db.SessionTypeBind)
ctx.checkSessionFingerprints(ctx.req.Fingerprints)
Expand Down
5 changes: 5 additions & 0 deletions router/fabric/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type StateManager interface {
//"Network" Sessions
RemoveEdgeSession(token string)
AddEdgeSessionRemovedListener(token string, callBack func(token string)) RemoveListener
WasSessionRecentlyRemoved(token string) bool

//ApiSessions
GetApiSession(token string) *ApiSession
Expand Down Expand Up @@ -284,6 +285,10 @@ func (sm *StateManagerImpl) GetApiSession(token string) *ApiSession {
return nil
}

func (sm *StateManagerImpl) WasSessionRecentlyRemoved(token string) bool {
return sm.recentlyRemovedSessions.Has(token)
}

func (sm *StateManagerImpl) AddEdgeSessionRemovedListener(token string, callBack func(token string)) RemoveListener {
if sm.recentlyRemovedSessions.Has(token) {
go callBack(token) // callback can be long process with network traffic. Don't block event processing
Expand Down
3 changes: 2 additions & 1 deletion router/xgress_edge/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ func (self *edgeTerminator) close(notify bool, reason string) {
if terminatorId := self.terminatorId.Load(); terminatorId != "" {
if self.terminatorId.CompareAndSwap(terminatorId, "") {
logger.Debug("removing terminator on router")
self.edgeClientConn.listener.factory.hostedServices.Delete(terminatorId)

self.state.Store(TerminatorStateDeleting)
self.edgeClientConn.listener.factory.hostedServices.Delete(terminatorId)

logger.Info("removing terminator on controller")
ctrlCh := self.edgeClientConn.listener.factory.ctrls.AnyCtrlChannel()
if ctrlCh == nil {
Expand Down
49 changes: 36 additions & 13 deletions router/xgress_edge/hosted.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,20 @@ func (self *hostedServiceRegistry) scanForRetries() {
}

func (self *hostedServiceRegistry) tryEstablish(terminator *edgeTerminator) {
terminator.state.Store(TerminatorStateEstablishing)
log := pfxlog.Logger().WithField("terminatorId", terminator.Id()).
WithField("token", terminator.token).
WithField("state", terminator.state.Load().String())

if !terminator.state.CompareAndSwap(TerminatorStatePendingEstablishment, TerminatorStateEstablishing) {
log.Info("terminator not pending, not going to try to establish")
return
}

err := self.env.GetRateLimiterPool().QueueOrError(func() {
self.establishTerminatorWithRetry(terminator)
})
if err != nil {
terminator.state.Store(TerminatorStatePendingEstablishment)
pfxlog.Logger().WithField("terminatorId", terminator.Id()).Info("rate limited: unable to queue to establish")
log.Info("rate limited: unable to queue to establish")
self.retriesPending = true
}
}
Expand Down Expand Up @@ -171,7 +178,7 @@ func (self *hostedServiceRegistry) cleanupDuplicates(newest *edgeTerminator) {
terminator.close(false, "duplicate terminator") // don't notify, channel is already closed, we can't send messages
self.services.Delete(key)
pfxlog.Logger().WithField("routerId", terminator.edgeClientConn.listener.id.Token).
WithField("sessionToken", terminator.token).
WithField("token", terminator.token).
WithField("instance", terminator.instance).
WithField("terminatorId", terminator.terminatorId.Load()).
WithField("duplicateOf", newest.terminatorId.Load()).
Expand All @@ -189,7 +196,7 @@ func (self *hostedServiceRegistry) unbindSession(connId uint32, sessionToken str
terminator.close(false, "unbind successful") // don't notify, sdk asked us to unbind
self.services.Delete(key)
pfxlog.Logger().WithField("routerId", terminator.edgeClientConn.listener.id.Token).
WithField("sessionToken", sessionToken).
WithField("token", sessionToken).
WithField("terminatorId", terminator.terminatorId.Load()).
Info("terminator removed")
atLeastOneRemoved = true
Expand All @@ -212,7 +219,9 @@ func (self *hostedServiceRegistry) getRelatedTerminators(sessionToken string, pr
}

func (self *hostedServiceRegistry) establishTerminatorWithRetry(terminator *edgeTerminator) {
log := logrus.WithField("terminatorId", terminator.terminatorId.Load())
log := logrus.
WithField("terminatorId", terminator.terminatorId.Load()).
WithField("token", terminator.token)

if state := terminator.state.Load(); state != TerminatorStateEstablishing {
log.WithField("state", state.String()).Info("not attempting to establish terminator, not in establishing state")
Expand All @@ -228,10 +237,11 @@ func (self *hostedServiceRegistry) establishTerminatorWithRetry(terminator *edge
return backoff.Permanent(fmt.Errorf("terminator state is %v, stopping terminator creation for terminator %s",
state.String(), terminator.terminatorId.Load()))
}
if terminator.terminatorId.Load() == "" {
return backoff.Permanent(fmt.Errorf("terminator has been closed, stopping terminator creation"))
}

var err error
log.Info("attempting to establish terminator")
err = self.establishTerminator(terminator)
err := self.establishTerminator(terminator)
if err != nil && terminator.state.Load() != TerminatorStateEstablishing {
return backoff.Permanent(err)
}
Expand Down Expand Up @@ -260,10 +270,16 @@ func (self *hostedServiceRegistry) establishTerminator(terminator *edgeTerminato

log := pfxlog.Logger().
WithField("routerId", factory.env.GetRouterId().Token).
WithField("terminatorId", terminator.terminatorId.Load())
WithField("terminatorId", terminator.terminatorId.Load()).
WithField("token", terminator.token)

terminatorId := terminator.terminatorId.Load()
if terminatorId == "" {
return fmt.Errorf("edge link is closed, stopping terminator creation for terminator %s", terminatorId)
}

request := &edge_ctrl_pb.CreateTerminatorV2Request{
Address: terminator.terminatorId.Load(),
Address: terminatorId,
SessionToken: terminator.token,
Fingerprints: terminator.edgeClientConn.fingerprints.Prints(),
PeerData: terminator.hostData,
Expand All @@ -281,12 +297,14 @@ func (self *hostedServiceRegistry) establishTerminator(terminator *edgeTerminato
return errors.New(errStr)
}

log.Info("sending create terminator v2 request")

err := protobufs.MarshalTyped(request).WithTimeout(timeout).SendAndWaitForWire(ctrlCh)
if err != nil {
return err
}

if self.waitForTerminatorCreated(terminator.terminatorId.Load(), 10*time.Second) {
if self.waitForTerminatorCreated(terminatorId, 10*time.Second) {
return nil
}

Expand All @@ -295,7 +313,7 @@ func (self *hostedServiceRegistry) establishTerminator(terminator *edgeTerminato
return errors.Errorf("timeout waiting for response to create terminator request for terminator %v", terminator.terminatorId.Load())
}

func (self *hostedServiceRegistry) HandleCreateTerminatorResponse(msg *channel.Message, ctrlCh channel.Channel) {
func (self *hostedServiceRegistry) HandleCreateTerminatorResponse(msg *channel.Message, _ channel.Channel) {
log := pfxlog.Logger().WithField("routerId", self.env.GetRouterId().Token)

response := &edge_ctrl_pb.CreateTerminatorV2Response{}
Expand All @@ -313,6 +331,11 @@ func (self *hostedServiceRegistry) HandleCreateTerminatorResponse(msg *channel.M
return
}

if response.Result != edge_ctrl_pb.CreateTerminatorResult_Success {
terminator.close(true, response.Msg)
return
}

if terminator.state.CompareAndSwap(TerminatorStateEstablishing, TerminatorStateEstablished) {
self.notifyTerminatorCreated(response.TerminatorId)
log.Info("received terminator created notification")
Expand Down
31 changes: 19 additions & 12 deletions router/xgress_edge/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (self *edgeClientConn) processBind(req *channel.Message, ch channel.Channel
if ctrlCh == nil {
errStr := "no controller available, cannot create terminator"
pfxlog.ContextLogger(ch.Label()).
WithField("sessionToken", string(req.Body)).
WithField("token", string(req.Body)).
WithFields(edge.GetLoggerFields(req)).
WithField("routerId", self.listener.id.Token).
Error(errStr)
Expand All @@ -226,7 +226,7 @@ func (self *edgeClientConn) processBindV1(req *channel.Message, ch channel.Chann
token := string(req.Body)

log := pfxlog.ContextLogger(ch.Label()).
WithField("sessionToken", token).
WithField("token", token).
WithFields(edge.GetLoggerFields(req)).
WithField("routerId", self.listener.id.Token)

Expand Down Expand Up @@ -324,7 +324,7 @@ func (self *edgeClientConn) processBindV2(req *channel.Message, ch channel.Chann
token := string(req.Body)

log := pfxlog.ContextLogger(ch.Label()).
WithField("sessionToken", token).
WithField("token", token).
WithFields(edge.GetLoggerFields(req)).
WithField("routerId", self.listener.id.Token)

Expand Down Expand Up @@ -413,20 +413,27 @@ func (self *edgeClientConn) processBindV2(req *channel.Message, ch channel.Chann
notifyEstablished: notifyEstablished,
}
terminator.terminatorId.Store(terminatorId)

log.Info("establishing terminator")
terminator.state.Store(TerminatorStatePendingEstablishment)

// need to remove session remove listener on close
terminator.onClose = self.listener.factory.stateManager.AddEdgeSessionRemovedListener(token, func(token string) {
terminator.close(true, "session ended")
})

self.sendStateConnectedReply(req, nil)
// If the session was recently removed, the call to AddEdgeSessionRemovedListener will have asynchronously closed
// the terminator
if self.listener.factory.stateManager.WasSessionRecentlyRemoved(token) {
log.Info("invalid session, not establishing terminator")
} else {
log.Info("establishing terminator")

self.listener.factory.hostedServices.EstablishTerminator(terminator)
if listenerId == "" {
// only removed dupes with a scan if we don't have an sdk provided key
self.listener.factory.hostedServices.cleanupDuplicates(terminator)
self.sendStateConnectedReply(req, nil)

self.listener.factory.hostedServices.EstablishTerminator(terminator)
if listenerId == "" {
// only removed dupes with a scan if we don't have an sdk provided key
self.listener.factory.hostedServices.cleanupDuplicates(terminator)
}
}
}

Expand All @@ -438,7 +445,7 @@ func (self *edgeClientConn) processUnbind(req *channel.Message, _ channel.Channe
if !atLeastOneTerminatorRemoved {
pfxlog.Logger().
WithField("connId", connId).
WithField("sessionToken", token).
WithField("token", token).
Info("no terminator found to unbind for token")
}
}
Expand All @@ -458,7 +465,7 @@ func (self *edgeClientConn) removeTerminator(ctrlCh channel.Channel, token, term
func (self *edgeClientConn) processUpdateBind(req *channel.Message, ch channel.Channel) {
token := string(req.Body)

log := pfxlog.ContextLogger(ch.Label()).WithField("sessionToken", token).WithFields(edge.GetLoggerFields(req))
log := pfxlog.ContextLogger(ch.Label()).WithField("token", token).WithFields(edge.GetLoggerFields(req))
terminators := self.listener.factory.hostedServices.getRelatedTerminators(token, self)

if len(terminators) == 0 {
Expand Down

0 comments on commit 26c6263

Please sign in to comment.