mirror of https://github.com/grpc/grpc-go.git
				
				
				
			
		
			
				
	
	
		
			1452 lines
		
	
	
		
			55 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			1452 lines
		
	
	
		
			55 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
|  *
 | |
|  * Copyright 2019 gRPC authors.
 | |
|  *
 | |
|  * Licensed under the Apache License, Version 2.0 (the "License");
 | |
|  * you may not use this file except in compliance with the License.
 | |
|  * You may obtain a copy of the License at
 | |
|  *
 | |
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | |
|  *
 | |
|  * Unless required by applicable law or agreed to in writing, software
 | |
|  * distributed under the License is distributed on an "AS IS" BASIS,
 | |
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|  * See the License for the specific language governing permissions and
 | |
|  * limitations under the License.
 | |
|  *
 | |
|  */
 | |
| 
 | |
| package resolver_test
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"strings"
 | |
| 	"testing"
 | |
| 	"time"
 | |
| 
 | |
| 	xxhash "github.com/cespare/xxhash/v2"
 | |
| 	"github.com/envoyproxy/go-control-plane/pkg/wellknown"
 | |
| 	"github.com/google/go-cmp/cmp"
 | |
| 	"github.com/google/uuid"
 | |
| 	"google.golang.org/grpc/codes"
 | |
| 	"google.golang.org/grpc/internal"
 | |
| 	"google.golang.org/grpc/internal/grpcsync"
 | |
| 	iresolver "google.golang.org/grpc/internal/resolver"
 | |
| 	"google.golang.org/grpc/internal/testutils"
 | |
| 	xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap"
 | |
| 	"google.golang.org/grpc/internal/testutils/xds/e2e"
 | |
| 	"google.golang.org/grpc/internal/xds/bootstrap"
 | |
| 	"google.golang.org/grpc/metadata"
 | |
| 	"google.golang.org/grpc/resolver"
 | |
| 	"google.golang.org/grpc/serviceconfig"
 | |
| 	"google.golang.org/grpc/status"
 | |
| 	"google.golang.org/grpc/xds/internal/balancer/clustermanager"
 | |
| 	"google.golang.org/grpc/xds/internal/balancer/ringhash"
 | |
| 	"google.golang.org/grpc/xds/internal/httpfilter"
 | |
| 	xdsresolver "google.golang.org/grpc/xds/internal/resolver"
 | |
| 	rinternal "google.golang.org/grpc/xds/internal/resolver/internal"
 | |
| 	xdstestutils "google.golang.org/grpc/xds/internal/testutils"
 | |
| 	"google.golang.org/grpc/xds/internal/xdsclient"
 | |
| 	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
 | |
| 	"google.golang.org/protobuf/proto"
 | |
| 	"google.golang.org/protobuf/types/known/anypb"
 | |
| 	"google.golang.org/protobuf/types/known/durationpb"
 | |
| 	"google.golang.org/protobuf/types/known/structpb"
 | |
| 	"google.golang.org/protobuf/types/known/wrapperspb"
 | |
| 
 | |
| 	v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
 | |
| 	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 | |
| 	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
 | |
| 	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
 | |
| 	v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
 | |
| 	v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
 | |
| 	v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
 | |
| 
 | |
| 	_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the cds LB policy
 | |
| 	_ "google.golang.org/grpc/xds/internal/httpfilter/router"    // Register the router filter
 | |
| )
 | |
| 
 | |
| // Tests the case where xDS client creation is expected to fail because the
 | |
| // bootstrap configuration is not specified. The test verifies that xDS resolver
 | |
| // build fails as well.
 | |
| func (s) TestResolverBuilder_ClientCreationFails_NoBootstrap(t *testing.T) {
 | |
| 	// Build an xDS resolver without specifying bootstrap env vars.
 | |
| 	builder := resolver.Get(xdsresolver.Scheme)
 | |
| 	if builder == nil {
 | |
| 		t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme)
 | |
| 	}
 | |
| 
 | |
| 	target := resolver.Target{URL: *testutils.MustParseURL("xds:///target")}
 | |
