Merge pull request #54849 from hzxuzhonghu/audit-graceful-shutdown
Automatic merge from submit-queue (batch tested with PRs 46581, 55426, 54849). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. apiserver shutdown gracefully **What this PR does / why we need it**: apiserver shutdown gracefully and wait all non-long running requests finish before process exit. **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #54793 **Special notes for your reviewer**: remove waitGroup, use atomic to count. **Release note**: ```release-note NONE ``` Kubernetes-commit: d12d711ba67af9c63c6497a3d73357729a76e9ab
This commit is contained in:
		
						commit
						101bfbcd85
					
				
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							| 
						 | 
					@ -30,8 +30,10 @@ go_test(
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
 | 
				
			||||||
 | 
					        "//vendor/k8s.io/apiserver/pkg/endpoints/filters:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
 | 
				
			||||||
 | 
					        "//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/client-go/informers:go_default_library",
 | 
					        "//vendor/k8s.io/client-go/informers:go_default_library",
 | 
				
			||||||
| 
						 | 
					@ -81,6 +83,7 @@ go_library(
 | 
				
			||||||
        "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
 | 
					        "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
 | 
					        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library",
 | 
					        "//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library",
 | 
				
			||||||
 | 
					        "//vendor/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apimachinery/pkg/version:go_default_library",
 | 
					        "//vendor/k8s.io/apimachinery/pkg/version:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/admission/plugin/initialization:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/admission/plugin/initialization:go_default_library",
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -36,6 +36,7 @@ import (
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime/serializer"
 | 
						"k8s.io/apimachinery/pkg/runtime/serializer"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
						"k8s.io/apimachinery/pkg/util/sets"
 | 
				
			||||||
 | 
						utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/version"
 | 
						"k8s.io/apimachinery/pkg/version"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/admission"
 | 
						"k8s.io/apiserver/pkg/admission"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/audit"
 | 
						"k8s.io/apiserver/pkg/audit"
 | 
				
			||||||
| 
						 | 
					@ -128,6 +129,8 @@ type Config struct {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// BuildHandlerChainFunc allows you to build custom handler chains by decorating the apiHandler.
 | 
						// BuildHandlerChainFunc allows you to build custom handler chains by decorating the apiHandler.
 | 
				
			||||||
	BuildHandlerChainFunc func(apiHandler http.Handler, c *Config) (secure http.Handler)
 | 
						BuildHandlerChainFunc func(apiHandler http.Handler, c *Config) (secure http.Handler)
 | 
				
			||||||
 | 
						// HandlerChainWaitGroup allows you to wait for all chain handlers exit after the server shutdown.
 | 
				
			||||||
 | 
						HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
 | 
				
			||||||
	// DiscoveryAddresses is used to build the IPs pass to discovery. If nil, the ExternalAddress is
 | 
						// DiscoveryAddresses is used to build the IPs pass to discovery. If nil, the ExternalAddress is
 | 
				
			||||||
	// always reported
 | 
						// always reported
 | 
				
			||||||
	DiscoveryAddresses discovery.Addresses
 | 
						DiscoveryAddresses discovery.Addresses
 | 
				
			||||||
| 
						 | 
					@ -236,6 +239,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
 | 
				
			||||||
		ReadWritePort:                443,
 | 
							ReadWritePort:                443,
 | 
				
			||||||
		RequestContextMapper:         apirequest.NewRequestContextMapper(),
 | 
							RequestContextMapper:         apirequest.NewRequestContextMapper(),
 | 
				
			||||||
		BuildHandlerChainFunc:        DefaultBuildHandlerChain,
 | 
							BuildHandlerChainFunc:        DefaultBuildHandlerChain,
 | 
				
			||||||
 | 
							HandlerChainWaitGroup:        new(utilwaitgroup.SafeWaitGroup),
 | 
				
			||||||
		LegacyAPIGroupPrefixes:       sets.NewString(DefaultLegacyAPIPrefix),
 | 
							LegacyAPIGroupPrefixes:       sets.NewString(DefaultLegacyAPIPrefix),
 | 
				
			||||||
		DisabledPostStartHooks:       sets.NewString(),
 | 
							DisabledPostStartHooks:       sets.NewString(),
 | 
				
			||||||
		HealthzChecks:                []healthz.HealthzChecker{healthz.PingHealthz},
 | 
							HealthzChecks:                []healthz.HealthzChecker{healthz.PingHealthz},
 | 
				
			||||||
| 
						 | 
					@ -446,8 +450,10 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
 | 
				
			||||||
		Serializer:             c.Serializer,
 | 
							Serializer:             c.Serializer,
 | 
				
			||||||
		AuditBackend:           c.AuditBackend,
 | 
							AuditBackend:           c.AuditBackend,
 | 
				
			||||||
		delegationTarget:       delegationTarget,
 | 
							delegationTarget:       delegationTarget,
 | 
				
			||||||
 | 
							HandlerChainWaitGroup:  c.HandlerChainWaitGroup,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
 | 
							minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
 | 
				
			||||||
 | 
							ShutdownTimeout:   c.RequestTimeout,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		SecureServingInfo: c.SecureServingInfo,
 | 
							SecureServingInfo: c.SecureServingInfo,
 | 
				
			||||||
		ExternalAddress:   c.ExternalAddress,
 | 
							ExternalAddress:   c.ExternalAddress,
 | 
				
			||||||
| 
						 | 
					@ -488,6 +494,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
 | 
				
			||||||
			return nil, err
 | 
								return nil, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, delegateCheck := range delegationTarget.HealthzChecks() {
 | 
						for _, delegateCheck := range delegationTarget.HealthzChecks() {
 | 
				
			||||||
		skip := false
 | 
							skip := false
 | 
				
			||||||
		for _, existingCheck := range c.HealthzChecks {
 | 
							for _, existingCheck := range c.HealthzChecks {
 | 
				
			||||||
| 
						 | 
					@ -535,6 +542,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
 | 
				
			||||||
	handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, failedHandler)
 | 
						handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, failedHandler)
 | 
				
			||||||
	handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
 | 
						handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
 | 
				
			||||||
	handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout)
 | 
						handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout)
 | 
				
			||||||
 | 
						handler = genericfilters.WithWaitGroup(handler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup)
 | 
				
			||||||
	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper)
 | 
						handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper)
 | 
				
			||||||
	handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
 | 
						handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
 | 
				
			||||||
	handler = genericfilters.WithPanicRecovery(handler)
 | 
						handler = genericfilters.WithPanicRecovery(handler)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -37,6 +37,7 @@ go_library(
 | 
				
			||||||
        "longrunning.go",
 | 
					        "longrunning.go",
 | 
				
			||||||
        "maxinflight.go",
 | 
					        "maxinflight.go",
 | 
				
			||||||
        "timeout.go",
 | 
					        "timeout.go",
 | 
				
			||||||
 | 
					        "waitgroup.go",
 | 
				
			||||||
        "wrap.go",
 | 
					        "wrap.go",
 | 
				
			||||||
    ],
 | 
					    ],
 | 
				
			||||||
    importpath = "k8s.io/apiserver/pkg/server/filters",
 | 
					    importpath = "k8s.io/apiserver/pkg/server/filters",
 | 
				
			||||||
| 
						 | 
					@ -46,6 +47,7 @@ go_library(
 | 
				
			||||||
        "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
 | 
					        "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
 | 
					        "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
 | 
					        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
 | 
				
			||||||
 | 
					        "//vendor/k8s.io/apimachinery/pkg/util/waitgroup:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
 | 
				
			||||||
        "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
 | 
					        "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,53 @@
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2017 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package filters
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"net/http"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
 | 
				
			||||||
 | 
						apirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// WithWaitGroup adds all non long-running requests to wait group, which is used for graceful shutdown.
 | 
				
			||||||
 | 
					func WithWaitGroup(handler http.Handler, requestContextMapper apirequest.RequestContextMapper, longRunning apirequest.LongRunningRequestCheck, wg *utilwaitgroup.SafeWaitGroup) http.Handler {
 | 
				
			||||||
 | 
						return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 | 
				
			||||||
 | 
							ctx, ok := requestContextMapper.Get(req)
 | 
				
			||||||
 | 
							if !ok {
 | 
				
			||||||
 | 
								// if this happens, the handler chain isn't setup correctly because there is no context mapper
 | 
				
			||||||
 | 
								handler.ServeHTTP(w, req)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							requestInfo, ok := apirequest.RequestInfoFrom(ctx)
 | 
				
			||||||
 | 
							if !ok {
 | 
				
			||||||
 | 
								// if this happens, the handler chain isn't setup correctly because there is no request info
 | 
				
			||||||
 | 
								handler.ServeHTTP(w, req)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if !longRunning(req, requestInfo) {
 | 
				
			||||||
 | 
								if err := wg.Add(1); err != nil {
 | 
				
			||||||
 | 
									http.Error(w, "Apisever is shutting down.", http.StatusInternalServerError)
 | 
				
			||||||
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								defer wg.Done()
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							handler.ServeHTTP(w, req)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -34,6 +34,7 @@ import (
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
						"k8s.io/apimachinery/pkg/runtime/schema"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime/serializer"
 | 
						"k8s.io/apimachinery/pkg/runtime/serializer"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/sets"
 | 
						"k8s.io/apimachinery/pkg/util/sets"
 | 
				
			||||||
 | 
						utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/admission"
 | 
						"k8s.io/apiserver/pkg/admission"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/audit"
 | 
						"k8s.io/apiserver/pkg/audit"
 | 
				
			||||||
	genericapi "k8s.io/apiserver/pkg/endpoints"
 | 
						genericapi "k8s.io/apiserver/pkg/endpoints"
 | 
				
			||||||
| 
						 | 
					@ -83,6 +84,10 @@ type GenericAPIServer struct {
 | 
				
			||||||
	// minRequestTimeout is how short the request timeout can be.  This is used to build the RESTHandler
 | 
						// minRequestTimeout is how short the request timeout can be.  This is used to build the RESTHandler
 | 
				
			||||||
	minRequestTimeout time.Duration
 | 
						minRequestTimeout time.Duration
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// ShutdownTimeout is the timeout used for server shutdown. This specifies the timeout before server
 | 
				
			||||||
 | 
						// gracefully shutdown returns.
 | 
				
			||||||
 | 
						ShutdownTimeout time.Duration
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// legacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests
 | 
						// legacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests
 | 
				
			||||||
	// to InstallLegacyAPIGroup
 | 
						// to InstallLegacyAPIGroup
 | 
				
			||||||
	legacyAPIGroupPrefixes sets.String
 | 
						legacyAPIGroupPrefixes sets.String
 | 
				
			||||||
| 
						 | 
					@ -146,6 +151,9 @@ type GenericAPIServer struct {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// delegationTarget is the next delegate in the chain or nil
 | 
						// delegationTarget is the next delegate in the chain or nil
 | 
				
			||||||
	delegationTarget DelegationTarget
 | 
						delegationTarget DelegationTarget
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
 | 
				
			||||||
 | 
						HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
 | 
					// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
 | 
				
			||||||
| 
						 | 
					@ -275,16 +283,28 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	<-stopCh
 | 
						<-stopCh
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return s.RunPreShutdownHooks()
 | 
						err = s.RunPreShutdownHooks()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
 | 
				
			||||||
 | 
						s.HandlerChainWaitGroup.Wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NonBlockingRun spawns the secure http server. An error is
 | 
					// NonBlockingRun spawns the secure http server. An error is
 | 
				
			||||||
// returned if the secure port cannot be listened on.
 | 
					// returned if the secure port cannot be listened on.
 | 
				
			||||||
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
 | 
					func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
 | 
				
			||||||
 | 
						// Use an stop channel to allow graceful shutdown without dropping audit events
 | 
				
			||||||
 | 
						// after http server shutdown.
 | 
				
			||||||
 | 
						auditStopCh := make(chan struct{})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Start the audit backend before any request comes in. This means we must call Backend.Run
 | 
						// Start the audit backend before any request comes in. This means we must call Backend.Run
 | 
				
			||||||
	// before http server start serving. Otherwise the Backend.ProcessEvents call might block.
 | 
						// before http server start serving. Otherwise the Backend.ProcessEvents call might block.
 | 
				
			||||||
	if s.AuditBackend != nil {
 | 
						if s.AuditBackend != nil {
 | 
				
			||||||
		if err := s.AuditBackend.Run(stopCh); err != nil {
 | 
							if err := s.AuditBackend.Run(auditStopCh); err != nil {
 | 
				
			||||||
			return fmt.Errorf("failed to run the audit backend: %v", err)
 | 
								return fmt.Errorf("failed to run the audit backend: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -305,6 +325,8 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
		<-stopCh
 | 
							<-stopCh
 | 
				
			||||||
		close(internalStopCh)
 | 
							close(internalStopCh)
 | 
				
			||||||
 | 
							s.HandlerChainWaitGroup.Wait()
 | 
				
			||||||
 | 
							close(auditStopCh)
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	s.RunPostStartHooks(stopCh)
 | 
						s.RunPostStartHooks(stopCh)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -25,6 +25,8 @@ import (
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"net/http/httptest"
 | 
						"net/http/httptest"
 | 
				
			||||||
	goruntime "runtime"
 | 
						goruntime "runtime"
 | 
				
			||||||
 | 
						"strconv"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -44,8 +46,11 @@ import (
 | 
				
			||||||
	"k8s.io/apiserver/pkg/authentication/user"
 | 
						"k8s.io/apiserver/pkg/authentication/user"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/authorization/authorizer"
 | 
						"k8s.io/apiserver/pkg/authorization/authorizer"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/endpoints/discovery"
 | 
						"k8s.io/apiserver/pkg/endpoints/discovery"
 | 
				
			||||||
 | 
						genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
 | 
				
			||||||
 | 
						apirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
				
			||||||
	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
						genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
 | 
				
			||||||
	"k8s.io/apiserver/pkg/registry/rest"
 | 
						"k8s.io/apiserver/pkg/registry/rest"
 | 
				
			||||||
 | 
						genericfilters "k8s.io/apiserver/pkg/server/filters"
 | 
				
			||||||
	etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
 | 
						etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
 | 
				
			||||||
	"k8s.io/client-go/informers"
 | 
						"k8s.io/client-go/informers"
 | 
				
			||||||
	"k8s.io/client-go/kubernetes/fake"
 | 
						"k8s.io/client-go/kubernetes/fake"
 | 
				
			||||||
| 
						 | 
					@ -509,3 +514,75 @@ func fakeVersion() version.Info {
 | 
				
			||||||
		Platform:     fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH),
 | 
							Platform:     fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// TestGracefulShutdown verifies server shutdown after request handler finish.
 | 
				
			||||||
 | 
					func TestGracefulShutdown(t *testing.T) {
 | 
				
			||||||
 | 
						etcdserver, config, _ := setUp(t)
 | 
				
			||||||
 | 
						defer etcdserver.Terminate(t)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var graceShutdown bool
 | 
				
			||||||
 | 
						wg := sync.WaitGroup{}
 | 
				
			||||||
 | 
						wg.Add(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
 | 
				
			||||||
 | 
							handler := genericfilters.WithWaitGroup(apiHandler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup)
 | 
				
			||||||
 | 
							handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper)
 | 
				
			||||||
 | 
							handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
 | 
				
			||||||
 | 
							return handler
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 | 
				
			||||||
 | 
							wg.Done()
 | 
				
			||||||
 | 
							time.Sleep(2 * time.Second)
 | 
				
			||||||
 | 
							w.WriteHeader(http.StatusOK)
 | 
				
			||||||
 | 
							graceShutdown = true
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						s, err := config.Complete(nil).New("test", EmptyDelegate)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Error in bringing up the server: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						s.Handler.NonGoRestfulMux.Handle("/test", handler)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						insecureServer := &http.Server{
 | 
				
			||||||
 | 
							Addr:    "0.0.0.0:0",
 | 
				
			||||||
 | 
							Handler: s.Handler,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						stopCh := make(chan struct{})
 | 
				
			||||||
 | 
						serverPort, err := RunServer(insecureServer, "tcp", 10*time.Second, stopCh)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Errorf("RunServer err: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						graceCh := make(chan struct{})
 | 
				
			||||||
 | 
						// mock a client request
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							resp, err := http.Get("http://127.0.0.1:" + strconv.Itoa(serverPort) + "/test")
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								t.Errorf("Unexpected http error: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if resp.StatusCode != http.StatusOK {
 | 
				
			||||||
 | 
								t.Errorf("Unexpected http status code: %v", resp.StatusCode)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							close(graceCh)
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// close stopCh after request sent to server to guarantee request handler is running.
 | 
				
			||||||
 | 
						wg.Wait()
 | 
				
			||||||
 | 
						close(stopCh)
 | 
				
			||||||
 | 
						// wait for wait group handler finish
 | 
				
			||||||
 | 
						s.HandlerChainWaitGroup.Wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// check server all handlers finished.
 | 
				
			||||||
 | 
						if !graceShutdown {
 | 
				
			||||||
 | 
							t.Errorf("server shutdown not gracefully.")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// check client to make sure receive response.
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-graceCh:
 | 
				
			||||||
 | 
							t.Logf("server shutdown gracefully.")
 | 
				
			||||||
 | 
						case <-time.After(30 * time.Second):
 | 
				
			||||||
 | 
							t.Errorf("Timed out waiting for response.")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -17,6 +17,7 @@ limitations under the License.
 | 
				
			||||||
package server
 | 
					package server
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
	"crypto/tls"
 | 
						"crypto/tls"
 | 
				
			||||||
	"crypto/x509"
 | 
						"crypto/x509"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
| 
						 | 
					@ -84,13 +85,13 @@ func (s *GenericAPIServer) serveSecurely(stopCh <-chan struct{}) error {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	glog.Infof("Serving securely on %s", s.SecureServingInfo.BindAddress)
 | 
						glog.Infof("Serving securely on %s", s.SecureServingInfo.BindAddress)
 | 
				
			||||||
	var err error
 | 
						var err error
 | 
				
			||||||
	s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, stopCh)
 | 
						s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, s.ShutdownTimeout, stopCh)
 | 
				
			||||||
	return err
 | 
						return err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// RunServer listens on the given port, then spawns a go-routine continuously serving
 | 
					// RunServer listens on the given port, then spawns a go-routine continuously serving
 | 
				
			||||||
// until the stopCh is closed. The port is returned. This function does not block.
 | 
					// until the stopCh is closed. The port is returned. This function does not block.
 | 
				
			||||||
func RunServer(server *http.Server, network string, stopCh <-chan struct{}) (int, error) {
 | 
					func RunServer(server *http.Server, network string, shutDownTimeout time.Duration, stopCh <-chan struct{}) (int, error) {
 | 
				
			||||||
	if len(server.Addr) == 0 {
 | 
						if len(server.Addr) == 0 {
 | 
				
			||||||
		return 0, errors.New("address cannot be empty")
 | 
							return 0, errors.New("address cannot be empty")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					@ -111,10 +112,12 @@ func RunServer(server *http.Server, network string, stopCh <-chan struct{}) (int
 | 
				
			||||||
		return 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String())
 | 
							return 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Stop the server by closing the listener
 | 
						// Shutdown server gracefully.
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
		<-stopCh
 | 
							<-stopCh
 | 
				
			||||||
		ln.Close()
 | 
							ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
 | 
				
			||||||
 | 
							server.Shutdown(ctx)
 | 
				
			||||||
 | 
							cancel()
 | 
				
			||||||
	}()
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue