diff --git a/api/v1beta2/bucket_types.go b/api/v1beta2/bucket_types.go
index e3df2a10..eee511ea 100644
--- a/api/v1beta2/bucket_types.go
+++ b/api/v1beta2/bucket_types.go
@@ -19,12 +19,10 @@ package v1beta2
import (
"time"
- apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/fluxcd/pkg/apis/acl"
"github.com/fluxcd/pkg/apis/meta"
- "github.com/fluxcd/pkg/runtime/conditions"
)
const (
@@ -32,6 +30,19 @@ const (
BucketKind = "Bucket"
)
+const (
+ GenericBucketProvider string = "generic"
+ AmazonBucketProvider string = "aws"
+ GoogleBucketProvider string = "gcp"
+)
+
+const (
+ // DownloadFailedCondition indicates a transient or persistent download failure. If True, observations on the
+ // upstream Source revision are not possible, and the Artifact available for the Source may be outdated.
+ // This is a "negative polarity" or "abnormal-true" type, and is only present on the resource if it is True.
+ DownloadFailedCondition string = "DownloadFailed"
+)
+
// BucketSpec defines the desired state of an S3 compatible bucket
type BucketSpec struct {
// The S3 compatible storage provider name, default ('generic').
@@ -85,12 +96,6 @@ type BucketSpec struct {
AccessFrom *acl.AccessFrom `json:"accessFrom,omitempty"`
}
-const (
- GenericBucketProvider string = "generic"
- AmazonBucketProvider string = "aws"
- GoogleBucketProvider string = "gcp"
-)
-
// BucketStatus defines the observed state of a bucket
type BucketStatus struct {
// ObservedGeneration is the last observed generation.
@@ -122,45 +127,6 @@ const (
BucketOperationFailedReason string = "BucketOperationFailed"
)
-// BucketProgressing resets the conditions of the Bucket to metav1.Condition of
-// type meta.ReadyCondition with status 'Unknown' and meta.ProgressingReason
-// reason and message. It returns the modified Bucket.
-func BucketProgressing(bucket Bucket) Bucket {
- bucket.Status.ObservedGeneration = bucket.Generation
- bucket.Status.URL = ""
- bucket.Status.Conditions = []metav1.Condition{}
- conditions.MarkUnknown(&bucket, meta.ReadyCondition, meta.ProgressingReason, "reconciliation in progress")
- return bucket
-}
-
-// BucketReady sets the given Artifact and URL on the Bucket and sets the
-// meta.ReadyCondition to 'True', with the given reason and message. It returns
-// the modified Bucket.
-func BucketReady(bucket Bucket, artifact Artifact, url, reason, message string) Bucket {
- bucket.Status.Artifact = &artifact
- bucket.Status.URL = url
- conditions.MarkTrue(&bucket, meta.ReadyCondition, reason, message)
- return bucket
-}
-
-// BucketNotReady sets the meta.ReadyCondition on the Bucket to 'False', with
-// the given reason and message. It returns the modified Bucket.
-func BucketNotReady(bucket Bucket, reason, message string) Bucket {
- conditions.MarkFalse(&bucket, meta.ReadyCondition, reason, message)
- return bucket
-}
-
-// BucketReadyMessage returns the message of the metav1.Condition of type
-// meta.ReadyCondition with status 'True' if present, or an empty string.
-func BucketReadyMessage(bucket Bucket) string {
- if c := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); c != nil {
- if c.Status == metav1.ConditionTrue {
- return c.Message
- }
- }
- return ""
-}
-
// GetConditions returns the status conditions of the object.
func (in Bucket) GetConditions() []metav1.Condition {
return in.Status.Conditions
diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go
index 44b9ee03..e97f97b2 100644
--- a/controllers/bucket_controller.go
+++ b/controllers/bucket_controller.go
@@ -18,24 +18,27 @@ package controllers
import (
"context"
- "crypto/sha1"
+ "crypto/sha256"
"fmt"
"os"
"path/filepath"
+ "sort"
"strings"
"time"
+ gcpstorage "cloud.google.com/go/storage"
+ "github.com/fluxcd/source-controller/pkg/gcp"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/s3utils"
+ "golang.org/x/sync/errgroup"
+ "golang.org/x/sync/semaphore"
"google.golang.org/api/option"
corev1 "k8s.io/api/core/v1"
- apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
+ kerrors "k8s.io/apimachinery/pkg/util/errors"
kuberecorder "k8s.io/client-go/tools/record"
- "k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -43,10 +46,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/fluxcd/pkg/apis/meta"
- "github.com/fluxcd/pkg/runtime/events"
- "github.com/fluxcd/pkg/runtime/metrics"
+ "github.com/fluxcd/pkg/runtime/conditions"
+ helper "github.com/fluxcd/pkg/runtime/controller"
+ "github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/pkg/runtime/predicates"
- "github.com/fluxcd/source-controller/pkg/gcp"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/pkg/sourceignore"
@@ -60,11 +63,10 @@ import (
// BucketReconciler reconciles a Bucket object
type BucketReconciler struct {
client.Client
- Scheme *runtime.Scheme
- Storage *Storage
- EventRecorder kuberecorder.EventRecorder
- ExternalEventRecorder *events.Recorder
- MetricsRecorder *metrics.Recorder
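+ // The embedded EventRecorder and Metrics provide the event emission (Eventf, AnnotatedEventf) and metric
+ // recording (RecordSuspend, RecordReadiness, RecordDuration) helpers used throughout this reconciler.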
+ kuberecorder.EventRecorder
+ helper.Metrics
+
+ Storage *Storage
}
type BucketReconcilerOptions struct {
@@ -83,244 +85,434 @@ func (r *BucketReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts Buc
Complete(r)
}
-func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
- var bucket sourcev1.Bucket
- if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil {
+ // Fetch the Bucket
+ obj := &sourcev1.Bucket{}
+ if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// Record suspended status metric
- defer r.recordSuspension(ctx, bucket)
+ r.RecordSuspend(ctx, obj, obj.Spec.Suspend)
- // Add our finalizer if it does not exist
- if !controllerutil.ContainsFinalizer(&bucket, sourcev1.SourceFinalizer) {
- patch := client.MergeFrom(bucket.DeepCopy())
- controllerutil.AddFinalizer(&bucket, sourcev1.SourceFinalizer)
- if err := r.Patch(ctx, &bucket, patch); err != nil {
- log.Error(err, "unable to register finalizer")
- return ctrl.Result{}, err
- }
- }
-
- // Examine if the object is under deletion
- if !bucket.ObjectMeta.DeletionTimestamp.IsZero() {
- return r.reconcileDelete(ctx, bucket)
- }
-
- // Return early if the object is suspended.
- if bucket.Spec.Suspend {
+ // Return early if the object is suspended
+ if obj.Spec.Suspend {
log.Info("Reconciliation is suspended for this object")
return ctrl.Result{}, nil
}
- // record reconciliation duration
- if r.MetricsRecorder != nil {
- objRef, err := reference.GetReference(r.Scheme, &bucket)
- if err != nil {
- return ctrl.Result{}, err
- }
- defer r.MetricsRecorder.RecordDuration(*objRef, start)
- }
-
- // set initial status
- if resetBucket, ok := r.resetStatus(bucket); ok {
- bucket = resetBucket
- if err := r.updateStatus(ctx, req, bucket.Status); err != nil {
- log.Error(err, "unable to update status")
- return ctrl.Result{Requeue: true}, err
- }
- r.recordReadiness(ctx, bucket)
- }
-
- // record the value of the reconciliation request, if any
- // TODO(hidde): would be better to defer this in combination with
- // always patching the status sub-resource after a reconciliation.
- if v, ok := meta.ReconcileAnnotationValue(bucket.GetAnnotations()); ok {
- bucket.Status.SetLastHandledReconcileRequest(v)
- }
-
- // purge old artifacts from storage
- if err := r.gc(bucket); err != nil {
- log.Error(err, "unable to purge old artifacts")
- }
-
- // reconcile bucket by downloading its content
- reconciledBucket, reconcileErr := r.reconcile(ctx, *bucket.DeepCopy())
-
- // update status with the reconciliation result
- if err := r.updateStatus(ctx, req, reconciledBucket.Status); err != nil {
- log.Error(err, "unable to update status")
- return ctrl.Result{Requeue: true}, err
- }
-
- // if reconciliation failed, record the failure and requeue immediately
- if reconcileErr != nil {
- r.event(ctx, reconciledBucket, events.EventSeverityError, reconcileErr.Error())
- r.recordReadiness(ctx, reconciledBucket)
- return ctrl.Result{Requeue: true}, reconcileErr
- }
-
- // emit revision change event
- if bucket.Status.Artifact == nil || reconciledBucket.Status.Artifact.Revision != bucket.Status.Artifact.Revision {
- r.event(ctx, reconciledBucket, events.EventSeverityInfo, sourcev1.BucketReadyMessage(reconciledBucket))
- }
- r.recordReadiness(ctx, reconciledBucket)
-
- log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
- time.Since(start).String(),
- bucket.GetRequeueAfter().String(),
- ))
-
- return ctrl.Result{RequeueAfter: bucket.GetRequeueAfter()}, nil
-}
-
-func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket) (sourcev1.Bucket, error) {
- log := ctrl.LoggerFrom(ctx)
- var err error
- var sourceBucket sourcev1.Bucket
-
- tempDir, err := os.MkdirTemp("", bucket.Name)
+ // Initialize the patch helper
+ patchHelper, err := patch.NewHelper(obj, r.Client)
if err != nil {
- err = fmt.Errorf("tmp dir error: %w", err)
- return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
+ return ctrl.Result{}, err
}
+
+ // Always attempt to patch the object and status after each reconciliation
defer func() {
- if err := os.RemoveAll(tempDir); err != nil {
- log.Error(err, "failed to remove working directory", "path", tempDir)
+ // Record the value of the reconciliation request, if any
+ if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
+ obj.Status.SetLastHandledReconcileRequest(v)
}
+
+ // Summarize the Ready condition based on abnormalities that may have been observed
+ conditions.SetSummary(obj,
+ meta.ReadyCondition,
+ conditions.WithConditions(
+ sourcev1.ArtifactOutdatedCondition,
+ sourcev1.DownloadFailedCondition,
+ sourcev1.ArtifactUnavailableCondition,
+ ),
+ conditions.WithNegativePolarityConditions(
+ sourcev1.ArtifactOutdatedCondition,
+ sourcev1.DownloadFailedCondition,
+ sourcev1.ArtifactUnavailableCondition,
+ ),
+ )
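+ // The conditions above have negative polarity: if any of them is True, the summarized Ready condition is False.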
+
+ // Patch the object, ignoring conflicts on the conditions owned by this controller
+ patchOpts := []patch.Option{
+ patch.WithOwnedConditions{
+ Conditions: []string{
+ sourcev1.ArtifactOutdatedCondition,
+ sourcev1.DownloadFailedCondition,
+ sourcev1.ArtifactUnavailableCondition,
+ meta.ReadyCondition,
+ meta.ReconcilingCondition,
+ meta.StalledCondition,
+ },
+ },
+ }
+
+ // Determine if the resource is still being reconciled, or if it has stalled, and record this observation
+ if retErr == nil && (result.IsZero() || !result.Requeue) {
+ // We are no longer reconciling
+ conditions.Delete(obj, meta.ReconcilingCondition)
+
+ // We have now observed this generation
+ patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
+
+ readyCondition := conditions.Get(obj, meta.ReadyCondition)
+ switch readyCondition.Status {
+ case metav1.ConditionFalse:
+ // As we are no longer reconciling and the end-state is not ready, the reconciliation has stalled
+ conditions.MarkStalled(obj, readyCondition.Reason, readyCondition.Message)
+ case metav1.ConditionTrue:
+ // As we are no longer reconciling and the end-state is ready, the reconciliation is no longer stalled
+ conditions.Delete(obj, meta.StalledCondition)
+ }
+ }
+
+ // Finally, patch the resource
+ if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
+ retErr = kerrors.NewAggregate([]error{retErr, err})
+ }
+
+ // Always record readiness and duration metrics
+ r.Metrics.RecordReadiness(ctx, obj)
+ r.Metrics.RecordDuration(ctx, obj, start)
}()
- if bucket.Spec.Provider == sourcev1.GoogleBucketProvider {
- sourceBucket, err = r.reconcileWithGCP(ctx, bucket, tempDir)
- if err != nil {
- return sourceBucket, err
- }
- } else {
- sourceBucket, err = r.reconcileWithMinio(ctx, bucket, tempDir)
- if err != nil {
- return sourceBucket, err
- }
- }
- revision, err := r.checksum(tempDir)
- if err != nil {
- return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
+ // Add finalizer first if it does not exist, to avoid a race condition between init and delete
+ if !controllerutil.ContainsFinalizer(obj, sourcev1.SourceFinalizer) {
+ controllerutil.AddFinalizer(obj, sourcev1.SourceFinalizer)
+ return ctrl.Result{Requeue: true}, nil
}
- // return early on unchanged revision
- artifact := r.Storage.NewArtifactFor(bucket.Kind, bucket.GetObjectMeta(), revision, fmt.Sprintf("%s.tar.gz", revision))
- if apimeta.IsStatusConditionTrue(bucket.Status.Conditions, meta.ReadyCondition) && bucket.GetArtifact().HasRevision(artifact.Revision) {
- if artifact.URL != bucket.GetArtifact().URL {
- r.Storage.SetArtifactURL(bucket.GetArtifact())
- bucket.Status.URL = r.Storage.SetHostname(bucket.Status.URL)
- }
- return bucket, nil
+ // Examine if the object is under deletion
+ if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
+ return r.reconcileDelete(ctx, obj)
}
- // create artifact dir
- err = r.Storage.MkdirAll(artifact)
- if err != nil {
- err = fmt.Errorf("mkdir dir error: %w", err)
- return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
- }
-
- // acquire lock
- unlock, err := r.Storage.Lock(artifact)
- if err != nil {
- err = fmt.Errorf("unable to acquire lock: %w", err)
- return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
- }
- defer unlock()
-
- // archive artifact and check integrity
- if err := r.Storage.Archive(&artifact, tempDir, nil); err != nil {
- err = fmt.Errorf("storage archive error: %w", err)
- return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
- }
-
- // update latest symlink
- url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
- if err != nil {
- err = fmt.Errorf("storage symlink error: %w", err)
- return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
- }
-
- message := fmt.Sprintf("Fetched revision: %s", artifact.Revision)
- return sourcev1.BucketReady(bucket, artifact, url, sourcev1.BucketOperationSucceedReason, message), nil
+ // Reconcile actual object
+ return r.reconcile(ctx, obj)
}
-func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) {
- if err := r.gc(bucket); err != nil {
- r.event(ctx, bucket, events.EventSeverityError,
- fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
- // Return the error so we retry the failed garbage collection
+// reconcile steps through the actual reconciliation tasks for the object. It returns early on the first step that
+// produces an error.
+func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
+ // Mark the resource as under reconciliation
+ conditions.MarkReconciling(obj, meta.ProgressingReason, "")
+
+ // Reconcile the storage data
+ if result, err := r.reconcileStorage(ctx, obj); err != nil || result.IsZero() {
+ return result, err
+ }
+
+ // Create temp working dir
+ tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name))
+ if err != nil {
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason, "Failed to create temporary directory: %s", err)
+ return ctrl.Result{}, err
+ }
+ defer os.RemoveAll(tmpDir)
+
+ // Reconcile the source from upstream
+ var artifact sourcev1.Artifact
+ if result, err := r.reconcileSource(ctx, obj, &artifact, tmpDir); err != nil || result.IsZero() {
+ return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, err
+ }
+
+ // Reconcile the artifact to storage
+ if result, err := r.reconcileArtifact(ctx, obj, artifact, tmpDir); err != nil || result.IsZero() {
+ return result, err
+ }
+
+ return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
+}
+
+// reconcileStorage ensures the current state of the storage matches the desired and previously observed state.
+//
+// All artifacts for the resource except for the current one are garbage collected from the storage.
+// If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
+// If the object does not have an artifact in its Status object, a v1beta2.ArtifactUnavailableCondition is set.
+// If the hostnames of the URLs on the object do not match the current storage server hostname, they are updated.
+//
+// The caller should assume a failure if an error is returned, or the Result is zero.
+func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
+ // Garbage collect previous advertised artifact(s) from storage
+ _ = r.garbageCollect(ctx, obj)
+
+ // Determine if the advertised artifact is still in storage
+ if artifact := obj.GetArtifact(); artifact != nil && !r.Storage.ArtifactExist(*artifact) {
+ obj.Status.Artifact = nil
+ obj.Status.URL = ""
+ }
+
+ // Record that we do not have an artifact
+ if obj.GetArtifact() == nil {
+ conditions.MarkTrue(obj, sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage")
+ return ctrl.Result{Requeue: true}, nil
+ }
+ conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
+
+ // Always update URLs to ensure hostname is up-to-date
+ // TODO(hidde): we may want to send out an event only if we notice the URL has changed
+ r.Storage.SetArtifactURL(obj.GetArtifact())
+ obj.Status.URL = r.Storage.SetHostname(obj.Status.URL)
+
+ return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
+}
+
+// reconcileSource reconciles the upstream bucket with the client for the given object's Provider, and returns the
+// result.
+// If a SecretRef is defined, it attempts to fetch the Secret before calling the provider. If the fetch of the Secret
+// fails, it records v1beta2.DownloadFailedCondition=True and returns early.
+//
+// The caller should assume a failure if an error is returned, or the Result is zero.
+func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, dir string) (ctrl.Result, error) {
+ var secret corev1.Secret
+ if obj.Spec.SecretRef != nil {
+ secretName := types.NamespacedName{
+ Namespace: obj.GetNamespace(),
+ Name: obj.Spec.SecretRef.Name,
+ }
+ if err := r.Get(ctx, secretName, &secret); err != nil {
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.AuthenticationFailedReason,
+ "Failed to get secret '%s': %s", secretName.String(), err.Error())
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.AuthenticationFailedReason,
+ "Failed to get secret '%s': %s", secretName.String(), err.Error())
+ // Return the error, as the observed state (the referenced Secret) may change
+ return ctrl.Result{}, err
+ }
+ }
+
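+ // The generic and AWS providers are handled by the Minio S3 client; Google Cloud Storage uses its dedicated client.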
+ switch obj.Spec.Provider {
+ case sourcev1.GoogleBucketProvider:
+ return r.reconcileGCPSource(ctx, obj, artifact, &secret, dir)
+ default:
+ return r.reconcileMinioSource(ctx, obj, artifact, &secret, dir)
+ }
+}
+
+// reconcileMinioSource ensures the upstream S3-compatible bucket can be reached and downloaded from using the Minio
+// client and the declared configuration, and observes its state.
+//
+// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into
+// account. In case of an error during the download process (including transient errors), it records
+// v1beta2.DownloadFailedCondition=True and returns early.
+// On a successful download, it removes v1beta2.DownloadFailedCondition, compares the calculated revision of the
+// upstream bucket to the artifact on the object, and records v1beta2.ArtifactOutdatedCondition if they differ.
+// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
+//
+// The caller should assume a failure if an error is returned, or the Result is zero.
+func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact,
+ secret *corev1.Secret, dir string) (ctrl.Result, error) {
+ // Build the client with the configuration from the object and secret
+ s3Client, err := r.buildMinioClient(obj, secret)
+ if err != nil {
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+ "Failed to construct S3 client: %s", err.Error())
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+ "Failed to construct S3 client: %s", err.Error())
+ // Return error as the contents of the secret may change
return ctrl.Result{}, err
}
- // Record deleted status
- r.recordReadiness(ctx, bucket)
-
- // Remove our finalizer from the list and update it
- controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer)
- if err := r.Update(ctx, &bucket); err != nil {
- return ctrl.Result{}, err
- }
-
- // Stop reconciliation as the object is being deleted
- return ctrl.Result{}, nil
-}
-
-// reconcileWithGCP handles getting objects from a Google Cloud Platform bucket
-// using a gcp client
-func (r *BucketReconciler) reconcileWithGCP(ctx context.Context, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) {
- log := ctrl.LoggerFrom(ctx)
- gcpClient, err := r.authGCP(ctx, bucket)
- if err != nil {
- err = fmt.Errorf("auth error: %w", err)
- return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err
- }
- defer gcpClient.Close(log)
-
- ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration)
+ // Confirm bucket exists
+ ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
defer cancel()
-
- exists, err := gcpClient.BucketExists(ctxTimeout, bucket.Spec.BucketName)
+ exists, err := s3Client.BucketExists(ctxTimeout, obj.Spec.BucketName)
if err != nil {
- return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+ "Failed to verify existence of bucket '%s': %s", obj.Spec.BucketName, err.Error())
+ return ctrl.Result{}, err
}
if !exists {
- err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName)
- return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+ "Bucket '%s' does not exist", obj.Spec.BucketName)
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+ "Bucket '%s' does not exist", obj.Spec.BucketName)
+ return ctrl.Result{}, fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName)
}
- // Look for file with ignore rules first.
- path := filepath.Join(tempDir, sourceignore.IgnoreFile)
- if err := gcpClient.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil {
- if err == gcp.ErrorObjectDoesNotExist && sourceignore.IgnoreFile != ".sourceignore" {
- return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
+ // Look for file with ignore rules first
+ path := filepath.Join(dir, sourceignore.IgnoreFile)
+ if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil {
+ if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" {
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+ "Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error())
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+ "Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error())
+ return ctrl.Result{}, err
}
}
ps, err := sourceignore.ReadIgnoreFile(path, nil)
if err != nil {
- return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+ "Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error())
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+ "Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error())
+ return ctrl.Result{}, err
}
// In-spec patterns take precedence
- if bucket.Spec.Ignore != nil {
- ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...)
+ if obj.Spec.Ignore != nil {
+ ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...)
}
matcher := sourceignore.NewMatcher(ps)
- objects := gcpClient.ListObjects(ctxTimeout, bucket.Spec.BucketName, nil)
- // download bucket content
+
+ // Build up an index of object keys and their etags
+ // As the keys define the paths and the etags represent a change in file contents, this should be sufficient to
+ // detect both structural and file changes
+ var index = make(etagIndex)
+ for object := range s3Client.ListObjects(ctxTimeout, obj.Spec.BucketName, minio.ListObjectsOptions{
+ Recursive: true,
+ UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()),
+ }) {
+ if err = object.Err; err != nil {
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+ "Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error())
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+ "Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error())
+ return ctrl.Result{}, err
+ }
+
+ // Ignore directories and the .sourceignore file
+ if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile {
+ continue
+ }
+ // Ignore matches
+ if matcher.Match(strings.Split(object.Key, "/"), false) {
+ continue
+ }
+
+ index[object.Key] = object.ETag
+ }
+
+ // Calculate revision checksum from the collected index values
+ revision, err := index.Revision()
+ if err != nil {
+ ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision")
+ return ctrl.Result{}, err
+ }
+
+ if !obj.GetArtifact().HasRevision(revision) {
+ // Mark observations about the revision on the object
+ conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision",
+ "New upstream revision '%s'", revision)
+
+ // Download the files in parallel, but with a limited number of workers
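+ // A single feeder goroutine acquires a semaphore slot for each key and spawns a download goroutine in the same
+ // errgroup; if any download fails, groupCtx is cancelled and sem.Acquire aborts further scheduling.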
+ group, groupCtx := errgroup.WithContext(ctx)
+ group.Go(func() error {
+ const workers = 4
+ sem := semaphore.NewWeighted(workers)
+ for key := range index {
+ k := key
+ if err := sem.Acquire(groupCtx, 1); err != nil {
+ return err
+ }
+ group.Go(func() error {
+ defer sem.Release(1)
+ localPath := filepath.Join(dir, k)
+ if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath, minio.GetObjectOptions{}); err != nil {
+ return fmt.Errorf("failed to get '%s' file: %w", k, err)
+ }
+ return nil
+ })
+ }
+ return nil
+ })
+ if err = group.Wait(); err != nil {
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+ "Download from bucket '%s' failed: %s", obj.Spec.BucketName, err)
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+ "Download from bucket '%s' failed: %s", obj.Spec.BucketName, err)
+ return ctrl.Result{}, err
+ }
+ r.Eventf(obj, corev1.EventTypeNormal, sourcev1.BucketOperationSucceedReason,
+ "Downloaded %d files from bucket '%s' revision '%s'", len(index), obj.Spec.BucketName, revision)
+ }
+ conditions.Delete(obj, sourcev1.DownloadFailedCondition)
+
+ // Create potential new artifact
+ *artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
+ return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
+}
+
+// reconcileGCPSource ensures the upstream Google Cloud Storage bucket can be reached and downloaded from using the
+// declared configuration, and observes its state.
+//
+// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into
+// account. In case of an error during the download process (including transient errors), it records
+// v1beta2.DownloadFailedCondition=True and returns early.
+// On a successful download, it removes v1beta2.DownloadFailedCondition, compares the calculated revision of the
+// upstream bucket to the artifact on the object, and records v1beta2.ArtifactOutdatedCondition if they differ.
+// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
+//
+// The caller should assume a failure if an error is returned, or the Result is zero.
+func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact,
+ secret *corev1.Secret, dir string) (ctrl.Result, error) {
+ gcpClient, err := r.buildGCPClient(ctx, secret)
+ if err != nil {
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+ "Failed to construct GCP client: %s", err.Error())
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+ "Failed to construct GCP client: %s", err.Error())
+ // Return error as the contents of the secret may change
+ return ctrl.Result{}, err
+ }
+ defer gcpClient.Close(ctrl.LoggerFrom(ctx))
+
+ // Confirm bucket exists
+ ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
+ defer cancel()
+ exists, err := gcpClient.BucketExists(ctxTimeout, obj.Spec.BucketName)
+ if err != nil {
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+ "Failed to verify existence of bucket '%s': %s", obj.Spec.BucketName, err.Error())
+ return ctrl.Result{}, err
+ }
+ if !exists {
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+ "Bucket '%s' does not exist", obj.Spec.BucketName)
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+ "Bucket '%s' does not exist", obj.Spec.BucketName)
+ return ctrl.Result{}, fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName)
+ }
+
+ // Look for file with ignore rules first
+ path := filepath.Join(dir, sourceignore.IgnoreFile)
+ if err := gcpClient.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil {
+ if err != gcpstorage.ErrObjectNotExist {
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+ "Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error())
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+ "Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error())
+ return ctrl.Result{}, err
+ }
+ }
+ ps, err := sourceignore.ReadIgnoreFile(path, nil)
+ if err != nil {
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+ "Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error())
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+ "Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error())
+ return ctrl.Result{}, err
+ }
+ // In-spec patterns take precedence
+ if obj.Spec.Ignore != nil {
+ ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...)
+ }
+ matcher := sourceignore.NewMatcher(ps)
+
+ // Build up an index of object keys and their etags
+ // As the keys define the paths and the etags represent a change in file contents, this should be sufficient to
+ // detect both structural and file changes
+ var index = make(etagIndex)
+ objects := gcpClient.ListObjects(ctxTimeout, obj.Spec.BucketName, nil)
for {
object, err := objects.Next()
- if err == gcp.IteratorDone {
- break
- }
if err != nil {
- err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
- return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
+ if err == gcp.IteratorDone {
+ break
+ }
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+ "Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error())
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+ "Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error())
+ return ctrl.Result{}, err
}
if strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile {
@@ -331,98 +523,211 @@ func (r *BucketReconciler) reconcileWithGCP(ctx context.Context, bucket sourcev1
continue
}
- localPath := filepath.Join(tempDir, object.Name)
- if err = gcpClient.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Name, localPath); err != nil {
- err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
- return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
- }
+ index[object.Name] = object.Etag
}
- return sourcev1.Bucket{}, nil
+
+ // Calculate revision checksum from the collected index values
+ revision, err := index.Revision()
+ if err != nil {
+ ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision")
+ return ctrl.Result{}, err
+ }
+
+ if !obj.GetArtifact().HasRevision(revision) {
+ // Mark observations about the revision on the object
+ conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision",
+ "New upstream revision '%s'", revision)
+
+ // Download the files in parallel, but with a limited number of workers
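+ // Same bounded worker-pool pattern as in reconcileMinioSource: a feeder goroutine acquires a semaphore slot per
+ // key before spawning each download goroutine in the errgroup.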
+ group, groupCtx := errgroup.WithContext(ctx)
+ group.Go(func() error {
+ const workers = 4
+ sem := semaphore.NewWeighted(workers)
+ for key := range index {
+ k := key
+ if err := sem.Acquire(groupCtx, 1); err != nil {
+ return err
+ }
+ group.Go(func() error {
+ defer sem.Release(1)
+ localPath := filepath.Join(dir, k)
+ if err := gcpClient.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath); err != nil {
+ return fmt.Errorf("failed to get '%s' file: %w", k, err)
+ }
+ return nil
+ })
+ }
+ return nil
+ })
+ if err = group.Wait(); err != nil {
+ conditions.MarkTrue(obj, sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason,
+ "Download from bucket '%s' failed: %s", obj.Spec.BucketName, err)
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
+ "Download from bucket '%s' failed: %s", obj.Spec.BucketName, err)
+ return ctrl.Result{}, err
+ }
+ r.Eventf(obj, corev1.EventTypeNormal, sourcev1.BucketOperationSucceedReason,
+ "Downloaded %d files from bucket '%s' revision '%s'", len(index), obj.Spec.BucketName, revision)
+ }
+ conditions.Delete(obj, sourcev1.DownloadFailedCondition)
+
+ // Create potential new artifact
+ *artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
+ return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
}
-// reconcileWithMinio handles getting objects from an S3 compatible bucket
-// using a minio client
-func (r *BucketReconciler) reconcileWithMinio(ctx context.Context, bucket sourcev1.Bucket, tempDir string) (sourcev1.Bucket, error) {
- s3Client, err := r.authMinio(ctx, bucket)
+// reconcileArtifact archives a new artifact to the storage, if the current observation on the object does not match the
+// given data.
+//
+// The inspection of the given data against the object is deferred, ensuring stale observations such as
+// v1beta2.ArtifactUnavailableCondition and v1beta2.ArtifactOutdatedCondition are always deleted.
+// If the given artifact does not differ from the object's current artifact, it returns early.
+// On a successful archive, the artifact in the status of the given object is set, and the symlink in the storage is
+// updated to its path.
+//
+// The caller should assume a failure if an error is returned, or the Result is zero.
+func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) (ctrl.Result, error) {
+ // Always restore the Ready condition in case it got removed due to a transient error
+ defer func() {
+ if obj.GetArtifact() != nil {
+ conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
+ }
+ if obj.GetArtifact().HasRevision(artifact.Revision) {
+ conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition)
+ conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason,
+ "Stored artifact for revision '%s'", artifact.Revision)
+ }
+ }()
+
+ // The artifact is up-to-date
+ if obj.GetArtifact().HasRevision(artifact.Revision) {
+ ctrl.LoggerFrom(ctx).Info(fmt.Sprintf("Already up to date, current revision '%s'", artifact.Revision))
+ return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
+ }
+
+ // Ensure target path exists and is a directory
+ if f, err := os.Stat(dir); err != nil {
+ ctrl.LoggerFrom(ctx).Error(err, "failed to stat source path")
+ return ctrl.Result{}, err
+ } else if !f.IsDir() {
+ err := fmt.Errorf("source path '%s' is not a directory", dir)
+ ctrl.LoggerFrom(ctx).Error(err, "invalid target path")
+ return ctrl.Result{}, err
+ }
+
+ // Ensure artifact directory exists and acquire lock
+ if err := r.Storage.MkdirAll(artifact); err != nil {
+ ctrl.LoggerFrom(ctx).Error(err, "failed to create artifact directory")
+ return ctrl.Result{}, err
+ }
+ unlock, err := r.Storage.Lock(artifact)
if err != nil {
- err = fmt.Errorf("auth error: %w", err)
- return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err
+ ctrl.LoggerFrom(ctx).Error(err, "failed to acquire lock for artifact")
+ return ctrl.Result{}, err
}
+ defer unlock()
- ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration)
- defer cancel()
+ // Archive directory to storage
+ if err := r.Storage.Archive(&artifact, dir, nil); err != nil {
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason,
+ "Unable to archive artifact to storage: %s", err)
+ return ctrl.Result{}, err
+ }
+ r.AnnotatedEventf(obj, map[string]string{
+ "revision": artifact.Revision,
+ "checksum": artifact.Checksum,
+ }, corev1.EventTypeNormal, "NewArtifact", "Stored artifact for revision '%s'", artifact.Revision)
- exists, err := s3Client.BucketExists(ctxTimeout, bucket.Spec.BucketName)
+ // Record it on the object
+ obj.Status.Artifact = artifact.DeepCopy()
+
+ // Update symlink on a "best effort" basis
+ url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
if err != nil {
- return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
+ r.Eventf(obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason,
+ "Failed to update status URL symlink: %s", err)
}
- if !exists {
- err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName)
- return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
+ if url != "" {
+ obj.Status.URL = url
}
-
- // Look for file with ignore rules first
- // NB: S3 has flat filepath keys making it impossible to look
- // for files in "subdirectories" without building up a tree first.
- path := filepath.Join(tempDir, sourceignore.IgnoreFile)
- if err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil {
- if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" {
- return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
- }
- }
- ps, err := sourceignore.ReadIgnoreFile(path, nil)
- if err != nil {
- return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
- }
- // In-spec patterns take precedence
- if bucket.Spec.Ignore != nil {
- ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...)
- }
- matcher := sourceignore.NewMatcher(ps)
-
- // download bucket content
- for object := range s3Client.ListObjects(ctxTimeout, bucket.Spec.BucketName, minio.ListObjectsOptions{
- Recursive: true,
- UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()),
- }) {
- if object.Err != nil {
- err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, object.Err)
- return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
- }
-
- if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile {
- continue
- }
-
- if matcher.Match(strings.Split(object.Key, "/"), false) {
- continue
- }
-
- localPath := filepath.Join(tempDir, object.Key)
- err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Key, localPath, minio.GetObjectOptions{})
- if err != nil {
- err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
- return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
- }
- }
- return sourcev1.Bucket{}, nil
+ return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
}
-// authGCP creates a new Google Cloud Platform storage client
-// to interact with the storage service.
-func (r *BucketReconciler) authGCP(ctx context.Context, bucket sourcev1.Bucket) (*gcp.GCPClient, error) {
+// reconcileDelete handles the deletion of an object. It first garbage collects all artifacts for the object from the
+// artifact storage; if successful, the finalizer is removed from the object.
+func (r *BucketReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
+ // Garbage collect the resource's artifacts
+ if err := r.garbageCollect(ctx, obj); err != nil {
+ // Return the error so we retry the failed garbage collection
+ return ctrl.Result{}, err
+ }
+
+ // Remove our finalizer from the list
+ controllerutil.RemoveFinalizer(obj, sourcev1.SourceFinalizer)
+
+ // Stop reconciliation as the object is being deleted
+ return ctrl.Result{}, nil
+}
+
+// garbageCollect performs a garbage collection for the given v1beta2.Bucket. It removes all but the current
+// artifact, unless the deletion timestamp is set, in which case all artifacts for the resource are removed.
+func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Bucket) error {
+ if !obj.DeletionTimestamp.IsZero() {
+ if err := r.Storage.RemoveAll(r.Storage.NewArtifactFor(obj.Kind, obj.GetObjectMeta(), "", "*")); err != nil {
+ r.Eventf(obj, corev1.EventTypeWarning, "GarbageCollectionFailed",
+ "Garbage collection for deleted resource failed: %s", err)
+ return err
+ }
+ obj.Status.Artifact = nil
+ // TODO(hidde): we should only push this event if we actually garbage collected something
+ r.Eventf(obj, corev1.EventTypeNormal, "GarbageCollectionSucceeded",
+ "Garbage collected artifacts for deleted resource")
+ return nil
+ }
+ if obj.GetArtifact() != nil {
+ if err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
+ r.Eventf(obj, corev1.EventTypeNormal, "GarbageCollectionFailed", "Garbage collection of old artifacts failed: %s", err)
+ return err
+ }
+ // TODO(hidde): we should only push this event if we actually garbage collected something
+ r.Eventf(obj, corev1.EventTypeNormal, "GarbageCollectionSucceeded", "Garbage collected old artifacts")
+ }
+ return nil
+}
+
+// buildMinioClient constructs a minio.Client with the data from the given object and Secret.
+// It returns an error if the Secret does not have the required fields, or if there is no credential handler
+// configured.
+func (r *BucketReconciler) buildMinioClient(obj *sourcev1.Bucket, secret *corev1.Secret) (*minio.Client, error) {
+ opts := minio.Options{
+ Region: obj.Spec.Region,
+ Secure: !obj.Spec.Insecure,
+ }
+ if secret != nil {
+ var accessKey, secretKey string
+ if k, ok := secret.Data["accesskey"]; ok {
+ accessKey = string(k)
+ }
+ if k, ok := secret.Data["secretkey"]; ok {
+ secretKey = string(k)
+ }
+ if accessKey == "" || secretKey == "" {
+ return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
+ }
+ opts.Creds = credentials.NewStaticV4(accessKey, secretKey, "")
+ } else if obj.Spec.Provider == sourcev1.AmazonBucketProvider {
+ opts.Creds = credentials.NewIAM("")
+ }
+ return minio.New(obj.Spec.Endpoint, &opts)
+}
+
+// buildGCPClient constructs a gcp.GCPClient with the data from the given Secret.
+// It returns an error if the Secret does not have the required field, or if the client construction fails.
+func (r *BucketReconciler) buildGCPClient(ctx context.Context, secret *corev1.Secret) (*gcp.GCPClient, error) {
var client *gcp.GCPClient
var err error
- if bucket.Spec.SecretRef != nil {
- secretName := types.NamespacedName{
- Namespace: bucket.GetNamespace(),
- Name: bucket.Spec.SecretRef.Name,
- }
-
- var secret corev1.Secret
- if err := r.Get(ctx, secretName, &secret); err != nil {
- return nil, fmt.Errorf("credentials secret error: %w", err)
- }
+ if secret != nil {
if err := gcp.ValidateSecret(secret.Data, secret.Name); err != nil {
return nil, err
}
@@ -437,165 +742,26 @@ func (r *BucketReconciler) authGCP(ctx context.Context, bucket sourcev1.Bucket)
}
}
return client, nil
-
}
-// authMinio creates a new Minio client to interact with S3
-// compatible storage services.
-func (r *BucketReconciler) authMinio(ctx context.Context, bucket sourcev1.Bucket) (*minio.Client, error) {
- opt := minio.Options{
- Region: bucket.Spec.Region,
- Secure: !bucket.Spec.Insecure,
+// etagIndex is an index of bucket keys and their Etag values.
+type etagIndex map[string]string
+
+// Revision calculates the SHA256 checksum of the index.
+// The keys are sorted to ensure a stable order, and the SHA256 sum is then calculated over the string representation
+// of the key/value pairs, with each pair written on its own line.
+// The sum result is returned as a hex-encoded string.
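+// For example, an index of {"a.txt": "etag1", "b.txt": "etag2"} is hashed over the string "a.txt etag1\nb.txt etag2\n".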
+func (i etagIndex) Revision() (string, error) {
+ keyIndex := make([]string, 0, len(i))
+ for k := range i {
+ keyIndex = append(keyIndex, k)
}
-
- if bucket.Spec.SecretRef != nil {
- secretName := types.NamespacedName{
- Namespace: bucket.GetNamespace(),
- Name: bucket.Spec.SecretRef.Name,
+ sort.Strings(keyIndex)
+ sum := sha256.New()
+ for _, k := range keyIndex {
+ if _, err := sum.Write([]byte(fmt.Sprintf("%s %s\n", k, i[k]))); err != nil {
+ return "", err
}
-
- var secret corev1.Secret
- if err := r.Get(ctx, secretName, &secret); err != nil {
- return nil, fmt.Errorf("credentials secret error: %w", err)
- }
-
- accesskey := ""
- secretkey := ""
- if k, ok := secret.Data["accesskey"]; ok {
- accesskey = string(k)
- }
- if k, ok := secret.Data["secretkey"]; ok {
- secretkey = string(k)
- }
- if accesskey == "" || secretkey == "" {
- return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
- }
- opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "")
- } else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider {
- opt.Creds = credentials.NewIAM("")
- }
-
- if opt.Creds == nil {
- return nil, fmt.Errorf("no bucket credentials found")
- }
-
- return minio.New(bucket.Spec.Endpoint, &opt)
-}
-
-// checksum calculates the SHA1 checksum of the given root directory.
-// It traverses the given root directory and calculates the checksum for any found file, and returns the SHA1 sum of the
-// list with relative file paths and their checksums.
-func (r *BucketReconciler) checksum(root string) (string, error) {
- sum := sha1.New()
- if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
- if err != nil {
- return err
- }
- if !info.Mode().IsRegular() {
- return nil
- }
- data, err := os.ReadFile(path)
- if err != nil {
- return err
- }
- relPath, err := filepath.Rel(root, path)
- if err != nil {
- return err
- }
- sum.Write([]byte(fmt.Sprintf("%x %s\n", sha1.Sum(data), relPath)))
- return nil
- }); err != nil {
- return "", err
}
return fmt.Sprintf("%x", sum.Sum(nil)), nil
}
-
-// resetStatus returns a modified v1beta1.Bucket and a boolean indicating
-// if the status field has been reset.
-func (r *BucketReconciler) resetStatus(bucket sourcev1.Bucket) (sourcev1.Bucket, bool) {
- // We do not have an artifact, or it does no longer exist
- if bucket.GetArtifact() == nil || !r.Storage.ArtifactExist(*bucket.GetArtifact()) {
- bucket = sourcev1.BucketProgressing(bucket)
- bucket.Status.Artifact = nil
- return bucket, true
- }
- if bucket.Generation != bucket.Status.ObservedGeneration {
- return sourcev1.BucketProgressing(bucket), true
- }
- return bucket, false
-}
-
-// gc performs a garbage collection for the given v1beta1.Bucket.
-// It removes all but the current artifact except for when the
-// deletion timestamp is set, which will result in the removal of
-// all artifacts for the resource.
-func (r *BucketReconciler) gc(bucket sourcev1.Bucket) error {
- if !bucket.DeletionTimestamp.IsZero() {
- return r.Storage.RemoveAll(r.Storage.NewArtifactFor(bucket.Kind, bucket.GetObjectMeta(), "", "*"))
- }
- if bucket.GetArtifact() != nil {
- return r.Storage.RemoveAllButCurrent(*bucket.GetArtifact())
- }
- return nil
-}
-
-// event emits a Kubernetes event and forwards the event to notification controller if configured
-func (r *BucketReconciler) event(ctx context.Context, bucket sourcev1.Bucket, severity, msg string) {
- if r.EventRecorder != nil {
- r.EventRecorder.Eventf(&bucket, corev1.EventTypeNormal, severity, msg)
- }
- if r.ExternalEventRecorder != nil {
- r.ExternalEventRecorder.Eventf(&bucket, corev1.EventTypeNormal, severity, msg)
- }
-}
-
-func (r *BucketReconciler) recordReadiness(ctx context.Context, bucket sourcev1.Bucket) {
- log := ctrl.LoggerFrom(ctx)
- if r.MetricsRecorder == nil {
- return
- }
- objRef, err := reference.GetReference(r.Scheme, &bucket)
- if err != nil {
- log.Error(err, "unable to record readiness metric")
- return
- }
- if rc := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); rc != nil {
- r.MetricsRecorder.RecordCondition(*objRef, *rc, !bucket.DeletionTimestamp.IsZero())
- } else {
- r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{
- Type: meta.ReadyCondition,
- Status: metav1.ConditionUnknown,
- }, !bucket.DeletionTimestamp.IsZero())
- }
-}
-
-func (r *BucketReconciler) recordSuspension(ctx context.Context, bucket sourcev1.Bucket) {
- if r.MetricsRecorder == nil {
- return
- }
- log := ctrl.LoggerFrom(ctx)
-
- objRef, err := reference.GetReference(r.Scheme, &bucket)
- if err != nil {
- log.Error(err, "unable to record suspended metric")
- return
- }
-
- if !bucket.DeletionTimestamp.IsZero() {
- r.MetricsRecorder.RecordSuspend(*objRef, false)
- } else {
- r.MetricsRecorder.RecordSuspend(*objRef, bucket.Spec.Suspend)
- }
-}
-
-func (r *BucketReconciler) updateStatus(ctx context.Context, req ctrl.Request, newStatus sourcev1.BucketStatus) error {
- var bucket sourcev1.Bucket
- if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil {
- return err
- }
-
- patch := client.MergeFrom(bucket.DeepCopy())
- bucket.Status = newStatus
-
- return r.Status().Patch(ctx, &bucket, patch)
-}
diff --git a/controllers/bucket_controller_test.go b/controllers/bucket_controller_test.go
index 01ff20d8..b5c9debe 100644
--- a/controllers/bucket_controller_test.go
+++ b/controllers/bucket_controller_test.go
@@ -17,59 +17,573 @@ limitations under the License.
package controllers
import (
+ "context"
+ "crypto/md5"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
"os"
+ "path"
"path/filepath"
+ "strings"
"testing"
+ "time"
+
+ "github.com/go-logr/logr"
+ . "github.com/onsi/gomega"
+ corev1 "k8s.io/api/core/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/tools/record"
+ ctrl "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/client"
+ fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
+ "sigs.k8s.io/controller-runtime/pkg/log"
+
+ "github.com/fluxcd/pkg/apis/meta"
+ "github.com/fluxcd/pkg/runtime/conditions"
+
+ sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
)
-func TestBucketReconciler_checksum(t *testing.T) {
+func TestBucketReconciler_Reconcile(t *testing.T) {
+ g := NewWithT(t)
+
+ s3Server := newS3Server("test-bucket")
+ s3Server.Objects = []*s3MockObject{
+ {
+ Key: "test.yaml",
+ Content: []byte("test"),
+ ContentType: "text/plain",
+ LastModified: time.Now(),
+ },
+ }
+ s3Server.Start()
+ defer s3Server.Stop()
+
+ g.Expect(s3Server.HTTPAddress()).ToNot(BeEmpty())
+ u, err := url.Parse(s3Server.HTTPAddress())
+ g.Expect(err).NotTo(HaveOccurred())
+
+ secret := &corev1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ GenerateName: "bucket-reconcile-",
+ Namespace: "default",
+ },
+ Data: map[string][]byte{
+ "accesskey": []byte("key"),
+ "secretkey": []byte("secret"),
+ },
+ }
+ g.Expect(testEnv.Create(ctx, secret)).To(Succeed())
+ defer testEnv.Delete(ctx, secret)
+
+ obj := &sourcev1.Bucket{
+ ObjectMeta: metav1.ObjectMeta{
+ GenerateName: "bucket-reconcile-",
+ Namespace: "default",
+ },
+ Spec: sourcev1.BucketSpec{
+ Provider: "generic",
+ BucketName: s3Server.BucketName,
+ Endpoint: u.Host,
+ Insecure: true,
+ Interval: metav1.Duration{Duration: interval},
+ Timeout: &metav1.Duration{Duration: timeout},
+ SecretRef: &meta.LocalObjectReference{
+ Name: secret.Name,
+ },
+ },
+ }
+ g.Expect(testEnv.Create(ctx, obj)).To(Succeed())
+
+ key := client.ObjectKey{Name: obj.Name, Namespace: obj.Namespace}
+
+ // Wait for finalizer to be set
+ g.Eventually(func() bool {
+ if err := testEnv.Get(ctx, key, obj); err != nil {
+ return false
+ }
+ return len(obj.Finalizers) > 0
+ }, timeout).Should(BeTrue())
+
+ // Wait for Bucket to be Ready
+ g.Eventually(func() bool {
+ if err := testEnv.Get(ctx, key, obj); err != nil {
+ return false
+ }
+ if !conditions.IsReady(obj) || obj.Status.Artifact == nil {
+ return false
+ }
+ readyCondition := conditions.Get(obj, meta.ReadyCondition)
+ return obj.Generation == readyCondition.ObservedGeneration &&
+ obj.Generation == obj.Status.ObservedGeneration
+ }, timeout).Should(BeTrue())
+
+ g.Expect(testEnv.Delete(ctx, obj)).To(Succeed())
+
+ // Wait for Bucket to be deleted
+ g.Eventually(func() bool {
+ if err := testEnv.Get(ctx, key, obj); err != nil {
+ return apierrors.IsNotFound(err)
+ }
+ return false
+ }, timeout).Should(BeTrue())
+}
+
+func TestBucketReconciler_reconcileStorage(t *testing.T) {
tests := []struct {
- name string
- beforeFunc func(root string)
- want string
- wantErr bool
+ name string
+ beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error
+ want ctrl.Result
+ wantErr bool
+ assertArtifact *sourcev1.Artifact
+ assertConditions []metav1.Condition
+ assertPaths []string
}{
{
- name: "empty root",
- want: "da39a3ee5e6b4b0d3255bfef95601890afd80709",
+ name: "garbage collects",
+ beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
+ revisions := []string{"a", "b", "c"}
+ for n := range revisions {
+ v := revisions[n]
+ obj.Status.Artifact = &sourcev1.Artifact{
+ Path: fmt.Sprintf("/reconcile-storage/%s.txt", v),
+ Revision: v,
+ }
+ if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
+ return err
+ }
+ if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
+ return err
+ }
+ }
+ testStorage.SetArtifactURL(obj.Status.Artifact)
+ return nil
+ },
+ assertArtifact: &sourcev1.Artifact{
+ Path: "/reconcile-storage/c.txt",
+ Revision: "c",
+ Checksum: "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4",
+ URL: testStorage.Hostname + "/reconcile-storage/c.txt",
+ },
+ assertPaths: []string{
+ "/reconcile-storage/c.txt",
+ "!/reconcile-storage/b.txt",
+ "!/reconcile-storage/a.txt",
+ },
},
{
- name: "with file",
- beforeFunc: func(root string) {
- mockFile(root, "a/b/c.txt", "a dummy string")
+ name: "notices missing artifact in storage",
+ beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
+ obj.Status.Artifact = &sourcev1.Artifact{
+ Path: fmt.Sprintf("/reconcile-storage/invalid.txt"),
+ Revision: "d",
+ }
+ testStorage.SetArtifactURL(obj.Status.Artifact)
+ return nil
+ },
+ want: ctrl.Result{Requeue: true},
+ assertPaths: []string{
+ "!/reconcile-storage/invalid.txt",
+ },
+ assertConditions: []metav1.Condition{
+ *conditions.TrueCondition(sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage"),
},
- want: "309a5e6e96b4a7eea0d1cfaabf1be8ec1c063fa0",
},
{
- name: "with file in different path",
- beforeFunc: func(root string) {
- mockFile(root, "a/b.txt", "a dummy string")
+ name: "updates hostname on diff from current",
+ beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
+ obj.Status.Artifact = &sourcev1.Artifact{
+ Path: fmt.Sprintf("/reconcile-storage/hostname.txt"),
+ Revision: "f",
+ Checksum: "971c419dd609331343dee105fffd0f4608dc0bf2",
+ URL: "http://outdated.com/reconcile-storage/hostname.txt",
+ }
+ if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
+ return err
+ }
+ if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil {
+ return err
+ }
+ return nil
+ },
+ assertPaths: []string{
+ "/reconcile-storage/hostname.txt",
+ },
+ assertArtifact: &sourcev1.Artifact{
+ Path: "/reconcile-storage/hostname.txt",
+ Revision: "f",
+ Checksum: "971c419dd609331343dee105fffd0f4608dc0bf2",
+ URL: testStorage.Hostname + "/reconcile-storage/hostname.txt",
},
- want: "e28c62b5cc488849950c4355dddc5523712616d4",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- root, err := os.MkdirTemp("", "bucket-checksum-")
- if err != nil {
- t.Fatal(err)
+ g := NewWithT(t)
+
+ r := &BucketReconciler{
+ EventRecorder: record.NewFakeRecorder(32),
+ Storage: testStorage,
+ }
+
+ obj := &sourcev1.Bucket{
+ ObjectMeta: metav1.ObjectMeta{
+ GenerateName: "test-",
+ },
}
- defer os.RemoveAll(root)
if tt.beforeFunc != nil {
- tt.beforeFunc(root)
+ g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed())
}
- got, err := (&BucketReconciler{}).checksum(root)
- if (err != nil) != tt.wantErr {
- t.Errorf("checksum() error = %v, wantErr %v", err, tt.wantErr)
- return
+
+ got, err := r.reconcileStorage(context.TODO(), obj)
+ g.Expect(err != nil).To(Equal(tt.wantErr))
+ g.Expect(got).To(Equal(tt.want))
+
+ g.Expect(obj.Status.Artifact).To(MatchArtifact(tt.assertArtifact))
+ if tt.assertArtifact != nil && tt.assertArtifact.URL != "" {
+ g.Expect(obj.Status.Artifact.URL).To(Equal(tt.assertArtifact.URL))
}
- if got != tt.want {
- t.Errorf("checksum() got = %v, want %v", got, tt.want)
+ g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
+
+ for _, p := range tt.assertPaths {
+ if strings.HasPrefix(p, "!") {
+ // Strip the "!" marker so the absence check targets the actual path.
+ g.Expect(filepath.Join(testStorage.BasePath, p[1:])).NotTo(BeAnExistingFile())
+ continue
+ }
+ g.Expect(filepath.Join(testStorage.BasePath, p)).To(BeAnExistingFile())
}
})
}
}
+func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
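+ // reconcileSource is exercised against an in-process S3 mock server; the
+ // revisions asserted below correspond to the etagIndex checksum of the
+ // listed objects, and failure cases are expected to surface as a
+ // DownloadFailedCondition on the object.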
+ tests := []struct {
+ name string
+ bucketName string
+ bucketObjects []*s3MockObject
+ middleware http.Handler
+ secret *corev1.Secret
+ beforeFunc func(obj *sourcev1.Bucket)
+ want ctrl.Result
+ wantErr bool
+ assertArtifact sourcev1.Artifact
+ assertConditions []metav1.Condition
+ }{
+ {
+ name: "reconciles source",
+ bucketName: "dummy",
+ bucketObjects: []*s3MockObject{
+ {
+ Key: "test.txt",
+ Content: []byte("test"),
+ ContentType: "text/plain",
+ LastModified: time.Now(),
+ },
+ },
+ assertArtifact: sourcev1.Artifact{
+ Path: "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz",
+ Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8",
+ },
+ assertConditions: []metav1.Condition{
+ *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"),
+ },
+ },
+ // TODO(hidde): middleware for mock server
+ //{
+ // name: "authenticates using secretRef",
+ // bucketName: "dummy",
+ //},
+ {
+ name: "observes non-existing secretRef",
+ bucketName: "dummy",
+ beforeFunc: func(obj *sourcev1.Bucket) {
+ obj.Spec.SecretRef = &meta.LocalObjectReference{
+ Name: "dummy",
+ }
+ },
+ wantErr: true,
+ assertConditions: []metav1.Condition{
+ *conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.AuthenticationFailedReason, "Failed to get secret '/dummy': secrets \"dummy\" not found"),
+ },
+ },
+ {
+ name: "observes invalid secretRef",
+ bucketName: "dummy",
+ secret: &corev1.Secret{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "dummy",
+ },
+ },
+ beforeFunc: func(obj *sourcev1.Bucket) {
+ obj.Spec.SecretRef = &meta.LocalObjectReference{
+ Name: "dummy",
+ }
+ },
+ wantErr: true,
+ assertConditions: []metav1.Condition{
+ *conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to construct S3 client: invalid 'dummy' secret data: required fields"),
+ },
+ },
+ {
+ name: "observes non-existing bucket name",
+ bucketName: "dummy",
+ beforeFunc: func(obj *sourcev1.Bucket) {
+ obj.Spec.BucketName = "invalid"
+ },
+ wantErr: true,
+ assertConditions: []metav1.Condition{
+ *conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Bucket 'invalid' does not exist"),
+ },
+ },
+ {
+ name: "transient bucket name API failure",
+ beforeFunc: func(obj *sourcev1.Bucket) {
+ obj.Spec.Endpoint = "transient.example.com"
+ obj.Spec.BucketName = "unavailable"
+ },
+ wantErr: true,
+ assertConditions: []metav1.Condition{
+ *conditions.TrueCondition(sourcev1.DownloadFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to verify existence of bucket 'unavailable'"),
+ },
+ },
+ {
+ // TODO(hidde): test the lesser happy paths
+ name: ".sourceignore",
+ bucketName: "dummy",
+ bucketObjects: []*s3MockObject{
+ {
+ Key: ".sourceignore",
+ Content: []byte("ignored/file.txt"),
+ ContentType: "text/plain",
+ LastModified: time.Now(),
+ },
+ {
+ Key: "ignored/file.txt",
+ Content: []byte("ignored/file.txt"),
+ ContentType: "text/plain",
+ LastModified: time.Now(),
+ },
+ {
+ Key: "included/file.txt",
+ Content: []byte("included/file.txt"),
+ ContentType: "text/plain",
+ LastModified: time.Now(),
+ },
+ },
+ assertArtifact: sourcev1.Artifact{
+ Path: "bucket/test-bucket/94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100.tar.gz",
+ Revision: "94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100",
+ },
+ assertConditions: []metav1.Condition{
+ *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision '94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100'"),
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ g := NewWithT(t)
+
+ builder := fakeclient.NewClientBuilder().WithScheme(testEnv.Scheme())
+ if tt.secret != nil {
+ builder.WithObjects(tt.secret)
+ }
+ r := &BucketReconciler{
+ EventRecorder: record.NewFakeRecorder(32),
+ Client: builder.Build(),
+ Storage: testStorage,
+ }
+ tmpDir, err := os.MkdirTemp("", "reconcile-bucket-source-")
+ g.Expect(err).ToNot(HaveOccurred())
+ defer os.RemoveAll(tmpDir)
+
+ obj := &sourcev1.Bucket{
+ TypeMeta: metav1.TypeMeta{
+ Kind: sourcev1.BucketKind,
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-bucket",
+ },
+ Spec: sourcev1.BucketSpec{
+ Timeout: &metav1.Duration{Duration: timeout},
+ },
+ }
+
+ var server *s3MockServer
+ if tt.bucketName != "" {
+ server = newS3Server(tt.bucketName)
+ server.Objects = tt.bucketObjects
+ server.Start()
+ defer server.Stop()
+
+ g.Expect(server.HTTPAddress()).ToNot(BeEmpty())
+ u, err := url.Parse(server.HTTPAddress())
+ g.Expect(err).NotTo(HaveOccurred())
+
+ obj.Spec.BucketName = tt.bucketName
+ obj.Spec.Endpoint = u.Host
+ // TODO(hidde): also test TLS
+ obj.Spec.Insecure = true
+ }
+ if tt.beforeFunc != nil {
+ tt.beforeFunc(obj)
+ }
+
+ artifact := &sourcev1.Artifact{}
+ got, err := r.reconcileSource(context.TODO(), obj, artifact, tmpDir)
+ g.Expect(err != nil).To(Equal(tt.wantErr))
+ g.Expect(got).To(Equal(tt.want))
+
+ g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy()))
+ g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
+ })
+ }
+}
+
+func TestBucketReconciler_reconcileArtifact(t *testing.T) {
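+ // The temporary directory passed to reconcileArtifact stands in for the
+ // fetched bucket contents; removing it up front ("dir path deleted")
+ // forces the archiving step to fail.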
+ tests := []struct {
+ name string
+ artifact sourcev1.Artifact
+ beforeFunc func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string)
+ want ctrl.Result
+ wantErr bool
+ assertConditions []metav1.Condition
+ }{
+ {
+ name: "artifact revision up-to-date",
+ artifact: sourcev1.Artifact{
+ Revision: "existing",
+ },
+ beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
+ obj.Status.Artifact = &artifact
+ },
+ assertConditions: []metav1.Condition{
+ *conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
+ },
+ },
+ {
+ name: "dir path deleted",
+ beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
+ _ = os.RemoveAll(dir)
+ },
+ wantErr: true,
+ },
+ //{
+ // name: "dir path empty",
+ //},
+ //{
+ // name: "success",
+ // artifact: sourcev1.Artifact{
+ // Revision: "existing",
+ // },
+ // beforeFunc: func(obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
+ // obj.Status.Artifact = &artifact
+ // },
+ // assertConditions: []metav1.Condition{
+ // *conditions.TrueCondition(sourcev1.ArtifactAvailableCondition, meta.SucceededReason, "Compressed source to artifact with revision 'existing'"),
+ // },
+ //},
+ //{
+ // name: "symlink",
+ //},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ g := NewWithT(t)
+
+ tmpDir, err := os.MkdirTemp("", "reconcile-bucket-artifact-")
+ g.Expect(err).ToNot(HaveOccurred())
+ defer os.RemoveAll(tmpDir)
+
+ obj := &sourcev1.Bucket{
+ TypeMeta: metav1.TypeMeta{
+ Kind: sourcev1.BucketKind,
+ },
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-bucket",
+ },
+ Spec: sourcev1.BucketSpec{
+ Timeout: &metav1.Duration{Duration: timeout},
+ },
+ }
+
+ if tt.beforeFunc != nil {
+ tt.beforeFunc(obj, tt.artifact, tmpDir)
+ }
+
+ r := &BucketReconciler{
+ EventRecorder: record.NewFakeRecorder(32),
+ Storage: testStorage,
+ }
+
+ dlog := log.NewDelegatingLogSink(log.NullLogSink{})
+ nullLogger := logr.New(dlog)
+ got, err := r.reconcileArtifact(logr.NewContext(ctx, nullLogger), obj, tt.artifact, tmpDir)
+ g.Expect(err != nil).To(Equal(tt.wantErr))
+ g.Expect(got).To(Equal(tt.want))
+
+ //g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy()))
+ g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
+ })
+ }
+}
+
+func Test_etagIndex_Revision(t *testing.T) {
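+ // Revision must produce a deterministic checksum regardless of map
+ // iteration order; an empty or nil index is expected to hash to the
+ // SHA-256 of empty input.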
+ tests := []struct {
+ name string
+ list etagIndex
+ want string
+ wantErr bool
+ }{
+ {
+ name: "index with items",
+ list: map[string]string{
+ "one": "one",
+ "two": "two",
+ "three": "three",
+ },
+ want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc",
+ },
+ {
+ name: "index with items in different order",
+ list: map[string]string{
+ "three": "three",
+ "one": "one",
+ "two": "two",
+ },
+ want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc",
+ },
+ {
+ name: "empty index",
+ list: map[string]string{},
+ want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
+ },
+ {
+ name: "nil index",
+ list: nil,
+ want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, err := tt.list.Revision()
+ if (err != nil) != tt.wantErr {
+ t.Errorf("revision() error = %v, wantErr %v", err, tt.wantErr)
+ return
+ }
+ if got != tt.want {
+ t.Errorf("revision() got = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+// helpers
+
func mockFile(root, path, content string) error {
filePath := filepath.Join(root, path)
if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil {
@@ -80,3 +594,120 @@ func mockFile(root, path, content string) error {
}
return nil
}
+
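+// s3MockObject is a single in-memory object served by s3MockServer.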
+type s3MockObject struct {
+ Key string
+ LastModified time.Time
+ ContentType string
+ Content []byte
+}
+
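+// s3MockServer mocks a tiny subset of the S3 HTTP API for a single bucket,
+// backed by the in-memory Objects slice.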
+type s3MockServer struct {
+ srv *httptest.Server
+ mux *http.ServeMux
+
+ BucketName string
+ Objects []*s3MockObject
+}
+
+func newS3Server(bucketName string) *s3MockServer {
+ s := &s3MockServer{BucketName: bucketName}
+ s.mux = http.NewServeMux()
+ s.mux.Handle(fmt.Sprintf("/%s/", s.BucketName), http.HandlerFunc(s.handler))
+
+ s.srv = httptest.NewUnstartedServer(s.mux)
+
+ return s
+}
+
+func (s *s3MockServer) Start() {
+ s.srv.Start()
+}
+
+func (s *s3MockServer) Stop() {
+ s.srv.Close()
+}
+
+func (s *s3MockServer) HTTPAddress() string {
+ return s.srv.URL
+}
+
+func (s *s3MockServer) handler(w http.ResponseWriter, r *http.Request) {
+ key := path.Base(r.URL.Path)
+
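+ // Requests addressed to the bucket itself are answered with a minimal
+ // bucket-listing XML document; any other path is treated as an object
+ // lookup and served with S3-like ETag (MD5 of content) and Last-Modified
+ // headers, or a 404 if the key is unknown.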
+ switch key {
+ case s.BucketName:
+ w.Header().Add("Content-Type", "application/xml")
+
+ if r.Method == http.MethodHead {
+ return
+ }
+
+ q := r.URL.Query()
+
+ if q["location"] != nil {
+ fmt.Fprint(w, `
+<?xml version="1.0" encoding="UTF-8"?>
+<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">Europe</LocationConstraint>
+ `)
+ return
+ }
+
+ contents := ""
+ for _, o := range s.Objects {
+ etag := md5.Sum(o.Content)
+ contents += fmt.Sprintf(`
+ <Contents>
+ <Key>%s</Key>
+ <LastModified>%s</LastModified>
+ <Size>%d</Size>
+ <ETag>&quot;%b&quot;</ETag>
+ <StorageClass>STANDARD</StorageClass>
+ </Contents>`, o.Key, o.LastModified.UTC().Format(time.RFC3339), len(o.Content), etag)
+ }
+
+ fmt.Fprintf(w, `
+<?xml version="1.0" encoding="UTF-8"?>
+<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <Name>%s</Name>
+ <Prefix/>
+ <Marker/>
+ <KeyCount>%d</KeyCount>
+ <MaxKeys>1000</MaxKeys>
+ <IsTruncated>false</IsTruncated>
+ %s
+</ListBucketResult>
+ `, s.BucketName, len(s.Objects), contents)
+ default:
+ key, err := filepath.Rel("/"+s.BucketName, r.URL.Path)
+ if err != nil {
+ w.WriteHeader(500)
+ return
+ }
+
+ var found *s3MockObject
+ for _, o := range s.Objects {
+ if key == o.Key {
+ found = o
+ }
+ }
+ if found == nil {
+ w.WriteHeader(404)
+ return
+ }
+
+ etag := md5.Sum(found.Content)
+ lastModified := strings.Replace(found.LastModified.UTC().Format(time.RFC1123), "UTC", "GMT", 1)
+
+ w.Header().Add("Content-Type", found.ContentType)
+ w.Header().Add("Last-Modified", lastModified)
+ w.Header().Add("ETag", fmt.Sprintf("\"%b\"", etag))
+ w.Header().Add("Content-Length", fmt.Sprintf("%d", len(found.Content)))
+
+ if r.Method == http.MethodHead {
+ return
+ }
+
+ w.Write(found.Content)
+ }
+}
diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index a33108dc..b27bda8f 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -97,6 +97,15 @@ func TestMain(m *testing.M) {
panic(fmt.Sprintf("Failed to start GitRepositoryReconciler: %v", err))
}
+ if err := (&BucketReconciler{
+ Client: testEnv,
+ EventRecorder: record.NewFakeRecorder(32),
+ Metrics: testMetricsH,
+ Storage: testStorage,
+ }).SetupWithManager(testEnv); err != nil {
+ panic(fmt.Sprintf("Failed to start BucketReconciler: %v", err))
+ }
+
go func() {
fmt.Println("Starting the test environment")
if err := testEnv.Start(ctx); err != nil {
diff --git a/main.go b/main.go
index 3c0f2791..4ea5a102 100644
--- a/main.go
+++ b/main.go
@@ -203,11 +203,10 @@ func main() {
os.Exit(1)
}
if err = (&controllers.BucketReconciler{
- Client: mgr.GetClient(),
- Scheme: mgr.GetScheme(),
- Storage: storage,
- EventRecorder: eventRecorder,
- MetricsRecorder: metricsH.MetricsRecorder,
+ Client: mgr.GetClient(),
+ EventRecorder: eventRecorder,
+ Metrics: metricsH,
+ Storage: storage,
}).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {