Merge pull request #5239 from DataDog/david.benque/reco-post-processing

[vpa] introduce recommendation post processor
This commit is contained in:
Kubernetes Prow Robot 2022-11-02 06:15:15 -07:00 committed by GitHub
commit 3ceb97ae2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 179 additions and 129 deletions

View File

@ -18,7 +18,9 @@ package logic
import (
"flag"
"sort"
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/model"
)
@ -147,3 +149,31 @@ func CreatePodResourceRecommender() PodResourceRecommender {
lowerBoundEstimator,
upperBoundEstimator}
}
// MapToListOfRecommendedContainerResources converts the map of RecommendedContainerResources into a stable sorted list
// This can be used to get a stable sequence while ranging on the data
func MapToListOfRecommendedContainerResources(resources RecommendedPodResources) *vpa_types.RecommendedPodResources {
containerResources := make([]vpa_types.RecommendedContainerResources, 0, len(resources))
// Sort the container names from the map. This is because maps are an
// unordered data structure, and iterating through the map will return
// a different order on every call.
containerNames := make([]string, 0, len(resources))
for containerName := range resources {
containerNames = append(containerNames, containerName)
}
sort.Strings(containerNames)
// Create the list of recommendations for each container.
for _, name := range containerNames {
containerResources = append(containerResources, vpa_types.RecommendedContainerResources{
ContainerName: name,
Target: model.ResourcesAsResourceList(resources[name].Target),
LowerBound: model.ResourcesAsResourceList(resources[name].LowerBound),
UpperBound: model.ResourcesAsResourceList(resources[name].UpperBound),
UncappedTarget: model.ResourcesAsResourceList(resources[name].Target),
})
}
recommendation := &vpa_types.RecommendedPodResources{
ContainerRecommendations: containerResources,
}
return recommendation
}

View File

@ -17,10 +17,9 @@ limitations under the License.
package logic
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/model"
"testing"
)
func TestMinResourcesApplied(t *testing.T) {
@ -115,3 +114,54 @@ func TestControlledResourcesFilteredDefault(t *testing.T) {
assert.Contains(t, recommendedResources[containerName].LowerBound, model.ResourceCPU)
assert.Contains(t, recommendedResources[containerName].UpperBound, model.ResourceCPU)
}
func TestMapToListOfRecommendedContainerResources(t *testing.T) {
cases := []struct {
name string
resources RecommendedPodResources
expectedLast []string
}{
{
name: "All recommendations sorted",
resources: RecommendedPodResources{
"a-container": RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(1), model.ResourceMemory: model.MemoryAmountFromBytes(1e6)}},
"b-container": RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(2), model.ResourceMemory: model.MemoryAmountFromBytes(2e6)}},
"c-container": RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(3), model.ResourceMemory: model.MemoryAmountFromBytes(3e6)}},
"d-container": RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(4), model.ResourceMemory: model.MemoryAmountFromBytes(4e6)}},
},
expectedLast: []string{
"a-container",
"b-container",
"c-container",
"d-container",
},
},
{
name: "All recommendations unsorted",
resources: RecommendedPodResources{
"b-container": RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(1), model.ResourceMemory: model.MemoryAmountFromBytes(1e6)}},
"a-container": RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(2), model.ResourceMemory: model.MemoryAmountFromBytes(2e6)}},
"d-container": RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(3), model.ResourceMemory: model.MemoryAmountFromBytes(3e6)}},
"c-container": RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(4), model.ResourceMemory: model.MemoryAmountFromBytes(4e6)}},
},
expectedLast: []string{
"a-container",
"b-container",
"c-container",
"d-container",
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
outRecommendations := MapToListOfRecommendedContainerResources(tc.resources)
for i, outRecommendation := range outRecommendations.ContainerRecommendations {
containerName := tc.expectedLast[i]
assert.Equal(t, containerName, outRecommendation.ContainerName)
// also check that the recommendation is not changed
assert.Equal(t, int64(tc.resources[containerName].Target[model.ResourceCPU]), outRecommendation.Target.Cpu().MilliValue())
assert.Equal(t, int64(tc.resources[containerName].Target[model.ResourceMemory]), outRecommendation.Target.Memory().Value())
}
})
}
}

View File

