Merge pull request #4860 from chaosi-zju/reschedule-update
feat: support update event in WorkloadRebalancer
This commit is contained in:
commit
4a0876cff5
|
@ -19,9 +19,12 @@ package workloadrebalancer
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/klog/v2"
|
||||
controllerruntime "sigs.k8s.io/controller-runtime"
|
||||
|
@ -48,8 +51,12 @@ type RebalancerController struct {
|
|||
// SetupWithManager creates a controller and register to controller manager.
|
||||
func (c *RebalancerController) SetupWithManager(mgr controllerruntime.Manager) error {
|
||||
var predicateFunc = predicate.Funcs{
|
||||
CreateFunc: func(e event.CreateEvent) bool { return true },
|
||||
UpdateFunc: func(e event.UpdateEvent) bool { return false },
|
||||
CreateFunc: func(e event.CreateEvent) bool { return true },
|
||||
UpdateFunc: func(e event.UpdateEvent) bool {
|
||||
oldObj := e.ObjectOld.(*appsv1alpha1.WorkloadRebalancer)
|
||||
newObj := e.ObjectNew.(*appsv1alpha1.WorkloadRebalancer)
|
||||
return !reflect.DeepEqual(oldObj.Spec, newObj.Spec)
|
||||
},
|
||||
DeleteFunc: func(event.DeleteEvent) bool { return false },
|
||||
GenericFunc: func(event.GenericEvent) bool { return false },
|
||||
}
|
||||
|
@ -92,25 +99,52 @@ func (c *RebalancerController) Reconcile(ctx context.Context, req controllerrunt
|
|||
return controllerruntime.Result{}, nil
|
||||
}
|
||||
|
||||
func (c *RebalancerController) buildWorkloadRebalancerStatus(rebalancer *appsv1alpha1.WorkloadRebalancer) *appsv1alpha1.WorkloadRebalancerStatus {
|
||||
resourceList := make([]appsv1alpha1.ObservedWorkload, 0)
|
||||
for _, resource := range rebalancer.Spec.Workloads {
|
||||
resourceList = append(resourceList, appsv1alpha1.ObservedWorkload{
|
||||
Workload: resource,
|
||||
})
|
||||
// When spec filed of WorkloadRebalancer updated, we shall refresh the workload list in status.observedWorkloads:
|
||||
// 1. a new workload added to spec list, just add it into status list and do the rebalance.
|
||||
// 2. a workload deleted from previous spec list, keep it in status list if already success, and remove it if not.
|
||||
// 3. a workload is modified, just regard it as deleted an old one and inserted a new one.
|
||||
// 4. just list order is disrupted, no additional action.
|
||||
func (c *RebalancerController) syncWorkloadsFromSpecToStatus(rebalancer *appsv1alpha1.WorkloadRebalancer) *appsv1alpha1.WorkloadRebalancerStatus {
|
||||
observedWorkloads := make([]appsv1alpha1.ObservedWorkload, 0)
|
||||
|
||||
specWorkloadSet := sets.New[appsv1alpha1.ObjectReference]()
|
||||
for _, workload := range rebalancer.Spec.Workloads {
|
||||
specWorkloadSet.Insert(workload)
|
||||
}
|
||||
return &appsv1alpha1.WorkloadRebalancerStatus{
|
||||
ObservedWorkloads: resourceList,
|
||||
|
||||
for _, item := range rebalancer.Status.ObservedWorkloads {
|
||||
// if item still exist in `spec`, keep it in `status` and remove it from `specWorkloadSet`.
|
||||
// if item no longer exist in `spec`, keep it in `status` if it already success, otherwise remove it from `status`.
|
||||
if specWorkloadSet.Has(item.Workload) {
|
||||
observedWorkloads = append(observedWorkloads, item)
|
||||
specWorkloadSet.Delete(item.Workload)
|
||||
} else if item.Result == appsv1alpha1.RebalanceSuccessful {
|
||||
observedWorkloads = append(observedWorkloads, item)
|
||||
}
|
||||
}
|
||||
|
||||
// since item exist in both `spec` and `status` has been removed, the left means the newly added workload,
|
||||
// add them into `status`.
|
||||
for workload := range specWorkloadSet {
|
||||
observedWorkloads = append(observedWorkloads, appsv1alpha1.ObservedWorkload{Workload: workload})
|
||||
}
|
||||
|
||||
sort.Slice(observedWorkloads, func(i, j int) bool {
|
||||
wi := observedWorkloads[i].Workload
|
||||
wj := observedWorkloads[j].Workload
|
||||
stri := fmt.Sprintf("%s#%s#%s#%s", wi.APIVersion, wi.Kind, wi.Namespace, wi.Name)
|
||||
strj := fmt.Sprintf("%s#%s#%s#%s", wj.APIVersion, wj.Kind, wj.Namespace, wj.Name)
|
||||
return stri < strj
|
||||
})
|
||||
|
||||
return &appsv1alpha1.WorkloadRebalancerStatus{ObservedWorkloads: observedWorkloads}
|
||||
}
|
||||
|
||||
func (c *RebalancerController) doWorkloadRebalance(ctx context.Context, rebalancer *appsv1alpha1.WorkloadRebalancer) (
|
||||
newStatus *appsv1alpha1.WorkloadRebalancerStatus, successNum int64, retryNum int64) {
|
||||
// get previous status and update basing on it
|
||||
newStatus = rebalancer.Status.DeepCopy()
|
||||
if len(newStatus.ObservedWorkloads) == 0 {
|
||||
newStatus = c.buildWorkloadRebalancerStatus(rebalancer)
|
||||
}
|
||||
|
||||
newStatus = c.syncWorkloadsFromSpecToStatus(rebalancer)
|
||||
|
||||
successNum, retryNum = int64(0), int64(0)
|
||||
for i, resource := range newStatus.ObservedWorkloads {
|
||||
|
|
Loading…
Reference in New Issue