From a69a1d51f8e7b55567f8d190a8fab04503a97735 Mon Sep 17 00:00:00 2001 From: Kale Blankenship Date: Wed, 27 Feb 2019 19:15:14 -0800 Subject: [PATCH] Fix settlement mode negotiation. If application has not explicitly requested a settlement mode, any mode returned by the server is accepted. If the application has explicitly requested a settlement mode and the server does not honor it an error is returned during link attachement. --- client.go | 54 ++++++++++++++++++++++++++++++++++++++++-------------- types.go | 14 ++++++++++++++ 2 files changed, 54 insertions(+), 14 deletions(-) diff --git a/client.go b/client.go index b91275cb..0aed35e2 100644 --- a/client.go +++ b/client.go @@ -948,18 +948,18 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) { l.deliveryCount = resp.InitialDeliveryCount // buffer receiver so that link.mux doesn't block l.messages = make(chan Message, l.receiver.maxCredit) - if resp.SenderSettleMode != nil { - l.senderSettleMode = resp.SenderSettleMode - } } else { // if dynamic address requested, copy assigned name to address if l.dynamicAddr && resp.Target != nil { l.target.Address = resp.Target.Address } l.transfers = make(chan performTransfer) - if resp.ReceiverSettleMode != nil { - l.receiverSettleMode = resp.ReceiverSettleMode - } + } + + err = l.setSettleModes(resp) + if err != nil { + l.muxDetach() + return nil, err } go l.mux() @@ -967,6 +967,32 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) { return l, nil } +// setSettleModes sets the settlement modes based on the resp performAttach. +// +// If a settlement mode has been explicitly set locally and it was not honored by the +// server an error is returned. +func (l *link) setSettleModes(resp *performAttach) error { + var ( + localRecvSettle = l.receiverSettleMode.value() + respRecvSettle = resp.ReceiverSettleMode.value() + ) + if l.receiverSettleMode != nil && localRecvSettle != respRecvSettle { + return fmt.Errorf("amqp: receiver settlement mode %q requested, received %q from server", l.receiverSettleMode, &respRecvSettle) + } + l.receiverSettleMode = &respRecvSettle + + var ( + localSendSettle = l.senderSettleMode.value() + respSendSettle = resp.SenderSettleMode.value() + ) + if l.senderSettleMode != nil && localSendSettle != respSendSettle { + return fmt.Errorf("amqp: sender settlement mode %q requested, received %q from server", l.senderSettleMode, &respSendSettle) + } + l.senderSettleMode = &respSendSettle + + return nil +} + func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) { l := &link{ name: randString(40), @@ -1523,12 +1549,12 @@ func LinkBatchMaxAge(d time.Duration) LinkOption { } } -// LinkSenderSettle sets the sender settlement mode. +// LinkSenderSettle sets the requested sender settlement mode. // -// When the Link is the Receiver, this is a request to the remote -// server. +// If a settlement mode is explicitly set and the server does not +// honor it an error will be returned during link attachment. // -// When the Link is the Sender, this is the actual settlement mode. +// Default: Accept the settlement mode set by the server, commonly ModeMixed. func LinkSenderSettle(mode SenderSettleMode) LinkOption { return func(l *link) error { if mode > ModeMixed { @@ -1539,12 +1565,12 @@ func LinkSenderSettle(mode SenderSettleMode) LinkOption { } } -// LinkReceiverSettle sets the receiver settlement mode. +// LinkReceiverSettle sets the requested receiver settlement mode. // -// When the Link is the Sender, this is a request to the remote -// server. +// If a settlement mode is explicitly set and the server does not +// honor it an error will be returned during link attachment. // -// When the Link is the Receiver, this is the actual settlement mode. +// Default: Accept the settlement mode set by the server, commonly ModeFirst. func LinkReceiverSettle(mode ReceiverSettleMode) LinkOption { return func(l *link) error { if mode > ModeSecond { diff --git a/types.go b/types.go index a2b476de..8dd2262e 100644 --- a/types.go +++ b/types.go @@ -2704,6 +2704,13 @@ func (m *SenderSettleMode) unmarshal(r *buffer) error { return err } +func (m *SenderSettleMode) value() SenderSettleMode { + if m == nil { + return ModeMixed + } + return *m +} + // Receiver Settlement Modes const ( // Receiver will spontaneously settle all incoming transfers. @@ -2745,6 +2752,13 @@ func (m *ReceiverSettleMode) unmarshal(r *buffer) error { return err } +func (m *ReceiverSettleMode) value() ReceiverSettleMode { + if m == nil { + return ModeFirst + } + return *m +} + // Durability Policies const ( // No terminus state is retained durably.