kubectl drain: avoid leaking goroutines

Recently, kubectl drain has been refactored to be consumeable
as a library.  Currently, if a pod cannot be evicted due
to PDBs, we will leak a goroutine.

This commit ensures the goroutine always exists.

Related-bug: https://github.com/kubernetes/kubernetes/issues/81333

Kubernetes-commit: 0e8998a040efe5dd82592911e739fe2e2258cfa3
This commit is contained in:
Michael Gugino 2019-10-15 15:24:27 -04:00 committed by Kubernetes Publisher
parent a4e4b8e16b
commit e09feee202
2 changed files with 32 additions and 16 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package drain
import (
"context"
"fmt"
"io"
"math"
@ -184,7 +185,6 @@ func (d *Helper) DeleteOrEvictPods(pods []corev1.Pod) error {
getPodFn := func(namespace, name string) (*corev1.Pod, error) {
return d.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
}
if len(policyGroupVersion) > 0 {
return d.evictPods(pods, policyGroupVersion, getPodFn)
}
@ -194,11 +194,26 @@ func (d *Helper) DeleteOrEvictPods(pods []corev1.Pod) error {
func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
returnCh := make(chan error, 1)
// 0 timeout means infinite, we use MaxInt64 to represent it.
var globalTimeout time.Duration
if d.Timeout == 0 {
globalTimeout = time.Duration(math.MaxInt64)
} else {
globalTimeout = d.Timeout
}
ctx, cancel := context.WithTimeout(context.TODO(), globalTimeout)
defer cancel()
for _, pod := range pods {
go func(pod corev1.Pod, returnCh chan error) {
for {
fmt.Fprintf(d.Out, "evicting pod %q\n", pod.Name)
select {
case <-ctx.Done():
// return here or we'll leak a goroutine.
returnCh <- fmt.Errorf("error when evicting pod %q: global timeout", pod.Name)
return
default:
}
err := d.EvictPod(pod, policyGroupVersion)
if err == nil {
break
@ -213,7 +228,7 @@ func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodF
return
}
}
_, err := waitForDelete([]corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn, d.OnPodDeletedOrEvicted)
_, err := waitForDelete(ctx, []corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn, d.OnPodDeletedOrEvicted)
if err == nil {
returnCh <- nil
} else {
@ -225,14 +240,6 @@ func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodF
doneCount := 0
var errors []error
// 0 timeout means infinite, we use MaxInt64 to represent it.
var globalTimeout time.Duration
if d.Timeout == 0 {
globalTimeout = time.Duration(math.MaxInt64)
} else {
globalTimeout = d.Timeout
}
globalTimeoutCh := time.After(globalTimeout)
numPods := len(pods)
for doneCount < numPods {
select {
@ -241,10 +248,10 @@ func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodF
if err != nil {
errors = append(errors, err)
}
case <-globalTimeoutCh:
return fmt.Errorf("drain did not complete within %v", globalTimeout)
default:
}
}
return utilerrors.NewAggregate(errors)
}
@ -262,11 +269,12 @@ func (d *Helper) deletePods(pods []corev1.Pod, getPodFn func(namespace, name str
return err
}
}
_, err := waitForDelete(pods, 1*time.Second, globalTimeout, false, getPodFn, d.OnPodDeletedOrEvicted)
ctx := context.TODO()
_, err := waitForDelete(ctx, pods, 1*time.Second, globalTimeout, false, getPodFn, d.OnPodDeletedOrEvicted)
return err
}
func waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error), onDoneFn func(pod *corev1.Pod, usingEviction bool)) ([]corev1.Pod, error) {
func waitForDelete(ctx context.Context, pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error), onDoneFn func(pod *corev1.Pod, usingEviction bool)) ([]corev1.Pod, error) {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
pendingPods := []corev1.Pod{}
for i, pod := range pods {
@ -284,6 +292,12 @@ func waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEvic
}
pods = pendingPods
if len(pendingPods) > 0 {
select {
case <-ctx.Done():
return false, fmt.Errorf("global timeout")
default:
return false, nil
}
return false, nil
}
return true, nil

View File

@ -17,6 +17,7 @@ limitations under the License.
package drain
import (
"context"
"errors"
"fmt"
"os"
@ -105,7 +106,8 @@ func TestDeletePods(t *testing.T) {
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
_, pods := createPods(false)
pendingPods, err := waitForDelete(pods, test.interval, test.timeout, false, test.getPodFn, nil)
ctx := context.TODO()
pendingPods, err := waitForDelete(ctx, pods, test.interval, test.timeout, false, test.getPodFn, nil)
if test.expectError {
if err == nil {