From 59411f22d9e308e057d1d41f861a716264065be1 Mon Sep 17 00:00:00 2001 From: Yousuk Seung Date: Wed, 29 Jan 2025 04:30:39 -0800 Subject: [PATCH] xds: add xDS transport custom dial options support (#7997) --- internal/xds/bootstrap/bootstrap.go | 23 ++- .../tests/client_custom_dialopts_test.go | 165 ++++++++++++++++++ 2 files changed, 181 insertions(+), 7 deletions(-) create mode 100644 xds/internal/xdsclient/tests/client_custom_dialopts_test.go diff --git a/internal/xds/bootstrap/bootstrap.go b/internal/xds/bootstrap/bootstrap.go index 35aeea701..f3211c1e9 100644 --- a/internal/xds/bootstrap/bootstrap.go +++ b/internal/xds/bootstrap/bootstrap.go @@ -179,9 +179,9 @@ type ServerConfig struct { // As part of unmarshalling the JSON config into this struct, we ensure that // the credentials config is valid by building an instance of the specified // credentials and store it here for easy access. - selectedCreds ChannelCreds - credsDialOption grpc.DialOption - dialerOption grpc.DialOption + selectedCreds ChannelCreds + credsDialOption grpc.DialOption + extraDialOptions []grpc.DialOption cleanups []func() } @@ -224,8 +224,8 @@ func (sc *ServerConfig) ServerFeaturesIgnoreResourceDeletion() bool { // server. func (sc *ServerConfig) DialOptions() []grpc.DialOption { dopts := []grpc.DialOption{sc.credsDialOption} - if sc.dialerOption != nil { - dopts = append(dopts, sc.dialerOption) + if sc.extraDialOptions != nil { + dopts = append(dopts, sc.extraDialOptions...) } return dopts } @@ -283,11 +283,18 @@ func (sc *ServerConfig) MarshalJSON() ([]byte, error) { } // dialer captures the Dialer method specified via the credentials bundle. +// Deprecated: use extradDialOptions. Will take precedence over this. type dialer interface { // Dialer specifies how to dial the xDS server. Dialer(context.Context, string) (net.Conn, error) } +// extraDialOptions captures custom dial options specified via +// credentials.Bundle. +type extraDialOptions interface { + DialOptions() []grpc.DialOption +} + // UnmarshalJSON takes the json data (a server) and unmarshals it to the struct. func (sc *ServerConfig) UnmarshalJSON(data []byte) error { server := serverConfigJSON{} @@ -311,8 +318,10 @@ func (sc *ServerConfig) UnmarshalJSON(data []byte) error { } sc.selectedCreds = cc sc.credsDialOption = grpc.WithCredentialsBundle(bundle) - if d, ok := bundle.(dialer); ok { - sc.dialerOption = grpc.WithContextDialer(d.Dialer) + if d, ok := bundle.(extraDialOptions); ok { + sc.extraDialOptions = d.DialOptions() + } else if d, ok := bundle.(dialer); ok { + sc.extraDialOptions = []grpc.DialOption{grpc.WithContextDialer(d.Dialer)} } sc.cleanups = append(sc.cleanups, cancel) break diff --git a/xds/internal/xdsclient/tests/client_custom_dialopts_test.go b/xds/internal/xdsclient/tests/client_custom_dialopts_test.go new file mode 100644 index 000000000..268bc79ea --- /dev/null +++ b/xds/internal/xdsclient/tests/client_custom_dialopts_test.go @@ -0,0 +1,165 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsclient_test + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/e2e" + internalbootstrap "google.golang.org/grpc/internal/xds/bootstrap" + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/xds/bootstrap" + xci "google.golang.org/grpc/xds/internal/xdsclient/internal" +) + +// nopDialOption is a no-op grpc.DialOption with a name. +type nopDialOption struct { + grpc.EmptyDialOption + name string +} + +// testCredsBundle implements `credentials.Bundle` and `extraDialOptions`. +type testCredsBundle struct { + credentials.Bundle + testDialOptNames []string +} + +func (t *testCredsBundle) DialOptions() []grpc.DialOption { + var opts []grpc.DialOption + for _, name := range t.testDialOptNames { + opts = append(opts, &nopDialOption{name: name}) + } + return opts +} + +type testCredsBuilder struct { + testDialOptNames []string +} + +func (t *testCredsBuilder) Build(config json.RawMessage) (credentials.Bundle, func(), error) { + return &testCredsBundle{ + Bundle: insecure.NewBundle(), + testDialOptNames: t.testDialOptNames, + }, func() {}, nil +} + +func (t *testCredsBuilder) Name() string { + return "test_dialer_creds" +} + +func (s) TestClientCustomDialOptsFromCredentialsBundle(t *testing.T) { + // Create and register the credentials bundle builder. + credsBuilder := &testCredsBuilder{ + testDialOptNames: []string{"opt1", "opt2", "opt3"}, + } + bootstrap.RegisterCredentials(credsBuilder) + + // Start an xDS management server. + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) + + // Create bootstrap configuration pointing to the above management server. + nodeID := uuid.New().String() + bc, err := internalbootstrap.NewContentsForTesting(internalbootstrap.ConfigOptionsForTesting{ + Servers: []byte(fmt.Sprintf(`[{ + "server_uri": %q, + "channel_creds": [{ + "type": %q, + "config": {"mgmt_server_address": %q} + }] + }]`, mgmtServer.Address, credsBuilder.Name(), mgmtServer.Address)), + Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)), + }) + if err != nil { + t.Fatalf("Failed to create bootstrap configuration: %v", err) + } + + // Create an xDS resolver with the above bootstrap configuration. + var resolverBuilder resolver.Builder + if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil { + resolverBuilder, err = newResolver.(func([]byte) (resolver.Builder, error))(bc) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + } + + // Spin up a test backend. + server := stubserver.StartTestService(t, nil) + defer server.Stop() + + // Configure client side xDS resources on the management server. + const serviceName = "my-service-client-side-xds" + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: "localhost", + Port: testutils.ParsePort(t, server.Address), + SecLevel: e2e.SecurityLevelNone, + }) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Intercept a grpc.NewClient call from the xds client to validate DialOptions. + xci.GRPCNewClient = func(target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { + got := map[string]int{} + for _, opt := range opts { + if mo, ok := opt.(*nopDialOption); ok { + got[mo.name]++ + } + } + want := map[string]int{} + for _, name := range credsBuilder.testDialOptNames { + want[name]++ + } + if !cmp.Equal(got, want) { + t.Errorf("grpc.NewClient() was called with unexpected DialOptions: got %v, want %v", got, want) + } + return grpc.NewClient(target, opts...) + } + defer func() { xci.GRPCNewClient = grpc.NewClient }() + + // Create a ClientConn and make a successful RPC. The insecure transport + // credentials passed into the gRPC.NewClient is the credentials for the + // data plane communication with the test backend. + cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder)) + if err != nil { + t.Fatalf("Failed to dial local test server: %v", err) + } + + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + cc.Close() +}