API Server Changes

This commit includes all the changes needed for the APIServer. Instead of modifying the existing signatures of the methods that generate or return the stop channel, we derive a context from that channel and pass the derived context to the controllers started in the APIServer. This ensures we don't have to touch APIServer dependencies.

Kubernetes-commit: 8b84a793b39fed2a62af0876b2eda461a68008c9
Ravi Gudimetla 2022-03-07 09:20:45 -05:00 committed by Kubernetes Publisher
parent 78e10a1e85
commit 1ee261d219
9 changed files with 69 additions and 49 deletions
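
The pattern the commit message describes, deriving a context from a legacy stop channel so that context-based controllers can be driven without changing their callers, is sketched below. This is a minimal, self-contained illustration under our own assumptions: the helper name contextFromStopCh is ours, not part of this commit; the commit itself inlines the equivalent wiring in tlsConfig (see the secure serving hunk at the end of this diff).

package main

import (
	"context"
	"fmt"
	"time"
)

// contextFromStopCh derives a context.Context from a legacy stop channel.
// The returned context is cancelled as soon as stopCh is closed, so
// controllers with Run(ctx, workers) signatures can be driven from code
// that still only has a stopCh. (Illustrative helper, not part of the commit.)
func contextFromStopCh(stopCh <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-stopCh:
			cancel() // stopCh closed, so cancel the derived context
		case <-ctx.Done():
		}
	}()
	return ctx, cancel
}

func main() {
	stopCh := make(chan struct{})
	ctx, cancel := contextFromStopCh(stopCh)
	defer cancel()

	// Anything context-aware, e.g. controller.Run(ctx, 1), now stops when the
	// original channel is closed.
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(stopCh)
	}()

	<-ctx.Done()
	fmt.Println("derived context cancelled after stopCh was closed")
}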


@@ -162,29 +162,29 @@ func (c *RequestHeaderAuthRequestController) AllowedClientNames() []string {
 }
 
 // Run starts RequestHeaderAuthRequestController controller and blocks until stopCh is closed.
-func (c *RequestHeaderAuthRequestController) Run(workers int, stopCh <-chan struct{}) {
+func (c *RequestHeaderAuthRequestController) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer c.queue.ShutDown()
 
 	klog.Infof("Starting %s", c.name)
 	defer klog.Infof("Shutting down %s", c.name)
 
-	go c.configmapInformer.Run(stopCh)
+	go c.configmapInformer.Run(ctx.Done())
 
 	// wait for caches to fill before starting your work
-	if !cache.WaitForNamedCacheSync(c.name, stopCh, c.configmapInformerSynced) {
+	if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.configmapInformerSynced) {
 		return
 	}
 
 	// doesn't matter what workers say, only start one.
-	go wait.Until(c.runWorker, time.Second, stopCh)
+	go wait.Until(c.runWorker, time.Second, ctx.Done())
 
-	<-stopCh
+	<-ctx.Done()
 }
 
 // // RunOnce runs a single sync loop
-func (c *RequestHeaderAuthRequestController) RunOnce() error {
-	configMap, err := c.client.CoreV1().ConfigMaps(c.configmapNamespace).Get(context.TODO(), c.configmapName, metav1.GetOptions{})
+func (c *RequestHeaderAuthRequestController) RunOnce(ctx context.Context) error {
+	configMap, err := c.client.CoreV1().ConfigMaps(c.configmapNamespace).Get(ctx, c.configmapName, metav1.GetOptions{})
 	switch {
 	case errors.IsNotFound(err):
 		// ignore, authConfigMap is nil now


@@ -17,6 +17,7 @@ limitations under the License.
 package headerrequest
 
 import (
+	"context"
 	"encoding/json"
 	"k8s.io/apimachinery/pkg/api/equality"
 	"testing"

@@ -221,7 +222,8 @@ func TestRequestHeaderAuthRequestControllerSyncOnce(t *testing.T) {
 			target.client = fakeKubeClient
 
 			// act
-			err := target.RunOnce()
+			ctx := context.TODO()
+			err := target.RunOnce(ctx)
 			if err != nil && !scenario.expectErr {
 				t.Errorf("got unexpected error %v", err)


@@ -18,6 +18,7 @@ package dynamiccertificates
 
 import (
 	"bytes"
+	"context"
 	"crypto/x509"
 	"fmt"
 	"sync/atomic"

@@ -186,7 +187,7 @@ func (c *ConfigMapCAController) hasCAChanged(caBundle []byte) bool {
 }
 
 // RunOnce runs a single sync loop
-func (c *ConfigMapCAController) RunOnce() error {
+func (c *ConfigMapCAController) RunOnce(ctx context.Context) error {
 	// Ignore the error when running once because when using a dynamically loaded ca file, because we think it's better to have nothing for
 	// a brief time than completely crash. If crashing is necessary, higher order logic like a healthcheck and cause failures.
 	_ = c.loadCABundle()

@@ -194,7 +195,7 @@ func (c *ConfigMapCAController) RunOnce() error {
 }
 
 // Run starts the kube-apiserver and blocks until stopCh is closed.
-func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) {
+func (c *ConfigMapCAController) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer c.queue.ShutDown()

@@ -202,23 +203,23 @@ func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) {
 	defer klog.InfoS("Shutting down controller", "name", c.name)
 
 	// we have a personal informer that is narrowly scoped, start it.
-	go c.configMapInformer.Run(stopCh)
+	go c.configMapInformer.Run(ctx.Done())
 
 	// wait for your secondary caches to fill before starting your work
-	if !cache.WaitForNamedCacheSync(c.name, stopCh, c.preRunCaches...) {
+	if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.preRunCaches...) {
 		return
 	}
 
 	// doesn't matter what workers say, only start one.
-	go wait.Until(c.runWorker, time.Second, stopCh)
+	go wait.Until(c.runWorker, time.Second, ctx.Done())
 
 	// start timer that rechecks every minute, just in case. this also serves to prime the controller quickly.
 	go wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) {
 		c.queue.Add(workItemKey)
 		return false, nil
-	}, stopCh)
+	}, ctx.Done())
 
-	<-stopCh
+	<-ctx.Done()
 }
 
 func (c *ConfigMapCAController) runWorker() {


@@ -18,6 +18,7 @@ package dynamiccertificates
 
 import (
 	"bytes"
+	"context"
 	"crypto/x509"
 	"fmt"
 	"io/ioutil"

@@ -39,10 +40,10 @@ var FileRefreshDuration = 1 * time.Minute
 
 // ControllerRunner is a generic interface for starting a controller
 type ControllerRunner interface {
 	// RunOnce runs the sync loop a single time. This useful for synchronous priming
-	RunOnce() error
+	RunOnce(ctx context.Context) error
 
 	// Run should be called a go .Run
-	Run(workers int, stopCh <-chan struct{})
+	Run(ctx context.Context, workers int)
 }
 
 // DynamicFileCAContent provides a CAContentProvider that can dynamically react to new file content

@@ -144,12 +145,12 @@ func (c *DynamicFileCAContent) hasCAChanged(caBundle []byte) bool {
 }
 
 // RunOnce runs a single sync loop
-func (c *DynamicFileCAContent) RunOnce() error {
+func (c *DynamicFileCAContent) RunOnce(ctx context.Context) error {
 	return c.loadCABundle()
 }
 
 // Run starts the controller and blocks until stopCh is closed.
-func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) {
+func (c *DynamicFileCAContent) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer c.queue.ShutDown()

@@ -157,16 +158,16 @@ func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) {
 	defer klog.InfoS("Shutting down controller", "name", c.name)
 
 	// doesn't matter what workers say, only start one.
-	go wait.Until(c.runWorker, time.Second, stopCh)
+	go wait.Until(c.runWorker, time.Second, ctx.Done())
 
 	// start the loop that watches the CA file until stopCh is closed.
 	go wait.Until(func() {
-		if err := c.watchCAFile(stopCh); err != nil {
+		if err := c.watchCAFile(ctx.Done()); err != nil {
 			klog.ErrorS(err, "Failed to watch CA file, will retry later")
 		}
-	}, time.Minute, stopCh)
+	}, time.Minute, ctx.Done())
 
-	<-stopCh
+	<-ctx.Done()
 }
 
 func (c *DynamicFileCAContent) watchCAFile(stopCh <-chan struct{}) error {


@@ -17,6 +17,7 @@ limitations under the License.
 package dynamiccertificates
 
 import (
+	"context"
 	"crypto/tls"
 	"fmt"
 	"io/ioutil"

@@ -119,12 +120,12 @@ func (c *DynamicCertKeyPairContent) loadCertKeyPair() error {
 }
 
 // RunOnce runs a single sync loop
-func (c *DynamicCertKeyPairContent) RunOnce() error {
+func (c *DynamicCertKeyPairContent) RunOnce(ctx context.Context) error {
 	return c.loadCertKeyPair()
 }
 
-// Run starts the controller and blocks until stopCh is closed.
-func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) {
+// Run starts the controller and blocks until context is killed.
+func (c *DynamicCertKeyPairContent) Run(ctx context.Context, workers int) {
 	defer utilruntime.HandleCrash()
 	defer c.queue.ShutDown()

@@ -132,16 +133,16 @@ func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) {
 	defer klog.InfoS("Shutting down controller", "name", c.name)
 
 	// doesn't matter what workers say, only start one.
-	go wait.Until(c.runWorker, time.Second, stopCh)
+	go wait.Until(c.runWorker, time.Second, ctx.Done())
 
 	// start the loop that watches the cert and key files until stopCh is closed.
 	go wait.Until(func() {
-		if err := c.watchCertKeyFile(stopCh); err != nil {
+		if err := c.watchCertKeyFile(ctx.Done()); err != nil {
 			klog.ErrorS(err, "Failed to watch cert and key file, will retry later")
 		}
-	}, time.Minute, stopCh)
+	}, time.Minute, ctx.Done())
 
-	<-stopCh
+	<-ctx.Done()
 }
 
 func (c *DynamicCertKeyPairContent) watchCertKeyFile(stopCh <-chan struct{}) error {


@@ -18,6 +18,7 @@ package dynamiccertificates
 
 import (
 	"bytes"
+	"context"
 	"crypto/x509"
 	"strings"

@@ -81,11 +82,11 @@ func (c unionCAContent) AddListener(listener Listener) {
 }
 
 // AddListener adds a listener to be notified when the CA content changes.
-func (c unionCAContent) RunOnce() error {
+func (c unionCAContent) RunOnce(ctx context.Context) error {
 	errors := []error{}
 	for _, curr := range c {
 		if controller, ok := curr.(ControllerRunner); ok {
-			if err := controller.RunOnce(); err != nil {
+			if err := controller.RunOnce(ctx); err != nil {
 				errors = append(errors, err)
 			}
 		}

@@ -95,10 +96,10 @@ func (c unionCAContent) RunOnce() error {
 }
 
 // Run runs the controller
-func (c unionCAContent) Run(workers int, stopCh <-chan struct{}) {
+func (c unionCAContent) Run(ctx context.Context, workers int) {
 	for _, curr := range c {
 		if controller, ok := curr.(ControllerRunner); ok {
-			go controller.Run(workers, stopCh)
+			go controller.Run(ctx, workers)
 		}
 	}
 }


@@ -17,6 +17,7 @@ limitations under the License.
 package options
 
 import (
+	"context"
 	"fmt"
 	"strings"
 	"time"

@@ -387,7 +388,10 @@ func (s *DelegatingAuthenticationOptions) createRequestHeaderConfig(client kuber
 	}
 
 	// look up authentication configuration in the cluster and in case of an err defer to authentication-tolerate-lookup-failure flag
-	if err := dynamicRequestHeaderProvider.RunOnce(); err != nil {
+	// We are passing the context to ProxyCerts.RunOnce as it needs to implement RunOnce(ctx) however the
+	// context is not used at all. So passing a empty context shouldn't be a problem
+	ctx := context.TODO()
+	if err := dynamicRequestHeaderProvider.RunOnce(ctx); err != nil {
 		return nil, err
 	}


@@ -17,6 +17,7 @@ limitations under the License.
 package options
 
 import (
+	"context"
 	"fmt"
 	"k8s.io/apimachinery/pkg/util/errors"

@@ -64,15 +65,15 @@ func newDynamicRequestHeaderController(client kubernetes.Interface) (*DynamicReq
 	}, nil
 }
 
-func (c *DynamicRequestHeaderController) RunOnce() error {
+func (c *DynamicRequestHeaderController) RunOnce(ctx context.Context) error {
 	errs := []error{}
-	errs = append(errs, c.ConfigMapCAController.RunOnce())
-	errs = append(errs, c.RequestHeaderAuthRequestController.RunOnce())
+	errs = append(errs, c.ConfigMapCAController.RunOnce(ctx))
+	errs = append(errs, c.RequestHeaderAuthRequestController.RunOnce(ctx))
 	return errors.NewAggregate(errs)
 }
 
-func (c *DynamicRequestHeaderController) Run(workers int, stopCh <-chan struct{}) {
-	go c.ConfigMapCAController.Run(workers, stopCh)
-	go c.RequestHeaderAuthRequestController.Run(workers, stopCh)
-	<-stopCh
+func (c *DynamicRequestHeaderController) Run(ctx context.Context, workers int) {
+	go c.ConfigMapCAController.Run(ctx, workers)
+	go c.RequestHeaderAuthRequestController.Run(ctx, workers)
+	<-ctx.Done()
 }


@@ -93,36 +93,45 @@ func (s *SecureServingInfo) tlsConfig(stopCh <-chan struct{}) (*tls.Config, erro
 	if s.Cert != nil {
 		s.Cert.AddListener(dynamicCertificateController)
 	}
 
+	// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
+	// TODO: See if we can pass ctx to the current method
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		select {
+		case <-stopCh:
+			cancel() // stopCh closed, so cancel our context
+		case <-ctx.Done():
+		}
+	}()
+
 	// start controllers if possible
 	if controller, ok := s.ClientCA.(dynamiccertificates.ControllerRunner); ok {
 		// runonce to try to prime data. If this fails, it's ok because we fail closed.
 		// Files are required to be populated already, so this is for convenience.
-		if err := controller.RunOnce(); err != nil {
+		if err := controller.RunOnce(ctx); err != nil {
 			klog.Warningf("Initial population of client CA failed: %v", err)
 		}
 
-		go controller.Run(1, stopCh)
+		go controller.Run(ctx, 1)
 	}
 	if controller, ok := s.Cert.(dynamiccertificates.ControllerRunner); ok {
 		// runonce to try to prime data. If this fails, it's ok because we fail closed.
 		// Files are required to be populated already, so this is for convenience.
-		if err := controller.RunOnce(); err != nil {
+		if err := controller.RunOnce(ctx); err != nil {
 			klog.Warningf("Initial population of default serving certificate failed: %v", err)
 		}
 
-		go controller.Run(1, stopCh)
+		go controller.Run(ctx, 1)
 	}
 	for _, sniCert := range s.SNICerts {
 		sniCert.AddListener(dynamicCertificateController)
 		if controller, ok := sniCert.(dynamiccertificates.ControllerRunner); ok {
 			// runonce to try to prime data. If this fails, it's ok because we fail closed.
 			// Files are required to be populated already, so this is for convenience.
-			if err := controller.RunOnce(); err != nil {
+			if err := controller.RunOnce(ctx); err != nil {
 				klog.Warningf("Initial population of SNI serving certificate failed: %v", err)
 			}
 
-			go controller.Run(1, stopCh)
+			go controller.Run(ctx, 1)
 		}
 	}