Merge pull request #2071 from losipiuk/lo/predicate-checker-speedup

Precompute inter pod equivalence groups in checkPodsSchedulableOnNode
commit a0853bcc80 by Kubernetes Prow Robot, 2019-06-03 03:52:16 -07:00 (committed by GitHub)
4 changed files with 78 additions and 25 deletions
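Summary of the change: the removed checkPodsSchedulableOnNode built a fresh podSchedulableMap on every call, i.e. once per node group, and matched each pod against it. The new podsSchedulableOnNodeChecker computes pod equivalence groups a single time, when it is constructed from the unschedulable pods, and keys them by pod UID; each later per-node-group check then costs one predicate run per group plus map lookups. Two pods land in the same group when they are owned by the same controller and have equal labels and semantically equal specs. Below is a minimal sketch of that relation, using metav1.GetControllerOf as a stand-in for the drain.ControllerRef helper that the diff actually uses; it is an illustration, not the PR's code.

package sketch

import (
    "reflect"

    apiv1 "k8s.io/api/core/v1"
    apiequality "k8s.io/apimachinery/pkg/api/equality"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// equivalent captures the relation behind the new groups: two pods are
// interchangeable for predicate checking when they share a controller and
// have equal labels and semantically equal specs. Pods without a controller
// are never grouped together, mirroring the nil-controllerRef branch below.
func equivalent(a, b *apiv1.Pod) bool {
    ca, cb := metav1.GetControllerOf(a), metav1.GetControllerOf(b)
    if ca == nil || cb == nil {
        return false
    }
    return ca.UID == cb.UID &&
        reflect.DeepEqual(a.Labels, b.Labels) &&
        apiequality.Semantic.DeepEqual(a.Spec, b.Spec)
}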


@@ -563,6 +563,7 @@ func getPodsPredicatePassingCheckFunctions(
     podsPassingPredicatesCache := make(map[string][]*apiv1.Pod)
     podsNotPassingPredicatesCache := make(map[string]map[*apiv1.Pod]status.Reasons)
     errorsCache := make(map[string]error)
+    checker := newPodsSchedulableOnNodeChecker(context, unschedulablePods)
 
     computeCaches := func(nodeGroupId string) {
         nodeInfo, found := nodeInfos[nodeGroupId]
@@ -573,7 +574,7 @@ func getPodsPredicatePassingCheckFunctions(
         podsPassing := make([]*apiv1.Pod, 0)
         podsNotPassing := make(map[*apiv1.Pod]status.Reasons)
-        schedulableOnNode := checkPodsSchedulableOnNode(context, unschedulablePods, nodeGroupId, nodeInfo)
+        schedulableOnNode := checker.checkPodsSchedulableOnNode(nodeGroupId, nodeInfo)
         for pod, err := range schedulableOnNode {
             if err == nil {
                 podsPassing = append(podsPassing, pod)
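This hunk hoists the checker construction out of computeCaches, so the grouping work is paid once and every node group only re-runs the cheap per-group check. A minimal sketch of the resulting calling pattern, assuming context, unschedulablePods and nodeInfos are in scope as in getPodsPredicatePassingCheckFunctions (the loop body is illustrative, not the function's actual code):

checker := newPodsSchedulableOnNodeChecker(context, unschedulablePods)
for nodeGroupId, nodeInfo := range nodeInfos {
    podsPassing := make([]*apiv1.Pod, 0)
    for pod, err := range checker.checkPodsSchedulableOnNode(nodeGroupId, nodeInfo) {
        if err == nil {
            // err == nil means the pod's whole equivalence group fits this node group.
            podsPassing = append(podsPassing, pod)
        }
    }
    klog.V(4).Infof("%d pods would fit node group %s", len(podsPassing), nodeGroupId)
}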


@@ -18,6 +18,7 @@ package core
 import (
     "fmt"
+    "k8s.io/apimachinery/pkg/types"
     "math/rand"
     "reflect"
     "time"
@@ -150,40 +151,89 @@ func filterOutExpendablePods(pods []*apiv1.Pod, expendablePodsPriorityCutoff int
     return result
 }
 
-// checkPodsSchedulableOnNode checks if pods can be scheduled on the given node.
-func checkPodsSchedulableOnNode(context *context.AutoscalingContext, pods []*apiv1.Pod, nodeGroupId string, nodeInfo *schedulernodeinfo.NodeInfo) map[*apiv1.Pod]*simulator.PredicateError {
-    schedulingErrors := map[*apiv1.Pod]*simulator.PredicateError{}
-    loggingQuota := glogx.PodsLoggingQuota()
-    podSchedulable := make(podSchedulableMap)
+type equivalenceGroupId int
+
+type podsSchedulableOnNodeChecker struct {
+    context               *context.AutoscalingContext
+    pods                  []*apiv1.Pod
+    podsEquivalenceGroups map[types.UID]equivalenceGroupId
+}
+
+func newPodsSchedulableOnNodeChecker(context *context.AutoscalingContext, pods []*apiv1.Pod) *podsSchedulableOnNodeChecker {
+    checker := podsSchedulableOnNodeChecker{
+        context:               context,
+        pods:                  pods,
+        podsEquivalenceGroups: make(map[types.UID]equivalenceGroupId),
+    }
+
+    // compute the podsEquivalenceGroups
+    var nextGroupId equivalenceGroupId
+    type equivalanceGroup struct {
+        id           equivalenceGroupId
+        representant *apiv1.Pod
+    }
+    equivalenceGroupsByController := make(map[types.UID][]equivalanceGroup)
 
     for _, pod := range pods {
-        // Check if pod isn't repeated before overwriting result for it.
-        if _, repeated := schedulingErrors[pod]; repeated {
-            // This shouldn't really happen.
-            klog.Warningf("Pod %v appears multiple time on pods list, will only count it once in scale-up simulation", pod)
+        controllerRef := drain.ControllerRef(pod)
+        if controllerRef == nil {
+            checker.podsEquivalenceGroups[pod.UID] = nextGroupId
+            nextGroupId++
             continue
         }
-        // Try to get result from cache.
-        err, found := podSchedulable.get(pod)
-        if found {
-            schedulingErrors[pod] = err
-            if err != nil {
+
+        matchingFound := false
+        for _, g := range equivalenceGroupsByController[controllerRef.UID] {
+            if reflect.DeepEqual(pod.Labels, g.representant.Labels) && apiequality.Semantic.DeepEqual(pod.Spec, g.representant.Spec) {
+                matchingFound = true
+                checker.podsEquivalenceGroups[pod.UID] = g.id
+                break
+            }
+        }
+        if !matchingFound {
+            newGroup := equivalanceGroup{
+                id:           nextGroupId,
+                representant: pod,
+            }
+            equivalenceGroupsByController[controllerRef.UID] = append(equivalenceGroupsByController[controllerRef.UID], newGroup)
+            checker.podsEquivalenceGroups[pod.UID] = newGroup.id
+            nextGroupId++
+        }
+    }
+    return &checker
+}
+
+// checkPodsSchedulableOnNode checks if pods can be scheduled on the given node.
+func (c *podsSchedulableOnNodeChecker) checkPodsSchedulableOnNode(nodeGroupId string, nodeInfo *schedulernodeinfo.NodeInfo) map[*apiv1.Pod]*simulator.PredicateError {
+    loggingQuota := glogx.PodsLoggingQuota()
+    schedulingErrors := make(map[equivalenceGroupId]*simulator.PredicateError)
+
+    for _, pod := range c.pods {
+        equivalenceGroup := c.podsEquivalenceGroups[pod.UID]
+        err, found := schedulingErrors[equivalenceGroup]
+        if found && err != nil {
             glogx.V(2).UpTo(loggingQuota).Infof("Pod %s can't be scheduled on %s. Used cached predicate check results", pod.Name, nodeGroupId)
-            }
         }
         // Not found in cache, have to run the predicates.
         if !found {
-            err = context.PredicateChecker.CheckPredicates(pod, nil, nodeInfo)
-            podSchedulable.set(pod, err)
-            schedulingErrors[pod] = err
+            err = c.context.PredicateChecker.CheckPredicates(pod, nil, nodeInfo)
+            schedulingErrors[equivalenceGroup] = err
             if err != nil {
                 // Always log for the first pod in a controller.
                 klog.V(2).Infof("Pod %s can't be scheduled on %s, predicate failed: %v", pod.Name, nodeGroupId, err.VerboseError())
             }
         }
     }
     glogx.V(2).Over(loggingQuota).Infof("%v other pods can't be scheduled on %s.", -loggingQuota.Left(), nodeGroupId)
-    return schedulingErrors
+
+    schedulingErrorsByPod := make(map[*apiv1.Pod]*simulator.PredicateError)
+    for _, pod := range c.pods {
+        schedulingErrorsByPod[pod] = schedulingErrors[c.podsEquivalenceGroups[pod.UID]]
+    }
+    return schedulingErrorsByPod
 }
 
 // getNodeInfosForGroups finds NodeInfos for all node groups used to manage the given nodes. It also returns a node group to sample node mapping.
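The new method keeps the old return shape: predicate errors are cached per equivalence group while iterating, then fanned out into a per-pod map at the end, so callers are unaffected. A simplified, self-contained sketch of that two-phase structure, with checkOnce standing in for PredicateChecker.CheckPredicates and groupOf for the precomputed podsEquivalenceGroups (an illustration, not the PR's code):

package sketch

import (
    apiv1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"
)

// checkPerGroup runs checkOnce at most once per equivalence group and then
// copies each group's result to every pod in that group.
func checkPerGroup(pods []*apiv1.Pod, groupOf map[types.UID]int, checkOnce func(*apiv1.Pod) error) map[*apiv1.Pod]error {
    resultByGroup := make(map[int]error)
    checked := make(map[int]bool)
    for _, pod := range pods {
        g := groupOf[pod.UID]
        if !checked[g] {
            resultByGroup[g] = checkOnce(pod) // one predicate run per group
            checked[g] = true
        }
    }
    resultByPod := make(map[*apiv1.Pod]error)
    for _, pod := range pods {
        resultByPod[pod] = resultByGroup[groupOf[pod.UID]]
    }
    return resultByPod
}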


@@ -193,13 +193,13 @@ func TestFilterSchedulablePodsForNode(t *testing.T) {
     }
     p1 := BuildTestPod("p1", 1500, 200000)
-    p2_1 := BuildTestPod("p2_2", 3000, 200000)
+    p2_1 := BuildTestPod("p2_1", 3000, 200000)
     p2_1.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
     p2_2 := BuildTestPod("p2_2", 3000, 200000)
     p2_2.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
-    p3_1 := BuildTestPod("p3", 100, 200000)
+    p3_1 := BuildTestPod("p3_1", 100, 200000)
     p3_1.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
-    p3_2 := BuildTestPod("p3", 100, 200000)
+    p3_2 := BuildTestPod("p3_2", 100, 200000)
     p3_2.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
     unschedulablePods := []*apiv1.Pod{p1, p2_1, p2_2, p3_1, p3_2}
@@ -212,7 +212,8 @@ func TestFilterSchedulablePodsForNode(t *testing.T) {
         PredicateChecker: simulator.NewTestPredicateChecker(),
     }
-    res := checkPodsSchedulableOnNode(context, unschedulablePods, "T1-abc", tni)
+    checker := newPodsSchedulableOnNodeChecker(context, unschedulablePods)
+    res := checker.checkPodsSchedulableOnNode("T1-abc", tni)
     wantedSchedulable := []*apiv1.Pod{p1, p3_1, p3_2}
     wantedUnschedulable := []*apiv1.Pod{p2_1, p2_2}


@@ -39,6 +39,7 @@ import (
 func BuildTestPod(name string, cpu int64, mem int64) *apiv1.Pod {
     pod := &apiv1.Pod{
         ObjectMeta: metav1.ObjectMeta{
+            UID:       types.UID(name),
             Namespace: "default",
             Name:      name,
             SelfLink:  fmt.Sprintf("/api/v1/namespaces/default/pods/%s", name),
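This last hunk gives every pod built by BuildTestPod a UID derived from the pod name. The new equivalence-group map is keyed by pod UID, so test pods need distinct UIDs to be tracked as separate entries, which is presumably also why the duplicated pod names in TestFilterSchedulablePodsForNode were made unique above. A tiny illustration under that assumption (fragment, not part of the diff):

p3_1 := BuildTestPod("p3_1", 100, 200000)
p3_2 := BuildTestPod("p3_2", 100, 200000)
// Distinct names now mean distinct UIDs, so the checker keeps separate
// entries for the two pods even though their specs are identical.
fmt.Println(p3_1.UID == p3_2.UID) // false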