Skip to content

Commit

Permalink
Wait for go routines to finish before cancelling the context
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Aug 25, 2024
1 parent f8d98a4 commit 4ad4c80
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions picker_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package grpc
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -80,6 +81,8 @@ func (s) TestBlockingPick(t *testing.T) {
var finishedCount uint64
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
wg := sync.WaitGroup{}
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
Expand All @@ -93,6 +96,8 @@ func (s) TestBlockingPick(t *testing.T) {
t.Errorf("finished goroutines count: %v, want 0", c)
}
bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
// Wait for all pickers to finish before the context is cancelled.
wg.Wait()
}

func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
Expand All @@ -102,6 +107,8 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// All goroutines should block because picker returns no subConn available.
wg := sync.WaitGroup{}
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
Expand All @@ -115,6 +122,8 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
t.Errorf("finished goroutines count: %v, want 0", c)
}
bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
// Wait for all pickers to finish before the context is cancelled.
wg.Wait()
}

func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
Expand All @@ -125,6 +134,8 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
defer cancel()
// All goroutines should block because picker returns transientFailure and
// picks are not failfast.
wg := sync.WaitGroup{}
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, false, balancer.PickInfo{}); err != nil || tr != testT {
Expand All @@ -138,6 +149,8 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
t.Errorf("finished goroutines count: %v, want 0", c)
}
bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
// Wait for all pickers to finish before the context is cancelled.
wg.Wait()
}

func (s) TestBlockingPickSCNotReady(t *testing.T) {
Expand All @@ -147,17 +160,22 @@ func (s) TestBlockingPickSCNotReady(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// All goroutines should block because subConn is not ready.
wg := sync.WaitGroup{}
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned non-nil error: %v", err)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
}()
}
time.Sleep(time.Millisecond)
if c := atomic.LoadUint64(&finishedCount); c != 0 {
t.Errorf("finished goroutines count: %v, want 0", c)
}
bp.updatePicker(&testingPicker{sc: testSC, maxCalled: goroutineCount})
// Wait for all pickers to finish before the context is cancelled.
wg.Wait()
}

0 comments on commit 4ad4c80

Please sign in to comment.