Add additional etcd check to readyz with 2 seconds timeout.
Kubernetes-commit: b42045a64fd07fb948660839b6c7c14440bee9df
This commit is contained in:
parent
f53b829906
commit
cb0bb2af35
|
@ -455,6 +455,12 @@ func (c *Config) AddHealthChecks(healthChecks ...healthz.HealthChecker) {
|
|||
c.ReadyzChecks = append(c.ReadyzChecks, healthChecks...)
|
||||
}
|
||||
|
||||
// AddReadyzChecks adds a health check to our config to be exposed by the readyz endpoint
|
||||
// of our configured apiserver.
|
||||
func (c *Config) AddReadyzChecks(healthChecks ...healthz.HealthChecker) {
|
||||
c.ReadyzChecks = append(c.ReadyzChecks, healthChecks...)
|
||||
}
|
||||
|
||||
// AddPostStartHook allows you to add a PostStartHook that will later be added to the server itself in a New call.
|
||||
// Name conflicts will cause an error.
|
||||
func (c *Config) AddPostStartHook(name string, hook PostStartHookFunc) error {
|
||||
|
|
|
@ -183,6 +183,9 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
|
|||
fs.DurationVar(&s.StorageConfig.HealthcheckTimeout, "etcd-healthcheck-timeout", s.StorageConfig.HealthcheckTimeout,
|
||||
"The timeout to use when checking etcd health.")
|
||||
|
||||
fs.DurationVar(&s.StorageConfig.ReadycheckTimeout, "etcd-readycheck-timeout", s.StorageConfig.ReadycheckTimeout,
|
||||
"The timeout to use when checking etcd readiness")
|
||||
|
||||
fs.Int64Var(&s.StorageConfig.LeaseManagerConfig.ReuseDurationSeconds, "lease-reuse-duration-seconds", s.StorageConfig.LeaseManagerConfig.ReuseDurationSeconds,
|
||||
"The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.")
|
||||
}
|
||||
|
@ -234,6 +237,14 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
|
|||
return healthCheck()
|
||||
}))
|
||||
|
||||
readyCheck, err := storagefactory.CreateReadyCheck(s.StorageConfig, c.DrainedNotify())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.AddReadyzChecks(healthz.NamedCheck("etcd-readiness", func(r *http.Request) error {
|
||||
return readyCheck()
|
||||
}))
|
||||
|
||||
if s.EncryptionProviderConfigFilepath != "" {
|
||||
kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath)
|
||||
if err != nil {
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
)
|
||||
|
||||
|
@ -230,18 +231,57 @@ func TestKMSHealthzEndpoint(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, n := range tc.wantChecks {
|
||||
found := false
|
||||
for _, h := range serverConfig.HealthzChecks {
|
||||
if n == h.Name() {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
if !hasCheck(n, serverConfig.HealthzChecks) {
|
||||
t.Errorf("Missing HealthzChecker %s", n)
|
||||
}
|
||||
found = false
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadinessCheck(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
wantReadyzChecks []string
|
||||
wantHealthzChecks []string
|
||||
}{
|
||||
{
|
||||
name: "Readyz should have etcd-readiness check",
|
||||
wantReadyzChecks: []string{"etcd", "etcd-readiness"},
|
||||
wantHealthzChecks: []string{"etcd"},
|
||||
},
|
||||
}
|
||||
|
||||
scheme := runtime.NewScheme()
|
||||
codecs := serializer.NewCodecFactory(scheme)
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
serverConfig := server.NewConfig(codecs)
|
||||
etcdOptions := &EtcdOptions{}
|
||||
if err := etcdOptions.addEtcdHealthEndpoint(serverConfig); err != nil {
|
||||
t.Fatalf("Failed to add healthz error: %v", err)
|
||||
}
|
||||
|
||||
for _, n := range tc.wantReadyzChecks {
|
||||
if !hasCheck(n, serverConfig.ReadyzChecks) {
|
||||
t.Errorf("Missing ReadyzChecker %s", n)
|
||||
}
|
||||
}
|
||||
for _, n := range tc.wantHealthzChecks {
|
||||
if !hasCheck(n, serverConfig.HealthzChecks) {
|
||||
t.Errorf("Missing HealthzChecker %s", n)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func hasCheck(want string, healthchecks []healthz.HealthChecker) bool {
|
||||
for _, h := range healthchecks {
|
||||
if want == h.Name() {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ const (
|
|||
DefaultCompactInterval = 5 * time.Minute
|
||||
DefaultDBMetricPollInterval = 30 * time.Second
|
||||
DefaultHealthcheckTimeout = 2 * time.Second
|
||||
DefaultReadinessTimeout = 2 * time.Second
|
||||
)
|
||||
|
||||
// TransportConfig holds all connection related info, i.e. equal TransportConfig means equal servers we talk to.
|
||||
|
@ -84,6 +85,8 @@ type Config struct {
|
|||
DBMetricPollInterval time.Duration
|
||||
// HealthcheckTimeout specifies the timeout used when checking health
|
||||
HealthcheckTimeout time.Duration
|
||||
// ReadycheckTimeout specifies the timeout used when checking readiness
|
||||
ReadycheckTimeout time.Duration
|
||||
|
||||
LeaseManagerConfig etcd3.LeaseManagerConfig
|
||||
|
||||
|
@ -117,6 +120,7 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
|
|||
CompactionInterval: DefaultCompactInterval,
|
||||
DBMetricPollInterval: DefaultDBMetricPollInterval,
|
||||
HealthcheckTimeout: DefaultHealthcheckTimeout,
|
||||
ReadycheckTimeout: DefaultReadinessTimeout,
|
||||
LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,6 +73,22 @@ func init() {
|
|||
}
|
||||
|
||||
func newETCD3HealthCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() error, error) {
|
||||
timeout := storagebackend.DefaultHealthcheckTimeout
|
||||
if c.HealthcheckTimeout != time.Duration(0) {
|
||||
timeout = c.HealthcheckTimeout
|
||||
}
|
||||
return newETCD3Check(c, timeout, stopCh)
|
||||
}
|
||||
|
||||
func newETCD3ReadyCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() error, error) {
|
||||
timeout := storagebackend.DefaultReadinessTimeout
|
||||
if c.ReadycheckTimeout != time.Duration(0) {
|
||||
timeout = c.ReadycheckTimeout
|
||||
}
|
||||
return newETCD3Check(c, timeout, stopCh)
|
||||
}
|
||||
|
||||
func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan struct{}) (func() error, error) {
|
||||
// constructing the etcd v3 client blocks and times out if etcd is not available.
|
||||
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
|
||||
|
||||
|
@ -129,23 +145,18 @@ func newETCD3HealthCheck(c storagebackend.Config, stopCh <-chan struct{}) (func(
|
|||
if clientErr != nil {
|
||||
return clientErr
|
||||
}
|
||||
|
||||
healthcheckTimeout := storagebackend.DefaultHealthcheckTimeout
|
||||
if c.HealthcheckTimeout != time.Duration(0) {
|
||||
healthcheckTimeout = c.HealthcheckTimeout
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), healthcheckTimeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
// See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118
|
||||
_, err := client.Get(ctx, path.Join("/", c.Prefix, "health"))
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("error getting data from etcd: %v", err)
|
||||
return fmt.Errorf("error getting data from etcd: %w", err)
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error) {
|
||||
var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: c.CertFile,
|
||||
KeyFile: c.KeyFile,
|
||||
|
|
|
@ -50,3 +50,14 @@ func CreateHealthCheck(c storagebackend.Config, stopCh <-chan struct{}) (func()
|
|||
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func CreateReadyCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() error, error) {
|
||||
switch c.Type {
|
||||
case storagebackend.StorageTypeETCD2:
|
||||
return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
|
||||
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
|
||||
return newETCD3ReadyCheck(c, stopCh)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,235 @@
|
|||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package factory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
)
|
||||
|
||||
type mockKV struct {
|
||||
get func(ctx context.Context) (*clientv3.GetResponse, error)
|
||||
}
|
||||
|
||||
func (mkv mockKV) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (mkv mockKV) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
|
||||
return mkv.get(ctx)
|
||||
}
|
||||
func (mockKV) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (mockKV) Compact(ctx context.Context, rev int64, opts ...clientv3.CompactOption) (*clientv3.CompactResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (mockKV) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) {
|
||||
return clientv3.OpResponse{}, nil
|
||||
}
|
||||
func (mockKV) Txn(ctx context.Context) clientv3.Txn {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestCreateHealthcheck(t *testing.T) {
|
||||
etcdConfig := testserver.NewTestConfig(t)
|
||||
client := testserver.RunEtcd(t, etcdConfig)
|
||||
newETCD3ClientFn := newETCD3Client
|
||||
defer func() {
|
||||
newETCD3Client = newETCD3ClientFn
|
||||
}()
|
||||
tests := []struct {
|
||||
name string
|
||||
cfg storagebackend.Config
|
||||
want error
|
||||
responseTime time.Duration
|
||||
}{
|
||||
{
|
||||
name: "ok if response time lower than default timeout",
|
||||
cfg: storagebackend.Config{
|
||||
Type: storagebackend.StorageTypeETCD3,
|
||||
Transport: storagebackend.TransportConfig{},
|
||||
},
|
||||
responseTime: 1 * time.Second,
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "ok if response time lower than custom timeout",
|
||||
cfg: storagebackend.Config{
|
||||
Type: storagebackend.StorageTypeETCD3,
|
||||
Transport: storagebackend.TransportConfig{},
|
||||
HealthcheckTimeout: 5 * time.Second,
|
||||
},
|
||||
responseTime: 3 * time.Second,
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "timeouts if response time higher than default timeout",
|
||||
cfg: storagebackend.Config{
|
||||
Type: storagebackend.StorageTypeETCD3,
|
||||
Transport: storagebackend.TransportConfig{},
|
||||
},
|
||||
responseTime: 3 * time.Second,
|
||||
want: context.DeadlineExceeded,
|
||||
},
|
||||
{
|
||||
name: "timeouts if response time higher than custom timeout",
|
||||
cfg: storagebackend.Config{
|
||||
Type: storagebackend.StorageTypeETCD3,
|
||||
Transport: storagebackend.TransportConfig{},
|
||||
HealthcheckTimeout: 3 * time.Second,
|
||||
},
|
||||
responseTime: 5 * time.Second,
|
||||
want: context.DeadlineExceeded,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
tc.cfg.Transport.ServerList = client.Endpoints()
|
||||
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
|
||||
dummyKV := mockKV{
|
||||
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case <-time.After(tc.responseTime):
|
||||
return nil, nil
|
||||
}
|
||||
},
|
||||
}
|
||||
client.KV = dummyKV
|
||||
return client, nil
|
||||
}
|
||||
stop := make(chan struct{})
|
||||
healthcheck, err := CreateHealthCheck(tc.cfg, stop)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Wait for healthcheck to establish connection
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
got := healthcheck()
|
||||
|
||||
if !errors.Is(got, tc.want) {
|
||||
t.Errorf("healthcheck() missmatch want %v got %v", tc.want, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateReadycheck(t *testing.T) {
|
||||
etcdConfig := testserver.NewTestConfig(t)
|
||||
client := testserver.RunEtcd(t, etcdConfig)
|
||||
newETCD3ClientFn := newETCD3Client
|
||||
defer func() {
|
||||
newETCD3Client = newETCD3ClientFn
|
||||
}()
|
||||
tests := []struct {
|
||||
name string
|
||||
cfg storagebackend.Config
|
||||
want error
|
||||
responseTime time.Duration
|
||||
}{
|
||||
{
|
||||
name: "ok if response time lower than default timeout",
|
||||
cfg: storagebackend.Config{
|
||||
Type: storagebackend.StorageTypeETCD3,
|
||||
Transport: storagebackend.TransportConfig{},
|
||||
},
|
||||
responseTime: 1 * time.Second,
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "ok if response time lower than custom timeout",
|
||||
cfg: storagebackend.Config{
|
||||
Type: storagebackend.StorageTypeETCD3,
|
||||
Transport: storagebackend.TransportConfig{},
|
||||
ReadycheckTimeout: 5 * time.Second,
|
||||
},
|
||||
responseTime: 3 * time.Second,
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "timeouts if response time higher than default timeout",
|
||||
cfg: storagebackend.Config{
|
||||
Type: storagebackend.StorageTypeETCD3,
|
||||
Transport: storagebackend.TransportConfig{},
|
||||
},
|
||||
responseTime: 3 * time.Second,
|
||||
want: context.DeadlineExceeded,
|
||||
},
|
||||
{
|
||||
name: "timeouts if response time higher than custom timeout",
|
||||
cfg: storagebackend.Config{
|
||||
Type: storagebackend.StorageTypeETCD3,
|
||||
Transport: storagebackend.TransportConfig{},
|
||||
ReadycheckTimeout: 3 * time.Second,
|
||||
},
|
||||
responseTime: 5 * time.Second,
|
||||
want: context.DeadlineExceeded,
|
||||
},
|
||||
{
|
||||
name: "timeouts if response time higher than default timeout with custom healthcheck timeout",
|
||||
cfg: storagebackend.Config{
|
||||
Type: storagebackend.StorageTypeETCD3,
|
||||
Transport: storagebackend.TransportConfig{},
|
||||
HealthcheckTimeout: 10 * time.Second,
|
||||
},
|
||||
responseTime: 3 * time.Second,
|
||||
want: context.DeadlineExceeded,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
tc.cfg.Transport.ServerList = client.Endpoints()
|
||||
newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
|
||||
dummyKV := mockKV{
|
||||
get: func(ctx context.Context) (*clientv3.GetResponse, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case <-time.After(tc.responseTime):
|
||||
return nil, nil
|
||||
}
|
||||
},
|
||||
}
|
||||
client.KV = dummyKV
|
||||
return client, nil
|
||||
}
|
||||
stop := make(chan struct{})
|
||||
healthcheck, err := CreateReadyCheck(tc.cfg, stop)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Wait for healthcheck to establish connection
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
got := healthcheck()
|
||||
|
||||
if !errors.Is(got, tc.want) {
|
||||
t.Errorf("healthcheck() missmatch want %v got %v", tc.want, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue