From 89ba8374b673cd30da9adf242ecab77842026a75 Mon Sep 17 00:00:00 2001 From: Hidde Beydals Date: Sat, 31 Jul 2021 03:58:43 +0200 Subject: [PATCH] Rewrite `BucketReconciler` to new standards This commit rewrites the `BucketReconciler` to new standards, while implementing the newly introduced Condition types, and trying to adhere better to Kubernetes API conventions. More specifically it introduces: - Implementation of more explicit Condition types to highlight abnormalities. - Extensive usage of the `conditions` subpackage from `runtime`. - Better and more conflict-resilient (status)patching of reconciled objects using the `patch` subpackage from runtime. - Proper implementation of kstatus' `Reconciling` and `Stalled` conditions. - Refactor of reconciler logic, including more efficient detection of changes to bucket objects by making use of the etag data available, and downloading of object files in parallel with a limited number of workers (4). - Integration tests that solely rely on `testenv` and do not use Ginkgo. There are a couple of TODOs marked in-code, these are suggestions for the future and should be non-blocking. In addition to the TODOs, more complex and/or edge-case test scenarios may be added as well. Signed-off-by: Hidde Beydals --- api/v1beta2/bucket_types.go | 60 +- controllers/bucket_controller.go | 1044 ++++++++++++++----------- controllers/bucket_controller_test.go | 683 +++++++++++++++- controllers/suite_test.go | 9 + main.go | 9 +- 5 files changed, 1288 insertions(+), 517 deletions(-) diff --git a/api/v1beta2/bucket_types.go b/api/v1beta2/bucket_types.go index 4626f169..d074fc60 100644 --- a/api/v1beta2/bucket_types.go +++ b/api/v1beta2/bucket_types.go @@ -19,12 +19,10 @@ package v1beta2 import ( "time" - apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/fluxcd/pkg/apis/acl" "github.com/fluxcd/pkg/apis/meta" - "github.com/fluxcd/pkg/runtime/conditions" ) const ( @@ -32,6 +30,19 @@ const ( BucketKind = "Bucket" ) +const ( + GenericBucketProvider string = "generic" + AmazonBucketProvider string = "aws" + GoogleBucketProvider string = "gcp" +) + +const ( + // DownloadFailedCondition indicates a transient or persistent download failure. If True, observations on the + // upstream Source revision are not possible, and the Artifact available for the Source may be outdated. + // This is a "negative polarity" or "abnormal-true" type, and is only present on the resource if it is True. + DownloadFailedCondition string = "DownloadFailed" +) + // BucketSpec defines the desired state of an S3 compatible bucket type BucketSpec struct { // The S3 compatible storage provider name, default ('generic'). @@ -85,12 +96,6 @@ type BucketSpec struct { AccessFrom *acl.AccessFrom `json:"accessFrom,omitempty"` } -const ( - GenericBucketProvider string = "generic" - AmazonBucketProvider string = "aws" - GoogleBucketProvider string = "gcp" -) - // BucketStatus defines the observed state of a bucket type BucketStatus struct { // ObservedGeneration is the last observed generation. @@ -122,45 +127,6 @@ const ( BucketOperationFailedReason string = "BucketOperationFailed" ) -// BucketProgressing resets the conditions of the Bucket to metav1.Condition of -// type meta.ReadyCondition with status 'Unknown' and meta.ProgressingReason -// reason and message. It returns the modified Bucket. 
-func BucketProgressing(bucket Bucket) Bucket { - bucket.Status.ObservedGeneration = bucket.Generation - bucket.Status.URL = "" - bucket.Status.Conditions = []metav1.Condition{} - conditions.MarkUnknown(&bucket, meta.ReadyCondition, meta.ProgressingReason, "reconciliation in progress") - return bucket -} - -// BucketReady sets the given Artifact and URL on the Bucket and sets the -// meta.ReadyCondition to 'True', with the given reason and message. It returns -// the modified Bucket. -func BucketReady(bucket Bucket, artifact Artifact, url, reason, message string) Bucket { - bucket.Status.Artifact = &artifact - bucket.Status.URL = url - conditions.MarkTrue(&bucket, meta.ReadyCondition, reason, message) - return bucket -} - -// BucketNotReady sets the meta.ReadyCondition on the Bucket to 'False', with -// the given reason and message. It returns the modified Bucket. -func BucketNotReady(bucket Bucket, reason, message string) Bucket { - conditions.MarkFalse(&bucket, meta.ReadyCondition, reason, message) - return bucket -} - -// BucketReadyMessage returns the message of the metav1.Condition of type -// meta.ReadyCondition with status 'True' if present, or an empty string. -func BucketReadyMessage(bucket Bucket) string { - if c := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); c != nil { - if c.Status == metav1.ConditionTrue { - return c.Message - } - } - return "" -} - // GetConditions returns the status conditions of the object. func (in Bucket) GetConditions() []metav1.Condition { return in.Status.Conditions diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index 44b9ee03..e97f97b2 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -18,24 +18,27 @@ package controllers import ( "context" - "crypto/sha1" + "crypto/sha256" "fmt" "os" "path/filepath" + "sort" "strings" "time" + gcpstorage "cloud.google.com/go/storage" + "github.com/fluxcd/source-controller/pkg/gcp" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/s3utils" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" "google.golang.org/api/option" corev1 "k8s.io/api/core/v1" - apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + kerrors "k8s.io/apimachinery/pkg/util/errors" kuberecorder "k8s.io/client-go/tools/record" - "k8s.io/client-go/tools/reference" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -43,10 +46,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/fluxcd/pkg/apis/meta" - "github.com/fluxcd/pkg/runtime/events" - "github.com/fluxcd/pkg/runtime/metrics" + "github.com/fluxcd/pkg/runtime/conditions" + helper "github.com/fluxcd/pkg/runtime/controller" + "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" - "github.com/fluxcd/source-controller/pkg/gcp" sourcev1 "github.com/fluxcd/source-controller/api/v1beta2" "github.com/fluxcd/source-controller/pkg/sourceignore" @@ -60,11 +63,10 @@ import ( // BucketReconciler reconciles a Bucket object type BucketReconciler struct { client.Client - Scheme *runtime.Scheme - Storage *Storage - EventRecorder kuberecorder.EventRecorder - ExternalEventRecorder *events.Recorder - MetricsRecorder *metrics.Recorder + kuberecorder.EventRecorder + helper.Metrics + + Storage *Storage } type 
BucketReconcilerOptions struct { @@ -83,244 +85,434 @@ func (r *BucketReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts Buc Complete(r) } -func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) { start := time.Now() log := ctrl.LoggerFrom(ctx) - var bucket sourcev1.Bucket - if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil { + // Fetch the Bucket + obj := &sourcev1.Bucket{} + if err := r.Get(ctx, req.NamespacedName, obj); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } // Record suspended status metric - defer r.recordSuspension(ctx, bucket) + r.RecordSuspend(ctx, obj, obj.Spec.Suspend) - // Add our finalizer if it does not exist - if !controllerutil.ContainsFinalizer(&bucket, sourcev1.SourceFinalizer) { - patch := client.MergeFrom(bucket.DeepCopy()) - controllerutil.AddFinalizer(&bucket, sourcev1.SourceFinalizer) - if err := r.Patch(ctx, &bucket, patch); err != nil { - log.Error(err, "unable to register finalizer") - return ctrl.Result{}, err - } - } - - // Examine if the object is under deletion - if !bucket.ObjectMeta.DeletionTimestamp.IsZero() { - return r.reconcileDelete(ctx, bucket) - } - - // Return early if the object is suspended. - if bucket.Spec.Suspend { + // Return early if the object is suspended + if obj.Spec.Suspend { log.Info("Reconciliation is suspended for this object") return ctrl.Result{}, nil } - // record reconciliation duration - if r.MetricsRecorder != nil { - objRef, err := reference.GetReference(r.Scheme, &bucket) - if err != nil { - return ctrl.Result{}, err - } - defer r.MetricsRecorder.RecordDuration(*objRef, start) - } - - // set initial status - if resetBucket, ok := r.resetStatus(bucket); ok { - bucket = resetBucket - if err := r.updateStatus(ctx, req, bucket.Status); err != nil { - log.Error(err, "unable to update status") - return ctrl.Result{Requeue: true}, err - } - r.recordReadiness(ctx, bucket) - } - - // record the value of the reconciliation request, if any - // TODO(hidde): would be better to defer this in combination with - // always patching the status sub-resource after a reconciliation. 
- if v, ok := meta.ReconcileAnnotationValue(bucket.GetAnnotations()); ok { - bucket.Status.SetLastHandledReconcileRequest(v) - } - - // purge old artifacts from storage - if err := r.gc(bucket); err != nil { - log.Error(err, "unable to purge old artifacts") - } - - // reconcile bucket by downloading its content - reconciledBucket, reconcileErr := r.reconcile(ctx, *bucket.DeepCopy()) - - // update status with the reconciliation result - if err := r.updateStatus(ctx, req, reconciledBucket.Status); err != nil { - log.Error(err, "unable to update status") - return ctrl.Result{Requeue: true}, err - } - - // if reconciliation failed, record the failure and requeue immediately - if reconcileErr != nil { - r.event(ctx, reconciledBucket, events.EventSeverityError, reconcileErr.Error()) - r.recordReadiness(ctx, reconciledBucket) - return ctrl.Result{Requeue: true}, reconcileErr - } - - // emit revision change event - if bucket.Status.Artifact == nil || reconciledBucket.Status.Artifact.Revision != bucket.Status.Artifact.Revision { - r.event(ctx, reconciledBucket, events.EventSeverityInfo, sourcev1.BucketReadyMessage(reconciledBucket)) - } - r.recordReadiness(ctx, reconciledBucket) - - log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s", - time.Since(start).String(), - bucket.GetRequeueAfter().String(), - )) - - return ctrl.Result{RequeueAfter: bucket.GetRequeueAfter()}, nil -} - -func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket) (sourcev1.Bucket, error) { - log := ctrl.LoggerFrom(ctx) - var err error - var sourceBucket sourcev1.Bucket - - tempDir, err := os.MkdirTemp("", bucket.Name) + // Initialize the patch helper + patchHelper, err := patch.NewHelper(obj, r.Client) if err != nil { - err = fmt.Errorf("tmp dir error: %w", err) - return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err + return ctrl.Result{}, err } + + // Always attempt to patch the object and status after each reconciliation defer func() { - if err := os.RemoveAll(tempDir); err != nil { - log.Error(err, "failed to remove working directory", "path", tempDir) + // Record the value of the reconciliation request, if any + if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok { + obj.Status.SetLastHandledReconcileRequest(v) } + + // Summarize the Ready condition based on abnormalities that may have been observed + conditions.SetSummary(obj, + meta.ReadyCondition, + conditions.WithConditions( + sourcev1.ArtifactOutdatedCondition, + sourcev1.DownloadFailedCondition, + sourcev1.ArtifactUnavailableCondition, + ), + conditions.WithNegativePolarityConditions( + sourcev1.ArtifactOutdatedCondition, + sourcev1.DownloadFailedCondition, + sourcev1.ArtifactUnavailableCondition, + ), + ) + + // Patch the object, ignoring conflicts on the conditions owned by this controller + patchOpts := []patch.Option{ + patch.WithOwnedConditions{ + Conditions: []string{ + sourcev1.ArtifactOutdatedCondition, + sourcev1.DownloadFailedCondition, + sourcev1.ArtifactUnavailableCondition, + meta.ReadyCondition, + meta.ReconcilingCondition, + meta.StalledCondition, + }, + }, + } + + // Determine if the resource is still being reconciled, or if it has stalled, and record this observation + if retErr == nil && (result.IsZero() || !result.Requeue) { + // We are no longer reconciling + conditions.Delete(obj, meta.ReconcilingCondition) + + // We have now observed this generation + patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{}) + + readyCondition := 
conditions.Get(obj, meta.ReadyCondition)
+			switch readyCondition.Status {
+			case metav1.ConditionFalse:
+				// As we are no longer reconciling and the end-state is not ready, the reconciliation has stalled
+				conditions.MarkStalled(obj, readyCondition.Reason, readyCondition.Message)
+			case metav1.ConditionTrue:
+				// As we are no longer reconciling and the end-state is ready, the reconciliation is no longer stalled
+				conditions.Delete(obj, meta.StalledCondition)
+			}
+		}
+
+		// Finally, patch the resource
+		if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
+			retErr = kerrors.NewAggregate([]error{retErr, err})
+		}
+
+		// Always record readiness and duration metrics
+		r.Metrics.RecordReadiness(ctx, obj)
+		r.Metrics.RecordDuration(ctx, obj, start)
 	}()
 
-	if bucket.Spec.Provider == sourcev1.GoogleBucketProvider {
-		sourceBucket, err = r.reconcileWithGCP(ctx, bucket, tempDir)
-		if err != nil {
-			return sourceBucket, err
-		}
-	} else {
-		sourceBucket, err = r.reconcileWithMinio(ctx, bucket, tempDir)
-		if err != nil {
-			return sourceBucket, err
-		}
-	}
-	revision, err := r.checksum(tempDir)
-	if err != nil {
-		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
+	// Add finalizer first if it does not exist to avoid the race condition between init and delete
+	if !controllerutil.ContainsFinalizer(obj, sourcev1.SourceFinalizer) {
+		controllerutil.AddFinalizer(obj, sourcev1.SourceFinalizer)
+		return ctrl.Result{Requeue: true}, nil
 	}
 
-	// return early on unchanged revision
-	artifact := r.Storage.NewArtifactFor(bucket.Kind, bucket.GetObjectMeta(), revision, fmt.Sprintf("%s.tar.gz", revision))
-	if apimeta.IsStatusConditionTrue(bucket.Status.Conditions, meta.ReadyCondition) && bucket.GetArtifact().HasRevision(artifact.Revision) {
-		if artifact.URL != bucket.GetArtifact().URL {
-			r.Storage.SetArtifactURL(bucket.GetArtifact())
-			bucket.Status.URL = r.Storage.SetHostname(bucket.Status.URL)
-		}
-		return bucket, nil
+	// Examine if the object is under deletion
+	if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
+		return r.reconcileDelete(ctx, obj)
 	}
 
-	// create artifact dir
-	err = r.Storage.MkdirAll(artifact)
-	if err != nil {
-		err = fmt.Errorf("mkdir dir error: %w", err)
-		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
-	}
-
-	// acquire lock
-	unlock, err := r.Storage.Lock(artifact)
-	if err != nil {
-		err = fmt.Errorf("unable to acquire lock: %w", err)
-		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
-	}
-	defer unlock()
-
-	// archive artifact and check integrity
-	if err := r.Storage.Archive(&artifact, tempDir, nil); err != nil {
-		err = fmt.Errorf("storage archive error: %w", err)
-		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
-	}
-
-	// update latest symlink
-	url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
-	if err != nil {
-		err = fmt.Errorf("storage symlink error: %w", err)
-		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
-	}
-
-	message := fmt.Sprintf("Fetched revision: %s", artifact.Revision)
-	return sourcev1.BucketReady(bucket, artifact, url, sourcev1.BucketOperationSucceedReason, message), nil
+	// Reconcile actual object
+	return r.reconcile(ctx, obj)
 }
 
-func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) {
-	if err := r.gc(bucket); err != nil {
-		r.event(ctx, bucket, events.EventSeverityError,
-			fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
-		// Return the error so we retry the failed garbage collection
+// reconcile steps through the actual reconciliation tasks for the object; it returns early on the first step that
+// produces an error.
+func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
+	// Mark the resource as under reconciliation
+	conditions.MarkReconciling(obj, meta.ProgressingReason, "")
+
+	// Reconcile the storage data
+	if result, err := r.reconcileStorage(ctx, obj); err != nil || result.IsZero() {
+		return result, err
+	}
+
+	// Create temp working dir
+	tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name))
+	if err != nil {
+		r.Eventf(obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason, "Failed to create temporary directory: %s", err)
+		return ctrl.Result{}, err
+	}
+	defer os.RemoveAll(tmpDir)
+
+	// Reconcile the source from upstream
+	var artifact sourcev1.Artifact
+	if result, err := r.reconcileSource(ctx, obj, &artifact, tmpDir); err != nil || result.IsZero() {
+		return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, err
+	}
+
+	// Reconcile the artifact to storage
+	if result, err := r.reconcileArtifact(ctx, obj, artifact, tmpDir); err != nil || result.IsZero() {
+		return result, err
+	}
+
+	return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
+}
+
+// reconcileStorage ensures the current state of the storage matches the desired and previously observed state.
+//
+// All artifacts for the resource except for the current one are garbage collected from the storage.
+// If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
+// If the object does not have an artifact in its Status object, a v1beta2.ArtifactUnavailableCondition is set.
+// If the hostname of the URLs on the object does not match the current storage server hostname, they are updated.
+//
+// The caller should assume a failure if an error is returned, or the Result is zero.
+func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
+	// Garbage collect previous advertised artifact(s) from storage
+	_ = r.garbageCollect(ctx, obj)
+
+	// Determine if the advertised artifact is still in storage
+	if artifact := obj.GetArtifact(); artifact != nil && !r.Storage.ArtifactExist(*artifact) {
+		obj.Status.Artifact = nil
+		obj.Status.URL = ""
+	}
+
+	// Record that we do not have an artifact
+	if obj.GetArtifact() == nil {
+		conditions.MarkTrue(obj, sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage")
+		return ctrl.Result{Requeue: true}, nil
+	}
+	conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
+
+	// Always update URLs to ensure hostname is up-to-date
+	// TODO(hidde): we may want to send out an event only if we notice the URL has changed
+	r.Storage.SetArtifactURL(obj.GetArtifact())
+	obj.Status.URL = r.Storage.SetHostname(obj.Status.URL)
+
+	return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
+}
+
+// reconcileSource reconciles the upstream bucket with the client for the given object's Provider, and returns the
+// result.
+// If a SecretRef is defined, it attempts to fetch the Secret before calling the provider. If the fetch of the Secret
+// fails, it records v1beta2.DownloadFailedCondition=True and returns early.
+//
+// The caller should assume a failure if an error is returned, or the Result is zero.
+func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, dir string) (ctrl.Result, error) {
+	var secret *corev1.Secret
+	if obj.Spec.SecretRef != nil {
+		secretName := types.NamespacedName{
+			Namespace: obj.GetNamespace(),
+			Name:      obj.Spec.SecretRef.Name,
+		}
+		secret = &corev1.Secret{}
+		if err := r.Get(ctx, secretName, secret); err != nil {
+			conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.AuthenticationFailedReason,
+				"Failed to get secret '%s': %s", secretName.String(), err.Error())
+			r.Eventf(obj, corev1.EventTypeWarning, sourcev1.AuthenticationFailedReason,
+				"Failed to get secret '%s': %s", secretName.String(), err.Error())
+			// Return error as the world as observed may change
+			return ctrl.Result{}, err
+		}
+	}
+
+	switch obj.Spec.Provider {
+	case sourcev1.GoogleBucketProvider:
+		return r.reconcileGCPSource(ctx, obj, artifact, secret, dir)
+	default:
+		return r.reconcileMinioSource(ctx, obj, artifact, secret, dir)
+	}
+}
+
+// reconcileMinioSource ensures the upstream Minio client compatible bucket can be reached and downloaded from using the
+// declared configuration, and observes its state.
+//
+// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into
+// account. In case of an error during the download process (including transient errors), it records
+// v1beta2.DownloadFailedCondition=True and returns early.
+// On a successful download, it removes v1beta2.DownloadFailedCondition, and compares the calculated revision to
+// the artifact on the object, and records v1beta2.ArtifactOutdatedCondition if they differ.
+// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
+//
+// The caller should assume a failure if an error is returned, or the Result is zero.
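+//
+// Change detection and the parallel download roughly take the following shape (an illustrative sketch distilled
+// from the implementation below; identifiers abbreviated):
+//
+//	index := make(etagIndex) // object key -> ETag
+//	for object := range client.ListObjects(...) {
+//		index[object.Key] = object.ETag
+//	}
+//	revision, _ := index.Revision() // SHA256 over the sorted key/ETag pairs
+//	if !obj.GetArtifact().HasRevision(revision) {
+//		group, groupCtx := errgroup.WithContext(ctx)
+//		sem := semaphore.NewWeighted(4) // at most 4 concurrent download workers
+//		for key := range index {
+//			k := key
+//			_ = sem.Acquire(groupCtx, 1)
+//			group.Go(func() error {
+//				defer sem.Release(1)
+//				return client.FGetObject(ctx, bucketName, k, filepath.Join(dir, k), minio.GetObjectOptions{})
+//			})
+//		}
+//		err := group.Wait()
+//	}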
+func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, + secret *corev1.Secret, dir string) (ctrl.Result, error) { + // Build the client with the configuration from the object and secret + s3Client, err := r.buildMinioClient(obj, secret) + if err != nil { + conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, + "Failed to construct S3 client: %s", err.Error()) + r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason, + "Failed to construct S3 client: %s", err.Error()) + // Return error as the contents of the secret may change return ctrl.Result{}, err } - // Record deleted status - r.recordReadiness(ctx, bucket) - - // Remove our finalizer from the list and update it - controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer) - if err := r.Update(ctx, &bucket); err != nil { - return ctrl.Result{}, err - } - - // Stop reconciliation as the object is being deleted - return ctrl.Result{}, nil -} - -// reconcileWithGCP handles getting objects from a Google Cloud Platform bucket -// using a gcp client -func (r *BucketReconciler) reconcileWithGCP(ctx context.Context, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) { - log := ctrl.LoggerFrom(ctx) - gcpClient, err := r.authGCP(ctx, bucket) - if err != nil { - err = fmt.Errorf("auth error: %w", err) - return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err - } - defer gcpClient.Close(log) - - ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration) + // Confirm bucket exists + ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration) defer cancel() - - exists, err := gcpClient.BucketExists(ctxTimeout, bucket.Spec.BucketName) + exists, err := s3Client.BucketExists(ctxTimeout, obj.Spec.BucketName) if err != nil { - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err + conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, + "Failed to verify existence of bucket '%s': %s", obj.Spec.BucketName, err.Error()) + return ctrl.Result{}, err } if !exists { - err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err + conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, + "Bucket '%s' does not exist", obj.Spec.BucketName) + r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason, + "Bucket '%s' does not exist", obj.Spec.BucketName) + return ctrl.Result{}, fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName) } - // Look for file with ignore rules first. 
- path := filepath.Join(tempDir, sourceignore.IgnoreFile) - if err := gcpClient.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil { - if err == gcp.ErrorObjectDoesNotExist && sourceignore.IgnoreFile != ".sourceignore" { - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err + // Look for file with ignore rules first + path := filepath.Join(dir, sourceignore.IgnoreFile) + if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil { + if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" { + conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, + "Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error()) + r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason, + "Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error()) + return ctrl.Result{}, err } } ps, err := sourceignore.ReadIgnoreFile(path, nil) if err != nil { - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err + conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, + "Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error()) + r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason, + "Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error()) + return ctrl.Result{}, err } // In-spec patterns take precedence - if bucket.Spec.Ignore != nil { - ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...) + if obj.Spec.Ignore != nil { + ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...) 
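+		// Note: sourceignore matching follows gitignore semantics, where the last matching pattern wins; appending
+		// the in-spec patterns after the file-based ones is what gives them precedence.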
} matcher := sourceignore.NewMatcher(ps) - objects := gcpClient.ListObjects(ctxTimeout, bucket.Spec.BucketName, nil) - // download bucket content + + // Build up an index of object keys and their etags + // As the keys define the paths and the etags represent a change in file contents, this should be sufficient to + // detect both structural and file changes + var index = make(etagIndex) + for object := range s3Client.ListObjects(ctxTimeout, obj.Spec.BucketName, minio.ListObjectsOptions{ + Recursive: true, + UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()), + }) { + if err = object.Err; err != nil { + conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, + "Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error()) + r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason, + "Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error()) + return ctrl.Result{}, err + } + + // Ignore directories and the .sourceignore file + if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile { + continue + } + // Ignore matches + if matcher.Match(strings.Split(object.Key, "/"), false) { + continue + } + + index[object.Key] = object.ETag + } + + // Calculate revision checksum from the collected index values + revision, err := index.Revision() + if err != nil { + ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision") + return ctrl.Result{}, err + } + + if !obj.GetArtifact().HasRevision(revision) { + // Mark observations about the revision on the object + conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", + "New upstream revision '%s'", revision) + + // Download the files in parallel, but with a limited number of workers + group, groupCtx := errgroup.WithContext(ctx) + group.Go(func() error { + const workers = 4 + sem := semaphore.NewWeighted(workers) + for key := range index { + k := key + if err := sem.Acquire(groupCtx, 1); err != nil { + return err + } + group.Go(func() error { + defer sem.Release(1) + localPath := filepath.Join(dir, k) + if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath, minio.GetObjectOptions{}); err != nil { + return fmt.Errorf("failed to get '%s' file: %w", k, err) + } + return nil + }) + } + return nil + }) + if err = group.Wait(); err != nil { + conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, + "Download from bucket '%s' failed: %s", obj.Spec.BucketName, err) + r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason, + "Download from bucket '%s' failed: %s", obj.Spec.BucketName, err) + return ctrl.Result{}, err + } + r.Eventf(obj, corev1.EventTypeNormal, sourcev1.BucketOperationSucceedReason, + "Downloaded %d files from bucket '%s' revision '%s'", len(index), obj.Spec.BucketName, revision) + } + conditions.Delete(obj, sourcev1.DownloadFailedCondition) + + // Create potential new artifact + *artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision)) + return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil +} + +// reconcileGCPSource ensures the upstream Google Cloud Storage bucket can be reached and downloaded from using the +// declared configuration, and observes its state. +// +// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into +// account. 
In case of an error during the download process (including transient errors), it records
+// v1beta2.DownloadFailedCondition=True and returns early.
+// On a successful download, it removes v1beta2.DownloadFailedCondition, and compares the calculated revision to
+// the artifact on the object, and records v1beta2.ArtifactOutdatedCondition if they differ.
+// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
+//
+// The caller should assume a failure if an error is returned, or the Result is zero.
+func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact,
+	secret *corev1.Secret, dir string) (ctrl.Result, error) {
+	gcpClient, err := r.buildGCPClient(ctx, secret)
+	if err != nil {
+		conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+			"Failed to construct GCP client: %s", err.Error())
+		r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+			"Failed to construct GCP client: %s", err.Error())
+		// Return error as the contents of the secret may change
+		return ctrl.Result{}, err
+	}
+	defer gcpClient.Close(ctrl.LoggerFrom(ctx))
+
+	// Confirm bucket exists
+	ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
+	defer cancel()
+	exists, err := gcpClient.BucketExists(ctxTimeout, obj.Spec.BucketName)
+	if err != nil {
+		conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+			"Failed to verify existence of bucket '%s': %s", obj.Spec.BucketName, err.Error())
+		return ctrl.Result{}, err
+	}
+	if !exists {
+		conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+			"Bucket '%s' does not exist", obj.Spec.BucketName)
+		r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+			"Bucket '%s' does not exist", obj.Spec.BucketName)
+		return ctrl.Result{}, fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName)
+	}
+
+	// Look for file with ignore rules first
+	path := filepath.Join(dir, sourceignore.IgnoreFile)
+	if err := gcpClient.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil {
+		if err != gcpstorage.ErrObjectNotExist {
+			conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+				"Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error())
+			r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+				"Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error())
+			return ctrl.Result{}, err
+		}
+	}
+	ps, err := sourceignore.ReadIgnoreFile(path, nil)
+	if err != nil {
+		conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+			"Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error())
+		r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+			"Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error())
+		return ctrl.Result{}, err
+	}
+	// In-spec patterns take precedence
+	if obj.Spec.Ignore != nil {
+		ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...)
+ } + matcher := sourceignore.NewMatcher(ps) + + // Build up an index of object keys and their etags + // As the keys define the paths and the etags represent a change in file contents, this should be sufficient to + // detect both structural and file changes + var index = make(etagIndex) + objects := gcpClient.ListObjects(ctxTimeout, obj.Spec.BucketName, nil) for { object, err := objects.Next() - if err == gcp.IteratorDone { - break - } if err != nil { - err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err + if err == gcp.IteratorDone { + break + } + conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, + "Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error()) + r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason, + "Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error()) + return ctrl.Result{}, err } if strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile { @@ -331,98 +523,211 @@ func (r *BucketReconciler) reconcileWithGCP(ctx context.Context, bucket sourcev1 continue } - localPath := filepath.Join(tempDir, object.Name) - if err = gcpClient.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Name, localPath); err != nil { - err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err - } + index[object.Name] = object.Etag } - return sourcev1.Bucket{}, nil + + // Calculate revision checksum from the collected index values + revision, err := index.Revision() + if err != nil { + ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision") + return ctrl.Result{}, err + } + + if !obj.GetArtifact().HasRevision(revision) { + // Mark observations about the revision on the object + conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", + "New upstream revision '%s'", revision) + + // Download the files in parallel, but with a limited number of workers + group, groupCtx := errgroup.WithContext(ctx) + group.Go(func() error { + const workers = 4 + sem := semaphore.NewWeighted(workers) + for key := range index { + k := key + if err := sem.Acquire(groupCtx, 1); err != nil { + return err + } + group.Go(func() error { + defer sem.Release(1) + localPath := filepath.Join(dir, k) + if err := gcpClient.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath); err != nil { + return fmt.Errorf("failed to get '%s' file: %w", k, err) + } + return nil + }) + } + return nil + }) + if err = group.Wait(); err != nil { + conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, + "Download from bucket '%s' failed: %s", obj.Spec.BucketName, err) + r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason, + "Download from bucket '%s' failed: %s", obj.Spec.BucketName, err) + return ctrl.Result{}, err + } + r.Eventf(obj, corev1.EventTypeNormal, sourcev1.BucketOperationSucceedReason, + "Downloaded %d files from bucket '%s' revision '%s'", len(index), obj.Spec.BucketName, revision) + } + conditions.Delete(obj, sourcev1.DownloadFailedCondition) + + // Create potential new artifact + *artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision)) + return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, 
nil
}

-// reconcileWithMinio handles getting objects from an S3 compatible bucket
-// using a minio client
-func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) {
-	s3Client, err := r.authMinio(ctx, bucket)
+// reconcileArtifact archives a new artifact to the storage, if the current observation on the object does not match the
+// given data.
+//
+// The inspection of the given data to the object is deferred, ensuring any stale observations as
+// v1beta2.ArtifactUnavailableCondition and v1beta2.ArtifactOutdatedCondition are always deleted.
+// If the given artifact does not differ from the object's current, it returns early.
+// On a successful archive, the artifact in the status of the given object is set, and the symlink in the storage is
+// updated to its path.
+//
+// The caller should assume a failure if an error is returned, or the Result is zero.
+func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) (ctrl.Result, error) {
+	// Always restore the Ready condition in case it got removed due to a transient error
+	defer func() {
+		if obj.GetArtifact() != nil {
+			conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
+		}
+		if obj.GetArtifact().HasRevision(artifact.Revision) {
+			conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition)
+			conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason,
+				"Stored artifact for revision '%s'", artifact.Revision)
+		}
+	}()
+
+	// The artifact is up-to-date
+	if obj.GetArtifact().HasRevision(artifact.Revision) {
+		ctrl.LoggerFrom(ctx).Info(fmt.Sprintf("Already up to date, current revision '%s'", artifact.Revision))
+		return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
+	}
+
+	// Ensure target path exists and is a directory
+	if f, err := os.Stat(dir); err != nil {
+		ctrl.LoggerFrom(ctx).Error(err, "failed to stat source path")
+		return ctrl.Result{}, err
+	} else if !f.IsDir() {
+		err := fmt.Errorf("source path '%s' is not a directory", dir)
+		ctrl.LoggerFrom(ctx).Error(err, "invalid target path")
+		return ctrl.Result{}, err
+	}
+
+	// Ensure artifact directory exists and acquire lock
+	if err := r.Storage.MkdirAll(artifact); err != nil {
+		ctrl.LoggerFrom(ctx).Error(err, "failed to create artifact directory")
+		return ctrl.Result{}, err
+	}
+	unlock, err := r.Storage.Lock(artifact)
 	if err != nil {
-		err = fmt.Errorf("auth error: %w", err)
-		return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err
+		ctrl.LoggerFrom(ctx).Error(err, "failed to acquire lock for artifact")
+		return ctrl.Result{}, err
 	}
+	defer unlock()
 
-	ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration)
-	defer cancel()
+	// Archive directory to storage
+	if err := r.Storage.Archive(&artifact, dir, nil); err != nil {
+		r.Eventf(obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason,
+			"Unable to archive artifact to storage: %s", err)
+		return ctrl.Result{}, err
+	}
+	r.AnnotatedEventf(obj, map[string]string{
+		"revision": artifact.Revision,
+		"checksum": artifact.Checksum,
+	}, corev1.EventTypeNormal, "NewArtifact", "Stored artifact for revision '%s'", artifact.Revision)
 
-	exists, err := s3Client.BucketExists(ctxTimeout, bucket.Spec.BucketName)
+	// Record it on the object
+	obj.Status.Artifact = artifact.DeepCopy()
+
+	// Update symlink on a "best effort" basis
+	url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
+	if err != nil {
-		return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
+		r.Eventf(obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason,
+			"Failed to update status URL symlink: %s", err)
 	}
-	if !exists {
-		err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName)
-		return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
+	if url != "" {
+		obj.Status.URL = url
 	}
-
-	// Look for file with ignore rules first
-	// NB: S3 has flat filepath keys making it impossible to look
-	// for files in "subdirectories" without building up a tree first.
-	path := filepath.Join(tempDir, sourceignore.IgnoreFile)
-	if err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil {
-		if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" {
-			return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
-		}
-	}
-	ps, err := sourceignore.ReadIgnoreFile(path, nil)
-	if err != nil {
-		return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
-	}
-	// In-spec patterns take precedence
-	if bucket.Spec.Ignore != nil {
-		ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...)
-	}
-	matcher := sourceignore.NewMatcher(ps)
-
-	// download bucket content
-	for object := range s3Client.ListObjects(ctxTimeout, bucket.Spec.BucketName, minio.ListObjectsOptions{
-		Recursive: true,
-		UseV1:     s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()),
-	}) {
-		if object.Err != nil {
-			err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, object.Err)
-			return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
-		}
-
-		if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile {
-			continue
-		}
-
-		if matcher.Match(strings.Split(object.Key, "/"), false) {
-			continue
-		}
-
-		localPath := filepath.Join(tempDir, object.Key)
-		err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Key, localPath, minio.GetObjectOptions{})
-		if err != nil {
-			err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
-			return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
-		}
-	}
-	return sourcev1.Bucket{}, nil
+	return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
 }
 
-// authGCP creates a new Google Cloud Platform storage client
-// to interact with the storage service.
-func (r *BucketReconciler) authGCP(ctx context.Context, bucket sourcev1.Bucket) (*gcp.GCPClient, error) {
+// reconcileDelete handles the deletion of an object. It first garbage collects all artifacts for the object from the
+// artifact storage; if successful, the finalizer is removed from the object.
+func (r *BucketReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
+	// Garbage collect the resource's artifacts
+	if err := r.garbageCollect(ctx, obj); err != nil {
+		// Return the error so we retry the failed garbage collection
+		return ctrl.Result{}, err
+	}
+
+	// Remove our finalizer from the list
+	controllerutil.RemoveFinalizer(obj, sourcev1.SourceFinalizer)
+
+	// Stop reconciliation as the object is being deleted
+	return ctrl.Result{}, nil
+}
+
+// garbageCollect performs a garbage collection for the given v1beta2.Bucket. It removes all but the current
+// artifact except for when the deletion timestamp is set, which will result in the removal of all artifacts for the
+// resource.
+func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Bucket) error {
+	if !obj.DeletionTimestamp.IsZero() {
+		if err := r.Storage.RemoveAll(r.Storage.NewArtifactFor(obj.Kind, obj.GetObjectMeta(), "", "*")); err != nil {
+			r.Eventf(obj, corev1.EventTypeWarning, "GarbageCollectionFailed",
+				"Garbage collection for deleted resource failed: %s", err)
+			return err
+		}
+		obj.Status.Artifact = nil
+		// TODO(hidde): we should only push this event if we actually garbage collected something
+		r.Eventf(obj, corev1.EventTypeNormal, "GarbageCollectionSucceeded",
+			"Garbage collected artifacts for deleted resource")
+		return nil
+	}
+	if obj.GetArtifact() != nil {
+		if err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
+			r.Eventf(obj, corev1.EventTypeWarning, "GarbageCollectionFailed", "Garbage collection of old artifacts failed: %s", err)
+			return err
+		}
+		// TODO(hidde): we should only push this event if we actually garbage collected something
+		r.Eventf(obj, corev1.EventTypeNormal, "GarbageCollectionSucceeded", "Garbage collected old artifacts")
+	}
+	return nil
+}
+
+// buildMinioClient constructs a minio.Client with the data from the given object and Secret.
+// It returns an error if the Secret does not have the required fields, or if the client construction fails.
+func (r *BucketReconciler) buildMinioClient(obj *sourcev1.Bucket, secret *corev1.Secret) (*minio.Client, error) {
+	opts := minio.Options{
+		Region: obj.Spec.Region,
+		Secure: !obj.Spec.Insecure,
+	}
+	if secret != nil {
+		var accessKey, secretKey string
+		if k, ok := secret.Data["accesskey"]; ok {
+			accessKey = string(k)
+		}
+		if k, ok := secret.Data["secretkey"]; ok {
+			secretKey = string(k)
+		}
+		if accessKey == "" || secretKey == "" {
+			return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
+		}
+		opts.Creds = credentials.NewStaticV4(accessKey, secretKey, "")
+	} else if obj.Spec.Provider == sourcev1.AmazonBucketProvider {
+		opts.Creds = credentials.NewIAM("")
+	}
+	return minio.New(obj.Spec.Endpoint, &opts)
+}
+
+// buildGCPClient constructs a gcp.GCPClient with the data from the given Secret.
+// It returns an error if the Secret does not have the required field, or if the client construction fails.
+func (r *BucketReconciler) buildGCPClient(ctx context.Context, secret *corev1.Secret) (*gcp.GCPClient, error) {
 	var client *gcp.GCPClient
 	var err error
-	if bucket.Spec.SecretRef != nil {
-		secretName := types.NamespacedName{
-			Namespace: bucket.GetNamespace(),
-			Name:      bucket.Spec.SecretRef.Name,
-		}
-
-		var secret corev1.Secret
-		if err := r.Get(ctx, secretName, &secret); err != nil {
-			return nil, fmt.Errorf("credentials secret error: %w", err)
-		}
+	if secret != nil {
 		if err := gcp.ValidateSecret(secret.Data, secret.Name); err != nil {
 			return nil, err
 		}
@@ -437,165 +742,26 @@ func (r *BucketReconciler) authGCP(ctx context.Context, bucket sourcev1.Bucket)
 		}
 	}
 	return client, nil
-
 }
 
-// authMinio creates a new Minio client to interact with S3
-// compatible storage services.
-func (r *BucketReconciler) authMinio(ctx context.Context, bucket sourcev1.Bucket) (*minio.Client, error) {
-	opt := minio.Options{
-		Region: bucket.Spec.Region,
-		Secure: !bucket.Spec.Insecure,
+// etagIndex is an index of bucket keys and their ETag values.
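+//
+// As an illustration (assumed values, not taken from the test fixtures): the index
+//
+//	{"a.yaml": "etag-a", "b/c.yaml": "etag-c"}
+//
+// is serialized by Revision as "a.yaml etag-a\nb/c.yaml etag-c\n" before hashing, so both a renamed key and a
+// changed ETag yield a new revision.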
+type etagIndex map[string]string + +// Revision calculates the SHA256 checksum of the index. +// The keys are sorted to ensure a stable order, and the SHA256 sum is then calculated for the string representations of +// the key/value pairs, each pair written on a newline +// The sum result is returned as a string. +func (i etagIndex) Revision() (string, error) { + keyIndex := make([]string, 0, len(i)) + for k := range i { + keyIndex = append(keyIndex, k) } - - if bucket.Spec.SecretRef != nil { - secretName := types.NamespacedName{ - Namespace: bucket.GetNamespace(), - Name: bucket.Spec.SecretRef.Name, + sort.Strings(keyIndex) + sum := sha256.New() + for _, k := range keyIndex { + if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, i[k]))); err != nil { + return "", err } - - var secret corev1.Secret - if err := r.Get(ctx, secretName, &secret); err != nil { - return nil, fmt.Errorf("credentials secret error: %w", err) - } - - accesskey := "" - secretkey := "" - if k, ok := secret.Data["accesskey"]; ok { - accesskey = string(k) - } - if k, ok := secret.Data["secretkey"]; ok { - secretkey = string(k) - } - if accesskey == "" || secretkey == "" { - return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name) - } - opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "") - } else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider { - opt.Creds = credentials.NewIAM("") - } - - if opt.Creds == nil { - return nil, fmt.Errorf("no bucket credentials found") - } - - return minio.New(bucket.Spec.Endpoint, &opt) -} - -// checksum calculates the SHA1 checksum of the given root directory. -// It traverses the given root directory and calculates the checksum for any found file, and returns the SHA1 sum of the -// list with relative file paths and their checksums. -func (r *BucketReconciler) checksum(root string) (string, error) { - sum := sha1.New() - if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if !info.Mode().IsRegular() { - return nil - } - data, err := os.ReadFile(path) - if err != nil { - return err - } - relPath, err := filepath.Rel(root, path) - if err != nil { - return err - } - sum.Write([]byte(fmt.Sprintf("%x %s\n", sha1.Sum(data), relPath))) - return nil - }); err != nil { - return "", err } return fmt.Sprintf("%x", sum.Sum(nil)), nil } - -// resetStatus returns a modified v1beta1.Bucket and a boolean indicating -// if the status field has been reset. -func (r *BucketReconciler) resetStatus(bucket sourcev1.Bucket) (sourcev1.Bucket, bool) { - // We do not have an artifact, or it does no longer exist - if bucket.GetArtifact() == nil || !r.Storage.ArtifactExist(*bucket.GetArtifact()) { - bucket = sourcev1.BucketProgressing(bucket) - bucket.Status.Artifact = nil - return bucket, true - } - if bucket.Generation != bucket.Status.ObservedGeneration { - return sourcev1.BucketProgressing(bucket), true - } - return bucket, false -} - -// gc performs a garbage collection for the given v1beta1.Bucket. -// It removes all but the current artifact except for when the -// deletion timestamp is set, which will result in the removal of -// all artifacts for the resource. 
-func (r *BucketReconciler) gc(bucket sourcev1.Bucket) error { - if !bucket.DeletionTimestamp.IsZero() { - return r.Storage.RemoveAll(r.Storage.NewArtifactFor(bucket.Kind, bucket.GetObjectMeta(), "", "*")) - } - if bucket.GetArtifact() != nil { - return r.Storage.RemoveAllButCurrent(*bucket.GetArtifact()) - } - return nil -} - -// event emits a Kubernetes event and forwards the event to notification controller if configured -func (r *BucketReconciler) event(ctx context.Context, bucket sourcev1.Bucket, severity, msg string) { - if r.EventRecorder != nil { - r.EventRecorder.Eventf(&bucket, corev1.EventTypeNormal, severity, msg) - } - if r.ExternalEventRecorder != nil { - r.ExternalEventRecorder.Eventf(&bucket, corev1.EventTypeNormal, severity, msg) - } -} - -func (r *BucketReconciler) recordReadiness(ctx context.Context, bucket sourcev1.Bucket) { - log := ctrl.LoggerFrom(ctx) - if r.MetricsRecorder == nil { - return - } - objRef, err := reference.GetReference(r.Scheme, &bucket) - if err != nil { - log.Error(err, "unable to record readiness metric") - return - } - if rc := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); rc != nil { - r.MetricsRecorder.RecordCondition(*objRef, *rc, !bucket.DeletionTimestamp.IsZero()) - } else { - r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{ - Type: meta.ReadyCondition, - Status: metav1.ConditionUnknown, - }, !bucket.DeletionTimestamp.IsZero()) - } -} - -func (r *BucketReconciler) recordSuspension(ctx context.Context, bucket sourcev1.Bucket) { - if r.MetricsRecorder == nil { - return - } - log := ctrl.LoggerFrom(ctx) - - objRef, err := reference.GetReference(r.Scheme, &bucket) - if err != nil { - log.Error(err, "unable to record suspended metric") - return - } - - if !bucket.DeletionTimestamp.IsZero() { - r.MetricsRecorder.RecordSuspend(*objRef, false) - } else { - r.MetricsRecorder.RecordSuspend(*objRef, bucket.Spec.Suspend) - } -} - -func (r *BucketReconciler) updateStatus(ctx context.Context, req ctrl.Request, newStatus sourcev1.BucketStatus) error { - var bucket sourcev1.Bucket - if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil { - return err - } - - patch := client.MergeFrom(bucket.DeepCopy()) - bucket.Status = newStatus - - return r.Status().Patch(ctx, &bucket, patch) -} diff --git a/controllers/bucket_controller_test.go b/controllers/bucket_controller_test.go index 01ff20d8..b5c9debe 100644 --- a/controllers/bucket_controller_test.go +++ b/controllers/bucket_controller_test.go @@ -17,59 +17,573 @@ limitations under the License. package controllers import ( + "context" + "crypto/md5" + "fmt" + "net/http" + "net/http/httptest" + "net/url" "os" + "path" "path/filepath" + "strings" "testing" + "time" + + "github.com/go-logr/logr" + . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/conditions" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" ) -func TestBucketReconciler_checksum(t *testing.T) { +func TestBucketReconciler_Reconcile(t *testing.T) { + g := NewWithT(t) + + s3Server := newS3Server("test-bucket") + s3Server.Objects = []*s3MockObject{ + { + Key: "test.yaml", + Content: []byte("test"), + ContentType: "text/plain", + LastModified: time.Now(), + }, + } + s3Server.Start() + defer s3Server.Stop() + + g.Expect(s3Server.HTTPAddress()).ToNot(BeEmpty()) + u, err := url.Parse(s3Server.HTTPAddress()) + g.Expect(err).NotTo(HaveOccurred()) + + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "bucket-reconcile-", + Namespace: "default", + }, + Data: map[string][]byte{ + "accesskey": []byte("key"), + "secretkey": []byte("secret"), + }, + } + g.Expect(testEnv.Create(ctx, secret)).To(Succeed()) + defer testEnv.Delete(ctx, secret) + + obj := &sourcev1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "bucket-reconcile-", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + Provider: "generic", + BucketName: s3Server.BucketName, + Endpoint: u.Host, + Insecure: true, + Interval: metav1.Duration{Duration: interval}, + Timeout: &metav1.Duration{Duration: timeout}, + SecretRef: &meta.LocalObjectReference{ + Name: secret.Name, + }, + }, + } + g.Expect(testEnv.Create(ctx, obj)).To(Succeed()) + + key := client.ObjectKey{Name: obj.Name, Namespace: obj.Namespace} + + // Wait for finalizer to be set + g.Eventually(func() bool { + if err := testEnv.Get(ctx, key, obj); err != nil { + return false + } + return len(obj.Finalizers) > 0 + }, timeout).Should(BeTrue()) + + // Wait for Bucket to be Ready + g.Eventually(func() bool { + if err := testEnv.Get(ctx, key, obj); err != nil { + return false + } + if !conditions.IsReady(obj) || obj.Status.Artifact == nil { + return false + } + readyCondition := conditions.Get(obj, meta.ReadyCondition) + return obj.Generation == readyCondition.ObservedGeneration && + obj.Generation == obj.Status.ObservedGeneration + }, timeout).Should(BeTrue()) + + g.Expect(testEnv.Delete(ctx, obj)).To(Succeed()) + + // Wait for Bucket to be deleted + g.Eventually(func() bool { + if err := testEnv.Get(ctx, key, obj); err != nil { + return apierrors.IsNotFound(err) + } + return false + }, timeout).Should(BeTrue()) +} + +func TestBucketReconciler_reconcileStorage(t *testing.T) { tests := []struct { - name string - beforeFunc func(root string) - want string - wantErr bool + name string + beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error + want ctrl.Result + wantErr bool + assertArtifact *sourcev1.Artifact + assertConditions []metav1.Condition + assertPaths []string }{ { - name: "empty root", - want: "da39a3ee5e6b4b0d3255bfef95601890afd80709", + name: "garbage collects", + beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error { + revisions := []string{"a", "b", "c"} + for n := range revisions { + v := revisions[n] + obj.Status.Artifact = &sourcev1.Artifact{ + Path: fmt.Sprintf("/reconcile-storage/%s.txt", v), + Revision: v, + } + if err := 
testStorage.MkdirAll(*obj.Status.Artifact); err != nil { + return err + } + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil { + return err + } + } + testStorage.SetArtifactURL(obj.Status.Artifact) + return nil + }, + assertArtifact: &sourcev1.Artifact{ + Path: "/reconcile-storage/c.txt", + Revision: "c", + Checksum: "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4", + URL: testStorage.Hostname + "/reconcile-storage/c.txt", + }, + assertPaths: []string{ + "/reconcile-storage/c.txt", + "!/reconcile-storage/b.txt", + "!/reconcile-storage/a.txt", + }, }, { - name: "with file", - beforeFunc: func(root string) { - mockFile(root, "a/b/c.txt", "a dummy string") + name: "notices missing artifact in storage", + beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error { + obj.Status.Artifact = &sourcev1.Artifact{ + Path: fmt.Sprintf("/reconcile-storage/invalid.txt"), + Revision: "d", + } + testStorage.SetArtifactURL(obj.Status.Artifact) + return nil + }, + want: ctrl.Result{Requeue: true}, + assertPaths: []string{ + "!/reconcile-storage/invalid.txt", + }, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage"), }, - want: "309a5e6e96b4a7eea0d1cfaabf1be8ec1c063fa0", }, { - name: "with file in different path", - beforeFunc: func(root string) { - mockFile(root, "a/b.txt", "a dummy string") + name: "updates hostname on diff from current", + beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error { + obj.Status.Artifact = &sourcev1.Artifact{ + Path: fmt.Sprintf("/reconcile-storage/hostname.txt"), + Revision: "f", + Checksum: "971c419dd609331343dee105fffd0f4608dc0bf2", + URL: "http://outdated.com/reconcile-storage/hostname.txt", + } + if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { + return err + } + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil { + return err + } + return nil + }, + assertPaths: []string{ + "/reconcile-storage/hostname.txt", + }, + assertArtifact: &sourcev1.Artifact{ + Path: "/reconcile-storage/hostname.txt", + Revision: "f", + Checksum: "971c419dd609331343dee105fffd0f4608dc0bf2", + URL: testStorage.Hostname + "/reconcile-storage/hostname.txt", }, - want: "e28c62b5cc488849950c4355dddc5523712616d4", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - root, err := os.MkdirTemp("", "bucket-checksum-") - if err != nil { - t.Fatal(err) + g := NewWithT(t) + + r := &BucketReconciler{ + EventRecorder: record.NewFakeRecorder(32), + Storage: testStorage, + } + + obj := &sourcev1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-", + }, } - defer os.RemoveAll(root) if tt.beforeFunc != nil { - tt.beforeFunc(root) + g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed()) } - got, err := (&BucketReconciler{}).checksum(root) - if (err != nil) != tt.wantErr { - t.Errorf("checksum() error = %v, wantErr %v", err, tt.wantErr) - return + + got, err := r.reconcileStorage(context.TODO(), obj) + g.Expect(err != nil).To(Equal(tt.wantErr)) + g.Expect(got).To(Equal(tt.want)) + + g.Expect(obj.Status.Artifact).To(MatchArtifact(tt.assertArtifact)) + if tt.assertArtifact != nil && tt.assertArtifact.URL != "" { + g.Expect(obj.Status.Artifact.URL).To(Equal(tt.assertArtifact.URL)) } - if got != tt.want { - t.Errorf("checksum() got = %v, want %v", got, tt.want) + g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) + + for _, p 
+
+func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
+	tests := []struct {
+		name             string
+		bucketName       string
+		bucketObjects    []*s3MockObject
+		middleware       http.Handler
+		secret           *corev1.Secret
+		beforeFunc       func(obj *sourcev1.Bucket)
+		want             ctrl.Result
+		wantErr          bool
+		assertArtifact   sourcev1.Artifact
+		assertConditions []metav1.Condition
+	}{
+		{
+			name:       "reconciles source",
+			bucketName: "dummy",
+			bucketObjects: []*s3MockObject{
+				{
+					Key:          "test.txt",
+					Content:      []byte("test"),
+					ContentType:  "text/plain",
+					LastModified: time.Now(),
+				},
+			},
+			assertArtifact: sourcev1.Artifact{
+				Path:     "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz",
+				Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8",
+			},
+			assertConditions: []metav1.Condition{
+				*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"),
+			},
+		},
+		// TODO(hidde): middleware for mock server
+		//{
+		//	name: "authenticates using secretRef",
+		//	bucketName: "dummy",
+		//},
+		{
+			name:       "observes non-existing secretRef",
+			bucketName: "dummy",
+			beforeFunc: func(obj *sourcev1.Bucket) {
+				obj.Spec.SecretRef = &meta.LocalObjectReference{
+					Name: "dummy",
+				}
+			},
+			wantErr: true,
+			assertConditions: []metav1.Condition{
+				*conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.AuthenticationFailedReason, "Failed to get secret '/dummy': secrets \"dummy\" not found"),
+			},
+		},
+		{
+			name:       "observes invalid secretRef",
+			bucketName: "dummy",
+			secret: &corev1.Secret{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "dummy",
+				},
+			},
+			beforeFunc: func(obj *sourcev1.Bucket) {
+				obj.Spec.SecretRef = &meta.LocalObjectReference{
+					Name: "dummy",
+				}
+			},
+			wantErr: true,
+			assertConditions: []metav1.Condition{
+				*conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to construct S3 client: invalid 'dummy' secret data: required fields"),
+			},
+		},
+		{
+			name:       "observes non-existing bucket name",
+			bucketName: "dummy",
+			beforeFunc: func(obj *sourcev1.Bucket) {
+				obj.Spec.BucketName = "invalid"
+			},
+			wantErr: true,
+			assertConditions: []metav1.Condition{
+				*conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Bucket 'invalid' does not exist"),
+			},
+		},
+		{
+			name: "transient bucket name API failure",
+			beforeFunc: func(obj *sourcev1.Bucket) {
+				obj.Spec.Endpoint = "transient.example.com"
+				obj.Spec.BucketName = "unavailable"
+			},
+			wantErr: true,
+			assertConditions: []metav1.Condition{
+				*conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to verify existence of bucket 'unavailable'"),
+			},
+		},
+		{
+			// TODO(hidde): test the lesser happy paths
+			name:       ".sourceignore",
+			bucketName: "dummy",
+			bucketObjects: []*s3MockObject{
+				{
+					Key:          ".sourceignore",
+					Content:      []byte("ignored/file.txt"),
+					ContentType:  "text/plain",
+					LastModified: time.Now(),
+				},
+				{
+					Key:          "ignored/file.txt",
+					Content:      []byte("ignored/file.txt"),
+					ContentType:  "text/plain",
+					LastModified: time.Now(),
+				},
+				{
+					Key:          "included/file.txt",
+					Content:      []byte("included/file.txt"),
+					ContentType:  "text/plain",
+					LastModified: time.Now(),
+				},
+			},
+			assertArtifact: sourcev1.Artifact{
+				Path:     "bucket/test-bucket/94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100.tar.gz",
+				Revision: "94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100",
+			},
+			assertConditions: []metav1.Condition{
+				*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision '94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100'"),
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			g := NewWithT(t)
+
+			builder := fakeclient.NewClientBuilder().WithScheme(testEnv.Scheme())
+			if tt.secret != nil {
+				builder.WithObjects(tt.secret)
+			}
+			r := &BucketReconciler{
+				EventRecorder: record.NewFakeRecorder(32),
+				Client:        builder.Build(),
+				Storage:       testStorage,
+			}
+			tmpDir, err := os.MkdirTemp("", "reconcile-bucket-source-")
+			g.Expect(err).ToNot(HaveOccurred())
+			defer os.RemoveAll(tmpDir)
+
+			obj := &sourcev1.Bucket{
+				TypeMeta: metav1.TypeMeta{
+					Kind: sourcev1.BucketKind,
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test-bucket",
+				},
+				Spec: sourcev1.BucketSpec{
+					Timeout: &metav1.Duration{Duration: timeout},
+				},
+			}
+
+			var server *s3MockServer
+			if tt.bucketName != "" {
+				server = newS3Server(tt.bucketName)
+				server.Objects = tt.bucketObjects
+				server.Start()
+				defer server.Stop()
+
+				g.Expect(server.HTTPAddress()).ToNot(BeEmpty())
+				u, err := url.Parse(server.HTTPAddress())
+				g.Expect(err).NotTo(HaveOccurred())
+
+				obj.Spec.BucketName = tt.bucketName
+				obj.Spec.Endpoint = u.Host
+				// TODO(hidde): also test TLS
+				obj.Spec.Insecure = true
+			}
+			if tt.beforeFunc != nil {
+				tt.beforeFunc(obj)
+			}
+
+			artifact := &sourcev1.Artifact{}
+			got, err := r.reconcileSource(context.TODO(), obj, artifact, tmpDir)
+			g.Expect(err != nil).To(Equal(tt.wantErr))
+			g.Expect(got).To(Equal(tt.want))
+
+			g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy()))
+			g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
+		})
+	}
+}
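As an aside on the client side of these cases: a minio-go v7 client for the mock endpoint above could be built roughly as below. This is a sketch, not code from the patch; `buildS3Client` is a hypothetical helper, and the assumption that credentials live under `accesskey`/`secretkey` keys is inferred from the "required fields" error asserted in the table.

	// Sketch: how the reconciler might construct its S3 client.
	func buildS3Client(obj *sourcev1.Bucket, secret *corev1.Secret) (*minio.Client, error) {
		opts := minio.Options{
			Region: obj.Spec.Region,
			Secure: !obj.Spec.Insecure,
		}
		if secret != nil {
			// Assumed secret data keys, matching the asserted error message.
			accessKey := string(secret.Data["accesskey"])
			secretKey := string(secret.Data["secretkey"])
			if accessKey == "" || secretKey == "" {
				return nil, fmt.Errorf("invalid '%s' secret data: required fields", secret.Name)
			}
			opts.Creds = credentials.NewStaticV4(accessKey, secretKey, "")
		}
		return minio.New(obj.Spec.Endpoint, &opts)
	}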
"text/plain", + LastModified: time.Now(), + }, + }, + assertArtifact: sourcev1.Artifact{ + Path: "bucket/test-bucket/94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100.tar.gz", + Revision: "94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100", + }, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision '94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100'"), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + builder := fakeclient.NewClientBuilder().WithScheme(testEnv.Scheme()) + if tt.secret != nil { + builder.WithObjects(tt.secret) + } + r := &BucketReconciler{ + EventRecorder: record.NewFakeRecorder(32), + Client: builder.Build(), + Storage: testStorage, + } + tmpDir, err := os.MkdirTemp("", "reconcile-bucket-source-") + g.Expect(err).ToNot(HaveOccurred()) + defer os.RemoveAll(tmpDir) + + obj := &sourcev1.Bucket{ + TypeMeta: metav1.TypeMeta{ + Kind: sourcev1.BucketKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-bucket", + }, + Spec: sourcev1.BucketSpec{ + Timeout: &metav1.Duration{Duration: timeout}, + }, + } + + var server *s3MockServer + if tt.bucketName != "" { + server = newS3Server(tt.bucketName) + server.Objects = tt.bucketObjects + server.Start() + defer server.Stop() + + g.Expect(server.HTTPAddress()).ToNot(BeEmpty()) + u, err := url.Parse(server.HTTPAddress()) + g.Expect(err).NotTo(HaveOccurred()) + + obj.Spec.BucketName = tt.bucketName + obj.Spec.Endpoint = u.Host + // TODO(hidde): also test TLS + obj.Spec.Insecure = true + } + if tt.beforeFunc != nil { + tt.beforeFunc(obj) + } + + artifact := &sourcev1.Artifact{} + got, err := r.reconcileSource(context.TODO(), obj, artifact, tmpDir) + g.Expect(err != nil).To(Equal(tt.wantErr)) + g.Expect(got).To(Equal(tt.want)) + + g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy())) + g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) + }) + } +} + +func TestBucketReconciler_reconcileArtifact(t *testing.T) { + tests := []struct { + name string + artifact sourcev1.Artifact + beforeFunc func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) + want ctrl.Result + wantErr bool + assertConditions []metav1.Condition + }{ + { + name: "artifact revision up-to-date", + artifact: sourcev1.Artifact{ + Revision: "existing", + }, + beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + obj.Status.Artifact = &artifact + }, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"), + }, + }, + { + name: "dir path deleted", + beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + _ = os.RemoveAll(dir) + }, + wantErr: true, + }, + //{ + // name: "dir path empty", + //}, + //{ + // name: "success", + // artifact: sourcev1.Artifact{ + // Revision: "existing", + // }, + // beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + // obj.Status.Artifact = &artifact + // }, + // assertConditions: []metav1.Condition{ + // *conditions.TrueCondition(sourcev1.ArtifactAvailableCondition, meta.SucceededReason, "Compressed source to artifact with revision 'existing'"), + // }, + //}, + //{ + // name: "symlink", + //}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + tmpDir, err := os.MkdirTemp("", 
"reconcile-bucket-artifact-") + g.Expect(err).ToNot(HaveOccurred()) + defer os.RemoveAll(tmpDir) + + obj := &sourcev1.Bucket{ + TypeMeta: metav1.TypeMeta{ + Kind: sourcev1.BucketKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-bucket", + }, + Spec: sourcev1.BucketSpec{ + Timeout: &metav1.Duration{Duration: timeout}, + }, + } + + if tt.beforeFunc != nil { + tt.beforeFunc(obj, tt.artifact, tmpDir) + } + + r := &BucketReconciler{ + EventRecorder: record.NewFakeRecorder(32), + Storage: testStorage, + } + + dlog := log.NewDelegatingLogSink(log.NullLogSink{}) + nullLogger := logr.New(dlog) + got, err := r.reconcileArtifact(logr.NewContext(ctx, nullLogger), obj, tt.artifact, tmpDir) + g.Expect(err != nil).To(Equal(tt.wantErr)) + g.Expect(got).To(Equal(tt.want)) + + //g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy())) + g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) + }) + } +} + +func Test_etagIndex_Revision(t *testing.T) { + tests := []struct { + name string + list etagIndex + want string + wantErr bool + }{ + { + name: "index with items", + list: map[string]string{ + "one": "one", + "two": "two", + "three": "three", + }, + want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc", + }, + { + name: "index with items in different order", + list: map[string]string{ + "three": "three", + "one": "one", + "two": "two", + }, + want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc", + }, + { + name: "empty index", + list: map[string]string{}, + want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + }, + { + name: "nil index", + list: nil, + want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.list.Revision() + if (err != nil) != tt.wantErr { + t.Errorf("revision() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("revision() got = %v, want %v", got, tt.want) + } + }) + } +} + +// helpers + func mockFile(root, path, content string) error { filePath := filepath.Join(root, path) if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil { @@ -80,3 +594,120 @@ func mockFile(root, path, content string) error { } return nil } + +type s3MockObject struct { + Key string + LastModified time.Time + ContentType string + Content []byte +} + +type s3MockServer struct { + srv *httptest.Server + mux *http.ServeMux + + BucketName string + Objects []*s3MockObject +} + +func newS3Server(bucketName string) *s3MockServer { + s := &s3MockServer{BucketName: bucketName} + s.mux = http.NewServeMux() + s.mux.Handle(fmt.Sprintf("/%s/", s.BucketName), http.HandlerFunc(s.handler)) + + s.srv = httptest.NewUnstartedServer(s.mux) + + return s +} + +func (s *s3MockServer) Start() { + s.srv.Start() +} + +func (s *s3MockServer) Stop() { + s.srv.Close() +} + +func (s *s3MockServer) HTTPAddress() string { + return s.srv.URL +} + +func (s *s3MockServer) handler(w http.ResponseWriter, r *http.Request) { + key := path.Base(r.URL.Path) + + switch key { + case s.BucketName: + w.Header().Add("Content-Type", "application/xml") + + if r.Method == http.MethodHead { + return + } + + q := r.URL.Query() + + if q["location"] != nil { + fmt.Fprint(w, ` + +Europe + `) + return + } + + contents := "" + for _, o := range s.Objects { + etag := md5.Sum(o.Content) + contents += fmt.Sprintf(` + + %s + %s + %d + "%b" + STANDARD + `, o.Key, o.LastModified.UTC().Format(time.RFC3339), 
+
+// helpers
+
 func mockFile(root, path, content string) error {
 	filePath := filepath.Join(root, path)
 	if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil {
@@ -80,3 +594,120 @@ func mockFile(root, path, content string) error {
 	}
 	return nil
 }
+
+type s3MockObject struct {
+	Key          string
+	LastModified time.Time
+	ContentType  string
+	Content      []byte
+}
+
+type s3MockServer struct {
+	srv *httptest.Server
+	mux *http.ServeMux
+
+	BucketName string
+	Objects    []*s3MockObject
+}
+
+func newS3Server(bucketName string) *s3MockServer {
+	s := &s3MockServer{BucketName: bucketName}
+	s.mux = http.NewServeMux()
+	s.mux.Handle(fmt.Sprintf("/%s/", s.BucketName), http.HandlerFunc(s.handler))
+
+	s.srv = httptest.NewUnstartedServer(s.mux)
+
+	return s
+}
+
+func (s *s3MockServer) Start() {
+	s.srv.Start()
+}
+
+func (s *s3MockServer) Stop() {
+	s.srv.Close()
+}
+
+func (s *s3MockServer) HTTPAddress() string {
+	return s.srv.URL
+}
+
+func (s *s3MockServer) handler(w http.ResponseWriter, r *http.Request) {
+	key := path.Base(r.URL.Path)
+
+	switch key {
+	case s.BucketName:
+		w.Header().Add("Content-Type", "application/xml")
+
+		if r.Method == http.MethodHead {
+			return
+		}
+
+		q := r.URL.Query()
+
+		if q["location"] != nil {
+			fmt.Fprint(w, `
+<?xml version="1.0" encoding="UTF-8"?>
+<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">Europe</LocationConstraint>
+			`)
+			return
+		}
+
+		contents := ""
+		for _, o := range s.Objects {
+			etag := md5.Sum(o.Content)
+			contents += fmt.Sprintf(`
+		<Contents>
+			<Key>%s</Key>
+			<LastModified>%s</LastModified>
+			<Size>%d</Size>
+			<ETag>&quot;%b&quot;</ETag>
+			<StorageClass>STANDARD</StorageClass>
+		</Contents>`, o.Key, o.LastModified.UTC().Format(time.RFC3339), len(o.Content), etag)
+		}
+
+		fmt.Fprintf(w, `
+<?xml version="1.0" encoding="UTF-8"?>
+<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+	<Name>%s</Name>
+	<Prefix/>
+	<Marker/>
+	<KeyCount>%d</KeyCount>
+	<MaxKeys>1000</MaxKeys>
+	<IsTruncated>false</IsTruncated>
+	%s
+</ListBucketResult>`, s.BucketName, len(s.Objects), contents)
+	default:
+		key, err := filepath.Rel("/"+s.BucketName, r.URL.Path)
+		if err != nil {
+			w.WriteHeader(500)
+			return
+		}
+
+		var found *s3MockObject
+		for _, o := range s.Objects {
+			if key == o.Key {
+				found = o
+			}
+		}
+		if found == nil {
+			w.WriteHeader(404)
+			return
+		}
+
+		etag := md5.Sum(found.Content)
+		lastModified := strings.Replace(found.LastModified.UTC().Format(time.RFC1123), "UTC", "GMT", 1)
+
+		w.Header().Add("Content-Type", found.ContentType)
+		w.Header().Add("Last-Modified", lastModified)
+		w.Header().Add("ETag", fmt.Sprintf("\"%b\"", etag))
+		w.Header().Add("Content-Length", fmt.Sprintf("%d", len(found.Content)))
+
+		if r.Method == http.MethodHead {
+			return
+		}
+
+		w.Write(found.Content)
+	}
+}
diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index a33108dc..b27bda8f 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -97,6 +97,15 @@ func TestMain(m *testing.M) {
 		panic(fmt.Sprintf("Failed to start GitRepositoryReconciler: %v", err))
 	}
 
+	if err := (&BucketReconciler{
+		Client:        testEnv,
+		EventRecorder: record.NewFakeRecorder(32),
+		Metrics:       testMetricsH,
+		Storage:       testStorage,
+	}).SetupWithManager(testEnv); err != nil {
+		panic(fmt.Sprintf("Failed to start BucketReconciler: %v", err))
+	}
+
 	go func() {
 		fmt.Println("Starting the test environment")
 		if err := testEnv.Start(ctx); err != nil {
diff --git a/main.go b/main.go
index 3c0f2791..4ea5a102 100644
--- a/main.go
+++ b/main.go
@@ -203,11 +203,10 @@ func main() {
 		os.Exit(1)
 	}
 	if err = (&controllers.BucketReconciler{
-		Client:          mgr.GetClient(),
-		Scheme:          mgr.GetScheme(),
-		Storage:         storage,
-		EventRecorder:   eventRecorder,
-		MetricsRecorder: metricsH.MetricsRecorder,
+		Client:        mgr.GetClient(),
+		EventRecorder: eventRecorder,
+		Metrics:       metricsH,
+		Storage:       storage,
 	}).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{
 		MaxConcurrentReconciles: concurrent,
 	}); err != nil {
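One closing note: `SetupWithManagerAndOptions` (and the `SetupWithManager` wrapper registered in suite_test.go) are not part of this excerpt. The shape implied by the registration above is sketched below, reusing the `predicate`/`predicates` packages the controller already imports; the exact event filter is an assumption, not patch code.

	// Sketch: the setup plumbing implied by the registrations above.
	func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
		return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{})
	}

	func (r *BucketReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts BucketReconcilerOptions) error {
		return ctrl.NewControllerManagedBy(mgr).
			For(&sourcev1.Bucket{}).
			WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{})).
			WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
			Complete(r)
	}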