apiserver + controllers: enhance context support

27a68aee3a4834 introduced context support for events. Creating an event
broadcaster with context makes tests more resilient against leaking goroutines
when that context gets canceled at the end of a test and enables per-test
output via ktesting.

The context could get passed to the constructor. A cleaner solution is to
enhance context support for the apiserver and then pass the context into the
controller's run method. This ripples up the call stack to all places which
start an apiserver.

Kubernetes-commit: b92273a760503cc57aba37c4d3a28554f7fec7f8
This commit is contained in:
Patrick Ohly 2023-12-01 09:00:59 +01:00 committed by Kubernetes Publisher
parent 9dcdab7a7c
commit 5ea67c789a
6 changed files with 98 additions and 35 deletions

View File

@ -17,6 +17,8 @@ limitations under the License.
package server
import (
"context"
"errors"
"fmt"
"io"
"net/http"
@ -42,6 +44,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2/ktesting"
netutils "k8s.io/utils/net"
)
@ -79,6 +82,9 @@ func TestAuthorizeClientBearerTokenNoops(t *testing.T) {
}
func TestNewWithDelegate(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.New("test is done"))
delegateConfig := NewConfig(codecs)
delegateConfig.ExternalAddress = "192.168.10.4:443"
delegateConfig.PublicAddress = netutils.ParseIPSloppy("192.168.10.4")
@ -136,10 +142,8 @@ func TestNewWithDelegate(t *testing.T) {
return nil
})
stopCh := make(chan struct{})
defer close(stopCh)
wrappingServer.PrepareRun()
wrappingServer.RunPostStartHooks(stopCh)
wrappingServer.RunPostStartHooks(ctx)
server := httptest.NewServer(wrappingServer.Handler)
defer server.Close()

View File

@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/util/managedfields"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/admission"
@ -442,9 +443,19 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially.
// This is the diagram of what channels/signals are dependent on each other:
//
// | stopCh
// Deprecated: use RunWithContext instead. Run will not get removed to avoid
// breaking consumers, but should not be used in new code.
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
ctx := wait.ContextForChannel(stopCh)
return s.RunWithContext(ctx)
}
// RunWithContext spawns the secure http server. It only returns if ctx is canceled
// or the secure port cannot be listened on initially.
// This is the diagram of what contexts/channels/signals are dependent on each other:
//
// | ctx
// | |
// | ---------------------------------------------------------
// | | |
@ -477,12 +488,13 @@ func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
// | | | |
// | |-------------------|---------------------|----------------------------------------|
// | | |
// | stopHttpServerCh (AuditBackend::Shutdown())
// | stopHttpServerCtx (AuditBackend::Shutdown())
// | |
// | listenerStoppedCh
// | |
// | HTTPServerStoppedListening (httpServerStoppedListeningCh)
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
func (s preparedGenericAPIServer) RunWithContext(ctx context.Context) error {
stopCh := ctx.Done()
delayedStopCh := s.lifecycleSignals.AfterShutdownDelayDuration
shutdownInitiatedCh := s.lifecycleSignals.ShutdownInitiated
@ -544,9 +556,11 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
notAcceptingNewRequestCh := s.lifecycleSignals.NotAcceptingNewRequest
drainedCh := s.lifecycleSignals.InFlightRequestsDrained
stopHttpServerCh := make(chan struct{})
// Canceling the parent context does not immediately cancel the HTTP server.
// We only inherit context values here and deal with cancellation ourselves.
stopHTTPServerCtx, stopHTTPServer := context.WithCancelCause(context.WithoutCancel(ctx))
go func() {
defer close(stopHttpServerCh)
defer stopHTTPServer(errors.New("time to stop HTTP server"))
timeToStopHttpServerCh := notAcceptingNewRequestCh.Signaled()
if s.ShutdownSendRetryAfter {
@ -565,7 +579,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
}
}
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
stoppedCh, listenerStoppedCh, err := s.NonBlockingRunWithContext(stopHTTPServerCtx, shutdownTimeout)
if err != nil {
return err
}
@ -694,7 +708,18 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// NonBlockingRun spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
// The returned channel is closed when the (asynchronous) termination is finished.
//
// Deprecated: use RunWithContext instead. Run will not get removed to avoid
// breaking consumers, but should not be used in new code.
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
ctx := wait.ContextForChannel(stopCh)
return s.NonBlockingRunWithContext(ctx, shutdownTimeout)
}
// NonBlockingRunWithContext spawns the secure http server. An error is
// returned if the secure port cannot be listened on.
// The returned channel is closed when the (asynchronous) termination is finished.
func (s preparedGenericAPIServer) NonBlockingRunWithContext(ctx context.Context, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
// Use an internal stop channel to allow cleanup of the listeners on error.
internalStopCh := make(chan struct{})
var stoppedCh <-chan struct{}
@ -712,11 +737,11 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdow
// responsibility of the caller to close the provided channel to
// ensure cleanup.
go func() {
<-stopCh
<-ctx.Done()
close(internalStopCh)
}()
s.RunPostStartHooks(stopCh)
s.RunPostStartHooks(ctx)
if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)

