Skip to content

Commit

Permalink
Simplify link ack handling. Fixes #1519
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Nov 17, 2023
1 parent 2571929 commit 895f893
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 120 deletions.
94 changes: 15 additions & 79 deletions router/handler_link/ack.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,107 +17,43 @@
package handler_link

import (
"github.com/ef-ds/deque"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2"
"github.com/openziti/ziti/router/forwarder"
"github.com/openziti/ziti/router/xgress"
"github.com/openziti/ziti/router/xlink"
"sync/atomic"
)

type ackHandler struct {
link xlink.Xlink
forwarder *forwarder.Forwarder
acks *deque.Deque
ackIngest chan *xgress.Acknowledgement
ackForward chan *xgress.Acknowledgement
acksQueueSize int64
closeNotify <-chan struct{}
link xlink.Xlink
forwarder *forwarder.Forwarder
}

func newAckHandler(link xlink.Xlink, forwarder *forwarder.Forwarder, closeNotify <-chan struct{}) *ackHandler {
result := &ackHandler{
link: link,
forwarder: forwarder,
acks: deque.New(),
ackIngest: make(chan *xgress.Acknowledgement, 16),
ackForward: make(chan *xgress.Acknowledgement, 1),
closeNotify: closeNotify,
func newAckHandler(link xlink.Xlink, forwarder *forwarder.Forwarder) *ackHandler {
return &ackHandler{
link: link,
forwarder: forwarder,
}

go result.ackIngester()
go result.ackForwarder()

forwarder.MetricsRegistry().FuncGauge("xgress.acks.queue_size", func() int64 {
return atomic.LoadInt64(&result.acksQueueSize)
})

return result
}

func (self *ackHandler) ContentType() int32 {
return xgress.ContentTypeAcknowledgementType
}

func (self *ackHandler) HandleReceive(msg *channel.Message, ch channel.Channel) {
if ack, err := xgress.UnmarshallAcknowledgement(msg); err == nil {
select {
case self.ackIngest <- ack:
case <-self.closeNotify:
}
} else {
ack, err := xgress.UnmarshallAcknowledgement(msg)
if err != nil {
pfxlog.ContextLogger(ch.Label()).
WithField("linkId", self.link.Id()).
WithField("routerId", self.link.DestinationId()).
WithError(err).
Error("error unmarshalling ack")
WithError(err).Error("error unmarshalling ack", err)
return
}
}

func (self *ackHandler) ackIngester() {
var next *xgress.Acknowledgement
for {
if next == nil {
if val, _ := self.acks.PopFront(); val != nil {
next = val.(*xgress.Acknowledgement)
}
}

if next == nil {
select {
case ack := <-self.ackIngest:
self.acks.PushBack(ack)
case <-self.closeNotify:
return
}
} else {
select {
case ack := <-self.ackIngest:
self.acks.PushBack(ack)
case self.ackForward <- next:
next = nil
case <-self.closeNotify:
return
}
}
atomic.StoreInt64(&self.acksQueueSize, int64(self.acks.Len()))
}
}

func (self *ackHandler) ackForwarder() {
logger := pfxlog.Logger()
for {
select {
case ack := <-self.ackForward:
if err := self.forwarder.ForwardAcknowledgement(xgress.Address(self.link.Id()), ack); err != nil {
logger.WithField("linkId", self.link.Id()).
WithField("routerId", self.link.DestinationId()).
WithError(err).
Debug("unable to forward acknowledgement")
}
case <-self.closeNotify:
return
}
if err = self.forwarder.ForwardAcknowledgement(xgress.Address(self.link.Id()), ack); err != nil {
pfxlog.ContextLogger(ch.Label()).
WithField("linkId", self.link.Id()).
WithField("routerId", self.link.DestinationId()).
WithError(err).Debug("unable to forward acknowledgement")
}
}
35 changes: 0 additions & 35 deletions router/handler_link/ack_test.go

This file was deleted.

8 changes: 4 additions & 4 deletions router/handler_link/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"github.com/openziti/channel/v2"
"github.com/openziti/channel/v2/latency"
"github.com/openziti/channel/v2/protobufs"
"github.com/openziti/foundation/v2/concurrenz"
nfpem "github.com/openziti/foundation/v2/pem"
"github.com/openziti/metrics"
"github.com/openziti/ziti/common/pb/ctrl_pb"
"github.com/openziti/ziti/common/trace"
"github.com/openziti/ziti/router/env"
"github.com/openziti/ziti/router/forwarder"
metrics2 "github.com/openziti/ziti/router/metrics"
"github.com/openziti/ziti/router/xlink"
"github.com/openziti/foundation/v2/concurrenz"
nfpem "github.com/openziti/foundation/v2/pem"
"github.com/openziti/metrics"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"time"
Expand Down Expand Up @@ -74,7 +74,7 @@ func (self *bindHandler) BindChannel(binding channel.Binding) error {
binding.AddCloseHandler(newCloseHandler(self.xlink, self.ctrl, self.forwarder, closeNotify, self.xlinkRegistry))
binding.AddErrorHandler(newErrorHandler(self.xlink, self.ctrl))
binding.AddTypedReceiveHandler(newPayloadHandler(self.xlink, self.forwarder))
binding.AddTypedReceiveHandler(newAckHandler(self.xlink, self.forwarder, closeNotify))
binding.AddTypedReceiveHandler(newAckHandler(self.xlink, self.forwarder))
binding.AddTypedReceiveHandler(&latency.LatencyHandler{})
binding.AddTypedReceiveHandler(newControlHandler(self.xlink, self.forwarder))
binding.AddPeekHandler(metrics2.NewChannelPeekHandler(self.xlink.Id(), self.forwarder.MetricsRegistry()))
Expand Down
4 changes: 2 additions & 2 deletions router/xgress/link_send_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package xgress

import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/ziti/common/inspect"
"github.com/openziti/foundation/v2/info"
"github.com/openziti/ziti/common/inspect"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"math"
Expand Down Expand Up @@ -105,7 +105,7 @@ func NewLinkSendBuffer(x *Xgress) *LinkSendBuffer {
x: x,
buffer: make(map[int32]*txPayload),
newlyBuffered: make(chan *txPayload, x.Options.TxQueueSize),
newlyReceivedAcks: make(chan *Acknowledgement),
newlyReceivedAcks: make(chan *Acknowledgement, 2),
closeNotify: make(chan struct{}),
windowsSize: x.Options.TxPortalStartSize,
retxThreshold: x.Options.RetxStartMs,
Expand Down

0 comments on commit 895f893

Please sign in to comment.