From 62a0e50fa201ac636eb0c3fe18ae51f0fc2e86eb Mon Sep 17 00:00:00 2001 From: RainbowMango Date: Mon, 21 Aug 2023 16:50:37 +0800 Subject: [PATCH] Detect priority change happend on PropagationPolicy and enqueue PropagationPolicy that could repreempte. Signed-off-by: RainbowMango --- pkg/detector/detector.go | 49 +++++++++++++++++++++++++++++++++++++- pkg/detector/preemption.go | 44 ++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 1 deletion(-) diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index 65ff9a5f9..c3be4f83b 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -31,6 +31,7 @@ import ( policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" "github.com/karmada-io/karmada/pkg/events" + "github.com/karmada-io/karmada/pkg/features" "github.com/karmada-io/karmada/pkg/metrics" "github.com/karmada-io/karmada/pkg/resourceinterpreter" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" @@ -783,7 +784,7 @@ func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) { } // OnPropagationPolicyUpdate handles object update event and push the object to queue. -func (d *ResourceDetector) OnPropagationPolicyUpdate(_, newObj interface{}) { +func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{}) { key, err := ClusterWideKeyFunc(newObj) if err != nil { return @@ -791,6 +792,52 @@ func (d *ResourceDetector) OnPropagationPolicyUpdate(_, newObj interface{}) { klog.V(2).Infof("Update PropagationPolicy(%s)", key) d.policyReconcileWorker.Add(key) + + // Temporary solution of corner case: After the priority(.spec.priority) of + // PropagationPolicy changed from high priority (e.g. 5) to low priority(e.g. 3), + // we should try to check if there is a PropagationPolicy(e.g. with priority 4) + // could preempt the targeted resources. + // + // Recognized limitations of the temporary solution are: + // - Too much logical processed in an event handler function will slow down + // the overall reconcile speed. + // - If there is an error raised during the process, the event will be lost + // and no second chance to retry. + // + // The idea of the long-term solution, perhaps PropagationPolicy could have + // a status, in that case we can record the observed priority(.status.observedPriority) + // which can be used to detect priority changes during reconcile logic. + if features.FeatureGate.Enabled(features.PolicyPreemption) { + var unstructuredOldObj *unstructured.Unstructured + var unstructuredNewObj *unstructured.Unstructured + + unstructuredOldObj, err = helper.ToUnstructured(oldObj) + if err != nil { + klog.Errorf("Failed to transform oldObj, error: %v", err) + return + } + unstructuredNewObj, err = helper.ToUnstructured(newObj) + if err != nil { + klog.Errorf("Failed to transform newObj, error: %v", err) + return + } + + var oldPolicy policyv1alpha1.PropagationPolicy + var newPolicy policyv1alpha1.PropagationPolicy + + if err = helper.ConvertToTypedObject(unstructuredOldObj, &oldPolicy); err != nil { + klog.Errorf("Failed to convert typed PropagationPolicy(%s/%s): %v", unstructuredOldObj.GetNamespace(), unstructuredOldObj.GetName(), err) + return + } + if err = helper.ConvertToTypedObject(unstructuredNewObj, &newPolicy); err != nil { + klog.Errorf("Failed to convert typed PropagationPolicy(%s/%s): %v", newPolicy.GetNamespace(), newPolicy.GetName(), err) + return + } + + if newPolicy.ExplicitPriority() < oldPolicy.ExplicitPriority() { + d.HandleDeprioritizedPropagationPolicy(oldPolicy, newPolicy) + } + } } // OnPropagationPolicyDelete handles object delete event and push the object to queue. diff --git a/pkg/detector/preemption.go b/pkg/detector/preemption.go index 01dd9a37b..c4be656bd 100755 --- a/pkg/detector/preemption.go +++ b/pkg/detector/preemption.go @@ -4,6 +4,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" @@ -227,3 +228,46 @@ func (d *ResourceDetector) fetchResourceTemplate(rs policyv1alpha1.ResourceSelec } return resourceTemplate, nil } + +// HandleDeprioritizedPropagationPolicy responses to priority change of a PropagationPolicy, +// if the change is from high priority (e.g. 5) to low priority(e.g. 3), it will +// check if there is another PropagationPolicy could preempt the targeted resource, +// and put the PropagationPolicy in the queue to trigger preemption. +func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policyv1alpha1.PropagationPolicy, newPolicy policyv1alpha1.PropagationPolicy) { + klog.Infof("PropagationPolicy(%s/%s) priority changed from %d to %d", newPolicy.GetNamespace(), newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority) + policies, err := d.propagationPolicyLister.ByNamespace(newPolicy.GetNamespace()).List(labels.Everything()) + if err != nil { + klog.Errorf("Failed to list PropagationPolicy from namespace: %s, error: %v", newPolicy.GetNamespace(), err) + return + } + + // TODO(@RainbowMango): Should sort the listed policies to ensure the + // higher priority PropagationPolicy be process first to avoid possible + // multiple preemption. + + for i := range policies { + var potentialPolicy policyv1alpha1.PropagationPolicy + if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil { + klog.Errorf("Failed to convert typed PropagationPolicy: %v", err) + continue + } + // Re-queue the polies that enables preemption and with the priority + // in range (new priority, old priority). + // For the polices with higher priority than old priority, it can + // perform preempt automatically and don't need to re-queue here. + // For the polices with lower priority than new priority, it can't + // perform preempt as insufficient priority. + if potentialPolicy.Spec.Priority != nil && + potentialPolicy.Spec.Preemption == policyv1alpha1.PreemptAlways && + potentialPolicy.ExplicitPriority() > newPolicy.ExplicitPriority() && + potentialPolicy.ExplicitPriority() < oldPolicy.ExplicitPriority() { + var potentialKey util.QueueKey + potentialKey, err = ClusterWideKeyFunc(&potentialPolicy) + if err != nil { + return + } + klog.Infof("Enqueuing PropagationPolicy(%s/%s) in case of PropagationPolicy(%s/%s) priority changes", potentialPolicy.GetNamespace(), potentialPolicy.GetName(), newPolicy.GetNamespace(), newPolicy.GetName()) + d.policyReconcileWorker.Add(potentialKey) + } + } +}