Skip to content

Commit

Permalink
ws: Add LastAcknowledgedBeat
Browse files Browse the repository at this point in the history
This commit adds ws.Handler.LastAcknowledgedBeat to allow ws.Gateway to
monitor whether or not the server is still reachable.

It fixes #324.
  • Loading branch information
diamondburned committed Apr 14, 2022
1 parent c71f48c commit f0fa0b0
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 3 deletions.
8 changes: 8 additions & 0 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,14 @@ func (g *gatewayImpl) SendHeartbeat(ctx context.Context) {
}
}

// LastAcknowledgedBeat returns the last acknowledged beat.
func (g *gatewayImpl) LastAcknowledgedBeat() time.Time {
g.beatMutex.Lock()
defer g.beatMutex.Unlock()

return g.echoBeat
}

// Close closes the state.
func (g *gatewayImpl) Close() error {
g.retryTimer.Stop()
Expand Down
16 changes: 15 additions & 1 deletion utils/ws/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"github.com/pkg/errors"
)

// ErrHeartbeatTimeout is returned if the server fails to acknowledge our heart
// beat in time.
var ErrHeartbeatTimeout = errors.New("server timed out replying to heartbeat")

// ConnectionError is given to the user if the gateway fails to connect to the
// gateway for any reason, including during an initial connection or a
// reconnection. To check for this error, use the errors.As function.
Expand Down Expand Up @@ -97,6 +101,7 @@ type Gateway struct {

reconnect chan struct{}
heart lazytime.Ticker
heartRate time.Duration
srcOp <-chan Op // from WS
outer outerState
lastError error
Expand Down Expand Up @@ -125,6 +130,9 @@ type Handler interface {
// SendHeartbeat is called by the gateway event loop everytime a heartbeat
// needs to be sent over.
SendHeartbeat(context.Context)
// LastAcknowledgedBeat returns the last time that the server acknowledged
// our heart beat.
LastAcknowledgedBeat() time.Time
// Close closes the handler.
Close() error
}
Expand Down Expand Up @@ -263,6 +271,7 @@ func (g *Gateway) QueueReconnect() {
// ResetHeartbeat resets the heartbeat to be the given duration.
func (g *Gateway) ResetHeartbeat(d time.Duration) {
g.heart.Reset(d)
g.heartRate = d
}

// SendError sends the given error wrapped in a BackgroundErrorEvent into the
Expand Down Expand Up @@ -327,7 +336,12 @@ func (g *Gateway) spin(ctx context.Context, h Handler) {
g.lastError = nil

case <-g.heart.C:
h.SendHeartbeat(ctx)
if h.LastAcknowledgedBeat().Add(2 * g.heartRate).Before(time.Now()) {
g.SendError(ErrHeartbeatTimeout)
g.QueueReconnect()
} else {
h.SendHeartbeat(ctx)
}

case <-g.reconnect:
// Close the previous connection if it's not already. Ignore the
Expand Down
16 changes: 14 additions & 2 deletions voice/voicegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ type Gateway struct {
gateway *ws.Gateway
state State // constant

mutex sync.RWMutex
ready *ReadyEvent
mutex sync.RWMutex
beatAck time.Time
ready *ReadyEvent
}

// DefaultGatewayOpts contains the default options to be used for connecting to
Expand Down Expand Up @@ -187,6 +188,10 @@ func (g *gatewayImpl) OnOp(ctx context.Context, op ws.Op) bool {
g.gateway.QueueReconnect()
}
}
case *HeartbeatAckEvent:
g.mutex.Lock()
g.beatAck = time.Now()
g.mutex.Unlock()
case *ReadyEvent:
g.mutex.Lock()
g.ready = data
Expand All @@ -204,6 +209,13 @@ func (g *gatewayImpl) SendHeartbeat(ctx context.Context) {
}
}

func (g *gatewayImpl) LastAcknowledgedBeat() time.Time {
g.mutex.RLock()
defer g.mutex.RUnlock()

return g.beatAck
}

func (g *gatewayImpl) Close() error {
return nil
}

0 comments on commit f0fa0b0

Please sign in to comment.