278 lines
8.9 KiB
Go
278 lines
8.9 KiB
Go
/*
|
|
Copyright 2019 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package dynamiccertificates
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/x509"
|
|
"fmt"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/fields"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
corev1informers "k8s.io/client-go/informers/core/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
corev1listers "k8s.io/client-go/listers/core/v1"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/util/workqueue"
|
|
"k8s.io/klog"
|
|
)
|
|
|
|
// ConfigMapCAController provies a CAContentProvider that can dynamically react to configmap changes
|
|
// It also fulfills the authenticator interface to provide verifyoptions
|
|
type ConfigMapCAController struct {
|
|
name string
|
|
|
|
configmapLister corev1listers.ConfigMapLister
|
|
configmapNamespace string
|
|
configmapName string
|
|
configmapKey string
|
|
// configMapInformer is tracked so that we can start these on Run
|
|
configMapInformer cache.SharedIndexInformer
|
|
|
|
// caBundle is a caBundleAndVerifier that contains the last read, non-zero length content of the file
|
|
caBundle atomic.Value
|
|
|
|
listeners []Listener
|
|
|
|
queue workqueue.RateLimitingInterface
|
|
// preRunCaches are the caches to sync before starting the work of this control loop
|
|
preRunCaches []cache.InformerSynced
|
|
}
|
|
|
|
var _ Notifier = &ConfigMapCAController{}
|
|
var _ CAContentProvider = &ConfigMapCAController{}
|
|
var _ ControllerRunner = &ConfigMapCAController{}
|
|
|
|
// NewDynamicCAFromConfigMapController returns a CAContentProvider based on a configmap that automatically reloads content.
|
|
// It is near-realtime via an informer.
|
|
func NewDynamicCAFromConfigMapController(purpose, namespace, name, key string, kubeClient kubernetes.Interface) (*ConfigMapCAController, error) {
|
|
if len(purpose) == 0 {
|
|
return nil, fmt.Errorf("missing purpose for ca bundle")
|
|
}
|
|
if len(namespace) == 0 {
|
|
return nil, fmt.Errorf("missing namespace for ca bundle")
|
|
}
|
|
if len(name) == 0 {
|
|
return nil, fmt.Errorf("missing name for ca bundle")
|
|
}
|
|
if len(key) == 0 {
|
|
return nil, fmt.Errorf("missing key for ca bundle")
|
|
}
|
|
caContentName := fmt.Sprintf("%s::%s::%s::%s", purpose, namespace, name, key)
|
|
|
|
// we construct our own informer because we need such a small subset of the information available. Just one namespace.
|
|
uncastConfigmapInformer := corev1informers.NewFilteredConfigMapInformer(kubeClient, namespace, 12*time.Hour, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, func(listOptions *v1.ListOptions) {
|
|
listOptions.FieldSelector = fields.OneTermEqualSelector("metadata.name", name).String()
|
|
})
|
|
|
|
configmapLister := corev1listers.NewConfigMapLister(uncastConfigmapInformer.GetIndexer())
|
|
|
|
c := &ConfigMapCAController{
|
|
name: caContentName,
|
|
configmapNamespace: namespace,
|
|
configmapName: name,
|
|
configmapKey: key,
|
|
configmapLister: configmapLister,
|
|
configMapInformer: uncastConfigmapInformer,
|
|
|
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("DynamicConfigMapCABundle-%s", purpose)),
|
|
preRunCaches: []cache.InformerSynced{uncastConfigmapInformer.HasSynced},
|
|
}
|
|
if err := c.loadCABundle(); err != nil {
|
|
// don't fail, but do print out a message
|
|
klog.Warningf("unable to load initial CA bundle for: %q due to: %s", c.name, err)
|
|
}
|
|
uncastConfigmapInformer.AddEventHandler(cache.FilteringResourceEventHandler{
|
|
FilterFunc: func(obj interface{}) bool {
|
|
if cast, ok := obj.(*corev1.ConfigMap); ok {
|
|
return cast.Name == c.configmapName && cast.Namespace == c.configmapNamespace
|
|
}
|
|
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
|
|
if cast, ok := tombstone.Obj.(*corev1.ConfigMap); ok {
|
|
return cast.Name == c.configmapName && cast.Namespace == c.configmapNamespace
|
|
}
|
|
}
|
|
return true // always return true just in case. The checks are fairly cheap
|
|
},
|
|
Handler: cache.ResourceEventHandlerFuncs{
|
|
// we have a filter, so any time we're called, we may as well queue. We only ever check one configmap
|
|
// so we don't have to be choosy about our key.
|
|
AddFunc: func(obj interface{}) {
|
|
c.queue.Add(c.keyFn())
|
|
},
|
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
|
c.queue.Add(c.keyFn())
|
|
},
|
|
DeleteFunc: func(obj interface{}) {
|
|
c.queue.Add(c.keyFn())
|
|
},
|
|
},
|
|
})
|
|
|
|
return c, nil
|
|
}
|
|
|
|
func (c *ConfigMapCAController) keyFn() string {
|
|
// this format matches DeletionHandlingMetaNamespaceKeyFunc for our single key
|
|
return c.configmapNamespace + "/" + c.configmapName
|
|
}
|
|
|
|
// AddListener adds a listener to be notified when the CA content changes.
|
|
func (c *ConfigMapCAController) AddListener(listener Listener) {
|
|
c.listeners = append(c.listeners, listener)
|
|
}
|
|
|
|
// loadCABundle determines the next set of content for the file.
|
|
func (c *ConfigMapCAController) loadCABundle() error {
|
|
configMap, err := c.configmapLister.ConfigMaps(c.configmapNamespace).Get(c.configmapName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
caBundle := configMap.Data[c.configmapKey]
|
|
if len(caBundle) == 0 {
|
|
return fmt.Errorf("missing content for CA bundle %q", c.Name())
|
|
}
|
|
|
|
// check to see if we have a change. If the values are the same, do nothing.
|
|
if !c.hasCAChanged([]byte(caBundle)) {
|
|
return nil
|
|
}
|
|
|
|
caBundleAndVerifier, err := newCABundleAndVerifier(c.Name(), []byte(caBundle))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.caBundle.Store(caBundleAndVerifier)
|
|
|
|
for _, listener := range c.listeners {
|
|
listener.Enqueue()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// hasCAChanged returns true if the caBundle is different than the current.
|
|
func (c *ConfigMapCAController) hasCAChanged(caBundle []byte) bool {
|
|
uncastExisting := c.caBundle.Load()
|
|
if uncastExisting == nil {
|
|
return true
|
|
}
|
|
|
|
// check to see if we have a change. If the values are the same, do nothing.
|
|
existing, ok := uncastExisting.(*caBundleAndVerifier)
|
|
if !ok {
|
|
return true
|
|
}
|
|
if !bytes.Equal(existing.caBundle, caBundle) {
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// RunOnce runs a single sync loop
|
|
func (c *ConfigMapCAController) RunOnce() error {
|
|
// Ignore the error when running once because when using a dynamically loaded ca file, because we think it's better to have nothing for
|
|
// a brief time than completely crash. If crashing is necessary, higher order logic like a healthcheck and cause failures.
|
|
_ = c.loadCABundle()
|
|
return nil
|
|
}
|
|
|
|
// Run starts the kube-apiserver and blocks until stopCh is closed.
|
|
func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) {
|
|
defer utilruntime.HandleCrash()
|
|
defer c.queue.ShutDown()
|
|
|
|
klog.Infof("Starting %s", c.name)
|
|
defer klog.Infof("Shutting down %s", c.name)
|
|
|
|
// we have a personal informer that is narrowly scoped, start it.
|
|
go c.configMapInformer.Run(stopCh)
|
|
|
|
// wait for your secondary caches to fill before starting your work
|
|
if !cache.WaitForNamedCacheSync(c.name, stopCh, c.preRunCaches...) {
|
|
return
|
|
}
|
|
|
|
// doesn't matter what workers say, only start one.
|
|
go wait.Until(c.runWorker, time.Second, stopCh)
|
|
|
|
// start timer that rechecks every minute, just in case. this also serves to prime the controller quickly.
|
|
_ = wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) {
|
|
c.queue.Add(workItemKey)
|
|
return false, nil
|
|
}, stopCh)
|
|
|
|
<-stopCh
|
|
}
|
|
|
|
func (c *ConfigMapCAController) runWorker() {
|
|
for c.processNextWorkItem() {
|
|
}
|
|
}
|
|
|
|
func (c *ConfigMapCAController) processNextWorkItem() bool {
|
|
dsKey, quit := c.queue.Get()
|
|
if quit {
|
|
return false
|
|
}
|
|
defer c.queue.Done(dsKey)
|
|
|
|
err := c.loadCABundle()
|
|
if err == nil {
|
|
c.queue.Forget(dsKey)
|
|
return true
|
|
}
|
|
|
|
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
|
|
c.queue.AddRateLimited(dsKey)
|
|
|
|
return true
|
|
}
|
|
|
|
// Name is just an identifier
|
|
func (c *ConfigMapCAController) Name() string {
|
|
return c.name
|
|
}
|
|
|
|
// CurrentCABundleContent provides ca bundle byte content
|
|
func (c *ConfigMapCAController) CurrentCABundleContent() []byte {
|
|
uncastObj := c.caBundle.Load()
|
|
if uncastObj == nil {
|
|
return nil // this can happen if we've been unable load data from the apiserver for some reason
|
|
}
|
|
|
|
return c.caBundle.Load().(*caBundleAndVerifier).caBundle
|
|
}
|
|
|
|
// VerifyOptions provides verifyoptions compatible with authenticators
|
|
func (c *ConfigMapCAController) VerifyOptions() (x509.VerifyOptions, bool) {
|
|
uncastObj := c.caBundle.Load()
|
|
if uncastObj == nil {
|
|
// This can happen if we've been unable load data from the apiserver for some reason.
|
|
// In this case, we should not accept any connections on the basis of this ca bundle.
|
|
return x509.VerifyOptions{}, false
|
|
}
|
|
|
|
return uncastObj.(*caBundleAndVerifier).verifyOptions, true
|
|
}
|