diff --git a/pkg/authentication/request/headerrequest/requestheader_controller.go b/pkg/authentication/request/headerrequest/requestheader_controller.go index 561b6fba9..d8c4090b1 100644 --- a/pkg/authentication/request/headerrequest/requestheader_controller.go +++ b/pkg/authentication/request/headerrequest/requestheader_controller.go @@ -162,29 +162,29 @@ func (c *RequestHeaderAuthRequestController) AllowedClientNames() []string { } // Run starts RequestHeaderAuthRequestController controller and blocks until stopCh is closed. -func (c *RequestHeaderAuthRequestController) Run(workers int, stopCh <-chan struct{}) { +func (c *RequestHeaderAuthRequestController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() klog.Infof("Starting %s", c.name) defer klog.Infof("Shutting down %s", c.name) - go c.configmapInformer.Run(stopCh) + go c.configmapInformer.Run(ctx.Done()) // wait for caches to fill before starting your work - if !cache.WaitForNamedCacheSync(c.name, stopCh, c.configmapInformerSynced) { + if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.configmapInformerSynced) { return } // doesn't matter what workers say, only start one. - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, ctx.Done()) - <-stopCh + <-ctx.Done() } // // RunOnce runs a single sync loop -func (c *RequestHeaderAuthRequestController) RunOnce() error { - configMap, err := c.client.CoreV1().ConfigMaps(c.configmapNamespace).Get(context.TODO(), c.configmapName, metav1.GetOptions{}) +func (c *RequestHeaderAuthRequestController) RunOnce(ctx context.Context) error { + configMap, err := c.client.CoreV1().ConfigMaps(c.configmapNamespace).Get(ctx, c.configmapName, metav1.GetOptions{}) switch { case errors.IsNotFound(err): // ignore, authConfigMap is nil now diff --git a/pkg/authentication/request/headerrequest/requestheader_controller_test.go b/pkg/authentication/request/headerrequest/requestheader_controller_test.go index 2577d2635..36dfbf1ec 100644 --- a/pkg/authentication/request/headerrequest/requestheader_controller_test.go +++ b/pkg/authentication/request/headerrequest/requestheader_controller_test.go @@ -17,6 +17,7 @@ limitations under the License. package headerrequest import ( + "context" "encoding/json" "k8s.io/apimachinery/pkg/api/equality" "testing" @@ -221,7 +222,8 @@ func TestRequestHeaderAuthRequestControllerSyncOnce(t *testing.T) { target.client = fakeKubeClient // act - err := target.RunOnce() + ctx := context.TODO() + err := target.RunOnce(ctx) if err != nil && !scenario.expectErr { t.Errorf("got unexpected error %v", err) diff --git a/pkg/server/dynamiccertificates/configmap_cafile_content.go b/pkg/server/dynamiccertificates/configmap_cafile_content.go index b09474bc4..428fd66ba 100644 --- a/pkg/server/dynamiccertificates/configmap_cafile_content.go +++ b/pkg/server/dynamiccertificates/configmap_cafile_content.go @@ -18,6 +18,7 @@ package dynamiccertificates import ( "bytes" + "context" "crypto/x509" "fmt" "sync/atomic" @@ -186,7 +187,7 @@ func (c *ConfigMapCAController) hasCAChanged(caBundle []byte) bool { } // RunOnce runs a single sync loop -func (c *ConfigMapCAController) RunOnce() error { +func (c *ConfigMapCAController) RunOnce(ctx context.Context) error { // Ignore the error when running once because when using a dynamically loaded ca file, because we think it's better to have nothing for // a brief time than completely crash. If crashing is necessary, higher order logic like a healthcheck and cause failures. _ = c.loadCABundle() @@ -194,7 +195,7 @@ func (c *ConfigMapCAController) RunOnce() error { } // Run starts the kube-apiserver and blocks until stopCh is closed. -func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) { +func (c *ConfigMapCAController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -202,23 +203,23 @@ func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) { defer klog.InfoS("Shutting down controller", "name", c.name) // we have a personal informer that is narrowly scoped, start it. - go c.configMapInformer.Run(stopCh) + go c.configMapInformer.Run(ctx.Done()) // wait for your secondary caches to fill before starting your work - if !cache.WaitForNamedCacheSync(c.name, stopCh, c.preRunCaches...) { + if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.preRunCaches...) { return } // doesn't matter what workers say, only start one. - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, ctx.Done()) // start timer that rechecks every minute, just in case. this also serves to prime the controller quickly. go wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) { c.queue.Add(workItemKey) return false, nil - }, stopCh) + }, ctx.Done()) - <-stopCh + <-ctx.Done() } func (c *ConfigMapCAController) runWorker() { diff --git a/pkg/server/dynamiccertificates/dynamic_cafile_content.go b/pkg/server/dynamiccertificates/dynamic_cafile_content.go index fb1515c18..58761acd9 100644 --- a/pkg/server/dynamiccertificates/dynamic_cafile_content.go +++ b/pkg/server/dynamiccertificates/dynamic_cafile_content.go @@ -18,6 +18,7 @@ package dynamiccertificates import ( "bytes" + "context" "crypto/x509" "fmt" "io/ioutil" @@ -39,10 +40,10 @@ var FileRefreshDuration = 1 * time.Minute // ControllerRunner is a generic interface for starting a controller type ControllerRunner interface { // RunOnce runs the sync loop a single time. This useful for synchronous priming - RunOnce() error + RunOnce(ctx context.Context) error // Run should be called a go .Run - Run(workers int, stopCh <-chan struct{}) + Run(ctx context.Context, workers int) } // DynamicFileCAContent provides a CAContentProvider that can dynamically react to new file content @@ -144,12 +145,12 @@ func (c *DynamicFileCAContent) hasCAChanged(caBundle []byte) bool { } // RunOnce runs a single sync loop -func (c *DynamicFileCAContent) RunOnce() error { +func (c *DynamicFileCAContent) RunOnce(ctx context.Context) error { return c.loadCABundle() } // Run starts the controller and blocks until stopCh is closed. -func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) { +func (c *DynamicFileCAContent) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -157,16 +158,16 @@ func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) { defer klog.InfoS("Shutting down controller", "name", c.name) // doesn't matter what workers say, only start one. - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, ctx.Done()) // start the loop that watches the CA file until stopCh is closed. go wait.Until(func() { - if err := c.watchCAFile(stopCh); err != nil { + if err := c.watchCAFile(ctx.Done()); err != nil { klog.ErrorS(err, "Failed to watch CA file, will retry later") } - }, time.Minute, stopCh) + }, time.Minute, ctx.Done()) - <-stopCh + <-ctx.Done() } func (c *DynamicFileCAContent) watchCAFile(stopCh <-chan struct{}) error { diff --git a/pkg/server/dynamiccertificates/dynamic_serving_content.go b/pkg/server/dynamiccertificates/dynamic_serving_content.go index 00117176b..9ff1abb64 100644 --- a/pkg/server/dynamiccertificates/dynamic_serving_content.go +++ b/pkg/server/dynamiccertificates/dynamic_serving_content.go @@ -17,6 +17,7 @@ limitations under the License. package dynamiccertificates import ( + "context" "crypto/tls" "fmt" "io/ioutil" @@ -119,12 +120,12 @@ func (c *DynamicCertKeyPairContent) loadCertKeyPair() error { } // RunOnce runs a single sync loop -func (c *DynamicCertKeyPairContent) RunOnce() error { +func (c *DynamicCertKeyPairContent) RunOnce(ctx context.Context) error { return c.loadCertKeyPair() } -// Run starts the controller and blocks until stopCh is closed. -func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) { +// Run starts the controller and blocks until context is killed. +func (c *DynamicCertKeyPairContent) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -132,16 +133,16 @@ func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) { defer klog.InfoS("Shutting down controller", "name", c.name) // doesn't matter what workers say, only start one. - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(c.runWorker, time.Second, ctx.Done()) // start the loop that watches the cert and key files until stopCh is closed. go wait.Until(func() { - if err := c.watchCertKeyFile(stopCh); err != nil { + if err := c.watchCertKeyFile(ctx.Done()); err != nil { klog.ErrorS(err, "Failed to watch cert and key file, will retry later") } - }, time.Minute, stopCh) + }, time.Minute, ctx.Done()) - <-stopCh + <-ctx.Done() } func (c *DynamicCertKeyPairContent) watchCertKeyFile(stopCh <-chan struct{}) error { diff --git a/pkg/server/dynamiccertificates/union_content.go b/pkg/server/dynamiccertificates/union_content.go index e10b112bc..57622bd34 100644 --- a/pkg/server/dynamiccertificates/union_content.go +++ b/pkg/server/dynamiccertificates/union_content.go @@ -18,6 +18,7 @@ package dynamiccertificates import ( "bytes" + "context" "crypto/x509" "strings" @@ -81,11 +82,11 @@ func (c unionCAContent) AddListener(listener Listener) { } // AddListener adds a listener to be notified when the CA content changes. -func (c unionCAContent) RunOnce() error { +func (c unionCAContent) RunOnce(ctx context.Context) error { errors := []error{} for _, curr := range c { if controller, ok := curr.(ControllerRunner); ok { - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { errors = append(errors, err) } } @@ -95,10 +96,10 @@ func (c unionCAContent) RunOnce() error { } // Run runs the controller -func (c unionCAContent) Run(workers int, stopCh <-chan struct{}) { +func (c unionCAContent) Run(ctx context.Context, workers int) { for _, curr := range c { if controller, ok := curr.(ControllerRunner); ok { - go controller.Run(workers, stopCh) + go controller.Run(ctx, workers) } } } diff --git a/pkg/server/options/authentication.go b/pkg/server/options/authentication.go index a82b4a739..8ff771af0 100644 --- a/pkg/server/options/authentication.go +++ b/pkg/server/options/authentication.go @@ -17,6 +17,7 @@ limitations under the License. package options import ( + "context" "fmt" "strings" "time" @@ -387,7 +388,10 @@ func (s *DelegatingAuthenticationOptions) createRequestHeaderConfig(client kuber } // look up authentication configuration in the cluster and in case of an err defer to authentication-tolerate-lookup-failure flag - if err := dynamicRequestHeaderProvider.RunOnce(); err != nil { + // We are passing the context to ProxyCerts.RunOnce as it needs to implement RunOnce(ctx) however the + // context is not used at all. So passing a empty context shouldn't be a problem + ctx := context.TODO() + if err := dynamicRequestHeaderProvider.RunOnce(ctx); err != nil { return nil, err } diff --git a/pkg/server/options/authentication_dynamic_request_header.go b/pkg/server/options/authentication_dynamic_request_header.go index e2beb5c23..0dac34021 100644 --- a/pkg/server/options/authentication_dynamic_request_header.go +++ b/pkg/server/options/authentication_dynamic_request_header.go @@ -17,6 +17,7 @@ limitations under the License. package options import ( + "context" "fmt" "k8s.io/apimachinery/pkg/util/errors" @@ -64,15 +65,15 @@ func newDynamicRequestHeaderController(client kubernetes.Interface) (*DynamicReq }, nil } -func (c *DynamicRequestHeaderController) RunOnce() error { +func (c *DynamicRequestHeaderController) RunOnce(ctx context.Context) error { errs := []error{} - errs = append(errs, c.ConfigMapCAController.RunOnce()) - errs = append(errs, c.RequestHeaderAuthRequestController.RunOnce()) + errs = append(errs, c.ConfigMapCAController.RunOnce(ctx)) + errs = append(errs, c.RequestHeaderAuthRequestController.RunOnce(ctx)) return errors.NewAggregate(errs) } -func (c *DynamicRequestHeaderController) Run(workers int, stopCh <-chan struct{}) { - go c.ConfigMapCAController.Run(workers, stopCh) - go c.RequestHeaderAuthRequestController.Run(workers, stopCh) - <-stopCh +func (c *DynamicRequestHeaderController) Run(ctx context.Context, workers int) { + go c.ConfigMapCAController.Run(ctx, workers) + go c.RequestHeaderAuthRequestController.Run(ctx, workers) + <-ctx.Done() } diff --git a/pkg/server/secure_serving.go b/pkg/server/secure_serving.go index d4caa08d3..64bcc87eb 100644 --- a/pkg/server/secure_serving.go +++ b/pkg/server/secure_serving.go @@ -93,36 +93,45 @@ func (s *SecureServingInfo) tlsConfig(stopCh <-chan struct{}) (*tls.Config, erro if s.Cert != nil { s.Cert.AddListener(dynamicCertificateController) } - + // generate a context from stopCh. This is to avoid modifying files which are relying on apiserver + // TODO: See if we can pass ctx to the current method + ctx, cancel := context.WithCancel(context.Background()) + go func() { + select { + case <-stopCh: + cancel() // stopCh closed, so cancel our context + case <-ctx.Done(): + } + }() // start controllers if possible if controller, ok := s.ClientCA.(dynamiccertificates.ControllerRunner); ok { // runonce to try to prime data. If this fails, it's ok because we fail closed. // Files are required to be populated already, so this is for convenience. - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { klog.Warningf("Initial population of client CA failed: %v", err) } - go controller.Run(1, stopCh) + go controller.Run(ctx, 1) } if controller, ok := s.Cert.(dynamiccertificates.ControllerRunner); ok { // runonce to try to prime data. If this fails, it's ok because we fail closed. // Files are required to be populated already, so this is for convenience. - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { klog.Warningf("Initial population of default serving certificate failed: %v", err) } - go controller.Run(1, stopCh) + go controller.Run(ctx, 1) } for _, sniCert := range s.SNICerts { sniCert.AddListener(dynamicCertificateController) if controller, ok := sniCert.(dynamiccertificates.ControllerRunner); ok { // runonce to try to prime data. If this fails, it's ok because we fail closed. // Files are required to be populated already, so this is for convenience. - if err := controller.RunOnce(); err != nil { + if err := controller.RunOnce(ctx); err != nil { klog.Warningf("Initial population of SNI serving certificate failed: %v", err) } - go controller.Run(1, stopCh) + go controller.Run(ctx, 1) } }