diff --git a/charts/_crds/kustomization.yaml b/charts/_crds/kustomization.yaml index d511e6e53..df0b6fbdb 100644 --- a/charts/_crds/kustomization.yaml +++ b/charts/_crds/kustomization.yaml @@ -3,6 +3,7 @@ resources: - bases/multicluster.x-k8s.io_serviceimports.yaml - bases/policy.karmada.io_clusteroverridepolicies.yaml - bases/policy.karmada.io_clusterpropagationpolicies.yaml +- bases/policy.karmada.io_federatedresourcequotas.yaml - bases/policy.karmada.io_overridepolicies.yaml - bases/policy.karmada.io_propagationpolicies.yaml - bases/work.karmada.io_resourcebindings.yaml diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 95fcb2a24..1f589a0a1 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -30,6 +30,7 @@ import ( "github.com/karmada-io/karmada/pkg/controllers/cluster" controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context" "github.com/karmada-io/karmada/pkg/controllers/execution" + "github.com/karmada-io/karmada/pkg/controllers/federatedresourcequota" "github.com/karmada-io/karmada/pkg/controllers/hpa" "github.com/karmada-io/karmada/pkg/controllers/mcs" "github.com/karmada-io/karmada/pkg/controllers/namespace" @@ -141,6 +142,7 @@ func init() { controllers["endpointSlice"] = startEndpointSliceController controllers["serviceImport"] = startServiceImportController controllers["unifiedAuth"] = startUnifiedAuthController + controllers["federatedResourceQuotaSync"] = startFederatedResourceQuotaSyncController } func startClusterController(ctx controllerscontext.Context) (enabled bool, err error) { @@ -353,6 +355,17 @@ func startUnifiedAuthController(ctx controllerscontext.Context) (enabled bool, e return true, nil } +func startFederatedResourceQuotaSyncController(ctx controllerscontext.Context) (enabled bool, err error) { + controller := federatedresourcequota.SyncController{ + Client: ctx.Mgr.GetClient(), + EventRecorder: ctx.Mgr.GetEventRecorderFor(federatedresourcequota.SyncControllerName), + } + if err = controller.SetupWithManager(ctx.Mgr); err != nil { + return false, err + } + return true, nil +} + // setupControllers initialize controllers and setup one by one. func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) { restConfig := mgr.GetConfig() diff --git a/pkg/controllers/federatedresourcequota/federated_resource_quota_sync_controller.go b/pkg/controllers/federatedresourcequota/federated_resource_quota_sync_controller.go new file mode 100644 index 000000000..8603bc66e --- /dev/null +++ b/pkg/controllers/federatedresourcequota/federated_resource_quota_sync_controller.go @@ -0,0 +1,207 @@ +package federatedresourcequota + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/helper" + "github.com/karmada-io/karmada/pkg/util/names" +) + +const ( + // SyncControllerName is the controller name that will be used when reporting events. + SyncControllerName = "federated-resource-quota-sync-controller" +) + +// SyncController is to sync FederatedResourceQuota. +type SyncController struct { + client.Client // used to operate Work resources. + EventRecorder record.EventRecorder +} + +// Reconcile performs a full reconciliation for the object referred to by the Request. +// The SyncController will requeue the Request to be processed again if an error is non-nil or +// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. +func (c *SyncController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) { + klog.V(4).Infof("FederatedResourceQuota sync controller reconciling %s", req.NamespacedName.String()) + + quota := &policyv1alpha1.FederatedResourceQuota{} + if err := c.Client.Get(context.TODO(), req.NamespacedName, quota); err != nil { + if apierrors.IsNotFound(err) { + klog.V(4).Infof("Begin to cleanup works created by federatedResourceQuota(%s)", req.NamespacedName.String()) + if err = c.cleanUpWorks(req.Namespace, req.Name); err != nil { + klog.Errorf("Failed to cleanup works created by federatedResourceQuota(%s)", req.NamespacedName.String()) + return controllerruntime.Result{Requeue: true}, err + } + return controllerruntime.Result{}, nil + } + return controllerruntime.Result{Requeue: true}, err + } + + clusterList := &clusterv1alpha1.ClusterList{} + if err := c.Client.List(context.TODO(), clusterList); err != nil { + klog.Errorf("Failed to list clusters, error: %v", err) + return controllerruntime.Result{Requeue: true}, err + } + + if err := c.buildWorks(quota, clusterList.Items); err != nil { + klog.Errorf("Failed to build works for federatedResourceQuota(%s), error: %v", req.NamespacedName.String(), err) + return controllerruntime.Result{Requeue: true}, err + } + + return controllerruntime.Result{}, nil +} + +// SetupWithManager creates a controller and register to controller manager. +func (c *SyncController) SetupWithManager(mgr controllerruntime.Manager) error { + fn := handler.MapFunc( + func(client.Object) []reconcile.Request { + var requests []reconcile.Request + + FederatedResourceQuotaList := &policyv1alpha1.FederatedResourceQuotaList{} + if err := c.Client.List(context.TODO(), FederatedResourceQuotaList); err != nil { + klog.Errorf("Failed to list FederatedResourceQuota, error: %v", err) + } + + for _, federatedResourceQuota := range FederatedResourceQuotaList.Items { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: federatedResourceQuota.Namespace, + Name: federatedResourceQuota.Name, + }, + }) + } + + return requests + }, + ) + + clusterPredicate := builder.WithPredicates(predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + UpdateFunc: func(e event.UpdateEvent) bool { + return false + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return false + }, + GenericFunc: func(e event.GenericEvent) bool { + return false + }, + }) + + return controllerruntime.NewControllerManagedBy(mgr). + For(&policyv1alpha1.FederatedResourceQuota{}). + Watches(&source.Kind{Type: &clusterv1alpha1.Cluster{}}, handler.EnqueueRequestsFromMapFunc(fn), clusterPredicate). + Complete(c) +} + +func (c *SyncController) cleanUpWorks(namespace, name string) error { + var errs []error + workList := &workv1alpha1.WorkList{} + if err := c.List(context.TODO(), workList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet( + labels.Set{ + util.FederatedResourceQuotaNamespaceLabel: namespace, + util.FederatedResourceQuotaNameLabel: name, + }), + }); err != nil { + klog.Errorf("Failed to list works, err: %v", err) + return err + } + + for index := range workList.Items { + work := &workList.Items[index] + if err := c.Delete(context.TODO(), work); err != nil && !apierrors.IsNotFound(err) { + klog.Errorf("Failed to delete work(%s): %v", klog.KObj(work).String(), err) + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return errors.NewAggregate(errs) + } + return nil +} + +func (c *SyncController) buildWorks(quota *policyv1alpha1.FederatedResourceQuota, clusters []clusterv1alpha1.Cluster) error { + var errs []error + for _, cluster := range clusters { + workNamespace, err := names.GenerateExecutionSpaceName(cluster.Name) + if err != nil { + klog.Errorf("Failed to generate execution space name for cluster(%s), error: %v", cluster.Name, err) + errs = append(errs, err) + continue + } + workName := names.GenerateWorkName("ResourceQuota", quota.Name, quota.Namespace) + + resourceQuota := &corev1.ResourceQuota{} + resourceQuota.APIVersion = "v1" + resourceQuota.Kind = "ResourceQuota" + resourceQuota.Namespace = quota.Namespace + resourceQuota.Name = quota.Name + resourceQuota.Labels = map[string]string{ + workv1alpha2.WorkNamespaceLabel: workNamespace, + workv1alpha2.WorkNameLabel: workName, + } + resourceQuota.Spec.Hard = extractClusterHardResourceList(quota.Spec, cluster.Name) + + resourceQuotaObj, err := helper.ToUnstructured(resourceQuota) + if err != nil { + klog.Errorf("Failed to transform resourceQuota(%s), error: %v", klog.KObj(resourceQuota).String(), err) + errs = append(errs, err) + continue + } + + objectMeta := metav1.ObjectMeta{ + Namespace: workNamespace, + Name: workName, + Finalizers: []string{util.ExecutionControllerFinalizer}, + Labels: map[string]string{ + util.FederatedResourceQuotaNamespaceLabel: quota.Namespace, + util.FederatedResourceQuotaNameLabel: quota.Name, + }, + } + + err = helper.CreateOrUpdateWork(c.Client, objectMeta, resourceQuotaObj) + if err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return errors.NewAggregate(errs) + } + return nil +} + +func extractClusterHardResourceList(spec policyv1alpha1.FederatedResourceQuotaSpec, cluster string) corev1.ResourceList { + for index := range spec.StaticAssignments { + if spec.StaticAssignments[index].ClusterName == cluster { + return spec.StaticAssignments[index].Hard + } + } + return nil +} diff --git a/pkg/util/constants.go b/pkg/util/constants.go index 26c50fbd3..49e625ae4 100644 --- a/pkg/util/constants.go +++ b/pkg/util/constants.go @@ -16,6 +16,12 @@ const ( // Note: This instruction is intended to set on Work objects to indicate the Work should be ignored by // execution controller. The instruction maybe deprecated once we extend the Work API and no other scenario want this. PropagationInstruction = "propagation.karmada.io/instruction" + + // FederatedResourceQuotaNamespaceLabel is added to Work to specify associated FederatedResourceQuota's namespace. + FederatedResourceQuotaNamespaceLabel = "federatedresourcequota.karmada.io/namespace" + + // FederatedResourceQuotaNameLabel is added to Work to specify associated FederatedResourceQuota's name. + FederatedResourceQuotaNameLabel = "federatedresourcequota.karmada.io/name" ) // Define annotations used by karmada system.