Add notify() in all the reconcilers

notify() is used to emit events for new artifact and failure recovery
scenarios. It's implemented in all the reconcilers.
Previously, when there used to be a failure due to any reason, on a
subsequent successful reconciliation, no notification was sent to
indicate that the failure has been resolved.
With notify(), the old version of the object is compared with the new
version of the object to determine if all, if any, of the failures have
been resolved and a notification is sent. The notification message is
the same that's sent in usual successful source reconciliation message
about stored artifact.

Signed-off-by: Sunny <darkowlzz@protonmail.com>
This commit is contained in:
Sunny 2022-03-18 20:26:01 +05:30
parent 73aa3c4511
commit 5da74ca5a9
No known key found for this signature in database
GPG Key ID: 9F3D25DDFF7FA3CF
10 changed files with 724 additions and 35 deletions

View File

@ -99,6 +99,12 @@ var bucketReadyCondition = summarize.Conditions{
}, },
} }
// bucketFailConditions contains the conditions that represent a failure.
var bucketFailConditions = []string{
sourcev1.FetchFailedCondition,
sourcev1.StorageOperationFailedCondition,
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete
@ -307,10 +313,13 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
return return
} }
// reconcile iterates through the gitRepositoryReconcileFunc tasks for the // reconcile iterates through the bucketReconcileFunc tasks for the
// object. It returns early on the first call that returns // object. It returns early on the first call that returns
// reconcile.ResultRequeue, or produces an error. // reconcile.ResultRequeue, or produces an error.
func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, reconcilers []bucketReconcileFunc) (sreconcile.Result, error) { func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, reconcilers []bucketReconcileFunc) (sreconcile.Result, error) {
oldObj := obj.DeepCopy()
// Mark as reconciling if generation differs.
if obj.Generation != obj.Status.ObservedGeneration { if obj.Generation != obj.Status.ObservedGeneration {
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
} }
@ -355,9 +364,42 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
// Prioritize requeue request in the result. // Prioritize requeue request in the result.
res = sreconcile.LowestRequeuingResult(res, recResult) res = sreconcile.LowestRequeuingResult(res, recResult)
} }
r.notify(oldObj, obj, index, res, resErr)
return res, resErr return res, resErr
} }
// notify emits notification related to the reconciliation.
func (r *BucketReconciler) notify(oldObj, newObj *sourcev1.Bucket, index *etagIndex, res sreconcile.Result, resErr error) {
// Notify successful reconciliation for new artifact and recovery from any
// failure.
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
annotations := map[string]string{
sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum,
}
var oldChecksum string
if oldObj.GetArtifact() != nil {
oldChecksum = oldObj.GetArtifact().Checksum
}
message := fmt.Sprintf("stored artifact with %d fetched files from '%s' bucket", index.Len(), newObj.Spec.BucketName)
// Notify on new artifact and failure recovery.
if oldChecksum != newObj.GetArtifact().Checksum {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
"NewArtifact", message)
} else {
if sreconcile.FailureRecovery(oldObj, newObj, bucketFailConditions) {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
meta.SucceededReason, message)
}
}
}
}
// reconcileStorage ensures the current state of the storage matches the // reconcileStorage ensures the current state of the storage matches the
// desired and previously observed state. // desired and previously observed state.
// //
@ -574,10 +616,6 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.
conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error()) conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e return sreconcile.ResultEmpty, e
} }
r.annotatedEventLogf(ctx, obj, map[string]string{
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
}, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", index.Len(), obj.Spec.BucketName)
// Record it on the object // Record it on the object
obj.Status.Artifact = artifact.DeepCopy() obj.Status.Artifact = artifact.DeepCopy()

View File

@ -18,6 +18,7 @@ package controllers
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
@ -1171,3 +1172,117 @@ func TestBucketReconciler_statusConditions(t *testing.T) {
}) })
} }
} }
func TestBucketReconciler_notify(t *testing.T) {
tests := []struct {
name string
res sreconcile.Result
resErr error
oldObjBeforeFunc func(obj *sourcev1.Bucket)
newObjBeforeFunc func(obj *sourcev1.Bucket)
wantEvent string
}{
{
name: "error - no event",
res: sreconcile.ResultEmpty,
resErr: errors.New("some error"),
},
{
name: "new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
},
wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from",
},
{
name: "recovery from failure",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal Succeeded stored artifact with 2 fetched files from",
},
{
name: "recovery and new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from",
},
{
name: "no updates",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
recorder := record.NewFakeRecorder(32)
oldObj := &sourcev1.Bucket{
Spec: sourcev1.BucketSpec{
BucketName: "test-bucket",
},
}
newObj := oldObj.DeepCopy()
if tt.oldObjBeforeFunc != nil {
tt.oldObjBeforeFunc(oldObj)
}
if tt.newObjBeforeFunc != nil {
tt.newObjBeforeFunc(newObj)
}
reconciler := &BucketReconciler{
EventRecorder: recorder,
}
index := &etagIndex{
index: map[string]string{
"zzz": "qqq",
"bbb": "ddd",
},
}
reconciler.notify(oldObj, newObj, index, tt.res, tt.resErr)
select {
case x, ok := <-recorder.Events:
g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received")
if tt.wantEvent != "" {
g.Expect(x).To(ContainSubstring(tt.wantEvent))
}
default:
if tt.wantEvent != "" {
t.Errorf("expected some event to be emitted")
}
}
})
}
}

