default interpreter aggregate status: set sub-test name and parallelize

Signed-off-by: Amir Alavi <amiralavi7@gmail.com>
This commit is contained in:
Amir Alavi 2022-12-20 14:42:43 -05:00 committed by Amir Alavi
parent 7b4c541bb8
commit 05a98cc978
No known key found for this signature in database
GPG Key ID: D6CE9B89BD598BEE
11 changed files with 442 additions and 19 deletions

View File

@ -8,6 +8,8 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
@ -31,6 +33,7 @@ func getAllDefaultAggregateStatusInterpreter() map[schema.GroupVersionKind]aggre
s[corev1.SchemeGroupVersion.WithKind(util.PodKind)] = aggregatePodStatus
s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeKind)] = aggregatePersistentVolumeStatus
s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeClaimKind)] = aggregatePersistentVolumeClaimStatus
s[policyv1.SchemeGroupVersion.WithKind(util.PodDisruptionBudgetKind)] = aggregatePodDisruptionBudgetStatus
return s
}
@ -455,3 +458,46 @@ func aggregatePersistentVolumeClaimStatus(object *unstructured.Unstructured, agg
pvc.Status = *newStatus
return helper.ToUnstructured(pvc)
}
func aggregatePodDisruptionBudgetStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
pdb := &policyv1.PodDisruptionBudget{}
err := helper.ConvertToTypedObject(object, pdb)
if err != nil {
return nil, err
}
newStatus := &policyv1.PodDisruptionBudgetStatus{
DisruptedPods: make(map[string]metav1.Time),
}
for _, item := range aggregatedStatusItems {
if item.Status == nil {
continue
}
temp := &policyv1.PodDisruptionBudgetStatus{}
if err = json.Unmarshal(item.Status.Raw, temp); err != nil {
return nil, err
}
klog.V(3).Infof(
"Grab pdb(%s/%s) status from cluster(%s), desired healthy: %d, current healthy: %d, disrupted allowed: %d, expected: %d",
pdb.Namespace, pdb.Name, item.ClusterName,
temp.DesiredHealthy, temp.CurrentHealthy, temp.DisruptionsAllowed, temp.ExpectedPods,
)
newStatus.CurrentHealthy += temp.CurrentHealthy
newStatus.DesiredHealthy += temp.DesiredHealthy
newStatus.ExpectedPods += temp.ExpectedPods
newStatus.DisruptionsAllowed += temp.DisruptionsAllowed
for podName, evictionTime := range temp.DisruptedPods {
newStatus.DisruptedPods[item.ClusterName+"/"+podName] = evictionTime
}
}
if reflect.DeepEqual(pdb.Status, *newStatus) {
klog.V(3).Infof("ignore update pdb(%s/%s) status as up to date", pdb.Namespace, pdb.Name)
return object, nil
}
pdb.Status = *newStatus
return helper.ToUnstructured(pdb)
}

View File

@ -9,6 +9,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -843,3 +844,96 @@ func TestAggregatePVStatus(t *testing.T) {
assert.Equal(t, tt.expectedObj, actualObj, tt.name)
}
}
func TestAggregatedPodDisruptionBudgetStatus(t *testing.T) {
currPdbObj, _ := helper.ToUnstructured(&policyv1.PodDisruptionBudget{
Status: policyv1.PodDisruptionBudgetStatus{
CurrentHealthy: 1,
DesiredHealthy: 1,
DisruptionsAllowed: 1,
ExpectedPods: 1,
},
})
expectedPdbObj, _ := helper.ToUnstructured(&policyv1.PodDisruptionBudget{
Status: policyv1.PodDisruptionBudgetStatus{
CurrentHealthy: 2,
DesiredHealthy: 2,
DisruptionsAllowed: 2,
ExpectedPods: 2,
},
})
healthyStatusRaw, _ := helper.BuildStatusRawExtension(map[string]interface{}{
"currentHealthy": 1,
"desiredHealthy": 1,
"disruptionsAllowed": 1,
"expectedPods": 1,
})
evictionTime := metav1.Now()
unHealthyStatusRaw, _ := helper.BuildStatusRawExtension(map[string]interface{}{
"currentHealthy": 0,
"desiredHealthy": 1,
"disruptionsAllowed": 0,
"expectedPods": 1,
"disruptedPods": map[string]metav1.Time{
"pod-1234": evictionTime,
},
})
expectedUnhealthyPdbObj, _ := helper.ToUnstructured(&policyv1.PodDisruptionBudget{
Status: policyv1.PodDisruptionBudgetStatus{
CurrentHealthy: 0,
DesiredHealthy: 2,
DisruptionsAllowed: 0,
ExpectedPods: 2,
DisruptedPods: map[string]metav1.Time{
"member1/pod-1234": evictionTime,
"member2/pod-1234": evictionTime,
},
},
})
aggregateStatusItems := []workv1alpha2.AggregatedStatusItem{
{ClusterName: "member1", Status: healthyStatusRaw, Applied: true},
{ClusterName: "member2", Status: healthyStatusRaw, Applied: true},
}
unhealthyAggregateStatusItems := []workv1alpha2.AggregatedStatusItem{
{ClusterName: "member1", Status: unHealthyStatusRaw, Applied: true},
{ClusterName: "member2", Status: unHealthyStatusRaw, Applied: true},
}
for _, tt := range []struct {
name string
curObj *unstructured.Unstructured
aggregatedStatusItems []workv1alpha2.AggregatedStatusItem
expectedObj *unstructured.Unstructured
}{
{
name: "update pdb status",
curObj: currPdbObj,
aggregatedStatusItems: aggregateStatusItems,
expectedObj: expectedPdbObj,
},
{
name: "update pdb status with disrupted pods",
curObj: currPdbObj,
aggregatedStatusItems: unhealthyAggregateStatusItems,
expectedObj: expectedUnhealthyPdbObj,
},
{
name: "ignore update pdb status as up to date",
curObj: expectedPdbObj,
aggregatedStatusItems: aggregateStatusItems,
expectedObj: expectedPdbObj,
},
} {
t.Run(tt.name, func(t *testing.T) {
actualObj, _ := aggregatePodDisruptionBudgetStatus(tt.curObj, tt.aggregatedStatusItems)
assert.Equal(t, tt.expectedObj, actualObj)
})
}
}

