diff --git a/xds/internal/resolver/helpers_test.go b/xds/internal/resolver/helpers_test.go index 4c5af2313..d30ea1099 100644 --- a/xds/internal/resolver/helpers_test.go +++ b/xds/internal/resolver/helpers_test.go @@ -21,6 +21,7 @@ package resolver_test import ( "context" "fmt" + "strings" "testing" "time" @@ -161,6 +162,21 @@ func verifyNoUpdateFromResolver(ctx context.Context, t *testing.T, stateCh chan } } +// verifyErrorFromResolver waits for the resolver to push an error and verifies +// that it matches the expected error. +func verifyErrorFromResolver(ctx context.Context, t *testing.T, errCh chan error, wantErr string) { + t.Helper() + + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for error to be propagated to the ClientConn") + case gotErr := <-errCh: + if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) { + t.Fatalf("Received error from resolver %q, want %q", gotErr, wantErr) + } + } +} + // Spins up an xDS management server and sets up an xDS bootstrap configuration // file that points to it. // diff --git a/xds/internal/resolver/internal/internal.go b/xds/internal/resolver/internal/internal.go new file mode 100644 index 000000000..f505eeb43 --- /dev/null +++ b/xds/internal/resolver/internal/internal.go @@ -0,0 +1,30 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package internal contains functionality internal to the xDS resolver. +package internal + +// The following variables are overridden in tests. +var ( + // NewWRR is the function used to create a new weighted round robin + // implementation. + NewWRR any // func() wrr.WRR + + // NewXDSClient is the function used to create a new xDS client. + NewXDSClient any // func() (xdsclient.XDSClient, func(), error) +) diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index 06f6a4751..7b2fbe26e 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -40,6 +40,7 @@ import ( "google.golang.org/grpc/xds/internal/balancer/clustermanager" "google.golang.org/grpc/xds/internal/balancer/ringhash" "google.golang.org/grpc/xds/internal/httpfilter" + rinternal "google.golang.org/grpc/xds/internal/resolver/internal" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -348,9 +349,6 @@ func (cs *configSelector) stop() { } } -// A global for testing. -var newWRR = wrr.NewRandom - // newConfigSelector creates the config selector for su; may add entries to // r.activeClusters for previously-unseen clusters. func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) { @@ -366,7 +364,7 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro } for i, rt := range su.virtualHost.Routes { - clusters := newWRR() + clusters := rinternal.NewWRR.(func() wrr.WRR)() if rt.ClusterSpecifierPlugin != "" { clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin clusters.Add(&routeCluster{ diff --git a/xds/internal/resolver/serviceconfig_test.go b/xds/internal/resolver/serviceconfig_test.go index a75bdb73e..ed1b68cb0 100644 --- a/xds/internal/resolver/serviceconfig_test.go +++ b/xds/internal/resolver/serviceconfig_test.go @@ -25,13 +25,23 @@ import ( xxhash "github.com/cespare/xxhash/v2" "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/grpcutil" iresolver "google.golang.org/grpc/internal/resolver" + "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/metadata" _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + func (s) TestPruneActiveClusters(t *testing.T) { r := &xdsResolver{activeClusters: map[string]*clusterInfo{ "zero": {refCount: 0}, @@ -53,7 +63,7 @@ func (s) TestGenerateRequestHash(t *testing.T) { const channelID = 12378921 cs := &configSelector{ r: &xdsResolver{ - cc: &testClientConn{}, + cc: &testutils.ResolverClientConn{Logger: t}, channelID: channelID, }, } diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index 6e75887ec..2a9b6332f 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -31,7 +31,9 @@ import ( "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/pretty" iresolver "google.golang.org/grpc/internal/resolver" + "google.golang.org/grpc/internal/wrr" "google.golang.org/grpc/resolver" + rinternal "google.golang.org/grpc/xds/internal/resolver/internal" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" @@ -54,12 +56,12 @@ func newBuilderForTesting(config []byte) (resolver.Builder, error) { }, nil } -// For overriding in unittests. -var newXDSClient = func() (xdsclient.XDSClient, func(), error) { return xdsclient.New() } - func init() { resolver.Register(&xdsResolverBuilder{}) internal.NewXDSResolverWithConfigForTesting = newBuilderForTesting + + rinternal.NewWRR = wrr.NewRandom + rinternal.NewXDSClient = xdsclient.New } type xdsResolverBuilder struct { @@ -86,7 +88,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon r.logger = prefixLogger(r) r.logger.Infof("Creating resolver for target: %+v", target) - newXDSClient := newXDSClient + newXDSClient := rinternal.NewXDSClient.(func() (xdsclient.XDSClient, func(), error)) if b.newXDSClient != nil { newXDSClient = b.newXDSClient } @@ -115,7 +117,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon } if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() { if len(bootstrapConfig.CertProviderConfigs) == 0 { - return nil, errors.New("xds: xdsCreds specified but certificate_providers config missing in bootstrap file") + return nil, fmt.Errorf("xds: use of xDS credentials is specified, but certificate_providers config missing in bootstrap file") } } diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index c38ab8cb9..d7f42a941 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -16,19 +16,18 @@ * */ -package resolver +package resolver_test import ( "context" - "errors" - "net/url" - "reflect" + "fmt" "strings" "testing" "time" xxhash "github.com/cespare/xxhash/v2" "github.com/envoyproxy/go-control-plane/pkg/wellknown" + "github.com/golang/protobuf/proto" "github.com/google/go-cmp/cmp" "github.com/google/uuid" "google.golang.org/grpc/codes" @@ -38,12 +37,10 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpcsync" - "google.golang.org/grpc/internal/grpctest" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/testutils" xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap" "google.golang.org/grpc/internal/testutils/xds/e2e" - "google.golang.org/grpc/internal/wrr" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" @@ -51,16 +48,18 @@ import ( "google.golang.org/grpc/xds/internal/balancer/clustermanager" "google.golang.org/grpc/xds/internal/balancer/ringhash" "google.golang.org/grpc/xds/internal/httpfilter" - "google.golang.org/grpc/xds/internal/httpfilter/router" + xdsresolver "google.golang.org/grpc/xds/internal/resolver" + rinternal "google.golang.org/grpc/xds/internal/resolver/internal" xdstestutils "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" + "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/wrapperspb" + v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" @@ -68,135 +67,49 @@ import ( v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the cds LB policy _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config + _ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter ) -const ( - targetStr = "target" - routeStr = "route" - cluster = "cluster" - defaultTestTimeout = 10 * time.Second - defaultTestShortTimeout = 100 * time.Microsecond -) - -var ( - target = resolver.Target{URL: *testutils.MustParseURL("xds:///" + targetStr)} -) - -func makeRouterFilter(t *testing.T) xdsresource.HTTPFilter { - f := httpfilter.Get(router.TypeURL) - cfg, _ := f.ParseFilterConfig(testutils.MarshalAny(t, &v3routerpb.Router{})) - return xdsresource.HTTPFilter{Name: "rtr", Filter: f, Config: cfg} -} - -type s struct { - grpctest.Tester -} - -func Test(t *testing.T) { - grpctest.RunSubTests(t, s{}) -} - -func (s) TestRegister(t *testing.T) { - if resolver.Get(Scheme) == nil { - t.Errorf("scheme %v is not registered", Scheme) - } -} - -// testClientConn is a fake implemetation of resolver.ClientConn that pushes -// state updates and errors returned by the resolver on to channels for -// consumption by tests. -type testClientConn struct { - resolver.ClientConn - stateCh *testutils.Channel - errorCh *testutils.Channel -} - -func (t *testClientConn) UpdateState(s resolver.State) error { - // Tests should ideally consume all state updates, and if one happens - // unexpectedly, tests should catch it. Hence using `Send()` here. - t.stateCh.Send(s) - return nil -} - -func (t *testClientConn) ReportError(err error) { - // When used with a go-control-plane management server that continuously - // resends resources which are NACKed by the xDS client, using a `Replace()` - // here simplifies tests which will have access to the most recently - // received error. - t.errorCh.Replace(err) -} - -func (t *testClientConn) ParseServiceConfig(jsonSC string) *serviceconfig.ParseResult { - return internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC) -} - -func newTestClientConn() *testClientConn { - return &testClientConn{ - stateCh: testutils.NewChannel(), - errorCh: testutils.NewChannel(), - } -} - -// TestResolverBuilder_ClientCreationFails tests the case where xDS client -// creation fails, and verifies that xDS resolver build fails as well. -func (s) TestResolverBuilder_ClientCreationFails(t *testing.T) { - // Override xDS client creation function and return an error. - origNewClient := newXDSClient - newXDSClient = func() (xdsclient.XDSClient, func(), error) { - return nil, nil, errors.New("failed to create xDS client") - } - defer func() { - newXDSClient = origNewClient - }() - - // Build an xDS resolver and expect it to fail. - builder := resolver.Get(Scheme) +// Tests the case where xDS client creation is expected to fail because the +// bootstrap configuration is not specified. The test verifies that xDS resolver +// build fails as well. +func (s) TestResolverBuilder_ClientCreationFails_NoBootstap(t *testing.T) { + // Build an xDS resolver without specifying bootstrap env vars. + builder := resolver.Get(xdsresolver.Scheme) if builder == nil { - t.Fatalf("resolver.Get(%v) returned nil", Scheme) + t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme) } - if _, err := builder.Build(target, newTestClientConn(), resolver.BuildOptions{}); err == nil { - t.Fatalf("builder.Build(%v) succeeded when expected to fail", target) + + target := resolver.Target{URL: *testutils.MustParseURL("xds:///target")} + if _, err := builder.Build(target, nil, resolver.BuildOptions{}); err == nil { + t.Fatalf("xds Resolver Build(%v) succeeded when expected to fail, because there is not bootstrap configuration for the xDS client", target) } } -// TestResolverBuilder_DifferentBootstrapConfigs tests the resolver builder's -// Build() method with different xDS bootstrap configurations. +// Tests the resolver builder's Build() method with different xDS bootstrap +// configurations. func (s) TestResolverBuilder_DifferentBootstrapConfigs(t *testing.T) { tests := []struct { - name string - bootstrapCfg *bootstrap.Config // Empty top-level xDS server config, will be set by test logic. - target resolver.Target - buildOpts resolver.BuildOptions - wantErr string + name string + target resolver.Target + buildOpts resolver.BuildOptions + bootstrapOpts xdsbootstrap.Options + wantErr string }{ { - name: "good", - bootstrapCfg: &bootstrap.Config{}, - target: target, - }, - { - name: "authority not defined in bootstrap", - bootstrapCfg: &bootstrap.Config{ - ClientDefaultListenerResourceNameTemplate: "%s", - Authorities: map[string]*bootstrap.Authority{ - "test-authority": { - ClientListenerResourceNameTemplate: "xdstp://test-authority/%s", - }, - }, - }, - target: resolver.Target{ - URL: url.URL{ - Host: "non-existing-authority", - Path: "/" + targetStr, - }, + name: "authority not defined in bootstrap", + target: resolver.Target{URL: *testutils.MustParseURL("xds://non-existing-authority/target")}, + bootstrapOpts: xdsbootstrap.Options{ + NodeID: "node-id", + ServerURI: "dummy-management-server", }, wantErr: `authority "non-existing-authority" is not found in the bootstrap file`, }, { - name: "xDS creds specified without certificate providers in bootstrap", - bootstrapCfg: &bootstrap.Config{}, - target: target, + name: "xDS creds specified without certificate providers in bootstrap", + target: resolver.Target{URL: *testutils.MustParseURL("xds:///target")}, buildOpts: resolver.BuildOptions{ DialCreds: func() credentials.TransportCredentials { creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{FallbackCreds: insecure.NewCredentials()}) @@ -206,45 +119,33 @@ func (s) TestResolverBuilder_DifferentBootstrapConfigs(t *testing.T) { return creds }(), }, - wantErr: `xdsCreds specified but certificate_providers config missing in bootstrap file`, + bootstrapOpts: xdsbootstrap.Options{ + NodeID: "node-id", + ServerURI: "dummy-management-server", + }, + wantErr: `use of xDS credentials is specified, but certificate_providers config missing in bootstrap file`, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) + // Create a bootstrap file and set the env var. + bootstrapCleanup, err := xdsbootstrap.CreateFile(test.bootstrapOpts) if err != nil { - t.Fatalf("Starting xDS management server: %v", err) + t.Fatal(err) } - defer mgmtServer.Stop() + defer bootstrapCleanup() - // Add top-level xDS server config corresponding to the above - // management server. - test.bootstrapCfg.XDSServer = xdstestutils.ServerConfigForAddress(t, mgmtServer.Address) - - // Override xDS client creation to use bootstrap configuration - // specified by the test. - origNewClient := newXDSClient - newXDSClient = func() (xdsclient.XDSClient, func(), error) { - // The watch timeout and idle authority timeout values passed to - // NewWithConfigForTesing() are immaterial for this test, as we - // are only testing the resolver build functionality. - return xdsclient.NewWithConfigForTesting(test.bootstrapCfg, defaultTestTimeout, defaultTestTimeout) - } - defer func() { - newXDSClient = origNewClient - }() - - builder := resolver.Get(Scheme) + builder := resolver.Get(xdsresolver.Scheme) if builder == nil { - t.Fatalf("resolver.Get(%v) returned nil", Scheme) + t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme) } - r, err := builder.Build(test.target, newTestClientConn(), test.buildOpts) + r, err := builder.Build(test.target, &testutils.ResolverClientConn{Logger: t}, test.buildOpts) if gotErr, wantErr := err != nil, test.wantErr != ""; gotErr != wantErr { - t.Fatalf("builder.Build(%v) returned err: %v, wantErr: %v", target, err, test.wantErr) + t.Fatalf("xds Resolver Build(%v) returned err: %v, wantErr: %v", test.target, err, test.wantErr) } if test.wantErr != "" && !strings.Contains(err.Error(), test.wantErr) { - t.Fatalf("builder.Build(%v) returned err: %v, wantErr: %v", target, err, test.wantErr) + t.Fatalf("xds Resolver Build(%v) returned err: %v, wantErr: %v", test.target, err, test.wantErr) } if err != nil { // This is the case where we expect an error and got it. @@ -255,98 +156,8 @@ func (s) TestResolverBuilder_DifferentBootstrapConfigs(t *testing.T) { } } -type setupOpts struct { - bootstrapC *bootstrap.Config - target resolver.Target -} - -func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *fakeclient.Client, *testClientConn, func()) { - t.Helper() - - fc := fakeclient.NewClient() - if opts.bootstrapC != nil { - fc.SetBootstrapConfig(opts.bootstrapC) - } - oldClientMaker := newXDSClient - closeCh := make(chan struct{}) - newXDSClient = func() (xdsclient.XDSClient, func(), error) { - return fc, grpcsync.OnceFunc(func() { close(closeCh) }), nil - } - cancel := func() { - // Make sure the xDS client is closed, in all (successful or failed) - // cases. - select { - case <-time.After(defaultTestTimeout): - t.Fatalf("timeout waiting for close") - case <-closeCh: - } - newXDSClient = oldClientMaker - } - builder := resolver.Get(Scheme) - if builder == nil { - t.Fatalf("resolver.Get(%v) returned nil", Scheme) - } - - tcc := newTestClientConn() - r, err := builder.Build(opts.target, tcc, resolver.BuildOptions{}) - if err != nil { - t.Fatalf("builder.Build(%v) returned err: %v", target, err) - } - return r.(*xdsResolver), fc, tcc, func() { - r.Close() - cancel() - } -} - -// waitForWatchListener waits for the WatchListener method to be called on the -// xdsClient within a reasonable amount of time, and also verifies that the -// watch is called with the expected target. -func waitForWatchListener(ctx context.Context, t *testing.T, xdsC *fakeclient.Client, wantTarget string) { - t.Helper() - - gotTarget, err := xdsC.WaitForWatchListener(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchService failed with error: %v", err) - } - if gotTarget != wantTarget { - t.Fatalf("xdsClient.WatchService() called with target: %v, want %v", gotTarget, wantTarget) - } -} - -// waitForWatchRouteConfig waits for the WatchRoute method to be called on the -// xdsClient within a reasonable amount of time, and also verifies that the -// watch is called with the expected target. -func waitForWatchRouteConfig(ctx context.Context, t *testing.T, xdsC *fakeclient.Client, wantTarget string) { - t.Helper() - - gotTarget, err := xdsC.WaitForWatchRouteConfig(ctx) - if err != nil { - t.Fatalf("xdsClient.WatchService failed with error: %v", err) - } - if gotTarget != wantTarget { - t.Fatalf("xdsClient.WatchService() called with target: %v, want %v", gotTarget, wantTarget) - } -} - -// buildResolverForTarget builds an xDS resolver for the given target. It -// returns a testClientConn which allows inspection of resolver updates, and a -// function to close the resolver once the test is complete. -func buildResolverForTarget(t *testing.T, target resolver.Target) (*testClientConn, func()) { - builder := resolver.Get(Scheme) - if builder == nil { - t.Fatalf("resolver.Get(%v) returned nil", Scheme) - } - - tcc := newTestClientConn() - r, err := builder.Build(target, tcc, resolver.BuildOptions{}) - if err != nil { - t.Fatalf("builder.Build(%v) returned err: %v", target, err) - } - return tcc, r.Close -} - -// TestResolverResourceName builds an xDS resolver and verifies that the -// resource name specified in the discovery request matches expectations. +// Test builds an xDS resolver and verifies that the resource name specified in +// the discovery request matches expectations. func (s) TestResolverResourceName(t *testing.T) { // Federation support is required when new style names are used. oldXDSFederation := envconfig.XDSFederation @@ -358,69 +169,47 @@ func (s) TestResolverResourceName(t *testing.T) { listenerResourceNameTemplate string extraAuthority string dialTarget string - wantResourceName string + wantResourceNames []string }{ { name: "default %s old style", listenerResourceNameTemplate: "%s", dialTarget: "xds:///target", - wantResourceName: "target", + wantResourceNames: []string{"target"}, }, { name: "old style no percent encoding", listenerResourceNameTemplate: "/path/to/%s", dialTarget: "xds:///target", - wantResourceName: "/path/to/target", + wantResourceNames: []string{"/path/to/target"}, }, { name: "new style with %s", listenerResourceNameTemplate: "xdstp://authority.com/%s", dialTarget: "xds:///0.0.0.0:8080", - wantResourceName: "xdstp://authority.com/0.0.0.0:8080", + wantResourceNames: []string{"xdstp://authority.com/0.0.0.0:8080"}, }, { name: "new style percent encoding", listenerResourceNameTemplate: "xdstp://authority.com/%s", dialTarget: "xds:///[::1]:8080", - wantResourceName: "xdstp://authority.com/%5B::1%5D:8080", + wantResourceNames: []string{"xdstp://authority.com/%5B::1%5D:8080"}, }, { name: "new style different authority", listenerResourceNameTemplate: "xdstp://authority.com/%s", extraAuthority: "test-authority", dialTarget: "xds://test-authority/target", - wantResourceName: "xdstp://test-authority/envoy.config.listener.v3.Listener/target", + wantResourceNames: []string{"xdstp://test-authority/envoy.config.listener.v3.Listener/target"}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - // Setup the management server to push the requested resource name - // on to a channel. No resources are configured on the management - // server as part of this test, as we are only interested in the - // resource name being requested. - resourceNameCh := make(chan string, 1) - mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{ - OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { - // When the resolver is being closed, the watch associated - // with the listener resource will be cancelled, and it - // might result in a discovery request with no resource - // names. Hence, we only consider requests which contain a - // resource name. - var name string - if len(req.GetResourceNames()) == 1 { - name = req.GetResourceNames()[0] - } - select { - case resourceNameCh <- name: - default: - } - return nil - }, - }) - if err != nil { - t.Fatalf("Failed to start xDS management server: %v", err) - } - defer mgmtServer.Stop() + // Spin up an xDS management server for the test. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + nodeID := uuid.New().String() + mgmtServer, lisCh, _ := setupManagementServerForTest(ctx, t, nodeID) // Create a bootstrap configuration with test options. opts := xdsbootstrap.Options{ @@ -441,38 +230,32 @@ func (s) TestResolverResourceName(t *testing.T) { } defer cleanup() - _, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL(tt.dialTarget)}) - defer rClose() - - // Verify the resource name in the discovery request being sent out. - select { - case gotResourceName := <-resourceNameCh: - if gotResourceName != tt.wantResourceName { - t.Fatalf("Received discovery request with resource name: %v, want %v", gotResourceName, tt.wantResourceName) - } - case <-time.After(defaultTestTimeout): - t.Fatalf("Timeout when waiting for discovery request") - } + buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL(tt.dialTarget)}) + waitForResourceNames(ctx, t, lisCh, tt.wantResourceNames) }) } } -// TestResolverWatchCallbackAfterClose tests the case where a service update -// from the underlying xDS client is received after the resolver is closed, and -// verifies that the update is not propagated to the ClientConn. +// Tests the case where a service update from the underlying xDS client is +// received after the resolver is closed, and verifies that the update is not +// propagated to the ClientConn. func (s) TestResolverWatchCallbackAfterClose(t *testing.T) { // Setup the management server that synchronizes with the test goroutine // using two channels. The management server signals the test goroutine when // it receives a discovery request for a route configuration resource. And // the test goroutine signals the management server when the resolver is // closed. - waitForRouteConfigDiscoveryReqCh := make(chan struct{}, 1) + routeConfigResourceNamesCh := make(chan []string, 1) waitForResolverCloseCh := make(chan struct{}) mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{ OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { if req.GetTypeUrl() == version.V3RouteConfigURL { select { - case waitForRouteConfigDiscoveryReqCh <- struct{}{}: + case <-routeConfigResourceNamesCh: + default: + } + select { + case routeConfigResourceNamesCh <- req.GetResourceNames(): default: } <-waitForResolverCloseCh @@ -496,49 +279,28 @@ func (s) TestResolverWatchCallbackAfterClose(t *testing.T) { } defer cleanup() - // Configure listener and route configuration resources on the management - // server. - const serviceName = "my-service-client-side-xds" - rdsName := "route-" + serviceName - cdsName := "cluster-" + serviceName - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, serviceName, cdsName)}, - SkipValidation: true, - } + // Configure resources on the management server. + listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)} ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) - defer rClose() + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) // Wait for a discovery request for a route configuration resource. - select { - case <-waitForRouteConfigDiscoveryReqCh: - case <-ctx.Done(): - t.Fatal("Timeout when waiting for a discovery request for a route configuration resource") - } + stateCh, _, r := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) + waitForResourceNames(ctx, t, routeConfigResourceNamesCh, []string{defaultTestRouteConfigName}) // Close the resolver and unblock the management server. - rClose() + r.Close() close(waitForResolverCloseCh) // Verify that the update from the management server is not propagated to // the ClientConn. The xDS resolver, once closed, is expected to drop // updates from the xDS client. - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - if _, err := tcc.stateCh.Receive(sCtx); err != context.DeadlineExceeded { - t.Fatalf("ClientConn received an update from the resolver that was closed: %v", err) - } + verifyNoUpdateFromResolver(ctx, t, stateCh) } -// TestResolverCloseClosesXDSClient tests that the xDS resolver's Close method -// closes the xDS client. +// Tests that the xDS resolver's Close method closes the xDS client. func (s) TestResolverCloseClosesXDSClient(t *testing.T) { bootstrapCfg := &bootstrap.Config{ XDSServer: xdstestutils.ServerConfigForAddress(t, "dummy-management-server-address"), @@ -547,21 +309,19 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) { // Override xDS client creation to use bootstrap configuration pointing to a // dummy management server. Also close a channel when the returned xDS // client is closed. + origNewClient := rinternal.NewXDSClient closeCh := make(chan struct{}) - origNewClient := newXDSClient - newXDSClient = func() (xdsclient.XDSClient, func(), error) { + rinternal.NewXDSClient = func() (xdsclient.XDSClient, func(), error) { c, cancel, err := xdsclient.NewWithConfigForTesting(bootstrapCfg, defaultTestTimeout, defaultTestTimeout) - return c, func() { + return c, grpcsync.OnceFunc(func() { close(closeCh) cancel() - }, err + }), err } - defer func() { - newXDSClient = origNewClient - }() + defer func() { rinternal.NewXDSClient = origNewClient }() - _, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///my-service-client-side-xds")}) - rClose() + _, _, r := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///my-service-client-side-xds")}) + r.Close() select { case <-closeCh: @@ -570,32 +330,18 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) { } } -// TestResolverBadServiceUpdate tests the case where a resource returned by the -// management server is NACKed by the xDS client, which then returns an update -// containing an error to the resolver. Verifies that the update is propagated -// to the ClientConn by the resolver. It also tests the cases where the resolver -// gets a good update subsequently, and another error after the good update. +// Tests the case where a resource returned by the management server is NACKed +// by the xDS client, which then returns an update containing an error to the +// resolver. Verifies that the update is propagated to the ClientConn by the +// resolver. It also tests the cases where the resolver gets a good update +// subsequently, and another error after the good update. The test also verifies +// that these are propagated to the ClientConn. func (s) TestResolverBadServiceUpdate(t *testing.T) { - mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) - if err != nil { - t.Fatal(err) - } - defer mgmtServer.Stop() - - // Create a bootstrap configuration specifying the above management server. + // Spin up an xDS management server for the test. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() nodeID := uuid.New().String() - cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ - NodeID: nodeID, - ServerURI: mgmtServer.Address, - }) - if err != nil { - t.Fatal(err) - } - defer cleanup() - - const serviceName = "my-service-client-side-xds" - tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) - defer rClose() + mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID) // Configure a listener resource that is expected to be NACKed because it // does not contain the `RouteSpecifier` field in the HTTPConnectionManager. @@ -603,7 +349,7 @@ func (s) TestResolverBadServiceUpdate(t *testing.T) { HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, }) lis := &v3listenerpb.Listener{ - Name: serviceName, + Name: defaultTestServiceName, ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, FilterChains: []*v3listenerpb.FilterChain{{ Name: "filter-chain-name", @@ -613,70 +359,26 @@ func (s) TestResolverBadServiceUpdate(t *testing.T) { }}, }}, } - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{lis}, - SkipValidation: true, - } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil) + // Build the resolver and expect an error update from it. + stateCh, errCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) wantErr := "no RouteSpecifier" - val, err := tcc.errorCh.Receive(ctx) - if err != nil { - t.Fatal("Timeout when waiting for error to be propagated to the ClientConn") - } - gotErr := val.(error) - if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) { - t.Fatalf("Received error from resolver %q, want %q", gotErr, wantErr) - } + verifyErrorFromResolver(ctx, t, errCh, wantErr) // Configure good listener and route configuration resources on the // management server. - rdsName := "route-" + serviceName - cdsName := "cluster-" + serviceName - resources = e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, serviceName, cdsName)}, - SkipValidation: true, - } - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) // Expect a good update from the resolver. - val, err = tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState := val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } + verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig) - // Configure another bad resource on the management server. - resources = e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{lis}, - SkipValidation: true, - } - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - // Expect an error update from the resolver. - val, err = tcc.errorCh.Receive(ctx) - if err != nil { - t.Fatal("Timeout when waiting for error to be propagated to the ClientConn") - } - gotErr = val.(error) - if gotErr == nil || !strings.Contains(gotErr.Error(), wantErr) { - t.Fatalf("Received error from resolver %q, want %q", gotErr, wantErr) - } + // Configure another bad resource on the management server and expect an + // error update from the resolver. + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil) + verifyErrorFromResolver(ctx, t, errCh, wantErr) } // TestResolverGoodServiceUpdate tests the case where the resource returned by @@ -686,83 +388,38 @@ func (s) TestResolverBadServiceUpdate(t *testing.T) { // returned by the resolver picks clusters based on the route configuration // received from the management server. func (s) TestResolverGoodServiceUpdate(t *testing.T) { - mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) - if err != nil { - t.Fatal(err) - } - defer mgmtServer.Stop() - - // Create a bootstrap configuration specifying the above management server. - nodeID := uuid.New().String() - cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ - NodeID: nodeID, - ServerURI: mgmtServer.Address, - }) - if err != nil { - t.Fatal(err) - } - defer cleanup() - - const serviceName = "my-service-client-side-xds" - tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) - defer rClose() - - ldsName := serviceName - rdsName := "route-" + serviceName for _, tt := range []struct { + name string routeConfig *v3routepb.RouteConfiguration wantServiceConfig string wantClusters map[string]bool }{ { - // A route configuration with a single cluster. + name: "single cluster", routeConfig: e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{ - RouteConfigName: rdsName, - ListenerName: ldsName, + RouteConfigName: defaultTestRouteConfigName, + ListenerName: defaultTestServiceName, ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeCluster, - ClusterName: "test-cluster-1", + ClusterName: defaultTestClusterName, }), - wantServiceConfig: ` -{ - "loadBalancingConfig": [{ - "xds_cluster_manager_experimental": { - "children": { - "cluster:test-cluster-1": { - "childPolicy": [{ - "cds_experimental": { - "cluster": "test-cluster-1" - } - }] - } - } - } - }] -}`, - wantClusters: map[string]bool{"cluster:test-cluster-1": true}, + wantServiceConfig: wantDefaultServiceConfig, + wantClusters: map[string]bool{fmt.Sprintf("cluster:%s", defaultTestClusterName): true}, }, { - // A route configuration with a two new clusters. + name: "two clusters", routeConfig: e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{ - RouteConfigName: rdsName, - ListenerName: ldsName, + RouteConfigName: defaultTestRouteConfigName, + ListenerName: defaultTestServiceName, ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeWeightedCluster, WeightedClusters: map[string]int{"cluster_1": 75, "cluster_2": 25}, }), // This update contains the cluster from the previous update as well // as this update, as the previous config selector still references // the old cluster when the new one is pushed. - wantServiceConfig: ` -{ + wantServiceConfig: `{ "loadBalancingConfig": [{ "xds_cluster_manager_experimental": { "children": { - "cluster:test-cluster-1": { - "childPolicy": [{ - "cds_experimental": { - "cluster": "test-cluster-1" - } - }] - }, "cluster:cluster_1": { "childPolicy": [{ "cds_experimental": { @@ -779,148 +436,97 @@ func (s) TestResolverGoodServiceUpdate(t *testing.T) { } } } - }] -}`, + }]}`, wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true}, }, } { - // Configure the management server with a good listener resource and a - // route configuration resource, as specified by the test case. - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{tt.routeConfig}, - SkipValidation: true, - } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + t.Run(tt.name, func(t *testing.T) { + // Spin up an xDS management server for the test. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + nodeID := uuid.New().String() + mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID) - // Read the update pushed by the resolver to the ClientConn. - val, err := tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState := val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } + // Configure the management server with a good listener resource and a + // route configuration resource, as specified by the test case. + listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + routes := []*v3routepb.RouteConfiguration{tt.routeConfig} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) - wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(tt.wantServiceConfig) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) - } + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) - cs := iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("Received nil config selector in update from resolver") - } + // Read the update pushed by the resolver to the ClientConn. + cs := verifyUpdateFromResolver(ctx, t, stateCh, tt.wantServiceConfig) - pickedClusters := make(map[string]bool) - // Odds of picking 75% cluster 100 times in a row: 1 in 3E-13. And - // with the random number generator stubbed out, we can rely on this - // to be 100% reproducible. - for i := 0; i < 100; i++ { - res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) - if err != nil { - t.Fatalf("cs.SelectConfig(): %v", err) + pickedClusters := make(map[string]bool) + // Odds of picking 75% cluster 100 times in a row: 1 in 3E-13. And + // with the random number generator stubbed out, we can rely on this + // to be 100% reproducible. + for i := 0; i < 100; i++ { + res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) + if err != nil { + t.Fatalf("cs.SelectConfig(): %v", err) + } + cluster := clustermanager.GetPickedClusterForTesting(res.Context) + pickedClusters[cluster] = true + res.OnCommitted() } - cluster := clustermanager.GetPickedClusterForTesting(res.Context) - pickedClusters[cluster] = true - res.OnCommitted() - } - if !cmp.Equal(pickedClusters, tt.wantClusters) { - t.Errorf("Picked clusters: %v; want: %v", pickedClusters, tt.wantClusters) - } + if !cmp.Equal(pickedClusters, tt.wantClusters) { + t.Errorf("Picked clusters: %v; want: %v", pickedClusters, tt.wantClusters) + } + }) } } -// TestResolverRequestHash tests a case where a resolver receives a RouteConfig update -// with a HashPolicy specifying to generate a hash. The configSelector generated should +// Tests a case where a resolver receives a RouteConfig update with a HashPolicy +// specifying to generate a hash. The configSelector generated should // successfully generate a Hash. func (s) TestResolverRequestHash(t *testing.T) { oldRH := envconfig.XDSRingHash envconfig.XDSRingHash = true defer func() { envconfig.XDSRingHash = oldRH }() - mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) - if err != nil { - t.Fatal(err) - } - defer mgmtServer.Stop() - - // Create a bootstrap configuration specifying the above management server. + // Spin up an xDS management server for the test. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() nodeID := uuid.New().String() - cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ - NodeID: nodeID, - ServerURI: mgmtServer.Address, - }) - if err != nil { - t.Fatal(err) - } - defer cleanup() + mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID) - const serviceName = "my-service-client-side-xds" - tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) - defer rClose() - - ldsName := serviceName - rdsName := "route-" + serviceName // Configure the management server with a good listener resource and a // route configuration resource that specifies a hash policy. - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{{ - Name: rdsName, - VirtualHosts: []*v3routepb.VirtualHost{{ - Domains: []string{ldsName}, - Routes: []*v3routepb.Route{{ - Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, - Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - { - Name: "test-cluster-1", - Weight: &wrapperspb.UInt32Value{Value: 100}, - }, + listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + routes := []*v3routepb.RouteConfiguration{{ + Name: defaultTestRouteConfigName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{defaultTestServiceName}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + { + Name: defaultTestClusterName, + Weight: &wrapperspb.UInt32Value{Value: 100}, }, - }}, - HashPolicy: []*v3routepb.RouteAction_HashPolicy{{ - PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ - Header: &v3routepb.RouteAction_HashPolicy_Header{ - HeaderName: ":path", - }, + }, + }}, + HashPolicy: []*v3routepb.RouteAction_HashPolicy{{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ + Header: &v3routepb.RouteAction_HashPolicy_Header{ + HeaderName: ":path", }, - Terminal: true, - }}, + }, + Terminal: true, }}, }}, }}, }}, - SkipValidation: true, - } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + }} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) - // Read the update pushed by the resolver to the ClientConn. - val, err := tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState := val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - cs := iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("Received nil config selector in update from resolver") - } + // Build the resolver and read the config selector out of it. + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) + cs := verifyUpdateFromResolver(ctx, t, stateCh, "") // Selecting a config when there was a hash policy specified in the route // that will be selected should put a request hash in the config's context. @@ -938,85 +544,27 @@ func (s) TestResolverRequestHash(t *testing.T) { } } -// TestResolverRemovedWithRPCs tests the case where resources are removed from -// the management server, causing it to send an empty update to the xDS client, -// which returns a resource-not-found error to the xDS resolver. The test -// verifies that an ongoing RPC is handled properly when this happens. +// Tests the case where resources are removed from the management server, +// causing it to send an empty update to the xDS client, which returns a +// resource-not-found error to the xDS resolver. The test verifies that an +// ongoing RPC is handled to completion when this happens. func (s) TestResolverRemovedWithRPCs(t *testing.T) { - mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) - if err != nil { - t.Fatal(err) - } - defer mgmtServer.Stop() - - // Create a bootstrap configuration specifying the above management server. - nodeID := uuid.New().String() - cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ - NodeID: nodeID, - ServerURI: mgmtServer.Address, - }) - if err != nil { - t.Fatal(err) - } - defer cleanup() - - const serviceName = "my-service-client-side-xds" - tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) - defer rClose() - - ldsName := serviceName - rdsName := "route-" + serviceName - // Configure the management server with a good listener and route - // configuration resource. - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "test-cluster-1")}, - SkipValidation: true, - } + // Spin up an xDS management server for the test. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + nodeID := uuid.New().String() + mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID) + + // Configure resources on the management server. + listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) + + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) // Read the update pushed by the resolver to the ClientConn. - val, err := tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState := val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` -{ - "loadBalancingConfig": [ - { - "xds_cluster_manager_experimental": { - "children": { - "cluster:test-cluster-1": { - "childPolicy": [ - { - "cds_experimental": { - "cluster": "test-cluster-1" - } - } - ] - } - } - } - } - ] -}`) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) - } + cs := verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig) - cs := iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("Received nil config selector in update from resolver") - } res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) @@ -1032,21 +580,7 @@ func (s) TestResolverRemovedWithRPCs(t *testing.T) { // not produce an empty service config at this point. Instead it will retain // the cluster to which the RPC is ongoing in the service config, but will // return an erroring config selector which will fail new RPCs. - val, err = tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState = val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) - } - cs = iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("Received nil config selector in update from resolver") - } + cs = verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig) _, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err == nil || status.Code(err) != codes.Unavailable { t.Fatalf("cs.SelectConfig() returned: %v, want: %v", err, codes.Unavailable) @@ -1058,100 +592,42 @@ func (s) TestResolverRemovedWithRPCs(t *testing.T) { // Now that the RPC is committed, the xDS resolver is expected to send an // update with an empty service config. - val, err = tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState = val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(`{}`) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) + var state resolver.State + select { + case <-ctx.Done(): + t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err()) + case state = <-stateCh: + if err := state.ServiceConfig.Err; err != nil { + t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err) + } + wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}") + if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) { + t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) + } } } -// TestResolverRemovedResource tests the case where resources returned by the -// management server are removed. The test verifies that the resolver pushes the -// expected config selector and service config in this case. +// Tests the case where resources returned by the management server are removed. +// The test verifies that the resolver pushes the expected config selector and +// service config in this case. func (s) TestResolverRemovedResource(t *testing.T) { - mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) - if err != nil { - t.Fatal(err) - } - defer mgmtServer.Stop() - - // Create a bootstrap configuration specifying the above management server. - nodeID := uuid.New().String() - cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ - NodeID: nodeID, - ServerURI: mgmtServer.Address, - }) - if err != nil { - t.Fatal(err) - } - defer cleanup() - - const serviceName = "my-service-client-side-xds" - tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) - defer rClose() - - // Configure the management server with a good listener and route - // configuration resource. - ldsName := serviceName - rdsName := "route-" + serviceName - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "test-cluster-1")}, - SkipValidation: true, - } + // Spin up an xDS management server for the test. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + nodeID := uuid.New().String() + mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID) + + // Configure resources on the management server. + listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) + + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) // Read the update pushed by the resolver to the ClientConn. - val, err := tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState := val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` -{ - "loadBalancingConfig": [ - { - "xds_cluster_manager_experimental": { - "children": { - "cluster:test-cluster-1": { - "childPolicy": [ - { - "cds_experimental": { - "cluster": "test-cluster-1" - } - } - ] - } - } - } - } - ] -}`) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) - } + cs := verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig) // "Make an RPC" by invoking the config selector. - cs := iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("Received nil config selector in update from resolver") - } - res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) @@ -1169,257 +645,128 @@ func (s) TestResolverRemovedResource(t *testing.T) { // The channel should receive the existing service config with the original // cluster but with an erroring config selector. - val, err = tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState = val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) - } + cs = verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig) // "Make another RPC" by invoking the config selector. - cs = iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("Received nil config selector in update from resolver") - } - res, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err == nil || status.Code(err) != codes.Unavailable { t.Fatalf("cs.SelectConfig() got %v, %v, expected UNAVAILABLE error", res, err) } // In the meantime, an empty ServiceConfig update should have been sent. - val, err = tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState = val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}") - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) + var state resolver.State + select { + case <-ctx.Done(): + t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err()) + case state = <-stateCh: + if err := state.ServiceConfig.Err; err != nil { + t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err) + } + wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}") + if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) { + t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) + } } } -// TestResolverWRR tests the case where the route configuration returned by the -// management server contains a set of weighted clusters. The test performs a -// bunch of RPCs using the cluster specifier returned by the resolver, and -// verifies the cluster distribution. -func (s) TestResolverWRR(t *testing.T) { - defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR) - newWRR = testutils.NewTestWRR - - mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) - if err != nil { - t.Fatal(err) - } - defer mgmtServer.Stop() - - // Create a bootstrap configuration specifying the above management server. - nodeID := uuid.New().String() - cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ - NodeID: nodeID, - ServerURI: mgmtServer.Address, - }) - if err != nil { - t.Fatal(err) - } - defer cleanup() - - const serviceName = "my-service-client-side-xds" - tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) - defer rClose() - - ldsName := serviceName - rdsName := "route-" + serviceName - // Configure the management server with a good listener resource and a - // route configuration resource. - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{ - RouteConfigName: rdsName, - ListenerName: ldsName, - ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeWeightedCluster, - WeightedClusters: map[string]int{"A": 75, "B": 25}, - })}, - SkipValidation: true, - } +// Tests the case where the resolver receives max stream duration as part of the +// listener and route configuration resources. The test verifies that the RPC +// timeout returned by the config selector matches expectations. A non-nil max +// stream duration (this includes an explicit zero value) in a matching route +// overrides the value specified in the listener resource. +func (s) TestResolverMaxStreamDuration(t *testing.T) { + // Spin up an xDS management server for the test. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - // Read the update pushed by the resolver to the ClientConn. - gotState, err := tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState := gotState.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - cs := iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("Received nil config selector in update from resolver") - } - - // Make RPCs are verify WRR behavior in the cluster specifier. - picks := map[string]int{} - for i := 0; i < 100; i++ { - res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) - if err != nil { - t.Fatalf("cs.SelectConfig(): %v", err) - } - picks[clustermanager.GetPickedClusterForTesting(res.Context)]++ - res.OnCommitted() - } - want := map[string]int{"cluster:A": 75, "cluster:B": 25} - if !cmp.Equal(picks, want) { - t.Errorf("Picked clusters: %v; want: %v", picks, want) - } -} - -// TestResolverMaxStreamDuration tests the case where the resolver receives max -// stream duration as part of the listener and route configuration resources. -// The test verifies that the RPC timeout returned by the config selector -// matches expectations. A non-nil max stream duration (this includes an -// explicit zero value) in a matching route overrides the value specified in the -// listener resource. -func (s) TestResolverMaxStreamDuration(t *testing.T) { - mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) - if err != nil { - t.Fatal(err) - } - defer mgmtServer.Stop() - - // Create a bootstrap configuration specifying the above management server. nodeID := uuid.New().String() - cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ - NodeID: nodeID, - ServerURI: mgmtServer.Address, - }) - if err != nil { - t.Fatal(err) - } - defer cleanup() + mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID) - const serviceName = "my-service-client-side-xds" - tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) - defer rClose() + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) // Configure the management server with a listener resource that specifies a // max stream duration as part of its HTTP connection manager. Also // configure a route configuration resource, which has multiple routes with // different values of max stream duration. - ldsName := serviceName - rdsName := "route-" + serviceName hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{ ConfigSource: &v3corepb.ConfigSource{ ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, }, - RouteConfigName: rdsName, + RouteConfigName: defaultTestRouteConfigName, }}, HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter}, CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{ MaxStreamDuration: durationpb.New(1 * time.Second), }, }) - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{{ - Name: ldsName, - ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, - FilterChains: []*v3listenerpb.FilterChain{{ - Name: "filter-chain-name", - Filters: []*v3listenerpb.Filter{{ - Name: wellknown.HTTPConnectionManager, - ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, - }}, + listeners := []*v3listenerpb.Listener{{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, + FilterChains: []*v3listenerpb.FilterChain{{ + Name: "filter-chain-name", + Filters: []*v3listenerpb.Filter{{ + Name: wellknown.HTTPConnectionManager, + ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, }}, }}, - Routes: []*v3routepb.RouteConfiguration{{ - Name: rdsName, - VirtualHosts: []*v3routepb.VirtualHost{{ - Domains: []string{ldsName}, - Routes: []*v3routepb.Route{ - { - Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/foo"}}, - Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - { - Name: "A", - Weight: &wrapperspb.UInt32Value{Value: 100}, - }, - }}, - }, - MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{ - MaxStreamDuration: durationpb.New(5 * time.Second), - }, - }}, - }, - { - Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/bar"}}, - Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - { - Name: "B", - Weight: &wrapperspb.UInt32Value{Value: 100}, - }, - }}, - }, - MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{ - MaxStreamDuration: durationpb.New(0 * time.Second), - }, - }}, - }, - { - Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, - Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ - Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ - { - Name: "C", - Weight: &wrapperspb.UInt32Value{Value: 100}, - }, - }}, - }, - }}, - }, + }} + routes := []*v3routepb.RouteConfiguration{{ + Name: defaultTestRouteConfigName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{defaultTestServiceName}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/foo"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + { + Name: "A", + Weight: &wrapperspb.UInt32Value{Value: 100}, + }, + }}, + }, + MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{ + MaxStreamDuration: durationpb.New(5 * time.Second), + }, + }}, }, - }}, + { + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/bar"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + { + Name: "B", + Weight: &wrapperspb.UInt32Value{Value: 100}, + }, + }}, + }, + MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{ + MaxStreamDuration: durationpb.New(0 * time.Second), + }, + }}, + }, + { + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + { + Name: "C", + Weight: &wrapperspb.UInt32Value{Value: 100}, + }, + }}, + }, + }}, + }, + }, }}, - SkipValidation: true, - } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + }} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) // Read the update pushed by the resolver to the ClientConn. - gotState, err := tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState := gotState.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - cs := iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("Received nil config selector in update from resolver") - } + cs := verifyUpdateFromResolver(ctx, t, stateCh, "") testCases := []struct { name string @@ -1459,90 +806,32 @@ func (s) TestResolverMaxStreamDuration(t *testing.T) { } } -// TestResolverDelayedOnCommitted tests that clusters remain in service -// config if RPCs are in flight. +// Tests that clusters remain in service config if RPCs are in flight. func (s) TestResolverDelayedOnCommitted(t *testing.T) { - mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) - if err != nil { - t.Fatal(err) - } - defer mgmtServer.Stop() - - // Create a bootstrap configuration specifying the above management server. - nodeID := uuid.New().String() - cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ - NodeID: nodeID, - ServerURI: mgmtServer.Address, - }) - if err != nil { - t.Fatal(err) - } - defer cleanup() - - const serviceName = "my-service-client-side-xds" - tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) - defer rClose() - - // Configure the management server with a good listener and route - // configuration resource. - ldsName := serviceName - rdsName := "route-" + serviceName - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "old-cluster")}, - SkipValidation: true, - } + // Spin up an xDS management server for the test. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + nodeID := uuid.New().String() + mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID) + + // Configure resources on the management server. + listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) + + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) // Read the update pushed by the resolver to the ClientConn. - val, err := tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState := val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` -{ - "loadBalancingConfig": [ - { - "xds_cluster_manager_experimental": { - "children": { - "cluster:old-cluster": { - "childPolicy": [ - { - "cds_experimental": { - "cluster": "old-cluster" - } - } - ] - } - } - } - } - ] -}`) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) - } + cs := verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig) // Make an RPC, but do not commit it yet. - cs := iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("Received nil config selector in update from resolver") - } resOld, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) } - if cluster := clustermanager.GetPickedClusterForTesting(resOld.Context); cluster != "cluster:old-cluster" { - t.Fatalf("Picked cluster is %q, want %q", cluster, "cluster:old-cluster") + wantClusterName := fmt.Sprintf("cluster:%s", defaultTestClusterName) + if cluster := clustermanager.GetPickedClusterForTesting(resOld.Context); cluster != wantClusterName { + t.Fatalf("Picked cluster is %q, want %q", cluster, wantClusterName) } // Delay resOld.OnCommitted(). As long as there are pending RPCs to removed @@ -1550,70 +839,51 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { // Update the route configuration resource on the management server to // return a new cluster. - resources = e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, - Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(rdsName, ldsName, "new-cluster")}, - SkipValidation: true, - } - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + newClusterName := "new-" + defaultTestClusterName + routes = []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName)} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) // Read the update pushed by the resolver to the ClientConn and ensure the // old cluster is present in the service config. Also ensure that the newly // returned config selector does not hold a reference to the old cluster. - val, err = tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState = val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` + wantSC := fmt.Sprintf(` { "loadBalancingConfig": [ { "xds_cluster_manager_experimental": { "children": { - "cluster:old-cluster": { + "cluster:%s": { "childPolicy": [ { "cds_experimental": { - "cluster": "old-cluster" - } - } - ] - }, - "cluster:new-cluster": { + "cluster": "%s" + } + } + ] + }, + "cluster:%s": { "childPolicy": [ { "cds_experimental": { - "cluster": "new-cluster" - } - } - ] - } - } - } - } - ] -}`) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s\nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) - } + "cluster": "%s" + } + } + ] + } + } + } + } + ] +}`, defaultTestClusterName, defaultTestClusterName, newClusterName, newClusterName) + cs = verifyUpdateFromResolver(ctx, t, stateCh, wantSC) - cs = iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("Received nil config selector in update from resolver") - } resNew, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) } - if cluster := clustermanager.GetPickedClusterForTesting(resNew.Context); cluster != "cluster:new-cluster" { - t.Fatalf("Picked cluster is %q, want %q", cluster, "cluster:new-cluster") + wantClusterName = fmt.Sprintf("cluster:%s", newClusterName) + if cluster := clustermanager.GetPickedClusterForTesting(resNew.Context); cluster != wantClusterName { + t.Fatalf("Picked cluster is %q, want %q", cluster, wantClusterName) } // Invoke OnCommitted on the old RPC; should lead to a service config update @@ -1621,319 +891,510 @@ func (s) TestResolverDelayedOnCommitted(t *testing.T) { // pending RPCs. resOld.OnCommitted() - val, err = tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState = val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` + wantSC = fmt.Sprintf(` { "loadBalancingConfig": [ { "xds_cluster_manager_experimental": { "children": { - "cluster:new-cluster": { + "cluster:%s": { "childPolicy": [ { "cds_experimental": { - "cluster": "new-cluster" - } - } - ] - } - } - } - } - ] -}`) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) - } + "cluster": "%s" + } + } + ] + } + } + } + } + ] +}`, newClusterName, newClusterName) + verifyUpdateFromResolver(ctx, t, stateCh, wantSC) } -// TestResolverMultipleLDSUpdates tests the case where two LDS updates with the -// same RDS name to watch are received without an RDS in between. Those LDS -// updates shouldn't trigger a service config update. +// Tests the case where two LDS updates with the same RDS name to watch are +// received without an RDS in between. Those LDS updates shouldn't trigger a +// service config update. func (s) TestResolverMultipleLDSUpdates(t *testing.T) { - mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) - if err != nil { - t.Fatal(err) - } - defer mgmtServer.Stop() - - // Create a bootstrap configuration specifying the above management server. + // Spin up an xDS management server for the test. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() nodeID := uuid.New().String() - cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{ - NodeID: nodeID, - ServerURI: mgmtServer.Address, - }) - if err != nil { - t.Fatal(err) - } - defer cleanup() - - // Build an xDS resolver that uses the above bootstrap configuration - // Creating the xDS resolver should result in creation of the xDS client. - const serviceName = "my-service-client-side-xds" - tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) - defer rClose() + mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID) // Configure the management server with a listener resource, but no route // configuration resource. - ldsName := serviceName - rdsName := "route-" + serviceName - resources := e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)}, - SkipValidation: true, - } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, nil) + + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) // Ensure there is no update from the resolver. - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - gotState, err := tcc.stateCh.Receive(sCtx) - if err == nil { - t.Fatalf("Received update from resolver %v when none expected", gotState) - } + verifyNoUpdateFromResolver(ctx, t, stateCh) // Configure the management server with a listener resource that points to - // the same route configuration resource but has different values for some - // other fields. There is still no route configuration resource on the - // management server. + // the same route configuration resource but has different values for max + // stream duration field. There is still no route configuration resource on + // the management server. hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{ ConfigSource: &v3corepb.ConfigSource{ ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, }, - RouteConfigName: rdsName, + RouteConfigName: defaultTestRouteConfigName, }}, HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter}, CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{ MaxStreamDuration: durationpb.New(1 * time.Second), }, }) - resources = e2e.UpdateOptions{ - NodeID: nodeID, - Listeners: []*v3listenerpb.Listener{{ - Name: ldsName, - ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, - FilterChains: []*v3listenerpb.FilterChain{{ - Name: "filter-chain-name", - Filters: []*v3listenerpb.Filter{{ - Name: wellknown.HTTPConnectionManager, - ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, - }}, + listeners = []*v3listenerpb.Listener{{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, + FilterChains: []*v3listenerpb.FilterChain{{ + Name: "filter-chain-name", + Filters: []*v3listenerpb.Filter{{ + Name: wellknown.HTTPConnectionManager, + ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, }}, }}, - SkipValidation: true, - } - if err := mgmtServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } + }} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, nil) // Ensure that there is no update from the resolver. - sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - gotState, err = tcc.stateCh.Receive(sCtx) - if err == nil { - t.Fatalf("Received update from resolver %v when none expected", gotState) + verifyNoUpdateFromResolver(ctx, t, stateCh) +} + +// TestResolverWRR tests the case where the route configuration returned by the +// management server contains a set of weighted clusters. The test performs a +// bunch of RPCs using the cluster specifier returned by the resolver, and +// verifies the cluster distribution. +func (s) TestResolverWRR(t *testing.T) { + origNewWRR := rinternal.NewWRR + rinternal.NewWRR = testutils.NewTestWRR + defer func() { rinternal.NewWRR = origNewWRR }() + + // Spin up an xDS management server for the test. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + nodeID := uuid.New().String() + mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID) + + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) + + // Configure resources on the management server. + listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{ + RouteConfigName: defaultTestRouteConfigName, + ListenerName: defaultTestServiceName, + ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeWeightedCluster, + WeightedClusters: map[string]int{"A": 75, "B": 25}, + })} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) + + // Read the update pushed by the resolver to the ClientConn. + cs := verifyUpdateFromResolver(ctx, t, stateCh, "") + + // Make RPCs are verify WRR behavior in the cluster specifier. + picks := map[string]int{} + for i := 0; i < 100; i++ { + res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) + if err != nil { + t.Fatalf("cs.SelectConfig(): %v", err) + } + picks[clustermanager.GetPickedClusterForTesting(res.Context)]++ + res.OnCommitted() } + want := map[string]int{"cluster:A": 75, "cluster:B": 25} + if !cmp.Equal(picks, want) { + t.Errorf("Picked clusters: %v; want: %v", picks, want) + } +} + +const filterCfgPathFieldName = "path" +const filterCfgErrorFieldName = "new_stream_error" + +type filterCfg struct { + httpfilter.FilterConfig + path string + newStreamErr error } type filterBuilder struct { - httpfilter.Filter // embedded as we do not need to implement registry / parsing in this test. - path *[]string + paths []string + typeURL string } +func (fb *filterBuilder) TypeURLs() []string { return []string{fb.typeURL} } + +func filterConfigFromProto(cfg proto.Message) (httpfilter.FilterConfig, error) { + ts, ok := cfg.(*v3xdsxdstypepb.TypedStruct) + if !ok { + return nil, fmt.Errorf("unsupported filter config type: %T, want %T", cfg, &v3xdsxdstypepb.TypedStruct{}) + } + + if ts.GetValue() == nil { + return filterCfg{}, nil + } + ret := filterCfg{} + if v := ts.GetValue().GetFields()[filterCfgPathFieldName]; v != nil { + ret.path = v.GetStringValue() + } + if v := ts.GetValue().GetFields()[filterCfgErrorFieldName]; v != nil { + if v.GetStringValue() == "" { + ret.newStreamErr = nil + } else { + ret.newStreamErr = fmt.Errorf("%s", v.GetStringValue()) + } + } + return ret, nil +} + +func (*filterBuilder) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { + return filterConfigFromProto(cfg) +} + +func (*filterBuilder) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) { + return filterConfigFromProto(override) +} + +func (*filterBuilder) IsTerminal() bool { return false } + var _ httpfilter.ClientInterceptorBuilder = &filterBuilder{} func (fb *filterBuilder) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { if config == nil { panic("unexpected missing config") } - *fb.path = append(*fb.path, "build:"+config.(filterCfg).s) + + fi := &filterInterceptor{ + parent: fb, + pathCh: make(chan string, 10), + } + + fb.paths = append(fb.paths, "build:"+config.(filterCfg).path) err := config.(filterCfg).newStreamErr if override != nil { - *fb.path = append(*fb.path, "override:"+override.(filterCfg).s) + fb.paths = append(fb.paths, "override:"+override.(filterCfg).path) err = override.(filterCfg).newStreamErr } - return &filterInterceptor{path: fb.path, s: config.(filterCfg).s, err: err}, nil + fi.cfgPath = config.(filterCfg).path + fi.err = err + return fi, nil } type filterInterceptor struct { - path *[]string - s string - err error + parent *filterBuilder + pathCh chan string + cfgPath string + err error } func (fi *filterInterceptor) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) { - *fi.path = append(*fi.path, "newstream:"+fi.s) + fi.parent.paths = append(fi.parent.paths, "newstream:"+fi.cfgPath) if fi.err != nil { return nil, fi.err } d := func() { - *fi.path = append(*fi.path, "done:"+fi.s) + fi.parent.paths = append(fi.parent.paths, "done:"+fi.cfgPath) done() } cs, err := newStream(ctx, d) if err != nil { return nil, err } - return &clientStream{ClientStream: cs, path: fi.path, s: fi.s}, nil + return &clientStream{ClientStream: cs}, nil } type clientStream struct { iresolver.ClientStream - path *[]string - s string } -type filterCfg struct { - httpfilter.FilterConfig - s string - newStreamErr error -} +func (s) TestConfigSelector_FailureCases(t *testing.T) { + const methodName = "1" -func (s) TestXDSResolverHTTPFilters(t *testing.T) { - var path []string - testCases := []struct { - name string - ldsFilters []xdsresource.HTTPFilter - rtCfgUpdate xdsresource.RouteConfigUpdate - rpcRes map[string][][]string - selectErr string - newStreamErr string + tests := []struct { + name string + listener *v3listenerpb.Listener + wantErr string }{ - { name: "route type RouteActionUnsupported invalid for client", - ldsFilters: []xdsresource.HTTPFilter{ - {Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1"}}, - }, - rtCfgUpdate: xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("1"), - WeightedClusters: map[string]xdsresource.WeightedCluster{ - "A": {Weight: 1}, - "B": {Weight: 1}, - }, - ActionType: xdsresource.RouteActionUnsupported, - }}, - }, + listener: &v3listenerpb.Listener{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ + ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: &v3routepb.RouteConfiguration{ + Name: defaultTestRouteConfigName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{defaultTestServiceName}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName}, + }, + Action: &v3routepb.Route_FilterAction{}, + }}, + }}, + }}, + HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter}, + }), }, }, - rpcRes: map[string][][]string{ - "1": { - {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - }, - }, - selectErr: errUnsupportedClientRouteAction.Error(), + wantErr: "matched route does not have a supported route action type", }, { name: "route type RouteActionNonForwardingAction invalid for client", - ldsFilters: []xdsresource.HTTPFilter{ - {Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1"}}, - }, - rtCfgUpdate: xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("1"), - WeightedClusters: map[string]xdsresource.WeightedCluster{ - "A": {Weight: 1}, - "B": {Weight: 1}, - }, - ActionType: xdsresource.RouteActionNonForwardingAction, - }}, - }, + listener: &v3listenerpb.Listener{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ + ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: &v3routepb.RouteConfiguration{ + Name: defaultTestRouteConfigName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{defaultTestServiceName}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName}, + }, + Action: &v3routepb.Route_NonForwardingAction{}, + }}, + }}, + }}, + HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter}, + }), }, }, - rpcRes: map[string][][]string{ - "1": { - {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, - }, - }, - selectErr: errUnsupportedClientRouteAction.Error(), + wantErr: "matched route does not have a supported route action type", }, - { - name: "NewStream error; ensure earlier interceptor Done is still called", - ldsFilters: []xdsresource.HTTPFilter{ - {Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1"}}, - {Name: "bar", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "bar1", newStreamErr: errors.New("bar newstream err")}}, - makeRouterFilter(t), - }, - rtCfgUpdate: xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("1"), - WeightedClusters: map[string]xdsresource.WeightedCluster{ - "A": {Weight: 1}, - "B": {Weight: 1}, - }, - ActionType: xdsresource.RouteActionRoute, - }}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // Spin up an xDS management server. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + nodeID := uuid.New().String() + mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID) + + // Build an xDS resolver. + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) + + // Update the management server with a listener resource that + // contains inline route configuration. + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{test.listener}, nil) + + // Ensure that the resolver pushes a state update to the channel. + cs := verifyUpdateFromResolver(ctx, t, stateCh, "") + + // Ensure that it returns the expected error. + _, err := cs.SelectConfig(iresolver.RPCInfo{Method: methodName, Context: ctx}) + if err == nil || !strings.Contains(err.Error(), test.wantErr) { + t.Errorf("SelectConfig(_) = _, %v; want _, Contains(%v)", err, test.wantErr) + } + }) + } +} + +func newHTTPFilter(t *testing.T, name, typeURL, path, err string) *v3httppb.HttpFilter { + return &v3httppb.HttpFilter{ + Name: name, + ConfigType: &v3httppb.HttpFilter_TypedConfig{ + TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: typeURL, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: path}}, + filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: err}}, }, }, + }), + }, + } +} + +func (s) TestXDSResolverHTTPFilters(t *testing.T) { + const methodName1 = "1" + const methodName2 = "2" + testFilterName := t.Name() + + testCases := []struct { + name string + listener *v3listenerpb.Listener + rpcRes map[string][][]string + wantStreamErr string + }{ + { + name: "NewStream error - ensure earlier interceptor Done is still called", + listener: &v3listenerpb.Listener{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ + ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: &v3routepb.RouteConfiguration{ + Name: defaultTestRouteConfigName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{defaultTestServiceName}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1}, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ + WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + {Name: "A", Weight: wrapperspb.UInt32(1)}, + {Name: "B", Weight: wrapperspb.UInt32(1)}, + }, + }, + }, + }, + }, + }}, + }}, + }}, + HttpFilters: []*v3httppb.HttpFilter{ + newHTTPFilter(t, "foo", testFilterName, "foo1", ""), + newHTTPFilter(t, "bar", testFilterName, "bar1", "bar newstream err"), + e2e.RouterHTTPFilter, + }, + }), + }, }, rpcRes: map[string][][]string{ - "1": { - {"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1" /* */, "done:foo1"}, + methodName1: { + {"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1", "done:foo1"}, // err in bar1 NewStream() }, }, - newStreamErr: "bar newstream err", + wantStreamErr: "bar newstream err", }, { name: "all overrides", - ldsFilters: []xdsresource.HTTPFilter{ - {Name: "foo", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "foo1", newStreamErr: errors.New("this is overridden to nil")}}, - {Name: "bar", Filter: &filterBuilder{path: &path}, Config: filterCfg{s: "bar1"}}, - makeRouterFilter(t), - }, - rtCfgUpdate: xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{ - Prefix: newStringP("1"), - WeightedClusters: map[string]xdsresource.WeightedCluster{ - "A": {Weight: 1}, - "B": {Weight: 1}, - }, - ActionType: xdsresource.RouteActionRoute, - }, { - Prefix: newStringP("2"), - WeightedClusters: map[string]xdsresource.WeightedCluster{ - "A": {Weight: 1}, - "B": {Weight: 1, HTTPFilterConfigOverride: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo4"}, "bar": filterCfg{s: "bar4"}}}, - }, - HTTPFilterConfigOverride: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo3"}, "bar": filterCfg{s: "bar3"}}, - ActionType: xdsresource.RouteActionRoute, - }}, - HTTPFilterConfigOverride: map[string]httpfilter.FilterConfig{"foo": filterCfg{s: "foo2"}, "bar": filterCfg{s: "bar2"}}, - }, + listener: &v3listenerpb.Listener{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ + ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{ + RouteConfig: &v3routepb.RouteConfiguration{ + Name: defaultTestRouteConfigName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{defaultTestServiceName}, + Routes: []*v3routepb.Route{ + { + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1}, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ + WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + {Name: "A", Weight: wrapperspb.UInt32(1)}, + {Name: "B", Weight: wrapperspb.UInt32(1)}, + }, + }, + }, + }, + }, + }, + { + Match: &v3routepb.RouteMatch{ + PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName2}, + }, + Action: &v3routepb.Route_Route{ + Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{ + WeightedClusters: &v3routepb.WeightedCluster{ + Clusters: []*v3routepb.WeightedCluster_ClusterWeight{ + {Name: "A", Weight: wrapperspb.UInt32(1)}, + { + Name: "B", + Weight: wrapperspb.UInt32(1), + TypedPerFilterConfig: map[string]*anypb.Any{ + "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo4"}}, + }, + }, + }), + "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar4"}}, + }, + }, + }), + }, + }, + }, + }, + }, + }, + }, + TypedPerFilterConfig: map[string]*anypb.Any{ + "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo3"}}, + filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}}, + }, + }, + }), + "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar3"}}, + }, + }, + }), + }, + }, + }, + TypedPerFilterConfig: map[string]*anypb.Any{ + "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo2"}}, + filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}}, + }, + }, + }), + "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{ + TypeUrl: testFilterName, + Value: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar2"}}, + }, + }, + }), + }, + }}, + }}, + HttpFilters: []*v3httppb.HttpFilter{ + newHTTPFilter(t, "foo", testFilterName, "foo1", "this is overridden to nil"), + newHTTPFilter(t, "bar", testFilterName, "bar1", ""), + e2e.RouterHTTPFilter, + }, + }), }, }, rpcRes: map[string][][]string{ - "1": { + methodName1: { {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, }, - "2": { + methodName2: { {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"}, @@ -1943,73 +1404,50 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) { }, } - for i, tc := range testCases { + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) - defer xdsR.Close() - defer cancel() + origNewWRR := rinternal.NewWRR + rinternal.NewWRR = testutils.NewTestWRR + defer func() { rinternal.NewWRR = origNewWRR }() + // Register a custom httpFilter builder for the test. + fb := &filterBuilder{typeURL: testFilterName} + httpfilter.Register(fb) + + // Spin up an xDS management server. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - waitForWatchListener(ctx, t, xdsC, targetStr) + nodeID := uuid.New().String() + mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{ - RouteConfigName: routeStr, - HTTPFilters: tc.ldsFilters, - }, nil) - if i == 0 { - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - } + // Build an xDS resolver. + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}) - defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR) - newWRR = testutils.NewTestWRR + // Update the management server with a listener resource that + // contains an inline route configuration. + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{tc.listener}, nil) - // Invoke the watchAPI callback with a good service update and wait for the - // UpdateState method to be called on the ClientConn. - xdsC.InvokeWatchRouteConfigCallback("", tc.rtCfgUpdate, nil) - - gotState, err := tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for UpdateState to be called: %v", err) - } - rState := gotState.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) - } - - cs := iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("received nil config selector") - } + // Ensure that the resolver pushes a state update to the channel. + cs := verifyUpdateFromResolver(ctx, t, stateCh, "") for method, wants := range tc.rpcRes { // Order of wants is non-deterministic. remainingWant := make([][]string, len(wants)) copy(remainingWant, wants) for n := range wants { - path = nil - - res, err := cs.SelectConfig(iresolver.RPCInfo{Method: method, Context: context.Background()}) - if tc.selectErr != "" { - if err == nil || !strings.Contains(err.Error(), tc.selectErr) { - t.Errorf("SelectConfig(_) = _, %v; want _, Contains(%v)", err, tc.selectErr) - } - if err == nil { - res.OnCommitted() - } - continue - } + res, err := cs.SelectConfig(iresolver.RPCInfo{Method: method, Context: ctx}) if err != nil { t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) } + var doneFunc func() _, err = res.Interceptor.NewStream(context.Background(), iresolver.RPCInfo{}, func() {}, func(ctx context.Context, done func()) (iresolver.ClientStream, error) { doneFunc = done return nil, nil }) - if tc.newStreamErr != "" { - if err == nil || !strings.Contains(err.Error(), tc.newStreamErr) { - t.Errorf("NewStream(...) = _, %v; want _, Contains(%v)", err, tc.newStreamErr) + if tc.wantStreamErr != "" { + if err == nil || !strings.Contains(err.Error(), tc.wantStreamErr) { + t.Errorf("NewStream(...) = _, %v; want _, Contains(%v)", err, tc.wantStreamErr) } if err == nil { res.OnCommitted() @@ -2024,10 +1462,13 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) { res.OnCommitted() doneFunc() + gotPaths := fb.paths + fb.paths = []string{} + // Confirm the desired path is found in remainingWant, and remove it. pass := false for i := range remainingWant { - if reflect.DeepEqual(path, remainingWant[i]) { + if cmp.Equal(gotPaths, remainingWant[i]) { remainingWant[i] = remainingWant[len(remainingWant)-1] remainingWant = remainingWant[:len(remainingWant)-1] pass = true @@ -2035,7 +1476,7 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) { } } if !pass { - t.Errorf("%q:%v - path:\n%v\nwant one of:\n%v", method, n, path, remainingWant) + t.Errorf("%q:%v - path:\n%v\nwant one of:\n%v", method, n, gotPaths, remainingWant) } } } @@ -2046,7 +1487,3 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) { func newDurationP(d time.Duration) *time.Duration { return &d } - -func newStringP(s string) *string { - return &s -}