Merge pull request #1477 from mrlihanbo/scheduler-add-retry
replace update method with patch method when updating schedule result
This commit is contained in:
commit
38f326dd20
|
@ -6,10 +6,12 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
jsonpatch "github.com/evanphx/json-patch/v5"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/dynamic"
|
"k8s.io/client-go/dynamic"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
|
@ -376,27 +378,40 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.Resour
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &placement, &resourceBinding.Spec)
|
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &placement, &resourceBinding.Spec)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed scheduling ResourceBinding %s/%s: %v", resourceBinding.Namespace, resourceBinding.Name, err)
|
klog.Errorf("Failed scheduling ResourceBinding %s/%s: %v", resourceBinding.Namespace, resourceBinding.Name, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(4).Infof("ResourceBinding %s/%s scheduled to clusters %v", resourceBinding.Namespace, resourceBinding.Name, scheduleResult.SuggestedClusters)
|
klog.V(4).Infof("ResourceBinding %s/%s scheduled to clusters %v", resourceBinding.Namespace, resourceBinding.Name, scheduleResult.SuggestedClusters)
|
||||||
|
return s.patchScheduleResultForResourceBinding(resourceBinding, placementStr, scheduleResult.SuggestedClusters)
|
||||||
|
}
|
||||||
|
|
||||||
binding := resourceBinding.DeepCopy()
|
func (s *Scheduler) patchScheduleResultForResourceBinding(oldBinding *workv1alpha2.ResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {
|
||||||
binding.Spec.Clusters = scheduleResult.SuggestedClusters
|
newBinding := oldBinding.DeepCopy()
|
||||||
|
if newBinding.Annotations == nil {
|
||||||
if binding.Annotations == nil {
|
newBinding.Annotations = make(map[string]string)
|
||||||
binding.Annotations = make(map[string]string)
|
|
||||||
}
|
}
|
||||||
binding.Annotations[util.PolicyPlacementAnnotation] = placementStr
|
newBinding.Annotations[util.PolicyPlacementAnnotation] = placement
|
||||||
|
newBinding.Spec.Clusters = scheduleResult
|
||||||
|
|
||||||
_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(binding.Namespace).Update(context.TODO(), binding, metav1.UpdateOptions{})
|
oldData, err := json.Marshal(oldBinding)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("failed to marshal the existing resource binding(%s/%s): %v", oldBinding.Namespace, oldBinding.Name, err)
|
||||||
|
}
|
||||||
|
newData, err := json.Marshal(newBinding)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal the new resource binding(%s/%s): %v", newBinding.Namespace, newBinding.Name, err)
|
||||||
|
}
|
||||||
|
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create a merge patch: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
_, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(newBinding.Namespace).Patch(context.TODO(), newBinding.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding) (err error) {
|
func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv1alpha2.ClusterResourceBinding) (err error) {
|
||||||
|
@ -408,15 +423,6 @@ func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec)
|
|
||||||
if err != nil {
|
|
||||||
klog.V(2).Infof("Failed scheduling ClusterResourceBinding %s: %v", clusterResourceBinding.Name, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
klog.V(4).Infof("ClusterResourceBinding %s scheduled to clusters %v", clusterResourceBinding.Name, scheduleResult.SuggestedClusters)
|
|
||||||
|
|
||||||
binding := clusterResourceBinding.DeepCopy()
|
|
||||||
binding.Spec.Clusters = scheduleResult.SuggestedClusters
|
|
||||||
|
|
||||||
placement, err := json.Marshal(policy.Spec.Placement)
|
placement, err := json.Marshal(policy.Spec.Placement)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -424,17 +430,39 @@ func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if binding.Annotations == nil {
|
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec)
|
||||||
binding.Annotations = make(map[string]string)
|
|
||||||
}
|
|
||||||
binding.Annotations[util.PolicyPlacementAnnotation] = string(placement)
|
|
||||||
|
|
||||||
_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Update(context.TODO(), binding, metav1.UpdateOptions{})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
klog.V(2).Infof("Failed scheduling ClusterResourceBinding %s: %v", clusterResourceBinding.Name, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
klog.V(4).Infof("ClusterResourceBinding %s scheduled to clusters %v", clusterResourceBinding.Name, scheduleResult.SuggestedClusters)
|
||||||
|
return s.patchScheduleResultForClusterResourceBinding(clusterResourceBinding, string(placement), scheduleResult.SuggestedClusters)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) patchScheduleResultForClusterResourceBinding(oldBinding *workv1alpha2.ClusterResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {
|
||||||
|
newBinding := oldBinding.DeepCopy()
|
||||||
|
if newBinding.Annotations == nil {
|
||||||
|
newBinding.Annotations = make(map[string]string)
|
||||||
|
}
|
||||||
|
newBinding.Annotations[util.PolicyPlacementAnnotation] = placement
|
||||||
|
newBinding.Spec.Clusters = scheduleResult
|
||||||
|
|
||||||
|
oldData, err := json.Marshal(oldBinding)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal the existing cluster resource binding(%s): %v", oldBinding.Name, err)
|
||||||
|
}
|
||||||
|
newData, err := json.Marshal(newBinding)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal the new resource binding(%s): %v", newBinding.Name, err)
|
||||||
|
}
|
||||||
|
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create a merge patch: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = s.KarmadaClient.WorkV1alpha2().ClusterResourceBindings().Patch(context.TODO(), newBinding.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scheduler) handleErr(err error, key interface{}) {
|
func (s *Scheduler) handleErr(err error, key interface{}) {
|
||||||
|
|
Loading…
Reference in New Issue