diff --git a/xds/internal/xdsclient/e2e_test/federation_watchers_test.go b/xds/internal/xdsclient/e2e_test/federation_watchers_test.go index 16db9da3b..91c92c6fd 100644 --- a/xds/internal/xdsclient/e2e_test/federation_watchers_test.go +++ b/xds/internal/xdsclient/e2e_test/federation_watchers_test.go @@ -22,8 +22,6 @@ import ( "fmt" "testing" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/bootstrap" @@ -328,26 +326,3 @@ func (s) TestFederation_EndpointsResourceContextParamOrder(t *testing.T) { func newStringP(s string) *string { return &s } - -// verifyRouteConfigUpdate waits for an update to be received on the provided -// update channel and verifies that it matches the expected update. -// -// Returns an error if no update is received before the context deadline expires -// or the received update does not match the expected one. -func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.RouteConfigUpdateErrTuple) error { - u, err := updateCh.Receive(ctx) - if err != nil { - return fmt.Errorf("timeout when waiting for a route configuration resource from the management server: %v", err) - } - got := u.(xdsresource.RouteConfigUpdateErrTuple) - if wantUpdate.Err != nil { - if gotType, wantType := xdsresource.ErrType(got.Err), xdsresource.ErrType(wantUpdate.Err); gotType != wantType { - return fmt.Errorf("received update with error type %v, want %v", gotType, wantType) - } - } - cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw")} - if diff := cmp.Diff(wantUpdate.Update, got.Update, cmpOpts...); diff != "" { - return fmt.Errorf("received unepected diff in the route configuration resource update: (-want, got):\n%s", diff) - } - return nil -} diff --git a/xds/internal/xdsclient/e2e_test/rds_watchers_test.go b/xds/internal/xdsclient/e2e_test/rds_watchers_test.go new file mode 100644 index 000000000..3f4dd419f --- /dev/null +++ b/xds/internal/xdsclient/e2e_test/rds_watchers_test.go @@ -0,0 +1,863 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package e2e_test + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/xds/internal/xdsclient" + "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" + "google.golang.org/protobuf/types/known/wrapperspb" + + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" +) + +// badRouteConfigResource returns a RouteConfiguration resource for the given +// routeName which contains a retry config with num_retries set to `0`. This is +// expected to be NACK'ed by the xDS client. +func badRouteConfigResource(routeName, ldsTarget, clusterName string) *v3routepb.RouteConfiguration { + return &v3routepb.RouteConfiguration{ + Name: routeName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{ldsTarget}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName}, + }}}}, + RetryPolicy: &v3routepb.RetryPolicy{ + NumRetries: &wrapperspb.UInt32Value{Value: 0}, + }, + }}, + } +} + +// xdsClient is expected to produce an error containing this string when an +// update is received containing a route configuration resource created using +// `badRouteConfigResource`. +const wantRouteConfigNACKErr = "received route is invalid: retry_policy.num_retries = 0; must be >= 1" + +// verifyRouteConfigUpdate waits for an update to be received on the provided +// update channel and verifies that it matches the expected update. +// +// Returns an error if no update is received before the context deadline expires +// or the received update does not match the expected one. +func verifyRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel, wantUpdate xdsresource.RouteConfigUpdateErrTuple) error { + u, err := updateCh.Receive(ctx) + if err != nil { + return fmt.Errorf("timeout when waiting for a route configuration resource from the management server: %v", err) + } + got := u.(xdsresource.RouteConfigUpdateErrTuple) + if wantUpdate.Err != nil { + if gotType, wantType := xdsresource.ErrType(got.Err), xdsresource.ErrType(wantUpdate.Err); gotType != wantType { + return fmt.Errorf("received update with error type %v, want %v", gotType, wantType) + } + } + cmpOpts := []cmp.Option{cmpopts.EquateEmpty(), cmpopts.IgnoreFields(xdsresource.RouteConfigUpdate{}, "Raw")} + if diff := cmp.Diff(wantUpdate.Update, got.Update, cmpOpts...); diff != "" { + return fmt.Errorf("received unepected diff in the route configuration resource update: (-want, got):\n%s", diff) + } + return nil +} + +// verifyNoRouteConfigUpdate verifies that no route configuration update is +// received on the provided update channel, and returns an error if an update is +// received. +// +// A very short deadline is used while waiting for the update, as this function +// is intended to be used when an update is not expected. +func verifyNoRouteConfigUpdate(ctx context.Context, updateCh *testutils.Channel) error { + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + if u, err := updateCh.Receive(sCtx); err != context.DeadlineExceeded { + return fmt.Errorf("unexpected RouteConfigUpdate: %v", u) + } + return nil +} + +// TestRDSWatch covers the case where a single watcher exists for a single route +// configuration resource. The test verifies the following scenarios: +// 1. An update from the management server containing the resource being +// watched should result in the invocation of the watch callback. +// 2. An update from the management server containing a resource *not* being +// watched should not result in the invocation of the watch callback. +// 3. After the watch is cancelled, an update from the management server +// containing the resource that was being watched should not result in the +// invocation of the watch callback. +// +// The test is run for old and new style names. +func (s) TestRDSWatch(t *testing.T) { + tests := []struct { + desc string + resourceName string + watchedResource *v3routepb.RouteConfiguration // The resource being watched. + updatedWatchedResource *v3routepb.RouteConfiguration // The watched resource after an update. + notWatchedResource *v3routepb.RouteConfiguration // A resource which is not being watched. + wantUpdate xdsresource.RouteConfigUpdateErrTuple + }{ + { + desc: "old style resource", + resourceName: rdsName, + watchedResource: e2e.DefaultRouteConfig(rdsName, ldsName, cdsName), + updatedWatchedResource: e2e.DefaultRouteConfig(rdsName, ldsName, "new-cds-resource"), + notWatchedResource: e2e.DefaultRouteConfig("unsubscribed-rds-resource", ldsName, cdsName), + wantUpdate: xdsresource.RouteConfigUpdateErrTuple{ + Update: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{ldsName}, + Routes: []*xdsresource.Route{ + { + Prefix: newStringP("/"), + ActionType: xdsresource.RouteActionRoute, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + }, + }, + }, + }, + }, + }, + }, + { + desc: "new style resource", + resourceName: rdsNameNewStyle, + watchedResource: e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, cdsNameNewStyle), + updatedWatchedResource: e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, "new-cds-resource"), + notWatchedResource: e2e.DefaultRouteConfig("unsubscribed-rds-resource", ldsNameNewStyle, cdsNameNewStyle), + wantUpdate: xdsresource.RouteConfigUpdateErrTuple{ + Update: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{ldsNameNewStyle}, + Routes: []*xdsresource.Route{ + { + Prefix: newStringP("/"), + ActionType: xdsresource.RouteActionRoute, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsNameNewStyle: {Weight: 1}}, + }, + }, + }, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + overrideFedEnvVar(t) + mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, nil) + defer cleanup() + + // Create an xDS client with the above bootstrap contents. + client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Register a watch for a route configuration resource and have the + // watch callback push the received update on to a channel. + updateCh := testutils.NewChannel() + rdsCancel := client.WatchRouteConfig(test.resourceName, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + }) + + // Configure the management server to return a single route + // configuration resource, corresponding to the one being watched. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Routes: []*v3routepb.RouteConfiguration{test.watchedResource}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + // Verify the contents of the received update. + if err := verifyRouteConfigUpdate(ctx, updateCh, test.wantUpdate); err != nil { + t.Fatal(err) + } + + // Configure the management server to return an additional route + // configuration resource, one that we are not interested in. + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Routes: []*v3routepb.RouteConfiguration{test.watchedResource, test.notWatchedResource}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + if err := verifyNoRouteConfigUpdate(ctx, updateCh); err != nil { + t.Fatal(err) + } + + // Cancel the watch and update the resource corresponding to the original + // watch. Ensure that the cancelled watch callback is not invoked. + rdsCancel() + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Routes: []*v3routepb.RouteConfiguration{test.updatedWatchedResource, test.notWatchedResource}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + if err := verifyNoRouteConfigUpdate(ctx, updateCh); err != nil { + t.Fatal(err) + } + }) + } +} + +// TestRDSWatch_TwoWatchesForSameResourceName covers the case where two watchers +// exist for a single route configuration resource. The test verifies the +// following scenarios: +// 1. An update from the management server containing the resource being +// watched should result in the invocation of both watch callbacks. +// 2. After one of the watches is cancelled, a redundant update from the +// management server should not result in the invocation of either of the +// watch callbacks. +// 3. An update from the management server containing the resource being +// watched should result in the invocation of the un-cancelled watch +// callback. +// +// The test is run for old and new style names. +func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) { + tests := []struct { + desc string + resourceName string + watchedResource *v3routepb.RouteConfiguration // The resource being watched. + updatedWatchedResource *v3routepb.RouteConfiguration // The watched resource after an update. + wantUpdateV1 xdsresource.RouteConfigUpdateErrTuple + wantUpdateV2 xdsresource.RouteConfigUpdateErrTuple + }{ + { + desc: "old style resource", + resourceName: rdsName, + watchedResource: e2e.DefaultRouteConfig(rdsName, ldsName, cdsName), + updatedWatchedResource: e2e.DefaultRouteConfig(rdsName, ldsName, "new-cds-resource"), + wantUpdateV1: xdsresource.RouteConfigUpdateErrTuple{ + Update: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{ldsName}, + Routes: []*xdsresource.Route{ + { + Prefix: newStringP("/"), + ActionType: xdsresource.RouteActionRoute, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + }, + }, + }, + }, + }, + }, + wantUpdateV2: xdsresource.RouteConfigUpdateErrTuple{ + Update: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{ldsName}, + Routes: []*xdsresource.Route{ + { + Prefix: newStringP("/"), + ActionType: xdsresource.RouteActionRoute, + WeightedClusters: map[string]xdsresource.WeightedCluster{"new-cds-resource": {Weight: 1}}, + }, + }, + }, + }, + }, + }, + }, + { + desc: "new style resource", + resourceName: rdsNameNewStyle, + watchedResource: e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, cdsNameNewStyle), + updatedWatchedResource: e2e.DefaultRouteConfig(rdsNameNewStyle, ldsNameNewStyle, "new-cds-resource"), + wantUpdateV1: xdsresource.RouteConfigUpdateErrTuple{ + Update: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{ldsNameNewStyle}, + Routes: []*xdsresource.Route{ + { + Prefix: newStringP("/"), + ActionType: xdsresource.RouteActionRoute, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsNameNewStyle: {Weight: 1}}, + }, + }, + }, + }, + }, + }, + wantUpdateV2: xdsresource.RouteConfigUpdateErrTuple{ + Update: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{ldsNameNewStyle}, + Routes: []*xdsresource.Route{ + { + Prefix: newStringP("/"), + ActionType: xdsresource.RouteActionRoute, + WeightedClusters: map[string]xdsresource.WeightedCluster{"new-cds-resource": {Weight: 1}}, + }, + }, + }, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + overrideFedEnvVar(t) + mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, nil) + defer cleanup() + + // Create an xDS client with the above bootstrap contents. + client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Register two watches for the same route configuration resource + // and have the callbacks push the received updates on to a channel. + updateCh1 := testutils.NewChannel() + rdsCancel1 := client.WatchRouteConfig(test.resourceName, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + }) + defer rdsCancel1() + updateCh2 := testutils.NewChannel() + rdsCancel2 := client.WatchRouteConfig(test.resourceName, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + }) + + // Configure the management server to return a single route + // configuration resource, corresponding to the one being watched. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Routes: []*v3routepb.RouteConfiguration{test.watchedResource}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + // Verify the contents of the received update. + if err := verifyRouteConfigUpdate(ctx, updateCh1, test.wantUpdateV1); err != nil { + t.Fatal(err) + } + if err := verifyRouteConfigUpdate(ctx, updateCh2, test.wantUpdateV1); err != nil { + t.Fatal(err) + } + + // Cancel the second watch and force the management server to push a + // redundant update for the resource being watched. Neither of the + // two watch callbacks should be invoked. + rdsCancel2() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + if err := verifyNoRouteConfigUpdate(ctx, updateCh1); err != nil { + t.Fatal(err) + } + if err := verifyNoRouteConfigUpdate(ctx, updateCh2); err != nil { + t.Fatal(err) + } + + // Update to the resource being watched. The un-cancelled callback + // should be invoked while the cancelled one should not be. + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Routes: []*v3routepb.RouteConfiguration{test.updatedWatchedResource}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + if err := verifyRouteConfigUpdate(ctx, updateCh1, test.wantUpdateV2); err != nil { + t.Fatal(err) + } + if err := verifyNoRouteConfigUpdate(ctx, updateCh2); err != nil { + t.Fatal(err) + } + }) + } +} + +// TestRDSWatch_ThreeWatchesForDifferentResourceNames covers the case with three +// watchers (two watchers for one resource, and the third watcher for another +// resource), exist across two route configuration resources. The test verifies +// that an update from the management server containing both resources results +// in the invocation of all watch callbacks. +// +// The test is run with both old and new style names. +func (s) TestRDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) { + overrideFedEnvVar(t) + mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, nil) + defer cleanup() + + // Create an xDS client with the above bootstrap contents. + client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Register two watches for the same route configuration resource + // and have the callbacks push the received updates on to a channel. + updateCh1 := testutils.NewChannel() + rdsCancel1 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + }) + defer rdsCancel1() + updateCh2 := testutils.NewChannel() + rdsCancel2 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + }) + defer rdsCancel2() + + // Register the third watch for a different route configuration resource. + updateCh3 := testutils.NewChannel() + rdsCancel3 := client.WatchRouteConfig(rdsNameNewStyle, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh3.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + }) + defer rdsCancel3() + + // Configure the management server to return two route configuration + // resources, corresponding to the registered watches. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Routes: []*v3routepb.RouteConfiguration{ + e2e.DefaultRouteConfig(rdsName, ldsName, cdsName), + e2e.DefaultRouteConfig(rdsNameNewStyle, ldsName, cdsName), + }, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + // Verify the contents of the received update for the all watchers. The two + // resources returned differ only in the resource name. Therefore the + // expected update is the same for all the watchers. + wantUpdate := xdsresource.RouteConfigUpdateErrTuple{ + Update: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{ldsName}, + Routes: []*xdsresource.Route{ + { + Prefix: newStringP("/"), + ActionType: xdsresource.RouteActionRoute, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + }, + }, + }, + }, + }, + } + if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate); err != nil { + t.Fatal(err) + } + if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate); err != nil { + t.Fatal(err) + } + if err := verifyRouteConfigUpdate(ctx, updateCh3, wantUpdate); err != nil { + t.Fatal(err) + } +} + +// TestRDSWatch_ResourceCaching covers the case where a watch is registered for +// a resource which is already present in the cache. The test verifies that the +// watch callback is invoked with the contents from the cache, instead of a +// request being sent to the management server. +func (s) TestRDSWatch_ResourceCaching(t *testing.T) { + overrideFedEnvVar(t) + firstRequestReceived := false + firstAckReceived := grpcsync.NewEvent() + secondRequestReceived := grpcsync.NewEvent() + + mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, &e2e.ManagementServerOptions{ + OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { + // The first request has an empty version string. + if !firstRequestReceived && req.GetVersionInfo() == "" { + firstRequestReceived = true + return nil + } + // The first ack has a non-empty version string. + if !firstAckReceived.HasFired() && req.GetVersionInfo() != "" { + firstAckReceived.Fire() + return nil + } + // Any requests after the first request and ack, are not expected. + secondRequestReceived.Fire() + return nil + }, + }) + defer cleanup() + + // Create an xDS client with the above bootstrap contents. + client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Register a watch for a route configuration resource and have the watch + // callback push the received update on to a channel. + updateCh1 := testutils.NewChannel() + rdsCancel1 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh1.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + }) + defer rdsCancel1() + + // Configure the management server to return a single route configuration + // resource, corresponding to the one we registered a watch for. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, cdsName)}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + // Verify the contents of the received update. + wantUpdate := xdsresource.RouteConfigUpdateErrTuple{ + Update: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{ldsName}, + Routes: []*xdsresource.Route{ + { + Prefix: newStringP("/"), + ActionType: xdsresource.RouteActionRoute, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + }, + }, + }, + }, + }, + } + if err := verifyRouteConfigUpdate(ctx, updateCh1, wantUpdate); err != nil { + t.Fatal(err) + } + select { + case <-ctx.Done(): + t.Fatal("timeout when waiting for receipt of ACK at the management server") + case <-firstAckReceived.Done(): + } + + // Register another watch for the same resource. This should get the update + // from the cache. + updateCh2 := testutils.NewChannel() + rdsCancel2 := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh2.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + }) + defer rdsCancel2() + if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate); err != nil { + t.Fatal(err) + } + // No request should get sent out as part of this watch. + sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) + defer sCancel() + select { + case <-sCtx.Done(): + case <-secondRequestReceived.Done(): + t.Fatal("xdsClient sent out request instead of using update from cache") + } +} + +// TestRDSWatch_ExpiryTimerFiresBeforeResponse tests the case where the client +// does not receive an RDS response for the request that it sends. The test +// verifies that the watch callback is invoked with an error once the +// watchExpiryTimer fires. +func (s) TestRDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) { + // No need to spin up a management server since we don't want the client to + // receive a response for the watch being registered by the test. + + // Create an xDS client talking to a non-existent management server. + client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ + XDSServer: &bootstrap.ServerConfig{ + ServerURI: "dummy management server address", + Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), + TransportAPI: version.TransportV3, + NodeProto: &v3corepb.Node{}, + }, + }, defaultTestWatchExpiryTimeout, time.Duration(0)) + if err != nil { + t.Fatalf("failed to create xds client: %v", err) + } + defer client.Close() + + // Register a watch for a resource which is expected to fail with an error + // after the watch expiry timer fires. + updateCh := testutils.NewChannel() + rdsCancel := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + }) + defer rdsCancel() + + // Wait for the watch expiry timer to fire. + <-time.After(defaultTestWatchExpiryTimeout) + + // Verify that an empty update with the expected error is received. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + wantErr := fmt.Errorf("watch for resource %q of type RouteConfiguration timed out", rdsName) + if err := verifyRouteConfigUpdate(ctx, updateCh, xdsresource.RouteConfigUpdateErrTuple{Err: wantErr}); err != nil { + t.Fatal(err) + } +} + +// TestRDSWatch_ValidResponseCancelsExpiryTimerBehavior tests the case where the +// client receives a valid RDS response for the request that it sends. The test +// verifies that the behavior associated with the expiry timer (i.e, callback +// invocation with error) does not take place. +func (s) TestRDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) { + overrideFedEnvVar(t) + mgmtServer, err := e2e.StartManagementServer(nil) + if err != nil { + t.Fatalf("Failed to spin up the xDS management server: %v", err) + } + defer mgmtServer.Stop() + + // Create an xDS client talking to the above management server. + nodeID := uuid.New().String() + client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{ + XDSServer: &bootstrap.ServerConfig{ + ServerURI: mgmtServer.Address, + Creds: grpc.WithTransportCredentials(insecure.NewCredentials()), + TransportAPI: version.TransportV3, + NodeProto: &v3corepb.Node{Id: nodeID}, + }, + }, defaultTestWatchExpiryTimeout, time.Duration(0)) + if err != nil { + t.Fatalf("failed to create xds client: %v", err) + } + defer client.Close() + + // Register a watch for a route configuration resource and have the watch + // callback push the received update on to a channel. + updateCh := testutils.NewChannel() + rdsCancel := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh.Send(xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + }) + defer rdsCancel() + + // Configure the management server to return a single route configuration + // resource, corresponding to the one we registered a watch for. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, cdsName)}, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + // Verify the contents of the received update. + wantUpdate := xdsresource.RouteConfigUpdateErrTuple{ + Update: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{ldsName}, + Routes: []*xdsresource.Route{ + { + Prefix: newStringP("/"), + ActionType: xdsresource.RouteActionRoute, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + }, + }, + }, + }, + }, + } + if err := verifyRouteConfigUpdate(ctx, updateCh, wantUpdate); err != nil { + t.Fatal(err) + } + + // Wait for the watch expiry timer to fire, and verify that the callback is + // not invoked. + <-time.After(defaultTestWatchExpiryTimeout) + if err := verifyNoRouteConfigUpdate(ctx, updateCh); err != nil { + t.Fatal(err) + } +} + +// TestRDSWatch_NACKError covers the case where an update from the management +// server is NACK'ed by the xdsclient. The test verifies that the error is +// propagated to the watcher. +func (s) TestRDSWatch_NACKError(t *testing.T) { + overrideFedEnvVar(t) + mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, nil) + defer cleanup() + + // Create an xDS client with the above bootstrap contents. + client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Register a watch for a route configuration resource and have the watch + // callback push the received update on to a channel. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + updateCh := testutils.NewChannel() + rdsCancel := client.WatchRouteConfig(rdsName, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh.SendContext(ctx, xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + }) + defer rdsCancel() + + // Configure the management server to return a single route configuration + // resource which is expected to be NACKed by the client. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Routes: []*v3routepb.RouteConfiguration{badRouteConfigResource(rdsName, ldsName, cdsName)}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + // Verify that the expected error is propagated to the watcher. + u, err := updateCh.Receive(ctx) + if err != nil { + t.Fatalf("timeout when waiting for a route configuration resource from the management server: %v", err) + } + gotErr := u.(xdsresource.RouteConfigUpdateErrTuple).Err + if gotErr == nil || !strings.Contains(gotErr.Error(), wantRouteConfigNACKErr) { + t.Fatalf("update received with error: %v, want %q", gotErr, wantRouteConfigNACKErr) + } +} + +// TestRDSWatch_PartialValid covers the case where a response from the +// management server contains both valid and invalid resources and is expected +// to be NACK'ed by the xdsclient. The test verifies that watchers corresponding +// to the valid resource receive the update, while watchers corresponding to the +// invalid resource receive an error. +func (s) TestRDSWatch_PartialValid(t *testing.T) { + overrideFedEnvVar(t) + mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, nil) + defer cleanup() + + // Create an xDS client with the above bootstrap contents. + client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer client.Close() + + // Register two watches for route configuration resources. The first watch + // is expected to receive an error because the received resource is NACKed. + // The second watch is expected to get a good update. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + badResourceName := rdsName + updateCh1 := testutils.NewChannel() + rdsCancel1 := client.WatchRouteConfig(badResourceName, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh1.SendContext(ctx, xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + }) + defer rdsCancel1() + goodResourceName := rdsNameNewStyle + updateCh2 := testutils.NewChannel() + rdsCancel2 := client.WatchRouteConfig(goodResourceName, func(u xdsresource.RouteConfigUpdate, err error) { + updateCh2.SendContext(ctx, xdsresource.RouteConfigUpdateErrTuple{Update: u, Err: err}) + }) + defer rdsCancel2() + + // Configure the management server to return two route configuration + // resources, corresponding to the registered watches. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Routes: []*v3routepb.RouteConfiguration{ + badRouteConfigResource(badResourceName, ldsName, cdsName), + e2e.DefaultRouteConfig(goodResourceName, ldsName, cdsName), + }, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + // Verify that the expected error is propagated to the watcher which + // requested for the bad resource. + u, err := updateCh1.Receive(ctx) + if err != nil { + t.Fatalf("timeout when waiting for a route configuration resource from the management server: %v", err) + } + gotErr := u.(xdsresource.RouteConfigUpdateErrTuple).Err + if gotErr == nil || !strings.Contains(gotErr.Error(), wantRouteConfigNACKErr) { + t.Fatalf("update received with error: %v, want %q", gotErr, wantRouteConfigNACKErr) + } + + // Verify that the watcher watching the good resource receives a good + // update. + wantUpdate := xdsresource.RouteConfigUpdateErrTuple{ + Update: xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{ldsName}, + Routes: []*xdsresource.Route{ + { + Prefix: newStringP("/"), + ActionType: xdsresource.RouteActionRoute, + WeightedClusters: map[string]xdsresource.WeightedCluster{cdsName: {Weight: 1}}, + }, + }, + }, + }, + }, + } + if err := verifyRouteConfigUpdate(ctx, updateCh2, wantUpdate); err != nil { + t.Fatal(err) + } +} diff --git a/xds/internal/xdsclient/watchers_route_test.go b/xds/internal/xdsclient/watchers_route_test.go deleted file mode 100644 index 669785084..000000000 --- a/xds/internal/xdsclient/watchers_route_test.go +++ /dev/null @@ -1,122 +0,0 @@ -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xdsclient - -import ( - "context" - "fmt" - "testing" - - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" -) - -// TestRDSWatch covers the cases: -// - an update is received after a watch() -// - an update for another resource name (which doesn't trigger callback) -// - an update is received after cancel() -func (s) TestRDSWatch(t *testing.T) { - testWatch(t, xdsresource.RouteConfigResource, xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{testLDSName}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{testCDSName: {Weight: 1}}}}, - }, - }, - }, testRDSName) -} - -// TestRDSTwoWatchSameResourceName covers the case where an update is received -// after two watch() for the same resource name. -func (s) TestRDSTwoWatchSameResourceName(t *testing.T) { - testTwoWatchSameResourceName(t, xdsresource.RouteConfigResource, xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{testLDSName}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{testCDSName: {Weight: 1}}}}, - }, - }, - }, testRDSName) -} - -// TestRDSThreeWatchDifferentResourceName covers the case where an update is -// received after three watch() for different resource names. -func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) { - testThreeWatchDifferentResourceName(t, xdsresource.RouteConfigResource, - xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{testLDSName}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{testCDSName + "1": {Weight: 1}}}}, - }, - }, - }, testRDSName+"1", - xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{testLDSName}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{testCDSName + "2": {Weight: 1}}}}, - }, - }, - }, testRDSName+"2", - ) -} - -// TestRDSWatchAfterCache covers the case where watch is called after the update -// is in cache. -func (s) TestRDSWatchAfterCache(t *testing.T) { - testWatchAfterCache(t, xdsresource.RouteConfigResource, xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{testLDSName}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{testCDSName: {Weight: 1}}}}, - }, - }, - }, testRDSName) -} - -// TestRouteWatchNACKError covers the case that an update is NACK'ed, and the -// watcher should also receive the error. -func (s) TestRouteWatchNACKError(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - client, ctrlCh := testClientSetup(t, false) - rdsUpdateCh, _ := newWatch(t, client, xdsresource.RouteConfigResource, testRDSName) - _, updateHandler := getControllerAndPubsub(ctx, t, client, ctrlCh, xdsresource.RouteConfigResource, testRDSName) - - wantError := fmt.Errorf("testing error") - updateHandler.NewRouteConfigs(map[string]xdsresource.RouteConfigUpdateErrTuple{testRDSName: {Err: wantError}}, xdsresource.UpdateMetadata{ErrState: &xdsresource.UpdateErrorMetadata{Err: wantError}}) - if err := verifyRouteConfigUpdate(ctx, rdsUpdateCh, xdsresource.RouteConfigUpdate{}, wantError); err != nil { - t.Fatal(err) - } -} - -// TestRouteWatchPartialValid covers the case that a response contains both -// valid and invalid resources. This response will be NACK'ed by the xdsclient. -// But the watchers with valid resources should receive the update, those with -// invalida resources should receive an error. -func (s) TestRouteWatchPartialValid(t *testing.T) { - testWatchPartialValid(t, xdsresource.RouteConfigResource, xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{testLDSName}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{testCDSName: {Weight: 1}}}}, - }, - }, - }, testRDSName) -}