apiserver: NonBlockingRun should return a listener stopped channel

NonBlockingRun should also return a channel that gets closed when the
underlying http Server has stopped listening (during the graceful
shutdown period)

Kubernetes-commit: a84c1b71005930e8253c1348515020132c5c175b
This commit is contained in:
Abu Kashem 2021-06-24 16:04:54 -04:00 committed by Kubernetes Publisher
parent 5c1642946b
commit c1c949723d
5 changed files with 91 additions and 13 deletions

View File

@ -52,7 +52,7 @@ func (s *DeprecatedInsecureServingInfo) Serve(handler http.Handler, shutdownTime
} else {
klog.Infof("Serving insecurely on %s", s.Listener.Addr())
}
_, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh)
_, _, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh)
// NOTE: we do not handle stoppedCh returned by RunServer for graceful termination here
return err
}

View File

@ -351,10 +351,15 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
}()
// close socket after delayed stopCh
stoppedCh, err := s.NonBlockingRun(delayedStopCh.Signaled())
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(delayedStopCh.Signaled())
if err != nil {
return err
}
httpServerStoppedListeningCh := s.terminationSignals.HTTPServerStoppedListening
go func() {
<-listenerStoppedCh
httpServerStoppedListeningCh.Signal()
}()
drainedCh := s.terminationSignals.InFlightRequestsDrained
go func() {
@ -389,7 +394,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// NonBlockingRun spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
// The returned channel is closed when the (asynchronous) termination is finished.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, error) {
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) {
// Use an stop channel to allow graceful shutdown without dropping audit events
// after http server shutdown.
auditStopCh := make(chan struct{})
@ -398,20 +403,22 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan
// before http server start serving. Otherwise the Backend.ProcessEvents call might block.
if s.AuditBackend != nil {
if err := s.AuditBackend.Run(auditStopCh); err != nil {
return nil, fmt.Errorf("failed to run the audit backend: %v", err)
return nil, nil, fmt.Errorf("failed to run the audit backend: %v", err)
}
}
// Use an internal stop channel to allow cleanup of the listeners on error.
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
var listenerStoppedCh <-chan struct{}
if s.SecureServingInfo != nil && s.Handler != nil {
var err error
stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
klog.V(1).Infof("[graceful-termination] ShutdownTimeout=%s", s.ShutdownTimeout)
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.ServeWithListenerStopped(s.Handler, s.ShutdownTimeout, internalStopCh)
if err != nil {
close(internalStopCh)
close(auditStopCh)
return nil, err
return nil, nil, err
}
}
@ -434,7 +441,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
}
return stoppedCh, nil
return stoppedCh, listenerStoppedCh, nil
}
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource

View File

@ -590,7 +590,7 @@ func TestGracefulShutdown(t *testing.T) {
// get port
serverPort := ln.Addr().(*net.TCPAddr).Port
stoppedCh, err := RunServer(insecureServer, ln, 10*time.Second, stopCh)
stoppedCh, _, err := RunServer(insecureServer, ln, 10*time.Second, stopCh)
if err != nil {
t.Fatalf("RunServer err: %v", err)
}

View File

@ -47,6 +47,10 @@ T0+70s: AfterShutdownDelayDuration: shutdown delay duration has passed
up to '60s' (dictated by ShutdownTimeout)
- active long running requests will receive a GOAWAY.
T0+70s: HTTPServerStoppedListening:
- this event is signaled when the HTTP Server has stopped listening
which is immediately after server.Shutdown has been invoked
T0 + 70s + up-to 60s: InFlightRequestsDrained: existing in flight requests have been drained
- long running requests are outside of this scope
- up-to 60s: the default value of 'ShutdownTimeout' is 60s, this means that
@ -88,6 +92,10 @@ type terminationSignals struct {
// InFlightRequestsDrained event is signaled when the existing requests
// in flight have completed. This is used as signal to shut down the audit backends
InFlightRequestsDrained terminationSignal
// HTTPServerStoppedListening termination event is signaled when the
// HTTP Server has stopped listening to the underlying socket.
HTTPServerStoppedListening terminationSignal
}
// newTerminationSignals returns an instance of terminationSignals interface to be used
@ -97,6 +105,7 @@ func newTerminationSignals() terminationSignals {
ShutdownInitiated: newNamedChannelWrapper("ShutdownInitiated"),
AfterShutdownDelayDuration: newNamedChannelWrapper("AfterShutdownDelayDuration"),
InFlightRequestsDrained: newNamedChannelWrapper("InFlightRequestsDrained"),
HTTPServerStoppedListening: newNamedChannelWrapper("HTTPServerStoppedListening"),
}
}

