Merge pull request #748 from fluxcd/oci-helmrepo-refactor

OCI HelmRepo: handle status conditions in-line
This commit is contained in:
Stefan Prodan 2022-05-31 11:30:09 +03:00 committed by GitHub
commit fe31ff9e77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 167 additions and 174 deletions

View File

@ -577,8 +577,8 @@ func (r *HelmRepositoryReconciler) garbageCollect(ctx context.Context, obj *sour
// Clean status sub-resource
obj.Status.Artifact = nil
obj.Status.URL = ""
// Remove the condition as the artifact doesn't exist.
conditions.Delete(obj, sourcev1.ArtifactInStorageCondition)
// Remove any stale conditions.
obj.Status.Conditions = nil
return nil
}
if obj.GetArtifact() != nil {

View File

@ -20,58 +20,48 @@ import (
"context"
"errors"
"fmt"
"net/url"
"os"
"time"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
helper "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/pkg/runtime/predicates"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
serror "github.com/fluxcd/source-controller/internal/error"
"github.com/fluxcd/source-controller/internal/helm/registry"
"github.com/fluxcd/source-controller/internal/helm/repository"
intpredicates "github.com/fluxcd/source-controller/internal/predicates"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
helmgetter "helm.sh/helm/v3/pkg/getter"
helmreg "helm.sh/helm/v3/pkg/registry"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
helper "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/pkg/runtime/predicates"
"github.com/fluxcd/source-controller/api/v1beta2"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/helm/registry"
"github.com/fluxcd/source-controller/internal/helm/repository"
"github.com/fluxcd/source-controller/internal/object"
intpredicates "github.com/fluxcd/source-controller/internal/predicates"
)
var helmRepositoryOCIReadyCondition = summarize.Conditions{
Target: meta.ReadyCondition,
Owned: []string{
sourcev1.FetchFailedCondition,
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
},
Summarize: []string{
sourcev1.FetchFailedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
},
NegativePolarity: []string{
sourcev1.FetchFailedCondition,
meta.StalledCondition,
meta.ReconcilingCondition,
},
var helmRepositoryOCIOwnedConditions = []string{
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
}
// helmRepositoryOCIFailConditions contains the conditions that represent a
// failure.
var helmRepositoryOCIFailConditions = []string{
sourcev1.FetchFailedCondition,
var helmRepositoryOCINegativeConditions = []string{
meta.StalledCondition,
meta.ReconcilingCondition,
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete
@ -95,12 +85,6 @@ type HelmRepositoryOCIReconciler struct {
// The caller is responsible for deleting the file.
type RegistryClientGeneratorFunc func(isLogin bool) (*helmreg.Client, string, error)
// helmRepositoryOCIReconcileFunc is the function type for all the
// v1beta2.HelmRepository (sub)reconcile functions for OCI type. The type implementations
// are grouped and executed serially to perform the complete reconcile of the
// object.
type helmRepositoryOCIReconcileFunc func(ctx context.Context, obj *sourcev1.HelmRepository) (sreconcile.Result, error)
func (r *HelmRepositoryOCIReconciler) SetupWithManager(mgr ctrl.Manager) error {
return r.SetupWithManagerAndOptions(mgr, HelmRepositoryReconcilerOptions{})
}
@ -146,26 +130,35 @@ func (r *HelmRepositoryOCIReconciler) Reconcile(ctx context.Context, req ctrl.Re
return ctrl.Result{}, err
}
// recResult stores the abstracted reconcile result.
var recResult sreconcile.Result
// Always attempt to patch the object after each reconciliation.
// NOTE: The final runtime result and error are set in this block.
defer func() {
summarizeHelper := summarize.NewHelper(r.EventRecorder, patchHelper)
summarizeOpts := []summarize.Option{
summarize.WithConditions(helmRepositoryOCIReadyCondition),
summarize.WithReconcileResult(recResult),
summarize.WithReconcileError(retErr),
summarize.WithIgnoreNotFound(),
summarize.WithProcessors(
summarize.RecordContextualError,
summarize.RecordReconcileReq,
),
summarize.WithResultBuilder(sreconcile.AlwaysRequeueResultBuilder{RequeueAfter: obj.GetRequeueAfter()}),
summarize.WithPatchFieldOwner(r.ControllerName),
// Patch the object, prioritizing the conditions owned by the controller in
// case of any conflicts.
patchOpts := []patch.Option{
patch.WithOwnedConditions{
Conditions: helmRepositoryOCIOwnedConditions,
},
}
patchOpts = append(patchOpts, patch.WithFieldOwner(r.ControllerName))
// If a reconcile annotation value is found, set it in the object status
// as status.lastHandledReconcileAt.
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
object.SetStatusLastHandledReconcileAt(obj, v)
}
// Set status observed generation option if the object is stalled, or
// if the object is ready.
if conditions.IsStalled(obj) || conditions.IsReady(obj) {
patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
}
if err = patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
// Ignore patch error "not found" when the object is being deleted.
if !obj.GetDeletionTimestamp().IsZero() {
err = kerrors.FilterOut(err, func(e error) bool { return apierrors.IsNotFound(e) })
}
retErr = kerrors.NewAggregate([]error{retErr, err})
}
result, retErr = summarizeHelper.SummarizeAndPatch(ctx, obj, summarizeOpts...)
// Always record readiness and duration metrics
r.Metrics.RecordReadiness(ctx, obj)
@ -173,126 +166,121 @@ func (r *HelmRepositoryOCIReconciler) Reconcile(ctx context.Context, req ctrl.Re
}()
// Add finalizer first if it doesn't exist to avoid the race condition
// between init and delete
// between init and delete.
if !controllerutil.ContainsFinalizer(obj, sourcev1.SourceFinalizer) {
controllerutil.AddFinalizer(obj, sourcev1.SourceFinalizer)
recResult = sreconcile.ResultRequeue
return
return ctrl.Result{Requeue: true}, nil
}
// Examine if the object is under deletion
// Examine if the object is under deletion.
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
recResult, retErr = r.reconcileDelete(ctx, obj)
return
return r.reconcileDelete(ctx, obj)
}
// Examine if a type change has happened and act accordingly
if obj.Spec.Type != sourcev1.HelmRepositoryTypeOCI {
// just ignore the object if the type has changed
recResult, retErr = sreconcile.ResultEmpty, nil
return
// Remove any stale condition and ignore the object if the type has
// changed.
obj.Status.Conditions = nil
return ctrl.Result{}, nil
}
// Reconcile actual object
reconcilers := []helmRepositoryOCIReconcileFunc{
r.reconcileSource,
}
recResult, retErr = r.reconcile(ctx, obj, reconcilers)
result, retErr = r.reconcile(ctx, obj)
return
}
// reconcileDelete handles the deletion of the object.
// Removing the finalizer from the object if successful.
func (r *HelmRepositoryOCIReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.HelmRepository) (sreconcile.Result, error) {
// Remove our finalizer from the list
controllerutil.RemoveFinalizer(obj, sourcev1.SourceFinalizer)
// Stop reconciliation as the object is being deleted
return sreconcile.ResultEmpty, nil
}
// notify emits notification related to the reconciliation.
func (r *HelmRepositoryOCIReconciler) notify(oldObj, newObj *sourcev1.HelmRepository, res sreconcile.Result, resErr error) {
// Notify successful recovery from any failure.
if resErr == nil && res == sreconcile.ResultSuccess {
if sreconcile.FailureRecovery(oldObj, newObj, helmRepositoryOCIFailConditions) {
r.Eventf(newObj, corev1.EventTypeNormal,
meta.SucceededReason, "Helm repository %q has been successfully reconciled", newObj.Name)
}
}
}
func (r *HelmRepositoryOCIReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmRepository, reconcilers []helmRepositoryOCIReconcileFunc) (sreconcile.Result, error) {
// reconcile reconciles the HelmRepository object. While reconciling, when an
// error is encountered, it sets the failure details in the appropriate status
// condition type and returns the error with appropriate ctrl.Result. The object
// status conditions and the returned results are evaluated in the deferred
// block at the very end to summarize the conditions to be in a consistent
// state.
func (r *HelmRepositoryOCIReconciler) reconcile(ctx context.Context, obj *v1beta2.HelmRepository) (result ctrl.Result, retErr error) {
oldObj := obj.DeepCopy()
// Mark as reconciling if generation differs.
defer func() {
// If it's stalled, ensure reconciling is removed.
if sc := conditions.Get(obj, meta.StalledCondition); sc != nil && sc.Status == metav1.ConditionTrue {
conditions.Delete(obj, meta.ReconcilingCondition)
}
// Check if it's a successful reconciliation.
if result.RequeueAfter == obj.GetRequeueAfter() && result.Requeue == false &&
retErr == nil {
// Remove reconciling condition if the reconciliation was successful.
conditions.Delete(obj, meta.ReconcilingCondition)
// If it's not ready even though it's not reconciling or stalled,
// set the ready failure message as the error.
// Based on isNonStalledSuccess() from internal/reconcile/summarize.
if ready := conditions.Get(obj, meta.ReadyCondition); ready != nil &&
ready.Status == metav1.ConditionFalse && !conditions.IsStalled(obj) {
retErr = errors.New(conditions.GetMessage(obj, meta.ReadyCondition))
}
}
// If it's still a successful reconciliation and it's not reconciling or
// stalled, mark Ready=True.
if !conditions.IsReconciling(obj) && !conditions.IsStalled(obj) &&
retErr == nil && result.RequeueAfter == obj.GetRequeueAfter() {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "Helm repository is ready")
}
// Emit events when object's state changes.
ready := conditions.Get(obj, meta.ReadyCondition)
// Became ready from not ready.
if !conditions.IsReady(oldObj) && conditions.IsReady(obj) {
r.Eventf(obj, corev1.EventTypeNormal, ready.Reason, ready.Message)
}
// Became not ready from ready.
if conditions.IsReady(oldObj) && !conditions.IsReady(obj) {
r.Eventf(obj, corev1.EventTypeWarning, ready.Reason, ready.Message)
}
}()
// Set reconciling condition.
if obj.Generation != obj.Status.ObservedGeneration {
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
}
// Run the sub-reconcilers and build the result of reconciliation.
var res sreconcile.Result
var resErr error
for _, rec := range reconcilers {
recResult, err := rec(ctx, obj)
// Exit immediately on ResultRequeue.
if recResult == sreconcile.ResultRequeue {
return sreconcile.ResultRequeue, nil
}
// If an error is received, prioritize the returned results because an
// error also means immediate requeue.
if err != nil {
resErr = err
res = recResult
break
}
// Prioritize requeue request in the result for successful results.
res = sreconcile.LowestRequeuingResult(res, recResult)
}
r.notify(oldObj, obj, res, resErr)
return res, resErr
}
func (r *HelmRepositoryOCIReconciler) reconcileSource(ctx context.Context, obj *sourcev1.HelmRepository) (sreconcile.Result, error) {
// Ensure that it's an OCI URL before continuing.
if !helmreg.IsOCI(obj.Spec.URL) {
e := &serror.Stalling{
Err: fmt.Errorf("the url scheme is not supported: %s", obj.Spec.URL),
Reason: sourcev1.URLInvalidReason,
u, err := url.Parse(obj.Spec.URL)
if err != nil {
err = fmt.Errorf("failed to parse URL: %w", err)
} else {
err = fmt.Errorf("URL scheme '%s' in '%s' is not supported", u.Scheme, obj.Spec.URL)
}
conditions.MarkFalse(obj, meta.ReadyCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
conditions.MarkStalled(obj, sourcev1.URLInvalidReason, err.Error())
conditions.MarkFalse(obj, meta.ReadyCondition, sourcev1.URLInvalidReason, err.Error())
ctrl.LoggerFrom(ctx).Error(err, "reconciliation stalled")
result, retErr = ctrl.Result{}, nil
return
}
conditions.Delete(obj, meta.StalledCondition)
var loginOpts []helmreg.LoginOption
// Configure any authentication related options
// Configure any authentication related options.
if obj.Spec.SecretRef != nil {
// Attempt to retrieve secret
// Attempt to retrieve secret.
name := types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.Spec.SecretRef.Name,
}
var secret corev1.Secret
if err := r.Client.Get(ctx, name, &secret); err != nil {
e := &serror.Event{
Err: fmt.Errorf("failed to get secret '%s': %w", name.String(), err),
Reason: sourcev1.AuthenticationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
e := fmt.Errorf("failed to get secret '%s': %w", name.String(), err)
conditions.MarkFalse(obj, meta.ReadyCondition, sourcev1.AuthenticationFailedReason, e.Error())
result, retErr = ctrl.Result{}, e
return
}
// Construct actual options
// Construct login options.
loginOpt, err := registry.LoginOptionFromSecret(obj.Spec.URL, secret)
if err != nil {
e := &serror.Event{
Err: fmt.Errorf("failed to configure Helm client with secret data: %w", err),
Reason: sourcev1.AuthenticationFailedReason,
}
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
// Return err as the content of the secret may change.
return sreconcile.ResultEmpty, e
e := fmt.Errorf("failed to configure Helm client with secret data: %w", err)
conditions.MarkFalse(obj, meta.ReadyCondition, sourcev1.AuthenticationFailedReason, e.Error())
result, retErr = ctrl.Result{}, e
return
}
if loginOpt != nil {
@ -300,22 +288,14 @@ func (r *HelmRepositoryOCIReconciler) reconcileSource(ctx context.Context, obj *
}
}
return r.validateSource(ctx, obj, loginOpts...)
}
// validateSource the HelmRepository object by checking the url and connecting to the underlying registry
// with he provided credentials.
func (r *HelmRepositoryOCIReconciler) validateSource(ctx context.Context, obj *sourcev1.HelmRepository, logOpts ...helmreg.LoginOption) (sreconcile.Result, error) {
registryClient, file, err := r.RegistryClientGenerator(logOpts != nil)
// Create registry client and login if needed.
registryClient, file, err := r.RegistryClientGenerator(loginOpts != nil)
if err != nil {
e := &serror.Event{
Err: fmt.Errorf("failed to create registry client:: %w", err),
Reason: meta.FailedReason,
}
conditions.MarkFalse(obj, meta.ReadyCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
e := fmt.Errorf("failed to create registry client: %w", err)
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, e.Error())
result, retErr = ctrl.Result{}, e
return
}
if file != "" {
defer func() {
if err := os.Remove(file); err != nil {
@ -327,30 +307,40 @@ func (r *HelmRepositoryOCIReconciler) validateSource(ctx context.Context, obj *s
chartRepo, err := repository.NewOCIChartRepository(obj.Spec.URL, repository.WithOCIRegistryClient(registryClient))
if err != nil {
e := &serror.Stalling{
Err: fmt.Errorf("failed to parse URL '%s': %w", obj.Spec.URL, err),
Reason: sourcev1.URLInvalidReason,
}
conditions.MarkFalse(obj, meta.ReadyCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
e := fmt.Errorf("failed to parse URL '%s': %w", obj.Spec.URL, err)
conditions.MarkStalled(obj, sourcev1.URLInvalidReason, e.Error())
conditions.MarkFalse(obj, meta.ReadyCondition, sourcev1.URLInvalidReason, e.Error())
result, retErr = ctrl.Result{}, nil
return
}
conditions.Delete(obj, meta.StalledCondition)
// Attempt to login to the registry if credentials are provided.
if logOpts != nil {
err = chartRepo.Login(logOpts...)
if loginOpts != nil {
err = chartRepo.Login(loginOpts...)
if err != nil {
e := &serror.Event{
Err: fmt.Errorf("failed to create temporary file: %w", err),
Reason: meta.FailedReason,
}
conditions.MarkFalse(obj, meta.ReadyCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
e := fmt.Errorf("failed to log into registry '%s': %w", obj.Spec.URL, err)
conditions.MarkFalse(obj, meta.ReadyCondition, sourcev1.AuthenticationFailedReason, e.Error())
result, retErr = ctrl.Result{}, e
return
}
}
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "Helm repository %q is ready", obj.Name)
// Remove any stale Ready condition, most likely False, set above. Its value
// is derived from the overall result of the reconciliation in the deferred
// block at the very end.
conditions.Delete(obj, meta.ReadyCondition)
return sreconcile.ResultSuccess, nil
result, retErr = ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
return
}
func (r *HelmRepositoryOCIReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.HelmRepository) (ctrl.Result, error) {
// Remove our finalizer from the list
controllerutil.RemoveFinalizer(obj, sourcev1.SourceFinalizer)
// Stop reconciliation as the object is being deleted
return ctrl.Result{}, nil
}
// eventLogf records events, and logs at the same time.

View File

@ -1179,13 +1179,16 @@ func TestHelmRepositoryReconciler_ReconcileTypeUpdatePredicateFilter(t *testing.
return false
}
readyCondition := conditions.Get(obj, meta.ReadyCondition)
if readyCondition == nil {
return false
}
return readyCondition.Status == metav1.ConditionTrue &&
newGen == readyCondition.ObservedGeneration &&
newGen == obj.Status.ObservedGeneration
}, timeout).Should(BeTrue())
// Check if the object status is valid.
condns = &status.Conditions{NegativePolarity: helmRepositoryOCIReadyCondition.NegativePolarity}
condns = &status.Conditions{NegativePolarity: helmRepositoryOCINegativeConditions}
checker = status.NewChecker(testEnv.Client, condns)
checker.CheckErr(ctx, obj)