xds/resolver: cleanup tests to use real xDS client 3/n (#5953)

Easwar Swaminathan 2023-01-24 19:16:33 -08:00 committed by GitHub
parent bf8fc46fa6
commit a6376c9893
1 changed file with 394 additions and 183 deletions


@@ -38,7 +38,6 @@ import (
 	xdscreds "google.golang.org/grpc/credentials/xds"
 	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/envconfig"
-	"google.golang.org/grpc/internal/grpcrand"
 	"google.golang.org/grpc/internal/grpcsync"
 	"google.golang.org/grpc/internal/grpctest"
 	iresolver "google.golang.org/grpc/internal/resolver"
@@ -65,6 +64,7 @@ import (
 	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
 	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
+	v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
 	v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
 	v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
@@ -108,12 +108,12 @@ type testClientConn struct {
 }
 
 func (t *testClientConn) UpdateState(s resolver.State) error {
-	t.stateCh.Send(s)
+	t.stateCh.Replace(s)
 	return nil
 }
 
 func (t *testClientConn) ReportError(err error) {
-	t.errorCh.Send(err)
+	t.errorCh.Replace(err)
 }
 
 func (t *testClientConn) ParseServiceConfig(jsonSC string) *serviceconfig.ParseResult {
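
The Send → Replace change above is worth a note: with a real management server in the picture, the resolver can push several updates back to back, and these tests only care about the most recent one. A minimal sketch of the Replace semantics this relies on, assuming a buffered channel of capacity 1 (an illustration of the behavior, not grpc-go's internal/testutils implementation):

func replace(ch chan any, value any) {
	for {
		select {
		case ch <- value: // buffer had room: the value is delivered
			return
		case <-ch: // buffer full: drop the stale, unread value and retry
		}
	}
}

Send-style delivery would instead queue behind an unread value, making the tests sensitive to exactly how many intermediate updates the resolver emits.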
@@ -566,147 +566,341 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
 	}
 }
 
-// TestXDSResolverCloseClosesXDSClient tests that the XDS resolver's Close
-// method closes the XDS client.
-func (s) TestXDSResolverCloseClosesXDSClient(t *testing.T) {
-	xdsR, _, _, cancel := testSetup(t, setupOpts{target: target})
-	xdsR.Close()
-	cancel() // Blocks until the xDS client is closed.
-}
-
-// TestXDSResolverBadServiceUpdate tests the case the xdsClient returns a bad
-// service update.
-func (s) TestXDSResolverBadServiceUpdate(t *testing.T) {
-	xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
-	defer xdsR.Close()
-	defer cancel()
+// TestResolverBadServiceUpdate tests the case where a resource returned by the
+// management server is NACKed by the xDS client, which then returns an update
+// containing an error to the resolver. Verifies that the update is propagated
+// to the ClientConn by the resolver. It also tests the cases where the resolver
+// gets a good update subsequently, and another error after the good update.
+func (s) TestResolverBadServiceUpdate(t *testing.T) {
+	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer mgmtServer.Stop()
+
+	// Create a bootstrap configuration specifying the above management server.
+	nodeID := uuid.New().String()
+	cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
+		NodeID:    nodeID,
+		ServerURI: mgmtServer.Address,
+		Version:   xdsbootstrap.TransportV3,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cleanup()
+
+	const serviceName = "my-service-client-side-xds"
+	tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
+	defer rClose()
+
+	// Configure a listener resource that is expected to be NACKed because it
+	// does not contain the `RouteSpecifier` field in the HTTPConnectionManager.
+	hcm := testutils.MarshalAny(&v3httppb.HttpConnectionManager{
+		HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})},
+	})
+	lis := &v3listenerpb.Listener{
+		Name:        serviceName,
+		ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
+		FilterChains: []*v3listenerpb.FilterChain{{
+			Name: "filter-chain-name",
+			Filters: []*v3listenerpb.Filter{{
+				Name:       wellknown.HTTPConnectionManager,
+				ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
+			}},
+		}},
+	}
+	resources := e2e.UpdateOptions{
+		NodeID:         nodeID,
+		Listeners:      []*v3listenerpb.Listener{lis},
+		SkipValidation: true,
+	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	waitForWatchListener(ctx, t, xdsC, targetStr)
-	xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
-	waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
-
-	// Invoke the watchAPI callback with a bad service update and wait for the
-	// ReportError method to be called on the ClientConn.
-	suErr := errors.New("bad serviceupdate")
-	xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{}, suErr)
-
-	if gotErrVal, gotErr := tcc.errorCh.Receive(ctx); gotErr != nil || gotErrVal != suErr {
-		t.Fatalf("ClientConn.ReportError() received %v, want %v", gotErrVal, suErr)
-	}
+	if err := mgmtServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	wantErr := "no RouteSpecifier"
+	val, err := tcc.errorCh.Receive(ctx)
+	if err != nil {
+		t.Fatal("Timeout when waiting for error to be propagated to the ClientConn")
+	}
+	gotErr := val.(error)
+	if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) {
+		t.Fatalf("Received error from resolver %q, want %q", gotErr, wantErr)
+	}
+
+	// Configure good listener and route configuration resources on the
+	// management server.
+	rdsName := "route-" + serviceName
+	cdsName := "cluster-" + serviceName
+	resources = e2e.UpdateOptions{
+		NodeID:         nodeID,
+		Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
+		Routes:         []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, serviceName, cdsName)},
+		SkipValidation: true,
+	}
+	if err := mgmtServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	// Expect a good update from the resolver.
+	val, err = tcc.stateCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
+	}
+	rState := val.(resolver.State)
+	if err := rState.ServiceConfig.Err; err != nil {
+		t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
+	}
+
+	// Configure another bad resource on the management server.
+	resources = e2e.UpdateOptions{
+		NodeID:         nodeID,
+		Listeners:      []*v3listenerpb.Listener{lis},
+		SkipValidation: true,
+	}
+	if err := mgmtServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	// Expect an error update from the resolver.
+	val, err = tcc.errorCh.Receive(ctx)
+	if err != nil {
+		t.Fatal("Timeout when waiting for error to be propagated to the ClientConn")
+	}
+	gotErr = val.(error)
+	if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) {
+		t.Fatalf("Received error from resolver %q, want %q", gotErr, wantErr)
+	}
 }
 
-// TestXDSResolverGoodServiceUpdate tests the happy case where the resolver
-// gets a good service update from the xdsClient.
-func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
-	xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
-	defer xdsR.Close()
-	defer cancel()
-
-	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
-	defer cancel()
-	waitForWatchListener(ctx, t, xdsC, targetStr)
-	xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
-	waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
-	defer replaceRandNumGenerator(0)()
-
+// TestResolverGoodServiceUpdate tests the case where the resource returned by
+// the management server is ACKed by the xDS client, which then returns a good
+// service update to the resolver. The test verifies that the service config
+// returned by the resolver matches expectations, and that the config selector
+// returned by the resolver picks clusters based on the route configuration
+// received from the management server.
+func (s) TestResolverGoodServiceUpdate(t *testing.T) {
+	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer mgmtServer.Stop()
+
+	// Create a bootstrap configuration specifying the above management server.
+	nodeID := uuid.New().String()
+	cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
+		NodeID:    nodeID,
+		ServerURI: mgmtServer.Address,
+		Version:   xdsbootstrap.TransportV3,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cleanup()
+
+	const serviceName = "my-service-client-side-xds"
+	tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
+	defer rClose()
+
+	ldsName := serviceName
+	rdsName := "route-" + serviceName
+
 	for _, tt := range []struct {
-		routes []*xdsresource.Route
-		wantJSON string
+		routeConfig *v3routepb.RouteConfiguration
+		wantServiceConfig string
 		wantClusters map[string]bool
 	}{
 		{
-			routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{"test-cluster-1": {Weight: 1}}}},
-			wantJSON: `{"loadBalancingConfig":[{
-    "xds_cluster_manager_experimental": {
-      "children": {
-        "cluster:test-cluster-1": {
-          "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
-        }
-      }
-    }}]}`,
+			// A route configuration with a single cluster.
+			routeConfig: &v3routepb.RouteConfiguration{
+				Name: rdsName,
+				VirtualHosts: []*v3routepb.VirtualHost{{
+					Domains: []string{ldsName},
+					Routes: []*v3routepb.Route{{
+						Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
+						Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
+							ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
+								Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
+									{
+										Name:   "test-cluster-1",
+										Weight: &wrapperspb.UInt32Value{Value: 100},
+									},
+								},
+							}},
+						}},
+					}},
+				}},
+			},
+			wantServiceConfig: `
+{
+  "loadBalancingConfig": [{
+    "xds_cluster_manager_experimental": {
+      "children": {
+        "cluster:test-cluster-1": {
+          "childPolicy": [{
+            "cds_experimental": {
+              "cluster": "test-cluster-1"
+            }
+          }]
+        }
+      }
+    }
+  }]
+}`,
 			wantClusters: map[string]bool{"cluster:test-cluster-1": true},
 		},
 		{
-			routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{
-				"cluster_1": {Weight: 75},
-				"cluster_2": {Weight: 25},
-			}}},
-			// This update contains the cluster from the previous update as
-			// well as this update, as the previous config selector still
-			// references the old cluster when the new one is pushed.
-			wantJSON: `{"loadBalancingConfig":[{
-    "xds_cluster_manager_experimental": {
-      "children": {
-        "cluster:test-cluster-1": {
-          "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
-        },
-        "cluster:cluster_1": {
-          "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
-        },
-        "cluster:cluster_2": {
-          "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
-        }
-      }
-    }}]}`,
+			// A route configuration with two new clusters.
+			routeConfig: &v3routepb.RouteConfiguration{
+				Name: rdsName,
+				VirtualHosts: []*v3routepb.VirtualHost{{
+					Domains: []string{ldsName},
+					Routes: []*v3routepb.Route{{
+						Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
+						Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
+							ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
+								Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
+									{
+										Name:   "cluster_1",
+										Weight: &wrapperspb.UInt32Value{Value: 75},
+									},
+									{
+										Name:   "cluster_2",
+										Weight: &wrapperspb.UInt32Value{Value: 25},
+									},
+								},
+							}},
+						}},
+					}},
+				}},
+			},
+			// This update contains the cluster from the previous update as well
+			// as this update, as the previous config selector still references
+			// the old cluster when the new one is pushed.
+			wantServiceConfig: `
+{
+  "loadBalancingConfig": [{
+    "xds_cluster_manager_experimental": {
+      "children": {
+        "cluster:test-cluster-1": {
+          "childPolicy": [{
+            "cds_experimental": {
+              "cluster": "test-cluster-1"
+            }
+          }]
+        },
+        "cluster:cluster_1": {
+          "childPolicy": [{
+            "cds_experimental": {
+              "cluster": "cluster_1"
+            }
+          }]
+        },
+        "cluster:cluster_2": {
+          "childPolicy": [{
+            "cds_experimental": {
+              "cluster": "cluster_2"
+            }
+          }]
+        }
+      }
+    }
+  }]
+}`,
 			wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true},
 		},
 		{
-			routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{
-				"cluster_1": {Weight: 75},
-				"cluster_2": {Weight: 25},
-			}}},
+			// A redundant route configuration update.
+			// TODO(easwars): Do we need this, or can we do something else? Because the xds client might swallow this update.
+			routeConfig: &v3routepb.RouteConfiguration{
+				Name: rdsName,
+				VirtualHosts: []*v3routepb.VirtualHost{{
+					Domains: []string{ldsName},
+					Routes: []*v3routepb.Route{{
+						Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
+						Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
+							ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
+								Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
+									{
+										Name:   "cluster_1",
+										Weight: &wrapperspb.UInt32Value{Value: 75},
+									},
+									{
+										Name:   "cluster_2",
+										Weight: &wrapperspb.UInt32Value{Value: 25},
+									},
+								},
+							}},
+						}},
+					}},
+				}},
+			},
 			// With this redundant update, the old config selector has been
 			// stopped, so there are no more references to the first cluster.
 			// Only the second update's clusters should remain.
-			wantJSON: `{"loadBalancingConfig":[{
-    "xds_cluster_manager_experimental": {
-      "children": {
-        "cluster:cluster_1": {
-          "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
-        },
-        "cluster:cluster_2": {
-          "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
-        }
-      }
-    }}]}`,
+			wantServiceConfig: `
+{
+  "loadBalancingConfig": [{
+    "xds_cluster_manager_experimental": {
+      "children": {
+        "cluster:cluster_1": {
+          "childPolicy": [{
+            "cds_experimental": {
+              "cluster": "cluster_1"
+            }
+          }]
+        },
+        "cluster:cluster_2": {
+          "childPolicy": [{
+            "cds_experimental": {
+              "cluster": "cluster_2"
+            }
+          }]
+        }
+      }
+    }
+  }]
+}`,
 			wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true},
 		},
 	} {
-		// Invoke the watchAPI callback with a good service update and wait for the
-		// UpdateState method to be called on the ClientConn.
-		xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
-			VirtualHosts: []*xdsresource.VirtualHost{
-				{
-					Domains: []string{targetStr},
-					Routes:  tt.routes,
-				},
-			},
-		}, nil)
+		// Configure the management server with a good listener resource and a
+		// route configuration resource, as specified by the test case.
+		resources := e2e.UpdateOptions{
+			NodeID:         nodeID,
+			Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
+			Routes:         []*v3routepb.RouteConfiguration{tt.routeConfig},
+			SkipValidation: true,
+		}
 
 		ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 		defer cancel()
-		gotState, err := tcc.stateCh.Receive(ctx)
-		if err != nil {
-			t.Fatalf("Error waiting for UpdateState to be called: %v", err)
-		}
-		rState := gotState.(resolver.State)
-		if err := rState.ServiceConfig.Err; err != nil {
-			t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
-		}
-		wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(tt.wantJSON)
+		if err := mgmtServer.Update(ctx, resources); err != nil {
+			t.Fatal(err)
+		}
+
+		// Read the update pushed by the resolver to the ClientConn.
+		val, err := tcc.stateCh.Receive(ctx)
+		if err != nil {
+			t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
+		}
+		rState := val.(resolver.State)
+		if err := rState.ServiceConfig.Err; err != nil {
+			t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
+		}
+		wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(tt.wantServiceConfig)
 		if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
-			t.Errorf("ClientConn.UpdateState received different service config")
+			t.Errorf("Received unexpected service config")
 			t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
-			t.Error("want: ", cmp.Diff(nil, wantSCParsed.Config))
+			t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
 		}
 
 		cs := iresolver.GetConfigSelector(rState)
 		if cs == nil {
-			t.Error("received nil config selector")
-			continue
+			t.Fatal("Received nil config selector in update from resolver")
 		}
 
 		pickedClusters := make(map[string]bool)
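
Each of these rewritten tests starts a management server and then points the xDS client at it through a generated bootstrap file. For reference, the bootstrap JSON produced by a helper like xdsbootstrap.CreateFile plausibly looks like the sketch below; the exact fields the helper emits are an assumption, but the overall shape follows gRPC's xDS bootstrap format (gRFC A27), and the client locates the file through the GRPC_XDS_BOOTSTRAP environment variable:

// bootstrapSketch is an illustrative bootstrap configuration; the
// placeholders in angle brackets stand in for the values the tests
// generate at runtime (management server address, node ID).
const bootstrapSketch = `{
  "xds_servers": [{
    "server_uri": "<management-server-address>",
    "channel_creds": [{"type": "insecure"}],
    "server_features": ["xds_v3"]
  }],
  "node": {"id": "<node-id-uuid>"}
}`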
@@ -714,15 +908,15 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
 		// with the random number generator stubbed out, we can rely on this
 		// to be 100% reproducible.
 		for i := 0; i < 100; i++ {
-			res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
+			res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
 			if err != nil {
-				t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
+				t.Fatalf("cs.SelectConfig(): %v", err)
 			}
 			cluster := clustermanager.GetPickedClusterForTesting(res.Context)
 			pickedClusters[cluster] = true
 			res.OnCommitted()
 		}
-		if !reflect.DeepEqual(pickedClusters, tt.wantClusters) {
+		if !cmp.Equal(pickedClusters, tt.wantClusters) {
 			t.Errorf("Picked clusters: %v; want: %v", pickedClusters, tt.wantClusters)
 		}
 	}
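
The loop above invokes the config selector 100 times and records every cluster it picks. A sketch of the weighted-clusters semantics being exercised (an illustration of the route action's behavior, not grpc-go's actual picker): a draw in [0, totalWeight) selects the cluster whose cumulative weight range contains it, so with weights 75/25 both clusters are all but certain to appear across 100 picks.

type weightedCluster struct {
	name   string
	weight uint32
}

// pickCluster maps a draw to a cluster name by walking the cumulative
// weight ranges.
func pickCluster(clusters []weightedCluster, draw uint32) string {
	var total uint32
	for _, c := range clusters {
		total += c.weight
	}
	r := draw % total
	var acc uint32
	for _, c := range clusters {
		acc += c.weight
		if r < acc {
			return c.name
		}
	}
	return clusters[len(clusters)-1].name // unreachable: r is always < total
}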
@@ -965,110 +1159,139 @@ func (s) TestResolverRemovedWithRPCs(t *testing.T) {
 	}
 }
 
-// TestXDSResolverRemovedResource tests for proper behavior after a resource is
-// removed.
-func (s) TestXDSResolverRemovedResource(t *testing.T) {
-	xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
-	defer cancel()
-	defer xdsR.Close()
+// TestResolverRemovedResource tests the case where resources returned by the
+// management server are removed. The test verifies that the resolver pushes the
+// expected config selector and service config in this case.
+func (s) TestResolverRemovedResource(t *testing.T) {
+	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer mgmtServer.Stop()
+
+	// Create a bootstrap configuration specifying the above management server.
+	nodeID := uuid.New().String()
+	cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
+		NodeID:    nodeID,
+		ServerURI: mgmtServer.Address,
+		Version:   xdsbootstrap.TransportV3,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cleanup()
+
+	const serviceName = "my-service-client-side-xds"
+	tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
+	defer rClose()
+
+	// Configure the management server with a good listener and route
+	// configuration resource.
+	ldsName := serviceName
+	rdsName := "route-" + serviceName
+	resources := e2e.UpdateOptions{
+		NodeID:         nodeID,
+		Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
+		Routes:         []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "test-cluster-1")},
+		SkipValidation: true,
+	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 	defer cancel()
-	waitForWatchListener(ctx, t, xdsC, targetStr)
-	xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
-	waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
-
-	// Invoke the watchAPI callback with a good service update and wait for the
-	// UpdateState method to be called on the ClientConn.
-	xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
-		VirtualHosts: []*xdsresource.VirtualHost{
-			{
-				Domains: []string{targetStr},
-				Routes:  []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{"test-cluster-1": {Weight: 1}}}},
-			},
-		},
-	}, nil)
-	wantJSON := `{"loadBalancingConfig":[{
-    "xds_cluster_manager_experimental": {
-      "children": {
-        "cluster:test-cluster-1": {
-          "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
-        }
-      }
-    }}]}`
-	wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON)
-
-	gotState, err := tcc.stateCh.Receive(ctx)
-	if err != nil {
-		t.Fatalf("Error waiting for UpdateState to be called: %v", err)
-	}
-	rState := gotState.(resolver.State)
-	if err := rState.ServiceConfig.Err; err != nil {
-		t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
-	}
+	if err := mgmtServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	// Read the update pushed by the resolver to the ClientConn.
+	val, err := tcc.stateCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
+	}
+	rState := val.(resolver.State)
+	if err := rState.ServiceConfig.Err; err != nil {
+		t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
+	}
+	wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(`
+{
+  "loadBalancingConfig": [
+    {
+      "xds_cluster_manager_experimental": {
+        "children": {
+          "cluster:test-cluster-1": {
+            "childPolicy": [
+              {
+                "cds_experimental": {
+                  "cluster": "test-cluster-1"
+                }
+              }
+            ]
+          }
+        }
+      }
+    }
+  ]
+}`)
 	if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
-		t.Errorf("ClientConn.UpdateState received different service config")
-		t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
-		t.Error("want: ", cmp.Diff(nil, wantSCParsed.Config))
+		t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
 	}
 
 	// "Make an RPC" by invoking the config selector.
 	cs := iresolver.GetConfigSelector(rState)
 	if cs == nil {
-		t.Fatalf("received nil config selector")
+		t.Fatal("Received nil config selector in update from resolver")
 	}
 
-	res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
+	res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
 	if err != nil {
-		t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
+		t.Fatalf("cs.SelectConfig(): %v", err)
 	}
 
 	// "Finish the RPC"; this could cause a panic if the resolver doesn't
 	// handle it correctly.
 	res.OnCommitted()
 
-	// Delete the resource. The channel should receive a service config with the
-	// original cluster but with an erroring config selector.
-	suErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource removed error")
-	xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{}, suErr)
-	if gotState, err = tcc.stateCh.Receive(ctx); err != nil {
-		t.Fatalf("Error waiting for UpdateState to be called: %v", err)
-	}
-	rState = gotState.(resolver.State)
+	// Delete the resources on the management server, resulting in a
+	// resource-not-found error from the xDS client.
+	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil {
+		t.Fatal(err)
+	}
+
+	// The channel should receive the existing service config with the original
+	// cluster but with an erroring config selector.
+	val, err = tcc.stateCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
+	}
+	rState = val.(resolver.State)
 	if err := rState.ServiceConfig.Err; err != nil {
-		t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
+		t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
 	}
 	if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
-		t.Errorf("ClientConn.UpdateState received different service config")
-		t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
-		t.Error("want: ", cmp.Diff(nil, wantSCParsed.Config))
+		t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
 	}
 
 	// "Make another RPC" by invoking the config selector.
 	cs = iresolver.GetConfigSelector(rState)
 	if cs == nil {
-		t.Fatalf("received nil config selector")
+		t.Fatal("Received nil config selector in update from resolver")
 	}
 
-	res, err = cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
+	res, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
 	if err == nil || status.Code(err) != codes.Unavailable {
-		t.Fatalf("Expected UNAVAILABLE error from cs.SelectConfig(_); got %v, %v", res, err)
+		t.Fatalf("cs.SelectConfig() got %v, %v, expected UNAVAILABLE error", res, err)
 	}
 
 	// In the meantime, an empty ServiceConfig update should have been sent.
-	if gotState, err = tcc.stateCh.Receive(ctx); err != nil {
-		t.Fatalf("Error waiting for UpdateState to be called: %v", err)
+	val, err = tcc.stateCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
 	}
-	rState = gotState.(resolver.State)
+	rState = val.(resolver.State)
 	if err := rState.ServiceConfig.Err; err != nil {
-		t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
+		t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
 	}
 	wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}")
 	if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
-		t.Errorf("ClientConn.UpdateState received different service config")
-		t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
-		t.Error("want: ", cmp.Diff(nil, wantSCParsed.Config))
+		t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
 	}
 }
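
After the resources are deleted, the test expects RPCs to fail with UNAVAILABLE while the last known service config is retained. A sketch of the kind of "erroring" config selector the resolver is expected to install (the type and field names are assumptions for illustration, not grpc-go's actual code; it reuses the iresolver, codes, and status packages already imported by this test file):

type erroringConfigSelector struct {
	err error // e.g. the resource-not-found error from the xDS client
}

// SelectConfig fails every RPC with UNAVAILABLE until the next good
// update replaces this selector.
func (cs *erroringConfigSelector) SelectConfig(iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
	return nil, status.Error(codes.Unavailable, cs.err.Error())
}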
@@ -1892,18 +2115,6 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) {
 	}
 }
 
-func replaceRandNumGenerator(start int64) func() {
-	nextInt := start
-	xdsresource.RandInt63n = func(int64) (ret int64) {
-		ret = nextInt
-		nextInt++
-		return
-	}
-	return func() {
-		xdsresource.RandInt63n = grpcrand.Int63n
-	}
-}
-
 func newDurationP(d time.Duration) *time.Duration {
 	return &d
 }