| 	if _, err := builder.Build(target, nil, resolver.BuildOptions{}); err == nil {
 | |
| 		t.Fatalf("xds Resolver Build(%v) succeeded when expected to fail, because there is not bootstrap configuration for the xDS client", target)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Tests the case where the specified dial target contains an authority that is
 | |
| // not specified in the bootstrap file. Verifies that the resolver.Build method
 | |
| // fails with the expected error string.
 | |
| func (s) TestResolverBuilder_AuthorityNotDefinedInBootstrap(t *testing.T) {
 | |
| 	bootstrapCleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
 | |
| 		NodeID:    "node-id",
 | |
| 		ServerURI: "dummy-management-server",
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	defer bootstrapCleanup()
 | |
| 
 | |
| 	builder := resolver.Get(xdsresolver.Scheme)
 | |
| 	if builder == nil {
 | |
| 		t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme)
 | |
| 	}
 | |
| 
 | |
| 	target := resolver.Target{URL: *testutils.MustParseURL("xds://non-existing-authority/target")}
 | |
| 	const wantErr = `authority "non-existing-authority" specified in dial target "xds://non-existing-authority/target" is not found in the bootstrap file`
 | |
| 
 | |
| 	r, err := builder.Build(target, &testutils.ResolverClientConn{Logger: t}, resolver.BuildOptions{})
 | |
| 	if r != nil {
 | |
| 		r.Close()
 | |
| 	}
 | |
| 	if err == nil {
 | |
| 		t.Fatalf("xds Resolver Build(%v) succeeded for target with authority not specified in bootstrap", target)
 | |
| 	}
 | |
| 	if !strings.Contains(err.Error(), wantErr) {
 | |
| 		t.Fatalf("xds Resolver Build(%v) returned err: %v, wantErr: %v", target, err, wantErr)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Test builds an xDS resolver and verifies that the resource name specified in
 | |
| // the discovery request matches expectations.
 | |
| func (s) TestResolverResourceName(t *testing.T) {
 | |
| 	tests := []struct {
 | |
| 		name                         string
 | |
| 		listenerResourceNameTemplate string
 | |
| 		extraAuthority               string
 | |
| 		dialTarget                   string
 | |
| 		wantResourceNames            []string
 | |
| 	}{
 | |
| 		{
 | |
| 			name:                         "default %s old style",
 | |
| 			listenerResourceNameTemplate: "%s",
 | |
| 			dialTarget:                   "xds:///target",
 | |
| 			wantResourceNames:            []string{"target"},
 | |
| 		},
 | |
| 		{
 | |
| 			name:                         "old style no percent encoding",
 | |
| 			listenerResourceNameTemplate: "/path/to/%s",
 | |
| 			dialTarget:                   "xds:///target",
 | |
| 			wantResourceNames:            []string{"/path/to/target"},
 | |
| 		},
 | |
| 		{
 | |
| 			name:                         "new style with %s",
 | |
| 			listenerResourceNameTemplate: "xdstp://authority.com/%s",
 | |
| 			dialTarget:                   "xds:///0.0.0.0:8080",
 | |
| 			wantResourceNames:            []string{"xdstp://authority.com/0.0.0.0:8080"},
 | |
| 		},
 | |
| 		{
 | |
| 			name:                         "new style percent encoding",
 | |
| 			listenerResourceNameTemplate: "xdstp://authority.com/%s",
 | |
| 			dialTarget:                   "xds:///[::1]:8080",
 | |
| 			wantResourceNames:            []string{"xdstp://authority.com/%5B::1%5D:8080"},
 | |
| 		},
 | |
| 		{
 | |
| 			name:                         "new style different authority",
 | |
| 			listenerResourceNameTemplate: "xdstp://authority.com/%s",
 | |
| 			extraAuthority:               "test-authority",
 | |
| 			dialTarget:                   "xds://test-authority/target",
 | |
| 			wantResourceNames:            []string{"xdstp://test-authority/envoy.config.listener.v3.Listener/target"},
 | |
| 		},
 | |
| 	}
 | |
| 	for _, tt := range tests {
 | |
| 		t.Run(tt.name, func(t *testing.T) {
 | |
| 			// Spin up an xDS management server for the test.
 | |
| 			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 			defer cancel()
 | |
| 			nodeID := uuid.New().String()
 | |
| 			mgmtServer, lisCh, _ := setupManagementServerForTest(ctx, t, nodeID)
 | |
| 
 | |
| 			// Create a bootstrap configuration with test options.
 | |
| 			opts := xdsbootstrap.Options{
 | |
| 				ServerURI: mgmtServer.Address,
 | |
| 				ClientDefaultListenerResourceNameTemplate: tt.listenerResourceNameTemplate,
 | |
| 			}
 | |
| 			if tt.extraAuthority != "" {
 | |
| 				// In this test, we really don't care about having multiple
 | |
| 				// management servers. All we need to verify is whether the
 | |
| 				// resource name matches expectation.
 | |
| 				opts.Authorities = map[string]string{
 | |
| 					tt.extraAuthority: mgmtServer.Address,
 | |
| 				}
 | |
| 			}
 | |
| 			cleanup, err := xdsbootstrap.CreateFile(opts)
 | |
| 			if err != nil {
 | |
| 				t.Fatal(err)
 | |
| 			}
 | |
| 			defer cleanup()
 | |
| 
 | |
| 			buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL(tt.dialTarget)})
 | |
| 			waitForResourceNames(ctx, t, lisCh, tt.wantResourceNames)
 | |
| 		})
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Tests the case where a service update from the underlying xDS client is
 | |
| // received after the resolver is closed, and verifies that the update is not
 | |
| // propagated to the ClientConn.
 | |
| func (s) TestResolverWatchCallbackAfterClose(t *testing.T) {
 | |
| 	// Setup the management server that synchronizes with the test goroutine
 | |
| 	// using two channels. The management server signals the test goroutine when
 | |
| 	// it receives a discovery request for a route configuration resource. And
 | |
| 	// the test goroutine signals the management server when the resolver is
 | |
| 	// closed.
 | |
| 	routeConfigResourceNamesCh := make(chan []string, 1)
 | |
| 	waitForResolverCloseCh := make(chan struct{})
 | |
| 	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{
 | |
| 		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
 | |
| 			if req.GetTypeUrl() == version.V3RouteConfigURL {
 | |
| 				select {
 | |
| 				case <-routeConfigResourceNamesCh:
 | |
| 				default:
 | |
| 				}
 | |
| 				select {
 | |
| 				case routeConfigResourceNamesCh <- req.GetResourceNames():
 | |
| 				default:
 | |
| 				}
 | |
| 				<-waitForResolverCloseCh
 | |
| 			}
 | |
| 			return nil
 | |
| 		},
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("Failed to start xDS management server: %v", err)
 | |
| 	}
 | |
| 	defer mgmtServer.Stop()
 | |
| 
 | |
| 	// Create a bootstrap configuration specifying the above management server.
 | |
| 	nodeID := uuid.New().String()
 | |
| 	cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
 | |
| 		NodeID:    nodeID,
 | |
| 		ServerURI: mgmtServer.Address,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 	defer cleanup()
 | |
| 
 | |
| 	// Configure resources on the management server.
 | |
| 	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
 | |
| 	routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
 | |
| 
 | |
| 	// Wait for a discovery request for a route configuration resource.
 | |
| 	stateCh, _, r := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
 | |
| 	waitForResourceNames(ctx, t, routeConfigResourceNamesCh, []string{defaultTestRouteConfigName})
 | |
| 
 | |
| 	// Close the resolver and unblock the management server.
 | |
| 	r.Close()
 | |
| 	close(waitForResolverCloseCh)
 | |
| 
 | |
| 	// Verify that the update from the management server is not propagated to
 | |
| 	// the ClientConn. The xDS resolver, once closed, is expected to drop
 | |
| 	// updates from the xDS client.
 | |
| 	verifyNoUpdateFromResolver(ctx, t, stateCh)
 | |
| }
 | |
| 
 | |
| // Tests that the xDS resolver's Close method closes the xDS client.
 | |
| func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
 | |
| 	bootstrapCfg := &bootstrap.Config{
 | |
| 		XDSServer: xdstestutils.ServerConfigForAddress(t, "dummy-management-server-address"),
 | |
| 	}
 | |
| 
 | |
| 	// Override xDS client creation to use bootstrap configuration pointing to a
 | |
| 	// dummy management server. Also close a channel when the returned xDS
 | |
| 	// client is closed.
 | |
| 	origNewClient := rinternal.NewXDSClient
 | |
| 	closeCh := make(chan struct{})
 | |
| 	rinternal.NewXDSClient = func() (xdsclient.XDSClient, func(), error) {
 | |
| 		c, cancel, err := xdsclient.NewWithConfigForTesting(bootstrapCfg, defaultTestTimeout, defaultTestTimeout)
 | |
| 		return c, grpcsync.OnceFunc(func() {
 | |
| 			close(closeCh)
 | |
| 			cancel()
 | |
| 		}), err
 | |
| 	}
 | |
| 	defer func() { rinternal.NewXDSClient = origNewClient }()
 | |
| 
 | |
| 	_, _, r := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///my-service-client-side-xds")})
 | |
| 	r.Close()
 | |
| 
 | |
| 	select {
 | |
| 	case <-closeCh:
 | |
| 	case <-time.After(defaultTestTimeout):
 | |
| 		t.Fatal("Timeout when waiting for xDS client to be closed")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Tests the case where a resource returned by the management server is NACKed
 | |
| // by the xDS client, which then returns an update containing an error to the
 | |
| // resolver. Verifies that the update is propagated to the ClientConn by the
 | |
| // resolver. It also tests the cases where the resolver gets a good update
 | |
| // subsequently, and another error after the good update. The test also verifies
 | |
| // that these are propagated to the ClientConn.
 | |
| func (s) TestResolverBadServiceUpdate(t *testing.T) {
 | |
| 	// Spin up an xDS management server for the test.
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 	nodeID := uuid.New().String()
 | |
| 	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
 | |
| 
 | |
| 	// Configure a listener resource that is expected to be NACKed because it
 | |
| 	// does not contain the `RouteSpecifier` field in the HTTPConnectionManager.
 | |
| 	hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
 | |
| 		HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})},
 | |
| 	})
 | |
| 	lis := &v3listenerpb.Listener{
 | |
| 		Name:        defaultTestServiceName,
 | |
| 		ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
 | |
| 		FilterChains: []*v3listenerpb.FilterChain{{
 | |
| 			Name: "filter-chain-name",
 | |
| 			Filters: []*v3listenerpb.Filter{{
 | |
| 				Name:       wellknown.HTTPConnectionManager,
 | |
| 				ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
 | |
| 			}},
 | |
| 		}},
 | |
| 	}
 | |
| 	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil)
 | |
| 
 | |
| 	// Build the resolver and expect an error update from it.
 | |
| 	stateCh, errCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
 | |
| 	wantErr := "no RouteSpecifier"
 | |
| 	verifyErrorFromResolver(ctx, t, errCh, wantErr)
 | |
| 
 | |
| 	// Configure good listener and route configuration resources on the
 | |
| 	// management server.
 | |
| 	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
 | |
| 	routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
 | |
| 	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
 | |
| 
 | |
| 	// Expect a good update from the resolver.
 | |
| 	verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
 | |
| 
 | |
| 	// Configure another bad resource on the management server and expect an
 | |
| 	// error update from the resolver.
 | |
| 	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil)
 | |
| 	verifyErrorFromResolver(ctx, t, errCh, wantErr)
 | |
| }
 | |
| 
 | |
| // TestResolverGoodServiceUpdate tests the case where the resource returned by
 | |
| // the management server is ACKed by the xDS client, which then returns a good
 | |
| // service update to the resolver. The test verifies that the service config
 | |
| // returned by the resolver matches expectations, and that the config selector
 | |
| // returned by the resolver picks clusters based on the route configuration
 | |
| // received from the management server.
 | |
| func (s) TestResolverGoodServiceUpdate(t *testing.T) {
 | |
| 	for _, tt := range []struct {
 | |
| 		name              string
 | |
| 		routeConfig       *v3routepb.RouteConfiguration
 | |
| 		wantServiceConfig string
 | |
| 		wantClusters      map[string]bool
 | |
| 	}{
 | |
| 		{
 | |
| 			name: "single cluster",
 | |
| 			routeConfig: e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
 | |
| 				RouteConfigName:      defaultTestRouteConfigName,
 | |
| 				ListenerName:         defaultTestServiceName,
 | |
| 				ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeCluster,
 | |
| 				ClusterName:          defaultTestClusterName,
 | |
| 			}),
 | |
| 			wantServiceConfig: wantDefaultServiceConfig,
 | |
| 			wantClusters:      map[string]bool{fmt.Sprintf("cluster:%s", defaultTestClusterName): true},
 | |
| 		},
 | |
| 		{
 | |
| 			name: "two clusters",
 | |
| 			routeConfig: e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
 | |
| 				RouteConfigName:      defaultTestRouteConfigName,
 | |
| 				ListenerName:         defaultTestServiceName,
 | |
| 				ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeWeightedCluster,
 | |
| 				WeightedClusters:     map[string]int{"cluster_1": 75, "cluster_2": 25},
 | |
| 			}),
 | |
| 			// This update contains the cluster from the previous update as well
 | |
| 			// as this update, as the previous config selector still references
 | |
| 			// the old cluster when the new one is pushed.
 | |
| 			wantServiceConfig: `{
 | |
|   "loadBalancingConfig": [{
 | |
|     "xds_cluster_manager_experimental": {
 | |
|       "children": {
 | |
|         "cluster:cluster_1": {
 | |
|           "childPolicy": [{
 | |
| 			"cds_experimental": {
 | |
| 			  "cluster": "cluster_1"
 | |
| 			}
 | |
| 		  }]
 | |
|         },
 | |
|         "cluster:cluster_2": {
 | |
|           "childPolicy": [{
 | |
| 			"cds_experimental": {
 | |
| 			  "cluster": "cluster_2"
 | |
| 			}
 | |
| 		  }]
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|   }]}`,
 | |
| 			wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true},
 | |
| 		},
 | |
| 	} {
 | |
| 		t.Run(tt.name, func(t *testing.T) {
 | |
| 			// Spin up an xDS management server for the test.
 | |
| 			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 			defer cancel()
 | |
| 			nodeID := uuid.New().String()
 | |
| 			mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
 | |
| 
 | |
| 			// Configure the management server with a good listener resource and a
 | |
| 			// route configuration resource, as specified by the test case.
 | |
| 			listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
 | |
| 			routes := []*v3routepb.RouteConfiguration{tt.routeConfig}
 | |
| 			configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
 | |
| 
 | |
| 			stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
 | |
| 
 | |
| 			// Read the update pushed by the resolver to the ClientConn.
 | |
| 			cs := verifyUpdateFromResolver(ctx, t, stateCh, tt.wantServiceConfig)
 | |
| 
 | |
| 			pickedClusters := make(map[string]bool)
 | |
| 			// Odds of picking 75% cluster 100 times in a row: 1 in 3E-13.  And
 | |
| 			// with the random number generator stubbed out, we can rely on this
 | |
| 			// to be 100% reproducible.
 | |
| 			for i := 0; i < 100; i++ {
 | |
| 				res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
 | |
| 				if err != nil {
 | |
| 					t.Fatalf("cs.SelectConfig(): %v", err)
 | |
| 				}
 | |
| 				cluster := clustermanager.GetPickedClusterForTesting(res.Context)
 | |
| 				pickedClusters[cluster] = true
 | |
| 				res.OnCommitted()
 | |
| 			}
 | |
| 			if !cmp.Equal(pickedClusters, tt.wantClusters) {
 | |
| 				t.Errorf("Picked clusters: %v; want: %v", pickedClusters, tt.wantClusters)
 | |
| 			}
 | |
| 		})
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Tests a case where a resolver receives a RouteConfig update with a HashPolicy
 | |
| // specifying to generate a hash. The configSelector generated should
 | |
| // successfully generate a Hash.
 | |
| func (s) TestResolverRequestHash(t *testing.T) {
 | |
| 	// Spin up an xDS management server for the test.
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 	nodeID := uuid.New().String()
 | |
| 	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
 | |
| 
 | |
| 	// Configure the management server with a good listener resource and a
 | |
| 	// route configuration resource that specifies a hash policy.
 | |
| 	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
 | |
| 	routes := []*v3routepb.RouteConfiguration{{
 | |
| 		Name: defaultTestRouteConfigName,
 | |
| 		VirtualHosts: []*v3routepb.VirtualHost{{
 | |
| 			Domains: []string{defaultTestServiceName},
 | |
| 			Routes: []*v3routepb.Route{{
 | |
| 				Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
 | |
| 				Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
 | |
| 					ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
 | |
| 						Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
 | |
| 							{
 | |
| 								Name:   defaultTestClusterName,
 | |
| 								Weight: &wrapperspb.UInt32Value{Value: 100},
 | |
| 							},
 | |
| 						},
 | |
| 					}},
 | |
| 					HashPolicy: []*v3routepb.RouteAction_HashPolicy{{
 | |
| 						PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
 | |
| 							Header: &v3routepb.RouteAction_HashPolicy_Header{
 | |
| 								HeaderName: ":path",
 | |
| 							},
 | |
| 						},
 | |
| 						Terminal: true,
 | |
| 					}},
 | |
| 				}},
 | |
| 			}},
 | |
| 		}},
 | |
| 	}}
 | |
| 	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
 | |
| 
 | |
| 	// Build the resolver and read the config selector out of it.
 | |
| 	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
 | |
| 	cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
 | |
| 
 | |
| 	// Selecting a config when there was a hash policy specified in the route
 | |
| 	// that will be selected should put a request hash in the config's context.
 | |
