Merge pull request #515 from qianjun1993/scale-scheduler

Replicas changes with Duplicated ReplicaSchedulingStrategy or Weighte…
karmada-bot 2021-07-21 10:35:44 +08:00 committed by GitHub
commit 135c628019
3 changed files with 182 additions and 1 deletion


@@ -23,6 +23,7 @@ import (
// ScheduleAlgorithm is the interface that should be implemented to schedule a resource to the target clusters.
type ScheduleAlgorithm interface {
Schedule(context.Context, *policyv1alpha1.Placement, *workv1alpha1.ObjectReference) (scheduleResult ScheduleResult, err error)
ScaleSchedule(context.Context, *policyv1alpha1.Placement, *workv1alpha1.ResourceBindingSpec) (scheduleResult ScheduleResult, err error)
}
// ScheduleResult includes the clusters selected.
@@ -415,3 +416,52 @@ func (g *genericScheduler) divideReplicasAggregatedWithClusterReplicas(clusterAv
}
return targetClusters, nil
}
func (g *genericScheduler) ScaleSchedule(ctx context.Context, placement *policyv1alpha1.Placement,
spec *workv1alpha1.ResourceBindingSpec) (result ScheduleResult, err error) {
newTargetClusters := make([]workv1alpha1.TargetCluster, len(spec.Clusters))
if spec.Resource.Replicas > 0 {
if placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
for i, cluster := range spec.Clusters {
newTargetClusters[i] = workv1alpha1.TargetCluster{Name: cluster.Name, Replicas: spec.Resource.Replicas}
}
result.SuggestedClusters = newTargetClusters
return result, nil
}
if placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDivided {
preSelectedClusters := g.getPreSelected(spec.Clusters)
if placement.ReplicaScheduling.ReplicaDivisionPreference == policyv1alpha1.ReplicaDivisionPreferenceWeighted {
if placement.ReplicaScheduling.WeightPreference == nil {
return result, fmt.Errorf("no WeightPreference find to divide replicas")
}
clustersWithReplicas, err := g.divideReplicasByStaticWeight(preSelectedClusters, placement.ReplicaScheduling.WeightPreference.StaticWeightList, spec.Resource.Replicas)
if err != nil {
return result, fmt.Errorf("failed to assign replicas by static weight: %v", err)
}
result.SuggestedClusters = clustersWithReplicas
return result, nil
}
}
}
for i, cluster := range spec.Clusters {
newTargetClusters[i] = workv1alpha1.TargetCluster{Name: cluster.Name}
}
result.SuggestedClusters = newTargetClusters
return result, nil
}
func (g *genericScheduler) getPreSelected(targetClusters []workv1alpha1.TargetCluster) []*clusterv1alpha1.Cluster {
var preSelectedClusters []*clusterv1alpha1.Cluster
clusterInfoSnapshot := g.schedulerCache.Snapshot()
for _, targetCluster := range targetClusters {
for _, cluster := range clusterInfoSnapshot.GetClusters() {
if targetCluster.Name == cluster.Cluster().Name {
preSelectedClusters = append(preSelectedClusters, cluster.Cluster())
break
}
}
}
return preSelectedClusters
}
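For orientation, here is a minimal, self-contained sketch of the kind of static-weight division the Weighted branch above delegates to divideReplicasByStaticWeight. It is not the project's helper: the names, rounding, and remainder handling are assumptions made for illustration only.

package main

import "fmt"

type weightedCluster struct {
	Name   string
	Weight int64
}

// divideByStaticWeight splits replicas across clusters in proportion to their
// static weights, then hands out the remainder of the integer division one by one.
func divideByStaticWeight(clusters []weightedCluster, replicas int32) map[string]int32 {
	var weightSum int64
	for _, c := range clusters {
		weightSum += c.Weight
	}
	result := make(map[string]int32, len(clusters))
	if weightSum == 0 {
		return result
	}
	assigned := int32(0)
	for _, c := range clusters {
		n := int32(int64(replicas) * c.Weight / weightSum) // floor of the proportional share
		result[c.Name] = n
		assigned += n
	}
	for i := 0; assigned < replicas; i, assigned = (i+1)%len(clusters), assigned+1 {
		result[clusters[i].Name]++ // distribute the leftover replicas
	}
	return result
}

func main() {
	clusters := []weightedCluster{{Name: "member1", Weight: 2}, {Name: "member2", Weight: 1}}
	fmt.Println(divideByStaticWeight(clusters, 5)) // map[member1:4 member2:1]
}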


@@ -56,6 +56,9 @@ const (
// ReconcileSchedule means the binding object associated policy has been changed.
ReconcileSchedule ScheduleType = "ReconcileSchedule"
// ScaleSchedule means the replicas of the binding object have been changed.
ScaleSchedule ScheduleType = "ScaleSchedule"
// FailoverSchedule means one of the clusters a binding object is associated with has failed.
FailoverSchedule ScheduleType = "FailoverSchedule"
@@ -340,7 +343,7 @@ func (s *Scheduler) getScheduleType(key string) ScheduleType {
return FirstSchedule
}
_, policyPlacementStr, err := s.getPlacement(resourceBinding)
policyPlacement, policyPlacementStr, err := s.getPlacement(resourceBinding)
if err != nil {
return Unknown
}
@@ -351,6 +354,10 @@ func (s *Scheduler) getScheduleType(key string) ScheduleType {
return ReconcileSchedule
}
if policyPlacement.ReplicaScheduling != nil && util.IsBindingReplicasChanged(&resourceBinding.Spec, policyPlacement.ReplicaScheduling) {
return ScaleSchedule
}
clusters := s.schedulerCache.Snapshot().GetClusters()
for _, tc := range resourceBinding.Spec.Clusters {
boundCluster := tc.Name
@@ -391,6 +398,10 @@ func (s *Scheduler) getScheduleType(key string) ScheduleType {
return ReconcileSchedule
}
if policy.Spec.Placement.ReplicaScheduling != nil && util.IsBindingReplicasChanged(&binding.Spec, policy.Spec.Placement.ReplicaScheduling) {
return ScaleSchedule
}
clusters := s.schedulerCache.Snapshot().GetClusters()
for _, tc := range binding.Spec.Clusters {
boundCluster := tc.Name
@@ -423,6 +434,9 @@ func (s *Scheduler) scheduleNext() bool {
case ReconcileSchedule: // share same logic with first schedule
err = s.scheduleOne(key.(string))
klog.Infof("Reschedule binding(%s) as placement changed", key.(string))
case ScaleSchedule:
err = s.scaleScheduleOne(key.(string))
klog.Infof("Reschedule binding(%s) as replicas scaled down or scaled up", key.(string))
case FailoverSchedule:
if Failover {
err = s.rescheduleOne(key.(string))
@@ -779,3 +793,96 @@ func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha1.Reso
}
return nil
}
func (s *Scheduler) scaleScheduleOne(key string) (err error) {
klog.V(4).Infof("begin scale scheduling ResourceBinding %s", key)
defer klog.V(4).Infof("end scale scheduling ResourceBinding %s: %v", key, err)
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
if ns == "" {
clusterResourceBinding, err := s.clusterBindingLister.Get(name)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
clusterPolicyName := util.GetLabelValue(clusterResourceBinding.Labels, util.ClusterPropagationPolicyLabel)
clusterPolicy, err := s.clusterPolicyLister.Get(clusterPolicyName)
if err != nil {
return err
}
return s.scaleScheduleClusterResourceBinding(clusterResourceBinding, clusterPolicy)
}
resourceBinding, err := s.bindingLister.ResourceBindings(ns).Get(name)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
return s.scaleScheduleResourceBinding(resourceBinding)
}
func (s *Scheduler) scaleScheduleResourceBinding(resourceBinding *workv1alpha1.ResourceBinding) (err error) {
placement, placementStr, err := s.getPlacement(resourceBinding)
if err != nil {
return err
}
scheduleResult, err := s.Algorithm.ScaleSchedule(context.TODO(), &placement, &resourceBinding.Spec)
if err != nil {
klog.V(2).Infof("failed rescheduling ResourceBinding %s/%s after replicas changes: %v", resourceBinding.Namespace, resourceBinding.Name, err)
return err
}
klog.V(4).Infof("ResourceBinding %s/%s scheduled to clusters %v", resourceBinding.Namespace, resourceBinding.Name, scheduleResult.SuggestedClusters)
binding := resourceBinding.DeepCopy()
binding.Spec.Clusters = scheduleResult.SuggestedClusters
if binding.Annotations == nil {
binding.Annotations = make(map[string]string)
}
binding.Annotations[util.PolicyPlacementAnnotation] = placementStr
_, err = s.KarmadaClient.WorkV1alpha1().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}
func (s *Scheduler) scaleScheduleClusterResourceBinding(clusterResourceBinding *workv1alpha1.ClusterResourceBinding,
policy *policyv1alpha1.ClusterPropagationPolicy) (err error) {
scheduleResult, err := s.Algorithm.ScaleSchedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec)
if err != nil {
klog.V(2).Infof("failed rescheduling ClusterResourceBinding %s after replicas scaled down: %v", clusterResourceBinding.Name, err)
return err
}
klog.V(4).Infof("ClusterResourceBinding %s scheduled to clusters %v", clusterResourceBinding.Name, scheduleResult.SuggestedClusters)
binding := clusterResourceBinding.DeepCopy()
binding.Spec.Clusters = scheduleResult.SuggestedClusters
placement, err := json.Marshal(policy.Spec.Placement)
if err != nil {
klog.Errorf("Failed to marshal placement of propagationPolicy %s/%s, error: %v", policy.Namespace, policy.Name, err)
return err
}
if binding.Annotations == nil {
binding.Annotations = make(map[string]string)
}
binding.Annotations[util.PolicyPlacementAnnotation] = string(placement)
_, err = s.KarmadaClient.WorkV1alpha1().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}
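For orientation, the precedence introduced above can be read as: placement drift wins over replica drift, which wins over cluster failure. The sketch below is a hypothetical condensation of that ordering, not code from this change; the boolean parameters stand in for the real checks against the placement annotation, util.IsBindingReplicasChanged, and the cluster cache snapshot.

// decideScheduleType is an illustrative stand-in for the branching order in
// getScheduleType after this change. The parameters are assumed inputs.
func decideScheduleType(placementChanged, replicasChanged, boundClusterMissing bool) ScheduleType {
	if placementChanged {
		return ReconcileSchedule // placement no longer matches the recorded annotation
	}
	if replicasChanged {
		return ScaleSchedule // scheduled replicas no longer match the declared replicas
	}
	if boundClusterMissing {
		return FailoverSchedule // a bound cluster has disappeared from the snapshot
	}
	return Unknown // placeholder only; the real function has further outcomes
}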


@@ -1,6 +1,7 @@
package util
import (
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
)
@@ -12,3 +13,26 @@ func GetBindingClusterNames(binding *workv1alpha1.ResourceBinding) []string {
}
return clusterNames
}
// IsBindingReplicasChanged reports whether the replicas scheduled to the target clusters no longer match the declared replicas of the resource.
func IsBindingReplicasChanged(bindingSpec *workv1alpha1.ResourceBindingSpec, strategy *policyv1alpha1.ReplicaSchedulingStrategy) bool {
if strategy == nil {
return false
}
if strategy.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
for _, targetCluster := range bindingSpec.Clusters {
if targetCluster.Replicas != bindingSpec.Resource.Replicas {
return true
}
}
return false
}
if strategy.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDivided {
replicasSum := int32(0)
for _, targetCluster := range bindingSpec.Clusters {
replicasSum += targetCluster.Replicas
}
return replicasSum != bindingSpec.Resource.Replicas
}
return false
}
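For context, a test-style sketch of how the new helper behaves under the Divided strategy. It is built only from types that appear in this diff; the import path for util and the ObjectReference field layout are assumptions, not verified against the full API.

package main

import (
	"fmt"

	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
	"github.com/karmada-io/karmada/pkg/util"
)

func main() {
	// The binding declares 6 replicas, but only 2+2 are currently scheduled.
	spec := &workv1alpha1.ResourceBindingSpec{
		Resource: workv1alpha1.ObjectReference{Replicas: 6},
		Clusters: []workv1alpha1.TargetCluster{
			{Name: "member1", Replicas: 2},
			{Name: "member2", Replicas: 2},
		},
	}
	strategy := &policyv1alpha1.ReplicaSchedulingStrategy{
		ReplicaSchedulingType: policyv1alpha1.ReplicaSchedulingTypeDivided,
	}
	// Prints true: the sum of scheduled replicas (4) differs from the declared 6,
	// so the scheduler would classify this binding as ScaleSchedule.
	fmt.Println(util.IsBindingReplicasChanged(spec, strategy))
}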