Merge pull request #624 from fluxcd/recovery-event

Add notify() in all the reconcilers
This commit is contained in:
Sunny 2022-04-07 22:32:45 +05:30 committed by GitHub
commit f2ae5784a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 724 additions and 35 deletions

View File

@ -99,6 +99,12 @@ var bucketReadyCondition = summarize.Conditions{
},
}
// bucketFailConditions contains the conditions that represent a failure.
var bucketFailConditions = []string{
sourcev1.FetchFailedCondition,
sourcev1.StorageOperationFailedCondition,
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete
@ -307,10 +313,13 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
return
}
// reconcile iterates through the gitRepositoryReconcileFunc tasks for the
// reconcile iterates through the bucketReconcileFunc tasks for the
// object. It returns early on the first call that returns
// reconcile.ResultRequeue, or produces an error.
func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, reconcilers []bucketReconcileFunc) (sreconcile.Result, error) {
oldObj := obj.DeepCopy()
// Mark as reconciling if generation differs.
if obj.Generation != obj.Status.ObservedGeneration {
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
}
@ -355,9 +364,42 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
// Prioritize requeue request in the result.
res = sreconcile.LowestRequeuingResult(res, recResult)
}
r.notify(oldObj, obj, index, res, resErr)
return res, resErr
}
// notify emits notification related to the reconciliation.
func (r *BucketReconciler) notify(oldObj, newObj *sourcev1.Bucket, index *etagIndex, res sreconcile.Result, resErr error) {
// Notify successful reconciliation for new artifact and recovery from any
// failure.
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
annotations := map[string]string{
sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum,
}
var oldChecksum string
if oldObj.GetArtifact() != nil {
oldChecksum = oldObj.GetArtifact().Checksum
}
message := fmt.Sprintf("stored artifact with %d fetched files from '%s' bucket", index.Len(), newObj.Spec.BucketName)
// Notify on new artifact and failure recovery.
if oldChecksum != newObj.GetArtifact().Checksum {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
"NewArtifact", message)
} else {
if sreconcile.FailureRecovery(oldObj, newObj, bucketFailConditions) {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
meta.SucceededReason, message)
}
}
}
}
// reconcileStorage ensures the current state of the storage matches the
// desired and previously observed state.
//
@ -574,10 +616,6 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.
conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
r.annotatedEventLogf(ctx, obj, map[string]string{
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
}, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", index.Len(), obj.Spec.BucketName)
// Record it on the object
obj.Status.Artifact = artifact.DeepCopy()

View File

@ -18,6 +18,7 @@ package controllers
import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
@ -1171,3 +1172,117 @@ func TestBucketReconciler_statusConditions(t *testing.T) {
})
}
}
func TestBucketReconciler_notify(t *testing.T) {
tests := []struct {
name string
res sreconcile.Result
resErr error
oldObjBeforeFunc func(obj *sourcev1.Bucket)
newObjBeforeFunc func(obj *sourcev1.Bucket)
wantEvent string
}{
{
name: "error - no event",
res: sreconcile.ResultEmpty,
resErr: errors.New("some error"),
},
{
name: "new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
},
wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from",
},
{
name: "recovery from failure",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal Succeeded stored artifact with 2 fetched files from",
},
{
name: "recovery and new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from",
},
{
name: "no updates",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
recorder := record.NewFakeRecorder(32)
oldObj := &sourcev1.Bucket{
Spec: sourcev1.BucketSpec{
BucketName: "test-bucket",
},
}
newObj := oldObj.DeepCopy()
if tt.oldObjBeforeFunc != nil {
tt.oldObjBeforeFunc(oldObj)
}
if tt.newObjBeforeFunc != nil {
tt.newObjBeforeFunc(newObj)
}
reconciler := &BucketReconciler{
EventRecorder: recorder,
}
index := &etagIndex{
index: map[string]string{
"zzz": "qqq",
"bbb": "ddd",
},
}
reconciler.notify(oldObj, newObj, index, tt.res, tt.resErr)
select {
case x, ok := <-recorder.Events:
g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received")
if tt.wantEvent != "" {
g.Expect(x).To(ContainSubstring(tt.wantEvent))
}
default:
if tt.wantEvent != "" {
t.Errorf("expected some event to be emitted")
}
}
})
}
}

