kustomize-controller/controllers/kustomization_controller.go

1069 lines
37 KiB
Go

/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"bytes"
"context"
"fmt"
"os"
"sort"
"strings"
"time"
securejoin "github.com/cyphar/filepath-securejoin"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
"sigs.k8s.io/cli-utils/pkg/object"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
apiacl "github.com/fluxcd/pkg/apis/acl"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/http/fetch"
"github.com/fluxcd/pkg/runtime/acl"
runtimeClient "github.com/fluxcd/pkg/runtime/client"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/predicates"
"github.com/fluxcd/pkg/ssa"
"github.com/fluxcd/pkg/tar"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1beta2"
"github.com/fluxcd/kustomize-controller/internal/decryptor"
"github.com/fluxcd/kustomize-controller/internal/generator"
)
// +kubebuilder:rbac:groups=kustomize.toolkit.fluxcd.io,resources=kustomizations,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kustomize.toolkit.fluxcd.io,resources=kustomizations/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=kustomize.toolkit.fluxcd.io,resources=kustomizations/finalizers,verbs=get;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets;ocirepositories;gitrepositories,verbs=get;list;watch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status;ocirepositories/status;gitrepositories/status,verbs=get
// +kubebuilder:rbac:groups="",resources=configmaps;secrets;serviceaccounts,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
// KustomizationReconciler reconciles a Kustomization object
type KustomizationReconciler struct {
client.Client
artifactFetcher *fetch.ArchiveFetcher
requeueDependency time.Duration
Scheme *runtime.Scheme
EventRecorder kuberecorder.EventRecorder
MetricsRecorder *metrics.Recorder
StatusPoller *polling.StatusPoller
PollingOpts polling.Options
ControllerName string
statusManager string
NoCrossNamespaceRefs bool
NoRemoteBases bool
DefaultServiceAccount string
KubeConfigOpts runtimeClient.KubeConfigOptions
}
// KustomizationReconcilerOptions contains options for the KustomizationReconciler.
type KustomizationReconcilerOptions struct {
MaxConcurrentReconciles int
HTTPRetry int
DependencyRequeueInterval time.Duration
RateLimiter ratelimiter.RateLimiter
}
func (r *KustomizationReconciler) SetupWithManager(mgr ctrl.Manager, opts KustomizationReconcilerOptions) error {
const (
ociRepositoryIndexKey string = ".metadata.ociRepository"
gitRepositoryIndexKey string = ".metadata.gitRepository"
bucketIndexKey string = ".metadata.bucket"
)
// Index the Kustomizations by the OCIRepository references they (may) point at.
if err := mgr.GetCache().IndexField(context.TODO(), &kustomizev1.Kustomization{}, ociRepositoryIndexKey,
r.indexBy(sourcev1.OCIRepositoryKind)); err != nil {
return fmt.Errorf("failed setting index fields: %w", err)
}
// Index the Kustomizations by the GitRepository references they (may) point at.
if err := mgr.GetCache().IndexField(context.TODO(), &kustomizev1.Kustomization{}, gitRepositoryIndexKey,
r.indexBy(sourcev1.GitRepositoryKind)); err != nil {
return fmt.Errorf("failed setting index fields: %w", err)
}
// Index the Kustomizations by the Bucket references they (may) point at.
if err := mgr.GetCache().IndexField(context.TODO(), &kustomizev1.Kustomization{}, bucketIndexKey,
r.indexBy(sourcev1.BucketKind)); err != nil {
return fmt.Errorf("failed setting index fields: %w", err)
}
r.requeueDependency = opts.DependencyRequeueInterval
r.statusManager = fmt.Sprintf("gotk-%s", r.ControllerName)
r.artifactFetcher = fetch.NewArchiveFetcher(opts.HTTPRetry, tar.UnlimitedUntarSize, os.Getenv("SOURCE_CONTROLLER_LOCALHOST"))
return ctrl.NewControllerManagedBy(mgr).
For(&kustomizev1.Kustomization{}, builder.WithPredicates(
predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{}),
)).
Watches(
&source.Kind{Type: &sourcev1.OCIRepository{}},
handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(ociRepositoryIndexKey)),
builder.WithPredicates(SourceRevisionChangePredicate{}),
).
Watches(
&source.Kind{Type: &sourcev1.GitRepository{}},
handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(gitRepositoryIndexKey)),
builder.WithPredicates(SourceRevisionChangePredicate{}),
).
Watches(
&source.Kind{Type: &sourcev1.Bucket{}},
handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(bucketIndexKey)),
builder.WithPredicates(SourceRevisionChangePredicate{}),
).
WithOptions(controller.Options{
MaxConcurrentReconciles: opts.MaxConcurrentReconciles,
RateLimiter: opts.RateLimiter,
RecoverPanic: true,
}).
Complete(r)
}
func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
reconcileStart := time.Now()
var kustomization kustomizev1.Kustomization
if err := r.Get(ctx, req.NamespacedName, &kustomization); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// Record suspended status metric
defer r.recordSuspension(ctx, kustomization)
// Add our finalizer if it does not exist
if !controllerutil.ContainsFinalizer(&kustomization, kustomizev1.KustomizationFinalizer) {
patch := client.MergeFrom(kustomization.DeepCopy())
controllerutil.AddFinalizer(&kustomization, kustomizev1.KustomizationFinalizer)
if err := r.Patch(ctx, &kustomization, patch, client.FieldOwner(r.statusManager)); err != nil {
log.Error(err, "unable to register finalizer")
return ctrl.Result{}, err
}
}
// Examine if the object is under deletion
if !kustomization.ObjectMeta.DeletionTimestamp.IsZero() {
return r.finalize(ctx, kustomization)
}
// Return early if the Kustomization is suspended.
if kustomization.Spec.Suspend {
log.Info("Reconciliation is suspended for this object")
return ctrl.Result{}, nil
}
// resolve source reference
source, err := r.getSource(ctx, kustomization)
if err != nil {
if apierrors.IsNotFound(err) {
msg := fmt.Sprintf("Source '%s' not found", kustomization.Spec.SourceRef.String())
kustomization = kustomizev1.KustomizationNotReady(kustomization, "", kustomizev1.ArtifactFailedReason, msg)
if err := r.patchStatus(ctx, req, kustomization.Status); err != nil {
return ctrl.Result{Requeue: true}, err
}
r.recordReadiness(ctx, kustomization)
log.Info(msg)
// do not requeue immediately, when the source is created the watcher should trigger a reconciliation
return ctrl.Result{RequeueAfter: kustomization.GetRetryInterval()}, nil
}
if acl.IsAccessDenied(err) {
kustomization = kustomizev1.KustomizationNotReady(kustomization, "", apiacl.AccessDeniedReason, err.Error())
if err := r.patchStatus(ctx, req, kustomization.Status); err != nil {
return ctrl.Result{Requeue: true}, err
}
log.Error(err, "access denied to cross-namespace source")
r.recordReadiness(ctx, kustomization)
r.event(ctx, kustomization, "unknown", events.EventSeverityError, err.Error(), nil)
return ctrl.Result{RequeueAfter: kustomization.GetRetryInterval()}, nil
}
// retry on transient errors
return ctrl.Result{Requeue: true}, err
}
if source.GetArtifact() == nil {
msg := "Source is not ready, artifact not found"
kustomization = kustomizev1.KustomizationNotReady(kustomization, "", kustomizev1.ArtifactFailedReason, msg)
if err := r.patchStatus(ctx, req, kustomization.Status); err != nil {
log.Error(err, "unable to update status for artifact not found")
return ctrl.Result{Requeue: true}, err
}
r.recordReadiness(ctx, kustomization)
log.Info(msg)
// do not requeue immediately, when the artifact is created the watcher should trigger a reconciliation
return ctrl.Result{RequeueAfter: kustomization.GetRetryInterval()}, nil
}
// check dependencies
if len(kustomization.Spec.DependsOn) > 0 {
if err := r.checkDependencies(source, kustomization); err != nil {
kustomization = kustomizev1.KustomizationNotReady(
kustomization, source.GetArtifact().Revision, kustomizev1.DependencyNotReadyReason, err.Error())
if err := r.patchStatus(ctx, req, kustomization.Status); err != nil {
log.Error(err, "unable to update status for dependency not ready")
return ctrl.Result{Requeue: true}, err
}
// we can't rely on exponential backoff because it will prolong the execution too much,
// instead we requeue on a fix interval.
msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.requeueDependency.String())
log.Info(msg)
r.event(ctx, kustomization, source.GetArtifact().Revision, events.EventSeverityInfo, msg, nil)
r.recordReadiness(ctx, kustomization)
return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
}
log.Info("All dependencies are ready, proceeding with reconciliation")
}
// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &kustomization)
if err != nil {
return ctrl.Result{}, err
}
defer r.MetricsRecorder.RecordDuration(*objRef, reconcileStart)
}
// set the reconciliation status to progressing
kustomization = kustomizev1.KustomizationProgressing(kustomization, "reconciliation in progress")
if err := r.patchStatus(ctx, req, kustomization.Status); err != nil {
return ctrl.Result{Requeue: true}, err
}
r.recordReadiness(ctx, kustomization)
// reconcile kustomization by applying the latest revision
reconciledKustomization, reconcileErr := r.reconcile(ctx, *kustomization.DeepCopy(), source)
// requeue if the artifact is not found
if reconcileErr == fetch.FileNotFoundError {
msg := fmt.Sprintf("Source is not ready, artifact not found, retrying in %s", r.requeueDependency.String())
log.Info(msg)
if err := r.patchStatus(ctx, req, kustomizev1.KustomizationProgressing(kustomization, msg).Status); err != nil {
log.Error(err, "unable to update status for artifact not found")
return ctrl.Result{Requeue: true}, err
}
return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
}
if err := r.patchStatus(ctx, req, reconciledKustomization.Status); err != nil {
return ctrl.Result{Requeue: true}, err
}
r.recordReadiness(ctx, reconciledKustomization)
// broadcast the reconciliation failure and requeue at the specified retry interval
if reconcileErr != nil {
log.Error(reconcileErr, fmt.Sprintf("Reconciliation failed after %s, next try in %s",
time.Since(reconcileStart).String(),
kustomization.GetRetryInterval().String()),
"revision",
source.GetArtifact().Revision)
r.event(ctx, reconciledKustomization, source.GetArtifact().Revision, events.EventSeverityError,
reconcileErr.Error(), nil)
return ctrl.Result{RequeueAfter: kustomization.GetRetryInterval()}, nil
}
// broadcast the reconciliation result and requeue at the specified interval
msg := fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Since(reconcileStart).String(),
kustomization.Spec.Interval.Duration.String())
log.Info(msg, "revision", source.GetArtifact().Revision)
r.event(ctx, reconciledKustomization, source.GetArtifact().Revision, events.EventSeverityInfo,
msg, map[string]string{kustomizev1.GroupVersion.Group + "/commit_status": "update"})
return ctrl.Result{RequeueAfter: kustomization.Spec.Interval.Duration}, nil
}
func (r *KustomizationReconciler) reconcile(
ctx context.Context,
kustomization kustomizev1.Kustomization,
source sourcev1.Source) (kustomizev1.Kustomization, error) {
// record the value of the reconciliation request, if any
if v, ok := meta.ReconcileAnnotationValue(kustomization.GetAnnotations()); ok {
kustomization.Status.SetLastHandledReconcileRequest(v)
}
revision := source.GetArtifact().Revision
// create tmp dir
tmpDir, err := MkdirTempAbs("", "kustomization-")
if err != nil {
err = fmt.Errorf("tmp dir error: %w", err)
return kustomizev1.KustomizationNotReady(
kustomization,
revision,
sourcev1.DirCreationFailedReason,
err.Error(),
), err
}
defer os.RemoveAll(tmpDir)
// download artifact and extract files
err = r.artifactFetcher.Fetch(source.GetArtifact().URL, source.GetArtifact().Checksum, tmpDir)
if err != nil {
return kustomizev1.KustomizationNotReady(
kustomization,
revision,
kustomizev1.ArtifactFailedReason,
err.Error(),
), err
}
// check build path exists
dirPath, err := securejoin.SecureJoin(tmpDir, kustomization.Spec.Path)
if err != nil {
return kustomizev1.KustomizationNotReady(
kustomization,
revision,
kustomizev1.ArtifactFailedReason,
err.Error(),
), err
}
if _, err := os.Stat(dirPath); err != nil {
err = fmt.Errorf("kustomization path not found: %w", err)
return kustomizev1.KustomizationNotReady(
kustomization,
revision,
kustomizev1.ArtifactFailedReason,
err.Error(),
), err
}
// setup the Kubernetes client for impersonation
impersonation := runtimeClient.NewImpersonator(
r.Client,
r.StatusPoller,
r.PollingOpts,
kustomization.Spec.KubeConfig,
r.KubeConfigOpts,
r.DefaultServiceAccount,
kustomization.Spec.ServiceAccountName,
kustomization.GetNamespace(),
)
kubeClient, statusPoller, err := impersonation.GetClient(ctx)
if err != nil {
return kustomizev1.KustomizationNotReady(
kustomization,
revision,
kustomizev1.ReconciliationFailedReason,
err.Error(),
), fmt.Errorf("failed to build kube client: %w", err)
}
// generate kustomization.yaml if needed
err = r.generate(kustomization, tmpDir, dirPath)
if err != nil {
return kustomizev1.KustomizationNotReady(
kustomization,
revision,
kustomizev1.BuildFailedReason,
err.Error(),
), err
}
// build the kustomization
resources, err := r.build(ctx, tmpDir, kustomization, dirPath)
if err != nil {
return kustomizev1.KustomizationNotReady(
kustomization,
revision,
kustomizev1.BuildFailedReason,
err.Error(),
), err
}
// convert the build result into Kubernetes unstructured objects
objects, err := ssa.ReadObjects(bytes.NewReader(resources))
if err != nil {
return kustomizev1.KustomizationNotReady(
kustomization,
revision,
kustomizev1.BuildFailedReason,
err.Error(),
), err
}
// create a snapshot of the current inventory
oldStatus := kustomization.Status.DeepCopy()
// create the server-side apply manager
resourceManager := ssa.NewResourceManager(kubeClient, statusPoller, ssa.Owner{
Field: r.ControllerName,
Group: kustomizev1.GroupVersion.Group,
})
resourceManager.SetOwnerLabels(objects, kustomization.GetName(), kustomization.GetNamespace())
// validate and apply resources in stages
drifted, changeSet, err := r.apply(ctx, resourceManager, kustomization, revision, objects)
if err != nil {
return kustomizev1.KustomizationNotReady(
kustomization,
revision,
kustomizev1.ReconciliationFailedReason,
err.Error(),
), err
}
// create an inventory of objects to be reconciled
newInventory := NewInventory()
err = AddObjectsToInventory(newInventory, changeSet)
if err != nil {
return kustomizev1.KustomizationNotReady(
kustomization,
revision,
kustomizev1.ReconciliationFailedReason,
err.Error(),
), err
}
// detect stale objects which are subject to garbage collection
var staleObjects []*unstructured.Unstructured
if oldStatus.Inventory != nil {
diffObjects, err := DiffInventory(oldStatus.Inventory, newInventory)
if err != nil {
return kustomizev1.KustomizationNotReady(
kustomization,
revision,
kustomizev1.ReconciliationFailedReason,
err.Error(),
), err
}
// TODO: remove this workaround after kustomize-controller 0.18 release
// skip objects that were wrongly marked as namespaced
// https://github.com/fluxcd/kustomize-controller/issues/466
newObjects, _ := ListObjectsInInventory(newInventory)
for _, obj := range diffObjects {
preserve := false
if obj.GetNamespace() != "" {
for _, newObj := range newObjects {
if newObj.GetNamespace() == "" &&
obj.GetKind() == newObj.GetKind() &&
obj.GetAPIVersion() == newObj.GetAPIVersion() &&
obj.GetName() == newObj.GetName() {
preserve = true
break
}
}
}
if !preserve {
staleObjects = append(staleObjects, obj)
}
}
}
// run garbage collection for stale objects that do not have pruning disabled
if _, err := r.prune(ctx, resourceManager, kustomization, revision, staleObjects); err != nil {
return kustomizev1.KustomizationNotReadyInventory(
kustomization,
newInventory,
revision,
kustomizev1.PruneFailedReason,
err.Error(),
), err
}
// health assessment
if err := r.checkHealth(ctx, resourceManager, kustomization, revision, drifted, changeSet.ToObjMetadataSet()); err != nil {
return kustomizev1.KustomizationNotReadyInventory(
kustomization,
newInventory,
revision,
kustomizev1.HealthCheckFailedReason,
err.Error(),
), err
}
return kustomizev1.KustomizationReadyInventory(
kustomization,
newInventory,
revision,
kustomizev1.ReconciliationSucceededReason,
fmt.Sprintf("Applied revision: %s", revision),
), nil
}
func (r *KustomizationReconciler) checkDependencies(source sourcev1.Source, kustomization kustomizev1.Kustomization) error {
for _, d := range kustomization.Spec.DependsOn {
if d.Namespace == "" {
d.Namespace = kustomization.GetNamespace()
}
dName := types.NamespacedName{
Namespace: d.Namespace,
Name: d.Name,
}
var k kustomizev1.Kustomization
err := r.Get(context.Background(), dName, &k)
if err != nil {
return fmt.Errorf("unable to get '%s' dependency: %w", dName, err)
}
if len(k.Status.Conditions) == 0 || k.Generation != k.Status.ObservedGeneration {
return fmt.Errorf("dependency '%s' is not ready", dName)
}
if !apimeta.IsStatusConditionTrue(k.Status.Conditions, meta.ReadyCondition) {
return fmt.Errorf("dependency '%s' is not ready", dName)
}
if k.Spec.SourceRef.Name == kustomization.Spec.SourceRef.Name && k.Spec.SourceRef.Namespace == kustomization.Spec.SourceRef.Namespace && k.Spec.SourceRef.Kind == kustomization.Spec.SourceRef.Kind && source.GetArtifact().Revision != k.Status.LastAppliedRevision {
return fmt.Errorf("dependency '%s' is not updated yet", dName)
}
}
return nil
}
func (r *KustomizationReconciler) getSource(ctx context.Context, kustomization kustomizev1.Kustomization) (sourcev1.Source, error) {
var source sourcev1.Source
sourceNamespace := kustomization.GetNamespace()
if kustomization.Spec.SourceRef.Namespace != "" {
sourceNamespace = kustomization.Spec.SourceRef.Namespace
}
namespacedName := types.NamespacedName{
Namespace: sourceNamespace,
Name: kustomization.Spec.SourceRef.Name,
}
if r.NoCrossNamespaceRefs && sourceNamespace != kustomization.GetNamespace() {
return source, acl.AccessDeniedError(
fmt.Sprintf("can't access '%s/%s', cross-namespace references have been blocked",
kustomization.Spec.SourceRef.Kind, namespacedName))
}
switch kustomization.Spec.SourceRef.Kind {
case sourcev1.OCIRepositoryKind:
var repository sourcev1.OCIRepository
err := r.Client.Get(ctx, namespacedName, &repository)
if err != nil {
if apierrors.IsNotFound(err) {
return source, err
}
return source, fmt.Errorf("unable to get source '%s': %w", namespacedName, err)
}
source = &repository
case sourcev1.GitRepositoryKind:
var repository sourcev1.GitRepository
err := r.Client.Get(ctx, namespacedName, &repository)
if err != nil {
if apierrors.IsNotFound(err) {
return source, err
}
return source, fmt.Errorf("unable to get source '%s': %w", namespacedName, err)
}
source = &repository
case sourcev1.BucketKind:
var bucket sourcev1.Bucket
err := r.Client.Get(ctx, namespacedName, &bucket)
if err != nil {
if apierrors.IsNotFound(err) {
return source, err
}
return source, fmt.Errorf("unable to get source '%s': %w", namespacedName, err)
}
source = &bucket
default:
return source, fmt.Errorf("source `%s` kind '%s' not supported",
kustomization.Spec.SourceRef.Name, kustomization.Spec.SourceRef.Kind)
}
return source, nil
}
func (r *KustomizationReconciler) generate(kustomization kustomizev1.Kustomization, workDir string, dirPath string) error {
_, err := generator.NewGenerator(workDir, kustomization).WriteFile(dirPath)
return err
}
func (r *KustomizationReconciler) build(ctx context.Context, workDir string, kustomization kustomizev1.Kustomization, dirPath string) ([]byte, error) {
dec, cleanup, err := decryptor.NewTempDecryptor(workDir, r.Client, kustomization)
if err != nil {
return nil, err
}
defer cleanup()
// Import decryption keys
if err := dec.ImportKeys(ctx); err != nil {
return nil, err
}
// Decrypt Kustomize EnvSources files before build
if err = dec.DecryptEnvSources(dirPath); err != nil {
return nil, fmt.Errorf("error decrypting env sources: %w", err)
}
m, err := generator.Build(workDir, dirPath, !r.NoRemoteBases)
if err != nil {
return nil, fmt.Errorf("kustomize build failed: %w", err)
}
for _, res := range m.Resources() {
// check if resources conform to the Kubernetes API conventions
if res.GetName() == "" || res.GetKind() == "" || res.GetApiVersion() == "" {
return nil, fmt.Errorf("failed to decode Kubernetes apiVersion, kind and name from: %v", res.String())
}
// check if resources are encrypted and decrypt them before generating the final YAML
if kustomization.Spec.Decryption != nil {
outRes, err := dec.DecryptResource(res)
if err != nil {
return nil, fmt.Errorf("decryption failed for '%s': %w", res.GetName(), err)
}
if outRes != nil {
_, err = m.Replace(res)
if err != nil {
return nil, err
}
}
}
// run variable substitutions
if kustomization.Spec.PostBuild != nil {
outRes, err := generator.SubstituteVariables(ctx, r.Client, kustomization, res)
if err != nil {
return nil, fmt.Errorf("var substitution failed for '%s': %w", res.GetName(), err)
}
if outRes != nil {
_, err = m.Replace(res)
if err != nil {
return nil, err
}
}
}
}
resources, err := m.AsYaml()
if err != nil {
return nil, fmt.Errorf("kustomize build failed: %w", err)
}
return resources, nil
}
func (r *KustomizationReconciler) apply(ctx context.Context, manager *ssa.ResourceManager, kustomization kustomizev1.Kustomization, revision string, objects []*unstructured.Unstructured) (bool, *ssa.ChangeSet, error) {
log := ctrl.LoggerFrom(ctx)
if err := ssa.SetNativeKindsDefaults(objects); err != nil {
return false, nil, err
}
applyOpts := ssa.DefaultApplyOptions()
applyOpts.Force = kustomization.Spec.Force
applyOpts.Exclusions = map[string]string{
fmt.Sprintf("%s/reconcile", kustomizev1.GroupVersion.Group): kustomizev1.DisabledValue,
}
applyOpts.Cleanup = ssa.ApplyCleanupOptions{
Annotations: []string{
// remove the kubectl annotation
corev1.LastAppliedConfigAnnotation,
// remove deprecated fluxcd.io annotations
"kustomize.toolkit.fluxcd.io/checksum",
"fluxcd.io/sync-checksum",
},
Labels: []string{
// remove deprecated fluxcd.io labels
"fluxcd.io/sync-gc-mark",
},
FieldManagers: []ssa.FieldManager{
{
// to undo changes made with 'kubectl apply --server-side --force-conflicts'
Name: "kubectl",
OperationType: metav1.ManagedFieldsOperationApply,
},
{
// to undo changes made with 'kubectl apply'
Name: "kubectl",
OperationType: metav1.ManagedFieldsOperationUpdate,
},
{
// to undo changes made with 'kubectl apply'
Name: "before-first-apply",
OperationType: metav1.ManagedFieldsOperationUpdate,
},
{
// to undo changes made by the controller before SSA
Name: r.ControllerName,
OperationType: metav1.ManagedFieldsOperationUpdate,
},
},
Exclusions: map[string]string{
fmt.Sprintf("%s/ssa", kustomizev1.GroupVersion.Group): kustomizev1.MergeValue,
},
}
// contains only CRDs and Namespaces
var defStage []*unstructured.Unstructured
// contains only Kubernetes Class types e.g.: RuntimeClass, PriorityClass,
// StorageClas, VolumeSnapshotClass, IngressClass, GatewayClass, ClusterClass, etc
var classStage []*unstructured.Unstructured
// contains all objects except for CRDs, Namespaces and Class type objects
var resStage []*unstructured.Unstructured
// contains the objects' metadata after apply
resultSet := ssa.NewChangeSet()
for _, u := range objects {
if decryptor.IsEncryptedSecret(u) {
return false, nil,
fmt.Errorf("%s is SOPS encrypted, configuring decryption is required for this secret to be reconciled",
ssa.FmtUnstructured(u))
}
switch {
case ssa.IsClusterDefinition(u):
defStage = append(defStage, u)
case strings.HasSuffix(u.GetKind(), "Class"):
classStage = append(classStage, u)
default:
resStage = append(resStage, u)
}
}
var changeSetLog strings.Builder
// validate, apply and wait for CRDs and Namespaces to register
if len(defStage) > 0 {
changeSet, err := manager.ApplyAll(ctx, defStage, applyOpts)
if err != nil {
return false, nil, err
}
resultSet.Append(changeSet.Entries)
if changeSet != nil && len(changeSet.Entries) > 0 {
log.Info("server-side apply for cluster definitions completed", "output", changeSet.ToMap())
for _, change := range changeSet.Entries {
if change.Action != string(ssa.UnchangedAction) {
changeSetLog.WriteString(change.String() + "\n")
}
}
if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), ssa.WaitOptions{
Interval: 2 * time.Second,
Timeout: kustomization.GetTimeout(),
}); err != nil {
return false, nil, err
}
}
}
// validate, apply and wait for Class type objects to register
if len(classStage) > 0 {
changeSet, err := manager.ApplyAll(ctx, classStage, applyOpts)
if err != nil {
return false, nil, err
}
resultSet.Append(changeSet.Entries)
if changeSet != nil && len(changeSet.Entries) > 0 {
log.Info("server-side apply for cluster class types completed", "output", changeSet.ToMap())
for _, change := range changeSet.Entries {
if change.Action != string(ssa.UnchangedAction) {
changeSetLog.WriteString(change.String() + "\n")
}
}
if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), ssa.WaitOptions{
Interval: 2 * time.Second,
Timeout: kustomization.GetTimeout(),
}); err != nil {
return false, nil, err
}
}
}
// sort by kind, validate and apply all the others objects
sort.Sort(ssa.SortableUnstructureds(resStage))
if len(resStage) > 0 {
changeSet, err := manager.ApplyAll(ctx, resStage, applyOpts)
if err != nil {
return false, nil, fmt.Errorf("%w\n%s", err, changeSetLog.String())
}
resultSet.Append(changeSet.Entries)
if changeSet != nil && len(changeSet.Entries) > 0 {
log.Info("server-side apply completed", "output", changeSet.ToMap())
for _, change := range changeSet.Entries {
if change.Action != string(ssa.UnchangedAction) {
changeSetLog.WriteString(change.String() + "\n")
}
}
}
}
// emit event only if the server-side apply resulted in changes
applyLog := strings.TrimSuffix(changeSetLog.String(), "\n")
if applyLog != "" {
r.event(ctx, kustomization, revision, events.EventSeverityInfo, applyLog, nil)
}
return applyLog != "", resultSet, nil
}
func (r *KustomizationReconciler) checkHealth(ctx context.Context, manager *ssa.ResourceManager, kustomization kustomizev1.Kustomization, revision string, drifted bool, objects object.ObjMetadataSet) error {
if len(kustomization.Spec.HealthChecks) == 0 && !kustomization.Spec.Wait {
return nil
}
checkStart := time.Now()
var err error
if !kustomization.Spec.Wait {
objects, err = referenceToObjMetadataSet(kustomization.Spec.HealthChecks)
if err != nil {
return err
}
}
if len(objects) == 0 {
return nil
}
// guard against deadlock (waiting on itself)
var toCheck []object.ObjMetadata
for _, object := range objects {
if object.GroupKind.Kind == kustomizev1.KustomizationKind &&
object.Name == kustomization.GetName() &&
object.Namespace == kustomization.GetNamespace() {
continue
}
toCheck = append(toCheck, object)
}
// find the previous health check result
wasHealthy := apimeta.IsStatusConditionTrue(kustomization.Status.Conditions, kustomizev1.HealthyCondition)
// set the Healthy and Ready conditions to progressing
message := fmt.Sprintf("running health checks with a timeout of %s", kustomization.GetTimeout().String())
k := kustomizev1.KustomizationProgressing(kustomization, message)
kustomizev1.SetKustomizationHealthiness(&k, metav1.ConditionUnknown, meta.ProgressingReason, message)
if err := r.patchStatus(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&kustomization)}, k.Status); err != nil {
return fmt.Errorf("unable to update the healthy status to progressing, error: %w", err)
}
// check the health with a default timeout of 30sec shorter than the reconciliation interval
if err := manager.WaitForSet(toCheck, ssa.WaitOptions{
Interval: 5 * time.Second,
Timeout: kustomization.GetTimeout(),
}); err != nil {
return fmt.Errorf("Health check failed after %s, %w", time.Since(checkStart).String(), err)
}
// emit event if the previous health check failed
if !wasHealthy || (kustomization.Status.LastAppliedRevision != revision && drifted) {
r.event(ctx, kustomization, revision, events.EventSeverityInfo,
fmt.Sprintf("Health check passed in %s", time.Since(checkStart).String()), nil)
}
return nil
}
func (r *KustomizationReconciler) prune(ctx context.Context, manager *ssa.ResourceManager, kustomization kustomizev1.Kustomization, revision string, objects []*unstructured.Unstructured) (bool, error) {
if !kustomization.Spec.Prune {
return false, nil
}
log := ctrl.LoggerFrom(ctx)
opts := ssa.DeleteOptions{
PropagationPolicy: metav1.DeletePropagationBackground,
Inclusions: manager.GetOwnerLabels(kustomization.Name, kustomization.Namespace),
Exclusions: map[string]string{
fmt.Sprintf("%s/prune", kustomizev1.GroupVersion.Group): kustomizev1.DisabledValue,
fmt.Sprintf("%s/reconcile", kustomizev1.GroupVersion.Group): kustomizev1.DisabledValue,
},
}
changeSet, err := manager.DeleteAll(ctx, objects, opts)
if err != nil {
return false, err
}
// emit event only if the prune operation resulted in changes
if changeSet != nil && len(changeSet.Entries) > 0 {
log.Info(fmt.Sprintf("garbage collection completed: %s", changeSet.String()))
r.event(ctx, kustomization, revision, events.EventSeverityInfo, changeSet.String(), nil)
return true, nil
}
return false, nil
}
func (r *KustomizationReconciler) finalize(ctx context.Context, kustomization kustomizev1.Kustomization) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
if kustomization.Spec.Prune &&
!kustomization.Spec.Suspend &&
kustomization.Status.Inventory != nil &&
kustomization.Status.Inventory.Entries != nil {
objects, _ := ListObjectsInInventory(kustomization.Status.Inventory)
impersonation := runtimeClient.NewImpersonator(
r.Client,
r.StatusPoller,
r.PollingOpts,
kustomization.Spec.KubeConfig,
r.KubeConfigOpts,
r.DefaultServiceAccount,
kustomization.Spec.ServiceAccountName,
kustomization.GetNamespace(),
)
if impersonation.CanImpersonate(ctx) {
kubeClient, _, err := impersonation.GetClient(ctx)
if err != nil {
return ctrl.Result{}, err
}
resourceManager := ssa.NewResourceManager(kubeClient, nil, ssa.Owner{
Field: r.ControllerName,
Group: kustomizev1.GroupVersion.Group,
})
opts := ssa.DeleteOptions{
PropagationPolicy: metav1.DeletePropagationBackground,
Inclusions: resourceManager.GetOwnerLabels(kustomization.Name, kustomization.Namespace),
Exclusions: map[string]string{
fmt.Sprintf("%s/prune", kustomizev1.GroupVersion.Group): kustomizev1.DisabledValue,
fmt.Sprintf("%s/reconcile", kustomizev1.GroupVersion.Group): kustomizev1.DisabledValue,
},
}
changeSet, err := resourceManager.DeleteAll(ctx, objects, opts)
if err != nil {
r.event(ctx, kustomization, kustomization.Status.LastAppliedRevision, events.EventSeverityError, "pruning for deleted resource failed", nil)
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}
if changeSet != nil && len(changeSet.Entries) > 0 {
r.event(ctx, kustomization, kustomization.Status.LastAppliedRevision, events.EventSeverityInfo, changeSet.String(), nil)
}
} else {
// when the account to impersonate is gone, log the stale objects and continue with the finalization
msg := fmt.Sprintf("unable to prune objects: \n%s", ssa.FmtUnstructuredList(objects))
log.Error(fmt.Errorf("skiping pruning, failed to find account to impersonate"), msg)
r.event(ctx, kustomization, kustomization.Status.LastAppliedRevision, events.EventSeverityError, msg, nil)
}
}
// Record deleted status
r.recordReadiness(ctx, kustomization)
// Remove our finalizer from the list and update it
controllerutil.RemoveFinalizer(&kustomization, kustomizev1.KustomizationFinalizer)
if err := r.Update(ctx, &kustomization, client.FieldOwner(r.statusManager)); err != nil {
return ctrl.Result{}, err
}
// Stop reconciliation as the object is being deleted
return ctrl.Result{}, nil
}
func (r *KustomizationReconciler) event(ctx context.Context, kustomization kustomizev1.Kustomization, revision, severity, msg string, metadata map[string]string) {
if metadata == nil {
metadata = map[string]string{}
}
if revision != "" {
metadata[kustomizev1.GroupVersion.Group+"/revision"] = revision
}
reason := severity
if c := apimeta.FindStatusCondition(kustomization.Status.Conditions, meta.ReadyCondition); c != nil {
reason = c.Reason
}
eventtype := "Normal"
if severity == events.EventSeverityError {
eventtype = "Warning"
}
r.EventRecorder.AnnotatedEventf(&kustomization, metadata, eventtype, reason, msg)
}
func (r *KustomizationReconciler) recordReadiness(ctx context.Context, kustomization kustomizev1.Kustomization) {
if r.MetricsRecorder == nil {
return
}
log := ctrl.LoggerFrom(ctx)
objRef, err := reference.GetReference(r.Scheme, &kustomization)
if err != nil {
log.Error(err, "unable to record readiness metric")
return
}
if rc := apimeta.FindStatusCondition(kustomization.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, !kustomization.DeletionTimestamp.IsZero())
} else {
r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{
Type: meta.ReadyCondition,
Status: metav1.ConditionUnknown,
}, !kustomization.DeletionTimestamp.IsZero())
}
}
func (r *KustomizationReconciler) recordSuspension(ctx context.Context, kustomization kustomizev1.Kustomization) {
if r.MetricsRecorder == nil {
return
}
log := ctrl.LoggerFrom(ctx)
objRef, err := reference.GetReference(r.Scheme, &kustomization)
if err != nil {
log.Error(err, "unable to record suspended metric")
return
}
if !kustomization.DeletionTimestamp.IsZero() {
r.MetricsRecorder.RecordSuspend(*objRef, false)
} else {
r.MetricsRecorder.RecordSuspend(*objRef, kustomization.Spec.Suspend)
}
}
func (r *KustomizationReconciler) patchStatus(ctx context.Context, req ctrl.Request, newStatus kustomizev1.KustomizationStatus) error {
var kustomization kustomizev1.Kustomization
if err := r.Get(ctx, req.NamespacedName, &kustomization); err != nil {
return err
}
patch := client.MergeFrom(kustomization.DeepCopy())
kustomization.Status = newStatus
return r.Status().Patch(ctx, &kustomization, patch, client.FieldOwner(r.statusManager))
}