diff --git a/pkg/scheduler/event_handler.go b/pkg/scheduler/event_handler.go
index 500318088..df2e6f95b 100644
--- a/pkg/scheduler/event_handler.go
+++ b/pkg/scheduler/event_handler.go
@@ -178,9 +178,18 @@ func (s *Scheduler) enqueueAffectedBindings(oldCluster, newCluster *clusterv1alp
 	for _, binding := range bindings {
 		placementPtr := binding.Spec.Placement
 		if placementPtr == nil {
+			// should never reach here
 			continue
 		}
-		affinity := placementPtr.ClusterAffinity
+
+		var affinity *policyv1alpha1.ClusterAffinity
+		if placementPtr.ClusterAffinities != nil {
+			affinityIndex := getAffinityIndex(placementPtr.ClusterAffinities, binding.Status.SchedulerObservedAffinityName)
+			affinity = &placementPtr.ClusterAffinities[affinityIndex].ClusterAffinity
+		} else {
+			affinity = placementPtr.ClusterAffinity
+		}
+
 		switch {
 		case affinity == nil:
 			// If no clusters specified, add it to the queue
@@ -198,9 +207,18 @@ func (s *Scheduler) enqueueAffectedBindings(oldCluster, newCluster *clusterv1alp
 	for _, binding := range clusterBindings {
 		placementPtr := binding.Spec.Placement
 		if placementPtr == nil {
+			// should never reach here
 			continue
 		}
-		affinity := placementPtr.ClusterAffinity
+
+		var affinity *policyv1alpha1.ClusterAffinity
+		if placementPtr.ClusterAffinities != nil {
+			affinityIndex := getAffinityIndex(placementPtr.ClusterAffinities, binding.Status.SchedulerObservedAffinityName)
+			affinity = &placementPtr.ClusterAffinities[affinityIndex].ClusterAffinity
+		} else {
+			affinity = placementPtr.ClusterAffinity
+		}
+
 		switch {
 		case affinity == nil:
 			// If no clusters specified, add it to the queue
diff --git a/pkg/scheduler/helper.go b/pkg/scheduler/helper.go
index 39e9ca269..0f1741a85 100644
--- a/pkg/scheduler/helper.go
+++ b/pkg/scheduler/helper.go
@@ -71,3 +71,16 @@ func clusterAffinitiesChanged(
 	}
 	return false
 }
+
+func getAffinityIndex(affinities []policyv1alpha1.ClusterAffinityTerm, observedName string) int {
+	if observedName == "" {
+		return 0
+	}
+
+	for index, term := range affinities {
+		if term.AffinityName == observedName {
+			return index
+		}
+	}
+	return 0
+}
diff --git a/pkg/scheduler/helper_test.go b/pkg/scheduler/helper_test.go
index 8a6d118ab..8db85bea5 100644
--- a/pkg/scheduler/helper_test.go
+++ b/pkg/scheduler/helper_test.go
@@ -372,3 +372,59 @@ func Test_needConsideredPlacementChanged(t *testing.T) {
 		})
 	}
 }
+
+func Test_getAffinityIndex(t *testing.T) {
+	type args struct {
+		affinities   []policyv1alpha1.ClusterAffinityTerm
+		observedName string
+	}
+	tests := []struct {
+		name string
+		args args
+		want int
+	}{
+		{
+			name: "empty observedName",
+			args: args{
+				affinities: []policyv1alpha1.ClusterAffinityTerm{
+					{AffinityName: "group1"},
+					{AffinityName: "group2"},
+					{AffinityName: "group3"},
+				},
+				observedName: "",
+			},
+			want: 0,
+		},
+		{
+			name: "observedName cannot be found in affinities",
+			args: args{
+				affinities: []policyv1alpha1.ClusterAffinityTerm{
+					{AffinityName: "group1"},
+					{AffinityName: "group2"},
+					{AffinityName: "group3"},
+				},
+				observedName: "group0",
+			},
+			want: 0,
+		},
+		{
+			name: "observedName can be found in affinities",
+			args: args{
+				affinities: []policyv1alpha1.ClusterAffinityTerm{
+					{AffinityName: "group1"},
+					{AffinityName: "group2"},
+					{AffinityName: "group3"},
+				},
+				observedName: "group3",
+			},
+			want: 2,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := getAffinityIndex(tt.args.affinities, tt.args.observedName); got != tt.want {
+				t.Errorf("getAffinityIndex() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
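For context, a runnable mini-example of the resume semantics of `getAffinityIndex`: scheduling picks up from the last observed affinity group, and falls back to the first term when the observed name is empty or no longer present. The `term` type is a stripped-down stand-in for `policyv1alpha1.ClusterAffinityTerm`, used only for illustration:

```go
package main

import "fmt"

// term is a stand-in for policyv1alpha1.ClusterAffinityTerm, keeping only
// the field getAffinityIndex cares about.
type term struct{ AffinityName string }

// getAffinityIndex mirrors the helper above: resume from the last observed
// affinity term, or start over at index 0.
func getAffinityIndex(affinities []term, observedName string) int {
	if observedName == "" {
		return 0
	}
	for index, t := range affinities {
		if t.AffinityName == observedName {
			return index
		}
	}
	return 0
}

func main() {
	groups := []term{{"group1"}, {"group2"}, {"group3"}}
	fmt.Println(getAffinityIndex(groups, ""))       // 0: first schedule, no observed name yet
	fmt.Println(getAffinityIndex(groups, "group2")) // 1: resume from the observed term
	fmt.Println(getAffinityIndex(groups, "group0")) // 0: observed term no longer exists
}
```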
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 18e659699..59b8b4c7d 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -266,48 +266,6 @@ func (s *Scheduler) worker() {
 	}
 }
 
-func (s *Scheduler) getPlacement(resourceBinding *workv1alpha2.ResourceBinding) (policyv1alpha1.Placement, string, error) {
-	var placement policyv1alpha1.Placement
-	var err error
-
-	placementPtr := resourceBinding.Spec.Placement
-	if placementPtr == nil {
-		err = fmt.Errorf("failed to get placement from resourceBinding(%s/%s)", resourceBinding.Namespace, resourceBinding.Name)
-		klog.Error(err)
-		return placement, "", err
-	}
-
-	placement = *placementPtr
-	var placementBytes []byte
-	placementBytes, err = json.Marshal(placement)
-	if err != nil {
-		return placement, "", err
-	}
-
-	return placement, string(placementBytes), nil
-}
-
-func (s *Scheduler) getClusterPlacement(crb *workv1alpha2.ClusterResourceBinding) (policyv1alpha1.Placement, string, error) {
-	var placement policyv1alpha1.Placement
-	var err error
-
-	placementPtr := crb.Spec.Placement
-	if placementPtr == nil {
-		err = fmt.Errorf("failed to get placement from clusterResourceBinding(%s)", crb.Name)
-		klog.Error(err)
-		return placement, "", err
-	}
-
-	placement = *placementPtr
-	var placementBytes []byte
-	placementBytes, err = json.Marshal(placement)
-	if err != nil {
-		return placement, "", err
-	}
-
-	return placement, string(placementBytes), nil
-}
-
 func (s *Scheduler) scheduleNext() bool {
 	key, shutdown := s.queue.Get()
 	if shutdown {
@@ -342,38 +300,41 @@ func (s *Scheduler) doScheduleBinding(namespace, name string) (err error) {
 		return err
 	}
 
-	start := time.Now()
-	policyPlacement, policyPlacementStr, err := s.getPlacement(rb)
-	if err != nil {
+	if rb.Spec.Placement == nil {
+		// should never reach here
+		err = fmt.Errorf("failed to get placement from resourceBinding(%s/%s)", rb.Namespace, rb.Name)
+		klog.Error(err)
 		return err
 	}
+
+	start := time.Now()
 	appliedPlacementStr := util.GetLabelValue(rb.Annotations, util.PolicyPlacementAnnotation)
 	if placementChanged(*rb.Spec.Placement, appliedPlacementStr, rb.Status.SchedulerObservedAffinityName) {
 		// policy placement changed, need schedule
 		klog.Infof("Start to schedule ResourceBinding(%s/%s) as placement changed", namespace, name)
-		err = s.scheduleResourceBinding(rb, policyPlacementStr)
+		err = s.scheduleResourceBinding(rb)
 		metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
 		return err
 	}
-	if policyPlacement.ReplicaScheduling != nil && util.IsBindingReplicasChanged(&rb.Spec, policyPlacement.ReplicaScheduling) {
+	if rb.Spec.Placement.ReplicaScheduling != nil && util.IsBindingReplicasChanged(&rb.Spec, rb.Spec.Placement.ReplicaScheduling) {
 		// binding replicas changed, need reschedule
 		klog.Infof("Reschedule ResourceBinding(%s/%s) as replicas scaled down or scaled up", namespace, name)
-		err = s.scheduleResourceBinding(rb, policyPlacementStr)
+		err = s.scheduleResourceBinding(rb)
 		metrics.BindingSchedule(string(ScaleSchedule), utilmetrics.DurationInSeconds(start), err)
 		return err
 	}
 	if rb.Spec.Replicas == 0 ||
-		policyPlacement.ReplicaScheduling == nil ||
-		policyPlacement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
+		rb.Spec.Placement.ReplicaScheduling == nil ||
+		rb.Spec.Placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
 		// Duplicated resources should always be scheduled. Note: non-workload is considered as duplicated
 		// even if scheduling type is divided.
 		klog.V(3).Infof("Start to schedule ResourceBinding(%s/%s) as scheduling type is duplicated", namespace, name)
-		err = s.scheduleResourceBinding(rb, policyPlacementStr)
+		err = s.scheduleResourceBinding(rb)
 		metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
 		return err
 	}
 	// TODO(dddddai): reschedule bindings on cluster change
-	klog.V(3).Infof("Don't need to schedule ResourceBinding(%s/%s)", namespace, name)
+	klog.V(3).Infof("Don't need to schedule ResourceBinding(%s/%s)", rb.Namespace, rb.Name)
 	return nil
 }
 
@@ -387,33 +348,36 @@ func (s *Scheduler) doScheduleClusterBinding(name string) (err error) {
 		return err
 	}
 
-	start := time.Now()
-	policyPlacement, policyPlacementStr, err := s.getClusterPlacement(crb)
-	if err != nil {
+	if crb.Spec.Placement == nil {
+		// should never reach here
+		err = fmt.Errorf("failed to get placement from clusterResourceBinding(%s)", crb.Name)
+		klog.Error(err)
 		return err
 	}
+
+	start := time.Now()
 	appliedPlacementStr := util.GetLabelValue(crb.Annotations, util.PolicyPlacementAnnotation)
 	if placementChanged(*crb.Spec.Placement, appliedPlacementStr, crb.Status.SchedulerObservedAffinityName) {
 		// policy placement changed, need schedule
 		klog.Infof("Start to schedule ClusterResourceBinding(%s) as placement changed", name)
-		err = s.scheduleClusterResourceBinding(crb, policyPlacementStr)
+		err = s.scheduleClusterResourceBinding(crb)
 		metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
 		return err
 	}
-	if policyPlacement.ReplicaScheduling != nil && util.IsBindingReplicasChanged(&crb.Spec, policyPlacement.ReplicaScheduling) {
+	if crb.Spec.Placement.ReplicaScheduling != nil && util.IsBindingReplicasChanged(&crb.Spec, crb.Spec.Placement.ReplicaScheduling) {
 		// binding replicas changed, need reschedule
 		klog.Infof("Reschedule ClusterResourceBinding(%s) as replicas scaled down or scaled up", name)
-		err = s.scheduleClusterResourceBinding(crb, policyPlacementStr)
+		err = s.scheduleClusterResourceBinding(crb)
 		metrics.BindingSchedule(string(ScaleSchedule), utilmetrics.DurationInSeconds(start), err)
 		return err
 	}
 	if crb.Spec.Replicas == 0 ||
-		policyPlacement.ReplicaScheduling == nil ||
-		policyPlacement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
+		crb.Spec.Placement.ReplicaScheduling == nil ||
+		crb.Spec.Placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
 		// Duplicated resources should always be scheduled. Note: non-workload is considered as duplicated
 		// even if scheduling type is divided.
 		klog.V(3).Infof("Start to schedule ClusterResourceBinding(%s) as scheduling type is duplicated", name)
-		err = s.scheduleClusterResourceBinding(crb, policyPlacementStr)
+		err = s.scheduleClusterResourceBinding(crb)
 		metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
 		return err
 	}
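The trigger checks in `doScheduleBinding`/`doScheduleClusterBinding` can be read as a three-way decision. A hypothetical condensation (the booleans stand in for `placementChanged`, `util.IsBindingReplicasChanged`, and the `ReplicaSchedulingTypeDuplicated` check; the returned strings are the metric labels used above, and this is an illustration, not scheduler API):

```go
package main

import "fmt"

// scheduleKind condenses the triggers above into one decision.
func scheduleKind(placementChanged, replicasChanged, duplicated bool, replicas int32) string {
	switch {
	case placementChanged:
		return "ReconcileSchedule" // the policy's placement changed, reschedule
	case replicasChanged:
		return "ScaleSchedule" // the workload scaled up or down, reschedule
	case replicas == 0 || duplicated:
		return "ReconcileSchedule" // duplicated (and non-workload) bindings always schedule
	default:
		return "" // no trigger; rescheduling on cluster change is still a TODO
	}
}

func main() {
	fmt.Println(scheduleKind(false, true, false, 3)) // ScaleSchedule
}
```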
@@ -422,21 +386,16 @@ func (s *Scheduler) doScheduleClusterBinding(name string) (err error) {
 	return nil
 }
 
-func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.ResourceBinding, placementStr string) (err error) {
-	klog.V(4).InfoS("Begin scheduling resource binding", "resourceBinding", klog.KObj(resourceBinding))
-	defer klog.V(4).InfoS("End scheduling resource binding", "resourceBinding", klog.KObj(resourceBinding))
-
-	// Update "Scheduled" condition according to schedule result.
+func (s *Scheduler) scheduleResourceBinding(rb *workv1alpha2.ResourceBinding) (err error) {
 	defer func() {
-		s.recordScheduleResultEventForResourceBinding(resourceBinding, err)
 		var condition metav1.Condition
 		if err == nil {
 			condition = util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)
 		} else {
 			condition = util.NewCondition(workv1alpha2.Scheduled, scheduleFailedReason, err.Error(), metav1.ConditionFalse)
 		}
-		if updateErr := s.patchBindingScheduleStatus(resourceBinding, condition); updateErr != nil {
-			klog.Errorf("Failed to patch schedule status to ResourceBinding(%s/%s): %v", resourceBinding.Namespace, resourceBinding.Name, err)
+		if updateErr := patchBindingStatusCondition(s.KarmadaClient, rb, condition); updateErr != nil {
+			klog.Errorf("Failed to patch schedule status to ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err)
 			if err == nil {
 				// schedule succeed but update status failed, return err in order to retry in next loop.
 				err = updateErr
@@ -444,18 +403,98 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.Resour
 		}
 	}()
 
-	scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &resourceBinding.Spec, &resourceBinding.Status,
-		&core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
-	var noClusterFit *framework.FitError
-	// in case of no cluster fit, can not return but continue to patch(cleanup) the result.
-	if err != nil && !errors.As(err, &noClusterFit) {
-		klog.Errorf("Failed scheduling ResourceBinding %s/%s: %v", resourceBinding.Namespace, resourceBinding.Name, err)
+	if rb.Spec.Placement.ClusterAffinities != nil {
+		return s.scheduleResourceBindingWithClusterAffinities(rb)
+	}
+	return s.scheduleResourceBindingWithClusterAffinity(rb)
+}
+
+func (s *Scheduler) scheduleResourceBindingWithClusterAffinity(rb *workv1alpha2.ResourceBinding) error {
+	klog.V(4).InfoS("Begin scheduling resource binding with ClusterAffinity", "resourceBinding", klog.KObj(rb))
+	defer klog.V(4).InfoS("End scheduling resource binding with ClusterAffinity", "resourceBinding", klog.KObj(rb))
+
+	placementBytes, err := json.Marshal(*rb.Spec.Placement)
+	if err != nil {
+		klog.V(4).ErrorS(err, "Failed to marshal binding placement", "resourceBinding", klog.KObj(rb))
 		return err
 	}
-	klog.V(4).Infof("ResourceBinding %s/%s scheduled to clusters %v", resourceBinding.Namespace, resourceBinding.Name, scheduleResult.SuggestedClusters)
-	scheduleErr := s.patchScheduleResultForResourceBinding(resourceBinding, placementStr, scheduleResult.SuggestedClusters)
-	return utilerrors.NewAggregate([]error{err, scheduleErr})
+	scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &rb.Spec, &rb.Status, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
+	var noClusterFit *framework.FitError
+	// In case of no cluster fit, we can not return yet; continue to patch (clean up) the result.
+	if err != nil && !errors.As(err, &noClusterFit) {
+		s.recordScheduleResultEventForResourceBinding(rb, err)
+		klog.Errorf("Failed scheduling ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err)
+		return err
+	}
+
+	klog.V(4).Infof("ResourceBinding(%s/%s) scheduled to clusters %v", rb.Namespace, rb.Name, scheduleResult.SuggestedClusters)
+	patchErr := s.patchScheduleResultForResourceBinding(rb, string(placementBytes), scheduleResult.SuggestedClusters)
+	s.recordScheduleResultEventForResourceBinding(rb, utilerrors.NewAggregate([]error{err, patchErr}))
+
+	// Only care about the patch result here; a FitError, if any, has already
+	// been recorded as an event.
+	return patchErr
+}
+
+func (s *Scheduler) scheduleResourceBindingWithClusterAffinities(rb *workv1alpha2.ResourceBinding) error {
+	klog.V(4).InfoS("Begin scheduling resourceBinding with ClusterAffinities", "resourceBinding", klog.KObj(rb))
+	defer klog.V(4).InfoS("End scheduling resourceBinding with ClusterAffinities", "resourceBinding", klog.KObj(rb))
+
+	placementBytes, err := json.Marshal(*rb.Spec.Placement)
+	if err != nil {
+		klog.V(4).ErrorS(err, "Failed to marshal binding placement", "resourceBinding", klog.KObj(rb))
+		return err
+	}
+
+	var (
+		scheduleResult core.ScheduleResult
+		firstErr       error
+	)
+
+	affinityIndex := getAffinityIndex(rb.Spec.Placement.ClusterAffinities, rb.Status.SchedulerObservedAffinityName)
+	updatedStatus := rb.Status.DeepCopy()
+	for affinityIndex < len(rb.Spec.Placement.ClusterAffinities) {
+		klog.V(4).Infof("Schedule ResourceBinding(%s/%s) with clusterAffinities index(%d)", rb.Namespace, rb.Name, affinityIndex)
+		updatedStatus.SchedulerObservedAffinityName = rb.Spec.Placement.ClusterAffinities[affinityIndex].AffinityName
+		scheduleResult, err = s.Algorithm.Schedule(context.TODO(), &rb.Spec, updatedStatus, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
+		if err == nil {
+			break
+		}
+
+		// record the error from the first scheduling attempt
+		if firstErr == nil {
+			firstErr = err
+		}
+
+		err = fmt.Errorf("failed to schedule ResourceBinding(%s/%s) with clusterAffinities index(%d): %v", rb.Namespace, rb.Name, affinityIndex, err)
+		klog.Error(err)
+		s.recordScheduleResultEventForResourceBinding(rb, err)
+		affinityIndex++
+	}
+
+	if affinityIndex >= len(rb.Spec.Placement.ClusterAffinities) {
+		klog.Errorf("Failed to schedule ResourceBinding(%s/%s) with all ClusterAffinities.", rb.Namespace, rb.Name)
+
+		updatedStatus.SchedulerObservedAffinityName = rb.Status.SchedulerObservedAffinityName
+
+		var noClusterFit *framework.FitError
+		if !errors.As(firstErr, &noClusterFit) {
+			return firstErr
+		}
+
+		klog.V(4).Infof("ResourceBinding(%s/%s) scheduled to clusters %v", rb.Namespace, rb.Name, nil)
+		patchErr := s.patchScheduleResultForResourceBinding(rb, string(placementBytes), nil)
+		s.recordScheduleResultEventForResourceBinding(rb, patchErr)
+		return patchErr
+	}
+
+	klog.V(4).Infof("ResourceBinding(%s/%s) scheduled to clusters %v", rb.Namespace, rb.Name, scheduleResult.SuggestedClusters)
+	patchErr := s.patchScheduleResultForResourceBinding(rb, string(placementBytes), scheduleResult.SuggestedClusters)
+	patchStatusErr := patchBindingStatusWithAffinityName(s.KarmadaClient, rb, updatedStatus.SchedulerObservedAffinityName)
+	scheduleErr := utilerrors.NewAggregate([]error{patchErr, patchStatusErr})
+	s.recordScheduleResultEventForResourceBinding(rb, scheduleErr)
+	return scheduleErr
 }
 
 func (s *Scheduler) patchScheduleResultForResourceBinding(oldBinding *workv1alpha2.ResourceBinding, placement string,
 	scheduleResult []workv1alpha2.TargetCluster) error {
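The heart of the change is this ordered-failover loop. Below is a condensed, self-contained sketch of its control flow; `terms` and `schedule` are simplified stand-ins for the `ClusterAffinityTerm` slice and `Algorithm.Schedule`, so this is an illustration of the technique rather than the scheduler's actual API:

```go
package main

import (
	"errors"
	"fmt"
)

// scheduleWithAffinities tries each affinity term in order, starting from
// the term the scheduler last observed, until one of them schedules.
func scheduleWithAffinities(terms []string, observed string, schedule func(string) error) (string, error) {
	start := 0
	for i, t := range terms { // resume from the last observed term, if any
		if t == observed {
			start = i
			break
		}
	}
	var firstErr error
	for i := start; i < len(terms); i++ {
		if err := schedule(terms[i]); err == nil {
			return terms[i], nil // this term is then recorded in status
		} else if firstErr == nil {
			firstErr = err // if every term fails, surface the first error
		}
	}
	return observed, firstErr // all terms failed; keep the old observed name
}

func main() {
	schedule := func(term string) error {
		if term != "group3" {
			return errors.New("no cluster fit in " + term)
		}
		return nil
	}
	fmt.Println(scheduleWithAffinities([]string{"group1", "group2", "group3"}, "group2", schedule))
	// group3 <nil>: group2 failed, so the scheduler fell through to group3
}
```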
@@ -483,24 +522,24 @@ func (s *Scheduler) patchScheduleResultForResourceBinding(oldBinding *workv1alph
 	}
 
 	_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(newBinding.Namespace).Patch(context.TODO(), newBinding.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
-	return err
+	if err != nil {
+		klog.Errorf("Failed to patch schedule to ResourceBinding(%s/%s): %v", oldBinding.Namespace, oldBinding.Name, err)
+		return err
+	}
+	klog.V(4).Infof("Patch schedule to ResourceBinding(%s/%s) succeeded", oldBinding.Namespace, oldBinding.Name)
+	return nil
 }
 
-func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding, placementStr string) (err error) {
-	klog.V(4).InfoS("Begin scheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
-	defer klog.V(4).InfoS("End scheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
-
-	// Update "Scheduled" condition according to schedule result.
+func (s *Scheduler) scheduleClusterResourceBinding(crb *workv1alpha2.ClusterResourceBinding) (err error) {
 	defer func() {
-		s.recordScheduleResultEventForClusterResourceBinding(clusterResourceBinding, err)
 		var condition metav1.Condition
 		if err == nil {
 			condition = util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)
 		} else {
 			condition = util.NewCondition(workv1alpha2.Scheduled, scheduleFailedReason, err.Error(), metav1.ConditionFalse)
 		}
-		if updateErr := s.patchClusterBindingScheduleStatus(clusterResourceBinding, condition); updateErr != nil {
-			klog.Errorf("Failed to patch schedule status to ClusterResourceBinding(%s): %v", clusterResourceBinding.Name, err)
+		if updateErr := patchClusterBindingStatusCondition(s.KarmadaClient, crb, condition); updateErr != nil {
+			klog.Errorf("Failed to patch schedule status to ClusterResourceBinding(%s): %v", crb.Name, err)
 			if err == nil {
 				// schedule succeed but update status failed, return err in order to retry in next loop.
 				err = updateErr
@@ -508,18 +547,98 @@ func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv
 		}
 	}()
 
-	scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &clusterResourceBinding.Spec, &clusterResourceBinding.Status,
-		&core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
-	var noClusterFit *framework.FitError
-	// in case of no cluster fit, can not return but continue to patch(cleanup) the result.
-	if err != nil && !errors.As(err, &noClusterFit) {
-		klog.V(2).Infof("Failed scheduling ClusterResourceBinding %s: %v", clusterResourceBinding.Name, err)
+	if crb.Spec.Placement.ClusterAffinities != nil {
+		return s.scheduleClusterResourceBindingWithClusterAffinities(crb)
+	}
+	return s.scheduleClusterResourceBindingWithClusterAffinity(crb)
+}
+
+func (s *Scheduler) scheduleClusterResourceBindingWithClusterAffinity(crb *workv1alpha2.ClusterResourceBinding) error {
+	klog.V(4).InfoS("Begin scheduling clusterResourceBinding with ClusterAffinity", "clusterResourceBinding", klog.KObj(crb))
+	defer klog.V(4).InfoS("End scheduling clusterResourceBinding with ClusterAffinity", "clusterResourceBinding", klog.KObj(crb))
+
+	placementBytes, err := json.Marshal(*crb.Spec.Placement)
+	if err != nil {
+		klog.V(4).ErrorS(err, "Failed to marshal binding placement", "clusterResourceBinding", klog.KObj(crb))
 		return err
 	}
-	klog.V(4).Infof("ClusterResourceBinding %s scheduled to clusters %v", clusterResourceBinding.Name, scheduleResult.SuggestedClusters)
-	scheduleErr := s.patchScheduleResultForClusterResourceBinding(clusterResourceBinding, placementStr, scheduleResult.SuggestedClusters)
-	return utilerrors.NewAggregate([]error{err, scheduleErr})
+	scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &crb.Spec, &crb.Status, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
+	var noClusterFit *framework.FitError
+	// In case of no cluster fit, we can not return yet; continue to patch (clean up) the result.
+	if err != nil && !errors.As(err, &noClusterFit) {
+		s.recordScheduleResultEventForClusterResourceBinding(crb, err)
+		klog.Errorf("Failed scheduling ClusterResourceBinding(%s): %v", crb.Name, err)
+		return err
+	}
+
+	klog.V(4).Infof("ClusterResourceBinding(%s) scheduled to clusters %v", crb.Name, scheduleResult.SuggestedClusters)
+	patchErr := s.patchScheduleResultForClusterResourceBinding(crb, string(placementBytes), scheduleResult.SuggestedClusters)
+	s.recordScheduleResultEventForClusterResourceBinding(crb, utilerrors.NewAggregate([]error{err, patchErr}))
+
+	// Only care about the patch result here; a FitError, if any, has already
+	// been recorded as an event.
+	return patchErr
+}
+
+func (s *Scheduler) scheduleClusterResourceBindingWithClusterAffinities(crb *workv1alpha2.ClusterResourceBinding) error {
+	klog.V(4).InfoS("Begin scheduling clusterResourceBinding with ClusterAffinities", "clusterResourceBinding", klog.KObj(crb))
+	defer klog.V(4).InfoS("End scheduling clusterResourceBinding with ClusterAffinities", "clusterResourceBinding", klog.KObj(crb))
+
+	placementBytes, err := json.Marshal(*crb.Spec.Placement)
+	if err != nil {
+		klog.V(4).ErrorS(err, "Failed to marshal binding placement", "clusterResourceBinding", klog.KObj(crb))
+		return err
+	}
+
+	var (
+		scheduleResult core.ScheduleResult
+		firstErr       error
+	)
+
+	affinityIndex := getAffinityIndex(crb.Spec.Placement.ClusterAffinities, crb.Status.SchedulerObservedAffinityName)
+	updatedStatus := crb.Status.DeepCopy()
+	for affinityIndex < len(crb.Spec.Placement.ClusterAffinities) {
+		klog.V(4).Infof("Schedule ClusterResourceBinding(%s) with clusterAffinities index(%d)", crb.Name, affinityIndex)
+		updatedStatus.SchedulerObservedAffinityName = crb.Spec.Placement.ClusterAffinities[affinityIndex].AffinityName
+		scheduleResult, err = s.Algorithm.Schedule(context.TODO(), &crb.Spec, updatedStatus, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
+		if err == nil {
+			break
+		}
+
+		// record the error from the first scheduling attempt
+		if firstErr == nil {
+			firstErr = err
+		}
+
+		err = fmt.Errorf("failed to schedule ClusterResourceBinding(%s) with clusterAffinities index(%d): %v", crb.Name, affinityIndex, err)
+		klog.Error(err)
+		s.recordScheduleResultEventForClusterResourceBinding(crb, err)
+		affinityIndex++
+	}
+
+	if affinityIndex >= len(crb.Spec.Placement.ClusterAffinities) {
+		klog.Errorf("Failed to schedule ClusterResourceBinding(%s) with all ClusterAffinities.", crb.Name)
+
+		updatedStatus.SchedulerObservedAffinityName = crb.Status.SchedulerObservedAffinityName
+
+		var noClusterFit *framework.FitError
+		if !errors.As(firstErr, &noClusterFit) {
+			return firstErr
+		}
+
+		klog.V(4).Infof("ClusterResourceBinding(%s) scheduled to clusters %v", crb.Name, nil)
+		patchErr := s.patchScheduleResultForClusterResourceBinding(crb, string(placementBytes), nil)
+		s.recordScheduleResultEventForClusterResourceBinding(crb, patchErr)
+		return patchErr
+	}
+
+	klog.V(4).Infof("ClusterResourceBinding(%s) scheduled to clusters %v", crb.Name, scheduleResult.SuggestedClusters)
+	patchErr := s.patchScheduleResultForClusterResourceBinding(crb, string(placementBytes), scheduleResult.SuggestedClusters)
+	patchStatusErr := patchClusterBindingStatusWithAffinityName(s.KarmadaClient, crb, updatedStatus.SchedulerObservedAffinityName)
+	scheduleErr := utilerrors.NewAggregate([]error{patchErr, patchStatusErr})
+	s.recordScheduleResultEventForClusterResourceBinding(crb, scheduleErr)
+	return scheduleErr
 }
 
 func (s *Scheduler) patchScheduleResultForClusterResourceBinding(oldBinding *workv1alpha2.ClusterResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {
@@ -597,33 +716,43 @@ func (s *Scheduler) establishEstimatorConnections() {
 	}
 }
 
-// patchBindingScheduleStatus patches schedule status of ResourceBinding when necessary.
-func (s *Scheduler) patchBindingScheduleStatus(rb *workv1alpha2.ResourceBinding, newScheduledCondition metav1.Condition) error {
-	if rb == nil {
-		return nil
-	}
+// patchBindingStatusCondition patches schedule status condition of ResourceBinding when necessary.
+func patchBindingStatusCondition(karmadaClient karmadaclientset.Interface, rb *workv1alpha2.ResourceBinding, newScheduledCondition metav1.Condition) error {
+	klog.V(4).Infof("Begin to patch status condition to ResourceBinding(%s/%s)", rb.Namespace, rb.Name)
 
-	modifiedObj := rb.DeepCopy()
-	meta.SetStatusCondition(&modifiedObj.Status.Conditions, newScheduledCondition)
+	updateRB := rb.DeepCopy()
+	meta.SetStatusCondition(&updateRB.Status.Conditions, newScheduledCondition)
 	// Postpone setting observed generation until schedule succeed, assume scheduler will retry and
 	// will succeed eventually.
 	if newScheduledCondition.Status == metav1.ConditionTrue {
-		modifiedObj.Status.SchedulerObservedGeneration = modifiedObj.Generation
+		updateRB.Status.SchedulerObservedGeneration = rb.Generation
 	}
+	return patchBindingStatus(karmadaClient, rb, updateRB)
+}
+
+// patchBindingStatusWithAffinityName patches schedule status with affinityName of ResourceBinding when necessary.
+func patchBindingStatusWithAffinityName(karmadaClient karmadaclientset.Interface, rb *workv1alpha2.ResourceBinding, affinityName string) error {
+	klog.V(4).Infof("Begin to patch status with affinityName(%s) to ResourceBinding(%s/%s).", affinityName, rb.Namespace, rb.Name)
+
+	updateRB := rb.DeepCopy()
+	updateRB.Status.SchedulerObservedAffinityName = affinityName
+	return patchBindingStatus(karmadaClient, rb, updateRB)
+}
+
+func patchBindingStatus(karmadaClient karmadaclientset.Interface, rb, updateRB *workv1alpha2.ResourceBinding) error {
 	// Short path, ignore patch if no change.
-	if reflect.DeepEqual(rb.Status, modifiedObj.Status) {
+	if reflect.DeepEqual(rb.Status, updateRB.Status) {
 		return nil
 	}
 
-	patchBytes, err := helper.GenMergePatch(rb, modifiedObj)
+	patchBytes, err := helper.GenMergePatch(rb, updateRB)
 	if err != nil {
 		return fmt.Errorf("failed to create a merge patch: %v", err)
 	}
 
-	_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).Patch(context.TODO(), rb.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+	_, err = karmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).Patch(context.TODO(), rb.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
 	if err != nil {
 		klog.Errorf("Failed to patch schedule status to ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err)
 		return err
 	}
@@ -631,31 +760,40 @@
-// patchClusterBindingScheduleStatus patches schedule status of ClusterResourceBinding when necessary
-func (s *Scheduler) patchClusterBindingScheduleStatus(crb *workv1alpha2.ClusterResourceBinding, newScheduledCondition metav1.Condition) error {
-	if crb == nil {
-		return nil
-	}
+// patchClusterBindingStatusCondition patches schedule status condition of ClusterResourceBinding when necessary
+func patchClusterBindingStatusCondition(karmadaClient karmadaclientset.Interface, crb *workv1alpha2.ClusterResourceBinding, newScheduledCondition metav1.Condition) error {
+	klog.V(4).Infof("Begin to patch status condition to ClusterResourceBinding(%s)", crb.Name)
 
-	modifiedObj := crb.DeepCopy()
-	meta.SetStatusCondition(&modifiedObj.Status.Conditions, newScheduledCondition)
+	updateCRB := crb.DeepCopy()
+	meta.SetStatusCondition(&updateCRB.Status.Conditions, newScheduledCondition)
 	// Postpone setting observed generation until schedule succeed, assume scheduler will retry and
-	// will succeed eventually
+	// will succeed eventually.
 	if newScheduledCondition.Status == metav1.ConditionTrue {
-		modifiedObj.Status.SchedulerObservedGeneration = modifiedObj.Generation
+		updateCRB.Status.SchedulerObservedGeneration = crb.Generation
 	}
+	return patchClusterResourceBindingStatus(karmadaClient, crb, updateCRB)
+}
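All of these helpers funnel into the same pattern: compute a JSON merge patch between the old and updated binding and send it to the `status` subresource. Assuming `helper.GenMergePatch` computes a standard merge patch (here `evanphx/json-patch`'s `CreateMergePatch` stands in as an equivalent, and `bindingStatus` is a tiny illustrative stand-in for `workv1alpha2.ResourceBindingStatus`), the patch for the affinity-name update would look like this:

```go
package main

import (
	"encoding/json"
	"fmt"

	jsonpatch "github.com/evanphx/json-patch"
)

// bindingStatus is a stand-in for the real status type, kept to the two
// fields these helpers touch.
type bindingStatus struct {
	SchedulerObservedAffinityName string `json:"schedulerObservedAffinityName,omitempty"`
	SchedulerObservedGeneration   int64  `json:"schedulerObservedGeneration,omitempty"`
}

type binding struct {
	Status bindingStatus `json:"status"`
}

func main() {
	old := binding{Status: bindingStatus{SchedulerObservedGeneration: 1}}
	updated := old
	updated.Status.SchedulerObservedAffinityName = "group1"

	oldBytes, _ := json.Marshal(old)
	newBytes, _ := json.Marshal(updated)

	// helper.GenMergePatch is assumed to produce something equivalent.
	patch, _ := jsonpatch.CreateMergePatch(oldBytes, newBytes)
	fmt.Println(string(patch))
	// {"status":{"schedulerObservedAffinityName":"group1"}}
}
```

Because only the changed fields appear in the patch, the "short path" `reflect.DeepEqual` check above skips the API call entirely when the status is already up to date.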
+
+// patchClusterBindingStatusWithAffinityName patches schedule status with affinityName of ClusterResourceBinding when necessary.
+func patchClusterBindingStatusWithAffinityName(karmadaClient karmadaclientset.Interface, crb *workv1alpha2.ClusterResourceBinding, affinityName string) error {
+	klog.V(4).Infof("Begin to patch status with affinityName(%s) to ClusterResourceBinding(%s).", affinityName, crb.Name)
+
+	updateCRB := crb.DeepCopy()
+	updateCRB.Status.SchedulerObservedAffinityName = affinityName
+	return patchClusterResourceBindingStatus(karmadaClient, crb, updateCRB)
+}
+
+func patchClusterResourceBindingStatus(karmadaClient karmadaclientset.Interface, crb, updateCRB *workv1alpha2.ClusterResourceBinding) error {
 	// Short path, ignore patch if no change.
-	if reflect.DeepEqual(crb.Status, modifiedObj.Status) {
+	if reflect.DeepEqual(crb.Status, updateCRB.Status) {
 		return nil
 	}
 
-	patchBytes, err := helper.GenMergePatch(crb, modifiedObj)
+	patchBytes, err := helper.GenMergePatch(crb, updateCRB)
 	if err != nil {
 		return fmt.Errorf("failed to create a merge patch: %v", err)
 	}
 
-	_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Patch(context.TODO(), crb.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+	_, err = karmadaClient.WorkV1alpha2().ClusterResourceBindings().Patch(context.TODO(), crb.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
 	if err != nil {
 		klog.Errorf("Failed to patch schedule status to ClusterResourceBinding(%s): %v", crb.Name, err)
 		return err
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 364a2c08c..b77ba9fbd 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -70,7 +70,7 @@ func TestCreateScheduler(t *testing.T) {
 	}
 }
 
-func Test_patchBindingScheduleStatus(t *testing.T) {
+func Test_patchBindingStatusCondition(t *testing.T) {
 	oneHourBefore := time.Now().Add(-1 * time.Hour).Round(time.Second)
 	oneHourAfter := time.Now().Add(1 * time.Hour).Round(time.Second)
 
@@ -80,14 +80,7 @@ func Test_patchBindingStatusCondition(t *testing.T) {
 	successCondition.LastTransitionTime = metav1.Time{Time: oneHourBefore}
 	failureCondition.LastTransitionTime = metav1.Time{Time: oneHourAfter}
 
-	dynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme())
 	karmadaClient := karmadafake.NewSimpleClientset()
-	kubeClient := fake.NewSimpleClientset()
-
-	scheduler, err := NewScheduler(dynamicClient, karmadaClient, kubeClient)
-	if err != nil {
-		t.Error(err)
-	}
 
 	tests := []struct {
 		name string
@@ -159,7 +152,7 @@ func Test_patchBindingStatusCondition(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
-			err = scheduler.patchBindingScheduleStatus(test.binding, test.newScheduledCondition)
+			err = patchBindingStatusCondition(karmadaClient, test.binding, test.newScheduledCondition)
 			if err != nil {
 				t.Error(err)
 			}
@@ -174,7 +167,53 @@ func Test_patchBindingStatusCondition(t *testing.T) {
 	}
 }
 
-func Test_patchClusterBindingScheduleStatus(t *testing.T) {
+func Test_patchBindingStatusWithAffinityName(t *testing.T) {
+	karmadaClient := karmadafake.NewSimpleClientset()
+
+	tests := []struct {
+		name         string
+		binding      *workv1alpha2.ResourceBinding
+		affinityName string
+		expected     *workv1alpha2.ResourceBinding
+	}{
+		{
+			name: "add affinityName in status",
+			binding: &workv1alpha2.ResourceBinding{
+				ObjectMeta: metav1.ObjectMeta{Name: "rb-1", Namespace: "default", Generation: 1},
+				Spec:       workv1alpha2.ResourceBindingSpec{},
+				Status:     workv1alpha2.ResourceBindingStatus{},
+			},
+			affinityName: "group1",
+			expected: &workv1alpha2.ResourceBinding{
+				ObjectMeta: metav1.ObjectMeta{Name: "rb-1", Namespace: "default", Generation: 1},
+				Spec:       workv1alpha2.ResourceBindingSpec{},
+				Status:     workv1alpha2.ResourceBindingStatus{SchedulerObservedAffinityName: "group1"},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			_, err := karmadaClient.WorkV1alpha2().ResourceBindings(test.binding.Namespace).Create(context.TODO(), test.binding, metav1.CreateOptions{})
+			if err != nil {
+				t.Fatal(err)
+			}
+			err = patchBindingStatusWithAffinityName(karmadaClient, test.binding, test.affinityName)
+			if err != nil {
+				t.Error(err)
+			}
+			res, err := karmadaClient.WorkV1alpha2().ResourceBindings(test.binding.Namespace).Get(context.TODO(), test.binding.Name, metav1.GetOptions{})
+			if err != nil {
+				t.Fatal(err)
+			}
+			if !reflect.DeepEqual(res.Status, test.expected.Status) {
+				t.Errorf("expected status: %v, but got: %v", test.expected.Status, res.Status)
+			}
+		})
+	}
+}
+
+func Test_patchClusterBindingStatusCondition(t *testing.T) {
 	oneHourBefore := time.Now().Add(-1 * time.Hour).Round(time.Second)
 	oneHourAfter := time.Now().Add(1 * time.Hour).Round(time.Second)
 
@@ -184,14 +223,7 @@ func Test_patchClusterBindingStatusCondition(t *testing.T) {
 	successCondition.LastTransitionTime = metav1.Time{Time: oneHourBefore}
 	failureCondition.LastTransitionTime = metav1.Time{Time: oneHourAfter}
 
-	dynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme())
 	karmadaClient := karmadafake.NewSimpleClientset()
-	kubeClient := fake.NewSimpleClientset()
-
-	scheduler, err := NewScheduler(dynamicClient, karmadaClient, kubeClient)
-	if err != nil {
-		t.Error(err)
-	}
 
 	tests := []struct {
 		name string
@@ -263,7 +295,60 @@ func Test_patchClusterBindingStatusCondition(t *testing.T) {
 			if err != nil {
 				t.Fatal(err)
 			}
-			err = scheduler.patchClusterBindingScheduleStatus(test.binding, test.newScheduledCondition)
+			err = patchClusterBindingStatusCondition(karmadaClient, test.binding, test.newScheduledCondition)
 			if err != nil {
 				t.Error(err)
 			}
 			res, err := karmadaClient.WorkV1alpha2().ClusterResourceBindings().Get(context.TODO(), test.binding.Name, metav1.GetOptions{})
 			if err != nil {
 				t.Fatal(err)
 			}
 			if !reflect.DeepEqual(res.Status, test.expected.Status) {
 				t.Errorf("expected status: %v, but got: %v", test.expected.Status, res.Status)
 			}
 		})
 	}
 }
+
+func Test_patchClusterBindingStatusWithAffinityName(t *testing.T) {
+	karmadaClient := karmadafake.NewSimpleClientset()
+
+	tests := []struct {
+		name         string
+		binding      *workv1alpha2.ClusterResourceBinding
+		affinityName string
+		expected     *workv1alpha2.ClusterResourceBinding
+	}{
+		{
+			name: "add affinityName in status",
+			binding: &workv1alpha2.ClusterResourceBinding{
+				ObjectMeta: metav1.ObjectMeta{Name: "crb-1", Generation: 1},
+				Spec:       workv1alpha2.ResourceBindingSpec{},
+				Status: workv1alpha2.ResourceBindingStatus{
+					Conditions:                  []metav1.Condition{util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)},
+					SchedulerObservedGeneration: 1,
+				},
+			},
+			affinityName: "group1",
+			expected: &workv1alpha2.ClusterResourceBinding{
+				ObjectMeta: metav1.ObjectMeta{Name: "crb-1"},
+				Spec:       workv1alpha2.ResourceBindingSpec{},
+				Status: workv1alpha2.ResourceBindingStatus{
+					SchedulerObservedAffinityName: "group1",
+					Conditions:                    []metav1.Condition{util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)},
+					SchedulerObservedGeneration:   1,
+				},
+			},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			_, err := karmadaClient.WorkV1alpha2().ClusterResourceBindings().Create(context.TODO(), test.binding, metav1.CreateOptions{})
+			if err != nil {
+				t.Fatal(err)
+			}
+			err = patchClusterBindingStatusWithAffinityName(karmadaClient, test.binding, test.affinityName)
+			if err != nil {
+				t.Error(err)
+			}
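For context, the new code path is exercised by placements along the lines of the following sketch. The field layout follows the inline embedding visible in the helper code above (`ClusterAffinityTerm` carrying an `AffinityName` plus an embedded `ClusterAffinity`); the group and member names are purely illustrative. The scheduler tries group1 first and falls over to group2 only after group1 fails to fit:

```go
// Assumes the usual alias:
// policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
placement := policyv1alpha1.Placement{
	// Ordered affinity groups: earlier terms are preferred; later terms
	// are fallbacks tried only after the earlier ones fail to schedule.
	ClusterAffinities: []policyv1alpha1.ClusterAffinityTerm{
		{
			AffinityName:    "group1",
			ClusterAffinity: policyv1alpha1.ClusterAffinity{ClusterNames: []string{"member1"}},
		},
		{
			AffinityName:    "group2",
			ClusterAffinity: policyv1alpha1.ClusterAffinity{ClusterNames: []string{"member2"}},
		},
	},
}
```

When `ClusterAffinities` is nil, the scheduler keeps the original single-`ClusterAffinity` behavior, so existing placements are unaffected.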