View File

@ -91,6 +91,13 @@ var gitRepositoryReadyCondition = summarize.Conditions{
}, },
} }
// gitRepositoryFailConditions contains the conditions that represent a failure.
var gitRepositoryFailConditions = []string{
sourcev1.FetchFailedCondition,
sourcev1.IncludeUnavailableCondition,
sourcev1.StorageOperationFailedCondition,
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/finalizers,verbs=get;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/finalizers,verbs=get;create;update;patch;delete
@ -212,6 +219,8 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// object. It returns early on the first call that returns // object. It returns early on the first call that returns
// reconcile.ResultRequeue, or produces an error. // reconcile.ResultRequeue, or produces an error.
func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.GitRepository, reconcilers []gitRepositoryReconcileFunc) (sreconcile.Result, error) { func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.GitRepository, reconcilers []gitRepositoryReconcileFunc) (sreconcile.Result, error) {
oldObj := obj.DeepCopy()
// Mark as reconciling if generation differs // Mark as reconciling if generation differs
if obj.Generation != obj.Status.ObservedGeneration { if obj.Generation != obj.Status.ObservedGeneration {
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
@ -258,9 +267,42 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G
// Prioritize requeue request in the result. // Prioritize requeue request in the result.
res = sreconcile.LowestRequeuingResult(res, recResult) res = sreconcile.LowestRequeuingResult(res, recResult)
} }
r.notify(oldObj, obj, commit, res, resErr)
return res, resErr return res, resErr
} }
// notify emits notification related to the reconciliation.
func (r *GitRepositoryReconciler) notify(oldObj, newObj *sourcev1.GitRepository, commit git.Commit, res sreconcile.Result, resErr error) {
// Notify successful reconciliation for new artifact and recovery from any
// failure.
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
annotations := map[string]string{
sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum,
}
var oldChecksum string
if oldObj.GetArtifact() != nil {
oldChecksum = oldObj.GetArtifact().Checksum
}
message := fmt.Sprintf("stored artifact for commit '%s'", commit.ShortMessage())
// Notify on new artifact and failure recovery.
if oldChecksum != newObj.GetArtifact().Checksum {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
"NewArtifact", message)
} else {
if sreconcile.FailureRecovery(oldObj, newObj, gitRepositoryFailConditions) {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
meta.SucceededReason, message)
}
}
}
}
// reconcileStorage ensures the current state of the storage matches the // reconcileStorage ensures the current state of the storage matches the
// desired and previously observed state. // desired and previously observed state.
// //
@ -523,10 +565,6 @@ func (r *GitRepositoryReconciler) reconcileArtifact(ctx context.Context,
conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error()) conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e return sreconcile.ResultEmpty, e
} }
r.AnnotatedEventf(obj, map[string]string{
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
}, corev1.EventTypeNormal, "NewArtifact", "stored artifact for commit '%s'", commit.ShortMessage())
// Record it on the object // Record it on the object
obj.Status.Artifact = artifact.DeepCopy() obj.Status.Artifact = artifact.DeepCopy()

View File

@ -18,6 +18,7 @@ package controllers
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
@ -1776,3 +1777,109 @@ func TestGitRepositoryReconciler_statusConditions(t *testing.T) {
}) })
} }
} }
func TestGitRepositoryReconciler_notify(t *testing.T) {
tests := []struct {
name string
res sreconcile.Result
resErr error
oldObjBeforeFunc func(obj *sourcev1.GitRepository)
newObjBeforeFunc func(obj *sourcev1.GitRepository)
wantEvent string
}{
{
name: "error - no event",
res: sreconcile.ResultEmpty,
resErr: errors.New("some error"),
},
{
name: "new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
newObjBeforeFunc: func(obj *sourcev1.GitRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
},
wantEvent: "Normal NewArtifact stored artifact for commit",
},
{
name: "recovery from failure",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.GitRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.GitRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal Succeeded stored artifact for commit",
},
{
name: "recovery and new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.GitRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.GitRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal NewArtifact stored artifact for commit",
},
{
name: "no updates",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.GitRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjBeforeFunc: func(obj *sourcev1.GitRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
recorder := record.NewFakeRecorder(32)
oldObj := &sourcev1.GitRepository{}
newObj := oldObj.DeepCopy()
if tt.oldObjBeforeFunc != nil {
tt.oldObjBeforeFunc(oldObj)
}
if tt.newObjBeforeFunc != nil {
tt.newObjBeforeFunc(newObj)
}
reconciler := &GitRepositoryReconciler{
EventRecorder: recorder,
}
commit := &git.Commit{
Message: "test commit",
}
reconciler.notify(oldObj, newObj, *commit, tt.res, tt.resErr)
select {
case x, ok := <-recorder.Events:
g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received")
if tt.wantEvent != "" {
g.Expect(x).To(ContainSubstring(tt.wantEvent))
}
default:
if tt.wantEvent != "" {
t.Errorf("expected some event to be emitted")
}
}
})
}
}

View File

