Merge pull request https://github.com/kubernetes/contrib/pull/2236 from mwielgus/estimator-coming

Automatic merge from submit-queue

Cluster-autoscaler: add coming nodes to estimators

Ref: #2228 #2229

cc: @fgrzadkowski @jszczepkowski @piosz
Kubernetes Submit Queue 2016-12-30 07:53:01 -08:00 committed by GitHub
commit 94106d9ba3
5 changed files with 130 additions and 68 deletions
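In short: both estimators' Estimate methods gain a trailing comingNodes []*schedulercache.NodeInfo parameter listing nodes that are already on their way (requested but not yet registered in the cluster). Their capacity is discounted from the pending demand so the same shortfall is not provisioned twice; the existing call sites in ScaleUp pass an empty slice for now.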

View File

@@ -56,7 +56,8 @@ func NewBinpackingNodeEstimator(predicateChecker *simulator.PredicateChecker) *B
 // still be maintained.
 // It is assumed that all pods from the given list can fit to nodeTemplate.
 // Returns the number of nodes needed to accommodate all pods from the list.
-func (estimator *BinpackingNodeEstimator) Estimate(pods []*apiv1.Pod, nodeTemplate *schedulercache.NodeInfo) int {
+func (estimator *BinpackingNodeEstimator) Estimate(pods []*apiv1.Pod, nodeTemplate *schedulercache.NodeInfo,
+	comingNodes []*schedulercache.NodeInfo) int {
 	podInfos := calculatePodScore(pods, nodeTemplate)
 	sort.Sort(byScoreDesc(podInfos))
@@ -71,6 +72,10 @@ func (estimator *BinpackingNodeEstimator) Estimate(pods []*apiv1.Pod, nodeTempla
 	}
 	newNodes := make([]*schedulercache.NodeInfo, 0)
+	for _, node := range comingNodes {
+		newNodes = append(newNodes, node)
+	}
 	for _, podInfo := range podInfos {
 		found := false
 		for i, nodeInfo := range newNodes {
@@ -84,7 +89,7 @@ func (estimator *BinpackingNodeEstimator) Estimate(pods []*apiv1.Pod, nodeTempla
 			newNodes = append(newNodes, nodeWithPod(nodeTemplate, podInfo.pod))
 		}
 	}
-	return len(newNodes)
+	return len(newNodes) - len(comingNodes)
 }

 // Calculates score for all pods and returns podInfo structure.
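The essence of the binpacking change, as a minimal self-contained sketch (not the PR's code: plain ints stand in for Pod and NodeInfo, estimateNewNodes is a hypothetical name, and the real estimator additionally sorts pods by score and checks scheduler predicates): coming nodes are pre-seeded as empty bins before packing, then subtracted from the final bin count.

package main

import "fmt"

// estimateNewNodes packs podSizes into bins of nodeCapacity, starting
// with one empty bin per coming node, and reports only the bins that
// had to be opened beyond those.
func estimateNewNodes(podSizes []int, nodeCapacity int, comingNodes int) int {
	// Pre-seed: coming nodes are usable capacity but not new nodes.
	bins := make([]int, comingNodes)
	for _, size := range podSizes {
		placed := false
		for i := range bins {
			if bins[i]+size <= nodeCapacity {
				bins[i] += size
				placed = true
				break
			}
		}
		if !placed {
			// Open a new bin: a node that would have to be created.
			bins = append(bins, size)
		}
	}
	return len(bins) - comingNodes
}

func main() {
	pods := make([]int, 10)
	for i := range pods {
		pods[i] = 350 // mirrors the 350m pods in the test below
	}
	// 10 pods, 2 per 1000m node -> 5 bins; 2 already coming -> 3 new.
	fmt.Println(estimateNewNodes(pods, 1000, 2))
}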

View File

@@ -32,20 +32,7 @@ func TestBinpackingEstimate(t *testing.T) {
 	cpuPerPod := int64(350)
 	memoryPerPod := int64(1000 * 1024 * 1024)
-	pod := &apiv1.Pod{
-		Spec: apiv1.PodSpec{
-			Containers: []apiv1.Container{
-				{
-					Resources: apiv1.ResourceRequirements{
-						Requests: apiv1.ResourceList{
-							apiv1.ResourceCPU:    *resource.NewMilliQuantity(cpuPerPod, resource.DecimalSI),
-							apiv1.ResourceMemory: *resource.NewQuantity(memoryPerPod, resource.DecimalSI),
-						},
-					},
-				},
-			},
-		},
-	}
+	pod := makePod(cpuPerPod, memoryPerPod)

 	pods := make([]*apiv1.Pod, 0)
 	for i := 0; i < 10; i++ {
@@ -64,32 +51,48 @@ func TestBinpackingEstimate(t *testing.T) {
 	nodeInfo := schedulercache.NewNodeInfo()
 	nodeInfo.SetNode(node)
-	estimate := estimator.Estimate(pods, nodeInfo)
+	estimate := estimator.Estimate(pods, nodeInfo, []*schedulercache.NodeInfo{})
 	assert.Equal(t, 5, estimate)
 }

+func TestBinpackingEstimateComingNodes(t *testing.T) {
+	estimator := NewBinpackingNodeEstimator(simulator.NewTestPredicateChecker())
+
+	cpuPerPod := int64(350)
+	memoryPerPod := int64(1000 * 1024 * 1024)
+	pod := makePod(cpuPerPod, memoryPerPod)
+
+	pods := make([]*apiv1.Pod, 0)
+	for i := 0; i < 10; i++ {
+		pods = append(pods, pod)
+	}
+	node := &apiv1.Node{
+		Status: apiv1.NodeStatus{
+			Capacity: apiv1.ResourceList{
+				apiv1.ResourceCPU:    *resource.NewMilliQuantity(cpuPerPod*3-50, resource.DecimalSI),
+				apiv1.ResourceMemory: *resource.NewQuantity(2*memoryPerPod, resource.DecimalSI),
+				apiv1.ResourcePods:   *resource.NewQuantity(10, resource.DecimalSI),
+			},
+		},
+	}
+	node.Status.Allocatable = node.Status.Capacity
+	nodeInfo := schedulercache.NewNodeInfo()
+	nodeInfo.SetNode(node)
+
+	estimate := estimator.Estimate(pods, nodeInfo, []*schedulercache.NodeInfo{nodeInfo, nodeInfo})
+	// 5 - 2 nodes that are coming.
+	assert.Equal(t, 3, estimate)
+}
+
 func TestBinpackingEstimateWithPorts(t *testing.T) {
 	estimator := NewBinpackingNodeEstimator(simulator.NewTestPredicateChecker())

 	cpuPerPod := int64(200)
 	memoryPerPod := int64(1000 * 1024 * 1024)
-	pod := &apiv1.Pod{
-		Spec: apiv1.PodSpec{
-			Containers: []apiv1.Container{
-				{
-					Resources: apiv1.ResourceRequirements{
-						Requests: apiv1.ResourceList{
-							apiv1.ResourceCPU:    *resource.NewMilliQuantity(cpuPerPod, resource.DecimalSI),
-							apiv1.ResourceMemory: *resource.NewQuantity(memoryPerPod, resource.DecimalSI),
-						},
-					},
-					Ports: []apiv1.ContainerPort{
-						{
-							HostPort: 5555,
-						},
-					},
-				},
-			},
-		},
-	}
+	pod := makePod(cpuPerPod, memoryPerPod)
+	pod.Spec.Containers[0].Ports = []apiv1.ContainerPort{
+		{
+			HostPort: 5555,
+		},
+	}

 	pods := make([]*apiv1.Pod, 0)
@@ -109,6 +112,6 @@ func TestBinpackingEstimateWithPorts(t *testing.T) {
 	nodeInfo := schedulercache.NewNodeInfo()
 	nodeInfo.SetNode(node)
-	estimate := estimator.Estimate(pods, nodeInfo)
+	estimate := estimator.Estimate(pods, nodeInfo, []*schedulercache.NodeInfo{})
 	assert.Equal(t, 8, estimate)
 }
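Sanity-checking TestBinpackingEstimateComingNodes: each node offers 3*350-50 = 1000m CPU and two pods' worth of memory, so it fits two 350m pods; 10 pods therefore pack into 5 nodes, and with the same nodeInfo passed twice as coming capacity the estimator reports 5 - 2 = 3 new nodes.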

View File

@@ -23,6 +23,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/resource"
 	apiv1 "k8s.io/kubernetes/pkg/api/v1"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

 // BasicNodeEstimator estimates the number of needed nodes to handle the given amount of pods.
@@ -90,28 +91,53 @@ func (basicEstimator *BasicNodeEstimator) GetDebug() string {
 }

 // Estimate estimates the number needed of nodes of the given shape.
-func (basicEstimator *BasicNodeEstimator) Estimate(node *apiv1.Node) (int, string) {
+func (basicEstimator *BasicNodeEstimator) Estimate(node *apiv1.Node, comingNodes []*schedulercache.NodeInfo) (int, string) {
 	var buffer bytes.Buffer
 	buffer.WriteString("Needed nodes according to:\n")
 	result := 0
+
+	resources := apiv1.ResourceList{}
+	for _, node := range comingNodes {
+		cpu := resources[apiv1.ResourceCPU]
+		cpu.Add(node.Node().Status.Capacity[apiv1.ResourceCPU])
+		resources[apiv1.ResourceCPU] = cpu
+
+		mem := resources[apiv1.ResourceMemory]
+		mem.Add(node.Node().Status.Capacity[apiv1.ResourceMemory])
+		resources[apiv1.ResourceMemory] = mem
+
+		pods := resources[apiv1.ResourcePods]
+		pods.Add(node.Node().Status.Capacity[apiv1.ResourcePods])
+		resources[apiv1.ResourcePods] = pods
+	}
+
 	if cpuCapcaity, ok := node.Status.Capacity[apiv1.ResourceCPU]; ok {
-		prop := int(math.Ceil(float64(basicEstimator.cpuSum.MilliValue()) / float64(cpuCapcaity.MilliValue())))
+		comingCpu := resources[apiv1.ResourceCPU]
+		prop := int(math.Ceil(float64(
+			basicEstimator.cpuSum.MilliValue()-comingCpu.MilliValue()) /
+			float64(cpuCapcaity.MilliValue())))
 		buffer.WriteString(fmt.Sprintf("CPU: %d\n", prop))
 		result = maxInt(result, prop)
 	}
 	if memCapcaity, ok := node.Status.Capacity[apiv1.ResourceMemory]; ok {
-		prop := int(math.Ceil(float64(basicEstimator.memorySum.Value()) / float64(memCapcaity.Value())))
+		comingMem := resources[apiv1.ResourceMemory]
+		prop := int(math.Ceil(float64(
+			basicEstimator.memorySum.Value()-comingMem.Value()) /
+			float64(memCapcaity.Value())))
 		buffer.WriteString(fmt.Sprintf("Mem: %d\n", prop))
 		result = maxInt(result, prop)
 	}
 	if podCapcaity, ok := node.Status.Capacity[apiv1.ResourcePods]; ok {
-		prop := int(math.Ceil(float64(basicEstimator.GetCount()) / float64(podCapcaity.Value())))
+		comingPods := resources[apiv1.ResourcePods]
+		prop := int(math.Ceil(float64(basicEstimator.GetCount()-int(comingPods.Value())) /
+			float64(podCapcaity.Value())))
 		buffer.WriteString(fmt.Sprintf("Pods: %d\n", prop))
 		result = maxInt(result, prop)
 	}
 	for port, count := range basicEstimator.portSum {
 		buffer.WriteString(fmt.Sprintf("Port %d: %d\n", port, count))
-		result = maxInt(result, count)
+		result = maxInt(result, count-len(comingNodes))
 	}
 	return result, buffer.String()
 }
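The basic estimator's arithmetic reduces to subtracting the coming capacity from the summed demand before dividing by one node's capacity, per resource dimension. A minimal sketch of that formula (neededNodes is a hypothetical helper, shown for milli-CPU only; the real code repeats it for memory and pod count, and the overall result never drops below zero because the running maximum starts at 0):

package main

import (
	"fmt"
	"math"
)

// neededNodes: nodes required for the remaining demand after the
// capacity of coming nodes is discounted.
func neededNodes(demandMilli, perNodeMilli, comingMilli int64) int {
	return int(math.Ceil(float64(demandMilli-comingMilli) / float64(perNodeMilli)))
}

func main() {
	fmt.Println(neededNodes(2500, 1500, 0))    // ceil(2500/1500) = 2
	fmt.Println(neededNodes(2500, 1500, 3000)) // -0.33 rounds up to 0
}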

View File

@@ -21,15 +21,13 @@ import (
 	"k8s.io/kubernetes/pkg/api/resource"
 	apiv1 "k8s.io/kubernetes/pkg/api/v1"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"

 	"github.com/stretchr/testify/assert"
 )

-func TestEstimate(t *testing.T) {
-	cpuPerPod := int64(500)
-	memoryPerPod := int64(1000 * 1024 * 1024)
-	pod := &apiv1.Pod{
+func makePod(cpuPerPod, memoryPerPod int64) *apiv1.Pod {
+	return &apiv1.Pod{
 		Spec: apiv1.PodSpec{
 			Containers: []apiv1.Container{
 				{
@@ -43,7 +41,43 @@ func TestEstimate(t *testing.T) {
 				},
 			},
 		},
 	}
+}
+
+func TestEstimate(t *testing.T) {
+	cpuPerPod := int64(500)
+	memoryPerPod := int64(1000 * 1024 * 1024)
+	pod := makePod(cpuPerPod, memoryPerPod)
+
+	estimator := NewBasicNodeEstimator()
+	for i := 0; i < 5; i++ {
+		podCopy := *pod
+		estimator.Add(&podCopy)
+	}
+	assert.Equal(t, int64(500*5), estimator.cpuSum.MilliValue())
+	assert.Equal(t, int64(5*memoryPerPod), estimator.memorySum.Value())
+	assert.Equal(t, 5, estimator.GetCount())
+
+	node := &apiv1.Node{
+		Status: apiv1.NodeStatus{
+			Capacity: apiv1.ResourceList{
+				apiv1.ResourceCPU:    *resource.NewMilliQuantity(3*cpuPerPod, resource.DecimalSI),
+				apiv1.ResourceMemory: *resource.NewQuantity(2*memoryPerPod, resource.DecimalSI),
+				apiv1.ResourcePods:   *resource.NewQuantity(10, resource.DecimalSI),
+			},
+		},
+	}
+	estimate, report := estimator.Estimate(node, []*schedulercache.NodeInfo{})
+	assert.Contains(t, estimator.GetDebug(), "CPU")
+	assert.Contains(t, report, "CPU")
+	assert.Equal(t, 3, estimate)
+}
+
+func TestEstimateWithComing(t *testing.T) {
+	cpuPerPod := int64(500)
+	memoryPerPod := int64(1000 * 1024 * 1024)
+	pod := makePod(cpuPerPod, memoryPerPod)
+
 	estimator := NewBasicNodeEstimator()
 	for i := 0; i < 5; i++ {
@@ -64,33 +98,24 @@ func TestEstimate(t *testing.T) {
 			},
 		},
 	}
-	estimate, report := estimator.Estimate(node)
+	node.Status.Allocatable = node.Status.Capacity
+	nodeInfo := schedulercache.NewNodeInfo()
+	nodeInfo.SetNode(node)
+
+	estimate, report := estimator.Estimate(node, []*schedulercache.NodeInfo{nodeInfo, nodeInfo})
 	assert.Contains(t, estimator.GetDebug(), "CPU")
 	assert.Contains(t, report, "CPU")
-	assert.Equal(t, 3, estimate)
+	assert.Equal(t, 1, estimate)
 }

 func TestEstimateWithPorts(t *testing.T) {
 	cpuPerPod := int64(500)
 	memoryPerPod := int64(1000 * 1024 * 1024)
-	pod := &apiv1.Pod{
-		Spec: apiv1.PodSpec{
-			Containers: []apiv1.Container{
-				{
-					Resources: apiv1.ResourceRequirements{
-						Requests: apiv1.ResourceList{
-							apiv1.ResourceCPU:    *resource.NewMilliQuantity(cpuPerPod, resource.DecimalSI),
-							apiv1.ResourceMemory: *resource.NewQuantity(memoryPerPod, resource.DecimalSI),
-						},
-					},
-					Ports: []apiv1.ContainerPort{
-						{
-							HostPort: 5555,
-						},
-					},
-				},
-			},
-		},
-	}
+	pod := makePod(cpuPerPod, memoryPerPod)
+	pod.Spec.Containers[0].Ports = []apiv1.ContainerPort{
+		{
+			HostPort: 5555,
+		},
+	}
@@ -108,7 +133,7 @@ func TestEstimateWithPorts(t *testing.T) {
 		},
 	}
-	estimate, report := estimator.Estimate(node)
+	estimate, report := estimator.Estimate(node, []*schedulercache.NodeInfo{})
 	assert.Contains(t, estimator.GetDebug(), "CPU")
 	assert.Contains(t, report, "CPU")
 	assert.Equal(t, 5, estimate)
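The expected values follow from the capacity math: in TestEstimate, CPU demands ceil(2500m/1500m) = 2 nodes, memory ceil(5000/2000) = 3, and pod count ceil(5/10) = 1, so the maximum, 3, wins. In TestEstimateWithComing the two coming nodes contribute 3000m CPU (more than the 2500m demanded) and four pods' worth of memory against five demanded, leaving memory as the only positive dimension: ceil((5000-4000)/2000) = 1.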

View File

@@ -22,6 +22,7 @@ import (
 	"k8s.io/contrib/cluster-autoscaler/estimator"
 	"k8s.io/contrib/cluster-autoscaler/expander"
 	apiv1 "k8s.io/kubernetes/pkg/api/v1"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"

 	"github.com/golang/glog"
 )
@@ -84,17 +85,19 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
 		if len(option.Pods) > 0 {
 			if context.EstimatorName == BinpackingEstimatorName {
 				binpackingEstimator := estimator.NewBinpackingNodeEstimator(context.PredicateChecker)
-				option.NodeCount = binpackingEstimator.Estimate(option.Pods, nodeInfo)
+				option.NodeCount = binpackingEstimator.Estimate(option.Pods, nodeInfo, []*schedulercache.NodeInfo{})
 			} else if context.EstimatorName == BasicEstimatorName {
 				basicEstimator := estimator.NewBasicNodeEstimator()
 				for _, pod := range option.Pods {
 					basicEstimator.Add(pod)
 				}
-				option.NodeCount, option.Debug = basicEstimator.Estimate(nodeInfo.Node())
+				option.NodeCount, option.Debug = basicEstimator.Estimate(nodeInfo.Node(), []*schedulercache.NodeInfo{})
 			} else {
 				glog.Fatalf("Unrecognized estimator: %s", context.EstimatorName)
 			}
-			expansionOptions = append(expansionOptions, option)
+			if option.NodeCount > 0 {
+				expansionOptions = append(expansionOptions, option)
+			}
 		}
 	}
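A side effect worth noting: once coming nodes are discounted an estimate can come out as zero or negative, so ScaleUp now keeps only expansion options with option.NodeCount > 0 instead of offering no-op expansions to the expander.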