fetch control plane workload from cache instead of api server
Signed-off-by: dddddai <dddwq@foxmail.com>
This commit is contained in:
parent
2b58b98956
commit
65071d1e64
|
@ -120,10 +120,11 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
|
|||
skippedPropagatingNamespaces[ns] = struct{}{}
|
||||
}
|
||||
|
||||
controlPlaneInformerManager := informermanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan)
|
||||
resourceDetector := &detector.ResourceDetector{
|
||||
DiscoveryClientSet: discoverClientSet,
|
||||
Client: mgr.GetClient(),
|
||||
InformerManager: informermanager.NewSingleClusterInformerManager(dynamicClientSet, 0, stopChan),
|
||||
InformerManager: controlPlaneInformerManager,
|
||||
RESTMapper: mgr.GetRESTMapper(),
|
||||
DynamicClient: dynamicClientSet,
|
||||
SkippedResourceConfig: skippedResourceConfig,
|
||||
|
@ -183,10 +184,11 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
|
|||
}
|
||||
|
||||
hpaController := &hpa.HorizontalPodAutoscalerController{
|
||||
Client: mgr.GetClient(),
|
||||
DynamicClient: dynamicClientSet,
|
||||
EventRecorder: mgr.GetEventRecorderFor(hpa.ControllerName),
|
||||
RESTMapper: mgr.GetRESTMapper(),
|
||||
Client: mgr.GetClient(),
|
||||
DynamicClient: dynamicClientSet,
|
||||
EventRecorder: mgr.GetEventRecorderFor(hpa.ControllerName),
|
||||
RESTMapper: mgr.GetRESTMapper(),
|
||||
InformerManager: controlPlaneInformerManager,
|
||||
}
|
||||
if err := hpaController.SetupWithManager(mgr); err != nil {
|
||||
klog.Fatalf("Failed to setup hpa controller: %v", err)
|
||||
|
@ -205,6 +207,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
|
|||
EventRecorder: mgr.GetEventRecorderFor(binding.ControllerName),
|
||||
RESTMapper: mgr.GetRESTMapper(),
|
||||
OverrideManager: overrideManager,
|
||||
InformerManager: controlPlaneInformerManager,
|
||||
}
|
||||
if err := bindingController.SetupWithManager(mgr); err != nil {
|
||||
klog.Fatalf("Failed to setup binding controller: %v", err)
|
||||
|
@ -216,6 +219,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
|
|||
EventRecorder: mgr.GetEventRecorderFor(binding.ClusterResourceBindingControllerName),
|
||||
RESTMapper: mgr.GetRESTMapper(),
|
||||
OverrideManager: overrideManager,
|
||||
InformerManager: controlPlaneInformerManager,
|
||||
}
|
||||
if err := clusterResourceBindingController.SetupWithManager(mgr); err != nil {
|
||||
klog.Fatalf("Failed to setup cluster resource binding controller: %v", err)
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
"github.com/karmada-io/karmada/pkg/util"
|
||||
"github.com/karmada-io/karmada/pkg/util/helper"
|
||||
"github.com/karmada-io/karmada/pkg/util/informermanager"
|
||||
"github.com/karmada-io/karmada/pkg/util/overridemanager"
|
||||
)
|
||||
|
||||
|
@ -33,8 +34,9 @@ const ControllerName = "binding-controller"
|
|||
|
||||
// ResourceBindingController is to sync ResourceBinding.
|
||||
type ResourceBindingController struct {
|
||||
client.Client // used to operate ResourceBinding resources.
|
||||
DynamicClient dynamic.Interface // used to fetch arbitrary resources.
|
||||
client.Client // used to operate ClusterResourceBinding resources.
|
||||
DynamicClient dynamic.Interface // used to fetch arbitrary resources from api server.
|
||||
InformerManager informermanager.SingleClusterInformerManager // used to fetch arbitrary resources from cache.
|
||||
EventRecorder record.EventRecorder
|
||||
RESTMapper meta.RESTMapper
|
||||
OverrideManager overridemanager.OverrideManager
|
||||
|
@ -109,7 +111,7 @@ func (c *ResourceBindingController) syncBinding(binding *workv1alpha2.ResourceBi
|
|||
return controllerruntime.Result{Requeue: true}, err
|
||||
}
|
||||
|
||||
workload, err := helper.FetchWorkload(c.DynamicClient, c.RESTMapper, binding.Spec.Resource)
|
||||
workload, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to fetch workload for resourceBinding(%s/%s). Error: %v.",
|
||||
binding.GetNamespace(), binding.GetName(), err)
|
||||
|
@ -190,7 +192,7 @@ func (c *ResourceBindingController) newOverridePolicyFunc() handler.MapFunc {
|
|||
|
||||
var requests []reconcile.Request
|
||||
for _, binding := range bindingList.Items {
|
||||
workload, err := helper.FetchWorkload(c.DynamicClient, c.RESTMapper, binding.Spec.Resource)
|
||||
workload, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to fetch workload for resourceBinding(%s/%s). Error: %v.", binding.Namespace, binding.Name, err)
|
||||
return nil
|
||||
|
@ -219,7 +221,7 @@ func (c *ResourceBindingController) newReplicaSchedulingPolicyFunc() handler.Map
|
|||
|
||||
var requests []reconcile.Request
|
||||
for _, binding := range bindingList.Items {
|
||||
workload, err := helper.FetchWorkload(c.DynamicClient, c.RESTMapper, binding.Spec.Resource)
|
||||
workload, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to fetch workload for resourceBinding(%s/%s). Error: %v.", binding.Namespace, binding.Name, err)
|
||||
return nil
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
"github.com/karmada-io/karmada/pkg/util"
|
||||
"github.com/karmada-io/karmada/pkg/util/helper"
|
||||
"github.com/karmada-io/karmada/pkg/util/informermanager"
|
||||
"github.com/karmada-io/karmada/pkg/util/overridemanager"
|
||||
)
|
||||
|
||||
|
@ -33,8 +34,9 @@ const ClusterResourceBindingControllerName = "cluster-resource-binding-controlle
|
|||
|
||||
// ClusterResourceBindingController is to sync ClusterResourceBinding.
|
||||
type ClusterResourceBindingController struct {
|
||||
client.Client // used to operate ClusterResourceBinding resources.
|
||||
DynamicClient dynamic.Interface // used to fetch arbitrary resources.
|
||||
client.Client // used to operate ClusterResourceBinding resources.
|
||||
DynamicClient dynamic.Interface // used to fetch arbitrary resources from api server.
|
||||
InformerManager informermanager.SingleClusterInformerManager // used to fetch arbitrary resources from cache.
|
||||
EventRecorder record.EventRecorder
|
||||
RESTMapper meta.RESTMapper
|
||||
OverrideManager overridemanager.OverrideManager
|
||||
|
@ -106,7 +108,7 @@ func (c *ClusterResourceBindingController) syncBinding(binding *workv1alpha2.Clu
|
|||
return controllerruntime.Result{Requeue: true}, err
|
||||
}
|
||||
|
||||
workload, err := helper.FetchWorkload(c.DynamicClient, c.RESTMapper, binding.Spec.Resource)
|
||||
workload, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to fetch workload for clusterResourceBinding(%s). Error: %v.", binding.GetName(), err)
|
||||
return controllerruntime.Result{Requeue: true}, err
|
||||
|
@ -182,7 +184,7 @@ func (c *ClusterResourceBindingController) newOverridePolicyFunc() handler.MapFu
|
|||
|
||||
var requests []reconcile.Request
|
||||
for _, binding := range bindingList.Items {
|
||||
workload, err := helper.FetchWorkload(c.DynamicClient, c.RESTMapper, binding.Spec.Resource)
|
||||
workload, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to fetch workload for clusterResourceBinding(%s). Error: %v.", binding.Name, err)
|
||||
return nil
|
||||
|
@ -211,7 +213,7 @@ func (c *ClusterResourceBindingController) newReplicaSchedulingPolicyFunc() hand
|
|||
|
||||
var requests []reconcile.Request
|
||||
for _, binding := range bindingList.Items {
|
||||
workload, err := helper.FetchWorkload(c.DynamicClient, c.RESTMapper, binding.Spec.Resource)
|
||||
workload, err := helper.FetchWorkload(c.DynamicClient, c.InformerManager, c.RESTMapper, binding.Spec.Resource)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to fetch workload for clusterResourceBinding(%s). Error: %v.", binding.Name, err)
|
||||
return nil
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
"github.com/karmada-io/karmada/pkg/util"
|
||||
"github.com/karmada-io/karmada/pkg/util/helper"
|
||||
"github.com/karmada-io/karmada/pkg/util/informermanager"
|
||||
"github.com/karmada-io/karmada/pkg/util/names"
|
||||
"github.com/karmada-io/karmada/pkg/util/restmapper"
|
||||
)
|
||||
|
@ -30,10 +31,11 @@ const ControllerName = "hpa-controller"
|
|||
|
||||
// HorizontalPodAutoscalerController is to sync HorizontalPodAutoscaler.
|
||||
type HorizontalPodAutoscalerController struct {
|
||||
client.Client // used to operate HorizontalPodAutoscaler resources.
|
||||
DynamicClient dynamic.Interface // used to fetch arbitrary resources.
|
||||
EventRecorder record.EventRecorder
|
||||
RESTMapper meta.RESTMapper
|
||||
client.Client // used to operate HorizontalPodAutoscaler resources.
|
||||
DynamicClient dynamic.Interface // used to fetch arbitrary resources from api server.
|
||||
InformerManager informermanager.SingleClusterInformerManager // used to fetch arbitrary resources from cache.
|
||||
EventRecorder record.EventRecorder
|
||||
RESTMapper meta.RESTMapper
|
||||
}
|
||||
|
||||
// Reconcile performs a full reconciliation for the object referred to by the Request.
|
||||
|
@ -121,10 +123,25 @@ func (c *HorizontalPodAutoscalerController) getTargetPlacement(objRef autoscalin
|
|||
}
|
||||
|
||||
// Kind in CrossVersionObjectReference is not equal to the kind in bindingName, need to get obj from cache.
|
||||
unstructuredWorkLoad, err := c.DynamicClient.Resource(dynamicResource).Namespace(namespace).Get(context.TODO(), objRef.Name, metav1.GetOptions{})
|
||||
workload, err := c.InformerManager.Lister(dynamicResource).ByNamespace(namespace).Get(objRef.Name)
|
||||
if err != nil {
|
||||
// fall back to call api server in case the cache has not been synchronized yet
|
||||
klog.Warningf("Failed to get workload from cache, kind: %s, namespace: %s, name: %s. Error: %v. Fall back to call api server",
|
||||
objRef.Kind, namespace, objRef.Name, err)
|
||||
workload, err = c.DynamicClient.Resource(dynamicResource).Namespace(namespace).Get(context.TODO(),
|
||||
objRef.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get workload from api server, kind: %s, namespace: %s, name: %s. Error: %v",
|
||||
objRef.Kind, namespace, objRef.Name, err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
uncastObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(workload)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to transform object(%s/%s): %v", namespace, objRef.Name, err)
|
||||
return nil, err
|
||||
}
|
||||
unstructuredWorkLoad := unstructured.Unstructured{Object: uncastObj}
|
||||
bindingName := names.GenerateBindingName(unstructuredWorkLoad.GetKind(), unstructuredWorkLoad.GetName())
|
||||
binding := &workv1alpha2.ResourceBinding{}
|
||||
namespacedName := types.NamespacedName{
|
||||
|
|
|
@ -1067,7 +1067,7 @@ func (d *ResourceDetector) OnClusterResourceBindingUpdate(oldObj, newObj interfa
|
|||
|
||||
// CleanupLabels removes labels from object referencing by objRef.
|
||||
func (d *ResourceDetector) CleanupLabels(objRef workv1alpha2.ObjectReference, labels ...string) error {
|
||||
workload, err := helper.FetchWorkload(d.DynamicClient, d.RESTMapper, objRef)
|
||||
workload, err := helper.FetchWorkload(d.DynamicClient, d.InformerManager, d.RESTMapper, objRef)
|
||||
if err != nil {
|
||||
// do nothing if resource template not exist, it might has been removed.
|
||||
if apierrors.IsNotFound(err) {
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
|
@ -19,6 +20,7 @@ import (
|
|||
|
||||
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
|
||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
"github.com/karmada-io/karmada/pkg/util/informermanager"
|
||||
"github.com/karmada-io/karmada/pkg/util/names"
|
||||
"github.com/karmada-io/karmada/pkg/util/restmapper"
|
||||
)
|
||||
|
@ -129,7 +131,8 @@ func RemoveOrphanWorks(c client.Client, works []workv1alpha1.Work) error {
|
|||
}
|
||||
|
||||
// FetchWorkload fetches the kubernetes resource to be propagated.
|
||||
func FetchWorkload(dynamicClient dynamic.Interface, restMapper meta.RESTMapper, resource workv1alpha2.ObjectReference) (*unstructured.Unstructured, error) {
|
||||
func FetchWorkload(dynamicClient dynamic.Interface, informerManager informermanager.SingleClusterInformerManager,
|
||||
restMapper meta.RESTMapper, resource workv1alpha2.ObjectReference) (*unstructured.Unstructured, error) {
|
||||
dynamicResource, err := restmapper.GetGroupVersionResource(restMapper,
|
||||
schema.FromAPIVersionAndKind(resource.APIVersion, resource.Kind))
|
||||
if err != nil {
|
||||
|
@ -138,15 +141,34 @@ func FetchWorkload(dynamicClient dynamic.Interface, restMapper meta.RESTMapper,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
workload, err := dynamicClient.Resource(dynamicResource).Namespace(resource.Namespace).Get(context.TODO(),
|
||||
resource.Name, metav1.GetOptions{})
|
||||
var workload runtime.Object
|
||||
|
||||
if len(resource.Namespace) == 0 {
|
||||
// cluster-scoped resource
|
||||
workload, err = informerManager.Lister(dynamicResource).Get(resource.Name)
|
||||
} else {
|
||||
workload, err = informerManager.Lister(dynamicResource).ByNamespace(resource.Namespace).Get(resource.Name)
|
||||
}
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get workload, kind: %s, namespace: %s, name: %s. Error: %v",
|
||||
// fall back to call api server in case the cache has not been synchronized yet
|
||||
klog.Warningf("Failed to get workload from cache, kind: %s, namespace: %s, name: %s. Error: %v. Fall back to call api server",
|
||||
resource.Kind, resource.Namespace, resource.Name, err)
|
||||
workload, err = dynamicClient.Resource(dynamicResource).Namespace(resource.Namespace).Get(context.TODO(),
|
||||
resource.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to get workload from api server, kind: %s, namespace: %s, name: %s. Error: %v",
|
||||
resource.Kind, resource.Namespace, resource.Name, err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
unstructuredWorkLoad, err := runtime.DefaultUnstructuredConverter.ToUnstructured(workload)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to transform object(%s/%s): %v", resource.Namespace, resource.Name, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return workload, nil
|
||||
return &unstructured.Unstructured{Object: unstructuredWorkLoad}, nil
|
||||
}
|
||||
|
||||
// GetClusterResourceBindings returns a ClusterResourceBindingList by labels.
|
||||
|
|
Loading…
Reference in New Issue