Merge pull request #4302 from jbartosik/cluster-state-self-cleanup

Rate limit garbage collection inside `ClusterState`
This commit is contained in:
Kubernetes Prow Robot 2021-09-16 03:27:45 -07:00 committed by GitHub
commit 68a7d27b84
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 99 additions and 62 deletions

View File

@ -42,6 +42,8 @@ var (
}
)
const testGcPeriod = time.Minute
func addVpa(t *testing.T, cluster *model.ClusterState, vpaID model.VpaID, selector string) *model.Vpa {
var apiObject vpa_types.VerticalPodAutoscaler
apiObject.Namespace = vpaID.Namespace
@ -56,7 +58,7 @@ func addVpa(t *testing.T, cluster *model.ClusterState, vpaID model.VpaID, select
}
func TestMergeContainerStateForCheckpointDropsRecentMemoryPeak(t *testing.T) {
cluster := model.NewClusterState()
cluster := model.NewClusterState(testGcPeriod)
cluster.AddOrUpdatePod(testPodID1, testLabels, v1.PodRunning)
assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID1, testRequest))
container := cluster.GetContainer(testContainerID1)

View File

@ -62,11 +62,12 @@ var (
)
const (
kind = "dodokind"
name1 = "dotaro"
name2 = "doseph"
namespace = "testNamespace"
apiVersion = "stardust"
kind = "dodokind"
name1 = "dotaro"
name2 = "doseph"
namespace = "testNamespace"
apiVersion = "stardust"
testGcPeriod = time.Minute
)
func TestLoadPods(t *testing.T) {
@ -208,7 +209,7 @@ func TestLoadPods(t *testing.T) {
targetSelectorFetcher := target_mock.NewMockVpaTargetSelectorFetcher(ctrl)
clusterState := model.NewClusterState()
clusterState := model.NewClusterState(testGcPeriod)
clusterStateFeeder := clusterStateFeeder{
vpaLister: vpaLister,
@ -323,7 +324,7 @@ func TestClusterStateFeeder_LoadPods(t *testing.T) {
},
} {
t.Run(tc.Name, func(t *testing.T) {
clusterState := model.NewClusterState()
clusterState := model.NewClusterState(testGcPeriod)
for i, selector := range tc.VPALabelSelectors {
vpaLabel, err := labels.Parse(selector)
assert.NoError(t, err)

View File

@ -72,7 +72,7 @@ func addTestMemorySample(cluster *ClusterState, container ContainerID, memoryByt
// container CPU and memory peak histograms, grouping the two containers
// with the same name ('app-A') together.
func TestAggregateStateByContainerName(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
cluster.AddOrUpdatePod(testPodID1, testLabels, apiv1.PodRunning)
otherLabels := labels.Set{"label-2": "value-2"}
cluster.AddOrUpdatePod(testPodID2, otherLabels, apiv1.PodRunning)

View File

@ -54,6 +54,9 @@ type ClusterState struct {
// Map with all label sets used by the aggregations. It serves as a cache
// that allows to quickly access labels.Set corresponding to a labelSetKey.
labelSetMap labelSetMap
lastAggregateContainerStateGC time.Time
gcInterval time.Duration
}
// StateMapSize is the number of pods being tracked by the VPA
@ -92,13 +95,15 @@ type PodState struct {
}
// NewClusterState returns a new ClusterState with no pods.
func NewClusterState() *ClusterState {
func NewClusterState(gcInterval time.Duration) *ClusterState {
return &ClusterState{
Pods: make(map[PodID]*PodState),
Vpas: make(map[VpaID]*Vpa),
EmptyVPAs: make(map[VpaID]time.Time),
aggregateStateMap: make(aggregateContainerStatesMap),
labelSetMap: make(labelSetMap),
Pods: make(map[PodID]*PodState),
Vpas: make(map[VpaID]*Vpa),
EmptyVPAs: make(map[VpaID]time.Time),
aggregateStateMap: make(aggregateContainerStatesMap),
labelSetMap: make(labelSetMap),
lastAggregateContainerStateGC: time.Unix(0, 0),
gcInterval: gcInterval,
}
}
@ -343,12 +348,7 @@ func (cluster *ClusterState) findOrCreateAggregateContainerState(containerID Con
return aggregateContainerState
}
// GarbageCollectAggregateCollectionStates removes obsolete AggregateCollectionStates from the ClusterState.
// AggregateCollectionState is obsolete in following situations:
// 1) It has no samples and there are no more active pods that can contribute,
// 2) The last sample is too old to give meaningful recommendation (>8 days),
// 3) There are no samples and the aggregate state was created >8 days ago.
func (cluster *ClusterState) GarbageCollectAggregateCollectionStates(now time.Time) {
func (cluster *ClusterState) garbageCollectAggregateCollectionStates(now time.Time) {
klog.V(1).Info("Garbage collection of AggregateCollectionStates triggered")
keysToDelete := make([]AggregateStateKey, 0)
activeKeys := cluster.getActiveAggregateStateKeys()
@ -372,6 +372,20 @@ func (cluster *ClusterState) GarbageCollectAggregateCollectionStates(now time.Ti
}
}
// RateLimitedGarbageCollectAggregateCollectionStates removes obsolete AggregateCollectionStates from the ClusterState.
// It performs clean up only if more than `gcInterval` passed since the last time it performed a clean up.
// AggregateCollectionState is obsolete in following situations:
// 1) It has no samples and there are no more active pods that can contribute,
// 2) The last sample is too old to give meaningful recommendation (>8 days),
// 3) There are no samples and the aggregate state was created >8 days ago.
func (cluster *ClusterState) RateLimitedGarbageCollectAggregateCollectionStates(now time.Time) {
if now.Sub(cluster.lastAggregateContainerStateGC) < cluster.gcInterval {
return
}
cluster.garbageCollectAggregateCollectionStates(now)
cluster.lastAggregateContainerStateGC = now
}
func (cluster *ClusterState) getActiveAggregateStateKeys() map[AggregateStateKey]bool {
activeKeys := map[AggregateStateKey]bool{}
for _, pod := range cluster.Pods {

View File

@ -42,6 +42,8 @@ var (
testSelectorStr = "label-1 = value-1"
)
const testGcPeriod = time.Minute
func makeTestUsageSample() *ContainerUsageSampleWithKey {
return &ContainerUsageSampleWithKey{ContainerUsageSample{
MeasureStart: testTimestamp,
@ -53,7 +55,7 @@ func makeTestUsageSample() *ContainerUsageSampleWithKey {
func TestClusterAddSample(t *testing.T) {
// Create a pod with a single container.
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
cluster.AddOrUpdatePod(testPodID, testLabels, apiv1.PodRunning)
assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID, testRequest))
@ -67,7 +69,7 @@ func TestClusterAddSample(t *testing.T) {
func TestClusterGCAggregateContainerStateDeletesOld(t *testing.T) {
// Create a pod with a single container.
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
vpa := addTestVpa(cluster)
addTestPod(cluster)
@ -81,7 +83,7 @@ func TestClusterGCAggregateContainerStateDeletesOld(t *testing.T) {
assert.NotEmpty(t, vpa.aggregateContainerStates)
// AggegateContainerState are valid for 8 days since last sample
cluster.GarbageCollectAggregateCollectionStates(usageSample.MeasureStart.Add(9 * 24 * time.Hour))
cluster.garbageCollectAggregateCollectionStates(usageSample.MeasureStart.Add(9 * 24 * time.Hour))
// AggegateContainerState should be deleted from both cluster and vpa
assert.Empty(t, cluster.aggregateStateMap)
@ -90,7 +92,7 @@ func TestClusterGCAggregateContainerStateDeletesOld(t *testing.T) {
func TestClusterGCAggregateContainerStateDeletesOldEmpty(t *testing.T) {
// Create a pod with a single container.
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
vpa := addTestVpa(cluster)
addTestPod(cluster)
@ -107,12 +109,12 @@ func TestClusterGCAggregateContainerStateDeletesOldEmpty(t *testing.T) {
}
// Verify empty aggregate states are not removed right away.
cluster.GarbageCollectAggregateCollectionStates(creationTime.Add(1 * time.Minute)) // AggegateContainerState should be deleted from both cluster and vpa
cluster.garbageCollectAggregateCollectionStates(creationTime.Add(1 * time.Minute)) // AggegateContainerState should be deleted from both cluster and vpa
assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)
// AggegateContainerState are valid for 8 days since creation
cluster.GarbageCollectAggregateCollectionStates(creationTime.Add(9 * 24 * time.Hour))
cluster.garbageCollectAggregateCollectionStates(creationTime.Add(9 * 24 * time.Hour))
// AggegateContainerState should be deleted from both cluster and vpa
assert.Empty(t, cluster.aggregateStateMap)
@ -121,7 +123,7 @@ func TestClusterGCAggregateContainerStateDeletesOldEmpty(t *testing.T) {
func TestClusterGCAggregateContainerStateDeletesEmptyInactive(t *testing.T) {
// Create a pod with a single container.
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
vpa := addTestVpa(cluster)
pod := addTestPod(cluster)
@ -131,14 +133,14 @@ func TestClusterGCAggregateContainerStateDeletesEmptyInactive(t *testing.T) {
assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)
cluster.GarbageCollectAggregateCollectionStates(testTimestamp)
cluster.garbageCollectAggregateCollectionStates(testTimestamp)
// AggegateContainerState should not be deleted as the pod is still active.
assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)
cluster.Pods[pod.ID].Phase = apiv1.PodSucceeded
cluster.GarbageCollectAggregateCollectionStates(testTimestamp)
cluster.garbageCollectAggregateCollectionStates(testTimestamp)
// AggegateContainerState should be empty as the pod is no longer active and
// there are no usage samples.
@ -148,7 +150,7 @@ func TestClusterGCAggregateContainerStateDeletesEmptyInactive(t *testing.T) {
func TestClusterGCAggregateContainerStateLeavesValid(t *testing.T) {
// Create a pod with a single container.
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
vpa := addTestVpa(cluster)
addTestPod(cluster)
@ -162,7 +164,7 @@ func TestClusterGCAggregateContainerStateLeavesValid(t *testing.T) {
assert.NotEmpty(t, vpa.aggregateContainerStates)
// AggegateContainerState are valid for 8 days since last sample
cluster.GarbageCollectAggregateCollectionStates(usageSample.MeasureStart.Add(7 * 24 * time.Hour))
cluster.garbageCollectAggregateCollectionStates(usageSample.MeasureStart.Add(7 * 24 * time.Hour))
assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)
@ -170,7 +172,7 @@ func TestClusterGCAggregateContainerStateLeavesValid(t *testing.T) {
func TestAddSampleAfterAggregateContainerStateGCed(t *testing.T) {
// Create a pod with a single container.
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
vpa := addTestVpa(cluster)
pod := addTestPod(cluster)
addTestContainer(cluster)
@ -189,7 +191,7 @@ func TestAddSampleAfterAggregateContainerStateGCed(t *testing.T) {
// AggegateContainerState are invalid after 8 days since last sample
gcTimestamp := usageSample.MeasureStart.Add(10 * 24 * time.Hour)
cluster.GarbageCollectAggregateCollectionStates(gcTimestamp)
cluster.garbageCollectAggregateCollectionStates(gcTimestamp)
assert.Empty(t, cluster.aggregateStateMap)
assert.Empty(t, vpa.aggregateContainerStates)
@ -205,12 +207,40 @@ func TestAddSampleAfterAggregateContainerStateGCed(t *testing.T) {
assert.NoError(t, cluster.AddSample(newUsageSample))
assert.Contains(t, vpa.aggregateContainerStates, aggregateStateKey)
}
func TestClusterGCRateLimiting(t *testing.T) {
// Create a pod with a single container.
cluster := NewClusterState(testGcPeriod)
usageSample := makeTestUsageSample()
sampleExpireTime := usageSample.MeasureStart.Add(9 * 24 * time.Hour)
// AggegateContainerState are valid for 8 days since last sample but this run
// doesn't remove the sample, because we didn't add it yet.
cluster.RateLimitedGarbageCollectAggregateCollectionStates(sampleExpireTime)
vpa := addTestVpa(cluster)
addTestPod(cluster)
assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID, testRequest))
// Add a usage sample to the container.
assert.NoError(t, cluster.AddSample(usageSample))
assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)
// Sample is expired but this run doesn't remove it yet, because less than testGcPeriod
// elapsed since the previous run.
cluster.RateLimitedGarbageCollectAggregateCollectionStates(sampleExpireTime.Add(testGcPeriod / 2))
assert.NotEmpty(t, cluster.aggregateStateMap)
assert.NotEmpty(t, vpa.aggregateContainerStates)
// AggegateContainerState should be deleted from both cluster and vpa
cluster.RateLimitedGarbageCollectAggregateCollectionStates(sampleExpireTime.Add(2 * testGcPeriod))
assert.Empty(t, cluster.aggregateStateMap)
assert.Empty(t, vpa.aggregateContainerStates)
}
func TestClusterRecordOOM(t *testing.T) {
// Create a pod with a single container.
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
cluster.AddOrUpdatePod(testPodID, testLabels, apiv1.PodRunning)
assert.NoError(t, cluster.AddOrUpdateContainer(testContainerID, testRequest))
@ -225,7 +255,7 @@ func TestClusterRecordOOM(t *testing.T) {
// Verifies that AddSample and AddOrUpdateContainer methods return a proper
// KeyError when referring to a non-existent pod.
func TestMissingKeys(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
err := cluster.AddSample(makeTestUsageSample())
assert.EqualError(t, err, "KeyError: {namespace-1 pod-1}")
@ -269,7 +299,7 @@ func addTestContainer(cluster *ClusterState) *ContainerState {
// Creates a VPA followed by a matching pod. Verifies that the links between
// VPA, the container and the aggregation are set correctly.
func TestAddVpaThenAddPod(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
vpa := addTestVpa(cluster)
assert.Empty(t, vpa.aggregateContainerStates)
addTestPod(cluster)
@ -281,7 +311,7 @@ func TestAddVpaThenAddPod(t *testing.T) {
// Creates a pod followed by a matching VPA. Verifies that the links between
// VPA, the container and the aggregation are set correctly.
func TestAddPodThenAddVpa(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
addTestPod(cluster)
addTestContainer(cluster)
vpa := addTestVpa(cluster)
@ -293,7 +323,7 @@ func TestAddPodThenAddVpa(t *testing.T) {
// no longer matched by the VPA. Verifies that the links between the pod and the
// VPA are removed.
func TestChangePodLabels(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
vpa := addTestVpa(cluster)
addTestPod(cluster)
addTestContainer(cluster)
@ -307,7 +337,7 @@ func TestChangePodLabels(t *testing.T) {
// Creates a VPA and verifies that annotation updates work properly.
func TestUpdateAnnotations(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
vpa := addTestVpa(cluster)
// Verify that the annotations match the test annotations.
assert.Equal(t, vpa.Annotations, testAnnotations)
@ -326,7 +356,7 @@ func TestUpdateAnnotations(t *testing.T) {
// the pod, finally such that it matches the pod again. Verifies that the links
// between the pod and the VPA are updated correctly each time.
func TestUpdatePodSelector(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
vpa := addTestVpa(cluster)
addTestPod(cluster)
addTestContainer(cluster)
@ -456,7 +486,7 @@ func TestAddOrUpdateVPAPolicies(t *testing.T) {
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
addTestPod(cluster)
addTestContainer(cluster)
if tc.oldVpa != nil {
@ -487,7 +517,7 @@ func TestAddOrUpdateVPAPolicies(t *testing.T) {
// Verify that two copies of the same AggregateStateKey are equal.
func TestEqualAggregateStateKey(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
pod := addTestPod(cluster)
key1 := cluster.MakeAggregateStateKey(pod, "container-1")
key2 := cluster.MakeAggregateStateKey(pod, "container-1")
@ -502,7 +532,7 @@ func TestTwoPodsWithSameLabels(t *testing.T) {
containerID1 := ContainerID{podID1, "foo-container"}
containerID2 := ContainerID{podID2, "foo-container"}
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
cluster.AddOrUpdatePod(podID1, testLabels, apiv1.PodRunning)
cluster.AddOrUpdatePod(podID2, testLabels, apiv1.PodRunning)
cluster.AddOrUpdateContainer(containerID1, testRequest)
@ -519,7 +549,7 @@ func TestTwoPodsWithDifferentNamespaces(t *testing.T) {
containerID1 := ContainerID{podID1, "foo-container"}
containerID2 := ContainerID{podID2, "foo-container"}
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
cluster.AddOrUpdatePod(podID1, testLabels, apiv1.PodRunning)
cluster.AddOrUpdatePod(podID2, testLabels, apiv1.PodRunning)
cluster.AddOrUpdateContainer(containerID1, testRequest)
@ -534,7 +564,7 @@ func TestTwoPodsWithDifferentNamespaces(t *testing.T) {
// Verifies that a VPA with an empty selector (matching all pods) matches a pod
// with labels as well as a pod with no labels.
func TestEmptySelector(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
// Create a VPA with an empty selector (matching all pods).
vpa := addVpa(cluster, testVpaID, testAnnotations, "")
// Create a pod with labels. Add a container.
@ -604,7 +634,7 @@ func TestRecordRecommendation(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
vpa := addVpa(cluster, testVpaID, testAnnotations, testSelectorStr)
cluster.Vpas[testVpaID].Recommendation = tc.recommendation
if !tc.lastLogged.IsZero() {
@ -687,7 +717,7 @@ func TestGetActiveMatchingPods(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
vpa := addVpa(cluster, testVpaID, testAnnotations, tc.vpaSelector)
for _, pod := range tc.pods {
cluster.AddOrUpdatePod(pod.id, pod.labels, pod.phase)
@ -761,7 +791,7 @@ func TestVPAWithMatchingPods(t *testing.T) {
// Run with adding VPA first
for _, tc := range cases {
t.Run(tc.name+", VPA first", func(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
vpa := addVpa(cluster, testVpaID, testAnnotations, tc.vpaSelector)
for _, podDesc := range tc.pods {
cluster.AddOrUpdatePod(podDesc.id, podDesc.labels, podDesc.phase)
@ -774,7 +804,7 @@ func TestVPAWithMatchingPods(t *testing.T) {
// Run with adding Pods first
for _, tc := range cases {
t.Run(tc.name+", Pods first", func(t *testing.T) {
cluster := NewClusterState()
cluster := NewClusterState(testGcPeriod)
for _, podDesc := range tc.pods {
cluster.AddOrUpdatePod(podDesc.id, podDesc.labels, podDesc.phase)
containerID := ContainerID{testPodID, "foo"}

View File

@ -57,8 +57,6 @@ type Recommender interface {
// MaintainCheckpoints writes at least minCheckpoints if there are more checkpoints to write.
// Checkpoints are written until ctx permits or all checkpoints are written.
MaintainCheckpoints(ctx context.Context, minCheckpoints int)
// GarbageCollect removes old AggregateCollectionStates
GarbageCollect()
}
type recommender struct {
@ -165,14 +163,6 @@ func (r *recommender) MaintainCheckpoints(ctx context.Context, minCheckpointsPer
}
}
func (r *recommender) GarbageCollect() {
gcTime := time.Now()
if gcTime.Sub(r.lastAggregateContainerStateGC) > AggregateContainerStateGCInterval {
r.clusterState.GarbageCollectAggregateCollectionStates(gcTime)
r.lastAggregateContainerStateGC = gcTime
}
}
func (r *recommender) RunOnce() {
timer := metrics_recommender.NewExecutionTimer()
defer timer.ObserveTotal()
@ -199,7 +189,7 @@ func (r *recommender) RunOnce() {
r.MaintainCheckpoints(ctx, *minCheckpointsPerRun)
timer.ObserveStep("MaintainCheckpoints")
r.GarbageCollect()
r.clusterState.RateLimitedGarbageCollectAggregateCollectionStates(time.Now())
timer.ObserveStep("GarbageCollect")
klog.V(3).Infof("ClusterState is tracking %d aggregated container states", r.clusterState.StateMapSize())
}
@ -239,7 +229,7 @@ func (c RecommenderFactory) Make() Recommender {
// Dependencies are created automatically.
// Deprecated; use RecommenderFactory instead.
func NewRecommender(config *rest.Config, checkpointsGCInterval time.Duration, useCheckpoints bool, namespace string) Recommender {
clusterState := model.NewClusterState()
clusterState := model.NewClusterState(AggregateContainerStateGCInterval)
return RecommenderFactory{
ClusterState: clusterState,
ClusterStateFeeder: input.NewClusterStateFeeder(config, clusterState, *memorySaver, namespace),