Cleanup etcd healthcheck on shutdown
Kubernetes-commit: cb80082f666e0e5fe220df32e31a8face18e9393
parent 5f27f61940
commit 0a7c4bcca1
@@ -528,6 +528,11 @@ func completeOpenAPI(config *openapicommon.Config, version *version.Info) {
 	}
 }
 
+// StopNotify returns a lifecycle signal of genericapiserver shutting down.
+func (c *Config) StopNotify() <-chan struct{} {
+	return c.lifecycleSignals.ShutdownInitiated.Signaled()
+}
+
 // Complete fills in any fields not set that are required to have valid data and can be derived
 // from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
 func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig {

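This hunk adds a shutdown accessor on the generic apiserver Config. A minimal sketch of how a component holding the Config might consume the signal (the watchShutdown helper below is hypothetical, not part of this commit):

    // Assumes cfg was built with server.NewConfig, which initializes
    // lifecycleSignals; a zero-value Config would not have them set.
    func watchShutdown(cfg *server.Config) {
        stopCh := cfg.StopNotify() // closed once ShutdownInitiated is signaled
        go func() {
            <-stopCh
            // stop background pollers, close clients, etc.
        }()
    }
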
@@ -226,7 +226,7 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
 }
 
 func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
-	healthCheck, err := storagefactory.CreateHealthCheck(s.StorageConfig)
+	healthCheck, err := storagefactory.CreateHealthCheck(s.StorageConfig, c.StopNotify())
 	if err != nil {
 		return err
 	}

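The server's stop channel is now threaded into the health-check factory. For context, a sketch of how such a func() error check is typically exposed on /healthz through the healthz package; the registration itself is not part of this hunk and the helper name is illustrative:

    import (
        "net/http"

        "k8s.io/apiserver/pkg/server"
        "k8s.io/apiserver/pkg/server/healthz"
    )

    func registerEtcdCheck(c *server.Config, healthCheck func() error) {
        c.AddHealthChecks(healthz.NamedCheck("etcd", func(_ *http.Request) error {
            return healthCheck()
        }))
    }
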
@@ -21,7 +21,9 @@ import (
 	"testing"
 	"time"
 
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apiserver/pkg/server"
 	"k8s.io/apiserver/pkg/storage/storagebackend"

@@ -214,9 +216,12 @@ func TestKMSHealthzEndpoint(t *testing.T) {
 		},
 	}
 
+	scheme := runtime.NewScheme()
+	codecs := serializer.NewCodecFactory(scheme)
+
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			serverConfig := &server.Config{}
+			serverConfig := server.NewConfig(codecs)
 			etcdOptions := &EtcdOptions{
 				EncryptionProviderConfigFilepath: tc.encryptionConfigPath,
 			}

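The test now builds its Config through the constructor rather than a zero-value literal, presumably because server.NewConfig wires up the lifecycle signals that StopNotify dereferences. A minimal, illustrative sketch of the setup:

    scheme := runtime.NewScheme()
    codecs := serializer.NewCodecFactory(scheme)
    serverConfig := server.NewConfig(codecs)
    stopCh := serverConfig.StopNotify() // ShutdownInitiated is initialized by NewConfig
    // serverConfig := &server.Config{} would leave lifecycleSignals unset,
    // so StopNotify could not return a usable channel.
    _ = stopCh
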
@@ -24,7 +24,6 @@ import (
 	"path"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"

@@ -35,6 +34,7 @@ import (
 
 	"k8s.io/apimachinery/pkg/runtime"
 	utilnet "k8s.io/apimachinery/pkg/util/net"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	genericfeatures "k8s.io/apiserver/pkg/features"
 	"k8s.io/apiserver/pkg/server/egressselector"

@@ -72,31 +72,64 @@ func init() {
 	dbMetricsMonitors = make(map[string]struct{})
 }
 
-func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) {
+func newETCD3HealthCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() error, error) {
 	// constructing the etcd v3 client blocks and times out if etcd is not available.
 	// retry in a loop in the background until we successfully create the client, storing the client or error encountered
 
-	clientValue := &atomic.Value{}
-
-	clientErrMsg := &atomic.Value{}
-	clientErrMsg.Store("etcd client connection not yet established")
+	lock := sync.Mutex{}
+	var client *clientv3.Client
+	clientErr := fmt.Errorf("etcd client connection not yet established")
 
 	go wait.PollUntil(time.Second, func() (bool, error) {
-		client, err := newETCD3Client(c.Transport)
+		newClient, err := newETCD3Client(c.Transport)
+
+		lock.Lock()
+		defer lock.Unlock()
+
+		// Ensure that server is already not shutting down.
+		select {
+		case <-stopCh:
+			if err == nil {
+				newClient.Close()
+			}
+			return true, nil
+		default:
+		}
+
 		if err != nil {
-			clientErrMsg.Store(err.Error())
+			clientErr = err
 			return false, nil
 		}
-		clientValue.Store(client)
-		clientErrMsg.Store("")
+		client = newClient
+		clientErr = nil
 		return true, nil
-	}, wait.NeverStop)
+	}, stopCh)
+
+	// Close the client on shutdown.
+	go func() {
+		defer utilruntime.HandleCrash()
+		<-stopCh
+
+		lock.Lock()
+		defer lock.Unlock()
+		if client != nil {
+			client.Close()
+			clientErr = fmt.Errorf("server is shutting down")
+		}
+	}()
 
 	return func() error {
-		if errMsg := clientErrMsg.Load().(string); len(errMsg) > 0 {
-			return fmt.Errorf(errMsg)
+		// Given that client is closed on shutdown we hold the lock for
+		// the entire period of healthcheck call to ensure that client will
+		// not be closed during healthcheck.
+		// Given that healthchecks has a 2s timeout, worst case of blocking
+		// shutdown for additional 2s seems acceptable.
+		lock.Lock()
+		defer lock.Unlock()
+		if clientErr != nil {
+			return clientErr
		}
-		client := clientValue.Load().(*clientv3.Client)
+
 		healthcheckTimeout := storagebackend.DefaultHealthcheckTimeout
 		if c.HealthcheckTimeout != time.Duration(0) {
 			healthcheckTimeout = c.HealthcheckTimeout

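The atomic.Value pair is replaced by a mutex-guarded client and error so that shutdown can close the etcd client exactly once, and so the health check can never race with that close. A standalone, distilled sketch of the same pattern, with hypothetical names (fakeClient, dial, newHealthCheck) standing in for the etcd client, newETCD3Client, and newETCD3HealthCheck:

    package main

    import (
        "errors"
        "fmt"
        "sync"
        "time"
    )

    type fakeClient struct{}

    func (c *fakeClient) Close()      {}
    func (c *fakeClient) Ping() error { return nil }

    // dial stands in for newETCD3Client.
    func dial() (*fakeClient, error) { return &fakeClient{}, nil }

    func newHealthCheck(stopCh <-chan struct{}) func() error {
        var (
            mu        sync.Mutex
            client    *fakeClient
            clientErr = errors.New("client connection not yet established")
        )

        // Background init: retry once a second until the client is created or
        // shutdown begins (the real code uses wait.PollUntil for this).
        go func() {
            for {
                select {
                case <-stopCh:
                    return
                case <-time.After(time.Second):
                }
                c, err := dial()
                mu.Lock()
                select {
                case <-stopCh:
                    // Shutdown raced with a successful dial: discard the client.
                    if err == nil {
                        c.Close()
                    }
                    mu.Unlock()
                    return
                default:
                }
                if err != nil {
                    clientErr = err
                    mu.Unlock()
                    continue
                }
                client, clientErr = c, nil
                mu.Unlock()
                return
            }
        }()

        // Cleanup: close the client exactly once when shutdown starts.
        go func() {
            <-stopCh
            mu.Lock()
            defer mu.Unlock()
            if client != nil {
                client.Close()
                clientErr = fmt.Errorf("server is shutting down")
            }
        }()

        return func() error {
            // Hold the lock for the whole check so the client cannot be closed
            // underneath it; the check's own timeout bounds how long shutdown waits.
            mu.Lock()
            defer mu.Unlock()
            if clientErr != nil {
                return clientErr
            }
            return client.Ping()
        }
    }

    func main() {
        stopCh := make(chan struct{})
        check := newHealthCheck(stopCh)
        fmt.Println(check()) // connection not yet established
        time.Sleep(1500 * time.Millisecond)
        fmt.Println(check()) // <nil> once the client exists
        close(stopCh)
        time.Sleep(100 * time.Millisecond)
        fmt.Println(check()) // server is shutting down
    }
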
@@ -40,12 +40,12 @@ func Create(c storagebackend.ConfigForResource, newFunc func() runtime.Object) (
 }
 
 // CreateHealthCheck creates a healthcheck function based on given config.
-func CreateHealthCheck(c storagebackend.Config) (func() error, error) {
+func CreateHealthCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() error, error) {
 	switch c.Type {
 	case storagebackend.StorageTypeETCD2:
 		return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
 	case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
-		return newETCD3HealthCheck(c)
+		return newETCD3HealthCheck(c, stopCh)
 	default:
 		return nil, fmt.Errorf("unknown storage type: %s", c.Type)
 	}

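Existing callers of CreateHealthCheck now have to supply a stop channel. A hedged sketch of adapting a call site that has no server lifecycle signal at hand: wait.NeverStop (from k8s.io/apimachinery/pkg/util/wait) preserves the old never-cleaned-up behavior, while a real channel such as the one from Config.StopNotify opts into the new cleanup path.

    // Illustrative call site; cfg is a storagebackend.Config obtained elsewhere.
    healthCheck, err := storagefactory.CreateHealthCheck(cfg, wait.NeverStop)
    if err != nil {
        return err
    }
    _ = healthCheck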