From c6e8b254dd3d4558904d71e2d905c75ce455ad34 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 21 Jan 2025 01:13:25 +0530 Subject: [PATCH] Some more tests required --- balancer/endpointsharding/endpointsharding.go | 26 +- .../endpointsharding/endpointsharding_test.go | 74 ++- balancer/lazy/lazy.go | 17 +- .../weightedroundrobin/weightedroundrobin.go | 8 + internal/testutils/balancer.go | 44 +- .../ringhash/e2e/ringhash_balancer_test.go | 45 +- xds/internal/balancer/ringhash/logging.go | 4 - xds/internal/balancer/ringhash/picker.go | 148 +---- xds/internal/balancer/ringhash/picker_test.go | 202 +----- xds/internal/balancer/ringhash/ring.go | 103 +-- xds/internal/balancer/ringhash/ring_test.go | 37 +- xds/internal/balancer/ringhash/ringhash.go | 589 +++++++----------- .../balancer/ringhash/ringhash_test.go | 481 +++++++++----- 13 files changed, 798 insertions(+), 980 deletions(-) diff --git a/balancer/endpointsharding/endpointsharding.go b/balancer/endpointsharding/endpointsharding.go index 28d8d3ec01b0..1fa92fcc7cf2 100644 --- a/balancer/endpointsharding/endpointsharding.go +++ b/balancer/endpointsharding/endpointsharding.go @@ -60,16 +60,12 @@ func init() { // ChildState is the balancer state of a child along with the endpoint which // identifies the child balancer. type ChildState struct { - Endpoint resolver.Endpoint - State balancer.State - balancerWrapper *balancerWrapper -} + Endpoint resolver.Endpoint + State balancer.State -// ExitIdle pings the child balancer to exit idle state. It calls ExitIdle of -// the child balancer on a separate goroutine, so callers don't need to handle -// synchronous picker updates. -func (cs *ChildState) ExitIdle() { - cs.balancerWrapper.exitIdle() + // Balancer exposes only the ExitIdler interface of the child LB policy. + // Other methods on the child policy are called only by endpointsharding. + Balancer balancer.ExitIdler } // NewBalancer returns a load balancing policy that manages homogeneous child @@ -143,12 +139,17 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState var bal *balancerWrapper if child, ok := children.Get(endpoint); ok { bal = child.(*balancerWrapper) + // Endpoint attributes may have changes, update the stored endpoint. + es.mu.Lock() + bal.childState.Endpoint = endpoint + es.mu.Unlock() } else { bal = &balancerWrapper{ childState: ChildState{Endpoint: endpoint}, ClientConn: es.cc, es: es, } + bal.childState.Balancer = bal bal.Balancer = gracefulswitch.NewBalancer(bal, es.bOpts) } newChildren.Set(endpoint, bal) @@ -324,13 +325,14 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) { bw.childState.State = state bw.es.mu.Unlock() if state.ConnectivityState == connectivity.Idle && bw.es.enableAutoReconnect { - bw.exitIdle() + bw.ExitIdle() } bw.es.updateState() } -// exitIdle pings an IDLE child balancer to exit idle. -func (bw *balancerWrapper) exitIdle() { +// ExitIdle pings an IDLE child balancer to exit idle in a new goroutine to +// avoid deadlocks due to synchronous balancer state updates. 
+func (bw *balancerWrapper) ExitIdle() { if ei, ok := bw.Balancer.(balancer.ExitIdler); ok { go func() { bw.es.childMu.Lock() diff --git a/balancer/endpointsharding/endpointsharding_test.go b/balancer/endpointsharding/endpointsharding_test.go index bc98979a0b9e..974404d23b2c 100644 --- a/balancer/endpointsharding/endpointsharding_test.go +++ b/balancer/endpointsharding/endpointsharding_test.go @@ -23,6 +23,7 @@ import ( "encoding/json" "fmt" "log" + "strings" "testing" "time" @@ -32,6 +33,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils/roundrobin" @@ -42,6 +44,11 @@ import ( testgrpc "google.golang.org/grpc/interop/grpc_testing" ) +var ( + defaultTestTimeout = time.Second * 10 + defaultTestShortTimeout = time.Millisecond * 10 +) + type s struct { grpctest.Tester } @@ -141,7 +148,7 @@ func (s) TestEndpointShardingBasic(t *testing.T) { log.Fatalf("Failed to create new client: %v", err) } defer cc.Close() - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() client := testgrpc.NewTestServiceClient(cc) // Assert a round robin distribution between the two spun up backends. This @@ -151,3 +158,68 @@ func (s) TestEndpointShardingBasic(t *testing.T) { t.Fatalf("error in expected round robin: %v", err) } } + +// Tests that endpointsharding doesn't automatically re-connect IDLE children. +func (s) TestEndpointShardingReconnectDisabled(t *testing.T) { + backend1 := stubserver.StartTestService(t, nil) + defer backend1.Stop() + backend2 := stubserver.StartTestService(t, nil) + defer backend2.Stop() + backend3 := stubserver.StartTestService(t, nil) + defer backend3.Stop() + + mr := manual.NewBuilderWithScheme("e2e-test") + defer mr.Close() + + name := strings.ReplaceAll(strings.ToLower(t.Name()), "/", "") + bf := stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + bal := endpointsharding.NewBalancerWithoutAutoReconnect(bd.ClientConn, bd.BuildOptions) + bd.Data = bal + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + return bd.Data.(balancer.Balancer).UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: endpointsharding.PickFirstConfig, + ResolverState: ccs.ResolverState, + }) + }, + Close: func(bd *stub.BalancerData) { + bd.Data.(balancer.Balancer).Close() + }, + } + stub.Register(name, bf) + + json := fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, name) + sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(json) + mr.InitialState(resolver.State{ + Endpoints: []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: backend1.Address}, {Addr: backend2.Address}}}, + {Addresses: []resolver.Address{{Addr: backend3.Address}}}, + }, + ServiceConfig: sc, + }) + + cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("Failed to create new client: %v", err) + } + defer cc.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + client := testgrpc.NewTestServiceClient(cc) + // Assert a round robin distribution between the two spun up backends. 
This + // requires a poll and eventual consistency as both endpoint children do not + // start in state READY. + if err = roundrobin.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: backend1.Address}, {Addr: backend3.Address}}); err != nil { + t.Fatalf("error in expected round robin: %v", err) + } + + // On closing the first server, the first child balancer should enter + // IDLE. Since endpointsharding is configured not to auto-reconnect, it will + // remain IDLE and will not try to connect to the second backend in the same + // endpoint. + backend1.Stop() + if err = roundrobin.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: backend3.Address}}); err != nil { + t.Fatalf("error in expected round robin: %v", err) + } +} diff --git a/balancer/lazy/lazy.go b/balancer/lazy/lazy.go index 41a934ff2749..430e40735fac 100644 --- a/balancer/lazy/lazy.go +++ b/balancer/lazy/lazy.go @@ -109,7 +109,7 @@ func (lb *lazyBalancer) ResolverError(err error) { func (lb *lazyBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { lb.mu.Lock() defer lb.mu.Unlock() - if childLBCfg, ok := ccs.BalancerConfig.(*lbCfg); !ok { + if childLBCfg, ok := ccs.BalancerConfig.(lbCfg); !ok { lb.logger.Errorf("Got LB config of unexpected type: %v", ccs.BalancerConfig) ccs.BalancerConfig = nil } else { @@ -119,12 +119,6 @@ func (lb *lazyBalancer) UpdateClientConnState(ccs balancer.ClientConnState) erro return lb.delegate.UpdateClientConnState(ccs) } - if childLBCfg, ok := ccs.BalancerConfig.(*lbCfg); !ok { - lb.logger.Errorf("Got LB config of unexpected type: %v", ccs.BalancerConfig) - ccs.BalancerConfig = nil - } else { - ccs.BalancerConfig = childLBCfg.childLBCfg - } lb.latestClientConnState = &ccs lb.latestResolverError = nil lb.updateBalancerStateLocked() @@ -147,7 +141,7 @@ func (lb *lazyBalancer) ExitIdle() { } lb.delegate = gracefulswitch.NewBalancer(lb.cc, lb.buildOptions) if lb.latestClientConnState != nil { - if err := lb.UpdateClientConnState(*lb.latestClientConnState); err != nil { + if err := lb.delegate.UpdateClientConnState(*lb.latestClientConnState); err != nil { if err == balancer.ErrBadResolverState { lb.cc.ResolveNow(resolver.ResolveNowOptions{}) } else { @@ -179,10 +173,9 @@ type lbCfg struct { childLBCfg serviceconfig.LoadBalancingConfig } -func (b *builder) ParseConfig(lbConfig json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { +func (b builder) ParseConfig(lbConfig json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { jsonReprsentation := &struct { - ChildPolicy json.RawMessage - NoAutoReconnect bool + ChildPolicy json.RawMessage }{} if err := json.Unmarshal(lbConfig, jsonReprsentation); err != nil { return nil, err @@ -191,7 +184,7 @@ func (b *builder) ParseConfig(lbConfig json.RawMessage) (serviceconfig.LoadBalan if err != nil { return nil, err } - return &lbCfg{childLBCfg: childCfg}, nil + return lbCfg{childLBCfg: childCfg}, nil } // idlePicker is used when the SubConn is IDLE and kicks the SubConn into diff --git a/balancer/weightedroundrobin/weightedroundrobin.go b/balancer/weightedroundrobin/weightedroundrobin.go index 258cdd5db280..60950b6d650d 100644 --- a/balancer/weightedroundrobin/weightedroundrobin.go +++ b/balancer/weightedroundrobin/weightedroundrobin.go @@ -71,6 +71,14 @@ func GetAddrInfo(addr resolver.Address) AddrInfo { return ai } +// GetAddrInfoFromEndpoint returns the AddrInfo stored in the Attributes field +// of endpoint. 
+func GetAddrInfoFromEndpoint(endpoint resolver.Endpoint) AddrInfo { + v := endpoint.Attributes.Value(attributeKey{}) + ai, _ := v.(AddrInfo) + return ai +} + func (a AddrInfo) String() string { return fmt.Sprintf("Weight: %d", a.Weight) } diff --git a/internal/testutils/balancer.go b/internal/testutils/balancer.go index 5a446b147136..5a13150b1d84 100644 --- a/internal/testutils/balancer.go +++ b/internal/testutils/balancer.go @@ -33,12 +33,13 @@ import ( // TestSubConn implements the SubConn interface, to be used in tests. type TestSubConn struct { balancer.SubConn - tcc *BalancerClientConn // the CC that owns this SubConn - id string - ConnectCh chan struct{} - stateListener func(balancer.SubConnState) - connectCalled *grpcsync.Event - Addresses []resolver.Address + tcc *BalancerClientConn // the CC that owns this SubConn + id string + ConnectCh chan struct{} + stateListener func(balancer.SubConnState) + connectCalled *grpcsync.Event + Addresses []resolver.Address + HealthUpdateDelivered *grpcsync.Event } // NewTestSubConn returns a newly initialized SubConn. Typically, subconns // for some tests. func NewTestSubConn(id string) *TestSubConn { return &TestSubConn{ - ConnectCh: make(chan struct{}, 1), - connectCalled: grpcsync.NewEvent(), - id: id, + ConnectCh: make(chan struct{}, 1), + connectCalled: grpcsync.NewEvent(), + HealthUpdateDelivered: grpcsync.NewEvent(), + id: id, } } @@ -93,8 +95,15 @@ func (tsc *TestSubConn) String() string { return tsc.id } -// RegisterHealthListener is a no-op. -func (*TestSubConn) RegisterHealthListener(func(balancer.SubConnState)) {} +// RegisterHealthListener sends a READY update to mock a situation when no +// health checking mechanisms are configured. +func (tsc *TestSubConn) RegisterHealthListener(lis func(balancer.SubConnState)) { + // Call the listener in a separate goroutine to avoid deadlocks. + go func() { + lis(balancer.SubConnState{ConnectivityState: connectivity.Ready}) + tsc.HealthUpdateDelivered.Fire() + }() +} // BalancerClientConn is a mock balancer.ClientConn used in tests. type BalancerClientConn struct { @@ -131,12 +140,13 @@ func NewBalancerClientConn(t *testing.T) *BalancerClientConn { // NewSubConn creates a new SubConn. func (tcc *BalancerClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) { sc := &TestSubConn{ - tcc: tcc, - id: fmt.Sprintf("sc%d", tcc.subConnIdx), - ConnectCh: make(chan struct{}, 1), - stateListener: o.StateListener, - connectCalled: grpcsync.NewEvent(), - Addresses: a, + tcc: tcc, + id: fmt.Sprintf("sc%d", tcc.subConnIdx), + ConnectCh: make(chan struct{}, 1), + stateListener: o.StateListener, + connectCalled: grpcsync.NewEvent(), + HealthUpdateDelivered: grpcsync.NewEvent(), + Addresses: a, } tcc.subConnIdx++ tcc.logger.Logf("testClientConn: NewSubConn(%v, %+v) => %s", a, o, sc) diff --git a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go index afc60924b3f8..2fbe29968d7e 100644 --- a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go +++ b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go @@ -1845,9 +1845,9 @@ func (s) TestRingHash_SwitchToLowerPriorityAndThenBack(t *testing.T) { } } -// Tests that when we trigger internal connection attempts without picks, we do -// so for only one subchannel at a time. 
-func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *testing.T) { +// Tests that when we trigger internal connection attempts without picks, we +// keep retrying all the SubConns that haver reported TF previously. +func (s) TestRingHash_ContinuesConnectingWithoutPicksToMultipleSubConnsConcurrently(t *testing.T) { backends := startTestServiceBackends(t, 1) unReachableBackends := makeUnreachableBackends(t, 3) @@ -1931,8 +1931,8 @@ func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *tes if !holdNonExistent1.Wait(ctx) { t.Fatalf("Timeout waiting for connection attempt to backend 1") } - if holdNonExistent0Again.IsStarted() { - t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.") + if !holdNonExistent0Again.Wait(ctx) { + t.Fatalf("Timeout waiting for re-connection attempt to backend 0") } if holdNonExistent2.IsStarted() { t.Errorf("Got connection attempt to backend 2, expected no connection attempt.") @@ -1942,55 +1942,34 @@ func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *tes } // Allow the connection attempt to the second address to resume and wait for - // the attempt for the third address. No other connection attempts should + // the attempt for the third address. No new connection attempts should // be started yet. holdNonExistent1Again := dialer.Hold(unReachableBackends[1]) holdNonExistent1.Resume() if !holdNonExistent2.Wait(ctx) { t.Fatalf("Timeout waiting for connection attempt to backend 2") } - if holdNonExistent0Again.IsStarted() { - t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.") - } - if holdNonExistent1Again.IsStarted() { - t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.") + if !holdNonExistent1Again.Wait(ctx) { + t.Fatalf("Timeout waiting for re-connection attempt to backend 1") } if holdGood.IsStarted() { t.Errorf("Got connection attempt to good backend, expected no connection attempt.") } // Allow the connection attempt to the third address to resume and wait - // for the attempt for the final address. No other connection attempts - // should be started yet. + // for the attempt for the final address. holdNonExistent2Again := dialer.Hold(unReachableBackends[2]) holdNonExistent2.Resume() + if !holdNonExistent2Again.Wait(ctx) { + t.Fatalf("Timeout waiting for re-connection attempt to backend 2") + } if !holdGood.Wait(ctx) { t.Fatalf("Timeout waiting for connection attempt to good backend") } - if holdNonExistent0Again.IsStarted() { - t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.") - } - if holdNonExistent1Again.IsStarted() { - t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.") - } - if holdNonExistent2Again.IsStarted() { - t.Errorf("Got connection attempt to backend 2 again, expected no connection attempt.") - } // Allow the final attempt to resume. holdGood.Resume() // Wait for channel to become connected without any pending RPC. 
testutils.AwaitState(ctx, t, conn, connectivity.Ready) - - // No other connection attempts should have been started - if holdNonExistent0Again.IsStarted() { - t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.") - } - if holdNonExistent1Again.IsStarted() { - t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.") - } - if holdNonExistent2Again.IsStarted() { - t.Errorf("Got connection attempt to backend 2 again, expected no connection attempt.") - } } diff --git a/xds/internal/balancer/ringhash/logging.go b/xds/internal/balancer/ringhash/logging.go index 3e0f0adf58eb..64a1d467f554 100644 --- a/xds/internal/balancer/ringhash/logging.go +++ b/xds/internal/balancer/ringhash/logging.go @@ -32,7 +32,3 @@ var logger = grpclog.Component("xds") func prefixLogger(p *ringhashBalancer) *internalgrpclog.PrefixLogger { return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)) } - -func subConnPrefixLogger(p *ringhashBalancer, sc *subConn) *internalgrpclog.PrefixLogger { - return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)+fmt.Sprintf("[subConn %p] ", sc)) -} diff --git a/xds/internal/balancer/ringhash/picker.go b/xds/internal/balancer/ringhash/picker.go index 5ce72caded48..91ce367d4419 100644 --- a/xds/internal/balancer/ringhash/picker.go +++ b/xds/internal/balancer/ringhash/picker.go @@ -19,143 +19,51 @@ package ringhash import ( - "fmt" - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/grpclog" - "google.golang.org/grpc/status" ) type picker struct { - ring *ring - logger *grpclog.PrefixLogger - subConnStates map[*subConn]connectivity.State + ring *ring + logger *grpclog.PrefixLogger + // endpointStates is a cache of endpoint connectivity states and pickers. + endpointStates map[*endpointState]balancer.State } func newPicker(ring *ring, logger *grpclog.PrefixLogger) *picker { - states := make(map[*subConn]connectivity.State) + states := make(map[*endpointState]balancer.State) for _, e := range ring.items { - states[e.sc] = e.sc.effectiveState() - } - return &picker{ring: ring, logger: logger, subConnStates: states} -} - -// handleRICSResult is the return type of handleRICS. It's needed to wrap the -// returned error from Pick() in a struct. With this, if the return values are -// `balancer.PickResult, error, bool`, linter complains because error is not the -// last return value. -type handleRICSResult struct { - pr balancer.PickResult - err error -} - -// handleRICS generates pick result if the entry is in Ready, Idle, Connecting -// or Shutdown. TransientFailure will be handled specifically after this -// function returns. -// -// The first return value indicates if the state is in Ready, Idle, Connecting -// or Shutdown. If it's true, the PickResult and error should be returned from -// Pick() as is. -func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) { - switch state := p.subConnStates[e.sc]; state { - case connectivity.Ready: - return handleRICSResult{pr: balancer.PickResult{SubConn: e.sc.sc}}, true - case connectivity.Idle: - // Trigger Connect() and queue the pick. - e.sc.queueConnect() - return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true - case connectivity.Connecting: - return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true - case connectivity.TransientFailure: - // Return ok==false, so TransientFailure will be handled afterwards. 
- return handleRICSResult{}, false - case connectivity.Shutdown: - // Shutdown can happen in a race where the old picker is called. A new - // picker should already be sent. - return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true - default: - // Should never reach this. All the connectivity states are already - // handled in the cases. - p.logger.Errorf("SubConn has undefined connectivity state: %v", state) - return handleRICSResult{err: status.Errorf(codes.Unavailable, "SubConn has undefined connectivity state: %v", state)}, true + e.endpointState.mu.RLock() + states[e.endpointState] = e.endpointState.state + e.endpointState.mu.RUnlock() } + return &picker{ring: ring, logger: logger, endpointStates: states} } func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { e := p.ring.pick(getRequestHash(info.Ctx)) - if hr, ok := p.handleRICS(e); ok { - return hr.pr, hr.err - } - // ok was false, the entry is in transient failure. - return p.handleTransientFailure(e) -} - -func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, error) { - // Queue a connect on the first picked SubConn. - e.sc.queueConnect() - - // Find next entry in the ring, skipping duplicate SubConns. - e2 := nextSkippingDuplicates(p.ring, e) - if e2 == nil { - // There's no next entry available, fail the pick. - return balancer.PickResult{}, fmt.Errorf("the only SubConn is in Transient Failure") - } - - // For the second SubConn, also check Ready/Idle/Connecting as if it's the - // first entry. - if hr, ok := p.handleRICS(e2); ok { - return hr.pr, hr.err - } - - // The second SubConn is also in TransientFailure. Queue a connect on it. - e2.sc.queueConnect() - - // If it gets here, this is after the second SubConn, and the second SubConn - // was in TransientFailure. - // - // Loop over all other SubConns: - // - If all SubConns so far are all TransientFailure, trigger Connect() on - // the TransientFailure SubConns, and keep going. - // - If there's one SubConn that's not in TransientFailure, keep checking - // the remaining SubConns (in case there's a Ready, which will be returned), - // but don't not trigger Connect() on the other SubConns. - var firstNonFailedFound bool - for ee := nextSkippingDuplicates(p.ring, e2); ee != e; ee = nextSkippingDuplicates(p.ring, ee) { - scState := p.subConnStates[ee.sc] - if scState == connectivity.Ready { - return balancer.PickResult{SubConn: ee.sc.sc}, nil - } - if firstNonFailedFound { - continue - } - if scState == connectivity.TransientFailure { - // This will queue a connect. - ee.sc.queueConnect() - continue - } - // This is a SubConn in a non-failure state. We continue to check the - // other SubConns, but remember that there was a non-failed SubConn - // seen. After this, Pick() will never trigger any SubConn to Connect(). - firstNonFailedFound = true - if scState == connectivity.Idle { - // This is the first non-failed SubConn, and it is in a real Idle - // state. Trigger it to Connect(). - ee.sc.queueConnect() + ringSize := len(p.ring.items) + // Per gRFC A61, because of sticky-TF with PickFirst's auto reconnect on TF, + // we ignore all TF subchannels and find the first ring entry in READY, + // CONNECTING or IDLE. If that entry is in IDLE, we need to initiate a + // connection. The idlePicker returned by the LazyLB or the new Pickfirst + // should do this automatically. 
+ for i := 0; i < ringSize; i++ { + index := (e.idx + i) % ringSize + balState := p.balancerState(p.ring.items[index]) + switch balState.ConnectivityState { + case connectivity.Ready, connectivity.Connecting, connectivity.Idle: + return balState.Picker.Pick(info) + case connectivity.TransientFailure: + default: + p.logger.Errorf("Found child balancer in unknown state: %v", balState.ConnectivityState) } } - return balancer.PickResult{}, fmt.Errorf("no connection is Ready") + // All children are in transient failure. Return the first failure. + return p.balancerState(e).Picker.Pick(info) } -// nextSkippingDuplicates finds the next entry in the ring, with a different -// subconn from the given entry. -func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry { - for next := ring.next(entry); next != entry; next = ring.next(next) { - if next.sc != entry.sc { - return next - } - } - // There's no qualifying next entry. - return nil +func (p *picker) balancerState(e *ringEntry) balancer.State { + return p.endpointStates[e.endpointState] } diff --git a/xds/internal/balancer/ringhash/picker_test.go b/xds/internal/balancer/ringhash/picker_test.go index 67c4f3cc9315..470d12d44d4f 100644 --- a/xds/internal/balancer/ringhash/picker_test.go +++ b/xds/internal/balancer/ringhash/picker_test.go @@ -24,7 +24,6 @@ import ( "testing" "time" - "github.com/google/go-cmp/cmp" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" @@ -40,6 +39,27 @@ func init() { } } +// fakePicker is used to mock pickers from child pickfirst balancers. +type fakePicker struct { + connectivityState connectivity.State + subConn *testutils.TestSubConn + tfError error +} + +func (p *fakePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { + switch p.connectivityState { + case connectivity.Idle: + p.subConn.Connect() + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable + case connectivity.Connecting: + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable + case connectivity.Ready: + return balancer.PickResult{SubConn: p.subConn}, nil + default: + return balancer.PickResult{}, p.tfError + } +} + func newTestRing(cStats []connectivity.State) *ring { var items []*ringEntry for i, st := range cStats { @@ -47,10 +67,16 @@ func newTestRing(cStats []connectivity.State) *ring { items = append(items, &ringEntry{ idx: i, hash: uint64((i + 1) * 10), - sc: &subConn{ - addr: testSC.String(), - sc: testSC, - state: st, + endpointState: &endpointState{ + firstAddr: testSC.String(), + state: balancer.State{ + ConnectivityState: st, + Picker: &fakePicker{ + connectivityState: st, + tfError: fmt.Errorf("%d", i), + subConn: testSC, + }, + }, }, }) } @@ -128,169 +154,3 @@ func (s) TestPickerPickFirstTwo(t *testing.T) { }) } } - -// TestPickerPickTriggerTFConnect covers that if the picked SubConn is -// TransientFailures, all SubConns until a non-TransientFailure are queued for -// Connect(). 
-func (s) TestPickerPickTriggerTFConnect(t *testing.T) { - ring := newTestRing([]connectivity.State{ - connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, - connectivity.Idle, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, - }) - p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test")) - _, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)}) - if err == nil { - t.Fatalf("Pick() error = %v, want non-nil", err) - } - // The first 4 SubConns, all in TransientFailure, should be queued to - // connect. - for i := 0; i < 4; i++ { - it := ring.items[i] - if !it.sc.connectQueued { - t.Errorf("the %d-th SubConn is not queued for connect", i) - } - } - // The other SubConns, after the first Idle, should not be queued to - // connect. - for i := 5; i < len(ring.items); i++ { - it := ring.items[i] - if it.sc.connectQueued { - t.Errorf("the %d-th SubConn is unexpected queued for connect", i) - } - } -} - -// TestPickerPickTriggerTFReturnReady covers that if the picked SubConn is -// TransientFailure, SubConn 2 and 3 are TransientFailure, 4 is Ready. SubConn 2 -// and 3 will Connect(), and 4 will be returned. -func (s) TestPickerPickTriggerTFReturnReady(t *testing.T) { - ring := newTestRing([]connectivity.State{ - connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Ready, - }) - p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test")) - pr, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)}) - if err != nil { - t.Fatalf("Pick() error = %v, want nil", err) - } - if wantSC := testSubConns[3]; pr.SubConn != wantSC { - t.Fatalf("Pick() = %v, want %v", pr.SubConn, wantSC) - } - // The first 3 SubConns, all in TransientFailure, should be queued to - // connect. - for i := 0; i < 3; i++ { - it := ring.items[i] - if !it.sc.connectQueued { - t.Errorf("the %d-th SubConn is not queued for connect", i) - } - } -} - -// TestPickerPickTriggerTFWithIdle covers that if the picked SubConn is -// TransientFailure, SubConn 2 is TransientFailure, 3 is Idle (init Idle). Pick -// will be queue, SubConn 3 will Connect(), SubConn 4 and 5 (in TransientFailure) -// will not queue a Connect. -func (s) TestPickerPickTriggerTFWithIdle(t *testing.T) { - ring := newTestRing([]connectivity.State{ - connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Idle, connectivity.TransientFailure, connectivity.TransientFailure, - }) - p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test")) - _, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)}) - if err == balancer.ErrNoSubConnAvailable { - t.Fatalf("Pick() error = %v, want %v", err, balancer.ErrNoSubConnAvailable) - } - // The first 2 SubConns, all in TransientFailure, should be queued to - // connect. - for i := 0; i < 2; i++ { - it := ring.items[i] - if !it.sc.connectQueued { - t.Errorf("the %d-th SubConn is not queued for connect", i) - } - } - // SubConn 3 was in Idle, so should Connect() - select { - case <-testSubConns[2].ConnectCh: - case <-time.After(defaultTestShortTimeout): - t.Errorf("timeout waiting for Connect() from SubConn %v", testSubConns[2]) - } - // The other SubConns, after the first Idle, should not be queued to - // connect. 
- for i := 3; i < len(ring.items); i++ { - it := ring.items[i] - if it.sc.connectQueued { - t.Errorf("the %d-th SubConn is unexpected queued for connect", i) - } - } -} - -func (s) TestNextSkippingDuplicatesNoDup(t *testing.T) { - testRing := newTestRing([]connectivity.State{connectivity.Idle, connectivity.Idle}) - tests := []struct { - name string - ring *ring - cur *ringEntry - want *ringEntry - }{ - { - name: "no dup", - ring: testRing, - cur: testRing.items[0], - want: testRing.items[1], - }, - { - name: "only one entry", - ring: &ring{items: []*ringEntry{testRing.items[0]}}, - cur: testRing.items[0], - want: nil, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := nextSkippingDuplicates(tt.ring, tt.cur); !cmp.Equal(got, tt.want, cmpOpts) { - t.Errorf("nextSkippingDuplicates() = %v, want %v", got, tt.want) - } - }) - } -} - -// addDups adds duplicates of items[0] to the ring. -func addDups(r *ring, count int) *ring { - var ( - items []*ringEntry - idx int - ) - for i, it := range r.items { - itt := *it - itt.idx = idx - items = append(items, &itt) - idx++ - if i == 0 { - // Add duplicate of items[0] to the ring - for j := 0; j < count; j++ { - itt2 := *it - itt2.idx = idx - items = append(items, &itt2) - idx++ - } - } - } - return &ring{items: items} -} - -func (s) TestNextSkippingDuplicatesMoreDup(t *testing.T) { - testRing := newTestRing([]connectivity.State{connectivity.Idle, connectivity.Idle}) - // Make a new ring with duplicate SubConns. - dupTestRing := addDups(testRing, 3) - if got := nextSkippingDuplicates(dupTestRing, dupTestRing.items[0]); !cmp.Equal(got, dupTestRing.items[len(dupTestRing.items)-1], cmpOpts) { - t.Errorf("nextSkippingDuplicates() = %v, want %v", got, dupTestRing.items[len(dupTestRing.items)-1]) - } -} - -func (s) TestNextSkippingDuplicatesOnlyDup(t *testing.T) { - testRing := newTestRing([]connectivity.State{connectivity.Idle}) - // Make a new ring with only duplicate SubConns. - dupTestRing := addDups(testRing, 3) - // This ring only has duplicates of items[0], should return nil. - if got := nextSkippingDuplicates(dupTestRing, dupTestRing.items[0]); got != nil { - t.Errorf("nextSkippingDuplicates() = %v, want nil", got) - } -} diff --git a/xds/internal/balancer/ringhash/ring.go b/xds/internal/balancer/ringhash/ring.go index 45dbb2d2a83f..3eeaf48344ca 100644 --- a/xds/internal/balancer/ringhash/ring.go +++ b/xds/internal/balancer/ringhash/ring.go @@ -32,24 +32,25 @@ type ring struct { items []*ringEntry } -type subConnWithWeight struct { - sc *subConn - weight float64 +type endpointWithWeight struct { + firstAddr string + endpointState *endpointState + weight float64 } type ringEntry struct { - idx int - hash uint64 - sc *subConn + idx int + hash uint64 + endpointState *endpointState } -// newRing creates a ring from the subConns stored in the AddressMap. The ring +// newRing creates a ring from the endpoints stored in the EndpointMap. The ring // size is limited by the passed in max/min. // -// ring entries will be created for each subConn, and subConn with high weight -// (specified by the address) may have multiple entries. +// ring entries will be created for each endpoint, and endpoints with high +// weight (specified by the address) may have multiple entries. 
// -// For example, for subConns with weights {a:3, b:3, c:4}, a generated ring of +// For example, for endpoints with weights {a:3, b:3, c:4}, a generated ring of // size 10 could be: // - {idx:0 hash:3689675255460411075 b} // - {idx:1 hash:4262906501694543955 c} @@ -65,16 +66,16 @@ type ringEntry struct { // To pick from a ring, a binary search will be done for the given target hash, // and first item with hash >= given hash will be returned. // -// Must be called with a non-empty subConns map. -func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64, logger *grpclog.PrefixLogger) *ring { +// Must be called with a non-empty endpoints map. +func newRing(endpoints *resolver.EndpointMap, minRingSize, maxRingSize uint64, logger *grpclog.PrefixLogger) *ring { if logger.V(2) { - logger.Infof("newRing: number of subConns is %d, minRingSize is %d, maxRingSize is %d", subConns.Len(), minRingSize, maxRingSize) + logger.Infof("newRing: number of endpoints is %d, minRingSize is %d, maxRingSize is %d", endpoints.Len(), minRingSize, maxRingSize) } // https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114 - normalizedWeights, minWeight := normalizeWeights(subConns) + normalizedWeights, minWeight := normalizeWeights(endpoints) if logger.V(2) { - logger.Infof("newRing: normalized subConn weights is %v", normalizedWeights) + logger.Infof("newRing: normalized endpoint weights is %v", normalizedWeights) } // Normalized weights for {3,3,4} is {0.3,0.3,0.4}. @@ -99,16 +100,16 @@ func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64, log // A hash is generated for each item, and later the results will be sorted // based on the hash. var currentHashes, targetHashes float64 - for _, scw := range normalizedWeights { - targetHashes += scale * scw.weight + for _, epw := range normalizedWeights { + targetHashes += scale * epw.weight // This index ensures that ring entries corresponding to the same // address hash to different values. And since this index is // per-address, these entries hash to the same value across address // updates. idx := 0 for currentHashes < targetHashes { - h := xxhash.Sum64String(scw.sc.addr + "_" + strconv.Itoa(idx)) - items = append(items, &ringEntry{hash: h, sc: scw.sc}) + h := xxhash.Sum64String(epw.firstAddr + "_" + strconv.Itoa(idx)) + items = append(items, &ringEntry{hash: h, endpointState: epw.endpointState}) idx++ currentHashes++ } @@ -122,45 +123,51 @@ func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64, log return &ring{items: items} } -// normalizeWeights calculates the normalized weights for each subConn in the -// given subConns map. It returns a slice of subConnWithWeight structs, where -// each struct contains a subConn and its corresponding weight. The function -// also returns the minimum weight among all subConns. +// normalizeWeights calculates the normalized weights for each endpoint in the +// given endpoints map. It returns a slice of endpointWithWeight structs, where +// each struct contains the picker for an endpoint and its corresponding weight. +// The function also returns the minimum weight among all endpoints. // -// The normalized weight of each subConn is calculated by dividing its weight -// attribute by the sum of all subConn weights. If the weight attribute is not -// found on the address, a default weight of 1 is used. 
+// The normalized weight of each endpoint is calculated by dividing its weight +// attribute by the sum of all endpoint weights. If the weight attribute is not +// found on the endpoint, a default weight of 1 is used. // -// The addresses are sorted in ascending order to ensure consistent results. +// The endpoints are sorted in ascending order to ensure consistent results. // -// Must be called with a non-empty subConns map. -func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64) { +// Must be called with a non-empty endpoints map. +func normalizeWeights(endpoints *resolver.EndpointMap) ([]endpointWithWeight, float64) { var weightSum uint32 - // Since attributes are explicitly ignored in the AddressMap key, we need to - // iterate over the values to get the weights. - scVals := subConns.Values() - for _, a := range scVals { - weightSum += a.(*subConn).weight + // Since attributes are explicitly ignored in the EndpointMap key, we need + // to iterate over the values to get the weights. + endpointVals := endpoints.Values() + for _, a := range endpointVals { + weightSum += a.(*endpointState).weight } - ret := make([]subConnWithWeight, 0, subConns.Len()) + ret := make([]endpointWithWeight, 0, endpoints.Len()) min := 1.0 - for _, a := range scVals { - scInfo := a.(*subConn) - // (*subConn).weight is set to 1 if the weight attribute is not found on - // the address. And since this function is guaranteed to be called with - // a non-empty subConns map, weightSum is guaranteed to be non-zero. So, - // we need not worry about divide by zero error here. - nw := float64(scInfo.weight) / float64(weightSum) - ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw}) + for _, a := range endpointVals { + epInfo := a.(*endpointState) + // (*endpointState).weight is set to 1 if the weight attribute is not + // found on the endpoint. And since this function is guaranteed to be + // called with a non-empty endpoints map, weightSum is guaranteed to be + // non-zero. So, we need not worry about divide by zero error here. + nw := float64(epInfo.weight) / float64(weightSum) + ret = append(ret, endpointWithWeight{ + endpointState: epInfo, + weight: nw, + firstAddr: epInfo.firstAddr, + }) min = math.Min(min, nw) } - // Sort the addresses to return consistent results. + // Sort the endpoints to return consistent results. // // Note: this might not be necessary, but this makes sure the ring is - // consistent as long as the addresses are the same, for example, in cases - // where an address is added and then removed, the RPCs will still pick the - // same old SubConn. - sort.Slice(ret, func(i, j int) bool { return ret[i].sc.addr < ret[j].sc.addr }) + // consistent as long as the endpoints are the same, for example, in cases + // where an endpoint is added and then removed, the RPCs will still pick the + // same old endpoint. 
+ sort.Slice(ret, func(i, j int) bool { + return ret[i].firstAddr < ret[j].firstAddr + }) return ret, min } diff --git a/xds/internal/balancer/ringhash/ring_test.go b/xds/internal/balancer/ringhash/ring_test.go index 8bca19baebb6..e08004c19d2e 100644 --- a/xds/internal/balancer/ringhash/ring_test.go +++ b/xds/internal/balancer/ringhash/ring_test.go @@ -28,23 +28,24 @@ import ( "google.golang.org/grpc/resolver" ) -var testAddrs []resolver.Address -var testSubConnMap *resolver.AddressMap +var testEndpoints []resolver.Endpoint +var testEndpointStateMap *resolver.EndpointMap func init() { - testAddrs = []resolver.Address{ - testAddr("a", 3), - testAddr("b", 3), - testAddr("c", 4), + testEndpoints = []resolver.Endpoint{ + testEndpoint("a", 3), + testEndpoint("b", 3), + testEndpoint("c", 4), } - testSubConnMap = resolver.NewAddressMap() - testSubConnMap.Set(testAddrs[0], &subConn{addr: "a", weight: 3}) - testSubConnMap.Set(testAddrs[1], &subConn{addr: "b", weight: 3}) - testSubConnMap.Set(testAddrs[2], &subConn{addr: "c", weight: 4}) + testEndpointStateMap = resolver.NewEndpointMap() + testEndpointStateMap.Set(testEndpoints[0], &endpointState{firstAddr: "a", weight: 3}) + testEndpointStateMap.Set(testEndpoints[1], &endpointState{firstAddr: "b", weight: 3}) + testEndpointStateMap.Set(testEndpoints[2], &endpointState{firstAddr: "c", weight: 4}) } -func testAddr(addr string, weight uint32) resolver.Address { - return weightedroundrobin.SetAddrInfo(resolver.Address{Addr: addr}, weightedroundrobin.AddrInfo{Weight: weight}) +func testEndpoint(addr string, weight uint32) resolver.Endpoint { + ep := resolver.Endpoint{Addresses: []resolver.Address{{Addr: addr}}} + return weightedroundrobin.SetAddrInfoInEndpoint(ep, weightedroundrobin.AddrInfo{Weight: weight}) } func (s) TestRingNew(t *testing.T) { @@ -52,20 +53,20 @@ func (s) TestRingNew(t *testing.T) { for _, min := range []uint64{3, 4, 6, 8} { for _, max := range []uint64{20, 8} { t.Run(fmt.Sprintf("size-min-%v-max-%v", min, max), func(t *testing.T) { - r := newRing(testSubConnMap, min, max, nil) + r := newRing(testEndpointStateMap, min, max, nil) totalCount := len(r.items) if totalCount < int(min) || totalCount > int(max) { t.Fatalf("unexpected size %v, want min %v, max %v", totalCount, min, max) } - for _, a := range testAddrs { + for _, e := range testEndpoints { var count int for _, ii := range r.items { - if ii.sc.addr == a.Addr { + if ii.endpointState.firstAddr == e.Addresses[0].Addr { count++ } } got := float64(count) / float64(totalCount) - want := float64(getWeightAttribute(a)) / totalWeight + want := float64(getWeightAttribute(e)) / totalWeight if !equalApproximately(got, want) { t.Fatalf("unexpected item weight in ring: %v != %v", got, want) } @@ -82,7 +83,7 @@ func equalApproximately(x, y float64) bool { } func (s) TestRingPick(t *testing.T) { - r := newRing(testSubConnMap, 10, 20, nil) + r := newRing(testEndpointStateMap, 10, 20, nil) for _, h := range []uint64{xxhash.Sum64String("1"), xxhash.Sum64String("2"), xxhash.Sum64String("3"), xxhash.Sum64String("4")} { t.Run(fmt.Sprintf("picking-hash-%v", h), func(t *testing.T) { e := r.pick(h) @@ -100,7 +101,7 @@ func (s) TestRingPick(t *testing.T) { } func (s) TestRingNext(t *testing.T) { - r := newRing(testSubConnMap, 10, 20, nil) + r := newRing(testEndpointStateMap, 10, 20, nil) for _, e := range r.items { ne := r.next(e) diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index 8c44f19c3b14..6bdabf1b138f 100644 --- 
a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -27,6 +27,9 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/balancer/endpointsharding" + "google.golang.org/grpc/balancer/lazy" + "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" "google.golang.org/grpc/balancer/weightedroundrobin" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/grpclog" @@ -38,20 +41,29 @@ import ( // Name is the name of the ring_hash balancer. const Name = "ring_hash_experimental" +var lazyPickfirstConfig serviceconfig.LoadBalancingConfig + func init() { + var err error + lazyPickfirstConfig, err = endpointsharding.ParseConfig(json.RawMessage(fmt.Sprintf(`[{ + "%s": { + "childPolicy": [{"%s": {}}] + } + }]`, lazy.Name, pickfirstleaf.Name))) + if err != nil { + logger.Fatal(err) + } balancer.Register(bb{}) } type bb struct{} -func (bb) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { +func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { b := &ringhashBalancer{ - cc: cc, - subConns: resolver.NewAddressMap(), - scStates: make(map[balancer.SubConn]*subConn), - csEvltr: &connectivityStateEvaluator{}, - orderedSubConns: make([]*subConn, 0), + ClientConn: cc, + endpointStates: resolver.NewEndpointMap(), } + b.child = endpointsharding.NewBalancerWithoutAutoReconnect(b, opts) b.logger = prefixLogger(b) b.logger.Infof("Created") return b @@ -65,450 +77,256 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err return parseConfig(c) } -type subConn struct { - addr string - weight uint32 - sc balancer.SubConn - logger *grpclog.PrefixLogger - - mu sync.RWMutex - // This is the actual state of this SubConn (as updated by the ClientConn). - // The effective state can be different, see comment of attemptedToConnect. - state connectivity.State - // failing is whether this SubConn is in a failing state. A subConn is - // considered to be in a failing state if it was previously in - // TransientFailure. - // - // This affects the effective connectivity state of this SubConn, e.g. - // - if the actual state is Idle or Connecting, but this SubConn is failing, - // the effective state is TransientFailure. - // - // This is used in pick(). E.g. if a subConn is Idle, but has failing as - // true, pick() will - // - consider this SubConn as TransientFailure, and check the state of the - // next SubConn. - // - trigger Connect() (note that normally a SubConn in real - // TransientFailure cannot Connect()) - // - // A subConn starts in non-failing (failing is false). A transition to - // TransientFailure sets failing to true (and it stays true). A transition - // to Ready sets failing to false. - failing bool - // connectQueued is true if a Connect() was queued for this SubConn while - // it's not in Idle (most likely was in TransientFailure). A Connect() will - // be triggered on this SubConn when it turns Idle. - // - // When connectivity state is updated to Idle for this SubConn, if - // connectQueued is true, Connect() will be called on the SubConn. - connectQueued bool - // attemptingToConnect indicates if this subconn is attempting to connect. - // It's set when queueConnect is called. It's unset when the state is - // changed to Ready/Shutdown, or Idle (and if connectQueued is false). - attemptingToConnect bool -} - -// setState updates the state of this SubConn. -// -// It also handles the queued Connect(). 
If the new state is Idle, and a -// Connect() was queued, this SubConn will be triggered to Connect(). -func (sc *subConn) setState(s connectivity.State) { - sc.mu.Lock() - defer sc.mu.Unlock() - switch s { - case connectivity.Idle: - // Trigger Connect() if new state is Idle, and there is a queued connect. - if sc.connectQueued { - sc.connectQueued = false - sc.logger.Infof("Executing a queued connect for subConn moving to state: %v", sc.state) - sc.sc.Connect() - } else { - sc.attemptingToConnect = false - } - case connectivity.Connecting: - // Clear connectQueued if the SubConn isn't failing. This state - // transition is unlikely to happen, but handle this just in case. - sc.connectQueued = false - case connectivity.Ready: - // Clear connectQueued if the SubConn isn't failing. This state - // transition is unlikely to happen, but handle this just in case. - sc.connectQueued = false - sc.attemptingToConnect = false - // Set to a non-failing state. - sc.failing = false - case connectivity.TransientFailure: - // Set to a failing state. - sc.failing = true - case connectivity.Shutdown: - sc.attemptingToConnect = false - } - sc.state = s -} - -// effectiveState returns the effective state of this SubConn. It can be -// different from the actual state, e.g. Idle while the subConn is failing is -// considered TransientFailure. Read comment of field failing for other cases. -func (sc *subConn) effectiveState() connectivity.State { - sc.mu.RLock() - defer sc.mu.RUnlock() - if sc.failing && (sc.state == connectivity.Idle || sc.state == connectivity.Connecting) { - return connectivity.TransientFailure - } - return sc.state -} - -// queueConnect sets a boolean so that when the SubConn state changes to Idle, -// it's Connect() will be triggered. If the SubConn state is already Idle, it -// will just call Connect(). -func (sc *subConn) queueConnect() { - sc.mu.Lock() - defer sc.mu.Unlock() - sc.attemptingToConnect = true - if sc.state == connectivity.Idle { - sc.logger.Infof("Executing a queued connect for subConn in state: %v", sc.state) - sc.sc.Connect() - return - } - // Queue this connect, and when this SubConn switches back to Idle (happens - // after backoff in TransientFailure), it will Connect(). - sc.logger.Infof("Queueing a connect for subConn in state: %v", sc.state) - sc.connectQueued = true -} - -func (sc *subConn) isAttemptingToConnect() bool { - sc.mu.Lock() - defer sc.mu.Unlock() - return sc.attemptingToConnect -} - type ringhashBalancer struct { - cc balancer.ClientConn + balancer.ClientConn logger *grpclog.PrefixLogger - - config *LBConfig - subConns *resolver.AddressMap // Map from resolver.Address to `*subConn`. - scStates map[balancer.SubConn]*subConn - - // ring is always in sync with subConns. When subConns change, a new ring is - // generated. Note that address weights updates (they are keys in the - // subConns map) also regenerates the ring. - ring *ring - picker balancer.Picker - csEvltr *connectivityStateEvaluator - state connectivity.State - - resolverErr error // the last error reported by the resolver; cleared on successful resolution - connErr error // the last connection error; cleared upon leaving TransientFailure - - // orderedSubConns contains the list of subconns in the order that addresses - // appear from the resolver. Together with lastInternallyTriggeredSCIndex, - // this allows triggering connection attempts to all SubConns independently - // of the order they appear on the ring. Always in sync with ring and - // subConns. 
The index is reset when addresses change. - orderedSubConns []*subConn - lastInternallyTriggeredSCIndex int + child balancer.Balancer + config *LBConfig + + mu sync.Mutex + inhibitChildUpdates bool + shouldRegenerateRing bool + endpointStates *resolver.EndpointMap // Map from endpoint -> *endpointState + orderedEndpoints []resolver.Endpoint + + // ring is always in sync with endpoints. When endpoints change, a new ring + // is generated. Note that address weights updates also regenerates the + // ring. + ring *ring } -// updateAddresses creates new SubConns and removes SubConns, based on the -// address update. -// -// The return value is whether the new address list is different from the -// previous. True if -// - an address was added -// - an address was removed -// - an address's weight was updated -// -// Note that this function doesn't trigger SubConn connecting, so all the new -// SubConn states are Idle. -func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { - var addrsUpdated bool - // addrsSet is the set converted from addrs, used for quick lookup. - addrsSet := resolver.NewAddressMap() - - b.orderedSubConns = b.orderedSubConns[:0] // reuse the underlying array. - - for _, addr := range addrs { - addrsSet.Set(addr, true) - newWeight := getWeightAttribute(addr) - if val, ok := b.subConns.Get(addr); !ok { - var sc balancer.SubConn - opts := balancer.NewSubConnOptions{ - HealthCheckEnabled: true, - StateListener: func(state balancer.SubConnState) { b.updateSubConnState(sc, state) }, - } - sc, err := b.cc.NewSubConn([]resolver.Address{addr}, opts) - if err != nil { - b.logger.Warningf("Failed to create new SubConn: %v", err) - continue - } - scs := &subConn{addr: addr.Addr, weight: newWeight, sc: sc} - scs.logger = subConnPrefixLogger(b, scs) - scs.setState(connectivity.Idle) - b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle) - b.subConns.Set(addr, scs) - b.scStates[sc] = scs - b.orderedSubConns = append(b.orderedSubConns, scs) - addrsUpdated = true +// UpdateState intercepts child balancer state updates. It updates the +// per-endpoint state stored in the ring, and also the aggregated state based on +// the child picker. It also reconciles the endpoint list. It sets +// `b.shouldRegenerateRing` to true if the new endpoints list is different from +// the previous, i.e either of the following is true: +// - an endpoint was added +// - an endpoint was removed +// - an endpoint's weight was updated +func (b *ringhashBalancer) UpdateState(state balancer.State) { + b.mu.Lock() + defer b.mu.Unlock() + childStates := endpointsharding.ChildStatesFromPicker(state.Picker) + // endpointsSet is the set converted from endpoints, used for quick lookup. + endpointsSet := resolver.NewEndpointMap() + + for _, childState := range childStates { + endpoint := childState.Endpoint + endpointsSet.Set(endpoint, true) + newWeight := getWeightAttribute(endpoint) + var es *endpointState + if val, ok := b.endpointStates.Get(endpoint); !ok { + es = &endpointState{} + b.endpointStates.Set(endpoint, es) + b.shouldRegenerateRing = true + // Set the first address only during creation time since it's hash + // is used to create the ring. Even if the address ordering changes + // in subsequent resolver updates, the endpoint hash should remain + // the same. + es.firstAddr = endpoint.Addresses[0].Addr + es.balancer = childState.Balancer } else { - // We have seen this address before and created a subConn for it. 
If the - // weight associated with the address has changed, update the subConns map - // with the new weight. This will be used when a new ring is created. - // - // There is no need to call UpdateAddresses on the subConn at this point - // since *only* the weight attribute has changed, and that does not affect - // subConn uniqueness. - scInfo := val.(*subConn) - b.orderedSubConns = append(b.orderedSubConns, scInfo) - if oldWeight := scInfo.weight; oldWeight != newWeight { - scInfo.weight = newWeight - b.subConns.Set(addr, scInfo) - // Return true to force recreation of the ring. - addrsUpdated = true + // We have seen this endpoint before and created a `endpointState` + // object for it. If the weight associated with the endpoint has + // changed, update the endpoint state map with the new weight. + // This will be used when a new ring is created. + es = val.(*endpointState) + if oldWeight := es.weight; oldWeight != newWeight { + b.shouldRegenerateRing = true } } + es.weight = newWeight + es.mu.Lock() + es.state = childState.State + es.mu.Unlock() } - for _, addr := range b.subConns.Keys() { - // addr was removed by resolver. - if _, ok := addrsSet.Get(addr); !ok { - v, _ := b.subConns.Get(addr) - scInfo := v.(*subConn) - scInfo.sc.Shutdown() - b.subConns.Delete(addr) - addrsUpdated = true - // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. - // The entry will be deleted in updateSubConnState. + + for _, endpoint := range b.endpointStates.Keys() { + if _, ok := endpointsSet.Get(endpoint); ok { + continue } + // endpoint was removed by resolver. + b.endpointStates.Delete(endpoint) + b.shouldRegenerateRing = true } - if addrsUpdated { - b.lastInternallyTriggeredSCIndex = 0 - } - return addrsUpdated + + b.updatePickerLocked() } -func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) error { +func (b *ringhashBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { if b.logger.V(2) { - b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig)) + b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(ccs.BalancerConfig)) } - newConfig, ok := s.BalancerConfig.(*LBConfig) + newConfig, ok := ccs.BalancerConfig.(*LBConfig) if !ok { - return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) + return fmt.Errorf("unexpected balancer config with type: %T", ccs.BalancerConfig) } - // If addresses were updated, whether it resulted in SubConn - // creation/deletion, or just weight update, we need to regenerate the ring - // and send a new picker. - regenerateRing := b.updateAddresses(s.ResolverState.Addresses) - - // If the ring configuration has changed, we need to regenerate the ring and - // send a new picker. - if b.config == nil || b.config.MinRingSize != newConfig.MinRingSize || b.config.MaxRingSize != newConfig.MaxRingSize { - regenerateRing = true + b.mu.Lock() + b.inhibitChildUpdates = true + // Save the endpoint list. It's used to try IDLE endpoints when previous + // endpoints have reported TRANSIENT_FAILURE. + b.orderedEndpoints = ccs.ResolverState.Endpoints + b.mu.Unlock() + + defer func() { + b.mu.Lock() + b.inhibitChildUpdates = false + b.updatePickerLocked() + b.mu.Unlock() + }() + + // Make pickfirst children use health listeners for outlier detection and + // health checking to work. 
+ ccs.ResolverState = pickfirstleaf.EnableHealthListener(ccs.ResolverState) + childConfig := balancer.ClientConnState{ + BalancerConfig: lazyPickfirstConfig, + ResolverState: ccs.ResolverState, } - b.config = newConfig - - // If resolver state contains no addresses, return an error so ClientConn - // will trigger re-resolve. Also records this as an resolver error, so when - // the overall state turns transient failure, the error message will have - // the zero address information. - if len(s.ResolverState.Addresses) == 0 { - b.ResolverError(errors.New("produced zero addresses")) - return balancer.ErrBadResolverState + if err := b.child.UpdateClientConnState(childConfig); err != nil { + return err } - if regenerateRing { - // Ring creation is guaranteed to not fail because we call newRing() - // with a non-empty subConns map. - b.ring = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize, b.logger) - b.regeneratePicker() - b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) + b.mu.Lock() + // Ring updates can happen due to the following: + // 1. Addition or deletion of endpoints: The synchronous picker update from + // the child endpointsharding balancer would contain the list of updated + // endpoints. Updates triggered by the child after handling the + // `UpdateClientConnState` call will not change the endpoint list. + // 2. Change in the `LoadBalancerConfig`: Ring config such as max/min ring + // size. + // To avoid extra ring updates, a boolean is used to track the need for a + // ring update and the update is done only once at the end. + // + // If the ring configuration has changed, we need to regenerate the ring + // while sending a new picker. + if b.config == nil || b.config.MinRingSize != newConfig.MinRingSize || b.config.MaxRingSize != newConfig.MaxRingSize { + b.shouldRegenerateRing = true } - - // Successful resolution; clear resolver error and return nil. - b.resolverErr = nil + b.config = newConfig + b.mu.Unlock() return nil } func (b *ringhashBalancer) ResolverError(err error) { - b.resolverErr = err - if b.subConns.Len() == 0 { - b.state = connectivity.TransientFailure - } - - if b.state != connectivity.TransientFailure { - // The picker will not change since the balancer does not currently - // report an error. - return - } - b.regeneratePicker() - b.cc.UpdateState(balancer.State{ - ConnectivityState: b.state, - Picker: b.picker, - }) + b.child.ResolverError(err) } func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state) } -// updateSubConnState updates the per-SubConn state stored in the ring, and also -// the aggregated state. -// -// It triggers an update to cc when: -// - the new state is TransientFailure, to update the error message -// - it's possible that this is a noop, but sending an extra update is easier -// than comparing errors -// -// - the aggregated state is changed -// - the same picker will be sent again, but this update may trigger a re-pick -// for some RPCs. 
-func (b *ringhashBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { - s := state.ConnectivityState - if logger.V(2) { - b.logger.Infof("Handle SubConn state change: %p, %v", sc, s) - } - scs, ok := b.scStates[sc] - if !ok { - b.logger.Infof("Received state change for an unknown SubConn: %p, %v", sc, s) +func (b *ringhashBalancer) updatePickerLocked() { + if b.inhibitChildUpdates { return } - oldSCState := scs.effectiveState() - scs.setState(s) - newSCState := scs.effectiveState() - b.logger.Infof("SubConn's effective old state was: %v, new state is %v", oldSCState, newSCState) - - b.state = b.csEvltr.recordTransition(oldSCState, newSCState) - - switch s { - case connectivity.TransientFailure: - // Save error to be reported via picker. - b.connErr = state.ConnectionError - case connectivity.Shutdown: - // When an address was removed by resolver, b called Shutdown but kept - // the sc's state in scStates. Remove state for this sc here. - delete(b.scStates, sc) - } - - if oldSCState != newSCState { - // Because the picker caches the state of the subconns, we always - // regenerate and update the picker when the effective SubConn state - // changes. - b.regeneratePicker() - b.logger.Infof("Pushing new state %v and picker %p", b.state, b.picker) - b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) - } + state := b.aggregatedStateLocked() - switch b.state { - case connectivity.Connecting, connectivity.TransientFailure: + // Start connecting to new endpoints if necessary. + if state == connectivity.Connecting || state == connectivity.TransientFailure { // When overall state is TransientFailure, we need to make sure at least - // one SubConn is attempting to connect, otherwise this balancer may + // one endpoint is attempting to connect, otherwise this balancer may // never get picks if the parent is priority. // // Because we report Connecting as the overall state when only one - // SubConn is in TransientFailure, we do the same check for Connecting + // endpoint is in TransientFailure, we do the same check for Connecting // here. // - // Note that this check also covers deleting SubConns due to address - // change. E.g. if the SubConn attempting to connect is deleted, and the - // overall state is TF. Since there must be at least one SubConn - // attempting to connect, we need to trigger one. But since the deleted - // SubConn will eventually send a shutdown update, this code will run - // and trigger the next SubConn to connect. - for _, v := range b.subConns.Values() { - sc := v.(*subConn) - if sc.isAttemptingToConnect() { - return + // Note that this check also covers deleting endpoints. E.g. if the + // endpoint attempting to connect is deleted, and the overall state is + // TF. Since there must be at least one endpoint attempting to connect, + // we need to trigger one. + var idleBalancer balancer.ExitIdler + for _, e := range b.orderedEndpoints { + val, ok := b.endpointStates.Get(e) + if !ok { + b.logger.Errorf("Missing endpoint information in picker update from child balancer: %v", e) + continue + } + es := val.(*endpointState) + es.mu.RLock() + connState := es.state.ConnectivityState + es.mu.RUnlock() + if connState == connectivity.Connecting { + idleBalancer = nil + break + } + if idleBalancer == nil && connState == connectivity.Idle { + idleBalancer = es.balancer } } - - // Trigger a SubConn (the next in the order addresses appear in the - // resolver) to connect if nobody is attempting to connect. 
-		b.lastInternallyTriggeredSCIndex = (b.lastInternallyTriggeredSCIndex + 1) % len(b.orderedSubConns)
-		b.orderedSubConns[b.lastInternallyTriggeredSCIndex].queueConnect()
+		if idleBalancer != nil {
+			idleBalancer.ExitIdle()
+		}
 	}
-}
 
-// mergeErrors builds an error from the last connection error and the last
-// resolver error. Must only be called if b.state is TransientFailure.
-func (b *ringhashBalancer) mergeErrors() error {
-	// connErr must always be non-nil unless there are no SubConns, in which
-	// case resolverErr must be non-nil.
-	if b.connErr == nil {
-		return fmt.Errorf("last resolver error: %v", b.resolverErr)
+	// Regenerate the ring, if required, and update the channel.
+	if b.endpointStates.Len() > 0 && b.shouldRegenerateRing {
+		// Ring creation is guaranteed to not fail because we call newRing()
+		// with a non-empty list of endpoints.
+		b.ring = newRing(b.endpointStates, b.config.MinRingSize, b.config.MaxRingSize, b.logger)
 	}
-	if b.resolverErr == nil {
-		return fmt.Errorf("last connection error: %v", b.connErr)
+	b.shouldRegenerateRing = false
+	var picker balancer.Picker
+	if b.endpointStates.Len() == 0 {
+		picker = base.NewErrPicker(errors.New("produced zero addresses"))
+	} else {
+		picker = newPicker(b.ring, b.logger)
 	}
-	return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr)
-}
-
-func (b *ringhashBalancer) regeneratePicker() {
-	if b.state == connectivity.TransientFailure {
-		b.picker = base.NewErrPicker(b.mergeErrors())
-		return
-	}
-	b.picker = newPicker(b.ring, b.logger)
+	b.logger.Infof("Pushing new state %v and picker %p", state, picker)
+	b.ClientConn.UpdateState(balancer.State{
+		ConnectivityState: state,
+		Picker:            picker,
+	})
 }
 
 func (b *ringhashBalancer) Close() {
 	b.logger.Infof("Shutdown")
+	b.child.Close()
 }
 
 func (b *ringhashBalancer) ExitIdle() {
 	// ExitIdle implementation is a no-op because connections are either
-	// triggers from picks or from subConn state changes.
+	// triggered from picks or from child balancer state changes.
 }
 
-// connectivityStateEvaluator takes the connectivity states of multiple SubConns
-// and returns one aggregated connectivity state.
-//
-// It's not thread safe.
-type connectivityStateEvaluator struct {
-	sum  uint64
-	nums [5]uint64
-}
-
-// recordTransition records state change happening in subConn and based on that
-// it evaluates what aggregated state should be.
-//
-// - If there is at least one subchannel in READY state, report READY.
-// - If there are 2 or more subchannels in TRANSIENT_FAILURE state, report TRANSIENT_FAILURE.
-// - If there is at least one subchannel in CONNECTING state, report CONNECTING.
-// - If there is one subchannel in TRANSIENT_FAILURE and there is more than one subchannel, report state CONNECTING.
-// - If there is at least one subchannel in Idle state, report Idle.
-// - Otherwise, report TRANSIENT_FAILURE.
+// aggregatedStateLocked returns the aggregated state of the child balancers
+// based on the following rules.
+// - If there is at least one endpoint in READY state, report READY.
+// - If there are 2 or more endpoints in TRANSIENT_FAILURE state, report
+// TRANSIENT_FAILURE.
+// - If there is at least one endpoint in CONNECTING state, report CONNECTING.
+// - If there is one endpoint in TRANSIENT_FAILURE and there is more than one
+// endpoint, report state CONNECTING.
+// - If there is at least one endpoint in Idle state, report Idle.
+// - Otherwise, report TRANSIENT_FAILURE.
 //
 // Note that if there are 1 connecting, 2 transient failure, the overall state
 // is transient failure.
This is because the second transient failure is a -// fallback of the first failing SubConn, and we want to report transient +// fallback of the first failing endpoint, and we want to report transient // failure to failover to the lower priority. -func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State { - // Update counters. - for idx, state := range []connectivity.State{oldState, newState} { - updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. - cse.nums[state] += updateVal - } - if oldState == connectivity.Shutdown { - // There's technically no transition from Shutdown. But we record a - // Shutdown->Idle transition when a new SubConn is created. - cse.sum++ - } - if newState == connectivity.Shutdown { - cse.sum-- +func (b *ringhashBalancer) aggregatedStateLocked() connectivity.State { + var nums [5]int + for _, val := range b.endpointStates.Values() { + es := val.(*endpointState) + es.mu.RLock() + nums[es.state.ConnectivityState]++ + es.mu.RUnlock() } - if cse.nums[connectivity.Ready] > 0 { + if nums[connectivity.Ready] > 0 { return connectivity.Ready } - if cse.nums[connectivity.TransientFailure] > 1 { + if nums[connectivity.TransientFailure] > 1 { return connectivity.TransientFailure } - if cse.nums[connectivity.Connecting] > 0 { + if nums[connectivity.Connecting] > 0 { return connectivity.Connecting } - if cse.nums[connectivity.TransientFailure] > 0 && cse.sum > 1 { + if nums[connectivity.TransientFailure] == 1 && b.endpointStates.Len() > 1 { return connectivity.Connecting } - if cse.nums[connectivity.Idle] > 0 { + if nums[connectivity.Idle] > 0 { return connectivity.Idle } return connectivity.TransientFailure @@ -521,10 +339,19 @@ func (cse *connectivityStateEvaluator) recordTransition(oldState, newState conne // When used in the xDS context, the weight attribute is guaranteed to be // non-zero. But, when used in a non-xDS context, the weight attribute could be // unset. A Default of 1 is used in the latter case. 
-func getWeightAttribute(addr resolver.Address) uint32 { - w := weightedroundrobin.GetAddrInfo(addr).Weight +func getWeightAttribute(e resolver.Endpoint) uint32 { + w := weightedroundrobin.GetAddrInfoFromEndpoint(e).Weight if w == 0 { return 1 } return w } + +type endpointState struct { + firstAddr string + weight uint32 + balancer balancer.ExitIdler + + mu sync.RWMutex + state balancer.State +} diff --git a/xds/internal/balancer/ringhash/ringhash_test.go b/xds/internal/balancer/ringhash/ringhash_test.go index f6778d832f8c..f845a03073b1 100644 --- a/xds/internal/balancer/ringhash/ringhash_test.go +++ b/xds/internal/balancer/ringhash/ringhash_test.go @@ -24,26 +24,17 @@ import ( "testing" "time" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/weightedroundrobin" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/stats" "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal" ) -var ( - cmpOpts = cmp.Options{ - cmp.AllowUnexported(testutils.TestSubConn{}, ringEntry{}, subConn{}), - cmpopts.IgnoreFields(subConn{}, "mu"), - cmpopts.IgnoreFields(testutils.TestSubConn{}, "connectCalled"), - } -) - const ( defaultTestTimeout = 10 * time.Second defaultTestShortTimeout = 10 * time.Millisecond @@ -67,37 +58,41 @@ func ctxWithHash(h uint64) context.Context { } // setupTest creates the balancer, and does an initial sanity check. -func setupTest(t *testing.T, addrs []resolver.Address) (*testutils.BalancerClientConn, balancer.Balancer, balancer.Picker) { +func setupTest(t *testing.T, endpoints []resolver.Endpoint) (*testutils.BalancerClientConn, balancer.Balancer, balancer.Picker) { t.Helper() cc := testutils.NewBalancerClientConn(t) builder := balancer.Get(Name) - b := builder.Build(cc, balancer.BuildOptions{}) + b := builder.Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}}) if b == nil { t.Fatalf("builder.Build(%s) failed and returned nil", Name) } if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: addrs}, + ResolverState: resolver.State{Endpoints: endpoints}, BalancerConfig: testConfig, }); err != nil { t.Fatalf("UpdateClientConnState returned err: %v", err) } - for _, addr := range addrs { - addr1 := <-cc.NewSubConnAddrsCh - if want := []resolver.Address{addr}; !cmp.Equal(addr1, want, cmp.AllowUnexported(attributes.Attributes{})) { - t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr1, want, cmp.AllowUnexported(attributes.Attributes{}))) - } - sc1 := <-cc.NewSubConnCh - // All the SubConns start in Idle, and should not Connect(). - select { - case <-sc1.ConnectCh: - t.Errorf("unexpected Connect() from SubConn %v", sc1) - case <-time.After(defaultTestShortTimeout): - } + // The leaf pickfirst are created lazily, only when their endpoint is picked + // or other endpoints are in TF. No SubConns should be created immediately. + select { + case sc := <-cc.NewSubConnCh: + t.Errorf("unexpected SubConn creation: %v", sc) + case <-time.After(defaultTestShortTimeout): } - // Should also have a picker, with all SubConns in Idle. + // Should also have a picker, with all endpoints in Idle. 
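+	// The picker should contain one entry per endpoint, each still reporting
+	// IDLE; this is verified below.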
p1 := <-cc.NewPickerCh + + ringHashPicker := p1.(*picker) + if got, want := len(ringHashPicker.endpointStates), len(endpoints); got != want { + t.Errorf("Number of child balancers = %d, want = %d", got, want) + } + for es, bs := range ringHashPicker.endpointStates { + if got, want := bs.ConnectivityState, connectivity.Idle; got != want { + t.Errorf("Child balancer connectivity state for address %q = %v, want = %v", es.firstAddr, got, want) + } + } return cc, b, p1 } @@ -117,15 +112,15 @@ func (s) TestUpdateClientConnState_NewRingSize(t *testing.T) { origMinRingSize, origMaxRingSize := 1, 10 // Configured from `testConfig` in `setupTest` newMinRingSize, newMaxRingSize := 20, 100 - addrs := []resolver.Address{{Addr: testBackendAddrStrs[0]}} - cc, b, p1 := setupTest(t, addrs) + endpoints := []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}} + cc, b, p1 := setupTest(t, endpoints) ring1 := p1.(*picker).ring if ringSize := len(ring1.items); ringSize < origMinRingSize || ringSize > origMaxRingSize { t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, origMinRingSize, origMaxRingSize) } if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: addrs}, + ResolverState: resolver.State{Endpoints: endpoints}, BalancerConfig: &LBConfig{MinRingSize: uint64(newMinRingSize), MaxRingSize: uint64(newMaxRingSize)}, }); err != nil { t.Fatalf("UpdateClientConnState returned err: %v", err) @@ -143,20 +138,30 @@ func (s) TestUpdateClientConnState_NewRingSize(t *testing.T) { } } -func (s) TestOneSubConn(t *testing.T) { +func (s) TestOneEndpoint(t *testing.T) { wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0]} - cc, _, p0 := setupTest(t, []resolver.Address{wantAddr1}) + cc, _, p0 := setupTest(t, []resolver.Endpoint{{Addresses: []resolver.Address{wantAddr1}}}) ring0 := p0.(*picker).ring firstHash := ring0.items[0].hash // firstHash-1 will pick the first (and only) SubConn from the ring. testHash := firstHash - 1 - // The first pick should be queued, and should trigger Connect() on the only - // SubConn. + // The first pick should be queued, and should trigger a connection to the + // only Endpoint which has a single address. if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable { t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable) } - sc0 := ring0.items[0].sc.sc.(*testutils.TestSubConn) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + var sc0 *testutils.TestSubConn + select { + case <-ctx.Done(): + t.Fatalf("Timed out waiting for SubConn creation.") + case sc0 = <-cc.NewSubConnCh: + } + if got, want := sc0.Addresses[0].Addr, wantAddr1.Addr; got != want { + t.Fatalf("SubConn.Addresses = %v, want = %v", got, want) + } select { case <-sc0.ConnectCh: case <-time.After(defaultTestTimeout): @@ -166,6 +171,9 @@ func (s) TestOneSubConn(t *testing.T) { // Send state updates to Ready. sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) + if err := cc.WaitForConnectivityState(ctx, connectivity.Ready); err != nil { + t.Fatal(err) + } // Test pick with one backend. p1 := <-cc.NewPickerCh @@ -181,18 +189,18 @@ func (s) TestOneSubConn(t *testing.T) { // same hash always pick the same SubConn. When the one picked is down, another // one will be picked. 
func (s) TestThreeSubConnsAffinity(t *testing.T) { - wantAddrs := []resolver.Address{ - {Addr: testBackendAddrStrs[0]}, - {Addr: testBackendAddrStrs[1]}, - {Addr: testBackendAddrStrs[2]}, + wantEndpoints := []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, + {Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, + {Addresses: []resolver.Address{{Addr: testBackendAddrStrs[2]}}}, } - cc, _, p0 := setupTest(t, wantAddrs) + cc, _, p0 := setupTest(t, wantEndpoints) // This test doesn't update addresses, so this ring will be used by all the // pickers. ring0 := p0.(*picker).ring firstHash := ring0.items[0].hash - // firstHash+1 will pick the second SubConn from the ring. + // firstHash+1 will pick the second endpoint from the ring. testHash := firstHash + 1 // The first pick should be queued, and should trigger Connect() on the only // SubConn. @@ -200,7 +208,17 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) { t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable) } // The picked SubConn should be the second in the ring. - sc0 := ring0.items[1].sc.sc.(*testutils.TestSubConn) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + var sc0 *testutils.TestSubConn + select { + case <-ctx.Done(): + t.Fatalf("Timed out waiting for SubConn creation.") + case sc0 = <-cc.NewSubConnCh: + } + if got, want := sc0.Addresses[0].Addr, ring0.items[1].endpointState.firstAddr; got != want { + t.Fatalf("SubConn.Address = %v, want = %v", got, want) + } select { case <-sc0.ConnectCh: case <-time.After(defaultTestTimeout): @@ -210,6 +228,8 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) { // Send state updates to Ready. sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) + <-sc0.HealthUpdateDelivered.Done() + p1 := <-cc.NewPickerCh for i := 0; i < 5; i++ { gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}) @@ -219,8 +239,45 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) { } // Turn down the subConn in use. - sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle}) + // The channel should transition to IDLE. + if err := cc.WaitForConnectivityState(ctx, connectivity.Idle); err != nil { + t.Fatal(err) + } p2 := <-cc.NewPickerCh + // The first pick should be queued, and should trigger Connect() on the only + // SubConn. + if _, err := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable { + t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable) + } + select { + case <-sc0.ConnectCh: + case <-time.After(defaultTestTimeout): + t.Errorf("timeout waiting for Connect() from SubConn %v", sc0) + } + // Fail the first SubConn. + sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + // Each child balancer state update should trigger a picker update. + _ = <-cc.NewPickerCh + sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + // Reporting TF will cause the balancer to try connecting to the first + // address in the endpoint list. 
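+	// (updatePickerLocked scans the endpoints in resolver order and exits IDLE
+	// on the first IDLE endpoint when no endpoint is currently CONNECTING.)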
+	var sc1 *testutils.TestSubConn
+	select {
+	case <-ctx.Done():
+		t.Fatalf("Timed out waiting for SubConn creation.")
+	case sc1 = <-cc.NewSubConnCh:
+	}
+	if got, want := sc1.Addresses[0].Addr, ring0.items[0].endpointState.firstAddr; got != want {
+		t.Fatalf("SubConn.Address = %v, want = %v", got, want)
+	}
+	select {
+	case <-sc1.ConnectCh:
+	case <-time.After(defaultTestTimeout):
+		t.Errorf("timeout waiting for Connect() from SubConn %v", sc1)
+	}
+
+	p2 = <-cc.NewPickerCh
 	// Pick with the same hash should be queued, because the SubConn after the
 	// first picked is Idle.
 	if _, err := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
@@ -228,26 +285,39 @@
 	}
 
 	// The third SubConn in the ring should connect.
-	sc1 := ring0.items[2].sc.sc.(*testutils.TestSubConn)
+	var sc2 *testutils.TestSubConn
 	select {
-	case <-sc1.ConnectCh:
+	case <-ctx.Done():
+		t.Fatalf("Timed out waiting for SubConn creation.")
+	case sc2 = <-cc.NewSubConnCh:
+	}
+	if got, want := sc2.Addresses[0].Addr, ring0.items[2].endpointState.firstAddr; got != want {
+		t.Fatalf("SubConn.Address = %v, want = %v", got, want)
+	}
+	select {
+	case <-sc2.ConnectCh:
 	case <-time.After(defaultTestTimeout):
-		t.Errorf("timeout waiting for Connect() from SubConn %v", sc1)
+		t.Errorf("timeout waiting for Connect() from SubConn %v", sc2)
 	}
 
 	// Send state updates to Ready.
-	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
-	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	if err := cc.WaitForConnectivityState(ctx, connectivity.Ready); err != nil {
+		t.Fatal(err)
+	}
 
 	// New picks should all return this SubConn.
 	p3 := <-cc.NewPickerCh
 	for i := 0; i < 5; i++ {
 		gotSCSt, _ := p3.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
-		if gotSCSt.SubConn != sc1 {
-			t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
+		if gotSCSt.SubConn != sc2 {
+			t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2)
 		}
 	}
 
 	// Now, after backoff, the first picked SubConn will turn Idle.
+	// The leaf pickfirst will be in sticky TF, so it will report TF until the
+	// SubConn becomes READY; no picker updates will be sent by ringhash.
 	sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
 	// The picks above should have queued Connect() for the first picked
 	// SubConn, so this Idle state change will trigger a Connect().
@@ -260,10 +330,16 @@
 	// After the first picked SubConn turn Ready, new picks should return it
 	// again (even though the second picked SubConn is also Ready).
 	sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	// Once the SubConn is Ready, it will report Connecting and wait for a
+	// health update. Once it gets a health update for Ready, pickfirst will
+	// report Ready.
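+	// Re-arm the test SubConn's health-update event so the test can block
+	// below until the health update for this READY transition has been
+	// delivered to pickfirst.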
+	sc0.HealthUpdateDelivered = grpcsync.NewEvent()
 	sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	<-sc0.HealthUpdateDelivered.Done()
 	p4 := <-cc.NewPickerCh
 	for i := 0; i < 5; i++ {
-		gotSCSt, _ := p4.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
+		gotSCSt, err := p4.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
+		if err != nil {
+			t.Fatalf("picker.Pick returned unexpected error: %v", err)
+		}
 		if gotSCSt.SubConn != sc0 {
 			t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
 		}
@@ -273,13 +349,13 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) {
 // TestThreeBackendsAffinity covers that there are 3 SubConns, RPCs with the
 // same hash always pick the same SubConn. Then try different hash to pick
 // another backend, and verify the first hash still picks the first backend.
-func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) {
-	wantAddrs := []resolver.Address{
-		{Addr: testBackendAddrStrs[0]},
-		{Addr: testBackendAddrStrs[1]},
-		{Addr: testBackendAddrStrs[2]},
+func (s) TestThreeBackendsAffinityMultiple(t *testing.T) {
+	wantEndpoints := []resolver.Endpoint{
+		{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}},
+		{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}},
+		{Addresses: []resolver.Address{{Addr: testBackendAddrStrs[2]}}},
 	}
-	cc, _, p0 := setupTest(t, wantAddrs)
+	cc, _, p0 := setupTest(t, wantEndpoints)
 	// This test doesn't update addresses, so this ring will be used by all the
 	// pickers.
 	ring0 := p0.(*picker).ring
@@ -292,7 +368,18 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) {
 	if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
 		t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
 	}
-	sc0 := ring0.items[1].sc.sc.(*testutils.TestSubConn)
+	// The picked SubConn should be the second in the ring.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	var sc0 *testutils.TestSubConn
+	select {
+	case <-ctx.Done():
+		t.Fatalf("Timed out waiting for SubConn creation.")
+	case sc0 = <-cc.NewSubConnCh:
+	}
+	if got, want := sc0.Addresses[0].Addr, ring0.items[1].endpointState.firstAddr; got != want {
+		t.Fatalf("SubConn.Address = %v, want = %v", got, want)
+	}
 	select {
 	case <-sc0.ConnectCh:
 	case <-time.After(defaultTestTimeout):
@@ -302,6 +389,9 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) {
 	// Send state updates to Ready.
 	sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
 	sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	if err := cc.WaitForConnectivityState(ctx, connectivity.Ready); err != nil {
+		t.Fatal(err)
+	}
 	// First hash should always pick sc0.
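+	// (The ring is unchanged by these SubConn state updates, so the same hash
+	// keeps mapping to the same endpoint in later pickers.)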
p1 := <-cc.NewPickerCh @@ -318,7 +408,15 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) { if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash2)}); err != balancer.ErrNoSubConnAvailable { t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable) } - sc1 := ring0.items[2].sc.sc.(*testutils.TestSubConn) + var sc1 *testutils.TestSubConn + select { + case <-ctx.Done(): + t.Fatalf("Timed out waiting for SubConn creation.") + case sc1 = <-cc.NewSubConnCh: + } + if got, want := sc1.Addresses[0].Addr, ring0.items[2].endpointState.firstAddr; got != want { + t.Fatalf("SubConn.Address = %v, want = %v", got, want) + } select { case <-sc1.ConnectCh: case <-time.After(defaultTestTimeout): @@ -326,6 +424,7 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) { } sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting}) sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready}) + <-sc1.HealthUpdateDelivered.Done() // With the new generated picker, hash2 always picks sc1. p2 := <-cc.NewPickerCh @@ -353,117 +452,140 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) { // sent. And the new ring should contain the correct number of entries // and weights. func (s) TestAddrWeightChange(t *testing.T) { - addrs := []resolver.Address{ - {Addr: testBackendAddrStrs[0]}, - {Addr: testBackendAddrStrs[1]}, - {Addr: testBackendAddrStrs[2]}, + endpoints := []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, + {Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, + {Addresses: []resolver.Address{{Addr: testBackendAddrStrs[2]}}}, } - cc, b, p0 := setupTest(t, addrs) + cc, b, p0 := setupTest(t, endpoints) ring0 := p0.(*picker).ring - // Update with the same addresses, should not send a new Picker. + // Update with the same addresses, it will result in a new picker, but with + // the same ring. if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: addrs}, + ResolverState: resolver.State{Endpoints: endpoints}, BalancerConfig: testConfig, }); err != nil { t.Fatalf("UpdateClientConnState returned err: %v", err) } + var p1 balancer.Picker select { - case <-cc.NewPickerCh: - t.Fatalf("unexpected picker after UpdateClientConn with the same addresses") - case <-time.After(defaultTestShortTimeout): + case p1 = <-cc.NewPickerCh: + case <-time.After(defaultTestTimeout): + t.Fatalf("timeout waiting for picker after UpdateClientConn with same addresses") + } + ring1 := p1.(*picker).ring + if ring1 != ring0 { + t.Fatalf("new picker with same address has a different ring than before, want same") } // Delete an address, should send a new Picker. 
 	if err := b.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: resolver.State{Addresses: []resolver.Address{
-			{Addr: testBackendAddrStrs[0]},
-			{Addr: testBackendAddrStrs[1]},
-		}},
+		ResolverState:  resolver.State{Endpoints: endpoints[:2]},
 		BalancerConfig: testConfig,
 	}); err != nil {
 		t.Fatalf("UpdateClientConnState returned err: %v", err)
 	}
-	var p1 balancer.Picker
+	var p2 balancer.Picker
 	select {
-	case p1 = <-cc.NewPickerCh:
+	case p2 = <-cc.NewPickerCh:
 	case <-time.After(defaultTestTimeout):
 		t.Fatalf("timeout waiting for picker after UpdateClientConn with different addresses")
 	}
-	ring1 := p1.(*picker).ring
-	if ring1 == ring0 {
+	ring2 := p2.(*picker).ring
+	if ring2 == ring0 {
 		t.Fatalf("new picker after removing address has the same ring as before, want different")
 	}
 
 	// Another update with the same addresses, but different weight.
 	if err := b.UpdateClientConnState(balancer.ClientConnState{
-		ResolverState: resolver.State{Addresses: []resolver.Address{
-			{Addr: testBackendAddrStrs[0]},
-			weightedroundrobin.SetAddrInfo(
-				resolver.Address{Addr: testBackendAddrStrs[1]},
+		ResolverState: resolver.State{Endpoints: []resolver.Endpoint{
+			endpoints[0],
+			weightedroundrobin.SetAddrInfoInEndpoint(
+				endpoints[1],
 				weightedroundrobin.AddrInfo{Weight: 2}),
 		}},
 		BalancerConfig: testConfig,
 	}); err != nil {
 		t.Fatalf("UpdateClientConnState returned err: %v", err)
 	}
-	var p2 balancer.Picker
+	var p3 balancer.Picker
 	select {
-	case p2 = <-cc.NewPickerCh:
+	case p3 = <-cc.NewPickerCh:
 	case <-time.After(defaultTestTimeout):
 		t.Fatalf("timeout waiting for picker after UpdateClientConn with different addresses")
 	}
-	if p2.(*picker).ring == ring1 {
+	if p3.(*picker).ring == ring2 {
 		t.Fatalf("new picker after changing address weight has the same ring as before, want different")
 	}
 
 	// With the new update, the ring must look like this:
 	// [
-	//  {idx:0 sc: {addr: testBackendAddrStrs[0], weight: 1}},
-	//  {idx:1 sc: {addr: testBackendAddrStrs[1], weight: 2}},
-	//  {idx:2 sc: {addr: testBackendAddrStrs[2], weight: 2}},
+	//  {idx:0 endpoint: {addr: testBackendAddrStrs[0], weight: 1}},
+	//  {idx:1 endpoint: {addr: testBackendAddrStrs[1], weight: 2}},
+	//  {idx:2 endpoint: {addr: testBackendAddrStrs[1], weight: 2}},
 	// ].
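+	// Ring entries are allocated in proportion to endpoint weights, so with
+	// weights 1 and 2 the ring is expected to contain three entries in total.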
- if len(p2.(*picker).ring.items) != 3 { - t.Fatalf("new picker after changing address weight has %d entries, want 3", len(p2.(*picker).ring.items)) + if len(p3.(*picker).ring.items) != 3 { + t.Fatalf("new picker after changing address weight has %d entries, want 3", len(p3.(*picker).ring.items)) } - for _, i := range p2.(*picker).ring.items { - if i.sc.addr == testBackendAddrStrs[0] { - if i.sc.weight != 1 { - t.Fatalf("new picker after changing address weight has weight %d for %v, want 1", i.sc.weight, i.sc.addr) + for _, i := range p3.(*picker).ring.items { + if i.endpointState.firstAddr == testBackendAddrStrs[0] { + if i.endpointState.weight != 1 { + t.Fatalf("new picker after changing address weight has weight %d for %v, want 1", i.endpointState.weight, i.endpointState.firstAddr) } } - if i.sc.addr == testBackendAddrStrs[1] { - if i.sc.weight != 2 { - t.Fatalf("new picker after changing address weight has weight %d for %v, want 2", i.sc.weight, i.sc.addr) + if i.endpointState.firstAddr == testBackendAddrStrs[1] { + if i.endpointState.weight != 2 { + t.Fatalf("new picker after changing address weight has weight %d for %v, want 2", i.endpointState.weight, i.endpointState.firstAddr) } } } } -// TestSubConnToConnectWhenOverallTransientFailure covers the situation when the -// overall state is TransientFailure, the SubConns turning Idle will trigger the -// next SubConn in the ring to Connect(). But not when the overall state is not -// TransientFailure. -func (s) TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) { - wantAddrs := []resolver.Address{ - {Addr: testBackendAddrStrs[0]}, - {Addr: testBackendAddrStrs[1]}, - {Addr: testBackendAddrStrs[2]}, - } - _, _, p0 := setupTest(t, wantAddrs) - ring0 := p0.(*picker).ring +// TestAutoConnectEndpointOnTransientFailure covers the situation when an +// endpoint fails. It verifies that a new endpoint is automatically tried +// (without a pick) when there is no endpoint already in Connecting state. +func (s) TestAutoConnectEndpointOnTransientFailure(t *testing.T) { + wantEndpoints := []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: testBackendAddrStrs[0]}}}, + {Addresses: []resolver.Address{{Addr: testBackendAddrStrs[1]}}}, + {Addresses: []resolver.Address{{Addr: testBackendAddrStrs[2]}}}, + {Addresses: []resolver.Address{{Addr: testBackendAddrStrs[3]}}}, + } + cc, _, p0 := setupTest(t, wantEndpoints) // ringhash won't tell SCs to connect until there is an RPC, so simulate // one now. p0.Pick(balancer.PickInfo{Ctx: context.Background()}) - // Turn the first subconn to transient failure. - sc0 := ring0.items[0].sc.sc.(*testutils.TestSubConn) + // The picked SubConn should be the second in the ring. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + var sc0 *testutils.TestSubConn + select { + case <-ctx.Done(): + t.Fatalf("Timed out waiting for SubConn creation.") + case sc0 = <-cc.NewSubConnCh: + } + select { + case <-sc0.ConnectCh: + case <-time.After(defaultTestTimeout): + t.Errorf("timeout waiting for Connect() from SubConn %v", sc0) + } + + // Turn the first subconn to transient failure. This should set the overall + // connectivity state to CONNECTING. 
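+	// (Only one endpoint is in TRANSIENT_FAILURE while there is more than one
+	// endpoint overall, so the aggregation rules report CONNECTING rather than
+	// TRANSIENT_FAILURE.)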
+	sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
 	sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
-	sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
+	cc.WaitForConnectivityState(ctx, connectivity.Connecting)
 
-	// It will trigger the second subconn to connect (because overall state is
-	// Connect (when one subconn is TF)).
-	sc1 := ring0.items[1].sc.sc.(*testutils.TestSubConn)
+	// It will trigger the second endpoint to connect, since the only endpoint
+	// that has attempted to connect so far (the first) is now in TF.
+	var sc1 *testutils.TestSubConn
+	select {
+	case <-ctx.Done():
+		t.Fatalf("Timed out waiting for SubConn creation.")
+	case sc1 = <-cc.NewSubConnCh:
+	}
 	select {
 	case <-sc1.ConnectCh:
 	case <-time.After(defaultTestShortTimeout):
@@ -471,35 +593,49 @@ func (s) TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) {
 	}
 
 	// Turn the second subconn to TF. This will set the overall state to TF.
+	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
 	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
-	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
+	cc.WaitForConnectivityState(ctx, connectivity.TransientFailure)
 
 	// It will trigger the third subconn to connect.
-	sc2 := ring0.items[2].sc.sc.(*testutils.TestSubConn)
+	var sc2 *testutils.TestSubConn
+	select {
+	case <-ctx.Done():
+		t.Fatalf("Timed out waiting for SubConn creation.")
+	case sc2 = <-cc.NewSubConnCh:
+	}
 	select {
 	case <-sc2.ConnectCh:
 	case <-time.After(defaultTestShortTimeout):
 		t.Fatalf("timeout waiting for Connect() from SubConn %v", sc2)
 	}
 
-	// Turn the third subconn to TF. This will set the overall state to TF.
-	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
-	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
+	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
 
-	// It will trigger the first subconn to connect.
+	// Send the first SubConn into CONNECTING. To do this, first make it READY,
+	// then IDLE, and then trigger a pick so that it attempts to connect again.
+	sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	cc.WaitForConnectivityState(ctx, connectivity.Ready)
+	sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
+	// Since one endpoint is in TF and one in CONNECTING, the aggregated state
+	// will be CONNECTING.
+	cc.WaitForConnectivityState(ctx, connectivity.Connecting)
+	p1 := <-cc.NewPickerCh
+	p1.Pick(balancer.PickInfo{Ctx: context.Background()})
 	select {
 	case <-sc0.ConnectCh:
-	case <-time.After(defaultTestShortTimeout):
-		t.Fatalf("timeout waiting for Connect() from SubConn %v", sc0)
+	case <-time.After(defaultTestTimeout):
+		t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
 	}
+	sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
 
-	// Turn the third subconn to TF again.
+	// This will not trigger any new SubConns to be created, because sc0 is
+	// still attempting to connect, and we only need one SubConn to connect.
 	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
-	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
 
-	// This will not trigger any new Connect() on the SubConns, because sc0 is
-	// still attempting to connect, and we only need one SubConn to connect.
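+	// Verify below that no new SubConn is created and that none of the
+	// existing SubConns is asked to connect within a short interval.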
select { + case sc := <-cc.NewSubConnCh: + t.Fatalf("unexpected SubConn creation: %v", sc) case <-sc0.ConnectCh: t.Fatalf("unexpected Connect() from SubConn %v", sc0) case <-sc1.ConnectCh: @@ -510,52 +646,49 @@ func (s) TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) { } } -func (s) TestConnectivityStateEvaluatorRecordTransition(t *testing.T) { +func (s) TestAggregatedConnectivityState(t *testing.T) { tests := []struct { - name string - from, to []connectivity.State - want connectivity.State + name string + endpointStates []connectivity.State + want connectivity.State }{ { - name: "one ready", - from: []connectivity.State{connectivity.Idle}, - to: []connectivity.State{connectivity.Ready}, - want: connectivity.Ready, + name: "one ready", + endpointStates: []connectivity.State{connectivity.Ready}, + want: connectivity.Ready, }, { - name: "one connecting", - from: []connectivity.State{connectivity.Idle}, - to: []connectivity.State{connectivity.Connecting}, - want: connectivity.Connecting, + name: "one connecting", + endpointStates: []connectivity.State{connectivity.Connecting}, + want: connectivity.Connecting, }, { - name: "one ready one transient failure", - from: []connectivity.State{connectivity.Idle, connectivity.Idle}, - to: []connectivity.State{connectivity.Ready, connectivity.TransientFailure}, - want: connectivity.Ready, + name: "one ready one transient failure", + endpointStates: []connectivity.State{connectivity.Ready, connectivity.TransientFailure}, + want: connectivity.Ready, }, { - name: "one connecting one transient failure", - from: []connectivity.State{connectivity.Idle, connectivity.Idle}, - to: []connectivity.State{connectivity.Connecting, connectivity.TransientFailure}, - want: connectivity.Connecting, + name: "one connecting one transient failure", + endpointStates: []connectivity.State{connectivity.Connecting, connectivity.TransientFailure}, + want: connectivity.Connecting, }, { - name: "one connecting two transient failure", - from: []connectivity.State{connectivity.Idle, connectivity.Idle, connectivity.Idle}, - to: []connectivity.State{connectivity.Connecting, connectivity.TransientFailure, connectivity.TransientFailure}, - want: connectivity.TransientFailure, + name: "one connecting two transient failure", + endpointStates: []connectivity.State{connectivity.Connecting, connectivity.TransientFailure, connectivity.TransientFailure}, + want: connectivity.TransientFailure, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cse := &connectivityStateEvaluator{} - var got connectivity.State - for i, fff := range tt.from { - ttt := tt.to[i] - got = cse.recordTransition(fff, ttt) + bal := &ringhashBalancer{endpointStates: resolver.NewEndpointMap()} + for i, cs := range tt.endpointStates { + es := &endpointState{ + state: balancer.State{ConnectivityState: cs}, + } + ep := resolver.Endpoint{Addresses: []resolver.Address{{Addr: fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)}}} + bal.endpointStates.Set(ep, es) } - if got != tt.want { + if got := bal.aggregatedStateLocked(); got != tt.want { t.Errorf("recordTransition() = %v, want %v", got, tt.want) } }) @@ -564,16 +697,38 @@ func (s) TestConnectivityStateEvaluatorRecordTransition(t *testing.T) { // TestAddrBalancerAttributesChange tests the case where the ringhash balancer // receives a ClientConnUpdate with the same config and addresses as received in -// the previous update. Although the `BalancerAttributes` contents are the same, -// the pointer is different. 
This test verifies that subConns are not recreated -// in this scenario. +// the previous update. Although the `BalancerAttributes` and endpoint +// attributes contents are the same, the pointers are different. This test +// verifies that subConns are not recreated in this scenario. func (s) TestAddrBalancerAttributesChange(t *testing.T) { - addrs1 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})} - cc, b, _ := setupTest(t, addrs1) + locality := internal.LocalityID{Region: "americas"} + addrs1 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, locality)} + wantEndpoints1 := []resolver.Endpoint{ + internal.SetLocalityIDInEndpoint(resolver.Endpoint{Addresses: addrs1}, locality), + } + cc, b, p0 := setupTest(t, wantEndpoints1) + ring0 := p0.(*picker).ring + + firstHash := ring0.items[0].hash + // The first pick should be queued, and should trigger a connection to the + // only Endpoint which has a single address. + if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(firstHash)}); err != balancer.ErrNoSubConnAvailable { + t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable) + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + select { + case <-ctx.Done(): + t.Fatalf("Timed out waiting for SubConn creation.") + case <-cc.NewSubConnCh: + } - addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})} + addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, locality)} + wantEndpoints2 := []resolver.Endpoint{ + internal.SetLocalityIDInEndpoint(resolver.Endpoint{Addresses: addrs2}, locality), + } if err := b.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: resolver.State{Addresses: addrs2}, + ResolverState: resolver.State{Endpoints: wantEndpoints2}, BalancerConfig: testConfig, }); err != nil { t.Fatalf("UpdateClientConnState returned err: %v", err)