Detect priority change happend on PropagationPolicy
and enqueue PropagationPolicy that could repreempte. Signed-off-by: RainbowMango <qdurenhongcai@gmail.com>
This commit is contained in:
parent
273cc9a4dd
commit
62a0e50fa2
|
@ -31,6 +31,7 @@ import (
|
|||
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
|
||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
"github.com/karmada-io/karmada/pkg/events"
|
||||
"github.com/karmada-io/karmada/pkg/features"
|
||||
"github.com/karmada-io/karmada/pkg/metrics"
|
||||
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
|
||||
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
|
||||
|
@ -783,7 +784,7 @@ func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) {
|
|||
}
|
||||
|
||||
// OnPropagationPolicyUpdate handles object update event and push the object to queue.
|
||||
func (d *ResourceDetector) OnPropagationPolicyUpdate(_, newObj interface{}) {
|
||||
func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{}) {
|
||||
key, err := ClusterWideKeyFunc(newObj)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -791,6 +792,52 @@ func (d *ResourceDetector) OnPropagationPolicyUpdate(_, newObj interface{}) {
|
|||
|
||||
klog.V(2).Infof("Update PropagationPolicy(%s)", key)
|
||||
d.policyReconcileWorker.Add(key)
|
||||
|
||||
// Temporary solution of corner case: After the priority(.spec.priority) of
|
||||
// PropagationPolicy changed from high priority (e.g. 5) to low priority(e.g. 3),
|
||||
// we should try to check if there is a PropagationPolicy(e.g. with priority 4)
|
||||
// could preempt the targeted resources.
|
||||
//
|
||||
// Recognized limitations of the temporary solution are:
|
||||
// - Too much logical processed in an event handler function will slow down
|
||||
// the overall reconcile speed.
|
||||
// - If there is an error raised during the process, the event will be lost
|
||||
// and no second chance to retry.
|
||||
//
|
||||
// The idea of the long-term solution, perhaps PropagationPolicy could have
|
||||
// a status, in that case we can record the observed priority(.status.observedPriority)
|
||||
// which can be used to detect priority changes during reconcile logic.
|
||||
if features.FeatureGate.Enabled(features.PolicyPreemption) {
|
||||
var unstructuredOldObj *unstructured.Unstructured
|
||||
var unstructuredNewObj *unstructured.Unstructured
|
||||
|
||||
unstructuredOldObj, err = helper.ToUnstructured(oldObj)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to transform oldObj, error: %v", err)
|
||||
return
|
||||
}
|
||||
unstructuredNewObj, err = helper.ToUnstructured(newObj)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to transform newObj, error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
var oldPolicy policyv1alpha1.PropagationPolicy
|
||||
var newPolicy policyv1alpha1.PropagationPolicy
|
||||
|
||||
if err = helper.ConvertToTypedObject(unstructuredOldObj, &oldPolicy); err != nil {
|
||||
klog.Errorf("Failed to convert typed PropagationPolicy(%s/%s): %v", unstructuredOldObj.GetNamespace(), unstructuredOldObj.GetName(), err)
|
||||
return
|
||||
}
|
||||
if err = helper.ConvertToTypedObject(unstructuredNewObj, &newPolicy); err != nil {
|
||||
klog.Errorf("Failed to convert typed PropagationPolicy(%s/%s): %v", newPolicy.GetNamespace(), newPolicy.GetName(), err)
|
||||
return
|
||||
}
|
||||
|
||||
if newPolicy.ExplicitPriority() < oldPolicy.ExplicitPriority() {
|
||||
d.HandleDeprioritizedPropagationPolicy(oldPolicy, newPolicy)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// OnPropagationPolicyDelete handles object delete event and push the object to queue.
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
|
@ -227,3 +228,46 @@ func (d *ResourceDetector) fetchResourceTemplate(rs policyv1alpha1.ResourceSelec
|
|||
}
|
||||
return resourceTemplate, nil
|
||||
}
|
||||
|
||||
// HandleDeprioritizedPropagationPolicy responses to priority change of a PropagationPolicy,
|
||||
// if the change is from high priority (e.g. 5) to low priority(e.g. 3), it will
|
||||
// check if there is another PropagationPolicy could preempt the targeted resource,
|
||||
// and put the PropagationPolicy in the queue to trigger preemption.
|
||||
func (d *ResourceDetector) HandleDeprioritizedPropagationPolicy(oldPolicy policyv1alpha1.PropagationPolicy, newPolicy policyv1alpha1.PropagationPolicy) {
|
||||
klog.Infof("PropagationPolicy(%s/%s) priority changed from %d to %d", newPolicy.GetNamespace(), newPolicy.GetName(), *oldPolicy.Spec.Priority, *newPolicy.Spec.Priority)
|
||||
policies, err := d.propagationPolicyLister.ByNamespace(newPolicy.GetNamespace()).List(labels.Everything())
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to list PropagationPolicy from namespace: %s, error: %v", newPolicy.GetNamespace(), err)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO(@RainbowMango): Should sort the listed policies to ensure the
|
||||
// higher priority PropagationPolicy be process first to avoid possible
|
||||
// multiple preemption.
|
||||
|
||||
for i := range policies {
|
||||
var potentialPolicy policyv1alpha1.PropagationPolicy
|
||||
if err = helper.ConvertToTypedObject(policies[i], &potentialPolicy); err != nil {
|
||||
klog.Errorf("Failed to convert typed PropagationPolicy: %v", err)
|
||||
continue
|
||||
}
|
||||
// Re-queue the polies that enables preemption and with the priority
|
||||
// in range (new priority, old priority).
|
||||
// For the polices with higher priority than old priority, it can
|
||||
// perform preempt automatically and don't need to re-queue here.
|
||||
// For the polices with lower priority than new priority, it can't
|
||||
// perform preempt as insufficient priority.
|
||||
if potentialPolicy.Spec.Priority != nil &&
|
||||
potentialPolicy.Spec.Preemption == policyv1alpha1.PreemptAlways &&
|
||||
potentialPolicy.ExplicitPriority() > newPolicy.ExplicitPriority() &&
|
||||
potentialPolicy.ExplicitPriority() < oldPolicy.ExplicitPriority() {
|
||||
var potentialKey util.QueueKey
|
||||
potentialKey, err = ClusterWideKeyFunc(&potentialPolicy)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
klog.Infof("Enqueuing PropagationPolicy(%s/%s) in case of PropagationPolicy(%s/%s) priority changes", potentialPolicy.GetNamespace(), potentialPolicy.GetName(), newPolicy.GetNamespace(), newPolicy.GetName())
|
||||
d.policyReconcileWorker.Add(potentialKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue