diff --git a/pkg/controllers/status/cluster_status_controller.go b/pkg/controllers/status/cluster_status_controller.go index 0bc801788..09676b24b 100644 --- a/pkg/controllers/status/cluster_status_controller.go +++ b/pkg/controllers/status/cluster_status_controller.go @@ -570,7 +570,13 @@ func getAllocatedResource(podList []*corev1.Pod) corev1.ResourceList { } func getNodeAvailable(allocatable corev1.ResourceList, podResources *util.Resource) corev1.ResourceList { + if podResources == nil { + return allocatable + } allocatedResourceList := podResources.ResourceList() + if allocatedResourceList == nil { + return allocatable + } allowedPodNumber := allocatable.Pods().Value() - allocatedResourceList.Pods().Value() // When too many pods have been created, scheduling will fail so that the allocating pods number may be huge. // If allowedPodNumber is less than or equal to 0, we don't allow more pods to be created. diff --git a/pkg/util/fedinformer/transform.go b/pkg/util/fedinformer/transform.go new file mode 100644 index 000000000..5d9cbcc19 --- /dev/null +++ b/pkg/util/fedinformer/transform.go @@ -0,0 +1,73 @@ +package fedinformer + +import ( + "fmt" + "reflect" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// StripUnusedFields is the transform function for shared informers, +// it removes unused fields from objects before they are stored in the cache to save memory. +func StripUnusedFields(obj interface{}) (interface{}, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + // shouldn't happen + return obj, nil + } + // ManagedFields is large and we never use it + accessor.SetManagedFields(nil) + return obj, nil +} + +// NodeTransformFunc is the dedicated transform function for Node objects. +// It cleans up some parts of the object before it will be put into the controller +// cache to reduce memory usage. +// +// Note: this function removes most of the fields, please make sure your controller +// doesn't care for the removed fields, especially when use in shared informers. +func NodeTransformFunc(obj interface{}) (interface{}, error) { + node, ok := obj.(*corev1.Node) + if !ok { + return obj, fmt.Errorf("expect resource Node but got %v", reflect.TypeOf(obj)) + } + + aggregatedNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: node.Name, + }, + Status: corev1.NodeStatus{ + Allocatable: node.Status.Allocatable, + Conditions: node.Status.Conditions, + }, + } + return aggregatedNode, nil +} + +// PodTransformFunc is the dedicated transform function for Pod objects. +// It cleans up some parts of the object before it will be put into the controller +// cache to reduce memory usage. +// +// Note: this function removes most of the fields, please make sure your controller +// doesn't care for the removed fields, especially when use in shared informers. +func PodTransformFunc(obj interface{}) (interface{}, error) { + pod, ok := obj.(*corev1.Pod) + if !ok { + return obj, fmt.Errorf("expect resource Pod but got %v", reflect.TypeOf(obj)) + } + + aggregatedPod := &corev1.Pod{ + Spec: corev1.PodSpec{ + NodeName: pod.Spec.NodeName, + InitContainers: pod.Spec.InitContainers, + Containers: pod.Spec.Containers, + Overhead: pod.Spec.Overhead, + }, + Status: corev1.PodStatus{ + Phase: pod.Status.Phase, + }, + } + return aggregatedPod, nil +} diff --git a/pkg/util/fedinformer/transform_test.go b/pkg/util/fedinformer/transform_test.go new file mode 100644 index 000000000..4d86f16ff --- /dev/null +++ b/pkg/util/fedinformer/transform_test.go @@ -0,0 +1,236 @@ +package fedinformer + +import ( + "reflect" + "testing" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" +) + +func TestStripUnusedFields(t *testing.T) { + tests := []struct { + name string + obj interface{} + want interface{} + }{ + { + name: "transform pods", + obj: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo", + Name: "bar", + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"c": "d"}, + ManagedFields: []metav1.ManagedFieldsEntry{ + { + Manager: "whatever", + }, + }, + }, + }, + want: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo", + Name: "bar", + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"c": "d"}, + }, + }, + }, + { + name: "transform works", + obj: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo", + Name: "bar", + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"c": "d"}, + ManagedFields: []metav1.ManagedFieldsEntry{ + { + Manager: "whatever", + }, + }, + }, + }, + want: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo", + Name: "bar", + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"c": "d"}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := StripUnusedFields(tt.obj) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("StripUnusedFields: got %v, want %v", got, tt.want) + } + }) + } +} + +func TestNodeTransformFunc(t *testing.T) { + tests := []struct { + name string + obj interface{} + want interface{} + }{ + { + name: "transform nodes without status", + obj: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"c": "d"}, + ManagedFields: []metav1.ManagedFieldsEntry{ + { + Manager: "whatever", + }, + }, + }, + }, + want: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + }, + }, + { + name: "transform nodes with status", + obj: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewMilliQuantity(1, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(1, resource.BinarySI), + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + corev1.ResourceEphemeralStorage: *resource.NewQuantity(1, resource.BinarySI), + }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + { + Type: corev1.NodeMemoryPressure, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + want: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewMilliQuantity(1, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(1, resource.BinarySI), + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + corev1.ResourceEphemeralStorage: *resource.NewQuantity(1, resource.BinarySI), + }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + { + Type: corev1.NodeMemoryPressure, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := NodeTransformFunc(tt.obj) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NodeTransformFunc: got %v, want %v", got, tt.want) + } + }) + } +} + +func TestPodTransformFunc(t *testing.T) { + tests := []struct { + name string + obj interface{} + want interface{} + }{ + { + name: "transform pods without needed", + obj: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo", + Name: "bar", + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"c": "d"}, + ManagedFields: []metav1.ManagedFieldsEntry{ + { + Manager: "whatever", + }, + }, + }, + }, + want: &corev1.Pod{}, + }, + { + name: "transform pods with needed", + obj: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "foo", + Name: "bar", + }, + Spec: corev1.PodSpec{ + NodeName: "test", + InitContainers: []corev1.Container{{Name: "test"}}, + Containers: []corev1.Container{{Name: "test"}}, + Overhead: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewMilliQuantity(1, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(1, resource.BinarySI), + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + corev1.ResourceEphemeralStorage: *resource.NewQuantity(1, resource.BinarySI), + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + want: &corev1.Pod{ + Spec: corev1.PodSpec{ + NodeName: "test", + InitContainers: []corev1.Container{{Name: "test"}}, + Containers: []corev1.Container{{Name: "test"}}, + Overhead: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewMilliQuantity(1, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(1, resource.BinarySI), + corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI), + corev1.ResourceEphemeralStorage: *resource.NewQuantity(1, resource.BinarySI), + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := PodTransformFunc(tt.obj) + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("PodTransformFunc: got %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/util/fedinformer/typedmanager/multi-cluster-manager.go b/pkg/util/fedinformer/typedmanager/multi-cluster-manager.go index 132745525..86763a211 100644 --- a/pkg/util/fedinformer/typedmanager/multi-cluster-manager.go +++ b/pkg/util/fedinformer/typedmanager/multi-cluster-manager.go @@ -6,6 +6,9 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + + "github.com/karmada-io/karmada/pkg/util/fedinformer" ) var ( @@ -22,7 +25,11 @@ func init() { // GetInstance returns a shared MultiClusterInformerManager instance. func GetInstance() MultiClusterInformerManager { once.Do(func() { - instance = NewMultiClusterInformerManager(stopCh) + transforms := map[schema.GroupVersionResource]cache.TransformFunc{ + nodeGVR: fedinformer.NodeTransformFunc, + podGVR: fedinformer.PodTransformFunc, + } + instance = NewMultiClusterInformerManager(stopCh, transforms) }) return instance } @@ -64,17 +71,19 @@ type MultiClusterInformerManager interface { } // NewMultiClusterInformerManager constructs a new instance of multiClusterInformerManagerImpl. -func NewMultiClusterInformerManager(stopCh <-chan struct{}) MultiClusterInformerManager { +func NewMultiClusterInformerManager(stopCh <-chan struct{}, transformFuncs map[schema.GroupVersionResource]cache.TransformFunc) MultiClusterInformerManager { return &multiClusterInformerManagerImpl{ - managers: make(map[string]SingleClusterInformerManager), - stopCh: stopCh, + managers: make(map[string]SingleClusterInformerManager), + transformFuncs: transformFuncs, + stopCh: stopCh, } } type multiClusterInformerManagerImpl struct { - managers map[string]SingleClusterInformerManager - stopCh <-chan struct{} - lock sync.RWMutex + managers map[string]SingleClusterInformerManager + transformFuncs map[schema.GroupVersionResource]cache.TransformFunc + stopCh <-chan struct{} + lock sync.RWMutex } func (m *multiClusterInformerManagerImpl) getManager(cluster string) (SingleClusterInformerManager, bool) { @@ -92,7 +101,7 @@ func (m *multiClusterInformerManagerImpl) ForCluster(cluster string, client kube m.lock.Lock() defer m.lock.Unlock() - manager := NewSingleClusterInformerManager(client, defaultResync, m.stopCh) + manager := NewSingleClusterInformerManager(client, defaultResync, m.stopCh, m.transformFuncs) m.managers[cluster] = manager return manager } diff --git a/pkg/util/fedinformer/typedmanager/single-cluster-manager.go b/pkg/util/fedinformer/typedmanager/single-cluster-manager.go index 47d11a607..a627c99bf 100644 --- a/pkg/util/fedinformer/typedmanager/single-cluster-manager.go +++ b/pkg/util/fedinformer/typedmanager/single-cluster-manager.go @@ -69,15 +69,18 @@ type SingleClusterInformerManager interface { // NewSingleClusterInformerManager constructs a new instance of singleClusterInformerManagerImpl. // defaultResync with value '0' means no re-sync. -func NewSingleClusterInformerManager(client kubernetes.Interface, defaultResync time.Duration, parentCh <-chan struct{}) SingleClusterInformerManager { +func NewSingleClusterInformerManager(client kubernetes.Interface, defaultResync time.Duration, parentCh <-chan struct{}, transformFuncs map[schema.GroupVersionResource]cache.TransformFunc) SingleClusterInformerManager { ctx, cancel := util.ContextForChannel(parentCh) return &singleClusterInformerManagerImpl{ - informerFactory: informers.NewSharedInformerFactory(client, defaultResync), - handlers: make(map[schema.GroupVersionResource][]cache.ResourceEventHandler), - syncedInformers: make(map[schema.GroupVersionResource]struct{}), - ctx: ctx, - cancel: cancel, - client: client, + informerFactory: informers.NewSharedInformerFactory(client, defaultResync), + handlers: make(map[schema.GroupVersionResource][]cache.ResourceEventHandler), + syncedInformers: make(map[schema.GroupVersionResource]struct{}), + informers: make(map[schema.GroupVersionResource]struct{}), + startedInformers: make(map[schema.GroupVersionResource]struct{}), + transformFuncs: transformFuncs, + ctx: ctx, + cancel: cancel, + client: client, } } @@ -89,8 +92,14 @@ type singleClusterInformerManagerImpl struct { syncedInformers map[schema.GroupVersionResource]struct{} + informers map[schema.GroupVersionResource]struct{} + + startedInformers map[schema.GroupVersionResource]struct{} + handlers map[schema.GroupVersionResource][]cache.ResourceEventHandler + transformFuncs map[schema.GroupVersionResource]cache.TransformFunc + client kubernetes.Interface lock sync.RWMutex @@ -107,6 +116,21 @@ func (s *singleClusterInformerManagerImpl) ForResource(resource schema.GroupVers return err } + s.lock.Lock() + if _, exist := s.informers[resource]; !exist { + s.informers[resource] = struct{}{} + } + s.lock.Unlock() + + s.lock.RLock() + if resourceTransformFunc, ok := s.transformFuncs[resource]; ok && !s.isInformerStarted(resource) { + err = resourceInformer.Informer().SetTransform(resourceTransformFunc) + if err != nil { + return err + } + } + s.lock.RUnlock() + resourceInformer.Informer().AddEventHandler(handler) s.appendHandler(resource, handler) return nil @@ -120,6 +144,11 @@ func (s *singleClusterInformerManagerImpl) IsInformerSynced(resource schema.Grou return exist } +func (s *singleClusterInformerManagerImpl) isInformerStarted(resource schema.GroupVersionResource) bool { + _, exist := s.startedInformers[resource] + return exist +} + func (s *singleClusterInformerManagerImpl) IsHandlerExist(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) bool { s.lock.RLock() defer s.lock.RUnlock() @@ -139,6 +168,26 @@ func (s *singleClusterInformerManagerImpl) IsHandlerExist(resource schema.GroupV } func (s *singleClusterInformerManagerImpl) Lister(resource schema.GroupVersionResource) (interface{}, error) { + resourceInformer, err := s.informerFactory.ForResource(resource) + if err != nil { + return nil, err + } + + s.lock.Lock() + if _, exist := s.informers[resource]; !exist { + s.informers[resource] = struct{}{} + } + s.lock.Unlock() + + s.lock.RLock() + if resourceTransformFunc, ok := s.transformFuncs[resource]; ok && !s.isInformerStarted(resource) { + err = resourceInformer.Informer().SetTransform(resourceTransformFunc) + if err != nil { + return nil, err + } + } + s.lock.RUnlock() + if resource == nodeGVR { return s.informerFactory.Core().V1().Nodes().Lister(), nil } @@ -146,10 +195,6 @@ func (s *singleClusterInformerManagerImpl) Lister(resource schema.GroupVersionRe return s.informerFactory.Core().V1().Pods().Lister(), nil } - resourceInformer, err := s.informerFactory.ForResource(resource) - if err != nil { - return nil, err - } return resourceInformer.Lister(), nil } @@ -165,7 +210,14 @@ func (s *singleClusterInformerManagerImpl) appendHandler(resource schema.GroupVe } func (s *singleClusterInformerManagerImpl) Start() { + s.lock.Lock() s.informerFactory.Start(s.ctx.Done()) + for resource := range s.informers { + if _, exist := s.startedInformers[resource]; !exist { + s.startedInformers[resource] = struct{}{} + } + } + s.lock.Unlock() } func (s *singleClusterInformerManagerImpl) Stop() {