forked from gocraft/work
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathheartbeater.go
134 lines (113 loc) · 2.74 KB
/
heartbeater.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package work
import (
"os"
"sort"
"strings"
"time"
)
// beatPeriod is the default interval between two heartbeats. It is copied
// into every heartbeater built by newWorkerPoolHeartbeater.
const beatPeriod = 5 * time.Second
// workerPoolHeartbeater periodically publishes the status of one worker pool
// to Redis and removes that status again when it is stopped.
//
// The descriptive fields (jobNames, workerIDs, pid, hostname) are computed
// once by newWorkerPoolHeartbeater and written unchanged on every heartbeat.
type workerPoolHeartbeater struct {
	// workerPoolID is the member added to the worker-pools set and is also
	// used to build the heartbeat key.
	workerPoolID string
	namespace    string // eg, "myapp-work"

	// pool hands out the connection used for each heartbeat write/removal.
	pool Pool

	// beatPeriod is the interval between ticker-driven heartbeats.
	beatPeriod time.Duration

	// concurrency is reported verbatim in the "concurrency" hash field.
	concurrency uint

	// jobNames is the sorted, comma-joined list of registered job names.
	jobNames string

	// startedAt holds the value of nowEpochSeconds() taken when loop begins.
	startedAt int64

	// pid is the process id obtained from os.Getpid.
	pid int

	// hostname is the result of os.Hostname, or "hostname_errored" when that
	// lookup failed.
	hostname string

	// workerIDs is the sorted, comma-joined list of worker ids.
	workerIDs string

	// stopChan asks loop to shut down; doneStoppingChan is loop's
	// acknowledgement once the heartbeat has been removed. Both are created
	// unbuffered by newWorkerPoolHeartbeater.
	stopChan, doneStoppingChan chan struct{}

	// logger receives errors from the hostname lookup and from flushing
	// commands.
	logger StructuredLogger
}
// newWorkerPoolHeartbeater builds a heartbeater for the given worker pool.
//
// The job names (keys of jobTypes) and the worker ids are each sorted and
// comma-joined once here, and the process id and hostname are captured, so
// that every later heartbeat reports the same descriptive values. If the
// hostname lookup fails the error is logged and "hostname_errored" is used
// instead.
//
// The workerIDs slice is not modified: it is copied before sorting.
//
// The returned heartbeater is idle; call start to begin beating.
func newWorkerPoolHeartbeater(
	namespace string,
	pool Pool,
	workerPoolID string,
	jobTypes map[string]*jobType,
	concurrency uint,
	workerIDs []string,
	logger StructuredLogger,
) *workerPoolHeartbeater {
	h := &workerPoolHeartbeater{
		workerPoolID:     workerPoolID,
		namespace:        namespace,
		pool:             pool,
		beatPeriod:       beatPeriod,
		concurrency:      concurrency,
		stopChan:         make(chan struct{}),
		doneStoppingChan: make(chan struct{}),
		logger:           logger,
	}

	// Map iteration order is random; sort so the reported list is stable.
	jobNames := make([]string, 0, len(jobTypes))
	for name := range jobTypes {
		jobNames = append(jobNames, name)
	}
	sort.Strings(jobNames)
	h.jobNames = strings.Join(jobNames, ",")

	// Sort a private copy so the caller's slice keeps its original order.
	sortedIDs := append([]string(nil), workerIDs...)
	sort.Strings(sortedIDs)
	h.workerIDs = strings.Join(sortedIDs, ",")

	h.pid = os.Getpid()

	host, err := os.Hostname()
	if err != nil {
		h.logger.Error("heartbeat.hostname", errAttr(err))
		host = "hostname_errored"
	}
	h.hostname = host

	return h
}
// start runs loop on a new goroutine and returns immediately. The goroutine
// keeps running until loop receives a value on stopChan (see stop).
func (h *workerPoolHeartbeater) start() {
	go func() {
		h.loop()
	}()
}
// stop asks the running loop to shut down and waits for its acknowledgement,
// which loop sends only after it has removed the heartbeat.
//
// Both channels are unbuffered, so stop blocks until loop is there to
// receive the request; calling it when no loop is running never returns.
func (h *workerPoolHeartbeater) stop() {
	var request struct{}
	h.stopChan <- request

	// Wait for loop to confirm that cleanup has finished.
	<-h.doneStoppingChan
}
// loop is the heartbeater's goroutine body. It records the start time, sends
// one heartbeat immediately, then sends another every beatPeriod until a
// value arrives on stopChan. On stop it removes the heartbeat, acknowledges
// on doneStoppingChan and returns.
//
// The ticker is created with time.NewTicker and stopped on return so that it
// does not outlive the loop (a ticker from time.Tick can never be stopped).
// A non-positive beatPeriod disables the periodic beats rather than
// panicking, which matches what time.Tick did for such values.
func (h *workerPoolHeartbeater) loop() {
	h.startedAt = nowEpochSeconds()
	h.heartbeat() // do it right away

	// A nil channel blocks forever in select, so with no ticker only the
	// stop case can fire.
	var tick <-chan time.Time
	if h.beatPeriod > 0 {
		ticker := time.NewTicker(h.beatPeriod)
		defer ticker.Stop()
		tick = ticker.C
	}

	for {
		select {
		case <-h.stopChan:
			h.removeHeartbeat()
			h.doneStoppingChan <- struct{}{}
			return
		case <-tick:
			h.heartbeat()
		}
	}
}
// heartbeat publishes the pool's current status. Using one connection from
// the pool it queues two commands with Send and then calls Flush once:
//
//   - SADD of workerPoolID into the worker-pools set for the namespace
//   - HMSET of the heartbeat hash with the current time plus the static
//     description captured at construction
//
// Return values of Send are not inspected; only an error from Flush is
// logged. The connection is always closed before returning.
func (h *workerPoolHeartbeater) heartbeat() {
	c := h.pool.Get()
	defer c.Close()

	poolsSetKey := redisKeyWorkerPools(h.namespace)
	statusKey := redisKeyHeartbeat(h.namespace, h.workerPoolID)

	c.Send("SADD", poolsSetKey, h.workerPoolID)
	c.Send(
		"HMSET", statusKey,
		"heartbeat_at", nowEpochSeconds(),
		"started_at", h.startedAt,
		"job_names", h.jobNames,
		"concurrency", h.concurrency,
		"worker_ids", h.workerIDs,
		"host", h.hostname,
		"pid", h.pid,
	)

	err := c.Flush()
	if err == nil {
		return
	}
	h.logger.Error("heartbeat", errAttr(err))
}
// removeHeartbeat undoes what heartbeat publishes. Using one connection from
// the pool it queues an SREM of workerPoolID from the worker-pools set and a
// DEL of the heartbeat key, then calls Flush once.
//
// Return values of Send are not inspected; only an error from Flush is
// logged. The connection is always closed before returning.
func (h *workerPoolHeartbeater) removeHeartbeat() {
	c := h.pool.Get()
	defer c.Close()

	poolsSetKey := redisKeyWorkerPools(h.namespace)
	statusKey := redisKeyHeartbeat(h.namespace, h.workerPoolID)

	c.Send("SREM", poolsSetKey, h.workerPoolID)
	c.Send("DEL", statusKey)

	err := c.Flush()
	if err == nil {
		return
	}
	h.logger.Error("remove_heartbeat", errAttr(err))
}