View File

@ -91,6 +91,13 @@ var gitRepositoryReadyCondition = summarize.Conditions{
},
}
// gitRepositoryFailConditions contains the conditions that represent a failure.
var gitRepositoryFailConditions = []string{
sourcev1.FetchFailedCondition,
sourcev1.IncludeUnavailableCondition,
sourcev1.StorageOperationFailedCondition,
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/finalizers,verbs=get;create;update;patch;delete
@ -212,6 +219,8 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// object. It returns early on the first call that returns
// reconcile.ResultRequeue, or produces an error.
func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.GitRepository, reconcilers []gitRepositoryReconcileFunc) (sreconcile.Result, error) {
oldObj := obj.DeepCopy()
// Mark as reconciling if generation differs
if obj.Generation != obj.Status.ObservedGeneration {
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
@ -258,9 +267,42 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G
// Prioritize requeue request in the result.
res = sreconcile.LowestRequeuingResult(res, recResult)
}
r.notify(oldObj, obj, commit, res, resErr)
return res, resErr
}
// notify emits notification related to the reconciliation.
func (r *GitRepositoryReconciler) notify(oldObj, newObj *sourcev1.GitRepository, commit git.Commit, res sreconcile.Result, resErr error) {
// Notify successful reconciliation for new artifact and recovery from any
// failure.
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
annotations := map[string]string{
sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum,
}
var oldChecksum string
if oldObj.GetArtifact() != nil {
oldChecksum = oldObj.GetArtifact().Checksum
}
message := fmt.Sprintf("stored artifact for commit '%s'", commit.ShortMessage())
// Notify on new artifact and failure recovery.
if oldChecksum != newObj.GetArtifact().Checksum {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
"NewArtifact", message)
} else {
if sreconcile.FailureRecovery(oldObj, newObj, gitRepositoryFailConditions) {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
meta.SucceededReason, message)
}
}
}
}
// reconcileStorage ensures the current state of the storage matches the
// desired and previously observed state.
//
@ -523,10 +565,6 @@ func (r *GitRepositoryReconciler) reconcileArtifact(ctx context.Context,
conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
r.AnnotatedEventf(obj, map[string]string{
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
}, corev1.EventTypeNormal, "NewArtifact", "stored artifact for commit '%s'", commit.ShortMessage())
// Record it on the object
obj.Status.Artifact = artifact.DeepCopy()

View File

@ -18,6 +18,7 @@ package controllers
import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
@ -1776,3 +1777,109 @@ func TestGitRepositoryReconciler_statusConditions(t *testing.T) {
})
}
}
func TestGitRepositoryReconciler_notify(t *testing.T) {
tests := []struct {
name string
res sreconcile.Result
resErr error
oldObjBeforeFunc func(obj *sourcev1.GitRepository)
newObjBeforeFunc func(obj *sourcev1.GitRepository)
wantEvent string
}{
{
name: "error - no event",
res: sreconcile.ResultEmpty,
resErr: errors.New("some error"),
},
{
name: "new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
newObjBeforeFunc: func(obj *sourcev1.GitRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
},
wantEvent: "Normal NewArtifact stored artifact for commit",
},
{
name: "recovery from failure",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.GitRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.GitRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal Succeeded stored artifact for commit",
},
{
name: "recovery and new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.GitRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.GitRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal NewArtifact stored artifact for commit",
},
{
name: "no updates",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.GitRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjBeforeFunc: func(obj *sourcev1.GitRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
recorder := record.NewFakeRecorder(32)
oldObj := &sourcev1.GitRepository{}
newObj := oldObj.DeepCopy()
if tt.oldObjBeforeFunc != nil {
tt.oldObjBeforeFunc(oldObj)
}
if tt.newObjBeforeFunc != nil {
tt.newObjBeforeFunc(newObj)
}
reconciler := &GitRepositoryReconciler{
EventRecorder: recorder,
}
commit := &git.Commit{
Message: "test commit",
}
reconciler.notify(oldObj, newObj, *commit, tt.res, tt.resErr)
select {
case x, ok := <-recorder.Events:
g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received")
if tt.wantEvent != "" {
g.Expect(x).To(ContainSubstring(tt.wantEvent))
}
default:
if tt.wantEvent != "" {
t.Errorf("expected some event to be emitted")
}
}
})
}
}

