Make generic SummarizeAndPatch()

summarizeAndPatch() was used by all the reconcilers with their own
object type. This creates a generic SummarizeAndPatch helper that takes
a conditions.Setter object and performs the same operations. All the
reconcilers are updated to use SummarizeAndPatch(). The process of
summarize and patch can be configured using the HelperOptions.

Introduce ResultProcessor to allow injecting middlewares in the
SummarizeAndPatch process.

Introduce RuntimeResultBuilder to allow defining how the reconciliation
result is computed for specific reconciler. This enabled different
reconcilers to have different meanings of the reconciliation results.

Introduce Conditions in summary package to store all the status
conditions related information of a reconciler. This is passed to
SummarizeAndPatch() to be used for summary and patch calculation.

Remove all the redundant summarizeAndPatch() tests per reconciler.

Add package internal/object containing helpers for interacting with
runtime.Object needed by the generic SummarizeAndPatch().

Add tests for ComputeReconcileResult().

Signed-off-by: Sunny <darkowlzz@protonmail.com>
This commit is contained in:
Sunny 2022-02-04 16:31:42 +05:30 committed by Hidde Beydals
parent 9b5613732f
commit d997876b07
17 changed files with 1474 additions and 687 deletions

View File

@ -37,10 +37,8 @@ import (
"golang.org/x/sync/semaphore"
"google.golang.org/api/option"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -57,33 +55,33 @@ import (
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
serror "github.com/fluxcd/source-controller/internal/error"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
"github.com/fluxcd/source-controller/pkg/sourceignore"
)
// Status conditions owned by Bucket reconciler.
var bucketOwnedConditions = []string{
sourcev1.ArtifactOutdatedCondition,
sourcev1.FetchFailedCondition,
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
}
// Conditions that Ready condition is influenced by in descending order of their
// priority.
var bucketReadyDeps = []string{
sourcev1.ArtifactOutdatedCondition,
sourcev1.FetchFailedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
}
// Negative conditions that Ready condition is influenced by.
var bucketReadyDepsNegative = []string{
sourcev1.ArtifactOutdatedCondition,
sourcev1.FetchFailedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
// bucketReadyConditions contains all the conditions information needed
// for Bucket Ready status conditions summary calculation.
var bucketReadyConditions = summarize.Conditions{
Target: meta.ReadyCondition,
Owned: []string{
sourcev1.ArtifactOutdatedCondition,
sourcev1.FetchFailedCondition,
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
},
Summarize: []string{
sourcev1.ArtifactOutdatedCondition,
sourcev1.FetchFailedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
},
NegativePolarity: []string{
sourcev1.ArtifactOutdatedCondition,
sourcev1.FetchFailedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
},
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
@ -151,7 +149,19 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
// Always attempt to patch the object and status after each reconciliation
// NOTE: The final runtime result and error are set in this block.
defer func() {
result, retErr = r.summarizeAndPatch(ctx, obj, patchHelper, recResult, retErr)
summarizeHelper := summarize.NewHelper(r.EventRecorder, patchHelper)
summarizeOpts := []summarize.Option{
summarize.WithConditions(bucketReadyConditions),
summarize.WithReconcileResult(recResult),
summarize.WithReconcileError(retErr),
summarize.WithIgnoreNotFound(),
summarize.WithProcessors(
summarize.RecordContextualError,
summarize.RecordReconcileReq,
),
summarize.WithResultBuilder(sreconcile.AlwaysRequeueResultBuilder{RequeueAfter: obj.GetInterval().Duration}),
}
result, retErr = summarizeHelper.SummarizeAndPatch(ctx, obj, summarizeOpts...)
// Always record readiness and duration metrics
r.Metrics.RecordReadiness(ctx, obj)
@ -181,50 +191,6 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
return
}
// summarizeAndPatch analyzes the object conditions to create a summary of the
// status conditions, computes runtime results and patches the object in the K8s
// API server.
func (r *BucketReconciler) summarizeAndPatch(
ctx context.Context,
obj *sourcev1.Bucket,
patchHelper *patch.Helper,
res sreconcile.Result,
recErr error) (ctrl.Result, error) {
sreconcile.RecordContextualError(ctx, r.EventRecorder, obj, recErr)
// Record the value of the reconciliation request if any.
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
obj.Status.SetLastHandledReconcileRequest(v)
}
// Compute the reconcile results, obtain patch options and reconcile error.
var patchOpts []patch.Option
var result ctrl.Result
patchOpts, result, recErr = sreconcile.ComputeReconcileResult(obj, obj.GetRequeueAfter(), res, recErr, bucketOwnedConditions)
// Summarize the Ready condition based on abnormalities that may have been observed.
conditions.SetSummary(obj,
meta.ReadyCondition,
conditions.WithConditions(
bucketReadyDeps...,
),
conditions.WithNegativePolarityConditions(
bucketReadyDepsNegative...,
),
)
// Finally, patch the resource.
if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
// Ignore patch error "not found" when the object is being deleted.
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
err = kerrors.FilterOut(err, func(e error) bool { return apierrors.IsNotFound(e) })
}
recErr = kerrors.NewAggregate([]error{recErr, err})
}
return result, recErr
}
// reconcile steps iterates through the actual reconciliation tasks for objec,
// it returns early on the first step that returns ResultRequeue or produces an
// error.

View File

@ -34,6 +34,7 @@ import (
"github.com/darkowlzz/controller-check/status"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
"github.com/fluxcd/pkg/runtime/patch"
. "github.com/onsi/gomega"
raw "google.golang.org/api/storage/v1"
corev1 "k8s.io/api/core/v1"
@ -125,10 +126,25 @@ func TestBucketReconciler_Reconcile(t *testing.T) {
}, timeout).Should(BeTrue())
// Check if the object status is valid.
condns := &status.Conditions{NegativePolarity: bucketReadyDepsNegative}
condns := &status.Conditions{NegativePolarity: bucketReadyConditions.NegativePolarity}
checker := status.NewChecker(testEnv.Client, testEnv.GetScheme(), condns)
checker.CheckErr(ctx, obj)
// Patch the object with reconcile request annotation.
patchHelper, err := patch.NewHelper(obj, testEnv.Client)
g.Expect(err).ToNot(HaveOccurred())
annotations := map[string]string{
meta.ReconcileRequestAnnotation: "now",
}
obj.SetAnnotations(annotations)
g.Expect(patchHelper.Patch(ctx, obj)).ToNot(HaveOccurred())
g.Eventually(func() bool {
if err := testEnv.Get(ctx, key, obj); err != nil {
return false
}
return obj.Status.LastHandledReconcileAt == "now"
}, timeout).Should(BeTrue())
g.Expect(testEnv.Delete(ctx, obj)).To(Succeed())
// Wait for Bucket to be deleted

View File

@ -27,10 +27,8 @@ import (
securejoin "github.com/cyphar/filepath-securejoin"
"github.com/fluxcd/pkg/runtime/logger"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
@ -49,41 +47,41 @@ import (
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
serror "github.com/fluxcd/source-controller/internal/error"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
"github.com/fluxcd/source-controller/internal/util"
"github.com/fluxcd/source-controller/pkg/git"
"github.com/fluxcd/source-controller/pkg/git/strategy"
"github.com/fluxcd/source-controller/pkg/sourceignore"
)
// Status conditions owned by the GitRepository reconciler.
var gitRepoOwnedConditions = []string{
sourcev1.SourceVerifiedCondition,
sourcev1.FetchFailedCondition,
sourcev1.IncludeUnavailableCondition,
sourcev1.ArtifactOutdatedCondition,
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
}
// Conditions that Ready condition is influenced by in descending order of their
// priority.
var gitRepoReadyDeps = []string{
sourcev1.IncludeUnavailableCondition,
sourcev1.SourceVerifiedCondition,
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
}
// Negative conditions that Ready condition is influenced by.
var gitRepoReadyDepsNegative = []string{
sourcev1.FetchFailedCondition,
sourcev1.IncludeUnavailableCondition,
sourcev1.ArtifactOutdatedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
// gitRepoReadyConditions contains all the conditions information needed
// for GitRepository Ready status conditions summary calculation.
var gitRepoReadyConditions = summarize.Conditions{
Target: meta.ReadyCondition,
Owned: []string{
sourcev1.SourceVerifiedCondition,
sourcev1.FetchFailedCondition,
sourcev1.IncludeUnavailableCondition,
sourcev1.ArtifactOutdatedCondition,
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
},
Summarize: []string{
sourcev1.IncludeUnavailableCondition,
sourcev1.SourceVerifiedCondition,
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
},
NegativePolarity: []string{
sourcev1.FetchFailedCondition,
sourcev1.IncludeUnavailableCondition,
sourcev1.ArtifactOutdatedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
},
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete
@ -157,7 +155,19 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
// Always attempt to patch the object and status after each reconciliation
// NOTE: The final runtime result and error are set in this block.
defer func() {
result, retErr = r.summarizeAndPatch(ctx, obj, patchHelper, recResult, retErr)
summarizeHelper := summarize.NewHelper(r.EventRecorder, patchHelper)
summarizeOpts := []summarize.Option{
summarize.WithConditions(gitRepoReadyConditions),
summarize.WithReconcileResult(recResult),
summarize.WithReconcileError(retErr),
summarize.WithIgnoreNotFound(),
summarize.WithProcessors(
summarize.RecordContextualError,
summarize.RecordReconcileReq,
),
summarize.WithResultBuilder(sreconcile.AlwaysRequeueResultBuilder{RequeueAfter: obj.GetInterval().Duration}),
}
result, retErr = summarizeHelper.SummarizeAndPatch(ctx, obj, summarizeOpts...)
// Always record readiness and duration metrics
r.Metrics.RecordReadiness(ctx, obj)
@ -189,50 +199,6 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return
}
// summarizeAndPatch analyzes the object conditions to create a summary of the
// status conditions, computes runtime results and patches the object in the K8s
// API server.
func (r *GitRepositoryReconciler) summarizeAndPatch(
ctx context.Context,
obj *sourcev1.GitRepository,
patchHelper *patch.Helper,
res sreconcile.Result,
recErr error) (ctrl.Result, error) {
sreconcile.RecordContextualError(ctx, r.EventRecorder, obj, recErr)
// Record the value of the reconciliation request if any.
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
obj.Status.SetLastHandledReconcileRequest(v)
}
// Compute the reconcile results, obtain patch options and reconcile error.
var patchOpts []patch.Option
var result ctrl.Result
patchOpts, result, recErr = sreconcile.ComputeReconcileResult(obj, obj.GetRequeueAfter(), res, recErr, gitRepoOwnedConditions)
// Summarize the Ready condition based on abnormalities that may have been observed.
conditions.SetSummary(obj,
meta.ReadyCondition,
conditions.WithConditions(
gitRepoReadyDeps...,
),
conditions.WithNegativePolarityConditions(
gitRepoReadyDepsNegative...,
),
)
// Finally, patch the resource.
if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
// Ignore patch error "not found" when the object is being deleted.
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
err = kerrors.FilterOut(err, func(e error) bool { return apierrors.IsNotFound(e) })
}
recErr = kerrors.NewAggregate([]error{recErr, err})
}
return result, recErr
}
// reconcile steps iterates through the actual reconciliation tasks for objec,
// it returns early on the first step that returns ResultRequeue or produces an
// error.

