From ccf0b624a7c4128a2904e472af5d648eb8217a0d Mon Sep 17 00:00:00 2001
From: Sunny
Date: Fri, 25 Nov 2022 13:43:40 +0000
Subject: [PATCH] Add progressive status in bucket reconciler

Signed-off-by: Sunny
---
 controllers/bucket_controller.go      |  71 +++++--
 controllers/bucket_controller_test.go | 257 ++++++++++++++++++++++----
 2 files changed, 276 insertions(+), 52 deletions(-)

diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go
index 71360dd7..e2f9343e 100644
--- a/controllers/bucket_controller.go
+++ b/controllers/bucket_controller.go
@@ -47,6 +47,7 @@ import (
 	helper "github.com/fluxcd/pkg/runtime/controller"
 	"github.com/fluxcd/pkg/runtime/patch"
 	"github.com/fluxcd/pkg/runtime/predicates"
+	rreconcile "github.com/fluxcd/pkg/runtime/reconcile"
 
 	eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
 	"github.com/fluxcd/pkg/sourceignore"
@@ -119,6 +120,8 @@ type BucketReconciler struct {
 
 	Storage        *Storage
 	ControllerName string
+
+	patchOptions []patch.Option
 }
 
 type BucketReconcilerOptions struct {
@@ -151,7 +154,7 @@ type BucketProvider interface {
 // bucketReconcileFunc is the function type for all the v1beta2.Bucket
 // (sub)reconcile functions. The type implementations are grouped and
 // executed serially to perform the complete reconcile of the object.
-type bucketReconcileFunc func(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error)
+type bucketReconcileFunc func(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error)
 
 // etagIndex is an index of storage object keys and their Etag values.
 type etagIndex struct {
@@ -234,6 +237,8 @@ func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
 }
 
 func (r *BucketReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts BucketReconcilerOptions) error {
+	r.patchOptions = getPatchOptions(bucketReadyCondition.Owned, r.ControllerName)
+
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&sourcev1.Bucket{}).
 		WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{})).
@@ -259,10 +264,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
 	r.RecordSuspend(ctx, obj, obj.Spec.Suspend)
 
 	// Initialize the patch helper with the current version of the object.
-	patchHelper, err := patch.NewHelper(obj, r.Client)
-	if err != nil {
-		return ctrl.Result{}, err
-	}
+	serialPatcher := patch.NewSerialPatcher(obj, r.Client)
 
 	// recResult stores the abstracted reconcile result.
 	var recResult sreconcile.Result
@@ -270,7 +272,7 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
 	// Always attempt to patch the object and status after each reconciliation
 	// NOTE: The final runtime result and error are set in this block.
 	defer func() {
-		summarizeHelper := summarize.NewHelper(r.EventRecorder, patchHelper)
+		summarizeHelper := summarize.NewHelper(r.EventRecorder, serialPatcher)
 		summarizeOpts := []summarize.Option{
 			summarize.WithConditions(bucketReadyCondition),
 			summarize.WithReconcileResult(recResult),
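Editor's note: the new patchOptions field is populated from getPatchOptions, a source-controller helper that is not part of this diff. As a rough, hypothetical sketch of what such a helper typically assembles from the fluxcd/pkg/runtime/patch options (the real implementation may differ):

package controllers

import "github.com/fluxcd/pkg/runtime/patch"

// getPatchOptionsSketch is illustrative only: it bundles the condition types
// this controller owns with a field owner name for the patch requests.
func getPatchOptionsSketch(ownedConditions []string, controllerName string) []patch.Option {
	return []patch.Option{
		// Only take ownership of the listed status conditions.
		patch.WithOwnedConditions{Conditions: ownedConditions},
		// Record the controller as field owner of the patched fields.
		patch.WithFieldOwner(controllerName),
	}
}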
@@ -316,19 +318,35 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
 		r.reconcileSource,
 		r.reconcileArtifact,
 	}
-	recResult, retErr = r.reconcile(ctx, obj, reconcilers)
+	recResult, retErr = r.reconcile(ctx, serialPatcher, obj, reconcilers)
 	return
 }
 
 // reconcile iterates through the bucketReconcileFunc tasks for the
 // object. It returns early on the first call that returns
 // reconcile.ResultRequeue, or produces an error.
-func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, reconcilers []bucketReconcileFunc) (sreconcile.Result, error) {
+func (r *BucketReconciler) reconcile(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, reconcilers []bucketReconcileFunc) (sreconcile.Result, error) {
 	oldObj := obj.DeepCopy()
 
-	// Mark as reconciling if generation differs.
-	if obj.Generation != obj.Status.ObservedGeneration {
-		conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
+	rreconcile.ProgressiveStatus(false, obj, meta.ProgressingReason, "reconciliation in progress")
+
+	var recAtVal string
+	if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
+		recAtVal = v
+	}
+
+	// Persist reconciling if generation differs or reconciliation is requested.
+	switch {
+	case obj.Generation != obj.Status.ObservedGeneration:
+		rreconcile.ProgressiveStatus(false, obj, meta.ProgressingReason,
+			"processing object: new generation %d -> %d", obj.Status.ObservedGeneration, obj.Generation)
+		if err := sp.Patch(ctx, obj, r.patchOptions...); err != nil {
+			return sreconcile.ResultEmpty, err
+		}
+	case recAtVal != obj.Status.GetLastHandledReconcileRequest():
+		if err := sp.Patch(ctx, obj, r.patchOptions...); err != nil {
+			return sreconcile.ResultEmpty, err
+		}
 	}
 
 	// Create temp working dir
@@ -356,7 +374,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
 	)
 
 	for _, rec := range reconcilers {
-		recResult, err := rec(ctx, obj, index, tmpDir)
+		recResult, err := rec(ctx, sp, obj, index, tmpDir)
 		// Exit immediately on ResultRequeue.
 		if recResult == sreconcile.ResultRequeue {
 			return sreconcile.ResultRequeue, nil
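Editor's note: the switch above patches intermediate status when either the generation changed or a manual reconcile request has not been handled yet. A minimal, illustrative sketch of that second check, using the same meta helpers as the patch (the surrounding function is hypothetical):

package example

import (
	"github.com/fluxcd/pkg/apis/meta"

	sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
)

// reconcileRequested reports whether a reconcile request annotation is
// present and has not yet been recorded in status.lastHandledReconcileAt.
func reconcileRequested(obj *sourcev1.Bucket) bool {
	v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations())
	if !ok {
		return false
	}
	return v != obj.Status.GetLastHandledReconcileRequest()
}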
@@ -421,22 +439,31 @@ func (r *BucketReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.
 // condition is added.
 // The hostname of any URL in the Status of the object are updated, to ensure
 // they match the Storage server hostname of current runtime.
-func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket, _ *etagIndex, _ string) (sreconcile.Result, error) {
+func (r *BucketReconciler) reconcileStorage(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, _ *etagIndex, _ string) (sreconcile.Result, error) {
 	// Garbage collect previous advertised artifact(s) from storage
 	_ = r.garbageCollect(ctx, obj)
 
 	// Determine if the advertised artifact is still in storage
+	var artifactMissing bool
 	if artifact := obj.GetArtifact(); artifact != nil && !r.Storage.ArtifactExist(*artifact) {
 		obj.Status.Artifact = nil
 		obj.Status.URL = ""
+		artifactMissing = true
 		// Remove the condition as the artifact doesn't exist.
 		conditions.Delete(obj, sourcev1.ArtifactInStorageCondition)
 	}
 
 	// Record that we do not have an artifact
 	if obj.GetArtifact() == nil {
-		conditions.MarkReconciling(obj, "NoArtifact", "no artifact for resource in storage")
+		msg := "building artifact"
+		if artifactMissing {
+			msg += ": disappeared from storage"
+		}
+		rreconcile.ProgressiveStatus(true, obj, meta.ProgressingReason, msg)
 		conditions.Delete(obj, sourcev1.ArtifactInStorageCondition)
+		if err := sp.Patch(ctx, obj, r.patchOptions...); err != nil {
+			return sreconcile.ResultEmpty, err
+		}
 		return sreconcile.ResultSuccess, nil
 	}
@@ -453,7 +480,7 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.B
 // When a SecretRef is defined, it attempts to fetch the Secret before calling
 // the provider. If this fails, it records v1beta2.FetchFailedCondition=True on
 // the object and returns early.
-func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
+func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
 	secret, err := r.getBucketSecret(ctx, obj)
 	if err != nil {
 		e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason}
@@ -528,8 +555,14 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bu
 		if !obj.GetArtifact().HasRevision(revision) {
 			message := fmt.Sprintf("new upstream revision '%s'", revision)
-			conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
-			conditions.MarkReconciling(obj, "NewRevision", message)
+			if obj.GetArtifact() != nil {
+				conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
+			}
+			rreconcile.ProgressiveStatus(true, obj, meta.ProgressingReason, "building artifact: %s", message)
+			if err := sp.Patch(ctx, obj, r.patchOptions...); err != nil {
+				ctrl.LoggerFrom(ctx).Error(err, "failed to patch")
+				return
+			}
 		}
 	}()
@@ -554,7 +587,7 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bu
 // early.
 // On a successful archive, the Artifact in the Status of the object is set,
 // and the symlink in the Storage is updated to its path.
-func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
+func (r *BucketReconciler) reconcileArtifact(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
 	// Calculate revision
 	revision, err := index.Revision()
 	if err != nil {
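Editor's note: judging by the test expectations later in this patch, rreconcile.ProgressiveStatus with its first argument set to true appears to mark the object Reconciling=True with the Progressing reason and reset Ready to Unknown with the same message. A small illustrative sketch of that observed outcome, using the condition helpers that also appear elsewhere in this diff:

package example

import (
	"github.com/fluxcd/pkg/apis/meta"
	"github.com/fluxcd/pkg/runtime/conditions"
	rreconcile "github.com/fluxcd/pkg/runtime/reconcile"

	sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
)

// buildingArtifactStatus shows the condition pair the tests in this patch
// expect after a "building artifact" progressive status update.
func buildingArtifactStatus(obj *sourcev1.Bucket, msg string) (reconciling, readyUnknown bool) {
	// With the first argument true, both Reconciling and Ready are touched
	// (an assumption drawn from the assertions in the tests below).
	rreconcile.ProgressiveStatus(true, obj, meta.ProgressingReason, "building artifact: %s", msg)
	reconciling = conditions.IsTrue(obj, meta.ReconcilingCondition)
	readyUnknown = conditions.IsUnknown(obj, meta.ReadyCondition)
	return
}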
@@ -572,7 +605,7 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.
 		if obj.GetArtifact().HasRevision(artifact.Revision) {
 			conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition)
 			conditions.MarkTrue(obj, sourcev1.ArtifactInStorageCondition, meta.SucceededReason,
-				"stored artifact for revision '%s'", artifact.Revision)
+				"stored artifact: revision '%s'", artifact.Revision)
 		}
 	}()
diff --git a/controllers/bucket_controller_test.go b/controllers/bucket_controller_test.go
index 883f0864..0593c608 100644
--- a/controllers/bucket_controller_test.go
+++ b/controllers/bucket_controller_test.go
@@ -185,6 +185,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
 					}
 				}
 				testStorage.SetArtifactURL(obj.Status.Artifact)
+				conditions.MarkTrue(obj, meta.ReadyCondition, "foo", "bar")
 				return nil
 			},
 			assertArtifact: &sourcev1.Artifact{
@@ -201,6 +202,17 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
 				"!/reconcile-storage/a.txt",
 			},
 			want: sreconcile.ResultSuccess,
+			assertConditions: []metav1.Condition{
+				*conditions.TrueCondition(meta.ReadyCondition, "foo", "bar"),
+			},
+		},
+		{
+			name: "build artifact first time",
+			want: sreconcile.ResultSuccess,
+			assertConditions: []metav1.Condition{
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact"),
+				*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact"),
+			},
 		},
 		{
 			name: "notices missing artifact in storage",
@@ -217,7 +229,8 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
 				"!/reconcile-storage/invalid.txt",
 			},
 			assertConditions: []metav1.Condition{
-				*conditions.TrueCondition(meta.ReconcilingCondition, "NoArtifact", "no artifact for resource in storage"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: disappeared from storage"),
+				*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: disappeared from storage"),
 			},
 		},
 		{
@@ -235,6 +248,7 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
 				if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0o640); err != nil {
 					return err
 				}
+				conditions.MarkTrue(obj, meta.ReadyCondition, "foo", "bar")
 				return nil
 			},
 			want: sreconcile.ResultSuccess,
@@ -248,6 +262,9 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
 				URL:      testStorage.Hostname + "/reconcile-storage/hostname.txt",
 				Size:     int64p(int64(len("file"))),
 			},
+			assertConditions: []metav1.Condition{
+				*conditions.TrueCondition(meta.ReadyCondition, "foo", "bar"),
+			},
 		},
 	}
 	for _, tt := range tests {
@@ -259,22 +276,31 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
 			}()
 
 			r := &BucketReconciler{
+				Client:        fakeclient.NewClientBuilder().WithScheme(testEnv.GetScheme()).Build(),
 				EventRecorder: record.NewFakeRecorder(32),
 				Storage:       testStorage,
+				patchOptions:  getPatchOptions(bucketReadyCondition.Owned, "sc"),
 			}
 
 			obj := &sourcev1.Bucket{
 				ObjectMeta: metav1.ObjectMeta{
 					GenerateName: "test-",
+					Generation:   1,
 				},
 			}
 			if tt.beforeFunc != nil {
 				g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed())
 			}
 
-			index := newEtagIndex()
+			g.Expect(r.Client.Create(context.TODO(), obj)).ToNot(HaveOccurred())
+			defer func() {
+				g.Expect(r.Client.Delete(context.TODO(), obj)).ToNot(HaveOccurred())
+			}()
 
-			got, err := r.reconcileStorage(context.TODO(), obj, index, "")
+			index := newEtagIndex()
+			sp := patch.NewSerialPatcher(obj, r.Client)
+
+			got, err := r.reconcileStorage(context.TODO(), sp, obj, index, "")
 			g.Expect(err != nil).To(Equal(tt.wantErr))
 			g.Expect(got).To(Equal(tt.want))
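Editor's note: the tests now create the object on a fake client and set Generation to 1 before exercising the reconciler, which seems necessary because the SerialPatcher issues real patch requests, so the object must exist on the (fake) API server. A condensed, hypothetical version of that scaffolding with a locally built scheme:

package controllers

import (
	"context"
	"testing"

	. "github.com/onsi/gomega"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"

	"github.com/fluxcd/pkg/runtime/patch"
	sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
)

func ExampleSerialPatcherScaffolding(t *testing.T) {
	g := NewWithT(t)

	// Register the Bucket type with a fresh scheme (the real tests reuse
	// testEnv.GetScheme() instead).
	scheme := runtime.NewScheme()
	g.Expect(sourcev1.AddToScheme(scheme)).To(Succeed())
	c := fakeclient.NewClientBuilder().WithScheme(scheme).Build()

	obj := &sourcev1.Bucket{
		ObjectMeta: metav1.ObjectMeta{Name: "test-bucket", Generation: 1},
	}

	// The object must exist before SerialPatcher can patch it.
	g.Expect(c.Create(context.TODO(), obj)).To(Succeed())
	defer func() {
		g.Expect(c.Delete(context.TODO(), obj)).To(Succeed())
	}()

	sp := patch.NewSerialPatcher(obj, c)
	_ = sp // hand sp to the (sub)reconciler under test
}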
@@ -292,6 +318,10 @@ func TestBucketReconciler_reconcileStorage(t *testing.T) {
 				}
 				g.Expect(absoluteP).NotTo(BeAnExistingFile())
 			}
+
+			// In-progress status condition validity.
+			checker := conditionscheck.NewInProgressChecker(r.Client)
+			checker.CheckErr(ctx, obj)
 		})
 	}
 }
@@ -327,8 +357,8 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
 				},
 			},
 			assertConditions: []metav1.Condition{
-				*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
-				*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
+				*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
 			},
 		},
 		// TODO(hidde): middleware for mock server
@@ -343,11 +373,15 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
 				obj.Spec.SecretRef = &meta.LocalObjectReference{
 					Name: "dummy",
 				}
+				conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
+				conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
 			},
 			wantErr:     true,
 			assertIndex: newEtagIndex(),
 			assertConditions: []metav1.Condition{
 				*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
+				*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
 			},
 		},
 		{
@@ -362,11 +396,15 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
 				obj.Spec.SecretRef = &meta.LocalObjectReference{
 					Name: "dummy",
 				}
+				conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
+				conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
 			},
 			wantErr:     true,
 			assertIndex: newEtagIndex(),
 			assertConditions: []metav1.Condition{
 				*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "invalid 'dummy' secret data: required fields 'accesskey' and 'secretkey'"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
+				*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
 			},
 		},
 		{
@@ -374,11 +412,15 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
 			bucketName: "dummy",
 			beforeFunc: func(obj *sourcev1.Bucket) {
 				obj.Spec.BucketName = "invalid"
+				conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
+				conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
 			},
 			wantErr:     true,
 			assertIndex: newEtagIndex(),
 			assertConditions: []metav1.Condition{
 				*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' not found"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
+				*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
 			},
 		},
 		{
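Editor's note: the failure-path cases above seed Reconciling=True ("foo") and Ready=Unknown ("foo"/"bar") in beforeFunc and assert that exactly those conditions come back, i.e. a failed fetch must not wipe in-progress status. A compact illustration of the helpers used for that seeding and for the assertions (same identifiers as in the diff):

package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/fluxcd/pkg/apis/meta"
	"github.com/fluxcd/pkg/runtime/conditions"
	sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
)

// seedInProgress mimics the beforeFunc seeding in the tests above.
func seedInProgress(obj *sourcev1.Bucket) {
	conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
	conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
}

// expectedInProgress mirrors the assertConditions entries: the seeded
// conditions are expected back unchanged on failure paths.
func expectedInProgress() []metav1.Condition {
	return []metav1.Condition{
		*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
		*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
	}
}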
"bar") }, wantErr: true, assertIndex: newEtagIndex(), assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to confirm existence of 'unavailable' bucket"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), + *conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"), }, }, { @@ -423,8 +469,8 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"), }, }, { @@ -462,8 +508,8 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { }, }, assertConditions: []metav1.Condition{ - *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"), }, }, { @@ -473,6 +519,8 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { obj.Status.Artifact = &sourcev1.Artifact{ Revision: "b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479", } + conditions.MarkReconciling(obj, meta.ProgressingReason, "foo") + conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar") }, bucketObjects: []*s3mock.Object{ { @@ -488,7 +536,10 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { "test.txt": "098f6bcd4621d373cade4e832627b4f6", }, }, - assertConditions: []metav1.Condition{}, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), + *conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"), + }, }, { name: "Removes FetchFailedCondition after reconciling source", @@ -510,9 +561,38 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { "test.txt": "098f6bcd4621d373cade4e832627b4f6", }, }, + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + }, + }, + { + name: "Existing artifact makes 
ArtifactOutdated=True", + bucketName: "dummy", + bucketObjects: []*s3mock.Object{ + { + Key: "test.txt", + Content: []byte("test"), + ContentType: "text/plain", + LastModified: time.Now(), + }, + }, + beforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{ + Path: "some-path", + Revision: "some-rev", + } + }, + want: sreconcile.ResultSuccess, + assertIndex: &etagIndex{ + index: map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + }, + }, assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, } @@ -528,6 +608,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { EventRecorder: record.NewFakeRecorder(32), Client: builder.Build(), Storage: testStorage, + patchOptions: getPatchOptions(bucketReadyCondition.Owned, "sc"), } tmpDir := t.TempDir() @@ -536,7 +617,8 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { Kind: sourcev1.BucketKind, }, ObjectMeta: metav1.ObjectMeta{ - Name: "test-bucket", + Name: "test-bucket", + Generation: 1, }, Spec: sourcev1.BucketSpec{ Timeout: &metav1.Duration{Duration: timeout}, @@ -563,14 +645,24 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { tt.beforeFunc(obj) } - index := newEtagIndex() + g.Expect(r.Client.Create(context.TODO(), obj)).ToNot(HaveOccurred()) + defer func() { + g.Expect(r.Client.Delete(context.TODO(), obj)).ToNot(HaveOccurred()) + }() - got, err := r.reconcileSource(context.TODO(), obj, index, tmpDir) + index := newEtagIndex() + sp := patch.NewSerialPatcher(obj, r.Client) + + got, err := r.reconcileSource(context.TODO(), sp, obj, index, tmpDir) g.Expect(err != nil).To(Equal(tt.wantErr)) g.Expect(got).To(Equal(tt.want)) g.Expect(index.Index()).To(Equal(tt.assertIndex.Index())) g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) + + // In-progress status condition validity. 
@@ -563,14 +645,24 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) {
 				tt.beforeFunc(obj)
 			}
 
-			index := newEtagIndex()
+			g.Expect(r.Client.Create(context.TODO(), obj)).ToNot(HaveOccurred())
+			defer func() {
+				g.Expect(r.Client.Delete(context.TODO(), obj)).ToNot(HaveOccurred())
+			}()
 
-			got, err := r.reconcileSource(context.TODO(), obj, index, tmpDir)
+			index := newEtagIndex()
+			sp := patch.NewSerialPatcher(obj, r.Client)
+
+			got, err := r.reconcileSource(context.TODO(), sp, obj, index, tmpDir)
 			g.Expect(err != nil).To(Equal(tt.wantErr))
 			g.Expect(got).To(Equal(tt.want))
 			g.Expect(index.Index()).To(Equal(tt.assertIndex.Index()))
 			g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
+
+			// In-progress status condition validity.
+			checker := conditionscheck.NewInProgressChecker(r.Client)
+			checker.CheckErr(ctx, obj)
 		})
 	}
 }
@@ -620,8 +712,8 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
 				},
 			},
 			assertConditions: []metav1.Condition{
-				*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
-				*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
+				*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
 			},
 		},
 		{
@@ -631,12 +723,16 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
 				obj.Spec.SecretRef = &meta.LocalObjectReference{
 					Name: "dummy",
 				}
+				conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
+				conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
 			},
 			want:        sreconcile.ResultEmpty,
 			wantErr:     true,
 			assertIndex: newEtagIndex(),
 			assertConditions: []metav1.Condition{
 				*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
+				*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
 			},
 		},
 		{
@@ -651,12 +747,16 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
 				obj.Spec.SecretRef = &meta.LocalObjectReference{
 					Name: "dummy",
 				}
+				conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
+				conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
 			},
 			want:        sreconcile.ResultEmpty,
 			wantErr:     true,
 			assertIndex: newEtagIndex(),
 			assertConditions: []metav1.Condition{
 				*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "invalid 'dummy' secret data: required fields"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
+				*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
 			},
 		},
 		{
@@ -664,12 +764,16 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
 			bucketName: "dummy",
 			beforeFunc: func(obj *sourcev1.Bucket) {
 				obj.Spec.BucketName = "invalid"
+				conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
+				conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
 			},
 			want:        sreconcile.ResultEmpty,
 			wantErr:     true,
 			assertIndex: newEtagIndex(),
 			assertConditions: []metav1.Condition{
 				*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "bucket 'invalid' not found"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
+				*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
 			},
 		},
 		{
@@ -677,12 +781,16 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
 			beforeFunc: func(obj *sourcev1.Bucket) {
 				obj.Spec.Endpoint = "transient.example.com"
 				obj.Spec.BucketName = "unavailable"
+				conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
+				conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
 			},
 			want:        sreconcile.ResultEmpty,
 			wantErr:     true,
 			assertIndex: newEtagIndex(),
 			assertConditions: []metav1.Condition{
 				*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.BucketOperationFailedReason, "failed to confirm existence of 'unavailable' bucket"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
+				*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
 			},
 		},
 		{
@@ -715,8 +823,8 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
 				},
 			},
 			assertConditions: []metav1.Condition{
-				*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"),
-				*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"),
+				*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision '9fc2ddfc4a6f44e6c3efee40af36578b9e76d4d930eaf384b8435a0aa0bf7a0f'"),
 			},
 		},
 		{
@@ -754,8 +862,8 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
 				},
 			},
 			assertConditions: []metav1.Condition{
-				*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"),
-				*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"),
+				*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision '117f586dc64cfc559329e21d286edcbb94cb6b1581517eaddc0ab5292b470cd5'"),
 			},
 		},
 		{
@@ -765,6 +873,8 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
 				obj.Status.Artifact = &sourcev1.Artifact{
 					Revision: "b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479",
 				}
+				conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
+				conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
 			},
 			bucketObjects: []*gcsmock.Object{
 				{
@@ -780,7 +890,10 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
 					"test.txt": "098f6bcd4621d373cade4e832627b4f6",
 				},
 			},
-			assertConditions: []metav1.Condition{},
+			assertConditions: []metav1.Condition{
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
+				*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
+			},
 		},
 		{
 			name: "Removes FetchFailedCondition after reconciling source",
@@ -802,9 +915,38 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) {
 					"test.txt": "098f6bcd4621d373cade4e832627b4f6",
 				},
 			},
+			assertConditions: []metav1.Condition{
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
+				*conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"),
+			},
+		},
+		{
+			name: "Existing artifact makes ArtifactOutdated=True",
+			bucketName: "dummy",
+			bucketObjects: []*gcsmock.Object{
+				{
+					Key:         "test.txt",
"text/plain", + Content: []byte("test"), + Generation: 3, + }, + }, + beforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{ + Path: "some-path", + Revision: "some-rev", + } + }, + want: sreconcile.ResultSuccess, + assertIndex: &etagIndex{ + index: map[string]string{ + "test.txt": "098f6bcd4621d373cade4e832627b4f6", + }, + }, assertConditions: []metav1.Condition{ *conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), - *conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), + *conditions.UnknownCondition(meta.ReadyCondition, meta.ProgressingReason, "building artifact: new upstream revision 'b4c2a60ce44b67f5b659a95ce4e4cc9e2a86baf13afb72bd397c5384cbc0e479'"), }, }, // TODO: Middleware for mock server to test authentication using secret. @@ -821,6 +963,7 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { EventRecorder: record.NewFakeRecorder(32), Client: builder.Build(), Storage: testStorage, + patchOptions: getPatchOptions(bucketReadyCondition.Owned, "sc"), } tmpDir := t.TempDir() @@ -830,7 +973,8 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { Kind: sourcev1.BucketKind, }, ObjectMeta: metav1.ObjectMeta{ - Name: "test-bucket", + Name: "test-bucket", + Generation: 1, }, Spec: sourcev1.BucketSpec{ BucketName: tt.bucketName, @@ -860,15 +1004,25 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { g.Expect(os.Unsetenv(EnvGcpStorageHost)).ToNot(HaveOccurred()) }() - index := newEtagIndex() + g.Expect(r.Client.Create(context.TODO(), obj)).ToNot(HaveOccurred()) + defer func() { + g.Expect(r.Client.Delete(context.TODO(), obj)).ToNot(HaveOccurred()) + }() - got, err := r.reconcileSource(context.TODO(), obj, index, tmpDir) + index := newEtagIndex() + sp := patch.NewSerialPatcher(obj, r.Client) + + got, err := r.reconcileSource(context.TODO(), sp, obj, index, tmpDir) t.Log(err) g.Expect(err != nil).To(Equal(tt.wantErr)) g.Expect(got).To(Equal(tt.want)) g.Expect(index.Index()).To(Equal(tt.assertIndex.Index())) g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions)) + + // In-progress status condition validity. 
+			checker := conditionscheck.NewInProgressChecker(r.Client)
+			checker.CheckErr(ctx, obj)
 		})
 	}
 }
@@ -886,10 +1040,14 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
 			name: "Archiving artifact to storage makes ArtifactInStorage=True",
 			beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) {
 				obj.Spec.Interval = metav1.Duration{Duration: interval}
+				conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
+				conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
 			},
 			want: sreconcile.ResultSuccess,
 			assertConditions: []metav1.Condition{
-				*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact for revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
+				*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
+				*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
 			},
 		},
 		{
@@ -899,6 +1057,8 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
 				obj.Spec.Interval = metav1.Duration{Duration: interval}
 				// Incomplete artifact
 				obj.Status.Artifact = &sourcev1.Artifact{Revision: revision}
+				conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
+				conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
 			},
 			afterFunc: func(t *WithT, obj *sourcev1.Bucket, dir string) {
 				// Still incomplete
@@ -906,7 +1066,9 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
 			},
 			want: sreconcile.ResultSuccess,
 			assertConditions: []metav1.Condition{
-				*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact for revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
+				*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
+				*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
 			},
 		},
 		{
@@ -914,16 +1076,22 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
 			beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) {
 				obj.Spec.Interval = metav1.Duration{Duration: interval}
 				conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "Foo", "")
+				conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
+				conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
 			},
 			want: sreconcile.ResultSuccess,
 			assertConditions: []metav1.Condition{
-				*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact for revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
+				*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
+				*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
 			},
 		},
 		{
 			name: "Creates latest symlink to the created artifact",
 			beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) {
 				obj.Spec.Interval = metav1.Duration{Duration: interval}
+				conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
+				conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
 			},
 			afterFunc: func(t *WithT, obj *sourcev1.Bucket, dir string) {
 				localPath := testStorage.LocalPath(*obj.GetArtifact())
@@ -934,18 +1102,24 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
 			},
 			want: sreconcile.ResultSuccess,
 			assertConditions: []metav1.Condition{
-				*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact for revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
+				*conditions.TrueCondition(sourcev1.ArtifactInStorageCondition, meta.SucceededReason, "stored artifact: revision 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
+				*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
 			},
 		},
 		{
 			name: "Dir path deleted",
 			beforeFunc: func(t *WithT, obj *sourcev1.Bucket, index *etagIndex, dir string) {
 				t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred())
+				conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
+				conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
 			},
 			want:    sreconcile.ResultEmpty,
 			wantErr: true,
 			assertConditions: []metav1.Condition{
 				*conditions.TrueCondition(sourcev1.StorageOperationFailedCondition, sourcev1.StatOperationFailedReason, "failed to stat source path"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
+				*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
 			},
 		},
 		{
@@ -957,6 +1131,8 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
 				f, err := os.Create(dir)
 				defer f.Close()
 				t.Expect(err).ToNot(HaveOccurred())
+				conditions.MarkReconciling(obj, meta.ProgressingReason, "foo")
+				conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar")
 			},
 			afterFunc: func(t *WithT, obj *sourcev1.Bucket, dir string) {
 				t.Expect(os.RemoveAll(dir)).ToNot(HaveOccurred())
@@ -965,6 +1141,8 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
 			wantErr: true,
 			assertConditions: []metav1.Condition{
 				*conditions.TrueCondition(sourcev1.StorageOperationFailedCondition, sourcev1.InvalidPathReason, "is not a directory"),
+				*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"),
+				*conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"),
 			},
 		},
 	}
@@ -974,8 +1152,10 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
 			g := NewWithT(t)
 
 			r := &BucketReconciler{
+				Client:        fakeclient.NewClientBuilder().WithScheme(testEnv.GetScheme()).Build(),
 				EventRecorder: record.NewFakeRecorder(32),
 				Storage:       testStorage,
+				patchOptions:  getPatchOptions(bucketReadyCondition.Owned, "sc"),
 			}
 			tmpDir := t.TempDir()
@@ -1000,7 +1180,14 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
 				tt.beforeFunc(g, obj, index, tmpDir)
 			}
 
-			got, err := r.reconcileArtifact(context.TODO(), obj, index, tmpDir)
+			g.Expect(r.Client.Create(context.TODO(), obj)).ToNot(HaveOccurred())
+			defer func() {
+				g.Expect(r.Client.Delete(context.TODO(), obj)).ToNot(HaveOccurred())
+			}()
+
+			sp := patch.NewSerialPatcher(obj, r.Client)
+
+			got, err := r.reconcileArtifact(context.TODO(), sp, obj, index, tmpDir)
 			g.Expect(err != nil).To(Equal(tt.wantErr))
 			g.Expect(got).To(Equal(tt.want))
@@ -1011,6 +1198,10 @@ func TestBucketReconciler_reconcileArtifact(t *testing.T) {
 			if tt.afterFunc != nil {
 				tt.afterFunc(g, obj, tmpDir)
 			}
+
+			// In-progress status condition validity.
+			checker := conditionscheck.NewInProgressChecker(r.Client)
+			checker.CheckErr(ctx, obj)
 		})
 	}
 }
@@ -1128,8 +1319,7 @@ func TestBucketReconciler_statusConditions(t *testing.T) {
 			clientBuilder.WithObjects(obj)
 			c := clientBuilder.Build()
 
-			patchHelper, err := patch.NewHelper(obj, c)
-			g.Expect(err).ToNot(HaveOccurred())
+			serialPatcher := patch.NewSerialPatcher(obj, c)
 
 			if tt.beforeFunc != nil {
 				tt.beforeFunc(obj)
 			}
@@ -1139,7 +1329,7 @@ func TestBucketReconciler_statusConditions(t *testing.T) {
 			recResult := sreconcile.ResultSuccess
 			var retErr error
 
-			summarizeHelper := summarize.NewHelper(record.NewFakeRecorder(32), patchHelper)
+			summarizeHelper := summarize.NewHelper(record.NewFakeRecorder(32), serialPatcher)
 			summarizeOpts := []summarize.Option{
 				summarize.WithConditions(bucketReadyCondition),
 				summarize.WithReconcileResult(recResult),
@@ -1247,6 +1437,7 @@ func TestBucketReconciler_notify(t *testing.T) {
 
 		reconciler := &BucketReconciler{
 			EventRecorder: recorder,
+			patchOptions:  getPatchOptions(bucketReadyCondition.Owned, "sc"),
 		}
 		index := &etagIndex{
 			index: map[string]string{