Skip to content

Commit

Permalink
Merge pull request #1587 from openziti/fix-early-ack-stall
Browse files Browse the repository at this point in the history
Fix stall caused by ack arriving before payload is received by link send buffer. Fixes #1586
  • Loading branch information
plorenz authored Dec 14, 2023
2 parents beb8daa + d1cc18c commit 6dbcde0
Show file tree
Hide file tree
Showing 14 changed files with 269 additions and 50 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
# Release 0.31.4

## What's New

* Bug fix for a data flow stall that is especially likely to occur on circuits with single-router paths

## Thanks

* @marvkis - for providing high-quality debug data, which made tracking down a couple of flow control stall issues much easier

## Component Updates and Bug Fixes

* github.com/openziti/metrics: [v1.2.40 -> v1.2.41](https://github.com/openziti/metrics/compare/v1.2.40...v1.2.41)
* github.com/openziti/storage: [v0.2.26 -> v0.2.27](https://github.com/openziti/storage/compare/v0.2.26...v0.2.27)
* github.com/openziti/ziti: [v0.31.3 -> v0.31.4](https://github.com/openziti/ziti/compare/v0.31.3...v0.31.4)
* [Issue #1586](https://github.com/openziti/ziti/issues/1586) - If ack is received before payload is processed by link send buffer, a stall can result

# Release 0.31.3

## What's New
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ require (
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/google/go-cmp v0.6.0
github.com/google/gopacket v1.1.19
github.com/google/uuid v1.4.0
github.com/google/uuid v1.5.0
github.com/gorilla/handlers v1.5.2
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
github.com/hashicorp/go-hclog v1.6.1
github.com/hashicorp/go-hclog v1.6.2
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/hashicorp/raft v1.6.0
github.com/hashicorp/raft-boltdb v0.0.0-20220329195025-15018e9b97e0
Expand All @@ -52,11 +52,11 @@ require (
github.com/openziti/foundation/v2 v2.0.35
github.com/openziti/identity v1.0.68
github.com/openziti/jwks v1.0.3
github.com/openziti/metrics v1.2.40
github.com/openziti/metrics v1.2.41
github.com/openziti/runzmd v1.0.36
github.com/openziti/sdk-golang v0.21.2
github.com/openziti/secretstream v0.1.14
github.com/openziti/storage v0.2.26
github.com/openziti/storage v0.2.27
github.com/openziti/transport/v2 v2.0.119
github.com/openziti/x509-claims v1.0.3
github.com/openziti/xweb/v2 v2.1.0
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,8 @@ github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLe
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY=
github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
Expand Down Expand Up @@ -367,8 +367,8 @@ github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brv
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-hclog v1.6.1 h1:pa92nu9bPoAqI7p+uPDCIWGAibUdlCi6TYWJEQQkLf8=
github.com/hashicorp/go-hclog v1.6.1/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-hclog v1.6.2 h1:NOtoftovWkDheyUM/8JW3QMiXyxJK3uHRK7wV04nD2I=
github.com/hashicorp/go-hclog v1.6.2/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc=
github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
Expand Down Expand Up @@ -592,16 +592,16 @@ github.com/openziti/identity v1.0.68 h1:SaFr7BeFQBoWQDiT28vUb8D9w7v6lIAK6/9RkwmV
github.com/openziti/identity v1.0.68/go.mod h1:HbOu3TQ032v8xE6xZWjO51azF4fUxRLjO/l/oGqJwUI=
github.com/openziti/jwks v1.0.3 h1:hf8wkb+Cg4nH/HM0KROFd7u+C3DkRVcFZJ7tDV+4icc=
github.com/openziti/jwks v1.0.3/go.mod h1:t4xxq8vlXGsPn29kiQVnZBBDDnEoOFqtJoHibkJunQQ=
github.com/openziti/metrics v1.2.40 h1:gySRgR8prCPqaEjmUtX0eXFs7NkI9uPAzp+z6A8+JqA=
github.com/openziti/metrics v1.2.40/go.mod h1:HXdVryf3xpZfnY4VcaOjMxiBv+qw0wJlEJNLbooB9hY=
github.com/openziti/metrics v1.2.41 h1:JShcFb6qJPA2cMiWQLtcSXiJjsrhEWpH+aVcjT/Mcbs=
github.com/openziti/metrics v1.2.41/go.mod h1:L9h0NrliMA3+p7+ascKgvx28qoKHymN9l+CMA+Q+sZc=
github.com/openziti/runzmd v1.0.36 h1:HOqTZFzTTFu52qmCAQfFvKDmCSl8ZqP1PQQ0UnJIA4E=
github.com/openziti/runzmd v1.0.36/go.mod h1:jYqummjskmFh63htJFF2SrUuvxNQifqd5REUhYVaY/A=
github.com/openziti/sdk-golang v0.21.2 h1:P66cslOAmQX37VFan+df+MoD2PqaFjHWDNMpuhhXHSo=
github.com/openziti/sdk-golang v0.21.2/go.mod h1:mepEUD39DsBm/v1WVLedYRoYCFdet5mmJ5Sxqm/zkFI=
github.com/openziti/secretstream v0.1.14 h1:Ta+nB5Prcct+L5LIKUA1nE56QhWS6lMPQYTlpxUltU0=
github.com/openziti/secretstream v0.1.14/go.mod h1:/hhuLfu+GIv0+cnapfsu/VOnXEvmTt3GKtCu+lQ0RIw=
github.com/openziti/storage v0.2.26 h1:15EbOC6A//dsdLSs/RYJP6Qn3Rj6Od4btXEWGezatxc=
github.com/openziti/storage v0.2.26/go.mod h1:pDCkPIN7h9L+FyJP+hcfOwk+GofzrrNoYsAThOjNza0=
github.com/openziti/storage v0.2.27 h1:WdFD0KxXZxSoWOaojdi5r1LO0BTvn4x/7wwdwhRPssc=
github.com/openziti/storage v0.2.27/go.mod h1:p/04So5E3NT2jsGew0AS4hYTfP5srti3VytOjlVcx0M=
github.com/openziti/transport/v2 v2.0.119 h1:KOgHU+9EZUVPvv8ncifqHmNEcFUHbJHigo3jyPvWnOc=
github.com/openziti/transport/v2 v2.0.119/go.mod h1:H2IIBP6ed9isE/eJHGXtAZL0d73ApYOpLG9sSvutNNI=
github.com/openziti/x509-claims v1.0.3 h1:HNdQ8Nf1agB3lBs1gahcO6zfkeS4S5xoQ2/PkY4HRX0=
Expand Down
9 changes: 7 additions & 2 deletions router/forwarder/faulter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type Faulter struct {
closeNotify chan struct{}
}

// FaultReceiver is the fault-reporting dependency held by the Forwarder.
// *Faulter provides both methods; declaring the dependency as an interface
// lets NewForwarder accept any implementation rather than only *Faulter.
type FaultReceiver interface {
// Report records a forwarding fault for the given circuit, associated with
// the given controller id.
Report(circuitId string, ctrlId string)
// NotifyInvalidLink reports that the given link id is invalid. The Forwarder
// calls this when a route names a link destination it does not have.
NotifyInvalidLink(linkId string)
}

func NewFaulter(ctrls env.NetworkControllers, interval time.Duration, closeNotify chan struct{}) *Faulter {
f := &Faulter{
ctrls: ctrls,
Expand All @@ -50,13 +55,13 @@ func NewFaulter(ctrls env.NetworkControllers, interval time.Duration, closeNotif
return f
}

func (self *Faulter) report(circuitId string, ctrlId string) {
func (self *Faulter) Report(circuitId string, ctrlId string) {
if self.interval > 0 {
self.circuitIds.Set(circuitId, ctrlId)
}
}

func (self *Faulter) notifyInvalidLink(linkId string) {
func (self *Faulter) NotifyInvalidLink(linkId string) {
log := pfxlog.Logger()
self.ctrls.ForEach(func(ctrlId string, ch channel.Channel) {
fault := &ctrl_pb.Fault{Subject: ctrl_pb.FaultSubject_LinkFault, Id: linkId}
Expand Down
23 changes: 16 additions & 7 deletions router/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/openziti/ziti/common/inspect"
"github.com/openziti/ziti/common/pb/ctrl_pb"
"github.com/openziti/ziti/common/trace"
"github.com/openziti/ziti/router/env"
"github.com/openziti/ziti/router/xgress"
"github.com/openziti/ziti/router/xlink"
"github.com/pkg/errors"
Expand All @@ -34,8 +35,7 @@ import (
type Forwarder struct {
circuits *circuitTable
destinations *destinationTable
faulter *Faulter
scanner *Scanner
faulter FaultReceiver
metricsRegistry metrics.UsageRegistry
traceController trace.Controller
Options *Options
Expand All @@ -58,21 +58,30 @@ type XgressDestination interface {
GetTimeOfLastRxFromLink() int64
}

func NewForwarder(metricsRegistry metrics.UsageRegistry, faulter *Faulter, scanner *Scanner, options *Options, closeNotify <-chan struct{}) *Forwarder {
func NewForwarder(metricsRegistry metrics.UsageRegistry, faulter FaultReceiver, options *Options, closeNotify <-chan struct{}) *Forwarder {
f := &Forwarder{
circuits: newCircuitTable(),
destinations: newDestinationTable(),
faulter: faulter,
scanner: scanner,
metricsRegistry: metricsRegistry,
traceController: trace.NewController(closeNotify),
Options: options,
CloseNotify: closeNotify,
}
f.scanner.setCircuitTable(f.circuits)
return f
}

// StartScanner creates a scanner from the forwarder's options and close-notify
// channel, attaches the forwarder's circuit table to it, and starts the
// scanner's run method on a new goroutine. If the scanner's interval is not
// positive, nothing is started and a warning is logged instead.
func (forwarder *Forwarder) StartScanner(ctrls env.NetworkControllers) {
	s := newScanner(ctrls, forwarder.Options, forwarder.CloseNotify)
	s.setCircuitTable(forwarder.circuits)

	// A non-positive interval means scanning is turned off.
	if s.interval <= 0 {
		logrus.Warnf("scanner disabled")
		return
	}

	go s.run()
}

func (forwarder *Forwarder) MetricsRegistry() metrics.UsageRegistry {
return forwarder.metricsRegistry
}
Expand Down Expand Up @@ -129,7 +138,7 @@ func (forwarder *Forwarder) Route(ctrlId string, route *ctrl_pb.Route) error {
for _, forward := range route.Forwards {
if !forwarder.HasDestination(xgress.Address(forward.DstAddress)) {
if forward.DstType == ctrl_pb.DestType_Link {
forwarder.faulter.notifyInvalidLink(forward.DstAddress)
forwarder.faulter.NotifyInvalidLink(forward.DstAddress)
return errors.Errorf("invalid link destination %v", forward.DstAddress)
}
if forward.DstType == ctrl_pb.DestType_End {
Expand Down Expand Up @@ -258,7 +267,7 @@ func (forwarder *Forwarder) ForwardControl(srcAddr xgress.Address, control *xgre

func (forwarder *Forwarder) ReportForwardingFault(circuitId string, ctrlId string) {
if forwarder.faulter != nil {
forwarder.faulter.report(circuitId, ctrlId)
forwarder.faulter.Report(circuitId, ctrlId)
} else {
logrus.Error("nil faulter, cannot accept forwarding fault report")
}
Expand Down
7 changes: 1 addition & 6 deletions router/forwarder/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,13 @@ type Scanner struct {
closeNotify <-chan struct{}
}

func NewScanner(ctrls env.NetworkControllers, options *Options, closeNotify <-chan struct{}) *Scanner {
func newScanner(ctrls env.NetworkControllers, options *Options, closeNotify <-chan struct{}) *Scanner {
s := &Scanner{
ctrls: ctrls,
interval: options.IdleTxInterval,
timeout: options.IdleCircuitTimeout,
closeNotify: closeNotify,
}
if s.interval > 0 {
go s.run()
} else {
logrus.Warnf("scanner disabled")
}
return s
}

Expand Down
5 changes: 2 additions & 3 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ type Router struct {
ctrls env.NetworkControllers
ctrlBindhandler channel.BindHandler
faulter *forwarder.Faulter
scanner *forwarder.Scanner
forwarder *forwarder.Forwarder
xrctrls []env.Xrctrl
xlinkFactories map[string]xlink.Factory
Expand Down Expand Up @@ -196,8 +195,8 @@ func Create(config *Config, versionProvider versions.VersionProvider) *Router {
router.ctrls = env.NewNetworkControllers(config.Ctrl.DefaultRequestTimeout, router.connectToController, &config.Ctrl.Heartbeats)
router.xlinkRegistry = link.NewLinkRegistry(router)
router.faulter = forwarder.NewFaulter(router.ctrls, config.Forwarder.FaultTxInterval, closeNotify)
router.scanner = forwarder.NewScanner(router.ctrls, config.Forwarder, closeNotify)
router.forwarder = forwarder.NewForwarder(metricsRegistry, router.faulter, router.scanner, config.Forwarder, closeNotify)
router.forwarder = forwarder.NewForwarder(metricsRegistry, router.faulter, config.Forwarder, closeNotify)
router.forwarder.StartScanner(router.ctrls)

xgress.InitPayloadIngester(closeNotify)
xgress.InitAcker(router.forwarder, metricsRegistry, closeNotify)
Expand Down
Loading

0 comments on commit 6dbcde0

Please sign in to comment.