Cache exemplar ready node for each node group

Jacek Kaniuk 2019-02-01 14:17:42 +01:00
parent f054c53c46
commit d969baff22
5 changed files with 233 additions and 59 deletions
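The change adds a per-node-group cache of NodeInfo built from a previously seen ready node. getNodeInfosForGroups now resolves each group in order: a live ready node first, then the cached exemplar, then the cloud provider's node template; cache entries for node groups that no longer exist are evicted on every loop. Below is a minimal, self-contained sketch of that lookup order; the types and helper names are illustrative stand-ins, not the autoscaler's real API.

package main

import "fmt"

// nodeInfo is an illustrative stand-in for schedulercache.NodeInfo.
type nodeInfo struct{ source string }

// nodeGroup is an illustrative stand-in for a cloud provider node group.
type nodeGroup struct {
	id           string
	readyNode    *nodeInfo // exemplar built from a live, ready node (nil if the group has none)
	templateNode *nodeInfo // provider template (nil if TemplateNodeInfo is not implemented)
}

// nodeInfoFor resolves a NodeInfo for a group, preferring a real ready node,
// then a previously cached exemplar, and only then the provider template.
func nodeInfoFor(g nodeGroup, cache map[string]*nodeInfo) (*nodeInfo, bool) {
	if g.readyNode != nil {
		copied := *g.readyNode // cache a copy so later loops can still scale the group from zero
		cache[g.id] = &copied
		return g.readyNode, true
	}
	if cached, ok := cache[g.id]; ok {
		copied := *cached // hand out a copy so callers cannot mutate the cache
		return &copied, true
	}
	if g.templateNode != nil {
		return g.templateNode, true
	}
	return nil, false
}

func main() {
	cache := map[string]*nodeInfo{}
	ng := nodeGroup{id: "ng1", readyNode: &nodeInfo{source: "ready node n1"}}

	info, _ := nodeInfoFor(ng, cache) // first loop: ready node seen, exemplar cached
	fmt.Println(info.source)

	ng.readyNode = nil               // the group scales down to zero
	info, _ = nodeInfoFor(ng, cache) // later loop: the cached exemplar is reused
	fmt.Println(info.source)
}

The real implementation stores deep copies (see deepCopyNodeInfo below) rather than value copies, and evicts cache entries whose node group has been deleted.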

View File

@@ -187,6 +187,14 @@ func (tcp *TestCloudProvider) AddAutoprovisionedNodeGroup(id string, min int, ma
return nodeGroup
}
// DeleteNodeGroup removes a node group from the test cloud provider.
func (tcp *TestCloudProvider) DeleteNodeGroup(id string) {
tcp.Lock()
defer tcp.Unlock()
delete(tcp.groups, id)
}
// AddNode adds the given node to the group.
func (tcp *TestCloudProvider) AddNode(nodeGroupId string, node *apiv1.Node) {
tcp.Lock()
@@ -296,7 +304,11 @@ func (tng *TestNodeGroup) Create() (cloudprovider.NodeGroup, error) {
// Delete deletes the node group on the cloud provider side.
// This will be executed only for autoprovisioned node groups, once their size drops to 0.
func (tng *TestNodeGroup) Delete() error {
return tng.cloudProvider.onNodeGroupDelete(tng.id)
err := tng.cloudProvider.onNodeGroupDelete(tng.id)
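// On success, also drop the group from the test provider's registry so it no longer shows up in NodeGroups().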
if err == nil {
tng.cloudProvider.DeleteNodeGroup(tng.Id())
}
return err
}
// DecreaseTargetSize decreases the target size of the node group. This function

View File

@@ -413,7 +413,7 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {
}
context.ExpanderStrategy = expander
nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
@@ -500,7 +500,7 @@ func TestScaleUpNodeComingNoScale(t *testing.T) {
context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)
nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(
provider,
clusterstate.ClusterStateRegistryConfig{MaxNodeProvisionTime: 5 * time.Minute},
@@ -546,7 +546,7 @@ func TestScaleUpNodeComingHasScale(t *testing.T) {
context := NewScaleTestAutoscalingContext(defaultOptions, &fake.Clientset{}, listers, provider)
nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(
provider,
clusterstate.ClusterStateRegistryConfig{
@@ -600,7 +600,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)
nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 550, 0)
@@ -639,7 +639,7 @@ func TestScaleUpNoHelp(t *testing.T) {
context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)
nodes := []*apiv1.Node{n1}
nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 500, 0)
@@ -703,7 +703,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
}
context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)
nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
@@ -768,7 +768,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t}
nodes := []*apiv1.Node{}
nodeInfos, _ := getNodeInfosForGroups(nodes, provider, context.ListerRegistry, []*appsv1.DaemonSet{}, context.PredicateChecker)
nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, context.ListerRegistry, []*appsv1.DaemonSet{}, context.PredicateChecker)
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
assert.NoError(t, err)

View File

@@ -67,6 +67,8 @@ type StaticAutoscaler struct {
scaleDown *ScaleDown
processors *ca_processors.AutoscalingProcessors
initialized bool
// nodeInfoCache caches, per node group, a NodeInfo built from a previously seen
// ready node in that group; it is used when the group currently has no ready nodes.
nodeInfoCache map[string]*schedulercache.NodeInfo
}
// NewStaticAutoscaler creates an instance of Autoscaler filled with provided parameters
@@ -99,6 +101,7 @@ func NewStaticAutoscaler(
scaleDown: scaleDown,
processors: processors,
clusterStateRegistry: clusterStateRegistry,
nodeInfoCache: make(map[string]*schedulercache.NodeInfo),
}
}
@@ -149,8 +152,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return errors.ToAutoscalerError(errors.ApiCallError, err)
}
nodeInfosForGroups, autoscalerError := getNodeInfosForGroups(readyNodes, autoscalingContext.CloudProvider, autoscalingContext.ListerRegistry,
daemonsets, autoscalingContext.PredicateChecker)
nodeInfosForGroups, autoscalerError := getNodeInfosForGroups(
readyNodes, a.nodeInfoCache, autoscalingContext.CloudProvider, autoscalingContext.ListerRegistry, daemonsets, autoscalingContext.PredicateChecker)
if autoscalerError != nil {
return autoscalerError.AddPrefix("failed to build node infos for node groups: ")
}

View File

@@ -18,12 +18,13 @@ package core
import (
"fmt"
"k8s.io/kubernetes/pkg/scheduler/util"
"math/rand"
"reflect"
"sort"
"time"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
@@ -247,34 +248,35 @@ func checkPodsSchedulableOnNode(context *context.AutoscalingContext, pods []*api
// TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key.
//
// TODO(mwielgus): Review error policy - sometimes we may continue with partial errors.
func getNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.CloudProvider, listers kube_util.ListerRegistry,
func getNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedulercache.NodeInfo, cloudProvider cloudprovider.CloudProvider, listers kube_util.ListerRegistry,
daemonsets []*appsv1.DaemonSet, predicateChecker *simulator.PredicateChecker) (map[string]*schedulercache.NodeInfo, errors.AutoscalerError) {
result := make(map[string]*schedulercache.NodeInfo)
seenGroups := make(map[string]bool)
// processNode returns whether a nodeInfo was built for the node's group, the id of that group, and any error.
processNode := func(node *apiv1.Node) (bool, errors.AutoscalerError) {
processNode := func(node *apiv1.Node) (bool, string, errors.AutoscalerError) {
nodeGroup, err := cloudProvider.NodeGroupForNode(node)
if err != nil {
return false, errors.ToAutoscalerError(errors.CloudProviderError, err)
return false, "", errors.ToAutoscalerError(errors.CloudProviderError, err)
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
return false, nil
return false, "", nil
}
id := nodeGroup.Id()
if _, found := result[id]; !found {
// Build nodeInfo.
nodeInfo, err := simulator.BuildNodeInfoForNode(node, listers)
if err != nil {
return false, err
return false, "", err
}
sanitizedNodeInfo, err := sanitizeNodeInfo(nodeInfo, id)
if err != nil {
return false, err
return false, "", err
}
result[id] = sanitizedNodeInfo
return true, nil
return true, id, nil
}
return false, nil
return false, "", nil
}
for _, node := range nodes {
@@ -282,17 +284,33 @@ func getNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.Clou
if !kube_util.IsNodeReadyAndSchedulable(node) {
continue
}
_, typedErr := processNode(node)
added, id, typedErr := processNode(node)
if typedErr != nil {
return map[string]*schedulercache.NodeInfo{}, typedErr
}
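// Cache a copy of the freshly built exemplar so the group can still be scaled
// from zero in later loops; a failed deep copy simply skips caching.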
if added && nodeInfoCache != nil {
if nodeInfoCopy, err := deepCopyNodeInfo(result[id]); err == nil {
nodeInfoCache[id] = nodeInfoCopy
}
}
}
for _, nodeGroup := range cloudProvider.NodeGroups() {
id := nodeGroup.Id()
seenGroups[id] = true
if _, found := result[id]; found {
continue
}
// No info was built from a ready node, so check the cache of previously seen nodes.
if nodeInfoCache != nil {
if nodeInfo, found := nodeInfoCache[id]; found {
if nodeInfoCopy, err := deepCopyNodeInfo(nodeInfo); err == nil {
result[id] = nodeInfoCopy
continue
}
}
}
// Neither a live node nor a cached exemplar was available, so try to generate a
// template. This happens only when there are no working nodes in the node group;
// by default CA prefers a real-world example.
nodeInfo, err := getNodeInfoFromTemplate(nodeGroup, daemonsets, predicateChecker)
@@ -307,11 +325,18 @@ func getNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.Clou
result[id] = nodeInfo
}
// Evict cached entries for node groups that no longer exist
for id := range nodeInfoCache {
if _, ok := seenGroups[id]; !ok {
delete(nodeInfoCache, id)
}
}
// Last resort - unready/unschedulable nodes.
for _, node := range nodes {
// Allowing broken nodes
if !kube_util.IsNodeReadyAndSchedulable(node) {
added, typedErr := processNode(node)
added, _, typedErr := processNode(node)
if typedErr != nil {
return map[string]*schedulercache.NodeInfo{}, typedErr
}
@@ -365,6 +390,20 @@ func filterOutNodesFromNotAutoscaledGroups(nodes []*apiv1.Node, cloudProvider cl
return result, nil
}
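// deepCopyNodeInfo returns a copy of nodeInfo with the node object and its pods
// deep-copied, so cached entries do not share mutable state with the results
// handed to the rest of the loop.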
func deepCopyNodeInfo(nodeInfo *schedulercache.NodeInfo) (*schedulercache.NodeInfo, errors.AutoscalerError) {
newPods := make([]*apiv1.Pod, 0)
for _, pod := range nodeInfo.Pods() {
newPods = append(newPods, pod.DeepCopy())
}
// Build a new node info.
newNodeInfo := schedulercache.NewNodeInfo(newPods...)
if err := newNodeInfo.SetNode(nodeInfo.Node().DeepCopy()); err != nil {
return nil, errors.ToAutoscalerError(errors.InternalError, err)
}
return newNodeInfo, nil
}
func sanitizeNodeInfo(nodeInfo *schedulercache.NodeInfo, nodeGroupName string) (*schedulercache.NodeInfo, errors.AutoscalerError) {
// Sanitize node name.
sanitizedNode, err := sanitizeTemplateNode(nodeInfo.Node(), nodeGroupName)

View File

@@ -379,70 +379,190 @@ func TestFilterSchedulablePodsForNode(t *testing.T) {
}
func TestGetNodeInfosForGroups(t *testing.T) {
n1 := BuildTestNode("n1", 100, 1000)
SetNodeReadyState(n1, true, time.Now())
n2 := BuildTestNode("n2", 1000, 1000)
SetNodeReadyState(n2, true, time.Now())
n3 := BuildTestNode("n3", 1000, 1000)
SetNodeReadyState(n3, false, time.Now())
n4 := BuildTestNode("n4", 1000, 1000)
SetNodeReadyState(n4, false, time.Now())
ready1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(ready1, true, time.Now())
ready2 := BuildTestNode("n2", 2000, 2000)
SetNodeReadyState(ready2, true, time.Now())
unready3 := BuildTestNode("n3", 3000, 3000)
SetNodeReadyState(unready3, false, time.Now())
unready4 := BuildTestNode("n4", 4000, 4000)
SetNodeReadyState(unready4, false, time.Now())
p1 := BuildTestPod("p1", 80, 0)
p2 := BuildTestPod("p2", 800, 0)
p3 := BuildTestPod("p3", 800, 0)
p1.Spec.NodeName = "n1"
p2.Spec.NodeName = "n2"
p3.Spec.NodeName = "n4"
tn := BuildTestNode("T1-abc", 4000, 1000000)
tn := BuildTestNode("tn", 5000, 5000)
tni := schedulercache.NewNodeInfo()
tni.SetNode(tn)
// Cloud provider with TemplateNodeInfo implemented.
provider1 := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil,
nil, nil,
nil, map[string]*schedulercache.NodeInfo{"n3": tni, "n4": tni})
provider1.AddNodeGroup("n1", 1, 10, 1) // Nodegroup with ready node.
provider1.AddNodeGroup("n2", 1, 10, 1) // Nodegroup with ready and unready node.
provider1.AddNodeGroup("n3", 1, 10, 1) // Nodegroup with unready node.
provider1.AddNodeGroup("n4", 0, 1000, 0) // Nodegroup without nodes.
provider1.AddNode("n1", n1)
provider1.AddNode("n2", n2)
provider1.AddNode("n2", n3)
provider1.AddNode("n3", n4)
provider1 := testprovider.NewTestAutoprovisioningCloudProvider(
nil, nil, nil, nil, nil,
map[string]*schedulercache.NodeInfo{"ng3": tni, "ng4": tni})
provider1.AddNodeGroup("ng1", 1, 10, 1) // Nodegroup with ready node.
provider1.AddNode("ng1", ready1)
provider1.AddNodeGroup("ng2", 1, 10, 1) // Nodegroup with ready and unready node.
provider1.AddNode("ng2", ready2)
provider1.AddNode("ng2", unready3)
provider1.AddNodeGroup("ng3", 1, 10, 1) // Nodegroup with unready node.
provider1.AddNode("ng3", unready4)
provider1.AddNodeGroup("ng4", 0, 1000, 0) // Nodegroup without nodes.
// Cloud provider with TemplateNodeInfo not implemented.
provider2 := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil,
nil, nil,
nil, nil)
provider2.AddNodeGroup("n5", 1, 10, 1) // Nodegroup without nodes.
provider2 := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, nil, nil, nil, nil)
provider2.AddNodeGroup("ng5", 1, 10, 1) // Nodegroup without nodes.
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
predicateChecker := simulator.NewTestPredicateChecker()
res, err := getNodeInfosForGroups([]*apiv1.Node{n1, n2, n3, n4}, provider1, registry,
[]*appsv1.DaemonSet{}, predicateChecker)
res, err := getNodeInfosForGroups([]*apiv1.Node{unready4, unready3, ready2, ready1}, nil,
provider1, registry, []*appsv1.DaemonSet{}, predicateChecker)
assert.NoError(t, err)
assert.Equal(t, 4, len(res))
_, found := res["n1"]
info, found := res["ng1"]
assert.True(t, found)
_, found = res["n2"]
assertEqualNodeCapacities(t, ready1, info.Node())
info, found = res["ng2"]
assert.True(t, found)
_, found = res["n3"]
assertEqualNodeCapacities(t, ready2, info.Node())
info, found = res["ng3"]
assert.True(t, found)
_, found = res["n4"]
assertEqualNodeCapacities(t, tn, info.Node())
info, found = res["ng4"]
assert.True(t, found)
assertEqualNodeCapacities(t, tn, info.Node())
// Test for a nodegroup without nodes and TemplateNodeInfo not implemented by cloud provider
res, err = getNodeInfosForGroups([]*apiv1.Node{}, provider2, registry,
res, err = getNodeInfosForGroups([]*apiv1.Node{}, nil, provider2, registry,
[]*appsv1.DaemonSet{}, predicateChecker)
assert.NoError(t, err)
assert.Equal(t, 0, len(res))
}
func TestGetNodeInfosForGroupsCache(t *testing.T) {
ready1 := BuildTestNode("n1", 1000, 1000)
SetNodeReadyState(ready1, true, time.Now())
ready2 := BuildTestNode("n2", 2000, 2000)
SetNodeReadyState(ready2, true, time.Now())
unready3 := BuildTestNode("n3", 3000, 3000)
SetNodeReadyState(unready3, false, time.Now())
unready4 := BuildTestNode("n4", 4000, 4000)
SetNodeReadyState(unready4, false, time.Now())
ready5 := BuildTestNode("n5", 5000, 5000)
SetNodeReadyState(ready5, true, time.Now())
ready6 := BuildTestNode("n6", 6000, 6000)
SetNodeReadyState(ready6, true, time.Now())
tn := BuildTestNode("tn", 10000, 10000)
tni := schedulercache.NewNodeInfo()
tni.SetNode(tn)
lastDeletedGroup := ""
onDeleteGroup := func(id string) error {
lastDeletedGroup = id
return nil
}
// Cloud provider with TemplateNodeInfo implemented.
provider1 := testprovider.NewTestAutoprovisioningCloudProvider(
nil, nil, nil, onDeleteGroup, nil,
map[string]*schedulercache.NodeInfo{"ng3": tni, "ng4": tni})
provider1.AddNodeGroup("ng1", 1, 10, 1) // Nodegroup with ready node.
provider1.AddNode("ng1", ready1)
provider1.AddNodeGroup("ng2", 1, 10, 1) // Nodegroup with ready and unready node.
provider1.AddNode("ng2", ready2)
provider1.AddNode("ng2", unready3)
provider1.AddNodeGroup("ng3", 1, 10, 1) // Nodegroup with unready node (and 1 previously ready node).
provider1.AddNode("ng3", unready4)
provider1.AddNode("ng3", ready5)
provider1.AddNodeGroup("ng4", 0, 1000, 0) // Nodegroup without nodes (and 1 previously ready node).
provider1.AddNode("ng4", ready6)
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
predicateChecker := simulator.NewTestPredicateChecker()
nodeInfoCache := make(map[string]*schedulercache.NodeInfo)
// Fill cache
res, err := getNodeInfosForGroups([]*apiv1.Node{unready4, unready3, ready2, ready1}, nodeInfoCache,
provider1, registry, []*appsv1.DaemonSet{}, predicateChecker)
assert.NoError(t, err)
// Check results
assert.Equal(t, 4, len(res))
info, found := res["ng1"]
assert.True(t, found)
assertEqualNodeCapacities(t, ready1, info.Node())
info, found = res["ng2"]
assert.True(t, found)
assertEqualNodeCapacities(t, ready2, info.Node())
info, found = res["ng3"]
assert.True(t, found)
assertEqualNodeCapacities(t, tn, info.Node())
info, found = res["ng4"]
assert.True(t, found)
assertEqualNodeCapacities(t, tn, info.Node())
// Check cache
cachedInfo, found := nodeInfoCache["ng1"]
assert.True(t, found)
assertEqualNodeCapacities(t, ready1, cachedInfo.Node())
cachedInfo, found = nodeInfoCache["ng2"]
assert.True(t, found)
assertEqualNodeCapacities(t, ready2, cachedInfo.Node())
cachedInfo, found = nodeInfoCache["ng3"]
assert.False(t, found)
cachedInfo, found = nodeInfoCache["ng4"]
assert.False(t, found)
// Invalidate part of cache in two different ways
provider1.DeleteNodeGroup("ng1")
provider1.GetNodeGroup("ng3").Delete()
assert.Equal(t, "ng3", lastDeletedGroup)
// Check cache with all nodes removed
res, err = getNodeInfosForGroups([]*apiv1.Node{}, nodeInfoCache,
provider1, registry, []*appsv1.DaemonSet{}, predicateChecker)
assert.NoError(t, err)
// Check results
assert.Equal(t, 2, len(res))
info, found = res["ng2"]
assert.True(t, found)
assertEqualNodeCapacities(t, ready2, info.Node())
// Check ng4 result and cache
info, found = res["ng4"]
assert.True(t, found)
assertEqualNodeCapacities(t, tn, info.Node())
// Check cache
cachedInfo, found = nodeInfoCache["ng2"]
assert.True(t, found)
assertEqualNodeCapacities(t, ready2, cachedInfo.Node())
cachedInfo, found = nodeInfoCache["ng4"]
assert.False(t, found)
// Fill cache manually
infoNg4Node6 := schedulercache.NewNodeInfo()
err2 := infoNg4Node6.SetNode(ready6.DeepCopy())
assert.NoError(t, err2)
nodeInfoCache = map[string]*schedulercache.NodeInfo{"ng4": infoNg4Node6}
// Check if cache was used
res, err = getNodeInfosForGroups([]*apiv1.Node{ready1, ready2}, nodeInfoCache,
provider1, registry, []*appsv1.DaemonSet{}, predicateChecker)
assert.NoError(t, err)
assert.Equal(t, 2, len(res))
info, found = res["ng2"]
assert.True(t, found)
assertEqualNodeCapacities(t, ready2, info.Node())
info, found = res["ng4"]
assert.True(t, found)
assertEqualNodeCapacities(t, ready6, info.Node())
}
func assertEqualNodeCapacities(t *testing.T, expected, actual *apiv1.Node) {
t.Helper()
assert.Equal(t, getNodeResource(expected, apiv1.ResourceCPU), getNodeResource(actual, apiv1.ResourceCPU), "CPU should be the same")
assert.Equal(t, getNodeResource(expected, apiv1.ResourceMemory), getNodeResource(actual, apiv1.ResourceMemory), "Memory should be the same")
}
func TestRemoveOldUnregisteredNodes(t *testing.T) {
deletedNodes := make(chan string, 10)