View File

@ -99,6 +99,13 @@ var helmChartReadyCondition = summarize.Conditions{
},
}
// helmChartFailConditions contains the conditions that represent a failure.
var helmChartFailConditions = []string{
sourcev1.BuildFailedCondition,
sourcev1.FetchFailedCondition,
sourcev1.StorageOperationFailedCondition,
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts/finalizers,verbs=get;create;update;patch;delete
@ -239,10 +246,13 @@ func (r *HelmChartReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return
}
// reconcile iterates through the gitRepositoryReconcileFunc tasks for the
// reconcile iterates through the helmChartReconcileFunc tasks for the
// object. It returns early on the first call that returns
// reconcile.ResultRequeue, or produces an error.
func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmChart, reconcilers []helmChartReconcileFunc) (sreconcile.Result, error) {
oldObj := obj.DeepCopy()
// Mark as reconciling if generation differs.
if obj.Generation != obj.Status.ObservedGeneration {
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
}
@ -269,9 +279,40 @@ func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmC
// Prioritize requeue request in the result.
res = sreconcile.LowestRequeuingResult(res, recResult)
}
r.notify(oldObj, obj, &build, res, resErr)
return res, resErr
}
// notify emits notification related to the reconciliation.
func (r *HelmChartReconciler) notify(oldObj, newObj *sourcev1.HelmChart, build *chart.Build, res sreconcile.Result, resErr error) {
// Notify successful reconciliation for new artifact and recovery from any
// failure.
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
annotations := map[string]string{
sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum,
}
var oldChecksum string
if oldObj.GetArtifact() != nil {
oldChecksum = oldObj.GetArtifact().Checksum
}
// Notify on new artifact and failure recovery.
if oldChecksum != newObj.GetArtifact().Checksum {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
reasonForBuild(build), build.Summary())
} else {
if sreconcile.FailureRecovery(oldObj, newObj, helmChartFailConditions) {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
reasonForBuild(build), build.Summary())
}
}
}
}
// reconcileStorage ensures the current state of the storage matches the
// desired and previously observed state.
//
@ -714,12 +755,6 @@ func (r *HelmChartReconciler) reconcileArtifact(ctx context.Context, obj *source
obj.Status.Artifact = artifact.DeepCopy()
obj.Status.ObservedChartName = b.Name
// Publish an event
r.AnnotatedEventf(obj, map[string]string{
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
}, corev1.EventTypeNormal, reasonForBuild(b), b.Summary())
// Update symlink on a "best effort" basis
symURL, err := r.Storage.Symlink(artifact, "latest.tar.gz")
if err != nil {

View File

@ -1585,3 +1585,112 @@ func TestHelmChartReconciler_statusConditions(t *testing.T) {
})
}
}
func TestHelmChartReconciler_notify(t *testing.T) {
tests := []struct {
name string
res sreconcile.Result
resErr error
oldObjBeforeFunc func(obj *sourcev1.HelmChart)
newObjBeforeFunc func(obj *sourcev1.HelmChart)
wantEvent string
}{
{
name: "error - no event",
res: sreconcile.ResultEmpty,
resErr: errors.New("some error"),
},
{
name: "new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
newObjBeforeFunc: func(obj *sourcev1.HelmChart) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
},
wantEvent: "Normal ChartPackageSucceeded packaged",
},
{
name: "recovery from failure",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.HelmChart) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.HelmChart) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal ChartPackageSucceeded packaged",
},
{
name: "recovery and new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.HelmChart) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.HelmChart) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal ChartPackageSucceeded packaged",
},
{
name: "no updates",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.HelmChart) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjBeforeFunc: func(obj *sourcev1.HelmChart) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
recorder := record.NewFakeRecorder(32)
oldObj := &sourcev1.HelmChart{}
newObj := oldObj.DeepCopy()
if tt.oldObjBeforeFunc != nil {
tt.oldObjBeforeFunc(oldObj)
}
if tt.newObjBeforeFunc != nil {
tt.newObjBeforeFunc(newObj)
}
reconciler := &HelmChartReconciler{
EventRecorder: recorder,
}
build := &chart.Build{
Name: "foo",
Version: "1.0.0",
Path: "some/path",
Packaged: true,
}
reconciler.notify(oldObj, newObj, build, tt.res, tt.resErr)
select {
case x, ok := <-recorder.Events:
g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received")
if tt.wantEvent != "" {
g.Expect(x).To(ContainSubstring(tt.wantEvent))
}
default:
if tt.wantEvent != "" {
t.Errorf("expected some event to be emitted")
}
}
})
}
}

