From 5a223683d62eadc749f9a10fe32193be9bcf79a4 Mon Sep 17 00:00:00 2001
From: Slawomir Chylek
Date: Thu, 22 Feb 2018 15:59:30 +0100
Subject: [PATCH 1/2] First VPA full stack e2e tests.

---
 .../e2e/autoscaling_utils.go            | 526 ++++++++++++++++++
 vertical-pod-autoscaler/e2e/common.go   |   5 +
 vertical-pod-autoscaler/e2e/full_vpa.go | 128 +++++
 3 files changed, 659 insertions(+)
 create mode 100644 vertical-pod-autoscaler/e2e/autoscaling_utils.go
 create mode 100644 vertical-pod-autoscaler/e2e/full_vpa.go

diff --git a/vertical-pod-autoscaler/e2e/autoscaling_utils.go b/vertical-pod-autoscaler/e2e/autoscaling_utils.go
new file mode 100644
index 0000000000..dc184fb1dc
--- /dev/null
+++ b/vertical-pod-autoscaler/e2e/autoscaling_utils.go
@@ -0,0 +1,526 @@
+/*
+Copyright 2015 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// This is a fork of k8s.io/kubernetes/test/e2e/common/autoscaling_utils.go
+
+package autoscaling
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	autoscalingv1 "k8s.io/api/autoscaling/v1"
+	"k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/wait"
+	clientset "k8s.io/client-go/kubernetes"
+	api "k8s.io/kubernetes/pkg/apis/core"
+	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	"k8s.io/kubernetes/test/e2e/framework"
+	testutils "k8s.io/kubernetes/test/utils"
+
+	. "github.com/onsi/ginkgo"
+	imageutils "k8s.io/kubernetes/test/utils/image"
+)
+
+const (
+	dynamicConsumptionTimeInSeconds = 30
+	staticConsumptionTimeInSeconds  = 3600
+	dynamicRequestSizeInMillicores  = 20
+	dynamicRequestSizeInMegabytes   = 100
+	dynamicRequestSizeCustomMetric  = 10
+	port                            = 80
+	targetPort                      = 8080
+	timeoutRC                       = 120 * time.Second
+	startServiceTimeout             = time.Minute
+	startServiceInterval            = 5 * time.Second
+	rcIsNil                         = "ERROR: replicationController = nil"
+	deploymentIsNil                 = "ERROR: deployment = nil"
+	rsIsNil                         = "ERROR: replicaset = nil"
+	invalidKind                     = "ERROR: invalid workload kind for resource consumer"
+	customMetricName                = "QPS"
+	serviceInitializationTimeout    = 2 * time.Minute
+	serviceInitializationInterval   = 15 * time.Second
+)
+
+var (
+	resourceConsumerImage           = imageutils.GetE2EImage(imageutils.ResourceConsumer)
+	resourceConsumerControllerImage = imageutils.GetE2EImage(imageutils.ResourceController)
+)
+
+var (
+	KindRC         = schema.GroupVersionKind{Version: "v1", Kind: "ReplicationController"}
+	KindDeployment = schema.GroupVersionKind{Group: "apps", Version: "v1beta2", Kind: "Deployment"}
+	KindReplicaSet = schema.GroupVersionKind{Group: "apps", Version: "v1beta2", Kind: "ReplicaSet"}
+	subresource    = "scale"
+)
+
+/*
+ResourceConsumer is a tool for testing. It helps create specified usage of CPU or memory (Warning: memory not supported)
+typical use case:
+rc.ConsumeCPU(600)
+// ... check your assumption here
+rc.ConsumeCPU(300)
+// ... check your assumption here
+*/
+type ResourceConsumer struct {
+	name                     string
+	controllerName           string
+	kind                     schema.GroupVersionKind
+	nsName                   string
+	clientSet                clientset.Interface
+	internalClientset        *internalclientset.Clientset
+	cpu                      chan int
+	mem                      chan int
+	customMetric             chan int
+	stopCPU                  chan int
+	stopMem                  chan int
+	stopCustomMetric         chan int
+	stopWaitGroup            sync.WaitGroup
+	consumptionTimeInSeconds int
+	sleepTime                time.Duration
+	requestSizeInMillicores  int
+	requestSizeInMegabytes   int
+	requestSizeCustomMetric  int
+}
+
+func GetResourceConsumerImage() string {
+	return resourceConsumerImage
+}
+
+func NewDynamicResourceConsumer(name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuRequest, memRequest resource.Quantity, clientset clientset.Interface, internalClientset *internalclientset.Clientset) *ResourceConsumer {
+	return newResourceConsumer(name, nsName, kind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, dynamicConsumptionTimeInSeconds,
+		dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, cpuRequest, memRequest, clientset, internalClientset)
+}
+
+// TODO this still defaults to replication controller
+func NewStaticResourceConsumer(name, nsName string, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuRequest, memRequest resource.Quantity, clientset clientset.Interface, internalClientset *internalclientset.Clientset) *ResourceConsumer {
+	return newResourceConsumer(name, nsName, KindRC, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, staticConsumptionTimeInSeconds,
+		initCPUTotal/replicas, initMemoryTotal/replicas, initCustomMetric/replicas, cpuRequest, memRequest, clientset, internalClientset)
+}
+
+/*
+NewResourceConsumer creates new ResourceConsumer
+initCPUTotal argument is in millicores
+initMemoryTotal argument is in megabytes
+memLimit argument is in megabytes, memLimit is a maximum amount of memory that can be consumed by a single pod
+cpuLimit argument is in millicores, cpuLimit is a maximum amount of cpu that can be consumed by a single pod
+*/
+func newResourceConsumer(name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, consumptionTimeInSeconds, requestSizeInMillicores,
+	requestSizeInMegabytes int, requestSizeCustomMetric int, cpuRequest, memRequest resource.Quantity, clientset clientset.Interface, internalClientset *internalclientset.Clientset) *ResourceConsumer {
+
+	runServiceAndWorkloadForResourceConsumer(clientset, internalClientset, nsName, name, kind, replicas, cpuRequest, memRequest)
+	rc := &ResourceConsumer{
+		name:                     name,
+		controllerName:           name + "-ctrl",
+		kind:                     kind,
+		nsName:                   nsName,
+		clientSet:                clientset,
+		internalClientset:        internalClientset,
+		cpu:                      make(chan int),
+		mem:                      make(chan int),
+		customMetric:             make(chan int),
+		stopCPU:                  make(chan int),
+		stopMem:                  make(chan int),
+		stopCustomMetric:         make(chan int),
+		consumptionTimeInSeconds: consumptionTimeInSeconds,
+		sleepTime:                time.Duration(consumptionTimeInSeconds) * time.Second,
+		requestSizeInMillicores:  requestSizeInMillicores,
+		requestSizeInMegabytes:   requestSizeInMegabytes,
+		requestSizeCustomMetric:  requestSizeCustomMetric,
+	}
+
+	go rc.makeConsumeCPURequests()
+	rc.ConsumeCPU(initCPUTotal)
+
+	go rc.makeConsumeMemRequests()
+	rc.ConsumeMem(initMemoryTotal)
+	go rc.makeConsumeCustomMetric()
+	rc.ConsumeCustomMetric(initCustomMetric)
+	return rc
+}
+
+// ConsumeCPU consumes given number of CPU
+func (rc *ResourceConsumer) ConsumeCPU(millicores int) {
+	framework.Logf("RC %s: consume %v millicores in total", rc.name, millicores)
+	rc.cpu <- millicores
+}
+
+// ConsumeMem consumes given number of Mem
+func (rc *ResourceConsumer) ConsumeMem(megabytes int) {
+	framework.Logf("RC %s: consume %v MB in total", rc.name, megabytes)
+	rc.mem <- megabytes
+}
+
+// ConsumeMem consumes given number of custom metric
+func (rc *ResourceConsumer) ConsumeCustomMetric(amount int) {
+	framework.Logf("RC %s: consume custom metric %v in total", rc.name, amount)
+	rc.customMetric <- amount
+}
+
+func (rc *ResourceConsumer) makeConsumeCPURequests() {
+	defer GinkgoRecover()
+	rc.stopWaitGroup.Add(1)
+	defer rc.stopWaitGroup.Done()
+	sleepTime := time.Duration(0)
+	millicores := 0
+	for {
+		select {
+		case millicores = <-rc.cpu:
+			framework.Logf("RC %s: setting consumption to %v millicores in total", rc.name, millicores)
+		case <-time.After(sleepTime):
+			framework.Logf("RC %s: sending request to consume %d millicores", rc.name, millicores)
+			rc.sendConsumeCPURequest(millicores)
+			sleepTime = rc.sleepTime
+		case <-rc.stopCPU:
+			framework.Logf("RC %s: stopping CPU consumer", rc.name)
+			return
+		}
+	}
+}
+
+func (rc *ResourceConsumer) makeConsumeMemRequests() {
+	defer GinkgoRecover()
+	rc.stopWaitGroup.Add(1)
+	defer rc.stopWaitGroup.Done()
+	sleepTime := time.Duration(0)
+	megabytes := 0
+	for {
+		select {
+		case megabytes = <-rc.mem:
+			framework.Logf("RC %s: setting consumption to %v MB in total", rc.name, megabytes)
+		case <-time.After(sleepTime):
+			framework.Logf("RC %s: sending request to consume %d MB", rc.name, megabytes)
+			rc.sendConsumeMemRequest(megabytes)
+			sleepTime = rc.sleepTime
+		case <-rc.stopMem:
+			framework.Logf("RC %s: stopping mem consumer", rc.name)
+			return
+		}
+	}
+}
+
+func (rc *ResourceConsumer) makeConsumeCustomMetric() {
+	defer GinkgoRecover()
+	rc.stopWaitGroup.Add(1)
+	defer rc.stopWaitGroup.Done()
+	sleepTime := time.Duration(0)
+	delta := 0
+	for {
+		select {
+		case delta := <-rc.customMetric:
+			framework.Logf("RC %s: setting bump of metric %s to %d in total", rc.name, customMetricName, delta)
+		case <-time.After(sleepTime):
+			framework.Logf("RC %s: sending request to consume %d of custom metric %s", rc.name, delta, customMetricName)
+			rc.sendConsumeCustomMetric(delta)
+			sleepTime = rc.sleepTime
+		case <-rc.stopCustomMetric:
+			framework.Logf("RC %s: stopping metric consumer", rc.name)
+			return
+		}
+	}
+}
+
+func (rc *ResourceConsumer) sendConsumeCPURequest(millicores int) {
+	ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
+	defer cancel()
+
+	err := wait.PollImmediate(serviceInitializationInterval, serviceInitializationTimeout, func() (bool, error) {
+		proxyRequest, err := framework.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post())
+		framework.ExpectNoError(err)
+		req := proxyRequest.Namespace(rc.nsName).
+			Context(ctx).
+			Name(rc.controllerName).
+			Suffix("ConsumeCPU").
+			Param("millicores", strconv.Itoa(millicores)).
+			Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
+			Param("requestSizeMillicores", strconv.Itoa(rc.requestSizeInMillicores))
+		framework.Logf("ConsumeCPU URL: %v", *req.URL())
+		_, err = req.DoRaw()
+		if err != nil {
+			framework.Logf("ConsumeCPU failure: %v", err)
+			return false, nil
+		}
+		return true, nil
+	})
+
+	framework.ExpectNoError(err)
+}
+
+// sendConsumeMemRequest sends POST request for memory consumption
+func (rc *ResourceConsumer) sendConsumeMemRequest(megabytes int) {
+	ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
+	defer cancel()
+
+	err := wait.PollImmediate(serviceInitializationInterval, serviceInitializationTimeout, func() (bool, error) {
+		proxyRequest, err := framework.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post())
+		framework.ExpectNoError(err)
+		req := proxyRequest.Namespace(rc.nsName).
+			Context(ctx).
+			Name(rc.controllerName).
+			Suffix("ConsumeMem").
+			Param("megabytes", strconv.Itoa(megabytes)).
+			Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
+			Param("requestSizeMegabytes", strconv.Itoa(rc.requestSizeInMegabytes))
+		framework.Logf("ConsumeMem URL: %v", *req.URL())
+		_, err = req.DoRaw()
+		if err != nil {
+			framework.Logf("ConsumeMem failure: %v", err)
+			return false, nil
+		}
+		return true, nil
+	})
+
+	framework.ExpectNoError(err)
+}
+
+// sendConsumeCustomMetric sends POST request for custom metric consumption
+func (rc *ResourceConsumer) sendConsumeCustomMetric(delta int) {
+	ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
+	defer cancel()
+
+	err := wait.PollImmediate(serviceInitializationInterval, serviceInitializationTimeout, func() (bool, error) {
+		proxyRequest, err := framework.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post())
+		framework.ExpectNoError(err)
+		req := proxyRequest.Namespace(rc.nsName).
+			Context(ctx).
+			Name(rc.controllerName).
+			Suffix("BumpMetric").
+			Param("metric", customMetricName).
+			Param("delta", strconv.Itoa(delta)).
+			Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
+			Param("requestSizeMetrics", strconv.Itoa(rc.requestSizeCustomMetric))
+		framework.Logf("ConsumeCustomMetric URL: %v", *req.URL())
+		_, err = req.DoRaw()
+		if err != nil {
+			framework.Logf("ConsumeCustomMetric failure: %v", err)
+			return false, nil
+		}
+		return true, nil
+	})
+	framework.ExpectNoError(err)
+}
+
+func (rc *ResourceConsumer) GetReplicas() int {
+	switch rc.kind {
+	case KindRC:
+		replicationController, err := rc.clientSet.CoreV1().ReplicationControllers(rc.nsName).Get(rc.name, metav1.GetOptions{})
+		framework.ExpectNoError(err)
+		if replicationController == nil {
+			framework.Failf(rcIsNil)
+		}
+		return int(replicationController.Status.ReadyReplicas)
+	case KindDeployment:
+		deployment, err := rc.clientSet.ExtensionsV1beta1().Deployments(rc.nsName).Get(rc.name, metav1.GetOptions{})
+		framework.ExpectNoError(err)
+		if deployment == nil {
+			framework.Failf(deploymentIsNil)
+		}
+		return int(deployment.Status.ReadyReplicas)
+	case KindReplicaSet:
+		rs, err := rc.clientSet.ExtensionsV1beta1().ReplicaSets(rc.nsName).Get(rc.name, metav1.GetOptions{})
+		framework.ExpectNoError(err)
+		if rs == nil {
+			framework.Failf(rsIsNil)
+		}
+		return int(rs.Status.ReadyReplicas)
+	default:
+		framework.Failf(invalidKind)
+	}
+	return 0
+}
+
+func (rc *ResourceConsumer) WaitForReplicas(desiredReplicas int, duration time.Duration) {
+	interval := 20 * time.Second
+	err := wait.PollImmediate(interval, duration, func() (bool, error) {
+		replicas := rc.GetReplicas()
+		framework.Logf("waiting for %d replicas (current: %d)", desiredReplicas, replicas)
+		return replicas == desiredReplicas, nil // Expected number of replicas found. Exit.
+	})
+	framework.ExpectNoErrorWithOffset(1, err, "timeout waiting %v for %d replicas", duration, desiredReplicas)
+}
+
+func (rc *ResourceConsumer) EnsureDesiredReplicas(desiredReplicas int, duration time.Duration) {
+	interval := 10 * time.Second
+	err := wait.PollImmediate(interval, duration, func() (bool, error) {
+		replicas := rc.GetReplicas()
+		framework.Logf("expecting there to be %d replicas (are: %d)", desiredReplicas, replicas)
+		if replicas != desiredReplicas {
+			return false, fmt.Errorf("number of replicas changed unexpectedly")
+		} else {
+			return false, nil // Expected number of replicas found. Continue polling until timeout.
+		}
+	})
+	// The call above always returns an error, but if it is timeout, it's OK (condition satisfied all the time).
+	if err == wait.ErrWaitTimeout {
+		framework.Logf("Number of replicas was stable over %v", duration)
+		return
+	}
+	framework.ExpectNoErrorWithOffset(1, err)
+}
+
+// Pause stops background goroutines responsible for consuming resources.
+func (rc *ResourceConsumer) Pause() {
+	By(fmt.Sprintf("HPA pausing RC %s", rc.name))
+	rc.stopCPU <- 0
+	rc.stopMem <- 0
+	rc.stopCustomMetric <- 0
+	rc.stopWaitGroup.Wait()
+}
+
+// Pause starts background goroutines responsible for consuming resources.
+func (rc *ResourceConsumer) Resume() {
+	By(fmt.Sprintf("HPA resuming RC %s", rc.name))
+	go rc.makeConsumeCPURequests()
+	go rc.makeConsumeMemRequests()
+	go rc.makeConsumeCustomMetric()
+}
+
+func (rc *ResourceConsumer) CleanUp() {
+	By(fmt.Sprintf("Removing consuming RC %s", rc.name))
+	close(rc.stopCPU)
+	close(rc.stopMem)
+	close(rc.stopCustomMetric)
+	rc.stopWaitGroup.Wait()
+	// Wait some time to ensure all child goroutines are finished.
+	time.Sleep(10 * time.Second)
+	kind := rc.kind.GroupKind()
+	framework.ExpectNoError(framework.DeleteResourceAndPods(rc.clientSet, rc.internalClientset, kind, rc.nsName, rc.name))
+	framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(rc.name, nil))
+	framework.ExpectNoError(framework.DeleteResourceAndPods(rc.clientSet, rc.internalClientset, api.Kind("ReplicationController"), rc.nsName, rc.controllerName))
+	framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(rc.controllerName, nil))
+}
+
+func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, internalClient internalclientset.Interface, ns, name string, kind schema.GroupVersionKind, replicas int, cpuLimit, memLimit resource.Quantity) {
+	By(fmt.Sprintf("Running consuming RC %s via %s with %v replicas", name, kind, replicas))
+	_, err := c.CoreV1().Services(ns).Create(&v1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: name,
+		},
+		Spec: v1.ServiceSpec{
+			Ports: []v1.ServicePort{{
+				Port:       port,
+				TargetPort: intstr.FromInt(targetPort),
+			}},
+
+			Selector: map[string]string{
+				"name": name,
+			},
+		},
+	})
+	framework.ExpectNoError(err)
+
+	rcConfig := testutils.RCConfig{
+		Client:         c,
+		InternalClient: internalClient,
+		Image:          resourceConsumerImage,
+		Name:           name,
+		Namespace:      ns,
+		Timeout:        timeoutRC,
+		Replicas:       replicas,
+		CpuRequest:     cpuLimit.MilliValue(),
+		MemRequest:     memLimit.Value(),
+	}
+
+	switch kind {
+	case KindRC:
+		framework.ExpectNoError(framework.RunRC(rcConfig))
+		break
+	case KindDeployment:
+		dpConfig := testutils.DeploymentConfig{
+			RCConfig: rcConfig,
+		}
+		framework.ExpectNoError(framework.RunDeployment(dpConfig))
+		break
+	case KindReplicaSet:
+		rsConfig := testutils.ReplicaSetConfig{
+			RCConfig: rcConfig,
+		}
+		By(fmt.Sprintf("creating replicaset %s in namespace %s", rsConfig.Name, rsConfig.Namespace))
+		framework.ExpectNoError(framework.RunReplicaSet(rsConfig))
+		break
+	default:
+		framework.Failf(invalidKind)
+	}
+
+	By(fmt.Sprintf("Running controller"))
+	controllerName := name + "-ctrl"
+	_, err = c.CoreV1().Services(ns).Create(&v1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: controllerName,
+		},
+		Spec: v1.ServiceSpec{
+			Ports: []v1.ServicePort{{
+				Port:       port,
+				TargetPort: intstr.FromInt(targetPort),
+			}},
+
+			Selector: map[string]string{
+				"name": controllerName,
+			},
+		},
+	})
+	framework.ExpectNoError(err)
+
+	dnsClusterFirst := v1.DNSClusterFirst
+	controllerRcConfig := testutils.RCConfig{
+		Client:    c,
+		Image:     resourceConsumerControllerImage,
+		Name:      controllerName,
+		Namespace: ns,
+		Timeout:   timeoutRC,
+		Replicas:  1,
+		Command:   []string{"/controller", "--consumer-service-name=" + name, "--consumer-service-namespace=" + ns, "--consumer-port=80"},
+		DNSPolicy: &dnsClusterFirst,
+	}
+	framework.ExpectNoError(framework.RunRC(controllerRcConfig))
+
+	// Wait for endpoints to propagate for the controller service.
+	framework.ExpectNoError(framework.WaitForServiceEndpointsNum(
+		c, ns, controllerName, 1, startServiceInterval, startServiceTimeout))
+}
+
+func CreateCPUHorizontalPodAutoscaler(rc *ResourceConsumer, cpu, minReplicas, maxRepl int32) *autoscalingv1.HorizontalPodAutoscaler {
+	hpa := &autoscalingv1.HorizontalPodAutoscaler{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      rc.name,
+			Namespace: rc.nsName,
+		},
+		Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
+			ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
+				APIVersion: rc.kind.GroupVersion().String(),
+				Kind:       rc.kind.Kind,
+				Name:       rc.name,
+			},
+			MinReplicas:                    &minReplicas,
+			MaxReplicas:                    maxRepl,
+			TargetCPUUtilizationPercentage: &cpu,
+		},
+	}
+	hpa, errHPA := rc.clientSet.AutoscalingV1().HorizontalPodAutoscalers(rc.nsName).Create(hpa)
+	framework.ExpectNoError(errHPA)
+	return hpa
+}
+
+func DeleteHorizontalPodAutoscaler(rc *ResourceConsumer, autoscalerName string) {
+	rc.clientSet.AutoscalingV1().HorizontalPodAutoscalers(rc.nsName).Delete(autoscalerName, nil)
+}
diff --git a/vertical-pod-autoscaler/e2e/common.go b/vertical-pod-autoscaler/e2e/common.go
index 7bb270192c..ce9b0a0197 100644
--- a/vertical-pod-autoscaler/e2e/common.go
+++ b/vertical-pod-autoscaler/e2e/common.go
@@ -34,6 +34,7 @@ const (
 	recommenderComponent         = "recommender"
 	updateComponent              = "updater"
 	admissionControllerComponent = "admission-controller"
+	fullVpaSuite                 = "full-vpa"
 	pollInterval                 = framework.Poll
 	pollTimeout                  = 5 * time.Minute
 )
