feat: restrict the cache's watches with label selector
This commit is contained in:
parent
485f7588fc
commit
727ae19e80
|
@ -1,7 +1,15 @@
|
||||||
package run
|
package run
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/dapr-sandbox/dapr-kubernetes-operator/pkg/resources"
|
||||||
|
"github.com/pkg/errors"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
admregv1 "k8s.io/api/admissionregistration/v1"
|
||||||
|
appsv1 "k8s.io/api/apps/v1"
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
rbacv1 "k8s.io/api/rbac/v1"
|
||||||
|
rtcache "sigs.k8s.io/controller-runtime/pkg/cache"
|
||||||
|
rtclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
"sigs.k8s.io/controller-runtime/pkg/manager"
|
"sigs.k8s.io/controller-runtime/pkg/manager"
|
||||||
|
|
||||||
"github.com/dapr-sandbox/dapr-kubernetes-operator/pkg/controller"
|
"github.com/dapr-sandbox/dapr-kubernetes-operator/pkg/controller"
|
||||||
|
@ -38,13 +46,34 @@ func NewRunCmd() *cobra.Command {
|
||||||
Use: "run",
|
Use: "run",
|
||||||
Short: "run",
|
Short: "run",
|
||||||
RunE: func(cmd *cobra.Command, args []string) error {
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
|
selector, err := daprCtl.ReleaseSelector()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "unable to compute cache's watch selector")
|
||||||
|
}
|
||||||
|
|
||||||
|
controllerOpts.WatchSelectors = map[rtclient.Object]rtcache.ByObject{
|
||||||
|
// k8s
|
||||||
|
&rbacv1.ClusterRole{}: {Label: selector},
|
||||||
|
&rbacv1.ClusterRoleBinding{}: {Label: selector},
|
||||||
|
&rbacv1.Role{}: {Label: selector},
|
||||||
|
&rbacv1.RoleBinding{}: {Label: selector},
|
||||||
|
&admregv1.MutatingWebhookConfiguration{}: {Label: selector},
|
||||||
|
&corev1.Secret{}: {Label: selector},
|
||||||
|
&corev1.Service{}: {Label: selector},
|
||||||
|
&corev1.ServiceAccount{}: {Label: selector},
|
||||||
|
&appsv1.StatefulSet{}: {Label: selector},
|
||||||
|
&appsv1.Deployment{}: {Label: selector},
|
||||||
|
// dapr
|
||||||
|
resources.UnstructuredFor("dapr.io", "v1alpha1", "Configuration"): {Label: selector},
|
||||||
|
}
|
||||||
|
|
||||||
return controller.Start(controllerOpts, func(manager manager.Manager, opts controller.Options) error {
|
return controller.Start(controllerOpts, func(manager manager.Manager, opts controller.Options) error {
|
||||||
_, err := daprCtl.NewReconciler(cmd.Context(), manager, helmOpts)
|
_, err := daprCtl.NewReconciler(cmd.Context(), manager, helmOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.Wrap(err, "unable to set-up DaprControlPlane reconciler")
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return nil
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,6 +107,9 @@ func (a *ApplyAction) Run(ctx context.Context, rc *ReconciliationRequest) error
|
||||||
r := gvk.GroupVersion().String() + ":" + gvk.Kind
|
r := gvk.GroupVersion().String() + ":" + gvk.Kind
|
||||||
|
|
||||||
if _, ok := a.subscriptions[r]; !ok {
|
if _, ok := a.subscriptions[r]; !ok {
|
||||||
|
|
||||||
|
a.l.Info("watch", "ref", r)
|
||||||
|
|
||||||
err = rc.Reconciler.Watch(
|
err = rc.Reconciler.Watch(
|
||||||
&obj,
|
&obj,
|
||||||
rc.Reconciler.EnqueueRequestForOwner(&daprApi.DaprControlPlane{}, handler.OnlyControllerOwner()),
|
rc.Reconciler.EnqueueRequestForOwner(&daprApi.DaprControlPlane{}, handler.OnlyControllerOwner()),
|
||||||
|
@ -135,6 +138,9 @@ func (a *ApplyAction) Run(ctx context.Context, rc *ReconciliationRequest) error
|
||||||
r := gvk.GroupVersion().String() + ":" + gvk.Kind
|
r := gvk.GroupVersion().String() + ":" + gvk.Kind
|
||||||
|
|
||||||
if _, ok := a.subscriptions[r]; !ok {
|
if _, ok := a.subscriptions[r]; !ok {
|
||||||
|
|
||||||
|
a.l.Info("watch", "ref", r)
|
||||||
|
|
||||||
err = rc.Reconciler.Watch(
|
err = rc.Reconciler.Watch(
|
||||||
&obj,
|
&obj,
|
||||||
rc.Reconciler.EnqueueRequestsFromMapFunc(labelsToRequest),
|
rc.Reconciler.EnqueueRequestsFromMapFunc(labelsToRequest),
|
||||||
|
@ -196,6 +202,7 @@ func (a *ApplyAction) Run(ctx context.Context, rc *ReconciliationRequest) error
|
||||||
|
|
||||||
a.l.Info("run",
|
a.l.Info("run",
|
||||||
"apply", "true",
|
"apply", "true",
|
||||||
|
"gen", rc.Resource.Generation,
|
||||||
"ref", resources.Ref(&obj))
|
"ref", resources.Ref(&obj))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -88,3 +88,21 @@ func dependantWithLabels(watchUpdate bool, watchDelete bool) predicate.Predicate
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ReleaseSelector() (labels.Selector, error) {
|
||||||
|
hasReleaseNameLabel, err := labels.NewRequirement(DaprReleaseName, selection.Exists, []string{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
hasReleaseNamespaceLabel, err := labels.NewRequirement(DaprReleaseNamespace, selection.Exists, []string{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
selector := labels.NewSelector().
|
||||||
|
Add(*hasReleaseNameLabel).
|
||||||
|
Add(*hasReleaseNamespaceLabel)
|
||||||
|
|
||||||
|
return selector, nil
|
||||||
|
}
|
||||||
|
|
|
@ -5,6 +5,8 @@ import (
|
||||||
"net/http/pprof"
|
"net/http/pprof"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/dapr-sandbox/dapr-kubernetes-operator/pkg/logger"
|
"github.com/dapr-sandbox/dapr-kubernetes-operator/pkg/logger"
|
||||||
|
@ -48,6 +50,9 @@ func Start(options Options, setup func(manager.Manager, Options) error) error {
|
||||||
Metrics: metricsserver.Options{
|
Metrics: metricsserver.Options{
|
||||||
BindAddress: options.MetricsAddr,
|
BindAddress: options.MetricsAddr,
|
||||||
},
|
},
|
||||||
|
Cache: cache.Options{
|
||||||
|
ByObject: options.WatchSelectors,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package predicates
|
package predicates
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"reflect"
|
"reflect"
|
||||||
|
|
||||||
"github.com/wI2L/jsondiff"
|
"github.com/wI2L/jsondiff"
|
||||||
|
@ -93,18 +92,13 @@ func (p DependentPredicate) Update(e event.UpdateEvent) bool {
|
||||||
log.Error(err, "failed to generate diff")
|
log.Error(err, "failed to generate diff")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
d, err := json.Marshal(patch)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err, "failed to generate diff")
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("Reconciling due to dependent resource update",
|
log.Info("Reconciling due to dependent resource update",
|
||||||
"name", newObj.GetName(),
|
"name", newObj.GetName(),
|
||||||
"namespace", newObj.GetNamespace(),
|
"namespace", newObj.GetNamespace(),
|
||||||
"apiVersion", newObj.GroupVersionKind().GroupVersion(),
|
"apiVersion", newObj.GroupVersionKind().GroupVersion(),
|
||||||
"kind", newObj.GroupVersionKind().Kind,
|
"kind", newObj.GroupVersionKind().Kind,
|
||||||
"diff", string(d))
|
"diff", patch.String())
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,10 @@
|
||||||
package controller
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
rtcache "sigs.k8s.io/controller-runtime/pkg/cache"
|
||||||
|
rtclient "sigs.k8s.io/controller-runtime/pkg/client"
|
||||||
|
)
|
||||||
|
|
||||||
type ClusterType string
|
type ClusterType string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -15,4 +20,5 @@ type Options struct {
|
||||||
LeaderElectionNamespace string
|
LeaderElectionNamespace string
|
||||||
EnableLeaderElection bool
|
EnableLeaderElection bool
|
||||||
ReleaseLeaderElectionOnCancel bool
|
ReleaseLeaderElectionOnCancel bool
|
||||||
|
WatchSelectors map[rtclient.Object]rtcache.ByObject
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,8 @@ package resources
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
|
||||||
"github.com/dapr-sandbox/dapr-kubernetes-operator/pkg/pointer"
|
"github.com/dapr-sandbox/dapr-kubernetes-operator/pkg/pointer"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
|
@ -67,3 +69,14 @@ func Ref(obj *unstructured.Unstructured) string {
|
||||||
name,
|
name,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func UnstructuredFor(group string, version string, kind string) *unstructured.Unstructured {
|
||||||
|
u := unstructured.Unstructured{}
|
||||||
|
u.SetGroupVersionKind(schema.GroupVersionKind{
|
||||||
|
Kind: kind,
|
||||||
|
Group: group,
|
||||||
|
Version: version,
|
||||||
|
})
|
||||||
|
|
||||||
|
return &u
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue