Merge pull request #2383 from Poor12/improve-informer

adopt transform func to reduce memory usage
This commit is contained in:
karmada-bot 2022-08-30 20:38:59 +08:00 committed by GitHub
commit 90a51fe76f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 395 additions and 19 deletions

View File

@ -570,7 +570,13 @@ func getAllocatedResource(podList []*corev1.Pod) corev1.ResourceList {
}
func getNodeAvailable(allocatable corev1.ResourceList, podResources *util.Resource) corev1.ResourceList {
if podResources == nil {
return allocatable
}
allocatedResourceList := podResources.ResourceList()
if allocatedResourceList == nil {
return allocatable
}
allowedPodNumber := allocatable.Pods().Value() - allocatedResourceList.Pods().Value()
// When too many pods have been created, scheduling will fail so that the allocating pods number may be huge.
// If allowedPodNumber is less than or equal to 0, we don't allow more pods to be created.

View File

@ -0,0 +1,73 @@
package fedinformer
import (
"fmt"
"reflect"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// StripUnusedFields is the transform function for shared informers,
// it removes unused fields from objects before they are stored in the cache to save memory.
func StripUnusedFields(obj interface{}) (interface{}, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
// shouldn't happen
return obj, nil
}
// ManagedFields is large and we never use it
accessor.SetManagedFields(nil)
return obj, nil
}
// NodeTransformFunc is the dedicated transform function for Node objects.
// It cleans up some parts of the object before it will be put into the controller
// cache to reduce memory usage.
//
// Note: this function removes most of the fields, please make sure your controller
// doesn't care for the removed fields, especially when use in shared informers.
func NodeTransformFunc(obj interface{}) (interface{}, error) {
node, ok := obj.(*corev1.Node)
if !ok {
return obj, fmt.Errorf("expect resource Node but got %v", reflect.TypeOf(obj))
}
aggregatedNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: node.Name,
},
Status: corev1.NodeStatus{
Allocatable: node.Status.Allocatable,
Conditions: node.Status.Conditions,
},
}
return aggregatedNode, nil
}
// PodTransformFunc is the dedicated transform function for Pod objects.
// It cleans up some parts of the object before it will be put into the controller
// cache to reduce memory usage.
//
// Note: this function removes most of the fields, please make sure your controller
// doesn't care for the removed fields, especially when use in shared informers.
func PodTransformFunc(obj interface{}) (interface{}, error) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return obj, fmt.Errorf("expect resource Pod but got %v", reflect.TypeOf(obj))
}
aggregatedPod := &corev1.Pod{
Spec: corev1.PodSpec{
NodeName: pod.Spec.NodeName,
InitContainers: pod.Spec.InitContainers,
Containers: pod.Spec.Containers,
Overhead: pod.Spec.Overhead,
},
Status: corev1.PodStatus{
Phase: pod.Status.Phase,
},
}
return aggregatedPod, nil
}

View File

@ -0,0 +1,236 @@
package fedinformer
import (
"reflect"
"testing"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
)
func TestStripUnusedFields(t *testing.T) {
tests := []struct {
name string
obj interface{}
want interface{}
}{
{
name: "transform pods",
obj: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"c": "d"},
ManagedFields: []metav1.ManagedFieldsEntry{
{
Manager: "whatever",
},
},
},
},
want: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"c": "d"},
},
},
},
{
name: "transform works",
obj: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"c": "d"},
ManagedFields: []metav1.ManagedFieldsEntry{
{
Manager: "whatever",
},
},
},
},
want: &workv1alpha1.Work{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"c": "d"},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := StripUnusedFields(tt.obj)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("StripUnusedFields: got %v, want %v", got, tt.want)
}
})
}
}
func TestNodeTransformFunc(t *testing.T) {
tests := []struct {
name string
obj interface{}
want interface{}
}{
{
name: "transform nodes without status",
obj: &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"c": "d"},
ManagedFields: []metav1.ManagedFieldsEntry{
{
Manager: "whatever",
},
},
},
},
want: &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
},
},
{
name: "transform nodes with status",
obj: &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(1, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(1, resource.BinarySI),
corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI),
corev1.ResourceEphemeralStorage: *resource.NewQuantity(1, resource.BinarySI),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
{
Type: corev1.NodeMemoryPressure,
Status: corev1.ConditionTrue,
},
},
},
},
want: &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(1, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(1, resource.BinarySI),
corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI),
corev1.ResourceEphemeralStorage: *resource.NewQuantity(1, resource.BinarySI),
},
Conditions: []corev1.NodeCondition{
{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
},
{
Type: corev1.NodeMemoryPressure,
Status: corev1.ConditionTrue,
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := NodeTransformFunc(tt.obj)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("NodeTransformFunc: got %v, want %v", got, tt.want)
}
})
}
}
func TestPodTransformFunc(t *testing.T) {
tests := []struct {
name string
obj interface{}
want interface{}
}{
{
name: "transform pods without needed",
obj: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"c": "d"},
ManagedFields: []metav1.ManagedFieldsEntry{
{
Manager: "whatever",
},
},
},
},
want: &corev1.Pod{},
},
{
name: "transform pods with needed",
obj: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
},
Spec: corev1.PodSpec{
NodeName: "test",
InitContainers: []corev1.Container{{Name: "test"}},
Containers: []corev1.Container{{Name: "test"}},
Overhead: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(1, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(1, resource.BinarySI),
corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI),
corev1.ResourceEphemeralStorage: *resource.NewQuantity(1, resource.BinarySI),
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
},
want: &corev1.Pod{
Spec: corev1.PodSpec{
NodeName: "test",
InitContainers: []corev1.Container{{Name: "test"}},
Containers: []corev1.Container{{Name: "test"}},
Overhead: corev1.ResourceList{
corev1.ResourceCPU: *resource.NewMilliQuantity(1, resource.DecimalSI),
corev1.ResourceMemory: *resource.NewQuantity(1, resource.BinarySI),
corev1.ResourcePods: *resource.NewQuantity(1, resource.DecimalSI),
corev1.ResourceEphemeralStorage: *resource.NewQuantity(1, resource.BinarySI),
},
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := PodTransformFunc(tt.obj)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("PodTransformFunc: got %v, want %v", got, tt.want)
}
})
}
}

View File

