Merge pull request #54795 from sttts/sttts-audit-shutdown-sync-revert

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Revert: Shutdown http handlers before shutting down audit backend

Fixes https://github.com/kubernetes/kubernetes/issues/54793

Kubernetes-commit: 3096a32568a231743ef008a85f2e670e28d915ae
This commit is contained in:
Kubernetes Publisher 2017-10-30 09:47:35 -07:00
commit f9f51e2b63
7 changed files with 6 additions and 167 deletions

View File

@ -30,10 +30,8 @@ go_test(
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authorization/authorizer:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/filters:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",

View File

@ -27,7 +27,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/emicklei/go-restful-swagger12"
@ -129,8 +128,6 @@ type Config struct {
// BuildHandlerChainFunc allows you to build custom handler chains by decorating the apiHandler.
BuildHandlerChainFunc func(apiHandler http.Handler, c *Config) (secure http.Handler)
// HandlerChainWaitGroup allows you to wait for all chain handlers exit after the server shutdown.
HandlerChainWaitGroup *sync.WaitGroup
// DiscoveryAddresses is used to build the IPs pass to discovery. If nil, the ExternalAddress is
// always reported
DiscoveryAddresses discovery.Addresses
@ -239,7 +236,6 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
ReadWritePort: 443,
RequestContextMapper: apirequest.NewRequestContextMapper(),
BuildHandlerChainFunc: DefaultBuildHandlerChain,
HandlerChainWaitGroup: new(sync.WaitGroup),
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
DisabledPostStartHooks: sets.NewString(),
HealthzChecks: []healthz.HealthzChecker{healthz.PingHealthz},
@ -450,10 +446,8 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
Serializer: c.Serializer,
AuditBackend: c.AuditBackend,
delegationTarget: delegationTarget,
HandlerChainWaitGroup: c.HandlerChainWaitGroup,
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
shutdownTimeout: c.RequestTimeout,
SecureServingInfo: c.SecureServingInfo,
ExternalAddress: c.ExternalAddress,
@ -494,7 +488,6 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
return nil, err
}
}
for _, delegateCheck := range delegationTarget.HealthzChecks() {
skip := false
for _, existingCheck := range c.HealthzChecks {
@ -542,7 +535,6 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithAuthentication(handler, c.RequestContextMapper, c.Authenticator, failedHandler)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.RequestContextMapper, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper)
handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
handler = genericfilters.WithPanicRecovery(handler)

View File

@ -37,7 +37,6 @@ go_library(
"longrunning.go",
"maxinflight.go",
"timeout.go",
"waitgroup.go",
"wrap.go",
],
importpath = "k8s.io/apiserver/pkg/server/filters",

View File

@ -1,50 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"net/http"
"sync"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
)
// WithWaitGroup adds all non long-running requests to wait group, which is used for graceful shutdown.
func WithWaitGroup(handler http.Handler, requestContextMapper apirequest.RequestContextMapper, longRunning apirequest.LongRunningRequestCheck, wg *sync.WaitGroup) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, ok := requestContextMapper.Get(req)
if !ok {
// if this happens, the handler chain isn't setup correctly because there is no context mapper
handler.ServeHTTP(w, req)
return
}
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
// if this happens, the handler chain isn't setup correctly because there is no request info
handler.ServeHTTP(w, req)
return
}
if !longRunning(req, requestInfo) {
wg.Add(1)
defer wg.Done()
}
handler.ServeHTTP(w, req)
})
}

View File