| 	res, err := cs.SelectConfig(iresolver.RPCInfo{
 | |
| 		Context: metadata.NewOutgoingContext(ctx, metadata.Pairs(":path", "/products")),
 | |
| 		Method:  "/service/method",
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("cs.SelectConfig(): %v", err)
 | |
| 	}
 | |
| 	gotHash := ringhash.GetRequestHashForTesting(res.Context)
 | |
| 	wantHash := xxhash.Sum64String("/products")
 | |
| 	if gotHash != wantHash {
 | |
| 		t.Fatalf("Got request hash: %v, want: %v", gotHash, wantHash)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Tests the case where resources are removed from the management server,
 | |
| // causing it to send an empty update to the xDS client, which returns a
 | |
| // resource-not-found error to the xDS resolver. The test verifies that an
 | |
| // ongoing RPC is handled to completion when this happens.
 | |
| func (s) TestResolverRemovedWithRPCs(t *testing.T) {
 | |
| 	// Spin up an xDS management server for the test.
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 	nodeID := uuid.New().String()
 | |
| 	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
 | |
| 
 | |
| 	// Configure resources on the management server.
 | |
| 	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
 | |
| 	routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
 | |
| 	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
 | |
| 
 | |
| 	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
 | |
| 
 | |
| 	// Read the update pushed by the resolver to the ClientConn.
 | |
| 	cs := verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
 | |
| 
 | |
| 	res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("cs.SelectConfig(): %v", err)
 | |
| 	}
 | |
| 
 | |
| 	// Delete the resources on the management server. This should result in a
 | |
| 	// resource-not-found error from the xDS client.
 | |
| 	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 
 | |
| 	// The RPC started earlier is still in progress. So, the xDS resolver will
 | |
| 	// not produce an empty service config at this point. Instead it will retain
 | |
| 	// the cluster to which the RPC is ongoing in the service config, but will
 | |
| 	// return an erroring config selector which will fail new RPCs.
 | |
| 	cs = verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
 | |
| 	_, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
 | |
| 	if err == nil || status.Code(err) != codes.Unavailable {
 | |
| 		t.Fatalf("cs.SelectConfig() returned: %v, want: %v", err, codes.Unavailable)
 | |
| 	}
 | |
| 
 | |
| 	// "Finish the RPC"; this could cause a panic if the resolver doesn't
 | |
| 	// handle it correctly.
 | |
| 	res.OnCommitted()
 | |
| 
 | |
| 	// Now that the RPC is committed, the xDS resolver is expected to send an
 | |
| 	// update with an empty service config.
 | |
| 	var state resolver.State
 | |
| 	select {
 | |
| 	case <-ctx.Done():
 | |
| 		t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err())
 | |
| 	case state = <-stateCh:
 | |
| 		if err := state.ServiceConfig.Err; err != nil {
 | |
| 			t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err)
 | |
| 		}
 | |
| 		wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}")
 | |
| 		if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) {
 | |
| 			t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Re-add the listener and expect everything to work again.
 | |
| 	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
 | |
| 	// Read the update pushed by the resolver to the ClientConn.
 | |
| 	cs = verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
 | |
| 
 | |
| 	res, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("cs.SelectConfig(): %v", err)
 | |
| 	}
 | |
| 	res.OnCommitted()
 | |
| }
 | |
| 
 | |
| // Tests the case where resources returned by the management server are removed.
 | |
| // The test verifies that the resolver pushes the expected config selector and
 | |
| // service config in this case.
 | |
| func (s) TestResolverRemovedResource(t *testing.T) {
 | |
| 	// Spin up an xDS management server for the test.
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 	nodeID := uuid.New().String()
 | |
| 	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
 | |
| 
 | |
| 	// Configure resources on the management server.
 | |
| 	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
 | |
| 	routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
 | |
| 	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
 | |
| 
 | |
| 	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
 | |
| 
 | |
| 	// Read the update pushed by the resolver to the ClientConn.
 | |
| 	cs := verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
 | |
| 
 | |
| 	// "Make an RPC" by invoking the config selector.
 | |
| 	res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("cs.SelectConfig(): %v", err)
 | |
| 	}
 | |
| 
 | |
| 	// "Finish the RPC"; this could cause a panic if the resolver doesn't
 | |
| 	// handle it correctly.
 | |
| 	res.OnCommitted()
 | |
| 
 | |
| 	// Delete the resources on the management server, resulting in a
 | |
| 	// resource-not-found error from the xDS client.
 | |
| 	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil {
 | |
| 		t.Fatal(err)
 | |
| 	}
 | |
| 
 | |
| 	// The channel should receive the existing service config with the original
 | |
| 	// cluster but with an erroring config selector.
 | |
| 	cs = verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
 | |
| 
 | |
| 	// "Make another RPC" by invoking the config selector.
 | |
| 	res, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
 | |
| 	if err == nil || status.Code(err) != codes.Unavailable {
 | |
| 		t.Fatalf("cs.SelectConfig() got %v, %v, expected UNAVAILABLE error", res, err)
 | |
| 	}
 | |
| 
 | |
| 	// In the meantime, an empty ServiceConfig update should have been sent.
 | |
| 	var state resolver.State
 | |
| 	select {
 | |
| 	case <-ctx.Done():
 | |
| 		t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err())
 | |
| 	case state = <-stateCh:
 | |
| 		if err := state.ServiceConfig.Err; err != nil {
 | |
| 			t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err)
 | |
| 		}
 | |
| 		wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}")
 | |
| 		if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) {
 | |
| 			t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Tests the case where the resolver receives max stream duration as part of the
 | |
| // listener and route configuration resources.  The test verifies that the RPC
 | |
| // timeout returned by the config selector matches expectations. A non-nil max
 | |
| // stream duration (this includes an explicit zero value) in a matching route
 | |
| // overrides the value specified in the listener resource.
 | |
| func (s) TestResolverMaxStreamDuration(t *testing.T) {
 | |
| 	// Spin up an xDS management server for the test.
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 	nodeID := uuid.New().String()
 | |
| 	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
 | |
| 
 | |
| 	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
 | |
| 
 | |
| 	// Configure the management server with a listener resource that specifies a
 | |
| 	// max stream duration as part of its HTTP connection manager. Also
 | |
| 	// configure a route configuration resource, which has multiple routes with
 | |
| 	// different values of max stream duration.
 | |
| 	hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
 | |
| 		RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{
 | |
| 			ConfigSource: &v3corepb.ConfigSource{
 | |
| 				ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
 | |
| 			},
 | |
| 			RouteConfigName: defaultTestRouteConfigName,
 | |
| 		}},
 | |
| 		HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
 | |
| 		CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{
 | |
| 			MaxStreamDuration: durationpb.New(1 * time.Second),
 | |
| 		},
 | |
| 	})
 | |
| 	listeners := []*v3listenerpb.Listener{{
 | |
| 		Name:        defaultTestServiceName,
 | |
| 		ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
 | |
| 		FilterChains: []*v3listenerpb.FilterChain{{
 | |
| 			Name: "filter-chain-name",
 | |
| 			Filters: []*v3listenerpb.Filter{{
 | |
| 				Name:       wellknown.HTTPConnectionManager,
 | |
| 				ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
 | |
| 			}},
 | |
| 		}},
 | |
| 	}}
 | |
| 	routes := []*v3routepb.RouteConfiguration{{
 | |
| 		Name: defaultTestRouteConfigName,
 | |
| 		VirtualHosts: []*v3routepb.VirtualHost{{
 | |
| 			Domains: []string{defaultTestServiceName},
 | |
| 			Routes: []*v3routepb.Route{
 | |
| 				{
 | |
| 					Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/foo"}},
 | |
| 					Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
 | |
| 						ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
 | |
| 							Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
 | |
| 								{
 | |
| 									Name:   "A",
 | |
| 									Weight: &wrapperspb.UInt32Value{Value: 100},
 | |
| 								},
 | |
| 							}},
 | |
| 						},
 | |
| 						MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{
 | |
| 							MaxStreamDuration: durationpb.New(5 * time.Second),
 | |
| 						},
 | |
| 					}},
 | |
| 				},
 | |
| 				{
 | |
| 					Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/bar"}},
 | |
| 					Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
 | |
| 						ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
 | |
| 							Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
 | |
| 								{
 | |
| 									Name:   "B",
 | |
| 									Weight: &wrapperspb.UInt32Value{Value: 100},
 | |
| 								},
 | |
| 							}},
 | |
| 						},
 | |
| 						MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{
 | |
| 							MaxStreamDuration: durationpb.New(0 * time.Second),
 | |
| 						},
 | |
