diff --git a/pkg/server/config_test.go b/pkg/server/config_test.go index a1d6d8902..f58f3bf9c 100644 --- a/pkg/server/config_test.go +++ b/pkg/server/config_test.go @@ -17,6 +17,8 @@ limitations under the License. package server import ( + "context" + "errors" "fmt" "io" "net/http" @@ -42,6 +44,7 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/rest" "k8s.io/component-base/tracing" + "k8s.io/klog/v2/ktesting" netutils "k8s.io/utils/net" ) @@ -79,6 +82,9 @@ func TestAuthorizeClientBearerTokenNoops(t *testing.T) { } func TestNewWithDelegate(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(errors.New("test is done")) delegateConfig := NewConfig(codecs) delegateConfig.ExternalAddress = "192.168.10.4:443" delegateConfig.PublicAddress = netutils.ParseIPSloppy("192.168.10.4") @@ -136,10 +142,8 @@ func TestNewWithDelegate(t *testing.T) { return nil }) - stopCh := make(chan struct{}) - defer close(stopCh) wrappingServer.PrepareRun() - wrappingServer.RunPostStartHooks(stopCh) + wrappingServer.RunPostStartHooks(ctx) server := httptest.NewServer(wrappingServer.Handler) defer server.Close() diff --git a/pkg/server/genericapiserver.go b/pkg/server/genericapiserver.go index 6066c0fcb..e0dcbf758 100644 --- a/pkg/server/genericapiserver.go +++ b/pkg/server/genericapiserver.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/util/managedfields" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup" "k8s.io/apimachinery/pkg/version" "k8s.io/apiserver/pkg/admission" @@ -442,9 +443,19 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { // Run spawns the secure http server. It only returns if stopCh is closed // or the secure port cannot be listened on initially. -// This is the diagram of what channels/signals are dependent on each other: // -// | stopCh +// Deprecated: use RunWithContext instead. Run will not get removed to avoid +// breaking consumers, but should not be used in new code. +func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { + ctx := wait.ContextForChannel(stopCh) + return s.RunWithContext(ctx) +} + +// RunWithContext spawns the secure http server. It only returns if ctx is canceled +// or the secure port cannot be listened on initially. +// This is the diagram of what contexts/channels/signals are dependent on each other: +// +// | ctx // | | // | --------------------------------------------------------- // | | | @@ -477,12 +488,13 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer { // | | | | // | |-------------------|---------------------|----------------------------------------| // | | | -// | stopHttpServerCh (AuditBackend::Shutdown()) +// | stopHttpServerCtx (AuditBackend::Shutdown()) // | | // | listenerStoppedCh // | | // | HTTPServerStoppedListening (httpServerStoppedListeningCh) -func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { +func (s preparedGenericAPIServer) RunWithContext(ctx context.Context) error { + stopCh := ctx.Done() delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated @@ -544,9 +556,11 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { notAcceptingNewRequestCh := s.lifecycleSignals.NotAcceptingNewRequest drainedCh := s.lifecycleSignals.InFlightRequestsDrained - stopHttpServerCh := make(chan struct{}) + // Canceling the parent context does not immediately cancel the HTTP server. + // We only inherit context values here and deal with cancellation ourselves. + stopHTTPServerCtx, stopHTTPServer := context.WithCancelCause(context.WithoutCancel(ctx)) go func() { - defer close(stopHttpServerCh) + defer stopHTTPServer(errors.New("time to stop HTTP server")) timeToStopHttpServerCh := notAcceptingNewRequestCh.Signaled() if s.ShutdownSendRetryAfter { @@ -565,7 +579,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { } } - stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout) + stoppedCh, listenerStoppedCh, err := s.NonBlockingRunWithContext(stopHTTPServerCtx, shutdownTimeout) if err != nil { return err } @@ -694,7 +708,18 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error { // NonBlockingRun spawns the secure http server. An error is // returned if the secure port cannot be listened on. // The returned channel is closed when the (asynchronous) termination is finished. +// +// Deprecated: use RunWithContext instead. Run will not get removed to avoid +// breaking consumers, but should not be used in new code. func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) { + ctx := wait.ContextForChannel(stopCh) + return s.NonBlockingRunWithContext(ctx, shutdownTimeout) +} + +// NonBlockingRunWithContext spawns the secure http server. An error is +// returned if the secure port cannot be listened on. +// The returned channel is closed when the (asynchronous) termination is finished. +func (s preparedGenericAPIServer) NonBlockingRunWithContext(ctx context.Context, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) { // Use an internal stop channel to allow cleanup of the listeners on error. internalStopCh := make(chan struct{}) var stoppedCh <-chan struct{} @@ -712,11 +737,11 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdow // responsibility of the caller to close the provided channel to // ensure cleanup. go func() { - <-stopCh + <-ctx.Done() close(internalStopCh) }() - s.RunPostStartHooks(stopCh) + s.RunPostStartHooks(ctx) if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil { klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err) diff --git a/pkg/server/genericapiserver_graceful_termination_test.go b/pkg/server/genericapiserver_graceful_termination_test.go index 967b97606..39079c616 100644 --- a/pkg/server/genericapiserver_graceful_termination_test.go +++ b/pkg/server/genericapiserver_graceful_termination_test.go @@ -20,6 +20,7 @@ import ( "context" "crypto/tls" "crypto/x509" + "errors" "fmt" "io" "log" @@ -41,6 +42,7 @@ import ( apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/server/dynamiccertificates" "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" "github.com/google/go-cmp/cmp" "golang.org/x/net/http2" @@ -200,10 +202,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t }, nil) // start the API server - stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + stopCtx, stop := context.WithCancelCause(ctx) + defer stop(errors.New("test has completed")) + runCompletedCh := make(chan struct{}) go func() { defer close(runCompletedCh) - s.PrepareRun().Run(stopCh) + if err := s.PrepareRun().RunWithContext(stopCtx); err != nil { + t.Errorf("unexpected error from RunWithContext: %v", err) + } }() waitForAPIServerStarted(t, doer) @@ -222,7 +229,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t } // signal termination event: initiate a shutdown - close(stopCh) + stop(errors.New("shutting down")) waitForeverUntilSignaled(t, signals.ShutdownInitiated) // /readyz must return an error, but we need to give it some time @@ -423,10 +430,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t }, nil) // start the API server - stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + stopCtx, stop := context.WithCancelCause(ctx) + defer stop(errors.New("test has completed")) + runCompletedCh := make(chan struct{}) go func() { defer close(runCompletedCh) - s.PrepareRun().Run(stopCh) + if err := s.PrepareRun().RunWithContext(stopCtx); err != nil { + t.Errorf("unexpected error from RunWithContext: %v", err) + } }() waitForAPIServerStarted(t, doer) @@ -445,7 +457,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t } // signal termination event: initiate a shutdown - close(stopCh) + stop(errors.New("shutting down")) waitForeverUntilSignaled(t, signals.ShutdownInitiated) // /readyz must return an error, but we need to give it some time @@ -568,10 +580,15 @@ func TestMuxAndDiscoveryComplete(t *testing.T) { } // start the API server - stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + _, ctx := ktesting.NewTestContext(t) + stopCtx, stop := context.WithCancelCause(ctx) + defer stop(errors.New("test has completed")) + runCompletedCh := make(chan struct{}) go func() { defer close(runCompletedCh) - s.PrepareRun().Run(stopCh) + if err := s.PrepareRun().RunWithContext(stopCtx); err != nil { + t.Errorf("unexpected error from RunWithContext: %v", err) + } }() waitForAPIServerStarted(t, doer) @@ -612,6 +629,9 @@ func TestPreShutdownHooks(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + stopCtx, stop := context.WithCancelCause(ctx) + defer stop(errors.New("test has completed")) s := test.server() doer := setupDoer(t, s.SecureServingInfo) @@ -643,14 +663,16 @@ func TestPreShutdownHooks(t *testing.T) { } // start the API server - stopCh, runCompletedCh := make(chan struct{}), make(chan struct{}) + runCompletedCh := make(chan struct{}) go func() { defer close(runCompletedCh) - s.PrepareRun().Run(stopCh) + if err := s.PrepareRun().RunWithContext(stopCtx); err != nil { + t.Errorf("unexpected error from RunWithContext: %v", err) + } }() waitForAPIServerStarted(t, doer) - close(stopCh) + stop(errors.New("shutting down")) waitForeverUntil(t, runCompletedCh, "the apiserver Run method did not return") diff --git a/pkg/server/genericapiserver_test.go b/pkg/server/genericapiserver_test.go index 091349182..3549b238c 100644 --- a/pkg/server/genericapiserver_test.go +++ b/pkg/server/genericapiserver_test.go @@ -52,6 +52,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" restclient "k8s.io/client-go/rest" + "k8s.io/klog/v2/ktesting" kubeopenapi "k8s.io/kube-openapi/pkg/common" "k8s.io/kube-openapi/pkg/validation/spec" netutils "k8s.io/utils/net" @@ -317,17 +318,16 @@ func TestInstallAPIGroups(t *testing.T) { } func TestPrepareRun(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) s, config, assert := newMaster(t) assert.NotNil(config.OpenAPIConfig) server := httptest.NewServer(s.Handler.Director) defer server.Close() - done := make(chan struct{}) - defer close(done) s.PrepareRun() - s.RunPostStartHooks(done) + s.RunPostStartHooks(ctx) // openapi is installed in PrepareRun resp, err := http.Get(server.URL + "/openapi/v2") @@ -353,9 +353,10 @@ func TestPrepareRun(t *testing.T) { } func TestUpdateOpenAPISpec(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) s, _, assert := newMaster(t) s.PrepareRun() - s.RunPostStartHooks(make(chan struct{})) + s.RunPostStartHooks(ctx) server := httptest.NewServer(s.Handler.Director) defer server.Close() diff --git a/pkg/server/hooks.go b/pkg/server/hooks.go index 065df6bc5..1561d7a84 100644 --- a/pkg/server/hooks.go +++ b/pkg/server/hooks.go @@ -17,6 +17,7 @@ limitations under the License. package server import ( + "context" "errors" "fmt" "net/http" @@ -48,8 +49,13 @@ type PreShutdownHookFunc func() error type PostStartHookContext struct { // LoopbackClientConfig is a config for a privileged loopback connection to the API server LoopbackClientConfig *restclient.Config - // StopCh is the channel that will be closed when the server stops + // StopCh is the channel that will be closed when the server stops. + // + // Deprecated: use the PostStartHookContext itself instead, it contains a context that + // gets cancelled when the server stops. StopCh keeps getting provided for existing code. StopCh <-chan struct{} + // Context gets cancelled when the server stops. + context.Context } // PostStartHookProvider is an interface in addition to provide a post start hook for the api server @@ -151,15 +157,16 @@ func (s *GenericAPIServer) AddPreShutdownHookOrDie(name string, hook PreShutdown } } -// RunPostStartHooks runs the PostStartHooks for the server -func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) { +// RunPostStartHooks runs the PostStartHooks for the server. +func (s *GenericAPIServer) RunPostStartHooks(ctx context.Context) { s.postStartHookLock.Lock() defer s.postStartHookLock.Unlock() s.postStartHooksCalled = true context := PostStartHookContext{ LoopbackClientConfig: s.LoopbackClientConfig, - StopCh: stopCh, + StopCh: ctx.Done(), + Context: ctx, } for hookName, hookEntry := range s.postStartHooks { diff --git a/pkg/server/options/serving_test.go b/pkg/server/options/serving_test.go index f1ca80cb2..1ccccb417 100644 --- a/pkg/server/options/serving_test.go +++ b/pkg/server/options/serving_test.go @@ -18,6 +18,7 @@ package options import ( "bytes" + "context" cryptorand "crypto/rand" "crypto/rsa" "crypto/tls" @@ -25,6 +26,7 @@ import ( "crypto/x509/pkix" "encoding/base64" "encoding/pem" + "errors" "fmt" "io/ioutil" "math/big" @@ -44,6 +46,7 @@ import ( "k8s.io/client-go/discovery" restclient "k8s.io/client-go/rest" cliflag "k8s.io/component-base/cli/flag" + "k8s.io/klog/v2/ktesting" netutils "k8s.io/utils/net" ) @@ -215,6 +218,10 @@ func TestServerRunWithSNI(t *testing.T) { test := tests[title] t.Run(title, func(t *testing.T) { t.Parallel() + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(errors.New("test has completed")) + // create server cert certDir := "testdata/" + specToName(test.Cert) serverCertBundleFile := filepath.Join(certDir, "cert") @@ -267,9 +274,6 @@ func TestServerRunWithSNI(t *testing.T) { signatures[sig] = j } - stopCh := make(chan struct{}) - defer close(stopCh) - // launch server config := setUp(t) @@ -316,7 +320,7 @@ func TestServerRunWithSNI(t *testing.T) { preparedServer := s.PrepareRun() preparedServerErrors := make(chan error) go func() { - if err := preparedServer.Run(stopCh); err != nil { + if err := preparedServer.RunWithContext(ctx); err != nil { preparedServerErrors <- err } }()