From d5229046ffd6890b86b574c84944468c7660e90a Mon Sep 17 00:00:00 2001 From: Marcin Wielgus Date: Wed, 28 Dec 2016 12:17:12 +0100 Subject: [PATCH] Cluster-autoscaler: cluster status registry --- .../clusterstate/clusterstate.go | 340 ++++++++++++++++++ .../clusterstate/clusterstate_test.go | 205 +++++++++++ 2 files changed, 545 insertions(+) create mode 100644 cluster-autoscaler/clusterstate/clusterstate.go create mode 100644 cluster-autoscaler/clusterstate/clusterstate_test.go diff --git a/cluster-autoscaler/clusterstate/clusterstate.go b/cluster-autoscaler/clusterstate/clusterstate.go new file mode 100644 index 0000000000..6bf1ff9383 --- /dev/null +++ b/cluster-autoscaler/clusterstate/clusterstate.go @@ -0,0 +1,340 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterstate + +import ( + "fmt" + "reflect" + "sync" + "time" + + "k8s.io/contrib/cluster-autoscaler/cloudprovider" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" + + "github.com/golang/glog" +) + +const ( + // MaxNodeStartupTime is the maximum time from the moment the node is registered to the time the node is ready. + MaxNodeStartupTime = 3 * time.Minute +) + +// ScaleUpRequest contains information about the requested node group scale up. +type ScaleUpRequest struct { + // NodeGroupName is the node group to be scaled up. + NodeGroupName string + // Time is the time when the request was submitted. + Time time.Time + // ExpectedAddTime is the time at which the request should be fulfilled. + ExpectedAddTime time.Time + // NodeTemplate is the template of the node that will appear due to this request. + NodeTemplate *schedulercache.NodeInfo + // How much the node group is increased. + Increase int +} + +// ScaleDownRequest contains information about the requested node deletion. +type ScaleDownRequest struct { + // NodeName is the name of the node to be deleted. + NodeName string + // NodeGroupName is the node group of the deleted node. + NodeGroupName string + // Time is the time when the node deletion was requested. + Time time.Time + // ExpectedDeleteTime is the time when the node is excpected to be deleted. + ExpectedDeleteTime time.Time +} + +// ClusterStateRegistryConfig contains configuration information for ClusterStateRegistry. +type ClusterStateRegistryConfig struct { + // Maximum percentage of unready nodes in total in, if the number is higher than OkTotalUnreadyCount + MaxTotalUnreadyPercentage float64 + // Number of nodes that can be unready in total. If the number is higer than that then MaxTotalUnreadyPercentage applies. + OkTotalUnreadyCount int +} + +// ClusterStateRegistry is a structure to keep track the current state of the cluster. +type ClusterStateRegistry struct { + sync.Mutex + config ClusterStateRegistryConfig + scaleUpRequests []*ScaleUpRequest + scaleDownRequests []*ScaleDownRequest + nodes []*apiv1.Node + cloudProvider cloudprovider.CloudProvider + perNodeGroupReadiness map[string]Readiness + totalReadiness Readiness + acceptableRanges map[string]AcceptableRange +} + +// NewClusterStateRegistry creates new ClusterStateRegistry. +func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config ClusterStateRegistryConfig) *ClusterStateRegistry { + return &ClusterStateRegistry{ + scaleUpRequests: make([]*ScaleUpRequest, 0), + scaleDownRequests: make([]*ScaleDownRequest, 0), + nodes: make([]*apiv1.Node, 0), + cloudProvider: cloudProvider, + config: config, + perNodeGroupReadiness: make(map[string]Readiness), + acceptableRanges: make(map[string]AcceptableRange), + } +} + +// RegisterScaleUp registers scale up. +func (csr *ClusterStateRegistry) RegisterScaleUp(request *ScaleUpRequest) { + csr.Lock() + defer csr.Unlock() + csr.scaleUpRequests = append(csr.scaleUpRequests, request) +} + +// RegisterScaleDown registers node scale down. +func (csr *ClusterStateRegistry) RegisterScaleDown(request *ScaleDownRequest) { + csr.Lock() + defer csr.Unlock() + csr.scaleDownRequests = append(csr.scaleDownRequests, request) +} + +// To be executed under a lock. +func (csr *ClusterStateRegistry) cleanUp(currentTime time.Time) { + newSur := make([]*ScaleUpRequest, 0) + for _, sur := range csr.scaleUpRequests { + if sur.ExpectedAddTime.After(currentTime) { + newSur = append(newSur, sur) + } + } + csr.scaleUpRequests = newSur + + newSdr := make([]*ScaleDownRequest, 0) + for _, sdr := range csr.scaleDownRequests { + if sdr.ExpectedDeleteTime.After(currentTime) { + newSdr = append(newSdr, sdr) + } + } + csr.scaleDownRequests = newSdr +} + +// UpdateNodes updates the state of the nodes in the ClusterStateRegistry and recalculates the statss +func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, currentTime time.Time) error { + targetSizes, err := getTargetSizes(csr.cloudProvider) + if err != nil { + return err + } + + csr.Lock() + defer csr.Unlock() + + csr.cleanUp(currentTime) + csr.nodes = nodes + csr.perNodeGroupReadiness, csr.totalReadiness = csr.calculateReadinessStats(currentTime) + csr.acceptableRanges = csr.calculateAcceptableRanges(targetSizes) + return nil +} + +// getTargetSizes gets target sizes of node groups. +func getTargetSizes(cp cloudprovider.CloudProvider) (map[string]int, error) { + result := make(map[string]int) + for _, ng := range cp.NodeGroups() { + size, err := ng.TargetSize() + if err != nil { + return map[string]int{}, err + } + result[ng.Id()] = size + } + return result, nil +} + +// IsClusterHealthy returns true if the cluster health is within the acceptable limits +func (csr *ClusterStateRegistry) IsClusterHealthy(currentTime time.Time) bool { + csr.Lock() + defer csr.Unlock() + + totalUnready := csr.totalReadiness.Unready + csr.totalReadiness.LongNotStarted + + if totalUnready > csr.config.OkTotalUnreadyCount && + float64(totalUnready) > csr.config.MaxTotalUnreadyPercentage/100.0*float64(len(csr.nodes)) { + return false + } + + return true +} + +// IsNodeGroupHealthy returns true if the node group health is within the acceptable limits +func (csr *ClusterStateRegistry) IsNodeGroupHealthy(nodeGroupName string) bool { + readiness, found := csr.perNodeGroupReadiness[nodeGroupName] + if !found { + glog.Warningf("Failed to find readiness information for %v", nodeGroupName) + return false + } + acceptable, found := csr.acceptableRanges[nodeGroupName] + if !found { + glog.Warningf("Failed to find acceptable ranges for %v", nodeGroupName) + return false + } + + unjustifiedUnready := 0 + // Too few nodes, something is missing. Below the expected node count. + if readiness.Ready < acceptable.MinNodes { + unjustifiedUnready += acceptable.MinNodes - readiness.Ready + } + // TODO: verify against maxnodes as well. + + glog.V(2).Infof("NodeGroupHealth %s: ready=%d, acceptable min=%d max=%d target=%d", + nodeGroupName, + readiness.Ready, + acceptable.MinNodes, + acceptable.MaxNodes, + acceptable.CurrentTarget, + ) + + if unjustifiedUnready > csr.config.OkTotalUnreadyCount && + float64(unjustifiedUnready) > csr.config.MaxTotalUnreadyPercentage/100.0* + float64(readiness.Ready+readiness.Unready+readiness.NotStarted+readiness.LongNotStarted) { + return false + } + + return true +} + +// AcceptableRange contains information about acceptable size of a node group. +type AcceptableRange struct { + // MinNodes is the minimum number of nodes in the group. + MinNodes int + // MaxNodes is the maximum number of nodes in the group. + MaxNodes int + // CurrentTarget is the current target size of the group. + CurrentTarget int +} + +// calculateAcceptableRanges calcualtes how many nodes can be in a cluster. +// The function assumes that the nodeGroup.TargetSize() is the desired number of nodes. +// So if there has been a recent scale up of size 5 then there should be between targetSize-5 and targetSize +// nodes in ready state. In the same way, if there have been 3 nodes removed recently then +// the expected number of ready nodes is between targetSize and targetSize + 3. +func (csr *ClusterStateRegistry) calculateAcceptableRanges(targetSize map[string]int) map[string]AcceptableRange { + result := make(map[string]AcceptableRange) + for _, nodeGroup := range csr.cloudProvider.NodeGroups() { + size := targetSize[nodeGroup.Id()] + result[nodeGroup.Id()] = AcceptableRange{ + MinNodes: size, + MaxNodes: size, + CurrentTarget: size, + } + } + for _, sur := range csr.scaleUpRequests { + val := result[sur.NodeGroupName] + val.MinNodes -= sur.Increase + result[sur.NodeGroupName] = val + } + for _, sdr := range csr.scaleDownRequests { + val := result[sdr.NodeGroupName] + val.MaxNodes += 1 + result[sdr.NodeGroupName] = val + } + return result +} + +// Readiness contains readiness information about a group of nodes. +type Readiness struct { + // Number of ready nodes. + Ready int + // Number of unready nodes that doesn't fall into other categories. + Unready int + // Number of nodes that are being currently deleted. + Deleted int + // Number of nodes that failed to start within a reasonable limit. + LongNotStarted int + // Number of nodes that are not yet fully started. + NotStarted int +} + +func (csr *ClusterStateRegistry) calculateReadinessStats(currentTime time.Time) (perNodeGroup map[string]Readiness, total Readiness) { + + perNodeGroup = make(map[string]Readiness) + + update := func(current Readiness, node *apiv1.Node, ready bool) Readiness { + if isNodeBeingDeleted(node) { + current.Deleted++ + } else if isNodeNotStarted(node) && node.CreationTimestamp.Time.Add(MaxNodeStartupTime).Before(currentTime) { + current.LongNotStarted++ + } else if isNodeNotStarted(node) { + current.NotStarted++ + } else if ready { + current.Ready++ + } else { + current.Unready++ + } + return current + } + + for _, node := range csr.nodes { + nodeGroup, errNg := csr.cloudProvider.NodeGroupForNode(node) + ready, _, errReady := getReadinessState(node) + + // Node is most likely not autoscaled, however check the errors. + if reflect.ValueOf(nodeGroup).IsNil() { + if errNg != nil { + glog.Warningf("Failed to get nodegroup for %s: %v", node.Name, errNg) + } + if errReady != nil { + glog.Warningf("Failed to get readiness info for %s: %v", node.Name, errReady) + } + } else { + perNodeGroup[nodeGroup.Id()] = update(perNodeGroup[nodeGroup.Id()], node, ready) + } + total = update(total, node, ready) + } + return perNodeGroup, total +} + +// getReadinessState gets readiness state for the node +func getReadinessState(node *apiv1.Node) (isNodeReady bool, lastTransitionTime time.Time, err error) { + for _, condition := range node.Status.Conditions { + if condition.Type == apiv1.NodeReady { + + if condition.Status == apiv1.ConditionTrue { + return true, condition.LastTransitionTime.Time, nil + } + return false, condition.LastTransitionTime.Time, nil + } + } + return false, time.Time{}, fmt.Errorf("NodeReady condition for %s not found", node.Name) +} + +func isNodeBeingDeleted(node *apiv1.Node) bool { + taints, err := apiv1.GetTaintsFromNodeAnnotations(node.Annotations) + if err != nil { + glog.Warningf("Failed to get taints for %s: %v", node.Name, err) + } + for _, taint := range taints { + // TODO: move the constant outside. Using scale_down.go constant would cause cyclic dependency. + if taint.Key == "ToBeDeletedByClusterAutoscaler" { + return true + } + } + return false +} + +func isNodeNotStarted(node *apiv1.Node) bool { + for _, condition := range node.Status.Conditions { + if condition.Type == apiv1.NodeReady && + condition.Status == apiv1.ConditionFalse && + condition.LastTransitionTime.Time.Sub(node.CreationTimestamp.Time) < time.Second { + return true + } + } + return false +} diff --git a/cluster-autoscaler/clusterstate/clusterstate_test.go b/cluster-autoscaler/clusterstate/clusterstate_test.go new file mode 100644 index 0000000000..e440b7ec26 --- /dev/null +++ b/cluster-autoscaler/clusterstate/clusterstate_test.go @@ -0,0 +1,205 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterstate + +import ( + "testing" + "time" + + "k8s.io/contrib/cluster-autoscaler/cloudprovider/test" + . "k8s.io/contrib/cluster-autoscaler/utils/test" + apiv1 "k8s.io/kubernetes/pkg/api/v1" + metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" + + "github.com/stretchr/testify/assert" +) + +func TestOKWithScaleUp(t *testing.T) { + now := time.Now() + + ng1_1 := BuildTestNode("ng1-1", 1000, 1000) + setReadyState(ng1_1, true, now.Add(-time.Minute)) + ng2_1 := BuildTestNode("ng2-1", 1000, 1000) + setReadyState(ng2_1, true, now.Add(-time.Minute)) + + provider := testprovider.NewTestCloudProvider(nil, nil) + provider.AddNodeGroup("ng1", 1, 10, 5) + provider.AddNodeGroup("ng2", 1, 10, 1) + + provider.AddNode("ng1", ng1_1) + provider.AddNode("ng2", ng2_1) + assert.NotNil(t, provider) + + clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ + MaxTotalUnreadyPercentage: 10, + OkTotalUnreadyCount: 1, + }) + clusterstate.RegisterScaleUp(&ScaleUpRequest{ + NodeGroupName: "ng1", + Increase: 4, + Time: now, + ExpectedAddTime: now.Add(time.Minute), + }) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now) + assert.NoError(t, err) + assert.True(t, clusterstate.IsClusterHealthy(now)) +} + +func TestOKOneUnreadyNode(t *testing.T) { + now := time.Now() + + ng1_1 := BuildTestNode("ng1-1", 1000, 1000) + setReadyState(ng1_1, true, now.Add(-time.Minute)) + ng2_1 := BuildTestNode("ng2-1", 1000, 1000) + setReadyState(ng2_1, false, now.Add(-time.Minute)) + + provider := testprovider.NewTestCloudProvider(nil, nil) + provider.AddNodeGroup("ng1", 1, 10, 1) + provider.AddNodeGroup("ng2", 1, 10, 1) + provider.AddNode("ng1", ng1_1) + provider.AddNode("ng2", ng2_1) + assert.NotNil(t, provider) + + clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ + MaxTotalUnreadyPercentage: 10, + OkTotalUnreadyCount: 1, + }) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now) + assert.NoError(t, err) + assert.True(t, clusterstate.IsClusterHealthy(now)) + assert.True(t, clusterstate.IsNodeGroupHealthy("ng1")) +} + +func TestMissingNodes(t *testing.T) { + now := time.Now() + + ng1_1 := BuildTestNode("ng1-1", 1000, 1000) + setReadyState(ng1_1, true, now.Add(-time.Minute)) + ng2_1 := BuildTestNode("ng2-1", 1000, 1000) + setReadyState(ng2_1, true, now.Add(-time.Minute)) + + provider := testprovider.NewTestCloudProvider(nil, nil) + provider.AddNodeGroup("ng1", 1, 10, 5) + provider.AddNodeGroup("ng2", 1, 10, 1) + + provider.AddNode("ng1", ng1_1) + provider.AddNode("ng2", ng2_1) + assert.NotNil(t, provider) + clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ + MaxTotalUnreadyPercentage: 10, + OkTotalUnreadyCount: 1, + }) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now) + assert.NoError(t, err) + assert.True(t, clusterstate.IsClusterHealthy(now)) + assert.False(t, clusterstate.IsNodeGroupHealthy("ng1")) +} + +func TestToManyUnready(t *testing.T) { + now := time.Now() + + ng1_1 := BuildTestNode("ng1-1", 1000, 1000) + setReadyState(ng1_1, false, now.Add(-time.Minute)) + ng2_1 := BuildTestNode("ng2-1", 1000, 1000) + setReadyState(ng2_1, false, now.Add(-time.Minute)) + + provider := testprovider.NewTestCloudProvider(nil, nil) + provider.AddNodeGroup("ng1", 1, 10, 1) + provider.AddNodeGroup("ng2", 1, 10, 1) + provider.AddNode("ng1", ng1_1) + provider.AddNode("ng2", ng2_1) + + assert.NotNil(t, provider) + clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ + MaxTotalUnreadyPercentage: 10, + OkTotalUnreadyCount: 1, + }) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now) + assert.NoError(t, err) + assert.False(t, clusterstate.IsClusterHealthy(now)) + assert.True(t, clusterstate.IsNodeGroupHealthy("ng1")) +} + +func TestExpiredScaleUp(t *testing.T) { + now := time.Now() + + ng1_1 := BuildTestNode("ng1-1", 1000, 1000) + setReadyState(ng1_1, true, now.Add(-time.Minute)) + + provider := testprovider.NewTestCloudProvider(nil, nil) + provider.AddNodeGroup("ng1", 1, 10, 5) + provider.AddNode("ng1", ng1_1) + assert.NotNil(t, provider) + + clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ + MaxTotalUnreadyPercentage: 10, + OkTotalUnreadyCount: 1, + }) + clusterstate.RegisterScaleUp(&ScaleUpRequest{ + NodeGroupName: "ng1", + Increase: 4, + Time: now.Add(-3 * time.Minute), + ExpectedAddTime: now.Add(-1 * time.Minute), + }) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, now) + assert.NoError(t, err) + assert.True(t, clusterstate.IsClusterHealthy(now)) + assert.False(t, clusterstate.IsNodeGroupHealthy("ng1")) +} + +func setReadyState(node *apiv1.Node, ready bool, lastTransition time.Time) { + if ready { + node.Status.Conditions = append(node.Status.Conditions, + apiv1.NodeCondition{ + Type: apiv1.NodeReady, + Status: apiv1.ConditionTrue, + LastTransitionTime: metav1.Time{Time: lastTransition}, + }) + } else { + node.Status.Conditions = append(node.Status.Conditions, + apiv1.NodeCondition{ + Type: apiv1.NodeReady, + Status: apiv1.ConditionFalse, + LastTransitionTime: metav1.Time{Time: lastTransition}, + }) + } +} + +func TestRegisterScaleDown(t *testing.T) { + ng1_1 := BuildTestNode("ng1-1", 1000, 1000) + provider := testprovider.NewTestCloudProvider(nil, nil) + provider.AddNodeGroup("ng1", 1, 10, 1) + provider.AddNode("ng1", ng1_1) + assert.NotNil(t, provider) + + clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ + MaxTotalUnreadyPercentage: 10, + OkTotalUnreadyCount: 1, + }) + + now := time.Now() + + clusterstate.RegisterScaleDown(&ScaleDownRequest{ + NodeGroupName: "ng1", + NodeName: "ng1-1", + ExpectedDeleteTime: now.Add(time.Minute), + Time: now, + }) + assert.Equal(t, 1, len(clusterstate.scaleDownRequests)) + clusterstate.cleanUp(now.Add(5 * time.Minute)) + assert.Equal(t, 0, len(clusterstate.scaleDownRequests)) +}