@ -6,6 +6,9 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"github.com/karmada-io/karmada/pkg/util/fedinformer"
)
var (
@ -22,7 +25,11 @@ func init() {
// GetInstance returns a shared MultiClusterInformerManager instance.
func GetInstance() MultiClusterInformerManager {
once.Do(func() {
instance = NewMultiClusterInformerManager(stopCh)
transforms := map[schema.GroupVersionResource]cache.TransformFunc{
nodeGVR: fedinformer.NodeTransformFunc,
podGVR: fedinformer.PodTransformFunc,
}
instance = NewMultiClusterInformerManager(stopCh, transforms)
})
return instance
}
@ -64,17 +71,19 @@ type MultiClusterInformerManager interface {
}
// NewMultiClusterInformerManager constructs a new instance of multiClusterInformerManagerImpl.
func NewMultiClusterInformerManager(stopCh <-chan struct{}) MultiClusterInformerManager {
func NewMultiClusterInformerManager(stopCh <-chan struct{}, transformFuncs map[schema.GroupVersionResource]cache.TransformFunc) MultiClusterInformerManager {
return &multiClusterInformerManagerImpl{
managers: make(map[string]SingleClusterInformerManager),
stopCh: stopCh,
managers: make(map[string]SingleClusterInformerManager),
transformFuncs: transformFuncs,
stopCh: stopCh,
}
}
type multiClusterInformerManagerImpl struct {
managers map[string]SingleClusterInformerManager
stopCh <-chan struct{}
lock sync.RWMutex
managers map[string]SingleClusterInformerManager
transformFuncs map[schema.GroupVersionResource]cache.TransformFunc
stopCh <-chan struct{}
lock sync.RWMutex
}
func (m *multiClusterInformerManagerImpl) getManager(cluster string) (SingleClusterInformerManager, bool) {
@ -92,7 +101,7 @@ func (m *multiClusterInformerManagerImpl) ForCluster(cluster string, client kube
m.lock.Lock()
defer m.lock.Unlock()
manager := NewSingleClusterInformerManager(client, defaultResync, m.stopCh)
manager := NewSingleClusterInformerManager(client, defaultResync, m.stopCh, m.transformFuncs)
m.managers[cluster] = manager
return manager
}

View File

@ -69,15 +69,18 @@ type SingleClusterInformerManager interface {
// NewSingleClusterInformerManager constructs a new instance of singleClusterInformerManagerImpl.
// defaultResync with value '0' means no re-sync.
func NewSingleClusterInformerManager(client kubernetes.Interface, defaultResync time.Duration, parentCh <-chan struct{}) SingleClusterInformerManager {
func NewSingleClusterInformerManager(client kubernetes.Interface, defaultResync time.Duration, parentCh <-chan struct{}, transformFuncs map[schema.GroupVersionResource]cache.TransformFunc) SingleClusterInformerManager {
ctx, cancel := util.ContextForChannel(parentCh)
return &singleClusterInformerManagerImpl{
informerFactory: informers.NewSharedInformerFactory(client, defaultResync),
handlers: make(map[schema.GroupVersionResource][]cache.ResourceEventHandler),
syncedInformers: make(map[schema.GroupVersionResource]struct{}),
ctx: ctx,
cancel: cancel,
client: client,
informerFactory: informers.NewSharedInformerFactory(client, defaultResync),
handlers: make(map[schema.GroupVersionResource][]cache.ResourceEventHandler),
syncedInformers: make(map[schema.GroupVersionResource]struct{}),
informers: make(map[schema.GroupVersionResource]struct{}),
startedInformers: make(map[schema.GroupVersionResource]struct{}),
transformFuncs: transformFuncs,
ctx: ctx,
cancel: cancel,
client: client,
}
}
@ -89,8 +92,14 @@ type singleClusterInformerManagerImpl struct {
syncedInformers map[schema.GroupVersionResource]struct{}
informers map[schema.GroupVersionResource]struct{}
startedInformers map[schema.GroupVersionResource]struct{}
handlers map[schema.GroupVersionResource][]cache.ResourceEventHandler
transformFuncs map[schema.GroupVersionResource]cache.TransformFunc
client kubernetes.Interface
lock sync.RWMutex
@ -107,6 +116,21 @@ func (s *singleClusterInformerManagerImpl) ForResource(resource schema.GroupVers
return err
}
s.lock.Lock()
if _, exist := s.informers[resource]; !exist {
s.informers[resource] = struct{}{}
}
s.lock.Unlock()
s.lock.RLock()
if resourceTransformFunc, ok := s.transformFuncs[resource]; ok && !s.isInformerStarted(resource) {
err = resourceInformer.Informer().SetTransform(resourceTransformFunc)
if err != nil {
return err
}
}
s.lock.RUnlock()
resourceInformer.Informer().AddEventHandler(handler)
s.appendHandler(resource, handler)
return nil
@ -120,6 +144,11 @@ func (s *singleClusterInformerManagerImpl) IsInformerSynced(resource schema.Grou
return exist
}
func (s *singleClusterInformerManagerImpl) isInformerStarted(resource schema.GroupVersionResource) bool {
_, exist := s.startedInformers[resource]
return exist
}
func (s *singleClusterInformerManagerImpl) IsHandlerExist(resource schema.GroupVersionResource, handler cache.ResourceEventHandler) bool {
s.lock.RLock()
defer s.lock.RUnlock()
@ -139,6 +168,26 @@ func (s *singleClusterInformerManagerImpl) IsHandlerExist(resource schema.GroupV
}
func (s *singleClusterInformerManagerImpl) Lister(resource schema.GroupVersionResource) (interface{}, error) {
resourceInformer, err := s.informerFactory.ForResource(resource)
if err != nil {
return nil, err
}
s.lock.Lock()
if _, exist := s.informers[resource]; !exist {
s.informers[resource] = struct{}{}
}
s.lock.Unlock()
s.lock.RLock()
if resourceTransformFunc, ok := s.transformFuncs[resource]; ok && !s.isInformerStarted(resource) {
err = resourceInformer.Informer().SetTransform(resourceTransformFunc)
if err != nil {
return nil, err
}
}
s.lock.RUnlock()
if resource == nodeGVR {
return s.informerFactory.Core().V1().Nodes().Lister(), nil
}
@ -146,10 +195,6 @@ func (s *singleClusterInformerManagerImpl) Lister(resource schema.GroupVersionRe
return s.informerFactory.Core().V1().Pods().Lister(), nil
}
resourceInformer, err := s.informerFactory.ForResource(resource)
if err != nil {
return nil, err
}
return resourceInformer.Lister(), nil
}
@ -165,7 +210,14 @@ func (s *singleClusterInformerManagerImpl) appendHandler(resource schema.GroupVe
}
func (s *singleClusterInformerManagerImpl) Start() {
s.lock.Lock()
s.informerFactory.Start(s.ctx.Done())
for resource := range s.informers {
if _, exist := s.startedInformers[resource]; !exist {
s.startedInformers[resource] = struct{}{}
}
}
s.lock.Unlock()
}
func (s *singleClusterInformerManagerImpl) Stop() {