Static helmrepository OCI

Remove the HelmRepositoryOCI reconciler and make HelmRepository of type
OCI static. The existing HelmRepository OCI objects are migrated to
static object by removing their finalizers and status. New
HelmRepository OCI objects go through one time migration to remove the
status. These are not reconciled again, unless the type is changed to
default. On type switching from HelmRepository default to OCI, the
finalizer, status and artifact are removed to make the object static. On
switching from OCI to default, a complete reconciliation of
HelmRepository takes place to build artifact and add status and
finalizer.

The HelmRepository .spec.url has a new validation to check the URL
scheme. This is to add some validation to HelmRepository OCI since it's
not backed by a reconciler for full validation.

Add HelmRepositoryOCIMigrationPredicate predicate to detect and allow
reconciliation of HelmRepository OCI objects that need migration. The
other predicates that filtered the HelmRepository events based on the
type have been removed as all the HelmRepositories will now be
reconciled by a single reconciler. HelmRepositoryOCIMigrationPredicate
readily allows non-OCI objects and only checks if a migration is needed
for OCI type object.

Add controller tests for different migration scenarios.

Signed-off-by: Sunny <darkowlzz@protonmail.com>
This commit is contained in:
Sunny 2023-10-31 19:40:08 +00:00
parent f54a59c60b
commit cf3735e2a2
13 changed files with 401 additions and 1076 deletions

View File

