clusterresolver: fix deadlock when dns resolver responds inline with update or error at build time (#6563)

Easwar Swaminathan 2023-08-23 16:32:58 -07:00 committed by GitHub
parent 81b9df233e
commit 4c9777ceff
3 changed files with 197 additions and 214 deletions
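
The title describes a DNS resolver that delivers an update or an error synchronously, from inside its own Build call. If that callback is handled inline, it can re-enter state that the code building the resolver still holds (for example the resourceResolver's mutex), and a non-reentrant mutex taken twice on the same goroutine blocks forever. The resource_resolver.go changes below avoid this by pushing onUpdate handling onto a grpcsync.CallbackSerializer, so the inline callback only schedules work and returns. What follows is a minimal standalone sketch of that pattern, with invented names; it is not the grpc-go code itself.

// Standalone sketch of the deadlock pattern and the serializer-based fix.
// All names here (parent, buildChild*, sched) are invented for illustration.
package main

import (
	"fmt"
	"sync"
)

type parent struct {
	mu    sync.Mutex
	state string
}

// onUpdate stands in for the resolver callback: it needs the parent's mutex.
func (p *parent) onUpdate(s string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.state = s
}

// buildChildDeadlock builds a child while holding p.mu. If the child reports
// inline from Build (the scenario in the commit title), onUpdate re-enters the
// non-reentrant mutex on the same goroutine and blocks forever.
func (p *parent) buildChildDeadlock() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.onUpdate("inline update") // deadlock: p.mu is already held
}

// buildChildSerialized shows the fix pattern: the inline callback only
// enqueues work; a single worker goroutine (a stand-in for
// grpcsync.CallbackSerializer) runs it later, outside the caller's lock.
func (p *parent) buildChildSerialized(sched chan<- func()) {
	p.mu.Lock()
	defer p.mu.Unlock()
	sched <- func() { p.onUpdate("inline update") } // returns immediately
}

func main() {
	p := &parent{}
	sched := make(chan func(), 1)
	done := make(chan struct{})
	go func() { // callbacks run one at a time, in order
		for f := range sched {
			f()
		}
		close(done)
	}()
	p.buildChildSerialized(sched)
	close(sched)
	<-done
	fmt.Println("state:", p.state) // prints: state: inline update
	// p.buildChildDeadlock() // would hang if called
}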


@ -280,7 +280,7 @@ func (b *clusterResolverBalancer) handleErrorFromUpdate(err error, fromParent bo
// EDS resource was removed. No action needs to be taken for this, and we
// should continue watching the same EDS resource.
if fromParent && xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
b.resourceWatcher.stop()
b.resourceWatcher.stop(false)
}
if b.child != nil {
@ -326,7 +326,7 @@ func (b *clusterResolverBalancer) run() {
// Close results in stopping the endpoint resolvers and closing the
// underlying child policy and is the only way to exit this goroutine.
case <-b.closed.Done():
b.resourceWatcher.stop()
b.resourceWatcher.stop(true)
if b.child != nil {
b.child.Close()


@ -18,9 +18,10 @@ package e2e_test
import (
"context"
"errors"
"fmt"
"net"
"sort"
"strconv"
"strings"
"testing"
"time"
@ -28,6 +29,7 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/stubserver"
@ -317,29 +319,124 @@ func (s) TestAggregateCluster_WithTwoEDSClusters_PrioritiesChange(t *testing.T)
}
}
func hostAndPortFromAddress(t *testing.T, addr string) (string, uint32) {
t.Helper()
host, p, err := net.SplitHostPort(addr)
if err != nil {
t.Fatalf("Invalid serving address: %v", addr)
}
port, err := strconv.ParseUint(p, 10, 32)
if err != nil {
t.Fatalf("Invalid serving port %q: %v", p, err)
}
return host, uint32(port)
}
// TestAggregateCluster_WithOneDNSCluster tests the case where the top-level
// cluster resource is an aggregate cluster that resolves to a single
// LOGICAL_DNS cluster. The test verifies that RPCs can be made to backends that
// make up the LOGICAL_DNS cluster.
func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()
// Start an xDS management server.
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()
// Start two test backends.
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, _ := backendAddressesAndPorts(t, servers)
// Start a test service backend.
server := stubserver.StartTestService(t, nil)
defer server.Stop()
host, port := hostAndPortFromAddress(t, server.Address)
// Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster.
const (
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
const dnsClusterName = clusterName + "-dns"
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
makeLogicalDNSClusterResource(dnsClusterName, host, uint32(port)),
},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()
// Make an RPC and ensure that it gets routed to the first backend since the
// child policy for a LOGICAL_DNS cluster is pick_first by default.
client := testgrpc.NewTestServiceClient(cc)
peer := &peer.Peer{}
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != server.Address {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
}
}
// Tests the case where the top-level cluster resource is an aggregate cluster
// that resolves to a single LOGICAL_DNS cluster. The specified DNS hostname is
// expected to fail URL parsing. The test verifies that the channel moves to
// TRANSIENT_FAILURE.
func (s) TestAggregateCluster_WithOneDNSCluster_ParseFailure(t *testing.T) {
// Start an xDS management server.
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()
// Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster.
const dnsClusterName = clusterName + "-dns"
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
makeLogicalDNSClusterResource(dnsClusterName, "%gh&%ij", uint32(8080)),
},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()
// Ensure that the ClientConn moves to TransientFailure.
for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() {
if !cc.WaitForStateChange(ctx, state) {
t.Fatalf("Timed out waiting for state change. got %v; want %v", state, connectivity.TransientFailure)
}
}
}
// Tests the case where the top-level cluster resource is an aggregate cluster
// that resolves to a single LOGICAL_DNS cluster. The test verifies that RPCs
// can be made to backends that make up the LOGICAL_DNS cluster. The hostname of
// the LOGICAL_DNS cluster is updated, and the test verifies that RPCs can be
// made to backends that the new hostname resolves to.
func (s) TestAggregateCluster_WithOneDNSCluster_HostnameChange(t *testing.T) {
// Start an xDS management server.
managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup1()
// Start two test backends and extract their host and port. The first
// backend is used initially for the LOGICAL_DNS cluster and an update
// switches the cluster to use the second backend.
servers, cleanup2 := startTestServiceBackends(t, 2)
defer cleanup2()
// Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster.
const dnsClusterName = clusterName + "-dns"
dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[0].Address)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
@ -359,20 +456,6 @@ func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) {
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()
// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}
// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: addrs})
// Make an RPC and ensure that it gets routed to the first backend since the
// child policy for a LOGICAL_DNS cluster is pick_first by default.
client := testgrpc.NewTestServiceClient(cc)
@ -380,8 +463,35 @@ func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) {
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != addrs[0].Addr {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
if peer.Addr.String() != servers[0].Address {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, servers[0].Address)
}
// Update the LOGICAL_DNS cluster's hostname to point to the second backend.
dnsHostName, dnsPort = hostAndPortFromAddress(t, servers[1].Address)
resources = e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
},
SkipValidation: true,
}
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Ensure that traffic moves to the second backend eventually.
for ctx.Err() == nil {
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() == servers[1].Address {
break
}
}
if ctx.Err() != nil {
t.Fatal("Timeout when waiting for RPCs to switch to the second backend")
}
}
@ -500,9 +610,6 @@ func (s) TestAggregateCluster_WithEDSAndDNS(t *testing.T) {
// cluster. The test verifies that RPCs are successful, this time to backends in
// the DNS cluster.
func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()
// Start an xDS management server.
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()
@ -513,15 +620,12 @@ func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) {
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, ports := backendAddressesAndPorts(t, servers)
dnsHostName, dnsPort := hostAndPortFromAddress(t, addrs[1].Addr)
// Configure an aggregate cluster pointing to a single EDS cluster. Also,
// configure the underlying EDS cluster (and the corresponding endpoints
// resource) and DNS cluster (will be used later in the test).
const (
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
const dnsClusterName = clusterName + "-dns"
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
@ -563,20 +667,6 @@ func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) {
t.Fatal(err)
}
// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}
// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})
// Ensure that RPCs start getting routed to the backend corresponding to the
// LOGICAL_DNS cluster.
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
@ -705,17 +795,14 @@ func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
// the DNS resolver pushes an update, the test verifies that we switch to the
// DNS cluster and can make a successful RPC.
func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()
// Start an xDS management server.
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()
// Start two test backends.
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, _ := backendAddressesAndPorts(t, servers)
// Start a test service backend.
server := stubserver.StartTestService(t, nil)
defer server.Stop()
dnsHostName, dnsPort := hostAndPortFromAddress(t, server.Address)
// Configure an aggregate cluster pointing to an EDS and LOGICAL_DNS
// cluster. Also configure an empty endpoints resource for the EDS cluster
@ -723,8 +810,6 @@ func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) {
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
nackEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil)
nackEndpointResource.Endpoints = []*v3endpointpb.LocalityLbEndpoints{
@ -755,23 +840,9 @@ func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) {
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()
// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}
// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: addrs})
// Ensure that RPCs start getting routed to the first backend since the
// child policy for a LOGICAL_DNS cluster is pick_first by default.
pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0])
pickfirst.CheckRPCsToBackend(ctx, cc, resolver.Address{Addr: server.Address})
}
// TestAggregateCluster_BadDNS_GoodEDS tests the case where the top-level
@ -780,34 +851,29 @@ func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) {
// good update, this test verifies the cluster_resolver balancer correctly falls
// back from the LOGICAL_DNS cluster to the EDS cluster.
func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()
// Start an xDS management server.
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()
// Start two test backends.
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, ports := backendAddressesAndPorts(t, servers)
// Start a test service backend.
server := stubserver.StartTestService(t, nil)
defer server.Stop()
_, edsPort := hostAndPortFromAddress(t, server.Address)
// Configure an aggregate cluster pointing to a LOGICAL_DNS and EDS
// cluster. Also configure an endpoints resource for the EDS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{dnsClusterName, edsClusterName}),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
makeLogicalDNSClusterResource(dnsClusterName, "bad.ip.v4.address", 8080),
e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(ports[0])})},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(edsPort)})},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -821,20 +887,6 @@ func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) {
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()
// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}
// Push an error through the DNS resolver.
dnsR.ReportError(errors.New("some error"))
// RPCs should work: the higher priority DNS cluster reports an error, so the
// policy should fall back to the EDS cluster.
client := testgrpc.NewTestServiceClient(cc)
@ -842,8 +894,8 @@ func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) {
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
if peer.Addr.String() != addrs[0].Addr {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
if peer.Addr.String() != server.Address {
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
}
}
@ -854,9 +906,6 @@ func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) {
// error, the test verifies that RPCs fail with the error triggered by the DNS
// Discovery Mechanism (from sending an empty address list down).
func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()
// Start an xDS management server.
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()
@ -867,8 +916,6 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
emptyEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil)
resources := e2e.UpdateOptions{
@ -876,7 +923,7 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
Clusters: []*v3clusterpb.Cluster{
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
makeLogicalDNSClusterResource(dnsClusterName, "bad.ip.v4.address", 8080),
},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{emptyEndpointResource},
SkipValidation: true,
@ -892,39 +939,20 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()
// Make an RPC with a short deadline. We expect this RPC to not succeed
// because the EDS resource came back with no endpoints, and we are yet to
// push an update through the DNS resolver.
client := testgrpc.NewTestServiceClient(cc)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
}
// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}
// Push an error from the DNS resolver as well.
dnsErr := fmt.Errorf("DNS error")
dnsR.ReportError(dnsErr)
// Ensure that the error from the DNS Resolver leads to an empty address
// update for both priorities.
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if code := status.Code(err); code != codes.Unavailable {
t.Fatalf("EmptyCall() failed with code %s, want %s", code, codes.Unavailable)
client := testgrpc.NewTestServiceClient(cc)
for ctx.Err() == nil {
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if err == nil {
t.Fatal("EmptyCall() succeeded when expected to fail")
}
if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "produced zero addresses") {
break
}
}
if err == nil || !strings.Contains(err.Error(), "produced zero addresses") {
t.Fatalf("EmptyCall() failed with error: %v, want: produced zero addresses", err)
if ctx.Err() != nil {
t.Fatalf("Timeout when waiting for RPCs to fail with expected code and error")
}
}
@ -937,9 +965,6 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
// previously received good update and that RPCs still get routed to the EDS
// cluster.
func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()
// Start an xDS management server.
mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()
@ -950,14 +975,13 @@ func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *test
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, ports := backendAddressesAndPorts(t, servers)
dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[1].Address)
// Configure an aggregate cluster pointing to an EDS and DNS cluster. Also
// configure an endpoints resource for the EDS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
@ -980,20 +1004,6 @@ func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *test
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()
// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}
// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})
// Make an RPC and ensure that it gets routed to the first backend since the
// EDS cluster is of higher priority than the LOGICAL_DNS cluster.
client := testgrpc.NewTestServiceClient(cc)
@ -1032,9 +1042,6 @@ func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *test
// the LOGICAL_DNS cluster, because it is supposed to treat the bad EDS response
// as though it received an update with no endpoints.
func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()
// Start an xDS management server.
mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()
@ -1045,13 +1052,12 @@ func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *tes
servers, cleanup3 := startTestServiceBackends(t, 2)
defer cleanup3()
addrs, ports := backendAddressesAndPorts(t, servers)
dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[1].Address)
// Configure an aggregate cluster pointing to an EDS and DNS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
@ -1080,20 +1086,6 @@ func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *tes
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()
// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}
// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})
// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
peer := &peer.Peer{}
client := testgrpc.NewTestServiceClient(cc)
@ -1111,9 +1103,6 @@ func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *tes
// cluster. The test verifies that the cluster_resolver LB policy falls back to
// the LOGICAL_DNS cluster in this case.
func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
defer cleanup1()
// Start an xDS management server.
mgmtServer, nodeID, _, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
defer cleanup2()
@ -1121,14 +1110,13 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
// Start a test backend for the LOGICAL_DNS cluster.
server := stubserver.StartTestService(t, nil)
defer server.Stop()
dnsHostName, dnsPort := hostAndPortFromAddress(t, server.Address)
// Configure an aggregate cluster pointing to an EDS and DNS cluster. No
// endpoints are configured for the EDS cluster.
const (
edsClusterName = clusterName + "-eds"
dnsClusterName = clusterName + "-dns"
dnsHostName = "dns_host"
dnsPort = uint32(8080)
)
resources := e2e.UpdateOptions{
NodeID: nodeID,
@ -1177,35 +1165,13 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
}
defer cc.Close()
// Make an RPC with a short deadline. We expect this RPC to not succeed
// because the DNS resolver has not responded with endpoint addresses.
client := testgrpc.NewTestServiceClient(cc)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
}
// Ensure that the DNS resolver is started for the expected target.
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for DNS resolver to be started")
case target := <-dnsTargetCh:
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
if got != want {
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
}
}
// Update DNS resolver with test backend addresses.
dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: server.Address}}})
// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
// Even though the EDS cluster is of higher priority, since the management
// server does not respond with an EDS resource, the cluster_resolver LB
// policy is expected to fall back to the LOGICAL_DNS cluster once the watch
// timeout expires.
peer := &peer.Peer{}
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}


@ -19,9 +19,11 @@
package clusterresolver
import (
"context"
"sync"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
@ -83,9 +85,11 @@ type discoveryMechanismAndResolver struct {
}
type resourceResolver struct {
parent *clusterResolverBalancer
logger *grpclog.PrefixLogger
updateChannel chan *resourceUpdate
parent *clusterResolverBalancer
logger *grpclog.PrefixLogger
updateChannel chan *resourceUpdate
serializer *grpcsync.CallbackSerializer
serializerCancel context.CancelFunc
// mu protects the slice and map, and content of the resolvers in the slice.
mu sync.Mutex
@ -106,12 +110,16 @@ type resourceResolver struct {
}
func newResourceResolver(parent *clusterResolverBalancer, logger *grpclog.PrefixLogger) *resourceResolver {
return &resourceResolver{
rr := &resourceResolver{
parent: parent,
logger: logger,
updateChannel: make(chan *resourceUpdate, 1),
childrenMap: make(map[discoveryMechanismKey]discoveryMechanismAndResolver),
}
ctx, cancel := context.WithCancel(context.Background())
rr.serializer = grpcsync.NewCallbackSerializer(ctx)
rr.serializerCancel = cancel
return rr
}
func equalDiscoveryMechanisms(a, b []DiscoveryMechanism) bool {
@ -210,8 +218,9 @@ func (rr *resourceResolver) resolveNow() {
}
}
func (rr *resourceResolver) stop() {
func (rr *resourceResolver) stop(closing bool) {
rr.mu.Lock()
// Save the previous childrenMap to stop the children outside the mutex,
// and reinitialize the map. We only need to reinitialize to allow for the
// policy to be reused if the resource comes back. In practice, this does
@ -222,12 +231,18 @@ func (rr *resourceResolver) stop() {
rr.childrenMap = make(map[discoveryMechanismKey]discoveryMechanismAndResolver)
rr.mechanisms = nil
rr.children = nil
rr.mu.Unlock()
for _, r := range cm {
r.r.stop()
}
if closing {
rr.serializerCancel()
<-rr.serializer.Done()
}
// stop() is called when the LB policy is closed or when the underlying
// cluster resource is removed by the management server. In the latter case,
// an empty config update needs to be pushed to the child policy to ensure
@ -272,7 +287,9 @@ func (rr *resourceResolver) generateLocked() {
}
func (rr *resourceResolver) onUpdate() {
rr.mu.Lock()
rr.generateLocked()
rr.mu.Unlock()
rr.serializer.Schedule(func(context.Context) {
rr.mu.Lock()
rr.generateLocked()
rr.mu.Unlock()
})
}
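
A note on the shutdown path shown above: stop now takes a closing flag. When the balancer itself closes (stop(true) in run), the serializer is cancelled and stop blocks on its Done channel, so no scheduled onUpdate callback can fire after close; when only the cluster resource is removed (stop(false) in handleErrorFromUpdate), the children are stopped but the serializer stays alive so the resolver can be reused if the resource comes back. The sketch below illustrates that cancel-then-wait guarantee with a hand-rolled serializer; it is not the internal grpcsync.CallbackSerializer API, just the shape of the idea.

// Hand-rolled serializer used only to illustrate the cancel-then-wait shutdown
// guarantee relied on by stop(closing=true). Names are invented for this sketch.
package main

import (
	"context"
	"fmt"
)

type serializer struct {
	callbacks chan func()
	done      chan struct{}
}

// newSerializer runs scheduled callbacks one at a time until ctx is cancelled,
// then closes done.
func newSerializer(ctx context.Context) *serializer {
	s := &serializer{callbacks: make(chan func(), 16), done: make(chan struct{})}
	go func() {
		defer close(s.done)
		for {
			select {
			case <-ctx.Done():
				return
			case f := <-s.callbacks:
				f()
			}
		}
	}()
	return s
}

func (s *serializer) schedule(f func()) { s.callbacks <- f }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := newSerializer(ctx)
	s.schedule(func() { fmt.Println("handling a resolver update") })

	// The closing path: cancel the serializer and wait for its goroutine to
	// exit before tearing down the rest of the balancer.
	cancel()
	<-s.done
	fmt.Println("closed; no further callbacks will run")
}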