From 588ccbfe99fe8cfd219be3455e4f8f74f45a2311 Mon Sep 17 00:00:00 2001
From: Hidde Beydals
Date: Sat, 31 Jul 2021 03:58:43 +0200
Subject: [PATCH] Rewrite `BucketReconciler` to new standards

This commit rewrites the `BucketReconciler` to new standards, while
implementing the newly introduced Condition types and trying to adhere
better to Kubernetes API conventions.

More specifically, it introduces:

- Implementation of more explicit Condition types to highlight
  abnormalities.
- Extensive usage of the `conditions` subpackage from `runtime`.
- Better and more conflict-resilient (status) patching of reconciled
  objects using the `patch` subpackage from `runtime`.
- Proper implementation of kstatus' `Reconciling` and `Stalled`
  conditions.
- Refactor of reconciler logic, including more efficient detection of
  changes to bucket objects by making use of the etag data available,
  and downloading of object files in parallel with a limited number of
  workers (4).
- Integration tests that solely rely on `testenv` and do not use
  Ginkgo.

There are a couple of TODOs marked in-code; these are suggestions for
the future and should be non-blocking. In addition to the TODOs, more
complex and/or edge-case test scenarios may be added as well.

Signed-off-by: Hidde Beydals
---
 api/v1beta1/bucket_types.go           |  58 +--
 controllers/artifact_matchers_test.go |  67 +++
 controllers/bucket_controller.go      | 720 +++++++++++++++-----------
 controllers/bucket_controller_test.go | 677 +++++++++++++++++++++++-
 controllers/suite_test.go             |  11 +-
 main.go                               |  31 +-
 6 files changed, 1162 insertions(+), 402 deletions(-)
 create mode 100644 controllers/artifact_matchers_test.go

diff --git a/api/v1beta1/bucket_types.go b/api/v1beta1/bucket_types.go
index 9aebad62..2b0a1ea4 100644
--- a/api/v1beta1/bucket_types.go
+++ b/api/v1beta1/bucket_types.go
@@ -20,8 +20,6 @@ import (
 	"time"
 
 	"github.com/fluxcd/pkg/apis/meta"
-	"github.com/fluxcd/pkg/runtime/conditions"
-	apimeta "k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
@@ -30,6 +28,18 @@ const (
 	BucketKind = "Bucket"
 )
 
+const (
+	GenericBucketProvider string = "generic"
+	AmazonBucketProvider  string = "aws"
+)
+
+const (
+	// DownloadFailedCondition indicates a transient or persistent download failure. If True, observations on the
+	// upstream Source revision are not possible, and the Artifact available for the Source may be outdated.
+	// This is a "negative polarity" or "abnormal-true" type, and is only present on the resource if it is True.
+	DownloadFailedCondition string = "DownloadFailed"
+)
+
 // BucketSpec defines the desired state of an S3 compatible bucket
 type BucketSpec struct {
 	// The S3 compatible storage provider name, default ('generic').
@@ -79,11 +89,6 @@ type BucketSpec struct {
 	Suspend bool `json:"suspend,omitempty"`
 }
 
-const (
-	GenericBucketProvider string = "generic"
-	AmazonBucketProvider  string = "aws"
-)
-
 // BucketStatus defines the observed state of a bucket
 type BucketStatus struct {
 	// ObservedGeneration is the last observed generation.
@@ -115,45 +120,6 @@ const (
 	BucketOperationFailedReason string = "BucketOperationFailed"
)
 
-// BucketProgressing resets the conditions of the Bucket to metav1.Condition of
-// type meta.ReadyCondition with status 'Unknown' and meta.ProgressingReason
-// reason and message. It returns the modified Bucket.
-func BucketProgressing(bucket Bucket) Bucket { - bucket.Status.ObservedGeneration = bucket.Generation - bucket.Status.URL = "" - bucket.Status.Conditions = []metav1.Condition{} - conditions.MarkUnknown(&bucket, meta.ReadyCondition, meta.ProgressingReason, "reconciliation in progress") - return bucket -} - -// BucketReady sets the given Artifact and URL on the Bucket and sets the -// meta.ReadyCondition to 'True', with the given reason and message. It returns -// the modified Bucket. -func BucketReady(bucket Bucket, artifact Artifact, url, reason, message string) Bucket { - bucket.Status.Artifact = &artifact - bucket.Status.URL = url - conditions.MarkTrue(&bucket, meta.ReadyCondition, reason, message) - return bucket -} - -// BucketNotReady sets the meta.ReadyCondition on the Bucket to 'False', with -// the given reason and message. It returns the modified Bucket. -func BucketNotReady(bucket Bucket, reason, message string) Bucket { - conditions.MarkFalse(&bucket, meta.ReadyCondition, reason, message) - return bucket -} - -// BucketReadyMessage returns the message of the metav1.Condition of type -// meta.ReadyCondition with status 'True' if present, or an empty string. -func BucketReadyMessage(bucket Bucket) string { - if c := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); c != nil { - if c.Status == metav1.ConditionTrue { - return c.Message - } - } - return "" -} - // GetConditions returns the status conditions of the object. func (in Bucket) GetConditions() []metav1.Condition { return in.Status.Conditions diff --git a/controllers/artifact_matchers_test.go b/controllers/artifact_matchers_test.go new file mode 100644 index 00000000..63ff81ce --- /dev/null +++ b/controllers/artifact_matchers_test.go @@ -0,0 +1,67 @@ +/* +Copyright 2021 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "fmt" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/types" +) + +// MatchArtifact returns a custom matcher to check equality of a v1beta1.Artifact, the timestamp and URL are ignored. 
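+// Only the Path, Revision and Checksum fields of the Artifact are compared; a nil expected value asserts that the
+// actual value is nil as well.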
+func MatchArtifact(expected *sourcev1.Artifact) types.GomegaMatcher { + return &matchArtifact{ + expected: expected, + } +} + +type matchArtifact struct { + expected *sourcev1.Artifact +} + +func (m matchArtifact) Match(actual interface{}) (success bool, err error) { + actualArtifact, ok := actual.(*sourcev1.Artifact) + if !ok { + return false, fmt.Errorf("actual should be a pointer to an Artifact") + } + + if ok, _ := BeNil().Match(m.expected); ok { + return BeNil().Match(actual) + } + + if ok, err = Equal(m.expected.Path).Match(actualArtifact.Path); !ok { + return ok, err + } + if ok, err = Equal(m.expected.Revision).Match(actualArtifact.Revision); !ok { + return ok, err + } + if ok, err = Equal(m.expected.Checksum).Match(actualArtifact.Checksum); !ok { + return ok, err + } + + return ok, err +} + +func (m matchArtifact) FailureMessage(actual interface{}) (message string) { + return fmt.Sprintf("expected\n\t%#v\nto match\n\t%#v\n", actual, m.expected) +} + +func (m matchArtifact) NegatedFailureMessage(actual interface{}) (message string) { + return fmt.Sprintf("expected\n\t%#v\nto not match\n\t%#v\n", actual, m.expected) +} diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index 82844eb5..9d5e003c 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -18,24 +18,23 @@ package controllers import ( "context" - "crypto/sha1" + "crypto/sha256" "fmt" "os" "path/filepath" + "sort" "strings" "time" - "github.com/go-logr/logr" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/s3utils" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" corev1 "k8s.io/api/core/v1" - apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - kuberecorder "k8s.io/client-go/tools/record" - "k8s.io/client-go/tools/reference" + kerrors "k8s.io/apimachinery/pkg/util/errors" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -43,8 +42,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/conditions" + helper "github.com/fluxcd/pkg/runtime/controller" "github.com/fluxcd/pkg/runtime/events" - "github.com/fluxcd/pkg/runtime/metrics" + "github.com/fluxcd/pkg/runtime/patch" "github.com/fluxcd/pkg/runtime/predicates" sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" @@ -59,11 +60,10 @@ import ( // BucketReconciler reconciles a Bucket object type BucketReconciler struct { client.Client - Scheme *runtime.Scheme - Storage *Storage - EventRecorder kuberecorder.EventRecorder - ExternalEventRecorder *events.Recorder - MetricsRecorder *metrics.Recorder + helper.Events + helper.Metrics + + Storage *Storage } type BucketReconcilerOptions struct { @@ -82,403 +82,497 @@ func (r *BucketReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts Buc Complete(r) } -func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) { start := time.Now() log := ctrl.LoggerFrom(ctx) - var bucket sourcev1.Bucket - if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil { + // Fetch the Bucket + obj := &sourcev1.Bucket{} + if err := r.Get(ctx, req.NamespacedName, obj); err != nil { return 
ctrl.Result{}, client.IgnoreNotFound(err) } // Record suspended status metric - defer r.recordSuspension(ctx, bucket) + r.RecordSuspend(ctx, obj, obj.Spec.Suspend) - // Add our finalizer if it does not exist - if !controllerutil.ContainsFinalizer(&bucket, sourcev1.SourceFinalizer) { - controllerutil.AddFinalizer(&bucket, sourcev1.SourceFinalizer) - if err := r.Update(ctx, &bucket); err != nil { - log.Error(err, "unable to register finalizer") - return ctrl.Result{}, err - } - } - - // Examine if the object is under deletion - if !bucket.ObjectMeta.DeletionTimestamp.IsZero() { - return r.reconcileDelete(ctx, bucket) - } - - // Return early if the object is suspended. - if bucket.Spec.Suspend { + // Return early if the object is suspended + if obj.Spec.Suspend { log.Info("Reconciliation is suspended for this object") return ctrl.Result{}, nil } - // record reconciliation duration - if r.MetricsRecorder != nil { - objRef, err := reference.GetReference(r.Scheme, &bucket) - if err != nil { - return ctrl.Result{}, err + // Initialize the patch helper + patchHelper, err := patch.NewHelper(obj, r.Client) + if err != nil { + return ctrl.Result{}, err + } + + // Always attempt to patch the object and status after each reconciliation + defer func() { + // Record the value of the reconciliation request, if any + if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok { + obj.Status.SetLastHandledReconcileRequest(v) } - defer r.MetricsRecorder.RecordDuration(*objRef, start) - } - // set initial status - if resetBucket, ok := r.resetStatus(bucket); ok { - bucket = resetBucket - if err := r.updateStatus(ctx, req, bucket.Status); err != nil { - log.Error(err, "unable to update status") - return ctrl.Result{Requeue: true}, err + // Summarize the Ready condition based on abnormalities that may have been observed + conditions.SetSummary(obj, + meta.ReadyCondition, + conditions.WithConditions( + sourcev1.ArtifactOutdatedCondition, + sourcev1.DownloadFailedCondition, + sourcev1.ArtifactUnavailableCondition, + ), + conditions.WithNegativePolarityConditions( + sourcev1.ArtifactOutdatedCondition, + sourcev1.DownloadFailedCondition, + sourcev1.ArtifactUnavailableCondition, + ), + ) + + // Patch the object, ignoring conflicts on the conditions owned by this controller + patchOpts := []patch.Option{ + patch.WithOwnedConditions{ + Conditions: []string{ + sourcev1.ArtifactOutdatedCondition, + sourcev1.DownloadFailedCondition, + sourcev1.ArtifactUnavailableCondition, + meta.ReadyCondition, + meta.ReconcilingCondition, + meta.StalledCondition, + }, + }, } - r.recordReadiness(ctx, bucket) + + // Determine if the resource is still being reconciled, or if it has stalled, and record this observation + if retErr == nil && (result.IsZero() || !result.Requeue) { + // We are no longer reconciling + conditions.Delete(obj, meta.ReconcilingCondition) + + // We have now observed this generation + patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{}) + + readyCondition := conditions.Get(obj, meta.ReadyCondition) + switch readyCondition.Status { + case metav1.ConditionFalse: + // As we are no longer reconciling and the end-state is not ready, the reconciliation has stalled + conditions.MarkStalled(obj, readyCondition.Reason, readyCondition.Message) + case metav1.ConditionTrue: + // As we are no longer reconciling and the end-state is ready, the reconciliation is no longer stalled + conditions.Delete(obj, meta.StalledCondition) + } + } + + // Finally, patch the resource + if err := patchHelper.Patch(ctx, obj, 
patchOpts...); err != nil {
+			retErr = kerrors.NewAggregate([]error{retErr, err})
+		}
+
+		// Always record readiness and duration metrics
+		r.Metrics.RecordReadiness(ctx, obj)
+		r.Metrics.RecordDuration(ctx, obj, start)
+	}()
+
+	// Add our finalizer first if it does not exist, to avoid a race condition between init and delete
+	if !controllerutil.ContainsFinalizer(obj, sourcev1.SourceFinalizer) {
+		controllerutil.AddFinalizer(obj, sourcev1.SourceFinalizer)
+		return ctrl.Result{Requeue: true}, nil
 	}
 
-	// record the value of the reconciliation request, if any
-	// TODO(hidde): would be better to defer this in combination with
-	// always patching the status sub-resource after a reconciliation.
-	if v, ok := meta.ReconcileAnnotationValue(bucket.GetAnnotations()); ok {
-		bucket.Status.SetLastHandledReconcileRequest(v)
+	// Examine if the object is under deletion
+	if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
+		return r.reconcileDelete(ctx, obj)
 	}
 
-	// purge old artifacts from storage
-	if err := r.gc(bucket); err != nil {
-		log.Error(err, "unable to purge old artifacts")
-	}
-
-	// reconcile bucket by downloading its content
-	reconciledBucket, reconcileErr := r.reconcile(ctx, *bucket.DeepCopy())
-
-	// update status with the reconciliation result
-	if err := r.updateStatus(ctx, req, reconciledBucket.Status); err != nil {
-		log.Error(err, "unable to update status")
-		return ctrl.Result{Requeue: true}, err
-	}
-
-	// if reconciliation failed, record the failure and requeue immediately
-	if reconcileErr != nil {
-		r.event(ctx, reconciledBucket, events.EventSeverityError, reconcileErr.Error())
-		r.recordReadiness(ctx, reconciledBucket)
-		return ctrl.Result{Requeue: true}, reconcileErr
-	}
-
-	// emit revision change event
-	if bucket.Status.Artifact == nil || reconciledBucket.Status.Artifact.Revision != bucket.Status.Artifact.Revision {
-		r.event(ctx, reconciledBucket, events.EventSeverityInfo, sourcev1.BucketReadyMessage(reconciledBucket))
-	}
-	r.recordReadiness(ctx, reconciledBucket)
-
-	log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
-		time.Now().Sub(start).String(),
-		bucket.GetRequeueAfter().String(),
-	))
-
-	return ctrl.Result{RequeueAfter: bucket.GetRequeueAfter()}, nil
+	// Reconcile the actual object
+	return r.reconcile(ctx, obj)
 }
 
-func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket) (sourcev1.Bucket, error) {
-	s3Client, err := r.auth(ctx, bucket)
-	if err != nil {
-		err = fmt.Errorf("auth error: %w", err)
-		return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err
+// reconcile steps through the actual reconciliation tasks for the object. It returns early on the first step that
+// produces an error.
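+// A sub-reconciler signals failure either by returning an error, or by returning a zero ctrl.Result; in both cases
+// the remaining steps are skipped.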
+func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
+	// Mark the resource as under reconciliation
+	conditions.MarkReconciling(obj, meta.ProgressingReason, "")
+
+	// Reconcile the storage data
+	if result, err := r.reconcileStorage(ctx, obj); err != nil || result.IsZero() {
+		return result, err
 	}
 
-	// create tmp dir
-	tempDir, err := os.MkdirTemp("", bucket.Name)
+	// Create temp working dir
+	tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name))
 	if err != nil {
-		err = fmt.Errorf("tmp dir error: %w", err)
-		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
+		r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason, "Failed to create temporary directory: %s", err)
+		return ctrl.Result{}, err
 	}
-	defer os.RemoveAll(tempDir)
+	defer os.RemoveAll(tmpDir)
 
-	ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration)
+	// Reconcile the source from upstream
+	var artifact sourcev1.Artifact
+	if result, err := r.reconcileSource(ctx, obj, &artifact, tmpDir); err != nil || result.IsZero() {
+		return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, err
+	}
+
+	// Reconcile the artifact to storage
+	if result, err := r.reconcileArtifact(ctx, obj, artifact, tmpDir); err != nil || result.IsZero() {
+		return result, err
+	}
+
+	return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
+}
+
+// reconcileStorage ensures the current state of the storage matches the desired and previously observed state.
+//
+// All artifacts for the resource except for the current one are garbage collected from the storage.
+// If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
+// If the object does not have an artifact in its Status object, a v1beta1.ArtifactUnavailableCondition is set.
+// If the hostname of the URLs on the object does not match the current storage server hostname, they are updated.
+//
+// The caller should assume a failure if an error is returned, or the Result is zero.
+func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
+	// Garbage collect previously advertised artifact(s) from storage
+	_ = r.garbageCollect(ctx, obj)
+
+	// Determine if the advertised artifact is still in storage
+	if artifact := obj.GetArtifact(); artifact != nil && !r.Storage.ArtifactExist(*artifact) {
+		obj.Status.Artifact = nil
+		obj.Status.URL = ""
+	}
+
+	// Record that we do not have an artifact
+	if obj.GetArtifact() == nil {
+		conditions.MarkTrue(obj, sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage")
+		return ctrl.Result{Requeue: true}, nil
+	}
+	conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
+
+	// Always update URLs to ensure hostname is up-to-date
+	// TODO(hidde): we may want to send out an event only if we notice the URL has changed
+	r.Storage.SetArtifactURL(obj.GetArtifact())
+	obj.Status.URL = r.Storage.SetHostname(obj.Status.URL)
+
+	return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
+}
+
+// reconcileSource ensures the upstream bucket can be reached and downloaded using the declared configuration, and
+// observes its state.
+//
+// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into
+// account.
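+// Ignore rules are read from a '.sourceignore' object at the root of the bucket when present, and any in-spec
+// ignore patterns are appended after them, taking precedence over the rules from the file.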
+// In case of an error during the download process (including transient errors), it records
+// v1beta1.DownloadFailedCondition=True and returns early.
+// On a successful download, it removes v1beta1.DownloadFailedCondition, compares the current revision of the
+// upstream to the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
+// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
+//
+// The caller should assume a failure if an error is returned, or the Result is zero.
+func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, dir string) (ctrl.Result, error) {
+	// Attempt to retrieve secret if one is configured
+	var secret *corev1.Secret
+	if obj.Spec.SecretRef != nil {
+		secret = &corev1.Secret{}
+		name := types.NamespacedName{
+			Namespace: obj.GetNamespace(),
+			Name:      obj.Spec.SecretRef.Name,
+		}
+		if err := r.Client.Get(ctx, name, secret); err != nil {
+			conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.AuthenticationFailedReason,
+				"Failed to get secret '%s': %s", name.String(), err.Error())
+			r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.AuthenticationFailedReason,
+				"Failed to get secret '%s': %s", name.String(), err.Error())
+			// Return error as the world as observed may change
+			return ctrl.Result{}, err
+		}
+	}
+
+	// Build the client with the configuration from the object and secret
+	s3Client, err := r.buildClient(obj, secret)
+	if err != nil {
+		conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+			"Failed to construct S3 client: %s", err.Error())
+		r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
+			"Failed to construct S3 client: %s", err.Error())
+		// Return error as the contents of the secret may change
+		return ctrl.Result{}, err
+	}
+
+	// Confirm bucket exists
+	ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
 	defer cancel()
-
-	exists, err := s3Client.BucketExists(ctxTimeout, bucket.Spec.BucketName)
+	exists, err := s3Client.BucketExists(ctxTimeout, obj.Spec.BucketName)
 	if err != nil {
-		return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
+		conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to verify existence of bucket '%s': %s", obj.Spec.BucketName, err.Error())
+		return ctrl.Result{}, err
 	}
 	if !exists {
-		err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName)
-		return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
+		conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+			"Bucket '%s' does not exist", obj.Spec.BucketName)
+		r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
+			"Bucket '%s' does not exist", obj.Spec.BucketName)
+		return ctrl.Result{}, fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName)
 	}
 
 	// Look for file with ignore rules first
-	// NB: S3 has flat filepath keys making it impossible to look
-	// for files in "subdirectories" without building up a tree first.
- path := filepath.Join(tempDir, sourceignore.IgnoreFile) - if err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil { + path := filepath.Join(dir, sourceignore.IgnoreFile) + if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil { if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" { - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err + conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, + "Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error()) + r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason, + "Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error()) + return ctrl.Result{}, err } } ps, err := sourceignore.ReadIgnoreFile(path, nil) if err != nil { - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err + conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, + "Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error()) + r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason, + "Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error()) + return ctrl.Result{}, err } // In-spec patterns take precedence - if bucket.Spec.Ignore != nil { - ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...) + if obj.Spec.Ignore != nil { + ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...) } matcher := sourceignore.NewMatcher(ps) - // download bucket content - for object := range s3Client.ListObjects(ctxTimeout, bucket.Spec.BucketName, minio.ListObjectsOptions{ + // Build up an index of object keys and their etags + // As the keys define the paths and the etags represent a change in file contents, this should be sufficient to + // detect both structural and file changes + index := map[string]string{} + for object := range s3Client.ListObjects(ctxTimeout, obj.Spec.BucketName, minio.ListObjectsOptions{ Recursive: true, UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()), }) { - if object.Err != nil { - err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, object.Err) - return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err + if err = object.Err; err != nil { + conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, + "Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error()) + r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason, + "Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error()) + return ctrl.Result{}, err } + // Ignore directories and the .sourceignore file if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile { continue } - + // Ignore matches if matcher.Match(strings.Split(object.Key, "/"), false) { continue } - - localPath := filepath.Join(tempDir, object.Key) - err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Key, localPath, minio.GetObjectOptions{}) - if err != nil { - err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err) - return sourcev1.BucketNotReady(bucket, 
sourcev1.BucketOperationFailedReason, err.Error()), err
-		}
+		index[object.Key] = object.ETag
 	}
 
-	revision, err := r.checksum(tempDir)
+	// Calculate revision checksum from the collected index values
+	revision, err := r.revision(index)
 	if err != nil {
-		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
+		ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision")
+		return ctrl.Result{}, err
 	}
 
-	// return early on unchanged revision
-	artifact := r.Storage.NewArtifactFor(bucket.Kind, bucket.GetObjectMeta(), revision, fmt.Sprintf("%s.tar.gz", revision))
-	if apimeta.IsStatusConditionTrue(bucket.Status.Conditions, meta.ReadyCondition) && bucket.GetArtifact().HasRevision(artifact.Revision) {
-		if artifact.URL != bucket.GetArtifact().URL {
-			r.Storage.SetArtifactURL(bucket.GetArtifact())
-			bucket.Status.URL = r.Storage.SetHostname(bucket.Status.URL)
+	if !obj.GetArtifact().HasRevision(revision) {
+		// Mark observations about the revision on the object
+		conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision",
+			"New upstream revision '%s'", revision)
+
+		// Download the files in parallel, but with a limited number of workers
+		group, groupCtx := errgroup.WithContext(ctx)
+		group.Go(func() error {
+			const workers = 4
+			sem := semaphore.NewWeighted(workers)
+			for key := range index {
+				k := key
+				if err := sem.Acquire(groupCtx, 1); err != nil {
+					return err
+				}
+				group.Go(func() error {
+					defer sem.Release(1)
+					localPath := filepath.Join(dir, k)
+					if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath, minio.GetObjectOptions{}); err != nil {
+						return fmt.Errorf("failed to get '%s' file: %w", k, err)
+					}
+					return nil
+				})
+			}
+			return nil
+		})
+		if err = group.Wait(); err != nil {
+			conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+				"Download from bucket '%s' failed: %s", obj.Spec.BucketName, err)
+			r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
+				"Download from bucket '%s' failed: %s", obj.Spec.BucketName, err)
+			return ctrl.Result{}, err
 		}
-		return bucket, nil
+		r.Eventf(ctx, obj, events.EventSeverityInfo, sourcev1.BucketOperationSucceedReason,
+			"Downloaded %d files from bucket '%s' revision '%s'", len(index), obj.Spec.BucketName, revision)
+	}
+	conditions.Delete(obj, sourcev1.DownloadFailedCondition)
+
+	// Create potential new artifact
+	*artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
+	return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
+}
+
+// reconcileArtifact archives a new artifact to the storage, if the current observation on the object does not match
+// the given data.
+//
+// The inspection of the given data to the object is deferred, ensuring stale observations such as
+// v1beta1.ArtifactUnavailableCondition and v1beta1.ArtifactOutdatedCondition are always deleted.
+// If the given artifact does not differ from the object's current artifact, it returns early.
+// On a successful archive, the artifact in the status of the given object is set, and the symlink in the storage is
+// updated to its path.
+//
+// The caller should assume a failure if an error is returned, or the Result is zero.
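+// The archive is written while holding a lock on the artifact in storage, preventing concurrent writes to the
+// same path.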
+func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) (ctrl.Result, error) { + // Always restore the Ready condition in case it got removed due to a transient error + defer func() { + if obj.GetArtifact() != nil { + conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition) + } + if obj.GetArtifact().HasRevision(artifact.Revision) { + conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition) + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, + "Stored artifact for revision '%s'", artifact.Revision) + } + }() + + // The artifact is up-to-date + if obj.GetArtifact().HasRevision(artifact.Revision) { + ctrl.LoggerFrom(ctx).Info(fmt.Sprintf("Already up to date, current revision '%s'", artifact.Revision)) + return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil } - // create artifact dir - err = r.Storage.MkdirAll(artifact) - if err != nil { - err = fmt.Errorf("mkdir dir error: %w", err) - return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err + // Ensure target path exists and is a directory + if f, err := os.Stat(dir); err != nil { + ctrl.LoggerFrom(ctx).Error(err, "failed to stat source path") + return ctrl.Result{}, err + } else if !f.IsDir() { + err := fmt.Errorf("source path '%s' is not a directory", dir) + ctrl.LoggerFrom(ctx).Error(err, "invalid target path") + return ctrl.Result{}, err } - // acquire lock + // Ensure artifact directory exists and acquire lock + if err := r.Storage.MkdirAll(artifact); err != nil { + ctrl.LoggerFrom(ctx).Error(err, "failed to create artifact directory") + return ctrl.Result{}, err + } unlock, err := r.Storage.Lock(artifact) if err != nil { - err = fmt.Errorf("unable to acquire lock: %w", err) - return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err + ctrl.LoggerFrom(ctx).Error(err, "failed to acquire lock for artifact") + return ctrl.Result{}, err } defer unlock() - // archive artifact and check integrity - if err := r.Storage.Archive(&artifact, tempDir, nil); err != nil { - err = fmt.Errorf("storage archive error: %w", err) - return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err + // Archive directory to storage + if err := r.Storage.Archive(&artifact, dir, nil); err != nil { + r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason, + "Unable to archive artifact to storage: %s", err) + return ctrl.Result{}, err } + r.Events.EventWithMetaf(ctx, obj, map[string]string{ + "revision": artifact.Revision, + "checksum": artifact.Checksum, + }, events.EventSeverityInfo, "NewArtifact", "Stored artifact for revision '%s'", artifact.Revision) - // update latest symlink + // Record it on the object + obj.Status.Artifact = artifact.DeepCopy() + + // Update symlink on a "best effort" basis url, err := r.Storage.Symlink(artifact, "latest.tar.gz") if err != nil { - err = fmt.Errorf("storage symlink error: %w", err) - return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err + r.Events.Eventf(ctx, obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason, + "Failed to update status URL symlink: %s", err) } - - message := fmt.Sprintf("Fetched revision: %s", artifact.Revision) - return sourcev1.BucketReady(bucket, artifact, url, sourcev1.BucketOperationSucceedReason, message), nil + if url != "" { + obj.Status.URL = url + } + return ctrl.Result{RequeueAfter: 
obj.GetRequeueAfter()}, nil
+}
 
-func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) {
-	if err := r.gc(bucket); err != nil {
-		r.event(ctx, bucket, events.EventSeverityError,
-			fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
+// reconcileDelete handles the deletion of an object. It first garbage collects all artifacts for the object from the
+// artifact storage; if this is successful, the finalizer is removed from the object.
+func (r *BucketReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
+	// Garbage collect the resource's artifacts
+	if err := r.garbageCollect(ctx, obj); err != nil {
 		// Return the error so we retry the failed garbage collection
 		return ctrl.Result{}, err
 	}
 
-	// Record deleted status
-	r.recordReadiness(ctx, bucket)
-
-	// Remove our finalizer from the list and update it
-	controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer)
-	if err := r.Update(ctx, &bucket); err != nil {
-		return ctrl.Result{}, err
-	}
+	// Remove our finalizer from the list
+	controllerutil.RemoveFinalizer(obj, sourcev1.SourceFinalizer)
 
 	// Stop reconciliation as the object is being deleted
 	return ctrl.Result{}, nil
 }
 
-func (r *BucketReconciler) auth(ctx context.Context, bucket sourcev1.Bucket) (*minio.Client, error) {
-	opt := minio.Options{
-		Region: bucket.Spec.Region,
-		Secure: !bucket.Spec.Insecure,
-	}
-
-	if bucket.Spec.SecretRef != nil {
-		secretName := types.NamespacedName{
-			Namespace: bucket.GetNamespace(),
-			Name:      bucket.Spec.SecretRef.Name,
-		}
-
-		var secret corev1.Secret
-		if err := r.Get(ctx, secretName, &secret); err != nil {
-			return nil, fmt.Errorf("credentials secret error: %w", err)
-		}
-
-		accesskey := ""
-		secretkey := ""
-		if k, ok := secret.Data["accesskey"]; ok {
-			accesskey = string(k)
-		}
-		if k, ok := secret.Data["secretkey"]; ok {
-			secretkey = string(k)
-		}
-		if accesskey == "" || secretkey == "" {
-			return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
-		}
-		opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "")
-	} else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider {
-		opt.Creds = credentials.NewIAM("")
-	}
-
-	if opt.Creds == nil {
-		return nil, fmt.Errorf("no bucket credentials found")
-	}
-
-	return minio.New(bucket.Spec.Endpoint, &opt)
-}
-
-// checksum calculates the SHA1 checksum of the given root directory.
-// It traverses the given root directory and calculates the checksum for any found file, and returns the SHA1 sum of the
-// list with relative file paths and their checksums.
-func (r *BucketReconciler) checksum(root string) (string, error) {
-	sum := sha1.New()
-	if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
-		if err != nil {
+// garbageCollect performs a garbage collection for the given v1beta1.Bucket. It removes all but the current
+// artifact, except for when the deletion timestamp is set, which will result in the removal of all artifacts for the
+// resource.
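+// An event is emitted on both failed and successful garbage collection attempts.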
+func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Bucket) error {
+	if !obj.DeletionTimestamp.IsZero() {
+		if err := r.Storage.RemoveAll(r.Storage.NewArtifactFor(obj.Kind, obj.GetObjectMeta(), "", "*")); err != nil {
+			r.Eventf(ctx, obj, events.EventSeverityError, "GarbageCollectionFailed",
+				"Garbage collection for deleted resource failed: %s", err)
 			return err
 		}
-		if !info.Mode().IsRegular() {
-			return nil
-		}
-		data, err := os.ReadFile(path)
-		if err != nil {
-			return err
-		}
-		relPath, err := filepath.Rel(root, path)
-		if err != nil {
-			return err
-		}
-		sum.Write([]byte(fmt.Sprintf("%x  %s\n", sha1.Sum(data), relPath)))
+		obj.Status.Artifact = nil
+		// TODO(hidde): we should only push this event if we actually garbage collected something
+		r.Eventf(ctx, obj, events.EventSeverityInfo, "GarbageCollectionSucceeded",
+			"Garbage collected artifacts for deleted resource")
 		return nil
-	}); err != nil {
-		return "", err
 	}
-	return fmt.Sprintf("%x", sum.Sum(nil)), nil
-}
-
-// resetStatus returns a modified v1beta1.Bucket and a boolean indicating
-// if the status field has been reset.
-func (r *BucketReconciler) resetStatus(bucket sourcev1.Bucket) (sourcev1.Bucket, bool) {
-	// We do not have an artifact, or it does no longer exist
-	if bucket.GetArtifact() == nil || !r.Storage.ArtifactExist(*bucket.GetArtifact()) {
-		bucket = sourcev1.BucketProgressing(bucket)
-		bucket.Status.Artifact = nil
-		return bucket, true
-	}
-	if bucket.Generation != bucket.Status.ObservedGeneration {
-		return sourcev1.BucketProgressing(bucket), true
-	}
-	return bucket, false
-}
-
-// gc performs a garbage collection for the given v1beta1.Bucket.
-// It removes all but the current artifact except for when the
-// deletion timestamp is set, which will result in the removal of
-// all artifacts for the resource.
-func (r *BucketReconciler) gc(bucket sourcev1.Bucket) error {
-	if !bucket.DeletionTimestamp.IsZero() {
-		return r.Storage.RemoveAll(r.Storage.NewArtifactFor(bucket.Kind, bucket.GetObjectMeta(), "", "*"))
-	}
-	if bucket.GetArtifact() != nil {
-		return r.Storage.RemoveAllButCurrent(*bucket.GetArtifact())
+	if obj.GetArtifact() != nil {
+		if err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
+			r.Eventf(ctx, obj, events.EventSeverityError, "GarbageCollectionFailed", "Garbage collection of old artifacts failed: %s", err)
+			return err
+		}
+		// TODO(hidde): we should only push this event if we actually garbage collected something
+		r.Eventf(ctx, obj, events.EventSeverityInfo, "GarbageCollectionSucceeded", "Garbage collected old artifacts")
 	}
 	return nil
 }
 
-// event emits a Kubernetes event and forwards the event to notification controller if configured
-func (r *BucketReconciler) event(ctx context.Context, bucket sourcev1.Bucket, severity, msg string) {
-	log := logr.FromContext(ctx)
-	if r.EventRecorder != nil {
-		r.EventRecorder.Eventf(&bucket, "Normal", severity, msg)
+// buildClient constructs a minio.Client with the data from the given object and secret.
+// It returns an error if the given Secret does not have the required fields. If no Secret is provided and the
+// provider is not 'aws', the client is constructed without credentials, resulting in anonymous access.
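+//
+// The Secret is expected to carry the credentials in its 'accesskey' and 'secretkey' data fields, e.g. as created
+// with: kubectl create secret generic <name> --from-literal=accesskey=<id> --from-literal=secretkey=<key>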
+func (r *BucketReconciler) buildClient(obj *sourcev1.Bucket, secret *corev1.Secret) (*minio.Client, error) { + opts := minio.Options{ + Region: obj.Spec.Region, + Secure: !obj.Spec.Insecure, } - if r.ExternalEventRecorder != nil { - objRef, err := reference.GetReference(r.Scheme, &bucket) - if err != nil { - log.Error(err, "unable to send event") - return + if secret != nil { + var accessKey, secretKey string + if k, ok := secret.Data["accesskey"]; ok { + accessKey = string(k) } + if k, ok := secret.Data["secretkey"]; ok { + secretKey = string(k) + } + if accessKey == "" || secretKey == "" { + return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name) + } + opts.Creds = credentials.NewStaticV4(accessKey, secretKey, "") + } else if obj.Spec.Provider == sourcev1.AmazonBucketProvider { + opts.Creds = credentials.NewIAM("") + } - if err := r.ExternalEventRecorder.Eventf(*objRef, nil, severity, severity, msg); err != nil { - log.Error(err, "unable to send event") - return + return minio.New(obj.Spec.Endpoint, &opts) +} + +// revision calculates the SHA256 checksum of the given string map. +// The keys are sorted to ensure a stable order, and the SHA256 sum is then calculated for the string representations of +// the key/value pairs, each pair written on a newline. The sum result is returned as a string. +func (r *BucketReconciler) revision(list map[string]string) (string, error) { + keyIndex := make([]string, 0, len(list)) + for k := range list { + keyIndex = append(keyIndex, k) + } + sort.Strings(keyIndex) + sum := sha256.New() + for _, k := range keyIndex { + if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, list[k]))); err != nil { + return "", err } } -} - -func (r *BucketReconciler) recordReadiness(ctx context.Context, bucket sourcev1.Bucket) { - log := logr.FromContext(ctx) - if r.MetricsRecorder == nil { - return - } - objRef, err := reference.GetReference(r.Scheme, &bucket) - if err != nil { - log.Error(err, "unable to record readiness metric") - return - } - if rc := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); rc != nil { - r.MetricsRecorder.RecordCondition(*objRef, *rc, !bucket.DeletionTimestamp.IsZero()) - } else { - r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{ - Type: meta.ReadyCondition, - Status: metav1.ConditionUnknown, - }, !bucket.DeletionTimestamp.IsZero()) - } -} - -func (r *BucketReconciler) recordSuspension(ctx context.Context, bucket sourcev1.Bucket) { - if r.MetricsRecorder == nil { - return - } - log := logr.FromContext(ctx) - - objRef, err := reference.GetReference(r.Scheme, &bucket) - if err != nil { - log.Error(err, "unable to record suspended metric") - return - } - - if !bucket.DeletionTimestamp.IsZero() { - r.MetricsRecorder.RecordSuspend(*objRef, false) - } else { - r.MetricsRecorder.RecordSuspend(*objRef, bucket.Spec.Suspend) - } -} - -func (r *BucketReconciler) updateStatus(ctx context.Context, req ctrl.Request, newStatus sourcev1.BucketStatus) error { - var bucket sourcev1.Bucket - if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil { - return err - } - - patch := client.MergeFrom(bucket.DeepCopy()) - bucket.Status = newStatus - - return r.Status().Patch(ctx, &bucket, patch) + return fmt.Sprintf("%x", sum.Sum(nil)), nil } diff --git a/controllers/bucket_controller_test.go b/controllers/bucket_controller_test.go index 01ff20d8..60fe1bec 100644 --- a/controllers/bucket_controller_test.go +++ b/controllers/bucket_controller_test.go @@ -17,59 +17,567 
@@ limitations under the License. package controllers import ( + "context" + "crypto/md5" + "fmt" + "net/http" + "net/http/httptest" + "net/url" "os" + "path" "path/filepath" + "strings" "testing" + "time" + + "github.com/go-logr/logr" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/runtime/conditions" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" ) -func TestBucketReconciler_checksum(t *testing.T) { +func TestBucketReconciler_Reconcile(t *testing.T) { + g := NewWithT(t) + + s3Server := newS3Server("test-bucket") + s3Server.Objects = []*s3MockObject{ + { + Key: "test.yaml", + Content: []byte("test"), + ContentType: "text/plain", + LastModified: time.Now(), + }, + } + s3Server.Start() + defer s3Server.Stop() + + g.Expect(s3Server.HTTPAddress()).ToNot(BeEmpty()) + u, err := url.Parse(s3Server.HTTPAddress()) + g.Expect(err).NotTo(HaveOccurred()) + + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "bucket-reconcile-", + Namespace: "default", + }, + Data: map[string][]byte{ + "accesskey": []byte("key"), + "secretkey": []byte("secret"), + }, + } + g.Expect(testEnv.Create(ctx, secret)).To(Succeed()) + defer testEnv.Delete(ctx, secret) + + obj := &sourcev1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "bucket-reconcile-", + Namespace: "default", + }, + Spec: sourcev1.BucketSpec{ + Provider: "generic", + BucketName: s3Server.BucketName, + Endpoint: u.Host, + Insecure: true, + Interval: metav1.Duration{Duration: interval}, + Timeout: &metav1.Duration{Duration: timeout}, + SecretRef: &meta.LocalObjectReference{ + Name: secret.Name, + }, + }, + } + g.Expect(testEnv.Create(ctx, obj)).To(Succeed()) + + key := client.ObjectKey{Name: obj.Name, Namespace: obj.Namespace} + + // Wait for finalizer to be set + g.Eventually(func() bool { + if err := testEnv.Get(ctx, key, obj); err != nil { + return false + } + return len(obj.Finalizers) > 0 + }, timeout).Should(BeTrue()) + + // Wait for Bucket to be Ready + g.Eventually(func() bool { + if err := testEnv.Get(ctx, key, obj); err != nil { + return false + } + if !conditions.IsReady(obj) || obj.Status.Artifact == nil { + return false + } + readyCondition := conditions.Get(obj, meta.ReadyCondition) + return obj.Generation == readyCondition.ObservedGeneration && + obj.Generation == obj.Status.ObservedGeneration + }, timeout).Should(BeTrue()) + + g.Expect(testEnv.Delete(ctx, obj)).To(Succeed()) + + // Wait for Bucket to be deleted + g.Eventually(func() bool { + if err := testEnv.Get(ctx, key, obj); err != nil { + return apierrors.IsNotFound(err) + } + return false + }, timeout).Should(BeTrue()) +} + +func TestBucketReconciler_reconcileStorage(t *testing.T) { tests := []struct { - name string - beforeFunc func(root string) - want string - wantErr bool + name string + beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error + want ctrl.Result + wantErr bool + assertArtifact *sourcev1.Artifact + assertConditions []metav1.Condition + assertPaths []string }{ { - name: "empty root", - want: "da39a3ee5e6b4b0d3255bfef95601890afd80709", + name: "garbage collects", + beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error { + revisions := []string{"a", 
"b", "c"} + for n := range revisions { + v := revisions[n] + obj.Status.Artifact = &sourcev1.Artifact{ + Path: fmt.Sprintf("/reconcile-storage/%s.txt", v), + Revision: v, + } + if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { + return err + } + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil { + return err + } + } + testStorage.SetArtifactURL(obj.Status.Artifact) + return nil + }, + assertArtifact: &sourcev1.Artifact{ + Path: "/reconcile-storage/c.txt", + Revision: "c", + Checksum: "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4", + URL: testStorage.Hostname + "/reconcile-storage/c.txt", + }, + assertPaths: []string{ + "/reconcile-storage/c.txt", + "!/reconcile-storage/b.txt", + "!/reconcile-storage/a.txt", + }, }, { - name: "with file", - beforeFunc: func(root string) { - mockFile(root, "a/b/c.txt", "a dummy string") + name: "notices missing artifact in storage", + beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error { + obj.Status.Artifact = &sourcev1.Artifact{ + Path: fmt.Sprintf("/reconcile-storage/invalid.txt"), + Revision: "d", + } + testStorage.SetArtifactURL(obj.Status.Artifact) + return nil + }, + want: ctrl.Result{Requeue: true}, + assertPaths: []string{ + "!/reconcile-storage/invalid.txt", + }, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage"), }, - want: "309a5e6e96b4a7eea0d1cfaabf1be8ec1c063fa0", }, { - name: "with file in different path", - beforeFunc: func(root string) { - mockFile(root, "a/b.txt", "a dummy string") + name: "updates hostname on diff from current", + beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error { + obj.Status.Artifact = &sourcev1.Artifact{ + Path: fmt.Sprintf("/reconcile-storage/hostname.txt"), + Revision: "f", + Checksum: "971c419dd609331343dee105fffd0f4608dc0bf2", + URL: "http://outdated.com/reconcile-storage/hostname.txt", + } + if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil { + return err + } + if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil { + return err + } + return nil + }, + assertPaths: []string{ + "/reconcile-storage/hostname.txt", + }, + assertArtifact: &sourcev1.Artifact{ + Path: "/reconcile-storage/hostname.txt", + Revision: "f", + Checksum: "971c419dd609331343dee105fffd0f4608dc0bf2", + URL: testStorage.Hostname + "/reconcile-storage/hostname.txt", }, - want: "e28c62b5cc488849950c4355dddc5523712616d4", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - root, err := os.MkdirTemp("", "bucket-checksum-") - if err != nil { - t.Fatal(err) + g := NewWithT(t) + + r := &BucketReconciler{ + Storage: testStorage, + } + + obj := &sourcev1.Bucket{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-", + }, } - defer os.RemoveAll(root) if tt.beforeFunc != nil { - tt.beforeFunc(root) + g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed()) } - got, err := (&BucketReconciler{}).checksum(root) - if (err != nil) != tt.wantErr { - t.Errorf("checksum() error = %v, wantErr %v", err, tt.wantErr) - return + + got, err := r.reconcileStorage(context.TODO(), obj) + g.Expect(err != nil).To(Equal(tt.wantErr)) + g.Expect(got).To(Equal(tt.want)) + + g.Expect(obj.Status.Artifact).To(MatchArtifact(tt.assertArtifact)) + if tt.assertArtifact != nil && tt.assertArtifact.URL != "" { + g.Expect(obj.Status.Artifact.URL).To(Equal(tt.assertArtifact.URL)) } - if got != tt.want { - 
t.Errorf("checksum() got = %v, want %v", got, tt.want) + g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) + + for _, p := range tt.assertPaths { + absoluteP := filepath.Join(testStorage.BasePath, p) + if !strings.HasPrefix(p, "!") { + g.Expect(absoluteP).To(BeAnExistingFile()) + continue + } + g.Expect(absoluteP).NotTo(BeAnExistingFile()) } }) } } +func TestBucketReconciler_reconcileSource(t *testing.T) { + tests := []struct { + name string + bucketName string + bucketObjects []*s3MockObject + middleware http.Handler + secret *corev1.Secret + beforeFunc func(obj *sourcev1.Bucket) + want ctrl.Result + wantErr bool + assertArtifact sourcev1.Artifact + assertConditions []metav1.Condition + }{ + { + name: "reconciles source", + bucketName: "dummy", + bucketObjects: []*s3MockObject{ + { + Key: "test.txt", + Content: []byte("test"), + ContentType: "text/plain", + LastModified: time.Now(), + }, + }, + assertArtifact: sourcev1.Artifact{ + Path: "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz", + Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8", + }, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"), + }, + }, + // TODO(hidde): middleware for mock server + //{ + // name: "authenticates using secretRef", + // bucketName: "dummy", + //}, + { + name: "observes non-existing secretRef", + bucketName: "dummy", + beforeFunc: func(obj *sourcev1.Bucket) { + obj.Spec.SecretRef = &meta.LocalObjectReference{ + Name: "dummy", + } + }, + wantErr: true, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.AuthenticationFailedReason, "Failed to get secret '/dummy': secrets \"dummy\" not found"), + }, + }, + { + name: "observes invalid secretRef", + bucketName: "dummy", + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dummy", + }, + }, + beforeFunc: func(obj *sourcev1.Bucket) { + obj.Spec.SecretRef = &meta.LocalObjectReference{ + Name: "dummy", + } + }, + wantErr: true, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to construct S3 client: invalid 'dummy' secret data: required fields"), + }, + }, + { + name: "observes non-existing bucket name", + bucketName: "dummy", + beforeFunc: func(obj *sourcev1.Bucket) { + obj.Spec.BucketName = "invalid" + }, + wantErr: true, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Bucket 'invalid' does not exist"), + }, + }, + { + name: "transient bucket name API failure", + beforeFunc: func(obj *sourcev1.Bucket) { + obj.Spec.Endpoint = "transient.example.com" + obj.Spec.BucketName = "unavailable" + }, + wantErr: true, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to verify existence of bucket 'unavailable'"), + }, + }, + { + // TODO(hidde): test the lesser happy paths + name: ".sourceignore", + bucketName: "dummy", + bucketObjects: []*s3MockObject{ + { + Key: ".sourceignore", + Content: []byte("ignored/file.txt"), + ContentType: "text/plain", + LastModified: time.Now(), + }, + { + Key: "ignored/file.txt", + Content: []byte("ignored/file.txt"), + 
ContentType: "text/plain", + LastModified: time.Now(), + }, + { + Key: "included/file.txt", + Content: []byte("included/file.txt"), + ContentType: "text/plain", + LastModified: time.Now(), + }, + }, + assertArtifact: sourcev1.Artifact{ + Path: "bucket/test-bucket/94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100.tar.gz", + Revision: "94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100", + }, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision '94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100'"), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + builder := fakeclient.NewClientBuilder().WithScheme(testEnv.Scheme()) + if tt.secret != nil { + builder.WithObjects(tt.secret) + } + r := &BucketReconciler{ + Client: builder.Build(), + Storage: testStorage, + } + tmpDir, err := os.MkdirTemp("", "reconcile-bucket-source-") + g.Expect(err).ToNot(HaveOccurred()) + defer os.RemoveAll(tmpDir) + + obj := &sourcev1.Bucket{ + TypeMeta: metav1.TypeMeta{ + Kind: sourcev1.BucketKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-bucket", + }, + Spec: sourcev1.BucketSpec{ + Timeout: &metav1.Duration{Duration: timeout}, + }, + } + + var server *s3MockServer + if tt.bucketName != "" { + server = newS3Server(tt.bucketName) + server.Objects = tt.bucketObjects + server.Start() + defer server.Stop() + + g.Expect(server.HTTPAddress()).ToNot(BeEmpty()) + u, err := url.Parse(server.HTTPAddress()) + g.Expect(err).NotTo(HaveOccurred()) + + obj.Spec.BucketName = tt.bucketName + obj.Spec.Endpoint = u.Host + // TODO(hidde): also test TLS + obj.Spec.Insecure = true + } + if tt.beforeFunc != nil { + tt.beforeFunc(obj) + } + + artifact := &sourcev1.Artifact{} + got, err := r.reconcileSource(context.TODO(), obj, artifact, tmpDir) + g.Expect(err != nil).To(Equal(tt.wantErr)) + g.Expect(got).To(Equal(tt.want)) + + g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy())) + g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) + }) + } +} + +func TestBucketReconciler_reconcileArtifact(t *testing.T) { + tests := []struct { + name string + artifact sourcev1.Artifact + beforeFunc func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) + want ctrl.Result + wantErr bool + assertConditions []metav1.Condition + }{ + { + name: "artifact revision up-to-date", + artifact: sourcev1.Artifact{ + Revision: "existing", + }, + beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + obj.Status.Artifact = &artifact + }, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"), + }, + }, + { + name: "dir path deleted", + beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + _ = os.RemoveAll(dir) + }, + wantErr: true, + }, + //{ + // name: "dir path empty", + //}, + //{ + // name: "success", + // artifact: sourcev1.Artifact{ + // Revision: "existing", + // }, + // beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) { + // obj.Status.Artifact = &artifact + // }, + // assertConditions: []metav1.Condition{ + // *conditions.TrueCondition(sourcev1.ArtifactAvailableCondition, meta.SucceededReason, "Compressed source to artifact with revision 'existing'"), + // }, + //}, + //{ + // name: "symlink", + //}, + } + + for _, tt := range tests { + 
t.Run(tt.name, func(t *testing.T) {
+			g := NewWithT(t)
+
+			tmpDir, err := os.MkdirTemp("", "reconcile-bucket-artifact-")
+			g.Expect(err).ToNot(HaveOccurred())
+			defer os.RemoveAll(tmpDir)
+
+			obj := &sourcev1.Bucket{
+				TypeMeta: metav1.TypeMeta{
+					Kind: sourcev1.BucketKind,
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "test-bucket",
+				},
+				Spec: sourcev1.BucketSpec{
+					Timeout: &metav1.Duration{Duration: timeout},
+				},
+			}
+
+			if tt.beforeFunc != nil {
+				tt.beforeFunc(obj, tt.artifact, tmpDir)
+			}
+
+			r := &BucketReconciler{
+				Storage: testStorage,
+			}
+
+			got, err := r.reconcileArtifact(logr.NewContext(ctx, log.NullLogger{}), obj, tt.artifact, tmpDir)
+			g.Expect(err != nil).To(Equal(tt.wantErr))
+			g.Expect(got).To(Equal(tt.want))
+
+			//g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy()))
+			g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
+		})
+	}
+}
+
+func TestBucketReconciler_revision(t *testing.T) {
+	tests := []struct {
+		name    string
+		list    map[string]string
+		want    string
+		wantErr bool
+	}{
+		{
+			name: "list with items",
+			list: map[string]string{
+				"one":   "one",
+				"two":   "two",
+				"three": "three",
+			},
+			want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc",
+		},
+		{
+			// Same items in a different insertion order: the revision must be
+			// independent of map iteration order.
+			name: "list with items in different order",
+			list: map[string]string{
+				"three": "three",
+				"one":   "one",
+				"two":   "two",
+			},
+			want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc",
+		},
+		{
+			name: "empty list",
+			list: map[string]string{},
+			// The SHA-256 of zero bytes: an empty bucket still has a stable revision.
+			want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
+		},
+		{
+			name: "nil list",
+			list: nil,
+			want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := (&BucketReconciler{}).revision(tt.list)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("revision() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if got != tt.want {
+				t.Errorf("revision() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+// helpers
+
 func mockFile(root, path, content string) error {
 	filePath := filepath.Join(root, path)
 	if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil {
@@ -80,3 +588,120 @@ func mockFile(root, path, content string) error {
 	}
 	return nil
 }
+
+// s3MockObject is a single in-memory object served by s3MockServer.
+type s3MockObject struct {
+	Key          string
+	LastModified time.Time
+	ContentType  string
+	Content      []byte
+}
+
+// s3MockServer is a minimal HTTP stub for a single S3 bucket, backing the
+// reconciler tests without a real object store.
+type s3MockServer struct {
+	srv *httptest.Server
+	mux *http.ServeMux
+
+	BucketName string
+	Objects    []*s3MockObject
+}
+
+func newS3Server(bucketName string) *s3MockServer {
+	s := &s3MockServer{BucketName: bucketName}
+	s.mux = http.NewServeMux()
+	s.mux.Handle(fmt.Sprintf("/%s/", s.BucketName), http.HandlerFunc(s.handler))
+
+	s.srv = httptest.NewUnstartedServer(s.mux)
+
+	return s
+}
+
+func (s *s3MockServer) Start() {
+	s.srv.Start()
+}
+
+func (s *s3MockServer) Stop() {
+	s.srv.Close()
+}
+
+func (s *s3MockServer) HTTPAddress() string {
+	return s.srv.URL
+}
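The handler below serves everything under /<BucketName>/: bucket HEAD, the
?location query, object listing, and object HEAD/GET. For reference, the mock
can be driven directly with a plain minio-go v7 client; the following sketch is
illustrative only and not part of this patch (the listFromMock helper and the
static credentials are hypothetical), assuming imports of context, net/url,
testing, github.com/minio/minio-go/v7 and
github.com/minio/minio-go/v7/pkg/credentials:

	// listFromMock is a hypothetical helper (not part of this patch) that
	// points a minio-go v7 client at the httptest-backed mock and lists the
	// bucket's objects.
	func listFromMock(t *testing.T, server *s3MockServer) {
		u, err := url.Parse(server.HTTPAddress())
		if err != nil {
			t.Fatal(err)
		}
		// The mock performs no authentication, so any static credentials do.
		mc, err := minio.New(u.Host, &minio.Options{
			Creds:  credentials.NewStaticV4("key", "secret", ""),
			Secure: false, // the httptest server speaks plain HTTP
		})
		if err != nil {
			t.Fatal(err)
		}
		for obj := range mc.ListObjects(context.TODO(), server.BucketName, minio.ListObjectsOptions{Recursive: true}) {
			if obj.Err != nil {
				t.Fatal(obj.Err)
			}
			t.Logf("key=%s etag=%s", obj.Key, obj.ETag)
		}
	}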
+func (s *s3MockServer) handler(w http.ResponseWriter, r *http.Request) {
+	key := path.Base(r.URL.Path)
+
+	switch key {
+	case s.BucketName:
+		w.Header().Add("Content-Type", "application/xml")
+
+		if r.Method == http.MethodHead {
+			return
+		}
+
+		q := r.URL.Query()
+
+		if q["location"] != nil {
+			fmt.Fprint(w, `
+<?xml version="1.0" encoding="UTF-8"?>
+<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">Europe</LocationConstraint>
+			`)
+			return
+		}
+
+		contents := ""
+		for _, o := range s.Objects {
+			etag := md5.Sum(o.Content)
+			contents += fmt.Sprintf(`
+		<Contents>
+			<Key>%s</Key>
+			<LastModified>%s</LastModified>
+			<Size>%d</Size>
+			<ETag>&quot;%b&quot;</ETag>
+			<StorageClass>STANDARD</StorageClass>
+		</Contents>`, o.Key, o.LastModified.UTC().Format(time.RFC3339), len(o.Content), etag)
+		}
+
+		fmt.Fprintf(w, `
+<?xml version="1.0" encoding="UTF-8"?>
+<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+	<Name>%s</Name>
+	<Prefix/>
+	<Marker/>
+	<KeyCount>%d</KeyCount>
+	<MaxKeys>1000</MaxKeys>
+	<IsTruncated>false</IsTruncated>
+	%s
+</ListBucketResult>
+		`, s.BucketName, len(s.Objects), contents)
+	default:
+		key, err := filepath.Rel("/"+s.BucketName, r.URL.Path)
+		if err != nil {
+			w.WriteHeader(500)
+			return
+		}
+
+		var found *s3MockObject
+		for _, o := range s.Objects {
+			if key == o.Key {
+				found = o
+			}
+		}
+		if found == nil {
+			w.WriteHeader(404)
+			return
+		}
+
+		etag := md5.Sum(found.Content)
+		lastModified := strings.Replace(found.LastModified.UTC().Format(time.RFC1123), "UTC", "GMT", 1)
+
+		w.Header().Add("Content-Type", found.ContentType)
+		w.Header().Add("Last-Modified", lastModified)
+		// Note: real S3 returns the hex MD5 digest here; %b (Go's binary verb)
+		// matches the <ETag> formatting in the list response above, which is
+		// all these tests rely on.
+		w.Header().Add("ETag", fmt.Sprintf("\"%b\"", etag))
+		w.Header().Add("Content-Length", fmt.Sprintf("%d", len(found.Content)))
+
+		if r.Method == http.MethodHead {
+			return
+		}
+
+		w.Write(found.Content)
+	}
+}
diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index 8dd32e35..3429e7df 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -29,11 +29,11 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 
 	"github.com/fluxcd/pkg/runtime/controller"
+	"github.com/fluxcd/pkg/runtime/testenv"
 	"github.com/fluxcd/pkg/testserver"
 
 	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
 	// +kubebuilder:scaffold:imports
-	"github.com/fluxcd/pkg/runtime/testenv"
 )
 
 // These tests make use of plain Go using Gomega for assertions.
@@ -98,6 +98,15 @@ func TestMain(m *testing.M) {
 		panic(fmt.Sprintf("Failed to start GitRepositoryReconciler: %v", err))
 	}
 
+	if err := (&BucketReconciler{
+		Client:  testEnv,
+		Events:  testEventsH,
+		Metrics: testMetricsH,
+		Storage: testStorage,
+	}).SetupWithManager(testEnv); err != nil {
+		panic(fmt.Sprintf("Failed to start BucketReconciler: %v", err))
+	}
+
 	go func() {
 		fmt.Println("Starting the test environment")
 		if err := testEnv.Start(ctx); err != nil {
diff --git a/main.go b/main.go
index 0730bb94..71647e2f 100644
--- a/main.go
+++ b/main.go
@@ -25,13 +25,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/fluxcd/pkg/runtime/client"
-	helper "github.com/fluxcd/pkg/runtime/controller"
-	"github.com/fluxcd/pkg/runtime/events"
-	"github.com/fluxcd/pkg/runtime/leaderelection"
-	"github.com/fluxcd/pkg/runtime/logger"
-	"github.com/fluxcd/pkg/runtime/pprof"
-	"github.com/fluxcd/pkg/runtime/probes"
 	"github.com/go-logr/logr"
 	flag "github.com/spf13/pflag"
 	"helm.sh/helm/v3/pkg/getter"
@@ -41,6 +34,14 @@ import (
 	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
 	ctrl "sigs.k8s.io/controller-runtime"
 
+	"github.com/fluxcd/pkg/runtime/client"
+	helper "github.com/fluxcd/pkg/runtime/controller"
+	"github.com/fluxcd/pkg/runtime/events"
+	"github.com/fluxcd/pkg/runtime/leaderelection"
+	"github.com/fluxcd/pkg/runtime/logger"
+	"github.com/fluxcd/pkg/runtime/pprof"
+	"github.com/fluxcd/pkg/runtime/probes"
+
 	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
 	"github.com/fluxcd/source-controller/controllers"
 	// +kubebuilder:scaffold:imports
@@ -144,14 +145,14 @@ func main() {
 	probes.SetupChecks(mgr, setupLog)
 	pprof.SetupHandlers(mgr, setupLog)
 
-	eventsH := helper.MakeEvents(mgr, controllerName, eventRecorder)
-	metricsH := helper.MustMakeMetrics(mgr)
-
 	if storageAdvAddr == "" {
 		storageAdvAddr = determineAdvStorageAddr(storageAddr, setupLog)
 	}
 	storage := mustInitStorage(storagePath, storageAdvAddr, setupLog)
 
+	eventsH := helper.MakeEvents(mgr, controllerName, eventRecorder)
+	metricsH := helper.MustMakeMetrics(mgr)
+
 	if err = (&controllers.GitRepositoryReconciler{
 		Client:  mgr.GetClient(),
 		Events:  eventsH,
@@ -193,12 +194,10 @@ func main() {
 		os.Exit(1)
 	}
 
 	if err = (&controllers.BucketReconciler{
-		Client:                mgr.GetClient(),
-		Scheme:                mgr.GetScheme(),
-		Storage:               storage,
-		EventRecorder:         mgr.GetEventRecorderFor(controllerName),
-		ExternalEventRecorder: eventRecorder,
-		MetricsRecorder:       metricsH.MetricsRecorder,
+		Client:  mgr.GetClient(),
+		Events:  eventsH,
+		Metrics: metricsH,
+		Storage: storage,
 	}).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{
 		MaxConcurrentReconciles: concurrent,
 	}); err != nil {
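A closing note on TestBucketReconciler_revision above: the pinned sums show the
revision is independent of map iteration order, and that an empty or nil map
yields e3b0c442..., the SHA-256 of zero bytes. The sketch below is a minimal
digest with those properties, assuming keys are sorted before hashing; the
"<etag>  <key>\n" line format is an assumption for illustration, and the
controller's own revision method remains the authoritative definition:

	package main

	import (
		"crypto/sha256"
		"fmt"
		"sort"
	)

	// etagDigest sketches an order-independent revision over a map of object
	// keys to etags: keys are sorted so map iteration order cannot change the
	// result, and an empty (or nil) map hashes to the SHA-256 of zero bytes.
	func etagDigest(list map[string]string) string {
		keys := make([]string, 0, len(list))
		for k := range list {
			keys = append(keys, k)
		}
		sort.Strings(keys)

		sum := sha256.New()
		for _, k := range keys {
			// Assumed per-entry layout; the real controller may serialize differently.
			fmt.Fprintf(sum, "%s  %s\n", list[k], k)
		}
		return fmt.Sprintf("%x", sum.Sum(nil))
	}

	func main() {
		fmt.Println(etagDigest(nil)) // e3b0c44298fc1c14... (SHA-256 of no input)
	}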