forked from Azure/go-amqp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsender.go
156 lines (132 loc) · 4.14 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package amqp
import (
"context"
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
// Sender sends messages on a single AMQP link.
type Sender struct {
link *link
mu sync.Mutex // protects buf and nextDeliveryTag
buf buffer.Buffer
nextDeliveryTag uint64
}
// LinkName() is the name of the link used for this Sender.
func (s *Sender) LinkName() string {
return s.link.Key.name
}
// MaxMessageSize is the maximum size of a single message.
func (s *Sender) MaxMessageSize() uint64 {
return s.link.MaxMessageSize
}
// Send sends a Message.
//
// Blocks until the message is sent, ctx completes, or an error occurs.
//
// Send is safe for concurrent use. Since only a single message can be
// sent on a link at a time, this is most useful when settlement confirmation
// has been requested (receiver settle mode is "Second"). In this case,
// additional messages can be sent while the current goroutine is waiting
// for the confirmation.
func (s *Sender) Send(ctx context.Context, msg *Message) error {
if err := s.link.Check(); err != nil {
return err
}
done, err := s.send(ctx, msg)
if err != nil {
return err
}
// wait for transfer to be confirmed
select {
case state := <-done:
if state, ok := state.(*encoding.StateRejected); ok {
return state.Error
}
return nil
case <-s.link.Detached:
return s.link.err
case <-ctx.Done():
return ctx.Err()
}
}
// send is separated from Send so that the mutex unlock can be deferred without
// locking the transfer confirmation that happens in Send.
func (s *Sender) send(ctx context.Context, msg *Message) (chan encoding.DeliveryState, error) {
const maxDeliveryTagLength = 32
if len(msg.DeliveryTag) > maxDeliveryTagLength {
return nil, fmt.Errorf("delivery tag is over the allowed %v bytes, len: %v", maxDeliveryTagLength, len(msg.DeliveryTag))
}
s.mu.Lock()
defer s.mu.Unlock()
s.buf.Reset()
err := msg.Marshal(&s.buf)
if err != nil {
return nil, err
}
if s.link.MaxMessageSize != 0 && uint64(s.buf.Len()) > s.link.MaxMessageSize {
return nil, fmt.Errorf("encoded message size exceeds max of %d", s.link.MaxMessageSize)
}
var (
maxPayloadSize = int64(s.link.Session.conn.PeerMaxFrameSize) - maxTransferFrameHeader
sndSettleMode = s.link.SenderSettleMode
senderSettled = sndSettleMode != nil && (*sndSettleMode == ModeSettled || (*sndSettleMode == ModeMixed && msg.SendSettled))
deliveryID = atomic.AddUint32(&s.link.Session.nextDeliveryID, 1)
)
deliveryTag := msg.DeliveryTag
if len(deliveryTag) == 0 {
// use uint64 encoded as []byte as deliveryTag
deliveryTag = make([]byte, 8)
binary.BigEndian.PutUint64(deliveryTag, s.nextDeliveryTag)
s.nextDeliveryTag++
}
fr := frames.PerformTransfer{
Handle: s.link.Handle,
DeliveryID: &deliveryID,
DeliveryTag: deliveryTag,
MessageFormat: &msg.Format,
More: s.buf.Len() > 0,
}
for fr.More {
buf, _ := s.buf.Next(maxPayloadSize)
fr.Payload = append([]byte(nil), buf...)
fr.More = s.buf.Len() > 0
if !fr.More {
// SSM=settled: overrides RSM; no acks.
// SSM=unsettled: sender should wait for receiver to ack
// RSM=first: receiver considers it settled immediately, but must still send ack (SSM=unsettled only)
// RSM=second: receiver sends ack and waits for return ack from sender (SSM=unsettled only)
// mark final transfer as settled when sender mode is settled
fr.Settled = senderSettled
// set done on last frame
fr.Done = make(chan encoding.DeliveryState, 1)
}
select {
case s.link.Transfers <- fr:
case <-s.link.Detached:
return nil, s.link.err
case <-ctx.Done():
return nil, ctx.Err()
}
// clear values that are only required on first message
fr.DeliveryID = nil
fr.DeliveryTag = nil
fr.MessageFormat = nil
}
return fr.Done, nil
}
// Address returns the link's address.
func (s *Sender) Address() string {
if s.link.Target == nil {
return ""
}
return s.link.Target.Address
}
// Close closes the Sender and AMQP link.
func (s *Sender) Close(ctx context.Context) error {
return s.link.Close(ctx)
}