diff --git a/internal/internal.go b/internal/internal.go index 6d355b0b0..0f4512248 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -63,6 +63,46 @@ var ( // xDS-enabled server invokes this method on a grpc.Server when a particular // listener moves to "not-serving" mode. DrainServerTransports interface{} // func(*grpc.Server, string) + + // NewXDSResolverWithConfigForTesting creates a new xds resolver builder using + // the provided xds bootstrap config instead of the global configuration from + // the supported environment variables. The resolver.Builder is meant to be + // used in conjunction with the grpc.WithResolvers DialOption. + // + // Testing Only + // + // This function should ONLY be used for testing and may not work with some + // other features, including the CSDS service. + NewXDSResolverWithConfigForTesting interface{} // func([]byte) (resolver.Builder, error) + + // RegisterRLSClusterSpecifierPluginForTesting registers the RLS Cluster + // Specifier Plugin for testing purposes, regardless of the XDSRLS environment + // variable. + // + // TODO: Remove this function once the RLS env var is removed. + RegisterRLSClusterSpecifierPluginForTesting func() + + // UnregisterRLSClusterSpecifierPluginForTesting unregisters the RLS Cluster + // Specifier Plugin for testing purposes. This is needed because there is no way + // to unregister the RLS Cluster Specifier Plugin after registering it solely + // for testing purposes using RegisterRLSClusterSpecifierPluginForTesting(). + // + // TODO: Remove this function once the RLS env var is removed. + UnregisterRLSClusterSpecifierPluginForTesting func() + + // RegisterRBACHTTPFilterForTesting registers the RBAC HTTP Filter for testing + // purposes, regardless of the RBAC environment variable. + // + // TODO: Remove this function once the RBAC env var is removed. + RegisterRBACHTTPFilterForTesting func() + + // UnregisterRBACHTTPFilterForTesting unregisters the RBAC HTTP Filter for + // testing purposes. This is needed because there is no way to unregister the + // HTTP Filter after registering it solely for testing purposes using + // RegisterRBACHTTPFilterForTesting(). + // + // TODO: Remove this function once the RBAC env var is removed. + UnregisterRBACHTTPFilterForTesting func() ) // HealthChecker defines the signature of the client-side LB channel health checking function. diff --git a/xds/internal/testutils/e2e/bootstrap.go b/internal/testutils/xds/e2e/bootstrap.go similarity index 100% rename from xds/internal/testutils/e2e/bootstrap.go rename to internal/testutils/xds/e2e/bootstrap.go diff --git a/xds/internal/testutils/e2e/clientresources.go b/internal/testutils/xds/e2e/clientresources.go similarity index 100% rename from xds/internal/testutils/e2e/clientresources.go rename to internal/testutils/xds/e2e/clientresources.go diff --git a/xds/internal/testutils/e2e/server.go b/internal/testutils/xds/e2e/server.go similarity index 100% rename from xds/internal/testutils/e2e/server.go rename to internal/testutils/xds/e2e/server.go index e47dcc521..e611c56c6 100644 --- a/xds/internal/testutils/e2e/server.go +++ b/internal/testutils/xds/e2e/server.go @@ -26,18 +26,18 @@ import ( "reflect" "strconv" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + "google.golang.org/grpc" + "google.golang.org/grpc/grpclog" + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" v3discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - "github.com/envoyproxy/go-control-plane/pkg/cache/types" v3cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3" v3resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3" v3server "github.com/envoyproxy/go-control-plane/pkg/server/v3" - - "google.golang.org/grpc" - "google.golang.org/grpc/grpclog" ) var logger = grpclog.Component("xds-e2e") diff --git a/internal/testutils/xds/e2e/setup_certs.go b/internal/testutils/xds/e2e/setup_certs.go new file mode 100644 index 000000000..62ea51d04 --- /dev/null +++ b/internal/testutils/xds/e2e/setup_certs.go @@ -0,0 +1,98 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package e2e + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "os" + "path" + "testing" + + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/testdata" +) + +const ( + // Names of files inside tempdir, for certprovider plugin to watch. + certFile = "cert.pem" + keyFile = "key.pem" + rootFile = "ca.pem" +) + +func createTmpFile(src, dst string) error { + data, err := ioutil.ReadFile(src) + if err != nil { + return fmt.Errorf("ioutil.ReadFile(%q) failed: %v", src, err) + } + if err := ioutil.WriteFile(dst, data, os.ModePerm); err != nil { + return fmt.Errorf("ioutil.WriteFile(%q) failed: %v", dst, err) + } + return nil +} + +// createTempDirWithFiles creates a temporary directory under the system default +// tempDir with the given dirSuffix. It also reads from certSrc, keySrc and +// rootSrc files are creates appropriate files under the newly create tempDir. +// Returns the name of the created tempDir. +func createTmpDirWithFiles(dirSuffix, certSrc, keySrc, rootSrc string) (string, error) { + // Create a temp directory. Passing an empty string for the first argument + // uses the system temp directory. + dir, err := ioutil.TempDir("", dirSuffix) + if err != nil { + return "", fmt.Errorf("ioutil.TempDir() failed: %v", err) + } + + if err := createTmpFile(testdata.Path(certSrc), path.Join(dir, certFile)); err != nil { + return "", err + } + if err := createTmpFile(testdata.Path(keySrc), path.Join(dir, keyFile)); err != nil { + return "", err + } + if err := createTmpFile(testdata.Path(rootSrc), path.Join(dir, rootFile)); err != nil { + return "", err + } + return dir, nil +} + +// CreateClientTLSCredentials creates client-side TLS transport credentials +// using certificate and key files from testdata/x509 directory. +func CreateClientTLSCredentials(t *testing.T) credentials.TransportCredentials { + t.Helper() + + cert, err := tls.LoadX509KeyPair(testdata.Path("x509/client1_cert.pem"), testdata.Path("x509/client1_key.pem")) + if err != nil { + t.Fatalf("tls.LoadX509KeyPair(x509/client1_cert.pem, x509/client1_key.pem) failed: %v", err) + } + b, err := ioutil.ReadFile(testdata.Path("x509/server_ca_cert.pem")) + if err != nil { + t.Fatalf("ioutil.ReadFile(x509/server_ca_cert.pem) failed: %v", err) + } + roots := x509.NewCertPool() + if !roots.AppendCertsFromPEM(b) { + t.Fatal("failed to append certificates") + } + return credentials.NewTLS(&tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: roots, + ServerName: "x.test.example.com", + }) +} diff --git a/internal/testutils/xds/e2e/setup_management_server.go b/internal/testutils/xds/e2e/setup_management_server.go new file mode 100644 index 000000000..ca45363d6 --- /dev/null +++ b/internal/testutils/xds/e2e/setup_management_server.go @@ -0,0 +1,100 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package e2e + +import ( + "encoding/json" + "path" + "testing" + + "github.com/google/uuid" + "google.golang.org/grpc/internal" + xdsinternal "google.golang.org/grpc/internal/xds" + "google.golang.org/grpc/resolver" +) + +// SetupManagementServer performs the following: +// - spin up an xDS management server on a local port +// - set up certificates for consumption by the file_watcher plugin +// - creates a bootstrap file in a temporary location +// - creates an xDS resolver using the above bootstrap contents +// +// Returns the following: +// - management server +// - nodeID to be used by the client when connecting to the management server +// - bootstrap contents to be used by the client +// - xDS resolver builder to be used by the client +// - a cleanup function to be invoked at the end of the test +func SetupManagementServer(t *testing.T) (*ManagementServer, string, []byte, resolver.Builder, func()) { + t.Helper() + + // Spin up an xDS management server on a local port. + server, err := StartManagementServer() + if err != nil { + t.Fatalf("Failed to spin up the xDS management server: %v", err) + } + defer func() { + if err != nil { + server.Stop() + } + }() + + // Create a directory to hold certs and key files used on the server side. + serverDir, err := createTmpDirWithFiles("testServerSideXDS*", "x509/server1_cert.pem", "x509/server1_key.pem", "x509/client_ca_cert.pem") + if err != nil { + server.Stop() + t.Fatal(err) + } + + // Create a directory to hold certs and key files used on the client side. + clientDir, err := createTmpDirWithFiles("testClientSideXDS*", "x509/client1_cert.pem", "x509/client1_key.pem", "x509/server_ca_cert.pem") + if err != nil { + server.Stop() + t.Fatal(err) + } + + // Create certificate providers section of the bootstrap config with entries + // for both the client and server sides. + cpc := map[string]json.RawMessage{ + ServerSideCertProviderInstance: DefaultFileWatcherConfig(path.Join(serverDir, certFile), path.Join(serverDir, keyFile), path.Join(serverDir, rootFile)), + ClientSideCertProviderInstance: DefaultFileWatcherConfig(path.Join(clientDir, certFile), path.Join(clientDir, keyFile), path.Join(clientDir, rootFile)), + } + + // Create a bootstrap file in a temporary directory. + nodeID := uuid.New().String() + bootstrapContents, err := xdsinternal.BootstrapContents(xdsinternal.BootstrapOptions{ + Version: xdsinternal.TransportV3, + NodeID: nodeID, + ServerURI: server.Address, + CertificateProviders: cpc, + ServerListenerResourceNameTemplate: ServerListenerResourceNameTemplate, + }) + if err != nil { + server.Stop() + t.Fatalf("Failed to create bootstrap file: %v", err) + } + resolverBuilder := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error)) + resolver, err := resolverBuilder(bootstrapContents) + if err != nil { + server.Stop() + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + + return server, nodeID, bootstrapContents, resolver, func() { server.Stop() } +} diff --git a/xds/internal/test/xds_client_affinity_test.go b/test/xds/xds_client_affinity_test.go similarity index 92% rename from xds/internal/test/xds_client_affinity_test.go rename to test/xds/xds_client_affinity_test.go index 55d984592..91ca69707 100644 --- a/xds/internal/test/xds_client_affinity_test.go +++ b/test/xds/xds_client_affinity_test.go @@ -1,6 +1,3 @@ -//go:build !386 -// +build !386 - /* * * Copyright 2021 gRPC authors. @@ -19,7 +16,7 @@ * */ -package xds_test +package xds import ( "context" @@ -29,16 +26,15 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/envconfig" - "google.golang.org/grpc/xds/internal/testutils/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + testgrpc "google.golang.org/grpc/test/grpc_testing" testpb "google.golang.org/grpc/test/grpc_testing" ) -const hashHeaderName = "session_id" - // hashRouteConfig returns a RouteConfig resource with hash policy set to // header "session_id". func hashRouteConfig(routeName, ldsTarget, clusterName string) *v3routepb.RouteConfiguration { @@ -53,7 +49,7 @@ func hashRouteConfig(routeName, ldsTarget, clusterName string) *v3routepb.RouteC HashPolicy: []*v3routepb.RouteAction_HashPolicy{{ PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ Header: &v3routepb.RouteAction_HashPolicy_Header{ - HeaderName: hashHeaderName, + HeaderName: "session_id", }, }, Terminal: true, @@ -92,10 +88,10 @@ func (s) TestClientSideAffinitySanityCheck(t *testing.T) { return func() { envconfig.XDSRingHash = old } }()() - managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t) + managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t) defer cleanup1() - port, cleanup2 := clientSetup(t, &testService{}) + port, cleanup2 := startTestService(t, nil) defer cleanup2() const serviceName = "my-service-client-side-xds" @@ -130,7 +126,7 @@ func (s) TestClientSideAffinitySanityCheck(t *testing.T) { } defer cc.Close() - client := testpb.NewTestServiceClient(cc) + client := testgrpc.NewTestServiceClient(cc) if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { t.Fatalf("rpc EmptyCall() failed: %v", err) } diff --git a/xds/internal/test/xds_client_federation_test.go b/test/xds/xds_client_federation_test.go similarity index 87% rename from xds/internal/test/xds_client_federation_test.go rename to test/xds/xds_client_federation_test.go index 09db314b7..595e2272f 100644 --- a/xds/internal/test/xds_client_federation_test.go +++ b/test/xds/xds_client_federation_test.go @@ -1,6 +1,3 @@ -//go:build !386 -// +build !386 - /* * * Copyright 2021 gRPC authors. @@ -18,7 +15,7 @@ * limitations under the License. */ -package xds_test +package xds import ( "context" @@ -32,13 +29,13 @@ import ( "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/envconfig" + "google.golang.org/grpc/internal/testutils/xds/e2e" xdsinternal "google.golang.org/grpc/internal/xds" + "google.golang.org/grpc/resolver" + testgrpc "google.golang.org/grpc/test/grpc_testing" testpb "google.golang.org/grpc/test/grpc_testing" - "google.golang.org/grpc/xds" - "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/e2e" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) // TestClientSideFederation tests that federation is supported. @@ -83,22 +80,23 @@ func (s) TestClientSideFederation(t *testing.T) { t.Fatalf("Failed to create bootstrap file: %v", err) } - resolver, err := xds.NewXDSResolverWithConfigForTesting(bootstrapContents) + resolverBuilder := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error)) + resolver, err := resolverBuilder(bootstrapContents) if err != nil { t.Fatalf("Failed to create xDS resolver for testing: %v", err) } - port, cleanup := clientSetup(t, &testService{}) + port, cleanup := startTestService(t, nil) defer cleanup() const serviceName = "my-service-client-side-xds" // LDS is old style name. ldsName := serviceName // RDS is new style, with the non default authority. - rdsName := testutils.BuildResourceName(xdsresource.RouteConfigResource, nonDefaultAuth, "route-"+serviceName, nil) + rdsName := fmt.Sprintf("xdstp://%s/envoy.config.route.v3.RouteConfiguration/%s", nonDefaultAuth, "route-"+serviceName) // CDS is old style name. cdsName := "cluster-" + serviceName // EDS is new style, with the non default authority. - edsName := testutils.BuildResourceName(xdsresource.EndpointsResource, nonDefaultAuth, "endpoints-"+serviceName, nil) + edsName := fmt.Sprintf("xdstp://%s/envoy.config.route.v3.ClusterLoadAssignment/%s", nonDefaultAuth, "endpoints-"+serviceName) // Split resources, put LDS/CDS in the default authority, and put RDS/EDS in // the other authority. @@ -135,7 +133,7 @@ func (s) TestClientSideFederation(t *testing.T) { } defer cc.Close() - client := testpb.NewTestServiceClient(cc) + client := testgrpc.NewTestServiceClient(cc) if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { t.Fatalf("rpc EmptyCall() failed: %v", err) } diff --git a/test/xds/xds_client_integration_test.go b/test/xds/xds_client_integration_test.go new file mode 100644 index 000000000..399a0042a --- /dev/null +++ b/test/xds/xds_client_integration_test.go @@ -0,0 +1,108 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds + +import ( + "context" + "fmt" + "net" + "strconv" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils/xds/e2e" + + testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +const ( + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen. +) + +// startTestService spins up a server exposing the TestService on a local port. +// +// Returns the following: +// - the port the server is listening on +// - cleanup function to be invoked by the tests when done +func startTestService(t *testing.T, server *stubserver.StubServer) (uint32, func()) { + if server == nil { + server = &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil }, + } + } + server.StartServer() + + _, p, err := net.SplitHostPort(server.Address) + if err != nil { + t.Fatalf("invalid serving address for stub server: %v", err) + } + port, err := strconv.ParseUint(p, 10, 32) + if err != nil { + t.Fatalf("invalid serving port for stub server: %v", err) + } + return uint32(port), server.Stop +} + +func (s) TestClientSideXDS(t *testing.T) { + managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t) + defer cleanup1() + + port, cleanup2 := startTestService(t, nil) + defer cleanup2() + + const serviceName = "my-service-client-side-xds" + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: "localhost", + Port: port, + SecLevel: e2e.SecurityLevelNone, + }) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } +} diff --git a/test/xds/xds_client_retry_test.go b/test/xds/xds_client_retry_test.go new file mode 100644 index 000000000..31968f885 --- /dev/null +++ b/test/xds/xds_client_retry_test.go @@ -0,0 +1,181 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds + +import ( + "context" + "fmt" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/wrapperspb" + + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +func (s) TestClientSideRetry(t *testing.T) { + ctr := 0 + errs := []codes.Code{codes.ResourceExhausted} + ss := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + defer func() { ctr++ }() + if ctr < len(errs) { + return nil, status.Errorf(errs[ctr], "this should be retried") + } + return &testpb.Empty{}, nil + }, + } + + managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t) + defer cleanup1() + + port, cleanup2 := startTestService(t, ss) + defer cleanup2() + + const serviceName = "my-service-client-side-xds" + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: "localhost", + Port: port, + SecLevel: e2e.SecurityLevelNone, + }) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + defer cancel() + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.ResourceExhausted { + t.Fatalf("rpc EmptyCall() = _, %v; want _, ResourceExhausted", err) + } + + testCases := []struct { + name string + vhPolicy *v3routepb.RetryPolicy + routePolicy *v3routepb.RetryPolicy + errs []codes.Code // the errors returned by the server for each RPC + tryAgainErr codes.Code // the error that would be returned if we are still using the old retry policies. + errWant codes.Code + }{{ + name: "virtualHost only, fail", + vhPolicy: &v3routepb.RetryPolicy{ + RetryOn: "resource-exhausted,unavailable", + NumRetries: &wrapperspb.UInt32Value{Value: 1}, + }, + errs: []codes.Code{codes.ResourceExhausted, codes.Unavailable}, + routePolicy: nil, + tryAgainErr: codes.ResourceExhausted, + errWant: codes.Unavailable, + }, { + name: "virtualHost only", + vhPolicy: &v3routepb.RetryPolicy{ + RetryOn: "resource-exhausted, unavailable", + NumRetries: &wrapperspb.UInt32Value{Value: 2}, + }, + errs: []codes.Code{codes.ResourceExhausted, codes.Unavailable}, + routePolicy: nil, + tryAgainErr: codes.Unavailable, + errWant: codes.OK, + }, { + name: "virtualHost+route, fail", + vhPolicy: &v3routepb.RetryPolicy{ + RetryOn: "resource-exhausted,unavailable", + NumRetries: &wrapperspb.UInt32Value{Value: 2}, + }, + routePolicy: &v3routepb.RetryPolicy{ + RetryOn: "resource-exhausted", + NumRetries: &wrapperspb.UInt32Value{Value: 2}, + }, + errs: []codes.Code{codes.ResourceExhausted, codes.Unavailable}, + tryAgainErr: codes.OK, + errWant: codes.Unavailable, + }, { + name: "virtualHost+route", + vhPolicy: &v3routepb.RetryPolicy{ + RetryOn: "resource-exhausted", + NumRetries: &wrapperspb.UInt32Value{Value: 2}, + }, + routePolicy: &v3routepb.RetryPolicy{ + RetryOn: "unavailable", + NumRetries: &wrapperspb.UInt32Value{Value: 2}, + }, + errs: []codes.Code{codes.Unavailable}, + tryAgainErr: codes.Unavailable, + errWant: codes.OK, + }, { + name: "virtualHost+route, not enough attempts", + vhPolicy: &v3routepb.RetryPolicy{ + RetryOn: "unavailable", + NumRetries: &wrapperspb.UInt32Value{Value: 2}, + }, + routePolicy: &v3routepb.RetryPolicy{ + RetryOn: "unavailable", + NumRetries: &wrapperspb.UInt32Value{Value: 1}, + }, + errs: []codes.Code{codes.Unavailable, codes.Unavailable}, + tryAgainErr: codes.OK, + errWant: codes.Unavailable, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + errs = tc.errs + + // Confirm tryAgainErr is correct before updating resources. + ctr = 0 + _, err := client.EmptyCall(ctx, &testpb.Empty{}) + if code := status.Code(err); code != tc.tryAgainErr { + t.Fatalf("with old retry policy: EmptyCall() = _, %v; want _, %v", err, tc.tryAgainErr) + } + + resources.Routes[0].VirtualHosts[0].RetryPolicy = tc.vhPolicy + resources.Routes[0].VirtualHosts[0].Routes[0].GetRoute().RetryPolicy = tc.routePolicy + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + for { + ctr = 0 + _, err := client.EmptyCall(ctx, &testpb.Empty{}) + if code := status.Code(err); code == tc.tryAgainErr { + continue + } else if code != tc.errWant { + t.Fatalf("rpc EmptyCall() = _, %v; want _, %v", err, tc.errWant) + } + break + } + }) + } +} diff --git a/test/xds/xds_rls_clusterspecifier_plugin_test.go b/test/xds/xds_rls_clusterspecifier_plugin_test.go new file mode 100644 index 000000000..392894017 --- /dev/null +++ b/test/xds/xds_rls_clusterspecifier_plugin_test.go @@ -0,0 +1,170 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds + +import ( + "context" + "fmt" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/envconfig" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/rls" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/protobuf/types/known/durationpb" + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" + testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" + + _ "google.golang.org/grpc/balancer/rls" // Register the RLS Load Balancing policy. +) + +// defaultClientResourcesWithRLSCSP returns a set of resources (LDS, RDS, CDS, EDS) for a +// client to connect to a server with a RLS Load Balancer as a child of Cluster Manager. +func defaultClientResourcesWithRLSCSP(params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions { + routeConfigName := "route-" + params.DialTarget + clusterName := "cluster-" + params.DialTarget + endpointsName := "endpoints-" + params.DialTarget + return e2e.UpdateOptions{ + NodeID: params.NodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)}, + Routes: []*v3routepb.RouteConfiguration{defaultRouteConfigWithRLSCSP(routeConfigName, params.DialTarget, rlsProto)}, + Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, params.SecLevel)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, []uint32{params.Port})}, + } +} + +// defaultRouteConfigWithRLSCSP returns a basic xds RouteConfig resource with an +// RLS Cluster Specifier Plugin configured as the route. +func defaultRouteConfigWithRLSCSP(routeName, ldsTarget string, rlsProto *rlspb.RouteLookupConfig) *v3routepb.RouteConfiguration { + return &v3routepb.RouteConfiguration{ + Name: routeName, + ClusterSpecifierPlugins: []*v3routepb.ClusterSpecifierPlugin{ + { + Extension: &v3corepb.TypedExtensionConfig{ + Name: "rls-csp", + TypedConfig: testutils.MarshalAny(&rlspb.RouteLookupClusterSpecifier{ + RouteLookupConfig: rlsProto, + }), + }, + }, + }, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{ldsTarget}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_ClusterSpecifierPlugin{ClusterSpecifierPlugin: "rls-csp"}, + }}, + }}, + }}, + } +} + +// TestRLSinxDS tests an xDS configured system with an RLS Balancer present. +// +// This test sets up the RLS Balancer using the RLS Cluster Specifier Plugin, +// spins up a test service and has a fake RLS Server correctly respond with a +// target corresponding to this test service. This test asserts an RPC proceeds +// as normal with the RLS Balancer as part of system. +func (s) TestRLSinxDS(t *testing.T) { + oldRLS := envconfig.XDSRLS + envconfig.XDSRLS = true + internal.RegisterRLSClusterSpecifierPluginForTesting() + defer func() { + envconfig.XDSRLS = oldRLS + internal.UnregisterRLSClusterSpecifierPluginForTesting() + }() + + // Set up all components and configuration necessary - management server, + // xDS resolver, fake RLS Server, and xDS configuration which specifies an + // RLS Balancer that communicates to this set up fake RLS Server. + managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t) + defer cleanup1() + port, cleanup2 := startTestService(t, nil) + defer cleanup2() + + lis := testutils.NewListenerWrapper(t, nil) + rlsServer, rlsRequestCh := rls.SetupFakeRLSServer(t, lis) + rlsProto := &rlspb.RouteLookupConfig{ + GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{{Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}}}}, + LookupService: rlsServer.Address, + LookupServiceTimeout: durationpb.New(defaultTestTimeout), + CacheSizeBytes: 1024, + } + + const serviceName = "my-service-client-side-xds" + resources := defaultClientResourcesWithRLSCSP(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: "localhost", + Port: port, + SecLevel: e2e.SecurityLevelNone, + }, rlsProto) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Configure the fake RLS Server to set the RLS Balancers child CDS + // Cluster's name as the target for the RPC to use. + rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rls.RouteLookupResponse { + return &rls.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{"cluster-" + serviceName}}} + }) + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + // Successfully sending the RPC will require the RLS Load Balancer to + // communicate with the fake RLS Server for information about the target. + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + + // These RLS Verifications makes sure the RLS Load Balancer is actually part + // of the xDS Configured system that correctly sends out RPC. + + // Verify connection is established to RLS Server. + if _, err = lis.NewConnCh.Receive(ctx); err != nil { + t.Fatal("Timeout when waiting for RLS LB policy to create control channel") + } + + // Verify an rls request is sent out to fake RLS Server. + select { + case <-ctx.Done(): + t.Fatalf("Timeout when waiting for an RLS request to be sent out") + case <-rlsRequestCh: + } +} diff --git a/xds/internal/test/xds_security_config_nack_test.go b/test/xds/xds_security_config_nack_test.go similarity index 96% rename from xds/internal/test/xds_security_config_nack_test.go rename to test/xds/xds_security_config_nack_test.go index 7b8e36c3f..4fe469ed3 100644 --- a/xds/internal/test/xds_security_config_nack_test.go +++ b/test/xds/xds_security_config_nack_test.go @@ -1,6 +1,3 @@ -//go:build !386 -// +build !386 - /* * * Copyright 2021 gRPC authors. @@ -19,7 +16,7 @@ * */ -package xds_test +package xds import ( "context" @@ -30,10 +27,11 @@ import ( "google.golang.org/grpc/credentials/insecure" xdscreds "google.golang.org/grpc/credentials/xds" "google.golang.org/grpc/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/e2e" + "google.golang.org/grpc/internal/testutils/xds/e2e" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" + testgrpc "google.golang.org/grpc/test/grpc_testing" testpb "google.golang.org/grpc/test/grpc_testing" ) @@ -43,7 +41,7 @@ func (s) TestUnmarshalListener_WithUpdateValidatorFunc(t *testing.T) { missingIdentityProviderInstance = "missing-identity-provider-instance" missingRootProviderInstance = "missing-root-provider-instance" ) - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := setupManagementServer(t) + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) defer cleanup1() lis, cleanup2 := setupGRPCServer(t, bootstrapContents) @@ -207,7 +205,7 @@ func (s) TestUnmarshalListener_WithUpdateValidatorFunc(t *testing.T) { } ctx2, cancel2 := context.WithTimeout(ctx, timeout) defer cancel2() - client := testpb.NewTestServiceClient(cc) + client := testgrpc.NewTestServiceClient(cc) if _, err := client.EmptyCall(ctx2, &testpb.Empty{}, grpc.WaitForReady(true)); (err != nil) != test.wantErr { t.Fatalf("EmptyCall() returned err: %v, wantErr %v", err, test.wantErr) } @@ -323,13 +321,13 @@ func (s) TestUnmarshalCluster_WithUpdateValidatorFunc(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - // setupManagementServer() sets up a bootstrap file with certificate + // SetupManagementServer() sets up a bootstrap file with certificate // provider instance names: `e2e.ServerSideCertProviderInstance` and // `e2e.ClientSideCertProviderInstance`. - managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t) + managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t) defer cleanup1() - port, cleanup2 := clientSetup(t, &testService{}) + port, cleanup2 := startTestService(t, nil) defer cleanup2() // This creates a `Cluster` resource with a security config which @@ -363,7 +361,7 @@ func (s) TestUnmarshalCluster_WithUpdateValidatorFunc(t *testing.T) { } ctx2, cancel2 := context.WithTimeout(ctx, timeout) defer cancel2() - client := testpb.NewTestServiceClient(cc) + client := testgrpc.NewTestServiceClient(cc) if _, err := client.EmptyCall(ctx2, &testpb.Empty{}, grpc.WaitForReady(true)); (err != nil) != test.wantErr { t.Fatalf("EmptyCall() returned err: %v, wantErr %v", err, test.wantErr) } diff --git a/test/xds/xds_server_integration_test.go b/test/xds/xds_server_integration_test.go new file mode 100644 index 000000000..f3057aa0e --- /dev/null +++ b/test/xds/xds_server_integration_test.go @@ -0,0 +1,370 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds + +import ( + "context" + "fmt" + "net" + "strconv" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + xdscreds "google.golang.org/grpc/credentials/xds" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/status" + "google.golang.org/grpc/xds" + + testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +type testService struct { + testpb.TestServiceServer +} + +func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil +} + +func (*testService) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil +} + +// setupGRPCServer performs the following: +// - spin up an xDS-enabled gRPC server, configure it with xdsCredentials and +// register the test service on it +// - create a local TCP listener and start serving on it +// +// Returns the following: +// - local listener on which the xDS-enabled gRPC server is serving on +// - cleanup function to be invoked by the tests when done +func setupGRPCServer(t *testing.T, bootstrapContents []byte) (net.Listener, func()) { + t.Helper() + + // Configure xDS credentials to be used on the server-side. + creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{ + FallbackCreds: insecure.NewCredentials(), + }) + if err != nil { + t.Fatal(err) + } + + // Create a server option to get notified about serving mode changes. We don't + // do anything other than throwing a log entry here. But this is required, + // since the server code emits a log entry at the default level (which is + // ERROR) if no callback is registered for serving mode changes. Our + // testLogger fails the test if there is any log entry at ERROR level. It does + // provide an ExpectError() method, but that takes a string and it would be + // painful to construct the exact error message expected here. Instead this + // works just fine. + modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) { + t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) + }) + + // Initialize an xDS-enabled gRPC server and register the stubServer on it. + server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)) + testgrpc.RegisterTestServiceServer(server, &testService{}) + + // Create a local listener and pass it to Serve(). + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("testutils.LocalTCPListener() failed: %v", err) + } + + go func() { + if err := server.Serve(lis); err != nil { + t.Errorf("Serve() failed: %v", err) + } + }() + + return lis, func() { + server.Stop() + } +} + +func hostPortFromListener(lis net.Listener) (string, uint32, error) { + host, p, err := net.SplitHostPort(lis.Addr().String()) + if err != nil { + return "", 0, fmt.Errorf("net.SplitHostPort(%s) failed: %v", lis.Addr().String(), err) + } + port, err := strconv.ParseInt(p, 10, 32) + if err != nil { + return "", 0, fmt.Errorf("strconv.ParseInt(%s, 10, 32) failed: %v", p, err) + } + return host, uint32(port), nil +} + +// TestServerSideXDS_Fallback is an e2e test which verifies xDS credentials +// fallback functionality. +// +// The following sequence of events happen as part of this test: +// - An xDS-enabled gRPC server is created and xDS credentials are configured. +// - xDS is enabled on the client by the use of the xds:/// scheme, and xDS +// credentials are configured. +// - Control plane is configured to not send any security configuration to both +// the client and the server. This results in both of them using the +// configured fallback credentials (which is insecure creds in this case). +func (s) TestServerSideXDS_Fallback(t *testing.T) { + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) + defer cleanup1() + + lis, cleanup2 := setupGRPCServer(t, bootstrapContents) + defer cleanup2() + + // Grab the host and port of the server and create client side xDS resources + // corresponding to it. This contains default resources with no security + // configuration in the Cluster resources. + host, port, err := hostPortFromListener(lis) + if err != nil { + t.Fatalf("failed to retrieve host and port of server: %v", err) + } + const serviceName = "my-service-fallback" + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: host, + Port: port, + SecLevel: e2e.SecurityLevelNone, + }) + + // Create an inbound xDS listener resource for the server side that does not + // contain any security configuration. This should force the server-side + // xdsCredentials to use fallback. + inboundLis := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone) + resources.Listeners = append(resources.Listeners, inboundLis) + + // Setup the management server with client and server-side resources. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create client-side xDS credentials with an insecure fallback. + creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{ + FallbackCreds: insecure.NewCredentials(), + }) + if err != nil { + t.Fatal(err) + } + + // Create a ClientConn with the xds scheme and make a successful RPC. + cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Errorf("rpc EmptyCall() failed: %v", err) + } +} + +// TestServerSideXDS_FileWatcherCerts is an e2e test which verifies xDS +// credentials with file watcher certificate provider. +// +// The following sequence of events happen as part of this test: +// - An xDS-enabled gRPC server is created and xDS credentials are configured. +// - xDS is enabled on the client by the use of the xds:/// scheme, and xDS +// credentials are configured. +// - Control plane is configured to send security configuration to both the +// client and the server, pointing to the file watcher certificate provider. +// We verify both TLS and mTLS scenarios. +func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) { + tests := []struct { + name string + secLevel e2e.SecurityLevel + }{ + { + name: "tls", + secLevel: e2e.SecurityLevelTLS, + }, + { + name: "mtls", + secLevel: e2e.SecurityLevelMTLS, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) + defer cleanup1() + + lis, cleanup2 := setupGRPCServer(t, bootstrapContents) + defer cleanup2() + + // Grab the host and port of the server and create client side xDS + // resources corresponding to it. + host, port, err := hostPortFromListener(lis) + if err != nil { + t.Fatalf("failed to retrieve host and port of server: %v", err) + } + + // Create xDS resources to be consumed on the client side. This + // includes the listener, route configuration, cluster (with + // security configuration) and endpoint resources. + serviceName := "my-service-file-watcher-certs-" + test.name + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: host, + Port: port, + SecLevel: test.secLevel, + }) + + // Create an inbound xDS listener resource for the server side that + // contains security configuration pointing to the file watcher + // plugin. + inboundLis := e2e.DefaultServerListener(host, port, test.secLevel) + resources.Listeners = append(resources.Listeners, inboundLis) + + // Setup the management server with client and server resources. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create client-side xDS credentials with an insecure fallback. + creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{ + FallbackCreds: insecure.NewCredentials(), + }) + if err != nil { + t.Fatal(err) + } + + // Create a ClientConn with the xds scheme and make an RPC. + cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + }) + } +} + +// TestServerSideXDS_SecurityConfigChange is an e2e test where xDS is enabled on +// the server-side and xdsCredentials are configured for security. The control +// plane initially does not any security configuration. This forces the +// xdsCredentials to use fallback creds, which is this case is insecure creds. +// We verify that a client connecting with TLS creds is not able to successfully +// make an RPC. The control plane then sends a listener resource with security +// configuration pointing to the use of the file_watcher plugin and we verify +// that the same client is now able to successfully make an RPC. +func (s) TestServerSideXDS_SecurityConfigChange(t *testing.T) { + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) + defer cleanup1() + + lis, cleanup2 := setupGRPCServer(t, bootstrapContents) + defer cleanup2() + + // Grab the host and port of the server and create client side xDS resources + // corresponding to it. This contains default resources with no security + // configuration in the Cluster resource. This should force the xDS + // credentials on the client to use its fallback. + host, port, err := hostPortFromListener(lis) + if err != nil { + t.Fatalf("failed to retrieve host and port of server: %v", err) + } + const serviceName = "my-service-security-config-change" + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: host, + Port: port, + SecLevel: e2e.SecurityLevelNone, + }) + + // Create an inbound xDS listener resource for the server side that does not + // contain any security configuration. This should force the xDS credentials + // on server to use its fallback. + inboundLis := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone) + resources.Listeners = append(resources.Listeners, inboundLis) + + // Setup the management server with client and server-side resources. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create client-side xDS credentials with an insecure fallback. + xdsCreds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{ + FallbackCreds: insecure.NewCredentials(), + }) + if err != nil { + t.Fatal(err) + } + + // Create a ClientConn with the xds scheme and make a successful RPC. + xdsCC, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(xdsCreds), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer xdsCC.Close() + + client := testgrpc.NewTestServiceClient(xdsCC) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + + // Create a ClientConn with TLS creds. This should fail since the server is + // using fallback credentials which in this case in insecure creds. + tlsCreds := e2e.CreateClientTLSCredentials(t) + tlsCC, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithTransportCredentials(tlsCreds)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer tlsCC.Close() + + // We don't set 'waitForReady` here since we want this call to failfast. + client = testgrpc.NewTestServiceClient(tlsCC) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable { + t.Fatal("rpc EmptyCall() succeeded when expected to fail") + } + + // Switch server and client side resources with ones that contain required + // security configuration for mTLS with a file watcher certificate provider. + resources = e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: host, + Port: port, + SecLevel: e2e.SecurityLevelMTLS, + }) + inboundLis = e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS) + resources.Listeners = append(resources.Listeners, inboundLis) + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Make another RPC with `waitForReady` set and expect this to succeed. + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } +} diff --git a/xds/internal/test/xds_server_integration_test.go b/test/xds/xds_server_rbac_test.go similarity index 70% rename from xds/internal/test/xds_server_integration_test.go rename to test/xds/xds_server_rbac_test.go index b36292690..216653a8d 100644 --- a/xds/internal/test/xds_server_integration_test.go +++ b/test/xds/xds_server_rbac_test.go @@ -1,9 +1,6 @@ -//go:build !386 -// +build !386 - /* * - * Copyright 2020 gRPC authors. + * Copyright 2022 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,8 +16,7 @@ * */ -// Package xds_test contains e2e tests for xDS use. -package xds_test +package xds import ( "context" @@ -30,356 +26,29 @@ import ( "strings" "testing" + v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/status" - "google.golang.org/grpc/xds" - "google.golang.org/grpc/xds/internal/httpfilter/rbac" - "google.golang.org/grpc/xds/internal/testutils/e2e" + "google.golang.org/protobuf/types/known/anypb" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3rbacpb "github.com/envoyproxy/go-control-plane/envoy/config/rbac/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" rpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/rbac/v3" - v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" v3matcherpb "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" - anypb "github.com/golang/protobuf/ptypes/any" wrapperspb "github.com/golang/protobuf/ptypes/wrappers" - xdscreds "google.golang.org/grpc/credentials/xds" + testgrpc "google.golang.org/grpc/test/grpc_testing" testpb "google.golang.org/grpc/test/grpc_testing" ) -const ( - // Names of files inside tempdir, for certprovider plugin to watch. - certFile = "cert.pem" - keyFile = "key.pem" - rootFile = "ca.pem" -) - -// setupGRPCServer performs the following: -// - spin up an xDS-enabled gRPC server, configure it with xdsCredentials and -// register the test service on it -// - create a local TCP listener and start serving on it -// -// Returns the following: -// - local listener on which the xDS-enabled gRPC server is serving on -// - cleanup function to be invoked by the tests when done -func setupGRPCServer(t *testing.T, bootstrapContents []byte) (net.Listener, func()) { - t.Helper() - - // Configure xDS credentials to be used on the server-side. - creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{ - FallbackCreds: insecure.NewCredentials(), - }) - if err != nil { - t.Fatal(err) - } - - // Create a server option to get notified about serving mode changes. We don't - // do anything other than throwing a log entry here. But this is required, - // since the server code emits a log entry at the default level (which is - // ERROR) if no callback is registered for serving mode changes. Our - // testLogger fails the test if there is any log entry at ERROR level. It does - // provide an ExpectError() method, but that takes a string and it would be - // painful to construct the exact error message expected here. Instead this - // works just fine. - modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) { - t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) - }) - - // Initialize an xDS-enabled gRPC server and register the stubServer on it. - server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)) - testpb.RegisterTestServiceServer(server, &testService{}) - - // Create a local listener and pass it to Serve(). - lis, err := testutils.LocalTCPListener() - if err != nil { - t.Fatalf("testutils.LocalTCPListener() failed: %v", err) - } - - go func() { - if err := server.Serve(lis); err != nil { - t.Errorf("Serve() failed: %v", err) - } - }() - - return lis, func() { - server.Stop() - } -} - -func hostPortFromListener(lis net.Listener) (string, uint32, error) { - host, p, err := net.SplitHostPort(lis.Addr().String()) - if err != nil { - return "", 0, fmt.Errorf("net.SplitHostPort(%s) failed: %v", lis.Addr().String(), err) - } - port, err := strconv.ParseInt(p, 10, 32) - if err != nil { - return "", 0, fmt.Errorf("strconv.ParseInt(%s, 10, 32) failed: %v", p, err) - } - return host, uint32(port), nil -} - -// TestServerSideXDS_Fallback is an e2e test which verifies xDS credentials -// fallback functionality. -// -// The following sequence of events happen as part of this test: -// - An xDS-enabled gRPC server is created and xDS credentials are configured. -// - xDS is enabled on the client by the use of the xds:/// scheme, and xDS -// credentials are configured. -// - Control plane is configured to not send any security configuration to both -// the client and the server. This results in both of them using the -// configured fallback credentials (which is insecure creds in this case). -func (s) TestServerSideXDS_Fallback(t *testing.T) { - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := setupManagementServer(t) - defer cleanup1() - - lis, cleanup2 := setupGRPCServer(t, bootstrapContents) - defer cleanup2() - - // Grab the host and port of the server and create client side xDS resources - // corresponding to it. This contains default resources with no security - // configuration in the Cluster resources. - host, port, err := hostPortFromListener(lis) - if err != nil { - t.Fatalf("failed to retrieve host and port of server: %v", err) - } - const serviceName = "my-service-fallback" - resources := e2e.DefaultClientResources(e2e.ResourceParams{ - DialTarget: serviceName, - NodeID: nodeID, - Host: host, - Port: port, - SecLevel: e2e.SecurityLevelNone, - }) - - // Create an inbound xDS listener resource for the server side that does not - // contain any security configuration. This should force the server-side - // xdsCredentials to use fallback. - inboundLis := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone) - resources.Listeners = append(resources.Listeners, inboundLis) - - // Setup the management server with client and server-side resources. - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := managementServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - // Create client-side xDS credentials with an insecure fallback. - creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{ - FallbackCreds: insecure.NewCredentials(), - }) - if err != nil { - t.Fatal(err) - } - - // Create a ClientConn with the xds scheme and make a successful RPC. - cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(resolver)) - if err != nil { - t.Fatalf("failed to dial local test server: %v", err) - } - defer cc.Close() - - client := testpb.NewTestServiceClient(cc) - if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { - t.Errorf("rpc EmptyCall() failed: %v", err) - } -} - -// TestServerSideXDS_FileWatcherCerts is an e2e test which verifies xDS -// credentials with file watcher certificate provider. -// -// The following sequence of events happen as part of this test: -// - An xDS-enabled gRPC server is created and xDS credentials are configured. -// - xDS is enabled on the client by the use of the xds:/// scheme, and xDS -// credentials are configured. -// - Control plane is configured to send security configuration to both the -// client and the server, pointing to the file watcher certificate provider. -// We verify both TLS and mTLS scenarios. -func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) { - tests := []struct { - name string - secLevel e2e.SecurityLevel - }{ - { - name: "tls", - secLevel: e2e.SecurityLevelTLS, - }, - { - name: "mtls", - secLevel: e2e.SecurityLevelMTLS, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := setupManagementServer(t) - defer cleanup1() - - lis, cleanup2 := setupGRPCServer(t, bootstrapContents) - defer cleanup2() - - // Grab the host and port of the server and create client side xDS - // resources corresponding to it. - host, port, err := hostPortFromListener(lis) - if err != nil { - t.Fatalf("failed to retrieve host and port of server: %v", err) - } - - // Create xDS resources to be consumed on the client side. This - // includes the listener, route configuration, cluster (with - // security configuration) and endpoint resources. - serviceName := "my-service-file-watcher-certs-" + test.name - resources := e2e.DefaultClientResources(e2e.ResourceParams{ - DialTarget: serviceName, - NodeID: nodeID, - Host: host, - Port: port, - SecLevel: test.secLevel, - }) - - // Create an inbound xDS listener resource for the server side that - // contains security configuration pointing to the file watcher - // plugin. - inboundLis := e2e.DefaultServerListener(host, port, test.secLevel) - resources.Listeners = append(resources.Listeners, inboundLis) - - // Setup the management server with client and server resources. - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := managementServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - // Create client-side xDS credentials with an insecure fallback. - creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{ - FallbackCreds: insecure.NewCredentials(), - }) - if err != nil { - t.Fatal(err) - } - - // Create a ClientConn with the xds scheme and make an RPC. - cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(resolver)) - if err != nil { - t.Fatalf("failed to dial local test server: %v", err) - } - defer cc.Close() - - client := testpb.NewTestServiceClient(cc) - if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { - t.Fatalf("rpc EmptyCall() failed: %v", err) - } - }) - } -} - -// TestServerSideXDS_SecurityConfigChange is an e2e test where xDS is enabled on -// the server-side and xdsCredentials are configured for security. The control -// plane initially does not any security configuration. This forces the -// xdsCredentials to use fallback creds, which is this case is insecure creds. -// We verify that a client connecting with TLS creds is not able to successfully -// make an RPC. The control plane then sends a listener resource with security -// configuration pointing to the use of the file_watcher plugin and we verify -// that the same client is now able to successfully make an RPC. -func (s) TestServerSideXDS_SecurityConfigChange(t *testing.T) { - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := setupManagementServer(t) - defer cleanup1() - - lis, cleanup2 := setupGRPCServer(t, bootstrapContents) - defer cleanup2() - - // Grab the host and port of the server and create client side xDS resources - // corresponding to it. This contains default resources with no security - // configuration in the Cluster resource. This should force the xDS - // credentials on the client to use its fallback. - host, port, err := hostPortFromListener(lis) - if err != nil { - t.Fatalf("failed to retrieve host and port of server: %v", err) - } - const serviceName = "my-service-security-config-change" - resources := e2e.DefaultClientResources(e2e.ResourceParams{ - DialTarget: serviceName, - NodeID: nodeID, - Host: host, - Port: port, - SecLevel: e2e.SecurityLevelNone, - }) - - // Create an inbound xDS listener resource for the server side that does not - // contain any security configuration. This should force the xDS credentials - // on server to use its fallback. - inboundLis := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone) - resources.Listeners = append(resources.Listeners, inboundLis) - - // Setup the management server with client and server-side resources. - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := managementServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - // Create client-side xDS credentials with an insecure fallback. - xdsCreds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{ - FallbackCreds: insecure.NewCredentials(), - }) - if err != nil { - t.Fatal(err) - } - - // Create a ClientConn with the xds scheme and make a successful RPC. - xdsCC, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(xdsCreds), grpc.WithResolvers(resolver)) - if err != nil { - t.Fatalf("failed to dial local test server: %v", err) - } - defer xdsCC.Close() - - client := testpb.NewTestServiceClient(xdsCC) - if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { - t.Fatalf("rpc EmptyCall() failed: %v", err) - } - - // Create a ClientConn with TLS creds. This should fail since the server is - // using fallback credentials which in this case in insecure creds. - tlsCreds := createClientTLSCredentials(t) - tlsCC, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithTransportCredentials(tlsCreds)) - if err != nil { - t.Fatalf("failed to dial local test server: %v", err) - } - defer tlsCC.Close() - - // We don't set 'waitForReady` here since we want this call to failfast. - client = testpb.NewTestServiceClient(tlsCC) - if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable { - t.Fatal("rpc EmptyCall() succeeded when expected to fail") - } - - // Switch server and client side resources with ones that contain required - // security configuration for mTLS with a file watcher certificate provider. - resources = e2e.DefaultClientResources(e2e.ResourceParams{ - DialTarget: serviceName, - NodeID: nodeID, - Host: host, - Port: port, - SecLevel: e2e.SecurityLevelMTLS, - }) - inboundLis = e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS) - resources.Listeners = append(resources.Listeners, inboundLis) - if err := managementServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - // Make another RPC with `waitForReady` set and expect this to succeed. - if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { - t.Fatalf("rpc EmptyCall() failed: %v", err) - } -} - // TestServerSideXDS_RouteConfiguration is an e2e test which verifies routing // functionality. The xDS enabled server will be set up with route configuration // where the route configuration has routes with the correct routing actions @@ -391,7 +60,7 @@ func (s) TestServerSideXDS_RouteConfiguration(t *testing.T) { defer func() { envconfig.XDSRBAC = oldRBAC }() - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := setupManagementServer(t) + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) defer cleanup1() lis, cleanup2 := setupGRPCServer(t, bootstrapContents) @@ -583,7 +252,7 @@ func (s) TestServerSideXDS_RouteConfiguration(t *testing.T) { } defer cc.Close() - client := testpb.NewTestServiceClient(cc) + client := testgrpc.NewTestServiceClient(cc) // This Empty Call should match to a route with a correct action // (NonForwardingAction). Thus, this RPC should proceed as normal. There is @@ -739,8 +408,8 @@ func (s) TestRBACHTTPFilter(t *testing.T) { defer func() { envconfig.XDSRBAC = oldRBAC }() - rbac.RegisterForTesting() - defer rbac.UnregisterForTesting() + internal.RegisterRBACHTTPFilterForTesting() + defer internal.UnregisterRBACHTTPFilterForTesting() tests := []struct { name string rbacCfg *rpb.RBAC @@ -936,7 +605,7 @@ func (s) TestRBACHTTPFilter(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { func() { - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := setupManagementServer(t) + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) defer cleanup1() lis, cleanup2 := setupGRPCServer(t, bootstrapContents) @@ -970,7 +639,7 @@ func (s) TestRBACHTTPFilter(t *testing.T) { } defer cc.Close() - client := testpb.NewTestServiceClient(cc) + client := testgrpc.NewTestServiceClient(cc) if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != test.wantStatusEmptyCall { t.Fatalf("EmptyCall() returned err with status: %v, wantStatusEmptyCall: %v", status.Code(err), test.wantStatusEmptyCall) @@ -1121,7 +790,7 @@ func (s) TestRBACToggledOn_WithBadRouteConfiguration(t *testing.T) { envconfig.XDSRBAC = oldRBAC }() - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := setupManagementServer(t) + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) defer cleanup1() lis, cleanup2 := setupGRPCServer(t, bootstrapContents) @@ -1178,7 +847,7 @@ func (s) TestRBACToggledOff_WithBadRouteConfiguration(t *testing.T) { envconfig.XDSRBAC = oldRBAC }() - managementServer, nodeID, bootstrapContents, resolver, cleanup1 := setupManagementServer(t) + managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t) defer cleanup1() lis, cleanup2 := setupGRPCServer(t, bootstrapContents) diff --git a/xds/internal/test/xds_server_serving_mode_test.go b/test/xds/xds_server_serving_mode_test.go similarity index 96% rename from xds/internal/test/xds_server_serving_mode_test.go rename to test/xds/xds_server_serving_mode_test.go index 236a831c6..fe3a5612d 100644 --- a/xds/internal/test/xds_server_serving_mode_test.go +++ b/test/xds/xds_server_serving_mode_test.go @@ -1,6 +1,3 @@ -//go:build !386 -// +build !386 - /* * * Copyright 2021 gRPC authors. @@ -19,8 +16,7 @@ * */ -// Package xds_test contains e2e tests for xDS use. -package xds_test +package xds import ( "context" @@ -34,10 +30,11 @@ import ( "google.golang.org/grpc/credentials/insecure" xdscreds "google.golang.org/grpc/credentials/xds" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/xds" - "google.golang.org/grpc/xds/internal/testutils/e2e" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + testgrpc "google.golang.org/grpc/test/grpc_testing" testpb "google.golang.org/grpc/test/grpc_testing" ) @@ -46,7 +43,7 @@ import ( // change callback is not invoked and client connections to the server are not // recycled. func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) { - managementServer, nodeID, bootstrapContents, _, cleanup := setupManagementServer(t) + managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t) defer cleanup() creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()}) @@ -68,7 +65,7 @@ func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) { // Initialize an xDS-enabled gRPC server and register the stubServer on it. server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)) defer server.Stop() - testpb.RegisterTestServiceServer(server, &testService{}) + testgrpc.RegisterTestServiceServer(server, &testService{}) // Setup the management server to respond with the listener resources. host, port, err := hostPortFromListener(lis) @@ -157,7 +154,7 @@ func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) { // xDS enabled gRPC servers. It verifies that appropriate mode changes happen in // the server, and also verifies behavior of clientConns under these modes. func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { - managementServer, nodeID, bootstrapContents, _, cleanup := setupManagementServer(t) + managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t) defer cleanup() // Configure xDS credentials to be used on the server-side. @@ -359,7 +356,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { func waitForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) { t.Helper() - c := testpb.NewTestServiceClient(cc) + c := testgrpc.NewTestServiceClient(cc) if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { t.Fatalf("rpc EmptyCall() failed: %v", err) } @@ -369,7 +366,7 @@ func waitForFailedRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) { t.Helper() // Attempt one RPC before waiting for the ticker to expire. - c := testpb.NewTestServiceClient(cc) + c := testgrpc.NewTestServiceClient(cc) if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil { return } diff --git a/xds/csds/csds_test.go b/xds/csds/csds_test.go index 1d772b67f..d8dcdcdfb 100644 --- a/xds/csds/csds_test.go +++ b/xds/csds/csds_test.go @@ -33,9 +33,9 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds" _ "google.golang.org/grpc/xds/internal/httpfilter/router" - "google.golang.org/grpc/xds/internal/testutils/e2e" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" "google.golang.org/protobuf/testing/protocmp" diff --git a/xds/internal/clusterspecifier/rls/rls.go b/xds/internal/clusterspecifier/rls/rls.go index 69fb7f4a9..a167cc5fa 100644 --- a/xds/internal/clusterspecifier/rls/rls.go +++ b/xds/internal/clusterspecifier/rls/rls.go @@ -38,27 +38,15 @@ func init() { if envconfig.XDSRLS { clusterspecifier.Register(rls{}) } -} -// RegisterForTesting registers the RLS Cluster Specifier Plugin for testing -// purposes, regardless of the XDSRLS environment variable. This is needed -// because there is no way to set the XDSRLS environment variable to true in a -// test before init() in this package is run. -// -// TODO: Remove this function once the RLS env var is removed. -func RegisterForTesting() { - clusterspecifier.Register(rls{}) -} - -// UnregisterForTesting unregisters the RLS Cluster Specifier Plugin for testing -// purposes. This is needed because there is no way to unregister the RLS -// Cluster Specifier Plugin after registering it solely for testing purposes -// using rls.RegisterForTesting(). -// -// TODO: Remove this function once the RLS env var is removed. -func UnregisterForTesting() { - for _, typeURL := range rls.TypeURLs(rls{}) { - clusterspecifier.UnregisterForTesting(typeURL) + // TODO: Remove these once the RLS env var is removed. + internal.RegisterRLSClusterSpecifierPluginForTesting = func() { + clusterspecifier.Register(rls{}) + } + internal.UnregisterRLSClusterSpecifierPluginForTesting = func() { + for _, typeURL := range rls.TypeURLs(rls{}) { + clusterspecifier.UnregisterForTesting(typeURL) + } } } diff --git a/xds/internal/httpfilter/fault/fault_test.go b/xds/internal/httpfilter/fault/fault_test.go index 6ee5e654c..e44f91a55 100644 --- a/xds/internal/httpfilter/fault/fault_test.go +++ b/xds/internal/httpfilter/fault/fault_test.go @@ -39,10 +39,10 @@ import ( "google.golang.org/grpc/internal/grpcrand" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "google.golang.org/grpc/xds/internal/testutils/e2e" "google.golang.org/protobuf/types/known/wrapperspb" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" diff --git a/xds/internal/httpfilter/rbac/rbac.go b/xds/internal/httpfilter/rbac/rbac.go index 3dc4b5682..209283c3b 100644 --- a/xds/internal/httpfilter/rbac/rbac.go +++ b/xds/internal/httpfilter/rbac/rbac.go @@ -27,6 +27,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/xds/rbac" @@ -41,26 +42,15 @@ func init() { if envconfig.XDSRBAC { httpfilter.Register(builder{}) } -} -// RegisterForTesting registers the RBAC HTTP Filter for testing purposes, -// regardless of the RBAC environment variable. This is needed because there is -// no way to set the RBAC environment variable to true in a test before init() -// in this package is run. -// -// TODO: Remove this function once the RBAC env var is removed. -func RegisterForTesting() { - httpfilter.Register(builder{}) -} - -// UnregisterForTesting unregisters the RBAC HTTP Filter for testing purposes. -// This is needed because there is no way to unregister the HTTP Filter after -// registering it solely for testing purposes using rbac.RegisterForTesting(). -// -// TODO: Remove this function once the RBAC env var is removed. -func UnregisterForTesting() { - for _, typeURL := range builder.TypeURLs(builder{}) { - httpfilter.UnregisterForTesting(typeURL) + // TODO: Remove these once the RBAC env var is removed. + internal.RegisterRBACHTTPFilterForTesting = func() { + httpfilter.Register(builder{}) + } + internal.UnregisterRBACHTTPFilterForTesting = func() { + for _, typeURL := range builder.TypeURLs(builder{}) { + httpfilter.UnregisterForTesting(typeURL) + } } } diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index 8a613c4c4..4f31d9c44 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -25,6 +25,7 @@ import ( "strings" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/pretty" @@ -37,10 +38,10 @@ import ( const xdsScheme = "xds" -// NewBuilderForTesting creates a new xds resolver builder using a specific xds +// newBuilderForTesting creates a new xds resolver builder using a specific xds // bootstrap config, so tests can use multiple xds clients in different // ClientConns at the same time. -func NewBuilderForTesting(config []byte) (resolver.Builder, error) { +func newBuilderForTesting(config []byte) (resolver.Builder, error) { return &xdsResolverBuilder{ newXDSClient: func() (xdsclient.XDSClient, error) { return xdsclient.NewWithBootstrapContentsForTesting(config) @@ -53,6 +54,7 @@ var newXDSClient = func() (xdsclient.XDSClient, error) { return xdsclient.New() func init() { resolver.Register(&xdsResolverBuilder{}) + internal.NewXDSResolverWithConfigForTesting = newBuilderForTesting } type xdsResolverBuilder struct { diff --git a/xds/internal/server/listener_wrapper_test.go b/xds/internal/server/listener_wrapper_test.go index 85ac93c2e..2c1e8b75b 100644 --- a/xds/internal/server/listener_wrapper_test.go +++ b/xds/internal/server/listener_wrapper_test.go @@ -26,19 +26,21 @@ import ( "testing" "time" + "google.golang.org/grpc/internal/envconfig" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/xds/internal/testutils/fakeclient" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" wrapperspb "github.com/golang/protobuf/ptypes/wrappers" - "google.golang.org/grpc/internal/envconfig" - "google.golang.org/grpc/internal/grpctest" - "google.golang.org/grpc/internal/testutils" + _ "google.golang.org/grpc/xds/internal/httpfilter/router" - "google.golang.org/grpc/xds/internal/testutils/e2e" - "google.golang.org/grpc/xds/internal/testutils/fakeclient" - "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) const ( diff --git a/xds/internal/test/e2e/controlplane.go b/xds/internal/test/e2e/controlplane.go index b663cb31f..8f27ff053 100644 --- a/xds/internal/test/e2e/controlplane.go +++ b/xds/internal/test/e2e/controlplane.go @@ -21,8 +21,8 @@ import ( "fmt" "github.com/google/uuid" + "google.golang.org/grpc/internal/testutils/xds/e2e" xdsinternal "google.golang.org/grpc/internal/xds" - "google.golang.org/grpc/xds/internal/testutils/e2e" ) type controlPlane struct { diff --git a/xds/internal/test/e2e/e2e_test.go b/xds/internal/test/e2e/e2e_test.go index ca547a522..309c58010 100644 --- a/xds/internal/test/e2e/e2e_test.go +++ b/xds/internal/test/e2e/e2e_test.go @@ -27,11 +27,12 @@ import ( "testing" "time" + "google.golang.org/grpc/internal/testutils/xds/e2e" + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1" testpb "google.golang.org/grpc/interop/grpc_testing" - "google.golang.org/grpc/xds/internal/testutils/e2e" ) var ( diff --git a/xds/internal/test/xds_client_integration_test.go b/xds/internal/test/xds_client_integration_test.go deleted file mode 100644 index e9e3fd584..000000000 --- a/xds/internal/test/xds_client_integration_test.go +++ /dev/null @@ -1,384 +0,0 @@ -//go:build !386 -// +build !386 - -/* - * - * Copyright 2021 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds_test - -import ( - "context" - "fmt" - "net" - "testing" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/internal/envconfig" - rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" - "google.golang.org/grpc/internal/stubserver" - "google.golang.org/grpc/internal/testutils" - rlstest "google.golang.org/grpc/internal/testutils/rls" - "google.golang.org/grpc/status" - rlscsp "google.golang.org/grpc/xds/internal/clusterspecifier/rls" - "google.golang.org/grpc/xds/internal/testutils/e2e" - "google.golang.org/protobuf/types/known/durationpb" - - _ "google.golang.org/grpc/balancer/rls" // Register the RLS Load Balancing policy. - _ "google.golang.org/grpc/xds/internal/clusterspecifier/rls" // Register the RLS Cluster Specifier Plugin. - - v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" - v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" - v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" - v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" - wrapperspb "github.com/golang/protobuf/ptypes/wrappers" - testpb "google.golang.org/grpc/test/grpc_testing" -) - -// clientSetup performs a bunch of steps common to all xDS client tests here: -// - spin up a gRPC server and register the test service on it -// - create a local TCP listener and start serving on it -// -// Returns the following: -// - the port the server is listening on -// - cleanup function to be invoked by the tests when done -func clientSetup(t *testing.T, tss testpb.TestServiceServer) (uint32, func()) { - // Initialize a gRPC server and register the stubServer on it. - server := grpc.NewServer() - testpb.RegisterTestServiceServer(server, tss) - - // Create a local listener and pass it to Serve(). - lis, err := testutils.LocalTCPListener() - if err != nil { - t.Fatalf("testutils.LocalTCPListener() failed: %v", err) - } - - go func() { - if err := server.Serve(lis); err != nil { - t.Errorf("Serve() failed: %v", err) - } - }() - - return uint32(lis.Addr().(*net.TCPAddr).Port), func() { - server.Stop() - } -} - -func (s) TestClientSideXDS(t *testing.T) { - managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t) - defer cleanup1() - - port, cleanup2 := clientSetup(t, &testService{}) - defer cleanup2() - - const serviceName = "my-service-client-side-xds" - resources := e2e.DefaultClientResources(e2e.ResourceParams{ - DialTarget: serviceName, - NodeID: nodeID, - Host: "localhost", - Port: port, - SecLevel: e2e.SecurityLevelNone, - }) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := managementServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - // Create a ClientConn and make a successful RPC. - cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) - if err != nil { - t.Fatalf("failed to dial local test server: %v", err) - } - defer cc.Close() - - client := testpb.NewTestServiceClient(cc) - if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { - t.Fatalf("rpc EmptyCall() failed: %v", err) - } -} - -func (s) TestClientSideRetry(t *testing.T) { - ctr := 0 - errs := []codes.Code{codes.ResourceExhausted} - ss := &stubserver.StubServer{ - EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { - defer func() { ctr++ }() - if ctr < len(errs) { - return nil, status.Errorf(errs[ctr], "this should be retried") - } - return &testpb.Empty{}, nil - }, - } - - managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t) - defer cleanup1() - - port, cleanup2 := clientSetup(t, ss) - defer cleanup2() - - const serviceName = "my-service-client-side-xds" - resources := e2e.DefaultClientResources(e2e.ResourceParams{ - DialTarget: serviceName, - NodeID: nodeID, - Host: "localhost", - Port: port, - SecLevel: e2e.SecurityLevelNone, - }) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - if err := managementServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - // Create a ClientConn and make a successful RPC. - cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) - if err != nil { - t.Fatalf("failed to dial local test server: %v", err) - } - defer cc.Close() - - client := testpb.NewTestServiceClient(cc) - defer cancel() - if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.ResourceExhausted { - t.Fatalf("rpc EmptyCall() = _, %v; want _, ResourceExhausted", err) - } - - testCases := []struct { - name string - vhPolicy *v3routepb.RetryPolicy - routePolicy *v3routepb.RetryPolicy - errs []codes.Code // the errors returned by the server for each RPC - tryAgainErr codes.Code // the error that would be returned if we are still using the old retry policies. - errWant codes.Code - }{{ - name: "virtualHost only, fail", - vhPolicy: &v3routepb.RetryPolicy{ - RetryOn: "resource-exhausted,unavailable", - NumRetries: &wrapperspb.UInt32Value{Value: 1}, - }, - errs: []codes.Code{codes.ResourceExhausted, codes.Unavailable}, - routePolicy: nil, - tryAgainErr: codes.ResourceExhausted, - errWant: codes.Unavailable, - }, { - name: "virtualHost only", - vhPolicy: &v3routepb.RetryPolicy{ - RetryOn: "resource-exhausted, unavailable", - NumRetries: &wrapperspb.UInt32Value{Value: 2}, - }, - errs: []codes.Code{codes.ResourceExhausted, codes.Unavailable}, - routePolicy: nil, - tryAgainErr: codes.Unavailable, - errWant: codes.OK, - }, { - name: "virtualHost+route, fail", - vhPolicy: &v3routepb.RetryPolicy{ - RetryOn: "resource-exhausted,unavailable", - NumRetries: &wrapperspb.UInt32Value{Value: 2}, - }, - routePolicy: &v3routepb.RetryPolicy{ - RetryOn: "resource-exhausted", - NumRetries: &wrapperspb.UInt32Value{Value: 2}, - }, - errs: []codes.Code{codes.ResourceExhausted, codes.Unavailable}, - tryAgainErr: codes.OK, - errWant: codes.Unavailable, - }, { - name: "virtualHost+route", - vhPolicy: &v3routepb.RetryPolicy{ - RetryOn: "resource-exhausted", - NumRetries: &wrapperspb.UInt32Value{Value: 2}, - }, - routePolicy: &v3routepb.RetryPolicy{ - RetryOn: "unavailable", - NumRetries: &wrapperspb.UInt32Value{Value: 2}, - }, - errs: []codes.Code{codes.Unavailable}, - tryAgainErr: codes.Unavailable, - errWant: codes.OK, - }, { - name: "virtualHost+route, not enough attempts", - vhPolicy: &v3routepb.RetryPolicy{ - RetryOn: "unavailable", - NumRetries: &wrapperspb.UInt32Value{Value: 2}, - }, - routePolicy: &v3routepb.RetryPolicy{ - RetryOn: "unavailable", - NumRetries: &wrapperspb.UInt32Value{Value: 1}, - }, - errs: []codes.Code{codes.Unavailable, codes.Unavailable}, - tryAgainErr: codes.OK, - errWant: codes.Unavailable, - }} - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - errs = tc.errs - - // Confirm tryAgainErr is correct before updating resources. - ctr = 0 - _, err := client.EmptyCall(ctx, &testpb.Empty{}) - if code := status.Code(err); code != tc.tryAgainErr { - t.Fatalf("with old retry policy: EmptyCall() = _, %v; want _, %v", err, tc.tryAgainErr) - } - - resources.Routes[0].VirtualHosts[0].RetryPolicy = tc.vhPolicy - resources.Routes[0].VirtualHosts[0].Routes[0].GetRoute().RetryPolicy = tc.routePolicy - if err := managementServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - for { - ctr = 0 - _, err := client.EmptyCall(ctx, &testpb.Empty{}) - if code := status.Code(err); code == tc.tryAgainErr { - continue - } else if code != tc.errWant { - t.Fatalf("rpc EmptyCall() = _, %v; want _, %v", err, tc.errWant) - } - break - } - }) - } -} - -// defaultClientResourcesWithRLSCSP returns a set of resources (LDS, RDS, CDS, EDS) for a -// client to connect to a server with a RLS Load Balancer as a child of Cluster Manager. -func defaultClientResourcesWithRLSCSP(params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions { - routeConfigName := "route-" + params.DialTarget - clusterName := "cluster-" + params.DialTarget - endpointsName := "endpoints-" + params.DialTarget - return e2e.UpdateOptions{ - NodeID: params.NodeID, - Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)}, - Routes: []*v3routepb.RouteConfiguration{defaultRouteConfigWithRLSCSP(routeConfigName, params.DialTarget, rlsProto)}, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, params.SecLevel)}, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, []uint32{params.Port})}, - } -} - -// defaultRouteConfigWithRLSCSP returns a basic xds RouteConfig resource with an -// RLS Cluster Specifier Plugin configured as the route. -func defaultRouteConfigWithRLSCSP(routeName, ldsTarget string, rlsProto *rlspb.RouteLookupConfig) *v3routepb.RouteConfiguration { - return &v3routepb.RouteConfiguration{ - Name: routeName, - ClusterSpecifierPlugins: []*v3routepb.ClusterSpecifierPlugin{ - { - Extension: &v3corepb.TypedExtensionConfig{ - Name: "rls-csp", - TypedConfig: testutils.MarshalAny(&rlspb.RouteLookupClusterSpecifier{ - RouteLookupConfig: rlsProto, - }), - }, - }, - }, - VirtualHosts: []*v3routepb.VirtualHost{{ - Domains: []string{ldsTarget}, - Routes: []*v3routepb.Route{{ - Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, - Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ - ClusterSpecifier: &v3routepb.RouteAction_ClusterSpecifierPlugin{ClusterSpecifierPlugin: "rls-csp"}, - }}, - }}, - }}, - } -} - -// TestRLSinxDS tests an xDS configured system with a RLS Balancer present. -// This test sets up the RLS Balancer using the RLS Cluster Specifier Plugin, -// spins up a test service and has a fake RLS Server correctly respond with a target -// corresponding to this test service. This test asserts an RPC proceeds as normal -// with the RLS Balancer as part of system. -func (s) TestRLSinxDS(t *testing.T) { - oldRLS := envconfig.XDSRLS - envconfig.XDSRLS = true - rlscsp.RegisterForTesting() - defer func() { - envconfig.XDSRLS = oldRLS - rlscsp.UnregisterForTesting() - }() - - // Set up all components and configuration necessary - management server, - // xDS resolver, fake RLS Server, and xDS configuration which specifies a - // RLS Balancer that communicates to this set up fake RLS Server. - managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t) - defer cleanup1() - port, cleanup2 := clientSetup(t, &testService{}) - defer cleanup2() - - lis := testutils.NewListenerWrapper(t, nil) - rlsServer, rlsRequestCh := rlstest.SetupFakeRLSServer(t, lis) - rlsProto := &rlspb.RouteLookupConfig{ - GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{{Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}}}}, - LookupService: rlsServer.Address, - LookupServiceTimeout: durationpb.New(defaultTestTimeout), - CacheSizeBytes: 1024, - } - - const serviceName = "my-service-client-side-xds" - resources := defaultClientResourcesWithRLSCSP(e2e.ResourceParams{ - DialTarget: serviceName, - NodeID: nodeID, - Host: "localhost", - Port: port, - SecLevel: e2e.SecurityLevelNone, - }, rlsProto) - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - if err := managementServer.Update(ctx, resources); err != nil { - t.Fatal(err) - } - - // Configure the fake RLS Server to set the RLS Balancers child CDS - // Cluster's name as the target for the RPC to use. - rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { - return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{"cluster-" + serviceName}}} - }) - - // Create a ClientConn and make a successful RPC. - cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) - if err != nil { - t.Fatalf("failed to dial local test server: %v", err) - } - defer cc.Close() - - client := testpb.NewTestServiceClient(cc) - // Successfully sending the RPC will require the RLS Load Balancer to - // communicate with the fake RLS Server for information about the target. - if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { - t.Fatalf("rpc EmptyCall() failed: %v", err) - } - - // These RLS Verifications makes sure the RLS Load Balancer is actually part - // of the xDS Configured system that correctly sends out RPC. - - // Verify connection is established to RLS Server. - if _, err = lis.NewConnCh.Receive(ctx); err != nil { - t.Fatal("Timeout when waiting for RLS LB policy to create control channel") - } - - // Verify an rls request is sent out to fake RLS Server. - select { - case <-ctx.Done(): - t.Fatalf("Timeout when waiting for an RLS request to be sent out") - case <-rlsRequestCh: - } -} diff --git a/xds/internal/test/xds_integration_test.go b/xds/internal/test/xds_integration_test.go deleted file mode 100644 index 4b7cca3b8..000000000 --- a/xds/internal/test/xds_integration_test.go +++ /dev/null @@ -1,200 +0,0 @@ -//go:build !386 -// +build !386 - -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package xds_test contains e2e tests for xDS use. -package xds_test - -import ( - "context" - "crypto/tls" - "crypto/x509" - "encoding/json" - "fmt" - "io/ioutil" - "os" - "path" - "testing" - "time" - - "github.com/google/uuid" - - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/internal/grpctest" - "google.golang.org/grpc/resolver" - "google.golang.org/grpc/testdata" - "google.golang.org/grpc/xds" - "google.golang.org/grpc/xds/internal/testutils/e2e" - - xdsinternal "google.golang.org/grpc/internal/xds" - testpb "google.golang.org/grpc/test/grpc_testing" -) - -const ( - defaultTestTimeout = 10 * time.Second - defaultTestShortTimeout = 100 * time.Millisecond -) - -type s struct { - grpctest.Tester -} - -func Test(t *testing.T) { - grpctest.RunSubTests(t, s{}) -} - -type testService struct { - testpb.TestServiceServer -} - -func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) { - return &testpb.Empty{}, nil -} - -func (*testService) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - return &testpb.SimpleResponse{}, nil -} - -func createTmpFile(src, dst string) error { - data, err := ioutil.ReadFile(src) - if err != nil { - return fmt.Errorf("ioutil.ReadFile(%q) failed: %v", src, err) - } - if err := ioutil.WriteFile(dst, data, os.ModePerm); err != nil { - return fmt.Errorf("ioutil.WriteFile(%q) failed: %v", dst, err) - } - return nil -} - -// createTempDirWithFiles creates a temporary directory under the system default -// tempDir with the given dirSuffix. It also reads from certSrc, keySrc and -// rootSrc files are creates appropriate files under the newly create tempDir. -// Returns the name of the created tempDir. -func createTmpDirWithFiles(dirSuffix, certSrc, keySrc, rootSrc string) (string, error) { - // Create a temp directory. Passing an empty string for the first argument - // uses the system temp directory. - dir, err := ioutil.TempDir("", dirSuffix) - if err != nil { - return "", fmt.Errorf("ioutil.TempDir() failed: %v", err) - } - - if err := createTmpFile(testdata.Path(certSrc), path.Join(dir, certFile)); err != nil { - return "", err - } - if err := createTmpFile(testdata.Path(keySrc), path.Join(dir, keyFile)); err != nil { - return "", err - } - if err := createTmpFile(testdata.Path(rootSrc), path.Join(dir, rootFile)); err != nil { - return "", err - } - return dir, nil -} - -// createClientTLSCredentials creates client-side TLS transport credentials. -func createClientTLSCredentials(t *testing.T) credentials.TransportCredentials { - t.Helper() - - cert, err := tls.LoadX509KeyPair(testdata.Path("x509/client1_cert.pem"), testdata.Path("x509/client1_key.pem")) - if err != nil { - t.Fatalf("tls.LoadX509KeyPair(x509/client1_cert.pem, x509/client1_key.pem) failed: %v", err) - } - b, err := ioutil.ReadFile(testdata.Path("x509/server_ca_cert.pem")) - if err != nil { - t.Fatalf("ioutil.ReadFile(x509/server_ca_cert.pem) failed: %v", err) - } - roots := x509.NewCertPool() - if !roots.AppendCertsFromPEM(b) { - t.Fatal("failed to append certificates") - } - return credentials.NewTLS(&tls.Config{ - Certificates: []tls.Certificate{cert}, - RootCAs: roots, - ServerName: "x.test.example.com", - }) -} - -// setupManagement server performs the following: -// - spin up an xDS management server on a local port -// - set up certificates for consumption by the file_watcher plugin -// - creates a bootstrap file in a temporary location -// - creates an xDS resolver using the above bootstrap contents -// -// Returns the following: -// - management server -// - nodeID to be used by the client when connecting to the management server -// - bootstrap contents to be used by the client -// - xDS resolver builder to be used by the client -// - a cleanup function to be invoked at the end of the test -func setupManagementServer(t *testing.T) (*e2e.ManagementServer, string, []byte, resolver.Builder, func()) { - t.Helper() - - // Spin up an xDS management server on a local port. - server, err := e2e.StartManagementServer() - if err != nil { - t.Fatalf("Failed to spin up the xDS management server: %v", err) - } - defer func() { - if err != nil { - server.Stop() - } - }() - - // Create a directory to hold certs and key files used on the server side. - serverDir, err := createTmpDirWithFiles("testServerSideXDS*", "x509/server1_cert.pem", "x509/server1_key.pem", "x509/client_ca_cert.pem") - if err != nil { - server.Stop() - t.Fatal(err) - } - - // Create a directory to hold certs and key files used on the client side. - clientDir, err := createTmpDirWithFiles("testClientSideXDS*", "x509/client1_cert.pem", "x509/client1_key.pem", "x509/server_ca_cert.pem") - if err != nil { - server.Stop() - t.Fatal(err) - } - - // Create certificate providers section of the bootstrap config with entries - // for both the client and server sides. - cpc := map[string]json.RawMessage{ - e2e.ServerSideCertProviderInstance: e2e.DefaultFileWatcherConfig(path.Join(serverDir, certFile), path.Join(serverDir, keyFile), path.Join(serverDir, rootFile)), - e2e.ClientSideCertProviderInstance: e2e.DefaultFileWatcherConfig(path.Join(clientDir, certFile), path.Join(clientDir, keyFile), path.Join(clientDir, rootFile)), - } - - // Create a bootstrap file in a temporary directory. - nodeID := uuid.New().String() - bootstrapContents, err := xdsinternal.BootstrapContents(xdsinternal.BootstrapOptions{ - Version: xdsinternal.TransportV3, - NodeID: nodeID, - ServerURI: server.Address, - CertificateProviders: cpc, - ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate, - }) - if err != nil { - server.Stop() - t.Fatalf("Failed to create bootstrap file: %v", err) - } - resolver, err := xds.NewXDSResolverWithConfigForTesting(bootstrapContents) - if err != nil { - server.Stop() - t.Fatalf("Failed to create xDS resolver for testing: %v", err) - } - - return server, nodeID, bootstrapContents, resolver, func() { server.Stop() } -} diff --git a/xds/internal/xdsclient/xdsresource/filter_chain_test.go b/xds/internal/xdsclient/xdsresource/filter_chain_test.go index 71e537f29..c141619c5 100644 --- a/xds/internal/xdsclient/xdsresource/filter_chain_test.go +++ b/xds/internal/xdsclient/xdsresource/filter_chain_test.go @@ -40,9 +40,9 @@ import ( "google.golang.org/grpc/internal/envconfig" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/xds/internal/httpfilter" "google.golang.org/grpc/xds/internal/httpfilter/router" - "google.golang.org/grpc/xds/internal/testutils/e2e" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" ) diff --git a/xds/internal/xdsclient/xdsresource/unmarshal_lds_test.go b/xds/internal/xdsclient/xdsresource/unmarshal_lds_test.go index 9150d64df..f46cf3801 100644 --- a/xds/internal/xdsclient/xdsresource/unmarshal_lds_test.go +++ b/xds/internal/xdsclient/xdsresource/unmarshal_lds_test.go @@ -29,10 +29,8 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/xds/internal/httpfilter" - _ "google.golang.org/grpc/xds/internal/httpfilter/rbac" - _ "google.golang.org/grpc/xds/internal/httpfilter/router" - "google.golang.org/grpc/xds/internal/testutils/e2e" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" "google.golang.org/protobuf/types/known/durationpb" @@ -53,6 +51,9 @@ import ( anypb "github.com/golang/protobuf/ptypes/any" spb "github.com/golang/protobuf/ptypes/struct" wrapperspb "github.com/golang/protobuf/ptypes/wrappers" + + _ "google.golang.org/grpc/xds/internal/httpfilter/rbac" // Register the RBAC HTTP filter. + _ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter. ) func (s) TestUnmarshalListener_ClientSide(t *testing.T) { diff --git a/xds/server_test.go b/xds/server_test.go index ac0c573fd..4ad86879d 100644 --- a/xds/server_test.go +++ b/xds/server_test.go @@ -35,9 +35,9 @@ import ( "google.golang.org/grpc/credentials/xds" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" _ "google.golang.org/grpc/xds/internal/httpfilter/router" xdstestutils "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/e2e" "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" diff --git a/xds/xds.go b/xds/xds.go index 1b2b0c579..3ff3c76bc 100644 --- a/xds/xds.go +++ b/xds/xds.go @@ -32,17 +32,15 @@ import ( v3statusgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3" "google.golang.org/grpc" + _ "google.golang.org/grpc/credentials/tls/certprovider/pemfile" // Register the file watcher certificate provider plugin. internaladmin "google.golang.org/grpc/internal/admin" - "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/csds" - - _ "google.golang.org/grpc/credentials/tls/certprovider/pemfile" // Register the file watcher certificate provider plugin. _ "google.golang.org/grpc/xds/internal/balancer" // Register the balancers. _ "google.golang.org/grpc/xds/internal/clusterspecifier/rls" // Register the RLS cluster specifier plugin. Note that this does not register the RLS LB policy. _ "google.golang.org/grpc/xds/internal/httpfilter/fault" // Register the fault injection filter. _ "google.golang.org/grpc/xds/internal/httpfilter/rbac" // Register the RBAC filter. _ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter. - xdsresolver "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver. + _ "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver _ "google.golang.org/grpc/xds/internal/xdsclient/controller/version/v2" // Register the v2 xDS API client. _ "google.golang.org/grpc/xds/internal/xdsclient/controller/version/v3" // Register the v3 xDS API client. ) @@ -75,21 +73,3 @@ func init() { return csdss.Close, nil }) } - -// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using -// the provided xds bootstrap config instead of the global configuration from -// the supported environment variables. The resolver.Builder is meant to be -// used in conjunction with the grpc.WithResolvers DialOption. -// -// Testing Only -// -// This function should ONLY be used for testing and may not work with some -// other features, including the CSDS service. -// -// Experimental -// -// Notice: This API is EXPERIMENTAL and may be changed or removed in a -// later release. -func NewXDSResolverWithConfigForTesting(bootstrapConfig []byte) (resolver.Builder, error) { - return xdsresolver.NewBuilderForTesting(bootstrapConfig) -}