View File

@ -20,6 +20,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"log"
@ -41,6 +42,7 @@ import (
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"github.com/google/go-cmp/cmp"
"golang.org/x/net/http2"
@ -200,10 +202,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
}, nil)
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
_, ctx := ktesting.NewTestContext(t)
stopCtx, stop := context.WithCancelCause(ctx)
defer stop(errors.New("test has completed"))
runCompletedCh := make(chan struct{})
go func() {
defer close(runCompletedCh)
s.PrepareRun().Run(stopCh)
if err := s.PrepareRun().RunWithContext(stopCtx); err != nil {
t.Errorf("unexpected error from RunWithContext: %v", err)
}
}()
waitForAPIServerStarted(t, doer)
@ -222,7 +229,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationDisabled(t
}
// signal termination event: initiate a shutdown
close(stopCh)
stop(errors.New("shutting down"))
waitForeverUntilSignaled(t, signals.ShutdownInitiated)
// /readyz must return an error, but we need to give it some time
@ -423,10 +430,15 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
}, nil)
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
_, ctx := ktesting.NewTestContext(t)
stopCtx, stop := context.WithCancelCause(ctx)
defer stop(errors.New("test has completed"))
runCompletedCh := make(chan struct{})
go func() {
defer close(runCompletedCh)
s.PrepareRun().Run(stopCh)
if err := s.PrepareRun().RunWithContext(stopCtx); err != nil {
t.Errorf("unexpected error from RunWithContext: %v", err)
}
}()
waitForAPIServerStarted(t, doer)
@ -445,7 +457,7 @@ func TestGracefulTerminationWithKeepListeningDuringGracefulTerminationEnabled(t
}
// signal termination event: initiate a shutdown
close(stopCh)
stop(errors.New("shutting down"))
waitForeverUntilSignaled(t, signals.ShutdownInitiated)
// /readyz must return an error, but we need to give it some time
@ -568,10 +580,15 @@ func TestMuxAndDiscoveryComplete(t *testing.T) {
}
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
_, ctx := ktesting.NewTestContext(t)
stopCtx, stop := context.WithCancelCause(ctx)
defer stop(errors.New("test has completed"))
runCompletedCh := make(chan struct{})
go func() {
defer close(runCompletedCh)
s.PrepareRun().Run(stopCh)
if err := s.PrepareRun().RunWithContext(stopCtx); err != nil {
t.Errorf("unexpected error from RunWithContext: %v", err)
}
}()
waitForAPIServerStarted(t, doer)
@ -612,6 +629,9 @@ func TestPreShutdownHooks(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
stopCtx, stop := context.WithCancelCause(ctx)
defer stop(errors.New("test has completed"))
s := test.server()
doer := setupDoer(t, s.SecureServingInfo)
@ -643,14 +663,16 @@ func TestPreShutdownHooks(t *testing.T) {
}
// start the API server
stopCh, runCompletedCh := make(chan struct{}), make(chan struct{})
runCompletedCh := make(chan struct{})
go func() {
defer close(runCompletedCh)
s.PrepareRun().Run(stopCh)
if err := s.PrepareRun().RunWithContext(stopCtx); err != nil {
t.Errorf("unexpected error from RunWithContext: %v", err)
}
}()
waitForAPIServerStarted(t, doer)
close(stopCh)
stop(errors.New("shutting down"))
waitForeverUntil(t, runCompletedCh, "the apiserver Run method did not return")

View File

@ -52,6 +52,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2/ktesting"
kubeopenapi "k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/validation/spec"
netutils "k8s.io/utils/net"
@ -317,17 +318,16 @@ func TestInstallAPIGroups(t *testing.T) {
}
func TestPrepareRun(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
s, config, assert := newMaster(t)
assert.NotNil(config.OpenAPIConfig)
server := httptest.NewServer(s.Handler.Director)
defer server.Close()
done := make(chan struct{})
defer close(done)
s.PrepareRun()
s.RunPostStartHooks(done)
s.RunPostStartHooks(ctx)
// openapi is installed in PrepareRun
resp, err := http.Get(server.URL + "/openapi/v2")
@ -353,9 +353,10 @@ func TestPrepareRun(t *testing.T) {
}
func TestUpdateOpenAPISpec(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
s, _, assert := newMaster(t)
s.PrepareRun()
s.RunPostStartHooks(make(chan struct{}))
s.RunPostStartHooks(ctx)
server := httptest.NewServer(s.Handler.Director)
defer server.Close()

View File

@ -17,6 +17,7 @@ limitations under the License.
package server
import (
"context"
"errors"
"fmt"
"net/http"
@ -48,8 +49,13 @@ type PreShutdownHookFunc func() error
type PostStartHookContext struct {
// LoopbackClientConfig is a config for a privileged loopback connection to the API server
LoopbackClientConfig *restclient.Config
// StopCh is the channel that will be closed when the server stops
// StopCh is the channel that will be closed when the server stops.
//
// Deprecated: use the PostStartHookContext itself instead, it contains a context that
// gets cancelled when the server stops. StopCh keeps getting provided for existing code.
StopCh <-chan struct{}
// Context gets cancelled when the server stops.
context.Context
}
// PostStartHookProvider is an interface in addition to provide a post start hook for the api server
@ -151,15 +157,16 @@ func (s *GenericAPIServer) AddPreShutdownHookOrDie(name string, hook PreShutdown
}
}
// RunPostStartHooks runs the PostStartHooks for the server
func (s *GenericAPIServer) RunPostStartHooks(stopCh <-chan struct{}) {
// RunPostStartHooks runs the PostStartHooks for the server.
func (s *GenericAPIServer) RunPostStartHooks(ctx context.Context) {
s.postStartHookLock.Lock()
defer s.postStartHookLock.Unlock()
s.postStartHooksCalled = true
context := PostStartHookContext{
LoopbackClientConfig: s.LoopbackClientConfig,
StopCh: stopCh,
StopCh: ctx.Done(),
Context: ctx,
}
for hookName, hookEntry := range s.postStartHooks {

View File

@ -18,6 +18,7 @@ package options
import (
"bytes"
"context"
cryptorand "crypto/rand"
"crypto/rsa"
"crypto/tls"
@ -25,6 +26,7 @@ import (
"crypto/x509/pkix"
"encoding/base64"
"encoding/pem"
"errors"
"fmt"
"io/ioutil"
"math/big"
@ -44,6 +46,7 @@ import (
"k8s.io/client-go/discovery"
restclient "k8s.io/client-go/rest"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/klog/v2/ktesting"
netutils "k8s.io/utils/net"
)
@ -215,6 +218,10 @@ func TestServerRunWithSNI(t *testing.T) {
test := tests[title]
t.Run(title, func(t *testing.T) {
t.Parallel()
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.New("test has completed"))
// create server cert
certDir := "testdata/" + specToName(test.Cert)
serverCertBundleFile := filepath.Join(certDir, "cert")
@ -267,9 +274,6 @@ func TestServerRunWithSNI(t *testing.T) {
signatures[sig] = j
}
stopCh := make(chan struct{})
defer close(stopCh)
// launch server
config := setUp(t)
@ -316,7 +320,7 @@ func TestServerRunWithSNI(t *testing.T) {
preparedServer := s.PrepareRun()
preparedServerErrors := make(chan error)
go func() {
if err := preparedServer.Run(stopCh); err != nil {
if err := preparedServer.RunWithContext(ctx); err != nil {
preparedServerErrors <- err
}
}()