@ -89,9 +89,6 @@ type GenericAPIServer struct {
// minRequestTimeout is how short the request timeout can be. This is used to build the RESTHandler
minRequestTimeout time.Duration
// shutdownTimeout is the timeout used for server shutdown.
shutdownTimeout time.Duration
// legacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests
// to InstallLegacyAPIGroup
legacyAPIGroupPrefixes sets.String
@ -155,9 +152,6 @@ type GenericAPIServer struct {
// delegationTarget is the next delegate in the chain or nil
delegationTarget DelegationTarget
// HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
HandlerChainWaitGroup *sync.WaitGroup
}
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
@ -287,28 +281,16 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
<-stopCh
err = s.RunPreShutdownHooks()
if err != nil {
return err
}
// Wait for all requests to finish, which is bounded by the RequestTimeout variable.
s.HandlerChainWaitGroup.Wait()
return nil
return s.RunPreShutdownHooks()
}
// NonBlockingRun spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
// Use an stop channel to allow graceful shutdown without dropping audit events
// after http server shutdown.
auditStopCh := make(chan struct{})
// Start the audit backend before any request comes in. This means we must call Backend.Run
// before http server start serving. Otherwise the Backend.ProcessEvents call might block.
if s.AuditBackend != nil {
if err := s.AuditBackend.Run(auditStopCh); err != nil {
if err := s.AuditBackend.Run(stopCh); err != nil {
return fmt.Errorf("failed to run the audit backend: %v", err)
}
}
@ -329,8 +311,6 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
go func() {
<-stopCh
close(internalStopCh)
s.HandlerChainWaitGroup.Wait()
close(auditStopCh)
}()
s.RunPostStartHooks(stopCh)

View File

@ -25,8 +25,6 @@ import (
"net/http"
"net/http/httptest"
goruntime "runtime"
"strconv"
"sync"
"testing"
"time"
@ -46,11 +44,8 @@ import (
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/endpoints/discovery"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
genericfilters "k8s.io/apiserver/pkg/server/filters"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
@ -514,75 +509,3 @@ func fakeVersion() version.Info {
Platform: fmt.Sprintf("%s/%s", goruntime.GOOS, goruntime.GOARCH),
}
}
// TestGracefulShutdown verifies server shutdown after request handler finish.
func TestGracefulShutdown(t *testing.T) {
etcdserver, config, _ := setUp(t)
defer etcdserver.Terminate(t)
var graceShutdown bool
wg := sync.WaitGroup{}
wg.Add(1)
config.BuildHandlerChainFunc = func(apiHandler http.Handler, c *Config) http.Handler {
handler := genericfilters.WithWaitGroup(apiHandler, c.RequestContextMapper, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver, c.RequestContextMapper)
handler = apirequest.WithRequestContext(handler, c.RequestContextMapper)
return handler
}
handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
wg.Done()
time.Sleep(2 * time.Second)
w.WriteHeader(http.StatusOK)
graceShutdown = true
})
s, err := config.Complete(nil).New("test", EmptyDelegate)
if err != nil {
t.Fatalf("Error in bringing up the server: %v", err)
}
s.Handler.NonGoRestfulMux.Handle("/test", handler)
insecureServer := &http.Server{
Addr: "0.0.0.0:0",
Handler: s.Handler,
}
stopCh := make(chan struct{})
serverPort, err := RunServer(insecureServer, "tcp", 10*time.Second, stopCh)
if err != nil {
t.Errorf("RunServer err: %v", err)
}
graceCh := make(chan struct{})
// mock a client request
go func() {
resp, err := http.Get("http://127.0.0.1:" + strconv.Itoa(serverPort) + "/test")
if err != nil {
t.Errorf("Unexpected http error: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Errorf("Unexpected http status code: %v", resp.StatusCode)
}
close(graceCh)
}()
// close stopCh after request sent to server to guarantee request handler is running.
wg.Wait()
close(stopCh)
// wait for wait group handler finish
s.HandlerChainWaitGroup.Wait()
// check server all handlers finished.
if !graceShutdown {
t.Errorf("server shutdown not gracefully.")
}
// check client to make sure receive response.
select {
case <-graceCh:
t.Logf("server shutdown gracefully.")
case <-time.After(30 * time.Second):
t.Errorf("Timed out waiting for response.")
}
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package server
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
@ -85,13 +84,13 @@ func (s *GenericAPIServer) serveSecurely(stopCh <-chan struct{}) error {
glog.Infof("Serving securely on %s", s.SecureServingInfo.BindAddress)
var err error
s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, s.shutdownTimeout, stopCh)
s.effectiveSecurePort, err = RunServer(secureServer, s.SecureServingInfo.BindNetwork, stopCh)
return err
}
// RunServer listens on the given port, then spawns a go-routine continuously serving
// until the stopCh is closed. The port is returned. This function does not block.
func RunServer(server *http.Server, network string, shutDownTimeout time.Duration, stopCh <-chan struct{}) (int, error) {
func RunServer(server *http.Server, network string, stopCh <-chan struct{}) (int, error) {
if len(server.Addr) == 0 {
return 0, errors.New("address cannot be empty")
}
@ -112,12 +111,10 @@ func RunServer(server *http.Server, network string, shutDownTimeout time.Duratio
return 0, fmt.Errorf("invalid listen address: %q", ln.Addr().String())
}
// Shutdown server gracefully.
// Stop the server by closing the listener
go func() {
<-stopCh
ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
server.Shutdown(ctx)
cancel()
ln.Close()
}()
go func() {