Modify some log details of scheduler. (#263)

Signed-off-by: mabotao <1397247577@qq.com>
Authored by tinyma123 on 2021-04-13 16:25:24 +08:00; committed by GitHub
parent e4992e46fc
commit d4d63038ed
1 changed file with 119 additions and 35 deletions


@@ -288,9 +288,9 @@ func (s *Scheduler) getScheduleType(key string) ScheduleType {
 		clusters := s.schedulerCache.Snapshot().GetClusters()
 		for _, tc := range resourceBinding.Spec.Clusters {
-			bindedCluster := tc.Name
+			boundCluster := tc.Name
 			for _, c := range clusters {
-				if c.Cluster().Name == bindedCluster {
+				if c.Cluster().Name == boundCluster {
 					if meta.IsStatusConditionPresentAndEqual(c.Cluster().Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
 						return FailoverSchedule
 					}
@@ -328,9 +328,9 @@ func (s *Scheduler) getScheduleType(key string) ScheduleType {
 		clusters := s.schedulerCache.Snapshot().GetClusters()
 		for _, tc := range binding.Spec.Clusters {
-			bindedCluster := tc.Name
+			boundCluster := tc.Name
 			for _, c := range clusters {
-				if c.Cluster().Name == bindedCluster {
+				if c.Cluster().Name == boundCluster {
 					if meta.IsStatusConditionPresentAndEqual(c.Cluster().Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
 						return FailoverSchedule
 					}
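
Note on the check above: the scheduler only takes the FailoverSchedule path when a bound cluster's Ready condition is present and explicitly False. A minimal sketch of how meta.IsStatusConditionPresentAndEqual behaves (not part of this change; the condition type is shown as a plain "Ready" string for illustration):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A cluster whose Ready condition has been set to False.
	notReady := []metav1.Condition{
		{Type: "Ready", Status: metav1.ConditionFalse, Reason: "ClusterNotReachable"},
	}

	// true: the condition is present and explicitly False -> failover case.
	fmt.Println(meta.IsStatusConditionPresentAndEqual(notReady, "Ready", metav1.ConditionFalse))

	// false: a cluster with no Ready condition at all does not trigger failover.
	fmt.Println(meta.IsStatusConditionPresentAndEqual(nil, "Ready", metav1.ConditionFalse))
}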
@ -518,7 +518,8 @@ func (s *Scheduler) updateCluster(_, newObj interface{}) {
klog.Infof("Found cluster(%s) failure and failover flag is %v", newCluster.Name, Failover) klog.Infof("Found cluster(%s) failure and failover flag is %v", newCluster.Name, Failover)
if Failover { // Trigger reschedule on cluster failure only when flag is true. if Failover { // Trigger reschedule on cluster failure only when flag is true.
s.expiredBindingInQueue(newCluster.Name) s.enqueueAffectedBinding(newCluster.Name)
s.enqueueAffectedClusterBinding(newCluster.Name)
return return
} }
} }
@@ -544,8 +545,8 @@ func (s *Scheduler) deleteCluster(obj interface{}) {
 	s.schedulerCache.DeleteCluster(cluster)
 }
 
-// expiredBindingInQueue will find all ResourceBindings which are related to the current NotReady cluster and add them in queue.
-func (s *Scheduler) expiredBindingInQueue(notReadyClusterName string) {
+// enqueueAffectedBinding will find all ResourceBindings which are related to the current NotReady cluster and add them in queue.
+func (s *Scheduler) enqueueAffectedBinding(notReadyClusterName string) {
 	bindings, _ := s.bindingLister.List(labels.Everything())
 	klog.Infof("Start traveling all ResourceBindings")
 	for _, binding := range bindings {
@@ -564,18 +565,39 @@ func (s *Scheduler) expiredBindingInQueue(notReadyClusterName string) {
 	}
 }
 
-func (s Scheduler) failoverCandidateCluster(binding *workv1alpha1.ResourceBinding) (reserved sets.String, candidates sets.String) {
-	bindedCluster := sets.NewString()
-	for _, cluster := range binding.Spec.Clusters {
-		bindedCluster.Insert(cluster.Name)
-	}
-
-	availableCluster := sets.NewString()
-	for _, cluster := range s.schedulerCache.Snapshot().GetReadyClusters() {
-		availableCluster.Insert(cluster.Cluster().Name)
-	}
-	return bindedCluster.Difference(bindedCluster.Difference(availableCluster)), availableCluster.Difference(bindedCluster)
-}
+// enqueueAffectedClusterBinding will find all cluster resource bindings which are related to the current NotReady cluster and add them in queue.
+func (s *Scheduler) enqueueAffectedClusterBinding(notReadyClusterName string) {
+	bindings, _ := s.clusterBindingLister.List(labels.Everything())
+	klog.Infof("Start traveling all ClusterResourceBindings")
+	for _, binding := range bindings {
+		clusters := binding.Spec.Clusters
+		for _, bindingCluster := range clusters {
+			if bindingCluster.Name == notReadyClusterName {
+				rescheduleKey, err := cache.MetaNamespaceKeyFunc(binding)
+				if err != nil {
+					klog.Errorf("couldn't get rescheduleKey for ClusterResourceBinding %s: %v", bindingCluster.Name, err)
+					return
+				}
+				s.queue.Add(rescheduleKey)
+				klog.Infof("Add expired ClusterResourceBinding in queue successfully")
+			}
+		}
+	}
+}
+
+// getReservedAndCandidates obtains the target clusters in the binding information, returns the reserved clusters and candidate clusters
+func (s Scheduler) getReservedAndCandidates(clusters []workv1alpha1.TargetCluster) (reserved sets.String, candidates sets.String) {
+	boundClusters := sets.NewString()
+	for _, cluster := range clusters {
+		boundClusters.Insert(cluster.Name)
+	}
+
+	availableClusters := sets.NewString()
+	for _, cluster := range s.schedulerCache.Snapshot().GetReadyClusters() {
+		availableClusters.Insert(cluster.Cluster().Name)
+	}
+	return boundClusters.Difference(boundClusters.Difference(availableClusters)), availableClusters.Difference(boundClusters)
+}
 
 // rescheduleOne.
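
The two return expressions in getReservedAndCandidates are plain set algebra: bound − (bound − available) is the intersection of the bound and available clusters (the reserved set), and available − bound is the candidate set. An illustrative sketch with assumed cluster names:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	bound := sets.NewString("member1", "member2", "member3")     // clusters in binding.Spec.Clusters
	available := sets.NewString("member2", "member3", "member4") // ready clusters from the cache snapshot

	// bound - (bound - available) == bound ∩ available: targets that are still healthy.
	reserved := bound.Difference(bound.Difference(available))
	// available - bound: ready clusters not yet used, i.e. failover candidates.
	candidates := available.Difference(bound)

	fmt.Println(reserved.List())   // [member2 member3]
	fmt.Println(candidates.List()) // [member4]
}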
@@ -585,6 +607,21 @@ func (s *Scheduler) rescheduleOne(key string) (err error) {
 		return err
 	}
 
+	// ClusterResourceBinding object
+	if ns == "" {
+		klog.Infof("begin rescheduling ClusterResourceBinding %s", name)
+		defer klog.Infof("end rescheduling ClusterResourceBinding %s", name)
+
+		clusterResourceBinding, err := s.clusterBindingLister.Get(name)
+		if errors.IsNotFound(err) {
+			return nil
+		}
+		crbinding := clusterResourceBinding.DeepCopy()
+		return s.rescheduleClusterResourceBinding(crbinding, name)
+	}
+
+	// ResourceBinding object
+	if len(ns) > 0 {
 	klog.Infof("begin rescheduling ResourceBinding %s %s", ns, name)
 	defer klog.Infof("end rescheduling ResourceBinding %s: %s", ns, name)
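
The ns == "" branch works because workqueue keys built by cache.MetaNamespaceKeyFunc are "<namespace>/<name>" for namespaced ResourceBindings but just "<name>" for cluster-scoped ClusterResourceBindings, so splitting the key yields an empty namespace only in the cluster-scoped case. A small sketch (object names are made up):

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	namespaced := &metav1.ObjectMeta{Namespace: "default", Name: "nginx-binding"}
	clusterScoped := &metav1.ObjectMeta{Name: "crb-sample"}

	k1, _ := cache.MetaNamespaceKeyFunc(namespaced)    // "default/nginx-binding"
	k2, _ := cache.MetaNamespaceKeyFunc(clusterScoped) // "crb-sample"

	ns1, name1, _ := cache.SplitMetaNamespaceKey(k1)
	ns2, name2, _ := cache.SplitMetaNamespaceKey(k2)

	fmt.Println(ns1, name1) // "default nginx-binding" -> rescheduleResourceBinding path
	fmt.Println(ns2, name2) // empty namespace, "crb-sample" -> rescheduleClusterResourceBinding path
}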
@@ -592,35 +629,37 @@ func (s *Scheduler) rescheduleOne(key string) (err error) {
 	if errors.IsNotFound(err) {
 		return nil
 	}
 
 	binding := resourceBinding.DeepCopy()
-	reservedClusters, candidateClusters := s.failoverCandidateCluster(resourceBinding)
-	klog.Infof("Reserved clusters : %v", reservedClusters.List())
-	klog.Infof("Candidate clusters: %v", candidateClusters.List())
-	deltaLen := len(binding.Spec.Clusters) - len(reservedClusters)
-	klog.Infof("binding(%s/%s) has %d failure clusters, and got %d candidates", ns, name, deltaLen, len(candidateClusters))
-	// TODO: should schedule as much as possible?
-	if len(candidateClusters) < deltaLen {
-		klog.Warningf("ignore reschedule binding(%s/%s) as insufficient available cluster", ns, name)
+	return s.rescheduleResourceBinding(binding, ns, name)
+	}
+
 	return nil
 }
 
+func (s *Scheduler) rescheduleClusterResourceBinding(clusterResourceBinding *workv1alpha1.ClusterResourceBinding, name string) (err error) {
+	reservedClusters, candidateClusters := s.getReservedAndCandidates(clusterResourceBinding.Spec.Clusters)
+	klog.Infof("Reserved clusters : %v", reservedClusters.List())
+	klog.Infof("Candidate clusters: %v", candidateClusters.List())
+	deltaLen := len(clusterResourceBinding.Spec.Clusters) - len(reservedClusters)
+	klog.Infof("binding %s has %d failure clusters, and got %d candidates", name, deltaLen, len(candidateClusters))
+	// TODO: should schedule as much as possible?
+	if len(candidateClusters) < deltaLen {
+		klog.Warningf("ignore reschedule binding(%s) as insufficient available cluster", name)
+		return nil
+	}
+
 	targetClusters := reservedClusters
 	for i := 0; i < deltaLen; i++ {
 		for clusterName := range candidateClusters {
 			curCluster, _ := s.clusterLister.Get(clusterName)
-			policyNamespace := util.GetLabelValue(binding.Labels, util.PropagationPolicyNamespaceLabel)
-			policyName := util.GetLabelValue(binding.Labels, util.PropagationPolicyNameLabel)
-			policy, _ := s.policyLister.PropagationPolicies(policyNamespace).Get(policyName)
+			policyName := util.GetLabelValue(clusterResourceBinding.Labels, util.ClusterPropagationPolicyLabel)
+			policy, _ := s.clusterPolicyLister.Get(policyName)
 			if !util.ClusterMatches(curCluster, *policy.Spec.Placement.ClusterAffinity) {
 				continue
 			}
-			klog.Infof("Rescheduling %s/ %s to member cluster %s", binding.Namespace, binding.Name, clusterName)
+			klog.Infof("Rescheduling %s to member cluster %s", clusterResourceBinding.Name, clusterName)
 			targetClusters.Insert(clusterName)
 			candidateClusters.Delete(clusterName)
@@ -628,16 +667,61 @@ func (s *Scheduler) rescheduleOne(key string) (err error) {
 			break
 		}
 	}
 	// TODO(tinyma123) Check if the final result meets the spread constraints.
-	binding.Spec.Clusters = nil
+	clusterResourceBinding.Spec.Clusters = nil
 	for cluster := range targetClusters {
-		binding.Spec.Clusters = append(binding.Spec.Clusters, workv1alpha1.TargetCluster{Name: cluster})
+		clusterResourceBinding.Spec.Clusters = append(clusterResourceBinding.Spec.Clusters, workv1alpha1.TargetCluster{Name: cluster})
 	}
-	klog.Infof("The final binding.Spec.Cluster values are: %v\n", binding.Spec.Clusters)
-	_, err = s.KarmadaClient.WorkV1alpha1().ResourceBindings(ns).Update(context.TODO(), binding, metav1.UpdateOptions{})
+	klog.Infof("The final binding.Spec.Cluster values are: %v\n", clusterResourceBinding.Spec.Clusters)
+	_, err = s.KarmadaClient.WorkV1alpha1().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{})
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha1.ResourceBinding, ns, name string) (err error) {
+	reservedClusters, candidateClusters := s.getReservedAndCandidates(resourceBinding.Spec.Clusters)
+	klog.Infof("Reserved clusters : %v", reservedClusters.List())
+	klog.Infof("Candidate clusters: %v", candidateClusters.List())
+	deltaLen := len(resourceBinding.Spec.Clusters) - len(reservedClusters)
+	klog.Infof("binding(%s/%s) has %d failure clusters, and got %d candidates", ns, name, deltaLen, len(candidateClusters))
+	// TODO: should schedule as much as possible?
+	if len(candidateClusters) < deltaLen {
+		klog.Warningf("ignore reschedule binding(%s/%s) as insufficient available cluster", ns, name)
+		return nil
+	}
+
+	targetClusters := reservedClusters
+	for i := 0; i < deltaLen; i++ {
+		for clusterName := range candidateClusters {
+			curCluster, _ := s.clusterLister.Get(clusterName)
+			policyNamespace := util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNamespaceLabel)
+			policyName := util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNameLabel)
+			policy, _ := s.policyLister.PropagationPolicies(policyNamespace).Get(policyName)
+			if !util.ClusterMatches(curCluster, *policy.Spec.Placement.ClusterAffinity) {
+				continue
+			}
+			klog.Infof("Rescheduling %s/ %s to member cluster %s", resourceBinding.Namespace, resourceBinding.Name, clusterName)
+			targetClusters.Insert(clusterName)
+			candidateClusters.Delete(clusterName)
+			// break as soon as find a result
+			break
+		}
+	}
+	// TODO(tinyma123) Check if the final result meets the spread constraints.
+	resourceBinding.Spec.Clusters = nil
+	for cluster := range targetClusters {
+		resourceBinding.Spec.Clusters = append(resourceBinding.Spec.Clusters, workv1alpha1.TargetCluster{Name: cluster})
+	}
+	klog.Infof("The final binding.Spec.Cluster values are: %v\n", resourceBinding.Spec.Clusters)
+	_, err = s.KarmadaClient.WorkV1alpha1().ResourceBindings(ns).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{})
 	if err != nil {
 		return err
 	}
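
Both reschedule paths filter candidates against the placement policy that produced the binding, but they resolve that policy differently: a ResourceBinding carries the PropagationPolicy namespace/name labels and goes through the namespaced policy lister, while a ClusterResourceBinding carries a single ClusterPropagationPolicy name label and goes through the cluster-scoped lister. A rough sketch of that label lookup (the helper and the label keys are illustrative placeholders, not Karmada's real util.GetLabelValue or constants):

package main

import "fmt"

// getLabelValue stands in for util.GetLabelValue: a nil-safe label read.
func getLabelValue(labels map[string]string, key string) string {
	if labels == nil {
		return ""
	}
	return labels[key]
}

func main() {
	// Placeholder keys; the real ones are util.PropagationPolicyNamespaceLabel,
	// util.PropagationPolicyNameLabel and util.ClusterPropagationPolicyLabel.
	const (
		ppNamespaceLabel = "propagationpolicy.example.io/namespace"
		ppNameLabel      = "propagationpolicy.example.io/name"
		cppNameLabel     = "clusterpropagationpolicy.example.io/name"
	)

	rbLabels := map[string]string{ppNamespaceLabel: "default", ppNameLabel: "nginx-propagation"}
	crbLabels := map[string]string{cppNameLabel: "crds-propagation"}

	// rescheduleResourceBinding: namespaced PropagationPolicy lookup needs both values.
	fmt.Println(getLabelValue(rbLabels, ppNamespaceLabel), getLabelValue(rbLabels, ppNameLabel))
	// rescheduleClusterResourceBinding: cluster-scoped ClusterPropagationPolicy lookup needs only the name.
	fmt.Println(getLabelValue(crbLabels, cppNameLabel))
}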