Add post scale up status processor.

Beata Skiba 2018-05-30 08:16:39 +02:00
parent 7546dff1a4
commit b8ae6df5d3
7 changed files with 251 additions and 51 deletions

View File

@@ -29,6 +29,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/glogx"
"k8s.io/autoscaler/cluster-autoscaler/utils/nodegroupset"
@@ -41,12 +42,12 @@ import (
// false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are
// ready and in sync with instance groups.
func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node, daemonSets []*extensionsv1.DaemonSet) (bool, errors.AutoscalerError) {
nodes []*apiv1.Node, daemonSets []*extensionsv1.DaemonSet) (*status.ScaleUpStatus, errors.AutoscalerError) {
// From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler.
if len(unschedulablePods) == 0 {
glog.V(1).Info("No unschedulable pods")
return false, nil
return &status.ScaleUpStatus{ScaledUp: false}, nil
}
now := time.Now()
@@ -63,14 +64,14 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
nodeInfos, err := GetNodeInfosForGroups(nodes, context.CloudProvider, context.ClientSet,
daemonSets, context.PredicateChecker)
if err != nil {
return false, err.AddPrefix("failed to build node infos for node groups: ")
return nil, err.AddPrefix("failed to build node infos for node groups: ")
}
nodeGroups := context.CloudProvider.NodeGroups()
resourceLimiter, errCP := context.CloudProvider.GetResourceLimiter()
if errCP != nil {
return false, errors.ToAutoscalerError(
return nil, errors.ToAutoscalerError(
errors.CloudProviderError,
errCP)
}
@@ -81,7 +82,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
for nodeGroup, numberOfNodes := range clusterStateRegistry.GetUpcomingNodes() {
nodeTemplate, found := nodeInfos[nodeGroup]
if !found {
return false, errors.NewAutoscalerError(
return nil, errors.NewAutoscalerError(
errors.InternalError,
"failed to find template node for node group %s",
nodeGroup)
@@ -99,7 +100,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
var errProc error
nodeGroups, nodeInfos, errProc = processors.NodeGroupListProcessor.Process(context, nodeGroups, nodeInfos, unschedulablePods)
if errProc != nil {
return false, errors.ToAutoscalerError(errors.InternalError, errProc)
return nil, errors.ToAutoscalerError(errors.InternalError, errProc)
}
}
@@ -180,13 +181,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
if len(expansionOptions) == 0 {
glog.V(1).Info("No expansion options")
for pod, unschedulable := range podsRemainUnschedulable {
if unschedulable {
context.Recorder.Event(pod, apiv1.EventTypeNormal, "NotTriggerScaleUp",
"pod didn't trigger scale-up (it wouldn't fit if a new node is added)")
}
}
return false, nil
return &status.ScaleUpStatus{ScaledUp: false, PodsRemainUnschedulable: getRemainingPods(podsRemainUnschedulable)}, nil
}
// Pick some expansion option.
@@ -204,7 +199,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
glog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal)
newNodes = context.MaxNodesTotal - len(nodes)
if newNodes < 1 {
return false, errors.NewAutoscalerError(
return nil, errors.NewAutoscalerError(
errors.TransientError,
"max node total count already reached")
}
@@ -219,7 +214,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
context.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToCreateNodeGroup",
"NodeAutoprovisioning: attempt to create node group %v failed: %v", oldId, err)
// TODO(maciekpytel): add some metric here after figuring out failure scenarios
return false, errors.ToAutoscalerError(errors.CloudProviderError, err)
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err)
}
newId := bestOption.NodeGroup.Id()
if newId != oldId {
@@ -241,7 +236,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
// This should never happen, as we already should have retrieved
// nodeInfo for any considered nodegroup.
glog.Errorf("No node info for: %s", bestOption.NodeGroup.Id())
return false, errors.NewAutoscalerError(
return nil, errors.NewAutoscalerError(
errors.CloudProviderError,
"No node info for best expansion option!")
}
@@ -249,14 +244,14 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
// apply upper limits for CPU and memory
newNodes, err = applyMaxClusterCoresMemoryLimits(newNodes, coresTotal, memoryTotal, resourceLimiter.GetMax(cloudprovider.ResourceNameCores), resourceLimiter.GetMax(cloudprovider.ResourceNameMemory), nodeInfo)
if err != nil {
return false, err
return nil, err
}
targetNodeGroups := []cloudprovider.NodeGroup{bestOption.NodeGroup}
if context.BalanceSimilarNodeGroups {
similarNodeGroups, typedErr := nodegroupset.FindSimilarNodeGroups(bestOption.NodeGroup, context.CloudProvider, nodeInfos)
if typedErr != nil {
return false, typedErr.AddPrefix("Failed to find matching node groups: ")
return nil, typedErr.AddPrefix("Failed to find matching node groups: ")
}
similarNodeGroups = filterNodeGroupsByPods(similarNodeGroups, bestOption.Pods, podsPassingPredicates)
for _, ng := range similarNodeGroups {
@@ -283,32 +278,50 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
scaleUpInfos, typedErr := nodegroupset.BalanceScaleUpBetweenGroups(
targetNodeGroups, newNodes)
if typedErr != nil {
return false, typedErr
return nil, typedErr
}
glog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
for _, info := range scaleUpInfos {
typedErr := executeScaleUp(context, clusterStateRegistry, info)
if typedErr != nil {
return false, typedErr
return nil, typedErr
}
}
for _, pod := range bestOption.Pods {
context.Recorder.Eventf(pod, apiv1.EventTypeNormal, "TriggeredScaleUp",
"pod triggered scale-up: %v", scaleUpInfos)
}
clusterStateRegistry.Recalculate()
return true, nil
}
for pod, unschedulable := range podsRemainUnschedulable {
if unschedulable {
context.Recorder.Event(pod, apiv1.EventTypeNormal, "NotTriggerScaleUp",
"pod didn't trigger scale-up (it wouldn't fit if a new node is added)")
}
return &status.ScaleUpStatus{
ScaledUp: true,
ScaleUpInfos: scaleUpInfos,
PodsRemainUnschedulable: getRemainingPods(podsRemainUnschedulable),
PodsTriggeredScaleUp: bestOption.Pods,
PodsAwaitEvaluation: getPodsAwaitingEvaluation(unschedulablePods, podsRemainUnschedulable, bestOption.Pods)},
nil
}
return false, nil
return &status.ScaleUpStatus{ScaledUp: false, PodsRemainUnschedulable: getRemainingPods(podsRemainUnschedulable)}, nil
}
// getRemainingPods returns the pods that are still marked as unschedulable in podSet.
func getRemainingPods(podSet map[*apiv1.Pod]bool) []*apiv1.Pod {
result := make([]*apiv1.Pod, 0)
for pod, val := range podSet {
if val {
result = append(result, pod)
}
}
return result
}
// getPodsAwaitingEvaluation returns the pods that neither remain unschedulable nor were part of the
// chosen expansion option, i.e. pods whose scheduling will be re-evaluated in a later loop iteration.
func getPodsAwaitingEvaluation(allPods []*apiv1.Pod, unschedulable map[*apiv1.Pod]bool, bestOption []*apiv1.Pod) []*apiv1.Pod {
result := make(map[*apiv1.Pod]bool, len(allPods))
for _, pod := range allPods {
if !unschedulable[pod] {
result[pod] = true
}
}
for _, pod := range bestOption {
result[pod] = false
}
return getRemainingPods(result)
}
func filterNodeGroupsByPods(groups []cloudprovider.NodeGroup, podsRequiredToFit []*apiv1.Pod,
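Taken together, the new helpers split the pods handed to ScaleUp into three buckets: pods that triggered the chosen scale-up (bestOption.Pods), pods that remain unschedulable, and pods whose evaluation is deferred to a later loop iteration. A minimal, self-contained sketch of that bookkeeping, using hypothetical pod names and plain strings instead of *apiv1.Pod:

package main

import "fmt"

func main() {
	// Hypothetical example: five unschedulable pods went into ScaleUp.
	allPods := []string{"p1", "p2", "p3", "p4", "p5"}
	// p1 and p2 would not fit even on a new node (podsRemainUnschedulable).
	remainUnschedulable := map[string]bool{"p1": true, "p2": true}
	// p3 and p4 fit the chosen expansion option (bestOption.Pods).
	triggeredScaleUp := map[string]bool{"p3": true, "p4": true}

	// Same idea as getPodsAwaitingEvaluation: anything that is neither still
	// unschedulable nor part of the chosen option awaits a later evaluation.
	awaiting := []string{}
	for _, pod := range allPods {
		if !remainUnschedulable[pod] && !triggeredScaleUp[pod] {
			awaiting = append(awaiting, pod)
		}
	}
	fmt.Println(awaiting) // [p5] -> ScaleUpStatus.PodsAwaitEvaluation
}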

View File

@@ -372,9 +372,10 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {
processors := ca_processors.TestProcessors()
result, err := ScaleUp(context, processors, clusterState, extraPods, nodes, []*extensionsv1.DaemonSet{})
status, err := ScaleUp(context, processors, clusterState, extraPods, nodes, []*extensionsv1.DaemonSet{})
processors.ScaleUpStatusProcessor.Process(context, status)
assert.NoError(t, err)
assert.True(t, result)
assert.True(t, status.ScaledUp)
expandedGroup := getGroupSizeChangeFromChan(expandedGroups)
assert.NotNil(t, expandedGroup, "Expected scale up event")
@@ -476,10 +477,10 @@ func TestScaleUpNodeComingNoScale(t *testing.T) {
processors := ca_processors.TestProcessors()
result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
status, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
assert.NoError(t, err)
// A node is already coming - no need for scale up.
assert.False(t, result)
assert.False(t, status.ScaledUp)
}
func TestScaleUpNodeComingHasScale(t *testing.T) {
@@ -539,11 +540,11 @@ func TestScaleUpNodeComingHasScale(t *testing.T) {
p3 := BuildTestPod("p-new", 550, 0)
processors := ca_processors.TestProcessors()
result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3, p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
status, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3, p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
assert.NoError(t, err)
// Two nodes needed but one node is already coming, so it should increase by one.
assert.True(t, result)
assert.True(t, status.ScaledUp)
assert.Equal(t, "ng2-1", getStringFromChan(expandedGroups))
}
@@ -600,11 +601,11 @@ func TestScaleUpUnhealthy(t *testing.T) {
p3 := BuildTestPod("p-new", 550, 0)
processors := ca_processors.TestProcessors()
result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
status, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*extensionsv1.DaemonSet{})
assert.NoError(t, err)
// Node group is unhealthy.
assert.False(t, result)
assert.False(t, status.ScaledUp)
}
func TestScaleUpNoHelp(t *testing.T) {
@@ -652,10 +653,11 @@ func TestScaleUpNoHelp(t *testing.T) {
p3 := BuildTestPod("p-new", 500, 0)
processors := ca_processors.TestProcessors()
result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1}, []*extensionsv1.DaemonSet{})
status, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1}, []*extensionsv1.DaemonSet{})
processors.ScaleUpStatusProcessor.Process(context, status)
assert.NoError(t, err)
assert.False(t, result)
assert.False(t, status.ScaledUp)
var event string
select {
case event = <-fakeRecorder.Events:
@@ -738,10 +740,10 @@ func TestScaleUpBalanceGroups(t *testing.T) {
}
processors := ca_processors.TestProcessors()
result, typedErr := ScaleUp(context, processors, clusterState, pods, nodes, []*extensionsv1.DaemonSet{})
status, typedErr := ScaleUp(context, processors, clusterState, pods, nodes, []*extensionsv1.DaemonSet{})
assert.NoError(t, typedErr)
assert.True(t, result)
assert.True(t, status.ScaledUp)
groupMap := make(map[string]cloudprovider.NodeGroup, 3)
for _, group := range provider.NodeGroups() {
groupMap[group.Id()] = group
@@ -800,9 +802,9 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
processors := ca_processors.TestProcessors()
processors.NodeGroupListProcessor = nodegroups.NewAutoprovisioningNodeGroupListProcessor()
result, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p1}, []*apiv1.Node{}, []*extensionsv1.DaemonSet{})
status, err := ScaleUp(context, processors, clusterState, []*apiv1.Pod{p1}, []*apiv1.Node{}, []*extensionsv1.DaemonSet{})
assert.NoError(t, err)
assert.True(t, result)
assert.True(t, status.ScaledUp)
assert.Equal(t, "autoprovisioned-T1", getStringFromChan(createdGroups))
assert.Equal(t, "autoprovisioned-T1-1", getStringFromChan(expandedGroups))
}

View File

@@ -275,14 +275,18 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
scaleUpStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
scaledUp, typedErr := ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets)
scaleUpStatus, typedErr := ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets)
metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart)
if typedErr != nil {
glog.Errorf("Failed to scale up: %v", typedErr)
return typedErr
} else if scaledUp {
}
if a.processors != nil && a.processors.ScaleUpStatusProcessor != nil {
a.processors.ScaleUpStatusProcessor.Process(autoscalingContext, scaleUpStatus)
}
if scaleUpStatus.ScaledUp {
a.lastScaleUpTime = currentTime
// No scale down in this iteration.
return nil

View File

@@ -19,6 +19,7 @@ package processors
import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
)
// AutoscalingProcessors are a set of customizable processors used for encapsulating
@@ -28,18 +29,23 @@ type AutoscalingProcessors struct {
PodListProcessor pods.PodListProcessor
// NodeGroupListProcessor is used to process list of NodeGroups that can be used in scale-up.
NodeGroupListProcessor nodegroups.NodeGroupListProcessor
// ScaleUpStatusProcessor is used to process the state of the cluster after a scale-up.
ScaleUpStatusProcessor status.ScaleUpStatusProcessor
}
// DefaultProcessors returns default set of processors.
func DefaultProcessors() *AutoscalingProcessors {
return &AutoscalingProcessors{
PodListProcessor: pods.NewDefaultPodListProcessor(),
NodeGroupListProcessor: nodegroups.NewDefaultNodeGroupListProcessor()}
NodeGroupListProcessor: nodegroups.NewDefaultNodeGroupListProcessor(),
ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor()}
}
// TestProcessors returns a set of simple processors for use in tests.
func TestProcessors() *AutoscalingProcessors {
return &AutoscalingProcessors{
PodListProcessor: &pods.NoOpPodListProcessor{},
NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{}}
NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{},
// TODO(bskiba): change scale up test so that this can be a NoOpProcessor
ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{}}
}

View File

@@ -0,0 +1,42 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package status
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
)
// EventingScaleUpStatusProcessor processes the state of the cluster after
// a scale-up by emitting relevant events for pods depending on their post
// scale-up status.
type EventingScaleUpStatusProcessor struct{}
// Process processes the state of the cluster after a scale-up by emitting
// relevant events for pods depending on their post scale-up status.
func (p *EventingScaleUpStatusProcessor) Process(context *context.AutoscalingContext, status *ScaleUpStatus) {
for _, pod := range status.PodsRemainUnschedulable {
context.Recorder.Event(pod, apiv1.EventTypeNormal, "NotTriggerScaleUp",
"pod didn't trigger scale-up (it wouldn't fit if a new node is added)")
}
if len(status.ScaleUpInfos) > 0 {
for _, pod := range status.PodsTriggeredScaleUp {
context.Recorder.Eventf(pod, apiv1.EventTypeNormal, "TriggeredScaleUp",
"pod triggered scale-up: %v", status.ScaleUpInfos)
}
}
}

View File

@@ -0,0 +1,89 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package status
import (
"strings"
"testing"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/nodegroupset"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
kube_record "k8s.io/client-go/tools/record"
"github.com/stretchr/testify/assert"
)
func TestEventingScaleUpStatusProcessor(t *testing.T) {
p := &EventingScaleUpStatusProcessor{}
p1 := BuildTestPod("p1", 0, 0)
p2 := BuildTestPod("p2", 0, 0)
p3 := BuildTestPod("p3", 0, 0)
testCases := []struct {
caseName string
state *ScaleUpStatus
expectedTriggered int
expectedNoTriggered int
}{
{
caseName: "No scale up",
state: &ScaleUpStatus{
ScaleUpInfos: []nodegroupset.ScaleUpInfo{},
PodsRemainUnschedulable: []*apiv1.Pod{p1, p2},
},
expectedNoTriggered: 2,
},
{
caseName: "Scale up",
state: &ScaleUpStatus{
ScaleUpInfos: []nodegroupset.ScaleUpInfo{{}},
PodsTriggeredScaleUp: []*apiv1.Pod{p3},
PodsRemainUnschedulable: []*apiv1.Pod{p1, p2},
},
expectedTriggered: 1,
expectedNoTriggered: 2,
},
}
for _, tc := range testCases {
fakeRecorder := kube_record.NewFakeRecorder(5)
context := &context.AutoscalingContext{
Recorder: fakeRecorder,
}
p.Process(context, tc.state)
triggered := 0
noTriggered := 0
for eventsLeft := true; eventsLeft; {
select {
case event := <-fakeRecorder.Events:
if strings.Contains(event, "TriggeredScaleUp") {
triggered += 1
} else if strings.Contains(event, "NotTriggerScaleUp") {
noTriggered += 1
} else {
t.Fatalf("Test case '%v' failed. Unexpected event %v", tc.caseName, event)
}
default:
eventsLeft = false
}
}
assert.Equal(t, tc.expectedTriggered, triggered, "Test case '%v' failed.", tc.caseName)
assert.Equal(t, tc.expectedNoTriggered, noTriggered, "Test case '%v' failed.", tc.caseName)
}
}

View File

@@ -0,0 +1,44 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package status
import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/nodegroupset"
)
// ScaleUpStatus is the status of a scale-up attempt. This includes information
// on whether the scale-up happened, a description of the scale-up operation performed,
// and the status of the pods that took part in the scale-up evaluation.
type ScaleUpStatus struct {
ScaledUp bool
ScaleUpInfos []nodegroupset.ScaleUpInfo
PodsTriggeredScaleUp []*apiv1.Pod
PodsRemainUnschedulable []*apiv1.Pod
PodsAwaitEvaluation []*apiv1.Pod
}
// ScaleUpStatusProcessor processes the state of the cluster after a scale-up.
type ScaleUpStatusProcessor interface {
Process(context *context.AutoscalingContext, state *ScaleUpStatus)
}
// NewDefaultScaleUpStatusProcessor creates a default instance of ScaleUpStatusProcessor.
func NewDefaultScaleUpStatusProcessor() ScaleUpStatusProcessor {
return &EventingScaleUpStatusProcessor{}
}