@ -82,7 +82,11 @@ func main() {
metrics_quality.Register()
useCheckpoints := *storage != "prometheus"
recommender := routines.NewRecommender(config, *checkpointsGCInterval, useCheckpoints, *vpaObjectNamespace, *recommenderName)
postProcessors := []routines.RecommendationPostProcessor{
&routines.CappingPostProcessor{},
}
recommender := routines.NewRecommender(config, *checkpointsGCInterval, useCheckpoints, *vpaObjectNamespace, *recommenderName, postProcessors)
promQueryTimeout, err := time.ParseDuration(*queryTimeout)
if err != nil {

View File

@ -0,0 +1,41 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package routines
import (
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/model"
vpa_utils "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/vpa"
"k8s.io/klog/v2"
)
// CappingPostProcessor ensure that the policy is applied to recommendation
// it applies policy for fields: MinAllowed and MaxAllowed
type CappingPostProcessor struct{}
var _ RecommendationPostProcessor = &CappingPostProcessor{}
// Process apply the capping post-processing to the recommendation. (use to be function getCappedRecommendation)
func (c CappingPostProcessor) Process(vpa *model.Vpa, recommendation *vpa_types.RecommendedPodResources, policy *vpa_types.PodResourcePolicy) *vpa_types.RecommendedPodResources {
// TODO: maybe rename the vpa_utils.ApplyVPAPolicy to something that mention that it is doing capping only
cappedRecommendation, err := vpa_utils.ApplyVPAPolicy(recommendation, policy)
if err != nil {
klog.Errorf("Failed to apply policy for VPA %v/%v: %v", vpa.ID.Namespace, vpa.ID.VpaName, err)
return recommendation
}
return cappedRecommendation
}

View File

@ -0,0 +1,28 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package routines
import (
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/model"
)
// RecommendationPostProcessor can amend the recommendation according to the defined policies
type RecommendationPostProcessor interface {
Process(vpa *model.Vpa, recommendation *vpa_types.RecommendedPodResources,
policy *vpa_types.PodResourcePolicy) *vpa_types.RecommendedPodResources
}

View File

@ -19,10 +19,8 @@ package routines
import (
"context"
"flag"
"sort"
"time"
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
vpa_clientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
vpa_api "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned/typed/autoscaling.k8s.io/v1"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/checkpoint"
@ -80,6 +78,7 @@ type recommender struct {
podResourceRecommender logic.PodResourceRecommender
useCheckpoints bool
lastAggregateContainerStateGC time.Time
recommendationPostProcessor []RecommendationPostProcessor
}
func (r *recommender) GetClusterState() *model.ClusterState {
@ -107,7 +106,14 @@ func (r *recommender) UpdateVPAs() {
}
resources := r.podResourceRecommender.GetRecommendedPodResources(GetContainerNameToAggregateStateMap(vpa))
had := vpa.HasRecommendation()
vpa.UpdateRecommendation(getCappedRecommendation(vpa.ID, resources, observedVpa.Spec.ResourcePolicy))
listOfResourceRecommendation := logic.MapToListOfRecommendedContainerResources(resources)
for _, postProcessor := range r.recommendationPostProcessor {
listOfResourceRecommendation = postProcessor.Process(vpa, listOfResourceRecommendation, observedVpa.Spec.ResourcePolicy)
}
vpa.UpdateRecommendation(listOfResourceRecommendation)
if vpa.HasRecommendation() && !had {
metrics_recommender.ObserveRecommendationLatency(vpa.Created)
}
@ -138,42 +144,6 @@ func (r *recommender) UpdateVPAs() {
}
}
// getCappedRecommendation creates a recommendation based on recommended pod
// resources, setting the UncappedTarget to the calculated recommended target
// and if necessary, capping the Target, LowerBound and UpperBound according
// to the ResourcePolicy.
func getCappedRecommendation(vpaID model.VpaID, resources logic.RecommendedPodResources,
policy *vpa_types.PodResourcePolicy) *vpa_types.RecommendedPodResources {
containerResources := make([]vpa_types.RecommendedContainerResources, 0, len(resources))
// Sort the container names from the map. This is because maps are an
// unordered data structure, and iterating through the map will return
// a different order on every call.
containerNames := make([]string, 0, len(resources))
for containerName := range resources {
containerNames = append(containerNames, containerName)
}
sort.Strings(containerNames)
// Create the list of recommendations for each container.
for _, name := range containerNames {
containerResources = append(containerResources, vpa_types.RecommendedContainerResources{
ContainerName: name,
Target: model.ResourcesAsResourceList(resources[name].Target),
LowerBound: model.ResourcesAsResourceList(resources[name].LowerBound),
UpperBound: model.ResourcesAsResourceList(resources[name].UpperBound),
UncappedTarget: model.ResourcesAsResourceList(resources[name].Target),
})
}
recommendation := &vpa_types.RecommendedPodResources{
ContainerRecommendations: containerResources,
}
cappedRecommendation, err := vpa_utils.ApplyVPAPolicy(recommendation, policy)
if err != nil {
klog.Errorf("Failed to apply policy for VPA %v/%v: %v", vpaID.Namespace, vpaID.VpaName, err)
return recommendation
}
return cappedRecommendation
}
func (r *recommender) MaintainCheckpoints(ctx context.Context, minCheckpointsPerRun int) {
now := time.Now()
if r.useCheckpoints {
@ -228,6 +198,8 @@ type RecommenderFactory struct {
PodResourceRecommender logic.PodResourceRecommender
VpaClient vpa_api.VerticalPodAutoscalersGetter
RecommendationPostProcessors []RecommendationPostProcessor
CheckpointsGCInterval time.Duration
UseCheckpoints bool
}
@ -244,6 +216,7 @@ func (c RecommenderFactory) Make() Recommender {
useCheckpoints: c.UseCheckpoints,
vpaClient: c.VpaClient,
podResourceRecommender: c.PodResourceRecommender,
recommendationPostProcessor: c.RecommendationPostProcessors,
lastAggregateContainerStateGC: time.Now(),
lastCheckpointGC: time.Now(),
}
@ -254,19 +227,21 @@ func (c RecommenderFactory) Make() Recommender {
// NewRecommender creates a new recommender instance.
// Dependencies are created automatically.
// Deprecated; use RecommenderFactory instead.
func NewRecommender(config *rest.Config, checkpointsGCInterval time.Duration, useCheckpoints bool, namespace string, recommenderName string) Recommender {
func NewRecommender(config *rest.Config, checkpointsGCInterval time.Duration, useCheckpoints bool, namespace string, recommenderName string, recommendationPostProcessors []RecommendationPostProcessor) Recommender {
clusterState := model.NewClusterState(AggregateContainerStateGCInterval)
kubeClient := kube_client.NewForConfigOrDie(config)
factory := informers.NewSharedInformerFactoryWithOptions(kubeClient, defaultResyncPeriod, informers.WithNamespace(namespace))
controllerFetcher := controllerfetcher.NewControllerFetcher(config, kubeClient, factory, scaleCacheEntryFreshnessTime, scaleCacheEntryLifetime, scaleCacheEntryJitterFactor)
return RecommenderFactory{
ClusterState: clusterState,
ClusterStateFeeder: input.NewClusterStateFeeder(config, clusterState, *memorySaver, namespace, "default-metrics-client", recommenderName),
ControllerFetcher: controllerFetcher,
CheckpointWriter: checkpoint.NewCheckpointWriter(clusterState, vpa_clientset.NewForConfigOrDie(config).AutoscalingV1()),
VpaClient: vpa_clientset.NewForConfigOrDie(config).AutoscalingV1(),
PodResourceRecommender: logic.CreatePodResourceRecommender(),
CheckpointsGCInterval: checkpointsGCInterval,
UseCheckpoints: useCheckpoints,
ClusterState: clusterState,
ClusterStateFeeder: input.NewClusterStateFeeder(config, clusterState, *memorySaver, namespace, "default-metrics-client", recommenderName),
ControllerFetcher: controllerFetcher,
CheckpointWriter: checkpoint.NewCheckpointWriter(clusterState, vpa_clientset.NewForConfigOrDie(config).AutoscalingV1()),
VpaClient: vpa_clientset.NewForConfigOrDie(config).AutoscalingV1(),
PodResourceRecommender: logic.CreatePodResourceRecommender(),
RecommendationPostProcessors: recommendationPostProcessors,
CheckpointsGCInterval: checkpointsGCInterval,
UseCheckpoints: useCheckpoints,
}.Make()
}

View File

@ -1,78 +0,0 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package routines
import (
"testing"
"time"
labels "k8s.io/apimachinery/pkg/labels"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/logic"
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/model"
"github.com/stretchr/testify/assert"
)
func TestSortedRecommendation(t *testing.T) {
cases := []struct {
name string
resources logic.RecommendedPodResources
expectedLast []string
}{
{
name: "All recommendations sorted",
resources: logic.RecommendedPodResources{
"a-container": logic.RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(1), model.ResourceMemory: model.MemoryAmountFromBytes(1000)}},
"b-container": logic.RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(1), model.ResourceMemory: model.MemoryAmountFromBytes(1000)}},
"c-container": logic.RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(1), model.ResourceMemory: model.MemoryAmountFromBytes(1000)}},
"d-container": logic.RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(1), model.ResourceMemory: model.MemoryAmountFromBytes(1000)}},
},
expectedLast: []string{
"a-container",
"b-container",
"c-container",
"d-container",
},
},
{
name: "All recommendations unsorted",
resources: logic.RecommendedPodResources{
"b-container": logic.RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(1), model.ResourceMemory: model.MemoryAmountFromBytes(1000)}},
"a-container": logic.RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(1), model.ResourceMemory: model.MemoryAmountFromBytes(1000)}},
"d-container": logic.RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(1), model.ResourceMemory: model.MemoryAmountFromBytes(1000)}},
"c-container": logic.RecommendedContainerResources{Target: model.Resources{model.ResourceCPU: model.CPUAmountFromCores(1), model.ResourceMemory: model.MemoryAmountFromBytes(1000)}},
},
expectedLast: []string{
"a-container",
"b-container",
"c-container",
"d-container",
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
namespace := "test-namespace"
vpa := model.NewVpa(model.VpaID{Namespace: namespace, VpaName: "my-vpa"}, labels.Nothing(), time.Unix(0, 0))
vpa.UpdateRecommendation(getCappedRecommendation(vpa.ID, tc.resources, nil))
// Check that the slice is in the correct order.
for i := range vpa.Recommendation.ContainerRecommendations {
assert.Equal(t, tc.expectedLast[i], vpa.Recommendation.ContainerRecommendations[i].ContainerName)
}
})
}
}