| 					}},
 | |
| 				},
 | |
| 				{
 | |
| 					Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
 | |
| 					Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
 | |
| 						ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
 | |
| 							Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
 | |
| 								{
 | |
| 									Name:   "C",
 | |
| 									Weight: &wrapperspb.UInt32Value{Value: 100},
 | |
| 								},
 | |
| 							}},
 | |
| 						},
 | |
| 					}},
 | |
| 				},
 | |
| 			},
 | |
| 		}},
 | |
| 	}}
 | |
| 	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
 | |
| 
 | |
| 	// Read the update pushed by the resolver to the ClientConn.
 | |
| 	cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
 | |
| 
 | |
| 	testCases := []struct {
 | |
| 		name   string
 | |
| 		method string
 | |
| 		want   *time.Duration
 | |
| 	}{{
 | |
| 		name:   "RDS setting",
 | |
| 		method: "/foo/method",
 | |
| 		want:   newDurationP(5 * time.Second),
 | |
| 	}, {
 | |
| 		name:   "explicit zero in RDS; ignore LDS",
 | |
| 		method: "/bar/method",
 | |
| 		want:   nil,
 | |
| 	}, {
 | |
| 		name:   "no config in RDS; fallback to LDS",
 | |
| 		method: "/baz/method",
 | |
| 		want:   newDurationP(time.Second),
 | |
| 	}}
 | |
| 
 | |
| 	for _, tc := range testCases {
 | |
| 		t.Run(tc.name, func(t *testing.T) {
 | |
| 			req := iresolver.RPCInfo{
 | |
| 				Method:  tc.method,
 | |
| 				Context: ctx,
 | |
| 			}
 | |
| 			res, err := cs.SelectConfig(req)
 | |
| 			if err != nil {
 | |
| 				t.Errorf("cs.SelectConfig(%v): %v", req, err)
 | |
| 				return
 | |
| 			}
 | |
| 			res.OnCommitted()
 | |
| 			got := res.MethodConfig.Timeout
 | |
| 			if !cmp.Equal(got, tc.want) {
 | |
| 				t.Errorf("For method %q: res.MethodConfig.Timeout = %v; want %v", tc.method, got, tc.want)
 | |
| 			}
 | |
| 		})
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Tests that clusters remain in service config if RPCs are in flight.
 | |
| func (s) TestResolverDelayedOnCommitted(t *testing.T) {
 | |
| 	// Spin up an xDS management server for the test.
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 	nodeID := uuid.New().String()
 | |
| 	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
 | |
| 
 | |
| 	// Configure resources on the management server.
 | |
| 	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
 | |
| 	routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
 | |
| 	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
 | |
| 
 | |
| 	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
 | |
| 
 | |
| 	// Read the update pushed by the resolver to the ClientConn.
 | |
| 	cs := verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
 | |
| 
 | |
| 	// Make an RPC, but do not commit it yet.
 | |
| 	resOld, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("cs.SelectConfig(): %v", err)
 | |
| 	}
 | |
| 	wantClusterName := fmt.Sprintf("cluster:%s", defaultTestClusterName)
 | |
| 	if cluster := clustermanager.GetPickedClusterForTesting(resOld.Context); cluster != wantClusterName {
 | |
| 		t.Fatalf("Picked cluster is %q, want %q", cluster, wantClusterName)
 | |
| 	}
 | |
| 
 | |
| 	// Delay resOld.OnCommitted(). As long as there are pending RPCs to removed
 | |
| 	// clusters, they still appear in the service config.
 | |
| 
 | |
| 	// Update the route configuration resource on the management server to
 | |
| 	// return a new cluster.
 | |
| 	newClusterName := "new-" + defaultTestClusterName
 | |
| 	routes = []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName)}
 | |
| 	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
 | |
| 
 | |
| 	// Read the update pushed by the resolver to the ClientConn and ensure the
 | |
| 	// old cluster is present in the service config. Also ensure that the newly
 | |
| 	// returned config selector does not hold a reference to the old cluster.
 | |
| 	wantSC := fmt.Sprintf(`
 | |
| {
 | |
| 	"loadBalancingConfig": [
 | |
| 		{
 | |
| 		  "xds_cluster_manager_experimental": {
 | |
| 			"children": {
 | |
| 			  "cluster:%s": {
 | |
| 				"childPolicy": [
 | |
| 				  {
 | |
| 					"cds_experimental": {
 | |
| 					  "cluster": "%s"
 | |
| 					}
 | |
| 				  }
 | |
| 				]
 | |
| 			  },
 | |
| 			  "cluster:%s": {
 | |
| 				"childPolicy": [
 | |
| 				  {
 | |
| 					"cds_experimental": {
 | |
| 					  "cluster": "%s"
 | |
| 					}
 | |
| 				  }
 | |
| 				]
 | |
| 			  }
 | |
| 			}
 | |
| 		  }
 | |
| 		}
 | |
| 	  ]
 | |
| }`, defaultTestClusterName, defaultTestClusterName, newClusterName, newClusterName)
 | |
| 	cs = verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
 | |
| 
 | |
| 	resNew, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
 | |
| 	if err != nil {
 | |
| 		t.Fatalf("cs.SelectConfig(): %v", err)
 | |
| 	}
 | |
| 	wantClusterName = fmt.Sprintf("cluster:%s", newClusterName)
 | |
| 	if cluster := clustermanager.GetPickedClusterForTesting(resNew.Context); cluster != wantClusterName {
 | |
| 		t.Fatalf("Picked cluster is %q, want %q", cluster, wantClusterName)
 | |
| 	}
 | |
| 
 | |
| 	// Invoke OnCommitted on the old RPC; should lead to a service config update
 | |
| 	// that deletes the old cluster, as the old cluster no longer has any
 | |
| 	// pending RPCs.
 | |
| 	resOld.OnCommitted()
 | |
| 
 | |
| 	wantSC = fmt.Sprintf(`
 | |
| {
 | |
| 	"loadBalancingConfig": [
 | |
| 		{
 | |
| 		  "xds_cluster_manager_experimental": {
 | |
| 			"children": {
 | |
| 			  "cluster:%s": {
 | |
| 				"childPolicy": [
 | |
| 				  {
 | |
| 					"cds_experimental": {
 | |
| 					  "cluster": "%s"
 | |
| 					}
 | |
| 				  }
 | |
| 				]
 | |
| 			  }
 | |
| 			}
 | |
| 		  }
 | |
| 		}
 | |
| 	  ]
 | |
| }`, newClusterName, newClusterName)
 | |
| 	verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
 | |
| }
 | |
| 
 | |
| // Tests the case where two LDS updates with the same RDS name to watch are
 | |
| // received without an RDS in between. Those LDS updates shouldn't trigger a
 | |
| // service config update.
 | |
| func (s) TestResolverMultipleLDSUpdates(t *testing.T) {
 | |
| 	// Spin up an xDS management server for the test.
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 	nodeID := uuid.New().String()
 | |
| 	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
 | |
| 
 | |
| 	// Configure the management server with a listener resource, but no route
 | |
| 	// configuration resource.
 | |
| 	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
 | |
| 	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, nil)
 | |
| 
 | |
| 	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
 | |
| 
 | |
| 	// Ensure there is no update from the resolver.
 | |
| 	verifyNoUpdateFromResolver(ctx, t, stateCh)
 | |
| 
 | |
| 	// Configure the management server with a listener resource that points to
 | |
| 	// the same route configuration resource but has different values for max
 | |
| 	// stream duration field. There is still no route configuration resource on
 | |
| 	// the management server.
 | |
| 	hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
 | |
