xds: add xDS transport custom dial options support (#7997)

This commit is contained in:
Yousuk Seung 2025-01-29 04:30:39 -08:00 committed by GitHub
parent 73e447014d
commit 59411f22d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 181 additions and 7 deletions

View File

@ -179,9 +179,9 @@ type ServerConfig struct {
// As part of unmarshalling the JSON config into this struct, we ensure that
// the credentials config is valid by building an instance of the specified
// credentials and store it here for easy access.
selectedCreds ChannelCreds
credsDialOption grpc.DialOption
dialerOption grpc.DialOption
selectedCreds ChannelCreds
credsDialOption grpc.DialOption
extraDialOptions []grpc.DialOption
cleanups []func()
}
@ -224,8 +224,8 @@ func (sc *ServerConfig) ServerFeaturesIgnoreResourceDeletion() bool {
// server.
func (sc *ServerConfig) DialOptions() []grpc.DialOption {
dopts := []grpc.DialOption{sc.credsDialOption}
if sc.dialerOption != nil {
dopts = append(dopts, sc.dialerOption)
if sc.extraDialOptions != nil {
dopts = append(dopts, sc.extraDialOptions...)
}
return dopts
}
@ -283,11 +283,18 @@ func (sc *ServerConfig) MarshalJSON() ([]byte, error) {
}
// dialer captures the Dialer method specified via the credentials bundle.
// Deprecated: use extradDialOptions. Will take precedence over this.
type dialer interface {
// Dialer specifies how to dial the xDS server.
Dialer(context.Context, string) (net.Conn, error)
}
// extraDialOptions captures custom dial options specified via
// credentials.Bundle.
type extraDialOptions interface {
DialOptions() []grpc.DialOption
}
// UnmarshalJSON takes the json data (a server) and unmarshals it to the struct.
func (sc *ServerConfig) UnmarshalJSON(data []byte) error {
server := serverConfigJSON{}
@ -311,8 +318,10 @@ func (sc *ServerConfig) UnmarshalJSON(data []byte) error {
}
sc.selectedCreds = cc
sc.credsDialOption = grpc.WithCredentialsBundle(bundle)
if d, ok := bundle.(dialer); ok {
sc.dialerOption = grpc.WithContextDialer(d.Dialer)
if d, ok := bundle.(extraDialOptions); ok {
sc.extraDialOptions = d.DialOptions()
} else if d, ok := bundle.(dialer); ok {
sc.extraDialOptions = []grpc.DialOption{grpc.WithContextDialer(d.Dialer)}
}
sc.cleanups = append(sc.cleanups, cancel)
break

View File

@ -0,0 +1,165 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsclient_test
import (
"context"
"encoding/json"
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
internalbootstrap "google.golang.org/grpc/internal/xds/bootstrap"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/bootstrap"
xci "google.golang.org/grpc/xds/internal/xdsclient/internal"
)
// nopDialOption is a no-op grpc.DialOption with a name.
type nopDialOption struct {
grpc.EmptyDialOption
name string
}
// testCredsBundle implements `credentials.Bundle` and `extraDialOptions`.
type testCredsBundle struct {
credentials.Bundle
testDialOptNames []string
}
func (t *testCredsBundle) DialOptions() []grpc.DialOption {
var opts []grpc.DialOption
for _, name := range t.testDialOptNames {
opts = append(opts, &nopDialOption{name: name})
}
return opts
}
type testCredsBuilder struct {
testDialOptNames []string
}
func (t *testCredsBuilder) Build(config json.RawMessage) (credentials.Bundle, func(), error) {
return &testCredsBundle{
Bundle: insecure.NewBundle(),
testDialOptNames: t.testDialOptNames,
}, func() {}, nil
}
func (t *testCredsBuilder) Name() string {
return "test_dialer_creds"
}
func (s) TestClientCustomDialOptsFromCredentialsBundle(t *testing.T) {
// Create and register the credentials bundle builder.
credsBuilder := &testCredsBuilder{
testDialOptNames: []string{"opt1", "opt2", "opt3"},
}
bootstrap.RegisterCredentials(credsBuilder)
// Start an xDS management server.
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
// Create bootstrap configuration pointing to the above management server.
nodeID := uuid.New().String()
bc, err := internalbootstrap.NewContentsForTesting(internalbootstrap.ConfigOptionsForTesting{
Servers: []byte(fmt.Sprintf(`[{
"server_uri": %q,
"channel_creds": [{
"type": %q,
"config": {"mgmt_server_address": %q}
}]
}]`, mgmtServer.Address, credsBuilder.Name(), mgmtServer.Address)),
Node: []byte(fmt.Sprintf(`{"id": "%s"}`, nodeID)),
})
if err != nil {
t.Fatalf("Failed to create bootstrap configuration: %v", err)
}
// Create an xDS resolver with the above bootstrap configuration.
var resolverBuilder resolver.Builder
if newResolver := internal.NewXDSResolverWithConfigForTesting; newResolver != nil {
resolverBuilder, err = newResolver.(func([]byte) (resolver.Builder, error))(bc)
if err != nil {
t.Fatalf("Failed to create xDS resolver for testing: %v", err)
}
}
// Spin up a test backend.
server := stubserver.StartTestService(t, nil)
defer server.Stop()
// Configure client side xDS resources on the management server.
const serviceName = "my-service-client-side-xds"
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: nodeID,
Host: "localhost",
Port: testutils.ParsePort(t, server.Address),
SecLevel: e2e.SecurityLevelNone,
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Intercept a grpc.NewClient call from the xds client to validate DialOptions.
xci.GRPCNewClient = func(target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
got := map[string]int{}
for _, opt := range opts {
if mo, ok := opt.(*nopDialOption); ok {
got[mo.name]++
}
}
want := map[string]int{}
for _, name := range credsBuilder.testDialOptNames {
want[name]++
}
if !cmp.Equal(got, want) {
t.Errorf("grpc.NewClient() was called with unexpected DialOptions: got %v, want %v", got, want)
}
return grpc.NewClient(target, opts...)
}
defer func() { xci.GRPCNewClient = grpc.NewClient }()
// Create a ClientConn and make a successful RPC. The insecure transport
// credentials passed into the gRPC.NewClient is the credentials for the
// data plane communication with the test backend.
cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolverBuilder))
if err != nil {
t.Fatalf("Failed to dial local test server: %v", err)
}
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}
cc.Close()
}