diff --git a/resolver/resolver.go b/resolver/resolver.go index 11384e228..c9411907d 100644 --- a/resolver/resolver.go +++ b/resolver/resolver.go @@ -286,6 +286,11 @@ func (t Target) Endpoint() string { return strings.TrimPrefix(endpoint, "/") } +// String returns a string representation of Target. +func (t Target) String() string { + return t.URL.String() +} + // Builder creates a resolver that will be used to watch name resolution updates. type Builder interface { // Build creates a new resolver for the given target. diff --git a/xds/internal/resolver/cluster_specifier_plugin_test.go b/xds/internal/resolver/cluster_specifier_plugin_test.go index d6186e9c1..eb9f90350 100644 --- a/xds/internal/resolver/cluster_specifier_plugin_test.go +++ b/xds/internal/resolver/cluster_specifier_plugin_test.go @@ -16,21 +16,22 @@ * */ -package resolver +package resolver_test import ( "context" "encoding/json" "fmt" "testing" + "time" "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" "github.com/google/go-cmp/cmp" "github.com/google/uuid" "google.golang.org/grpc/balancer" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/envconfig" + "google.golang.org/grpc/internal/grpctest" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/testutils" xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap" @@ -39,6 +40,8 @@ import ( "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/clustermanager" "google.golang.org/grpc/xds/internal/clusterspecifier" + xdsresolver "google.golang.org/grpc/xds/internal/resolver" + protov2 "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -46,6 +49,80 @@ import ( v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" ) +const ( + defaultTestTimeout = 10 * time.Second + defaultTestShortTimeout = 100 * time.Microsecond +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +// verifyUpdateFromResolver waits for the resolver to push an update to the fake +// resolver.ClientConn and verifies that update matches the provided service +// config. +// +// Tests that want to skip verifying the contents of the service config can pass +// an empty string. +// +// Returns the config selector from the state update pushed by the resolver. +// Tests that don't need the config selector can ignore the return value. +func verifyUpdateFromResolver(ctx context.Context, t *testing.T, stateCh chan resolver.State, wantSC string) iresolver.ConfigSelector { + t.Helper() + + var state resolver.State + select { + case <-ctx.Done(): + t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err()) + case state = <-stateCh: + if err := state.ServiceConfig.Err; err != nil { + t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err) + } + if wantSC == "" { + break + } + wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantSC) + if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) { + t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) + } + } + cs := iresolver.GetConfigSelector(state) + if cs == nil { + t.Fatal("Received nil config selector in update from resolver") + } + return cs +} + +// buildResolverForTarget builds an xDS resolver for the given target. It +// returns the following: +// - a channel to read updates from the resolver +// - the newly created xDS resolver +func buildResolverForTarget(t *testing.T, target resolver.Target) (chan resolver.State, resolver.Resolver) { + t.Helper() + + builder := resolver.Get(xdsresolver.Scheme) + if builder == nil { + t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme) + } + + stateCh := make(chan resolver.State, 1) + updateStateF := func(s resolver.State) error { + stateCh <- s + return nil + } + tcc := &testutils.ResolverClientConn{Logger: t, UpdateStateF: updateStateF} + r, err := builder.Build(target, tcc, resolver.BuildOptions{}) + if err != nil { + t.Fatalf("Failed to build xDS resolver for target %q: %v", target, err) + } + t.Cleanup(r.Close) + return stateCh, r +} + func init() { balancer.Register(cspBalancerBuilder{}) clusterspecifier.Register(testClusterSpecifierPlugin{}) @@ -101,7 +178,7 @@ func (testClusterSpecifierPlugin) ParseClusterSpecifierConfig(cfg proto.Message) return nil, fmt.Errorf("testClusterSpecifierPlugin: error parsing config %v: got type %T, want *anypb.Any", cfg, cfg) } lbCfg := new(wrapperspb.StringValue) - if err := ptypes.UnmarshalAny(anyp, lbCfg); err != nil { + if err := anypb.UnmarshalTo(anyp, lbCfg, protov2.UnmarshalOptions{}); err != nil { return nil, fmt.Errorf("testClusterSpecifierPlugin: error parsing config %v: %v", cfg, err) } return []map[string]any{{"csp_experimental": cspBalancerConfig{ArbitraryField: lbCfg.GetValue()}}}, nil @@ -163,46 +240,30 @@ func (s) TestResolverClusterSpecifierPlugin(t *testing.T) { t.Fatal(err) } - tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) - defer rClose() + stateCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) // Wait for an update from the resolver, and verify the service config. - val, err := tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState := val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` -{ - "loadBalancingConfig": [ - { - "xds_cluster_manager_experimental": { - "children": { - "cluster_specifier_plugin:cspA": { - "childPolicy": [ - { - "csp_experimental": { - "arbitrary_field": "anything" - } - } - ] - } - } - } - } - ] -}`) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) - } - - cs := iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("Received nil config selector in update from resolver") - } + wantSC := ` + { + "loadBalancingConfig": [ + { + "xds_cluster_manager_experimental": { + "children": { + "cluster_specifier_plugin:cspA": { + "childPolicy": [ + { + "csp_experimental": { + "arbitrary_field": "anything" + } + } + ] + } + } + } + } + ] + }` + cs := verifyUpdateFromResolver(ctx, t, stateCh, wantSC) res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) @@ -232,37 +293,27 @@ func (s) TestResolverClusterSpecifierPlugin(t *testing.T) { } // Wait for an update from the resolver, and verify the service config. - val, err = tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState = val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` -{ - "loadBalancingConfig": [ - { - "xds_cluster_manager_experimental": { - "children": { - "cluster_specifier_plugin:cspA": { - "childPolicy": [ - { - "csp_experimental": { - "arbitrary_field": "changed" - } - } - ] - } - } - } - } - ] -}`) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) - } + wantSC = ` + { + "loadBalancingConfig": [ + { + "xds_cluster_manager_experimental": { + "children": { + "cluster_specifier_plugin:cspA": { + "childPolicy": [ + { + "csp_experimental": { + "arbitrary_field": "changed" + } + } + ] + } + } + } + } + ] + }` + verifyUpdateFromResolver(ctx, t, stateCh, wantSC) } // TestXDSResolverDelayedOnCommittedCSP tests that cluster specifier plugins and @@ -316,46 +367,31 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) { t.Fatal(err) } - tcc, rClose := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) - defer rClose() + stateCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + serviceName)}) // Wait for an update from the resolver, and verify the service config. - val, err := tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState := val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` -{ - "loadBalancingConfig": [ - { - "xds_cluster_manager_experimental": { - "children": { - "cluster_specifier_plugin:cspA": { - "childPolicy": [ - { - "csp_experimental": { - "arbitrary_field": "anythingA" - } - } - ] - } - } - } - } - ] -}`) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) - } + wantSC := ` + { + "loadBalancingConfig": [ + { + "xds_cluster_manager_experimental": { + "children": { + "cluster_specifier_plugin:cspA": { + "childPolicy": [ + { + "csp_experimental": { + "arbitrary_field": "anythingA" + } + } + ] + } + } + } + } + ] + }` + cs := verifyUpdateFromResolver(ctx, t, stateCh, wantSC) - cs := iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("Received nil config selector in update from resolver") - } resOld, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) @@ -388,52 +424,38 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) { } // Wait for an update from the resolver, and verify the service config. - val, err = tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState = val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` -{ - "loadBalancingConfig": [ - { - "xds_cluster_manager_experimental": { - "children": { - "cluster_specifier_plugin:cspA": { - "childPolicy": [ - { - "csp_experimental": { - "arbitrary_field": "anythingA" - } - } - ] - }, - "cluster_specifier_plugin:cspB": { - "childPolicy": [ - { - "csp_experimental": { - "arbitrary_field": "anythingB" - } - } - ] - } - } - } - } - ] -}`) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) - } + wantSC = ` + { + "loadBalancingConfig": [ + { + "xds_cluster_manager_experimental": { + "children": { + "cluster_specifier_plugin:cspA": { + "childPolicy": [ + { + "csp_experimental": { + "arbitrary_field": "anythingA" + } + } + ] + }, + "cluster_specifier_plugin:cspB": { + "childPolicy": [ + { + "csp_experimental": { + "arbitrary_field": "anythingB" + } + } + ] + } + } + } + } + ] + }` + cs = verifyUpdateFromResolver(ctx, t, stateCh, wantSC) // Perform an RPC and ensure that it is routed to the new cluster. - cs = iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("Received nil config selector in update from resolver") - } resNew, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) if err != nil { t.Fatalf("cs.SelectConfig(): %v", err) @@ -449,35 +471,25 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) { // cspA. resOld.OnCommitted() - val, err = tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Timeout waiting for an update from the resolver: %v", err) - } - rState = val.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("Received error in service config: %v", rState.ServiceConfig.Err) - } - wantSCParsed = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(` -{ - "loadBalancingConfig": [ - { - "xds_cluster_manager_experimental": { - "children": { - "cluster_specifier_plugin:cspB": { - "childPolicy": [ - { - "csp_experimental": { - "arbitrary_field": "anythingB" - } - } - ] - } - } - } - } - ] -}`) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, rState.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config)) - } + wantSC = ` + { + "loadBalancingConfig": [ + { + "xds_cluster_manager_experimental": { + "children": { + "cluster_specifier_plugin:cspB": { + "childPolicy": [ + { + "csp_experimental": { + "arbitrary_field": "anythingB" + } + } + ] + } + } + } + } + ] + }` + verifyUpdateFromResolver(ctx, t, stateCh, wantSC) } diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index 09b335630..6e75887ec 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -37,7 +37,11 @@ import ( "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) -const xdsScheme = "xds" +// Scheme is the xDS resolver's scheme. +// +// TODO(easwars): Rename this package as xdsresolver so that this is accessed as +// xdsresolver.Scheme +const Scheme = "xds" // newBuilderForTesting creates a new xds resolver builder using a specific xds // bootstrap config, so tests can use multiple xds clients in different @@ -152,7 +156,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon // Name helps implement the resolver.Builder interface. func (*xdsResolverBuilder) Scheme() string { - return xdsScheme + return Scheme } // suWithError wraps the ServiceUpdate and error received through a watch API diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 621e9f540..9ed77f815 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -98,8 +98,8 @@ func Test(t *testing.T) { } func (s) TestRegister(t *testing.T) { - if resolver.Get(xdsScheme) == nil { - t.Errorf("scheme %v is not registered", xdsScheme) + if resolver.Get(Scheme) == nil { + t.Errorf("scheme %v is not registered", Scheme) } } @@ -151,9 +151,9 @@ func (s) TestResolverBuilder_ClientCreationFails(t *testing.T) { }() // Build an xDS resolver and expect it to fail. - builder := resolver.Get(xdsScheme) + builder := resolver.Get(Scheme) if builder == nil { - t.Fatalf("resolver.Get(%v) returned nil", xdsScheme) + t.Fatalf("resolver.Get(%v) returned nil", Scheme) } if _, err := builder.Build(target, newTestClientConn(), resolver.BuildOptions{}); err == nil { t.Fatalf("builder.Build(%v) succeeded when expected to fail", target) @@ -234,9 +234,9 @@ func (s) TestResolverBuilder_DifferentBootstrapConfigs(t *testing.T) { newXDSClient = origNewClient }() - builder := resolver.Get(xdsScheme) + builder := resolver.Get(Scheme) if builder == nil { - t.Fatalf("resolver.Get(%v) returned nil", xdsScheme) + t.Fatalf("resolver.Get(%v) returned nil", Scheme) } r, err := builder.Build(test.target, newTestClientConn(), test.buildOpts) @@ -282,9 +282,9 @@ func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *fakeclient.Client, } newXDSClient = oldClientMaker } - builder := resolver.Get(xdsScheme) + builder := resolver.Get(Scheme) if builder == nil { - t.Fatalf("resolver.Get(%v) returned nil", xdsScheme) + t.Fatalf("resolver.Get(%v) returned nil", Scheme) } tcc := newTestClientConn() @@ -332,9 +332,9 @@ func waitForWatchRouteConfig(ctx context.Context, t *testing.T, xdsC *fakeclient // returns a testClientConn which allows inspection of resolver updates, and a // function to close the resolver once the test is complete. func buildResolverForTarget(t *testing.T, target resolver.Target) (*testClientConn, func()) { - builder := resolver.Get(xdsScheme) + builder := resolver.Get(Scheme) if builder == nil { - t.Fatalf("resolver.Get(%v) returned nil", xdsScheme) + t.Fatalf("resolver.Get(%v) returned nil", Scheme) } tcc := newTestClientConn()