Merge pull request #446 from mrlihanbo/reschedule
fix bug of choosing candidate clusters when failover schedule
Commit fb6f3bfe43
@@ -52,3 +52,13 @@ func (s *Snapshot) GetReadyClusterNames() sets.String {
 	return readyClusterNames
 }
+
+// GetCluster returns the given clusters.
+func (s *Snapshot) GetCluster(clusterName string) *framework.ClusterInfo {
+	for _, c := range s.clusterInfoList {
+		if c.Cluster().Name == clusterName {
+			return c
+		}
+	}
+	return nil
+}
 
@@ -25,6 +25,7 @@ import (
 type ScheduleAlgorithm interface {
 	Schedule(context.Context, *policyv1alpha1.Placement, *workv1alpha1.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
 	ScaleSchedule(context.Context, *policyv1alpha1.Placement, *workv1alpha1.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
+	FailoverSchedule(context.Context, *policyv1alpha1.Placement, *workv1alpha1.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
 }

 // ScheduleResult includes the clusters selected.
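The interface change above is the whole contract: FailoverSchedule takes exactly the same arguments as Schedule and ScaleSchedule and returns a ScheduleResult, so the scheduler can reach it through the same Algorithm field it already holds. The sketch below is illustrative only, not code from this tree: placement, resourceBindingSpec and scheduleResult are simplified hypothetical stand-ins for the real policyv1alpha1/workv1alpha1 types, and fakeAlgorithm is the kind of test double a unit test for a caller of the interface might use.

package main

import (
	"context"
	"fmt"
)

// Hypothetical, simplified stand-ins for the karmada API types,
// only here to keep the sketch self-contained.
type placement struct{}
type resourceBindingSpec struct{ Clusters []string }
type scheduleResult struct{ SuggestedClusters []string }

// scheduleAlgorithm mirrors the shape of the real ScheduleAlgorithm
// interface after this change: all three entry points share one signature.
type scheduleAlgorithm interface {
	Schedule(context.Context, *placement, *resourceBindingSpec) (scheduleResult, error)
	ScaleSchedule(context.Context, *placement, *resourceBindingSpec) (scheduleResult, error)
	FailoverSchedule(context.Context, *placement, *resourceBindingSpec) (scheduleResult, error)
}

// fakeAlgorithm records how often the failover path was taken.
type fakeAlgorithm struct{ failoverCalls int }

func (f *fakeAlgorithm) Schedule(context.Context, *placement, *resourceBindingSpec) (scheduleResult, error) {
	return scheduleResult{}, nil
}

func (f *fakeAlgorithm) ScaleSchedule(context.Context, *placement, *resourceBindingSpec) (scheduleResult, error) {
	return scheduleResult{}, nil
}

func (f *fakeAlgorithm) FailoverSchedule(context.Context, *placement, *resourceBindingSpec) (scheduleResult, error) {
	f.failoverCalls++
	return scheduleResult{SuggestedClusters: []string{"member2"}}, nil
}

func main() {
	var alg scheduleAlgorithm = &fakeAlgorithm{}
	res, _ := alg.FailoverSchedule(context.TODO(), &placement{}, &resourceBindingSpec{Clusters: []string{"member1"}})
	fmt.Println(res.SuggestedClusters) // [member2]
}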
@@ -581,3 +582,65 @@ func (g *genericScheduler) getPreUsed(clusters []*clusterv1alpha1.Cluster, preUs
 	}
 	return preUsedCluster, unUsedCluster
 }
+
+func (g *genericScheduler) FailoverSchedule(ctx context.Context, placement *policyv1alpha1.Placement,
+	spec *workv1alpha1.ResourceBindingSpec) (result ScheduleResult, err error) {
+	readyClusters := g.schedulerCache.Snapshot().GetReadyClusterNames()
+	totalClusters := util.ConvertToClusterNames(spec.Clusters)
+
+	reservedClusters := calcReservedCluster(totalClusters, readyClusters)
+	availableClusters := calcAvailableCluster(totalClusters, readyClusters)
+
+	candidateClusters := sets.NewString()
+	for clusterName := range availableClusters {
+		clusterObj := g.schedulerCache.Snapshot().GetCluster(clusterName)
+		if clusterObj == nil {
+			return result, fmt.Errorf("failed to get clusterObj by clusterName: %s", clusterName)
+		}
+
+		resMap := g.scheduleFramework.RunFilterPlugins(ctx, placement, &spec.Resource, clusterObj.Cluster())
+		res := resMap.Merge()
+		if !res.IsSuccess() {
+			klog.V(4).Infof("cluster %q is not fit", clusterName)
+		} else {
+			candidateClusters.Insert(clusterName)
+		}
+	}
+
+	klog.V(4).Infof("Reserved bindingClusters : %v", reservedClusters.List())
+	klog.V(4).Infof("Candidate bindingClusters: %v", candidateClusters.List())
+
+	// TODO: should schedule as much as possible?
+	deltaLen := len(spec.Clusters) - len(reservedClusters)
+	if len(candidateClusters) < deltaLen {
+		// for ReplicaSchedulingTypeDivided, we will try to migrate replicas to the other health clusters
+		if placement.ReplicaScheduling == nil || placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
+			klog.Warningf("ignore reschedule binding as insufficient available cluster")
+			return ScheduleResult{}, nil
+		}
+	}
+
+	// TODO: check if the final result meets the spread constraints.
+	targetClusters := reservedClusters
+	clusterList := candidateClusters.List()
+	for i := 0; i < deltaLen && i < len(candidateClusters); i++ {
+		targetClusters.Insert(clusterList[i])
+	}
+
+	var reScheduleResult []workv1alpha1.TargetCluster
+	for cluster := range targetClusters {
+		reScheduleResult = append(reScheduleResult, workv1alpha1.TargetCluster{Name: cluster})
+	}
+
+	return ScheduleResult{reScheduleResult}, nil
+}
+
+// calcReservedCluster eliminates the not-ready clusters from the 'bindClusters'.
+func calcReservedCluster(bindClusters, readyClusters sets.String) sets.String {
+	return bindClusters.Difference(bindClusters.Difference(readyClusters))
+}
+
+// calcAvailableCluster returns a list of ready clusters that not in 'bindClusters'.
+func calcAvailableCluster(bindCluster, readyClusters sets.String) sets.String {
+	return readyClusters.Difference(bindCluster)
+}
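The two helpers moved here from scheduler.go are plain set arithmetic: calcReservedCluster's double Difference is equivalent to intersecting the bound clusters with the ready clusters, and calcAvailableCluster is the ready clusters minus the bound ones. The bounded loop condition `i < deltaLen && i < len(candidateClusters)` is the heart of the fix: the insufficient-candidates guard now only aborts for Duplicated placements (or when ReplicaScheduling is unset), so a Divided placement falls through and migrates to whatever candidates exist, and the extra bound keeps the loop from indexing past the end of clusterList in that case. A minimal standalone sketch with made-up cluster names (member1 through member4 are hypothetical):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	bindClusters := sets.NewString("member1", "member2", "member3") // clusters in the binding
	readyClusters := sets.NewString("member2", "member4")           // clusters currently ready

	// calcReservedCluster: bound clusters that are still ready
	// (bind - (bind - ready) is equivalent to the intersection of the two sets).
	reserved := bindClusters.Difference(bindClusters.Difference(readyClusters))

	// calcAvailableCluster: ready clusters not already in the binding.
	available := readyClusters.Difference(bindClusters)

	fmt.Println(reserved.List())  // [member2]
	fmt.Println(available.List()) // [member4]

	// Candidate selection capped by both the number of clusters to replace and
	// the number of surviving candidates, mirroring
	// `i < deltaLen && i < len(candidateClusters)` above.
	candidates := available // pretend every available cluster passed the filter plugins
	deltaLen := bindClusters.Len() - reserved.Len()
	target := reserved // same underlying set as reserved, as in the original code
	list := candidates.List()
	for i := 0; i < deltaLen && i < len(list); i++ {
		target.Insert(list[i])
	}
	fmt.Println(target.List()) // [member2 member4]
}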
@@ -12,13 +12,11 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
-	v1helper "k8s.io/component-helpers/scheduling/corev1"
 	"k8s.io/klog/v2"

 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
@@ -595,20 +593,15 @@ func (s *Scheduler) rescheduleClusterResourceBinding(clusterResourceBinding *wor
 		return err
 	}

-	targetClusters, err := s.obtainTargetCluster(clusterResourceBinding.Spec.Clusters, policy.Spec.Placement)
+	reScheduleResult, err := s.Algorithm.FailoverSchedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec)
 	if err != nil {
 		return err
 	}
-	if targetClusters == nil {
+	if len(reScheduleResult.SuggestedClusters) == 0 {
 		return nil
 	}

-	// TODO(tinyma123) Check if the final result meets the spread constraints.
-
-	clusterResourceBinding.Spec.Clusters = nil
-	for cluster := range targetClusters {
-		clusterResourceBinding.Spec.Clusters = append(clusterResourceBinding.Spec.Clusters, workv1alpha1.TargetCluster{Name: cluster})
-	}
+	clusterResourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
 	klog.Infof("The final binding.Spec.Cluster values are: %v\n", clusterResourceBinding.Spec.Clusters)

 	_, err = s.KarmadaClient.WorkV1alpha1().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{})
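A small detail in the rewiring above: the old nil check on targetClusters becomes a length check on reScheduleResult.SuggestedClusters. In Go, len of a nil slice is 0, so the length check treats "FailoverSchedule returned nothing" and "FailoverSchedule returned an empty list" the same way. A short illustration:

package main

import "fmt"

func main() {
	var nilSlice []string           // nil slice
	emptySlice := make([]string, 0) // non-nil but empty

	// len() reports 0 for both, which is why the length check is the
	// safer guard after delegating to FailoverSchedule.
	fmt.Println(len(nilSlice) == 0, nilSlice == nil)     // true true
	fmt.Println(len(emptySlice) == 0, emptySlice == nil) // true false
}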
@@ -625,20 +618,15 @@ func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha1.Reso
 		return err
 	}

-	targetClusters, err := s.obtainTargetCluster(resourceBinding.Spec.Clusters, placement)
+	reScheduleResult, err := s.Algorithm.FailoverSchedule(context.TODO(), &placement, &resourceBinding.Spec)
 	if err != nil {
 		return err
 	}
-	if targetClusters == nil {
+	if len(reScheduleResult.SuggestedClusters) == 0 {
 		return nil
 	}

-	// TODO(tinyma123) Check if the final result meets the spread constraints.
-
-	resourceBinding.Spec.Clusters = nil
-	for cluster := range targetClusters {
-		resourceBinding.Spec.Clusters = append(resourceBinding.Spec.Clusters, workv1alpha1.TargetCluster{Name: cluster})
-	}
+	resourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
 	klog.Infof("The final binding.Spec.Cluster values are: %v\n", resourceBinding.Spec.Clusters)

 	_, err = s.KarmadaClient.WorkV1alpha1().ResourceBindings(resourceBinding.Namespace).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{})
@@ -648,66 +636,6 @@ func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha1.Reso
 	return nil
 }

-// calcReservedCluster eliminates the not-ready clusters from the 'bindClusters'.
-func calcReservedCluster(bindClusters, readyClusters sets.String) sets.String {
-	return bindClusters.Difference(bindClusters.Difference(readyClusters))
-}
-
-// calcAvailableCluster returns a list of ready clusters that not in 'bindClusters'.
-func calcAvailableCluster(bindCluster, readyClusters sets.String) sets.String {
-	return readyClusters.Difference(bindCluster)
-}
-
-func (s *Scheduler) obtainTargetCluster(bindingClusters []workv1alpha1.TargetCluster, placement policyv1alpha1.Placement) (sets.String, error) {
-	readyClusters := s.schedulerCache.Snapshot().GetReadyClusterNames()
-	totalClusters := util.ConvertToClusterNames(bindingClusters)
-
-	reservedClusters := calcReservedCluster(totalClusters, readyClusters)
-	availableClusters := calcAvailableCluster(totalClusters, readyClusters)
-
-	filterPredicate := func(t *corev1.Taint) bool {
-		// now only interested in NoSchedule taint which means do not allow new resource to schedule onto the cluster unless they tolerate the taint
-		// todo: supprot NoExecute taint
-		return t.Effect == corev1.TaintEffectNoSchedule
-	}
-
-	candidateClusters := sets.NewString()
-	for clusterName := range availableClusters {
-		clusterObj, err := s.clusterLister.Get(clusterName)
-		if err != nil {
-			klog.Errorf("Failed to get clusterObj by clusterName: %s", clusterName)
-			return nil, err
-		}
-
-		if placement.ClusterAffinity != nil && !util.ClusterMatches(clusterObj, *placement.ClusterAffinity) {
-			continue
-		}
-
-		_, isUntolerated := v1helper.FindMatchingUntoleratedTaint(clusterObj.Spec.Taints, placement.ClusterTolerations, filterPredicate)
-		if !isUntolerated {
-			candidateClusters.Insert(clusterName)
-		}
-	}
-
-	klog.V(4).Infof("Reserved bindingClusters : %v", reservedClusters.List())
-	klog.V(4).Infof("Candidate bindingClusters: %v", candidateClusters.List())
-
-	// TODO: should schedule as much as possible?
-	deltaLen := len(bindingClusters) - len(reservedClusters)
-	if len(candidateClusters) < deltaLen {
-		klog.Warningf("ignore reschedule binding as insufficient available cluster")
-		return nil, nil
-	}
-
-	targetClusters := reservedClusters
-	clusterList := candidateClusters.List()
-	for i := 0; i < deltaLen; i++ {
-		targetClusters.Insert(clusterList[i])
-	}
-
-	return targetClusters, nil
-}
-
 func (s *Scheduler) scaleScheduleOne(key string) (err error) {
 	klog.V(4).Infof("begin scale scheduling ResourceBinding %s", key)
 	defer klog.V(4).Infof("end scale scheduling ResourceBinding %s: %v", key, err)