Merge pull request #1398 from XiShanYongYe-Chang/resource-quota
Add FederatedResourceQuota sync controller
This commit is contained in:
commit
016595f2a5
|
@ -3,6 +3,7 @@ resources:
|
|||
- bases/multicluster.x-k8s.io_serviceimports.yaml
|
||||
- bases/policy.karmada.io_clusteroverridepolicies.yaml
|
||||
- bases/policy.karmada.io_clusterpropagationpolicies.yaml
|
||||
- bases/policy.karmada.io_federatedresourcequotas.yaml
|
||||
- bases/policy.karmada.io_overridepolicies.yaml
|
||||
- bases/policy.karmada.io_propagationpolicies.yaml
|
||||
- bases/work.karmada.io_resourcebindings.yaml
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/karmada-io/karmada/pkg/controllers/cluster"
|
||||
controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/execution"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/federatedresourcequota"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/hpa"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/mcs"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/namespace"
|
||||
|
@ -141,6 +142,7 @@ func init() {
|
|||
controllers["endpointSlice"] = startEndpointSliceController
|
||||
controllers["serviceImport"] = startServiceImportController
|
||||
controllers["unifiedAuth"] = startUnifiedAuthController
|
||||
controllers["federatedResourceQuotaSync"] = startFederatedResourceQuotaSyncController
|
||||
}
|
||||
|
||||
func startClusterController(ctx controllerscontext.Context) (enabled bool, err error) {
|
||||
|
@ -353,6 +355,17 @@ func startUnifiedAuthController(ctx controllerscontext.Context) (enabled bool, e
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func startFederatedResourceQuotaSyncController(ctx controllerscontext.Context) (enabled bool, err error) {
|
||||
controller := federatedresourcequota.SyncController{
|
||||
Client: ctx.Mgr.GetClient(),
|
||||
EventRecorder: ctx.Mgr.GetEventRecorderFor(federatedresourcequota.SyncControllerName),
|
||||
}
|
||||
if err = controller.SetupWithManager(ctx.Mgr); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// setupControllers initialize controllers and setup one by one.
|
||||
func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
|
||||
restConfig := mgr.GetConfig()
|
||||
|
|
|
@ -0,0 +1,207 @@
|
|||
package federatedresourcequota
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/klog/v2"
|
||||
controllerruntime "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/builder"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/event"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
"sigs.k8s.io/controller-runtime/pkg/source"
|
||||
|
||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
|
||||
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
|
||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
"github.com/karmada-io/karmada/pkg/util"
|
||||
"github.com/karmada-io/karmada/pkg/util/helper"
|
||||
"github.com/karmada-io/karmada/pkg/util/names"
|
||||
)
|
||||
|
||||
const (
|
||||
// SyncControllerName is the controller name that will be used when reporting events.
|
||||
SyncControllerName = "federated-resource-quota-sync-controller"
|
||||
)
|
||||
|
||||
// SyncController is to sync FederatedResourceQuota.
|
||||
type SyncController struct {
|
||||
client.Client // used to operate Work resources.
|
||||
EventRecorder record.EventRecorder
|
||||
}
|
||||
|
||||
// Reconcile performs a full reconciliation for the object referred to by the Request.
|
||||
// The SyncController will requeue the Request to be processed again if an error is non-nil or
|
||||
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
|
||||
func (c *SyncController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
|
||||
klog.V(4).Infof("FederatedResourceQuota sync controller reconciling %s", req.NamespacedName.String())
|
||||
|
||||
quota := &policyv1alpha1.FederatedResourceQuota{}
|
||||
if err := c.Client.Get(context.TODO(), req.NamespacedName, quota); err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
klog.V(4).Infof("Begin to cleanup works created by federatedResourceQuota(%s)", req.NamespacedName.String())
|
||||
if err = c.cleanUpWorks(req.Namespace, req.Name); err != nil {
|
||||
klog.Errorf("Failed to cleanup works created by federatedResourceQuota(%s)", req.NamespacedName.String())
|
||||
return controllerruntime.Result{Requeue: true}, err
|
||||
}
|
||||
return controllerruntime.Result{}, nil
|
||||
}
|
||||
return controllerruntime.Result{Requeue: true}, err
|
||||
}
|
||||
|
||||
clusterList := &clusterv1alpha1.ClusterList{}
|
||||
if err := c.Client.List(context.TODO(), clusterList); err != nil {
|
||||
klog.Errorf("Failed to list clusters, error: %v", err)
|
||||
return controllerruntime.Result{Requeue: true}, err
|
||||
}
|
||||
|
||||
if err := c.buildWorks(quota, clusterList.Items); err != nil {
|
||||
klog.Errorf("Failed to build works for federatedResourceQuota(%s), error: %v", req.NamespacedName.String(), err)
|
||||
return controllerruntime.Result{Requeue: true}, err
|
||||
}
|
||||
|
||||
return controllerruntime.Result{}, nil
|
||||
}
|
||||
|
||||
// SetupWithManager creates a controller and register to controller manager.
|
||||
func (c *SyncController) SetupWithManager(mgr controllerruntime.Manager) error {
|
||||
fn := handler.MapFunc(
|
||||
func(client.Object) []reconcile.Request {
|
||||
var requests []reconcile.Request
|
||||
|
||||
FederatedResourceQuotaList := &policyv1alpha1.FederatedResourceQuotaList{}
|
||||
if err := c.Client.List(context.TODO(), FederatedResourceQuotaList); err != nil {
|
||||
klog.Errorf("Failed to list FederatedResourceQuota, error: %v", err)
|
||||
}
|
||||
|
||||
for _, federatedResourceQuota := range FederatedResourceQuotaList.Items {
|
||||
requests = append(requests, reconcile.Request{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: federatedResourceQuota.Namespace,
|
||||
Name: federatedResourceQuota.Name,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return requests
|
||||
},
|
||||
)
|
||||
|
||||
clusterPredicate := builder.WithPredicates(predicate.Funcs{
|
||||
CreateFunc: func(e event.CreateEvent) bool {
|
||||
return true
|
||||
},
|
||||
UpdateFunc: func(e event.UpdateEvent) bool {
|
||||
return false
|
||||
},
|
||||
DeleteFunc: func(e event.DeleteEvent) bool {
|
||||
return false
|
||||
},
|
||||
GenericFunc: func(e event.GenericEvent) bool {
|
||||
return false
|
||||
},
|
||||
})
|
||||
|
||||
return controllerruntime.NewControllerManagedBy(mgr).
|
||||
For(&policyv1alpha1.FederatedResourceQuota{}).
|
||||
Watches(&source.Kind{Type: &clusterv1alpha1.Cluster{}}, handler.EnqueueRequestsFromMapFunc(fn), clusterPredicate).
|
||||
Complete(c)
|
||||
}
|
||||
|
||||
func (c *SyncController) cleanUpWorks(namespace, name string) error {
|
||||
var errs []error
|
||||
workList := &workv1alpha1.WorkList{}
|
||||
if err := c.List(context.TODO(), workList, &client.ListOptions{
|
||||
LabelSelector: labels.SelectorFromSet(
|
||||
labels.Set{
|
||||
util.FederatedResourceQuotaNamespaceLabel: namespace,
|
||||
util.FederatedResourceQuotaNameLabel: name,
|
||||
}),
|
||||
}); err != nil {
|
||||
klog.Errorf("Failed to list works, err: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
for index := range workList.Items {
|
||||
work := &workList.Items[index]
|
||||
if err := c.Delete(context.TODO(), work); err != nil && !apierrors.IsNotFound(err) {
|
||||
klog.Errorf("Failed to delete work(%s): %v", klog.KObj(work).String(), err)
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return errors.NewAggregate(errs)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *SyncController) buildWorks(quota *policyv1alpha1.FederatedResourceQuota, clusters []clusterv1alpha1.Cluster) error {
|
||||
var errs []error
|
||||
for _, cluster := range clusters {
|
||||
workNamespace, err := names.GenerateExecutionSpaceName(cluster.Name)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to generate execution space name for cluster(%s), error: %v", cluster.Name, err)
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
}
|
||||
workName := names.GenerateWorkName("ResourceQuota", quota.Name, quota.Namespace)
|
||||
|
||||
resourceQuota := &corev1.ResourceQuota{}
|
||||
resourceQuota.APIVersion = "v1"
|
||||
resourceQuota.Kind = "ResourceQuota"
|
||||
resourceQuota.Namespace = quota.Namespace
|
||||
resourceQuota.Name = quota.Name
|
||||
resourceQuota.Labels = map[string]string{
|
||||
workv1alpha2.WorkNamespaceLabel: workNamespace,
|
||||
workv1alpha2.WorkNameLabel: workName,
|
||||
}
|
||||
resourceQuota.Spec.Hard = extractClusterHardResourceList(quota.Spec, cluster.Name)
|
||||
|
||||
resourceQuotaObj, err := helper.ToUnstructured(resourceQuota)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to transform resourceQuota(%s), error: %v", klog.KObj(resourceQuota).String(), err)
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
}
|
||||
|
||||
objectMeta := metav1.ObjectMeta{
|
||||
Namespace: workNamespace,
|
||||
Name: workName,
|
||||
Finalizers: []string{util.ExecutionControllerFinalizer},
|
||||
Labels: map[string]string{
|
||||
util.FederatedResourceQuotaNamespaceLabel: quota.Namespace,
|
||||
util.FederatedResourceQuotaNameLabel: quota.Name,
|
||||
},
|
||||
}
|
||||
|
||||
err = helper.CreateOrUpdateWork(c.Client, objectMeta, resourceQuotaObj)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return errors.NewAggregate(errs)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func extractClusterHardResourceList(spec policyv1alpha1.FederatedResourceQuotaSpec, cluster string) corev1.ResourceList {
|
||||
for index := range spec.StaticAssignments {
|
||||
if spec.StaticAssignments[index].ClusterName == cluster {
|
||||
return spec.StaticAssignments[index].Hard
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -16,6 +16,12 @@ const (
|
|||
// Note: This instruction is intended to set on Work objects to indicate the Work should be ignored by
|
||||
// execution controller. The instruction maybe deprecated once we extend the Work API and no other scenario want this.
|
||||
PropagationInstruction = "propagation.karmada.io/instruction"
|
||||
|
||||
// FederatedResourceQuotaNamespaceLabel is added to Work to specify associated FederatedResourceQuota's namespace.
|
||||
FederatedResourceQuotaNamespaceLabel = "federatedresourcequota.karmada.io/namespace"
|
||||
|
||||
// FederatedResourceQuotaNameLabel is added to Work to specify associated FederatedResourceQuota's name.
|
||||
FederatedResourceQuotaNameLabel = "federatedresourcequota.karmada.io/name"
|
||||
)
|
||||
|
||||
// Define annotations used by karmada system.
|
||||
|
|
Loading…
Reference in New Issue