diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go
index 55b98d334..b38e9c671 100644
--- a/cmd/agent/app/agent.go
+++ b/cmd/agent/app/agent.go
@@ -139,12 +139,13 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
     objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSetForAgent, resourceInterpreter)
     executionController := &execution.Controller{
-        Client:          mgr.GetClient(),
-        EventRecorder:   mgr.GetEventRecorderFor(execution.ControllerName),
-        RESTMapper:      mgr.GetRESTMapper(),
-        ObjectWatcher:   objectWatcher,
-        PredicateFunc:   helper.NewExecutionPredicateOnAgent(),
-        InformerManager: informermanager.GetInstance(),
+        Client:               mgr.GetClient(),
+        EventRecorder:        mgr.GetEventRecorderFor(execution.ControllerName),
+        RESTMapper:           mgr.GetRESTMapper(),
+        ObjectWatcher:        objectWatcher,
+        PredicateFunc:        helper.NewExecutionPredicateOnAgent(),
+        InformerManager:      informermanager.GetInstance(),
+        ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent,
     }
     if err := executionController.SetupWithManager(mgr); err != nil {
         klog.Fatalf("Failed to setup execution controller: %v", err)
diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go
index 944be151f..8ae28bcb5 100644
--- a/cmd/controller-manager/app/controllermanager.go
+++ b/cmd/controller-manager/app/controllermanager.go
@@ -259,12 +259,13 @@ func startBindingController(ctx ControllerContext) (enabled bool, err error) {
 func startExecutionController(ctx ControllerContext) (enabled bool, err error) {
     executionController := &execution.Controller{
-        Client:          ctx.Mgr.GetClient(),
-        EventRecorder:   ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
-        RESTMapper:      ctx.Mgr.GetRESTMapper(),
-        ObjectWatcher:   ctx.ObjectWatcher,
-        PredicateFunc:   helper.NewExecutionPredicate(ctx.Mgr),
-        InformerManager: informermanager.GetInstance(),
+        Client:               ctx.Mgr.GetClient(),
+        EventRecorder:        ctx.Mgr.GetEventRecorderFor(execution.ControllerName),
+        RESTMapper:           ctx.Mgr.GetRESTMapper(),
+        ObjectWatcher:        ctx.ObjectWatcher,
+        PredicateFunc:        helper.NewExecutionPredicate(ctx.Mgr),
+        InformerManager:      informermanager.GetInstance(),
+        ClusterClientSetFunc: util.NewClusterDynamicClientSet,
     }
     if err := executionController.SetupWithManager(ctx.Mgr); err != nil {
         return false, err
diff --git a/pkg/controllers/execution/execution_controller.go b/pkg/controllers/execution/execution_controller.go
index 89ccf7eb5..b3c39a87c 100644
--- a/pkg/controllers/execution/execution_controller.go
+++ b/pkg/controllers/execution/execution_controller.go
@@ -34,12 +34,13 @@ const (
 // Controller is to sync Work.
 type Controller struct {
-    client.Client   // used to operate Work resources.
-    EventRecorder   record.EventRecorder
-    RESTMapper      meta.RESTMapper
-    ObjectWatcher   objectwatcher.ObjectWatcher
-    PredicateFunc   predicate.Predicate
-    InformerManager informermanager.MultiClusterInformerManager
+    client.Client        // used to operate Work resources.
+    EventRecorder        record.EventRecorder
+    RESTMapper           meta.RESTMapper
+    ObjectWatcher        objectwatcher.ObjectWatcher
+    PredicateFunc        predicate.Predicate
+    InformerManager      informermanager.MultiClusterInformerManager
+    ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
 }
 
 // Reconcile performs a full reconciliation for the object referred to by the Request.
@@ -69,23 +70,27 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques
         klog.Errorf("Failed to get the given member cluster %s", clusterName)
         return controllerruntime.Result{Requeue: true}, err
     }
 
-    if !util.IsClusterReady(&cluster.Status) {
-        klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name)
-        return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
-    }
     if !work.DeletionTimestamp.IsZero() {
-        applied := helper.IsResourceApplied(&work.Status)
-        if applied {
+        // Abort deleting the workload if the cluster is unready when unjoining, otherwise the unjoin process will fail.
+        if util.IsClusterReady(&cluster.Status) {
             err := c.tryDeleteWorkload(clusterName, work)
             if err != nil {
                 klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
                 return controllerruntime.Result{Requeue: true}, err
             }
+        } else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating
+            return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
         }
+
         return c.removeFinalizer(work)
     }
 
+    if !util.IsClusterReady(&cluster.Status) {
+        klog.Errorf("Stop sync work(%s/%s) for cluster(%s) as cluster not ready.", work.Namespace, work.Name, cluster.Name)
+        return controllerruntime.Result{Requeue: true}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
+    }
+
     return c.syncWork(clusterName, work)
 }
@@ -113,7 +118,6 @@ func (c *Controller) syncWork(clusterName string, work *workv1alpha1.Work) (cont
 }
 
 // tryDeleteWorkload tries to delete resource in the given member cluster.
-// Abort deleting when the member cluster is unready, otherwise we can't unjoin the member cluster when the member cluster is unready
 func (c *Controller) tryDeleteWorkload(clusterName string, work *workv1alpha1.Work) error {
     for _, manifest := range work.Spec.Workload.Manifests {
         workload := &unstructured.Unstructured{}
@@ -123,6 +127,27 @@ func (c *Controller) tryDeleteWorkload(clusterName string, work *workv1alpha1.Wo
             return err
         }
 
+        fedKey, err := keys.FederatedKeyFunc(clusterName, workload)
+        if err != nil {
+            klog.Errorf("Failed to get FederatedKey %s, error: %v", workload.GetName(), err)
+            return err
+        }
+
+        clusterObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey, c.Client, c.ClusterClientSetFunc)
+        if err != nil {
+            if apierrors.IsNotFound(err) {
+                return nil
+            }
+            klog.Errorf("Failed to get resource %v from member cluster, err is %v ", workload.GetName(), err)
+            return err
+        }
+
+        // Avoid deleting resources that are not managed by karmada.
+        if util.GetLabelValue(clusterObj.GetLabels(), workv1alpha1.WorkNameLabel) != util.GetLabelValue(workload.GetLabels(), workv1alpha1.WorkNameLabel) {
+            klog.Infof("Abort deleting the resource(kind=%s, %s/%s) which exists in cluster %v but is not managed by karmada", clusterObj.GetKind(), clusterObj.GetNamespace(), clusterObj.GetName(), clusterName)
+            return nil
+        }
+
         err = c.ObjectWatcher.Delete(clusterName, workload)
         if err != nil {
             klog.Errorf("Failed to delete resource in the given member cluster %v, err is %v", clusterName, err)
@@ -206,7 +231,7 @@ func (c *Controller) tryUpdateWorkload(clusterName string, workload *unstructure
         return err
     }
 
-    clusterObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey)
+    clusterObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey, c.Client, c.ClusterClientSetFunc)
     if err != nil {
         if !apierrors.IsNotFound(err) {
             klog.Errorf("Failed to get resource %v from member cluster, err is %v ", workload.GetName(), err)
diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go
index 69265c254..e7bf199c0 100644
--- a/pkg/controllers/mcs/service_export_controller.go
+++ b/pkg/controllers/mcs/service_export_controller.go
@@ -264,7 +264,7 @@ func (c *ServiceExportController) genHandlerDeleteFunc(clusterName string) func(
 // For ServiceExport create or update event, reports the referencing service's EndpointSlice.
 // For ServiceExport delete event, cleanup the previously reported EndpointSlice.
 func (c *ServiceExportController) handleServiceExportEvent(serviceExportKey keys.FederatedKey) error {
-    _, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, serviceExportKey)
+    _, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, serviceExportKey, c.Client, c.ClusterDynamicClientSetFunc)
     if err != nil {
         if apierrors.IsNotFound(err) {
             return cleanupWorkWithServiceExportDelete(c.Client, serviceExportKey)
@@ -289,7 +289,7 @@ func (c *ServiceExportController) handleServiceExportEvent(serviceExportKey keys
 // For EndpointSlice create or update event, reports the EndpointSlice when referencing service has been exported.
 // For EndpointSlice delete event, cleanup the previously reported EndpointSlice.
 func (c *ServiceExportController) handleEndpointSliceEvent(endpointSliceKey keys.FederatedKey) error {
-    endpointSliceObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, endpointSliceKey)
+    endpointSliceObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, endpointSliceKey, c.Client, c.ClusterDynamicClientSetFunc)
     if err != nil {
         if apierrors.IsNotFound(err) {
             return cleanupWorkWithEndpointSliceDelete(c.Client, endpointSliceKey)
diff --git a/pkg/controllers/status/workstatus_controller.go b/pkg/controllers/status/workstatus_controller.go
index dbff435c8..6680bd83c 100644
--- a/pkg/controllers/status/workstatus_controller.go
+++ b/pkg/controllers/status/workstatus_controller.go
@@ -159,7 +159,7 @@ func (c *WorkStatusController) syncWorkStatus(key util.QueueKey) error {
         return fmt.Errorf("invalid key")
     }
 
-    obj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey)
+    obj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey, c.Client, c.ClusterClientSetFunc)
     if err != nil {
         if apierrors.IsNotFound(err) {
             return c.handleDeleteEvent(fedKey)
diff --git a/pkg/util/helper/cache.go b/pkg/util/helper/cache.go
index 8621d7ce3..0906a46b8 100644
--- a/pkg/util/helper/cache.go
+++ b/pkg/util/helper/cache.go
@@ -1,22 +1,29 @@
 package helper
 
 import (
+    "context"
     "fmt"
 
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/meta"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
     "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/runtime/schema"
     "k8s.io/klog/v2"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+
+    "github.com/karmada-io/karmada/pkg/util"
     "github.com/karmada-io/karmada/pkg/util/informermanager"
     "github.com/karmada-io/karmada/pkg/util/informermanager/keys"
     "github.com/karmada-io/karmada/pkg/util/restmapper"
 )
 
+type clusterDynamicClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
+
 // GetObjectFromCache gets full object information from cache by key in worker queue.
-func GetObjectFromCache(restMapper meta.RESTMapper, manager informermanager.MultiClusterInformerManager,
-    fedKey keys.FederatedKey) (*unstructured.Unstructured, error) {
+func GetObjectFromCache(restMapper meta.RESTMapper, manager informermanager.MultiClusterInformerManager, fedKey keys.FederatedKey,
+    client client.Client, clientSetFunc clusterDynamicClientSetFunc) (*unstructured.Unstructured, error) {
     gvr, err := restmapper.GetGroupVersionResource(restMapper, fedKey.GroupVersionKind())
     if err != nil {
         klog.Errorf("Failed to get GVR from GVK %s. Error: %v", fedKey.GroupVersionKind(), err)
@@ -31,6 +38,12 @@ func GetObjectFromCache(restMapper meta.RESTMapper, manager informermanager.Mult
         // Usually this error will be eliminated during the controller reconciling loop.
         return nil, fmt.Errorf("the informer of cluster(%s) has not been initialized", fedKey.Cluster)
     }
+
+    if !singleClusterManager.IsInformerSynced(gvr) {
+        // fall back to calling the API server in case the cache has not been synchronized yet
+        return getObjectFromMemberCluster(gvr, fedKey, client, clientSetFunc)
+    }
+
     var obj runtime.Object
     lister := singleClusterManager.Lister(gvr)
     obj, err = lister.Get(fedKey.NamespaceKey())
@@ -41,8 +54,24 @@ func GetObjectFromCache(restMapper meta.RESTMapper, manager informermanager.Mult
         // print logs only for real error.
         klog.Errorf("Failed to get obj %s. error: %v.", fedKey.String(), err)
-
         return nil, err
     }
 
     return obj.(*unstructured.Unstructured), nil
 }
+
+// getObjectFromMemberCluster tries to get the resource from the member cluster by DynamicClientSet.
+func getObjectFromMemberCluster(gvr schema.GroupVersionResource, fedKey keys.FederatedKey, client client.Client,
+    clientSetFunc clusterDynamicClientSetFunc) (*unstructured.Unstructured, error) {
+    dynamicClusterClient, err := clientSetFunc(fedKey.Cluster, client)
+    if err != nil {
+        klog.Errorf("Failed to build dynamic cluster client for cluster %s.", fedKey.Cluster)
+        return nil, err
+    }
+
+    existObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(fedKey.Namespace).Get(context.TODO(),
+        fedKey.Name, metav1.GetOptions{})
+    if err != nil {
+        return nil, err
+    }
+    return existObj, nil
+}