View File

@ -22,7 +22,6 @@ import (
"errors"
"fmt"
"net/url"
"os"
"time"
"github.com/docker/go-units"
@ -82,6 +81,13 @@ var helmRepositoryReadyCondition = summarize.Conditions{
},
}
// helmRepositoryFailConditions contains the conditions that represent a
// failure.
var helmRepositoryFailConditions = []string{
sourcev1.FetchFailedCondition,
sourcev1.StorageOperationFailedCondition,
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories/finalizers,verbs=get;create;update;patch;delete
@ -195,10 +201,13 @@ func (r *HelmRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return
}
// reconcile iterates through the gitRepositoryReconcileFunc tasks for the
// reconcile iterates through the helmRepositoryReconcileFunc tasks for the
// object. It returns early on the first call that returns
// reconcile.ResultRequeue, or produces an error.
func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmRepository, reconcilers []helmRepositoryReconcileFunc) (sreconcile.Result, error) {
oldObj := obj.DeepCopy()
// Mark as reconciling if generation differs.
if obj.Generation != obj.Status.ObservedGeneration {
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
}
@ -225,9 +234,44 @@ func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.
// Prioritize requeue request in the result for successful results.
res = sreconcile.LowestRequeuingResult(res, recResult)
}
r.notify(oldObj, obj, chartRepo, res, resErr)
return res, resErr
}
// notify emits notification related to the reconciliation.
func (r *HelmRepositoryReconciler) notify(oldObj, newObj *sourcev1.HelmRepository, chartRepo repository.ChartRepository, res sreconcile.Result, resErr error) {
// Notify successful reconciliation for new artifact and recovery from any
// failure.
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
annotations := map[string]string{
sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum,
}
size := units.HumanSize(float64(*newObj.Status.Artifact.Size))
var oldChecksum string
if oldObj.GetArtifact() != nil {
oldChecksum = oldObj.GetArtifact().Checksum
}
message := fmt.Sprintf("stored fetched index of size %s from '%s'", size, chartRepo.URL)
// Notify on new artifact and failure recovery.
if oldChecksum != newObj.GetArtifact().Checksum {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
"NewArtifact", message)
} else {
if sreconcile.FailureRecovery(oldObj, newObj, helmRepositoryFailConditions) {
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
meta.SucceededReason, message)
}
}
}
}
// reconcileStorage ensures the current state of the storage matches the
// desired and previously observed state.
//
@ -448,23 +492,6 @@ func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *s
return sreconcile.ResultEmpty, e
}
// Calculate the artifact size to be included in the NewArtifact event.
fi, err := os.Stat(chartRepo.CachePath)
if err != nil {
e := &serror.Event{
Err: fmt.Errorf("unable to read the artifact: %w", err),
Reason: sourcev1.ReadOperationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
size := units.HumanSize(float64(fi.Size()))
r.AnnotatedEventf(obj, map[string]string{
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
}, corev1.EventTypeNormal, "NewArtifact", "fetched index of size %s from '%s'", size, chartRepo.URL)
// Record it on the object.
obj.Status.Artifact = artifact.DeepCopy()

View File

@ -18,6 +18,7 @@ package controllers
import (
"context"
"errors"
"fmt"
"net/http"
"os"
@ -842,3 +843,110 @@ func TestHelmRepositoryReconciler_statusConditions(t *testing.T) {
})
}
}
func TestHelmRepositoryReconciler_notify(t *testing.T) {
var aSize int64 = 30000
tests := []struct {
name string
res sreconcile.Result
resErr error
oldObjBeforeFunc func(obj *sourcev1.HelmRepository)
newObjBeforeFunc func(obj *sourcev1.HelmRepository)
wantEvent string
}{
{
name: "error - no event",
res: sreconcile.ResultEmpty,
resErr: errors.New("some error"),
},
{
name: "new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
newObjBeforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize}
},
wantEvent: "Normal NewArtifact stored fetched index of size",
},
{
name: "recovery from failure",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal Succeeded stored fetched index of size",
},
{
name: "recovery and new artifact",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
},
newObjBeforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb", Size: &aSize}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
wantEvent: "Normal NewArtifact stored fetched index of size",
},
{
name: "no updates",
res: sreconcile.ResultSuccess,
resErr: nil,
oldObjBeforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjBeforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
recorder := record.NewFakeRecorder(32)
oldObj := &sourcev1.HelmRepository{}
newObj := oldObj.DeepCopy()
if tt.oldObjBeforeFunc != nil {
tt.oldObjBeforeFunc(oldObj)
}
if tt.newObjBeforeFunc != nil {
tt.newObjBeforeFunc(newObj)
}
reconciler := &HelmRepositoryReconciler{
EventRecorder: recorder,
}
chartRepo := repository.ChartRepository{
URL: "some-address",
}
reconciler.notify(oldObj, newObj, chartRepo, tt.res, tt.resErr)
select {
case x, ok := <-recorder.Events:
g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received")
if tt.wantEvent != "" {
g.Expect(x).To(ContainSubstring(tt.wantEvent))
}
default:
if tt.wantEvent != "" {
t.Errorf("expected some event to be emitted")
}
}
})
}
}

