Merge pull request #2031 from krzysztof-jastrzebski/master

Add functionality which delays node deletion to let other components prepare for deletion.
This commit is contained in:
Kubernetes Prow Robot 2019-05-20 00:57:13 -07:00 committed by GitHub
commit cb4e60f8d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 288 additions and 112 deletions

View File

@ -103,6 +103,8 @@ type AutoscalingOptions struct {
// The formula to calculate additional candidates number is following: // The formula to calculate additional candidates number is following:
// max(#nodes * ScaleDownCandidatesPoolRatio, ScaleDownCandidatesPoolMinCount) // max(#nodes * ScaleDownCandidatesPoolRatio, ScaleDownCandidatesPoolMinCount)
ScaleDownCandidatesPoolMinCount int ScaleDownCandidatesPoolMinCount int
// NodeDeletionDelayTimeout is maximum time CA waits for removing delay-deletion.cluster-autoscaler.kubernetes.io/ annotations before deleting the node.
NodeDeletionDelayTimeout time.Duration
// WriteStatusConfigMap tells if the status information should be written to a ConfigMap // WriteStatusConfigMap tells if the status information should be written to a ConfigMap
WriteStatusConfigMap bool WriteStatusConfigMap bool
// BalanceSimilarNodeGroups enables logic that identifies node groups with similar machines and tries to balance node count between them. // BalanceSimilarNodeGroups enables logic that identifies node groups with similar machines and tries to balance node count between them.

View File

@ -31,6 +31,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
@ -42,6 +43,7 @@ import (
kube_record "k8s.io/client-go/tools/record" kube_record "k8s.io/client-go/tools/record"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/autoscaler/cluster-autoscaler/processors/status" "k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/klog" "k8s.io/klog"
@ -50,6 +52,9 @@ import (
const ( const (
// ScaleDownDisabledKey is the name of annotation marking node as not eligible for scale down. // ScaleDownDisabledKey is the name of annotation marking node as not eligible for scale down.
ScaleDownDisabledKey = "cluster-autoscaler.kubernetes.io/scale-down-disabled" ScaleDownDisabledKey = "cluster-autoscaler.kubernetes.io/scale-down-disabled"
// DelayDeletionAnnotationPrefix is the prefix of annotation marking node as it needs to wait
// for other K8s components before deleting node.
DelayDeletionAnnotationPrefix = "delay-deletion.cluster-autoscaler.kubernetes.io/"
) )
const ( const (
@ -66,42 +71,78 @@ const (
PodEvictionHeadroom = 30 * time.Second PodEvictionHeadroom = 30 * time.Second
) )
// NodeDeleteStatus tells whether a node is being deleted right now. // NodeDeletionTracker keeps track of node deletions.
type NodeDeleteStatus struct { type NodeDeletionTracker struct {
sync.Mutex sync.Mutex
deleteInProgress bool nonEmptyNodeDeleteInProgress bool
// A map of node delete results by node name. It's being constantly emptied into ScaleDownStatus // A map of node delete results by node name. It's being constantly emptied into ScaleDownStatus
// objects in order to notify the ScaleDownStatusProcessor that the node drain has ended or that // objects in order to notify the ScaleDownStatusProcessor that the node drain has ended or that
// an error occurred during the deletion process. // an error occurred during the deletion process.
nodeDeleteResults map[string]status.NodeDeleteResult nodeDeleteResults map[string]status.NodeDeleteResult
// A map which keeps track of deletions in progress for nodepools.
// Key is a node group id and value is a number of node deletions in progress.
deletionsInProgress map[string]int
} }
// Get current time. Proxy for unit tests. // Get current time. Proxy for unit tests.
var now func() time.Time = time.Now var now func() time.Time = time.Now
// IsDeleteInProgress returns true if a node is being deleted. // NewNodeDeletionTracker creates new NodeDeletionTracker.
func (n *NodeDeleteStatus) IsDeleteInProgress() bool { func NewNodeDeletionTracker() *NodeDeletionTracker {
n.Lock() return &NodeDeletionTracker{
defer n.Unlock() nodeDeleteResults: make(map[string]status.NodeDeleteResult),
return n.deleteInProgress deletionsInProgress: make(map[string]int),
}
} }
// SetDeleteInProgress sets deletion process status // IsNonEmptyNodeDeleteInProgress returns true if a non empty node is being deleted.
func (n *NodeDeleteStatus) SetDeleteInProgress(status bool) { func (n *NodeDeletionTracker) IsNonEmptyNodeDeleteInProgress() bool {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
n.deleteInProgress = status return n.nonEmptyNodeDeleteInProgress
}
// SetNonEmptyNodeDeleteInProgress sets non empty node deletion in progress status.
func (n *NodeDeletionTracker) SetNonEmptyNodeDeleteInProgress(status bool) {
n.Lock()
defer n.Unlock()
n.nonEmptyNodeDeleteInProgress = status
}
// StartDeletion increments node deletion in progress counter for the given nodegroup.
func (n *NodeDeletionTracker) StartDeletion(nodeGroupId string) {
n.Lock()
defer n.Unlock()
n.deletionsInProgress[nodeGroupId]++
}
// EndDeletion decrements node deletion in progress counter for the given nodegroup.
func (n *NodeDeletionTracker) EndDeletion(nodeGroupId string) {
n.Lock()
defer n.Unlock()
n.deletionsInProgress[nodeGroupId]--
if n.deletionsInProgress[nodeGroupId] < 0 {
delete(n.deletionsInProgress, nodeGroupId)
klog.Errorf("This should never happen, counter for %s in DelayedNodeDeletionStatus is below 0", nodeGroupId)
}
}
// GetDeletionsInProgress returns the number of deletions in progress for the given node group.
func (n *NodeDeletionTracker) GetDeletionsInProgress(nodeGroupId string) int {
n.Lock()
defer n.Unlock()
return n.deletionsInProgress[nodeGroupId]
} }
// AddNodeDeleteResult adds a node delete result to the result map. // AddNodeDeleteResult adds a node delete result to the result map.
func (n *NodeDeleteStatus) AddNodeDeleteResult(nodeName string, result status.NodeDeleteResult) { func (n *NodeDeletionTracker) AddNodeDeleteResult(nodeName string, result status.NodeDeleteResult) {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
n.nodeDeleteResults[nodeName] = result n.nodeDeleteResults[nodeName] = result
} }
// GetAndClearNodeDeleteResults returns the whole result map and replaces it with a new empty one. // GetAndClearNodeDeleteResults returns the whole result map and replaces it with a new empty one.
func (n *NodeDeleteStatus) GetAndClearNodeDeleteResults() map[string]status.NodeDeleteResult { func (n *NodeDeletionTracker) GetAndClearNodeDeleteResults() map[string]status.NodeDeleteResult {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
results := n.nodeDeleteResults results := n.nodeDeleteResults
@ -311,7 +352,7 @@ type ScaleDown struct {
podLocationHints map[string]string podLocationHints map[string]string
nodeUtilizationMap map[string]simulator.UtilizationInfo nodeUtilizationMap map[string]simulator.UtilizationInfo
usageTracker *simulator.UsageTracker usageTracker *simulator.UsageTracker
nodeDeleteStatus *NodeDeleteStatus nodeDeletionTracker *NodeDeletionTracker
} }
// NewScaleDown builds new ScaleDown object. // NewScaleDown builds new ScaleDown object.
@ -325,7 +366,7 @@ func NewScaleDown(context *context.AutoscalingContext, clusterStateRegistry *clu
nodeUtilizationMap: make(map[string]simulator.UtilizationInfo), nodeUtilizationMap: make(map[string]simulator.UtilizationInfo),
usageTracker: simulator.NewUsageTracker(), usageTracker: simulator.NewUsageTracker(),
unneededNodesList: make([]*apiv1.Node, 0), unneededNodesList: make([]*apiv1.Node, 0),
nodeDeleteStatus: &NodeDeleteStatus{nodeDeleteResults: make(map[string]status.NodeDeleteResult)}, nodeDeletionTracker: NewNodeDeletionTracker(),
} }
} }
@ -419,7 +460,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
emptyNodes := make(map[string]bool) emptyNodes := make(map[string]bool)
emptyNodesList := getEmptyNodesNoResourceLimits(currentlyUnneededNodes, pods, len(currentlyUnneededNodes), sd.context.CloudProvider) emptyNodesList := sd.getEmptyNodesNoResourceLimits(currentlyUnneededNodes, pods, len(currentlyUnneededNodes))
for _, node := range emptyNodesList { for _, node := range emptyNodesList {
emptyNodes[node.Name] = true emptyNodes[node.Name] = true
} }
@ -636,7 +677,7 @@ func (sd *ScaleDown) SoftTaintUnneededNodes(allNodes []*apiv1.Node) (errors []er
// TryToScaleDown tries to scale down the cluster. It returns a result inside a ScaleDownStatus indicating if any node was // TryToScaleDown tries to scale down the cluster. It returns a result inside a ScaleDownStatus indicating if any node was
// removed and error if such occurred. // removed and error if such occurred.
func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget, currentTime time.Time) (*status.ScaleDownStatus, errors.AutoscalerError) { func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget, currentTime time.Time) (*status.ScaleDownStatus, errors.AutoscalerError) {
scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: sd.nodeDeleteStatus.GetAndClearNodeDeleteResults()} scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: sd.nodeDeletionTracker.GetAndClearNodeDeleteResults()}
nodeDeletionDuration := time.Duration(0) nodeDeletionDuration := time.Duration(0)
findNodesToRemoveDuration := time.Duration(0) findNodesToRemoveDuration := time.Duration(0)
defer updateScaleDownMetrics(time.Now(), &findNodesToRemoveDuration, &nodeDeletionDuration) defer updateScaleDownMetrics(time.Now(), &findNodesToRemoveDuration, &nodeDeletionDuration)
@ -697,7 +738,7 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
continue continue
} }
if size <= nodeGroup.MinSize() { if size-sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id()) <= nodeGroup.MinSize() {
klog.V(1).Infof("Skipping %s - node group min size reached", node.Name) klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
continue continue
} }
@ -727,27 +768,24 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
// Trying to delete empty nodes in bulk. If there are no empty nodes then CA will // Trying to delete empty nodes in bulk. If there are no empty nodes then CA will
// try to delete not-so-empty nodes, possibly killing some pods and allowing them // try to delete not-so-empty nodes, possibly killing some pods and allowing them
// to recreate on other nodes. // to recreate on other nodes.
emptyNodes := getEmptyNodes(candidates, pods, sd.context.MaxEmptyBulkDelete, scaleDownResourcesLeft, sd.context.CloudProvider) emptyNodes := sd.getEmptyNodes(candidates, pods, sd.context.MaxEmptyBulkDelete, scaleDownResourcesLeft)
if len(emptyNodes) > 0 { if len(emptyNodes) > 0 {
nodeDeletionStart := time.Now() nodeDeletionStart := time.Now()
confirmation := make(chan nodeDeletionConfirmation, len(emptyNodes)) deletedNodes, err := sd.scheduleDeleteEmptyNodes(emptyNodes, sd.context.ClientSet, sd.context.Recorder, readinessMap, candidateNodeGroups)
sd.scheduleDeleteEmptyNodes(emptyNodes, sd.context.CloudProvider, sd.context.ClientSet, sd.context.Recorder, readinessMap, candidateNodeGroups, confirmation)
deletedNodes, err := sd.waitForEmptyNodesDeleted(emptyNodes, confirmation)
nodeDeletionDuration = time.Now().Sub(nodeDeletionStart) nodeDeletionDuration = time.Now().Sub(nodeDeletionStart)
// TODO: Give the processor some information about the nodes that failed to be deleted. // TODO: Give the processor some information about the nodes that failed to be deleted.
scaleDownStatus.ScaledDownNodes = sd.mapNodesToStatusScaleDownNodes(deletedNodes, candidateNodeGroups, make(map[string][]*apiv1.Pod)) scaleDownStatus.ScaledDownNodes = sd.mapNodesToStatusScaleDownNodes(deletedNodes, candidateNodeGroups, make(map[string][]*apiv1.Pod))
if len(deletedNodes) > 0 { if len(deletedNodes) > 0 {
scaleDownStatus.Result = status.ScaleDownNodeDeleted scaleDownStatus.Result = status.ScaleDownNodeDeleteStarted
} else { } else {
scaleDownStatus.Result = status.ScaleDownError scaleDownStatus.Result = status.ScaleDownError
} }
if err != nil {
if err == nil {
return scaleDownStatus, nil
}
return scaleDownStatus, err.AddPrefix("failed to delete at least one empty node: ") return scaleDownStatus, err.AddPrefix("failed to delete at least one empty node: ")
} }
return scaleDownStatus, nil
}
findNodesToRemoveStart := time.Now() findNodesToRemoveStart := time.Now()
// Only scheduled non expendable pods are taken into account and have to be moved. // Only scheduled non expendable pods are taken into account and have to be moved.
@ -784,19 +822,24 @@ func (sd *ScaleDown) TryToScaleDown(allNodes []*apiv1.Node, pods []*apiv1.Pod, p
// Starting deletion. // Starting deletion.
nodeDeletionDuration = time.Now().Sub(nodeDeletionStart) nodeDeletionDuration = time.Now().Sub(nodeDeletionStart)
sd.nodeDeleteStatus.SetDeleteInProgress(true) sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(true)
go func() { go func() {
// Finishing the delete process once this goroutine is over. // Finishing the delete process once this goroutine is over.
var result status.NodeDeleteResult var result status.NodeDeleteResult
defer func() { sd.nodeDeleteStatus.AddNodeDeleteResult(toRemove.Node.Name, result) }() defer func() { sd.nodeDeletionTracker.AddNodeDeleteResult(toRemove.Node.Name, result) }()
defer sd.nodeDeleteStatus.SetDeleteInProgress(false) defer sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(false)
result = sd.deleteNode(toRemove.Node, toRemove.PodsToReschedule) nodeGroup, found := candidateNodeGroups[toRemove.Node.Name]
if !found {
result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: errors.NewAutoscalerError(
errors.CloudProviderError, "failed to find node group for %s", toRemove.Node.Name)}
return
}
result = sd.deleteNode(toRemove.Node, toRemove.PodsToReschedule, nodeGroup)
if result.ResultType != status.NodeDeleteOk { if result.ResultType != status.NodeDeleteOk {
klog.Errorf("Failed to delete %s: %v", toRemove.Node.Name, result.Err) klog.Errorf("Failed to delete %s: %v", toRemove.Node.Name, result.Err)
return return
} }
nodeGroup := candidateNodeGroups[toRemove.Node.Name]
if readinessMap[toRemove.Node.Name] { if readinessMap[toRemove.Node.Name] {
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, toRemove.Node, nodeGroup), metrics.Underutilized) metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, toRemove.Node, nodeGroup), metrics.Underutilized)
} else { } else {
@ -819,24 +862,22 @@ func updateScaleDownMetrics(scaleDownStart time.Time, findNodesToRemoveDuration
metrics.UpdateDuration(metrics.ScaleDownMiscOperations, miscDuration) metrics.UpdateDuration(metrics.ScaleDownMiscOperations, miscDuration)
} }
func getEmptyNodesNoResourceLimits(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDelete int, func (sd *ScaleDown) getEmptyNodesNoResourceLimits(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDelete int) []*apiv1.Node {
cloudProvider cloudprovider.CloudProvider) []*apiv1.Node { return sd.getEmptyNodes(candidates, pods, maxEmptyBulkDelete, noScaleDownLimitsOnResources())
return getEmptyNodes(candidates, pods, maxEmptyBulkDelete, noScaleDownLimitsOnResources(), cloudProvider)
} }
// This functions finds empty nodes among passed candidates and returns a list of empty nodes // This functions finds empty nodes among passed candidates and returns a list of empty nodes
// that can be deleted at the same time. // that can be deleted at the same time.
func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDelete int, func (sd *ScaleDown) getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDelete int,
resourcesLimits scaleDownResourcesLimits, cloudProvider cloudprovider.CloudProvider) []*apiv1.Node { resourcesLimits scaleDownResourcesLimits) []*apiv1.Node {
emptyNodes := simulator.FindEmptyNodesToRemove(candidates, pods) emptyNodes := simulator.FindEmptyNodesToRemove(candidates, pods)
availabilityMap := make(map[string]int) availabilityMap := make(map[string]int)
result := make([]*apiv1.Node, 0) result := make([]*apiv1.Node, 0)
resourcesLimitsCopy := copyScaleDownResourcesLimits(resourcesLimits) // we do not want to modify input parameter resourcesLimitsCopy := copyScaleDownResourcesLimits(resourcesLimits) // we do not want to modify input parameter
resourcesNames := sets.StringKeySet(resourcesLimits).List() resourcesNames := sets.StringKeySet(resourcesLimits).List()
for _, node := range emptyNodes { for _, node := range emptyNodes {
nodeGroup, err := cloudProvider.NodeGroupForNode(node) nodeGroup, err := sd.context.CloudProvider.NodeGroupForNode(node)
if err != nil { if err != nil {
klog.Errorf("Failed to get group for %s", node.Name) klog.Errorf("Failed to get group for %s", node.Name)
continue continue
@ -853,14 +894,14 @@ func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDele
klog.Errorf("Failed to get size for %s: %v ", nodeGroup.Id(), err) klog.Errorf("Failed to get size for %s: %v ", nodeGroup.Id(), err)
continue continue
} }
available = size - nodeGroup.MinSize() available = size - nodeGroup.MinSize() - sd.nodeDeletionTracker.GetDeletionsInProgress(nodeGroup.Id())
if available < 0 { if available < 0 {
available = 0 available = 0
} }
availabilityMap[nodeGroup.Id()] = available availabilityMap[nodeGroup.Id()] = available
} }
if available > 0 { if available > 0 {
resourcesDelta, err := computeScaleDownResourcesDelta(cloudProvider, node, nodeGroup, resourcesNames) resourcesDelta, err := computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesNames)
if err != nil { if err != nil {
klog.Errorf("Error: %v", err) klog.Errorf("Error: %v", err)
continue continue
@ -881,25 +922,30 @@ func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDele
return result[:limit] return result[:limit]
} }
type nodeDeletionConfirmation struct { func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client kube_client.Interface,
node *apiv1.Node
err errors.AutoscalerError
}
func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, cp cloudprovider.CloudProvider, client kube_client.Interface,
recorder kube_record.EventRecorder, readinessMap map[string]bool, recorder kube_record.EventRecorder, readinessMap map[string]bool,
candidateNodeGroups map[string]cloudprovider.NodeGroup, confirmation chan nodeDeletionConfirmation) { candidateNodeGroups map[string]cloudprovider.NodeGroup) ([]*apiv1.Node, errors.AutoscalerError) {
deletedNodes := []*apiv1.Node{}
for _, node := range emptyNodes { for _, node := range emptyNodes {
klog.V(0).Infof("Scale-down: removing empty node %s", node.Name) klog.V(0).Infof("Scale-down: removing empty node %s", node.Name)
sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %s", node.Name) sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %s", node.Name)
simulator.RemoveNodeFromTracker(sd.usageTracker, node.Name, sd.unneededNodes) simulator.RemoveNodeFromTracker(sd.usageTracker, node.Name, sd.unneededNodes)
go func(nodeToDelete *apiv1.Node) { nodeGroup, found := candidateNodeGroups[node.Name]
taintErr := deletetaint.MarkToBeDeleted(nodeToDelete, client) if !found {
if taintErr != nil { return deletedNodes, errors.NewAutoscalerError(
recorder.Eventf(nodeToDelete, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", taintErr) errors.CloudProviderError, "failed to find node group for %s", node.Name)
confirmation <- nodeDeletionConfirmation{node: nodeToDelete, err: errors.ToAutoscalerError(errors.ApiCallError, taintErr)}
return
} }
taintErr := deletetaint.MarkToBeDeleted(node, client)
if taintErr != nil {
recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", taintErr)
return deletedNodes, errors.ToAutoscalerError(errors.ApiCallError, taintErr)
}
deletedNodes = append(deletedNodes, node)
go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup) {
sd.nodeDeletionTracker.StartDeletion(nodeGroupForDeletedNode.Id())
defer sd.nodeDeletionTracker.EndDeletion(nodeGroupForDeletedNode.Id())
var result status.NodeDeleteResult
defer func() { sd.nodeDeletionTracker.AddNodeDeleteResult(nodeToDelete.Name, result) }()
var deleteErr errors.AutoscalerError var deleteErr errors.AutoscalerError
// If we fail to delete the node we want to remove delete taint // If we fail to delete the node we want to remove delete taint
@ -912,47 +958,32 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, cp cloud
} }
}() }()
deleteErr = waitForDelayDeletion(nodeToDelete, sd.context.ListerRegistry.AllNodeLister(), sd.context.AutoscalingOptions.NodeDeletionDelayTimeout)
if deleteErr != nil {
klog.Errorf("Problem with empty node deletion: %v", deleteErr)
result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: deleteErr}
return
}
deleteErr = deleteNodeFromCloudProvider(nodeToDelete, sd.context.CloudProvider, deleteErr = deleteNodeFromCloudProvider(nodeToDelete, sd.context.CloudProvider,
sd.context.Recorder, sd.clusterStateRegistry) sd.context.Recorder, sd.clusterStateRegistry)
if deleteErr == nil { if deleteErr != nil {
nodeGroup := candidateNodeGroups[nodeToDelete.Name] klog.Errorf("Problem with empty node deletion: %v", deleteErr)
result = status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: deleteErr}
return
}
if readinessMap[nodeToDelete.Name] { if readinessMap[nodeToDelete.Name] {
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(cp.GPULabel(), cp.GetAvailableGPUTypes(), nodeToDelete, nodeGroup), metrics.Empty) metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(sd.context.CloudProvider.GPULabel(), sd.context.CloudProvider.GetAvailableGPUTypes(), nodeToDelete, nodeGroupForDeletedNode), metrics.Empty)
} else { } else {
metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(cp.GPULabel(), cp.GetAvailableGPUTypes(), nodeToDelete, nodeGroup), metrics.Unready) metrics.RegisterScaleDown(1, gpu.GetGpuTypeForMetrics(sd.context.CloudProvider.GPULabel(), sd.context.CloudProvider.GetAvailableGPUTypes(), nodeToDelete, nodeGroupForDeletedNode), metrics.Unready)
} }
result = status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
}(node, nodeGroup)
} }
confirmation <- nodeDeletionConfirmation{node: nodeToDelete, err: deleteErr} return deletedNodes, nil
}(node)
}
} }
func (sd *ScaleDown) waitForEmptyNodesDeleted(emptyNodes []*apiv1.Node, confirmation chan nodeDeletionConfirmation) (deletedNodes []*apiv1.Node, finalError errors.AutoscalerError) { func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod,
deletedNodes = make([]*apiv1.Node, 0) nodeGroup cloudprovider.NodeGroup) status.NodeDeleteResult {
startTime := time.Now()
for range emptyNodes {
timeElapsed := time.Now().Sub(startTime)
timeLeft := MaxCloudProviderNodeDeletionTime - timeElapsed
if timeLeft < 0 {
return deletedNodes, errors.NewAutoscalerError(errors.TransientError, "Failed to delete nodes in time")
}
select {
case conf := <-confirmation:
if conf.err != nil {
klog.Errorf("Problem with empty node deletion: %v", conf.err)
finalError = conf.err
} else {
deletedNodes = append(deletedNodes, conf.node)
}
case <-time.After(timeLeft):
finalError = errors.NewAutoscalerError(errors.TransientError, "Failed to delete nodes in time")
}
}
return deletedNodes, finalError
}
func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod) status.NodeDeleteResult {
deleteSuccessful := false deleteSuccessful := false
drainSuccessful := false drainSuccessful := false
@ -961,6 +992,9 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod) status.Node
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToMarkToBeDeleted, Err: errors.ToAutoscalerError(errors.ApiCallError, err)} return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToMarkToBeDeleted, Err: errors.ToAutoscalerError(errors.ApiCallError, err)}
} }
sd.nodeDeletionTracker.StartDeletion(nodeGroup.Id())
defer sd.nodeDeletionTracker.EndDeletion(nodeGroup.Id())
// If we fail to evict all the pods from the node we want to remove delete taint // If we fail to evict all the pods from the node we want to remove delete taint
defer func() { defer func() {
if !deleteSuccessful { if !deleteSuccessful {
@ -982,9 +1016,14 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod) status.Node
} }
drainSuccessful = true drainSuccessful = true
if typedErr := waitForDelayDeletion(node, sd.context.ListerRegistry.AllNodeLister(), sd.context.AutoscalingOptions.NodeDeletionDelayTimeout); typedErr != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: typedErr}
}
// attempt delete from cloud provider // attempt delete from cloud provider
if err := deleteNodeFromCloudProvider(node, sd.context.CloudProvider, sd.context.Recorder, sd.clusterStateRegistry); err != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: err} if typedErr := deleteNodeFromCloudProvider(node, sd.context.CloudProvider, sd.context.Recorder, sd.clusterStateRegistry); typedErr != nil {
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToDelete, Err: typedErr}
} }
deleteSuccessful = true // Let the deferred function know there is no need to cleanup deleteSuccessful = true // Let the deferred function know there is no need to cleanup
@ -1129,6 +1168,38 @@ func deleteNodeFromCloudProvider(node *apiv1.Node, cloudProvider cloudprovider.C
return nil return nil
} }
func waitForDelayDeletion(node *apiv1.Node, nodeLister kubernetes.NodeLister, timeout time.Duration) errors.AutoscalerError {
if hasDelayDeletionAnnotation(node) {
klog.V(1).Infof("Wait for removing %s annotations on node %v", DelayDeletionAnnotationPrefix, node.Name)
err := wait.Poll(5*time.Second, timeout, func() (bool, error) {
klog.V(2).Infof("Waiting for removing %s annotations on node %v", DelayDeletionAnnotationPrefix, node.Name)
freshNode, err := nodeLister.Get(node.Name)
if err != nil || freshNode == nil {
return false, fmt.Errorf("failed to get node %v: %v", node.Name, err)
}
return !hasDelayDeletionAnnotation(freshNode), nil
})
if err != nil && err != wait.ErrWaitTimeout {
return errors.ToAutoscalerError(errors.ApiCallError, err)
}
if err == wait.ErrWaitTimeout {
klog.Warningf("Delay node deletion timed out for node %v, delay deletion annotation wasn't removed within %v, this might slow down scale down.", node.Name, timeout)
} else {
klog.V(2).Infof("Annotation %s removed from node %v", DelayDeletionAnnotationPrefix, node.Name)
}
}
return nil
}
func hasDelayDeletionAnnotation(node *apiv1.Node) bool {
for annotation := range node.Annotations {
if strings.HasPrefix(annotation, DelayDeletionAnnotationPrefix) {
return true
}
}
return false
}
func hasNoScaleDownAnnotation(node *apiv1.Node) bool { func hasNoScaleDownAnnotation(node *apiv1.Node) bool {
return node.Annotations[ScaleDownDisabledKey] == "true" return node.Annotations[ScaleDownDisabledKey] == "true"
} }