View File

@ -4,6 +4,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -22,6 +23,7 @@ func getAllDefaultHealthInterpreter() map[schema.GroupVersionKind]healthInterpre
s[corev1.SchemeGroupVersion.WithKind(util.ServiceKind)] = interpretServiceHealth
s[networkingv1.SchemeGroupVersion.WithKind(util.IngressKind)] = interpretIngressHealth
s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeClaimKind)] = interpretPersistentVolumeClaimHealth
s[policyv1.SchemeGroupVersion.WithKind(util.PodDisruptionBudgetKind)] = interpretPodDisruptionBudgetHealth
return s
}
@ -144,3 +146,13 @@ func interpretPersistentVolumeClaimHealth(object *unstructured.Unstructured) (bo
return pvc.Status.Phase == corev1.ClaimBound, nil
}
func interpretPodDisruptionBudgetHealth(object *unstructured.Unstructured) (bool, error) {
pdb := &policyv1.PodDisruptionBudget{}
err := helper.ConvertToTypedObject(object, pdb)
if err != nil {
return false, err
}
return pdb.Status.CurrentHealthy >= pdb.Status.DesiredHealthy, nil
}

View File

@ -568,3 +568,56 @@ func Test_interpretPersistentVolumeClaimHealth(t *testing.T) {
})
}
}
func Test_interpretPodDisruptionBudgetHealth(t *testing.T) {
tests := []struct {
name string
object *unstructured.Unstructured
want bool
}{
{
name: "podDisruptionBudget healthy",
object: &unstructured.Unstructured{
Object: map[string]interface{}{
"status": map[string]interface{}{
"desiredHealthy": 2,
"currentHealthy": 3,
},
},
},
want: true,
},
{
name: "podDisruptionBudget healthy when desired healthy equals to current healthy pods",
object: &unstructured.Unstructured{
Object: map[string]interface{}{
"status": map[string]interface{}{
"desiredHealthy": 2,
"currentHealthy": 2,
},
},
},
want: true,
},
{
name: "podDisruptionBudget does not allow further disruption when number of healthy pods is less than desired",
object: &unstructured.Unstructured{
Object: map[string]interface{}{
"status": map[string]interface{}{
"desiredHealthy": 2,
"currentHealthy": 1,
},
},
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := interpretPodDisruptionBudgetHealth(tt.object)
if got != tt.want {
t.Errorf("interpretPodDisruptionBudgetHealth() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -7,6 +7,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -26,6 +27,7 @@ func getAllDefaultReflectStatusInterpreter() map[schema.GroupVersionKind]reflect
s[batchv1.SchemeGroupVersion.WithKind(util.JobKind)] = reflectJobStatus
s[appsv1.SchemeGroupVersion.WithKind(util.DaemonSetKind)] = reflectDaemonSetStatus
s[appsv1.SchemeGroupVersion.WithKind(util.StatefulSetKind)] = reflectStatefulSetStatus
s[policyv1.SchemeGroupVersion.WithKind(util.PodDisruptionBudgetKind)] = reflectPodDisruptionBudgetStatus
return s
}
@ -189,6 +191,35 @@ func reflectStatefulSetStatus(object *unstructured.Unstructured) (*runtime.RawEx
return helper.BuildStatusRawExtension(grabStatus)
}
func reflectPodDisruptionBudgetStatus(object *unstructured.Unstructured) (*runtime.RawExtension, error) {
statusMap, exist, err := unstructured.NestedMap(object.Object, "status")
if err != nil {
klog.Errorf("Failed to get status field from %s(%s/%s), error: %v",
object.GetKind(), object.GetNamespace(), object.GetName(), err)
return nil, err
}
if !exist {
klog.Errorf("Failed to grab status from %s(%s/%s) which should have status field.",
object.GetKind(), object.GetNamespace(), object.GetName())
return nil, nil
}
pdbStatus := &policyv1.PodDisruptionBudgetStatus{}
err = helper.ConvertToTypedObject(statusMap, pdbStatus)
if err != nil {
return nil, fmt.Errorf("failed to convert PodDisruptionBudget from map[string]interface{}: %v", err)
}
grabStatus := policyv1.PodDisruptionBudgetStatus{
DisruptionsAllowed: pdbStatus.DisruptionsAllowed,
ExpectedPods: pdbStatus.ExpectedPods,
DesiredHealthy: pdbStatus.DesiredHealthy,
CurrentHealthy: pdbStatus.CurrentHealthy,
DisruptedPods: pdbStatus.DisruptedPods,
}
return helper.BuildStatusRawExtension(grabStatus)
}
func reflectWholeStatus(object *unstructured.Unstructured) (*runtime.RawExtension, error) {
statusMap, exist, err := unstructured.NestedMap(object.Object, "status")
if err != nil {

View File

@ -4,6 +4,7 @@ import (
"reflect"
"testing"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
@ -72,3 +73,66 @@ func Test_getEntireStatus(t *testing.T) {
})
}
}
func Test_reflectPodDisruptionBudgetStatus(t *testing.T) {
currStatus := policyv1.PodDisruptionBudgetStatus{
CurrentHealthy: 1,
DesiredHealthy: 1,
DisruptionsAllowed: 1,
ExpectedPods: 1,
}
currStatusUnstructured, _ := helper.ToUnstructured(&policyv1.PodDisruptionBudget{Status: currStatus})
wantRawExtension, _ := helper.BuildStatusRawExtension(&currStatus)
type args struct {
object *unstructured.Unstructured
}
tests := []struct {
name string
args args
want *runtime.RawExtension
wantErr bool
}{
{
"object doesn't have status",
args{
&unstructured.Unstructured{
Object: map[string]interface{}{},
},
},
nil,
false,
},
{
"object have wrong format status",
args{
&unstructured.Unstructured{
Object: map[string]interface{}{
"status": "a string",
},
},
},
nil,
true,
},
{
"object have correct format status",
args{
currStatusUnstructured,
},
wantRawExtension,
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := reflectPodDisruptionBudgetStatus(tt.args.object)
if (err != nil) != tt.wantErr {
t.Errorf("reflectPodDisruptionBudgetStatus() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("reflectPodDisruptionBudgetStatus() got = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -98,8 +98,10 @@ const (
PersistentVolumeClaimKind = "PersistentVolumeClaim"
// PersistentVolumeKind indicates the target resource is a persistentvolume
PersistentVolumeKind = "PersistentVolume"
// HorizontalPodAutoscalerKind indicated the target resource is a horizontalpodautoscaler
// HorizontalPodAutoscalerKind indicates the target resource is a horizontalpodautoscaler
HorizontalPodAutoscalerKind = "HorizontalPodAutoscaler"
// PodDisruptionBudgetKind indicates the target resource is a poddisruptionbudget
PodDisruptionBudgetKind = "PodDisruptionBudget"
// ServiceExportKind indicates the target resource is a serviceexport crd
ServiceExportKind = "ServiceExport"

View File

@ -0,0 +1,28 @@
package framework
import (
"context"
"fmt"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// CreatePodDisruptionBudget creates PodDisruptionBudget.
func CreatePodDisruptionBudget(client kubernetes.Interface, pdb *policyv1.PodDisruptionBudget) {
ginkgo.By(fmt.Sprintf("Creating PodDisruptionBudget(%s/%s)", pdb.Namespace, pdb.Name), func() {
_, err := client.PolicyV1().PodDisruptionBudgets(pdb.Namespace).Create(context.TODO(), pdb, metav1.CreateOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
}
// RemovePodDisruptionBudget deletes PodDisruptionBudget.
func RemovePodDisruptionBudget(client kubernetes.Interface, namespace, name string) {
ginkgo.By(fmt.Sprintf("Removing PodDisruptionBudget(%s/%s)", namespace, name), func() {
err := client.PolicyV1().PodDisruptionBudgets(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
})
}

View File

@ -3,6 +3,7 @@ package e2e
import (
"context"
"fmt"
"math"
"reflect"
"github.com/onsi/ginkgo/v2"
@ -11,8 +12,10 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
@ -538,6 +541,72 @@ var _ = ginkgo.Describe("[resource-status collection] resource status collection
})
})
})
ginkgo.Context("PodDisruptionBudget collection testing", func() {
var pdbNamespace, pdbName string
var pdb *policyv1.PodDisruptionBudget
var deployment *appsv1.Deployment
ginkgo.BeforeEach(func() {
policyNamespace = testNamespace
policyName = podDisruptionBudgetNamePrefix + rand.String(RandomStrLength)
pdbNamespace = testNamespace
pdbName = policyName
deploymentName := policyName
deployment = testhelper.NewDeployment(pdbNamespace, deploymentName)
pdb = testhelper.NewPodDisruptionBudget(pdbNamespace, pdbName, intstr.FromString("50%"))
policy = testhelper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
{
APIVersion: pdb.APIVersion,
Kind: pdb.Kind,
Name: pdb.Name,
},
{
APIVersion: deployment.APIVersion,
Kind: deployment.Kind,
Name: deployment.Name,
},
}, policyv1alpha1.Placement{
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
ClusterNames: framework.ClusterNames(),
},
})
})
ginkgo.BeforeEach(func() {
framework.CreateDeployment(kubeClient, deployment)
framework.CreatePodDisruptionBudget(kubeClient, pdb)
ginkgo.DeferCleanup(func() {
framework.RemovePodDisruptionBudget(kubeClient, pdbNamespace, pdbName)
})
})
ginkgo.It("pdb status collection testing", func() {
ginkgo.By("check whether the pdb status can be correctly collected", func() {
klog.Infof("Waiting for PodDisruptionBudget(%s/%s) collecting correctly status", pdbNamespace, pdbName)
maxUnavailable := 0.5 // 50%
numOfClusters := int32(len(framework.Clusters()))
wantedExpectedPods := *deployment.Spec.Replicas * numOfClusters
wantedDisruptionAllowed := int32(math.Ceil(float64(*deployment.Spec.Replicas)*maxUnavailable)) * numOfClusters
gomega.Eventually(func(g gomega.Gomega) (bool, error) {
currentPodDisruptionBudget, err := kubeClient.PolicyV1().PodDisruptionBudgets(pdbNamespace).Get(context.TODO(), pdbName, metav1.GetOptions{})
g.Expect(err).ShouldNot(gomega.HaveOccurred())
klog.Infof("PodDisruptionBudget(%s/%s) Disruption Allowed: %d, wanted: %d", pdbNamespace, pdbName, currentPodDisruptionBudget.Status.DisruptionsAllowed, wantedDisruptionAllowed)
klog.Infof("PodDisruptionBudget(%s/%s) Expected Pods: %d, wanted: %d", pdbNamespace, pdbName, currentPodDisruptionBudget.Status.ExpectedPods, wantedExpectedPods)
if currentPodDisruptionBudget.Status.DisruptionsAllowed == wantedDisruptionAllowed &&
currentPodDisruptionBudget.Status.ExpectedPods == wantedExpectedPods {
return true, nil
}
return false, nil
}, pollTimeout, pollInterval).Should(gomega.Equal(true))
})
})
})
})
var _ = framework.SerialDescribe("workload status synchronization testing", func() {

View File

@ -55,6 +55,7 @@ const (
clusterRoleNamePrefix = "clusterrole-"
roleBindingNamePrefix = "rolebinding-"
clusterRoleBindingNamePrefix = "clusterrolebinding-"
podDisruptionBudgetNamePrefix = "poddisruptionbudget-"
updateDeploymentReplicas = 2
updateStatefulSetReplicas = 2

View File

@ -7,6 +7,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/resource"
@ -742,3 +743,25 @@ func NewIngress(namespace, name string) *networkingv1.Ingress {
},
}}}}}}}}}
}
// NewPodDisruptionBudget will build a new PodDisruptionBudget object.
func NewPodDisruptionBudget(namespace, name string, maxUnAvailable intstr.IntOrString) *policyv1.PodDisruptionBudget {
podLabels := map[string]string{"app": "nginx"}
return &policyv1.PodDisruptionBudget{
TypeMeta: metav1.TypeMeta{
APIVersion: "policy/v1",
Kind: "PodDisruptionBudget",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: name,
},
Spec: policyv1.PodDisruptionBudgetSpec{
MaxUnavailable: &maxUnAvailable,
Selector: &metav1.LabelSelector{
MatchLabels: podLabels,
},
},
}
}