Merge pull request #4875 from chaosi-zju/reschedule-fix
fix patching WorkloadRebalancer failed due to misuse of shared slices
commit c7c8f49396
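For context: the failure is a classic Go aliasing pitfall. Assigning a struct copies any slice field by header only, so the old status and the "new" status share one backing array, and element writes through the copy also land in the original. Since `client.MergeFrom` diffs the modified object against that original at patch time, the leaked mutation made the computed status patch come out empty. A minimal, runnable sketch of the misuse (the `status` type is a simplified stand-in, not the real API):

```go
package main

import "fmt"

// Simplified stand-in for the rebalancer status type (hypothetical).
type status struct{ results []string }

func main() {
	rebalancerStatus := status{results: []string{"pending"}}

	// Struct assignment copies the slice header, not the backing array.
	newStatus := rebalancerStatus
	newStatus.results[0] = "successful" // writes through the shared array

	// The "untouched" original changed as well: the misuse this PR fixes.
	fmt.Println(rebalancerStatus.results[0]) // prints "successful"
}
```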
@@ -80,7 +80,7 @@ func (c *RebalancerController) Reconcile(ctx context.Context, req controllerrunt
 	newStatus, successNum, retryNum := c.doWorkloadRebalance(ctx, rebalancer)

 	// 3. update status of WorkloadRebalancer
-	if err := c.updateWorkloadRebalancerStatus(rebalancer, newStatus); err != nil {
+	if err := c.updateWorkloadRebalancerStatus(ctx, rebalancer, newStatus); err != nil {
 		return controllerruntime.Result{}, err
 	}
 	klog.Infof("Finish handling WorkloadRebalancer (%s), %d/%d resource success in all, while %d resource need retry",
@@ -92,22 +92,22 @@ func (c *RebalancerController) Reconcile(ctx context.Context, req controllerrunt
 	return controllerruntime.Result{}, nil
 }

-func (c *RebalancerController) buildWorkloadRebalancerStatus(rebalancer *appsv1alpha1.WorkloadRebalancer) appsv1alpha1.WorkloadRebalancerStatus {
+func (c *RebalancerController) buildWorkloadRebalancerStatus(rebalancer *appsv1alpha1.WorkloadRebalancer) *appsv1alpha1.WorkloadRebalancerStatus {
 	resourceList := make([]appsv1alpha1.ObservedWorkload, 0)
 	for _, resource := range rebalancer.Spec.Workloads {
 		resourceList = append(resourceList, appsv1alpha1.ObservedWorkload{
 			Workload: resource,
 		})
 	}
-	return appsv1alpha1.WorkloadRebalancerStatus{
+	return &appsv1alpha1.WorkloadRebalancerStatus{
 		ObservedWorkloads: resourceList,
 	}
 }

 func (c *RebalancerController) doWorkloadRebalance(ctx context.Context, rebalancer *appsv1alpha1.WorkloadRebalancer) (
-	newStatus appsv1alpha1.WorkloadRebalancerStatus, successNum int64, retryNum int64) {
+	newStatus *appsv1alpha1.WorkloadRebalancerStatus, successNum int64, retryNum int64) {
 	// get previous status and update basing on it
-	newStatus = rebalancer.Status
+	newStatus = rebalancer.Status.DeepCopy()
 	if len(newStatus.ObservedWorkloads) == 0 {
 		newStatus = c.buildWorkloadRebalancerStatus(rebalancer)
 	}
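The hunk above switches the helpers to `*appsv1alpha1.WorkloadRebalancerStatus` and seeds `newStatus` from `rebalancer.Status.DeepCopy()` rather than a plain assignment. The generated `DeepCopy` allocates a fresh backing array for `ObservedWorkloads`, so later element writes can no longer reach `rebalancer.Status`. Roughly, a hand-written analogue of what deepcopy-gen emits (simplified hypothetical types, not the generated code):

```go
package main

import "fmt"

// Simplified stand-ins for the API types (hypothetical).
type ObservedWorkload struct{ Result string }
type WorkloadRebalancerStatus struct{ ObservedWorkloads []ObservedWorkload }

// A hand-written analogue of the deepcopy-gen generated method: the copy
// owns its own backing array, so callers may mutate it freely.
func (in *WorkloadRebalancerStatus) DeepCopy() *WorkloadRebalancerStatus {
	if in == nil {
		return nil
	}
	out := new(WorkloadRebalancerStatus)
	if in.ObservedWorkloads != nil {
		out.ObservedWorkloads = make([]ObservedWorkload, len(in.ObservedWorkloads))
		copy(out.ObservedWorkloads, in.ObservedWorkloads)
	}
	return out
}

func main() {
	source := WorkloadRebalancerStatus{ObservedWorkloads: []ObservedWorkload{{}}}
	newStatus := source.DeepCopy()
	newStatus.ObservedWorkloads[0].Result = "Successful"
	fmt.Println(source.ObservedWorkloads[0].Result) // prints an empty line: source untouched
}
```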
@@ -127,8 +127,8 @@ func (c *RebalancerController) doWorkloadRebalance(ctx context.Context, rebalanc
 		if resource.Workload.Namespace != "" {
 			binding := &workv1alpha2.ResourceBinding{}
 			if err := c.Client.Get(ctx, client.ObjectKey{Namespace: resource.Workload.Namespace, Name: bindingName}, binding); err != nil {
-				klog.Errorf("get binding failed: %+v", err)
-				c.recordWorkloadRebalanceFailed(&newStatus.ObservedWorkloads[i], &retryNum, err)
+				klog.Errorf("get binding for resource %+v failed: %+v", resource.Workload, err)
+				c.recordAndCountRebalancerFailed(&newStatus.ObservedWorkloads[i], &retryNum, err)
 				continue
 			}
 			// update spec.rescheduleTriggeredAt of referenced fetchTargetRefBindings to trigger a rescheduling
@@ -136,17 +136,17 @@ func (c *RebalancerController) doWorkloadRebalance(ctx context.Context, rebalanc
 				binding.Spec.RescheduleTriggeredAt = &rebalancer.CreationTimestamp

 				if err := c.Client.Update(ctx, binding); err != nil {
-					klog.Errorf("update binding failed: %+v", err)
-					c.recordWorkloadRebalanceFailed(&newStatus.ObservedWorkloads[i], &retryNum, err)
+					klog.Errorf("update binding for resource %+v failed: %+v", resource.Workload, err)
+					c.recordAndCountRebalancerFailed(&newStatus.ObservedWorkloads[i], &retryNum, err)
 					continue
 				}
 			}
-			c.recordWorkloadRebalanceSuccess(&newStatus.ObservedWorkloads[i], &successNum)
+			c.recordAndCountRebalancerSuccess(&newStatus.ObservedWorkloads[i], &successNum)
 		} else {
 			clusterbinding := &workv1alpha2.ClusterResourceBinding{}
 			if err := c.Client.Get(ctx, client.ObjectKey{Name: bindingName}, clusterbinding); err != nil {
-				klog.Errorf("get cluster binding failed: %+v", err)
-				c.recordWorkloadRebalanceFailed(&newStatus.ObservedWorkloads[i], &retryNum, err)
+				klog.Errorf("get cluster binding for resource %+v failed: %+v", resource.Workload, err)
+				c.recordAndCountRebalancerFailed(&newStatus.ObservedWorkloads[i], &retryNum, err)
 				continue
 			}
 			// update spec.rescheduleTriggeredAt of referenced clusterbinding to trigger a rescheduling
@@ -154,12 +154,12 @@ func (c *RebalancerController) doWorkloadRebalance(ctx context.Context, rebalanc
 				clusterbinding.Spec.RescheduleTriggeredAt = &rebalancer.CreationTimestamp

 				if err := c.Client.Update(ctx, clusterbinding); err != nil {
-					klog.Errorf("update cluster binding failed: %+v", err)
-					c.recordWorkloadRebalanceFailed(&newStatus.ObservedWorkloads[i], &retryNum, err)
+					klog.Errorf("update cluster binding for resource %+v failed: %+v", resource.Workload, err)
+					c.recordAndCountRebalancerFailed(&newStatus.ObservedWorkloads[i], &retryNum, err)
 					continue
 				}
 			}
-			c.recordWorkloadRebalanceSuccess(&newStatus.ObservedWorkloads[i], &successNum)
+			c.recordAndCountRebalancerSuccess(&newStatus.ObservedWorkloads[i], &successNum)
 		}
 	}
 	return
@@ -169,29 +169,30 @@ func (c *RebalancerController) needTriggerReschedule(creationTimestamp metav1.Ti
 	return rescheduleTriggeredAt == nil || creationTimestamp.After(rescheduleTriggeredAt.Time)
 }

-func (c *RebalancerController) recordWorkloadRebalanceSuccess(resource *appsv1alpha1.ObservedWorkload, successNum *int64) {
+func (c *RebalancerController) recordAndCountRebalancerSuccess(resource *appsv1alpha1.ObservedWorkload, successNum *int64) {
 	resource.Result = appsv1alpha1.RebalanceSuccessful
 	*successNum++
 }

-func (c *RebalancerController) recordWorkloadRebalanceFailed(resource *appsv1alpha1.ObservedWorkload, retryNum *int64, err error) {
-	resource.Result = appsv1alpha1.RebalanceFailed
+func (c *RebalancerController) recordAndCountRebalancerFailed(resource *appsv1alpha1.ObservedWorkload, retryNum *int64, err error) {
 	reason := apierrors.ReasonForError(err)
 	if reason == metav1.StatusReasonNotFound {
+		resource.Result = appsv1alpha1.RebalanceFailed
 		resource.Reason = appsv1alpha1.RebalanceObjectNotFound
 	} else {
 		*retryNum++
 	}
 }

-func (c *RebalancerController) updateWorkloadRebalancerStatus(rebalancer *appsv1alpha1.WorkloadRebalancer, newStatus appsv1alpha1.WorkloadRebalancerStatus) error {
+func (c *RebalancerController) updateWorkloadRebalancerStatus(ctx context.Context, rebalancer *appsv1alpha1.WorkloadRebalancer,
+	newStatus *appsv1alpha1.WorkloadRebalancerStatus) error {
 	rebalancerPatch := client.MergeFrom(rebalancer)
-	rebalancerCopy := rebalancer.DeepCopy()
-	rebalancerCopy.Status = newStatus
+	modifiedRebalancer := rebalancer.DeepCopy()
+	modifiedRebalancer.Status = *newStatus

 	return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
 		klog.V(4).Infof("Start to patch WorkloadRebalancer(%s) status", rebalancer.Name)
-		if err := c.Client.Status().Patch(context.TODO(), rebalancerCopy, rebalancerPatch); err != nil {
+		if err = c.Client.Status().Patch(ctx, modifiedRebalancer, rebalancerPatch); err != nil {
 			klog.Errorf("Failed to patch WorkloadRebalancer (%s) status, err: %+v", rebalancer.Name, err)
 			return err
 		}
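With the patch base left untouched, the `client.MergeFrom` / `Status().Patch` pair now computes a real diff. A runnable sketch of both flows using controller-runtime's patch helper, with a `corev1.Pod` standing in for the WorkloadRebalancer (the point is only how the merge patch is computed; it assumes `k8s.io/api` and `sigs.k8s.io/controller-runtime` are on the module path):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func main() {
	base := &corev1.Pod{}
	base.Spec.Containers = []corev1.Container{{Name: "app", Image: "nginx:1"}}

	// Fixed flow: deep-copy first, mutate only the copy. MergeFrom diffs the
	// pristine base against the copy, so the patch carries the change.
	modified := base.DeepCopy()
	modified.Spec.Containers[0].Image = "nginx:2"
	data, _ := client.MergeFrom(base).Data(modified)
	fmt.Println(string(data) == "{}") // false: the update is in the patch

	// Buggy flow (what this commit fixes): a plain struct copy shares the
	// Containers backing array, so mutating it rewrites the base as well.
	newSpec := base.Spec
	newSpec.Containers[0].Image = "nginx:3" // also changes base.Spec!
	buggy := base.DeepCopy()                // copies the already-mutated base
	buggy.Spec = newSpec
	data, _ = client.MergeFrom(base).Data(buggy)
	fmt.Println(string(data) == "{}") // true: base kept pace, empty patch
}
```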
|