Merge pull request #2912 from jwcesign/fix-pp-rb
fix a corner case where rescheduling is skipped when a cluster becomes unfit.
commit f029394009
@@ -63,6 +63,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alph
         return result, fmt.Errorf("failed to findClustersThatFit: %v", err)
     }
 
+    // Short path for case no cluster fit.
     if len(feasibleClusters) == 0 {
         return result, &framework.FitError{
             NumAllClusters: clusterInfoSnapshot.NumOfClusters(),

@@ -229,13 +229,13 @@ func (s *Scheduler) updateCluster(oldObj, newObj interface{}) {
     case !equality.Semantic.DeepEqual(oldCluster.Labels, newCluster.Labels):
         fallthrough
     case !equality.Semantic.DeepEqual(oldCluster.Spec, newCluster.Spec):
-        s.enqueueAffectedPolicy(newCluster)
-        s.enqueueAffectedClusterPolicy(newCluster)
+        s.enqueueAffectedPolicy(oldCluster, newCluster)
+        s.enqueueAffectedClusterPolicy(oldCluster, newCluster)
     }
 }
 
 // enqueueAffectedPolicy find all propagation policies related to the cluster and reschedule the RBs
-func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
+func (s *Scheduler) enqueueAffectedPolicy(oldCluster, newCluster *clusterv1alpha1.Cluster) {
     policies, _ := s.policyLister.List(labels.Everything())
     for _, policy := range policies {
         selector := labels.SelectorFromSet(labels.Set{

@@ -245,10 +245,13 @@ func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
         affinity := policy.Spec.Placement.ClusterAffinity
         switch {
         case affinity == nil:
-            // If no clusters specified, add it in queue
+            // If no clusters specified, add it to the queue
             fallthrough
         case util.ClusterMatches(newCluster, *affinity):
-            // If specific cluster matches the affinity. add it in queue
+            // If the new cluster manifest matches the affinity, add it to the queue to trigger rescheduling
+            fallthrough
+        case util.ClusterMatches(oldCluster, *affinity):
+            // If the old cluster manifest matches the affinity, add it to the queue to trigger rescheduling
             err := s.requeueResourceBindings(selector, metrics.ClusterChanged)
             if err != nil {
                 klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)

@@ -258,7 +261,7 @@ func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
 }
 
 // enqueueAffectedClusterPolicy find all cluster propagation policies related to the cluster and reschedule the RBs/CRBs
-func (s *Scheduler) enqueueAffectedClusterPolicy(newCluster *clusterv1alpha1.Cluster) {
+func (s *Scheduler) enqueueAffectedClusterPolicy(oldCluster, newCluster *clusterv1alpha1.Cluster) {
     clusterPolicies, _ := s.clusterPolicyLister.List(labels.Everything())
     for _, policy := range clusterPolicies {
         selector := labels.SelectorFromSet(labels.Set{

@@ -267,10 +270,13 @@ func (s *Scheduler) enqueueAffectedClusterPolicy(newCluster *clusterv1alpha1.Clu
         affinity := policy.Spec.Placement.ClusterAffinity
         switch {
         case affinity == nil:
-            // If no clusters specified, add it in queue
+            // If no clusters specified, add it to the queue
             fallthrough
         case util.ClusterMatches(newCluster, *affinity):
-            // If specific cluster matches the affinity. add it in queue
+            // If the new cluster manifest matches the affinity, add it to the queue to trigger rescheduling
+            fallthrough
+        case util.ClusterMatches(oldCluster, *affinity):
+            // If the old cluster manifest matches the affinity, add it to the queue to trigger rescheduling
             err := s.requeueClusterResourceBindings(selector, metrics.ClusterChanged)
             if err != nil {
                 klog.Errorf("Failed to requeue ClusterResourceBinding, error: %v", err)

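Note on the two switch statements above: previously only the updated (new) cluster object was matched against the policy's ClusterAffinity, so a cluster that stopped matching (for example, a required label was removed or changed) never re-enqueued its bindings and the stale placement survived. The added oldCluster case closes that gap. A minimal, self-contained sketch of the decision logic, using simplified label-only matching and hypothetical helper names rather than the actual util.ClusterMatches implementation:

package main

import "fmt"

// matchesAffinity is a simplified stand-in for util.ClusterMatches: it only
// checks that the cluster carries every label the affinity requires.
func matchesAffinity(clusterLabels, required map[string]string) bool {
	for k, v := range required {
		if clusterLabels[k] != v {
			return false
		}
	}
	return true
}

// shouldRequeue mirrors the new switch shape: requeue when no affinity is set,
// or when either the new or the old cluster state matches it.
func shouldRequeue(oldLabels, newLabels, required map[string]string) bool {
	switch {
	case required == nil:
		return true
	case matchesAffinity(newLabels, required):
		return true
	case matchesAffinity(oldLabels, required):
		// The case added by this PR: the cluster used to match but no longer
		// does, so bindings placed on it must be rescheduled away.
		return true
	}
	return false
}

func main() {
	required := map[string]string{"zone": "ok"}
	oldLabels := map[string]string{"zone": "ok"}
	newLabels := map[string]string{"zone": "not_ok"}

	// Matching only the new state misses this event entirely...
	fmt.Println("new state matches:", matchesAffinity(newLabels, required)) // false
	// ...while checking both states requeues the affected bindings.
	fmt.Println("should requeue:", shouldRequeue(oldLabels, newLabels, required)) // true
}
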
@@ -3,6 +3,7 @@ package scheduler
 import (
     "context"
     "encoding/json"
+    "errors"
     "fmt"
     "reflect"
     "time"

@@ -13,6 +14,7 @@ import (
     "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
+    utilerrors "k8s.io/apimachinery/pkg/util/errors"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/dynamic"
     "k8s.io/client-go/kubernetes"

@@ -33,6 +35,7 @@ import (
     worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha2"
     schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache"
     "github.com/karmada-io/karmada/pkg/scheduler/core"
+    "github.com/karmada-io/karmada/pkg/scheduler/framework"
     frameworkplugins "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins"
     "github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
     "github.com/karmada-io/karmada/pkg/scheduler/metrics"

@@ -476,13 +479,16 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.Resour
     }
 
     scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &placement, &resourceBinding.Spec, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
-    if err != nil {
+    var noClusterFit *framework.FitError
+    // In case no cluster fits, we cannot return early; continue to patch (clean up) the result.
+    if err != nil && !errors.As(err, &noClusterFit) {
         klog.Errorf("Failed scheduling ResourceBinding %s/%s: %v", resourceBinding.Namespace, resourceBinding.Name, err)
         return err
     }
 
     klog.V(4).Infof("ResourceBinding %s/%s scheduled to clusters %v", resourceBinding.Namespace, resourceBinding.Name, scheduleResult.SuggestedClusters)
-    return s.patchScheduleResultForResourceBinding(resourceBinding, placementStr, scheduleResult.SuggestedClusters)
+    scheduleErr := s.patchScheduleResultForResourceBinding(resourceBinding, placementStr, scheduleResult.SuggestedClusters)
+    return utilerrors.NewAggregate([]error{err, scheduleErr})
 }
 
 func (s *Scheduler) patchScheduleResultForResourceBinding(oldBinding *workv1alpha2.ResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {

@@ -527,13 +533,16 @@ func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv
     }
 
     scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
-    if err != nil {
+    var noClusterFit *framework.FitError
+    // In case no cluster fits, we cannot return early; continue to patch (clean up) the result.
+    if err != nil && !errors.As(err, &noClusterFit) {
         klog.V(2).Infof("Failed scheduling ClusterResourceBinding %s: %v", clusterResourceBinding.Name, err)
         return err
     }
 
     klog.V(4).Infof("ClusterResourceBinding %s scheduled to clusters %v", clusterResourceBinding.Name, scheduleResult.SuggestedClusters)
-    return s.patchScheduleResultForClusterResourceBinding(clusterResourceBinding, string(placement), scheduleResult.SuggestedClusters)
+    scheduleErr := s.patchScheduleResultForClusterResourceBinding(clusterResourceBinding, string(placement), scheduleResult.SuggestedClusters)
+    return utilerrors.NewAggregate([]error{err, scheduleErr})
 }
 
 func (s *Scheduler) patchScheduleResultForClusterResourceBinding(oldBinding *workv1alpha2.ClusterResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {

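Note on the two scheduler changes above: Schedule surfaces the no-cluster-fit condition as a typed *framework.FitError, and instead of bailing out the scheduler keeps going so the (now empty) result is still patched onto the binding, which removes workloads from clusters that no longer fit; the FitError is then aggregated with any patch error and returned so the binding is requeued. A rough, self-contained sketch of that error-handling pattern, with a stand-in FitError type and the standard library's errors.Join in place of utilerrors.NewAggregate:

package main

import (
	"errors"
	"fmt"
)

// FitError is a stand-in for framework.FitError: a typed error meaning
// "scheduling ran, but no cluster satisfied the placement".
type FitError struct{ NumAllClusters int }

func (e *FitError) Error() string {
	return fmt.Sprintf("0/%d clusters are available", e.NumAllClusters)
}

// schedule simulates Algorithm.Schedule returning an empty result together
// with a FitError when nothing fits.
func schedule() ([]string, error) {
	return nil, &FitError{NumAllClusters: 3}
}

// patchResult simulates patching the binding with the (possibly empty) result,
// which is what cleans workloads off clusters that no longer fit.
func patchResult(clusters []string) error {
	fmt.Printf("patched .spec.clusters to %v\n", clusters)
	return nil
}

func main() {
	result, err := schedule()

	// Only hard failures abort; a FitError falls through so the empty result
	// is still patched, mirroring the new check in scheduleResourceBinding.
	var noClusterFit *FitError
	if err != nil && !errors.As(err, &noClusterFit) {
		fmt.Println("scheduling failed:", err)
		return
	}

	patchErr := patchResult(result)
	// errors.Join drops nil errors, similar to utilerrors.NewAggregate in the
	// actual change; the FitError is still returned so the binding is retried.
	if agg := errors.Join(err, patchErr); agg != nil {
		fmt.Println("requeue with:", agg)
	}
}

errors.As needs a pointer to the *FitError variable so it can populate it when the error chain matches; the same pointer-to-pointer pattern appears in the diff above.
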
@@ -230,6 +230,50 @@ func setClusterLabel(c client.Client, clusterName string) error {
     return err
 }
 
+// UpdateClusterLabels updates cluster labels.
+func UpdateClusterLabels(client karmada.Interface, clusterName string, labels map[string]string) {
+    gomega.Eventually(func() (bool, error) {
+        cluster, err := client.ClusterV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{})
+        if err != nil {
+            return false, err
+        }
+
+        if cluster.Labels == nil {
+            cluster.Labels = map[string]string{}
+        }
+        for key, value := range labels {
+            cluster.Labels[key] = value
+        }
+        _, err = client.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
+        if err != nil {
+            return false, err
+        }
+        return true, nil
+    }, pollTimeout, pollInterval).Should(gomega.Equal(true))
+}
+
+// DeleteClusterLabels deletes the given cluster labels if they exist.
+func DeleteClusterLabels(client karmada.Interface, clusterName string, labels map[string]string) {
+    gomega.Eventually(func() (bool, error) {
+        cluster, err := client.ClusterV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{})
+        if err != nil {
+            return false, err
+        }
+
+        if cluster.Labels == nil {
+            return true, nil
+        }
+        for key := range labels {
+            delete(cluster.Labels, key)
+        }
+        _, err = client.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
+        if err != nil {
+            return false, err
+        }
+        return true, nil
+    }, pollTimeout, pollInterval).Should(gomega.Equal(true))
+}
+
 // GetClusterNamesFromClusters will get Clusters' names form Clusters Object.
 func GetClusterNamesFromClusters(clusters []*clusterv1alpha1.Cluster) []string {
     clusterNames := make([]string, 0, len(clusters))

@@ -319,3 +319,112 @@ var _ = ginkgo.Describe("[cluster joined] reschedule testing", func() {
         })
     })
 })
+
+// reschedule testing while policy matches, triggered by label changes.
+var _ = ginkgo.Describe("[cluster labels changed] reschedule testing while policy matches", func() {
+    var deployment *appsv1.Deployment
+    var targetMember string
+    var labelKey string
+    var policyNamespace string
+    var policyName string
+
+    ginkgo.BeforeEach(func() {
+        targetMember = framework.ClusterNames()[0]
+        policyNamespace = testNamespace
+        policyName = deploymentNamePrefix + rand.String(RandomStrLength)
+        labelKey = "cluster" + rand.String(RandomStrLength)
+
+        deployment = testhelper.NewDeployment(testNamespace, policyName)
+        framework.CreateDeployment(kubeClient, deployment)
+
+        labels := map[string]string{labelKey: "ok"}
+        framework.UpdateClusterLabels(karmadaClient, targetMember, labels)
+
+        ginkgo.DeferCleanup(func() {
+            framework.RemoveDeployment(kubeClient, deployment.Namespace, deployment.Name)
+            framework.DeleteClusterLabels(karmadaClient, targetMember, labels)
+        })
+    })
+
+    ginkgo.Context("Changes cluster labels to test reschedule while pp matches", func() {
+        var policy *policyv1alpha1.PropagationPolicy
+
+        ginkgo.BeforeEach(func() {
+            policy = testhelper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
+                {
+                    APIVersion: deployment.APIVersion,
+                    Kind:       deployment.Kind,
+                    Name:       deployment.Name,
+                }}, policyv1alpha1.Placement{
+                ClusterAffinity: &policyv1alpha1.ClusterAffinity{
+                    LabelSelector: &metav1.LabelSelector{
+                        MatchLabels: map[string]string{labelKey: "ok"},
+                    },
+                },
+            })
+        })
+
+        ginkgo.BeforeEach(func() {
+            framework.CreatePropagationPolicy(karmadaClient, policy)
+
+            ginkgo.DeferCleanup(func() {
+                framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name)
+            })
+
+            framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
+                func(deployment *appsv1.Deployment) bool { return true })
+        })
+
+        ginkgo.It("change labels to testing deployment reschedule", func() {
+            labelsUpdate := map[string]string{labelKey: "not_ok"}
+            framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
+            framework.WaitDeploymentDisappearOnCluster(targetMember, deployment.Namespace, deployment.Name)
+
+            labelsUpdate = map[string]string{labelKey: "ok"}
+            framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
+            framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
+                func(deployment *appsv1.Deployment) bool { return true })
+        })
+    })
+
+    ginkgo.Context("Changes cluster labels to test reschedule while cpp matches", func() {
+        var policy *policyv1alpha1.ClusterPropagationPolicy
+
+        ginkgo.BeforeEach(func() {
+            policy = testhelper.NewClusterPropagationPolicy(policyName, []policyv1alpha1.ResourceSelector{
+                {
+                    APIVersion: deployment.APIVersion,
+                    Kind:       deployment.Kind,
+                    Name:       deployment.Name,
+                }}, policyv1alpha1.Placement{
+                ClusterAffinity: &policyv1alpha1.ClusterAffinity{
+                    LabelSelector: &metav1.LabelSelector{
+                        MatchLabels: map[string]string{labelKey: "ok"},
+                    },
+                },
+            })
+        })
+
+        ginkgo.BeforeEach(func() {
+            framework.CreateClusterPropagationPolicy(karmadaClient, policy)
+
+            ginkgo.DeferCleanup(func() {
+                framework.RemoveClusterPropagationPolicy(karmadaClient, policy.Name)
+            })
+
+            framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
+                func(deployment *appsv1.Deployment) bool { return true })
+        })
+
+        ginkgo.It("change labels to testing deployment reschedule", func() {
+            labelsUpdate := map[string]string{labelKey: "not_ok"}
+            framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
+            framework.WaitDeploymentDisappearOnCluster(targetMember, deployment.Namespace, deployment.Name)
+
+            labelsUpdate = map[string]string{labelKey: "ok"}
+            framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
+            framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
+                func(deployment *appsv1.Deployment) bool { return true })
+        })
+    })
+})