From 07466730c09a11579c143cccaf3b722575c455a1 Mon Sep 17 00:00:00 2001 From: Stefan Prodan Date: Tue, 21 Jun 2022 15:42:33 +0300 Subject: [PATCH] Implement OCIRepository controller for public repos Signed-off-by: Stefan Prodan --- api/v1beta2/ocirepository_types.go | 4 +- .../samples/source_v1beta2_ocirepository.yaml | 9 + controllers/ocirepository_controller.go | 593 ++++++++++++++++++ controllers/ocirepository_controller_test.go | 122 ++++ controllers/suite_test.go | 9 + hack/ci/e2e.sh | 2 + main.go | 13 + 7 files changed, 750 insertions(+), 2 deletions(-) create mode 100644 config/samples/source_v1beta2_ocirepository.yaml create mode 100644 controllers/ocirepository_controller.go create mode 100644 controllers/ocirepository_controller_test.go diff --git a/api/v1beta2/ocirepository_types.go b/api/v1beta2/ocirepository_types.go index f308ae49..bc6d830b 100644 --- a/api/v1beta2/ocirepository_types.go +++ b/api/v1beta2/ocirepository_types.go @@ -158,13 +158,13 @@ func (in *OCIRepository) SetConditions(conditions []metav1.Condition) { in.Status.Conditions = conditions } -// GetRequeueAfter returns the duration after which the GitRepository must be +// GetRequeueAfter returns the duration after which the OCIRepository must be // reconciled again. func (in OCIRepository) GetRequeueAfter() time.Duration { return in.Spec.Interval.Duration } -// GetArtifact returns the latest Artifact from the GitRepository if present in +// GetArtifact returns the latest Artifact from the OCIRepository if present in // the status sub-resource. func (in *OCIRepository) GetArtifact() *Artifact { return in.Status.Artifact diff --git a/config/samples/source_v1beta2_ocirepository.yaml b/config/samples/source_v1beta2_ocirepository.yaml new file mode 100644 index 00000000..2fbdf996 --- /dev/null +++ b/config/samples/source_v1beta2_ocirepository.yaml @@ -0,0 +1,9 @@ +apiVersion: source.toolkit.fluxcd.io/v1beta2 +kind: OCIRepository +metadata: + name: ocirepository-sample +spec: + interval: 1m + url: ghcr.io/stefanprodan/manifests/podinfo + ref: + tag: 6.1.6 diff --git a/controllers/ocirepository_controller.go b/controllers/ocirepository_controller.go new file mode 100644 index 00000000..da4916f6 --- /dev/null +++ b/controllers/ocirepository_controller.go @@ -0,0 +1,593 @@ +/* +Copyright 2022 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "errors" + "fmt" + "os" + "time" + + "github.com/google/go-containerregistry/pkg/crane" + gcrv1 "github.com/google/go-containerregistry/pkg/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/uuid" + kuberecorder "k8s.io/client-go/tools/record" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/ratelimiter" + + "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/conditions" + helper "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/events" + "github.com/fluxcd/pkg/runtime/patch" + "github.com/fluxcd/pkg/runtime/predicates" + "github.com/fluxcd/pkg/untar" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + serror "github.com/fluxcd/source-controller/internal/error" + sreconcile "github.com/fluxcd/source-controller/internal/reconcile" + "github.com/fluxcd/source-controller/internal/reconcile/summarize" +) + +// ociRepositoryReadyCondition contains the information required to summarize a +// v1beta2.OCIRepository Ready Condition. +var ociRepositoryReadyCondition = summarize.Conditions{ + Target: meta.ReadyCondition, + Owned: []string{ + sourcev1.StorageOperationFailedCondition, + sourcev1.FetchFailedCondition, + sourcev1.ArtifactOutdatedCondition, + sourcev1.ArtifactInStorageCondition, + meta.ReadyCondition, + meta.ReconcilingCondition, + meta.StalledCondition, + }, + Summarize: []string{ + sourcev1.StorageOperationFailedCondition, + sourcev1.FetchFailedCondition, + sourcev1.ArtifactOutdatedCondition, + sourcev1.ArtifactInStorageCondition, + meta.StalledCondition, + meta.ReconcilingCondition, + }, + NegativePolarity: []string{ + sourcev1.StorageOperationFailedCondition, + sourcev1.FetchFailedCondition, + sourcev1.ArtifactOutdatedCondition, + meta.StalledCondition, + meta.ReconcilingCondition, + }, +} + +// ociRepositoryFailConditions contains the conditions that represent a failure. +var ociRepositoryFailConditions = []string{ + sourcev1.FetchFailedCondition, + sourcev1.StorageOperationFailedCondition, +} + +// ociRepositoryReconcileFunc is the function type for all the v1beta2.OCIRepository +// (sub)reconcile functions. The type implementations are grouped and +// executed serially to perform the complete reconcile of the object. +type ociRepositoryReconcileFunc func(ctx context.Context, obj *sourcev1.OCIRepository, digest *gcrv1.Hash, dir string) (sreconcile.Result, error) + +// OCIRepositoryReconciler reconciles a v1beta2.OCIRepository object +type OCIRepositoryReconciler struct { + client.Client + helper.Metrics + kuberecorder.EventRecorder + + Storage *Storage + ControllerName string + requeueDependency time.Duration +} + +type OCIRepositoryReconcilerOptions struct { + MaxConcurrentReconciles int + DependencyRequeueInterval time.Duration + RateLimiter ratelimiter.RateLimiter +} + +// SetupWithManager sets up the controller with the Manager. +func (r *OCIRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error { + return r.SetupWithManagerAndOptions(mgr, OCIRepositoryReconcilerOptions{}) +} + +func (r *OCIRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts OCIRepositoryReconcilerOptions) error { + r.requeueDependency = opts.DependencyRequeueInterval + + return ctrl.NewControllerManagedBy(mgr). + For(&sourcev1.OCIRepository{}, builder.WithPredicates( + predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{}), + )). + WithOptions(controller.Options{ + MaxConcurrentReconciles: opts.MaxConcurrentReconciles, + RateLimiter: opts.RateLimiter, + }). + Complete(r) +} + +// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=ocirepositories,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=ocirepositories/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=ocirepositories/finalizers,verbs=get;create;update;patch;delete +// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch + +func (r *OCIRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) { + start := time.Now() + log := ctrl.LoggerFrom(ctx). + // Sets a reconcile ID to correlate logs from all suboperations. + WithValues("reconcileID", uuid.NewUUID()) + + // logger will be associated to the new context that is + // returned from ctrl.LoggerInto. + ctx = ctrl.LoggerInto(ctx, log) + + // Fetch the OCIRepository + obj := &sourcev1.OCIRepository{} + if err := r.Get(ctx, req.NamespacedName, obj); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + // Record suspended status metric + r.RecordSuspend(ctx, obj, obj.Spec.Suspend) + + // Return early if the object is suspended + if obj.Spec.Suspend { + log.Info("reconciliation is suspended for this object") + return ctrl.Result{}, nil + } + + // Initialize the patch helper with the current version of the object. + patchHelper, err := patch.NewHelper(obj, r.Client) + if err != nil { + return ctrl.Result{}, err + } + + // recResult stores the abstracted reconcile result. + var recResult sreconcile.Result + + // Always attempt to patch the object and status after each reconciliation + // NOTE: The final runtime result and error are set in this block. + defer func() { + summarizeHelper := summarize.NewHelper(r.EventRecorder, patchHelper) + summarizeOpts := []summarize.Option{ + summarize.WithConditions(ociRepositoryReadyCondition), + summarize.WithReconcileResult(recResult), + summarize.WithReconcileError(retErr), + summarize.WithIgnoreNotFound(), + summarize.WithProcessors( + summarize.RecordContextualError, + summarize.RecordReconcileReq, + ), + summarize.WithResultBuilder(sreconcile.AlwaysRequeueResultBuilder{RequeueAfter: obj.GetRequeueAfter()}), + summarize.WithPatchFieldOwner(r.ControllerName), + } + result, retErr = summarizeHelper.SummarizeAndPatch(ctx, obj, summarizeOpts...) + + // Always record readiness and duration metrics + r.Metrics.RecordReadiness(ctx, obj) + r.Metrics.RecordDuration(ctx, obj, start) + }() + + // Add finalizer first if not exist to avoid the race condition between init and delete + if !controllerutil.ContainsFinalizer(obj, sourcev1.SourceFinalizer) { + controllerutil.AddFinalizer(obj, sourcev1.SourceFinalizer) + recResult = sreconcile.ResultRequeue + return + } + + // Examine if the object is under deletion + if !obj.ObjectMeta.DeletionTimestamp.IsZero() { + recResult, retErr = r.reconcileDelete(ctx, obj) + return + } + + // Reconcile actual object + reconcilers := []ociRepositoryReconcileFunc{ + r.reconcileStorage, + r.reconcileSource, + r.reconcileArtifact, + } + recResult, retErr = r.reconcile(ctx, obj, reconcilers) + return +} + +// reconcile iterates through the ociRepositoryReconcileFunc tasks for the +// object. It returns early on the first call that returns +// reconcile.ResultRequeue, or produces an error. +func (r *OCIRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.OCIRepository, reconcilers []ociRepositoryReconcileFunc) (sreconcile.Result, error) { + oldObj := obj.DeepCopy() + + // Mark as reconciling if generation differs. + if obj.Generation != obj.Status.ObservedGeneration { + conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) + } + + // Create temp working dir + tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name)) + if err != nil { + e := &serror.Event{ + Err: fmt.Errorf("failed to create temporary working directory: %w", err), + Reason: sourcev1.DirCreationFailedReason, + } + conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error()) + return sreconcile.ResultEmpty, e + } + defer func() { + if err = os.RemoveAll(tmpDir); err != nil { + ctrl.LoggerFrom(ctx).Error(err, "failed to remove temporary working directory") + } + }() + conditions.Delete(obj, sourcev1.StorageOperationFailedCondition) + + hs := gcrv1.Hash{} + var ( + res sreconcile.Result + resErr error + digest = hs.DeepCopy() + ) + + // Run the sub-reconcilers and build the result of reconciliation. + for _, rec := range reconcilers { + recResult, err := rec(ctx, obj, digest, tmpDir) + // Exit immediately on ResultRequeue. + if recResult == sreconcile.ResultRequeue { + return sreconcile.ResultRequeue, nil + } + // If an error is received, prioritize the returned results because an + // error also means immediate requeue. + if err != nil { + resErr = err + res = recResult + break + } + // Prioritize requeue request in the result. + res = sreconcile.LowestRequeuingResult(res, recResult) + } + + r.notify(ctx, oldObj, obj, digest, res, resErr) + + return res, resErr +} + +// notify emits notification related to the reconciliation. +func (r *OCIRepositoryReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.OCIRepository, digest *gcrv1.Hash, res sreconcile.Result, resErr error) { + // Notify successful reconciliation for new artifact and recovery from any + // failure. + if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil { + annotations := map[string]string{ + sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision, + sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum, + } + + var oldChecksum string + if oldObj.GetArtifact() != nil { + oldChecksum = oldObj.GetArtifact().Checksum + } + + message := fmt.Sprintf("stored artifact with digest '%s' from '%s'", digest.String(), newObj.Spec.URL) + + // Notify on new artifact and failure recovery. + if oldChecksum != newObj.GetArtifact().Checksum { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + "NewArtifact", message) + ctrl.LoggerFrom(ctx).Info(message) + } else { + if sreconcile.FailureRecovery(oldObj, newObj, ociRepositoryFailConditions) { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + meta.SucceededReason, message) + ctrl.LoggerFrom(ctx).Info(message) + } + } + } +} + +// reconcileSource fetches the upstream OCI artifact content. +// If this fails, it records v1beta2.FetchFailedCondition=True on the object and returns early. +func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, obj *sourcev1.OCIRepository, digest *gcrv1.Hash, dir string) (sreconcile.Result, error) { + ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration) + defer cancel() + + url := obj.Spec.URL + if obj.Spec.Reference != nil { + if obj.Spec.Reference.Tag != "" { + url = fmt.Sprintf("%s:%s", obj.Spec.URL, obj.Spec.Reference.Tag) + } + if obj.Spec.Reference.Digest != "" { + url = fmt.Sprintf("%s@%s", obj.Spec.URL, obj.Spec.Reference.Digest) + } + } + + // Pull OCI artifact + img, err := crane.Pull(url, r.craneOptions(ctxTimeout)...) + if err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.OCIOperationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + return sreconcile.ResultEmpty, e + } + + // Fetch digest + imgDigest, err := img.Digest() + if err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.OCIOperationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + return sreconcile.ResultEmpty, e + } + + // Set revision from digest hex + imgDigest.DeepCopyInto(digest) + revision := imgDigest.Hex + + // Mark observations about the revision on the object + defer func() { + if !obj.GetArtifact().HasRevision(revision) { + message := fmt.Sprintf("new upstream revision '%s'", revision) + conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message) + conditions.MarkReconciling(obj, "NewRevision", message) + } + }() + + // Extract the content of the first artifact layer + if !obj.GetArtifact().HasRevision(revision) { + layers, err := img.Layers() + if err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.OCIOperationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + return sreconcile.ResultEmpty, e + } + + if len(layers) < 1 { + err = fmt.Errorf("no layers found in artifact") + e := &serror.Event{Err: err, Reason: sourcev1.OCIOperationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + return sreconcile.ResultEmpty, e + } + + blob, err := layers[0].Compressed() + if err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.OCIOperationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + return sreconcile.ResultEmpty, e + } + + if _, err = untar.Untar(blob, dir); err != nil { + e := &serror.Event{Err: err, Reason: sourcev1.OCIOperationFailedReason} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + return sreconcile.ResultEmpty, e + } + } + + conditions.Delete(obj, sourcev1.FetchFailedCondition) + return sreconcile.ResultSuccess, nil +} + +// reconcileStorage ensures the current state of the storage matches the +// desired and previously observed state. +// +// All Artifacts for the object except for the current one in the Status are +// garbage collected from the Storage. +// If the Artifact in the Status of the object disappeared from the Storage, +// it is removed from the object. +// If the object does not have an Artifact in its Status, a Reconciling +// condition is added. +// The hostname of any URL in the Status of the object are updated, to ensure +// they match the Storage server hostname of current runtime. +func (r *OCIRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.OCIRepository, _ *gcrv1.Hash, _ string) (sreconcile.Result, error) { + // Garbage collect previous advertised artifact(s) from storage + _ = r.garbageCollect(ctx, obj) + + // Determine if the advertised artifact is still in storage + if artifact := obj.GetArtifact(); artifact != nil && !r.Storage.ArtifactExist(*artifact) { + obj.Status.Artifact = nil + obj.Status.URL = "" + // Remove the condition as the artifact doesn't exist. + conditions.Delete(obj, sourcev1.ArtifactInStorageCondition) + } + + // Record that we do not have an artifact + if obj.GetArtifact() == nil { + conditions.MarkReconciling(obj, "NoArtifact", "no artifact for resource in storage") + conditions.Delete(obj, sourcev1.ArtifactInStorageCondition) + return sreconcile.ResultSuccess, nil + } + + // Always update URLs to ensure hostname is up-to-date + r.Storage.SetArtifactURL(obj.GetArtifact()) + obj.Status.URL = r.Storage.SetHostname(obj.Status.URL) + + return sreconcile.ResultSuccess, nil +} + +// reconcileArtifact archives a new Artifact to the Storage, if the current +// (Status) data on the object does not match the given. +// +// The inspection of the given data to the object is differed, ensuring any +// stale observations like v1beta2.ArtifactOutdatedCondition are removed. +// If the given Artifact does not differ from the object's current, it returns +// early. +// On a successful archive, the Artifact in the Status of the object is set, +// and the symlink in the Storage is updated to its path. +func (r *OCIRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.OCIRepository, digest *gcrv1.Hash, dir string) (sreconcile.Result, error) { + // Calculate revision + revision := digest.Hex + + // Create artifact + artifact := r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision)) + + // Set the ArtifactInStorageCondition if there's no drift. + defer func() { + if obj.GetArtifact().HasRevision(artifact.Revision) { + conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition) + conditions.MarkTrue(obj, sourcev1.ArtifactInStorageCondition, meta.SucceededReason, + "stored artifact for revision '%s'", artifact.Revision) + } + }() + + // The artifact is up-to-date + if obj.GetArtifact().HasRevision(artifact.Revision) { + r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.ArtifactUpToDateReason, "artifact up-to-date with remote revision: '%s'", artifact.Revision) + return sreconcile.ResultSuccess, nil + } + + // Ensure target path exists and is a directory + if f, err := os.Stat(dir); err != nil { + e := &serror.Event{ + Err: fmt.Errorf("failed to stat source path: %w", err), + Reason: sourcev1.StatOperationFailedReason, + } + conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error()) + return sreconcile.ResultEmpty, e + } else if !f.IsDir() { + e := &serror.Event{ + Err: fmt.Errorf("source path '%s' is not a directory", dir), + Reason: sourcev1.InvalidPathReason, + } + conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error()) + return sreconcile.ResultEmpty, e + } + + // Ensure artifact directory exists and acquire lock + if err := r.Storage.MkdirAll(artifact); err != nil { + e := &serror.Event{ + Err: fmt.Errorf("failed to create artifact directory: %w", err), + Reason: sourcev1.DirCreationFailedReason, + } + conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error()) + return sreconcile.ResultEmpty, e + } + unlock, err := r.Storage.Lock(artifact) + if err != nil { + return sreconcile.ResultEmpty, &serror.Event{ + Err: fmt.Errorf("failed to acquire lock for artifact: %w", err), + Reason: meta.FailedReason, + } + } + defer unlock() + + // Archive directory to storage + if err := r.Storage.Archive(&artifact, dir, nil); err != nil { + e := &serror.Event{ + Err: fmt.Errorf("unable to archive artifact to storage: %s", err), + Reason: sourcev1.ArchiveOperationFailedReason, + } + conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error()) + return sreconcile.ResultEmpty, e + } + + // Record it on the object + obj.Status.Artifact = artifact.DeepCopy() + + // Update symlink on a "best effort" basis + url, err := r.Storage.Symlink(artifact, "latest.tar.gz") + if err != nil { + r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.SymlinkUpdateFailedReason, + "failed to update status URL symlink: %s", err) + } + if url != "" { + obj.Status.URL = url + } + conditions.Delete(obj, sourcev1.StorageOperationFailedCondition) + return sreconcile.ResultSuccess, nil +} + +// reconcileDelete handles the deletion of the object. +// It first garbage collects all Artifacts for the object from the Storage. +// Removing the finalizer from the object if successful. +func (r *OCIRepositoryReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.OCIRepository) (sreconcile.Result, error) { + // Garbage collect the resource's artifacts + if err := r.garbageCollect(ctx, obj); err != nil { + // Return the error so we retry the failed garbage collection + return sreconcile.ResultEmpty, err + } + + // Remove our finalizer from the list + controllerutil.RemoveFinalizer(obj, sourcev1.SourceFinalizer) + + // Stop reconciliation as the object is being deleted + return sreconcile.ResultEmpty, nil +} + +// garbageCollect performs a garbage collection for the given object. +// +// It removes all but the current Artifact from the Storage, unless the +// deletion timestamp on the object is set. Which will result in the +// removal of all Artifacts for the objects. +func (r *OCIRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourcev1.OCIRepository) error { + if !obj.DeletionTimestamp.IsZero() { + if deleted, err := r.Storage.RemoveAll(r.Storage.NewArtifactFor(obj.Kind, obj.GetObjectMeta(), "", "*")); err != nil { + return serror.NewGeneric( + fmt.Errorf("garbage collection for deleted resource failed: %w", err), + "GarbageCollectionFailed", + ) + } else if deleted != "" { + r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", + "garbage collected artifacts for deleted resource") + } + obj.Status.Artifact = nil + return nil + } + if obj.GetArtifact() != nil { + delFiles, err := r.Storage.GarbageCollect(ctx, *obj.GetArtifact(), time.Second*5) + if err != nil { + return serror.NewGeneric( + fmt.Errorf("garbage collection of artifacts failed: %w", err), + "GarbageCollectionFailed", + ) + } + if len(delFiles) > 0 { + r.eventLogf(ctx, obj, events.EventTypeTrace, "GarbageCollectionSucceeded", + fmt.Sprintf("garbage collected %d artifacts", len(delFiles))) + return nil + } + } + return nil +} + +// eventLogf records events, and logs at the same time. +// +// This log is different from the debug log in the EventRecorder, in the sense +// that this is a simple log. While the debug log contains complete details +// about the event. +func (r *OCIRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) { + msg := fmt.Sprintf(messageFmt, args...) + // Log and emit event. + if eventType == corev1.EventTypeWarning { + ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg) + } else { + ctrl.LoggerFrom(ctx).Info(msg) + } + r.Eventf(obj, eventType, reason, msg) +} + +func (r *OCIRepositoryReconciler) craneOptions(ctx context.Context) []crane.Option { + return []crane.Option{ + crane.WithContext(ctx), + crane.WithUserAgent("flux/v2"), + crane.WithPlatform(&gcrv1.Platform{ + Architecture: "flux", + OS: "flux", + OSVersion: "v2", + }), + } +} diff --git a/controllers/ocirepository_controller_test.go b/controllers/ocirepository_controller_test.go new file mode 100644 index 00000000..561ef42e --- /dev/null +++ b/controllers/ocirepository_controller_test.go @@ -0,0 +1,122 @@ +package controllers + +import ( + "testing" + "time" + + "github.com/darkowlzz/controller-check/status" + "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/conditions" + "github.com/fluxcd/pkg/runtime/patch" + sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" + . "github.com/onsi/gomega" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kstatus "sigs.k8s.io/cli-utils/pkg/kstatus/status" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func TestOCIRepository_Reconcile(t *testing.T) { + + tests := []struct { + name string + url string + tag string + digest string + }{ + { + name: "public latest", + url: "ghcr.io/stefanprodan/manifests/podinfo", + tag: "6.1.6", + digest: "3b6cdcc7adcc9a84d3214ee1c029543789d90b5ae69debe9efa3f66e982875de", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + ns, err := testEnv.CreateNamespace(ctx, "ocirepository-reconcile-test") + g.Expect(err).ToNot(HaveOccurred()) + defer func() { g.Expect(testEnv.Delete(ctx, ns)).To(Succeed()) }() + + obj := &sourcev1.OCIRepository{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "ocirepository-reconcile", + Namespace: ns.Name, + }, + Spec: sourcev1.OCIRepositorySpec{ + URL: tt.url, + Interval: metav1.Duration{Duration: 60 * time.Minute}, + Reference: &sourcev1.OCIRepositoryRef{ + Tag: tt.tag, + }, + }, + } + + g.Expect(testEnv.Create(ctx, obj)).To(Succeed()) + + key := client.ObjectKey{Name: obj.Name, Namespace: obj.Namespace} + + // Wait for the finalizer to be set + g.Eventually(func() bool { + if err := testEnv.Get(ctx, key, obj); err != nil { + return false + } + return len(obj.Finalizers) > 0 + }, timeout).Should(BeFalse()) + + // Wait for the object to be Ready + g.Eventually(func() bool { + if err := testEnv.Get(ctx, key, obj); err != nil { + return false + } + if !conditions.IsReady(obj) { + return false + } + readyCondition := conditions.Get(obj, meta.ReadyCondition) + return obj.Generation == readyCondition.ObservedGeneration && + obj.Generation == obj.Status.ObservedGeneration + }, timeout).Should(BeTrue()) + + // Check if the revision is set to the digest format + g.Expect(obj.Status.Artifact.Revision).To(Equal(tt.digest)) + + // Check if the object status is valid + condns := &status.Conditions{NegativePolarity: ociRepositoryReadyCondition.NegativePolarity} + checker := status.NewChecker(testEnv.Client, condns) + checker.CheckErr(ctx, obj) + + // kstatus client conformance check + u, err := patch.ToUnstructured(obj) + g.Expect(err).ToNot(HaveOccurred()) + res, err := kstatus.Compute(u) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(res.Status).To(Equal(kstatus.CurrentStatus)) + + // Patch the object with reconcile request annotation. + patchHelper, err := patch.NewHelper(obj, testEnv.Client) + g.Expect(err).ToNot(HaveOccurred()) + annotations := map[string]string{ + meta.ReconcileRequestAnnotation: "now", + } + obj.SetAnnotations(annotations) + g.Expect(patchHelper.Patch(ctx, obj)).ToNot(HaveOccurred()) + g.Eventually(func() bool { + if err := testEnv.Get(ctx, key, obj); err != nil { + return false + } + return obj.Status.LastHandledReconcileAt == "now" + }, timeout).Should(BeTrue()) + + // Wait for the object to be deleted + g.Expect(testEnv.Delete(ctx, obj)).To(Succeed()) + g.Eventually(func() bool { + if err := testEnv.Get(ctx, key, obj); err != nil { + return apierrors.IsNotFound(err) + } + return false + }, timeout).Should(BeTrue()) + }) + } +} diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 011b5de7..39711a2d 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -235,6 +235,15 @@ func TestMain(m *testing.M) { testCache = cache.New(5, 1*time.Second) cacheRecorder := cache.MustMakeMetrics() + if err := (&OCIRepositoryReconciler{ + Client: testEnv, + EventRecorder: record.NewFakeRecorder(32), + Metrics: testMetricsH, + Storage: testStorage, + }).SetupWithManager(testEnv); err != nil { + panic(fmt.Sprintf("Failed to start OCIRepositoryReconciler: %v", err)) + } + if err := (&HelmRepositoryReconciler{ Client: testEnv, EventRecorder: record.NewFakeRecorder(32), diff --git a/hack/ci/e2e.sh b/hack/ci/e2e.sh index cbeac1d8..ccb2540f 100755 --- a/hack/ci/e2e.sh +++ b/hack/ci/e2e.sh @@ -36,6 +36,7 @@ function cleanup(){ kubectl -n kube-system describe pods kubectl -n source-system describe pods kubectl -n source-system get gitrepositories -oyaml + kubectl -n source-system get ocirepositories -oyaml kubectl -n source-system get helmrepositories -oyaml kubectl -n source-system get helmcharts -oyaml kubectl -n source-system get all @@ -72,6 +73,7 @@ echo "Run smoke tests" kubectl -n source-system apply -f "${ROOT_DIR}/config/samples" kubectl -n source-system rollout status deploy/source-controller --timeout=1m kubectl -n source-system wait gitrepository/gitrepository-sample --for=condition=ready --timeout=1m +kubectl -n source-system wait ocirepository/ocirepository-sample --for=condition=ready --timeout=1m kubectl -n source-system wait helmrepository/helmrepository-sample --for=condition=ready --timeout=1m kubectl -n source-system wait helmchart/helmchart-sample --for=condition=ready --timeout=1m kubectl -n source-system delete -f "${ROOT_DIR}/config/samples" diff --git a/main.go b/main.go index 0121fd62..621cea36 100644 --- a/main.go +++ b/main.go @@ -309,6 +309,19 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "Bucket") os.Exit(1) } + if err = (&controllers.OCIRepositoryReconciler{ + Client: mgr.GetClient(), + Storage: storage, + EventRecorder: eventRecorder, + ControllerName: controllerName, + Metrics: metricsH, + }).SetupWithManagerAndOptions(mgr, controllers.OCIRepositoryReconcilerOptions{ + MaxConcurrentReconciles: concurrent, + RateLimiter: helper.GetRateLimiter(rateLimiterOptions), + }); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "OCIRepository") + os.Exit(1) + } // +kubebuilder:scaffold:builder go func() {