pub: support getting total replicas from annotations (#1135)

Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>

Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>
This commit is contained in:
berg 2022-12-13 11:19:06 +08:00 committed by GitHub
parent cdb3b02b2e
commit a0659ec836
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 188 additions and 47 deletions

View File

@ -35,6 +35,9 @@ const (
PubUpdateOperation PubOperation = "UPDATE"
PubDeleteOperation PubOperation = "DELETE"
PubEvictOperation PubOperation = "EVICT"
// PubProtectTotalReplicas is the annotation that indicates the total replicas protected by the pub,
// used rather than workload.spec.replicas. It must be used together with pub.spec.selector.
PubProtectTotalReplicas = "pub.kruise.io/protect-total-replicas"
// PodPubNoProtectionAnnotation marks that the pod will not be pub-protected, to support the scenario of forced pod deletion.
PodPubNoProtectionAnnotation = "pub.kruise.io/no-protect"
)

View File

@ -19,6 +19,7 @@ package pubcontrol
import (
"context"
"reflect"
"strconv"
"strings"
appspub "github.com/openkruise/kruise/apis/apps/pub"
@ -92,7 +93,6 @@ func (c *commonControl) GetPodsForPub(pub *policyv1alpha1.PodUnavailableBudget)
if err = c.List(context.TODO(), podList, listOptions, utilclient.DisableDeepCopy); err != nil {
return nil, 0, err
}
matchedPods := make([]*corev1.Pod, 0, len(podList.Items))
for i := range podList.Items {
pod := &podList.Items[i]
@ -104,6 +104,10 @@ func (c *commonControl) GetPodsForPub(pub *policyv1alpha1.PodUnavailableBudget)
if err != nil {
return nil, 0, err
}
if expectedCount == 0 && pub.Annotations[policyv1alpha1.PubProtectTotalReplicas] != "" {
expectedCount, _ := strconv.ParseInt(pub.Annotations[policyv1alpha1.PubProtectTotalReplicas], 10, 32)
return matchedPods, int32(expectedCount), nil
}
return matchedPods, expectedCount, nil
}

View File

@ -78,6 +78,9 @@ func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, p
// if there is no matching PodUnavailableBudget, just return true
} else if pub == nil {
return true, "", nil
// if desired available == 0, then allow all requests
} else if pub.Status.DesiredAvailable == 0 {
return true, "", nil
} else if !isNeedPubProtection(pub, operation) {
klog.V(3).Infof("pod(%s/%s) operation(%s) is not in pub(%s) protection", pod.Namespace, pod.Name, pub.Name)
return true, "", nil

View File

@ -68,6 +68,7 @@ var (
UnavailablePods: map[string]metav1.Time{},
DisruptedPods: map[string]metav1.Time{},
UnavailableAllowed: 0,
DesiredAvailable: 1,
},
}

View File

@ -145,6 +145,9 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}
return false
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return true
},
}); err != nil {
return err
}
@ -159,6 +162,9 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}
return false
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return true
},
}); err != nil {
return err
}
@ -173,6 +179,9 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}
return false
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return true
},
}); err != nil {
return err
}
@ -187,6 +196,9 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}
return false
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return true
},
}); err != nil {
return err
}
@ -301,7 +313,6 @@ func (r *ReconcilePodUnavailableBudget) syncPodUnavailableBudget(pub *policyv1al
} else {
pubClone = pub.DeepCopy()
}
informerCached := &policyv1alpha1.PodUnavailableBudget{}
if err := r.Get(context.TODO(), types.NamespacedName{Namespace: pub.Namespace,
Name: pub.Name}, informerCached); err == nil {

View File

@ -56,8 +56,9 @@ var (
Kind: "PodUnavailableBudget",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pub-test",
Namespace: "default",
Name: "pub-test",
Annotations: map[string]string{},
},
Spec: policyv1alpha1.PodUnavailableBudgetSpec{
Selector: &metav1.LabelSelector{
@ -262,7 +263,7 @@ func TestPubReconcile(t *testing.T) {
},
},
{
name: "select matched deployment, selector and maxUnavailable 30%",
name: "select matched deployment(Deletion), selector and maxUnavailable 30%",
getPods: func() []*corev1.Pod {
var matchedPods []*corev1.Pod
for i := 0; int32(i) < *deploymentDemo.Spec.Replicas; i++ {
@ -273,7 +274,10 @@ func TestPubReconcile(t *testing.T) {
return matchedPods
},
getDeployment: func() *apps.Deployment {
return deploymentDemo.DeepCopy()
obj := deploymentDemo.DeepCopy()
t := metav1.Now()
obj.DeletionTimestamp = &t
return obj
},
getReplicaSet: func() *apps.ReplicaSet {
return replicaSetDemo.DeepCopy()
@ -285,10 +289,10 @@ func TestPubReconcile(t *testing.T) {
},
expectPubStatus: func() policyv1alpha1.PodUnavailableBudgetStatus {
return policyv1alpha1.PodUnavailableBudgetStatus{
UnavailableAllowed: 3,
UnavailableAllowed: 0,
CurrentAvailable: *deploymentDemo.Spec.Replicas,
DesiredAvailable: 7,
TotalReplicas: *deploymentDemo.Spec.Replicas,
DesiredAvailable: 0,
TotalReplicas: 0,
}
},
},
@ -353,7 +357,7 @@ func TestPubReconcile(t *testing.T) {
},
expectPubStatus: func() policyv1alpha1.PodUnavailableBudgetStatus {
return policyv1alpha1.PodUnavailableBudgetStatus{
UnavailableAllowed: *deploymentDemo.Spec.Replicas,
UnavailableAllowed: 0,
CurrentAvailable: *deploymentDemo.Spec.Replicas,
DesiredAvailable: 0,
TotalReplicas: *deploymentDemo.Spec.Replicas,
@ -387,7 +391,7 @@ func TestPubReconcile(t *testing.T) {
},
expectPubStatus: func() policyv1alpha1.PodUnavailableBudgetStatus {
return policyv1alpha1.PodUnavailableBudgetStatus{
UnavailableAllowed: *deploymentDemo.Spec.Replicas,
UnavailableAllowed: 0,
CurrentAvailable: *deploymentDemo.Spec.Replicas,
DesiredAvailable: 0,
TotalReplicas: *deploymentDemo.Spec.Replicas,
@ -754,6 +758,66 @@ func TestPubReconcile(t *testing.T) {
return *status
},
},
{
name: "test select matched deployment, 10 UnavailablePods(5 ready), 10 DisruptionPods(5 delay) and 5 deletion",
getPods: func() []*corev1.Pod {
var matchedPods []*corev1.Pod
for i := 0; i < 100; i++ {
pod := podDemo.DeepCopy()
pod.OwnerReferences = nil
pod.Name = fmt.Sprintf("%s-%d", pod.Name, i)
if i >= 20 && i < 25 {
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
}
matchedPods = append(matchedPods, pod)
}
return matchedPods
},
getDeployment: func() *apps.Deployment {
object := deploymentDemo.DeepCopy()
object.Spec.Replicas = utilpointer.Int32Ptr(100)
return object
},
getReplicaSet: func() *apps.ReplicaSet {
object := replicaSetDemo.DeepCopy()
object.Spec.Replicas = utilpointer.Int32Ptr(100)
return object
},
getPub: func() *policyv1alpha1.PodUnavailableBudget {
pub := pubDemo.DeepCopy()
pub.Annotations[policyv1alpha1.PubProtectTotalReplicas] = "50"
for i := 0; i < 10; i++ {
if i >= 0 && i < 5 {
pub.Status.UnavailablePods[fmt.Sprintf("test-pod-%d", i)] = metav1.Time{Time: time.Now().Add(-10 * time.Second)}
} else {
pub.Status.UnavailablePods[fmt.Sprintf("test-pod-%d", i)] = metav1.Now()
}
}
for i := 10; i < 20; i++ {
if i >= 10 && i < 15 {
pub.Status.DisruptedPods[fmt.Sprintf("test-pod-%d", i)] = metav1.Time{Time: time.Now().Add(-125 * time.Second)}
} else {
pub.Status.DisruptedPods[fmt.Sprintf("test-pod-%d", i)] = metav1.Now()
}
}
return pub
},
expectPubStatus: func() policyv1alpha1.PodUnavailableBudgetStatus {
status := pubDemo.Status.DeepCopy()
for i := 5; i < 10; i++ {
status.UnavailablePods[fmt.Sprintf("test-pod-%d", i)] = metav1.Now()
}
for i := 15; i < 20; i++ {
status.DisruptedPods[fmt.Sprintf("test-pod-%d", i)] = metav1.Now()
}
status.TotalReplicas = 50
status.DesiredAvailable = 35
status.CurrentAvailable = 85
status.UnavailableAllowed = 50
return *status
},
},
}
for _, cs := range cases {
@ -774,7 +838,6 @@ func TestPubReconcile(t *testing.T) {
controllerFinder: &controllerfinder.ControllerFinder{Client: fakeClient},
pubControl: pubcontrol.NewPubControl(fakeClient),
}
_, err := reconciler.syncPodUnavailableBudget(pub)
if err != nil {
t.Fatalf("sync PodUnavailableBudget failed: %s", err.Error())
@ -784,7 +847,7 @@ func TestPubReconcile(t *testing.T) {
t.Fatalf("getLatestPub failed: %s", err.Error())
}
if !isPubStatusEqual(cs.expectPubStatus(), newPub.Status) {
t.Fatalf("expect pub status(%v) but get(%v)", cs.expectPubStatus(), newPub.Status)
t.Fatalf("expect pub status(%s) but get(%s)", util.DumpJSON(cs.expectPubStatus()), util.DumpJSON(newPub.Status))
}
_ = util.GlobalCache.Delete(pub)
})

View File

@ -202,6 +202,7 @@ func (e *SetEnqueueRequestForPUB) Update(evt event.UpdateEvent, q workqueue.Rate
// Delete implements EventHandler.
// It passes the object carried by the delete event, together with the work
// queue, to addSetRequest.
func (e *SetEnqueueRequestForPUB) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
	deleted := evt.Object
	e.addSetRequest(deleted, q)
}
// Generic implements EventHandler
@ -240,23 +241,21 @@ func (e *SetEnqueueRequestForPUB) addSetRequest(object client.Object, q workqueu
targetRef.Name, namespace = obj.Name, obj.Namespace
temLabels = obj.Spec.Template.Labels
}
default:
return
}
// fetch matched pub
pubList := &policyv1alpha1.PodUnavailableBudgetList{}
if err := e.mgr.GetClient().List(context.TODO(), pubList, &client.ListOptions{Namespace: namespace}); err != nil {
klog.Errorf("SetEnqueueRequestForPUB list pub failed: %s", err.Error())
return
}
var matchedPubs []policyv1alpha1.PodUnavailableBudget
var matched policyv1alpha1.PodUnavailableBudget
for _, pub := range pubList.Items {
// if targetReference isn't nil, it takes priority over the selector
if pub.Spec.TargetReference != nil {
// belongs to the same workload
if pubcontrol.IsReferenceEqual(targetRef, pub.Spec.TargetReference) {
matchedPubs = append(matchedPubs, pub)
matched = pub
break
}
} else {
// This error is irreversible, so continue
@ -268,18 +267,17 @@ func (e *SetEnqueueRequestForPUB) addSetRequest(object client.Object, q workqueu
if labelSelector.Empty() || !labelSelector.Matches(labels.Set(temLabels)) {
continue
}
matchedPubs = append(matchedPubs, pub)
matched = pub
break
}
}
for _, pub := range matchedPubs {
q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{
Name: pub.Name,
Namespace: pub.Namespace,
},
})
klog.V(3).Infof("workload(%s/%s) replicas changed, and reconcile pub(%s/%s)",
namespace, targetRef.Name, pub.Namespace, pub.Name)
}
q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{
Name: matched.Name,
Namespace: matched.Namespace,
},
})
klog.V(3).Infof("workload(%s/%s) changed, and reconcile pub(%s/%s)",
namespace, targetRef.Name, matched.Namespace, matched.Name)
}

View File

@ -122,9 +122,10 @@ func (r *ControllerFinder) GetExpectedScaleForPods(pods []*corev1.Pod) (int32, e
workload, err := r.GetScaleAndSelectorForRef(ref.APIVersion, ref.Kind, pod.Namespace, ref.Name, ref.UID)
if err != nil && !errors.IsNotFound(err) {
return 0, err
}
if workload != nil && workload.Metadata.DeletionTimestamp.IsZero() {
controllerScale[workload.UID] = workload.Scale
} else if workload != nil && workload.Metadata.DeletionTimestamp.IsZero() {
controllerScale[ref.UID] = workload.Scale
} else {
controllerScale[ref.UID] = 0
}
}

View File

@ -44,7 +44,7 @@ func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, acti
if err != nil {
return nil, -1, err
}
if rs == nil {
if rs == nil || !rs.DeletionTimestamp.IsZero() {
return nil, 0, nil
}
workloadReplicas = *rs.Spec.Replicas
@ -54,7 +54,7 @@ func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, acti
obj, err := r.GetScaleAndSelectorForRef(apiVersion, kind, ns, name, "")
if err != nil {
return nil, -1, err
} else if obj == nil {
} else if obj == nil || !obj.Metadata.DeletionTimestamp.IsZero() {
return nil, 0, nil
}
workloadReplicas = obj.Scale
@ -64,7 +64,7 @@ func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, acti
obj, err := r.GetScaleAndSelectorForRef(apiVersion, kind, ns, name, "")
if err != nil {
return nil, -1, err
} else if obj == nil {
} else if obj == nil || !obj.Metadata.DeletionTimestamp.IsZero() {
return nil, 0, nil
}
workloadReplicas = obj.Scale

View File

@ -125,15 +125,5 @@ func (p *PodCreateHandler) podUnavailableBudgetValidatingPod(ctx context.Context
if checkPod.Annotations[pubcontrol.PodRelatedPubAnnotation] == "" {
return true, "", nil
}
// Get the workload corresponding to the pod, if it has been deleted then it is not protected
if ref := metav1.GetControllerOf(checkPod); ref != nil {
workload, err := p.finders.GetScaleAndSelectorForRef(ref.APIVersion, ref.Kind, checkPod.Namespace, ref.Name, ref.UID)
if err != nil {
return false, "", err
} else if workload == nil || !workload.Metadata.DeletionTimestamp.IsZero() {
return true, "", nil
}
}
return pubcontrol.PodUnavailableBudgetValidatePod(p.Client, p.pubControl, checkPod, operation, dryRun)
}

View File

@ -624,6 +624,43 @@ func TestValidateEvictPodForPub(t *testing.T) {
return pubStatus
},
},
{
name: "evict pod, allow",
eviction: func() *policy.Eviction {
return &policy.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod-0",
Namespace: "default",
},
DeleteOptions: &metav1.DeleteOptions{},
}
},
newPod: func() *corev1.Pod {
podIn := podDemo.DeepCopy()
return podIn
},
pub: func() *policyv1alpha1.PodUnavailableBudget {
pub := pubDemo.DeepCopy()
pub.Status = policyv1alpha1.PodUnavailableBudgetStatus{
TotalReplicas: 0,
DesiredAvailable: 0,
CurrentAvailable: 10,
UnavailableAllowed: 0,
}
return pub
},
subresource: "eviction",
expectAllow: true,
expectPubStatus: func() *policyv1alpha1.PodUnavailableBudgetStatus {
pubStatus := &policyv1alpha1.PodUnavailableBudgetStatus{
TotalReplicas: 0,
DesiredAvailable: 0,
CurrentAvailable: 10,
UnavailableAllowed: 0,
}
return pubStatus
},
},
}
for _, cs := range cases {

View File

@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"reflect"
"strconv"
"strings"
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
@ -88,11 +89,18 @@ func (h *PodUnavailableBudgetCreateUpdateHandler) validatingPodUnavailableBudget
if operationsValue, ok := obj.Annotations[policyv1alpha1.PubProtectOperationAnnotation]; ok {
operations := strings.Split(operationsValue, ",")
for _, operation := range operations {
if operation != string(admissionv1.Update) && operation != string(admissionv1.Delete) {
if operation != string(policyv1alpha1.PubUpdateOperation) && operation != string(policyv1alpha1.PubDeleteOperation) &&
operation != string(policyv1alpha1.PubEvictOperation) {
allErrs = append(allErrs, field.InternalError(field.NewPath("metadata"), fmt.Errorf("annotation[%s] is invalid", policyv1alpha1.PubProtectOperationAnnotation)))
}
}
}
if replicasValue, ok := obj.Annotations[policyv1alpha1.PubProtectTotalReplicas]; ok {
if _, err := strconv.ParseInt(replicasValue, 10, 32); err != nil {
allErrs = append(allErrs, field.InternalError(field.NewPath("metadata"), fmt.Errorf("annotation[%s] is invalid", policyv1alpha1.PubProtectTotalReplicas)))
}
}
//validate Pub.Spec
allErrs = append(allErrs, validatePodUnavailableBudgetSpec(obj, field.NewPath("spec"))...)
// when operation is update, validating whether old and new pub conflict

View File

@ -153,7 +153,29 @@ func TestValidatingPub(t *testing.T) {
pub := pubDemo.DeepCopy()
pub.Spec.Selector = nil
pub.Spec.MinAvailable = nil
pub.Annotations[policyv1alpha1.PubProtectOperationAnnotation] = "DELETE"
pub.Annotations[policyv1alpha1.PubProtectOperationAnnotation] = string(policyv1alpha1.PubEvictOperation + "," + policyv1alpha1.PubDeleteOperation)
return pub
},
expectErrList: 0,
},
{
name: "invalid pub feature-gate annotation",
pub: func() *policyv1alpha1.PodUnavailableBudget {
pub := pubDemo.DeepCopy()
pub.Spec.Selector = nil
pub.Spec.MinAvailable = nil
pub.Annotations[policyv1alpha1.PubProtectTotalReplicas] = "%%"
return pub
},
expectErrList: 1,
},
{
name: "valid pub feature-gate annotation",
pub: func() *policyv1alpha1.PodUnavailableBudget {
pub := pubDemo.DeepCopy()
pub.Spec.Selector = nil
pub.Spec.MinAvailable = nil
pub.Annotations[policyv1alpha1.PubProtectTotalReplicas] = "1000"
return pub
},
expectErrList: 0,