linkerd2/controller/ca/controller.go

200 lines
5.3 KiB
Go

package ca
import (
"fmt"
"strings"
"time"
"github.com/linkerd/linkerd2/controller/k8s"
pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
log "github.com/sirupsen/logrus"
"k8s.io/api/admissionregistration/v1beta1"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
type CertificateController struct {
namespace string
k8sAPI *k8s.API
ca *CA
syncHandler func(key string) error
// The queue is keyed on a string. If the string doesn't contain any dots
// then it is a namespace name and the task is to create the CA bundle
// configmap in that namespace. Otherwise the string must be of the form
// "$podOwner.$podKind.$podNamespace" and the task is to create the secret
// for that pod owner.
queue workqueue.RateLimitingInterface
}
func NewCertificateController(controllerNamespace string, k8sAPI *k8s.API, proxyAutoInject bool) (*CertificateController, error) {
ca, err := NewCA()
if err != nil {
return nil, err
}
c := &CertificateController{
namespace: controllerNamespace,
k8sAPI: k8sAPI,
ca: ca,
queue: workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), "certificates"),
}
k8sAPI.Pod().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.handlePodAdd,
UpdateFunc: c.handlePodUpdate,
},
)
if proxyAutoInject {
k8sAPI.MWC().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.handleMWCAdd,
UpdateFunc: c.handleMWCUpdate,
},
)
}
c.syncHandler = c.syncObject
return c, nil
}
func (c *CertificateController) Run(readyCh <-chan struct{}, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
defer c.queue.ShutDown()
<-readyCh
log.Info("starting certificate controller")
defer log.Info("shutting down certificate controller")
go wait.Until(c.worker, time.Second, stopCh)
<-stopCh
}
func (c *CertificateController) worker() {
for c.processNextWorkItem() {
}
}
func (c *CertificateController) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.syncHandler(key.(string))
if err != nil {
log.Errorf("error syncing object: %s", err)
c.queue.AddRateLimited(key)
return true
}
c.queue.Forget(key)
return true
}
func (c *CertificateController) syncObject(key string) error {
log.Debugf("syncObject(%s)", key)
if !strings.Contains(key, ".") {
return c.syncNamespace(key)
}
return c.syncSecret(key)
}
func (c *CertificateController) syncNamespace(ns string) error {
log.Debugf("syncNamespace(%s)", ns)
configMap := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: pkgK8s.TLSTrustAnchorConfigMapName},
Data: map[string]string{
pkgK8s.TLSTrustAnchorFileName: c.ca.TrustAnchorPEM(),
},
}
log.Debugf("adding configmap [%s] to namespace [%s]",
pkgK8s.TLSTrustAnchorConfigMapName, ns)
_, err := c.k8sAPI.Client.CoreV1().ConfigMaps(ns).Create(configMap)
if apierrors.IsAlreadyExists(err) {
_, err = c.k8sAPI.Client.CoreV1().ConfigMaps(ns).Update(configMap)
}
return err
}
func (c *CertificateController) syncSecret(key string) error {
log.Debugf("syncSecret(%s)", key)
parts := strings.Split(key, ".")
if len(parts) != 3 {
log.Errorf("Failed to parse secret sync request %s", key)
return nil // TODO
}
identity := pkgK8s.TLSIdentity{
Name: parts[0],
Kind: parts[1],
Namespace: parts[2],
ControllerNamespace: c.namespace,
}
dnsName := identity.ToDNSName()
secretName := identity.ToSecretName()
certAndPrivateKey, err := c.ca.IssueEndEntityCertificate(dnsName)
if err != nil {
log.Errorf("Failed to issue certificate for %s", dnsName)
return err
}
secret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: secretName},
Data: map[string][]byte{
pkgK8s.TLSCertFileName: certAndPrivateKey.Certificate,
pkgK8s.TLSPrivateKeyFileName: certAndPrivateKey.PrivateKey,
},
}
_, err = c.k8sAPI.Client.CoreV1().Secrets(identity.Namespace).Create(secret)
if apierrors.IsAlreadyExists(err) {
_, err = c.k8sAPI.Client.CoreV1().Secrets(identity.Namespace).Update(secret)
}
return err
}
func (c *CertificateController) handlePodAdd(obj interface{}) {
pod := obj.(*v1.Pod)
if pkgK8s.IsMeshed(pod, c.namespace) {
log.Debugf("enqueuing update of CA bundle configmap in %s", pod.Namespace)
c.queue.Add(pod.Namespace)
ownerKind, ownerName := c.k8sAPI.GetOwnerKindAndName(pod)
item := fmt.Sprintf("%s.%s.%s", ownerName, ownerKind, pod.Namespace)
log.Debugf("enqueuing secret write for %s", item)
c.queue.Add(item)
}
}
func (c *CertificateController) handlePodUpdate(oldObj, newObj interface{}) {
c.handlePodAdd(newObj)
}
func (c *CertificateController) handleMWCAdd(obj interface{}) {
mwc := obj.(*v1beta1.MutatingWebhookConfiguration)
log.Debugf("enqueuing secret write for mutating webhook configuration %q", mwc.ObjectMeta.Name)
for _, webhook := range mwc.Webhooks {
if mwc.Name == pkgK8s.ProxyInjectorWebhookConfig {
c.queue.Add(fmt.Sprintf("%s.%s.%s", webhook.ClientConfig.Service.Name, pkgK8s.Service, webhook.ClientConfig.Service.Namespace))
}
}
}
func (c *CertificateController) handleMWCUpdate(oldObj, newObj interface{}) {
c.handleMWCAdd(newObj)
}