From d4d63038ed680bf3370ab096cea2976e118f3c0e Mon Sep 17 00:00:00 2001
From: tinyma123 <1397247577@qq.com>
Date: Tue, 13 Apr 2021 16:25:24 +0800
Subject: [PATCH] Modify some log details of scheduler. (#263)

Signed-off-by: mabotao <1397247577@qq.com>
---
 pkg/scheduler/scheduler.go | 154 ++++++++++++++++++++++++++++---------
 1 file changed, 119 insertions(+), 35 deletions(-)

diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index b337eb824..e1ffc7b52 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -288,9 +288,9 @@ func (s *Scheduler) getScheduleType(key string) ScheduleType {
 	clusters := s.schedulerCache.Snapshot().GetClusters()
 
 	for _, tc := range resourceBinding.Spec.Clusters {
-		bindedCluster := tc.Name
+		boundCluster := tc.Name
 		for _, c := range clusters {
-			if c.Cluster().Name == bindedCluster {
+			if c.Cluster().Name == boundCluster {
 				if meta.IsStatusConditionPresentAndEqual(c.Cluster().Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
 					return FailoverSchedule
 				}
@@ -328,9 +328,9 @@ func (s *Scheduler) getScheduleType(key string) ScheduleType {
 	clusters := s.schedulerCache.Snapshot().GetClusters()
 
 	for _, tc := range binding.Spec.Clusters {
-		bindedCluster := tc.Name
+		boundCluster := tc.Name
 		for _, c := range clusters {
-			if c.Cluster().Name == bindedCluster {
+			if c.Cluster().Name == boundCluster {
 				if meta.IsStatusConditionPresentAndEqual(c.Cluster().Status.Conditions, clusterv1alpha1.ClusterConditionReady, metav1.ConditionFalse) {
 					return FailoverSchedule
 				}
@@ -518,7 +518,8 @@ func (s *Scheduler) updateCluster(_, newObj interface{}) {
 		klog.Infof("Found cluster(%s) failure and failover flag is %v", newCluster.Name, Failover)
 
 		if Failover { // Trigger reschedule on cluster failure only when flag is true.
-			s.expiredBindingInQueue(newCluster.Name)
+			s.enqueueAffectedBinding(newCluster.Name)
+			s.enqueueAffectedClusterBinding(newCluster.Name)
 			return
 		}
 	}
@@ -544,8 +545,8 @@ func (s *Scheduler) deleteCluster(obj interface{}) {
 	s.schedulerCache.DeleteCluster(cluster)
 }
 
-// expiredBindingInQueue will find all ResourceBindings which are related to the current NotReady cluster and add them in queue.
-func (s *Scheduler) expiredBindingInQueue(notReadyClusterName string) {
+// enqueueAffectedBinding will find all ResourceBindings related to the current NotReady cluster and add them to the queue.
+func (s *Scheduler) enqueueAffectedBinding(notReadyClusterName string) {
 	bindings, _ := s.bindingLister.List(labels.Everything())
 	klog.Infof("Start traveling all ResourceBindings")
 	for _, binding := range bindings {
@@ -564,18 +565,39 @@
 	}
 }
 
-func (s Scheduler) failoverCandidateCluster(binding *workv1alpha1.ResourceBinding) (reserved sets.String, candidates sets.String) {
-	bindedCluster := sets.NewString()
-	for _, cluster := range binding.Spec.Clusters {
-		bindedCluster.Insert(cluster.Name)
+// enqueueAffectedClusterBinding will find all ClusterResourceBindings related to the current NotReady cluster and add them to the queue.
+func (s *Scheduler) enqueueAffectedClusterBinding(notReadyClusterName string) {
+	bindings, _ := s.clusterBindingLister.List(labels.Everything())
+	klog.Infof("Start traversing all ClusterResourceBindings")
+	for _, binding := range bindings {
+		clusters := binding.Spec.Clusters
+		for _, bindingCluster := range clusters {
+			if bindingCluster.Name == notReadyClusterName {
+				rescheduleKey, err := cache.MetaNamespaceKeyFunc(binding)
+				if err != nil {
+					klog.Errorf("couldn't get rescheduleKey for ClusterResourceBinding %s: %v", binding.Name, err)
+					return
+				}
+				s.queue.Add(rescheduleKey)
+				klog.Infof("Added affected ClusterResourceBinding to queue successfully")
+			}
+		}
+	}
+}
+
+// getReservedAndCandidates takes the target clusters recorded in a binding and returns the reserved clusters (bound clusters that are still ready) and the candidate clusters (ready clusters the binding is not yet bound to).
+func (s Scheduler) getReservedAndCandidates(clusters []workv1alpha1.TargetCluster) (reserved sets.String, candidates sets.String) {
+	boundClusters := sets.NewString()
+	for _, cluster := range clusters {
+		boundClusters.Insert(cluster.Name)
 	}
-	availableCluster := sets.NewString()
+	availableClusters := sets.NewString()
 	for _, cluster := range s.schedulerCache.Snapshot().GetReadyClusters() {
-		availableCluster.Insert(cluster.Cluster().Name)
+		availableClusters.Insert(cluster.Cluster().Name)
 	}
-	return bindedCluster.Difference(bindedCluster.Difference(availableCluster)), availableCluster.Difference(bindedCluster)
+	return boundClusters.Difference(boundClusters.Difference(availableClusters)), availableClusters.Difference(boundClusters)
 }
 
 // rescheduleOne.
@@ -585,42 +607,59 @@ func (s *Scheduler) rescheduleOne(key string) (err error) {
 		return err
 	}
 
-	klog.Infof("begin rescheduling ResourceBinding %s %s", ns, name)
-	defer klog.Infof("end rescheduling ResourceBinding %s: %s", ns, name)
+	// ClusterResourceBinding object
+	if ns == "" {
+		klog.Infof("begin rescheduling ClusterResourceBinding %s", name)
+		defer klog.Infof("end rescheduling ClusterResourceBinding %s", name)
 
-	resourceBinding, err := s.bindingLister.ResourceBindings(ns).Get(name)
-	if errors.IsNotFound(err) {
-		return nil
+		clusterResourceBinding, err := s.clusterBindingLister.Get(name)
+		if errors.IsNotFound(err) {
+			return nil
+		}
+		crbinding := clusterResourceBinding.DeepCopy()
+		return s.rescheduleClusterResourceBinding(crbinding, name)
 	}
-	binding := resourceBinding.DeepCopy()
-	reservedClusters, candidateClusters := s.failoverCandidateCluster(resourceBinding)
+	// ResourceBinding object
+	if len(ns) > 0 {
+		klog.Infof("begin rescheduling ResourceBinding %s/%s", ns, name)
+		defer klog.Infof("end rescheduling ResourceBinding %s/%s", ns, name)
+
+		resourceBinding, err := s.bindingLister.ResourceBindings(ns).Get(name)
+		if errors.IsNotFound(err) {
+			return nil
+		}
+		binding := resourceBinding.DeepCopy()
+		return s.rescheduleResourceBinding(binding, ns, name)
+	}
+	return nil
+}
+
+func (s *Scheduler) rescheduleClusterResourceBinding(clusterResourceBinding *workv1alpha1.ClusterResourceBinding, name string) (err error) {
+	reservedClusters, candidateClusters := s.getReservedAndCandidates(clusterResourceBinding.Spec.Clusters)
 	klog.Infof("Reserved clusters : %v", reservedClusters.List())
 	klog.Infof("Candidate clusters: %v", candidateClusters.List())
-	deltaLen := len(binding.Spec.Clusters) - len(reservedClusters)
-
-	klog.Infof("binding(%s/%s) has %d failure clusters, and got %d candidates", ns, name, deltaLen, len(candidateClusters))
+	deltaLen := len(clusterResourceBinding.Spec.Clusters) - len(reservedClusters)
klog.Infof("binding %s has %d failure clusters, and got %d candidates", name, deltaLen, len(candidateClusters)) // TODO: should schedule as much as possible? if len(candidateClusters) < deltaLen { - klog.Warningf("ignore reschedule binding(%s/%s) as insufficient available cluster", ns, name) + klog.Warningf("ignore reschedule binding(%s) as insufficient available cluster", name) return nil } - targetClusters := reservedClusters for i := 0; i < deltaLen; i++ { for clusterName := range candidateClusters { curCluster, _ := s.clusterLister.Get(clusterName) - policyNamespace := util.GetLabelValue(binding.Labels, util.PropagationPolicyNamespaceLabel) - policyName := util.GetLabelValue(binding.Labels, util.PropagationPolicyNameLabel) - policy, _ := s.policyLister.PropagationPolicies(policyNamespace).Get(policyName) + policyName := util.GetLabelValue(clusterResourceBinding.Labels, util.ClusterPropagationPolicyLabel) + policy, _ := s.clusterPolicyLister.Get(policyName) if !util.ClusterMatches(curCluster, *policy.Spec.Placement.ClusterAffinity) { continue } - klog.Infof("Rescheduling %s/ %s to member cluster %s", binding.Namespace, binding.Name, clusterName) + klog.Infof("Rescheduling %s to member cluster %s", clusterResourceBinding.Name, clusterName) targetClusters.Insert(clusterName) candidateClusters.Delete(clusterName) @@ -628,16 +667,61 @@ func (s *Scheduler) rescheduleOne(key string) (err error) { break } } - // TODO(tinyma123) Check if the final result meets the spread constraints. - binding.Spec.Clusters = nil + clusterResourceBinding.Spec.Clusters = nil for cluster := range targetClusters { - binding.Spec.Clusters = append(binding.Spec.Clusters, workv1alpha1.TargetCluster{Name: cluster}) + clusterResourceBinding.Spec.Clusters = append(clusterResourceBinding.Spec.Clusters, workv1alpha1.TargetCluster{Name: cluster}) } - klog.Infof("The final binding.Spec.Cluster values are: %v\n", binding.Spec.Clusters) + klog.Infof("The final binding.Spec.Cluster values are: %v\n", clusterResourceBinding.Spec.Clusters) - _, err = s.KarmadaClient.WorkV1alpha1().ResourceBindings(ns).Update(context.TODO(), binding, metav1.UpdateOptions{}) + _, err = s.KarmadaClient.WorkV1alpha1().ClusterResourceBindings().Update(context.TODO(), clusterResourceBinding, metav1.UpdateOptions{}) + if err != nil { + return err + } + return nil +} + +func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha1.ResourceBinding, ns, name string) (err error) { + reservedClusters, candidateClusters := s.getReservedAndCandidates(resourceBinding.Spec.Clusters) + klog.Infof("Reserved clusters : %v", reservedClusters.List()) + klog.Infof("Candidate clusters: %v", candidateClusters.List()) + deltaLen := len(resourceBinding.Spec.Clusters) - len(reservedClusters) + klog.Infof("binding(%s/%s) has %d failure clusters, and got %d candidates", ns, name, deltaLen, len(candidateClusters)) + + // TODO: should schedule as much as possible? 
+	if len(candidateClusters) < deltaLen {
+		klog.Warningf("ignore reschedule binding(%s/%s) as insufficient available cluster", ns, name)
+		return nil
+	}
+	targetClusters := reservedClusters
+
+	for i := 0; i < deltaLen; i++ {
+		for clusterName := range candidateClusters {
+			curCluster, _ := s.clusterLister.Get(clusterName)
+			policyNamespace := util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNamespaceLabel)
+			policyName := util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNameLabel)
+			policy, _ := s.policyLister.PropagationPolicies(policyNamespace).Get(policyName)
+
+			if !util.ClusterMatches(curCluster, *policy.Spec.Placement.ClusterAffinity) {
+				continue
+			}
+			klog.Infof("Rescheduling %s/%s to member cluster %s", resourceBinding.Namespace, resourceBinding.Name, clusterName)
+			targetClusters.Insert(clusterName)
+			candidateClusters.Delete(clusterName)
+			// break as soon as a result is found
+			break
+		}
+	}
+	// TODO(tinyma123) Check if the final result meets the spread constraints.
+
+	resourceBinding.Spec.Clusters = nil
+	for cluster := range targetClusters {
+		resourceBinding.Spec.Clusters = append(resourceBinding.Spec.Clusters, workv1alpha1.TargetCluster{Name: cluster})
+	}
+	klog.Infof("The final binding.Spec.Clusters values are: %v", resourceBinding.Spec.Clusters)
+
+	_, err = s.KarmadaClient.WorkV1alpha1().ResourceBindings(ns).Update(context.TODO(), resourceBinding, metav1.UpdateOptions{})
 	if err != nil {
 		return err
 	}
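
Note on the set arithmetic in getReservedAndCandidates, since the double Difference is easy to misread: boundClusters.Difference(boundClusters.Difference(availableClusters)) keeps exactly the bound clusters that are still ready (the intersection of the two sets), while availableClusters.Difference(boundClusters) yields the candidates. The following minimal, self-contained Go sketch (not part of the patch; the member cluster names are made up for illustration) demonstrates this with the same sets.String helpers the scheduler uses:

	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/util/sets"
	)

	func main() {
		// Clusters the binding is currently scheduled to.
		bound := sets.NewString("member1", "member2", "member3")
		// Clusters currently Ready in the scheduler cache.
		available := sets.NewString("member2", "member3", "member4")

		// bound - (bound - available) keeps only the bound clusters that are
		// still available, i.e. the intersection of the two sets.
		reserved := bound.Difference(bound.Difference(available))
		// Ready clusters the binding is not yet scheduled to.
		candidates := available.Difference(bound)

		fmt.Println(reserved.List())   // [member2 member3]
		fmt.Println(candidates.List()) // [member4]
	}

Running this prints [member2 member3] and [member4]: member1 is the failed cluster (deltaLen = 1), and member4 is the only candidate the reschedule loop may add.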