View File

@ -158,3 +158,19 @@ func LowestRequeuingResult(i, j Result) Result {
return j
}
}
// FailureRecovery finds out if a failure recovery occurred by checking the fail
// conditions in the old object and the new object.
func FailureRecovery(oldObj, newObj conditions.Getter, failConditions []string) bool {
failuresBefore := 0
for _, failCondition := range failConditions {
if conditions.Get(oldObj, failCondition) != nil {
failuresBefore++
}
if conditions.Get(newObj, failCondition) != nil {
// Short-circuit, there is failure now, can't be a recovery.
return false
}
}
return failuresBefore > 0
}

View File

@ -202,3 +202,99 @@ func TestComputeReconcileResult(t *testing.T) {
})
}
}
func TestFailureRecovery(t *testing.T) {
failCondns := []string{
"FooFailed",
"BarFailed",
"BazFailed",
}
tests := []struct {
name string
oldObjFunc func(obj conditions.Setter)
newObjFunc func(obj conditions.Setter)
failConditions []string
result bool
}{
{
name: "no failures",
oldObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
failConditions: failCondns,
result: false,
},
{
name: "no recovery",
oldObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "FooFailed", "some-reason", "message")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "FooFailed", "some-reason", "message")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
failConditions: failCondns,
result: false,
},
{
name: "different failure",
oldObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "FooFailed", "some-reason", "message")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "BarFailed", "some-reason", "message")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
failConditions: failCondns,
result: false,
},
{
name: "failure recovery",
oldObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "FooFailed", "some-reason", "message")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
failConditions: failCondns,
result: true,
},
{
name: "ready to fail",
oldObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
newObjFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "BazFailed", "some-reason", "message")
conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
},
failConditions: failCondns,
result: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
oldObj := &sourcev1.GitRepository{}
newObj := oldObj.DeepCopy()
if tt.oldObjFunc != nil {
tt.oldObjFunc(oldObj)
}
if tt.newObjFunc != nil {
tt.newObjFunc(newObj)
}
g.Expect(FailureRecovery(oldObj, newObj, tt.failConditions)).To(Equal(tt.result))
})
}
}