From f32bd501e6e50a851d9b3d0b84a80bb17f24e0c9 Mon Sep 17 00:00:00 2001 From: changzhen Date: Thu, 1 Jul 2021 14:43:54 +0800 Subject: [PATCH] add service_export_controller to watch serviceexport and report endpointslices to karmada Signed-off-by: changzhen --- cmd/agent/app/agent.go | 43 +- .../app/controllermanager.go | 90 +--- cmd/webhook/app/webhook.go | 2 +- pkg/controllers/hpa/hpa_controller.go | 1 + .../mcs/service_export_controller.go | 443 ++++++++++++++++++ .../namespace/namespace_sync_controller.go | 1 + .../status/workstatus_controller.go | 30 +- pkg/util/constants.go | 10 + pkg/util/helper/binding.go | 1 + pkg/util/helper/cache.go | 42 ++ pkg/util/helper/predicate.go | 139 ++++++ pkg/util/helper/work.go | 63 +++ pkg/util/helper/workstatus.go | 14 + 13 files changed, 763 insertions(+), 116 deletions(-) create mode 100644 pkg/controllers/mcs/service_export_controller.go create mode 100644 pkg/util/helper/cache.go create mode 100644 pkg/util/helper/predicate.go create mode 100644 pkg/util/helper/work.go diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index ec23620f5..92124f700 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -12,17 +12,18 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/karmada-io/karmada/cmd/agent/app/options" - clusterapi "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" "github.com/karmada-io/karmada/pkg/controllers/execution" + "github.com/karmada-io/karmada/pkg/controllers/mcs" "github.com/karmada-io/karmada/pkg/controllers/status" karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned" "github.com/karmada-io/karmada/pkg/karmadactl" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/gclient" + "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/informermanager" "github.com/karmada-io/karmada/pkg/util/names" "github.com/karmada-io/karmada/pkg/util/objectwatcher" @@ -79,7 +80,7 @@ func run(ctx context.Context, karmadaConfig karmadactl.KarmadaConfig, opts *opti setupControllers(controllerManager, opts, ctx.Done()) - // blocks until the stop channel is closed. + // blocks until the context is done. if err := controllerManager.Start(ctx); err != nil { klog.Errorf("controller manager exits unexpectedly: %v", err) return err @@ -89,26 +90,11 @@ func run(ctx context.Context, karmadaConfig karmadactl.KarmadaConfig, opts *opti } func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) { - predicateFun := predicate.Funcs{ - CreateFunc: func(createEvent event.CreateEvent) bool { - return createEvent.Object.GetName() == opts.ClusterName - }, - UpdateFunc: func(updateEvent event.UpdateEvent) bool { - return updateEvent.ObjectOld.GetName() == opts.ClusterName - }, - DeleteFunc: func(deleteEvent event.DeleteEvent) bool { - return deleteEvent.Object.GetName() == opts.ClusterName - }, - GenericFunc: func(genericEvent event.GenericEvent) bool { - return false - }, - } - clusterStatusController := &status.ClusterStatusController{ Client: mgr.GetClient(), KubeClient: kubeclientset.NewForConfigOrDie(mgr.GetConfig()), EventRecorder: mgr.GetEventRecorderFor(status.ControllerName), - PredicateFunc: predicateFun, + PredicateFunc: helper.NewClusterPredicateByAgent(opts.ClusterName), InformerManager: informermanager.NewMultiClusterInformerManager(), StopChan: stopChan, ClusterClientSetFunc: util.NewClusterClientSetForAgent, @@ -149,14 +135,29 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop if err := workStatusController.SetupWithManager(mgr); err != nil { klog.Fatalf("Failed to setup work status controller: %v", err) } + + serviceExportController := &mcs.ServiceExportController{ + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor(mcs.ControllerName), + RESTMapper: mgr.GetRESTMapper(), + InformerManager: informermanager.NewMultiClusterInformerManager(), + StopChan: stopChan, + WorkerNumber: 1, + PredicateFunc: helper.NewPredicateForServiceExportControllerByAgent(opts.ClusterName), + ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, + } + serviceExportController.RunWorkQueue() + if err := serviceExportController.SetupWithManager(mgr); err != nil { + klog.Fatalf("Failed to setup ServiceExport controller: %v", err) + } } func registerWithControlPlaneAPIServer(controlPlaneRestConfig *restclient.Config, memberClusterName string) error { karmadaClient := karmadaclientset.NewForConfigOrDie(controlPlaneRestConfig) - clusterObj := &clusterapi.Cluster{} + clusterObj := &clusterv1alpha1.Cluster{} clusterObj.Name = memberClusterName - clusterObj.Spec.SyncMode = clusterapi.Pull + clusterObj.Spec.SyncMode = clusterv1alpha1.Pull _, err := karmadactl.CreateClusterObject(karmadaClient, clusterObj, false) if err != nil { diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index ca95c69ed..a0b86da56 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -19,20 +19,20 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/karmada-io/karmada/cmd/controller-manager/app/options" - "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" - workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" "github.com/karmada-io/karmada/pkg/controllers/binding" "github.com/karmada-io/karmada/pkg/controllers/cluster" "github.com/karmada-io/karmada/pkg/controllers/execution" "github.com/karmada-io/karmada/pkg/controllers/hpa" + "github.com/karmada-io/karmada/pkg/controllers/mcs" "github.com/karmada-io/karmada/pkg/controllers/namespace" "github.com/karmada-io/karmada/pkg/controllers/propagationpolicy" "github.com/karmada-io/karmada/pkg/controllers/status" "github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util/detector" "github.com/karmada-io/karmada/pkg/util/gclient" + "github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/informermanager" - "github.com/karmada-io/karmada/pkg/util/names" "github.com/karmada-io/karmada/pkg/util/objectwatcher" "github.com/karmada-io/karmada/pkg/util/overridemanager" ) @@ -86,7 +86,7 @@ func Run(ctx context.Context, opts *options.Options) error { setupControllers(controllerManager, opts, ctx.Done()) - // blocks until the stop channel is closed. + // blocks until the context is done. if err := controllerManager.Start(ctx); err != nil { klog.Errorf("controller manager exits unexpectedly: %v", err) return err @@ -130,16 +130,16 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop clusterPredicateFunc := predicate.Funcs{ CreateFunc: func(createEvent event.CreateEvent) bool { - obj := createEvent.Object.(*v1alpha1.Cluster) - return obj.Spec.SyncMode == v1alpha1.Push + obj := createEvent.Object.(*clusterv1alpha1.Cluster) + return obj.Spec.SyncMode == clusterv1alpha1.Push }, UpdateFunc: func(updateEvent event.UpdateEvent) bool { - obj := updateEvent.ObjectNew.(*v1alpha1.Cluster) - return obj.Spec.SyncMode == v1alpha1.Push + obj := updateEvent.ObjectNew.(*clusterv1alpha1.Cluster) + return obj.Spec.SyncMode == clusterv1alpha1.Push }, DeleteFunc: func(deleteEvent event.DeleteEvent) bool { - obj := deleteEvent.Object.(*v1alpha1.Cluster) - return obj.Spec.SyncMode == v1alpha1.Push + obj := deleteEvent.Object.(*clusterv1alpha1.Cluster) + return obj.Spec.SyncMode == clusterv1alpha1.Push }, GenericFunc: func(genericEvent event.GenericEvent) bool { return false @@ -202,14 +202,12 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop klog.Fatalf("Failed to setup cluster resource binding controller: %v", err) } - workPredicateFunc := newPredicateFuncsForWork(mgr) - executionController := &execution.Controller{ Client: mgr.GetClient(), EventRecorder: mgr.GetEventRecorderFor(execution.ControllerName), RESTMapper: mgr.GetRESTMapper(), ObjectWatcher: objectWatcher, - PredicateFunc: workPredicateFunc, + PredicateFunc: helper.NewWorkPredicate(mgr), ClusterClientSetFunc: util.NewClusterDynamicClientSet, } if err := executionController.SetupWithManager(mgr); err != nil { @@ -224,7 +222,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop StopChan: stopChan, WorkerNumber: 1, ObjectWatcher: objectWatcher, - PredicateFunc: workPredicateFunc, + PredicateFunc: helper.NewWorkPredicate(mgr), ClusterClientSetFunc: util.NewClusterDynamicClientSet, } workStatusController.RunWorkQueue() @@ -239,57 +237,19 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop if err := namespaceSyncController.SetupWithManager(mgr); err != nil { klog.Fatalf("Failed to setup namespace sync controller: %v", err) } -} -func newPredicateFuncsForWork(mgr controllerruntime.Manager) predicate.Funcs { - return predicate.Funcs{ - CreateFunc: func(createEvent event.CreateEvent) bool { - obj := createEvent.Object.(*workv1alpha1.Work) - clusterName, err := names.GetClusterName(obj.Namespace) - if err != nil { - klog.Errorf("Failed to get member cluster name for work %s/%s", obj.Namespace, obj.Name) - return false - } - - clusterObj, err := util.GetCluster(mgr.GetClient(), clusterName) - if err != nil { - klog.Errorf("Failed to get the given member cluster %s", clusterName) - return false - } - return clusterObj.Spec.SyncMode == v1alpha1.Push - }, - UpdateFunc: func(updateEvent event.UpdateEvent) bool { - obj := updateEvent.ObjectNew.(*workv1alpha1.Work) - clusterName, err := names.GetClusterName(obj.Namespace) - if err != nil { - klog.Errorf("Failed to get member cluster name for work %s/%s", obj.Namespace, obj.Name) - return false - } - - clusterObj, err := util.GetCluster(mgr.GetClient(), clusterName) - if err != nil { - klog.Errorf("Failed to get the given member cluster %s", clusterName) - return false - } - return clusterObj.Spec.SyncMode == v1alpha1.Push - }, - DeleteFunc: func(deleteEvent event.DeleteEvent) bool { - obj := deleteEvent.Object.(*workv1alpha1.Work) - clusterName, err := names.GetClusterName(obj.Namespace) - if err != nil { - klog.Errorf("Failed to get member cluster name for work %s/%s", obj.Namespace, obj.Name) - return false - } - - clusterObj, err := util.GetCluster(mgr.GetClient(), clusterName) - if err != nil { - klog.Errorf("Failed to get the given member cluster %s", clusterName) - return false - } - return clusterObj.Spec.SyncMode == v1alpha1.Push - }, - GenericFunc: func(genericEvent event.GenericEvent) bool { - return false - }, + serviceExportController := &mcs.ServiceExportController{ + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor(mcs.ControllerName), + RESTMapper: mgr.GetRESTMapper(), + InformerManager: informermanager.NewMultiClusterInformerManager(), + StopChan: stopChan, + WorkerNumber: 1, + PredicateFunc: helper.NewPredicateForServiceExportController(mgr), + ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, + } + serviceExportController.RunWorkQueue() + if err := serviceExportController.SetupWithManager(mgr); err != nil { + klog.Fatalf("Failed to setup ServiceExport controller: %v", err) } } diff --git a/cmd/webhook/app/webhook.go b/cmd/webhook/app/webhook.go index 811804d12..2afe62f46 100644 --- a/cmd/webhook/app/webhook.go +++ b/cmd/webhook/app/webhook.go @@ -78,7 +78,7 @@ func Run(ctx context.Context, opts *options.Options) error { hookServer.Register("/mutate-work", &webhook.Admission{Handler: &work.MutatingAdmission{}}) hookServer.WebhookMux.Handle("/readyz/", http.StripPrefix("/readyz/", &healthz.Handler{})) - // blocks until the stop channel is closed. + // blocks until the context is done. if err := hookManager.Start(ctx); err != nil { klog.Errorf("webhook server exits unexpectedly: %v", err) return err diff --git a/pkg/controllers/hpa/hpa_controller.go b/pkg/controllers/hpa/hpa_controller.go index dbfea2fb9..9a1c9719f 100644 --- a/pkg/controllers/hpa/hpa_controller.go +++ b/pkg/controllers/hpa/hpa_controller.go @@ -107,6 +107,7 @@ func (c *HorizontalPodAutoscalerController) buildWorks(hpa *autoscalingv1.Horizo return err } + // TODO(@XiShanYongYe-Chang): refactor util.CreateOrUpdateWork with pkg/util/helper/work.go err = util.CreateOrUpdateWork(c.Client, objectMeta, hpaJSON) if err != nil { return err diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go new file mode 100644 index 000000000..317303964 --- /dev/null +++ b/pkg/controllers/mcs/service_export_controller.go @@ -0,0 +1,443 @@ +package mcs + +import ( + "context" + "fmt" + "reflect" + "sync" + "time" + + discoveryv1beta1 "k8s.io/api/discovery/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/kind/pkg/errors" + mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/helper" + "github.com/karmada-io/karmada/pkg/util/informermanager" + "github.com/karmada-io/karmada/pkg/util/informermanager/keys" + "github.com/karmada-io/karmada/pkg/util/names" +) + +// ControllerName is the controller name that will be used when reporting events. +const ControllerName = "service-export-controller" + +// ServiceExportController is to sync ServiceExport and report EndpointSlices of exported service to control-plane. +type ServiceExportController struct { + client.Client + EventRecorder record.EventRecorder + RESTMapper meta.RESTMapper + StopChan <-chan struct{} + InformerManager informermanager.MultiClusterInformerManager + WorkerNumber int // WorkerNumber is the number of worker goroutines + PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys. + ClusterDynamicClientSetFunc func(c *clusterv1alpha1.Cluster, client client.Client) (*util.DynamicClusterClient, error) + // eventHandlers holds the handlers which used to handle events reported from member clusters. + // Each handler takes the cluster name as key and takes the handler function as the value, e.g. + // "member1": instance of ResourceEventHandler + eventHandlers sync.Map + worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue. +} + +var ( + serviceExportGVR = mcsv1alpha1.SchemeGroupVersion.WithResource("serviceexports") + serviceExportGVK = mcsv1alpha1.SchemeGroupVersion.WithKind("ServiceExport") + endpointSliceGVR = discoveryv1beta1.SchemeGroupVersion.WithResource("endpointslices") +) + +// Reconcile performs a full reconciliation for the object referred to by the Request. +func (c *ServiceExportController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { + klog.V(4).Infof("Reconciling Work %s", req.NamespacedName.String()) + + work := &workv1alpha1.Work{} + if err := c.Client.Get(context.TODO(), req.NamespacedName, work); err != nil { + if apierrors.IsNotFound(err) { + return controllerruntime.Result{}, nil + } + return controllerruntime.Result{Requeue: true}, err + } + + if !work.DeletionTimestamp.IsZero() { + return controllerruntime.Result{}, nil + } + + if !helper.IsResourceApplied(&work.Status) { + return controllerruntime.Result{}, nil + } + + if !helper.IsWorkContains(&work.Status, serviceExportGVK) { + return controllerruntime.Result{}, nil + } + + clusterName, err := names.GetClusterName(work.Namespace) + if err != nil { + klog.Errorf("Failed to get member cluster name for work %s/%s", work.Namespace, work.Name) + return controllerruntime.Result{Requeue: true}, err + } + + cluster, err := util.GetCluster(c.Client, clusterName) + if err != nil { + klog.Errorf("Failed to get the given member cluster %s", clusterName) + return controllerruntime.Result{Requeue: true}, err + } + + return c.buildResourceInformers(cluster) +} + +// SetupWithManager creates a controller and register to controller manager. +func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager) error { + return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}).WithEventFilter(c.PredicateFunc).Complete(c) +} + +// RunWorkQueue initializes worker and run it, worker will process resource asynchronously. +func (c *ServiceExportController) RunWorkQueue() { + c.worker = util.NewAsyncWorker("service-export", time.Second, nil, c.syncServiceExportOrEndpointSlice) + c.worker.Run(c.WorkerNumber, c.StopChan) +} + +func (c *ServiceExportController) syncServiceExportOrEndpointSlice(key util.QueueKey) error { + klog.V(4).Infof("Begin to sync ServiceExport or EndpointSlice %s.", key) + + fedKey, ok := key.(keys.FederatedKey) + if !ok { + klog.Errorf("Failed to sync serviceExport as invalid key: %v", key) + return fmt.Errorf("invalid key") + } + + switch fedKey.Kind { + case util.ServiceExportKind: + if err := c.handleServiceExportEvent(fedKey); err != nil { + klog.Errorf("Failed to handle serviceExport(%s) event, Error: %v", + fedKey.NamespaceKey(), err) + return err + } + case util.EndpointSliceKind: + if err := c.handleEndpointSliceEvent(fedKey); err != nil { + klog.Errorf("Failed to handle endpointSlice(%s) event, Error: %v", + fedKey.NamespaceKey(), err) + return err + } + } + + return nil +} + +func (c *ServiceExportController) buildResourceInformers(cluster *clusterv1alpha1.Cluster) (controllerruntime.Result, error) { + err := c.registerInformersAndStart(cluster) + if err != nil { + klog.Errorf("Failed to register informer for Cluster %s. Error: %v.", cluster.Name, err) + return controllerruntime.Result{Requeue: true}, err + } + return controllerruntime.Result{}, nil +} + +// registerInformersAndStart builds informer manager for cluster if it doesn't exist, then constructs informers for gvr +// and start it. +func (c *ServiceExportController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error { + singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name) + if singleClusterInformerManager == nil { + dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster, c.Client) + if err != nil { + klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name) + return err + } + singleClusterInformerManager = c.InformerManager.ForCluster(dynamicClusterClient.ClusterName, dynamicClusterClient.DynamicClientSet, 0) + } + + gvrTargets := []schema.GroupVersionResource{ + serviceExportGVR, + endpointSliceGVR, + } + for _, gvr := range gvrTargets { + singleClusterInformerManager.ForResource(gvr, c.getEventHandler(cluster.Name)) + } + + c.InformerManager.Start(cluster.Name, c.StopChan) + synced := c.InformerManager.WaitForCacheSync(cluster.Name, c.StopChan) + if synced == nil { + klog.Errorf("No informerFactory for cluster %s exist.", cluster.Name) + return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name) + } + for _, gvr := range gvrTargets { + if !synced[gvr] { + klog.Errorf("Informer for %s hasn't synced.", gvr) + return fmt.Errorf("informer for %s hasn't synced", gvr) + } + } + + return nil +} + +// getEventHandler return callback function that knows how to handle events from the member cluster. +func (c *ServiceExportController) getEventHandler(clusterName string) cache.ResourceEventHandler { + if value, exists := c.eventHandlers.Load(clusterName); exists { + return value.(cache.ResourceEventHandler) + } + + eventHandler := informermanager.NewHandlerOnEvents(c.genHandlerAddFunc(clusterName), c.genHandlerUpdateFunc(clusterName), + c.genHandlerDeleteFunc(clusterName)) + c.eventHandlers.Store(clusterName, eventHandler) + return eventHandler +} + +func (c *ServiceExportController) genHandlerAddFunc(clusterName string) func(obj interface{}) { + return func(obj interface{}) { + curObj := obj.(runtime.Object) + key, err := keys.FederatedKeyFunc(clusterName, curObj) + if err != nil { + klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind()) + return + } + c.worker.AddRateLimited(key) + } +} + +func (c *ServiceExportController) genHandlerUpdateFunc(clusterName string) func(oldObj, newObj interface{}) { + return func(oldObj, newObj interface{}) { + curObj := newObj.(runtime.Object) + if !reflect.DeepEqual(oldObj, newObj) { + key, err := keys.FederatedKeyFunc(clusterName, curObj) + if err != nil { + klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind()) + return + } + c.worker.AddRateLimited(key) + } + } +} + +func (c *ServiceExportController) genHandlerDeleteFunc(clusterName string) func(obj interface{}) { + return func(obj interface{}) { + if deleted, ok := obj.(cache.DeletedFinalStateUnknown); ok { + // This object might be stale but ok for our current usage. + obj = deleted.Obj + if obj == nil { + return + } + } + oldObj := obj.(runtime.Object) + key, err := keys.FederatedKeyFunc(clusterName, oldObj) + if err != nil { + klog.Warningf("Failed to generate key for obj: %s", oldObj.GetObjectKind().GroupVersionKind()) + return + } + c.worker.AddRateLimited(key) + } +} + +// handleServiceExportEvent syncs EndPointSlice objects to control-plane according to ServiceExport event. +// For ServiceExport create or update event, reports the referencing service's EndpointSlice. +// For ServiceExport delete event, cleanup the previously reported EndpointSlice. +func (c *ServiceExportController) handleServiceExportEvent(serviceExportKey keys.FederatedKey) error { + _, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, serviceExportKey) + if err != nil { + if apierrors.IsNotFound(err) { + return cleanupWorkWithServiceExportDelete(c.Client, serviceExportKey) + } + return err + } + + // Even though the EndpointSlice will be synced when dealing with EndpointSlice events, thus the 'report' here may + // be redundant, but it helps to avoid a corner case: + // If skip report here, after ServiceExport deletion and re-creation, if no EndpointSlice changes, we didn't get a + // change to sync. + if err = c.reportEndpointSliceWithServiceExportCreate(serviceExportKey); err != nil { + klog.Errorf("Failed to handle ServiceExport(%s) event, Error: %v", + serviceExportKey.NamespaceKey(), err) + return err + } + + return nil +} + +// handleEndpointSliceEvent syncs EndPointSlice objects to control-plane according to EndpointSlice event. +// For EndpointSlice create or update event, reports the EndpointSlice when referencing service has been exported. +// For EndpointSlice delete event, cleanup the previously reported EndpointSlice. +func (c *ServiceExportController) handleEndpointSliceEvent(endpointSliceKey keys.FederatedKey) error { + endpointSliceObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, endpointSliceKey) + if err != nil { + if apierrors.IsNotFound(err) { + return cleanupWorkWithEndpointSliceDelete(c.Client, endpointSliceKey) + } + return err + } + + if endpointSliceObj == nil { + klog.V(2).Infof("Ignore the event key %s which not listened by karmada.", endpointSliceKey) + return nil + } + + if err = c.reportEndpointSliceWithEndpointSliceCreateOrUpdate(endpointSliceKey.Cluster, endpointSliceObj); err != nil { + klog.Errorf("Failed to handle endpointSlice(%s) event, Error: %v", + endpointSliceKey.NamespaceKey(), err) + return err + } + + return nil +} + +// reportEndpointSliceWithServiceExportCreate reports the referencing service's EndpointSlice. +func (c *ServiceExportController) reportEndpointSliceWithServiceExportCreate(serviceExportKey keys.FederatedKey) error { + var ( + endpointSliceObjects []runtime.Object + err error + errs []error + ) + + singleClusterManager := c.InformerManager.GetSingleClusterManager(serviceExportKey.Cluster) + if singleClusterManager == nil { + return nil + } + + endpointSliceLister := singleClusterManager.Lister(endpointSliceGVR) + if endpointSliceObjects, err = endpointSliceLister.ByNamespace(serviceExportKey.Namespace).List(labels.SelectorFromSet(labels.Set{ + discoveryv1beta1.LabelServiceName: serviceExportKey.Name, + })); err != nil { + return err + } + + for index := range endpointSliceObjects { + if err = reportEndpointSlice(c.Client, endpointSliceObjects[index].(*unstructured.Unstructured), serviceExportKey.Cluster); err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return errors.NewAggregate(errs) + } + + return nil +} + +// reportEndpointSliceWithEndpointSliceCreateOrUpdate reports the EndpointSlice when referencing service has been exported. +func (c *ServiceExportController) reportEndpointSliceWithEndpointSliceCreateOrUpdate(clusterName string, endpointSlice *unstructured.Unstructured) error { + relatedServiceName := endpointSlice.GetLabels()[discoveryv1beta1.LabelServiceName] + + singleClusterManager := c.InformerManager.GetSingleClusterManager(clusterName) + if singleClusterManager == nil { + return nil + } + + serviceExportLister := singleClusterManager.Lister(serviceExportGVR) + _, err := serviceExportLister.ByNamespace(endpointSlice.GetNamespace()).Get(relatedServiceName) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + + klog.Errorf("Failed to get ServiceExport object %s/%s. error: %v.", relatedServiceName, endpointSlice.GetNamespace(), err) + return err + } + + return reportEndpointSlice(c.Client, endpointSlice, clusterName) +} + +// reportEndpointSlice report EndPointSlice objects to control-plane. +func reportEndpointSlice(c client.Client, endpointSlice *unstructured.Unstructured, clusterName string) error { + var ( + executionSpace string + endpointSliceWork *workv1alpha1.Work + err error + ) + + executionSpace, err = names.GenerateExecutionSpaceName(clusterName) + if err != nil { + return err + } + + workMeta := metav1.ObjectMeta{ + Name: names.GenerateWorkName(endpointSlice.GetKind(), endpointSlice.GetName(), endpointSlice.GetNamespace()), + Namespace: executionSpace, + Labels: map[string]string{ + util.ServiceNamespaceLabel: endpointSlice.GetNamespace(), + util.ServiceNameLabel: endpointSlice.GetLabels()[discoveryv1beta1.LabelServiceName], + // todo: add label to indicate work do not need execute + }, + } + + endpointSliceWork, err = helper.WarpResourceWithWork(workMeta, endpointSlice) + if err != nil { + return err + } + if err = helper.CreateOrUpdateWork(c, endpointSliceWork); err != nil { + return err + } + + return nil +} + +func cleanupWorkWithServiceExportDelete(c client.Client, serviceExportKey keys.FederatedKey) error { + executionSpace, err := names.GenerateExecutionSpaceName(serviceExportKey.Cluster) + if err != nil { + return err + } + + workList := &workv1alpha1.WorkList{} + if err = c.List(context.TODO(), workList, &client.ListOptions{ + Namespace: executionSpace, + LabelSelector: labels.SelectorFromSet(labels.Set{ + util.ServiceNamespaceLabel: serviceExportKey.Namespace, + util.ServiceNameLabel: serviceExportKey.Name, + }), + }); err != nil { + klog.Error("Failed to list workList reported by ServiceExport(%s) in namespace(%s), Error: %v", + serviceExportKey.NamespaceKey(), executionSpace, err) + return err + } + + var errs []error + for index, work := range workList.Items { + if err = c.Delete(context.TODO(), &workList.Items[index]); err != nil { + klog.Errorf("Failed to delete work(%s/%s), Error: %v", work.Namespace, work.Name, err) + errs = append(errs, err) + } + } + if len(errs) > 0 { + return errors.NewAggregate(errs) + } + + return nil +} + +func cleanupWorkWithEndpointSliceDelete(c client.Client, endpointSliceKey keys.FederatedKey) error { + executionSpace, err := names.GenerateExecutionSpaceName(endpointSliceKey.Cluster) + if err != nil { + return err + } + + workNamespaceKey := types.NamespacedName{ + Namespace: executionSpace, + Name: names.GenerateWorkName(endpointSliceKey.Kind, endpointSliceKey.Name, endpointSliceKey.Namespace), + } + work := &workv1alpha1.Work{} + if err = c.Get(context.TODO(), workNamespaceKey, work); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + + klog.Error("Failed to get work(%s), Error: %v", workNamespaceKey, executionSpace, err) + return err + } + + if err = c.Delete(context.TODO(), work); err != nil { + klog.Errorf("Failed to delete work(%s), Error: %v", workNamespaceKey, err) + return err + } + + return nil +} diff --git a/pkg/controllers/namespace/namespace_sync_controller.go b/pkg/controllers/namespace/namespace_sync_controller.go index 5188de7b4..a115f0120 100644 --- a/pkg/controllers/namespace/namespace_sync_controller.go +++ b/pkg/controllers/namespace/namespace_sync_controller.go @@ -124,6 +124,7 @@ func (c *Controller) buildWorks(namespace *v1.Namespace, clusters []v1alpha1.Clu return err } + // TODO(@XiShanYongYe-Chang): refactor util.CreateOrUpdateWork with pkg/util/helper/work.go err = util.CreateOrUpdateWork(c.Client, objectMeta, namespaceJSON) if err != nil { return err diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go index c48c03382..faf0fe345 100644 --- a/pkg/controllers/status/workstatus_controller.go +++ b/pkg/controllers/status/workstatus_controller.go @@ -120,7 +120,7 @@ func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error { return fmt.Errorf("invalid key") } - obj, err := c.getObjectFromCache(fedKey) + obj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey) if err != nil { if errors.IsNotFound(err) { return c.handleDeleteEvent(fedKey) @@ -335,34 +335,6 @@ func (c *WorkStatusController) getRawManifest(manifests []workv1alpha1.Manifest, return nil, fmt.Errorf("no such manifest exist") } -// getObjectFromCache gets full object information from cache by key in worker queue. -func (c *WorkStatusController) getObjectFromCache(key keys.FederatedKey) (*unstructured.Unstructured, error) { - gvr, err := restmapper.GetGroupVersionResource(c.RESTMapper, key.GroupVersionKind()) - if err != nil { - klog.Errorf("Failed to get GVR from GVK %s. Error: %v", key.GroupVersionKind(), err) - return nil, err - } - - singleClusterManager := c.InformerManager.GetSingleClusterManager(key.Cluster) - if singleClusterManager == nil { - return nil, nil - } - var obj runtime.Object - lister := singleClusterManager.Lister(gvr) - obj, err = lister.Get(key.NamespaceKey()) - if err != nil { - if errors.IsNotFound(err) { - return nil, err - } - - // print logs only for real error. - klog.Errorf("Failed to get obj %s. error: %v.", key.String(), err) - - return nil, err - } - return obj.(*unstructured.Unstructured), nil -} - // registerInformersAndStart builds informer manager for cluster if it doesn't exist, then constructs informers for gvr // and start it. func (c *WorkStatusController) registerInformersAndStart(cluster *v1alpha1.Cluster, work *workv1alpha1.Work) error { diff --git a/pkg/util/constants.go b/pkg/util/constants.go index d575f373a..eb0ea37b1 100644 --- a/pkg/util/constants.go +++ b/pkg/util/constants.go @@ -24,6 +24,12 @@ const ( // WorkNameLabel is added to objects to specify associated Work's name. WorkNameLabel = "work.karmada.io/name" + + // ServiceNamespaceLabel is added to work object, which is report by member cluster, to specify service namespace associated with EndpointSlice. + ServiceNamespaceLabel = "endpointslice.karmada.io/namespace" + + // ServiceNameLabel is added to work object, which is report by member cluster, to specify service name associated with EndpointSlice. + ServiceNameLabel = "endpointslice.karmada.io/name" ) // Define annotations used by karmada system. @@ -78,4 +84,8 @@ const ( ReplicaSetKind = "ReplicaSet" // StatefulSetKind indicates the target resource is a statefulset StatefulSetKind = "StatefulSet" + // ServiceExportKind indicates the target resource is a serviceexport + ServiceExportKind = "ServiceExport" + // EndpointSliceKind indicates the target resource is a endpointslice + EndpointSliceKind = "EndpointSlice" ) diff --git a/pkg/util/helper/binding.go b/pkg/util/helper/binding.go index 4f16194fd..7a5e45675 100644 --- a/pkg/util/helper/binding.go +++ b/pkg/util/helper/binding.go @@ -201,6 +201,7 @@ func EnsureWork(c client.Client, workload *unstructured.Unstructured, clusterNam } } + // TODO(@XiShanYongYe-Chang): refactor util.CreateOrUpdateWork with pkg/util/helper/work.go workloadJSON, err := clonedWorkload.MarshalJSON() if err != nil { klog.Errorf("Failed to marshal workload, kind: %s, namespace: %s, name: %s. Error: %v", diff --git a/pkg/util/helper/cache.go b/pkg/util/helper/cache.go new file mode 100644 index 000000000..02fca96c0 --- /dev/null +++ b/pkg/util/helper/cache.go @@ -0,0 +1,42 @@ +package helper + +import ( + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" + + "github.com/karmada-io/karmada/pkg/util/informermanager" + "github.com/karmada-io/karmada/pkg/util/informermanager/keys" + "github.com/karmada-io/karmada/pkg/util/restmapper" +) + +// GetObjectFromCache gets full object information from cache by key in worker queue. +func GetObjectFromCache(restMapper meta.RESTMapper, manager informermanager.MultiClusterInformerManager, + fedKey keys.FederatedKey) (*unstructured.Unstructured, error) { + gvr, err := restmapper.GetGroupVersionResource(restMapper, fedKey.GroupVersionKind()) + if err != nil { + klog.Errorf("Failed to get GVR from GVK %s. Error: %v", fedKey.GroupVersionKind(), err) + return nil, err + } + + singleClusterManager := manager.GetSingleClusterManager(fedKey.Cluster) + if singleClusterManager == nil { + return nil, nil + } + var obj runtime.Object + lister := singleClusterManager.Lister(gvr) + obj, err = lister.Get(fedKey.NamespaceKey()) + if err != nil { + if errors.IsNotFound(err) { + return nil, err + } + + // print logs only for real error. + klog.Errorf("Failed to get obj %s. error: %v.", fedKey.String(), err) + + return nil, err + } + return obj.(*unstructured.Unstructured), nil +} diff --git a/pkg/util/helper/predicate.go b/pkg/util/helper/predicate.go new file mode 100644 index 000000000..f1008cbb9 --- /dev/null +++ b/pkg/util/helper/predicate.go @@ -0,0 +1,139 @@ +package helper + +import ( + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/names" +) + +// NewWorkPredicate generates an event filter function with Work for karmada-controller-manager. +func NewWorkPredicate(mgr controllerruntime.Manager) predicate.Funcs { + return predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + obj := createEvent.Object.(*workv1alpha1.Work) + clusterName, err := names.GetClusterName(obj.Namespace) + if err != nil { + klog.Errorf("Failed to get member cluster name for work %s/%s", obj.Namespace, obj.Name) + return false + } + + clusterObj, err := util.GetCluster(mgr.GetClient(), clusterName) + if err != nil { + klog.Errorf("Failed to get the given member cluster %s", clusterName) + return false + } + return clusterObj.Spec.SyncMode == clusterv1alpha1.Push + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + obj := updateEvent.ObjectNew.(*workv1alpha1.Work) + clusterName, err := names.GetClusterName(obj.Namespace) + if err != nil { + klog.Errorf("Failed to get member cluster name for work %s/%s", obj.Namespace, obj.Name) + return false + } + + clusterObj, err := util.GetCluster(mgr.GetClient(), clusterName) + if err != nil { + klog.Errorf("Failed to get the given member cluster %s", clusterName) + return false + } + return clusterObj.Spec.SyncMode == clusterv1alpha1.Push + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + obj := deleteEvent.Object.(*workv1alpha1.Work) + clusterName, err := names.GetClusterName(obj.Namespace) + if err != nil { + klog.Errorf("Failed to get member cluster name for work %s/%s", obj.Namespace, obj.Name) + return false + } + + clusterObj, err := util.GetCluster(mgr.GetClient(), clusterName) + if err != nil { + klog.Errorf("Failed to get the given member cluster %s", clusterName) + return false + } + return clusterObj.Spec.SyncMode == clusterv1alpha1.Push + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + return false + }, + } +} + +// NewPredicateForServiceExportController generates an event filter function for ServiceExport controller running by karmada-controller-manager. +func NewPredicateForServiceExportController(mgr controllerruntime.Manager) predicate.Funcs { + return predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + return false + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + clusterName, err := names.GetClusterName(updateEvent.ObjectOld.GetNamespace()) + if err != nil { + klog.Errorf("Failed to get member cluster name for work %s/%s", updateEvent.ObjectOld.GetNamespace(), + updateEvent.ObjectOld.GetName()) + return false + } + + clusterObj, err := util.GetCluster(mgr.GetClient(), clusterName) + if err != nil { + klog.Errorf("Failed to get the given member cluster %s", clusterName) + return false + } + return clusterObj.Spec.SyncMode == clusterv1alpha1.Push + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + return false + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + return false + }, + } +} + +// NewClusterPredicateByAgent generates an event filter function with Cluster for karmada-agent. +func NewClusterPredicateByAgent(clusterName string) predicate.Funcs { + return predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + return createEvent.Object.GetName() == clusterName + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + return updateEvent.ObjectOld.GetName() == clusterName + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + return deleteEvent.Object.GetName() == clusterName + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + return false + }, + } +} + +// NewPredicateForServiceExportControllerByAgent generates an event filter function for ServiceExport controller running by karmada-agent. +func NewPredicateForServiceExportControllerByAgent(curClusterName string) predicate.Funcs { + return predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + return false + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + clusterName, err := names.GetClusterName(updateEvent.ObjectOld.GetNamespace()) + if err != nil { + klog.Errorf("Failed to get member cluster name for work %s/%s", updateEvent.ObjectOld.GetNamespace(), + updateEvent.ObjectOld.GetName()) + return false + } + return clusterName == curClusterName + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + return false + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + return false + }, + } +} diff --git a/pkg/util/helper/work.go b/pkg/util/helper/work.go new file mode 100644 index 000000000..df5c23bd1 --- /dev/null +++ b/pkg/util/helper/work.go @@ -0,0 +1,63 @@ +package helper + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" +) + +// WarpResourceWithWork will warp resource into Work. +func WarpResourceWithWork(workMeta metav1.ObjectMeta, resource *unstructured.Unstructured) (*workv1alpha1.Work, error) { + workload := resource.DeepCopy() + workloadJSON, err := workload.MarshalJSON() + if err != nil { + klog.Errorf("Failed to marshal workload(%s/%s), Error: %v", workload.GetNamespace(), workload.GetName(), err) + return nil, err + } + + return &workv1alpha1.Work{ + ObjectMeta: workMeta, + Spec: workv1alpha1.WorkSpec{ + Workload: workv1alpha1.WorkloadTemplate{ + Manifests: []workv1alpha1.Manifest{ + { + RawExtension: runtime.RawExtension{ + Raw: workloadJSON, + }, + }, + }, + }, + }, + }, nil +} + +// CreateOrUpdateWork creates a Work object if not exist, or updates if it already exist. +func CreateOrUpdateWork(client client.Client, work *workv1alpha1.Work) error { + runtimeObject := work.DeepCopy() + operationResult, err := controllerutil.CreateOrUpdate(context.TODO(), client, runtimeObject, func() error { + runtimeObject.Spec = work.Spec + runtimeObject.Labels = work.Labels + return nil + }) + if err != nil { + klog.Errorf("Failed to create/update work %s/%s. Error: %v", work.GetNamespace(), work.GetName(), err) + return err + } + + if operationResult == controllerutil.OperationResultCreated { + klog.V(2).Infof("Create work %s/%s successfully.", work.GetNamespace(), work.GetName()) + } else if operationResult == controllerutil.OperationResultUpdated { + klog.V(2).Infof("Update work %s/%s successfully.", work.GetNamespace(), work.GetName()) + } else { + klog.V(2).Infof("Work %s/%s is up to date.", work.GetNamespace(), work.GetName()) + } + + return nil +} diff --git a/pkg/util/helper/workstatus.go b/pkg/util/helper/workstatus.go index d22f789d3..3e1671003 100644 --- a/pkg/util/helper/workstatus.go +++ b/pkg/util/helper/workstatus.go @@ -159,3 +159,17 @@ func IsResourceApplied(workStatus *workv1alpha1.WorkStatus) bool { } return false } + +// IsWorkContains checks if the target resource exists in a work. +// Note: This function checks the Work object's status to detect the target resource, so the Work should be 'Applied', +// otherwise always returns false. +func IsWorkContains(workStatus *workv1alpha1.WorkStatus, targetResource schema.GroupVersionKind) bool { + for _, manifestStatuses := range workStatus.ManifestStatuses { + if targetResource.Group == manifestStatuses.Identifier.Group && + targetResource.Version == manifestStatuses.Identifier.Version && + targetResource.Kind == manifestStatuses.Identifier.Kind { + return true + } + } + return false +}