From 0c904a1879e023d7a8d782fb5be2cf904e7c490d Mon Sep 17 00:00:00 2001 From: Hidde Beydals Date: Thu, 26 Nov 2020 12:51:56 +0100 Subject: [PATCH] Watch chart sources for revision changes To enqueue a new reconciliation for the HelmChart sources as soon as the revision of their upstream source changes. Signed-off-by: Hidde Beydals --- api/v1beta1/source.go | 6 ++ controllers/helmchart_controller.go | 115 +++++++++++++++++++++++++++- controllers/source_predicate.go | 63 +++++++++++++++ 3 files changed, 182 insertions(+), 2 deletions(-) create mode 100644 controllers/source_predicate.go diff --git a/api/v1beta1/source.go b/api/v1beta1/source.go index b5afd25e..5cb8ae43 100644 --- a/api/v1beta1/source.go +++ b/api/v1beta1/source.go @@ -4,6 +4,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + // SourceIndexKey is the key used for indexing resources based on + // their sources. + SourceIndexKey string = ".metadata.source" +) + // Source interface must be supported by all API types. // +k8s:deepcopy-gen=false type Source interface { diff --git a/controllers/helmchart_controller.go b/controllers/helmchart_controller.go index 2188e009..ff7875af 100644 --- a/controllers/helmchart_controller.go +++ b/controllers/helmchart_controller.go @@ -40,9 +40,13 @@ import ( kuberecorder "k8s.io/client-go/tools/record" "k8s.io/client-go/tools/reference" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" "github.com/fluxcd/pkg/runtime/events" "github.com/fluxcd/pkg/runtime/metrics" @@ -208,10 +212,28 @@ func (r *HelmChartReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts r.indexHelmRepositoryByURL); err != nil { return fmt.Errorf("failed setting index fields: %w", err) } + if err := mgr.GetCache().IndexField(context.TODO(), &sourcev1.HelmChart{}, sourcev1.SourceIndexKey, + r.indexHelmChartBySource); err != nil { + return fmt.Errorf("failed setting index fields: %w", err) + } return ctrl.NewControllerManagedBy(mgr). - For(&sourcev1.HelmChart{}). - WithEventFilter(predicates.ChangePredicate{}). + For(&sourcev1.HelmChart{}, builder.WithPredicates(predicates.ChangePredicate{})). + Watches( + &source.Kind{Type: &sourcev1.HelmRepository{}}, + &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.requestsForHelmRepositoryChange)}, + builder.WithPredicates(SourceRevisionChangePredicate{}), + ). + Watches( + &source.Kind{Type: &sourcev1.GitRepository{}}, + &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.requestsForGitRepositoryChange)}, + builder.WithPredicates(SourceRevisionChangePredicate{}), + ). + Watches( + &source.Kind{Type: &sourcev1.Bucket{}}, + &handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.requestsForBucketChange)}, + builder.WithPredicates(SourceRevisionChangePredicate{}), + ). WithOptions(controller.Options{MaxConcurrentReconciles: opts.MaxConcurrentReconciles}). Complete(r) } @@ -726,6 +748,14 @@ func (r *HelmChartReconciler) indexHelmRepositoryByURL(o runtime.Object) []strin return nil } +func (r *HelmChartReconciler) indexHelmChartBySource(o runtime.Object) []string { + hc, ok := o.(*sourcev1.HelmChart) + if !ok { + panic(fmt.Sprintf("Expected a HelmChart, got %T", o)) + } + return []string{fmt.Sprintf("%s/%s", hc.Spec.SourceRef.Kind, hc.Spec.SourceRef.Name)} +} + func (r *HelmChartReconciler) resolveDependencyRepository(ctx context.Context, dep *helmchart.Dependency, namespace string) (*sourcev1.HelmRepository, error) { url := helm.NormalizeChartRepositoryURL(dep.Repository) if url == "" { @@ -766,3 +796,84 @@ func (r *HelmChartReconciler) getHelmRepositorySecret(ctx context.Context, repos return nil, nil } + +func (r *HelmChartReconciler) requestsForHelmRepositoryChange(obj handler.MapObject) []reconcile.Request { + repo, ok := obj.Object.(*sourcev1.HelmRepository) + if !ok { + panic(fmt.Sprintf("Expected a HelmRepository, got %T", repo)) + } + // If we do not have an artifact, we have no requests to make + if repo.GetArtifact() == nil { + return nil + } + + ctx := context.Background() + var list sourcev1.HelmRepositoryList + if err := r.List(ctx, &list, client.MatchingFields{ + sourcev1.SourceIndexKey: fmt.Sprintf("%s/%s", repo.Kind, repo.Name), + }); err != nil { + r.Log.Error(err, "failed to list HelmCharts for HelmRepository") + return nil + } + + var reqs []reconcile.Request + for _, i := range list.Items { + req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: i.GetNamespace(), Name: i.GetName()}} + reqs = append(reqs, req) + } + return reqs +} + +func (r *HelmChartReconciler) requestsForGitRepositoryChange(obj handler.MapObject) []reconcile.Request { + repo, ok := obj.Object.(*sourcev1.GitRepository) + if !ok { + panic(fmt.Sprintf("Expected a GitRepository, got %T", repo)) + } + // If we do not have an artifact, we have no requests to make + if repo.GetArtifact() == nil { + return nil + } + + ctx := context.Background() + var list sourcev1.HelmChartList + if err := r.List(ctx, &list, client.MatchingFields{ + sourcev1.SourceIndexKey: fmt.Sprintf("%s/%s", repo.Kind, repo.Name), + }); err != nil { + r.Log.Error(err, "failed to list HelmCharts for GitRepository") + return nil + } + + var reqs []reconcile.Request + for _, i := range list.Items { + req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: i.GetNamespace(), Name: i.GetName()}} + reqs = append(reqs, req) + } + return reqs +} + +func (r *HelmChartReconciler) requestsForBucketChange(obj handler.MapObject) []reconcile.Request { + bucket, ok := obj.Object.(*sourcev1.Bucket) + if !ok { + panic(fmt.Sprintf("Expected a Bucket, got %T", bucket)) + } + // If we do not have an artifact, we have no requests to make + if bucket.GetArtifact() == nil { + return nil + } + + ctx := context.Background() + var list sourcev1.HelmChartList + if err := r.List(ctx, &list, client.MatchingFields{ + sourcev1.SourceIndexKey: fmt.Sprintf("%s/%s", bucket.Kind, bucket.Name), + }); err != nil { + r.Log.Error(err, "failed to list HelmCharts for Bucket") + return nil + } + + var reqs []reconcile.Request + for _, i := range list.Items { + req := reconcile.Request{NamespacedName: types.NamespacedName{Namespace: i.GetNamespace(), Name: i.GetName()}} + reqs = append(reqs, req) + } + return reqs +} diff --git a/controllers/source_predicate.go b/controllers/source_predicate.go new file mode 100644 index 00000000..67dda841 --- /dev/null +++ b/controllers/source_predicate.go @@ -0,0 +1,63 @@ +/* +Copyright 2020 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + sourcev1 "github.com/fluxcd/source-controller/api/v1beta1" +) + +type SourceRevisionChangePredicate struct { + predicate.Funcs +} + +func (SourceRevisionChangePredicate) Update(e event.UpdateEvent) bool { + if e.MetaOld == nil || e.MetaNew == nil { + return false + } + + oldSource, ok := e.ObjectOld.(sourcev1.Source) + if !ok { + return false + } + + newSource, ok := e.ObjectNew.(sourcev1.Source) + if !ok { + return false + } + + if oldSource.GetArtifact() == nil && newSource.GetArtifact() != nil { + return true + } + + if oldSource.GetArtifact() != nil && newSource.GetArtifact() != nil && + oldSource.GetArtifact().Revision != newSource.GetArtifact().Revision { + return true + } + + return false +} + +func (SourceRevisionChangePredicate) Create(e event.CreateEvent) bool { + return false +} + +func (SourceRevisionChangePredicate) Delete(e event.DeleteEvent) bool { + return false +}