Skip to content

Commit

Permalink
Re-enable xgress MTU, with 0 disabling chunking. Implement frame when…
Browse files Browse the repository at this point in the history
… chunked. Fixes openziti#2336
  • Loading branch information
plorenz committed Aug 21, 2024
1 parent a37d858 commit f731405
Show file tree
Hide file tree
Showing 23 changed files with 877 additions and 65 deletions.
2 changes: 1 addition & 1 deletion controller/sync_strats/sync_instant.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func (strategy *InstantStrategy) ReceiveClientHello(routerId string, msg *channe
for _, listener := range respHello.Listeners {
protocols[listener.Advertise.Protocol] = fmt.Sprintf("%s://%s:%d", listener.Advertise.Protocol, listener.Advertise.Hostname, listener.Advertise.Port)
}
} else {
} else if respHello.Hostname != "" {
for idx, protocol := range respHello.Protocols {
if len(respHello.ProtocolPorts) > idx {
port := respHello.ProtocolPorts[idx]
Expand Down
9 changes: 6 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/openziti/agent v1.0.17
github.com/openziti/channel/v2 v2.0.137
github.com/openziti/channel/v2 v2.0.141
github.com/openziti/edge-api v0.26.23
github.com/openziti/foundation/v2 v2.0.48
github.com/openziti/identity v1.0.82
github.com/openziti/identity v1.0.84
github.com/openziti/jwks v1.0.3
github.com/openziti/metrics v1.2.57
github.com/openziti/runzmd v1.0.50
github.com/openziti/sdk-golang v0.23.40
github.com/openziti/secretstream v0.1.21
github.com/openziti/storage v0.3.0
github.com/openziti/transport/v2 v2.0.139
github.com/openziti/transport/v2 v2.0.143
github.com/openziti/x509-claims v1.0.3
github.com/openziti/xweb/v2 v2.1.1
github.com/openziti/ziti-db-explorer v1.1.3
Expand Down Expand Up @@ -154,6 +154,9 @@ require (
github.com/openziti/dilithium v0.3.3 // indirect
github.com/parallaxsecond/parsec-client-go v0.0.0-20221025095442-f0a77d263cf9 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pion/dtls/v3 v3.0.1 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/transport/v3 v3.0.7 // indirect
github.com/pkg/term v1.2.0-beta.2 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
Expand Down
18 changes: 12 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -566,16 +566,16 @@ github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b/go.mo
github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
github.com/openziti/agent v1.0.17 h1:CNBVWl8m4CWXz/pmdWjEhg1rvtUGinQNRAr3vgF90go=
github.com/openziti/agent v1.0.17/go.mod h1:GJVKVikwmvZ0U+hNP7Zi2P+xd/wTb6VZ9wz24/2WQ+U=
github.com/openziti/channel/v2 v2.0.137 h1:Zedls96/wjYFqGIXdfy6tFdcf+ESSdk2NkD3J4KleD0=
github.com/openziti/channel/v2 v2.0.137/go.mod h1:VHs5ZmkIV32/YOsNwTTFG6oI6G2PeJDxtfmiWrzbI24=
github.com/openziti/channel/v2 v2.0.141 h1:ikow9j2fKlOlGRabrBcYZt8DuTtIIuiwJZBbmzsPkyU=
github.com/openziti/channel/v2 v2.0.141/go.mod h1:/wqA1X0n+gkQvijmOXBadWCLBJ1AxaX5z0z09TWp7ec=
github.com/openziti/dilithium v0.3.3 h1:PLgQ6PMNLSTzCFbX/h98cmudgz/cU6TmjdSv5NAPD8k=
github.com/openziti/dilithium v0.3.3/go.mod h1:vsCjI2AU/hon9e+dLhUFbCNGesJDj2ASgkySOcpmvjo=
github.com/openziti/edge-api v0.26.23 h1:tXXGc/ACg7bQys+8K8ElS49A8VWfBi8Y6nGV53NqE9U=
github.com/openziti/edge-api v0.26.23/go.mod h1:t0qfgV5u2+HItpvgDIShA69v6m7RZ+PrbQuLQaDDdx8=
github.com/openziti/foundation/v2 v2.0.48 h1:G0/P8XQS+xTAS3KYQ/PHjLFHLABZkLJeNDbPgPTaxU0=
github.com/openziti/foundation/v2 v2.0.48/go.mod h1:pj5nPmVtAdF1QX+aRtALw69hxcsAzrWDpBUe+Zrc73Q=
github.com/openziti/identity v1.0.82 h1:of53LaTP1hu2zQA40oCI5qcpmvl6+vyVqRYjy0ml3F4=
github.com/openziti/identity v1.0.82/go.mod h1:EVkwgH4r/uQ/GUUFjIZN6WBiGcQHUhfiv6o+WUAtuDk=
github.com/openziti/identity v1.0.84 h1:AxNS/KZb50rzJkZ+1IF1aoollObrelZBUoMlWtpw2Y4=
github.com/openziti/identity v1.0.84/go.mod h1:EVkwgH4r/uQ/GUUFjIZN6WBiGcQHUhfiv6o+WUAtuDk=
github.com/openziti/jwks v1.0.3 h1:hf8wkb+Cg4nH/HM0KROFd7u+C3DkRVcFZJ7tDV+4icc=
github.com/openziti/jwks v1.0.3/go.mod h1:t4xxq8vlXGsPn29kiQVnZBBDDnEoOFqtJoHibkJunQQ=
github.com/openziti/metrics v1.2.57 h1:CIhz2MILuUauQF3gYHVj0hsHZXJcWu7yHA0Y1aT6BCU=
Expand All @@ -588,8 +588,8 @@ github.com/openziti/secretstream v0.1.21 h1:r4xN8/CzSEvxZFFYGSztrlhMtIvk3B+SQcq2
github.com/openziti/secretstream v0.1.21/go.mod h1:1lfAnS8gBHsKZiPbRRK1sularbAsqizN6tWUEuZSfo0=
github.com/openziti/storage v0.3.0 h1:DH2SN8GYy7rSlBZM9X5W1Dv2b2qZ8kSKyt0iivokVMw=
github.com/openziti/storage v0.3.0/go.mod h1:1f6cGRKYLzwst5hwVY+qr8GCcUeO/U5jJftE8+qFqbk=
github.com/openziti/transport/v2 v2.0.139 h1:5uN80RLSmTpleOnjMu+39nZ9iIuZSvcvl6R1PQ7gyBg=
github.com/openziti/transport/v2 v2.0.139/go.mod h1:JIJUD59R+rLcf8sauTYXCVnJAGM9UCHCRGrUowe91j4=
github.com/openziti/transport/v2 v2.0.143 h1:qhqI/yEN4SvP8SBx7ERCt0x67Im+Icy/hGtJ7Dn/xOQ=
github.com/openziti/transport/v2 v2.0.143/go.mod h1:3BxxlWa8fbhmZG1CmIOpeEHlCCY1G7DPx7v7+bAXYEQ=
github.com/openziti/x509-claims v1.0.3 h1:HNdQ8Nf1agB3lBs1gahcO6zfkeS4S5xoQ2/PkY4HRX0=
github.com/openziti/x509-claims v1.0.3/go.mod h1:Z0WIpBm6c4ecrpRKrou6Gk2wrLWxJO/+tuUwKh8VewE=
github.com/openziti/xweb/v2 v2.1.1 h1:T6vbmG2189WWwq16wryM7RQEbT5wNARrVHNQs23jEPE=
Expand All @@ -607,6 +607,12 @@ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/9
github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pion/dtls/v3 v3.0.1 h1:0kmoaPYLAo0md/VemjcrAXQiSf8U+tuU3nDYVNpEKaw=
github.com/pion/dtls/v3 v3.0.1/go.mod h1:dfIXcFkKoujDQ+jtd8M6RgqKK3DuaUilm3YatAbGp5k=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1o0=
github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
2 changes: 1 addition & 1 deletion router/handler_edge_ctrl/hello.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewHelloHandler(stateManager state.Manager, listeners []*edge_ctrl_pb.Liste

//v0.26.3 and older used to check and ensure all advertise hostnames were the same which can't be done now
//with the ability to report multiple advertise protocols on different hostnames
hostname: listeners[0].Advertise.Hostname,
hostname: hostname,
supportedProtocols: supportedProtocols,
protocolPorts: protocolPorts,
}
Expand Down
9 changes: 7 additions & 2 deletions router/xgress/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
PayloadFlagCircuitEnd PayloadFlag = 1
PayloadFlagOriginator PayloadFlag = 2
PayloadFlagCircuitStart PayloadFlag = 4
PayloadFlagChunk PayloadFlag = 8
)

type Header struct {
Expand All @@ -81,8 +82,8 @@ func (header *Header) GetCircuitId() string {
return header.CircuitId
}

func (header *Header) GetFlags() string {
return header.CircuitId
func (header *Header) GetFlags() uint32 {
return header.Flags
}

func (header *Header) GetOriginator() Originator {
Expand Down Expand Up @@ -257,6 +258,10 @@ func isPayloadFlagSet(flags uint32, flag PayloadFlag) bool {
return PayloadFlag(flags)&flag == flag
}

func setPayloadFlag(flags uint32, flag PayloadFlag) uint32 {
return uint32(PayloadFlag(flags) | flag)
}

func (payload *Payload) IsCircuitEndFlagSet() bool {
return isPayloadFlagSet(payload.Flags, PayloadFlagCircuitEnd)
}
Expand Down
2 changes: 1 addition & 1 deletion router/xgress/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func LoadOptions(data OptionsData) (*Options, error) {

func DefaultOptions() *Options {
return &Options{
Mtu: 64 * 1024,
Mtu: 0,
RandomDrops: false,
Drop1InN: 100,
TxQueueSize: 1,
Expand Down
187 changes: 154 additions & 33 deletions router/xgress/xgress.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package xgress
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"math/rand"
Expand Down Expand Up @@ -450,23 +451,28 @@ func (self *Xgress) tx() {
}
}()

var payload *Payload
clearPayloadFromSendBuffer := func(payload *Payload) {
payloadSize := len(payload.Data)
size := atomic.AddUint32(&self.linkRxBuffer.size, ^uint32(payloadSize-1)) // subtraction for uint32

for {
payload = self.nextPayload()
payloadLogger := log.WithFields(payload.GetLoggerFields())
payloadLogger.Debugf("payload %v of size %v removed from rx buffer, new size: %v", payload.Sequence, payloadSize, size)

if payload == nil {
log.Debug("nil payload received, exiting")
return
lastBufferSizeSent := self.linkRxBuffer.getLastBufferSizeSent()
if lastBufferSizeSent > 10000 && (lastBufferSizeSent>>1) > size {
self.SendEmptyAck()
}
}

sendPayload := func(payload *Payload) bool {
payloadLogger := log.WithFields(payload.GetLoggerFields())

if payload.IsCircuitEndFlagSet() {
self.markCircuitEndReceived()
log.Debug("circuit end payload received, exiting")
return
payloadLogger.Debug("circuit end payload received, exiting")
return false
}

payloadLogger := log.WithFields(payload.GetLoggerFields())
payloadLogger.Debug("sending")

for _, peekHandler := range self.peekHandlers {
Expand All @@ -479,20 +485,82 @@ func (self *Xgress) tx() {
if err != nil {
payloadLogger.Warnf("write failed (%s), closing xgress", err)
self.Close()
return
return false
} else {
payloadWriteTimer.UpdateSince(start)
payloadLogger.Debugf("sent [%s]", info.ByteCount(int64(n)))
payloadLogger.Infof("payload sent [%s]", info.ByteCount(int64(n)))
}
}
payloadSize := len(payload.Data)
size := atomic.AddUint32(&self.linkRxBuffer.size, ^uint32(payloadSize-1)) // subtraction for uint32
payloadLogger.Debugf("Payload %v of size %v removed from rx buffer. New size: %v", payload.Sequence, payloadSize, size)
return true
}

lastBufferSizeSent := self.linkRxBuffer.getLastBufferSizeSent()
if lastBufferSizeSent > 10000 && (lastBufferSizeSent>>1) > size {
self.SendEmptyAck()
var payload *Payload
var payloadChunk *Payload

payloadStarted := false
payloadComplete := false
var payloadSize int64
var payloadWriteOffset int

for {
payloadChunk = self.nextPayload()

if payloadChunk == nil {
log.Debug("nil payload received, exiting")
return
}

if !isPayloadFlagSet(payloadChunk.GetFlags(), PayloadFlagChunk) {
if !sendPayload(payloadChunk) {
return
}
clearPayloadFromSendBuffer(payloadChunk)
continue
}

var payloadReadOffset int
if !payloadStarted {
payloadSize, payloadReadOffset = binary.Varint(payloadChunk.Data)

if len(payloadChunk.Data) == 0 || payloadSize+int64(payloadReadOffset) == int64(len(payloadChunk.Data)) {
payload = payloadChunk
payload.Data = payload.Data[payloadReadOffset:]
payloadComplete = true
} else {
payload = &Payload{
Header: payloadChunk.Header,
Sequence: payloadChunk.Sequence,
Headers: payloadChunk.Headers,
Data: make([]byte, payloadSize),
}
}
payloadStarted = true
}

if !payloadComplete {
chunkData := payloadChunk.Data[payloadReadOffset:]
copy(payload.Data[payloadWriteOffset:], chunkData)
payloadWriteOffset += len(chunkData)
payloadComplete = int64(payloadWriteOffset) == payloadSize
}

payloadLogger := log.WithFields(payload.GetLoggerFields())
payloadLogger.Debugf("received payload chunk. seq: %d, first: %v, complete: %v, chunk size: %d, payload size: %d, writeOffset: %d",
payloadChunk.Sequence, len(payload.Data) == 0 || payloadReadOffset > 0, payloadComplete, len(payloadChunk.Data), payloadSize, payloadWriteOffset)

if !payloadComplete {
clearPayloadFromSendBuffer(payloadChunk)
continue
}

payloadStarted = false
payloadComplete = false
payloadWriteOffset = 0

if !sendPayload(payload) {
return
}
clearPayloadFromSendBuffer(payloadChunk)
}
}

Expand All @@ -504,7 +572,7 @@ func (self *Xgress) flushSendThenClose() {
return false
}

pfxlog.ContextLogger(self.Label()).Debug("sending end of circuit payload")
pfxlog.ContextLogger(self.Label()).Info("sending end of circuit payload")
return self.forwardPayload(payload)
})
}
Expand All @@ -526,7 +594,7 @@ func (self *Xgress) rx() {

for {
buffer, headers, err := self.peer.ReadPayload()
log.Debugf("read: %v bytes read", len(buffer))
log.Debugf("payload read: %d bytes read", len(buffer))
n := len(buffer)

// if we got an EOF, but also some data, ignore the EOF, next read we'll get 0, EOF
Expand All @@ -544,28 +612,81 @@ func (self *Xgress) rx() {
return
}

payload := &Payload{
Header: Header{
CircuitId: self.circuitId,
Flags: SetOriginatorFlag(0, self.originator),
},
Sequence: self.nextReceiveSequence(),
Data: buffer[0:n],
Headers: headers,
if n < int(self.Options.Mtu) || self.Options.Mtu == 0 {
if !self.sendUnchunkedBuffer(buffer, headers) {
return
}
continue
}

// if the payload buffer is closed, we can't forward any more data, so might as well exit the rx loop
// The txer will still have a chance to flush any already received data
if !self.forwardPayload(payload) {
return
first := true
for len(buffer) > 0 {
chunk := make([]byte, self.Options.Mtu)
dataTarget := chunk
offset := 0
if first {
offset = binary.PutVarint(chunk, int64(n))
dataTarget = chunk[offset:]
}

written := copy(dataTarget, buffer)
buffer = buffer[written:]

payload := &Payload{
Header: Header{
CircuitId: self.circuitId,
Flags: setPayloadFlag(SetOriginatorFlag(0, self.originator), PayloadFlagChunk),
},
Sequence: self.nextReceiveSequence(),
Data: chunk[:offset+written],
}

if first {
payload.Headers = headers
}
log.Debugf("sending payload chunk. seq: %d, first: %v, chunk size: %d, payload size: %d, remainder: %d", payload.Sequence, first, len(payload.Data), n, len(buffer))
first = false

// if the payload buffer is closed, we can't forward any more data, so might as well exit the rx loop
// The txer will still have a chance to flush any already received data
if !self.forwardPayload(payload) {
return
}

payloadLogger := log.WithFields(payload.GetLoggerFields())
payloadLogger.Debugf("forwarded [%s]", info.ByteCount(int64(n)))
}
payloadLogger := log.WithFields(payload.GetLoggerFields())
payloadLogger.Debugf("received [%s]", info.ByteCount(int64(n)))

logrus.Debugf("received payload for [%d] bytes", n)
}
}

func (self *Xgress) sendUnchunkedBuffer(buf []byte, headers map[uint8][]byte) bool {
log := pfxlog.ContextLogger(self.Label())

payload := &Payload{
Header: Header{
CircuitId: self.circuitId,
Flags: SetOriginatorFlag(0, self.originator),
},
Sequence: self.nextReceiveSequence(),
Data: buf,
Headers: headers,
}

log.Debugf("sending unchunked payload. seq: %d, payload size: %d", payload.Sequence, len(payload.Data))

// if the payload buffer is closed, we can't forward any more data, so might as well exit the rx loop
// The txer will still have a chance to flush any already received data
if !self.forwardPayload(payload) {
return false
}

payloadLogger := log.WithFields(payload.GetLoggerFields())
payloadLogger.Debugf("forwarded [%s]", info.ByteCount(int64(len(buf))))
return true
}

func (self *Xgress) forwardPayload(payload *Payload) bool {
sendCallback, err := self.payloadBuffer.BufferPayload(payload)

Expand Down
Loading

0 comments on commit f731405

Please sign in to comment.