Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: count ping as websocket activity #203

Merged
merged 2 commits into from
Jul 31, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions internal/websocket/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@
mode := ws.nextMode(ipv4)

policy := ws.retryPolicyFactory.NewPolicy(ctx)
inactivityTimeout := time.After(ws.inactivityTimeout)

for {
var next time.Duration
Expand Down Expand Up @@ -258,6 +257,7 @@
// Store the connection so writing can take place.
ws.m.Lock()
ws.conn = conn
activity := make(chan struct{})
ws.conn.SetPingListener((func(ctx context.Context, b []byte) {
if ctx.Err() != nil {
return
Expand All @@ -270,7 +270,9 @@
})
})

inactivityTimeout = time.After(ws.inactivityTimeout)
if len(activity) == 0 {
activity <- struct{}{}

Check warning on line 274 in internal/websocket/ws.go

View check run for this annotation

Codecov / codecov/patch

internal/websocket/ws.go#L273-L274

Added lines #L273 - L274 were not covered by tests
}
}))
ws.conn.SetPongListener(func(ctx context.Context, b []byte) {
if ctx.Err() != nil {
Expand All @@ -289,22 +291,32 @@
// Read loop
for {
var msg wrp.Message
ctx, cancel := context.WithTimeout(ctx, ws.inactivityTimeout)
typ, reader, err := ws.conn.Reader(ctx)
Copy link
Contributor Author

@denopink denopink Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we want to include the activity from pings. that requires us to manually trigger a timeout.

Otherwise, the device will reconnect in intervals of ws.inactivityTimeout

if errors.Is(err, context.DeadlineExceeded) {
select {
case <-inactivityTimeout:
// inactivityTimeout occurred, continue with ws.read()'s error handling (connection will be closed).
default:
// Ping was received during ws.conn.Reader(), i.e.: inactivityTimeout was reset.
// Reset inactivityTimeout again for the next ws.conn.Reader().
inactivityTimeout = time.After(ws.inactivityTimeout)
cancel()
continue
ctx, cancel := context.WithCancelCause(ctx)

// Monitor for activity.
go func() {
inactivityTimeout := time.After(ws.inactivityTimeout)
loop1:
for {
select {
case <-ctx.Done():
break loop1
case <-activity:
inactivityTimeout = time.After(ws.inactivityTimeout)

Check warning on line 305 in internal/websocket/ws.go

View check run for this annotation

Codecov / codecov/patch

internal/websocket/ws.go#L304-L305

Added lines #L304 - L305 were not covered by tests
case <-inactivityTimeout:
// inactivityTimeout occurred, cancel the context.
cancel(context.DeadlineExceeded)
break loop1
}
}
} else if errors.Is(err, context.Canceled) {
// Parent context has been canceled.
cancel()
}()

typ, reader, err := ws.conn.Reader(ctx)
ctxErr := context.Cause(ctx)
err = errors.Join(err, ctxErr)
// If ctxErr is context.Canceled then the parent context has been canceled.
if errors.Is(ctxErr, context.Canceled) {
cancel(nil)
break
}

Expand All @@ -318,7 +330,7 @@
}

// Cancel ws.conn.Reader()'s context after wrp decoding.
cancel()
cancel(nil)
if err != nil {
ws.m.Lock()
ws.conn = nil
Expand Down
Loading