Skip to content

Commit

Permalink
ws: Add LastAcknowledgedBeat
Browse files Browse the repository at this point in the history
This commit adds ws.Handler.LastAcknowledgedBeat to allow ws.Gateway to
monitor whether or not the server is still reachable.

It fixes #324.
  • Loading branch information
diamondburned committed Apr 18, 2022
1 parent df0fa66 commit da19fc2
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 5 deletions.
17 changes: 15 additions & 2 deletions gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,8 @@ func (g *gatewayImpl) OnOp(ctx context.Context, op ws.Op) bool {
now := time.Now()

g.beatMutex.Lock()
// Keep sentBeat separately with the echoed beat to calculate the
// gateway latency properly.
g.sentBeat = g.lastSentBeat
g.echoBeat = now
g.beatMutex.Unlock()
Expand All @@ -408,15 +410,26 @@ func (g *gatewayImpl) OnOp(ctx context.Context, op ws.Op) bool {

// SendHeartbeat sends a heartbeat with the gateway's current sequence.
func (g *gatewayImpl) SendHeartbeat(ctx context.Context) {
g.lastSentBeat = time.Now()

sequence := HeartbeatCommand(g.state.Sequence)

if err := g.gateway.Send(ctx, &sequence); err != nil {
g.gateway.SendErrorWrap(err, "heartbeat error")
g.gateway.QueueReconnect()
} else {
g.beatMutex.Lock()
g.lastSentBeat = time.Now()
g.beatMutex.Unlock()
}
}

// LastAcknowledgedBeat returns the last acknowledged beat.
func (g *gatewayImpl) LastAcknowledgedBeat() time.Time {
g.beatMutex.Lock()
defer g.beatMutex.Unlock()

return g.echoBeat
}

// Close closes the state.
func (g *gatewayImpl) Close() error {
g.retryTimer.Stop()
Expand Down
42 changes: 41 additions & 1 deletion utils/ws/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"github.com/pkg/errors"
)

// ErrHeartbeatTimeout is returned if the server fails to acknowledge our heart
// beat in time.
var ErrHeartbeatTimeout = errors.New("server timed out replying to heartbeat")

// ConnectionError is given to the user if the gateway fails to connect to the
// gateway for any reason, including during an initial connection or a
// reconnection. To check for this error, use the errors.As function.
Expand Down Expand Up @@ -95,15 +99,31 @@ var DefaultGatewayOpts = GatewayOpts{
type Gateway struct {
ws *Websocket

reconnect chan struct{}
heart lazytime.Ticker
heartRate time.Duration
lastBeat time.Time

reconnect chan struct{}
srcOp <-chan Op // from WS
outer outerState
lastError error

opts GatewayOpts
}

// HeartbeatInfo is the heart rate information. It is used to ensure that the
// gateway is alive.
type HeartbeatInfo struct {
LastAcknowledged time.Time
LastSent time.Time
}

// ShouldReconnect returns true if the heartbeat info and the heart rate
// suggests that we should reconnect.
func (i HeartbeatInfo) ShouldReconnect(hr time.Duration) bool {
return i.LastAcknowledged.Add(2 * hr).Before(i.LastSent)
}

// outerState holds gateway state that the caller may change concurrently. As
// such, it holds a mutex to allow that. The main purpose of this
// synchronization is to allow the caller to use the gateway while the event
Expand All @@ -125,6 +145,9 @@ type Handler interface {
// SendHeartbeat is called by the gateway event loop everytime a heartbeat
// needs to be sent over.
SendHeartbeat(context.Context)
// LastAcknowledgedBeat returns the last time that the server acknowledged
// our heart beat.
LastAcknowledgedBeat() time.Time
// Close closes the handler.
Close() error
}
Expand Down Expand Up @@ -263,6 +286,7 @@ func (g *Gateway) QueueReconnect() {
// ResetHeartbeat resets the heartbeat to be the given duration.
func (g *Gateway) ResetHeartbeat(d time.Duration) {
g.heart.Reset(d)
g.heartRate = d
}

// SendError sends the given error wrapped in a BackgroundErrorEvent into the
Expand Down Expand Up @@ -327,6 +351,17 @@ func (g *Gateway) spin(ctx context.Context, h Handler) {
g.lastError = nil

case <-g.heart.C:
const missThreshold = 2 // allow 2 heart beat misses

if !g.lastBeat.IsZero() {
if h.LastAcknowledgedBeat().Add(missThreshold * g.heartRate).Before(g.lastBeat) {
g.SendError(ErrHeartbeatTimeout)
g.QueueReconnect()
continue
}
}

g.lastBeat = time.Now()
h.SendHeartbeat(ctx)

case <-g.reconnect:
Expand All @@ -339,6 +374,11 @@ func (g *Gateway) spin(ctx context.Context, h Handler) {
// Invalidate our srcOp.
g.srcOp = nil

// Invalidate our last sent beat timestamp so we don't mistakenly
// think that the gateway is dead after we reconnect and before we
// sent a heartbeat.
g.lastBeat = time.Time{}

// Keep track of the last error for notifying.
var err error

Expand Down
16 changes: 14 additions & 2 deletions voice/voicegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ type Gateway struct {
gateway *ws.Gateway
state State // constant

mutex sync.RWMutex
ready *ReadyEvent
mutex sync.RWMutex
beatAck time.Time
ready *ReadyEvent
}

// DefaultGatewayOpts contains the default options to be used for connecting to
Expand Down Expand Up @@ -187,6 +188,10 @@ func (g *gatewayImpl) OnOp(ctx context.Context, op ws.Op) bool {
g.gateway.QueueReconnect()
}
}
case *HeartbeatAckEvent:
g.mutex.Lock()
g.beatAck = time.Now()
g.mutex.Unlock()
case *ReadyEvent:
g.mutex.Lock()
g.ready = data
Expand All @@ -204,6 +209,13 @@ func (g *gatewayImpl) SendHeartbeat(ctx context.Context) {
}
}

func (g *gatewayImpl) LastAcknowledgedBeat() time.Time {
g.mutex.RLock()
defer g.mutex.RUnlock()

return g.beatAck
}

func (g *gatewayImpl) Close() error {
return nil
}

0 comments on commit da19fc2

Please sign in to comment.