mirror of https://github.com/grpc/grpc-go.git
xdsclient: make Close() idempotent (#5149)
This commit is contained in:
parent
6f54b5ddbe
commit
9cb4113808
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/uuid"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/internal/grpctest"
|
||||
"google.golang.org/grpc/internal/testutils"
|
||||
"google.golang.org/grpc/internal/xds"
|
||||
_ "google.golang.org/grpc/xds/internal/httpfilter/router"
|
||||
|
|
@ -50,9 +51,7 @@ import (
|
|||
v3statuspbgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultTestTimeout = 10 * time.Second
|
||||
)
|
||||
const defaultTestTimeout = 10 * time.Second
|
||||
|
||||
var cmpOpts = cmp.Options{
|
||||
cmp.Transformer("sort", func(in []*v3statuspb.ClientConfig_GenericXdsConfig) []*v3statuspb.ClientConfig_GenericXdsConfig {
|
||||
|
|
@ -113,6 +112,14 @@ var (
|
|||
ports = []uint32{123, 456}
|
||||
)
|
||||
|
||||
type s struct {
|
||||
grpctest.Tester
|
||||
}
|
||||
|
||||
func Test(t *testing.T) {
|
||||
grpctest.RunSubTests(t, s{})
|
||||
}
|
||||
|
||||
func init() {
|
||||
for i := range ldsTargets {
|
||||
listeners[i] = e2e.DefaultClientListener(ldsTargets[i], rdsTargets[i])
|
||||
|
|
@ -132,7 +139,7 @@ func init() {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCSDS(t *testing.T) {
|
||||
func (s) TestCSDS(t *testing.T) {
|
||||
const retryCount = 10
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
|
|
@ -242,12 +249,13 @@ func commonSetup(ctx context.Context, t *testing.T) (xdsclient.XDSClient, *e2e.M
|
|||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Create xds_client.
|
||||
xdsC, err := xdsclient.New()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create xds client: %v", err)
|
||||
}
|
||||
oldNewXDSClient := newXDSClient
|
||||
origNewXDSClient := newXDSClient
|
||||
newXDSClient = func() xdsclient.XDSClient { return xdsC }
|
||||
|
||||
// Initialize an gRPC server and register CSDS on it.
|
||||
|
|
@ -257,6 +265,7 @@ func commonSetup(ctx context.Context, t *testing.T) (xdsclient.XDSClient, *e2e.M
|
|||
t.Fatal(err)
|
||||
}
|
||||
v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
|
||||
|
||||
// Create a local listener and pass it to Serve().
|
||||
lis, err := testutils.LocalTCPListener()
|
||||
if err != nil {
|
||||
|
|
@ -284,7 +293,7 @@ func commonSetup(ctx context.Context, t *testing.T) (xdsclient.XDSClient, *e2e.M
|
|||
conn.Close()
|
||||
server.Stop()
|
||||
csdss.Close()
|
||||
newXDSClient = oldNewXDSClient
|
||||
newXDSClient = origNewXDSClient
|
||||
xdsC.Close()
|
||||
bootstrapCleanup()
|
||||
}
|
||||
|
|
@ -490,7 +499,7 @@ func checkForNACKed(nackResourceIdx int, stream v3statuspbgrpc.ClientStatusDisco
|
|||
return nil
|
||||
}
|
||||
|
||||
func TestCSDSNoXDSClient(t *testing.T) {
|
||||
func (s) TestCSDSNoXDSClient(t *testing.T) {
|
||||
oldNewXDSClient := newXDSClient
|
||||
newXDSClient = func() xdsclient.XDSClient { return nil }
|
||||
defer func() { newXDSClient = oldNewXDSClient }()
|
||||
|
|
|
|||
|
|
@ -68,25 +68,17 @@ type clientImpl struct {
|
|||
}
|
||||
|
||||
// newWithConfig returns a new xdsClient with the given config.
|
||||
func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration) (_ *clientImpl, retErr error) {
|
||||
func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration) (*clientImpl, error) {
|
||||
c := &clientImpl{
|
||||
done: grpcsync.NewEvent(),
|
||||
config: config,
|
||||
watchExpiryTimeout: watchExpiryTimeout,
|
||||
|
||||
authorities: make(map[string]*authority),
|
||||
idleAuthorities: cache.NewTimeoutCache(idleAuthorityDeleteTimeout),
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
c.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
c.logger = prefixLogger(c)
|
||||
c.logger.Infof("Created ClientConn to xDS management server: %s", config.XDSServer)
|
||||
|
||||
c.logger.Infof("Created")
|
||||
return c, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -261,113 +261,3 @@ func verifyEndpointsUpdate(ctx context.Context, updateCh *testutils.Channel, wan
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Test that multiple New() returns the same Client. And only when the last
|
||||
// client is closed, the underlying client is closed.
|
||||
func (s) TestClientNewSingleton(t *testing.T) {
|
||||
oldBootstrapNewConfig := bootstrapNewConfig
|
||||
bootstrapNewConfig = func() (*bootstrap.Config, error) {
|
||||
return &bootstrap.Config{
|
||||
XDSServer: &bootstrap.ServerConfig{
|
||||
ServerURI: testXDSServer,
|
||||
Creds: grpc.WithInsecure(),
|
||||
NodeProto: xdstestutils.EmptyNodeProtoV2,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
defer func() { bootstrapNewConfig = oldBootstrapNewConfig }()
|
||||
|
||||
ctrlCh := overrideNewController(t)
|
||||
|
||||
// The first New(). Should create a Client and a new APIClient.
|
||||
client, err := newRefCounted()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create client: %v", err)
|
||||
}
|
||||
|
||||
// Call a watch to create the controller.
|
||||
client.WatchCluster("doesnot-matter", func(update xdsresource.ClusterUpdate, err error) {})
|
||||
|
||||
clientImpl := client.clientImpl
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
c, err := ctrlCh.Receive(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("timeout when waiting for API client to be created: %v", err)
|
||||
}
|
||||
apiClient := c.(*testController)
|
||||
|
||||
// Call New() again. They should all return the same client implementation,
|
||||
// and should not create new API client.
|
||||
const count = 9
|
||||
for i := 0; i < count; i++ {
|
||||
tc, terr := newRefCounted()
|
||||
if terr != nil {
|
||||
client.Close()
|
||||
t.Fatalf("%d-th call to New() failed with error: %v", i, terr)
|
||||
}
|
||||
if tc.clientImpl != clientImpl {
|
||||
client.Close()
|
||||
tc.Close()
|
||||
t.Fatalf("%d-th call to New() got a different client %p, want %p", i, tc.clientImpl, clientImpl)
|
||||
}
|
||||
|
||||
sctx, scancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
|
||||
defer scancel()
|
||||
_, err := ctrlCh.Receive(sctx)
|
||||
if err == nil {
|
||||
client.Close()
|
||||
t.Fatalf("%d-th call to New() created a new API client", i)
|
||||
}
|
||||
}
|
||||
|
||||
// Call Close(). Nothing should be actually closed until the last ref calls
|
||||
// Close().
|
||||
for i := 0; i < count; i++ {
|
||||
client.Close()
|
||||
if clientImpl.done.HasFired() {
|
||||
t.Fatalf("%d-th call to Close(), unexpected done in the client implemenation", i)
|
||||
}
|
||||
if apiClient.done.HasFired() {
|
||||
t.Fatalf("%d-th call to Close(), unexpected done in the API client", i)
|
||||
}
|
||||
}
|
||||
|
||||
// Call the last Close(). The underlying implementation and API Client
|
||||
// should all be closed.
|
||||
client.Close()
|
||||
if !clientImpl.done.HasFired() {
|
||||
t.Fatalf("want client implementation to be closed, got not done")
|
||||
}
|
||||
if !apiClient.done.HasFired() {
|
||||
t.Fatalf("want API client to be closed, got not done")
|
||||
}
|
||||
|
||||
// Call New() again after the previous Client is actually closed. Should
|
||||
// create a Client and a new APIClient.
|
||||
client2, err2 := newRefCounted()
|
||||
if err2 != nil {
|
||||
t.Fatalf("failed to create client: %v", err)
|
||||
}
|
||||
defer client2.Close()
|
||||
|
||||
// Call a watch to create the controller.
|
||||
client2.WatchCluster("abc", func(update xdsresource.ClusterUpdate, err error) {})
|
||||
|
||||
c2, err := ctrlCh.Receive(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("timeout when waiting for API client to be created: %v", err)
|
||||
}
|
||||
apiClient2 := c2.(*testController)
|
||||
|
||||
// The client wrapper with ref count should be the same.
|
||||
if client2 != client {
|
||||
t.Fatalf("New() after Close() should return the same client wrapper, got different %p, %p", client2, client)
|
||||
}
|
||||
if client2.clientImpl == clientImpl {
|
||||
t.Fatalf("New() after Close() should return different client implementation, got the same %p", client2.clientImpl)
|
||||
}
|
||||
if apiClient2 == apiClient {
|
||||
t.Fatalf("New() after Close() should return different API client, got the same %p", apiClient2)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,13 +33,36 @@ const (
|
|||
defaultIdleAuthorityDeleteTimeout = 5 * time.Minute
|
||||
)
|
||||
|
||||
// This is the Client returned by New(). It contains one client implementation,
|
||||
// and maintains the refcount.
|
||||
var singletonClient = &clientRefCounted{}
|
||||
var (
|
||||
// This is the Client returned by New(). It contains one client implementation,
|
||||
// and maintains the refcount.
|
||||
singletonClient = &clientRefCounted{}
|
||||
|
||||
// The following functions are no-ops in the actual code, but can be
|
||||
// overridden in tests to give them visibility into certain events.
|
||||
singletonClientImplCreateHook = func() {}
|
||||
singletonClientImplCloseHook = func() {}
|
||||
)
|
||||
|
||||
// To override in tests.
|
||||
var bootstrapNewConfig = bootstrap.NewConfig
|
||||
|
||||
// onceClosingClient is a thin wrapper around clientRefCounted. The Close()
|
||||
// method is overridden such that the underlying reference counted client's
|
||||
// Close() is called at most once, thereby making Close() idempotent.
|
||||
//
|
||||
// This is the type which is returned by New() and NewWithConfig(), making it
|
||||
// safe for these callers to call Close() any number of times.
|
||||
type onceClosingClient struct {
|
||||
XDSClient
|
||||
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
func (o *onceClosingClient) Close() {
|
||||
o.once.Do(o.XDSClient.Close)
|
||||
}
|
||||
|
||||
// clientRefCounted is ref-counted, and to be shared by the xds resolver and
|
||||
// balancer implementations, across multiple ClientConns and Servers.
|
||||
type clientRefCounted struct {
|
||||
|
|
@ -70,29 +93,8 @@ func New() (XDSClient, error) {
|
|||
return c, nil
|
||||
}
|
||||
|
||||
func newRefCounted() (*clientRefCounted, error) {
|
||||
singletonClient.mu.Lock()
|
||||
defer singletonClient.mu.Unlock()
|
||||
// If the client implementation was created, increment ref count and return
|
||||
// the client.
|
||||
if singletonClient.clientImpl != nil {
|
||||
singletonClient.refCount++
|
||||
return singletonClient, nil
|
||||
}
|
||||
|
||||
// Create the new client implementation.
|
||||
config, err := bootstrapNewConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("xds: failed to read bootstrap file: %v", err)
|
||||
}
|
||||
c, err := newWithConfig(config, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
singletonClient.clientImpl = c
|
||||
singletonClient.refCount++
|
||||
return singletonClient, nil
|
||||
func newRefCounted() (XDSClient, error) {
|
||||
return newRefCountedWithConfig(nil)
|
||||
}
|
||||
|
||||
// NewWithConfig returns a new xdsClient configured by the given config.
|
||||
|
|
@ -107,13 +109,27 @@ func newRefCounted() (*clientRefCounted, error) {
|
|||
// This function is internal only, for c2p resolver and testing to use. DO NOT
|
||||
// use this elsewhere. Use New() instead.
|
||||
func NewWithConfig(config *bootstrap.Config) (XDSClient, error) {
|
||||
return newRefCountedWithConfig(config)
|
||||
}
|
||||
|
||||
func newRefCountedWithConfig(config *bootstrap.Config) (XDSClient, error) {
|
||||
singletonClient.mu.Lock()
|
||||
defer singletonClient.mu.Unlock()
|
||||
|
||||
// If the client implementation was created, increment ref count and return
|
||||
// the client.
|
||||
if singletonClient.clientImpl != nil {
|
||||
singletonClient.refCount++
|
||||
return singletonClient, nil
|
||||
return &onceClosingClient{XDSClient: singletonClient}, nil
|
||||
}
|
||||
|
||||
// If the passed in config is nil, perform bootstrap to read config.
|
||||
if config == nil {
|
||||
var err error
|
||||
config, err = bootstrapNewConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("xds: failed to read bootstrap file: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create the new client implementation.
|
||||
|
|
@ -124,7 +140,8 @@ func NewWithConfig(config *bootstrap.Config) (XDSClient, error) {
|
|||
|
||||
singletonClient.clientImpl = c
|
||||
singletonClient.refCount++
|
||||
return singletonClient, nil
|
||||
singletonClientImplCreateHook()
|
||||
return &onceClosingClient{XDSClient: singletonClient}, nil
|
||||
}
|
||||
|
||||
// Close closes the client. It does ref count of the xds client implementation,
|
||||
|
|
@ -139,6 +156,7 @@ func (c *clientRefCounted) Close() {
|
|||
// Set clientImpl back to nil. So if New() is called after this, a new
|
||||
// implementation will be created.
|
||||
c.clientImpl = nil
|
||||
singletonClientImplCloseHook()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,126 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package xdsclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/internal/testutils"
|
||||
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
|
||||
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
|
||||
)
|
||||
|
||||
// Test that multiple New() returns the same Client. And only when the last
|
||||
// client is closed, the underlying client is closed.
|
||||
func (s) TestClientNewSingleton(t *testing.T) {
|
||||
// Override bootstrap with a fake config.
|
||||
oldBootstrapNewConfig := bootstrapNewConfig
|
||||
bootstrapNewConfig = func() (*bootstrap.Config, error) {
|
||||
return &bootstrap.Config{
|
||||
XDSServer: &bootstrap.ServerConfig{
|
||||
ServerURI: testXDSServer,
|
||||
Creds: grpc.WithInsecure(),
|
||||
NodeProto: xdstestutils.EmptyNodeProtoV2,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
defer func() { bootstrapNewConfig = oldBootstrapNewConfig }()
|
||||
|
||||
// Override the singleton creation hook to get notified.
|
||||
origSingletonClientImplCreateHook := singletonClientImplCreateHook
|
||||
singletonCreationCh := testutils.NewChannel()
|
||||
singletonClientImplCreateHook = func() {
|
||||
singletonCreationCh.Replace(nil)
|
||||
}
|
||||
defer func() { singletonClientImplCreateHook = origSingletonClientImplCreateHook }()
|
||||
|
||||
// Override the singleton close hook to get notified.
|
||||
origSingletonClientImplCloseHook := singletonClientImplCloseHook
|
||||
singletonCloseCh := testutils.NewChannel()
|
||||
singletonClientImplCloseHook = func() {
|
||||
singletonCloseCh.Replace(nil)
|
||||
}
|
||||
defer func() { singletonClientImplCloseHook = origSingletonClientImplCloseHook }()
|
||||
|
||||
// The first call to New() should create a new singleton client.
|
||||
client, err := New()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create xDS client: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
if _, err := singletonCreationCh.Receive(ctx); err != nil {
|
||||
t.Fatalf("Timeout when waiting for singleton xDS client to be created: %v", err)
|
||||
}
|
||||
|
||||
// Calling New() again should not create new singleton client implementations.
|
||||
const count = 9
|
||||
clients := make([]XDSClient, count)
|
||||
for i := 0; i < count; i++ {
|
||||
func() {
|
||||
clients[i], err = New()
|
||||
if err != nil {
|
||||
t.Fatalf("%d-th call to New() failed with error: %v", i, err)
|
||||
}
|
||||
|
||||
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
|
||||
defer sCancel()
|
||||
if _, err := singletonCreationCh.Receive(sCtx); err == nil {
|
||||
t.Fatalf("%d-th call to New() created a new singleton client", i)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Call Close() multiple times on each of the clients created in the above for
|
||||
// loop. Close() calls are idempotent, and the underlying client
|
||||
// implementation will not be closed until we release the first reference we
|
||||
// acquired above, via the first call to New().
|
||||
for i := 0; i < count; i++ {
|
||||
func() {
|
||||
clients[i].Close()
|
||||
clients[i].Close()
|
||||
|
||||
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
|
||||
defer sCancel()
|
||||
if _, err := singletonCloseCh.Receive(sCtx); err == nil {
|
||||
t.Fatal("singleton client implementation closed before all references are released")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Call the last Close(). The underlying implementation should be closed.
|
||||
client.Close()
|
||||
if _, err := singletonCloseCh.Receive(ctx); err != nil {
|
||||
t.Fatalf("Timeout waiting for singleton client implementation to be closed: %v", err)
|
||||
}
|
||||
|
||||
// Calling New() again, after the previous Client was actually closed, should
|
||||
// create a new one.
|
||||
client, err = New()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create client: %v", err)
|
||||
}
|
||||
defer client.Close()
|
||||
if _, err := singletonCreationCh.Receive(ctx); err != nil {
|
||||
t.Fatalf("Timeout when waiting for singleton xDS client to be created: %v", err)
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue