consider ClusterAffinities when scheduling bindings

Signed-off-by: changzhen <changzhen5@huawei.com>
changzhen 2023-02-27 11:35:45 +08:00
parent 69d330f04c
commit 313ecd3b2c
5 changed files with 450 additions and 140 deletions
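
In summary: the scheduler now honors the ordered ClusterAffinities (affinity groups) declared in a binding's Placement, resuming from the group recorded in status.SchedulerObservedAffinityName and failing over to later groups when scheduling against the current one fails. A minimal sketch of such a placement; the group and cluster names are illustrative, not part of this change:

package main

import (
	"fmt"

	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
)

func main() {
	// Two ordered affinity groups: the scheduler tries "primary" first and
	// fails over to "backup" when no cluster in the first group fits.
	placement := policyv1alpha1.Placement{
		ClusterAffinities: []policyv1alpha1.ClusterAffinityTerm{
			{
				AffinityName:    "primary",
				ClusterAffinity: policyv1alpha1.ClusterAffinity{ClusterNames: []string{"member1"}},
			},
			{
				AffinityName:    "backup",
				ClusterAffinity: policyv1alpha1.ClusterAffinity{ClusterNames: []string{"member2"}},
			},
		},
	}
	fmt.Printf("%d ordered affinity groups\n", len(placement.ClusterAffinities))
}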

View File

@@ -178,9 +178,18 @@ func (s *Scheduler) enqueueAffectedBindings(oldCluster, newCluster *clusterv1alp
for _, binding := range bindings {
placementPtr := binding.Spec.Placement
if placementPtr == nil {
// never reach here
continue
}
affinity := placementPtr.ClusterAffinity
var affinity *policyv1alpha1.ClusterAffinity
if placementPtr.ClusterAffinities != nil {
affinityIndex := getAffinityIndex(placementPtr.ClusterAffinities, binding.Status.SchedulerObservedAffinityName)
affinity = &placementPtr.ClusterAffinities[affinityIndex].ClusterAffinity
} else {
affinity = placementPtr.ClusterAffinity
}
switch {
case affinity == nil:
// If no clusters specified, add it to the queue
@@ -198,9 +207,18 @@ func (s *Scheduler) enqueueAffectedBindings(oldCluster, newCluster *clusterv1alp
for _, binding := range clusterBindings {
placementPtr := binding.Spec.Placement
if placementPtr == nil {
// never reach here
continue
}
affinity := placementPtr.ClusterAffinity
var affinity *policyv1alpha1.ClusterAffinity
if placementPtr.ClusterAffinities != nil {
affinityIndex := getAffinityIndex(placementPtr.ClusterAffinities, binding.Status.SchedulerObservedAffinityName)
affinity = &placementPtr.ClusterAffinities[affinityIndex].ClusterAffinity
} else {
affinity = placementPtr.ClusterAffinity
}
switch {
case affinity == nil:
// If no clusters specified, add it to the queue
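
Both hunks above apply the same resolution rule: when ClusterAffinities is set, use the term named by status.SchedulerObservedAffinityName (falling back to the first term when the name is empty or unknown); otherwise fall back to the legacy single ClusterAffinity. A standalone sketch of that rule; effectiveAffinity is an illustrative helper, and getAffinityIndex is reproduced from the next file so the snippet runs on its own:

package main

import (
	"fmt"

	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
)

// getAffinityIndex is copied from the helper introduced in this commit.
func getAffinityIndex(affinities []policyv1alpha1.ClusterAffinityTerm, observedName string) int {
	if observedName == "" {
		return 0
	}
	for index, term := range affinities {
		if term.AffinityName == observedName {
			return index
		}
	}
	return 0
}

// effectiveAffinity resolves the affinity the scheduler should consider,
// mirroring the branches added to enqueueAffectedBindings above.
func effectiveAffinity(placement *policyv1alpha1.Placement, observedName string) *policyv1alpha1.ClusterAffinity {
	if placement == nil {
		return nil
	}
	if placement.ClusterAffinities != nil {
		i := getAffinityIndex(placement.ClusterAffinities, observedName)
		return &placement.ClusterAffinities[i].ClusterAffinity
	}
	return placement.ClusterAffinity
}

func main() {
	p := &policyv1alpha1.Placement{
		ClusterAffinities: []policyv1alpha1.ClusterAffinityTerm{
			{AffinityName: "group1"},
			{AffinityName: "group2"},
		},
	}
	// Resumes at the observed group; unknown or empty names restart at index 0.
	fmt.Println(effectiveAffinity(p, "group2") == &p.ClusterAffinities[1].ClusterAffinity) // true
}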

View File

@@ -71,3 +71,16 @@ func clusterAffinitiesChanged(
}
return false
}
func getAffinityIndex(affinities []policyv1alpha1.ClusterAffinityTerm, observedName string) int {
if observedName == "" {
return 0
}
for index, term := range affinities {
if term.AffinityName == observedName {
return index
}
}
return 0
}

View File

@@ -372,3 +372,59 @@ func Test_needConsideredPlacementChanged(t *testing.T) {
})
}
}
func Test_getAffinityIndex(t *testing.T) {
type args struct {
affinities []policyv1alpha1.ClusterAffinityTerm
observedName string
}
tests := []struct {
name string
args args
want int
}{
{
name: "empty observedName",
args: args{
affinities: []policyv1alpha1.ClusterAffinityTerm{
{AffinityName: "group1"},
{AffinityName: "group2"},
{AffinityName: "group3"},
},
observedName: "",
},
want: 0,
},
{
name: "observedName can not find in affinities",
args: args{
affinities: []policyv1alpha1.ClusterAffinityTerm{
{AffinityName: "group1"},
{AffinityName: "group2"},
{AffinityName: "group3"},
},
observedName: "group0",
},
want: 0,
},
{
name: "observedName can find in affinities",
args: args{
affinities: []policyv1alpha1.ClusterAffinityTerm{
{AffinityName: "group1"},
{AffinityName: "group2"},
{AffinityName: "group3"},
},
observedName: "group3",
},
want: 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getAffinityIndex(tt.args.affinities, tt.args.observedName); got != tt.want {
t.Errorf("getAffinityIndex() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -266,48 +266,6 @@ func (s *Scheduler) worker() {
}
}
func (s *Scheduler) getPlacement(resourceBinding *workv1alpha2.ResourceBinding) (policyv1alpha1.Placement, string, error) {
var placement policyv1alpha1.Placement
var err error
placementPtr := resourceBinding.Spec.Placement
if placementPtr == nil {
err = fmt.Errorf("failed to get placement from resourceBinding(%s/%s)", resourceBinding.Namespace, resourceBinding.Name)
klog.Error(err)
return placement, "", err
}
placement = *placementPtr
var placementBytes []byte
placementBytes, err = json.Marshal(placement)
if err != nil {
return placement, "", err
}
return placement, string(placementBytes), nil
}
func (s *Scheduler) getClusterPlacement(crb *workv1alpha2.ClusterResourceBinding) (policyv1alpha1.Placement, string, error) {
var placement policyv1alpha1.Placement
var err error
placementPtr := crb.Spec.Placement
if placementPtr == nil {
err = fmt.Errorf("failed to get placement from clusterResourceBinding(%s)", crb.Name)
klog.Error(err)
return placement, "", err
}
placement = *placementPtr
var placementBytes []byte
placementBytes, err = json.Marshal(placement)
if err != nil {
return placement, "", err
}
return placement, string(placementBytes), nil
}
func (s *Scheduler) scheduleNext() bool {
key, shutdown := s.queue.Get()
if shutdown {
@@ -342,38 +300,41 @@ func (s *Scheduler) doScheduleBinding(namespace, name string) (err error) {
return err
}
start := time.Now()
policyPlacement, policyPlacementStr, err := s.getPlacement(rb)
if err != nil {
if rb.Spec.Placement == nil {
// never reach here
err = fmt.Errorf("failed to get placement from resourceBinding(%s/%s)", rb.Namespace, rb.Name)
klog.Error(err)
return err
}
start := time.Now()
appliedPlacementStr := util.GetLabelValue(rb.Annotations, util.PolicyPlacementAnnotation)
if placementChanged(*rb.Spec.Placement, appliedPlacementStr, rb.Status.SchedulerObservedAffinityName) {
// policy placement changed, need to schedule
klog.Infof("Start to schedule ResourceBinding(%s/%s) as placement changed", namespace, name)
err = s.scheduleResourceBinding(rb, policyPlacementStr)
err = s.scheduleResourceBinding(rb)
metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
return err
}
if policyPlacement.ReplicaScheduling != nil && util.IsBindingReplicasChanged(&rb.Spec, policyPlacement.ReplicaScheduling) {
if rb.Spec.Placement.ReplicaScheduling != nil && util.IsBindingReplicasChanged(&rb.Spec, rb.Spec.Placement.ReplicaScheduling) {
// binding replicas changed, need to reschedule
klog.Infof("Reschedule ResourceBinding(%s/%s) as replicas scaled down or scaled up", namespace, name)
err = s.scheduleResourceBinding(rb, policyPlacementStr)
err = s.scheduleResourceBinding(rb)
metrics.BindingSchedule(string(ScaleSchedule), utilmetrics.DurationInSeconds(start), err)
return err
}
if rb.Spec.Replicas == 0 ||
policyPlacement.ReplicaScheduling == nil ||
policyPlacement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
rb.Spec.Placement.ReplicaScheduling == nil ||
rb.Spec.Placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
// Duplicated resources should always be scheduled. Note: non-workload is considered as duplicated
// even if scheduling type is divided.
klog.V(3).Infof("Start to schedule ResourceBinding(%s/%s) as scheduling type is duplicated", namespace, name)
err = s.scheduleResourceBinding(rb, policyPlacementStr)
err = s.scheduleResourceBinding(rb)
metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
return err
}
// TODO(dddddai): reschedule bindings on cluster change
klog.V(3).Infof("Don't need to schedule ResourceBinding(%s/%s)", namespace, name)
klog.V(3).Infof("Don't need to schedule ResourceBinding(%s/%s)", rb.Namespace, rb.Name)
return nil
}
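
For context, the placement-changed trigger above compares the current placement against the JSON copy stored under util.PolicyPlacementAnnotation at the last successful schedule; placementChanged additionally consults status.SchedulerObservedAffinityName. A minimal stand-in for the comparison idea, using an illustrative struct rather than Karmada's Placement type:

package main

import (
	"encoding/json"
	"fmt"
)

// placement is an illustrative stand-in for the binding's placement type.
type placement struct {
	ClusterNames []string `json:"clusterNames,omitempty"`
}

// placementDrifted mirrors the annotation comparison: the scheduler stores the
// marshalled placement when it schedules and reschedules when it differs.
func placementDrifted(current placement, applied string) bool {
	cur, err := json.Marshal(current)
	if err != nil {
		return true // fail safe: reschedule when the comparison is impossible
	}
	return string(cur) != applied
}

func main() {
	applied := `{"clusterNames":["member1"]}`
	fmt.Println(placementDrifted(placement{ClusterNames: []string{"member1"}}, applied)) // false
	fmt.Println(placementDrifted(placement{ClusterNames: []string{"member2"}}, applied)) // true
}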
@@ -387,33 +348,36 @@ func (s *Scheduler) doScheduleClusterBinding(name string) (err error) {
return err
}
start := time.Now()
policyPlacement, policyPlacementStr, err := s.getClusterPlacement(crb)
if err != nil {
if crb.Spec.Placement == nil {
// never reach here
err = fmt.Errorf("failed to get placement from clusterResourceBinding(%s)", crb.Name)
klog.Error(err)
return err
}
start := time.Now()
appliedPlacementStr := util.GetLabelValue(crb.Annotations, util.PolicyPlacementAnnotation)
if placementChanged(*crb.Spec.Placement, appliedPlacementStr, crb.Status.SchedulerObservedAffinityName) {
// policy placement changed, need to schedule
klog.Infof("Start to schedule ClusterResourceBinding(%s) as placement changed", name)
err = s.scheduleClusterResourceBinding(crb, policyPlacementStr)
err = s.scheduleClusterResourceBinding(crb)
metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
return err
}
if policyPlacement.ReplicaScheduling != nil && util.IsBindingReplicasChanged(&crb.Spec, policyPlacement.ReplicaScheduling) {
if crb.Spec.Placement.ReplicaScheduling != nil && util.IsBindingReplicasChanged(&crb.Spec, crb.Spec.Placement.ReplicaScheduling) {
// binding replicas changed, need to reschedule
klog.Infof("Reschedule ClusterResourceBinding(%s) as replicas scaled down or scaled up", name)
err = s.scheduleClusterResourceBinding(crb, policyPlacementStr)
err = s.scheduleClusterResourceBinding(crb)
metrics.BindingSchedule(string(ScaleSchedule), utilmetrics.DurationInSeconds(start), err)
return err
}
if crb.Spec.Replicas == 0 ||
policyPlacement.ReplicaScheduling == nil ||
policyPlacement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
crb.Spec.Placement.ReplicaScheduling == nil ||
crb.Spec.Placement.ReplicaScheduling.ReplicaSchedulingType == policyv1alpha1.ReplicaSchedulingTypeDuplicated {
// Duplicated resources should always be scheduled. Note: non-workload is considered as duplicated
// even if scheduling type is divided.
klog.V(3).Infof("Start to schedule ClusterResourceBinding(%s) as scheduling type is duplicated", name)
err = s.scheduleClusterResourceBinding(crb, policyPlacementStr)
err = s.scheduleClusterResourceBinding(crb)
metrics.BindingSchedule(string(ReconcileSchedule), utilmetrics.DurationInSeconds(start), err)
return err
}
@@ -422,21 +386,16 @@ func (s *Scheduler) doScheduleClusterBinding(name string) (err error) {
return nil
}
func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.ResourceBinding, placementStr string) (err error) {
klog.V(4).InfoS("Begin scheduling resource binding", "resourceBinding", klog.KObj(resourceBinding))
defer klog.V(4).InfoS("End scheduling resource binding", "resourceBinding", klog.KObj(resourceBinding))
// Update "Scheduled" condition according to schedule result.
func (s *Scheduler) scheduleResourceBinding(rb *workv1alpha2.ResourceBinding) (err error) {
defer func() {
s.recordScheduleResultEventForResourceBinding(resourceBinding, err)
var condition metav1.Condition
if err == nil {
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)
} else {
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleFailedReason, err.Error(), metav1.ConditionFalse)
}
if updateErr := s.patchBindingScheduleStatus(resourceBinding, condition); updateErr != nil {
klog.Errorf("Failed to patch schedule status to ResourceBinding(%s/%s): %v", resourceBinding.Namespace, resourceBinding.Name, err)
if updateErr := patchBindingStatusCondition(s.KarmadaClient, rb, condition); updateErr != nil {
klog.Errorf("Failed to patch schedule status to ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err)
if err == nil {
// schedule succeeded but the status update failed; return the error so the next loop retries.
err = updateErr
@@ -444,18 +403,98 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.Resour
}
}()
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &resourceBinding.Spec, &resourceBinding.Status,
&core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
var noClusterFit *framework.FitError
// in case of no cluster fit, can not return but continue to patch(cleanup) the result.
if err != nil && !errors.As(err, &noClusterFit) {
klog.Errorf("Failed scheduling ResourceBinding %s/%s: %v", resourceBinding.Namespace, resourceBinding.Name, err)
if rb.Spec.Placement.ClusterAffinities != nil {
return s.scheduleResourceBindingWithClusterAffinities(rb)
}
return s.scheduleResourceBindingWithClusterAffinity(rb)
}
func (s *Scheduler) scheduleResourceBindingWithClusterAffinity(rb *workv1alpha2.ResourceBinding) error {
klog.V(4).InfoS("Begin scheduling resource binding with ClusterAffinity", "resourceBinding", klog.KObj(rb))
defer klog.V(4).InfoS("End scheduling resource binding with ClusterAffinity", "resourceBinding", klog.KObj(rb))
placementBytes, err := json.Marshal(*rb.Spec.Placement)
if err != nil {
klog.V(4).ErrorS(err, "Failed to marshal binding placement", "resourceBinding", klog.KObj(rb))
return err
}
klog.V(4).Infof("ResourceBinding %s/%s scheduled to clusters %v", resourceBinding.Namespace, resourceBinding.Name, scheduleResult.SuggestedClusters)
scheduleErr := s.patchScheduleResultForResourceBinding(resourceBinding, placementStr, scheduleResult.SuggestedClusters)
return utilerrors.NewAggregate([]error{err, scheduleErr})
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &rb.Spec, &rb.Status, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
var noClusterFit *framework.FitError
// In case no cluster fits, do not return immediately; continue to patch (clean up) the result.
if err != nil && !errors.As(err, &noClusterFit) {
s.recordScheduleResultEventForResourceBinding(rb, err)
klog.Errorf("Failed scheduling ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err)
return err
}
klog.V(4).Infof("ResourceBinding(%s/%s) scheduled to clusters %v", rb.Namespace, rb.Name, scheduleResult.SuggestedClusters)
patchErr := s.patchScheduleResultForResourceBinding(rb, string(placementBytes), scheduleResult.SuggestedClusters)
s.recordScheduleResultEventForResourceBinding(rb, utilerrors.NewAggregate([]error{err, patchErr}))
// Only the patch result matters here; the FitError, if any, has already
// been recorded as an event.
return patchErr
}
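
The errors.As check above separates a fit error (no suitable cluster; fall through so the now-empty result is still patched as cleanup) from infrastructure failures (abort and let the queue retry). A dependency-free sketch of that pattern; fitError stands in for framework.FitError:

package main

import (
	"errors"
	"fmt"
)

// fitError stands in for the scheduler framework's FitError: the workload is
// fine, there is just no cluster that fits it right now.
type fitError struct{ reason string }

func (e *fitError) Error() string { return "no cluster fits: " + e.reason }

func main() {
	var err error = &fitError{reason: "spread constraints unsatisfied"}
	var noClusterFit *fitError
	// Mirrors the branch above: only non-fit errors abort; a fit error falls
	// through so the (empty) result is still patched to clean up stale targets.
	if err != nil && !errors.As(err, &noClusterFit) {
		fmt.Println("abort and retry:", err)
		return
	}
	fmt.Println("patch empty schedule result as cleanup")
}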
func (s *Scheduler) scheduleResourceBindingWithClusterAffinities(rb *workv1alpha2.ResourceBinding) error {
klog.V(4).InfoS("Begin scheduling resourceBinding with ClusterAffinities", "resourceBinding", klog.KObj(rb))
defer klog.V(4).InfoS("End scheduling resourceBinding with ClusterAffinities", "resourceBinding", klog.KObj(rb))
placementBytes, err := json.Marshal(*rb.Spec.Placement)
if err != nil {
klog.V(4).ErrorS(err, "Failed to marshal binding placement", "resourceBinding", klog.KObj(rb))
return err
}
var (
scheduleResult core.ScheduleResult
firstErr error
)
affinityIndex := getAffinityIndex(rb.Spec.Placement.ClusterAffinities, rb.Status.SchedulerObservedAffinityName)
updatedStatus := rb.Status.DeepCopy()
for affinityIndex < len(rb.Spec.Placement.ClusterAffinities) {
klog.V(4).Infof("Schedule ResourceBinding(%s/%s) with clusterAffiliates index(%d)", rb.Namespace, rb.Name, affinityIndex)
updatedStatus.SchedulerObservedAffinityName = rb.Spec.Placement.ClusterAffinities[affinityIndex].AffinityName
scheduleResult, err = s.Algorithm.Schedule(context.TODO(), &rb.Spec, updatedStatus, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
if err == nil {
break
}
// retain the error from the first scheduling attempt
if firstErr == nil {
firstErr = err
}
err = fmt.Errorf("failed to schedule ResourceBinding(%s/%s) with clusterAffiliates index(%d): %v", rb.Namespace, rb.Name, affinityIndex, err)
klog.Error(err)
s.recordScheduleResultEventForResourceBinding(rb, err)
affinityIndex++
}
if affinityIndex >= len(rb.Spec.Placement.ClusterAffinities) {
klog.Errorf("Failed to schedule ResourceBinding(%s/%s) with all ClusterAffinities.", rb.Namespace, rb.Name)
updatedStatus.SchedulerObservedAffinityName = rb.Status.SchedulerObservedAffinityName
var noClusterFit *framework.FitError
if !errors.As(firstErr, &noClusterFit) {
return firstErr
}
klog.V(4).Infof("ResourceBinding(%s/%s) scheduled to clusters %v", rb.Namespace, rb.Name, nil)
patchErr := s.patchScheduleResultForResourceBinding(rb, string(placementBytes), nil)
s.recordScheduleResultEventForResourceBinding(rb, patchErr)
return patchErr
}
klog.V(4).Infof("ResourceBinding(%s/%s) scheduled to clusters %v", rb.Namespace, rb.Name, scheduleResult.SuggestedClusters)
patchErr := s.patchScheduleResultForResourceBinding(rb, string(placementBytes), scheduleResult.SuggestedClusters)
patchStatusErr := patchBindingStatusWithAffinityName(s.KarmadaClient, rb, updatedStatus.SchedulerObservedAffinityName)
scheduleErr := utilerrors.NewAggregate([]error{patchErr, patchStatusErr})
s.recordScheduleResultEventForResourceBinding(rb, scheduleErr)
return scheduleErr
}
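
The loop above implements failover across affinity groups: resume at the observed group, advance on failure while retaining the first error, and on exhaustion clean up the schedule result only when that first error was a fit error. A condensed, dependency-free sketch of the control flow; scheduleOnce and errNoFit stand in for Algorithm.Schedule and framework.FitError:

package main

import (
	"errors"
	"fmt"
)

// errNoFit stands in for the scheduler framework's FitError.
var errNoFit = errors.New("no cluster fits")

// scheduleOnce stands in for Algorithm.Schedule against one affinity group.
func scheduleOnce(group string) ([]string, error) {
	if group == "backup" {
		return []string{"member2"}, nil
	}
	return nil, errNoFit
}

// failOver mirrors scheduleResourceBindingWithClusterAffinities: resume from
// the last observed group, advance on failure, and keep the first error.
func failOver(groups []string, observed string) (clusters []string, used string, err error) {
	idx := 0
	for i, g := range groups {
		if g == observed {
			idx = i
			break
		}
	}
	var firstErr error
	for ; idx < len(groups); idx++ {
		if clusters, err = scheduleOnce(groups[idx]); err == nil {
			return clusters, groups[idx], nil
		}
		if firstErr == nil {
			firstErr = err
		}
	}
	// All groups failed. The caller patches an empty result as cleanup only
	// when the first failure was a fit error; other errors are returned as-is.
	return nil, observed, firstErr
}

func main() {
	clusters, used, err := failOver([]string{"primary", "backup"}, "")
	fmt.Println(clusters, used, err) // [member2] backup <nil>
}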
func (s *Scheduler) patchScheduleResultForResourceBinding(oldBinding *workv1alpha2.ResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {
@@ -483,24 +522,24 @@ func (s *Scheduler) patchScheduleResultForResourceBinding(oldBinding *workv1alph
}
_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(newBinding.Namespace).Patch(context.TODO(), newBinding.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
return err
if err != nil {
klog.Errorf("Failed to patch schedule to ResourceBinding(%s/%s): %v", oldBinding.Namespace, oldBinding.Name, err)
return err
}
klog.V(4).Infof("Patch schedule to ResourceBinding(%s/%s) succeeded", oldBinding.Namespace, oldBinding.Name)
return nil
}
func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding, placementStr string) (err error) {
klog.V(4).InfoS("Begin scheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
defer klog.V(4).InfoS("End scheduling cluster resource binding", "clusterResourceBinding", klog.KObj(clusterResourceBinding))
// Update "Scheduled" condition according to schedule result.
func (s *Scheduler) scheduleClusterResourceBinding(crb *workv1alpha2.ClusterResourceBinding) (err error) {
defer func() {
s.recordScheduleResultEventForClusterResourceBinding(clusterResourceBinding, err)
var condition metav1.Condition
if err == nil {
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)
} else {
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleFailedReason, err.Error(), metav1.ConditionFalse)
}
if updateErr := s.patchClusterBindingScheduleStatus(clusterResourceBinding, condition); updateErr != nil {
klog.Errorf("Failed to patch schedule status to ClusterResourceBinding(%s): %v", clusterResourceBinding.Name, err)
if updateErr := patchClusterBindingStatusCondition(s.KarmadaClient, crb, condition); updateErr != nil {
klog.Errorf("Failed to patch schedule status to ClusterResourceBinding(%s): %v", crb.Name, err)
if err == nil {
// schedule succeeded but the status update failed; return the error so the next loop retries.
err = updateErr
@@ -508,18 +547,98 @@ func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv
}
}()
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &clusterResourceBinding.Spec, &clusterResourceBinding.Status,
&core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
var noClusterFit *framework.FitError
// in case of no cluster fit, can not return but continue to patch(cleanup) the result.
if err != nil && !errors.As(err, &noClusterFit) {
klog.V(2).Infof("Failed scheduling ClusterResourceBinding %s: %v", clusterResourceBinding.Name, err)
if crb.Spec.Placement.ClusterAffinities != nil {
return s.scheduleClusterResourceBindingWithClusterAffinities(crb)
}
return s.scheduleClusterResourceBindingWithClusterAffinity(crb)
}
func (s *Scheduler) scheduleClusterResourceBindingWithClusterAffinity(crb *workv1alpha2.ClusterResourceBinding) error {
klog.V(4).InfoS("Begin scheduling clusterResourceBinding with ClusterAffinity", "clusterResourceBinding", klog.KObj(crb))
defer klog.V(4).InfoS("End scheduling clusterResourceBinding with ClusterAffinity", "clusterResourceBinding", klog.KObj(crb))
placementBytes, err := json.Marshal(*crb.Spec.Placement)
if err != nil {
klog.V(4).ErrorS(err, "Failed to marshal binding placement", "clusterResourceBinding", klog.KObj(crb))
return err
}
klog.V(4).Infof("ClusterResourceBinding %s scheduled to clusters %v", clusterResourceBinding.Name, scheduleResult.SuggestedClusters)
scheduleErr := s.patchScheduleResultForClusterResourceBinding(clusterResourceBinding, placementStr, scheduleResult.SuggestedClusters)
return utilerrors.NewAggregate([]error{err, scheduleErr})
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &crb.Spec, &crb.Status, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
var noClusterFit *framework.FitError
// In case no cluster fits, do not return immediately; continue to patch (clean up) the result.
if err != nil && !errors.As(err, &noClusterFit) {
s.recordScheduleResultEventForClusterResourceBinding(crb, err)
klog.Errorf("Failed scheduling clusterResourceBinding(%s): %v", crb.Name, err)
return err
}
klog.V(4).Infof("clusterResourceBinding(%s) scheduled to clusters %v", crb.Name, scheduleResult.SuggestedClusters)
patchErr := s.patchScheduleResultForClusterResourceBinding(crb, string(placementBytes), scheduleResult.SuggestedClusters)
s.recordScheduleResultEventForClusterResourceBinding(crb, utilerrors.NewAggregate([]error{err, patchErr}))
// Only the patch result matters here; the FitError, if any, has already
// been recorded as an event.
return patchErr
}
func (s *Scheduler) scheduleClusterResourceBindingWithClusterAffinities(crb *workv1alpha2.ClusterResourceBinding) error {
klog.V(4).InfoS("Begin scheduling clusterResourceBinding with ClusterAffinities", "clusterResourceBinding", klog.KObj(crb))
defer klog.V(4).InfoS("End scheduling clusterResourceBinding with ClusterAffinities", "clusterResourceBinding", klog.KObj(crb))
placementBytes, err := json.Marshal(*crb.Spec.Placement)
if err != nil {
klog.V(4).ErrorS(err, "Failed to marshal binding placement", "clusterResourceBinding", klog.KObj(crb))
return err
}
var (
scheduleResult core.ScheduleResult
firstErr error
)
affinityIndex := getAffinityIndex(crb.Spec.Placement.ClusterAffinities, crb.Status.SchedulerObservedAffinityName)
updatedStatus := crb.Status.DeepCopy()
for affinityIndex < len(crb.Spec.Placement.ClusterAffinities) {
klog.V(4).Infof("Schedule ClusterResourceBinding(%s) with clusterAffiliates index(%d)", crb.Name, affinityIndex)
updatedStatus.SchedulerObservedAffinityName = crb.Spec.Placement.ClusterAffinities[affinityIndex].AffinityName
scheduleResult, err = s.Algorithm.Schedule(context.TODO(), &crb.Spec, updatedStatus, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
if err == nil {
break
}
// retain the error from the first scheduling attempt
if firstErr == nil {
firstErr = err
}
err = fmt.Errorf("failed to schedule ClusterResourceBinding(%s) with clusterAffiliates index(%d): %v", crb.Name, affinityIndex, err)
klog.Error(err)
s.recordScheduleResultEventForClusterResourceBinding(crb, err)
affinityIndex++
}
if affinityIndex >= len(crb.Spec.Placement.ClusterAffinities) {
klog.Errorf("Failed to schedule ClusterResourceBinding(%s) with all ClusterAffinities.", crb.Name)
updatedStatus.SchedulerObservedAffinityName = crb.Status.SchedulerObservedAffinityName
var noClusterFit *framework.FitError
if !errors.As(firstErr, &noClusterFit) {
return firstErr
}
klog.V(4).Infof("ClusterResourceBinding(%s) scheduled to clusters %v", crb.Name, nil)
patchErr := s.patchScheduleResultForClusterResourceBinding(crb, string(placementBytes), nil)
s.recordScheduleResultEventForClusterResourceBinding(crb, patchErr)
return patchErr
}
klog.V(4).Infof("ClusterResourceBinding(%s) scheduled to clusters %v", crb.Name, scheduleResult.SuggestedClusters)
patchErr := s.patchScheduleResultForClusterResourceBinding(crb, string(placementBytes), scheduleResult.SuggestedClusters)
patchStatusErr := patchClusterBindingStatusWithAffinityName(s.KarmadaClient, crb, updatedStatus.SchedulerObservedAffinityName)
scheduleErr := utilerrors.NewAggregate([]error{patchErr, patchStatusErr})
s.recordScheduleResultEventForClusterResourceBinding(crb, scheduleErr)
return scheduleErr
}
func (s *Scheduler) patchScheduleResultForClusterResourceBinding(oldBinding *workv1alpha2.ClusterResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {
@@ -597,33 +716,43 @@ func (s *Scheduler) establishEstimatorConnections() {
}
}
// patchBindingScheduleStatus patches schedule status of ResourceBinding when necessary.
func (s *Scheduler) patchBindingScheduleStatus(rb *workv1alpha2.ResourceBinding, newScheduledCondition metav1.Condition) error {
if rb == nil {
return nil
}
// patchBindingStatusCondition patches schedule status condition of ResourceBinding when necessary.
func patchBindingStatusCondition(karmadaClient karmadaclientset.Interface, rb *workv1alpha2.ResourceBinding, newScheduledCondition metav1.Condition) error {
klog.V(4).Infof("Begin to patch status condition to ResourceBinding(%s/%s)", rb.Namespace, rb.Name)
modifiedObj := rb.DeepCopy()
meta.SetStatusCondition(&modifiedObj.Status.Conditions, newScheduledCondition)
updateRB := rb.DeepCopy()
meta.SetStatusCondition(&updateRB.Status.Conditions, newScheduledCondition)
// Postpone setting the observed generation until the schedule succeeds, assuming the
// scheduler will retry and eventually succeed.
if newScheduledCondition.Status == metav1.ConditionTrue {
modifiedObj.Status.SchedulerObservedGeneration = modifiedObj.Generation
updateRB.Status.SchedulerObservedGeneration = rb.Generation
}
return patchBindingStatus(karmadaClient, rb, updateRB)
}
// patchBindingStatusWithAffinityName patches schedule status with affinityName of ResourceBinding when necessary.
func patchBindingStatusWithAffinityName(karmadaClient karmadaclientset.Interface, rb *workv1alpha2.ResourceBinding, affinityName string) error {
klog.V(4).Infof("Begin to patch status with affinityName(%s) to ResourceBinding(%s/%s).", affinityName, rb.Namespace, rb.Name)
updateRB := rb.DeepCopy()
updateRB.Status.SchedulerObservedAffinityName = affinityName
return patchBindingStatus(karmadaClient, rb, updateRB)
}
func patchBindingStatus(karmadaClient karmadaclientset.Interface, rb, updateRB *workv1alpha2.ResourceBinding) error {
// Short path, ignore patch if no change.
if reflect.DeepEqual(rb.Status, modifiedObj.Status) {
if reflect.DeepEqual(rb.Status, updateRB.Status) {
return nil
}
patchBytes, err := helper.GenMergePatch(rb, modifiedObj)
patchBytes, err := helper.GenMergePatch(rb, updateRB)
if err != nil {
return fmt.Errorf("failed to create a merge patch: %v", err)
}
_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).Patch(context.TODO(), rb.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
_, err = karmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).Patch(context.TODO(), rb.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
klog.Errorf("Failed to patch schedule status to ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err)
klog.Errorf("Failed to patch schedule status ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err)
return err
}
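
patchBindingStatus (like its cluster-scoped twin below) sends only the difference between the cached object and the modified copy as a JSON merge patch against the status subresource; this is essentially what helper.GenMergePatch above produces. A minimal illustration of the patch produced when only the observed affinity name changes, using the evanphx/json-patch library directly and an illustrative status struct:

package main

import (
	"encoding/json"
	"fmt"

	jsonpatch "github.com/evanphx/json-patch"
)

// bindingStatus is an illustrative stand-in for the binding's status type.
type bindingStatus struct {
	SchedulerObservedAffinityName string `json:"schedulerObservedAffinityName,omitempty"`
	SchedulerObservedGeneration   int64  `json:"schedulerObservedGeneration,omitempty"`
}

func main() {
	oldStatus, _ := json.Marshal(bindingStatus{SchedulerObservedGeneration: 1})
	newStatus, _ := json.Marshal(bindingStatus{
		SchedulerObservedGeneration:   1,
		SchedulerObservedAffinityName: "group1",
	})
	// Only the changed field shows up in the merge patch body.
	patch, err := jsonpatch.CreateMergePatch(oldStatus, newStatus)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patch)) // {"schedulerObservedAffinityName":"group1"}
}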
@@ -631,31 +760,40 @@ func (s *Scheduler) patchBindingScheduleStatus(rb *workv1alpha2.ResourceBinding,
return nil
}
// patchClusterBindingScheduleStatus patches schedule status of ClusterResourceBinding when necessary
func (s *Scheduler) patchClusterBindingScheduleStatus(crb *workv1alpha2.ClusterResourceBinding, newScheduledCondition metav1.Condition) error {
if crb == nil {
return nil
}
// patchClusterBindingStatusCondition patches schedule status condition of ClusterResourceBinding when necessary
func patchClusterBindingStatusCondition(karmadaClient karmadaclientset.Interface, crb *workv1alpha2.ClusterResourceBinding, newScheduledCondition metav1.Condition) error {
klog.V(4).Infof("Begin to patch status condition to ClusterResourceBinding(%s)", crb.Name)
modifiedObj := crb.DeepCopy()
meta.SetStatusCondition(&modifiedObj.Status.Conditions, newScheduledCondition)
updateCRB := crb.DeepCopy()
meta.SetStatusCondition(&updateCRB.Status.Conditions, newScheduledCondition)
// Postpone setting the observed generation until the schedule succeeds, assuming the
// scheduler will retry and eventually succeed.
if newScheduledCondition.Status == metav1.ConditionTrue {
modifiedObj.Status.SchedulerObservedGeneration = modifiedObj.Generation
updateCRB.Status.SchedulerObservedGeneration = crb.Generation
}
return patchClusterResourceBindingStatus(karmadaClient, crb, updateCRB)
}
// patchClusterBindingStatusWithAffinityName patches schedule status with affinityName of ClusterResourceBinding when necessary.
func patchClusterBindingStatusWithAffinityName(karmadaClient karmadaclientset.Interface, crb *workv1alpha2.ClusterResourceBinding, affinityName string) error {
klog.V(4).Infof("Begin to patch status with affinityName(%s) to ClusterResourceBinding(%s).", affinityName, crb.Name)
updateCRB := crb.DeepCopy()
updateCRB.Status.SchedulerObservedAffinityName = affinityName
return patchClusterResourceBindingStatus(karmadaClient, crb, updateCRB)
}
func patchClusterResourceBindingStatus(karmadaClient karmadaclientset.Interface, crb, updateCRB *workv1alpha2.ClusterResourceBinding) error {
// Short path, ignore patch if no change.
if reflect.DeepEqual(crb.Status, modifiedObj.Status) {
if reflect.DeepEqual(crb.Status, updateCRB.Status) {
return nil
}
patchBytes, err := helper.GenMergePatch(crb, modifiedObj)
patchBytes, err := helper.GenMergePatch(crb, updateCRB)
if err != nil {
return fmt.Errorf("failed to create a merge patch: %v", err)
}
_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Patch(context.TODO(), crb.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
_, err = karmadaClient.WorkV1alpha2().ClusterResourceBindings().Patch(context.TODO(), crb.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
klog.Errorf("Failed to patch schedule status to ClusterResourceBinding(%s): %v", crb.Name, err)
return err

View File

@@ -70,7 +70,7 @@ func TestCreateScheduler(t *testing.T) {
}
}
func Test_patchBindingScheduleStatus(t *testing.T) {
func Test_patchBindingStatusCondition(t *testing.T) {
oneHourBefore := time.Now().Add(-1 * time.Hour).Round(time.Second)
oneHourAfter := time.Now().Add(1 * time.Hour).Round(time.Second)
@@ -80,14 +80,7 @@ func Test_patchBindingScheduleStatus(t *testing.T) {
successCondition.LastTransitionTime = metav1.Time{Time: oneHourBefore}
failureCondition.LastTransitionTime = metav1.Time{Time: oneHourAfter}
dynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme())
karmadaClient := karmadafake.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
scheduler, err := NewScheduler(dynamicClient, karmadaClient, kubeClient)
if err != nil {
t.Error(err)
}
tests := []struct {
name string
@@ -159,7 +152,7 @@ func Test_patchBindingScheduleStatus(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = scheduler.patchBindingScheduleStatus(test.binding, test.newScheduledCondition)
err = patchBindingStatusCondition(karmadaClient, test.binding, test.newScheduledCondition)
if err != nil {
t.Error(err)
}
@@ -174,7 +167,53 @@ func Test_patchBindingScheduleStatus(t *testing.T) {
}
}
func Test_patchClusterBindingScheduleStatus(t *testing.T) {
func Test_patchBindingStatusWithAffinityName(t *testing.T) {
karmadaClient := karmadafake.NewSimpleClientset()
tests := []struct {
name string
binding *workv1alpha2.ResourceBinding
affinityName string
expected *workv1alpha2.ResourceBinding
}{
{
name: "add affinityName in status",
binding: &workv1alpha2.ResourceBinding{
ObjectMeta: metav1.ObjectMeta{Name: "rb-1", Namespace: "default", Generation: 1},
Spec: workv1alpha2.ResourceBindingSpec{},
Status: workv1alpha2.ResourceBindingStatus{},
},
affinityName: "group1",
expected: &workv1alpha2.ResourceBinding{
ObjectMeta: metav1.ObjectMeta{Name: "rb-1", Namespace: "default", Generation: 1},
Spec: workv1alpha2.ResourceBindingSpec{},
Status: workv1alpha2.ResourceBindingStatus{SchedulerObservedAffinityName: "group1"},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, err := karmadaClient.WorkV1alpha2().ResourceBindings(test.binding.Namespace).Create(context.TODO(), test.binding, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
err = patchBindingStatusWithAffinityName(karmadaClient, test.binding, test.affinityName)
if err != nil {
t.Error(err)
}
res, err := karmadaClient.WorkV1alpha2().ResourceBindings(test.binding.Namespace).Get(context.TODO(), test.binding.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(res.Status, test.expected.Status) {
t.Errorf("expected status: %v, but got: %v", test.expected.Status, res.Status)
}
})
}
}
func Test_patchClusterBindingStatusCondition(t *testing.T) {
oneHourBefore := time.Now().Add(-1 * time.Hour).Round(time.Second)
oneHourAfter := time.Now().Add(1 * time.Hour).Round(time.Second)
@@ -184,14 +223,7 @@ func Test_patchClusterBindingScheduleStatus(t *testing.T) {
successCondition.LastTransitionTime = metav1.Time{Time: oneHourBefore}
failureCondition.LastTransitionTime = metav1.Time{Time: oneHourAfter}
dynamicClient := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme())
karmadaClient := karmadafake.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()
scheduler, err := NewScheduler(dynamicClient, karmadaClient, kubeClient)
if err != nil {
t.Error(err)
}
tests := []struct {
name string
@@ -263,7 +295,60 @@ func Test_patchClusterBindingScheduleStatus(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = scheduler.patchClusterBindingScheduleStatus(test.binding, test.newScheduledCondition)
err = patchClusterBindingStatusCondition(karmadaClient, test.binding, test.newScheduledCondition)
if err != nil {
t.Error(err)
}
res, err := karmadaClient.WorkV1alpha2().ClusterResourceBindings().Get(context.TODO(), test.binding.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(res.Status, test.expected.Status) {
t.Errorf("expected status: %v, but got: %v", test.expected.Status, res.Status)
}
})
}
}
func Test_patchClusterBindingStatusWithAffinityName(t *testing.T) {
karmadaClient := karmadafake.NewSimpleClientset()
tests := []struct {
name string
binding *workv1alpha2.ClusterResourceBinding
affinityName string
expected *workv1alpha2.ClusterResourceBinding
}{
{
name: "add affinityName in status",
binding: &workv1alpha2.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{Name: "crb-1", Generation: 1},
Spec: workv1alpha2.ResourceBindingSpec{},
Status: workv1alpha2.ResourceBindingStatus{
Conditions: []metav1.Condition{util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)},
SchedulerObservedGeneration: 1,
},
},
affinityName: "group1",
expected: &workv1alpha2.ClusterResourceBinding{
ObjectMeta: metav1.ObjectMeta{Name: "crb-1"},
Spec: workv1alpha2.ResourceBindingSpec{},
Status: workv1alpha2.ResourceBindingStatus{
SchedulerObservedAffinityName: "group1",
Conditions: []metav1.Condition{util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)},
SchedulerObservedGeneration: 1,
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, err := karmadaClient.WorkV1alpha2().ClusterResourceBindings().Create(context.TODO(), test.binding, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
err = patchClusterBindingStatusWithAffinityName(karmadaClient, test.binding, test.affinityName)
if err != nil {
t.Error(err)
}