Simplify equivalence group usage

This commit is contained in:
parent ed151e637c
commit d6849e82b6
@@ -0,0 +1,89 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package core

import (
    "reflect"

    apiv1 "k8s.io/api/core/v1"
    apiequality "k8s.io/apimachinery/pkg/api/equality"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/autoscaler/cluster-autoscaler/processors/status"
    "k8s.io/autoscaler/cluster-autoscaler/utils/drain"
)

type podEquivalenceGroup struct {
    pods             []*apiv1.Pod
    schedulingErrors map[string]status.Reasons
    schedulable      bool
}

// buildPodEquivalenceGroups prepares pod groups with equivalent scheduling properties.
func buildPodEquivalenceGroups(pods []*apiv1.Pod) []*podEquivalenceGroup {
    podEquivalenceGroups := []*podEquivalenceGroup{}
    for _, pods := range groupPodsBySchedulingProperties(pods) {
        podEquivalenceGroups = append(podEquivalenceGroups, &podEquivalenceGroup{
            pods:             pods,
            schedulingErrors: map[string]status.Reasons{},
            schedulable:      false,
        })
    }
    return podEquivalenceGroups
}

type equivalenceGroupId int

type equivalenceGroup struct {
    id           equivalenceGroupId
    representant *apiv1.Pod
}

// groupPodsBySchedulingProperties groups pods based on scheduling properties. Group ID is meaningless.
func groupPodsBySchedulingProperties(pods []*apiv1.Pod) map[equivalenceGroupId][]*apiv1.Pod {
    podEquivalenceGroups := map[equivalenceGroupId][]*apiv1.Pod{}
    equivalenceGroupsByController := make(map[types.UID][]equivalenceGroup)

    var nextGroupId equivalenceGroupId
    for _, pod := range pods {
        controllerRef := drain.ControllerRef(pod)
        if controllerRef == nil {
            podEquivalenceGroups[nextGroupId] = []*apiv1.Pod{pod}
            nextGroupId++
            continue
        }

        matchingFound := false
        for _, g := range equivalenceGroupsByController[controllerRef.UID] {
            if reflect.DeepEqual(pod.Labels, g.representant.Labels) && apiequality.Semantic.DeepEqual(pod.Spec, g.representant.Spec) {
                matchingFound = true
                podEquivalenceGroups[g.id] = append(podEquivalenceGroups[g.id], pod)
                break
            }
        }

        if !matchingFound {
            newGroup := equivalenceGroup{
                id:           nextGroupId,
                representant: pod,
            }
            equivalenceGroupsByController[controllerRef.UID] = append(equivalenceGroupsByController[controllerRef.UID], newGroup)
            podEquivalenceGroups[newGroup.id] = append(podEquivalenceGroups[newGroup.id], pod)
            nextGroupId++
        }
    }

    return podEquivalenceGroups
}
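Not part of the commit: a self-contained, runnable sketch of the idea behind the new file, using simplified stand-in types. Here pod, controllerUID, specHash and the fits closure are hypothetical substitutes for *apiv1.Pod, drain.ControllerRef, the Labels/Spec DeepEqual comparison and PredicateChecker.CheckPredicates. Pods owned by the same controller with an identical spec land in one group, so a scheduling predicate only has to run once per group instead of once per pod.

package main

import "fmt"

// pod stands in for *apiv1.Pod with only the fields the grouping logic cares about.
type pod struct {
    name          string
    controllerUID string // empty means the pod has no controller
    specHash      string // stand-in for DeepEqual on Labels and Spec
    cpuMilli      int64
}

// groupBySchedulingProperties mirrors groupPodsBySchedulingProperties:
// one group per (controller, spec) pair; controller-less pods get their own group.
func groupBySchedulingProperties(pods []pod) [][]pod {
    groups := [][]pod{}
    index := map[string]int{} // (controller, spec) key -> position in groups
    for _, p := range pods {
        if p.controllerUID == "" {
            groups = append(groups, []pod{p})
            continue
        }
        key := p.controllerUID + "/" + p.specHash
        if i, ok := index[key]; ok {
            groups[i] = append(groups[i], p)
            continue
        }
        index[key] = len(groups)
        groups = append(groups, []pod{p})
    }
    return groups
}

func main() {
    pods := []pod{
        {name: "standalone", cpuMilli: 1500},
        {name: "rc1-a", controllerUID: "rc1", specHash: "big", cpuMilli: 3000},
        {name: "rc1-b", controllerUID: "rc1", specHash: "big", cpuMilli: 3000},
        {name: "rc2-a", controllerUID: "rc2", specHash: "small", cpuMilli: 100},
        {name: "rc2-b", controllerUID: "rc2", specHash: "small", cpuMilli: 100},
    }
    // Stand-in for one CheckPredicates call: the sample node has 2000m CPU free.
    fits := func(p pod) bool { return p.cpuMilli <= 2000 }

    for _, g := range groupBySchedulingProperties(pods) {
        sample := g[0] // one predicate check per group, as ScaleUp now does
        fmt.Printf("group of %d (sample %s): schedulable=%v\n", len(g), sample.name, fits(sample))
    }
}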
@@ -0,0 +1,105 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package core

import (
    "fmt"
    "testing"

    . "k8s.io/autoscaler/cluster-autoscaler/utils/test"

    apiv1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/kubernetes/pkg/api/testapi"

    "github.com/stretchr/testify/assert"
)

func TestGroupSchedulablePodsForNode(t *testing.T) {
    rc1 := apiv1.ReplicationController{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "rc1",
            Namespace: "default",
            SelfLink:  testapi.Default.SelfLink("replicationcontrollers", "rc"),
            UID:       "12345678-1234-1234-1234-123456789012",
        },
    }

    rc2 := apiv1.ReplicationController{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "rc2",
            Namespace: "default",
            SelfLink:  testapi.Default.SelfLink("replicationcontrollers", "rc"),
            UID:       "12345678-1234-1234-1234-12345678901a",
        },
    }

    p1 := BuildTestPod("p1", 1500, 200000)
    p2_1 := BuildTestPod("p2_1", 3000, 200000)
    p2_1.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
    p2_2 := BuildTestPod("p2_2", 3000, 200000)
    p2_2.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
    p3_1 := BuildTestPod("p3_1", 100, 200000)
    p3_1.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
    p3_2 := BuildTestPod("p3_2", 100, 200000)
    p3_2.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
    unschedulablePods := []*apiv1.Pod{p1, p2_1, p2_2, p3_1, p3_2}

    podGroups := groupPodsBySchedulingProperties(unschedulablePods)
    assert.Equal(t, 3, len(podGroups))

    wantedGroups := []struct {
        pods  []*apiv1.Pod
        found bool
    }{
        {pods: []*apiv1.Pod{p1}},
        {pods: []*apiv1.Pod{p2_1, p2_2}},
        {pods: []*apiv1.Pod{p3_1, p3_2}},
    }

    equal := func(a, b []*apiv1.Pod) bool {
        if len(a) != len(b) {
            return false
        }
        ma := map[*apiv1.Pod]bool{}
        for _, ea := range a {
            ma[ea] = true
        }
        for _, eb := range b {
            if !ma[eb] {
                return false
            }
        }
        return true
    }

    for _, g := range podGroups {
        found := false
        for i, wanted := range wantedGroups {
            if equal(g, wanted.pods) {
                wanted.found = true
                wantedGroups[i] = wanted
                found = true
            }
        }
        assert.True(t, found, fmt.Errorf("Unexpected pod group: %+v", g))
    }

    for _, w := range wantedGroups {
        assert.True(t, w.found, fmt.Errorf("Expected pod group: %+v", w))
    }
}
@@ -19,11 +19,12 @@ package core
 import (
     "bytes"
     "fmt"
-    "k8s.io/autoscaler/cluster-autoscaler/core/utils"
     "math"
     "strings"
     "time"
 
+    "k8s.io/autoscaler/cluster-autoscaler/core/utils"
+
     appsv1 "k8s.io/api/apps/v1"
     apiv1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/util/sets"
@@ -265,11 +266,8 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
 
     loggingQuota := glogx.PodsLoggingQuota()
 
-    podsRemainUnschedulable := make(map[*apiv1.Pod]map[string]status.Reasons)
-
     for _, pod := range unschedulablePods {
         glogx.V(1).UpTo(loggingQuota).Infof("Pod %s/%s is unschedulable", pod.Namespace, pod.Name)
-        podsRemainUnschedulable[pod] = make(map[string]status.Reasons)
     }
     glogx.V(1).Over(loggingQuota).Infof("%v other pods are also unschedulable", -loggingQuota.Left())
 
@@ -309,7 +307,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
     }
     klog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes))
 
-    expansionOptions := make([]expander.Option, 0)
+    expansionOptions := make(map[string]expander.Option, 0)
 
     if processors != nil && processors.NodeGroupListProcessor != nil {
         var errProc error
@@ -319,9 +317,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
         }
     }
 
-    podsPredicatePassingCheckFunctions := getPodsPredicatePassingCheckFunctions(context, unschedulablePods, nodeInfos)
-    getPodsPassingPredicates := podsPredicatePassingCheckFunctions.getPodsPassingPredicates
-    getPodsNotPassingPredicates := podsPredicatePassingCheckFunctions.getPodsNotPassingPredicates
+    podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods)
 
     skippedNodeGroups := map[string]status.Reasons{}
     for _, nodeGroup := range nodeGroups {
@@ -375,34 +371,19 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
             Pods: make([]*apiv1.Pod, 0),
         }
 
-        // add list of pods which pass predicates to option
-        podsPassing, err := getPodsPassingPredicates(nodeGroup.Id())
-        if err != nil {
-            klog.V(4).Infof("Skipping node group %s; cannot compute pods passing predicates", nodeGroup.Id())
-            skippedNodeGroups[nodeGroup.Id()] = notReadyReason
-            continue
-        } else {
-            option.Pods = make([]*apiv1.Pod, len(podsPassing))
-            copy(option.Pods, podsPassing)
-        }
-
-        // update information why we cannot schedule pods for which we did not find a working extension option so far
-        podsNotPassing, err := getPodsNotPassingPredicates(nodeGroup.Id())
-        if err != nil {
-            klog.V(4).Infof("Skipping node group %s; cannot compute pods not passing predicates", nodeGroup.Id())
-            skippedNodeGroups[nodeGroup.Id()] = notReadyReason
-            continue
-        }
-
-        // mark that there is a scheduling option for pods which can be scheduled to node from currently analyzed node group
-        for _, pod := range podsPassing {
-            delete(podsRemainUnschedulable, pod)
-        }
-
-        for pod, err := range podsNotPassing {
-            _, found := podsRemainUnschedulable[pod]
-            if found {
-                podsRemainUnschedulable[pod][nodeGroup.Id()] = err
+        for _, eg := range podEquivalenceGroups {
+            samplePod := eg.pods[0]
+            if err := context.PredicateChecker.CheckPredicates(samplePod, nil, nodeInfo); err == nil {
+                // add pods to option
+                option.Pods = append(option.Pods, eg.pods...)
+                // mark pod group as (theoretically) schedulable
+                eg.schedulable = true
+            } else {
+                klog.V(2).Infof("Pod %s can't be scheduled on %s, predicate failed: %v", samplePod.Name, nodeGroup.Id(), err.VerboseError())
+                if podCount := len(eg.pods); podCount > 1 {
+                    klog.V(2).Infof("%d other pods similar to %s can't be scheduled on %s", podCount-1, samplePod.Name, nodeGroup.Id())
+                }
+                eg.schedulingErrors[nodeGroup.Id()] = err
             }
         }
@@ -410,7 +391,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
         estimator := context.EstimatorBuilder(context.PredicateChecker)
         option.NodeCount = estimator.Estimate(option.Pods, nodeInfo, upcomingNodes)
         if option.NodeCount > 0 {
-            expansionOptions = append(expansionOptions, option)
+            expansionOptions[nodeGroup.Id()] = option
         } else {
             klog.V(2).Infof("No need for any nodes in %s", nodeGroup.Id())
         }
@@ -421,12 +402,19 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
 
     if len(expansionOptions) == 0 {
         klog.V(1).Info("No expansion options")
-        return &status.ScaleUpStatus{Result: status.ScaleUpNoOptionsAvailable, PodsRemainUnschedulable: getRemainingPods(podsRemainUnschedulable, skippedNodeGroups),
-            ConsideredNodeGroups: nodeGroups}, nil
+        return &status.ScaleUpStatus{
+            Result: status.ScaleUpNoOptionsAvailable,
+            PodsRemainUnschedulable: getRemainingPods(podEquivalenceGroups, skippedNodeGroups),
+            ConsideredNodeGroups: nodeGroups,
+        }, nil
     }
 
     // Pick some expansion option.
-    bestOption := context.ExpanderStrategy.BestOption(expansionOptions, nodeInfos)
+    options := make([]expander.Option, 0, len(expansionOptions))
+    for _, o := range expansionOptions {
+        options = append(options, o)
+    }
+    bestOption := context.ExpanderStrategy.BestOption(options, nodeInfos)
     if bestOption != nil && bestOption.NodeCount > 0 {
         klog.V(1).Infof("Best option to resize: %s", bestOption.NodeGroup.Id())
         if len(bestOption.Debug) > 0 {
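Not part of the commit: a small runnable sketch of why expansionOptions becomes a map keyed by node group ID (the option type and group names below are made up). The expander still receives a plain slice, built just before BestOption, while later code such as filterNodeGroupsByPods can look an option up by group ID and reuse the pod list that already passed the predicate check, instead of re-running predicates.

package main

import "fmt"

// option is a stand-in for expander.Option with just the fields used here.
type option struct {
    nodeGroup string
    pods      []string
}

func main() {
    // Keyed by node group ID, mirroring expansionOptions in ScaleUp.
    expansionOptions := map[string]option{
        "ng-small": {nodeGroup: "ng-small", pods: []string{"p1"}},
        "ng-big":   {nodeGroup: "ng-big", pods: []string{"p2_1", "p2_2"}},
    }

    // Flatten to a slice for the expander strategy, as the new loop before BestOption does.
    options := make([]option, 0, len(expansionOptions))
    for _, o := range expansionOptions {
        options = append(options, o)
    }
    fmt.Println("options handed to the expander:", len(options))

    // Later, a filterNodeGroupsByPods-style lookup reuses the cached result per group.
    if o, found := expansionOptions["ng-big"]; found {
        fmt.Println("pods already known to fit ng-big:", o.pods)
    }
}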
@@ -509,7 +497,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
         if typedErr != nil {
             return &status.ScaleUpStatus{Result: status.ScaleUpError, CreateNodeGroupResults: createNodeGroupResults}, typedErr.AddPrefix("Failed to find matching node groups: ")
         }
-        similarNodeGroups = filterNodeGroupsByPods(similarNodeGroups, bestOption.Pods, getPodsPassingPredicates)
+        similarNodeGroups = filterNodeGroupsByPods(similarNodeGroups, bestOption.Pods, expansionOptions)
         for _, ng := range similarNodeGroups {
             if clusterStateRegistry.IsNodeGroupSafeToScaleUp(ng, now) {
                 targetNodeGroups = append(targetNodeGroups, ng)
@@ -548,139 +536,67 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
         return &status.ScaleUpStatus{
             Result: status.ScaleUpSuccessful,
             ScaleUpInfos: scaleUpInfos,
-            PodsRemainUnschedulable: getRemainingPods(podsRemainUnschedulable, skippedNodeGroups),
+            PodsRemainUnschedulable: getRemainingPods(podEquivalenceGroups, skippedNodeGroups),
             ConsideredNodeGroups: nodeGroups,
             CreateNodeGroupResults: createNodeGroupResults,
             PodsTriggeredScaleUp: bestOption.Pods,
-            PodsAwaitEvaluation: getPodsAwaitingEvaluation(unschedulablePods, podsRemainUnschedulable, bestOption.Pods)},
-            nil
+            PodsAwaitEvaluation: getPodsAwaitingEvaluation(podEquivalenceGroups, bestOption.NodeGroup.Id()),
+        }, nil
     }
 
-    return &status.ScaleUpStatus{Result: status.ScaleUpNoOptionsAvailable, PodsRemainUnschedulable: getRemainingPods(podsRemainUnschedulable, skippedNodeGroups),
-        ConsideredNodeGroups: nodeGroups}, nil
+    return &status.ScaleUpStatus{
+        Result: status.ScaleUpNoOptionsAvailable,
+        PodsRemainUnschedulable: getRemainingPods(podEquivalenceGroups, skippedNodeGroups),
+        ConsideredNodeGroups: nodeGroups,
+    }, nil
 }
 
-type podsPredicatePassingCheckFunctions struct {
-    getPodsPassingPredicates func(nodeGroupId string) ([]*apiv1.Pod, error)
-    getPodsNotPassingPredicates func(nodeGroupId string) (map[*apiv1.Pod]status.Reasons, error)
-}
-
-func getPodsPredicatePassingCheckFunctions(
-    context *context.AutoscalingContext,
-    unschedulablePods []*apiv1.Pod,
-    nodeInfos map[string]*schedulernodeinfo.NodeInfo) podsPredicatePassingCheckFunctions {
-
-    podsPassingPredicatesCache := make(map[string][]*apiv1.Pod)
-    podsNotPassingPredicatesCache := make(map[string]map[*apiv1.Pod]status.Reasons)
-    errorsCache := make(map[string]error)
-    checker := utils.NewPodsSchedulableOnNodeChecker(context, unschedulablePods)
-
-    computeCaches := func(nodeGroupId string) {
-        nodeInfo, found := nodeInfos[nodeGroupId]
-        if !found {
-            errorsCache[nodeGroupId] = errors.NewAutoscalerError(errors.InternalError, "NodeInfo not found for node group %v", nodeGroupId)
-            return
-        }
-
-        podsPassing := make([]*apiv1.Pod, 0)
-        podsNotPassing := make(map[*apiv1.Pod]status.Reasons)
-        schedulableOnNode := checker.CheckPodsSchedulableOnNode(nodeGroupId, nodeInfo)
-        for pod, err := range schedulableOnNode {
-            if err == nil {
-                podsPassing = append(podsPassing, pod)
-            } else {
-                podsNotPassing[pod] = err
-            }
-        }
-        podsPassingPredicatesCache[nodeGroupId] = podsPassing
-        podsNotPassingPredicatesCache[nodeGroupId] = podsNotPassing
-    }
-
-    return podsPredicatePassingCheckFunctions{
-
-        getPodsPassingPredicates: func(nodeGroupId string) ([]*apiv1.Pod, error) {
-            _, passingFound := podsPassingPredicatesCache[nodeGroupId]
-            _, errorFound := errorsCache[nodeGroupId]
-
-            if !passingFound && !errorFound {
-                computeCaches(nodeGroupId)
-            }
-            err, found := errorsCache[nodeGroupId]
-            if found {
-                return []*apiv1.Pod{}, err
-            }
-            pods, found := podsPassingPredicatesCache[nodeGroupId]
-            if found {
-                return pods, nil
-            }
-            return []*apiv1.Pod{}, errors.NewAutoscalerError(errors.InternalError, "Pods passing predicate entry not found in cache for node group %s", nodeGroupId)
-        },
-
-        getPodsNotPassingPredicates: func(nodeGroupId string) (map[*apiv1.Pod]status.Reasons, error) {
-            _, notPassingFound := podsNotPassingPredicatesCache[nodeGroupId]
-            _, errorFound := errorsCache[nodeGroupId]
-
-            if !notPassingFound && !errorFound {
-                computeCaches(nodeGroupId)
-            }
-            err, found := errorsCache[nodeGroupId]
-            if found {
-                return map[*apiv1.Pod]status.Reasons{}, err
-            }
-            pods, found := podsNotPassingPredicatesCache[nodeGroupId]
-            if found {
-                return pods, nil
-            }
-            return map[*apiv1.Pod]status.Reasons{}, errors.NewAutoscalerError(errors.InternalError, "Pods not passing predicate entry not found in cache for node group %s", nodeGroupId)
-        },
-    }
-}
-
-func getRemainingPods(schedulingErrors map[*apiv1.Pod]map[string]status.Reasons, skipped map[string]status.Reasons) []status.NoScaleUpInfo {
+func getRemainingPods(egs []*podEquivalenceGroup, skipped map[string]status.Reasons) []status.NoScaleUpInfo {
     remaining := []status.NoScaleUpInfo{}
-    for pod, errs := range schedulingErrors {
+    for _, eg := range egs {
+        if eg.schedulable {
+            continue
+        }
+        for _, pod := range eg.pods {
             noScaleUpInfo := status.NoScaleUpInfo{
                 Pod: pod,
-                RejectedNodeGroups: errs,
+                RejectedNodeGroups: eg.schedulingErrors,
                 SkippedNodeGroups: skipped,
             }
             remaining = append(remaining, noScaleUpInfo)
         }
+    }
     return remaining
 }
 
-func getPodsAwaitingEvaluation(allPods []*apiv1.Pod, unschedulable map[*apiv1.Pod]map[string]status.Reasons, bestOption []*apiv1.Pod) []*apiv1.Pod {
-    awaitsEvaluation := make(map[*apiv1.Pod]bool, len(allPods))
-    for _, pod := range allPods {
-        if _, found := unschedulable[pod]; !found {
-            awaitsEvaluation[pod] = true
+func getPodsAwaitingEvaluation(egs []*podEquivalenceGroup, bestOption string) []*apiv1.Pod {
+    awaitsEvaluation := []*apiv1.Pod{}
+    for _, eg := range egs {
+        if eg.schedulable {
+            if _, found := eg.schedulingErrors[bestOption]; found {
+                // Schedulable, but not yet.
+                awaitsEvaluation = append(awaitsEvaluation, eg.pods...)
+            }
         }
     }
-    for _, pod := range bestOption {
-        delete(awaitsEvaluation, pod)
-    }
-
-    result := make([]*apiv1.Pod, 0)
-    for pod := range awaitsEvaluation {
-        result = append(result, pod)
-    }
-    return result
+    return awaitsEvaluation
 }
 
 func filterNodeGroupsByPods(
     groups []cloudprovider.NodeGroup,
     podsRequiredToFit []*apiv1.Pod,
-    fittingPodsPerNodeGroup func(groupId string) ([]*apiv1.Pod, error)) []cloudprovider.NodeGroup {
+    expansionOptions map[string]expander.Option) []cloudprovider.NodeGroup {
 
     result := make([]cloudprovider.NodeGroup, 0)
 
 groupsloop:
     for _, group := range groups {
-        fittingPods, err := fittingPodsPerNodeGroup(group.Id())
-        if err != nil {
-            klog.V(1).Infof("No info about pods passing predicates found for group %v, skipping it from scale-up consideration; err=%v", group.Id(), err)
+        option, found := expansionOptions[group.Id()]
+        if !found {
+            klog.V(1).Infof("No info about pods passing predicates found for group %v, skipping it from scale-up consideration", group.Id())
             continue
         }
+        fittingPods := option.Pods
         podSet := make(map[*apiv1.Pod]bool, len(fittingPods))
         for _, pod := range fittingPods {
             podSet[pod] = true
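Not part of the commit: a runnable sketch of how the per-group flags feed the ScaleUpStatus fields, with simplified local types (group and the plain string reasons stand in for *podEquivalenceGroup and status.Reasons). A group that never became schedulable anywhere ends up in PodsRemainUnschedulable via getRemainingPods; a group that is schedulable somewhere but failed on the node group the expander picked ends up in PodsAwaitEvaluation via getPodsAwaitingEvaluation.

package main

import "fmt"

// group is a stand-in for *podEquivalenceGroup.
type group struct {
    pods             []string
    schedulable      bool
    schedulingErrors map[string]string // node group ID -> reason
}

func main() {
    egs := []group{
        {pods: []string{"p1"}, schedulable: true, schedulingErrors: map[string]string{}},
        {pods: []string{"p2_1", "p2_2"}, schedulable: false,
            schedulingErrors: map[string]string{"ng-small": "insufficient cpu"}},
        {pods: []string{"p3_1", "p3_2"}, schedulable: true,
            schedulingErrors: map[string]string{"ng-small": "insufficient cpu"}},
    }
    bestOption := "ng-small" // node group chosen by the expander

    remaining, awaiting := []string{}, []string{}
    for _, eg := range egs {
        if !eg.schedulable {
            // getRemainingPods: rejected by every considered node group.
            remaining = append(remaining, eg.pods...)
            continue
        }
        if _, failedHere := eg.schedulingErrors[bestOption]; failedHere {
            // getPodsAwaitingEvaluation: schedulable, but not on the chosen group yet.
            awaiting = append(awaiting, eg.pods...)
        }
    }
    fmt.Println("remain unschedulable:", remaining)
    fmt.Println("await evaluation:", awaiting)
}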
@@ -21,14 +21,8 @@ import (
 
     apiv1 "k8s.io/api/core/v1"
     apiequality "k8s.io/apimachinery/pkg/api/equality"
-    "k8s.io/apimachinery/pkg/types"
-    "k8s.io/autoscaler/cluster-autoscaler/context"
    "k8s.io/autoscaler/cluster-autoscaler/simulator"
     "k8s.io/autoscaler/cluster-autoscaler/utils/drain"
-    "k8s.io/autoscaler/cluster-autoscaler/utils/glogx"
-    schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
-
-    "k8s.io/klog"
 )
 
 // PodSchedulableInfo data structure is used to avoid running predicates #pending_pods * #nodes
@@ -89,89 +83,3 @@ func (podMap PodSchedulableMap) Set(pod *apiv1.Pod, err *simulator.PredicateErro
         schedulingError: err,
     })
 }
-
-// PodsSchedulableOnNodeChecker allows for querying what subset of pods from a set is schedulable on given node.
-// Pods set is give at creation time and then multiple nodes can be used for querying.
-type PodsSchedulableOnNodeChecker struct {
-    context *context.AutoscalingContext
-    pods []*apiv1.Pod
-    podsEquivalenceGroups map[types.UID]equivalenceGroupId
-}
-
-// NewPodsSchedulableOnNodeChecker creates an instance of PodsSchedulableOnNodeChecker
-func NewPodsSchedulableOnNodeChecker(context *context.AutoscalingContext, pods []*apiv1.Pod) *PodsSchedulableOnNodeChecker {
-    checker := PodsSchedulableOnNodeChecker{
-        context: context,
-        pods: pods,
-        podsEquivalenceGroups: make(map[types.UID]equivalenceGroupId),
-    }
-
-    // compute the podsEquivalenceGroups
-    var nextGroupId equivalenceGroupId
-    type equivalenceGroup struct {
-        id equivalenceGroupId
-        representant *apiv1.Pod
-    }
-
-    equivalenceGroupsByController := make(map[types.UID][]equivalenceGroup)
-
-    for _, pod := range pods {
-        controllerRef := drain.ControllerRef(pod)
-        if controllerRef == nil {
-            checker.podsEquivalenceGroups[pod.UID] = nextGroupId
-            nextGroupId++
-            continue
-        }
-
-        matchingFound := false
-        for _, g := range equivalenceGroupsByController[controllerRef.UID] {
-            if reflect.DeepEqual(pod.Labels, g.representant.Labels) && apiequality.Semantic.DeepEqual(pod.Spec, g.representant.Spec) {
-                matchingFound = true
-                checker.podsEquivalenceGroups[pod.UID] = g.id
-                break
-            }
-        }
-
-        if !matchingFound {
-            newGroup := equivalenceGroup{
-                id: nextGroupId,
-                representant: pod,
-            }
-            equivalenceGroupsByController[controllerRef.UID] = append(equivalenceGroupsByController[controllerRef.UID], newGroup)
-            checker.podsEquivalenceGroups[pod.UID] = newGroup.id
-            nextGroupId++
-        }
-    }
-
-    return &checker
-}
-
-// CheckPodsSchedulableOnNode checks if pods can be scheduled on the given node.
-func (c *PodsSchedulableOnNodeChecker) CheckPodsSchedulableOnNode(nodeGroupId string, nodeInfo *schedulernodeinfo.NodeInfo) map[*apiv1.Pod]*simulator.PredicateError {
-    loggingQuota := glogx.PodsLoggingQuota()
-    schedulingErrors := make(map[equivalenceGroupId]*simulator.PredicateError)
-
-    for _, pod := range c.pods {
-        equivalenceGroup := c.podsEquivalenceGroups[pod.UID]
-        err, found := schedulingErrors[equivalenceGroup]
-        if found && err != nil {
-            glogx.V(2).UpTo(loggingQuota).Infof("Pod %s can't be scheduled on %s. Used cached predicate check results", pod.Name, nodeGroupId)
-        }
-        // Not found in cache, have to run the predicates.
-        if !found {
-            err = c.context.PredicateChecker.CheckPredicates(pod, nil, nodeInfo)
-            schedulingErrors[equivalenceGroup] = err
-            if err != nil {
-                // Always log for the first pod in a controller.
-                klog.V(2).Infof("Pod %s can't be scheduled on %s, predicate failed: %v", pod.Name, nodeGroupId, err.VerboseError())
-            }
-        }
-    }
-    glogx.V(2).Over(loggingQuota).Infof("%v other pods can't be scheduled on %s.", -loggingQuota.Left(), nodeGroupId)
-
-    schedulingErrorsByPod := make(map[*apiv1.Pod]*simulator.PredicateError)
-    for _, pod := range c.pods {
-        schedulingErrorsByPod[pod] = schedulingErrors[c.podsEquivalenceGroups[pod.UID]]
-    }
-    return schedulingErrorsByPod
-}
@@ -19,17 +19,14 @@ package utils
 import (
     "testing"
 
-    "k8s.io/autoscaler/cluster-autoscaler/context"
     "k8s.io/autoscaler/cluster-autoscaler/simulator"
     . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
 
     apiv1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/kubernetes/pkg/api/testapi"
-    schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
 
     "github.com/stretchr/testify/assert"
-    "time"
 )
 
 func TestPodSchedulableMap(t *testing.T) {
@@ -108,60 +105,3 @@ func TestPodSchedulableMap(t *testing.T) {
     assert.True(t, found)
     assert.Nil(t, err)
 }
-
-func TestFilterSchedulablePodsForNode(t *testing.T) {
-    rc1 := apiv1.ReplicationController{
-        ObjectMeta: metav1.ObjectMeta{
-            Name: "rc1",
-            Namespace: "default",
-            SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"),
-            UID: "12345678-1234-1234-1234-123456789012",
-        },
-    }
-
-    rc2 := apiv1.ReplicationController{
-        ObjectMeta: metav1.ObjectMeta{
-            Name: "rc2",
-            Namespace: "default",
-            SelfLink: testapi.Default.SelfLink("replicationcontrollers", "rc"),
-            UID: "12345678-1234-1234-1234-12345678901a",
-        },
-    }
-
-    p1 := BuildTestPod("p1", 1500, 200000)
-    p2_1 := BuildTestPod("p2_1", 3000, 200000)
-    p2_1.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
-    p2_2 := BuildTestPod("p2_2", 3000, 200000)
-    p2_2.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
-    p3_1 := BuildTestPod("p3_1", 100, 200000)
-    p3_1.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
-    p3_2 := BuildTestPod("p3_2", 100, 200000)
-    p3_2.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
-    unschedulablePods := []*apiv1.Pod{p1, p2_1, p2_2, p3_1, p3_2}
-
-    tn := BuildTestNode("T1-abc", 2000, 2000000)
-    SetNodeReadyState(tn, true, time.Time{})
-    tni := schedulernodeinfo.NewNodeInfo()
-    tni.SetNode(tn)
-
-    context := &context.AutoscalingContext{
-        PredicateChecker: simulator.NewTestPredicateChecker(),
-    }
-
-    checker := NewPodsSchedulableOnNodeChecker(context, unschedulablePods)
-    res := checker.CheckPodsSchedulableOnNode("T1-abc", tni)
-    wantedSchedulable := []*apiv1.Pod{p1, p3_1, p3_2}
-    wantedUnschedulable := []*apiv1.Pod{p2_1, p2_2}
-
-    assert.Equal(t, 5, len(res))
-    for _, pod := range wantedSchedulable {
-        err, found := res[pod]
-        assert.True(t, found)
-        assert.Nil(t, err)
-    }
-    for _, pod := range wantedUnschedulable {
-        err, found := res[pod]
-        assert.True(t, found)
-        assert.NotNil(t, err)
-    }
-}
@@ -46,8 +46,6 @@ const (
     IgnoreTaintPrefix = "ignore-taint.cluster-autoscaler.kubernetes.io/"
 )
 
-type equivalenceGroupId int
-
 // GetNodeInfosForGroups finds NodeInfos for all node groups used to manage the given nodes. It also returns a node group to sample node mapping.
 func GetNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedulernodeinfo.NodeInfo, cloudProvider cloudprovider.CloudProvider, listers kube_util.ListerRegistry,
 // TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key.