View File

@ -30,6 +30,7 @@ import (
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/gittestserver"
"github.com/fluxcd/pkg/runtime/conditions"
"github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/pkg/ssh"
"github.com/fluxcd/pkg/testserver"
"github.com/go-git/go-billy/v5/memfs"
@ -190,10 +191,25 @@ func TestGitRepositoryReconciler_Reconcile(t *testing.T) {
}, timeout).Should(BeTrue())
// Check if the object status is valid.
condns := &status.Conditions{NegativePolarity: gitRepoReadyDepsNegative}
condns := &status.Conditions{NegativePolarity: gitRepoReadyConditions.NegativePolarity}
checker := status.NewChecker(testEnv.Client, testEnv.GetScheme(), condns)
checker.CheckErr(ctx, obj)
// Patch the object with reconcile request annotation.
patchHelper, err := patch.NewHelper(obj, testEnv.Client)
g.Expect(err).ToNot(HaveOccurred())
annotations := map[string]string{
meta.ReconcileRequestAnnotation: "now",
}
obj.SetAnnotations(annotations)
g.Expect(patchHelper.Patch(ctx, obj)).ToNot(HaveOccurred())
g.Eventually(func() bool {
if err := testEnv.Get(ctx, key, obj); err != nil {
return false
}
return obj.Status.LastHandledReconcileAt == "now"
}, timeout).Should(BeTrue())
g.Expect(testEnv.Delete(ctx, obj)).To(Succeed())
// Wait for GitRepository to be deleted

View File