| 		RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{
 | |
| 			ConfigSource: &v3corepb.ConfigSource{
 | |
| 				ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
 | |
| 			},
 | |
| 			RouteConfigName: defaultTestRouteConfigName,
 | |
| 		}},
 | |
| 		HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
 | |
| 		CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{
 | |
| 			MaxStreamDuration: durationpb.New(1 * time.Second),
 | |
| 		},
 | |
| 	})
 | |
| 	listeners = []*v3listenerpb.Listener{{
 | |
| 		Name:        defaultTestServiceName,
 | |
| 		ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
 | |
| 		FilterChains: []*v3listenerpb.FilterChain{{
 | |
| 			Name: "filter-chain-name",
 | |
| 			Filters: []*v3listenerpb.Filter{{
 | |
| 				Name:       wellknown.HTTPConnectionManager,
 | |
| 				ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
 | |
| 			}},
 | |
| 		}},
 | |
| 	}}
 | |
| 	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, nil)
 | |
| 
 | |
| 	// Ensure that there is no update from the resolver.
 | |
| 	verifyNoUpdateFromResolver(ctx, t, stateCh)
 | |
| }
 | |
| 
 | |
| // TestResolverWRR tests the case where the route configuration returned by the
 | |
| // management server contains a set of weighted clusters. The test performs a
 | |
| // bunch of RPCs using the cluster specifier returned by the resolver, and
 | |
| // verifies the cluster distribution.
 | |
| func (s) TestResolverWRR(t *testing.T) {
 | |
| 	origNewWRR := rinternal.NewWRR
 | |
| 	rinternal.NewWRR = testutils.NewTestWRR
 | |
| 	defer func() { rinternal.NewWRR = origNewWRR }()
 | |
| 
 | |
| 	// Spin up an xDS management server for the test.
 | |
| 	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 	defer cancel()
 | |
| 	nodeID := uuid.New().String()
 | |
| 	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
 | |
| 
 | |
| 	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
 | |
| 
 | |
| 	// Configure resources on the management server.
 | |
| 	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
 | |
| 	routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
 | |
| 		RouteConfigName:      defaultTestRouteConfigName,
 | |
| 		ListenerName:         defaultTestServiceName,
 | |
| 		ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeWeightedCluster,
 | |
| 		WeightedClusters:     map[string]int{"A": 75, "B": 25},
 | |
| 	})}
 | |
| 	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
 | |
| 
 | |
| 	// Read the update pushed by the resolver to the ClientConn.
 | |
| 	cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
 | |
| 
 | |
| 	// Make RPCs to verify WRR behavior in the cluster specifier.
 | |
| 	picks := map[string]int{}
 | |
| 	for i := 0; i < 100; i++ {
 | |
| 		res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
 | |
| 		if err != nil {
 | |
| 			t.Fatalf("cs.SelectConfig(): %v", err)
 | |
| 		}
 | |
| 		picks[clustermanager.GetPickedClusterForTesting(res.Context)]++
 | |
| 		res.OnCommitted()
 | |
| 	}
 | |
| 	want := map[string]int{"cluster:A": 75, "cluster:B": 25}
 | |
| 	if !cmp.Equal(picks, want) {
 | |
| 		t.Errorf("Picked clusters: %v; want: %v", picks, want)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| const filterCfgPathFieldName = "path"
 | |
| const filterCfgErrorFieldName = "new_stream_error"
 | |
| 
 | |
| type filterCfg struct {
 | |
| 	httpfilter.FilterConfig
 | |
| 	path         string
 | |
| 	newStreamErr error
 | |
| }
 | |
| 
 | |
| type filterBuilder struct {
 | |
| 	paths   []string
 | |
| 	typeURL string
 | |
| }
 | |
| 
 | |
| func (fb *filterBuilder) TypeURLs() []string { return []string{fb.typeURL} }
 | |
| 
 | |
| func filterConfigFromProto(cfg proto.Message) (httpfilter.FilterConfig, error) {
 | |
| 	ts, ok := cfg.(*v3xdsxdstypepb.TypedStruct)
 | |
| 	if !ok {
 | |
| 		return nil, fmt.Errorf("unsupported filter config type: %T, want %T", cfg, &v3xdsxdstypepb.TypedStruct{})
 | |
| 	}
 | |
| 
 | |
| 	if ts.GetValue() == nil {
 | |
| 		return filterCfg{}, nil
 | |
| 	}
 | |
| 	ret := filterCfg{}
 | |
| 	if v := ts.GetValue().GetFields()[filterCfgPathFieldName]; v != nil {
 | |
| 		ret.path = v.GetStringValue()
 | |
| 	}
 | |
| 	if v := ts.GetValue().GetFields()[filterCfgErrorFieldName]; v != nil {
 | |
| 		if v.GetStringValue() == "" {
 | |
| 			ret.newStreamErr = nil
 | |
| 		} else {
 | |
| 			ret.newStreamErr = fmt.Errorf("%s", v.GetStringValue())
 | |
| 		}
 | |
| 	}
 | |
| 	return ret, nil
 | |
| }
 | |
| 
 | |
| func (*filterBuilder) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) {
 | |
| 	return filterConfigFromProto(cfg)
 | |
| }
 | |
| 
 | |
| func (*filterBuilder) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) {
 | |
| 	return filterConfigFromProto(override)
 | |
| }
 | |
| 
 | |
| func (*filterBuilder) IsTerminal() bool { return false }
 | |
| 
 | |
| var _ httpfilter.ClientInterceptorBuilder = &filterBuilder{}
 | |
| 
 | |
| func (fb *filterBuilder) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) {
 | |
| 	if config == nil {
 | |
| 		panic("unexpected missing config")
 | |
| 	}
 | |
| 
 | |
| 	fi := &filterInterceptor{
 | |
| 		parent: fb,
 | |
| 		pathCh: make(chan string, 10),
 | |
| 	}
 | |
| 
 | |
| 	fb.paths = append(fb.paths, "build:"+config.(filterCfg).path)
 | |
| 	err := config.(filterCfg).newStreamErr
 | |
| 	if override != nil {
 | |
| 		fb.paths = append(fb.paths, "override:"+override.(filterCfg).path)
 | |
| 		err = override.(filterCfg).newStreamErr
 | |
| 	}
 | |
| 
 | |
| 	fi.cfgPath = config.(filterCfg).path
 | |
| 	fi.err = err
 | |
| 	return fi, nil
 | |
| }
 | |
| 
 | |
| type filterInterceptor struct {
 | |
| 	parent  *filterBuilder
 | |
| 	pathCh  chan string
 | |
| 	cfgPath string
 | |
| 	err     error
 | |
| }
 | |
| 
 | |
| func (fi *filterInterceptor) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
 | |
| 	fi.parent.paths = append(fi.parent.paths, "newstream:"+fi.cfgPath)
 | |
| 	if fi.err != nil {
 | |
| 		return nil, fi.err
 | |
| 	}
 | |
| 	d := func() {
 | |
| 		fi.parent.paths = append(fi.parent.paths, "done:"+fi.cfgPath)
 | |
| 		done()
 | |
| 	}
 | |
| 	cs, err := newStream(ctx, d)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return &clientStream{ClientStream: cs}, nil
 | |
| }
 | |
| 
 | |
| type clientStream struct {
 | |
| 	iresolver.ClientStream
 | |
| }
 | |
| 
 | |
| func (s) TestConfigSelector_FailureCases(t *testing.T) {
 | |
| 	const methodName = "1"
 | |
| 
 | |
| 	tests := []struct {
 | |
| 		name     string
 | |
| 		listener *v3listenerpb.Listener
 | |
| 		wantErr  string
 | |
| 	}{
 | |
| 		{
 | |
| 			name: "route type RouteActionUnsupported invalid for client",
 | |
| 			listener: &v3listenerpb.Listener{
 | |
| 				Name: defaultTestServiceName,
 | |
| 				ApiListener: &v3listenerpb.ApiListener{
 | |
| 					ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
 | |
| 						RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
 | |
| 							RouteConfig: &v3routepb.RouteConfiguration{
 | |
| 								Name: defaultTestRouteConfigName,
 | |
| 								VirtualHosts: []*v3routepb.VirtualHost{{
 | |
| 									Domains: []string{defaultTestServiceName},
 | |
| 									Routes: []*v3routepb.Route{{
 | |
| 										Match: &v3routepb.RouteMatch{
 | |
| 											PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName},
 | |
| 										},
 | |
| 										Action: &v3routepb.Route_FilterAction{},
 | |
| 									}},
 | |
| 								}},
 | |
| 							}},
 | |
| 						HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
 | |
| 					}),
 | |
| 				},
 | |
| 			},
 | |
| 			wantErr: "matched route does not have a supported route action type",
 | |
| 		},
 | |
| 		{
 | |
| 			name: "route type RouteActionNonForwardingAction invalid for client",
 | |
| 			listener: &v3listenerpb.Listener{
 | |
| 				Name: defaultTestServiceName,
 | |
| 				ApiListener: &v3listenerpb.ApiListener{
 | |
| 					ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
 | |
| 						RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
 | |
| 							RouteConfig: &v3routepb.RouteConfiguration{
 | |
| 								Name: defaultTestRouteConfigName,
 | |
| 								VirtualHosts: []*v3routepb.VirtualHost{{
 | |
| 									Domains: []string{defaultTestServiceName},
 | |
| 									Routes: []*v3routepb.Route{{
 | |
| 										Match: &v3routepb.RouteMatch{
 | |
| 											PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName},
 | |
| 										},
 | |
| 										Action: &v3routepb.Route_NonForwardingAction{},
 | |
| 									}},
 | |
| 								}},
 | |
| 							}},
 | |
| 						HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
 | |
| 					}),
 | |
| 				},
 | |
| 			},
 | |
| 			wantErr: "matched route does not have a supported route action type",
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	for _, test := range tests {
 | |
| 		t.Run(test.name, func(t *testing.T) {
 | |
| 			// Spin up an xDS management server.
 | |
| 			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 			defer cancel()
 | |
| 			nodeID := uuid.New().String()
 | |
| 			mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
 | |
| 
 | |
| 			// Build an xDS resolver.
 | |
| 			stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
 | |
| 
 | |
| 			// Update the management server with a listener resource that
 | |
| 			// contains inline route configuration.
 | |
| 			configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{test.listener}, nil)
 | |
| 
 | |
| 			// Ensure that the resolver pushes a state update to the channel.
 | |
| 			cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
 | |
| 
 | |
| 			// Ensure that it returns the expected error.
 | |
| 			_, err := cs.SelectConfig(iresolver.RPCInfo{Method: methodName, Context: ctx})
 | |
| 			if err == nil || !strings.Contains(err.Error(), test.wantErr) {
 | |
| 				t.Errorf("SelectConfig(_) = _, %v; want _, Contains(%v)", err, test.wantErr)
 | |
| 			}
 | |
| 		})
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func newHTTPFilter(t *testing.T, name, typeURL, path, err string) *v3httppb.HttpFilter {
 | |
| 	return &v3httppb.HttpFilter{
 | |
| 		Name: name,
 | |
| 		ConfigType: &v3httppb.HttpFilter_TypedConfig{
 | |
| 			TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
 | |
| 				TypeUrl: typeURL,
 | |
| 				Value: &structpb.Struct{
 | |
| 					Fields: map[string]*structpb.Value{
 | |
| 						filterCfgPathFieldName:  {Kind: &structpb.Value_StringValue{StringValue: path}},
 | |
| 						filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: err}},
 | |
| 					},
 | |
| 				},
 | |
| 			}),
 | |
| 		},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (s) TestXDSResolverHTTPFilters(t *testing.T) {
 | |
| 	const methodName1 = "1"
 | |
| 	const methodName2 = "2"
 | |
| 	testFilterName := t.Name()
 | |
| 
 | |
| 	testCases := []struct {
 | |
| 		name          string
 | |
| 		listener      *v3listenerpb.Listener
 | |
| 		rpcRes        map[string][][]string
 | |
| 		wantStreamErr string
 | |
| 	}{
 | |
| 		{
 | |
| 			name: "NewStream error - ensure earlier interceptor Done is still called",
 | |
| 			listener: &v3listenerpb.Listener{
 | |
| 				Name: defaultTestServiceName,
 | |
| 				ApiListener: &v3listenerpb.ApiListener{
 | |
| 					ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
 | |
| 						RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
 | |
| 							RouteConfig: &v3routepb.RouteConfiguration{
 | |
| 								Name: defaultTestRouteConfigName,
 | |
| 								VirtualHosts: []*v3routepb.VirtualHost{{
 | |
| 									Domains: []string{defaultTestServiceName},
 | |
| 									Routes: []*v3routepb.Route{{
 | |
| 										Match: &v3routepb.RouteMatch{
 | |
| 											PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1},
 | |
| 										},
 | |
| 										Action: &v3routepb.Route_Route{
 | |
| 											Route: &v3routepb.RouteAction{
 | |
| 												ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
 | |
| 													WeightedClusters: &v3routepb.WeightedCluster{
 | |
| 														Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
 | |
| 															{Name: "A", Weight: wrapperspb.UInt32(1)},
 | |
| 															{Name: "B", Weight: wrapperspb.UInt32(1)},
 | |
| 														},
 | |
| 													},
 | |
| 												},
 | |
| 											},
 | |
| 										},
 | |
| 									}},
 | |
| 								}},
 | |
| 							}},
 | |
| 						HttpFilters: []*v3httppb.HttpFilter{
 | |
| 							newHTTPFilter(t, "foo", testFilterName, "foo1", ""),
 | |
| 							newHTTPFilter(t, "bar", testFilterName, "bar1", "bar newstream err"),
 | |
| 							e2e.RouterHTTPFilter,
 | |
| 						},
 | |
| 					}),
 | |
| 				},
 | |
| 			},
 | |
| 			rpcRes: map[string][][]string{
 | |
| 				methodName1: {
 | |
| 					{"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1", "done:foo1"}, // err in bar1 NewStream()
 | |
| 				},
 | |
| 			},
 | |
| 			wantStreamErr: "bar newstream err",
 | |
| 		},
 | |