@ -44,6 +44,7 @@ const (
type HelmRepositorySpec struct {
// URL of the Helm repository, a valid URL contains at least a protocol and
// host.
// +kubebuilder:validation:Pattern="^(http|https|oci)://.*$"
// +required
URL string `json:"url"`

View File

@ -373,6 +373,7 @@ spec:
url:
description: URL of the Helm repository, a valid URL contains at least
a protocol and host.
pattern: ^(http|https|oci)://.*$
type: string
required:
- interval

View File

@ -75,7 +75,6 @@ kubectl -n source-system rollout status deploy/source-controller --timeout=1m
kubectl -n source-system wait gitrepository/gitrepository-sample --for=condition=ready --timeout=1m
kubectl -n source-system wait ocirepository/ocirepository-sample --for=condition=ready --timeout=1m
kubectl -n source-system wait helmrepository/helmrepository-sample --for=condition=ready --timeout=1m
kubectl -n source-system wait helmrepository/helmrepository-sample-oci --for=condition=ready --timeout=1m
kubectl -n source-system wait helmchart/helmchart-sample --for=condition=ready --timeout=1m
kubectl -n source-system wait helmchart/helmchart-sample-oci --for=condition=ready --timeout=1m
kubectl -n source-system delete -f "${ROOT_DIR}/config/samples"
@ -145,7 +144,6 @@ kubectl -n source-system wait gitrepository/large-repo --for=condition=ready --t
echo "Run HelmChart from OCI registry tests"
kubectl -n source-system apply -f "${ROOT_DIR}/config/testdata/helmchart-from-oci/source.yaml"
kubectl -n source-system wait helmrepository/podinfo --for=condition=ready --timeout=1m
kubectl -n source-system wait helmchart/podinfo --for=condition=ready --timeout=1m
kubectl -n source-system wait helmchart/podinfo-keyless --for=condition=ready --timeout=1m

View File

@ -18,6 +18,7 @@ package controller
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/url"
@ -139,6 +140,12 @@ type HelmChartReconciler struct {
patchOptions []patch.Option
}
// RegistryClientGeneratorFunc is a function that returns a registry client
// and an optional file name.
// The file is used to store the registry client credentials.
// The caller is responsible for deleting the file.
type RegistryClientGeneratorFunc func(tlsConfig *tls.Config, isLogin bool) (*helmreg.Client, string, error)
func (r *HelmChartReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
return r.SetupWithManagerAndOptions(ctx, mgr, HelmChartReconcilerOptions{})
}

View File

@ -197,7 +197,7 @@ func TestHelmChartReconciler_Reconcile(t *testing.T) {
{
name: "Stalling on invalid repository URL",
beforeFunc: func(repository *helmv1.HelmRepository) {
repository.Spec.URL = "://unsupported" // Invalid URL
repository.Spec.URL = "https://unsupported/foo://" // Invalid URL
},
assertFunc: func(g *WithT, obj *helmv1.HelmChart, _ *helmv1.HelmRepository) {
key := client.ObjectKey{Name: obj.Name, Namespace: obj.Namespace}

View File

@ -22,11 +22,13 @@ import (
"errors"
"fmt"
"net/url"
"strings"
"time"
"github.com/docker/go-units"
"github.com/opencontainers/go-digest"
helmgetter "helm.sh/helm/v3/pkg/getter"
helmreg "helm.sh/helm/v3/pkg/registry"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
kuberecorder "k8s.io/client-go/tools/record"
@ -138,10 +140,7 @@ func (r *HelmRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager,
For(&helmv1.HelmRepository{}).
WithEventFilter(
predicate.And(
predicate.Or(
intpredicates.HelmRepositoryTypePredicate{RepositoryType: helmv1.HelmRepositoryTypeDefault},
intpredicates.HelmRepositoryTypePredicate{RepositoryType: ""},
),
intpredicates.HelmRepositoryOCIMigrationPredicate{},
predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{}),
),
).
@ -164,6 +163,11 @@ func (r *HelmRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reque
// Initialize the patch helper with the current version of the object.
serialPatcher := patch.NewSerialPatcher(obj, r.Client)
// If it's of type OCI, migrate the object to static.
if obj.Spec.Type == helmv1.HelmRepositoryTypeOCI {
return r.migrationToStatic(ctx, serialPatcher, obj)
}
// recResult stores the abstracted reconcile result.
var recResult sreconcile.Result
@ -193,8 +197,8 @@ func (r *HelmRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reque
r.Metrics.RecordDuration(ctx, obj, start)
}()
// Examine if the object is under deletion or if a type change has happened.
if !obj.ObjectMeta.DeletionTimestamp.IsZero() || (obj.Spec.Type != "" && obj.Spec.Type != helmv1.HelmRepositoryTypeDefault) {
// Examine if the object is under deletion.
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
recResult, retErr = r.reconcileDelete(ctx, obj)
return
}
@ -391,6 +395,18 @@ func (r *HelmRepositoryReconciler) reconcileStorage(ctx context.Context, sp *pat
// pointer is set to the newly fetched index.
func (r *HelmRepositoryReconciler) reconcileSource(ctx context.Context, sp *patch.SerialPatcher,
obj *helmv1.HelmRepository, artifact *sourcev1.Artifact, chartRepo *repository.ChartRepository) (sreconcile.Result, error) {
// Ensure it's not an OCI URL. API validation ensures that only
// http/https/oci scheme are allowed.
if strings.HasPrefix(obj.Spec.URL, helmreg.OCIScheme) {
err := fmt.Errorf("'oci' URL scheme cannot be used with 'default' HelmRepository type")
e := serror.NewStalling(
fmt.Errorf("invalid Helm repository URL: %w", err),
sourcev1.URLInvalidReason,
)
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
return sreconcile.ResultEmpty, e
}
normalizedURL, err := repository.NormalizeURL(obj.Spec.URL)
if err != nil {
e := serror.NewStalling(
@ -685,3 +701,33 @@ func (r *HelmRepositoryReconciler) eventLogf(ctx context.Context, obj runtime.Ob
}
r.Eventf(obj, eventType, reason, msg)
}
// migrateToStatic is HelmRepository OCI migration to static object.
func (r *HelmRepositoryReconciler) migrationToStatic(ctx context.Context, sp *patch.SerialPatcher, obj *helmv1.HelmRepository) (result ctrl.Result, err error) {
// Skip migration if suspended and not being deleted.
if obj.Spec.Suspend && obj.DeletionTimestamp.IsZero() {
return ctrl.Result{}, nil
}
if !intpredicates.HelmRepositoryOCIRequireMigration(obj) {
// Already migrated, nothing to do.
return ctrl.Result{}, nil
}
// Delete any artifact.
_, err = r.reconcileDelete(ctx, obj)
if err != nil {
return ctrl.Result{}, err
}
// Delete finalizer and reset the status.
controllerutil.RemoveFinalizer(obj, sourcev1.SourceFinalizer)
obj.Status = helmv1.HelmRepositoryStatus{}
if err := sp.Patch(ctx, obj); err != nil {
return ctrl.Result{}, err
}
r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "Migration",
"removed artifact and finalizer to migrate to static HelmRepository of type OCI")
return ctrl.Result{}, nil
}

View File

@ -1,418 +0,0 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/url"
"os"
"time"
helmreg "helm.sh/helm/v3/pkg/registry"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kerrors "k8s.io/apimachinery/pkg/util/errors"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
helper "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/jitter"
"github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/pkg/runtime/predicates"
rreconcile "github.com/fluxcd/pkg/runtime/reconcile"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
helmv1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/helm/getter"
"github.com/fluxcd/source-controller/internal/helm/repository"
"github.com/fluxcd/source-controller/internal/object"
intpredicates "github.com/fluxcd/source-controller/internal/predicates"
)
var helmRepositoryOCIOwnedConditions = []string{
meta.ReadyCondition,
meta.ReconcilingCondition,
meta.StalledCondition,
}
var helmRepositoryOCINegativeConditions = []string{
meta.StalledCondition,
meta.ReconcilingCondition,
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories/finalizers,verbs=get;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
// HelmRepositoryOCI Reconciler reconciles a v1beta2.HelmRepository object of type OCI.
type HelmRepositoryOCIReconciler struct {
client.Client
kuberecorder.EventRecorder
helper.Metrics
ControllerName string
RegistryClientGenerator RegistryClientGeneratorFunc
patchOptions []patch.Option
// unmanagedConditions are the conditions that are not managed by this
// reconciler and need to be removed from the object before taking ownership
// of the object being reconciled.
unmanagedConditions []string
}
// RegistryClientGeneratorFunc is a function that returns a registry client
// and an optional file name.
// The file is used to store the registry client credentials.
// The caller is responsible for deleting the file.
type RegistryClientGeneratorFunc func(tlsConfig *tls.Config, isLogin bool) (*helmreg.Client, string, error)
func (r *HelmRepositoryOCIReconciler) SetupWithManager(mgr ctrl.Manager) error {
return r.SetupWithManagerAndOptions(mgr, HelmRepositoryReconcilerOptions{})
}
func (r *HelmRepositoryOCIReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts HelmRepositoryReconcilerOptions) error {
r.unmanagedConditions = conditionsDiff(helmRepositoryReadyCondition.Owned, helmRepositoryOCIOwnedConditions)
r.patchOptions = getPatchOptions(helmRepositoryOCIOwnedConditions, r.ControllerName)
return ctrl.NewControllerManagedBy(mgr).
For(&helmv1.HelmRepository{}).
WithEventFilter(
predicate.And(
intpredicates.HelmRepositoryTypePredicate{RepositoryType: helmv1.HelmRepositoryTypeOCI},
predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{}),
),
).
WithOptions(controller.Options{
RateLimiter: opts.RateLimiter,
}).
Complete(r)
}
func (r *HelmRepositoryOCIReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
// Fetch the HelmRepository
obj := &helmv1.HelmRepository{}
if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// If the object contains any of the unmanaged conditions, requeue and wait
// for those conditions to be removed first before processing the object.
// NOTE: This will happen only when a HelmRepository's spec.type is switched
// from "default" to "oci".
if conditions.HasAny(obj, r.unmanagedConditions) {
r.eventLogf(ctx, obj, eventv1.EventTypeTrace, "IncompleteTransition",
"object contains conditions managed by other reconciler")
return ctrl.Result{RequeueAfter: time.Second}, nil
}
// Initialize the patch helper with the current version of the object.
serialPatcher := patch.NewSerialPatcher(obj, r.Client)
// Always attempt to patch the object after each reconciliation.
defer func() {
// If a reconcile annotation value is found, set it in the object status
// as status.lastHandledReconcileAt.
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
object.SetStatusLastHandledReconcileAt(obj, v)
}
patchOpts := []patch.Option{}
patchOpts = append(patchOpts, r.patchOptions...)
// Set status observed generation option if the object is stalled, or
// if the object is ready.
if conditions.IsStalled(obj) || conditions.IsReady(obj) {
patchOpts = append(patchOpts, patch.WithStatusObservedGeneration{})
}
if err := serialPatcher.Patch(ctx, obj, patchOpts...); err != nil {
// Ignore patch error "not found" when the object is being deleted.
if !obj.GetDeletionTimestamp().IsZero() {
err = kerrors.FilterOut(err, func(e error) bool { return apierrors.IsNotFound(e) })
}
retErr = kerrors.NewAggregate([]error{retErr, err})
}
// Always record suspend, readiness and duration metrics.
r.Metrics.RecordSuspend(ctx, obj, obj.Spec.Suspend)
r.Metrics.RecordReadiness(ctx, obj)
r.Metrics.RecordDuration(ctx, obj, start)
}()
// Examine if the object is under deletion.
if !obj.ObjectMeta.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, obj)
}
// Add finalizer first if it doesn't exist to avoid the race condition
// between init and delete.
// Note: Finalizers in general can only be added when the deletionTimestamp
// is not set.
if !controllerutil.ContainsFinalizer(obj, sourcev1.SourceFinalizer) {
controllerutil.AddFinalizer(obj, sourcev1.SourceFinalizer)
return ctrl.Result{Requeue: true}, nil
}
// Return if the object is suspended.
if obj.Spec.Suspend {
log.Info("reconciliation is suspended for this object")
return ctrl.Result{}, nil
}
// Examine if a type change has happened and act accordingly
if obj.Spec.Type != helmv1.HelmRepositoryTypeOCI {
// Remove any stale condition and ignore the object if the type has
// changed.
obj.Status.Conditions = nil
return ctrl.Result{}, nil
}
result, retErr = r.reconcile(ctx, serialPatcher, obj)
return
}
// reconcile reconciles the HelmRepository object. While reconciling, when an
// error is encountered, it sets the failure details in the appropriate status
// condition type and returns the error with appropriate ctrl.Result. The object
// status conditions and the returned results are evaluated in the deferred
// block at the very end to summarize the conditions to be in a consistent
// state.
func (r *HelmRepositoryOCIReconciler) reconcile(ctx context.Context, sp *patch.SerialPatcher, obj *helmv1.HelmRepository) (result ctrl.Result, retErr error) {
ctxTimeout, cancel := context.WithTimeout(ctx, obj.Spec.Timeout.Duration)
defer cancel()
oldObj := obj.DeepCopy()
defer func() {
// If it's stalled, ensure reconciling is removed.
if sc := conditions.Get(obj, meta.StalledCondition); sc != nil && sc.Status == metav1.ConditionTrue {
conditions.Delete(obj, meta.ReconcilingCondition)
}
// Check if it's a successful reconciliation.
if result.RequeueAfter == obj.GetRequeueAfter() && !result.Requeue &&
retErr == nil {
// Remove reconciling condition if the reconciliation was successful.
conditions.Delete(obj, meta.ReconcilingCondition)
// If it's not ready even though it's not reconciling or stalled,
// set the ready failure message as the error.
// Based on isNonStalledSuccess() from internal/reconcile/summarize.
if ready := conditions.Get(obj, meta.ReadyCondition); ready != nil &&
ready.Status == metav1.ConditionFalse && !conditions.IsStalled(obj) {
retErr = errors.New(conditions.GetMessage(obj, meta.ReadyCondition))
}
}
// Presence of reconciling means that the reconciliation didn't succeed.
// Set the Reconciling reason to ProgressingWithRetry to indicate a
// failure retry.
if conditions.IsReconciling(obj) {
reconciling := conditions.Get(obj, meta.ReconcilingCondition)
reconciling.Reason = meta.ProgressingWithRetryReason
conditions.Set(obj, reconciling)
}
// If it's still a successful reconciliation and it's not reconciling or
// stalled, mark Ready=True.
if !conditions.IsReconciling(obj) && !conditions.IsStalled(obj) &&
retErr == nil && result.RequeueAfter == obj.GetRequeueAfter() {
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "Helm repository is ready")
}
// Emit events when object's state changes.
ready := conditions.Get(obj, meta.ReadyCondition)
// Became ready from not ready.
if !conditions.IsReady(oldObj) && conditions.IsReady(obj) {
r.eventLogf(ctx, obj, corev1.EventTypeNormal, ready.Reason, ready.Message)
}
// Became not ready from ready.
if conditions.IsReady(oldObj) && !conditions.IsReady(obj) {
r.eventLogf(ctx, obj, corev1.EventTypeWarning, ready.Reason, ready.Message)
}
// Apply jitter.
if result.RequeueAfter == obj.GetRequeueAfter() {
result.RequeueAfter = jitter.JitteredIntervalDuration(result.RequeueAfter)
}
}()
// Set reconciling condition.
rreconcile.ProgressiveStatus(false, obj, meta.ProgressingReason, "reconciliation in progress")
var reconcileAtVal string
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
reconcileAtVal = v
}
// Persist reconciling if generation differs or reconciliation is requested.
switch {
case obj.Generation != obj.Status.ObservedGeneration:
rreconcile.ProgressiveStatus(false, obj, meta.ProgressingReason,
"processing object: new generation %d -> %d", obj.Status.ObservedGeneration, obj.Generation)
if err := sp.Patch(ctx, obj, r.patchOptions...); err != nil {
result, retErr = ctrl.Result{}, err
return
}
case reconcileAtVal != obj.Status.GetLastHandledReconcileRequest():
if err := sp.Patch(ctx, obj, r.patchOptions...); err != nil {
result, retErr = ctrl.Result{}, err
return
}
}
// Ensure that it's an OCI URL before continuing.
if !helmreg.IsOCI(obj.Spec.URL) {
u, err := url.Parse(obj.Spec.URL)
if err != nil {
err = fmt.Errorf("failed to parse URL: %w", err)
} else {
err = fmt.Errorf("URL scheme '%s' in '%s' is not supported", u.Scheme, obj.Spec.URL)
}
conditions.MarkStalled(obj, sourcev1.URLInvalidReason, err.Error())
conditions.MarkFalse(obj, meta.ReadyCondition, sourcev1.URLInvalidReason, err.Error())
ctrl.LoggerFrom(ctx).Error(err, "reconciliation stalled")
result, retErr = ctrl.Result{}, nil
return
}
normalizedURL, err := repository.NormalizeURL(obj.Spec.URL)
if err != nil {
conditions.MarkStalled(obj, sourcev1.URLInvalidReason, err.Error())
conditions.MarkFalse(obj, meta.ReadyCondition, sourcev1.URLInvalidReason, err.Error())
result, retErr = ctrl.Result{}, nil
return
}
conditions.Delete(obj, meta.StalledCondition)
clientOpts, certsTmpDir, err := getter.GetClientOpts(ctxTimeout, r.Client, obj, normalizedURL)
if err != nil {
conditions.MarkFalse(obj, meta.ReadyCondition, sourcev1.AuthenticationFailedReason, err.Error())
result, retErr = ctrl.Result{}, err
return
}
if certsTmpDir != "" {
defer func() {
if err := os.RemoveAll(certsTmpDir); err != nil {
r.eventLogf(ctx, obj, corev1.EventTypeWarning, meta.FailedReason,
"failed to delete temporary certs directory: %s", err)
}
}()
}
// Create registry client and login if needed.
registryClient, file, err := r.RegistryClientGenerator(clientOpts.TlsConfig, clientOpts.MustLoginToRegistry())
if err != nil {
e := fmt.Errorf("failed to create registry client: %w", err)
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, e.Error())
result, retErr = ctrl.Result{}, e
return
}
if file != "" {
defer func() {
if err := os.Remove(file); err != nil {
r.eventLogf(ctx, obj, corev1.EventTypeWarning, meta.FailedReason,
"failed to delete temporary credentials file: %s", err)
}
}()
}
chartRepo, err := repository.NewOCIChartRepository(obj.Spec.URL, repository.WithOCIRegistryClient(registryClient))
if err != nil {
e := fmt.Errorf("failed to parse URL '%s': %w", obj.Spec.URL, err)
conditions.MarkStalled(obj, sourcev1.URLInvalidReason, e.Error())
conditions.MarkFalse(obj, meta.ReadyCondition, sourcev1.URLInvalidReason, e.Error())
result, retErr = ctrl.Result{}, nil
return
}
conditions.Delete(obj, meta.StalledCondition)
// Attempt to login to the registry if credentials are provided.
if clientOpts.MustLoginToRegistry() {
err = chartRepo.Login(clientOpts.RegLoginOpts...)
if err != nil {
e := fmt.Errorf("failed to login to registry '%s': %w", obj.Spec.URL, err)
conditions.MarkFalse(obj, meta.ReadyCondition, sourcev1.AuthenticationFailedReason, e.Error())
result, retErr = ctrl.Result{}, e
return
}
}
// Remove any stale Ready condition, most likely False, set above. Its value
// is derived from the overall result of the reconciliation in the deferred
// block at the very end.
conditions.Delete(obj, meta.ReadyCondition)
result, retErr = ctrl.Result{RequeueAfter: obj.GetRequeueAfter()}, nil
return
}
func (r *HelmRepositoryOCIReconciler) reconcileDelete(ctx context.Context, obj *helmv1.HelmRepository) (ctrl.Result, error) {
// Remove our finalizer from the list
controllerutil.RemoveFinalizer(obj, sourcev1.SourceFinalizer)
// Stop reconciliation as the object is being deleted
return ctrl.Result{}, nil
}
// eventLogf records events, and logs at the same time.
//
// This log is different from the debug log in the EventRecorder, in the sense
// that this is a simple log. While the debug log contains complete details
// about the event.
func (r *HelmRepositoryOCIReconciler) eventLogf(ctx context.Context, obj runtime.Object, eventType string, reason string, messageFmt string, args ...interface{}) {
msg := fmt.Sprintf(messageFmt, args...)
// Log and emit event.
if eventType == corev1.EventTypeWarning {
ctrl.LoggerFrom(ctx).Error(errors.New(reason), msg)
} else {
ctrl.LoggerFrom(ctx).Info(msg)
}
r.Eventf(obj, eventType, reason, msg)
}
func conditionsDiff(a, b []string) []string {
bMap := make(map[string]struct{}, len(b))
for _, j := range b {
bMap[j] = struct{}{}
}
r := []string{}
for _, i := range a {
if _, exists := bMap[i]; !exists {
r = append(r, i)
}
}
return r
}

View File

@ -1,478 +0,0 @@
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"encoding/base64"
"fmt"
"strconv"
"testing"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
conditionscheck "github.com/fluxcd/pkg/runtime/conditions/check"
"github.com/fluxcd/pkg/runtime/patch"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
helmv1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/helm/registry"
)
func TestHelmRepositoryOCIReconciler_deleteBeforeFinalizer(t *testing.T) {
g := NewWithT(t)
namespaceName := "helmrepo-" + randStringRunes(5)
namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: namespaceName},
}
g.Expect(k8sClient.Create(ctx, namespace)).ToNot(HaveOccurred())
t.Cleanup(func() {
g.Expect(k8sClient.Delete(ctx, namespace)).NotTo(HaveOccurred())
})
helmrepo := &helmv1.HelmRepository{}
helmrepo.Name = "test-helmrepo"
helmrepo.Namespace = namespaceName
helmrepo.Spec = helmv1.HelmRepositorySpec{
Interval: metav1.Duration{Duration: interval},
URL: "https://example.com",
Type: "oci",
}
// Add a test finalizer to prevent the object from getting deleted.
helmrepo.SetFinalizers([]string{"test-finalizer"})
g.Expect(k8sClient.Create(ctx, helmrepo)).NotTo(HaveOccurred())
// Add deletion timestamp by deleting the object.
g.Expect(k8sClient.Delete(ctx, helmrepo)).NotTo(HaveOccurred())
r := &HelmRepositoryOCIReconciler{
Client: k8sClient,
EventRecorder: record.NewFakeRecorder(32),
}
// NOTE: Only a real API server responds with an error in this scenario.
_, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: client.ObjectKeyFromObject(helmrepo)})
g.Expect(err).NotTo(HaveOccurred())
}
func TestHelmRepositoryOCIReconciler_Reconcile(t *testing.T) {
tests := []struct {
name string
secretType corev1.SecretType
secretData map[string][]byte
}{
{
name: "valid auth data",
secretData: map[string][]byte{
"username": []byte(testRegistryUsername),
"password": []byte(testRegistryPassword),
},
},
{
name: "no auth data",
secretData: nil,
},
{
name: "dockerconfigjson Secret",
secretType: corev1.SecretTypeDockerConfigJson,
secretData: map[string][]byte{
".dockerconfigjson": []byte(`{"auths":{"` +
testRegistryServer.registryHost + `":{"` +
`auth":"` + base64.StdEncoding.EncodeToString([]byte(testRegistryUsername+":"+testRegistryPassword)) + `"}}}`),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
ns, err := testEnv.CreateNamespace(ctx, "helmrepository-oci-reconcile-test")
g.Expect(err).ToNot(HaveOccurred())
defer func() { g.Expect(testEnv.Delete(ctx, ns)).To(Succeed()) }()
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "helmrepository-",
Namespace: ns.Name,
},
Data: tt.secretData,
}
if tt.secretType != "" {
secret.Type = tt.secretType
}
g.Expect(testEnv.CreateAndWait(ctx, secret)).To(Succeed())
origObj := &helmv1.HelmRepository{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "helmrepository-oci-reconcile-",
Namespace: ns.Name,
},
Spec: helmv1.HelmRepositorySpec{
Interval: metav1.Duration{Duration: interval},
URL: fmt.Sprintf("oci://%s", testRegistryServer.registryHost),
SecretRef: &meta.LocalObjectReference{
Name: secret.Name,
},
Provider: helmv1.GenericOCIProvider,
Type: helmv1.HelmRepositoryTypeOCI,
},
}
obj := origObj.DeepCopy()
g.Expect(testEnv.Create(ctx, obj)).To(Succeed())
key := client.ObjectKey{Name: obj.Name, Namespace: obj.Namespace}
// Wait for finalizer to be set
g.Eventually(func() bool {
if err := testEnv.Get(ctx, key, obj); err != nil {
return false
}
return len(obj.Finalizers) > 0
}, timeout).Should(BeTrue())
// Wait for HelmRepository to be Ready
waitForSourceReadyWithoutArtifact(ctx, g, obj)
// Check if the object status is valid.
condns := &conditionscheck.Conditions{NegativePolarity: helmRepositoryReadyCondition.NegativePolarity}
checker := conditionscheck.NewChecker(testEnv.Client, condns)
checker.WithT(g).CheckErr(ctx, obj)
// kstatus client conformance check.
u, err := patch.ToUnstructured(obj)
g.Expect(err).ToNot(HaveOccurred())
res, err := kstatus.Compute(u)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(res.Status).To(Equal(kstatus.CurrentStatus))
// Patch the object with reconcile request annotation.
patchHelper, err := patch.NewHelper(obj, testEnv.Client)
g.Expect(err).ToNot(HaveOccurred())
annotations := map[string]string{
meta.ReconcileRequestAnnotation: "now",
}
obj.SetAnnotations(annotations)
g.Expect(patchHelper.Patch(ctx, obj)).ToNot(HaveOccurred())
g.Eventually(func() bool {
if err := testEnv.Get(ctx, key, obj); err != nil {
return false
}
return obj.Status.LastHandledReconcileAt == "now"
}, timeout).Should(BeTrue())
g.Expect(testEnv.Delete(ctx, obj)).To(Succeed())
// Wait for HelmRepository to be deleted
waitForSourceDeletion(ctx, g, obj)
// Check if a suspended object gets deleted.
obj = origObj.DeepCopy()
testSuspendedObjectDeleteWithoutArtifact(ctx, g, obj)
})
}
}
func TestHelmRepositoryOCIReconciler_authStrategy(t *testing.T) {
type secretOptions struct {
username string
password string
}
tests := []struct {
name string
url string
registryOpts registryOptions
insecure bool
secretOpts secretOptions
secret *corev1.Secret
certsSecret *corev1.Secret
provider string
providerImg string
want ctrl.Result
wantErr bool
assertConditions []metav1.Condition
}{
{
name: "HTTP without basic auth",
want: ctrl.Result{RequeueAfter: interval},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Helm repository is ready"),
},
},
{
name: "HTTP with basic auth secret",
want: ctrl.Result{RequeueAfter: interval},
insecure: true,
registryOpts: registryOptions{
withBasicAuth: true,
},
secretOpts: secretOptions{
username: testRegistryUsername,
password: testRegistryPassword,
},
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "auth-secretref",
},
Type: corev1.SecretTypeDockerConfigJson,
Data: map[string][]byte{},
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Helm repository is ready"),
},
},
{
name: "HTTP registry - basic auth with invalid secret",
want: ctrl.Result{},
wantErr: true,
insecure: true,
registryOpts: registryOptions{
withBasicAuth: true,
},
secretOpts: secretOptions{
username: "wrong-pass",
password: "wrong-pass",
},
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "auth-secretref",
},
Type: corev1.SecretTypeDockerConfigJson,
Data: map[string][]byte{},
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingWithRetryReason, "processing object: new generation"),
*conditions.FalseCondition(meta.ReadyCondition, sourcev1.AuthenticationFailedReason, "failed to login to registry"),
},
},
{
name: "with contextual login provider",
wantErr: true,
insecure: true,
provider: "aws",
providerImg: "oci://123456789000.dkr.ecr.us-east-2.amazonaws.com/test",
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingWithRetryReason, "processing object: new generation"),
*conditions.FalseCondition(meta.ReadyCondition, sourcev1.AuthenticationFailedReason, "failed to get credential from"),
},
},
{
name: "with contextual login provider and secretRef",
want: ctrl.Result{RequeueAfter: interval},
registryOpts: registryOptions{
withBasicAuth: true,
},
insecure: true,
secretOpts: secretOptions{
username: testRegistryUsername,
password: testRegistryPassword,
},
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "auth-secretref",
},
Type: corev1.SecretTypeDockerConfigJson,
Data: map[string][]byte{},
},
provider: "azure",
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Helm repository is ready"),
},
},
{
name: "HTTPS With invalid CA cert",
wantErr: true,
registryOpts: registryOptions{
withTLS: true,
withClientCertAuth: true,
},
secretOpts: secretOptions{
username: testRegistryUsername,
password: testRegistryPassword,
},
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "auth-secretref",
},
Type: corev1.SecretTypeDockerConfigJson,
Data: map[string][]byte{},
},
certsSecret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "certs-secretref",
},
Data: map[string][]byte{
"ca.crt": []byte("invalid caFile"),
},
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingWithRetryReason, "processing object: new generation 0 -> 1"),
*conditions.FalseCondition(meta.ReadyCondition, sourcev1.AuthenticationFailedReason, "cannot append certificate into certificate pool: invalid CA certificate"),
},
},
{
name: "HTTPS With CA cert",
want: ctrl.Result{RequeueAfter: interval},
registryOpts: registryOptions{
withTLS: true,
withClientCertAuth: true,
},
secretOpts: secretOptions{
username: testRegistryUsername,
password: testRegistryPassword,
},
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "auth-secretref",
},
Type: corev1.SecretTypeDockerConfigJson,
Data: map[string][]byte{},
},
certsSecret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "certs-secretref",
},
Data: map[string][]byte{
"ca.crt": tlsCA,
"tls.crt": clientPublicKey,
"tls.key": clientPrivateKey,
},
},
assertConditions: []metav1.Condition{
*conditions.TrueCondition(meta.ReadyCondition, meta.SucceededReason, "Helm repository is ready"),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
clientBuilder := fakeclient.NewClientBuilder().
WithScheme(testEnv.GetScheme()).
WithStatusSubresource(&helmv1.HelmRepository{})
workspaceDir := t.TempDir()
if tt.insecure {
tt.registryOpts.disableDNSMocking = true
}
server, err := setupRegistryServer(ctx, workspaceDir, tt.registryOpts)
g.Expect(err).NotTo(HaveOccurred())
t.Cleanup(func() {
server.Close()
})
obj := &helmv1.HelmRepository{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "auth-strategy-",
Generation: 1,
},
Spec: helmv1.HelmRepositorySpec{
Interval: metav1.Duration{Duration: interval},
Timeout: &metav1.Duration{Duration: timeout},
Type: helmv1.HelmRepositoryTypeOCI,
Provider: helmv1.GenericOCIProvider,
URL: fmt.Sprintf("oci://%s", server.registryHost),
},
}
if tt.provider != "" {
obj.Spec.Provider = tt.provider
}
// If a provider specific image is provided, overwrite existing URL
// set earlier. It'll fail, but it's necessary to set them because
// the login check expects the URLs to be of certain pattern.
if tt.providerImg != "" {
obj.Spec.URL = tt.providerImg
}
if tt.secretOpts.username != "" && tt.secretOpts.password != "" {
tt.secret.Data[".dockerconfigjson"] = []byte(fmt.Sprintf(`{"auths": {%q: {"username": %q, "password": %q}}}`,
server.registryHost, tt.secretOpts.username, tt.secretOpts.password))
}
if tt.secret != nil {
clientBuilder.WithObjects(tt.secret)
obj.Spec.SecretRef = &meta.LocalObjectReference{
Name: tt.secret.Name,
}
}
if tt.certsSecret != nil {
clientBuilder.WithObjects(tt.certsSecret)
obj.Spec.CertSecretRef = &meta.LocalObjectReference{
Name: tt.certsSecret.Name,
}
}
r := &HelmRepositoryOCIReconciler{
Client: clientBuilder.Build(),
EventRecorder: record.NewFakeRecorder(32),
RegistryClientGenerator: registry.ClientGenerator,
patchOptions: getPatchOptions(helmRepositoryOCIOwnedConditions, "sc"),
}
g.Expect(r.Client.Create(ctx, obj)).ToNot(HaveOccurred())
defer func() {
g.Expect(r.Client.Delete(ctx, obj)).ToNot(HaveOccurred())
}()
sp := patch.NewSerialPatcher(obj, r.Client)
got, err := r.reconcile(ctx, sp, obj)
g.Expect(err != nil).To(Equal(tt.wantErr))
g.Expect(got).To(Equal(tt.want))
g.Expect(obj.Status.Conditions).To(conditions.MatchConditions(tt.assertConditions))
// In-progress status condition validity.
checker := conditionscheck.NewInProgressChecker(r.Client)
// NOTE: Check the object directly as reconcile() doesn't apply the
// final patch, the object has unapplied changes.
checker.DisableFetch = true
checker.WithT(g).CheckErr(ctx, obj)
})
}
}
func TestConditionsDiff(t *testing.T) {
tests := []struct {
a, b, want []string
}{
{[]string{"a", "b", "c"}, []string{"b", "d"}, []string{"a", "c"}},
{[]string{"a", "b", "c"}, []string{}, []string{"a", "b", "c"}},
{[]string{}, []string{"b", "d"}, []string{}},
{[]string{}, []string{}, []string{}},
{[]string{"a", "b"}, nil, []string{"a", "b"}},
{nil, []string{"a", "b"}, []string{}},
{nil, nil, []string{}},
}
for i, tt := range tests {
t.Run(strconv.Itoa(i), func(t *testing.T) {
g := NewWithT(t)
g.Expect(conditionsDiff(tt.a, tt.b)).To(Equal(tt.want))
})
}
}

