Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stall when checking acks after checking window size. Fixes #1583 #1584

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ ziti fabric raft remove-member ctrl3
* [Issue #462](https://github.com/openziti/sdk-golang/issues/462) - Allow refreshing a single service

* github.com/openziti/ziti: [v0.31.2 -> v0.31.3](https://github.com/openziti/ziti/compare/v0.31.2...v0.31.3)
* [Issue #1583](https://github.com/openziti/ziti/issues/1583) - xgress: Potential data stall due when processing acks after checking window size
* [Issue #1578](https://github.com/openziti/ziti/issues/1578) - Send BindSuccess notifications to SDK if supported
* [Issue #1544](https://github.com/openziti/ziti/issues/1544) - Support transfer raft leadership via REST
* [Issue #1543](https://github.com/openziti/ziti/issues/1543) - Support add/remove raft peer via REST
Expand Down
23 changes: 12 additions & 11 deletions router/xgress/link_send_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,27 +193,28 @@ func (buffer *LinkSendBuffer) run() {
defer retransmitTicker.Stop()

for {
// don't block when we're closing, since the only thing that should still be coming in is end-of-circuit
// if we're blocked, but empty, let one payload in to reduce the chances of a stall
if buffer.isBlocked() && !buffer.closeWhenEmpty.Load() && buffer.linkSendBufferSize != 0 {
buffered = nil
} else {
buffered = buffer.newlyBuffered
}

// bias acks by allowing 10 acks to be processed for every payload in
for i := 0; i < 10; i++ {
// bias acks, process all pending, since that should not block
processingAcks := true
for processingAcks {
select {
case ack := <-buffer.newlyReceivedAcks:
buffer.receiveAcknowledgement(ack)
case <-buffer.closeNotify:
buffer.close()
return
default:
i = 10
processingAcks = false
}
}

// don't block when we're closing, since the only thing that should still be coming in is end-of-circuit
// if we're blocked, but empty, let one payload in to reduce the chances of a stall
if buffer.isBlocked() && !buffer.closeWhenEmpty.Load() && buffer.linkSendBufferSize != 0 {
buffered = nil
} else {
buffered = buffer.newlyBuffered
}

select {
case inspectEvent := <-buffer.inspectRequests:
inspectEvent.handle(buffer)
Expand Down
Loading