Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Fix settlement mode negotiation.
Browse files Browse the repository at this point in the history
If application has not explicitly requested a settlement mode, any
mode returned by the server is accepted.

If the application has explicitly requested a settlement mode and
the server does not honor it an error is returned during link
attachement.
  • Loading branch information
vcabbage committed Feb 28, 2019
1 parent 61c4fd5 commit a69a1d5
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 14 deletions.
54 changes: 40 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,25 +948,51 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
l.deliveryCount = resp.InitialDeliveryCount
// buffer receiver so that link.mux doesn't block
l.messages = make(chan Message, l.receiver.maxCredit)
if resp.SenderSettleMode != nil {
l.senderSettleMode = resp.SenderSettleMode
}
} else {
// if dynamic address requested, copy assigned name to address
if l.dynamicAddr && resp.Target != nil {
l.target.Address = resp.Target.Address
}
l.transfers = make(chan performTransfer)
if resp.ReceiverSettleMode != nil {
l.receiverSettleMode = resp.ReceiverSettleMode
}
}

err = l.setSettleModes(resp)
if err != nil {
l.muxDetach()
return nil, err
}

go l.mux()

return l, nil
}

// setSettleModes sets the settlement modes based on the resp performAttach.
//
// If a settlement mode has been explicitly set locally and it was not honored by the
// server an error is returned.
func (l *link) setSettleModes(resp *performAttach) error {
var (
localRecvSettle = l.receiverSettleMode.value()
respRecvSettle = resp.ReceiverSettleMode.value()
)
if l.receiverSettleMode != nil && localRecvSettle != respRecvSettle {
return fmt.Errorf("amqp: receiver settlement mode %q requested, received %q from server", l.receiverSettleMode, &respRecvSettle)
}
l.receiverSettleMode = &respRecvSettle

var (
localSendSettle = l.senderSettleMode.value()
respSendSettle = resp.SenderSettleMode.value()
)
if l.senderSettleMode != nil && localSendSettle != respSendSettle {
return fmt.Errorf("amqp: sender settlement mode %q requested, received %q from server", l.senderSettleMode, &respSendSettle)
}
l.senderSettleMode = &respSendSettle

return nil
}

func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
l := &link{
name: randString(40),
Expand Down Expand Up @@ -1523,12 +1549,12 @@ func LinkBatchMaxAge(d time.Duration) LinkOption {
}
}

// LinkSenderSettle sets the sender settlement mode.
// LinkSenderSettle sets the requested sender settlement mode.
//
// When the Link is the Receiver, this is a request to the remote
// server.
// If a settlement mode is explicitly set and the server does not
// honor it an error will be returned during link attachment.
//
// When the Link is the Sender, this is the actual settlement mode.
// Default: Accept the settlement mode set by the server, commonly ModeMixed.
func LinkSenderSettle(mode SenderSettleMode) LinkOption {
return func(l *link) error {
if mode > ModeMixed {
Expand All @@ -1539,12 +1565,12 @@ func LinkSenderSettle(mode SenderSettleMode) LinkOption {
}
}

// LinkReceiverSettle sets the receiver settlement mode.
// LinkReceiverSettle sets the requested receiver settlement mode.
//
// When the Link is the Sender, this is a request to the remote
// server.
// If a settlement mode is explicitly set and the server does not
// honor it an error will be returned during link attachment.
//
// When the Link is the Receiver, this is the actual settlement mode.
// Default: Accept the settlement mode set by the server, commonly ModeFirst.
func LinkReceiverSettle(mode ReceiverSettleMode) LinkOption {
return func(l *link) error {
if mode > ModeSecond {
Expand Down
14 changes: 14 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2704,6 +2704,13 @@ func (m *SenderSettleMode) unmarshal(r *buffer) error {
return err
}

func (m *SenderSettleMode) value() SenderSettleMode {
if m == nil {
return ModeMixed
}
return *m
}

// Receiver Settlement Modes
const (
// Receiver will spontaneously settle all incoming transfers.
Expand Down Expand Up @@ -2745,6 +2752,13 @@ func (m *ReceiverSettleMode) unmarshal(r *buffer) error {
return err
}

func (m *ReceiverSettleMode) value() ReceiverSettleMode {
if m == nil {
return ModeFirst
}
return *m
}

// Durability Policies
const (
// No terminus state is retained durably.
Expand Down

0 comments on commit a69a1d5

Please sign in to comment.