@ -99,6 +99,13 @@ var helmChartReadyCondition = summarize.Conditions{
}, },
} }
// helmChartFailConditions contains the conditions that represent a failure.
var helmChartFailConditions = []string{
sourcev1.BuildFailedCondition,
sourcev1.FetchFailedCondition,
sourcev1.StorageOperationFailedCondition,
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts/finalizers,verbs=get;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts/finalizers,verbs=get;create;update;patch;delete
@ -239,10 +246,13 @@ func (r *HelmChartReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return return
} }
// reconcile iterates through the gitRepositoryReconcileFunc tasks for the // reconcile iterates through the helmChartReconcileFunc tasks for the
// object. It returns early on the first call that returns // object. It returns early on the first call that returns
// reconcile.ResultRequeue, or produces an error. // reconcile.ResultRequeue, or produces an error.
func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmChart, reconcilers []helmChartReconcileFunc) (sreconcile.Result, error) { func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmChart, reconcilers []helmChartReconcileFunc) (sreconcile.Result, error) {
oldObj := obj.DeepCopy()
// Mark as reconciling if generation differs.
if obj.Generation != obj.Status.ObservedGeneration { if obj.Generation != obj.Status.ObservedGeneration {
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
} }
@ -269,9 +279,40 @@ func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmC
// Prioritize requeue request in the result. // Prioritize requeue request in the result.
res = sreconcile.LowestRequeuingResult(res, recResult) res = sreconcile.LowestRequeuingResult(res, recResult)
} }
r.notify(oldObj, obj, &build, res, resErr)
return res, resErr return res, resErr
} }
// notify emits notification related to the reconciliation.
func (r *HelmChartReconciler) notify(oldObj, newObj *sourcev1.HelmChart, build *chart.Build, res sreconcile.Result, resErr error) {
// Notify successful reconciliation for new artifact and recovery from any
// failure.
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
annotations := map[string]string{
sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum,
}
var oldChecksum string
if oldObj.GetArtifact() != nil {
oldChecksum = oldObj.GetArtifact().Checksum
}
// Notify on new artifact and failure recovery.
if oldChecksum != newObj.GetArtifact().Checksum {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
reasonForBuild(build), build.Summary())
} else {
if sreconcile.FailureRecovery(oldObj, newObj, helmChartFailConditions) {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
reasonForBuild(build), build.Summary())
}
}
}
}
// reconcileStorage ensures the current state of the storage matches the // reconcileStorage ensures the current state of the storage matches the
// desired and previously observed state. // desired and previously observed state.
// //
@ -714,12 +755,6 @@ func (r *HelmChartReconciler) reconcileArtifact(ctx context.Context, obj *source
obj.Status.Artifact = artifact.DeepCopy() obj.Status.Artifact = artifact.DeepCopy()
obj.Status.ObservedChartName = b.Name obj.Status.ObservedChartName = b.Name
// Publish an event
r.AnnotatedEventf(obj, map[string]string{
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
}, corev1.EventTypeNormal, reasonForBuild(b), b.Summary())
// Update symlink on a "best effort" basis // Update symlink on a "best effort" basis
symURL, err := r.Storage.Symlink(artifact, "latest.tar.gz") symURL, err := r.Storage.Symlink(artifact, "latest.tar.gz")
if err != nil { if err != nil {

View File

@ -1585,3 +1585,112 @@ func TestHelmChartReconciler_statusConditions(t *testing.T) {
}) })
} }
} }
func TestHelmChartReconciler_notify(t *testing.T) {
tests := []struct {
name string
res sreconcile.Result
resErr error
oldObjBeforeFunc func(obj *sourcev1.HelmChart)
newObjBeforeFunc func(obj *sourcev1.HelmChart)
wantEvent string
}{
{
name: "error - no event",
res: sreconcile.ResultEmpty,
resErr: errors.New("some error"),
},
{
name: "new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
newObjBeforeFunc: func(obj *sourcev1.HelmChart) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
},
wantEvent: "Normal ChartPackageSucceeded packaged",
},
{
name: "recovery from failure",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.HelmChart) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.HelmChart) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal ChartPackageSucceeded packaged",
},
{
name: "recovery and new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.HelmChart) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.HelmChart) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal ChartPackageSucceeded packaged",
},
{
name: "no updates",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.HelmChart) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjBeforeFunc: func(obj *sourcev1.HelmChart) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
recorder := record.NewFakeRecorder(32)
oldObj := &sourcev1.HelmChart{}
newObj := oldObj.DeepCopy()
if tt.oldObjBeforeFunc != nil {
tt.oldObjBeforeFunc(oldObj)
}
if tt.newObjBeforeFunc != nil {
tt.newObjBeforeFunc(newObj)
}
reconciler := &HelmChartReconciler{
EventRecorder: recorder,
}
build := &chart.Build{
Name: "foo",
Version: "1.0.0",
Path: "some/path",
Packaged: true,
}
reconciler.notify(oldObj, newObj, build, tt.res, tt.resErr)
select {
case x, ok := <-recorder.Events:
g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received")
if tt.wantEvent != "" {
g.Expect(x).To(ContainSubstring(tt.wantEvent))
}
default:
if tt.wantEvent != "" {
t.Errorf("expected some event to be emitted")
}
}
})
}
}

