xdsclient: close func refactor (#5926)

Fixes https://github.com/grpc/grpc-go/issues/5895
This commit is contained in:
Easwar Swaminathan 2023-01-18 11:32:40 -08:00 committed by GitHub
parent 9326362a37
commit ace808232f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 424 additions and 438 deletions

View File

@ -49,7 +49,11 @@ type Options struct {
NodeID string
// ServerURI is the address of the management server.
ServerURI string
// ServerListenerResourceNameTemplate is the Listener resource name to fetch.
// ClientDefaultListenerResourceNameTemplate is the default listener
// resource name template to be used on the gRPC client.
ClientDefaultListenerResourceNameTemplate string
// ServerListenerResourceNameTemplate is the listener resource name template
// to be used on the gRPC server.
ServerListenerResourceNameTemplate string
// CertificateProviders is the certificate providers configuration.
CertificateProviders map[string]json.RawMessage
@ -111,8 +115,9 @@ func Contents(opts Options) ([]byte, error) {
Node: node{
ID: opts.NodeID,
},
CertificateProviders: opts.CertificateProviders,
ServerListenerResourceNameTemplate: opts.ServerListenerResourceNameTemplate,
CertificateProviders: opts.CertificateProviders,
ClientDefaultListenerResourceNameTemplate: opts.ClientDefaultListenerResourceNameTemplate,
ServerListenerResourceNameTemplate: opts.ServerListenerResourceNameTemplate,
}
switch opts.Version {
case TransportV2:
@ -146,11 +151,12 @@ func Contents(opts Options) ([]byte, error) {
}
type bootstrapConfig struct {
XdsServers []server `json:"xds_servers,omitempty"`
Node node `json:"node,omitempty"`
CertificateProviders map[string]json.RawMessage `json:"certificate_providers,omitempty"`
ServerListenerResourceNameTemplate string `json:"server_listener_resource_name_template,omitempty"`
Authorities map[string]authority `json:"authorities,omitempty"`
XdsServers []server `json:"xds_servers,omitempty"`
Node node `json:"node,omitempty"`
CertificateProviders map[string]json.RawMessage `json:"certificate_providers,omitempty"`
ClientDefaultListenerResourceNameTemplate string `json:"client_default_listener_resource_name_template,omitempty"`
ServerListenerResourceNameTemplate string `json:"server_listener_resource_name_template,omitempty"`
Authorities map[string]authority `json:"authorities,omitempty"`
}
type authority struct {

View File

@ -117,7 +117,7 @@ func StartManagementServer(opts ManagementServerOptions) (*ManagementServer, err
var err error
lis, err = net.Listen("tcp", "localhost:0")
if err != nil {
return nil, fmt.Errorf("failed to start xDS management server: %v", err)
return nil, fmt.Errorf("listening on local host and port: %v", err)
}
}

View File

@ -62,18 +62,19 @@ func prefixLogger(s *ClientStatusDiscoveryServer) *internalgrpclog.PrefixLogger
type ClientStatusDiscoveryServer struct {
logger *internalgrpclog.PrefixLogger
mu sync.Mutex
xdsClient xdsclient.XDSClient
mu sync.Mutex
xdsClient xdsclient.XDSClient
xdsClientClose func()
}
// NewClientStatusDiscoveryServer returns an implementation of the CSDS server
// that can be registered on a gRPC server.
func NewClientStatusDiscoveryServer() (*ClientStatusDiscoveryServer, error) {
c, err := xdsclient.New()
c, close, err := xdsclient.New()
if err != nil {
logger.Warningf("Failed to create xDS client: %v", err)
}
s := &ClientStatusDiscoveryServer{xdsClient: c}
s := &ClientStatusDiscoveryServer{xdsClient: c, xdsClientClose: close}
s.logger = prefixLogger(s)
s.logger.Infof("Created CSDS server, with xdsClient %p", c)
return s, nil
@ -135,11 +136,9 @@ func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statusp
// Close cleans up the resources.
func (s *ClientStatusDiscoveryServer) Close() {
s.mu.Lock()
if s.xdsClient != nil {
s.xdsClient.Close()
if s.xdsClientClose != nil {
s.xdsClientClose()
}
s.mu.Unlock()
}
// nodeProtoToV3 converts the given proto into a v3.Node. n is from bootstrap

View File

@ -108,11 +108,11 @@ func (s) TestCSDS(t *testing.T) {
// Create an xDS client. This will end up using the same singleton as used
// by the CSDS service.
xdsC, err := xdsclient.New()
xdsC, close, err := xdsclient.New()
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer xdsC.Close()
defer close()
// Initialize an gRPC server and register CSDS on it.
server := grpc.NewServer()

View File

@ -70,7 +70,7 @@ const (
var (
onGCE = googlecloud.OnGCE
newClientWithConfig = func(config *bootstrap.Config) (xdsclient.XDSClient, error) {
newClientWithConfig = func(config *bootstrap.Config) (xdsclient.XDSClient, func(), error) {
return xdsclient.NewWithConfig(config)
}
@ -135,7 +135,7 @@ func (c2pResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, opts
// Create singleton xds client with this config. The xds client will be
// used by the xds resolver later.
xdsC, err := newClientWithConfig(config)
_, close, err := newClientWithConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to start xDS client: %v", err)
}
@ -154,12 +154,12 @@ func (c2pResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, opts
}
xdsR, err := resolver.Get(xdsName).Build(t, cc, opts)
if err != nil {
xdsC.Close()
close()
return nil, err
}
return &c2pResolver{
Resolver: xdsR,
client: xdsC,
Resolver: xdsR,
clientCloseFunc: close,
}, nil
}
@ -169,12 +169,12 @@ func (b c2pResolverBuilder) Scheme() string {
type c2pResolver struct {
resolver.Resolver
client xdsclient.XDSClient
clientCloseFunc func()
}
func (r *c2pResolver) Close() {
r.Resolver.Close()
r.client.Close()
r.clientCloseFunc()
}
var ipv6EnabledMetadata = &structpb.Struct{

View File

@ -180,9 +180,9 @@ func TestBuildXDS(t *testing.T) {
configCh := make(chan *bootstrap.Config, 1)
oldNewClient := newClientWithConfig
newClientWithConfig = func(config *bootstrap.Config) (xdsclient.XDSClient, error) {
newClientWithConfig = func(config *bootstrap.Config) (xdsclient.XDSClient, func(), error) {
configCh <- config
return tXDSClient, nil
return tXDSClient, func() { tXDSClient.Close() }, nil
}
defer func() { newClientWithConfig = oldNewClient }()

View File

@ -175,7 +175,6 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() {
newChildBalancer = oldEDSBalancerBuilder
xdsC.Close()
}
}

View File

@ -255,7 +255,6 @@ func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *t
return xdsC, cdsB.(*cdsBalancer), edsB, tcc, func() {
newChildBalancer = oldEDSBalancerBuilder
xdsC.Close()
}
}
@ -286,7 +285,6 @@ func setupWithWatch(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBal
// provided xdsClient is invoked appropriately.
func (s) TestUpdateClientConnState(t *testing.T) {
xdsC := fakeclient.NewClient()
defer xdsC.Close()
tests := []struct {
name string

View File

@ -88,7 +88,6 @@ func (s) TestDropByCategory(t *testing.T) {
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
@ -248,7 +247,6 @@ func (s) TestDropByCategory(t *testing.T) {
func (s) TestDropCircuitBreaking(t *testing.T) {
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
@ -361,7 +359,6 @@ func (s) TestDropCircuitBreaking(t *testing.T) {
func (s) TestPickerUpdateAfterClose(t *testing.T) {
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
@ -430,7 +427,6 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) {
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
@ -506,7 +502,6 @@ func (s) TestReResolution(t *testing.T) {
defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
@ -574,7 +569,6 @@ func (s) TestLoadReporting(t *testing.T) {
}
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)
@ -689,7 +683,6 @@ func (s) TestUpdateLRSServer(t *testing.T) {
}
xdsC := fakeclient.NewClient()
defer xdsC.Close()
builder := balancer.Get(Name)
cc := testutils.NewTestClientConn(t)

View File

@ -215,10 +215,7 @@ func setup(childLBCh *testutils.Channel) (*fakeclient.Client, func()) {
defer func() { childLBCh.Send(childLB) }()
return childLB
}
return xdsC, func() {
newChildBalancer = origNewChildBalancer
xdsC.Close()
}
return xdsC, func() { newChildBalancer = origNewChildBalancer }
}
// TestSubConnStateChange verifies if the top-level clusterResolverBalancer passes on

View File

@ -192,11 +192,11 @@ func (s) TestEDS_OneLocality(t *testing.T) {
}
// Create an xDS client for use by the cluster_resolver LB policy.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Create a manual resolver and push a service config specifying the use of
// the cluster_resolver LB policy with a single discovery mechanism.
@ -300,11 +300,11 @@ func (s) TestEDS_MultipleLocalities(t *testing.T) {
}
// Create an xDS client for use by the cluster_resolver LB policy.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Create a manual resolver and push service config specifying the use of
// the cluster_resolver LB policy with a single discovery mechanism.
@ -438,11 +438,11 @@ func (s) TestEDS_EndpointsHealth(t *testing.T) {
}
// Create an xDS client for use by the cluster_resolver LB policy.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Create a manual resolver and push service config specifying the use of
// the cluster_resolver LB policy with a single discovery mechanism.
@ -504,11 +504,11 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
}
// Create an xDS client for use by the cluster_resolver LB policy.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Create a manual resolver and push service config specifying the use of
// the cluster_resolver LB policy with a single discovery mechanism.

View File

@ -78,17 +78,14 @@ func setupTestEDS(t *testing.T, initChild *internalserviceconfig.BalancerConfig)
},
}); err != nil {
edsb.Close()
xdsC.Close()
t.Fatal(err)
}
if _, err := xdsC.WaitForWatchEDS(ctx); err != nil {
edsb.Close()
xdsC.Close()
t.Fatalf("xdsClient.WatchEndpoints failed with error: %v", err)
}
return edsb, cc, xdsC, func() {
edsb.Close()
xdsC.Close()
}
}

View File

@ -44,14 +44,14 @@ const xdsScheme = "xds"
// ClientConns at the same time.
func newBuilderForTesting(config []byte) (resolver.Builder, error) {
return &xdsResolverBuilder{
newXDSClient: func() (xdsclient.XDSClient, error) {
newXDSClient: func() (xdsclient.XDSClient, func(), error) {
return xdsclient.NewWithBootstrapContentsForTesting(config)
},
}, nil
}
// For overriding in unittests.
var newXDSClient = func() (xdsclient.XDSClient, error) { return xdsclient.New() }
var newXDSClient = func() (xdsclient.XDSClient, func(), error) { return xdsclient.New() }
func init() {
resolver.Register(&xdsResolverBuilder{})
@ -59,7 +59,7 @@ func init() {
}
type xdsResolverBuilder struct {
newXDSClient func() (xdsclient.XDSClient, error)
newXDSClient func() (xdsclient.XDSClient, func(), error)
}
// Build helps implement the resolver.Builder interface.
@ -87,11 +87,12 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon
newXDSClient = b.newXDSClient
}
client, err := newXDSClient()
client, close, err := newXDSClient()
if err != nil {
return nil, fmt.Errorf("xds: failed to create xds-client: %v", err)
}
r.client = client
r.xdsClient = client
r.xdsClientClose = close
bootstrapConfig := client.BootstrapConfig()
if bootstrapConfig == nil {
return nil, errors.New("bootstrap configuration is empty")
@ -138,11 +139,11 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon
r.ldsResourceName = bootstrap.PopulateResourceTemplate(template, endpoint)
// Register a watch on the xdsClient for the resource name determined above.
cancelWatch := watchService(r.client, r.ldsResourceName, r.handleServiceUpdate, r.logger)
r.logger.Infof("Watch started on resource name %v with xds-client %p", r.ldsResourceName, r.client)
cancelWatch := watchService(r.xdsClient, r.ldsResourceName, r.handleServiceUpdate, r.logger)
r.logger.Infof("Watch started on resource name %v with xds-client %p", r.ldsResourceName, r.xdsClient)
r.cancelWatch = func() {
cancelWatch()
r.logger.Infof("Watch cancel on resource name %v with xds-client %p", r.ldsResourceName, r.client)
r.logger.Infof("Watch cancel on resource name %v with xds-client %p", r.ldsResourceName, r.xdsClient)
}
go r.run()
@ -174,7 +175,8 @@ type xdsResolver struct {
ldsResourceName string
// The underlying xdsClient which performs all xDS requests and responses.
client xdsclient.XDSClient
xdsClient xdsclient.XDSClient
xdsClientClose func()
// A channel for the watch API callback to write service updates on to. The
// updates are read by the run goroutine and passed on to the ClientConn.
updateCh chan suWithError
@ -218,13 +220,13 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
r.cc.ReportError(err)
return false
}
r.logger.Infof("Received update on resource %v from xds-client %p, generated service config: %v", r.ldsResourceName, r.client, pretty.FormatJSON(sc))
r.logger.Infof("Received update on resource %v from xds-client %p, generated service config: %v", r.ldsResourceName, r.xdsClient, pretty.FormatJSON(sc))
// Send the update to the ClientConn.
state := iresolver.SetConfigSelector(resolver.State{
ServiceConfig: r.cc.ParseServiceConfig(string(sc)),
}, cs)
r.cc.UpdateState(xdsclient.SetClient(state, r.client))
r.cc.UpdateState(xdsclient.SetClient(state, r.xdsClient))
return true
}
@ -237,7 +239,7 @@ func (r *xdsResolver) run() {
return
case update := <-r.updateCh:
if update.err != nil {
r.logger.Warningf("Watch error on resource %v from xds-client %p, %v", r.ldsResourceName, r.client, update.err)
r.logger.Warningf("Watch error on resource %v from xds-client %p, %v", r.ldsResourceName, r.xdsClient, update.err)
if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
// If error is resource-not-found, it means the LDS
// resource was removed. Ultimately send an empty service
@ -265,7 +267,7 @@ func (r *xdsResolver) run() {
// Create the config selector for this update.
cs, err := r.newConfigSelector(update.su)
if err != nil {
r.logger.Warningf("Error parsing update on resource %v from xds-client %p: %v", r.ldsResourceName, r.client, err)
r.logger.Warningf("Error parsing update on resource %v from xds-client %p: %v", r.ldsResourceName, r.xdsClient, err)
r.cc.ReportError(err)
continue
}
@ -313,8 +315,8 @@ func (r *xdsResolver) Close() {
if r.cancelWatch != nil {
r.cancelWatch()
}
if r.client != nil {
r.client.Close()
if r.xdsClientClose != nil {
r.xdsClientClose()
}
r.closed.Fire()
r.logger.Infof("Shutdown")

View File

@ -30,11 +30,13 @@ import (
xxhash "github.com/cespare/xxhash/v2"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpctest"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/testutils"
@ -43,7 +45,6 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/httpfilter"
@ -52,13 +53,15 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config
)
const (
targetStr = "target"
routeStr = "route"
cluster = "cluster"
defaultTestTimeout = 1 * time.Second
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 100 * time.Microsecond
)
@ -76,15 +79,14 @@ func Test(t *testing.T) {
}
func (s) TestRegister(t *testing.T) {
b := resolver.Get(xdsScheme)
if b == nil {
if resolver.Get(xdsScheme) == nil {
t.Errorf("scheme %v is not registered", xdsScheme)
}
}
// testClientConn is a fake implemetation of resolver.ClientConn. All is does
// is to store the state received from the resolver locally and signal that
// event through a channel.
// testClientConn is a fake implemetation of resolver.ClientConn that pushes
// state updates and errors returned by the resolver on to channels for
// consumption by tests.
type testClientConn struct {
resolver.ClientConn
stateCh *testutils.Channel
@ -111,34 +113,35 @@ func newTestClientConn() *testClientConn {
}
}
// TestResolverBuilder tests the xdsResolverBuilder's Build method with
// TestResolverBuilder tests the resolver builder's Build() method with
// different parameters.
func (s) TestResolverBuilder(t *testing.T) {
tests := []struct {
name string
xdsClientFunc func() (xdsclient.XDSClient, error)
xdsClientFunc func(closeCh chan struct{}) (xdsclient.XDSClient, func(), error)
target resolver.Target
buildOpts resolver.BuildOptions
wantErr bool
}{
{
name: "simple-good",
xdsClientFunc: func() (xdsclient.XDSClient, error) {
return fakeclient.NewClient(), nil
name: "good",
xdsClientFunc: func(closeCh chan struct{}) (xdsclient.XDSClient, func(), error) {
return fakeclient.NewClient(), func() { close(closeCh) }, nil
},
target: target,
wantErr: false,
},
{
name: "newXDSClient-throws-error",
xdsClientFunc: func() (xdsclient.XDSClient, error) {
return nil, errors.New("newXDSClient-throws-error")
name: "xDS client creation fails",
xdsClientFunc: func(closeCh chan struct{}) (xdsclient.XDSClient, func(), error) {
return nil, func() { close(closeCh) }, errors.New("failed to create xDS client")
},
target: target,
wantErr: true,
},
{
name: "authority not defined in bootstrap",
xdsClientFunc: func() (xdsclient.XDSClient, error) {
xdsClientFunc: func(closeCh chan struct{}) (xdsclient.XDSClient, func(), error) {
c := fakeclient.NewClient()
c.SetBootstrapConfig(&bootstrap.Config{
ClientDefaultListenerResourceNameTemplate: "%s",
@ -148,7 +151,7 @@ func (s) TestResolverBuilder(t *testing.T) {
},
},
})
return c, nil
return c, func() { close(closeCh) }, nil
},
target: resolver.Target{
URL: url.URL{
@ -158,12 +161,32 @@ func (s) TestResolverBuilder(t *testing.T) {
},
wantErr: true,
},
{
name: "xDS creds specified without certificate providers in bootstrap",
xdsClientFunc: func(closeCh chan struct{}) (xdsclient.XDSClient, func(), error) {
c := fakeclient.NewClient()
c.SetBootstrapConfig(&bootstrap.Config{})
return c, func() { close(closeCh) }, nil
},
target: target,
buildOpts: resolver.BuildOptions{
DialCreds: func() credentials.TransportCredentials {
creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{FallbackCreds: insecure.NewCredentials()})
if err != nil {
t.Fatalf("xds.NewClientCredentials() failed: %v", err)
}
return creds
}(),
},
wantErr: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Fake out the xdsClient creation process by providing a fake.
// Use a fake xDS client that closes the below channel when closed.
closeCh := make(chan struct{})
oldClientMaker := newXDSClient
newXDSClient = test.xdsClientFunc
newXDSClient = func() (xdsclient.XDSClient, func(), error) { return test.xdsClientFunc(closeCh) }
defer func() {
newXDSClient = oldClientMaker
}()
@ -173,7 +196,7 @@ func (s) TestResolverBuilder(t *testing.T) {
t.Fatalf("resolver.Get(%v) returned nil", xdsScheme)
}
r, err := builder.Build(test.target, newTestClientConn(), resolver.BuildOptions{})
r, err := builder.Build(test.target, newTestClientConn(), test.buildOpts)
if (err != nil) != test.wantErr {
t.Fatalf("builder.Build(%v) returned err: %v, wantErr: %v", target, err, test.wantErr)
}
@ -182,51 +205,16 @@ func (s) TestResolverBuilder(t *testing.T) {
return
}
r.Close()
select {
case <-closeCh:
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout when waiting for xDS client to be closed")
}
})
}
}
// TestResolverBuilder_xdsCredsBootstrapMismatch tests the case where an xds
// resolver is built with xds credentials being specified by the user. The
// bootstrap file does not contain any certificate provider configuration
// though, and therefore we expect the resolver build to fail.
func (s) TestResolverBuilder_xdsCredsBootstrapMismatch(t *testing.T) {
// Fake out the xdsClient creation process by providing a fake, which does
// not have any certificate provider configuration.
fc := fakeclient.NewClient()
fc.SetBootstrapConfig(&bootstrap.Config{})
oldClientMaker := newXDSClient
newXDSClient = func() (xdsclient.XDSClient, error) {
return fc, nil
}
defer func() { newXDSClient = oldClientMaker }()
defer func() {
select {
case <-time.After(defaultTestTimeout):
t.Fatalf("timeout waiting for close")
case <-fc.Closed.Done():
}
}()
builder := resolver.Get(xdsScheme)
if builder == nil {
t.Fatalf("resolver.Get(%v) returned nil", xdsScheme)
}
// Create xds credentials to be passed to resolver.Build().
creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{FallbackCreds: insecure.NewCredentials()})
if err != nil {
t.Fatalf("xds.NewClientCredentials() failed: %v", err)
}
// Since the fake xds client is not configured with any certificate provider
// configs, and we are specifying xds credentials in the call to
// resolver.Build(), we expect it to fail.
if _, err := builder.Build(target, newTestClientConn(), resolver.BuildOptions{DialCreds: creds}); err == nil {
t.Fatal("builder.Build() succeeded when expected to fail")
}
}
type setupOpts struct {
bootstrapC *bootstrap.Config
target resolver.Target
@ -240,8 +228,9 @@ func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *fakeclient.Client,
fc.SetBootstrapConfig(opts.bootstrapC)
}
oldClientMaker := newXDSClient
newXDSClient = func() (xdsclient.XDSClient, error) {
return fc, nil
closeCh := make(chan struct{})
newXDSClient = func() (xdsclient.XDSClient, func(), error) {
return fc, grpcsync.OnceFunc(func() { close(closeCh) }), nil
}
cancel := func() {
// Make sure the xDS client is closed, in all (successful or failed)
@ -249,7 +238,7 @@ func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *fakeclient.Client,
select {
case <-time.After(defaultTestTimeout):
t.Fatalf("timeout waiting for close")
case <-fc.Closed.Done():
case <-closeCh:
}
newXDSClient = oldClientMaker
}
@ -410,7 +399,9 @@ func (s) TestXDSResolverWatchCallbackAfterClose(t *testing.T) {
},
}, nil)
if gotVal, gotErr := tcc.stateCh.Receive(ctx); gotErr != context.DeadlineExceeded {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if gotVal, gotErr := tcc.stateCh.Receive(sCtx); gotErr != context.DeadlineExceeded {
t.Fatalf("ClientConn.UpdateState called after xdsResolver is closed: %v", gotVal)
}
}
@ -418,12 +409,9 @@ func (s) TestXDSResolverWatchCallbackAfterClose(t *testing.T) {
// TestXDSResolverCloseClosesXDSClient tests that the XDS resolver's Close
// method closes the XDS client.
func (s) TestXDSResolverCloseClosesXDSClient(t *testing.T) {
xdsR, xdsC, _, cancel := testSetup(t, setupOpts{target: target})
defer cancel()
xdsR, _, _, cancel := testSetup(t, setupOpts{target: target})
xdsR.Close()
if !xdsC.Closed.HasFired() {
t.Fatalf("xds client not closed by xds resolver Close method")
}
cancel() // Blocks until the xDS client is closed.
}
// TestXDSResolverBadServiceUpdate tests the case the xdsClient returns a bad
@ -471,13 +459,13 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
{
routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{"test-cluster-1": {Weight: 1}}}},
wantJSON: `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
}
}
}}]}`,
"xds_cluster_manager_experimental":{
"children":{
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
}
}
}}]}`,
wantClusters: map[string]bool{"cluster:test-cluster-1": true},
},
{
@ -489,19 +477,19 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
// well as this update, as the previous config selector still
// references the old cluster when the new one is pushed.
wantJSON: `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
},
"cluster:cluster_1":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
},
"cluster:cluster_2":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
}
}
}}]}`,
"xds_cluster_manager_experimental":{
"children":{
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
},
"cluster:cluster_1":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
},
"cluster:cluster_2":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
}
}
}}]}`,
wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true},
},
{
@ -513,16 +501,16 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
// stopped, so there are no more references to the first cluster.
// Only the second update's clusters should remain.
wantJSON: `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster:cluster_1":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
},
"cluster:cluster_2":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
}
}
}}]}`,
"xds_cluster_manager_experimental":{
"children":{
"cluster:cluster_1":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
},
"cluster:cluster_2":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
}
}
}}]}`,
wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true},
},
} {
@ -723,13 +711,13 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {
},
}, nil)
wantJSON := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
}
}
}}]}`
"xds_cluster_manager_experimental":{
"children":{
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
}
}
}}]}`
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON)
gotState, err := tcc.stateCh.Receive(ctx)
@ -985,13 +973,13 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
}
wantJSON := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
}
}
}}]}`
"xds_cluster_manager_experimental":{
"children":{
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
}
}
}}]}`
wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
@ -1044,16 +1032,16 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON2 := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
},
"cluster:NEW":{
"childPolicy":[{"cds_experimental":{"cluster":"NEW"}}]
}
}
}}]}`
"xds_cluster_manager_experimental":{
"children":{
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
},
"cluster:NEW":{
"childPolicy":[{"cds_experimental":{"cluster":"NEW"}}]
}
}
}}]}`
wantSCParsed2 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON2)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed2.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
@ -1082,13 +1070,13 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON3 := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster:NEW":{
"childPolicy":[{"cds_experimental":{"cluster":"NEW"}}]
}
}
}}]}`
"xds_cluster_manager_experimental":{
"children":{
"cluster:NEW":{
"childPolicy":[{"cds_experimental":{"cluster":"NEW"}}]
}
}
}}]}`
wantSCParsed3 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantJSON3)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed3.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
@ -1166,7 +1154,9 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) {
suErr := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource removed error")
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{}, suErr)
if gotErrVal, gotErr := tcc.errorCh.Receive(ctx); gotErr != context.DeadlineExceeded {
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if gotErrVal, gotErr := tcc.errorCh.Receive(sCtx); gotErr != context.DeadlineExceeded {
t.Fatalf("ClientConn.ReportError() received %v, %v, want channel recv timeout", gotErrVal, gotErr)
}

View File

@ -22,7 +22,6 @@ package fakeclient
import (
"context"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
@ -56,8 +55,6 @@ type Client struct {
rdsCbs map[string]func(xdsresource.RouteConfigUpdate, error)
cdsCbs map[string]func(xdsresource.ClusterUpdate, error)
edsCbs map[string]func(xdsresource.EndpointsUpdate, error)
Closed *grpcsync.Event // fired when Close is called.
}
// WatchListener registers a LDS watch.
@ -275,11 +272,6 @@ func (xdsC *Client) WaitForReportLoad(ctx context.Context) (ReportLoadArgs, erro
return val.(ReportLoadArgs), nil
}
// Close fires xdsC.Closed, indicating it was called.
func (xdsC *Client) Close() {
xdsC.Closed.Fire()
}
// BootstrapConfig returns the bootstrap config.
func (xdsC *Client) BootstrapConfig() *bootstrap.Config {
return xdsC.bootstrapCfg
@ -321,6 +313,5 @@ func NewClientWithName(name string) *Client {
rdsCbs: make(map[string]func(xdsresource.RouteConfigUpdate, error)),
cdsCbs: make(map[string]func(xdsresource.ClusterUpdate, error)),
edsCbs: make(map[string]func(xdsresource.EndpointsUpdate, error)),
Closed: grpcsync.NewEvent(),
}
}

View File

@ -366,7 +366,7 @@ func newConfigFromContents(data []byte) (*Config, error) {
var jsonData map[string]json.RawMessage
if err := json.Unmarshal(data, &jsonData); err != nil {
return nil, fmt.Errorf("xds: Failed to parse bootstrap config: %v", err)
return nil, fmt.Errorf("xds: failed to parse bootstrap config: %v", err)
}
var node *v3corepb.Node
@ -414,7 +414,7 @@ func newConfigFromContents(data []byte) (*Config, error) {
}
bc, err := parser.ParseConfig(nameAndConfig.Config)
if err != nil {
return nil, fmt.Errorf("xds: Config parsing for plugin %q failed: %v", name, err)
return nil, fmt.Errorf("xds: config parsing for plugin %q failed: %v", name, err)
}
configs[instance] = bc
}
@ -452,13 +452,13 @@ func newConfigFromContents(data []byte) (*Config, error) {
config.ClientDefaultListenerResourceNameTemplate = "%s"
}
if config.XDSServer == nil {
return nil, fmt.Errorf("xds: Required field %q not found in bootstrap %s", "xds_servers", jsonData["xds_servers"])
return nil, fmt.Errorf("xds: required field %q not found in bootstrap %s", "xds_servers", jsonData["xds_servers"])
}
if config.XDSServer.ServerURI == "" {
return nil, fmt.Errorf("xds: Required field %q not found in bootstrap %s", "xds_servers.server_uri", jsonData["xds_servers"])
return nil, fmt.Errorf("xds: required field %q not found in bootstrap %s", "xds_servers.server_uri", jsonData["xds_servers"])
}
if config.XDSServer.Creds == nil {
return nil, fmt.Errorf("xds: Required field %q doesn't contain valid value in bootstrap %s", "xds_servers.channel_creds", jsonData["xds_servers"])
return nil, fmt.Errorf("xds: required field %q doesn't contain valid value in bootstrap %s", "xds_servers.channel_creds", jsonData["xds_servers"])
}
// Post-process the authorities' client listener resource template field:
// - if set, it must start with "xdstp://<authority_name>/"

View File

@ -56,6 +56,4 @@ type XDSClient interface {
ReportLoad(*bootstrap.ServerConfig) (*load.Store, func())
BootstrapConfig() *bootstrap.Config
Close()
}

View File

@ -37,20 +37,28 @@ import (
// The returned client is a reference counted singleton instance. This function
// creates a new client only when one doesn't already exist.
//
// Note that the first invocation of New() or NewWithConfig() sets the client
// singleton. The following calls will return the singleton client without
// checking or using the config.
func New() (XDSClient, error) {
// The second return value represents a close function which releases the
// caller's reference on the returned client. The caller is expected to invoke
// it once they are done using the client. The underlying client will be closed
// only when all references are released, and it is safe for the caller to
// invoke this close function multiple times.
func New() (XDSClient, func(), error) {
return newRefCountedWithConfig(nil)
}
// NewWithConfig returns a new xDS client configured by the given config.
//
// Internal/Testing Only
// The second return value represents a close function which releases the
// caller's reference on the returned client. The caller is expected to invoke
// it once they are done using the client. The underlying client will be closed
// only when all references are released, and it is safe for the caller to
// invoke this close function multiple times.
//
// # Internal/Testing Only
//
// This function should ONLY be used for internal (c2p resolver) and/or testing
// purposese. DO NOT use this elsewhere. Use New() instead.
func NewWithConfig(config *bootstrap.Config) (XDSClient, error) {
func NewWithConfig(config *bootstrap.Config) (XDSClient, func(), error) {
return newRefCountedWithConfig(config)
}
@ -76,58 +84,79 @@ func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration, i
// NewWithConfigForTesting returns an xDS client for the specified bootstrap
// config, separate from the global singleton.
//
// The second return value represents a close function which the caller is
// expected to invoke once they are done using the client. It is safe for the
// caller to invoke this close function multiple times.
//
// # Testing Only
//
// This function should ONLY be used for testing purposes.
func NewWithConfigForTesting(config *bootstrap.Config, watchExpiryTimeout, authorityIdleTimeout time.Duration) (XDSClient, error) {
// TODO(easwars): Document the new close func.
func NewWithConfigForTesting(config *bootstrap.Config, watchExpiryTimeout, authorityIdleTimeout time.Duration) (XDSClient, func(), error) {
cl, err := newWithConfig(config, watchExpiryTimeout, authorityIdleTimeout)
if err != nil {
return nil, err
return nil, nil, err
}
return &clientRefCounted{clientImpl: cl, refCount: 1}, nil
return cl, grpcsync.OnceFunc(cl.close), nil
}
// NewWithBootstrapContentsForTesting returns an xDS client for this config,
// separate from the global singleton.
//
// The second return value represents a close function which the caller is
// expected to invoke once they are done using the client. It is safe for the
// caller to invoke this close function multiple times.
//
// # Testing Only
//
// This function should ONLY be used for testing purposes.
func NewWithBootstrapContentsForTesting(contents []byte) (XDSClient, error) {
func NewWithBootstrapContentsForTesting(contents []byte) (XDSClient, func(), error) {
// Normalize the contents
buf := bytes.Buffer{}
err := json.Indent(&buf, contents, "", "")
if err != nil {
return nil, fmt.Errorf("xds: error normalizing JSON: %v", err)
return nil, nil, fmt.Errorf("xds: error normalizing JSON: %v", err)
}
contents = bytes.TrimSpace(buf.Bytes())
c, err := getOrMakeClientForTesting(contents)
if err != nil {
return nil, nil, err
}
return c, grpcsync.OnceFunc(func() {
clientsMu.Lock()
defer clientsMu.Unlock()
if c.decrRef() == 0 {
c.close()
delete(clients, string(contents))
}
}), nil
}
// getOrMakeClientForTesting creates a new reference counted client (separate
// from the global singleton) for the given config, or returns an existing one.
// It takes care of incrementing the reference count for the returned client,
// and leaves the caller responsible for decrementing the reference count once
// the client is no longer needed.
func getOrMakeClientForTesting(config []byte) (*clientRefCounted, error) {
clientsMu.Lock()
defer clientsMu.Unlock()
if c := clients[string(contents)]; c != nil {
c.mu.Lock()
// Since we don't remove the *Client from the map when it is closed, we
// need to recreate the impl if the ref count dropped to zero.
if c.refCount > 0 {
c.refCount++
c.mu.Unlock()
return c, nil
}
c.mu.Unlock()
if c := clients[string(config)]; c != nil {
c.incrRef()
return c, nil
}
bcfg, err := bootstrap.NewConfigFromContentsForTesting(contents)
bcfg, err := bootstrap.NewConfigFromContentsForTesting(config)
if err != nil {
return nil, fmt.Errorf("xds: error with bootstrap config: %v", err)
return nil, fmt.Errorf("bootstrap config %s: %v", string(config), err)
}
cImpl, err := newWithConfig(bcfg, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout)
if err != nil {
return nil, err
return nil, fmt.Errorf("creating xDS client: %v", err)
}
c := &clientRefCounted{clientImpl: cImpl, refCount: 1}
clients[string(contents)] = c
clients[string(config)] = c
return c, nil
}

View File

@ -34,9 +34,6 @@ func Test(t *testing.T) {
}
const (
testXDSServer = "xds-server"
testXDSServerAuthority = "xds-server-authority"
defaultTestWatchExpiryTimeout = 500 * time.Millisecond
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.

View File

@ -68,11 +68,8 @@ func (c *clientImpl) BootstrapConfig() *bootstrap.Config {
return c.config
}
// Close closes the gRPC connection to the management server.
//
// TODO: ensure that all underlying transports are closed before this function
// returns.
func (c *clientImpl) Close() {
// close closes the gRPC connection to the management server.
func (c *clientImpl) close() {
if c.done.HasFired() {
return
}

View File

@ -66,8 +66,8 @@ var (
// default and the third one pointing to the non-default).
//
// Returns two listeners used by the default and non-default management servers
// respectively, and the xDS client.
func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time.Duration) (*testutils.ListenerWrapper, *testutils.ListenerWrapper, xdsclient.XDSClient) {
// respectively, and the xDS client and its close function.
func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time.Duration) (*testutils.ListenerWrapper, *testutils.ListenerWrapper, xdsclient.XDSClient, func()) {
overrideFedEnvVar(t)
// Create listener wrappers which notify on to a channel whenever a new
@ -94,7 +94,7 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time.
// have empty server configs, and therefore end up using the default server
// config, which points to the above management server.
nodeID := uuid.New().String()
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: defaultAuthorityServer.Address,
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
@ -117,7 +117,6 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time.
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
t.Cleanup(func() { client.Close() })
resources := e2e.UpdateOptions{
NodeID: nodeID,
@ -132,7 +131,7 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time.
if err := defaultAuthorityServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
return lisDefault, lisNonDefault, client
return lisDefault, lisNonDefault, client, close
}
// TestAuthorityShare tests the authority sharing logic. The test verifies the
@ -145,7 +144,8 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T, idleTimeout time.
func (s) TestAuthorityShare(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
lis, _, client := setupForAuthorityTests(ctx, t, time.Duration(0))
lis, _, client, close := setupForAuthorityTests(ctx, t, time.Duration(0))
defer close()
// Verify that no connection is established to the management server at this
// point. A transport is created only when a resource (which belongs to that
@ -189,7 +189,8 @@ func (s) TestAuthorityShare(t *testing.T) {
func (s) TestAuthorityIdleTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
lis, _, client := setupForAuthorityTests(ctx, t, defaultTestIdleAuthorityTimeout)
lis, _, client, close := setupForAuthorityTests(ctx, t, defaultTestIdleAuthorityTimeout)
defer close()
// Request the first resource. Verify that a new transport is created.
cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {})
@ -234,7 +235,7 @@ func (s) TestAuthorityClientClose(t *testing.T) {
// test, until explicitly closed.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
lisDefault, lisNonDefault, client := setupForAuthorityTests(ctx, t, time.Duration(2*defaultTestTimeout))
lisDefault, lisNonDefault, client, close := setupForAuthorityTests(ctx, t, time.Duration(2*defaultTestTimeout))
// Request the first resource. Verify that a new transport is created to the
// default management server.
@ -267,7 +268,7 @@ func (s) TestAuthorityClientClose(t *testing.T) {
// Closing the xDS client should close the connection to both management
// servers, even though we have an open watch to one of them.
client.Close()
close()
if _, err := connDefault.CloseCh.Receive(ctx); err != nil {
t.Fatal("Connection to management server not closed after client close")
}
@ -281,7 +282,8 @@ func (s) TestAuthorityClientClose(t *testing.T) {
func (s) TestAuthorityRevive(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
lis, _, client := setupForAuthorityTests(ctx, t, defaultTestIdleAuthorityTimeout)
lis, _, client, close := setupForAuthorityTests(ctx, t, defaultTestIdleAuthorityTimeout)
defer close()
// Request the first resource. Verify that a new transport is created.
cdsCancel1 := client.WatchCluster(authorityTestResourceName11, func(u xdsresource.ClusterUpdate, err error) {})

View File

@ -148,11 +148,11 @@ func (s) TestCDSWatch(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a cluster resource and have the watch
// callback push the received update on to a channel.
@ -278,11 +278,11 @@ func (s) TestCDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for the same cluster resource and have the
// callbacks push the received updates on to a channel.
@ -363,11 +363,11 @@ func (s) TestCDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for the same cluster resource and have the
// callbacks push the received updates on to a channel.
@ -460,11 +460,11 @@ func (s) TestCDSWatch_ResourceCaching(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a cluster resource and have the watch
// callback push the received update on to a channel.
@ -532,7 +532,7 @@ func (s) TestCDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
// receive a response for the watch being registered by the test.
// Create an xDS client talking to a non-existent management server.
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: "dummy management server address",
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
@ -543,7 +543,7 @@ func (s) TestCDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a resource which is expected to be invoked with an
// error after the watch expiry timer fires.
@ -579,7 +579,7 @@ func (s) TestCDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
// Create an xDS client talking to the above management server.
nodeID := uuid.New().String()
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: mgmtServer.Address,
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
@ -590,7 +590,7 @@ func (s) TestCDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a cluster resource and have the watch
// callback push the received update on to a channel.
@ -648,11 +648,11 @@ func (s) TesCDSWatch_ResourceRemoved(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for two cluster resources and have the
// callbacks push the received updates on to a channel.
@ -757,11 +757,11 @@ func (s) TestCDSWatch_NACKError(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a cluster resource and have the watch
// callback push the received update on to a channel.
@ -806,11 +806,11 @@ func (s) TestCDSWatch_PartialValid(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for cluster resources. The first watch is expected
// to receive an error because the received resource is NACK'ed. The second
@ -880,11 +880,11 @@ func (s) TestCDSWatch_PartialResponse(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for two cluster resources and have the
// callbacks push the received updates on to a channel.

View File

@ -98,11 +98,11 @@ func (s) TestDumpResources(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Dump resources and expect empty configs.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)

View File

@ -172,11 +172,11 @@ func (s) TestEDSWatch(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a endpoint resource and have the watch
// callback push the received update on to a channel.
@ -326,11 +326,11 @@ func (s) TestEDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for the same endpoint resource and have the
// callbacks push the received updates on to a channel.
@ -412,11 +412,11 @@ func (s) TestEDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for the same endpoint resource and have the
// callbacks push the received updates on to a channel.
@ -510,11 +510,11 @@ func (s) TestEDSWatch_ResourceCaching(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for an endpoint resource and have the watch callback
// push the received update on to a channel.
@ -589,7 +589,7 @@ func (s) TestEDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
// receive a response for the watch being registered by the test.
// Create an xDS client talking to a non-existent management server.
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: "dummy management server address",
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
@ -600,7 +600,7 @@ func (s) TestEDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a resource which is expected to fail with an error
// after the watch expiry timer fires.
@ -636,7 +636,7 @@ func (s) TestEDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
// Create an xDS client talking to the above management server.
nodeID := uuid.New().String()
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: mgmtServer.Address,
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
@ -647,7 +647,7 @@ func (s) TestEDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for an endpoint resource and have the watch callback
// push the received update on to a channel.
@ -704,11 +704,11 @@ func (s) TestEDSWatch_NACKError(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a route configuration resource and have the watch
// callback push the received update on to a channel.
@ -753,11 +753,11 @@ func (s) TestEDSWatch_PartialValid(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for two endpoint resources. The first watch is
// expected to receive an error because the received resource is NACKed.

View File

@ -73,10 +73,11 @@ func setupForFederationWatchersTest(t *testing.T) (*e2e.ManagementServer, string
t.Fatalf("Failed to create bootstrap file: %v", err)
}
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
t.Cleanup(close)
return serverNonDefaultAuthority, nodeID, client
}
@ -88,7 +89,6 @@ func setupForFederationWatchersTest(t *testing.T) (*e2e.ManagementServer, string
// that both watchers are notified.
func (s) TestFederation_ListenerResourceContextParamOrder(t *testing.T) {
serverNonDefaultAuthority, nodeID, client := setupForFederationWatchersTest(t)
defer client.Close()
var (
// Two resource names only differ in context parameter order.
@ -145,7 +145,6 @@ func (s) TestFederation_ListenerResourceContextParamOrder(t *testing.T) {
// parameters. The test verifies that both watchers are notified.
func (s) TestFederation_RouteConfigResourceContextParamOrder(t *testing.T) {
serverNonDefaultAuthority, nodeID, client := setupForFederationWatchersTest(t)
defer client.Close()
var (
// Two resource names only differ in context parameter order.
@ -212,7 +211,6 @@ func (s) TestFederation_RouteConfigResourceContextParamOrder(t *testing.T) {
// that both watchers are notified.
func (s) TestFederation_ClusterResourceContextParamOrder(t *testing.T) {
serverNonDefaultAuthority, nodeID, client := setupForFederationWatchersTest(t)
defer client.Close()
var (
// Two resource names only differ in context parameter order.
@ -269,7 +267,6 @@ func (s) TestFederation_ClusterResourceContextParamOrder(t *testing.T) {
// that both watchers are notified.
func (s) TestFederation_EndpointsResourceContextParamOrder(t *testing.T) {
serverNonDefaultAuthority, nodeID, client := setupForFederationWatchersTest(t)
defer client.Close()
var (
// Two resource names only differ in context parameter order.

View File

@ -201,11 +201,11 @@ func (s) TestLDSWatch(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a listener resource and have the watch
// callback push the received update on to a channel.
@ -331,11 +331,11 @@ func (s) TestLDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for the same listener resource and have the
// callbacks push the received updates on to a channel.
@ -417,11 +417,11 @@ func (s) TestLDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for the same listener resource and have the
// callbacks push the received updates on to a channel.
@ -509,11 +509,11 @@ func (s) TestLDSWatch_ResourceCaching(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a listener resource and have the watch
// callback push the received update on to a channel.
@ -581,7 +581,7 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
// receive a response for the watch being registered by the test.
// Create an xDS client talking to a non-existent management server.
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: "dummy management server address",
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
@ -592,7 +592,7 @@ func (s) TestLDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a resource which is expected to fail with an error
// after the watch expiry timer fires.
@ -628,7 +628,7 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
// Create an xDS client talking to the above management server.
nodeID := uuid.New().String()
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: mgmtServer.Address,
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
@ -639,7 +639,7 @@ func (s) TestLDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a listener resource and have the watch
// callback push the received update on to a channel.
@ -698,11 +698,11 @@ func (s) TestLDSWatch_ResourceRemoved(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for two listener resources and have the
// callbacks push the received updates on to a channel.
@ -806,11 +806,11 @@ func (s) TestLDSWatch_NACKError(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a listener resource and have the watch
// callback push the received update on to a channel.
@ -855,11 +855,11 @@ func (s) TestLDSWatch_PartialValid(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for listener resources. The first watch is expected
// to receive an error because the received resource is NACKed. The second
@ -930,11 +930,11 @@ func (s) TestLDSWatch_PartialResponse(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for two listener resources and have the
// callbacks push the received updates on to a channel.

View File

@ -42,11 +42,11 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Configure the management server to respond with route config resources.
resources := e2e.UpdateOptions{

View File

@ -182,11 +182,11 @@ func (s) TestRDSWatch(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a route configuration resource and have the
// watch callback push the received update on to a channel.
@ -352,11 +352,11 @@ func (s) TestRDSWatch_TwoWatchesForSameResourceName(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for the same route configuration resource
// and have the callbacks push the received updates on to a channel.
@ -438,11 +438,11 @@ func (s) TestRDSWatch_ThreeWatchesForDifferentResourceNames(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for the same route configuration resource
// and have the callbacks push the received updates on to a channel.
@ -540,11 +540,11 @@ func (s) TestRDSWatch_ResourceCaching(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a route configuration resource and have the watch
// callback push the received update on to a channel.
@ -622,7 +622,7 @@ func (s) TestRDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
// receive a response for the watch being registered by the test.
// Create an xDS client talking to a non-existent management server.
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: "dummy management server address",
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
@ -633,7 +633,7 @@ func (s) TestRDSWatch_ExpiryTimerFiresBeforeResponse(t *testing.T) {
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a resource which is expected to fail with an error
// after the watch expiry timer fires.
@ -669,7 +669,7 @@ func (s) TestRDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
// Create an xDS client talking to the above management server.
nodeID := uuid.New().String()
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: mgmtServer.Address,
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
@ -680,7 +680,7 @@ func (s) TestRDSWatch_ValidResponseCancelsExpiryTimerBehavior(t *testing.T) {
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a route configuration resource and have the watch
// callback push the received update on to a channel.
@ -741,11 +741,11 @@ func (s) TestRDSWatch_NACKError(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register a watch for a route configuration resource and have the watch
// callback push the received update on to a channel.
@ -790,11 +790,11 @@ func (s) TestRDSWatch_PartialValid(t *testing.T) {
defer cleanup()
// Create an xDS client with the above bootstrap contents.
client, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
if err != nil {
t.Fatalf("Failed to create xDS client: %v", err)
}
defer client.Close()
defer close()
// Register two watches for route configuration resources. The first watch
// is expected to receive an error because the received resource is NACKed.

View File

@ -243,7 +243,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) {
// Create an xDS client talking to the above management server.
nodeID := uuid.New().String()
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: mgmtServer.Address,
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
@ -255,7 +255,7 @@ func (s) TestHandleListenerResponseFromManagementServer(t *testing.T) {
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer client.Close()
defer close()
t.Logf("Created xDS client to %s", mgmtServer.Address)
// A wrapper struct to wrap the update and the associated error, as
@ -510,7 +510,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) {
// Create an xDS client talking to the above management server.
nodeID := uuid.New().String()
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: mgmtServer.Address,
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
@ -522,7 +522,7 @@ func (s) TestHandleRouteConfigResponseFromManagementServer(t *testing.T) {
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer client.Close()
defer close()
t.Logf("Created xDS client to %s", mgmtServer.Address)
// A wrapper struct to wrap the update and the associated error, as
@ -753,7 +753,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
// Create an xDS client talking to the above management server.
nodeID := uuid.New().String()
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: mgmtServer.Address,
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
@ -765,7 +765,7 @@ func (s) TestHandleClusterResponseFromManagementServer(t *testing.T) {
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer client.Close()
defer close()
t.Logf("Created xDS client to %s", mgmtServer.Address)
// A wrapper struct to wrap the update and the associated error, as
@ -1079,7 +1079,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) {
// Create an xDS client talking to the above management server.
nodeID := uuid.New().String()
client, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
client, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: mgmtServer.Address,
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
@ -1091,7 +1091,7 @@ func (s) TestHandleEndpointsResponseFromManagementServer(t *testing.T) {
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer client.Close()
defer close()
t.Logf("Created xDS client to %s", mgmtServer.Address)
// A wrapper struct to wrap the update and the associated error, as

View File

@ -50,7 +50,7 @@ func (s) TestLRSClient(t *testing.T) {
}
defer sCleanup()
xdsC, err := NewWithConfigForTesting(&bootstrap.Config{
xdsC, close, err := NewWithConfigForTesting(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: fs.Address,
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
@ -61,9 +61,7 @@ func (s) TestLRSClient(t *testing.T) {
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
defer xdsC.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
defer close()
// Report to the same address should not create new ClientConn.
store1, lrsCancel1 := xdsC.ReportLoad(
@ -77,6 +75,8 @@ func (s) TestLRSClient(t *testing.T) {
)
defer lrsCancel1()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if u, err := fs.NewConnChan.Receive(ctx); err != nil {
t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
}

View File

@ -21,8 +21,11 @@ package xdsclient
import (
"fmt"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
)
@ -34,7 +37,8 @@ const (
var (
// This is the client returned by New(). It contains one client implementation,
// and maintains the refcount.
singletonClient = &clientRefCounted{}
singletonMu sync.Mutex
singletonClient *clientRefCounted
// The following functions are no-ops in the actual code, but can be
// overridden in tests to give them visibility into certain events.
@ -45,57 +49,57 @@ var (
// To override in tests.
var bootstrapNewConfig = bootstrap.NewConfig
// onceClosingClient is a thin wrapper around clientRefCounted. The Close()
// method is overridden such that the underlying reference counted client's
// Close() is called at most once, thereby making Close() idempotent.
//
// This is the type which is returned by New() and NewWithConfig(), making it
// safe for these callers to call Close() any number of times.
type onceClosingClient struct {
XDSClient
func clientRefCountedClose() {
singletonMu.Lock()
defer singletonMu.Unlock()
once sync.Once
if singletonClient.decrRef() != 0 {
return
}
singletonClient.clientImpl.close()
singletonClientImplCloseHook()
singletonClient = nil
}
func (o *onceClosingClient) Close() {
o.once.Do(o.XDSClient.Close)
}
func newRefCountedWithConfig(fallbackConfig *bootstrap.Config) (XDSClient, func(), error) {
singletonMu.Lock()
defer singletonMu.Unlock()
func newRefCountedWithConfig(config *bootstrap.Config) (XDSClient, error) {
singletonClient.mu.Lock()
defer singletonClient.mu.Unlock()
if singletonClient != nil {
singletonClient.incrRef()
return singletonClient, grpcsync.OnceFunc(clientRefCountedClose), nil
// If the client implementation was created, increment ref count and return
// the client.
if singletonClient.clientImpl != nil {
singletonClient.refCount++
return &onceClosingClient{XDSClient: singletonClient}, nil
}
// If the passed in config is nil, perform bootstrap to read config.
if config == nil {
// Use fallbackConfig only if bootstrap env vars are unspecified.
var config *bootstrap.Config
if envconfig.XDSBootstrapFileName == "" && envconfig.XDSBootstrapFileContent == "" {
if fallbackConfig == nil {
return nil, nil, fmt.Errorf("xds: bootstrap env vars are unspecified and provided fallback config is nil")
}
config = fallbackConfig
} else {
var err error
config, err = bootstrapNewConfig()
if err != nil {
return nil, fmt.Errorf("xds: failed to read bootstrap file: %v", err)
return nil, nil, fmt.Errorf("xds: failed to read bootstrap file: %v", err)
}
}
// Create the new client implementation.
c, err := newWithConfig(config, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout)
if err != nil {
return nil, err
return nil, nil, err
}
singletonClient.clientImpl = c
singletonClient.refCount++
singletonClient = &clientRefCounted{clientImpl: c, refCount: 1}
singletonClientImplCreateHook()
nodeID := "<unknown>"
if node, ok := config.XDSServer.NodeProto.(interface{ GetId() string }); ok {
nodeID = node.GetId()
}
logger.Infof("xDS node ID: %s", nodeID)
return &onceClosingClient{XDSClient: singletonClient}, nil
return singletonClient, grpcsync.OnceFunc(clientRefCountedClose), nil
}
// clientRefCounted is ref-counted, and to be shared by the xds resolver and
@ -103,23 +107,13 @@ func newRefCountedWithConfig(config *bootstrap.Config) (XDSClient, error) {
type clientRefCounted struct {
*clientImpl
// This mu protects all the fields, including the embedded clientImpl above.
mu sync.Mutex
refCount int
refCount int32 // accessed atomically
}
// Close closes the client. It does ref count of the xds client implementation,
// and closes the gRPC connection to the management server when ref count
// reaches 0.
func (c *clientRefCounted) Close() {
c.mu.Lock()
defer c.mu.Unlock()
c.refCount--
if c.refCount == 0 {
c.clientImpl.Close()
// Set clientImpl back to nil. So if New() is called after this, a new
// implementation will be created.
c.clientImpl = nil
singletonClientImplCloseHook()
}
func (c *clientRefCounted) incrRef() int32 {
return atomic.AddInt32(&c.refCount, 1)
}
func (c *clientRefCounted) decrRef() int32 {
return atomic.AddInt32(&c.refCount, -1)
}

View File

@ -22,28 +22,26 @@ import (
"context"
"testing"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/google/uuid"
"google.golang.org/grpc/internal/testutils"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/internal/testutils/xds/bootstrap"
)
// Test that multiple New() returns the same Client. And only when the last
// client is closed, the underlying client is closed.
func (s) TestClientNewSingleton(t *testing.T) {
// Override bootstrap with a fake config.
oldBootstrapNewConfig := bootstrapNewConfig
bootstrapNewConfig = func() (*bootstrap.Config, error) {
return &bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: testXDSServer,
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
NodeProto: xdstestutils.EmptyNodeProtoV2,
},
}, nil
// Create a bootstrap configuration, place it in a file in the temp
// directory, and set the bootstrap env vars to point to it.
nodeID := uuid.New().String()
cleanup, err := bootstrap.CreateFile(bootstrap.Options{
NodeID: nodeID,
ServerURI: "non-existent-server-address",
Version: bootstrap.TransportV3,
})
if err != nil {
t.Fatal(err)
}
defer func() { bootstrapNewConfig = oldBootstrapNewConfig }()
defer cleanup()
// Override the singleton creation hook to get notified.
origSingletonClientImplCreateHook := singletonClientImplCreateHook
@ -62,7 +60,7 @@ func (s) TestClientNewSingleton(t *testing.T) {
defer func() { singletonClientImplCloseHook = origSingletonClientImplCloseHook }()
// The first call to New() should create a new singleton client.
client, err := New()
_, closeFunc, err := New()
if err != nil {
t.Fatalf("failed to create xDS client: %v", err)
}
@ -75,10 +73,10 @@ func (s) TestClientNewSingleton(t *testing.T) {
// Calling New() again should not create new singleton client implementations.
const count = 9
clients := make([]XDSClient, count)
closeFuncs := make([]func(), 9)
for i := 0; i < count; i++ {
func() {
clients[i], err = New()
_, closeFuncs[i], err = New()
if err != nil {
t.Fatalf("%d-th call to New() failed with error: %v", i, err)
}
@ -97,8 +95,8 @@ func (s) TestClientNewSingleton(t *testing.T) {
// acquired above, via the first call to New().
for i := 0; i < count; i++ {
func() {
clients[i].Close()
clients[i].Close()
closeFuncs[i]()
closeFuncs[i]()
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
@ -109,18 +107,18 @@ func (s) TestClientNewSingleton(t *testing.T) {
}
// Call the last Close(). The underlying implementation should be closed.
client.Close()
closeFunc()
if _, err := singletonCloseCh.Receive(ctx); err != nil {
t.Fatalf("Timeout waiting for singleton client implementation to be closed: %v", err)
}
// Calling New() again, after the previous Client was actually closed, should
// create a new one.
client, err = New()
_, closeFunc, err = New()
if err != nil {
t.Fatalf("failed to create client: %v", err)
}
defer client.Close()
defer closeFunc()
if _, err := singletonCreationCh.Receive(ctx); err != nil {
t.Fatalf("Timeout when waiting for singleton xDS client to be created: %v", err)
}

View File

@ -49,7 +49,7 @@ const serverPrefix = "[xds-server %p] "
var (
// These new functions will be overridden in unit tests.
newXDSClient = func() (xdsclient.XDSClient, error) {
newXDSClient = func() (xdsclient.XDSClient, func(), error) {
return xdsclient.New()
}
newGRPCServer = func(opts ...grpc.ServerOption) grpcServer {
@ -89,8 +89,9 @@ type GRPCServer struct {
// clientMu is used only in initXDSClient(), which is called at the
// beginning of Serve(), where we have to decide if we have to create a
// client or use an existing one.
clientMu sync.Mutex
xdsC xdsclient.XDSClient
clientMu sync.Mutex
xdsC xdsclient.XDSClient
xdsClientClose func()
}
// NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts.
@ -184,16 +185,17 @@ func (s *GRPCServer) initXDSClient() error {
newXDSClient := newXDSClient
if s.opts.bootstrapContentsForTesting != nil {
// Bootstrap file contents may be specified as a server option for tests.
newXDSClient = func() (xdsclient.XDSClient, error) {
newXDSClient = func() (xdsclient.XDSClient, func(), error) {
return xdsclient.NewWithBootstrapContentsForTesting(s.opts.bootstrapContentsForTesting)
}
}
client, err := newXDSClient()
client, close, err := newXDSClient()
if err != nil {
return fmt.Errorf("xds: failed to create xds-client: %v", err)
}
s.xdsC = client
s.xdsClientClose = close
s.logger.Infof("Created an xdsClient")
return nil
}
@ -334,7 +336,7 @@ func (s *GRPCServer) Stop() {
s.quit.Fire()
s.gs.Stop()
if s.xdsC != nil {
s.xdsC.Close()
s.xdsClientClose()
}
}
@ -345,7 +347,7 @@ func (s *GRPCServer) GracefulStop() {
s.quit.Fire()
s.gs.GracefulStop()
if s.xdsC != nil {
s.xdsC.Close()
s.xdsClientClose()
}
}

View File

@ -320,7 +320,7 @@ func (p *fakeProvider) Close() {
func setupOverrides() (*fakeGRPCServer, *testutils.Channel, func()) {
clientCh := testutils.NewChannel()
origNewXDSClient := newXDSClient
newXDSClient = func() (xdsclient.XDSClient, error) {
newXDSClient = func() (xdsclient.XDSClient, func(), error) {
c := fakeclient.NewClient()
c.SetBootstrapConfig(&bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
@ -332,7 +332,7 @@ func setupOverrides() (*fakeGRPCServer, *testutils.Channel, func()) {
CertProviderConfigs: certProviderConfigs,
})
clientCh.Send(c)
return c, nil
return c, func() {}, nil
}
fs := newFakeGRPCServer()
@ -352,7 +352,7 @@ func setupOverrides() (*fakeGRPCServer, *testutils.Channel, func()) {
func setupOverridesForXDSCreds(includeCertProviderCfg bool) (*testutils.Channel, func()) {
clientCh := testutils.NewChannel()
origNewXDSClient := newXDSClient
newXDSClient = func() (xdsclient.XDSClient, error) {
newXDSClient = func() (xdsclient.XDSClient, func(), error) {
c := fakeclient.NewClient()
bc := &bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
@ -367,7 +367,7 @@ func setupOverridesForXDSCreds(includeCertProviderCfg bool) (*testutils.Channel,
}
c.SetBootstrapConfig(bc)
clientCh.Send(c)
return c, nil
return c, func() {}, nil
}
return clientCh, func() { newXDSClient = origNewXDSClient }
@ -631,11 +631,11 @@ func (s) TestServeBootstrapConfigInvalid(t *testing.T) {
// xdsClient with the specified bootstrap configuration.
clientCh := testutils.NewChannel()
origNewXDSClient := newXDSClient
newXDSClient = func() (xdsclient.XDSClient, error) {
newXDSClient = func() (xdsclient.XDSClient, func(), error) {
c := fakeclient.NewClient()
c.SetBootstrapConfig(test.bootstrapConfig)
clientCh.Send(c)
return c, nil
return c, func() {}, nil
}
defer func() { newXDSClient = origNewXDSClient }()
@ -674,8 +674,8 @@ func (s) TestServeBootstrapConfigInvalid(t *testing.T) {
// verifies that Server() exits with a non-nil error.
func (s) TestServeNewClientFailure(t *testing.T) {
origNewXDSClient := newXDSClient
newXDSClient = func() (xdsclient.XDSClient, error) {
return nil, errors.New("xdsClient creation failed")
newXDSClient = func() (xdsclient.XDSClient, func(), error) {
return nil, nil, errors.New("xdsClient creation failed")
}
defer func() { newXDSClient = origNewXDSClient }()