Rewrite `HelmRepositoryReconciler` to new standards

This commit rewrites the `HelmRepositoryReconciler` to new standards,
implementing the newly introduced Condition types and adhering more
closely to the Kubernetes API conventions.

More specifically, it introduces:

- Implementation of more explicit Condition types to highlight
  abnormalities.
- Extensive usage of the `conditions` subpackage from `runtime`.
- Better and more conflict-resilient (status) patching of reconciled
  objects using the `patch` subpackage from `runtime` (see the sketch
  after this list).
- Proper implementation of kstatus' `Reconciling` and `Stalled`
  conditions.
- Refactoring of some Helm elements to make them easier to use within
  the new reconciler logic.
- Integration tests that solely rely on `testenv` and do not
  use Ginkgo.
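
As a rough sketch of the patching and Condition handling pattern this
introduces (the function name and condensed flow below are illustrative
only; the actual reconciler in the diff also handles finalizers,
requeue results, events and metrics):

```go
package controllers

import (
	"context"

	kerrors "k8s.io/apimachinery/pkg/util/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/fluxcd/pkg/apis/meta"
	"github.com/fluxcd/pkg/runtime/conditions"
	"github.com/fluxcd/pkg/runtime/patch"

	sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
)

// reconcileSketch is a hypothetical, condensed version of the pattern: take a
// snapshot of the object with a patch helper, mark it as Reconciling, do the
// work, summarize Ready from the negative polarity conditions, and patch only
// the conditions owned by this controller.
func reconcileSketch(ctx context.Context, c client.Client, obj *sourcev1.HelmRepository) (retErr error) {
	patchHelper, err := patch.NewHelper(obj, c)
	if err != nil {
		return err
	}

	defer func() {
		// Summarize the Ready condition from the "abnormality" conditions.
		conditions.SetSummary(obj,
			meta.ReadyCondition,
			conditions.WithConditions(
				sourcev1.FetchFailedCondition,
				sourcev1.ArtifactOutdatedCondition,
				sourcev1.ArtifactUnavailableCondition,
			),
			conditions.WithNegativePolarityConditions(
				sourcev1.FetchFailedCondition,
				sourcev1.ArtifactOutdatedCondition,
				sourcev1.ArtifactUnavailableCondition,
			),
		)

		// Patch the object, claiming ownership only of the conditions this
		// controller manages. The observed generation is recorded here
		// unconditionally for brevity; the real reconciler only does so once
		// it is done reconciling.
		patchOpts := []patch.Option{
			patch.WithOwnedConditions{
				Conditions: []string{
					sourcev1.FetchFailedCondition,
					sourcev1.ArtifactOutdatedCondition,
					sourcev1.ArtifactUnavailableCondition,
					meta.ReadyCondition,
					meta.ReconcilingCondition,
					meta.StalledCondition,
				},
			},
			patch.WithStatusObservedGeneration{},
		}
		if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
			retErr = kerrors.NewAggregate([]error{retErr, err})
		}
	}()

	// kstatus: the object is Reconciling while work is in progress...
	conditions.MarkReconciling(obj, meta.ProgressingReason, "reconciliation in progress")

	// ...the actual reconciliation steps would run here...

	// ...and Reconciling is removed again once the work is done. If the
	// summarized Ready condition ends up False without a scheduled retry,
	// the reconciler marks the object as Stalled instead.
	conditions.Delete(obj, meta.ReconcilingCondition)
	return nil
}
```

The explicit list of owned conditions is what is meant by
conflict-resilient patching above: the helper only asserts ownership
over those condition types, so conditions managed by other actors on
the same object are left untouched.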

There are a couple of TODOs marked in-code; these are suggestions for
the future and should be non-blocking.
In addition to the TODOs, more complex and/or edge-case test scenarios
may be added in the future.

Signed-off-by: Hidde Beydals <hello@hidde.co>
Author: Hidde Beydals
Date: 2021-07-31 16:48:03 +02:00
Commit: dd68cd57b7 (parent 5f125ebfcd)

6 changed files with 881 additions and 634 deletions


@@ -19,12 +19,10 @@ package v1beta2
import (
"time"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/fluxcd/pkg/apis/acl"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
)
const (
@@ -108,47 +106,6 @@ const (
IndexationSucceededReason string = "IndexationSucceed"
)
// HelmRepositoryProgressing resets the conditions of the HelmRepository to
// metav1.Condition of type meta.ReadyCondition with status 'Unknown' and
// meta.ProgressingReason reason and message. It returns the modified
// HelmRepository.
func HelmRepositoryProgressing(repository HelmRepository) HelmRepository {
repository.Status.ObservedGeneration = repository.Generation
repository.Status.URL = ""
repository.Status.Conditions = []metav1.Condition{}
conditions.MarkUnknown(&repository, meta.ReadyCondition, meta.ProgressingReason, "reconciliation in progress")
return repository
}
// HelmRepositoryReady sets the given Artifact and URL on the HelmRepository and
// sets the meta.ReadyCondition to 'True', with the given reason and message. It
// returns the modified HelmRepository.
func HelmRepositoryReady(repository HelmRepository, artifact Artifact, url, reason, message string) HelmRepository {
repository.Status.Artifact = &artifact
repository.Status.URL = url
conditions.MarkTrue(&repository, meta.ReadyCondition, reason, message)
return repository
}
// HelmRepositoryNotReady sets the meta.ReadyCondition on the given
// HelmRepository to 'False', with the given reason and message. It returns the
// modified HelmRepository.
func HelmRepositoryNotReady(repository HelmRepository, reason, message string) HelmRepository {
conditions.MarkFalse(&repository, meta.ReadyCondition, reason, message)
return repository
}
// HelmRepositoryReadyMessage returns the message of the metav1.Condition of type
// meta.ReadyCondition with status 'True' if present, or an empty string.
func HelmRepositoryReadyMessage(repository HelmRepository) string {
if c := apimeta.FindStatusCondition(repository.Status.Conditions, meta.ReadyCondition); c != nil {
if c.Status == metav1.ConditionTrue {
return c.Message
}
}
return ""
}
// GetConditions returns the status conditions of the object.
func (in HelmRepository) GetConditions() []metav1.Condition {
return in.Status.Conditions


@@ -25,12 +25,11 @@ import (
helmgetter "helm.sh/helm/v3/pkg/getter"
corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
kuberecorder "k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -38,8 +37,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/events"
"github.com/fluxcd/pkg/runtime/metrics"
"github.com/fluxcd/pkg/runtime/conditions"
helper "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/pkg/runtime/predicates"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
@@ -55,12 +55,11 @@ import (
// HelmRepositoryReconciler reconciles a HelmRepository object
type HelmRepositoryReconciler struct {
client.Client
Scheme *runtime.Scheme
Storage *Storage
Getters helmgetter.Providers
EventRecorder kuberecorder.EventRecorder
ExternalEventRecorder *events.Recorder
MetricsRecorder *metrics.Recorder
kuberecorder.EventRecorder
helper.Metrics
Getters helmgetter.Providers
Storage *Storage
}
type HelmRepositoryReconcilerOptions struct {
@@ -79,317 +78,388 @@ func (r *HelmRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager,
Complete(r)
}
func (r *HelmRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *HelmRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
var repository sourcev1.HelmRepository
if err := r.Get(ctx, req.NamespacedName, &repository); err != nil {
// Fetch the HelmRepository
obj := &sourcev1.HelmRepository{}
if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// Record suspended status metric
defer r.recordSuspension(ctx, repository)
r.RecordSuspend(ctx, obj, obj.Spec.Suspend)
// Add our finalizer if it does not exist
if !controllerutil.ContainsFinalizer(&repository, sourcev1.SourceFinalizer) {
patch := client.MergeFrom(repository.DeepCopy())
controllerutil.AddFinalizer(&repository, sourcev1.SourceFinalizer)
if err := r.Patch(ctx, &repository, patch); err != nil {
log.Error(err, "unable to register finalizer")
return ctrl.Result{}, err
}
}
// Examine if the object is under deletion
if !repository.ObjectMeta.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, repository)
}
// Return early if the object is suspended.
if repository.Spec.Suspend {
// Return early if the object is suspended
if obj.Spec.Suspend {
log.Info("Reconciliation is suspended for this object")
return ctrl.Result{}, nil
}
// record reconciliation duration
if r.MetricsRecorder != nil {
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
return ctrl.Result{}, err
// Initialize the patch helper
patchHelper, err := patch.NewHelper(obj, r.Client)
if err != nil {
return ctrl.Result{}, err
}
// Always attempt to patch the object and status after each reconciliation
defer func() {
// Record the value of the reconciliation request, if any
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
obj.Status.SetLastHandledReconcileRequest(v)
}
defer r.MetricsRecorder.RecordDuration(*objRef, start)
}
// set initial status
if resetRepository, ok := r.resetStatus(repository); ok {
repository = resetRepository
if err := r.updateStatus(ctx, req, repository.Status); err != nil {
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
// Summarize Ready condition
conditions.SetSummary(obj,
meta.ReadyCondition,
conditions.WithConditions(
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
sourcev1.ArtifactUnavailableCondition,
),
conditions.WithNegativePolarityConditions(
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
sourcev1.ArtifactUnavailableCondition,
),
)
// Patch the object, ignoring conflicts on the conditions owned by this controller
patchOpts := []patch.Option{
patch.WithOwnedConditions{
Conditions: []string{
sourcev1.FetchFailedCondition,
sourcev1.ArtifactOutdatedCondition,
sourcev1.ArtifactUnavailableCondition,
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
},
},
}
r.recordReadiness(ctx, repository)
// Determine if the resource is still being reconciled, or if it has stalled, and record this observation
if retErr == nil && (result.IsZero() || !result.Requeue) {
// We are no longer reconciling
conditions.Delete(obj, meta.ReconcilingCondition)
// We have now observed this generation
patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
readyCondition := conditions.Get(obj, meta.ReadyCondition)
switch readyCondition.Status {
case metav1.ConditionFalse:
// As we are no longer reconciling and the end-state
// is not ready, the reconciliation has stalled
conditions.MarkStalled(obj, readyCondition.Reason, readyCondition.Message)
case metav1.ConditionTrue:
// As we are no longer reconciling and the end-state
// is ready, the reconciliation is no longer stalled
conditions.Delete(obj, meta.StalledCondition)
}
}
// Finally, patch the resource
if err := patchHelper.Patch(ctx, obj, patchOpts...); err != nil {
// Ignore patch error "not found" when the object is being deleted.
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
err = kerrors.FilterOut(err, func(e error) bool { return apierrors.IsNotFound(e) })
}
retErr = kerrors.NewAggregate([]error{retErr, err})
}
// Always record readiness and duration metrics
r.Metrics.RecordReadiness(ctx, obj)
r.Metrics.RecordDuration(ctx, obj, start)
}()
// Add finalizer first if not exist to avoid the race condition
// between init and delete
if !controllerutil.ContainsFinalizer(obj, sourcev1.SourceFinalizer) {
controllerutil.AddFinalizer(obj, sourcev1.SourceFinalizer)
return ctrl.Result{Requeue: true}, nil
}
// record the value of the reconciliation request, if any
// TODO(hidde): would be better to defer this in combination with
// always patching the status sub-resource after a reconciliation.
if v, ok := meta.ReconcileAnnotationValue(repository.GetAnnotations()); ok {
repository.Status.SetLastHandledReconcileRequest(v)
// Examine if the object is under deletion
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, obj)
}
// purge old artifacts from storage
if err := r.gc(repository); err != nil {
log.Error(err, "unable to purge old artifacts")
}
// reconcile repository by downloading the index.yaml file
reconciledRepository, reconcileErr := r.reconcile(ctx, *repository.DeepCopy())
// update status with the reconciliation result
if err := r.updateStatus(ctx, req, reconciledRepository.Status); err != nil {
log.Error(err, "unable to update status")
return ctrl.Result{Requeue: true}, err
}
// if reconciliation failed, record the failure and requeue immediately
if reconcileErr != nil {
r.event(ctx, reconciledRepository, events.EventSeverityError, reconcileErr.Error())
r.recordReadiness(ctx, reconciledRepository)
return ctrl.Result{Requeue: true}, reconcileErr
}
// emit revision change event
if repository.Status.Artifact == nil || reconciledRepository.Status.Artifact.Revision != repository.Status.Artifact.Revision {
r.event(ctx, reconciledRepository, events.EventSeverityInfo, sourcev1.HelmRepositoryReadyMessage(reconciledRepository))
}
r.recordReadiness(ctx, reconciledRepository)
log.Info(fmt.Sprintf("Reconciliation finished in %s, next run in %s",
time.Since(start).String(),
repository.GetRequeueAfter().String(),
))
return ctrl.Result{RequeueAfter: repository.GetRequeueAfter()}, nil
// Reconcile actual object
return r.reconcile(ctx, obj)
}
func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, repo sourcev1.HelmRepository) (sourcev1.HelmRepository, error) {
log := ctrl.LoggerFrom(ctx)
clientOpts := []helmgetter.Option{
helmgetter.WithURL(repo.Spec.URL),
helmgetter.WithTimeout(repo.Spec.Timeout.Duration),
helmgetter.WithPassCredentialsAll(repo.Spec.PassCredentials),
// reconcile steps through the actual reconciliation tasks for the object, it returns early on the first step that
// produces an error.
func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmRepository) (ctrl.Result, error) {
// Mark the resource as under reconciliation
conditions.MarkReconciling(obj, meta.ProgressingReason, "")
// Reconcile the storage data
if result, err := r.reconcileStorage(ctx, obj); err != nil {
return result, err
}
if repo.Spec.SecretRef != nil {
var chartRepo repository.ChartRepository
var artifact sourcev1.Artifact
// Reconcile the source from upstream
if result, err := r.reconcileSource(ctx, obj, &artifact, &chartRepo); err != nil || result.IsZero() {
return result, err
}
// Reconcile the artifact.
if result, err := r.reconcileArtifact(ctx, obj, artifact, &chartRepo); err != nil || result.IsZero() {
return result, err
}
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
}
// reconcileStorage ensures the current state of the storage matches the desired and previously observed state.
//
// All artifacts for the resource except for the current one are garbage collected from the storage.
// If the artifact in the Status object of the resource disappeared from storage, it is removed from the object.
// If the object does not have an artifact in its Status object, a v1beta1.ArtifactUnavailableCondition is set.
// If the hostname of any of the URLs on the object do not match the current storage server hostname, they are updated.
//
// The caller should assume a failure if an error is returned, or the Result is zero.
func (r *HelmRepositoryReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.HelmRepository) (ctrl.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
_ = r.garbageCollect(ctx, obj)
// Determine if the advertised artifact is still in storage
if artifact := obj.GetArtifact(); artifact != nil && !r.Storage.ArtifactExist(*artifact) {
obj.Status.Artifact = nil
obj.Status.URL = ""
}
// Record that we do not have an artifact
if obj.GetArtifact() == nil {
conditions.MarkTrue(obj, sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage")
return ctrl.Result{Requeue: true}, nil
}
conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
// Always update URLs to ensure hostname is up-to-date
// TODO(hidde): we may want to send out an event only if we notice the URL has changed
r.Storage.SetArtifactURL(obj.GetArtifact())
obj.Status.URL = r.Storage.SetHostname(obj.Status.URL)
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
}
// reconcileSource ensures the upstream Helm repository can be reached and downloaded out using the declared
// configuration, and stores a new artifact in the storage.
//
// The Helm repository index is downloaded using the defined configuration, and in case of an error during this process
// (including transient errors), it records v1beta1.FetchFailedCondition=True and returns early.
// On a successful write of a new artifact, the artifact in the status of the given object is set, and the symlink in
// the storage is updated to its path.
//
// The caller should assume a failure if an error is returned, or the Result is zero.
func (r *HelmRepositoryReconciler) reconcileSource(ctx context.Context, obj *sourcev1.HelmRepository, artifact *sourcev1.Artifact, chartRepo *repository.ChartRepository) (ctrl.Result, error) {
// Configure Helm client to access repository
clientOpts := []helmgetter.Option{
helmgetter.WithTimeout(obj.Spec.Timeout.Duration),
helmgetter.WithURL(obj.Spec.URL),
helmgetter.WithPassCredentialsAll(obj.Spec.PassCredentials),
}
// Configure any authentication related options
if obj.Spec.SecretRef != nil {
// Attempt to retrieve secret
name := types.NamespacedName{
Namespace: repo.GetNamespace(),
Name: repo.Spec.SecretRef.Name,
Namespace: obj.GetNamespace(),
Name: obj.Spec.SecretRef.Name,
}
var secret corev1.Secret
err := r.Client.Get(ctx, name, &secret)
if err != nil {
err = fmt.Errorf("auth secret error: %w", err)
return sourcev1.HelmRepositoryNotReady(repo, sourcev1.AuthenticationFailedReason, err.Error()), err
if err := r.Client.Get(ctx, name, &secret); err != nil {
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason,
"Failed to get secret '%s': %s", name.String(), err.Error())
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.AuthenticationFailedReason,
"Failed to get secret '%s': %s", name.String(), err.Error())
// Return error as the world as observed may change
return ctrl.Result{}, err
}
authDir, err := os.MkdirTemp("", repo.Kind+"-"+repo.Namespace+"-"+repo.Name+"-")
// Get client options from secret
tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-auth-", obj.Name, obj.Namespace))
if err != nil {
err = fmt.Errorf("failed to create temporary working directory for credentials: %w", err)
return sourcev1.HelmRepositoryNotReady(repo, sourcev1.AuthenticationFailedReason, err.Error()), err
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.StorageOperationFailedReason,
"Failed to create temporary directory for credentials: %s", err.Error())
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason,
"Failed to create temporary directory for credentials: %s", err.Error())
return ctrl.Result{}, err
}
defer func() {
if err := os.RemoveAll(authDir); err != nil {
log.Error(err, "failed to remove working directory", "path", authDir)
}
}()
defer os.RemoveAll(tmpDir)
opts, err := getter.ClientOptionsFromSecret(authDir, secret)
// Construct actual options
opts, err := getter.ClientOptionsFromSecret(tmpDir, secret)
if err != nil {
err = fmt.Errorf("auth options error: %w", err)
return sourcev1.HelmRepositoryNotReady(repo, sourcev1.AuthenticationFailedReason, err.Error()), err
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason,
"Failed to configure Helm client with secret data: %s", err)
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.AuthenticationFailedReason,
"Failed to configure Helm client with secret data: %s", err)
// Return err as the content of the secret may change
return ctrl.Result{}, err
}
clientOpts = append(clientOpts, opts...)
}
chartRepo, err := repository.NewChartRepository(repo.Spec.URL, "", r.Getters, clientOpts)
// Construct Helm chart repository with options and download index
newChartRepo, err := repository.NewChartRepository(obj.Spec.URL, "", r.Getters, clientOpts)
if err != nil {
switch err.(type) {
case *url.Error:
return sourcev1.HelmRepositoryNotReady(repo, sourcev1.URLInvalidReason, err.Error()), err
ctrl.LoggerFrom(ctx).Error(err, "invalid Helm repository URL")
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.URLInvalidReason,
"Invalid Helm repository URL: %s", err.Error())
return ctrl.Result{}, nil
default:
return sourcev1.HelmRepositoryNotReady(repo, sourcev1.IndexationFailedReason, err.Error()), err
ctrl.LoggerFrom(ctx).Error(err, "failed to construct Helm client")
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, meta.FailedReason,
"Failed to construct Helm client: %s", err.Error())
return ctrl.Result{}, nil
}
}
checksum, err := chartRepo.CacheIndex()
checksum, err := newChartRepo.CacheIndex()
if err != nil {
err = fmt.Errorf("failed to download repository index: %w", err)
return sourcev1.HelmRepositoryNotReady(repo, sourcev1.IndexationFailedReason, err.Error()), err
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, meta.FailedReason,
"Failed to download Helm repository index: %s", err.Error())
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.FetchFailedCondition,
"Failed to download Helm repository index: %s", err.Error())
// Coin flip on transient or persistent error, return error and hope for the best
return ctrl.Result{}, err
}
defer chartRepo.RemoveCache()
*chartRepo = *newChartRepo
artifact := r.Storage.NewArtifactFor(repo.Kind,
repo.ObjectMeta.GetObjectMeta(),
"",
// Load the cached repository index to ensure it passes validation.
if err := chartRepo.LoadFromCache(); err != nil {
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.IndexationFailedReason,
"Failed to load Helm repository from cache: %s", err.Error())
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.FetchFailedCondition,
"Failed to load Helm repository from cache: %s", err.Error())
return ctrl.Result{}, err
}
defer chartRepo.Unload()
// Mark observations about the revision on the object.
if !obj.GetArtifact().HasRevision(checksum) {
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision",
"New index revision '%s'", checksum)
}
conditions.Delete(obj, sourcev1.FetchFailedCondition)
// Create potential new artifact.
*artifact = r.Storage.NewArtifactFor(obj.Kind,
obj.ObjectMeta.GetObjectMeta(),
chartRepo.Checksum,
fmt.Sprintf("index-%s.yaml", checksum))
// Return early on unchanged index
if apimeta.IsStatusConditionTrue(repo.Status.Conditions, meta.ReadyCondition) &&
(repo.GetArtifact() != nil && repo.GetArtifact().Checksum == checksum) {
if artifact.URL != repo.GetArtifact().URL {
r.Storage.SetArtifactURL(repo.GetArtifact())
repo.Status.URL = r.Storage.SetHostname(repo.Status.URL)
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
}
func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.HelmRepository, artifact sourcev1.Artifact, chartRepo *repository.ChartRepository) (ctrl.Result, error) {
// Always restore the Ready condition in case it got removed due to a transient error.
defer func() {
if obj.GetArtifact() != nil {
conditions.Delete(obj, sourcev1.ArtifactUnavailableCondition)
}
return repo, nil
if obj.GetArtifact().HasRevision(artifact.Revision) {
conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition)
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason,
"Stored artifact for revision '%s'", artifact.Revision)
}
}()
if obj.GetArtifact().HasRevision(artifact.Revision) {
ctrl.LoggerFrom(ctx).Info(fmt.Sprintf("Already up to date, current revision '%s'", artifact.Revision))
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
}
// Load the cached repository index to ensure it passes validation
if err := chartRepo.LoadFromCache(); err != nil {
return sourcev1.HelmRepositoryNotReady(repo, sourcev1.IndexationFailedReason, err.Error()), err
}
// The repository checksum is the SHA256 of the loaded bytes, after sorting
artifact.Revision = chartRepo.Checksum
chartRepo.Unload()
// Clear cache at the very end.
defer chartRepo.RemoveCache()
// Create artifact dir
err = r.Storage.MkdirAll(artifact)
if err != nil {
err = fmt.Errorf("unable to create repository index directory: %w", err)
return sourcev1.HelmRepositoryNotReady(repo, sourcev1.StorageOperationFailedReason, err.Error()), err
// Create artifact dir.
if err := r.Storage.MkdirAll(artifact); err != nil {
ctrl.LoggerFrom(ctx).Error(err, "failed to create artifact directory")
return ctrl.Result{}, err
}
// Acquire lock
// Acquire lock.
unlock, err := r.Storage.Lock(artifact)
if err != nil {
err = fmt.Errorf("unable to acquire lock: %w", err)
return sourcev1.HelmRepositoryNotReady(repo, sourcev1.StorageOperationFailedReason, err.Error()), err
ctrl.LoggerFrom(ctx).Error(err, "failed to acquire lock for artifact")
return ctrl.Result{}, err
}
defer unlock()
// Save artifact to storage
// Save artifact to storage.
if err = r.Storage.CopyFromPath(&artifact, chartRepo.CachePath); err != nil {
return sourcev1.HelmRepositoryNotReady(repo, sourcev1.StorageOperationFailedReason, err.Error()), err
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason,
"Unable to save artifact to storage: %s", err)
return ctrl.Result{}, err
}
// Update index symlink
// Record it on the object.
obj.Status.Artifact = artifact.DeepCopy()
// Update index symlink.
indexURL, err := r.Storage.Symlink(artifact, "index.yaml")
if err != nil {
err = fmt.Errorf("storage error: %w", err)
return sourcev1.HelmRepositoryNotReady(repo, sourcev1.StorageOperationFailedReason, err.Error()), err
r.Eventf(obj, corev1.EventTypeWarning, sourcev1.StorageOperationFailedReason,
"Failed to update status URL symlink: %s", err)
}
message := fmt.Sprintf("Fetched revision: %s", artifact.Revision)
return sourcev1.HelmRepositoryReady(repo, artifact, indexURL, sourcev1.IndexationSucceededReason, message), nil
if indexURL != "" {
obj.Status.URL = indexURL
}
return ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
}
func (r *HelmRepositoryReconciler) reconcileDelete(ctx context.Context, repository sourcev1.HelmRepository) (ctrl.Result, error) {
// Our finalizer is still present, so lets handle garbage collection
if err := r.gc(repository); err != nil {
r.event(ctx, repository, events.EventSeverityError,
fmt.Sprintf("garbage collection for deleted resource failed: %s", err.Error()))
// reconcileDelete handles the delete of an object. It first garbage collects all artifacts for the object from the
// artifact storage, if successful, the finalizer is removed from the object.
func (r *HelmRepositoryReconciler) reconcileDelete(ctx context.Context, obj *sourcev1.HelmRepository) (ctrl.Result, error) {
// Garbage collect the resource's artifacts
if err := r.garbageCollect(ctx, obj); err != nil {
// Return the error so we retry the failed garbage collection
return ctrl.Result{}, err
}
// Record deleted status
r.recordReadiness(ctx, repository)
// Remove our finalizer from the list and update it
controllerutil.RemoveFinalizer(&repository, sourcev1.SourceFinalizer)
if err := r.Update(ctx, &repository); err != nil {
return ctrl.Result{}, err
}
// Remove our finalizer from the list
controllerutil.RemoveFinalizer(obj, sourcev1.SourceFinalizer)
// Stop reconciliation as the object is being deleted
return ctrl.Result{}, nil
}
// resetStatus returns a modified v1beta1.HelmRepository and a boolean indicating
// if the status field has been reset.
func (r *HelmRepositoryReconciler) resetStatus(repository sourcev1.HelmRepository) (sourcev1.HelmRepository, bool) {
// We do not have an artifact, or it does no longer exist
if repository.GetArtifact() == nil || !r.Storage.ArtifactExist(*repository.GetArtifact()) {
repository = sourcev1.HelmRepositoryProgressing(repository)
repository.Status.Artifact = nil
return repository, true
// garbageCollect performs a garbage collection for the given v1beta1.HelmRepository. It removes all but the current
// artifact except for when the deletion timestamp is set, which will result in the removal of all artifacts for the
// resource.
func (r *HelmRepositoryReconciler) garbageCollect(ctx context.Context, obj *sourcev1.HelmRepository) error {
if !obj.DeletionTimestamp.IsZero() {
if err := r.Storage.RemoveAll(r.Storage.NewArtifactFor(obj.Kind, obj.GetObjectMeta(), "", "*")); err != nil {
r.Eventf(obj, corev1.EventTypeWarning, "GarbageCollectionFailed",
"Garbage collection for deleted resource failed: %s", err)
return err
}
obj.Status.Artifact = nil
// TODO(hidde): we should only push this event if we actually garbage collected something
r.Eventf(obj, corev1.EventTypeNormal, "GarbageCollectionSucceeded",
"Garbage collected artifacts for deleted resource")
return nil
}
if repository.Generation != repository.Status.ObservedGeneration {
return sourcev1.HelmRepositoryProgressing(repository), true
}
return repository, false
}
// gc performs a garbage collection for the given v1beta1.HelmRepository.
// It removes all but the current artifact except for when the
// deletion timestamp is set, which will result in the removal of
// all artifacts for the resource.
func (r *HelmRepositoryReconciler) gc(repository sourcev1.HelmRepository) error {
if !repository.DeletionTimestamp.IsZero() {
return r.Storage.RemoveAll(r.Storage.NewArtifactFor(repository.Kind, repository.GetObjectMeta(), "", "*"))
}
if repository.GetArtifact() != nil {
return r.Storage.RemoveAllButCurrent(*repository.GetArtifact())
if obj.GetArtifact() != nil {
if err := r.Storage.RemoveAllButCurrent(*obj.GetArtifact()); err != nil {
r.Eventf(obj, corev1.EventTypeWarning, "GarbageCollectionFailed",
"Garbage collection of old artifacts failed: %s", err)
return err
}
// TODO(hidde): we should only push this event if we actually garbage collected something
r.Eventf(obj, corev1.EventTypeNormal, "GarbageCollectionSucceeded",
"Garbage collected old artifacts")
}
return nil
}
// event emits a Kubernetes event and forwards the event to notification controller if configured
func (r *HelmRepositoryReconciler) event(ctx context.Context, repository sourcev1.HelmRepository, severity, msg string) {
if r.EventRecorder != nil {
r.EventRecorder.Eventf(&repository, corev1.EventTypeNormal, severity, msg)
}
if r.ExternalEventRecorder != nil {
r.ExternalEventRecorder.Eventf(&repository, corev1.EventTypeNormal, severity, msg)
}
}
func (r *HelmRepositoryReconciler) recordReadiness(ctx context.Context, repository sourcev1.HelmRepository) {
log := ctrl.LoggerFrom(ctx)
if r.MetricsRecorder == nil {
return
}
objRef, err := reference.GetReference(r.Scheme, &repository)
if err != nil {
log.Error(err, "unable to record readiness metric")
return
}
if rc := apimeta.FindStatusCondition(repository.Status.Conditions, meta.ReadyCondition); rc != nil {
r.MetricsRecorder.RecordCondition(*objRef, *rc, !repository.DeletionTimestamp.IsZero())
} else {
r.MetricsRecorder.RecordCondition(*objRef, metav1.Condition{
Type: meta.ReadyCondition,
Status: metav1.ConditionUnknown,
}, !repository.DeletionTimestamp.IsZero())
}
}
func (r *HelmRepositoryReconciler) updateStatus(ctx context.Context, req ctrl.Request, newStatus sourcev1.HelmRepositoryStatus) error {
var repository sourcev1.HelmRepository
if err := r.Get(ctx, req.NamespacedName, &repository); err != nil {
return err
}
patch := client.MergeFrom(repository.DeepCopy())
repository.Status = newStatus
return r.Status().Patch(ctx, &repository, patch)
}
func (r *HelmRepositoryReconciler) recordSuspension(ctx context.Context, hr sourcev1.HelmRepository) {
if r.MetricsRecorder == nil {
return
}
log := ctrl.LoggerFrom(ctx)
objRef, err := reference.GetReference(r.Scheme, &hr)
if err != nil {
log.Error(err, "unable to record suspended metric")
return
}
if !hr.DeletionTimestamp.IsZero() {
r.MetricsRecorder.RecordSuspend(*objRef, false)
} else {
r.MetricsRecorder.RecordSuspend(*objRef, hr.Spec.Suspend)
}
}


@@ -18,389 +18,600 @@ package controllers
import (
"context"
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"time"
"testing"
. "github.com/onsi/ginkgo"
"github.com/go-logr/logr"
. "github.com/onsi/gomega"
"helm.sh/helm/v3/pkg/getter"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/log"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/helmtestserver"
"github.com/fluxcd/pkg/runtime/conditions"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/helm/repository"
)
var _ = Describe("HelmRepositoryReconciler", func() {
var (
testGetters = getter.Providers{
getter.Provider{
Schemes: []string{"http", "https"},
New: getter.NewHTTPGetter,
},
}
)
const (
timeout = time.Second * 30
interval = time.Second * 1
indexInterval = time.Second * 2
repositoryTimeout = time.Second * 5
)
func TestHelmRepositoryReconciler_Reconcile(t *testing.T) {
g := NewWithT(t)
Context("HelmRepository", func() {
var (
namespace *corev1.Namespace
helmServer *helmtestserver.HelmServer
err error
)
testServer, err := helmtestserver.NewTempHelmServer()
g.Expect(err).NotTo(HaveOccurred())
defer os.RemoveAll(testServer.Root())
BeforeEach(func() {
namespace = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: "helm-repository-" + randStringRunes(5)},
g.Expect(testServer.PackageChart("testdata/charts/helmchart")).To(Succeed())
g.Expect(testServer.GenerateIndex()).To(Succeed())
testServer.Start()
defer testServer.Stop()
obj := &sourcev1.HelmRepository{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "helmrepository-reconcile-",
Namespace: "default",
},
Spec: sourcev1.HelmRepositorySpec{
Interval: metav1.Duration{Duration: interval},
URL: testServer.URL(),
},
}
g.Expect(testEnv.Create(ctx, obj)).To(Succeed())
key := client.ObjectKey{Name: obj.Name, Namespace: obj.Namespace}
// Wait for finalizer to be set
g.Eventually(func() bool {
if err := testEnv.Get(ctx, key, obj); err != nil {
return false
}
return len(obj.Finalizers) > 0
}, timeout).Should(BeTrue())
// Wait for HelmRepository to be Ready
g.Eventually(func() bool {
if err := testEnv.Get(ctx, key, obj); err != nil {
return false
}
if !conditions.IsReady(obj) && obj.Status.Artifact == nil {
return false
}
readyCondition := conditions.Get(obj, meta.ReadyCondition)
return readyCondition.Status == metav1.ConditionTrue &&
obj.Generation == readyCondition.ObservedGeneration
}, timeout).Should(BeTrue())
g.Expect(testEnv.Delete(ctx, obj)).To(Succeed())
// Wait for HelmRepository to be deleted
g.Eventually(func() bool {
if err := testEnv.Get(ctx, key, obj); err != nil {
return apierrors.IsNotFound(err)
}
return false
}, timeout).Should(BeTrue())
}
func TestHelmRepositoryReconciler_reconcileStorage(t *testing.T) {
tests := []struct {
name string
beforeFunc func(obj *sourcev1.HelmRepository, storage *Storage) error
want ctrl.Result
wantErr bool
assertArtifact *sourcev1.Artifact
assertConditions []metav1.Condition
assertPaths []string
}{
{
name: "garbage collects",
beforeFunc: func(obj *sourcev1.HelmRepository, storage *Storage) error {
revisions := []string{"a", "b", "c"}
for n := range revisions {
v := revisions[n]
obj.Status.Artifact = &sourcev1.Artifact{
Path: fmt.Sprintf("/reconcile-storage/%s.txt", v),
Revision: v,
}
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader(v), 0644); err != nil {
return err
}
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
assertArtifact: &sourcev1.Artifact{
Path: "/reconcile-storage/c.txt",
Revision: "c",
Checksum: "2e7d2c03a9507ae265ecf5b5356885a53393a2029d241394997265a1a25aefc6",
URL: testStorage.Hostname + "/reconcile-storage/c.txt",
},
assertPaths: []string{
"/reconcile-storage/c.txt",
"!/reconcile-storage/b.txt",
"!/reconcile-storage/a.txt",
},
},
{
name: "notices missing artifact in storage",
beforeFunc: func(obj *sourcev1.HelmRepository, storage *Storage) error {
obj.Status.Artifact = &sourcev1.Artifact{
Path: "/reconcile-storage/invalid.txt",
Revision: "d",
}
testStorage.SetArtifactURL(obj.Status.Artifact)
return nil
},
want: ctrl.Result{Requeue: true},
assertPaths: []string{
"!/reconcile-storage/invalid.txt",
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.ArtifactUnavailableCondition, "NoArtifact", "No artifact for resource in storage"),
},
},
{
name: "updates hostname on diff from current",
beforeFunc: func(obj *sourcev1.HelmRepository, storage *Storage) error {
obj.Status.Artifact = &sourcev1.Artifact{
Path: "/reconcile-storage/hostname.txt",
Revision: "f",
Checksum: "3b9c358f36f0a31b6ad3e14f309c7cf198ac9246e8316f9ce543d5b19ac02b80",
URL: "http://outdated.com/reconcile-storage/hostname.txt",
}
if err := testStorage.MkdirAll(*obj.Status.Artifact); err != nil {
return err
}
if err := testStorage.AtomicWriteFile(obj.Status.Artifact, strings.NewReader("file"), 0644); err != nil {
return err
}
return nil
},
assertPaths: []string{
"/reconcile-storage/hostname.txt",
},
assertArtifact: &sourcev1.Artifact{
Path: "/reconcile-storage/hostname.txt",
Revision: "f",
Checksum: "3b9c358f36f0a31b6ad3e14f309c7cf198ac9246e8316f9ce543d5b19ac02b80",
URL: testStorage.Hostname + "/reconcile-storage/hostname.txt",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
r := &HelmRepositoryReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
}
err = k8sClient.Create(context.Background(), namespace)
Expect(err).NotTo(HaveOccurred(), "failed to create test namespace")
helmServer, err = helmtestserver.NewTempHelmServer()
Expect(err).To(Succeed())
})
AfterEach(func() {
helmServer.Stop()
os.RemoveAll(helmServer.Root())
Eventually(func() error {
return k8sClient.Delete(context.Background(), namespace)
}, timeout, interval).Should(Succeed(), "failed to delete test namespace")
})
It("Creates artifacts for", func() {
helmServer.Start()
Expect(helmServer.PackageChart(path.Join("testdata/charts/helmchart"))).Should(Succeed())
Expect(helmServer.GenerateIndex()).Should(Succeed())
key := types.NamespacedName{
Name: "helmrepository-sample-" + randStringRunes(5),
Namespace: namespace.Name,
}
created := &sourcev1.HelmRepository{
obj := &sourcev1.HelmRepository{
ObjectMeta: metav1.ObjectMeta{
Name: key.Name,
Namespace: key.Namespace,
},
Spec: sourcev1.HelmRepositorySpec{
URL: helmServer.URL(),
Interval: metav1.Duration{Duration: indexInterval},
Timeout: &metav1.Duration{Duration: repositoryTimeout},
GenerateName: "test-",
},
}
Expect(k8sClient.Create(context.Background(), created)).Should(Succeed())
if tt.beforeFunc != nil {
g.Expect(tt.beforeFunc(obj, testStorage)).To(Succeed())
}
By("Expecting artifact")
got := &sourcev1.HelmRepository{}
Eventually(func() bool {
_ = k8sClient.Get(context.Background(), key, got)
return got.Status.Artifact != nil && ginkgoTestStorage.ArtifactExist(*got.Status.Artifact)
}, timeout, interval).Should(BeTrue())
got, err := r.reconcileStorage(context.TODO(), obj)
g.Expect(err != nil).To(Equal(tt.wantErr))
g.Expect(got).To(Equal(tt.want))
By("Updating the chart index")
// Regenerating the index is sufficient to make the revision change
Expect(helmServer.GenerateIndex()).Should(Succeed())
g.Expect(obj.Status.Artifact).To(MatchArtifact(tt.assertArtifact))
if tt.assertArtifact != nil && tt.assertArtifact.URL != "" {
g.Expect(obj.Status.Artifact.URL).To(Equal(tt.assertArtifact.URL))
}
g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
By("Expecting revision change and GC")
Eventually(func() bool {
now := &sourcev1.HelmRepository{}
_ = k8sClient.Get(context.Background(), key, now)
// Test revision change and garbage collection
return now.Status.Artifact.Revision != got.Status.Artifact.Revision &&
!ginkgoTestStorage.ArtifactExist(*got.Status.Artifact)
}, timeout, interval).Should(BeTrue())
updated := &sourcev1.HelmRepository{}
Expect(k8sClient.Get(context.Background(), key, updated)).Should(Succeed())
updated.Spec.URL = "invalid#url?"
Expect(k8sClient.Update(context.Background(), updated)).Should(Succeed())
Eventually(func() bool {
_ = k8sClient.Get(context.Background(), key, updated)
for _, c := range updated.Status.Conditions {
if c.Reason == sourcev1.IndexationFailedReason {
return true
}
for _, p := range tt.assertPaths {
absoluteP := filepath.Join(testStorage.BasePath, p)
if !strings.HasPrefix(p, "!") {
g.Expect(absoluteP).To(BeAnExistingFile())
continue
}
return false
}, timeout, interval).Should(BeTrue())
Expect(updated.Status.Artifact).ToNot(BeNil())
By("Expecting to delete successfully")
got = &sourcev1.HelmRepository{}
Eventually(func() error {
_ = k8sClient.Get(context.Background(), key, got)
return k8sClient.Delete(context.Background(), got)
}, timeout, interval).Should(Succeed())
By("Expecting delete to finish")
Eventually(func() error {
r := &sourcev1.HelmRepository{}
return k8sClient.Get(context.Background(), key, r)
}, timeout, interval).ShouldNot(Succeed())
exists := func(path string) bool {
// wait for tmp sync on macOS
time.Sleep(time.Second)
_, err := os.Stat(path)
return err == nil
g.Expect(absoluteP).NotTo(BeAnExistingFile())
}
By("Expecting GC after delete")
Eventually(exists(got.Status.Artifact.Path), timeout, interval).ShouldNot(BeTrue())
})
}
}
It("Handles timeout", func() {
helmServer.Start()
func TestHelmRepositoryReconciler_reconcileSource(t *testing.T) {
type options struct {
username string
password string
publicKey []byte
privateKey []byte
ca []byte
}
Expect(helmServer.PackageChart(path.Join("testdata/charts/helmchart"))).Should(Succeed())
Expect(helmServer.GenerateIndex()).Should(Succeed())
key := types.NamespacedName{
Name: "helmrepository-sample-" + randStringRunes(5),
Namespace: namespace.Name,
}
created := &sourcev1.HelmRepository{
tests := []struct {
name string
protocol string
server options
secret *corev1.Secret
beforeFunc func(t *WithT, obj *sourcev1.HelmRepository)
afterFunc func(t *WithT, obj *sourcev1.HelmRepository)
want ctrl.Result
wantErr bool
assertConditions []metav1.Condition
}{
{
name: "HTTP without secretRef makes ArtifactOutdated=True",
protocol: "http",
want: ctrl.Result{RequeueAfter: interval},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New index revision"),
},
},
{
name: "HTTP with Basic Auth secret makes ArtifactOutdated=True",
protocol: "http",
server: options{
username: "git",
password: "1234",
},
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: key.Name,
Namespace: key.Namespace,
Name: "basic-auth",
},
Spec: sourcev1.HelmRepositorySpec{
URL: helmServer.URL(),
Interval: metav1.Duration{Duration: indexInterval},
Data: map[string][]byte{
"username": []byte("git"),
"password": []byte("1234"),
},
}
Expect(k8sClient.Create(context.Background(), created)).Should(Succeed())
defer k8sClient.Delete(context.Background(), created)
},
beforeFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
obj.Spec.SecretRef = &meta.LocalObjectReference{Name: "basic-auth"}
},
want: ctrl.Result{RequeueAfter: interval},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New index revision"),
},
},
{
name: "HTTPS with CAFile secret makes ArtifactOutdated=True",
protocol: "https",
server: options{
publicKey: tlsPublicKey,
privateKey: tlsPrivateKey,
ca: tlsCA,
},
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "ca-file",
},
Data: map[string][]byte{
"caFile": tlsCA,
},
},
beforeFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
obj.Spec.SecretRef = &meta.LocalObjectReference{Name: "ca-file"}
},
want: ctrl.Result{RequeueAfter: interval},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.ArtifactOutdatedCondition, "NewRevision", "New index revision"),
},
},
{
name: "HTTPS with invalid CAFile secret makes FetchFailed=True and returns error",
protocol: "https",
server: options{
publicKey: tlsPublicKey,
privateKey: tlsPrivateKey,
ca: tlsCA,
},
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "invalid-ca",
},
Data: map[string][]byte{
"caFile": []byte("invalid"),
},
},
beforeFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
obj.Spec.SecretRef = &meta.LocalObjectReference{Name: "invalid-ca"}
},
wantErr: true,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, meta.FailedReason, "can't create TLS config for client: failed to append certificates from file"),
},
},
{
name: "Invalid URL makes FetchFailed=True and returns zero Result",
protocol: "http",
beforeFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
obj.Spec.URL = strings.ReplaceAll(obj.Spec.URL, "http://", "")
},
want: ctrl.Result{},
wantErr: false,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.URLInvalidReason, "first path segment in URL cannot contain colon"),
},
},
{
name: "Unsupported scheme makes FetchFailed=True and returns zero Result",
protocol: "http",
beforeFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
obj.Spec.URL = strings.ReplaceAll(obj.Spec.URL, "http://", "ftp://")
},
want: ctrl.Result{},
wantErr: false,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, meta.FailedReason, "scheme \"ftp\" not supported"),
},
},
{
name: "Missing secret returns FetchFailed=True and returns error",
protocol: "http",
beforeFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
obj.Spec.SecretRef = &meta.LocalObjectReference{Name: "non-existing"}
},
wantErr: true,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "secrets \"non-existing\" not found"),
},
},
{
name: "Malformed secret returns FetchFailed=True and returns error",
protocol: "http",
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "malformed-basic-auth",
},
Data: map[string][]byte{
"username": []byte("git"),
},
},
beforeFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
obj.Spec.SecretRef = &meta.LocalObjectReference{Name: "malformed-basic-auth"}
},
wantErr: true,
assertConditions: []metav1.Condition{
*conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "required fields 'username' and 'password"),
},
},
}
By("Expecting index download to succeed")
Eventually(func() bool {
got := &sourcev1.HelmRepository{}
_ = k8sClient.Get(context.Background(), key, got)
for _, condition := range got.Status.Conditions {
if condition.Reason == sourcev1.IndexationSucceededReason {
return true
}
}
return false
}, timeout, interval).Should(BeTrue())
for _, tt := range tests {
obj := &sourcev1.HelmRepository{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "auth-strategy-",
},
Spec: sourcev1.HelmRepositorySpec{
Interval: metav1.Duration{Duration: interval},
Timeout: &metav1.Duration{Duration: interval},
},
}
By("Expecting index download to timeout")
updated := &sourcev1.HelmRepository{}
Expect(k8sClient.Get(context.Background(), key, updated)).Should(Succeed())
updated.Spec.Timeout = &metav1.Duration{Duration: time.Microsecond}
Expect(k8sClient.Update(context.Background(), updated)).Should(Succeed())
Eventually(func() string {
got := &sourcev1.HelmRepository{}
_ = k8sClient.Get(context.Background(), key, got)
for _, condition := range got.Status.Conditions {
if condition.Reason == sourcev1.IndexationFailedReason {
return condition.Message
}
}
return ""
}, timeout, interval).Should(MatchRegexp("(?i)timeout"))
})
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
It("Authenticates when basic auth credentials are provided", func() {
var username, password = "john", "doe"
helmServer.WithMiddleware(func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
u, p, ok := r.BasicAuth()
if !ok || username != u || password != p {
w.WriteHeader(401)
return
}
handler.ServeHTTP(w, r)
server, err := helmtestserver.NewTempHelmServer()
g.Expect(err).NotTo(HaveOccurred())
defer os.RemoveAll(server.Root())
g.Expect(server.PackageChart("testdata/charts/helmchart")).To(Succeed())
g.Expect(server.GenerateIndex()).To(Succeed())
if len(tt.server.username+tt.server.password) > 0 {
server.WithMiddleware(func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
u, p, ok := r.BasicAuth()
if !ok || u != tt.server.username || p != tt.server.password {
w.WriteHeader(401)
return
}
handler.ServeHTTP(w, r)
})
})
})
helmServer.Start()
Expect(helmServer.PackageChart(path.Join("testdata/charts/helmchart"))).Should(Succeed())
Expect(helmServer.GenerateIndex()).Should(Succeed())
secretKey := types.NamespacedName{
Name: "helmrepository-auth-" + randStringRunes(5),
Namespace: namespace.Name,
}
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretKey.Name,
Namespace: secretKey.Namespace,
secret := tt.secret.DeepCopy()
switch tt.protocol {
case "http":
server.Start()
defer server.Stop()
obj.Spec.URL = server.URL()
case "https":
g.Expect(server.StartTLS(tt.server.publicKey, tt.server.privateKey, tt.server.ca, "example.com")).To(Succeed())
defer server.Stop()
obj.Spec.URL = server.URL()
default:
t.Fatalf("unsupported protocol %q", tt.protocol)
}
if tt.beforeFunc != nil {
tt.beforeFunc(g, obj)
}
builder := fakeclient.NewClientBuilder().WithScheme(testEnv.GetScheme())
if secret != nil {
builder.WithObjects(secret.DeepCopy())
}
r := &HelmRepositoryReconciler{
EventRecorder: record.NewFakeRecorder(32),
Client: builder.Build(),
Storage: testStorage,
Getters: testGetters,
}
var chartRepo repository.ChartRepository
var artifact sourcev1.Artifact
dlog := log.NewDelegatingLogSink(log.NullLogSink{})
nullLogger := logr.New(dlog)
got, err := r.reconcileSource(logr.NewContext(ctx, nullLogger), obj, &artifact, &chartRepo)
g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
g.Expect(err != nil).To(Equal(tt.wantErr))
g.Expect(got).To(Equal(tt.want))
if tt.afterFunc != nil {
tt.afterFunc(g, obj)
}
})
}
}
func TestHelmRepositoryReconciler_reconcileArtifact(t *testing.T) {
tests := []struct {
name string
beforeFunc func(t *WithT, obj *sourcev1.HelmRepository, artifact sourcev1.Artifact, index *repository.ChartRepository)
afterFunc func(t *WithT, obj *sourcev1.HelmRepository)
want ctrl.Result
wantErr bool
assertConditions []metav1.Condition
}{
{
name: "Archiving artifact to storage makes Ready=True",
beforeFunc: func(t *WithT, obj *sourcev1.HelmRepository, artifact sourcev1.Artifact, index *repository.ChartRepository) {
obj.Spec.Interval = metav1.Duration{Duration: interval}
},
want: ctrl.Result{RequeueAfter: interval},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
},
},
{
name: "Up-to-date artifact should not update status",
beforeFunc: func(t *WithT, obj *sourcev1.HelmRepository, artifact sourcev1.Artifact, index *repository.ChartRepository) {
obj.Spec.Interval = metav1.Duration{Duration: interval}
obj.Status.Artifact = artifact.DeepCopy()
},
afterFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
t.Expect(obj.Status.URL).To(BeEmpty())
},
want: ctrl.Result{RequeueAfter: interval},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
},
},
{
name: "Removes ArtifactUnavailableCondition after creating artifact",
beforeFunc: func(t *WithT, obj *sourcev1.HelmRepository, artifact sourcev1.Artifact, index *repository.ChartRepository) {
obj.Spec.Interval = metav1.Duration{Duration: interval}
conditions.MarkTrue(obj, sourcev1.ArtifactUnavailableCondition, "Foo", "")
},
want: ctrl.Result{RequeueAfter: interval},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
},
},
{
name: "Removes ArtifactOutdatedCondition after creating a new artifact",
beforeFunc: func(t *WithT, obj *sourcev1.HelmRepository, artifact sourcev1.Artifact, index *repository.ChartRepository) {
obj.Spec.Interval = metav1.Duration{Duration: interval}
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "Foo", "")
},
want: ctrl.Result{RequeueAfter: interval},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
},
},
{
name: "Creates latest symlink to the created artifact",
beforeFunc: func(t *WithT, obj *sourcev1.HelmRepository, artifact sourcev1.Artifact, index *repository.ChartRepository) {
obj.Spec.Interval = metav1.Duration{Duration: interval}
},
afterFunc: func(t *WithT, obj *sourcev1.HelmRepository) {
localPath := testStorage.LocalPath(*obj.GetArtifact())
symlinkPath := filepath.Join(filepath.Dir(localPath), "index.yaml")
targetFile, err := os.Readlink(symlinkPath)
t.Expect(err).NotTo(HaveOccurred())
t.Expect(localPath).To(Equal(targetFile))
},
want: ctrl.Result{RequeueAfter: interval},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Stored artifact for revision 'existing'"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
r := &HelmRepositoryReconciler{
EventRecorder: record.NewFakeRecorder(32),
Storage: testStorage,
}
obj := &sourcev1.HelmRepository{
TypeMeta: metav1.TypeMeta{
Kind: sourcev1.HelmRepositoryKind,
},
}
Expect(k8sClient.Create(context.Background(), secret)).Should(Succeed())
key := types.NamespacedName{
Name: "helmrepository-sample-" + randStringRunes(5),
Namespace: namespace.Name,
}
created := &sourcev1.HelmRepository{
ObjectMeta: metav1.ObjectMeta{
Name: key.Name,
Namespace: key.Namespace,
GenerateName: "test-bucket-",
Generation: 1,
Namespace: "default",
},
Spec: sourcev1.HelmRepositorySpec{
URL: helmServer.URL(),
SecretRef: &meta.LocalObjectReference{
Name: secretKey.Name,
},
Interval: metav1.Duration{Duration: indexInterval},
Timeout: &metav1.Duration{Duration: timeout},
URL: "https://example.com/index.yaml",
},
}
Expect(k8sClient.Create(context.Background(), created)).Should(Succeed())
defer k8sClient.Delete(context.Background(), created)
By("Expecting 401")
Eventually(func() bool {
got := &sourcev1.HelmRepository{}
_ = k8sClient.Get(context.Background(), key, got)
for _, c := range got.Status.Conditions {
if c.Reason == sourcev1.IndexationFailedReason &&
strings.Contains(c.Message, "401 Unauthorized") {
return true
}
}
return false
}, timeout, interval).Should(BeTrue())
tmpDir, err := os.MkdirTemp("", "test-reconcile-artifact-")
g.Expect(err).ToNot(HaveOccurred())
defer os.RemoveAll(tmpDir)
By("Expecting missing field error")
secret.Data = map[string][]byte{
"username": []byte(username),
// Create an empty cache file.
cachePath := filepath.Join(tmpDir, "index.yaml")
cacheFile, err := os.Create(cachePath)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cacheFile.Close()).ToNot(HaveOccurred())
chartRepo, err := repository.NewChartRepository(obj.Spec.URL, "", testGetters, nil)
g.Expect(err).ToNot(HaveOccurred())
chartRepo.CachePath = cachePath
artifact := testStorage.NewArtifactFor(obj.Kind, obj, "existing", "foo.tar.gz")
// Checksum of the index file calculated by the ChartRepository.
artifact.Checksum = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
if tt.beforeFunc != nil {
tt.beforeFunc(g, obj, artifact, chartRepo)
}
Expect(k8sClient.Update(context.Background(), secret)).Should(Succeed())
Eventually(func() bool {
got := &sourcev1.HelmRepository{}
_ = k8sClient.Get(context.Background(), key, got)
for _, c := range got.Status.Conditions {
if c.Reason == sourcev1.AuthenticationFailedReason {
return true
}
}
return false
}, timeout, interval).Should(BeTrue())
dlog := log.NewDelegatingLogSink(log.NullLogSink{})
nullLogger := logr.New(dlog)
got, err := r.reconcileArtifact(logr.NewContext(ctx, nullLogger), obj, artifact, chartRepo)
g.Expect(err != nil).To(Equal(tt.wantErr))
g.Expect(got).To(Equal(tt.want))
By("Expecting artifact")
secret.Data["password"] = []byte(password)
Expect(k8sClient.Update(context.Background(), secret)).Should(Succeed())
Eventually(func() bool {
got := &sourcev1.HelmRepository{}
_ = k8sClient.Get(context.Background(), key, got)
return got.Status.Artifact != nil &&
ginkgoTestStorage.ArtifactExist(*got.Status.Artifact)
}, timeout, interval).Should(BeTrue())
// On error, artifact is empty. Check artifacts only on successful
// reconcile.
if !tt.wantErr {
g.Expect(obj.Status.Artifact).To(MatchArtifact(artifact.DeepCopy()))
}
g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
By("Expecting missing secret error")
Expect(k8sClient.Delete(context.Background(), secret)).Should(Succeed())
got := &sourcev1.HelmRepository{}
Eventually(func() bool {
_ = k8sClient.Get(context.Background(), key, got)
for _, c := range got.Status.Conditions {
if c.Reason == sourcev1.AuthenticationFailedReason {
return true
}
}
return false
}, timeout, interval).Should(BeTrue())
Expect(got.Status.Artifact).ShouldNot(BeNil())
if tt.afterFunc != nil {
tt.afterFunc(g, obj)
}
})
It("Authenticates when TLS credentials are provided", func() {
err = helmServer.StartTLS(examplePublicKey, examplePrivateKey, exampleCA, "example.com")
Expect(err).NotTo(HaveOccurred())
Expect(helmServer.PackageChart(path.Join("testdata/charts/helmchart"))).Should(Succeed())
Expect(helmServer.GenerateIndex()).Should(Succeed())
secretKey := types.NamespacedName{
Name: "helmrepository-auth-" + randStringRunes(5),
Namespace: namespace.Name,
}
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretKey.Name,
Namespace: secretKey.Namespace,
},
}
Expect(k8sClient.Create(context.Background(), secret)).Should(Succeed())
key := types.NamespacedName{
Name: "helmrepository-sample-" + randStringRunes(5),
Namespace: namespace.Name,
}
created := &sourcev1.HelmRepository{
ObjectMeta: metav1.ObjectMeta{
Name: key.Name,
Namespace: key.Namespace,
},
Spec: sourcev1.HelmRepositorySpec{
URL: helmServer.URL(),
SecretRef: &meta.LocalObjectReference{
Name: secretKey.Name,
},
Interval: metav1.Duration{Duration: indexInterval},
},
}
Expect(k8sClient.Create(context.Background(), created)).Should(Succeed())
defer k8sClient.Delete(context.Background(), created)
By("Expecting unknown authority error")
Eventually(func() bool {
got := &sourcev1.HelmRepository{}
_ = k8sClient.Get(context.Background(), key, got)
for _, c := range got.Status.Conditions {
if c.Reason == sourcev1.IndexationFailedReason &&
strings.Contains(c.Message, "certificate signed by unknown authority") {
return true
}
}
return false
}, timeout, interval).Should(BeTrue())
By("Expecting missing field error")
secret.Data = map[string][]byte{
"certFile": examplePublicKey,
}
Expect(k8sClient.Update(context.Background(), secret)).Should(Succeed())
Eventually(func() bool {
got := &sourcev1.HelmRepository{}
_ = k8sClient.Get(context.Background(), key, got)
for _, c := range got.Status.Conditions {
if c.Reason == sourcev1.AuthenticationFailedReason {
return true
}
}
return false
}, timeout, interval).Should(BeTrue())
By("Expecting artifact")
secret.Data["keyFile"] = examplePrivateKey
secret.Data["caFile"] = exampleCA
Expect(k8sClient.Update(context.Background(), secret)).Should(Succeed())
Eventually(func() bool {
got := &sourcev1.HelmRepository{}
_ = k8sClient.Get(context.Background(), key, got)
return got.Status.Artifact != nil &&
ginkgoTestStorage.ArtifactExist(*got.Status.Artifact)
}, timeout, interval).Should(BeTrue())
By("Expecting missing secret error")
Expect(k8sClient.Delete(context.Background(), secret)).Should(Succeed())
got := &sourcev1.HelmRepository{}
Eventually(func() bool {
_ = k8sClient.Get(context.Background(), key, got)
for _, c := range got.Status.Conditions {
if c.Reason == sourcev1.AuthenticationFailedReason {
return true
}
}
return false
}, timeout, interval).Should(BeTrue())
Expect(got.Status.Artifact).ShouldNot(BeNil())
})
})
})
}
}


@@ -126,9 +126,9 @@ var _ = BeforeSuite(func() {
Expect(err).ToNot(HaveOccurred(), "failed to setup GitRepositoryReconciler")
err = (&HelmRepositoryReconciler{
Client: k8sManager.GetClient(),
Scheme: scheme.Scheme,
Storage: ginkgoTestStorage,
Client: k8sManager.GetClient(),
EventRecorder: record.NewFakeRecorder(32),
Storage: ginkgoTestStorage,
Getters: getter.Providers{getter.Provider{
Schemes: []string{"http", "https"},
New: getter.NewHTTPGetter,


@@ -106,6 +106,16 @@ func TestMain(m *testing.M) {
panic(fmt.Sprintf("Failed to start BucketReconciler: %v", err))
}
if err := (&HelmRepositoryReconciler{
Client: testEnv,
EventRecorder: record.NewFakeRecorder(32),
Metrics: testMetricsH,
Getters: testGetters,
Storage: testStorage,
}).SetupWithManager(testEnv); err != nil {
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
}
go func() {
fmt.Println("Starting the test environment")
if err := testEnv.Start(ctx); err != nil {

main.go

@@ -177,12 +177,11 @@ func main() {
os.Exit(1)
}
if err = (&controllers.HelmRepositoryReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Storage: storage,
Getters: getters,
EventRecorder: eventRecorder,
MetricsRecorder: metricsH.MetricsRecorder,
Client: mgr.GetClient(),
EventRecorder: eventRecorder,
Metrics: metricsH,
Storage: storage,
Getters: getters,
}).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {