|
|
|
@ -16,6 +16,7 @@ import (
|
|
|
|
|
"k8s.io/client-go/dynamic"
|
|
|
|
|
"k8s.io/client-go/kubernetes"
|
|
|
|
|
"k8s.io/client-go/tools/cache"
|
|
|
|
|
"k8s.io/client-go/util/retry"
|
|
|
|
|
"k8s.io/client-go/util/workqueue"
|
|
|
|
|
"k8s.io/klog/v2"
|
|
|
|
|
|
|
|
|
@ -37,16 +38,12 @@ import (
|
|
|
|
|
"github.com/karmada-io/karmada/pkg/scheduler/framework/plugins/tainttoleration"
|
|
|
|
|
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
|
|
|
|
|
"github.com/karmada-io/karmada/pkg/util"
|
|
|
|
|
"github.com/karmada-io/karmada/pkg/util/helper"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// ScheduleType defines the schedule type of a binding object should be performed.
|
|
|
|
|
type ScheduleType string
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
// FirstSchedule means the binding object hasn't been scheduled.
|
|
|
|
|
FirstSchedule ScheduleType = "FirstSchedule"
|
|
|
|
|
|
|
|
|
|
// ReconcileSchedule means the binding object associated policy has been changed.
|
|
|
|
|
ReconcileSchedule ScheduleType = "ReconcileSchedule"
|
|
|
|
|
|
|
|
|
@ -58,8 +55,8 @@ const (
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
scheduleSuccessReason = "BindingScheduled"
|
|
|
|
|
|
|
|
|
|
scheduleSuccessReason = "BindingScheduled"
|
|
|
|
|
scheduleFailedReason = "BindingFailedScheduling"
|
|
|
|
|
scheduleSuccessMessage = "the binding has been scheduled"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
@ -398,22 +395,32 @@ func (s *Scheduler) doScheduleBinding(namespace, name string) error {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update "Scheduled" condition according to schedule result.
|
|
|
|
|
defer func() {
|
|
|
|
|
var condition metav1.Condition
|
|
|
|
|
if err == nil {
|
|
|
|
|
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)
|
|
|
|
|
} else {
|
|
|
|
|
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleFailedReason, err.Error(), metav1.ConditionFalse)
|
|
|
|
|
}
|
|
|
|
|
if updateErr := s.updateBindingScheduledConditionIfNeeded(rb, condition); updateErr != nil {
|
|
|
|
|
klog.Errorf("Failed update condition(%s) for ResourceBinding(%s/%s)", workv1alpha2.Scheduled, rb.Namespace, rb.Name)
|
|
|
|
|
if err == nil {
|
|
|
|
|
// schedule succeed but update condition failed, return err in order to retry in next loop.
|
|
|
|
|
err = updateErr
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
start := time.Now()
|
|
|
|
|
if !helper.IsBindingReady(&rb.Status) {
|
|
|
|
|
// the binding has not been scheduled, need schedule
|
|
|
|
|
klog.Infof("Start scheduling ResourceBinding(%s/%s)", namespace, name)
|
|
|
|
|
err = s.scheduleResourceBinding(rb)
|
|
|
|
|
metrics.BindingSchedule(string(FirstSchedule), metrics.SinceInSeconds(start), err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
policyPlacement, policyPlacementStr, err := s.getPlacement(rb)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
appliedPlacement := util.GetLabelValue(rb.Annotations, util.PolicyPlacementAnnotation)
|
|
|
|
|
if policyPlacementStr != appliedPlacement {
|
|
|
|
|
// policy placement changed, need reschedule
|
|
|
|
|
klog.Infof("Reschedule ResourceBinding(%s/%s) as placement changed", namespace, name)
|
|
|
|
|
// policy placement changed, need schedule
|
|
|
|
|
klog.Infof("Start to schedule ResourceBinding(%s/%s) as placement changed", namespace, name)
|
|
|
|
|
err = s.scheduleResourceBinding(rb)
|
|
|
|
|
metrics.BindingSchedule(string(ReconcileSchedule), metrics.SinceInSeconds(start), err)
|
|
|
|
|
return err
|
|
|
|
@ -449,22 +456,32 @@ func (s *Scheduler) doScheduleClusterBinding(name string) error {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update "Scheduled" condition according to schedule result.
|
|
|
|
|
defer func() {
|
|
|
|
|
var condition metav1.Condition
|
|
|
|
|
if err == nil {
|
|
|
|
|
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)
|
|
|
|
|
} else {
|
|
|
|
|
condition = util.NewCondition(workv1alpha2.Scheduled, scheduleFailedReason, err.Error(), metav1.ConditionFalse)
|
|
|
|
|
}
|
|
|
|
|
if updateErr := s.updateClusterBindingScheduledConditionIfNeeded(crb, condition); updateErr != nil {
|
|
|
|
|
klog.Errorf("Failed update condition(%s) for ClusterResourceBinding(%s)", workv1alpha2.Scheduled, crb.Name)
|
|
|
|
|
if err == nil {
|
|
|
|
|
// schedule succeed but update condition failed, return err in order to retry in next loop.
|
|
|
|
|
err = updateErr
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
start := time.Now()
|
|
|
|
|
if !helper.IsBindingReady(&crb.Status) {
|
|
|
|
|
// the binding has not been scheduled, need schedule
|
|
|
|
|
klog.Infof("Start scheduling ClusterResourceBinding(%s)", name)
|
|
|
|
|
err = s.scheduleClusterResourceBinding(crb)
|
|
|
|
|
metrics.BindingSchedule(string(FirstSchedule), metrics.SinceInSeconds(start), err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
policyPlacement, policyPlacementStr, err := s.getClusterPlacement(crb)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
appliedPlacement := util.GetLabelValue(crb.Annotations, util.PolicyPlacementAnnotation)
|
|
|
|
|
if policyPlacementStr != appliedPlacement {
|
|
|
|
|
// policy placement changed, need reschedule
|
|
|
|
|
klog.Infof("Reschedule ClusterResourceBinding(%s) as placement changed", name)
|
|
|
|
|
// policy placement changed, need schedule
|
|
|
|
|
klog.Infof("Start to schedule ClusterResourceBinding(%s) as placement changed", name)
|
|
|
|
|
err = s.scheduleClusterResourceBinding(crb)
|
|
|
|
|
metrics.BindingSchedule(string(ReconcileSchedule), metrics.SinceInSeconds(start), err)
|
|
|
|
|
return err
|
|
|
|
@ -513,11 +530,12 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.Resour
|
|
|
|
|
}
|
|
|
|
|
binding.Annotations[util.PolicyPlacementAnnotation] = placementStr
|
|
|
|
|
|
|
|
|
|
binding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
|
|
|
|
|
_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return s.updateBindingStatusIfNeeded(binding)
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding) (err error) {
|
|
|
|
@ -550,11 +568,12 @@ func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv
|
|
|
|
|
}
|
|
|
|
|
binding.Annotations[util.PolicyPlacementAnnotation] = string(placement)
|
|
|
|
|
|
|
|
|
|
binding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
|
|
|
|
|
_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return s.updateClusterBindingStatusIfNeeded(binding)
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Scheduler) handleErr(err error, key interface{}) {
|
|
|
|
@ -690,11 +709,12 @@ func (s *Scheduler) rescheduleClusterResourceBinding(clusterResourceBinding *wor
|
|
|
|
|
clusterResourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
|
|
|
|
|
klog.Infof("The final binding.Spec.Cluster values are: %v\n", clusterResourceBinding.Spec.Clusters)
|
|
|
|
|
|
|
|
|
|
clusterResourceBinding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{})
|
|
|
|
|
_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return s.updateClusterBindingStatusIfNeeded(clusterResourceBinding)
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha2.ResourceBinding) error {
|
|
|
|
@ -717,11 +737,12 @@ func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha2.Reso
|
|
|
|
|
resourceBinding.Spec.Clusters = reScheduleResult.SuggestedClusters
|
|
|
|
|
klog.Infof("The final binding.Spec.Cluster values are: %v\n", resourceBinding.Spec.Clusters)
|
|
|
|
|
|
|
|
|
|
resourceBinding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(resourceBinding.Namespace).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{})
|
|
|
|
|
_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(resourceBinding.Namespace).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return s.updateBindingStatusIfNeeded(resourceBinding)
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Scheduler) scaleScheduleResourceBinding(resourceBinding *workv1alpha2.ResourceBinding) (err error) {
|
|
|
|
@ -748,11 +769,12 @@ func (s *Scheduler) scaleScheduleResourceBinding(resourceBinding *workv1alpha2.R
|
|
|
|
|
}
|
|
|
|
|
binding.Annotations[util.PolicyPlacementAnnotation] = placementStr
|
|
|
|
|
|
|
|
|
|
binding, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
|
|
|
|
|
_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return s.updateBindingStatusIfNeeded(binding)
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Scheduler) scaleScheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding) (err error) {
|
|
|
|
@ -786,11 +808,12 @@ func (s *Scheduler) scaleScheduleClusterResourceBinding(clusterResourceBinding *
|
|
|
|
|
}
|
|
|
|
|
binding.Annotations[util.PolicyPlacementAnnotation] = string(placement)
|
|
|
|
|
|
|
|
|
|
binding, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
|
|
|
|
|
_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return s.updateClusterBindingStatusIfNeeded(binding)
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Scheduler) allClustersInReadyState(tcs []workv1alpha2.TargetCluster) bool {
|
|
|
|
@ -838,46 +861,66 @@ func (s *Scheduler) establishEstimatorConnections() {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// updateBindingStatusIfNeeded sets the scheduled condition of ResourceBinding to true if needed
|
|
|
|
|
func (s *Scheduler) updateBindingStatusIfNeeded(rb *workv1alpha2.ResourceBinding) error {
|
|
|
|
|
// updateBindingScheduledConditionIfNeeded sets the scheduled condition of ResourceBinding if needed
|
|
|
|
|
func (s *Scheduler) updateBindingScheduledConditionIfNeeded(rb *workv1alpha2.ResourceBinding, newScheduledCondition metav1.Condition) error {
|
|
|
|
|
if rb == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
oldScheduledCondition := meta.FindStatusCondition(rb.Status.Conditions, workv1alpha2.Scheduled)
|
|
|
|
|
newScheduledCondition := metav1.Condition{
|
|
|
|
|
Type: workv1alpha2.Scheduled,
|
|
|
|
|
Status: metav1.ConditionTrue,
|
|
|
|
|
Reason: scheduleSuccessReason,
|
|
|
|
|
Message: scheduleSuccessMessage,
|
|
|
|
|
if oldScheduledCondition != nil {
|
|
|
|
|
if util.IsConditionsEqual(newScheduledCondition, *oldScheduledCondition) {
|
|
|
|
|
klog.V(4).Infof("No need to update scheduled condition")
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if equality.Semantic.DeepEqual(oldScheduledCondition, newScheduledCondition) {
|
|
|
|
|
|
|
|
|
|
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
|
|
|
|
meta.SetStatusCondition(&rb.Status.Conditions, newScheduledCondition)
|
|
|
|
|
_, updateErr := s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).UpdateStatus(context.TODO(), rb, metav1.UpdateOptions{})
|
|
|
|
|
if updateErr == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if updated, err := s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).Get(context.TODO(), rb.Name, metav1.GetOptions{}); err == nil {
|
|
|
|
|
// make a copy so we don't mutate the shared cache
|
|
|
|
|
rb = updated.DeepCopy()
|
|
|
|
|
} else {
|
|
|
|
|
klog.Errorf("failed to get updated resource binding %s/%s: %v", rb.Namespace, rb.Name, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return updateErr
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// updateClusterBindingScheduledConditionIfNeeded sets the scheduled condition of ClusterResourceBinding if needed
|
|
|
|
|
func (s *Scheduler) updateClusterBindingScheduledConditionIfNeeded(crb *workv1alpha2.ClusterResourceBinding, newScheduledCondition metav1.Condition) error {
|
|
|
|
|
if crb == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
meta.SetStatusCondition(&rb.Status.Conditions, newScheduledCondition)
|
|
|
|
|
_, err := s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).UpdateStatus(context.TODO(), rb, metav1.UpdateOptions{})
|
|
|
|
|
if err != nil {
|
|
|
|
|
klog.Errorf("Failed to update ResourceBinding status(%s/%s): %v", rb.Namespace, rb.Name, err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// updateClusterBindingStatusIfNeeded sets the scheduled condition of ClusterResourceBinding to true if needed
|
|
|
|
|
func (s *Scheduler) updateClusterBindingStatusIfNeeded(crb *workv1alpha2.ClusterResourceBinding) error {
|
|
|
|
|
oldScheduledCondition := meta.FindStatusCondition(crb.Status.Conditions, workv1alpha2.Scheduled)
|
|
|
|
|
newScheduledCondition := metav1.Condition{
|
|
|
|
|
Type: workv1alpha2.Scheduled,
|
|
|
|
|
Status: metav1.ConditionTrue,
|
|
|
|
|
Reason: scheduleSuccessReason,
|
|
|
|
|
Message: scheduleSuccessMessage,
|
|
|
|
|
}
|
|
|
|
|
if equality.Semantic.DeepEqual(oldScheduledCondition, newScheduledCondition) {
|
|
|
|
|
return nil
|
|
|
|
|
if oldScheduledCondition != nil {
|
|
|
|
|
if util.IsConditionsEqual(newScheduledCondition, *oldScheduledCondition) {
|
|
|
|
|
klog.V(4).Infof("No need to update scheduled condition")
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
meta.SetStatusCondition(&crb.Status.Conditions, newScheduledCondition)
|
|
|
|
|
_, err := s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().UpdateStatus(context.TODO(), crb, metav1.UpdateOptions{})
|
|
|
|
|
if err != nil {
|
|
|
|
|
klog.Errorf("Failed to update ClusterResourceBinding status(%s): %v", crb.Name, err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
|
|
|
|
meta.SetStatusCondition(&crb.Status.Conditions, newScheduledCondition)
|
|
|
|
|
_, updateErr := s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().UpdateStatus(context.TODO(), crb, metav1.UpdateOptions{})
|
|
|
|
|
if updateErr == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if updated, err := s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Get(context.TODO(), crb.Name, metav1.GetOptions{}); err == nil {
|
|
|
|
|
// make a copy so we don't mutate the shared cache
|
|
|
|
|
crb = updated.DeepCopy()
|
|
|
|
|
} else {
|
|
|
|
|
klog.Errorf("failed to get updated cluster resource binding %s: %v", crb.Name, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return updateErr
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|