diff --git a/gateway/gateway.go b/gateway/gateway.go index ca88e5c5..78013e0b 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -392,6 +392,8 @@ func (g *gatewayImpl) OnOp(ctx context.Context, op ws.Op) bool { now := time.Now() g.beatMutex.Lock() + // Keep sentBeat separately with the echoed beat to calculate the + // gateway latency properly. g.sentBeat = g.lastSentBeat g.echoBeat = now g.beatMutex.Unlock() @@ -408,15 +410,26 @@ func (g *gatewayImpl) OnOp(ctx context.Context, op ws.Op) bool { // SendHeartbeat sends a heartbeat with the gateway's current sequence. func (g *gatewayImpl) SendHeartbeat(ctx context.Context) { - g.lastSentBeat = time.Now() - sequence := HeartbeatCommand(g.state.Sequence) + if err := g.gateway.Send(ctx, &sequence); err != nil { g.gateway.SendErrorWrap(err, "heartbeat error") g.gateway.QueueReconnect() + } else { + g.beatMutex.Lock() + g.lastSentBeat = time.Now() + g.beatMutex.Unlock() } } +// LastAcknowledgedBeat returns the last acknowledged beat. +func (g *gatewayImpl) LastAcknowledgedBeat() time.Time { + g.beatMutex.Lock() + defer g.beatMutex.Unlock() + + return g.echoBeat +} + // Close closes the state. func (g *gatewayImpl) Close() error { g.retryTimer.Stop() diff --git a/utils/ws/gateway.go b/utils/ws/gateway.go index 69c06a88..17af211f 100644 --- a/utils/ws/gateway.go +++ b/utils/ws/gateway.go @@ -11,6 +11,10 @@ import ( "github.com/pkg/errors" ) +// ErrHeartbeatTimeout is returned if the server fails to acknowledge our heart +// beat in time. +var ErrHeartbeatTimeout = errors.New("server timed out replying to heartbeat") + // ConnectionError is given to the user if the gateway fails to connect to the // gateway for any reason, including during an initial connection or a // reconnection. To check for this error, use the errors.As function. @@ -95,8 +99,11 @@ var DefaultGatewayOpts = GatewayOpts{ type Gateway struct { ws *Websocket - reconnect chan struct{} heart lazytime.Ticker + heartRate time.Duration + lastBeat time.Time + + reconnect chan struct{} srcOp <-chan Op // from WS outer outerState lastError error @@ -104,6 +111,19 @@ type Gateway struct { opts GatewayOpts } +// HeartbeatInfo is the heart rate information. It is used to ensure that the +// gateway is alive. +type HeartbeatInfo struct { + LastAcknowledged time.Time + LastSent time.Time +} + +// ShouldReconnect returns true if the heartbeat info and the heart rate +// suggests that we should reconnect. +func (i HeartbeatInfo) ShouldReconnect(hr time.Duration) bool { + return i.LastAcknowledged.Add(2 * hr).Before(i.LastSent) +} + // outerState holds gateway state that the caller may change concurrently. As // such, it holds a mutex to allow that. The main purpose of this // synchronization is to allow the caller to use the gateway while the event @@ -125,6 +145,9 @@ type Handler interface { // SendHeartbeat is called by the gateway event loop everytime a heartbeat // needs to be sent over. SendHeartbeat(context.Context) + // LastAcknowledgedBeat returns the last time that the server acknowledged + // our heart beat. + LastAcknowledgedBeat() time.Time // Close closes the handler. Close() error } @@ -263,6 +286,7 @@ func (g *Gateway) QueueReconnect() { // ResetHeartbeat resets the heartbeat to be the given duration. func (g *Gateway) ResetHeartbeat(d time.Duration) { g.heart.Reset(d) + g.heartRate = d } // SendError sends the given error wrapped in a BackgroundErrorEvent into the @@ -327,6 +351,17 @@ func (g *Gateway) spin(ctx context.Context, h Handler) { g.lastError = nil case <-g.heart.C: + const missThreshold = 2 // allow 2 heart beat misses + + if !g.lastBeat.IsZero() { + if h.LastAcknowledgedBeat().Add(missThreshold * g.heartRate).Before(g.lastBeat) { + g.SendError(ErrHeartbeatTimeout) + g.QueueReconnect() + continue + } + } + + g.lastBeat = time.Now() h.SendHeartbeat(ctx) case <-g.reconnect: @@ -339,6 +374,11 @@ func (g *Gateway) spin(ctx context.Context, h Handler) { // Invalidate our srcOp. g.srcOp = nil + // Invalidate our last sent beat timestamp so we don't mistakenly + // think that the gateway is dead after we reconnect and before we + // sent a heartbeat. + g.lastBeat = time.Time{} + // Keep track of the last error for notifying. var err error diff --git a/voice/voicegateway/gateway.go b/voice/voicegateway/gateway.go index f8d73994..2a06d7b0 100644 --- a/voice/voicegateway/gateway.go +++ b/voice/voicegateway/gateway.go @@ -45,8 +45,9 @@ type Gateway struct { gateway *ws.Gateway state State // constant - mutex sync.RWMutex - ready *ReadyEvent + mutex sync.RWMutex + beatAck time.Time + ready *ReadyEvent } // DefaultGatewayOpts contains the default options to be used for connecting to @@ -187,6 +188,10 @@ func (g *gatewayImpl) OnOp(ctx context.Context, op ws.Op) bool { g.gateway.QueueReconnect() } } + case *HeartbeatAckEvent: + g.mutex.Lock() + g.beatAck = time.Now() + g.mutex.Unlock() case *ReadyEvent: g.mutex.Lock() g.ready = data @@ -204,6 +209,13 @@ func (g *gatewayImpl) SendHeartbeat(ctx context.Context) { } } +func (g *gatewayImpl) LastAcknowledgedBeat() time.Time { + g.mutex.RLock() + defer g.mutex.RUnlock() + + return g.beatAck +} + func (g *gatewayImpl) Close() error { return nil }