xds/resolver: cleanup tests to use real xDS client 4/n (#5954)

This commit is contained in:
Easwar Swaminathan 2023-01-19 16:16:47 -08:00 committed by GitHub
parent 6e749384f7
commit bc9728f98b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 352 additions and 127 deletions

View File

@@ -0,0 +1,34 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package testutils
import (
"fmt"
"net/url"
)
// MustParseURL parses the given target with url.Parse() and returns the
// resulting URL. It panics if the target cannot be parsed, making it suitable
// for use in tests where parse failures indicate a broken test setup.
func MustParseURL(target string) *url.URL {
	parsedURL, err := url.Parse(target)
	if err == nil {
		return parsedURL
	}
	panic(fmt.Sprintf("Error parsing target(%s): %v", target, err))
}

View File

@@ -29,6 +29,7 @@ import (
xxhash "github.com/cespare/xxhash/v2"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
@@ -57,7 +58,10 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
"google.golang.org/protobuf/types/known/wrapperspb"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config
@@ -313,6 +317,23 @@ func waitForWatchRouteConfig(ctx context.Context, t *testing.T, xdsC *fakeclient
}
}
// buildResolverForTarget builds an xDS resolver for the given target. It
// returns a testClientConn which allows inspection of resolver updates, and a
// function to close the resolver once the test is complete.
func buildResolverForTarget(t *testing.T, target resolver.Target) (*testClientConn, func()) {
	b := resolver.Get(xdsScheme)
	if b == nil {
		t.Fatalf("resolver.Get(%v) returned nil", xdsScheme)
	}
	cc := newTestClientConn()
	rb, err := b.Build(target, cc, resolver.BuildOptions{})
	if err != nil {
		t.Fatalf("builder.Build(%v) returned err: %v", target, err)
	}
	return cc, rb.Close
}
// TestResolverResourceName builds an xDS resolver and verifies that the
// resource name specified in the discovery request matches expectations.
func (s) TestResolverResourceName(t *testing.T) {
@@ -404,27 +425,14 @@ func (s) TestResolverResourceName(t *testing.T) {
tt.extraAuthority: mgmtServer.Address,
}
}
bootstrapContents, err := xdsbootstrap.Contents(opts)
cleanup, err := xdsbootstrap.CreateFile(opts)
if err != nil {
t.Fatal(err)
}
defer cleanup()
// Build an xDS resolver that uses the above bootstrap configuration
// and pass it to grpc.Dial(). Creating the xDS resolver should
// result in creation of the xDS client.
newResolver := internal.NewXDSResolverWithConfigForTesting
if newResolver == nil {
t.Fatal("internal.NewXDSResolverWithConfigForTesting is nil")
}
resolver, err := newResolver.(func([]byte) (resolver.Builder, error))(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
}
cc, err := grpc.Dial(tt.dialTarget, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
_, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL(tt.dialTarget)})
defer rClose()
// Verify the resource name in the discovery request being sent out.
select {
@@ -632,123 +640,265 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
}
}
// TestXDSResolverRequestHash tests a case where a resolver receives a RouteConfig update
// TestResolverRequestHash tests a case where a resolver receives a RouteConfig update
// with a HashPolicy specifying to generate a hash. The configSelector generated should
// successfully generate a Hash.
func (s) TestXDSResolverRequestHash(t *testing.T) {
func (s) TestResolverRequestHash(t *testing.T) {
oldRH := envconfig.XDSRingHash
envconfig.XDSRingHash = true
defer func() { envconfig.XDSRingHash = oldRH }()
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer xdsR.Close()
defer cancel()
mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatal(err)
}
defer mgmtServer.Stop()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
// Invoke watchAPI callback with a good service update (with hash policies
// specified) and wait for UpdateState method to be called on ClientConn.
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{
Prefix: newStringP(""),
WeightedClusters: map[string]xdsresource.WeightedCluster{
"cluster_1": {Weight: 75},
"cluster_2": {Weight: 25},
},
HashPolicies: []*xdsresource.HashPolicy{{
HashPolicyType: xdsresource.HashPolicyTypeHeader,
HeaderName: ":path",
// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
Version: xdsbootstrap.TransportV3,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()
const serviceName = "my-service-client-side-xds"
tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
defer rClose()
ldsName := serviceName
rdsName := "route-" + serviceName
// Configure the management server with a good listener resource and a
// route configuration resource that specifies a hash policy.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{{
Name: rdsName,
VirtualHosts: []*v3routepb.VirtualHost{{
Domains: []string{ldsName},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
{
Name: "test-cluster-1",
Weight: &wrapperspb.UInt32Value{Value: 100},
},
},
}},
HashPolicy: []*v3routepb.RouteAction_HashPolicy{{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
Header: &v3routepb.RouteAction_HashPolicy_Header{
HeaderName: ":path",
},
},
Terminal: true,
}},
}},
}},
},
},
}, nil)
ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}},
}},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Read the update pushed by the resolver to the ClientConn.
val, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState := val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
rState := gotState.(resolver.State)
cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Error("received nil config selector")
t.Fatal("Received nil config selector in update from resolver")
}
// Selecting a config when there was a hash policy specified in the route
// that will be selected should put a request hash in the config's context.
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: metadata.NewOutgoingContext(context.Background(), metadata.Pairs(":path", "/products"))})
res, err := cs.SelectConfig(iresolver.RPCInfo{
Context: metadata.NewOutgoingContext(ctx, metadata.Pairs(":path", "/products")),
Method: "/service/method",
})
if err != nil {
t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
t.Fatalf("cs.SelectConfig(): %v", err)
}
requestHashGot := ringhash.GetRequestHashForTesting(res.Context)
requestHashWant := xxhash.Sum64String("/products")
if requestHashGot != requestHashWant {
t.Fatalf("requestHashGot = %v, requestHashWant = %v", requestHashGot, requestHashWant)
gotHash := ringhash.GetRequestHashForTesting(res.Context)
wantHash := xxhash.Sum64String("/products")
if gotHash != wantHash {
t.Fatalf("Got request hash: %v, want: %v", gotHash, wantHash)
}
}
// TestXDSResolverRemovedWithRPCs tests the case where a config selector sends
// an empty update to the resolver after the resource is removed.
func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) {
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer cancel()
defer xdsR.Close()
// TestResolverRemovedWithRPCs tests the case where resources are removed from
// the management server, causing it to send an empty update to the xDS client,
// which returns a resource-not-found error to the xDS resolver. The test
// verifies that an ongoing RPC is handled properly when this happens.
func (s) TestResolverRemovedWithRPCs(t *testing.T) {
mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatal(err)
}
defer mgmtServer.Stop()
// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
Version: xdsbootstrap.TransportV3,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()
const serviceName = "my-service-client-side-xds"
tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
defer rClose()
ldsName := serviceName
rdsName := "route-" + serviceName
// Configure the management server with a good listener and route
// configuration resource.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{{
Name: rdsName,
VirtualHosts: []*v3routepb.VirtualHost{{
Domains: []string{ldsName},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
{
Name: "test-cluster-1",
Weight: &wrapperspb.UInt32Value{Value: 100},
},
},
}},
}},
}},
}},
}},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{"test-cluster-1": {Weight: 1}}}},
},
},
}, nil)
gotState, err := tcc.stateCh.Receive(ctx)
// Read the update pushed by the resolver to the ClientConn.
val, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState := gotState.(resolver.State)
rState := val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(`
{
"loadBalancingConfig": [
{
"xds_cluster_manager_experimental": {
"children": {
"cluster:test-cluster-1": {
"childPolicy": [
{
"cds_experimental": {
"cluster": "test-cluster-1"
}
}
]
}
}
}
}
]
}`)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("Received unexpected service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
}
// "Make an RPC" by invoking the config selector.
cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatalf("received nil config selector")
t.Fatal("Received nil config selector in update from resolver")
}
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err != nil {
t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
t.Fatalf("cs.SelectConfig(): %v", err)
}
// Delete the resource
suErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource removed error")
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{}, suErr)
// Delete the resources on the management server. This should result in a
// resource-not-found error from the xDS client.
if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil {
t.Fatal(err)
}
if _, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
// The RPC started earlier is still in progress. So, the xDS resolver will
// not produce an empty service config at this point. Instead it will retain
// the cluster to which the RPC is ongoing in the service config, but will
// return an erroring config selector which will fail new RPCs.
val, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState = val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("Received unexpected service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
}
cs = iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatal("Received nil config selector in update from resolver")
}
_, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err == nil || status.Code(err) != codes.Unavailable {
t.Fatalf("cs.SelectConfig() returned: %v, want: %v", err, codes.Unavailable)
}
// "Finish the RPC"; this could cause a panic if the resolver doesn't
// handle it correctly.
res.OnCommitted()
// Now that the RPC is committed, the xDS resolver is expected to send an
// update with an empty service config.
val, err = tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState = val.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(`{}`)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("Received unexpected service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
}
}
// TestXDSResolverRemovedResource tests for proper behavior after a resource is
@@ -858,60 +1008,101 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {
}
}
func (s) TestXDSResolverWRR(t *testing.T) {
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer xdsR.Close()
defer cancel()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
// TestResolverWRR tests the case where the route configuration returned by the
// management server contains a set of weighted clusters. The test performs a
// bunch of RPCs using the cluster specifier returned by the resolver, and
// verifies the cluster distribution.
func (s) TestResolverWRR(t *testing.T) {
defer func(oldNewWRR func() wrr.WRR) { newWRR = oldNewWRR }(newWRR)
newWRR = testutils.NewTestWRR
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{
"A": {Weight: 5},
"B": {Weight: 10},
}}},
},
},
}, nil)
mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
if err != nil {
t.Fatal(err)
}
defer mgmtServer.Stop()
// Create a bootstrap configuration specifying the above management server.
nodeID := uuid.New().String()
cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
NodeID: nodeID,
ServerURI: mgmtServer.Address,
Version: xdsbootstrap.TransportV3,
})
if err != nil {
t.Fatal(err)
}
defer cleanup()
const serviceName = "my-service-client-side-xds"
tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)})
defer rClose()
ldsName := serviceName
rdsName := "route-" + serviceName
// Configure the management server with a good listener resource and a
// route configuration resource.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(ldsName, rdsName)},
Routes: []*v3routepb.RouteConfiguration{{
Name: rdsName,
VirtualHosts: []*v3routepb.VirtualHost{{
Domains: []string{ldsName},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
{
Name: "A",
Weight: &wrapperspb.UInt32Value{Value: 75},
},
{
Name: "B",
Weight: &wrapperspb.UInt32Value{Value: 25},
},
},
}},
}},
}},
}},
}},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Read the update pushed by the resolver to the ClientConn.
gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
t.Fatalf("Timeout waiting for an update from the resolver: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err)
}
cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatal("received nil config selector")
t.Fatal("Received nil config selector in update from resolver")
}
// Make RPCs and verify WRR behavior in the cluster specifier.
picks := map[string]int{}
for i := 0; i < 30; i++ {
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
for i := 0; i < 100; i++ {
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err != nil {
t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
t.Fatalf("cs.SelectConfig(): %v", err)
}
picks[clustermanager.GetPickedClusterForTesting(res.Context)]++
res.OnCommitted()
}
want := map[string]int{"cluster:A": 10, "cluster:B": 20}
if !reflect.DeepEqual(picks, want) {
t.Errorf("picked clusters = %v; want %v", picks, want)
want := map[string]int{"cluster:A": 75, "cluster:B": 25}
if !cmp.Equal(picks, want) {
t.Errorf("Picked clusters: %v; want: %v", picks, want)
}
}