From 57c982f3277763b470df1ff47f634ae22dda07f9 Mon Sep 17 00:00:00 2001 From: chaosi-zju Date: Mon, 6 May 2024 16:53:22 +0800 Subject: [PATCH] feat: support update event in WorkloadRebalancer Signed-off-by: chaosi-zju --- .../workloadrebalancer_controller.go | 62 ++++++++++++++----- 1 file changed, 48 insertions(+), 14 deletions(-) diff --git a/pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go b/pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go index 12270330b..925c11832 100644 --- a/pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go +++ b/pkg/controllers/workloadrebalancer/workloadrebalancer_controller.go @@ -19,9 +19,12 @@ package workloadrebalancer import ( "context" "fmt" + "reflect" + "sort" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" @@ -48,8 +51,12 @@ type RebalancerController struct { // SetupWithManager creates a controller and register to controller manager. func (c *RebalancerController) SetupWithManager(mgr controllerruntime.Manager) error { var predicateFunc = predicate.Funcs{ - CreateFunc: func(e event.CreateEvent) bool { return true }, - UpdateFunc: func(e event.UpdateEvent) bool { return false }, + CreateFunc: func(e event.CreateEvent) bool { return true }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldObj := e.ObjectOld.(*appsv1alpha1.WorkloadRebalancer) + newObj := e.ObjectNew.(*appsv1alpha1.WorkloadRebalancer) + return !reflect.DeepEqual(oldObj.Spec, newObj.Spec) + }, DeleteFunc: func(event.DeleteEvent) bool { return false }, GenericFunc: func(event.GenericEvent) bool { return false }, } @@ -92,25 +99,52 @@ func (c *RebalancerController) Reconcile(ctx context.Context, req controllerrunt return controllerruntime.Result{}, nil } -func (c *RebalancerController) buildWorkloadRebalancerStatus(rebalancer *appsv1alpha1.WorkloadRebalancer) *appsv1alpha1.WorkloadRebalancerStatus { - resourceList := make([]appsv1alpha1.ObservedWorkload, 0) - for _, resource := range rebalancer.Spec.Workloads { - resourceList = append(resourceList, appsv1alpha1.ObservedWorkload{ - Workload: resource, - }) +// When spec filed of WorkloadRebalancer updated, we shall refresh the workload list in status.observedWorkloads: +// 1. a new workload added to spec list, just add it into status list and do the rebalance. +// 2. a workload deleted from previous spec list, keep it in status list if already success, and remove it if not. +// 3. a workload is modified, just regard it as deleted an old one and inserted a new one. +// 4. just list order is disrupted, no additional action. +func (c *RebalancerController) syncWorkloadsFromSpecToStatus(rebalancer *appsv1alpha1.WorkloadRebalancer) *appsv1alpha1.WorkloadRebalancerStatus { + observedWorkloads := make([]appsv1alpha1.ObservedWorkload, 0) + + specWorkloadSet := sets.New[appsv1alpha1.ObjectReference]() + for _, workload := range rebalancer.Spec.Workloads { + specWorkloadSet.Insert(workload) } - return &appsv1alpha1.WorkloadRebalancerStatus{ - ObservedWorkloads: resourceList, + + for _, item := range rebalancer.Status.ObservedWorkloads { + // if item still exist in `spec`, keep it in `status` and remove it from `specWorkloadSet`. + // if item no longer exist in `spec`, keep it in `status` if it already success, otherwise remove it from `status`. + if specWorkloadSet.Has(item.Workload) { + observedWorkloads = append(observedWorkloads, item) + specWorkloadSet.Delete(item.Workload) + } else if item.Result == appsv1alpha1.RebalanceSuccessful { + observedWorkloads = append(observedWorkloads, item) + } } + + // since item exist in both `spec` and `status` has been removed, the left means the newly added workload, + // add them into `status`. + for workload := range specWorkloadSet { + observedWorkloads = append(observedWorkloads, appsv1alpha1.ObservedWorkload{Workload: workload}) + } + + sort.Slice(observedWorkloads, func(i, j int) bool { + wi := observedWorkloads[i].Workload + wj := observedWorkloads[j].Workload + stri := fmt.Sprintf("%s#%s#%s#%s", wi.APIVersion, wi.Kind, wi.Namespace, wi.Name) + strj := fmt.Sprintf("%s#%s#%s#%s", wj.APIVersion, wj.Kind, wj.Namespace, wj.Name) + return stri < strj + }) + + return &appsv1alpha1.WorkloadRebalancerStatus{ObservedWorkloads: observedWorkloads} } func (c *RebalancerController) doWorkloadRebalance(ctx context.Context, rebalancer *appsv1alpha1.WorkloadRebalancer) ( newStatus *appsv1alpha1.WorkloadRebalancerStatus, successNum int64, retryNum int64) { // get previous status and update basing on it - newStatus = rebalancer.Status.DeepCopy() - if len(newStatus.ObservedWorkloads) == 0 { - newStatus = c.buildWorkloadRebalancerStatus(rebalancer) - } + + newStatus = c.syncWorkloadsFromSpecToStatus(rebalancer) successNum, retryNum = int64(0), int64(0) for i, resource := range newStatus.ObservedWorkloads {