/*
Copyright 2020 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"os"
	"sort"
	"strings"
	"time"

	securejoin "github.com/cyphar/filepath-securejoin"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	kerrors "k8s.io/apimachinery/pkg/util/errors"
	kuberecorder "k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	"github.com/fluxcd/cli-utils/pkg/kstatus/polling"
	"github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine"
	"github.com/fluxcd/cli-utils/pkg/object"
	apiacl "github.com/fluxcd/pkg/apis/acl"
	eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
	"github.com/fluxcd/pkg/apis/meta"
	"github.com/fluxcd/pkg/auth"
	"github.com/fluxcd/pkg/cache"
	"github.com/fluxcd/pkg/http/fetch"
	generator "github.com/fluxcd/pkg/kustomize"
	"github.com/fluxcd/pkg/runtime/acl"
	"github.com/fluxcd/pkg/runtime/cel"
	runtimeClient "github.com/fluxcd/pkg/runtime/client"
	"github.com/fluxcd/pkg/runtime/conditions"
	runtimeCtrl "github.com/fluxcd/pkg/runtime/controller"
	"github.com/fluxcd/pkg/runtime/jitter"
	"github.com/fluxcd/pkg/runtime/patch"
	"github.com/fluxcd/pkg/runtime/predicates"
	"github.com/fluxcd/pkg/runtime/statusreaders"
	"github.com/fluxcd/pkg/ssa"
	"github.com/fluxcd/pkg/ssa/normalize"
	ssautil "github.com/fluxcd/pkg/ssa/utils"
	"github.com/fluxcd/pkg/tar"
	sourcev1 "github.com/fluxcd/source-controller/api/v1"
	sourcev1b2 "github.com/fluxcd/source-controller/api/v1beta2"

	kustomizev1 "github.com/fluxcd/kustomize-controller/api/v1"
	intcache "github.com/fluxcd/kustomize-controller/internal/cache"
	"github.com/fluxcd/kustomize-controller/internal/decryptor"
	"github.com/fluxcd/kustomize-controller/internal/inventory"
)

// +kubebuilder:rbac:groups=kustomize.toolkit.fluxcd.io,resources=kustomizations,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kustomize.toolkit.fluxcd.io,resources=kustomizations/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=kustomize.toolkit.fluxcd.io,resources=kustomizations/finalizers,verbs=get;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets;ocirepositories;gitrepositories,verbs=get;list;watch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status;ocirepositories/status;gitrepositories/status,verbs=get
// +kubebuilder:rbac:groups="",resources=configmaps;secrets;serviceaccounts,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=serviceaccounts/token,verbs=create
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch

// KustomizationReconciler reconciles a Kustomization object
type KustomizationReconciler struct {
	client.Client
	kuberecorder.EventRecorder
	runtimeCtrl.Metrics

	artifactFetchRetries int
	requeueDependency    time.Duration

	Mapper                  apimeta.RESTMapper
	APIReader               client.Reader
	ClusterReader           engine.ClusterReaderFactory
	ControllerName          string
	statusManager           string
	NoCrossNamespaceRefs    bool
	NoRemoteBases           bool
	FailFast                bool
	DefaultServiceAccount   string
	KubeConfigOpts          runtimeClient.KubeConfigOptions
	ConcurrentSSA           int
	DisallowedFieldManagers []string
	StrictSubstitutions     bool
	GroupChangeLog          bool
	TokenCache              *cache.TokenCache
}

// KustomizationReconcilerOptions contains options for the KustomizationReconciler.
type KustomizationReconcilerOptions struct {
	HTTPRetry                 int
	DependencyRequeueInterval time.Duration
	RateLimiter               workqueue.TypedRateLimiter[reconcile.Request]
}

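// SetupWithManager registers the reconciler with the manager: it indexes
// Kustomizations by their source references and watches OCIRepository,
// GitRepository and Bucket objects so that source revision changes trigger
// a reconciliation of the dependent Kustomizations.
//
// A minimal wiring sketch (illustrative only; the manager construction and
// the option values below are assumptions, not part of this controller):
//
//	mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
//	r := &KustomizationReconciler{
//		Client:         mgr.GetClient(),
//		APIReader:      mgr.GetAPIReader(),
//		Mapper:         mgr.GetRESTMapper(),
//		ControllerName: "kustomize-controller",
//	}
//	_ = r.SetupWithManager(context.Background(), mgr, KustomizationReconcilerOptions{
//		HTTPRetry:                 9,
//		DependencyRequeueInterval: 30 * time.Second,
//	})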
func (r *KustomizationReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opts KustomizationReconcilerOptions) error {
	const (
		ociRepositoryIndexKey string = ".metadata.ociRepository"
		gitRepositoryIndexKey string = ".metadata.gitRepository"
		bucketIndexKey        string = ".metadata.bucket"
	)

	// Index the Kustomizations by the OCIRepository references they (may) point at.
	if err := mgr.GetCache().IndexField(ctx, &kustomizev1.Kustomization{}, ociRepositoryIndexKey,
		r.indexBy(sourcev1b2.OCIRepositoryKind)); err != nil {
		return fmt.Errorf("failed setting index fields: %w", err)
	}

	// Index the Kustomizations by the GitRepository references they (may) point at.
	if err := mgr.GetCache().IndexField(ctx, &kustomizev1.Kustomization{}, gitRepositoryIndexKey,
		r.indexBy(sourcev1.GitRepositoryKind)); err != nil {
		return fmt.Errorf("failed setting index fields: %w", err)
	}

	// Index the Kustomizations by the Bucket references they (may) point at.
	if err := mgr.GetCache().IndexField(ctx, &kustomizev1.Kustomization{}, bucketIndexKey,
		r.indexBy(sourcev1.BucketKind)); err != nil {
		return fmt.Errorf("failed setting index fields: %w", err)
	}

	r.requeueDependency = opts.DependencyRequeueInterval
	r.statusManager = fmt.Sprintf("gotk-%s", r.ControllerName)
	r.artifactFetchRetries = opts.HTTPRetry

	return ctrl.NewControllerManagedBy(mgr).
		For(&kustomizev1.Kustomization{}, builder.WithPredicates(
			predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{}),
		)).
		Watches(
			&sourcev1b2.OCIRepository{},
			handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(ociRepositoryIndexKey)),
			builder.WithPredicates(SourceRevisionChangePredicate{}),
		).
		Watches(
			&sourcev1.GitRepository{},
			handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(gitRepositoryIndexKey)),
			builder.WithPredicates(SourceRevisionChangePredicate{}),
		).
		Watches(
			&sourcev1.Bucket{},
			handler.EnqueueRequestsFromMapFunc(r.requestsForRevisionChangeOf(bucketIndexKey)),
			builder.WithPredicates(SourceRevisionChangePredicate{}),
		).
		WithOptions(controller.Options{
			RateLimiter: opts.RateLimiter,
		}).
		Complete(r)
}

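// Reconcile is the main reconciliation loop for the Kustomization API. It
// resolves the referenced source, checks dependencies, and delegates the
// fetch, build, apply and health check steps to reconcile, requeueing at the
// configured interval or retry interval depending on the outcome.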
func (r *KustomizationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
	log := ctrl.LoggerFrom(ctx)
	reconcileStart := time.Now()

	obj := &kustomizev1.Kustomization{}
	if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Initialize the runtime patcher with the current version of the object.
	patcher := patch.NewSerialPatcher(obj, r.Client)

	// Finalise the reconciliation and report the results.
	defer func() {
		// Patch finalizers, status and conditions.
		if err := r.finalizeStatus(ctx, obj, patcher); err != nil {
			retErr = kerrors.NewAggregate([]error{retErr, err})
		}

		// Record Prometheus metrics.
		r.Metrics.RecordDuration(ctx, obj, reconcileStart)

		// Do not proceed if the Kustomization is suspended.
		if obj.Spec.Suspend {
			return
		}

		// Log and emit success event.
		if conditions.IsReady(obj) {
			msg := fmt.Sprintf("Reconciliation finished in %s, next run in %s",
				time.Since(reconcileStart).String(),
				obj.Spec.Interval.Duration.String())
			log.Info(msg, "revision", obj.Status.LastAttemptedRevision)
			r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityInfo, msg,
				map[string]string{
					kustomizev1.GroupVersion.Group + "/" + eventv1.MetaCommitStatusKey: eventv1.MetaCommitStatusUpdateValue,
				})
		}
	}()

	// Prune managed resources if the object is under deletion.
	if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
		return r.finalize(ctx, obj)
	}

	// Add finalizer first if it doesn't exist to avoid the race condition
	// between init and delete.
	// Note: Finalizers in general can only be added when the deletionTimestamp
	// is not set.
	if !controllerutil.ContainsFinalizer(obj, kustomizev1.KustomizationFinalizer) {
		controllerutil.AddFinalizer(obj, kustomizev1.KustomizationFinalizer)
		return ctrl.Result{Requeue: true}, nil
	}

	// Skip reconciliation if the object is suspended.
	if obj.Spec.Suspend {
		log.Info("Reconciliation is suspended for this object")
		return ctrl.Result{}, nil
	}

	// Configure custom health checks.
	statusReaders, err := cel.PollerWithCustomHealthChecks(ctx, obj.Spec.HealthCheckExprs)
	if err != nil {
		const msg = "Reconciliation failed terminally due to configuration error"
		errMsg := fmt.Sprintf("%s: %v", msg, err)
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.InvalidCELExpressionReason, "%s", errMsg)
		conditions.MarkStalled(obj, meta.InvalidCELExpressionReason, "%s", errMsg)
		obj.Status.ObservedGeneration = obj.Generation
		log.Error(err, msg)
		r.event(obj, "", "", eventv1.EventSeverityError, errMsg, nil)
		return ctrl.Result{}, nil
	}

	// Check object-level workload identity feature gate.
	if d := obj.Spec.Decryption; d != nil && d.ServiceAccountName != "" && !auth.IsObjectLevelWorkloadIdentityEnabled() {
		const gate = auth.FeatureGateObjectLevelWorkloadIdentity
		const msgFmt = "to use spec.decryption.serviceAccountName for decryption authentication please enable the %s feature gate in the controller"
		msg := fmt.Sprintf(msgFmt, gate)
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.FeatureGateDisabledReason, msgFmt, gate)
		conditions.MarkStalled(obj, meta.FeatureGateDisabledReason, msgFmt, gate)
		log.Error(auth.ErrObjectLevelWorkloadIdentityNotEnabled, msg)
		r.event(obj, "", "", eventv1.EventSeverityError, msg, nil)
		return ctrl.Result{}, nil
	}

	// Resolve the source reference and requeue the reconciliation if the source is not found.
	artifactSource, err := r.getSource(ctx, obj)
	if err != nil {
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", err)

		if apierrors.IsNotFound(err) {
			msg := fmt.Sprintf("Source '%s' not found", obj.Spec.SourceRef.String())
			log.Info(msg)
			return ctrl.Result{RequeueAfter: obj.GetRetryInterval()}, nil
		}

		if acl.IsAccessDenied(err) {
			conditions.MarkFalse(obj, meta.ReadyCondition, apiacl.AccessDeniedReason, "%s", err)
			log.Error(err, "Access denied to cross-namespace source")
			r.event(obj, "", "", eventv1.EventSeverityError, err.Error(), nil)
			return ctrl.Result{RequeueAfter: obj.GetRetryInterval()}, nil
		}

		// Retry with backoff on transient errors.
		return ctrl.Result{}, err
	}

	// Requeue the reconciliation if the source artifact is not found.
	if artifactSource.GetArtifact() == nil {
		msg := fmt.Sprintf("Source artifact not found, retrying in %s", r.requeueDependency.String())
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", msg)
		log.Info(msg)
		return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
	}
	revision := artifactSource.GetArtifact().Revision
	originRevision := getOriginRevision(artifactSource)

	// Check dependencies and requeue the reconciliation if the check fails.
	if len(obj.Spec.DependsOn) > 0 {
		if err := r.checkDependencies(ctx, obj, artifactSource); err != nil {
			conditions.MarkFalse(obj, meta.ReadyCondition, meta.DependencyNotReadyReason, "%s", err)
			msg := fmt.Sprintf("Dependencies do not meet ready condition, retrying in %s", r.requeueDependency.String())
			log.Info(msg)
			r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil)
			return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
		}
		log.Info("All dependencies are ready, proceeding with reconciliation")
	}

	// Reconcile the latest revision.
	reconcileErr := r.reconcile(ctx, obj, artifactSource, patcher, statusReaders)

	// Requeue at the specified retry interval if the artifact tarball is not found.
	if errors.Is(reconcileErr, fetch.ErrFileNotFound) {
		msg := fmt.Sprintf("Source is not ready, artifact not found, retrying in %s", r.requeueDependency.String())
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", msg)
		log.Info(msg)
		return ctrl.Result{RequeueAfter: r.requeueDependency}, nil
	}

	// Broadcast the reconciliation failure and requeue at the specified retry interval.
	if reconcileErr != nil {
		log.Error(reconcileErr, fmt.Sprintf("Reconciliation failed after %s, next try in %s",
			time.Since(reconcileStart).String(),
			obj.GetRetryInterval().String()),
			"revision",
			revision)
		r.event(obj, revision, originRevision, eventv1.EventSeverityError,
			reconcileErr.Error(), nil)
		return ctrl.Result{RequeueAfter: obj.GetRetryInterval()}, nil
	}

	// Requeue the reconciliation at the specified interval.
	return ctrl.Result{RequeueAfter: jitter.JitteredIntervalDuration(obj.GetRequeueAfter())}, nil
}

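// reconcile fetches and extracts the source artifact, builds the Kustomize
// overlay, applies the resulting objects on the cluster with server-side
// apply, prunes stale objects from the previous inventory and runs the
// configured health checks, updating the object status along the way.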
func (r *KustomizationReconciler) reconcile(
	ctx context.Context,
	obj *kustomizev1.Kustomization,
	src sourcev1.Source,
	patcher *patch.SerialPatcher,
	statusReaders []func(apimeta.RESTMapper) engine.StatusReader) error {
	log := ctrl.LoggerFrom(ctx)

	// Update status with the reconciliation progress.
	revision := src.GetArtifact().Revision
	originRevision := getOriginRevision(src)
	progressingMsg := fmt.Sprintf("Fetching manifests for revision %s with a timeout of %s", revision, obj.GetTimeout().String())
	conditions.MarkUnknown(obj, meta.ReadyCondition, meta.ProgressingReason, "%s", "Reconciliation in progress")
	conditions.MarkReconciling(obj, meta.ProgressingReason, "%s", progressingMsg)
	if err := r.patch(ctx, obj, patcher); err != nil {
		return fmt.Errorf("failed to update status: %w", err)
	}

	// Create a snapshot of the current inventory.
	oldInventory := inventory.New()
	if obj.Status.Inventory != nil {
		obj.Status.Inventory.DeepCopyInto(oldInventory)
	}

	// Create tmp dir.
	tmpDir, err := MkdirTempAbs("", "kustomization-")
	if err != nil {
		err = fmt.Errorf("tmp dir error: %w", err)
		conditions.MarkFalse(obj, meta.ReadyCondition, sourcev1.DirCreationFailedReason, "%s", err)
		return err
	}

	defer func(path string) {
		if err := os.RemoveAll(path); err != nil {
			log.Error(err, "failed to remove tmp dir", "path", path)
		}
	}(tmpDir)

	// Download artifact and extract files to the tmp dir.
	if err = fetch.NewArchiveFetcherWithLogger(
		r.artifactFetchRetries,
		tar.UnlimitedUntarSize,
		tar.UnlimitedUntarSize,
		os.Getenv("SOURCE_CONTROLLER_LOCALHOST"),
		ctrl.LoggerFrom(ctx),
	).Fetch(src.GetArtifact().URL, src.GetArtifact().Digest, tmpDir); err != nil {
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", err)
		return err
	}

	// check build path exists
	dirPath, err := securejoin.SecureJoin(tmpDir, obj.Spec.Path)
	if err != nil {
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", err)
		return err
	}

	if _, err := os.Stat(dirPath); err != nil {
		err = fmt.Errorf("kustomization path not found: %w", err)
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.ArtifactFailedReason, "%s", err)
		return err
	}

	// Report progress and set last attempted revision in status.
	obj.Status.LastAttemptedRevision = revision
	progressingMsg = fmt.Sprintf("Building manifests for revision %s with a timeout of %s", revision, obj.GetTimeout().String())
	conditions.MarkReconciling(obj, meta.ProgressingReason, "%s", progressingMsg)
	if err := r.patch(ctx, obj, patcher); err != nil {
		return fmt.Errorf("failed to update status: %w", err)
	}

	// Configure the Kubernetes client for impersonation.
	var impersonatorOpts []runtimeClient.ImpersonatorOption
	var mustImpersonate bool
	if r.DefaultServiceAccount != "" || obj.Spec.ServiceAccountName != "" {
		mustImpersonate = true
		impersonatorOpts = append(impersonatorOpts,
			runtimeClient.WithServiceAccount(r.DefaultServiceAccount, obj.Spec.ServiceAccountName, obj.GetNamespace()))
	}
	if obj.Spec.KubeConfig != nil {
		mustImpersonate = true
		impersonatorOpts = append(impersonatorOpts,
			runtimeClient.WithKubeConfig(obj.Spec.KubeConfig, r.KubeConfigOpts, obj.GetNamespace()))
	}
	if r.ClusterReader != nil || len(statusReaders) > 0 {
		impersonatorOpts = append(impersonatorOpts,
			runtimeClient.WithPolling(r.ClusterReader, statusReaders...))
	}
	impersonation := runtimeClient.NewImpersonator(r.Client, impersonatorOpts...)

	// Create the Kubernetes client that runs under impersonation.
	var kubeClient client.Client
	var statusPoller *polling.StatusPoller
	if mustImpersonate {
		kubeClient, statusPoller, err = impersonation.GetClient(ctx)
	} else {
		kubeClient, statusPoller = r.getClientAndPoller(statusReaders)
	}
	if err != nil {
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.ReconciliationFailedReason, "%s", err)
		return fmt.Errorf("failed to build kube client: %w", err)
	}

	// Generate kustomization.yaml if needed.
	k, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
	if err != nil {
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.BuildFailedReason, "%s", err)
		return err
	}
	err = r.generate(unstructured.Unstructured{Object: k}, tmpDir, dirPath)
	if err != nil {
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.BuildFailedReason, "%s", err)
		return err
	}

	// Build the Kustomize overlay and decrypt secrets if needed.
	resources, err := r.build(ctx, obj, unstructured.Unstructured{Object: k}, tmpDir, dirPath)
	if err != nil {
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.BuildFailedReason, "%s", err)
		return err
	}

	// Convert the build result into Kubernetes unstructured objects.
	objects, err := ssautil.ReadObjects(bytes.NewReader(resources))
	if err != nil {
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.BuildFailedReason, "%s", err)
		return err
	}

	// Create the server-side apply manager.
	resourceManager := ssa.NewResourceManager(kubeClient, statusPoller, ssa.Owner{
		Field: r.ControllerName,
		Group: kustomizev1.GroupVersion.Group,
	})
	resourceManager.SetOwnerLabels(objects, obj.GetName(), obj.GetNamespace())
	resourceManager.SetConcurrency(r.ConcurrentSSA)

	// Update status with the reconciliation progress.
	progressingMsg = fmt.Sprintf("Detecting drift for revision %s with a timeout of %s", revision, obj.GetTimeout().String())
	conditions.MarkReconciling(obj, meta.ProgressingReason, "%s", progressingMsg)
	if err := r.patch(ctx, obj, patcher); err != nil {
		return fmt.Errorf("failed to update status: %w", err)
	}

	// Validate and apply resources in stages.
	drifted, changeSet, err := r.apply(ctx, resourceManager, obj, revision, originRevision, objects)
	if err != nil {
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.ReconciliationFailedReason, "%s", err)
		return err
	}

	// Create an inventory from the reconciled resources.
	newInventory := inventory.New()
	err = inventory.AddChangeSet(newInventory, changeSet)
	if err != nil {
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.ReconciliationFailedReason, "%s", err)
		return err
	}

	// Set last applied inventory in status.
	obj.Status.Inventory = newInventory

	// Detect stale resources which are subject to garbage collection.
	staleObjects, err := inventory.Diff(oldInventory, newInventory)
	if err != nil {
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.ReconciliationFailedReason, "%s", err)
		return err
	}

	// Run garbage collection for stale resources that do not have pruning disabled.
	if _, err := r.prune(ctx, resourceManager, obj, revision, originRevision, staleObjects); err != nil {
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.PruneFailedReason, "%s", err)
		return err
	}

	// Run the health checks for the last applied resources.
	isNewRevision := !src.GetArtifact().HasRevision(obj.Status.LastAppliedRevision)
	if err := r.checkHealth(ctx,
		resourceManager,
		patcher,
		obj,
		revision,
		originRevision,
		isNewRevision,
		drifted,
		changeSet.ToObjMetadataSet()); err != nil {
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.HealthCheckFailedReason, "%s", err)
		return err
	}

	// Set last applied revisions.
	obj.Status.LastAppliedRevision = revision
	obj.Status.LastAppliedOriginRevision = originRevision

	// Mark the object as ready.
	conditions.MarkTrue(obj,
		meta.ReadyCondition,
		meta.ReconciliationSucceededReason,
		"Applied revision: %s", revision)

	return nil
}

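// checkDependencies returns an error if any of the Kustomizations listed in
// spec.dependsOn is not ready, or if a dependency that shares this object's
// source has not yet applied the current artifact revision.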
func (r *KustomizationReconciler) checkDependencies(ctx context.Context,
	obj *kustomizev1.Kustomization,
	source sourcev1.Source) error {
	for _, d := range obj.Spec.DependsOn {
		if d.Namespace == "" {
			d.Namespace = obj.GetNamespace()
		}
		dName := types.NamespacedName{
			Namespace: d.Namespace,
			Name:      d.Name,
		}
		var k kustomizev1.Kustomization
		err := r.APIReader.Get(ctx, dName, &k)
		if err != nil {
			return fmt.Errorf("dependency '%s' not found: %w", dName, err)
		}

		if len(k.Status.Conditions) == 0 || k.Generation != k.Status.ObservedGeneration {
			return fmt.Errorf("dependency '%s' is not ready", dName)
		}

		if !apimeta.IsStatusConditionTrue(k.Status.Conditions, meta.ReadyCondition) {
			return fmt.Errorf("dependency '%s' is not ready", dName)
		}

		srcNamespace := k.Spec.SourceRef.Namespace
		if srcNamespace == "" {
			srcNamespace = k.GetNamespace()
		}
		dSrcNamespace := obj.Spec.SourceRef.Namespace
		if dSrcNamespace == "" {
			dSrcNamespace = obj.GetNamespace()
		}

		if k.Spec.SourceRef.Name == obj.Spec.SourceRef.Name &&
			srcNamespace == dSrcNamespace &&
			k.Spec.SourceRef.Kind == obj.Spec.SourceRef.Kind &&
			!source.GetArtifact().HasRevision(k.Status.LastAppliedRevision) {
			return fmt.Errorf("dependency '%s' revision is not up to date", dName)
		}
	}

	return nil
}

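// getSource returns the source object referenced by spec.sourceRef. It
// returns an access denied error when the reference points to another
// namespace and cross-namespace references are disallowed.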
func (r *KustomizationReconciler) getSource(ctx context.Context,
	obj *kustomizev1.Kustomization) (sourcev1.Source, error) {
	var src sourcev1.Source
	sourceNamespace := obj.GetNamespace()
	if obj.Spec.SourceRef.Namespace != "" {
		sourceNamespace = obj.Spec.SourceRef.Namespace
	}
	namespacedName := types.NamespacedName{
		Namespace: sourceNamespace,
		Name:      obj.Spec.SourceRef.Name,
	}

	if r.NoCrossNamespaceRefs && sourceNamespace != obj.GetNamespace() {
		return src, acl.AccessDeniedError(
			fmt.Sprintf("can't access '%s/%s', cross-namespace references have been blocked",
				obj.Spec.SourceRef.Kind, namespacedName))
	}

	switch obj.Spec.SourceRef.Kind {
	case sourcev1b2.OCIRepositoryKind:
		var repository sourcev1b2.OCIRepository
		err := r.Client.Get(ctx, namespacedName, &repository)
		if err != nil {
			if apierrors.IsNotFound(err) {
				return src, err
			}
			return src, fmt.Errorf("unable to get source '%s': %w", namespacedName, err)
		}
		src = &repository
	case sourcev1.GitRepositoryKind:
		var repository sourcev1.GitRepository
		err := r.Client.Get(ctx, namespacedName, &repository)
		if err != nil {
			if apierrors.IsNotFound(err) {
				return src, err
			}
			return src, fmt.Errorf("unable to get source '%s': %w", namespacedName, err)
		}
		src = &repository
	case sourcev1.BucketKind:
		var bucket sourcev1.Bucket
		err := r.Client.Get(ctx, namespacedName, &bucket)
		if err != nil {
			if apierrors.IsNotFound(err) {
				return src, err
			}
			return src, fmt.Errorf("unable to get source '%s': %w", namespacedName, err)
		}
		src = &bucket
	default:
		return src, fmt.Errorf("source `%s` kind '%s' not supported",
			obj.Spec.SourceRef.Name, obj.Spec.SourceRef.Kind)
	}
	return src, nil
}

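// generate creates a kustomization.yaml in dirPath if needed, using the
// Flux kustomize generator.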
func (r *KustomizationReconciler) generate(obj unstructured.Unstructured,
	workDir string, dirPath string) error {
	_, err := generator.NewGenerator(workDir, obj).WriteFile(dirPath)
	return err
}

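// build runs the Kustomize build for the overlay at dirPath, decrypting
// SOPS-encrypted sources and resources and applying post-build variable
// substitutions when configured, and returns the resulting multi-doc YAML.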
func (r *KustomizationReconciler) build(ctx context.Context,
	obj *kustomizev1.Kustomization, u unstructured.Unstructured,
	workDir, dirPath string) ([]byte, error) {
	dec, cleanup, err := decryptor.NewTempDecryptor(workDir, r.Client, obj, r.TokenCache)
	if err != nil {
		return nil, err
	}
	defer cleanup()

	// Import keys and static credentials for decryption.
	if err := dec.ImportKeys(ctx); err != nil {
		return nil, err
	}

	// Set options for secret-less authentication with cloud providers for decryption.
	dec.SetAuthOptions(ctx)

	// Decrypt Kustomize EnvSources files before build.
	if err = dec.DecryptSources(dirPath); err != nil {
		return nil, fmt.Errorf("error decrypting sources: %w", err)
	}

	m, err := generator.SecureBuild(workDir, dirPath, !r.NoRemoteBases)
	if err != nil {
		return nil, fmt.Errorf("kustomize build failed: %w", err)
	}

	for _, res := range m.Resources() {
		// check if resources conform to the Kubernetes API conventions
		if res.GetName() == "" || res.GetKind() == "" || res.GetApiVersion() == "" {
			return nil, fmt.Errorf("failed to decode Kubernetes apiVersion, kind and name from: %v", res.String())
		}

		// check if resources are encrypted and decrypt them before generating the final YAML
		if obj.Spec.Decryption != nil {
			outRes, err := dec.DecryptResource(res)
			if err != nil {
				return nil, fmt.Errorf("decryption failed for '%s': %w", res.GetName(), err)
			}

			if outRes != nil {
				_, err = m.Replace(res)
				if err != nil {
					return nil, err
				}
			}
		}

		// run variable substitutions
		if obj.Spec.PostBuild != nil {
			outRes, err := generator.SubstituteVariables(ctx, r.Client, u, res,
				generator.SubstituteWithStrict(r.StrictSubstitutions))
			if err != nil {
				return nil, fmt.Errorf("post build failed for '%s': %w", res.GetName(), err)
			}

			if outRes != nil {
				_, err = m.Replace(res)
				if err != nil {
					return nil, err
				}
			}
		}
	}

	resources, err := m.AsYaml()
	if err != nil {
		return nil, fmt.Errorf("kustomize build failed: %w", err)
	}

	return resources, nil
}

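// apply validates and applies the given objects on the cluster using
// server-side apply, in stages: CRDs and Namespaces first, then Class type
// objects, then all remaining objects. It returns whether the apply resulted
// in changes, along with the change set of the applied objects.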
func (r *KustomizationReconciler) apply(ctx context.Context,
	manager *ssa.ResourceManager,
	obj *kustomizev1.Kustomization,
	revision string,
	originRevision string,
	objects []*unstructured.Unstructured) (bool, *ssa.ChangeSet, error) {
	log := ctrl.LoggerFrom(ctx)

	if err := normalize.UnstructuredList(objects); err != nil {
		return false, nil, err
	}

	if cmeta := obj.Spec.CommonMetadata; cmeta != nil {
		ssautil.SetCommonMetadata(objects, cmeta.Labels, cmeta.Annotations)
	}

	applyOpts := ssa.DefaultApplyOptions()
	applyOpts.Force = obj.Spec.Force
	applyOpts.ExclusionSelector = map[string]string{
		fmt.Sprintf("%s/reconcile", kustomizev1.GroupVersion.Group): kustomizev1.DisabledValue,
		fmt.Sprintf("%s/ssa", kustomizev1.GroupVersion.Group):       kustomizev1.IgnoreValue,
	}
	applyOpts.IfNotPresentSelector = map[string]string{
		fmt.Sprintf("%s/ssa", kustomizev1.GroupVersion.Group): kustomizev1.IfNotPresentValue,
	}
	applyOpts.ForceSelector = map[string]string{
		fmt.Sprintf("%s/force", kustomizev1.GroupVersion.Group): kustomizev1.EnabledValue,
	}

	fieldManagers := []ssa.FieldManager{
		{
			// to undo changes made with 'kubectl apply --server-side --force-conflicts'
			Name:          "kubectl",
			OperationType: metav1.ManagedFieldsOperationApply,
		},
		{
			// to undo changes made with 'kubectl apply'
			Name:          "kubectl",
			OperationType: metav1.ManagedFieldsOperationUpdate,
		},
		{
			// to undo changes made with 'kubectl apply'
			Name:          "before-first-apply",
			OperationType: metav1.ManagedFieldsOperationUpdate,
		},
		{
			// to undo changes made by the controller before SSA
			Name:          r.ControllerName,
			OperationType: metav1.ManagedFieldsOperationUpdate,
		},
	}

	for _, fieldManager := range r.DisallowedFieldManagers {
		fieldManagers = append(fieldManagers, ssa.FieldManager{
			Name:          fieldManager,
			OperationType: metav1.ManagedFieldsOperationApply,
		})
		// to undo changes made by the controller before SSA
		fieldManagers = append(fieldManagers, ssa.FieldManager{
			Name:          fieldManager,
			OperationType: metav1.ManagedFieldsOperationUpdate,
		})
	}

	applyOpts.Cleanup = ssa.ApplyCleanupOptions{
		Annotations: []string{
			// remove the kubectl annotation
			corev1.LastAppliedConfigAnnotation,
			// remove deprecated fluxcd.io annotations
			"kustomize.toolkit.fluxcd.io/checksum",
			"fluxcd.io/sync-checksum",
		},
		Labels: []string{
			// remove deprecated fluxcd.io labels
			"fluxcd.io/sync-gc-mark",
		},
		FieldManagers: fieldManagers,
		Exclusions: map[string]string{
			fmt.Sprintf("%s/ssa", kustomizev1.GroupVersion.Group): kustomizev1.MergeValue,
		},
	}

	// contains only CRDs and Namespaces
	var defStage []*unstructured.Unstructured

	// contains only Kubernetes Class types e.g.: RuntimeClass, PriorityClass,
	// StorageClass, VolumeSnapshotClass, IngressClass, GatewayClass, ClusterClass, etc.
	var classStage []*unstructured.Unstructured

	// contains all objects except for CRDs, Namespaces and Class type objects
	var resStage []*unstructured.Unstructured

	// contains the objects' metadata after apply
	resultSet := ssa.NewChangeSet()

	for _, u := range objects {
		if decryptor.IsEncryptedSecret(u) {
			return false, nil,
				fmt.Errorf("%s is SOPS encrypted, configuring decryption is required for this secret to be reconciled",
					ssautil.FmtUnstructured(u))
		}

		switch {
		case ssautil.IsClusterDefinition(u):
			defStage = append(defStage, u)
		case strings.HasSuffix(u.GetKind(), "Class"):
			classStage = append(classStage, u)
		default:
			resStage = append(resStage, u)
		}
	}

	var changeSetLog strings.Builder

	// validate, apply and wait for CRDs and Namespaces to register
	if len(defStage) > 0 {
		changeSet, err := manager.ApplyAll(ctx, defStage, applyOpts)
		if err != nil {
			return false, nil, err
		}

		if changeSet != nil && len(changeSet.Entries) > 0 {
			resultSet.Append(changeSet.Entries)

			if r.GroupChangeLog {
				log.Info("server-side apply for cluster definitions completed", "output", changeSet.ToGroupedMap())
			} else {
				log.Info("server-side apply for cluster definitions completed", "output", changeSet.ToMap())
			}
			for _, change := range changeSet.Entries {
				if HasChanged(change.Action) {
					changeSetLog.WriteString(change.String() + "\n")
				}
			}

			if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), ssa.WaitOptions{
				Interval: 2 * time.Second,
				Timeout:  obj.GetTimeout(),
			}); err != nil {
				return false, nil, err
			}
		}
	}

	// validate, apply and wait for Class type objects to register
	if len(classStage) > 0 {
		changeSet, err := manager.ApplyAll(ctx, classStage, applyOpts)
		if err != nil {
			return false, nil, err
		}

		if changeSet != nil && len(changeSet.Entries) > 0 {
			resultSet.Append(changeSet.Entries)

			if r.GroupChangeLog {
				log.Info("server-side apply for cluster class types completed", "output", changeSet.ToGroupedMap())
			} else {
				log.Info("server-side apply for cluster class types completed", "output", changeSet.ToMap())
			}
			for _, change := range changeSet.Entries {
				if HasChanged(change.Action) {
					changeSetLog.WriteString(change.String() + "\n")
				}
			}

			if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), ssa.WaitOptions{
				Interval: 2 * time.Second,
				Timeout:  obj.GetTimeout(),
			}); err != nil {
				return false, nil, err
			}
		}
	}

	// sort by kind, validate and apply all the other objects
	sort.Sort(ssa.SortableUnstructureds(resStage))
	if len(resStage) > 0 {
		changeSet, err := manager.ApplyAll(ctx, resStage, applyOpts)
		if err != nil {
			return false, nil, fmt.Errorf("%w\n%s", err, changeSetLog.String())
		}

		if changeSet != nil && len(changeSet.Entries) > 0 {
			resultSet.Append(changeSet.Entries)

			if r.GroupChangeLog {
				log.Info("server-side apply completed", "output", changeSet.ToGroupedMap(), "revision", revision)
			} else {
				log.Info("server-side apply completed", "output", changeSet.ToMap(), "revision", revision)
			}
			for _, change := range changeSet.Entries {
				if HasChanged(change.Action) {
					changeSetLog.WriteString(change.String() + "\n")
				}
			}
		}
	}

	// emit event only if the server-side apply resulted in changes
	applyLog := strings.TrimSuffix(changeSetLog.String(), "\n")
	if applyLog != "" {
		r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, applyLog, nil)
	}

	return applyLog != "", resultSet, nil
}

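// checkHealth waits for the applied resources (or the ones listed in
// spec.healthChecks when spec.wait is not set) to become ready within the
// configured timeout, and records the result in the Healthy condition.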
func (r *KustomizationReconciler) checkHealth(ctx context.Context,
	manager *ssa.ResourceManager,
	patcher *patch.SerialPatcher,
	obj *kustomizev1.Kustomization,
	revision string,
	originRevision string,
	isNewRevision bool,
	drifted bool,
	objects object.ObjMetadataSet) error {
	if len(obj.Spec.HealthChecks) == 0 && !obj.Spec.Wait {
		conditions.Delete(obj, meta.HealthyCondition)
		return nil
	}

	checkStart := time.Now()
	var err error
	if !obj.Spec.Wait {
		objects, err = inventory.ReferenceToObjMetadataSet(obj.Spec.HealthChecks)
		if err != nil {
			return err
		}
	}

	if len(objects) == 0 {
		conditions.Delete(obj, meta.HealthyCondition)
		return nil
	}

	// Guard against deadlock (waiting on itself).
	var toCheck []object.ObjMetadata
	for _, o := range objects {
		if o.GroupKind.Kind == kustomizev1.KustomizationKind &&
			o.Name == obj.GetName() &&
			o.Namespace == obj.GetNamespace() {
			continue
		}
		toCheck = append(toCheck, o)
	}

	// Find the previous health check result.
	wasHealthy := apimeta.IsStatusConditionTrue(obj.Status.Conditions, meta.HealthyCondition)

	// Update status with the reconciliation progress.
	message := fmt.Sprintf("Running health checks for revision %s with a timeout of %s", revision, obj.GetTimeout().String())
	conditions.MarkReconciling(obj, meta.ProgressingReason, "%s", message)
	conditions.MarkUnknown(obj, meta.HealthyCondition, meta.ProgressingReason, "%s", message)
	if err := r.patch(ctx, obj, patcher); err != nil {
		return fmt.Errorf("unable to update the healthy status to progressing: %w", err)
	}

	// Check the health with a default timeout of 30sec shorter than the reconciliation interval.
	if err := manager.WaitForSet(toCheck, ssa.WaitOptions{
		Interval: 5 * time.Second,
		Timeout:  obj.GetTimeout(),
		FailFast: r.FailFast,
	}); err != nil {
		conditions.MarkFalse(obj, meta.ReadyCondition, meta.HealthCheckFailedReason, "%s", err)
		conditions.MarkFalse(obj, meta.HealthyCondition, meta.HealthCheckFailedReason, "%s", err)
		return fmt.Errorf("health check failed after %s: %w", time.Since(checkStart).String(), err)
	}

	// Emit recovery event if the previous health check failed.
	msg := fmt.Sprintf("Health check passed in %s", time.Since(checkStart).String())
	if !wasHealthy || (isNewRevision && drifted) {
		r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, msg, nil)
	}

	conditions.MarkTrue(obj, meta.HealthyCondition, meta.SucceededReason, "%s", msg)
	if err := r.patch(ctx, obj, patcher); err != nil {
		return fmt.Errorf("unable to update the healthy status to progressing: %w", err)
	}

	return nil
}

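// prune deletes the given stale objects from the cluster when pruning is
// enabled, skipping objects that are excluded from pruning or reconciliation,
// and returns true if any object was subject to deletion.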
func (r *KustomizationReconciler) prune(ctx context.Context,
	manager *ssa.ResourceManager,
	obj *kustomizev1.Kustomization,
	revision string,
	originRevision string,
	objects []*unstructured.Unstructured) (bool, error) {
	if !obj.Spec.Prune {
		return false, nil
	}

	log := ctrl.LoggerFrom(ctx)

	opts := ssa.DeleteOptions{
		PropagationPolicy: metav1.DeletePropagationBackground,
		Inclusions:        manager.GetOwnerLabels(obj.Name, obj.Namespace),
		Exclusions: map[string]string{
			fmt.Sprintf("%s/prune", kustomizev1.GroupVersion.Group):     kustomizev1.DisabledValue,
			fmt.Sprintf("%s/reconcile", kustomizev1.GroupVersion.Group): kustomizev1.DisabledValue,
		},
	}

	changeSet, err := manager.DeleteAll(ctx, objects, opts)
	if err != nil {
		return false, err
	}

	// emit event only if the prune operation resulted in changes
	if changeSet != nil && len(changeSet.Entries) > 0 {
		log.Info(fmt.Sprintf("garbage collection completed: %s", changeSet.String()))
		r.event(obj, revision, originRevision, eventv1.EventSeverityInfo, changeSet.String(), nil)
		return true, nil
	}

	return false, nil
}

// finalizerShouldDeleteResources determines if resources should be deleted
// based on the object's inventory and deletion policy.
// A suspended Kustomization or one without an inventory will not delete resources.
func finalizerShouldDeleteResources(obj *kustomizev1.Kustomization) bool {
	if obj.Spec.Suspend {
		return false
	}

	if obj.Status.Inventory == nil || len(obj.Status.Inventory.Entries) == 0 {
		return false
	}

	switch obj.GetDeletionPolicy() {
	case kustomizev1.DeletionPolicyMirrorPrune:
		return obj.Spec.Prune
	case kustomizev1.DeletionPolicyDelete:
		return true
	case kustomizev1.DeletionPolicyWaitForTermination:
		return true
	default:
		return false
	}
}

// finalize handles the finalization logic for a Kustomization resource during its deletion process.
// Managed resources are pruned based on the deletion policy and suspended state of the Kustomization.
// When the policy is set to WaitForTermination, the function blocks and waits for the resources
// to be terminated by the Kubernetes Garbage Collector for the specified timeout duration.
// If the service account used for impersonation is no longer available or if a timeout occurs
// while waiting for resources to be terminated, an error is logged and the finalizer is removed.
func (r *KustomizationReconciler) finalize(ctx context.Context,
	obj *kustomizev1.Kustomization) (ctrl.Result, error) {
	log := ctrl.LoggerFrom(ctx)
	if finalizerShouldDeleteResources(obj) {
		objects, _ := inventory.List(obj.Status.Inventory)

		var impersonatorOpts []runtimeClient.ImpersonatorOption
		var mustImpersonate bool
		if r.DefaultServiceAccount != "" || obj.Spec.ServiceAccountName != "" {
			mustImpersonate = true
			impersonatorOpts = append(impersonatorOpts,
				runtimeClient.WithServiceAccount(r.DefaultServiceAccount, obj.Spec.ServiceAccountName, obj.GetNamespace()))
		}
		if obj.Spec.KubeConfig != nil {
			mustImpersonate = true
			impersonatorOpts = append(impersonatorOpts,
				runtimeClient.WithKubeConfig(obj.Spec.KubeConfig, r.KubeConfigOpts, obj.GetNamespace()))
		}
		if r.ClusterReader != nil {
			impersonatorOpts = append(impersonatorOpts, runtimeClient.WithPolling(r.ClusterReader))
		}
		impersonation := runtimeClient.NewImpersonator(r.Client, impersonatorOpts...)
		if impersonation.CanImpersonate(ctx) {
			var kubeClient client.Client
			var err error
			if mustImpersonate {
				kubeClient, _, err = impersonation.GetClient(ctx)
			} else {
				kubeClient = r.Client
			}
			if err != nil {
				return ctrl.Result{}, err
			}

			resourceManager := ssa.NewResourceManager(kubeClient, nil, ssa.Owner{
				Field: r.ControllerName,
				Group: kustomizev1.GroupVersion.Group,
			})

			opts := ssa.DeleteOptions{
				PropagationPolicy: metav1.DeletePropagationBackground,
				Inclusions:        resourceManager.GetOwnerLabels(obj.Name, obj.Namespace),
				Exclusions: map[string]string{
					fmt.Sprintf("%s/prune", kustomizev1.GroupVersion.Group):     kustomizev1.DisabledValue,
					fmt.Sprintf("%s/reconcile", kustomizev1.GroupVersion.Group): kustomizev1.DisabledValue,
				},
			}

			changeSet, err := resourceManager.DeleteAll(ctx, objects, opts)
			if err != nil {
				r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, "pruning for deleted resource failed", nil)
				// Return the error so we retry the failed garbage collection
				return ctrl.Result{}, err
			}

			if changeSet != nil && len(changeSet.Entries) > 0 {
				// Emit event with the resources marked for deletion.
				r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityInfo, changeSet.String(), nil)

				// Wait for the resources marked for deletion to be terminated.
				if obj.GetDeletionPolicy() == kustomizev1.DeletionPolicyWaitForTermination {
					if err := resourceManager.WaitForSetTermination(changeSet, ssa.WaitOptions{
						Interval: 2 * time.Second,
						Timeout:  obj.GetTimeout(),
					}); err != nil {
						// Emit an event and log the error if a timeout occurs.
						msg := "failed to wait for resources termination"
						log.Error(err, msg)
						r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, msg, nil)
					}
				}
			}
		} else {
			// when the account to impersonate is gone, log the stale objects and continue with the finalization
			msg := fmt.Sprintf("unable to prune objects: \n%s", ssautil.FmtUnstructuredList(objects))
			log.Error(fmt.Errorf("skipping pruning, failed to find account to impersonate"), msg)
			r.event(obj, obj.Status.LastAppliedRevision, obj.Status.LastAppliedOriginRevision, eventv1.EventSeverityError, msg, nil)
		}
	}

	// Remove our finalizer from the list and update it.
	controllerutil.RemoveFinalizer(obj, kustomizev1.KustomizationFinalizer)

	// Cleanup caches.
	for _, op := range intcache.AllOperations {
		r.TokenCache.DeleteEventsForObject(kustomizev1.KustomizationKind, obj.GetName(), obj.GetNamespace(), op)
	}

	// Stop reconciliation as the object is being deleted.
	return ctrl.Result{}, nil
}

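// event records a Kubernetes event for the given object, annotating it with
// the revision and origin revision when available and using the Ready
// condition reason as the event reason.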
func (r *KustomizationReconciler) event(obj *kustomizev1.Kustomization,
	revision, originRevision, severity, msg string,
	metadata map[string]string) {
	if metadata == nil {
		metadata = map[string]string{}
	}
	if revision != "" {
		metadata[kustomizev1.GroupVersion.Group+"/"+eventv1.MetaRevisionKey] = revision
	}
	if originRevision != "" {
		metadata[kustomizev1.GroupVersion.Group+"/"+eventv1.MetaOriginRevisionKey] = originRevision
	}

	reason := severity
	if r := conditions.GetReason(obj, meta.ReadyCondition); r != "" {
		reason = r
	}

	eventtype := "Normal"
	if severity == eventv1.EventSeverityError {
		eventtype = "Warning"
	}

	r.EventRecorder.AnnotatedEventf(obj, metadata, eventtype, reason, msg)
}

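// finalizeStatus records the last handled reconcile request, updates the
// Reconciling condition and observed generation based on the Ready state,
// and patches the object's finalizers, status and conditions.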
func (r *KustomizationReconciler) finalizeStatus(ctx context.Context,
	obj *kustomizev1.Kustomization,
	patcher *patch.SerialPatcher) error {
	// Set the value of the reconciliation request in status.
	if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
		obj.Status.LastHandledReconcileAt = v
	}

	// Remove the Reconciling condition and update the observed generation
	// if the reconciliation was successful.
	if conditions.IsTrue(obj, meta.ReadyCondition) {
		conditions.Delete(obj, meta.ReconcilingCondition)
		obj.Status.ObservedGeneration = obj.Generation
	}

	// Set the Reconciling reason to ProgressingWithRetry if the
	// reconciliation has failed.
	if conditions.IsFalse(obj, meta.ReadyCondition) &&
		conditions.Has(obj, meta.ReconcilingCondition) {
		rc := conditions.Get(obj, meta.ReconcilingCondition)
		rc.Reason = meta.ProgressingWithRetryReason
		conditions.Set(obj, rc)
	}

	// Patch finalizers, status and conditions.
	return r.patch(ctx, obj, patcher)
}

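// patch persists the object's finalizers, status and conditions using the
// serial patcher, ignoring not found errors while the object is being
// deleted.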
func (r *KustomizationReconciler) patch(ctx context.Context,
	obj *kustomizev1.Kustomization,
	patcher *patch.SerialPatcher) (retErr error) {

	// Configure the runtime patcher.
	patchOpts := []patch.Option{}
	ownedConditions := []string{
		meta.HealthyCondition,
		meta.ReadyCondition,
		meta.ReconcilingCondition,
		meta.StalledCondition,
	}
	patchOpts = append(patchOpts,
		patch.WithOwnedConditions{Conditions: ownedConditions},
		patch.WithForceOverwriteConditions{},
		patch.WithFieldOwner(r.statusManager),
	)

	// Patch the object status, conditions and finalizers.
	if err := patcher.Patch(ctx, obj, patchOpts...); err != nil {
		if !obj.GetDeletionTimestamp().IsZero() {
			err = kerrors.FilterOut(err, func(e error) bool { return apierrors.IsNotFound(e) })
		}
		retErr = kerrors.NewAggregate([]error{retErr, err})
		if retErr != nil {
			return retErr
		}
	}

	return nil
}

// getClientAndPoller creates a status poller with the custom status readers
// from CEL expressions and the custom job status reader, and returns the
// Kubernetes client of the controller and the status poller.
// Should be used for reconciliations that are not configured to use
// ServiceAccount impersonation or kubeconfig.
func (r *KustomizationReconciler) getClientAndPoller(
	readerCtors []func(apimeta.RESTMapper) engine.StatusReader,
) (client.Client, *polling.StatusPoller) {

	readers := make([]engine.StatusReader, 0, 1+len(readerCtors))
	readers = append(readers, statusreaders.NewCustomJobStatusReader(r.Mapper))
	for _, ctor := range readerCtors {
		readers = append(readers, ctor(r.Mapper))
	}

	poller := polling.NewStatusPoller(r.Client, r.Mapper, polling.Options{
		CustomStatusReaders:  readers,
		ClusterReaderFactory: r.ClusterReader,
	})

	return r.Client, poller
}

// getOriginRevision returns the origin revision of the source artifact,
// or the empty string if it's not present, or if the artifact itself
// is not present.
func getOriginRevision(src sourcev1.Source) string {
	a := src.GetArtifact()
	if a == nil {
		return ""
	}
	return a.Metadata[OCIArtifactOriginRevisionAnnotation]
}