View File

@ -22,7 +22,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/url" "net/url"
"os"
"time" "time"
"github.com/docker/go-units" "github.com/docker/go-units"
@ -82,6 +81,13 @@ var helmRepositoryReadyCondition = summarize.Conditions{
}, },
} }
// helmRepositoryFailConditions contains the conditions that represent a
// failure.
var helmRepositoryFailConditions = []string{
sourcev1.FetchFailedCondition,
sourcev1.StorageOperationFailedCondition,
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories/finalizers,verbs=get;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories/finalizers,verbs=get;create;update;patch;delete
@ -195,10 +201,13 @@ func (r *HelmRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return return
} }
// reconcile iterates through the gitRepositoryReconcileFunc tasks for the // reconcile iterates through the helmRepositoryReconcileFunc tasks for the
// object. It returns early on the first call that returns // object. It returns early on the first call that returns
// reconcile.ResultRequeue, or produces an error. // reconcile.ResultRequeue, or produces an error.
func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmRepository, reconcilers []helmRepositoryReconcileFunc) (sreconcile.Result, error) { func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmRepository, reconcilers []helmRepositoryReconcileFunc) (sreconcile.Result, error) {
oldObj := obj.DeepCopy()
// Mark as reconciling if generation differs.
if obj.Generation != obj.Status.ObservedGeneration { if obj.Generation != obj.Status.ObservedGeneration {
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
} }
@ -225,9 +234,44 @@ func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.
// Prioritize requeue request in the result for successful results. // Prioritize requeue request in the result for successful results.
res = sreconcile.LowestRequeuingResult(res, recResult) res = sreconcile.LowestRequeuingResult(res, recResult)
} }
r.notify(oldObj, obj, chartRepo, res, resErr)
return res, resErr return res, resErr
} }
// notify emits notification related to the reconciliation.
func (r *HelmRepositoryReconciler) notify(oldObj, newObj *sourcev1.HelmRepository, chartRepo repository.ChartRepository, res sreconcile.Result, resErr error) {
// Notify successful reconciliation for new artifact and recovery from any
// failure.
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
annotations := map[string]string{
sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum,
}
size := units.HumanSize(float64(*newObj.Status.Artifact.Size))
var oldChecksum string
if oldObj.GetArtifact() != nil {
oldChecksum = oldObj.GetArtifact().Checksum
}
message := fmt.Sprintf("stored fetched index of size %s from '%s'", size, chartRepo.URL)
// Notify on new artifact and failure recovery.
if oldChecksum != newObj.GetArtifact().Checksum {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
"NewArtifact", message)
} else {
if sreconcile.FailureRecovery(oldObj, newObj, helmRepositoryFailConditions) {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
meta.SucceededReason, message)
}
}
}
}
// reconcileStorage ensures the current state of the storage matches the // reconcileStorage ensures the current state of the storage matches the
// desired and previously observed state. // desired and previously observed state.
// //
@ -448,23 +492,6 @@ func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *s
return sreconcile.ResultEmpty, e return sreconcile.ResultEmpty, e
} }
// Calculate the artifact size to be included in the NewArtifact event.
fi, err := os.Stat(chartRepo.CachePath)
if err != nil {
e := &serror.Event{
Err: fmt.Errorf("unable to read the artifact: %w", err),
Reason: sourcev1.ReadOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
size := units.HumanSize(float64(fi.Size()))
r.AnnotatedEventf(obj, map[string]string{
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
}, corev1.EventTypeNormal, "NewArtifact", "fetched index of size %s from '%s'", size, chartRepo.URL)
// Record it on the object. // Record it on the object.
obj.Status.Artifact = artifact.DeepCopy() obj.Status.Artifact = artifact.DeepCopy()

View File

@ -18,6 +18,7 @@ package controllers
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"os" "os"
@ -842,3 +843,110 @@ func TestHelmRepositoryReconciler_statusConditions(t *testing.T) {
}) })
} }
} }
func TestHelmRepositoryReconciler_notify(t *testing.T) {
var aSize int64 = 30000
tests := []struct {
name string
res sreconcile.Result
resErr error
oldObjBeforeFunc func(obj *sourcev1.HelmRepository)
newObjBeforeFunc func(obj *sourcev1.HelmRepository)
wantEvent string
}{
{
name: "error - no event",
res: sreconcile.ResultEmpty,
resErr: errors.New("some error"),
},
{
name: "new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
newObjBeforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize}
},
wantEvent: "Normal NewArtifact stored fetched index of size",
},
{
name: "recovery from failure",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal Succeeded stored fetched index of size",
},
{
name: "recovery and new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb", Size: &aSize}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal NewArtifact stored fetched index of size",
},
{
name: "no updates",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjBeforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
recorder := record.NewFakeRecorder(32)
oldObj := &sourcev1.HelmRepository{}
newObj := oldObj.DeepCopy()
if tt.oldObjBeforeFunc != nil {
tt.oldObjBeforeFunc(oldObj)
}
if tt.newObjBeforeFunc != nil {
tt.newObjBeforeFunc(newObj)
}
reconciler := &HelmRepositoryReconciler{
EventRecorder: recorder,
}
chartRepo := repository.ChartRepository{
URL: "some-address",
}
reconciler.notify(oldObj, newObj, chartRepo, tt.res, tt.resErr)
select {
case x, ok := <-recorder.Events:
g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received")
if tt.wantEvent != "" {
g.Expect(x).To(ContainSubstring(tt.wantEvent))
}
default:
if tt.wantEvent != "" {
t.Errorf("expected some event to be emitted")
}
}
})
}
}

