From 89d87f085e8530fa8dbb1e877b198d625cbf9ce0 Mon Sep 17 00:00:00 2001 From: Joachim Bartosik Date: Wed, 1 Sep 2021 12:09:16 +0200 Subject: [PATCH] Rate limit garbage collection inside `ClusterState` It looks like it belongs inside the class, not with code using it. --- .../checkpoint/checkpoint_writer_test.go | 4 +- .../recommender/input/cluster_feeder_test.go | 16 ++-- .../model/aggregate_container_state_test.go | 2 +- .../pkg/recommender/model/cluster.go | 38 +++++--- .../pkg/recommender/model/cluster_test.go | 88 +++++++++++++------ .../pkg/recommender/routines/recommender.go | 14 +-- 6 files changed, 100 insertions(+), 62 deletions(-) diff --git a/vertical-pod-autoscaler/pkg/recommender/checkpoint/checkpoint_writer_test.go b/vertical-pod-autoscaler/pkg/recommender/checkpoint/checkpoint_writer_test.go index 4b2d8c1943..ea9fc36f58 100644 --- a/vertical-pod-autoscaler/pkg/recommender/checkpoint/checkpoint_writer_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/checkpoint/checkpoint_writer_test.go @@ -42,6 +42,8 @@ var ( } ) +const testGcPeriod = time.Minute + func addVpa(t *testing.T, cluster *model.ClusterState, vpaID model.VpaID, selector string) *model.Vpa { var apiObject vpa_types.VerticalPodAutoscaler apiObject.Namespace = vpaID.Namespace @@ -56,7 +58,7 @@ func addVpa(t *testing.T, cluster *model.ClusterState, vpaID model.VpaID, select } func TestMergeContainerStateForCheckpointDropsRecentMemoryPeak(t *testing.T) { - cluster := model.NewClusterState() + cluster := model.NewClusterState(testGcPeriod) cluster.AddOrUpdatePod(testPodID1, testLabels, v1.PodRunning) assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID1, testRequest)) container := cluster.GetContainer(testContainerID1) diff --git a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder_test.go b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder_test.go index 6465fd9f63..fb2fe65c2b 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder_test.go @@ -19,6 +19,7 @@ package input import ( "fmt" "testing" + "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -60,11 +61,12 @@ var ( ) const ( - kind = "dodokind" - name1 = "dotaro" - name2 = "doseph" - namespace = "testNamespace" - apiVersion = "stardust" + kind = "dodokind" + name1 = "dotaro" + name2 = "doseph" + namespace = "testNamespace" + apiVersion = "stardust" + testGcPeriod = time.Minute ) func TestLoadPods(t *testing.T) { @@ -206,7 +208,7 @@ func TestLoadPods(t *testing.T) { targetSelectorFetcher := target_mock.NewMockVpaTargetSelectorFetcher(ctrl) - clusterState := model.NewClusterState() + clusterState := model.NewClusterState(testGcPeriod) clusterStateFeeder := clusterStateFeeder{ vpaLister: vpaLister, @@ -321,7 +323,7 @@ func TestClusterStateFeeder_LoadPods(t *testing.T) { }, } { t.Run(tc.Name, func(t *testing.T) { - clusterState := model.NewClusterState() + clusterState := model.NewClusterState(testGcPeriod) for i, selector := range tc.VPALabelSelectors { vpaLabel, err := labels.Parse(selector) assert.NoError(t, err) diff --git a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state_test.go b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state_test.go index dc6b115483..2b4383b88a 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state_test.go @@ -72,7 +72,7 @@ func addTestMemorySample(cluster *ClusterState, container ContainerID, memoryByt // container CPU and memory peak histograms, grouping the two containers // with the same name ('app-A') together. func TestAggregateStateByContainerName(t *testing.T) { - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) cluster.AddOrUpdatePod(testPodID1, testLabels, apiv1.PodRunning) otherLabels := labels.Set{"label-2": "value-2"} cluster.AddOrUpdatePod(testPodID2, otherLabels, apiv1.PodRunning) diff --git a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go index 2f85f9b56b..8e0e03b27c 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go @@ -54,6 +54,9 @@ type ClusterState struct { // Map with all label sets used by the aggregations. It serves as a cache // that allows to quickly access labels.Set corresponding to a labelSetKey. labelSetMap labelSetMap + + lastAggregateContainerStateGC time.Time + gcInterval time.Duration } // StateMapSize is the number of pods being tracked by the VPA @@ -92,13 +95,15 @@ type PodState struct { } // NewClusterState returns a new ClusterState with no pods. -func NewClusterState() *ClusterState { +func NewClusterState(gcInterval time.Duration) *ClusterState { return &ClusterState{ - Pods: make(map[PodID]*PodState), - Vpas: make(map[VpaID]*Vpa), - EmptyVPAs: make(map[VpaID]time.Time), - aggregateStateMap: make(aggregateContainerStatesMap), - labelSetMap: make(labelSetMap), + Pods: make(map[PodID]*PodState), + Vpas: make(map[VpaID]*Vpa), + EmptyVPAs: make(map[VpaID]time.Time), + aggregateStateMap: make(aggregateContainerStatesMap), + labelSetMap: make(labelSetMap), + lastAggregateContainerStateGC: time.Unix(0, 0), + gcInterval: gcInterval, } } @@ -343,12 +348,7 @@ func (cluster *ClusterState) findOrCreateAggregateContainerState(containerID Con return aggregateContainerState } -// GarbageCollectAggregateCollectionStates removes obsolete AggregateCollectionStates from the ClusterState. -// AggregateCollectionState is obsolete in following situations: -// 1) It has no samples and there are no more active pods that can contribute, -// 2) The last sample is too old to give meaningful recommendation (>8 days), -// 3) There are no samples and the aggregate state was created >8 days ago. -func (cluster *ClusterState) GarbageCollectAggregateCollectionStates(now time.Time) { +func (cluster *ClusterState) garbageCollectAggregateCollectionStates(now time.Time) { klog.V(1).Info("Garbage collection of AggregateCollectionStates triggered") keysToDelete := make([]AggregateStateKey, 0) activeKeys := cluster.getActiveAggregateStateKeys() @@ -372,6 +372,20 @@ func (cluster *ClusterState) GarbageCollectAggregateCollectionStates(now time.Ti } } +// RateLimitedGarbageCollectAggregateCollectionStates removes obsolete AggregateCollectionStates from the ClusterState. +// It performs clean up only if more than `gcInterval` passed since the last time it performed a clean up. +// AggregateCollectionState is obsolete in following situations: +// 1) It has no samples and there are no more active pods that can contribute, +// 2) The last sample is too old to give meaningful recommendation (>8 days), +// 3) There are no samples and the aggregate state was created >8 days ago. +func (cluster *ClusterState) RateLimitedGarbageCollectAggregateCollectionStates(now time.Time) { + if now.Sub(cluster.lastAggregateContainerStateGC) < cluster.gcInterval { + return + } + cluster.garbageCollectAggregateCollectionStates(now) + cluster.lastAggregateContainerStateGC = now +} + func (cluster *ClusterState) getActiveAggregateStateKeys() map[AggregateStateKey]bool { activeKeys := map[AggregateStateKey]bool{} for _, pod := range cluster.Pods { diff --git a/vertical-pod-autoscaler/pkg/recommender/model/cluster_test.go b/vertical-pod-autoscaler/pkg/recommender/model/cluster_test.go index 116c9a69df..5ae08158bb 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/cluster_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/cluster_test.go @@ -42,6 +42,8 @@ var ( testSelectorStr = "label-1 = value-1" ) +const testGcPeriod = time.Minute + func makeTestUsageSample() *ContainerUsageSampleWithKey { return &ContainerUsageSampleWithKey{ContainerUsageSample{ MeasureStart: testTimestamp, @@ -53,7 +55,7 @@ func makeTestUsageSample() *ContainerUsageSampleWithKey { func TestClusterAddSample(t *testing.T) { // Create a pod with a single container. - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) cluster.AddOrUpdatePod(testPodID, testLabels, apiv1.PodRunning) assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID, testRequest)) @@ -67,7 +69,7 @@ func TestClusterAddSample(t *testing.T) { func TestClusterGCAggregateContainerStateDeletesOld(t *testing.T) { // Create a pod with a single container. - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) vpa := addTestVpa(cluster) addTestPod(cluster) @@ -81,7 +83,7 @@ func TestClusterGCAggregateContainerStateDeletesOld(t *testing.T) { assert.NotEmpty(t, vpa.aggregateContainerStates) // AggegateContainerState are valid for 8 days since last sample - cluster.GarbageCollectAggregateCollectionStates(usageSample.MeasureStart.Add(9 * 24 * time.Hour)) + cluster.garbageCollectAggregateCollectionStates(usageSample.MeasureStart.Add(9 * 24 * time.Hour)) // AggegateContainerState should be deleted from both cluster and vpa assert.Empty(t, cluster.aggregateStateMap) @@ -90,7 +92,7 @@ func TestClusterGCAggregateContainerStateDeletesOld(t *testing.T) { func TestClusterGCAggregateContainerStateDeletesOldEmpty(t *testing.T) { // Create a pod with a single container. - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) vpa := addTestVpa(cluster) addTestPod(cluster) @@ -107,12 +109,12 @@ func TestClusterGCAggregateContainerStateDeletesOldEmpty(t *testing.T) { } // Verify empty aggregate states are not removed right away. - cluster.GarbageCollectAggregateCollectionStates(creationTime.Add(1 * time.Minute)) // AggegateContainerState should be deleted from both cluster and vpa + cluster.garbageCollectAggregateCollectionStates(creationTime.Add(1 * time.Minute)) // AggegateContainerState should be deleted from both cluster and vpa assert.NotEmpty(t, cluster.aggregateStateMap) assert.NotEmpty(t, vpa.aggregateContainerStates) // AggegateContainerState are valid for 8 days since creation - cluster.GarbageCollectAggregateCollectionStates(creationTime.Add(9 * 24 * time.Hour)) + cluster.garbageCollectAggregateCollectionStates(creationTime.Add(9 * 24 * time.Hour)) // AggegateContainerState should be deleted from both cluster and vpa assert.Empty(t, cluster.aggregateStateMap) @@ -121,7 +123,7 @@ func TestClusterGCAggregateContainerStateDeletesOldEmpty(t *testing.T) { func TestClusterGCAggregateContainerStateDeletesEmptyInactive(t *testing.T) { // Create a pod with a single container. - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) vpa := addTestVpa(cluster) pod := addTestPod(cluster) @@ -131,14 +133,14 @@ func TestClusterGCAggregateContainerStateDeletesEmptyInactive(t *testing.T) { assert.NotEmpty(t, cluster.aggregateStateMap) assert.NotEmpty(t, vpa.aggregateContainerStates) - cluster.GarbageCollectAggregateCollectionStates(testTimestamp) + cluster.garbageCollectAggregateCollectionStates(testTimestamp) // AggegateContainerState should not be deleted as the pod is still active. assert.NotEmpty(t, cluster.aggregateStateMap) assert.NotEmpty(t, vpa.aggregateContainerStates) cluster.Pods[pod.ID].Phase = apiv1.PodSucceeded - cluster.GarbageCollectAggregateCollectionStates(testTimestamp) + cluster.garbageCollectAggregateCollectionStates(testTimestamp) // AggegateContainerState should be empty as the pod is no longer active and // there are no usage samples. @@ -148,7 +150,7 @@ func TestClusterGCAggregateContainerStateDeletesEmptyInactive(t *testing.T) { func TestClusterGCAggregateContainerStateLeavesValid(t *testing.T) { // Create a pod with a single container. - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) vpa := addTestVpa(cluster) addTestPod(cluster) @@ -162,7 +164,7 @@ func TestClusterGCAggregateContainerStateLeavesValid(t *testing.T) { assert.NotEmpty(t, vpa.aggregateContainerStates) // AggegateContainerState are valid for 8 days since last sample - cluster.GarbageCollectAggregateCollectionStates(usageSample.MeasureStart.Add(7 * 24 * time.Hour)) + cluster.garbageCollectAggregateCollectionStates(usageSample.MeasureStart.Add(7 * 24 * time.Hour)) assert.NotEmpty(t, cluster.aggregateStateMap) assert.NotEmpty(t, vpa.aggregateContainerStates) @@ -170,7 +172,7 @@ func TestClusterGCAggregateContainerStateLeavesValid(t *testing.T) { func TestAddSampleAfterAggregateContainerStateGCed(t *testing.T) { // Create a pod with a single container. - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) vpa := addTestVpa(cluster) pod := addTestPod(cluster) addTestContainer(cluster) @@ -189,7 +191,7 @@ func TestAddSampleAfterAggregateContainerStateGCed(t *testing.T) { // AggegateContainerState are invalid after 8 days since last sample gcTimestamp := usageSample.MeasureStart.Add(10 * 24 * time.Hour) - cluster.GarbageCollectAggregateCollectionStates(gcTimestamp) + cluster.garbageCollectAggregateCollectionStates(gcTimestamp) assert.Empty(t, cluster.aggregateStateMap) assert.Empty(t, vpa.aggregateContainerStates) @@ -205,12 +207,40 @@ func TestAddSampleAfterAggregateContainerStateGCed(t *testing.T) { assert.NoError(t, cluster.AddSample(newUsageSample)) assert.Contains(t, vpa.aggregateContainerStates, aggregateStateKey) +} +func TestClusterGCRateLimiting(t *testing.T) { + // Create a pod with a single container. + cluster := NewClusterState(testGcPeriod) + usageSample := makeTestUsageSample() + sampleExpireTime := usageSample.MeasureStart.Add(9 * 24 * time.Hour) + // AggegateContainerState are valid for 8 days since last sample but this run + // doesn't remove the sample, because we didn't add it yet. + cluster.RateLimitedGarbageCollectAggregateCollectionStates(sampleExpireTime) + vpa := addTestVpa(cluster) + addTestPod(cluster) + assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID, testRequest)) + + // Add a usage sample to the container. + assert.NoError(t, cluster.AddSample(usageSample)) + assert.NotEmpty(t, cluster.aggregateStateMap) + assert.NotEmpty(t, vpa.aggregateContainerStates) + + // Sample is expired but this run doesn't remove it yet, because less than testGcPeriod + // elapsed since the previous run. + cluster.RateLimitedGarbageCollectAggregateCollectionStates(sampleExpireTime.Add(testGcPeriod / 2)) + assert.NotEmpty(t, cluster.aggregateStateMap) + assert.NotEmpty(t, vpa.aggregateContainerStates) + + // AggegateContainerState should be deleted from both cluster and vpa + cluster.RateLimitedGarbageCollectAggregateCollectionStates(sampleExpireTime.Add(2 * testGcPeriod)) + assert.Empty(t, cluster.aggregateStateMap) + assert.Empty(t, vpa.aggregateContainerStates) } func TestClusterRecordOOM(t *testing.T) { // Create a pod with a single container. - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) cluster.AddOrUpdatePod(testPodID, testLabels, apiv1.PodRunning) assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID, testRequest)) @@ -225,7 +255,7 @@ func TestClusterRecordOOM(t *testing.T) { // Verifies that AddSample and AddOrUpdateContainer methods return a proper // KeyError when referring to a non-existent pod. func TestMissingKeys(t *testing.T) { - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) err := cluster.AddSample(makeTestUsageSample()) assert.EqualError(t, err, "KeyError: {namespace-1 pod-1}") @@ -269,7 +299,7 @@ func addTestContainer(cluster *ClusterState) *ContainerState { // Creates a VPA followed by a matching pod. Verifies that the links between // VPA, the container and the aggregation are set correctly. func TestAddVpaThenAddPod(t *testing.T) { - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) vpa := addTestVpa(cluster) assert.Empty(t, vpa.aggregateContainerStates) addTestPod(cluster) @@ -281,7 +311,7 @@ func TestAddVpaThenAddPod(t *testing.T) { // Creates a pod followed by a matching VPA. Verifies that the links between // VPA, the container and the aggregation are set correctly. func TestAddPodThenAddVpa(t *testing.T) { - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) addTestPod(cluster) addTestContainer(cluster) vpa := addTestVpa(cluster) @@ -293,7 +323,7 @@ func TestAddPodThenAddVpa(t *testing.T) { // no longer matched by the VPA. Verifies that the links between the pod and the // VPA are removed. func TestChangePodLabels(t *testing.T) { - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) vpa := addTestVpa(cluster) addTestPod(cluster) addTestContainer(cluster) @@ -307,7 +337,7 @@ func TestChangePodLabels(t *testing.T) { // Creates a VPA and verifies that annotation updates work properly. func TestUpdateAnnotations(t *testing.T) { - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) vpa := addTestVpa(cluster) // Verify that the annotations match the test annotations. assert.Equal(t, vpa.Annotations, testAnnotations) @@ -326,7 +356,7 @@ func TestUpdateAnnotations(t *testing.T) { // the pod, finally such that it matches the pod again. Verifies that the links // between the pod and the VPA are updated correctly each time. func TestUpdatePodSelector(t *testing.T) { - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) vpa := addTestVpa(cluster) addTestPod(cluster) addTestContainer(cluster) @@ -456,7 +486,7 @@ func TestAddOrUpdateVPAPolicies(t *testing.T) { } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) addTestPod(cluster) addTestContainer(cluster) if tc.oldVpa != nil { @@ -487,7 +517,7 @@ func TestAddOrUpdateVPAPolicies(t *testing.T) { // Verify that two copies of the same AggregateStateKey are equal. func TestEqualAggregateStateKey(t *testing.T) { - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) pod := addTestPod(cluster) key1 := cluster.MakeAggregateStateKey(pod, "container-1") key2 := cluster.MakeAggregateStateKey(pod, "container-1") @@ -502,7 +532,7 @@ func TestTwoPodsWithSameLabels(t *testing.T) { containerID1 := ContainerID{podID1, "foo-container"} containerID2 := ContainerID{podID2, "foo-container"} - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) cluster.AddOrUpdatePod(podID1, testLabels, apiv1.PodRunning) cluster.AddOrUpdatePod(podID2, testLabels, apiv1.PodRunning) cluster.AddOrUpdateContainer(containerID1, testRequest) @@ -519,7 +549,7 @@ func TestTwoPodsWithDifferentNamespaces(t *testing.T) { containerID1 := ContainerID{podID1, "foo-container"} containerID2 := ContainerID{podID2, "foo-container"} - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) cluster.AddOrUpdatePod(podID1, testLabels, apiv1.PodRunning) cluster.AddOrUpdatePod(podID2, testLabels, apiv1.PodRunning) cluster.AddOrUpdateContainer(containerID1, testRequest) @@ -534,7 +564,7 @@ func TestTwoPodsWithDifferentNamespaces(t *testing.T) { // Verifies that a VPA with an empty selector (matching all pods) matches a pod // with labels as well as a pod with no labels. func TestEmptySelector(t *testing.T) { - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) // Create a VPA with an empty selector (matching all pods). vpa := addVpa(cluster, testVpaID, testAnnotations, "") // Create a pod with labels. Add a container. @@ -604,7 +634,7 @@ func TestRecordRecommendation(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) vpa := addVpa(cluster, testVpaID, testAnnotations, testSelectorStr) cluster.Vpas[testVpaID].Recommendation = tc.recommendation if !tc.lastLogged.IsZero() { @@ -687,7 +717,7 @@ func TestGetActiveMatchingPods(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) vpa := addVpa(cluster, testVpaID, testAnnotations, tc.vpaSelector) for _, pod := range tc.pods { cluster.AddOrUpdatePod(pod.id, pod.labels, pod.phase) @@ -761,7 +791,7 @@ func TestVPAWithMatchingPods(t *testing.T) { // Run with adding VPA first for _, tc := range cases { t.Run(tc.name+", VPA first", func(t *testing.T) { - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) vpa := addVpa(cluster, testVpaID, testAnnotations, tc.vpaSelector) for _, podDesc := range tc.pods { cluster.AddOrUpdatePod(podDesc.id, podDesc.labels, podDesc.phase) @@ -774,7 +804,7 @@ func TestVPAWithMatchingPods(t *testing.T) { // Run with adding Pods first for _, tc := range cases { t.Run(tc.name+", Pods first", func(t *testing.T) { - cluster := NewClusterState() + cluster := NewClusterState(testGcPeriod) for _, podDesc := range tc.pods { cluster.AddOrUpdatePod(podDesc.id, podDesc.labels, podDesc.phase) containerID := ContainerID{testPodID, "foo"} diff --git a/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go b/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go index 241f9bb876..8179410b5a 100644 --- a/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go +++ b/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go @@ -57,8 +57,6 @@ type Recommender interface { // MaintainCheckpoints writes at least minCheckpoints if there are more checkpoints to write. // Checkpoints are written until ctx permits or all checkpoints are written. MaintainCheckpoints(ctx context.Context, minCheckpoints int) - // GarbageCollect removes old AggregateCollectionStates - GarbageCollect() } type recommender struct { @@ -165,14 +163,6 @@ func (r *recommender) MaintainCheckpoints(ctx context.Context, minCheckpointsPer } } -func (r *recommender) GarbageCollect() { - gcTime := time.Now() - if gcTime.Sub(r.lastAggregateContainerStateGC) > AggregateContainerStateGCInterval { - r.clusterState.GarbageCollectAggregateCollectionStates(gcTime) - r.lastAggregateContainerStateGC = gcTime - } -} - func (r *recommender) RunOnce() { timer := metrics_recommender.NewExecutionTimer() defer timer.ObserveTotal() @@ -199,7 +189,7 @@ func (r *recommender) RunOnce() { r.MaintainCheckpoints(ctx, *minCheckpointsPerRun) timer.ObserveStep("MaintainCheckpoints") - r.GarbageCollect() + r.clusterState.RateLimitedGarbageCollectAggregateCollectionStates(time.Now()) timer.ObserveStep("GarbageCollect") klog.V(3).Infof("ClusterState is tracking %d aggregated container states", r.clusterState.StateMapSize()) } @@ -239,7 +229,7 @@ func (c RecommenderFactory) Make() Recommender { // Dependencies are created automatically. // Deprecated; use RecommenderFactory instead. func NewRecommender(config *rest.Config, checkpointsGCInterval time.Duration, useCheckpoints bool, namespace string) Recommender { - clusterState := model.NewClusterState() + clusterState := model.NewClusterState(AggregateContainerStateGCInterval) return RecommenderFactory{ ClusterState: clusterState, ClusterStateFeeder: input.NewClusterStateFeeder(config, clusterState, *memorySaver, namespace),