mgr.go (forked from relab/gorums)
package gorums

import (
	"fmt"
	"log"
	"sync"
	"sync/atomic"

	"google.golang.org/grpc"
	"google.golang.org/grpc/backoff"
)

// RawManager maintains a connection pool of nodes on
// which quorum calls can be performed.
//
// This struct is intended to be used by generated code.
// You should use the generated `Manager` struct instead.
type RawManager struct {
	mu        sync.Mutex
	nodes     []*RawNode
	lookup    map[uint32]*RawNode
	closeOnce sync.Once
	logger    *log.Logger
	opts      managerOptions
	nextMsgID uint64
}

// NewRawManager returns a new RawManager for managing connections to nodes added
// to the manager. This function accepts manager options used to configure
// various aspects of the manager. This function is meant for internal use.
// You should use the `NewManager` function in the generated code instead.
func NewRawManager(opts ...ManagerOption) *RawManager {
	m := &RawManager{
		lookup: make(map[uint32]*RawNode),
		opts:   newManagerOptions(),
	}
	for _, opt := range opts {
		opt(&m.opts)
	}
	if m.opts.logger != nil {
		m.logger = m.opts.logger
	}
	m.opts.grpcDialOpts = append(m.opts.grpcDialOpts, grpc.WithDefaultCallOptions(
		grpc.CallContentSubtype(ContentSubtype),
	))
	if m.opts.backoff != backoff.DefaultConfig {
		m.opts.grpcDialOpts = append(m.opts.grpcDialOpts, grpc.WithConnectParams(
			grpc.ConnectParams{Backoff: m.opts.backoff},
		))
	}
	if m.logger != nil {
		m.logger.Printf("ready")
	}
	return m
}
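
// Hedged usage sketch (not part of the original file): a RawManager is
// normally created by the generated NewManager, but it can be constructed
// directly. Zero options are valid since opts is variadic, and Close can be
// deferred safely because closeOnce makes it idempotent.
func newRawManagerSketch() {
	mgr := NewRawManager() // options such as a logger or backoff config are optional
	defer mgr.Close()
	_ = mgr.Size() // no nodes added yet, so Size reports 0
}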

func (m *RawManager) closeNodeConns() {
	for _, node := range m.nodes {
		err := node.close()
		if err != nil && m.logger != nil {
			m.logger.Printf("error closing: %v", err)
		}
	}
}

// Close closes all node connections and any client streams.
func (m *RawManager) Close() {
	m.closeOnce.Do(func() {
		if m.logger != nil {
			m.logger.Printf("closing")
		}
		m.closeNodeConns()
	})
}

// NodeIDs returns the identifier of each available node. IDs are returned in
// the same order as they were provided in the creation of the Manager.
func (m *RawManager) NodeIDs() []uint32 {
	m.mu.Lock()
	defer m.mu.Unlock()
	ids := make([]uint32, 0, len(m.nodes))
	for _, node := range m.nodes {
		ids = append(ids, node.ID())
	}
	return ids
}

// Node returns the node with the given identifier if present.
func (m *RawManager) Node(id uint32) (node *RawNode, found bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	node, found = m.lookup[id]
	return node, found
}

// Nodes returns a slice of each available node. Nodes are returned in the same
// order as they were provided in the creation of the Manager.
func (m *RawManager) Nodes() []*RawNode {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.nodes
}

// Size returns the number of nodes in the Manager.
func (m *RawManager) Size() (nodes int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return len(m.nodes)
}

// AddNode adds the node to the manager's node pool
// and establishes a connection to the node.
func (m *RawManager) AddNode(node *RawNode) error {
	if _, found := m.Node(node.ID()); found {
		// Node IDs must be unique
		return fmt.Errorf("config: node %d (%s) already exists", node.ID(), node.Address())
	}
	if m.logger != nil {
		m.logger.Printf("Connecting to %s with id %d\n", node, node.id)
	}
	if err := node.connect(m); err != nil {
		if m.logger != nil {
			m.logger.Printf("Failed to connect to %s: %v (retrying)", node, err)
		}
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.lookup[node.id] = node
	m.nodes = append(m.nodes, node)
	return nil
}
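
// Hedged sketch (not part of the original file): adding several nodes to a
// manager. It assumes a RawNode constructor along the lines of
// NewRawNode(addr string) (*RawNode, error) exists elsewhere in this package;
// substitute the actual constructor if it differs.
func addNodesSketch(mgr *RawManager, addrs []string) error {
	for _, addr := range addrs {
		node, err := NewRawNode(addr) // assumed constructor, not defined in this file
		if err != nil {
			return err
		}
		// AddNode rejects duplicate node IDs; a failed initial connection is
		// only logged (with a retrying note) rather than returned as an error.
		if err := mgr.AddNode(node); err != nil {
			return err
		}
	}
	return nil
}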

// getMsgID returns a unique message ID.
func (m *RawManager) getMsgID() uint64 {
	return atomic.AddUint64(&m.nextMsgID, 1)
}