@@ -54,6 +55,10 @@ func admissionControllerE2eDescribe(name string, body func()) bool {
 	return e2eDescribe(admissionControllerComponent, name, body)
 }
 
+func fullVpaE2eDescribe(name string, body func()) bool {
+	return e2eDescribe(fullVpaSuite, name, body)
+}
+
 func hamsterDeployment(f *framework.Framework, cpuQuantity, memoryQuantity resource.Quantity) *extensions.Deployment {
 	d := framework.NewDeployment("hamster-deployment", 3, map[string]string{"app": "hamster"}, "hamster", "gcr.io/google_containers/ubuntu-slim:0.1", extensions.RollingUpdateDeploymentStrategyType)
 	d.ObjectMeta.Namespace = f.Namespace.Name
diff --git a/vertical-pod-autoscaler/e2e/full_vpa.go b/vertical-pod-autoscaler/e2e/full_vpa.go
new file mode 100644
index 0000000000..7dc50bb34b
--- /dev/null
+++ b/vertical-pod-autoscaler/e2e/full_vpa.go
@@ -0,0 +1,128 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package autoscaling
+
+import (
+	"fmt"
+
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
+	e2e_common "k8s.io/kubernetes/test/e2e/common"
+	"k8s.io/kubernetes/test/e2e/framework"
+
+	"github.com/onsi/ginkgo"
+	"github.com/onsi/gomega"
+)
+
+const (
+	minimalCPU = "50m"
+)
+
+var _ = fullVpaE2eDescribe("Pods under VPA", func() {
+	f := framework.NewDefaultFramework("vertical-pod-autoscaling")
+
+	var rc *ResourceConsumer
+	replicas := 3
+
+	ginkgo.BeforeEach(func() {
+		ns := f.Namespace.Name
+		ginkgo.By("Setting up a hamster deployment")
+		rc = NewDynamicResourceConsumer("hamster", ns, e2e_common.KindDeployment,
+			replicas,
+			1,                          /*initCPUTotal*/
+			10,                         /*initMemoryTotal*/
+			1,                          /*initCustomMetric*/
+			parseQuantityOrDie("100m"), /*cpuRequest*/
+			parseQuantityOrDie("10Mi"), /*memRequest*/
+			f.ClientSet,
+			f.InternalClientset)
+
+		ginkgo.By("Setting up a VPA CRD")
+		config, err := framework.LoadConfig()
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+		vpaCRD := newVPA(f, "hamster-vpa", &metav1.LabelSelector{
+			MatchLabels: map[string]string{
+				"name": "hamster",
+			},
+		})
+
+		vpaClientSet := vpa_clientset.NewForConfigOrDie(config)
+		vpaClient := vpaClientSet.PocV1alpha1()
+		_, err = vpaClient.VerticalPodAutoscalers(ns).Create(vpaCRD)
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+	})
+
+	ginkgo.It("stabilize at minimum CPU if doing nothing", func() {
+		framework.ExpectNoError(waitForSpecificCPURequestInPods(f, metav1.ListOptions{LabelSelector: "name=hamster"}, parseQuantityOrDie(minimalCPU)))
+	})
+
+	ginkgo.It("have cpu requests growing with usage", func() {
+		rc.ConsumeCPU(600 * replicas)
+		framework.ExpectNoError(waitForCPURequestAboveThresholdInPods(f, metav1.ListOptions{LabelSelector: "name=hamster"}, parseQuantityOrDie("500m")))
+	})
+})
+
+func waitForPodsMatch(f *framework.Framework, listOptions metav1.ListOptions, matcher func(pod apiv1.Pod) bool) error {
+	return wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
+
+		ns := f.Namespace.Name
+		c := f.ClientSet
+
+		podList, err := c.CoreV1().Pods(ns).List(listOptions)
+		if err != nil {
+			return false, err
+		}
+
+		for _, pod := range podList.Items {
+			if !matcher(pod) {
+				return false, nil
+			}
+		}
+		return true, nil
+
+	})
+}
+
+func waitForSpecificCPURequestInPods(f *framework.Framework, listOptions metav1.ListOptions, cpu resource.Quantity) error {
+	err := waitForPodsMatch(f, listOptions,
+		func(pod apiv1.Pod) bool {
+			return pod.Spec.Containers[0].Resources.Requests[apiv1.ResourceCPU] == cpu
+		})
+
+	if err != nil {
+		return fmt.Errorf("error waiting for cpu request equal %v for pods: %+v", cpu, listOptions)
+	}
+	return nil
+}
+
+func waitForCPURequestAboveThresholdInPods(f *framework.Framework, listOptions metav1.ListOptions, cpuThreshold resource.Quantity) error {
+	err := waitForPodsMatch(f, listOptions,
+		func(pod apiv1.Pod) bool {
+			cpuRequest := pod.Spec.Containers[0].Resources.Requests[apiv1.ResourceCPU]
+			return cpuRequest.MilliValue() > cpuThreshold.MilliValue()
+		})
+
+	if err != nil {
+		return fmt.Errorf("error waiting for cpu request above %v for pods: %+v", cpuThreshold, listOptions)
+	}
+	return nil
+}

From 218854c60938cbab075704dfe2be1d2e8b5f402d Mon Sep 17 00:00:00 2001
From: Slawomir Chylek
Date: Fri, 23 Feb 2018 10:56:58 +0100
Subject: [PATCH 2/2] Golint fixes for file forked from k8s.

---
 .../e2e/autoscaling_utils.go | 42 ++++++++++++-------
 1 file changed, 26 insertions(+), 16 deletions(-)

diff --git a/vertical-pod-autoscaler/e2e/autoscaling_utils.go b/vertical-pod-autoscaler/e2e/autoscaling_utils.go
index dc184fb1dc..d456a6205b 100644
--- a/vertical-pod-autoscaler/e2e/autoscaling_utils.go
+++ b/vertical-pod-autoscaler/e2e/autoscaling_utils.go
@@ -38,7 +38,7 @@ import (
 	"k8s.io/kubernetes/test/e2e/framework"
 	testutils "k8s.io/kubernetes/test/utils"
 
-	. "github.com/onsi/ginkgo"
+	ginkgo "github.com/onsi/ginkgo"
 	imageutils "k8s.io/kubernetes/test/utils/image"
 )
 
@@ -68,8 +68,11 @@ var (
 )
 
 var (
-	KindRC         = schema.GroupVersionKind{Version: "v1", Kind: "ReplicationController"}
+	// KindRC var
+	KindRC = schema.GroupVersionKind{Version: "v1", Kind: "ReplicationController"}
+	// KindDeployment var
 	KindDeployment = schema.GroupVersionKind{Group: "apps", Version: "v1beta2", Kind: "Deployment"}
+	// KindReplicaSet var
 	KindReplicaSet = schema.GroupVersionKind{Group: "apps", Version: "v1beta2", Kind: "ReplicaSet"}
 	subresource    = "scale"
 )