@ -34,7 +34,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
@ -60,36 +59,36 @@ import (
"github.com/fluxcd/source-controller/internal/helm/getter"
"github.com/fluxcd/source-controller/internal/helm/repository"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
"github.com/fluxcd/source-controller/internal/util"
)
// Status conditions owned by the HelmChart reconciler.
var helmChartOwnedConditions = []string{
sourcev1.BuildFailedCondition,
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
}
// Conditions that Ready condition is influenced by in descending order of their
// priority.
var helmChartReadyDeps = []string{
sourcev1.BuildFailedCondition,
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
}
// Negative conditions that Ready condition is influenced by.
var helmChartReadyDepsNegative = []string{
sourcev1.BuildFailedCondition,
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
// helmChartReadyConditions contains all the conditions information
// needed for HelmChart Ready status conditions summary calculation.
var helmChartReadyConditions = summarize.Conditions{
Target: meta.ReadyCondition,
Owned: []string{
sourcev1.BuildFailedCondition,
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
},
Summarize: []string{
sourcev1.BuildFailedCondition,
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
},
NegativePolarity: []string{
sourcev1.BuildFailedCondition,
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
},
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts,verbs=get;list;watch;create;update;patch;delete
@ -181,7 +180,19 @@ func (r *HelmChartReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// Always attempt to patch the object after each reconciliation.
// NOTE: The final runtime result and error are set in this block.
defer func() {
result, retErr = r.summarizeAndPatch(ctx, obj, patchHelper, recResult, retErr)
summarizeHelper := summarize.NewHelper(r.EventRecorder, patchHelper)
summarizeOpts := []summarize.Option{
summarize.WithConditions(helmChartReadyConditions),
summarize.WithReconcileResult(recResult),
summarize.WithReconcileError(retErr),
summarize.WithIgnoreNotFound(),
summarize.WithProcessors(
summarize.RecordContextualError,
summarize.RecordReconcileReq,
),
summarize.WithResultBuilder(sreconcile.AlwaysRequeueResultBuilder{RequeueAfter: obj.GetInterval().Duration}),
}
result, retErr = summarizeHelper.SummarizeAndPatch(ctx, obj, summarizeOpts...)
// Always record readiness and duration metrics
r.Metrics.RecordReadiness(ctx, obj)
@ -212,49 +223,6 @@ func (r *HelmChartReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return
}
// summarizeAndPatch analyzes the object conditions to create a summary of the
// status conditions, computes runtime results and patches the object in the K8s
// API server.
func (r *HelmChartReconciler) summarizeAndPatch(
ctx context.Context,
obj *sourcev1.HelmChart,
patchHelper *patch.Helper,
res sreconcile.Result,
recErr error) (ctrl.Result, error) {
sreconcile.RecordContextualError(ctx, r.EventRecorder, obj, recErr)
// Record the value of the reconciliation request, if any
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
obj.Status.SetLastHandledReconcileRequest(v)
}
// Compute the reconcile results, obtain patch options and reconcile error.
var patchOpts []patch.Option
var result ctrl.Result
patchOpts, result, recErr = sreconcile.ComputeReconcileResult(obj, obj.GetRequeueAfter(), res, recErr, helmChartOwnedConditions)
// Summarize Ready condition
conditions.SetSummary(obj,
meta.ReadyCondition,
conditions.WithConditions(
helmChartReadyDeps...,
),
conditions.WithNegativePolarityConditions(
helmChartReadyDepsNegative...,
),
)
// Finally, patch the resource
if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
// Ignore patch error "not found" when the object is being deleted.
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
err = kerrors.FilterOut(err, func(e error) bool { return apierrs.IsNotFound(err) })
}
recErr = kerrors.NewAggregate([]error{recErr, err})
}
return result, recErr
}
// reconcile steps through the actual reconciliation tasks for the object, it returns early on the first step that
// produces an error.
func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmChart, reconcilers []helmChartReconcilerFunc) (sreconcile.Result, error) {

View File

@ -125,10 +125,25 @@ func TestHelmChartReconciler_Reconcile(t *testing.T) {
}, timeout).Should(BeTrue())
// Check if the object status is valid.
condns := &status.Conditions{NegativePolarity: helmChartReadyDepsNegative}
condns := &status.Conditions{NegativePolarity: helmChartReadyConditions.NegativePolarity}
checker := status.NewChecker(testEnv.Client, testEnv.GetScheme(), condns)
checker.CheckErr(ctx, obj)
// Patch the object with reconcile request annotation.
patchHelper, err := patch.NewHelper(obj, testEnv.Client)
g.Expect(err).ToNot(HaveOccurred())
annotations := map[string]string{
meta.ReconcileRequestAnnotation: "now",
}
obj.SetAnnotations(annotations)
g.Expect(patchHelper.Patch(ctx, obj)).ToNot(HaveOccurred())
g.Eventually(func() bool {
if err := testEnv.Get(ctx, key, obj); err != nil {
return false
}
return obj.Status.LastHandledReconcileAt == "now"
}, timeout).Should(BeTrue())
g.Expect(testEnv.Delete(ctx, obj)).To(Succeed())
// Wait for HelmChart to be deleted
@ -1326,181 +1341,6 @@ func TestHelmChartReconciler_reconcileDelete(t *testing.T) {
g.Expect(obj.Status.Artifact).To(BeNil())
}
func TestHelmChartReconciler_summarizeAndPatch(t *testing.T) {
tests := []struct {
name string
generation int64
beforeFunc func(obj *sourcev1.HelmChart)
result sreconcile.Result
reconcileErr error
wantErr bool
afterFunc func(t *WithT, obj *sourcev1.HelmChart)
assertConditions []metav1.Condition
}{
// Success/Fail indicates if a reconciliation succeeded or failed. On
// a successful reconciliation, the object generation is expected to
// match the observed generation in the object status.
// All the cases have some Ready condition set, even if a test case is
// unrelated to the conditions, because it's necessary for a valid
// status.
{
name: "Success, no extra conditions",
generation: 4,
beforeFunc: func(obj *sourcev1.HelmChart) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "test-msg")
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "test-msg"),
},
afterFunc: func(t *WithT, obj *sourcev1.HelmChart) {
t.Expect(obj.Status.ObservedGeneration).To(Equal(int64(4)))
},
},
{
name: "Success, Ready=True",
generation: 5,
beforeFunc: func(obj *sourcev1.HelmChart) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "created")
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "created"),
},
afterFunc: func(t *WithT, obj *sourcev1.HelmChart) {
t.Expect(obj.Status.ObservedGeneration).To(Equal(int64(5)))
},
},
{
name: "Success, removes reconciling for successful result",
generation: 2,
beforeFunc: func(obj *sourcev1.HelmChart) {
conditions.MarkReconciling(obj, "NewRevision", "new index version")
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "stored artifact")
},
result: sreconcile.ResultSuccess,
wantErr: false,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact"),
},
afterFunc: func(t *WithT, obj *sourcev1.HelmChart) {
t.Expect(obj.Status.ObservedGeneration).To(Equal(int64(2)))
},
},
{
name: "Success, record reconciliation request",
beforeFunc: func(obj *sourcev1.HelmChart) {
annotations := map[string]string{
meta.ReconcileRequestAnnotation: "now",
}
obj.SetAnnotations(annotations)
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "test-msg")
},
generation: 3,
result: sreconcile.ResultSuccess,
wantErr: false,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "test-msg"),
},
afterFunc: func(t *WithT, obj *sourcev1.HelmChart) {
t.Expect(obj.Status.LastHandledReconcileAt).To(Equal("now"))
t.Expect(obj.Status.ObservedGeneration).To(Equal(int64(3)))
},
},
{
name: "Fail, with multiple conditions ArtifactOutdated=True,Reconciling=True",
generation: 7,
beforeFunc: func(obj *sourcev1.HelmChart) {
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", "new index revision")
conditions.MarkReconciling(obj, "NewRevision", "new index revision")
},
reconcileErr: fmt.Errorf("failed to create dir"),
wantErr: true,
assertConditions: []metav1.Condition{
*conditions.FalseCondition(meta.ReadyCondition, "NewRevision", "new index revision"),
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new index revision"),
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new index revision"),
},
afterFunc: func(t *WithT, obj *sourcev1.HelmChart) {
t.Expect(obj.Status.ObservedGeneration).ToNot(Equal(int64(7)))
},
},
{
name: "Success, with subreconciler stalled error",
generation: 9,
beforeFunc: func(obj *sourcev1.HelmChart) {
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.FetchFailedCondition, "failed to construct helm client")
},
reconcileErr: &serror.Stalling{Err: fmt.Errorf("some error"), Reason: "some reason"},
wantErr: false,
assertConditions: []metav1.Condition{
*conditions.FalseCondition(meta.ReadyCondition, sourcev1.FetchFailedCondition, "failed to construct helm client"),
*conditions.TrueCondition(meta.StalledCondition, "some reason", "some error"),
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.FetchFailedCondition, "failed to construct helm client"),
},
afterFunc: func(t *WithT, obj *sourcev1.HelmChart) {
t.Expect(obj.Status.ObservedGeneration).To(Equal(int64(9)))
},
},
{
name: "Fail, no error but requeue requested",
generation: 3,
beforeFunc: func(obj *sourcev1.HelmChart) {
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "test-msg")
},
result: sreconcile.ResultRequeue,
assertConditions: []metav1.Condition{
*conditions.FalseCondition(meta.ReadyCondition, meta.FailedReason, "test-msg"),
},
afterFunc: func(t *WithT, obj *sourcev1.HelmChart) {
t.Expect(obj.Status.ObservedGeneration).ToNot(Equal(int64(3)))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
builder := fake.NewClientBuilder().WithScheme(testEnv.GetScheme())
r := &HelmChartReconciler{
Client: builder.Build(),
EventRecorder: record.NewFakeRecorder(32),
}
obj := &sourcev1.HelmChart{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-",
Generation: tt.generation,
},
Spec: sourcev1.HelmChartSpec{
Interval: metav1.Duration{Duration: 5 * time.Second},
},
}
if tt.beforeFunc != nil {
tt.beforeFunc(obj)
}
ctx := context.TODO()
g.Expect(r.Create(ctx, obj)).To(Succeed())
patchHelper, err := patch.NewHelper(obj, r.Client)
g.Expect(err).ToNot(HaveOccurred())
_, gotErr := r.summarizeAndPatch(ctx, obj, patchHelper, tt.result, tt.reconcileErr)
g.Expect(gotErr != nil).To(Equal(tt.wantErr))
g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
if tt.afterFunc != nil {
tt.afterFunc(g, obj)
}
// Check if the object status is valid.
condns := &status.Conditions{NegativePolarity: helmChartReadyDepsNegative}
checker := status.NewChecker(r.Client, testEnv.GetScheme(), condns)
checker.CheckErr(ctx, obj)
})
}
}
func TestHelmChartReconciler_reconcileSubRecs(t *testing.T) {
// Helper to build simple helmChartReconcilerFunc with result and error.
buildReconcileFuncs := func(r sreconcile.Result, e error) helmChartReconcilerFunc {

View File

@ -26,10 +26,8 @@ import (
helmgetter "helm.sh/helm/v3/pkg/getter"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -49,32 +47,32 @@ import (
"github.com/fluxcd/source-controller/internal/helm/getter"
"github.com/fluxcd/source-controller/internal/helm/repository"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
)
// Status conditions owned by HelmRepository reconciler.
var helmRepoOwnedConditions = []string{
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
}
// Conditions that Ready condition is influenced by in descending order of their
// priority.
var helmRepoReadyDeps = []string{
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
}
// Negative conditions that Ready condition is influenced by.
var helmRepoReadyDepsNegative = []string{
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
// helmRepoReadyConditions contains all the conditions information needed
// for HelmRepository Ready status conditions summary calculation.
var helmRepoReadyConditions = summarize.Conditions{
Target: meta.ReadyCondition,
Owned: []string{
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
},
Summarize: []string{
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
},
NegativePolarity: []string{
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
},
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete
@ -144,7 +142,19 @@ func (r *HelmRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reque
// Always attempt to patch the object after each reconciliation.
// NOTE: The final runtime result and error are set in this block.
defer func() {
result, retErr = r.summarizeAndPatch(ctx, obj, patchHelper, recResult, retErr)
summarizeHelper := summarize.NewHelper(r.EventRecorder, patchHelper)
summarizeOpts := []summarize.Option{
summarize.WithConditions(helmRepoReadyConditions),
summarize.WithReconcileResult(recResult),
summarize.WithReconcileError(retErr),
summarize.WithIgnoreNotFound(),
summarize.WithProcessors(
summarize.RecordContextualError,
summarize.RecordReconcileReq,
),
summarize.WithResultBuilder(sreconcile.AlwaysRequeueResultBuilder{RequeueAfter: obj.GetInterval().Duration}),
}
result, retErr = summarizeHelper.SummarizeAndPatch(ctx, obj, summarizeOpts...)
// Always record readiness and duration metrics
r.Metrics.RecordReadiness(ctx, obj)
@ -175,50 +185,6 @@ func (r *HelmRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return
}
// summarizeAndPatch analyzes the object conditions to create a summary of the
// status conditions, computes runtime results and patches the object in the K8s
// API server.
func (r *HelmRepositoryReconciler) summarizeAndPatch(
ctx context.Context,
obj *sourcev1.HelmRepository,
patchHelper *patch.Helper,
res sreconcile.Result,
recErr error) (ctrl.Result, error) {
sreconcile.RecordContextualError(ctx, r.EventRecorder, obj, recErr)
// Record the value of the reconciliation request, if any.
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
obj.Status.SetLastHandledReconcileRequest(v)
}
// Compute the reconcile results, obtain patch options and reconcile error.
var patchOpts []patch.Option
var result ctrl.Result
patchOpts, result, recErr = sreconcile.ComputeReconcileResult(obj, obj.GetRequeueAfter(), res, recErr, helmRepoOwnedConditions)
// Summarize Ready condition.
conditions.SetSummary(obj,
meta.ReadyCondition,
conditions.WithConditions(
helmRepoReadyDeps...,
),
conditions.WithNegativePolarityConditions(
helmRepoReadyDepsNegative...,
),
)
// Finally, patch the resource.
if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
// Ignore patch error "not found" when the object is being deleted.
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
err = kerrors.FilterOut(err, func(e error) bool { return apierrors.IsNotFound(e) })
}
recErr = kerrors.NewAggregate([]error{recErr, err})
}
return result, recErr
}
// reconcile iterates through the sub-reconcilers and processes the source
// object. The sub-reconcilers are run sequentially. The result and error of
// the sub-reconciliation are collected and returned. For multiple results

View File

@ -24,7 +24,6 @@ import (
"path/filepath"
"strings"
"testing"
"time"
"github.com/darkowlzz/controller-check/status"
"github.com/fluxcd/pkg/apis/meta"
@ -40,7 +39,6 @@ import (
fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
serror "github.com/fluxcd/source-controller/internal/error"
"github.com/fluxcd/source-controller/internal/helm/repository"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
)
@ -95,10 +93,25 @@ func TestHelmRepositoryReconciler_Reconcile(t *testing.T) {
}, timeout).Should(BeTrue())
// Check if the object status is valid.
condns := &status.Conditions{NegativePolarity: helmRepoReadyDepsNegative}
condns := &status.Conditions{NegativePolarity: helmRepoReadyConditions.NegativePolarity}
checker := status.NewChecker(testEnv.Client, testEnv.GetScheme(), condns)
checker.CheckErr(ctx, obj)
// Patch the object with reconcile request annotation.
patchHelper, err := patch.NewHelper(obj, testEnv.Client)
g.Expect(err).ToNot(HaveOccurred())
annotations := map[string]string{
meta.ReconcileRequestAnnotation: "now",
}
obj.SetAnnotations(annotations)
g.Expect(patchHelper.Patch(ctx, obj)).ToNot(HaveOccurred())
g.Eventually(func() bool {
if err := testEnv.Get(ctx, key, obj); err != nil {
return false
}
return obj.Status.LastHandledReconcileAt == "now"
}, timeout).Should(BeTrue())
g.Expect(testEnv.Delete(ctx, obj)).To(Succeed())
// Wait for HelmRepository to be deleted
@ -612,181 +625,6 @@ func TestHelmRepositoryReconciler_reconcileArtifact(t *testing.T) {
}
}
func TestHelmRepositoryReconciler_summarizeAndPatch(t *testing.T) {
tests := []struct {
name string
generation int64
beforeFunc func(obj *sourcev1.HelmRepository)
result sreconcile.Result
reconcileErr error
wantErr bool
afterFunc func(t *WithT, obj *sourcev1.HelmRepository)
assertConditions []metav1.Condition
}{
// Success/Fail indicates if a reconciliation succeeded or failed. On
// a successful reconciliation, the object generation is expected to
// match the observed generation in the object status.
// All the cases have some Ready condition set, even if a test case is
// unrelated to the conditions, because it's neseccary for a valid
// status.
{
name: "Success, no extra conditions",
generation: 4,
beforeFunc: func(obj *sourcev1.HelmRepository) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "test-msg")
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "test-msg"),
},
afterFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
t.Expect(obj.Status.ObservedGeneration).To(Equal(int64(4)))
},
},
{
name: "Success, Ready=True",
generation: 5,
beforeFunc: func(obj *sourcev1.HelmRepository) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "created")
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "created"),
},
afterFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
t.Expect(obj.Status.ObservedGeneration).To(Equal(int64(5)))
},
},
{
name: "Success, removes reconciling for successful result",
generation: 2,
beforeFunc: func(obj *sourcev1.HelmRepository) {
conditions.MarkReconciling(obj, "NewRevision", "new index version")
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "stored artifact")
},
result: sreconcile.ResultSuccess,
wantErr: false,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact"),
},
afterFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
t.Expect(obj.Status.ObservedGeneration).To(Equal(int64(2)))
},
},
{
name: "Success, record reconciliation request",
beforeFunc: func(obj *sourcev1.HelmRepository) {
annotations := map[string]string{
meta.ReconcileRequestAnnotation: "now",
}
obj.SetAnnotations(annotations)
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "test-msg")
},
generation: 3,
result: sreconcile.ResultSuccess,
wantErr: false,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "test-msg"),
},
afterFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
t.Expect(obj.Status.LastHandledReconcileAt).To(Equal("now"))
t.Expect(obj.Status.ObservedGeneration).To(Equal(int64(3)))
},
},
{
name: "Fail, with multiple conditions ArtifactOutdated=True,Reconciling=True",
generation: 7,
beforeFunc: func(obj *sourcev1.HelmRepository) {
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", "new index revision")
conditions.MarkReconciling(obj, "NewRevision", "new index revision")
},
reconcileErr: fmt.Errorf("failed to create dir"),
wantErr: true,
assertConditions: []metav1.Condition{
*conditions.FalseCondition(meta.ReadyCondition, "NewRevision", "new index revision"),
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new index revision"),
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new index revision"),
},
afterFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
t.Expect(obj.Status.ObservedGeneration).ToNot(Equal(int64(7)))
},
},
{
name: "Success, with subreconciler stalled error",
generation: 9,
beforeFunc: func(obj *sourcev1.HelmRepository) {
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.FetchFailedCondition, "failed to construct helm client")
},
reconcileErr: &serror.Stalling{Err: fmt.Errorf("some error"), Reason: "some reason"},
wantErr: false,
assertConditions: []metav1.Condition{
*conditions.FalseCondition(meta.ReadyCondition, sourcev1.FetchFailedCondition, "failed to construct helm client"),
*conditions.TrueCondition(meta.StalledCondition, "some reason", "some error"),
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.FetchFailedCondition, "failed to construct helm client"),
},
afterFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
t.Expect(obj.Status.ObservedGeneration).To(Equal(int64(9)))
},
},
{
name: "Fail, no error but requeue requested",
generation: 3,
beforeFunc: func(obj *sourcev1.HelmRepository) {
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "test-msg")
},
result: sreconcile.ResultRequeue,
assertConditions: []metav1.Condition{
*conditions.FalseCondition(meta.ReadyCondition, meta.FailedReason, "test-msg"),
},
afterFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
t.Expect(obj.Status.ObservedGeneration).ToNot(Equal(int64(3)))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
builder := fakeclient.NewClientBuilder().WithScheme(testEnv.GetScheme())
r := &HelmRepositoryReconciler{
Client: builder.Build(),
EventRecorder: record.NewFakeRecorder(32),
}
obj := &sourcev1.HelmRepository{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-",
Generation: tt.generation,
},
Spec: sourcev1.HelmRepositorySpec{
Interval: metav1.Duration{Duration: 5 * time.Second},
},
}
if tt.beforeFunc != nil {
tt.beforeFunc(obj)
}
ctx := context.TODO()
g.Expect(r.Create(ctx, obj)).To(Succeed())
patchHelper, err := patch.NewHelper(obj, r.Client)
g.Expect(err).ToNot(HaveOccurred())
_, gotErr := r.summarizeAndPatch(ctx, obj, patchHelper, tt.result, tt.reconcileErr)
g.Expect(gotErr != nil).To(Equal(tt.wantErr))
g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
if tt.afterFunc != nil {
tt.afterFunc(g, obj)
}
// Check if the object status is valid.
condns := &status.Conditions{NegativePolarity: helmRepoReadyDepsNegative}
checker := status.NewChecker(r.Client, testEnv.GetScheme(), condns)
checker.CheckErr(ctx, obj)
})
}
}
func TestHelmRepositoryReconciler_reconcileSubRecs(t *testing.T) {
// Helper to build simple helmRepoReconcilerFunc with result and error.
buildReconcileFuncs := func(r sreconcile.Result, e error) helmRepoReconcilerFunc {

114
internal/object/object.go Normal file
View File

@ -0,0 +1,114 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package object
import (
"errors"
"time"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
)
var (
ErrObservedGenerationNotFound = errors.New("observed generation not found")
ErrLastHandledReconcileAtNotFound = errors.New("last handled reconcile at not found")
ErrRequeueIntervalNotFound = errors.New("requeue interval not found")
)
// toUnstructured converts a runtime object into Unstructured.
// Based on https://github.com/fluxcd/pkg/blob/b4a14854c75753ea9431693b39c4be672f246552/runtime/patch/utils.go#L55.
func toUnstructured(obj runtime.Object) (*unstructured.Unstructured, error) {
// If the incoming object is already unstructured, perform a deep copy first
// otherwise DefaultUnstructuredConverter ends up returning the inner map without
// making a copy.
if _, ok := obj.(runtime.Unstructured); ok {
obj = obj.DeepCopyObject()
}
rawMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return nil, err
}
return &unstructured.Unstructured{Object: rawMap}, nil
}
// GetStatusLastHandledReconcileAt returns the status.lastHandledReconcileAt
// value of a given runtime object, if present.
func GetStatusLastHandledReconcileAt(obj runtime.Object) (string, error) {
u, err := toUnstructured(obj)
if err != nil {
return "", err
}
ra, found, err := unstructured.NestedString(u.Object, "status", "lastHandledReconcileAt")
if err != nil {
return "", err
}
if !found {
return "", ErrLastHandledReconcileAtNotFound
}
return ra, nil
}
// SetStatusLastHandledReconcileAt sets the status.lastHandledReconcileAt value
// of a given runtime object.
func SetStatusLastHandledReconcileAt(obj runtime.Object, val string) error {
content, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return err
}
u := unstructured.Unstructured{}
u.SetUnstructuredContent(content)
if err := unstructured.SetNestedField(u.Object, val, "status", "lastHandledReconcileAt"); err != nil {
return err
}
return runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, obj)
}
// GetStatusObservedGeneration returns the status.observedGeneration of a given
// runtime object.
func GetStatusObservedGeneration(obj runtime.Object) (int64, error) {
u, err := toUnstructured(obj)
if err != nil {
return 0, err
}
og, found, err := unstructured.NestedInt64(u.Object, "status", "observedGeneration")
if err != nil {
return 0, err
}
if !found {
return 0, ErrObservedGenerationNotFound
}
return og, nil
}
// GetRequeueInterval returns the spec.interval of a given runtime object, if
// present.
func GetRequeueInterval(obj runtime.Object) (time.Duration, error) {
period := time.Second
u, err := toUnstructured(obj)
if err != nil {
return period, err
}
interval, found, err := unstructured.NestedString(u.Object, "spec", "interval")
if err != nil {
return period, err
}
if !found {
return period, ErrRequeueIntervalNotFound
}
return time.ParseDuration(interval)
}