View File

@ -22,6 +22,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/mock"
batchv1 "k8s.io/api/batch/v1" batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1" apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1beta1" policyv1 "k8s.io/api/policy/v1beta1"
@ -599,13 +600,14 @@ func TestDeleteNode(t *testing.T) {
fakeClient.Fake.AddReactor("get", "pods", podNotFoundFunc) fakeClient.Fake.AddReactor("get", "pods", podNotFoundFunc)
// build context // build context
context := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, nil, provider, nil) registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
context := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, registry, provider, nil)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
sd := NewScaleDown(&context, clusterStateRegistry) sd := NewScaleDown(&context, clusterStateRegistry)
// attempt delete // attempt delete
result := sd.deleteNode(n1, pods) result := sd.deleteNode(n1, pods, provider.GetNodeGroup("ng1"))
// verify // verify
if scenario.expectedDeletion { if scenario.expectedDeletion {
@ -950,7 +952,7 @@ func TestScaleDown(t *testing.T) {
func waitForDeleteToFinish(t *testing.T, sd *ScaleDown) { func waitForDeleteToFinish(t *testing.T, sd *ScaleDown) {
for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) { for start := time.Now(); time.Since(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
if !sd.nodeDeleteStatus.IsDeleteInProgress() { if !sd.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress() {
return return
} }
} }
@ -1088,6 +1090,24 @@ func TestScaleDownEmptyMinGroupSizeLimitHit(t *testing.T) {
simpleScaleDownEmpty(t, config) simpleScaleDownEmpty(t, config)
} }
func TestScaleDownEmptyMinGroupSizeLimitHitWhenOneNodeIsBeingDeleted(t *testing.T) {
nodeDeletionTracker := NewNodeDeletionTracker()
nodeDeletionTracker.StartDeletion("ng1")
nodeDeletionTracker.StartDeletion("ng1")
options := defaultScaleDownOptions
config := &scaleTestConfig{
nodes: []nodeConfig{
{"n1", 2000, 1000, 0, true, "ng1"},
{"n2", 2000, 1000, 0, true, "ng1"},
{"n3", 2000, 1000, 0, true, "ng1"},
},
options: options,
expectedScaleDowns: []string{},
nodeDeletionTracker: nodeDeletionTracker,
}
simpleScaleDownEmpty(t, config)
}
func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) { func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
updatedNodes := make(chan string, 10) updatedNodes := make(chan string, 10)
deletedNodes := make(chan string, 10) deletedNodes := make(chan string, 10)
@ -1147,22 +1167,23 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
assert.NotNil(t, provider) assert.NotNil(t, provider)
context := NewScaleTestAutoscalingContext(config.options, fakeClient, nil, provider, nil) registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
context := NewScaleTestAutoscalingContext(config.options, fakeClient, registry, provider, nil)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
scaleDown := NewScaleDown(&context, clusterStateRegistry) scaleDown := NewScaleDown(&context, clusterStateRegistry)
if config.nodeDeletionTracker != nil {
scaleDown.nodeDeletionTracker = config.nodeDeletionTracker
}
scaleDown.UpdateUnneededNodes(nodes, scaleDown.UpdateUnneededNodes(nodes,
nodes, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil) nodes, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
scaleDownStatus, err := scaleDown.TryToScaleDown(nodes, []*apiv1.Pod{}, nil, time.Now()) scaleDownStatus, err := scaleDown.TryToScaleDown(nodes, []*apiv1.Pod{}, nil, time.Now())
waitForDeleteToFinish(t, scaleDown) assert.False(t, scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress())
// This helps to verify that TryToScaleDown doesn't attempt to remove anything
// after delete in progress status is gone.
close(deletedNodes)
assert.NoError(t, err) assert.NoError(t, err)
var expectedScaleDownResult status.ScaleDownResult var expectedScaleDownResult status.ScaleDownResult
if len(config.expectedScaleDowns) > 0 { if len(config.expectedScaleDowns) > 0 {
expectedScaleDownResult = status.ScaleDownNodeDeleted expectedScaleDownResult = status.ScaleDownNodeDeleteStarted
} else { } else {
expectedScaleDownResult = status.ScaleDownNoUnneeded expectedScaleDownResult = status.ScaleDownNoUnneeded
} }
@ -1172,8 +1193,8 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
// Report only up to 10 extra nodes found. // Report only up to 10 extra nodes found.
deleted := make([]string, 0, len(config.expectedScaleDowns)+10) deleted := make([]string, 0, len(config.expectedScaleDowns)+10)
for i := 0; i < len(config.expectedScaleDowns)+10; i++ { for i := 0; i < len(config.expectedScaleDowns)+10; i++ {
d := getStringFromChanImmediately(deletedNodes) d := getStringFromChan(deletedNodes)
if d == "" { // a closed channel yields empty value if d == nothingReturned { // a closed channel yields empty value
break break
} }
deleted = append(deleted, d) deleted = append(deleted, d)
@ -1222,7 +1243,8 @@ func TestNoScaleDownUnready(t *testing.T) {
ScaleDownUnreadyTime: time.Hour, ScaleDownUnreadyTime: time.Hour,
MaxGracefulTerminationSec: 60, MaxGracefulTerminationSec: 60,
} }
context := NewScaleTestAutoscalingContext(options, fakeClient, nil, provider, nil) registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
context := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil)
// N1 is unready so it requires a bigger unneeded time. // N1 is unready so it requires a bigger unneeded time.
clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff()) clusterStateRegistry := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
@ -1730,3 +1752,55 @@ func TestSoftTaintTimeLimit(t *testing.T) {
assert.Empty(t, errs) assert.Empty(t, errs)
assert.Equal(t, 0, countDeletionCandidateTaints(t, fakeClient)) assert.Equal(t, 0, countDeletionCandidateTaints(t, fakeClient))
} }
func TestWaitForDelayDeletion(t *testing.T) {
type testcase struct {
name string
addAnnotation bool
removeAnnotation bool
expectCallingGetNode bool
}
tests := []testcase{
{
name: "annotation not set",
addAnnotation: false,
removeAnnotation: false,
},
{
name: "annotation set and removed",
addAnnotation: true,
removeAnnotation: true,
},
{
name: "annotation set but not removed",
addAnnotation: true,
removeAnnotation: false,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
node := BuildTestNode("n1", 1000, 10)
nodeWithAnnotation := BuildTestNode("n1", 1000, 10)
nodeWithAnnotation.Annotations = map[string]string{DelayDeletionAnnotationPrefix + "ingress": "true"}
allNodeListerMock := &nodeListerMock{}
if test.addAnnotation {
if test.removeAnnotation {
allNodeListerMock.On("Get").Return(node, nil).Once()
} else {
allNodeListerMock.On("Get").Return(nodeWithAnnotation, nil).Twice()
}
}
var err error
if test.addAnnotation {
err = waitForDelayDeletion(nodeWithAnnotation, allNodeListerMock, 6*time.Second)
} else {
err = waitForDelayDeletion(node, allNodeListerMock, 6*time.Second)
}
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, allNodeListerMock)
})
}
}

View File

@ -75,6 +75,7 @@ type scaleTestConfig struct {
expectedFinalScaleUp groupSizeChange // we expect this to be delivered via scale-up event expectedFinalScaleUp groupSizeChange // we expect this to be delivered via scale-up event
expectedScaleDowns []string expectedScaleDowns []string
options config.AutoscalingOptions options config.AutoscalingOptions
nodeDeletionTracker *NodeDeletionTracker
} }
// NewScaleTestAutoscalingContext creates a new test autoscaling context for scaling tests. // NewScaleTestAutoscalingContext creates a new test autoscaling context for scaling tests.

View File

@ -398,16 +398,16 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) || a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime) a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime)
// In dry run only utilization is updated // In dry run only utilization is updated
calculateUnneededOnly := scaleDownInCooldown || scaleDown.nodeDeleteStatus.IsDeleteInProgress() calculateUnneededOnly := scaleDownInCooldown || scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress()
klog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s "+ klog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s "+
"lastScaleDownDeleteTime=%v lastScaleDownFailTime=%s scaleDownForbidden=%v isDeleteInProgress=%v", "lastScaleDownDeleteTime=%v lastScaleDownFailTime=%s scaleDownForbidden=%v isDeleteInProgress=%v",
calculateUnneededOnly, a.lastScaleUpTime, a.lastScaleDownDeleteTime, a.lastScaleDownFailTime, calculateUnneededOnly, a.lastScaleUpTime, a.lastScaleDownDeleteTime, a.lastScaleDownFailTime,
a.processorCallbacks.disableScaleDownForLoop, scaleDown.nodeDeleteStatus.IsDeleteInProgress()) a.processorCallbacks.disableScaleDownForLoop, scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress())
if scaleDownInCooldown { if scaleDownInCooldown {
scaleDownStatus.Result = status.ScaleDownInCooldown scaleDownStatus.Result = status.ScaleDownInCooldown
} else if scaleDown.nodeDeleteStatus.IsDeleteInProgress() { } else if scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress() {
scaleDownStatus.Result = status.ScaleDownInProgress scaleDownStatus.Result = status.ScaleDownInProgress
} else { } else {
klog.V(4).Infof("Starting scale down") klog.V(4).Infof("Starting scale down")

View File

@ -53,6 +53,10 @@ func (l *nodeListerMock) List() ([]*apiv1.Node, error) {
args := l.Called() args := l.Called()
return args.Get(0).([]*apiv1.Node), args.Error(1) return args.Get(0).([]*apiv1.Node), args.Error(1)
} }
func (l *nodeListerMock) Get(name string) (*apiv1.Node, error) {
args := l.Called()
return args.Get(0).(*apiv1.Node), args.Error(1)
}
type podListerMock struct { type podListerMock struct {
mock.Mock mock.Mock

View File

@ -33,6 +33,10 @@ func (n *testNodeLister) List() ([]*apiv1.Node, error) {
return n.list, nil return n.list, nil
} }
func (n *testNodeLister) Get(name string) (*apiv1.Node, error) {
return nil, nil
}
func testPreferredNodeSingleCase(t *testing.T, currentNodes int, expectedNodeSize int) { func testPreferredNodeSingleCase(t *testing.T, currentNodes int, expectedNodeSize int) {
nodes := []*apiv1.Node{} nodes := []*apiv1.Node{}
for i := 1; i <= currentNodes; i++ { for i := 1; i <= currentNodes; i++ {

View File

@ -114,6 +114,7 @@ var (
"for scale down when some candidates from previous iteration are no longer valid."+ "for scale down when some candidates from previous iteration are no longer valid."+
"When calculating the pool size for additional candidates we take"+ "When calculating the pool size for additional candidates we take"+
"max(#nodes * scale-down-candidates-pool-ratio, scale-down-candidates-pool-min-count).") "max(#nodes * scale-down-candidates-pool-ratio, scale-down-candidates-pool-min-count).")
nodeDeletionDelayTimeout = flag.Duration("node-deletion-delay-timeout", 2*time.Minute, "Maximum time CA waits for removing delay-deletion.cluster-autoscaler.kubernetes.io/ annotations before deleting the node.")
scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down") scanInterval = flag.Duration("scan-interval", 10*time.Second, "How often cluster is reevaluated for scale up or down")
maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.") maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
coresTotal = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.") coresTotal = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
@ -185,7 +186,6 @@ func createAutoscalingOptions() config.AutoscalingOptions {
if err != nil { if err != nil {
klog.Fatalf("Failed to parse flags: %v", err) klog.Fatalf("Failed to parse flags: %v", err)
} }
return config.AutoscalingOptions{ return config.AutoscalingOptions{
CloudConfig: *cloudConfig, CloudConfig: *cloudConfig,
CloudProviderName: *cloudProviderFlag, CloudProviderName: *cloudProviderFlag,
@ -231,6 +231,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
NewPodScaleUpDelay: *newPodScaleUpDelay, NewPodScaleUpDelay: *newPodScaleUpDelay,
FilterOutSchedulablePodsUsesPacking: *filterOutSchedulablePodsUsesPacking, FilterOutSchedulablePodsUsesPacking: *filterOutSchedulablePodsUsesPacking,
IgnoredTaints: *ignoreTaintsFlag, IgnoredTaints: *ignoreTaintsFlag,
NodeDeletionDelayTimeout: *nodeDeletionDelayTimeout,
} }
} }

View File

@ -223,6 +223,7 @@ func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struc
// NodeLister lists nodes. // NodeLister lists nodes.
type NodeLister interface { type NodeLister interface {
List() ([]*apiv1.Node, error) List() ([]*apiv1.Node, error)
Get(name string) (*apiv1.Node, error)
} }
// ReadyNodeLister lists ready nodes. // ReadyNodeLister lists ready nodes.
@ -245,6 +246,15 @@ func (readyNodeLister *ReadyNodeLister) List() ([]*apiv1.Node, error) {
return readyNodes, nil return readyNodes, nil
} }
// Get returns the node with the given name.
func (readyNodeLister *ReadyNodeLister) Get(name string) (*apiv1.Node, error) {
node, err := readyNodeLister.nodeLister.Get(name)
if err != nil {
return nil, err
}
return node, nil
}
// NewReadyNodeLister builds a node lister. // NewReadyNodeLister builds a node lister.
func NewReadyNodeLister(kubeClient client.Interface, stopChannel <-chan struct{}) NodeLister { func NewReadyNodeLister(kubeClient client.Interface, stopChannel <-chan struct{}) NodeLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything()) listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
@ -272,6 +282,15 @@ func (allNodeLister *AllNodeLister) List() ([]*apiv1.Node, error) {
return allNodes, nil return allNodes, nil
} }
// Get returns the node with the given name.
func (allNodeLister *AllNodeLister) Get(name string) (*apiv1.Node, error) {
node, err := allNodeLister.nodeLister.Get(name)
if err != nil {
return nil, err
}
return node, nil
}
// NewAllNodeLister builds a node lister that returns all nodes (ready and unready) // NewAllNodeLister builds a node lister that returns all nodes (ready and unready)
func NewAllNodeLister(kubeClient client.Interface, stopchannel <-chan struct{}) NodeLister { func NewAllNodeLister(kubeClient client.Interface, stopchannel <-chan struct{}) NodeLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything()) listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())