-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexchange.go
81 lines (72 loc) · 2.16 KB
/
exchange.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package clarimq
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
// ExchangeOptions are used to configure an exchange.
// If the Passive flag is set the client will only check if the exchange exists on the broker
// and that the settings match, no creation attempt will be made.
// Nothing is declared or checked unless the Declare flag is set.
type ExchangeOptions struct {
	// Args are optional arguments forwarded to the broker with the declaration.
	// They are used by plugins and broker-specific features.
	Args Table
	// Name is the exchange name.
	Name string
	// Kind is the exchange type, e.g. direct, topic, fanout or headers.
	Kind string
	// Durable, if true, makes the exchange survive a broker restart.
	Durable bool
	// AutoDelete, if true, deletes the exchange when the last queue is unbound from it.
	AutoDelete bool
	// Internal, if true, prevents clients from publishing to this exchange directly.
	// It can then only be used with exchange to exchange bindings.
	Internal bool
	// NoWait, if true, makes the client not wait for a reply method.
	// If the broker could not complete the method it will raise a channel or connection exception.
	NoWait bool
	// Passive, if true, only checks that the exchange exists on the broker; no creation attempt is made.
	// If false, a missing exchange will be created on the broker.
	Passive bool
	// Declare, if true, enables declaring the exchange on the broker (or checking it when Passive is set).
	// If false, no declaration is performed at all.
	Declare bool
}
// defaultExchangeOptions returns a new ExchangeOptions value holding the
// package defaults: an empty, non-nil argument table, the default exchange
// name and the direct exchange kind. Every boolean flag is false.
func defaultExchangeOptions() *ExchangeOptions {
	// new() zeroes the struct, which already yields false for Durable,
	// AutoDelete, Internal, NoWait, Passive and Declare.
	defaults := new(ExchangeOptions)

	defaults.Args = make(Table)
	defaults.Name = ExchangeDefault
	defaults.Kind = amqp.ExchangeDirect

	return defaults
}
// declareExchange declares the exchange described by options, running the
// declaration through channelExec.
//
// When options.Declare is false the function returns nil without invoking
// channelExec. Otherwise it issues a passive declaration if options.Passive is
// set and a regular declaration if not. An error reported by channelExec is
// returned wrapped with the "failed to declare exchange" prefix.
func declareExchange(channelExec channelExec, options *ExchangeOptions) error {
	// Declaring is opt-in.
	if !options.Declare {
		return nil
	}

	run := func(ch *amqp.Channel) error {
		// Both declaration variants share one signature, so select the
		// method first and spell out the argument list only once.
		declare := ch.ExchangeDeclare
		if options.Passive {
			declare = ch.ExchangeDeclarePassive
		}

		return declare(
			options.Name,
			options.Kind,
			options.Durable,
			options.AutoDelete,
			options.Internal,
			options.NoWait,
			amqp.Table(options.Args),
		)
	}

	err := channelExec(run)
	if err == nil {
		return nil
	}

	return fmt.Errorf("failed to declare exchange: %w", err)
}