diff --git a/pkg/controller/workloadspread/workloadspread_controller.go b/pkg/controller/workloadspread/workloadspread_controller.go index b544b90a2..044aea360 100644 --- a/pkg/controller/workloadspread/workloadspread_controller.go +++ b/pkg/controller/workloadspread/workloadspread_controller.go @@ -21,6 +21,8 @@ import ( "encoding/json" "flag" "fmt" + "math" + "strings" "time" appsv1 "k8s.io/api/apps/v1" @@ -77,6 +79,9 @@ const ( // FakeSubsetName is a fake subset name for such pods that do not match any subsets FakeSubsetName = "kruise.io/workloadspread-fake-subset-name" + + // IgnorePatchExistingPodsAnnotation ignore ws.Spec.Subsets[x].Patch for existing pods + IgnorePatchExistingPodsAnnotation = "workloadspread.kruise.io/ignore-patch-existing-pods-metadata" ) var ( @@ -301,7 +306,7 @@ func (r *ReconcileWorkloadSpread) syncWorkloadSpread(ws *appsv1alpha1.WorkloadSp } // group Pods by subset - podMap, err := r.groupPod(ws, pods) + podMap, err := r.groupPod(ws, pods, workloadReplicas) if err != nil { return err } @@ -344,15 +349,30 @@ func getInjectWorkloadSpreadFromPod(pod *corev1.Pod) *wsutil.InjectWorkloadSprea } // groupPod returns a map, the key is the name of subset and the value represents the Pods of the corresponding subset. -func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods []*corev1.Pod) (map[string][]*corev1.Pod, error) { +func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods []*corev1.Pod, replicas int32) (map[string][]*corev1.Pod, error) { podMap := make(map[string][]*corev1.Pod, len(ws.Spec.Subsets)+1) podMap[FakeSubsetName] = []*corev1.Pod{} + subsetMissingReplicas := make(map[string]int) for _, subset := range ws.Spec.Subsets { podMap[subset.Name] = []*corev1.Pod{} + subsetMissingReplicas[subset.Name], _ = intstr.GetScaledValueFromIntOrPercent( + intstr.ValueOrDefault(subset.MaxReplicas, intstr.FromInt(math.MaxInt32)), int(replicas), true) + } + + // count managed pods for each subset + for i := range pods { + injectWS := getInjectWorkloadSpreadFromPod(pods[i]) + if injectWS == nil || injectWS.Name != ws.Name || injectWS.Subset == "" { + continue + } + if _, exist := podMap[injectWS.Subset]; !exist { + continue + } + subsetMissingReplicas[injectWS.Subset]-- } for i := range pods { - subsetName, err := r.getSuitableSubsetNameForPod(ws, pods[i]) + subsetName, err := r.getSuitableSubsetNameForPod(ws, pods[i], subsetMissingReplicas) if err != nil { return nil, err } @@ -369,11 +389,11 @@ func (r *ReconcileWorkloadSpread) groupPod(ws *appsv1alpha1.WorkloadSpread, pods } // getSuitableSubsetNameForPod will return (FakeSubsetName, nil) if not found suitable subset for pod -func (r *ReconcileWorkloadSpread) getSuitableSubsetNameForPod(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod) (string, error) { +func (r *ReconcileWorkloadSpread) getSuitableSubsetNameForPod(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, subsetMissingReplicas map[string]int) (string, error) { injectWS := getInjectWorkloadSpreadFromPod(pod) - if injectWS == nil || injectWS.Name != ws.Name { + if injectWS == nil || injectWS.Name != ws.Name || injectWS.Subset == "" { // process the pods that were created before workloadSpread - matchedSubset, err := r.getSuitableSubsetForOldPod(ws, pod) + matchedSubset, err := r.getAndUpdateSuitableSubsetName(ws, pod, subsetMissingReplicas) if err != nil { return "", err } else if matchedSubset == nil { @@ -386,7 +406,7 @@ func (r *ReconcileWorkloadSpread) getSuitableSubsetNameForPod(ws *appsv1alpha1.W // getSuitableSubsetForOldPod returns a suitable subset for the pod which was created before workloadSpread. // getSuitableSubsetForOldPod will return (nil, nil) if there is no suitable subset for the pod. -func (r *ReconcileWorkloadSpread) getSuitableSubsetForOldPod(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod) (*appsv1alpha1.WorkloadSpreadSubset, error) { +func (r *ReconcileWorkloadSpread) getAndUpdateSuitableSubsetName(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod, subsetMissingReplicas map[string]int) (*appsv1alpha1.WorkloadSpreadSubset, error) { if len(pod.Spec.NodeName) == 0 { return nil, nil } @@ -411,10 +431,16 @@ func (r *ReconcileWorkloadSpread) getSuitableSubsetForOldPod(ws *appsv1alpha1.Wo klog.Errorf("unexpected error occurred when matching pod (%s/%s) with subset, please check requiredSelectorTerm field of subset (%s) in WorkloadSpread (%s/%s), err: %s", pod.Namespace, pod.Name, subset.Name, ws.Namespace, ws.Name, err.Error()) } + quotaScore := int64(0) + // we prefer the subset that still has room for more replicas + if subsetMissingReplicas[subset.Name] > 0 { + quotaScore = int64(1) + } + finalScore := preferredScore*10 + quotaScore // select the most favorite subsets for the pod by subset.PreferredNodeSelectorTerms - if matched && preferredScore > maxPreferredScore { + if matched && finalScore > maxPreferredScore { favoriteSubset = subset - maxPreferredScore = preferredScore + maxPreferredScore = finalScore } } @@ -422,6 +448,7 @@ func (r *ReconcileWorkloadSpread) getSuitableSubsetForOldPod(ws *appsv1alpha1.Wo if err := r.patchFavoriteSubsetMetadataToPod(pod, ws, favoriteSubset); err != nil { return nil, err } + subsetMissingReplicas[favoriteSubset.Name]-- return favoriteSubset, nil } @@ -433,7 +460,7 @@ func (r *ReconcileWorkloadSpread) getSuitableSubsetForOldPod(ws *appsv1alpha1.Wo func (r *ReconcileWorkloadSpread) patchFavoriteSubsetMetadataToPod(pod *corev1.Pod, ws *appsv1alpha1.WorkloadSpread, favoriteSubset *appsv1alpha1.WorkloadSpreadSubset) error { patchMetadata := make(map[string]interface{}) // decode favoriteSubset.patch.raw and add their labels and annotations to the patch - if favoriteSubset.Patch.Raw != nil { + if favoriteSubset.Patch.Raw != nil && !strings.EqualFold(ws.Annotations[IgnorePatchExistingPodsAnnotation], "true") { patchField := map[string]interface{}{} if err := json.Unmarshal(favoriteSubset.Patch.Raw, &patchField); err == nil { if metadata, ok := patchField["metadata"].(map[string]interface{}); ok && metadata != nil { diff --git a/pkg/controller/workloadspread/workloadspread_controller_test.go b/pkg/controller/workloadspread/workloadspread_controller_test.go index 71bec9a17..b9254ba04 100644 --- a/pkg/controller/workloadspread/workloadspread_controller_test.go +++ b/pkg/controller/workloadspread/workloadspread_controller_test.go @@ -1483,7 +1483,7 @@ func TestUpdateSubsetSequence(t *testing.T) { } r := ReconcileWorkloadSpread{} - subsetsPods, err := r.groupPod(workloadSpread, pods) + subsetsPods, err := r.groupPod(workloadSpread, pods, 5) if err != nil { t.Fatalf("error group pods") }