| 		{
 | |
| 			name: "all overrides",
 | |
| 			listener: &v3listenerpb.Listener{
 | |
| 				Name: defaultTestServiceName,
 | |
| 				ApiListener: &v3listenerpb.ApiListener{
 | |
| 					ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
 | |
| 						RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
 | |
| 							RouteConfig: &v3routepb.RouteConfiguration{
 | |
| 								Name: defaultTestRouteConfigName,
 | |
| 								VirtualHosts: []*v3routepb.VirtualHost{{
 | |
| 									Domains: []string{defaultTestServiceName},
 | |
| 									Routes: []*v3routepb.Route{
 | |
| 										{
 | |
| 											Match: &v3routepb.RouteMatch{
 | |
| 												PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1},
 | |
| 											},
 | |
| 											Action: &v3routepb.Route_Route{
 | |
| 												Route: &v3routepb.RouteAction{
 | |
| 													ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
 | |
| 														WeightedClusters: &v3routepb.WeightedCluster{
 | |
| 															Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
 | |
| 																{Name: "A", Weight: wrapperspb.UInt32(1)},
 | |
| 																{Name: "B", Weight: wrapperspb.UInt32(1)},
 | |
| 															},
 | |
| 														},
 | |
| 													},
 | |
| 												},
 | |
| 											},
 | |
| 										},
 | |
| 										{
 | |
| 											Match: &v3routepb.RouteMatch{
 | |
| 												PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName2},
 | |
| 											},
 | |
| 											Action: &v3routepb.Route_Route{
 | |
| 												Route: &v3routepb.RouteAction{
 | |
| 													ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
 | |
| 														WeightedClusters: &v3routepb.WeightedCluster{
 | |
| 															Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
 | |
| 																{Name: "A", Weight: wrapperspb.UInt32(1)},
 | |
| 																{
 | |
| 																	Name:   "B",
 | |
| 																	Weight: wrapperspb.UInt32(1),
 | |
| 																	TypedPerFilterConfig: map[string]*anypb.Any{
 | |
| 																		"foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
 | |
| 																			TypeUrl: testFilterName,
 | |
| 																			Value: &structpb.Struct{
 | |
| 																				Fields: map[string]*structpb.Value{
 | |
| 																					filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo4"}},
 | |
| 																				},
 | |
| 																			},
 | |
| 																		}),
 | |
| 																		"bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
 | |
| 																			TypeUrl: testFilterName,
 | |
| 																			Value: &structpb.Struct{
 | |
| 																				Fields: map[string]*structpb.Value{
 | |
| 																					filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar4"}},
 | |
| 																				},
 | |
| 																			},
 | |
| 																		}),
 | |
| 																	},
 | |
| 																},
 | |
| 															},
 | |
| 														},
 | |
| 													},
 | |
| 												},
 | |
| 											},
 | |
| 											TypedPerFilterConfig: map[string]*anypb.Any{
 | |
| 												"foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
 | |
| 													TypeUrl: testFilterName,
 | |
| 													Value: &structpb.Struct{
 | |
| 														Fields: map[string]*structpb.Value{
 | |
| 															filterCfgPathFieldName:  {Kind: &structpb.Value_StringValue{StringValue: "foo3"}},
 | |
| 															filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}},
 | |
| 														},
 | |
| 													},
 | |
| 												}),
 | |
| 												"bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
 | |
| 													TypeUrl: testFilterName,
 | |
| 													Value: &structpb.Struct{
 | |
| 														Fields: map[string]*structpb.Value{
 | |
| 															filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar3"}},
 | |
| 														},
 | |
| 													},
 | |
| 												}),
 | |
| 											},
 | |
| 										},
 | |
| 									},
 | |
| 									TypedPerFilterConfig: map[string]*anypb.Any{
 | |
| 										"foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
 | |
| 											TypeUrl: testFilterName,
 | |
| 											Value: &structpb.Struct{
 | |
| 												Fields: map[string]*structpb.Value{
 | |
| 													filterCfgPathFieldName:  {Kind: &structpb.Value_StringValue{StringValue: "foo2"}},
 | |
| 													filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}},
 | |
| 												},
 | |
| 											},
 | |
| 										}),
 | |
| 										"bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
 | |
| 											TypeUrl: testFilterName,
 | |
| 											Value: &structpb.Struct{
 | |
| 												Fields: map[string]*structpb.Value{
 | |
| 													filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar2"}},
 | |
| 												},
 | |
| 											},
 | |
| 										}),
 | |
| 									},
 | |
| 								}},
 | |
| 							}},
 | |
| 						HttpFilters: []*v3httppb.HttpFilter{
 | |
| 							newHTTPFilter(t, "foo", testFilterName, "foo1", "this is overridden to nil"),
 | |
| 							newHTTPFilter(t, "bar", testFilterName, "bar1", ""),
 | |
| 							e2e.RouterHTTPFilter,
 | |
| 						},
 | |
| 					}),
 | |
| 				},
 | |
| 			},
 | |
| 			rpcRes: map[string][][]string{
 | |
| 				methodName1: {
 | |
| 					{"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
 | |
| 					{"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
 | |
| 				},
 | |
| 				methodName2: {
 | |
| 					{"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
 | |
| 					{"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
 | |
| 					{"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
 | |
| 					{"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
 | |
| 				},
 | |
| 			},
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	for _, tc := range testCases {
 | |
| 		t.Run(tc.name, func(t *testing.T) {
 | |
| 			origNewWRR := rinternal.NewWRR
 | |
| 			rinternal.NewWRR = testutils.NewTestWRR
 | |
| 			defer func() { rinternal.NewWRR = origNewWRR }()
 | |
| 
 | |
| 			// Register a custom httpFilter builder for the test.
 | |
| 			fb := &filterBuilder{typeURL: testFilterName}
 | |
| 			httpfilter.Register(fb)
 | |
| 
 | |
| 			// Spin up an xDS management server.
 | |
| 			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
 | |
| 			defer cancel()
 | |
| 			nodeID := uuid.New().String()
 | |
| 			mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
 | |
| 
 | |
| 			// Build an xDS resolver.
 | |
| 			stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
 | |
| 
 | |
| 			// Update the management server with a listener resource that
 | |
| 			// contains an inline route configuration.
 | |
| 			configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{tc.listener}, nil)
 | |
| 
 | |
| 			// Ensure that the resolver pushes a state update to the channel.
 | |
| 			cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
 | |
| 
 | |
| 			for method, wants := range tc.rpcRes {
 | |
| 				// Order of wants is non-deterministic.
 | |
| 				remainingWant := make([][]string, len(wants))
 | |
| 				copy(remainingWant, wants)
 | |
| 				for n := range wants {
 | |
| 					res, err := cs.SelectConfig(iresolver.RPCInfo{Method: method, Context: ctx})
 | |
| 					if err != nil {
 | |
| 						t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
 | |
| 					}
 | |
| 
 | |
| 					var doneFunc func()
 | |
| 					_, err = res.Interceptor.NewStream(context.Background(), iresolver.RPCInfo{}, func() {}, func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
 | |
| 						doneFunc = done
 | |
| 						return nil, nil
 | |
| 					})
 | |
| 					if tc.wantStreamErr != "" {
 | |
| 						if err == nil || !strings.Contains(err.Error(), tc.wantStreamErr) {
 | |
| 							t.Errorf("NewStream(...) = _, %v; want _, Contains(%v)", err, tc.wantStreamErr)
 | |
| 						}
 | |
| 						if err == nil {
 | |
| 							res.OnCommitted()
 | |
| 							doneFunc()
 | |
| 						}
 | |
| 						continue
 | |
| 					}
 | |
| 					if err != nil {
 | |
| 						t.Fatalf("unexpected error from Interceptor.NewStream: %v", err)
 | |
| 
 | |
| 					}
 | |
| 					res.OnCommitted()
 | |
| 					doneFunc()
 | |
| 
 | |
| 					gotPaths := fb.paths
 | |
| 					fb.paths = []string{}
 | |
| 
 | |
| 					// Confirm the desired path is found in remainingWant, and remove it.
 | |
| 					pass := false
 | |
| 					for i := range remainingWant {
 | |
| 						if cmp.Equal(gotPaths, remainingWant[i]) {
 | |
| 							remainingWant[i] = remainingWant[len(remainingWant)-1]
 | |
| 							remainingWant = remainingWant[:len(remainingWant)-1]
 | |
| 							pass = true
 | |
| 							break
 | |
| 						}
 | |
| 					}
 | |
| 					if !pass {
 | |
| 						t.Errorf("%q:%v - path:\n%v\nwant one of:\n%v", method, n, gotPaths, remainingWant)
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		})
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func newDurationP(d time.Duration) *time.Duration {
 | |
| 	return &d
 | |
| }
 |