View File

@ -40,6 +40,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
kstatus "github.com/fluxcd/cli-utils/pkg/kstatus/status"
"github.com/fluxcd/pkg/apis/meta"
@ -54,6 +55,7 @@ import (
intdigest "github.com/fluxcd/source-controller/internal/digest"
"github.com/fluxcd/source-controller/internal/helm/getter"
"github.com/fluxcd/source-controller/internal/helm/repository"
intpredicates "github.com/fluxcd/source-controller/internal/predicates"
sreconcile "github.com/fluxcd/source-controller/internal/reconcile"
"github.com/fluxcd/source-controller/internal/reconcile/summarize"
stls "github.com/fluxcd/source-controller/internal/tls"
@ -1522,50 +1524,22 @@ func TestHelmRepositoryReconciler_ReconcileTypeUpdatePredicateFilter(t *testing.
g.Expect(res.Status).To(Equal(kstatus.CurrentStatus))
// Switch to a OCI helm repository type
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "helmrepository-reconcile-",
Namespace: "default",
},
Data: map[string][]byte{
"username": []byte(testRegistryUsername),
"password": []byte(testRegistryPassword),
},
}
g.Expect(testEnv.CreateAndWait(ctx, secret)).To(Succeed())
obj.Spec.Type = helmv1.HelmRepositoryTypeOCI
obj.Spec.URL = fmt.Sprintf("oci://%s", testRegistryServer.registryHost)
obj.Spec.SecretRef = &meta.LocalObjectReference{
Name: secret.Name,
}
oldGen := obj.GetGeneration()
g.Expect(testEnv.Update(ctx, obj)).To(Succeed())
newGen := oldGen + 1
// Wait for HelmRepository to be Ready with new generation.
// Wait for HelmRepository to become static for new generation.
g.Eventually(func() bool {
if err := testEnv.Get(ctx, key, obj); err != nil {
return false
}
if !conditions.IsReady(obj) && obj.Status.Artifact != nil {
return false
}
readyCondition := conditions.Get(obj, meta.ReadyCondition)
if readyCondition == nil {
return false
}
return readyCondition.Status == metav1.ConditionTrue &&
newGen == readyCondition.ObservedGeneration &&
newGen == obj.Status.ObservedGeneration
return newGen == obj.Generation &&
!intpredicates.HelmRepositoryOCIRequireMigration(obj)
}, timeout).Should(BeTrue())
// Check if the object status is valid.
condns = &conditionscheck.Conditions{NegativePolarity: helmRepositoryOCINegativeConditions}
checker = conditionscheck.NewChecker(testEnv.Client, condns)
checker.WithT(g).CheckErr(ctx, obj)
g.Expect(testEnv.Delete(ctx, obj)).To(Succeed())
// Wait for HelmRepository to be deleted
@ -1730,3 +1704,90 @@ func TestHelmRepositoryReconciler_InMemoryCaching(t *testing.T) {
_, cacheHit := testCache.Get(helmRepo.GetArtifact().Path)
g.Expect(cacheHit).To(BeTrue())
}
func TestHelmRepositoryReconciler_ociMigration(t *testing.T) {
g := NewWithT(t)
testns, err := testEnv.CreateNamespace(ctx, "hr-oci-migration-test")
g.Expect(err).ToNot(HaveOccurred())
t.Cleanup(func() {
g.Expect(testEnv.Cleanup(ctx, testns)).ToNot(HaveOccurred())
})
hr := &helmv1.HelmRepository{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("hr-%s", randStringRunes(5)),
Namespace: testns.Name,
},
}
hrKey := client.ObjectKeyFromObject(hr)
// Migrates newly created object with finalizer.
hr.ObjectMeta.Finalizers = append(hr.ObjectMeta.Finalizers, "foo.bar", sourcev1.SourceFinalizer)
hr.Spec = helmv1.HelmRepositorySpec{
Type: helmv1.HelmRepositoryTypeOCI,
URL: "oci://foo/bar",
Interval: metav1.Duration{Duration: interval},
}
g.Expect(testEnv.Create(ctx, hr)).ToNot(HaveOccurred())
g.Eventually(func() bool {
_ = testEnv.Get(ctx, hrKey, hr)
return !intpredicates.HelmRepositoryOCIRequireMigration(hr)
}, timeout, time.Second).Should(BeTrue())
// Migrates updated object with finalizer.
patchHelper, err := patch.NewHelper(hr, testEnv.Client)
g.Expect(err).ToNot(HaveOccurred())
hr.ObjectMeta.Finalizers = append(hr.ObjectMeta.Finalizers, sourcev1.SourceFinalizer)
hr.Spec.URL = "oci://foo/baz"
g.Expect(patchHelper.Patch(ctx, hr)).ToNot(HaveOccurred())
g.Eventually(func() bool {
_ = testEnv.Get(ctx, hrKey, hr)
return !intpredicates.HelmRepositoryOCIRequireMigration(hr)
}, timeout, time.Second).Should(BeTrue())
// Migrates deleted object with finalizer.
patchHelper, err = patch.NewHelper(hr, testEnv.Client)
g.Expect(err).ToNot(HaveOccurred())
// Suspend the object to prevent finalizer from getting removed.
// Ensure only flux finalizer is set to allow the object to be garbage
// collected at the end.
// NOTE: Suspending and updating finalizers are done separately here as
// doing them in a single patch results in flaky test where the finalizer
// update doesn't gets registered with the kube-apiserver, resulting in
// timeout waiting for finalizer to appear on the object below.
hr.Spec.Suspend = true
g.Expect(patchHelper.Patch(ctx, hr)).ToNot(HaveOccurred())
g.Eventually(func() bool {
_ = k8sClient.Get(ctx, hrKey, hr)
return hr.Spec.Suspend == true
}, timeout).Should(BeTrue())
patchHelper, err = patch.NewHelper(hr, testEnv.Client)
g.Expect(err).ToNot(HaveOccurred())
// Add finalizer and verify that finalizer exists on the object using a live
// client.
hr.ObjectMeta.Finalizers = []string{sourcev1.SourceFinalizer}
g.Expect(patchHelper.Patch(ctx, hr)).ToNot(HaveOccurred())
g.Eventually(func() bool {
_ = k8sClient.Get(ctx, hrKey, hr)
return controllerutil.ContainsFinalizer(hr, sourcev1.SourceFinalizer)
}, timeout).Should(BeTrue())
// Delete the object and verify.
g.Expect(testEnv.Delete(ctx, hr)).ToNot(HaveOccurred())
g.Eventually(func() bool {
if err := testEnv.Get(ctx, hrKey, hr); err != nil {
return apierrors.IsNotFound(err)
}
return false
}, timeout).Should(BeTrue())
}

