karmada/pkg/controllers/namespace/namespace_sync_controller.go

274 lines
9.1 KiB
Go

package namespace
import (
"context"
"fmt"
"strings"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/controllers/binding"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/overridemanager"
)
// ControllerName identifies this controller; it is the name used when
// reporting events.
const ControllerName = "namespace-sync-controller"
// Controller is to sync Work.
// For each Namespace that should be synced it creates or updates one Work per
// Cluster (see Reconcile and buildWorks).
type Controller struct {
// Embedded client: Reconcile uses it to Get the Namespace and List Clusters,
// and buildWorks hands it to helper.CreateOrUpdateWork.
client.Client // used to operate Work resources.
// EventRecorder is not referenced by the code visible in this file;
// presumably it is used to emit events under ControllerName — TODO confirm.
EventRecorder record.EventRecorder
// SkippedPropagatingNamespaces is a set of namespace names; any name present
// here is rejected by namespaceShouldBeSynced and therefore never synced.
SkippedPropagatingNamespaces map[string]struct{}
// OverrideManager applies override policies to the namespace object for a
// given cluster before the object is stored in a Work.
OverrideManager overridemanager.OverrideManager
}
// Reconcile handles one Namespace request: it decides whether the namespace is
// eligible for syncing and, if so, builds a Work for it in every listed cluster.
//
// Nothing is done (and nothing is requeued) when the namespace is excluded by
// namespaceShouldBeSynced, no longer exists, is being deleted, or carries the
// skip-auto-propagation label with a value equal to "true" (case-insensitive).
// Any failure to read the namespace, list clusters or build the Works is
// returned together with a Result asking for a requeue.
func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
	klog.V(4).Infof("Namespaces sync controller reconciling %s", req.NamespacedName.String())

	// The two outcomes every return path below picks from.
	var done controllerruntime.Result
	retry := controllerruntime.Result{Requeue: true}

	if !c.namespaceShouldBeSynced(req.Name) {
		return done, nil
	}

	ns := &corev1.Namespace{}
	err := c.Client.Get(ctx, req.NamespacedName, ns)
	if apierrors.IsNotFound(err) {
		// The namespace is already gone, so there is nothing left to process.
		return done, nil
	}
	if err != nil {
		return retry, err
	}

	if !ns.DeletionTimestamp.IsZero() {
		// The Works carry an owner reference to the namespace, so the garbage
		// collector removes them; no explicit cleanup is needed here.
		return done, nil
	}

	if skip := util.GetLabelValue(ns.Labels, policyv1alpha1.NamespaceSkipAutoPropagationLabel); strings.EqualFold(skip, "true") {
		klog.Infof("Skip auto propagation namespace:%s", ns.Name)
		return done, nil
	}

	var clusters clusterv1alpha1.ClusterList
	if err = c.Client.List(ctx, &clusters); err != nil {
		klog.Errorf("Failed to list clusters, error: %v", err)
		return retry, err
	}

	if err = c.buildWorks(ns, clusters.Items); err != nil {
		klog.Errorf("Failed to build work for namespace %s. Error: %v.", ns.GetName(), err)
		return retry, err
	}

	return done, nil
}
// namespaceShouldBeSynced reports whether the namespace with the given name is
// eligible for syncing. It returns false for reserved namespaces, for the
// default namespace, and for any name listed in SkippedPropagatingNamespaces.
func (c *Controller) namespaceShouldBeSynced(namespace string) bool {
	switch {
	case names.IsReservedNamespace(namespace):
		return false
	case namespace == names.NamespaceDefault:
		return false
	}

	_, skipped := c.SkippedPropagatingNamespaces[namespace]
	return !skipped
}
// buildWorks creates or updates, for every cluster in clusters, a Work that
// wraps the given namespace and lives in that cluster's execution space.
//
// Clusters are processed concurrently, one goroutine per cluster. Each
// goroutine stops at its first failure and reports exactly one result, so the
// number of values sent on the result channel always equals len(clusters):
// no sender can block on a full buffer and no error is left unread.
//
// The returned error aggregates the per-cluster failures and is nil when every
// cluster succeeded (or when clusters is empty).
func (c *Controller) buildWorks(namespace *corev1.Namespace, clusters []clusterv1alpha1.Cluster) error {
	namespaceObj, err := helper.ToUnstructured(namespace)
	if err != nil {
		klog.Errorf("Failed to transform namespace %s. Error: %v", namespace.GetName(), err)
		return err
	}

	// syncToCluster performs the whole per-cluster pipeline and returns on the
	// first error instead of continuing with a half-processed object.
	syncToCluster := func(cluster *clusterv1alpha1.Cluster) error {
		// Work on a private copy: overrides and labels are cluster specific.
		clonedNamespaced := namespaceObj.DeepCopy()
		syncErr := func(cause error) error {
			return fmt.Errorf("sync namespace(%s) to cluster(%s) failed due to: %v", clonedNamespaced.GetName(), cluster.GetName(), cause)
		}

		// namespace only care about ClusterOverridePolicy
		cops, _, err := c.OverrideManager.ApplyOverridePolicies(clonedNamespaced, cluster.Name)
		if err != nil {
			klog.Errorf("Failed to apply overrides for %s/%s/%s, err is: %v", clonedNamespaced.GetKind(), clonedNamespaced.GetNamespace(), clonedNamespaced.GetName(), err)
			return syncErr(err)
		}

		annotations, err := binding.RecordAppliedOverrides(cops, nil, nil)
		if err != nil {
			klog.Errorf("failed to record appliedOverrides, Error: %v", err)
			return syncErr(err)
		}

		workNamespace, err := names.GenerateExecutionSpaceName(cluster.Name)
		if err != nil {
			klog.Errorf("Failed to generate execution space name for member cluster %s, err is %v", cluster.Name, err)
			return syncErr(err)
		}

		workName := names.GenerateWorkName(namespaceObj.GetKind(), namespaceObj.GetName(), namespaceObj.GetNamespace())
		objectMeta := metav1.ObjectMeta{
			Name:       workName,
			Namespace:  workNamespace,
			Finalizers: []string{util.ExecutionControllerFinalizer},
			// The namespace owns the Work, so deleting the namespace lets the
			// garbage collector remove the Work.
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(namespace, namespace.GroupVersionKind()),
			},
			Annotations: annotations,
		}

		util.MergeLabel(clonedNamespaced, workv1alpha1.WorkNamespaceLabel, workNamespace)
		util.MergeLabel(clonedNamespaced, workv1alpha1.WorkNameLabel, workName)

		if err = helper.CreateOrUpdateWork(c.Client, objectMeta, clonedNamespaced); err != nil {
			return syncErr(err)
		}
		return nil
	}

	// Buffered to len(clusters): with one send per goroutine, sends never block.
	errChan := make(chan error, len(clusters))
	for i := range clusters {
		go func(cluster *clusterv1alpha1.Cluster) {
			errChan <- syncToCluster(cluster)
		}(&clusters[i])
	}

	var errs []error
	for range clusters {
		if err := <-errChan; err != nil {
			errs = append(errs, err)
		}
	}
	return utilerrors.NewAggregate(errs)
}
// SetupWithManager builds the controller and registers it with mgr.
//
// Besides Namespace objects themselves, two other kinds trigger reconciles:
//   - Cluster: only create events pass the predicate, and each one enqueues
//     every namespace returned by listing all namespaces.
//   - ClusterOverridePolicy: create, update and delete events pass the
//     predicate and enqueue the namespaces named by the policy's v1/Namespace
//     resource selectors; a selector with an empty name enqueues all namespaces.
func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
	// requestFor wraps a namespace name into a reconcile request.
	requestFor := func(name string) reconcile.Request {
		return reconcile.Request{NamespacedName: types.NamespacedName{Name: name}}
	}

	// allNamespaceRequests returns one request per existing namespace, or nil
	// (after logging) when the namespaces cannot be listed.
	allNamespaceRequests := func() []reconcile.Request {
		namespaceList := &corev1.NamespaceList{}
		if err := c.Client.List(context.TODO(), namespaceList); err != nil {
			klog.Errorf("Failed to list namespace, error: %v", err)
			return nil
		}

		var all []reconcile.Request
		for i := range namespaceList.Items {
			all = append(all, requestFor(namespaceList.Items[i].Name))
		}
		return all
	}

	clusterNamespaceFn := handler.MapFunc(func(client.Object) []reconcile.Request {
		return allNamespaceRequests()
	})

	// Only newly created clusters are of interest.
	clusterPredicate := builder.WithPredicates(predicate.Funcs{
		CreateFunc:  func(event.CreateEvent) bool { return true },
		UpdateFunc:  func(event.UpdateEvent) bool { return false },
		DeleteFunc:  func(event.DeleteEvent) bool { return false },
		GenericFunc: func(event.GenericEvent) bool { return false },
	})

	clusterOverridePolicyNamespaceFn := handler.MapFunc(func(obj client.Object) []reconcile.Request {
		policy, isPolicy := obj.(*policyv1alpha1.ClusterOverridePolicy)
		if !isPolicy {
			return nil
		}

		targets := sets.NewString()
		for _, selector := range policy.Spec.ResourceSelectors {
			if selector.APIVersion != "v1" || selector.Kind != "Namespace" {
				continue
			}
			if selector.Name == "" {
				// An unnamed Namespace selector matches every namespace.
				return allNamespaceRequests()
			}
			targets.Insert(selector.Name)
		}

		var selected []reconcile.Request
		for _, name := range targets.UnsortedList() {
			selected = append(selected, requestFor(name))
		}
		return selected
	})

	// Any change to a policy except generic events may alter the overrides.
	clusterOverridePolicyPredicate := builder.WithPredicates(predicate.Funcs{
		CreateFunc:  func(event.CreateEvent) bool { return true },
		UpdateFunc:  func(event.UpdateEvent) bool { return true },
		DeleteFunc:  func(event.DeleteEvent) bool { return true },
		GenericFunc: func(event.GenericEvent) bool { return false },
	})

	return controllerruntime.NewControllerManagedBy(mgr).
		For(&corev1.Namespace{}).
		Watches(
			&source.Kind{Type: &clusterv1alpha1.Cluster{}},
			handler.EnqueueRequestsFromMapFunc(clusterNamespaceFn),
			clusterPredicate,
		).
		Watches(
			&source.Kind{Type: &policyv1alpha1.ClusterOverridePolicy{}},
			handler.EnqueueRequestsFromMapFunc(clusterOverridePolicyNamespaceFn),
			clusterOverridePolicyPredicate,
		).
		Complete(c)
}