ringhash: fix bug where ring hash can be stuck in transient failure despite having available endpoints (#7364)

This commit is contained in:
Antoine Tollenaere 2024-08-20 19:39:14 +02:00 committed by GitHub
parent 1e2bb717e0
commit ee5cbce343
5 changed files with 490 additions and 42 deletions

View File

@ -127,3 +127,13 @@ func (h *Hold) Fail(err error) {
h.blockCh <- err
close(h.blockCh)
}
// IsStarted returns true if this hold has received a connection attempt.
func (h *Hold) IsStarted() bool {
select {
case <-h.waitCh:
return true
default:
return false
}
}
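
For reference, a condensed, illustrative sketch (not part of the commit) of where IsStarted fits in the hold lifecycle. It only uses the BlockingDialer API visible in this diff (NewBlockingDialer, Hold, Wait, IsStarted, Resume, DialContext) and assumes it runs in the same testutils package with the usual context/net imports available:

// holdLifecycleSketch shows the intended ordering: IsStarted reports false
// before any dial reaches the hold, and true once an attempt has been
// observed; Resume then unblocks the held dial.
func holdLifecycleSketch(ctx context.Context, addr string) {
	d := NewBlockingDialer()
	h := d.Hold(addr)

	_ = h.IsStarted() // false: no connection attempt has reached the hold yet

	go func() {
		// The dial blocks until the hold is resumed (or the context expires).
		if conn, err := d.DialContext(ctx, addr); err == nil {
			conn.Close()
		}
	}()

	if h.Wait(ctx) { // blocks until the connection attempt reaches the hold
		_ = h.IsStarted() // true: the attempt has been observed
	}
	h.Resume() // let the blocked dial proceed
}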

View File

@ -59,6 +59,10 @@ func (s) TestBlockingDialer_HoldWaitResume(t *testing.T) {
d := NewBlockingDialer()
h := d.Hold(lis.Addr().String())
if h.IsStarted() {
t.Fatalf("hold.IsStarted() = true, want false")
}
done := make(chan struct{})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
@ -69,6 +73,10 @@ func (s) TestBlockingDialer_HoldWaitResume(t *testing.T) {
t.Errorf("BlockingDialer.DialContext() got error: %v, want success", err)
return
}
if !h.IsStarted() {
t.Errorf("hold.IsStarted() = false, want true")
}
conn.Close()
}()
@ -76,6 +84,11 @@ func (s) TestBlockingDialer_HoldWaitResume(t *testing.T) {
if !h.Wait(ctx) {
t.Fatalf("Timeout while waiting for a connection attempt to %q", h.addr)
}
if !h.IsStarted() {
t.Errorf("hold.IsStarted() = false, want true")
}
select {
case <-done:
t.Fatalf("Expected dialer to be blocked.")

View File

@ -20,6 +20,7 @@ package ringhash_test
import (
"context"
"errors"
"fmt"
"math"
"math/rand"
@ -1566,3 +1567,443 @@ func (s) TestRingHash_ReattemptWhenGoingFromTransientFailureToIdle(t *testing.T)
t.Errorf("conn.GetState(): got %v, want %v", got, want)
}
}
// Tests that when all backends are down and then come back up, the picker may
// first select a backend in TRANSIENT_FAILURE and then skip ahead to a READY
// backend.
func (s) TestRingHash_TransientFailureSkipToAvailableReady(t *testing.T) {
emptyCallF := func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
}
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("Failed to create listener: %v", err)
}
restartableListener1 := testutils.NewRestartableListener(lis)
restartableServer1 := stubserver.StartTestService(t, &stubserver.StubServer{
Listener: restartableListener1,
EmptyCallF: emptyCallF,
})
defer restartableServer1.Stop()
lis, err = testutils.LocalTCPListener()
if err != nil {
t.Fatalf("Failed to create listener: %v", err)
}
restartableListener2 := testutils.NewRestartableListener(lis)
restartableServer2 := stubserver.StartTestService(t, &stubserver.StubServer{
Listener: restartableListener2,
EmptyCallF: emptyCallF,
})
defer restartableServer2.Stop()
nonExistantBackends := makeNonExistentBackends(t, 2)
const clusterName = "cluster"
backends := []string{restartableServer1.Address, restartableServer2.Address}
backends = append(backends, nonExistantBackends...)
endpoints := endpointResource(t, clusterName, backends)
cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
ClusterName: clusterName,
ServiceName: clusterName,
Policy: e2e.LoadBalancingPolicyRingHash,
})
route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash")
listener := e2e.DefaultClientListener(virtualHostName, route.Name)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t)
if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil {
t.Fatalf("Failed to update xDS resources: %v", err)
}
opts := []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{
// Disable backoff to speed up the test.
MinConnectTimeout: 100 * time.Millisecond,
}),
grpc.WithResolvers(xdsResolver),
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
conn, err := grpc.NewClient("xds:///test.server", opts...)
if err != nil {
t.Fatalf("Failed to create client: %s", err)
}
defer conn.Close()
client := testgrpc.NewTestServiceClient(conn)
testutils.AwaitState(ctx, t, conn, connectivity.Idle)
// Test starts with backends not listening.
restartableListener1.Stop()
restartableListener2.Stop()
// Send a request with a hash that should go to restartableServer1.
// Because it is not accepting connections, and no other backend is
// listening, the RPC fails.
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", restartableServer1.Address+"_0"))
if _, err = client.EmptyCall(ctx, &testpb.Empty{}); err == nil {
t.Fatalf("rpc EmptyCall() succeeded, want error")
}
testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure)
// Bring up the first backend. The channel should become Ready without any
// picks, because in TRANSIENT_FAILURE we always keep at least one SubConn
// attempting to connect.
restartableListener1.Restart()
testutils.AwaitState(ctx, t, conn, connectivity.Ready)
// Bring down backend 1 and bring up backend 2.
// Note the RPC contains a header value that will always be hashed to
// backend 1. So by purposely bringing down backend 1 and bringing up
// another backend, this will ensure Picker's first choice of backend 1
// fails and it will go through the remaining subchannels to find one in
// READY. Since the entries in the ring are pretty distributed and we have
// unused ports to fill the ring, it is almost guaranteed that the Picker
// will go through some non-READY entries and skip them as per design.
t.Logf("bringing down backend 1")
restartableListener1.Stop()
testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure)
if _, err = client.EmptyCall(ctx, &testpb.Empty{}); err == nil {
t.Fatalf("rpc EmptyCall() succeeded, want error")
}
t.Logf("bringing up backend 2")
restartableListener2.Restart()
testutils.AwaitState(ctx, t, conn, connectivity.Ready)
wantPeerAddr := ""
for wantPeerAddr != restartableServer2.Address {
p := peer.Peer{}
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p)); errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("Timed out waiting for rpc EmptyCall() to be routed to the expected backend")
}
gotPeerAddr = p.Addr.String()
}
}
// Tests that when all backends are down, the balancer keeps reattempting to
// connect.
func (s) TestRingHash_ReattemptWhenAllEndpointsUnreachable(t *testing.T) {
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("Failed to create listener: %v", err)
}
restartableListener := testutils.NewRestartableListener(lis)
restartableServer := stubserver.StartTestService(t, &stubserver.StubServer{
Listener: restartableListener,
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
})
defer restartableServer.Stop()
const clusterName = "cluster"
endpoints := endpointResource(t, clusterName, []string{restartableServer.Address})
cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
ClusterName: clusterName,
ServiceName: clusterName,
Policy: e2e.LoadBalancingPolicyRingHash,
})
route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash")
listener := e2e.DefaultClientListener(virtualHostName, route.Name)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t)
if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil {
t.Fatalf("Failed to update xDS resources: %v", err)
}
dopts := []grpc.DialOption{
grpc.WithResolvers(xdsResolver),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithConnectParams(fastConnectParams),
}
conn, err := grpc.NewClient("xds:///test.server", dopts...)
if err != nil {
t.Fatalf("Failed to create client: %s", err)
}
defer conn.Close()
client := testgrpc.NewTestServiceClient(conn)
testutils.AwaitState(ctx, t, conn, connectivity.Idle)
t.Log("Stopping the backend server")
restartableListener.Stop()
if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
t.Fatalf("rpc EmptyCall() succeeded, want Unavailable error")
}
// Wait for channel to fail.
testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure)
t.Log("Restarting the backend server")
restartableListener.Restart()
// Wait for channel to become connected without any pending RPC.
testutils.AwaitState(ctx, t, conn, connectivity.Ready)
}
// Tests that when a backend goes down, traffic moves to an endpoint in a
// lower-priority locality, and moves back when the backend comes back up.
func (s) TestRingHash_SwitchToLowerPriorityAndThenBack(t *testing.T) {
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("Failed to create listener: %v", err)
}
restartableListener := testutils.NewRestartableListener(lis)
restartableServer := stubserver.StartTestService(t, &stubserver.StubServer{
Listener: restartableListener,
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
})
defer restartableServer.Stop()
otherBackend := startTestServiceBackends(t, 1)[0]
// We must set the host name in the EDS socket address, as the ring hash
// policy uses it to construct the ring.
host, _, err := net.SplitHostPort(otherBackend)
if err != nil {
t.Fatalf("Failed to split host and port from stubserver: %v", err)
}
const clusterName = "cluster"
endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
ClusterName: clusterName,
Host: host,
Localities: []e2e.LocalityOptions{{
Backends: backendOptions(t, []string{restartableServer.Address}),
Weight: 1,
}, {
Backends: backendOptions(t, []string{otherBackend}),
Weight: 1,
Priority: 1,
}}})
cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
ClusterName: clusterName,
ServiceName: clusterName,
Policy: e2e.LoadBalancingPolicyRingHash,
})
route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash")
listener := e2e.DefaultClientListener(virtualHostName, route.Name)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t)
if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil {
t.Fatalf("Failed to update xDS resources: %v", err)
}
dopts := []grpc.DialOption{
grpc.WithResolvers(xdsResolver),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithConnectParams(fastConnectParams),
}
conn, err := grpc.NewClient("xds:///test.server", dopts...)
if err != nil {
t.Fatalf("Failed to create client: %s", err)
}
defer conn.Close()
client := testgrpc.NewTestServiceClient(conn)
// Note the RPC contains a header value that hashes to the ring entry created
// for the restartable backend's address.
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", restartableServer.Address+"_0"))
var got string
for got = range checkRPCSendOK(ctx, t, client, 1) {
}
if want := restartableServer.Address; got != want {
t.Fatalf("Got RPC routed to addr %v, want %v", got, want)
}
// Trigger a failure on the existing backend, which should cause the ring hash
// balancer to go into transient failure and the priority balancer to move to
// the lower priority.
restartableListener.Stop()
for {
p := peer.Peer{}
_, err = client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p))
// Ignore errors: we may need to attempt to send an RPC to detect the
// failure (the next write on the connection fails).
if err == nil {
if got, want := p.Addr.String(), otherBackend; got != want {
t.Fatalf("Got RPC routed to addr %v, want %v", got, want)
}
break
}
}
// Now restart the backend whose address hash is used in the metadata. RPCs
// should eventually be routed to it, since it is in the locality with the
// higher priority.
peerAddr := ""
restartableListener.Restart()
for peerAddr != restartableServer.Address {
p := peer.Peer{}
_, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p))
if errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("Timed out waiting for rpc EmptyCall() to be routed to the expected backend")
}
peerAddr = p.Addr.String()
}
}
// Tests that when we trigger internal connection attempts without picks, we do
// so for only one subchannel at a time.
func (s) TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime(t *testing.T) {
backends := startTestServiceBackends(t, 1)
nonExistantBackends := makeNonExistentBackends(t, 3)
const clusterName = "cluster"
endpoints := endpointResource(t, clusterName, append(nonExistantBackends, backends...))
cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
ClusterName: clusterName,
ServiceName: clusterName,
Policy: e2e.LoadBalancingPolicyRingHash,
})
route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash")
listener := e2e.DefaultClientListener(virtualHostName, route.Name)
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t)
if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil {
t.Fatalf("Failed to update xDS resources: %v", err)
}
dialer := testutils.NewBlockingDialer()
dialOpts := []grpc.DialOption{
grpc.WithResolvers(xdsResolver),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(dialer.DialContext),
grpc.WithConnectParams(fastConnectParams),
}
conn, err := grpc.NewClient("xds:///test.server", dialOpts...)
if err != nil {
t.Fatalf("Failed to create client: %s", err)
}
defer conn.Close()
client := testgrpc.NewTestServiceClient(conn)
holdNonExistant0 := dialer.Hold(nonExistantBackends[0])
holdNonExistant1 := dialer.Hold(nonExistantBackends[1])
holdNonExistant2 := dialer.Hold(nonExistantBackends[2])
holdGood := dialer.Hold(backends[0])
rpcCtx, rpcCancel := context.WithCancel(ctx)
errCh := make(chan error, 1)
go func() {
rpcCtx = metadata.NewOutgoingContext(rpcCtx, metadata.Pairs("address_hash", nonExistantBackends[0]+"_0"))
_, err := client.EmptyCall(rpcCtx, &testpb.Empty{})
if status.Code(err) == codes.Canceled {
errCh <- nil
return
}
errCh <- err
}()
// Wait for the RPC to trigger a connection attempt to the first address,
// then cancel the RPC. No other connection attempts should be started yet.
if !holdNonExistant0.Wait(ctx) {
t.Fatalf("Timeout waiting for connection attempt to backend 0")
}
rpcCancel()
if err := <-errCh; err != nil {
t.Fatalf("Expected RPC to fail be canceled, got %v", err)
}
// Since the connection attempt to the first address is still blocked, no
// other connection attempts should be started yet.
if holdNonExistant1.IsStarted() {
t.Errorf("Got connection attempt to backend 1, expected no connection attempt.")
}
if holdNonExistant2.IsStarted() {
t.Errorf("Got connection attempt to backend 2, expected no connection attempt.")
}
if holdGood.IsStarted() {
t.Errorf("Got connection attempt to good backend, expected no connection attempt.")
}
// Allow the connection attempt to the first address to resume and wait for
// the attempt for the second address. No other connection attempts should
// be started yet.
holdNonExistant0Again := dialer.Hold(nonExistantBackends[0])
holdNonExistant0.Resume()
if !holdNonExistant1.Wait(ctx) {
t.Fatalf("Timeout waiting for connection attempt to backend 1")
}
if holdNonExistant0Again.IsStarted() {
t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.")
}
if holdNonExistant2.IsStarted() {
t.Errorf("Got connection attempt to backend 2, expected no connection attempt.")
}
if holdGood.IsStarted() {
t.Errorf("Got connection attempt to good backend, expected no connection attempt.")
}
// Allow the connection attempt to the second address to resume and wait for
// the attempt for the third address. No other connection attempts should
// be started yet.
holdNonExistant1Again := dialer.Hold(nonExistantBackends[1])
holdNonExistant1.Resume()
if !holdNonExistant2.Wait(ctx) {
t.Fatalf("Timeout waiting for connection attempt to backend 2")
}
if holdNonExistant0Again.IsStarted() {
t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.")
}
if holdNonExistant1Again.IsStarted() {
t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.")
}
if holdGood.IsStarted() {
t.Errorf("Got connection attempt to good backend, expected no connection attempt.")
}
// Allow the connection attempt to the third address to resume and wait
// for the attempt for the final address. No other connection attempts
// should be started yet.
holdNonExistant2Again := dialer.Hold(nonExistantBackends[2])
holdNonExistant2.Resume()
if !holdGood.Wait(ctx) {
t.Fatalf("Timeout waiting for connection attempt to good backend")
}
if holdNonExistant0Again.IsStarted() {
t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.")
}
if holdNonExistant1Again.IsStarted() {
t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.")
}
if holdNonExistant2Again.IsStarted() {
t.Errorf("Got connection attempt to backend 2 again, expected no connection attempt.")
}
// Allow the final attempt to resume.
holdGood.Resume()
// Wait for channel to become connected without any pending RPC.
testutils.AwaitState(ctx, t, conn, connectivity.Ready)
// No other connection attempts should have been started.
if holdNonExistant0Again.IsStarted() {
t.Errorf("Got connection attempt to backend 0 again, expected no connection attempt.")
}
if holdNonExistant1Again.IsStarted() {
t.Errorf("Got connection attempt to backend 1 again, expected no connection attempt.")
}
if holdNonExistant2Again.IsStarted() {
t.Errorf("Got connection attempt to backend 2 again, expected no connection attempt.")
}
}

View File

@ -159,28 +159,3 @@ func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry {
// There's no qualifying next entry.
return nil
}
// nextSkippingDuplicatesSubConn finds the next subconn in the ring, that's
// different from the given subconn.
func nextSkippingDuplicatesSubConn(ring *ring, sc *subConn) *subConn {
var entry *ringEntry
for _, it := range ring.items {
if it.sc == sc {
entry = it
break
}
}
if entry == nil {
// If the given subconn is not in the ring (e.g. it was deleted), return
// the first one.
if len(ring.items) > 0 {
return ring.items[0].sc
}
return nil
}
ee := nextSkippingDuplicates(ring, entry)
if ee == nil {
return nil
}
return ee.sc
}

View File

@ -46,10 +46,11 @@ type bb struct{}
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &ringhashBalancer{
cc: cc,
subConns: resolver.NewAddressMap(),
scStates: make(map[balancer.SubConn]*subConn),
csEvltr: &connectivityStateEvaluator{},
cc: cc,
subConns: resolver.NewAddressMap(),
scStates: make(map[balancer.SubConn]*subConn),
csEvltr: &connectivityStateEvaluator{},
orderedSubConns: make([]*subConn, 0),
}
b.logger = prefixLogger(b)
b.logger.Infof("Created")
@ -197,6 +198,14 @@ type ringhashBalancer struct {
resolverErr error // the last error reported by the resolver; cleared on successful resolution
connErr error // the last connection error; cleared upon leaving TransientFailure
// orderedSubConns contains the list of subconns in the order that addresses
// appear from the resolver. Together with lastInternallyTriggeredSCIndex,
// this allows triggering connection attempts to all SubConns independently
// of the order they appear on the ring. Always in sync with ring and
// subConns. The index is reset when addresses change.
orderedSubConns []*subConn
lastInternallyTriggeredSCIndex int
}
// updateAddresses creates new SubConns and removes SubConns, based on the
@ -214,6 +223,9 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
var addrsUpdated bool
// addrsSet is the set converted from addrs, used for quick lookup.
addrsSet := resolver.NewAddressMap()
b.orderedSubConns = b.orderedSubConns[:0] // reuse the underlying array.
for _, addr := range addrs {
addrsSet.Set(addr, true)
newWeight := getWeightAttribute(addr)
@ -234,6 +246,7 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
b.subConns.Set(addr, scs)
b.scStates[sc] = scs
b.orderedSubConns = append(b.orderedSubConns, scs)
addrsUpdated = true
} else {
// We have seen this address before and created a subConn for it. If the
@ -244,6 +257,7 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
// since *only* the weight attribute has changed, and that does not affect
// subConn uniqueness.
scInfo := val.(*subConn)
b.orderedSubConns = append(b.orderedSubConns, scInfo)
if oldWeight := scInfo.weight; oldWeight != newWeight {
scInfo.weight = newWeight
b.subConns.Set(addr, scInfo)
@ -264,6 +278,9 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
// The entry will be deleted in updateSubConnState.
}
}
if addrsUpdated {
b.lastInternallyTriggeredSCIndex = 0
}
return addrsUpdated
}
@ -399,19 +416,11 @@ func (b *ringhashBalancer) updateSubConnState(sc balancer.SubConn, state balance
return
}
}
// Trigger a SubConn (this updated SubConn's next SubConn in the ring)
// to connect if nobody is attempting to connect.
sc := nextSkippingDuplicatesSubConn(b.ring, scs)
if sc != nil {
sc.queueConnect()
return
}
// This handles the edge case where we have a single subConn in the
// ring. nextSkippingDuplicatesSubCon() would have returned nil. We
// still need to ensure that some subConn is attempting to connect, in
// order to give the LB policy a chance to move out of
// TRANSIENT_FAILURE. Hence, we try connecting on the current subConn.
scs.queueConnect()
// Trigger a connection attempt on the next SubConn, in the order their
// addresses appear from the resolver, if nobody is attempting to connect.
b.lastInternallyTriggeredSCIndex = (b.lastInternallyTriggeredSCIndex + 1) % len(b.orderedSubConns)
b.orderedSubConns[b.lastInternallyTriggeredSCIndex].queueConnect()
}
}
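
For context, a minimal illustrative sketch of the new triggering strategy follows. It is not part of the commit; the types are simplified stand-ins for the balancer's internals (only orderedSubConns and lastInternallyTriggeredSCIndex come from the diff above). The rotation is the point: by cycling through SubConns in the order their addresses appear from the resolver, rather than following the ring neighbor of whichever SubConn last failed, every endpoint is eventually asked to connect, so the channel cannot stay stuck in TRANSIENT_FAILURE while a reachable endpoint exists.

package main

import "fmt"

// subConn is a simplified stand-in for the balancer's subConn type.
type subConn struct{ addr string }

// sketchBalancer is a simplified stand-in for ringhashBalancer, keeping only
// the two fields added by this commit.
type sketchBalancer struct {
	orderedSubConns                []*subConn
	lastInternallyTriggeredSCIndex int
}

// nextToConnect mirrors the new logic in updateSubConnState: advance the index
// (mod the number of subconns) and return the SubConn that should be asked to
// connect next while the policy is in TRANSIENT_FAILURE and nothing is
// currently connecting.
func (b *sketchBalancer) nextToConnect() *subConn {
	b.lastInternallyTriggeredSCIndex = (b.lastInternallyTriggeredSCIndex + 1) % len(b.orderedSubConns)
	return b.orderedSubConns[b.lastInternallyTriggeredSCIndex]
}

func main() {
	b := &sketchBalancer{orderedSubConns: []*subConn{{"a:1"}, {"b:2"}, {"c:3"}}}
	for i := 0; i < 5; i++ {
		fmt.Print(b.nextToConnect().addr, " ") // b:2 c:3 a:1 b:2 c:3
	}
	fmt.Println()
}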