diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index a665cf549..c517ef536 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -981,10 +981,11 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyDeletion(policyName str } // HandlePropagationPolicyCreationOrUpdate handles PropagationPolicy add and update event. -// When a new policy arrives, should first check whether existing objects are no longer -// matched by the current policy, if yes, clean the labels on the object. +// When a new policy arrives, should check whether existing objects are no longer matched by the current policy, +// if yes, clean the labels on the object. // And then check if object in waiting list matches the policy, if yes remove the object // from waiting list and throw the object to it's reconcile queue. If not, do nothing. +// Finally, handle the propagation policy preemption process if preemption is enabled. func (d *ResourceDetector) HandlePropagationPolicyCreationOrUpdate(policy *policyv1alpha1.PropagationPolicy) error { err := d.cleanPPUnmatchedResourceBindings(policy.Namespace, policy.Name, policy.Spec.ResourceSelectors) if err != nil { @@ -1023,14 +1024,20 @@ func (d *ResourceDetector) HandlePropagationPolicyCreationOrUpdate(policy *polic d.Processor.Add(key) } + // if preemption is enabled, handle the preemption process. + if preemptionEnabled(policy.Spec.Preemption) { + return d.handlePropagationPolicyPreemption(policy) + } + return nil } // HandleClusterPropagationPolicyCreationOrUpdate handles ClusterPropagationPolicy add and update event. -// When a new policy arrives, should first check whether existing objects are no longer -// matched by the current policy, if yes, clean the labels on the object. +// When a new policy arrives, should check whether existing objects are no longer matched by the current policy, +// if yes, clean the labels on the object. // And then check if object in waiting list matches the policy, if yes remove the object // from waiting list and throw the object to it's reconcile queue. If not, do nothing. +// Finally, handle the cluster propagation policy preemption process if preemption is enabled. func (d *ResourceDetector) HandleClusterPropagationPolicyCreationOrUpdate(policy *policyv1alpha1.ClusterPropagationPolicy) error { err := d.cleanCPPUnmatchedResourceBindings(policy.Name, policy.Spec.ResourceSelectors) if err != nil { @@ -1085,6 +1092,11 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreationOrUpdate(policy d.Processor.Add(key) } + // if preemption is enabled, handle the preemption process. + if preemptionEnabled(policy.Spec.Preemption) { + return d.handleClusterPropagationPolicyPreemption(policy) + } + return nil } diff --git a/pkg/detector/preemption.go b/pkg/detector/preemption.go new file mode 100755 index 000000000..fa64a6081 --- /dev/null +++ b/pkg/detector/preemption.go @@ -0,0 +1,180 @@ +package detector + +import ( + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/klog/v2" + + policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" + "github.com/karmada-io/karmada/pkg/features" + "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/fedinformer/keys" + "github.com/karmada-io/karmada/pkg/util/helper" + "github.com/karmada-io/karmada/pkg/util/names" +) + +// preemptionEnabled checks if preemption is enabled. +func preemptionEnabled(preemption policyv1alpha1.PreemptionBehavior) bool { + if preemption != policyv1alpha1.PreemptAlways { + return false + } + if !features.FeatureGate.Enabled(features.PolicyPreemption) { + klog.Warningf("Cannot handle the preemption process because feature gate %q is not enabled.", features.PolicyPreemption) + return false + } + return true +} + +// handleClusterPropagationPolicyPreemption handles the preemption process of PropagationPolicy. +// The preemption rule: high-priority PP > low-priority PP > CPP. +func (d *ResourceDetector) handlePropagationPolicyPreemption(policy *policyv1alpha1.PropagationPolicy) error { + var errs []error + for _, rs := range policy.Spec.ResourceSelectors { + resourceTemplate, err := d.fetchResourceTemplate(rs) + if err != nil { + errs = append(errs, err) + continue + } + if resourceTemplate == nil { + continue + } + + errs = append(errs, d.preemptPropagationPolicy(resourceTemplate, policy)) + // TODO(whitewindmills): PP preempts CPP. + } + + return utilerrors.NewAggregate(errs) +} + +// handleClusterPropagationPolicyPreemption handles the preemption process of ClusterPropagationPolicy. +// The preemption rule: high-priority CPP > low-priority CPP. +func (d *ResourceDetector) handleClusterPropagationPolicyPreemption(policy *policyv1alpha1.ClusterPropagationPolicy) error { + var errs []error + for _, rs := range policy.Spec.ResourceSelectors { + resourceTemplate, err := d.fetchResourceTemplate(rs) + if err != nil { + errs = append(errs, err) + continue + } + if resourceTemplate == nil { + continue + } + + errs = append(errs, d.preemptClusterPropagationPolicy(resourceTemplate, policy)) + } + + return utilerrors.NewAggregate(errs) +} + +// preemptPropagationPolicy preempts resource template that is claimed by PropagationPolicy. +func (d *ResourceDetector) preemptPropagationPolicy(resourceTemplate *unstructured.Unstructured, policy *policyv1alpha1.PropagationPolicy) error { + rtLabels := resourceTemplate.GetLabels() + claimedPolicyNamespace := util.GetLabelValue(rtLabels, policyv1alpha1.PropagationPolicyNamespaceLabel) + claimedPolicyName := util.GetLabelValue(rtLabels, policyv1alpha1.PropagationPolicyNameLabel) + if claimedPolicyName == "" || claimedPolicyNamespace == "" { + return nil + } + // resource template has been claimed by policy itself. + if claimedPolicyNamespace == policy.Namespace && claimedPolicyName == policy.Name { + return nil + } + + claimedPolicyObj, err := d.propagationPolicyLister.ByNamespace(claimedPolicyNamespace).Get(claimedPolicyName) + if err != nil { + klog.Errorf("Failed to retrieve claimed propagation policy(%s/%s): %v.", claimedPolicyNamespace, claimedPolicyName, err) + return err + } + + claimedPolicy := &policyv1alpha1.PropagationPolicy{} + if err = helper.ConvertToTypedObject(claimedPolicyObj, claimedPolicy); err != nil { + klog.Errorf("Failed to convert PropagationPolicy from unstructured object: %v.", err) + return err + } + + if policy.ExplicitPriority() <= claimedPolicy.ExplicitPriority() { + klog.V(2).Infof("Propagation policy(%s/%s) cannot preempt another propagation policy(%s/%s) due to insufficient priority.", + policy.Namespace, policy.Name, claimedPolicyNamespace, claimedPolicyName) + return nil + } + + clusterWideKey, err := keys.ClusterWideKeyFunc(resourceTemplate) + if err != nil { + // should not happen. + return err + } + if err := d.ApplyPolicy(resourceTemplate, clusterWideKey, policy); err != nil { + klog.Errorf("Failed to apply new propagation policy(%s/%s) on resource template(%s, kind=%s, %s): %v.", policy.Namespace, policy.Name, + resourceTemplate.GetAPIVersion(), resourceTemplate.GetKind(), names.NamespacedKey(resourceTemplate.GetNamespace(), resourceTemplate.GetName()), err) + return err + } + klog.V(4).Infof("Propagation policy(%s/%s) has preempted another propagation policy(%s/%s).", + policy.Namespace, policy.Name, claimedPolicyNamespace, claimedPolicyName) + return nil +} + +// preemptClusterPropagationPolicy preempts resource template that is claimed by ClusterPropagationPolicy. +func (d *ResourceDetector) preemptClusterPropagationPolicy(resourceTemplate *unstructured.Unstructured, policy *policyv1alpha1.ClusterPropagationPolicy) error { + claimedPolicyName := util.GetLabelValue(resourceTemplate.GetLabels(), policyv1alpha1.ClusterPropagationPolicyLabel) + if claimedPolicyName == "" { + return nil + } + // resource template has been claimed by policy itself. + if claimedPolicyName == policy.Name { + return nil + } + + claimedPolicyObj, err := d.clusterPropagationPolicyLister.Get(claimedPolicyName) + if err != nil { + klog.Errorf("Failed to retrieve claimed cluster propagation policy(%s): %v.", claimedPolicyName, err) + return err + } + + claimedPolicy := &policyv1alpha1.ClusterPropagationPolicy{} + if err = helper.ConvertToTypedObject(claimedPolicyObj, claimedPolicy); err != nil { + klog.Errorf("Failed to convert ClusterPropagationPolicy from unstructured object: %v.", err) + return err + } + + if policy.ExplicitPriority() <= claimedPolicy.ExplicitPriority() { + klog.V(2).Infof("Cluster propagation policy(%s) cannot preempt another cluster propagation policy(%s) due to insufficient priority.", + policy.Name, claimedPolicyName) + return nil + } + + clusterWideKey, err := keys.ClusterWideKeyFunc(resourceTemplate) + if err != nil { + // should not happen. + return err + } + if err := d.ApplyClusterPolicy(resourceTemplate, clusterWideKey, policy); err != nil { + klog.Errorf("Failed to apply new cluster propagation policy(%s) on resource template(%s, kind=%s, %s): %v.", policy.Name, + resourceTemplate.GetAPIVersion(), resourceTemplate.GetKind(), names.NamespacedKey(resourceTemplate.GetNamespace(), resourceTemplate.GetName()), err) + return err + } + klog.V(4).Infof("Cluster propagation policy(%s) has preempted another cluster propagation policy(%s).", policy.Name, claimedPolicyName) + return nil +} + +// fetchResourceTemplate fetches resource template by resource selector, ignore it if not found or deleting. +func (d *ResourceDetector) fetchResourceTemplate(rs policyv1alpha1.ResourceSelector) (*unstructured.Unstructured, error) { + resourceTemplate, err := helper.FetchResourceTemplate(d.DynamicClient, d.InformerManager, d.RESTMapper, helper.ConstructObjectReference(rs)) + if err != nil { + // do nothing if resource template not exist, it might has been removed. + if apierrors.IsNotFound(err) { + klog.V(2).Infof("Resource template(%s, kind=%s, %s) cannot be preempted because it has been deleted.", + rs.APIVersion, rs.Kind, names.NamespacedKey(rs.Namespace, rs.Name)) + return nil, nil + } + klog.Errorf("Failed to fetch resource template(%s, kind=%s, %s): %v.", rs.APIVersion, rs.Kind, + names.NamespacedKey(rs.Namespace, rs.Name), err) + return nil, err + } + + if !resourceTemplate.GetDeletionTimestamp().IsZero() { + klog.V(2).Infof("Resource template(%s, kind=%s, %s) cannot be preempted because it's being deleted.", + rs.APIVersion, rs.Kind, names.NamespacedKey(rs.Namespace, rs.Name)) + return nil, nil + } + return resourceTemplate, nil +} diff --git a/pkg/util/helper/binding.go b/pkg/util/helper/binding.go index c7e8a60eb..c3f72090b 100644 --- a/pkg/util/helper/binding.go +++ b/pkg/util/helper/binding.go @@ -19,6 +19,7 @@ import ( "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" + policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/events" @@ -385,3 +386,13 @@ func EmitClusterEvictionEventForClusterResourceBinding(binding *workv1alpha2.Clu eventRecorder.Eventf(ref, corev1.EventTypeNormal, events.EventReasonEvictWorkloadFromClusterSucceed, "Evict from cluster %s succeed.", cluster) } } + +// ConstructObjectReference constructs ObjectReference from ResourceSelector. +func ConstructObjectReference(rs policyv1alpha1.ResourceSelector) workv1alpha2.ObjectReference { + return workv1alpha2.ObjectReference{ + APIVersion: rs.APIVersion, + Kind: rs.Kind, + Namespace: rs.Namespace, + Name: rs.Name, + } +} diff --git a/pkg/util/names/names.go b/pkg/util/names/names.go index daee8f70e..0496e14dd 100644 --- a/pkg/util/names/names.go +++ b/pkg/util/names/names.go @@ -157,3 +157,11 @@ func GeneratePolicyName(namespace, name, gvk string) string { } return strings.ToLower(fmt.Sprintf("%s-%s", name, rand.SafeEncodeString(fmt.Sprint(hash.Sum32())))) } + +// NamespacedKey generates key with namespace and name. +func NamespacedKey(namespace, name string) string { + if namespace == "" { + return name + } + return namespace + "/" + name +}