Run node drain/delete in a separate goroutine
parent d61b0bbcfc
commit 718e5db78e
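
In a nutshell: the drain/delete work that TryToScaleDown used to run inline now runs in its own goroutine, and a mutex-guarded NodeDeleteStatus flag lets the rest of the autoscaler see that a deletion is still in flight. The sketch below shows the same pattern in isolation; it is simplified stand-in code, not the autoscaler source, and names such as deleteStatus and startNodeDelete are illustrative.

```go
// Minimal sketch of the pattern this commit introduces, not the actual
// cluster-autoscaler code: a mutex-guarded flag records that a delete is in
// flight, the slow drain/delete runs in its own goroutine, and the caller
// returns immediately with a "delete started" result.
package main

import (
	"fmt"
	"sync"
	"time"
)

// deleteStatus mirrors the role of NodeDeleteStatus in the commit.
type deleteStatus struct {
	sync.Mutex
	inProgress bool
}

func (s *deleteStatus) set(v bool) {
	s.Lock()
	defer s.Unlock()
	s.inProgress = v
}

func (s *deleteStatus) get() bool {
	s.Lock()
	defer s.Unlock()
	return s.inProgress
}

// startNodeDelete stands in for TryToScaleDown: it kicks off the slow
// drain/delete work in the background and reports "started" right away.
func startNodeDelete(status *deleteStatus, node string) {
	status.set(true)
	go func() {
		defer status.set(false)            // clear the flag once the goroutine finishes
		time.Sleep(200 * time.Millisecond) // stands in for drain + cloud-provider delete
		fmt.Printf("node %s deleted\n", node)
	}()
}

func main() {
	status := &deleteStatus{}
	startNodeDelete(status, "n1")
	fmt.Println("delete in progress:", status.get()) // the main loop can skip scale-down while this is true

	// Poll until the background delete finishes, like waitForDeleteToFinish in the tests.
	for status.get() {
		time.Sleep(20 * time.Millisecond)
	}
	fmt.Println("delete in progress:", status.get())
}
```
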
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"reflect"
 	"strings"
+	"sync"
 	"time"

 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
@@ -53,6 +54,8 @@ const (
 	ScaleDownNoNodeDeleted ScaleDownResult = iota
 	// ScaleDownNodeDeleted - a node was deleted.
 	ScaleDownNodeDeleted ScaleDownResult = iota
+	// ScaleDownNodeDeleteStarted - a node deletion process was started.
+	ScaleDownNodeDeleteStarted ScaleDownResult = iota
 	// ScaleDownDisabledKey is the name of annotation marking node as not eligible for scale down.
 	ScaleDownDisabledKey = "cluster-autoscaler.kubernetes.io/scale-down-disabled"
 )
@@ -68,11 +71,31 @@ const (
 	EvictionRetryTime = 10 * time.Second
 	// PodEvictionHeadroom is the extra time we wait to catch situations when the pod is ignoring SIGTERM and
 	// is killed with SIGKILL after MaxGracefulTerminationTime
-	PodEvictionHeadroom = 20 * time.Second
+	PodEvictionHeadroom = 30 * time.Second
 	// UnremovableNodeRecheckTimeout is the timeout before we check again a node that couldn't be removed before
 	UnremovableNodeRecheckTimeout = 5 * time.Minute
 )

+// NodeDeleteStatus tells whether a node is being deleted right now.
+type NodeDeleteStatus struct {
+	sync.Mutex
+	deleteInProgress bool
+}
+
+// IsDeleteInProgress returns true if a node is being deleted.
+func (n *NodeDeleteStatus) IsDeleteInProgress() bool {
+	n.Lock()
+	defer n.Unlock()
+	return n.deleteInProgress
+}
+
+// SetDeleteInProgress sets the deletion process status.
+func (n *NodeDeleteStatus) SetDeleteInProgress(status bool) {
+	n.Lock()
+	defer n.Unlock()
+	n.deleteInProgress = status
+}
+
 // ScaleDown is responsible for maintaining the state needed to perform unneeded node removals.
 type ScaleDown struct {
 	context *AutoscalingContext
@@ -82,6 +105,7 @@ type ScaleDown struct {
 	podLocationHints map[string]string
 	nodeUtilizationMap map[string]float64
 	usageTracker *simulator.UsageTracker
+	nodeDeleteStatus *NodeDeleteStatus
 }

 // NewScaleDown builds new ScaleDown object.
@@ -94,6 +118,7 @@ func NewScaleDown(context *AutoscalingContext) *ScaleDown {
 		nodeUtilizationMap: make(map[string]float64),
 		usageTracker: simulator.NewUsageTracker(),
 		unneededNodesList: make([]*apiv1.Node, 0),
+		nodeDeleteStatus: &NodeDeleteStatus{},
 	}
 }

@@ -336,18 +361,27 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
 	// Nothing super-bad should happen if the node is removed from tracker prematurely.
 	simulator.RemoveNodeFromTracker(sd.usageTracker, toRemove.Node.Name, sd.unneededNodes)
 	nodeDeletionStart := time.Now()
-	err = deleteNode(sd.context, toRemove.Node, toRemove.PodsToReschedule)
-	nodeDeletionDuration = time.Now().Sub(nodeDeletionStart)
-	if err != nil {
-		return ScaleDownError, err.AddPrefix("Failed to delete %s: ", toRemove.Node.Name)
-	}
-	if readinessMap[toRemove.Node.Name] {
-		metrics.RegisterScaleDown(1, metrics.Underutilized)
-	} else {
-		metrics.RegisterScaleDown(1, metrics.Unready)
-	}
-
-	return ScaleDownNodeDeleted, nil
+	// Starting deletion.
+	nodeDeletionDuration = time.Now().Sub(nodeDeletionStart)
+	sd.nodeDeleteStatus.SetDeleteInProgress(true)
+
+	go func() {
+		// Finishing the delete process once this goroutine is over.
+		defer sd.nodeDeleteStatus.SetDeleteInProgress(false)
+		err := deleteNode(sd.context, toRemove.Node, toRemove.PodsToReschedule)
+		if err != nil {
+			glog.Errorf("Failed to delete %s: %v", toRemove.Node.Name, err)
+			return
+		}
+		if readinessMap[toRemove.Node.Name] {
+			metrics.RegisterScaleDown(1, metrics.Underutilized)
+		} else {
+			metrics.RegisterScaleDown(1, metrics.Unready)
+		}
+	}()
+
+	return ScaleDownNodeDeleteStarted, nil
 }

 // updateScaleDownMetrics registers duration of different parts of scale down.
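
A consequence of the new flow is visible in the hunk above: TryToScaleDown now returns ScaleDownNodeDeleteStarted before the node is actually gone, and a failure inside deleteNode is only reported through glog from the goroutine instead of coming back to the caller as a ScaleDownError. If a caller did need to observe the asynchronous result, a buffered result channel is one conventional Go alternative; the sketch below is an illustration under that assumption, not what this commit does (deleteNode and startDelete here are stand-ins).

```go
// Sketch of an alternative (not part of this commit): surfacing the
// asynchronous delete result to the caller over a channel instead of only
// logging it inside the goroutine.
package main

import (
	"errors"
	"fmt"
	"time"
)

// deleteNode stands in for the real drain-and-delete call.
func deleteNode(name string) error {
	time.Sleep(100 * time.Millisecond)
	if name == "" {
		return errors.New("empty node name")
	}
	return nil
}

// startDelete launches the deletion and returns a channel the caller can
// read the final error from whenever it is ready to.
func startDelete(name string) <-chan error {
	result := make(chan error, 1) // buffered so the goroutine never blocks
	go func() {
		result <- deleteNode(name)
	}()
	return result
}

func main() {
	done := startDelete("n1")
	// ... the main loop keeps running here ...
	if err := <-done; err != nil {
		fmt.Println("failed to delete n1:", err)
		return
	}
	fmt.Println("n1 deleted")
}
```
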
@@ -315,12 +315,22 @@ func TestScaleDown(t *testing.T) {
 	scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
 		[]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, time.Now().Add(-5*time.Minute), nil)
 	result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, nil)
+	waitForDeleteToFinish(t, scaleDown)
 	assert.NoError(t, err)
-	assert.Equal(t, ScaleDownNodeDeleted, result)
+	assert.Equal(t, ScaleDownNodeDeleteStarted, result)
 	assert.Equal(t, n1.Name, getStringFromChan(deletedNodes))
 	assert.Equal(t, n1.Name, getStringFromChan(updatedNodes))
 }

+func waitForDeleteToFinish(t *testing.T, sd *ScaleDown) {
+	for start := time.Now(); time.Now().Sub(start) < 20*time.Second; time.Sleep(100 * time.Millisecond) {
+		if !sd.nodeDeleteStatus.IsDeleteInProgress() {
+			return
+		}
+	}
+	t.Fatalf("Node delete not finished")
+}
+
 func assertSubset(t *testing.T, a []string, b []string) {
 	for _, x := range a {
 		found := false
@@ -398,6 +408,8 @@ func TestScaleDownEmptyMultipleNodeGroups(t *testing.T) {
 	scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
 		[]*apiv1.Node{n1, n2}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
 	result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{}, nil)
+	waitForDeleteToFinish(t, scaleDown)
+
 	assert.NoError(t, err)
 	assert.Equal(t, ScaleDownNodeDeleted, result)
 	d1 := getStringFromChan(deletedNodes)
@@ -466,6 +478,8 @@ func TestScaleDownEmptySingleNodeGroup(t *testing.T) {
 	scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
 		[]*apiv1.Node{n1, n2}, []*apiv1.Pod{}, time.Now().Add(-5*time.Minute), nil)
 	result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{}, nil)
+	waitForDeleteToFinish(t, scaleDown)
+
 	assert.NoError(t, err)
 	assert.Equal(t, ScaleDownNodeDeleted, result)
 	d1 := getStringFromChan(deletedNodes)
@@ -529,6 +543,8 @@ func TestNoScaleDownUnready(t *testing.T) {
 	scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2},
 		[]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, time.Now().Add(-5*time.Minute), nil)
 	result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, nil)
+	waitForDeleteToFinish(t, scaleDown)
+
 	assert.NoError(t, err)
 	assert.Equal(t, ScaleDownNoUnneeded, result)

@@ -549,8 +565,10 @@ func TestNoScaleDownUnready(t *testing.T) {
 	scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Node{n1, n2},
 		[]*apiv1.Pod{p2}, time.Now().Add(-2*time.Hour), nil)
 	result, err = scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p2}, nil)
+	waitForDeleteToFinish(t, scaleDown)
+
 	assert.NoError(t, err)
-	assert.Equal(t, ScaleDownNodeDeleted, result)
+	assert.Equal(t, ScaleDownNodeDeleteStarted, result)
 	assert.Equal(t, n1.Name, getStringFromChan(deletedNodes))
 }

@@ -633,6 +651,8 @@ func TestScaleDownNoMove(t *testing.T) {
 	scaleDown.UpdateUnneededNodes([]*apiv1.Node{n1, n2}, []*apiv1.Node{n1, n2},
 		[]*apiv1.Pod{p1, p2}, time.Now().Add(5*time.Minute), nil)
 	result, err := scaleDown.TryToScaleDown([]*apiv1.Node{n1, n2}, []*apiv1.Pod{p1, p2}, nil)
+	waitForDeleteToFinish(t, scaleDown)
+
 	assert.NoError(t, err)
 	assert.Equal(t, ScaleDownNoUnneeded, result)
 }
@@ -256,7 +256,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
 	// In dry run only utilization is updated
 	calculateUnneededOnly := a.lastScaleUpTime.Add(a.ScaleDownDelay).After(time.Now()) ||
 		a.lastScaleDownFailedTrial.Add(a.ScaleDownTrialInterval).After(time.Now()) ||
-		schedulablePodsPresent
+		schedulablePodsPresent ||
+		scaleDown.nodeDeleteStatus.IsDeleteInProgress()

 	glog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s "+
 		"lastScaleDownFailedTrail=%s schedulablePodsPresent=%v", calculateUnneededOnly,
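
The RunOnce change above treats an in-flight node deletion like a recent scale-up or a failed scale-down trial: while NodeDeleteStatus reports a delete in progress, the loop only refreshes utilization and unneeded-node data and does not try to start another scale-down. A minimal illustration of that gate follows; the function and parameter names are made up for the example, and only the boolean shape mirrors the diff.

```go
package main

import (
	"fmt"
	"time"
)

// calculateUnneededOnly mirrors the shape of the condition in RunOnce:
// any one of these reasons is enough to skip starting a new scale-down.
func calculateUnneededOnly(lastScaleUp, lastFailedTrial time.Time,
	scaleDownDelay, trialInterval time.Duration,
	schedulablePodsPresent, deleteInProgress bool) bool {
	return lastScaleUp.Add(scaleDownDelay).After(time.Now()) ||
		lastFailedTrial.Add(trialInterval).After(time.Now()) ||
		schedulablePodsPresent ||
		deleteInProgress
}

func main() {
	longAgo := time.Now().Add(-time.Hour)
	// Nothing else blocks scale-down, but a delete is still running in the background.
	fmt.Println(calculateUnneededOnly(longAgo, longAgo, 10*time.Minute, 10*time.Minute, false, true)) // true
}
```
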
@@ -84,7 +84,7 @@ var (
 	maxNodesTotal = flag.Int("max-nodes-total", 0, "Maximum number of nodes in all node groups. Cluster autoscaler will not grow the cluster beyond this number.")
 	cloudProviderFlag = flag.String("cloud-provider", "gce", "Cloud provider type. Allowed values: gce, aws, kubemark")
 	maxEmptyBulkDeleteFlag = flag.Int("max-empty-bulk-delete", 10, "Maximum number of empty nodes that can be deleted at the same time.")
-	maxGracefulTerminationFlag = flag.Int("max-graceful-termination-sec", 60, "Maximum number of seconds CA waits for pod termination when trying to scale down a node.")
+	maxGracefulTerminationFlag = flag.Int("max-graceful-termination-sec", 10*60, "Maximum number of seconds CA waits for pod termination when trying to scale down a node.")
 	maxTotalUnreadyPercentage = flag.Float64("max-total-unready-percentage", 33, "Maximum percentage of unready nodes after which CA halts operations")
 	okTotalUnreadyCount = flag.Int("ok-total-unready-count", 3, "Number of allowed unready nodes, irrespective of max-total-unready-percentage")
 	maxNodeProvisionTime = flag.Duration("max-node-provision-time", 15*time.Minute, "Maximum time CA waits for node to be provisioned")
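
The flag and constant bumps fit the same theme: with the drain running off the main loop, waiting up to ten minutes of graceful pod termination (plus the 30-second PodEvictionHeadroom) no longer stalls an autoscaler iteration. A back-of-the-envelope worst case per drained pod, assuming the headroom is added on top of the graceful-termination budget as the PodEvictionHeadroom comment describes:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// New defaults from this commit (values copied from the diff).
	maxGracefulTermination := 10 * 60 * time.Second // --max-graceful-termination-sec
	podEvictionHeadroom := 30 * time.Second         // PodEvictionHeadroom
	fmt.Println("worst-case wait for a pod ignoring SIGTERM:", maxGracefulTermination+podEvictionHeadroom) // 10m30s
}
```
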