fix issue that Scheduler do not schedule resourcebinding generated from clusterpropagationpolicy (#374)

Signed-off-by: pengli <justdoit.pli@gmail.com>
This commit is contained in:
Peng Li 2021-05-27 15:31:11 +08:00 committed by GitHub
parent 4ae750b65d
commit ec97ab4089
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 59 additions and 29 deletions

View File

@ -250,6 +250,52 @@ func (s *Scheduler) worker() {
}
}
func (s *Scheduler) getPlacement(resourceBinding *workv1alpha1.ResourceBinding) (policyv1alpha1.Placement, string, error) {
var placement policyv1alpha1.Placement
var clusterPolicyName string
var policyName string
var policyNamespace string
var err error
if clusterPolicyName = util.GetLabelValue(resourceBinding.Labels, util.ClusterPropagationPolicyLabel); clusterPolicyName != "" {
var clusterPolicy *policyv1alpha1.ClusterPropagationPolicy
clusterPolicy, err = s.clusterPolicyLister.Get(clusterPolicyName)
if err != nil {
return placement, "", err
}
placement = clusterPolicy.Spec.Placement
}
if policyName = util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNameLabel); policyName != "" {
policyNamespace = util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNamespaceLabel)
var policy *policyv1alpha1.PropagationPolicy
policy, err = s.policyLister.PropagationPolicies(policyNamespace).Get(policyName)
if err != nil {
return placement, "", err
}
placement = policy.Spec.Placement
}
var placementBytes []byte
placementBytes, err = json.Marshal(placement)
if err != nil {
return placement, "", err
}
defer func() {
if err != nil {
if clusterPolicyName != "" {
klog.Errorf("Failed to get placement of clusterPropagationPolicy %s, error: %v", clusterPolicyName, err)
} else {
klog.Errorf("Failed to get placement of propagationPolicy %s/%s, error: %v", policyNamespace, policyName, err)
}
}
}()
return placement, string(placementBytes), nil
}
func (s *Scheduler) getScheduleType(key string) ScheduleType {
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
@ -267,19 +313,10 @@ func (s *Scheduler) getScheduleType(key string) ScheduleType {
return FirstSchedule
}
policyNamespace := util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNamespaceLabel)
policyName := util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNameLabel)
policy, err := s.policyLister.PropagationPolicies(policyNamespace).Get(policyName)
_, policyPlacementStr, err := s.getPlacement(resourceBinding)
if err != nil {
return Unknown
}
placement, err := json.Marshal(policy.Spec.Placement)
if err != nil {
klog.Errorf("Failed to marshal placement of propagationPolicy %s/%s, error: %v", policy.Namespace, policy.Name, err)
return Unknown
}
policyPlacementStr := string(placement)
appliedPlacement := util.GetLabelValue(resourceBinding.Annotations, util.PolicyPlacementAnnotation)
@ -403,19 +440,17 @@ func (s *Scheduler) scheduleOne(key string) (err error) {
if errors.IsNotFound(err) {
return nil
}
policyNamespace := util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNamespaceLabel)
policyName := util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNameLabel)
policy, err := s.policyLister.PropagationPolicies(policyNamespace).Get(policyName)
return s.scheduleResourceBinding(resourceBinding)
}
func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha1.ResourceBinding) (err error) {
placement, placementStr, err := s.getPlacement(resourceBinding)
if err != nil {
return err
}
return s.scheduleResourceBinding(resourceBinding, policy)
}
func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha1.ResourceBinding, policy *policyv1alpha1.PropagationPolicy) (err error) {
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &policy.Spec.Placement)
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &placement)
if err != nil {
klog.V(2).Infof("failed scheduling ResourceBinding %s/%s: %v", resourceBinding.Namespace, resourceBinding.Name, err)
return err
@ -429,16 +464,10 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha1.Resour
}
binding.Spec.Clusters = targetClusters
placement, err := json.Marshal(policy.Spec.Placement)
if err != nil {
klog.Errorf("Failed to marshal placement of propagationPolicy %s/%s, error: %v", policy.Namespace, policy.Name, err)
return err
}
if binding.Annotations == nil {
binding.Annotations = make(map[string]string)
}
binding.Annotations[util.PolicyPlacementAnnotation] = string(placement)
binding.Annotations[util.PolicyPlacementAnnotation] = placementStr
_, err = s.KarmadaClient.WorkV1alpha1().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
if err != nil {
@ -702,11 +731,12 @@ func (s *Scheduler) rescheduleResourceBinding(resourceBinding *workv1alpha1.Reso
for i := 0; i < deltaLen; i++ {
for clusterName := range candidateClusters {
curCluster, _ := s.clusterLister.Get(clusterName)
policyNamespace := util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNamespaceLabel)
policyName := util.GetLabelValue(resourceBinding.Labels, util.PropagationPolicyNameLabel)
policy, _ := s.policyLister.PropagationPolicies(policyNamespace).Get(policyName)
placement, _, err := s.getPlacement(resourceBinding)
if err != nil {
return err
}
if policy.Spec.Placement.ClusterAffinity != nil && !util.ClusterMatches(curCluster, *policy.Spec.Placement.ClusterAffinity) {
if placement.ClusterAffinity != nil && !util.ClusterMatches(curCluster, *placement.ClusterAffinity) {
continue
}
klog.Infof("Rescheduling %s/ %s to member cluster %s", resourceBinding.Namespace, resourceBinding.Name, clusterName)