View File

@ -0,0 +1,88 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package object
import (
"testing"
"time"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
)
func TestGetStatusLastHandledReconcileAt(t *testing.T) {
g := NewWithT(t)
// Get unset status lastHandledReconcileAt.
obj := &sourcev1.GitRepository{}
_, err := GetStatusLastHandledReconcileAt(obj)
g.Expect(err).To(Equal(ErrLastHandledReconcileAtNotFound))
// Get set status lastHandledReconcileAt.
obj.Status.LastHandledReconcileAt = "foo"
ra, err := GetStatusLastHandledReconcileAt(obj)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(ra).To(Equal("foo"))
}
func TestSetStatusLastHandledReconcileAt(t *testing.T) {
g := NewWithT(t)
obj := &sourcev1.GitRepository{}
err := SetStatusLastHandledReconcileAt(obj, "now")
g.Expect(err).ToNot(HaveOccurred())
g.Expect(obj.Status.LastHandledReconcileAt).To(Equal("now"))
}
func TestGetStatusObservedGeneration(t *testing.T) {
g := NewWithT(t)
// Get unset status observedGeneration.
obj := &sourcev1.GitRepository{}
_, err := GetStatusObservedGeneration(obj)
g.Expect(err).To(Equal(ErrObservedGenerationNotFound))
// Get set status observedGeneration.
obj.Status.ObservedGeneration = 7
og, err := GetStatusObservedGeneration(obj)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(og).To(Equal(int64(7)))
}
func TestGetRequeueInterval(t *testing.T) {
g := NewWithT(t)
// Get empty requeue interval value.
obj := &sourcev1.GitRepository{}
pd, err := GetRequeueInterval(obj)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(pd).To(Equal(time.Duration(0)))
// Get set requeue interval value.
obj.Spec.Interval = metav1.Duration{Duration: 3 * time.Second}
pd, err = GetRequeueInterval(obj)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(pd).To(Equal(3 * time.Second))
// Get non-existent requeue interval value.
obj2 := &corev1.Secret{}
_, err = GetRequeueInterval(obj2)
g.Expect(err).To(Equal(ErrRequeueIntervalNotFound))
}