View File

@ -58,7 +58,6 @@ import (
sourcev1 "github.com/fluxcd/source-controller/api/v1"
sourcev1beta2 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/internal/cache"
"github.com/fluxcd/source-controller/internal/helm/registry"
// +kubebuilder:scaffold:imports
)
@ -372,17 +371,6 @@ func TestMain(m *testing.M) {
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
}
if err = (&HelmRepositoryOCIReconciler{
Client: testEnv,
EventRecorder: record.NewFakeRecorder(32),
Metrics: testMetricsH,
RegistryClientGenerator: registry.ClientGenerator,
}).SetupWithManagerAndOptions(testEnv, HelmRepositoryReconcilerOptions{
RateLimiter: controller.GetDefaultRateLimiter(),
}); err != nil {
panic(fmt.Sprintf("Failed to start HelmRepositoryOCIReconciler: %v", err))
}
if err := (&HelmChartReconciler{
Client: testEnv,
EventRecorder: record.NewFakeRecorder(32),

View File

@ -18,69 +18,69 @@ package predicates
import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
)
// helmRepositoryTypeFilter filters events for a given HelmRepository type.
// It returns true if the event is for a HelmRepository of the given type.
func helmRepositoryTypeFilter(repositoryType string, o client.Object) bool {
// HelmRepositoryOCIMigrationPredicate implements predicate functions to allow
// events for HelmRepository OCI that need migration to static object. Non-OCI
// HelmRepositories are always allowed.
type HelmRepositoryOCIMigrationPredicate struct {
predicate.Funcs
}
// Create allows events for objects that need migration to static object.
func (HelmRepositoryOCIMigrationPredicate) Create(e event.CreateEvent) bool {
return HelmRepositoryOCIRequireMigration(e.Object)
}
// Update allows events for objects that need migration to static object.
func (HelmRepositoryOCIMigrationPredicate) Update(e event.UpdateEvent) bool {
return HelmRepositoryOCIRequireMigration(e.ObjectNew)
}
// Delete allows events for objects that need migration to static object.
func (HelmRepositoryOCIMigrationPredicate) Delete(e event.DeleteEvent) bool {
return HelmRepositoryOCIRequireMigration(e.Object)
}
// HelmRepositoryOCIRequireMigration returns if a given HelmRepository of type
// OCI requires migration to static object. For non-OCI HelmRepository, it
// returns true.
func HelmRepositoryOCIRequireMigration(o client.Object) bool {
if o == nil {
return false
}
// return true if the object is a HelmRepository
// and the type is the same as the one we are looking for.
hr, ok := o.(*sourcev1.HelmRepository)
if !ok {
return false
}
return hr.Spec.Type == repositoryType
}
// HelmRepositoryTypePredicate is a predicate that filters events for a given HelmRepository type.
type HelmRepositoryTypePredicate struct {
RepositoryType string
predicate.Funcs
}
// Create returns true if the Create event is for a HelmRepository of the given type.
func (h HelmRepositoryTypePredicate) Create(e event.CreateEvent) bool {
return helmRepositoryTypeFilter(h.RepositoryType, e.Object)
}
// Update returns true if the Update event is for a HelmRepository of the given type.
func (h HelmRepositoryTypePredicate) Update(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return false
if hr.Spec.Type != sourcev1.HelmRepositoryTypeOCI {
// Always allow non-OCI HelmRepository.
return true
}
// check if the old object is a HelmRepository
oldObj, ok := e.ObjectOld.(*sourcev1.HelmRepository)
if !ok {
return false
if controllerutil.ContainsFinalizer(hr, sourcev1.SourceFinalizer) || !hasEmptyHelmRepositoryStatus(hr) {
return true
}
// check if the new object is a HelmRepository
newObj, ok := e.ObjectNew.(*sourcev1.HelmRepository)
if !ok {
return false
return false
}
// hasEmptyHelmRepositoryStatus checks if the status of a HelmRepository is
// empty.
func hasEmptyHelmRepositoryStatus(obj *sourcev1.HelmRepository) bool {
if obj.Status.ObservedGeneration == 0 &&
obj.Status.Conditions == nil &&
obj.Status.URL == "" &&
obj.Status.Artifact == nil &&
obj.Status.ReconcileRequestStatus.LastHandledReconcileAt == "" {
return true
}
isOfRepositoryType := newObj.Spec.Type == h.RepositoryType
wasOfRepositoryType := oldObj.Spec.Type == h.RepositoryType && !isOfRepositoryType
return isOfRepositoryType || wasOfRepositoryType
}
// Delete returns true if the Delete event is for a HelmRepository of the given type.
func (h HelmRepositoryTypePredicate) Delete(e event.DeleteEvent) bool {
return helmRepositoryTypeFilter(h.RepositoryType, e.Object)
}
// Generic returns true if the Generic event is for a HelmRepository of the given type.
func (h HelmRepositoryTypePredicate) Generic(e event.GenericEvent) bool {
return helmRepositoryTypeFilter(h.RepositoryType, e.Object)
return false
}

View File

@ -19,109 +19,241 @@ package predicates
import (
"testing"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/controller-runtime/pkg/client"
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/event"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
v1 "github.com/fluxcd/source-controller/api/v1"
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
)
func TestHelmRepositoryTypePredicate_Create(t *testing.T) {
obj := &sourcev1.HelmRepository{Spec: sourcev1.HelmRepositorySpec{}}
http := &sourcev1.HelmRepository{Spec: sourcev1.HelmRepositorySpec{Type: "default"}}
oci := &sourcev1.HelmRepository{Spec: sourcev1.HelmRepositorySpec{Type: "oci"}}
not := &unstructured.Unstructured{}
func TestHelmRepositoryOCIMigrationPredicate_Create(t *testing.T) {
tests := []struct {
name string
obj client.Object
want bool
name string
beforeFunc func(o *sourcev1.HelmRepository)
want bool
}{
{name: "new", obj: obj, want: false},
{name: "http", obj: http, want: true},
{name: "oci", obj: oci, want: false},
{name: "not a HelmRepository", obj: not, want: false},
{name: "nil", obj: nil, want: false},
{
name: "new oci helm repo no status",
beforeFunc: func(o *sourcev1.HelmRepository) {
o.Spec.Type = sourcev1.HelmRepositoryTypeOCI
},
want: false,
},
{
name: "new oci helm repo with default observed gen status",
beforeFunc: func(o *sourcev1.HelmRepository) {
o.Spec.Type = sourcev1.HelmRepositoryTypeOCI
o.Status.ObservedGeneration = -1
},
want: true,
},
{
name: "old oci helm repo with finalizer only",
beforeFunc: func(o *sourcev1.HelmRepository) {
o.Finalizers = []string{sourcev1.SourceFinalizer}
o.Spec.Type = sourcev1.HelmRepositoryTypeOCI
},
want: true,
},
{
name: "old oci helm repo with status only",
beforeFunc: func(o *sourcev1.HelmRepository) {
o.Spec.Type = sourcev1.HelmRepositoryTypeOCI
o.Status = sourcev1.HelmRepositoryStatus{
ObservedGeneration: 3,
}
conditions.MarkTrue(o, meta.ReadyCondition, "foo", "bar")
},
want: true,
},
{
name: "old oci helm repo with finalizer and status",
beforeFunc: func(o *sourcev1.HelmRepository) {
o.Finalizers = []string{sourcev1.SourceFinalizer}
o.Spec.Type = sourcev1.HelmRepositoryTypeOCI
o.Status = sourcev1.HelmRepositoryStatus{
ObservedGeneration: 3,
}
conditions.MarkTrue(o, meta.ReadyCondition, "foo", "bar")
},
want: true,
},
{
name: "new default helm repo",
beforeFunc: func(o *sourcev1.HelmRepository) {
o.Spec.Type = sourcev1.HelmRepositoryTypeDefault
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := gomega.NewWithT(t)
g := NewWithT(t)
so := HelmRepositoryTypePredicate{RepositoryType: "default"}
e := event.CreateEvent{
Object: tt.obj,
o := &sourcev1.HelmRepository{}
if tt.beforeFunc != nil {
tt.beforeFunc(o)
}
g.Expect(so.Create(e)).To(gomega.Equal(tt.want))
e := event.CreateEvent{Object: o}
p := HelmRepositoryOCIMigrationPredicate{}
g.Expect(p.Create(e)).To(Equal(tt.want))
})
}
}
func TestHelmRepositoryTypePredicate_Update(t *testing.T) {
repoA := &sourcev1.HelmRepository{Spec: sourcev1.HelmRepositorySpec{
Type: sourcev1.HelmRepositoryTypeDefault,
}}
repoB := &sourcev1.HelmRepository{Spec: sourcev1.HelmRepositorySpec{
Type: sourcev1.HelmRepositoryTypeOCI,
}}
empty := &sourcev1.HelmRepository{}
not := &unstructured.Unstructured{}
func TestHelmRepositoryOCIMigrationPredicate_Update(t *testing.T) {
tests := []struct {
name string
old client.Object
new client.Object
want bool
name string
beforeFunc func(oldObj, newObj *sourcev1.HelmRepository)
want bool
}{
{name: "diff type", old: repoA, new: repoB, want: true},
{name: "new with type", old: empty, new: repoA, want: true},
{name: "old with type", old: repoA, new: empty, want: true},
{name: "old not a HelmRepository", old: not, new: repoA, want: false},
{name: "new not a HelmRepository", old: repoA, new: not, want: false},
{name: "old nil", old: nil, new: repoA, want: false},
{name: "new nil", old: repoA, new: nil, want: false},
{
name: "update oci repo",
beforeFunc: func(oldObj, newObj *sourcev1.HelmRepository) {
oldObj.Spec = sourcev1.HelmRepositorySpec{
Type: sourcev1.HelmRepositoryTypeOCI,
URL: "oci://foo/bar",
}
*newObj = *oldObj.DeepCopy()
newObj.Spec.URL = "oci://foo/baz"
},
want: false,
},
{
name: "migrate old oci repo with status only",
beforeFunc: func(oldObj, newObj *sourcev1.HelmRepository) {
oldObj.Generation = 2
oldObj.Spec = sourcev1.HelmRepositorySpec{
Type: sourcev1.HelmRepositoryTypeOCI,
}
oldObj.Status = sourcev1.HelmRepositoryStatus{
ObservedGeneration: 2,
}
conditions.MarkTrue(oldObj, meta.ReadyCondition, "foo", "bar")
*newObj = *oldObj.DeepCopy()
newObj.Generation = 3
},
want: true,
},
{
name: "migrate old oci repo with finalizer only",
beforeFunc: func(oldObj, newObj *sourcev1.HelmRepository) {
oldObj.Generation = 2
oldObj.Finalizers = []string{sourcev1.SourceFinalizer}
oldObj.Spec = sourcev1.HelmRepositorySpec{
Type: sourcev1.HelmRepositoryTypeOCI,
}
*newObj = *oldObj.DeepCopy()
newObj.Generation = 3
},
want: true,
},
{
name: "type switch default to oci",
beforeFunc: func(oldObj, newObj *sourcev1.HelmRepository) {
oldObj.Spec = sourcev1.HelmRepositorySpec{
Type: sourcev1.HelmRepositoryTypeDefault,
}
oldObj.Status = sourcev1.HelmRepositoryStatus{
Artifact: &v1.Artifact{},
URL: "http://some-address",
ObservedGeneration: 3,
}
*newObj = *oldObj.DeepCopy()
newObj.Spec = sourcev1.HelmRepositorySpec{
Type: sourcev1.HelmRepositoryTypeOCI,
}
},
want: true,
},
{
name: "type switch oci to default",
beforeFunc: func(oldObj, newObj *sourcev1.HelmRepository) {
oldObj.Spec = sourcev1.HelmRepositorySpec{
Type: sourcev1.HelmRepositoryTypeOCI,
}
*newObj = *oldObj.DeepCopy()
newObj.Spec.Type = sourcev1.HelmRepositoryTypeDefault
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := gomega.NewWithT(t)
g := NewWithT(t)
so := HelmRepositoryTypePredicate{RepositoryType: "default"}
oldObj := &sourcev1.HelmRepository{}
newObj := oldObj.DeepCopy()
if tt.beforeFunc != nil {
tt.beforeFunc(oldObj, newObj)
}
e := event.UpdateEvent{
ObjectOld: tt.old,
ObjectNew: tt.new,
ObjectOld: oldObj,
ObjectNew: newObj,
}
g.Expect(so.Update(e)).To(gomega.Equal(tt.want))
p := HelmRepositoryOCIMigrationPredicate{}
g.Expect(p.Update(e)).To(Equal(tt.want))
})
}
}
func TestHelmRepositoryTypePredicate_Delete(t *testing.T) {
obj := &sourcev1.HelmRepository{Spec: sourcev1.HelmRepositorySpec{}}
http := &sourcev1.HelmRepository{Spec: sourcev1.HelmRepositorySpec{Type: "default"}}
oci := &sourcev1.HelmRepository{Spec: sourcev1.HelmRepositorySpec{Type: "oci"}}
not := &unstructured.Unstructured{}
func TestHelmRepositoryOCIMigrationPredicate_Delete(t *testing.T) {
tests := []struct {
name string
obj client.Object
want bool
name string
beforeFunc func(obj *sourcev1.HelmRepository)
want bool
}{
{name: "new", obj: obj, want: false},
{name: "http", obj: http, want: true},
{name: "oci", obj: oci, want: false},
{name: "not a HelmRepository", obj: not, want: false},
{name: "nil", obj: nil, want: false},
{
name: "oci with finalizer",
beforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Finalizers = []string{sourcev1.SourceFinalizer}
obj.Spec.Type = sourcev1.HelmRepositoryTypeOCI
},
want: true,
},
{
name: "oci with status",
beforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Spec.Type = sourcev1.HelmRepositoryTypeOCI
obj.Status.ObservedGeneration = 4
},
want: true,
},
{
name: "oci without finalizer or status",
beforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Spec.Type = sourcev1.HelmRepositoryTypeOCI
},
want: false,
},
{
name: "default helm repo",
beforeFunc: func(obj *sourcev1.HelmRepository) {
obj.Spec.Type = sourcev1.HelmRepositoryTypeDefault
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := gomega.NewWithT(t)
g := NewWithT(t)
so := HelmRepositoryTypePredicate{RepositoryType: "default"}
e := event.DeleteEvent{
Object: tt.obj,
obj := &sourcev1.HelmRepository{}
if tt.beforeFunc != nil {
tt.beforeFunc(obj)
}
g.Expect(so.Delete(e)).To(gomega.Equal(tt.want))
e := event.DeleteEvent{Object: obj}
p := HelmRepositoryOCIMigrationPredicate{}
g.Expect(p.Delete(e)).To(Equal(tt.want))
})
}
}

13
main.go
View File

@ -203,19 +203,6 @@ func main() {
os.Exit(1)
}
if err := (&controller.HelmRepositoryOCIReconciler{
Client: mgr.GetClient(),
EventRecorder: eventRecorder,
Metrics: metrics,
ControllerName: controllerName,
RegistryClientGenerator: registry.ClientGenerator,
}).SetupWithManagerAndOptions(mgr, controller.HelmRepositoryReconcilerOptions{
RateLimiter: helper.GetRateLimiter(rateLimiterOptions),
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", v1beta2.HelmRepositoryKind, "type", "OCI")
os.Exit(1)
}
if err := (&controller.HelmRepositoryReconciler{
Client: mgr.GetClient(),
EventRecorder: eventRecorder,