diff --git a/router/handler_link/ack.go b/router/handler_link/ack.go index e6957443f9..5de7d32e8e 100644 --- a/router/handler_link/ack.go +++ b/router/handler_link/ack.go @@ -17,43 +17,23 @@ package handler_link import ( - "github.com/ef-ds/deque" "github.com/michaelquigley/pfxlog" "github.com/openziti/channel/v2" "github.com/openziti/ziti/router/forwarder" "github.com/openziti/ziti/router/xgress" "github.com/openziti/ziti/router/xlink" - "sync/atomic" ) type ackHandler struct { - link xlink.Xlink - forwarder *forwarder.Forwarder - acks *deque.Deque - ackIngest chan *xgress.Acknowledgement - ackForward chan *xgress.Acknowledgement - acksQueueSize int64 - closeNotify <-chan struct{} + link xlink.Xlink + forwarder *forwarder.Forwarder } -func newAckHandler(link xlink.Xlink, forwarder *forwarder.Forwarder, closeNotify <-chan struct{}) *ackHandler { - result := &ackHandler{ - link: link, - forwarder: forwarder, - acks: deque.New(), - ackIngest: make(chan *xgress.Acknowledgement, 16), - ackForward: make(chan *xgress.Acknowledgement, 1), - closeNotify: closeNotify, +func newAckHandler(link xlink.Xlink, forwarder *forwarder.Forwarder) *ackHandler { + return &ackHandler{ + link: link, + forwarder: forwarder, } - - go result.ackIngester() - go result.ackForwarder() - - forwarder.MetricsRegistry().FuncGauge("xgress.acks.queue_size", func() int64 { - return atomic.LoadInt64(&result.acksQueueSize) - }) - - return result } func (self *ackHandler) ContentType() int32 { @@ -61,63 +41,19 @@ func (self *ackHandler) ContentType() int32 { } func (self *ackHandler) HandleReceive(msg *channel.Message, ch channel.Channel) { - if ack, err := xgress.UnmarshallAcknowledgement(msg); err == nil { - select { - case self.ackIngest <- ack: - case <-self.closeNotify: - } - } else { + ack, err := xgress.UnmarshallAcknowledgement(msg) + if err != nil { pfxlog.ContextLogger(ch.Label()). WithField("linkId", self.link.Id()). WithField("routerId", self.link.DestinationId()). - WithError(err). - Error("error unmarshalling ack") + WithError(err).Error("error unmarshalling ack", err) + return } -} - -func (self *ackHandler) ackIngester() { - var next *xgress.Acknowledgement - for { - if next == nil { - if val, _ := self.acks.PopFront(); val != nil { - next = val.(*xgress.Acknowledgement) - } - } - if next == nil { - select { - case ack := <-self.ackIngest: - self.acks.PushBack(ack) - case <-self.closeNotify: - return - } - } else { - select { - case ack := <-self.ackIngest: - self.acks.PushBack(ack) - case self.ackForward <- next: - next = nil - case <-self.closeNotify: - return - } - } - atomic.StoreInt64(&self.acksQueueSize, int64(self.acks.Len())) - } -} - -func (self *ackHandler) ackForwarder() { - logger := pfxlog.Logger() - for { - select { - case ack := <-self.ackForward: - if err := self.forwarder.ForwardAcknowledgement(xgress.Address(self.link.Id()), ack); err != nil { - logger.WithField("linkId", self.link.Id()). - WithField("routerId", self.link.DestinationId()). - WithError(err). - Debug("unable to forward acknowledgement") - } - case <-self.closeNotify: - return - } + if err = self.forwarder.ForwardAcknowledgement(xgress.Address(self.link.Id()), ack); err != nil { + pfxlog.ContextLogger(ch.Label()). + WithField("linkId", self.link.Id()). + WithField("routerId", self.link.DestinationId()). + WithError(err).Debug("unable to forward acknowledgement") } } diff --git a/router/handler_link/ack_test.go b/router/handler_link/ack_test.go deleted file mode 100644 index cb6fe58ee1..0000000000 --- a/router/handler_link/ack_test.go +++ /dev/null @@ -1,35 +0,0 @@ -/* - Copyright NetFoundry Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package handler_link - -import ( - "sync/atomic" - "testing" -) - -// A simple test to check for failure of alignment on atomic operations for 64 bit variables in a struct -func Test64BitAlignment(t *testing.T) { - defer func() { - if r := recover(); r != nil { - t.Errorf("One of the variables that was tested is not properly 64-bit aligned.") - } - }() - - ah := ackHandler{} - - atomic.LoadInt64(&ah.acksQueueSize) -} diff --git a/router/handler_link/bind.go b/router/handler_link/bind.go index c3dd7563d6..236a5dfa05 100644 --- a/router/handler_link/bind.go +++ b/router/handler_link/bind.go @@ -5,15 +5,15 @@ import ( "github.com/openziti/channel/v2" "github.com/openziti/channel/v2/latency" "github.com/openziti/channel/v2/protobufs" + "github.com/openziti/foundation/v2/concurrenz" + nfpem "github.com/openziti/foundation/v2/pem" + "github.com/openziti/metrics" "github.com/openziti/ziti/common/pb/ctrl_pb" "github.com/openziti/ziti/common/trace" "github.com/openziti/ziti/router/env" "github.com/openziti/ziti/router/forwarder" metrics2 "github.com/openziti/ziti/router/metrics" "github.com/openziti/ziti/router/xlink" - "github.com/openziti/foundation/v2/concurrenz" - nfpem "github.com/openziti/foundation/v2/pem" - "github.com/openziti/metrics" "github.com/pkg/errors" "github.com/sirupsen/logrus" "time" @@ -74,7 +74,7 @@ func (self *bindHandler) BindChannel(binding channel.Binding) error { binding.AddCloseHandler(newCloseHandler(self.xlink, self.ctrl, self.forwarder, closeNotify, self.xlinkRegistry)) binding.AddErrorHandler(newErrorHandler(self.xlink, self.ctrl)) binding.AddTypedReceiveHandler(newPayloadHandler(self.xlink, self.forwarder)) - binding.AddTypedReceiveHandler(newAckHandler(self.xlink, self.forwarder, closeNotify)) + binding.AddTypedReceiveHandler(newAckHandler(self.xlink, self.forwarder)) binding.AddTypedReceiveHandler(&latency.LatencyHandler{}) binding.AddTypedReceiveHandler(newControlHandler(self.xlink, self.forwarder)) binding.AddPeekHandler(metrics2.NewChannelPeekHandler(self.xlink.Id(), self.forwarder.MetricsRegistry())) diff --git a/router/xgress/link_send_buffer.go b/router/xgress/link_send_buffer.go index 75052bfb3a..9f7c751ab5 100644 --- a/router/xgress/link_send_buffer.go +++ b/router/xgress/link_send_buffer.go @@ -18,8 +18,8 @@ package xgress import ( "github.com/michaelquigley/pfxlog" - "github.com/openziti/ziti/common/inspect" "github.com/openziti/foundation/v2/info" + "github.com/openziti/ziti/common/inspect" "github.com/pkg/errors" "github.com/sirupsen/logrus" "math" @@ -105,7 +105,7 @@ func NewLinkSendBuffer(x *Xgress) *LinkSendBuffer { x: x, buffer: make(map[int32]*txPayload), newlyBuffered: make(chan *txPayload, x.Options.TxQueueSize), - newlyReceivedAcks: make(chan *Acknowledgement), + newlyReceivedAcks: make(chan *Acknowledgement, 2), closeNotify: make(chan struct{}), windowsSize: x.Options.TxPortalStartSize, retxThreshold: x.Options.RetxStartMs,