Add expire time for nodeInfo cache items

Yaroslava Serdiuk 2022-02-07 18:43:38 +00:00
parent 994fbac99f
commit a9a7d98f2c
7 changed files with 95 additions and 21 deletions

View File

@@ -148,7 +148,7 @@ func NewTestProcessors() *processors.AutoscalingProcessors {
AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
-TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(),
+TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),

View File

@@ -530,7 +530,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul
}
context.ExpanderStrategy = expander
-nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
+nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
@@ -691,7 +691,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
assert.NoError(t, err)
nodes := []*apiv1.Node{n1, n2}
-nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
+nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 550, 0)
@@ -732,7 +732,7 @@ func TestScaleUpNoHelp(t *testing.T) {
assert.NoError(t, err)
nodes := []*apiv1.Node{n1}
-nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
+nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 500, 0)
@@ -799,7 +799,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
assert.NoError(t, err)
-nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
+nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
@@ -867,7 +867,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 0}
nodes := []*apiv1.Node{}
-nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
+nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
assert.NoError(t, err)
@@ -920,7 +920,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 2}
nodes := []*apiv1.Node{}
-nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
+nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
assert.NoError(t, err)

View File

@@ -46,6 +46,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -185,6 +186,7 @@ var (
emitPerNodeGroupMetrics = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.")
debuggingSnapshotEnabled = flag.Bool("debugging-snapshot-enabled", false, "Whether the debugging snapshot of cluster autoscaler feature is enabled")
nodeInfoCacheExpireTime = flag.Duration("node-info-cache-expire-time", 87600*time.Hour, "Node Info cache expire time for each item. Default value is 10 years.")
)
func createAutoscalingOptions() config.AutoscalingOptions {
@@ -322,6 +324,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
}
opts.Processors = ca_processors.DefaultProcessors()
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime)
opts.Processors.PodListProcessor = core.NewFilterOutSchedulablePodListProcessor()
nodeInfoComparatorBuilder := nodegroupset.CreateGenericNodeInfoComparator
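For context on the wiring above: flag.Duration returns a *time.Duration, so the pointer handed over from main is never nil and the flag default of 87600 hours applies; the provider's internal maximum only matters for callers that pass nil explicitly, as the tests and DefaultProcessors below do. A minimal sketch of that flow, not part of this commit, where newProvider is a hypothetical stand-in for the real constructor:

package main

import (
	"flag"
	"fmt"
	"time"
)

// Mirrors the new flag added above: per-item expire time for the nodeInfo cache.
var nodeInfoCacheExpireTime = flag.Duration("node-info-cache-expire-time", 87600*time.Hour,
	"Node Info cache expire time for each item. Default value is 10 years.")

// newProvider is a hypothetical stand-in for NewDefaultTemplateNodeInfoProvider:
// a nil pointer means "use the built-in maximum", otherwise the given TTL is used.
func newProvider(t *time.Duration) time.Duration {
	ttl := 87660 * time.Hour // mirrors maxCacheExpireTime in the processor
	if t != nil {
		ttl = *t
	}
	return ttl
}

func main() {
	flag.Parse()
	fmt.Println("nodeInfo cache TTL:", newProvider(nodeInfoCacheExpireTime))
}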

View File

@@ -35,18 +35,34 @@ import (
)
const stabilizationDelay = 1 * time.Minute
const maxCacheExpireTime = 87660 * time.Hour
type cacheItem struct {
*schedulerframework.NodeInfo
added time.Time
}
// MixedTemplateNodeInfoProvider build nodeInfos from the cluster's nodes and node groups.
type MixedTemplateNodeInfoProvider struct {
-nodeInfoCache map[string]*schedulerframework.NodeInfo
+nodeInfoCache map[string]cacheItem
+ttl time.Duration
}
// NewMixedTemplateNodeInfoProvider returns a NodeInfoProvider processor building
// NodeInfos from real-world nodes when available, otherwise from node groups templates.
-func NewMixedTemplateNodeInfoProvider() *MixedTemplateNodeInfoProvider {
-return &MixedTemplateNodeInfoProvider{
-nodeInfoCache: make(map[string]*schedulerframework.NodeInfo),
+func NewMixedTemplateNodeInfoProvider(t *time.Duration) *MixedTemplateNodeInfoProvider {
+ttl := maxCacheExpireTime
+if t != nil {
+ttl = *t
+}
+return &MixedTemplateNodeInfoProvider{
+nodeInfoCache: make(map[string]cacheItem),
+ttl: ttl,
}
}
func (p *MixedTemplateNodeInfoProvider) isCacheItemExpired(added time.Time) bool {
return time.Now().Sub(added) > p.ttl
}
// CleanUp cleans up processor's internal structures.
@@ -102,7 +118,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
}
if added && p.nodeInfoCache != nil {
if nodeInfoCopy, err := utils.DeepCopyNodeInfo(result[id]); err == nil {
-p.nodeInfoCache[id] = nodeInfoCopy
+p.nodeInfoCache[id] = cacheItem{NodeInfo: nodeInfoCopy, added: time.Now()}
}
}
}
@@ -115,8 +131,10 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
// No good template, check cache of previously running nodes.
if p.nodeInfoCache != nil {
-if nodeInfo, found := p.nodeInfoCache[id]; found {
-if nodeInfoCopy, err := utils.DeepCopyNodeInfo(nodeInfo); err == nil {
+if cacheItem, found := p.nodeInfoCache[id]; found {
+if p.isCacheItemExpired(cacheItem.added) {
+delete(p.nodeInfoCache, id)
+} else if nodeInfoCopy, err := utils.DeepCopyNodeInfo(cacheItem.NodeInfo); err == nil {
result[id] = nodeInfoCopy
continue
}
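The mechanism above is a plain TTL cache: each entry records when it was added, and expiry is checked lazily at lookup time, where a stale entry is deleted instead of being reused; nothing sweeps the cache in the background. A self-contained sketch of the same pattern with generic names (an illustration only, not the commit's types):

package main

import (
	"fmt"
	"time"
)

// item pairs a cached value with the time it was stored, like cacheItem above.
type item struct {
	value string
	added time.Time
}

type ttlCache struct {
	entries map[string]item
	ttl     time.Duration
}

func newTTLCache(ttl time.Duration) *ttlCache {
	return &ttlCache{entries: make(map[string]item), ttl: ttl}
}

func (c *ttlCache) put(key, value string) {
	c.entries[key] = item{value: value, added: time.Now()}
}

// get performs the lazy expiry check: stale entries are deleted on access
// rather than by a background sweeper, matching the processor's behaviour.
func (c *ttlCache) get(key string) (string, bool) {
	it, found := c.entries[key]
	if !found {
		return "", false
	}
	if time.Since(it.added) > c.ttl {
		delete(c.entries, key)
		return "", false
	}
	return it.value, true
}

func main() {
	c := newTTLCache(50 * time.Millisecond)
	c.put("ng1", "template")
	fmt.Println(c.get("ng1")) // template true
	time.Sleep(60 * time.Millisecond)
	fmt.Println(c.get("ng1")) // "" false: the entry expired and was evicted
}

The trade-off of lazy eviction is that stale entries linger in the map until their key is looked up again.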

View File

@@ -32,6 +32,10 @@ import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
var (
cacheTtl = 1 * time.Second
)
func TestGetNodeInfosForGroups(t *testing.T) {
now := time.Now()
ready1 := BuildTestNode("n1", 1000, 1000)
@@ -81,7 +85,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
ListerRegistry: registry,
},
}
-res, err := NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
+res, err := NewMixedTemplateNodeInfoProvider(&cacheTtl).Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 5, len(res))
info, found := res["ng1"]
@@ -108,7 +112,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
ListerRegistry: registry,
},
}
-res, err = NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now)
+res, err = NewMixedTemplateNodeInfoProvider(&cacheTtl).Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 0, len(res))
}
@@ -167,7 +171,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
ListerRegistry: registry,
},
}
-niProcessor := NewMixedTemplateNodeInfoProvider()
+niProcessor := NewMixedTemplateNodeInfoProvider(&cacheTtl)
res, err := niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
// Check results
@@ -223,7 +227,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
// Fill cache manually
infoNg4Node6 := schedulerframework.NewNodeInfo()
infoNg4Node6.SetNode(ready6.DeepCopy())
-niProcessor.nodeInfoCache = map[string]*schedulerframework.NodeInfo{"ng4": infoNg4Node6}
+niProcessor.nodeInfoCache = map[string]cacheItem{"ng4": {NodeInfo: infoNg4Node6, added: now}}
res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
// Check if cache was used
assert.NoError(t, err)
@@ -236,6 +240,55 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
assertEqualNodeCapacities(t, ready6, info.Node())
}
func TestGetNodeInfosCacheExpired(t *testing.T) {
now := time.Now()
ready1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(ready1, true, now.Add(-2*time.Minute))
// Cloud provider with TemplateNodeInfo not implemented.
provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, nil, nil, nil, nil)
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
predicateChecker, err := simulator.NewTestPredicateChecker()
assert.NoError(t, err)
ctx := context.AutoscalingContext{
CloudProvider: provider,
PredicateChecker: predicateChecker,
AutoscalingKubeClients: context.AutoscalingKubeClients{
ListerRegistry: registry,
},
}
tn := BuildTestNode("tn", 5000, 5000)
tni := schedulerframework.NewNodeInfo()
tni.SetNode(tn)
// Cache expire time is set.
niProcessor1 := NewMixedTemplateNodeInfoProvider(&cacheTtl)
niProcessor1.nodeInfoCache = map[string]cacheItem{
"ng1": {NodeInfo: tni, added: now.Add(-2 * time.Second)},
"ng2": {NodeInfo: tni, added: now.Add(-2 * time.Second)},
}
provider.AddNodeGroup("ng1", 1, 10, 1)
provider.AddNode("ng1", ready1)
assert.Equal(t, 2, len(niProcessor1.nodeInfoCache))
_, err = niProcessor1.Process(&ctx, []*apiv1.Node{ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, len(niProcessor1.nodeInfoCache))
// Cache expire time isn't set.
niProcessor2 := NewMixedTemplateNodeInfoProvider(nil)
niProcessor2.nodeInfoCache = map[string]cacheItem{
"ng1": {NodeInfo: tni, added: now.Add(-2 * time.Second)},
"ng2": {NodeInfo: tni, added: now.Add(-2 * time.Second)},
}
assert.Equal(t, 2, len(niProcessor2.nodeInfoCache))
_, err = niProcessor2.Process(&ctx, []*apiv1.Node{ready1}, []*appsv1.DaemonSet{}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 2, len(niProcessor2.nodeInfoCache))
}
func assertEqualNodeCapacities(t *testing.T, expected, actual *apiv1.Node) {
t.Helper()
assert.NotEqual(t, actual.Status, nil, "")

View File

@@ -37,6 +37,6 @@ type TemplateNodeInfoProvider interface {
}
// NewDefaultTemplateNodeInfoProvider returns a default TemplateNodeInfoProvider.
-func NewDefaultTemplateNodeInfoProvider() TemplateNodeInfoProvider {
-return NewMixedTemplateNodeInfoProvider()
+func NewDefaultTemplateNodeInfoProvider(time *time.Duration) TemplateNodeInfoProvider {
+return NewMixedTemplateNodeInfoProvider(time)
}
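With the new signature, existing callers keep the old behaviour by passing nil (the roughly ten-year maximum applies, so entries effectively never expire), while a caller that wants expiry passes a pointer to a duration. A hedged usage sketch; it assumes the cluster-autoscaler module is available as a dependency, and the 30-minute TTL is only an example value:

package main

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
)

func main() {
	// nil keeps the previous behaviour: the ~10-year maximum TTL applies,
	// so cached template NodeInfos effectively never expire.
	_ = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil)

	// An explicit TTL: cached entries older than 30 minutes are dropped
	// the next time their node group is processed.
	ttl := 30 * time.Minute
	_ = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(&ttl)
}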

View File

@@ -77,8 +77,8 @@ func DefaultProcessors() *AutoscalingProcessors {
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
-TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(),
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
+TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
}
}