diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 398a41a0f..2b8c436ff 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "reflect" "time" jsonpatch "github.com/evanphx/json-patch/v5" @@ -37,6 +38,7 @@ import ( "github.com/karmada-io/karmada/pkg/scheduler/framework/runtime" "github.com/karmada-io/karmada/pkg/scheduler/metrics" "github.com/karmada-io/karmada/pkg/util" + "github.com/karmada-io/karmada/pkg/util/helper" utilmetrics "github.com/karmada-io/karmada/pkg/util/metrics" ) @@ -352,10 +354,10 @@ func (s *Scheduler) doScheduleBinding(namespace, name string) (err error) { } else { condition = util.NewCondition(workv1alpha2.Scheduled, scheduleFailedReason, err.Error(), metav1.ConditionFalse) } - if updateErr := s.updateBindingScheduledConditionIfNeeded(rb, condition); updateErr != nil { - klog.Errorf("Failed update condition(%s) for ResourceBinding(%s/%s)", workv1alpha2.Scheduled, rb.Namespace, rb.Name) + if updateErr := s.patchBindingScheduleStatus(rb, condition); updateErr != nil { + klog.Errorf("Failed to patch schedule status to ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err) if err == nil { - // schedule succeed but update condition failed, return err in order to retry in next loop. + // schedule succeed but update status failed, return err in order to retry in next loop. err = updateErr } } @@ -626,36 +628,38 @@ func (s *Scheduler) establishEstimatorConnections() { } } -// updateBindingScheduledConditionIfNeeded sets the scheduled condition of ResourceBinding if needed -func (s *Scheduler) updateBindingScheduledConditionIfNeeded(rb *workv1alpha2.ResourceBinding, newScheduledCondition metav1.Condition) error { +// patchBindingScheduleStatus patches schedule status of ResourceBinding when necessary. +func (s *Scheduler) patchBindingScheduleStatus(rb *workv1alpha2.ResourceBinding, newScheduledCondition metav1.Condition) error { if rb == nil { return nil } - oldScheduledCondition := meta.FindStatusCondition(rb.Status.Conditions, workv1alpha2.Scheduled) - if oldScheduledCondition != nil { - if util.IsConditionsEqual(newScheduledCondition, *oldScheduledCondition) { - klog.V(4).Infof("No need to update scheduled condition") - return nil - } + modifiedObj := rb.DeepCopy() + meta.SetStatusCondition(&modifiedObj.Status.Conditions, newScheduledCondition) + // Postpone setting observed generation until schedule succeed, assume scheduler will retry and + // will succeed eventually. + if newScheduledCondition.Status == metav1.ConditionTrue { + modifiedObj.Status.SchedulerObservedGeneration = modifiedObj.Generation } - return retry.RetryOnConflict(retry.DefaultRetry, func() error { - meta.SetStatusCondition(&rb.Status.Conditions, newScheduledCondition) - _, updateErr := s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).UpdateStatus(context.TODO(), rb, metav1.UpdateOptions{}) - if updateErr == nil { - return nil - } + // Short path, ignore patch if no change. + if reflect.DeepEqual(rb.Status, modifiedObj.Status) { + return nil + } - if updated, err := s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).Get(context.TODO(), rb.Name, metav1.GetOptions{}); err == nil { - // make a copy so we don't mutate the shared cache - rb = updated.DeepCopy() - } else { - klog.Errorf("failed to get updated resource binding %s/%s: %v", rb.Namespace, rb.Name, err) - } + patchBytes, err := helper.GenMergePatch(rb, modifiedObj) + if err != nil { + return fmt.Errorf("failed to create a merge patch: %v", err) + } - return updateErr - }) + _, err = s.KarmadaClient.WorkV1alpha2().ResourceBindings(rb.Namespace).Patch(context.TODO(), rb.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") + if err != nil { + klog.Errorf("Failed to patch schedule status to ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err) + return err + } + + klog.V(4).Infof("Patch schedule status to ResourceBinding(%s/%s) succeed", rb.Namespace, rb.Name) + return nil } // updateClusterBindingScheduledConditionIfNeeded sets the scheduled condition of ClusterResourceBinding if needed diff --git a/pkg/util/helper/patch.go b/pkg/util/helper/patch.go new file mode 100644 index 000000000..1e410e269 --- /dev/null +++ b/pkg/util/helper/patch.go @@ -0,0 +1,29 @@ +package helper + +import ( + "encoding/json" + "fmt" + + jsonpatch "github.com/evanphx/json-patch/v5" +) + +// GenMergePatch will return a merge patch document capable of converting the +// original object to the modified object. +// The merge patch format is primarily intended for use with the HTTP PATCH method +// as a means of describing a set of modifications to a target resource's content. +func GenMergePatch(originalObj interface{}, modifiedObj interface{}) (patchBytes []byte, err error) { + originalBytes, err := json.Marshal(originalObj) + if err != nil { + return nil, fmt.Errorf("failed to marshal original object: %w", err) + } + modifiedBytes, err := json.Marshal(modifiedObj) + if err != nil { + return nil, fmt.Errorf("failed to marshal modified object: %w", err) + } + patchBytes, err = jsonpatch.CreateMergePatch(originalBytes, modifiedBytes) + if err != nil { + return nil, fmt.Errorf("failed to create a merge patch: %w", err) + } + + return patchBytes, nil +} diff --git a/pkg/util/helper/patch_test.go b/pkg/util/helper/patch_test.go new file mode 100644 index 000000000..72418ec73 --- /dev/null +++ b/pkg/util/helper/patch_test.go @@ -0,0 +1,99 @@ +package helper + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" +) + +func TestGenMergePatch(t *testing.T) { + testObj := workv1alpha2.ResourceBinding{ + TypeMeta: metav1.TypeMeta{Kind: "ResourceBinding", APIVersion: "work.karmada.io/v1alpha2"}, + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: workv1alpha2.ResourceBindingSpec{Clusters: []workv1alpha2.TargetCluster{{Name: "foo", Replicas: 20}}}, + Status: workv1alpha2.ResourceBindingStatus{Conditions: []metav1.Condition{{Type: "Dummy", Reason: "Dummy"}}}, + } + + tests := []struct { + name string + modifyFunc func() interface{} + expectedPatch string + expectErr bool + }{ + { + name: "update spec", + modifyFunc: func() interface{} { + modified := testObj.DeepCopy() + modified.Spec.Replicas = 10 + modified.Spec.Clusters = []workv1alpha2.TargetCluster{ + { + Name: "m1", + Replicas: 5, + }, + { + Name: "m2", + Replicas: 5, + }, + } + return modified + }, + expectedPatch: `{"spec":{"clusters":[{"name":"m1","replicas":5},{"name":"m2","replicas":5}],"replicas":10}}`, + expectErr: false, + }, + { + name: "update status", + modifyFunc: func() interface{} { + modified := testObj.DeepCopy() + modified.Status.SchedulerObservedGeneration = 10 + modified.Status.Conditions = []metav1.Condition{ + { + Type: "Scheduled", + Reason: "BindingScheduled", + }, + { + Type: "Dummy", + Reason: "Dummy", + }, + } + return modified + }, + expectedPatch: `{"status":{"conditions":[{"lastTransitionTime":null,"message":"","reason":"BindingScheduled","status":"","type":"Scheduled"},{"lastTransitionTime":null,"message":"","reason":"Dummy","status":"","type":"Dummy"}],"schedulerObservedGeneration":10}}`, + expectErr: false, + }, + { + name: "no change", + modifyFunc: func() interface{} { + modified := testObj.DeepCopy() + return modified + }, + expectedPatch: `{}`, + expectErr: false, + }, + { + name: "invalid input should arise error", + modifyFunc: func() interface{} { + var invalid = 0 + return invalid + }, + expectedPatch: ``, // empty(nil) patch + expectErr: true, + }, + } + + for _, test := range tests { + tc := test + t.Run(test.name, func(t *testing.T) { + patch, err := GenMergePatch(testObj, tc.modifyFunc()) + if err != nil && tc.expectErr == false { + t.Fatalf("unexpect error, but got: %v", err) + } else if err == nil && tc.expectErr == true { + t.Fatalf("expect error, but got none") + } + if string(patch) != tc.expectedPatch { + t.Fatalf("want patch: %s, but got :%s", tc.expectedPatch, string(patch)) + } + }) + } +}