Move dedicated watcher to in-controller watcher
This prevents the resources from getting annotated, and instead uses the `handler.EnqueueRequestsFromMapFunc` to queue requests based on changes to the source objects. Signed-off-by: Hidde Beydals <hello@hidde.co>
This commit is contained in:
parent
2e246ce4be
commit
b3baf39e11
|
@ -25,6 +25,16 @@ rules:
|
|||
- patch
|
||||
- update
|
||||
- watch
|
||||
- apiGroups:
|
||||
- helm.toolkit.fluxcd.io
|
||||
resources:
|
||||
- helmreleases/finalizers
|
||||
verbs:
|
||||
- create
|
||||
- delete
|
||||
- get
|
||||
- patch
|
||||
- update
|
||||
- apiGroups:
|
||||
- helm.toolkit.fluxcd.io
|
||||
resources:
|
||||
|
|
|
@ -1,122 +0,0 @@
|
|||
/*
|
||||
Copyright 2020 The Flux authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/util/retry"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
sourcev1 "github.com/fluxcd/source-controller/api/v1beta1"
|
||||
|
||||
v2 "github.com/fluxcd/helm-controller/api/v2beta1"
|
||||
)
|
||||
|
||||
// HelmChartWatcher watches HelmChart objects for revision changes and
|
||||
// triggers a sync for all the HelmReleases that reference a changed source.
|
||||
type HelmChartWatcher struct {
|
||||
client.Client
|
||||
Log logr.Logger
|
||||
Scheme *runtime.Scheme
|
||||
}
|
||||
|
||||
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts,verbs=get;list;watch
|
||||
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts/status,verbs=get
|
||||
|
||||
func (r *HelmChartWatcher) Reconcile(req ctrl.Request) (ctrl.Result, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var chart sourcev1.HelmChart
|
||||
if err := r.Get(ctx, req.NamespacedName, &chart); err != nil {
|
||||
return ctrl.Result{}, client.IgnoreNotFound(err)
|
||||
}
|
||||
|
||||
log := r.Log.WithValues(strings.ToLower(chart.Kind), req.NamespacedName)
|
||||
log.Info("new artifact detected", "revision", chart.GetArtifact().Revision)
|
||||
|
||||
// Get the list of HelmReleases that are using this HelmChart.
|
||||
var list v2.HelmReleaseList
|
||||
if err := r.List(ctx, &list,
|
||||
client.MatchingFields{v2.SourceIndexKey: fmt.Sprintf("%s/%s", req.Namespace, req.Name)}); err != nil {
|
||||
log.Error(err, "unable to list HelmReleases")
|
||||
return ctrl.Result{}, err
|
||||
}
|
||||
|
||||
// Trigger reconciliation for each HelmRelease using this HelmChart.
|
||||
for _, hr := range list.Items {
|
||||
namespacedName := types.NamespacedName{Namespace: hr.Namespace, Name: hr.Name}
|
||||
if err := r.requestReconciliation(ctx, hr); err != nil {
|
||||
log.Error(err, "unable to annotate HelmRelease", strings.ToLower(hr.Kind), namespacedName)
|
||||
continue
|
||||
}
|
||||
log.Info("requested immediate reconciliation", strings.ToLower(hr.Kind), namespacedName)
|
||||
}
|
||||
|
||||
return ctrl.Result{}, nil
|
||||
}
|
||||
|
||||
func (r *HelmChartWatcher) SetupWithManager(mgr ctrl.Manager) error {
|
||||
// Create a HelmRelease index based on the HelmChart name
|
||||
err := mgr.GetFieldIndexer().IndexField(context.TODO(), &v2.HelmRelease{}, v2.SourceIndexKey,
|
||||
func(rawObj runtime.Object) []string {
|
||||
hr := rawObj.(*v2.HelmRelease)
|
||||
return []string{fmt.Sprintf("%s/%s", hr.Spec.Chart.GetNamespace(hr.Namespace), hr.GetHelmChartName())}
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&sourcev1.HelmChart{}).
|
||||
WithEventFilter(HelmChartRevisionChangePredicate{}).
|
||||
Complete(r)
|
||||
}
|
||||
|
||||
// requestReconciliation annotates the given HelmRelease to be reconciled immediately.
|
||||
func (r *HelmChartWatcher) requestReconciliation(ctx context.Context, hr v2.HelmRelease) error {
|
||||
firstTry := true
|
||||
return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
|
||||
if !firstTry {
|
||||
if err := r.Get(context.TODO(),
|
||||
types.NamespacedName{Namespace: hr.Namespace, Name: hr.Name},
|
||||
&hr,
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
firstTry = false
|
||||
if hr.Annotations == nil {
|
||||
hr.Annotations = make(map[string]string)
|
||||
}
|
||||
hr.Annotations[meta.ReconcileRequestAnnotation] = metav1.Now().String()
|
||||
err = r.Update(ctx, &hr)
|
||||
return
|
||||
})
|
||||
}
|
|
@ -42,8 +42,12 @@ import (
|
|||
kuberecorder "k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/tools/reference"
|
||||
ctrl "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/builder"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
"sigs.k8s.io/controller-runtime/pkg/source"
|
||||
|
||||
"github.com/fluxcd/pkg/apis/meta"
|
||||
"github.com/fluxcd/pkg/runtime/events"
|
||||
|
@ -57,6 +61,13 @@ import (
|
|||
"github.com/fluxcd/helm-controller/internal/util"
|
||||
)
|
||||
|
||||
// +kubebuilder:rbac:groups=helm.toolkit.fluxcd.io,resources=helmreleases,verbs=get;list;watch;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups=helm.toolkit.fluxcd.io,resources=helmreleases/status,verbs=get;update;patch
|
||||
// +kubebuilder:rbac:groups=helm.toolkit.fluxcd.io,resources=helmreleases/finalizers,verbs=get;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts,verbs=get;list;watch
|
||||
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts/status,verbs=get
|
||||
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
|
||||
|
||||
// HelmReleaseReconciler reconciles a HelmRelease object
|
||||
type HelmReleaseReconciler struct {
|
||||
client.Client
|
||||
|
@ -69,6 +80,29 @@ type HelmReleaseReconciler struct {
|
|||
MetricsRecorder *metrics.Recorder
|
||||
}
|
||||
|
||||
func (r *HelmReleaseReconciler) SetupWithManager(mgr ctrl.Manager, opts HelmReleaseReconcilerOptions) error {
|
||||
// Index the HelmRelease by the HelmChart references they point at
|
||||
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &v2.HelmRelease{}, v2.SourceIndexKey,
|
||||
func(rawObj runtime.Object) []string {
|
||||
hr := rawObj.(*v2.HelmRelease)
|
||||
return []string{fmt.Sprintf("%s/%s", hr.Spec.Chart.GetNamespace(hr.Namespace), hr.GetHelmChartName())}
|
||||
},
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.requeueDependency = opts.DependencyRequeueInterval
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&v2.HelmRelease{}, builder.WithPredicates(predicates.ChangePredicate{})).
|
||||
Watches(
|
||||
&source.Kind{Type: &sourcev1.HelmChart{}},
|
||||
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.helmReleasesForHelmChart)},
|
||||
builder.WithPredicates(HelmChartRevisionChangePredicate{}),
|
||||
).
|
||||
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
|
||||
Complete(r)
|
||||
}
|
||||
|
||||
// ConditionError represents an error with a status condition reason attached.
|
||||
type ConditionError struct {
|
||||
Reason string
|
||||
|
@ -79,10 +113,6 @@ func (c ConditionError) Error() string {
|
|||
return c.Err.Error()
|
||||
}
|
||||
|
||||
// +kubebuilder:rbac:groups=helm.toolkit.fluxcd.io,resources=helmreleases,verbs=get;list;watch;create;update;patch;delete
|
||||
// +kubebuilder:rbac:groups=helm.toolkit.fluxcd.io,resources=helmreleases/status,verbs=get;update;patch
|
||||
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
|
||||
|
||||
func (r *HelmReleaseReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
|
||||
ctx := context.Background()
|
||||
start := time.Now()
|
||||
|
@ -243,15 +273,6 @@ type HelmReleaseReconcilerOptions struct {
|
|||
DependencyRequeueInterval time.Duration
|
||||
}
|
||||
|
||||
func (r *HelmReleaseReconciler) SetupWithManager(mgr ctrl.Manager, opts HelmReleaseReconcilerOptions) error {
|
||||
r.requeueDependency = opts.DependencyRequeueInterval
|
||||
return ctrl.NewControllerManagedBy(mgr).
|
||||
For(&v2.HelmRelease{}).
|
||||
WithEventFilter(predicates.ChangePredicate{}).
|
||||
WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}).
|
||||
Complete(r)
|
||||
}
|
||||
|
||||
func (r *HelmReleaseReconciler) reconcileChart(ctx context.Context, hr *v2.HelmRelease) (*sourcev1.HelmChart, error) {
|
||||
chartName := types.NamespacedName{
|
||||
Namespace: hr.Spec.Chart.GetNamespace(hr.Namespace),
|
||||
|
@ -681,6 +702,25 @@ func (r *HelmReleaseReconciler) handleHelmActionResult(hr *v2.HelmRelease, revis
|
|||
}
|
||||
}
|
||||
|
||||
func (r *HelmReleaseReconciler) helmReleasesForHelmChart(obj handler.MapObject) []reconcile.Request {
|
||||
ctx := context.Background()
|
||||
var list v2.HelmReleaseList
|
||||
if err := r.List(ctx, &list, client.MatchingFields{
|
||||
v2.SourceIndexKey: fmt.Sprintf("%s/%s", obj.Meta.GetNamespace(), obj.Meta.GetName()),
|
||||
}); err != nil {
|
||||
r.Log.Error(err, "failed to list HelmReleases for HelmChart")
|
||||
return nil
|
||||
}
|
||||
reqs := make([]reconcile.Request, len(list.Items), len(list.Items))
|
||||
for i := range list.Items {
|
||||
reqs[i].NamespacedName.Name = list.Items[i].Name
|
||||
reqs[i].NamespacedName.Namespace = list.Items[i].Namespace
|
||||
|
||||
r.Log.Info("requesting reconciliation", v2.HelmReleaseKind, reqs[i].NamespacedName)
|
||||
}
|
||||
return reqs
|
||||
}
|
||||
|
||||
// event emits a Kubernetes event and forwards the event to notification controller if configured.
|
||||
func (r *HelmReleaseReconciler) event(hr v2.HelmRelease, revision, severity, msg string) {
|
||||
r.EventRecorder.Event(&hr, "Normal", severity, msg)
|
||||
|
|
8
main.go
8
main.go
|
@ -112,14 +112,6 @@ func main() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
if err = (&controllers.HelmChartWatcher{
|
||||
Client: mgr.GetClient(),
|
||||
Log: ctrl.Log.WithName("controllers").WithName("HelmChart"),
|
||||
Scheme: mgr.GetScheme(),
|
||||
}).SetupWithManager(mgr); err != nil {
|
||||
setupLog.Error(err, "unable to create controller", "controller", "HelmChart")
|
||||
os.Exit(1)
|
||||
}
|
||||
if err = (&controllers.HelmReleaseReconciler{
|
||||
Client: mgr.GetClient(),
|
||||
Config: mgr.GetConfig(),
|
||||
|
|
Loading…
Reference in New Issue