View File

@ -17,12 +17,8 @@ limitations under the License.
package reconcile
import (
"context"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"github.com/fluxcd/pkg/apis/meta"
@ -37,20 +33,40 @@ import (
type Result int
const (
// ResultEmpty indicates a reconcile result which does not requeue.
// ResultEmpty indicates a reconcile result which does not requeue. It is
// also used when returning an error, since the error overshadows result.
ResultEmpty Result = iota
// ResultRequeue indicates a reconcile result which should immediately
// requeue.
ResultRequeue
// ResultSuccess indicates a reconcile result which should be
// requeued on the interval as defined on the reconciled object.
// ResultSuccess indicates a reconcile success result.
// For a reconciler that requeues regularly at a fixed interval, runtime
// result with a fixed RequeueAfter is success result.
// For a reconciler that doesn't requeue on successful reconciliation,
// an empty runtime result is success result.
// It is usually returned at the end of a reconciler/sub-reconciler.
ResultSuccess
)
// RuntimeResultBuilder defines an interface for runtime result builders. This
// can be implemented to build custom results based on the context of the
// reconciler.
type RuntimeResultBuilder interface {
BuildRuntimeResult(rr Result, err error) ctrl.Result
}
// AlwaysRequeueResultBuilder implements a RuntimeResultBuilder for always
// requeuing reconcilers. A successful reconciliation result for such
// reconcilers contains a fixed RequeueAfter value.
type AlwaysRequeueResultBuilder struct {
// RequeueAfter is the fixed period at which the reconciler requeues on
// successful execution.
RequeueAfter time.Duration
}
// BuildRuntimeResult converts a given Result and error into the
// return values of a controller's Reconcile function.
// func BuildRuntimeResult(ctx context.Context, recorder kuberecorder.EventRecorder, obj sourcev1.Source, rr Result, err error) (ctrl.Result, error) {
func BuildRuntimeResult(successInterval time.Duration, rr Result, err error) ctrl.Result {
func (r AlwaysRequeueResultBuilder) BuildRuntimeResult(rr Result, err error) ctrl.Result {
// Handle special errors that contribute to expressing the result.
if e, ok := err.(*serror.Waiting); ok {
return ctrl.Result{RequeueAfter: e.RequeueAfter}
@ -60,52 +76,32 @@ func BuildRuntimeResult(successInterval time.Duration, rr Result, err error) ctr
case ResultRequeue:
return ctrl.Result{Requeue: true}
case ResultSuccess:
return ctrl.Result{RequeueAfter: successInterval}
return ctrl.Result{RequeueAfter: r.RequeueAfter}
default:
return ctrl.Result{}
}
}
// RecordContextualError records the contextual errors based on their types.
// An event is recorded for the errors that are returned to the runtime. The
// runtime handles the logging of the error.
// An event is recorded and an error is logged for errors that are known to be
// swallowed, not returned to the runtime.
func RecordContextualError(ctx context.Context, recorder kuberecorder.EventRecorder, obj runtime.Object, err error) {
switch e := err.(type) {
case *serror.Event:
recorder.Eventf(obj, corev1.EventTypeWarning, e.Reason, e.Error())
case *serror.Waiting:
// Waiting errors are not returned to the runtime. Log it explicitly.
ctrl.LoggerFrom(ctx).Info("reconciliation waiting", "reason", e.Err, "duration", e.RequeueAfter)
recorder.Event(obj, corev1.EventTypeNormal, e.Reason, e.Error())
case *serror.Stalling:
// Stalling errors are not returned to the runtime. Log it explicitly.
ctrl.LoggerFrom(ctx).Error(e, "reconciliation stalled")
recorder.Eventf(obj, corev1.EventTypeWarning, e.Reason, e.Error())
}
}
// ComputeReconcileResult analyzes the reconcile results (result + error),
// updates the status conditions of the object with any corrections and returns
// object patch configuration, runtime result and runtime error. The caller is
// responsible for using the patch configuration to patch the object in the API
// server.
func ComputeReconcileResult(obj conditions.Setter, successInterval time.Duration, res Result, recErr error, ownedConditions []string) ([]patch.Option, ctrl.Result, error) {
result := BuildRuntimeResult(successInterval, res, recErr)
// responsible for using the patch configuration while patching the object in
// the API server.
// The RuntimeResultBuilder is used to define how the ctrl.Result is computed.
func ComputeReconcileResult(obj conditions.Setter, res Result, recErr error, rb RuntimeResultBuilder) ([]patch.Option, ctrl.Result, error) {
var pOpts []patch.Option
// Compute the runtime result.
var result ctrl.Result
if rb != nil {
result = rb.BuildRuntimeResult(res, recErr)
}
// Remove reconciling condition on successful reconciliation.
if recErr == nil && res == ResultSuccess {
conditions.Delete(obj, meta.ReconcilingCondition)
}
// Patch the object, ignoring conflicts on the conditions owned by this controller.
pOpts := []patch.Option{
patch.WithOwnedConditions{
Conditions: ownedConditions,
},
}
// Analyze the reconcile error.
switch t := recErr.(type) {
case *serror.Stalling:

View File

@ -17,9 +17,20 @@ limitations under the License.
package reconcile
import (
"fmt"
"testing"
"time"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
"github.com/fluxcd/pkg/runtime/patch"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
serror "github.com/fluxcd/source-controller/internal/error"
)
func TestLowestRequeuingResult(t *testing.T) {
@ -45,3 +56,149 @@ func TestLowestRequeuingResult(t *testing.T) {
})
}
}
// This test uses AlwaysRequeueResultBuilder as the RuntimeResultBuilder.
func TestComputeReconcileResult(t *testing.T) {
testSuccessInterval := time.Minute
tests := []struct {
name string
result Result
beforeFunc func(obj conditions.Setter)
recErr error
wantResult ctrl.Result
wantErr bool
assertConditions []metav1.Condition
afterFunc func(t *WithT, obj conditions.Setter, patchOpts *patch.HelperOptions)
}{
{
name: "successful result",
result: ResultSuccess,
recErr: nil,
wantResult: ctrl.Result{RequeueAfter: testSuccessInterval},
wantErr: false,
afterFunc: func(t *WithT, obj conditions.Setter, patchOpts *patch.HelperOptions) {
t.Expect(patchOpts.IncludeStatusObservedGeneration).To(BeTrue())
},
},
{
name: "successful result, Reconciling=True, remove Reconciling",
result: ResultSuccess,
beforeFunc: func(obj conditions.Setter) {
conditions.MarkReconciling(obj, "NewRevision", "new revision")
},
recErr: nil,
wantResult: ctrl.Result{RequeueAfter: testSuccessInterval},
wantErr: false,
afterFunc: func(t *WithT, obj conditions.Setter, patchOpts *patch.HelperOptions) {
t.Expect(patchOpts.IncludeStatusObservedGeneration).To(BeTrue())
t.Expect(conditions.IsUnknown(obj, meta.ReconcilingCondition)).To(BeTrue())
},
},
{
name: "successful result, Stalled=True, remove Stalled",
result: ResultSuccess,
beforeFunc: func(obj conditions.Setter) {
conditions.MarkStalled(obj, "SomeReason", "some message")
},
recErr: nil,
wantResult: ctrl.Result{RequeueAfter: testSuccessInterval},
wantErr: false,
afterFunc: func(t *WithT, obj conditions.Setter, patchOpts *patch.HelperOptions) {
t.Expect(patchOpts.IncludeStatusObservedGeneration).To(BeTrue())
t.Expect(conditions.IsUnknown(obj, meta.StalledCondition)).To(BeTrue())
},
},
{
name: "requeue result",
result: ResultRequeue,
recErr: nil,
wantResult: ctrl.Result{Requeue: true},
wantErr: false,
afterFunc: func(t *WithT, obj conditions.Setter, patchOpts *patch.HelperOptions) {
t.Expect(patchOpts.IncludeStatusObservedGeneration).To(BeFalse())
},
},
{
name: "requeue result",
result: ResultRequeue,
recErr: nil,
wantResult: ctrl.Result{Requeue: true},
wantErr: false,
afterFunc: func(t *WithT, obj conditions.Setter, patchOpts *patch.HelperOptions) {
t.Expect(patchOpts.IncludeStatusObservedGeneration).To(BeFalse())
},
},
{
name: "stalling error",
result: ResultEmpty,
recErr: &serror.Stalling{Err: fmt.Errorf("some error"), Reason: "some reason"},
wantResult: ctrl.Result{},
wantErr: false,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.StalledCondition, "some reason", "some error"),
},
afterFunc: func(t *WithT, obj conditions.Setter, patchOpts *patch.HelperOptions) {
t.Expect(patchOpts.IncludeStatusObservedGeneration).To(BeTrue())
},
},
{
name: "waiting error",
result: ResultEmpty,
recErr: &serror.Waiting{Err: fmt.Errorf("some error"), Reason: "some reason"},
wantResult: ctrl.Result{},
wantErr: false,
afterFunc: func(t *WithT, obj conditions.Setter, patchOpts *patch.HelperOptions) {
t.Expect(patchOpts.IncludeStatusObservedGeneration).To(BeFalse())
},
},
{
name: "random error",
result: ResultEmpty,
recErr: fmt.Errorf("some error"),
wantResult: ctrl.Result{},
wantErr: true,
afterFunc: func(t *WithT, obj conditions.Setter, patchOpts *patch.HelperOptions) {
t.Expect(patchOpts.IncludeStatusObservedGeneration).To(BeFalse())
},
},
{
name: "random error, Stalled=True, remove Stalled",
result: ResultEmpty,
recErr: fmt.Errorf("some error"),
wantResult: ctrl.Result{},
wantErr: true,
afterFunc: func(t *WithT, obj conditions.Setter, patchOpts *patch.HelperOptions) {
t.Expect(patchOpts.IncludeStatusObservedGeneration).To(BeFalse())
t.Expect(conditions.IsUnknown(obj, meta.StalledCondition)).To(BeTrue())
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
obj := &sourcev1.GitRepository{}
obj.Name = "test-git-repo"
obj.Namespace = "default"
obj.Spec.Interval = metav1.Duration{Duration: testSuccessInterval}
if tt.beforeFunc != nil {
tt.beforeFunc(obj)
}
rb := AlwaysRequeueResultBuilder{RequeueAfter: obj.Spec.Interval.Duration}
pOpts, result, err := ComputeReconcileResult(obj, tt.result, tt.recErr, rb)
g.Expect(err != nil).To(Equal(tt.wantErr))
g.Expect(result).To(Equal(tt.wantResult))
g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
opts := &patch.HelperOptions{}
for _, o := range pOpts {
o.ApplyToHelper(opts)
}
tt.afterFunc(g, obj, opts)
})
}
}

View File

@ -0,0 +1,99 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package summarize
import (
"fmt"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
"k8s.io/apimachinery/pkg/runtime"
"github.com/fluxcd/source-controller/internal/object"
)
// HaveStatusObservedGeneration returns a custom matcher to check if a
// runtime.Object has a given status observedGeneration value.
func HaveStatusObservedGeneration(expected int64) types.GomegaMatcher {
return &haveStatusObservedGeneration{
expected: expected,
}
}
type haveStatusObservedGeneration struct {
expected int64
actual int64
}
func (m *haveStatusObservedGeneration) Match(actual interface{}) (success bool, err error) {
obj, ok := actual.(runtime.Object)
if !ok {
return false, fmt.Errorf("actual should be a runtime object")
}
og, err := object.GetStatusObservedGeneration(obj)
if err != nil && err != object.ErrObservedGenerationNotFound {
return false, err
}
m.actual = og
return Equal(m.expected).Match(og)
}
func (m *haveStatusObservedGeneration) FailureMessage(actual interface{}) (message string) {
return fmt.Sprintf("expected\n\t%d\nto match\n\t%d\n", m.actual, m.expected)
}
func (m *haveStatusObservedGeneration) NegatedFailureMessage(actual interface{}) (message string) {
return fmt.Sprintf("expected\n\t%d\nto not match\n\t%d\n", m.actual, m.expected)
}
// HaveStatusLastHandledReconcileAt returns a custom matcher to check if a
// runtime.Object has a given status lastHandledReconcileAt value.
func HaveStatusLastHandledReconcileAt(expected string) types.GomegaMatcher {
return &haveStatusLastHandledReconcileAt{
expected: expected,
}
}
type haveStatusLastHandledReconcileAt struct {
expected string
actual string
}
func (m *haveStatusLastHandledReconcileAt) Match(actual interface{}) (success bool, err error) {
obj, ok := actual.(runtime.Object)
if !ok {
return false, fmt.Errorf("actual should be a runtime object")
}
ra, err := object.GetStatusLastHandledReconcileAt(obj)
if err != nil && err != object.ErrLastHandledReconcileAtNotFound {
return false, err
}
m.actual = ra
return Equal(m.expected).Match(ra)
}
func (m *haveStatusLastHandledReconcileAt) FailureMessage(actual interface{}) (message string) {
return fmt.Sprintf("expected\n\t%s\nto match\n\t%s\n", m.actual, m.expected)
}
func (m *haveStatusLastHandledReconcileAt) NegatedFailureMessage(actual interface{}) (message string) {
return fmt.Sprintf("expected\n\t%s\nto not match\n\t%s\n", m.actual, m.expected)
}

View File

@ -0,0 +1,66 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package summarize
import (
"context"
corev1 "k8s.io/api/core/v1"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/fluxcd/pkg/apis/meta"
serror "github.com/fluxcd/source-controller/internal/error"
"github.com/fluxcd/source-controller/internal/object"
"github.com/fluxcd/source-controller/internal/reconcile"
)
// ResultProcessor processes the results of reconciliation (the object, result
// and error). Any errors during processing need not result in the
// reconciliation failure. The errors can be recorded as logs and events.
type ResultProcessor func(context.Context, kuberecorder.EventRecorder, client.Object, reconcile.Result, error)
// RecordContextualError is a ResultProcessor that records the contextual errors
// based on their types.
// An event is recorded for the errors that are returned to the runtime. The
// runtime handles the logging of the error.
// An event is recorded and an error is logged for errors that are known to be
// swallowed, not returned to the runtime.
func RecordContextualError(ctx context.Context, recorder kuberecorder.EventRecorder, obj client.Object, _ reconcile.Result, err error) {
switch e := err.(type) {
case *serror.Event:
recorder.Eventf(obj, corev1.EventTypeWarning, e.Reason, e.Error())
case *serror.Waiting:
// Waiting errors are not returned to the runtime. Log it explicitly.
ctrl.LoggerFrom(ctx).Info("reconciliation waiting", "reason", e.Err, "duration", e.RequeueAfter)
recorder.Event(obj, corev1.EventTypeNormal, e.Reason, e.Error())
case *serror.Stalling:
// Stalling errors are not returned to the runtime. Log it explicitly.
ctrl.LoggerFrom(ctx).Error(e, "reconciliation stalled")
recorder.Eventf(obj, corev1.EventTypeWarning, e.Reason, e.Error())
}
}
// RecordReconcileReq is a ResultProcessor that checks the reconcile
// annotation value and sets it in the object status as
// status.lastHandledReconcileAt.
func RecordReconcileReq(ctx context.Context, recorder kuberecorder.EventRecorder, obj client.Object, _ reconcile.Result, _ error) {
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
object.SetStatusLastHandledReconcileAt(obj, v)
}
}

View File

@ -0,0 +1,91 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package summarize
import (
"context"
"testing"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/fluxcd/pkg/apis/meta"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/object"
"github.com/fluxcd/source-controller/internal/reconcile"
)
func TestRecordReconcileReq(t *testing.T) {
tests := []struct {
name string
beforeFunc func(obj client.Object)
afterFunc func(t *WithT, obj client.Object)
}{
{
name: "no reconcile req",
afterFunc: func(t *WithT, obj client.Object) {
t.Expect(obj).To(HaveStatusLastHandledReconcileAt(""))
},
},
{
name: "no reconcile req, noop on existing value",
beforeFunc: func(obj client.Object) {
object.SetStatusLastHandledReconcileAt(obj, "zzz")
},
afterFunc: func(t *WithT, obj client.Object) {
t.Expect(obj).To(HaveStatusLastHandledReconcileAt("zzz"))
},
},
{
name: "with reconcile req",
beforeFunc: func(obj client.Object) {
annotations := map[string]string{
meta.ReconcileRequestAnnotation: "now",
}
obj.SetAnnotations(annotations)
},
afterFunc: func(t *WithT, obj client.Object) {
t.Expect(obj).To(HaveStatusLastHandledReconcileAt("now"))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
obj := &sourcev1.GitRepository{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-obj",
},
}
if tt.beforeFunc != nil {
tt.beforeFunc(obj)
}
ctx := context.TODO()
RecordReconcileReq(ctx, record.NewFakeRecorder(32), obj, reconcile.ResultEmpty, nil)
if tt.afterFunc != nil {
tt.afterFunc(g, obj)
}
})
}
}

View File

@ -0,0 +1,204 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package summarize
import (
"context"
apierrors "k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/util/errors"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"github.com/fluxcd/pkg/runtime/conditions"
"github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/source-controller/internal/reconcile"
)
// Conditions contains all the conditions information needed to summarize the
// target condition.
type Conditions struct {
// Target is the target condition, e.g.: Ready.
Target string
// Owned conditions are the conditions owned by the reconciler for this
// target condition.
Owned []string
// Summarize conditions are the conditions that the target condition depends
// on.
Summarize []string
// NegativePolarity conditions are the conditions in Summarize with negative
// polarity.
NegativePolarity []string
}
// Helper is SummarizeAndPatch helper.
type Helper struct {
recorder kuberecorder.EventRecorder
patchHelper *patch.Helper
}
// NewHelper returns an initialized Helper.
func NewHelper(recorder kuberecorder.EventRecorder, patchHelper *patch.Helper) *Helper {
return &Helper{
recorder: recorder,
patchHelper: patchHelper,
}
}
// HelperOptions contains options for SummarizeAndPatch.
// Summarizing and patching at the very end of a reconciliation involves
// computing the result of the reconciler. This requires providing the
// ReconcileResult, ReconcileError and a ResultBuilder in the context of the
// reconciliation.
// For using this to perform intermediate patching in the middle of a
// reconciliation, no ReconcileResult, ReconcileError or ResultBuilder should
// be provided. Only Conditions summary would be calculated and patched.
type HelperOptions struct {
// Conditions are conditions that needs to be summarized and persisted on
// the object.
Conditions []Conditions
// Processors are chain of ResultProcessors for processing the results. This
// can be used to analyze and modify the results. This enables injecting
// custom middlewares in the SummarizeAndPatch operation.
Processors []ResultProcessor
// IgnoreNotFound can be used to ignores any resource not found error during
// patching.
IgnoreNotFound bool
// ReconcileResult is the abstracted result of reconciliation.
ReconcileResult reconcile.Result
// ReconcileError is the reconciliation error.
ReconcileError error
// ResultBuilder defines how the reconciliation result is computed.
ResultBuilder reconcile.RuntimeResultBuilder
}
// Option is configuration that modifies SummarizeAndPatch.
type Option func(*HelperOptions)
// WithConditions sets the Conditions for which summary is calculated in
// SummarizeAndPatch.
func WithConditions(condns ...Conditions) Option {
return func(s *HelperOptions) {
s.Conditions = append(s.Conditions, condns...)
}
}
// WithProcessors can be used to inject middlewares in the SummarizeAndPatch
// process, to be executed before the result calculation and patching.
func WithProcessors(rps ...ResultProcessor) Option {
return func(s *HelperOptions) {
s.Processors = append(s.Processors, rps...)
}
}
// WithIgnoreNotFound skips any resource not found error during patching.
func WithIgnoreNotFound() Option {
return func(s *HelperOptions) {
s.IgnoreNotFound = true
}
}
// WithResultBuilder sets the strategy for result computation in
// SummarizeAndPatch.
func WithResultBuilder(rb reconcile.RuntimeResultBuilder) Option {
return func(s *HelperOptions) {
s.ResultBuilder = rb
}
}
// WithReconcileResult sets the value of input result used to calculate the
// results of reconciliation in SummarizeAndPatch.
func WithReconcileResult(rr reconcile.Result) Option {
return func(s *HelperOptions) {
s.ReconcileResult = rr
}
}
// WithReconcileError sets the value of input error used to calculate the
// results reconciliation in SummarizeAndPatch.
func WithReconcileError(re error) Option {
return func(s *HelperOptions) {
s.ReconcileError = re
}
}
// SummarizeAndPatch summarizes and patches the result to the target object.
// When used at the very end of a reconciliation, the result builder must be
// specified using the Option WithResultBuilder(). The returned result and error
// can be returned as the return values of the reconciliation.
// When used in the middle of a reconciliation, no result builder should be set
// and the result can be ignored.
func (h *Helper) SummarizeAndPatch(ctx context.Context, obj conditions.Setter, options ...Option) (ctrl.Result, error) {
// Calculate the options.
opts := &HelperOptions{}
for _, o := range options {
o(opts)
}
// Combined the owned conditions of all the conditions for the patcher.
ownedConditions := []string{}
for _, c := range opts.Conditions {
ownedConditions = append(ownedConditions, c.Owned...)
}
// Patch the object, prioritizing the conditions owned by the controller in
// case of any conflicts.
patchOpts := []patch.Option{
patch.WithOwnedConditions{
Conditions: ownedConditions,
},
}
// Process the results of reconciliation.
for _, processor := range opts.Processors {
processor(ctx, h.recorder, obj, opts.ReconcileResult, opts.ReconcileError)
}
var result ctrl.Result
var recErr error
if opts.ResultBuilder != nil {
// Compute the reconcile results, obtain patch options and reconcile error.
var pOpts []patch.Option
pOpts, result, recErr = reconcile.ComputeReconcileResult(obj, opts.ReconcileResult, opts.ReconcileError, opts.ResultBuilder)
patchOpts = append(patchOpts, pOpts...)
}
// Summarize conditions. This must be performed only after computing the
// reconcile result, since the object status is adjusted based on the
// reconcile result and error.
for _, c := range opts.Conditions {
conditions.SetSummary(obj,
c.Target,
conditions.WithConditions(
c.Summarize...,
),
conditions.WithNegativePolarityConditions(
c.NegativePolarity...,
),
)
}
// Finally, patch the resource.
if err := h.patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
// Ignore patch error "not found" when the object is being deleted.
if opts.IgnoreNotFound && !obj.GetDeletionTimestamp().IsZero() {
err = kerrors.FilterOut(err, func(e error) bool { return apierrors.IsNotFound(e) })
}
recErr = kerrors.NewAggregate([]error{recErr, err})
}
return result, recErr
}

View File

@ -0,0 +1,396 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package summarize
import (
"context"
"fmt"
"testing"
"time"
"github.com/darkowlzz/controller-check/status"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
"github.com/fluxcd/pkg/runtime/patch"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
serror "github.com/fluxcd/source-controller/internal/error"
"github.com/fluxcd/source-controller/internal/reconcile"
)
// This tests the scenario where SummarizeAndPatch is used at the very end of a
// reconciliation.
func TestSummarizeAndPatch(t *testing.T) {
var testReadyConditions = Conditions{
Target: meta.ReadyCondition,
Owned: []string{
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
},
Summarize: []string{
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
},
NegativePolarity: []string{
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
},
}
var testFooConditions = Conditions{
Target: "Foo",
Owned: []string{
"Foo",
"AAA",
"BBB",
},
Summarize: []string{
"AAA",
"BBB",
},
NegativePolarity: []string{
"BBB",
},
}
tests := []struct {
name string
generation int64
beforeFunc func(obj conditions.Setter)
result reconcile.Result
reconcileErr error
conditions []Conditions
wantErr bool
afterFunc func(t *WithT, obj client.Object)
assertConditions []metav1.Condition
}{
// Success/Fail indicates if a reconciliation succeeded or failed. On
// a successful reconciliation, the object generation is expected to
// match the observed generation in the object status.
// All the cases have some Ready condition set, even if a test case is
// unrelated to the conditions, because it's neseccary for a valid
// status.
{
name: "Success, no extra conditions",
generation: 4,
beforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "test-msg")
},
conditions: []Conditions{testReadyConditions},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "test-msg"),
},
afterFunc: func(t *WithT, obj client.Object) {
t.Expect(obj).To(HaveStatusObservedGeneration(4))
},
},
{
name: "Success, Ready=True",
generation: 5,
beforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "created")
},
conditions: []Conditions{testReadyConditions},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "created"),
},
afterFunc: func(t *WithT, obj client.Object) {
t.Expect(obj).To(HaveStatusObservedGeneration(5))
},
},
{
name: "Success, removes reconciling for successful result",
generation: 2,
beforeFunc: func(obj conditions.Setter) {
conditions.MarkReconciling(obj, "NewRevision", "new index version")
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "stored artifact")
},
conditions: []Conditions{testReadyConditions},
result: reconcile.ResultSuccess,
wantErr: false,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "stored artifact"),
},
afterFunc: func(t *WithT, obj client.Object) {
t.Expect(obj).To(HaveStatusObservedGeneration(2))
},
},
{
name: "Success, record reconciliation request",
beforeFunc: func(obj conditions.Setter) {
annotations := map[string]string{
meta.ReconcileRequestAnnotation: "now",
}
obj.SetAnnotations(annotations)
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "test-msg")
},
generation: 3,
conditions: []Conditions{testReadyConditions},
result: reconcile.ResultSuccess,
wantErr: false,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "test-msg"),
},
afterFunc: func(t *WithT, obj client.Object) {
t.Expect(obj).To(HaveStatusLastHandledReconcileAt("now"))
t.Expect(obj).To(HaveStatusObservedGeneration(3))
},
},
{
name: "Fail, with multiple conditions ArtifactOutdated=True,Reconciling=True",
generation: 7,
beforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", "new index revision")
conditions.MarkReconciling(obj, "NewRevision", "new index revision")
},
conditions: []Conditions{testReadyConditions},
reconcileErr: fmt.Errorf("failed to create dir"),
wantErr: true,
assertConditions: []metav1.Condition{
*conditions.FalseCondition(meta.ReadyCondition, "NewRevision", "new index revision"),
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "new index revision"),
*conditions.TrueCondition(meta.ReconcilingCondition, "NewRevision", "new index revision"),
},
afterFunc: func(t *WithT, obj client.Object) {
t.Expect(obj).ToNot(HaveStatusObservedGeneration(7))
},
},
{
name: "Success, with subreconciler stalled error",
generation: 9,
beforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.FetchFailedCondition, "failed to construct client")
},
conditions: []Conditions{testReadyConditions},
reconcileErr: &serror.Stalling{Err: fmt.Errorf("some error"), Reason: "some reason"},
wantErr: false,
assertConditions: []metav1.Condition{
*conditions.FalseCondition(meta.ReadyCondition, sourcev1.FetchFailedCondition, "failed to construct client"),
*conditions.TrueCondition(meta.StalledCondition, "some reason", "some error"),
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.FetchFailedCondition, "failed to construct client"),
},
afterFunc: func(t *WithT, obj client.Object) {
t.Expect(obj).To(HaveStatusObservedGeneration(9))
},
},
{
name: "Fail, no error but requeue requested",
generation: 3,
beforeFunc: func(obj conditions.Setter) {
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "test-msg")
},
conditions: []Conditions{testReadyConditions},
result: reconcile.ResultRequeue,
assertConditions: []metav1.Condition{
*conditions.FalseCondition(meta.ReadyCondition, meta.FailedReason, "test-msg"),
},
afterFunc: func(t *WithT, obj client.Object) {
t.Expect(obj).ToNot(HaveStatusObservedGeneration(3))
},
},
{
name: "Success, multiple conditions summary",
generation: 3,
beforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "test-msg")
conditions.MarkTrue(obj, "AAA", "ZZZ", "zzz") // Positive polarity True.
conditions.MarkTrue(obj, "BBB", "YYY", "yyy") // Negative polarity True.
},
conditions: []Conditions{testReadyConditions, testFooConditions},
result: reconcile.ResultSuccess,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "test-msg"),
*conditions.FalseCondition("Foo", "YYY", "yyy"), // False summary.
*conditions.TrueCondition("BBB", "YYY", "yyy"),
*conditions.TrueCondition("AAA", "ZZZ", "zzz"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
scheme := runtime.NewScheme()
g.Expect(sourcev1.AddToScheme(scheme))
builder := fakeclient.NewClientBuilder().WithScheme(scheme)
client := builder.Build()
obj := &sourcev1.GitRepository{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-",
Generation: tt.generation,
},
Spec: sourcev1.GitRepositorySpec{
Interval: metav1.Duration{Duration: 5 * time.Second},
},
}
if tt.beforeFunc != nil {
tt.beforeFunc(obj)
}
ctx := context.TODO()
g.Expect(client.Create(ctx, obj)).To(Succeed())
patchHelper, err := patch.NewHelper(obj, client)
g.Expect(err).ToNot(HaveOccurred())
summaryHelper := NewHelper(record.NewFakeRecorder(32), patchHelper)
summaryOpts := []Option{
WithReconcileResult(tt.result),
WithReconcileError(tt.reconcileErr),
WithConditions(tt.conditions...),
WithIgnoreNotFound(),
WithProcessors(RecordContextualError, RecordReconcileReq),
WithResultBuilder(reconcile.AlwaysRequeueResultBuilder{RequeueAfter: obj.Spec.Interval.Duration}),
}
_, gotErr := summaryHelper.SummarizeAndPatch(ctx, obj, summaryOpts...)
g.Expect(gotErr != nil).To(Equal(tt.wantErr))
g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
if tt.afterFunc != nil {
tt.afterFunc(g, obj)
}
// Check if the object status is valid as per kstatus.
condns := &status.Conditions{NegativePolarity: testReadyConditions.NegativePolarity}
checker := status.NewChecker(client, scheme, condns)
checker.CheckErr(ctx, obj)
})
}
}
// This tests the scenario where SummarizeAndPatch is used in the middle of
// reconciliation.
func TestSummarizeAndPatch_Intermediate(t *testing.T) {
var testStageAConditions = Conditions{
Target: "StageA",
Owned: []string{"StageA", "A1", "A2", "A3"},
Summarize: []string{"A1", "A2", "A3"},
NegativePolarity: []string{"A3"},
}
var testStageBConditions = Conditions{
Target: "StageB",
Owned: []string{"StageB", "B1", "B2"},
Summarize: []string{"B1", "B2"},
NegativePolarity: []string{"B1"},
}
tests := []struct {
name string
conditions []Conditions
beforeFunc func(obj conditions.Setter)
assertConditions []metav1.Condition
}{
{
name: "single Conditions, True summary",
conditions: []Conditions{testStageAConditions},
beforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "A1", "ZZZ", "zzz") // Positive polarity True.
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition("StageA", "ZZZ", "zzz"), // True summary.
*conditions.TrueCondition("A1", "ZZZ", "zzz"),
},
},
{
name: "single Conditions, False summary",
conditions: []Conditions{testStageAConditions},
beforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "A1", "ZZZ", "zzz") // Positive polarity True.
conditions.MarkTrue(obj, "A3", "OOO", "ooo") // Negative polarity True.
},
assertConditions: []metav1.Condition{
*conditions.FalseCondition("StageA", "OOO", "ooo"), // False summary.
*conditions.TrueCondition("A3", "OOO", "ooo"),
*conditions.TrueCondition("A1", "ZZZ", "zzz"),
},
},
{
name: "multiple Conditions",
conditions: []Conditions{testStageAConditions, testStageBConditions},
beforeFunc: func(obj conditions.Setter) {
conditions.MarkTrue(obj, "A3", "ZZZ", "zzz") // Negative polarity True.
conditions.MarkTrue(obj, "B2", "RRR", "rrr") // Positive polarity True.
},
assertConditions: []metav1.Condition{
*conditions.FalseCondition("StageA", "ZZZ", "zzz"), // False summary.
*conditions.TrueCondition("A3", "ZZZ", "zzz"),
*conditions.TrueCondition("StageB", "RRR", "rrr"), // True summary.
*conditions.TrueCondition("B2", "RRR", "rrr"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
scheme := runtime.NewScheme()
g.Expect(sourcev1.AddToScheme(scheme))
builder := fakeclient.NewClientBuilder().WithScheme(scheme)
kclient := builder.Build()
obj := &sourcev1.GitRepository{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-",
},
Spec: sourcev1.GitRepositorySpec{
Interval: metav1.Duration{Duration: 5 * time.Second},
},
Status: sourcev1.GitRepositoryStatus{
Conditions: []metav1.Condition{
*conditions.FalseCondition("StageA", "QQQ", "qqq"),
},
},
}
if tt.beforeFunc != nil {
tt.beforeFunc(obj)
}
ctx := context.TODO()
g.Expect(kclient.Create(ctx, obj)).To(Succeed())
patchHelper, err := patch.NewHelper(obj, kclient)
g.Expect(err).ToNot(HaveOccurred())
summaryHelper := NewHelper(record.NewFakeRecorder(32), patchHelper)
summaryOpts := []Option{
WithConditions(tt.conditions...),
}
_, err = summaryHelper.SummarizeAndPatch(ctx, obj, summaryOpts...)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
})
}
}