karmada/pkg/controllers/federatedresourcequota/federated_resource_quota_sy...

207 lines
7.0 KiB
Go

package federatedresourcequota
import (
"context"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
)
const (
// SyncControllerName is the controller name that will be used when reporting events.
SyncControllerName = "federated-resource-quota-sync-controller"
)
// SyncController is to sync FederatedResourceQuota.
type SyncController struct {
client.Client // used to operate Work resources.
EventRecorder record.EventRecorder
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
// The SyncController will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (c *SyncController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("FederatedResourceQuota sync controller reconciling %s", req.NamespacedName.String())
quota := &policyv1alpha1.FederatedResourceQuota{}
if err := c.Client.Get(context.TODO(), req.NamespacedName, quota); err != nil {
if apierrors.IsNotFound(err) {
klog.V(4).Infof("Begin to cleanup works created by federatedResourceQuota(%s)", req.NamespacedName.String())
if err = c.cleanUpWorks(req.Namespace, req.Name); err != nil {
klog.Errorf("Failed to cleanup works created by federatedResourceQuota(%s)", req.NamespacedName.String())
return controllerruntime.Result{Requeue: true}, err
}
return controllerruntime.Result{}, nil
}
return controllerruntime.Result{Requeue: true}, err
}
clusterList := &clusterv1alpha1.ClusterList{}
if err := c.Client.List(context.TODO(), clusterList); err != nil {
klog.Errorf("Failed to list clusters, error: %v", err)
return controllerruntime.Result{Requeue: true}, err
}
if err := c.buildWorks(quota, clusterList.Items); err != nil {
klog.Errorf("Failed to build works for federatedResourceQuota(%s), error: %v", req.NamespacedName.String(), err)
return controllerruntime.Result{Requeue: true}, err
}
return controllerruntime.Result{}, nil
}
// SetupWithManager creates a controller and register to controller manager.
func (c *SyncController) SetupWithManager(mgr controllerruntime.Manager) error {
fn := handler.MapFunc(
func(client.Object) []reconcile.Request {
var requests []reconcile.Request
FederatedResourceQuotaList := &policyv1alpha1.FederatedResourceQuotaList{}
if err := c.Client.List(context.TODO(), FederatedResourceQuotaList); err != nil {
klog.Errorf("Failed to list FederatedResourceQuota, error: %v", err)
}
for _, federatedResourceQuota := range FederatedResourceQuotaList.Items {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: federatedResourceQuota.Namespace,
Name: federatedResourceQuota.Name,
},
})
}
return requests
},
)
clusterPredicate := builder.WithPredicates(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
return false
},
DeleteFunc: func(e event.DeleteEvent) bool {
return false
},
GenericFunc: func(e event.GenericEvent) bool {
return false
},
})
return controllerruntime.NewControllerManagedBy(mgr).
For(&policyv1alpha1.FederatedResourceQuota{}).
Watches(&source.Kind{Type: &clusterv1alpha1.Cluster{}}, handler.EnqueueRequestsFromMapFunc(fn), clusterPredicate).
Complete(c)
}
func (c *SyncController) cleanUpWorks(namespace, name string) error {
var errs []error
workList := &workv1alpha1.WorkList{}
if err := c.List(context.TODO(), workList, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(
labels.Set{
util.FederatedResourceQuotaNamespaceLabel: namespace,
util.FederatedResourceQuotaNameLabel: name,
}),
}); err != nil {
klog.Errorf("Failed to list works, err: %v", err)
return err
}
for index := range workList.Items {
work := &workList.Items[index]
if err := c.Delete(context.TODO(), work); err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to delete work(%s): %v", klog.KObj(work).String(), err)
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errors.NewAggregate(errs)
}
return nil
}
func (c *SyncController) buildWorks(quota *policyv1alpha1.FederatedResourceQuota, clusters []clusterv1alpha1.Cluster) error {
var errs []error
for _, cluster := range clusters {
workNamespace, err := names.GenerateExecutionSpaceName(cluster.Name)
if err != nil {
klog.Errorf("Failed to generate execution space name for cluster(%s), error: %v", cluster.Name, err)
errs = append(errs, err)
continue
}
workName := names.GenerateWorkName("ResourceQuota", quota.Name, quota.Namespace)
resourceQuota := &corev1.ResourceQuota{}
resourceQuota.APIVersion = "v1"
resourceQuota.Kind = "ResourceQuota"
resourceQuota.Namespace = quota.Namespace
resourceQuota.Name = quota.Name
resourceQuota.Labels = map[string]string{
workv1alpha1.WorkNamespaceLabel: workNamespace,
workv1alpha1.WorkNameLabel: workName,
}
resourceQuota.Spec.Hard = extractClusterHardResourceList(quota.Spec, cluster.Name)
resourceQuotaObj, err := helper.ToUnstructured(resourceQuota)
if err != nil {
klog.Errorf("Failed to transform resourceQuota(%s), error: %v", klog.KObj(resourceQuota).String(), err)
errs = append(errs, err)
continue
}
objectMeta := metav1.ObjectMeta{
Namespace: workNamespace,
Name: workName,
Finalizers: []string{util.ExecutionControllerFinalizer},
Labels: map[string]string{
util.FederatedResourceQuotaNamespaceLabel: quota.Namespace,
util.FederatedResourceQuotaNameLabel: quota.Name,
},
}
err = helper.CreateOrUpdateWork(c.Client, objectMeta, resourceQuotaObj)
if err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errors.NewAggregate(errs)
}
return nil
}
func extractClusterHardResourceList(spec policyv1alpha1.FederatedResourceQuotaSpec, cluster string) corev1.ResourceList {
for index := range spec.StaticAssignments {
if spec.StaticAssignments[index].ClusterName == cluster {
return spec.StaticAssignments[index].Hard
}
}
return nil
}