Merge pull request #3845 from whitewindmills/policy-preemption

feat: implement preemption between propagation policies
This commit is contained in:
karmada-bot 2023-07-31 19:57:49 +08:00 committed by GitHub
commit 71584bca33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 60 additions and 30 deletions

View File

@ -422,6 +422,7 @@ func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, object
bindingCopy.Spec.Placement = binding.Spec.Placement
bindingCopy.Spec.Failover = binding.Spec.Failover
bindingCopy.Spec.ConflictResolution = binding.Spec.ConflictResolution
excludeClusterPolicy(bindingCopy.Labels)
return nil
})
if err != nil {
@ -598,18 +599,23 @@ func (d *ResourceDetector) GetUnstructuredObject(objectKey keys.ClusterWideKey)
// ClaimPolicyForObject set policy identifier which the object associated with.
func (d *ResourceDetector) ClaimPolicyForObject(object *unstructured.Unstructured, policyNamespace string, policyName string) error {
claimedNS := util.GetLabelValue(object.GetLabels(), policyv1alpha1.PropagationPolicyNamespaceLabel)
claimedName := util.GetLabelValue(object.GetLabels(), policyv1alpha1.PropagationPolicyNameLabel)
// object has been claimed, don't need to claim again
if claimedNS == policyNamespace && claimedName == policyName {
return nil
objLabels := object.GetLabels()
if objLabels == nil {
objLabels = make(map[string]string)
} else if len(objLabels) > 0 {
// object has been claimed, don't need to claim again
if !excludeClusterPolicy(objLabels) &&
objLabels[policyv1alpha1.PropagationPolicyNamespaceLabel] == policyNamespace &&
objLabels[policyv1alpha1.PropagationPolicyNameLabel] == policyName {
return nil
}
}
util.MergeLabel(object, policyv1alpha1.PropagationPolicyNamespaceLabel, policyNamespace)
util.MergeLabel(object, policyv1alpha1.PropagationPolicyNameLabel, policyName)
return d.Client.Update(context.TODO(), object)
objLabels[policyv1alpha1.PropagationPolicyNamespaceLabel] = policyNamespace
objLabels[policyv1alpha1.PropagationPolicyNameLabel] = policyName
objectCopy := object.DeepCopy()
objectCopy.SetLabels(objLabels)
return d.Client.Update(context.TODO(), objectCopy)
}
// ClaimClusterPolicyForObject set cluster identifier which the object associated with.
@ -621,8 +627,9 @@ func (d *ResourceDetector) ClaimClusterPolicyForObject(object *unstructured.Unst
return nil
}
util.MergeLabel(object, policyv1alpha1.ClusterPropagationPolicyLabel, policyName)
return d.Client.Update(context.TODO(), object)
objectCopy := object.DeepCopy()
util.MergeLabel(objectCopy, policyv1alpha1.ClusterPropagationPolicyLabel, policyName)
return d.Client.Update(context.TODO(), objectCopy)
}
// BuildResourceBinding builds a desired ResourceBinding for object.

View File

@ -329,3 +329,13 @@ func (d *ResourceDetector) listCPPDerivedCRB(policyName string) (*workv1alpha2.C
return bindings, nil
}
// excludeClusterPolicy excludes cluster propagation policy.
// If propagation policy was claimed, cluster propagation policy should not exists.
func excludeClusterPolicy(objLabels map[string]string) bool {
if _, ok := objLabels[policyv1alpha1.ClusterPropagationPolicyLabel]; !ok {
return false
}
delete(objLabels, policyv1alpha1.ClusterPropagationPolicyLabel)
return true
}

View File

@ -9,7 +9,6 @@ import (
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
"github.com/karmada-io/karmada/pkg/features"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
)
@ -40,8 +39,13 @@ func (d *ResourceDetector) handlePropagationPolicyPreemption(policy *policyv1alp
continue
}
errs = append(errs, d.preemptPropagationPolicy(resourceTemplate, policy))
// TODO(whitewindmills): PP preempts CPP.
if err := d.preemptPropagationPolicy(resourceTemplate, policy); err != nil {
errs = append(errs, err)
continue
}
if err := d.preemptClusterPropagationPolicyDirectly(resourceTemplate, policy); err != nil {
errs = append(errs, err)
}
}
return utilerrors.NewAggregate(errs)
@ -61,7 +65,9 @@ func (d *ResourceDetector) handleClusterPropagationPolicyPreemption(policy *poli
continue
}
errs = append(errs, d.preemptClusterPropagationPolicy(resourceTemplate, policy))
if err := d.preemptClusterPropagationPolicy(resourceTemplate, policy); err != nil {
errs = append(errs, err)
}
}
return utilerrors.NewAggregate(errs)
@ -98,13 +104,8 @@ func (d *ResourceDetector) preemptPropagationPolicy(resourceTemplate *unstructur
return nil
}
clusterWideKey, err := keys.ClusterWideKeyFunc(resourceTemplate)
if err != nil {
// should not happen.
return err
}
if err := d.ApplyPolicy(resourceTemplate, clusterWideKey, policy); err != nil {
klog.Errorf("Failed to apply new propagation policy(%s/%s) on resource template(%s, kind=%s, %s): %v.", policy.Namespace, policy.Name,
if err := d.ClaimPolicyForObject(resourceTemplate, policy.Namespace, policy.Name); err != nil {
klog.Errorf("Failed to claim new propagation policy(%s/%s) on resource template(%s, kind=%s, %s): %v.", policy.Namespace, policy.Name,
resourceTemplate.GetAPIVersion(), resourceTemplate.GetKind(), names.NamespacedKey(resourceTemplate.GetNamespace(), resourceTemplate.GetName()), err)
return err
}
@ -113,6 +114,23 @@ func (d *ResourceDetector) preemptPropagationPolicy(resourceTemplate *unstructur
return nil
}
// preemptClusterPropagationPolicyDirectly directly preempts resource template claimed by ClusterPropagationPolicy regardless of priority.
func (d *ResourceDetector) preemptClusterPropagationPolicyDirectly(resourceTemplate *unstructured.Unstructured, policy *policyv1alpha1.PropagationPolicy) error {
claimedPolicyName := util.GetLabelValue(resourceTemplate.GetLabels(), policyv1alpha1.ClusterPropagationPolicyLabel)
if claimedPolicyName == "" {
return nil
}
if err := d.ClaimPolicyForObject(resourceTemplate, policy.Namespace, policy.Name); err != nil {
klog.Errorf("Failed to claim new propagation policy(%s/%s) on resource template(%s, kind=%s, %s) directly: %v.", policy.Namespace, policy.Name,
resourceTemplate.GetAPIVersion(), resourceTemplate.GetKind(), names.NamespacedKey(resourceTemplate.GetNamespace(), resourceTemplate.GetName()), err)
return err
}
klog.V(4).Infof("Propagation policy(%s/%s) has preempted another cluster propagation policy(%s).",
policy.Namespace, policy.Name, claimedPolicyName)
return nil
}
// preemptClusterPropagationPolicy preempts resource template that is claimed by ClusterPropagationPolicy.
func (d *ResourceDetector) preemptClusterPropagationPolicy(resourceTemplate *unstructured.Unstructured, policy *policyv1alpha1.ClusterPropagationPolicy) error {
claimedPolicyName := util.GetLabelValue(resourceTemplate.GetLabels(), policyv1alpha1.ClusterPropagationPolicyLabel)
@ -142,13 +160,8 @@ func (d *ResourceDetector) preemptClusterPropagationPolicy(resourceTemplate *uns
return nil
}
clusterWideKey, err := keys.ClusterWideKeyFunc(resourceTemplate)
if err != nil {
// should not happen.
return err
}
if err := d.ApplyClusterPolicy(resourceTemplate, clusterWideKey, policy); err != nil {
klog.Errorf("Failed to apply new cluster propagation policy(%s) on resource template(%s, kind=%s, %s): %v.", policy.Name,
if err := d.ClaimClusterPolicyForObject(resourceTemplate, policy.Name); err != nil {
klog.Errorf("Failed to claim new cluster propagation policy(%s) on resource template(%s, kind=%s, %s): %v.", policy.Name,
resourceTemplate.GetAPIVersion(), resourceTemplate.GetKind(), names.NamespacedKey(resourceTemplate.GetNamespace(), resourceTemplate.GetName()), err)
return err
}