From 05db8673c59e34ed1ce16607039040aba83b22f5 Mon Sep 17 00:00:00 2001 From: Paul Lorenz Date: Fri, 17 Nov 2023 14:24:37 -0500 Subject: [PATCH] Simplify link ack handling. Fixes #1519 --- router/handler_link/ack.go | 93 +++++-------------------------- router/handler_link/bind.go | 8 +-- router/xgress/link_send_buffer.go | 4 +- 3 files changed, 21 insertions(+), 84 deletions(-) diff --git a/router/handler_link/ack.go b/router/handler_link/ack.go index e6957443f9..225c4402e8 100644 --- a/router/handler_link/ack.go +++ b/router/handler_link/ack.go @@ -17,43 +17,23 @@ package handler_link import ( - "github.com/ef-ds/deque" "github.com/michaelquigley/pfxlog" "github.com/openziti/channel/v2" "github.com/openziti/ziti/router/forwarder" "github.com/openziti/ziti/router/xgress" "github.com/openziti/ziti/router/xlink" - "sync/atomic" ) type ackHandler struct { - link xlink.Xlink - forwarder *forwarder.Forwarder - acks *deque.Deque - ackIngest chan *xgress.Acknowledgement - ackForward chan *xgress.Acknowledgement - acksQueueSize int64 - closeNotify <-chan struct{} + link xlink.Xlink + forwarder *forwarder.Forwarder } -func newAckHandler(link xlink.Xlink, forwarder *forwarder.Forwarder, closeNotify <-chan struct{}) *ackHandler { - result := &ackHandler{ - link: link, - forwarder: forwarder, - acks: deque.New(), - ackIngest: make(chan *xgress.Acknowledgement, 16), - ackForward: make(chan *xgress.Acknowledgement, 1), - closeNotify: closeNotify, +func newAckHandler(link xlink.Xlink, forwarder *forwarder.Forwarder) *ackHandler { + return &ackHandler{ + link: link, + forwarder: forwarder, } - - go result.ackIngester() - go result.ackForwarder() - - forwarder.MetricsRegistry().FuncGauge("xgress.acks.queue_size", func() int64 { - return atomic.LoadInt64(&result.acksQueueSize) - }) - - return result } func (self *ackHandler) ContentType() int32 { @@ -61,63 +41,20 @@ func (self *ackHandler) ContentType() int32 { } func (self *ackHandler) HandleReceive(msg *channel.Message, ch channel.Channel) { - if ack, err := xgress.UnmarshallAcknowledgement(msg); err == nil { - select { - case self.ackIngest <- ack: - case <-self.closeNotify: - } - } else { + ack, err := xgress.UnmarshallAcknowledgement(msg) + if err == nil { pfxlog.ContextLogger(ch.Label()). WithField("linkId", self.link.Id()). WithField("routerId", self.link.DestinationId()). - WithError(err). - Error("error unmarshalling ack") - } -} - -func (self *ackHandler) ackIngester() { - var next *xgress.Acknowledgement - for { - if next == nil { - if val, _ := self.acks.PopFront(); val != nil { - next = val.(*xgress.Acknowledgement) - } - } - - if next == nil { - select { - case ack := <-self.ackIngest: - self.acks.PushBack(ack) - case <-self.closeNotify: - return - } - } else { - select { - case ack := <-self.ackIngest: - self.acks.PushBack(ack) - case self.ackForward <- next: - next = nil - case <-self.closeNotify: - return - } - } - atomic.StoreInt64(&self.acksQueueSize, int64(self.acks.Len())) + WithError(err).Error("error unmarshalling ack", err) + return } -} -func (self *ackHandler) ackForwarder() { - logger := pfxlog.Logger() - for { - select { - case ack := <-self.ackForward: - if err := self.forwarder.ForwardAcknowledgement(xgress.Address(self.link.Id()), ack); err != nil { - logger.WithField("linkId", self.link.Id()). - WithField("routerId", self.link.DestinationId()). - WithError(err). - Debug("unable to forward acknowledgement") - } - case <-self.closeNotify: - return + if err = self.forwarder.ForwardAcknowledgement(xgress.Address(self.link.Id()), ack); err != nil { + pfxlog.ContextLogger(ch.Label()). + WithField("linkId", self.link.Id()). + WithField("routerId", self.link.DestinationId()). + WithError(err).Debug("unable to forward acknowledgement") } } } diff --git a/router/handler_link/bind.go b/router/handler_link/bind.go index c3dd7563d6..236a5dfa05 100644 --- a/router/handler_link/bind.go +++ b/router/handler_link/bind.go @@ -5,15 +5,15 @@ import ( "github.com/openziti/channel/v2" "github.com/openziti/channel/v2/latency" "github.com/openziti/channel/v2/protobufs" + "github.com/openziti/foundation/v2/concurrenz" + nfpem "github.com/openziti/foundation/v2/pem" + "github.com/openziti/metrics" "github.com/openziti/ziti/common/pb/ctrl_pb" "github.com/openziti/ziti/common/trace" "github.com/openziti/ziti/router/env" "github.com/openziti/ziti/router/forwarder" metrics2 "github.com/openziti/ziti/router/metrics" "github.com/openziti/ziti/router/xlink" - "github.com/openziti/foundation/v2/concurrenz" - nfpem "github.com/openziti/foundation/v2/pem" - "github.com/openziti/metrics" "github.com/pkg/errors" "github.com/sirupsen/logrus" "time" @@ -74,7 +74,7 @@ func (self *bindHandler) BindChannel(binding channel.Binding) error { binding.AddCloseHandler(newCloseHandler(self.xlink, self.ctrl, self.forwarder, closeNotify, self.xlinkRegistry)) binding.AddErrorHandler(newErrorHandler(self.xlink, self.ctrl)) binding.AddTypedReceiveHandler(newPayloadHandler(self.xlink, self.forwarder)) - binding.AddTypedReceiveHandler(newAckHandler(self.xlink, self.forwarder, closeNotify)) + binding.AddTypedReceiveHandler(newAckHandler(self.xlink, self.forwarder)) binding.AddTypedReceiveHandler(&latency.LatencyHandler{}) binding.AddTypedReceiveHandler(newControlHandler(self.xlink, self.forwarder)) binding.AddPeekHandler(metrics2.NewChannelPeekHandler(self.xlink.Id(), self.forwarder.MetricsRegistry())) diff --git a/router/xgress/link_send_buffer.go b/router/xgress/link_send_buffer.go index 75052bfb3a..9f7c751ab5 100644 --- a/router/xgress/link_send_buffer.go +++ b/router/xgress/link_send_buffer.go @@ -18,8 +18,8 @@ package xgress import ( "github.com/michaelquigley/pfxlog" - "github.com/openziti/ziti/common/inspect" "github.com/openziti/foundation/v2/info" + "github.com/openziti/ziti/common/inspect" "github.com/pkg/errors" "github.com/sirupsen/logrus" "math" @@ -105,7 +105,7 @@ func NewLinkSendBuffer(x *Xgress) *LinkSendBuffer { x: x, buffer: make(map[int32]*txPayload), newlyBuffered: make(chan *txPayload, x.Options.TxQueueSize), - newlyReceivedAcks: make(chan *Acknowledgement), + newlyReceivedAcks: make(chan *Acknowledgement, 2), closeNotify: make(chan struct{}), windowsSize: x.Options.TxPortalStartSize, retxThreshold: x.Options.RetxStartMs,