Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Send error if received message is too large.
Browse files Browse the repository at this point in the history
  • Loading branch information
vcabbage committed May 13, 2018
1 parent 9233ef0 commit 970864c
Showing 1 changed file with 62 additions and 57 deletions.
119 changes: 62 additions & 57 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,21 +778,22 @@ const (
//
// May be used for sending or receiving.
type link struct {
name string // our name
handle uint32 // our handle
remoteHandle uint32 // remote's handle
dynamicAddr bool // request a dynamic link address from the server
rx chan frameBody // sessions sends frames for this link on this channel
transfers chan performTransfer // sender uses for send; receiver uses for receive
close chan struct{}
closeOnce sync.Once
done chan struct{}
doneOnce sync.Once
session *Session // parent session
receiver *Receiver // allows link options to modify Receiver
source *source
target *target
properties map[symbol]interface{} // additional properties sent upon link attach
name string // our name
handle uint32 // our handle
remoteHandle uint32 // remote's handle
dynamicAddr bool // request a dynamic link address from the server
rx chan frameBody // sessions sends frames for this link on this channel
transfers chan performTransfer // sender uses for send; receiver uses for receive
closeOnce sync.Once // closeOnce protects close from being closed multiple times
close chan struct{} // close signals the mux to shutdown
done chan struct{} // done is closed by mux/muxDetach when the link is fully detached
detachErrorMu sync.Mutex // protects detachError
detachError *Error // error to send to remote on detach, set by closeWithError
session *Session // parent session
receiver *Receiver // allows link options to modify Receiver
source *source
target *target
properties map[symbol]interface{} // additional properties sent upon link attach

// "The delivery-count is initialized by the sender when a link endpoint is created,
// and is incremented whenever a message is sent. Only the sender MAY independently
Expand All @@ -805,7 +806,6 @@ type link struct {
senderSettleMode *SenderSettleMode
receiverSettleMode *ReceiverSettleMode
maxMessageSize uint64
detachSent bool // detach frame has been sent
detachReceived bool
err error // err returned on Close()
}
Expand Down Expand Up @@ -941,7 +941,7 @@ func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
}

func (l *link) mux() {
defer l.detach()
defer l.muxDetach()

var (
isReceiver = l.receiver != nil
Expand Down Expand Up @@ -1058,15 +1058,10 @@ func (l *link) muxHandleFrame(fr frameBody) error {
debug(3, "RX: %s", fr)
if isSender {
// Senders should never receive transfer frames, but handle it just in case.
l.session.txFrame(&performDetach{
Handle: l.handle,
Closed: true,
Error: &Error{
Condition: ErrorNotAllowed,
Description: "sender cannot process transfer frame",
},
}, nil)
l.detachSent = true
l.closeWithError(&Error{
Condition: ErrorNotAllowed,
Description: "sender cannot process transfer frame",
})
return errorErrorf("sender received transfer frame")
}

Expand Down Expand Up @@ -1176,8 +1171,32 @@ func (l *link) Close(ctx context.Context) error {
return l.err
}

func (l *link) detach() {
defer l.doneOnce.Do(func() { close(l.done) })
func (l *link) closeWithError(de *Error) {
l.closeOnce.Do(func() {
l.detachErrorMu.Lock()
l.detachError = de
l.detachErrorMu.Unlock()
close(l.close)
})
}

func (l *link) muxDetach() {
defer func() {
// final cleanup and signaling

// deallocate handle
select {
case l.session.deallocateHandle <- l:
case <-l.session.done:
if l.err == nil {
l.err = l.session.err
}
}

// signal other goroutines that links is done
close(l.done)
}()

// "A peer closes a link by sending the detach frame with the
// handle for the specified link, and the closed flag set to
// true. The partner will destroy the corresponding link
Expand All @@ -1188,13 +1207,15 @@ func (l *link) detach() {
// partner is sending a non-closing detach. In this case,
// the partner MUST signal that it has closed the link by
// reattaching and then sending a closing detach."
if l.detachSent {
return
}

l.detachErrorMu.Lock()
detachError := l.detachError
l.detachErrorMu.Unlock()

fr := &performDetach{
Handle: l.handle,
Closed: true,
Error: detachError,
}
select {
case l.session.tx <- fr:
Expand All @@ -1204,29 +1225,20 @@ func (l *link) detach() {
}
return
}
l.detachSent = true

// already received detach from remote
if l.detachReceived {
select {
case l.session.deallocateHandle <- l:
case <-l.session.done:
if l.err == nil {
l.err = l.session.err
}
}
// don't wait for remote to detach when already
// received or closing due to error
if l.detachReceived || detachError != nil {
return
}

// wait for remote to detach
Loop:
for {
select {
// read from link until detach with Close == true is received,
// other frames are discarded.
case fr := <-l.rx:
if fr, ok := fr.(*performDetach); ok && fr.Closed {
break Loop
return
}

// connection has ended
Expand All @@ -1237,16 +1249,6 @@ Loop:
return
}
}

// deallocate handle
select {
case l.session.deallocateHandle <- l:
case <-l.session.done:
if l.err == nil {
l.err = l.session.err
}
return
}
}

// LinkOption is a function for configuring an AMQP link.
Expand Down Expand Up @@ -1488,9 +1490,12 @@ func (r *Receiver) Receive(ctx context.Context) (*Message, error) {
// ensure maxMessageSize will not be exceeded
messageSize += uint64(len(fr.Payload))
if maxMessageSize != 0 && messageSize > maxMessageSize {
// TODO: send error
_ = r.Close(ctx)
return nil, errorErrorf("received message larger than max size of %d", maxMessageSize)
msg := fmt.Sprintf("received message larger than max size of %d", maxMessageSize)
r.link.closeWithError(&Error{
Condition: ErrorMessageSizeExceeded,
Description: msg,
})
return nil, errorNew(msg)
}

// add the payload the the buffer
Expand Down

0 comments on commit 970864c

Please sign in to comment.