From 5932615cbae0bbc7e836e0dc713ceaf40564e4c0 Mon Sep 17 00:00:00 2001 From: jwcesign Date: Tue, 28 Nov 2023 23:21:20 +0800 Subject: [PATCH] feat: support mcs with native svc name Signed-off-by: jwcesign --- artifacts/deploy/webhook-configuration.yaml | 14 + cmd/agent/app/agent.go | 23 +- .../app/controllermanager.go | 36 +- cmd/webhook/app/webhook.go | 1 + pkg/apis/networking/v1alpha1/service_types.go | 2 + .../v1alpha1/well_known_constants.go | 3 + .../mcs/service_export_controller.go | 19 +- .../endpointslice_collect_controller.go | 469 ++++++++++++++++++ .../endpointslice_dispatch_controller.go | 280 +++++++++++ .../multiclusterservice/mcs_controller.go | 165 ++++-- pkg/events/events.go | 6 +- pkg/util/cluster.go | 14 + pkg/util/constants.go | 15 + pkg/util/helper/mcs.go | 21 +- pkg/util/helper/predicate.go | 62 +++ pkg/util/helper/work.go | 18 + pkg/util/names/names.go | 4 + pkg/webhook/multiclusterservice/mutating.go | 59 +++ 18 files changed, 1147 insertions(+), 64 deletions(-) create mode 100644 pkg/controllers/multiclusterservice/endpointslice_collect_controller.go create mode 100644 pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go create mode 100644 pkg/webhook/multiclusterservice/mutating.go diff --git a/artifacts/deploy/webhook-configuration.yaml b/artifacts/deploy/webhook-configuration.yaml index 2db21903d..c8e1e212f 100644 --- a/artifacts/deploy/webhook-configuration.yaml +++ b/artifacts/deploy/webhook-configuration.yaml @@ -75,6 +75,20 @@ webhooks: sideEffects: None admissionReviewVersions: [ "v1" ] timeoutSeconds: 3 + - name: multiclusterservice.karmada.io + rules: + - operations: ["CREATE", "UPDATE"] + apiGroups: ["networking.karmada.io"] + apiVersions: ["*"] + resources: ["multiclusterservices"] + scope: "Namespaced" + clientConfig: + url: https://karmada-webhook.karmada-system.svc:443/mutate-multiclusterservice + caBundle: {{caBundle}} + failurePolicy: Fail + sideEffects: None + admissionReviewVersions: [ "v1" ] + timeoutSeconds: 3 --- apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index 917709ff8..edb8741a3 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -46,6 +46,7 @@ import ( controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context" "github.com/karmada-io/karmada/pkg/controllers/execution" "github.com/karmada-io/karmada/pkg/controllers/mcs" + "github.com/karmada-io/karmada/pkg/controllers/multiclusterservice" "github.com/karmada-io/karmada/pkg/controllers/status" karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned" "github.com/karmada-io/karmada/pkg/karmadactl/util/apiclient" @@ -74,7 +75,7 @@ func NewAgentCommand(ctx context.Context) *cobra.Command { cmd := &cobra.Command{ Use: "karmada-agent", Long: `The karmada-agent is the agent of member clusters. It can register a specific cluster to the Karmada control -plane and sync manifests from the Karmada control plane to the member cluster. In addition, it also syncs the status of member +plane and sync manifests from the Karmada control plane to the member cluster. 
In addition, it also syncs the status of member cluster and manifests to the Karmada control plane.`, RunE: func(cmd *cobra.Command, args []string) error { // validate options @@ -127,6 +128,7 @@ func init() { controllers["workStatus"] = startWorkStatusController controllers["serviceExport"] = startServiceExportController controllers["certRotation"] = startCertRotationController + controllers["endpointsliceCollect"] = startEndpointSliceCollectController } func run(ctx context.Context, opts *options.Options) error { @@ -387,6 +389,25 @@ func startServiceExportController(ctx controllerscontext.Context) (bool, error) return true, nil } +func startEndpointSliceCollectController(ctx controllerscontext.Context) (enabled bool, err error) { + opts := ctx.Opts + endpointSliceCollectController := &multiclusterservice.EndpointSliceCollectController{ + Client: ctx.Mgr.GetClient(), + RESTMapper: ctx.Mgr.GetRESTMapper(), + InformerManager: genericmanager.GetInstance(), + StopChan: ctx.StopChan, + WorkerNumber: 3, + PredicateFunc: helper.NewPredicateForEndpointSliceCollectControllerOnAgent(opts.ClusterName), + ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, + ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, + } + endpointSliceCollectController.RunWorkQueue() + if err := endpointSliceCollectController.SetupWithManager(ctx.Mgr); err != nil { + return false, err + } + return true, nil +} + func startCertRotationController(ctx controllerscontext.Context) (bool, error) { certRotationController := &certificate.CertRotationController{ Client: ctx.Mgr.GetClient(), diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 66fa842d9..3e1f9dbf9 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -96,7 +96,7 @@ func NewControllerManagerCommand(ctx context.Context) *cobra.Command { cmd := &cobra.Command{ Use: "karmada-controller-manager", Long: `The karmada-controller-manager runs various controllers. 
-The controllers watch Karmada objects and then talk to the underlying clusters' API servers +The controllers watch Karmada objects and then talk to the underlying clusters' API servers to create regular Kubernetes resources.`, RunE: func(cmd *cobra.Command, args []string) error { // validate options @@ -226,6 +226,8 @@ func init() { controllers["cronFederatedHorizontalPodAutoscaler"] = startCronFederatedHorizontalPodAutoscalerController controllers["hpaReplicasSyncer"] = startHPAReplicasSyncerController controllers["multiclusterservice"] = startMCSController + controllers["endpointsliceCollect"] = startEndpointSliceCollectController + controllers["endpointsliceDispatch"] = startEndpointSliceDispatchController } func startClusterController(ctx controllerscontext.Context) (enabled bool, err error) { @@ -461,6 +463,38 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool, return true, nil } +func startEndpointSliceCollectController(ctx controllerscontext.Context) (enabled bool, err error) { + opts := ctx.Opts + endpointSliceCollectController := &multiclusterservice.EndpointSliceCollectController{ + Client: ctx.Mgr.GetClient(), + RESTMapper: ctx.Mgr.GetRESTMapper(), + InformerManager: genericmanager.GetInstance(), + StopChan: ctx.StopChan, + WorkerNumber: 3, + PredicateFunc: helper.NewPredicateForEndpointSliceCollectController(ctx.Mgr), + ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, + ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, + } + endpointSliceCollectController.RunWorkQueue() + if err := endpointSliceCollectController.SetupWithManager(ctx.Mgr); err != nil { + return false, err + } + return true, nil +} + +func startEndpointSliceDispatchController(ctx controllerscontext.Context) (enabled bool, err error) { + endpointSliceSyncController := &multiclusterservice.EndpointsliceDispatchController{ + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(multiclusterservice.EndpointsliceDispatchControllerName), + RESTMapper: ctx.Mgr.GetRESTMapper(), + InformerManager: genericmanager.GetInstance(), + } + if err := endpointSliceSyncController.SetupWithManager(ctx.Mgr); err != nil { + return false, err + } + return true, nil +} + func startEndpointSliceController(ctx controllerscontext.Context) (enabled bool, err error) { endpointSliceController := &mcs.EndpointSliceController{ Client: ctx.Mgr.GetClient(), diff --git a/cmd/webhook/app/webhook.go b/cmd/webhook/app/webhook.go index d7af99409..e50ceeb3c 100644 --- a/cmd/webhook/app/webhook.go +++ b/cmd/webhook/app/webhook.go @@ -155,6 +155,7 @@ func Run(ctx context.Context, opts *options.Options) error { hookServer.Register("/validate-resourceinterpretercustomization", &webhook.Admission{Handler: &resourceinterpretercustomization.ValidatingAdmission{Client: hookManager.GetClient(), Decoder: decoder}}) hookServer.Register("/validate-multiclusteringress", &webhook.Admission{Handler: &multiclusteringress.ValidatingAdmission{Decoder: decoder}}) hookServer.Register("/validate-multiclusterservice", &webhook.Admission{Handler: &multiclusterservice.ValidatingAdmission{Decoder: decoder}}) + hookServer.Register("/mutate-multiclusterservice", &webhook.Admission{Handler: &multiclusterservice.MutatingAdmission{Decoder: decoder}}) hookServer.Register("/mutate-federatedhpa", &webhook.Admission{Handler: &federatedhpa.MutatingAdmission{Decoder: decoder}}) hookServer.Register("/validate-resourcedeletionprotection", &webhook.Admission{Handler: 
&resourcedeletionprotection.ValidatingAdmission{Decoder: decoder}}) hookServer.WebhookMux().Handle("/readyz/", http.StripPrefix("/readyz/", &healthz.Handler{})) diff --git a/pkg/apis/networking/v1alpha1/service_types.go b/pkg/apis/networking/v1alpha1/service_types.go index 01abb9749..aac7272a8 100644 --- a/pkg/apis/networking/v1alpha1/service_types.go +++ b/pkg/apis/networking/v1alpha1/service_types.go @@ -32,6 +32,8 @@ const ( ResourceNamespaceScopedMultiClusterService = true // MCSServiceAppliedConditionType is indicates the condition type of mcs service applied. MCSServiceAppliedConditionType = "ServiceApplied" + // EndpointSliceDispatched indicates whether the EndpointSlice is dispatched to consumption clusters + EndpointSliceDispatched string = "EndpointSliceDispatched" ) // +genclient diff --git a/pkg/apis/networking/v1alpha1/well_known_constants.go b/pkg/apis/networking/v1alpha1/well_known_constants.go index 57cf51c4b..b037b7942 100644 --- a/pkg/apis/networking/v1alpha1/well_known_constants.go +++ b/pkg/apis/networking/v1alpha1/well_known_constants.go @@ -1,9 +1,12 @@ /* Copyright 2023 The Karmada Authors. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go index 49b1dc9f3..0ca2d63e0 100644 --- a/pkg/controllers/mcs/service_export_controller.go +++ b/pkg/controllers/mcs/service_export_controller.go @@ -99,7 +99,7 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, req controllerr return controllerruntime.Result{}, nil } - if !isWorkContains(work.Spec.Workload.Manifests, serviceExportGVK) { + if !helper.IsWorkContains(work.Spec.Workload.Manifests, serviceExportGVK) { return controllerruntime.Result{}, nil } @@ -123,23 +123,6 @@ func (c *ServiceExportController) Reconcile(ctx context.Context, req controllerr return c.buildResourceInformers(cluster) } -// isWorkContains checks if the target resource exists in a work.spec.workload.manifests. -func isWorkContains(manifests []workv1alpha1.Manifest, targetResource schema.GroupVersionKind) bool { - for index := range manifests { - workload := &unstructured.Unstructured{} - err := workload.UnmarshalJSON(manifests[index].Raw) - if err != nil { - klog.Errorf("Failed to unmarshal work manifests index %d, error is: %v", index, err) - continue - } - - if targetResource == workload.GroupVersionKind() { - return true - } - } - return false -} - // SetupWithManager creates a controller and register to controller manager. func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager) error { return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)).Complete(c) diff --git a/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go b/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go new file mode 100644 index 000000000..9bd24b569 --- /dev/null +++ b/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go @@ -0,0 +1,469 @@ +/* +Copyright 2023 The Karmada Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiclusterservice + +import ( + "context" + "fmt" + "reflect" + "sync" + + discoveryv1 "k8s.io/api/discovery/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/fedinformer" + "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" + "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" + "github.com/karmada-io/karmada/pkg/util/helper" + "github.com/karmada-io/karmada/pkg/util/names" +) + +// EndpointSliceCollectControllerName is the controller name that will be used when reporting events. +const EndpointSliceCollectControllerName = "endpointslice-collect-controller" + +type EndpointSliceCollectController struct { + client.Client + RESTMapper meta.RESTMapper + StopChan <-chan struct{} + InformerManager genericmanager.MultiClusterInformerManager + WorkerNumber int // WorkerNumber is the number of worker goroutines + PredicateFunc predicate.Predicate // PredicateFunc is the function that filters events before enqueuing the keys. + ClusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error) + // eventHandlers holds the handlers which used to handle events reported from member clusters. + // Each handler takes the cluster name as key and takes the handler function as the value, e.g. + // "member1": instance of ResourceEventHandler + eventHandlers sync.Map + worker util.AsyncWorker // worker process resources periodic from rateLimitingQueue. + + ClusterCacheSyncTimeout metav1.Duration +} + +var ( + endpointSliceGVR = discoveryv1.SchemeGroupVersion.WithResource("endpointslices") + multiClusterServiceGVK = networkingv1alpha1.SchemeGroupVersion.WithKind("MultiClusterService") +) + +// Reconcile performs a full reconciliation for the object referred to by the Request. 
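+// For a Work that wraps a MultiClusterService, it makes sure informers are running
+// against the owning member cluster, collects the matching EndpointSlices into Works,
+// and, when the Work is deleting, removes the EndpointSlice Works reported earlier.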
+func (c *EndpointSliceCollectController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { + klog.V(4).Infof("Reconciling Work %s", req.NamespacedName.String()) + + work := &workv1alpha1.Work{} + if err := c.Client.Get(ctx, req.NamespacedName, work); err != nil { + if apierrors.IsNotFound(err) { + return controllerruntime.Result{}, nil + } + return controllerruntime.Result{Requeue: true}, err + } + + if !helper.IsWorkContains(work.Spec.Workload.Manifests, multiClusterServiceGVK) { + return controllerruntime.Result{}, nil + } + + clusterName, err := names.GetClusterName(work.Namespace) + if err != nil { + klog.Errorf("Failed to get cluster name for work %s/%s", work.Namespace, work.Name) + return controllerruntime.Result{Requeue: true}, err + } + + if !work.DeletionTimestamp.IsZero() { + if err := c.cleanWorkWithMCSDelete(work); err != nil { + return controllerruntime.Result{Requeue: true}, err + } + return controllerruntime.Result{}, nil + } + + if err = c.buildResourceInformers(ctx, work, clusterName); err != nil { + return controllerruntime.Result{Requeue: true}, err + } + + if err = c.collectTargetEndpointSlice(work, clusterName); err != nil { + return controllerruntime.Result{Requeue: true}, err + } + + return controllerruntime.Result{}, nil +} + +// SetupWithManager creates a controller and register to controller manager. +func (c *EndpointSliceCollectController) SetupWithManager(mgr controllerruntime.Manager) error { + return controllerruntime.NewControllerManagedBy(mgr). + For(&workv1alpha1.Work{}, builder.WithPredicates(c.PredicateFunc)).Complete(c) +} + +func (c *EndpointSliceCollectController) cleanWorkWithMCSDelete(work *workv1alpha1.Work) error { + workList := &workv1alpha1.WorkList{} + if err := c.List(context.TODO(), workList, &client.ListOptions{ + Namespace: work.Namespace, + LabelSelector: labels.SelectorFromSet(labels.Set{ + util.ServiceNameLabel: util.GetLabelValue(work.Labels, util.ServiceNameLabel), + util.ServiceNamespaceLabel: util.GetLabelValue(work.Labels, util.ServiceNamespaceLabel), + }), + }); err != nil { + klog.Errorf("Failed to list workList reported by work(MultiClusterService)(%s/%s): %v", work.Namespace, work.Name, err) + return err + } + + var errs []error + for _, work := range workList.Items { + if !helper.IsWorkContains(work.Spec.Workload.Manifests, endpointSliceGVK) { + continue + } + if err := c.Delete(context.TODO(), work.DeepCopy()); err != nil { + klog.Errorf("Failed to delete work(%s/%s), Error: %v", work.Namespace, work.Name, err) + errs = append(errs, err) + } + } + if err := utilerrors.NewAggregate(errs); err != nil { + return err + } + + if controllerutil.RemoveFinalizer(work, util.MCSEndpointSliceCollectControllerFinalizer) { + if err := c.Client.Update(context.Background(), work); err != nil { + klog.Errorf("Failed to remove finalizer %s for work %s/%s: %v", + util.MCSEndpointSliceCollectControllerFinalizer, work.Namespace, work.Name, err) + return err + } + } + + return nil +} + +// RunWorkQueue initializes worker and run it, worker will process resource asynchronously. 
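+// Keys are enqueued by the informer event handlers below as keys.FederatedKey values
+// and consumed by WorkerNumber goroutines running syncEndpointSlice.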
+func (c *EndpointSliceCollectController) RunWorkQueue() { + workerOptions := util.Options{ + Name: "endpointslice-collect", + KeyFunc: nil, + ReconcileFunc: c.syncEndpointSlice, + } + c.worker = util.NewAsyncWorker(workerOptions) + c.worker.Run(c.WorkerNumber, c.StopChan) +} + +func (c *EndpointSliceCollectController) syncEndpointSlice(key util.QueueKey) error { + fedKey, ok := key.(keys.FederatedKey) + if !ok { + klog.Errorf("Failed to sync endpointslice as invalid key: %v", key) + return fmt.Errorf("invalid key") + } + + klog.V(4).Infof("Begin to sync %s %s.", fedKey.Kind, fedKey.NamespaceKey()) + if err := c.handleEndpointSliceEvent(fedKey); err != nil { + klog.Errorf("Failed to handle endpointSlice(%s) event, Error: %v", + fedKey.NamespaceKey(), err) + return err + } + + return nil +} + +func (c *EndpointSliceCollectController) buildResourceInformers(ctx context.Context, work *workv1alpha1.Work, clusterName string) error { + cluster, err := util.GetCluster(c.Client, clusterName) + if err != nil { + klog.Errorf("Failed to get the given member cluster %s", clusterName) + return err + } + + if !util.IsClusterReady(&cluster.Status) { + klog.Errorf("Stop sync endpointslice for cluster(%s) as cluster not ready.", cluster.Name) + return fmt.Errorf("cluster(%s) not ready", cluster.Name) + } + + if err := c.registerInformersAndStart(cluster); err != nil { + klog.Errorf("Failed to register informer for Cluster %s. Error: %v.", cluster.Name, err) + return err + } + + if controllerutil.AddFinalizer(work, util.MCSEndpointSliceCollectControllerFinalizer) { + if err := c.Client.Update(ctx, work); err != nil { + klog.Errorf("Failed to add finalizer %s for work %s/%s: %v", util.MCSEndpointSliceCollectControllerFinalizer, work.Namespace, work.Name, err) + return err + } + } + + return nil +} + +// registerInformersAndStart builds informer manager for cluster if it doesn't exist, then constructs informers for gvr +// and start it. 
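+// Informer managers are cached per cluster; repeated calls only attach the event
+// handler when it is missing or the informer has not synced yet, and a failed cache
+// sync stops the cluster's informer manager again.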
+func (c *EndpointSliceCollectController) registerInformersAndStart(cluster *clusterv1alpha1.Cluster) error { + singleClusterInformerManager := c.InformerManager.GetSingleClusterManager(cluster.Name) + if singleClusterInformerManager == nil { + dynamicClusterClient, err := c.ClusterDynamicClientSetFunc(cluster.Name, c.Client) + if err != nil { + klog.Errorf("Failed to build dynamic cluster client for cluster %s.", cluster.Name) + return err + } + singleClusterInformerManager = c.InformerManager.ForCluster(dynamicClusterClient.ClusterName, dynamicClusterClient.DynamicClientSet, 0) + } + + gvrTargets := []schema.GroupVersionResource{ + endpointSliceGVR, + } + + allSynced := true + for _, gvr := range gvrTargets { + if !singleClusterInformerManager.IsInformerSynced(gvr) || !singleClusterInformerManager.IsHandlerExist(gvr, c.getEventHandler(cluster.Name)) { + allSynced = false + singleClusterInformerManager.ForResource(gvr, c.getEventHandler(cluster.Name)) + } + } + if allSynced { + return nil + } + + c.InformerManager.Start(cluster.Name) + + if err := func() error { + synced := c.InformerManager.WaitForCacheSyncWithTimeout(cluster.Name, c.ClusterCacheSyncTimeout.Duration) + if synced == nil { + return fmt.Errorf("no informerFactory for cluster %s exist", cluster.Name) + } + for _, gvr := range gvrTargets { + if !synced[gvr] { + return fmt.Errorf("informer for %s hasn't synced", gvr) + } + } + return nil + }(); err != nil { + klog.Errorf("Failed to sync cache for cluster: %s, error: %v", cluster.Name, err) + c.InformerManager.Stop(cluster.Name) + return err + } + + return nil +} + +// getEventHandler return callback function that knows how to handle events from the member cluster. +func (c *EndpointSliceCollectController) getEventHandler(clusterName string) cache.ResourceEventHandler { + if value, exists := c.eventHandlers.Load(clusterName); exists { + return value.(cache.ResourceEventHandler) + } + + eventHandler := fedinformer.NewHandlerOnEvents(c.genHandlerAddFunc(clusterName), c.genHandlerUpdateFunc(clusterName), + c.genHandlerDeleteFunc(clusterName)) + c.eventHandlers.Store(clusterName, eventHandler) + return eventHandler +} + +func (c *EndpointSliceCollectController) genHandlerAddFunc(clusterName string) func(obj interface{}) { + return func(obj interface{}) { + curObj := obj.(runtime.Object) + key, err := keys.FederatedKeyFunc(clusterName, curObj) + if err != nil { + klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind()) + return + } + c.worker.Add(key) + } +} + +func (c *EndpointSliceCollectController) genHandlerUpdateFunc(clusterName string) func(oldObj, newObj interface{}) { + return func(oldObj, newObj interface{}) { + curObj := newObj.(runtime.Object) + if !reflect.DeepEqual(oldObj, newObj) { + key, err := keys.FederatedKeyFunc(clusterName, curObj) + if err != nil { + klog.Warningf("Failed to generate key for obj: %s", curObj.GetObjectKind().GroupVersionKind()) + return + } + c.worker.Add(key) + } + } +} + +func (c *EndpointSliceCollectController) genHandlerDeleteFunc(clusterName string) func(obj interface{}) { + return func(obj interface{}) { + if deleted, ok := obj.(cache.DeletedFinalStateUnknown); ok { + // This object might be stale but ok for our current usage. 
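+			// cache.DeletedFinalStateUnknown is a tombstone delivered when the watch
+			// missed the real delete event; its Obj field holds the last known state.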
+ obj = deleted.Obj + if obj == nil { + return + } + } + oldObj := obj.(runtime.Object) + key, err := keys.FederatedKeyFunc(clusterName, oldObj) + if err != nil { + klog.Warningf("Failed to generate key for obj: %s", oldObj.GetObjectKind().GroupVersionKind()) + return + } + c.worker.Add(key) + } +} + +// handleEndpointSliceEvent syncs EndPointSlice objects to control-plane according to EndpointSlice event. +// For EndpointSlice create or update event, reports the EndpointSlice when referencing service has been exported. +// For EndpointSlice delete event, cleanup the previously reported EndpointSlice. +func (c *EndpointSliceCollectController) handleEndpointSliceEvent(endpointSliceKey keys.FederatedKey) error { + endpointSliceObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, endpointSliceKey) + if err != nil { + if apierrors.IsNotFound(err) { + return cleanupWorkWithEndpointSliceDelete(c.Client, endpointSliceKey) + } + return err + } + + if util.GetLabelValue(endpointSliceObj.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceControllerLabelValue { + return nil + } + + workList := &workv1alpha1.WorkList{} + if err := c.Client.List(context.TODO(), workList, &client.ListOptions{ + Namespace: names.GenerateExecutionSpaceName(endpointSliceKey.Cluster), + LabelSelector: labels.SelectorFromSet(labels.Set{ + util.ServiceNamespaceLabel: endpointSliceKey.Namespace, + util.ServiceNameLabel: util.GetLabelValue(endpointSliceObj.GetLabels(), discoveryv1.LabelServiceName), + })}); err != nil { + klog.Errorf("Failed to list workList reported by endpointSlice(%s/%s), error: %v", endpointSliceKey.Namespace, endpointSliceKey.Name, err) + return err + } + + mcsExists := false + for _, work := range workList.Items { + if helper.IsWorkContains(work.Spec.Workload.Manifests, multiClusterServiceGVK) { + mcsExists = true + break + } + } + if !mcsExists { + return nil + } + + if err = c.reportEndpointSliceWithEndpointSliceCreateOrUpdate(endpointSliceKey.Cluster, endpointSliceObj); err != nil { + klog.Errorf("Failed to handle endpointSlice(%s) event, Error: %v", + endpointSliceKey.NamespaceKey(), err) + return err + } + + return nil +} + +func (c *EndpointSliceCollectController) collectTargetEndpointSlice(work *workv1alpha1.Work, clusterName string) error { + manager := c.InformerManager.GetSingleClusterManager(clusterName) + if manager == nil { + err := fmt.Errorf("failed to get informer manager for cluster %s", clusterName) + klog.Errorf("%v", err) + return err + } + + svcNamespace := util.GetLabelValue(work.Labels, util.ServiceNamespaceLabel) + svcName := util.GetLabelValue(work.Labels, util.ServiceNameLabel) + selector := labels.SelectorFromSet(labels.Set{ + discoveryv1.LabelServiceName: svcName, + }) + epsList, err := manager.Lister(discoveryv1.SchemeGroupVersion.WithResource("endpointslices")).ByNamespace(svcNamespace).List(selector) + if err != nil { + klog.Errorf("Failed to list EndpointSlice for Service(%s/%s) in cluster(%s), Error: %v", svcNamespace, svcName, clusterName, err) + return err + } + for _, epsObj := range epsList { + eps := &discoveryv1.EndpointSlice{} + if err = helper.ConvertToTypedObject(epsObj, eps); err != nil { + klog.Errorf("Failed to convert object to EndpointSlice, error: %v", err) + return err + } + if util.GetLabelValue(eps.GetLabels(), discoveryv1.LabelManagedBy) == util.EndpointSliceControllerLabelValue { + continue + } + epsUnstructured, err := helper.ToUnstructured(eps) + if err != nil { + klog.Errorf("Failed to convert EndpointSlice %s/%s to 
unstructured, error: %v", eps.GetNamespace(), eps.GetName(), err) + return err + } + if err := c.reportEndpointSliceWithEndpointSliceCreateOrUpdate(clusterName, epsUnstructured); err != nil { + return err + } + } + + return nil +} + +// reportEndpointSliceWithEndpointSliceCreateOrUpdate reports the EndpointSlice when referencing service has been exported. +func (c *EndpointSliceCollectController) reportEndpointSliceWithEndpointSliceCreateOrUpdate(clusterName string, endpointSlice *unstructured.Unstructured) error { + if err := reportEndpointSlice(c.Client, endpointSlice, clusterName); err != nil { + return fmt.Errorf("failed to report EndpointSlice(%s/%s) from cluster(%s) to control-plane", + endpointSlice.GetNamespace(), endpointSlice.GetName(), clusterName) + } + + return nil +} + +// reportEndpointSlice report EndPointSlice objects to control-plane. +func reportEndpointSlice(c client.Client, endpointSlice *unstructured.Unstructured, clusterName string) error { + executionSpace := names.GenerateExecutionSpaceName(clusterName) + + workMeta := metav1.ObjectMeta{ + // Karmada will synchronize this work to other cluster namespaces and add the cluster name to prevent conflicts. + Name: names.GenerateMCSWorkName(endpointSlice.GetKind(), endpointSlice.GetName(), endpointSlice.GetNamespace(), clusterName), + Namespace: executionSpace, + Labels: map[string]string{ + util.ServiceNamespaceLabel: endpointSlice.GetNamespace(), + util.ServiceNameLabel: endpointSlice.GetLabels()[discoveryv1.LabelServiceName], + // indicate the Work should be not propagated since it's collected resource. + util.PropagationInstruction: util.PropagationInstructionSuppressed, + util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue, + }, + } + + if err := helper.CreateOrUpdateWork(c, workMeta, endpointSlice); err != nil { + klog.Errorf("Failed to create or update work(%s/%s), Error: %v", workMeta.Namespace, workMeta.Name, err) + return err + } + + return nil +} + +func cleanupWorkWithEndpointSliceDelete(c client.Client, endpointSliceKey keys.FederatedKey) error { + executionSpace := names.GenerateExecutionSpaceName(endpointSliceKey.Cluster) + + workNamespaceKey := types.NamespacedName{ + Namespace: executionSpace, + Name: names.GenerateMCSWorkName(endpointSliceKey.Kind, endpointSliceKey.Name, endpointSliceKey.Namespace, endpointSliceKey.Cluster), + } + work := &workv1alpha1.Work{} + if err := c.Get(context.TODO(), workNamespaceKey, work); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + klog.Errorf("Failed to get work(%s) in executionSpace(%s): %v", workNamespaceKey.String(), executionSpace, err) + return err + } + + if err := c.Delete(context.TODO(), work); err != nil { + klog.Errorf("Failed to delete work(%s): %v", workNamespaceKey.String(), err) + return err + } + + return nil +} diff --git a/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go b/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go new file mode 100644 index 000000000..c6c3ac055 --- /dev/null +++ b/pkg/controllers/multiclusterservice/endpointslice_dispatch_controller.go @@ -0,0 +1,280 @@ +/* +Copyright 2023 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiclusterservice + +import ( + "context" + "strconv" + + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + "github.com/karmada-io/karmada/pkg/events" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" + "github.com/karmada-io/karmada/pkg/util/helper" + "github.com/karmada-io/karmada/pkg/util/names" +) + +// EndpointsliceDispatchControllerName is the controller name that will be used when reporting events. +const EndpointsliceDispatchControllerName = "endpointslice-dispatch-controller" + +var ( + endpointSliceGVK = discoveryv1.SchemeGroupVersion.WithKind("EndpointSlice") +) + +type EndpointsliceDispatchController struct { + client.Client + EventRecorder record.EventRecorder + RESTMapper meta.RESTMapper + InformerManager genericmanager.MultiClusterInformerManager +} + +// Reconcile performs a full reconciliation for the object referred to by the Request. 
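+// It watches Works that carry collected EndpointSlices and mirrors each of them, as a
+// renamed EndpointSlice Work, into every consumption cluster except the source cluster.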
+func (c *EndpointsliceDispatchController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { + klog.V(4).Infof("Reconciling Work %s", req.NamespacedName.String()) + + work := &workv1alpha1.Work{} + if err := c.Client.Get(ctx, req.NamespacedName, work); err != nil { + if apierrors.IsNotFound(err) { + return controllerruntime.Result{}, nil + } + return controllerruntime.Result{Requeue: true}, err + } + + if !helper.IsWorkContains(work.Spec.Workload.Manifests, endpointSliceGVK) { + return controllerruntime.Result{}, nil + } + + if !work.DeletionTimestamp.IsZero() { + if err := c.cleanupEndpointSliceFromConsumerClusters(ctx, work); err != nil { + klog.Errorf("Failed to cleanup EndpointSlice from consumer clusters for work %s/%s", work.Namespace, work.Name) + return controllerruntime.Result{Requeue: true}, err + } + return controllerruntime.Result{}, nil + } + + mcsName := util.GetLabelValue(work.Labels, util.ServiceNameLabel) + mcsNS := util.GetLabelValue(work.Labels, util.ServiceNamespaceLabel) + mcs := &networkingv1alpha1.MultiClusterService{} + if err := c.Client.Get(ctx, types.NamespacedName{Namespace: mcsNS, Name: mcsName}, mcs); err != nil { + if apierrors.IsNotFound(err) { + klog.Warningf("MultiClusterService %s/%s is not found", mcsNS, mcsName) + return controllerruntime.Result{}, nil + } + return controllerruntime.Result{Requeue: true}, err + } + + var err error + defer func() { + if err != nil { + _ = c.updateEndpointSliceSynced(mcs, metav1.ConditionFalse, "EndpointSliceSyncFailed", err.Error()) + c.EventRecorder.Eventf(mcs, corev1.EventTypeWarning, events.EventReasonDispatchEndpointSliceFailed, err.Error()) + return + } + _ = c.updateEndpointSliceSynced(mcs, metav1.ConditionTrue, "EndpointSliceSyncSucceed", "EndpointSlice are synced successfully") + c.EventRecorder.Eventf(mcs, corev1.EventTypeNormal, events.EventReasonDispatchEndpointSliceSucceed, "EndpointSlice are synced successfully") + }() + + if err = c.syncEndpointSlice(ctx, work.DeepCopy(), mcs); err != nil { + return controllerruntime.Result{Requeue: true}, err + } + + return controllerruntime.Result{}, nil +} + +func (c *EndpointsliceDispatchController) updateEndpointSliceSynced(mcs *networkingv1alpha1.MultiClusterService, status metav1.ConditionStatus, reason, message string) error { + EndpointSliceCollected := metav1.Condition{ + Type: networkingv1alpha1.EndpointSliceDispatched, + Status: status, + Reason: reason, + Message: message, + LastTransitionTime: metav1.Now(), + } + + return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { + meta.SetStatusCondition(&mcs.Status.Conditions, EndpointSliceCollected) + updateErr := c.Status().Update(context.TODO(), mcs) + if updateErr == nil { + return nil + } + updated := &networkingv1alpha1.MultiClusterService{} + if err = c.Get(context.TODO(), client.ObjectKey{Namespace: mcs.Namespace, Name: mcs.Name}, updated); err == nil { + mcs = updated + } else { + klog.Errorf("Failed to get updated MultiClusterService %s/%s: %v", mcs.Namespace, mcs.Name, err) + } + return updateErr + }) +} + +// SetupWithManager creates a controller and register to controller manager. 
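+// Only Work objects labeled with the service name are watched; generic events are dropped.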
+func (c *EndpointsliceDispatchController) SetupWithManager(mgr controllerruntime.Manager) error { + workPredicateFun := predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + return util.GetLabelValue(createEvent.Object.GetLabels(), util.ServiceNameLabel) != "" + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + return util.GetLabelValue(updateEvent.ObjectNew.GetLabels(), util.ServiceNameLabel) != "" + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + return util.GetLabelValue(deleteEvent.Object.GetLabels(), util.ServiceNameLabel) != "" + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + return false + }, + } + return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.Work{}, builder.WithPredicates(workPredicateFun)).Complete(c) +} + +func (c *EndpointsliceDispatchController) syncEndpointSlice(ctx context.Context, work *workv1alpha1.Work, mcs *networkingv1alpha1.MultiClusterService) error { + epsSourceCluster, err := names.GetClusterName(work.Namespace) + if err != nil { + klog.Errorf("Failed to get EndpointSlice source cluster name for work %s/%s", work.Namespace, work.Name) + return err + } + + consumptionClusters := sets.New[string](mcs.Spec.ServiceConsumptionClusters...) + if len(consumptionClusters) == 0 { + consumptionClusters, err = util.GetClusterSet(c.Client) + if err != nil { + klog.Errorf("Failed to get cluster set, error is: %v", err) + return err + } + } + for clusterName := range consumptionClusters { + if clusterName == epsSourceCluster { + continue + } + + // It couldn't happen here + if len(work.Spec.Workload.Manifests) == 0 { + continue + } + + // There should be only one manifest in the work, let's use the first one. + manifest := work.Spec.Workload.Manifests[0] + unstructuredObj := &unstructured.Unstructured{} + if err := unstructuredObj.UnmarshalJSON(manifest.Raw); err != nil { + klog.Errorf("Failed to unmarshal work manifest, error is: %v", err) + return err + } + + endpointSlice := &discoveryv1.EndpointSlice{} + if err := helper.ConvertToTypedObject(unstructuredObj, endpointSlice); err != nil { + klog.Errorf("Failed to convert unstructured object to typed object, error is: %v", err) + return err + } + + // Use this name to avoid naming conflicts and locate the EPS source cluster. 
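+		// e.g. an EndpointSlice "nginx-7f4b9" collected from cluster "member1" is
+		// dispatched to consumer clusters as "member1-nginx-7f4b9".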
+ endpointSlice.Name = epsSourceCluster + "-" + endpointSlice.Name + clusterNamespace := names.GenerateExecutionSpaceName(clusterName) + endpointSlice.Labels = map[string]string{ + discoveryv1.LabelServiceName: mcs.Name, + workv1alpha1.WorkNamespaceLabel: clusterNamespace, + workv1alpha1.WorkNameLabel: work.Name, + util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue, + discoveryv1.LabelManagedBy: util.EndpointSliceControllerLabelValue, + } + endpointSlice.Annotations = map[string]string{ + // This annotation is used to identify the source cluster of EndpointSlice and whether the eps are the newest version + util.EndpointSliceProvisionClusterAnnotation: epsSourceCluster, + util.EndPointSliceProvisionGenerationAnnotation: strconv.FormatInt(endpointSlice.Generation, 10), + } + + workMeta := metav1.ObjectMeta{ + Name: work.Name, + Namespace: clusterNamespace, + Finalizers: []string{util.ExecutionControllerFinalizer}, + Annotations: map[string]string{ + util.EndpointSliceProvisionClusterAnnotation: epsSourceCluster, + }, + Labels: map[string]string{ + util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue, + }, + } + unstructuredEPS, err := helper.ToUnstructured(endpointSlice) + if err != nil { + klog.Errorf("Failed to convert typed object to unstructured object, error is: %v", err) + return err + } + if err := helper.CreateOrUpdateWork(c.Client, workMeta, unstructuredEPS); err != nil { + klog.Errorf("Failed to sync EndpointSlice %s/%s from %s to cluster %s:%v", + work.GetNamespace(), work.GetName(), epsSourceCluster, clusterName, err) + return err + } + } + + if controllerutil.AddFinalizer(work, util.MCSEndpointSliceDispatchControllerFinalizer) { + if err := c.Client.Update(ctx, work); err != nil { + klog.Errorf("Failed to add finalizer %s for work %s/%s:%v", util.MCSEndpointSliceDispatchControllerFinalizer, work.Namespace, work.Name, err) + return err + } + } + + return nil +} + +func (c *EndpointsliceDispatchController) cleanupEndpointSliceFromConsumerClusters(ctx context.Context, work *workv1alpha1.Work) error { + // TBD: There may be a better way without listing all works. 
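+	// Dispatched Works reuse the source Work's name but live in the consumer clusters'
+	// execution namespaces, so they are matched below by name plus the provision-cluster
+	// annotation.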
+	workList := &workv1alpha1.WorkList{}
+	err := c.Client.List(ctx, workList)
+	if err != nil {
+		klog.Errorf("Failed to list works, error: %v", err)
+		return err
+	}
+
+	epsSourceCluster, err := names.GetClusterName(work.Namespace)
+	if err != nil {
+		klog.Errorf("Failed to get EndpointSlice provision cluster name for work %s/%s", work.Namespace, work.Name)
+		return err
+	}
+	for _, item := range workList.Items {
+		if item.Name != work.Name || util.GetAnnotationValue(item.Annotations, util.EndpointSliceProvisionClusterAnnotation) != epsSourceCluster {
+			continue
+		}
+		if err := c.Client.Delete(ctx, item.DeepCopy()); err != nil {
+			return err
+		}
+	}
+
+	if controllerutil.RemoveFinalizer(work, util.MCSEndpointSliceDispatchControllerFinalizer) {
+		if err := c.Client.Update(ctx, work); err != nil {
+			klog.Errorf("Failed to remove %s finalizer for work %s/%s: %v", util.MCSEndpointSliceDispatchControllerFinalizer, work.Namespace, work.Name, err)
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/controllers/multiclusterservice/mcs_controller.go b/pkg/controllers/multiclusterservice/mcs_controller.go
index 63abc4b69..e274e44eb 100644
--- a/pkg/controllers/multiclusterservice/mcs_controller.go
+++ b/pkg/controllers/multiclusterservice/mcs_controller.go
@@ -19,9 +19,7 @@ package multiclusterservice
 import (
 	"context"
 	"fmt"
-	"strings"
 
-	"github.com/google/uuid"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -65,6 +63,10 @@ type MCSController struct {
 	RateLimiterOptions ratelimiterflag.Options
 }
 
+var (
+	serviceGVK = corev1.SchemeGroupVersion.WithKind("Service")
+)
+
 // Reconcile performs a full reconciliation for the object referred to by the Request.
 // The Controller will requeue the Request to be processed again if an error is non-nil or
 // Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
@@ -110,6 +112,12 @@ func (c *MCSController) handleMCSDelete(ctx context.Context, mcs *networkingv1al return controllerruntime.Result{}, err } + if err := c.deleteMultiClusterServiceWork(mcs, true); err != nil { + c.EventRecorder.Event(mcs, corev1.EventTypeWarning, events.EventReasonSyncServiceWorkFailed, + fmt.Sprintf("failed to delete MultiClusterService work :%v", err)) + return controllerruntime.Result{}, err + } + finalizersUpdated := controllerutil.RemoveFinalizer(mcs, util.MCSControllerFinalizer) if finalizersUpdated { err := c.Client.Update(ctx, mcs) @@ -125,27 +133,31 @@ func (c *MCSController) handleMCSDelete(ctx context.Context, mcs *networkingv1al } func (c *MCSController) deleteServiceWork(mcs *networkingv1alpha1.MultiClusterService, retainClusters sets.Set[string]) error { - mcsID, err := c.getMultiClusterServiceID(mcs) - if err != nil { - klog.Errorf("Get MultiClusterService(%s/%s) ID error:%v", mcs.Namespace, mcs.Name, err) - return err - } - - workList, err := helper.GetWorksByLabelsSet(c, labels.Set{networkingv1alpha1.MultiClusterServicePermanentIDLabel: mcsID}) + mcsID := util.GetLabelValue(mcs.Labels, networkingv1alpha1.MultiClusterServicePermanentIDLabel) + workList, err := helper.GetWorksByLabelsSet(c, labels.Set{ + networkingv1alpha1.MultiClusterServicePermanentIDLabel: mcsID, + }) if err != nil { klog.ErrorS(err, "failed to get work", "namespace", mcs.Namespace, "name", mcs.Name) return err } - for index := range workList.Items { - clusterName := strings.TrimPrefix(workList.Items[index].Namespace, names.ExecutionSpacePrefix) + for _, work := range workList.Items { + if !helper.IsWorkContains(work.Spec.Workload.Manifests, serviceGVK) { + continue + } + clusterName, err := names.GetClusterName(work.Namespace) + if err != nil { + klog.Errorf("Failed to get member cluster name for work %s/%s", work.Namespace, work.Name) + continue + } if retainClusters.Has(clusterName) { continue } - if err = c.Client.Delete(context.TODO(), &workList.Items[index]); err != nil && !apierrors.IsNotFound(err) { - klog.Errorf("Error while updating work(%s/%s) deletion timestamp: %s", - workList.Items[index].Namespace, workList.Items[index].Name, err) + if err = c.Client.Delete(context.TODO(), work.DeepCopy()); err != nil && !apierrors.IsNotFound(err) { + klog.Errorf("Error while deleting work(%s/%s) deletion timestamp: %s", + work.Namespace, work.Name, err) return err } } @@ -154,13 +166,58 @@ func (c *MCSController) deleteServiceWork(mcs *networkingv1alpha1.MultiClusterSe return nil } +func (c *MCSController) deleteMultiClusterServiceWork(mcs *networkingv1alpha1.MultiClusterService, deleteAll bool) error { + mcsID := util.GetLabelValue(mcs.Labels, networkingv1alpha1.MultiClusterServicePermanentIDLabel) + workList, err := helper.GetWorksByLabelsSet(c, labels.Set{ + networkingv1alpha1.MultiClusterServicePermanentIDLabel: mcsID, + }) + if err != nil { + klog.Errorf("Failed to list work by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err) + return err + } + + provisionClusters, err := helper.GetProvisionClusters(c.Client, mcs) + if err != nil { + klog.Errorf("Failed to get provision clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err) + return err + } + + for _, work := range workList.Items { + if !helper.IsWorkContains(work.Spec.Workload.Manifests, multiClusterServiceGVK) { + continue + } + clusterName, err := names.GetClusterName(work.Namespace) + if err != nil { + klog.Errorf("Failed to get member cluster name for work %s/%s:%v", work.Namespace, work.Name, 
err)
+			continue
+		}
+		if !deleteAll && provisionClusters.Has(clusterName) {
+			continue
+		}
+
+		if err = c.Client.Delete(context.TODO(), work.DeepCopy()); err != nil && !apierrors.IsNotFound(err) {
+			klog.Errorf("Failed to delete work(%s/%s): %s",
+				work.Namespace, work.Name, err)
+			return err
+		}
+	}
+
+	klog.V(4).InfoS("Successfully deleted MultiClusterService work", "namespace", mcs.Namespace, "name", mcs.Name)
+	return nil
+}
+
 func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
 	klog.V(4).InfoS("Begin to handle MultiClusterService create or update event",
 		"namespace", mcs.Namespace, "name", mcs.Name)
 
 	// 1. if mcs not contain CrossCluster type, delete service work if needed
 	if !helper.MultiClusterServiceCrossClusterEnabled(mcs) {
-		return c.deleteServiceWork(mcs, sets.New[string]())
+		if err := c.deleteServiceWork(mcs, sets.New[string]()); err != nil {
+			return err
+		}
+		if err := c.deleteMultiClusterServiceWork(mcs, true); err != nil {
+			return err
+		}
 	}
 
 	// 2. add finalizer if needed
@@ -173,7 +230,12 @@
 		}
 	}
 
-	// 3. make sure service exist
+	// 3. Generate the MCS work
+	if err := c.ensureMultiClusterServiceWork(ctx, mcs); err != nil {
+		return err
+	}
+
+	// 4. make sure service exists
 	svc := &corev1.Service{}
 	err := c.Client.Get(ctx, types.NamespacedName{Namespace: mcs.Namespace, Name: mcs.Name}, svc)
 	if err != nil && !apierrors.IsNotFound(err) {
@@ -181,7 +243,7 @@
 		return err
 	}
 
-	// 4. if service not exist, delete service work if needed
+	// 5. if service does not exist, delete service work if needed
 	if apierrors.IsNotFound(err) {
 		delErr := c.deleteServiceWork(mcs, sets.New[string]())
 		if delErr != nil {
@@ -192,13 +254,18 @@
 		return err
 	}
 
-	// 5. if service exist, create or update corresponding work in clusters
+	// 6. if service exists, create or update corresponding work in clusters
 	syncClusters, err := c.syncSVCWorkToClusters(ctx, mcs, svc)
 	if err != nil {
 		return err
 	}
 
-	// 6. delete service work not in need sync clusters
+	// 7. delete MultiClusterService work not in provision clusters
+	if err = c.deleteMultiClusterServiceWork(mcs, false); err != nil {
+		return err
+	}
+
+	// 8. 
delete service work not in need sync clusters if err = c.deleteServiceWork(mcs, syncClusters); err != nil { return err } @@ -207,6 +274,42 @@ func (c *MCSController) handleMCSCreateOrUpdate(ctx context.Context, mcs *networ return nil } +func (c *MCSController) ensureMultiClusterServiceWork(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error { + provisionCluster, err := helper.GetProvisionClusters(c.Client, mcs) + if err != nil { + klog.Errorf("Failed to get provision clusters by MultiClusterService(%s/%s):%v", mcs.Namespace, mcs.Name, err) + return err + } + + for clusterName := range provisionCluster { + workMeta := metav1.ObjectMeta{ + Name: names.GenerateMCSWorkName(mcs.Kind, mcs.Name, mcs.Namespace, clusterName), + Namespace: names.GenerateExecutionSpaceName(clusterName), + Labels: map[string]string{ + // We add this id in mutating webhook, let's just use it + networkingv1alpha1.MultiClusterServicePermanentIDLabel: util.GetLabelValue(mcs.Labels, networkingv1alpha1.MultiClusterServicePermanentIDLabel), + util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue, + util.PropagationInstruction: util.PropagationInstructionSuppressed, + util.ServiceNamespaceLabel: mcs.Namespace, + util.ServiceNameLabel: mcs.Name, + }, + } + + mcsObj, err := helper.ToUnstructured(mcs) + if err != nil { + klog.Errorf("Failed to convert MultiClusterService(%s/%s) to unstructured object, err is %v", mcs.Namespace, mcs.Name, err) + return err + } + if err = helper.CreateOrUpdateWork(c, workMeta, mcsObj); err != nil { + klog.Errorf("Failed to create or update MultiClusterService(%s/%s) work in the given member cluster %s, err is %v", + mcs.Namespace, mcs.Name, clusterName, err) + return err + } + } + + return nil +} + func (c *MCSController) syncSVCWorkToClusters( ctx context.Context, mcs *networkingv1alpha1.MultiClusterService, @@ -235,20 +338,15 @@ func (c *MCSController) syncSVCWorkToClusters( return syncClusters, err } - mcsID, err := c.getMultiClusterServiceID(mcs) - if err != nil { - klog.Errorf("Get MultiClusterService(%s/%s) ID error:%v", mcs.Namespace, mcs.Name, err) - return syncClusters, err - } - var errs []error for clusterName := range syncClusters { workMeta := metav1.ObjectMeta{ - Name: names.GenerateWorkName(svc.Kind, svc.Name, clusterName+"/"+svc.Namespace), + Name: names.GenerateMCSWorkName(svc.Kind, svc.Name, svc.Namespace, clusterName), Namespace: names.GenerateExecutionSpaceName(clusterName), Finalizers: []string{util.ExecutionControllerFinalizer}, Labels: map[string]string{ - networkingv1alpha1.MultiClusterServicePermanentIDLabel: mcsID, + // We add this id in mutating webhook, let's just use it + networkingv1alpha1.MultiClusterServicePermanentIDLabel: util.GetLabelValue(mcs.Labels, networkingv1alpha1.MultiClusterServicePermanentIDLabel), util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue, }, } @@ -294,19 +392,6 @@ func (c *MCSController) updateMCSStatus(mcs *networkingv1alpha1.MultiClusterServ }) } -func (c *MCSController) getMultiClusterServiceID(mcs *networkingv1alpha1.MultiClusterService) (string, error) { - id := util.GetLabelValue(mcs.GetLabels(), networkingv1alpha1.MultiClusterServicePermanentIDLabel) - if id == "" { - id = uuid.New().String() - mcs.Labels = util.DedupeAndMergeLabels(mcs.Labels, map[string]string{networkingv1alpha1.MultiClusterServicePermanentIDLabel: id}) - if err := c.Client.Update(context.TODO(), mcs); err != nil { - return id, err - } - } - - return id, nil -} - // SetupWithManager creates a controller and register to controller 
manager.
 func (c *MCSController) SetupWithManager(mgr controllerruntime.Manager) error {
 	mcsPredicateFunc := predicate.Funcs{
diff --git a/pkg/events/events.go b/pkg/events/events.go
index eb7853818..b6ec5f55e 100644
--- a/pkg/events/events.go
+++ b/pkg/events/events.go
@@ -130,10 +130,14 @@ const (
 	EventReasonSyncDerivedServiceFailed = "SyncDerivedServiceFailed"
 )
 
-// Define events for MultiClusterService objects and their associated resources.
+// Define events for MultiClusterService objects with CrossCluster type.
 const (
 	// EventReasonSyncServiceWorkFailed is indicates that sync service work failed.
 	EventReasonSyncServiceWorkFailed string = "SyncServiceWorkFailed"
 	// EventReasonSyncServiceWorkSucceed is indicates that sync service work succeed.
 	EventReasonSyncServiceWorkSucceed string = "SyncServiceWorkSucceed"
+	// EventReasonDispatchEndpointSliceFailed indicates that dispatching the EndpointSlice failed.
+	EventReasonDispatchEndpointSliceFailed = "DispatchEndpointSliceFailed"
+	// EventReasonDispatchEndpointSliceSucceed indicates that dispatching the EndpointSlice succeeded.
+	EventReasonDispatchEndpointSliceSucceed = "DispatchEndpointSliceSucceed"
 )
diff --git a/pkg/util/cluster.go b/pkg/util/cluster.go
index f4ea1d3f0..22e8fa7a7 100644
--- a/pkg/util/cluster.go
+++ b/pkg/util/cluster.go
@@ -27,6 +27,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
@@ -102,6 +103,19 @@ func GetCluster(hostClient client.Client, clusterName string) (*clusterv1alpha1.
 	return cluster, nil
 }
 
+// GetClusterSet returns the set of names of all Clusters known to the control plane.
+func GetClusterSet(hostClient client.Client) (sets.Set[string], error) {
+	clusterList := &clusterv1alpha1.ClusterList{}
+	if err := hostClient.List(context.Background(), clusterList); err != nil {
+		return nil, err
+	}
+	clusterSet := sets.New[string]()
+	for _, cluster := range clusterList.Items {
+		clusterSet.Insert(cluster.Name)
+	}
+	return clusterSet, nil
+}
+
 // CreateClusterObject create cluster object in karmada control plane
 func CreateClusterObject(controlPlaneClient karmadaclientset.Interface, clusterObj *clusterv1alpha1.Cluster) (*clusterv1alpha1.Cluster, error) {
 	cluster, exist, err := GetClusterWithKarmadaClient(controlPlaneClient, clusterObj.Name)
diff --git a/pkg/util/constants.go b/pkg/util/constants.go
index f3caaa12e..71dfb1fc3 100644
--- a/pkg/util/constants.go
+++ b/pkg/util/constants.go
@@ -25,6 +25,12 @@ const (
 	// ServiceNameLabel is added to work object, which is report by member cluster, to specify service name associated with EndpointSlice.
 	ServiceNameLabel = "endpointslice.karmada.io/name"
 
+	// EndpointSliceProvisionClusterAnnotation is added to an EndpointSlice to record the cluster that provisions it.
+	EndpointSliceProvisionClusterAnnotation = "endpointslice.karmada.io/provision-cluster"
+
+	// EndPointSliceProvisionGenerationAnnotation is added to an EndpointSlice to record the generation of the source EndpointSlice in the provision cluster.
+	EndPointSliceProvisionGenerationAnnotation = "endpointslice.karmada.io/endpointslice-generation"
+
 	// PropagationInstruction is used to mark a resource(like Work) propagation instruction.
 	// Valid values includes:
 	// - suppressed: indicates that the resource should not be propagated.
@@ -45,6 +51,9 @@ const (
 	// ManagedByKarmadaLabelValue indicates that resources are managed by karmada controllers.
 	ManagedByKarmadaLabelValue = "true"
 
+	// EndpointSliceControllerLabelValue indicates that the EndpointSlice is controlled by the karmada-mcs-endpointslice-controller.
+	EndpointSliceControllerLabelValue = "karmada-mcs-endpointslice-controller"
+
 	// RetainReplicasLabel is a reserved label to indicate whether the replicas should be retained. e.g:
 	// resourcetemplate.karmada.io/retain-replicas: true   // with value `true` indicates retain
 	// resourcetemplate.karmada.io/retain-replicas: false  // with value `false` and others, indicates not retain
@@ -86,6 +95,12 @@ const (
 	// before ResourceBinding itself is deleted.
 	BindingControllerFinalizer = "karmada.io/binding-controller"
 
+	// MCSEndpointSliceCollectControllerFinalizer is added to the Work that delivers a MultiClusterService, to ensure
+	// the EndpointSlice Works collected from provision clusters are deleted before the Work itself is removed.
+	MCSEndpointSliceCollectControllerFinalizer = "karmada.io/mcs-endpointslice-collect-controller"
+
+	// MCSEndpointSliceDispatchControllerFinalizer is added to the Work that carries a collected EndpointSlice, to ensure
+	// the Works dispatched to consumption clusters are deleted before the Work itself is removed.
+	MCSEndpointSliceDispatchControllerFinalizer = "karmada.io/mcs-endpointslice-dispatch-controller"
+
 	// ClusterResourceBindingControllerFinalizer is added to ClusterResourceBinding to ensure related Works are deleted
 	// before ClusterResourceBinding itself is deleted.
 	ClusterResourceBindingControllerFinalizer = "karmada.io/cluster-resource-binding-controller"
diff --git a/pkg/util/helper/mcs.go b/pkg/util/helper/mcs.go
index 4bf3c5614..aeb627fe7 100644
--- a/pkg/util/helper/mcs.go
+++ b/pkg/util/helper/mcs.go
@@ -23,12 +23,14 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/util/retry"
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 
 	networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
+	"github.com/karmada-io/karmada/pkg/util"
 )
 
 // CreateOrUpdateEndpointSlice creates a EndpointSlice object if not exist, or updates if it already exists.
@@ -94,12 +96,25 @@ func DeleteEndpointSlice(c client.Client, selector labels.Set) error {
 	return errors.NewAggregate(errs)
 }
 
-// MultiClusterServiceCrossClusterEnabled checks weather the MultiClusterService contains CrossCluster type.
+// MultiClusterServiceCrossClusterEnabled checks whether the MultiClusterService contains the CrossCluster exposure type.
 func MultiClusterServiceCrossClusterEnabled(mcs *networkingv1alpha1.MultiClusterService) bool {
-	for _, t := range mcs.Spec.Types {
-		if t == networkingv1alpha1.ExposureTypeCrossCluster {
+	for _, svcType := range mcs.Spec.Types {
+		if svcType == networkingv1alpha1.ExposureTypeCrossCluster {
 			return true
 		}
 	}
+
 	return false
 }
+
+// GetProvisionClusters returns the clusters that should provision the Service; an empty
+// ServiceProvisionClusters field means all clusters.
+func GetProvisionClusters(client client.Client, mcs *networkingv1alpha1.MultiClusterService) (sets.Set[string], error) {
+	provisionClusters := sets.New[string](mcs.Spec.ServiceProvisionClusters...)
+	if len(provisionClusters) == 0 {
+		var err error
+		provisionClusters, err = util.GetClusterSet(client)
+		if err != nil {
+			klog.Errorf("Failed to get cluster set, Error: %v", err)
+			return nil, err
+		}
+	}
+	return provisionClusters, nil
+}
diff --git a/pkg/util/helper/predicate.go b/pkg/util/helper/predicate.go
index 2b0100f00..317deec80 100644
--- a/pkg/util/helper/predicate.go
+++ b/pkg/util/helper/predicate.go
@@ -114,6 +114,40 @@ func NewPredicateForServiceExportController(mgr controllerruntime.Manager) predi
 	}
 }
 
+// NewPredicateForEndpointSliceCollectController generates an event filter function for EndpointSliceCollectController running by karmada-controller-manager.
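+// Works are accepted only when the owning member cluster runs in Push mode; Pull-mode
+// clusters are handled by the agent-side variant NewPredicateForEndpointSliceCollectControllerOnAgent.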
+func NewPredicateForEndpointSliceCollectController(mgr controllerruntime.Manager) predicate.Funcs {
+	predFunc := func(eventType string, object client.Object) bool {
+		obj := object.(*workv1alpha1.Work)
+		clusterName, err := names.GetClusterName(obj.GetNamespace())
+		if err != nil {
+			klog.Errorf("Failed to get member cluster name for work %s/%s: %v", obj.GetNamespace(), obj.GetName(), err)
+			return false
+		}
+
+		clusterObj, err := util.GetCluster(mgr.GetClient(), clusterName)
+		if err != nil {
+			klog.Errorf("Failed to get the given member cluster %s: %v", clusterName, err)
+			return false
+		}
+		return clusterObj.Spec.SyncMode == clusterv1alpha1.Push
+	}
+
+	return predicate.Funcs{
+		CreateFunc: func(createEvent event.CreateEvent) bool {
+			return predFunc("create", createEvent.Object)
+		},
+		UpdateFunc: func(updateEvent event.UpdateEvent) bool {
+			return predFunc("update", updateEvent.ObjectNew) || predFunc("update", updateEvent.ObjectOld)
+		},
+		DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
+			return predFunc("delete", deleteEvent.Object)
+		},
+		GenericFunc: func(genericEvent event.GenericEvent) bool {
+			return false
+		},
+	}
+}
+
 // NewClusterPredicateOnAgent generates an event filter function with Cluster for karmada-agent.
 func NewClusterPredicateOnAgent(clusterName string) predicate.Funcs {
 	return predicate.Funcs{
@@ -166,6 +200,34 @@ func NewPredicateForServiceExportControllerOnAgent(curClusterName string) predic
 	}
 }
 
+// NewPredicateForEndpointSliceCollectControllerOnAgent generates an event filter function for the EndpointSliceCollectController run by karmada-agent.
+func NewPredicateForEndpointSliceCollectControllerOnAgent(curClusterName string) predicate.Funcs {
+	predFunc := func(eventType string, object client.Object) bool {
+		obj := object.(*workv1alpha1.Work)
+		clusterName, err := names.GetClusterName(obj.GetNamespace())
+		if err != nil {
+			klog.Errorf("Failed to get member cluster name for work %s/%s: %v", obj.GetNamespace(), obj.GetName(), err)
+			return false
+		}
+		return clusterName == curClusterName
+	}
+
+	return predicate.Funcs{
+		CreateFunc: func(createEvent event.CreateEvent) bool {
+			return predFunc("create", createEvent.Object)
+		},
+		UpdateFunc: func(updateEvent event.UpdateEvent) bool {
+			return predFunc("update", updateEvent.ObjectNew) || predFunc("update", updateEvent.ObjectOld)
+		},
+		DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
+			return predFunc("delete", deleteEvent.Object)
+		},
+		GenericFunc: func(genericEvent event.GenericEvent) bool {
+			return false
+		},
+	}
+}
+
 // NewExecutionPredicateOnAgent generates the event filter function to skip events that the controllers are uninterested.
 // Used by controllers:
 // - execution controller working in agent
diff --git a/pkg/util/helper/work.go b/pkg/util/helper/work.go
index e181e73c6..06d50aa0c 100644
--- a/pkg/util/helper/work.go
+++ b/pkg/util/helper/work.go
@@ -26,6 +26,7 @@ import (
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/util/retry"
 	"k8s.io/klog/v2"
@@ -178,3 +179,20 @@ func GenEventRef(resource *unstructured.Unstructured) (*corev1.ObjectReference,
 
 	return ref, nil
 }
+
+// IsWorkContains checks whether a resource with the target GroupVersionKind exists in work.spec.workload.manifests.
+func IsWorkContains(manifests []workv1alpha1.Manifest, targetResource schema.GroupVersionKind) bool {
+	for index := range manifests {
+		workload := &unstructured.Unstructured{}
+		err := workload.UnmarshalJSON(manifests[index].Raw)
+		if err != nil {
+			klog.Errorf("Failed to unmarshal work manifest at index %d: %v", index, err)
+			continue
+		}
+
+		if targetResource == workload.GroupVersionKind() {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/util/names/names.go b/pkg/util/names/names.go
index 30ca752f4..e11bb5acf 100644
--- a/pkg/util/names/names.go
+++ b/pkg/util/names/names.go
@@ -89,6 +89,11 @@ func GenerateBindingReferenceKey(namespace, name string) string {
 	return rand.SafeEncodeString(fmt.Sprint(hash.Sum32()))
 }
 
+// GenerateMCSWorkName generates the Work name for MCS-related resources by prefixing the namespace with the cluster name before hashing.
+func GenerateMCSWorkName(kind, name, namespace, cluster string) string {
+	return GenerateWorkName(kind, name, cluster+"/"+namespace)
+}
+
 // GenerateWorkName will generate work name by its name and the hash of its namespace, kind and name.
 func GenerateWorkName(kind, name, namespace string) string {
 	// The name of resources, like 'Role'/'ClusterRole'/'RoleBinding'/'ClusterRoleBinding',
diff --git a/pkg/webhook/multiclusterservice/mutating.go b/pkg/webhook/multiclusterservice/mutating.go
new file mode 100644
index 000000000..1f16c191e
--- /dev/null
+++ b/pkg/webhook/multiclusterservice/mutating.go
@@ -0,0 +1,60 @@
+/*
+Copyright 2023 The Karmada Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package multiclusterservice
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+
+	"github.com/google/uuid"
+	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
+
+	networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
+	"github.com/karmada-io/karmada/pkg/util"
+)
+
+// MutatingAdmission mutates API requests when necessary.
+type MutatingAdmission struct {
+	Decoder *admission.Decoder
+}
+
+// Check that MutatingAdmission implements the admission.Handler interface.
+var _ admission.Handler = &MutatingAdmission{}
+
+// Handle yields a response to an AdmissionRequest.
+func (a *MutatingAdmission) Handle(_ context.Context, req admission.Request) admission.Response {
+	mcs := &networkingv1alpha1.MultiClusterService{}
+
+	err := a.Decoder.Decode(req, mcs)
+	if err != nil {
+		return admission.Errored(http.StatusBadRequest, err)
+	}
+
+	// Assign a permanent ID once, so the MultiClusterService can be tracked across its whole lifecycle.
+	if util.GetLabelValue(mcs.Labels, networkingv1alpha1.MultiClusterServicePermanentIDLabel) == "" {
+		id := uuid.New().String()
+		mcs.Labels = util.DedupeAndMergeLabels(mcs.Labels, map[string]string{networkingv1alpha1.MultiClusterServicePermanentIDLabel: id})
+	}
+
+	marshaledBytes, err := json.Marshal(mcs)
+	if err != nil {
+		return admission.Errored(http.StatusInternalServerError, err)
+	}
+
+	return admission.PatchResponseFromRaw(req.Object.Raw, marshaledBytes)
+}
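Reviewer note (not part of the patch): a minimal sketch of how the helpers added in pkg/util/helper/work.go and pkg/util/names/names.go compose. It assumes the karmada module is on the import path; the package main wrapper and the EndpointSlice fixture are illustrative only, not code from this change.

package main

import (
	"encoding/json"
	"fmt"

	discoveryv1 "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"

	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
	"github.com/karmada-io/karmada/pkg/util/helper"
	"github.com/karmada-io/karmada/pkg/util/names"
)

func main() {
	// A Work manifest wrapping an EndpointSlice, roughly what the collect controller produces.
	eps := &discoveryv1.EndpointSlice{
		TypeMeta:   metav1.TypeMeta{APIVersion: "discovery.k8s.io/v1", Kind: "EndpointSlice"},
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo-abc12"},
	}
	raw, err := json.Marshal(eps)
	if err != nil {
		panic(err)
	}
	manifests := []workv1alpha1.Manifest{{RawExtension: runtime.RawExtension{Raw: raw}}}

	// IsWorkContains reports whether any manifest carries the target GroupVersionKind.
	epsGVK := schema.GroupVersionKind{Group: "discovery.k8s.io", Version: "v1", Kind: "EndpointSlice"}
	fmt.Println(helper.IsWorkContains(manifests, epsGVK)) // true

	// GenerateMCSWorkName folds the cluster name into the hashed namespace, so the
	// same Service collected from two clusters maps to two distinct Work objects.
	fmt.Println(names.GenerateMCSWorkName("Service", "foo", "default", "member1"))
	fmt.Println(names.GenerateMCSWorkName("Service", "foo", "default", "member2"))
}

GenerateMCSWorkName exists because GenerateWorkName hashes only kind, name, and namespace; passing cluster+"/"+namespace keeps Works collected from different member clusters from colliding on the same name.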
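For context, a sketch of the MultiClusterService object these controllers act on. types and serviceProvisionClusters follow the fields used in this patch (ExposureTypeCrossCluster, Spec.ServiceProvisionClusters); serviceConsumptionClusters is an assumption inferred from the dispatch-controller comments about consumption clusters.

apiVersion: networking.karmada.io/v1alpha1
kind: MultiClusterService
metadata:
  name: foo
  namespace: default
spec:
  types:
    - CrossCluster            # makes MultiClusterServiceCrossClusterEnabled return true
  serviceProvisionClusters:   # if empty, GetProvisionClusters falls back to all clusters
    - member1
  serviceConsumptionClusters: # assumed field: clusters that consume the collected EndpointSlices
    - member2

On CREATE and UPDATE, the new mutating webhook stamps such an object with a UUID under MultiClusterServicePermanentIDLabel if the label is missing.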