@@ -103,16 +106,18 @@ type ResourceConsumer struct {
 	requestSizeCustomMetric  int
 }
 
+// GetResourceConsumerImage func
 func GetResourceConsumerImage() string {
 	return resourceConsumerImage
 }
 
+// NewDynamicResourceConsumer func
 func NewDynamicResourceConsumer(name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuRequest, memRequest resource.Quantity, clientset clientset.Interface, internalClientset *internalclientset.Clientset) *ResourceConsumer {
 	return newResourceConsumer(name, nsName, kind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, dynamicConsumptionTimeInSeconds,
 		dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, cpuRequest, memRequest, clientset, internalClientset)
 }
 
-// TODO this still defaults to replication controller
+// NewStaticResourceConsumer TODO this still defaults to replication controller
 func NewStaticResourceConsumer(name, nsName string, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuRequest, memRequest resource.Quantity, clientset clientset.Interface, internalClientset *internalclientset.Clientset) *ResourceConsumer {
 	return newResourceConsumer(name, nsName, KindRC, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, staticConsumptionTimeInSeconds,
 		initCPUTotal/replicas, initMemoryTotal/replicas, initCustomMetric/replicas, cpuRequest, memRequest, clientset, internalClientset)
@@ -171,14 +176,14 @@ func (rc *ResourceConsumer) ConsumeMem(megabytes int) {
 	rc.mem <- megabytes
 }
 
-// ConsumeMem consumes given number of custom metric
+// ConsumeCustomMetric consumes given number of custom metric
 func (rc *ResourceConsumer) ConsumeCustomMetric(amount int) {
 	framework.Logf("RC %s: consume custom metric %v in total", rc.name, amount)
 	rc.customMetric <- amount
 }
 
 func (rc *ResourceConsumer) makeConsumeCPURequests() {
-	defer GinkgoRecover()
+	defer ginkgo.GinkgoRecover()
 	rc.stopWaitGroup.Add(1)
 	defer rc.stopWaitGroup.Done()
 	sleepTime := time.Duration(0)
@@ -199,7 +204,7 @@ func (rc *ResourceConsumer) makeConsumeCPURequests() {
 }
 
 func (rc *ResourceConsumer) makeConsumeMemRequests() {
-	defer GinkgoRecover()
+	defer ginkgo.GinkgoRecover()
 	rc.stopWaitGroup.Add(1)
 	defer rc.stopWaitGroup.Done()
 	sleepTime := time.Duration(0)
@@ -220,7 +225,7 @@ func (rc *ResourceConsumer) makeConsumeMemRequests() {
 }
 
 func (rc *ResourceConsumer) makeConsumeCustomMetric() {
-	defer GinkgoRecover()
+	defer ginkgo.GinkgoRecover()
 	rc.stopWaitGroup.Add(1)
 	defer rc.stopWaitGroup.Done()
 	sleepTime := time.Duration(0)
@@ -320,6 +325,7 @@ func (rc *ResourceConsumer) sendConsumeCustomMetric(delta int) {
 	framework.ExpectNoError(err)
 }
 
+// GetReplicas func
 func (rc *ResourceConsumer) GetReplicas() int {
 	switch rc.kind {
 	case KindRC:
@@ -349,6 +355,7 @@ func (rc *ResourceConsumer) GetReplicas() int {
 	return 0
 }
 
+// WaitForReplicas func
 func (rc *ResourceConsumer) WaitForReplicas(desiredReplicas int, duration time.Duration) {
 	interval := 20 * time.Second
 	err := wait.PollImmediate(interval, duration, func() (bool, error) {
@@ -359,6 +366,7 @@ func (rc *ResourceConsumer) WaitForReplicas(desiredReplicas int, duration time.D
 	framework.ExpectNoErrorWithOffset(1, err, "timeout waiting %v for %d replicas", duration, desiredReplicas)
 }
 
+// EnsureDesiredReplicas func
 func (rc *ResourceConsumer) EnsureDesiredReplicas(desiredReplicas int, duration time.Duration) {
 	interval := 10 * time.Second
 	err := wait.PollImmediate(interval, duration, func() (bool, error) {
@@ -366,9 +374,8 @@ func (rc *ResourceConsumer) EnsureDesiredReplicas(desiredReplicas int, duration
 		framework.Logf("expecting there to be %d replicas (are: %d)", desiredReplicas, replicas)
 		if replicas != desiredReplicas {
 			return false, fmt.Errorf("number of replicas changed unexpectedly")
-		} else {
-			return false, nil // Expected number of replicas found. Continue polling until timeout.
 		}
+		return false, nil // Expected number of replicas found. Continue polling until timeout.
 	})
 	// The call above always returns an error, but if it is timeout, it's OK (condition satisfied all the time).
 	if err == wait.ErrWaitTimeout {
@@ -380,23 +387,24 @@ func (rc *ResourceConsumer) EnsureDesiredReplicas(desiredReplicas int, duration
 
 // Pause stops background goroutines responsible for consuming resources.
 func (rc *ResourceConsumer) Pause() {
-	By(fmt.Sprintf("HPA pausing RC %s", rc.name))
+	ginkgo.By(fmt.Sprintf("HPA pausing RC %s", rc.name))
 	rc.stopCPU <- 0
 	rc.stopMem <- 0
 	rc.stopCustomMetric <- 0
 	rc.stopWaitGroup.Wait()
 }
 
-// Pause starts background goroutines responsible for consuming resources.
+// Resume starts background goroutines responsible for consuming resources.
 func (rc *ResourceConsumer) Resume() {
-	By(fmt.Sprintf("HPA resuming RC %s", rc.name))
+	ginkgo.By(fmt.Sprintf("HPA resuming RC %s", rc.name))
 	go rc.makeConsumeCPURequests()
 	go rc.makeConsumeMemRequests()
 	go rc.makeConsumeCustomMetric()
 }
 
+// CleanUp func
 func (rc *ResourceConsumer) CleanUp() {
-	By(fmt.Sprintf("Removing consuming RC %s", rc.name))
+	ginkgo.By(fmt.Sprintf("Removing consuming RC %s", rc.name))
 	close(rc.stopCPU)
 	close(rc.stopMem)
 	close(rc.stopCustomMetric)
@@ -411,7 +419,7 @@ func (rc *ResourceConsumer) CleanUp() {
 }
 
 func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, internalClient internalclientset.Interface, ns, name string, kind schema.GroupVersionKind, replicas int, cpuLimit, memLimit resource.Quantity) {
-	By(fmt.Sprintf("Running consuming RC %s via %s with %v replicas", name, kind, replicas))
+	ginkgo.By(fmt.Sprintf("Running consuming RC %s via %s with %v replicas", name, kind, replicas))
 	_, err := c.CoreV1().Services(ns).Create(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: name,
@@ -455,14 +463,14 @@ func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, internalCli
 		rsConfig := testutils.ReplicaSetConfig{
 			RCConfig: rcConfig,
 		}
-		By(fmt.Sprintf("creating replicaset %s in namespace %s", rsConfig.Name, rsConfig.Namespace))
+		ginkgo.By(fmt.Sprintf("creating replicaset %s in namespace %s", rsConfig.Name, rsConfig.Namespace))
 		framework.ExpectNoError(framework.RunReplicaSet(rsConfig))
 		break
 	default:
 		framework.Failf(invalidKind)
 	}
 
-	By(fmt.Sprintf("Running controller"))
+	ginkgo.By(fmt.Sprintf("Running controller"))
 	controllerName := name + "-ctrl"
 	_, err = c.CoreV1().Services(ns).Create(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: controllerName,
@@ -499,6 +507,7 @@ func runServiceAndWorkloadForResourceConsumer(c clientset.Interface, internalCli
 		c, ns, controllerName, 1, startServiceInterval, startServiceTimeout))
 }
 
+// CreateCPUHorizontalPodAutoscaler func
 func CreateCPUHorizontalPodAutoscaler(rc *ResourceConsumer, cpu, minReplicas, maxRepl int32) *autoscalingv1.HorizontalPodAutoscaler {
 	hpa := &autoscalingv1.HorizontalPodAutoscaler{
 		ObjectMeta: metav1.ObjectMeta{
@@ -521,6 +530,7 @@ func CreateCPUHorizontalPodAutoscaler(rc *ResourceConsumer, cpu, minReplicas, ma
 	return hpa
 }
 
+// DeleteHorizontalPodAutoscaler func
 func DeleteHorizontalPodAutoscaler(rc *ResourceConsumer, autoscalerName string) {
 	rc.clientSet.AutoscalingV1().HorizontalPodAutoscalers(rc.nsName).Delete(autoscalerName, nil)
 }