fix leaking taints in case of cloud provider error on node deletion
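
A minimal, self-contained sketch of the defer-based cleanup this change introduces: the ToBeDeleted taint is applied first, and a deferred function removes it again unless the whole delete path succeeded, so a drain failure or a cloud provider error no longer leaves the taint behind. The helper names below (markToBeDeleted, cleanToBeDeleted, drainNode, deleteFromCloudProvider) are simplified stand-ins for the real deletetaint and scale-down helpers, not the actual APIs.

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for deletetaint.MarkToBeDeleted, deletetaint.CleanToBeDeleted,
// drainNode and deleteNodeFromCloudProvider used by the real change.
func markToBeDeleted(node string) error         { fmt.Println("taint", node); return nil }
func cleanToBeDeleted(node string)              { fmt.Println("untaint", node) }
func drainNode(node string) error               { return nil }
func deleteFromCloudProvider(node string) error { return errors.New("cloud provider error") }

func deleteNode(node string) error {
	deleteSuccessful := false
	drainSuccessful := false
	if err := markToBeDeleted(node); err != nil {
		return err
	}
	// If any later step fails, remove the ToBeDeleted taint so it does not leak.
	defer func() {
		if !deleteSuccessful {
			cleanToBeDeleted(node)
			if !drainSuccessful {
				fmt.Println("failed to drain the node, aborting ScaleDown")
			} else {
				fmt.Println("failed to delete the node")
			}
		}
	}()
	if err := drainNode(node); err != nil {
		return err
	}
	drainSuccessful = true
	if err := deleteFromCloudProvider(node); err != nil {
		return err // the deferred cleanup untaints the node
	}
	deleteSuccessful = true // tell the deferred function no cleanup is needed
	return nil
}

func main() {
	// With the cloud provider call failing, the taint is cleaned up on return.
	fmt.Println("deleteNode:", deleteNode("n1"))
}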
parent 67d0c147a7
commit 4c31a57374
@@ -638,11 +638,42 @@ func (sd *ScaleDown) waitForEmptyNodesDeleted(emptyNodes []*apiv1.Node, confirma
 	}
 }
 
 func deleteNode(context *AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) errors.AutoscalerError {
-	if err := drainNode(node, pods, context.ClientSet, context.Recorder, context.MaxGracefulTerminationSec,
-		MaxPodEvictionTime, EvictionRetryTime); err != nil {
+	deleteSuccessful := false
+	drainSuccessful := false
+
+	if err := deletetaint.MarkToBeDeleted(node, context.ClientSet); err != nil {
+		context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
+		return errors.ToAutoscalerError(errors.ApiCallError, err)
+	}
+
+	// If we fail to evict all the pods from the node we want to remove delete taint
+	defer func() {
+		if !deleteSuccessful {
+			deletetaint.CleanToBeDeleted(node, context.ClientSet)
+			if !drainSuccessful {
+				context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain the node, aborting ScaleDown")
+			} else {
+				context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete the node")
+			}
+		}
+	}()
+
+	context.Recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "marked the node as toBeDeleted/unschedulable")
+
+	// attempt drain
+	if err := drainNode(node, pods, context.ClientSet, context.Recorder, context.MaxGracefulTerminationSec, MaxPodEvictionTime, EvictionRetryTime); err != nil {
 		return err
 	}
-	return deleteNodeFromCloudProvider(node, context.CloudProvider, context.Recorder, context.ClusterStateRegistry)
+	drainSuccessful = true
+
+	// attempt delete from cloud provider
+	err := deleteNodeFromCloudProvider(node, context.CloudProvider, context.Recorder, context.ClusterStateRegistry)
+	if err != nil {
+		return err
+	}
+
+	deleteSuccessful = true // Let the deferred function know there is no need to cleanup
+	return nil
 }
 
 func evictPod(podToEvict *apiv1.Pod, client kube_client.Interface, recorder kube_record.EventRecorder,
@@ -685,23 +716,7 @@ func evictPod(podToEvict *apiv1.Pod, client kube_client.Interface, recorder kube
 func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface, recorder kube_record.EventRecorder,
 	maxGracefulTerminationSec int, maxPodEvictionTime time.Duration, waitBetweenRetries time.Duration) errors.AutoscalerError {
 
-	drainSuccessful := false
 	toEvict := len(pods)
-	if err := deletetaint.MarkToBeDeleted(node, client); err != nil {
-		recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
-		return errors.ToAutoscalerError(errors.ApiCallError, err)
-	}
-
-	// If we fail to evict all the pods from the node we want to remove delete taint
-	defer func() {
-		if !drainSuccessful {
-			deletetaint.CleanToBeDeleted(node, client)
-			recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain the node, aborting ScaleDown")
-		}
-	}()
-
-	recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "marked the node as toBeDeleted/unschedulable")
-
 	retryUntil := time.Now().Add(maxPodEvictionTime)
 	confirmations := make(chan error, toEvict)
 	for _, pod := range pods {
@@ -749,7 +764,6 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
 		if allGone {
 			glog.V(1).Infof("All pods removed from %s", node.Name)
 			// Let the deferred function know there is no need for cleanup
-			drainSuccessful = true
 			return nil
 		}
 	}
@@ -334,9 +334,169 @@ func TestFindUnneededNodePool(t *testing.T) {
 	assert.NotEmpty(t, sd.unneededNodes)
 }
 
+func TestDeleteNode(t *testing.T) {
+	// common parameters
+	nothingReturned := "Nothing returned"
+	nodeDeleteFailedFunc :=
+		func(string, string) error {
+			return fmt.Errorf("won't remove node")
+		}
+	podNotFoundFunc :=
+		func(action core.Action) (bool, runtime.Object, error) {
+			return true, nil, errors.NewNotFound(apiv1.Resource("pod"), "whatever")
+		}
+
+	// scenarios
+	testScenarios := []struct {
+		name              string
+		pods              []string
+		drainSuccess      bool
+		nodeDeleteSuccess bool
+		expectedDeletion  bool
+	}{
+		{
+			name:              "successful attempt to delete node with pods",
+			pods:              []string{"p1", "p2"},
+			drainSuccess:      true,
+			nodeDeleteSuccess: true,
+			expectedDeletion:  true,
+		},
+		/* Temporarily disabled as it takes several minutes due to hardcoded timeout.
+		 * TODO(aleksandra-malinowska): move MaxPodEvictionTime to AutoscalingContext.
+		{
+			name:              "failed on drain",
+			pods:              []string{"p1", "p2"},
+			drainSuccess:      false,
+			nodeDeleteSuccess: true,
+			expectedDeletion:  false,
+		},
+		*/
+		{
+			name:              "failed on node delete",
+			pods:              []string{"p1", "p2"},
+			drainSuccess:      true,
+			nodeDeleteSuccess: false,
+			expectedDeletion:  false,
+		},
+		{
+			name:              "successful attempt to delete empty node",
+			pods:              []string{},
+			drainSuccess:      true,
+			nodeDeleteSuccess: true,
+			expectedDeletion:  true,
+		},
+		{
+			name:              "failed attempt to delete empty node",
+			pods:              []string{},
+			drainSuccess:      true,
+			nodeDeleteSuccess: false,
+			expectedDeletion:  false,
+		},
+	}
+
+	for _, scenario := range testScenarios {
+		// run each scenario as an independent test
+		t.Run(scenario.name, func(t *testing.T) {
+			// set up test channels
+			updatedNodes := make(chan string, 10)
+			deletedNodes := make(chan string, 10)
+			deletedPods := make(chan string, 10)
+
+			// set up test data
+			n1 := BuildTestNode("n1", 1000, 1000)
+			SetNodeReadyState(n1, true, time.Time{})
+			pods := make([]*apiv1.Pod, len(scenario.pods))
+			for i, podName := range scenario.pods {
+				pod := BuildTestPod(podName, 100, 0)
+				pods[i] = pod
+			}
+
+			// set up fake provider
+			deleteNodeHandler := nodeDeleteFailedFunc
+			if scenario.nodeDeleteSuccess {
+				deleteNodeHandler =
+					func(nodeGroup string, node string) error {
+						deletedNodes <- node
+						return nil
+					}
+			}
+			provider := testprovider.NewTestCloudProvider(nil, deleteNodeHandler)
+			provider.AddNodeGroup("ng1", 1, 100, 100)
+			provider.AddNode("ng1", n1)
+
+			// set up fake client
+			fakeClient := &fake.Clientset{}
+			fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
+				return true, n1, nil
+			})
+			fakeClient.Fake.AddReactor("update", "nodes",
+				func(action core.Action) (bool, runtime.Object, error) {
+					update := action.(core.UpdateAction)
+					obj := update.GetObject().(*apiv1.Node)
+					taints := make([]string, 0, len(obj.Spec.Taints))
+					for _, taint := range obj.Spec.Taints {
+						taints = append(taints, taint.Key)
+					}
+					updatedNodes <- fmt.Sprintf("%s-%s", obj.Name, taints)
+					return true, obj, nil
+				})
+			fakeClient.Fake.AddReactor("create", "pods",
+				func(action core.Action) (bool, runtime.Object, error) {
+					if !scenario.drainSuccess {
+						return true, nil, fmt.Errorf("won't evict")
+					}
+					createAction := action.(core.CreateAction)
+					if createAction == nil {
+						return false, nil, nil
+					}
+					eviction := createAction.GetObject().(*policyv1.Eviction)
+					if eviction == nil {
+						return false, nil, nil
+					}
+					deletedPods <- eviction.Name
+					return true, nil, nil
+				})
+			fakeClient.Fake.AddReactor("get", "pods", podNotFoundFunc)
+
+			// set up fake recorders
+			fakeRecorder := kube_util.CreateEventRecorder(fakeClient)
+			fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", fakeRecorder, false)
+
+			// build context
+			context := &AutoscalingContext{
+				AutoscalingOptions:   AutoscalingOptions{},
+				ClientSet:            fakeClient,
+				Recorder:             fakeRecorder,
+				LogRecorder:          fakeLogRecorder,
+				CloudProvider:        provider,
+				ClusterStateRegistry: clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, fakeLogRecorder),
+			}
+
+			// attempt delete
+			err := deleteNode(context, n1, pods)
+
+			// verify
+			if scenario.expectedDeletion {
+				assert.NoError(t, err)
+				assert.Equal(t, n1.Name, getStringFromChan(deletedNodes))
+			} else {
+				assert.NotNil(t, err)
+			}
+			assert.Equal(t, nothingReturned, getStringFromChanImmediately(deletedNodes))
+
+			taintedUpdate := fmt.Sprintf("%s-%s", n1.Name, []string{deletetaint.ToBeDeletedTaint})
+			assert.Equal(t, taintedUpdate, getStringFromChan(updatedNodes))
+			if !scenario.expectedDeletion {
+				untaintedUpdate := fmt.Sprintf("%s-%s", n1.Name, []string{})
+				assert.Equal(t, untaintedUpdate, getStringFromChan(updatedNodes))
+			}
+			assert.Equal(t, nothingReturned, getStringFromChanImmediately(updatedNodes))
+		})
+	}
+}
+
 func TestDrainNode(t *testing.T) {
 	deletedPods := make(chan string, 10)
-	updatedNodes := make(chan string, 10)
 	fakeClient := &fake.Clientset{}
 
 	p1 := BuildTestPod("p1", 100, 0)
@@ -347,9 +507,6 @@ func TestDrainNode(t *testing.T) {
 	fakeClient.Fake.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
 		return true, nil, errors.NewNotFound(apiv1.Resource("pod"), "whatever")
 	})
-	fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
-		return true, n1, nil
-	})
 	fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
 		createAction := action.(core.CreateAction)
 		if createAction == nil {
@@ -362,12 +519,6 @@ func TestDrainNode(t *testing.T) {
 		deletedPods <- eviction.Name
 		return true, nil, nil
 	})
-	fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
-		update := action.(core.UpdateAction)
-		obj := update.GetObject().(*apiv1.Node)
-		updatedNodes <- obj.Name
-		return true, obj, nil
-	})
 	err := drainNode(n1, []*apiv1.Pod{p1, p2}, fakeClient, kube_util.CreateEventRecorder(fakeClient), 20, 5*time.Second, 0*time.Second)
 	assert.NoError(t, err)
 	deleted := make([]string, 0)
@@ -376,12 +527,10 @@ func TestDrainNode(t *testing.T) {
 	sort.Strings(deleted)
 	assert.Equal(t, p1.Name, deleted[0])
 	assert.Equal(t, p2.Name, deleted[1])
-	assert.Equal(t, n1.Name, getStringFromChan(updatedNodes))
 }
 
 func TestDrainNodeWithRetries(t *testing.T) {
 	deletedPods := make(chan string, 10)
-	updatedNodes := make(chan string, 10)
 	// Simulate pdb of size 1, by making them goroutine succeed sequentially
 	// and fail/retry before they can proceed.
 	ticket := make(chan bool, 1)
@@ -396,9 +545,6 @@ func TestDrainNodeWithRetries(t *testing.T) {
 	fakeClient.Fake.AddReactor("get", "pods", func(action core.Action) (bool, runtime.Object, error) {
 		return true, nil, errors.NewNotFound(apiv1.Resource("pod"), "whatever")
 	})
-	fakeClient.Fake.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
-		return true, n1, nil
-	})
 	fakeClient.Fake.AddReactor("create", "pods", func(action core.Action) (bool, runtime.Object, error) {
 		createAction := action.(core.CreateAction)
 		if createAction == nil {
@@ -420,12 +566,6 @@ func TestDrainNodeWithRetries(t *testing.T) {
 			return true, nil, fmt.Errorf("Too many concurrent evictions")
 		}
 	})
-	fakeClient.Fake.AddReactor("update", "nodes", func(action core.Action) (bool, runtime.Object, error) {
-		update := action.(core.UpdateAction)
-		obj := update.GetObject().(*apiv1.Node)
-		updatedNodes <- obj.Name
-		return true, obj, nil
-	})
 	err := drainNode(n1, []*apiv1.Pod{p1, p2, p3}, fakeClient, kube_util.CreateEventRecorder(fakeClient), 20, 5*time.Second, 0*time.Second)
 	assert.NoError(t, err)
 	deleted := make([]string, 0)
@@ -436,7 +576,6 @@ func TestDrainNodeWithRetries(t *testing.T) {
 	assert.Equal(t, p1.Name, deleted[0])
 	assert.Equal(t, p2.Name, deleted[1])
 	assert.Equal(t, p3.Name, deleted[2])
-	assert.Equal(t, n1.Name, getStringFromChan(updatedNodes))
 }
 
 func TestScaleDown(t *testing.T) {
@@ -694,20 +833,15 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) {
 	assert.Equal(t, ScaleDownNodeDeleted, result)
 
 	// Check the channel (and make sure there isn't more than there should be).
+	// Report only up to 10 extra nodes found.
 	deleted := make([]string, 0, len(config.expectedScaleDowns)+10)
-	empty := false
-	for i := 0; i < len(config.expectedScaleDowns)+10 && !empty; i++ {
-		select {
-		case d := <-deletedNodes:
-			if d == "" { // a closed channel yields empty value
-				empty = true
-			} else {
-				deleted = append(deleted, d)
-			}
-		default:
-			empty = true
-		}
-	}
+	for i := 0; i < len(config.expectedScaleDowns)+10; i++ {
+		d := getStringFromChanImmediately(deletedNodes)
+		if d == "" { // a closed channel yields empty value
+			break
+		}
+		deleted = append(deleted, d)
+	}
 
 	assertEqualSet(t, config.expectedScaleDowns, deleted)
 }
@@ -884,7 +1018,16 @@ func getStringFromChan(c chan string) string {
 	select {
 	case val := <-c:
 		return val
-	case <-time.After(time.Second * 10):
+	case <-time.After(10 * time.Second):
+		return "Nothing returned"
+	}
+}
+
+func getStringFromChanImmediately(c chan string) string {
+	select {
+	case val := <-c:
+		return val
+	default:
 		return "Nothing returned"
 	}
 }