-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathheartbeat.go
131 lines (117 loc) · 2.8 KB
/
heartbeat.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/*
* Copyright (c) 2019 Zenichi Amano
*
* This file is part of go-push-receiver, which is MIT licensed.
* See http://opensource.org/licenses/MIT
*/
package pushreceiver
import (
"context"
"time"
)
// Heartbeat holds the keep-alive settings for a connection and drives the
// heartbeat loop (see start).
type Heartbeat struct {
	// clientInterval is the period at which start sends heartbeat pings;
	// a non-positive value disables client pings.
	// serverInterval is the server-side heartbeat interval; in this file it is
	// only used to derive the default deadmanTimeout.
	clientInterval, serverInterval time.Duration
	// deadmanTimeout is how long start waits for a heartbeat ack before it
	// force-disconnects. When it is <= 0, start derives it from the larger of
	// the two intervals.
	deadmanTimeout time.Duration
	// adaptive is set through WithAdaptive.
	// NOTE(review): nothing in this file reads it — confirm where it is used.
	adaptive bool
}
// HeartbeatOption is a functional option that mutates a Heartbeat's
// configuration. newHeartbeat applies options in the order they are passed.
type HeartbeatOption func(h *Heartbeat)
// WithClientInterval returns an option that sets the interval at which the
// client sends heartbeat pings. The value is stored as given; a non-positive
// interval means no ping ticker is created when the heartbeat starts.
func WithClientInterval(interval time.Duration) HeartbeatOption {
	return func(h *Heartbeat) {
		h.clientInterval = interval
	}
}
// WithServerInterval returns an option that sets the server heartbeat
// interval. Intervals of one minute or less are raised to one minute.
func WithServerInterval(interval time.Duration) HeartbeatOption {
	// Clamp once up front; the closure only stores the effective value.
	effective := interval
	if effective <= time.Minute {
		effective = time.Minute
	}
	return func(h *Heartbeat) {
		h.serverInterval = effective
	}
}
// WithDeadmanTimeout returns an option that sets how long the heartbeat loop
// waits for a heartbeat ack before force-disconnecting. A value <= 0 makes
// start derive the timeout from the larger of the client and server intervals.
func WithDeadmanTimeout(timeout time.Duration) HeartbeatOption {
	var opt HeartbeatOption = func(h *Heartbeat) {
		h.deadmanTimeout = timeout
	}
	return opt
}
// WithAdaptive returns an option that sets the Heartbeat's adaptive flag.
// NOTE(review): the flag is not read anywhere in this file — confirm its
// effect against the code that consumes it.
func WithAdaptive(enabled bool) HeartbeatOption {
	return func(hb *Heartbeat) {
		hb.adaptive = enabled
	}
}
// newHeartbeat builds a Heartbeat from its zero value and applies the given
// options in order, so a later option overrides an earlier one that sets the
// same field. Nil options are skipped rather than called, because calling a
// nil function value panics.
func newHeartbeat(options ...HeartbeatOption) *Heartbeat {
	h := &Heartbeat{}
	for _, option := range options {
		if option == nil {
			continue
		}
		option(h)
	}
	return h
}
// start runs the heartbeat loop for the given connection and blocks until the
// loop ends.
//
// The loop ends when:
//   - ctx is canceled: the connection is disconnected;
//   - no heartbeat ack arrives within deadmanTimeout: the connection is
//     disconnected;
//   - sending a heartbeat ping fails: the loop returns without calling
//     disconnect and without reporting the error from here.
//
// If deadmanTimeout is not configured (<= 0), it is derived from the larger of
// clientInterval and serverInterval via durationDeadmanTimeout and stored back
// into h. A non-positive clientInterval disables client pings, and a
// deadmanTimeout that is still non-positive disables the deadman timer.
func (h *Heartbeat) start(ctx context.Context, mcs *mcs) {
	if h.deadmanTimeout <= 0 {
		if h.clientInterval < h.serverInterval {
			h.deadmanTimeout = durationDeadmanTimeout(h.serverInterval)
		} else {
			h.deadmanTimeout = durationDeadmanTimeout(h.clientInterval)
		}
	}

	// Receiving from a nil channel blocks forever, so a channel left nil
	// simply disables its select case below.
	var (
		pingDeadman  *time.Timer
		pingDeadmanC <-chan time.Time
	)
	if h.deadmanTimeout > 0 {
		pingDeadman = time.NewTimer(h.deadmanTimeout)
		pingDeadmanC = pingDeadman.C
		defer pingDeadman.Stop()
	}

	var pingTickerC <-chan time.Time
	if h.clientInterval > 0 {
		pingTicker := time.NewTicker(h.clientInterval)
		pingTickerC = pingTicker.C
		defer pingTicker.Stop()
	}

	for {
		select {
		case <-ctx.Done():
			mcs.log.Info().Msg("Force disconnect by canceled context")
			mcs.disconnect()
			return
		case <-mcs.heartbeatAck:
			if pingDeadman != nil {
				// The timer may have fired already while select picked this
				// case. Stop it and drain any pending tick before Reset, so a
				// stale expiry cannot trigger a disconnect right after an ack.
				// The drain is non-blocking because the tick may not be there.
				if !pingDeadman.Stop() {
					select {
					case <-pingDeadman.C:
					default:
					}
				}
				pingDeadman.Reset(h.deadmanTimeout)
			}
		case <-pingDeadmanC:
			// force disconnect
			mcs.log.Info().Msg("Force disconnect by heartbeat timeout")
			mcs.disconnect()
			return
		case <-pingTickerC:
			// send heartbeat to FCM
			if err := mcs.SendHeartbeatPingPacket(); err != nil {
				// NOTE(review): the error is dropped and disconnect is not
				// called here — presumably the caller tears the connection
				// down; confirm.
				return
			}
		}
	}
}
// durationDeadmanTimeout returns the deadman timeout derived from a heartbeat
// interval: four times the interval.
func durationDeadmanTimeout(interval time.Duration) time.Duration {
	const multiplier = 4
	return multiplier * interval
}