View File

@ -158,3 +158,19 @@ func LowestRequeuingResult(i, j Result) Result {
return j return j
} }
} }
// FailureRecovery finds out if a failure recovery occurred by checking the fail
// conditions in the old object and the new object.
func FailureRecovery(oldObj, newObj conditions.Getter, failConditions []string) bool {
failuresBefore := 0
for _, failCondition := range failConditions {
if conditions.Get(oldObj, failCondition) != nil {
failuresBefore++
}
if conditions.Get(newObj, failCondition) != nil {
// Short-circuit, there is failure now, can't be a recovery.
return false
}
}
return failuresBefore > 0
}

View File

@ -202,3 +202,99 @@ func TestComputeReconcileResult(t *testing.T) {
}) })
} }
} }
func TestFailureRecovery(t *testing.T) {
failCondns := []string{
"FooFailed",
"BarFailed",
"BazFailed",
}
tests := []struct {
name string
oldObjFunc func(obj conditions.Setter)
newObjFunc func(obj conditions.Setter)
failConditions []string
result bool
}{
{
name: "no failures",
oldObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
failConditions: failCondns,
result: false,
},
{
name: "no recovery",
oldObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "FooFailed", "some-reason", "message")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "FooFailed", "some-reason", "message")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
failConditions: failCondns,
result: false,
},
{
name: "different failure",
oldObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "FooFailed", "some-reason", "message")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "BarFailed", "some-reason", "message")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
failConditions: failCondns,
result: false,
},
{
name: "failure recovery",
oldObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "FooFailed", "some-reason", "message")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
failConditions: failCondns,
result: true,
},
{
name: "ready to fail",
oldObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "BazFailed", "some-reason", "message")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
failConditions: failCondns,
result: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
oldObj := &sourcev1.GitRepository{}
newObj := oldObj.DeepCopy()
if tt.oldObjFunc != nil {
tt.oldObjFunc(oldObj)
}
if tt.newObjFunc != nil {
tt.newObjFunc(newObj)
}
g.Expect(FailureRecovery(oldObj, newObj, tt.failConditions)).To(Equal(tt.result))
})
}
}