diff --git a/pkg/server/config.go b/pkg/server/config.go index f6fcff43b..ddff43e53 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -455,6 +455,12 @@ func (c *Config) AddHealthChecks(healthChecks ...healthz.HealthChecker) { c.ReadyzChecks = append(c.ReadyzChecks, healthChecks...) } +// AddReadyzChecks adds a health check to our config to be exposed by the readyz endpoint +// of our configured apiserver. +func (c *Config) AddReadyzChecks(healthChecks ...healthz.HealthChecker) { + c.ReadyzChecks = append(c.ReadyzChecks, healthChecks...) +} + // AddPostStartHook allows you to add a PostStartHook that will later be added to the server itself in a New call. // Name conflicts will cause an error. func (c *Config) AddPostStartHook(name string, hook PostStartHookFunc) error { diff --git a/pkg/server/options/etcd.go b/pkg/server/options/etcd.go index ed108badf..00c43324b 100644 --- a/pkg/server/options/etcd.go +++ b/pkg/server/options/etcd.go @@ -183,6 +183,9 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.StorageConfig.HealthcheckTimeout, "etcd-healthcheck-timeout", s.StorageConfig.HealthcheckTimeout, "The timeout to use when checking etcd health.") + fs.DurationVar(&s.StorageConfig.ReadycheckTimeout, "etcd-readycheck-timeout", s.StorageConfig.ReadycheckTimeout, + "The timeout to use when checking etcd readiness") + fs.Int64Var(&s.StorageConfig.LeaseManagerConfig.ReuseDurationSeconds, "lease-reuse-duration-seconds", s.StorageConfig.LeaseManagerConfig.ReuseDurationSeconds, "The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.") } @@ -234,6 +237,14 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error { return healthCheck() })) + readyCheck, err := storagefactory.CreateReadyCheck(s.StorageConfig, c.DrainedNotify()) + if err != nil { + return err + } + c.AddReadyzChecks(healthz.NamedCheck("etcd-readiness", func(r *http.Request) error { + return readyCheck() + })) + if s.EncryptionProviderConfigFilepath != "" { kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath) if err != nil { diff --git a/pkg/server/options/etcd_test.go b/pkg/server/options/etcd_test.go index bd454800a..0d380f076 100644 --- a/pkg/server/options/etcd_test.go +++ b/pkg/server/options/etcd_test.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apiserver/pkg/server" + "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/storage/storagebackend" ) @@ -230,18 +231,57 @@ func TestKMSHealthzEndpoint(t *testing.T) { } for _, n := range tc.wantChecks { - found := false - for _, h := range serverConfig.HealthzChecks { - if n == h.Name() { - found = true - break - } - } - if !found { + if !hasCheck(n, serverConfig.HealthzChecks) { t.Errorf("Missing HealthzChecker %s", n) } - found = false } }) } } + +func TestReadinessCheck(t *testing.T) { + testCases := []struct { + name string + wantReadyzChecks []string + wantHealthzChecks []string + }{ + { + name: "Readyz should have etcd-readiness check", + wantReadyzChecks: []string{"etcd", "etcd-readiness"}, + wantHealthzChecks: []string{"etcd"}, + }, + } + + scheme := runtime.NewScheme() + codecs := serializer.NewCodecFactory(scheme) + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + serverConfig := server.NewConfig(codecs) + etcdOptions := &EtcdOptions{} + if err := etcdOptions.addEtcdHealthEndpoint(serverConfig); err != nil { + t.Fatalf("Failed to add healthz error: %v", err) + } + + for _, n := range tc.wantReadyzChecks { + if !hasCheck(n, serverConfig.ReadyzChecks) { + t.Errorf("Missing ReadyzChecker %s", n) + } + } + for _, n := range tc.wantHealthzChecks { + if !hasCheck(n, serverConfig.HealthzChecks) { + t.Errorf("Missing HealthzChecker %s", n) + } + } + }) + } +} + +func hasCheck(want string, healthchecks []healthz.HealthChecker) bool { + for _, h := range healthchecks { + if want == h.Name() { + return true + } + } + return false +} diff --git a/pkg/storage/storagebackend/config.go b/pkg/storage/storagebackend/config.go index aa4163877..3dc7d0c00 100644 --- a/pkg/storage/storagebackend/config.go +++ b/pkg/storage/storagebackend/config.go @@ -36,6 +36,7 @@ const ( DefaultCompactInterval = 5 * time.Minute DefaultDBMetricPollInterval = 30 * time.Second DefaultHealthcheckTimeout = 2 * time.Second + DefaultReadinessTimeout = 2 * time.Second ) // TransportConfig holds all connection related info, i.e. equal TransportConfig means equal servers we talk to. @@ -84,6 +85,8 @@ type Config struct { DBMetricPollInterval time.Duration // HealthcheckTimeout specifies the timeout used when checking health HealthcheckTimeout time.Duration + // ReadycheckTimeout specifies the timeout used when checking readiness + ReadycheckTimeout time.Duration LeaseManagerConfig etcd3.LeaseManagerConfig @@ -117,6 +120,7 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config { CompactionInterval: DefaultCompactInterval, DBMetricPollInterval: DefaultDBMetricPollInterval, HealthcheckTimeout: DefaultHealthcheckTimeout, + ReadycheckTimeout: DefaultReadinessTimeout, LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(), } } diff --git a/pkg/storage/storagebackend/factory/etcd3.go b/pkg/storage/storagebackend/factory/etcd3.go index eb95c0fac..07b61d357 100644 --- a/pkg/storage/storagebackend/factory/etcd3.go +++ b/pkg/storage/storagebackend/factory/etcd3.go @@ -73,6 +73,22 @@ func init() { } func newETCD3HealthCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() error, error) { + timeout := storagebackend.DefaultHealthcheckTimeout + if c.HealthcheckTimeout != time.Duration(0) { + timeout = c.HealthcheckTimeout + } + return newETCD3Check(c, timeout, stopCh) +} + +func newETCD3ReadyCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() error, error) { + timeout := storagebackend.DefaultReadinessTimeout + if c.ReadycheckTimeout != time.Duration(0) { + timeout = c.ReadycheckTimeout + } + return newETCD3Check(c, timeout, stopCh) +} + +func newETCD3Check(c storagebackend.Config, timeout time.Duration, stopCh <-chan struct{}) (func() error, error) { // constructing the etcd v3 client blocks and times out if etcd is not available. // retry in a loop in the background until we successfully create the client, storing the client or error encountered @@ -129,23 +145,18 @@ func newETCD3HealthCheck(c storagebackend.Config, stopCh <-chan struct{}) (func( if clientErr != nil { return clientErr } - - healthcheckTimeout := storagebackend.DefaultHealthcheckTimeout - if c.HealthcheckTimeout != time.Duration(0) { - healthcheckTimeout = c.HealthcheckTimeout - } - ctx, cancel := context.WithTimeout(context.Background(), healthcheckTimeout) + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() // See https://github.com/etcd-io/etcd/blob/c57f8b3af865d1b531b979889c602ba14377420e/etcdctl/ctlv3/command/ep_command.go#L118 _, err := client.Get(ctx, path.Join("/", c.Prefix, "health")) if err == nil { return nil } - return fmt.Errorf("error getting data from etcd: %v", err) + return fmt.Errorf("error getting data from etcd: %w", err) }, nil } -func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error) { +var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) { tlsInfo := transport.TLSInfo{ CertFile: c.CertFile, KeyFile: c.KeyFile, diff --git a/pkg/storage/storagebackend/factory/factory.go b/pkg/storage/storagebackend/factory/factory.go index d1d349216..4c8a409d6 100644 --- a/pkg/storage/storagebackend/factory/factory.go +++ b/pkg/storage/storagebackend/factory/factory.go @@ -50,3 +50,14 @@ func CreateHealthCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() return nil, fmt.Errorf("unknown storage type: %s", c.Type) } } + +func CreateReadyCheck(c storagebackend.Config, stopCh <-chan struct{}) (func() error, error) { + switch c.Type { + case storagebackend.StorageTypeETCD2: + return nil, fmt.Errorf("%s is no longer a supported storage backend", c.Type) + case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: + return newETCD3ReadyCheck(c, stopCh) + default: + return nil, fmt.Errorf("unknown storage type: %s", c.Type) + } +} diff --git a/pkg/storage/storagebackend/factory/factory_test.go b/pkg/storage/storagebackend/factory/factory_test.go new file mode 100644 index 000000000..963d24c91 --- /dev/null +++ b/pkg/storage/storagebackend/factory/factory_test.go @@ -0,0 +1,235 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package factory + +import ( + "context" + "errors" + "testing" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + "k8s.io/apiserver/pkg/storage/etcd3/testserver" + "k8s.io/apiserver/pkg/storage/storagebackend" +) + +type mockKV struct { + get func(ctx context.Context) (*clientv3.GetResponse, error) +} + +func (mkv mockKV) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) { + return nil, nil +} +func (mkv mockKV) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + return mkv.get(ctx) +} +func (mockKV) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { + return nil, nil +} +func (mockKV) Compact(ctx context.Context, rev int64, opts ...clientv3.CompactOption) (*clientv3.CompactResponse, error) { + return nil, nil +} +func (mockKV) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) { + return clientv3.OpResponse{}, nil +} +func (mockKV) Txn(ctx context.Context) clientv3.Txn { + return nil +} + +func TestCreateHealthcheck(t *testing.T) { + etcdConfig := testserver.NewTestConfig(t) + client := testserver.RunEtcd(t, etcdConfig) + newETCD3ClientFn := newETCD3Client + defer func() { + newETCD3Client = newETCD3ClientFn + }() + tests := []struct { + name string + cfg storagebackend.Config + want error + responseTime time.Duration + }{ + { + name: "ok if response time lower than default timeout", + cfg: storagebackend.Config{ + Type: storagebackend.StorageTypeETCD3, + Transport: storagebackend.TransportConfig{}, + }, + responseTime: 1 * time.Second, + want: nil, + }, + { + name: "ok if response time lower than custom timeout", + cfg: storagebackend.Config{ + Type: storagebackend.StorageTypeETCD3, + Transport: storagebackend.TransportConfig{}, + HealthcheckTimeout: 5 * time.Second, + }, + responseTime: 3 * time.Second, + want: nil, + }, + { + name: "timeouts if response time higher than default timeout", + cfg: storagebackend.Config{ + Type: storagebackend.StorageTypeETCD3, + Transport: storagebackend.TransportConfig{}, + }, + responseTime: 3 * time.Second, + want: context.DeadlineExceeded, + }, + { + name: "timeouts if response time higher than custom timeout", + cfg: storagebackend.Config{ + Type: storagebackend.StorageTypeETCD3, + Transport: storagebackend.TransportConfig{}, + HealthcheckTimeout: 3 * time.Second, + }, + responseTime: 5 * time.Second, + want: context.DeadlineExceeded, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tc.cfg.Transport.ServerList = client.Endpoints() + newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) { + dummyKV := mockKV{ + get: func(ctx context.Context) (*clientv3.GetResponse, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(tc.responseTime): + return nil, nil + } + }, + } + client.KV = dummyKV + return client, nil + } + stop := make(chan struct{}) + healthcheck, err := CreateHealthCheck(tc.cfg, stop) + if err != nil { + t.Fatal(err) + } + // Wait for healthcheck to establish connection + time.Sleep(2 * time.Second) + + got := healthcheck() + + if !errors.Is(got, tc.want) { + t.Errorf("healthcheck() missmatch want %v got %v", tc.want, got) + } + }) + } +} + +func TestCreateReadycheck(t *testing.T) { + etcdConfig := testserver.NewTestConfig(t) + client := testserver.RunEtcd(t, etcdConfig) + newETCD3ClientFn := newETCD3Client + defer func() { + newETCD3Client = newETCD3ClientFn + }() + tests := []struct { + name string + cfg storagebackend.Config + want error + responseTime time.Duration + }{ + { + name: "ok if response time lower than default timeout", + cfg: storagebackend.Config{ + Type: storagebackend.StorageTypeETCD3, + Transport: storagebackend.TransportConfig{}, + }, + responseTime: 1 * time.Second, + want: nil, + }, + { + name: "ok if response time lower than custom timeout", + cfg: storagebackend.Config{ + Type: storagebackend.StorageTypeETCD3, + Transport: storagebackend.TransportConfig{}, + ReadycheckTimeout: 5 * time.Second, + }, + responseTime: 3 * time.Second, + want: nil, + }, + { + name: "timeouts if response time higher than default timeout", + cfg: storagebackend.Config{ + Type: storagebackend.StorageTypeETCD3, + Transport: storagebackend.TransportConfig{}, + }, + responseTime: 3 * time.Second, + want: context.DeadlineExceeded, + }, + { + name: "timeouts if response time higher than custom timeout", + cfg: storagebackend.Config{ + Type: storagebackend.StorageTypeETCD3, + Transport: storagebackend.TransportConfig{}, + ReadycheckTimeout: 3 * time.Second, + }, + responseTime: 5 * time.Second, + want: context.DeadlineExceeded, + }, + { + name: "timeouts if response time higher than default timeout with custom healthcheck timeout", + cfg: storagebackend.Config{ + Type: storagebackend.StorageTypeETCD3, + Transport: storagebackend.TransportConfig{}, + HealthcheckTimeout: 10 * time.Second, + }, + responseTime: 3 * time.Second, + want: context.DeadlineExceeded, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tc.cfg.Transport.ServerList = client.Endpoints() + newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) { + dummyKV := mockKV{ + get: func(ctx context.Context) (*clientv3.GetResponse, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(tc.responseTime): + return nil, nil + } + }, + } + client.KV = dummyKV + return client, nil + } + stop := make(chan struct{}) + healthcheck, err := CreateReadyCheck(tc.cfg, stop) + if err != nil { + t.Fatal(err) + } + // Wait for healthcheck to establish connection + time.Sleep(2 * time.Second) + + got := healthcheck() + + if !errors.Is(got, tc.want) { + t.Errorf("healthcheck() missmatch want %v got %v", tc.want, got) + } + }) + } +}