-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathelection.go
164 lines (133 loc) · 3.53 KB
/
election.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package supervisor
import (
"errors"
"fmt"
"strconv"
"strings"
"github.com/samuel/go-zookeeper/zk"
)
const (
// NodeRoleMaster current node is master
NodeRoleMaster NodeRole = 1
// NodeRoleSlave current node is slave
NodeRoleSlave NodeRole = 2
)
// NodeRole current node state (master or slave)
type NodeRole int32
func (ns NodeRole) String() string {
ss := "Master"
if ns == 2 {
return "Slave"
}
return ss
}
// NodeRoleChangeFunc callback function when node changes role (Master or Slave)
type NodeRoleChangeFunc func()
// ByNodeGUID order list of nodes by incremental id
type ByNodeGUID []string
func (ni ByNodeGUID) getID(nodeGUID string) int64 {
id, _ := strconv.ParseInt(nodeGUID[strings.LastIndex(nodeGUID, "-")+1:], 10, 16)
return id
}
func (ni ByNodeGUID) Len() int {
return len(ni)
}
func (ni ByNodeGUID) Swap(i, j int) {
ni[i], ni[j] = ni[j], ni[i]
}
func (ni ByNodeGUID) Less(i, j int) bool {
return ni.getID(ni[i]) < ni.getID(ni[j])
}
// RoleSelector holds role selector information
type RoleSelector struct {
client *Client
notificationSent bool
path string
guid string
nodePath string
Role NodeRole
IsMaster chan bool
Error chan error
close chan bool
}
// Start starts listening for node role change
func (rs *RoleSelector) Start() {
if !rs.client.isConnected {
rs.Error <- errors.New("Client not connected")
}
_, err := rs.client.createParentNodeIfNotExists(rs.path, []byte{})
if err != nil {
rs.Error <- err
}
abspath, guid, err := rs.client.createProtectedEphemeralSequential(rs.path, []byte{})
if err != nil {
rs.Error <- fmt.Errorf("%s - %s", err.Error(), rs.path)
}
rs.nodePath = abspath
rs.guid = guid
go rs.listen()
}
func (rs *RoleSelector) listen() {
for {
children, _, channel, err := rs.client.childrenWatch(rs.path)
if err != nil {
rs.Error <- err
}
if len(children) == 1 && !rs.notificationSent {
rs.client.currentRole = NodeRoleMaster
rs.notificationSent = true
rs.Role = NodeRoleMaster
rs.IsMaster <- true
}
select {
case event := <-channel:
rs.notify(event)
case <-rs.close:
}
}
}
func (rs *RoleSelector) notify(event zk.Event) {
if event.Type == zk.EventNodeChildrenChanged {
nodeGUIDList, err := rs.client.getSortedNodeGUIDList(rs.path)
if err != nil {
rs.Error <- err
}
// notify only if current node turns master
if len(nodeGUIDList) > 0 {
if nodeGUIDList[0] == rs.guid && rs.client.currentRole == NodeRoleSlave && rs.notificationSent == false {
rs.notificationSent = true
rs.Role = NodeRoleMaster
rs.IsMaster <- true
}
}
}
}
// Stop stops listening for node role change
func (rs *RoleSelector) Stop() error {
rs.close <- true
if err := rs.client.deleteNodeLastVersion(rs.nodePath); err != nil {
return fmt.Errorf("Could not remove node %s - %s", rs.nodePath, err.Error())
}
nodeGUIDList, err := rs.client.getSortedNodeGUIDList(rs.path)
if err != nil {
return err
}
if len(nodeGUIDList) == 0 {
if err := rs.client.deleteNodeLastVersion(rs.path); err != nil {
return err
}
}
return nil
}
// NewRoleSelector returns new role selector for master election
func NewRoleSelector(c *Client, path string) *RoleSelector {
rs := RoleSelector{
client: c,
path: path,
Role: NodeRoleSlave,
IsMaster: make(chan bool),
Error: make(chan error),
close: make(chan bool),
}
return &rs
}