add informer managers for status processing (#91)

* add informer managers for status processing

Co-authored-by: Hongcai Ren <renhongcai@huawei.com>
Xianpao Chen 2020-12-28 10:14:05 +08:00 committed by GitHub
parent a2925a3f9d
commit 62a85ea0f8
5 changed files with 211 additions and 12 deletions

View File

@@ -23,6 +23,7 @@ import (
"github.com/karmada-io/karmada/pkg/controllers/policy"
"github.com/karmada-io/karmada/pkg/controllers/status"
karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
"github.com/karmada-io/karmada/pkg/util/informermanager"
)
// aggregatedScheme aggregates all Kubernetes and extended schemes used by controllers.
@@ -73,7 +74,7 @@ func Run(opts *options.Options, stopChan <-chan struct{}) error {
return err
}
setupControllers(controllerManager)
setupControllers(controllerManager, stopChan)
// blocks until the stop channel is closed.
if err := controllerManager.Start(stopChan); err != nil {
@@ -85,7 +86,7 @@ func Run(opts *options.Options, stopChan <-chan struct{}) error {
return nil
}
func setupControllers(mgr controllerruntime.Manager) {
func setupControllers(mgr controllerruntime.Manager, stopChan <-chan struct{}) {
resetConfig := mgr.GetConfig()
dynamicClientSet := dynamic.NewForConfigOrDie(resetConfig)
karmadaClient := karmadaclientset.NewForConfigOrDie(resetConfig)
@@ -142,10 +143,13 @@ func setupControllers(mgr controllerruntime.Manager) {
}
workStatusController := &status.PropagationWorkStatusController{
Client: mgr.GetClient(),
DynamicClient: dynamicClientSet,
EventRecorder: mgr.GetEventRecorderFor(status.WorkStatusControllerName),
RESTMapper: mgr.GetRESTMapper(),
Client: mgr.GetClient(),
DynamicClient: dynamicClientSet,
EventRecorder: mgr.GetEventRecorderFor(status.WorkStatusControllerName),
RESTMapper: mgr.GetRESTMapper(),
KubeClientSet: kubeClientSet,
InformerManager: informermanager.NewMultiClusterInformerManager(),
StopChan: stopChan,
}
if err := workStatusController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup work status controller: %v", err)

View File

@@ -2,16 +2,26 @@ package status
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/karmada-io/karmada/pkg/apis/propagationstrategy/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)
// WorkStatusControllerName is the controller name that will be used when reporting events.
@@ -19,10 +29,14 @@ const WorkStatusControllerName = "work-status-controller"
// PropagationWorkStatusController is to sync status of PropagationWork.
type PropagationWorkStatusController struct {
client.Client // used to operate PropagationWork resources.
DynamicClient dynamic.Interface // used to fetch arbitrary resources.
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
client.Client // used to operate PropagationWork resources.
DynamicClient dynamic.Interface // used to fetch arbitrary resources.
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
KubeClientSet kubernetes.Interface // used to get kubernetes resources.
InformerManager informermanager.MultiClusterInformerManager
eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
StopChan <-chan struct{}
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
@@ -45,9 +59,112 @@ func (c *PropagationWorkStatusController) Reconcile(req controllerruntime.Reques
return controllerruntime.Result{}, nil
}
return c.buildResourceInformers(work)
}
// buildResourceInformers builds informers dynamically for the resources managed in the member cluster.
// The created informers watch for resource changes and sync the status back to the relevant PropagationWork object.
func (c *PropagationWorkStatusController) buildResourceInformers(work *v1alpha1.PropagationWork) (controllerruntime.Result, error) {
err := c.registerInformersAndStart(work)
if err != nil {
klog.Errorf("Failed to register informer for propagationWork %s/%s. Error: %v.", work.GetNamespace(), work.GetName(), err)
return controllerruntime.Result{Requeue: true}, err
}
return controllerruntime.Result{}, nil
}
// getEventHandler returns a callback function that knows how to handle events from the member cluster.
func (c *PropagationWorkStatusController) getEventHandler() cache.ResourceEventHandler {
if c.eventHandler == nil {
c.eventHandler = informermanager.NewHandlerOnAllEvents(c.syncPropagationWorkStatus)
}
return c.eventHandler
}
// registerInformersAndStart builds an informer manager for the cluster if one doesn't exist yet, then constructs informers
// for the target GVRs and starts them.
func (c *PropagationWorkStatusController) registerInformersAndStart(work *v1alpha1.PropagationWork) error {
memberClusterName, err := names.GetMemberClusterName(work.GetNamespace())
if err != nil {
klog.Errorf("Failed to get member cluster name by %s. Error: %v.", work.GetNamespace(), err)
return err
}
singleClusterInformerManager, err := c.getSingleClusterManager(memberClusterName)
if err != nil {
return err
}
gvrTargets, err := c.getGVRsFromPropagationWork(work)
if err != nil {
return err
}
for gvr := range gvrTargets {
singleClusterInformerManager.ForResource(gvr, c.getEventHandler())
}
c.InformerManager.Start(memberClusterName, c.StopChan)
synced := c.InformerManager.WaitForCacheSync(memberClusterName, c.StopChan)
if synced == nil {
klog.Errorf("No informerFactory for cluster %s exist.", memberClusterName)
return fmt.Errorf("no informerFactory for cluster %s exist", memberClusterName)
}
for gvr := range gvrTargets {
if !synced[gvr] {
klog.Errorf("Informer for %s hasn't synced.", gvr)
return fmt.Errorf("informer for %s hasn't synced", gvr)
}
}
return nil
}
// getGVRsFromPropagationWork traverses the manifests in propagationWork to collect the groupVersionResource list.
func (c *PropagationWorkStatusController) getGVRsFromPropagationWork(work *v1alpha1.PropagationWork) (map[schema.GroupVersionResource]bool, error) {
gvrTargets := map[schema.GroupVersionResource]bool{}
for _, manifest := range work.Spec.Workload.Manifests {
workload := &unstructured.Unstructured{}
err := workload.UnmarshalJSON(manifest.Raw)
if err != nil {
klog.Errorf("Failed to unmarshal workload. Error: %v.", err)
return nil, err
}
gvr, err := restmapper.GetGroupVersionResource(c.RESTMapper, workload.GroupVersionKind())
if err != nil {
klog.Errorf("Failed to get GVR from GVK for resource %s/%s. Error: %v.", workload.GetNamespace(), workload.GetName(), err)
return nil, err
}
gvrTargets[gvr] = true
}
return gvrTargets, nil
}
// getSingleClusterManager gets the singleClusterInformerManager for the given cluster name.
// If the manager does not exist, it is created; otherwise it is fetched from the map.
func (c *PropagationWorkStatusController) getSingleClusterManager(memberClusterName string) (informermanager.SingleClusterInformerManager, error) {
// TODO(chenxianpao): If cluster A is removed and a new cluster whose name is also A joins karmada,
// the cache in the informer manager should be updated.
singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(memberClusterName)
if singleClusterInformerManager == nil {
dynamicClusterClient, err := util.BuildDynamicClusterClient(c.Client, c.KubeClientSet, memberClusterName)
if err != nil {
klog.Errorf("Failed to build dynamic cluster client for cluster %s.", memberClusterName)
return nil, err
}
singleClusterInformerManager = c.InformerManager.ForCluster(dynamicClusterClient.ClusterName, dynamicClusterClient.DynamicClientSet, 0)
}
return singleClusterInformerManager, nil
}
// syncPropagationWorkStatus finds the propagationWork by the label on the workload, then updates the resource status into the propagationWork status.
// label example: "karmada.io/created-by: karmada-es-member-cluster-1.default-deployment-nginx"
// TODO(chenxianpao): sync workload status to propagationWork status.
func (c *PropagationWorkStatusController) syncPropagationWorkStatus(obj runtime.Object) error {
resource := obj.(*unstructured.Unstructured)
klog.Infof("sync obj is %s/%s/%s", resource.GetKind(), resource.GetNamespace(), resource.GetName())
return nil
}
// SetupWithManager creates a controller and registers it to the controller manager.
func (c *PropagationWorkStatusController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&v1alpha1.PropagationWork{}).Complete(c)
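
Taken together, the flow per PropagationWork is: resolve the member cluster name from the namespace, get or create that cluster's informer manager, register one event handler per GVR found in the manifests, then start the informers and wait for their caches to sync. Below is a minimal, standalone sketch of that flow using only the APIs touched by this commit; the package name, function name, cluster name, and GVR are illustrative, not part of the commit.

package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/klog/v2"

	"github.com/karmada-io/karmada/pkg/util/informermanager"
)

// watchDeployments is an illustrative helper showing the same register/start/wait sequence
// as registerInformersAndStart above, for a single hard-coded cluster and resource.
func watchDeployments(client dynamic.Interface, stopCh <-chan struct{}) error {
	mgr := informermanager.NewMultiClusterInformerManager()
	cluster := "member-cluster-1" // illustrative cluster name

	// Reuse the per-cluster manager if it already exists, otherwise create one.
	single := mgr.GetSingleClusterManager(cluster)
	if single == nil {
		single = mgr.ForCluster(cluster, client, 0)
	}

	// Register a handler for one resource; the controller does this for every GVR in the manifests.
	gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
	single.ForResource(gvr, informermanager.NewHandlerOnAllEvents(func(obj runtime.Object) error {
		klog.Infof("observed event for %s", obj.GetObjectKind().GroupVersionKind())
		return nil
	}))

	// Start this cluster's informers and block until the caches have synced or stopCh closes.
	mgr.Start(cluster, stopCh)
	synced := mgr.WaitForCacheSync(cluster, stopCh)
	if synced == nil || !synced[gvr] {
		return fmt.Errorf("informer for %s has not synced", gvr)
	}
	return nil
}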

View File

@@ -5,6 +5,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
kubeclientset "k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/karmada-io/karmada/pkg/apis/membercluster/v1alpha1"
@@ -31,3 +32,18 @@ func GetMemberCluster(hostClient client.Client, clusterName string) (*v1alpha1.M
}
return memberCluster, nil
}
// BuildDynamicClusterClient builds a dynamic client for the informerFactory by cluster name.
// It builds a kubeconfig from the memberCluster resource and constructs the dynamic client.
func BuildDynamicClusterClient(hostClient client.Client, kubeClient kubeclientset.Interface, cluster string) (*DynamicClusterClient, error) {
memberCluster, err := GetMemberCluster(hostClient, cluster)
if err != nil {
return nil, err
}
dynamicClusterClient, err := NewClusterDynamicClientSet(memberCluster, kubeClient)
if err != nil {
return nil, err
}
return dynamicClusterClient, nil
}
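
A hedged sketch of how this helper is meant to pair with the informer manager, mirroring getSingleClusterManager in the controller above; the package name, function name, and parameters are illustrative, not part of the commit.

package example

import (
	"k8s.io/client-go/kubernetes"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/karmada-io/karmada/pkg/util"
	"github.com/karmada-io/karmada/pkg/util/informermanager"
)

// newClusterManager is an illustrative helper: it builds the member cluster's dynamic client
// and hands its ClusterName and DynamicClientSet to the informer manager, as the controller does.
func newClusterManager(hostClient client.Client, kubeClient kubernetes.Interface,
	mgr informermanager.MultiClusterInformerManager, cluster string) (informermanager.SingleClusterInformerManager, error) {
	clusterClient, err := util.BuildDynamicClusterClient(hostClient, kubeClient, cluster)
	if err != nil {
		return nil, err
	}
	return mgr.ForCluster(clusterClient.ClusterName, clusterClient.DynamicClientSet, 0), nil
}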

View File

@@ -0,0 +1,48 @@
package informermanager
import (
"reflect"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
// NewHandlerOnAllEvents builds a ResourceEventHandler so that the function 'fn' will be called on all events (add/update/delete).
func NewHandlerOnAllEvents(fn func(runtime.Object) error) cache.ResourceEventHandler {
return &cache.ResourceEventHandlerFuncs{
AddFunc: func(cur interface{}) {
curObj := cur.(runtime.Object)
klog.V(2).Infof("Receive add event, obj is: %+v", curObj)
err := fn(curObj)
if err != nil {
klog.V(2).Infof("Failed to exec fn. Error: %v.", err)
}
},
UpdateFunc: func(old, cur interface{}) {
curObj := cur.(runtime.Object)
if !reflect.DeepEqual(old, cur) {
klog.V(2).Infof("Receive update event, obj is: %+v", curObj)
err := fn(curObj)
if err != nil {
klog.V(2).Infof("Failed to exec fn. Error: %v.", err)
}
}
},
DeleteFunc: func(old interface{}) {
if deleted, ok := old.(cache.DeletedFinalStateUnknown); ok {
// This object might be stale but ok for our current usage.
old = deleted.Obj
if old == nil {
return
}
}
oldObj := old.(runtime.Object)
klog.V(2).Infof("Receive delete event, obj is: %+v", oldObj)
err := fn(oldObj)
if err != nil {
klog.V(2).Infof("Failed to exec fn. Error: %v.", err)
}
},
}
}
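
Since the returned value satisfies cache.ResourceEventHandler, the helper is not tied to the informer manager; here is a minimal sketch of attaching it directly to a plain client-go dynamic shared informer factory (everything below is illustrative, not code from this commit).

package example

import (
	"time"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/klog/v2"

	"github.com/karmada-io/karmada/pkg/util/informermanager"
)

// watchWithFactory is an illustrative helper, not part of this commit.
func watchWithFactory(client dynamic.Interface, stopCh <-chan struct{}) {
	factory := dynamicinformer.NewDynamicSharedInformerFactory(client, 10*time.Minute)
	gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}

	// Add, update and delete events are all routed through the same callback.
	handler := informermanager.NewHandlerOnAllEvents(func(obj runtime.Object) error {
		klog.Infof("event for %s", obj.GetObjectKind().GroupVersionKind())
		return nil
	})
	factory.ForResource(gvr).Informer().AddEventHandler(handler)

	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
}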

View File

@@ -14,6 +14,10 @@ type MultiClusterInformerManager interface {
// ForCluster builds an informer manager for a specific cluster.
ForCluster(cluster string, client dynamic.Interface, defaultResync time.Duration) SingleClusterInformerManager
// GetSingleClusterManager gets the informer manager for a specific cluster.
// The informer manager must have been created beforehand; otherwise, nil will be returned.
GetSingleClusterManager(cluster string) SingleClusterInformerManager
// IsManagerExist checks whether the informer manager for the cluster has already been created.
IsManagerExist(cluster string) bool
@@ -26,8 +30,8 @@ type MultiClusterInformerManager interface {
WaitForCacheSync(cluster string, stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
}
// NewMultiClusterInformerManagerImpl constructs a new instance of multiClusterInformerManagerImpl.
func NewMultiClusterInformerManagerImpl() MultiClusterInformerManager {
// NewMultiClusterInformerManager constructs a new instance of multiClusterInformerManagerImpl.
func NewMultiClusterInformerManager() MultiClusterInformerManager {
return &multiClusterInformerManagerImpl{
managers: make(map[string]SingleClusterInformerManager),
}
@@ -54,6 +58,16 @@ func (m *multiClusterInformerManagerImpl) ForCluster(cluster string, client dyna
return manager
}
func (m *multiClusterInformerManagerImpl) GetSingleClusterManager(cluster string) SingleClusterInformerManager {
m.lock.RLock()
defer m.lock.RUnlock()
manager, exist := m.managers[cluster]
if exist {
return manager
}
return nil
}
func (m *multiClusterInformerManagerImpl) IsManagerExist(cluster string) bool {
m.lock.RLock()
defer m.lock.RUnlock()