Some more tests required
arjan-bal committed Jan 21, 2025
1 parent 4e076d9 commit c6e8b25
Showing 13 changed files with 798 additions and 980 deletions.
26 changes: 14 additions & 12 deletions balancer/endpointsharding/endpointsharding.go
@@ -60,16 +60,12 @@ func init() {
 // ChildState is the balancer state of a child along with the endpoint which
 // identifies the child balancer.
 type ChildState struct {
-	Endpoint        resolver.Endpoint
-	State           balancer.State
-	balancerWrapper *balancerWrapper
-}
+	Endpoint resolver.Endpoint
+	State    balancer.State
 
-// ExitIdle pings the child balancer to exit idle state. It calls ExitIdle of
-// the child balancer on a separate goroutine, so callers don't need to handle
-// synchronous picker updates.
-func (cs *ChildState) ExitIdle() {
-	cs.balancerWrapper.exitIdle()
+	// Balancer exposes only the ExitIdler interface of the child LB policy.
+	// Other methods on the child policy are called only by endpointsharding.
+	Balancer balancer.ExitIdler
 }
 
 // NewBalancer returns a load balancing policy that manages homogeneous child
@@ -143,12 +139,17 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState
 		var bal *balancerWrapper
 		if child, ok := children.Get(endpoint); ok {
 			bal = child.(*balancerWrapper)
+			// Endpoint attributes may have changed, update the stored endpoint.
+			es.mu.Lock()
+			bal.childState.Endpoint = endpoint
+			es.mu.Unlock()
 		} else {
 			bal = &balancerWrapper{
 				childState: ChildState{Endpoint: endpoint},
 				ClientConn: es.cc,
 				es:         es,
 			}
+			bal.childState.Balancer = bal
 			bal.Balancer = gracefulswitch.NewBalancer(bal, es.bOpts)
 		}
 		newChildren.Set(endpoint, bal)
@@ -324,13 +325,14 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) {
 	bw.childState.State = state
 	bw.es.mu.Unlock()
 	if state.ConnectivityState == connectivity.Idle && bw.es.enableAutoReconnect {
-		bw.exitIdle()
+		bw.ExitIdle()
 	}
 	bw.es.updateState()
 }
 
-// exitIdle pings an IDLE child balancer to exit idle.
-func (bw *balancerWrapper) exitIdle() {
+// ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to
+// avoid deadlocks due to synchronous balancer state updates.
+func (bw *balancerWrapper) ExitIdle() {
 	if ei, ok := bw.Balancer.(balancer.ExitIdler); ok {
 		go func() {
 			bw.es.childMu.Lock()
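The upshot of the endpointsharding.go changes: a child's LB policy is now reachable from its ChildState, but only through the balancer.ExitIdler interface, and ExitIdle is documented to run on its own goroutine so callers need not handle re-entrant picker updates. A minimal sketch of how a parent policy might wake idle children through the new field; the wakeIdleChildren helper and the way the slice is obtained are illustrative assumptions, not part of this commit:

package parentpolicy

import (
	"google.golang.org/grpc/balancer/endpointsharding"
	"google.golang.org/grpc/connectivity"
)

// wakeIdleChildren asks every IDLE child reported by endpointsharding to
// reconnect. Per the doc comment above, ExitIdle runs on a separate
// goroutine inside endpointsharding, so it is safe to call from a
// synchronous state-update path.
func wakeIdleChildren(children []endpointsharding.ChildState) {
	for _, cs := range children {
		if cs.State.ConnectivityState == connectivity.Idle {
			cs.Balancer.ExitIdle()
		}
	}
}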
74 changes: 73 additions & 1 deletion balancer/endpointsharding/endpointsharding_test.go
@@ -23,6 +23,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"log"
+	"strings"
 	"testing"
 	"time"
 
@@ -32,6 +33,7 @@ import (
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/internal"
+	"google.golang.org/grpc/internal/balancer/stub"
 	"google.golang.org/grpc/internal/grpctest"
 	"google.golang.org/grpc/internal/stubserver"
 	"google.golang.org/grpc/internal/testutils/roundrobin"
@@ -42,6 +44,11 @@ import (
 	testgrpc "google.golang.org/grpc/interop/grpc_testing"
 )
 
+var (
+	defaultTestTimeout      = time.Second * 10
+	defaultTestShortTimeout = time.Millisecond * 10
+)
+
 type s struct {
 	grpctest.Tester
 }
@@ -141,7 +148,7 @@ func (s) TestEndpointShardingBasic(t *testing.T) {
 		log.Fatalf("Failed to create new client: %v", err)
 	}
 	defer cc.Close()
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
 	client := testgrpc.NewTestServiceClient(cc)
 	// Assert a round robin distribution between the two spun up backends. This
@@ -151,3 +158,68 @@ func (s) TestEndpointShardingBasic(t *testing.T) {
 		t.Fatalf("error in expected round robin: %v", err)
 	}
 }
+
+// Tests that endpointsharding doesn't automatically re-connect IDLE children.
+func (s) TestEndpointShardingReconnectDisabled(t *testing.T) {
+	backend1 := stubserver.StartTestService(t, nil)
+	defer backend1.Stop()
+	backend2 := stubserver.StartTestService(t, nil)
+	defer backend2.Stop()
+	backend3 := stubserver.StartTestService(t, nil)
+	defer backend3.Stop()
+
+	mr := manual.NewBuilderWithScheme("e2e-test")
+	defer mr.Close()
+
+	name := strings.ReplaceAll(strings.ToLower(t.Name()), "/", "")
+	bf := stub.BalancerFuncs{
+		Init: func(bd *stub.BalancerData) {
+			bal := endpointsharding.NewBalancerWithoutAutoReconnect(bd.ClientConn, bd.BuildOptions)
+			bd.Data = bal
+		},
+		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
+			return bd.Data.(balancer.Balancer).UpdateClientConnState(balancer.ClientConnState{
+				BalancerConfig: endpointsharding.PickFirstConfig,
+				ResolverState:  ccs.ResolverState,
+			})
+		},
+		Close: func(bd *stub.BalancerData) {
+			bd.Data.(balancer.Balancer).Close()
+		},
+	}
+	stub.Register(name, bf)
+
+	json := fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, name)
+	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(json)
+	mr.InitialState(resolver.State{
+		Endpoints: []resolver.Endpoint{
+			{Addresses: []resolver.Address{{Addr: backend1.Address}, {Addr: backend2.Address}}},
+			{Addresses: []resolver.Address{{Addr: backend3.Address}}},
+		},
+		ServiceConfig: sc,
+	})
+
+	cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		log.Fatalf("Failed to create new client: %v", err)
+	}
+	defer cc.Close()
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	client := testgrpc.NewTestServiceClient(cc)
+	// Assert a round robin distribution between the two endpoints. This
+	// requires a poll and eventual consistency as both endpoint children do
+	// not start in state READY.
+	if err = roundrobin.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: backend1.Address}, {Addr: backend3.Address}}); err != nil {
+		t.Fatalf("error in expected round robin: %v", err)
+	}
+
+	// On closing the first server, the first child balancer should enter
+	// IDLE. Since endpointsharding is configured not to auto-reconnect, it
+	// will remain IDLE and will not try to connect to the second backend in
+	// the same endpoint.
+	backend1.Stop()
+	if err = roundrobin.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: backend3.Address}}); err != nil {
+		t.Fatalf("error in expected round robin: %v", err)
+	}
+}
17 changes: 5 additions & 12 deletions balancer/lazy/lazy.go
@@ -109,7 +109,7 @@ func (lb *lazyBalancer) ResolverError(err error) {
 func (lb *lazyBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
 	lb.mu.Lock()
 	defer lb.mu.Unlock()
-	if childLBCfg, ok := ccs.BalancerConfig.(*lbCfg); !ok {
+	if childLBCfg, ok := ccs.BalancerConfig.(lbCfg); !ok {
 		lb.logger.Errorf("Got LB config of unexpected type: %v", ccs.BalancerConfig)
 		ccs.BalancerConfig = nil
 	} else {
@@ -119,12 +119,6 @@ func (lb *lazyBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
 		return lb.delegate.UpdateClientConnState(ccs)
 	}
 
-	if childLBCfg, ok := ccs.BalancerConfig.(*lbCfg); !ok {
-		lb.logger.Errorf("Got LB config of unexpected type: %v", ccs.BalancerConfig)
-		ccs.BalancerConfig = nil
-	} else {
-		ccs.BalancerConfig = childLBCfg.childLBCfg
-	}
 	lb.latestClientConnState = &ccs
 	lb.latestResolverError = nil
 	lb.updateBalancerStateLocked()
@@ -147,7 +141,7 @@ func (lb *lazyBalancer) ExitIdle() {
 	}
 	lb.delegate = gracefulswitch.NewBalancer(lb.cc, lb.buildOptions)
 	if lb.latestClientConnState != nil {
-		if err := lb.UpdateClientConnState(*lb.latestClientConnState); err != nil {
+		if err := lb.delegate.UpdateClientConnState(*lb.latestClientConnState); err != nil {
 			if err == balancer.ErrBadResolverState {
 				lb.cc.ResolveNow(resolver.ResolveNowOptions{})
 			} else {
@@ -179,10 +173,9 @@ type lbCfg struct {
 	childLBCfg serviceconfig.LoadBalancingConfig
 }
 
-func (b *builder) ParseConfig(lbConfig json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+func (b builder) ParseConfig(lbConfig json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
 	jsonReprsentation := &struct {
-		ChildPolicy     json.RawMessage
-		NoAutoReconnect bool
+		ChildPolicy json.RawMessage
 	}{}
 	if err := json.Unmarshal(lbConfig, jsonReprsentation); err != nil {
 		return nil, err
@@ -191,7 +184,7 @@ func (b *builder) ParseConfig(lbConfig json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
 	if err != nil {
 		return nil, err
 	}
-	return &lbCfg{childLBCfg: childCfg}, nil
+	return lbCfg{childLBCfg: childCfg}, nil
 }
 
 // idlePicker is used when the SubConn is IDLE and kicks the SubConn into
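The lazy.go edits all trace back to one bug: ParseConfig returns the parsed config by value (lbCfg), but UpdateClientConnState asserted on *lbCfg, so the assertion always failed and the child config was silently dropped. The duplicated unwrapping block is deleted, and ExitIdle now feeds the cached state to lb.delegate directly rather than recursing through lb.UpdateClientConnState, which would have re-processed an already-unwrapped config. A standalone sketch of the assertion mismatch, using hypothetical names:

package main

import "fmt"

type cfg struct{ child string }

func main() {
	// ParseConfig-style: the config travels as a value, not a pointer.
	var parsed interface{} = cfg{child: "round_robin"}

	// The old, buggy assertion: the dynamic type is cfg, so *cfg never matches.
	if _, ok := parsed.(*cfg); !ok {
		fmt.Println("*cfg assertion fails")
	}
	// The fixed assertion matches and recovers the child config.
	if c, ok := parsed.(cfg); ok {
		fmt.Println("cfg assertion succeeds:", c.child)
	}
}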
8 changes: 8 additions & 0 deletions balancer/weightedroundrobin/weightedroundrobin.go
@@ -71,6 +71,14 @@ func GetAddrInfo(addr resolver.Address) AddrInfo {
 	return ai
 }
 
+// GetAddrInfoFromEndpoint returns the AddrInfo stored in the Attributes field
+// of endpoint.
+func GetAddrInfoFromEndpoint(endpoint resolver.Endpoint) AddrInfo {
+	v := endpoint.Attributes.Value(attributeKey{})
+	ai, _ := v.(AddrInfo)
+	return ai
+}
+
 func (a AddrInfo) String() string {
 	return fmt.Sprintf("Weight: %d", a.Weight)
 }
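GetAddrInfoFromEndpoint mirrors the existing per-address GetAddrInfo, reading the same attributeKey{} out of resolver.Endpoint.Attributes and returning a zero AddrInfo when nothing is stored. A sketch of the round trip; the SetAddrInfoInEndpoint setter is assumed here for illustration and is not part of this diff:

package main

import (
	"fmt"

	"google.golang.org/grpc/balancer/weightedroundrobin"
	"google.golang.org/grpc/resolver"
)

func main() {
	ep := resolver.Endpoint{Addresses: []resolver.Address{{Addr: "10.0.0.1:50051"}}}

	// Assumed setter: a resolver would attach per-endpoint weight metadata.
	ep = weightedroundrobin.SetAddrInfoInEndpoint(ep, weightedroundrobin.AddrInfo{Weight: 3})

	// The new accessor reads it back; prints "Weight: 3".
	fmt.Println(weightedroundrobin.GetAddrInfoFromEndpoint(ep))
}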
44 changes: 27 additions & 17 deletions internal/testutils/balancer.go
@@ -33,22 +33,24 @@ import (
 // TestSubConn implements the SubConn interface, to be used in tests.
 type TestSubConn struct {
 	balancer.SubConn
-	tcc           *BalancerClientConn // the CC that owns this SubConn
-	id            string
-	ConnectCh     chan struct{}
-	stateListener func(balancer.SubConnState)
-	connectCalled *grpcsync.Event
-	Addresses     []resolver.Address
+	tcc                   *BalancerClientConn // the CC that owns this SubConn
+	id                    string
+	ConnectCh             chan struct{}
+	stateListener         func(balancer.SubConnState)
+	connectCalled         *grpcsync.Event
+	Addresses             []resolver.Address
+	HealthUpdateDelivered *grpcsync.Event
 }
 
 // NewTestSubConn returns a newly initialized SubConn. Typically, subconns
 // should be created via TestClientConn.NewSubConn instead, but can be useful
 // for some tests.
 func NewTestSubConn(id string) *TestSubConn {
 	return &TestSubConn{
-		ConnectCh:     make(chan struct{}, 1),
-		connectCalled: grpcsync.NewEvent(),
-		id:            id,
+		ConnectCh:             make(chan struct{}, 1),
+		connectCalled:         grpcsync.NewEvent(),
+		HealthUpdateDelivered: grpcsync.NewEvent(),
+		id:                    id,
 	}
 }
 
@@ -93,8 +95,15 @@ func (tsc *TestSubConn) String() string {
 	return tsc.id
 }
 
-// RegisterHealthListener is a no-op.
-func (*TestSubConn) RegisterHealthListener(func(balancer.SubConnState)) {}
+// RegisterHealthListener sends a READY update to mock a situation where no
+// health checking mechanisms are configured.
+func (tsc *TestSubConn) RegisterHealthListener(lis func(balancer.SubConnState)) {
+	// Call the listener in a separate goroutine to avoid deadlocks.
+	go func() {
+		lis(balancer.SubConnState{ConnectivityState: connectivity.Ready})
+		tsc.HealthUpdateDelivered.Fire()
+	}()
+}
 
 // BalancerClientConn is a mock balancer.ClientConn used in tests.
 type BalancerClientConn struct {
@@ -131,12 +140,13 @@ func NewBalancerClientConn(t *testing.T) *BalancerClientConn {
 // NewSubConn creates a new SubConn.
 func (tcc *BalancerClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) {
 	sc := &TestSubConn{
-		tcc:           tcc,
-		id:            fmt.Sprintf("sc%d", tcc.subConnIdx),
-		ConnectCh:     make(chan struct{}, 1),
-		stateListener: o.StateListener,
-		connectCalled: grpcsync.NewEvent(),
-		Addresses:     a,
+		tcc:                   tcc,
+		id:                    fmt.Sprintf("sc%d", tcc.subConnIdx),
+		ConnectCh:             make(chan struct{}, 1),
+		stateListener:         o.StateListener,
+		connectCalled:         grpcsync.NewEvent(),
+		HealthUpdateDelivered: grpcsync.NewEvent(),
+		Addresses:             a,
 	}
 	tcc.subConnIdx++
 	tcc.logger.Logf("testClientConn: NewSubConn(%v, %+v) => %s", a, o, sc)
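Because RegisterHealthListener now delivers a synthetic READY update on a separate goroutine, a test cannot assert on balancer state right after registering a listener; the HealthUpdateDelivered event exists so tests can block until the update has actually landed. A sketch of the intended pattern, assuming the usual imports (testing, time, balancer, connectivity); it would live inside grpc-go's tree, since internal/testutils is not importable from outside:

func TestHealthListenerDelivery(t *testing.T) {
	sc := testutils.NewTestSubConn("sc1")

	var got connectivity.State
	sc.RegisterHealthListener(func(scs balancer.SubConnState) {
		got = scs.ConnectivityState
	})

	// Block until the goroutine spawned by RegisterHealthListener has fired
	// the event; receiving on Done() also orders the write to got before the
	// read below.
	select {
	case <-sc.HealthUpdateDelivered.Done():
	case <-time.After(10 * time.Second):
		t.Fatal("timeout waiting for the synthetic READY health update")
	}
	if got != connectivity.Ready {
		t.Fatalf("health listener got %v, want %v", got, connectivity.Ready)
	}
}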
45 changes: 12 additions & 33 deletions xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
@@ -1845,9 +1845,9 @@ func (s) TestRingHash_SwitchToLowerPriorityAndThenBack(t *testing.T) {
 	}
 }
 
-// Tests that when we trigger internal connection attempts without picks, we do
-// so for only one subchannel at a time.
-func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *testing.T) {
+// Tests that when we trigger internal connection attempts without picks, we
+// keep retrying all the SubConns that have reported TF previously.
+func (s) TestRingHash_ContinuesConnectingWithoutPicksToMultipleSubConnsConcurrently(t *testing.T) {
 	backends := startTestServiceBackends(t, 1)
 	unReachableBackends := makeUnreachableBackends(t, 3)
 
@@ -1931,8 +1931,8 @@ func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *testing.T) {
 	if !holdNonExistent1.Wait(ctx) {
 		t.Fatalf("Timeout waiting for connection attempt to backend 1")
 	}
-	if holdNonExistent0Again.IsStarted() {
-		t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.")
+	if !holdNonExistent0Again.Wait(ctx) {
+		t.Fatalf("Timeout waiting for re-connection attempt to backend 0")
 	}
 	if holdNonExistent2.IsStarted() {
 		t.Errorf("Got connection attempt to backend 2, expected no connection attempt.")
@@ -1942,55 +1942,34 @@ func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *testing.T) {
 	}
 
 	// Allow the connection attempt to the second address to resume and wait for
-	// the attempt for the third address. No other connection attempts should
+	// the attempt for the third address. No new connection attempts should
 	// be started yet.
 	holdNonExistent1Again := dialer.Hold(unReachableBackends[1])
 	holdNonExistent1.Resume()
 	if !holdNonExistent2.Wait(ctx) {
 		t.Fatalf("Timeout waiting for connection attempt to backend 2")
 	}
-	if holdNonExistent0Again.IsStarted() {
-		t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.")
-	}
-	if holdNonExistent1Again.IsStarted() {
-		t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.")
+	if !holdNonExistent1Again.Wait(ctx) {
+		t.Fatalf("Timeout waiting for re-connection attempt to backend 1")
 	}
 	if holdGood.IsStarted() {
 		t.Errorf("Got connection attempt to good backend, expected no connection attempt.")
 	}
 
 	// Allow the connection attempt to the third address to resume and wait
-	// for the attempt for the final address. No other connection attempts
-	// should be started yet.
+	// for the attempt for the final address.
 	holdNonExistent2Again := dialer.Hold(unReachableBackends[2])
 	holdNonExistent2.Resume()
+	if !holdNonExistent2Again.Wait(ctx) {
+		t.Fatalf("Timeout waiting for re-connection attempt to backend 2")
+	}
 	if !holdGood.Wait(ctx) {
 		t.Fatalf("Timeout waiting for connection attempt to good backend")
 	}
-	if holdNonExistent0Again.IsStarted() {
-		t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.")
-	}
-	if holdNonExistent1Again.IsStarted() {
-		t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.")
-	}
-	if holdNonExistent2Again.IsStarted() {
-		t.Errorf("Got connection attempt to backend 2 again, expected no connection attempt.")
-	}
 
 	// Allow the final attempt to resume.
 	holdGood.Resume()
 
 	// Wait for channel to become connected without any pending RPC.
 	testutils.AwaitState(ctx, t, conn, connectivity.Ready)
-
-	// No other connection attempts should have been started
-	if holdNonExistent0Again.IsStarted() {
-		t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.")
-	}
-	if holdNonExistent1Again.IsStarted() {
-		t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.")
-	}
-	if holdNonExistent2Again.IsStarted() {
-		t.Errorf("Got connection attempt to backend 2 again, expected no connection attempt.")
-	}
 }
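The rewritten assertions rely on the hold/resume dialer from internal/testutils: a Hold intercepts dial attempts to one address, Wait parks the test until an attempt arrives, and Resume lets the dial proceed, which is what lets the test distinguish "retries all previously failed SubConns concurrently" from "connects one subchannel at a time". A condensed sketch of the pattern; Hold, Wait, Resume, and IsStarted appear in the diff above, while NewBlockingDialer and DialContext are assumed from the same helper package:

// holdResumeSketch demonstrates gating a ClientConn's dial attempts.
func holdResumeSketch(ctx context.Context, t *testing.T) {
	dialer := testutils.NewBlockingDialer()
	hold := dialer.Hold("10.0.0.1:443") // intercept dials to this address

	cc, err := grpc.NewClient("dns:///example",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithContextDialer(dialer.DialContext))
	if err != nil {
		t.Fatal(err)
	}
	defer cc.Close()
	cc.Connect() // trigger connection attempts without issuing an RPC

	if !hold.Wait(ctx) { // park until the dial attempt reaches the dialer
		t.Fatal("timeout waiting for connection attempt")
	}
	// While parked, other holds can be checked with IsStarted(); then:
	hold.Resume() // allow this dial to complete
}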
4 changes: 0 additions & 4 deletions xds/internal/balancer/ringhash/logging.go
@@ -32,7 +32,3 @@ var logger = grpclog.Component("xds")
 func prefixLogger(p *ringhashBalancer) *internalgrpclog.PrefixLogger {
 	return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
 }
-
-func subConnPrefixLogger(p *ringhashBalancer, sc *subConn) *internalgrpclog.PrefixLogger {
-	return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)+fmt.Sprintf("[subConn %p] ", sc))
-}