diff --git a/internal/testutils/blocking_context_dialer.go b/internal/testutils/blocking_context_dialer.go
index 32dfabf36..f99a1a87d 100644
--- a/internal/testutils/blocking_context_dialer.go
+++ b/internal/testutils/blocking_context_dialer.go
@@ -127,3 +127,13 @@ func (h *Hold) Fail(err error) {
 	h.blockCh <- err
 	close(h.blockCh)
 }
+
+// IsStarted returns true if this hold has received a connection attempt.
+func (h *Hold) IsStarted() bool {
+	select {
+	case <-h.waitCh:
+		return true
+	default:
+		return false
+	}
+}
diff --git a/internal/testutils/blocking_context_dialer_test.go b/internal/testutils/blocking_context_dialer_test.go
index d2b595414..eb5d418d4 100644
--- a/internal/testutils/blocking_context_dialer_test.go
+++ b/internal/testutils/blocking_context_dialer_test.go
@@ -59,6 +59,10 @@ func (s) TestBlockingDialer_HoldWaitResume(t *testing.T) {
 	d := NewBlockingDialer()
 	h := d.Hold(lis.Addr().String())
 
+	if h.IsStarted() {
+		t.Fatalf("hold.IsStarted() = true, want false")
+	}
+
 	done := make(chan struct{})
 	ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
 	defer cancel()
@@ -69,6 +73,10 @@ func (s) TestBlockingDialer_HoldWaitResume(t *testing.T) {
 			t.Errorf("BlockingDialer.DialContext() got error: %v, want success", err)
 			return
 		}
+
+		if !h.IsStarted() {
+			t.Errorf("hold.IsStarted() = false, want true")
+		}
 		conn.Close()
 	}()
 
@@ -76,6 +84,11 @@ func (s) TestBlockingDialer_HoldWaitResume(t *testing.T) {
 	if !h.Wait(ctx) {
 		t.Fatalf("Timeout while waiting for a connection attempt to %q", h.addr)
 	}
+
+	if !h.IsStarted() {
+		t.Errorf("hold.IsStarted() = false, want true")
+	}
+
 	select {
 	case <-done:
 		t.Fatalf("Expected dialer to be blocked.")
diff --git a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
index c317cb2f1..2b98ee5ad 100644
--- a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
+++ b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go
@@ -20,6 +20,7 @@ package ringhash_test
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math"
 	"math/rand"
@@ -1566,3 +1567,443 @@ func (s) TestRingHash_ReattemptWhenGoingFromTransientFailureToIdle(t *testing.T)
 		t.Errorf("conn.GetState(): got %v, want %v", got, want)
 	}
 }
+
+// Tests that when all backends are down and then come back up, the picker may
+// initially select a backend in TRANSIENT_FAILURE, but then jumps ahead to a
+// backend that is READY.
+func (s) TestRingHash_TransientFailureSkipToAvailableReady(t *testing.T) {
+	emptyCallF := func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
+		return &testpb.Empty{}, nil
+	}
+	lis, err := testutils.LocalTCPListener()
+	if err != nil {
+		t.Fatalf("Failed to create listener: %v", err)
+	}
+	restartableListener1 := testutils.NewRestartableListener(lis)
+	restartableServer1 := stubserver.StartTestService(t, &stubserver.StubServer{
+		Listener:   restartableListener1,
+		EmptyCallF: emptyCallF,
+	})
+	defer restartableServer1.Stop()
+
+	lis, err = testutils.LocalTCPListener()
+	if err != nil {
+		t.Fatalf("Failed to create listener: %v", err)
+	}
+	restartableListener2 := testutils.NewRestartableListener(lis)
+	restartableServer2 := stubserver.StartTestService(t, &stubserver.StubServer{
+		Listener:   restartableListener2,
+		EmptyCallF: emptyCallF,
+	})
+	defer restartableServer2.Stop()
+
+	nonExistantBackends := makeNonExistentBackends(t, 2)
+
+	const clusterName = "cluster"
+	backends := []string{restartableServer1.Address, restartableServer2.Address}
+	backends = append(backends, nonExistantBackends...)
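+	// The non-existent backends are never brought up; they only contribute
+	// ring entries that can never become READY, giving the picker non-READY
+	// entries to skip over later in the test.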
+	endpoints := endpointResource(t, clusterName, backends)
+	cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+		ClusterName: clusterName,
+		ServiceName: clusterName,
+		Policy:      e2e.LoadBalancingPolicyRingHash,
+	})
+	route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash")
+	listener := e2e.DefaultClientListener(virtualHostName, route.Name)
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t)
+	if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil {
+		t.Fatalf("Failed to update xDS resources: %v", err)
+	}
+	opts := []grpc.DialOption{
+		grpc.WithConnectParams(grpc.ConnectParams{
+			// Disable backoff to speed up the test.
+			MinConnectTimeout: 100 * time.Millisecond,
+		}),
+		grpc.WithResolvers(xdsResolver),
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+	}
+	conn, err := grpc.NewClient("xds:///test.server", opts...)
+	if err != nil {
+		t.Fatalf("Failed to create client: %s", err)
+	}
+	defer conn.Close()
+	client := testgrpc.NewTestServiceClient(conn)
+
+	testutils.AwaitState(ctx, t, conn, connectivity.Idle)
+
+	// The test starts with both backends not listening.
+	restartableListener1.Stop()
+	restartableListener2.Stop()
+
+	// Send a request with a hash that should go to restartableServer1.
+	// Because it is not accepting connections, and no other backend is
+	// listening, the RPC fails.
+	ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", restartableServer1.Address+"_0"))
+	if _, err = client.EmptyCall(ctx, &testpb.Empty{}); err == nil {
+		t.Fatalf("rpc EmptyCall() succeeded, want error")
+	}
+
+	testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure)
+
+	// Bring up the first backend. The channel should become READY without any
+	// picks, because in TRANSIENT_FAILURE the balancer always keeps at least
+	// one subchannel attempting to connect.
+	restartableListener1.Restart()
+	testutils.AwaitState(ctx, t, conn, connectivity.Ready)
+
+	// Bring down backend 1 and bring up backend 2.
+	// The RPC contains a header value that always hashes to backend 1. By
+	// deliberately bringing down backend 1 and bringing up another backend,
+	// we ensure that the picker's first choice (backend 1) fails, forcing it
+	// to walk the remaining ring entries until it finds one in READY. Since
+	// the ring entries are well distributed and the non-existent backends pad
+	// the ring, the picker is almost guaranteed to encounter, and skip, some
+	// non-READY entries along the way, as designed.
+	t.Logf("bringing down backend 1")
+	restartableListener1.Stop()
+
+	testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure)
+	if _, err = client.EmptyCall(ctx, &testpb.Empty{}); err == nil {
+		t.Fatalf("rpc EmptyCall() succeeded, want error")
+	}
+
+	t.Logf("bringing up backend 2")
+	restartableListener2.Restart()
+	testutils.AwaitState(ctx, t, conn, connectivity.Ready)
+
+	wantPeerAddr := ""
+	for wantPeerAddr != restartableServer2.Address {
+		p := peer.Peer{}
+		if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p)); errors.Is(err, context.DeadlineExceeded) {
+			t.Fatalf("Timed out waiting for rpc EmptyCall() to be routed to the expected backend")
+		}
+		wantPeerAddr = p.Addr.String()
+	}
+}
+
+// Tests that when all backends are down, the balancer keeps reattempting to
+// connect.
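+// Once the backend becomes reachable again, the channel should return to
+// READY without requiring a new RPC to trigger the reconnection.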
+func (s) TestRingHash_ReattemptWhenAllEndpointsUnreachable(t *testing.T) {
+	lis, err := testutils.LocalTCPListener()
+	if err != nil {
+		t.Fatalf("Failed to create listener: %v", err)
+	}
+	restartableListener := testutils.NewRestartableListener(lis)
+	restartableServer := stubserver.StartTestService(t, &stubserver.StubServer{
+		Listener: restartableListener,
+		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
+			return &testpb.Empty{}, nil
+		},
+	})
+	defer restartableServer.Stop()
+
+	const clusterName = "cluster"
+	endpoints := endpointResource(t, clusterName, []string{restartableServer.Address})
+	cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+		ClusterName: clusterName,
+		ServiceName: clusterName,
+		Policy:      e2e.LoadBalancingPolicyRingHash,
+	})
+	route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash")
+	listener := e2e.DefaultClientListener(virtualHostName, route.Name)
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t)
+	if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil {
+		t.Fatalf("Failed to update xDS resources: %v", err)
+	}
+
+	dopts := []grpc.DialOption{
+		grpc.WithResolvers(xdsResolver),
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithConnectParams(fastConnectParams),
+	}
+	conn, err := grpc.NewClient("xds:///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("Failed to create client: %s", err)
+	}
+	defer conn.Close()
+	client := testgrpc.NewTestServiceClient(conn)
+
+	testutils.AwaitState(ctx, t, conn, connectivity.Idle)
+
+	t.Log("Stopping the backend server")
+	restartableListener.Stop()
+
+	if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
+		t.Fatalf("rpc EmptyCall() returned error %v, want code Unavailable", err)
+	}
+
+	// Wait for the channel to report TRANSIENT_FAILURE.
+	testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure)
+
+	t.Log("Restarting the backend server")
+	restartableListener.Restart()
+
+	// Wait for the channel to become READY without any pending RPC.
+	testutils.AwaitState(ctx, t, conn, connectivity.Ready)
+}
+
+// Tests that when a backend goes down, traffic moves to an endpoint in the
+// lower-priority locality, and moves back to the original backend once it
+// comes back up.
+func (s) TestRingHash_SwitchToLowerPriorityAndThenBack(t *testing.T) {
+	lis, err := testutils.LocalTCPListener()
+	if err != nil {
+		t.Fatalf("Failed to create listener: %v", err)
+	}
+	restartableListener := testutils.NewRestartableListener(lis)
+	restartableServer := stubserver.StartTestService(t, &stubserver.StubServer{
+		Listener: restartableListener,
+		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
+			return &testpb.Empty{}, nil
+		},
+	})
+	defer restartableServer.Stop()
+
+	otherBackend := startTestServiceBackends(t, 1)[0]
+
+	// We must set the host name used in the EDS socket addresses, as the ring
+	// hash policy uses the address to construct the ring.
+	host, _, err := net.SplitHostPort(otherBackend)
+	if err != nil {
+		t.Fatalf("Failed to split host and port from stubserver: %v", err)
+	}
+
+	const clusterName = "cluster"
+	endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
+		ClusterName: clusterName,
+		Host:        host,
+		Localities: []e2e.LocalityOptions{{
+			Backends: backendOptions(t, []string{restartableServer.Address}),
+			Weight:   1,
+		}, {
+			Backends: backendOptions(t, []string{otherBackend}),
+			Weight:   1,
+			Priority: 1,
+		}}})
+	cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+		ClusterName: clusterName,
+		ServiceName: clusterName,
+		Policy:      e2e.LoadBalancingPolicyRingHash,
+	})
+	route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash")
+	listener := e2e.DefaultClientListener(virtualHostName, route.Name)
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t)
+	if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil {
+		t.Fatalf("Failed to update xDS resources: %v", err)
+	}
+
+	dopts := []grpc.DialOption{
+		grpc.WithResolvers(xdsResolver),
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithConnectParams(fastConnectParams),
+	}
+	conn, err := grpc.NewClient("xds:///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("Failed to create client: %s", err)
+	}
+	defer conn.Close()
+	client := testgrpc.NewTestServiceClient(conn)
+
+	// The RPCs sent in this test carry a header value that always hashes to
+	// restartableServer's address on the ring.
+	ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", restartableServer.Address+"_0"))
+	var got string
+	for got = range checkRPCSendOK(ctx, t, client, 1) {
+	}
+	if want := restartableServer.Address; got != want {
+		t.Fatalf("Got RPC routed to addr %v, want %v", got, want)
+	}
+
+	// Stop the backend in the higher-priority locality. This should cause the
+	// ring hash balancer to report TRANSIENT_FAILURE and the priority
+	// balancer to fail over to the lower-priority locality.
+	restartableListener.Stop()
+
+	for {
+		p := peer.Peer{}
+		_, err = client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p))
+
+		// Ignore errors: we may need to send an RPC to detect the failure
+		// (the next write on the connection fails).
+		if err == nil {
+			if got, want := p.Addr.String(), otherBackend; got != want {
+				t.Fatalf("Got RPC routed to addr %v, want %v", got, want)
+			}
+			break
+		}
+	}
+
+	// Restart the backend whose address hash is carried in the request
+	// metadata. RPCs should eventually be routed to it again, since it is in
+	// the higher-priority locality.
+	peerAddr := ""
+	restartableListener.Restart()
+	for peerAddr != restartableServer.Address {
+		p := peer.Peer{}
+		_, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p))
+		if errors.Is(err, context.DeadlineExceeded) {
+			t.Fatalf("Timed out waiting for rpc EmptyCall() to be routed to the expected backend")
+		}
+		peerAddr = p.Addr.String()
+	}
+}
+
+// Tests that internally triggered connection attempts, i.e. connection
+// attempts that are not caused by a pick, are made to only one subchannel at
+// a time.
+func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *testing.T) {
+	backends := startTestServiceBackends(t, 1)
+	nonExistantBackends := makeNonExistentBackends(t, 3)
+
+	const clusterName = "cluster"
+
+	endpoints := endpointResource(t, clusterName, append(nonExistantBackends, backends...))
+	cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+		ClusterName: clusterName,
+		ServiceName: clusterName,
+		Policy:      e2e.LoadBalancingPolicyRingHash,
+	})
+	route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash")
+	listener := e2e.DefaultClientListener(virtualHostName, route.Name)
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t)
+	if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil {
+		t.Fatalf("Failed to update xDS resources: %v", err)
+	}
+
+	dialer := testutils.NewBlockingDialer()
+	dialOpts := []grpc.DialOption{
+		grpc.WithResolvers(xdsResolver),
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithContextDialer(dialer.DialContext),
+		grpc.WithConnectParams(fastConnectParams),
+	}
+	conn, err := grpc.NewClient("xds:///test.server", dialOpts...)
+	if err != nil {
+		t.Fatalf("Failed to create client: %s", err)
+	}
+	defer conn.Close()
+	client := testgrpc.NewTestServiceClient(conn)
+
+	holdNonExistant0 := dialer.Hold(nonExistantBackends[0])
+	holdNonExistant1 := dialer.Hold(nonExistantBackends[1])
+	holdNonExistant2 := dialer.Hold(nonExistantBackends[2])
+	holdGood := dialer.Hold(backends[0])
+
+	rpcCtx, rpcCancel := context.WithCancel(ctx)
+	errCh := make(chan error, 1)
+	go func() {
+		rpcCtx = metadata.NewOutgoingContext(rpcCtx, metadata.Pairs("address_hash", nonExistantBackends[0]+"_0"))
+		_, err := client.EmptyCall(rpcCtx, &testpb.Empty{})
+		if status.Code(err) == codes.Canceled {
+			errCh <- nil
+			return
+		}
+		errCh <- err
+	}()
+
+	// Wait for the RPC to trigger a connection attempt to the first address,
+	// then cancel the RPC. No other connection attempts should be started yet.
+	if !holdNonExistant0.Wait(ctx) {
+		t.Fatalf("Timeout waiting for connection attempt to backend 0")
+	}
+	rpcCancel()
+	if err := <-errCh; err != nil {
+		t.Fatalf("Expected RPC to be canceled, got error: %v", err)
+	}
+
+	// Since the connection attempt to the first address is still blocked, no
+	// other connection attempts should be started yet.
+	if holdNonExistant1.IsStarted() {
+		t.Errorf("Got connection attempt to backend 1, expected no connection attempt.")
+	}
+	if holdNonExistant2.IsStarted() {
+		t.Errorf("Got connection attempt to backend 2, expected no connection attempt.")
+	}
+	if holdGood.IsStarted() {
+		t.Errorf("Got connection attempt to good backend, expected no connection attempt.")
+	}
+
+	// Allow the connection attempt to the first address to resume and wait for
+	// the attempt for the second address. No other connection attempts should
+	// be started yet.
+	holdNonExistant0Again := dialer.Hold(nonExistantBackends[0])
+	holdNonExistant0.Resume()
+	if !holdNonExistant1.Wait(ctx) {
+		t.Fatalf("Timeout waiting for connection attempt to backend 1")
+	}
+	if holdNonExistant0Again.IsStarted() {
+		t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.")
+	}
+	if holdNonExistant2.IsStarted() {
+		t.Errorf("Got connection attempt to backend 2, expected no connection attempt.")
+	}
+	if holdGood.IsStarted() {
+		t.Errorf("Got connection attempt to good backend, expected no connection attempt.")
+	}
+
+	// Allow the connection attempt to the second address to resume and wait for
+	// the attempt for the third address. No other connection attempts should
+	// be started yet.
+	holdNonExistant1Again := dialer.Hold(nonExistantBackends[1])
+	holdNonExistant1.Resume()
+	if !holdNonExistant2.Wait(ctx) {
+		t.Fatalf("Timeout waiting for connection attempt to backend 2")
+	}
+	if holdNonExistant0Again.IsStarted() {
+		t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.")
+	}
+	if holdNonExistant1Again.IsStarted() {
+		t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.")
+	}
+	if holdGood.IsStarted() {
+		t.Errorf("Got connection attempt to good backend, expected no connection attempt.")
+	}
+
+	// Allow the connection attempt to the third address to resume and wait
+	// for the attempt for the final address. No other connection attempts
+	// should be started yet.
+	holdNonExistant2Again := dialer.Hold(nonExistantBackends[2])
+	holdNonExistant2.Resume()
+	if !holdGood.Wait(ctx) {
+		t.Fatalf("Timeout waiting for connection attempt to good backend")
+	}
+	if holdNonExistant0Again.IsStarted() {
+		t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.")
+	}
+	if holdNonExistant1Again.IsStarted() {
+		t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.")
+	}
+	if holdNonExistant2Again.IsStarted() {
+		t.Errorf("Got connection attempt to backend 2 again, expected no connection attempt.")
+	}
+
+	// Allow the final attempt to resume.
+	holdGood.Resume()
+
+	// Wait for channel to become connected without any pending RPC.
+	testutils.AwaitState(ctx, t, conn, connectivity.Ready)
+
+	// No other connection attempts should have been started
+	if holdNonExistant0Again.IsStarted() {
+		t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.")
+	}
+	if holdNonExistant1Again.IsStarted() {
+		t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.")
+	}
+	if holdNonExistant2Again.IsStarted() {
+		t.Errorf("Got connection attempt to backend 2 again, expected no connection attempt.")
+	}
+}
diff --git a/xds/internal/balancer/ringhash/picker.go b/xds/internal/balancer/ringhash/picker.go
index b450716fa..5ce72cade 100644
--- a/xds/internal/balancer/ringhash/picker.go
+++ b/xds/internal/balancer/ringhash/picker.go
@@ -159,28 +159,3 @@ func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry {
 	// There's no qualifying next entry.
 	return nil
 }
-
-// nextSkippingDuplicatesSubConn finds the next subconn in the ring, that's
-// different from the given subconn.
-func nextSkippingDuplicatesSubConn(ring *ring, sc *subConn) *subConn {
-	var entry *ringEntry
-	for _, it := range ring.items {
-		if it.sc == sc {
-			entry = it
-			break
-		}
-	}
-	if entry == nil {
-		// If the given subconn is not in the ring (e.g. it was deleted), return
-		// the first one.
-		if len(ring.items) > 0 {
-			return ring.items[0].sc
-		}
-		return nil
-	}
-	ee := nextSkippingDuplicates(ring, entry)
-	if ee == nil {
-		return nil
-	}
-	return ee.sc
-}
diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go
index e63c6f653..8cc7fd534 100644
--- a/xds/internal/balancer/ringhash/ringhash.go
+++ b/xds/internal/balancer/ringhash/ringhash.go
@@ -46,10 +46,11 @@ type bb struct{}
 
 func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
 	b := &ringhashBalancer{
-		cc:       cc,
-		subConns: resolver.NewAddressMap(),
-		scStates: make(map[balancer.SubConn]*subConn),
-		csEvltr:  &connectivityStateEvaluator{},
+		cc:              cc,
+		subConns:        resolver.NewAddressMap(),
+		scStates:        make(map[balancer.SubConn]*subConn),
+		csEvltr:         &connectivityStateEvaluator{},
+		orderedSubConns: make([]*subConn, 0),
 	}
 	b.logger = prefixLogger(b)
 	b.logger.Infof("Created")
@@ -197,6 +198,14 @@ type ringhashBalancer struct {
 	resolverErr error // the last error reported by the resolver; cleared on successful resolution
 	connErr     error // the last connection error; cleared upon leaving TransientFailure
+
+	// orderedSubConns contains the list of subconns in the order that addresses
+	// appear from the resolver. Together with lastInternallyTriggeredSCIndex,
+	// this allows triggering connection attempts to all SubConns independently
+	// of the order they appear on the ring. Always in sync with ring and
+	// subConns. The index is reset when addresses change.
+	orderedSubConns                []*subConn
+	lastInternallyTriggeredSCIndex int
 }
 
 // updateAddresses creates new SubConns and removes SubConns, based on the
@@ -214,6 +223,9 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
 	var addrsUpdated bool
 	// addrsSet is the set converted from addrs, used for quick lookup.
 	addrsSet := resolver.NewAddressMap()
+
+	b.orderedSubConns = b.orderedSubConns[:0] // reuse the underlying array.
+
 	for _, addr := range addrs {
 		addrsSet.Set(addr, true)
 		newWeight := getWeightAttribute(addr)
@@ -234,6 +246,7 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
 			b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
 			b.subConns.Set(addr, scs)
 			b.scStates[sc] = scs
+			b.orderedSubConns = append(b.orderedSubConns, scs)
 			addrsUpdated = true
 		} else {
 			// We have seen this address before and created a subConn for it. If the
@@ -244,6 +257,7 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
 			// since *only* the weight attribute has changed, and that does not affect
 			// subConn uniqueness.
 			scInfo := val.(*subConn)
+			b.orderedSubConns = append(b.orderedSubConns, scInfo)
 			if oldWeight := scInfo.weight; oldWeight != newWeight {
 				scInfo.weight = newWeight
 				b.subConns.Set(addr, scInfo)
@@ -264,6 +278,9 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
 			// The entry will be deleted in updateSubConnState.
 		}
 	}
+	if addrsUpdated {
+		b.lastInternallyTriggeredSCIndex = 0
+	}
 	return addrsUpdated
 }
 
@@ -399,19 +416,11 @@ func (b *ringhashBalancer) updateSubConnState(sc balancer.SubConn, state balance
 				return
 			}
 		}
-		// Trigger a SubConn (this updated SubConn's next SubConn in the ring)
-		// to connect if nobody is attempting to connect.
-		sc := nextSkippingDuplicatesSubConn(b.ring, scs)
-		if sc != nil {
-			sc.queueConnect()
-			return
-		}
-		// This handles the edge case where we have a single subConn in the
-		// ring. nextSkippingDuplicatesSubCon() would have returned nil. We
-		// still need to ensure that some subConn is attempting to connect, in
-		// order to give the LB policy a chance to move out of
-		// TRANSIENT_FAILURE. Hence, we try connecting on the current subConn.
-		scs.queueConnect()
+
+		// Trigger a SubConn (the next in the order addresses appear in the
+		// resolver) to connect if nobody is attempting to connect.
+		b.lastInternallyTriggeredSCIndex = (b.lastInternallyTriggeredSCIndex + 1) % len(b.orderedSubConns)
+		b.orderedSubConns[b.lastInternallyTriggeredSCIndex].queueConnect()
 	}
 }