Merge pull request #412 from fluxcd/bucket-reconciler
This commit is contained in:
		
						commit
						19d7f8308c
					
				| 
						 | 
				
			
			@ -20,8 +20,6 @@ import (
 | 
			
		|||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/fluxcd/pkg/apis/meta"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/conditions"
 | 
			
		||||
	apimeta "k8s.io/apimachinery/pkg/api/meta"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -30,6 +28,11 @@ const (
 | 
			
		|||
	BucketKind = "Bucket"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	GenericBucketProvider string = "generic"
 | 
			
		||||
	AmazonBucketProvider  string = "aws"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// BucketSpec defines the desired state of an S3 compatible bucket
 | 
			
		||||
type BucketSpec struct {
 | 
			
		||||
	// The S3 compatible storage provider name, default ('generic').
 | 
			
		||||
| 
						 | 
				
			
			@ -79,11 +82,6 @@ type BucketSpec struct {
 | 
			
		|||
	Suspend bool `json:"suspend,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	GenericBucketProvider string = "generic"
 | 
			
		||||
	AmazonBucketProvider  string = "aws"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// BucketStatus defines the observed state of a bucket
 | 
			
		||||
type BucketStatus struct {
 | 
			
		||||
	// ObservedGeneration is the last observed generation.
 | 
			
		||||
| 
						 | 
				
			
			@ -115,45 +113,6 @@ const (
 | 
			
		|||
	BucketOperationFailedReason string = "BucketOperationFailed"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// BucketProgressing resets the conditions of the Bucket to metav1.Condition of
 | 
			
		||||
// type meta.ReadyCondition with status 'Unknown' and meta.ProgressingReason
 | 
			
		||||
// reason and message. It returns the modified Bucket.
 | 
			
		||||
func BucketProgressing(bucket Bucket) Bucket {
 | 
			
		||||
	bucket.Status.ObservedGeneration = bucket.Generation
 | 
			
		||||
	bucket.Status.URL = ""
 | 
			
		||||
	bucket.Status.Conditions = []metav1.Condition{}
 | 
			
		||||
	conditions.MarkUnknown(&bucket, meta.ReadyCondition, meta.ProgressingReason, "reconciliation in progress")
 | 
			
		||||
	return bucket
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BucketReady sets the given Artifact and URL on the Bucket and sets the
 | 
			
		||||
// meta.ReadyCondition to 'True', with the given reason and message. It returns
 | 
			
		||||
// the modified Bucket.
 | 
			
		||||
func BucketReady(bucket Bucket, artifact Artifact, url, reason, message string) Bucket {
 | 
			
		||||
	bucket.Status.Artifact = &artifact
 | 
			
		||||
	bucket.Status.URL = url
 | 
			
		||||
	conditions.MarkTrue(&bucket, meta.ReadyCondition, reason, message)
 | 
			
		||||
	return bucket
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BucketNotReady sets the meta.ReadyCondition on the Bucket to 'False', with
 | 
			
		||||
// the given reason and message. It returns the modified Bucket.
 | 
			
		||||
func BucketNotReady(bucket Bucket, reason, message string) Bucket {
 | 
			
		||||
	conditions.MarkFalse(&bucket, meta.ReadyCondition, reason, message)
 | 
			
		||||
	return bucket
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// BucketReadyMessage returns the message of the metav1.Condition of type
 | 
			
		||||
// meta.ReadyCondition with status 'True' if present, or an empty string.
 | 
			
		||||
func BucketReadyMessage(bucket Bucket) string {
 | 
			
		||||
	if c := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); c != nil {
 | 
			
		||||
		if c.Status == metav1.ConditionTrue {
 | 
			
		||||
			return c.Message
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return ""
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetConditions returns the status conditions of the object.
 | 
			
		||||
func (in Bucket) GetConditions() []metav1.Condition {
 | 
			
		||||
	return in.Status.Conditions
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -18,6 +18,22 @@ package v1beta1
 | 
			
		|||
 | 
			
		||||
const SourceFinalizer = "finalizers.fluxcd.io"
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	// ArtifactUnavailableCondition indicates there is no Artifact available for the Source.
 | 
			
		||||
	// This is a "negative polarity" or "abnormal-true" type, and is only present on the resource if it is True.
 | 
			
		||||
	ArtifactUnavailableCondition string = "ArtifactUnavailable"
 | 
			
		||||
 | 
			
		||||
	// ArtifactOutdatedCondition indicates the current Artifact of the Source is outdated.
 | 
			
		||||
	// This is a "negative polarity" or "abnormal-true" type, and is only present on the resource if it is True.
 | 
			
		||||
	ArtifactOutdatedCondition string = "ArtifactOutdated"
 | 
			
		||||
 | 
			
		||||
	// FetchFailedCondition indicates a transient or persistent fetch failure of an upstream Source.
 | 
			
		||||
	// If True, observations on the upstream Source revision may be impossible, and the Artifact available for the
 | 
			
		||||
	// Source may be outdated.
 | 
			
		||||
	// This is a "negative polarity" or "abnormal-true" type, and is only present on the resource if it is True.
 | 
			
		||||
	FetchFailedCondition string = "FetchFailed"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	// URLInvalidReason represents the fact that a given source has an invalid URL.
 | 
			
		||||
	URLInvalidReason string = "URLInvalid"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -34,15 +34,6 @@ const (
 | 
			
		|||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	// ArtifactUnavailableCondition indicates there is no Artifact available for the Source.
 | 
			
		||||
	// This is a "negative polarity" or "abnormal-true" type, and is only present on the resource if it is True.
 | 
			
		||||
	ArtifactUnavailableCondition string = "ArtifactUnavailable"
 | 
			
		||||
 | 
			
		||||
	// CheckoutFailedCondition indicates a transient or persistent checkout failure. If True, observations on the
 | 
			
		||||
	// upstream Source revision are not possible, and the Artifact available for the Source may be outdated.
 | 
			
		||||
	// This is a "negative polarity" or "abnormal-true" type, and is only present on the resource if it is True.
 | 
			
		||||
	CheckoutFailedCondition string = "CheckoutFailed"
 | 
			
		||||
 | 
			
		||||
	// SourceVerifiedCondition indicates the integrity of the Source has been verified. If True, the integrity check
 | 
			
		||||
	// succeeded. If False, it failed. The Condition is only present on the resource if the integrity has been verified.
 | 
			
		||||
	SourceVerifiedCondition string = "SourceVerified"
 | 
			
		||||
| 
						 | 
				
			
			@ -51,10 +42,6 @@ const (
 | 
			
		|||
	// exist, or does not have an Artifact.
 | 
			
		||||
	// This is a "negative polarity" or "abnormal-true" type, and is only present on the resource if it is True.
 | 
			
		||||
	IncludeUnavailableCondition string = "IncludeUnavailable"
 | 
			
		||||
 | 
			
		||||
	// ArtifactOutdatedCondition indicates the current Artifact of the Source is outdated.
 | 
			
		||||
	// This is a "negative polarity" or "abnormal-true" type, and is only present on the resource if it is True.
 | 
			
		||||
	ArtifactOutdatedCondition string = "ArtifactOutdated"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// GitRepositorySpec defines the desired state of a Git repository.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,67 @@
 | 
			
		|||
/*
 | 
			
		||||
Copyright 2021 The Flux authors
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package controllers
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
 | 
			
		||||
	. "github.com/onsi/gomega"
 | 
			
		||||
	"github.com/onsi/gomega/types"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// MatchArtifact returns a custom matcher to check equality of a v1beta1.Artifact, the timestamp and URL are ignored.
 | 
			
		||||
func MatchArtifact(expected *sourcev1.Artifact) types.GomegaMatcher {
 | 
			
		||||
	return &matchArtifact{
 | 
			
		||||
		expected: expected,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type matchArtifact struct {
 | 
			
		||||
	expected *sourcev1.Artifact
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m matchArtifact) Match(actual interface{}) (success bool, err error) {
 | 
			
		||||
	actualArtifact, ok := actual.(*sourcev1.Artifact)
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return false, fmt.Errorf("actual should be a pointer to an Artifact")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if ok, _ := BeNil().Match(m.expected); ok {
 | 
			
		||||
		return BeNil().Match(actual)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if ok, err = Equal(m.expected.Path).Match(actualArtifact.Path); !ok {
 | 
			
		||||
		return ok, err
 | 
			
		||||
	}
 | 
			
		||||
	if ok, err = Equal(m.expected.Revision).Match(actualArtifact.Revision); !ok {
 | 
			
		||||
		return ok, err
 | 
			
		||||
	}
 | 
			
		||||
	if ok, err = Equal(m.expected.Checksum).Match(actualArtifact.Checksum); !ok {
 | 
			
		||||
		return ok, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return ok, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m matchArtifact) FailureMessage(actual interface{}) (message string) {
 | 
			
		||||
	return fmt.Sprintf("expected\n\t%#v\nto match\n\t%#v\n", actual, m.expected)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m matchArtifact) NegatedFailureMessage(actual interface{}) (message string) {
 | 
			
		||||
	return fmt.Sprintf("expected\n\t%#v\nto not match\n\t%#v\n", actual, m.expected)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -18,24 +18,23 @@ package controllers
 | 
			
		|||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"crypto/sha1"
 | 
			
		||||
	"crypto/sha256"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"os"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"sort"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/go-logr/logr"
 | 
			
		||||
	"github.com/minio/minio-go/v7"
 | 
			
		||||
	"github.com/minio/minio-go/v7/pkg/credentials"
 | 
			
		||||
	"github.com/minio/minio-go/v7/pkg/s3utils"
 | 
			
		||||
	"golang.org/x/sync/errgroup"
 | 
			
		||||
	"golang.org/x/sync/semaphore"
 | 
			
		||||
	corev1 "k8s.io/api/core/v1"
 | 
			
		||||
	apimeta "k8s.io/apimachinery/pkg/api/meta"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	kuberecorder "k8s.io/client-go/tools/record"
 | 
			
		||||
	"k8s.io/client-go/tools/reference"
 | 
			
		||||
	kerrors "k8s.io/apimachinery/pkg/util/errors"
 | 
			
		||||
	ctrl "sigs.k8s.io/controller-runtime"
 | 
			
		||||
	"sigs.k8s.io/controller-runtime/pkg/client"
 | 
			
		||||
	"sigs.k8s.io/controller-runtime/pkg/controller"
 | 
			
		||||
| 
						 | 
				
			
			@ -43,8 +42,10 @@ import (
 | 
			
		|||
	"sigs.k8s.io/controller-runtime/pkg/predicate"
 | 
			
		||||
 | 
			
		||||
	"github.com/fluxcd/pkg/apis/meta"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/conditions"
 | 
			
		||||
	helper "github.com/fluxcd/pkg/runtime/controller"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/events"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/metrics"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/patch"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/predicates"
 | 
			
		||||
 | 
			
		||||
	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
 | 
			
		||||
| 
						 | 
				
			
			@ -59,11 +60,10 @@ import (
 | 
			
		|||
// BucketReconciler reconciles a Bucket object
 | 
			
		||||
type BucketReconciler struct {
 | 
			
		||||
	client.Client
 | 
			
		||||
	Scheme                *runtime.Scheme
 | 
			
		||||
	helper.Events
 | 
			
		||||
	helper.Metrics
 | 
			
		||||
 | 
			
		||||
	Storage *Storage
 | 
			
		||||
	EventRecorder         kuberecorder.EventRecorder
 | 
			
		||||
	ExternalEventRecorder *events.Recorder
 | 
			
		||||
	MetricsRecorder       *metrics.Recorder
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type BucketReconcilerOptions struct {
 | 
			
		||||
| 
						 | 
				
			
			@ -82,403 +82,496 @@ func (r *BucketReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts Buc
 | 
			
		|||
		Complete(r)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 | 
			
		||||
func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
 | 
			
		||||
	start := time.Now()
 | 
			
		||||
	log := ctrl.LoggerFrom(ctx)
 | 
			
		||||
 | 
			
		||||
	var bucket sourcev1.Bucket
 | 
			
		||||
	if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil {
 | 
			
		||||
	// Fetch the Bucket
 | 
			
		||||
	obj := &sourcev1.Bucket{}
 | 
			
		||||
	if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
 | 
			
		||||
		return ctrl.Result{}, client.IgnoreNotFound(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Record suspended status metric
 | 
			
		||||
	defer r.recordSuspension(ctx, bucket)
 | 
			
		||||
	r.RecordSuspend(ctx, obj, obj.Spec.Suspend)
 | 
			
		||||
 | 
			
		||||
	// Add our finalizer if it does not exist
 | 
			
		||||
	if !controllerutil.ContainsFinalizer(&bucket, sourcev1.SourceFinalizer) {
 | 
			
		||||
		controllerutil.AddFinalizer(&bucket, sourcev1.SourceFinalizer)
 | 
			
		||||
		if err := r.Update(ctx, &bucket); err != nil {
 | 
			
		||||
			log.Error(err, "unable to register finalizer")
 | 
			
		||||
			return ctrl.Result{}, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Examine if the object is under deletion
 | 
			
		||||
	if !bucket.ObjectMeta.DeletionTimestamp.IsZero() {
 | 
			
		||||
		return r.reconcileDelete(ctx, bucket)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Return early if the object is suspended.
 | 
			
		||||
	if bucket.Spec.Suspend {
 | 
			
		||||
	// Return early if the object is suspended
 | 
			
		||||
	if obj.Spec.Suspend {
 | 
			
		||||
		log.Info("Reconciliation is suspended for this object")
 | 
			
		||||
		return ctrl.Result{}, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// record reconciliation duration
 | 
			
		||||
	if r.MetricsRecorder != nil {
 | 
			
		||||
		objRef, err := reference.GetReference(r.Scheme, &bucket)
 | 
			
		||||
	// Initialize the patch helper
 | 
			
		||||
	patchHelper, err := patch.NewHelper(obj, r.Client)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
		defer r.MetricsRecorder.RecordDuration(*objRef, start)
 | 
			
		||||
 | 
			
		||||
	// Always attempt to patch the object and status after each reconciliation
 | 
			
		||||
	defer func() {
 | 
			
		||||
		// Record the value of the reconciliation request, if any
 | 
			
		||||
		if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
 | 
			
		||||
			obj.Status.SetLastHandledReconcileRequest(v)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	// set initial status
 | 
			
		||||
	if resetBucket, ok := r.resetStatus(bucket); ok {
 | 
			
		||||
		bucket = resetBucket
 | 
			
		||||
		if err := r.updateStatus(ctx, req, bucket.Status); err != nil {
 | 
			
		||||
			log.Error(err, "unable to update status")
 | 
			
		||||
			return ctrl.Result{Requeue: true}, err
 | 
			
		||||
		}
 | 
			
		||||
		r.recordReadiness(ctx, bucket)
 | 
			
		||||
		// Summarize the Ready condition based on abnormalities that may have been observed
 | 
			
		||||
		conditions.SetSummary(obj,
 | 
			
		||||
			meta.ReadyCondition,
 | 
			
		||||
			conditions.WithConditions(
 | 
			
		||||
				sourcev1.ArtifactOutdatedCondition,
 | 
			
		||||
				sourcev1.FetchFailedCondition,
 | 
			
		||||
				sourcev1.ArtifactUnavailableCondition,
 | 
			
		||||
			),
 | 
			
		||||
			conditions.WithNegativePolarityConditions(
 | 
			
		||||
				sourcev1.ArtifactOutdatedCondition,
 | 
			
		||||
				sourcev1.FetchFailedCondition,
 | 
			
		||||
				sourcev1.ArtifactUnavailableCondition,
 | 
			
		||||
			),
 | 
			
		||||
		)
 | 
			
		||||
 | 
			
		||||
		// Patch the object, ignoring conflicts on the conditions owned by this controller
 | 
			
		||||
		patchOpts := []patch.Option{
 | 
			
		||||
			patch.WithOwnedConditions{
 | 
			
		||||
				Conditions: []string{
 | 
			
		||||
					sourcev1.ArtifactOutdatedCondition,
 | 
			
		||||
					sourcev1.FetchFailedCondition,
 | 
			
		||||
					sourcev1.ArtifactUnavailableCondition,
 | 
			
		||||
					meta.ReadyCondition,
 | 
			
		||||
					meta.ReconcilingCondition,
 | 
			
		||||
					meta.StalledCondition,
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	// record the value of the reconciliation request, if any
 | 
			
		||||
	// TODO(hidde): would be better to defer this in combination with
 | 
			
		||||
	//   always patching the status sub-resource after a reconciliation.
 | 
			
		||||
	if v, ok := meta.ReconcileAnnotationValue(bucket.GetAnnotations()); ok {
 | 
			
		||||
		bucket.Status.SetLastHandledReconcileRequest(v)
 | 
			
		||||
		// Determine if the resource is still being reconciled, or if it has stalled, and record this observation
 | 
			
		||||
		if retErr == nil && (result.IsZero() || !result.Requeue) {
 | 
			
		||||
			// We are no longer reconciling
 | 
			
		||||
			conditions.Delete(obj, meta.ReconcilingCondition)
 | 
			
		||||
 | 
			
		||||
			// We have now observed this generation
 | 
			
		||||
			patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
 | 
			
		||||
 | 
			
		||||
			readyCondition := conditions.Get(obj, meta.ReadyCondition)
 | 
			
		||||
			switch readyCondition.Status {
 | 
			
		||||
			case metav1.ConditionFalse:
 | 
			
		||||
				// As we are no longer reconciling and the end-state is not ready, the reconciliation has stalled
 | 
			
		||||
				conditions.MarkStalled(obj, readyCondition.Reason, readyCondition.Message)
 | 
			
		||||
			case metav1.ConditionTrue:
 | 
			
		||||
				// As we are no longer reconciling and the end-state is ready, the reconciliation is no longer stalled
 | 
			
		||||
				conditions.Delete(obj, meta.StalledCondition)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	// purge old artifacts from storage
 | 
			
		||||
	if err := r.gc(bucket); err != nil {
 | 
			
		||||
		log.Error(err, "unable to purge old artifacts")
 | 
			
		||||
		// Finally, patch the resource
 | 
			
		||||
		if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
 | 
			
		||||
			retErr = kerrors.NewAggregate([]error{retErr, err})
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	// reconcile bucket by downloading its content
 | 
			
		||||
	reconciledBucket, reconcileErr := r.reconcile(ctx, *bucket.DeepCopy())
 | 
			
		||||
		// Always record readiness and duration metrics
 | 
			
		||||
		r.Metrics.RecordReadiness(ctx, obj)
 | 
			
		||||
		r.Metrics.RecordDuration(ctx, obj, start)
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// update status with the reconciliation result
 | 
			
		||||
	if err := r.updateStatus(ctx, req, reconciledBucket.Status); err != nil {
 | 
			
		||||
		log.Error(err, "unable to update status")
 | 
			
		||||
		return ctrl.Result{Requeue: true}, err
 | 
			
		||||
	// Add finalizer first if not exist to avoid the race condition between init and delete
 | 
			
		||||
	if !controllerutil.ContainsFinalizer(obj, sourcev1.SourceFinalizer) {
 | 
			
		||||
		controllerutil.AddFinalizer(obj, sourcev1.SourceFinalizer)
 | 
			
		||||
		return ctrl.Result{Requeue: true}, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// if reconciliation failed, record the failure and requeue immediately
 | 
			
		||||
	if reconcileErr != nil {
 | 
			
		||||
		r.event(ctx, reconciledBucket, events.EventSeverityError, reconcileErr.Error())
 | 
			
		||||
		r.recordReadiness(ctx, reconciledBucket)
 | 
			
		||||
		return ctrl.Result{Requeue: true}, reconcileErr
 | 
			
		||||
	// Examine if the object is under deletion
 | 
			
		||||
	if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
 | 
			
		||||
		return r.reconcileDelete(ctx, obj)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// emit revision change event
 | 
			
		||||
	if bucket.Status.Artifact == nil || reconciledBucket.Status.Artifact.Revision != bucket.Status.Artifact.Revision {
 | 
			
		||||
		r.event(ctx, reconciledBucket, events.EventSeverityInfo, sourcev1.BucketReadyMessage(reconciledBucket))
 | 
			
		||||
	}
 | 
			
		||||
	r.recordReadiness(ctx, reconciledBucket)
 | 
			
		||||
 | 
			
		||||
	log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
 | 
			
		||||
		time.Now().Sub(start).String(),
 | 
			
		||||
		bucket.GetRequeueAfter().String(),
 | 
			
		||||
	))
 | 
			
		||||
 | 
			
		||||
	return ctrl.Result{RequeueAfter: bucket.GetRequeueAfter()}, nil
 | 
			
		||||
	// Reconcile actual object
 | 
			
		||||
	return r.reconcile(ctx, obj)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *BucketReconciler) reconcile(ctx context.Context, bucket sourcev1.Bucket) (sourcev1.Bucket, error) {
 | 
			
		||||
	s3Client, err := r.auth(ctx, bucket)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		err = fmt.Errorf("auth error: %w", err)
 | 
			
		||||
		return sourcev1.BucketNotReady(bucket, sourcev1.AuthenticationFailedReason, err.Error()), err
 | 
			
		||||
// reconcile steps through the actual reconciliation tasks for the object, it returns early on the first step that
 | 
			
		||||
// produces an error.
 | 
			
		||||
func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
 | 
			
		||||
	// Mark the resource as under reconciliation
 | 
			
		||||
	conditions.MarkReconciling(obj, meta.ProgressingReason, "")
 | 
			
		||||
 | 
			
		||||
	// Reconcile the storage data
 | 
			
		||||
	if result, err := r.reconcileStorage(ctx, obj); err != nil || result.IsZero() {
 | 
			
		||||
		return result, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// create tmp dir
 | 
			
		||||
	tempDir, err := os.MkdirTemp("", bucket.Name)
 | 
			
		||||
	// Create temp working dir
 | 
			
		||||
	tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		err = fmt.Errorf("tmp dir error: %w", err)
 | 
			
		||||
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
 | 
			
		||||
		r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason, "Failed to create temporary directory: %s", err)
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
	defer os.RemoveAll(tempDir)
 | 
			
		||||
	defer os.RemoveAll(tmpDir)
 | 
			
		||||
 | 
			
		||||
	ctxTimeout, cancel := context.WithTimeout(ctx, bucket.Spec.Timeout.Duration)
 | 
			
		||||
	// Reconcile the source from upstream
 | 
			
		||||
	var artifact sourcev1.Artifact
 | 
			
		||||
	if result, err := r.reconcileSource(ctx, obj, &artifact, tmpDir); err != nil || result.IsZero() {
 | 
			
		||||
		return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Reconcile the artifact to storage
 | 
			
		||||
	if result, err := r.reconcileArtifact(ctx, obj, artifact, tmpDir); err != nil || result.IsZero() {
 | 
			
		||||
		return result, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// reconcileStorage ensures the current state of the storage matches the desired and previously observed state.
 | 
			
		||||
//
 | 
			
		||||
// All artifacts for the resource except for the current one are garbage collected from the storage.
 | 
			
		||||
// If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
 | 
			
		||||
// If the object does not have an artifact in its Status object, a v1beta1.ArtifactUnavailableCondition is set.
 | 
			
		||||
// If the hostname of the URLs on the object do not match the current storage server hostname, they are updated.
 | 
			
		||||
//
 | 
			
		||||
// The caller should assume a failure if an error is returned, or the Result is zero.
 | 
			
		||||
func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
 | 
			
		||||
	// Garbage collect previous advertised artifact(s) from storage
 | 
			
		||||
	_ = r.garbageCollect(ctx, obj)
 | 
			
		||||
 | 
			
		||||
	// Determine if the advertised artifact is still in storage
 | 
			
		||||
	if artifact := obj.GetArtifact(); artifact != nil && !r.Storage.ArtifactExist(*artifact) {
 | 
			
		||||
		obj.Status.Artifact = nil
 | 
			
		||||
		obj.Status.URL = ""
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Record that we do not have an artifact
 | 
			
		||||
	if obj.GetArtifact() == nil {
 | 
			
		||||
		conditions.MarkTrue(obj, sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage")
 | 
			
		||||
		return ctrl.Result{Requeue: true}, nil
 | 
			
		||||
	}
 | 
			
		||||
	conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
 | 
			
		||||
 | 
			
		||||
	// Always update URLs to ensure hostname is up-to-date
 | 
			
		||||
	// TODO(hidde): we may want to send out an event only if we notice the URL has changed
 | 
			
		||||
	r.Storage.SetArtifactURL(obj.GetArtifact())
 | 
			
		||||
	obj.Status.URL = r.Storage.SetHostname(obj.Status.URL)
 | 
			
		||||
 | 
			
		||||
	return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// reconcileSource ensures the upstream bucket can be reached and downloaded using the declared configuration, and
 | 
			
		||||
// observes its state.
 | 
			
		||||
//
 | 
			
		||||
// The bucket contents are downloaded to the given dir using the defined configuration, while taking ignore rules into
 | 
			
		||||
// account. In case of an error during the download process (including transient errors), it records
 | 
			
		||||
// v1beta1.FetchFailedCondition=True and returns early.
 | 
			
		||||
// On a successful download, it removes v1beta1.FetchFailedCondition, and compares the current revision of HEAD to
 | 
			
		||||
// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
 | 
			
		||||
// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
 | 
			
		||||
//
 | 
			
		||||
// The caller should assume a failure if an error is returned, or the Result is zero.
 | 
			
		||||
func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, dir string) (ctrl.Result, error) {
 | 
			
		||||
	// Attempt to retrieve secret if one is configured
 | 
			
		||||
	var secret *corev1.Secret
 | 
			
		||||
	if obj.Spec.SecretRef != nil {
 | 
			
		||||
		secret = &corev1.Secret{}
 | 
			
		||||
		name := types.NamespacedName{
 | 
			
		||||
			Namespace: obj.GetNamespace(),
 | 
			
		||||
			Name:      obj.Spec.SecretRef.Name,
 | 
			
		||||
		}
 | 
			
		||||
		if err := r.Client.Get(ctx, name, secret); err != nil {
 | 
			
		||||
			conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason,
 | 
			
		||||
				"Failed to get secret '%s': %s", name.String(), err.Error())
 | 
			
		||||
			r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.AuthenticationFailedReason,
 | 
			
		||||
				"Failed to get secret '%s': %s", name.String(), err.Error())
 | 
			
		||||
			// Return error as the world as observed may change
 | 
			
		||||
			return ctrl.Result{}, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Build the client with the configuration from the object and secret
 | 
			
		||||
	s3Client, err := r.buildClient(obj, secret)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
 | 
			
		||||
			"Failed to construct S3 client: %s", err.Error())
 | 
			
		||||
		r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
 | 
			
		||||
			"Failed to construct S3 client: %s", err.Error())
 | 
			
		||||
		// Return error as the contents of the secret may change
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Confirm bucket exists
 | 
			
		||||
	ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
 | 
			
		||||
	defer cancel()
 | 
			
		||||
 | 
			
		||||
	exists, err := s3Client.BucketExists(ctxTimeout, bucket.Spec.BucketName)
 | 
			
		||||
	exists, err := s3Client.BucketExists(ctxTimeout, obj.Spec.BucketName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
 | 
			
		||||
		conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to verify existence of bucket '%s': %s", obj.Spec.BucketName, err.Error())
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
	if !exists {
 | 
			
		||||
		err = fmt.Errorf("bucket '%s' not found", bucket.Spec.BucketName)
 | 
			
		||||
		return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
 | 
			
		||||
		conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
 | 
			
		||||
			"Bucket '%s' does not exist", obj.Spec.BucketName)
 | 
			
		||||
		r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
 | 
			
		||||
			"Bucket '%s' does not exist", obj.Spec.BucketName)
 | 
			
		||||
		return ctrl.Result{}, fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Look for file with ignore rules first
 | 
			
		||||
	// NB: S3 has flat filepath keys making it impossible to look
 | 
			
		||||
	// for files in "subdirectories" without building up a tree first.
 | 
			
		||||
	path := filepath.Join(tempDir, sourceignore.IgnoreFile)
 | 
			
		||||
	if err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil {
 | 
			
		||||
	path := filepath.Join(dir, sourceignore.IgnoreFile)
 | 
			
		||||
	if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil {
 | 
			
		||||
		if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" {
 | 
			
		||||
			return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
 | 
			
		||||
			conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
 | 
			
		||||
				"Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error())
 | 
			
		||||
			r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
 | 
			
		||||
				"Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error())
 | 
			
		||||
			return ctrl.Result{}, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	ps, err := sourceignore.ReadIgnoreFile(path, nil)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
 | 
			
		||||
		conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
 | 
			
		||||
			"Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error())
 | 
			
		||||
		r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
 | 
			
		||||
			"Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error())
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
	// In-spec patterns take precedence
 | 
			
		||||
	if bucket.Spec.Ignore != nil {
 | 
			
		||||
		ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*bucket.Spec.Ignore), nil)...)
 | 
			
		||||
	if obj.Spec.Ignore != nil {
 | 
			
		||||
		ps = append(ps, sourceignore.ReadPatterns(strings.NewReader(*obj.Spec.Ignore), nil)...)
 | 
			
		||||
	}
 | 
			
		||||
	matcher := sourceignore.NewMatcher(ps)
 | 
			
		||||
 | 
			
		||||
	// download bucket content
 | 
			
		||||
	for object := range s3Client.ListObjects(ctxTimeout, bucket.Spec.BucketName, minio.ListObjectsOptions{
 | 
			
		||||
	// Build up an index of object keys and their etags
 | 
			
		||||
	// As the keys define the paths and the etags represent a change in file contents, this should be sufficient to
 | 
			
		||||
	// detect both structural and file changes
 | 
			
		||||
	index := map[string]string{}
 | 
			
		||||
	for object := range s3Client.ListObjects(ctxTimeout, obj.Spec.BucketName, minio.ListObjectsOptions{
 | 
			
		||||
		Recursive: true,
 | 
			
		||||
		UseV1:     s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()),
 | 
			
		||||
	}) {
 | 
			
		||||
		if object.Err != nil {
 | 
			
		||||
			err = fmt.Errorf("listing objects from bucket '%s' failed: %w", bucket.Spec.BucketName, object.Err)
 | 
			
		||||
			return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
 | 
			
		||||
		if err = object.Err; err != nil {
 | 
			
		||||
			conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
 | 
			
		||||
				"Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error())
 | 
			
		||||
			r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
 | 
			
		||||
				"Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error())
 | 
			
		||||
			return ctrl.Result{}, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Ignore directories and the .sourceignore file
 | 
			
		||||
		if strings.HasSuffix(object.Key, "/") || object.Key == sourceignore.IgnoreFile {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Ignore matches
 | 
			
		||||
		if matcher.Match(strings.Split(object.Key, "/"), false) {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		index[object.Key] = object.ETag
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
		localPath := filepath.Join(tempDir, object.Key)
 | 
			
		||||
		err := s3Client.FGetObject(ctxTimeout, bucket.Spec.BucketName, object.Key, localPath, minio.GetObjectOptions{})
 | 
			
		||||
	// Calculate revision checksum from the collected index values
 | 
			
		||||
	revision, err := r.revision(index)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
			err = fmt.Errorf("downloading object from bucket '%s' failed: %w", bucket.Spec.BucketName, err)
 | 
			
		||||
			return sourcev1.BucketNotReady(bucket, sourcev1.BucketOperationFailedReason, err.Error()), err
 | 
			
		||||
		}
 | 
			
		||||
		err = fmt.Errorf("failed to calculate revision for index: %w", err)
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	revision, err := r.checksum(tempDir)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
 | 
			
		||||
	if !obj.GetArtifact().HasRevision(revision) {
 | 
			
		||||
		// Mark observations about the revision on the object
 | 
			
		||||
		conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision",
 | 
			
		||||
			"New upstream revision '%s'", revision)
 | 
			
		||||
 | 
			
		||||
		// Download the files in parallel, but with a limited number of workers
 | 
			
		||||
		group, groupCtx := errgroup.WithContext(ctx)
 | 
			
		||||
		group.Go(func() error {
 | 
			
		||||
			const workers = 4
 | 
			
		||||
			sem := semaphore.NewWeighted(workers)
 | 
			
		||||
			for key := range index {
 | 
			
		||||
				k := key
 | 
			
		||||
				if err := sem.Acquire(groupCtx, 1); err != nil {
 | 
			
		||||
					return err
 | 
			
		||||
				}
 | 
			
		||||
				group.Go(func() error {
 | 
			
		||||
					defer sem.Release(1)
 | 
			
		||||
					localPath := filepath.Join(dir, k)
 | 
			
		||||
					if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, k, localPath, minio.GetObjectOptions{}); err != nil {
 | 
			
		||||
						return fmt.Errorf("failed to get '%s' file: %w", k, err)
 | 
			
		||||
					}
 | 
			
		||||
					return nil
 | 
			
		||||
				})
 | 
			
		||||
			}
 | 
			
		||||
			return nil
 | 
			
		||||
		})
 | 
			
		||||
		if err = group.Wait(); err != nil {
 | 
			
		||||
			conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
 | 
			
		||||
				"Download from bucket '%s' failed: %s", obj.Spec.BucketName, err)
 | 
			
		||||
			r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.BucketOperationFailedReason,
 | 
			
		||||
				"Download from bucket '%s' failed: %s", obj.Spec.BucketName, err)
 | 
			
		||||
			return ctrl.Result{}, err
 | 
			
		||||
		}
 | 
			
		||||
		r.Eventf(ctx, obj, events.EventSeverityInfo, sourcev1.BucketOperationSucceedReason,
 | 
			
		||||
			"Downloaded %d files from bucket '%s' revision '%s'", len(index), obj.Spec.BucketName, revision)
 | 
			
		||||
	}
 | 
			
		||||
	conditions.Delete(obj, sourcev1.FetchFailedCondition)
 | 
			
		||||
 | 
			
		||||
	// Create potential new artifact
 | 
			
		||||
	*artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
 | 
			
		||||
	return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// reconcileArtifact archives a new artifact to the storage, if the current observation on the object does not match the
 | 
			
		||||
// given data.
 | 
			
		||||
//
 | 
			
		||||
// The inspection of the given data to the object is differed, ensuring any stale observations as
 | 
			
		||||
// v1beta1.ArtifactUnavailableCondition and v1beta1.ArtifactOutdatedCondition are always deleted.
 | 
			
		||||
// If the given artifact does not differ from the object's current, it returns early.
 | 
			
		||||
// On a successful archive, the artifact in the status of the given object is set, and the symlink in the storage is
 | 
			
		||||
// updated to its path.
 | 
			
		||||
//
 | 
			
		||||
// The caller should assume a failure if an error is returned, or the Result is zero.
 | 
			
		||||
func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) (ctrl.Result, error) {
 | 
			
		||||
	// Always restore the Ready condition in case it got removed due to a transient error
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if obj.GetArtifact() != nil {
 | 
			
		||||
			conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
 | 
			
		||||
		}
 | 
			
		||||
		if obj.GetArtifact().HasRevision(artifact.Revision) {
 | 
			
		||||
			conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition)
 | 
			
		||||
			conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason,
 | 
			
		||||
				"Stored artifact for revision '%s'", artifact.Revision)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	// The artifact is up-to-date
 | 
			
		||||
	if obj.GetArtifact().HasRevision(artifact.Revision) {
 | 
			
		||||
		ctrl.LoggerFrom(ctx).Info(fmt.Sprintf("Already up to date, current revision '%s'", artifact.Revision))
 | 
			
		||||
		return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// return early on unchanged revision
 | 
			
		||||
	artifact := r.Storage.NewArtifactFor(bucket.Kind, bucket.GetObjectMeta(), revision, fmt.Sprintf("%s.tar.gz", revision))
 | 
			
		||||
	if apimeta.IsStatusConditionTrue(bucket.Status.Conditions, meta.ReadyCondition) && bucket.GetArtifact().HasRevision(artifact.Revision) {
 | 
			
		||||
		if artifact.URL != bucket.GetArtifact().URL {
 | 
			
		||||
			r.Storage.SetArtifactURL(bucket.GetArtifact())
 | 
			
		||||
			bucket.Status.URL = r.Storage.SetHostname(bucket.Status.URL)
 | 
			
		||||
		}
 | 
			
		||||
		return bucket, nil
 | 
			
		||||
	// Ensure target path exists and is a directory
 | 
			
		||||
	if f, err := os.Stat(dir); err != nil {
 | 
			
		||||
		err = fmt.Errorf("failed to stat target path: %w", err)
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	} else if !f.IsDir() {
 | 
			
		||||
		err = fmt.Errorf("invalid target path: '%s' is not a directory", dir)
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// create artifact dir
 | 
			
		||||
	err = r.Storage.MkdirAll(artifact)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		err = fmt.Errorf("mkdir dir error: %w", err)
 | 
			
		||||
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
 | 
			
		||||
	// Ensure artifact directory exists and acquire lock
 | 
			
		||||
	if err := r.Storage.MkdirAll(artifact); err != nil {
 | 
			
		||||
		err = fmt.Errorf("failed to create artifact directory: %w", err)
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// acquire lock
 | 
			
		||||
	unlock, err := r.Storage.Lock(artifact)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		err = fmt.Errorf("unable to acquire lock: %w", err)
 | 
			
		||||
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
 | 
			
		||||
		err = fmt.Errorf("failed to acquire lock for artifact: %w", err)
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
	defer unlock()
 | 
			
		||||
 | 
			
		||||
	// archive artifact and check integrity
 | 
			
		||||
	if err := r.Storage.Archive(&artifact, tempDir, nil); err != nil {
 | 
			
		||||
		err = fmt.Errorf("storage archive error: %w", err)
 | 
			
		||||
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
 | 
			
		||||
	// Archive directory to storage
 | 
			
		||||
	if err := r.Storage.Archive(&artifact, dir, nil); err != nil {
 | 
			
		||||
		r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason,
 | 
			
		||||
			"Unable to archive artifact to storage: %s", err)
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
	r.Events.EventWithMetaf(ctx, obj, map[string]string{
 | 
			
		||||
		"revision": artifact.Revision,
 | 
			
		||||
		"checksum": artifact.Checksum,
 | 
			
		||||
	}, events.EventSeverityInfo, "NewArtifact", "Stored artifact for revision '%s'", artifact.Revision)
 | 
			
		||||
 | 
			
		||||
	// update latest symlink
 | 
			
		||||
	// Record it on the object
 | 
			
		||||
	obj.Status.Artifact = artifact.DeepCopy()
 | 
			
		||||
 | 
			
		||||
	// Update symlink on a "best effort" basis
 | 
			
		||||
	url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		err = fmt.Errorf("storage symlink error: %w", err)
 | 
			
		||||
		return sourcev1.BucketNotReady(bucket, sourcev1.StorageOperationFailedReason, err.Error()), err
 | 
			
		||||
		r.Events.Eventf(ctx, obj, events.EventSeverityError, sourcev1.StorageOperationFailedReason,
 | 
			
		||||
			"Failed to update status URL symlink: %s", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	message := fmt.Sprintf("Fetched revision: %s", artifact.Revision)
 | 
			
		||||
	return sourcev1.BucketReady(bucket, artifact, url, sourcev1.BucketOperationSucceedReason, message), nil
 | 
			
		||||
	if url != "" {
 | 
			
		||||
		obj.Status.URL = url
 | 
			
		||||
	}
 | 
			
		||||
	return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *BucketReconciler) reconcileDelete(ctx context.Context, bucket sourcev1.Bucket) (ctrl.Result, error) {
 | 
			
		||||
	if err := r.gc(bucket); err != nil {
 | 
			
		||||
		r.event(ctx, bucket, events.EventSeverityError,
 | 
			
		||||
			fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
 | 
			
		||||
// reconcileDelete handles the deletion of an object. It first garbage collects all artifacts for the object from the
 | 
			
		||||
// artifact storage, if successful, the finalizer is removed from the object.
 | 
			
		||||
func (r *BucketReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
 | 
			
		||||
	// Garbage collect the resource's artifacts
 | 
			
		||||
	if err := r.garbageCollect(ctx, obj); err != nil {
 | 
			
		||||
		// Return the error so we retry the failed garbage collection
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Record deleted status
 | 
			
		||||
	r.recordReadiness(ctx, bucket)
 | 
			
		||||
 | 
			
		||||
	// Remove our finalizer from the list and update it
 | 
			
		||||
	controllerutil.RemoveFinalizer(&bucket, sourcev1.SourceFinalizer)
 | 
			
		||||
	if err := r.Update(ctx, &bucket); err != nil {
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
	// Remove our finalizer from the list
 | 
			
		||||
	controllerutil.RemoveFinalizer(obj, sourcev1.SourceFinalizer)
 | 
			
		||||
 | 
			
		||||
	// Stop reconciliation as the object is being deleted
 | 
			
		||||
	return ctrl.Result{}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *BucketReconciler) auth(ctx context.Context, bucket sourcev1.Bucket) (*minio.Client, error) {
 | 
			
		||||
	opt := minio.Options{
 | 
			
		||||
		Region: bucket.Spec.Region,
 | 
			
		||||
		Secure: !bucket.Spec.Insecure,
 | 
			
		||||
// garbageCollect performs a garbage collection for the given v1beta1.Bucket. It removes all but the current
 | 
			
		||||
// artifact except for when the deletion timestamp is set, which will result in the removal of all artifacts for the
 | 
			
		||||
// resource.
 | 
			
		||||
func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Bucket) error {
 | 
			
		||||
	if !obj.DeletionTimestamp.IsZero() {
 | 
			
		||||
		if err := r.Storage.RemoveAll(r.Storage.NewArtifactFor(obj.Kind, obj.GetObjectMeta(), "", "*")); err != nil {
 | 
			
		||||
			r.Eventf(ctx, obj, events.EventSeverityError, "GarbageCollectionFailed",
 | 
			
		||||
				"Garbage collection for deleted resource failed: %s", err)
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	if bucket.Spec.SecretRef != nil {
 | 
			
		||||
		secretName := types.NamespacedName{
 | 
			
		||||
			Namespace: bucket.GetNamespace(),
 | 
			
		||||
			Name:      bucket.Spec.SecretRef.Name,
 | 
			
		||||
		obj.Status.Artifact = nil
 | 
			
		||||
		// TODO(hidde): we should only push this event if we actually garbage collected something
 | 
			
		||||
		r.Eventf(ctx, obj, events.EventSeverityInfo, "GarbageCollectionSucceeded",
 | 
			
		||||
			"Garbage collected artifacts for deleted resource")
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
		var secret corev1.Secret
 | 
			
		||||
		if err := r.Get(ctx, secretName, &secret); err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("credentials secret error: %w", err)
 | 
			
		||||
	if obj.GetArtifact() != nil {
 | 
			
		||||
		if err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
 | 
			
		||||
			r.Eventf(ctx, obj, events.EventSeverityError, "GarbageCollectionFailed", "Garbage collection of old artifacts failed: %s", err)
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		accesskey := ""
 | 
			
		||||
		secretkey := ""
 | 
			
		||||
		if k, ok := secret.Data["accesskey"]; ok {
 | 
			
		||||
			accesskey = string(k)
 | 
			
		||||
		// TODO(hidde): we should only push this event if we actually garbage collected something
 | 
			
		||||
		r.Eventf(ctx, obj, events.EventSeverityInfo, "GarbageCollectionSucceeded", "Garbage collected old artifacts")
 | 
			
		||||
	}
 | 
			
		||||
		if k, ok := secret.Data["secretkey"]; ok {
 | 
			
		||||
			secretkey = string(k)
 | 
			
		||||
		}
 | 
			
		||||
		if accesskey == "" || secretkey == "" {
 | 
			
		||||
			return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
 | 
			
		||||
		}
 | 
			
		||||
		opt.Creds = credentials.NewStaticV4(accesskey, secretkey, "")
 | 
			
		||||
	} else if bucket.Spec.Provider == sourcev1.AmazonBucketProvider {
 | 
			
		||||
		opt.Creds = credentials.NewIAM("")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if opt.Creds == nil {
 | 
			
		||||
		return nil, fmt.Errorf("no bucket credentials found")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return minio.New(bucket.Spec.Endpoint, &opt)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// checksum calculates the SHA1 checksum of the given root directory.
 | 
			
		||||
// It traverses the given root directory and calculates the checksum for any found file, and returns the SHA1 sum of the
 | 
			
		||||
// list with relative file paths and their checksums.
 | 
			
		||||
func (r *BucketReconciler) checksum(root string) (string, error) {
 | 
			
		||||
	sum := sha1.New()
 | 
			
		||||
	if err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
// buildClient constructs a minio.Client with the data from the given object and secret.
 | 
			
		||||
// It returns an error if the given Secret does not have the required fields, or if there is no credential handler
 | 
			
		||||
// configured.
 | 
			
		||||
func (r *BucketReconciler) buildClient(obj *sourcev1.Bucket, secret *corev1.Secret) (*minio.Client, error) {
 | 
			
		||||
	opts := minio.Options{
 | 
			
		||||
		Region: obj.Spec.Region,
 | 
			
		||||
		Secure: !obj.Spec.Insecure,
 | 
			
		||||
	}
 | 
			
		||||
		if !info.Mode().IsRegular() {
 | 
			
		||||
			return nil
 | 
			
		||||
	if secret != nil {
 | 
			
		||||
		var accessKey, secretKey string
 | 
			
		||||
		if k, ok := secret.Data["accesskey"]; ok {
 | 
			
		||||
			accessKey = string(k)
 | 
			
		||||
		}
 | 
			
		||||
		data, err := os.ReadFile(path)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		if k, ok := secret.Data["secretkey"]; ok {
 | 
			
		||||
			secretKey = string(k)
 | 
			
		||||
		}
 | 
			
		||||
		relPath, err := filepath.Rel(root, path)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		if accessKey == "" || secretKey == "" {
 | 
			
		||||
			return nil, fmt.Errorf("invalid '%s' secret data: required fields 'accesskey' and 'secretkey'", secret.Name)
 | 
			
		||||
		}
 | 
			
		||||
		sum.Write([]byte(fmt.Sprintf("%x  %s\n", sha1.Sum(data), relPath)))
 | 
			
		||||
		return nil
 | 
			
		||||
	}); err != nil {
 | 
			
		||||
		opts.Creds = credentials.NewStaticV4(accessKey, secretKey, "")
 | 
			
		||||
	} else if obj.Spec.Provider == sourcev1.AmazonBucketProvider {
 | 
			
		||||
		opts.Creds = credentials.NewIAM("")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return minio.New(obj.Spec.Endpoint, &opts)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// revision calculates the SHA256 checksum of the given string map.
 | 
			
		||||
// The keys are sorted to ensure a stable order, and the SHA256 sum is then calculated for the string representations of
 | 
			
		||||
// the key/value pairs, each pair written on a newline. The sum result is returned as a string.
 | 
			
		||||
func (r *BucketReconciler) revision(list map[string]string) (string, error) {
 | 
			
		||||
	keyIndex := make([]string, 0, len(list))
 | 
			
		||||
	for k := range list {
 | 
			
		||||
		keyIndex = append(keyIndex, k)
 | 
			
		||||
	}
 | 
			
		||||
	sort.Strings(keyIndex)
 | 
			
		||||
	sum := sha256.New()
 | 
			
		||||
	for _, k := range keyIndex {
 | 
			
		||||
		if _, err := sum.Write([]byte(fmt.Sprintf("%s  %s\n", k, list[k]))); err != nil {
 | 
			
		||||
			return "", err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return fmt.Sprintf("%x", sum.Sum(nil)), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// resetStatus returns a modified v1beta1.Bucket and a boolean indicating
 | 
			
		||||
// if the status field has been reset.
 | 
			
		||||
func (r *BucketReconciler) resetStatus(bucket sourcev1.Bucket) (sourcev1.Bucket, bool) {
 | 
			
		||||
	// We do not have an artifact, or it does no longer exist
 | 
			
		||||
	if bucket.GetArtifact() == nil || !r.Storage.ArtifactExist(*bucket.GetArtifact()) {
 | 
			
		||||
		bucket = sourcev1.BucketProgressing(bucket)
 | 
			
		||||
		bucket.Status.Artifact = nil
 | 
			
		||||
		return bucket, true
 | 
			
		||||
	}
 | 
			
		||||
	if bucket.Generation != bucket.Status.ObservedGeneration {
 | 
			
		||||
		return sourcev1.BucketProgressing(bucket), true
 | 
			
		||||
	}
 | 
			
		||||
	return bucket, false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// gc performs a garbage collection for the given v1beta1.Bucket.
 | 
			
		||||
// It removes all but the current artifact except for when the
 | 
			
		||||
// deletion timestamp is set, which will result in the removal of
 | 
			
		||||
// all artifacts for the resource.
 | 
			
		||||
func (r *BucketReconciler) gc(bucket sourcev1.Bucket) error {
 | 
			
		||||
	if !bucket.DeletionTimestamp.IsZero() {
 | 
			
		||||
		return r.Storage.RemoveAll(r.Storage.NewArtifactFor(bucket.Kind, bucket.GetObjectMeta(), "", "*"))
 | 
			
		||||
	}
 | 
			
		||||
	if bucket.GetArtifact() != nil {
 | 
			
		||||
		return r.Storage.RemoveAllButCurrent(*bucket.GetArtifact())
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// event emits a Kubernetes event and forwards the event to notification controller if configured
 | 
			
		||||
func (r *BucketReconciler) event(ctx context.Context, bucket sourcev1.Bucket, severity, msg string) {
 | 
			
		||||
	log := logr.FromContext(ctx)
 | 
			
		||||
	if r.EventRecorder != nil {
 | 
			
		||||
		r.EventRecorder.Eventf(&bucket, "Normal", severity, msg)
 | 
			
		||||
	}
 | 
			
		||||
	if r.ExternalEventRecorder != nil {
 | 
			
		||||
		objRef, err := reference.GetReference(r.Scheme, &bucket)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Error(err, "unable to send event")
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if err := r.ExternalEventRecorder.Eventf(*objRef, nil, severity, severity, msg); err != nil {
 | 
			
		||||
			log.Error(err, "unable to send event")
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *BucketReconciler) recordReadiness(ctx context.Context, bucket sourcev1.Bucket) {
 | 
			
		||||
	log := logr.FromContext(ctx)
 | 
			
		||||
	if r.MetricsRecorder == nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	objRef, err := reference.GetReference(r.Scheme, &bucket)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Error(err, "unable to record readiness metric")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	if rc := apimeta.FindStatusCondition(bucket.Status.Conditions, meta.ReadyCondition); rc != nil {
 | 
			
		||||
		r.MetricsRecorder.RecordCondition(*objRef, *rc, !bucket.DeletionTimestamp.IsZero())
 | 
			
		||||
	} else {
 | 
			
		||||
		r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{
 | 
			
		||||
			Type:   meta.ReadyCondition,
 | 
			
		||||
			Status: metav1.ConditionUnknown,
 | 
			
		||||
		}, !bucket.DeletionTimestamp.IsZero())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *BucketReconciler) recordSuspension(ctx context.Context, bucket sourcev1.Bucket) {
 | 
			
		||||
	if r.MetricsRecorder == nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	log := logr.FromContext(ctx)
 | 
			
		||||
 | 
			
		||||
	objRef, err := reference.GetReference(r.Scheme, &bucket)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Error(err, "unable to record suspended metric")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if !bucket.DeletionTimestamp.IsZero() {
 | 
			
		||||
		r.MetricsRecorder.RecordSuspend(*objRef, false)
 | 
			
		||||
	} else {
 | 
			
		||||
		r.MetricsRecorder.RecordSuspend(*objRef, bucket.Spec.Suspend)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *BucketReconciler) updateStatus(ctx context.Context, req ctrl.Request, newStatus sourcev1.BucketStatus) error {
 | 
			
		||||
	var bucket sourcev1.Bucket
 | 
			
		||||
	if err := r.Get(ctx, req.NamespacedName, &bucket); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	patch := client.MergeFrom(bucket.DeepCopy())
 | 
			
		||||
	bucket.Status = newStatus
 | 
			
		||||
 | 
			
		||||
	return r.Status().Patch(ctx, &bucket, patch)
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -17,59 +17,632 @@ limitations under the License.
 | 
			
		|||
package controllers
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"crypto/md5"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"net/http/httptest"
 | 
			
		||||
	"net/url"
 | 
			
		||||
	"os"
 | 
			
		||||
	"path"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/go-logr/logr"
 | 
			
		||||
	. "github.com/onsi/gomega"
 | 
			
		||||
	corev1 "k8s.io/api/core/v1"
 | 
			
		||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	ctrl "sigs.k8s.io/controller-runtime"
 | 
			
		||||
	"sigs.k8s.io/controller-runtime/pkg/client"
 | 
			
		||||
	fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
 | 
			
		||||
	"sigs.k8s.io/controller-runtime/pkg/log"
 | 
			
		||||
 | 
			
		||||
	"github.com/fluxcd/pkg/apis/meta"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/conditions"
 | 
			
		||||
 | 
			
		||||
	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestBucketReconciler_checksum(t *testing.T) {
 | 
			
		||||
func TestBucketReconciler_Reconcile(t *testing.T) {
 | 
			
		||||
	g := NewWithT(t)
 | 
			
		||||
 | 
			
		||||
	s3Server := newS3Server("test-bucket")
 | 
			
		||||
	s3Server.Objects = []*s3MockObject{
 | 
			
		||||
		{
 | 
			
		||||
			Key:          "test.yaml",
 | 
			
		||||
			Content:      []byte("test"),
 | 
			
		||||
			ContentType:  "text/plain",
 | 
			
		||||
			LastModified: time.Now(),
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	s3Server.Start()
 | 
			
		||||
	defer s3Server.Stop()
 | 
			
		||||
 | 
			
		||||
	g.Expect(s3Server.HTTPAddress()).ToNot(BeEmpty())
 | 
			
		||||
	u, err := url.Parse(s3Server.HTTPAddress())
 | 
			
		||||
	g.Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
 | 
			
		||||
	secret := &corev1.Secret{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			GenerateName: "bucket-reconcile-",
 | 
			
		||||
			Namespace:    "default",
 | 
			
		||||
		},
 | 
			
		||||
		Data: map[string][]byte{
 | 
			
		||||
			"accesskey": []byte("key"),
 | 
			
		||||
			"secretkey": []byte("secret"),
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	g.Expect(testEnv.Create(ctx, secret)).To(Succeed())
 | 
			
		||||
	defer testEnv.Delete(ctx, secret)
 | 
			
		||||
 | 
			
		||||
	obj := &sourcev1.Bucket{
 | 
			
		||||
		ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
			GenerateName: "bucket-reconcile-",
 | 
			
		||||
			Namespace:    "default",
 | 
			
		||||
		},
 | 
			
		||||
		Spec: sourcev1.BucketSpec{
 | 
			
		||||
			Provider:   "generic",
 | 
			
		||||
			BucketName: s3Server.BucketName,
 | 
			
		||||
			Endpoint:   u.Host,
 | 
			
		||||
			Insecure:   true,
 | 
			
		||||
			Interval:   metav1.Duration{Duration: interval},
 | 
			
		||||
			Timeout:    &metav1.Duration{Duration: timeout},
 | 
			
		||||
			SecretRef: &meta.LocalObjectReference{
 | 
			
		||||
				Name: secret.Name,
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	g.Expect(testEnv.Create(ctx, obj)).To(Succeed())
 | 
			
		||||
 | 
			
		||||
	key := client.ObjectKey{Name: obj.Name, Namespace: obj.Namespace}
 | 
			
		||||
 | 
			
		||||
	// Wait for finalizer to be set
 | 
			
		||||
	g.Eventually(func() bool {
 | 
			
		||||
		if err := testEnv.Get(ctx, key, obj); err != nil {
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
		return len(obj.Finalizers) > 0
 | 
			
		||||
	}, timeout).Should(BeTrue())
 | 
			
		||||
 | 
			
		||||
	// Wait for Bucket to be Ready
 | 
			
		||||
	g.Eventually(func() bool {
 | 
			
		||||
		if err := testEnv.Get(ctx, key, obj); err != nil {
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
		if !conditions.IsReady(obj) || obj.Status.Artifact == nil {
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
		readyCondition := conditions.Get(obj, meta.ReadyCondition)
 | 
			
		||||
		return obj.Generation == readyCondition.ObservedGeneration &&
 | 
			
		||||
			obj.Generation == obj.Status.ObservedGeneration
 | 
			
		||||
	}, timeout).Should(BeTrue())
 | 
			
		||||
 | 
			
		||||
	g.Expect(testEnv.Delete(ctx, obj)).To(Succeed())
 | 
			
		||||
 | 
			
		||||
	// Wait for Bucket to be deleted
 | 
			
		||||
	g.Eventually(func() bool {
 | 
			
		||||
		if err := testEnv.Get(ctx, key, obj); err != nil {
 | 
			
		||||
			return apierrors.IsNotFound(err)
 | 
			
		||||
		}
 | 
			
		||||
		return false
 | 
			
		||||
	}, timeout).Should(BeTrue())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestBucketReconciler_reconcileStorage(t *testing.T) {
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name             string
 | 
			
		||||
		beforeFunc func(root string)
 | 
			
		||||
		want       string
 | 
			
		||||
		beforeFunc       func(obj *sourcev1.Bucket, storage *Storage) error
 | 
			
		||||
		want             ctrl.Result
 | 
			
		||||
		wantErr          bool
 | 
			
		||||
		assertArtifact   *sourcev1.Artifact
 | 
			
		||||
		assertConditions []metav1.Condition
 | 
			
		||||
		assertPaths      []string
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name: "empty root",
 | 
			
		||||
			want: "da39a3ee5e6b4b0d3255bfef95601890afd80709",
 | 
			
		||||
			name: "garbage collects",
 | 
			
		||||
			beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
 | 
			
		||||
				revisions := []string{"a", "b", "c"}
 | 
			
		||||
				for n := range revisions {
 | 
			
		||||
					v := revisions[n]
 | 
			
		||||
					obj.Status.Artifact = &sourcev1.Artifact{
 | 
			
		||||
						Path:     fmt.Sprintf("/reconcile-storage/%s.txt", v),
 | 
			
		||||
						Revision: v,
 | 
			
		||||
					}
 | 
			
		||||
					if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
 | 
			
		||||
						return err
 | 
			
		||||
					}
 | 
			
		||||
					if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
 | 
			
		||||
						return err
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				testStorage.SetArtifactURL(obj.Status.Artifact)
 | 
			
		||||
				return nil
 | 
			
		||||
			},
 | 
			
		||||
			assertArtifact: &sourcev1.Artifact{
 | 
			
		||||
				Path:     "/reconcile-storage/c.txt",
 | 
			
		||||
				Revision: "c",
 | 
			
		||||
				Checksum: "2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6",
 | 
			
		||||
				URL:      testStorage.Hostname + "/reconcile-storage/c.txt",
 | 
			
		||||
			},
 | 
			
		||||
			assertPaths: []string{
 | 
			
		||||
				"/reconcile-storage/c.txt",
 | 
			
		||||
				"!/reconcile-storage/b.txt",
 | 
			
		||||
				"!/reconcile-storage/a.txt",
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "with file",
 | 
			
		||||
			beforeFunc: func(root string) {
 | 
			
		||||
				mockFile(root, "a/b/c.txt", "a dummy string")
 | 
			
		||||
			name: "notices missing artifact in storage",
 | 
			
		||||
			beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
 | 
			
		||||
				obj.Status.Artifact = &sourcev1.Artifact{
 | 
			
		||||
					Path:     fmt.Sprintf("/reconcile-storage/invalid.txt"),
 | 
			
		||||
					Revision: "d",
 | 
			
		||||
				}
 | 
			
		||||
				testStorage.SetArtifactURL(obj.Status.Artifact)
 | 
			
		||||
				return nil
 | 
			
		||||
			},
 | 
			
		||||
			want: ctrl.Result{Requeue: true},
 | 
			
		||||
			assertPaths: []string{
 | 
			
		||||
				"!/reconcile-storage/invalid.txt",
 | 
			
		||||
			},
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage"),
 | 
			
		||||
			},
 | 
			
		||||
			want: "309a5e6e96b4a7eea0d1cfaabf1be8ec1c063fa0",
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "with file in different path",
 | 
			
		||||
			beforeFunc: func(root string) {
 | 
			
		||||
				mockFile(root, "a/b.txt", "a dummy string")
 | 
			
		||||
			name: "updates hostname on diff from current",
 | 
			
		||||
			beforeFunc: func(obj *sourcev1.Bucket, storage *Storage) error {
 | 
			
		||||
				obj.Status.Artifact = &sourcev1.Artifact{
 | 
			
		||||
					Path:     fmt.Sprintf("/reconcile-storage/hostname.txt"),
 | 
			
		||||
					Revision: "f",
 | 
			
		||||
					Checksum: "3b9c358f36f0a31b6ad3e14f309c7cf198ac9246e8316f9ce543d5b19ac02b80",
 | 
			
		||||
					URL:      "http://outdated.com/reconcile-storage/hostname.txt",
 | 
			
		||||
				}
 | 
			
		||||
				if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
 | 
			
		||||
					return err
 | 
			
		||||
				}
 | 
			
		||||
				if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil {
 | 
			
		||||
					return err
 | 
			
		||||
				}
 | 
			
		||||
				return nil
 | 
			
		||||
			},
 | 
			
		||||
			assertPaths: []string{
 | 
			
		||||
				"/reconcile-storage/hostname.txt",
 | 
			
		||||
			},
 | 
			
		||||
			assertArtifact: &sourcev1.Artifact{
 | 
			
		||||
				Path:     "/reconcile-storage/hostname.txt",
 | 
			
		||||
				Revision: "f",
 | 
			
		||||
				Checksum: "3b9c358f36f0a31b6ad3e14f309c7cf198ac9246e8316f9ce543d5b19ac02b80",
 | 
			
		||||
				URL:      testStorage.Hostname + "/reconcile-storage/hostname.txt",
 | 
			
		||||
			},
 | 
			
		||||
			want: "e28c62b5cc488849950c4355dddc5523712616d4",
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	for _, tt := range tests {
 | 
			
		||||
		t.Run(tt.name, func(t *testing.T) {
 | 
			
		||||
			root, err := os.MkdirTemp("", "bucket-checksum-")
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatal(err)
 | 
			
		||||
			g := NewWithT(t)
 | 
			
		||||
 | 
			
		||||
			r := &BucketReconciler{
 | 
			
		||||
				Storage: testStorage,
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			obj := &sourcev1.Bucket{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					GenerateName: "test-",
 | 
			
		||||
				},
 | 
			
		||||
			}
 | 
			
		||||
			defer os.RemoveAll(root)
 | 
			
		||||
			if tt.beforeFunc != nil {
 | 
			
		||||
				tt.beforeFunc(root)
 | 
			
		||||
				g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed())
 | 
			
		||||
			}
 | 
			
		||||
			got, err := (&BucketReconciler{}).checksum(root)
 | 
			
		||||
			if (err != nil) != tt.wantErr {
 | 
			
		||||
				t.Errorf("checksum() error = %v, wantErr %v", err, tt.wantErr)
 | 
			
		||||
				return
 | 
			
		||||
 | 
			
		||||
			got, err := r.reconcileStorage(context.TODO(), obj)
 | 
			
		||||
			g.Expect(err != nil).To(Equal(tt.wantErr))
 | 
			
		||||
			g.Expect(got).To(Equal(tt.want))
 | 
			
		||||
 | 
			
		||||
			g.Expect(obj.Status.Artifact).To(MatchArtifact(tt.assertArtifact))
 | 
			
		||||
			if tt.assertArtifact != nil && tt.assertArtifact.URL != "" {
 | 
			
		||||
				g.Expect(obj.Status.Artifact.URL).To(Equal(tt.assertArtifact.URL))
 | 
			
		||||
			}
 | 
			
		||||
			if got != tt.want {
 | 
			
		||||
				t.Errorf("checksum() got = %v, want %v", got, tt.want)
 | 
			
		||||
			g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
 | 
			
		||||
 | 
			
		||||
			for _, p := range tt.assertPaths {
 | 
			
		||||
				absoluteP := filepath.Join(testStorage.BasePath, p)
 | 
			
		||||
				if !strings.HasPrefix(p, "!") {
 | 
			
		||||
					g.Expect(absoluteP).To(BeAnExistingFile())
 | 
			
		||||
					continue
 | 
			
		||||
				}
 | 
			
		||||
				g.Expect(absoluteP).NotTo(BeAnExistingFile())
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestBucketReconciler_reconcileSource(t *testing.T) {
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name             string
 | 
			
		||||
		bucketName       string
 | 
			
		||||
		bucketObjects    []*s3MockObject
 | 
			
		||||
		middleware       http.Handler
 | 
			
		||||
		secret           *corev1.Secret
 | 
			
		||||
		beforeFunc       func(obj *sourcev1.Bucket)
 | 
			
		||||
		want             ctrl.Result
 | 
			
		||||
		wantErr          bool
 | 
			
		||||
		assertArtifact   sourcev1.Artifact
 | 
			
		||||
		assertConditions []metav1.Condition
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name:       "reconciles source",
 | 
			
		||||
			bucketName: "dummy",
 | 
			
		||||
			bucketObjects: []*s3MockObject{
 | 
			
		||||
				{
 | 
			
		||||
					Key:          "test.txt",
 | 
			
		||||
					Content:      []byte("test"),
 | 
			
		||||
					ContentType:  "text/plain",
 | 
			
		||||
					LastModified: time.Now(),
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			assertArtifact: sourcev1.Artifact{
 | 
			
		||||
				Path:     "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz",
 | 
			
		||||
				Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8",
 | 
			
		||||
			},
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		// TODO(hidde): middleware for mock server
 | 
			
		||||
		//{
 | 
			
		||||
		//	name: "authenticates using secretRef",
 | 
			
		||||
		//	bucketName: "dummy",
 | 
			
		||||
		//},
 | 
			
		||||
		{
 | 
			
		||||
			name:       "observes non-existing secretRef",
 | 
			
		||||
			bucketName: "dummy",
 | 
			
		||||
			beforeFunc: func(obj *sourcev1.Bucket) {
 | 
			
		||||
				obj.Spec.SecretRef = &meta.LocalObjectReference{
 | 
			
		||||
					Name: "dummy",
 | 
			
		||||
				}
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: true,
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "Failed to get secret '/dummy': secrets \"dummy\" not found"),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:       "observes invalid secretRef",
 | 
			
		||||
			bucketName: "dummy",
 | 
			
		||||
			secret: &corev1.Secret{
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "dummy",
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			beforeFunc: func(obj *sourcev1.Bucket) {
 | 
			
		||||
				obj.Spec.SecretRef = &meta.LocalObjectReference{
 | 
			
		||||
					Name: "dummy",
 | 
			
		||||
				}
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: true,
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to construct S3 client: invalid 'dummy' secret data: required fields"),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name:       "observes non-existing bucket name",
 | 
			
		||||
			bucketName: "dummy",
 | 
			
		||||
			beforeFunc: func(obj *sourcev1.Bucket) {
 | 
			
		||||
				obj.Spec.BucketName = "invalid"
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: true,
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "Bucket 'invalid' does not exist"),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "transient bucket name API failure",
 | 
			
		||||
			beforeFunc: func(obj *sourcev1.Bucket) {
 | 
			
		||||
				obj.Spec.Endpoint = "transient.example.com"
 | 
			
		||||
				obj.Spec.BucketName = "unavailable"
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: true,
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to verify existence of bucket 'unavailable'"),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			// TODO(hidde): test the lesser happy paths
 | 
			
		||||
			name:       ".sourceignore",
 | 
			
		||||
			bucketName: "dummy",
 | 
			
		||||
			bucketObjects: []*s3MockObject{
 | 
			
		||||
				{
 | 
			
		||||
					Key:          ".sourceignore",
 | 
			
		||||
					Content:      []byte("ignored/file.txt"),
 | 
			
		||||
					ContentType:  "text/plain",
 | 
			
		||||
					LastModified: time.Now(),
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					Key:          "ignored/file.txt",
 | 
			
		||||
					Content:      []byte("ignored/file.txt"),
 | 
			
		||||
					ContentType:  "text/plain",
 | 
			
		||||
					LastModified: time.Now(),
 | 
			
		||||
				},
 | 
			
		||||
				{
 | 
			
		||||
					Key:          "included/file.txt",
 | 
			
		||||
					Content:      []byte("included/file.txt"),
 | 
			
		||||
					ContentType:  "text/plain",
 | 
			
		||||
					LastModified: time.Now(),
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			assertArtifact: sourcev1.Artifact{
 | 
			
		||||
				Path:     "bucket/test-bucket/94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100.tar.gz",
 | 
			
		||||
				Revision: "94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100",
 | 
			
		||||
			},
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision '94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100'"),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	for _, tt := range tests {
 | 
			
		||||
		t.Run(tt.name, func(t *testing.T) {
 | 
			
		||||
			g := NewWithT(t)
 | 
			
		||||
 | 
			
		||||
			builder := fakeclient.NewClientBuilder().WithScheme(testEnv.Scheme())
 | 
			
		||||
			if tt.secret != nil {
 | 
			
		||||
				builder.WithObjects(tt.secret)
 | 
			
		||||
			}
 | 
			
		||||
			r := &BucketReconciler{
 | 
			
		||||
				Client:  builder.Build(),
 | 
			
		||||
				Storage: testStorage,
 | 
			
		||||
			}
 | 
			
		||||
			tmpDir, err := os.MkdirTemp("", "reconcile-bucket-source-")
 | 
			
		||||
			g.Expect(err).ToNot(HaveOccurred())
 | 
			
		||||
			defer os.RemoveAll(tmpDir)
 | 
			
		||||
 | 
			
		||||
			obj := &sourcev1.Bucket{
 | 
			
		||||
				TypeMeta: metav1.TypeMeta{
 | 
			
		||||
					Kind: sourcev1.BucketKind,
 | 
			
		||||
				},
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					Name: "test-bucket",
 | 
			
		||||
				},
 | 
			
		||||
				Spec: sourcev1.BucketSpec{
 | 
			
		||||
					Timeout: &metav1.Duration{Duration: timeout},
 | 
			
		||||
				},
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			var server *s3MockServer
 | 
			
		||||
			if tt.bucketName != "" {
 | 
			
		||||
				server = newS3Server(tt.bucketName)
 | 
			
		||||
				server.Objects = tt.bucketObjects
 | 
			
		||||
				server.Start()
 | 
			
		||||
				defer server.Stop()
 | 
			
		||||
 | 
			
		||||
				g.Expect(server.HTTPAddress()).ToNot(BeEmpty())
 | 
			
		||||
				u, err := url.Parse(server.HTTPAddress())
 | 
			
		||||
				g.Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
 | 
			
		||||
				obj.Spec.BucketName = tt.bucketName
 | 
			
		||||
				obj.Spec.Endpoint = u.Host
 | 
			
		||||
				// TODO(hidde): also test TLS
 | 
			
		||||
				obj.Spec.Insecure = true
 | 
			
		||||
			}
 | 
			
		||||
			if tt.beforeFunc != nil {
 | 
			
		||||
				tt.beforeFunc(obj)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			artifact := &sourcev1.Artifact{}
 | 
			
		||||
			got, err := r.reconcileSource(context.TODO(), obj, artifact, tmpDir)
 | 
			
		||||
			g.Expect(err != nil).To(Equal(tt.wantErr))
 | 
			
		||||
			g.Expect(got).To(Equal(tt.want))
 | 
			
		||||
 | 
			
		||||
			g.Expect(artifact).To(MatchArtifact(tt.assertArtifact.DeepCopy()))
 | 
			
		||||
			g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestBucketReconciler_reconcileArtifact(t *testing.T) {
 | 
			
		||||
	// testChecksum is the checksum value of the artifacts created in this
 | 
			
		||||
	// test.
 | 
			
		||||
	const testChecksum = "4f4fb700ef54461cfa02571ae0db9a0dc1e0cdb5577484a6d75e68dc38e8acc1"
 | 
			
		||||
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name             string
 | 
			
		||||
		beforeFunc       func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string)
 | 
			
		||||
		afterFunc        func(t *WithT, obj *sourcev1.Bucket, dir string)
 | 
			
		||||
		want             ctrl.Result
 | 
			
		||||
		wantErr          bool
 | 
			
		||||
		assertConditions []metav1.Condition
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name: "Archiving artifact to storage makes Ready=True",
 | 
			
		||||
			beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
 | 
			
		||||
				obj.Spec.Interval = metav1.Duration{Duration: interval}
 | 
			
		||||
			},
 | 
			
		||||
			want: ctrl.Result{RequeueAfter: interval},
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "Up-to-date artifact should not update status",
 | 
			
		||||
			beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
 | 
			
		||||
				obj.Spec.Interval = metav1.Duration{Duration: interval}
 | 
			
		||||
				obj.Status.Artifact = artifact.DeepCopy()
 | 
			
		||||
			},
 | 
			
		||||
			afterFunc: func(t *WithT, obj *sourcev1.Bucket, dir string) {
 | 
			
		||||
				t.Expect(obj.Status.URL).To(BeEmpty())
 | 
			
		||||
			},
 | 
			
		||||
			want: ctrl.Result{RequeueAfter: interval},
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "Removes ArtifactUnavailableCondition after creating artifact",
 | 
			
		||||
			beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
 | 
			
		||||
				obj.Spec.Interval = metav1.Duration{Duration: interval}
 | 
			
		||||
				conditions.MarkTrue(obj, sourcev1.ArtifactUnavailableCondition, "Foo", "")
 | 
			
		||||
			},
 | 
			
		||||
			want: ctrl.Result{RequeueAfter: interval},
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "Removes ArtifactOutdatedCondition after creating a new artifact",
 | 
			
		||||
			beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
 | 
			
		||||
				obj.Spec.Interval = metav1.Duration{Duration: interval}
 | 
			
		||||
				conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "Foo", "")
 | 
			
		||||
			},
 | 
			
		||||
			want: ctrl.Result{RequeueAfter: interval},
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "Creates latest symlink to the created artifact",
 | 
			
		||||
			beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
 | 
			
		||||
				obj.Spec.Interval = metav1.Duration{Duration: interval}
 | 
			
		||||
			},
 | 
			
		||||
			afterFunc: func(t *WithT, obj *sourcev1.Bucket, dir string) {
 | 
			
		||||
				localPath := testStorage.LocalPath(*obj.GetArtifact())
 | 
			
		||||
				symlinkPath := filepath.Join(filepath.Dir(localPath), "latest.tar.gz")
 | 
			
		||||
				targetFile, err := os.Readlink(symlinkPath)
 | 
			
		||||
				t.Expect(err).NotTo(HaveOccurred())
 | 
			
		||||
				t.Expect(localPath).To(Equal(targetFile))
 | 
			
		||||
			},
 | 
			
		||||
			want: ctrl.Result{RequeueAfter: interval},
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "Dir path deleted",
 | 
			
		||||
			beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
 | 
			
		||||
				t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred())
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: true,
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "Dir path is not a directory",
 | 
			
		||||
			beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
 | 
			
		||||
				// Remove the given directory and create a file for the same
 | 
			
		||||
				// path.
 | 
			
		||||
				t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred())
 | 
			
		||||
				f, err := os.Create(dir)
 | 
			
		||||
				defer f.Close()
 | 
			
		||||
				t.Expect(err).ToNot(HaveOccurred())
 | 
			
		||||
			},
 | 
			
		||||
			afterFunc: func(t *WithT, obj *sourcev1.Bucket, dir string) {
 | 
			
		||||
				t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred())
 | 
			
		||||
			},
 | 
			
		||||
			wantErr: true,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, tt := range tests {
 | 
			
		||||
		t.Run(tt.name, func(t *testing.T) {
 | 
			
		||||
			g := NewWithT(t)
 | 
			
		||||
 | 
			
		||||
			r := &BucketReconciler{
 | 
			
		||||
				Storage: testStorage,
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			tmpDir, err := os.MkdirTemp("", "reconcile-bucket-artifact-")
 | 
			
		||||
			g.Expect(err).ToNot(HaveOccurred())
 | 
			
		||||
			defer os.RemoveAll(tmpDir)
 | 
			
		||||
 | 
			
		||||
			obj := &sourcev1.Bucket{
 | 
			
		||||
				TypeMeta: metav1.TypeMeta{
 | 
			
		||||
					Kind: sourcev1.BucketKind,
 | 
			
		||||
				},
 | 
			
		||||
				ObjectMeta: metav1.ObjectMeta{
 | 
			
		||||
					GenerateName: "test-bucket-",
 | 
			
		||||
					Generation:   1,
 | 
			
		||||
					Namespace:    "default",
 | 
			
		||||
				},
 | 
			
		||||
				Spec: sourcev1.BucketSpec{
 | 
			
		||||
					Timeout: &metav1.Duration{Duration: timeout},
 | 
			
		||||
				},
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			artifact := testStorage.NewArtifactFor(obj.Kind, obj, "existing", "foo.tar.gz")
 | 
			
		||||
			artifact.Checksum = testChecksum
 | 
			
		||||
 | 
			
		||||
			if tt.beforeFunc != nil {
 | 
			
		||||
				tt.beforeFunc(g, obj, artifact, tmpDir)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			got, err := r.reconcileArtifact(logr.NewContext(ctx, log.NullLogger{}), obj, artifact, tmpDir)
 | 
			
		||||
			g.Expect(err != nil).To(Equal(tt.wantErr))
 | 
			
		||||
			g.Expect(got).To(Equal(tt.want))
 | 
			
		||||
 | 
			
		||||
			// On error, artifact is empty. Check artifacts only on successful
 | 
			
		||||
			// reconcile.
 | 
			
		||||
			if !tt.wantErr {
 | 
			
		||||
				g.Expect(obj.Status.Artifact).To(MatchArtifact(artifact.DeepCopy()))
 | 
			
		||||
			}
 | 
			
		||||
			g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
 | 
			
		||||
 | 
			
		||||
			if tt.afterFunc != nil {
 | 
			
		||||
				tt.afterFunc(g, obj, tmpDir)
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestBucketReconciler_revision(t *testing.T) {
 | 
			
		||||
	tests := []struct {
 | 
			
		||||
		name    string
 | 
			
		||||
		list    map[string]string
 | 
			
		||||
		want    string
 | 
			
		||||
		wantErr bool
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name: "list with items",
 | 
			
		||||
			list: map[string]string{
 | 
			
		||||
				"one":   "one",
 | 
			
		||||
				"two":   "two",
 | 
			
		||||
				"three": "three",
 | 
			
		||||
			},
 | 
			
		||||
			want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc",
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "list with items in different order",
 | 
			
		||||
			list: map[string]string{
 | 
			
		||||
				"three": "three",
 | 
			
		||||
				"one":   "one",
 | 
			
		||||
				"two":   "two",
 | 
			
		||||
			},
 | 
			
		||||
			want: "8afaa9c32d7c187e8acaeffe899226011001f67c095519cdd8b4c03487c5b8bc",
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "empty list",
 | 
			
		||||
			list: map[string]string{},
 | 
			
		||||
			want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
			name: "nil list",
 | 
			
		||||
			list: nil,
 | 
			
		||||
			want: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	for _, tt := range tests {
 | 
			
		||||
		t.Run(tt.name, func(t *testing.T) {
 | 
			
		||||
			got, err := (&BucketReconciler{}).revision(tt.list)
 | 
			
		||||
			if (err != nil) != tt.wantErr {
 | 
			
		||||
				t.Errorf("revision() error = %v, wantErr %v", err, tt.wantErr)
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			if got != tt.want {
 | 
			
		||||
				t.Errorf("revision() got = %v, want %v", got, tt.want)
 | 
			
		||||
			}
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// helpers
 | 
			
		||||
 | 
			
		||||
func mockFile(root, path, content string) error {
 | 
			
		||||
	filePath := filepath.Join(root, path)
 | 
			
		||||
	if err := os.MkdirAll(filepath.Dir(filePath), os.ModePerm); err != nil {
 | 
			
		||||
| 
						 | 
				
			
			@ -80,3 +653,120 @@ func mockFile(root, path, content string) error {
 | 
			
		|||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type s3MockObject struct {
 | 
			
		||||
	Key          string
 | 
			
		||||
	LastModified time.Time
 | 
			
		||||
	ContentType  string
 | 
			
		||||
	Content      []byte
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type s3MockServer struct {
 | 
			
		||||
	srv *httptest.Server
 | 
			
		||||
	mux *http.ServeMux
 | 
			
		||||
 | 
			
		||||
	BucketName string
 | 
			
		||||
	Objects    []*s3MockObject
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newS3Server(bucketName string) *s3MockServer {
 | 
			
		||||
	s := &s3MockServer{BucketName: bucketName}
 | 
			
		||||
	s.mux = http.NewServeMux()
 | 
			
		||||
	s.mux.Handle(fmt.Sprintf("/%s/", s.BucketName), http.HandlerFunc(s.handler))
 | 
			
		||||
 | 
			
		||||
	s.srv = httptest.NewUnstartedServer(s.mux)
 | 
			
		||||
 | 
			
		||||
	return s
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *s3MockServer) Start() {
 | 
			
		||||
	s.srv.Start()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *s3MockServer) Stop() {
 | 
			
		||||
	s.srv.Close()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *s3MockServer) HTTPAddress() string {
 | 
			
		||||
	return s.srv.URL
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *s3MockServer) handler(w http.ResponseWriter, r *http.Request) {
 | 
			
		||||
	key := path.Base(r.URL.Path)
 | 
			
		||||
 | 
			
		||||
	switch key {
 | 
			
		||||
	case s.BucketName:
 | 
			
		||||
		w.Header().Add("Content-Type", "application/xml")
 | 
			
		||||
 | 
			
		||||
		if r.Method == http.MethodHead {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		q := r.URL.Query()
 | 
			
		||||
 | 
			
		||||
		if q["location"] != nil {
 | 
			
		||||
			fmt.Fprint(w, `
 | 
			
		||||
<?xml version="1.0" encoding="UTF-8"?>
 | 
			
		||||
<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">Europe</LocationConstraint>
 | 
			
		||||
			`)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		contents := ""
 | 
			
		||||
		for _, o := range s.Objects {
 | 
			
		||||
			etag := md5.Sum(o.Content)
 | 
			
		||||
			contents += fmt.Sprintf(`
 | 
			
		||||
		<Contents>
 | 
			
		||||
			<Key>%s</Key>
 | 
			
		||||
			<LastModified>%s</LastModified>
 | 
			
		||||
			<Size>%d</Size>
 | 
			
		||||
			<ETag>"%b"</ETag>
 | 
			
		||||
			<StorageClass>STANDARD</StorageClass>
 | 
			
		||||
		</Contents>`, o.Key, o.LastModified.UTC().Format(time.RFC3339), len(o.Content), etag)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		fmt.Fprintf(w, `
 | 
			
		||||
<?xml version="1.0" encoding="UTF-8"?>
 | 
			
		||||
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
 | 
			
		||||
	<Name>%s</Name>
 | 
			
		||||
	<Prefix/>
 | 
			
		||||
	<Marker/>
 | 
			
		||||
	<KeyCount>%d</KeyCount>
 | 
			
		||||
	<MaxKeys>1000</MaxKeys>
 | 
			
		||||
	<IsTruncated>false</IsTruncated>
 | 
			
		||||
	%s
 | 
			
		||||
</ListBucketResult>
 | 
			
		||||
		`, s.BucketName, len(s.Objects), contents)
 | 
			
		||||
	default:
 | 
			
		||||
		key, err := filepath.Rel("/"+s.BucketName, r.URL.Path)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			w.WriteHeader(500)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		var found *s3MockObject
 | 
			
		||||
		for _, o := range s.Objects {
 | 
			
		||||
			if key == o.Key {
 | 
			
		||||
				found = o
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if found == nil {
 | 
			
		||||
			w.WriteHeader(404)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		etag := md5.Sum(found.Content)
 | 
			
		||||
		lastModified := strings.Replace(found.LastModified.UTC().Format(time.RFC1123), "UTC", "GMT", 1)
 | 
			
		||||
 | 
			
		||||
		w.Header().Add("Content-Type", found.ContentType)
 | 
			
		||||
		w.Header().Add("Last-Modified", lastModified)
 | 
			
		||||
		w.Header().Add("ETag", fmt.Sprintf("\"%b\"", etag))
 | 
			
		||||
		w.Header().Add("Content-Length", fmt.Sprintf("%d", len(found.Content)))
 | 
			
		||||
 | 
			
		||||
		if r.Method == http.MethodHead {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		w.Write(found.Content)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -122,13 +122,13 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
 | 
			
		|||
			conditions.WithConditions(
 | 
			
		||||
				sourcev1.IncludeUnavailableCondition,
 | 
			
		||||
				sourcev1.SourceVerifiedCondition,
 | 
			
		||||
				sourcev1.CheckoutFailedCondition,
 | 
			
		||||
				sourcev1.FetchFailedCondition,
 | 
			
		||||
				sourcev1.ArtifactOutdatedCondition,
 | 
			
		||||
				sourcev1.ArtifactUnavailableCondition,
 | 
			
		||||
			),
 | 
			
		||||
			conditions.WithNegativePolarityConditions(
 | 
			
		||||
				sourcev1.ArtifactUnavailableCondition,
 | 
			
		||||
				sourcev1.CheckoutFailedCondition,
 | 
			
		||||
				sourcev1.FetchFailedCondition,
 | 
			
		||||
				sourcev1.SourceVerifiedCondition,
 | 
			
		||||
				sourcev1.IncludeUnavailableCondition,
 | 
			
		||||
				sourcev1.ArtifactOutdatedCondition,
 | 
			
		||||
| 
						 | 
				
			
			@ -140,7 +140,7 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
 | 
			
		|||
			patch.WithOwnedConditions{
 | 
			
		||||
				Conditions: []string{
 | 
			
		||||
					sourcev1.ArtifactUnavailableCondition,
 | 
			
		||||
					sourcev1.CheckoutFailedCondition,
 | 
			
		||||
					sourcev1.FetchFailedCondition,
 | 
			
		||||
					sourcev1.IncludeUnavailableCondition,
 | 
			
		||||
					sourcev1.ArtifactOutdatedCondition,
 | 
			
		||||
					meta.ReadyCondition,
 | 
			
		||||
| 
						 | 
				
			
			@ -271,8 +271,8 @@ func (r *GitRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sou
 | 
			
		|||
// and observes its state.
 | 
			
		||||
//
 | 
			
		||||
// The repository is checked out to the given dir using the defined configuration, and in case of an error during the
 | 
			
		||||
// checkout process (including transient errors), it records v1beta1.CheckoutFailedCondition=True and returns early.
 | 
			
		||||
// On a successful checkout it removes v1beta1.CheckoutFailedCondition, and compares the current revision of HEAD to the
 | 
			
		||||
// checkout process (including transient errors), it records v1beta1.FetchFailedCondition=True and returns early.
 | 
			
		||||
// On a successful checkout it removes v1beta1.FetchFailedCondition, and compares the current revision of HEAD to the
 | 
			
		||||
// artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
 | 
			
		||||
// If instructed, the signature of the commit is verified if and recorded as v1beta1.SourceVerifiedCondition. If the
 | 
			
		||||
// signature can not be verified or the verification fails, the Condition=False and it returns early.
 | 
			
		||||
| 
						 | 
				
			
			@ -292,7 +292,7 @@ func (r *GitRepositoryReconciler) reconcileSource(ctx context.Context,
 | 
			
		|||
		})
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			ctrl.LoggerFrom(ctx).Error(err, fmt.Sprintf("Failed to get auth strategy for Git implementation '%s'", obj.Spec.GitImplementation))
 | 
			
		||||
			conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, sourcev1.AuthenticationFailedReason,
 | 
			
		||||
			conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason,
 | 
			
		||||
				"Failed to get auth strategy for Git implementation '%s': %s", obj.Spec.GitImplementation, err)
 | 
			
		||||
			// Do not return error as recovery without changes is impossible
 | 
			
		||||
			return ctrl.Result{}, nil
 | 
			
		||||
| 
						 | 
				
			
			@ -305,7 +305,7 @@ func (r *GitRepositoryReconciler) reconcileSource(ctx context.Context,
 | 
			
		|||
		}
 | 
			
		||||
		var secret corev1.Secret
 | 
			
		||||
		if err = r.Client.Get(ctx, name, &secret); err != nil {
 | 
			
		||||
			conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, sourcev1.AuthenticationFailedReason,
 | 
			
		||||
			conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason,
 | 
			
		||||
				"Failed to get secret '%s': %s", name.String(), err.Error())
 | 
			
		||||
			r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.AuthenticationFailedReason,
 | 
			
		||||
				"Failed to get secret '%s': %s", name.String(), err.Error())
 | 
			
		||||
| 
						 | 
				
			
			@ -316,7 +316,7 @@ func (r *GitRepositoryReconciler) reconcileSource(ctx context.Context,
 | 
			
		|||
		// Configure strategy with secret
 | 
			
		||||
		auth, err = authStrategy.Method(secret)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, sourcev1.AuthenticationFailedReason,
 | 
			
		||||
			conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason,
 | 
			
		||||
				"Failed to configure auth strategy for Git implementation '%s': %s", obj.Spec.GitImplementation, err)
 | 
			
		||||
			r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.AuthenticationFailedReason,
 | 
			
		||||
				"Failed to configure auth strategy for Git implementation '%s': %s", obj.Spec.GitImplementation, err)
 | 
			
		||||
| 
						 | 
				
			
			@ -332,7 +332,7 @@ func (r *GitRepositoryReconciler) reconcileSource(ctx context.Context,
 | 
			
		|||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		ctrl.LoggerFrom(ctx).Error(err, fmt.Sprintf("Failed to configure checkout strategy for Git implementation '%s'", obj.Spec.GitImplementation))
 | 
			
		||||
		conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, sourcev1.GitOperationFailedReason,
 | 
			
		||||
		conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason,
 | 
			
		||||
			"Failed to configure checkout strategy for Git implementation '%s': %s", obj.Spec.GitImplementation, err)
 | 
			
		||||
		// Do not return err as recovery without changes is impossible
 | 
			
		||||
		return ctrl.Result{}, nil
 | 
			
		||||
| 
						 | 
				
			
			@ -343,7 +343,7 @@ func (r *GitRepositoryReconciler) reconcileSource(ctx context.Context,
 | 
			
		|||
	defer cancel()
 | 
			
		||||
	commit, revision, err := checkoutStrategy.Checkout(gitCtx, dir, obj.Spec.URL, auth)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, sourcev1.GitOperationFailedReason,
 | 
			
		||||
		conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason,
 | 
			
		||||
			"Failed to checkout and determine revision: %s", err)
 | 
			
		||||
		r.Eventf(ctx, obj, events.EventSeverityError, sourcev1.GitOperationFailedReason,
 | 
			
		||||
			"Failed to checkout and determine revision: %s", err)
 | 
			
		||||
| 
						 | 
				
			
			@ -352,7 +352,7 @@ func (r *GitRepositoryReconciler) reconcileSource(ctx context.Context,
 | 
			
		|||
	}
 | 
			
		||||
	r.Eventf(ctx, obj, events.EventSeverityInfo, sourcev1.GitOperationSucceedReason,
 | 
			
		||||
		"Cloned repository '%s' and checked out revision '%s'", obj.Spec.URL, revision)
 | 
			
		||||
	conditions.Delete(obj, sourcev1.CheckoutFailedCondition)
 | 
			
		||||
	conditions.Delete(obj, sourcev1.FetchFailedCondition)
 | 
			
		||||
 | 
			
		||||
	// Verify commit signature
 | 
			
		||||
	if result, err := r.verifyCommitSignature(ctx, obj, commit); err != nil || result.IsZero() {
 | 
			
		||||
| 
						 | 
				
			
			@ -401,22 +401,21 @@ func (r *GitRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *so
 | 
			
		|||
 | 
			
		||||
	// Ensure target path exists and is a directory
 | 
			
		||||
	if f, err := os.Stat(dir); err != nil {
 | 
			
		||||
		ctrl.LoggerFrom(ctx).Error(err, "failed to stat source path")
 | 
			
		||||
		err = fmt.Errorf("failed to stat target path: %w", err)
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	} else if !f.IsDir() {
 | 
			
		||||
		err := fmt.Errorf("source path '%s' is not a directory", dir)
 | 
			
		||||
		ctrl.LoggerFrom(ctx).Error(err, "invalid target path")
 | 
			
		||||
		err = fmt.Errorf("invalid target path: '%s' is not a directory", dir)
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Ensure artifact directory exists and acquire lock
 | 
			
		||||
	if err := r.Storage.MkdirAll(artifact); err != nil {
 | 
			
		||||
		ctrl.LoggerFrom(ctx).Error(err, "failed to create artifact directory")
 | 
			
		||||
		err = fmt.Errorf("failed to create artifact directory: %w", err)
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
	unlock, err := r.Storage.Lock(artifact)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		ctrl.LoggerFrom(ctx).Error(err, "failed to acquire lock for artifact")
 | 
			
		||||
		err = fmt.Errorf("failed to acquire lock for artifact: %w", err)
 | 
			
		||||
		return ctrl.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
	defer unlock()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -220,7 +220,7 @@ func TestGitRepositoryReconciler_reconcileSource_authStrategy(t *testing.T) {
 | 
			
		|||
			},
 | 
			
		||||
			wantErr: true,
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(sourcev1.CheckoutFailedCondition, sourcev1.GitOperationFailedReason, "x509: certificate signed by unknown authority"),
 | 
			
		||||
				*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "x509: certificate signed by unknown authority"),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
| 
						 | 
				
			
			@ -245,7 +245,7 @@ func TestGitRepositoryReconciler_reconcileSource_authStrategy(t *testing.T) {
 | 
			
		|||
			},
 | 
			
		||||
			wantErr: true,
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(sourcev1.CheckoutFailedCondition, sourcev1.GitOperationFailedReason, "Failed to checkout and determine revision: unable to clone '<url>', error: Certificate"),
 | 
			
		||||
				*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "Failed to checkout and determine revision: unable to clone '<url>', error: Certificate"),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
		{
 | 
			
		||||
| 
						 | 
				
			
			@ -306,7 +306,7 @@ func TestGitRepositoryReconciler_reconcileSource_authStrategy(t *testing.T) {
 | 
			
		|||
			},
 | 
			
		||||
			wantErr: true,
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
				*conditions.TrueCondition(sourcev1.CheckoutFailedCondition, "AuthenticationFailed", "Failed to get secret '/non-existing': secrets \"non-existing\" not found"),
 | 
			
		||||
				*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "Failed to get secret '/non-existing': secrets \"non-existing\" not found"),
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -1145,7 +1145,7 @@ func TestGitRepositoryReconciler_ConditionsUpdate(t *testing.T) {
 | 
			
		|||
		{
 | 
			
		||||
			name: "mixed failed conditions",
 | 
			
		||||
			beforeFunc: func(obj *sourcev1.GitRepository) {
 | 
			
		||||
				conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, "Foo", "")
 | 
			
		||||
				conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, "Foo", "")
 | 
			
		||||
				conditions.MarkTrue(obj, sourcev1.IncludeUnavailableCondition, "Foo", "")
 | 
			
		||||
				conditions.MarkTrue(obj, sourcev1.SourceVerifiedCondition, "Foo", "")
 | 
			
		||||
				conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "Foo", "")
 | 
			
		||||
| 
						 | 
				
			
			@ -1160,7 +1160,7 @@ func TestGitRepositoryReconciler_ConditionsUpdate(t *testing.T) {
 | 
			
		|||
			name: "reconciling and failed conditions",
 | 
			
		||||
			beforeFunc: func(obj *sourcev1.GitRepository) {
 | 
			
		||||
				conditions.MarkTrue(obj, meta.ReconcilingCondition, "Foo", "")
 | 
			
		||||
				conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, "Foo", "")
 | 
			
		||||
				conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, "Foo", "")
 | 
			
		||||
			},
 | 
			
		||||
			want: ctrl.Result{RequeueAfter: interval},
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
| 
						 | 
				
			
			@ -1171,7 +1171,7 @@ func TestGitRepositoryReconciler_ConditionsUpdate(t *testing.T) {
 | 
			
		|||
			name: "stalled and failed conditions",
 | 
			
		||||
			beforeFunc: func(obj *sourcev1.GitRepository) {
 | 
			
		||||
				conditions.MarkTrue(obj, meta.StalledCondition, "Foo", "")
 | 
			
		||||
				conditions.MarkTrue(obj, sourcev1.CheckoutFailedCondition, "Foo", "")
 | 
			
		||||
				conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, "Foo", "")
 | 
			
		||||
			},
 | 
			
		||||
			want: ctrl.Result{RequeueAfter: interval},
 | 
			
		||||
			assertConditions: []metav1.Condition{
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -29,11 +29,11 @@ import (
 | 
			
		|||
	ctrl "sigs.k8s.io/controller-runtime"
 | 
			
		||||
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/controller"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/testenv"
 | 
			
		||||
	"github.com/fluxcd/pkg/testserver"
 | 
			
		||||
 | 
			
		||||
	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
 | 
			
		||||
	// +kubebuilder:scaffold:imports
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/testenv"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// These tests make use of plain Go using Gomega for assertions.
 | 
			
		||||
| 
						 | 
				
			
			@ -98,6 +98,15 @@ func TestMain(m *testing.M) {
 | 
			
		|||
		panic(fmt.Sprintf("Failed to start GitRepositoryReconciler: %v", err))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := (&BucketReconciler{
 | 
			
		||||
		Client:  testEnv,
 | 
			
		||||
		Events:  testEventsH,
 | 
			
		||||
		Metrics: testMetricsH,
 | 
			
		||||
		Storage: testStorage,
 | 
			
		||||
	}).SetupWithManager(testEnv); err != nil {
 | 
			
		||||
		panic(fmt.Sprintf("Failed to start BucketReconciler: %v", err))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		fmt.Println("Starting the test environment")
 | 
			
		||||
		if err := testEnv.Start(ctx); err != nil {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										27
									
								
								main.go
								
								
								
								
							
							
						
						
									
										27
									
								
								main.go
								
								
								
								
							| 
						 | 
				
			
			@ -25,13 +25,6 @@ import (
 | 
			
		|||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/client"
 | 
			
		||||
	helper "github.com/fluxcd/pkg/runtime/controller"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/events"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/leaderelection"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/logger"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/pprof"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/probes"
 | 
			
		||||
	"github.com/go-logr/logr"
 | 
			
		||||
	flag "github.com/spf13/pflag"
 | 
			
		||||
	"helm.sh/helm/v3/pkg/getter"
 | 
			
		||||
| 
						 | 
				
			
			@ -41,6 +34,14 @@ import (
 | 
			
		|||
	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
 | 
			
		||||
	ctrl "sigs.k8s.io/controller-runtime"
 | 
			
		||||
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/client"
 | 
			
		||||
	helper "github.com/fluxcd/pkg/runtime/controller"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/events"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/leaderelection"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/logger"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/pprof"
 | 
			
		||||
	"github.com/fluxcd/pkg/runtime/probes"
 | 
			
		||||
 | 
			
		||||
	sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
 | 
			
		||||
	"github.com/fluxcd/source-controller/controllers"
 | 
			
		||||
	// +kubebuilder:scaffold:imports
 | 
			
		||||
| 
						 | 
				
			
			@ -144,14 +145,14 @@ func main() {
 | 
			
		|||
	probes.SetupChecks(mgr, setupLog)
 | 
			
		||||
	pprof.SetupHandlers(mgr, setupLog)
 | 
			
		||||
 | 
			
		||||
	eventsH := helper.MakeEvents(mgr, controllerName, eventRecorder)
 | 
			
		||||
	metricsH := helper.MustMakeMetrics(mgr)
 | 
			
		||||
 | 
			
		||||
	if storageAdvAddr == "" {
 | 
			
		||||
		storageAdvAddr = determineAdvStorageAddr(storageAddr, setupLog)
 | 
			
		||||
	}
 | 
			
		||||
	storage := mustInitStorage(storagePath, storageAdvAddr, setupLog)
 | 
			
		||||
 | 
			
		||||
	eventsH := helper.MakeEvents(mgr, controllerName, eventRecorder)
 | 
			
		||||
	metricsH := helper.MustMakeMetrics(mgr)
 | 
			
		||||
 | 
			
		||||
	if err = (&controllers.GitRepositoryReconciler{
 | 
			
		||||
		Client:  mgr.GetClient(),
 | 
			
		||||
		Events:  eventsH,
 | 
			
		||||
| 
						 | 
				
			
			@ -194,11 +195,9 @@ func main() {
 | 
			
		|||
	}
 | 
			
		||||
	if err = (&controllers.BucketReconciler{
 | 
			
		||||
		Client:  mgr.GetClient(),
 | 
			
		||||
		Scheme:                mgr.GetScheme(),
 | 
			
		||||
		Events:  eventsH,
 | 
			
		||||
		Metrics: metricsH,
 | 
			
		||||
		Storage: storage,
 | 
			
		||||
		EventRecorder:         mgr.GetEventRecorderFor(controllerName),
 | 
			
		||||
		ExternalEventRecorder: eventRecorder,
 | 
			
		||||
		MetricsRecorder:       metricsH.MetricsRecorder,
 | 
			
		||||
	}).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{
 | 
			
		||||
		MaxConcurrentReconciles: concurrent,
 | 
			
		||||
	}); err != nil {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue