bucket: Add more reconciler design improvements
- Remove ArtifactUnavailable condition and use Reconciling condition to convey the same. - Make Reconciling condition affect the ready condition. - Introduce summarizeAndPatch() to calculate the final status conditions and patch them. - Introduce reconcile() to iterate through the sub-reconcilers and execute them. Signed-off-by: Sunny <darkowlzz@protonmail.com>
This commit is contained in:
parent
848534a8f1
commit
ba7cbd31f1
|
@ -19,6 +19,7 @@ package controllers
|
|||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -36,7 +37,7 @@ import (
|
|||
"google.golang.org/api/option"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
kerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
kuberecorder "k8s.io/client-go/tools/record"
|
||||
|
@ -53,9 +54,37 @@ import (
|
|||
"github.com/fluxcd/pkg/runtime/predicates"
|
||||
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
||||
serror "github.com/fluxcd/source-controller/internal/error"
|
||||
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
|
||||
"github.com/fluxcd/source-controller/pkg/sourceignore"
|
||||
)
|
||||
|
||||
// Status conditions owned by Bucket reconciler.
|
||||
var bucketOwnedConditions = []string{
|
||||
sourcev1.ArtifactOutdatedCondition,
|
||||
sourcev1.FetchFailedCondition,
|
||||
meta.ReadyCondition,
|
||||
meta.ReconcilingCondition,
|
||||
meta.StalledCondition,
|
||||
}
|
||||
|
||||
// Conditions that Ready condition is influenced by in descending order of their
|
||||
// priority.
|
||||
var bucketReadyDeps = []string{
|
||||
sourcev1.ArtifactOutdatedCondition,
|
||||
sourcev1.FetchFailedCondition,
|
||||
meta.StalledCondition,
|
||||
meta.ReconcilingCondition,
|
||||
}
|
||||
|
||||
// Negative conditions that Ready condition is influenced by.
|
||||
var bucketReadyDepsNegative = []string{
|
||||
sourcev1.ArtifactOutdatedCondition,
|
||||
sourcev1.FetchFailedCondition,
|
||||
meta.StalledCondition,
|
||||
meta.ReconcilingCondition,
|
||||
}
|
||||
|
||||
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch
|
||||
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete
|
||||
|
@ -74,6 +103,10 @@ type BucketReconcilerOptions struct {
|
|||
MaxConcurrentReconciles int
|
||||
}
|
||||
|
||||
// bucketReconcilerFunc is the function type for all the bucket reconciler
|
||||
// functions.
|
||||
type bucketReconcilerFunc func(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error)
|
||||
|
||||
func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||||
return r.SetupWithManagerAndOptions(mgr, BucketReconcilerOptions{})
|
||||
}
|
||||
|
@ -111,69 +144,14 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
|
|||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
var recResult sreconcile.Result
|
||||
|
||||
// Always attempt to patch the object and status after each reconciliation
|
||||
// NOTE: This deferred block only modifies the named return error. The
|
||||
// result from the reconciliation remains the same. Any requeue attributes
|
||||
// set in the result will continue to be effective.
|
||||
defer func() {
|
||||
// Record the value of the reconciliation request, if any
|
||||
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
|
||||
obj.Status.SetLastHandledReconcileRequest(v)
|
||||
}
|
||||
|
||||
// Summarize the Ready condition based on abnormalities that may have been observed
|
||||
conditions.SetSummary(obj,
|
||||
meta.ReadyCondition,
|
||||
conditions.WithConditions(
|
||||
sourcev1.ArtifactOutdatedCondition,
|
||||
sourcev1.FetchFailedCondition,
|
||||
sourcev1.ArtifactUnavailableCondition,
|
||||
),
|
||||
conditions.WithNegativePolarityConditions(
|
||||
sourcev1.ArtifactOutdatedCondition,
|
||||
sourcev1.FetchFailedCondition,
|
||||
sourcev1.ArtifactUnavailableCondition,
|
||||
),
|
||||
)
|
||||
|
||||
// Patch the object, ignoring conflicts on the conditions owned by this controller
|
||||
patchOpts := []patch.Option{
|
||||
patch.WithOwnedConditions{
|
||||
Conditions: []string{
|
||||
sourcev1.ArtifactOutdatedCondition,
|
||||
sourcev1.FetchFailedCondition,
|
||||
sourcev1.ArtifactUnavailableCondition,
|
||||
meta.ReadyCondition,
|
||||
meta.ReconcilingCondition,
|
||||
meta.StalledCondition,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Determine if the resource is still being reconciled, or if it has stalled, and record this observation
|
||||
if retErr == nil && (result.IsZero() || !result.Requeue) {
|
||||
// We are no longer reconciling
|
||||
conditions.Delete(obj, meta.ReconcilingCondition)
|
||||
|
||||
// We have now observed this generation
|
||||
patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
|
||||
|
||||
readyCondition := conditions.Get(obj, meta.ReadyCondition)
|
||||
switch readyCondition.Status {
|
||||
case metav1.ConditionFalse:
|
||||
// As we are no longer reconciling and the end-state is not ready, the reconciliation has stalled
|
||||
conditions.MarkStalled(obj, readyCondition.Reason, readyCondition.Message)
|
||||
case metav1.ConditionTrue:
|
||||
// As we are no longer reconciling and the end-state is ready, the reconciliation is no longer stalled
|
||||
conditions.Delete(obj, meta.StalledCondition)
|
||||
}
|
||||
}
|
||||
|
||||
// Finally, patch the resource
|
||||
if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
|
||||
// Ignore patch error "not found" when the object is being deleted.
|
||||
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
|
||||
err = kerrors.FilterOut(err, func(e error) bool { return apierrors.IsNotFound(e) })
|
||||
}
|
||||
retErr = kerrors.NewAggregate([]error{retErr, err})
|
||||
}
|
||||
retErr = r.summarizeAndPatch(ctx, obj, patchHelper, recResult, retErr)
|
||||
|
||||
// Always record readiness and duration metrics
|
||||
r.Metrics.RecordReadiness(ctx, obj)
|
||||
|
@ -183,60 +161,109 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
|
|||
// Add finalizer first if not exist to avoid the race condition between init and delete
|
||||
if !controllerutil.ContainsFinalizer(obj, sourcev1.SourceFinalizer) {
|
||||
controllerutil.AddFinalizer(obj, sourcev1.SourceFinalizer)
|
||||
recResult = sreconcile.ResultRequeue
|
||||
return ctrl.Result{Requeue: true}, nil
|
||||
}
|
||||
|
||||
// Examine if the object is under deletion
|
||||
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
|
||||
return r.reconcileDelete(ctx, obj)
|
||||
res, err := r.reconcileDelete(ctx, obj)
|
||||
return sreconcile.BuildRuntimeResult(ctx, r.EventRecorder, obj, res, err)
|
||||
}
|
||||
|
||||
// Reconcile actual object
|
||||
return r.reconcile(ctx, obj)
|
||||
reconcilers := []bucketReconcilerFunc{
|
||||
r.reconcileStorage,
|
||||
r.reconcileSource,
|
||||
r.reconcileArtifact,
|
||||
}
|
||||
recResult, err = r.reconcile(ctx, obj, reconcilers)
|
||||
return sreconcile.BuildRuntimeResult(ctx, r.EventRecorder, obj, recResult, err)
|
||||
}
|
||||
|
||||
// reconcile steps through the actual reconciliation tasks for the object, it returns early on the first step that
|
||||
// produces an error.
|
||||
func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
|
||||
// Mark the resource as under reconciliation
|
||||
conditions.MarkReconciling(obj, meta.ProgressingReason, "")
|
||||
|
||||
// Reconcile the storage data
|
||||
if result, err := r.reconcileStorage(ctx, obj); err != nil || result.IsZero() {
|
||||
return result, err
|
||||
// summarizeAndPatch analyzes the object conditions to create a summary of the
|
||||
// status conditions and patches the object with the calculated summary.
|
||||
func (r *BucketReconciler) summarizeAndPatch(ctx context.Context, obj *sourcev1.Bucket, patchHelper *patch.Helper, res sreconcile.Result, recErr error) error {
|
||||
// Record the value of the reconciliation request if any.
|
||||
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
|
||||
obj.Status.SetLastHandledReconcileRequest(v)
|
||||
}
|
||||
|
||||
// Compute the reconcile results, obtain patch options and reconcile error.
|
||||
var patchOpts []patch.Option
|
||||
patchOpts, recErr = sreconcile.ComputeReconcileResult(obj, res, recErr, bucketOwnedConditions)
|
||||
|
||||
// Summarize the Ready condition based on abnormalities that may have been observed.
|
||||
conditions.SetSummary(obj,
|
||||
meta.ReadyCondition,
|
||||
conditions.WithConditions(
|
||||
bucketReadyDeps...,
|
||||
),
|
||||
conditions.WithNegativePolarityConditions(
|
||||
bucketReadyDepsNegative...,
|
||||
),
|
||||
)
|
||||
|
||||
// Finally, patch the resource.
|
||||
if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
|
||||
// Ignore patch error "not found" when the object is being deleted.
|
||||
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
|
||||
err = kerrors.FilterOut(err, func(e error) bool { return apierrors.IsNotFound(e) })
|
||||
}
|
||||
recErr = kerrors.NewAggregate([]error{recErr, err})
|
||||
}
|
||||
|
||||
return recErr
|
||||
}
|
||||
|
||||
// reconcile steps iterates through the actual reconciliation tasks for objec,
|
||||
// it returns early on the first step that returns ResultRequeue or produces an
|
||||
// error.
|
||||
func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, reconcilers []bucketReconcilerFunc) (sreconcile.Result, error) {
|
||||
if obj.Generation != obj.Status.ObservedGeneration {
|
||||
conditions.MarkReconciling(obj, "NewGeneration", "Reconciling new generation %d", obj.Generation)
|
||||
}
|
||||
|
||||
var artifact sourcev1.Artifact
|
||||
|
||||
// Create temp working dir
|
||||
tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name))
|
||||
if err != nil {
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason, "Failed to create temporary directory: %s", err)
|
||||
return ctrl.Result{}, err
|
||||
return sreconcile.ResultEmpty, &serror.Event{
|
||||
Err: fmt.Errorf("failed to create temporary directory: %w", err),
|
||||
Reason: sourcev1.StorageOperationFailedReason,
|
||||
}
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
// Reconcile the source from upstream
|
||||
var artifact sourcev1.Artifact
|
||||
if result, err := r.reconcileSource(ctx, obj, &artifact, tmpDir); err != nil || result.IsZero() {
|
||||
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, err
|
||||
// Run the sub-reconcilers and build the result of reconciliation.
|
||||
var res sreconcile.Result
|
||||
var resErr error
|
||||
for _, rec := range reconcilers {
|
||||
recResult, err := rec(ctx, obj, &artifact, tmpDir)
|
||||
// Exit immediately on ResultRequeue.
|
||||
if recResult == sreconcile.ResultRequeue {
|
||||
return sreconcile.ResultRequeue, nil
|
||||
}
|
||||
// If an error is received, prioritize the returned results because an
|
||||
// error also means immediate requeue.
|
||||
if err != nil {
|
||||
resErr = err
|
||||
res = recResult
|
||||
break
|
||||
}
|
||||
// Prioritize requeue request in the result.
|
||||
res = sreconcile.LowestRequeuingResult(res, recResult)
|
||||
}
|
||||
|
||||
// Reconcile the artifact to storage
|
||||
if result, err := r.reconcileArtifact(ctx, obj, artifact, tmpDir); err != nil || result.IsZero() {
|
||||
return result, err
|
||||
}
|
||||
|
||||
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
|
||||
return res, resErr
|
||||
}
|
||||
|
||||
// reconcileStorage ensures the current state of the storage matches the desired and previously observed state.
|
||||
//
|
||||
// All artifacts for the resource except for the current one are garbage collected from the storage.
|
||||
// If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
|
||||
// If the object does not have an artifact in its Status object, a v1beta1.ArtifactUnavailableCondition is set.
|
||||
// If the hostname of the URLs on the object do not match the current storage server hostname, they are updated.
|
||||
//
|
||||
// The caller should assume a failure if an error is returned, or the Result is zero.
|
||||
func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
|
||||
func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
|
||||
// Garbage collect previous advertised artifact(s) from storage
|
||||
_ = r.garbageCollect(ctx, obj)
|
||||
|
||||
|
@ -248,26 +275,23 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.B
|
|||
|
||||
// Record that we do not have an artifact
|
||||
if obj.GetArtifact() == nil {
|
||||
conditions.MarkTrue(obj, sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage")
|
||||
return ctrl.Result{Requeue: true}, nil
|
||||
conditions.MarkReconciling(obj, "NoArtifact", "No artifact for resource in storage")
|
||||
return sreconcile.ResultSuccess, nil
|
||||
}
|
||||
conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
|
||||
|
||||
// Always update URLs to ensure hostname is up-to-date
|
||||
// TODO(hidde): we may want to send out an event only if we notice the URL has changed
|
||||
r.Storage.SetArtifactURL(obj.GetArtifact())
|
||||
obj.Status.URL = r.Storage.SetHostname(obj.Status.URL)
|
||||
|
||||
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
|
||||
return sreconcile.ResultSuccess, nil
|
||||
}
|
||||
|
||||
// reconcileSource reconciles the upstream bucket with the client for the given object's Provider, and returns the
|
||||
// result.
|
||||
// If a SecretRef is defined, it attempts to fetch the Secret before calling the provider. If the fetch of the Secret
|
||||
// fails, it records v1beta1.FetchFailedCondition=True and returns early.
|
||||
//
|
||||
// The caller should assume a failure if an error is returned, or the Result is zero.
|
||||
func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, dir string) (ctrl.Result, error) {
|
||||
func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
|
||||
var secret *corev1.Secret
|
||||
if obj.Spec.SecretRef != nil {
|
||||
secretName := types.NamespacedName{
|
||||
|
@ -276,12 +300,13 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bu
|
|||
}
|
||||
secret = &corev1.Secret{}
|
||||
if err := r.Get(ctx, secretName, secret); err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason,
|
||||
"Failed to get secret '%s': %s", secretName.String(), err.Error())
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.AuthenticationFailedReason,
|
||||
"Failed to get secret '%s': %s", secretName.String(), err.Error())
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("failed to get secret '%s': %w", secretName.String(), err),
|
||||
Reason: sourcev1.AuthenticationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, e.Err.Error())
|
||||
// Return error as the world as observed may change
|
||||
return ctrl.Result{}, err
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -302,19 +327,18 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bu
|
|||
// On a successful download, it removes v1beta1.FetchFailedCondition, and compares the current revision of HEAD to
|
||||
// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
|
||||
// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
|
||||
//
|
||||
// The caller should assume a failure if an error is returned, or the Result is zero.
|
||||
func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact,
|
||||
secret *corev1.Secret, dir string) (ctrl.Result, error) {
|
||||
secret *corev1.Secret, dir string) (sreconcile.Result, error) {
|
||||
// Build the client with the configuration from the object and secret
|
||||
s3Client, err := r.buildMinioClient(obj, secret)
|
||||
if err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to construct S3 client: %s", err.Error())
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to construct S3 client: %s", err.Error())
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("failed to construct S3 client: %w", err),
|
||||
Reason: sourcev1.BucketOperationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||
// Return error as the contents of the secret may change
|
||||
return ctrl.Result{}, err
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
|
||||
// Confirm bucket exists
|
||||
|
@ -322,36 +346,42 @@ func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *source
|
|||
defer cancel()
|
||||
exists, err := s3Client.BucketExists(ctxTimeout, obj.Spec.BucketName)
|
||||
if err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to verify existence of bucket '%s': %s", obj.Spec.BucketName, err.Error())
|
||||
return ctrl.Result{}, err
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("failed to verify existence of bucket '%s': %w", obj.Spec.BucketName, err),
|
||||
Reason: sourcev1.BucketOperationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
if !exists {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Bucket '%s' does not exist", obj.Spec.BucketName)
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
|
||||
"Bucket '%s' does not exist", obj.Spec.BucketName)
|
||||
return ctrl.Result{}, fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName)
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName),
|
||||
Reason: sourcev1.BucketOperationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
|
||||
// Look for file with ignore rules first
|
||||
path := filepath.Join(dir, sourceignore.IgnoreFile)
|
||||
if err := s3Client.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path, minio.GetObjectOptions{}); err != nil {
|
||||
if resp, ok := err.(minio.ErrorResponse); ok && resp.Code != "NoSuchKey" {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error())
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error())
|
||||
return ctrl.Result{}, err
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("failed to get '%s' file: %w", sourceignore.IgnoreFile, err),
|
||||
Reason: sourcev1.BucketOperationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
}
|
||||
ps, err := sourceignore.ReadIgnoreFile(path, nil)
|
||||
if err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error())
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error())
|
||||
return ctrl.Result{}, err
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("failed to read '%s' file: %w", sourceignore.IgnoreFile, err),
|
||||
Reason: sourcev1.BucketOperationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
// In-spec patterns take precedence
|
||||
if obj.Spec.Ignore != nil {
|
||||
|
@ -368,11 +398,12 @@ func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *source
|
|||
UseV1: s3utils.IsGoogleEndpoint(*s3Client.EndpointURL()),
|
||||
}) {
|
||||
if err = object.Err; err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error())
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error())
|
||||
return ctrl.Result{}, err
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("failed to list objects from bucket '%s': %w", obj.Spec.BucketName, err),
|
||||
Reason: sourcev1.BucketOperationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
|
||||
// Ignore directories and the .sourceignore file
|
||||
|
@ -391,13 +422,17 @@ func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *source
|
|||
revision, err := index.Revision()
|
||||
if err != nil {
|
||||
ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision")
|
||||
return ctrl.Result{}, err
|
||||
return sreconcile.ResultEmpty, &serror.Event{
|
||||
Err: fmt.Errorf("failed to calculate revision: %w", err),
|
||||
Reason: meta.FailedReason,
|
||||
}
|
||||
}
|
||||
|
||||
if !obj.GetArtifact().HasRevision(revision) {
|
||||
// Mark observations about the revision on the object
|
||||
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision",
|
||||
"New upstream revision '%s'", revision)
|
||||
message := fmt.Sprintf("new upstream revision '%s'", revision)
|
||||
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
|
||||
conditions.MarkReconciling(obj, "NewRevision", message)
|
||||
|
||||
// Download the files in parallel, but with a limited number of workers
|
||||
group, groupCtx := errgroup.WithContext(ctx)
|
||||
|
@ -421,20 +456,21 @@ func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *source
|
|||
return nil
|
||||
})
|
||||
if err = group.Wait(); err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Download from bucket '%s' failed: %s", obj.Spec.BucketName, err)
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
|
||||
"Download from bucket '%s' failed: %s", obj.Spec.BucketName, err)
|
||||
return ctrl.Result{}, err
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("download from bucket '%s' failed: %w", obj.Spec.BucketName, err),
|
||||
Reason: sourcev1.BucketOperationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
r.Eventf(obj, corev1.EventTypeNormal, sourcev1.BucketOperationSucceedReason,
|
||||
"Downloaded %d files from bucket '%s' revision '%s'", len(index), obj.Spec.BucketName, revision)
|
||||
r.eventLogf(ctx, obj, corev1.EventTypeNormal, sourcev1.BucketOperationSucceedReason,
|
||||
"downloaded %d files from bucket '%s' revision '%s'", len(index), obj.Spec.BucketName, revision)
|
||||
}
|
||||
conditions.Delete(obj, sourcev1.FetchFailedCondition)
|
||||
|
||||
// Create potential new artifact
|
||||
*artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
|
||||
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
|
||||
return sreconcile.ResultSuccess, nil
|
||||
}
|
||||
|
||||
// reconcileGCPSource ensures the upstream Google Cloud Storage bucket can be reached and downloaded from using the
|
||||
|
@ -446,18 +482,17 @@ func (r *BucketReconciler) reconcileMinioSource(ctx context.Context, obj *source
|
|||
// On a successful download, it removes v1beta1.DownloadFailedCondition, and compares the current revision of HEAD to
|
||||
// the artifact on the object, and records v1beta1.ArtifactOutdatedCondition if they differ.
|
||||
// If the download was successful, the given artifact pointer is set to a new artifact with the available metadata.
|
||||
//
|
||||
// The caller should assume a failure if an error is returned, or the Result is zero.
|
||||
func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact,
|
||||
secret *corev1.Secret, dir string) (ctrl.Result, error) {
|
||||
secret *corev1.Secret, dir string) (sreconcile.Result, error) {
|
||||
gcpClient, err := r.buildGCPClient(ctx, secret)
|
||||
if err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to construct GCP client: %s", err.Error())
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to construct GCP client: %s", err.Error())
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("failed to construct GCP client: %w", err),
|
||||
Reason: sourcev1.BucketOperationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||
// Return error as the contents of the secret may change
|
||||
return ctrl.Result{}, err
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
defer gcpClient.Close(ctrl.LoggerFrom(ctx))
|
||||
|
||||
|
@ -466,36 +501,42 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, obj *sourcev1
|
|||
defer cancel()
|
||||
exists, err := gcpClient.BucketExists(ctxTimeout, obj.Spec.BucketName)
|
||||
if err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to verify existence of bucket '%s': %s", obj.Spec.BucketName, err.Error())
|
||||
return ctrl.Result{}, err
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("failed to verify existence of bucket '%s': %w", obj.Spec.BucketName, err),
|
||||
Reason: sourcev1.BucketOperationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
if !exists {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Bucket '%s' does not exist", obj.Spec.BucketName)
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
|
||||
"Bucket '%s' does not exist", obj.Spec.BucketName)
|
||||
return ctrl.Result{}, fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName)
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("bucket '%s' does not exist", obj.Spec.BucketName),
|
||||
Reason: sourcev1.BucketOperationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
|
||||
// Look for file with ignore rules first
|
||||
path := filepath.Join(dir, sourceignore.IgnoreFile)
|
||||
if err := gcpClient.FGetObject(ctxTimeout, obj.Spec.BucketName, sourceignore.IgnoreFile, path); err != nil {
|
||||
if err != gcpstorage.ErrObjectNotExist {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error())
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to get '%s' file: %s", sourceignore.IgnoreFile, err.Error())
|
||||
return ctrl.Result{}, err
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("failed to get '%s' file: %w", sourceignore.IgnoreFile, err),
|
||||
Reason: sourcev1.BucketOperationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
}
|
||||
ps, err := sourceignore.ReadIgnoreFile(path, nil)
|
||||
if err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error())
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to read '%s' file: %s", sourceignore.IgnoreFile, err.Error())
|
||||
return ctrl.Result{}, err
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("failed to read '%s' file: %w", sourceignore.IgnoreFile, err),
|
||||
Reason: sourcev1.BucketOperationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
// In-spec patterns take precedence
|
||||
if obj.Spec.Ignore != nil {
|
||||
|
@ -514,11 +555,12 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, obj *sourcev1
|
|||
if err == gcp.IteratorDone {
|
||||
break
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error())
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
|
||||
"Failed to list objects from bucket '%s': %s", obj.Spec.BucketName, err.Error())
|
||||
return ctrl.Result{}, err
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("failed to list objects from bucket '%s': %w", obj.Spec.BucketName, err),
|
||||
Reason: sourcev1.BucketOperationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
|
||||
if strings.HasSuffix(object.Name, "/") || object.Name == sourceignore.IgnoreFile {
|
||||
|
@ -535,14 +577,17 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, obj *sourcev1
|
|||
// Calculate revision checksum from the collected index values
|
||||
revision, err := index.Revision()
|
||||
if err != nil {
|
||||
ctrl.LoggerFrom(ctx).Error(err, "failed to calculate revision")
|
||||
return ctrl.Result{}, err
|
||||
return sreconcile.ResultEmpty, &serror.Event{
|
||||
Err: fmt.Errorf("failed to calculate revision: %w", err),
|
||||
Reason: meta.FailedReason,
|
||||
}
|
||||
}
|
||||
|
||||
if !obj.GetArtifact().HasRevision(revision) {
|
||||
// Mark observations about the revision on the object
|
||||
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision",
|
||||
"New upstream revision '%s'", revision)
|
||||
message := fmt.Sprintf("new upstream revision '%s'", revision)
|
||||
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
|
||||
conditions.MarkReconciling(obj, "NewRevision", message)
|
||||
|
||||
// Download the files in parallel, but with a limited number of workers
|
||||
group, groupCtx := errgroup.WithContext(ctx)
|
||||
|
@ -566,113 +611,121 @@ func (r *BucketReconciler) reconcileGCPSource(ctx context.Context, obj *sourcev1
|
|||
return nil
|
||||
})
|
||||
if err = group.Wait(); err != nil {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason,
|
||||
"Download from bucket '%s' failed: %s", obj.Spec.BucketName, err)
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.BucketOperationFailedReason,
|
||||
"Download from bucket '%s' failed: %s", obj.Spec.BucketName, err)
|
||||
return ctrl.Result{}, err
|
||||
e := &serror.Event{
|
||||
Err: fmt.Errorf("download from bucket '%s' failed: %w", obj.Spec.BucketName, err),
|
||||
Reason: sourcev1.BucketOperationFailedReason,
|
||||
}
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, e.Err.Error())
|
||||
return sreconcile.ResultEmpty, e
|
||||
}
|
||||
r.Eventf(obj, corev1.EventTypeNormal, sourcev1.BucketOperationSucceedReason,
|
||||
"Downloaded %d files from bucket '%s' revision '%s'", len(index), obj.Spec.BucketName, revision)
|
||||
r.eventLogf(ctx, obj, corev1.EventTypeNormal, sourcev1.BucketOperationSucceedReason,
|
||||
"downloaded %d files from bucket '%s' revision '%s'", len(index), obj.Spec.BucketName, revision)
|
||||
}
|
||||
conditions.Delete(obj, sourcev1.FetchFailedCondition)
|
||||
|
||||
// Create potential new artifact
|
||||
*artifact = r.Storage.NewArtifactFor(obj.Kind, obj, revision, fmt.Sprintf("%s.tar.gz", revision))
|
||||
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
|
||||
return sreconcile.ResultSuccess, nil
|
||||
}
|
||||
|
||||
// reconcileArtifact archives a new artifact to the storage, if the current observation on the object does not match the
|
||||
// given data.
|
||||
//
|
||||
// The inspection of the given data to the object is differed, ensuring any stale observations as
|
||||
// v1beta1.ArtifactUnavailableCondition and v1beta1.ArtifactOutdatedCondition are always deleted.
|
||||
// If the given artifact does not differ from the object's current, it returns early.
|
||||
// On a successful archive, the artifact in the status of the given object is set, and the symlink in the storage is
|
||||
// updated to its path.
|
||||
//
|
||||
// The caller should assume a failure if an error is returned, or the Result is zero.
|
||||
func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) (ctrl.Result, error) {
|
||||
func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.Bucket, artifact *sourcev1.Artifact, dir string) (sreconcile.Result, error) {
|
||||
// Always restore the Ready condition in case it got removed due to a transient error
|
||||
defer func() {
|
||||
if obj.GetArtifact() != nil {
|
||||
conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
|
||||
}
|
||||
if obj.GetArtifact().HasRevision(artifact.Revision) {
|
||||
conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition)
|
||||
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason,
|
||||
"Stored artifact for revision '%s'", artifact.Revision)
|
||||
"stored artifact for revision '%s'", artifact.Revision)
|
||||
}
|
||||
}()
|
||||
|
||||
// The artifact is up-to-date
|
||||
if obj.GetArtifact().HasRevision(artifact.Revision) {
|
||||
ctrl.LoggerFrom(ctx).Info(fmt.Sprintf("Already up to date, current revision '%s'", artifact.Revision))
|
||||
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
|
||||
r.eventLogf(ctx, obj, corev1.EventTypeNormal, meta.SucceededReason, "already up to date, current revision '%s'", artifact.Revision)
|
||||
return sreconcile.ResultSuccess, nil
|
||||
}
|
||||
|
||||
// Mark reconciling because the artifact and remote source are different.
|
||||
// and they have to be reconciled.
|
||||
conditions.MarkReconciling(obj, "NewRevision", "new upstream revision '%s'", artifact.Revision)
|
||||
|
||||
// Ensure target path exists and is a directory
|
||||
if f, err := os.Stat(dir); err != nil {
|
||||
ctrl.LoggerFrom(ctx).Error(err, "failed to stat source path")
|
||||
return ctrl.Result{}, err
|
||||
return sreconcile.ResultEmpty, &serror.Event{
|
||||
Err: fmt.Errorf("failed to stat source path: %w", err),
|
||||
Reason: sourcev1.StorageOperationFailedReason,
|
||||
}
|
||||
} else if !f.IsDir() {
|
||||
err := fmt.Errorf("source path '%s' is not a directory", dir)
|
||||
ctrl.LoggerFrom(ctx).Error(err, "invalid target path")
|
||||
return ctrl.Result{}, err
|
||||
return sreconcile.ResultEmpty, &serror.Event{
|
||||
Err: fmt.Errorf("source path '%s' is not a directory", dir),
|
||||
Reason: sourcev1.StorageOperationFailedReason,
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure artifact directory exists and acquire lock
|
||||
if err := r.Storage.MkdirAll(artifact); err != nil {
|
||||
ctrl.LoggerFrom(ctx).Error(err, "failed to create artifact directory")
|
||||
return ctrl.Result{}, err
|
||||
if err := r.Storage.MkdirAll(*artifact); err != nil {
|
||||
return sreconcile.ResultEmpty, &serror.Event{
|
||||
Err: fmt.Errorf("failed to create artifact directory: %w", err),
|
||||
Reason: sourcev1.StorageOperationFailedReason,
|
||||
}
|
||||
}
|
||||
unlock, err := r.Storage.Lock(artifact)
|
||||
unlock, err := r.Storage.Lock(*artifact)
|
||||
if err != nil {
|
||||
ctrl.LoggerFrom(ctx).Error(err, "failed to acquire lock for artifact")
|
||||
return ctrl.Result{}, err
|
||||
return sreconcile.ResultEmpty, &serror.Event{
|
||||
Err: fmt.Errorf("failed to acquire lock for artifact: %w", err),
|
||||
Reason: meta.FailedReason,
|
||||
}
|
||||
}
|
||||
defer unlock()
|
||||
|
||||
// Archive directory to storage
|
||||
if err := r.Storage.Archive(&artifact, dir, nil); err != nil {
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason,
|
||||
"Unable to archive artifact to storage: %s", err)
|
||||
return ctrl.Result{}, err
|
||||
if err := r.Storage.Archive(artifact, dir, nil); err != nil {
|
||||
return sreconcile.ResultEmpty, &serror.Event{
|
||||
Err: fmt.Errorf("unable to archive artifact to storage: %s", err),
|
||||
Reason: sourcev1.StorageOperationFailedReason,
|
||||
}
|
||||
}
|
||||
r.AnnotatedEventf(obj, map[string]string{
|
||||
"revision": artifact.Revision,
|
||||
"checksum": artifact.Checksum,
|
||||
}, corev1.EventTypeNormal, "NewArtifact", "Stored artifact for revision '%s'", artifact.Revision)
|
||||
}, corev1.EventTypeNormal, "NewArtifact", "stored artifact for revision '%s'", artifact.Revision)
|
||||
|
||||
// Record it on the object
|
||||
obj.Status.Artifact = artifact.DeepCopy()
|
||||
|
||||
// Update symlink on a "best effort" basis
|
||||
url, err := r.Storage.Symlink(artifact, "latest.tar.gz")
|
||||
url, err := r.Storage.Symlink(*artifact, "latest.tar.gz")
|
||||
if err != nil {
|
||||
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason,
|
||||
"Failed to update status URL symlink: %s", err)
|
||||
r.eventLogf(ctx, obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason,
|
||||
"failed to update status URL symlink: %s", err)
|
||||
}
|
||||
if url != "" {
|
||||
obj.Status.URL = url
|
||||
}
|
||||
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
|
||||
return sreconcile.ResultSuccess, nil
|
||||
}
|
||||
|
||||
// reconcileDelete handles the deletion of an object. It first garbage collects all artifacts for the object from the
|
||||
// artifact storage, if successful, the finalizer is removed from the object.
|
||||
func (r *BucketReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
|
||||
// func (r *BucketReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.Bucket) (ctrl.Result, error) {
|
||||
func (r *BucketReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.Bucket) (sreconcile.Result, error) {
|
||||
// Garbage collect the resource's artifacts
|
||||
if err := r.garbageCollect(ctx, obj); err != nil {
|
||||
// Return the error so we retry the failed garbage collection
|
||||
return ctrl.Result{}, err
|
||||
return sreconcile.ResultEmpty, err
|
||||
}
|
||||
|
||||
// Remove our finalizer from the list
|
||||
controllerutil.RemoveFinalizer(obj, sourcev1.SourceFinalizer)
|
||||
|
||||
// Stop reconciliation as the object is being deleted
|
||||
return ctrl.Result{}, nil
|
||||
return sreconcile.ResultEmpty, nil
|
||||
}
|
||||
|
||||
// garbageCollect performs a garbage collection for the given v1beta1.Bucket. It removes all but the current
|
||||
|
@ -681,23 +734,26 @@ func (r *BucketReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.Bu
|
|||
func (r *BucketReconciler) garbageCollect(ctx context.Context, obj *sourcev1.Bucket) error {
|
||||
if !obj.DeletionTimestamp.IsZero() {
|
||||
if err := r.Storage.RemoveAll(r.Storage.NewArtifactFor(obj.Kind, obj.GetObjectMeta(), "", "*")); err != nil {
|
||||
r.Eventf(obj, corev1.EventTypeWarning, "GarbageCollectionFailed",
|
||||
"Garbage collection for deleted resource failed: %s", err)
|
||||
return err
|
||||
return &serror.Event{
|
||||
Err: fmt.Errorf("garbage collection for deleted resource failed: %s", err),
|
||||
Reason: "GarbageCollectionFailed",
|
||||
}
|
||||
}
|
||||
obj.Status.Artifact = nil
|
||||
// TODO(hidde): we should only push this event if we actually garbage collected something
|
||||
r.Eventf(obj, corev1.EventTypeNormal, "GarbageCollectionSucceeded",
|
||||
"Garbage collected artifacts for deleted resource")
|
||||
r.eventLogf(ctx, obj, corev1.EventTypeNormal, "GarbageCollectionSucceeded",
|
||||
"garbage collected artifacts for deleted resource")
|
||||
return nil
|
||||
}
|
||||
if obj.GetArtifact() != nil {
|
||||
if err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
|
||||
r.Eventf(obj, corev1.EventTypeNormal, "GarbageCollectionFailed", "Garbage collection of old artifacts failed: %s", err)
|
||||
return err
|
||||
return &serror.Event{
|
||||
Err: fmt.Errorf("garbage collection of old artifacts failed: %s", err),
|
||||
Reason: "GarbageCollectionFailed",
|
||||
}
|
||||
}
|
||||
// TODO(hidde): we should only push this event if we actually garbage collected something
|
||||
r.Eventf(obj, corev1.EventTypeNormal, "GarbageCollectionSucceeded", "Garbage collected old artifacts")
|
||||
r.eventLogf(ctx, obj, corev1.EventTypeNormal, "GarbageCollectionSucceeded", "garbage collected old artifacts")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -771,3 +827,17 @@ func (i etagIndex) Revision() (string, error) {
|
|||
}
|
||||
return fmt.Sprintf("%x", sum.Sum(nil)), nil
|
||||
}
|
||||
|
||||
// eventLog records event and logs at the same time. This log is different from
|
||||
// the debug log in the event recorder in the sense that this is a simple log,
|
||||
// the event recorder debug log contains complete details about the event.
|
||||
func (r *BucketReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) {
|
||||
msg := fmt.Sprintf(messageFmt, args...)
|
||||
// Log and emit event.
|
||||
if eventType == corev1.EventTypeWarning {
|
||||
ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg)
|
||||
} else {
|
||||
ctrl.LoggerFrom(ctx).Info(msg)
|
||||
}
|
||||
r.Eventf(obj, eventType, reason, msg)
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/darkowlzz/controller-check/status"
|
||||
"github.com/go-logr/logr"
|
||||
. "github.com/onsi/gomega"
|
||||
raw "google.golang.org/api/storage/v1"
|
||||
|
@ -38,7 +39,6 @@ import (
|
|||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||||
|
@ -46,7 +46,8 @@ import (
|
|||
"github.com/fluxcd/pkg/apis/meta"
|
||||
"github.com/fluxcd/pkg/runtime/conditions"
|
||||
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
|
||||
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
|
||||
)
|
||||
|
||||
// Environment variable to set the GCP Storage host for the GCP client.
|
||||
|
@ -126,6 +127,11 @@ func TestBucketReconciler_Reconcile(t *testing.T) {
|
|||
obj.Generation == obj.Status.ObservedGeneration
|
||||
}, timeout).Should(BeTrue())
|
||||
|
||||
// Check if the object status is valid.
|
||||
condns := &status.Conditions{NegativePolarity: bucketReadyDepsNegative}
|
||||
checker := status.NewChecker(testEnv.Client, testEnv.GetScheme(), condns)
|
||||
checker.CheckErr(ctx, obj)
|
||||
|
||||
g.Expect(testEnv.Delete(ctx, obj)).To(Succeed())
|
||||
|
||||
// Wait for Bucket to be deleted
|
||||
|
@ -141,7 +147,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
|
|||
tests := []struct {
|
||||
name string
|
||||
beforeFunc func(obj *sourcev1.Bucket, storage *Storage) error
|
||||
want ctrl.Result
|
||||
want sreconcile.Result
|
||||
wantErr bool
|
||||
assertArtifact *sourcev1.Artifact
|
||||
assertConditions []metav1.Condition
|
||||
|
@ -167,6 +173,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
|
|||
testStorage.SetArtifactURL(obj.Status.Artifact)
|
||||
return nil
|
||||
},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertArtifact: &sourcev1.Artifact{
|
||||
Path: "/reconcile-storage/c.txt",
|
||||
Revision: "c",
|
||||
|
@ -189,12 +196,12 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
|
|||
testStorage.SetArtifactURL(obj.Status.Artifact)
|
||||
return nil
|
||||
},
|
||||
want: ctrl.Result{Requeue: true},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertPaths: []string{
|
||||
"!/reconcile-storage/invalid.txt",
|
||||
},
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage"),
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, "NoArtifact", "No artifact for resource in storage"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -214,6 +221,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
|
|||
}
|
||||
return nil
|
||||
},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertPaths: []string{
|
||||
"/reconcile-storage/hostname.txt",
|
||||
},
|
||||
|
@ -243,7 +251,9 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
|
|||
g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed())
|
||||
}
|
||||
|
||||
got, err := r.reconcileStorage(context.TODO(), obj)
|
||||
var artifact sourcev1.Artifact
|
||||
|
||||
got, err := r.reconcileStorage(context.TODO(), obj, &artifact, "")
|
||||
g.Expect(err != nil).To(Equal(tt.wantErr))
|
||||
g.Expect(got).To(Equal(tt.want))
|
||||
|
||||
|
@ -273,7 +283,7 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
|
|||
middleware http.Handler
|
||||
secret *corev1.Secret
|
||||
beforeFunc func(obj *sourcev1.Bucket)
|
||||
want ctrl.Result
|
||||
want sreconcile.Result
|
||||
wantErr bool
|
||||
assertArtifact sourcev1.Artifact
|
||||
assertConditions []metav1.Condition
|
||||
|
@ -289,12 +299,14 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
|
|||
LastModified: time.Now(),
|
||||
},
|
||||
},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertArtifact: sourcev1.Artifact{
|
||||
Path: "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz",
|
||||
Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8",
|
||||
},
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"),
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"),
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"),
|
||||
},
|
||||
},
|
||||
// TODO(hidde): middleware for mock server
|
||||
|
@ -312,7 +324,7 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
|
|||
},
|
||||
wantErr: true,
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "Failed to get secret '/dummy': secrets \"dummy\" not found"),
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -330,7 +342,7 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
|
|||
},
|
||||
wantErr: true,
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to construct S3 client: invalid 'dummy' secret data: required fields"),
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to construct S3 client: invalid 'dummy' secret data: required fields"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -341,7 +353,7 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
|
|||
},
|
||||
wantErr: true,
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "Bucket 'invalid' does not exist"),
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' does not exist"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -352,7 +364,7 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
|
|||
},
|
||||
wantErr: true,
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to verify existence of bucket 'unavailable'"),
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to verify existence of bucket 'unavailable'"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -379,12 +391,14 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
|
|||
LastModified: time.Now(),
|
||||
},
|
||||
},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertArtifact: sourcev1.Artifact{
|
||||
Path: "bucket/test-bucket/94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100.tar.gz",
|
||||
Revision: "94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100",
|
||||
},
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision '94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100'"),
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100'"),
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '94992ae8fb8300723e970e304ea3414266cb414e364ba3f570bb09069f883100'"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -414,12 +428,14 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
|
|||
LastModified: time.Now(),
|
||||
},
|
||||
},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertArtifact: sourcev1.Artifact{
|
||||
Path: "bucket/test-bucket/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.tar.gz",
|
||||
Revision: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
|
||||
},
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -438,6 +454,7 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
|
|||
LastModified: time.Now(),
|
||||
},
|
||||
},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertArtifact: sourcev1.Artifact{
|
||||
Path: "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz",
|
||||
Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8",
|
||||
|
@ -448,7 +465,7 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
|
|||
name: "Removes FetchFailedCondition after reconciling source",
|
||||
bucketName: "dummy",
|
||||
beforeFunc: func(obj *sourcev1.Bucket) {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to read test file")
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to read test file")
|
||||
},
|
||||
bucketObjects: []*s3MockObject{
|
||||
{
|
||||
|
@ -458,12 +475,14 @@ func TestBucketReconciler_reconcileMinioSource(t *testing.T) {
|
|||
LastModified: time.Now(),
|
||||
},
|
||||
},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertArtifact: sourcev1.Artifact{
|
||||
Path: "bucket/test-bucket/f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8.tar.gz",
|
||||
Revision: "f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8",
|
||||
},
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"),
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"),
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'f0467900d3cede8323f3e61a1467f7cd370d1c0d942ff990a1a7be1eb1a231e8'"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -534,7 +553,7 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) {
|
|||
bucketObjects []*gcpMockObject
|
||||
secret *corev1.Secret
|
||||
beforeFunc func(obj *sourcev1.Bucket)
|
||||
want ctrl.Result
|
||||
want sreconcile.Result
|
||||
wantErr bool
|
||||
assertArtifact sourcev1.Artifact
|
||||
assertConditions []metav1.Condition
|
||||
|
@ -564,12 +583,14 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) {
|
|||
Name: "dummy",
|
||||
}
|
||||
},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertArtifact: sourcev1.Artifact{
|
||||
Path: "bucket/test-bucket/23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8.tar.gz",
|
||||
Revision: "23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8",
|
||||
},
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision '23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8'"),
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8'"),
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8'"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -580,9 +601,10 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) {
|
|||
Name: "dummy",
|
||||
}
|
||||
},
|
||||
want: sreconcile.ResultEmpty,
|
||||
wantErr: true,
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "Failed to get secret '/dummy': secrets \"dummy\" not found"),
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -598,9 +620,10 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) {
|
|||
Name: "dummy",
|
||||
}
|
||||
},
|
||||
want: sreconcile.ResultEmpty,
|
||||
wantErr: true,
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to construct GCP client: invalid 'dummy' secret data: required fields"),
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to construct GCP client: invalid 'dummy' secret data: required fields"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -609,9 +632,10 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) {
|
|||
beforeFunc: func(obj *sourcev1.Bucket) {
|
||||
obj.Spec.BucketName = "invalid"
|
||||
},
|
||||
want: sreconcile.ResultEmpty,
|
||||
wantErr: true,
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "Bucket 'invalid' does not exist"),
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' does not exist"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -620,9 +644,10 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) {
|
|||
obj.Spec.Endpoint = "transient.example.com"
|
||||
obj.Spec.BucketName = "unavailable"
|
||||
},
|
||||
want: sreconcile.ResultEmpty,
|
||||
wantErr: true,
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to verify existence of bucket 'unavailable'"),
|
||||
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to verify existence of bucket 'unavailable'"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -645,12 +670,14 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) {
|
|||
ContentType: "text/plain",
|
||||
},
|
||||
},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertArtifact: sourcev1.Artifact{
|
||||
Path: "bucket/test-bucket/7556d9ebaa9bcf1b24f363a6d5543af84403acb340fe1eaaf31dcdb0a6e6b4d4.tar.gz",
|
||||
Revision: "7556d9ebaa9bcf1b24f363a6d5543af84403acb340fe1eaaf31dcdb0a6e6b4d4",
|
||||
},
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision '7556d9ebaa9bcf1b24f363a6d5543af84403acb340fe1eaaf31dcdb0a6e6b4d4'"),
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '7556d9ebaa9bcf1b24f363a6d5543af84403acb340fe1eaaf31dcdb0a6e6b4d4'"),
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '7556d9ebaa9bcf1b24f363a6d5543af84403acb340fe1eaaf31dcdb0a6e6b4d4'"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -677,12 +704,14 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) {
|
|||
ContentType: "text/plain",
|
||||
},
|
||||
},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertArtifact: sourcev1.Artifact{
|
||||
Path: "bucket/test-bucket/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.tar.gz",
|
||||
Revision: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
|
||||
},
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -700,6 +729,7 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) {
|
|||
ContentType: "text/plain",
|
||||
},
|
||||
},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertArtifact: sourcev1.Artifact{
|
||||
Path: "bucket/test-bucket/23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8.tar.gz",
|
||||
Revision: "23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8",
|
||||
|
@ -710,7 +740,7 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) {
|
|||
name: "Removes FetchFailedCondition after reconciling source",
|
||||
bucketName: "dummy",
|
||||
beforeFunc: func(obj *sourcev1.Bucket) {
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "Failed to read test file")
|
||||
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to read test file")
|
||||
},
|
||||
bucketObjects: []*gcpMockObject{
|
||||
{
|
||||
|
@ -719,12 +749,14 @@ func TestBucketReconciler_reconcileGCPSource(t *testing.T) {
|
|||
ContentType: "text/plain",
|
||||
},
|
||||
},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertArtifact: sourcev1.Artifact{
|
||||
Path: "bucket/test-bucket/23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8.tar.gz",
|
||||
Revision: "23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8",
|
||||
},
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New upstream revision '23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8'"),
|
||||
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8'"),
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '23d97ef9557996c9d911df4359d6086eda7bec5af76e43651581d80f5bcad4b8'"),
|
||||
},
|
||||
},
|
||||
// TODO: Middleware for mock server to test authentication using secret.
|
||||
|
@ -802,7 +834,7 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
|
|||
name string
|
||||
beforeFunc func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string)
|
||||
afterFunc func(t *WithT, obj *sourcev1.Bucket, dir string)
|
||||
want ctrl.Result
|
||||
want sreconcile.Result
|
||||
wantErr bool
|
||||
assertConditions []metav1.Condition
|
||||
}{
|
||||
|
@ -811,9 +843,10 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
|
|||
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
|
||||
obj.Spec.Interval = metav1.Duration{Duration: interval}
|
||||
},
|
||||
want: ctrl.Result{RequeueAfter: interval},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
|
||||
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'existing'"),
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'existing'"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -825,20 +858,9 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
|
|||
afterFunc: func(t *WithT, obj *sourcev1.Bucket, dir string) {
|
||||
t.Expect(obj.Status.URL).To(BeEmpty())
|
||||
},
|
||||
want: ctrl.Result{RequeueAfter: interval},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Removes ArtifactUnavailableCondition after creating artifact",
|
||||
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
|
||||
obj.Spec.Interval = metav1.Duration{Duration: interval}
|
||||
conditions.MarkTrue(obj, sourcev1.ArtifactUnavailableCondition, "Foo", "")
|
||||
},
|
||||
want: ctrl.Result{RequeueAfter: interval},
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
|
||||
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'existing'"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -847,9 +869,10 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
|
|||
obj.Spec.Interval = metav1.Duration{Duration: interval}
|
||||
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "Foo", "")
|
||||
},
|
||||
want: ctrl.Result{RequeueAfter: interval},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
|
||||
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'existing'"),
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'existing'"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -864,9 +887,10 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
|
|||
t.Expect(err).NotTo(HaveOccurred())
|
||||
t.Expect(localPath).To(Equal(targetFile))
|
||||
},
|
||||
want: ctrl.Result{RequeueAfter: interval},
|
||||
want: sreconcile.ResultSuccess,
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
|
||||
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact for revision 'existing'"),
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'existing'"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -874,7 +898,11 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
|
|||
beforeFunc: func(t *WithT, obj *sourcev1.Bucket, artifact sourcev1.Artifact, dir string) {
|
||||
t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred())
|
||||
},
|
||||
want: sreconcile.ResultEmpty,
|
||||
wantErr: true,
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'existing'"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Dir path is not a directory",
|
||||
|
@ -889,7 +917,11 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
|
|||
afterFunc: func(t *WithT, obj *sourcev1.Bucket, dir string) {
|
||||
t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred())
|
||||
},
|
||||
want: sreconcile.ResultEmpty,
|
||||
wantErr: true,
|
||||
assertConditions: []metav1.Condition{
|
||||
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'existing'"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -929,7 +961,7 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
|
|||
|
||||
dlog := log.NewDelegatingLogSink(log.NullLogSink{})
|
||||
nullLogger := logr.New(dlog)
|
||||
got, err := r.reconcileArtifact(logr.NewContext(ctx, nullLogger), obj, artifact, tmpDir)
|
||||
got, err := r.reconcileArtifact(logr.NewContext(ctx, nullLogger), obj, &artifact, tmpDir)
|
||||
g.Expect(err != nil).To(Equal(tt.wantErr))
|
||||
g.Expect(got).To(Equal(tt.want))
|
||||
|
||||
|
|
Loading…
Reference in New Issue