Cluster-autoscaler: pod relocation hints
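Summary of the change (as reflected in the diff below): the scale-down simulation now remembers where each evicted pod could be rescheduled. main() keeps a podLocationHints map (pod "namespace/name" -> node name) across loop iterations and passes it into FindUnneededNodes and ScaleDown; the simulator's FindNodesToRemove and findPlaceFor accept the previous hints, try the hinted node first when looking for a new place for a pod from a drained node, and return an updated hint map. ScaleDown still discards the recomputed hints (marked with a TODO). The tests for FindUnneededNodes and findPlaceFor are extended to cover the new arguments and the returned hints.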
commit 6c74123879 (parent 09c12be106)
@@ -109,6 +109,7 @@ func main() {
 	lastScaleUpTime := time.Now()
 	lastScaleDownFailedTrial := time.Now()
 	unneededNodes := make(map[string]time.Time)
+	podLocationHints := make(map[string]string)
 
 	eventBroadcaster := kube_record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
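For orientation, a minimal, self-contained sketch of how the new map is threaded through the control loop. The lowercase findUnneededNodes/scaleDown stand-ins and their signatures are hypothetical simplifications, not the real functions, which take many more arguments:

package main

import "fmt"

// findUnneededNodes is a hypothetical stand-in: it receives the hints learned in
// the previous iteration and returns an updated map alongside its main result.
func findUnneededNodes(oldHints map[string]string) (unneeded []string, newHints map[string]string) {
	newHints = make(map[string]string, len(oldHints))
	newHints["default/web-1"] = "node-2" // a placement the drain simulation found
	return []string{"node-1"}, newHints
}

// scaleDown is a hypothetical stand-in: it only consumes the hints.
func scaleDown(hints map[string]string) {
	fmt.Println("scale-down sees hints:", hints)
}

func main() {
	// Keyed by the pod's "namespace/name", valued with a node name; the map
	// survives across iterations of the autoscaler loop.
	podLocationHints := make(map[string]string)

	for i := 0; i < 2; i++ {
		var unneeded []string
		unneeded, podLocationHints = findUnneededNodes(podLocationHints)
		_ = unneeded
		scaleDown(podLocationHints)
	}
}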
@@ -241,12 +242,13 @@ func main() {
 				updateLastTime("findUnneeded")
 				glog.V(4).Infof("Calculating unneded nodes")
 
-				unneededNodes = FindUnneededNodes(
+				unneededNodes, podLocationHints = FindUnneededNodes(
 					nodes,
 					unneededNodes,
 					*scaleDownUtilizationThreshold,
 					allScheduled,
-					predicateChecker)
+					predicateChecker,
+					podLocationHints)
 
 				updateDuration("findUnneeded", unneededStart)
 

@@ -267,7 +269,10 @@ func main() {
 						unneededNodes,
 						*scaleDownUnneededTime,
 						allScheduled,
-						cloudProvider, kubeClient, predicateChecker)
+						cloudProvider,
+						kubeClient,
+						predicateChecker,
+						podLocationHints)
 
 					updateDuration("scaledown", scaleDownStart)
 
@@ -44,12 +44,13 @@ const (
 )
 
 // FindUnneededNodes calculates which nodes are not needed, i.e. all pods can be scheduled somewhere else,
-// and updates unneededNodes map accordingly.
+// and updates unneededNodes map accordingly. It also returns information where pods can be rescheduld.
 func FindUnneededNodes(nodes []*kube_api.Node,
 	unneededNodes map[string]time.Time,
 	utilizationThreshold float64,
 	pods []*kube_api.Pod,
-	predicateChecker *simulator.PredicateChecker) map[string]time.Time {
+	predicateChecker *simulator.PredicateChecker,
+	oldHints map[string]string) (unnededTimeMap map[string]time.Time, podReschedulingHints map[string]string) {
 
 	currentlyUnneededNodes := make([]*kube_api.Node, 0)
 	nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods)

@@ -76,12 +77,12 @@ func FindUnneededNodes(nodes []*kube_api.Node,
 	}
 
 	// Phase2 - check which nodes can be probably removed using fast drain.
-	nodesToRemove, err := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods,
+	nodesToRemove, newHints, err := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods,
 		nil, predicateChecker,
-		len(currentlyUnneededNodes), true)
+		len(currentlyUnneededNodes), true, oldHints)
 	if err != nil {
 		glog.Errorf("Error while simulating node drains: %v", err)
-		return map[string]time.Time{}
+		return map[string]time.Time{}, oldHints
 	}
 
 	// Update the timestamp map.
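One detail worth calling out in the hunk above: when the drain simulation fails, FindUnneededNodes returns the oldHints it was given rather than an empty map, so placement knowledge learned in earlier iterations survives a failed run. A small sketch of that pattern (recomputeHints and its signature are hypothetical, not part of the real code):

package main

import (
	"errors"
	"fmt"
)

// recomputeHints stands in for FindUnneededNodes' use of FindNodesToRemove:
// on failure it hands back the caller's old hints unchanged.
func recomputeHints(oldHints map[string]string, simulate func() (map[string]string, error)) map[string]string {
	newHints, err := simulate()
	if err != nil {
		fmt.Println("simulation failed, keeping previous hints:", err)
		return oldHints
	}
	return newHints
}

func main() {
	old := map[string]string{"default/web-1": "node-2"}
	kept := recomputeHints(old, func() (map[string]string, error) {
		return nil, errors.New("drain simulation error")
	})
	fmt.Println(kept["default/web-1"]) // node-2: the earlier hint survives
}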
@@ -95,7 +96,7 @@ func FindUnneededNodes(nodes []*kube_api.Node,
 			result[name] = val
 		}
 	}
-	return result
+	return result, newHints
 }
 
 // ScaleDown tries to scale down the cluster. It returns ScaleDownResult indicating if any node was

@@ -107,7 +108,8 @@ func ScaleDown(
 	pods []*kube_api.Pod,
 	cloudProvider cloudprovider.CloudProvider,
 	client *kube_client.Client,
-	predicateChecker *simulator.PredicateChecker) (ScaleDownResult, error) {
+	predicateChecker *simulator.PredicateChecker,
+	oldHints map[string]string) (ScaleDownResult, error) {
 
 	now := time.Now()
 	candidates := make([]*kube_api.Node, 0)

@@ -150,7 +152,9 @@ func ScaleDown(
 		return ScaleDownNoUnneeded, nil
 	}
 
-	nodesToRemove, err := simulator.FindNodesToRemove(candidates, nodes, pods, client, predicateChecker, 1, false)
+	// TODO: use new hints
+	nodesToRemove, _, err := simulator.FindNodesToRemove(candidates, nodes, pods, client, predicateChecker, 1, false,
+		oldHints)
 	if err != nil {
 		return ScaleDownError, fmt.Errorf("Find node to remove failed: %v", err)
 	}
@@ -55,16 +55,17 @@ func TestFindUnneededNodes(t *testing.T) {
 	n3 := BuildTestNode("n3", 1000, 10)
 	n4 := BuildTestNode("n4", 10000, 10)
 
-	result := FindUnneededNodes([]*kube_api.Node{n1, n2, n3, n4}, map[string]time.Time{}, 0.35,
-		[]*kube_api.Pod{p1, p2, p3, p4}, simulator.NewTestPredicateChecker())
+	result, hints := FindUnneededNodes([]*kube_api.Node{n1, n2, n3, n4}, map[string]time.Time{}, 0.35,
+		[]*kube_api.Pod{p1, p2, p3, p4}, simulator.NewTestPredicateChecker(), make(map[string]string))
 
 	assert.Equal(t, 1, len(result))
 	addTime, found := result["n2"]
 	assert.True(t, found)
+	assert.Contains(t, hints, p2.Namespace+"/"+p2.Name)
 
 	result["n1"] = time.Now()
-	result2 := FindUnneededNodes([]*kube_api.Node{n1, n2, n3, n4}, result, 0.35,
-		[]*kube_api.Pod{p1, p2, p3, p4}, simulator.NewTestPredicateChecker())
+	result2, hints := FindUnneededNodes([]*kube_api.Node{n1, n2, n3, n4}, result, 0.35,
+		[]*kube_api.Pod{p1, p2, p3, p4}, simulator.NewTestPredicateChecker(), hints)
 
 	assert.Equal(t, 1, len(result2))
 	addTime2, found := result2["n2"]

@@ -38,10 +38,11 @@ var (
 		"If true cluster autoscaler will never delete nodes with pods with local storage, e.g. EmptyDir or HostPath")
 )
 
-// FindNodesToRemove finds nodes that can be removed.
+// FindNodesToRemove finds nodes that can be removed. Returns also an information about good
+// rescheduling location for each of the pods.
 func FindNodesToRemove(candidates []*kube_api.Node, allNodes []*kube_api.Node, pods []*kube_api.Pod,
 	client *kube_client.Client, predicateChecker *PredicateChecker, maxCount int,
-	fastCheck bool) ([]*kube_api.Node, error) {
+	fastCheck bool, oldHints map[string]string) (nodesToRemove []*kube_api.Node, podReschedulingHints map[string]string, finalError error) {
 
 	nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods)
 	for _, node := range allNodes {
@@ -55,6 +56,7 @@ func FindNodesToRemove(candidates []*kube_api.Node, allNodes []*kube_api.Node, p
 	if fastCheck {
 		evaluationType = "Fast evaluation"
 	}
+	newHints := make(map[string]string, len(oldHints))
 
 candidateloop:
 	for _, node := range candidates {

@@ -87,7 +89,7 @@ candidateloop:
 				podsToRemove = append(podsToRemove, &drainResult[i])
 			}
 		}
-		findProblems := findPlaceFor(node.Name, podsToRemove, allNodes, nodeNameToNodeInfo, predicateChecker)
+		findProblems := findPlaceFor(node.Name, podsToRemove, allNodes, nodeNameToNodeInfo, predicateChecker, oldHints, newHints)
 		if findProblems == nil {
 			result = append(result, node)
 			glog.V(2).Infof("%s: node %s may be removed", evaluationType, node.Name)

@@ -98,7 +100,7 @@ candidateloop:
 			glog.V(2).Infof("%s: node %s is not suitable for removal %v", evaluationType, node.Name, err)
 		}
 	}
-	return result, nil
+	return result, newHints, nil
 }
 
 // CalculateUtilization calculates utilization of a node, defined as total amount of requested resources divided by capacity.
@@ -135,10 +137,37 @@ func calculateUtilizationOfResource(node *kube_api.Node, nodeInfo *schedulercach
 
 // TODO: We don't need to pass list of nodes here as they are already available in nodeInfos.
 func findPlaceFor(bannedNode string, pods []*kube_api.Pod, nodes []*kube_api.Node, nodeInfos map[string]*schedulercache.NodeInfo,
-	predicateChecker *PredicateChecker) error {
+	predicateChecker *PredicateChecker, oldHints map[string]string, newHints map[string]string) error {
 
 	newNodeInfos := make(map[string]*schedulercache.NodeInfo)
 
+	podKey := func(pod *kube_api.Pod) string {
+		return fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
+	}
+
+	tryNodeForPod := func(nodename string, pod *kube_api.Pod) bool {
+		nodeInfo, found := newNodeInfos[nodename]
+		if !found {
+			nodeInfo, found = nodeInfos[nodename]
+		}
+		if found {
+			nodeInfo.Node().Status.Allocatable = nodeInfo.Node().Status.Capacity
+			err := predicateChecker.CheckPredicates(pod, nodeInfo)
+			glog.V(4).Infof("Evaluation %s for %s/%s -> %v", nodename, pod.Namespace, pod.Name, err)
+			if err == nil {
+				// TODO(mwielgus): Optimize it.
+				podsOnNode := nodeInfo.Pods()
+				podsOnNode = append(podsOnNode, pod)
+				newNodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
+				newNodeInfo.SetNode(nodeInfo.Node())
+				newNodeInfos[nodename] = newNodeInfo
+				newHints[podKey(pod)] = nodename
+				return true
+			}
+		}
+		return false
+	}
+
 	for _, podptr := range pods {
 		newpod := *podptr
 		newpod.Spec.NodeName = ""
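The tryNodeForPod closure added above looks nodes up in newNodeInfos before nodeInfos: newNodeInfos acts as an overlay holding copies of nodes that already received simulated pods during this run, so later placements see earlier ones without mutating the shared snapshot. A simplified, self-contained sketch of that overlay idea (the nodeInfo struct and the pod-count capacity check are invented for illustration; the real code runs scheduler predicates):

package main

import "fmt"

// nodeInfo is an invented stand-in for schedulercache.NodeInfo: a node name,
// the pods simulated onto it, and a pod-count capacity instead of real predicates.
type nodeInfo struct {
	name     string
	pods     []string
	capacity int
}

// tryNodeForPod mirrors the closure added in this commit: consult the overlay of
// already-modified nodes first, fall back to the original snapshot, and on success
// store an updated copy in the overlay and record a rescheduling hint.
func tryNodeForPod(node, pod string, base, overlay map[string]*nodeInfo, newHints map[string]string) bool {
	ni, ok := overlay[node]
	if !ok {
		ni, ok = base[node]
	}
	if !ok || len(ni.pods) >= ni.capacity {
		return false
	}
	updated := &nodeInfo{
		name:     ni.name,
		pods:     append(append([]string{}, ni.pods...), pod),
		capacity: ni.capacity,
	}
	overlay[node] = updated
	newHints[pod] = node
	return true
}

func main() {
	base := map[string]*nodeInfo{"n1": {name: "n1", capacity: 1}}
	overlay := map[string]*nodeInfo{}
	hints := map[string]string{}

	fmt.Println(tryNodeForPod("n1", "default/a", base, overlay, hints)) // true
	fmt.Println(tryNodeForPod("n1", "default/b", base, overlay, hints)) // false: overlay shows n1 is full
	fmt.Println(hints)                                                  // map[default/a:n1]
}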
@@ -146,38 +175,27 @@ func findPlaceFor(bannedNode string, pods []*kube_api.Pod, nodes []*kube_api.Nod
 
 		foundPlace := false
 		glog.V(4).Infof("Looking for place for %s/%s", pod.Namespace, pod.Name)
-		podKey := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
 
-		// TODO: Sort nodes by utilization
-	nodeloop:
-		for _, node := range nodes {
-			if node.Name == bannedNode {
-				continue
-			}
-
-			node.Status.Allocatable = node.Status.Capacity
-			nodeInfo, found := newNodeInfos[node.Name]
-			if !found {
-				nodeInfo, found = nodeInfos[node.Name]
-			}
-
-			if found {
-				err := predicateChecker.CheckPredicates(pod, nodeInfo)
-				glog.V(4).Infof("Evaluation %s for %s -> %v", node.Name, podKey, err)
-				if err == nil {
-					foundPlace = true
-					// TODO(mwielgus): Optimize it.
-					podsOnNode := nodeInfo.Pods()
-					podsOnNode = append(podsOnNode, pod)
-					newNodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
-					newNodeInfo.SetNode(node)
-					newNodeInfos[node.Name] = newNodeInfo
-					break nodeloop
-				}
+		hintedNode, hasHint := oldHints[podKey(pod)]
+		if hasHint {
+			if hintedNode != bannedNode && tryNodeForPod(hintedNode, pod) {
+				foundPlace = true
 			}
 		}
 		if !foundPlace {
-			return fmt.Errorf("failed to find place for %s", podKey)
+			// TODO: Sort nodes by utilization
+			for _, node := range nodes {
+				if node.Name == bannedNode {
+					continue
+				}
+				if tryNodeForPod(node.Name, pod) {
+					foundPlace = true
+					break
+				}
+			}
+			if !foundPlace {
+				return fmt.Errorf("failed to find place for %s", podKey)
+			}
 		}
 	}
 	return nil
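The rewritten placement loop above is the heart of the change: try the node remembered in oldHints first (unless it is the node being drained), and only fall back to scanning every node when the hint is missing or no longer fits. A self-contained sketch of that control flow with invented types (string node names and a fits callback instead of the scheduler predicates):

package main

import "fmt"

// findPlaceForSketch is a simplified version of the rewritten placement loop:
// try the hinted node first, then scan the remaining nodes, recording every
// successful placement in newHints.
func findPlaceForSketch(bannedNode string, pods []string, nodes []string,
	oldHints map[string]string, fits func(pod, node string) bool) (map[string]string, error) {

	newHints := make(map[string]string, len(oldHints))
	for _, pod := range pods {
		foundPlace := false

		if hinted, ok := oldHints[pod]; ok && hinted != bannedNode && fits(pod, hinted) {
			newHints[pod] = hinted
			foundPlace = true
		}
		if !foundPlace {
			for _, node := range nodes {
				if node == bannedNode {
					continue
				}
				if fits(pod, node) {
					newHints[pod] = node
					foundPlace = true
					break
				}
			}
			if !foundPlace {
				return newHints, fmt.Errorf("failed to find place for %s", pod)
			}
		}
	}
	return newHints, nil
}

func main() {
	fits := func(pod, node string) bool { return node != "full-node" }
	hints, err := findPlaceForSketch("drained-node",
		[]string{"default/web-1"}, []string{"full-node", "drained-node", "node-2"},
		map[string]string{"default/web-1": "node-2"}, fits)
	fmt.Println(hints, err) // map[default/web-1:node-2] <nil>: the hint was reused
}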
@@ -58,11 +58,19 @@ func TestFindPlaceAllOk(t *testing.T) {
 	nodeInfos["n1"].SetNode(node1)
 	nodeInfos["n2"].SetNode(node2)
 
+	oldHints := make(map[string]string)
+	newHints := make(map[string]string)
+
 	err := findPlaceFor(
 		"x",
 		[]*kube_api.Pod{new1, new2},
 		[]*kube_api.Node{node1, node2},
-		nodeInfos, NewTestPredicateChecker())
+		nodeInfos, NewTestPredicateChecker(),
+		oldHints, newHints)
 
+	assert.Len(t, newHints, 2)
+	assert.Contains(t, newHints, new1.Namespace+"/"+new1.Name)
+	assert.Contains(t, newHints, new2.Namespace+"/"+new2.Name)
 	assert.NoError(t, err)
 }
 

@@ -73,20 +81,31 @@ func TestFindPlaceAllBas(t *testing.T) {
 	new3 := BuildTestPod("p4", 700, 500000)
 
 	nodeInfos := map[string]*schedulercache.NodeInfo{
-		"n1": schedulercache.NewNodeInfo(pod1),
-		"n2": schedulercache.NewNodeInfo(),
+		"n1":   schedulercache.NewNodeInfo(pod1),
+		"n2":   schedulercache.NewNodeInfo(),
+		"nbad": schedulercache.NewNodeInfo(),
 	}
+	nodebad := BuildTestNode("nbad", 1000, 2000000)
 	node1 := BuildTestNode("n1", 1000, 2000000)
 	node2 := BuildTestNode("n2", 1000, 2000000)
 	nodeInfos["n1"].SetNode(node1)
 	nodeInfos["n2"].SetNode(node2)
+	nodeInfos["nbad"].SetNode(nodebad)
+
+	oldHints := make(map[string]string)
+	newHints := make(map[string]string)
 
 	err := findPlaceFor(
-		"x",
+		"nbad",
 		[]*kube_api.Pod{new1, new2, new3},
-		[]*kube_api.Node{node1, node2},
-		nodeInfos, NewTestPredicateChecker())
+		[]*kube_api.Node{nodebad, node1, node2},
+		nodeInfos, NewTestPredicateChecker(),
+		oldHints, newHints)
 
 	assert.Error(t, err)
+	assert.True(t, len(newHints) == 2)
+	assert.Contains(t, newHints, new1.Namespace+"/"+new1.Name)
+	assert.Contains(t, newHints, new2.Namespace+"/"+new2.Name)
 }
 
 func TestFindNone(t *testing.T) {

@@ -105,6 +124,8 @@ func TestFindNone(t *testing.T) {
 		"x",
 		[]*kube_api.Pod{},
 		[]*kube_api.Node{node1, node2},
-		nodeInfos, NewTestPredicateChecker())
+		nodeInfos, NewTestPredicateChecker(),
+		make(map[string]string),
+		make(map[string]string))
 	assert.NoError(t, err)
 }