/* Copyright 2020 The Flux authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controller import ( "bytes" "context" "errors" "fmt" "os" "sort" "strings" "time" securejoin "github.com/cyphar/filepath-securejoin" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" kerrors "k8s.io/apimachinery/pkg/util/errors" kuberecorder "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/fluxcd/cli-utils/pkg/kstatus/polling" "github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine" "github.com/fluxcd/cli-utils/pkg/object" apiacl "github.com/fluxcd/pkg/apis/acl" eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" "github.com/fluxcd/pkg/apis/meta" "github.com/fluxcd/pkg/auth" "github.com/fluxcd/pkg/cache" "github.com/fluxcd/pkg/http/fetch" generator "github.com/fluxcd/pkg/kustomize" "github.com/fluxcd/pkg/runtime/acl" "github.com/fluxcd/pkg/runtime/cel" runtimeClient "github.com/fluxcd/pkg/runtime/client" "github.com/fluxcd/pkg/runtime/conditions" runtimeCtrl "github.com/fluxcd/pkg/runtime/controller" "github.com/fluxcd/pkg/runtime/jitter" "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" "github.com/fluxcd/pkg/runtime/statusreaders" "github.com/fluxcd/pkg/ssa" "github.com/fluxcd/pkg/ssa/normalize" ssautil "github.com/fluxcd/pkg/ssa/utils" "github.com/fluxcd/pkg/tar" sourcev1 "github.com/fluxcd/source-controller/api/v1" sourcev1b2 "github.com/fluxcd/source-controller/api/v1beta2" kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1" intcache "github.com/fluxcd/kustomize-controller/internal/cache" "github.com/fluxcd/kustomize-controller/internal/decryptor" "github.com/fluxcd/kustomize-controller/internal/inventory" ) // +kubebuilder:rbac:groups=kustomize.toolkit.fluxcd.io,resources=kustomizations,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=kustomize.toolkit.fluxcd.io,resources=kustomizations/status,verbs=get;update;patch // +kubebuilder:rbac:groups=kustomize.toolkit.fluxcd.io,resources=kustomizations/finalizers,verbs=get;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets;ocirepositories;gitrepositories,verbs=get;list;watch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status;ocirepositories/status;gitrepositories/status,verbs=get // +kubebuilder:rbac:groups="",resources=configmaps;secrets;serviceaccounts,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=serviceaccounts/token,verbs=create // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch // KustomizationReconciler reconciles a Kustomization object type KustomizationReconciler struct { client.Client kuberecorder.EventRecorder runtimeCtrl.Metrics artifactFetchRetries int requeueDependency time.Duration Mapper apimeta.RESTMapper APIReader client.Reader ClusterReader engine.ClusterReaderFactory ControllerName string statusManager string NoCrossNamespaceRefs bool NoRemoteBases bool FailFast bool DefaultServiceAccount string KubeConfigOpts runtimeClient.KubeConfigOptions ConcurrentSSA int DisallowedFieldManagers []string StrictSubstitutions bool GroupChangeLog bool TokenCache *cache.TokenCache } // KustomizationReconcilerOptions contains options for the KustomizationReconciler. type KustomizationReconcilerOptions struct { HTTPRetry int DependencyRequeueInterval time.Duration RateLimiter workqueue.TypedRateLimiter[reconcile.Request] } func (r *KustomizationReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opts KustomizationReconcilerOptions) error { const ( ociRepositoryIndexKey string = ".metadata.ociRepository" gitRepositoryIndexKey string = ".metadata.gitRepository" bucketIndexKey string = ".metadata.bucket" ) // Index the Kustomizations by the OCIRepository references they (may) point at. if err := mgr.GetCache().IndexField(ctx, &kustomizev1.Kustomization{}, ociRepositoryIndexKey, r.indexBy(sourcev1b2.OCIRepositoryKind)); err != nil { return fmt.Errorf("failed setting index fields: %w", err) } // Index the Kustomizations by the GitRepository references they (may) point at. if err := mgr.GetCache().IndexField(ctx, &kustomizev1.Kustomization{}, gitRepositoryIndexKey, r.indexBy(sourcev1.GitRepositoryKind)); err != nil { return fmt.Errorf("failed setting index fields: %w", err) } // Index the Kustomizations by the Bucket references they (may) point at. if err := mgr.GetCache().IndexField(ctx, &kustomizev1.Kustomization{}, bucketIndexKey, r.indexBy(sourcev1.BucketKind)); err != nil { return fmt.Errorf("failed setting index fields: %w", err) } r.requeueDependency = opts.DependencyRequeueInterval r.statusManager = fmt.Sprintf("gotk-%s", r.ControllerName) r.artifactFetchRetries = opts.HTTPRetry return ctrl.NewControllerManagedBy(mgr). For(&kustomizev1.Kustomization{}, builder.WithPredicates( predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{}), )). Watches( &sourcev1b2.OCIRepository{}, handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(ociRepositoryIndexKey)), builder.WithPredicates(SourceRevisionChangePredicate{}), ). Watches( &sourcev1.GitRepository{}, handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(gitRepositoryIndexKey)), builder.WithPredicates(SourceRevisionChangePredicate{}), ). Watches( &sourcev1.Bucket{}, handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(bucketIndexKey)), builder.WithPredicates(SourceRevisionChangePredicate{}), ). WithOptions(controller.Options{ RateLimiter: opts.RateLimiter, }). Complete(r) } func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) { log := ctrl.LoggerFrom(ctx) reconcileStart := time.Now() obj := &kustomizev1.Kustomization{} if err := r.Get(ctx, req.NamespacedName, obj); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } // Initialize the runtime patcher with the current version of the object. patcher := patch.NewSerialPatcher(obj, r.Client) // Finalise the reconciliation and report the results. defer func() { // Patch finalizers, status and conditions. if err := r.finalizeStatus(ctx, obj, patcher); err != nil { retErr = kerrors.NewAggregate([]error{retErr, err}) } // Record Prometheus metrics. r.Metrics.RecordDuration(ctx, obj, reconcileStart) // Do not proceed if the Kustomization is suspended if obj.Spec.Suspend { return } // Log and emit success event. if conditions.IsReady(obj) { msg := fmt.Sprintf("Reconciliation finished in %s, next run in %s", time.Since(reconcileStart).String(), obj.Spec.Interval.Duration.String()) log.Info(msg, "revision", obj.Status.LastAttemptedRevision) r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityInfo, msg, map[string]string{ kustomizev1.GroupVersion.Group + "/" + eventv1.MetaCommitStatusKey: eventv1.MetaCommitStatusUpdateValue, }) } }() // Prune managed resources if the object is under deletion. if !obj.ObjectMeta.DeletionTimestamp.IsZero() { return r.finalize(ctx, obj) } // Add finalizer first if it doesn't exist to avoid the race condition // between init and delete. // Note: Finalizers in general can only be added when the deletionTimestamp // is not set. if !controllerutil.ContainsFinalizer(obj, kustomizev1.KustomizationFinalizer) { controllerutil.AddFinalizer(obj, kustomizev1.KustomizationFinalizer) return ctrl.Result{Requeue: true}, nil } // Skip reconciliation if the object is suspended. if obj.Spec.Suspend { log.Info("Reconciliation is suspended for this object") return ctrl.Result{}, nil } // Configure custom health checks. statusReaders, err := cel.PollerWithCustomHealthChecks(ctx, obj.Spec.HealthCheckExprs) if err != nil { const msg = "Reconciliation failed terminally due to configuration error" errMsg := fmt.Sprintf("%s: %v", msg, err) conditions.MarkFalse(obj, meta.ReadyCondition, meta.InvalidCELExpressionReason, "%s", errMsg) conditions.MarkStalled(obj, meta.InvalidCELExpressionReason, "%s", errMsg) obj.Status.ObservedGeneration = obj.Generation log.Error(err, msg) r.event(obj, "", "", eventv1.EventSeverityError, errMsg, nil) return ctrl.Result{}, nil } // Check object-level workload identity feature gate. if d := obj.Spec.Decryption; d != nil && d.ServiceAccountName != "" && !auth.IsObjectLevelWorkloadIdentityEnabled() { const gate = auth.FeatureGateObjectLevelWorkloadIdentity const msgFmt = "to use spec.decryption.serviceAccountName for decryption authentication please enable the %s feature gate in the controller" msg := fmt.Sprintf(msgFmt, gate) conditions.MarkFalse(obj, meta.ReadyCondition, meta.FeatureGateDisabledReason, msgFmt, gate) conditions.MarkStalled(obj, meta.FeatureGateDisabledReason, msgFmt, gate) log.Error(auth.ErrObjectLevelWorkloadIdentityNotEnabled, msg) r.event(obj, "", "", eventv1.EventSeverityError, msg, nil) return ctrl.Result{}, nil } // Resolve the source reference and requeue the reconciliation if the source is not found. artifactSource, err := r.getSource(ctx, obj) if err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", err) if apierrors.IsNotFound(err) { msg := fmt.Sprintf("Source '%s' not found", obj.Spec.SourceRef.String()) log.Info(msg) return ctrl.Result{RequeueAfter: obj.GetRetryInterval()}, nil } if acl.IsAccessDenied(err) { conditions.MarkFalse(obj, meta.ReadyCondition, apiacl.AccessDeniedReason, "%s", err) log.Error(err, "Access denied to cross-namespace source") r.event(obj, "", "", eventv1.EventSeverityError, err.Error(), nil) return ctrl.Result{RequeueAfter: obj.GetRetryInterval()}, nil } // Retry with backoff on transient errors. return ctrl.Result{}, err } // Requeue the reconciliation if the source artifact is not found. if artifactSource.GetArtifact() == nil { msg := fmt.Sprintf("Source artifact not found, retrying in %s", r.requeueDependency.String()) conditions.MarkFalse(obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", msg) log.Info(msg) return ctrl.Result{RequeueAfter: r.requeueDependency}, nil } revision := artifactSource.GetArtifact().Revision originRevision := getOriginRevision(artifactSource) // Check dependencies and requeue the reconciliation if the check fails. if len(obj.Spec.DependsOn) > 0 { if err := r.checkDependencies(ctx, obj, artifactSource); err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.DependencyNotReadyReason, "%s", err) msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.requeueDependency.String()) log.Info(msg) r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil) return ctrl.Result{RequeueAfter: r.requeueDependency}, nil } log.Info("All dependencies are ready, proceeding with reconciliation") } // Reconcile the latest revision. reconcileErr := r.reconcile(ctx, obj, artifactSource, patcher, statusReaders) // Requeue at the specified retry interval if the artifact tarball is not found. if errors.Is(reconcileErr, fetch.ErrFileNotFound) { msg := fmt.Sprintf("Source is not ready, artifact not found, retrying in %s", r.requeueDependency.String()) conditions.MarkFalse(obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", msg) log.Info(msg) return ctrl.Result{RequeueAfter: r.requeueDependency}, nil } // Broadcast the reconciliation failure and requeue at the specified retry interval. if reconcileErr != nil { log.Error(reconcileErr, fmt.Sprintf("Reconciliation failed after %s, next try in %s", time.Since(reconcileStart).String(), obj.GetRetryInterval().String()), "revision", revision) r.event(obj, revision, originRevision, eventv1.EventSeverityError, reconcileErr.Error(), nil) return ctrl.Result{RequeueAfter: obj.GetRetryInterval()}, nil } // Requeue the reconciliation at the specified interval. return ctrl.Result{RequeueAfter: jitter.JitteredIntervalDuration(obj.GetRequeueAfter())}, nil } func (r *KustomizationReconciler) reconcile( ctx context.Context, obj *kustomizev1.Kustomization, src sourcev1.Source, patcher *patch.SerialPatcher, statusReaders []func(apimeta.RESTMapper) engine.StatusReader) error { log := ctrl.LoggerFrom(ctx) // Update status with the reconciliation progress. revision := src.GetArtifact().Revision originRevision := getOriginRevision(src) progressingMsg := fmt.Sprintf("Fetching manifests for revision %s with a timeout of %s", revision, obj.GetTimeout().String()) conditions.MarkUnknown(obj, meta.ReadyCondition, meta.ProgressingReason, "%s", "Reconciliation in progress") conditions.MarkReconciling(obj, meta.ProgressingReason, "%s", progressingMsg) if err := r.patch(ctx, obj, patcher); err != nil { return fmt.Errorf("failed to update status: %w", err) } // Create a snapshot of the current inventory. oldInventory := inventory.New() if obj.Status.Inventory != nil { obj.Status.Inventory.DeepCopyInto(oldInventory) } // Create tmp dir. tmpDir, err := MkdirTempAbs("", "kustomization-") if err != nil { err = fmt.Errorf("tmp dir error: %w", err) conditions.MarkFalse(obj, meta.ReadyCondition, sourcev1.DirCreationFailedReason, "%s", err) return err } defer func(path string) { if err := os.RemoveAll(path); err != nil { log.Error(err, "failed to remove tmp dir", "path", path) } }(tmpDir) // Download artifact and extract files to the tmp dir. if err = fetch.NewArchiveFetcherWithLogger( r.artifactFetchRetries, tar.UnlimitedUntarSize, tar.UnlimitedUntarSize, os.Getenv("SOURCE_CONTROLLER_LOCALHOST"), ctrl.LoggerFrom(ctx), ).Fetch(src.GetArtifact().URL, src.GetArtifact().Digest, tmpDir); err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", err) return err } // check build path exists dirPath, err := securejoin.SecureJoin(tmpDir, obj.Spec.Path) if err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", err) return err } if _, err := os.Stat(dirPath); err != nil { err = fmt.Errorf("kustomization path not found: %w", err) conditions.MarkFalse(obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", err) return err } // Report progress and set last attempted revision in status. obj.Status.LastAttemptedRevision = revision progressingMsg = fmt.Sprintf("Building manifests for revision %s with a timeout of %s", revision, obj.GetTimeout().String()) conditions.MarkReconciling(obj, meta.ProgressingReason, "%s", progressingMsg) if err := r.patch(ctx, obj, patcher); err != nil { return fmt.Errorf("failed to update status: %w", err) } // Configure the Kubernetes client for impersonation. var impersonatorOpts []runtimeClient.ImpersonatorOption var mustImpersonate bool if r.DefaultServiceAccount != "" || obj.Spec.ServiceAccountName != "" { mustImpersonate = true impersonatorOpts = append(impersonatorOpts, runtimeClient.WithServiceAccount(r.DefaultServiceAccount, obj.Spec.ServiceAccountName, obj.GetNamespace())) } if obj.Spec.KubeConfig != nil { mustImpersonate = true impersonatorOpts = append(impersonatorOpts, runtimeClient.WithKubeConfig(obj.Spec.KubeConfig, r.KubeConfigOpts, obj.GetNamespace())) } if r.ClusterReader != nil || len(statusReaders) > 0 { impersonatorOpts = append(impersonatorOpts, runtimeClient.WithPolling(r.ClusterReader, statusReaders...)) } impersonation := runtimeClient.NewImpersonator(r.Client, impersonatorOpts...) // Create the Kubernetes client that runs under impersonation. var kubeClient client.Client var statusPoller *polling.StatusPoller if mustImpersonate { kubeClient, statusPoller, err = impersonation.GetClient(ctx) } else { kubeClient, statusPoller = r.getClientAndPoller(statusReaders) } if err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.ReconciliationFailedReason, "%s", err) return fmt.Errorf("failed to build kube client: %w", err) } // Generate kustomization.yaml if needed. k, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.BuildFailedReason, "%s", err) return err } err = r.generate(unstructured.Unstructured{Object: k}, tmpDir, dirPath) if err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.BuildFailedReason, "%s", err) return err } // Build the Kustomize overlay and decrypt secrets if needed. resources, err := r.build(ctx, obj, unstructured.Unstructured{Object: k}, tmpDir, dirPath) if err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.BuildFailedReason, "%s", err) return err } // Convert the build result into Kubernetes unstructured objects. objects, err := ssautil.ReadObjects(bytes.NewReader(resources)) if err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.BuildFailedReason, "%s", err) return err } // Create the server-side apply manager. resourceManager := ssa.NewResourceManager(kubeClient, statusPoller, ssa.Owner{ Field: r.ControllerName, Group: kustomizev1.GroupVersion.Group, }) resourceManager.SetOwnerLabels(objects, obj.GetName(), obj.GetNamespace()) resourceManager.SetConcurrency(r.ConcurrentSSA) // Update status with the reconciliation progress. progressingMsg = fmt.Sprintf("Detecting drift for revision %s with a timeout of %s", revision, obj.GetTimeout().String()) conditions.MarkReconciling(obj, meta.ProgressingReason, "%s", progressingMsg) if err := r.patch(ctx, obj, patcher); err != nil { return fmt.Errorf("failed to update status: %w", err) } // Validate and apply resources in stages. drifted, changeSet, err := r.apply(ctx, resourceManager, obj, revision, originRevision, objects) if err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.ReconciliationFailedReason, "%s", err) return err } // Create an inventory from the reconciled resources. newInventory := inventory.New() err = inventory.AddChangeSet(newInventory, changeSet) if err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.ReconciliationFailedReason, "%s", err) return err } // Set last applied inventory in status. obj.Status.Inventory = newInventory // Detect stale resources which are subject to garbage collection. staleObjects, err := inventory.Diff(oldInventory, newInventory) if err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.ReconciliationFailedReason, "%s", err) return err } // Run garbage collection for stale resources that do not have pruning disabled. if _, err := r.prune(ctx, resourceManager, obj, revision, originRevision, staleObjects); err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.PruneFailedReason, "%s", err) return err } // Run the health checks for the last applied resources. isNewRevision := !src.GetArtifact().HasRevision(obj.Status.LastAppliedRevision) if err := r.checkHealth(ctx, resourceManager, patcher, obj, revision, originRevision, isNewRevision, drifted, changeSet.ToObjMetadataSet()); err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.HealthCheckFailedReason, "%s", err) return err } // Set last applied revisions. obj.Status.LastAppliedRevision = revision obj.Status.LastAppliedOriginRevision = originRevision // Mark the object as ready. conditions.MarkTrue(obj, meta.ReadyCondition, meta.ReconciliationSucceededReason, "Applied revision: %s", revision) return nil } func (r *KustomizationReconciler) checkDependencies(ctx context.Context, obj *kustomizev1.Kustomization, source sourcev1.Source) error { for _, d := range obj.Spec.DependsOn { if d.Namespace == "" { d.Namespace = obj.GetNamespace() } dName := types.NamespacedName{ Namespace: d.Namespace, Name: d.Name, } var k kustomizev1.Kustomization err := r.APIReader.Get(ctx, dName, &k) if err != nil { return fmt.Errorf("dependency '%s' not found: %w", dName, err) } if len(k.Status.Conditions) == 0 || k.Generation != k.Status.ObservedGeneration { return fmt.Errorf("dependency '%s' is not ready", dName) } if !apimeta.IsStatusConditionTrue(k.Status.Conditions, meta.ReadyCondition) { return fmt.Errorf("dependency '%s' is not ready", dName) } srcNamespace := k.Spec.SourceRef.Namespace if srcNamespace == "" { srcNamespace = k.GetNamespace() } dSrcNamespace := obj.Spec.SourceRef.Namespace if dSrcNamespace == "" { dSrcNamespace = obj.GetNamespace() } if k.Spec.SourceRef.Name == obj.Spec.SourceRef.Name && srcNamespace == dSrcNamespace && k.Spec.SourceRef.Kind == obj.Spec.SourceRef.Kind && !source.GetArtifact().HasRevision(k.Status.LastAppliedRevision) { return fmt.Errorf("dependency '%s' revision is not up to date", dName) } } return nil } func (r *KustomizationReconciler) getSource(ctx context.Context, obj *kustomizev1.Kustomization) (sourcev1.Source, error) { var src sourcev1.Source sourceNamespace := obj.GetNamespace() if obj.Spec.SourceRef.Namespace != "" { sourceNamespace = obj.Spec.SourceRef.Namespace } namespacedName := types.NamespacedName{ Namespace: sourceNamespace, Name: obj.Spec.SourceRef.Name, } if r.NoCrossNamespaceRefs && sourceNamespace != obj.GetNamespace() { return src, acl.AccessDeniedError( fmt.Sprintf("can't access '%s/%s', cross-namespace references have been blocked", obj.Spec.SourceRef.Kind, namespacedName)) } switch obj.Spec.SourceRef.Kind { case sourcev1b2.OCIRepositoryKind: var repository sourcev1b2.OCIRepository err := r.Client.Get(ctx, namespacedName, &repository) if err != nil { if apierrors.IsNotFound(err) { return src, err } return src, fmt.Errorf("unable to get source '%s': %w", namespacedName, err) } src = &repository case sourcev1.GitRepositoryKind: var repository sourcev1.GitRepository err := r.Client.Get(ctx, namespacedName, &repository) if err != nil { if apierrors.IsNotFound(err) { return src, err } return src, fmt.Errorf("unable to get source '%s': %w", namespacedName, err) } src = &repository case sourcev1.BucketKind: var bucket sourcev1.Bucket err := r.Client.Get(ctx, namespacedName, &bucket) if err != nil { if apierrors.IsNotFound(err) { return src, err } return src, fmt.Errorf("unable to get source '%s': %w", namespacedName, err) } src = &bucket default: return src, fmt.Errorf("source `%s` kind '%s' not supported", obj.Spec.SourceRef.Name, obj.Spec.SourceRef.Kind) } return src, nil } func (r *KustomizationReconciler) generate(obj unstructured.Unstructured, workDir string, dirPath string) error { _, err := generator.NewGenerator(workDir, obj).WriteFile(dirPath) return err } func (r *KustomizationReconciler) build(ctx context.Context, obj *kustomizev1.Kustomization, u unstructured.Unstructured, workDir, dirPath string) ([]byte, error) { dec, cleanup, err := decryptor.NewTempDecryptor(workDir, r.Client, obj, r.TokenCache) if err != nil { return nil, err } defer cleanup() // Import keys and static credentials for decryption. if err := dec.ImportKeys(ctx); err != nil { return nil, err } // Set options for secret-less authentication with cloud providers for decryption. dec.SetAuthOptions(ctx) // Decrypt Kustomize EnvSources files before build if err = dec.DecryptSources(dirPath); err != nil { return nil, fmt.Errorf("error decrypting sources: %w", err) } m, err := generator.SecureBuild(workDir, dirPath, !r.NoRemoteBases) if err != nil { return nil, fmt.Errorf("kustomize build failed: %w", err) } for _, res := range m.Resources() { // check if resources conform to the Kubernetes API conventions if res.GetName() == "" || res.GetKind() == "" || res.GetApiVersion() == "" { return nil, fmt.Errorf("failed to decode Kubernetes apiVersion, kind and name from: %v", res.String()) } // check if resources are encrypted and decrypt them before generating the final YAML if obj.Spec.Decryption != nil { outRes, err := dec.DecryptResource(res) if err != nil { return nil, fmt.Errorf("decryption failed for '%s': %w", res.GetName(), err) } if outRes != nil { _, err = m.Replace(res) if err != nil { return nil, err } } } // run variable substitutions if obj.Spec.PostBuild != nil { outRes, err := generator.SubstituteVariables(ctx, r.Client, u, res, generator.SubstituteWithStrict(r.StrictSubstitutions)) if err != nil { return nil, fmt.Errorf("post build failed for '%s': %w", res.GetName(), err) } if outRes != nil { _, err = m.Replace(res) if err != nil { return nil, err } } } } resources, err := m.AsYaml() if err != nil { return nil, fmt.Errorf("kustomize build failed: %w", err) } return resources, nil } func (r *KustomizationReconciler) apply(ctx context.Context, manager *ssa.ResourceManager, obj *kustomizev1.Kustomization, revision string, originRevision string, objects []*unstructured.Unstructured) (bool, *ssa.ChangeSet, error) { log := ctrl.LoggerFrom(ctx) if err := normalize.UnstructuredList(objects); err != nil { return false, nil, err } if cmeta := obj.Spec.CommonMetadata; cmeta != nil { ssautil.SetCommonMetadata(objects, cmeta.Labels, cmeta.Annotations) } applyOpts := ssa.DefaultApplyOptions() applyOpts.Force = obj.Spec.Force applyOpts.ExclusionSelector = map[string]string{ fmt.Sprintf("%s/reconcile", kustomizev1.GroupVersion.Group): kustomizev1.DisabledValue, fmt.Sprintf("%s/ssa", kustomizev1.GroupVersion.Group): kustomizev1.IgnoreValue, } applyOpts.IfNotPresentSelector = map[string]string{ fmt.Sprintf("%s/ssa", kustomizev1.GroupVersion.Group): kustomizev1.IfNotPresentValue, } applyOpts.ForceSelector = map[string]string{ fmt.Sprintf("%s/force", kustomizev1.GroupVersion.Group): kustomizev1.EnabledValue, } fieldManagers := []ssa.FieldManager{ { // to undo changes made with 'kubectl apply --server-side --force-conflicts' Name: "kubectl", OperationType: metav1.ManagedFieldsOperationApply, }, { // to undo changes made with 'kubectl apply' Name: "kubectl", OperationType: metav1.ManagedFieldsOperationUpdate, }, { // to undo changes made with 'kubectl apply' Name: "before-first-apply", OperationType: metav1.ManagedFieldsOperationUpdate, }, { // to undo changes made by the controller before SSA Name: r.ControllerName, OperationType: metav1.ManagedFieldsOperationUpdate, }, } for _, fieldManager := range r.DisallowedFieldManagers { fieldManagers = append(fieldManagers, ssa.FieldManager{ Name: fieldManager, OperationType: metav1.ManagedFieldsOperationApply, }) // to undo changes made by the controller before SSA fieldManagers = append(fieldManagers, ssa.FieldManager{ Name: fieldManager, OperationType: metav1.ManagedFieldsOperationUpdate, }) } applyOpts.Cleanup = ssa.ApplyCleanupOptions{ Annotations: []string{ // remove the kubectl annotation corev1.LastAppliedConfigAnnotation, // remove deprecated fluxcd.io annotations "kustomize.toolkit.fluxcd.io/checksum", "fluxcd.io/sync-checksum", }, Labels: []string{ // remove deprecated fluxcd.io labels "fluxcd.io/sync-gc-mark", }, FieldManagers: fieldManagers, Exclusions: map[string]string{ fmt.Sprintf("%s/ssa", kustomizev1.GroupVersion.Group): kustomizev1.MergeValue, }, } // contains only CRDs and Namespaces var defStage []*unstructured.Unstructured // contains only Kubernetes Class types e.g.: RuntimeClass, PriorityClass, // StorageClass, VolumeSnapshotClass, IngressClass, GatewayClass, ClusterClass, etc var classStage []*unstructured.Unstructured // contains all objects except for CRDs, Namespaces and Class type objects var resStage []*unstructured.Unstructured // contains the objects' metadata after apply resultSet := ssa.NewChangeSet() for _, u := range objects { if decryptor.IsEncryptedSecret(u) { return false, nil, fmt.Errorf("%s is SOPS encrypted, configuring decryption is required for this secret to be reconciled", ssautil.FmtUnstructured(u)) } switch { case ssautil.IsClusterDefinition(u): defStage = append(defStage, u) case strings.HasSuffix(u.GetKind(), "Class"): classStage = append(classStage, u) default: resStage = append(resStage, u) } } var changeSetLog strings.Builder // validate, apply and wait for CRDs and Namespaces to register if len(defStage) > 0 { changeSet, err := manager.ApplyAll(ctx, defStage, applyOpts) if err != nil { return false, nil, err } if changeSet != nil && len(changeSet.Entries) > 0 { resultSet.Append(changeSet.Entries) if r.GroupChangeLog { log.Info("server-side apply for cluster definitions completed", "output", changeSet.ToGroupedMap()) } else { log.Info("server-side apply for cluster definitions completed", "output", changeSet.ToMap()) } for _, change := range changeSet.Entries { if HasChanged(change.Action) { changeSetLog.WriteString(change.String() + "\n") } } if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), ssa.WaitOptions{ Interval: 2 * time.Second, Timeout: obj.GetTimeout(), }); err != nil { return false, nil, err } } } // validate, apply and wait for Class type objects to register if len(classStage) > 0 { changeSet, err := manager.ApplyAll(ctx, classStage, applyOpts) if err != nil { return false, nil, err } if changeSet != nil && len(changeSet.Entries) > 0 { resultSet.Append(changeSet.Entries) if r.GroupChangeLog { log.Info("server-side apply for cluster definitions completed", "output", changeSet.ToGroupedMap()) } else { log.Info("server-side apply for cluster class types completed", "output", changeSet.ToMap()) } for _, change := range changeSet.Entries { if HasChanged(change.Action) { changeSetLog.WriteString(change.String() + "\n") } } if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), ssa.WaitOptions{ Interval: 2 * time.Second, Timeout: obj.GetTimeout(), }); err != nil { return false, nil, err } } } // sort by kind, validate and apply all the others objects sort.Sort(ssa.SortableUnstructureds(resStage)) if len(resStage) > 0 { changeSet, err := manager.ApplyAll(ctx, resStage, applyOpts) if err != nil { return false, nil, fmt.Errorf("%w\n%s", err, changeSetLog.String()) } if changeSet != nil && len(changeSet.Entries) > 0 { resultSet.Append(changeSet.Entries) if r.GroupChangeLog { log.Info("server-side apply for cluster definitions completed", "output", changeSet.ToGroupedMap()) } else { log.Info("server-side apply completed", "output", changeSet.ToMap(), "revision", revision) } for _, change := range changeSet.Entries { if HasChanged(change.Action) { changeSetLog.WriteString(change.String() + "\n") } } } } // emit event only if the server-side apply resulted in changes applyLog := strings.TrimSuffix(changeSetLog.String(), "\n") if applyLog != "" { r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, applyLog, nil) } return applyLog != "", resultSet, nil } func (r *KustomizationReconciler) checkHealth(ctx context.Context, manager *ssa.ResourceManager, patcher *patch.SerialPatcher, obj *kustomizev1.Kustomization, revision string, originRevision string, isNewRevision bool, drifted bool, objects object.ObjMetadataSet) error { if len(obj.Spec.HealthChecks) == 0 && !obj.Spec.Wait { conditions.Delete(obj, meta.HealthyCondition) return nil } checkStart := time.Now() var err error if !obj.Spec.Wait { objects, err = inventory.ReferenceToObjMetadataSet(obj.Spec.HealthChecks) if err != nil { return err } } if len(objects) == 0 { conditions.Delete(obj, meta.HealthyCondition) return nil } // Guard against deadlock (waiting on itself). var toCheck []object.ObjMetadata for _, o := range objects { if o.GroupKind.Kind == kustomizev1.KustomizationKind && o.Name == obj.GetName() && o.Namespace == obj.GetNamespace() { continue } toCheck = append(toCheck, o) } // Find the previous health check result. wasHealthy := apimeta.IsStatusConditionTrue(obj.Status.Conditions, meta.HealthyCondition) // Update status with the reconciliation progress. message := fmt.Sprintf("Running health checks for revision %s with a timeout of %s", revision, obj.GetTimeout().String()) conditions.MarkReconciling(obj, meta.ProgressingReason, "%s", message) conditions.MarkUnknown(obj, meta.HealthyCondition, meta.ProgressingReason, "%s", message) if err := r.patch(ctx, obj, patcher); err != nil { return fmt.Errorf("unable to update the healthy status to progressing: %w", err) } // Check the health with a default timeout of 30sec shorter than the reconciliation interval. if err := manager.WaitForSet(toCheck, ssa.WaitOptions{ Interval: 5 * time.Second, Timeout: obj.GetTimeout(), FailFast: r.FailFast, }); err != nil { conditions.MarkFalse(obj, meta.ReadyCondition, meta.HealthCheckFailedReason, "%s", err) conditions.MarkFalse(obj, meta.HealthyCondition, meta.HealthCheckFailedReason, "%s", err) return fmt.Errorf("health check failed after %s: %w", time.Since(checkStart).String(), err) } // Emit recovery event if the previous health check failed. msg := fmt.Sprintf("Health check passed in %s", time.Since(checkStart).String()) if !wasHealthy || (isNewRevision && drifted) { r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil) } conditions.MarkTrue(obj, meta.HealthyCondition, meta.SucceededReason, "%s", msg) if err := r.patch(ctx, obj, patcher); err != nil { return fmt.Errorf("unable to update the healthy status to progressing: %w", err) } return nil } func (r *KustomizationReconciler) prune(ctx context.Context, manager *ssa.ResourceManager, obj *kustomizev1.Kustomization, revision string, originRevision string, objects []*unstructured.Unstructured) (bool, error) { if !obj.Spec.Prune { return false, nil } log := ctrl.LoggerFrom(ctx) opts := ssa.DeleteOptions{ PropagationPolicy: metav1.DeletePropagationBackground, Inclusions: manager.GetOwnerLabels(obj.Name, obj.Namespace), Exclusions: map[string]string{ fmt.Sprintf("%s/prune", kustomizev1.GroupVersion.Group): kustomizev1.DisabledValue, fmt.Sprintf("%s/reconcile", kustomizev1.GroupVersion.Group): kustomizev1.DisabledValue, }, } changeSet, err := manager.DeleteAll(ctx, objects, opts) if err != nil { return false, err } // emit event only if the prune operation resulted in changes if changeSet != nil && len(changeSet.Entries) > 0 { log.Info(fmt.Sprintf("garbage collection completed: %s", changeSet.String())) r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, changeSet.String(), nil) return true, nil } return false, nil } // finalizerShouldDeleteResources determines if resources should be deleted // based on the object's inventory and deletion policy. // A suspended Kustomization or one without an inventory will not delete resources. func finalizerShouldDeleteResources(obj *kustomizev1.Kustomization) bool { if obj.Spec.Suspend { return false } if obj.Status.Inventory == nil || len(obj.Status.Inventory.Entries) == 0 { return false } switch obj.GetDeletionPolicy() { case kustomizev1.DeletionPolicyMirrorPrune: return obj.Spec.Prune case kustomizev1.DeletionPolicyDelete: return true case kustomizev1.DeletionPolicyWaitForTermination: return true default: return false } } // finalize handles the finalization logic for a Kustomization resource during its deletion process. // Managed resources are pruned based on the deletion policy and suspended state of the Kustomization. // When the policy is set to WaitForTermination, the function blocks and waits for the resources // to be terminated by the Kubernetes Garbage Collector for the specified timeout duration. // If the service account used for impersonation is no longer available or if a timeout occurs // while waiting for resources to be terminated, an error is logged and the finalizer is removed. func (r *KustomizationReconciler) finalize(ctx context.Context, obj *kustomizev1.Kustomization) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx) if finalizerShouldDeleteResources(obj) { objects, _ := inventory.List(obj.Status.Inventory) var impersonatorOpts []runtimeClient.ImpersonatorOption var mustImpersonate bool if r.DefaultServiceAccount != "" || obj.Spec.ServiceAccountName != "" { mustImpersonate = true impersonatorOpts = append(impersonatorOpts, runtimeClient.WithServiceAccount(r.DefaultServiceAccount, obj.Spec.ServiceAccountName, obj.GetNamespace())) } if obj.Spec.KubeConfig != nil { mustImpersonate = true impersonatorOpts = append(impersonatorOpts, runtimeClient.WithKubeConfig(obj.Spec.KubeConfig, r.KubeConfigOpts, obj.GetNamespace())) } if r.ClusterReader != nil { impersonatorOpts = append(impersonatorOpts, runtimeClient.WithPolling(r.ClusterReader)) } impersonation := runtimeClient.NewImpersonator(r.Client, impersonatorOpts...) if impersonation.CanImpersonate(ctx) { var kubeClient client.Client var err error if mustImpersonate { kubeClient, _, err = impersonation.GetClient(ctx) } else { kubeClient = r.Client } if err != nil { return ctrl.Result{}, err } resourceManager := ssa.NewResourceManager(kubeClient, nil, ssa.Owner{ Field: r.ControllerName, Group: kustomizev1.GroupVersion.Group, }) opts := ssa.DeleteOptions{ PropagationPolicy: metav1.DeletePropagationBackground, Inclusions: resourceManager.GetOwnerLabels(obj.Name, obj.Namespace), Exclusions: map[string]string{ fmt.Sprintf("%s/prune", kustomizev1.GroupVersion.Group): kustomizev1.DisabledValue, fmt.Sprintf("%s/reconcile", kustomizev1.GroupVersion.Group): kustomizev1.DisabledValue, }, } changeSet, err := resourceManager.DeleteAll(ctx, objects, opts) if err != nil { r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, "pruning for deleted resource failed", nil) // Return the error so we retry the failed garbage collection return ctrl.Result{}, err } if changeSet != nil && len(changeSet.Entries) > 0 { // Emit event with the resources marked for deletion. r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityInfo, changeSet.String(), nil) // Wait for the resources marked for deletion to be terminated. if obj.GetDeletionPolicy() == kustomizev1.DeletionPolicyWaitForTermination { if err := resourceManager.WaitForSetTermination(changeSet, ssa.WaitOptions{ Interval: 2 * time.Second, Timeout: obj.GetTimeout(), }); err != nil { // Emit an event and log the error if a timeout occurs. msg := "failed to wait for resources termination" log.Error(err, msg) r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, msg, nil) } } } } else { // when the account to impersonate is gone, log the stale objects and continue with the finalization msg := fmt.Sprintf("unable to prune objects: \n%s", ssautil.FmtUnstructuredList(objects)) log.Error(fmt.Errorf("skiping pruning, failed to find account to impersonate"), msg) r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, msg, nil) } } // Remove our finalizer from the list and update it controllerutil.RemoveFinalizer(obj, kustomizev1.KustomizationFinalizer) // Cleanup caches. for _, op := range intcache.AllOperations { r.TokenCache.DeleteEventsForObject(kustomizev1.KustomizationKind, obj.GetName(), obj.GetNamespace(), op) } // Stop reconciliation as the object is being deleted return ctrl.Result{}, nil } func (r *KustomizationReconciler) event(obj *kustomizev1.Kustomization, revision, originRevision, severity, msg string, metadata map[string]string) { if metadata == nil { metadata = map[string]string{} } if revision != "" { metadata[kustomizev1.GroupVersion.Group+"/"+eventv1.MetaRevisionKey] = revision } if originRevision != "" { metadata[kustomizev1.GroupVersion.Group+"/"+eventv1.MetaOriginRevisionKey] = originRevision } reason := severity if r := conditions.GetReason(obj, meta.ReadyCondition); r != "" { reason = r } eventtype := "Normal" if severity == eventv1.EventSeverityError { eventtype = "Warning" } r.EventRecorder.AnnotatedEventf(obj, metadata, eventtype, reason, msg) } func (r *KustomizationReconciler) finalizeStatus(ctx context.Context, obj *kustomizev1.Kustomization, patcher *patch.SerialPatcher) error { // Set the value of the reconciliation request in status. if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok { obj.Status.LastHandledReconcileAt = v } // Remove the Reconciling condition and update the observed generation // if the reconciliation was successful. if conditions.IsTrue(obj, meta.ReadyCondition) { conditions.Delete(obj, meta.ReconcilingCondition) obj.Status.ObservedGeneration = obj.Generation } // Set the Reconciling reason to ProgressingWithRetry if the // reconciliation has failed. if conditions.IsFalse(obj, meta.ReadyCondition) && conditions.Has(obj, meta.ReconcilingCondition) { rc := conditions.Get(obj, meta.ReconcilingCondition) rc.Reason = meta.ProgressingWithRetryReason conditions.Set(obj, rc) } // Patch finalizers, status and conditions. return r.patch(ctx, obj, patcher) } func (r *KustomizationReconciler) patch(ctx context.Context, obj *kustomizev1.Kustomization, patcher *patch.SerialPatcher) (retErr error) { // Configure the runtime patcher. patchOpts := []patch.Option{} ownedConditions := []string{ meta.HealthyCondition, meta.ReadyCondition, meta.ReconcilingCondition, meta.StalledCondition, } patchOpts = append(patchOpts, patch.WithOwnedConditions{Conditions: ownedConditions}, patch.WithForceOverwriteConditions{}, patch.WithFieldOwner(r.statusManager), ) // Patch the object status, conditions and finalizers. if err := patcher.Patch(ctx, obj, patchOpts...); err != nil { if !obj.GetDeletionTimestamp().IsZero() { err = kerrors.FilterOut(err, func(e error) bool { return apierrors.IsNotFound(e) }) } retErr = kerrors.NewAggregate([]error{retErr, err}) if retErr != nil { return retErr } } return nil } // getClientAndPoller creates a status poller with the custom status readers // from CEL expressions and the custom job status reader, and returns the // Kubernetes client of the controller and the status poller. // Should be used for reconciliations that are not configured to use // ServiceAccount impersonation or kubeconfig. func (r *KustomizationReconciler) getClientAndPoller( readerCtors []func(apimeta.RESTMapper) engine.StatusReader, ) (client.Client, *polling.StatusPoller) { readers := make([]engine.StatusReader, 0, 1+len(readerCtors)) readers = append(readers, statusreaders.NewCustomJobStatusReader(r.Mapper)) for _, ctor := range readerCtors { readers = append(readers, ctor(r.Mapper)) } poller := polling.NewStatusPoller(r.Client, r.Mapper, polling.Options{ CustomStatusReaders: readers, ClusterReaderFactory: r.ClusterReader, }) return r.Client, poller } // getOriginRevision returns the origin revision of the source artifact, // or the empty string if it's not present, or if the artifact itself // is not present. func getOriginRevision(src sourcev1.Source) string { a := src.GetArtifact() if a == nil { return "" } return a.Metadata[OCIArtifactOriginRevisionAnnotation] }