Merge pull request #2360 from Charlie17Li/toTypedManager

Update the cluster status controller to use a typed informer in place of the dynamic informer.
This commit is contained in:
karmada-bot 2022-08-11 14:11:08 +08:00 committed by GitHub
commit bac3eb233e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 61 deletions

View File

@ -35,6 +35,7 @@ import (
"github.com/karmada-io/karmada/pkg/sharedcli/profileflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/typedManager"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
@ -241,7 +242,8 @@ func startClusterStatusController(ctx controllerscontext.Context) (bool, error)
KubeClient: kubeclientset.NewForConfigOrDie(ctx.Mgr.GetConfig()),
EventRecorder: ctx.Mgr.GetEventRecorderFor(status.ControllerName),
PredicateFunc: helper.NewClusterPredicateOnAgent(ctx.Opts.ClusterName),
InformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedManager.GetInstance(),
GenericInformerManager: genericmanager.GetInstance(),
StopChan: ctx.StopChan,
ClusterClientSetFunc: util.NewClusterClientSetForAgent,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,

View File

@ -48,6 +48,7 @@ import (
"github.com/karmada-io/karmada/pkg/sharedcli/profileflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/typedManager"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
@ -254,7 +255,8 @@ func startClusterStatusController(ctx controllerscontext.Context) (enabled bool,
KubeClient: kubeclientset.NewForConfigOrDie(mgr.GetConfig()),
EventRecorder: mgr.GetEventRecorderFor(status.ControllerName),
PredicateFunc: clusterPredicateFunc,
InformerManager: genericmanager.GetInstance(),
TypedInformerManager: typedManager.GetInstance(),
GenericInformerManager: genericmanager.GetInstance(),
StopChan: stopChan,
ClusterClientSetFunc: util.NewClusterClientSet,
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,

View File

@ -15,9 +15,9 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
clientset "k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/component-helpers/apimachinery/lease"
@ -34,6 +34,7 @@ import (
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/typedManager"
"github.com/karmada-io/karmada/pkg/util/helper"
)
@ -60,7 +61,8 @@ type ClusterStatusController struct {
KubeClient clientset.Interface
EventRecorder record.EventRecorder
PredicateFunc predicate.Predicate
InformerManager genericmanager.MultiClusterInformerManager
TypedInformerManager typedManager.MultiClusterInformerManager
GenericInformerManager genericmanager.MultiClusterInformerManager
StopChan <-chan struct{}
ClusterClientSetFunc func(string, client.Client, *util.ClientOption) (*util.ClusterClient, error)
ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
@ -98,7 +100,8 @@ func (c *ClusterStatusController) Reconcile(ctx context.Context, req controllerr
if err := c.Client.Get(context.TODO(), req.NamespacedName, cluster); err != nil {
// The resource may no longer exist, in which case we stop the informer.
if apierrors.IsNotFound(err) {
c.InformerManager.Stop(req.NamespacedName.Name)
c.GenericInformerManager.Stop(req.NamespacedName.Name)
c.TypedInformerManager.Stop(req.NamespacedName.Name)
c.clusterConditionCache.delete(req.Name)
return controllerruntime.Result{}, nil
}
@ -147,7 +150,8 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
if !online && readyCondition.Status != metav1.ConditionTrue {
klog.V(2).Infof("Cluster(%s) still offline after %s, ensuring offline is set.",
cluster.Name, c.ClusterFailureThreshold.Duration)
c.InformerManager.Stop(cluster.Name)
c.GenericInformerManager.Stop(cluster.Name)
c.TypedInformerManager.Stop(cluster.Name)
setTransitionTime(cluster.Status.Conditions, readyCondition)
meta.SetStatusCondition(&currentClusterStatus.Conditions, *readyCondition)
return c.updateStatusIfNeeded(cluster, currentClusterStatus)
@ -156,9 +160,12 @@ func (c *ClusterStatusController) syncClusterStatus(cluster *clusterv1alpha1.Clu
// skip collecting cluster status if not ready
if online && healthy && readyCondition.Status == metav1.ConditionTrue {
// get or create informer for pods and nodes in member cluster
clusterInformerManager, err := c.buildInformerForCluster(cluster)
clusterInformerManager, err := c.buildInformerForCluster(clusterClient)
if err != nil {
klog.Errorf("Failed to get or create informer for Cluster %s. Error: %v.", cluster.GetName(), err)
// in large-scale clusters, the timeout may occur.
// if clusterInformerManager fails to be built, should be returned, otherwise, it may cause a nil pointer
return controllerruntime.Result{Requeue: true}, err
}
if cluster.Spec.SyncMode == clusterv1alpha1.Pull {
@ -239,15 +246,23 @@ func (c *ClusterStatusController) updateStatusIfNeeded(cluster *clusterv1alpha1.
// buildInformerForCluster builds informer manager for cluster if it doesn't exist, then constructs informers for node
// and pod and start it. If the informer manager exist, return it.
func (c *ClusterStatusController) buildInformerForCluster(cluster *clusterv1alpha1.Cluster) (genericmanager.SingleClusterInformerManager, error) {
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name)
if singleClusterInformerManager == nil {
clusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client)
func (c *ClusterStatusController) buildInformerForCluster(clusterClient *util.ClusterClient) (typedManager.SingleClusterInformerManager, error) {
// cluster-status-controller will initialize the generic informer manager
// mainly because when the member cluster is joined, the dynamic informer required by the member cluster
// to distribute resources is prepared in advance
// in that case execution-controller can distribute resources without waiting.
if c.GenericInformerManager.GetSingleClusterManager(clusterClient.ClusterName) == nil {
dynamicClient, err := c.ClusterDynamicClientSetFunc(clusterClient.ClusterName, c.Client)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name)
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterClient.ClusterName)
return nil, err
}
singleClusterInformerManager = c.InformerManager.ForCluster(clusterClient.ClusterName, clusterClient.DynamicClientSet, 0)
c.GenericInformerManager.ForCluster(clusterClient.ClusterName, dynamicClient.DynamicClientSet, 0)
}
singleClusterInformerManager := c.TypedInformerManager.GetSingleClusterManager(clusterClient.ClusterName)
if singleClusterInformerManager == nil {
singleClusterInformerManager = c.TypedInformerManager.ForCluster(clusterClient.ClusterName, clusterClient.KubeClient, 0)
}
gvrs := []schema.GroupVersionResource{nodeGVR, podGVR}
@ -257,19 +272,22 @@ func (c *ClusterStatusController) buildInformerForCluster(cluster *clusterv1alph
for _, gvr := range gvrs {
if !singleClusterInformerManager.IsInformerSynced(gvr) {
allSynced = false
singleClusterInformerManager.Lister(gvr)
if _, err := singleClusterInformerManager.Lister(gvr); err != nil {
klog.Errorf("Failed to get the lister for gvr %s: %v", gvr.String(), err)
}
}
}
if allSynced {
return singleClusterInformerManager, nil
}
c.InformerManager.Start(cluster.Name)
c.TypedInformerManager.Start(clusterClient.ClusterName)
c.GenericInformerManager.Start(clusterClient.ClusterName)
if err := func() error {
synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, c.ClusterCacheSyncTimeout.Duration)
synced := c.TypedInformerManager.WaitForCacheSyncWithTimeout(clusterClient.ClusterName, c.ClusterCacheSyncTimeout.Duration)
if synced == nil {
return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name)
return fmt.Errorf("no informerFactory for cluster %s exist", clusterClient.ClusterName)
}
for _, gvr := range gvrs {
if !synced[gvr] {
@ -278,8 +296,8 @@ func (c *ClusterStatusController) buildInformerForCluster(cluster *clusterv1alph
}
return nil
}(); err != nil {
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err)
c.InformerManager.Stop(cluster.Name)
klog.Errorf("Failed to sync cache for cluster: %s, error: %v", clusterClient.ClusterName, err)
c.TypedInformerManager.Stop(clusterClient.ClusterName)
return nil, err
}
@ -404,35 +422,33 @@ func getAPIEnablements(clusterClient *util.ClusterClient) ([]clusterv1alpha1.API
}
// listPods returns the Pod list from the informerManager cache.
func listPods(informerManager genericmanager.SingleClusterInformerManager) ([]*corev1.Pod, error) {
podLister := informerManager.Lister(podGVR)
podList, err := podLister.List(labels.Everything())
if err != nil {
return nil, err
}
pods, err := convertObjectsToPods(podList)
func listPods(informerManager typedManager.SingleClusterInformerManager) ([]*corev1.Pod, error) {
podInterface, err := informerManager.Lister(podGVR)
if err != nil {
return nil, err
}
return pods, nil
podLister, ok := podInterface.(v1.PodLister)
if !ok {
return nil, fmt.Errorf("failed to convert interface to PodLister")
}
return podLister.List(labels.Everything())
}
// listNodes returns the Node list from the informerManager cache.
func listNodes(informerManager genericmanager.SingleClusterInformerManager) ([]*corev1.Node, error) {
nodeLister := informerManager.Lister(nodeGVR)
nodeList, err := nodeLister.List(labels.Everything())
if err != nil {
return nil, err
}
nodes, err := convertObjectsToNodes(nodeList)
func listNodes(informerManager typedManager.SingleClusterInformerManager) ([]*corev1.Node, error) {
nodeInterface, err := informerManager.Lister(nodeGVR)
if err != nil {
return nil, err
}
return nodes, nil
nodeLister, ok := nodeInterface.(v1.NodeLister)
if !ok {
return nil, fmt.Errorf("failed to convert interface to NodeLister")
}
return nodeLister.List(labels.Everything())
}
func getNodeSummary(nodes []*corev1.Node) *clusterv1alpha1.NodeSummary {
@ -465,30 +481,6 @@ func getResourceSummary(nodes []*corev1.Node, pods []*corev1.Pod) *clusterv1alph
return resourceSummary
}
// convertObjectsToNodes converts a slice of unstructured runtime objects into
// typed *corev1.Node values, preserving input order.
// It returns an error as soon as any element fails to convert.
func convertObjectsToNodes(nodeList []runtime.Object) ([]*corev1.Node, error) {
	// Pre-size the result to avoid repeated slice growth.
	nodes := make([]*corev1.Node, 0, len(nodeList))
	for _, obj := range nodeList {
		node := &corev1.Node{}
		// Use %w (not %v) so the underlying conversion error stays
		// inspectable via errors.Is/errors.As up the call stack.
		if err := helper.ConvertToTypedObject(obj, node); err != nil {
			return nil, fmt.Errorf("failed to convert unstructured to typed object: %w", err)
		}
		nodes = append(nodes, node)
	}
	return nodes, nil
}
// convertObjectsToPods converts a slice of unstructured runtime objects into
// typed *corev1.Pod values, preserving input order.
// It returns an error as soon as any element fails to convert.
func convertObjectsToPods(podList []runtime.Object) ([]*corev1.Pod, error) {
	// Pre-size the result to avoid repeated slice growth.
	pods := make([]*corev1.Pod, 0, len(podList))
	for _, obj := range podList {
		pod := &corev1.Pod{}
		// Use %w (not %v) so the underlying conversion error stays
		// inspectable via errors.Is/errors.As up the call stack.
		if err := helper.ConvertToTypedObject(obj, pod); err != nil {
			return nil, fmt.Errorf("failed to convert unstructured to typed object: %w", err)
		}
		pods = append(pods, pod)
	}
	return pods, nil
}
func getClusterAllocatable(nodeList []*corev1.Node) (allocatable corev1.ResourceList) {
allocatable = make(corev1.ResourceList)
for _, node := range nodeList {