mirror of https://github.com/grpc/grpc-go.git
xds: add test-only injection of xds config to client and server (#4476)
This commit is contained in:
parent
e5cad3dcff
commit
3508452162
|
@ -65,11 +65,32 @@ type BootstrapOptions struct {
|
|||
// completed successfully. It is the responsibility of the caller to invoke the
|
||||
// cleanup function at the end of the test.
|
||||
func SetupBootstrapFile(opts BootstrapOptions) (func(), error) {
|
||||
bootstrapContents, err := BootstrapContents(opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f, err := ioutil.TempFile("", "test_xds_bootstrap_*")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to created bootstrap file: %v", err)
|
||||
}
|
||||
|
||||
if err := ioutil.WriteFile(f.Name(), bootstrapContents, 0644); err != nil {
|
||||
return nil, fmt.Errorf("failed to created bootstrap file: %v", err)
|
||||
}
|
||||
logger.Infof("Created bootstrap file at %q with contents: %s\n", f.Name(), bootstrapContents)
|
||||
|
||||
origBootstrapFileName := env.BootstrapFileName
|
||||
env.BootstrapFileName = f.Name()
|
||||
return func() {
|
||||
os.Remove(f.Name())
|
||||
env.BootstrapFileName = origBootstrapFileName
|
||||
}, nil
|
||||
}
|
||||
|
||||
// BootstrapContents returns the contents to go into a bootstrap file,
|
||||
// environment, or configuration passed to
|
||||
// xds.NewXDSResolverWithConfigForTesting.
|
||||
func BootstrapContents(opts BootstrapOptions) ([]byte, error) {
|
||||
cfg := &bootstrapConfig{
|
||||
XdsServers: []server{
|
||||
{
|
||||
|
@ -100,17 +121,7 @@ func SetupBootstrapFile(opts BootstrapOptions) (func(), error) {
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to created bootstrap file: %v", err)
|
||||
}
|
||||
if err := ioutil.WriteFile(f.Name(), bootstrapContents, 0644); err != nil {
|
||||
return nil, fmt.Errorf("failed to created bootstrap file: %v", err)
|
||||
}
|
||||
logger.Infof("Created bootstrap file at %q with contents: %s\n", f.Name(), bootstrapContents)
|
||||
|
||||
origBootstrapFileName := env.BootstrapFileName
|
||||
env.BootstrapFileName = f.Name()
|
||||
return func() {
|
||||
os.Remove(f.Name())
|
||||
env.BootstrapFileName = origBootstrapFileName
|
||||
}, nil
|
||||
return bootstrapContents, nil
|
||||
}
|
||||
|
||||
type bootstrapConfig struct {
|
||||
|
|
|
@ -59,7 +59,7 @@ var (
|
|||
// not deal with subConns.
|
||||
return builder.Build(cc, opts), nil
|
||||
}
|
||||
newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
|
||||
newXDSClient func() (xdsClientInterface, error)
|
||||
buildProvider = buildProviderFunc
|
||||
)
|
||||
|
||||
|
@ -86,12 +86,15 @@ func (cdsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
|
|||
b.logger = prefixLogger((b))
|
||||
b.logger.Infof("Created")
|
||||
|
||||
client, err := newXDSClient()
|
||||
if err != nil {
|
||||
b.logger.Errorf("failed to create xds-client: %v", err)
|
||||
return nil
|
||||
if newXDSClient != nil {
|
||||
// For tests
|
||||
client, err := newXDSClient()
|
||||
if err != nil {
|
||||
b.logger.Errorf("failed to create xds-client: %v", err)
|
||||
return nil
|
||||
}
|
||||
b.xdsClient = client
|
||||
}
|
||||
b.xdsClient = client
|
||||
|
||||
var creds credentials.TransportCredentials
|
||||
switch {
|
||||
|
@ -359,7 +362,15 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) {
|
|||
lbCfg.LrsLoadReportingServerName = new(string)
|
||||
|
||||
}
|
||||
resolverState := resolver.State{}
|
||||
// Include the xds client for the child LB policies to use. For unit
|
||||
// tests, b.xdsClient may not be a full *xdsclient.Client, but it will
|
||||
// always be in production.
|
||||
if c, ok := b.xdsClient.(*xdsclient.Client); ok {
|
||||
resolverState = xdsclient.SetClient(resolverState, c)
|
||||
}
|
||||
ccState := balancer.ClientConnState{
|
||||
ResolverState: resolverState,
|
||||
BalancerConfig: lbCfg,
|
||||
}
|
||||
if err := b.edsLB.UpdateClientConnState(ccState); err != nil {
|
||||
|
@ -397,7 +408,9 @@ func (b *cdsBalancer) run() {
|
|||
b.edsLB.Close()
|
||||
b.edsLB = nil
|
||||
}
|
||||
b.xdsClient.Close()
|
||||
if newXDSClient != nil {
|
||||
b.xdsClient.Close()
|
||||
}
|
||||
if b.cachedRoot != nil {
|
||||
b.cachedRoot.Close()
|
||||
}
|
||||
|
@ -468,6 +481,14 @@ func (b *cdsBalancer) UpdateClientConnState(state balancer.ClientConnState) erro
|
|||
return errBalancerClosed
|
||||
}
|
||||
|
||||
if b.xdsClient == nil {
|
||||
c := xdsclient.FromResolverState(state.ResolverState)
|
||||
if c == nil {
|
||||
return balancer.ErrBadResolverState
|
||||
}
|
||||
b.xdsClient = c
|
||||
}
|
||||
|
||||
b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(state.BalancerConfig))
|
||||
// The errors checked here should ideally never happen because the
|
||||
// ServiceConfig in this case is prepared by the xdsResolver and is not
|
||||
|
|
|
@ -52,7 +52,7 @@ func init() {
|
|||
balancer.Register(clusterImplBB{})
|
||||
}
|
||||
|
||||
var newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
|
||||
var newXDSClient func() (xdsClientInterface, error)
|
||||
|
||||
type clusterImplBB struct{}
|
||||
|
||||
|
@ -61,18 +61,22 @@ func (clusterImplBB) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions)
|
|||
ClientConn: cc,
|
||||
bOpts: bOpts,
|
||||
closed: grpcsync.NewEvent(),
|
||||
done: grpcsync.NewEvent(),
|
||||
loadWrapper: loadstore.NewWrapper(),
|
||||
pickerUpdateCh: buffer.NewUnbounded(),
|
||||
requestCountMax: defaultRequestCountMax,
|
||||
}
|
||||
b.logger = prefixLogger(b)
|
||||
|
||||
client, err := newXDSClient()
|
||||
if err != nil {
|
||||
b.logger.Errorf("failed to create xds-client: %v", err)
|
||||
return nil
|
||||
if newXDSClient != nil {
|
||||
// For tests
|
||||
client, err := newXDSClient()
|
||||
if err != nil {
|
||||
b.logger.Errorf("failed to create xds-client: %v", err)
|
||||
return nil
|
||||
}
|
||||
b.xdsC = client
|
||||
}
|
||||
b.xdsC = client
|
||||
go b.run()
|
||||
|
||||
b.logger.Infof("Created")
|
||||
|
@ -107,6 +111,7 @@ type clusterImplBalancer struct {
|
|||
// synchronized with Close().
|
||||
mu sync.Mutex
|
||||
closed *grpcsync.Event
|
||||
done *grpcsync.Event
|
||||
|
||||
bOpts balancer.BuildOptions
|
||||
logger *grpclog.PrefixLogger
|
||||
|
@ -204,6 +209,14 @@ func (cib *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState
|
|||
return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
|
||||
}
|
||||
|
||||
if cib.xdsC == nil {
|
||||
c := xdsclient.FromResolverState(s.ResolverState)
|
||||
if c == nil {
|
||||
return balancer.ErrBadResolverState
|
||||
}
|
||||
cib.xdsC = c
|
||||
}
|
||||
|
||||
// Update load reporting config. This needs to be done before updating the
|
||||
// child policy because we need the loadStore from the updated client to be
|
||||
// passed to the ccWrapper, so that the next picker from the child policy
|
||||
|
@ -315,7 +328,10 @@ func (cib *clusterImplBalancer) Close() {
|
|||
cib.childLB.Close()
|
||||
cib.childLB = nil
|
||||
}
|
||||
cib.xdsC.Close()
|
||||
if newXDSClient != nil {
|
||||
cib.xdsC.Close()
|
||||
}
|
||||
<-cib.done.Done()
|
||||
cib.logger.Infof("Shutdown")
|
||||
}
|
||||
|
||||
|
@ -363,6 +379,7 @@ type dropConfigs struct {
|
|||
}
|
||||
|
||||
func (cib *clusterImplBalancer) run() {
|
||||
defer cib.done.Fire()
|
||||
for {
|
||||
select {
|
||||
case update := <-cib.pickerUpdateCh.Get():
|
||||
|
|
|
@ -53,7 +53,7 @@ var (
|
|||
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions, enqueueState func(priorityType, balancer.State), lw load.PerClusterReporter, logger *grpclog.PrefixLogger) edsBalancerImplInterface {
|
||||
return newEDSBalancerImpl(cc, opts, enqueueState, lw, logger)
|
||||
}
|
||||
newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
|
||||
newXDSClient func() (xdsClientInterface, error)
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -76,13 +76,16 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp
|
|||
}
|
||||
x.logger = prefixLogger(x)
|
||||
|
||||
client, err := newXDSClient()
|
||||
if err != nil {
|
||||
x.logger.Errorf("xds: failed to create xds-client: %v", err)
|
||||
return nil
|
||||
if newXDSClient != nil {
|
||||
// For tests
|
||||
client, err := newXDSClient()
|
||||
if err != nil {
|
||||
x.logger.Errorf("xds: failed to create xds-client: %v", err)
|
||||
return nil
|
||||
}
|
||||
x.xdsClient = client
|
||||
}
|
||||
|
||||
x.xdsClient = client
|
||||
x.edsImpl = newEDSBalancer(x.cc, opts, x.enqueueChildBalancerState, x.loadWrapper, x.logger)
|
||||
x.logger.Infof("Created")
|
||||
go x.run()
|
||||
|
@ -172,7 +175,9 @@ func (x *edsBalancer) run() {
|
|||
x.edsImpl.updateState(u.priority, u.s)
|
||||
case <-x.closed.Done():
|
||||
x.cancelWatch()
|
||||
x.xdsClient.Close()
|
||||
if newXDSClient != nil {
|
||||
x.xdsClient.Close()
|
||||
}
|
||||
x.edsImpl.close()
|
||||
x.logger.Infof("Shutdown")
|
||||
x.done.Fire()
|
||||
|
@ -380,6 +385,14 @@ func (x *edsBalancer) ResolverError(err error) {
|
|||
}
|
||||
|
||||
func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
|
||||
if x.xdsClient == nil {
|
||||
c := xdsclient.FromResolverState(s.ResolverState)
|
||||
if c == nil {
|
||||
return balancer.ErrBadResolverState
|
||||
}
|
||||
x.xdsClient = c
|
||||
}
|
||||
|
||||
select {
|
||||
case x.grpcUpdate <- &s:
|
||||
case <-x.closed.Done():
|
||||
|
|
|
@ -36,7 +36,7 @@ func init() {
|
|||
balancer.Register(&lrsBB{})
|
||||
}
|
||||
|
||||
var newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
|
||||
var newXDSClient func() (xdsClientInterface, error)
|
||||
|
||||
// Name is the name of the LRS balancer.
|
||||
const Name = "lrs_experimental"
|
||||
|
@ -51,12 +51,15 @@ func (l *lrsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balanc
|
|||
b.logger = prefixLogger(b)
|
||||
b.logger.Infof("Created")
|
||||
|
||||
client, err := newXDSClient()
|
||||
if err != nil {
|
||||
b.logger.Errorf("failed to create xds-client: %v", err)
|
||||
return nil
|
||||
if newXDSClient != nil {
|
||||
// For tests
|
||||
client, err := newXDSClient()
|
||||
if err != nil {
|
||||
b.logger.Errorf("failed to create xds-client: %v", err)
|
||||
return nil
|
||||
}
|
||||
b.client = newXDSClientWrapper(client)
|
||||
}
|
||||
b.client = newXDSClientWrapper(client)
|
||||
|
||||
return b
|
||||
}
|
||||
|
@ -87,6 +90,14 @@ func (b *lrsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
|
|||
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
|
||||
}
|
||||
|
||||
if b.client == nil {
|
||||
c := xdsclient.FromResolverState(s.ResolverState)
|
||||
if c == nil {
|
||||
return balancer.ErrBadResolverState
|
||||
}
|
||||
b.client = newXDSClientWrapper(c)
|
||||
}
|
||||
|
||||
// Update load reporting config or xds client. This needs to be done before
|
||||
// updating the child policy because we need the loadStore from the updated
|
||||
// client to be passed to the ccWrapper.
|
||||
|
@ -245,5 +256,7 @@ func (w *xdsClientWrapper) close() {
|
|||
w.cancelLoadReport()
|
||||
w.cancelLoadReport = nil
|
||||
}
|
||||
w.c.Close()
|
||||
if newXDSClient != nil {
|
||||
w.c.Close()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Copyright 2021 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package client
|
||||
|
||||
import "google.golang.org/grpc/resolver"
|
||||
|
||||
type clientKeyType string
|
||||
|
||||
const clientKey = clientKeyType("grpc.xds.internal.client.Client")
|
||||
|
||||
// FromResolverState returns the Client from state, or nil if not present.
|
||||
func FromResolverState(state resolver.State) *Client {
|
||||
cs, _ := state.Attributes.Value(clientKey).(*Client)
|
||||
return cs
|
||||
}
|
||||
|
||||
// SetClient sets c in state and returns the new state.
|
||||
func SetClient(state resolver.State, c *Client) resolver.State {
|
||||
state.Attributes = state.Attributes.WithValues(clientKey, c)
|
||||
return state
|
||||
}
|
|
@ -160,13 +160,19 @@ func bootstrapConfigFromEnvVariable() ([]byte, error) {
|
|||
// fields left unspecified, in which case the caller should use some sane
|
||||
// defaults.
|
||||
func NewConfig() (*Config, error) {
|
||||
config := &Config{}
|
||||
|
||||
data, err := bootstrapConfigFromEnvVariable()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("xds: Failed to read bootstrap config: %v", err)
|
||||
}
|
||||
logger.Debugf("Bootstrap content: %s", data)
|
||||
return NewConfigFromContents(data)
|
||||
}
|
||||
|
||||
// NewConfigFromContents returns a new Config using the specified bootstrap
|
||||
// file contents instead of reading the environment variable. This is only
|
||||
// suitable for testing purposes.
|
||||
func NewConfigFromContents(data []byte) (*Config, error) {
|
||||
config := &Config{}
|
||||
|
||||
var jsonData map[string]json.RawMessage
|
||||
if err := json.Unmarshal(data, &jsonData); err != nil {
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -92,8 +94,8 @@ func New() (*Client, error) {
|
|||
// singleton. The following calls will return the singleton xds client without
|
||||
// checking or using the config.
|
||||
//
|
||||
// This function is internal only, for c2p resolver to use. DO NOT use this
|
||||
// elsewhere. Use New() instead.
|
||||
// This function is internal only, for c2p resolver and testing to use. DO NOT
|
||||
// use this elsewhere. Use New() instead.
|
||||
func NewWithConfig(config *bootstrap.Config) (*Client, error) {
|
||||
singletonClient.mu.Lock()
|
||||
defer singletonClient.mu.Unlock()
|
||||
|
@ -141,3 +143,49 @@ func NewWithConfigForTesting(config *bootstrap.Config, watchExpiryTimeout time.D
|
|||
}
|
||||
return &Client{clientImpl: cl, refCount: 1}, nil
|
||||
}
|
||||
|
||||
// NewClientWithBootstrapContents returns an xds client for this config,
|
||||
// separate from the global singleton. This should be used for testing
|
||||
// purposes only.
|
||||
func NewClientWithBootstrapContents(contents []byte) (*Client, error) {
|
||||
// Normalize the contents
|
||||
buf := bytes.Buffer{}
|
||||
err := json.Indent(&buf, contents, "", "")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("xds: error normalizing JSON: %v", err)
|
||||
}
|
||||
contents = bytes.TrimSpace(buf.Bytes())
|
||||
|
||||
clientsMu.Lock()
|
||||
defer clientsMu.Unlock()
|
||||
if c := clients[string(contents)]; c != nil {
|
||||
c.mu.Lock()
|
||||
// Since we don't remove the *Client from the map when it is closed, we
|
||||
// need to recreate the impl if the ref count dropped to zero.
|
||||
if c.refCount > 0 {
|
||||
c.refCount++
|
||||
c.mu.Unlock()
|
||||
return c, nil
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
bcfg, err := bootstrap.NewConfigFromContents(contents)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("xds: error with bootstrap config: %v", err)
|
||||
}
|
||||
|
||||
cImpl, err := newWithConfig(bcfg, defaultWatchExpiryTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := &Client{clientImpl: cImpl, refCount: 1}
|
||||
clients[string(contents)] = c
|
||||
return c, nil
|
||||
}
|
||||
|
||||
var (
|
||||
clients = map[string]*Client{}
|
||||
clientsMu sync.Mutex
|
||||
)
|
||||
|
|
|
@ -36,6 +36,17 @@ import (
|
|||
|
||||
const xdsScheme = "xds"
|
||||
|
||||
// NewBuilder creates a new xds resolver builder using a specific xds bootstrap
|
||||
// config, so tests can use multiple xds clients in different ClientConns at
|
||||
// the same time.
|
||||
func NewBuilder(config []byte) (resolver.Builder, error) {
|
||||
return &xdsResolverBuilder{
|
||||
newXDSClient: func() (xdsClientInterface, error) {
|
||||
return xdsclient.NewClientWithBootstrapContents(config)
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// For overriding in unittests.
|
||||
var newXDSClient = func() (xdsClientInterface, error) { return xdsclient.New() }
|
||||
|
||||
|
@ -43,7 +54,9 @@ func init() {
|
|||
resolver.Register(&xdsResolverBuilder{})
|
||||
}
|
||||
|
||||
type xdsResolverBuilder struct{}
|
||||
type xdsResolverBuilder struct {
|
||||
newXDSClient func() (xdsClientInterface, error)
|
||||
}
|
||||
|
||||
// Build helps implement the resolver.Builder interface.
|
||||
//
|
||||
|
@ -60,6 +73,11 @@ func (b *xdsResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, op
|
|||
r.logger = prefixLogger((r))
|
||||
r.logger.Infof("Creating resolver for target: %+v", t)
|
||||
|
||||
newXDSClient := newXDSClient
|
||||
if b.newXDSClient != nil {
|
||||
newXDSClient = b.newXDSClient
|
||||
}
|
||||
|
||||
client, err := newXDSClient()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("xds: failed to create xds-client: %v", err)
|
||||
|
@ -178,6 +196,13 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
|
|||
state := iresolver.SetConfigSelector(resolver.State{
|
||||
ServiceConfig: r.cc.ParseServiceConfig(string(sc)),
|
||||
}, cs)
|
||||
|
||||
// Include the xds client for the LB policies to use. For unit tests,
|
||||
// r.client may not be a full *xdsclient.Client, but it will always be in
|
||||
// production.
|
||||
if c, ok := r.client.(*xdsclient.Client); ok {
|
||||
state = xdsclient.SetClient(state, c)
|
||||
}
|
||||
r.cc.UpdateState(state)
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ func (s) TestClientSideXDS(t *testing.T) {
|
|||
port, cleanup := clientSetup(t)
|
||||
defer cleanup()
|
||||
|
||||
serviceName := xdsServiceName + "-client-side-xds"
|
||||
const serviceName = "my-service-client-side-xds"
|
||||
resources := e2e.DefaultClientResources(e2e.ResourceParams{
|
||||
DialTarget: serviceName,
|
||||
NodeID: xdsClientNodeID,
|
||||
|
@ -81,7 +81,7 @@ func (s) TestClientSideXDS(t *testing.T) {
|
|||
}
|
||||
|
||||
// Create a ClientConn and make a successful RPC.
|
||||
cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolverBuilder))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to dial local test server: %v", err)
|
||||
}
|
||||
|
|
|
@ -40,7 +40,9 @@ import (
|
|||
"google.golang.org/grpc/internal/grpctest"
|
||||
"google.golang.org/grpc/internal/leakcheck"
|
||||
"google.golang.org/grpc/internal/xds/env"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/testdata"
|
||||
"google.golang.org/grpc/xds"
|
||||
"google.golang.org/grpc/xds/internal/testutils/e2e"
|
||||
|
||||
xdsinternal "google.golang.org/grpc/internal/xds"
|
||||
|
@ -71,8 +73,10 @@ func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, er
|
|||
var (
|
||||
// Globals corresponding to the single instance of the xDS management server
|
||||
// which is spawned for all the tests in this package.
|
||||
managementServer *e2e.ManagementServer
|
||||
xdsClientNodeID string
|
||||
managementServer *e2e.ManagementServer
|
||||
xdsClientNodeID string
|
||||
bootstrapContents []byte
|
||||
xdsResolverBuilder resolver.Builder
|
||||
)
|
||||
|
||||
// TestMain sets up an xDS management server, runs all tests, and stops the
|
||||
|
@ -158,30 +162,33 @@ func createClientTLSCredentials(t *testing.T) credentials.TransportCredentials {
|
|||
// - sets up the global variables which refer to this management server and the
|
||||
// nodeID to be used when talking to this management server.
|
||||
//
|
||||
// Returns a function to be invoked by the caller to stop the management server.
|
||||
func setupManagementServer() (func(), error) {
|
||||
// Returns a function to be invoked by the caller to stop the management
|
||||
// server.
|
||||
func setupManagementServer() (cleanup func(), err error) {
|
||||
// Turn on the env var protection for client-side security.
|
||||
origClientSideSecurityEnvVar := env.ClientSideSecuritySupport
|
||||
env.ClientSideSecuritySupport = true
|
||||
|
||||
// Spin up an xDS management server on a local port.
|
||||
var err error
|
||||
managementServer, err = e2e.StartManagementServer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
managementServer.Stop()
|
||||
}
|
||||
}()
|
||||
|
||||
// Create a directory to hold certs and key files used on the server side.
|
||||
serverDir, err := createTmpDirWithFiles("testServerSideXDS*", "x509/server1_cert.pem", "x509/server1_key.pem", "x509/client_ca_cert.pem")
|
||||
if err != nil {
|
||||
managementServer.Stop()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create a directory to hold certs and key files used on the client side.
|
||||
clientDir, err := createTmpDirWithFiles("testClientSideXDS*", "x509/client1_cert.pem", "x509/client1_key.pem", "x509/server_ca_cert.pem")
|
||||
if err != nil {
|
||||
managementServer.Stop()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -194,7 +201,7 @@ func setupManagementServer() (func(), error) {
|
|||
|
||||
// Create a bootstrap file in a temporary directory.
|
||||
xdsClientNodeID = uuid.New().String()
|
||||
bootstrapCleanup, err := xdsinternal.SetupBootstrapFile(xdsinternal.BootstrapOptions{
|
||||
bootstrapContents, err = xdsinternal.BootstrapContents(xdsinternal.BootstrapOptions{
|
||||
Version: xdsinternal.TransportV3,
|
||||
NodeID: xdsClientNodeID,
|
||||
ServerURI: managementServer.Address,
|
||||
|
@ -202,13 +209,15 @@ func setupManagementServer() (func(), error) {
|
|||
ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate,
|
||||
})
|
||||
if err != nil {
|
||||
managementServer.Stop()
|
||||
return nil, err
|
||||
}
|
||||
xdsResolverBuilder, err = xds.NewXDSResolverWithConfigForTesting(bootstrapContents)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return func() {
|
||||
managementServer.Stop()
|
||||
bootstrapCleanup()
|
||||
env.ClientSideSecuritySupport = origClientSideSecurityEnvVar
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -46,8 +46,6 @@ const (
|
|||
certFile = "cert.pem"
|
||||
keyFile = "key.pem"
|
||||
rootFile = "ca.pem"
|
||||
|
||||
xdsServiceName = "my-service"
|
||||
)
|
||||
|
||||
// setupGRPCServer performs the following:
|
||||
|
@ -70,7 +68,7 @@ func setupGRPCServer(t *testing.T) (net.Listener, func()) {
|
|||
}
|
||||
|
||||
// Initialize an xDS-enabled gRPC server and register the stubServer on it.
|
||||
server := xds.NewGRPCServer(grpc.Creds(creds))
|
||||
server := xds.NewGRPCServer(grpc.Creds(creds), xds.BootstrapContentsForTesting(bootstrapContents))
|
||||
testpb.RegisterTestServiceServer(server, &testService{})
|
||||
|
||||
// Create a local listener and pass it to Serve().
|
||||
|
@ -123,7 +121,7 @@ func (s) TestServerSideXDS_Fallback(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("failed to retrieve host and port of server: %v", err)
|
||||
}
|
||||
serviceName := xdsServiceName + "-fallback"
|
||||
const serviceName = "my-service-fallback"
|
||||
resources := e2e.DefaultClientResources(e2e.ResourceParams{
|
||||
DialTarget: serviceName,
|
||||
NodeID: xdsClientNodeID,
|
||||
|
@ -154,7 +152,7 @@ func (s) TestServerSideXDS_Fallback(t *testing.T) {
|
|||
// Create a ClientConn with the xds scheme and make a successful RPC.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds))
|
||||
cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(xdsResolverBuilder))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to dial local test server: %v", err)
|
||||
}
|
||||
|
@ -205,7 +203,7 @@ func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) {
|
|||
// Create xDS resources to be consumed on the client side. This
|
||||
// includes the listener, route configuration, cluster (with
|
||||
// security configuration) and endpoint resources.
|
||||
serviceName := xdsServiceName + "-file-watcher-certs-" + test.name
|
||||
serviceName := "my-service-file-watcher-certs-" + test.name
|
||||
resources := e2e.DefaultClientResources(e2e.ResourceParams{
|
||||
DialTarget: serviceName,
|
||||
NodeID: xdsClientNodeID,
|
||||
|
@ -236,7 +234,7 @@ func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) {
|
|||
// Create a ClientConn with the xds scheme and make an RPC.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds))
|
||||
cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(xdsResolverBuilder))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to dial local test server: %v", err)
|
||||
}
|
||||
|
@ -270,7 +268,7 @@ func (s) TestServerSideXDS_SecurityConfigChange(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("failed to retrieve host and port of server: %v", err)
|
||||
}
|
||||
serviceName := xdsServiceName + "-security-config-change"
|
||||
const serviceName = "my-service-security-config-change"
|
||||
resources := e2e.DefaultClientResources(e2e.ResourceParams{
|
||||
DialTarget: serviceName,
|
||||
NodeID: xdsClientNodeID,
|
||||
|
@ -301,7 +299,7 @@ func (s) TestServerSideXDS_SecurityConfigChange(t *testing.T) {
|
|||
// Create a ClientConn with the xds scheme and make a successful RPC.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
xdsCC, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(xdsCreds))
|
||||
xdsCC, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(xdsCreds), grpc.WithResolvers(xdsResolverBuilder))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to dial local test server: %v", err)
|
||||
}
|
||||
|
|
|
@ -105,7 +105,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
|
|||
})
|
||||
|
||||
// Initialize an xDS-enabled gRPC server and register the stubServer on it.
|
||||
server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt)
|
||||
server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
|
||||
defer server.Stop()
|
||||
testpb.RegisterTestServiceServer(server, &testService{})
|
||||
|
||||
|
|
|
@ -131,8 +131,8 @@ func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
|
|||
func handleServerOptions(opts []grpc.ServerOption) *serverOptions {
|
||||
so := &serverOptions{}
|
||||
for _, opt := range opts {
|
||||
if o, ok := opt.(serverOption); ok {
|
||||
o.applyServerOption(so)
|
||||
if o, ok := opt.(*serverOption); ok {
|
||||
o.apply(so)
|
||||
}
|
||||
}
|
||||
return so
|
||||
|
@ -154,6 +154,12 @@ func (s *GRPCServer) initXDSClient() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
newXDSClient := newXDSClient
|
||||
if s.opts.bootstrapContents != nil {
|
||||
newXDSClient = func() (xdsClientInterface, error) {
|
||||
return xdsclient.NewClientWithBootstrapContents(s.opts.bootstrapContents)
|
||||
}
|
||||
}
|
||||
client, err := newXDSClient()
|
||||
if err != nil {
|
||||
return fmt.Errorf("xds: failed to create xds-client: %v", err)
|
||||
|
@ -181,7 +187,6 @@ func (s *GRPCServer) Serve(lis net.Listener) error {
|
|||
if err := s.initXDSClient(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cfg := s.xdsC.BootstrapConfig()
|
||||
if cfg == nil {
|
||||
return errors.New("bootstrap configuration is empty")
|
||||
|
|
|
@ -25,31 +25,20 @@ import (
|
|||
iserver "google.golang.org/grpc/xds/internal/server"
|
||||
)
|
||||
|
||||
type serverOptions struct {
|
||||
modeCallback ServingModeCallbackFunc
|
||||
bootstrapContents []byte
|
||||
}
|
||||
|
||||
type serverOption struct {
|
||||
grpc.EmptyServerOption
|
||||
apply func(*serverOptions)
|
||||
}
|
||||
|
||||
// ServingModeCallback returns a grpc.ServerOption which allows users to
|
||||
// register a callback to get notified about serving mode changes.
|
||||
func ServingModeCallback(cb ServingModeCallbackFunc) grpc.ServerOption {
|
||||
return &smcOption{cb: cb}
|
||||
}
|
||||
|
||||
type serverOption interface {
|
||||
applyServerOption(*serverOptions)
|
||||
}
|
||||
|
||||
// smcOption is a server option containing a callback to be invoked when the
|
||||
// serving mode changes.
|
||||
type smcOption struct {
|
||||
// Embedding the empty server option makes it safe to pass it to
|
||||
// grpc.NewServer().
|
||||
grpc.EmptyServerOption
|
||||
cb ServingModeCallbackFunc
|
||||
}
|
||||
|
||||
func (s *smcOption) applyServerOption(o *serverOptions) {
|
||||
o.modeCallback = s.cb
|
||||
}
|
||||
|
||||
type serverOptions struct {
|
||||
modeCallback ServingModeCallbackFunc
|
||||
return &serverOption{apply: func(o *serverOptions) { o.modeCallback = cb }}
|
||||
}
|
||||
|
||||
// ServingMode indicates the current mode of operation of the server.
|
||||
|
@ -82,3 +71,15 @@ type ServingModeChangeArgs struct {
|
|||
// not-serving mode.
|
||||
Err error
|
||||
}
|
||||
|
||||
// BootstrapContentsForTesting returns a grpc.ServerOption which allows users
|
||||
// to inject a bootstrap configuration used by only this server, instead of the
|
||||
// global configuration from the environment variables.
|
||||
//
|
||||
// Testing Only
|
||||
//
|
||||
// This function should ONLY be used for testing and may not work with some
|
||||
// other features, including the CSDS service.
|
||||
func BootstrapContentsForTesting(contents []byte) grpc.ServerOption {
|
||||
return &serverOption{apply: func(o *serverOptions) { o.bootstrapContents = contents }}
|
||||
}
|
||||
|
|
16
xds/xds.go
16
xds/xds.go
|
@ -38,6 +38,7 @@ import (
|
|||
v3statusgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
|
||||
"google.golang.org/grpc"
|
||||
internaladmin "google.golang.org/grpc/internal/admin"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/xds/csds"
|
||||
|
||||
_ "google.golang.org/grpc/credentials/tls/certprovider/pemfile" // Register the file watcher certificate provider plugin.
|
||||
|
@ -45,7 +46,7 @@ import (
|
|||
_ "google.golang.org/grpc/xds/internal/client/v2" // Register the v2 xDS API client.
|
||||
_ "google.golang.org/grpc/xds/internal/client/v3" // Register the v3 xDS API client.
|
||||
_ "google.golang.org/grpc/xds/internal/httpfilter/fault" // Register the fault injection filter.
|
||||
_ "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver.
|
||||
xdsresolver "google.golang.org/grpc/xds/internal/resolver" // Register the xds_resolver.
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -76,3 +77,16 @@ func init() {
|
|||
return csdss.Close, nil
|
||||
})
|
||||
}
|
||||
|
||||
// NewXDSResolverWithConfigForTesting creates a new xds resolver builder using
|
||||
// the provided xds bootstrap config instead of the global configuration from
|
||||
// the supported environment variables. The resolver.Builder is meant to be
|
||||
// used in conjunction with the grpc.WithResolvers DialOption.
|
||||
//
|
||||
// Testing Only
|
||||
//
|
||||
// This function should ONLY be used for testing and may not work with some
|
||||
// other features, including the CSDS service.
|
||||
func NewXDSResolverWithConfigForTesting(bootstrapConfig []byte) (resolver.Builder, error) {
|
||||
return xdsresolver.NewBuilder(bootstrapConfig)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue