Merge pull request #125483 from wojtek-t/storage_readiness_hook

Implement resilient watchcache initialization post-start-hook

Kubernetes-commit: 79fee524e65ddc7c1448d5d2554c6f91233cf98d
This commit is contained in:
Kubernetes Publisher 2024-07-01 13:48:29 -07:00
commit 19c13772cd
13 changed files with 282 additions and 11 deletions

View File

@ -290,6 +290,12 @@ const (
// Enables support for watch bookmark events.
WatchBookmark featuregate.Feature = "WatchBookmark"
// owner: @wojtek-t
// beta: v1.31
//
// Enables post-start-hook for storage readiness
WatchCacheInitializationPostStartHook featuregate.Feature = "WatchCacheInitializationPostStartHook"
// owner: @serathius
// beta: 1.30
// Enables watches without resourceVersion to be served from storage.
@ -408,6 +414,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
WatchCacheInitializationPostStartHook: {Default: false, PreRelease: featuregate.Beta},
WatchFromStorageWithoutResourceVersion: {Default: false, PreRelease: featuregate.Beta},
InPlacePodVerticalScaling: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -226,6 +226,10 @@ type Store struct {
// storageVersionHash as empty in the discovery document.
StorageVersioner runtime.GroupVersioner
// ReadinessCheckFunc checks if the storage is ready for accepting requests.
// The field is optional, if set needs to be thread-safe.
ReadinessCheckFunc func() error
// DestroyFunc cleans up clients used by the underlying Storage; optional.
// If set, DestroyFunc has to be implemented in thread-safe way and
// be prepared for being called more than once.
@ -234,6 +238,7 @@ type Store struct {
// Note: the rest.StandardStorage interface aggregates the common REST verbs
var _ rest.StandardStorage = &Store{}
var _ rest.StorageWithReadiness = &Store{}
var _ rest.TableConvertor = &Store{}
var _ GenericStore = &Store{}
@ -292,6 +297,14 @@ func (e *Store) New() runtime.Object {
return e.NewFunc()
}
// ReadinessCheck checks if the storage is ready for accepting requests.
func (e *Store) ReadinessCheck() error {
if e.ReadinessCheckFunc != nil {
return e.ReadinessCheckFunc()
}
return nil
}
// Destroy cleans up its resources on shutdown.
func (e *Store) Destroy() {
if e.DestroyFunc != nil {
@ -1614,6 +1627,9 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
}
}
}
if e.Storage.Storage != nil {
e.ReadinessCheckFunc = e.Storage.Storage.ReadinessCheck
}
return nil
}

View File

@ -52,6 +52,8 @@ import (
// Storage is a generic interface for RESTful storage services.
// Resources which are exported to the RESTful API of apiserver need to implement this interface. It is expected
// that objects may implement any of the below interfaces.
//
// Consider using StorageWithReadiness whenever possible.
type Storage interface {
// New returns an empty object that can be used with Create and Update after request data has been put into it.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
@ -63,6 +65,14 @@ type Storage interface {
Destroy()
}
// StorageWithReadiness extends Storage interface with the readiness check.
type StorageWithReadiness interface {
Storage
// ReadinessCheck allows for checking storage readiness.
ReadinessCheck() error
}
// Scoper indicates what scope the resource is at. It must be specified.
// It is usually provided automatically based on your strategy.
type Scoper interface {

View File

@ -216,6 +216,10 @@ type Config struct {
// twice this value. Note that it is up to the request handlers to ignore or honor this timeout. In seconds.
MinRequestTimeout int
// StorageInitializationTimeout defines the maximum amount of time to wait for storage initialization
// before declaring apiserver ready.
StorageInitializationTimeout time.Duration
// This represents the maximum amount of time it should take for apiserver to complete its startup
// sequence and become healthy. From apiserver's start time to when this amount of time has
// elapsed, /livez will assume that unfinished post-start hooks will complete successfully and
@ -426,6 +430,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
MaxMutatingRequestsInFlight: 200,
RequestTimeout: time.Duration(60) * time.Second,
MinRequestTimeout: 1800,
StorageInitializationTimeout: time.Minute,
LivezGracePeriod: time.Duration(0),
ShutdownDelayDuration: time.Duration(0),
// 1.5MB is the default client request size in bytes
@ -824,6 +829,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
ShutdownSendRetryAfter: c.ShutdownSendRetryAfter,
APIServerID: c.APIServerID,
StorageReadinessHook: NewStorageReadinessHook(c.StorageInitializationTimeout),
StorageVersionManager: c.StorageVersionManager,
EffectiveVersion: c.EffectiveVersion,

View File

@ -233,6 +233,10 @@ type GenericAPIServer struct {
// APIServerID is the ID of this API server
APIServerID string
// StorageReadinessHook implements post-start-hook functionality for checking readiness
// of underlying storage for registered resources.
StorageReadinessHook *StorageReadinessHook
// StorageVersionManager holds the storage versions of the API resources installed by this server.
StorageVersionManager storageversion.Manager
@ -844,6 +848,7 @@ func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo
} else {
s.Handler.GoRestfulContainer.Add(legacyRootAPIHandler.WebService())
}
s.registerStorageReadinessCheck("", apiGroupInfo)
return nil
}
@ -902,10 +907,28 @@ func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) erro
s.DiscoveryGroupManager.AddGroup(apiGroup)
s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
s.registerStorageReadinessCheck(apiGroupInfo.PrioritizedVersions[0].Group, apiGroupInfo)
}
return nil
}
// registerStorageReadinessCheck registers the readiness checks for all underlying storages
// for a given APIGroup.
func (s *GenericAPIServer) registerStorageReadinessCheck(groupName string, apiGroupInfo *APIGroupInfo) {
for version, storageMap := range apiGroupInfo.VersionedResourcesStorageMap {
for resource, storage := range storageMap {
if withReadiness, ok := storage.(rest.StorageWithReadiness); ok {
gvr := metav1.GroupVersionResource{
Group: groupName,
Version: version,
Resource: resource,
}
s.StorageReadinessHook.RegisterStorage(gvr, withReadiness)
}
}
}
}
// InstallAPIGroup exposes the given api group in the API.
// The <apiGroupInfo> passed into this function shouldn't be used elsewhere as the
// underlying storage will be destroyed on this servers shutdown.

View File

@ -56,6 +56,7 @@ type ServerRunOptions struct {
GoawayChance float64
LivezGracePeriod time.Duration
MinRequestTimeout int
StorageInitializationTimeout time.Duration
ShutdownDelayDuration time.Duration
// We intentionally did not add a flag for this option. Users of the
// apiserver library can wire it to a flag.
@ -116,6 +117,7 @@ func NewServerRunOptionsForComponent(componentName string, componentGlobalsRegis
RequestTimeout: defaults.RequestTimeout,
LivezGracePeriod: defaults.LivezGracePeriod,
MinRequestTimeout: defaults.MinRequestTimeout,
StorageInitializationTimeout: defaults.StorageInitializationTimeout,
ShutdownDelayDuration: defaults.ShutdownDelayDuration,
ShutdownWatchTerminationGracePeriod: defaults.ShutdownWatchTerminationGracePeriod,
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
@ -140,6 +142,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error {
c.RequestTimeout = s.RequestTimeout
c.GoawayChance = s.GoawayChance
c.MinRequestTimeout = s.MinRequestTimeout
c.StorageInitializationTimeout = s.StorageInitializationTimeout
c.ShutdownDelayDuration = s.ShutdownDelayDuration
c.JSONPatchMaxCopyBytes = s.JSONPatchMaxCopyBytes
c.MaxRequestBodyBytes = s.MaxRequestBodyBytes
@ -197,6 +200,10 @@ func (s *ServerRunOptions) Validate() []error {
errors = append(errors, fmt.Errorf("--min-request-timeout can not be negative value"))
}
if s.StorageInitializationTimeout < 0 {
errors = append(errors, fmt.Errorf("--storage-initialization-timeout can not be negative value"))
}
if s.ShutdownDelayDuration < 0 {
errors = append(errors, fmt.Errorf("--shutdown-delay-duration can not be negative value"))
}
@ -350,6 +357,9 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"handler, which picks a randomized value above this number as the connection timeout, "+
"to spread out load.")
fs.DurationVar(&s.StorageInitializationTimeout, "storage-initialization-timeout", s.StorageInitializationTimeout,
"Maximum amount of time to wait for storage initialization before declaring apiserver ready. Defaults to 1m.")
fs.DurationVar(&s.ShutdownDelayDuration, "shutdown-delay-duration", s.ShutdownDelayDuration, ""+
"Time to delay the termination. During that time the server keeps serving requests normally. The endpoints /healthz and /livez "+
"will return success, but /readyz immediately returns failure. Graceful termination starts after this delay "+

View File

@ -0,0 +1,91 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"context"
"errors"
"sync"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/klog/v2"
)
// StorageReadinessHook implements PostStartHook functionality for checking readiness
// of underlying storage for registered resources.
type StorageReadinessHook struct {
timeout time.Duration
lock sync.Mutex
checks map[string]func() error
}
// NewStorageReadinessHook created new StorageReadinessHook.
func NewStorageReadinessHook(timeout time.Duration) *StorageReadinessHook {
return &StorageReadinessHook{
checks: make(map[string]func() error),
timeout: timeout,
}
}
func (h *StorageReadinessHook) RegisterStorage(gvr metav1.GroupVersionResource, storage rest.StorageWithReadiness) {
h.lock.Lock()
defer h.lock.Unlock()
if _, ok := h.checks[gvr.String()]; !ok {
h.checks[gvr.String()] = storage.ReadinessCheck
} else {
klog.Errorf("Registering storage readiness hook for %s again: ", gvr.String())
}
}
func (h *StorageReadinessHook) check() bool {
h.lock.Lock()
defer h.lock.Unlock()
failedChecks := []string{}
for gvr, check := range h.checks {
if err := check(); err != nil {
failedChecks = append(failedChecks, gvr)
}
}
if len(failedChecks) == 0 {
klog.Infof("Storage is ready for all registered resources")
return true
}
klog.V(4).Infof("Storage is not ready for: %v", failedChecks)
return false
}
func (h *StorageReadinessHook) Hook(ctx PostStartHookContext) error {
deadlineCtx, cancel := context.WithTimeout(ctx, h.timeout)
defer cancel()
err := wait.PollUntilContextCancel(deadlineCtx, 100*time.Millisecond, true,
func(_ context.Context) (bool, error) {
if ok := h.check(); ok {
return true, nil
}
return false, nil
})
if errors.Is(err, context.DeadlineExceeded) {
klog.Warningf("Deadline exceeded while waiting for storage readiness... ignoring")
}
return nil
}

View File

@ -0,0 +1,85 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"context"
"fmt"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
type fakeReadinessStorage struct {
result error
}
func (s *fakeReadinessStorage) New() runtime.Object { return nil }
func (s *fakeReadinessStorage) Destroy() {}
func (s *fakeReadinessStorage) ReadinessCheck() error { return s.result }
func testGVR(index int) metav1.GroupVersionResource {
return metav1.GroupVersionResource{
Group: "group",
Version: "version",
Resource: fmt.Sprintf("resource-%d", index),
}
}
func TestStorageReadinessHook(t *testing.T) {
h := NewStorageReadinessHook(time.Second)
numChecks := 5
storages := make([]*fakeReadinessStorage, numChecks)
for i := 0; i < numChecks; i++ {
storages[i] = &fakeReadinessStorage{
result: fmt.Errorf("failed"),
}
h.RegisterStorage(testGVR(i), storages[i])
}
for i := 0; i < numChecks; i++ {
if ok := h.check(); ok {
t.Errorf("%d: unexpected check pass", i)
}
storages[i].result = nil
}
if ok := h.check(); !ok {
t.Errorf("unexpected check failure")
}
}
func TestStorageReadinessHookTimeout(t *testing.T) {
h := NewStorageReadinessHook(time.Second)
storage := &fakeReadinessStorage{
result: fmt.Errorf("failed"),
}
h.RegisterStorage(testGVR(0), storage)
ctx := context.Background()
hookCtx := PostStartHookContext{
LoopbackClientConfig: nil,
StopCh: ctx.Done(),
Context: ctx,
}
if err := h.Hook(hookCtx); err != nil {
t.Errorf("unexpected hook failure on timeout")
}
}

View File

@ -962,6 +962,14 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) {
return c.storage.Count(pathPrefix)
}
// ReadinessCheck implements storage.Interface.
func (c *Cacher) ReadinessCheck() error {
if !c.ready.check() {
return storage.ErrStorageNotReady
}
return nil
}
// baseObjectThreadUnsafe omits locking for cachingObject.
func baseObjectThreadUnsafe(object runtime.Object) runtime.Object {
if co, ok := object.(*cachingObject); ok {

View File

@ -181,6 +181,9 @@ func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.O
func (d *dummyStorage) Count(_ string) (int64, error) {
return 0, fmt.Errorf("unimplemented")
}
func (d *dummyStorage) ReadinessCheck() error {
return nil
}
func (d *dummyStorage) injectError(err error) {
d.Lock()
defer d.Unlock()

View File

@ -25,7 +25,10 @@ import (
"k8s.io/apimachinery/pkg/util/validation/field"
)
var ErrResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created")
var (
ErrResourceVersionSetOnCreate = errors.New("resourceVersion should not be set on objects to be created")
ErrStorageNotReady = errors.New("storage not ready")
)
const (
ErrCodeKeyNotFound int = iota + 1

View File

@ -591,6 +591,11 @@ func (s *store) Count(key string) (int64, error) {
return getResp.Count, nil
}
// ReadinessCheck implements storage.Interface.
func (s *store) ReadinessCheck() error {
return nil
}
// resolveGetListRev is used by GetList to resolve the rev to use in the client.KV.Get request.
func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts storage.ListOptions) (int64, error) {
var withRev int64

View File

@ -243,6 +243,9 @@ type Interface interface {
// Count returns number of different entries under the key (generally being path prefix).
Count(key string) (int64, error)
// ReadinessCheck checks if the storage is ready for accepting requests.
ReadinessCheck() error
// RequestWatchProgress requests the a watch stream progress status be sent in the
// watch response stream as soon as possible.
// Used for monitor watch progress even if watching resources with no changes.