Merge pull request #89 from fluxcd/refactor-reconciliation

Improve error handling and reporting
This commit is contained in:
Stefan Prodan 2020-07-15 12:38:02 +03:00 committed by GitHub
commit 90ed251d3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 100 additions and 94 deletions

View File

@ -22,6 +22,7 @@ import (
"io/ioutil"
"os"
"strings"
"time"
"github.com/go-git/go-git/v5/plumbing/object"
"github.com/go-git/go-git/v5/plumbing/transport"
@ -56,61 +57,62 @@ type GitRepositoryReconciler struct {
func (r *GitRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
start := time.Now()
var repository sourcev1.GitRepository
if err := r.Get(ctx, req.NamespacedName, &repository); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
log := r.Log.WithValues(repository.Kind, req.NamespacedName)
log := r.Log.WithValues("controller", strings.ToLower(sourcev1.GitRepositoryKind), "request", req.NamespacedName)
// set initial status
if reset, status := r.shouldResetStatus(repository); reset {
log.Info("Initializing Git repository")
repository.Status = status
if err := r.Status().Update(ctx, &repository); err != nil {
log.Error(err, "unable to update GitRepository status")
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
} else {
repository = sourcev1.GitRepositoryProgressing(repository)
if err := r.Status().Update(ctx, &repository); err != nil {
log.Error(err, "unable to update GitRepository status")
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
}
// try to remove old artifacts
// purge old artifacts from storage
if err := r.gc(repository); err != nil {
log.Error(err, "artifacts GC failed")
log.Error(err, "unable to purge old artifacts")
}
// try git sync
syncedRepo, err := r.sync(ctx, *repository.DeepCopy())
if err != nil {
log.Error(err, "Git repository sync failed")
r.event(repository, recorder.EventSeverityError, err.Error())
if err := r.Status().Update(ctx, &syncedRepo); err != nil {
log.Error(err, "unable to update GitRepository status")
}
return ctrl.Result{Requeue: true}, err
} else {
// emit revision change event
if repository.Status.Artifact == nil || syncedRepo.Status.Artifact.Revision != repository.Status.Artifact.Revision {
r.event(syncedRepo, recorder.EventSeverityInfo, sourcev1.GitRepositoryReadyMessage(syncedRepo))
}
}
// reconcile repository by pulling the latest Git commit
reconciledRepository, reconcileErr := r.reconcile(ctx, *repository.DeepCopy())
// update status
if err := r.Status().Update(ctx, &syncedRepo); err != nil {
log.Error(err, "unable to update GitRepository status")
// update status with the reconciliation result
if err := r.Status().Update(ctx, &reconciledRepository); err != nil {
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
log.Info("Git repository sync succeeded", "msg", sourcev1.GitRepositoryReadyMessage(syncedRepo))
// if reconciliation failed, record the failure and requeue immediately
if reconcileErr != nil {
r.event(reconciledRepository, recorder.EventSeverityError, reconcileErr.Error())
return ctrl.Result{Requeue: true}, reconcileErr
}
// emit revision change event
if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision {
r.event(reconciledRepository, recorder.EventSeverityInfo, sourcev1.GitRepositoryReadyMessage(reconciledRepository))
}
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Now().Sub(start).String(),
repository.GetInterval().Duration.String(),
))
// requeue repository
return ctrl.Result{RequeueAfter: repository.GetInterval().Duration}, nil
}
type GitRepositoryReconcilerOptions struct {
@ -130,7 +132,7 @@ func (r *GitRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, o
Complete(r)
}
func (r *GitRepositoryReconciler) sync(ctx context.Context, repository sourcev1.GitRepository) (sourcev1.GitRepository, error) {
func (r *GitRepositoryReconciler) reconcile(ctx context.Context, repository sourcev1.GitRepository) (sourcev1.GitRepository, error) {
// create tmp dir for the Git clone
tmpGit, err := ioutil.TempDir("", repository.Name)
if err != nil {
@ -215,7 +217,7 @@ func (r *GitRepositoryReconciler) sync(ctx context.Context, repository sourcev1.
}
// shouldResetStatus returns a boolean indicating if the status of the
// given repository should be reset and a reset HelmChartStatus.
// given repository should be reset.
func (r *GitRepositoryReconciler) shouldResetStatus(repository sourcev1.GitRepository) (bool, sourcev1.GitRepositoryStatus) {
resetStatus := false
if repository.Status.Artifact != nil {
@ -240,6 +242,7 @@ func (r *GitRepositoryReconciler) shouldResetStatus(repository sourcev1.GitRepos
}
}
// verify returns an error if the PGP signature can't be verified
func (r *GitRepositoryReconciler) verify(ctx context.Context, publicKeySecret types.NamespacedName, commit *object.Commit) error {
if commit.PGPSignature == "" {
return fmt.Errorf("no PGP signature found for commit: %s", commit.Hash)
@ -272,7 +275,7 @@ func (r *GitRepositoryReconciler) gc(repository sourcev1.GitRepository) error {
return nil
}
// emit Kubernetes event and forward event to notification controller if configured
// event emits a Kubernetes event and forwards the event to notification controller if configured
func (r *GitRepositoryReconciler) event(repository sourcev1.GitRepository, severity, msg string) {
if r.EventRecorder != nil {
r.EventRecorder.Eventf(&repository, "Normal", severity, msg)
@ -281,7 +284,7 @@ func (r *GitRepositoryReconciler) event(repository sourcev1.GitRepository, sever
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
r.Log.WithValues(
strings.ToLower(repository.Kind),
"request",
fmt.Sprintf("%s/%s", repository.GetNamespace(), repository.GetName()),
).Error(err, "unable to send event")
return
@ -289,7 +292,7 @@ func (r *GitRepositoryReconciler) event(repository sourcev1.GitRepository, sever
if err := r.ExternalEventRecorder.Eventf(*objRef, nil, severity, severity, msg); err != nil {
r.Log.WithValues(
strings.ToLower(repository.Kind),
"request",
fmt.Sprintf("%s/%s", repository.GetNamespace(), repository.GetName()),
).Error(err, "unable to send event")
return

View File

@ -22,6 +22,7 @@ import (
"io/ioutil"
"net/url"
"strings"
"time"
"github.com/go-logr/logr"
"helm.sh/helm/v3/pkg/getter"
@ -58,33 +59,33 @@ type HelmChartReconciler struct {
func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
start := time.Now()
var chart sourcev1.HelmChart
if err := r.Get(ctx, req.NamespacedName, &chart); err != nil {
return ctrl.Result{Requeue: true}, client.IgnoreNotFound(err)
}
log := r.Log.WithValues(chart.Kind, req.NamespacedName)
log := r.Log.WithValues("controller", strings.ToLower(sourcev1.HelmChartKind), "request", req.NamespacedName)
// set initial status
if reset, status := r.shouldResetStatus(chart); reset {
log.Info("Initializing Helm chart")
chart.Status = status
if err := r.Status().Update(ctx, &chart); err != nil {
log.Error(err, "unable to update HelmChart status")
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
} else {
chart = sourcev1.HelmChartProgressing(chart)
if err := r.Status().Update(ctx, &chart); err != nil {
log.Error(err, "unable to update HelmChart status")
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
}
// try to remove old artifacts
// purge old artifacts from storage
if err := r.gc(chart); err != nil {
log.Error(err, "artifacts GC failed")
log.Error(err, "unable to purge old artifacts")
}
// get referenced chart repository
@ -92,7 +93,7 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
if err != nil {
chart = sourcev1.HelmChartNotReady(*chart.DeepCopy(), sourcev1.ChartPullFailedReason, err.Error())
if err := r.Status().Update(ctx, &chart); err != nil {
log.Error(err, "unable to update HelmChart status")
log.Error(err, "unable to update status")
}
return ctrl.Result{Requeue: true}, err
}
@ -100,34 +101,34 @@ func (r *HelmChartReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// set ownership reference so chart is garbage collected on
// repository removal
if err := r.setOwnerRef(ctx, &chart, repository); err != nil {
log.Error(err, "failed to set owner reference")
log.Error(err, "unable to set owner reference")
}
// try to pull chart
pulledChart, err := r.sync(ctx, repository, *chart.DeepCopy())
if err != nil {
log.Error(err, "Helm chart sync failed")
r.event(chart, recorder.EventSeverityError, err.Error())
if err := r.Status().Update(ctx, &pulledChart); err != nil {
log.Error(err, "unable to update HelmChart status")
}
return ctrl.Result{Requeue: true}, err
} else {
// emit version change event
if chart.Status.Artifact == nil || pulledChart.Status.Artifact.Revision != chart.Status.Artifact.Revision {
r.event(pulledChart, recorder.EventSeverityInfo, sourcev1.HelmChartReadyMessage(pulledChart))
}
}
// reconcile repository by downloading the chart tarball
reconciledChart, reconcileErr := r.reconcile(ctx, repository, *chart.DeepCopy())
// update status
if err := r.Status().Update(ctx, &pulledChart); err != nil {
log.Error(err, "unable to update HelmChart status")
// update status with the reconciliation result
if err := r.Status().Update(ctx, &reconciledChart); err != nil {
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
log.Info("Helm chart sync succeeded", "msg", sourcev1.HelmChartReadyMessage(pulledChart))
// if reconciliation failed, record the failure and requeue immediately
if reconcileErr != nil {
r.event(reconciledChart, recorder.EventSeverityError, reconcileErr.Error())
return ctrl.Result{Requeue: true}, reconcileErr
}
// emit revision change event
if chart.Status.Artifact == nil || reconciledChart.Status.Artifact.Revision != chart.Status.Artifact.Revision {
r.event(reconciledChart, recorder.EventSeverityInfo, sourcev1.HelmChartReadyMessage(reconciledChart))
}
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Now().Sub(start).String(),
chart.GetInterval().Duration.String(),
))
// requeue chart
return ctrl.Result{RequeueAfter: chart.GetInterval().Duration}, nil
}
@ -148,7 +149,7 @@ func (r *HelmChartReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts
Complete(r)
}
func (r *HelmChartReconciler) sync(ctx context.Context, repository sourcev1.HelmRepository, chart sourcev1.HelmChart) (sourcev1.HelmChart, error) {
func (r *HelmChartReconciler) reconcile(ctx context.Context, repository sourcev1.HelmRepository, chart sourcev1.HelmChart) (sourcev1.HelmChart, error) {
indexBytes, err := ioutil.ReadFile(repository.Status.Artifact.Path)
if err != nil {
err = fmt.Errorf("failed to read Helm repository index file: %w", err)
@ -339,7 +340,7 @@ func (r *HelmChartReconciler) setOwnerRef(ctx context.Context, chart *sourcev1.H
return r.Update(ctx, chart)
}
// emit Kubernetes event and forward event to notification controller if configured
// event emits a Kubernetes event and forwards the event to notification controller if configured
func (r *HelmChartReconciler) event(chart sourcev1.HelmChart, severity, msg string) {
if r.EventRecorder != nil {
r.EventRecorder.Eventf(&chart, "Normal", severity, msg)
@ -348,7 +349,7 @@ func (r *HelmChartReconciler) event(chart sourcev1.HelmChart, severity, msg stri
objRef, err := reference.GetReference(r.Scheme, &chart)
if err != nil {
r.Log.WithValues(
strings.ToLower(chart.Kind),
"request",
fmt.Sprintf("%s/%s", chart.GetNamespace(), chart.GetName()),
).Error(err, "unable to send event")
return
@ -356,7 +357,7 @@ func (r *HelmChartReconciler) event(chart sourcev1.HelmChart, severity, msg stri
if err := r.ExternalEventRecorder.Eventf(*objRef, nil, severity, severity, msg); err != nil {
r.Log.WithValues(
strings.ToLower(chart.Kind),
"request",
fmt.Sprintf("%s/%s", chart.GetNamespace(), chart.GetName()),
).Error(err, "unable to send event")
return

View File

@ -23,6 +23,7 @@ import (
"net/url"
"path"
"strings"
"time"
"github.com/go-logr/logr"
"helm.sh/helm/v3/pkg/getter"
@ -60,59 +61,60 @@ type HelmRepositoryReconciler struct {
func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
start := time.Now()
var repository sourcev1.HelmRepository
if err := r.Get(ctx, req.NamespacedName, &repository); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
log := r.Log.WithValues(repository.Kind, req.NamespacedName)
log := r.Log.WithValues("controller", strings.ToLower(sourcev1.HelmRepositoryKind), "request", req.NamespacedName)
// set initial status
if reset, status := r.shouldResetStatus(repository); reset {
log.Info("Initializing Helm repository")
repository.Status = status
if err := r.Status().Update(ctx, &repository); err != nil {
log.Error(err, "unable to update HelmRepository status")
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
} else {
repository = sourcev1.HelmRepositoryProgressing(repository)
if err := r.Status().Update(ctx, &repository); err != nil {
log.Error(err, "unable to update HelmRepository status")
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
}
// try to remove old artifacts
// purge old artifacts from storage
if err := r.gc(repository); err != nil {
log.Error(err, "artifacts GC failed")
log.Error(err, "unable to purge old artifacts")
}
// try to download index
syncedRepo, err := r.sync(ctx, *repository.DeepCopy())
if err != nil {
log.Error(err, "Helm repository sync failed")
r.event(repository, recorder.EventSeverityError, err.Error())
if err := r.Status().Update(ctx, &syncedRepo); err != nil {
log.Error(err, "unable to update HelmRepository status")
}
return ctrl.Result{Requeue: true}, err
} else {
// emit revision change event
if repository.Status.Artifact == nil || syncedRepo.Status.Artifact.Revision != repository.Status.Artifact.Revision {
r.event(syncedRepo, recorder.EventSeverityInfo, sourcev1.HelmRepositoryReadyMessage(syncedRepo))
}
}
// reconcile repository by downloading the index.yaml file
reconciledRepository, reconcileErr := r.reconcile(ctx, *repository.DeepCopy())
// update status
if err := r.Status().Update(ctx, &syncedRepo); err != nil {
log.Error(err, "unable to update HelmRepository status")
// update status with the reconciliation result
if err := r.Status().Update(ctx, &reconciledRepository); err != nil {
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
log.Info("Helm repository sync succeeded", "msg", sourcev1.HelmRepositoryReadyMessage(syncedRepo))
// requeue repository
// if reconciliation failed, record the failure and requeue immediately
if reconcileErr != nil {
r.event(reconciledRepository, recorder.EventSeverityError, reconcileErr.Error())
return ctrl.Result{Requeue: true}, reconcileErr
}
// emit revision change event
if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision {
r.event(reconciledRepository, recorder.EventSeverityInfo, sourcev1.HelmRepositoryReadyMessage(reconciledRepository))
}
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Now().Sub(start).String(),
repository.GetInterval().Duration.String(),
))
return ctrl.Result{RequeueAfter: repository.GetInterval().Duration}, nil
}
@ -133,7 +135,7 @@ func (r *HelmRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager,
Complete(r)
}
func (r *HelmRepositoryReconciler) sync(ctx context.Context, repository sourcev1.HelmRepository) (sourcev1.HelmRepository, error) {
func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, repository sourcev1.HelmRepository) (sourcev1.HelmRepository, error) {
u, err := url.Parse(repository.Spec.URL)
if err != nil {
return sourcev1.HelmRepositoryNotReady(repository, sourcev1.URLInvalidReason, err.Error()), err
@ -233,7 +235,7 @@ func (r *HelmRepositoryReconciler) sync(ctx context.Context, repository sourcev1
}
// shouldResetStatus returns a boolean indicating if the status of the
// given repository should be reset and a reset HelmChartStatus.
// given repository should be reset.
func (r *HelmRepositoryReconciler) shouldResetStatus(repository sourcev1.HelmRepository) (bool, sourcev1.HelmRepositoryStatus) {
resetStatus := false
if repository.Status.Artifact != nil {
@ -268,7 +270,7 @@ func (r *HelmRepositoryReconciler) gc(repository sourcev1.HelmRepository) error
return nil
}
// emit Kubernetes event and forward event to notification controller if configured
// event emits a Kubernetes event and forwards the event to notification controller if configured
func (r *HelmRepositoryReconciler) event(repository sourcev1.HelmRepository, severity, msg string) {
if r.EventRecorder != nil {
r.EventRecorder.Eventf(&repository, "Normal", severity, msg)
@ -277,7 +279,7 @@ func (r *HelmRepositoryReconciler) event(repository sourcev1.HelmRepository, sev
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
r.Log.WithValues(
strings.ToLower(repository.Kind),
"request",
fmt.Sprintf("%s/%s", repository.GetNamespace(), repository.GetName()),
).Error(err, "unable to send event")
return
@ -285,7 +287,7 @@ func (r *HelmRepositoryReconciler) event(repository sourcev1.HelmRepository, sev
if err := r.ExternalEventRecorder.Eventf(*objRef, nil, severity, severity, msg); err != nil {
r.Log.WithValues(
strings.ToLower(repository.Kind),
"request",
fmt.Sprintf("%s/%s", repository.GetNamespace(), repository.GetName()),
).Error(err, "unable to send event")
return