Check if pods fit on the new node in binpacking

Previously we've just assumed a pod will always fit on a newly added
node during binpacking, because we've already checked that the pod fits
on an empty template node earlier in scale-up logic.
This assumption is incorrect, as it doesn't take into account the
potential impact of other scheduling decisions already made in
binpacking. For pods using zonal Filters (such as PodTopologySpread
with a zonal topology key), the pod may no longer be able to schedule
even on an empty node as a result of those earlier decisions.
Maciek Pytel 2022-06-14 15:06:48 +02:00
parent ab891418f6
commit 5342f189f1
2 changed files with 106 additions and 37 deletions
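
For context, the scenario this change guards against can be reproduced with a pod like the one below. This is an illustrative sketch only, not code from the commit; it mirrors the PodTopologySpread constraint that the updated makePods test helper builds, and the zonalSpreadPod name is made up for the example.

package main

import (
    "fmt"

    apiv1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// zonalSpreadPod builds a pod spread across zones with maxSkew=2.
// With a zonal topology key, once two such replicas have been binpacked into
// one zone while another zone has none (and no node in that zone can fit the
// pod), the next replica fails the PodTopologySpread Filter even on a
// brand-new, completely empty node in the first zone; this is the case the
// estimator now detects with CheckPredicates before counting the pod as
// scheduled.
func zonalSpreadPod() *apiv1.Pod {
    return &apiv1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "estimatee",
            Namespace: "universe",
            Labels:    map[string]string{"app": "estimatee"},
        },
        Spec: apiv1.PodSpec{
            TopologySpreadConstraints: []apiv1.TopologySpreadConstraint{
                {
                    MaxSkew:           2,
                    TopologyKey:       "topology.kubernetes.io/zone",
                    WhenUnsatisfiable: apiv1.DoNotSchedule,
                    LabelSelector: &metav1.LabelSelector{
                        MatchLabels: map[string]string{"app": "estimatee"},
                    },
                },
            },
        },
    }
}

func main() {
    fmt.Println(zonalSpreadPod().Spec.TopologySpreadConstraints[0].TopologyKey)
}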

View File

@@ -73,6 +73,7 @@ func (e *BinpackingNodeEstimator) Estimate(
     sort.Slice(podInfos, func(i, j int) bool { return podInfos[i].score > podInfos[j].score })
 
     newNodeNames := make(map[string]bool)
+    newNodesWithPods := make(map[string]bool)
 
     if err := e.clusterSnapshot.Fork(); err != nil {
         klog.Errorf("Error while calling ClusterSnapshot.Fork; %v", err)
@@ -86,6 +87,7 @@ func (e *BinpackingNodeEstimator) Estimate(
     newNodeNameIndex := 0
     scheduledPods := []*apiv1.Pod{}
+    lastNodeName := ""
 
     for _, podInfo := range podInfos {
         found := false
@@ -100,6 +102,7 @@ func (e *BinpackingNodeEstimator) Estimate(
                 return 0, nil
             }
             scheduledPods = append(scheduledPods, podInfo.pod)
+            newNodesWithPods[nodeName] = true
         }
 
         if !found {
@@ -109,6 +112,13 @@ func (e *BinpackingNodeEstimator) Estimate(
                 break
             }
 
+            // If the last node we've added is empty and the pod couldn't schedule on it, it wouldn't be able to schedule
+            // on a new node either. There is no point adding more nodes to snapshot in such case, especially because of
+            // performance cost each extra node adds to future FitsAnyNodeMatching calls.
+            if lastNodeName != "" && !newNodesWithPods[lastNodeName] {
+                continue
+            }
+
             // Add new node
             newNodeName, err := e.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex)
             if err != nil {
@@ -116,16 +126,25 @@ func (e *BinpackingNodeEstimator) Estimate(
                 return 0, nil
             }
             newNodeNameIndex++
-            // And schedule pod to it
+            newNodeNames[newNodeName] = true
+            lastNodeName = newNodeName
+
+            // And try to schedule pod to it.
+            // Note that this may still fail (ex. if topology spreading with zonal topologyKey is used);
+            // in this case we can't help the pending pod. We keep the node in clusterSnapshot to avoid
+            // adding and removing node to snapshot for each such pod.
+            if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, podInfo.pod, newNodeName); err != nil {
+                continue
+            }
             if err := e.clusterSnapshot.AddPod(podInfo.pod, newNodeName); err != nil {
                 klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", podInfo.pod.Namespace, podInfo.pod.Name, newNodeName, err)
                 return 0, nil
             }
-            newNodeNames[newNodeName] = true
+            newNodesWithPods[newNodeName] = true
             scheduledPods = append(scheduledPods, podInfo.pod)
         }
     }
-    return len(newNodeNames), scheduledPods
+    return len(newNodesWithPods), scheduledPods
 }
 
 func (e *BinpackingNodeEstimator) addNewNodeToSnapshot(

View File

@@ -22,6 +22,7 @@ import (
     apiv1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/autoscaler/cluster-autoscaler/simulator"
     . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
     "k8s.io/autoscaler/cluster-autoscaler/utils/units"
@@ -30,8 +31,15 @@ import (
     "github.com/stretchr/testify/assert"
 )
 
-func makePods(cpuPerPod int64, memoryPerPod int64, hostport int32, podCount int) []*apiv1.Pod {
+func makePods(cpuPerPod int64, memoryPerPod int64, hostport int32, maxSkew int32, topologySpreadingKey string, podCount int) []*apiv1.Pod {
     pod := &apiv1.Pod{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      "estimatee",
+            Namespace: "universe",
+            Labels: map[string]string{
+                "app": "estimatee",
+            },
+        },
         Spec: apiv1.PodSpec{
             Containers: []apiv1.Container{
                 {
@@ -45,13 +53,27 @@ func makePods(cpuPerPod int64, memoryPerPod int64, hostport int32, podCount int)
             },
         },
     }
-    if hostport != 0 {
+    if hostport > 0 {
         pod.Spec.Containers[0].Ports = []apiv1.ContainerPort{
             {
                 HostPort: hostport,
             },
         }
     }
+    if maxSkew > 0 {
+        pod.Spec.TopologySpreadConstraints = []apiv1.TopologySpreadConstraint{
+            {
+                MaxSkew:           maxSkew,
+                TopologyKey:       topologySpreadingKey,
+                WhenUnsatisfiable: "DoNotSchedule",
+                LabelSelector: &metav1.LabelSelector{
+                    MatchLabels: map[string]string{
+                        "app": "estimatee",
+                    },
+                },
+            },
+        }
+    }
     pods := []*apiv1.Pod{}
     for i := 0; i < podCount; i++ {
         pods = append(pods, pod)
@@ -59,21 +81,44 @@ func makePods(cpuPerPod int64, memoryPerPod int64, hostport int32, podCount int)
     return pods
 }
 
+func makeNode(cpu int64, mem int64, name string, zone string) *apiv1.Node {
+    node := &apiv1.Node{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: name,
+            Labels: map[string]string{
+                "kubernetes.io/hostname":      name,
+                "topology.kubernetes.io/zone": zone,
+            },
+        },
+        Status: apiv1.NodeStatus{
+            Capacity: apiv1.ResourceList{
+                apiv1.ResourceCPU:    *resource.NewMilliQuantity(cpu, resource.DecimalSI),
+                apiv1.ResourceMemory: *resource.NewQuantity(mem*units.MiB, resource.DecimalSI),
+                apiv1.ResourcePods:   *resource.NewQuantity(10, resource.DecimalSI),
+            },
+        },
+    }
+    node.Status.Allocatable = node.Status.Capacity
+    SetNodeReadyState(node, true, time.Time{})
+    return node
+}
+
 func TestBinpackingEstimate(t *testing.T) {
     testCases := []struct {
         name            string
         millicores      int64
         memory          int64
         maxNodes        int
         pods            []*apiv1.Pod
+        topologySpreadingKey string
         expectNodeCount int
         expectPodCount  int
     }{
         {
             name:            "simple resource-based binpacking",
             millicores:      350*3 - 50,
             memory:          2 * 1000,
-            pods:            makePods(350, 1000, 0, 10),
+            pods:            makePods(350, 1000, 0, 0, "", 10),
             expectNodeCount: 5,
             expectPodCount:  10,
         },
@@ -81,7 +126,7 @@ func TestBinpackingEstimate(t *testing.T) {
             name:            "pods-per-node bound binpacking",
             millicores:      10000,
             memory:          20000,
-            pods:            makePods(10, 100, 0, 20),
+            pods:            makePods(10, 100, 0, 0, "", 20),
             expectNodeCount: 2,
             expectPodCount:  20,
         },
@@ -89,7 +134,7 @@ func TestBinpackingEstimate(t *testing.T) {
             name:            "hostport conflict forces pod-per-node",
             millicores:      1000,
             memory:          5000,
-            pods:            makePods(200, 1000, 5555, 8),
+            pods:            makePods(200, 1000, 5555, 0, "", 8),
             expectNodeCount: 8,
             expectPodCount:  8,
         },
@@ -97,41 +142,46 @@ func TestBinpackingEstimate(t *testing.T) {
             name:            "limiter cuts binpacking",
             millicores:      1000,
             memory:          5000,
-            pods:            makePods(500, 1000, 0, 20),
+            pods:            makePods(500, 1000, 0, 0, "", 20),
             maxNodes:        5,
             expectNodeCount: 5,
             expectPodCount:  10,
         },
+        {
+            name:            "hostname topology spreading with maxSkew=2 forces 2 pods/node",
+            millicores:      1000,
+            memory:          5000,
+            pods:            makePods(20, 100, 0, 2, "kubernetes.io/hostname", 8),
+            expectNodeCount: 4,
+            expectPodCount:  8,
+        },
+        {
+            name:            "zonal topology spreading with maxSkew=2 only allows 2 pods to schedule",
+            millicores:      1000,
+            memory:          5000,
+            pods:            makePods(20, 100, 0, 2, "topology.kubernetes.io/zone", 8),
+            expectNodeCount: 1,
+            expectPodCount:  2,
+        },
     }
     for _, tc := range testCases {
         t.Run(tc.name, func(t *testing.T) {
-            limiter := NewThresholdBasedEstimationLimiter(tc.maxNodes, time.Duration(0))
-            estimator := newBinPackingEstimator(t, limiter)
-            node := &apiv1.Node{
-                Status: apiv1.NodeStatus{
-                    Capacity: apiv1.ResourceList{
-                        apiv1.ResourceCPU:    *resource.NewMilliQuantity(tc.millicores, resource.DecimalSI),
-                        apiv1.ResourceMemory: *resource.NewQuantity(tc.memory*units.MiB, resource.DecimalSI),
-                        apiv1.ResourcePods:   *resource.NewQuantity(10, resource.DecimalSI),
-                    },
-                },
-            }
-            node.Status.Allocatable = node.Status.Capacity
-            SetNodeReadyState(node, true, time.Time{})
+            clusterSnapshot := simulator.NewBasicClusterSnapshot()
+            // Add one node in different zone to trigger topology spread constraints
+            clusterSnapshot.AddNode(makeNode(100, 100, "oldnode", "zone-jupiter"))
+            predicateChecker, err := simulator.NewTestPredicateChecker()
+            assert.NoError(t, err)
+            limiter := NewThresholdBasedEstimationLimiter(tc.maxNodes, time.Duration(0))
+            estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter)
+            node := makeNode(tc.millicores, tc.memory, "template", "zone-mars")
             nodeInfo := schedulerframework.NewNodeInfo()
             nodeInfo.SetNode(node)
             estimatedNodes, estimatedPods := estimator.Estimate(tc.pods, nodeInfo, nil)
             assert.Equal(t, tc.expectNodeCount, estimatedNodes)
             assert.Equal(t, tc.expectPodCount, len(estimatedPods))
         })
     }
 }
-
-func newBinPackingEstimator(t *testing.T, l EstimationLimiter) *BinpackingNodeEstimator {
-    predicateChecker, err := simulator.NewTestPredicateChecker()
-    clusterSnapshot := simulator.NewBasicClusterSnapshot()
-    assert.NoError(t, err)
-    estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, l)
-    return estimator
-}