View File

@ -192,6 +192,67 @@ func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Dur
tlsErrorLogger := log.New(tlsErrorWriter, "", 0)
secureServer.ErrorLog = tlsErrorLogger
klog.Infof("Serving securely on %s", secureServer.Addr)
stoppedCh, _, err := RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
return stoppedCh, err
}
// ServeWithListenerStopped runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails.
// The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. ServeWithListenerStopped does not block.
// It returns a stoppedCh that is closed when all non-hijacked active requests have been processed.
// It returns a listenerStoppedCh that is closed when the underlying http Server has stopped listening.
// TODO: do a follow up PR to remove this function, change 'Serve' to return listenerStoppedCh
// and update all components that call 'Serve'
func (s *SecureServingInfo) ServeWithListenerStopped(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) {
if s.Listener == nil {
return nil, nil, fmt.Errorf("listener must not be nil")
}
tlsConfig, err := s.tlsConfig(stopCh)
if err != nil {
return nil, nil, err
}
secureServer := &http.Server{
Addr: s.Listener.Addr().String(),
Handler: handler,
MaxHeaderBytes: 1 << 20,
TLSConfig: tlsConfig,
}
// At least 99% of serialized resources in surveyed clusters were smaller than 256kb.
// This should be big enough to accommodate most API POST requests in a single frame,
// and small enough to allow a per connection buffer of this size multiplied by `MaxConcurrentStreams`.
const resourceBody99Percentile = 256 * 1024
http2Options := &http2.Server{}
// shrink the per-stream buffer and max framesize from the 1MB default while still accommodating most API POST requests in a single frame
http2Options.MaxUploadBufferPerStream = resourceBody99Percentile
http2Options.MaxReadFrameSize = resourceBody99Percentile
// use the overridden concurrent streams setting or make the default of 250 explicit so we can size MaxUploadBufferPerConnection appropriately
if s.HTTP2MaxStreamsPerConnection > 0 {
http2Options.MaxConcurrentStreams = uint32(s.HTTP2MaxStreamsPerConnection)
} else {
http2Options.MaxConcurrentStreams = 250
}
// increase the connection buffer size from the 1MB default to handle the specified number of concurrent streams
http2Options.MaxUploadBufferPerConnection = http2Options.MaxUploadBufferPerStream * int32(http2Options.MaxConcurrentStreams)
if !s.DisableHTTP2 {
// apply settings to the server
if err := http2.ConfigureServer(secureServer, http2Options); err != nil {
return nil, nil, fmt.Errorf("error configuring http2: %v", err)
}
}
// use tlsHandshakeErrorWriter to handle messages of tls handshake error
tlsErrorWriter := &tlsHandshakeErrorWriter{os.Stderr}
tlsErrorLogger := log.New(tlsErrorWriter, "", 0)
secureServer.ErrorLog = tlsErrorLogger
klog.Infof("Serving securely on %s", secureServer.Addr)
return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}
@ -207,15 +268,15 @@ func RunServer(
ln net.Listener,
shutDownTimeout time.Duration,
stopCh <-chan struct{},
) (<-chan struct{}, error) {
) (<-chan struct{}, <-chan struct{}, error) {
if ln == nil {
return nil, fmt.Errorf("listener must not be nil")
return nil, nil, fmt.Errorf("listener must not be nil")
}
// Shutdown server gracefully.
stoppedCh := make(chan struct{})
serverShutdownCh, listenerStoppedCh := make(chan struct{}), make(chan struct{})
go func() {
defer close(stoppedCh)
defer close(serverShutdownCh)
<-stopCh
ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
server.Shutdown(ctx)
@ -224,6 +285,7 @@ func RunServer(
go func() {
defer utilruntime.HandleCrash()
defer close(listenerStoppedCh)
var listener net.Listener
listener = tcpKeepAliveListener{ln}
@ -242,7 +304,7 @@ func RunServer(
}
}()
return stoppedCh, nil
return serverShutdownCh, listenerStoppedCh, nil
}
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted