add readyz endpoint for kube-apiserver readiness checks

add startup sequence duration and readyz endpoint

add rbac bootstrapping policy for readyz

add integration test around grace period and readyz

rename startup sequence duration flag

copy health checks to fields

rename health-check installed boolean, refactor clock injection logic

cleanup clock injection code

remove todo about poststarthook url registration from healthz

Kubernetes-commit: 54dcf5c9c46fc4782d4861936309349b5a71a1ac
This commit is contained in:
Han Kang 2019-05-30 11:19:49 -07:00 committed by Kubernetes Publisher
parent 3722cb6855
commit 7fd71e31ef
9 changed files with 249 additions and 17 deletions

View File

@ -32,11 +32,11 @@ import (
jsonpatch "github.com/evanphx/json-patch"
"github.com/go-openapi/spec"
"github.com/pborman/uuid"
"k8s.io/klog"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
"k8s.io/apimachinery/pkg/version"
@ -65,6 +65,7 @@ import (
restclient "k8s.io/client-go/rest"
certutil "k8s.io/client-go/util/cert"
"k8s.io/component-base/logs"
"k8s.io/klog"
openapicommon "k8s.io/kube-openapi/pkg/common"
// install apis
@ -135,6 +136,8 @@ type Config struct {
DiscoveryAddresses discovery.Addresses
// The default set of healthz checks. There might be more added via AddHealthzChecks dynamically.
HealthzChecks []healthz.HealthzChecker
// The default set of readyz-only checks. There might be more added via AddReadyzChecks dynamically.
ReadyzChecks []healthz.HealthzChecker
// LegacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests
// to InstallLegacyAPIGroup. New API servers don't generally have legacy groups at all.
LegacyAPIGroupPrefixes sets.String
@ -156,6 +159,12 @@ type Config struct {
// If specified, long running requests such as watch will be allocated a random timeout between this value, and
// twice this value. Note that it is up to the request handlers to ignore or honor this timeout. In seconds.
MinRequestTimeout int
// This represents the maximum amount of time it should take for apiserver to complete its startup
// sequence and become healthy. From apiserver's start time to when this amount of time has
// elapsed, /healthz will assume that unfinished post-start hooks will complete successfully and
// therefore return true.
MaxStartupSequenceDuration time.Duration
// The limit on the total size increase all "copy" operations in a json
// patch may cause.
// This affects all places that applies json patch in the binary.
@ -256,13 +265,15 @@ type AuthorizationInfo struct {
// NewConfig returns a Config struct with the default values
func NewConfig(codecs serializer.CodecFactory) *Config {
defaultHealthChecks := []healthz.HealthzChecker{healthz.PingHealthz, healthz.LogHealthz}
return &Config{
Serializer: codecs,
BuildHandlerChainFunc: DefaultBuildHandlerChain,
HandlerChainWaitGroup: new(utilwaitgroup.SafeWaitGroup),
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
DisabledPostStartHooks: sets.NewString(),
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz, healthz.LogHealthz},
HealthzChecks: append([]healthz.HealthzChecker{}, defaultHealthChecks...),
ReadyzChecks: append([]healthz.HealthzChecker{}, defaultHealthChecks...),
EnableIndex: true,
EnableDiscovery: true,
EnableProfiling: true,
@ -271,6 +282,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
MaxMutatingRequestsInFlight: 200,
RequestTimeout: time.Duration(60) * time.Second,
MinRequestTimeout: 1800,
MaxStartupSequenceDuration: time.Duration(0),
// 10MB is the recommended maximum client request size in bytes
// the etcd server should accept. See
// https://github.com/etcd-io/etcd/blob/release-3.3/etcdserver/server.go#L90.
@ -479,7 +491,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
ShutdownTimeout: c.RequestTimeout,
SecureServingInfo: c.SecureServing,
ExternalAddress: c.ExternalAddress,
@ -493,12 +504,16 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
preShutdownHooks: map[string]preShutdownHookEntry{},
disabledPostStartHooks: c.DisabledPostStartHooks,
healthzChecks: c.HealthzChecks,
healthzChecks: c.HealthzChecks,
readyzChecks: c.ReadyzChecks,
readinessStopCh: make(chan struct{}),
maxStartupSequenceDuration: c.MaxStartupSequenceDuration,
DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),
enableAPIResponseCompression: c.EnableAPIResponseCompression,
maxRequestBodyBytes: c.MaxRequestBodyBytes,
healthzClock: clock.RealClock{},
}
for {
@ -546,6 +561,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
}
s.healthzChecks = append(s.healthzChecks, delegateCheck)
s.readyzChecks = append(s.readyzChecks, delegateCheck)
}
s.listedPathProvider = routes.ListedPathProviders{s.listedPathProvider, delegationTarget}

View File

@ -115,7 +115,15 @@ func TestNewWithDelegate(t *testing.T) {
"/healthz/poststarthook/generic-apiserver-start-informers",
"/healthz/poststarthook/wrapping-post-start-hook",
"/healthz/wrapping-health",
"/metrics"
"/metrics",
"/readyz",
"/readyz/delegate-health",
"/readyz/log",
"/readyz/ping",
"/readyz/poststarthook/delegate-post-start-hook",
"/readyz/poststarthook/generic-apiserver-start-informers",
"/readyz/poststarthook/wrapping-post-start-hook",
"/readyz/shutdown"
]
}`, t)
checkPath(server.URL+"/healthz", http.StatusInternalServerError, `[+]ping ok

View File

@ -26,13 +26,13 @@ import (
systemd "github.com/coreos/go-systemd/daemon"
"github.com/go-openapi/spec"
"k8s.io/klog"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
"k8s.io/apiserver/pkg/admission"
@ -45,6 +45,7 @@ import (
"k8s.io/apiserver/pkg/server/routes"
utilopenapi "k8s.io/apiserver/pkg/util/openapi"
restclient "k8s.io/client-go/rest"
"k8s.io/klog"
openapibuilder "k8s.io/kube-openapi/pkg/builder"
openapicommon "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/handler"
@ -145,9 +146,17 @@ type GenericAPIServer struct {
preShutdownHooksCalled bool
// healthz checks
healthzLock sync.Mutex
healthzChecks []healthz.HealthzChecker
healthzCreated bool
healthzLock sync.Mutex
healthzChecks []healthz.HealthzChecker
healthzChecksInstalled bool
readyzLock sync.Mutex
readyzChecks []healthz.HealthzChecker
readyzChecksInstalled bool
maxStartupSequenceDuration time.Duration
healthzClock clock.Clock
// the readiness stop channel is used to signal that the apiserver has initiated a shutdown sequence, this
// will cause readyz to return unhealthy.
readinessStopCh chan struct{}
// auditing. The backend is started after the server starts listening.
AuditBackend audit.Backend
@ -259,6 +268,7 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
}
s.installHealthz()
s.installReadyz(s.readinessStopCh)
// Register audit backend preShutdownHook.
if s.AuditBackend != nil {
@ -327,6 +337,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
// ensure cleanup.
go func() {
<-stopCh
close(s.readinessStopCh)
close(internalStopCh)
if stoppedCh != nil {
<-stoppedCh

View File

@ -18,20 +18,30 @@ package server
import (
"fmt"
"net/http"
"time"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apiserver/pkg/server/healthz"
)
// AddHealthzCheck allows you to add a HealthzCheck.
// AddHealthzCheck adds HealthzCheck(s) to both healthz and readyz. All healthz checks
// are automatically added to readyz, since we want to avoid the situation where the
// apiserver is ready but not live.
func (s *GenericAPIServer) AddHealthzChecks(checks ...healthz.HealthzChecker) error {
s.healthzLock.Lock()
defer s.healthzLock.Unlock()
return s.AddDelayedHealthzChecks(0, checks...)
}
if s.healthzCreated {
return fmt.Errorf("unable to add because the healthz endpoint has already been created")
// AddReadyzChecks allows you to add a HealthzCheck to readyz.
func (s *GenericAPIServer) AddReadyzChecks(checks ...healthz.HealthzChecker) error {
s.readyzLock.Lock()
defer s.readyzLock.Unlock()
if s.readyzChecksInstalled {
return fmt.Errorf("unable to add because the readyz endpoint has already been created")
}
s.healthzChecks = append(s.healthzChecks, checks...)
s.readyzChecks = append(s.readyzChecks, checks...)
return nil
}
@ -39,7 +49,81 @@ func (s *GenericAPIServer) AddHealthzChecks(checks ...healthz.HealthzChecker) er
func (s *GenericAPIServer) installHealthz() {
s.healthzLock.Lock()
defer s.healthzLock.Unlock()
s.healthzCreated = true
s.healthzChecksInstalled = true
healthz.InstallHandler(s.Handler.NonGoRestfulMux, s.healthzChecks...)
}
// installReadyz creates the readyz endpoint for this server.
func (s *GenericAPIServer) installReadyz(stopCh <-chan struct{}) {
s.AddReadyzChecks(shutdownCheck{stopCh})
s.readyzLock.Lock()
defer s.readyzLock.Unlock()
s.readyzChecksInstalled = true
healthz.InstallReadyzHandler(s.Handler.NonGoRestfulMux, s.readyzChecks...)
}
// shutdownCheck fails if the embedded channel is closed. This is intended to allow for graceful shutdown sequences
// for the apiserver.
type shutdownCheck struct {
StopCh <-chan struct{}
}
func (shutdownCheck) Name() string {
return "shutdown"
}
func (c shutdownCheck) Check(req *http.Request) error {
select {
case <-c.StopCh:
return fmt.Errorf("process is shutting down")
default:
}
return nil
}
// AddDelayedHealthzChecks adds a health check to both healthz and readyz. The delay parameter
// allows you to set the grace period for healthz checks, which will return healthy while
// grace period has not yet elapsed. One may want to set a grace period in order to prevent
// the kubelet from restarting the kube-apiserver due to long-ish boot sequences. Readyz health
// checks have no grace period, since we want readyz to fail while boot has not completed.
func (s *GenericAPIServer) AddDelayedHealthzChecks(delay time.Duration, checks ...healthz.HealthzChecker) error {
s.healthzLock.Lock()
defer s.healthzLock.Unlock()
if s.healthzChecksInstalled {
return fmt.Errorf("unable to add because the healthz endpoint has already been created")
}
for _, check := range checks {
s.healthzChecks = append(s.healthzChecks, delayedHealthCheck(check, s.healthzClock, s.maxStartupSequenceDuration))
}
return s.AddReadyzChecks(checks...)
}
// delayedHealthCheck wraps a health check which will not fail until the explicitly defined delay has elapsed.
func delayedHealthCheck(check healthz.HealthzChecker, clock clock.Clock, delay time.Duration) healthz.HealthzChecker {
return delayedHealthzCheck{
check,
clock.Now().Add(delay),
clock,
}
}
type delayedHealthzCheck struct {
check healthz.HealthzChecker
startCheck time.Time
clock clock.Clock
}
func (c delayedHealthzCheck) Name() string {
return c.check.Name()
}
func (c delayedHealthzCheck) Check(req *http.Request) error {
if c.clock.Now().After(c.startCheck) {
return c.check.Check(req)
}
return nil
}

View File

@ -93,6 +93,14 @@ func InstallHandler(mux mux, checks ...HealthzChecker) {
InstallPathHandler(mux, "/healthz", checks...)
}
// InstallReadyzHandler registers handlers for health checking on the path
// "/readyz" to mux. *All handlers* for mux must be specified in
// exactly one call to InstallHandler. Calling InstallHandler more
// than once for the same mux will result in a panic.
func InstallReadyzHandler(mux mux, checks ...HealthzChecker) {
InstallPathHandler(mux, "/readyz", checks...)
}
// InstallPathHandler registers handlers for health checking on
// a specific path to mux. *All handlers* for the path must be
// specified in exactly one call to InstallPathHandler. Calling

View File

@ -0,0 +1,77 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"testing"
"time"
"k8s.io/apimachinery/pkg/util/clock"
)
func TestDelayedHealthCheck(t *testing.T) {
t.Run("test that liveness check returns true until the delay has elapsed", func(t *testing.T) {
t0 := time.Unix(0, 0)
c := clock.NewFakeClock(t0)
doneCh := make(chan struct{})
healthCheck := delayedHealthCheck(postStartHookHealthz{"test", doneCh}, c, time.Duration(10)*time.Second)
err := healthCheck.Check(nil)
if err != nil {
t.Errorf("Got %v, expected no error", err)
}
c.Step(10 * time.Second)
err = healthCheck.Check(nil)
if err != nil {
t.Errorf("Got %v, expected no error", err)
}
c.Step(1 * time.Millisecond)
err = healthCheck.Check(nil)
if err == nil || err.Error() != "not finished" {
t.Errorf("Got '%v', but expected error to be 'not finished'", err)
}
close(doneCh)
err = healthCheck.Check(nil)
if err != nil {
t.Errorf("Got %v, expected no error", err)
}
})
t.Run("test that liveness check does not toggle false even if done channel is closed early", func(t *testing.T) {
t0 := time.Unix(0, 0)
c := clock.NewFakeClock(t0)
doneCh := make(chan struct{})
healthCheck := delayedHealthCheck(postStartHookHealthz{"test", doneCh}, c, time.Duration(10)*time.Second)
err := healthCheck.Check(nil)
if err != nil {
t.Errorf("Got %v, expected no error", err)
}
close(doneCh)
c.Step(10 * time.Second)
err = healthCheck.Check(nil)
if err != nil {
t.Errorf("Got %v, expected no error", err)
}
c.Step(1 * time.Millisecond)
err = healthCheck.Check(nil)
if err != nil {
t.Errorf("Got %v, expected no error", err)
}
})
}

View File

@ -97,7 +97,7 @@ func (s *GenericAPIServer) AddPostStartHook(name string, hook PostStartHookFunc)
// done is closed when the poststarthook is finished. This is used by the health check to be able to indicate
// that the poststarthook is finished
done := make(chan struct{})
if err := s.AddHealthzChecks(postStartHookHealthz{name: "poststarthook/" + name, done: done}); err != nil {
if err := s.AddDelayedHealthzChecks(s.maxStartupSequenceDuration, postStartHookHealthz{name: "poststarthook/" + name, done: done}); err != nil {
return err
}
s.postStartHooks[name] = postStartHookEntry{hook: hook, originatingStack: string(debug.Stack()), done: done}

View File

@ -41,6 +41,7 @@ type ServerRunOptions struct {
MaxRequestsInFlight int
MaxMutatingRequestsInFlight int
RequestTimeout time.Duration
MaxStartupSequenceDuration time.Duration
MinRequestTimeout int
// We intentionally did not add a flag for this option. Users of the
// apiserver library can wire it to a flag.
@ -60,6 +61,7 @@ func NewServerRunOptions() *ServerRunOptions {
MaxRequestsInFlight: defaults.MaxRequestsInFlight,
MaxMutatingRequestsInFlight: defaults.MaxMutatingRequestsInFlight,
RequestTimeout: defaults.RequestTimeout,
MaxStartupSequenceDuration: defaults.MaxStartupSequenceDuration,
MinRequestTimeout: defaults.MinRequestTimeout,
JSONPatchMaxCopyBytes: defaults.JSONPatchMaxCopyBytes,
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
@ -72,6 +74,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error {
c.ExternalAddress = s.ExternalHost
c.MaxRequestsInFlight = s.MaxRequestsInFlight
c.MaxMutatingRequestsInFlight = s.MaxMutatingRequestsInFlight
c.MaxStartupSequenceDuration = s.MaxStartupSequenceDuration
c.RequestTimeout = s.RequestTimeout
c.MinRequestTimeout = s.MinRequestTimeout
c.JSONPatchMaxCopyBytes = s.JSONPatchMaxCopyBytes
@ -106,6 +109,10 @@ func (s *ServerRunOptions) Validate() []error {
errors = append(errors, fmt.Errorf("--target-ram-mb can not be negative value"))
}
if s.MaxStartupSequenceDuration < 0 {
errors = append(errors, fmt.Errorf("--maximum-startup-sequence-duration can not be a negative value"))
}
if s.EnableInfightQuotaHandler {
if !utilfeature.DefaultFeatureGate.Enabled(features.RequestManagement) {
errors = append(errors, fmt.Errorf("--enable-inflight-quota-handler can not be set if feature "+
@ -185,6 +192,11 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"it out. This is the default request timeout for requests but may be overridden by flags such as "+
"--min-request-timeout for specific types of requests.")
fs.DurationVar(&s.MaxStartupSequenceDuration, "maximum-startup-sequence-duration", s.MaxStartupSequenceDuration, ""+
"This option represents the maximum amount of time it should take for apiserver to complete its startup sequence "+
"and become healthy. From apiserver's start time to when this amount of time has elapsed, /healthz will assume "+
"that unfinished post-start hooks will complete successfully and therefore return true.")
fs.IntVar(&s.MinRequestTimeout, "min-request-timeout", s.MinRequestTimeout, ""+
"An optional field indicating the minimum number of seconds a handler must keep "+
"a request open before timing it out. Currently only honored by the watch request "+

View File

@ -136,6 +136,22 @@ func TestServerRunOptionsValidate(t *testing.T) {
},
expectErr: "--max-resource-write-bytes can not be negative value",
},
{
name: "Test when MaxStartupSequenceDuration is negative value",
testOptions: &ServerRunOptions{
AdvertiseAddress: net.ParseIP("192.168.10.10"),
CorsAllowedOriginList: []string{"10.10.10.100", "10.10.10.200"},
MaxRequestsInFlight: 400,
MaxMutatingRequestsInFlight: 200,
RequestTimeout: time.Duration(2) * time.Minute,
MinRequestTimeout: 1800,
JSONPatchMaxCopyBytes: 10 * 1024 * 1024,
MaxRequestBodyBytes: 10 * 1024 * 1024,
TargetRAMMB: 65536,
MaxStartupSequenceDuration: -time.Second,
},
expectErr: "--maximum-startup-sequence-duration can not be a negative value",
},
{
name: "Test when ServerRunOptions is valid",
testOptions: &ServerRunOptions{