Merge pull request #561 from schylek/updater

APImock replaced with a real API in the Updater. Recommender_mock removed too.
Marcin Wielgus 2018-01-19 15:50:39 +01:00 committed by GitHub
commit a7c94accef
11 changed files with 124 additions and 632 deletions


@@ -1,180 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package apimock contains temporary definitions of Vertical Pod Autoscaler related objects - to be replaced with a real implementation.
// Definitions are based on the VPA design doc: https://github.com/kubernetes/community/pull/338
package apimock
import (
"math/rand"
"time"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)
// VerticalPodAutoscaler represents Vertical Pod Autoscaler configuration - to be replaced by a real implementation
type VerticalPodAutoscaler struct {
// Specification
Spec Spec
// Current state of VPA
Status Status
}
// Spec holds Vertical Pod Autoscaler configuration
type Spec struct {
// Defines which pods this Autoscaler should watch and update
Target Target
// Policy for pod updates
UpdatePolicy UpdatePolicy
// Policy for container resources updates
ResourcesPolicy ResourcesPolicy
}
// Status holds current Vertical Pod Autoscaler state
type Status struct {
// Recommended resources allocation
Recommendation *Recommendation
}
// Target specifies pods to be managed by Vertical Pod Autoscaler
type Target struct {
// label query over pods. The string will be in the same format as the query-param syntax.
// More info about label selectors: http://kubernetes.io/docs/user-guide/labels#label-selectors
// +optional
Selector string `json:"selector,omitempty" protobuf:"bytes,2,opt,name=selector"`
}
// UpdatePolicy defines policy for Pod updates
type UpdatePolicy struct {
// Mode for update policy
Mode Mode
}
// Mode of update policy
type Mode struct {
}
// ResourcesPolicy represents Resources allocation policy
type ResourcesPolicy struct {
// Resources allocation policy for containers
Containers []ContainerPolicy
}
// Recommendation represents resources allocation recommended by Vertical Pod Autoscaler
type Recommendation struct {
// Recommended resources allocation for containers
Containers []ContainerRecommendation
}
// ContainerPolicy holds the resources allocation policy for a single container
type ContainerPolicy struct {
// Name of the container
Name string
// Resource allocation policy, keyed by resource name
ResourcePolicy map[apiv1.ResourceName]Policy
}
// Policy holds resource allocation policy
type Policy struct {
// Minimal resource quantity
Min resource.Quantity
// Maximal resource quantity
Max resource.Quantity
}
// ContainerRecommendation holds resource allocation recommendation for container
type ContainerRecommendation struct {
// Name of the container
Name string
// Resources allocation recommended
Resources map[apiv1.ResourceName]resource.Quantity
}
// VerticalPodAutoscalerLister provides a list of all configured Vertical Pod Autoscalers
type VerticalPodAutoscalerLister interface {
// List returns all configured Vertical Pod Autoscalers
List() (ret []*VerticalPodAutoscaler, err error)
}
type vpaLister struct{}
// List is a mock implementation of VerticalPodAutoscalerLister.List - to be replaced with a real implementation
func (lister *vpaLister) List() (ret []*VerticalPodAutoscaler, err error) {
return []*VerticalPodAutoscaler{fake()}, nil
}
// NewVpaLister returns a mock VerticalPodAutoscalerLister - to be replaced with a real implementation
func NewVpaLister(_ interface{}) VerticalPodAutoscalerLister {
return &vpaLister{}
}
// RecommenderAPI defines an API that returns Vertical Pod Autoscaler recommendations for pods
type RecommenderAPI interface {
// GetRecommendation returns Vertical Pod Autoscaler recommendation for given pod
GetRecommendation(spec *apiv1.PodSpec) (*Recommendation, error)
}
type recommenderAPIImpl struct {
}
// NewRecommenderAPI returns a mock RecommenderAPI - to be replaced with a real implementation
func NewRecommenderAPI() RecommenderAPI {
return &recommenderAPIImpl{}
}
// GetRecommendation returns a random recommendation, increasing or decreasing each resource by 0-100%.
// To be replaced with a real implementation of the recommender request.
func (rec *recommenderAPIImpl) GetRecommendation(spec *apiv1.PodSpec) (*Recommendation, error) {
rand.Seed(time.Now().UTC().UnixNano())
result := make([]ContainerRecommendation, len(spec.Containers))
for i, podContainer := range spec.Containers {
// scale memory and cpu by random factor [0, 2]
diffFactor := 2 * rand.Float64()
memory := podContainer.Resources.Requests.Memory().DeepCopy()
memory.Set(int64(float64(memory.Value()) * diffFactor))
diffFactor = 2 * rand.Float64()
cpu := podContainer.Resources.Requests.Cpu().DeepCopy()
cpu.Set(int64(float64(cpu.Value()) * diffFactor))
result[i] = ContainerRecommendation{
Name: podContainer.Name,
Resources: map[apiv1.ResourceName]resource.Quantity{
apiv1.ResourceCPU: cpu, apiv1.ResourceMemory: memory}}
}
return &Recommendation{Containers: result}, nil
}
func fake() *VerticalPodAutoscaler {
minCpu, _ := resource.ParseQuantity("1")
maxCpu, _ := resource.ParseQuantity("4")
minMem, _ := resource.ParseQuantity("10M")
maxMem, _ := resource.ParseQuantity("5G")
return &VerticalPodAutoscaler{
Spec: Spec{
Target: Target{Selector: "app = redis"},
UpdatePolicy: UpdatePolicy{Mode: Mode{}},
ResourcesPolicy: ResourcesPolicy{Containers: []ContainerPolicy{{
Name: "slave",
ResourcePolicy: map[apiv1.ResourceName]Policy{
apiv1.ResourceCPU: {minCpu, maxCpu},
apiv1.ResourceMemory: {minMem, maxMem}},
}}}},
}
}
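
For context on what is being deleted: the mock's GetRecommendation simply rescales each container's current requests by a random factor in [0, 2]. A minimal standalone sketch of that quantity arithmetic, assuming only apimachinery's resource package (none of the removed apimock types):

package main

import (
	"fmt"
	"math/rand"
	"time"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	rand.Seed(time.Now().UTC().UnixNano())
	// Rescale a 100M memory request by a random factor in [0, 2],
	// mirroring the removed mock's GetRecommendation.
	mem := resource.MustParse("100M")
	factor := 2 * rand.Float64()
	mem.Set(int64(float64(mem.Value()) * factor))
	fmt.Printf("random memory recommendation: %s\n", mem.String())
}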


@@ -18,13 +18,15 @@ package test
 import (
 	"fmt"
+	"log"
 	"github.com/stretchr/testify/mock"
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/autoscaler/vertical-pod-autoscaler/apimock"
+	vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/poc.autoscaling.k8s.io/v1alpha1"
+	vpa_lister "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/listers/poc.autoscaling.k8s.io/v1alpha1"
 	v1 "k8s.io/client-go/listers/core/v1"
 )
@@ -88,52 +90,72 @@ func BuildTestContainer(containerName, cpu, mem string) apiv1.Container {
 }

 // BuildTestPolicy creates ResourcesPolicy with specified constraints
-func BuildTestPolicy(containerName, minCpu, maxCpu, minMemory, maxMemory string) *apimock.ResourcesPolicy {
+func BuildTestPolicy(containerName, minCpu, maxCpu, minMemory, maxMemory string) *vpa_types.PodResourcePolicy {
 	minCpuVal, _ := resource.ParseQuantity(minCpu)
 	maxCpuVal, _ := resource.ParseQuantity(maxCpu)
 	minMemVal, _ := resource.ParseQuantity(minMemory)
 	maxMemVal, _ := resource.ParseQuantity(maxMemory)
-	return &apimock.ResourcesPolicy{Containers: []apimock.ContainerPolicy{{
+	return &vpa_types.PodResourcePolicy{ContainerPolicies: []vpa_types.ContainerResourcePolicy{{
 		Name: containerName,
-		ResourcePolicy: map[apiv1.ResourceName]apimock.Policy{
-			apiv1.ResourceMemory: {
-				Min: minMemVal,
-				Max: maxMemVal},
-			apiv1.ResourceCPU: {
-				Min: minCpuVal,
-				Max: maxCpuVal}},
+		MinAllowed: apiv1.ResourceList{
+			apiv1.ResourceMemory: minMemVal,
+			apiv1.ResourceCPU:    minCpuVal,
+		},
+		MaxAllowed: apiv1.ResourceList{
+			apiv1.ResourceMemory: maxMemVal,
+			apiv1.ResourceCPU:    maxCpuVal,
+		},
 	},
 	}}
 }

 // BuildTestVerticalPodAutoscaler creates VerticalPodAutoscaler with specified policy constraints
-func BuildTestVerticalPodAutoscaler(containerName, minCpu, maxCpu, minMemory, maxMemory string, selector string) *apimock.VerticalPodAutoscaler {
+func BuildTestVerticalPodAutoscaler(containerName, targetCpu, minCpu, maxCpu, targetMemory, minMemory, maxMemory string, selector string) *vpa_types.VerticalPodAutoscaler {
 	resourcesPolicy := BuildTestPolicy(containerName, minCpu, maxCpu, minMemory, maxMemory)
-	return &apimock.VerticalPodAutoscaler{
-		Spec: apimock.Spec{
-			Target:          apimock.Target{Selector: selector},
-			UpdatePolicy:    apimock.UpdatePolicy{Mode: apimock.Mode{}},
-			ResourcesPolicy: *resourcesPolicy,
+	labelSelector, err := metav1.ParseToLabelSelector(selector)
+	if err != nil {
+		log.Fatal(err)
+	}
+	targetCpuVal, _ := resource.ParseQuantity(targetCpu)
+	targetMemoryVal, _ := resource.ParseQuantity(targetMemory)
+	return &vpa_types.VerticalPodAutoscaler{
+		Spec: vpa_types.VerticalPodAutoscalerSpec{
+			Selector:       labelSelector,
+			UpdatePolicy:   vpa_types.PodUpdatePolicy{},
+			ResourcePolicy: *resourcesPolicy,
+		},
+		Status: vpa_types.VerticalPodAutoscalerStatus{
+			Recommendation: vpa_types.RecommendedPodResources{
+				ContainerRecommendations: []vpa_types.RecommendedContainerResources{
+					{
+						Name: containerName,
+						Target: apiv1.ResourceList{
+							apiv1.ResourceMemory: targetMemoryVal,
+							apiv1.ResourceCPU:    targetCpuVal,
+						},
+					},
+				},
+			},
 		},
 	}
 }

 // Recommendation creates Recommendation with specified container name and resources
-func Recommendation(containerName, cpu, mem string) *apimock.Recommendation {
-	result := &apimock.Recommendation{Containers: []apimock.ContainerRecommendation{
+func Recommendation(containerName, cpu, mem string) *vpa_types.RecommendedPodResources {
+	result := &vpa_types.RecommendedPodResources{ContainerRecommendations: []vpa_types.RecommendedContainerResources{
 		{Name: containerName,
-			Resources: make(map[apiv1.ResourceName]resource.Quantity, 0)}},
+			Target: make(map[apiv1.ResourceName]resource.Quantity, 0)}},
 	}
 	if len(cpu) > 0 {
 		cpuVal, _ := resource.ParseQuantity(cpu)
-		result.Containers[0].Resources[apiv1.ResourceCPU] = cpuVal
+		result.ContainerRecommendations[0].Target[apiv1.ResourceCPU] = cpuVal
 	}
 	if len(mem) > 0 {
 		memVal, _ := resource.ParseQuantity(mem)
-		result.Containers[0].Resources[apiv1.ResourceMemory] = memVal
+		result.ContainerRecommendations[0].Target[apiv1.ResourceMemory] = memVal
 	}
 	return result
@@ -145,11 +167,11 @@ type RecommenderAPIMock struct {
 }

 // GetRecommendation is mock implementation of RecommenderAPI.GetRecommendation
-func (m *RecommenderAPIMock) GetRecommendation(spec *apiv1.PodSpec) (*apimock.Recommendation, error) {
+func (m *RecommenderAPIMock) GetRecommendation(spec *apiv1.PodSpec) (*vpa_types.RecommendedPodResources, error) {
 	args := m.Called(spec)
-	var returnArg *apimock.Recommendation
+	var returnArg *vpa_types.RecommendedPodResources
 	if args.Get(0) != nil {
-		returnArg = args.Get(0).(*apimock.Recommendation)
+		returnArg = args.Get(0).(*vpa_types.RecommendedPodResources)
 	}
 	return returnArg, args.Error(1)
 }
@@ -160,11 +182,11 @@ type RecommenderMock struct {
 }

 // Get is a mock implementation of Recommender.Get
-func (m *RecommenderMock) Get(spec *apiv1.PodSpec) (*apimock.Recommendation, error) {
+func (m *RecommenderMock) Get(spec *apiv1.PodSpec) (*vpa_types.RecommendedPodResources, error) {
 	args := m.Called(spec)
-	var returnArg *apimock.Recommendation
+	var returnArg *vpa_types.RecommendedPodResources
 	if args.Get(0) != nil {
-		returnArg = args.Get(0).(*apimock.Recommendation)
+		returnArg = args.Get(0).(*vpa_types.RecommendedPodResources)
 	}
 	return returnArg, args.Error(1)
 }
@@ -217,11 +239,16 @@ type VerticalPodAutoscalerListerMock struct {
 }

 // List is a mock implementation of VerticalPodAutoscalerLister.List
-func (m *VerticalPodAutoscalerListerMock) List() (ret []*apimock.VerticalPodAutoscaler, err error) {
+func (m *VerticalPodAutoscalerListerMock) List(selector labels.Selector) (ret []*vpa_types.VerticalPodAutoscaler, err error) {
 	args := m.Called()
-	var returnArg []*apimock.VerticalPodAutoscaler
+	var returnArg []*vpa_types.VerticalPodAutoscaler
 	if args.Get(0) != nil {
-		returnArg = args.Get(0).([]*apimock.VerticalPodAutoscaler)
+		returnArg = args.Get(0).([]*vpa_types.VerticalPodAutoscaler)
 	}
 	return returnArg, args.Error(1)
 }
+
+// VerticalPodAutoscalers is not implemented for this mock
+func (m *VerticalPodAutoscalerListerMock) VerticalPodAutoscalers(namespace string) vpa_lister.VerticalPodAutoscalerNamespaceLister {
+	return nil
+}
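
The switch from the apimock Policy map to MinAllowed/MaxAllowed ResourceLists is the main shape change in these helpers. A self-contained sketch of constructing the new policy type directly, assuming the v1alpha1 types referenced in this diff:

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/poc.autoscaling.k8s.io/v1alpha1"
)

func main() {
	// Per-container bounds are now plain ResourceLists instead of a
	// map of apimock.Policy{Min, Max} values.
	policy := vpa_types.PodResourcePolicy{
		ContainerPolicies: []vpa_types.ContainerResourcePolicy{{
			Name: "container-1",
			MinAllowed: apiv1.ResourceList{
				apiv1.ResourceCPU:    resource.MustParse("1"),
				apiv1.ResourceMemory: resource.MustParse("100M"),
			},
			MaxAllowed: apiv1.ResourceList{
				apiv1.ResourceCPU:    resource.MustParse("3"),
				apiv1.ResourceMemory: resource.MustParse("1G"),
			},
		}},
	}
	fmt.Printf("%+v\n", policy)
}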


@@ -1,84 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package recommender
import (
"crypto/sha1"
"fmt"
"runtime"
"time"
"k8s.io/autoscaler/vertical-pod-autoscaler/apimock"
apiv1 "k8s.io/api/core/v1"
hashutil "k8s.io/kubernetes/pkg/util/hash"
)
// CachingRecommender provides VPA recommendations for pods.
// VPA responses are cached.
type CachingRecommender interface {
// Get returns VPA recommendation for given pod
Get(spec *apiv1.PodSpec) (*apimock.Recommendation, error)
}
type cachingRecommenderImpl struct {
api apimock.RecommenderAPI
cache *TTLCache
}
// NewCachingRecommender creates CachingRecommender with given cache TTL
func NewCachingRecommender(ttl time.Duration, api apimock.RecommenderAPI) CachingRecommender {
ca := NewTTLCache(ttl)
ca.StartCacheGC(ttl)
result := &cachingRecommenderImpl{api: api, cache: ca}
// We need to stop the background cacheGC worker if cachingRecommenderImpl gets destroyed.
// Otherwise the background goroutine would run forever and hold a reference to the TTLCache object.
runtime.SetFinalizer(result, stopChacheGC)
return result
}
// Get returns the VPA recommendation for the given pod. If the recommendation is not in the cache, a request is sent to the RecommenderAPI.
func (c *cachingRecommenderImpl) Get(spec *apiv1.PodSpec) (*apimock.Recommendation, error) {
cacheKey := getCacheKey(spec)
if cacheKey != nil {
if cached := c.cache.Get(cacheKey); cached != nil {
return cached.(*apimock.Recommendation), nil
}
}
response, err := c.api.GetRecommendation(spec)
if err != nil {
return nil, fmt.Errorf("error fetching recommendation %v", err)
}
if response != nil && cacheKey != nil {
c.cache.Set(cacheKey, response)
}
return response, nil
}
func getCacheKey(spec *apiv1.PodSpec) *string {
podTemplateSpecHasher := sha1.New()
hashutil.DeepHashObject(podTemplateSpecHasher, *spec)
result := string(podTemplateSpecHasher.Sum(make([]byte, 0)))
return &result
}
func stopChacheGC(c *cachingRecommenderImpl) {
c.cache.StopCacheGC()
}
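
The SetFinalizer pattern above is worth noting: it lets an object that owns a background goroutine be garbage collected, provided the goroutine captures only the stop channel (here, indirectly via the TTLCache) and never the finalized object itself. A self-contained sketch of the pattern with generic names (worker is hypothetical, not part of this code):

package main

import (
	"fmt"
	"runtime"
	"time"
)

type worker struct {
	stop chan struct{}
}

func newWorker() *worker {
	w := &worker{stop: make(chan struct{})}
	stop := w.stop // capture only the channel; capturing w would keep it alive forever
	go func() {
		for {
			select {
			case <-time.After(100 * time.Millisecond):
				// periodic work would go here
			case <-stop:
				return
			}
		}
	}()
	// Stop the goroutine once w becomes unreachable.
	// (Finalizer timing is at the garbage collector's discretion.)
	runtime.SetFinalizer(w, func(w *worker) { close(w.stop) })
	return w
}

func main() {
	_ = newWorker()
	runtime.GC()
	time.Sleep(200 * time.Millisecond)
	fmt.Println("done")
}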


@@ -1,94 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package recommender
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/test"
)
func TestGetWithCache(t *testing.T) {
apiMock := &test.RecommenderAPIMock{}
rec := test.Recommendation("test", "", "")
pod := test.BuildTestPod("test", "", "", "", nil, nil)
apiMock.On("GetRecommendation", &pod.Spec).Return(rec, nil)
recommender := NewCachingRecommender(10*time.Second, apiMock)
result, err := recommender.Get(&pod.Spec)
assert.Equal(t, rec, result)
assert.Equal(t, nil, err)
// test get from cache
for i := 0; i < 5; i++ {
result, err = recommender.Get(&pod.Spec)
}
apiMock.AssertNumberOfCalls(t, "GetRecommendation", 1)
}
func TestGetCacheExpired(t *testing.T) {
apiMock := &test.RecommenderAPIMock{}
rec := test.Recommendation("test", "", "")
pod := test.BuildTestPod("test", "", "", "", nil, nil)
apiMock.On("GetRecommendation", &pod.Spec).Return(rec, nil)
recommender := NewCachingRecommender(time.Second, apiMock)
result, err := recommender.Get(&pod.Spec)
assert.Equal(t, rec, result)
assert.Equal(t, nil, err)
<-time.After(2 * time.Second)
result, err = recommender.Get(&pod.Spec)
apiMock.AssertNumberOfCalls(t, "GetRecommendation", 2)
}
func TestNoRec(t *testing.T) {
apiMock := &test.RecommenderAPIMock{}
pod := test.BuildTestPod("test", "", "", "", nil, nil)
apiMock.On("GetRecommendation", &pod.Spec).Return(nil, nil)
recommender := NewCachingRecommender(time.Second, apiMock)
result, err := recommender.Get(&pod.Spec)
assert.Nil(t, result)
assert.Nil(t, err)
// check that a nil response is not cached
result, err = recommender.Get(&pod.Spec)
apiMock.AssertNumberOfCalls(t, "GetRecommendation", 2)
}
func TestError(t *testing.T) {
apiMock := &test.RecommenderAPIMock{}
pod := test.BuildTestPod("test", "", "", "", nil, nil)
err := fmt.Errorf("Expected Fail")
apiMock.On("GetRecommendation", &pod.Spec).Return(nil, err)
recommender := NewCachingRecommender(time.Second, apiMock)
result, err := recommender.Get(&pod.Spec)
assert.Nil(t, result)
assert.Error(t, err)
// check that an error response is not cached
result, err = recommender.Get(&pod.Spec)
apiMock.AssertNumberOfCalls(t, "GetRecommendation", 2)
}


@@ -1,97 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package recommender
import (
"sync"
"time"
)
// TTLCache represents a cache whose entries expire after a TTL
type TTLCache struct {
ttl time.Duration
mutex sync.RWMutex
items map[string]*cachedValue
stopGC chan struct{}
}
type cachedValue struct {
value interface{}
expirationTime time.Time
}
// NewTTLCache creates a TTLCache with the given TTL
func NewTTLCache(ttl time.Duration) *TTLCache {
return &TTLCache{
ttl: ttl,
items: make(map[string]*cachedValue),
stopGC: make(chan struct{})}
}
func (r *cachedValue) isFresh() bool {
return r.expirationTime.After(time.Now())
}
// Get returns the value cached for the given key, or nil if the key is not found or its TTL has expired.
func (c *TTLCache) Get(cacheKey *string) interface{} {
c.mutex.RLock()
defer c.mutex.RUnlock()
value, found := c.items[*cacheKey]
if found && value.isFresh() {
return value.value
}
return nil
}
// Set stores the given value under the given key
func (c *TTLCache) Set(cacheKey *string, value interface{}) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.items[*cacheKey] = &cachedValue{value: value, expirationTime: time.Now().Add(c.ttl)}
}
func (c *TTLCache) cleanup() {
c.mutex.Lock()
defer c.mutex.Unlock()
for key, item := range c.items {
if !item.isFresh() {
delete(c.items, key)
}
}
}
// StartCacheGC starts a background garbage-collector worker that removes expired cache entries at the given interval.
// If StartCacheGC was called, call StopCacheGC when the cache is no longer needed;
// otherwise the TTLCache will never be garbage collected, since the background worker holds a reference to it.
func (c *TTLCache) StartCacheGC(interval time.Duration) {
ticker := time.Tick(interval)
go (func() {
for {
select {
case <-ticker:
c.cleanup()
case <-c.stopGC:
return
}
}
})()
}
// StopCacheGC stops background cache garbage collector
func (c *TTLCache) StopCacheGC() {
c.stopGC <- struct{}{}
}
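
For context, typical use of this (now removed) cache looked like the following sketch, which assumes it sits in the same recommender package with "fmt" imported:

func exampleTTLCacheUsage() {
	cache := NewTTLCache(30 * time.Second)
	cache.StartCacheGC(time.Minute)
	defer cache.StopCacheGC()

	key := "pod-spec-hash"
	cache.Set(&key, "cached recommendation")
	if v := cache.Get(&key); v != nil {
		fmt.Println(v.(string)) // hit: the entry is younger than 30s
	}
}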


@@ -1,63 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package recommender
import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)
func TestGet(t *testing.T) {
cache := NewTTLCache(5 * time.Second)
cacheKey := "hello"
result := cache.Get(&cacheKey)
assert.Nil(t, result, "Result should be nil")
cache.Set(&cacheKey, "world")
result = cache.Get(&cacheKey)
assert.Equal(t, "world", result)
}
func TestExpiration(t *testing.T) {
cache := NewTTLCache(5 * time.Second)
key1 := "A"
key2 := "B"
key3 := "C"
cache.Set(&key1, "1")
cache.Set(&key2, "2")
cache.Set(&key3, "3")
cache.StartCacheGC(time.Second)
<-time.After(2 * time.Second)
assert.Equal(t, "1", cache.Get(&key1))
cache.Set(&key1, "1")
<-time.After(3 * time.Second)
assert.Equal(t, "1", cache.Get(&key1))
assert.Nil(t, cache.Get(&key2))
assert.Nil(t, cache.Get(&key3))
<-time.After(5 * time.Second)
assert.Nil(t, cache.Get(&key1))
cache.StopCacheGC()
cache.Set(&key1, "1")
<-time.After(6 * time.Second)
assert.Nil(t, cache.Get(&key1))
}


@@ -20,6 +20,7 @@ import (
 	"flag"
 	"github.com/golang/glog"
 	kube_flag "k8s.io/apiserver/pkg/util/flag"
+	vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
 	kube_client "k8s.io/client-go/kubernetes"
 	kube_restclient "k8s.io/client-go/rest"
 	"time"
@@ -45,8 +46,8 @@ func main() {
 	// TODO monitoring
-	kubeClient := createKubeClient()
-	updater := NewUpdater(kubeClient, *recommendationsCacheTtl, *minReplicas, *evictionToleranceFraction)
+	kubeClient, vpaClient := createKubeClients()
+	updater := NewUpdater(kubeClient, vpaClient, *recommendationsCacheTtl, *minReplicas, *evictionToleranceFraction)
 	for {
 		select {
 		case <-time.After(*updaterInterval):
@@ -57,10 +58,10 @@ func main() {
 	}
 }

-func createKubeClient() kube_client.Interface {
+func createKubeClients() (kube_client.Interface, *vpa_clientset.Clientset) {
 	config, err := kube_restclient.InClusterConfig()
 	if err != nil {
 		glog.Fatalf("Failed to build Kubernetes client : fail to create config: %v", err)
 	}
-	return kube_client.NewForConfigOrDie(config)
+	return kube_client.NewForConfigOrDie(config), vpa_clientset.NewForConfigOrDie(config)
 }
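
createKubeClients only supports in-cluster configuration. When running the updater outside a cluster (e.g. during development), the same pair of clients can be built from a kubeconfig. A sketch using client-go's clientcmd package; createKubeClientsFromKubeconfig is a hypothetical helper, not part of this commit:

package main

import (
	"github.com/golang/glog"

	vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
	kube_client "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func createKubeClientsFromKubeconfig(path string) (kube_client.Interface, *vpa_clientset.Clientset) {
	config, err := clientcmd.BuildConfigFromFlags("", path)
	if err != nil {
		glog.Fatalf("Failed to build config from kubeconfig %s: %v", path, err)
	}
	return kube_client.NewForConfigOrDie(config), vpa_clientset.NewForConfigOrDie(config)
}

func main() {
	kubeClient, vpaClient := createKubeClientsFromKubeconfig(clientcmd.RecommendedHomeFile)
	_, _ = kubeClient, vpaClient
}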


@@ -20,7 +20,7 @@ import (
 	"math"
 	"sort"

-	"k8s.io/autoscaler/vertical-pod-autoscaler/apimock"
+	vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/poc.autoscaling.k8s.io/v1alpha1"

 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -39,8 +39,7 @@ const (
 // i.e. pod with 10M current memory and recommendation 20M will have higher update priority
 // than pod with 100M current memory and 150M recommendation (100% increase vs 50% increase)
 type UpdatePriorityCalculator struct {
-	resourcesPolicy *apimock.ResourcesPolicy
-	cpuPolicy       *apimock.Policy
+	resourcesPolicy *vpa_types.PodResourcePolicy
 	pods            []podPriority
 	config          *UpdateConfig
 }
@@ -55,7 +54,7 @@ type UpdateConfig struct {
 // NewUpdatePriorityCalculator creates new UpdatePriorityCalculator for the given resources policy and configuration.
 // If the given policy is nil, there will be no policy restriction on update.
 // If the given config is nil, default values are used.
-func NewUpdatePriorityCalculator(policy *apimock.ResourcesPolicy, config *UpdateConfig) UpdatePriorityCalculator {
+func NewUpdatePriorityCalculator(policy *vpa_types.PodResourcePolicy, config *UpdateConfig) UpdatePriorityCalculator {
 	if config == nil {
 		config = &UpdateConfig{MinChangePriority: defaultUpdateThreshod}
 	}
@@ -63,7 +62,7 @@ func NewUpdatePriorityCalculator(policy *apimock.ResourcesPolicy, config *Update
 }

 // AddPod adds pod to the UpdatePriorityCalculator.
-func (calc *UpdatePriorityCalculator) AddPod(pod *apiv1.Pod, recommendation *apimock.Recommendation) {
+func (calc *UpdatePriorityCalculator) AddPod(pod *apiv1.Pod, recommendation *vpa_types.RecommendedPodResources) {
 	updatePriority := calc.getUpdatePriority(pod, recommendation)

 	if updatePriority < calc.config.MinChangePriority {
@@ -87,7 +86,7 @@ func (calc *UpdatePriorityCalculator) GetSortedPods() []*apiv1.Pod {
 	return result
 }

-func (calc *UpdatePriorityCalculator) getUpdatePriority(pod *apiv1.Pod, recommendation *apimock.Recommendation) float64 {
+func (calc *UpdatePriorityCalculator) getUpdatePriority(pod *apiv1.Pod, recommendation *vpa_types.RecommendedPodResources) float64 {
 	var priority float64

 	for _, podContainer := range pod.Spec.Containers {
@@ -99,28 +98,28 @@ func (calc *UpdatePriorityCalculator) getUpdatePriority(pod *apiv1.Pod, recommen
 		containerPolicy := getContainerPolicy(podContainer.Name, calc.resourcesPolicy)

-		for resourceName, recommended := range cr.Resources {
-			var (
-				resourceRequested *resource.Quantity
-				resourcePolicy    *apimock.Policy
-			)
+		for resourceName, recommended := range cr.Target {
+			var requested, min, max *resource.Quantity

 			if request, ok := podContainer.Resources.Requests[resourceName]; ok {
-				resourceRequested = &request
+				requested = &request
 			}
 			if containerPolicy != nil {
-				if policy, ok := (*containerPolicy)[resourceName]; ok {
-					resourcePolicy = &policy
+				if minAllowed, ok := containerPolicy.MinAllowed[resourceName]; ok {
+					min = &minAllowed
+				}
+				if maxAllowed, ok := containerPolicy.MaxAllowed[resourceName]; ok {
+					max = &maxAllowed
 				}
 			}
-			resourceDiff := getPercentageDiff(resourceRequested, resourcePolicy, &recommended)
+			resourceDiff := getPercentageDiff(requested, min, max, &recommended)
 			priority += math.Abs(resourceDiff)
 		}
 	}
 	return priority
 }

-func getPercentageDiff(request *resource.Quantity, policy *apimock.Policy, recommendation *resource.Quantity) float64 {
+func getPercentageDiff(request, min, max, recommendation *resource.Quantity) float64 {
 	if request == nil {
 		// resource requirement is not currently specified
 		// any recommendation for this resource we will treat as 100% change
@@ -130,35 +129,33 @@ func getPercentageDiff(request *resource.Quantity, policy *apimock.Policy, recom
 		return 0
 	}
 	recommended := recommendation.Value()
-	if policy != nil {
-		if !policy.Min.IsZero() && recommendation.Value() < policy.Min.Value() {
-			glog.Warningf("recommendation outside of policy bounds : min value : %v recommended : %v",
-				policy.Min.Value(), recommended)
-			recommended = policy.Min.Value()
-		}
-		if !policy.Max.IsZero() && recommendation.Value() > policy.Max.Value() {
-			glog.Warningf("recommendation outside of policy bounds : max value : %v recommended : %v",
-				policy.Max.Value(), recommended)
-			recommended = policy.Max.Value()
-		}
+	if min != nil && !min.IsZero() && recommendation.Value() < min.Value() {
+		glog.Warningf("recommendation outside of policy bounds : min value : %v recommended : %v",
+			min.Value(), recommended)
+		recommended = min.Value()
+	}
+	if max != nil && !max.IsZero() && recommendation.Value() > max.Value() {
+		glog.Warningf("recommendation outside of policy bounds : max value : %v recommended : %v",
+			max.Value(), recommended)
+		recommended = max.Value()
 	}
 	diff := recommended - request.Value()
 	return float64(diff) / float64(request.Value())
 }

-func getContainerPolicy(containerName string, policy *apimock.ResourcesPolicy) *map[apiv1.ResourceName]apimock.Policy {
+func getContainerPolicy(containerName string, policy *vpa_types.PodResourcePolicy) *vpa_types.ContainerResourcePolicy {
 	if policy != nil {
-		for _, container := range policy.Containers {
+		for _, container := range policy.ContainerPolicies {
 			if containerName == container.Name {
-				return &container.ResourcePolicy
+				return &container
 			}
 		}
 	}
 	return nil
 }

-func getContainerRecommendation(containerName string, recommendation *apimock.Recommendation) *apimock.ContainerRecommendation {
-	for _, container := range recommendation.Containers {
+func getContainerRecommendation(containerName string, recommendation *vpa_types.RecommendedPodResources) *vpa_types.RecommendedContainerResources {
+	for _, container := range recommendation.ContainerRecommendations {
 		if containerName == container.Name {
 			return &container
 		}
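
The priority math is easiest to see with numbers: the recommendation is first clamped to [MinAllowed, MaxAllowed], then the relative change versus the current request is taken. A standalone re-implementation of that arithmetic (value parameters instead of the pointers used above, and no logging), assuming only apimachinery's resource package:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// percentageDiff mirrors getPercentageDiff from the diff above:
// clamp the recommendation to [min, max], then return the relative
// change versus the current request.
func percentageDiff(request, min, max, recommendation resource.Quantity) float64 {
	recommended := recommendation.Value()
	if !min.IsZero() && recommended < min.Value() {
		recommended = min.Value()
	}
	if !max.IsZero() && recommended > max.Value() {
		recommended = max.Value()
	}
	return float64(recommended-request.Value()) / float64(request.Value())
}

func main() {
	request := resource.MustParse("100M")
	min := resource.MustParse("50M")
	max := resource.MustParse("150M")
	rec := resource.MustParse("200M")
	// The 200M recommendation is clamped to 150M, so the priority
	// contribution is (150M - 100M) / 100M = 0.5.
	fmt.Printf("priority contribution: %.2f\n", percentageDiff(request, min, max, rec))
}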


@@ -19,7 +19,7 @@ package priority
 import (
 	"testing"

-	"k8s.io/autoscaler/vertical-pod-autoscaler/apimock"
+	vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/poc.autoscaling.k8s.io/v1alpha1"
 	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/test"

 	apiv1 "k8s.io/api/core/v1"
@@ -78,10 +78,10 @@ func TestSortPriorityMultiContainers(t *testing.T) {
 	recommendation := test.Recommendation(containerName, "6", "20M")
 	cpuRec, _ := resource.ParseQuantity("4")
 	memRec, _ := resource.ParseQuantity("20M")
-	container2rec := apimock.ContainerRecommendation{
+	container2rec := vpa_types.RecommendedContainerResources{
 		Name: containerName2,
-		Resources: map[apiv1.ResourceName]resource.Quantity{apiv1.ResourceCPU: cpuRec, apiv1.ResourceMemory: memRec}}
+		Target: map[apiv1.ResourceName]resource.Quantity{apiv1.ResourceCPU: cpuRec, apiv1.ResourceMemory: memRec}}
-	recommendation.Containers = append(recommendation.Containers, container2rec)
+	recommendation.ContainerRecommendations = append(recommendation.ContainerRecommendations, container2rec)

 	calculator := NewUpdatePriorityCalculator(nil, nil)
 	calculator.AddPod(pod1, recommendation)


@@ -19,14 +19,16 @@ package main
 import (
 	"time"

-	"k8s.io/autoscaler/vertical-pod-autoscaler/apimock"
-	recommender "k8s.io/autoscaler/vertical-pod-autoscaler/recommender_mock"
 	"k8s.io/autoscaler/vertical-pod-autoscaler/updater/eviction"
 	"k8s.io/autoscaler/vertical-pod-autoscaler/updater/priority"

 	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
+	vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/poc.autoscaling.k8s.io/v1alpha1"
+	vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
+	vpa_lister "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/listers/poc.autoscaling.k8s.io/v1alpha1"
 	kube_client "k8s.io/client-go/kubernetes"
 	v1lister "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
@@ -41,25 +43,23 @@ type Updater interface {
 }

 type updater struct {
-	vpaLister        apimock.VerticalPodAutoscalerLister // wait for VPA api
+	vpaLister        vpa_lister.VerticalPodAutoscalerLister
 	podLister        v1lister.PodLister
-	recommender      recommender.CachingRecommender
 	evictionFactrory eviction.PodsEvictionRestrictionFactory
 }

 // NewUpdater creates Updater with given configuration
-func NewUpdater(kubeClient kube_client.Interface, cacheTTl time.Duration, minReplicasForEvicition int, evictionToleranceFraction float64) Updater {
+func NewUpdater(kubeClient kube_client.Interface, vpaClient *vpa_clientset.Clientset, cacheTTl time.Duration, minReplicasForEvicition int, evictionToleranceFraction float64) Updater {
 	return &updater{
-		vpaLister:        newVpaLister(kubeClient),
+		vpaLister:        newVpaLister(vpaClient),
 		podLister:        newPodLister(kubeClient),
-		recommender:      recommender.NewCachingRecommender(cacheTTl, apimock.NewRecommenderAPI()),
 		evictionFactrory: eviction.NewPodsEvictionRestrictionFactory(kubeClient, minReplicasForEvicition, evictionToleranceFraction),
 	}
 }

 // RunOnce represents single iteration in the main-loop of Updater
 func (u *updater) RunOnce() {
-	vpaList, err := u.vpaLister.List()
+	vpaList, err := u.vpaLister.List(labels.Everything())
 	if err != nil {
 		glog.Fatalf("failed get VPA list: %v", err)
 	}
@@ -70,8 +70,8 @@ func (u *updater) RunOnce() {
 	}

 	for _, vpa := range vpaList {
-		glog.V(2).Infof("processing VPA object targeting %v", vpa.Spec.Target.Selector)
-		selector, err := labels.Parse(vpa.Spec.Target.Selector)
+		glog.V(2).Infof("processing VPA object targeting %v", vpa.Spec.Selector)
+		selector, err := metav1.LabelSelectorAsSelector(vpa.Spec.Selector)
 		if err != nil {
 			glog.Errorf("error processing VPA object: failed to create pod selector: %v", err)
 			continue
@@ -106,27 +106,12 @@ func (u *updater) RunOnce() {
 }

 // getPodsForUpdate returns list of pods that should be updated ordered by update priority
-func (u *updater) getPodsForUpdate(pods []*apiv1.Pod, vpa *apimock.VerticalPodAutoscaler) []*apiv1.Pod {
-	priorityCalculator := priority.NewUpdatePriorityCalculator(&vpa.Spec.ResourcesPolicy, nil)
+func (u *updater) getPodsForUpdate(pods []*apiv1.Pod, vpa *vpa_types.VerticalPodAutoscaler) []*apiv1.Pod {
+	priorityCalculator := priority.NewUpdatePriorityCalculator(&vpa.Spec.ResourcePolicy, nil)
+	recommendation := vpa.Status.Recommendation

 	for _, pod := range pods {
-		recommendation, err := u.recommender.Get(&pod.Spec)
-		if err != nil {
-			glog.Errorf("error while getting recommendation for pod %v: %v", pod.Name, err)
-			continue
-		}
-		if recommendation == nil {
-			if len(vpa.Status.Recommendation.Containers) == 0 {
-				glog.Warningf("no recommendation for pod: %v", pod.Name)
-				continue
-			}
-			glog.Warningf("fallback to default VPA recommendation for pod: %v", pod.Name)
-			recommendation = vpa.Status.Recommendation
-		}
-		priorityCalculator.AddPod(pod, recommendation)
+		priorityCalculator.AddPod(pod, &recommendation)
 	}

 	return priorityCalculator.GetSortedPods()
@@ -152,8 +137,14 @@ func filterDeletedPods(pods []*apiv1.Pod) []*apiv1.Pod {
 	return result
 }

-func newVpaLister(kubeClient kube_client.Interface) apimock.VerticalPodAutoscalerLister {
-	return apimock.NewVpaLister(kubeClient)
+func newVpaLister(vpaClient *vpa_clientset.Clientset) vpa_lister.VerticalPodAutoscalerLister {
+	vpaListWatch := cache.NewListWatchFromClient(vpaClient.Poc().RESTClient(), "vpa", apiv1.NamespaceAll, fields.Everything())
+	store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+	vpaLister := vpa_lister.NewVerticalPodAutoscalerLister(store)
+	vpaReflector := cache.NewReflector(vpaListWatch, &vpa_types.VerticalPodAutoscaler{}, store, time.Hour)
+	stopCh := make(chan struct{})
+	go vpaReflector.Run(stopCh)
+	return vpaLister
 }

 func newPodLister(kubeClient kube_client.Interface) v1lister.PodLister {
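
The new newVpaLister wires the standard client-go pipeline: a list/watch against the API group's REST client feeds a reflector, the reflector keeps an indexer store in sync, and a generated lister reads from that store. The same pattern for core Pods, as a sketch using the same imports as the file above (function only; wiring a real client is elided):

// newPodListerSketch mirrors the reflector-backed lister construction
// used for VPAs above, applied to Pods instead.
func newPodListerSketch(kubeClient kube_client.Interface) v1lister.PodLister {
	listWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, fields.Everything())
	store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	lister := v1lister.NewPodLister(store)
	reflector := cache.NewReflector(listWatch, &apiv1.Pod{}, store, time.Hour)
	stopCh := make(chan struct{})
	go reflector.Run(stopCh) // keeps the store in sync until stopCh is closed
	return lister
}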


@@ -17,11 +17,12 @@ limitations under the License.
 package main

 import (
+	"strconv"
 	"testing"

 	apiv1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/autoscaler/vertical-pod-autoscaler/apimock"
+	vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/poc.autoscaling.k8s.io/v1alpha1"
 	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/test"
 	"k8s.io/autoscaler/vertical-pod-autoscaler/updater/eviction"
 	"k8s.io/kubernetes/pkg/api/testapi"
@@ -46,15 +47,11 @@ func TestRunOnce(t *testing.T) {
 	pods := make([]*apiv1.Pod, livePods)
 	eviction := &test.PodsEvictionRestrictionMock{}
-	recommender := &test.RecommenderMock{}
-	rec := test.Recommendation(containerName, "2", "200M")

 	for i := range pods {
-		pods[i] = test.BuildTestPod("test"+string(i), containerName, "1", "100M", &rc.ObjectMeta, &rc.TypeMeta)
+		pods[i] = test.BuildTestPod("test_"+strconv.Itoa(i), containerName, "1", "100M", &rc.ObjectMeta, &rc.TypeMeta)
 		pods[i].Spec.NodeSelector = labels
 		eviction.On("CanEvict", pods[i]).Return(true)
 		eviction.On("Evict", pods[i]).Return(nil)
-		recommender.On("Get", &pods[i].Spec).Return(rec, nil)
 	}

 	factory := &fakeEvictFactory{eviction}
@@ -62,13 +59,12 @@ func TestRunOnce(t *testing.T) {
 	podLister := &test.PodListerMock{}
 	podLister.On("List").Return(pods, nil)

-	vpaObj := test.BuildTestVerticalPodAutoscaler(containerName, "1", "3", "100M", "1G", selector)
-	vpaLister.On("List").Return([]*apimock.VerticalPodAutoscaler{vpaObj}, nil).Once()
+	vpaObj := test.BuildTestVerticalPodAutoscaler(containerName, "2", "1", "3", "200M", "100M", "1G", selector)
+	vpaLister.On("List").Return([]*vpa_types.VerticalPodAutoscaler{vpaObj}, nil).Once()

 	updater := &updater{
 		vpaLister:        vpaLister,
 		podLister:        podLister,
-		recommender:      recommender,
 		evictionFactrory: factory,
 	}

@@ -77,7 +73,6 @@ func TestRunOnce(t *testing.T) {
 }

 func TestRunOnceNotingToProcess(t *testing.T) {
-	recommender := &test.RecommenderMock{}
 	eviction := &test.PodsEvictionRestrictionMock{}
 	factory := &fakeEvictFactory{eviction}
 	vpaLister := &test.VerticalPodAutoscalerListerMock{}
@@ -88,7 +83,6 @@ func TestRunOnceNotingToProcess(t *testing.T) {
 	updater := &updater{
 		vpaLister:        vpaLister,
 		podLister:        podLister,
-		recommender:      recommender,
 		evictionFactrory: factory,
 	}
 	updater.RunOnce()
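
The strconv.Itoa change above fixes a real bug rather than style: in Go, converting an integer to string interprets it as a Unicode code point, so the old pod names contained control characters instead of digits. A two-line demonstration:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	i := 1
	fmt.Printf("%q\n", "test"+string(rune(i)))  // "test\x01", a control character, not "test1"
	fmt.Printf("%q\n", "test_"+strconv.Itoa(i)) // "test_1"
}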