remove orphan work and binding (#62)

This commit is contained in:
Xianpao Chen 2020-12-09 11:48:45 +08:00 committed by GitHub
parent ce26d4acfb
commit bf84ff9001
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 329 additions and 156 deletions

View File

@ -116,6 +116,7 @@ func setupControllers(mgr controllerruntime.Manager) {
DynamicClient: dynamicClientSet,
KarmadaClient: karmadaClient,
EventRecorder: mgr.GetEventRecorderFor(policy.ControllerName),
RESTMapper: mgr.GetRESTMapper(),
}
if err := policyController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup policy controller: %v", err)
@ -126,6 +127,7 @@ func setupControllers(mgr controllerruntime.Manager) {
DynamicClient: dynamicClientSet,
KarmadaClient: karmadaClient,
EventRecorder: mgr.GetEventRecorderFor(binding.ControllerName),
RESTMapper: mgr.GetRESTMapper(),
}
if err := bindingController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup binding controller: %v", err)
@ -141,5 +143,4 @@ func setupControllers(mgr controllerruntime.Manager) {
if err := executionController.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup execution controller: %v", err)
}
}

View File

@ -5,9 +5,13 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
@ -15,9 +19,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/huawei-cloudnative/karmada/pkg/apis/propagationstrategy/v1alpha1"
"github.com/huawei-cloudnative/karmada/pkg/controllers/util"
karmadaclientset "github.com/huawei-cloudnative/karmada/pkg/generated/clientset/versioned"
"github.com/huawei-cloudnative/karmada/pkg/util"
"github.com/huawei-cloudnative/karmada/pkg/util/names"
"github.com/huawei-cloudnative/karmada/pkg/util/restmapper"
)
// ControllerName is the controller name that will be used when reporting events.
@ -31,13 +36,14 @@ type PropagationBindingController struct {
DynamicClient dynamic.Interface // used to fetch arbitrary resources.
KarmadaClient karmadaclientset.Interface // used to create/update PropagationWork resources.
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (c *PropagationBindingController) Reconcile(req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Reconciling PropagationBinding %s", req.NamespacedName.String())
klog.V(4).Infof("Reconciling PropagationBinding %s.", req.NamespacedName.String())
binding := &v1alpha1.PropagationBinding{}
if err := c.Client.Get(context.TODO(), req.NamespacedName, binding); err != nil {
@ -55,20 +61,85 @@ func (c *PropagationBindingController) Reconcile(req controllerruntime.Request)
return controllerruntime.Result{}, nil
}
isReady := c.isBindingReady(binding)
if !isReady {
klog.Infof("propagationBinding %s/%s is not ready to sync", binding.GetNamespace(), binding.GetName())
return controllerruntime.Result{Requeue: true}, nil
}
return c.syncBinding(binding)
}
// syncBinding will sync propagationBinding to propagationWorks
// isBindingReady will check if propagationBinding is ready to build propagationWork.
func (c *PropagationBindingController) isBindingReady(binding *v1alpha1.PropagationBinding) bool {
return len(binding.Spec.Clusters) != 0
}
// syncBinding will sync propagationBinding to propagationWorks.
func (c *PropagationBindingController) syncBinding(binding *v1alpha1.PropagationBinding) (controllerruntime.Result, error) {
err := c.transformBindingToWorks(binding)
clusterNames := c.getBindingClusterNames(binding)
ownerLabel := names.GenerateOwnerLabelValue(binding.GetNamespace(), binding.GetName())
works, err := c.findOrphanWorks(ownerLabel, clusterNames)
if err != nil {
klog.Errorf("Failed to transform propagationBinding %s/%s to propagationWorks. Error: %+v",
binding.Namespace, binding.Name, err)
klog.Errorf("Failed to find orphan propagationWorks by propagationBinding %s/%s. Error: %v.",
binding.GetNamespace(), binding.GetName(), err)
return controllerruntime.Result{Requeue: true}, err
}
err = c.removeOrphanWorks(works)
if err != nil {
klog.Errorf("Failed to remove orphan propagationWorks by propagationBinding %s/%s. Error: %v.",
binding.GetNamespace(), binding.GetName(), err)
return controllerruntime.Result{Requeue: true}, err
}
err = c.transformBindingToWorks(binding, clusterNames)
if err != nil {
klog.Errorf("Failed to transform propagationBinding %s/%s to propagationWorks. Error: %v.",
binding.GetNamespace(), binding.GetName(), err)
return controllerruntime.Result{Requeue: true}, err
}
return controllerruntime.Result{}, nil
}
// removeOrphanBindings will remove orphan propagationWorks.
func (c *PropagationBindingController) removeOrphanWorks(works []v1alpha1.PropagationWork) error {
for _, work := range works {
err := c.KarmadaClient.PropagationstrategyV1alpha1().PropagationWorks(work.GetNamespace()).Delete(context.TODO(), work.GetName(), metav1.DeleteOptions{})
if err != nil {
return err
}
klog.Infof("Delete orphan propagationWork %s/%s successfully.", work.GetNamespace(), work.GetName())
}
return nil
}
// findOrphanWorks will find orphan propagationWorks that don't match current propagationBinding clusters.
func (c *PropagationBindingController) findOrphanWorks(ownerLabel string, clusterNames []string) ([]v1alpha1.PropagationWork, error) {
ownerLabelSelector := metav1.LabelSelector{
MatchLabels: map[string]string{util.OwnerLabel: ownerLabel},
}
propagationWorkList, err := c.KarmadaClient.PropagationstrategyV1alpha1().PropagationWorks(metav1.NamespaceAll).List(context.TODO(),
metav1.ListOptions{LabelSelector: labels.Set(ownerLabelSelector.MatchLabels).String()})
if err != nil {
return nil, err
}
var orphanWorks []v1alpha1.PropagationWork
expectClusters := sets.NewString(clusterNames...)
for _, work := range propagationWorkList.Items {
workTargetCluster, err := names.GetMemberClusterName(work.GetNamespace())
if err != nil {
klog.Errorf("Failed to get cluster name which PropagationWork %s/%s belongs to. Error: %v.",
work.GetNamespace(), work.GetName(), err)
return nil, err
}
if !expectClusters.Has(workTargetCluster) {
orphanWorks = append(orphanWorks, work)
}
}
return orphanWorks, nil
}
// SetupWithManager creates a controller and register to controller manager.
func (c *PropagationBindingController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&v1alpha1.PropagationBinding{}).Complete(c)
@ -95,17 +166,22 @@ func (c *PropagationBindingController) removeIrrelevantField(workload *unstructu
}
// transformBindingToWorks will transform propagationBinding to propagationWorks
func (c *PropagationBindingController) transformBindingToWorks(binding *v1alpha1.PropagationBinding) error {
workload, err := util.GetUnstructured(c.DynamicClient, binding.Spec.Resource.APIVersion,
binding.Spec.Resource.Kind, binding.Spec.Resource.Namespace, binding.Spec.Resource.Name)
func (c *PropagationBindingController) transformBindingToWorks(binding *v1alpha1.PropagationBinding, clusterNames []string) error {
dynamicResource, err := restmapper.GetGroupVersionResource(c.RESTMapper,
schema.FromAPIVersionAndKind(binding.Spec.Resource.APIVersion, binding.Spec.Resource.Kind))
if err != nil {
klog.Errorf("Failed to get resource, kind: %s, namespace: %s, name: %s. Error: %v",
klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", binding.Spec.Resource.APIVersion,
binding.Spec.Resource.Kind, err)
return err
}
workload, err := c.DynamicClient.Resource(dynamicResource).Namespace(binding.Spec.Resource.Namespace).Get(context.TODO(),
binding.Spec.Resource.Name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get workload, kind: %s, namespace: %s, name: %s. Error: %v",
binding.Spec.Resource.Kind, binding.Spec.Resource.Namespace, binding.Spec.Resource.Name, err)
return err
}
clusterNames := c.getBindingClusterNames(binding)
err = c.ensurePropagationWork(workload, clusterNames, binding)
if err != nil {
return err
@ -114,6 +190,7 @@ func (c *PropagationBindingController) transformBindingToWorks(binding *v1alpha1
}
// ensurePropagationWork ensure PropagationWork to be created or updated
// TODO: refactor by CreateOrUpdate Later, if propagationBinding is unchanged, do nothing.
func (c *PropagationBindingController) ensurePropagationWork(workload *unstructured.Unstructured, clusterNames []string,
binding *v1alpha1.PropagationBinding) error {
c.removeIrrelevantField(workload)
@ -132,6 +209,7 @@ func (c *PropagationBindingController) ensurePropagationWork(workload *unstructu
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(binding, controllerKind),
},
Labels: map[string]string{util.OwnerLabel: names.GenerateOwnerLabelValue(binding.GetNamespace(), binding.GetName())},
},
Spec: v1alpha1.PropagationWorkSpec{
Workload: v1alpha1.WorkloadTemplate{
@ -147,30 +225,30 @@ func (c *PropagationBindingController) ensurePropagationWork(workload *unstructu
for _, clusterName := range clusterNames {
executionSpace, err := names.GenerateExecutionSpaceName(clusterName)
if err != nil {
klog.Errorf("Failed to generate execution space name for propagationWork %s/%s. Error: %v", executionSpace, propagationWork.Name, err)
klog.Errorf("Failed to generate execution space name for propagationWork %s/%s. Error: %v.", executionSpace, propagationWork.GetName(), err)
return err
}
workGetResult, err := c.KarmadaClient.PropagationstrategyV1alpha1().PropagationWorks(executionSpace).Get(context.TODO(), propagationWork.Name, metav1.GetOptions{})
workGetResult, err := c.KarmadaClient.PropagationstrategyV1alpha1().PropagationWorks(executionSpace).Get(context.TODO(), propagationWork.GetName(), metav1.GetOptions{})
if err != nil && apierrors.IsNotFound(err) {
_, err := c.KarmadaClient.PropagationstrategyV1alpha1().PropagationWorks(executionSpace).Create(context.TODO(), &propagationWork, metav1.CreateOptions{})
if err != nil {
klog.Errorf("Failed to create propagationWork %s/%s. Error: %v", executionSpace, propagationWork.Name, err)
klog.Errorf("Failed to create propagationWork %s/%s. Error: %v.", executionSpace, propagationWork.GetName(), err)
return err
}
klog.Infof("Create propagationWork %s/%s successfully", executionSpace, propagationWork.Name)
klog.Infof("Create propagationWork %s/%s successfully.", executionSpace, propagationWork.GetName())
continue
} else if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to get propagationWork %s/%s. Error: %v", executionSpace, propagationWork.Name, err)
klog.Errorf("Failed to get propagationWork %s/%s. Error: %v.", executionSpace, propagationWork.GetName(), err)
return err
}
workGetResult.Spec = propagationWork.Spec
workGetResult.ObjectMeta.OwnerReferences = propagationWork.ObjectMeta.OwnerReferences
_, err = c.KarmadaClient.PropagationstrategyV1alpha1().PropagationWorks(executionSpace).Update(context.TODO(), workGetResult, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update propagationWork %s/%s. Error: %v", executionSpace, propagationWork.Name, err)
klog.Errorf("Failed to update propagationWork %s/%s. Error: %v.", executionSpace, propagationWork.GetName(), err)
return err
}
klog.Infof("Update propagationWork %s/%s successfully", executionSpace, propagationWork.Name)
klog.Infof("Update propagationWork %s/%s successfully.", executionSpace, propagationWork.GetName())
}
return nil
}

View File

@ -2,12 +2,15 @@ package policy
import (
"context"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
@ -15,8 +18,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/huawei-cloudnative/karmada/pkg/apis/propagationstrategy/v1alpha1"
"github.com/huawei-cloudnative/karmada/pkg/controllers/util"
karmadaclientset "github.com/huawei-cloudnative/karmada/pkg/generated/clientset/versioned"
"github.com/huawei-cloudnative/karmada/pkg/util"
"github.com/huawei-cloudnative/karmada/pkg/util/names"
"github.com/huawei-cloudnative/karmada/pkg/util/restmapper"
)
// ControllerName is the controller name that will be used when reporting events.
@ -30,13 +35,14 @@ type PropagationPolicyController struct {
DynamicClient dynamic.Interface // used to fetch arbitrary resources.
KarmadaClient karmadaclientset.Interface // used to create/update PropagationBinding resources.
EventRecorder record.EventRecorder
RESTMapper meta.RESTMapper
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
// The Controller will requeue the Request to be processed again if an error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (c *PropagationPolicyController) Reconcile(req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Reconciling PropagationPolicy %s", req.NamespacedName.String())
klog.V(4).Infof("Reconciling PropagationPolicy %s.", req.NamespacedName.String())
policy := &v1alpha1.PropagationPolicy{}
if err := c.Client.Get(context.TODO(), req.NamespacedName, policy); err != nil {
@ -54,10 +60,15 @@ func (c *PropagationPolicyController) Reconcile(req controllerruntime.Request) (
return controllerruntime.Result{}, nil
}
return c.syncPolicy(policy)
result, err := c.syncPolicy(policy)
if err != nil {
return result, err
}
return controllerruntime.Result{Requeue: true, RequeueAfter: 10 * time.Second}, nil
}
// syncPolicy will fetch matched resource by policy, then transform them to propagationBindings
// syncPolicy will fetch matched resource by policy, then transform them to propagationBindings.
func (c *PropagationPolicyController) syncPolicy(policy *v1alpha1.PropagationPolicy) (controllerruntime.Result, error) {
workloads, err := c.fetchWorkloads(policy.Spec.ResourceSelectors)
if err != nil {
@ -65,16 +76,24 @@ func (c *PropagationPolicyController) syncPolicy(policy *v1alpha1.PropagationPol
}
if len(workloads) == 0 {
klog.Infof("No resource be selected, ignore sync: %s/%s", policy.Namespace, policy.Name)
klog.Infof("No resource be selected, ignore sync: %s/%s.", policy.Namespace, policy.Name)
// TODO(RainbowMango): Need to report an event for no resource policy that may be a mistake.
return controllerruntime.Result{}, nil
}
// TODO(RainbowMango): Ignore the workloads that owns by other policy and can not be shared.
// Ignore the workloads that owns by other policy and can not be shared.
policyReferenceWorkloads := c.ignoreIrrelevantWorkload(policy, workloads)
// TODO(RainbowMango): Claim the rest workloads that should be owned by current policy.
owner := names.GenerateOwnerLabelValue(policy.GetNamespace(), policy.GetName())
// Claim the rest workloads that should be owned by current policy.
err = c.claimResources(owner, policyReferenceWorkloads)
if err != nil {
return controllerruntime.Result{Requeue: true}, err
}
return c.buildPropagationBinding(policy, workloads)
// TODO: Remove annotation of workloads owned by current policy, but should not be owned now.
return c.buildPropagationBinding(policy, policyReferenceWorkloads)
}
// fetchWorkloads fetches all matched resources via resource selectors.
@ -90,13 +109,13 @@ func (c *PropagationPolicyController) fetchWorkloads(resourceSelectors []v1alpha
if resourceSelector.LabelSelector == nil {
err := c.fetchWorkloadsWithOutLabelSelector(resourceSelector, namespace, names, &workloads)
if err != nil {
klog.Errorf("Failed to fetch workloads by names in namespace %s. Error: %v", namespace, err)
klog.Errorf("Failed to fetch workloads by names in namespace %s. Error: %v.", namespace, err)
return nil, err
}
} else {
err := c.fetchWorkloadsWithLabelSelector(resourceSelector, namespace, names, &workloads)
if err != nil {
klog.Errorf("Failed to fetch workloads with labelSelector in namespace %s. Error: %v", namespace, err)
klog.Errorf("Failed to fetch workloads with labelSelector in namespace %s. Error: %v.", namespace, err)
return nil, err
}
}
@ -105,10 +124,70 @@ func (c *PropagationPolicyController) fetchWorkloads(resourceSelectors []v1alpha
return workloads, nil
}
// buildPropagationBinding will build propagationBinding by matched resources
func (c *PropagationPolicyController) buildPropagationBinding(policy *v1alpha1.PropagationPolicy, workloads []*unstructured.Unstructured) (controllerruntime.Result, error) {
// deletePropagationBinding will create propagationBinding.
func (c *PropagationPolicyController) deletePropagationBinding(binding v1alpha1.PropagationBinding) error {
err := c.KarmadaClient.PropagationstrategyV1alpha1().PropagationBindings(binding.GetNamespace()).Delete(context.TODO(), binding.GetName(), metav1.DeleteOptions{})
if err != nil && apierrors.IsNotFound(err) {
klog.Infof("PropagationBinding %s/%s is already not exist.", binding.GetNamespace(), binding.GetName())
return nil
} else if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to delete propagationBinding %s/%s. Error: %v.", binding.GetNamespace(), binding.GetName(), err)
return err
}
klog.Infof("Delete propagationBinding %s/%s successfully.", binding.GetNamespace(), binding.GetName())
return nil
}
// calculatePropagationBindings will get orphanBindings and workloads that need to update or create.
func (c *PropagationPolicyController) calculatePropagationBindings(policy *v1alpha1.PropagationPolicy,
workloads []*unstructured.Unstructured) ([]v1alpha1.PropagationBinding, []*unstructured.Unstructured, error) {
ownerLabel := metav1.LabelSelector{
MatchLabels: map[string]string{util.OwnerLabel: names.GenerateOwnerLabelValue(policy.GetNamespace(), policy.GetName())},
}
bindingList, err := c.KarmadaClient.PropagationstrategyV1alpha1().PropagationBindings(policy.GetNamespace()).List(context.TODO(),
metav1.ListOptions{LabelSelector: labels.Set(ownerLabel.MatchLabels).String()})
if err != nil {
klog.Errorf("Failed to list propagationBindings in namespace %s", policy.GetNamespace())
return nil, nil, err
}
var orphanBindings []v1alpha1.PropagationBinding
for _, binding := range bindingList.Items {
isFind := false
for _, workload := range workloads {
bindingName := names.GenerateBindingName(workload.GetNamespace(), workload.GetKind(), workload.GetName())
if binding.GetName() == bindingName {
isFind = true
break
}
}
if !isFind {
orphanBindings = append(orphanBindings, binding)
}
}
return orphanBindings, workloads, nil
}
// buildPropagationBinding will build propagationBinding by matched resources.
func (c *PropagationPolicyController) buildPropagationBinding(policy *v1alpha1.PropagationPolicy, policyReferenceWorkloads []*unstructured.Unstructured) (controllerruntime.Result, error) {
targetCluster := c.getTargetClusters(policy.Spec.Placement)
orphanBindings, workloads, err := c.calculatePropagationBindings(policy, policyReferenceWorkloads)
if err != nil {
return controllerruntime.Result{Requeue: true}, err
}
// Remove orphan bindings.
for _, binding := range orphanBindings {
err := c.deletePropagationBinding(binding)
if err != nil {
return controllerruntime.Result{Requeue: true}, err
}
}
// If binding already exist, update if changed.
// If binding not exist, create it.
for _, workload := range workloads {
err := c.ensurePropagationBinding(policy, workload, targetCluster)
if err != nil {
@ -119,10 +198,57 @@ func (c *PropagationPolicyController) buildPropagationBinding(policy *v1alpha1.P
return controllerruntime.Result{}, nil
}
// claimResources will set ownerLabel in resource that associate with policy.
func (c *PropagationPolicyController) claimResources(owner string, workloads []*unstructured.Unstructured) error {
for _, workload := range workloads {
dynamicResource, err := restmapper.GetGroupVersionResource(c.RESTMapper,
schema.FromAPIVersionAndKind(workload.GetAPIVersion(), workload.GetKind()))
if err != nil {
klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", workload.GetAPIVersion(), workload.GetKind(), err)
return err
}
workloadLabel := workload.GetLabels()
workloadLabel[util.PolicyClaimLabel] = owner
workload.SetLabels(workloadLabel)
_, err = c.DynamicClient.Resource(dynamicResource).Namespace(workload.GetNamespace()).Update(context.TODO(), workload, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update resource, kind: %s, namespace: %s, name: %s. Error: %v.", workload.GetKind(),
workload.GetNamespace(), workload.GetName(), err)
return err
}
klog.V(1).Infof("Update resource successfully, kind: %s, namespace: %s, name: %s.", workload.GetKind(),
workload.GetNamespace(), workload.GetName())
}
return nil
}
// ignoreIrrelevantWorkload will ignore the workloads that owns by other policy and can not be shared.
func (c *PropagationPolicyController) ignoreIrrelevantWorkload(policy *v1alpha1.PropagationPolicy, workloads []*unstructured.Unstructured) []*unstructured.Unstructured {
var result []*unstructured.Unstructured
policyOwnerLabelReference := names.GenerateOwnerLabelValue(policy.GetNamespace(), policy.GetName())
for _, workload := range workloads {
workloadLabels := workload.GetLabels()
owner, exist := workloadLabels[util.PolicyClaimLabel]
if exist && owner != policyOwnerLabelReference {
// this workload owns by other policy, just ignore
continue
}
result = append(result, workload)
}
return result
}
// fetchWorkloadsWithLabelSelector query workloads by labelSelector and names
func (c *PropagationPolicyController) fetchWorkloadsWithLabelSelector(resourceSelector v1alpha1.ResourceSelector, namespace string, names []string, workloads *[]*unstructured.Unstructured) error {
unstructuredWorkLoadList, err := util.ListUnstructuredByFilter(c.DynamicClient, resourceSelector.APIVersion,
resourceSelector.Kind, namespace, resourceSelector.LabelSelector)
func (c *PropagationPolicyController) fetchWorkloadsWithLabelSelector(resourceSelector v1alpha1.ResourceSelector,
namespace string, names []string, workloads *[]*unstructured.Unstructured) error {
dynamicResource, err := restmapper.GetGroupVersionResource(c.RESTMapper,
schema.FromAPIVersionAndKind(resourceSelector.APIVersion, resourceSelector.Kind))
if err != nil {
klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resourceSelector.APIVersion, resourceSelector.Kind, err)
return err
}
unstructuredWorkLoadList, err := c.DynamicClient.Resource(dynamicResource).Namespace(namespace).List(context.TODO(),
metav1.ListOptions{LabelSelector: labels.Set(resourceSelector.LabelSelector.MatchLabels).String()})
if err != nil {
return err
}
@ -146,9 +272,17 @@ func (c *PropagationPolicyController) fetchWorkloadsWithLabelSelector(resourceSe
// fetchWorkloadsWithOutLabelSelector query workloads by names
func (c *PropagationPolicyController) fetchWorkloadsWithOutLabelSelector(resourceSelector v1alpha1.ResourceSelector, namespace string, names []string, workloads *[]*unstructured.Unstructured) error {
for _, name := range names {
workload, err := util.GetUnstructured(c.DynamicClient, resourceSelector.APIVersion,
resourceSelector.Kind, namespace, name)
dynamicResource, err := restmapper.GetGroupVersionResource(c.RESTMapper,
schema.FromAPIVersionAndKind(resourceSelector.APIVersion, resourceSelector.Kind))
if err != nil {
klog.Errorf("Failed to get GVR from GVK %s %s. Error: %v", resourceSelector.APIVersion,
resourceSelector.Kind, err)
return err
}
workload, err := c.DynamicClient.Resource(dynamicResource).Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get workload, kind: %s, namespace: %s, name: %s. Error: %v",
resourceSelector.Kind, namespace, name, err)
return err
}
*workloads = append(*workloads, workload)
@ -156,13 +290,13 @@ func (c *PropagationPolicyController) fetchWorkloadsWithOutLabelSelector(resourc
return nil
}
// getTargetClusters get targetClusters by placement
// getTargetClusters get targetClusters by placement.
// TODO(RainbowMango): This is a dummy function and will be removed once scheduler on board.
func (c *PropagationPolicyController) getTargetClusters(placement v1alpha1.Placement) []v1alpha1.TargetCluster {
matchClusterNames := util.GetDifferenceSet(placement.ClusterAffinity.ClusterNames, placement.ClusterAffinity.ExcludeClusters)
// todo: cluster labelSelector, fieldSelector, clusterTolerations
// todo: calc spread contraints. such as maximum, minimum
// TODO: cluster labelSelector, fieldSelector, clusterTolerations
// TODO: calc spread contraints. such as maximum, minimum
var targetClusters []v1alpha1.TargetCluster
for _, matchClusterName := range matchClusterNames {
targetClusters = append(targetClusters, v1alpha1.TargetCluster{Name: matchClusterName})
@ -171,15 +305,17 @@ func (c *PropagationPolicyController) getTargetClusters(placement v1alpha1.Place
}
// ensurePropagationBinding will ensure propagationBinding are created or updated.
func (c *PropagationPolicyController) ensurePropagationBinding(propagationPolicy *v1alpha1.PropagationPolicy, workload *unstructured.Unstructured, clusterNames []v1alpha1.TargetCluster) error {
bindingName := strings.ToLower(workload.GetNamespace() + "-" + workload.GetKind() + "-" + workload.GetName())
// TODO: refactor by CreateOrUpdate Later, if propagationBinding is unchanged, do nothing.
func (c *PropagationPolicyController) ensurePropagationBinding(policy *v1alpha1.PropagationPolicy, workload *unstructured.Unstructured, clusterNames []v1alpha1.TargetCluster) error {
bindingName := names.GenerateBindingName(workload.GetNamespace(), workload.GetKind(), workload.GetName())
propagationBinding := v1alpha1.PropagationBinding{
ObjectMeta: metav1.ObjectMeta{
Name: bindingName,
Namespace: propagationPolicy.Namespace,
Namespace: policy.GetNamespace(),
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(propagationPolicy, controllerKind),
*metav1.NewControllerRef(policy, controllerKind),
},
Labels: map[string]string{util.OwnerLabel: names.GenerateOwnerLabelValue(policy.GetNamespace(), policy.GetName())},
},
Spec: v1alpha1.PropagationBindingSpec{
Resource: v1alpha1.ObjectReference{
@ -193,27 +329,27 @@ func (c *PropagationPolicyController) ensurePropagationBinding(propagationPolicy
},
}
bindingGetResult, err := c.KarmadaClient.PropagationstrategyV1alpha1().PropagationBindings(propagationBinding.Namespace).Get(context.TODO(), propagationBinding.Name, metav1.GetOptions{})
bindingGetResult, err := c.KarmadaClient.PropagationstrategyV1alpha1().PropagationBindings(propagationBinding.GetNamespace()).Get(context.TODO(), propagationBinding.GetName(), metav1.GetOptions{})
if err != nil && apierrors.IsNotFound(err) {
_, err := c.KarmadaClient.PropagationstrategyV1alpha1().PropagationBindings(propagationBinding.Namespace).Create(context.TODO(), &propagationBinding, metav1.CreateOptions{})
_, err := c.KarmadaClient.PropagationstrategyV1alpha1().PropagationBindings(propagationBinding.GetNamespace()).Create(context.TODO(), &propagationBinding, metav1.CreateOptions{})
if err != nil {
klog.Errorf("Failed to create propagationBinding %s/%s. Error: %v", propagationBinding.Namespace, propagationBinding.Name, err)
klog.Errorf("Failed to create propagationBinding %s/%s. Error: %v", propagationBinding.GetNamespace(), propagationBinding.GetName(), err)
return err
}
klog.Infof("Create propagationBinding %s/%s successfully", propagationBinding.Namespace, propagationBinding.Name)
klog.Infof("Create propagationBinding %s/%s successfully", propagationBinding.GetNamespace(), propagationBinding.GetName())
return nil
} else if err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to get propagationBinding %s/%s. Error: %v", propagationBinding.Namespace, propagationBinding.Name, err)
klog.Errorf("Failed to get propagationBinding %s/%s. Error: %v", propagationBinding.GetNamespace(), propagationBinding.GetName(), err)
return err
}
bindingGetResult.Spec = propagationBinding.Spec
bindingGetResult.ObjectMeta.OwnerReferences = propagationBinding.ObjectMeta.OwnerReferences
_, err = c.KarmadaClient.PropagationstrategyV1alpha1().PropagationBindings(propagationBinding.Namespace).Update(context.TODO(), bindingGetResult, metav1.UpdateOptions{})
_, err = c.KarmadaClient.PropagationstrategyV1alpha1().PropagationBindings(propagationBinding.GetNamespace()).Update(context.TODO(), bindingGetResult, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update propagationBinding %s/%s. Error: %v", propagationBinding.Namespace, propagationBinding.Name, err)
klog.Errorf("Failed to update propagationBinding %s/%s. Error: %v", propagationBinding.GetNamespace(), propagationBinding.GetName(), err)
return err
}
klog.Infof("Update propagationBinding %s/%s successfully", propagationBinding.Namespace, propagationBinding.Name)
klog.Infof("Update propagationBinding %s/%s successfully", propagationBinding.GetNamespace(), propagationBinding.GetName())
return nil
}

View File

@ -1,103 +0,0 @@
package util
import (
"context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
)
// todo: this can get by kubectl api-resources
// ResourceKindMap get the Resource of a given kind
var ResourceKindMap = map[string]string{
"ConfigMap": "configmaps",
"Namespace": "namespaces",
"PersistentVolumeClaim": "persistentvolumeclaims",
"PersistentVolume": "persistentvolumes",
"Pod": "pods",
"Secret": "secrets",
"Service": "services",
"Deployment": "deployments",
"DaemonSet": "daemonsets",
"StatefulSet": "statefulsets",
"ReplicaSet": "replicasets",
"CronJob": "cronjobs",
"Job": "jobs",
"Ingress": "ingresses",
}
//
func generateGroupVersionResource(apiVersion, kind string) (schema.GroupVersionResource, error) {
groupVersion, err := schema.ParseGroupVersion(apiVersion)
if err != nil {
return schema.GroupVersionResource{}, err
}
dynamicResource := schema.GroupVersionResource{Group: groupVersion.Group, Version: groupVersion.Version, Resource: ResourceKindMap[kind]}
return dynamicResource, nil
}
// GetUnstructured will get unstructured object
func GetUnstructured(client dynamic.Interface, apiVersion, kind, namespace, name string) (*unstructured.Unstructured, error) {
dynamicResource, err := generateGroupVersionResource(apiVersion, kind)
if err != nil {
return nil, err
}
result, err := client.Resource(dynamicResource).Namespace(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}
return result, nil
}
// ListUnstructuredByFilter will list unstructuredList by labelSelector
func ListUnstructuredByFilter(client dynamic.Interface, apiVersion, kind, namespace string, labelSelector *metav1.LabelSelector) (*unstructured.UnstructuredList, error) {
dynamicResource, err := generateGroupVersionResource(apiVersion, kind)
if err != nil {
return nil, err
}
result, err := client.Resource(dynamicResource).Namespace(namespace).List(context.TODO(),
metav1.ListOptions{LabelSelector: labels.Set(labelSelector.MatchLabels).String()})
if err != nil {
return nil, err
}
return result, nil
}
// GetDifferenceSet will get difference set from includeItems and excludeItems
func GetDifferenceSet(includeItems, excludeItems []string) []string {
if includeItems == nil {
includeItems = []string{}
}
if excludeItems == nil {
excludeItems = []string{}
}
includeSet := sets.NewString()
excludeSet := sets.NewString()
for _, targetItem := range excludeItems {
excludeSet.Insert(targetItem)
}
for _, targetItem := range includeItems {
includeSet.Insert(targetItem)
}
matchItems := includeSet.Difference(excludeSet)
return matchItems.List()
}
// GetUniqueElements will delete duplicate element in list
func GetUniqueElements(list []string) []string {
if list == nil {
return []string{}
}
result := sets.String{}
for _, item := range list {
result.Insert(item)
}
return result.List()
}

12
pkg/util/constants.go Normal file
View File

@ -0,0 +1,12 @@
package util
const (
// PolicyClaimLabel will set in kubernetes resource, indicates that
// the resource is occupied by propagationPolicy
PolicyClaimLabel = "karmada.io/driven-by"
// OwnerLabel will set in karmada CRDs, indicates that who created it.
// We can use labelSelector to find who created it quickly.
// example1: set it in propagationBinding, the label value is propagationPolicy.
// example2: set it in propagationWork, the label value is propagationBinding.
OwnerLabel = "karmada.io/created-by"
)

View File

@ -24,3 +24,13 @@ func GetMemberClusterName(executionSpaceName string) (string, error) {
}
return strings.TrimPrefix(executionSpaceName, executionSpacePrefix), nil
}
// GenerateBindingName will generate binding name by namespace, kind and name
func GenerateBindingName(namespace, kind, name string) string {
return strings.ToLower(namespace + "-" + kind + "-" + name)
}
// GenerateOwnerLabelValue will get owner label value.
func GenerateOwnerLabelValue(namespace, name string) string {
return namespace + "." + name
}

39
pkg/util/util.go Normal file
View File

@ -0,0 +1,39 @@
package util
import (
"k8s.io/apimachinery/pkg/util/sets"
)
// GetDifferenceSet will get difference set from includeItems and excludeItems.
func GetDifferenceSet(includeItems, excludeItems []string) []string {
if includeItems == nil {
includeItems = []string{}
}
if excludeItems == nil {
excludeItems = []string{}
}
includeSet := sets.NewString()
excludeSet := sets.NewString()
for _, targetItem := range excludeItems {
excludeSet.Insert(targetItem)
}
for _, targetItem := range includeItems {
includeSet.Insert(targetItem)
}
matchItems := includeSet.Difference(excludeSet)
return matchItems.List()
}
// GetUniqueElements will delete duplicate element in list.
func GetUniqueElements(list []string) []string {
if list == nil {
return []string{}
}
result := sets.String{}
for _, item := range list {
result.Insert(item)
}
return result.List()
}